From 5d441b433c4e16bda10bd89de6f6f9f157e831f0 Mon Sep 17 00:00:00 2001 From: Martin Sucha Date: Mon, 30 Oct 2023 13:07:20 +0100 Subject: [PATCH] Pass request context to frame observer This allows the frame observer to access the context passed to gocql. --- conn.go | 5 +++++ frame.go | 7 ++++++- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/conn.go b/conn.go index cd9773d01..5038b22c1 100644 --- a/conn.go +++ b/conn.go @@ -788,6 +788,7 @@ func (c *Conn) recv(ctx context.Context) error { } else if head.stream != call.streamID { panic(fmt.Sprintf("call has incorrect streamID: got %d expected %d", call.streamID, head.stream)) } + parseObserver.reqCtx = call.reqCtx framer := newFramer(c.compressor, c.version) @@ -851,6 +852,9 @@ type callReq struct { // streamObserverEndOnce ensures that either StreamAbandoned or StreamFinished is called, // but not both. streamObserverEndOnce sync.Once + + // context passed to execute. + reqCtx context.Context } type callResp struct { @@ -1104,6 +1108,7 @@ func (c *Conn) exec(ctx context.Context, req frameBuilder, tracer Tracer) (*fram timeout: make(chan struct{}), streamID: stream, resp: make(chan callResp), + reqCtx: ctx, } if c.streamObserver != nil { diff --git a/frame.go b/frame.go index bcf748b76..b876d4c13 100644 --- a/frame.go +++ b/frame.go @@ -410,6 +410,7 @@ type framer struct { type frameParseObserver struct { head ObservedFrameHeader frameObserver FrameObserver + reqCtx context.Context } func (fpo *frameParseObserver) observeFrame(ff *framer, f frame) { @@ -425,7 +426,11 @@ func (fpo *frameParseObserver) observeFrame(ff *framer, f frame) { of.RowCount = rows.numRows of.RowsSize = rows.rowsContentSize } - fpo.frameObserver.ObserveFrame(context.TODO(), of) + ctx := fpo.reqCtx + if ctx == nil { + ctx = context.TODO() + } + fpo.frameObserver.ObserveFrame(ctx, of) } func newFramer(compressor Compressor, version byte) *framer {