From aabb34f3e8716820622999b0baf0485a7ac8a2f3 Mon Sep 17 00:00:00 2001 From: Carter Tinney Date: Thu, 5 Sep 2019 13:49:49 -0700 Subject: [PATCH] Changed Python quickstarts and tutorials to use V2 SDK (#15) * Changed quickstarts to use V2 SDK Edge tutorial compatibility with V2 SDK * fixed message bug --- iot-edge/Tutorials/PythonFilter/main.py | 130 +++++------------- .../Tutorials/PythonFilter/requirements.txt | 2 +- .../simulated-device-2/SimulatedDevice.py | 88 ++++++------ .../simulated-device/SimulatedDevice.py | 34 ++--- 4 files changed, 86 insertions(+), 168 deletions(-) diff --git a/iot-edge/Tutorials/PythonFilter/main.py b/iot-edge/Tutorials/PythonFilter/main.py index a183fbe..6f67986 100644 --- a/iot-edge/Tutorials/PythonFilter/main.py +++ b/iot-edge/Tutorials/PythonFilter/main.py @@ -4,121 +4,57 @@ # For guidance, see https://docs.microsoft.com/azure/iot-edge/tutorial-python-module -import os -import random -import time import sys -import iothub_client -from iothub_client import IoTHubClient, IoTHubClientError, IoTHubTransportProvider -from iothub_client import IoTHubMessage, IoTHubMessageDispositionResult, IoTHubError - -# messageTimeout - the maximum time in milliseconds until a message times out. -# The timeout period starts at IoTHubClient.send_event_async. -# By default, messages do not expire. -MESSAGE_TIMEOUT = 10000 +import time +import threading +from azure.iot.device import IoTHubModuleClient, Message # global counters -RECEIVE_CALLBACKS = 0 -SEND_CALLBACKS = 0 - -# Choose HTTP, AMQP or MQTT as transport protocol. Currently only MQTT is supported. -PROTOCOL = IoTHubTransportProvider.MQTT - -# String containing Hostname, Device Id & Device Key & Module Id in the format: -# "HostName=;DeviceId=;SharedAccessKey=;ModuleId=;GatewayHostName=" -CONNECTION_STRING = "[Device Connection String]" - -# Callback received when the message that we're forwarding is processed. -def send_confirmation_callback(message, result, user_context): - global SEND_CALLBACKS - print ( "Confirmation[%d] received for message with result = %s" % (user_context, result) ) - map_properties = message.properties() - key_value_pair = map_properties.get_internals() - print ( " Properties: %s" % key_value_pair ) - SEND_CALLBACKS += 1 - print ( " Total calls confirmed: %d" % SEND_CALLBACKS ) - - -# receive_message_callback is invoked when an incoming message arrives on the specified -# input queue (in the case of this sample, "input1"). Because this is a filter module, -# we will forward this message onto the "output1" queue. -def receive_message_callback(message, hubManager): - global RECEIVE_CALLBACKS - message_buffer = message.get_bytearray() - size = len(message_buffer) - print ( " Data: <<<%s>>> & Size=%d" % (message_buffer[:size].decode('utf-8'), size) ) - map_properties = message.properties() - key_value_pair = map_properties.get_internals() - print ( " Properties: %s" % key_value_pair ) - RECEIVE_CALLBACKS += 1 - print ( " Total calls received: %d" % RECEIVE_CALLBACKS ) - hubManager.forward_event_to_output("output1", message, 0) - return IoTHubMessageDispositionResult.ACCEPTED - - -class HubManager(object): - - def __init__( - self, - connection_string): - self.client_protocol = PROTOCOL - self.client = IoTHubClient(connection_string, PROTOCOL) - - # set the time until a message times out - self.client.set_option("messageTimeout", MESSAGE_TIMEOUT) - # some embedded platforms need certificate information - self.set_certificates() - - # sets the callback when a message arrives on "input1" queue. Messages sent to - # other inputs or to the default will be silently discarded. - self.client.set_message_callback("input1", receive_message_callback, self) - - def set_certificates(self): - isWindows = sys.platform.lower() in ['windows', 'win32'] - if not isWindows: - CERT_FILE = os.environ['EdgeModuleCACertificateFile'] - print("Adding TrustedCerts from: {0}".format(CERT_FILE)) - - # this brings in x509 privateKey and certificate - file = open(CERT_FILE) - try: - self.client.set_option("TrustedCerts", file.read()) - print ( "set_option TrustedCerts successful" ) - except IoTHubClientError as iothub_client_error: - print ( "set_option TrustedCerts failed (%s)" % iothub_client_error ) - - file.close() - - # Forwards the message received onto the next stage in the process. - def forward_event_to_output(self, outputQueueName, event, send_context): - self.client.send_event_async( - outputQueueName, event, send_confirmation_callback, send_context) - -def main(connection_string): +RECEIVED_MESSAGES = 0 + +def receive_message_listener(client): + # This listener function only triggers for messages sent to "input1". + # Messages sent to other inputs or to the default will be silently discarded. + global RECEIVED_MESSAGES + while True: + message = client.receive_message_on_input("input1") # blocking call + RECEIVED_MESSAGES += 1 + print("Message received on input1") + print( " Data: <<{}>>".format(message.data) ) + print( " Properties: {}".format(message.custom_properties)) + print( " Total calls received: {}".format(RECEIVED_MESSAGES)) + print("Forwarding message to output1") + client.send_message_to_output(message, "output1") + print("Message successfully forwarded") + +def main(): try: - print ( "\nPython %s\n" % sys.version ) + print ( "\nPython {}\n".format(sys.version) ) print ( "IoT Hub Client for Python" ) - hub_manager = HubManager(connection_string) + client = IoTHubModuleClient.create_from_edge_environment() + + # Begin listening for messages + message_listener_thread = threading.Thread(target=receive_message_listener, args=(client,)) + message_listener_thread.daemon = True + message_listener_thread.start() - print ( "Starting the IoT Hub Python sample using protocol %s..." % hub_manager.client_protocol ) + print ( "Starting the IoT Hub Python sample...") print ( "The sample is now waiting for messages and will indefinitely. Press Ctrl-C to exit. ") while True: time.sleep(1000) - except IoTHubError as iothub_error: - print ( "Unexpected error %s from IoTHub" % iothub_error ) - return except KeyboardInterrupt: print ( "IoTHubClient sample stopped" ) + except: + print ( "Unexpected error from IoTHub" ) + return if __name__ == '__main__': try: - CONNECTION_STRING = os.environ['EdgeHubConnectionString'] + main() except Exception as error: print ( error ) sys.exit(1) - - main(CONNECTION_STRING) \ No newline at end of file diff --git a/iot-edge/Tutorials/PythonFilter/requirements.txt b/iot-edge/Tutorials/PythonFilter/requirements.txt index 79b971b..4d0e7a5 100644 --- a/iot-edge/Tutorials/PythonFilter/requirements.txt +++ b/iot-edge/Tutorials/PythonFilter/requirements.txt @@ -1 +1 @@ -azure-iothub-device-client==1.3.0.0b0 \ No newline at end of file +azure-iot-device>=2.0.0rc10 \ No newline at end of file diff --git a/iot-hub/Quickstarts/simulated-device-2/SimulatedDevice.py b/iot-hub/Quickstarts/simulated-device-2/SimulatedDevice.py index bbc0423..5a0d94c 100644 --- a/iot-hub/Quickstarts/simulated-device-2/SimulatedDevice.py +++ b/iot-hub/Quickstarts/simulated-device-2/SimulatedDevice.py @@ -3,60 +3,58 @@ import random import time -import sys +import threading # Using the Python Device SDK for IoT Hub: # https://github.com/Azure/azure-iot-sdk-python # The sample connects to a device-specific MQTT endpoint on your IoT Hub. -import iothub_client -# pylint: disable=E0611 -from iothub_client import IoTHubClient, IoTHubClientError, IoTHubTransportProvider, IoTHubClientResult -from iothub_client import IoTHubMessage, IoTHubMessageDispositionResult, IoTHubError, DeviceMethodReturnValue +from azure.iot.device import IoTHubDeviceClient, Message, MethodResponse # The device connection string to authenticate the device with your IoT hub. # Using the Azure CLI: # az iot hub device-identity show-connection-string --hub-name {YourIoTHubName} --device-id MyNodeDevice --output table CONNECTION_STRING = "{Your IoT hub device connection string}" -# Using the MQTT protocol. -PROTOCOL = IoTHubTransportProvider.MQTT -MESSAGE_TIMEOUT = 10000 - # Define the JSON message to send to IoT Hub. TEMPERATURE = 20.0 HUMIDITY = 60 -MSG_TXT = "{\"temperature\": %.2f,\"humidity\": %.2f}" +MSG_TXT = '{{"temperature": {temperature},"humidity": {humidity}}}' INTERVAL = 1 -def send_confirmation_callback(message, result, user_context): - print ( "IoT Hub responded to message with status: %s" % (result) ) - def iothub_client_init(): # Create an IoT Hub client - client = IoTHubClient(CONNECTION_STRING, PROTOCOL) + client = IoTHubDeviceClient.create_from_connection_string(CONNECTION_STRING) return client -# Handle direct method calls from IoT Hub -def device_method_callback(method_name, payload, user_context): + +def device_method_listener(device_client): global INTERVAL - print ( "\nMethod callback called with:\nmethodName = %s\npayload = %s" % (method_name, payload) ) - device_method_return_value = DeviceMethodReturnValue() - if method_name == "SetTelemetryInterval": - try: - INTERVAL = int(payload) - # Build and send the acknowledgment. - device_method_return_value.response = "{ \"Response\": \"Executed direct method %s\" }" % method_name - device_method_return_value.status = 200 - except ValueError: - # Build and send an error response. - device_method_return_value.response = "{ \"Response\": \"Invalid parameter\" }" - device_method_return_value.status = 400 - else: - # Build and send an error response. - device_method_return_value.response = "{ \"Response\": \"Direct method not defined: %s\" }" % method_name - device_method_return_value.status = 404 - return device_method_return_value + while True: + method_request = device_client.receive_method_request() + print ( + "\nMethod callback called with:\nmethodName = {method_name}\npayload = {payload}".format( + method_name=method_request.name, + payload=method_request.payload + ) + ) + if method_request.name == "SetTelemetryInterval": + try: + INTERVAL = int(method_request.payload) + except ValueError: + response_payload = {"Response": "Invalid parameter"} + response_status = 400 + else: + response_payload = {"Response": "Executed direct method {}".format(method_request.name)} + response_status = 200 + else: + response_payload = {"Response": "Direct method {} not defined".format(method_request.name)} + response_status = 404 + + method_response = MethodResponse(method_request.request_id, response_status, payload=response_payload) + device_client.send_method_response(method_response) + + def iothub_client_telemetry_sample_run(): @@ -64,33 +62,31 @@ def iothub_client_telemetry_sample_run(): client = iothub_client_init() print ( "IoT Hub device sending periodic messages, press Ctrl-C to exit" ) - # Set up the callback method for direct method calls from the hub. - client.set_device_method_callback( - device_method_callback, None) + # Start a thread to listen + device_method_thread = threading.Thread(target=device_method_listener, args=(client,)) + device_method_thread.daemon = True + device_method_thread.start() while True: # Build the message with simulated telemetry values. temperature = TEMPERATURE + (random.random() * 15) humidity = HUMIDITY + (random.random() * 20) - msg_txt_formatted = MSG_TXT % (temperature, humidity) - message = IoTHubMessage(msg_txt_formatted) + msg_txt_formatted = MSG_TXT.format(temperature=temperature, humidity=humidity) + message = Message(msg_txt_formatted) # Add a custom application property to the message. # An IoT hub can filter on these properties without access to the message body. - prop_map = message.properties() if temperature > 30: - prop_map.add("temperatureAlert", "true") + message.custom_properties["temperatureAlert"] = "true" else: - prop_map.add("temperatureAlert", "false") + message.custom_properties["temperatureAlert"] = "false" # Send the message. - print( "Sending message: %s" % message.get_string() ) - client.send_event_async(message, send_confirmation_callback, None) + print( "Sending message: {}".format(message) ) + client.send_message(message) + print( "Message sent" ) time.sleep(INTERVAL) - except IoTHubError as iothub_error: - print ( "Unexpected error %s from IoTHub" % iothub_error ) - return except KeyboardInterrupt: print ( "IoTHubClient sample stopped" ) diff --git a/iot-hub/Quickstarts/simulated-device/SimulatedDevice.py b/iot-hub/Quickstarts/simulated-device/SimulatedDevice.py index db8c9fc..8a97811 100644 --- a/iot-hub/Quickstarts/simulated-device/SimulatedDevice.py +++ b/iot-hub/Quickstarts/simulated-device/SimulatedDevice.py @@ -3,36 +3,25 @@ import random import time -import sys # Using the Python Device SDK for IoT Hub: # https://github.com/Azure/azure-iot-sdk-python # The sample connects to a device-specific MQTT endpoint on your IoT Hub. -import iothub_client -# pylint: disable=E0611 -from iothub_client import IoTHubClient, IoTHubClientError, IoTHubTransportProvider, IoTHubClientResult -from iothub_client import IoTHubMessage, IoTHubMessageDispositionResult, IoTHubError, DeviceMethodReturnValue +from azure.iot.device import IoTHubDeviceClient, Message # The device connection string to authenticate the device with your IoT hub. # Using the Azure CLI: # az iot hub device-identity show-connection-string --hub-name {YourIoTHubName} --device-id MyNodeDevice --output table CONNECTION_STRING = "{Your IoT hub device connection string}" -# Using the MQTT protocol. -PROTOCOL = IoTHubTransportProvider.MQTT -MESSAGE_TIMEOUT = 10000 - # Define the JSON message to send to IoT Hub. TEMPERATURE = 20.0 HUMIDITY = 60 -MSG_TXT = "{\"temperature\": %.2f,\"humidity\": %.2f}" - -def send_confirmation_callback(message, result, user_context): - print ( "IoT Hub responded to message with status: %s" % (result) ) +MSG_TXT = '{{"temperature": {temperature},"humidity": {humidity}}}' def iothub_client_init(): # Create an IoT Hub client - client = IoTHubClient(CONNECTION_STRING, PROTOCOL) + client = IoTHubDeviceClient.create_from_connection_string(CONNECTION_STRING) return client def iothub_client_telemetry_sample_run(): @@ -45,25 +34,22 @@ def iothub_client_telemetry_sample_run(): # Build the message with simulated telemetry values. temperature = TEMPERATURE + (random.random() * 15) humidity = HUMIDITY + (random.random() * 20) - msg_txt_formatted = MSG_TXT % (temperature, humidity) - message = IoTHubMessage(msg_txt_formatted) + msg_txt_formatted = MSG_TXT.format(temperature=temperature, humidity=humidity) + message = Message(msg_txt_formatted) # Add a custom application property to the message. # An IoT hub can filter on these properties without access to the message body. - prop_map = message.properties() if temperature > 30: - prop_map.add("temperatureAlert", "true") + message.custom_properties["temperatureAlert"] = "true" else: - prop_map.add("temperatureAlert", "false") + message.custom_properties["temperatureAlert"] = "false" # Send the message. - print( "Sending message: %s" % message.get_string() ) - client.send_event_async(message, send_confirmation_callback, None) + print( "Sending message: {}".format(message) ) + client.send_message(message) + print ( "Message successfully sent" ) time.sleep(1) - except IoTHubError as iothub_error: - print ( "Unexpected error %s from IoTHub" % iothub_error ) - return except KeyboardInterrupt: print ( "IoTHubClient sample stopped" )