forked from freenetwork/iqoption_api
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathapi.py
201 lines (160 loc) · 6.22 KB
/
api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
# -*- coding: utf-8 -*-
"""Module for IQ option api."""
import time
import json
import logging
import requests
import threading
from iqoption_api.login import Login
from iqoption_api.websocket import Websocket
from iqoption_api.ssid import Ssid
from iqoption_api.subscribe import Subscribe
from iqoption_api.unsubscribe import UnSubscribe
from iqoption_api.setactives import SetActives
from iqoption_api.buy import Buy
# Silence urllib3 warnings at import time. The one this module would
# otherwise trigger is InsecureRequestWarning ("Unverified HTTPS request is
# being made. Adding certificate verification is strongly advised."),
# because IQOptionAPI creates its session with certificate verification off.
# NOTE(review): this is a process-wide side effect of importing the module.
# See: https://urllib3.readthedocs.org/en/latest/security.html
requests.packages.urllib3.disable_warnings()
class IQOptionAPI(object):
    """Class for communication with IQ option api.

    Owns a ``requests`` session used for HTTP resources and, once
    :meth:`connect` has been called, a websocket client that is driven by a
    daemon thread. The ``login``/``ssid``/``subscribe``/... properties each
    return a fresh helper object bound to this instance.
    """

    def __init__(self, host, username, password):
        """
        :param str host: The hostname or ip address of a IQ option server.
        :param str username: The username of a IQ option server.
        :param str password: The password of a IQ option server.
        """
        self.https_url = "https://%s/api" % host
        self.wss_url = "wss://%s/echo/websocket" % host
        # Populated by connect(); stays None until then.
        self.websocket = None
        self.session = requests.Session()
        # NOTE(review): TLS certificate verification is switched off for every
        # request made through this session. Confirm this is still required.
        self.session.verify = False
        # Do not pick up settings from the environment (see the requests
        # documentation for ``Session.trust_env``).
        self.session.trust_env = False
        self.username = username
        self.password = password
        # Stored for callers; send_http_request() only sends the headers it
        # is given explicitly.
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 '
                          '(KHTML, like Gecko) Chrome/41.0.2228.0 '
                          'Safari/537.36'
        }

    def set_session_cookie(self, val):
        """
        Coerce existing session cookie values to ``str`` and add new cookies.

        :param dict val: Cookie names mapped to the values to add to the
            session cookie jar.
        """
        # Iterate over a snapshot: entries are re-assigned inside the loop,
        # so the jar must not be walked lazily while it is being modified.
        for name, value in list(self.session.cookies.items()):
            if not isinstance(value, str):
                self.session.cookies[name] = str(value)
        requests.utils.add_dict_to_cookiejar(self.session.cookies, val)

    def prepare_url(self, resource):
        """
        Construct url from resource url.

        :param resource: :class:`Resource
            <iqoption_api.resource.Resource>`.

        :returns: The full url to IQ option http resource.
        """
        return '/'.join((self.https_url, resource.url))

    def send_http_request(self, resource, method, data=None, params=None,
                          headers=None):
        # pylint: disable=too-many-arguments
        """
        Send http request to IQ option server.

        :param resource: :class:`Resource <iqoption_api.resource.Resource>`.
        :param str method: The HTTP request method.
        :param dict data: (optional) The HTTP request data.
        :param dict params: (optional) The HTTP request params.
        :param dict headers: (optional) The HTTP request headers.

        :returns: :class:`Response <requests.Response>`.
        :raises requests.HTTPError: via ``raise_for_status()`` when the
            server answers with an error status.
        """
        url = self.prepare_url(resource)
        logger = logging.getLogger(__name__)

        logger.debug(url)

        response = self.session.request(method=method,
                                        url=url,
                                        data=data,
                                        params=params,
                                        headers=headers)

        # Log the full exchange before raising so that failed requests can
        # still be inspected at debug level.
        logger.debug(response)
        logger.debug(response.text)
        logger.debug(response.headers)
        logger.debug(response.cookies)

        response.raise_for_status()
        return response

    def send_wss_request(self, name, msg):
        """
        Send wss request to IQ option server.

        The message is serialised as a JSON object with the keys ``name``
        and ``msg`` and written to the current websocket.

        :param name: Value stored under the ``name`` key of the message.
        :param msg: Value stored under the ``msg`` key of the message; must
            be JSON serialisable.
        """
        data = json.dumps(dict(name=name,
                               msg=msg))
        self.websocket.send(data)

    @property
    def login(self):
        """
        Property for get IQ option login resource.

        :returns: :class:`Login
            <iqoption_api.login.Login>`.
        """
        return Login(self)

    @property
    def ssid(self):
        """
        Property for get IQ option websocket ssid channel.

        :returns: :class:`Ssid
            <iqoption_api.ssid.Ssid>`.
        """
        return Ssid(self)

    @property
    def subscribe(self):
        """
        Property for get IQ option websocket subscribe channel.

        :returns: :class:`Subscribe
            <iqoption_api.subscribe.Subscribe>`.
        """
        return Subscribe(self)

    @property
    def unsubscribe(self):
        """
        Property for get IQ option websocket unsubscribe channel.

        :returns: :class:`UnSubscribe
            <iqoption_api.unsubscribe.UnSubscribe>`.
        """
        return UnSubscribe(self)

    @property
    def setactives(self):
        """
        Property for get IQ option websocket setactives channel.

        :returns: :class:`SetActives
            <iqoption_api.setactives.SetActives>`.
        """
        return SetActives(self)

    @property
    def buy(self):
        """
        Property for get IQ option websocket buy channel.

        :returns: :class:`Buy
            <iqoption_api.buy.Buy>`.
        """
        return Buy(self)

    def _thread(self):
        """Method for websocket thread."""
        self.websocket.run_forever()

    def connect(self):
        """Method for connection to api.

        Logs in over HTTP, stores a fixed set of platform cookies on the
        session, opens the websocket and starts a daemon thread that runs
        it, then sends the ssid, subscription and ``setactives`` messages.
        If the websocket already exposes ``show_value`` and ``skey``, a buy
        request is issued once per second for 60 seconds.

        :raises KeyError: if the login response carries no ``ssid`` cookie.
        """
        response = self.login(self.username, self.password)
        ssid = response.cookies["ssid"]
        self.set_session_cookie({
            "platform": "9",
            "platform_version": "387.2.4fc",
            "is_regulated": '0',
            "authorization": "1",
            "ru_restricted_popup_shown": "1",
            "aff": "1503",
            "afftrack": "home",
        })

        websocket = Websocket(self.wss_url)
        websocket.connect()
        self.websocket = websocket

        # Daemon thread: it must not keep the interpreter alive on exit.
        websocket_thread = threading.Thread(target=self._thread)
        websocket_thread.daemon = True
        websocket_thread.start()

        self.ssid(ssid)

        self.subscribe("deposited")
        self.unsubscribe("deposited")
        self.unsubscribe("iqguard")
        self.unsubscribe("signal")
        self.unsubscribe("feedRecentBets")
        self.unsubscribe("feedRecentBets2")
        self.unsubscribe("feedTopTraders2")
        self.unsubscribe("feedRecentBetsMulti")
        self.unsubscribe("timeSync")

        self.setactives(99)

        # NOTE(review): the websocket thread was started only a few
        # statements ago, so these attributes reflect whatever has been
        # received by now - confirm this check is not racing the server.
        if (self.websocket.show_value is not None
                and self.websocket.skey is not None):
            for _ in range(60):
                time.sleep(1)
                self.buy(self.websocket.time, self.websocket.show_value)