Skip to content

Commit

Permalink
Merge pull request #3 from fschoell/feature/dmetering
Browse files Browse the repository at this point in the history
include metering
  • Loading branch information
ArnaudBger authored Mar 5, 2024
2 parents 0da4bda + 1e7234e commit 2f971e0
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 6 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
.DS_Store
/substreams/blockmeta-service-v0.1.0.spkg
.idea
1 change: 1 addition & 0 deletions server/.gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
badger3:
.idea
blockmeta
16 changes: 16 additions & 0 deletions server/cmd/blockmeta/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ import (
authGRPC "github.com/streamingfast/dauth/grpc"
authNull "github.com/streamingfast/dauth/null"
"github.com/streamingfast/derr"
"github.com/streamingfast/dmetering"
meteringGRPC "github.com/streamingfast/dmetering/grpc"
meteringLogger "github.com/streamingfast/dmetering/logger"
"github.com/streamingfast/logging"
"go.uber.org/zap"
)
Expand All @@ -19,6 +22,7 @@ var (
listenAddress = flag.String("grpc-listen-addr", ":9000", "The gRPC server listen address")
sinkServerAddress = flag.String("sink-addr", ":9001", "The sink server address")
authUrl = flag.String("auth-url", "null://", "The URL of the auth server")
meteringUrl = flag.String("metering-url", "null://", "The URL of the dmetering sink")
corsHostRegexAllowFlag = flag.String("cors-host-regex-allow", "^localhost", "Regex to allow CORS origin requests from, defaults to localhost only")
)

Expand All @@ -31,6 +35,10 @@ func main() {
authNull.Register()
authGRPC.Register()

meteringGRPC.Register()
meteringLogger.Register()
dmetering.RegisterNull()

if *sinkServerAddress == "" {
zlog.Error("sink server address is required")
os.Exit(1)
Expand All @@ -49,6 +57,14 @@ func main() {
os.Exit(1)
}

eventEmitter, err := dmetering.New(*meteringUrl, zlog)
if err != nil {
zlog.Error("unable to create event emitter", zap.Error(err))
os.Exit(1)
}
defer eventEmitter.Shutdown(nil)
dmetering.SetDefaultEmitter(eventEmitter)

var corsHostRegexAllow *regexp.Regexp
if *corsHostRegexAllowFlag != "" {
hostRegex, err := regexp.Compile(*corsHostRegexAllowFlag)
Expand Down
10 changes: 10 additions & 0 deletions server/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/streamingfast/dauth v0.0.0-20240213192022-da4946165b42
github.com/streamingfast/derr v0.0.0-20230515163924-8570aaa43fe1
github.com/streamingfast/dgrpc v0.0.0-20240215122024-a128a25110fe
github.com/streamingfast/dmetering v0.0.0-20240227170539-29b479694f8f
github.com/streamingfast/logging v0.0.0-20221209193439-bff11742bf4c
github.com/streamingfast/shutter v1.5.0
github.com/streamingfast/substreams-sink-kv v0.1.3-0.20240214210842-435682b6b7f2
Expand All @@ -27,8 +28,10 @@ require (
github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace v1.15.0 // indirect
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.39.0 // indirect
github.com/GoogleCloudPlatform/opentelemetry-operations-go/propagator v0.0.0-20221018185641-36f91511cfd7 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/blendle/zapdriver v1.3.2-0.20200203083823-9200777f8a3d // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
Expand All @@ -40,9 +43,16 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 // indirect
github.com/logrusorgru/aurora v2.0.3+incompatible // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/mitchellh/go-testing-interface v1.14.1 // indirect
github.com/openzipkin/zipkin-go v0.4.2 // indirect
github.com/paulbellamy/ratecounter v0.2.0 // indirect
github.com/prometheus/client_golang v1.16.0 // indirect
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.11.0 // indirect
github.com/sethvargo/go-retry v0.2.3 // indirect
github.com/streamingfast/dmetrics v0.0.0-20230830150831-a7c98f755707 // indirect
github.com/streamingfast/sf-tracing v0.0.0-20240209202324-9daa52c71a52 // indirect
github.com/teris-io/shortid v0.0.0-20171029131806-771a37caa5cf // indirect
go.opencensus.io v0.24.0 // indirect
Expand Down
21 changes: 21 additions & 0 deletions server/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ github.com/aws/aws-sdk-go v1.44.233/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/blendle/zapdriver v1.3.1/go.mod h1:mdXfREi6u5MArG4j9fewC+FGnXaBR+T4Ox4J2u4eHCc=
github.com/blendle/zapdriver v1.3.2-0.20200203083823-9200777f8a3d h1:fSlGu5ePbkjBidXuj2O5j9EcYrVB5Cr6/wdkYyDgxZk=
github.com/blendle/zapdriver v1.3.2-0.20200203083823-9200777f8a3d/go.mod h1:yCBkgASmKHgUOFjK9h1sOytUVgA+JkQjqj3xYP4AdWY=
Expand All @@ -50,6 +52,8 @@ github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyY
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/census-instrumentation/opencensus-proto v0.4.1 h1:iKLQ0xPNFxR/2hzXZMrBo8f1j86j5WHzznCCQxV/b8g=
github.com/census-instrumentation/opencensus-proto v0.4.1/go.mod h1:4T9NM4+4Vw91VeyqjLS6ao50K5bOcLKN6Q42XnYaRYw=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cncf/xds/go v0.0.0-20231109132714-523115ebc101 h1:7To3pQ+pZo0i3dsWEbinPNFs5gPSBOsJtx3wTT94VBY=
Expand Down Expand Up @@ -128,17 +132,29 @@ github.com/lithammer/dedent v1.1.0 h1:VNzHMVCBNG1j0fh3OrsFRkVUwStdDArbgBWoPAffkt
github.com/lithammer/dedent v1.1.0/go.mod h1:jrXYCQtgg0nJiN+StA2KgR7w6CiQNv9Fd/Z9BP0jIOc=
github.com/logrusorgru/aurora v2.0.3+incompatible h1:tOpm7WcpBTn4fjmVfgpQq0EfczGlG91VSDkswnjF5A8=
github.com/logrusorgru/aurora v2.0.3+incompatible/go.mod h1:7rIyQOR62GCctdiQpZ/zOJlFyk6y+94wXzv6RNZgaR4=
github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo=
github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
github.com/mitchellh/go-testing-interface v1.14.1 h1:jrgshOhYAUVNMAJiKbEu7EqAwgJJ2JqpQmpLJOu07cU=
github.com/mitchellh/go-testing-interface v1.14.1/go.mod h1:gfgS7OtZj6MA4U1UrDRp04twqAjfvlZyCfX3sDjEym8=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/openzipkin/zipkin-go v0.4.2 h1:zjqfqHjUpPmB3c1GlCvvgsM1G4LkvqQbBDueDOCg/jA=
github.com/openzipkin/zipkin-go v0.4.2/go.mod h1:ZeVkFjuuBiSy13y8vpSDCjMi9GoI3hPpCJSBx/EYFhY=
github.com/paulbellamy/ratecounter v0.2.0 h1:2L/RhJq+HA8gBQImDXtLPrDXK5qAj6ozWVK/zFXVJGs=
github.com/paulbellamy/ratecounter v0.2.0/go.mod h1:Hfx1hDpSGoqxkVVpBi/IlYD7kChlfo5C6hzIHwPqfFE=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v1.16.0 h1:yk/hx9hDbrGHovbci4BY+pRMfSuuat626eFsHb7tmT8=
github.com/prometheus/client_golang v1.16.0/go.mod h1:Zsulrv/L9oM40tJ7T815tM89lFEugiJ9HzIqaAx4LKc=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.4.0 h1:5lQXD3cAg1OXBf4Wq03gTrXHeaV0TQvGfUooCfx1yqY=
github.com/prometheus/client_model v0.4.0/go.mod h1:oMQmHW1/JoDwqLtg57MGgP/Fb1CJEYF2imWWhWtMkYU=
github.com/prometheus/common v0.44.0 h1:+5BrQJwiBB9xsMygAB3TNvpQKOwlkc25LbISbrdOOfY=
github.com/prometheus/common v0.44.0/go.mod h1:ofAIvZbQ1e/nugmZGz4/qCb9Ap1VoSTIO7x0VV9VvuY=
github.com/prometheus/procfs v0.11.0 h1:5EAgkfkMl659uZPbe9AS2N68a7Cc1TJbPEuGzFuRbyk=
github.com/prometheus/procfs v0.11.0/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM=
github.com/rs/cors v1.8.3 h1:O+qNyWn7Z+F9M0ILBHgMVPuB1xTOucVd5gtaYyXBpRo=
github.com/rs/cors v1.8.3/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU=
github.com/sethvargo/go-retry v0.2.3 h1:oYlgvIvsju3jNbottWABtbnoLC+GDtLdBHxKWxQm/iU=
Expand All @@ -150,6 +166,10 @@ github.com/streamingfast/derr v0.0.0-20230515163924-8570aaa43fe1 h1:xJB7rXnOHLes
github.com/streamingfast/derr v0.0.0-20230515163924-8570aaa43fe1/go.mod h1:QSm/AfaDsE0k1xBYi0lW580YJ/WDV/FKZI628tkZR0Y=
github.com/streamingfast/dgrpc v0.0.0-20240215122024-a128a25110fe h1:Rak301ON2Ubgs2WnCpu5Cy12tGy8XSy1teDdJquIJLg=
github.com/streamingfast/dgrpc v0.0.0-20240215122024-a128a25110fe/go.mod h1:NuKCwOHjbT0nRji0O+7+c70AiBfLHEKNoovs/gFfMPY=
github.com/streamingfast/dmetering v0.0.0-20240227170539-29b479694f8f h1:GEi9o8xu++upU2AWAR2WG/0Digk3KlTT/ZKzDko0vpk=
github.com/streamingfast/dmetering v0.0.0-20240227170539-29b479694f8f/go.mod h1:UqWuX3REU/IInBUaymFN2eLjuvz+/0SsoUFjeQlLNyI=
github.com/streamingfast/dmetrics v0.0.0-20230830150831-a7c98f755707 h1:GW12wt2UQ42jgfqtYo4BrtqJWUPpNYdQGztlHwYpFRc=
github.com/streamingfast/dmetrics v0.0.0-20230830150831-a7c98f755707/go.mod h1:JbxEDbzWRG1dHdNIPrYfuPllEkktZMgm40AwVIBENcw=
github.com/streamingfast/dtracing v0.0.0-20210811175635-d55665d3622a h1:/7Rw3pYpueJYOQReTJpfAhAPk0uZD4I58LfiUAr4IMc=
github.com/streamingfast/dtracing v0.0.0-20210811175635-d55665d3622a/go.mod h1:bqiYZaX6L/MoXNfFQeAdau6g9HLA3yKHkX8KzStt58Q=
github.com/streamingfast/logging v0.0.0-20220304214715-bc750a74b424/go.mod h1:VlduQ80JcGJSargkRU4Sg9Xo63wZD/l8A5NC/Uo1/uU=
Expand Down Expand Up @@ -260,6 +280,7 @@ golang.org/x/oauth2 v0.15.0 h1:s8pnnxNVzjWyrvYdFUQq5llS1PX2zhPXmccZv99h7uQ=
golang.org/x/oauth2 v0.15.0/go.mod h1:q48ptWNTY5XWf+JNten23lcvHpLJ0ZSxF5ttTHKVCAM=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
Expand Down
48 changes: 42 additions & 6 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ import (
"github.com/streamingfast/derr"
dgrpcserver "github.com/streamingfast/dgrpc/server"
"github.com/streamingfast/dgrpc/server/connectrpc"
"github.com/streamingfast/dmetering"
"github.com/streamingfast/shutter"
pbkv "github.com/streamingfast/substreams-sink-kv/pb/substreams/sink/kv/v1"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
)

Expand Down Expand Up @@ -186,7 +188,10 @@ func (s *GrpcServer) NumToID(ctx context.Context, in *connect.Request[pbbmsrv.Nu
}

blockPbTimestamp := valueToTimestamp(response.KeyValues[0].Value)
return &connect.Response[pbbmsrv.BlockResp]{Msg: &pbbmsrv.BlockResp{Id: blockID, Num: blockNum, Time: blockPbTimestamp}}, nil
msg := &pbbmsrv.BlockResp{Id: blockID, Num: blockNum, Time: blockPbTimestamp}
sendMeteringEvent(ctx, proto.Size(msg), "sf.blockmeta.v2.Block/NumToID")

return &connect.Response[pbbmsrv.BlockResp]{Msg: msg}, nil
}

func (s *GrpcServer) IDToNum(ctx context.Context, in *connect.Request[pbbmsrv.IDToNumReq]) (*connect.Response[pbbmsrv.BlockResp], error) {
Expand All @@ -211,7 +216,10 @@ func (s *GrpcServer) IDToNum(ctx context.Context, in *connect.Request[pbbmsrv.ID
}

blockPbTimestamp := valueToTimestamp(response.KeyValues[0].Value)
return &connect.Response[pbbmsrv.BlockResp]{Msg: &pbbmsrv.BlockResp{Id: blockID, Num: blockNum, Time: blockPbTimestamp}}, nil
msg := &pbbmsrv.BlockResp{Id: blockID, Num: blockNum, Time: blockPbTimestamp}
sendMeteringEvent(ctx, proto.Size(msg), "sf.blockmeta.v2.Block/IDToNum")

return &connect.Response[pbbmsrv.BlockResp]{Msg: msg}, nil
}

func (s *GrpcServer) Head(ctx context.Context, _ *connect.Request[pbbmsrv.Empty]) (*connect.Response[pbbmsrv.BlockResp], error) {
Expand All @@ -229,8 +237,10 @@ func (s *GrpcServer) Head(ctx context.Context, _ *connect.Request[pbbmsrv.Empty]
}

blockNum := valueToBlockNumber(response.KeyValues[0].Value)
msg := &pbbmsrv.BlockResp{Id: blockID, Num: blockNum, Time: blockPbTimestamp}
sendMeteringEvent(ctx, proto.Size(msg), "sf.blockmeta.v2.Block/Head")

return &connect.Response[pbbmsrv.BlockResp]{Msg: &pbbmsrv.BlockResp{Id: blockID, Num: blockNum, Time: blockPbTimestamp}}, nil
return &connect.Response[pbbmsrv.BlockResp]{Msg: msg}, nil
}

func (s *GrpcServer) At(ctx context.Context, in *connect.Request[pbbmsrv.TimeReq]) (*connect.Response[pbbmsrv.BlockResp], error) {
Expand All @@ -252,7 +262,10 @@ func (s *GrpcServer) At(ctx context.Context, in *connect.Request[pbbmsrv.TimeReq
}

blockNum := valueToBlockNumber(response.KeyValues[0].Value)
return connect.NewResponse(&pbbmsrv.BlockResp{Id: blockID, Num: blockNum, Time: blockPbTimestamp}), nil
msg := &pbbmsrv.BlockResp{Id: blockID, Num: blockNum, Time: blockPbTimestamp}
sendMeteringEvent(ctx, proto.Size(msg), "sf.blockmeta.v2.BlockByTime/At")

return connect.NewResponse(msg), nil
}

func (s *GrpcServer) Before(ctx context.Context, in *connect.Request[pbbmsrv.RelativeTimeReq]) (*connect.Response[pbbmsrv.BlockResp], error) {
Expand Down Expand Up @@ -281,7 +294,10 @@ func (s *GrpcServer) Before(ctx context.Context, in *connect.Request[pbbmsrv.Rel
blockNum = valueToBlockNumber(response.KeyValues[i].Value)
break
}
return connect.NewResponse(&pbbmsrv.BlockResp{Id: blockID, Num: blockNum, Time: blockPbTimestamp}), nil
msg := &pbbmsrv.BlockResp{Id: blockID, Num: blockNum, Time: blockPbTimestamp}
sendMeteringEvent(ctx, proto.Size(msg), "sf.blockmeta.v2.BlockByTime/Before")

return connect.NewResponse(msg), nil
}

func (s *GrpcServer) After(ctx context.Context, in *connect.Request[pbbmsrv.RelativeTimeReq]) (*connect.Response[pbbmsrv.BlockResp], error) {
Expand Down Expand Up @@ -312,6 +328,26 @@ func (s *GrpcServer) After(ctx context.Context, in *connect.Request[pbbmsrv.Rela

break
}
msg := &pbbmsrv.BlockResp{Id: blockID, Num: blockNum, Time: blockPbTimestamp}
sendMeteringEvent(ctx, proto.Size(msg), "sf.blockmeta.v2.BlockByTime/After")

return connect.NewResponse(&pbbmsrv.BlockResp{Id: blockID, Num: blockNum, Time: blockPbTimestamp}), nil
return connect.NewResponse(msg), nil
}

func sendMeteringEvent(ctx context.Context, size int, endpoint string) {

auth := dauth.FromContext(ctx)
event := dmetering.Event{
UserID: auth.UserID(),
ApiKeyID: auth.APIKeyID(),
IpAddress: auth.RealIP(),
Meta: auth.Meta(),
Endpoint: endpoint,
Metrics: map[string]float64{
"egress_bytes": float64(size),
"requests": 1,
},
Timestamp: time.Now(),
}
dmetering.Emit(ctx, event)
}

0 comments on commit 2f971e0

Please sign in to comment.