-
Notifications
You must be signed in to change notification settings - Fork 853
/
Copy pathcsv_futures_contracts.py
77 lines (59 loc) · 2.58 KB
/
csv_futures_contracts.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
from syscore.fileutils import resolve_path_and_filename_for_package
from syscore.constants import arg_not_supplied
from sysdata.futures.contracts import futuresContractData, listOfFuturesContracts
from syslogging.logger import *
import pandas as pd
class csvFuturesContractData(futuresContractData):
    """
    Backup-only store that dumps lists of futures contracts to .csv files.

    Only used for backup purposes at the moment, so the single supported
    operation is write_contract_list_as_df(). Every read, add and delete
    method raises NotImplementedError.
    """

    def __init__(
        self, datapath=arg_not_supplied, log=get_logger("csvFuturesContractData")
    ):
        """
        :param datapath: location the .csv files are written to; required
            even though the signature gives it a default
        :param log: logger passed on to the parent class constructor
        :raises Exception: if datapath is not supplied
        """
        super().__init__(log=log)

        if datapath is arg_not_supplied:
            raise Exception("Need to pass datapath")

        self._datapath = datapath

    def __repr__(self):
        return "Instruments data from %s" % self.datapath

    @property
    def datapath(self):
        """The datapath this object was constructed with."""
        return self._datapath

    def _filename_for_instrument_code(self, instrument_code: str):
        """
        Build the name of the file holding one instrument's contracts:
        '<instrument_code>.csv', resolved against self.datapath by
        resolve_path_and_filename_for_package().
        """
        return resolve_path_and_filename_for_package(
            self.datapath, "%s.csv" % instrument_code
        )

    def write_contract_list_as_df(
        self, instrument_code: str, contract_list: listOfFuturesContracts
    ):
        """
        Write every contract in contract_list to the instrument's .csv file.

        One row is written per contract, with the columns 'date', 'sampling'
        and 'expiry' (in that order).

        :param instrument_code: instrument whose file is written
        :param contract_list: contracts to dump; each element must provide
            expiry_date.as_str(), date_str and currently_sampling
        """
        list_of_expiry = [
            contract.expiry_date.as_str() for contract in contract_list
        ]
        list_of_contract_date = [contract.date_str for contract in contract_list]
        list_of_sampling = [
            contract.currently_sampling for contract in contract_list
        ]

        df = pd.DataFrame(
            dict(
                date=list_of_contract_date,
                sampling=list_of_sampling,
                expiry=list_of_expiry,
            )
        )

        filename = self._filename_for_instrument_code(instrument_code)
        # to_csv() is called with pandas' defaults, so the frame's integer
        # index is also written, as the first column of the file.
        df.to_csv(filename)

    def get_list_of_contract_dates_for_instrument_code(self, instrument_code):
        # Same explanation as the other read methods, instead of a bare
        # NotImplementedError that gives the caller no hint.
        raise NotImplementedError("used for backup only no read methods")

    def get_all_contract_objects_for_instrument_code(self, instrument_code):
        raise NotImplementedError("used for backup only no read methods")

    def _delete_contract_data_without_any_warning_be_careful(
        self, instrument_code, contract_date
    ):
        raise NotImplementedError(".csv are read only")

    def is_contract_in_data(self, instrument_code, contract_date):
        raise NotImplementedError("used for backup only no read methods")

    def _add_contract_object_without_checking_for_existing_entry(self, contract_object):
        raise NotImplementedError(
            ".csv can only write contract list en bloc use write_contract_list_as_df"
        )

    def _get_contract_data_without_checking(
        self, instrument_code: str, contract_date: str
    ):
        raise NotImplementedError("used for backup only no read methods")