Skip to content
This repository has been archived by the owner on Aug 26, 2024. It is now read-only.

Commit

Permalink
Pass request context to frame observer
Browse files Browse the repository at this point in the history
This allows the frame observer to access the context passed to
gocql.
  • Loading branch information
martin-sucha committed Oct 30, 2023
1 parent fb119cd commit 5d441b4
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 1 deletion.
5 changes: 5 additions & 0 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -788,6 +788,7 @@ func (c *Conn) recv(ctx context.Context) error {
} else if head.stream != call.streamID {
panic(fmt.Sprintf("call has incorrect streamID: got %d expected %d", call.streamID, head.stream))
}
parseObserver.reqCtx = call.reqCtx

framer := newFramer(c.compressor, c.version)

Expand Down Expand Up @@ -851,6 +852,9 @@ type callReq struct {
// streamObserverEndOnce ensures that either StreamAbandoned or StreamFinished is called,
// but not both.
streamObserverEndOnce sync.Once

// context passed to execute.
reqCtx context.Context
}

type callResp struct {
Expand Down Expand Up @@ -1104,6 +1108,7 @@ func (c *Conn) exec(ctx context.Context, req frameBuilder, tracer Tracer) (*fram
timeout: make(chan struct{}),
streamID: stream,
resp: make(chan callResp),
reqCtx: ctx,
}

if c.streamObserver != nil {
Expand Down
7 changes: 6 additions & 1 deletion frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,7 @@ type framer struct {
type frameParseObserver struct {
head ObservedFrameHeader
frameObserver FrameObserver
reqCtx context.Context
}

func (fpo *frameParseObserver) observeFrame(ff *framer, f frame) {
Expand All @@ -425,7 +426,11 @@ func (fpo *frameParseObserver) observeFrame(ff *framer, f frame) {
of.RowCount = rows.numRows
of.RowsSize = rows.rowsContentSize
}
fpo.frameObserver.ObserveFrame(context.TODO(), of)
ctx := fpo.reqCtx
if ctx == nil {
ctx = context.TODO()
}
fpo.frameObserver.ObserveFrame(ctx, of)
}

func newFramer(compressor Compressor, version byte) *framer {
Expand Down

0 comments on commit 5d441b4

Please sign in to comment.