Skip to content

Commit

Permalink
Add gRPC render metadata header
Browse files Browse the repository at this point in the history
In headers, we now send the number of metrics gRPC render will
stream. This can be beneficial for the receiver to know how many
metrics it will receive over the stream.
  • Loading branch information
emadolsky committed Feb 17, 2023
1 parent 1d734b7 commit 55c561f
Showing 1 changed file with 13 additions and 0 deletions.
13 changes: 13 additions & 0 deletions carbonserver/render.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"

Expand Down Expand Up @@ -646,6 +647,13 @@ func (listener *CarbonserverListener) streamMetrics(stream grpcv2.CarbonV2_Rende
return
}

const gRPCRenderMetricsCountHeaderKey = "metrics-count"

func (listener *CarbonserverListener) sendRenderMetadataHeader(stream grpcv2.CarbonV2_RenderServer, filesCount int) error {
header := metadata.Pairs(gRPCRenderMetricsCountHeaderKey, strconv.Itoa(filesCount))
return stream.SendHeader(header)
}

// Render implements Render rpc of CarbonV2 gRPC service
func (listener *CarbonserverListener) Render(req *protov2.MultiFetchRequest, stream grpcv2.CarbonV2_RenderServer) (rpcErr error) {
t0 := time.Now()
Expand Down Expand Up @@ -726,6 +734,10 @@ func (listener *CarbonserverListener) Render(req *protov2.MultiFetchRequest, str
metricGlobMap := getMetricGlobMapFromExpandedGlobs(expandedGlobs)
tle.MetricGlobMapLength = len(metricGlobMap)
filesCount := countFilesInExpandedGlobs(expandedGlobs)
err = listener.sendRenderMetadataHeader(stream, filesCount)
if err != nil {
return nil, err
}
prepareChan := make(chan response, getStreamingChannelSize(filesCount))
go func() {
prepareT0 := time.Now()
Expand Down Expand Up @@ -768,6 +780,7 @@ func (listener *CarbonserverListener) Render(req *protov2.MultiFetchRequest, str
case res != nil:
atomic.AddUint64(&listener.metrics.QueryCacheHit, 1)
cachedResponses := res.([]response)
err = listener.sendRenderMetadataHeader(stream, len(cachedResponses))
responseChanToStream = make(chan response, getStreamingChannelSize(len(cachedResponses)))
go func() {
for _, r := range cachedResponses {
Expand Down

0 comments on commit 55c561f

Please sign in to comment.