From 82db40077dfb223f383cfaa557b35228b04a5b13 Mon Sep 17 00:00:00 2001 From: Nevio Date: Tue, 17 Sep 2024 09:35:21 +0200 Subject: [PATCH] Ability to serve transports, Dockerfile update (docker-compose not yet working) + pprof implementation --- Dockerfile | 21 ++++++--- benchmark/dummy.go | 2 +- benchmark/quic.go | 2 +- benchmark/tcp.go | 2 +- benchmark/udp.go | 2 +- benchmark/uds.go | 2 +- cmd/serve.go | 6 ++- config.yaml | 7 ++- docker-compose.yml | 33 ++++++++------ fdb.go | 57 ++++++++++++++++++++++- go.mod | 20 ++++++++ go.sum | 35 ++++++++++++++ logger/logger.go | 4 +- metrics/metrics.go | 61 +++++++++++++++++++++++++ registry.go | 93 +++++++++++++++++++++++++++++++++----- transports/dummy/server.go | 2 +- transports/quic/server.go | 2 +- transports/tcp/server.go | 2 +- transports/transport.go | 8 +++- transports/udp/server.go | 2 +- transports/uds/server.go | 2 +- 21 files changed, 314 insertions(+), 51 deletions(-) diff --git a/Dockerfile b/Dockerfile index be08c8d..251c1a3 100644 --- a/Dockerfile +++ b/Dockerfile @@ -2,10 +2,10 @@ FROM golang:1.23-alpine AS builder # Install necessary build tools -RUN apk add --no-cache make gcc musl-dev +RUN apk add --no-cache make git gcc musl-dev # Set the working directory inside the container -WORKDIR /app +WORKDIR /fdb # Copy the Go modules manifest and download dependencies COPY go.mod go.sum ./ @@ -21,12 +21,19 @@ RUN make build FROM alpine:latest # Set the working directory inside the container -WORKDIR /app +WORKDIR /fdb # Copy the built executable from the builder stage -COPY --from=builder /app/build/fdb /app/fdb +COPY --from=builder /fdb/build/fdb /fdb/fdb -# EXPOSE 0000 +# Copy the YAML configuration file +COPY config.yaml /fdb/config.yaml -# Command to run the server -CMD ["./fdb", "serve"] +# Copy the certificate files +COPY data/certs/ /fdb/data/certs/ + +# Expose the necessary ports from the config file +EXPOSE 4434 4433 5011 5022 4060 + +# Command to run the server with the configuration file +CMD ["./fdb", "serve", "--config", "./config.yaml"] diff --git a/benchmark/dummy.go b/benchmark/dummy.go index 1a7e392..9b5bba4 100644 --- a/benchmark/dummy.go +++ b/benchmark/dummy.go @@ -60,7 +60,7 @@ func (ds *DummySuite) Start(ctx context.Context) error { rHandler := transport_dummy.NewDummyReadHandler(db) dummyServer.RegisterHandler(types.ReadHandlerType, rHandler.HandleMessage) - if sErr := dummyServer.Start(); sErr != nil { + if sErr := dummyServer.Start(ctx); sErr != nil { zap.L().Error( "failed to start dummy transport", zap.Error(sErr), diff --git a/benchmark/quic.go b/benchmark/quic.go index 838666b..4ae043c 100644 --- a/benchmark/quic.go +++ b/benchmark/quic.go @@ -65,7 +65,7 @@ func (qs *QuicSuite) Start(ctx context.Context) error { rHandler := transport_quic.NewQuicReadHandler(bDb) quicServer.RegisterHandler(types.ReadHandlerType, rHandler.HandleMessage) - if err := quicServer.Start(); err != nil { + if err := quicServer.Start(ctx); err != nil { return fmt.Errorf("failed to start QUIC server: %w", err) } diff --git a/benchmark/tcp.go b/benchmark/tcp.go index 80a51c0..e2e8787 100644 --- a/benchmark/tcp.go +++ b/benchmark/tcp.go @@ -64,7 +64,7 @@ func (ts *TcpSuite) Start(ctx context.Context) error { rHandler := transport_tcp.NewTCPReadHandler(bDb) tcpServer.RegisterHandler(types.ReadHandlerType, rHandler.HandleMessage) - if sErr := tcpServer.Start(); sErr != nil { + if sErr := tcpServer.Start(ctx); sErr != nil { zap.L().Error("failed to start TCP transport", zap.Error(sErr)) } diff --git a/benchmark/udp.go b/benchmark/udp.go index 0d68e14..fcd10eb 100644 --- a/benchmark/udp.go +++ b/benchmark/udp.go @@ -64,7 +64,7 @@ func (us *UdpSuite) Start(ctx context.Context) error { rHandler := transport_udp.NewUDPReadHandler(bDb) udpServer.RegisterHandler(types.ReadHandlerType, rHandler.HandleMessage) - if sErr := udpServer.Start(); sErr != nil { + if sErr := udpServer.Start(ctx); sErr != nil { zap.L().Error("failed to start UDP transport", zap.Error(sErr)) } diff --git a/benchmark/uds.go b/benchmark/uds.go index 5b72ed7..74d5f05 100644 --- a/benchmark/uds.go +++ b/benchmark/uds.go @@ -65,7 +65,7 @@ func (us *UdsSuite) Start(ctx context.Context) error { rHandler := transport_uds.NewUDSReadHandler(bDb) udsServer.RegisterHandler(types.ReadHandlerType, rHandler.HandleMessage) - if sErr := udsServer.Start(); sErr != nil { + if sErr := udsServer.Start(ctx); sErr != nil { zap.L().Error("failed to start UDS transport", zap.Error(sErr)) } diff --git a/cmd/serve.go b/cmd/serve.go index 48042ba..f734e91 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -21,8 +21,8 @@ func ServeCommand() *cli.Command { }, &cli.StringSliceFlag{ Name: "transports", - Usage: "Specify the transport types (e.g., quic, uds)", - Value: cli.NewStringSlice("quic", "uds"), // Corrected default initialization + Usage: "Specify the transport types", + Value: cli.NewStringSlice("quic", "uds", "tcp", "udp"), }, }, Action: func(c *cli.Context) error { @@ -47,6 +47,8 @@ func ServeCommand() *cli.Command { transports = append(transports, tt) } + + defer fdbc.Stop(transports...) return fdbc.Start(c.Context, transports...) }, } diff --git a/config.yaml b/config.yaml index 1ff5422..705657e 100644 --- a/config.yaml +++ b/config.yaml @@ -1,7 +1,7 @@ logger: enabled: true environment: development - level: debug + level: info mdbx: enabled: true @@ -14,6 +14,11 @@ mdbx: growthStep: 4096 # Growth step size (4 KB) filePermissions: 0600 # File permissions for the database +pprof: + - name: fdb + enabled: true + addr: "127.0.0.1:4060" + transports: - type: dummy enabled: true diff --git a/docker-compose.yml b/docker-compose.yml index 1a9bd5a..3033d69 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -6,33 +6,36 @@ services: context: . dockerfile: Dockerfile ports: - - "8080:8080" # Map the container's port 8080 to the host + - "4434:4434" # Mapping the dummy service port + - "4433:4433" # Mapping the QUIC service port + - "5011:5011" # Mapping the TCP service port + - "5022:5022" # Mapping the UDP service port + - "4060:4060" # Mapping for pprof restart: always environment: - - ENV_VAR1=value1 - - ENV_VAR2=value2 - - OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4317 # OpenTelemetry Collector endpoint + - OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4317 # OpenTelemetry Collector endpoint volumes: - - ./data:/app/data - command: ["./fdb", "serve"] + - ./data:/app/data # Mounting the data directory for certs and other resources + - ./config.yaml:/fdb/config.yaml # Mounting the configuration file + command: ["./fdb", "serve", "--config", "/fdb/config.yaml"] depends_on: - - otel-collector # Ensure fdb service starts after the OpenTelemetry collector + - otel-collector # Ensure fdb service starts after OpenTelemetry collector otel-collector: image: otel/opentelemetry-collector:latest ports: - - "4317:4317" # OTLP gRPC receiver - - "4318:4318" # OTLP HTTP receiver - - "55680:55680" # OpenTelemetry protocol receiver + - "4317:4317" # OTLP gRPC receiver + - "4318:4318" # OTLP HTTP receiver + - "55680:55680" # OpenTelemetry protocol receiver command: ["--config=/etc/otel-collector-config.yaml"] volumes: - - ./otel-collector-config.yaml:/etc/otel-collector-config.yaml # Mount the custom config for the OpenTelemetry collector + - ./otel-collector-config.yaml:/etc/otel-collector-config.yaml # Mounting custom config for OpenTelemetry collector jaeger: image: jaegertracing/all-in-one:1.29 ports: - - "16686:16686" # Jaeger UI - - "14250:14250" # Jaeger gRPC - - "6831:6831/udp" # Jaeger UDP port for traces + - "16686:16686" # Jaeger UI + - "14250:14250" # Jaeger gRPC + - "6831:6831/udp" # Jaeger UDP port for traces environment: - - COLLECTOR_ZIPKIN_HTTP_PORT=9411 + - COLLECTOR_ZIPKIN_HTTP_PORT=9411 # Zipkin compatibility mode diff --git a/fdb.go b/fdb.go index df4c88f..3edfd59 100644 --- a/fdb.go +++ b/fdb.go @@ -7,6 +7,7 @@ import ( "github.com/unpackdev/fdb/config" "github.com/unpackdev/fdb/db" "github.com/unpackdev/fdb/logger" + "github.com/unpackdev/fdb/pprof" "github.com/unpackdev/fdb/transports" transport_dummy "github.com/unpackdev/fdb/transports/dummy" transport_quic "github.com/unpackdev/fdb/transports/quic" @@ -15,6 +16,7 @@ import ( transport_uds "github.com/unpackdev/fdb/transports/uds" "github.com/unpackdev/fdb/types" "go.uber.org/zap" + "golang.org/x/sync/errgroup" ) type FDB struct { @@ -108,10 +110,63 @@ func New(ctx context.Context, cnf config.Config) (*FDB, error) { } func (fdb *FDB) Start(ctx context.Context, transports ...types.TransportType) error { - return nil + g, gCtx := errgroup.WithContext(ctx) + + bDb, err := fdb.GetDbManager().GetDb("fdb") + if err != nil { + return fmt.Errorf("failed to retrieve fdb database: %w", err) + } + + pCfg, pcErr := fdb.config.GetPprofByServiceTag("fdb") + if pcErr != nil { + return errors.Wrapf(pcErr, "failed to retrieve fdb pprof config for service tag: %s", "fdb") + } + + if pCfg.Enabled { + g.Go(func() error { + return pprof.New(ctx, *pCfg).Start() + }) + } + + for _, transport := range transports { + transportFn, tnOk := tRegistry[transport] + if !tnOk { + return fmt.Errorf("unknown transport type provided: %v - rejecting serving transports", transport) + } + + iTransport, itErr := transportFn(fdb, bDb) + if itErr != nil { + return errors.Wrapf(itErr, "failure to create transport: %s", transport) + } + + g.Go(func() error { + return iTransport.Start(gCtx) + }) + } + + if gErr := g.Wait(); gErr != nil { + return errors.Wrap(gErr, "failure to start fdb database") + } + + select { + case <-ctx.Done(): + return ctx.Err() + } } func (fdb *FDB) Stop(transports ...types.TransportType) error { + for _, transport := range transports { + t, tErr := fdb.tm.GetTransport(transport) + if tErr != nil { + return tErr + } + + if err := t.Stop(); err != nil { + return err + } + } + + zap.L().Info("All transports successfully stopped") return nil } diff --git a/go.mod b/go.mod index 7e14518..381ccd3 100644 --- a/go.mod +++ b/go.mod @@ -16,8 +16,12 @@ require ( require ( github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect github.com/dustin/go-humanize v1.0.1 // indirect + github.com/felixge/httpsnoop v1.0.4 // indirect + github.com/go-logr/logr v1.4.2 // indirect + github.com/go-logr/stdr v1.2.2 // indirect github.com/go-task/slim-sprig/v3 v3.0.0 // indirect github.com/google/pprof v0.0.0-20240910150728-a0b0bb1d4134 // indirect + github.com/google/uuid v1.6.0 // indirect github.com/ianlancetaylor/cgosymbolizer v0.0.0-20240503222823-736c933a666d // indirect github.com/onsi/ginkgo/v2 v2.20.2 // indirect github.com/panjf2000/gnet/v2 v2.5.7 // indirect @@ -27,6 +31,18 @@ require ( github.com/urfave/cli/v2 v2.27.4 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect + go.opentelemetry.io/contrib/bridges/otelslog v0.5.0 // indirect + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.55.0 // indirect + go.opentelemetry.io/otel v1.30.0 // indirect + go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.6.0 // indirect + go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.30.0 // indirect + go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.30.0 // indirect + go.opentelemetry.io/otel/log v0.6.0 // indirect + go.opentelemetry.io/otel/metric v1.30.0 // indirect + go.opentelemetry.io/otel/sdk v1.30.0 // indirect + go.opentelemetry.io/otel/sdk/log v0.6.0 // indirect + go.opentelemetry.io/otel/sdk/metric v1.30.0 // indirect + go.opentelemetry.io/otel/trace v1.30.0 // indirect go.uber.org/mock v0.4.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect @@ -36,7 +52,11 @@ require ( golang.org/x/net v0.29.0 // indirect golang.org/x/sync v0.8.0 // indirect golang.org/x/sys v0.25.0 // indirect + golang.org/x/text v0.18.0 // indirect golang.org/x/tools v0.25.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240604185151-ef581f913117 // indirect + google.golang.org/grpc v1.66.2 // indirect + google.golang.org/protobuf v1.34.1 // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/go.sum b/go.sum index 6bfef56..72d044e 100644 --- a/go.sum +++ b/go.sum @@ -10,14 +10,21 @@ github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkp github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/erigontech/mdbx-go v0.38.4 h1:S9T7mTe9KPcFe4dOoOtVdI6gPzht9y7wMnYfUBgrQLo= github.com/erigontech/mdbx-go v0.38.4/go.mod h1:IcOLQDPw3VM/asP6T5JVPPN4FHHgJtY16XfYjzWKVNI= +github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= +github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/pprof v0.0.0-20240910150728-a0b0bb1d4134 h1:c5FlPPgxOn7kJz3VoPLkQYQXGBS3EklQ4Zfi57uOuqQ= github.com/google/pprof v0.0.0-20240910150728-a0b0bb1d4134/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/ianlancetaylor/cgosymbolizer v0.0.0-20240503222823-736c933a666d h1:Azx2B59D4+zpVVtuYb8Oe3uOLi/ift4xfwKdhBX0Cy0= github.com/ianlancetaylor/cgosymbolizer v0.0.0-20240503222823-736c933a666d/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= @@ -67,6 +74,30 @@ github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyC github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 h1:gEOO8jv9F4OT7lGCjxCBTO/36wtF6j2nSip77qHd4x4= github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1/go.mod h1:Ohn+xnUBiLI6FVj/9LpzZWtj1/D6lUovWYBkxHVV3aM= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= +go.opentelemetry.io/contrib/bridges/otelslog v0.5.0 h1:lU3F57OSLK5mQ1PDBVAfDDaKCPv37MrEbCfTzsF4bz0= +go.opentelemetry.io/contrib/bridges/otelslog v0.5.0/go.mod h1:I84u06zJFr8T5D73fslEUbnRBimVVSBhuVw8L8I92AU= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.55.0 h1:ZIg3ZT/aQ7AfKqdwp7ECpOK6vHqquXXuyTjIO8ZdmPs= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.55.0/go.mod h1:DQAwmETtZV00skUwgD6+0U89g80NKsJE3DCKeLLPQMI= +go.opentelemetry.io/otel v1.30.0 h1:F2t8sK4qf1fAmY9ua4ohFS/K+FUuOPemHUIXHtktrts= +go.opentelemetry.io/otel v1.30.0/go.mod h1:tFw4Br9b7fOS+uEao81PJjVMjW/5fvNCbpsDIXqP0pc= +go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.6.0 h1:bZHOb8k/CwwSt0DgvgaoOhBXWNdWqFWaIsGTtg1H3KE= +go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.6.0/go.mod h1:XlV163j81kDdIt5b5BXCjdqVfqJFy/LJrHA697SorvQ= +go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.30.0 h1:IyFlqNsi8VT/nwYlLJfdM0y1gavxGpEvnf6FtVfZ6X4= +go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.30.0/go.mod h1:bxiX8eUeKoAEQmbq/ecUT8UqZwCjZW52yJrXJUSozsk= +go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.30.0 h1:kn1BudCgwtE7PxLqcZkErpD8GKqLZ6BSzeW9QihQJeM= +go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.30.0/go.mod h1:ljkUDtAMdleoi9tIG1R6dJUpVwDcYjw3J2Q6Q/SuiC0= +go.opentelemetry.io/otel/log v0.6.0 h1:nH66tr+dmEgW5y+F9LanGJUBYPrRgP4g2EkmPE3LeK8= +go.opentelemetry.io/otel/log v0.6.0/go.mod h1:KdySypjQHhP069JX0z/t26VHwa8vSwzgaKmXtIB3fJM= +go.opentelemetry.io/otel/metric v1.30.0 h1:4xNulvn9gjzo4hjg+wzIKG7iNFEaBMX00Qd4QIZs7+w= +go.opentelemetry.io/otel/metric v1.30.0/go.mod h1:aXTfST94tswhWEb+5QjlSqG+cZlmyXy/u8jFpor3WqQ= +go.opentelemetry.io/otel/sdk v1.30.0 h1:cHdik6irO49R5IysVhdn8oaiR9m8XluDaJAs4DfOrYE= +go.opentelemetry.io/otel/sdk v1.30.0/go.mod h1:p14X4Ok8S+sygzblytT1nqG98QG2KYKv++HE0LY/mhg= +go.opentelemetry.io/otel/sdk/log v0.6.0 h1:4J8BwXY4EeDE9Mowg+CyhWVBhTSLXVXodiXxS/+PGqI= +go.opentelemetry.io/otel/sdk/log v0.6.0/go.mod h1:L1DN8RMAduKkrwRAFDEX3E3TLOq46+XMGSbUfHU/+vE= +go.opentelemetry.io/otel/sdk/metric v1.30.0 h1:QJLT8Pe11jyHBHfSAgYH7kEmT24eX792jZO1bo4BXkM= +go.opentelemetry.io/otel/sdk/metric v1.30.0/go.mod h1:waS6P3YqFNzeP01kuo/MBBYqaoBJl7efRQHOaydhy1Y= +go.opentelemetry.io/otel/trace v1.30.0 h1:7UBkkYzeg3C7kQX8VAidWh2biiQbtAKjyIML8dQ9wmc= +go.opentelemetry.io/otel/trace v1.30.0/go.mod h1:5EyKqTzzmyqB9bwtCCq6pDLktPK6fmGf/Dph+8VI02o= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= @@ -125,6 +156,10 @@ golang.org/x/tools v0.25.0/go.mod h1:/vtpO8WL1N9cQC3FN5zPqb//fRXskFHbLKk4OW1Q7rg golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240604185151-ef581f913117 h1:1GBuWVLM/KMVUv1t1En5Gs+gFZCNd360GGb4sSxtrhU= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240604185151-ef581f913117/go.mod h1:EfXuqaE1J41VCDicxHzUDm+8rk+7ZdXzHV0IhO/I6s0= +google.golang.org/grpc v1.66.2 h1:3QdXkuq3Bkh7w+ywLdLvM56cmGvQHUMZpiCzt6Rqaoo= +google.golang.org/grpc v1.66.2/go.mod h1:s3/l6xSSCURdVfAnL+TqCNMyTDAGN6+lZeVxnZR128Y= google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg= google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/logger/logger.go b/logger/logger.go index b99b622..2c6ec06 100644 --- a/logger/logger.go +++ b/logger/logger.go @@ -8,7 +8,7 @@ import ( // GetProductionLogger creates and returns a new zap.Logger configured for production use. // The production logger is optimized for performance. It uses a JSON encoder, logs to standard -// error, and writes at InfoLevel and above. +// error, and writes at InfoLevel and above as default. // // Returns: // @@ -57,6 +57,6 @@ func GetLogger(env string, level string) (*zap.Logger, error) { case "production": return GetProductionLogger(configLevel) default: - return nil, fmt.Errorf("failure to construct logger for env: %s", env) + return nil, fmt.Errorf("not supported environment requested env: %s", env) } } diff --git a/metrics/metrics.go b/metrics/metrics.go index 1abe097..2b5926e 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -1 +1,62 @@ package metrics + +import ( + "context" + "time" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" + "go.opentelemetry.io/otel/metric" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +// Metrics acts as an OTel metrics manager. +type Metrics struct { + meterProvider *sdkmetric.MeterProvider +} + +// NewMetrics initializes the OTel metric exporter and meter provider. +func NewMetrics(ctx context.Context, endpoint string) (*Metrics, error) { + // Configure the OTLP exporter to send metrics to the OTel collector. + exporter, err := otlpmetric.New( + ctx, + otlpmetricgrpc.NewClient( + otlpmetricgrpc.WithEndpoint(endpoint), + otlpmetricgrpc.WithDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())), + ), + ) + if err != nil { + return nil, err + } + + // Set up a periodic reader to batch and export metrics at intervals. + reader := sdkmetric.NewPeriodicReader( + exporter, + sdkmetric.WithInterval(15*time.Second), + ) + + // Create a new MeterProvider with the periodic reader. + provider := sdkmetric.NewMeterProvider( + sdkmetric.WithReader(reader), + ) + + // Set the global meter provider to ensure consistency across the application. + otel.SetMeterProvider(provider) + + return &Metrics{ + meterProvider: provider, + }, nil +} + +// GetMeter returns a named meter for instrumentation. +func (m *Metrics) GetMeter(instrumentationName string) metric.Meter { + return m.meterProvider.Meter(instrumentationName) +} + +// Shutdown gracefully shuts down the meter provider and flushes any remaining metrics. +func (m *Metrics) Shutdown(ctx context.Context) error { + return m.meterProvider.Shutdown(ctx) +} diff --git a/registry.go b/registry.go index a52aa86..ec5a4b2 100644 --- a/registry.go +++ b/registry.go @@ -3,37 +3,106 @@ package fdb import ( "fmt" "github.com/unpackdev/fdb/db" + "github.com/unpackdev/fdb/transports" + transport_quic "github.com/unpackdev/fdb/transports/quic" transport_tcp "github.com/unpackdev/fdb/transports/tcp" + transport_udp "github.com/unpackdev/fdb/transports/udp" + transport_uds "github.com/unpackdev/fdb/transports/uds" "github.com/unpackdev/fdb/types" "time" ) -var registry = map[types.TransportType]func(fdb *FDB) error{ - types.TCPTransportType: func(fdb *FDB) error { - tcpTransport, err := fdb.GetTransportByType(types.TCPTransportType) +// tRegistry is a transport registry mapping transport types (e.g., QUIC, TCP, UDP, UDS) to their initialization functions. +// Each function initializes the transport, registers appropriate handlers (write, read), +// and returns the instantiated transport or an error if initialization fails. +var tRegistry = map[types.TransportType]func(fdb *FDB, dbP db.Provider) (transports.Transport, error){ + types.QUICTransportType: func(fdb *FDB, dbP db.Provider) (transports.Transport, error) { + quicTransport, err := fdb.GetTransportByType(types.QUICTransportType) if err != nil { - return fmt.Errorf("failed to retrieve TCP transport: %w", err) + return nil, fmt.Errorf("failed to retrieve QUIC transport: %w", err) } - tcpServer, ok := tcpTransport.(*transport_tcp.Server) + quicServer, ok := quicTransport.(*transport_quic.Server) if !ok { - return fmt.Errorf("failed to cast transport to TcpServer") + return nil, fmt.Errorf("failed to cast transport to QuicServer") } - bDb, err := fdb.GetDbManager().GetDb("fdb") + // Create a new BatchWriter with a batch size of 512 and flush interval of 1 second + batchWriter := db.NewBatchWriter(dbP.(*db.Db), 512, 500*time.Millisecond, 15) + + wHandler := transport_quic.NewQuicWriteHandler(dbP, batchWriter) + quicServer.RegisterHandler(types.WriteHandlerType, wHandler.HandleMessage) + + rHandler := transport_quic.NewQuicReadHandler(dbP) + quicServer.RegisterHandler(types.ReadHandlerType, rHandler.HandleMessage) + + return quicTransport, nil + }, + types.TCPTransportType: func(fdb *FDB, dbP db.Provider) (transports.Transport, error) { + tcpTransport, err := fdb.GetTransportByType(types.TCPTransportType) if err != nil { - return fmt.Errorf("failed to retrieve benchmark database: %w", err) + return nil, fmt.Errorf("failed to retrieve TCP transport: %w", err) + } + + tcpServer, ok := tcpTransport.(*transport_tcp.Server) + if !ok { + return nil, fmt.Errorf("failed to cast transport to TcpServer") } // Create a new BatchWriter with a batch size of 512 and flush interval of 1 second - batchWriter := db.NewBatchWriter(bDb.(*db.Db), 512, 500*time.Millisecond, 15) + batchWriter := db.NewBatchWriter(dbP.(*db.Db), 512, 500*time.Millisecond, 15) - wHandler := transport_tcp.NewTCPWriteHandler(bDb, batchWriter) + wHandler := transport_tcp.NewTCPWriteHandler(dbP, batchWriter) tcpServer.RegisterHandler(types.WriteHandlerType, wHandler.HandleMessage) - rHandler := transport_tcp.NewTCPReadHandler(bDb) + rHandler := transport_tcp.NewTCPReadHandler(dbP) tcpServer.RegisterHandler(types.ReadHandlerType, rHandler.HandleMessage) - return tcpServer.Start() + return tcpTransport, nil + }, + types.UDSTransportType: func(fdb *FDB, dbP db.Provider) (transports.Transport, error) { + udsTransport, err := fdb.GetTransportByType(types.UDSTransportType) + if err != nil { + return nil, fmt.Errorf("failed to retrieve UDS transport: %w", err) + } + + udsServer, ok := udsTransport.(*transport_uds.Server) + if !ok { + return nil, fmt.Errorf("failed to cast transport to UdsServer") + } + + // Create a new BatchWriter with a batch size of 512 and flush interval of 1 second + batchWriter := db.NewBatchWriter(dbP.(*db.Db), 512, 500*time.Millisecond, 15) + + // Register write and read handlers + wHandler := transport_uds.NewUDSWriteHandler(dbP, batchWriter) + udsServer.RegisterHandler(types.WriteHandlerType, wHandler.HandleMessage) + + rHandler := transport_uds.NewUDSReadHandler(dbP) + udsServer.RegisterHandler(types.ReadHandlerType, rHandler.HandleMessage) + + return udsTransport, nil + }, + types.UDPTransportType: func(fdb *FDB, dbP db.Provider) (transports.Transport, error) { + udpTransport, err := fdb.GetTransportByType(types.UDPTransportType) + if err != nil { + return nil, fmt.Errorf("failed to retrieve UDP transport: %w", err) + } + + udpServer, ok := udpTransport.(*transport_udp.Server) + if !ok { + return nil, fmt.Errorf("failed to cast transport to UdpServer") + } + + // Create a new BatchWriter with a batch size of 512 and flush interval of 1 second + batchWriter := db.NewBatchWriter(dbP.(*db.Db), 512, 500*time.Millisecond, 15) + + wHandler := transport_udp.NewUDPWriteHandler(dbP, batchWriter) + udpServer.RegisterHandler(types.WriteHandlerType, wHandler.HandleMessage) + + rHandler := transport_udp.NewUDPReadHandler(dbP) + udpServer.RegisterHandler(types.ReadHandlerType, rHandler.HandleMessage) + + return udpTransport, nil }, } diff --git a/transports/dummy/server.go b/transports/dummy/server.go index 628f597..478c169 100644 --- a/transports/dummy/server.go +++ b/transports/dummy/server.go @@ -39,7 +39,7 @@ func (s *Server) Addr() string { } // Start starts the UDS server -func (s *Server) Start() error { +func (s *Server) Start(ctx context.Context) error { s.stopChan = make(chan struct{}) s.started = make(chan struct{}, 1) // Initialize the started channel diff --git a/transports/quic/server.go b/transports/quic/server.go index 8bcc4af..a235e26 100644 --- a/transports/quic/server.go +++ b/transports/quic/server.go @@ -55,7 +55,7 @@ func (s *Server) Addr() string { } // Start starts the QUIC server -func (s *Server) Start() error { +func (s *Server) Start(ctx context.Context) error { var err error s.listener, err = quic.ListenAddr(s.cnf.Addr(), s.tlsConfig, nil) if err != nil { diff --git a/transports/tcp/server.go b/transports/tcp/server.go index 3809eee..51cd168 100644 --- a/transports/tcp/server.go +++ b/transports/tcp/server.go @@ -42,7 +42,7 @@ func (s *Server) Addr() string { } // Start starts the TCP server using the provided configuration -func (s *Server) Start() error { +func (s *Server) Start(ctx context.Context) error { s.stopChan = make(chan struct{}) s.started = make(chan struct{}) // Initialize the started channel listenAddr := s.cnf.Addr() diff --git a/transports/transport.go b/transports/transport.go index fc3bc16..19bbda8 100644 --- a/transports/transport.go +++ b/transports/transport.go @@ -1,3 +1,9 @@ package transports -type Transport interface{} +import "context" + +type Transport interface { + Addr() string + Start(ctx context.Context) error + Stop() error +} diff --git a/transports/udp/server.go b/transports/udp/server.go index 8f77474..63643e7 100644 --- a/transports/udp/server.go +++ b/transports/udp/server.go @@ -42,7 +42,7 @@ func (s *Server) Addr() string { } // Start starts the UDP server using the provided configuration -func (s *Server) Start() error { +func (s *Server) Start(ctx context.Context) error { s.stopChan = make(chan struct{}) s.started = make(chan struct{}) // Initialize the started channel listenAddr := "udp://" + s.cnf.Addr() diff --git a/transports/uds/server.go b/transports/uds/server.go index a7ec2bb..7442210 100644 --- a/transports/uds/server.go +++ b/transports/uds/server.go @@ -50,7 +50,7 @@ func (s *Server) Addr() string { } // Start starts the UDS server using the provided configuration -func (s *Server) Start() error { +func (s *Server) Start(ctx context.Context) error { s.stopChan = make(chan struct{}) s.started = make(chan struct{}) // Initialize the started channel listenAddr := "unix://" + s.cnf.Addr()