Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MG-13 - Magistrala Rules engine #16

Merged
merged 10 commits into from
Dec 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
MG_DOCKER_IMAGE_NAME_PREFIX ?= ghcr.io/absmach/magistrala
BUILD_DIR = build
SERVICES = auth users things http coap ws postgres-writer postgres-reader timescale-writer \
timescale-reader cli bootstrap mqtt provision certs invitations journal
timescale-reader cli bootstrap mqtt provision certs invitations journal re
TEST_API_SERVICES = journal auth bootstrap certs http invitations notifiers provision readers things users
TEST_API = $(addprefix test_api_,$(TEST_API_SERVICES))
DOCKERS = $(addprefix docker_,$(SERVICES))
Expand Down
210 changes: 210 additions & 0 deletions cmd/re/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0

// Package main contains rule engine main function to start the service.
package main

import (
"context"
"fmt"
"log"
"log/slog"
"net/url"
"os"
"time"

chclient "github.com/absmach/callhome/pkg/client"
"github.com/absmach/magistrala"
"github.com/absmach/magistrala/consumers"
redisclient "github.com/absmach/magistrala/internal/clients/redis"
mglog "github.com/absmach/magistrala/logger"
authnsvc "github.com/absmach/magistrala/pkg/authn/authsvc"
mgauthz "github.com/absmach/magistrala/pkg/authz"
authzsvc "github.com/absmach/magistrala/pkg/authz/authsvc"
"github.com/absmach/magistrala/pkg/grpcclient"
jaegerclient "github.com/absmach/magistrala/pkg/jaeger"
"github.com/absmach/magistrala/pkg/messaging/brokers"
brokerstracing "github.com/absmach/magistrala/pkg/messaging/brokers/tracing"
pgclient "github.com/absmach/magistrala/pkg/postgres"
"github.com/absmach/magistrala/pkg/server"
httpserver "github.com/absmach/magistrala/pkg/server/http"
"github.com/absmach/magistrala/pkg/uuid"
"github.com/absmach/magistrala/re"
httpapi "github.com/absmach/magistrala/re/api"
repg "github.com/absmach/magistrala/re/postgres"
"github.com/caarlos0/env/v11"
"github.com/jmoiron/sqlx"
"github.com/redis/go-redis/v9"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"
)

const (
svcName = "rules_engine"
envPrefixDB = "MG_RE_DB_"
envPrefixHTTP = "MG_RE_HTTP_"
envPrefixAuth = "MG_AUTH_GRPC_"
defDB = "r"
defSvcHTTPPort = "9008"
)

type config struct {
LogLevel string `env:"MG_RE_LOG_LEVEL" envDefault:"info"`
InstanceID string `env:"MG_RE_INSTANCE_ID" envDefault:""`
JaegerURL url.URL `env:"MG_JAEGER_URL" envDefault:"http://localhost:4318/v1/traces"`
SendTelemetry bool `env:"MG_SEND_TELEMETRY" envDefault:"true"`
ESURL string `env:"MG_ES_URL" envDefault:"nats://localhost:4222"`
CacheURL string `env:"MG_RE_CACHE_URL" envDefault:"redis://localhost:6379/0"`
CacheKeyDuration time.Duration `env:"MG_RE_CACHE_KEY_DURATION" envDefault:"10m"`
TraceRatio float64 `env:"MG_JAEGER_TRACE_RATIO" envDefault:"1.0"`
ConfigPath string `env:"MG_RE_CONFIG_PATH" envDefault:"/config.toml"`
BrokerURL string `env:"MG_MESSAGE_BROKER_URL" envDefault:"nats://localhost:4222"`
}

func main() {
ctx, cancel := context.WithCancel(context.Background())
g, ctx := errgroup.WithContext(ctx)

// Create new rule engine configuration
cfg := config{}
if err := env.Parse(&cfg); err != nil {
log.Fatalf("failed to load %s configuration : %s", svcName, err)
}

var logger *slog.Logger
logger, err := mglog.New(os.Stdout, cfg.LogLevel)
if err != nil {
log.Fatalf("failed to init logger: %s", err.Error())
}

var exitCode int
defer mglog.ExitWithError(&exitCode)

if cfg.InstanceID == "" {
if cfg.InstanceID, err = uuid.New().ID(); err != nil {
logger.Error(fmt.Sprintf("failed to generate instanceID: %s", err))
exitCode = 1
return
}
}

// Create new database for rule engine.
dbConfig := pgclient.Config{Name: defDB}
if err := env.ParseWithOptions(&dbConfig, env.Options{Prefix: envPrefixDB}); err != nil {
logger.Error(err.Error())
exitCode = 1
return
}
db, err := pgclient.Setup(dbConfig, *repg.Migration())
if err != nil {
logger.Error(err.Error())
exitCode = 1
return
}
defer db.Close()

tp, err := jaegerclient.NewProvider(ctx, svcName, cfg.JaegerURL, cfg.InstanceID, cfg.TraceRatio)
if err != nil {
logger.Error(fmt.Sprintf("Failed to init Jaeger: %s", err))
exitCode = 1
return
}
defer func() {
if err := tp.Shutdown(ctx); err != nil {
logger.Error(fmt.Sprintf("Error shutting down tracer provider: %v", err))
}
}()
tracer := tp.Tracer(svcName)

pubSub, err := brokers.NewPubSub(ctx, cfg.BrokerURL, logger)
if err != nil {
logger.Error(fmt.Sprintf("failed to connect to message broker: %s", err))
exitCode = 1
return
}
defer pubSub.Close()

httpServerConfig := server.Config{Port: defSvcHTTPPort}
if err := env.ParseWithOptions(&httpServerConfig, env.Options{Prefix: envPrefixHTTP}); err != nil {
logger.Error(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
exitCode = 1
return
}
pubSub = brokerstracing.NewPubSub(httpServerConfig, tracer, pubSub)

// Setup new redis cache client
cacheclient, err := redisclient.Connect(cfg.CacheURL)
if err != nil {
logger.Error(err.Error())
exitCode = 1
return
}
defer cacheclient.Close()

grpcCfg := grpcclient.Config{}
if err := env.ParseWithOptions(&grpcCfg, env.Options{Prefix: envPrefixAuth}); err != nil {
logger.Error(fmt.Sprintf("failed to load auth gRPC client configuration : %s", err))
exitCode = 1
return
}
authn, authnClient, err := authnsvc.NewAuthentication(ctx, grpcCfg)
if err != nil {
logger.Error(err.Error())
exitCode = 1
return
}
defer authnClient.Close()
logger.Info("AuthN successfully connected to auth gRPC server " + authnClient.Secure())

authz, authzClient, err := authzsvc.NewAuthorization(ctx, grpcCfg)
if err != nil {
logger.Error(err.Error())
exitCode = 1
return
}
defer authzClient.Close()
logger.Info("AuthZ successfully connected to auth gRPC server " + authnClient.Secure())

svc, err := newService(ctx, db, dbConfig, authz, cacheclient, cfg.CacheKeyDuration, cfg.ESURL, tracer, logger)
if err != nil {
logger.Error(fmt.Sprintf("failed to create services: %s", err))
exitCode = 1
return
}

if err = consumers.Start(ctx, svcName, pubSub, svc, cfg.ConfigPath, logger); err != nil {
logger.Error(fmt.Sprintf("failed to create Rule Engine: %s", err))
exitCode = 1
return
}
httpSvc := httpserver.NewServer(ctx, cancel, svcName, httpServerConfig, httpapi.MakeHandler(svc, authn, logger, cfg.InstanceID), logger)

if cfg.SendTelemetry {
chc := chclient.New(svcName, magistrala.Version, logger, cancel)
go chc.CallHome(ctx)
}

// Start all servers
g.Go(func() error {
return httpSvc.Start()
})

g.Go(func() error {
return server.StopSignalHandler(ctx, cancel, logger, svcName, httpSvc)
})

if err := g.Wait(); err != nil {
logger.Error(fmt.Sprintf("%s service terminated: %s", svcName, err))
}
}

func newService(ctx context.Context, db *sqlx.DB, dbConfig pgclient.Config, authz mgauthz.Authorization, cacheClient *redis.Client, keyDuration time.Duration, esURL string, tracer trace.Tracer, logger *slog.Logger) (re.Service, error) {
database := pgclient.NewDatabase(db, dbConfig, tracer)
repo := repg.NewRepository(database)
idp := uuid.New()

// csvc = authzmw.AuthorizationMiddleware(csvc, authz)
csvc := re.NewService(repo, idp, nil)

return csvc, nil
}
8 changes: 2 additions & 6 deletions consumers/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
)

const (
defContentType = "application/senml+json"

Check failure on line 24 in consumers/messages.go

View workflow job for this annotation

GitHub Actions / Lint and Build

const `defContentType` is unused (unused)
defFormat = "senml"

Check failure on line 25 in consumers/messages.go

View workflow job for this annotation

GitHub Actions / Lint and Build

const `defFormat` is unused (unused)
)

var (
Expand Down Expand Up @@ -125,10 +125,7 @@
SubscriberCfg: subscriberConfig{
Subjects: []string{brokers.SubjectAllChannels},
},
TransformerCfg: transformerConfig{
Format: defFormat,
ContentType: defContentType,
},
TransformerCfg: transformerConfig{},
}

data, err := os.ReadFile(configPath)
Expand All @@ -152,8 +149,7 @@
logger.Info("Using JSON transformer")
return json.New(cfg.TimeFields)
default:
logger.Error(fmt.Sprintf("Can't create transformer: unknown transformer type %s", cfg.Format))
os.Exit(1)
logger.Error(fmt.Sprintf("Can't create transformer: unknown transformer type %s; continuing without transformer", cfg.Format))
return nil
}
}
17 changes: 17 additions & 0 deletions docker/.env
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,23 @@ MG_THINGS_AUTH_GRPC_CLIENT_CERT=${GRPC_MTLS:+./ssl/certs/things-grpc-client.crt}
MG_THINGS_AUTH_GRPC_CLIENT_KEY=${GRPC_MTLS:+./ssl/certs/things-grpc-client.key}
MG_THINGS_AUTH_GRPC_CLIENT_CA_CERTS=${GRPC_MTLS:+./ssl/certs/ca.crt}

### RE
MG_RE_LOG_LEVEL=debug
MG_RE_HTTP_HOST=re
MG_RE_HTTP_PORT=9008
MG_RE_HTTP_SERVER_CERT=
MG_RE_HTTP_SERVER_KEY=
MG_RE_DB_HOST=re-db
MG_RE_DB_PORT=5432
MG_RE_DB_USER=magistrala
MG_RE_DB_PASS=magistrala
MG_RE_DB_NAME=rule_engine
MG_RE_DB_SSL_MODE=disable
MG_RE_DB_SSL_CERT=
MG_RE_DB_SSL_KEY=
MG_RE_DB_SSL_ROOT_CERT=
MG_RE_INSTANCE_ID=

### HTTP
MG_HTTP_ADAPTER_LOG_LEVEL=debug
MG_HTTP_ADAPTER_HOST=http-adapter
Expand Down
8 changes: 8 additions & 0 deletions docker/addons/re/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Copyright (c) Abstract Machines
# SPDX-License-Identifier: Apache-2.0

# To listen all messsage broker subjects use default value "channels.>".
# To subscribe to specific subjects use values starting by "channels." and
# followed by a subtopic (e.g ["channels.<channel_id>.sub.topic.x", ...]).
[subscriber]
subjects = ["channels.>"]
92 changes: 92 additions & 0 deletions docker/addons/re/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
# Copyright (c) Abstract Machines
# SPDX-License-Identifier: Apache-2.0

# This docker-compose file contains optional Rule Egine service for Magistrala platform.
# Since these are optional, this file is dependent of docker-compose file
# from <project_root>/docker. In order to run these services, execute command:
# docker compose -f docker/docker-compose.yml -f docker/addons/re/docker-compose.yml up
# from project root. PostgreSQL port is mapped, so you can use various tools for database
# inspection and data visualization.

networks:
magistrala-base-net:
driver: bridge

volumes:
magistrala-re-db-volume:

services:
re-db:
image: postgres:16.2-alpine
container_name: magistrala-re-db
restart: on-failure
command: postgres -c "max_connections=${MG_POSTGRES_MAX_CONNECTIONS}"
environment:
POSTGRES_USER: ${MG_RE_DB_USER}
POSTGRES_PASSWORD: ${MG_RE_DB_PASS}
POSTGRES_DB: ${MG_RE_DB_NAME}
MG_POSTGRES_MAX_CONNECTIONS: ${MG_POSTGRES_MAX_CONNECTIONS}
ports:
- 6008:5432
networks:
- magistrala-base-net
volumes:
- magistrala-re-db-volume:/var/lib/postgresql/data

re:
image: ghcr.io/absmach/magistrala/re:${MG_RELEASE_TAG}
container_name: magistrala-re
depends_on:
- re-db
restart: on-failure
environment:
MG_RE_LOG_LEVEL: ${MG_RE_LOG_LEVEL}
MG_RE_HTTP_PORT: ${MG_RE_HTTP_PORT}
MG_RE_HTTP_HOST: ${MG_RE_HTTP_HOST}
MG_RE_HTTP_SERVER_CERT: ${MG_RE_HTTP_SERVER_CERT}
MG_RE_HTTP_SERVER_KEY: ${MG_RE_HTTP_SERVER_KEY}
MG_RE_DB_HOST: ${MG_RE_DB_HOST}
MG_RE_DB_PORT: ${MG_RE_DB_PORT}
MG_RE_DB_USER: ${MG_RE_DB_USER}
MG_RE_DB_PASS: ${MG_RE_DB_PASS}
MG_RE_DB_NAME: ${MG_RE_DB_NAME}
MG_RE_DB_SSL_MODE: ${MG_RE_DB_SSL_MODE}
MG_RE_DB_SSL_CERT: ${MG_RE_DB_SSL_CERT}
MG_RE_DB_SSL_KEY: ${MG_RE_DB_SSL_KEY}
MG_RE_DB_SSL_ROOT_CERT: ${MG_RE_DB_SSL_ROOT_CERT}
MG_MESSAGE_BROKER_URL: ${MG_MESSAGE_BROKER_URL}
MG_ES_URL: ${MG_ES_URL}
MG_JAEGER_URL: ${MG_JAEGER_URL}
MG_JAEGER_TRACE_RATIO: ${MG_JAEGER_TRACE_RATIO}
MG_SEND_TELEMETRY: ${MG_SEND_TELEMETRY}
MG_AUTH_GRPC_URL: ${MG_AUTH_GRPC_URL}
MG_AUTH_GRPC_TIMEOUT: ${MG_AUTH_GRPC_TIMEOUT}
MG_AUTH_GRPC_CLIENT_CERT: ${MG_AUTH_GRPC_CLIENT_CERT:+/auth-grpc-client.crt}
MG_AUTH_GRPC_CLIENT_KEY: ${MG_AUTH_GRPC_CLIENT_KEY:+/auth-grpc-client.key}
MG_AUTH_GRPC_SERVER_CA_CERTS: ${MG_AUTH_GRPC_SERVER_CA_CERTS:+/auth-grpc-server-ca.crt}
MG_SPICEDB_PRE_SHARED_KEY: ${MG_SPICEDB_PRE_SHARED_KEY}
MG_SPICEDB_HOST: ${MG_SPICEDB_HOST}
MG_SPICEDB_PORT: ${MG_SPICEDB_PORT}
MG_RE_INSTANCE_ID: ${MG_RE_INSTANCE_ID}
ports:
- ${MG_RE_HTTP_PORT}:${MG_RE_HTTP_PORT}
networks:
- magistrala-base-net
volumes:
# Auth gRPC client certificates
- type: bind
source: ${MG_AUTH_GRPC_CLIENT_CERT:-ssl/certs/dummy/client_cert}
target: /auth-grpc-client${MG_AUTH_GRPC_CLIENT_CERT:+.crt}
bind:
create_host_path: true
- type: bind
source: ${MG_AUTH_GRPC_CLIENT_KEY:-ssl/certs/dummy/client_key}
target: /auth-grpc-client${MG_AUTH_GRPC_CLIENT_KEY:+.key}
bind:
create_host_path: true
- type: bind
source: ${MG_AUTH_GRPC_SERVER_CA_CERTS:-ssl/certs/dummy/server_ca}
target: /auth-grpc-server-ca${MG_AUTH_GRPC_SERVER_CA_CERTS:+.crt}
bind:
create_host_path: true
- ./config.toml:/config.toml
1 change: 1 addition & 0 deletions docker/addons/timescale-writer/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

networks:
magistrala-base-net:
driver: bridge

volumes:
magistrala-timescale-writer-volume:
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ require (
github.com/cenkalti/backoff/v4 v4.3.0
github.com/eclipse/paho.mqtt.golang v1.5.0
github.com/fatih/color v1.18.0
github.com/go-chi/chi v1.5.5
github.com/go-chi/chi/v5 v5.1.0
github.com/go-kit/kit v0.13.0
github.com/gofrs/uuid/v5 v5.3.0
Expand Down Expand Up @@ -43,6 +44,7 @@ require (
github.com/spf13/cobra v1.8.1
github.com/spf13/viper v1.19.0
github.com/stretchr/testify v1.10.0
github.com/yuin/gopher-lua v1.1.1
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.57.0
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.57.0
go.opentelemetry.io/otel v1.33.0
Expand Down
Loading
Loading