-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathPyBackendless.py
232 lines (201 loc) · 8.03 KB
/
PyBackendless.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
# -*- coding: utf-8 -*-
"""
Author: Blake Anderson, Mar 30 2016, [email protected]
Updated: Aug 11, 2017 with 4.X compatibility
Wrapper for Backendless REST API basic user functionality including:
1) Registering user
2) Logging in an existing user
3) Updating user objects
4) Logging out
https://github.com/blakebjorn/Backendless-Python
Pretty much any pull request will be accepted
"""
from __future__ import print_function
import json
import os
import pickle
import requests
class Backendless():
def __init__(self, application_id, rest_api_key, time_out=30, verbose=True):
self.applicationId = application_id
self.restAPIkey = rest_api_key
self.timeOut = time_out # network timeout on requests
self.baseUrl = "https://api.backendless.com/{}/{}".format(self.applicationId, self.restAPIkey)
self.generalHeaders = {'Content-type': 'application/json'}
self.activeLogin = False
self.verbose = verbose
self.userToken = ""
def initialize_user(self, response):
self.objectId = response['objectId']
self.userToken = response['user-token']
self.userData = response
self.userHeaders = dict(self.generalHeaders)
self.userHeaders['user-token'] = self.userToken
self.activeLogin = True
def post_request(self, url, headers, data):
try:
response = requests.post(url, data=data, headers=headers, timeout=self.timeOut)
except Exception as e:
if self.verbose:
print(e)
response = self.handle_response(e)
return response
def put_request(self, url, headers, data):
try:
response = requests.put(url, data=data, headers=headers, timeout=self.timeOut)
except Exception as e:
if self.verbose:
print(e)
response = self.handle_response(e)
return response
def get_request(self, url, headers):
try:
response = requests.get(url, headers=headers, timeout=self.timeOut)
except Exception as e:
if self.verbose:
print(e)
response = self.handle_response(e)
return response
def handle_response(self, e):
if e == requests.exceptions.Timeout:
response = {'error': 'CONNECTION_TIMEOUT'}
elif e == requests.exceptions.ConnectionError:
response = {'error': 'CONNECTION_ERROR'}
elif e == requests.exceptions.HTTPError:
response = {'error': 'HTTP_ERROR'}
elif e == requests.exceptions.TooManyRedirects:
response = {'error': 'TOO_MANY_REDIRECTS_ERROR'}
elif e == requests.exceptions.RequestException:
response = {'error': 'UNSPECIFIED_REQUEST_ERROR'}
else:
response = {'error': 'UNKNOWN_ERROR: ' + str(e)}
return response
def register_user(self, data):
"""
Creates new user object.
:param data: dictionary with format {"email":"[email protected]","password":"foo"}, where "email" is the "identity" entry
:type data: dict
:return: Json response
:rtype : dict
"""
requestUrl = self.baseUrl + "/users/register"
headers = self.generalHeaders
response = self.post_request(requestUrl, headers, json.dumps(data))
if type(response) == requests.models.Response:
response = response.json()
return response
def login_user(self, data):
"""
Logs in to an existing user. Saves user ID, login token, and user data to the variables: objectId, userToken, and userData, respectively.
:param data: dictionary with format {"login":"[email protected]","password":"foo"}. The "login" value is the user "identity" entry
:type data: dict
:return: Json response
:rtype : dict
"""
if not self.activeLogin:
requestUrl = self.baseUrl + "/users/login"
headers = self.generalHeaders
response = self.post_request(requestUrl, headers, json.dumps(data))
if type(response) == requests.models.Response:
if response.status_code == 200:
self.initialize_user(response.json())
response = response.json()
return response
else:
return {'error': 'Must log out before logging in again'}
def update_user_object(self, data):
"""
Updates a field for the logged in user.
:param data: dictionary with format {"field":"value","field2":"value2"}
:type data: dict
:return: Json response
:rtype : dict
"""
if self.activeLogin:
headers = self.userHeaders
requestUrl = self.baseUrl + "/users/" + str(self.objectId)
response = self.put_request(requestUrl, headers, json.dumps(data))
if type(response) == requests.models.Response:
response = response.json()
return response
else:
return {'error': "Must log in before updating user objects"}
def logout(self):
"""
Logs out the active user.
:return: Json response
:rtype : dict
"""
if self.activeLogin:
requestUrl = self.baseUrl + "/users/logout"
headers = dict(self.userHeaders)
headers.pop("Content-type")
response = self.get_request(requestUrl, headers)
if response.status_code == 200:
self.activeLogin = False
response = {'message': 'Logout Successful'}
if type(response) == requests.models.Response:
response = response.json()
return response
else:
return {'error': "Must log in before logging out users"}
def validate_session(self):
"""
Checks if the loaded user-token is valid.
:return: Text response
:rtype : str
"""
if self.userToken != "":
requestUrl = self.baseUrl + "/users/isvalidusertoken/" + self.userToken
headers = dict(self.generalHeaders)
headers.pop("Content-type")
response = self.get_request(requestUrl, headers)
if type(response) == requests.models.Response and response.status_code == 200:
if response.text == 'true':
self.activeLogin = True
return response.text
else:
return response
else:
return "No loaded user token"
def create_token(self, fileName='token.p'):
"""
Saves all data returned from initial log-in response to a pickled object. Returns True on successful file write
:return: Boolean
:rtype : bool
"""
if self.activeLogin:
try:
with open(fileName, 'wb') as file_:
pickle.dump(self.userData, file_)
return True
except Exception as e:
print('Error writing token, exception:', e)
return False
else:
return False
def read_token(self, fileName='token.p'):
"""
Reads a pickled log-in object, and initializes the user.
Session must be validated elsewhere to refresh the user login token.
Returns True on successful file read/user initialization.
:return: Boolean
:rtype : bool
"""
if os.path.isfile(fileName):
try:
with open(fileName, 'rb') as file_:
response = pickle.load(file_)
self.loginResponse = response
self.objectId = response['objectId']
self.userToken = response['user-token']
self.userData = response
self.userHeaders = dict(self.generalHeaders)
self.userHeaders['user-token'] = self.userToken
return True
except Exception as e:
print('Error reading token, exception:', e)
return False
else:
print(fileName, 'does not exist.')
return False