diff --git a/.github/workflows/check-license.yaml b/.github/workflows/check-license.yaml index 7b97d2b86b..2977dd9b1e 100644 --- a/.github/workflows/check-license.yaml +++ b/.github/workflows/check-license.yaml @@ -21,7 +21,7 @@ jobs: - name: Check License Header run: | CHECK="" - for file in $(grep -rl --exclude-dir={.git,build,**vernemq**} \ + for file in $(grep -rl --exclude-dir={.git,build} \ --exclude=\*.{crt,key,pem,zed,hcl,md,json,csv,mod,sum,tmpl,args} \ --exclude={CODEOWNERS,LICENSE,MAINTAINERS} \ .); do diff --git a/cmd/mqtt/main.go b/cmd/mqtt/main.go index 32bd0353c7..0e713513b4 100644 --- a/cmd/mqtt/main.go +++ b/cmd/mqtt/main.go @@ -26,7 +26,6 @@ import ( smqlog "github.com/absmach/supermq/logger" "github.com/absmach/supermq/mqtt" "github.com/absmach/supermq/mqtt/events" - mqtttracing "github.com/absmach/supermq/mqtt/tracing" "github.com/absmach/supermq/pkg/errors" "github.com/absmach/supermq/pkg/grpcclient" jaegerclient "github.com/absmach/supermq/pkg/jaeger" @@ -34,7 +33,6 @@ import ( brokerstracing "github.com/absmach/supermq/pkg/messaging/brokers/tracing" msgevents "github.com/absmach/supermq/pkg/messaging/events" "github.com/absmach/supermq/pkg/messaging/handler" - mqttpub "github.com/absmach/supermq/pkg/messaging/mqtt" "github.com/absmach/supermq/pkg/server" "github.com/absmach/supermq/pkg/uuid" "github.com/caarlos0/env/v11" @@ -50,24 +48,21 @@ const ( ) type config struct { - LogLevel string `env:"SMQ_MQTT_ADAPTER_LOG_LEVEL" envDefault:"info"` - MQTTPort string `env:"SMQ_MQTT_ADAPTER_MQTT_PORT" envDefault:"1883"` - MQTTTargetHost string `env:"SMQ_MQTT_ADAPTER_MQTT_TARGET_HOST" envDefault:"localhost"` - MQTTTargetPort string `env:"SMQ_MQTT_ADAPTER_MQTT_TARGET_PORT" envDefault:"1883"` - MQTTForwarderTimeout time.Duration `env:"SMQ_MQTT_ADAPTER_FORWARDER_TIMEOUT" envDefault:"30s"` - MQTTTargetHealthCheck string `env:"SMQ_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK" envDefault:""` - MQTTQoS uint8 `env:"SMQ_MQTT_ADAPTER_MQTT_QOS" envDefault:"1"` - HTTPPort string `env:"SMQ_MQTT_ADAPTER_WS_PORT" envDefault:"8080"` - HTTPTargetHost string `env:"SMQ_MQTT_ADAPTER_WS_TARGET_HOST" envDefault:"localhost"` - HTTPTargetPort string `env:"SMQ_MQTT_ADAPTER_WS_TARGET_PORT" envDefault:"8080"` - HTTPTargetPath string `env:"SMQ_MQTT_ADAPTER_WS_TARGET_PATH" envDefault:"/mqtt"` - Instance string `env:"SMQ_MQTT_ADAPTER_INSTANCE" envDefault:""` - JaegerURL url.URL `env:"SMQ_JAEGER_URL" envDefault:"http://localhost:4318/v1/traces"` - BrokerURL string `env:"SMQ_MESSAGE_BROKER_URL" envDefault:"nats://localhost:4222"` - SendTelemetry bool `env:"SMQ_SEND_TELEMETRY" envDefault:"true"` - InstanceID string `env:"SMQ_MQTT_ADAPTER_INSTANCE_ID" envDefault:""` - ESURL string `env:"SMQ_ES_URL" envDefault:"nats://localhost:4222"` - TraceRatio float64 `env:"SMQ_JAEGER_TRACE_RATIO" envDefault:"1.0"` + LogLevel string `env:"SMQ_MQTT_ADAPTER_LOG_LEVEL" envDefault:"info"` + MQTTPort string `env:"SMQ_MQTT_ADAPTER_MQTT_PORT" envDefault:"1883"` + MQTTTargetHost string `env:"SMQ_MQTT_ADAPTER_MQTT_TARGET_HOST" envDefault:"localhost"` + MQTTTargetPort string `env:"SMQ_MQTT_ADAPTER_MQTT_TARGET_PORT" envDefault:"1883"` + MQTTTargetHealthCheck string `env:"SMQ_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK" envDefault:""` + HTTPPort string `env:"SMQ_MQTT_ADAPTER_WS_PORT" envDefault:"8080"` + HTTPTargetHost string `env:"SMQ_MQTT_ADAPTER_WS_TARGET_HOST" envDefault:"localhost"` + HTTPTargetPort string `env:"SMQ_MQTT_ADAPTER_WS_TARGET_PORT" envDefault:"8080"` + Instance string `env:"SMQ_MQTT_ADAPTER_INSTANCE" envDefault:""` + JaegerURL url.URL `env:"SMQ_JAEGER_URL" envDefault:"http://localhost:4318/v1/traces"` + BrokerURL string `env:"SMQ_MESSAGE_BROKER_URL" envDefault:"nats://localhost:4222"` + SendTelemetry bool `env:"SMQ_SEND_TELEMETRY" envDefault:"true"` + InstanceID string `env:"SMQ_MQTT_ADAPTER_INSTANCE_ID" envDefault:""` + ESURL string `env:"SMQ_ES_URL" envDefault:"nats://localhost:4222"` + TraceRatio float64 `env:"SMQ_JAEGER_TRACE_RATIO" envDefault:"1.0"` } func main() { @@ -126,38 +121,6 @@ func main() { }() tracer := tp.Tracer(svcName) - bsub, err := brokers.NewPubSub(ctx, cfg.BrokerURL, logger) - if err != nil { - logger.Error(fmt.Sprintf("failed to connect to message broker: %s", err)) - exitCode = 1 - return - } - defer bsub.Close() - bsub = brokerstracing.NewPubSub(serverConfig, tracer, bsub) - - mpub, err := mqttpub.NewPublisher(fmt.Sprintf("mqtt://%s:%s", cfg.MQTTTargetHost, cfg.MQTTTargetPort), cfg.MQTTQoS, cfg.MQTTForwarderTimeout) - if err != nil { - logger.Error(fmt.Sprintf("failed to create MQTT publisher: %s", err)) - exitCode = 1 - return - } - defer mpub.Close() - - mpub, err = msgevents.NewPublisherMiddleware(ctx, mpub, cfg.ESURL) - if err != nil { - logger.Error(fmt.Sprintf("failed to create event store middleware: %s", err)) - exitCode = 1 - return - } - - fwd := mqtt.NewForwarder(brokers.SubjectAllChannels, logger) - fwd = mqtttracing.New(serverConfig, tracer, fwd, brokers.SubjectAllChannels) - if err := fwd.Forward(ctx, svcName, bsub, mpub); err != nil { - logger.Error(fmt.Sprintf("failed to forward message broker messages: %s", err)) - exitCode = 1 - return - } - np, err := brokers.NewPublisher(ctx, cfg.BrokerURL) if err != nil { logger.Error(fmt.Sprintf("failed to connect to message broker: %s", err)) diff --git a/docker/.env b/docker/.env index 989bad6e05..b4e293c1d5 100644 --- a/docker/.env +++ b/docker/.env @@ -20,7 +20,6 @@ SMQ_NATS_URL=nats://nats:${SMQ_NATS_PORT} # Configs for nats as MQTT broker SMQ_NATS_HEALTH_CHECK=http://nats:${SMQ_NATS_HTTP_PORT}/healthz SMQ_NATS_WS_TARGET_PATH= -SMQ_NATS_MQTT_QOS=1 ## RabbitMQ SMQ_RABBITMQ_PORT=5672 @@ -35,23 +34,14 @@ SMQ_RABBITMQ_URL=amqp://${SMQ_RABBITMQ_USER}:${SMQ_RABBITMQ_PASS}@rabbitmq:${SMQ SMQ_MESSAGE_BROKER_TYPE=nats SMQ_MESSAGE_BROKER_URL=${SMQ_NATS_URL} -## VERNEMQ -SMQ_DOCKER_VERNEMQ_ALLOW_ANONYMOUS=on -SMQ_DOCKER_VERNEMQ_LOG__CONSOLE__LEVEL=error -SMQ_VERNEMQ_HEALTH_CHECK=http://vernemq:8888/health -SMQ_VERNEMQ_WS_TARGET_PATH=/mqtt -SMQ_VERNEMQ_MQTT_QOS=2 - ## MQTT Broker -SMQ_MQTT_BROKER_TYPE=vernemq -SMQ_MQTT_BROKER_HEALTH_CHECK=${SMQ_VERNEMQ_HEALTH_CHECK} -SMQ_MQTT_ADAPTER_MQTT_QOS=${SMQ_VERNEMQ_MQTT_QOS} +SMQ_MQTT_BROKER_TYPE=nats +SMQ_MQTT_BROKER_HEALTH_CHECK=${SMQ_NATS_HEALTH_CHECK} SMQ_MQTT_ADAPTER_MQTT_TARGET_HOST=${SMQ_MQTT_BROKER_TYPE} SMQ_MQTT_ADAPTER_MQTT_TARGET_PORT=1883 SMQ_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK=${SMQ_MQTT_BROKER_HEALTH_CHECK} SMQ_MQTT_ADAPTER_WS_TARGET_HOST=${SMQ_MQTT_BROKER_TYPE} SMQ_MQTT_ADAPTER_WS_TARGET_PORT=8080 -SMQ_MQTT_ADAPTER_WS_TARGET_PATH=${SMQ_VERNEMQ_WS_TARGET_PATH} ## Redis SMQ_REDIS_TCP_PORT=6379 @@ -70,7 +60,7 @@ SMQ_JAEGER_TRACE_RATIO=1.0 SMQ_JAEGER_MEMORY_MAX_TRACES=5000 ## Call home -SMQ_SEND_TELEMETRY=true +SMQ_SEND_TELEMETRY=false ## Postgres SMQ_POSTGRES_MAX_CONNECTIONS=100 @@ -310,7 +300,7 @@ SMQ_CLIENTS_INSTANCE_ID= #### Clients Client Config SMQ_CLIENTS_URL=http://clients:9006 SMQ_CLIENTS_AUTH_GRPC_URL=clients:7006 -SMQ_CLIENTS_AUTH_GRPC_TIMEOUT=1s +SMQ_CLIENTS_AUTH_GRPC_TIMEOUT=300s SMQ_CLIENTS_AUTH_GRPC_CLIENT_CERT=${GRPC_MTLS:+./ssl/certs/clients-grpc-client.crt} SMQ_CLIENTS_AUTH_GRPC_CLIENT_KEY=${GRPC_MTLS:+./ssl/certs/clients-grpc-client.key} SMQ_CLIENTS_AUTH_GRPC_CLIENT_CA_CERTS=${GRPC_MTLS:+./ssl/certs/ca.crt} @@ -338,7 +328,7 @@ SMQ_CHANNELS_INSTANCE_ID= #### Channels Client Config SMQ_CHANNELS_URL=http://channels:9005 SMQ_CHANNELS_GRPC_URL=channels:7005 -SMQ_CHANNELS_GRPC_TIMEOUT=1s +SMQ_CHANNELS_GRPC_TIMEOUT=300s SMQ_CHANNELS_GRPC_CLIENT_CERT=${GRPC_MTLS:+./ssl/certs/channels-grpc-client.crt} SMQ_CHANNELS_GRPC_CLIENT_KEY=${GRPC_MTLS:+./ssl/certs/channels-grpc-client.key} SMQ_CHANNELS_GRPC_CLIENT_CA_CERTS=${GRPC_MTLS:+./ssl/certs/ca.crt} @@ -354,7 +344,6 @@ SMQ_HTTP_ADAPTER_INSTANCE_ID= ### MQTT SMQ_MQTT_ADAPTER_LOG_LEVEL=debug SMQ_MQTT_ADAPTER_MQTT_PORT=1883 -SMQ_MQTT_ADAPTER_FORWARDER_TIMEOUT=30s SMQ_MQTT_ADAPTER_WS_PORT=8080 SMQ_MQTT_ADAPTER_INSTANCE= SMQ_MQTT_ADAPTER_INSTANCE_ID= diff --git a/docker/README.md b/docker/README.md index 938cab6f1d..34d8148c5e 100644 --- a/docker/README.md +++ b/docker/README.md @@ -26,25 +26,21 @@ To pull docker images from a specific release you need to change the value of `S SuperMQ supports configurable MQTT broker and Message broker, which also acts as an events store. SuperMQ uses two types of brokers: -1. MQTT_BROKER: Handles MQTT communication between MQTT adapters and message broker. This can either be 'VerneMQ' or 'NATS'. +1. MQTT_BROKER: Handles MQTT communication between MQTT adapters and message broker. This is NATS. 2. MESSAGE_BROKER: Manages message exchange between SuperMQ core, optional, and external services. This can either be 'NATS' or 'RabbitMQ'. This is used to store messages for distributed processing. Events store: This is used by SuperMQ services to store events for distributed processing. SuperMQ uses a single service to be the message broker and events store. This can either be 'NATS' or 'RabbitMQ'. Redis can also be used as an events store, but it requires a message broker to be deployed along with it for message exchange. -This is the same as MESSAGE_BROKER. This can either be 'NATS' or 'RabbitMQ' or 'Redis'. If Redis is used as an events store, then RabbitMQ or NATS is used as a message broker. +This is the same as MESSAGE_BROKER. This can either be 'NATS' or 'RabbitMQ' or 'Redis'. If Redis is used as an events store, then RabbitMQ or NATS is used as a message broker. -The current deployment strategy for SuperMQ in `docker/docker-compose.yml` is to use VerneMQ as a MQTT_BROKER and NATS as a MESSAGE_BROKER and EVENTS_STORE. +The current deployment strategy for SuperMQ in `docker/docker-compose.yml` is to use NATS as a MQTT_BROKER, MESSAGE_BROKER and EVENTS_STORE. Therefore, the following combinations are possible: -- MQTT_BROKER: VerneMQ, MESSAGE_BROKER: NATS, EVENTS_STORE: NATS -- MQTT_BROKER: VerneMQ, MESSAGE_BROKER: NATS, EVENTS_STORE: Redis -- MQTT_BROKER: VerneMQ, MESSAGE_BROKER: RabbitMQ, EVENTS_STORE: RabbitMQ -- MQTT_BROKER: VerneMQ, MESSAGE_BROKER: RabbitMQ, EVENTS_STORE: Redis -- MQTT_BROKER: NATS, MESSAGE_BROKER: RabbitMQ, EVENTS_STORE: RabbitMQ -- MQTT_BROKER: NATS, MESSAGE_BROKER: RabbitMQ, EVENTS_STORE: Redis -- MQTT_BROKER: NATS, MESSAGE_BROKER: NATS, EVENTS_STORE: NATS -- MQTT_BROKER: NATS, MESSAGE_BROKER: NATS, EVENTS_STORE: Redis +- MESSAGE_BROKER: RabbitMQ, EVENTS_STORE: RabbitMQ +- MESSAGE_BROKER: RabbitMQ, EVENTS_STORE: Redis +- MESSAGE_BROKER: NATS, EVENTS_STORE: NATS +- MESSAGE_BROKER: NATS, EVENTS_STORE: Redis For Message brokers other than NATS, you would need to build the docker images with RabbitMQ as the build tag and change the `docker/.env`. For example, to use RabbitMQ as a message broker: @@ -70,20 +66,6 @@ SMQ_ES_TYPE=redis SMQ_ES_URL=${SMQ_REDIS_URL} ``` -For MQTT broker other than VerneMQ, you would need to change the `docker/.env`. For example, to use NATS as a MQTT broker: - -```env -SMQ_MQTT_BROKER_TYPE=nats -SMQ_MQTT_BROKER_HEALTH_CHECK=${SMQ_NATS_HEALTH_CHECK} -SMQ_MQTT_ADAPTER_MQTT_QOS=${SMQ_NATS_MQTT_QOS} -SMQ_MQTT_ADAPTER_MQTT_TARGET_HOST=${SMQ_MQTT_BROKER_TYPE} -SMQ_MQTT_ADAPTER_MQTT_TARGET_PORT=1883 -SMQ_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK=${SMQ_MQTT_BROKER_HEALTH_CHECK} -SMQ_MQTT_ADAPTER_WS_TARGET_HOST=${SMQ_MQTT_BROKER_TYPE} -SMQ_MQTT_ADAPTER_WS_TARGET_PORT=8080 -SMQ_MQTT_ADAPTER_WS_TARGET_PATH=${SMQ_NATS_WS_TARGET_PATH} -``` - ### RabbitMQ configuration ```yaml @@ -125,9 +107,9 @@ By using environment variables file at `docker/.env` you can modify the below gi `SMQ_NGINX_SERVER_NAME` environmental variable is used to configure nginx directive `server_name`. If environmental variable `SMQ_NGINX_SERVER_NAME` is empty then default value `localhost` will set to `server_name`. -`SMQ_NGINX_SERVER_CERT` environmental variable is used to configure nginx directive `ssl_certificate`. If environmental variable `SMQ_NGINX_SERVER_CERT` is empty then by default server certificate in the path `docker/ssl/certs/supermq-server.crt` will be assigned. +`SMQ_NGINX_SERVER_CERT` environmental variable is used to configure nginx directive `ssl_certificate`. If environmental variable `SMQ_NGINX_SERVER_CERT` is empty then by default server certificate in the path `docker/ssl/certs/supermq-server.crt` will be assigned. -`SMQ_NGINX_SERVER_KEY` environmental variable is used to configure nginx directive `ssl_certificate_key`. If environmental variable `SMQ_NGINX_SERVER_KEY` is empty then by default server certificate key in the path `docker/ssl/certs/supermq-server.key` will be assigned. +`SMQ_NGINX_SERVER_KEY` environmental variable is used to configure nginx directive `ssl_certificate_key`. If environmental variable `SMQ_NGINX_SERVER_KEY` is empty then by default server certificate key in the path `docker/ssl/certs/supermq-server.key` will be assigned. `SMQ_NGINX_SERVER_CLIENT_CA` environmental variable is used to configure nginx directive `ssl_client_certificate`. If environmental variable `SMQ_NGINX_SERVER_CLIENT_CA` is empty then by default certificate in the path `docker/ssl/certs/ca.crt` will be assigned. diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index ae702bebea..1f527d735c 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -14,7 +14,6 @@ volumes: supermq-channels-db-volume: supermq-clients-redis-volume: supermq-broker-volume: - supermq-mqtt-broker-volume: supermq-spicedb-db-volume: supermq-auth-db-volume: supermq-pat-db-volume: @@ -964,7 +963,6 @@ services: container_name: supermq-mqtt depends_on: - clients - - vernemq - nats restart: on-failure environment: @@ -972,14 +970,11 @@ services: SMQ_MQTT_ADAPTER_MQTT_PORT: ${SMQ_MQTT_ADAPTER_MQTT_PORT} SMQ_MQTT_ADAPTER_MQTT_TARGET_HOST: ${SMQ_MQTT_ADAPTER_MQTT_TARGET_HOST} SMQ_MQTT_ADAPTER_MQTT_TARGET_PORT: ${SMQ_MQTT_ADAPTER_MQTT_TARGET_PORT} - SMQ_MQTT_ADAPTER_FORWARDER_TIMEOUT: ${SMQ_MQTT_ADAPTER_FORWARDER_TIMEOUT} SMQ_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK: ${SMQ_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK} - SMQ_MQTT_ADAPTER_MQTT_QOS: ${SMQ_MQTT_ADAPTER_MQTT_QOS} SMQ_MQTT_ADAPTER_WS_PORT: ${SMQ_MQTT_ADAPTER_WS_PORT} SMQ_MQTT_ADAPTER_INSTANCE_ID: ${SMQ_MQTT_ADAPTER_INSTANCE_ID} SMQ_MQTT_ADAPTER_WS_TARGET_HOST: ${SMQ_MQTT_ADAPTER_WS_TARGET_HOST} SMQ_MQTT_ADAPTER_WS_TARGET_PORT: ${SMQ_MQTT_ADAPTER_WS_TARGET_PORT} - SMQ_MQTT_ADAPTER_WS_TARGET_PATH: ${SMQ_MQTT_ADAPTER_WS_TARGET_PATH} SMQ_MQTT_ADAPTER_INSTANCE: ${SMQ_MQTT_ADAPTER_INSTANCE} SMQ_ES_URL: ${SMQ_ES_URL} SMQ_CLIENTS_AUTH_GRPC_URL: ${SMQ_CLIENTS_AUTH_GRPC_URL} @@ -1285,18 +1280,6 @@ services: bind: create_host_path: true - vernemq: - image: supermq/vernemq:${SMQ_RELEASE_TAG} - container_name: supermq-vernemq - restart: on-failure - environment: - DOCKER_VERNEMQ_ALLOW_ANONYMOUS: ${SMQ_DOCKER_VERNEMQ_ALLOW_ANONYMOUS} - DOCKER_VERNEMQ_LOG__CONSOLE__LEVEL: ${SMQ_DOCKER_VERNEMQ_LOG__CONSOLE__LEVEL} - networks: - - supermq-base-net - volumes: - - supermq-mqtt-broker-volume:/var/lib/vernemq - nats: image: nats:2.10.9-alpine container_name: supermq-nats diff --git a/docker/vernemq/Dockerfile b/docker/vernemq/Dockerfile deleted file mode 100644 index 76152b1f51..0000000000 --- a/docker/vernemq/Dockerfile +++ /dev/null @@ -1,56 +0,0 @@ -# Copyright (c) Abstract Machines -# SPDX-License-Identifier: Apache-2.0 - -# Builder -FROM erlang:25.3.2.8-alpine AS builder -RUN apk add --update git build-base bsd-compat-headers openssl-dev snappy-dev curl \ - && git clone -b 1.13.0 https://github.com/vernemq/vernemq \ - && cd vernemq \ - && make -j 16 rel - -# Executor -FROM alpine:3.19 - -COPY --from=builder /vernemq/_build/default/rel / - -RUN apk --no-cache --update --available upgrade && \ - apk add --no-cache ncurses-libs openssl libstdc++ jq curl bash snappy-dev && \ - addgroup --gid 10000 vernemq && \ - adduser --uid 10000 -H -D -G vernemq -h /vernemq vernemq && \ - install -d -o vernemq -g vernemq /vernemq - -# Defaults -ENV DOCKER_VERNEMQ_KUBERNETES_LABEL_SELECTOR="app=vernemq" \ - DOCKER_VERNEMQ_LOG__CONSOLE=console \ - PATH="/vernemq/bin:$PATH" \ - VERNEMQ_VERSION="1.13.0" - -WORKDIR /vernemq - -COPY --chown=10000:10000 bin/vernemq.sh /usr/sbin/start_vernemq -COPY --chown=10000:10000 files/vm.args /vernemq/etc/vm.args - -RUN chown -R 10000:10000 /vernemq && \ - ln -s /vernemq/etc /etc/vernemq && \ - ln -s /vernemq/data /var/lib/vernemq && \ - ln -s /vernemq/log /var/log/vernemq - -# Ports -# 1883 MQTT -# 8883 MQTT/SSL -# 8080 MQTT WebSockets -# 44053 VerneMQ Message Distribution -# 4369 EPMD - Erlang Port Mapper Daemon -# 8888 Health, API, Prometheus Metrics -# 9100 9101 9102 9103 9104 9105 9106 9107 9108 9109 Specific Distributed Erlang Port Range - -EXPOSE 1883 8883 8080 44053 4369 8888 \ - 9100 9101 9102 9103 9104 9105 9106 9107 9108 9109 - - -VOLUME ["/vernemq/log", "/vernemq/data", "/vernemq/etc"] - -HEALTHCHECK CMD vernemq ping | grep -q pong - -USER vernemq -CMD ["start_vernemq"] \ No newline at end of file diff --git a/docker/vernemq/bin/vernemq.sh b/docker/vernemq/bin/vernemq.sh deleted file mode 100755 index 4c990dafd1..0000000000 --- a/docker/vernemq/bin/vernemq.sh +++ /dev/null @@ -1,352 +0,0 @@ -#!/usr/bin/env sh - -NET_INTERFACE=$(route | grep '^default' | grep -o '[^ ]*$') -NET_INTERFACE=${DOCKER_NET_INTERFACE:-${NET_INTERFACE}} -IP_ADDRESS=$(ip -4 addr show ${NET_INTERFACE} | grep -oE '[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}' | sed -e "s/^[[:space:]]*//" | head -n 1) -IP_ADDRESS=${DOCKER_IP_ADDRESS:-${IP_ADDRESS}} - -VERNEMQ_ETC_DIR="/vernemq/etc" -VERNEMQ_VM_ARGS_FILE="${VERNEMQ_ETC_DIR}/vm.args" -VERNEMQ_CONF_FILE="${VERNEMQ_ETC_DIR}/vernemq.conf" -VERNEMQ_CONF_LOCAL_FILE="${VERNEMQ_ETC_DIR}/vernemq.conf.local" - -SECRETS_KUBERNETES_DIR="/var/run/secrets/kubernetes.io/serviceaccount" - -# Function to check istio readiness -istio_health() { - cmd=$(curl -s http://localhost:15021/healthz/ready > /dev/null) - status=$? - return $status -} - -# Ensure we have all files and needed directory write permissions -if [ ! -d ${VERNEMQ_ETC_DIR} ]; then - echo "Configuration directory at ${VERNEMQ_ETC_DIR} does not exist, exiting" >&2 - exit 1 -fi -if [ ! -f ${VERNEMQ_VM_ARGS_FILE} ]; then - echo "ls -l ${VERNEMQ_ETC_DIR}" - ls -l ${VERNEMQ_ETC_DIR} - echo "###" >&2 - echo "### Configuration file ${VERNEMQ_VM_ARGS_FILE} does not exist, exiting" >&2 - echo "###" >&2 - exit 1 -fi -if [ ! -w ${VERNEMQ_VM_ARGS_FILE} ]; then - echo "# whoami" - whoami - echo "# ls -l ${VERNEMQ_ETC_DIR}" - ls -l ${VERNEMQ_ETC_DIR} - echo "###" >&2 - echo "### Configuration file ${VERNEMQ_VM_ARGS_FILE} exists, but there are no write permissions! Exiting." >&2 - echo "###" >&2 - exit 1 -fi -if [ ! -s ${VERNEMQ_VM_ARGS_FILE} ]; then - echo "ls -l ${VERNEMQ_ETC_DIR}" - ls -l ${VERNEMQ_ETC_DIR} - echo "###" >&2 - echo "### Configuration file ${VERNEMQ_VM_ARGS_FILE} is empty! This will not work." >&2 - echo "### Exiting now." >&2 - echo "###" >&2 - exit 1 -fi - -# Ensure the Erlang node name is set correctly -if env | grep "DOCKER_VERNEMQ_NODENAME" -q; then - sed -i.bak -r "s/-name VerneMQ@.+/-name VerneMQ@${DOCKER_VERNEMQ_NODENAME}/" ${VERNEMQ_VM_ARGS_FILE} -else - if [ -n "$DOCKER_VERNEMQ_SWARM" ]; then - NODENAME=$(hostname -i) - sed -i.bak -r "s/VerneMQ@.+/VerneMQ@${NODENAME}/" ${VERNEMQ_VM_ARGS_FILE} - else - sed -i.bak -r "s/-name VerneMQ@.+/-name VerneMQ@${IP_ADDRESS}/" ${VERNEMQ_VM_ARGS_FILE} - fi -fi - -if env | grep "DOCKER_VERNEMQ_DISCOVERY_NODE" -q; then - discovery_node=$DOCKER_VERNEMQ_DISCOVERY_NODE - if [ -n "$DOCKER_VERNEMQ_SWARM" ]; then - tmp='' - while [[ -z "$tmp" ]]; do - tmp=$(getent hosts tasks.$discovery_node | awk '{print $1}' | head -n 1) - sleep 1 - done - discovery_node=$tmp - fi - if [ -n "$DOCKER_VERNEMQ_COMPOSE" ]; then - tmp='' - while [[ -z "$tmp" ]]; do - tmp=$(getent hosts $discovery_node | awk '{print $1}' | head -n 1) - sleep 1 - done - discovery_node=$tmp - fi - - sed -i.bak -r "/-eval.+/d" ${VERNEMQ_VM_ARGS_FILE} - echo "-eval \"vmq_server_cmd:node_join('VerneMQ@$discovery_node')\"" >> ${VERNEMQ_VM_ARGS_FILE} -fi - -# If you encounter "SSL certification error (subject name does not match the host name)", you may try to set DOCKER_VERNEMQ_KUBERNETES_INSECURE to "1". -insecure="" -if env | grep "DOCKER_VERNEMQ_KUBERNETES_INSECURE" -q; then - echo "Using curl with \"--insecure\" argument to access kubernetes API without matching SSL certificate" - insecure="--insecure" -fi - -if env | grep "DOCKER_VERNEMQ_KUBERNETES_ISTIO_ENABLED" -q; then - istio_health - while [ $status != 0 ]; do - istio_health - sleep 1 - done - echo "Istio ready" -fi - -# Function to call a HTTP GET request on the given URL Path, using the hostname -# of the current k8s cluster name. Usage: "k8sCurlGet /my/path" -function k8sCurlGet () { - local urlPath=$1 - - local hostname="kubernetes.default.svc.${DOCKER_VERNEMQ_KUBERNETES_CLUSTER_NAME}" - local certsFile="${SECRETS_KUBERNETES_DIR}/ca.crt" - local token=$(cat ${SECRETS_KUBERNETES_DIR}/token) - local header="Authorization: Bearer ${token}" - local url="https://${hostname}/${urlPath}" - - curl -sS ${insecure} --cacert ${certsFile} -H "${header}" ${url} \ - || ( echo "### Error on accessing URL ${url}" ) -} - -DOCKER_VERNEMQ_KUBERNETES_CLUSTER_NAME=${DOCKER_VERNEMQ_KUBERNETES_CLUSTER_NAME:-cluster.local} -if [ -d "${SECRETS_KUBERNETES_DIR}" ] ; then - # Let's get the namespace if it isn't set - DOCKER_VERNEMQ_KUBERNETES_NAMESPACE=${DOCKER_VERNEMQ_KUBERNETES_NAMESPACE:-$(cat "${SECRETS_KUBERNETES_DIR}/namespace")} - - # Check the API access that will be needed in the TERM signal handler - podResponse=$(k8sCurlGet api/v1/namespaces/${DOCKER_VERNEMQ_KUBERNETES_NAMESPACE}/pods/$(hostname) ) - statefulSetName=$(echo ${podResponse} | jq -r '.metadata.ownerReferences[0].name') - statefulSetPath="apis/apps/v1/namespaces/${DOCKER_VERNEMQ_KUBERNETES_NAMESPACE}/statefulsets/${statefulSetName}" - statefulSetResponse=$(k8sCurlGet ${statefulSetPath} ) - isCodeForbidden=$(echo ${statefulSetResponse} | jq '.code == 403') - if [[ ${isCodeForbidden} == "true" ]]; then - echo "Permission error: Cannot access URL ${statefulSetPath}: $(echo ${statefulSetResponse} | jq '.reason,.code,.message')" - exit 1 - else - numReplicas=$(echo ${statefulSetResponse} | jq '.status.replicas') - echo "Permissions ok: Our pod $(hostname) belongs to StatefulSet ${statefulSetName} with ${numReplicas} replicas" - fi -fi - -# Set up kubernetes node discovery -start_join_cluster=0 -if env | grep "DOCKER_VERNEMQ_DISCOVERY_KUBERNETES" -q; then - # Let's set our nodename correctly - # https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.19/#list-pod-v1-core - podList=$(k8sCurlGet "api/v1/namespaces/${DOCKER_VERNEMQ_KUBERNETES_NAMESPACE}/pods?labelSelector=${DOCKER_VERNEMQ_KUBERNETES_LABEL_SELECTOR}") - VERNEMQ_KUBERNETES_SUBDOMAIN=${DOCKER_VERNEMQ_KUBERNETES_SUBDOMAIN:-$(echo ${podList} | jq '.items[0].spec.subdomain' | tr '\n' '"' | sed 's/"//g')} - if [[ $VERNEMQ_KUBERNETES_SUBDOMAIN == "null" ]]; then - VERNEMQ_KUBERNETES_HOSTNAME=${MY_POD_NAME}.${DOCKER_VERNEMQ_KUBERNETES_NAMESPACE}.svc.${DOCKER_VERNEMQ_KUBERNETES_CLUSTER_NAME} - else - VERNEMQ_KUBERNETES_HOSTNAME=${MY_POD_NAME}.${VERNEMQ_KUBERNETES_SUBDOMAIN}.${DOCKER_VERNEMQ_KUBERNETES_NAMESPACE}.svc.${DOCKER_VERNEMQ_KUBERNETES_CLUSTER_NAME} - fi - - sed -i.bak -r "s/VerneMQ@.+/VerneMQ@${VERNEMQ_KUBERNETES_HOSTNAME}/" ${VERNEMQ_VM_ARGS_FILE} - # Hack into K8S DNS resolution (temporarily) - kube_pod_names=$(echo ${podList} | jq '.items[].spec.hostname' | sed 's/"//g' | tr '\n' ' ' | sed 's/ *$//') - - for kube_pod_name in $kube_pod_names; do - if [[ $kube_pod_name == "null" ]]; then - echo "Kubernetes discovery selected, but no pods found. Maybe we're the first?" - echo "Anyway, we won't attempt to join any cluster." - break - fi - if [[ $kube_pod_name != $MY_POD_NAME ]]; then - discoveryHostname="${kube_pod_name}.${VERNEMQ_KUBERNETES_SUBDOMAIN}.${DOCKER_VERNEMQ_KUBERNETES_NAMESPACE}.svc.${DOCKER_VERNEMQ_KUBERNETES_CLUSTER_NAME}" - start_join_cluster=1 - echo "Will join an existing Kubernetes cluster with discovery node at ${discoveryHostname}" - echo "-eval \"vmq_server_cmd:node_join('VerneMQ@${discoveryHostname}')\"" >> ${VERNEMQ_VM_ARGS_FILE} - echo "Did I previously leave the cluster? If so, purging old state." - curl -fsSL http://${discoveryHostname}:8888/status.json >/dev/null 2>&1 || - (echo "Can't download status.json, better to exit now" && exit 1) - curl -fsSL http://${discoveryHostname}:8888/status.json | grep -q ${VERNEMQ_KUBERNETES_HOSTNAME} || - (echo "Cluster doesn't know about me, this means I've left previously. Purging old state..." && rm -rf /vernemq/data/*) - break - fi - done -fi - -if [ -f "${VERNEMQ_CONF_LOCAL_FILE}" ]; then - cp "${VERNEMQ_CONF_LOCAL_FILE}" ${VERNEMQ_CONF_FILE} - sed -i -r "s/###IPADDRESS###/${IP_ADDRESS}/" ${VERNEMQ_CONF_FILE} -else - sed -i '/########## Start ##########/,/########## End ##########/d' ${VERNEMQ_CONF_FILE} - - echo "########## Start ##########" >> ${VERNEMQ_CONF_FILE} - - env | grep DOCKER_VERNEMQ | grep -v 'DISCOVERY_NODE\|KUBERNETES\|SWARM\|COMPOSE\|DOCKER_VERNEMQ_USER' | cut -c 16- | awk '{match($0,/^[A-Z0-9_]*/)}{print tolower(substr($0,RSTART,RLENGTH)) substr($0,RLENGTH+1)}' | sed 's/__/./g' >> ${VERNEMQ_CONF_FILE} - - users_are_set=$(env | grep DOCKER_VERNEMQ_USER) - if [ ! -z "$users_are_set" ]; then - echo "vmq_passwd.password_file = /vernemq/etc/vmq.passwd" >> ${VERNEMQ_CONF_FILE} - touch /vernemq/etc/vmq.passwd - fi - - for vernemq_user in $(env | grep DOCKER_VERNEMQ_USER); do - username=$(echo $vernemq_user | awk -F '=' '{ print $1 }' | sed 's/DOCKER_VERNEMQ_USER_//g' | tr '[:upper:]' '[:lower:]') - password=$(echo $vernemq_user | awk -F '=' '{ print $2 }') - /vernemq/bin/vmq-passwd /vernemq/etc/vmq.passwd $username <> ${VERNEMQ_CONF_FILE} - fi - - if [ -z "$DOCKER_VERNEMQ_ERLANG__DISTRIBUTION__PORT_RANGE__MAXIMUM" ]; then - echo "erlang.distribution.port_range.maximum = 9109" >> ${VERNEMQ_CONF_FILE} - fi - - if [ -z "$DOCKER_VERNEMQ_LISTENER__TCP__DEFAULT" ]; then - echo "listener.tcp.default = ${IP_ADDRESS}:1883" >> ${VERNEMQ_CONF_FILE} - fi - - if [ -z "$DOCKER_VERNEMQ_LISTENER__WS__DEFAULT" ]; then - echo "listener.ws.default = ${IP_ADDRESS}:8080" >> ${VERNEMQ_CONF_FILE} - fi - - if [ -z "$DOCKER_VERNEMQ_LISTENER__VMQ__CLUSTERING" ]; then - echo "listener.vmq.clustering = ${IP_ADDRESS}:44053" >> ${VERNEMQ_CONF_FILE} - fi - - if [ -z "$DOCKER_VERNEMQ_LISTENER__HTTP__METRICS" ]; then - echo "listener.http.metrics = ${IP_ADDRESS}:8888" >> ${VERNEMQ_CONF_FILE} - fi - - echo "########## End ##########" >> ${VERNEMQ_CONF_FILE} -fi - -if [ ! -z "$DOCKER_VERNEMQ_ERLANG__MAX_PORTS" ]; then - sed -i.bak -r "s/\+Q.+/\+Q ${DOCKER_VERNEMQ_ERLANG__MAX_PORTS}/" ${VERNEMQ_VM_ARGS_FILE} -fi - -if [ ! -z "$DOCKER_VERNEMQ_ERLANG__PROCESS_LIMIT" ]; then - sed -i.bak -r "s/\+P.+/\+P ${DOCKER_VERNEMQ_ERLANG__PROCESS_LIMIT}/" ${VERNEMQ_VM_ARGS_FILE} -fi - -if [ ! -z "$DOCKER_VERNEMQ_ERLANG__MAX_ETS_TABLES" ]; then - sed -i.bak -r "s/\+e.+/\+e ${DOCKER_VERNEMQ_ERLANG__MAX_ETS_TABLES}/" ${VERNEMQ_VM_ARGS_FILE} -fi - -if [ ! -z "$DOCKER_VERNEMQ_ERLANG__DISTRIBUTION_BUFFER_SIZE" ]; then - sed -i.bak -r "s/\+zdbbl.+/\+zdbbl ${DOCKER_VERNEMQ_ERLANG__DISTRIBUTION_BUFFER_SIZE}/" ${VERNEMQ_VM_ARGS_FILE} -fi - -# Check configuration file -/vernemq/bin/vernemq config generate 2>&1 > /dev/null | tee /tmp/config.out | grep error - -if [ $? -ne 1 ]; then - echo "configuration error, exit" - echo "$(cat /tmp/config.out)" - exit $? -fi - -pid=0 - -# SIGUSR1-handler -siguser1_handler() { - echo "stopped" -} - -# SIGTERM-handler -sigterm_handler() { - if [ $pid -ne 0 ]; then - if [ -d "${SECRETS_KUBERNETES_DIR}" ] ; then - # this will stop the VerneMQ process, but first drain the node from all existing client sessions (-k) - if [ -n "$VERNEMQ_KUBERNETES_HOSTNAME" ]; then - terminating_node_name=VerneMQ@$VERNEMQ_KUBERNETES_HOSTNAME - else - terminating_node_name=VerneMQ@$IP_ADDRESS - fi - podList=$(k8sCurlGet "api/v1/namespaces/${DOCKER_VERNEMQ_KUBERNETES_NAMESPACE}/pods?labelSelector=${DOCKER_VERNEMQ_KUBERNETES_LABEL_SELECTOR}") - kube_pod_names=$(echo ${podList} | jq '.items[].spec.hostname' | sed 's/"//g' | tr '\n' ' ' | sed 's/ *$//') - if [ "$kube_pod_names" = "$MY_POD_NAME" ]; then - echo "I'm the only pod remaining. Not performing leave and/or state purge." - /vernemq/bin/vmq-admin node stop >/dev/null - else - # https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.19/#read-pod-v1-core - podResponse=$(k8sCurlGet api/v1/namespaces/${DOCKER_VERNEMQ_KUBERNETES_NAMESPACE}/pods/$(hostname) ) - statefulSetName=$(echo ${podResponse} | jq -r '.metadata.ownerReferences[0].name') - - # https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.19/#-strong-read-operations-statefulset-v1-apps-strong- - statefulSetResponse=$(k8sCurlGet "apis/apps/v1/namespaces/${DOCKER_VERNEMQ_KUBERNETES_NAMESPACE}/statefulsets/${statefulSetName}" ) - - isCodeForbidden=$(echo ${statefulSetResponse} | jq '.code == 403') - if [[ ${isCodeForbidden} == "true" ]]; then - echo "Permission error: Cannot access URL ${statefulSetPath}: $(echo ${statefulSetResponse} | jq '.reason,.code,.message')" - fi - - reschedule=$(echo ${statefulSetResponse} | jq '.status.replicas == .status.readyReplicas') - scaled_down=$(echo ${statefulSetResponse} | jq '.status.currentReplicas == .status.updatedReplicas') - - if [[ $reschedule == "true" ]]; then - # Perhaps is an scale down? - if [[ $scaled_down == "true" ]]; then - echo "Seems that this is a scale down scenario. Leaving cluster." - /vernemq/bin/vmq-admin cluster leave node=${terminating_node_name} -k && rm -rf /vernemq/data/* - else - echo "Reschedule is true. Not leaving the cluster." - /vernemq/bin/vmq-admin node stop >/dev/null - fi - else - echo "Reschedule is false. Leaving the cluster." - /vernemq/bin/vmq-admin cluster leave node=${terminating_node_name} -k && rm -rf /vernemq/data/* - fi - fi - else - if [ -n "$DOCKER_VERNEMQ_SWARM" ]; then - terminating_node_name=VerneMQ@$(hostname -i) - # For Swarm we keep the old "cluster leave" approach for now - echo "Swarm node is leaving the cluster." - /vernemq/bin/vmq-admin cluster leave node=${terminating_node_name} -k && rm -rf /vernemq/data/* - else - # In non-k8s mode: Stop the vernemq node gracefully - /vernemq/bin/vmq-admin node stop >/dev/null - fi - fi - kill -s TERM ${pid} - WAITFOR_PID=${pid} - pid=0 - wait ${WAITFOR_PID} - fi - exit 143; # 128 + 15 -- SIGTERM -} - -if [ ! -s ${VERNEMQ_VM_ARGS_FILE} ]; then - echo "ls -l ${VERNEMQ_ETC_DIR}" - ls -l ${VERNEMQ_ETC_DIR} - echo "###" >&2 - echo "### Configuration file ${VERNEMQ_VM_ARGS_FILE} is empty! This will not work." >&2 - echo "### Exiting now." >&2 - echo "###" >&2 - exit 1 -fi - -# Setup OS signal handlers -trap 'siguser1_handler' SIGUSR1 -trap 'sigterm_handler' SIGTERM - -# Start VerneMQ -/vernemq/bin/vernemq console -noshell -noinput $@ & -pid=$! -if [ $start_join_cluster -eq 1 ]; then - mkdir -p /var/log/vernemq/log - join_cluster > /var/log/vernemq/log/join_cluster.log & -fi -if [ -n "$API_KEY" ]; then - sleep 10 && echo "Adding API_KEY..." && /vernemq/bin/vmq-admin api-key add key="${API_KEY:-DEFAULT}" - vmq-admin api-key show -fi -wait $pid diff --git a/docker/vernemq/files/vm.args b/docker/vernemq/files/vm.args deleted file mode 100644 index afb3c022bb..0000000000 --- a/docker/vernemq/files/vm.args +++ /dev/null @@ -1,15 +0,0 @@ -+P 512000 -+e 256000 --env ERL_CRASH_DUMP /erl_crash.dump --env ERL_FULLSWEEP_AFTER 0 -+Q 512000 -+A 64 --setcookie vmq --name VerneMQ@127.0.0.1 -+K true -+W w -+sbwt none -+sbwtdcpu none -+sbwtdio none --smp enable -+zdbbl 32768 diff --git a/go.mod b/go.mod index a66cf183d0..6d781de1e5 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.23.4 require ( github.com/0x6flab/namegenerator v1.4.0 github.com/absmach/callhome v0.14.0 - github.com/absmach/certs v0.0.0-20241014135535-3f118b801054 + github.com/absmach/certs v0.0.0-20250127084046-fb0da0712b2b github.com/absmach/mgate v0.4.5 github.com/absmach/senml v1.0.6 github.com/authzed/authzed-go v1.3.0 @@ -13,7 +13,6 @@ require ( github.com/authzed/spicedb v1.40.0 github.com/caarlos0/env/v11 v11.3.1 github.com/cenkalti/backoff/v4 v4.3.0 - github.com/eclipse/paho.mqtt.golang v1.5.0 github.com/fatih/color v1.18.0 github.com/go-chi/chi/v5 v5.2.0 github.com/go-kit/kit v0.13.0 @@ -52,7 +51,7 @@ require ( golang.org/x/crypto v0.32.0 golang.org/x/oauth2 v0.25.0 golang.org/x/sync v0.10.0 - google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f + google.golang.org/genproto/googleapis/rpc v0.0.0-20250127172529-29210b9bc287 google.golang.org/grpc v1.70.0 google.golang.org/protobuf v1.36.4 gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df @@ -65,7 +64,7 @@ require ( github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect github.com/Microsoft/go-winio v0.6.2 // indirect github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 // indirect - github.com/antlr4-go/antlr/v4 v4.13.0 // indirect + github.com/antlr4-go/antlr/v4 v4.13.1 // indirect github.com/authzed/cel-go v0.20.2 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/ccoveille/go-safecast v1.5.0 // indirect @@ -80,6 +79,7 @@ require ( github.com/docker/go-connections v0.5.0 // indirect github.com/docker/go-units v0.5.0 // indirect github.com/dsnet/golib/memfile v1.0.0 // indirect + github.com/eclipse/paho.mqtt.golang v1.5.0 // indirect github.com/emirpasic/gods v1.18.1 // indirect github.com/envoyproxy/protoc-gen-validate v1.2.1 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect @@ -91,7 +91,7 @@ require ( github.com/go-logfmt/logfmt v0.6.0 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect - github.com/goccy/go-json v0.10.3 // indirect + github.com/goccy/go-json v0.10.5 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect github.com/gopherjs/gopherjs v1.17.2 // indirect @@ -102,9 +102,9 @@ require ( github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/go-retryablehttp v0.7.7 // indirect github.com/hashicorp/go-rootcerts v1.0.2 // indirect - github.com/hashicorp/go-secure-stdlib/parseutil v0.1.8 // indirect + github.com/hashicorp/go-secure-stdlib/parseutil v0.1.9 // indirect github.com/hashicorp/go-secure-stdlib/strutil v0.1.2 // indirect - github.com/hashicorp/go-sockaddr v1.0.6 // indirect + github.com/hashicorp/go-sockaddr v1.0.7 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/jackc/pgio v1.0.0 // indirect @@ -114,13 +114,13 @@ require ( github.com/jackc/puddle/v2 v2.2.2 // indirect github.com/jtolds/gls v4.20.0+incompatible // indirect github.com/jzelinskie/stringz v0.0.3 // indirect - github.com/klauspost/compress v1.17.9 // indirect + github.com/klauspost/compress v1.17.11 // indirect github.com/lestrrat-go/blackmagic v1.0.2 // indirect github.com/lestrrat-go/httpcc v1.0.1 // indirect github.com/lestrrat-go/httprc v1.0.6 // indirect github.com/lestrrat-go/iter v1.0.2 // indirect github.com/lestrrat-go/option v1.0.1 // indirect - github.com/mattn/go-colorable v0.1.13 // indirect + github.com/mattn/go-colorable v0.1.14 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/moby/docker-image-spec v1.3.1 // indirect @@ -131,22 +131,22 @@ require ( github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.1.0 // indirect github.com/opencontainers/runc v1.1.14 // indirect - github.com/pion/dtls/v3 v3.0.2 // indirect - github.com/pion/logging v0.2.2 // indirect + github.com/pion/dtls/v3 v3.0.4 // indirect + github.com/pion/logging v0.2.3 // indirect github.com/pion/transport/v3 v3.0.7 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/planetscale/vtprotobuf v0.6.1-0.20240917153116-6f2963f01587 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/client_model v0.6.1 // indirect - github.com/prometheus/common v0.61.0 // indirect + github.com/prometheus/common v0.62.0 // indirect github.com/prometheus/procfs v0.15.1 // indirect github.com/rs/zerolog v1.33.0 // indirect github.com/ryanuber/go-glob v1.0.0 // indirect - github.com/samber/lo v1.49.0 // indirect + github.com/samber/lo v1.49.1 // indirect github.com/segmentio/asm v1.2.0 // indirect github.com/sirupsen/logrus v1.9.3 // indirect github.com/smarty/assertions v1.15.0 // indirect - github.com/spf13/pflag v1.0.5 // indirect + github.com/spf13/pflag v1.0.6 // indirect github.com/stoewer/go-strcase v1.3.0 // indirect github.com/stretchr/objx v0.5.2 // indirect github.com/x448/float16 v0.8.4 // indirect @@ -157,14 +157,14 @@ require ( go.opentelemetry.io/otel/metric v1.34.0 // indirect go.opentelemetry.io/proto/otlp v1.5.0 // indirect go.uber.org/atomic v1.11.0 // indirect - golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 // indirect + golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c // indirect golang.org/x/net v0.34.0 // indirect golang.org/x/sys v0.29.0 // indirect golang.org/x/text v0.21.0 // indirect - golang.org/x/time v0.8.0 // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20250115164207-1a7da9e5054f // indirect + golang.org/x/time v0.9.0 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20250127172529-29210b9bc287 // indirect gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect - k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 // indirect + k8s.io/utils v0.0.0-20241210054802-24370beab758 // indirect ) diff --git a/go.sum b/go.sum index f9206d18c3..e655a94303 100644 --- a/go.sum +++ b/go.sum @@ -19,14 +19,14 @@ github.com/VividCortex/gohistogram v1.0.0 h1:6+hBz+qvs0JOrrNhhmR7lFxo5sINxBCGXrd github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g= github.com/absmach/callhome v0.14.0 h1:zB4tIZJ1YUmZ1VGHFPfMA/Lo6/Mv19y2dvoOiXj2BWs= github.com/absmach/callhome v0.14.0/go.mod h1:l12UJOfibK4Muvg/AbupHuquNV9qSz/ROdTEPg7f2Vk= -github.com/absmach/certs v0.0.0-20241014135535-3f118b801054 h1:NsIwp+ueKxDx8XftruA4hz8WUgyWq7eBE344nJt0LJg= -github.com/absmach/certs v0.0.0-20241014135535-3f118b801054/go.mod h1:bEAb/HjPztlrMmz8dLeJTke4Tzu9yW3+hY5eldEUtSY= +github.com/absmach/certs v0.0.0-20250127084046-fb0da0712b2b h1:EGIqL1bARjRSS7kH98Q5O/g7lZN/Q0KtAVX5mxRcq84= +github.com/absmach/certs v0.0.0-20250127084046-fb0da0712b2b/go.mod h1:g6Kqge7RVxwt+LRxqt+09cqa2SgPAwXvIPoyPsEqZlQ= github.com/absmach/mgate v0.4.5 h1:l6RmrEsR9jxkdb9WHUSecmT0HA41TkZZQVffFfUAIfI= github.com/absmach/mgate v0.4.5/go.mod h1:IvRIHZexZPEIAPmmaJF0L5DY2ERjj+GxRGitOW4s6qo= github.com/absmach/senml v1.0.6 h1:WPeIl6vQ00k7ghWSZYT/QP0KUxq2+4zQoaC7240pLFk= github.com/absmach/senml v1.0.6/go.mod h1:QnJNPy1DJPy0+qUW21PTcH/xoh0LgfYZxTfwriMIvmQ= -github.com/antlr4-go/antlr/v4 v4.13.0 h1:lxCg3LAv+EUK6t1i0y1V6/SLeUi0eKEKdhQAlS8TVTI= -github.com/antlr4-go/antlr/v4 v4.13.0/go.mod h1:pfChB/xh/Unjila75QW7+VU4TSnWnnk9UTnmpPaOR2g= +github.com/antlr4-go/antlr/v4 v4.13.1 h1:SqQKkuVZ+zWkMMNkjy5FZe5mr5WURWnlpmOuzYWrPrQ= +github.com/antlr4-go/antlr/v4 v4.13.1/go.mod h1:GKmUxMtwp6ZgGwZSva4eWPC5mS6vUAmOABFgjdkM7Nw= github.com/authzed/authzed-go v1.3.0 h1:jKIMpYDy+6WoOwl32HRURxLZxNGm+I7ObUlTntEPcXA= github.com/authzed/authzed-go v1.3.0/go.mod h1:MYkXImtFAxrM/bVZvmC/WO+gZC9RLlvpCM51SLaUZb0= github.com/authzed/cel-go v0.20.2 h1:GlmLecGry7Z8HU0k+hmaHHUV05ZHrsFxduXHtIePvck= @@ -99,8 +99,6 @@ github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2 github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/fxamacker/cbor/v2 v2.7.0 h1:iM5WgngdRBanHcxugY4JySA0nk1wZorNOpTgCMedv5E= github.com/fxamacker/cbor/v2 v2.7.0/go.mod h1:pxXPTn3joSm21Gbwsv0w9OSA2y1HFR9qXEeXQVeNoDQ= -github.com/go-chi/chi v1.5.5 h1:vOB/HbEMt9QqBqErz07QehcOKHaWFtuj87tTDVz2qXE= -github.com/go-chi/chi v1.5.5/go.mod h1:C9JqLr3tIYjDOZpzn+BCuxY8z8vmca43EeMgyZt7irw= github.com/go-chi/chi/v5 v5.2.0 h1:Aj1EtB0qR2Rdo2dG4O94RIU35w2lvQSj6BRA4+qwFL0= github.com/go-chi/chi/v5 v5.2.0/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8= github.com/go-errors/errors v1.5.1 h1:ZwEMSLRCapFLflTpT7NKaAc7ukJ8ZPEjzlxt8rPN8bk= @@ -127,8 +125,8 @@ github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqw github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/go-test/deep v1.0.2 h1:onZX1rnHT3Wv6cqNgYyFOOlgVKJrksuCMCRvJStbMYw= github.com/go-test/deep v1.0.2/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA= -github.com/goccy/go-json v0.10.3 h1:KZ5WoDbxAIgm2HNbYckL0se1fHD6rz5j4ywS6ebzDqA= -github.com/goccy/go-json v0.10.3/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= +github.com/goccy/go-json v0.10.5 h1:Fq85nIqj+gXn/S5ahsiTlK3TmC85qgirsdTP/+DeaC4= +github.com/goccy/go-json v0.10.5/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gofrs/uuid v4.0.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= github.com/gofrs/uuid/v5 v5.3.0 h1:m0mUMr+oVYUdxpMLgSYCZiXe7PuVPnI94+OMeVBNedk= @@ -173,12 +171,12 @@ github.com/hashicorp/go-retryablehttp v0.7.7 h1:C8hUCYzor8PIfXHa4UrZkU4VvK8o9ISH github.com/hashicorp/go-retryablehttp v0.7.7/go.mod h1:pkQpWZeYWskR+D1tR2O5OcBFOxfA7DoAO6xtkuQnHTk= github.com/hashicorp/go-rootcerts v1.0.2 h1:jzhAVGtqPKbwpyCPELlgNWhE1znq+qwJtW5Oi2viEzc= github.com/hashicorp/go-rootcerts v1.0.2/go.mod h1:pqUvnprVnM5bf7AOirdbb01K4ccR319Vf4pU3K5EGc8= -github.com/hashicorp/go-secure-stdlib/parseutil v0.1.8 h1:iBt4Ew4XEGLfh6/bPk4rSYmuZJGizr6/x/AEizP0CQc= -github.com/hashicorp/go-secure-stdlib/parseutil v0.1.8/go.mod h1:aiJI+PIApBRQG7FZTEBx5GiiX+HbOHilUdNxUZi4eV0= +github.com/hashicorp/go-secure-stdlib/parseutil v0.1.9 h1:FW0YttEnUNDJ2WL9XcrrfteS1xW8u+sh4ggM8pN5isQ= +github.com/hashicorp/go-secure-stdlib/parseutil v0.1.9/go.mod h1:Ll013mhdmsVDuoIXVfBtvgGJsXDYkTw1kooNcoCXuE0= github.com/hashicorp/go-secure-stdlib/strutil v0.1.2 h1:kes8mmyCpxJsI7FTwtzRqEy9CdjCtrXrXGuOpxEA7Ts= github.com/hashicorp/go-secure-stdlib/strutil v0.1.2/go.mod h1:Gou2R9+il93BqX25LAKCLuM+y9U2T4hlwvT1yprcna4= -github.com/hashicorp/go-sockaddr v1.0.6 h1:RSG8rKU28VTUTvEKghe5gIhIQpv8evvNpnDEyqO4u9I= -github.com/hashicorp/go-sockaddr v1.0.6/go.mod h1:uoUUmtwU7n9Dv3O4SNLeFvg0SxQ3lyjsj6+CCykpaxI= +github.com/hashicorp/go-sockaddr v1.0.7 h1:G+pTkSO01HpR5qCxg7lxfsFEZaG+C0VssTy/9dbT+Fw= +github.com/hashicorp/go-sockaddr v1.0.7/go.mod h1:FZQbEYa1pxkQ7WLpyXJ6cbjpT8q0YgQaK/JakXqGyWw= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hashicorp/vault/api v1.15.0 h1:O24FYQCWwhwKnF7CuSqP30S51rTV7vz1iACXE/pj5DA= @@ -253,8 +251,8 @@ github.com/jzelinskie/stringz v0.0.3 h1:0GhG3lVMYrYtIvRbxvQI6zqRTT1P1xyQlpa0FhfU github.com/jzelinskie/stringz v0.0.3/go.mod h1:hHYbgxJuNLRw91CmpuFsYEOyQqpDVFg8pvEh23vy4P0= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= -github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= +github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= @@ -287,8 +285,9 @@ github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ= github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= -github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE= +github.com/mattn/go-colorable v0.1.14/go.mod h1:6LmQG8QLFO4G5z1gPvYEzlUgJ2wF+stgPZH1UqBm1s8= github.com/mattn/go-isatty v0.0.5/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= github.com/mattn/go-isatty v0.0.7/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= @@ -328,10 +327,10 @@ github.com/ory/dockertest/v3 v3.11.0/go.mod h1:VIPxS1gwT9NpPOrfD3rACs8Y9Z7yhzO4S github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o= github.com/pelletier/go-toml v1.9.5 h1:4yBQzkHv+7BHq2PQUZF3Mx0IYxG7LsP222s7Agd3ve8= github.com/pelletier/go-toml v1.9.5/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c= -github.com/pion/dtls/v3 v3.0.2 h1:425DEeJ/jfuTTghhUDW0GtYZYIwwMtnKKJNMcWccTX0= -github.com/pion/dtls/v3 v3.0.2/go.mod h1:dfIXcFkKoujDQ+jtd8M6RgqKK3DuaUilm3YatAbGp5k= -github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY= -github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms= +github.com/pion/dtls/v3 v3.0.4 h1:44CZekewMzfrn9pmGrj5BNnTMDCFwr+6sLH+cCuLM7U= +github.com/pion/dtls/v3 v3.0.4/go.mod h1:R373CsjxWqNPf6MEkfdy3aSe9niZvL/JaKlGeFphtMg= +github.com/pion/logging v0.2.3 h1:gHuf0zpoh1GW67Nr6Gj4cv5Z9ZscU7g/EaoC/Ke/igI= +github.com/pion/logging v0.2.3/go.mod h1:z8YfknkquMe1csOrxK5kc+5/ZPAzMxbKLX5aXpbpC90= github.com/pion/transport/v3 v3.0.7 h1:iRbMH05BzSNwhILHoBoAPxoB9xQgOaJk+591KC9P1o0= github.com/pion/transport/v3 v3.0.7/go.mod h1:YleKiTZ4vqNxVwh77Z0zytYi7rXHl7j6uPLGhhz9rwo= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -351,8 +350,8 @@ github.com/prometheus/client_golang v1.20.5/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/j github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= -github.com/prometheus/common v0.61.0 h1:3gv/GThfX0cV2lpO7gkTUwZru38mxevy90Bj8YFSRQQ= -github.com/prometheus/common v0.61.0/go.mod h1:zr29OCN/2BsJRaFwG8QOBr41D6kkchKbpeNH7pAjb/s= +github.com/prometheus/common v0.62.0 h1:xasJaQlnWAeyHdUBeGjXmutelfJHWMRr+Fg4QszZ2Io= +github.com/prometheus/common v0.62.0/go.mod h1:vyBcEuLSvWos9B1+CyL7JZ2up+uFzXhkqml0W5zIY1I= github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= @@ -373,8 +372,8 @@ github.com/rubenv/sql-migrate v1.7.1/go.mod h1:Ob2Psprc0/3ggbM6wCzyYVFFuc6FyZrb2 github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/ryanuber/go-glob v1.0.0 h1:iQh3xXAumdQ+4Ufa5b25cRpC5TYKlno6hsv6Cb3pkBk= github.com/ryanuber/go-glob v1.0.0/go.mod h1:807d1WSdnB0XRJzKNil9Om6lcp/3a0v4qIHxIXzX/Yc= -github.com/samber/lo v1.49.0 h1:AGnTnQrg1jpFuwECPUSoxZCfVH5W22b605kWSry3YxM= -github.com/samber/lo v1.49.0/go.mod h1:dO6KHFzUKXgP8LDhU0oI8d2hekjXnGOu0DB8Jecxd6o= +github.com/samber/lo v1.49.1 h1:4BIFyVfuQSEpluc7Fua+j1NolZHiEHEpaSEKdsH0tew= +github.com/samber/lo v1.49.1/go.mod h1:dO6KHFzUKXgP8LDhU0oI8d2hekjXnGOu0DB8Jecxd6o= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys= github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs= @@ -390,8 +389,9 @@ github.com/smartystreets/goconvey v1.8.1 h1:qGjIddxOk4grTu9JPOU31tVfq3cNdBlNa5sS github.com/smartystreets/goconvey v1.8.1/go.mod h1:+/u4qLyY6x1jReYOp7GOM2FSt8aP9CzCZL03bI28W60= github.com/spf13/cobra v1.8.1 h1:e5/vxKd/rZsfSJMUX1agtjeTDf+qv1/JdBF8gg5k9ZM= github.com/spf13/cobra v1.8.1/go.mod h1:wHxEcudfqmLYa8iTfL+OuZPbBZkmvliBWKIezN3kD9Y= -github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/spf13/pflag v1.0.6 h1:jFzHGLGAlb3ruxLB8MhbI6A8+AQX/2eW4qeyNZXNp2o= +github.com/spf13/pflag v1.0.6/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/sqids/sqids-go v0.4.1 h1:eQKYzmAZbLlRwHeHYPF35QhgxwZHLnlmVj9AkIj/rrw= github.com/sqids/sqids-go v0.4.1/go.mod h1:EMwHuPQgSNFS0A49jESTfIQS+066XQTVhukrzEPScl8= github.com/stoewer/go-strcase v1.3.0 h1:g0eASXYtp+yvN9fK8sH94oCIk0fau9uV1/ZdJ0AVEzs= @@ -485,8 +485,8 @@ golang.org/x/crypto v0.20.0/go.mod h1:Xwo95rrVNIoSMx9wa1JroENMToLWn3RNVrTBpLHgZP golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc= golang.org/x/crypto v0.32.0/go.mod h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= -golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 h1:e66Fs6Z+fZTbFBAxKfP3PALWBtpfqks2bwGcexMxgtk= -golang.org/x/exp v0.0.0-20240909161429-701f63a606c0/go.mod h1:2TbTHSBQa924w8M6Xs1QcRcFwyucIwBGpK1p2f1YFFY= +golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c h1:KL/ZBHXgKGVmuZBZ01Lt57yE5ws8ZPSkkihmEyq7FXc= +golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c/go.mod h1:tujkw807nyEEAamNbDrEGzRav+ilXA7PCRAd6xsmwiU= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= @@ -569,8 +569,8 @@ golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= -golang.org/x/time v0.8.0 h1:9i3RxcPv3PZnitoVGMPDKZSq1xW1gK1Xy3ArNOGZfEg= -golang.org/x/time v0.8.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +golang.org/x/time v0.9.0 h1:EsRrnYcQiGH+5FfbgvV4AP7qEZstoyrHB0DzarOQ4ZY= +golang.org/x/time v0.9.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= @@ -599,10 +599,10 @@ google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7 google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20200423170343-7949de9c1215/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= -google.golang.org/genproto/googleapis/api v0.0.0-20250115164207-1a7da9e5054f h1:gap6+3Gk41EItBuyi4XX/bp4oqJ3UwuIMl25yGinuAA= -google.golang.org/genproto/googleapis/api v0.0.0-20250115164207-1a7da9e5054f/go.mod h1:Ic02D47M+zbarjYYUlK57y316f2MoN0gjAwI3f2S95o= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f h1:OxYkA3wjPsZyBylwymxSHa7ViiW1Sml4ToBrncvFehI= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f/go.mod h1:+2Yz8+CLJbIfL9z73EW45avw8Lmge3xVElCP9zEKi50= +google.golang.org/genproto/googleapis/api v0.0.0-20250127172529-29210b9bc287 h1:A2ni10G3UlplFrWdCDJTl7D7mJ7GSRm37S+PDimaKRw= +google.golang.org/genproto/googleapis/api v0.0.0-20250127172529-29210b9bc287/go.mod h1:iYONQfRdizDB8JJBybql13nArx91jcUk7zCXEsOofM4= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250127172529-29210b9bc287 h1:J1H9f+LEdWAfHcez/4cvaVBox7cOYT+IU6rgqj5x++8= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250127172529-29210b9bc287/go.mod h1:8BS3B93F/U1juMFq9+EDk+qOT5CO1R9IzXxG3PTqiRk= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= @@ -635,7 +635,7 @@ gotest.tools/v3 v3.5.1/go.mod h1:isy3WKz7GK6uNw/sbHzfKBLvlvXwUyV06n6brMxxopU= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= -k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 h1:pUdcCO1Lk/tbT5ztQWOBi5HBgbBP1J8+AsQnQCKsi8A= -k8s.io/utils v0.0.0-20240711033017-18e509b52bc8/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= +k8s.io/utils v0.0.0-20241210054802-24370beab758 h1:sdbE21q2nlQtFh65saZY+rRM6x6aJJI8IUa1AmH/qa0= +k8s.io/utils v0.0.0-20241210054802-24370beab758/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= moul.io/http2curl v1.0.0 h1:6XwpyZOYsgZJrU8exnG87ncVkU1FVCcTRpwzOkTDUi8= moul.io/http2curl v1.0.0/go.mod h1:f6cULg+e4Md/oW1cYmwW4IWQOVl2lGbmCNGOHvzX2kE= diff --git a/mqtt/README.md b/mqtt/README.md index aaa92c1307..467cf090b5 100644 --- a/mqtt/README.md +++ b/mqtt/README.md @@ -6,22 +6,19 @@ MQTT adapter provides an MQTT API for sending messages through the platform. MQT The service is configured using the environment variables presented in the following table. Note that any unset variables will be replaced with their default values. -| Variable | Description | Default | -| ---------------------------------------- | ----------------------------------------------------------------------------------- | --------------------------------- | +| Variable | Description | Default | +| ----------------------------------------- | ----------------------------------------------------------------------------------- | --------------------------------- | | SMQ_MQTT_ADAPTER_LOG_LEVEL | Log level for the MQTT Adapter (debug, info, warn, error) | info | | SMQ_MQTT_ADAPTER_MQTT_PORT | mProxy port | 1883 | | SMQ_MQTT_ADAPTER_MQTT_TARGET_HOST | MQTT broker host | localhost | | SMQ_MQTT_ADAPTER_MQTT_TARGET_PORT | MQTT broker port | 1883 | -| SMQ_MQTT_ADAPTER_MQTT_QOS | MQTT broker QoS | 1 | -| SMQ_MQTT_ADAPTER_FORWARDER_TIMEOUT | MQTT forwarder for multiprotocol communication timeout | 30s | | SMQ_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK | URL of broker health check | "" | | SMQ_MQTT_ADAPTER_WS_PORT | mProxy MQTT over WS port | 8080 | | SMQ_MQTT_ADAPTER_WS_TARGET_HOST | MQTT broker host for MQTT over WS | localhost | | SMQ_MQTT_ADAPTER_WS_TARGET_PORT | MQTT broker port for MQTT over WS | 8080 | -| SMQ_MQTT_ADAPTER_WS_TARGET_PATH | MQTT broker MQTT over WS path | /mqtt | | SMQ_MQTT_ADAPTER_INSTANCE | Instance name for MQTT adapter | "" | -| SMQ_CLIENTS_AUTH_GRPC_URL | Clients service Auth gRPC URL | | -| SMQ_CLIENTS_AUTH_GRPC_TIMEOUT | Clients service Auth gRPC request timeout in seconds | 1s | +| SMQ_CLIENTS_AUTH_GRPC_URL | Clients service Auth gRPC URL | | +| SMQ_CLIENTS_AUTH_GRPC_TIMEOUT | Clients service Auth gRPC request timeout in seconds | 1s | | SMQ_CLIENTS_AUTH_GRPC_CLIENT_CERT | Path to the PEM encoded clients service Auth gRPC client certificate file | "" | | SMQ_CLIENTS_AUTH_GRPC_CLIENT_KEY | Path to the PEM encoded clients service Auth gRPC client key file | "" | | SMQ_CLIENTS_AUTH_GRPC_SERVER_CERTS | Path to the PEM encoded clients server Auth gRPC server trusted CA certificate file | "" | @@ -29,7 +26,7 @@ The service is configured using the environment variables presented in the follo | SMQ_MESSAGE_BROKER_URL | Message broker instance URL | | | SMQ_JAEGER_URL | Jaeger server URL | | | SMQ_JAEGER_TRACE_RATIO | Jaeger sampling ratio | 1.0 | -| SMQ_SEND_TELEMETRY | Send telemetry to supermq call home server | true | +| SMQ_SEND_TELEMETRY | Send telemetry to supermq call home server | true | | SMQ_MQTT_ADAPTER_INSTANCE_ID | Service instance ID | "" | ## Deployment @@ -56,13 +53,10 @@ SMQ_MQTT_ADAPTER_LOG_LEVEL=info \ SMQ_MQTT_ADAPTER_MQTT_PORT=1883 \ SMQ_MQTT_ADAPTER_MQTT_TARGET_HOST=localhost \ SMQ_MQTT_ADAPTER_MQTT_TARGET_PORT=1883 \ -SMQ_MQTT_ADAPTER_MQTT_QOS=1 \ -SMQ_MQTT_ADAPTER_FORWARDER_TIMEOUT=30s \ SMQ_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK="" \ SMQ_MQTT_ADAPTER_WS_PORT=8080 \ SMQ_MQTT_ADAPTER_WS_TARGET_HOST=localhost \ SMQ_MQTT_ADAPTER_WS_TARGET_PORT=8080 \ -SMQ_MQTT_ADAPTER_WS_TARGET_PATH=/mqtt \ SMQ_MQTT_ADAPTER_INSTANCE="" \ SMQ_CLIENTS_AUTH_GRPC_URL=localhost:7000 \ SMQ_CLIENTS_AUTH_GRPC_TIMEOUT=1s \ diff --git a/mqtt/forwarder.go b/mqtt/forwarder.go deleted file mode 100644 index 323854809c..0000000000 --- a/mqtt/forwarder.go +++ /dev/null @@ -1,75 +0,0 @@ -// Copyright (c) Abstract Machines -// SPDX-License-Identifier: Apache-2.0 - -package mqtt - -import ( - "context" - "fmt" - "log/slog" - "strings" - - "github.com/absmach/supermq/pkg/messaging" -) - -// Forwarder specifies MQTT forwarder interface API. -type Forwarder interface { - // Forward subscribes to the Subscriber and - // publishes messages using provided Publisher. - Forward(ctx context.Context, id string, sub messaging.Subscriber, pub messaging.Publisher) error -} - -type forwarder struct { - topic string - logger *slog.Logger -} - -// NewForwarder returns new Forwarder implementation. -func NewForwarder(topic string, logger *slog.Logger) Forwarder { - return forwarder{ - topic: topic, - logger: logger, - } -} - -func (f forwarder) Forward(ctx context.Context, id string, sub messaging.Subscriber, pub messaging.Publisher) error { - subCfg := messaging.SubscriberConfig{ - ID: id, - Topic: f.topic, - Handler: handle(ctx, pub, f.logger), - } - - return sub.Subscribe(ctx, subCfg) -} - -func handle(ctx context.Context, pub messaging.Publisher, logger *slog.Logger) handleFunc { - return func(msg *messaging.Message) error { - if msg.GetProtocol() == protocol { - return nil - } - // Use concatenation instead of fmt.Sprintf for the - // sake of simplicity and performance. - topic := "channels/" + msg.GetChannel() + "/messages" - if msg.GetSubtopic() != "" { - topic = topic + "/" + strings.ReplaceAll(msg.GetSubtopic(), ".", "/") - } - - go func() { - if err := pub.Publish(ctx, topic, msg); err != nil { - logger.Warn(fmt.Sprintf("Failed to forward message: %s", err)) - } - }() - - return nil - } -} - -type handleFunc func(msg *messaging.Message) error - -func (h handleFunc) Handle(msg *messaging.Message) error { - return h(msg) -} - -func (h handleFunc) Cancel() error { - return nil -} diff --git a/mqtt/tracing/doc.go b/mqtt/tracing/doc.go deleted file mode 100644 index 557d3934f1..0000000000 --- a/mqtt/tracing/doc.go +++ /dev/null @@ -1,12 +0,0 @@ -// Copyright (c) Abstract Machines -// SPDX-License-Identifier: Apache-2.0 - -// Package tracing provides tracing instrumentation for SuperMQ MQTT adapter service. -// -// This package provides tracing middleware for SuperMQ MQTT adapter service. -// It can be used to trace incoming requests and add tracing capabilities to -// SuperMQ MQTT adapter service. -// -// For more details about tracing instrumentation for SuperMQ messaging refer -// to the documentation at https://docs.supermq.abstractmachines.fr/tracing/. -package tracing diff --git a/mqtt/tracing/forwarder.go b/mqtt/tracing/forwarder.go deleted file mode 100644 index c4db29ecbf..0000000000 --- a/mqtt/tracing/forwarder.go +++ /dev/null @@ -1,63 +0,0 @@ -// Copyright (c) Abstract Machines -// SPDX-License-Identifier: Apache-2.0 - -package tracing - -import ( - "context" - "fmt" - - "github.com/absmach/supermq/mqtt" - "github.com/absmach/supermq/pkg/messaging" - "github.com/absmach/supermq/pkg/server" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" -) - -const forwardOP = "process" - -var _ mqtt.Forwarder = (*forwarderMiddleware)(nil) - -type forwarderMiddleware struct { - topic string - forwarder mqtt.Forwarder - tracer trace.Tracer - host server.Config -} - -// New creates new mqtt forwarder tracing middleware. -func New(config server.Config, tracer trace.Tracer, forwarder mqtt.Forwarder, topic string) mqtt.Forwarder { - return &forwarderMiddleware{ - forwarder: forwarder, - tracer: tracer, - topic: topic, - host: config, - } -} - -// Forward traces mqtt forward operations. -func (fm *forwarderMiddleware) Forward(ctx context.Context, id string, sub messaging.Subscriber, pub messaging.Publisher) error { - subject := fmt.Sprintf("channels.%s.messages", fm.topic) - spanName := fmt.Sprintf("%s %s", subject, forwardOP) - - ctx, span := fm.tracer.Start(ctx, - spanName, - trace.WithAttributes( - attribute.String("messaging.system", "mqtt"), - attribute.Bool("messaging.destination.anonymous", false), - attribute.String("messaging.destination.template", "channels/{channelID}/messages/*"), - attribute.Bool("messaging.destination.temporary", true), - attribute.String("network.protocol.name", "mqtt"), - attribute.String("network.protocol.version", "3.1.1"), - attribute.String("network.transport", "tcp"), - attribute.String("network.type", "ipv4"), - attribute.String("messaging.operation", forwardOP), - attribute.String("messaging.client_id", id), - attribute.String("server.address", fm.host.Host), - attribute.String("server.socket.port", fm.host.Port), - ), - ) - defer span.End() - - return fm.forwarder.Forward(ctx, id, sub, pub) -} diff --git a/pkg/messaging/mqtt/docs.go b/pkg/messaging/mqtt/docs.go deleted file mode 100644 index 2afbf14587..0000000000 --- a/pkg/messaging/mqtt/docs.go +++ /dev/null @@ -1,11 +0,0 @@ -// Copyright (c) Abstract Machines -// SPDX-License-Identifier: Apache-2.0 - -// Package mqtt hold the implementation of the Publisher and PubSub -// interfaces for the MQTT messaging system, the internal messaging -// broker of the SuperMQ IoT platform. Due to the practical requirements -// implementation Publisher is created alongside PubSub. The reason for -// this is that Subscriber implementation of MQTT brings the burden of -// additional struct fields which are not used by Publisher. Subscriber -// is not implemented separately because PubSub can be used where Subscriber is needed. -package mqtt diff --git a/pkg/messaging/mqtt/publisher.go b/pkg/messaging/mqtt/publisher.go deleted file mode 100644 index 54364c31de..0000000000 --- a/pkg/messaging/mqtt/publisher.go +++ /dev/null @@ -1,61 +0,0 @@ -// Copyright (c) Abstract Machines -// SPDX-License-Identifier: Apache-2.0 - -package mqtt - -import ( - "context" - "errors" - "time" - - "github.com/absmach/supermq/pkg/messaging" - mqtt "github.com/eclipse/paho.mqtt.golang" -) - -var errPublishTimeout = errors.New("failed to publish due to timeout reached") - -var _ messaging.Publisher = (*publisher)(nil) - -type publisher struct { - client mqtt.Client - timeout time.Duration - qos uint8 -} - -// NewPublisher returns a new MQTT message publisher. -func NewPublisher(address string, qos uint8, timeout time.Duration) (messaging.Publisher, error) { - client, err := newClient(address, "mqtt-publisher", timeout) - if err != nil { - return nil, err - } - - ret := publisher{ - client: client, - timeout: timeout, - qos: qos, - } - return ret, nil -} - -func (pub publisher) Publish(ctx context.Context, topic string, msg *messaging.Message) error { - if topic == "" { - return ErrEmptyTopic - } - - // Publish only the payload and not the whole message. - token := pub.client.Publish(topic, byte(pub.qos), false, msg.GetPayload()) - if token.Error() != nil { - return token.Error() - } - - if ok := token.WaitTimeout(pub.timeout); !ok { - return errPublishTimeout - } - - return nil -} - -func (pub publisher) Close() error { - pub.client.Disconnect(uint(pub.timeout)) - return nil -} diff --git a/pkg/messaging/mqtt/pubsub.go b/pkg/messaging/mqtt/pubsub.go deleted file mode 100644 index 37c81a122d..0000000000 --- a/pkg/messaging/mqtt/pubsub.go +++ /dev/null @@ -1,230 +0,0 @@ -// Copyright (c) Abstract Machines -// SPDX-License-Identifier: Apache-2.0 - -package mqtt - -import ( - "context" - "errors" - "fmt" - "log/slog" - "sync" - "time" - - "github.com/absmach/supermq/pkg/messaging" - mqtt "github.com/eclipse/paho.mqtt.golang" - "google.golang.org/protobuf/proto" -) - -const username = "supermq-mqtt" - -var ( - // ErrConnect indicates that connection to MQTT broker failed. - ErrConnect = errors.New("failed to connect to MQTT broker") - - // errSubscribeTimeout indicates that the subscription failed due to timeout. - errSubscribeTimeout = errors.New("failed to subscribe due to timeout reached") - - // errUnsubscribeTimeout indicates that unsubscribe failed due to timeout. - errUnsubscribeTimeout = errors.New("failed to unsubscribe due to timeout reached") - - // errUnsubscribeDeleteTopic indicates that unsubscribe failed because the topic was deleted. - errUnsubscribeDeleteTopic = errors.New("failed to unsubscribe due to deletion of topic") - - // ErrNotSubscribed indicates that the topic is not subscribed to. - ErrNotSubscribed = errors.New("not subscribed") - - // ErrEmptyTopic indicates the absence of topic. - ErrEmptyTopic = errors.New("empty topic") - - // ErrEmptyID indicates the absence of ID. - ErrEmptyID = errors.New("empty ID") -) - -var _ messaging.PubSub = (*pubsub)(nil) - -type subscription struct { - client mqtt.Client - topics []string - cancel func() error -} - -type pubsub struct { - publisher - logger *slog.Logger - mu sync.RWMutex - address string - timeout time.Duration - subscriptions map[string]subscription -} - -// NewPubSub returns MQTT message publisher/subscriber. -func NewPubSub(url string, qos uint8, timeout time.Duration, logger *slog.Logger) (messaging.PubSub, error) { - client, err := newClient(url, "mqtt-publisher", timeout) - if err != nil { - return nil, err - } - ret := &pubsub{ - publisher: publisher{ - client: client, - timeout: timeout, - qos: qos, - }, - address: url, - timeout: timeout, - logger: logger, - subscriptions: make(map[string]subscription), - } - return ret, nil -} - -func (ps *pubsub) Subscribe(ctx context.Context, cfg messaging.SubscriberConfig) error { - if cfg.ID == "" { - return ErrEmptyID - } - if cfg.Topic == "" { - return ErrEmptyTopic - } - ps.mu.Lock() - defer ps.mu.Unlock() - - s, ok := ps.subscriptions[cfg.ID] - // If the client exists, check if it's subscribed to the topic and unsubscribe if needed. - switch ok { - case true: - if ok := s.contains(cfg.Topic); ok { - if err := s.unsubscribe(cfg.Topic, ps.timeout); err != nil { - return err - } - } - default: - client, err := newClient(ps.address, cfg.ID, ps.timeout) - if err != nil { - return err - } - s = subscription{ - client: client, - topics: []string{}, - cancel: cfg.Handler.Cancel, - } - } - s.topics = append(s.topics, cfg.Topic) - ps.subscriptions[cfg.ID] = s - - token := s.client.Subscribe(cfg.Topic, byte(ps.qos), ps.mqttHandler(cfg.Handler)) - if token.Error() != nil { - return token.Error() - } - if ok := token.WaitTimeout(ps.timeout); !ok { - return errSubscribeTimeout - } - - return nil -} - -func (ps *pubsub) Unsubscribe(ctx context.Context, id, topic string) error { - if id == "" { - return ErrEmptyID - } - if topic == "" { - return ErrEmptyTopic - } - ps.mu.Lock() - defer ps.mu.Unlock() - - s, ok := ps.subscriptions[id] - if !ok || !s.contains(topic) { - return ErrNotSubscribed - } - - if err := s.unsubscribe(topic, ps.timeout); err != nil { - return err - } - ps.subscriptions[id] = s - - if len(s.topics) == 0 { - delete(ps.subscriptions, id) - } - return nil -} - -func (s *subscription) unsubscribe(topic string, timeout time.Duration) error { - if s.cancel != nil { - if err := s.cancel(); err != nil { - return err - } - } - - token := s.client.Unsubscribe(topic) - if token.Error() != nil { - return token.Error() - } - - if ok := token.WaitTimeout(timeout); !ok { - return errUnsubscribeTimeout - } - if ok := s.delete(topic); !ok { - return errUnsubscribeDeleteTopic - } - return token.Error() -} - -func newClient(address, id string, timeout time.Duration) (mqtt.Client, error) { - opts := mqtt.NewClientOptions(). - SetUsername(username). - AddBroker(address). - SetClientID(id) - client := mqtt.NewClient(opts) - token := client.Connect() - if token.Error() != nil { - return nil, token.Error() - } - - if ok := token.WaitTimeout(timeout); !ok { - return nil, ErrConnect - } - - return client, nil -} - -func (ps *pubsub) mqttHandler(h messaging.MessageHandler) mqtt.MessageHandler { - return func(_ mqtt.Client, m mqtt.Message) { - var msg messaging.Message - if err := proto.Unmarshal(m.Payload(), &msg); err != nil { - ps.logger.Warn(fmt.Sprintf("Failed to unmarshal received message: %s", err)) - return - } - - if err := h.Handle(&msg); err != nil { - ps.logger.Warn(fmt.Sprintf("Failed to handle SuperMQ message: %s", err)) - } - } -} - -// Contains checks if a topic is present. -func (s subscription) contains(topic string) bool { - return s.indexOf(topic) != -1 -} - -// Finds the index of an item in the topics. -func (s subscription) indexOf(element string) int { - for k, v := range s.topics { - if element == v { - return k - } - } - return -1 -} - -// Deletes a topic from the slice. -func (s *subscription) delete(topic string) bool { - index := s.indexOf(topic) - if index == -1 { - return false - } - topics := make([]string, len(s.topics)-1) - copy(topics[:index], s.topics[:index]) - copy(topics[index:], s.topics[index+1:]) - s.topics = topics - return true -} diff --git a/pkg/messaging/mqtt/pubsub_test.go b/pkg/messaging/mqtt/pubsub_test.go deleted file mode 100644 index 9835dfb485..0000000000 --- a/pkg/messaging/mqtt/pubsub_test.go +++ /dev/null @@ -1,474 +0,0 @@ -// Copyright (c) Abstract Machines -// SPDX-License-Identifier: Apache-2.0 - -package mqtt_test - -import ( - "context" - "errors" - "fmt" - "testing" - "time" - - "github.com/absmach/supermq/pkg/messaging" - mqttpubsub "github.com/absmach/supermq/pkg/messaging/mqtt" - mqtt "github.com/eclipse/paho.mqtt.golang" - "github.com/stretchr/testify/assert" - "google.golang.org/protobuf/proto" -) - -const ( - topic = "topic" - chansPrefix = "channels" - channel = "9b7b1b3f-b1b0-46a8-a717-b8213f9eda3b" - subtopic = "engine" - tokenTimeout = 100 * time.Millisecond -) - -var data = []byte("payload") - -// ErrFailedHandleMessage indicates that the message couldn't be handled. -var errFailedHandleMessage = errors.New("failed to handle supermq message") - -func TestPublisher(t *testing.T) { - msgChan := make(chan []byte) - - // Subscribing with topic, and with subtopic, so that we can publish messages. - client, err := newClient(address, "clientID1", brokerTimeout) - assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) - - token := client.Subscribe(topic, qos, func(_ mqtt.Client, m mqtt.Message) { - msgChan <- m.Payload() - }) - if ok := token.WaitTimeout(tokenTimeout); !ok { - assert.Fail(t, fmt.Sprintf("failed to subscribe to topic %s", topic)) - } - assert.Nil(t, token.Error(), fmt.Sprintf("got unexpected error: %s", token.Error())) - - token = client.Subscribe(fmt.Sprintf("%s.%s", topic, subtopic), qos, func(_ mqtt.Client, m mqtt.Message) { - msgChan <- m.Payload() - }) - if ok := token.WaitTimeout(tokenTimeout); !ok { - assert.Fail(t, fmt.Sprintf("failed to subscribe to topic %s", fmt.Sprintf("%s.%s", topic, subtopic))) - } - assert.Nil(t, token.Error(), fmt.Sprintf("got unexpected error: %s", token.Error())) - - t.Cleanup(func() { - token := client.Unsubscribe(topic, fmt.Sprintf("%s.%s", topic, subtopic)) - token.WaitTimeout(tokenTimeout) - assert.Nil(t, token.Error(), fmt.Sprintf("got unexpected error: %s", token.Error())) - - client.Disconnect(100) - }) - - // Test publish with an empty topic. - err = pubsub.Publish(context.TODO(), "", &messaging.Message{Payload: data}) - assert.Equal(t, err, mqttpubsub.ErrEmptyTopic, fmt.Sprintf("Publish with empty topic: expected: %s, got: %s", mqttpubsub.ErrEmptyTopic, err)) - - cases := []struct { - desc string - channel string - subtopic string - payload []byte - }{ - { - desc: "publish message with nil payload", - payload: nil, - }, - { - desc: "publish message with string payload", - payload: data, - }, - { - desc: "publish message with channel", - payload: data, - channel: channel, - }, - { - desc: "publish message with subtopic", - payload: data, - subtopic: subtopic, - }, - { - desc: "publish message with channel and subtopic", - payload: data, - channel: channel, - subtopic: subtopic, - }, - } - for _, tc := range cases { - expectedMsg := messaging.Message{ - Publisher: "clientID11", - Channel: tc.channel, - Subtopic: tc.subtopic, - Payload: tc.payload, - } - - err := pubsub.Publish(context.TODO(), topic, &expectedMsg) - assert.Nil(t, err, fmt.Sprintf("%s: got unexpected error: %s\n", tc.desc, err)) - - data, err := proto.Marshal(&expectedMsg) - assert.Nil(t, err, fmt.Sprintf("%s: failed to serialize protobuf error: %s\n", tc.desc, err)) - - receivedMsg := <-msgChan - if tc.payload != nil { - assert.Equal(t, expectedMsg.GetPayload(), receivedMsg, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, data, receivedMsg)) - } - } -} - -func TestSubscribe(t *testing.T) { - msgChan := make(chan *messaging.Message) - - // Creating client to Publish messages to subscribed topic. - client, err := newClient(address, "supermq", brokerTimeout) - assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) - - t.Cleanup(func() { - client.Unsubscribe() - client.Disconnect(100) - }) - - cases := []struct { - desc string - topic string - clientID string - err error - handler messaging.MessageHandler - }{ - { - desc: "Subscribe to a topic with an ID", - topic: topic, - clientID: "clientid1", - err: nil, - handler: handler{false, "clientid1", msgChan}, - }, - { - desc: "Subscribe to the same topic with a different ID", - topic: topic, - clientID: "clientid2", - err: nil, - handler: handler{false, "clientid2", msgChan}, - }, - { - desc: "Subscribe to an already subscribed topic with an ID", - topic: topic, - clientID: "clientid1", - err: nil, - handler: handler{false, "clientid1", msgChan}, - }, - { - desc: "Subscribe to a topic with a subtopic with an ID", - topic: fmt.Sprintf("%s.%s", topic, subtopic), - clientID: "clientid1", - err: nil, - handler: handler{false, "clientid1", msgChan}, - }, - { - desc: "Subscribe to an already subscribed topic with a subtopic with an ID", - topic: fmt.Sprintf("%s.%s", topic, subtopic), - clientID: "clientid1", - err: nil, - handler: handler{false, "clientid1", msgChan}, - }, - { - desc: "Subscribe to an empty topic with an ID", - topic: "", - clientID: "clientid1", - err: mqttpubsub.ErrEmptyTopic, - handler: handler{false, "clientid1", msgChan}, - }, - { - desc: "Subscribe to a topic with empty id", - topic: topic, - clientID: "", - err: mqttpubsub.ErrEmptyID, - handler: handler{false, "", msgChan}, - }, - } - for _, tc := range cases { - subCfg := messaging.SubscriberConfig{ - ID: tc.clientID, - Topic: tc.topic, - Handler: tc.handler, - } - err = pubsub.Subscribe(context.TODO(), subCfg) - assert.Equal(t, err, tc.err, fmt.Sprintf("%s: expected: %s, but got: %s", tc.desc, err, tc.err)) - - if tc.err == nil { - expectedMsg := messaging.Message{ - Publisher: "clientID1", - Channel: channel, - Subtopic: subtopic, - Payload: data, - } - data, err := proto.Marshal(&expectedMsg) - assert.Nil(t, err, fmt.Sprintf("%s: failed to serialize protobuf error: %s\n", tc.desc, err)) - - token := client.Publish(tc.topic, qos, false, data) - token.WaitTimeout(tokenTimeout) - assert.Nil(t, token.Error(), fmt.Sprintf("got unexpected error: %s", token.Error())) - - receivedMsg := <-msgChan - assert.Equal(t, expectedMsg.Channel, receivedMsg.Channel, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &expectedMsg, receivedMsg)) - assert.Equal(t, expectedMsg.Created, receivedMsg.Created, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &expectedMsg, receivedMsg)) - assert.Equal(t, expectedMsg.Protocol, receivedMsg.Protocol, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &expectedMsg, receivedMsg)) - assert.Equal(t, expectedMsg.Publisher, receivedMsg.Publisher, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &expectedMsg, receivedMsg)) - assert.Equal(t, expectedMsg.Subtopic, receivedMsg.Subtopic, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &expectedMsg, receivedMsg)) - assert.Equal(t, expectedMsg.Payload, receivedMsg.Payload, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &expectedMsg, receivedMsg)) - } - } -} - -func TestPubSub(t *testing.T) { - msgChan := make(chan *messaging.Message) - - cases := []struct { - desc string - topic string - clientID string - err error - handler messaging.MessageHandler - }{ - { - desc: "Subscribe to a topic with an ID", - topic: topic, - clientID: "clientid7", - err: nil, - handler: handler{false, "clientid7", msgChan}, - }, - { - desc: "Subscribe to the same topic with a different ID", - topic: topic, - clientID: "clientid8", - err: nil, - handler: handler{false, "clientid8", msgChan}, - }, - { - desc: "Subscribe to a topic with a subtopic with an ID", - topic: fmt.Sprintf("%s.%s", topic, subtopic), - clientID: "clientid7", - err: nil, - handler: handler{false, "clientid7", msgChan}, - }, - { - desc: "Subscribe to an empty topic with an ID", - topic: "", - clientID: "clientid7", - err: mqttpubsub.ErrEmptyTopic, - handler: handler{false, "clientid7", msgChan}, - }, - { - desc: "Subscribe to a topic with empty id", - topic: topic, - clientID: "", - err: mqttpubsub.ErrEmptyID, - handler: handler{false, "", msgChan}, - }, - } - for _, tc := range cases { - subCfg := messaging.SubscriberConfig{ - ID: tc.clientID, - Topic: tc.topic, - Handler: tc.handler, - } - err := pubsub.Subscribe(context.TODO(), subCfg) - assert.Equal(t, err, tc.err, fmt.Sprintf("%s: expected: %s, but got: %s", tc.desc, err, tc.err)) - - if tc.err == nil { - // Use pubsub to subscribe to a topic, and then publish messages to that topic. - expectedMsg := messaging.Message{ - Publisher: "clientID", - Channel: channel, - Subtopic: subtopic, - Payload: data, - } - data, err := proto.Marshal(&expectedMsg) - assert.Nil(t, err, fmt.Sprintf("%s: failed to serialize protobuf error: %s\n", tc.desc, err)) - - msg := messaging.Message{ - Payload: data, - } - // Publish message, and then receive it on message channel. - err = pubsub.Publish(context.TODO(), topic, &msg) - assert.Nil(t, err, fmt.Sprintf("%s: got unexpected error: %s\n", tc.desc, err)) - - receivedMsg := <-msgChan - assert.Equal(t, expectedMsg.Channel, receivedMsg.Channel, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &expectedMsg, receivedMsg)) - assert.Equal(t, expectedMsg.Created, receivedMsg.Created, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &expectedMsg, receivedMsg)) - assert.Equal(t, expectedMsg.Protocol, receivedMsg.Protocol, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &expectedMsg, receivedMsg)) - assert.Equal(t, expectedMsg.Publisher, receivedMsg.Publisher, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &expectedMsg, receivedMsg)) - assert.Equal(t, expectedMsg.Subtopic, receivedMsg.Subtopic, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &expectedMsg, receivedMsg)) - assert.Equal(t, expectedMsg.Payload, receivedMsg.Payload, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &expectedMsg, receivedMsg)) - } - } -} - -func TestUnsubscribe(t *testing.T) { - msgChan := make(chan *messaging.Message) - - cases := []struct { - desc string - topic string - clientID string - err error - subscribe bool // True for subscribe and false for unsubscribe. - handler messaging.MessageHandler - }{ - { - desc: "Subscribe to a topic with an ID", - topic: fmt.Sprintf("%s.%s", chansPrefix, topic), - clientID: "clientid4", - err: nil, - subscribe: true, - handler: handler{false, "clientid4", msgChan}, - }, - { - desc: "Subscribe to the same topic with a different ID", - topic: fmt.Sprintf("%s.%s", chansPrefix, topic), - clientID: "clientid9", - err: nil, - subscribe: true, - handler: handler{false, "clientid9", msgChan}, - }, - { - desc: "Unsubscribe from a topic with an ID", - topic: fmt.Sprintf("%s.%s", chansPrefix, topic), - clientID: "clientid4", - err: nil, - subscribe: false, - handler: handler{false, "clientid4", msgChan}, - }, - { - desc: "Unsubscribe from same topic with different ID", - topic: fmt.Sprintf("%s.%s", chansPrefix, topic), - clientID: "clientid9", - err: nil, - subscribe: false, - handler: handler{false, "clientid9", msgChan}, - }, - { - desc: "Unsubscribe from a non-existent topic with an ID", - topic: "h", - clientID: "clientid4", - err: mqttpubsub.ErrNotSubscribed, - subscribe: false, - handler: handler{false, "clientid4", msgChan}, - }, - { - desc: "Unsubscribe from an already unsubscribed topic with an ID", - topic: fmt.Sprintf("%s.%s", chansPrefix, topic), - clientID: "clientid4", - err: mqttpubsub.ErrNotSubscribed, - subscribe: false, - handler: handler{false, "clientid4", msgChan}, - }, - { - desc: "Subscribe to a topic with a subtopic with an ID", - topic: fmt.Sprintf("%s.%s.%s", chansPrefix, topic, subtopic), - clientID: "clientidd4", - err: nil, - subscribe: true, - handler: handler{false, "clientidd4", msgChan}, - }, - { - desc: "Unsubscribe from a topic with a subtopic with an ID", - topic: fmt.Sprintf("%s.%s.%s", chansPrefix, topic, subtopic), - clientID: "clientidd4", - err: nil, - subscribe: false, - handler: handler{false, "clientidd4", msgChan}, - }, - { - desc: "Unsubscribe from an already unsubscribed topic with a subtopic with an ID", - topic: fmt.Sprintf("%s.%s.%s", chansPrefix, topic, subtopic), - clientID: "clientid4", - err: mqttpubsub.ErrNotSubscribed, - subscribe: false, - handler: handler{false, "clientid4", msgChan}, - }, - { - desc: "Unsubscribe from an empty topic with an ID", - topic: "", - clientID: "clientid4", - err: mqttpubsub.ErrEmptyTopic, - subscribe: false, - handler: handler{false, "clientid4", msgChan}, - }, - { - desc: "Unsubscribe from a topic with empty ID", - topic: fmt.Sprintf("%s.%s", chansPrefix, topic), - clientID: "", - err: mqttpubsub.ErrEmptyID, - subscribe: false, - handler: handler{false, "", msgChan}, - }, - { - desc: "Subscribe to a new topic with an ID", - topic: fmt.Sprintf("%s.%s", chansPrefix, topic+"2"), - clientID: "clientid55", - err: nil, - subscribe: true, - handler: handler{true, "clientid5", msgChan}, - }, - { - desc: "Unsubscribe from a topic with an ID with failing handler", - topic: fmt.Sprintf("%s.%s", chansPrefix, topic+"2"), - clientID: "clientid55", - err: errFailedHandleMessage, - subscribe: false, - handler: handler{true, "clientid5", msgChan}, - }, - { - desc: "Subscribe to a new topic with subtopic with an ID", - topic: fmt.Sprintf("%s.%s.%s", chansPrefix, topic+"2", subtopic), - clientID: "clientid55", - err: nil, - subscribe: true, - handler: handler{true, "clientid5", msgChan}, - }, - { - desc: "Unsubscribe from a topic with subtopic with an ID with failing handler", - topic: fmt.Sprintf("%s.%s.%s", chansPrefix, topic+"2", subtopic), - clientID: "clientid55", - err: errFailedHandleMessage, - subscribe: false, - handler: handler{true, "clientid5", msgChan}, - }, - } - for _, tc := range cases { - subCfg := messaging.SubscriberConfig{ - ID: tc.clientID, - Topic: tc.topic, - Handler: tc.handler, - } - switch tc.subscribe { - case true: - err := pubsub.Subscribe(context.TODO(), subCfg) - assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected: %s, but got: %s", tc.desc, tc.err, err)) - default: - err := pubsub.Unsubscribe(context.TODO(), tc.clientID, tc.topic) - assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected: %s, but got: %s", tc.desc, tc.err, err)) - } - } -} - -type handler struct { - fail bool - publisher string - msgChan chan *messaging.Message -} - -func (h handler) Handle(msg *messaging.Message) error { - if msg.GetPublisher() != h.publisher { - h.msgChan <- msg - } - return nil -} - -func (h handler) Cancel() error { - if h.fail { - return errFailedHandleMessage - } - return nil -} diff --git a/pkg/messaging/mqtt/setup_test.go b/pkg/messaging/mqtt/setup_test.go deleted file mode 100644 index 9a01af691b..0000000000 --- a/pkg/messaging/mqtt/setup_test.go +++ /dev/null @@ -1,121 +0,0 @@ -// Copyright (c) Abstract Machines -// SPDX-License-Identifier: Apache-2.0 - -package mqtt_test - -import ( - "fmt" - "log" - "log/slog" - "os" - "os/signal" - "syscall" - "testing" - "time" - - smqlog "github.com/absmach/supermq/logger" - "github.com/absmach/supermq/pkg/messaging" - mqttpubsub "github.com/absmach/supermq/pkg/messaging/mqtt" - mqtt "github.com/eclipse/paho.mqtt.golang" - "github.com/ory/dockertest/v3" - "github.com/ory/dockertest/v3/docker" -) - -var ( - pubsub messaging.PubSub - logger *slog.Logger - address string -) - -const ( - username = "supermq-mqtt" - qos = 2 - port = "1883/tcp" - brokerTimeout = 30 * time.Second - poolMaxWait = 120 * time.Second -) - -func TestMain(m *testing.M) { - pool, err := dockertest.NewPool("") - if err != nil { - log.Fatalf("Could not connect to docker: %s", err) - } - - container, err := pool.RunWithOptions(&dockertest.RunOptions{ - Repository: "eclipse-mosquitto", - Tag: "1.6.15", - }, func(config *docker.HostConfig) { - config.AutoRemove = true - config.RestartPolicy = docker.RestartPolicy{Name: "no"} - }) - if err != nil { - log.Fatalf("Could not start container: %s", err) - } - - handleInterrupt(pool, container) - - address = fmt.Sprintf("%s:%s", "localhost", container.GetPort(port)) - pool.MaxWait = poolMaxWait - - logger, err = smqlog.New(os.Stdout, "debug") - if err != nil { - log.Fatal(err.Error()) - } - - if err := pool.Retry(func() error { - pubsub, err = mqttpubsub.NewPubSub(address, 2, brokerTimeout, logger) - return err - }); err != nil { - log.Fatalf("Could not connect to docker: %s", err) - } - - code := m.Run() - if err := pool.Purge(container); err != nil { - log.Fatalf("Could not purge container: %s", err) - } - - os.Exit(code) - - defer func() { - err = pubsub.Close() - if err != nil { - log.Fatal(err.Error()) - } - }() -} - -func handleInterrupt(pool *dockertest.Pool, container *dockertest.Resource) { - c := make(chan os.Signal, 2) - signal.Notify(c, os.Interrupt, syscall.SIGTERM) - go func() { - <-c - if err := pool.Purge(container); err != nil { - log.Fatalf("Could not purge container: %s", err) - } - os.Exit(0) - }() -} - -func newClient(address, id string, timeout time.Duration) (mqtt.Client, error) { - opts := mqtt.NewClientOptions(). - SetUsername(username). - AddBroker(address). - SetClientID(id) - - client := mqtt.NewClient(opts) - token := client.Connect() - if token.Error() != nil { - return nil, token.Error() - } - - ok := token.WaitTimeout(timeout) - if !ok { - return nil, mqttpubsub.ErrConnect - } - - if token.Error() != nil { - return nil, token.Error() - } - - return client, nil -}