-
Notifications
You must be signed in to change notification settings - Fork 853
/
Copy pathib_connection.py
133 lines (107 loc) · 4.1 KB
/
ib_connection.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
"""
IB connection using ib-insync https://ib-insync.readthedocs.io/api.html
"""
import time
from ib_insync import IB
from sysbrokers.IB.ib_connection_defaults import ib_defaults
from syscore.exceptions import missingData
from syscore.constants import arg_not_supplied
from syslogging.logger import *
from sysdata.config.production_config import get_production_config
class connectionIB(object):
"""
Connection object for connecting IB
(A database plug in will need to be added for streaming prices)
"""
def __init__(
self,
client_id: int,
ib_ipaddress: str = arg_not_supplied,
ib_port: int = arg_not_supplied,
account: str = arg_not_supplied,
log_name: str = "connectionIB",
):
"""
:param client_id: client id
:param ipaddress: IP address of machine running IB Gateway or TWS. If not passed then will get from private config file, or defaults
:param port: Port listened to by IB Gateway or TWS
:param log_name: calling log name
:param mongo_db: mongoDB connection
"""
# resolve defaults
ipaddress, port, __ = ib_defaults(ib_ipaddress=ib_ipaddress, ib_port=ib_port)
self._ib_connection_config = dict(
ipaddress=ipaddress, port=port, client=client_id
)
# The client id is pulled from a mongo database
# If for example you want to use a different database you could do something like:
# connectionIB(mongo_ib_tracker =
# mongoIBclientIDtracker(database_name="another")
# If you copy for another broker include these lines
self._log = get_logger(
"connectionIB",
{
TYPE_LOG_LABEL: log_name,
BROKER_LOG_LABEL: "IB",
CLIENTID_LOG_LABEL: client_id,
},
)
# You can pass a client id yourself, or let IB find one
try:
self._init_connection(
ipaddress=ipaddress, port=port, client_id=client_id, account=account
)
except Exception as e:
# Log all exceptions generated during connection as critical error.
# Under the default production setup this should send an email.
# Error is reraised as we can't really continue and user intervention is required
self.log.critical(
f"IB connection falied with exception - {e}, connection aborted."
)
raise
def _init_connection(
self, ipaddress: str, port: int, client_id: int, account=arg_not_supplied
):
ib = IB()
try:
if account is arg_not_supplied:
## not passed get from config
account = get_broker_account()
except missingData:
self.log.error(
"Broker account ID not found in private config - may cause issues"
)
ib.connect(ipaddress, port, clientId=client_id)
else:
## conncect using account
ib.connect(ipaddress, port, clientId=client_id, account=account)
# Sometimes takes a few seconds to resolve... only have to do this once per process so no biggie
time.sleep(5)
self._ib = ib
self._account = account
@property
def ib(self):
return self._ib
@property
def log(self):
return self._log
def __repr__(self):
return "IB broker connection" + str(self._ib_connection_config)
def client_id(self):
return self._ib_connection_config["client"]
@property
def account(self):
return self._account
def close_connection(self):
self.log.debug("Terminating %s" % str(self._ib_connection_config))
try:
# Try and disconnect IB client
self.ib.disconnect()
except BaseException:
self.log.warning(
"Trying to disconnect IB client failed... ensure process is killed"
)
def get_broker_account() -> str:
production_config = get_production_config()
account_id = production_config.get_element("broker_account")
return account_id