diff --git a/.github/workflows/docker-build-kuksa.yml b/.github/workflows/docker-build-kuksa.yml new file mode 100644 index 0000000..7ca0113 --- /dev/null +++ b/.github/workflows/docker-build-kuksa.yml @@ -0,0 +1,59 @@ +name: kuksa + +on: + workflow_dispatch: + push: + paths: + - ".github/workflows/docker-build-kuksa.yml" + - "kuksa/**" + branches: + - "main" + pull_request: + paths: + - ".github/workflows/docker-build-kuksa.yml" + - "kuksa/**" + branches: + - "main" + +env: + REGISTRY: ghcr.io + IMAGE_NAME: ${{ github.repository }}/kuksa + +jobs: + build-kuksa: + name: "Build multi-arch image" + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Set up QEMU + uses: docker/setup-qemu-action@v3 + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + + - name: Login to GitHub Container Registry + uses: docker/login-action@v3 + with: + registry: ghcr.io + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + - name: Docker meta + id: meta + uses: docker/metadata-action@v4 + with: + images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }} + + - name: Build and push + uses: docker/build-push-action@v5 + with: + push: ${{ github.event_name != 'pull_request' }} + context: ./kuksa + file: ./kuksa/Dockerfile + platforms: linux/amd64,linux/arm64,linux/arm/v7 + tags: ${{ steps.meta.outputs.tags }} + labels: ${{ steps.meta.outputs.labels }} + cache-from: type=gha + cache-to: type=gha,mode=max diff --git a/kuksa/.dockerignore b/kuksa/.dockerignore new file mode 100644 index 0000000..ab36462 --- /dev/null +++ b/kuksa/.dockerignore @@ -0,0 +1,2 @@ +deployment.json +__pycache__ \ No newline at end of file diff --git a/kuksa/.gitignore b/kuksa/.gitignore new file mode 100644 index 0000000..ed8ebf5 --- /dev/null +++ b/kuksa/.gitignore @@ -0,0 +1 @@ +__pycache__ \ No newline at end of file diff --git a/kuksa/Dockerfile b/kuksa/Dockerfile new file mode 100644 index 0000000..f3ec0b3 --- /dev/null +++ b/kuksa/Dockerfile @@ -0,0 +1,20 @@ +# Use an official Python runtime as a parent image +FROM python:3.8-slim + +# Set the working directory in the container +WORKDIR /app + +# Copy the current directory contents into the container at /app +COPY . /app + +# Install any needed packages specified in requirements.txt +RUN apt-get -y update \ + && apt-get -y install git \ + && pip install --no-cache-dir -r /app/requirements.txt \ + && apt-get remove -y git \ + && apt-get autoremove -y \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/* + +# Run edge_client.py when the container launches +CMD ["python", "-u" "/app/edge_client.py"] \ No newline at end of file diff --git a/kuksa/README.md b/kuksa/README.md new file mode 100644 index 0000000..87a90ed --- /dev/null +++ b/kuksa/README.md @@ -0,0 +1,125 @@ +![Kanto logo](https://github.com/eclipse-kanto/kanto/raw/main/logo/kanto.svg) + +# Eclipse Kanto - Eclipse Kuksa Integration + +# Introduction + +This is an example application that connects to [Eclipse Kuksa Databroker](https://github.com/eclipse-kuksa/kuksa-databroker) and demonstrates how the COVESA Vehicle Signal Specification(VSS) data could be transformed to a digital twin using Eclipse Kanto. For the application it is transparent which Kanto cloud connectivity options is used(AWS, Azure or Suite). Nevertheless, for the completeness of the next guide steps, an AWS connector is chosen, so the VSS data from a Kuksa Databroker will be presented as an AWS IoT Shadow. + +# Installation + +## Prerequisites +You must have an installed and working instance of: +* Eclipse Kanto Container Management +* Eclipse Kanto AWS Connector that is connected to a Thing in AWS IoT Core + +## Steps +Create container and start the Kuksa Databroker +```shell +kanto-cm create --name=databroker ghcr.io/eclipse-kuksa/kuksa-databroker:0.4.4 --insecure +kanto-cm start --name=databroker +``` + +Create container and start the Kuksa Databroker CLI in a dedicated terminal, where VSS data will be fed at later point +```shell +kanto-cm create --name=cli --i --t --hosts=databroker:container_databroker-host --rp=no ghcr.io/eclipse-kuksa/kuksa-databroker-cli:0.4.4 --server=databroker:55555 +kanto-cm start --i --a --name=cli +``` + +Create container and start the Kuksa Example Application +```shell +kanto-cm create -f ./deployment.json +kanto-cm start --name=vss +``` + +There should be a new device shadow named 'VSS' in your AWS IoT Thing. With the VSS data from the Kuksa Databroker displayed as a shadow state +```json +{ + "state": { + "reported": { + "Vehicle": { + "Length": { + "timestamp": "2024-05-06T12:31:33.732487+00:00", + "value": 0 + }, + "CurbWeight": { + "timestamp": "2024-05-06T12:31:33.732469+00:00", + "value": 0 + }, + "GrossWeight": { + "timestamp": "2024-05-06T12:31:33.732484+00:00", + "value": 0 + }, + "StartTime": { + "timestamp": "2024-05-06T12:31:33.732633+00:00", + "value": "0000-01-01T00:00Z" + }, + "MaxTowWeight": { + "timestamp": "2024-05-06T12:31:33.732493+00:00", + "value": 0 + }, + "MaxTowBallWeight": { + "timestamp": "2024-05-06T12:31:33.732492+00:00", + "value": 0 + }, + "Speed": { + "timestamp": "2024-05-06T12:31:33.732486+00:00", + "value": 0 + }, + "Height": { + "timestamp": "2024-05-06T12:31:33.732485+00:00", + "value": 0 + }, + "Width": { + "timestamp": "2024-05-06T12:31:33.732869+00:00", + "value": 0 + } + } + } + } +} +``` + +You can go back to the Kuksa Databroker CLI terminal and feed new data to the Kuksa Databroker +```shell +feed Vehicle.Speed 120 +feed Vehicle.CurrentLocation.Altitude 640 +feed Vehicle.CurrentLocation.Latitude 43 +feed Vehicle.CurrentLocation.Longitude 25 +``` + +The Kuksa Example Application is subscribed for changes of this VSS data paths and the values are updated in the VSS shadow as well +```json +{ + "state": { + "reported": { + "Vehicle": { + ... + "Speed": { + "timestamp": "2024-05-06T15:11:12.911755+00:00", + "value": 120 + }, + ... + "CurrentLocation": { + "Altitude": 640, + "Latitude": 43, + "Longitude": 25 + } + } + } + } +} +``` + +# Control +The Kuksa Example Application is based on python scripts that allows configuring of connection settings for the local MQTT broker and the Kuksa Databroker and also which VSS data paths to be followed. Allowed arguments and their default values: +| Argument | Type | Default | Description | +| -------- | ---- | ------- | ----------- | +|mqtt_host |string|localhost|MQTT broker host | +|mqtt_port |int |1883 |MQTT broker port | +|mqtt_username |string| |MQTT username | +|mqtt_password |string| |MQTT password | +|kuksa_host |string|localhost|Kuksa Databroker host| +|kuksa_port |int |55555 |Kuksa Databroker port| +|vss_paths |string|Vehicle.CurrentLocation.Altitude,Vehicle.CurrentLocation.Latitude,Vehicle.CurrentLocation.Longitude,Vehicle.Speed| Comma separated VSS data paths to subscribe to | +|log_level |string| info |Logging level, possible values are critical,fatal,error,warn,warning,info,debug,notset \ No newline at end of file diff --git a/kuksa/deployment.json b/kuksa/deployment.json new file mode 100644 index 0000000..46959d1 --- /dev/null +++ b/kuksa/deployment.json @@ -0,0 +1,22 @@ +{ + "container_name": "vss", + "image": { + "name": "ghcr.io/eclipse-kanto/example-applications/kuksa:main" + }, + "config": { + "cmd": [ + "python", + "-u", + "/app/edge_client.py", + "--mqtt_host=ctrhost", + "--kuksa_host=databroker" + ] + }, + "host_config": { + "extra_hosts": [ + "ctrhost:host_ip", + "databroker:container_databroker-host" + ] + } + } + \ No newline at end of file diff --git a/kuksa/edge_client.py b/kuksa/edge_client.py new file mode 100644 index 0000000..e3d572e --- /dev/null +++ b/kuksa/edge_client.py @@ -0,0 +1,149 @@ +# Copyright (c) 2024 Contributors to the Eclipse Foundation +# +# See the NOTICE file(s) distributed with this work for additional +# information regarding copyright ownership. +# +# This program and the accompanying materials are made available under the +# terms of the Eclipse Public License 2.0 which is available at +# https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +# which is available at https://www.apache.org/licenses/LICENSE-2.0. +# +# SPDX-License-IDentifier: EPL-2.0 OR Apache-2.0 + +import argparse +import logging +import signal +import sys + +import paho.mqtt.client as mqtt +from ditto.client import Client +from ditto.model.feature import Feature +from ditto.protocol.things.commands import Command +from kuksa_client import KuksaClientThread + +from edge_device_info import EdgeDeviceInfo +from utils import process_tree, process_signal + +FEATURE_ID_VSS = "VSS" + +EDGE_CLOUD_CONNECTOR_TOPIC_DEV_INFO = "edge/thing/response" +EDGE_CLOUD_CONNECTOR_TOPIC_DEV_INFO_REQUEST = "edge/thing/request" + +VSS_PATHS = "Vehicle.CurrentLocation.Altitude,Vehicle.CurrentLocation.Latitude,Vehicle.CurrentLocation.Longitude,Vehicle.Speed" + +class EdgeClient: + def __init__(self, host, port, paths): + self.mqtt_client = None + self.kuksa_client = KuksaClientThread(config={'ip':host,'protocol': 'grpc', 'port': port, 'insecure': True}) + self.device_info = None + self.ditto_client = None + self.vss_paths = paths + self.log = logging.getLogger('EDGE_CLIENT') + + def on_connect(self, client:mqtt.Client, obj, flags, rc): + self.log.info("Connected with result code - " + str(rc)) + self.mqtt_client = client + # init ditto client + self.ditto_client = Client(paho_client=self.mqtt_client) + self.ditto_client.connect() + # init kuksa client + self.kuksa_client.start() + # trigger initialization + self.mqtt_client.subscribe(EDGE_CLOUD_CONNECTOR_TOPIC_DEV_INFO) + self.mqtt_client.publish(EDGE_CLOUD_CONNECTOR_TOPIC_DEV_INFO_REQUEST, None, 1) + + def on_message(self, client, userdata, msg): + try: + if msg.topic == EDGE_CLOUD_CONNECTOR_TOPIC_DEV_INFO: + if self.device_info is None: + self.device_info = EdgeDeviceInfo() + self.device_info.unmarshal_json(msg.payload) + self.add_vss_feature() + self.subscribe() + else: + self.log.info('Device info already available - discarding message') + return + except Exception as ex: + self.log.error(ex) + + def subscribe(self): + self.log.info(f'Subscribing to VSS data paths - {self.vss_paths}') + self.kuksa_client.subscribeMultiple(self.vss_paths, self.on_kuksa_signal) + + def add_vss_feature(self): + # add the vss feature + feature = Feature + cmd = Command(self.device_info.deviceId).feature(FEATURE_ID_VSS).modify(Feature().to_ditto_dict()) + cmd_envelope = cmd.envelope(response_required=False, content_type="application/json") + self.ditto_client.send(cmd_envelope) + + # add the vss tree as properties + vss_tree = self.kuksa_client.getValue('Vehicle.*') + processed = process_tree(vss_tree) + for key, val in processed.items(): + cmd = Command(self.device_info.deviceId).feature_property(FEATURE_ID_VSS, key.replace('.','/')).modify(val) + cmd_envelope = cmd.envelope(response_required=False, content_type="application/json") + self.ditto_client.send(cmd_envelope) + + def on_kuksa_signal(self, message): + self.log.info(f'Received signal - {message}') + if self.device_info is None: + self.log.info("No device info is initialized to process VSS data") + return + processed = process_signal(message) + # update property + self.log.info(f'Updating VSS properties - {processed}') + for key, val in processed.items(): + cmd = Command(self.device_info.deviceId).feature_property(FEATURE_ID_VSS, key.replace('.','/')).modify(val) + cmd_envelope = cmd.envelope(response_required=False, content_type="application/json") + self.ditto_client.send(cmd_envelope) + + def shutdown(self): + self.kuksa_client.stop() + self.ditto_client.disconnect() + +def parse_args(): + parser = argparse.ArgumentParser(description="Edge Client with configurable MQTT and Kuksa settings", formatter_class=argparse.ArgumentDefaultsHelpFormatter) + parser.add_argument("--mqtt_host", type=str, default="localhost", help="MQTT broker host") + parser.add_argument("--mqtt_port", type=int, default=1883, help="MQTT broker port") + parser.add_argument("--mqtt_username", type=str, default=None, help="MQTT username") + parser.add_argument("--mqtt_password", type=str, default=None, help="MQTT password") + parser.add_argument("--kuksa_host", type=str, default="localhost", help="Kuksa Databroker host") + parser.add_argument("--kuksa_port", type=int, default=55555, help="Kuksa Databroker port") + parser.add_argument("--vss_paths", type=str, default=VSS_PATHS, help="Comma separated VSS data paths to subscribe to") + parser.add_argument("--log_level", type=str, default='info', help="Logging level", choices=list(map(str.lower,logging._nameToLevel.keys()))) + return parser.parse_args() + +if __name__ == "__main__": + args = parse_args() + + # Set logging + logging.basicConfig(level=args.log_level.upper()) + log = logging.getLogger(__name__) + + # Set VSS data paths to subscribe to + args.vss_paths = [s.strip() for s in args.vss_paths.split(",")] + + paho_client = mqtt.Client() + edge_client = EdgeClient(args.kuksa_host, args.kuksa_port, args.vss_paths) + + # Set MQTT username and password if provided + if args.mqtt_username and args.mqtt_password: + self.mqtt_client.username_pw_set(mqtt_username, mqtt_password) + + paho_client.on_connect = edge_client.on_connect + paho_client.on_message = edge_client.on_message + paho_client.connect(args.mqtt_host, args.mqtt_port) + + + def termination_signal_received(signal_number, frame): + log.info("Received termination signal. Shutting down") + edge_client.shutdown() + paho_client.disconnect() + + + signal.signal(signal.SIGINT, termination_signal_received) + signal.signal(signal.SIGQUIT, termination_signal_received) + signal.signal(signal.SIGTERM, termination_signal_received) + log.info('before loop forever') + paho_client.loop_forever() \ No newline at end of file diff --git a/kuksa/edge_device_info.py b/kuksa/edge_device_info.py new file mode 100644 index 0000000..154b301 --- /dev/null +++ b/kuksa/edge_device_info.py @@ -0,0 +1,53 @@ +# Copyright (c) 2024 Contributors to the Eclipse Foundation +# +# See the NOTICE file(s) distributed with this work for additional +# information regarding copyright ownership. +# +# This program and the accompanying materials are made available under the +# terms of the Eclipse Public License 2.0 which is available at +# https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +# which is available at https://www.apache.org/licenses/LICENSE-2.0. +# +# SPDX-License-IDentifier: EPL-2.0 OR Apache-2.0 + +import json +import logging + +from ditto.model.namespaced_id import NamespacedID + +DEVICE_ID_KEY = "deviceId" +TENANT_ID_KEY = "tenantId" +POLICY_ID_KEY = "policyId" + +ALLOWED_KEYS = [DEVICE_ID_KEY, TENANT_ID_KEY, POLICY_ID_KEY] + + +class EdgeDeviceInfo: + def __init__(self, **kwargs): + self.deviceId = None + self.tenantId = None + self.policyId = None + self.log = logging.getLogger('EDGE_DEVICE') + + for k, v in kwargs.items(): + if k in ALLOWED_KEYS: + self.__setattr__(k, v) + + def unmarshal_json(self, data: json): + try: + envelope_dict = json.loads(data) + except json.JSONDecodeError as jex: + return jex + + for k, v in envelope_dict.copy().items(): + if k == DEVICE_ID_KEY: + self.deviceId = NamespacedID().from_string(v) + + if k == POLICY_ID_KEY: + self.policyId = NamespacedID().from_string(v) + + if k == TENANT_ID_KEY: + self.tenantId = v + + self.log.info(f'Device info - {envelope_dict}') + return 0 \ No newline at end of file diff --git a/kuksa/requirements.txt b/kuksa/requirements.txt new file mode 100644 index 0000000..161b15a --- /dev/null +++ b/kuksa/requirements.txt @@ -0,0 +1,3 @@ +paho-mqtt==1.5.1 +kuksa-client==0.4.3 +ditto-client @ git+https://github.com/eclipse-ditto/ditto-clients-python@b215e49b4ceaa7253ffc4b5c1db3968482a631bb \ No newline at end of file diff --git a/kuksa/utils.py b/kuksa/utils.py new file mode 100644 index 0000000..fce5552 --- /dev/null +++ b/kuksa/utils.py @@ -0,0 +1,34 @@ +# Copyright (c) 2024 Contributors to the Eclipse Foundation +# +# See the NOTICE file(s) distributed with this work for additional +# information regarding copyright ownership. +# +# This program and the accompanying materials are made available under the +# terms of the Eclipse Public License 2.0 which is available at +# https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +# which is available at https://www.apache.org/licenses/LICENSE-2.0. +# +# SPDX-License-IDentifier: EPL-2.0 OR Apache-2.0 + +import json + +# returns a key-value pairs where the key is the VSS path and the value is the +# value reported for this path +def process_tree(vss_tree): + res = {} + vss_data = json.loads(vss_tree) + for item in vss_data: + if item.get('value') is not None: + res[item['path']] = item['value'] + return res + + +# returns a key-value pairs where the key is the VSS path and the value is the +# value reported for this path +def process_signal(vss_signal): + res = {} + vss_signal_json = json.loads(vss_signal) + for item in vss_signal_json: + entry = item['entry'] + res[entry['path']] = entry['value']['value'] + return res \ No newline at end of file