diff --git a/Makefile b/Makefile index 21ea1ee22..d0ca3cb20 100644 --- a/Makefile +++ b/Makefile @@ -4,7 +4,7 @@ MG_DOCKER_IMAGE_NAME_PREFIX ?= ghcr.io/absmach/magistrala BUILD_DIR = build SERVICES = auth users things http coap ws postgres-writer postgres-reader timescale-writer \ - timescale-reader cli bootstrap mqtt provision certs invitations journal + timescale-reader cli bootstrap mqtt provision certs invitations journal re TEST_API_SERVICES = journal auth bootstrap certs http invitations notifiers provision readers things users TEST_API = $(addprefix test_api_,$(TEST_API_SERVICES)) DOCKERS = $(addprefix docker_,$(SERVICES)) diff --git a/cmd/re/main.go b/cmd/re/main.go new file mode 100644 index 000000000..4ce170b67 --- /dev/null +++ b/cmd/re/main.go @@ -0,0 +1,210 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +// Package main contains rule engine main function to start the service. +package main + +import ( + "context" + "fmt" + "log" + "log/slog" + "net/url" + "os" + "time" + + chclient "github.com/absmach/callhome/pkg/client" + "github.com/absmach/magistrala" + "github.com/absmach/magistrala/consumers" + redisclient "github.com/absmach/magistrala/internal/clients/redis" + mglog "github.com/absmach/magistrala/logger" + authnsvc "github.com/absmach/magistrala/pkg/authn/authsvc" + mgauthz "github.com/absmach/magistrala/pkg/authz" + authzsvc "github.com/absmach/magistrala/pkg/authz/authsvc" + "github.com/absmach/magistrala/pkg/grpcclient" + jaegerclient "github.com/absmach/magistrala/pkg/jaeger" + "github.com/absmach/magistrala/pkg/messaging/brokers" + brokerstracing "github.com/absmach/magistrala/pkg/messaging/brokers/tracing" + pgclient "github.com/absmach/magistrala/pkg/postgres" + "github.com/absmach/magistrala/pkg/server" + httpserver "github.com/absmach/magistrala/pkg/server/http" + "github.com/absmach/magistrala/pkg/uuid" + "github.com/absmach/magistrala/re" + httpapi "github.com/absmach/magistrala/re/api" + repg 
"github.com/absmach/magistrala/re/postgres" + "github.com/caarlos0/env/v11" + "github.com/jmoiron/sqlx" + "github.com/redis/go-redis/v9" + "go.opentelemetry.io/otel/trace" + "golang.org/x/sync/errgroup" +) + +const ( + svcName = "rules_engine" + envPrefixDB = "MG_RE_DB_" + envPrefixHTTP = "MG_RE_HTTP_" + envPrefixAuth = "MG_AUTH_GRPC_" + defDB = "r" + defSvcHTTPPort = "9008" +) + +type config struct { + LogLevel string `env:"MG_RE_LOG_LEVEL" envDefault:"info"` + InstanceID string `env:"MG_RE_INSTANCE_ID" envDefault:""` + JaegerURL url.URL `env:"MG_JAEGER_URL" envDefault:"http://localhost:4318/v1/traces"` + SendTelemetry bool `env:"MG_SEND_TELEMETRY" envDefault:"true"` + ESURL string `env:"MG_ES_URL" envDefault:"nats://localhost:4222"` + CacheURL string `env:"MG_RE_CACHE_URL" envDefault:"redis://localhost:6379/0"` + CacheKeyDuration time.Duration `env:"MG_RE_CACHE_KEY_DURATION" envDefault:"10m"` + TraceRatio float64 `env:"MG_JAEGER_TRACE_RATIO" envDefault:"1.0"` + ConfigPath string `env:"MG_RE_CONFIG_PATH" envDefault:"/config.toml"` + BrokerURL string `env:"MG_MESSAGE_BROKER_URL" envDefault:"nats://localhost:4222"` +} + +func main() { + ctx, cancel := context.WithCancel(context.Background()) + g, ctx := errgroup.WithContext(ctx) + + // Create new rule engine configuration + cfg := config{} + if err := env.Parse(&cfg); err != nil { + log.Fatalf("failed to load %s configuration : %s", svcName, err) + } + + var logger *slog.Logger + logger, err := mglog.New(os.Stdout, cfg.LogLevel) + if err != nil { + log.Fatalf("failed to init logger: %s", err.Error()) + } + + var exitCode int + defer mglog.ExitWithError(&exitCode) + + if cfg.InstanceID == "" { + if cfg.InstanceID, err = uuid.New().ID(); err != nil { + logger.Error(fmt.Sprintf("failed to generate instanceID: %s", err)) + exitCode = 1 + return + } + } + + // Create new database for rule engine. 
+ dbConfig := pgclient.Config{Name: defDB} + if err := env.ParseWithOptions(&dbConfig, env.Options{Prefix: envPrefixDB}); err != nil { + logger.Error(err.Error()) + exitCode = 1 + return + } + db, err := pgclient.Setup(dbConfig, *repg.Migration()) + if err != nil { + logger.Error(err.Error()) + exitCode = 1 + return + } + defer db.Close() + + tp, err := jaegerclient.NewProvider(ctx, svcName, cfg.JaegerURL, cfg.InstanceID, cfg.TraceRatio) + if err != nil { + logger.Error(fmt.Sprintf("Failed to init Jaeger: %s", err)) + exitCode = 1 + return + } + defer func() { + if err := tp.Shutdown(ctx); err != nil { + logger.Error(fmt.Sprintf("Error shutting down tracer provider: %v", err)) + } + }() + tracer := tp.Tracer(svcName) + + pubSub, err := brokers.NewPubSub(ctx, cfg.BrokerURL, logger) + if err != nil { + logger.Error(fmt.Sprintf("failed to connect to message broker: %s", err)) + exitCode = 1 + return + } + defer pubSub.Close() + + httpServerConfig := server.Config{Port: defSvcHTTPPort} + if err := env.ParseWithOptions(&httpServerConfig, env.Options{Prefix: envPrefixHTTP}); err != nil { + logger.Error(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err)) + exitCode = 1 + return + } + pubSub = brokerstracing.NewPubSub(httpServerConfig, tracer, pubSub) + + // Setup new redis cache client + cacheclient, err := redisclient.Connect(cfg.CacheURL) + if err != nil { + logger.Error(err.Error()) + exitCode = 1 + return + } + defer cacheclient.Close() + + grpcCfg := grpcclient.Config{} + if err := env.ParseWithOptions(&grpcCfg, env.Options{Prefix: envPrefixAuth}); err != nil { + logger.Error(fmt.Sprintf("failed to load auth gRPC client configuration : %s", err)) + exitCode = 1 + return + } + authn, authnClient, err := authnsvc.NewAuthentication(ctx, grpcCfg) + if err != nil { + logger.Error(err.Error()) + exitCode = 1 + return + } + defer authnClient.Close() + logger.Info("AuthN successfully connected to auth gRPC server " + authnClient.Secure()) + + 
authz, authzClient, err := authzsvc.NewAuthorization(ctx, grpcCfg) + if err != nil { + logger.Error(err.Error()) + exitCode = 1 + return + } + defer authzClient.Close() + logger.Info("AuthZ successfully connected to auth gRPC server " + authzClient.Secure()) + + svc, err := newService(ctx, db, dbConfig, authz, cacheclient, cfg.CacheKeyDuration, cfg.ESURL, tracer, logger) + if err != nil { + logger.Error(fmt.Sprintf("failed to create services: %s", err)) + exitCode = 1 + return + } + + if err = consumers.Start(ctx, svcName, pubSub, svc, cfg.ConfigPath, logger); err != nil { + logger.Error(fmt.Sprintf("failed to create Rule Engine: %s", err)) + exitCode = 1 + return + } + httpSvc := httpserver.NewServer(ctx, cancel, svcName, httpServerConfig, httpapi.MakeHandler(svc, authn, logger, cfg.InstanceID), logger) + + if cfg.SendTelemetry { + chc := chclient.New(svcName, magistrala.Version, logger, cancel) + go chc.CallHome(ctx) + } + + // Start all servers + g.Go(func() error { + return httpSvc.Start() + }) + + g.Go(func() error { + return server.StopSignalHandler(ctx, cancel, logger, svcName, httpSvc) + }) + + if err := g.Wait(); err != nil { + logger.Error(fmt.Sprintf("%s service terminated: %s", svcName, err)) + } +} + +func newService(ctx context.Context, db *sqlx.DB, dbConfig pgclient.Config, authz mgauthz.Authorization, cacheClient *redis.Client, keyDuration time.Duration, esURL string, tracer trace.Tracer, logger *slog.Logger) (re.Service, error) { + database := pgclient.NewDatabase(db, dbConfig, tracer) + repo := repg.NewRepository(database) + idp := uuid.New() + + // csvc = authzmw.AuthorizationMiddleware(csvc, authz) + csvc := re.NewService(repo, idp, nil) + + return csvc, nil +} diff --git a/consumers/messages.go b/consumers/messages.go index 0d25edf6f..6b37f081a 100644 --- a/consumers/messages.go +++ b/consumers/messages.go @@ -125,10 +125,7 @@ func loadConfig(configPath string) (config, error) { SubscriberCfg: subscriberConfig{ Subjects:
[]string{brokers.SubjectAllChannels}, }, - TransformerCfg: transformerConfig{ - Format: defFormat, - ContentType: defContentType, - }, + TransformerCfg: transformerConfig{}, } data, err := os.ReadFile(configPath) @@ -152,8 +149,7 @@ func makeTransformer(cfg transformerConfig, logger *slog.Logger) transformers.Tr logger.Info("Using JSON transformer") return json.New(cfg.TimeFields) default: - logger.Error(fmt.Sprintf("Can't create transformer: unknown transformer type %s", cfg.Format)) - os.Exit(1) + logger.Error(fmt.Sprintf("Can't create transformer: unknown transformer type %s; continuing without transformer", cfg.Format)) return nil } } diff --git a/docker/.env b/docker/.env index 305d2c062..614a11fd6 100644 --- a/docker/.env +++ b/docker/.env @@ -246,6 +246,23 @@ MG_THINGS_AUTH_GRPC_CLIENT_CERT=${GRPC_MTLS:+./ssl/certs/things-grpc-client.crt} MG_THINGS_AUTH_GRPC_CLIENT_KEY=${GRPC_MTLS:+./ssl/certs/things-grpc-client.key} MG_THINGS_AUTH_GRPC_CLIENT_CA_CERTS=${GRPC_MTLS:+./ssl/certs/ca.crt} +### RE +MG_RE_LOG_LEVEL=debug +MG_RE_HTTP_HOST=re +MG_RE_HTTP_PORT=9008 +MG_RE_HTTP_SERVER_CERT= +MG_RE_HTTP_SERVER_KEY= +MG_RE_DB_HOST=re-db +MG_RE_DB_PORT=5432 +MG_RE_DB_USER=magistrala +MG_RE_DB_PASS=magistrala +MG_RE_DB_NAME=rule_engine +MG_RE_DB_SSL_MODE=disable +MG_RE_DB_SSL_CERT= +MG_RE_DB_SSL_KEY= +MG_RE_DB_SSL_ROOT_CERT= +MG_RE_INSTANCE_ID= + ### HTTP MG_HTTP_ADAPTER_LOG_LEVEL=debug MG_HTTP_ADAPTER_HOST=http-adapter diff --git a/docker/addons/re/config.toml b/docker/addons/re/config.toml new file mode 100644 index 000000000..cd080af54 --- /dev/null +++ b/docker/addons/re/config.toml @@ -0,0 +1,8 @@ +# Copyright (c) Abstract Machines +# SPDX-License-Identifier: Apache-2.0 + +# To listen all messsage broker subjects use default value "channels.>". +# To subscribe to specific subjects use values starting by "channels." and +# followed by a subtopic (e.g ["channels..sub.topic.x", ...]). 
+[subscriber] +subjects = ["channels.>"] diff --git a/docker/addons/re/docker-compose.yml b/docker/addons/re/docker-compose.yml new file mode 100644 index 000000000..d54e34eb7 --- /dev/null +++ b/docker/addons/re/docker-compose.yml @@ -0,0 +1,92 @@ +# Copyright (c) Abstract Machines +# SPDX-License-Identifier: Apache-2.0 + +# This docker-compose file contains optional Rule Egine service for Magistrala platform. +# Since these are optional, this file is dependent of docker-compose file +# from /docker. In order to run these services, execute command: +# docker compose -f docker/docker-compose.yml -f docker/addons/re/docker-compose.yml up +# from project root. PostgreSQL port is mapped, so you can use various tools for database +# inspection and data visualization. + +networks: + magistrala-base-net: + driver: bridge + +volumes: + magistrala-re-db-volume: + +services: + re-db: + image: postgres:16.2-alpine + container_name: magistrala-re-db + restart: on-failure + command: postgres -c "max_connections=${MG_POSTGRES_MAX_CONNECTIONS}" + environment: + POSTGRES_USER: ${MG_RE_DB_USER} + POSTGRES_PASSWORD: ${MG_RE_DB_PASS} + POSTGRES_DB: ${MG_RE_DB_NAME} + MG_POSTGRES_MAX_CONNECTIONS: ${MG_POSTGRES_MAX_CONNECTIONS} + ports: + - 6008:5432 + networks: + - magistrala-base-net + volumes: + - magistrala-re-db-volume:/var/lib/postgresql/data + + re: + image: ghcr.io/absmach/magistrala/re:${MG_RELEASE_TAG} + container_name: magistrala-re + depends_on: + - re-db + restart: on-failure + environment: + MG_RE_LOG_LEVEL: ${MG_RE_LOG_LEVEL} + MG_RE_HTTP_PORT: ${MG_RE_HTTP_PORT} + MG_RE_HTTP_HOST: ${MG_RE_HTTP_HOST} + MG_RE_HTTP_SERVER_CERT: ${MG_RE_HTTP_SERVER_CERT} + MG_RE_HTTP_SERVER_KEY: ${MG_RE_HTTP_SERVER_KEY} + MG_RE_DB_HOST: ${MG_RE_DB_HOST} + MG_RE_DB_PORT: ${MG_RE_DB_PORT} + MG_RE_DB_USER: ${MG_RE_DB_USER} + MG_RE_DB_PASS: ${MG_RE_DB_PASS} + MG_RE_DB_NAME: ${MG_RE_DB_NAME} + MG_RE_DB_SSL_MODE: ${MG_RE_DB_SSL_MODE} + MG_RE_DB_SSL_CERT: ${MG_RE_DB_SSL_CERT} + MG_RE_DB_SSL_KEY: 
${MG_RE_DB_SSL_KEY} + MG_RE_DB_SSL_ROOT_CERT: ${MG_RE_DB_SSL_ROOT_CERT} + MG_MESSAGE_BROKER_URL: ${MG_MESSAGE_BROKER_URL} + MG_ES_URL: ${MG_ES_URL} + MG_JAEGER_URL: ${MG_JAEGER_URL} + MG_JAEGER_TRACE_RATIO: ${MG_JAEGER_TRACE_RATIO} + MG_SEND_TELEMETRY: ${MG_SEND_TELEMETRY} + MG_AUTH_GRPC_URL: ${MG_AUTH_GRPC_URL} + MG_AUTH_GRPC_TIMEOUT: ${MG_AUTH_GRPC_TIMEOUT} + MG_AUTH_GRPC_CLIENT_CERT: ${MG_AUTH_GRPC_CLIENT_CERT:+/auth-grpc-client.crt} + MG_AUTH_GRPC_CLIENT_KEY: ${MG_AUTH_GRPC_CLIENT_KEY:+/auth-grpc-client.key} + MG_AUTH_GRPC_SERVER_CA_CERTS: ${MG_AUTH_GRPC_SERVER_CA_CERTS:+/auth-grpc-server-ca.crt} + MG_SPICEDB_PRE_SHARED_KEY: ${MG_SPICEDB_PRE_SHARED_KEY} + MG_SPICEDB_HOST: ${MG_SPICEDB_HOST} + MG_SPICEDB_PORT: ${MG_SPICEDB_PORT} + MG_RE_INSTANCE_ID: ${MG_RE_INSTANCE_ID} + ports: + - ${MG_RE_HTTP_PORT}:${MG_RE_HTTP_PORT} + networks: + - magistrala-base-net + volumes: + # Auth gRPC client certificates + - type: bind + source: ${MG_AUTH_GRPC_CLIENT_CERT:-ssl/certs/dummy/client_cert} + target: /auth-grpc-client${MG_AUTH_GRPC_CLIENT_CERT:+.crt} + bind: + create_host_path: true + - type: bind + source: ${MG_AUTH_GRPC_CLIENT_KEY:-ssl/certs/dummy/client_key} + target: /auth-grpc-client${MG_AUTH_GRPC_CLIENT_KEY:+.key} + bind: + create_host_path: true + - type: bind + source: ${MG_AUTH_GRPC_SERVER_CA_CERTS:-ssl/certs/dummy/server_ca} + target: /auth-grpc-server-ca${MG_AUTH_GRPC_SERVER_CA_CERTS:+.crt} + bind: + create_host_path: true + - ./config.toml:/config.toml diff --git a/docker/addons/timescale-writer/docker-compose.yml b/docker/addons/timescale-writer/docker-compose.yml index 125315a44..71fcd3c50 100644 --- a/docker/addons/timescale-writer/docker-compose.yml +++ b/docker/addons/timescale-writer/docker-compose.yml @@ -10,6 +10,7 @@ networks: magistrala-base-net: + driver: bridge volumes: magistrala-timescale-writer-volume: diff --git a/go.mod b/go.mod index 655f64874..a2ba581d6 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,7 @@ require ( 
github.com/cenkalti/backoff/v4 v4.3.0 github.com/eclipse/paho.mqtt.golang v1.5.0 github.com/fatih/color v1.18.0 + github.com/go-chi/chi v1.5.5 github.com/go-chi/chi/v5 v5.1.0 github.com/go-kit/kit v0.13.0 github.com/gofrs/uuid/v5 v5.3.0 @@ -43,6 +44,7 @@ require ( github.com/spf13/cobra v1.8.1 github.com/spf13/viper v1.19.0 github.com/stretchr/testify v1.10.0 + github.com/yuin/gopher-lua v1.1.1 go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.57.0 go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.57.0 go.opentelemetry.io/otel v1.33.0 diff --git a/go.sum b/go.sum index 6509a04e6..de4c20b2e 100644 --- a/go.sum +++ b/go.sum @@ -440,6 +440,8 @@ github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e/go.mod h1:RbqR21r5mrJu github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M= +github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw= github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q= go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= diff --git a/re/README.md b/re/README.md new file mode 100644 index 000000000..5cf795011 --- /dev/null +++ b/re/README.md @@ -0,0 +1,5 @@ +# Magistrala Rule Engine + + +[doc]: https://docs.magistrala.abstractmachines.fr +[compose]: ../docker/docker-compose.yml diff --git a/re/api/doc.go b/re/api/doc.go new file mode 100644 index 000000000..2424852cc --- /dev/null +++ b/re/api/doc.go @@ -0,0 +1,6 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +// Package api contains API-related concerns: 
endpoint definitions, middlewares +// and all resource representations. +package api diff --git a/re/api/endpoints.go b/re/api/endpoints.go new file mode 100644 index 000000000..6286ba852 --- /dev/null +++ b/re/api/endpoints.go @@ -0,0 +1,97 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package api + +import ( + "context" + + "github.com/absmach/magistrala/internal/api" + "github.com/absmach/magistrala/pkg/authn" + svcerr "github.com/absmach/magistrala/pkg/errors/service" + "github.com/absmach/magistrala/re" + "github.com/go-kit/kit/endpoint" +) + +func addRuleEndpoint(s re.Service) endpoint.Endpoint { + return func(ctx context.Context, request interface{}) (interface{}, error) { + session, ok := ctx.Value(api.SessionKey).(authn.Session) + if !ok { + return nil, svcerr.ErrAuthorization + } + + req := request.(addRuleReq) + rule, err := s.AddRule(ctx, session, req.Rule) + if err != nil { + return addRuleRes{}, err + } + return addRuleRes{Rule: rule, created: true}, nil + } +} + +func viewRuleEndpoint(s re.Service) endpoint.Endpoint { + return func(ctx context.Context, request interface{}) (interface{}, error) { + session, ok := ctx.Value(api.SessionKey).(authn.Session) + if !ok { + return nil, svcerr.ErrAuthorization + } + + req := request.(viewRuleReq) + rule, err := s.ViewRule(ctx, session, req.id) + if err != nil { + return viewRuleRes{}, err + } + return viewRuleRes{Rule: rule}, nil + } +} + +func updateRuleEndpoint(s re.Service) endpoint.Endpoint { + return func(ctx context.Context, request interface{}) (interface{}, error) { + session, ok := ctx.Value(api.SessionKey).(authn.Session) + if !ok { + return nil, svcerr.ErrAuthorization + } + + req := request.(updateRuleReq) + rule, err := s.UpdateRule(ctx, session, req.Rule) + if err != nil { + return updateRuleRes{}, err + } + return updateRuleRes{Rule: rule}, nil + } +} + +func listRulesEndpoint(s re.Service) endpoint.Endpoint { + return func(ctx context.Context, request 
interface{}) (interface{}, error) { + session, ok := ctx.Value(api.SessionKey).(authn.Session) + if !ok { + return nil, svcerr.ErrAuthorization + } + + req := request.(listRulesReq) + page, err := s.ListRules(ctx, session, req.PageMeta) + if err != nil { + return rulesPageRes{}, err + } + ret := rulesPageRes{ + Rules: page.Rules, + } + return ret, nil + } +} + +func upadateRuleStatusEndpoint(s re.Service) endpoint.Endpoint { + return func(ctx context.Context, request interface{}) (interface{}, error) { + session, ok := ctx.Value(api.SessionKey).(authn.Session) + if !ok { + return nil, svcerr.ErrAuthorization + } + + req := request.(changeRuleStatusReq) + err := s.RemoveRule(ctx, session, req.id) + if err != nil { + return changeRoleStatusRes{false}, err + } + return changeRoleStatusRes{true}, nil + } +} diff --git a/re/api/requests.go b/re/api/requests.go new file mode 100644 index 000000000..da7952f23 --- /dev/null +++ b/re/api/requests.go @@ -0,0 +1,75 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package api + +import ( + "github.com/absmach/magistrala/internal/api" + "github.com/absmach/magistrala/pkg/apiutil" + "github.com/absmach/magistrala/re" +) + +const maxLimitSize = 1000 + +type addRuleReq struct { + re.Rule +} + +func (req addRuleReq) validate() error { + return nil +} + +type viewRuleReq struct { + id string +} + +func (req viewRuleReq) validate() error { + if req.id == "" { + return apiutil.ErrMissingID + } + + return nil +} + +type listRulesReq struct { + re.PageMeta +} + +func (req listRulesReq) validate() error { + if req.Limit > maxLimitSize { + return apiutil.ErrLimitSize + } + if req.Dir != "" && (req.Dir != api.AscDir && req.Dir != api.DescDir) { + return apiutil.ErrInvalidDirection + } + + return nil +} + +type updateRuleReq struct { + Rule re.Rule `json:",inline"` +} + +func (req updateRuleReq) validate() error { + if req.Rule.ID == "" { + return apiutil.ErrMissingID + } + if len(req.Rule.Logic.Value) == 0
{ + return apiutil.ErrEmptyList + } + + return nil +} + +type changeRuleStatusReq struct { + id string + status re.Status +} + +func (req changeRuleStatusReq) validate() error { + if req.id == "" { + return apiutil.ErrMissingID + } + + return nil +} diff --git a/re/api/responses.go b/re/api/responses.go new file mode 100644 index 000000000..16f30a721 --- /dev/null +++ b/re/api/responses.go @@ -0,0 +1,139 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package api + +import ( + "fmt" + "net/http" + + "github.com/absmach/magistrala" + "github.com/absmach/magistrala/re" +) + +var ( + _ magistrala.Response = (*viewRuleRes)(nil) + _ magistrala.Response = (*addRuleRes)(nil) + _ magistrala.Response = (*changeRuleStatusRes)(nil) + _ magistrala.Response = (*rulesPageRes)(nil) + _ magistrala.Response = (*updateRuleRes)(nil) + _ magistrala.Response = (*changeRoleStatusRes)(nil) +) + +type pageRes struct { + Limit uint64 `json:"limit,omitempty"` + Offset uint64 `json:"offset"` + Total uint64 `json:"total"` +} + +type addRuleRes struct { + re.Rule + created bool +} + +func (res addRuleRes) Code() int { + if res.created { + return http.StatusCreated + } + + return http.StatusOK +} + +func (res addRuleRes) Headers() map[string]string { + if res.created { + return map[string]string{ + "Location": fmt.Sprintf("/rules/%s", res.ID), + } + } + + return map[string]string{} +} + +func (res addRuleRes) Empty() bool { + return false +} + +type updateRuleRes struct { + re.Rule `json:",inline"` +} + +func (res updateRuleRes) Code() int { + return http.StatusOK +} + +func (res updateRuleRes) Headers() map[string]string { + return map[string]string{} +} + +func (res updateRuleRes) Empty() bool { + return false +} + +type viewRuleRes struct { + re.Rule `json:",inline"` +} + +func (res viewRuleRes) Code() int { + return http.StatusOK +} + +func (res viewRuleRes) Headers() map[string]string { + return map[string]string{} +} + +func (res viewRuleRes) Empty() bool 
{ + return false +} + +type rulesPageRes struct { + pageRes + Rules []re.Rule `json:"rules"` +} + +func (res rulesPageRes) Code() int { + return http.StatusOK +} + +func (res rulesPageRes) Headers() map[string]string { + return map[string]string{} +} + +func (res rulesPageRes) Empty() bool { + return false +} + +type changeRuleStatusRes struct { + re.Rule `json:",inline"` +} + +func (res changeRuleStatusRes) Code() int { + return http.StatusOK +} + +func (res changeRuleStatusRes) Headers() map[string]string { + return map[string]string{} +} + +func (res changeRuleStatusRes) Empty() bool { + return false +} + +type changeRoleStatusRes struct { + deleted bool +} + +func (res changeRoleStatusRes) Code() int { + if res.deleted { + return http.StatusNoContent + } + + return http.StatusOK +} + +func (res changeRoleStatusRes) Headers() map[string]string { + return map[string]string{} +} + +func (res changeRoleStatusRes) Empty() bool { + return true +} diff --git a/re/api/transport.go b/re/api/transport.go new file mode 100644 index 000000000..848bebe58 --- /dev/null +++ b/re/api/transport.go @@ -0,0 +1,147 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package api + +import ( + "context" + "encoding/json" + "log/slog" + "net/http" + "strings" + + "github.com/absmach/magistrala" + "github.com/absmach/magistrala/internal/api" + "github.com/absmach/magistrala/invitations" + "github.com/absmach/magistrala/pkg/apiutil" + mgauthn "github.com/absmach/magistrala/pkg/authn" + "github.com/absmach/magistrala/pkg/errors" + "github.com/absmach/magistrala/re" + "github.com/go-chi/chi" + kithttp "github.com/go-kit/kit/transport/http" + "github.com/prometheus/client_golang/prometheus/promhttp" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" +) + +const ( + idKey = "ruleID" + inputChannelKey = "input_channel" + outputChannelKey = "output_channel" + statusKey = "status" +) + +// MakeHandler creates an HTTP handler for the service 
endpoints. +func MakeHandler(svc re.Service, authn mgauthn.Authentication, logger *slog.Logger, instanceID string) http.Handler { + opts := []kithttp.ServerOption{ + kithttp.ServerErrorEncoder(apiutil.LoggingErrorEncoder(logger, api.EncodeError)), + } + mux := chi.NewRouter() + mux.Group(func(r chi.Router) { + r.Use(api.AuthenticateMiddleware(authn, true)) + r.Route("/{domainID}/rules", func(r chi.Router) { + r.Post("/", otelhttp.NewHandler(kithttp.NewServer( + addRuleEndpoint(svc), + decodeAddRuleRequest, + api.EncodeResponse, + opts..., + ), "create_rule").ServeHTTP) + + r.Get("/{ruleID}", otelhttp.NewHandler(kithttp.NewServer( + viewRuleEndpoint(svc), + decodeViewRuleRequest, + api.EncodeResponse, + opts..., + ), "view_rule").ServeHTTP) + + r.Get("/", otelhttp.NewHandler(kithttp.NewServer( + listRulesEndpoint(svc), + decodeListRulesRequest, + api.EncodeResponse, + opts..., + ), "list_rules").ServeHTTP) + + r.Put("/{ruleID}", otelhttp.NewHandler(kithttp.NewServer( + updateRuleEndpoint(svc), + decodeUpdateRuleRequest, + api.EncodeResponse, + opts..., + ), "update_rule").ServeHTTP) + + r.Put("/{ruleID}/status", otelhttp.NewHandler(kithttp.NewServer( + upadateRuleStatusEndpoint(svc), + decodeUpdateRuleStatusRequest, + api.EncodeResponse, + opts..., + ), "update_rule_status").ServeHTTP) + }) + }) + + mux.Get("/health", magistrala.Health("rule_engine", instanceID)) + mux.Handle("/metrics", promhttp.Handler()) + + return mux +} + +func decodeAddRuleRequest(_ context.Context, r *http.Request) (interface{}, error) { + if !strings.Contains(r.Header.Get("Content-Type"), api.ContentType) { + return nil, errors.Wrap(apiutil.ErrValidation, apiutil.ErrUnsupportedContentType) + } + var rule re.Rule + if err := json.NewDecoder(r.Body).Decode(&rule); err != nil { + return nil, err + } + return addRuleReq{Rule: rule}, nil +} + +func decodeViewRuleRequest(_ context.Context, r *http.Request) (interface{}, error) { + id := chi.URLParam(r, idKey) + return viewRuleReq{id: id}, nil +} + 
+func decodeUpdateRuleRequest(_ context.Context, r *http.Request) (interface{}, error) { + var rule re.Rule + if err := json.NewDecoder(r.Body).Decode(&rule); err != nil { + return nil, err + } + return updateRuleReq{Rule: rule}, nil +} + +func decodeListRulesRequest(_ context.Context, r *http.Request) (interface{}, error) { + offset, err := apiutil.ReadNumQuery[uint64](r, api.OffsetKey, api.DefOffset) + if err != nil { + return nil, errors.Wrap(apiutil.ErrValidation, err) + } + limit, err := apiutil.ReadNumQuery[uint64](r, api.LimitKey, api.DefLimit) + if err != nil { + return nil, errors.Wrap(apiutil.ErrValidation, err) + } + ic, err := apiutil.ReadStringQuery(r, inputChannelKey, "") + if err != nil { + return nil, errors.Wrap(apiutil.ErrValidation, err) + } + oc, err := apiutil.ReadStringQuery(r, outputChannelKey, "") + if err != nil { + return nil, errors.Wrap(apiutil.ErrValidation, err) + } + return listRulesReq{ + PageMeta: re.PageMeta{ + Offset: offset, + Limit: limit, + InputChannel: ic, + OutputChannel: oc, + }, + }, nil +} + +func decodeUpdateRuleStatusRequest(_ context.Context, r *http.Request) (interface{}, error) { + id := r.URL.Query().Get(idKey) + status, err := apiutil.ReadStringQuery(r, statusKey, invitations.All.String()) + if err != nil { + return nil, errors.Wrap(apiutil.ErrValidation, err) + } + s, err := re.ToStatus(status) + if err != nil { + return nil, errors.Wrap(apiutil.ErrValidation, err) + } + return changeRuleStatusReq{id: id, status: s}, nil +} diff --git a/re/doc.go b/re/doc.go new file mode 100644 index 000000000..2c28de3bd --- /dev/null +++ b/re/doc.go @@ -0,0 +1,6 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +// Package re contain the domain concept definitions needed to +// support Magistrala Rule Egine services functionality. 
+package re diff --git a/re/postgres/init.go b/re/postgres/init.go new file mode 100644 index 000000000..5fb5745a9 --- /dev/null +++ b/re/postgres/init.go @@ -0,0 +1,45 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package postgres + +import ( + _ "github.com/jackc/pgx/v5/stdlib" // required for SQL access + migrate "github.com/rubenv/sql-migrate" +) + +func Migration() *migrate.MemoryMigrationSource { + return &migrate.MemoryMigrationSource{ + Migrations: []*migrate.Migration{ + { + Id: "rules_01", + // VARCHAR(36) for columns with IDs as UUIDS have a maximum of 36 characters + // STATUS 0 to imply enabled and 1 to imply disabled + Up: []string{ + `CREATE TABLE IF NOT EXISTS rules ( + id VARCHAR(36) PRIMARY KEY, + name VARCHAR(1024), + domain_id VARCHAR(36) NOT NULL, + metadata JSONB, + created_at TIMESTAMP, + updated_at TIMESTAMP, + updated_by VARCHAR(254), + input_channel VARCHAR(36), + input_topic TEXT, + output_channel VARCHAR(36), + output_topic TEXT, + status SMALLINT NOT NULL DEFAULT 0 CHECK (status >= 0), + logic_type SMALLINT NOT NULL DEFAULT 0 CHECK (logic_type >= 0), + logic_value BYTEA, + recurring_time TIMESTAMP[], + recurring_type SMALLINT, + recurring_period SMALLINT + )`, + }, + Down: []string{ + `DROP TABLE IF EXISTS rules`, + }, + }, + }, + } +} diff --git a/re/postgres/repository.go b/re/postgres/repository.go new file mode 100644 index 000000000..755aa1701 --- /dev/null +++ b/re/postgres/repository.go @@ -0,0 +1,174 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package postgres + +import ( + "context" + "fmt" + "strings" + + "github.com/absmach/magistrala/pkg/errors" + repoerr "github.com/absmach/magistrala/pkg/errors/repository" + "github.com/absmach/magistrala/pkg/postgres" + "github.com/absmach/magistrala/re" +) + +// SQL Queries as Strings +const ( + addRuleQuery = ` + INSERT INTO rules (id, domain_id, input_channel, input_topic, logic_type, logic_value, + output_channel,
output_topic, recurring_time, recurring_type, recurring_period, status) + VALUES (:id, :domain_id, :input_channel, :input_topic, :logic_type, :logic_value, + :output_channel, :output_topic, :recurring_time, :recurring_type, :recurring_period, :status) + RETURNING id; + ` + + viewRuleQuery = ` + SELECT id, domain_id, input_channel, input_topic, logic_type, logic_value, output_channel, + output_topic, recurring_time, recurring_type, recurring_period, status + FROM rules + WHERE id = $1; + ` + + updateRuleQuery = ` + UPDATE rules + SET input_channel = :input_channel, input_topic = :input_topic, logic_type = :logic_type, + logic_value = :logic_value, output_channel = :output_channel, output_topic = :output_topic, + recurring_time = :recurring_time, recurring_type = :recurring_type, + recurring_period = :recurring_period, status = :status + WHERE id = :id; + ` + + removeRuleQuery = ` + DELETE FROM rules + WHERE id = $1; + ` + + listRulesQuery = ` + SELECT id, domain_id, input_channel, input_topic, logic_type, logic_value, output_channel, + output_topic, recurring_time, recurring_type, recurring_period, status + FROM rules r %s %s; + ` + + totalQuery = `SELECT COUNT(*) FROM rules r %s;` +) + +type PostgresRepository struct { + DB postgres.Database +} + +func NewRepository(db postgres.Database) re.Repository { + return &PostgresRepository{DB: db} +} + +func (repo *PostgresRepository) AddRule(ctx context.Context, r re.Rule) (re.Rule, error) { + dbr := ruleToDb(r) + _, err := repo.DB.NamedExecContext(ctx, addRuleQuery, dbr) + if err != nil { + return re.Rule{}, err + } + return r, nil +} + +func (repo *PostgresRepository) ViewRule(ctx context.Context, id string) (re.Rule, error) { + row := repo.DB.QueryRowxContext(ctx, viewRuleQuery, id) + if err := row.Err(); err != nil { + return re.Rule{}, err + } + var dbr dbRule + err := row.StructScan(&dbr) + if err != nil { + return re.Rule{}, err + } + ret := dbToRule(dbr) + + return ret, nil +} + +func (repo *PostgresRepository) 
UpdateRule(ctx context.Context, r re.Rule) (re.Rule, error) { + dbr := ruleToDb(r) + result, err := repo.DB.NamedExecContext(ctx, updateRuleQuery, dbr) + if err != nil { + return re.Rule{}, err + } + + if count, err := result.RowsAffected(); err != nil || count == 0 { + return re.Rule{}, repoerr.ErrNotFound + } + + return r, nil +} + +func (repo *PostgresRepository) RemoveRule(ctx context.Context, id string) error { + result, err := repo.DB.ExecContext(ctx, removeRuleQuery, id) + if err != nil { + return err + } + + if count, err := result.RowsAffected(); err != nil || count == 0 { + return repoerr.ErrNotFound + } + + return nil +} + +func (repo *PostgresRepository) ListRules(ctx context.Context, pm re.PageMeta) (re.Page, error) { + pgData := "" + if pm.Limit != 0 { + pgData = "LIMIT :limit" + } + if pm.Offset != 0 { + pgData += " OFFSET :offset" + } + pq := pageQuery(pm) + q := fmt.Sprintf(listRulesQuery, pq, pgData) + rows, err := repo.DB.NamedQueryContext(ctx, q, pm) + if err != nil { + return re.Page{}, err + } + defer rows.Close() + + var rules []re.Rule + var r dbRule + for rows.Next() { + if err := rows.StructScan(&r); err != nil { + return re.Page{}, errors.Wrap(repoerr.ErrViewEntity, err) + } + rules = append(rules, dbToRule(r)) + } + + cq := fmt.Sprintf(totalQuery, pq) + + total, err := postgres.Total(ctx, repo.DB, cq, pm) + if err != nil { + return re.Page{}, errors.Wrap(repoerr.ErrViewEntity, err) + } + pm.Total = total + ret := re.Page{ + PageMeta: pm, + Rules: rules, + } + + return ret, nil +} + +func pageQuery(pm re.PageMeta) string { + var query []string + if pm.InputChannel != "" { + query = append(query, "r.input_channel = :input_channel") + } + if pm.OutputChannel != "" { + query = append(query, "r.output_channel = :output_channel") + } + if pm.Status != re.AllStatus { + query = append(query, "r.status = :status") + } + + var q string + if len(query) > 0 { + q = fmt.Sprintf("WHERE %s", strings.Join(query, " AND ")) + } + + return q +} diff --git a/re/postgres/rule.go
b/re/postgres/rule.go new file mode 100644 index 000000000..3edfb6563 --- /dev/null +++ b/re/postgres/rule.go @@ -0,0 +1,118 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package postgres + +import ( + "database/sql" + "time" + + "github.com/absmach/magistrala/re" + "github.com/jackc/pgx/v5/pgtype" +) + +// dbRule represents the database structure for a Rule. +type dbRule struct { + ID string `db:"id"` + DomainID string `db:"domain_id"` + InputChannel string `db:"input_channel"` + InputTopic sql.NullString `db:"input_topic"` + LogicType re.ScriptType `db:"logic_type"` + LogicValue string `db:"logic_value"` + OutputChannel sql.NullString `db:"output_channel"` + OutputTopic sql.NullString `db:"output_topic"` + RecurringTime *pgtype.Array[string] `db:"recurring_time"` + RecurringType re.ReccuringType `db:"recurring_type"` + RecurringPeriod uint `db:"recurring_period"` + Status re.Status `db:"status"` + CreatedAt time.Time `db:"created_at"` + CreatedBy string `db:"created_by"` + UpdatedAt time.Time `db:"updated_at"` + UpdatedBy string `db:"updated_by"` +} + +func ruleToDb(r re.Rule) dbRule { + return dbRule{ + ID: r.ID, + DomainID: r.DomainID, + InputChannel: r.InputChannel, + InputTopic: toNullString(r.InputTopic), + LogicType: r.Logic.Type, + LogicValue: r.Logic.Value, + OutputChannel: toNullString(r.OutputChannel), + OutputTopic: toNullString(r.OutputTopic), + RecurringTime: toStringArray(r.Schedule.Time), + RecurringType: r.Schedule.RecurringType, + RecurringPeriod: r.Schedule.RecurringPeriod, + Status: r.Status, + CreatedAt: r.CreatedAt, + CreatedBy: r.CreatedBy, + UpdatedAt: r.UpdatedAt, + UpdatedBy: r.UpdatedBy, + } +} + +func dbToRule(dto dbRule) re.Rule { + return re.Rule{ + ID: dto.ID, + DomainID: dto.DomainID, + InputChannel: dto.InputChannel, + InputTopic: fromNullString(dto.InputTopic), + Logic: re.Script{ + Type: dto.LogicType, + Value: dto.LogicValue, + }, + OutputChannel: fromNullString(dto.OutputChannel), + 
OutputTopic: fromNullString(dto.OutputTopic), + Schedule: re.Schedule{ + Time: toTimeSlice(dto.RecurringTime), + RecurringType: dto.RecurringType, + RecurringPeriod: dto.RecurringPeriod, + }, + Status: re.Status(dto.Status), + CreatedAt: dto.CreatedAt, + CreatedBy: dto.CreatedBy, + UpdatedAt: dto.UpdatedAt, + UpdatedBy: dto.UpdatedBy, + } +} + +func toNullString(value string) sql.NullString { + if value == "" { + return sql.NullString{Valid: false} + } + return sql.NullString{String: value, Valid: true} +} + +func fromNullString(nullString sql.NullString) string { + if !nullString.Valid { + return "" + } + return nullString.String +} + +func toStringArray(times []time.Time) *pgtype.Array[string] { + var strArray []string + for _, t := range times { + strArray = append(strArray, t.Format(time.RFC3339)) + } + ret := pgtype.Array[string]{ + Elements: strArray, + Valid: true, + } + return &ret +} + +func toTimeSlice(strArray *pgtype.Array[string]) []time.Time { + if strArray == nil || !strArray.Valid { + return []time.Time{} + } + var times []time.Time + for _, s := range strArray.Elements { + t, err := time.Parse(time.RFC3339, s) + if err == nil { + times = append(times, t) + } + } + return times +} diff --git a/re/service.go b/re/service.go new file mode 100644 index 000000000..02ba36044 --- /dev/null +++ b/re/service.go @@ -0,0 +1,201 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package re + +import ( + "context" + "time" + + "github.com/absmach/magistrala" + "github.com/absmach/magistrala/consumers" + "github.com/absmach/magistrala/pkg/authn" + "github.com/absmach/magistrala/pkg/messaging" + mgjson "github.com/absmach/magistrala/pkg/transformers/json" + lua "github.com/yuin/gopher-lua" +) + +type ScriptType uint + +type Script struct { + Type ScriptType `json:"type"` + Value string `json:"value"` +} + +// daily, weekly or monthly +type ReccuringType uint + +const ( + None ReccuringType = iota + Daily + Weekly + Monthly +) + 
+type Schedule struct { + Time []time.Time `json:"date,omitempty"` + RecurringType ReccuringType + RecurringPeriod uint // 1 meaning every Recurring value, 2 meaning every other, and so on. +} + +type Rule struct { + ID string `json:"id"` + DomainID string `json:"domain"` + InputChannel string `json:"input_channel"` + InputTopic string `json:"input_topic"` + Logic Script `json:"logic"` + OutputChannel string `json:"output_channel,omitempty"` + OutputTopic string `json:"output_topic,omitempty"` + Schedule Schedule `json:"schedule,omitempty"` + Status Status `json:"status"` + CreatedAt time.Time `json:"created_at,omitempty"` + CreatedBy string `json:"created_by,omitempty"` + UpdatedAt time.Time `json:"updated_at,omitempty"` + UpdatedBy string `json:"updated_by,omitempty"` +} + +type Repository interface { + AddRule(ctx context.Context, r Rule) (Rule, error) + ViewRule(ctx context.Context, id string) (Rule, error) + UpdateRule(ctx context.Context, r Rule) (Rule, error) + RemoveRule(ctx context.Context, id string) error + ListRules(ctx context.Context, pm PageMeta) (Page, error) +} + +// PageMeta contains page metadata that helps navigation. 
+type PageMeta struct { + Total uint64 `json:"total" db:"total"` + Offset uint64 `json:"offset" db:"offset"` + Limit uint64 `json:"limit" db:"limit"` + Dir string `json:"dir" db:"dir"` + Name string `json:"name" db:"name"` + InputChannel string `json:"input_channel,omitempty" db:"input_channel"` + OutputChannel string `json:"output_channel,omitempty" db:"output_channel"` + Status Status `json:"status,omitempty" db:"status"` +} + +type Page struct { + PageMeta + Rules []Rule `json:"rules"` +} + +type Service interface { + consumers.AsyncConsumer + AddRule(ctx context.Context, session authn.Session, r Rule) (Rule, error) + ViewRule(ctx context.Context, session authn.Session, id string) (Rule, error) + UpdateRule(ctx context.Context, session authn.Session, r Rule) (Rule, error) + ListRules(ctx context.Context, session authn.Session, pm PageMeta) (Page, error) + RemoveRule(ctx context.Context, session authn.Session, id string) error +} + +type re struct { + idp magistrala.IDProvider + repo Repository + pubSub messaging.PubSub + errors chan error +} + +func NewService(repo Repository, idp magistrala.IDProvider, pubSub messaging.PubSub) Service { + return &re{ + repo: repo, + idp: idp, + pubSub: pubSub, + errors: make(chan error), + } +} + +func (re *re) AddRule(ctx context.Context, session authn.Session, r Rule) (Rule, error) { + id, err := re.idp.ID() + if err != nil { + return Rule{}, err + } + r.CreatedAt = time.Now() + r.ID = id + r.CreatedBy = session.UserID + r.DomainID = session.DomainID + r.Status = EnabledStatus + return re.repo.AddRule(ctx, r) +} + +func (re *re) ViewRule(ctx context.Context, session authn.Session, id string) (Rule, error) { + return re.repo.ViewRule(ctx, id) +} + +func (re *re) UpdateRule(ctx context.Context, session authn.Session, r Rule) (Rule, error) { + return re.repo.UpdateRule(ctx, r) +} + +func (re *re) ListRules(ctx context.Context, session authn.Session, pm PageMeta) (Page, error) { + return re.repo.ListRules(ctx, pm) +} + +func (re 
*re) RemoveRule(ctx context.Context, session authn.Session, id string) error { + return re.repo.RemoveRule(ctx, id) +} + +func (re *re) ConsumeAsync(ctx context.Context, msgs interface{}) { + switch m := msgs.(type) { + case *messaging.Message: + pm := PageMeta{ + InputChannel: m.Channel, + Status: EnabledStatus, + } + page, err := re.repo.ListRules(ctx, pm) + if err != nil { + re.errors <- err + return + } + for _, r := range page.Rules { + go re.process(r, m) + } + case mgjson.Message: + default: + } +} + +func (re *re) Errors() <-chan error { + return re.errors +} + +func (re *re) process(r Rule, msg *messaging.Message) error { + l := lua.NewState() + defer l.Close() + + message := l.NewTable() + + l.RawSet(message, lua.LString("channel"), lua.LString(msg.Channel)) + l.RawSet(message, lua.LString("subtopic"), lua.LString(msg.Subtopic)) + l.RawSet(message, lua.LString("publisher"), lua.LString(msg.Publisher)) + l.RawSet(message, lua.LString("protocol"), lua.LString(msg.Protocol)) + l.RawSet(message, lua.LString("created"), lua.LNumber(msg.Created)) + + pld := l.NewTable() + for i, b := range msg.Payload { + l.RawSet(pld, lua.LNumber(i+1), lua.LNumber(b)) // Lua tables are 1-indexed + } + l.RawSet(message, lua.LString("payload"), pld) + + // Set the message object as a Lua global variable. 
+ l.SetGlobal("message", message) + + if err := l.DoString(string(r.Logic.Value)); err != nil { + return err + } + + result := l.Get(-1) // Get the last result + switch result { + case lua.LNil: + return nil + default: + if len(r.OutputChannel) == 0 { + return nil + } + m := &messaging.Message{ + Publisher: "magistrala.re", + Created: time.Now().Unix(), + Payload: []byte(result.String()), + } + re.pubSub.Publish(context.Background(), m.Channel, m) + } + return nil +} diff --git a/re/status.go b/re/status.go new file mode 100644 index 000000000..5042893bf --- /dev/null +++ b/re/status.go @@ -0,0 +1,80 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package re + +import ( + "encoding/json" + "strings" + + svcerr "github.com/absmach/magistrala/pkg/errors/service" +) + +// Status represents Rule status. +type Status uint8 + +// Possible User status values. +const ( + // EnabledStatus represents enabled Rule. + EnabledStatus Status = iota + // DisabledStatus represents disabled Rule. + DisabledStatus + // DeletedStatus represents a rule that will be deleted. + DeletedStatus + + // AllStatus is used for querying purposes to list rules irrespective + // of their status - both enabled and disabled. It is never stored in the + // database as the actual User status and should always be the largest + // value in this enumeration. + AllStatus +) + +// String representation of the possible status values. +const ( + Disabled = "disabled" + Enabled = "enabled" + Deleted = "deleted" + All = "all" + Unknown = "unknown" +) + +func (s Status) String() string { + switch s { + case DisabledStatus: + return Disabled + case EnabledStatus: + return Enabled + case DeletedStatus: + return Deleted + case AllStatus: + return All + default: + return Unknown + } +} + +// ToStatus converts string value to a valid status. 
+func ToStatus(status string) (Status, error) { + switch status { + case "", Enabled: + return EnabledStatus, nil + case Disabled: + return DisabledStatus, nil + case Deleted: + return DeletedStatus, nil + case All: + return AllStatus, nil + } + return Status(0), svcerr.ErrInvalidStatus +} + +func (s Status) MarshalJSON() ([]byte, error) { + return json.Marshal(s.String()) +} + +func (s *Status) UnmarshalJSON(data []byte) error { + str := strings.Trim(string(data), "\"") + val, err := ToStatus(str) + *s = val + return err +}