forked from mvfc/ofx2csv
-
Notifications
You must be signed in to change notification settings - Fork 5
/
ofx2csv.py
185 lines (146 loc) · 6 KB
/
ofx2csv.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
#!/usr/bin/env python3
#
# Convert Quicken QFX/OFX file into CSV
#
# Origin code from whistler/ofx2csv.py
# https://gist.github.com/whistler/e7c21c70d1cbb9c4b15d
#
# NKN: Modified to convert, more generically, all data in transactions and
# positions into two CSV files.
from datetime import datetime
from pathlib import Path
from decimal import Decimal
from csv import DictWriter
from glob import glob
from ofxparse import OfxParser
import json
import argparse
import pprint
def myDir(obj):
    """Return the names from ``dir(obj)`` that do not start with ``__``.

    Debugging aid: gives the attribute names of ``obj`` with the
    double-underscore entries filtered out, in the order ``dir`` reports them.
    """
    return [name for name in dir(obj) if not name.startswith('__')]
def showStructure(obj, depth : int =5):
    """Convert ``obj`` into nested plain containers suitable for ``pprint``.

    Debugging aid.  Walks ``obj`` recursively, at most ``depth`` levels deep:

    * ``None``, ``str``, ``int``, ``float`` and ``datetime`` values are
      returned unchanged;
    * ``Decimal`` values are converted to ``float``;
    * lists and dicts are rebuilt with their elements/values converted;
    * any other object becomes a dict of its non-``__`` attributes
      (as listed by ``dir``), each converted in turn.

    Args:
        obj: The value to convert.
        depth: Remaining levels to descend.  When it reaches zero (or is
            negative) ``obj`` is returned as-is without further expansion.

    Returns:
        The converted structure (scalars, lists and dicts).
    """
    if depth <= 0:
        return obj
    # Leaf values: without this check floats, None and datetimes would fall
    # through to the generic dir() expansion below and be rendered as dicts
    # of their methods instead of as values.
    if obj is None or isinstance(obj, (str, int, float, datetime)):
        return obj
    if isinstance(obj, Decimal):
        return float(obj)
    if isinstance(obj, list):
        return [showStructure(item, depth - 1) for item in obj]
    if isinstance(obj, dict):
        return {key: showStructure(value, depth - 1) for key, value in obj.items()}
    # Generic object: expose every attribute whose name is not a dunder.
    return {
        name: showStructure(getattr(obj, name), depth - 1)
        for name in dir(obj)
        if not name.startswith('__')
    }
def _security_names(qfx):
    """Map each security's ``uniqueid`` to its ``name`` from ``qfx.security_list``."""
    # `or ()` tolerates a missing or None security list.
    # NOTE(review): presumably the list is absent for files without
    # investment data - confirm against ofxparse.
    securities = getattr(qfx, 'security_list', None) or ()
    return {security.uniqueid: security.name for security in securities}


def _scalar_fields(obj, sec_names=None):
    """Collect the str / Decimal / datetime attributes of ``obj`` into a dict.

    Attributes whose name starts with ``__`` are skipped, as are values of
    any other type.  ``Decimal`` becomes ``float`` and ``datetime`` becomes
    an ``MM/DD/YYYY`` string.  When ``sec_names`` is given, the ``security``
    attribute is translated from id to name; an id that is not in the
    mapping is kept unchanged.
    """
    row = {}
    for field in dir(obj):
        if field.startswith('__'):
            continue
        val = getattr(obj, field)
        if sec_names is not None and field == "security":
            val = sec_names.get(val, val)
        if isinstance(val, str):
            row[field] = val
        elif isinstance(val, Decimal):
            row[field] = float(val)
        elif isinstance(val, datetime):
            row[field] = val.strftime('%m/%d/%Y')
    return row


def _rows_for(qfx, items_attr):
    """Build one flat dict per entry of ``account.statement.<items_attr>``.

    Each row starts with the account's scalar fields and is then updated
    with the entry's own fields, so an entry field wins over an account
    field of the same name.
    """
    sec_names = _security_names(qfx)
    rows = []
    for account in qfx.accounts:
        account_fields = _scalar_fields(account)
        statement = getattr(account, 'statement', None)
        # A statement without this attribute simply contributes no rows.
        for item in getattr(statement, items_attr, None) or ():
            rows.append({**account_fields, **_scalar_fields(item, sec_names)})
    return rows


# The annotations below are quoted so they are not evaluated when the
# functions are defined.
# NOTE(review): main() passes the return value of OfxParser.parse(), which is
# presumably not an OfxParser instance - confirm and correct the annotation.
# For debugging, pprint.pprint(showStructure(qfx.accounts, 2)) dumps the
# parsed structure.
def get_statement_from_qfx(qfx : "OfxParser"):
    """Flatten every transaction of every account into a list of dict rows.

    Args:
        qfx: Parsed OFX/QFX data exposing ``accounts`` (each with
            ``statement.transactions``) and, optionally, ``security_list``.

    Returns:
        A list with one dict per transaction, holding the str, Decimal
        (as float) and datetime (as ``MM/DD/YYYY``) attributes of the
        account and of the transaction.  ``security`` ids are replaced by
        the security name when one is known.
    """
    return _rows_for(qfx, 'transactions')


def get_positions_from_qfx(qfx : "OfxParser"):
    """Flatten every position of every account into a list of dict rows.

    Args:
        qfx: Parsed OFX/QFX data exposing ``accounts`` (each with a
            ``statement`` that may have ``positions``) and, optionally,
            ``security_list``.

    Returns:
        A list with one dict per position, built the same way as the rows of
        ``get_statement_from_qfx``.  Statements without ``positions``
        contribute nothing.
    """
    return _rows_for(qfx, 'positions')
def save_files(table, outputtype, out_file):
    """Write ``table`` to ``out_file`` as CSV or JSON.

    Args:
        table: List of dict rows.  Rows may have different keys.
        outputtype: ``'csv'`` or ``'json'``; any other value only prints an
            "unsupported" message and writes nothing.
        out_file: Destination path (``pathlib.Path`` or string).

    For CSV the header is the union of all row keys, in first-seen order, so
    columns that only appear in later rows are not lost; missing values are
    written as empty cells.  An empty table produces an empty file.
    """
    out_file = Path(out_file)
    if outputtype == 'csv':
        print(" Save to {}...".format(out_file))
        # dict.fromkeys de-duplicates while keeping first-seen order.
        fieldnames = list(dict.fromkeys(key for row in table for key in row))
        with out_file.open('w', newline='') as f:
            if fieldnames:
                writer = DictWriter(f, fieldnames=fieldnames)
                writer.writeheader()
                writer.writerows(table)
    elif outputtype == 'json':
        print(" Save to {}...".format(out_file))
        with out_file.open('w') as f:
            json.dump(table, f)
    else:
        print('{} type unsupported'.format(outputtype))
def main():
    """Command-line entry point: convert each matching QFX/OFX file.

    Options:
        -o/--outputtype  ``csv`` (default) or ``json``.
        -i/--input       One or more file names or glob patterns
                         (default ``*.qfx``).

    For every matched file two outputs are written next to it: the
    transactions as ``<name>.<type>`` and the positions as
    ``<name>.positions.<type>``.
    """
    argparser = argparse.ArgumentParser()
    # `choices` rejects an unsupported type up front instead of parsing every
    # file and only then reporting that nothing can be written.
    argparser.add_argument("-o", "--outputtype", help = "csv or json", default="csv", choices=["csv", "json"])
    argparser.add_argument("-i", "--input", nargs='+', help = "input file(s)", default=["*.qfx"])
    args = argparser.parse_args()
    outputtype = args.outputtype
    for pattern in args.input:
        files = glob(pattern)
        if not files:
            print("No files match {}".format(pattern))
        for qfx_file in files:
            print("Reading {}...".format(qfx_file))
            # Close the input handle as soon as parsing is done.
            # NOTE(review): assumes OfxParser.parse() reads the whole file
            # before returning - confirm against ofxparse.
            with open(qfx_file, encoding="latin-1") as qfx_handle:
                qfx = OfxParser.parse(qfx_handle)
            statement = get_statement_from_qfx(qfx)
            save_files(statement, outputtype, Path(qfx_file).with_suffix('.' + outputtype))
            positions = get_positions_from_qfx(qfx)
            save_files(positions, outputtype, Path(qfx_file).with_suffix('.positions.' + outputtype))
# Run the converter only when executed as a script, so that importing this
# module (e.g. to reuse its functions) does not parse arguments or touch files.
if __name__ == "__main__":
    main()