From 81ee8578e57a61f9d76745ae7f4ce6f84495d1b8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Frederik=20Sch=C3=B6ll?= Date: Tue, 5 Mar 2024 12:27:55 +0100 Subject: [PATCH 1/2] include dmetering plugin and --metering-url flag --- .gitignore | 1 + server/.gitignore | 1 + server/cmd/blockmeta/main.go | 16 ++++++++++++ server/go.mod | 10 ++++++++ server/go.sum | 21 ++++++++++++++++ server/server.go | 48 +++++++++++++++++++++++++++++++----- 6 files changed, 91 insertions(+), 6 deletions(-) diff --git a/.gitignore b/.gitignore index 58e9c3b..23098bf 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ .DS_Store /substreams/blockmeta-service-v0.1.0.spkg +.idea diff --git a/server/.gitignore b/server/.gitignore index 75bd885..23f66c6 100644 --- a/server/.gitignore +++ b/server/.gitignore @@ -1,2 +1,3 @@ badger3: .idea +blockmeta diff --git a/server/cmd/blockmeta/main.go b/server/cmd/blockmeta/main.go index 2513cda..8be2d18 100644 --- a/server/cmd/blockmeta/main.go +++ b/server/cmd/blockmeta/main.go @@ -3,6 +3,7 @@ package main import ( "context" "flag" + "github.com/streamingfast/dmetering" "os" "regexp" @@ -11,6 +12,8 @@ import ( authGRPC "github.com/streamingfast/dauth/grpc" authNull "github.com/streamingfast/dauth/null" "github.com/streamingfast/derr" + meteringGRPC "github.com/streamingfast/dmetering/grpc" + meteringLogger "github.com/streamingfast/dmetering/logger" "github.com/streamingfast/logging" "go.uber.org/zap" ) @@ -19,6 +22,7 @@ var ( listenAddress = flag.String("grpc-listen-addr", ":9000", "The gRPC server listen address") sinkServerAddress = flag.String("sink-addr", ":9001", "The sink server address") authUrl = flag.String("auth-url", "null://", "The URL of the auth server") + meteringUrl = flag.String("metering-url", "null://", "The URL of the dmetering sink") corsHostRegexAllowFlag = flag.String("cors-host-regex-allow", "^localhost", "Regex to allow CORS origin requests from, defaults to localhost only") ) @@ -31,6 +35,10 @@ func main() { authNull.Register() authGRPC.Register() + meteringGRPC.Register() + meteringLogger.Register() + dmetering.RegisterNull() + if *sinkServerAddress == "" { zlog.Error("sink server address is required") os.Exit(1) @@ -49,6 +57,14 @@ func main() { os.Exit(1) } + eventEmitter, err := dmetering.New(*meteringUrl, zlog) + if err != nil { + zlog.Error("unable to create event emitter", zap.Error(err)) + os.Exit(1) + } + defer eventEmitter.Shutdown(nil) + dmetering.SetDefaultEmitter(eventEmitter) + var corsHostRegexAllow *regexp.Regexp if *corsHostRegexAllowFlag != "" { hostRegex, err := regexp.Compile(*corsHostRegexAllowFlag) diff --git a/server/go.mod b/server/go.mod index 2617b9b..0acaef4 100644 --- a/server/go.mod +++ b/server/go.mod @@ -8,6 +8,7 @@ require ( github.com/streamingfast/dauth v0.0.0-20240213192022-da4946165b42 github.com/streamingfast/derr v0.0.0-20230515163924-8570aaa43fe1 github.com/streamingfast/dgrpc v0.0.0-20240215122024-a128a25110fe + github.com/streamingfast/dmetering v0.0.0-20240227170539-29b479694f8f github.com/streamingfast/logging v0.0.0-20221209193439-bff11742bf4c github.com/streamingfast/shutter v1.5.0 github.com/streamingfast/substreams-sink-kv v0.1.3-0.20240214210842-435682b6b7f2 @@ -27,8 +28,10 @@ require ( github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace v1.15.0 // indirect github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.39.0 // indirect github.com/GoogleCloudPlatform/opentelemetry-operations-go/propagator v0.0.0-20221018185641-36f91511cfd7 // indirect + github.com/beorn7/perks v1.0.1 // indirect github.com/blendle/zapdriver v1.3.2-0.20200203083823-9200777f8a3d // indirect github.com/cenkalti/backoff/v4 v4.2.1 // indirect + github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/go-logr/logr v1.4.1 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect @@ -40,9 +43,16 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 // indirect github.com/logrusorgru/aurora v2.0.3+incompatible // indirect + github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/mitchellh/go-testing-interface v1.14.1 // indirect github.com/openzipkin/zipkin-go v0.4.2 // indirect + github.com/paulbellamy/ratecounter v0.2.0 // indirect + github.com/prometheus/client_golang v1.16.0 // indirect + github.com/prometheus/client_model v0.4.0 // indirect + github.com/prometheus/common v0.44.0 // indirect + github.com/prometheus/procfs v0.11.0 // indirect github.com/sethvargo/go-retry v0.2.3 // indirect + github.com/streamingfast/dmetrics v0.0.0-20230830150831-a7c98f755707 // indirect github.com/streamingfast/sf-tracing v0.0.0-20240209202324-9daa52c71a52 // indirect github.com/teris-io/shortid v0.0.0-20171029131806-771a37caa5cf // indirect go.opencensus.io v0.24.0 // indirect diff --git a/server/go.sum b/server/go.sum index 3cb24a6..efdcc40 100644 --- a/server/go.sum +++ b/server/go.sum @@ -42,6 +42,8 @@ github.com/aws/aws-sdk-go v1.44.233/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8 github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A= github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/blendle/zapdriver v1.3.1/go.mod h1:mdXfREi6u5MArG4j9fewC+FGnXaBR+T4Ox4J2u4eHCc= github.com/blendle/zapdriver v1.3.2-0.20200203083823-9200777f8a3d h1:fSlGu5ePbkjBidXuj2O5j9EcYrVB5Cr6/wdkYyDgxZk= github.com/blendle/zapdriver v1.3.2-0.20200203083823-9200777f8a3d/go.mod h1:yCBkgASmKHgUOFjK9h1sOytUVgA+JkQjqj3xYP4AdWY= @@ -50,6 +52,8 @@ github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyY github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/census-instrumentation/opencensus-proto v0.4.1 h1:iKLQ0xPNFxR/2hzXZMrBo8f1j86j5WHzznCCQxV/b8g= github.com/census-instrumentation/opencensus-proto v0.4.1/go.mod h1:4T9NM4+4Vw91VeyqjLS6ao50K5bOcLKN6Q42XnYaRYw= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/xds/go v0.0.0-20231109132714-523115ebc101 h1:7To3pQ+pZo0i3dsWEbinPNFs5gPSBOsJtx3wTT94VBY= @@ -128,17 +132,29 @@ github.com/lithammer/dedent v1.1.0 h1:VNzHMVCBNG1j0fh3OrsFRkVUwStdDArbgBWoPAffkt github.com/lithammer/dedent v1.1.0/go.mod h1:jrXYCQtgg0nJiN+StA2KgR7w6CiQNv9Fd/Z9BP0jIOc= github.com/logrusorgru/aurora v2.0.3+incompatible h1:tOpm7WcpBTn4fjmVfgpQq0EfczGlG91VSDkswnjF5A8= github.com/logrusorgru/aurora v2.0.3+incompatible/go.mod h1:7rIyQOR62GCctdiQpZ/zOJlFyk6y+94wXzv6RNZgaR4= +github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= +github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= github.com/mitchellh/go-testing-interface v1.14.1 h1:jrgshOhYAUVNMAJiKbEu7EqAwgJJ2JqpQmpLJOu07cU= github.com/mitchellh/go-testing-interface v1.14.1/go.mod h1:gfgS7OtZj6MA4U1UrDRp04twqAjfvlZyCfX3sDjEym8= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/openzipkin/zipkin-go v0.4.2 h1:zjqfqHjUpPmB3c1GlCvvgsM1G4LkvqQbBDueDOCg/jA= github.com/openzipkin/zipkin-go v0.4.2/go.mod h1:ZeVkFjuuBiSy13y8vpSDCjMi9GoI3hPpCJSBx/EYFhY= +github.com/paulbellamy/ratecounter v0.2.0 h1:2L/RhJq+HA8gBQImDXtLPrDXK5qAj6ozWVK/zFXVJGs= +github.com/paulbellamy/ratecounter v0.2.0/go.mod h1:Hfx1hDpSGoqxkVVpBi/IlYD7kChlfo5C6hzIHwPqfFE= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v1.16.0 h1:yk/hx9hDbrGHovbci4BY+pRMfSuuat626eFsHb7tmT8= +github.com/prometheus/client_golang v1.16.0/go.mod h1:Zsulrv/L9oM40tJ7T815tM89lFEugiJ9HzIqaAx4LKc= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/prometheus/client_model v0.4.0 h1:5lQXD3cAg1OXBf4Wq03gTrXHeaV0TQvGfUooCfx1yqY= +github.com/prometheus/client_model v0.4.0/go.mod h1:oMQmHW1/JoDwqLtg57MGgP/Fb1CJEYF2imWWhWtMkYU= +github.com/prometheus/common v0.44.0 h1:+5BrQJwiBB9xsMygAB3TNvpQKOwlkc25LbISbrdOOfY= +github.com/prometheus/common v0.44.0/go.mod h1:ofAIvZbQ1e/nugmZGz4/qCb9Ap1VoSTIO7x0VV9VvuY= +github.com/prometheus/procfs v0.11.0 h1:5EAgkfkMl659uZPbe9AS2N68a7Cc1TJbPEuGzFuRbyk= +github.com/prometheus/procfs v0.11.0/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM= github.com/rs/cors v1.8.3 h1:O+qNyWn7Z+F9M0ILBHgMVPuB1xTOucVd5gtaYyXBpRo= github.com/rs/cors v1.8.3/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU= github.com/sethvargo/go-retry v0.2.3 h1:oYlgvIvsju3jNbottWABtbnoLC+GDtLdBHxKWxQm/iU= @@ -150,6 +166,10 @@ github.com/streamingfast/derr v0.0.0-20230515163924-8570aaa43fe1 h1:xJB7rXnOHLes github.com/streamingfast/derr v0.0.0-20230515163924-8570aaa43fe1/go.mod h1:QSm/AfaDsE0k1xBYi0lW580YJ/WDV/FKZI628tkZR0Y= github.com/streamingfast/dgrpc v0.0.0-20240215122024-a128a25110fe h1:Rak301ON2Ubgs2WnCpu5Cy12tGy8XSy1teDdJquIJLg= github.com/streamingfast/dgrpc v0.0.0-20240215122024-a128a25110fe/go.mod h1:NuKCwOHjbT0nRji0O+7+c70AiBfLHEKNoovs/gFfMPY= +github.com/streamingfast/dmetering v0.0.0-20240227170539-29b479694f8f h1:GEi9o8xu++upU2AWAR2WG/0Digk3KlTT/ZKzDko0vpk= +github.com/streamingfast/dmetering v0.0.0-20240227170539-29b479694f8f/go.mod h1:UqWuX3REU/IInBUaymFN2eLjuvz+/0SsoUFjeQlLNyI= +github.com/streamingfast/dmetrics v0.0.0-20230830150831-a7c98f755707 h1:GW12wt2UQ42jgfqtYo4BrtqJWUPpNYdQGztlHwYpFRc= +github.com/streamingfast/dmetrics v0.0.0-20230830150831-a7c98f755707/go.mod h1:JbxEDbzWRG1dHdNIPrYfuPllEkktZMgm40AwVIBENcw= github.com/streamingfast/dtracing v0.0.0-20210811175635-d55665d3622a h1:/7Rw3pYpueJYOQReTJpfAhAPk0uZD4I58LfiUAr4IMc= github.com/streamingfast/dtracing v0.0.0-20210811175635-d55665d3622a/go.mod h1:bqiYZaX6L/MoXNfFQeAdau6g9HLA3yKHkX8KzStt58Q= github.com/streamingfast/logging v0.0.0-20220304214715-bc750a74b424/go.mod h1:VlduQ80JcGJSargkRU4Sg9Xo63wZD/l8A5NC/Uo1/uU= @@ -260,6 +280,7 @@ golang.org/x/oauth2 v0.15.0 h1:s8pnnxNVzjWyrvYdFUQq5llS1PX2zhPXmccZv99h7uQ= golang.org/x/oauth2 v0.15.0/go.mod h1:q48ptWNTY5XWf+JNten23lcvHpLJ0ZSxF5ttTHKVCAM= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= diff --git a/server/server.go b/server/server.go index f331635..e1200fc 100644 --- a/server/server.go +++ b/server/server.go @@ -4,6 +4,8 @@ import ( "context" "errors" "fmt" + "github.com/streamingfast/dmetering" + "google.golang.org/protobuf/proto" "net/http" "net/url" "regexp" @@ -186,7 +188,10 @@ func (s *GrpcServer) NumToID(ctx context.Context, in *connect.Request[pbbmsrv.Nu } blockPbTimestamp := valueToTimestamp(response.KeyValues[0].Value) - return &connect.Response[pbbmsrv.BlockResp]{Msg: &pbbmsrv.BlockResp{Id: blockID, Num: blockNum, Time: blockPbTimestamp}}, nil + msg := &pbbmsrv.BlockResp{Id: blockID, Num: blockNum, Time: blockPbTimestamp} + sendMeteringEvent(ctx, proto.Size(msg), "sf.blockmeta.v2.Block/NumToID") + + return &connect.Response[pbbmsrv.BlockResp]{Msg: msg}, nil } func (s *GrpcServer) IDToNum(ctx context.Context, in *connect.Request[pbbmsrv.IDToNumReq]) (*connect.Response[pbbmsrv.BlockResp], error) { @@ -211,7 +216,10 @@ func (s *GrpcServer) IDToNum(ctx context.Context, in *connect.Request[pbbmsrv.ID } blockPbTimestamp := valueToTimestamp(response.KeyValues[0].Value) - return &connect.Response[pbbmsrv.BlockResp]{Msg: &pbbmsrv.BlockResp{Id: blockID, Num: blockNum, Time: blockPbTimestamp}}, nil + msg := &pbbmsrv.BlockResp{Id: blockID, Num: blockNum, Time: blockPbTimestamp} + sendMeteringEvent(ctx, proto.Size(msg), "sf.blockmeta.v2.Block/IDToNum") + + return &connect.Response[pbbmsrv.BlockResp]{Msg: msg}, nil } func (s *GrpcServer) Head(ctx context.Context, _ *connect.Request[pbbmsrv.Empty]) (*connect.Response[pbbmsrv.BlockResp], error) { @@ -229,8 +237,10 @@ func (s *GrpcServer) Head(ctx context.Context, _ *connect.Request[pbbmsrv.Empty] } blockNum := valueToBlockNumber(response.KeyValues[0].Value) + msg := &pbbmsrv.BlockResp{Id: blockID, Num: blockNum, Time: blockPbTimestamp} + sendMeteringEvent(ctx, proto.Size(msg), "sf.blockmeta.v2.Block/Head") - return &connect.Response[pbbmsrv.BlockResp]{Msg: &pbbmsrv.BlockResp{Id: blockID, Num: blockNum, Time: blockPbTimestamp}}, nil + return &connect.Response[pbbmsrv.BlockResp]{Msg: msg}, nil } func (s *GrpcServer) At(ctx context.Context, in *connect.Request[pbbmsrv.TimeReq]) (*connect.Response[pbbmsrv.BlockResp], error) { @@ -252,7 +262,10 @@ func (s *GrpcServer) At(ctx context.Context, in *connect.Request[pbbmsrv.TimeReq } blockNum := valueToBlockNumber(response.KeyValues[0].Value) - return connect.NewResponse(&pbbmsrv.BlockResp{Id: blockID, Num: blockNum, Time: blockPbTimestamp}), nil + msg := &pbbmsrv.BlockResp{Id: blockID, Num: blockNum, Time: blockPbTimestamp} + sendMeteringEvent(ctx, proto.Size(msg), "sf.blockmeta.v2.BlockByTime/At") + + return connect.NewResponse(msg), nil } func (s *GrpcServer) Before(ctx context.Context, in *connect.Request[pbbmsrv.RelativeTimeReq]) (*connect.Response[pbbmsrv.BlockResp], error) { @@ -281,7 +294,10 @@ func (s *GrpcServer) Before(ctx context.Context, in *connect.Request[pbbmsrv.Rel blockNum = valueToBlockNumber(response.KeyValues[i].Value) break } - return connect.NewResponse(&pbbmsrv.BlockResp{Id: blockID, Num: blockNum, Time: blockPbTimestamp}), nil + msg := &pbbmsrv.BlockResp{Id: blockID, Num: blockNum, Time: blockPbTimestamp} + sendMeteringEvent(ctx, proto.Size(msg), "sf.blockmeta.v2.BlockByTime/Before") + + return connect.NewResponse(msg), nil } func (s *GrpcServer) After(ctx context.Context, in *connect.Request[pbbmsrv.RelativeTimeReq]) (*connect.Response[pbbmsrv.BlockResp], error) { @@ -312,6 +328,26 @@ func (s *GrpcServer) After(ctx context.Context, in *connect.Request[pbbmsrv.Rela break } + msg := &pbbmsrv.BlockResp{Id: blockID, Num: blockNum, Time: blockPbTimestamp} + sendMeteringEvent(ctx, proto.Size(msg), "sf.blockmeta.v2.BlockByTime/After") - return connect.NewResponse(&pbbmsrv.BlockResp{Id: blockID, Num: blockNum, Time: blockPbTimestamp}), nil + return connect.NewResponse(msg), nil +} + +func sendMeteringEvent(ctx context.Context, size int, endpoint string) { + + auth := dauth.FromContext(ctx) + event := dmetering.Event{ + UserID: auth.UserID(), + ApiKeyID: auth.APIKeyID(), + IpAddress: auth.RealIP(), + Meta: auth.Meta(), + Endpoint: endpoint, + Metrics: map[string]float64{ + "egress_bytes": float64(size), + "requests": 1, + }, + Timestamp: time.Now(), + } + dmetering.Emit(ctx, event) } From 1e7234e4aa587ce0ce0aa55cd7d5e9e9fa5cf8a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Frederik=20Sch=C3=B6ll?= Date: Tue, 5 Mar 2024 15:10:18 +0100 Subject: [PATCH 2/2] cleanup --- server/cmd/blockmeta/main.go | 2 +- server/server.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/server/cmd/blockmeta/main.go b/server/cmd/blockmeta/main.go index 8be2d18..39a276d 100644 --- a/server/cmd/blockmeta/main.go +++ b/server/cmd/blockmeta/main.go @@ -3,7 +3,6 @@ package main import ( "context" "flag" - "github.com/streamingfast/dmetering" "os" "regexp" @@ -12,6 +11,7 @@ import ( authGRPC "github.com/streamingfast/dauth/grpc" authNull "github.com/streamingfast/dauth/null" "github.com/streamingfast/derr" + "github.com/streamingfast/dmetering" meteringGRPC "github.com/streamingfast/dmetering/grpc" meteringLogger "github.com/streamingfast/dmetering/logger" "github.com/streamingfast/logging" diff --git a/server/server.go b/server/server.go index e1200fc..356c305 100644 --- a/server/server.go +++ b/server/server.go @@ -4,8 +4,6 @@ import ( "context" "errors" "fmt" - "github.com/streamingfast/dmetering" - "google.golang.org/protobuf/proto" "net/http" "net/url" "regexp" @@ -22,11 +20,13 @@ import ( "github.com/streamingfast/derr" dgrpcserver "github.com/streamingfast/dgrpc/server" "github.com/streamingfast/dgrpc/server/connectrpc" + "github.com/streamingfast/dmetering" "github.com/streamingfast/shutter" pbkv "github.com/streamingfast/substreams-sink-kv/pb/substreams/sink/kv/v1" "go.uber.org/zap" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/timestamppb" )