This repository has been archived by the owner on Jul 25, 2024. It is now read-only.
forked from Azure-Samples/azure-iot-samples-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserver.py
137 lines (109 loc) · 5.46 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTShadowClient
import logging
import time
import json
import sys
import boto3
def updateDeviceState(device_seq, device_state):
    """Push ``device_state`` as the desired state of one device's shadow.

    Opens a fresh MQTT shadow connection, deletes the thing's current shadow
    document, publishes ``{"state": {"desired": device_state}}``, waits a few
    seconds and disconnects.

    Args:
        device_seq: Zero-based device index. The thing name and the
            certificate/key file names are derived from ``device_seq + 1``.
        device_state: JSON-serialisable value published under
            ``state.desired``.

    Returns:
        None.
    """
    host = "a3mfkf3z93nqt8.iot.us-west-2.amazonaws.com"
    rootCAPath = "certs/root-CA.crt"
    # Things and their credential files are numbered from 1, not 0.
    thingNumber = device_seq + 1
    certificatePath = "certs/team55device%s.cert.pem" % thingNumber
    privateKeyPath = "certs/team55device%s.private.key" % thingNumber
    clientId = "team55"
    thingName = "team55device%s" % thingNumber

    # Custom Shadow callbacks
    def customShadowCallback_Update(payload, responseStatus, token):
        # payload is a JSON string ready to be parsed using json.loads(...)
        # in both Py2.x and Py3.x
        if responseStatus == "timeout":
            print("Update request " + token + " time out!")
        elif responseStatus == "accepted":
            payloadDict = json.loads(payload)
            print("~~~~~~~~~~~~~~~~~~~~~~~")
            print("Update request with token: " + token + " accepted!")
            print("property: " + json.dumps(payloadDict["state"]["desired"]))
            print("~~~~~~~~~~~~~~~~~~~~~~~\n\n")
        elif responseStatus == "rejected":
            print("Update request " + token + " rejected!")

    def customShadowCallback_Delete(payload, responseStatus, token):
        if responseStatus == "timeout":
            print("Delete request " + token + " time out!")
        elif responseStatus == "accepted":
            print("~~~~~~~~~~~~~~~~~~~~~~~")
            print("Delete request with token: " + token + " accepted!")
            print("~~~~~~~~~~~~~~~~~~~~~~~\n\n")
        elif responseStatus == "rejected":
            print("Delete request " + token + " rejected!")

    # logging
    logger = logging.getLogger("AWSIoTPythonSDK.core")
    logger.setLevel(logging.DEBUG)
    # This function is called once per device per round. The logger is a
    # process-wide singleton, so attaching a new handler on every call would
    # print each SDK log line once per previous call. Attach it only once.
    if not logger.handlers:
        streamHandler = logging.StreamHandler()
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        streamHandler.setFormatter(formatter)
        logger.addHandler(streamHandler)

    # Init AWSIoTMQTTShadowClient
    myAWSIoTMQTTShadowClient = AWSIoTMQTTShadowClient(clientId)
    myAWSIoTMQTTShadowClient.configureEndpoint(host, 8883)
    myAWSIoTMQTTShadowClient.configureCredentials(rootCAPath, privateKeyPath, certificatePath)

    # AWSIoTMQTTShadowClient configuration
    myAWSIoTMQTTShadowClient.configureAutoReconnectBackoffTime(1, 32, 20)
    myAWSIoTMQTTShadowClient.configureConnectDisconnectTimeout(10)  # 10 sec
    myAWSIoTMQTTShadowClient.configureMQTTOperationTimeout(5)  # 5 sec
    myAWSIoTMQTTShadowClient.connect()

    # Once connected, always disconnect again, even if a shadow operation
    # raises, so a failed round does not leave the connection open.
    try:
        # Create a deviceShadow with persistent subscription
        deviceShadowHandler = myAWSIoTMQTTShadowClient.createShadowHandlerWithName(thingName, True)
        # Delete shadow JSON doc
        deviceShadowHandler.shadowDelete(customShadowCallback_Delete, 5)
        deviceShadowHandler.shadowUpdate(
            json.dumps({"state": {"desired": device_state}}),
            customShadowCallback_Update,
            5,
        )
        # NOTE(review): presumably this pause gives the two callbacks above
        # time to fire before the connection is torn down - confirm.
        time.sleep(5)
    finally:
        myAWSIoTMQTTShadowClient.disconnect()
def device_state(low, high):
    """Build the desired-state payload for a search window ``[low, high]``.

    Args:
        low: Lower bound of the window.
        high: Upper bound of the window.

    Returns:
        A dict with keys ``"median"`` (the floored midpoint of ``low`` and
        ``high``), ``"low_median"`` and ``"high_median"``.
    """
    # Floor division keeps the midpoint an integer for integer bounds on both
    # Python 2 and Python 3; plain "/" would give a float on Python 3.
    return {"median": (low + high) // 2, "low_median": low, "high_median": high}
# ---------------------------------------------------------------------------
# Driver script.
#
#   server.py init [device_seq]  -> push the initial search window to one
#                                   device (or to all of them) and exit.
#   server.py                    -> bisect the window [low_median, high_median]
#                                   using the counts the devices post to SQS
#                                   until the midpoint stops moving.
# ---------------------------------------------------------------------------
N = 1000000
num_devices = 2
low_median = 0
high_median = N
device_ids = range(0, num_devices)

# Queue the devices report their counts to (used for receive and delete).
QUEUE_URL = 'https://sqs.us-west-2.amazonaws.com/617297736688/hack1'

if len(sys.argv) > 1 and 'init' == sys.argv[1]:
    if len(sys.argv) > 2:
        updateDeviceState(int(sys.argv[2]), device_state(low_median, high_median))
    else:
        for device_id in device_ids:
            updateDeviceState(device_id, device_state(low_median, high_median))
else:
    sqs_client = boto3.client('sqs')
    while True:
        # Floor division keeps the window bounds integral on Python 3 as well
        # as Python 2, so the convergence test below can be reached.
        approx_median = (low_median + high_median) // 2

        # One slot per device, keyed by the 1-based device number as a string;
        # None means "no report received yet for this round".
        counts = {str(device_id + 1): None for device_id in device_ids}
        while any(count is None for count in counts.values()):
            messages = sqs_client.receive_message(QueueUrl=QUEUE_URL, MaxNumberOfMessages=10)
            if 'Messages' in messages:  # when the queue is exhausted, the response dict contains no 'Messages' key
                for message in messages['Messages']:  # 'Messages' is a list
                    device_body = json.loads(message['Body'])
                    print(device_body)
                    counts[device_body['device']] = device_body['message']['counts']
                    print([count is None for count in counts.values()])
                    # next, we delete the message from the queue so no one else will process it again
                    sqs_client.delete_message(QueueUrl=QUEUE_URL, ReceiptHandle=message['ReceiptHandle'])
            time.sleep(5)

        # Each report is unpacked as a (low, eq, high) triple and summed.
        # NOTE(review): presumably these are the numbers of values below /
        # equal to / above the published midpoint - confirm against the
        # device code.
        lcount, ecount, hcount = 0, 0, 0
        for low, eq, high in counts.values():
            lcount += low
            hcount += high
            ecount += eq
        print(low_median, lcount, approx_median, ecount, hcount, high_median)

        # Move the window towards the side with the larger count.
        if lcount < hcount:
            low_median = approx_median
        elif hcount < lcount:
            high_median = approx_median
        print("Median: " + str(approx_median))

        # The midpoint no longer moves: report the result and stop.
        if approx_median == (low_median + high_median) // 2:
            if (ecount == 0) and ((lcount + hcount) % 2 == 0):
                # Parenthesise the division: "%" and "/" share precedence, so
                # without the parentheses the formatted string is divided by 2.0.
                print("Median %f: " % ((low_median + high_median) / 2.0))
            else:
                print("Median %d: " % approx_median)
            break

        # Publish the narrowed window to every device for the next round.
        for device_id in device_ids:
            updateDeviceState(device_id, device_state(low_median, high_median))