diff --git a/.github/workflows/check-generated-files.yml b/.github/workflows/check-generated-files.yml index 49c4c00d953..113547f126d 100644 --- a/.github/workflows/check-generated-files.yml +++ b/.github/workflows/check-generated-files.yml @@ -51,6 +51,7 @@ jobs: - "auth/policies.go" - "pkg/events/events.go" - "provision/service.go" + - "eventlogs/events.go" - name: Set up protoc if: steps.changes.outputs.proto == 'true' @@ -111,6 +112,8 @@ jobs: mv ./pkg/events/mocks/publisher.go ./pkg/events/mocks/publisher.go.tmp mv ./pkg/events/mocks/subscriber.go ./pkg/events/mocks/subscriber.go.tmp mv ./provision/mocks/service.go ./provision/mocks/service.go.tmp + mv ./eventlogs/mocks/repository.go ./eventlogs/mocks/repository.go.tmp + mv ./eventlogs/mocks/service.go ./eventlogs/mocks/service.go.tmp make mocks @@ -140,3 +143,5 @@ jobs: check_mock_changes ./pkg/events/mocks/publisher.go "ES Publisher ./pkg/events/mocks/publisher.go" check_mock_changes ./pkg/events/mocks/subscriber.go "EE Subscriber ./pkg/events/mocks/subscriber.go" check_mock_changes ./provision/mocks/service.go "Provision Service ./provision/mocks/service.go" + check_mock_changes ./eventlogs/mocks/repository.go "Event Logs Repository ./eventlogs/mocks/repository.go" + check_mock_changes ./eventlogs/mocks/service.go "Event Logs Service ./eventlogs/mocks/service.go" diff --git a/Makefile b/Makefile index f43285ff587..0b3fd15f9a2 100644 --- a/Makefile +++ b/Makefile @@ -5,7 +5,7 @@ MG_DOCKER_IMAGE_NAME_PREFIX ?= magistrala BUILD_DIR = build SERVICES = auth users things http coap ws lora influxdb-writer influxdb-reader mongodb-writer \ mongodb-reader cassandra-writer cassandra-reader postgres-writer postgres-reader timescale-writer timescale-reader cli \ - bootstrap opcua twins mqtt provision certs smtp-notifier smpp-notifier invitations + bootstrap opcua twins mqtt provision certs smtp-notifier smpp-notifier invitations event-logs DOCKERS = $(addprefix docker_,$(SERVICES)) DOCKERS_DEV = $(addprefix docker_dev_,$(SERVICES)) CGO_ENABLED ?= 0 @@ -78,7 +78,7 @@ endef ADDON_SERVICES = bootstrap cassandra-reader cassandra-writer certs \ influxdb-reader influxdb-writer lora-adapter mongodb-reader mongodb-writer \ opcua-adapter postgres-reader postgres-writer provision smpp-notifier smtp-notifier \ - timescale-reader timescale-writer twins + timescale-reader timescale-writer twins event-logs EXTERNAL_SERVICES = vault prometheus diff --git a/api/openapi/event-logs.yml b/api/openapi/event-logs.yml new file mode 100644 index 00000000000..681a72a92d0 --- /dev/null +++ b/api/openapi/event-logs.yml @@ -0,0 +1,278 @@ +# Copyright (c) Abstract Machines +# SPDX-License-Identifier: Apache-2.0 + +openapi: 3.0.3 +info: + title: Magistrala Event Logs Service + description: | + This is the Event Logs Server based on the OpenAPI 3.0 specification. It is the HTTP API for viewing event logs history. You can now help us improve the API whether it's by making changes to the definition itself or to the code. + Some useful links: + - [The Magistrala repository](https://github.com/absmach/magistrala) + contact: + email: info@mainflux.com + license: + name: Apache 2.0 + url: https://github.com/absmach/magistrala/blob/master/LICENSE + version: 0.14.0 + +servers: + - url: http://localhost:9021 + - url: https://localhost:9021 + +tags: + - name: event-logs + description: Everything about your Event Logs + externalDocs: + description: Find out more about Event Logs + url: http://docs.mainflux.io/ + +paths: + /events/{id}{type}: + get: + tags: + - event-logs + summary: List event logs + description: | + Retrieves a list of events. Due to performance concerns, data + is retrieved in subsets. The API must ensure that the entire + dataset is consumed either by making subsequent requests, or by + increasing the subset size of the initial request. + parameters: + - $ref: "#/components/parameters/id" + - $ref: "#/components/parameters/type" + - $ref: "#/components/parameters/limit" + - $ref: "#/components/parameters/offset" + - $ref: "#/components/parameters/operation" + - $ref: "#/components/parameters/from" + - $ref: "#/components/parameters/to" + security: + - bearerAuth: [] + responses: + "200": + $ref: "#/components/responses/EventPageRes" + "400": + description: Failed due to malformed query parameters. + "401": + description: | + Missing or invalid access token provided. + This endpoint is available only for administrators. + "404": + description: A non-existent entity request. + "422": + description: Database can't process request. + "500": + $ref: "#/components/responses/ServiceError" + + /health: + get: + summary: Retrieves service health check info. + tags: + - health + responses: + "200": + $ref: "#/components/responses/HealthRes" + "500": + $ref: "#/components/responses/ServiceError" + +components: + schemas: + Event: + type: object + properties: + id: + type: string + format: uuid + example: bb7edb32-2eac-4aad-aebe-ed96fe073879 + description: Entity unique identifier. + operation: + type: string + example: users.create + description: Event operation. + occurred_at: + type: string + format: date-time + example: "2024-01-11T12:05:07.449053Z" + description: Time when the event occurred. + payload: + type: object + description: Event payload. + example: + { + "created_at": "2024-01-11T12:05:07.435357Z", + "metadata": "e30=", + "name": "Moors-Mlacak", + "owner": "599a7f51-3f16-41b2-8224-3b1044cea83b", + "status": "enabled", + } + xml: + name: event + + EventPage: + type: object + properties: + events: + type: array + minItems: 0 + uniqueItems: true + items: + $ref: "#/components/schemas/Event" + total: + type: integer + example: 1 + description: Total number of items. + offset: + type: integer + description: Number of items to skip during retrieval. + limit: + type: integer + example: 10 + description: Maximum number of items to return in one page. + required: + - events + - total + - offset + + Error: + type: object + properties: + error: + type: string + description: Error message + example: { "error": "malformed entity specification" } + + HealthRes: + type: object + properties: + status: + type: string + description: Service status. + enum: + - pass + version: + type: string + description: Service version. + example: 0.14.0 + commit: + type: string + description: Service commit hash. + example: 7d6f4dc4f7f0c1fa3dc24eddfb18bb5073ff4f62 + description: + type: string + description: Service description. + example: service + build_time: + type: string + description: Service build time. + example: 1970-01-01_00:00:00 + + parameters: + id: + name: id + description: Unique identifier for an entity, e.g. user, group, domain, etc. + in: path + schema: + type: string + format: uuid + required: true + example: bb7edb32-2eac-4aad-aebe-ed96fe073879 + + type: + name: type + description: Type of entity, e.g. user, group, domain, etc. + in: path + schema: + type: string + enum: + - user + - group + - domain + - thing + - platform + - thing + required: true + example: user + + offset: + name: offset + description: Number of items to skip during retrieval. + in: query + schema: + type: integer + default: 0 + minimum: 0 + required: false + example: "0" + + limit: + name: limit + description: Size of the subset to retrieve. + in: query + schema: + type: integer + default: 10 + maximum: 10 + minimum: 1 + required: false + example: "10" + + operation: + name: operation + description: Event operation. + in: query + schema: + type: string + required: false + example: users.create + + from: + name: from + description: Start date in nanoseconds. + in: query + schema: + type: string + format: int64 + required: false + example: 1704974066014408296 + + to: + name: to + description: End date in nanoseconds. + in: query + schema: + type: string + format: int64 + required: false + example: 1704974066014408296 + + responses: + EventPageRes: + description: Data retrieved. + content: + application/json: + schema: + $ref: "#/components/schemas/EventPage" + + HealthRes: + description: Service Health Check. + content: + application/health+json: + schema: + $ref: "#/components/schemas/HealthRes" + + ServiceError: + description: Unexpected server-side error occurred. + content: + application/json: + schema: + $ref: "#/components/schemas/Error" + + securitySchemes: + bearerAuth: + type: http + scheme: bearer + bearerFormat: JWT + description: | + * User access: "Authorization: Bearer " + +security: + - bearerAuth: [] diff --git a/cli/events.go b/cli/events.go new file mode 100644 index 00000000000..caadc038d7e --- /dev/null +++ b/cli/events.go @@ -0,0 +1,50 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package cli + +import ( + mgxsdk "github.com/absmach/magistrala/pkg/sdk/go" + "github.com/spf13/cobra" +) + +var cmdEvents = cobra.Command{ + Use: "get ", + Short: "Get events", + Long: "Get events\n" + + "Usage:\n" + + "\tmagistrala-cli events get - lists all events\n" + + "\tmagistrala-cli events get --offset --limit - lists all events with provided offset and limit\n", + Run: func(cmd *cobra.Command, args []string) { + if len(args) != 3 { + logUsage(cmd.Use) + return + } + + pageMetadata := mgxsdk.PageMetadata{ + Offset: Offset, + Limit: Limit, + } + + events, err := sdk.Events(pageMetadata, args[0], args[1], args[2]) + if err != nil { + logError(err) + return + } + + logJSON(events) + }, +} + +// NewEventsCmd returns invitations command. +func NewEventsCmd() *cobra.Command { + cmd := cobra.Command{ + Use: "events get", + Short: "events logs", + Long: `events to read event history`, + } + + cmd.AddCommand(&cmdEvents) + + return &cmd +} diff --git a/cmd/cli/main.go b/cmd/cli/main.go index 8edf9129884..4ff194ff664 100644 --- a/cmd/cli/main.go +++ b/cmd/cli/main.go @@ -21,6 +21,7 @@ const ( defDomainsURL string = defURL + ":8189" defCertsURL string = defURL + ":9019" defInvitationsURL string = defURL + ":9020" + defEventsURL string = defURL + ":9021" ) func main() { @@ -34,6 +35,7 @@ func main() { CertsURL: defCertsURL, DomainsURL: defDomainsURL, InvitationsURL: defInvitationsURL, + EventsURL: defEventsURL, MsgContentType: sdk.ContentType(msgContentType), TLSVerification: false, HostURL: defURL, @@ -68,6 +70,7 @@ func main() { subscriptionsCmd := cli.NewSubscriptionCmd() configCmd := cli.NewConfigCmd() invitationsCmd := cli.NewInvitationsCmd() + eventsCmd := cli.NewEventsCmd() // Root Commands rootCmd.AddCommand(healthCmd) @@ -83,6 +86,7 @@ func main() { rootCmd.AddCommand(subscriptionsCmd) rootCmd.AddCommand(configCmd) rootCmd.AddCommand(invitationsCmd) + rootCmd.AddCommand(eventsCmd) // Root Flags rootCmd.PersistentFlags().StringVarP( @@ -149,6 +153,14 @@ func main() { "Inivitations URL", ) + rootCmd.PersistentFlags().StringVarP( + &sdkConf.EventsURL, + "events-url", + "e", + sdkConf.EventsURL, + "Events URL", + ) + rootCmd.PersistentFlags().StringVarP( &sdkConf.HostURL, "host-url", diff --git a/cmd/event-logs/main.go b/cmd/event-logs/main.go new file mode 100644 index 00000000000..28e575ccd4f --- /dev/null +++ b/cmd/event-logs/main.go @@ -0,0 +1,172 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +// Package main contains event-logs main function to start the event-logs service. +package main + +import ( + "context" + "fmt" + "log" + "net/url" + "os" + + "github.com/absmach/magistrala" + "github.com/absmach/magistrala/eventlogs" + "github.com/absmach/magistrala/eventlogs/api" + "github.com/absmach/magistrala/eventlogs/middleware" + eventlogspg "github.com/absmach/magistrala/eventlogs/postgres" + "github.com/absmach/magistrala/internal" + jaegerclient "github.com/absmach/magistrala/internal/clients/jaeger" + pgclient "github.com/absmach/magistrala/internal/clients/postgres" + "github.com/absmach/magistrala/internal/server" + httpserver "github.com/absmach/magistrala/internal/server/http" + mglog "github.com/absmach/magistrala/logger" + "github.com/absmach/magistrala/pkg/auth" + "github.com/absmach/magistrala/pkg/events/store" + "github.com/absmach/magistrala/pkg/uuid" + "github.com/caarlos0/env/v10" + chclient "github.com/mainflux/callhome/pkg/client" + "go.opentelemetry.io/otel/trace" + "golang.org/x/sync/errgroup" +) + +const ( + svcName = "event-logs" + envPrefixDB = "MG_EVENT_LOGS_" + envPrefixHTTP = "MG_EVENT_LOGS_HTTP_" + envPrefixAuth = "MG_AUTH_GRPC_" + defDB = "events" + defSvcHTTPPort = "9021" +) + +type config struct { + LogLevel string `env:"MG_EVENT_LOGS_LOG_LEVEL" envDefault:"info"` + ESURL string `env:"MG_ES_URL" envDefault:"nats://localhost:4222"` + JaegerURL url.URL `env:"MG_JAEGER_URL" envDefault:"http://jaeger:14268/api/traces"` + SendTelemetry bool `env:"MG_SEND_TELEMETRY" envDefault:"true"` + InstanceID string `env:"MG_EVENT_LOGS_INSTANCE_ID" envDefault:""` + TraceRatio float64 `env:"MG_JAEGER_TRACE_RATIO" envDefault:"1.0"` +} + +func main() { + ctx, cancel := context.WithCancel(context.Background()) + g, ctx := errgroup.WithContext(ctx) + + cfg := config{} + if err := env.Parse(&cfg); err != nil { + log.Fatalf("failed to load %s configuration : %s", svcName, err) + } + + logger, err := mglog.New(os.Stdout, cfg.LogLevel) + if err != nil { + log.Fatalf("failed to init logger: %s", err) + } + + var exitCode int + defer mglog.ExitWithError(&exitCode) + + if cfg.InstanceID == "" { + if cfg.InstanceID, err = uuid.New().ID(); err != nil { + logger.Error(fmt.Sprintf("failed to generate instanceID: %s", err)) + exitCode = 1 + return + } + } + + dbConfig := pgclient.Config{Name: defDB} + if err := env.ParseWithOptions(&dbConfig, env.Options{Prefix: envPrefixDB}); err != nil { + logger.Error(fmt.Sprintf("failed to load %s Postgres configuration : %s", svcName, err)) + exitCode = 1 + return + } + db, err := pgclient.Setup(dbConfig, *eventlogspg.Migration()) + if err != nil { + logger.Fatal(err.Error()) + } + defer db.Close() + repo := eventlogspg.NewRepository(db) + + authConfig := auth.Config{} + if err := env.ParseWithOptions(&authConfig, env.Options{Prefix: envPrefixAuth}); err != nil { + logger.Error(fmt.Sprintf("failed to load %s auth configuration : %s", svcName, err)) + exitCode = 1 + return + } + + ac, acHandler, err := auth.Setup(authConfig) + if err != nil { + logger.Error(err.Error()) + exitCode = 1 + return + } + defer acHandler.Close() + + logger.Info("Successfully connected to auth grpc server " + acHandler.Secure()) + + tp, err := jaegerclient.NewProvider(ctx, svcName, cfg.JaegerURL, cfg.InstanceID, cfg.TraceRatio) + if err != nil { + logger.Error(fmt.Sprintf("Failed to init Jaeger: %s", err)) + exitCode = 1 + return + } + defer func() { + if err := tp.Shutdown(ctx); err != nil { + logger.Error(fmt.Sprintf("Error shutting down tracer provider: %v", err)) + } + }() + tracer := tp.Tracer(svcName) + + svc := newService(repo, ac, logger, tracer) + + subscriber, err := store.NewSubscriber(ctx, cfg.ESURL, logger) + if err != nil { + logger.Error(fmt.Sprintf("failed to create subscriber: %s", err)) + exitCode = 1 + return + } + + logger.Info("Subscribed to Event Store") + + if err := eventlogs.Start(ctx, svcName, subscriber, repo); err != nil { + logger.Error(fmt.Sprintf("failed to start %s service: %s", svcName, err)) + exitCode = 1 + return + } + + httpServerConfig := server.Config{Port: defSvcHTTPPort} + if err := env.ParseWithOptions(&httpServerConfig, env.Options{Prefix: envPrefixHTTP}); err != nil { + logger.Error(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err)) + exitCode = 1 + return + } + + hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(svc, logger, svcName, cfg.InstanceID), logger) + + if cfg.SendTelemetry { + chc := chclient.New(svcName, magistrala.Version, logger, cancel) + go chc.CallHome(ctx) + } + + g.Go(func() error { + return hs.Start() + }) + + g.Go(func() error { + return server.StopSignalHandler(ctx, cancel, logger, svcName, hs) + }) + + if err := g.Wait(); err != nil { + logger.Error(fmt.Sprintf("%s service terminated: %s", svcName, err)) + } +} + +func newService(repo eventlogs.Repository, authClient magistrala.AuthServiceClient, logger mglog.Logger, tracer trace.Tracer) eventlogs.Service { + svc := eventlogs.NewService(repo, authClient) + svc = middleware.LoggingMiddleware(svc, logger) + counter, latency := internal.MakeMetrics("eventlogs", "event_writer") + svc = middleware.MetricsMiddleware(svc, counter, latency) + svc = middleware.Tracing(svc, tracer) + + return svc +} diff --git a/docker/.env b/docker/.env index 81761fd3b95..b5691186155 100644 --- a/docker/.env +++ b/docker/.env @@ -121,7 +121,6 @@ MG_SPICEDB_HOST=magistrala-spicedb MG_SPICEDB_PORT=50051 MG_SPICEDB_DATASTORE_ENGINE=postgres - ### Invitations MG_INVITATIONS_LOG_LEVEL=info MG_INVITATIONS_HTTP_HOST=invitations @@ -563,6 +562,23 @@ MG_SMPP_SRC_ADDR_NPI=0 MG_SMPP_DST_ADDR_NPI=1 MG_SMPP_NOTIFIER_INSTANCE_ID= +### Event Logs +MG_EVENT_LOGS_LOG_LEVEL=info +MG_EVENT_LOGS_HTTP_HOST=event-logs +MG_EVENT_LOGS_HTTP_PORT=9021 +MG_EVENT_LOGS_HTTP_SERVER_CERT= +MG_EVENT_LOGS_HTTP_SERVER_KEY= +MG_EVENT_LOGS_HOST=magistrala-event-logs-db +MG_EVENT_LOGS_PORT=5432 +MG_EVENT_LOGS_USER=magistrala +MG_EVENT_LOGS_PASS=magistrala +MG_EVENT_LOGS_NAME=events +MG_EVENT_LOGS_SSL_MODE=disable +MG_EVENT_LOGS_SSL_CERT= +MG_EVENT_LOGS_SSL_KEY= +MG_EVENT_LOGS_SSL_ROOT_CERT= +MG_EVENT_LOGS_INSTANCE_ID= + ### GRAFANA and PROMETHEUS MG_PROMETHEUS_PORT=9090 MG_GRAFANA_PORT=3000 diff --git a/docker/addons/event-logs/docker-compose.yml b/docker/addons/event-logs/docker-compose.yml new file mode 100644 index 00000000000..e253efa9181 --- /dev/null +++ b/docker/addons/event-logs/docker-compose.yml @@ -0,0 +1,69 @@ +# Copyright (c) Abstract Machines +# SPDX-License-Identifier: Apache-2.0 + +# This docker-compose file contains optional Postgres and event-logs services +# for Magistrala platform. Since these are optional, this file is dependent of docker-compose file +# from /docker. In order to run these services, execute command: +# docker-compose -f docker/docker-compose.yml -f docker/addons/event-logs/docker-compose.yml up +# from project root. PostgreSQL default port (5432) is exposed, so you can use various tools for database +# inspection and data visualization. + +version: "3.7" + +networks: + magistrala-base-net: + +volumes: + magistrala-event-logs-volume: + +services: + event-logs-db: + image: postgres:15.3-alpine + container_name: magistrala-event-logs-db + restart: on-failure + command: postgres -c "max_connections=${MG_POSTGRES_MAX_CONNECTIONS}" + environment: + POSTGRES_USER: ${MG_EVENT_LOGS_USER} + POSTGRES_PASSWORD: ${MG_EVENT_LOGS_PASS} + POSTGRES_DB: ${MG_EVENT_LOGS_NAME} + MG_POSTGRES_MAX_CONNECTIONS: ${MG_POSTGRES_MAX_CONNECTIONS} + networks: + - magistrala-base-net + volumes: + - magistrala-event-logs-volume:/var/lib/postgresql/data + + event-logs: + image: magistrala/event-logs:${MG_RELEASE_TAG} + container_name: magistrala-event-logs + depends_on: + - event-logs-db + restart: on-failure + environment: + MG_EVENT_LOGS_LOG_LEVEL: ${MG_EVENT_LOGS_LOG_LEVEL} + MG_EVENT_LOGS_HTTP_HOST: ${MG_EVENT_LOGS_HTTP_HOST} + MG_EVENT_LOGS_HTTP_PORT: ${MG_EVENT_LOGS_HTTP_PORT} + MG_EVENT_LOGS_HTTP_SERVER_CERT: ${MG_EVENT_LOGS_HTTP_SERVER_CERT} + MG_EVENT_LOGS_HTTP_SERVER_KEY: ${MG_EVENT_LOGS_HTTP_SERVER_KEY} + MG_EVENT_LOGS_HOST: ${MG_EVENT_LOGS_HOST} + MG_EVENT_LOGS_PORT: ${MG_EVENT_LOGS_PORT} + MG_EVENT_LOGS_USER: ${MG_EVENT_LOGS_USER} + MG_EVENT_LOGS_PASS: ${MG_EVENT_LOGS_PASS} + MG_EVENT_LOGS_NAME: ${MG_EVENT_LOGS_NAME} + MG_EVENT_LOGS_SSL_MODE: ${MG_EVENT_LOGS_SSL_MODE} + MG_EVENT_LOGS_SSL_CERT: ${MG_EVENT_LOGS_SSL_CERT} + MG_EVENT_LOGS_SSL_KEY: ${MG_EVENT_LOGS_SSL_KEY} + MG_EVENT_LOGS_SSL_ROOT_CERT: ${MG_EVENT_LOGS_SSL_ROOT_CERT} + MG_AUTH_GRPC_URL: ${MG_AUTH_GRPC_URL} + MG_AUTH_GRPC_TIMEOUT: ${MG_AUTH_GRPC_TIMEOUT} + MG_AUTH_GRPC_CLIENT_CERT: ${MG_AUTH_GRPC_CLIENT_CERT:+/auth-grpc-client.crt} + MG_AUTH_GRPC_CLIENT_KEY: ${MG_AUTH_GRPC_CLIENT_KEY:+/auth-grpc-client.key} + MG_AUTH_GRPC_SERVER_CA_CERTS: ${MG_AUTH_GRPC_SERVER_CA_CERTS:+/auth-grpc-server-ca.crt} + MG_ES_URL: ${MG_ES_URL} + MG_JAEGER_URL: ${MG_JAEGER_URL} + MG_JAEGER_TRACE_RATIO: ${MG_JAEGER_TRACE_RATIO} + MG_SEND_TELEMETRY: ${MG_SEND_TELEMETRY} + MG_EVENT_LOGS_INSTANCE_ID: ${MG_EVENT_LOGS_INSTANCE_ID} + ports: + - ${MG_EVENT_LOGS_HTTP_PORT}:${MG_EVENT_LOGS_HTTP_PORT} + networks: + - magistrala-base-net diff --git a/eventlogs/api/doc.go b/eventlogs/api/doc.go new file mode 100644 index 00000000000..2424852cc46 --- /dev/null +++ b/eventlogs/api/doc.go @@ -0,0 +1,6 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +// Package api contains API-related concerns: endpoint definitions, middlewares +// and all resource representations. +package api diff --git a/eventlogs/api/endpoint.go b/eventlogs/api/endpoint.go new file mode 100644 index 00000000000..c592147e11b --- /dev/null +++ b/eventlogs/api/endpoint.go @@ -0,0 +1,31 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package api + +import ( + "context" + + "github.com/absmach/magistrala/eventlogs" + "github.com/absmach/magistrala/internal/apiutil" + "github.com/absmach/magistrala/pkg/errors" + "github.com/go-kit/kit/endpoint" +) + +func listEventsEndpoint(svc eventlogs.Service) endpoint.Endpoint { + return func(ctx context.Context, request interface{}) (interface{}, error) { + req := request.(listEventsReq) + if err := req.validate(); err != nil { + return nil, errors.Wrap(apiutil.ErrValidation, err) + } + + page, err := svc.ReadAll(ctx, req.token, req.page) + if err != nil { + return nil, err + } + + return pageRes{ + EventsPage: page, + }, nil + } +} diff --git a/eventlogs/api/endpoint_test.go b/eventlogs/api/endpoint_test.go new file mode 100644 index 00000000000..ade1d5506da --- /dev/null +++ b/eventlogs/api/endpoint_test.go @@ -0,0 +1,202 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package api_test + +import ( + "fmt" + "io" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/absmach/magistrala/auth" + "github.com/absmach/magistrala/eventlogs" + "github.com/absmach/magistrala/eventlogs/api" + "github.com/absmach/magistrala/eventlogs/mocks" + "github.com/absmach/magistrala/internal/apiutil" + "github.com/absmach/magistrala/internal/testsutil" + mglog "github.com/absmach/magistrala/logger" + svcerr "github.com/absmach/magistrala/pkg/errors/service" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +var ( + validToken = "valid" + validID = testsutil.GenerateUUID(&testing.T{}) +) + +type testRequest struct { + client *http.Client + method string + url string + token string + body io.Reader +} + +func (tr testRequest) make() (*http.Response, error) { + req, err := http.NewRequest(tr.method, tr.url, tr.body) + if err != nil { + return nil, err + } + + if tr.token != "" { + req.Header.Set("Authorization", apiutil.BearerPrefix+tr.token) + } + + return tr.client.Do(req) +} + +func newEventsServer() (*httptest.Server, *mocks.Service) { + svc := new(mocks.Service) + + logger := mglog.NewMock() + mux := api.MakeHandler(svc, logger, "event-logs", "test") + return httptest.NewServer(mux), svc +} + +func TestListEventsEndpoint(t *testing.T) { + es, svc := newEventsServer() + + cases := []struct { + desc string + token string + url string + contentType string + status int + svcErr error + }{ + { + desc: "successful", + token: validToken, + url: validID + "/" + auth.UserType, + status: http.StatusOK, + svcErr: nil, + }, + { + desc: "empty token", + token: "", + url: validID + "/" + auth.UserType, + + status: http.StatusUnauthorized, + svcErr: nil, + }, + { + desc: "with service error", + token: validToken, + url: validID + "/" + auth.UserType, + status: http.StatusForbidden, + svcErr: svcerr.ErrAuthorization, + }, + { + desc: "with offset", + token: validToken, + url: validID + "/" + auth.UserType + "?offset=10", + status: http.StatusOK, + svcErr: nil, + }, + { + desc: "with invalid offset", + token: validToken, + url: validID + "/" + auth.UserType + "?offset=ten", + status: http.StatusBadRequest, + svcErr: nil, + }, + { + desc: "with limit", + token: validToken, + url: validID + "/" + auth.UserType + "?limit=10", + status: http.StatusOK, + svcErr: nil, + }, + { + desc: "with invalid limit", + token: validToken, + url: validID + "/" + auth.UserType + "?limit=ten", + status: http.StatusBadRequest, + svcErr: nil, + }, + { + desc: "with operation", + token: validToken, + url: validID + "/" + auth.UserType + "?operation=user.create", + status: http.StatusOK, + svcErr: nil, + }, + { + desc: "with invalid operation", + token: validToken, + url: validID + "/" + auth.UserType + "?operation=user.create&operation=user.update", + status: http.StatusBadRequest, + svcErr: nil, + }, + { + desc: "with from", + token: validToken, + url: validID + "/" + auth.UserType + fmt.Sprintf("?to=%d", time.Now().UnixNano()), + status: http.StatusOK, + svcErr: nil, + }, + { + desc: "with invalid from", + token: validToken, + url: validID + "/" + auth.UserType + "?from=ten", + status: http.StatusBadRequest, + svcErr: nil, + }, + { + desc: "with to", + token: validToken, + url: validID + "/" + auth.UserType + fmt.Sprintf("?to=%d", time.Now().UnixNano()), + status: http.StatusOK, + svcErr: nil, + }, + { + desc: "with invalid to", + token: validToken, + url: validID + "/" + auth.UserType + "?to=ten", + status: http.StatusBadRequest, + svcErr: nil, + }, + { + desc: "with empty id", + token: validToken, + url: "/" + auth.UserType, + status: http.StatusBadRequest, + svcErr: nil, + }, + { + desc: "with empty id type", + token: validToken, + url: validID + "/", + status: http.StatusNotFound, + svcErr: nil, + }, + { + desc: "with invalid id type", + token: validToken, + url: validID + "/invalid", + status: http.StatusBadRequest, + svcErr: nil, + }, + } + + for _, c := range cases { + t.Run(c.desc, func(t *testing.T) { + repoCall := svc.On("ReadAll", mock.Anything, c.token, mock.Anything).Return(eventlogs.EventsPage{}, c.svcErr) + req := testRequest{ + client: es.Client(), + method: http.MethodGet, + url: es.URL + "/events/" + c.url, + token: c.token, + } + resp, err := req.make() + assert.Nil(t, err, c.desc) + defer resp.Body.Close() + assert.Equal(t, c.status, resp.StatusCode, c.desc) + repoCall.Unset() + }) + } +} diff --git a/eventlogs/api/requests.go b/eventlogs/api/requests.go new file mode 100644 index 00000000000..5c4430917e8 --- /dev/null +++ b/eventlogs/api/requests.go @@ -0,0 +1,37 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package api + +import ( + "github.com/absmach/magistrala/auth" + "github.com/absmach/magistrala/eventlogs" + "github.com/absmach/magistrala/internal/apiutil" +) + +const maxLimitSize = 1000 + +type listEventsReq struct { + token string + page eventlogs.Page +} + +func (req listEventsReq) validate() error { + if req.token == "" { + return apiutil.ErrBearerToken + } + if req.page.ID == "" { + return apiutil.ErrMissingID + } + if req.page.EntityType == "" { + return apiutil.ErrMissingEntityType + } + if req.page.EntityType != auth.UserType && req.page.EntityType != auth.GroupType && req.page.EntityType != auth.ThingType && req.page.EntityType != auth.DomainType && req.page.EntityType != auth.PlatformType { + return apiutil.ErrInvalidEntityType + } + if req.page.Limit > maxLimitSize { + return apiutil.ErrLimitSize + } + + return nil +} diff --git a/eventlogs/api/requests_test.go b/eventlogs/api/requests_test.go new file mode 100644 index 00000000000..6474c7642cc --- /dev/null +++ b/eventlogs/api/requests_test.go @@ -0,0 +1,104 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package api + +import ( + "testing" + + "github.com/absmach/magistrala/auth" + "github.com/absmach/magistrala/eventlogs" + "github.com/absmach/magistrala/internal/apiutil" + "github.com/absmach/magistrala/internal/testsutil" + "github.com/stretchr/testify/assert" +) + +var ( + token = "token" + limit uint64 = 10 +) + +func TestListEventsReqValidate(t *testing.T) { + cases := []struct { + desc string + req listEventsReq + err error + }{ + { + desc: "valid", + req: listEventsReq{ + token: token, + page: eventlogs.Page{ + ID: testsutil.GenerateUUID(t), + EntityType: auth.UserType, + Limit: limit, + }, + }, + err: nil, + }, + { + desc: "missing token", + req: listEventsReq{ + page: eventlogs.Page{ + ID: testsutil.GenerateUUID(t), + EntityType: auth.UserType, + Limit: limit, + }, + }, + err: apiutil.ErrBearerToken, + }, + { + desc: "missing id", + req: listEventsReq{ + token: token, + page: eventlogs.Page{ + EntityType: auth.UserType, + Limit: limit, + }, + }, + err: apiutil.ErrMissingID, + }, + { + desc: "missing entity type", + req: listEventsReq{ + token: token, + page: eventlogs.Page{ + ID: testsutil.GenerateUUID(t), + Limit: limit, + }, + }, + err: apiutil.ErrMissingEntityType, + }, + { + desc: "invalid entity type", + req: listEventsReq{ + token: token, + page: eventlogs.Page{ + ID: testsutil.GenerateUUID(t), + EntityType: "invalid", + Limit: limit, + }, + }, + err: apiutil.ErrInvalidEntityType, + }, + { + desc: "invalid limit size", + req: listEventsReq{ + token: token, + page: eventlogs.Page{ + ID: testsutil.GenerateUUID(t), + EntityType: auth.UserType, + Limit: maxLimitSize + 1, + }, + }, + err: apiutil.ErrLimitSize, + }, + } + + for _, c := range cases { + t.Run(c.desc, func(t *testing.T) { + err := c.req.validate() + assert.Equal(t, c.err, err) + }) + } +} diff --git a/eventlogs/api/responses.go b/eventlogs/api/responses.go new file mode 100644 index 00000000000..1550b16fb24 --- /dev/null +++ b/eventlogs/api/responses.go @@ -0,0 +1,29 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package api + +import ( + "net/http" + + "github.com/absmach/magistrala" + "github.com/absmach/magistrala/eventlogs" +) + +var _ magistrala.Response = (*pageRes)(nil) + +type pageRes struct { + eventlogs.EventsPage `json:",inline"` +} + +func (res pageRes) Headers() map[string]string { + return map[string]string{} +} + +func (res pageRes) Code() int { + return http.StatusOK +} + +func (res pageRes) Empty() bool { + return false +} diff --git a/eventlogs/api/transport.go b/eventlogs/api/transport.go new file mode 100644 index 00000000000..b7adf824890 --- /dev/null +++ b/eventlogs/api/transport.go @@ -0,0 +1,92 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package api + +import ( + "context" + "net/http" + "time" + + "github.com/absmach/magistrala" + "github.com/absmach/magistrala/eventlogs" + "github.com/absmach/magistrala/internal/api" + "github.com/absmach/magistrala/internal/apiutil" + mglog "github.com/absmach/magistrala/logger" + "github.com/absmach/magistrala/pkg/errors" + "github.com/go-chi/chi/v5" + kithttp "github.com/go-kit/kit/transport/http" + "github.com/prometheus/client_golang/prometheus/promhttp" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" +) + +const ( + operationKey = "operation" + fromKey = "from" + toKey = "to" +) + +// MakeHandler returns a HTTP API handler with health check and metrics. +func MakeHandler(svc eventlogs.Service, logger mglog.Logger, svcName, instanceID string) http.Handler { + opts := []kithttp.ServerOption{ + kithttp.ServerErrorEncoder(apiutil.LoggingErrorEncoder(logger, api.EncodeError)), + } + + mux := chi.NewRouter() + + mux.Get("/events/{id}/{type}", otelhttp.NewHandler(kithttp.NewServer( + listEventsEndpoint(svc), + decodeListEventsReq, + api.EncodeResponse, + opts..., + ), "list_events").ServeHTTP) + + mux.Get("/health", magistrala.Health(svcName, instanceID)) + mux.Handle("/metrics", promhttp.Handler()) + + return mux +} + +func decodeListEventsReq(_ context.Context, r *http.Request) (interface{}, error) { + offset, err := apiutil.ReadNumQuery[uint64](r, api.OffsetKey, api.DefOffset) + if err != nil { + return nil, errors.Wrap(apiutil.ErrValidation, err) + } + limit, err := apiutil.ReadNumQuery[uint64](r, api.LimitKey, api.DefLimit) + if err != nil { + return nil, errors.Wrap(apiutil.ErrValidation, err) + } + operation, err := apiutil.ReadStringQuery(r, operationKey, "") + if err != nil { + return nil, errors.Wrap(apiutil.ErrValidation, err) + } + from, err := apiutil.ReadNumQuery[int64](r, fromKey, 0) + if err != nil { + return nil, errors.Wrap(apiutil.ErrValidation, err) + } + if from == 0 { + from = time.Now().Add(-24 * time.Hour).UnixNano() + } + to, err := apiutil.ReadNumQuery[int64](r, toKey, 0) + if err != nil { + return nil, errors.Wrap(apiutil.ErrValidation, err) + } + if to == 0 { + to = time.Now().UnixNano() + } + + req := listEventsReq{ + token: apiutil.ExtractBearerToken(r), + page: eventlogs.Page{ + Offset: offset, + Limit: limit, + ID: chi.URLParam(r, "id"), + EntityType: chi.URLParam(r, "type"), + Operation: operation, + From: time.Unix(0, from), + To: time.Unix(0, to), + }, + } + + return req, nil +} diff --git a/eventlogs/consumer.go b/eventlogs/consumer.go new file mode 100644 index 00000000000..6c29e8b404b --- /dev/null +++ b/eventlogs/consumer.go @@ -0,0 +1,69 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package eventlogs + +import ( + "context" + "time" + + "github.com/absmach/magistrala/pkg/events" + "github.com/absmach/magistrala/pkg/events/store" +) + +// Start method starts consuming messages received from Event store. +func Start(ctx context.Context, consumer string, sub events.Subscriber, repository Repository) error { + subCfg := events.SubscriberConfig{ + Consumer: consumer, + Stream: store.StreamAllEvents, + Handler: Handle(ctx, repository), + } + + return sub.Subscribe(ctx, subCfg) +} + +func Handle(ctx context.Context, repository Repository) handleFunc { + return func(ctx context.Context, event events.Event) error { + data, err := event.Encode() + if err != nil { + return err + } + + id, ok := data["id"].(string) + if !ok { + return nil + } + delete(data, "id") + + operation, ok := data["operation"].(string) + if !ok { + return nil + } + delete(data, "operation") + + occurredAt, ok := data["occurred_at"].(float64) + if !ok { + return nil + } + delete(data, "occurred_at") + + dbEvent := Event{ + ID: id, + Operation: operation, + OccurredAt: time.Unix(0, int64(occurredAt)), + Payload: data, + } + + return repository.Save(ctx, dbEvent) + } +} + +type handleFunc func(ctx context.Context, event events.Event) error + +func (h handleFunc) Handle(ctx context.Context, event events.Event) error { + return h(ctx, event) +} + +func (h handleFunc) Cancel() error { + return nil +} diff --git a/eventlogs/consumer_test.go b/eventlogs/consumer_test.go new file mode 100644 index 00000000000..86b61199b4e --- /dev/null +++ b/eventlogs/consumer_test.go @@ -0,0 +1,134 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package eventlogs_test + +import ( + "context" + "encoding/json" + "math/rand" + "strings" + "testing" + "time" + + "github.com/absmach/magistrala/eventlogs" + "github.com/absmach/magistrala/eventlogs/mocks" + "github.com/absmach/magistrala/internal/testsutil" + "github.com/stretchr/testify/assert" +) + +var ( + operation = "users.create" + payload = map[string]interface{}{ + "temperature": rand.Float64(), + "humidity": rand.Float64(), + "sensor_id": rand.Intn(1000), + "locations": []string{ + strings.Repeat("a", 1024), + strings.Repeat("a", 1024), + strings.Repeat("a", 1024), + }, + "status": rand.Intn(1000), + "timestamp": time.Now().UnixNano(), + } +) + +type testEvent struct { + data map[string]interface{} +} + +func (e testEvent) Encode() (map[string]interface{}, error) { + return e.data, nil +} + +func TestHandle(t *testing.T) { + repo := new(mocks.Repository) + cases := []struct { + desc string + event testEvent + err error + }{ + { + desc: "success", + event: testEvent{ + data: map[string]interface{}{ + "id": testsutil.GenerateUUID(t), + "operation": operation, + "occurred_at": time.Now().UnixNano(), + "payload": payload, + }, + }, + err: nil, + }, + { + desc: "with missing id", + event: testEvent{ + data: map[string]interface{}{ + "id": "", + "operation": operation, + "occurred_at": time.Now().UnixNano(), + "payload": payload, + }, + }, + err: nil, + }, + { + desc: "with missing operation", + event: testEvent{ + data: map[string]interface{}{ + "id": testsutil.GenerateUUID(t), + "operation": "", + "occurred_at": time.Now().UnixNano(), + "payload": payload, + }, + }, + err: nil, + }, + { + desc: "with missing occurred_at", + event: testEvent{ + data: map[string]interface{}{ + "id": testsutil.GenerateUUID(t), + "operation": operation, + "occurred_at": 0, + "payload": payload, + }, + }, + err: nil, + }, + { + desc: "with missing payload", + event: testEvent{ + data: map[string]interface{}{ + "id": testsutil.GenerateUUID(t), + "operation": operation, + "occurred_at": time.Now().UnixNano(), + "payload": map[string]interface{}{}, + }, + }, + err: nil, + }, + } + + for _, tc := range cases { + t.Run(tc.desc, func(t *testing.T) { + data, err := json.Marshal(tc.event.data) + assert.NoError(t, err) + + var event map[string]interface{} + err = json.Unmarshal(data, &event) + assert.NoError(t, err) + + dbEvent := eventlogs.Event{ + ID: event["id"].(string), + Operation: event["operation"].(string), + OccurredAt: time.Unix(0, int64(event["occurred_at"].(float64))), + Payload: event["payload"].(map[string]interface{}), + } + repoCall := repo.On("Save", context.Background(), dbEvent) + err = eventlogs.Handle(context.Background(), repo)(context.Background(), tc.event) + assert.Equal(t, tc.err, err) + repoCall.Unset() + }) + } +} diff --git a/eventlogs/events.go b/eventlogs/events.go new file mode 100644 index 00000000000..14b96ff6e21 --- /dev/null +++ b/eventlogs/events.go @@ -0,0 +1,71 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package eventlogs + +import ( + "context" + "encoding/json" + "time" +) + +// Event represents an event. +type Event struct { + ID string `json:"id,omitempty" db:"id,omitempty"` + Operation string `json:"operation,omitempty" db:"operation,omitempty"` + OccurredAt time.Time `json:"occurred_at,omitempty" db:"occurred_at,omitempty"` + Payload map[string]interface{} `json:"payload,omitempty" db:"payload,omitempty"` +} + +// EventsPage represents a page of events. +type EventsPage struct { + Total uint64 `json:"total"` + Offset uint64 `json:"offset"` + Limit uint64 `json:"limit"` + Events []Event `json:"events"` +} + +// Page is used to filter events. +type Page struct { + Offset uint64 `json:"offset" db:"offset"` + Limit uint64 `json:"limit" db:"limit"` + ID string `json:"id,omitempty" db:"id,omitempty"` + EntityType string `json:"entity_type,omitempty"` + Operation string `json:"operation,omitempty" db:"operation,omitempty"` + From time.Time `json:"from,omitempty" db:"from,omitempty"` + To time.Time `json:"to,omitempty" db:"to,omitempty"` +} + +func (page EventsPage) MarshalJSON() ([]byte, error) { + type Alias EventsPage + a := struct { + Alias + }{ + Alias: Alias(page), + } + + if a.Events == nil { + a.Events = make([]Event, 0) + } + + return json.Marshal(a) +} + +// Service provides access to the event log service. +// +//go:generate mockery --name Service --output=./mocks --filename service.go --quiet --note "Copyright (c) Abstract Machines" +type Service interface { + // ReadAll retrieves all events from the database with the given page. + ReadAll(ctx context.Context, token string, page Page) (EventsPage, error) +} + +// Repository provides access to the event log database. +// +//go:generate mockery --name Repository --output=./mocks --filename repository.go --quiet --note "Copyright (c) Abstract Machines" +type Repository interface { + // Save persists the event to a database. + Save(ctx context.Context, event Event) error + + // RetrieveAll retrieves all events from the database with the given page. + RetrieveAll(ctx context.Context, page Page) (EventsPage, error) +} diff --git a/eventlogs/events_test.go b/eventlogs/events_test.go new file mode 100644 index 00000000000..74cc2b78bba --- /dev/null +++ b/eventlogs/events_test.go @@ -0,0 +1,56 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package eventlogs_test + +import ( + "fmt" + "testing" + "time" + + "github.com/absmach/magistrala/eventlogs" + "github.com/stretchr/testify/assert" +) + +func TestEventsPage_MarshalJSON(t *testing.T) { + occurredAt := time.Now() + + cases := []struct { + desc string + page eventlogs.EventsPage + res string + }{ + { + desc: "empty page", + page: eventlogs.EventsPage{ + Events: []eventlogs.Event(nil), + }, + res: `{"total":0,"offset":0,"limit":0,"events":[]}`, + }, + { + desc: "page with events", + page: eventlogs.EventsPage{ + Total: 1, + Offset: 0, + Limit: 0, + Events: []eventlogs.Event{ + { + ID: "123", + Operation: "123", + OccurredAt: occurredAt, + Payload: map[string]interface{}{"123": "123"}, + }, + }, + }, + res: fmt.Sprintf(`{"total":1,"offset":0,"limit":0,"events":[{"id":"123","operation":"123","occurred_at":"%s","payload":{"123":"123"}}]}`, occurredAt.Format(time.RFC3339Nano)), + }, + } + + for _, tc := range cases { + t.Run(tc.desc, func(t *testing.T) { + data, err := tc.page.MarshalJSON() + assert.NoError(t, err, "Unexpected error: %v", err) + assert.Equal(t, tc.res, string(data)) + }) + } +} diff --git a/eventlogs/middleware/logging.go b/eventlogs/middleware/logging.go new file mode 100644 index 00000000000..81238b84a48 --- /dev/null +++ b/eventlogs/middleware/logging.go @@ -0,0 +1,41 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package middleware + +import ( + "context" + "fmt" + "time" + + "github.com/absmach/magistrala/eventlogs" + mglog "github.com/absmach/magistrala/logger" +) + +var _ eventlogs.Service = (*loggingMiddleware)(nil) + +type loggingMiddleware struct { + logger mglog.Logger + service eventlogs.Service +} + +// LoggingMiddleware adds logging facilities to the adapter. +func LoggingMiddleware(service eventlogs.Service, logger mglog.Logger) eventlogs.Service { + return &loggingMiddleware{ + logger: logger, + service: service, + } +} + +func (lm *loggingMiddleware) ReadAll(ctx context.Context, token string, page eventlogs.Page) (eventsPage eventlogs.EventsPage, err error) { + defer func(begin time.Time) { + message := fmt.Sprintf("Method read_all for operation %s with query %v took %s to complete", page.Operation, page, time.Since(begin)) + if err != nil { + lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err)) + return + } + lm.logger.Info(fmt.Sprintf("%s without errors.", message)) + }(time.Now()) + + return lm.service.ReadAll(ctx, token, page) +} diff --git a/eventlogs/middleware/metrics.go b/eventlogs/middleware/metrics.go new file mode 100644 index 00000000000..5ebb47f98c8 --- /dev/null +++ b/eventlogs/middleware/metrics.go @@ -0,0 +1,39 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package middleware + +import ( + "context" + "time" + + "github.com/absmach/magistrala/eventlogs" + "github.com/go-kit/kit/metrics" +) + +var _ eventlogs.Service = (*metricsMiddleware)(nil) + +type metricsMiddleware struct { + counter metrics.Counter + latency metrics.Histogram + service eventlogs.Service +} + +// MetricsMiddleware returns new message repository +// with Save method wrapped to expose metrics. +func MetricsMiddleware(service eventlogs.Service, counter metrics.Counter, latency metrics.Histogram) eventlogs.Service { + return &metricsMiddleware{ + counter: counter, + latency: latency, + service: service, + } +} + +func (mm *metricsMiddleware) ReadAll(ctx context.Context, token string, page eventlogs.Page) (eventlogs.EventsPage, error) { + defer func(begin time.Time) { + mm.counter.With("method", "read_all").Add(1) + mm.latency.With("method", "read_all").Observe(time.Since(begin).Seconds()) + }(time.Now()) + + return mm.service.ReadAll(ctx, token, page) +} diff --git a/eventlogs/middleware/tracing.go b/eventlogs/middleware/tracing.go new file mode 100644 index 00000000000..dc97740d6ef --- /dev/null +++ b/eventlogs/middleware/tracing.go @@ -0,0 +1,33 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package middleware + +import ( + "context" + + "github.com/absmach/magistrala/eventlogs" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" +) + +var _ eventlogs.Service = (*tracing)(nil) + +type tracing struct { + tracer trace.Tracer + svc eventlogs.Service +} + +func Tracing(svc eventlogs.Service, tracer trace.Tracer) eventlogs.Service { + return &tracing{tracer, svc} +} + +func (tm *tracing) ReadAll(ctx context.Context, token string, page eventlogs.Page) (eventlogs.EventsPage, error) { + ctx, span := tm.tracer.Start(ctx, "read_all", trace.WithAttributes( + attribute.String("id", page.ID), + attribute.String("entity_type", page.EntityType), + )) + defer span.End() + + return tm.svc.ReadAll(ctx, token, page) +} diff --git a/eventlogs/mocks/doc.go b/eventlogs/mocks/doc.go new file mode 100644 index 00000000000..16ed198afd4 --- /dev/null +++ b/eventlogs/mocks/doc.go @@ -0,0 +1,5 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +// Package mocks contains mocks for testing purposes. +package mocks diff --git a/eventlogs/mocks/repository.go b/eventlogs/mocks/repository.go new file mode 100644 index 00000000000..60905cd6595 --- /dev/null +++ b/eventlogs/mocks/repository.go @@ -0,0 +1,77 @@ +// Code generated by mockery v2.38.0. DO NOT EDIT. + +// Copyright (c) Abstract Machines + +package mocks + +import ( + context "context" + + eventlogs "github.com/absmach/magistrala/eventlogs" + mock "github.com/stretchr/testify/mock" +) + +// Repository is an autogenerated mock type for the Repository type +type Repository struct { + mock.Mock +} + +// RetrieveAll provides a mock function with given fields: ctx, page +func (_m *Repository) RetrieveAll(ctx context.Context, page eventlogs.Page) (eventlogs.EventsPage, error) { + ret := _m.Called(ctx, page) + + if len(ret) == 0 { + panic("no return value specified for RetrieveAll") + } + + var r0 eventlogs.EventsPage + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, eventlogs.Page) (eventlogs.EventsPage, error)); ok { + return rf(ctx, page) + } + if rf, ok := ret.Get(0).(func(context.Context, eventlogs.Page) eventlogs.EventsPage); ok { + r0 = rf(ctx, page) + } else { + r0 = ret.Get(0).(eventlogs.EventsPage) + } + + if rf, ok := ret.Get(1).(func(context.Context, eventlogs.Page) error); ok { + r1 = rf(ctx, page) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Save provides a mock function with given fields: ctx, event +func (_m *Repository) Save(ctx context.Context, event eventlogs.Event) error { + ret := _m.Called(ctx, event) + + if len(ret) == 0 { + panic("no return value specified for Save") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, eventlogs.Event) error); ok { + r0 = rf(ctx, event) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// NewRepository creates a new instance of Repository. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewRepository(t interface { + mock.TestingT + Cleanup(func()) +}) *Repository { + mock := &Repository{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/eventlogs/mocks/service.go b/eventlogs/mocks/service.go new file mode 100644 index 00000000000..b68da351270 --- /dev/null +++ b/eventlogs/mocks/service.go @@ -0,0 +1,59 @@ +// Code generated by mockery v2.38.0. DO NOT EDIT. + +// Copyright (c) Abstract Machines + +package mocks + +import ( + context "context" + + eventlogs "github.com/absmach/magistrala/eventlogs" + mock "github.com/stretchr/testify/mock" +) + +// Service is an autogenerated mock type for the Service type +type Service struct { + mock.Mock +} + +// ReadAll provides a mock function with given fields: ctx, token, page +func (_m *Service) ReadAll(ctx context.Context, token string, page eventlogs.Page) (eventlogs.EventsPage, error) { + ret := _m.Called(ctx, token, page) + + if len(ret) == 0 { + panic("no return value specified for ReadAll") + } + + var r0 eventlogs.EventsPage + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string, eventlogs.Page) (eventlogs.EventsPage, error)); ok { + return rf(ctx, token, page) + } + if rf, ok := ret.Get(0).(func(context.Context, string, eventlogs.Page) eventlogs.EventsPage); ok { + r0 = rf(ctx, token, page) + } else { + r0 = ret.Get(0).(eventlogs.EventsPage) + } + + if rf, ok := ret.Get(1).(func(context.Context, string, eventlogs.Page) error); ok { + r1 = rf(ctx, token, page) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// NewService creates a new instance of Service. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewService(t interface { + mock.TestingT + Cleanup(func()) +}) *Service { + mock := &Service{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/eventlogs/postgres/doc.go b/eventlogs/postgres/doc.go new file mode 100644 index 00000000000..0125aa82bd7 --- /dev/null +++ b/eventlogs/postgres/doc.go @@ -0,0 +1,5 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +// Package postgres provides a postgres implementation of the eventlog repository. +package postgres diff --git a/eventlogs/postgres/events.go b/eventlogs/postgres/events.go new file mode 100644 index 00000000000..3df02535601 --- /dev/null +++ b/eventlogs/postgres/events.go @@ -0,0 +1,146 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package postgres + +import ( + "context" + "encoding/json" + "fmt" + "strings" + "time" + + "github.com/absmach/magistrala/eventlogs" + "github.com/absmach/magistrala/internal/postgres" + "github.com/absmach/magistrala/pkg/errors" + repoerr "github.com/absmach/magistrala/pkg/errors/repository" +) + +type repository struct { + db postgres.Database +} + +func NewRepository(db postgres.Database) eventlogs.Repository { + return &repository{db: db} +} + +func (repo *repository) Save(ctx context.Context, event eventlogs.Event) (err error) { + q := `INSERT INTO events (id, operation, occurred_at, payload) + VALUES (:id, :operation, :occurred_at, :payload)` + + dbEvent, err := toDBEvent(event) + if err != nil { + return errors.Wrap(repoerr.ErrCreateEntity, err) + } + + if _, err = repo.db.NamedExecContext(ctx, q, dbEvent); err != nil { + return postgres.HandleError(repoerr.ErrCreateEntity, err) + } + + return nil +} + +func (repo *repository) RetrieveAll(ctx context.Context, page eventlogs.Page) (eventlogs.EventsPage, error) { + query := pageQuery(page) + + q := fmt.Sprintf("SELECT id, operation, occurred_at, payload FROM events %s ORDER BY occurred_at LIMIT :limit OFFSET :offset;", query) + + rows, err := repo.db.NamedQueryContext(ctx, q, page) + if err != nil { + return eventlogs.EventsPage{}, postgres.HandleError(repoerr.ErrViewEntity, err) + } + defer rows.Close() + + var items []eventlogs.Event + for rows.Next() { + var item dbEvent + if err = rows.StructScan(&item); err != nil { + return eventlogs.EventsPage{}, postgres.HandleError(repoerr.ErrViewEntity, err) + } + event, err := toEvent(item) + if err != nil { + return eventlogs.EventsPage{}, err + } + items = append(items, event) + } + + tq := fmt.Sprintf(`SELECT COUNT(*) FROM events %s`, query) + + total, err := postgres.Total(ctx, repo.db, tq, page) + if err != nil { + return eventlogs.EventsPage{}, postgres.HandleError(repoerr.ErrViewEntity, err) + } + + eventsPage := eventlogs.EventsPage{ + Total: total, + Offset: page.Offset, + Limit: page.Limit, + Events: items, + } + + return eventsPage, nil +} + +func pageQuery(pm eventlogs.Page) string { + var query []string + var emq string + if pm.ID != "" { + query = append(query, "id = :id") + } + if pm.Operation != "" { + query = append(query, "operation = :operation") + } + if !pm.From.IsZero() { + query = append(query, "occurred_at >= :from") + } + if !pm.To.IsZero() { + query = append(query, "occurred_at <= :to") + } + + if len(query) > 0 { + emq = fmt.Sprintf("WHERE %s", strings.Join(query, " AND ")) + } + + return emq +} + +type dbEvent struct { + ID string `db:"id"` + Operation string `db:"operation"` + OccurredAt time.Time `db:"occurred_at"` + Payload []byte `db:"payload"` +} + +func toDBEvent(event eventlogs.Event) (dbEvent, error) { + data := []byte("{}") + if len(event.Payload) > 0 { + b, err := json.Marshal(event.Payload) + if err != nil { + return dbEvent{}, errors.Wrap(repoerr.ErrMalformedEntity, err) + } + data = b + } + + return dbEvent{ + ID: event.ID, + Operation: event.Operation, + OccurredAt: event.OccurredAt, + Payload: data, + }, nil +} + +func toEvent(event dbEvent) (eventlogs.Event, error) { + var payload map[string]interface{} + if event.Payload != nil { + if err := json.Unmarshal(event.Payload, &payload); err != nil { + return eventlogs.Event{}, errors.Wrap(repoerr.ErrMalformedEntity, err) + } + } + + return eventlogs.Event{ + ID: event.ID, + Operation: event.Operation, + OccurredAt: event.OccurredAt, + Payload: payload, + }, nil +} diff --git a/eventlogs/postgres/events_test.go b/eventlogs/postgres/events_test.go new file mode 100644 index 00000000000..902fe361f6d --- /dev/null +++ b/eventlogs/postgres/events_test.go @@ -0,0 +1,498 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package postgres_test + +import ( + "context" + "fmt" + "math/rand" + "strings" + "testing" + "time" + + "github.com/absmach/magistrala/eventlogs" + "github.com/absmach/magistrala/eventlogs/postgres" + "github.com/absmach/magistrala/internal/testsutil" + "github.com/absmach/magistrala/pkg/errors" + repoerr "github.com/absmach/magistrala/pkg/errors/repository" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +var ( + invalidUUID = strings.Repeat("a", 37) + operation = "user.create" + payload = map[string]interface{}{ + "temperature": rand.Float64(), + "humidity": rand.Float64(), + "sensor_id": rand.Intn(1000), + "locations": []string{ + strings.Repeat("a", 1024), + strings.Repeat("a", 1024), + strings.Repeat("a", 1024), + }, + "status": rand.Intn(1000), + "timestamp": time.Now().UnixNano(), + } +) + +func TestEventsSave(t *testing.T) { + t.Cleanup(func() { + _, err := db.Exec("DELETE FROM events") + require.Nil(t, err, fmt.Sprintf("clean events unexpected error: %s", err)) + }) + repo := postgres.NewRepository(database) + + id := testsutil.GenerateUUID(t) + occurredAt := time.Now() + + cases := []struct { + desc string + event eventlogs.Event + err error + }{ + { + desc: "new event successfully", + event: eventlogs.Event{ + ID: id, + Operation: operation, + OccurredAt: occurredAt, + Payload: payload, + }, + err: nil, + }, + { + desc: "with duplicate event", + event: eventlogs.Event{ + ID: id, + Operation: operation, + OccurredAt: occurredAt, + Payload: payload, + }, + err: repoerr.ErrConflict, + }, + { + desc: "with massive event payload", + event: eventlogs.Event{ + ID: testsutil.GenerateUUID(t), + Operation: operation, + OccurredAt: time.Now(), + Payload: map[string]interface{}{ + "metadata": map[string]interface{}{ + "metadata": map[string]interface{}{ + "metadata": map[string]interface{}{ + "metadata": map[string]interface{}{ + "metadata": map[string]interface{}{ + "data": payload, + }, + "data": payload, + }, + "data": payload, + }, + "data": payload, + }, + "data": payload, + }, + "data": payload, + }, + }, + err: nil, + }, + { + desc: "with invalid event id", + event: eventlogs.Event{ + ID: invalidUUID, + Operation: operation, + OccurredAt: time.Now(), + Payload: payload, + }, + err: repoerr.ErrMalformedEntity, + }, + { + desc: "with nil event id", + event: eventlogs.Event{ + Operation: operation, + OccurredAt: time.Now(), + Payload: payload, + }, + err: repoerr.ErrMalformedEntity, + }, + { + desc: "with empty event id", + event: eventlogs.Event{ + ID: "", + Operation: operation, + OccurredAt: time.Now(), + Payload: payload, + }, + err: repoerr.ErrMalformedEntity, + }, + { + desc: "with nil event operation", + event: eventlogs.Event{ + ID: testsutil.GenerateUUID(t), + OccurredAt: time.Now(), + Payload: payload, + }, + err: repoerr.ErrCreateEntity, + }, + { + desc: "with empty event operation", + event: eventlogs.Event{ + ID: testsutil.GenerateUUID(t), + Operation: "", + OccurredAt: time.Now(), + Payload: payload, + }, + err: repoerr.ErrCreateEntity, + }, + { + desc: "with nil event occurred_at", + event: eventlogs.Event{ + ID: testsutil.GenerateUUID(t), + Operation: operation, + Payload: payload, + }, + err: repoerr.ErrCreateEntity, + }, + { + desc: "with empty event occurred_at", + event: eventlogs.Event{ + ID: testsutil.GenerateUUID(t), + Operation: operation, + OccurredAt: time.Time{}, + Payload: payload, + }, + err: repoerr.ErrCreateEntity, + }, + { + desc: "with nil event payload", + event: eventlogs.Event{ + ID: testsutil.GenerateUUID(t), + Operation: operation, + OccurredAt: time.Now(), + }, + err: nil, + }, + { + desc: "with empty event payload", + event: eventlogs.Event{ + ID: testsutil.GenerateUUID(t), + Operation: operation, + OccurredAt: time.Now(), + Payload: map[string]interface{}{}, + }, + err: nil, + }, + { + desc: "with empty event", + event: eventlogs.Event{}, + err: repoerr.ErrMalformedEntity, + }, + } + for _, tc := range cases { + t.Run(tc.desc, func(t *testing.T) { + switch err := repo.Save(context.Background(), tc.event); { + case err == nil: + assert.Nil(t, err) + default: + assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) + } + }) + } +} + +func TestEventsRetrieveAll(t *testing.T) { + t.Cleanup(func() { + _, err := db.Exec("DELETE FROM events") + require.Nil(t, err, fmt.Sprintf("clean events unexpected error: %s", err)) + }) + repo := postgres.NewRepository(database) + + num := 200 + + var items []eventlogs.Event + for i := 0; i < num; i++ { + event := eventlogs.Event{ + ID: testsutil.GenerateUUID(t), + Operation: fmt.Sprintf("%s-%d", operation, i), + OccurredAt: time.Now().UTC().Truncate(time.Millisecond), + Payload: payload, + } + err := repo.Save(context.Background(), event) + require.Nil(t, err, fmt.Sprintf("create event unexpected error: %s", err)) + items = append(items, event) + } + + cases := []struct { + desc string + page eventlogs.Page + response eventlogs.EventsPage + err error + }{ + { + desc: "successfully", + page: eventlogs.Page{ + Offset: 0, + Limit: 1, + }, + response: eventlogs.EventsPage{ + Total: uint64(num), + Offset: 0, + Limit: 1, + Events: items[:1], + }, + err: nil, + }, + { + desc: "with offset and empty limit", + page: eventlogs.Page{ + Offset: 10, + }, + response: eventlogs.EventsPage{ + Total: uint64(num), + Offset: 10, + Limit: 0, + Events: []eventlogs.Event(nil), + }, + }, + { + desc: "with limit and empty offset", + page: eventlogs.Page{ + Limit: 50, + }, + response: eventlogs.EventsPage{ + Total: uint64(num), + Offset: 0, + Limit: 50, + Events: items[:50], + }, + }, + { + desc: "with offset and limit", + page: eventlogs.Page{ + Offset: 10, + Limit: 50, + }, + response: eventlogs.EventsPage{ + Total: uint64(num), + Offset: 10, + Limit: 50, + Events: items[10:60], + }, + }, + { + desc: "with offset out of range", + page: eventlogs.Page{ + Offset: 1000, + Limit: 50, + }, + response: eventlogs.EventsPage{ + Total: uint64(num), + Offset: 1000, + Limit: 50, + Events: []eventlogs.Event(nil), + }, + }, + { + desc: "with offset and limit out of range", + page: eventlogs.Page{ + Offset: 170, + Limit: 50, + }, + response: eventlogs.EventsPage{ + Total: uint64(num), + Offset: 170, + Limit: 50, + Events: items[170:200], + }, + }, + { + desc: "with limit out of range", + page: eventlogs.Page{ + Offset: 0, + Limit: 1000, + }, + response: eventlogs.EventsPage{ + Total: uint64(num), + Offset: 0, + Limit: 1000, + Events: items, + }, + }, + { + desc: "with empty page", + page: eventlogs.Page{}, + response: eventlogs.EventsPage{ + Total: uint64(num), + Offset: 0, + Limit: 0, + Events: []eventlogs.Event(nil), + }, + }, + { + desc: "with id", + page: eventlogs.Page{ + ID: items[0].ID, + Offset: 0, + Limit: 10, + }, + response: eventlogs.EventsPage{ + Total: 1, + Offset: 0, + Limit: 10, + Events: []eventlogs.Event{items[0]}, + }, + }, + { + desc: "with operation", + page: eventlogs.Page{ + Operation: items[0].Operation, + Offset: 0, + Limit: 10, + }, + response: eventlogs.EventsPage{ + Total: 1, + Offset: 0, + Limit: 10, + Events: []eventlogs.Event{items[0]}, + }, + }, + { + desc: "with from", + page: eventlogs.Page{ + From: items[0].OccurredAt, + Offset: 0, + Limit: 10, + }, + response: eventlogs.EventsPage{ + Total: uint64(num), + Offset: 0, + Limit: 10, + Events: items[:10], + }, + }, + { + desc: "with to", + page: eventlogs.Page{ + To: items[num-1].OccurredAt, + Offset: 0, + Limit: 10, + }, + response: eventlogs.EventsPage{ + Total: uint64(num), + Offset: 0, + Limit: 10, + Events: items[:10], + }, + }, + { + desc: "with from and to", + page: eventlogs.Page{ + From: items[0].OccurredAt, + To: items[num-1].OccurredAt, + Offset: 0, + Limit: 10, + }, + response: eventlogs.EventsPage{ + Total: uint64(num), + Offset: 0, + Limit: 10, + Events: items[:10], + }, + }, + { + desc: "with all filters", + page: eventlogs.Page{ + ID: items[0].ID, + Operation: items[0].Operation, + From: items[0].OccurredAt, + To: items[num-1].OccurredAt, + Offset: 0, + Limit: 10, + }, + response: eventlogs.EventsPage{ + Total: 1, + Offset: 0, + Limit: 10, + Events: []eventlogs.Event{items[0]}, + }, + }, + { + desc: "with invalid id", + page: eventlogs.Page{ + ID: testsutil.GenerateUUID(t), + Offset: 0, + Limit: 10, + }, + response: eventlogs.EventsPage{ + Total: 0, + Offset: 0, + Limit: 10, + Events: []eventlogs.Event(nil), + }, + }, + { + desc: "with invalid operation", + page: eventlogs.Page{ + Operation: strings.Repeat("a", 37), + Offset: 0, + Limit: 10, + }, + response: eventlogs.EventsPage{ + Total: 0, + Offset: 0, + Limit: 10, + Events: []eventlogs.Event(nil), + }, + }, + { + desc: "with invalid from", + page: eventlogs.Page{ + From: time.Now().UTC().Truncate(time.Millisecond), + Offset: 0, + Limit: 10, + }, + response: eventlogs.EventsPage{ + Total: 0, + Offset: 0, + Limit: 10, + Events: []eventlogs.Event(nil), + }, + }, + { + desc: "with invalid to", + page: eventlogs.Page{ + To: time.Now().UTC().Truncate(time.Millisecond).Add(-time.Hour), + Offset: 0, + Limit: 10, + }, + response: eventlogs.EventsPage{ + Total: 0, + Offset: 0, + Limit: 10, + Events: []eventlogs.Event(nil), + }, + }, + } + for _, tc := range cases { + t.Run(tc.desc, func(t *testing.T) { + page, err := repo.RetrieveAll(context.Background(), tc.page) + assert.Equal(t, tc.response.Total, page.Total) + assert.Equal(t, tc.response.Offset, page.Offset) + assert.Equal(t, tc.response.Limit, page.Limit) + assert.ElementsMatch(t, removePayload(t, page.Events), removePayload(t, tc.response.Events)) + assert.Equal(t, tc.err, err) + }) + } +} + +func removePayload(t *testing.T, events []eventlogs.Event) []eventlogs.Event { + var items []eventlogs.Event + for _, e := range events { + e.Payload = nil + items = append(items, e) + } + return items +} diff --git a/eventlogs/postgres/init.go b/eventlogs/postgres/init.go new file mode 100644 index 00000000000..40268563aa9 --- /dev/null +++ b/eventlogs/postgres/init.go @@ -0,0 +1,32 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package postgres + +import ( + _ "github.com/jackc/pgx/v5/stdlib" // required for SQL access + migrate "github.com/rubenv/sql-migrate" +) + +func Migration() *migrate.MemoryMigrationSource { + return &migrate.MemoryMigrationSource{ + Migrations: []*migrate.Migration{ + { + Id: "events_01", + Up: []string{ + `CREATE TABLE IF NOT EXISTS events ( + id UUID NOT NULL, + operation VARCHAR NOT NULL, + occurred_at TIMESTAMP NOT NULL, + payload JSONB NOT NULL, + UNIQUE (id, operation, occurred_at), + PRIMARY KEY (id, operation, occurred_at) + )`, + }, + Down: []string{ + `DROP TABLE IF EXISTS events`, + }, + }, + }, + } +} diff --git a/eventlogs/postgres/setup_test.go b/eventlogs/postgres/setup_test.go new file mode 100644 index 00000000000..ff6ee15008b --- /dev/null +++ b/eventlogs/postgres/setup_test.go @@ -0,0 +1,97 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package postgres_test + +import ( + "database/sql" + "fmt" + "log" + "os" + "testing" + "time" + + epostgres "github.com/absmach/magistrala/eventlogs/postgres" + cpostgres "github.com/absmach/magistrala/internal/clients/postgres" + "github.com/absmach/magistrala/internal/postgres" + "github.com/jmoiron/sqlx" + dockertest "github.com/ory/dockertest/v3" + "github.com/ory/dockertest/v3/docker" + "go.opentelemetry.io/otel" +) + +var ( + db *sqlx.DB + database postgres.Database + tracer = otel.Tracer("repo_tests") +) + +func TestMain(m *testing.M) { + pool, err := dockertest.NewPool("") + if err != nil { + log.Fatalf("Could not connect to docker: %s", err) + } + + container, err := pool.RunWithOptions(&dockertest.RunOptions{ + Repository: "postgres", + Tag: "15.1-alpine", + Env: []string{ + "POSTGRES_USER=test", + "POSTGRES_PASSWORD=test", + "POSTGRES_DB=test", + "listen_addresses = '*'", + }, + }, func(config *docker.HostConfig) { + config.AutoRemove = true + config.RestartPolicy = docker.RestartPolicy{Name: "no"} + }) + if err != nil { + log.Fatalf("Could not start container: %s", err) + } + + port := container.GetPort("5432/tcp") + + // exponential backoff-retry, because the application in the container might not be ready to accept connections yet + pool.MaxWait = 120 * time.Second + if err := pool.Retry(func() error { + url := fmt.Sprintf("host=localhost port=%s user=test dbname=test password=test sslmode=disable", port) + db, err := sql.Open("pgx", url) + if err != nil { + return err + } + return db.Ping() + }); err != nil { + log.Fatalf("Could not connect to docker: %s", err) + } + + dbConfig := cpostgres.Config{ + Host: "localhost", + Port: port, + User: "test", + Pass: "test", + Name: "test", + SSLMode: "disable", + SSLCert: "", + SSLKey: "", + SSLRootCert: "", + } + + if db, err = cpostgres.Setup(dbConfig, *epostgres.Migration()); err != nil { + log.Fatalf("Could not setup test DB connection: %s", err) + } + + if db, err = cpostgres.Connect(dbConfig); err != nil { + log.Fatalf("Could not setup test DB connection: %s", err) + } + database = postgres.NewDatabase(db, dbConfig, tracer) + + code := m.Run() + + // Defers will not be run when using os.Exit + db.Close() + if err := pool.Purge(container); err != nil { + log.Fatalf("Could not purge container: %s", err) + } + + os.Exit(code) +} diff --git a/eventlogs/service.go b/eventlogs/service.go new file mode 100644 index 00000000000..e46695134ee --- /dev/null +++ b/eventlogs/service.go @@ -0,0 +1,54 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package eventlogs + +import ( + "context" + + "github.com/absmach/magistrala" + "github.com/absmach/magistrala/auth" + "github.com/absmach/magistrala/pkg/errors" + svcerr "github.com/absmach/magistrala/pkg/errors/service" +) + +type service struct { + auth magistrala.AuthServiceClient + repository Repository +} + +func NewService(repository Repository, authClient magistrala.AuthServiceClient) Service { + return &service{ + auth: authClient, + repository: repository, + } +} + +func (svc *service) ReadAll(ctx context.Context, token string, page Page) (EventsPage, error) { + if err := svc.authorize(ctx, token, page.ID, page.EntityType); err != nil { + return EventsPage{}, err + } + + return svc.repository.RetrieveAll(ctx, page) +} + +func (svc *service) authorize(ctx context.Context, token, id, entityType string) error { + req := &magistrala.AuthorizeReq{ + SubjectType: auth.UserType, + SubjectKind: auth.TokenKind, + Subject: token, + Permission: auth.ViewPermission, + ObjectType: entityType, + Object: id, + } + + res, err := svc.auth.Authorize(ctx, req) + if err != nil { + return errors.Wrap(svcerr.ErrAuthorization, err) + } + if !res.GetAuthorized() { + return svcerr.ErrAuthorization + } + + return nil +} diff --git a/eventlogs/service_test.go b/eventlogs/service_test.go new file mode 100644 index 00000000000..231fb9458cc --- /dev/null +++ b/eventlogs/service_test.go @@ -0,0 +1,125 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package eventlogs_test + +import ( + "context" + "fmt" + "math/rand" + "testing" + "time" + + "github.com/absmach/magistrala" + "github.com/absmach/magistrala/auth" + authmocks "github.com/absmach/magistrala/auth/mocks" + "github.com/absmach/magistrala/eventlogs" + "github.com/absmach/magistrala/eventlogs/mocks" + "github.com/absmach/magistrala/internal/testsutil" + "github.com/absmach/magistrala/pkg/errors" + repoerr "github.com/absmach/magistrala/pkg/errors/repository" + svcerr "github.com/absmach/magistrala/pkg/errors/service" + "github.com/stretchr/testify/assert" +) + +func TestReadAll(t *testing.T) { + repo := new(mocks.Repository) + authsvc := new(authmocks.Service) + svc := eventlogs.NewService(repo, authsvc) + + validToken := "token" + validPage := eventlogs.Page{ + Offset: 0, + Limit: 10, + ID: testsutil.GenerateUUID(t), + EntityType: auth.UserType, + } + + cases := []struct { + desc string + token string + page eventlogs.Page + resp eventlogs.EventsPage + authRes *magistrala.AuthorizeRes + authErr error + repoErr error + err error + }{ + { + desc: "successful", + token: validToken, + page: validPage, + resp: eventlogs.EventsPage{ + Total: 1, + Offset: 0, + Limit: 10, + Events: []eventlogs.Event{ + { + ID: testsutil.GenerateUUID(t), + Operation: "user.create", + OccurredAt: time.Now().Add(-time.Hour), + Payload: map[string]interface{}{ + "temperature": rand.Float64(), + "humidity": rand.Float64(), + "sensor_id": rand.Intn(1000), + }, + }, + }, + }, + authRes: &magistrala.AuthorizeRes{Authorized: true}, + authErr: nil, + repoErr: nil, + err: nil, + }, + { + desc: "invalid token", + token: "invalid", + page: validPage, + authRes: &magistrala.AuthorizeRes{Authorized: false}, + authErr: svcerr.ErrAuthorization, + err: svcerr.ErrAuthorization, + }, + { + desc: "with repo error", + token: validToken, + page: validPage, + resp: eventlogs.EventsPage{}, + authRes: &magistrala.AuthorizeRes{Authorized: true}, + authErr: nil, + repoErr: repoerr.ErrViewEntity, + err: repoerr.ErrViewEntity, + }, + { + desc: "with failed to authorize", + token: validToken, + page: validPage, + resp: eventlogs.EventsPage{}, + authRes: &magistrala.AuthorizeRes{Authorized: false}, + authErr: nil, + repoErr: nil, + err: svcerr.ErrAuthorization, + }, + } + + for _, tc := range cases { + t.Run(tc.desc, func(t *testing.T) { + authReq := &magistrala.AuthorizeReq{ + SubjectType: auth.UserType, + SubjectKind: auth.TokenKind, + Subject: tc.token, + Permission: auth.ViewPermission, + ObjectType: tc.page.EntityType, + Object: tc.page.ID, + } + repocall := authsvc.On("Authorize", context.Background(), authReq).Return(tc.authRes, tc.authErr) + repocall1 := repo.On("RetrieveAll", context.Background(), tc.page).Return(tc.resp, tc.repoErr) + resp, err := svc.ReadAll(context.Background(), tc.token, tc.page) + if tc.err == nil { + assert.Equal(t, tc.resp, resp, tc.desc) + } + assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) + repocall.Unset() + repocall1.Unset() + }) + } +} diff --git a/internal/api/common.go b/internal/api/common.go index a3c92ac118d..940b491e1bd 100644 --- a/internal/api/common.go +++ b/internal/api/common.go @@ -115,7 +115,10 @@ func EncodeError(_ context.Context, err error, w http.ResponseWriter) { errors.Contains(err, apiutil.ErrMissingMemberKind), errors.Contains(err, apiutil.ErrLimitSize), errors.Contains(err, apiutil.ErrBearerKey), - errors.Contains(err, apiutil.ErrNameSize): + errors.Contains(err, apiutil.ErrNameSize), + errors.Contains(err, apiutil.ErrInvalidQueryParams), + errors.Contains(err, apiutil.ErrMissingEntityType), + errors.Contains(err, apiutil.ErrInvalidEntityType): w.WriteHeader(http.StatusBadRequest) case errors.Contains(err, svcerr.ErrAuthentication), errors.Contains(err, errors.ErrAuthentication), diff --git a/internal/apiutil/errors.go b/internal/apiutil/errors.go index bca1c710edc..dfad2481bbb 100644 --- a/internal/apiutil/errors.go +++ b/internal/apiutil/errors.go @@ -164,4 +164,10 @@ var ( // ErrRollbackTx indicates failed to rollback transaction. ErrRollbackTx = errors.New("failed to rollback transaction") + + // ErrMissingEntityType indicates missing entity type. + ErrMissingEntityType = errors.New("missing entity type") + + // ErrInvalidEntityType indicates invalid entity type. + ErrInvalidEntityType = errors.New("invalid entity type") ) diff --git a/mqtt/events/events.go b/mqtt/events/events.go index bbe8682c47b..d4aef1075bf 100644 --- a/mqtt/events/events.go +++ b/mqtt/events/events.go @@ -15,8 +15,8 @@ type mqttEvent struct { func (me mqttEvent) Encode() (map[string]interface{}, error) { return map[string]interface{}{ - "thing_id": me.clientID, - "event_type": me.eventType, - "instance": me.instance, + "id": me.clientID, + "operation": me.eventType, + "instance": me.instance, }, nil } diff --git a/pkg/events/store/store_nats.go b/pkg/events/store/store_nats.go index 76f58188453..49417339534 100644 --- a/pkg/events/store/store_nats.go +++ b/pkg/events/store/store_nats.go @@ -15,6 +15,9 @@ import ( "github.com/absmach/magistrala/pkg/events/nats" ) +// StreamAllEvents represents subject to subscribe for all the events. +const StreamAllEvents = "events.>" + func init() { log.Println("The binary was build using nats as the events store") } diff --git a/pkg/events/store/store_rabbitmq.go b/pkg/events/store/store_rabbitmq.go index 9703e390c23..9a2f8f10e66 100644 --- a/pkg/events/store/store_rabbitmq.go +++ b/pkg/events/store/store_rabbitmq.go @@ -15,6 +15,9 @@ import ( "github.com/absmach/magistrala/pkg/events/rabbitmq" ) +// StreamAllEvents represents subject to subscribe for all the events. +const StreamAllEvents = "events.#" + func init() { log.Println("The binary was build using rabbitmq as the events store") } diff --git a/pkg/events/store/store_redis.go b/pkg/events/store/store_redis.go index 8f7f63bed8c..b2cb74250b2 100644 --- a/pkg/events/store/store_redis.go +++ b/pkg/events/store/store_redis.go @@ -15,6 +15,9 @@ import ( "github.com/absmach/magistrala/pkg/events/redis" ) +// StreamAllEvents represents subject to subscribe for all the events. +const StreamAllEvents = ">" + func init() { log.Println("The binary was build using redis as the events store") } diff --git a/pkg/sdk/go/events.go b/pkg/sdk/go/events.go new file mode 100644 index 00000000000..dea31d98150 --- /dev/null +++ b/pkg/sdk/go/events.go @@ -0,0 +1,47 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package sdk + +import ( + "encoding/json" + "net/http" + "time" + + "github.com/absmach/magistrala/pkg/errors" +) + +const eventsEndpoint = "events" + +type Event struct { + ID string `json:"id,omitempty"` + Operation string `json:"operation,omitempty"` + OccurredAt time.Time `json:"occurred_at,omitempty"` + Payload Metadata `json:"payload,omitempty"` +} + +type EventsPage struct { + Total uint64 `json:"total"` + Offset uint64 `json:"offset"` + Limit uint64 `json:"limit"` + Events []Event `json:"events"` +} + +func (sdk mgSDK) Events(pm PageMetadata, id, entityType, token string) (events EventsPage, err error) { + url, err := sdk.withQueryParams(sdk.eventsURL, eventsEndpoint+"/"+id+"/"+entityType, pm) + if err != nil { + return EventsPage{}, errors.NewSDKError(err) + } + + _, body, sdkerr := sdk.processRequest(http.MethodGet, url, token, nil, nil, http.StatusOK) + if sdkerr != nil { + return EventsPage{}, sdkerr + } + + var eventsPage EventsPage + if err := json.Unmarshal(body, &eventsPage); err != nil { + return EventsPage{}, errors.NewSDKError(err) + } + + return eventsPage, nil +} diff --git a/pkg/sdk/go/sdk.go b/pkg/sdk/go/sdk.go index 219e84a9534..1740af62e85 100644 --- a/pkg/sdk/go/sdk.go +++ b/pkg/sdk/go/sdk.go @@ -98,6 +98,9 @@ type PageMetadata struct { UserID string `json:"user_id,omitempty"` DomainID string `json:"domain_id,omitempty"` Relation string `json:"relation,omitempty"` + Operation string `json:"operation,omitempty"` + From int64 `json:"from,omitempty"` + To int64 `json:"to,omitempty"` } // Credentials represent client credentials: it contains @@ -1154,6 +1157,13 @@ type SDK interface { // err := sdk.DeleteInvitation("userID", "domainID", "token") // fmt.Println(err) DeleteInvitation(userID, domainID, token string) (err error) + + // Events returns a list of events. + // + // For example: + // events, _ := sdk.Events(PageMetadata{Offset: 0, Limit: 10, Operation: "users.create"}, "userID", "user", "token") + // fmt.Println(events) + Events(pm PageMetadata, id, entityType, token string) (events EventsPage, err error) } type mgSDK struct { @@ -1165,6 +1175,7 @@ type mgSDK struct { usersURL string domainsURL string invitationsURL string + eventsURL string HostURL string msgContentType ContentType @@ -1181,6 +1192,7 @@ type Config struct { UsersURL string DomainsURL string InvitationsURL string + EventsURL string HostURL string MsgContentType ContentType @@ -1198,6 +1210,7 @@ func NewSDK(conf Config) SDK { usersURL: conf.UsersURL, domainsURL: conf.DomainsURL, invitationsURL: conf.InvitationsURL, + eventsURL: conf.EventsURL, HostURL: conf.HostURL, msgContentType: conf.MsgContentType, diff --git a/pkg/sdk/mocks/sdk.go b/pkg/sdk/mocks/sdk.go index 8c58eac30c5..b6b0ba6dfc5 100644 --- a/pkg/sdk/mocks/sdk.go +++ b/pkg/sdk/mocks/sdk.go @@ -1208,6 +1208,34 @@ func (_m *SDK) EnableUser(id string, token string) (sdk.User, errors.SDKError) { return r0, r1 } +// Events provides a mock function with given fields: pm, id, entityType, token +func (_m *SDK) Events(pm sdk.PageMetadata, id string, entityType string, token string) (sdk.EventsPage, error) { + ret := _m.Called(pm, id, entityType, token) + + if len(ret) == 0 { + panic("no return value specified for Events") + } + + var r0 sdk.EventsPage + var r1 error + if rf, ok := ret.Get(0).(func(sdk.PageMetadata, string, string, string) (sdk.EventsPage, error)); ok { + return rf(pm, id, entityType, token) + } + if rf, ok := ret.Get(0).(func(sdk.PageMetadata, string, string, string) sdk.EventsPage); ok { + r0 = rf(pm, id, entityType, token) + } else { + r0 = ret.Get(0).(sdk.EventsPage) + } + + if rf, ok := ret.Get(1).(func(sdk.PageMetadata, string, string, string) error); ok { + r1 = rf(pm, id, entityType, token) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // Group provides a mock function with given fields: id, token func (_m *SDK) Group(id string, token string) (sdk.Group, errors.SDKError) { ret := _m.Called(id, token)