forked from robcarver17/pysystemtrade
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmongo_IB_client_id.py
47 lines (38 loc) · 1.39 KB
/
mongo_IB_client_id.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
from sysbrokers.IB.client.ib_client_id import ibBrokerClientIdData
from syscore.constants import arg_not_supplied
from sysdata.mongodb.mongo_generic import mongoDataWithSingleKey
from syslogging.logger import *
# Passed as the first argument to mongoDataWithSingleKey by the tracker class
# in this module (presumably the MongoDB collection name -- confirm against
# mongoDataWithSingleKey).
IB_CLIENT_COLLECTION = "IBClientTracker"
# Passed as the second argument to mongoDataWithSingleKey (presumably the name
# of the key field under which each client id is stored -- confirm).
IB_ID_REF = "client_id"
class mongoIbBrokerClientIdData(ibBrokerClientIdData):
    """
    Read and write data class to get next used client id.

    Client ids are tracked through a ``mongoDataWithSingleKey`` object built
    from ``IB_CLIENT_COLLECTION`` and ``IB_ID_REF``; everything else is
    inherited from ``ibBrokerClientIdData``.
    """

    def __init__(
        self,
        mongo_db=arg_not_supplied,
        idoffset=arg_not_supplied,
        log=get_logger("mongoIDTracker"),
    ):
        """
        :param mongo_db: forwarded unchanged to ``mongoDataWithSingleKey``
        :param idoffset: forwarded unchanged to the parent class
        :param log: logger forwarded to the parent class
        """
        super().__init__(log=log, idoffset=idoffset)
        self._mongo_data = mongoDataWithSingleKey(
            IB_CLIENT_COLLECTION, IB_ID_REF, mongo_db
        )

    @property
    def mongo_data(self):
        """The ``mongoDataWithSingleKey`` object created in ``__init__``."""
        return self._mongo_data

    def __repr__(self) -> str:
        # This method was previously spelt ``_repr__`` (one leading
        # underscore), so repr()/str() never reached it.
        return "Tracking IB client IDs, mongodb %s" % (str(self.mongo_data))

    # Backward-compatible alias for the original, misspelt method name so
    # that any code calling ``obj._repr__()`` explicitly keeps working.
    _repr__ = __repr__

    def _get_list_of_clientids(self) -> list:
        """Return the keys currently held by ``mongo_data``."""
        return self.mongo_data.get_list_of_keys()

    def _lock_clientid(self, next_id: int) -> None:
        """
        Record ``next_id`` as in use.

        :param next_id: client id to lock
        :return: None
        """
        # Only the key matters here, so the stored payload is an empty dict.
        self.mongo_data.add_data(next_id, {})
        self.log.debug("Locked IB client ID %d" % next_id)

    def release_clientid(self, clientid: int) -> None:
        """
        Delete a client id lock

        :param clientid: client id whose lock entry should be removed
        :return: None
        """
        self.mongo_data.delete_data_without_any_warning(clientid)
        self.log.debug("Released IB client ID %d" % clientid)