From 72b620f68f8375434c59a3b61083dce08ba6de93 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Frederik=20Sch=C3=B6ll?= Date: Thu, 6 Oct 2022 13:06:03 +0200 Subject: [PATCH 1/8] include more block metrics --- block_payload.go | 9 +++++++++ filesource.go | 5 +++++ hub/hub.go | 3 +++ metrics.go | 9 +++++++++ 4 files changed, 26 insertions(+) diff --git a/block_payload.go b/block_payload.go index caa5830..383ce97 100644 --- a/block_payload.go +++ b/block_payload.go @@ -17,6 +17,7 @@ type BlockPayloadSetter func(block *Block, data []byte) (*Block, error) type BlockPayload interface { Get() (data []byte, err error) + Size() int } type MemoryBlockPayload struct { @@ -35,6 +36,10 @@ func (p *MemoryBlockPayload) Get() (data []byte, err error) { return p.data, err } +func (p *MemoryBlockPayload) Size() int { + return len(p.data) +} + var atmCache *atm.Cache var store dstore.Store @@ -109,6 +114,10 @@ func (p *ATMCachedBlockPayload) Get() (data []byte, err error) { return } +func (p *ATMCachedBlockPayload) Size() int { + return p.dataSize +} + func ATMCachedPayloadSetter(block *Block, data []byte) (*Block, error) { _, err := getCache().Write(block.Id, block.Timestamp, time.Now(), data) if err != nil { diff --git a/filesource.go b/filesource.go index a42be1a..124e139 100644 --- a/filesource.go +++ b/filesource.go @@ -449,6 +449,9 @@ func (s *FileSource) streamReader(blockReader BlockReader, prevLastBlockRead Blo break } + BlocksReadFileSource.Inc() + BytesReadFileSource.AddInt(blk.Payload.Size()) + blockNum := blk.Num() if blockNum < s.startBlockNum { continue @@ -509,6 +512,8 @@ func (s *FileSource) preprocess(block *Block, out chan *PreprocessedBlock) { }} zlog.Debug("block pre processed", zap.Stringer("block_ref", block)) + BlocksSentFileSource.Inc() + BytesSentFileSource.AddInt(block.Payload.Size()) select { case <-s.Terminating(): return diff --git a/hub/hub.go b/hub/hub.go index 5275bc7..73b6549 100644 --- a/hub/hub.go +++ b/hub/hub.go @@ -300,6 +300,9 @@ func (h *ForkableHub) processBlock(blk *bstream.Block, obj interface{}) error { zlog.Debug("process_block", zap.Stringer("blk", blk), zap.Any("obj", obj.(*forkable.ForkableObject).Step())) preprocBlock := &bstream.PreprocessedBlock{Block: blk, Obj: obj} + bstream.BlocksReadLiveSource.Inc() + bstream.BytesReadLiveSource.AddInt(blk.Payload.Size()) + subscribers := h.subscribers // we may remove some from the original slice during the loop for _, sub := range subscribers { diff --git a/metrics.go b/metrics.go index 51f94d4..28136bf 100644 --- a/metrics.go +++ b/metrics.go @@ -20,6 +20,15 @@ import ( var Metrics = dmetrics.NewSet(dmetrics.PrefixNameWith("bstream")) +var BlocksReadFileSource = Metrics.NewCounter("bstream_blocks_read_filesource", "Number of blocks read from file source") +var BytesReadFileSource = Metrics.NewCounter("bstream_bytes_read_filesource", "Bytes read from file source") + +var BlocksSentFileSource = Metrics.NewCounter("bstream_blocks_sent_filesource", "Number of blocks sent that came from file source") +var BytesSentFileSource = Metrics.NewCounter("bstream_bytes_sent_filesource", "Bytes sent that came from file source") + +var BlocksReadLiveSource = Metrics.NewCounter("bstream_blocks_read_livesource", "Number of blocks read from live source") +var BytesReadLiveSource = Metrics.NewCounter("bstream_bytes_read_livesource", "Bytes read from live source") + func WithHeadMetrics(h Handler, blkNum *dmetrics.HeadBlockNum, blkDrift *dmetrics.HeadTimeDrift) Handler { return HandlerFunc(func(blk *Block, obj interface{}) error { blkDrift.SetBlockTime(blk.Time()) From d1a42b2e11912836f64cc5b39590718d7e4c2cd5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Frederik=20Sch=C3=B6ll?= Date: Fri, 7 Oct 2022 12:17:02 +0200 Subject: [PATCH 2/8] remove duplicate bstream prefix --- metrics.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/metrics.go b/metrics.go index 28136bf..7808911 100644 --- a/metrics.go +++ b/metrics.go @@ -20,14 +20,14 @@ import ( var Metrics = dmetrics.NewSet(dmetrics.PrefixNameWith("bstream")) -var BlocksReadFileSource = Metrics.NewCounter("bstream_blocks_read_filesource", "Number of blocks read from file source") -var BytesReadFileSource = Metrics.NewCounter("bstream_bytes_read_filesource", "Bytes read from file source") +var BlocksReadFileSource = Metrics.NewCounter("blocks_read_filesource", "Number of blocks read from file source") +var BytesReadFileSource = Metrics.NewCounter("bytes_read_filesource", "Bytes read from file source") -var BlocksSentFileSource = Metrics.NewCounter("bstream_blocks_sent_filesource", "Number of blocks sent that came from file source") -var BytesSentFileSource = Metrics.NewCounter("bstream_bytes_sent_filesource", "Bytes sent that came from file source") +var BlocksSentFileSource = Metrics.NewCounter("blocks_sent_filesource", "Number of blocks sent that came from file source") +var BytesSentFileSource = Metrics.NewCounter("bytes_sent_filesource", "Bytes sent that came from file source") -var BlocksReadLiveSource = Metrics.NewCounter("bstream_blocks_read_livesource", "Number of blocks read from live source") -var BytesReadLiveSource = Metrics.NewCounter("bstream_bytes_read_livesource", "Bytes read from live source") +var BlocksReadLiveSource = Metrics.NewCounter("blocks_read_livesource", "Number of blocks read from live source") +var BytesReadLiveSource = Metrics.NewCounter("bytes_read_livesource", "Bytes read from live source") func WithHeadMetrics(h Handler, blkNum *dmetrics.HeadBlockNum, blkDrift *dmetrics.HeadTimeDrift) Handler { return HandlerFunc(func(blk *Block, obj interface{}) error { From c519edbfd3b88b78d205fc822bf1c113b8c64dc4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Frederik=20Sch=C3=B6ll?= Date: Fri, 7 Oct 2022 12:44:23 +0200 Subject: [PATCH 3/8] update joiningsource to include context, add blocks behind live metrics --- go.mod | 11 ++++++++--- go.sum | 22 ++++++++++++++++++++++ joiningsource.go | 17 +++++++++++++++++ metrics.go | 2 ++ stream/stream.go | 5 +++-- 5 files changed, 52 insertions(+), 5 deletions(-) diff --git a/go.mod b/go.mod index 5a296ce..ffdceeb 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/streamingfast/dgrpc v0.0.0-20220909121013-162e9305bbfc github.com/streamingfast/dmetrics v0.0.0-20210811180524-8494aeb34447 github.com/streamingfast/dstore v0.1.1-0.20220607202639-35118aeaf648 + github.com/streamingfast/dtracing v0.0.0-20210811175635-d55665d3622a github.com/streamingfast/logging v0.0.0-20220304214715-bc750a74b424 github.com/streamingfast/opaque v0.0.0-20210811180740-0c01d37ea308 github.com/streamingfast/pbgo v0.0.6-0.20220629184423-cfd0608e0cf4 @@ -29,7 +30,7 @@ require ( cloud.google.com/go/monitoring v1.6.0 // indirect cloud.google.com/go/storage v1.22.1 // indirect cloud.google.com/go/trace v1.2.0 // indirect - contrib.go.opencensus.io/exporter/stackdriver v0.12.6 // indirect + contrib.go.opencensus.io/exporter/stackdriver v0.13.10 // indirect contrib.go.opencensus.io/exporter/zipkin v0.1.1 // indirect github.com/Azure/azure-pipeline-go v0.2.3 // indirect github.com/Azure/azure-storage-blob-go v0.14.0 // indirect @@ -37,7 +38,7 @@ require ( github.com/beorn7/perks v1.0.1 // indirect github.com/bits-and-blooms/bitset v1.2.0 // indirect github.com/blendle/zapdriver v1.3.1 // indirect - github.com/census-instrumentation/opencensus-proto v0.2.1 // indirect + github.com/census-instrumentation/opencensus-proto v0.3.0 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dustin/go-humanize v1.0.0 // indirect @@ -58,12 +59,12 @@ require ( github.com/mitchellh/go-testing-interface v1.14.1 // indirect github.com/mschoch/smat v0.2.0 // indirect github.com/openzipkin/zipkin-go v0.1.6 // indirect + github.com/paulbellamy/ratecounter v0.2.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_golang v1.12.1 // indirect github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.32.1 // indirect github.com/prometheus/procfs v0.7.3 // indirect - github.com/streamingfast/dtracing v0.0.0-20210811175635-d55665d3622a // indirect go.opencensus.io v0.23.0 // indirect go.uber.org/atomic v1.9.0 // indirect go.uber.org/multierr v1.6.0 // indirect @@ -80,3 +81,7 @@ require ( google.golang.org/genproto v0.0.0-20220808131553-a91ffa7f803e // indirect gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect ) + +replace github.com/streamingfast/dtracing => github.com/pinax-network/dtracing v0.0.0-20221007093316-91e3187b1e55 + +replace github.com/streamingfast/dmetrics => github.com/pinax-network/dmetrics v0.0.0-20221007092947-973c981de09f diff --git a/go.sum b/go.sum index abafae5..2dc12c2 100644 --- a/go.sum +++ b/go.sum @@ -53,6 +53,7 @@ cloud.google.com/go/datastore v1.1.0/go.mod h1:umbIZjpQpHh4hmRpGhH4tLFup+FVzqBi1 cloud.google.com/go/iam v0.1.1/go.mod h1:CKqrcnI/suGpybEHxZ7BMehL0oA4LpdyJdUlTl9jVMw= cloud.google.com/go/iam v0.3.0 h1:exkAomrVUuzx9kWFI1wm3KI0uoDeUFPB4kKGzx6x+Gc= cloud.google.com/go/iam v0.3.0/go.mod h1:XzJPvDayI+9zsASAFO68Hk07u3z+f+JrT2xXNdp4bnY= +cloud.google.com/go/monitoring v1.1.0/go.mod h1:L81pzz7HKn14QCMaCs6NTQkdBnE87TElyanS95vIcl4= cloud.google.com/go/monitoring v1.6.0 h1:+x5AA2mFkiHK/ySN6NWKbeKBV+Z/DN+h51kBzcW08zU= cloud.google.com/go/monitoring v1.6.0/go.mod h1:w+OY1TYCk4MtvY7WfEHlIp5mP8SV/gDSqOsvGhVa2KM= cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2kNxGRt3I= @@ -67,10 +68,13 @@ cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9 cloud.google.com/go/storage v1.21.0/go.mod h1:XmRlxkgPjlBONznT2dDUU/5XlpU2OjMnKuqnZI01LAA= cloud.google.com/go/storage v1.22.1 h1:F6IlQJZrZM++apn9V5/VfS3gbTUYg98PS3EMQAzqtfg= cloud.google.com/go/storage v1.22.1/go.mod h1:S8N1cAStu7BOeFfE8KAQzmyyLkK8p/vmRq6kuBTW58Y= +cloud.google.com/go/trace v1.0.0/go.mod h1:4iErSByzxkyHWzzlAj63/Gmjz0NH1ASqhJguHpGcr6A= cloud.google.com/go/trace v1.2.0 h1:oIaB4KahkIUOpLSAAjEJ8y2desbjY/x/RfP4O3KAtTI= cloud.google.com/go/trace v1.2.0/go.mod h1:Wc8y/uYyOhPy12KEnXG9XGrvfMz5F5SrYecQlbW1rwM= contrib.go.opencensus.io/exporter/stackdriver v0.12.6 h1:Y2FTyj0HgOhfjEW6D6ytZNoz1YcPDXmkKr1I478CWKs= contrib.go.opencensus.io/exporter/stackdriver v0.12.6/go.mod h1:8x999/OcIPy5ivx/wDiV7Gx4D+VUPODf0mWRGRc5kSk= +contrib.go.opencensus.io/exporter/stackdriver v0.13.10 h1:a9+GZPUe+ONKUwULjlEOucMMG0qfSCCenlji0Nhqbys= +contrib.go.opencensus.io/exporter/stackdriver v0.13.10/go.mod h1:I5htMbyta491eUxufwwZPQdcKvvgzMB4O9ni41YnIM8= contrib.go.opencensus.io/exporter/zipkin v0.1.1 h1:PR+1zWqY8ceXs1qDQQIlgXe+sdiwCf0n32bH4+Epk8g= contrib.go.opencensus.io/exporter/zipkin v0.1.1/go.mod h1:GMvdSl3eJ2gapOaLKzTKE3qDgUkJ86k9k3yY2eqwkzc= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= @@ -119,6 +123,8 @@ github.com/blendle/zapdriver v1.3.1 h1:C3dydBOWYRiOk+B8X9IVZ5IOe+7cl+tGOexN4QqHf github.com/blendle/zapdriver v1.3.1/go.mod h1:mdXfREi6u5MArG4j9fewC+FGnXaBR+T4Ox4J2u4eHCc= github.com/census-instrumentation/opencensus-proto v0.2.1 h1:glEXhBS5PSLLv4IXzLA5yPRVX4bilULVyxxbrfOtDAk= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/census-instrumentation/opencensus-proto v0.3.0 h1:t/LhUZLVitR1Ow2YOnduCsavhwFUklBMoGVYUCqmCqk= +github.com/census-instrumentation/opencensus-proto v0.3.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= @@ -335,7 +341,13 @@ github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1Cpa github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/openzipkin/zipkin-go v0.1.6 h1:yXiysv1CSK7Q5yjGy1710zZGnsbMUIjluWBxtLXHPBo= github.com/openzipkin/zipkin-go v0.1.6/go.mod h1:QgAqvLzwWbR/WpD4A3cGpPtJrZXNIiJc5AZX7/PBEpw= +github.com/paulbellamy/ratecounter v0.2.0 h1:2L/RhJq+HA8gBQImDXtLPrDXK5qAj6ozWVK/zFXVJGs= +github.com/paulbellamy/ratecounter v0.2.0/go.mod h1:Hfx1hDpSGoqxkVVpBi/IlYD7kChlfo5C6hzIHwPqfFE= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= +github.com/pinax-network/dmetrics v0.0.0-20221007092947-973c981de09f h1:ca5ZACDx4kZRQk43gxnr5l5v7eVVmRKrKvCHAwW+e5I= +github.com/pinax-network/dmetrics v0.0.0-20221007092947-973c981de09f/go.mod h1:fWoyaD76fE7mXZfkfcAfNeU/Hv/y6yJ/RgEcInQLwSw= +github.com/pinax-network/dtracing v0.0.0-20221007093316-91e3187b1e55 h1:HYk1ueqMDudtAJMXrt3Vky7GwC3A/ius0NiC4IqrD/8= +github.com/pinax-network/dtracing v0.0.0-20221007093316-91e3187b1e55/go.mod h1:huOJyjMYS6K8upTuxDxaNd+emD65RrXoVBvh8f1/7Ns= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -385,6 +397,7 @@ github.com/streamingfast/dtracing v0.0.0-20210811175635-d55665d3622a h1:/7Rw3pYp github.com/streamingfast/dtracing v0.0.0-20210811175635-d55665d3622a/go.mod h1:bqiYZaX6L/MoXNfFQeAdau6g9HLA3yKHkX8KzStt58Q= github.com/streamingfast/logging v0.0.0-20210811175431-f3b44b61606a/go.mod h1:4GdqELhZOXj4xwc4IaBmzofzdErGynnaSzuzxy0ZIBo= github.com/streamingfast/logging v0.0.0-20210908162127-bdc5856d5341/go.mod h1:4GdqELhZOXj4xwc4IaBmzofzdErGynnaSzuzxy0ZIBo= +github.com/streamingfast/logging v0.0.0-20220304183711-ddba33d79e27/go.mod h1:4GdqELhZOXj4xwc4IaBmzofzdErGynnaSzuzxy0ZIBo= github.com/streamingfast/logging v0.0.0-20220304214715-bc750a74b424 h1:qKt1W13L7GXL3xqvD6z2ufSkIy/KDm9oGrfurypC78E= github.com/streamingfast/logging v0.0.0-20220304214715-bc750a74b424/go.mod h1:VlduQ80JcGJSargkRU4Sg9Xo63wZD/l8A5NC/Uo1/uU= github.com/streamingfast/opaque v0.0.0-20210811180740-0c01d37ea308 h1:xlWSfi1BoPfsHtPb0VEHGUcAdBF208LUiFCwfaVPfLA= @@ -549,6 +562,7 @@ golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c/go.mod h1:KelEdhl1UZF7XfJ golang.org/x/oauth2 v0.0.0-20210628180205-a41e5a781914/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20210805134026-6f1e6394065a/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20210819190943-2bc19b11175f/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= +golang.org/x/oauth2 v0.0.0-20211005180243-6b3c2da341f1/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b/go.mod h1:DAh4E804XQdzx2j+YRIaUnCqCV2RuMz24cGBJ5QYIrc= golang.org/x/oauth2 v0.0.0-20220309155454-6242fa91716a/go.mod h1:DAh4E804XQdzx2j+YRIaUnCqCV2RuMz24cGBJ5QYIrc= @@ -628,6 +642,8 @@ golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210908233432-aa78b53d3365/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210917161153-d61c044b1678/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211124211545-fe61309f8881/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211210111614-af8b64212486/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -753,6 +769,8 @@ google.golang.org/api v0.54.0/go.mod h1:7C4bFFOvVDGXjfDTAsgGwDgAxRDeQ4X8NvUedIt6 google.golang.org/api v0.55.0/go.mod h1:38yMfeP1kfjsl8isn0tliTjIb1rJXcQi4UXlbqivdVE= google.golang.org/api v0.56.0/go.mod h1:38yMfeP1kfjsl8isn0tliTjIb1rJXcQi4UXlbqivdVE= google.golang.org/api v0.57.0/go.mod h1:dVPlbZyBo2/OjBpmvNdpn2GRm6rPy75jyU7bmhdrMgI= +google.golang.org/api v0.58.0/go.mod h1:cAbP2FsxoGVNwtgNAmmn3y5G1TWAiVYRmg4yku3lv+E= +google.golang.org/api v0.59.0/go.mod h1:sT2boj7M9YJxZzgeZqXogmhfmRWDtPzT31xkieUbuZU= google.golang.org/api v0.61.0/go.mod h1:xQRti5UdCmoCEqFxcz93fTl338AVqDgyaDRuOZ3hg9I= google.golang.org/api v0.63.0/go.mod h1:gs4ij2ffTRXwuzzgJl/56BdwJaA194ijkfn++9tDuPo= google.golang.org/api v0.64.0/go.mod h1:931CdxA8Rm4t6zqTFGSsgwbAEZ2+GMYurbndwSimebM= @@ -834,7 +852,11 @@ google.golang.org/genproto v0.0.0-20210828152312-66f60bf46e71/go.mod h1:eFjDcFEc google.golang.org/genproto v0.0.0-20210831024726-fe130286e0e2/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY= google.golang.org/genproto v0.0.0-20210903162649-d08c68adba83/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY= google.golang.org/genproto v0.0.0-20210909211513-a8c4777a87af/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY= +google.golang.org/genproto v0.0.0-20210917145530-b395a37504d4/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY= +google.golang.org/genproto v0.0.0-20210921142501-181ce0d877f6/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= google.golang.org/genproto v0.0.0-20210924002016-3dee208752a0/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= +google.golang.org/genproto v0.0.0-20211008145708-270636b82663/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= +google.golang.org/genproto v0.0.0-20211018162055-cf77aa76bad2/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= google.golang.org/genproto v0.0.0-20211118181313-81c1377c94b1/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= google.golang.org/genproto v0.0.0-20211206160659-862468c7d6e0/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= diff --git a/joiningsource.go b/joiningsource.go index 84fbd53..dfa2ff4 100644 --- a/joiningsource.go +++ b/joiningsource.go @@ -15,9 +15,12 @@ package bstream import ( + "context" "errors" "fmt" + "github.com/streamingfast/dtracing" "sync" + "time" "github.com/streamingfast/shutter" "go.uber.org/zap" @@ -44,6 +47,7 @@ type JoiningSource struct { lastBlockProcessed *Block + ctx context.Context startBlockNum uint64 // overriden by cursor if it exists cursor *Cursor @@ -54,6 +58,7 @@ func NewJoiningSource( fileSourceFactory, liveSourceFactory ForkableSourceFactory, h Handler, + ctx context.Context, startBlockNum uint64, cursor *Cursor, logger *zap.Logger) *JoiningSource { @@ -64,6 +69,7 @@ func NewJoiningSource( fileSourceFactory: fileSourceFactory, liveSourceFactory: liveSourceFactory, handler: h, + ctx: ctx, startBlockNum: startBlockNum, cursor: cursor, logger: logger, @@ -105,6 +111,8 @@ func (s *JoiningSource) run() error { return fileSrc.Err() } + s.OnTerminating(s.deleteLabeledMetrics) + s.OnTerminating(s.liveSource.Shutdown) s.liveSource.Run() return s.liveSource.Err() @@ -123,6 +131,8 @@ func (s *JoiningSource) fileSourceHandler(blk *Block, obj interface{}) error { return nil } + BlocksBehindLive.SetUint64(s.lowestLiveBlockNum-blk.Number, dtracing.GetTraceIDOrEmpty(s.ctx).String()) + if blk.Number >= s.lowestLiveBlockNum { if src := s.liveSourceFactory.SourceFromBlockNum(blk.Number, s.handler); src != nil { s.liveSource = src @@ -135,3 +145,10 @@ func (s *JoiningSource) fileSourceHandler(blk *Block, obj interface{}) error { return s.handler.ProcessBlock(blk, obj) } + +func (s *JoiningSource) deleteLabeledMetrics(_ error) { + go func() { + time.Sleep(2 * time.Minute) + BlocksBehindLive.DeleteLabelValues(dtracing.GetTraceIDOrEmpty(s.ctx).String()) + }() +} diff --git a/metrics.go b/metrics.go index 7808911..a81cd96 100644 --- a/metrics.go +++ b/metrics.go @@ -29,6 +29,8 @@ var BytesSentFileSource = Metrics.NewCounter("bytes_sent_filesource", "Bytes sen var BlocksReadLiveSource = Metrics.NewCounter("blocks_read_livesource", "Number of blocks read from live source") var BytesReadLiveSource = Metrics.NewCounter("bytes_read_livesource", "Bytes read from live source") +var BlocksBehindLive = Metrics.NewGaugeVec("blocks_behind_live", []string{"trace_id"}, "Number of blocks behind live source") + func WithHeadMetrics(h Handler, blkNum *dmetrics.HeadBlockNum, blkDrift *dmetrics.HeadTimeDrift) Handler { return HandlerFunc(func(blk *Block, obj interface{}) error { blkDrift.SetBlockTime(blk.Time()) diff --git a/stream/stream.go b/stream/stream.go index 014eafa..bad89bb 100644 --- a/stream/stream.go +++ b/stream/stream.go @@ -75,7 +75,7 @@ func New( } func (s *Stream) Run(ctx context.Context) error { - source, err := s.createSource() + source, err := s.createSource(ctx) if err != nil { return err } @@ -97,7 +97,7 @@ func (s *Stream) Run(ctx context.Context) error { return nil } -func (s *Stream) createSource() (bstream.Source, error) { +func (s *Stream) createSource(ctx context.Context) (bstream.Source, error) { s.logger.Debug("setting up firehose source") absoluteStartBlockNum, err := resolveNegativeStartBlockNum(s.startBlockNum, s.currentHeadGetter) @@ -138,6 +138,7 @@ func (s *Stream) createSource() (bstream.Source, error) { s.fileSourceFactory, s.liveSourceFactory, h, + ctx, absoluteStartBlockNum, s.cursor, s.logger, From bf317706688510b4f74a7137e14518ab26a3377e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Frederik=20Sch=C3=B6ll?= Date: Fri, 7 Oct 2022 13:02:46 +0200 Subject: [PATCH 4/8] add debug logging --- joiningsource.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/joiningsource.go b/joiningsource.go index dfa2ff4..ed16fa2 100644 --- a/joiningsource.go +++ b/joiningsource.go @@ -104,6 +104,8 @@ func (s *JoiningSource) run() error { s.cursor.String()) } + s.OnTerminating(s.deleteLabeledMetrics) + s.OnTerminating(fileSrc.Shutdown) fileSrc.Run() @@ -111,8 +113,6 @@ func (s *JoiningSource) run() error { return fileSrc.Err() } - s.OnTerminating(s.deleteLabeledMetrics) - s.OnTerminating(s.liveSource.Shutdown) s.liveSource.Run() return s.liveSource.Err() @@ -147,6 +147,7 @@ func (s *JoiningSource) fileSourceHandler(blk *Block, obj interface{}) error { } func (s *JoiningSource) deleteLabeledMetrics(_ error) { + s.logger.Debug("delete labeled metrics for bstream", zap.String("trace_id", dtracing.GetTraceIDOrEmpty(s.ctx).String())) go func() { time.Sleep(2 * time.Minute) BlocksBehindLive.DeleteLabelValues(dtracing.GetTraceIDOrEmpty(s.ctx).String()) From d99aef250456709ec71fcebbc09359b0619dec10 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Frederik=20Sch=C3=B6ll?= Date: Fri, 7 Oct 2022 13:23:03 +0200 Subject: [PATCH 5/8] improve blocks behind live metric handling --- joiningsource.go | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/joiningsource.go b/joiningsource.go index ed16fa2..07ecf7c 100644 --- a/joiningsource.go +++ b/joiningsource.go @@ -104,7 +104,7 @@ func (s *JoiningSource) run() error { s.cursor.String()) } - s.OnTerminating(s.deleteLabeledMetrics) + defer s.deleteBlocksBehindLive() s.OnTerminating(fileSrc.Shutdown) fileSrc.Run() @@ -116,7 +116,6 @@ func (s *JoiningSource) run() error { s.OnTerminating(s.liveSource.Shutdown) s.liveSource.Run() return s.liveSource.Err() - } func (s *JoiningSource) tryGetSource(handler Handler, factory ForkableSourceFactory) Source { @@ -131,7 +130,7 @@ func (s *JoiningSource) fileSourceHandler(blk *Block, obj interface{}) error { return nil } - BlocksBehindLive.SetUint64(s.lowestLiveBlockNum-blk.Number, dtracing.GetTraceIDOrEmpty(s.ctx).String()) + s.logBlocksBehindLive(s.lowestLiveBlockNum - blk.Number) if blk.Number >= s.lowestLiveBlockNum { if src := s.liveSourceFactory.SourceFromBlockNum(blk.Number, s.handler); src != nil { @@ -146,10 +145,22 @@ func (s *JoiningSource) fileSourceHandler(blk *Block, obj interface{}) error { return s.handler.ProcessBlock(blk, obj) } -func (s *JoiningSource) deleteLabeledMetrics(_ error) { - s.logger.Debug("delete labeled metrics for bstream", zap.String("trace_id", dtracing.GetTraceIDOrEmpty(s.ctx).String())) +func (s *JoiningSource) deleteBlocksBehindLive() { + traceId := dtracing.GetTraceIDOrEmpty(s.ctx).String() + s.logger.Debug("delete blocks behind live metric", zap.String("trace_id", traceId)) go func() { time.Sleep(2 * time.Minute) - BlocksBehindLive.DeleteLabelValues(dtracing.GetTraceIDOrEmpty(s.ctx).String()) + BlocksBehindLive.DeleteLabelValues(traceId) }() } + +func (s *JoiningSource) logBlocksBehindLive(blocksBehindLive uint64) { + traceId := dtracing.GetTraceIDOrEmpty(s.ctx).String() + + // if we caught up we don't need to keep the metric anymore + if blocksBehindLive <= 0 { + s.deleteBlocksBehindLive() + } else { + BlocksBehindLive.SetUint64(blocksBehindLive, traceId) + } +} From ef37ae6a0a0f29fe281f08962c9b8ce2d8afb4d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Frederik=20Sch=C3=B6ll?= Date: Mon, 10 Oct 2022 15:55:34 +0200 Subject: [PATCH 6/8] cleanup --- joiningsource.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/joiningsource.go b/joiningsource.go index 07ecf7c..0f7a14e 100644 --- a/joiningsource.go +++ b/joiningsource.go @@ -18,10 +18,10 @@ import ( "context" "errors" "fmt" - "github.com/streamingfast/dtracing" "sync" "time" + "github.com/streamingfast/dtracing" "github.com/streamingfast/shutter" "go.uber.org/zap" ) @@ -129,7 +129,6 @@ func (s *JoiningSource) fileSourceHandler(blk *Block, obj interface{}) error { if s.liveSource != nil { // we should be already shutdown anyway return nil } - s.logBlocksBehindLive(s.lowestLiveBlockNum - blk.Number) if blk.Number >= s.lowestLiveBlockNum { @@ -147,8 +146,10 @@ func (s *JoiningSource) fileSourceHandler(blk *Block, obj interface{}) error { func (s *JoiningSource) deleteBlocksBehindLive() { traceId := dtracing.GetTraceIDOrEmpty(s.ctx).String() - s.logger.Debug("delete blocks behind live metric", zap.String("trace_id", traceId)) + go func() { + // allow Prometheus to scrape the current metrics before they are dropped + // 2 min is the maximum recommended scrape interval time.Sleep(2 * time.Minute) BlocksBehindLive.DeleteLabelValues(traceId) }() @@ -157,8 +158,7 @@ func (s *JoiningSource) deleteBlocksBehindLive() { func (s *JoiningSource) logBlocksBehindLive(blocksBehindLive uint64) { traceId := dtracing.GetTraceIDOrEmpty(s.ctx).String() - // if we caught up we don't need to keep the metric anymore - if blocksBehindLive <= 0 { + if blocksBehindLive <= 0 { // avoid cluttering the metrics with streams that caught up to live s.deleteBlocksBehindLive() } else { BlocksBehindLive.SetUint64(blocksBehindLive, traceId) From 5595bd87f54a8b1d890922544dfbb7033821df45 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Frederik=20Sch=C3=B6ll?= Date: Mon, 10 Oct 2022 15:55:42 +0200 Subject: [PATCH 7/8] add changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5b41d8d..770cc7e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased ### Added +- Added metrics to expose blocks/bytes read/sent from sources +- Added a metric to track blocks behind live on a joiningsource for a trace_id - Added FileSourceWithSecondaryBlocksStores Option to allow a fallback location - `.SetNearBlocksCount(count)` and `.Clone()` on `Tracker` object. - `Tracker` object to streamline queries about different targets (like network head, database lib, relayer blockstream head, whatever other BlockRef tags), ask the question about them being near one another (to select between live mode or catch-up mode). Also streamlines the requests of a start block, with a bunch of different backend implementations that can answer to the questions regarding where to start. From ec1f36120f239e838a33b9456e46cf074f55834b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Frederik=20Sch=C3=B6ll?= Date: Wed, 12 Oct 2022 11:05:53 +0200 Subject: [PATCH 8/8] bump dtracing, dmetrics, remove replacement --- go.mod | 10 +++------- go.sum | 4 ++++ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/go.mod b/go.mod index ffdceeb..2c91523 100644 --- a/go.mod +++ b/go.mod @@ -8,9 +8,9 @@ require ( github.com/streamingfast/atm v0.0.0-20220131151839-18c87005e680 github.com/streamingfast/dbin v0.0.0-20210809205249-73d5eca35dc5 github.com/streamingfast/dgrpc v0.0.0-20220909121013-162e9305bbfc - github.com/streamingfast/dmetrics v0.0.0-20210811180524-8494aeb34447 + github.com/streamingfast/dmetrics v0.0.0-20221012032216-6cf8338d4429 github.com/streamingfast/dstore v0.1.1-0.20220607202639-35118aeaf648 - github.com/streamingfast/dtracing v0.0.0-20210811175635-d55665d3622a + github.com/streamingfast/dtracing v0.0.0-20221011173312-3f74543e68eb github.com/streamingfast/logging v0.0.0-20220304214715-bc750a74b424 github.com/streamingfast/opaque v0.0.0-20210811180740-0c01d37ea308 github.com/streamingfast/pbgo v0.0.6-0.20220629184423-cfd0608e0cf4 @@ -80,8 +80,4 @@ require ( google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto v0.0.0-20220808131553-a91ffa7f803e // indirect gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect -) - -replace github.com/streamingfast/dtracing => github.com/pinax-network/dtracing v0.0.0-20221007093316-91e3187b1e55 - -replace github.com/streamingfast/dmetrics => github.com/pinax-network/dmetrics v0.0.0-20221007092947-973c981de09f +) \ No newline at end of file diff --git a/go.sum b/go.sum index 2dc12c2..bd9aa1e 100644 --- a/go.sum +++ b/go.sum @@ -391,10 +391,14 @@ github.com/streamingfast/dgrpc v0.0.0-20220909121013-162e9305bbfc h1:sRhUilbZExb github.com/streamingfast/dgrpc v0.0.0-20220909121013-162e9305bbfc/go.mod h1:YlFJuFiB9rmglB5UfTfnsOTfKC1rFo+D0sRbTzLcqgc= github.com/streamingfast/dmetrics v0.0.0-20210811180524-8494aeb34447 h1:oZwOVjxpWCqLUjgcPgVigVCHYR40JkmXfm1kuMcCOQk= github.com/streamingfast/dmetrics v0.0.0-20210811180524-8494aeb34447/go.mod h1:VLdQY/FwczmC/flqWkcsBbqXO4BhU4zQDSK7GMrpcjY= +github.com/streamingfast/dmetrics v0.0.0-20221012032216-6cf8338d4429 h1:ll/nzUOxIt7vmJxI5cNJVS1KEsneOSvwkDHt9Vaqq+Q= +github.com/streamingfast/dmetrics v0.0.0-20221012032216-6cf8338d4429/go.mod h1:fWoyaD76fE7mXZfkfcAfNeU/Hv/y6yJ/RgEcInQLwSw= github.com/streamingfast/dstore v0.1.1-0.20220607202639-35118aeaf648 h1:xpy3HNXeUHaZexf42duj7NeOmXcGfDMJXlZaj3CX18Y= github.com/streamingfast/dstore v0.1.1-0.20220607202639-35118aeaf648/go.mod h1:SHSEIPowGeE1TfNNmGeAUUnlO3dwevmX5kFOSazU60M= github.com/streamingfast/dtracing v0.0.0-20210811175635-d55665d3622a h1:/7Rw3pYpueJYOQReTJpfAhAPk0uZD4I58LfiUAr4IMc= github.com/streamingfast/dtracing v0.0.0-20210811175635-d55665d3622a/go.mod h1:bqiYZaX6L/MoXNfFQeAdau6g9HLA3yKHkX8KzStt58Q= +github.com/streamingfast/dtracing v0.0.0-20221011173312-3f74543e68eb h1:XPuLw6gwN4k1DRxHEwRGbNYVVBf3Petx+Cv5GUx2vIU= +github.com/streamingfast/dtracing v0.0.0-20221011173312-3f74543e68eb/go.mod h1:huOJyjMYS6K8upTuxDxaNd+emD65RrXoVBvh8f1/7Ns= github.com/streamingfast/logging v0.0.0-20210811175431-f3b44b61606a/go.mod h1:4GdqELhZOXj4xwc4IaBmzofzdErGynnaSzuzxy0ZIBo= github.com/streamingfast/logging v0.0.0-20210908162127-bdc5856d5341/go.mod h1:4GdqELhZOXj4xwc4IaBmzofzdErGynnaSzuzxy0ZIBo= github.com/streamingfast/logging v0.0.0-20220304183711-ddba33d79e27/go.mod h1:4GdqELhZOXj4xwc4IaBmzofzdErGynnaSzuzxy0ZIBo=