Skip to content

Commit b703066

Browse files
Adding Pandas sample file
1 parent fc488ca commit b703066

File tree

12 files changed

+328
-0
lines changed

12 files changed

+328
-0
lines changed

pd_sql/.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
pyodbc/
2+
venv/

pd_sql/__init__.py

Whitespace-only changes.

pd_sql/__main__.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
import argparse
2+
from pathlib import Path
3+
from app import App
4+
5+
6+
def parse_arguments(argv=None):
    """Parse command-line options for the tool.

    Args:
        argv: optional list of argument strings. Defaults to ``sys.argv[1:]``
            (passing an explicit list makes the function unit-testable).

    Returns:
        Tuple ``(log, debug, debug_dir, dbpwd)`` where *log* and *debug*
        are booleans and *debug_dir* / *dbpwd* are strings or ``None``.
    """
    parser = argparse.ArgumentParser(description='Liability Cash Flow Generation Tool')
    parser.add_argument('--log',
                        action='store_true',
                        help='Log messages')
    parser.add_argument('--debug',
                        action='store_true',
                        help='Save files in various stages into csv files')
    parser.add_argument('--debug_dir',
                        required=False,
                        help='Dir where to save CSV files to')
    parser.add_argument('--dbpwd',
                        required=False,
                        help='Password to SQL Server (for Linux ODBC drivers)')
    args = parser.parse_args(argv)

    return args.log, args.debug, args.debug_dir, args.dbpwd
23+
24+
25+
if __name__ == '__main__':
    """
    This small application demonstrates how to read data from a CSV file
    and save it into a database.

    NOTE:
        There is some overhead here with the date type (datetime64) when
        read from a database, which is relevant only on Linux and ok on
        Windows.

    params: Supply the password as a parameter when executed on Linux
        systems. No need on Windows, as you can save the password in the
        ODBC connection settings.
    """
    # Parse CLI arguments
    logging_enabled, debug_enabled, debug_dir, dbpwd = parse_arguments()

    print(f'Logging:{logging_enabled}')
    print(f'Debug:{debug_enabled}')
    print(f'Debug dir:{debug_dir}')

    # Initialise a global app object
    # this will hold the state and components of the app
    app = App(log_flag=logging_enabled,
              debug_flag=debug_enabled,
              debug_dir=debug_dir,
              dbpwd=dbpwd)

    # Main implementation
    # BUG FIX: Path() does not expand '~' by itself; expanduser() makes the
    # path usable regardless of how pandas handles it downstream.
    csv_path = Path('~/docker/test/exchange-rates/data').expanduser()
    daily_csv_file = 'daily.csv'

    # Read file to a DataFrame
    csv_df = app.get_csv_data(csv_path.joinpath(daily_csv_file))
    # BUG FIX: a bare 'datetime64' dtype (no unit) is rejected by pandas >= 2.0;
    # 'datetime64[ns]' is accepted by both old and new pandas versions.
    csv_df = csv_df.astype({'Date': 'datetime64[ns]'})
    csv_df = csv_df.rename(columns={'Date': 'RateDate'})
    csv_df = csv_df.fillna(0)
    # Check the data
    app.debug_df('Rates from CSV:', csv_df)

    # Save Rates Into DB
    app.save_rates_into_db(csv_df)
65+
66+
67+
68+

pd_sql/app.py

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
import logging
2+
import pandas as pd
3+
from pathlib import Path
4+
from time import time
5+
from functools import wraps
6+
7+
from db.connection import new_connection
8+
from db.lib import insert_rates, update_rates
9+
from db.Findb.Daily_rates import daily_rates_tbl, get_all_rates
10+
from util.config import load_config, get_config
11+
12+
13+
def timing(f):
    """Decorator that logs the wall-clock duration of a method call.

    Assumes the wrapped callable is a method whose first positional
    argument (``self``) exposes a ``logger`` attribute.
    """
    @wraps(f)
    def wrap(*args, **kw):
        logger = args[0].logger
        ts = time()
        logger.info('*' * 50)
        logger.info('Executing: {}()...'.format(f.__name__.upper()))
        logger.info('*' * 50)
        result = f(*args, **kw)
        elapsed = time() - ts
        # BUG FIX: the original logged elapsed/60 as minutes AND the full
        # elapsed seconds, double counting (90s printed as "2min 90.00sec").
        # divmod splits the duration into whole minutes plus remainder.
        mins, secs = divmod(elapsed, 60)
        logger.info('*' * 50)
        logger.info('{}() took: {:.0f}min {:.2f}sec '.format(f.__name__.upper(), mins, secs))
        logger.info('*' * 50)
        return result
    return wrap
28+
29+
30+
class App(object):
    """Application container holding the logger, configuration, database
    connection and debug helpers shared across the program."""

    def __init__(self, log_flag=None, debug_flag=None, debug_dir=None, dbpwd=None):
        """
        Args:
            log_flag: enable INFO-level logging.
            debug_flag: enable DEBUG-level logging and CSV debug dumps.
            debug_dir: directory where debug CSV files are written.
            dbpwd: SQL Server password (needed for Linux ODBC drivers).
        """
        self.logger = App.set_up_logger(log_flag, debug_flag)
        self.debug_enabled = debug_flag
        # BUG FIX: the original assigned None here, silently discarding the
        # debug_dir argument so debug dumps could never find their directory.
        self.debug_dir = debug_dir

        self.config = load_config('conf.ini')
        dsn_name = self.get_config('DSN', 'Name')
        self.db_connection = new_connection(dsn_name, dbpwd)

    @staticmethod
    def set_up_logger(log_flag, debug_flag):
        """Configure and return the module logger.

        Level is ERROR by default, INFO with --log, DEBUG with --debug
        (debug wins when both flags are set).
        """
        logging_level = logging.ERROR
        if log_flag:
            logging_level = logging.INFO
        if debug_flag:
            logging_level = logging.DEBUG

        logging.basicConfig(level=logging_level,
                            format='%(asctime)-15s %(name)-5s %(levelname)-8s %(message)s')

        logger = logging.getLogger(__name__)
        logger.setLevel(logging_level)

        return logger

    def set_up_debug_dir(self):
        """Validate ``self.debug_dir`` and return it as a Path.

        Returns None when debug mode is off; raises when the directory
        does not exist.
        """
        dir_ = None
        if self.debug_enabled:
            dir_ = Path(self.debug_dir)
            if not dir_.exists():
                raise Exception(f"Dir: {self.debug_dir} does not exist!")
        return dir_

    def debug_dump_df(self, name, df):
        """Dump *df* to ``<debug_dir>/<name>.csv`` when debug mode is on."""
        if not self.debug_enabled:
            return

        # BUG FIX: the original called App.set_up_debug_dir() as if it were a
        # static method (TypeError: missing 'self'); lazily convert the
        # configured directory string to a validated Path on first use.
        if not isinstance(self.debug_dir, Path):
            self.debug_dir = self.set_up_debug_dir()

        file = self.debug_dir.joinpath(name).with_suffix('.csv')
        self.debug(f'Dumping into file: {file}')
        df.to_csv(file)

    def debug(self, msg):
        # Convenience wrapper: DEBUG-level message
        self.logger.debug(msg)

    def log(self, msg):
        # Convenience wrapper: INFO-level message
        self.logger.info(msg)

    def debug_df(self, title, df):
        """Log the head of *df* at DEBUG level, prefixed by *title*."""
        self.debug('\n' + title + ' - (head) \n' + df.head().to_string())

    @timing
    def get_config(self, section, name):
        """Read one value from the loaded configuration."""
        return get_config(self.config, section, name)

    @staticmethod
    def get_csv_data(file):
        """Read a CSV file into a DataFrame."""
        return pd.read_csv(file)

    @timing
    def save_rates_into_db(self, rates_df):
        """Upsert *rates_df* into the daily rates table.

        Rows not yet present in the database (keyed on RateDate+Country)
        are inserted; existing rows are updated.
        """
        # Get current rates from DB
        rates_db_df = get_all_rates(self.db_connection)
        # BUG FIX: bare 'datetime64' (no unit) is rejected by pandas >= 2.0
        rates_db_df = rates_db_df.astype({'RateDate': 'datetime64[ns]'})

        # Left outer join so we can tell which incoming rows already exist
        joined = pd.merge(rates_df,
                          rates_db_df,
                          left_on=['RateDate', 'Country'],
                          right_on=['RateDate', 'Country'],
                          how='left',
                          suffixes=['', '_table'])

        # Rows with no DB match (NULL in the joined table columns) are inserts
        cond = joined['Value_table'].isnull()
        inserts = joined[cond]
        updates = joined[~cond]

        # Check
        self.debug_df('Inserts:', inserts)
        self.debug_df('Updates:', updates)

        if inserts.index.size > 0:
            insert_rates(self.db_connection,
                         daily_rates_tbl,
                         inserts)

        if updates.index.size > 0:
            update_rates(self.db_connection,
                         daily_rates_tbl,
                         updates)

    def __del__(self):
        # Guard with getattr: __init__ may have raised before db_connection
        # was ever assigned, and __del__ must never raise on that account.
        if getattr(self, 'db_connection', None):
            self.db_connection.close()
            del self.db_connection

pd_sql/conf.ini

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
2+
3+
[DSN]
4+
Name=MSSQLServerDatabase

pd_sql/db/Findb/Daily_rates.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
from sqlalchemy import DECIMAL, String, Date, MetaData
2+
from db.lib import LabeledColumn, LabeledTable, make_select_list, table_reader
3+
4+
# Fully-qualified schema the rates table lives in
db_schema = 'Findb.dbo'

# Daily FX rates table definition; RateDate is selected pre-formatted
# as yyyyMMdd via a SQL Server format() expression.
daily_rates_tbl = LabeledTable(
    'daily_rates',
    MetaData(),
    LabeledColumn('RateDate', Date, select_exp="format(RateDate, 'yyyyMMdd')"),
    LabeledColumn('Country', String),
    LabeledColumn('Value', DECIMAL),
    schema=db_schema,
)
12+
13+
14+
def get_all_rates(connection):
    """Return every row of the daily rates table as a DataFrame."""
    columns = make_select_list(daily_rates_tbl.select_columns_exp_dict)
    return table_reader(
        connection,
        columns=columns,
        table=f'{db_schema}.{daily_rates_tbl.name}',
    )
22+

pd_sql/db/Findb/__init__.py

Whitespace-only changes.

pd_sql/db/__init__.py

Whitespace-only changes.

pd_sql/db/connection.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
import sqlalchemy as sqla
2+
import platform
3+
4+
5+
def new_connection(dsn_name, dbpwd=None):
    """Open a SQLAlchemy connection to the MSSQL DSN.

    On Linux the ODBC driver cannot store credentials, so the password is
    supplied explicitly for the 'sa' user; on Windows the DSN alone is
    enough (credentials live in the ODBC settings).
    """
    print('Creating new connection')

    if platform.system() == 'Linux':
        url = f"mssql+pyodbc://sa:{dbpwd}@{dsn_name}"
    else:
        url = f"mssql+pyodbc://{dsn_name}"

    return sqla.create_engine(url, echo=True).connect()
12+

pd_sql/db/lib.py

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
import pandas as pd
2+
from sqlalchemy import Column, Table, and_
3+
from lib.converter import make_upsert_list, make_select_list
4+
5+
6+
class Db:
    """Value object pairing a database name with a schema."""

    def __init__(self, name, schema):
        self.name = name
        self.schema = schema
        # Dotted 'name.schema' identifier used to qualify table names
        self.db_schema = f'{name}.{schema}'
11+
12+
13+
class LabeledColumn(Column):
    """A SQLAlchemy Column that also carries a display label and an
    optional raw SQL select expression."""

    def __init__(self, name, *args, label=None, select_exp=None, **kwargs):
        Column.__init__(self, name, *args, **kwargs)
        # Fall back to the column name when no explicit label is given
        self.label = label or name
        self.select_exp = select_exp
19+
20+
21+
class LabeledTable(Table):
    """A SQLAlchemy Table built from LabeledColumns, exposing helpers to
    map column names / select expressions to their labels."""

    def __init__(self, *args, **kwargs):
        Table.__init__(self, *args, **kwargs)

    @property
    def labels(self):
        """Labels of every column, in declaration order."""
        return [column.label for column in self.columns]

    @property
    def column_label_dict(self):
        """Map column name -> label."""
        return {column.name: column.label for column in self.columns}

    @property
    def select_columns_exp_dict(self):
        """Map select expression (or plain column name when there is no
        expression) -> label."""
        result = {}
        for column in self.columns:
            key = column.select_exp if column.select_exp else column.name
            result[key] = column.label
        return result

    @property
    def select_columns_dict(self):
        """Map column name -> label (ignores select expressions)."""
        return {column.name: column.label for column in self.columns}
45+
46+
47+
def table_reader(connection, table, columns, condition=''):
    """Read *columns* from *table* into a DataFrame.

    Args:
        connection: DB-API / SQLAlchemy connection usable by pandas.read_sql.
        table: fully-qualified table name.
        columns: iterable of column names or select expressions.
        condition: optional WHERE-clause body (without the 'where' keyword).

    SECURITY NOTE: the SQL text is assembled by string interpolation, so
    *table*, *columns* and *condition* MUST come from trusted code paths
    (table definitions in this package) — never from user input, or this
    becomes a SQL-injection vector. Identifiers cannot be parameterized,
    which is why the interpolation is kept but flagged here.
    """
    where_clause = f' where {condition}' if condition else ''
    sql = f"select {','.join(columns)} from {table}{where_clause}"

    return pd.read_sql(sql, connection)
52+
53+
54+
def insert_rates(connection, table, inserts):
    """Bulk-insert the rows of *inserts* into *table*.

    The parameter list produced by make_upsert_list drives an
    executemany-style insert.
    """
    params = make_upsert_list(inserts)
    connection.execute(table.insert(), params)
60+
61+
62+
def update_rates(connection, table, updates):
    """Update existing rate rows one at a time, keyed on (RateDate, Country).

    Each execute autocommits; updates are issued as single statements
    because each row needs its own WHERE clause.
    """
    # Make dictionary of parameters for the updates
    upd_params = make_upsert_list(updates)
    for uld in upd_params:
        connection.execute(
            table.update().
            where(
                and_(
                    table.c.RateDate == uld['RateDate'],
                    table.c.Country == uld['Country'])),
            # BUG FIX: the original passed the entire upd_params list here,
            # re-sending every row's parameters for each single-row update;
            # only the current row's parameters belong in this statement.
            uld)

0 commit comments

Comments
 (0)