-
Notifications
You must be signed in to change notification settings - Fork 853
/
Copy pathoptimal_positions.py
187 lines (151 loc) · 6.5 KB
/
optimal_positions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
"""
Optimal positions describe what a particular trading strategy would like to do, conditional (or not) on prices and
current positions
The exact implementation of this depends on the strategy.
A basic class is an optimal position with buffers around it.
A mean reversion style class would include price buffers
"""
import pandas as pd
from syscore.exceptions import missingData
from sysdata.base_data import baseData
from sysobjects.production.optimal_positions import (
baseOptimalPosition,
from_df_row_to_optimal_position,
add_optimal_position_entry_row_to_positions_as_df,
instrumentStrategyAndOptimalPosition,
listOfOptimalPositionsAcrossInstrumentStrategies,
)
from sysobjects.production.tradeable_object import (
listOfInstrumentStrategies,
instrumentStrategy,
)
class optimalPositionData(baseData):
    """
    Store and retrieve the optimal positions assigned to a particular strategy

    We store the type of list in the data

    This class holds the storage-independent logic only. The three methods at
    the bottom (list the stored instrument strategies, read a position history,
    write a position history) raise NotImplementedError here and have to be
    supplied by a subclass.
    """

    def __repr__(self):
        return "optimalPositionData object"

    def get_list_of_optimal_positions_for_strategy(
        self, strategy_name: str
    ) -> listOfOptimalPositionsAcrossInstrumentStrategies:
        """
        Current optimal position for every instrument strategy stored under strategy_name

        :param strategy_name: name of the strategy to filter on
        :return: one instrumentStrategyAndOptimalPosition per matching instrument strategy
        """
        list_of_instrument_strategies = (
            self.get_list_of_instrument_strategies_for_strategy_with_optimal_position(
                strategy_name
            )
        )

        return self.get_list_of_optimal_positions_given_list_of_instrument_strategies(
            list_of_instrument_strategies
        )

    def get_list_of_optimal_positions(
        self,
    ) -> listOfOptimalPositionsAcrossInstrumentStrategies:
        """
        Current optimal position for every instrument strategy that has one stored

        :return: one instrumentStrategyAndOptimalPosition per stored instrument strategy
        """
        list_of_instrument_strategies = (
            self.get_list_of_instrument_strategies_with_optimal_position()
        )

        return self.get_list_of_optimal_positions_given_list_of_instrument_strategies(
            list_of_instrument_strategies
        )

    def get_list_of_optimal_positions_given_list_of_instrument_strategies(
        self, list_of_instrument_strategies: listOfInstrumentStrategies
    ) -> listOfOptimalPositionsAcrossInstrumentStrategies:
        """
        Look up the current optimal position for each supplied instrument strategy

        :param list_of_instrument_strategies: instrument strategies to look up
        :return: the (instrument strategy, optimal position) pairs, in the order supplied
        """
        list_of_optimal_positions_and_instrument_strategies = [
            self.get_instrument_strategy_and_optimal_position(instrument_strategy)
            for instrument_strategy in list_of_instrument_strategies
        ]

        return listOfOptimalPositionsAcrossInstrumentStrategies(
            list_of_optimal_positions_and_instrument_strategies
        )

    def get_instrument_strategy_and_optimal_position(
        self, instrument_strategy: instrumentStrategy
    ) -> instrumentStrategyAndOptimalPosition:
        """
        Pair an instrument strategy with its current optimal position

        :param instrument_strategy: instrument strategy to look up
        :return: instrumentStrategyAndOptimalPosition wrapping both objects
        """
        optimal_position = self.get_current_optimal_position_for_instrument_strategy(
            instrument_strategy
        )

        return instrumentStrategyAndOptimalPosition(
            instrument_strategy, optimal_position
        )

    def get_list_of_instruments_for_strategy_with_optimal_position(
        self, strategy_name: str
    ) -> list:
        """
        Instruments for strategy_name, taken from the stored instrument strategies

        The filtering itself is delegated to
        listOfInstrumentStrategies.get_list_of_instruments_for_strategy

        :param strategy_name: name of the strategy to filter on
        :return: list of instruments
        """
        list_of_instrument_strategies = (
            self.get_list_of_instrument_strategies_with_optimal_position()
        )

        return list_of_instrument_strategies.get_list_of_instruments_for_strategy(
            strategy_name
        )

    def list_of_strategies_with_optimal_position(self) -> list:
        """
        Strategies appearing in the stored instrument strategies

        The extraction is delegated to listOfInstrumentStrategies.get_list_of_strategies

        :return: list of strategies
        """
        list_of_instrument_strategies = (
            self.get_list_of_instrument_strategies_with_optimal_position()
        )

        return list_of_instrument_strategies.get_list_of_strategies()

    def get_list_of_instrument_strategies_for_strategy_with_optimal_position(
        self, strategy_name: str
    ) -> listOfInstrumentStrategies:
        """
        Stored instrument strategies restricted to strategy_name

        :param strategy_name: name of the strategy to filter on
        :return: the matching instrument strategies
        """
        list_of_instrument_strategies = (
            self.get_list_of_instrument_strategies_with_optimal_position()
        )

        return list_of_instrument_strategies.get_list_of_instrument_strategies_for_strategy(
            strategy_name
        )

    def get_current_optimal_position_for_instrument_strategy(
        self, instrument_strategy: instrumentStrategy
    ) -> baseOptimalPosition:
        """
        Most recent optimal position for an instrument strategy

        The stored history is read as a DataFrame and its final row is
        converted into an optimal position object.

        :param instrument_strategy: instrument strategy to look up
        :return: optimal position built from the last row of the stored history
        :raises missingData: if the stored history has no rows. Whatever the
            storage read itself raises is passed through unchanged.
        """
        existing_optimal_positions_as_df = (
            self.get_optimal_position_as_df_for_instrument_strategy(instrument_strategy)
        )

        if len(existing_optimal_positions_as_df) == 0:
            # An empty history has no final row: .iloc[-1] would fail with an
            # opaque IndexError, so report it as missing data instead
            raise missingData(
                "No optimal positions stored for %s" % str(instrument_strategy)
            )

        final_position_row = existing_optimal_positions_as_df.iloc[-1, :]

        return from_df_row_to_optimal_position(final_position_row)

    def update_optimal_position_for_instrument_strategy(
        self,
        instrument_strategy: instrumentStrategy,
        position_entry: baseOptimalPosition,
    ):
        """
        Append a new optimal position to the stored history and write it back

        If reading the existing history raises missingData, the history is
        started from the new entry alone.

        :param instrument_strategy: instrument strategy being updated
        :param position_entry: new optimal position to add
        """
        try:
            existing_optimal_positions_as_df = (
                self.get_optimal_position_as_df_for_instrument_strategy(
                    instrument_strategy
                )
            )
        except missingData:
            # Starting from scratch: the new entry becomes the whole history
            updated_optimal_positions_as_df = position_entry.as_df_row()
        else:
            # Only the read is guarded above. A failure while appending must
            # propagate, otherwise it would be mistaken for 'no history yet'
            # and the stored history would be overwritten with a single row
            updated_optimal_positions_as_df = (
                add_optimal_position_entry_row_to_positions_as_df(
                    existing_optimal_positions_as_df, position_entry
                )
            )

        self.write_optimal_position_as_df_for_instrument_strategy_without_checking(
            instrument_strategy=instrument_strategy,
            optimal_positions_as_df=updated_optimal_positions_as_df,
        )

    def get_list_of_instrument_strategies_with_optimal_position(
        self,
    ) -> listOfInstrumentStrategies:
        """
        All instrument strategies that have an optimal position stored

        Not implemented in this base class: must be provided by a subclass
        """
        raise NotImplementedError

    def get_optimal_position_as_df_for_instrument_strategy(
        self, instrument_strategy: instrumentStrategy
    ) -> pd.DataFrame:
        """
        Stored history of optimal positions for an instrument strategy, one row per entry

        Not implemented in this base class: must be provided by a subclass.
        update_optimal_position_for_instrument_strategy treats a missingData
        raised here as 'nothing stored yet'.
        """
        raise NotImplementedError

    def write_optimal_position_as_df_for_instrument_strategy_without_checking(
        self,
        instrument_strategy: instrumentStrategy,
        optimal_positions_as_df: pd.DataFrame,
    ) -> pd.DataFrame:
        """
        Write the full history of optimal positions for an instrument strategy

        Not implemented in this base class: must be provided by a subclass.

        NOTE(review): the return annotation says pd.DataFrame, but the only
        caller in this class discards the result - confirm whether
        implementations are expected to return anything.
        """
        raise NotImplementedError