-
Notifications
You must be signed in to change notification settings - Fork 31
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add trace support into task #55
base: main
Are you sure you want to change the base?
Changes from 3 commits
33790bd
9fd912b
d11688c
3bbf9d1
a5b3495
7a608b7
ab03cd0
14a9bda
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -94,7 +94,7 @@ func (c *TaskHubGrpcClient) processActivityWorkItem( | |||||
executor backend.Executor, | ||||||
req *protos.ActivityRequest, | ||||||
) { | ||||||
var tc *protos.TraceContext = nil // TODO: How to populate trace context? | ||||||
var tc *protos.TraceContext = req.ParentTraceContext | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I need more help understanding why we need to pass There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think
I tried a local run to assert this This is the context at durabletask-go/client/worker_grpc.go Line 93 in fc57567
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Copying from here: #31 (comment) Agree @kaibocai. I looked into it as well. We can easily propagate context for a normal grpc request(non-streaming) using otel interceptors(e.g. here) but for streaming requests, context is propagated at the start only i.e. when the stream is created. Once the stream creation is done, then since no context is passed during stream.send() method, and because we can't update stream context, the newer context(updated after stream creation) is never passed to client/server. Here in DTF-Go as well at the time of stream creation, there is nothing in context(since no orchestration/acitivity are created yet). We have the updated context(with all span info) when activity/orchestration workitem needs to be sent to the app but this context is not passed while sending the workItem. So best way to do it is to encode trace context in the message itself. |
||||||
event := helpers.NewTaskScheduledEvent(req.TaskId, req.Name, req.Version, req.Input, tc) | ||||||
result, err := executor.ExecuteActivity(ctx, api.InstanceID(req.OrchestrationInstance.InstanceId), event) | ||||||
|
||||||
|
Large diffs are not rendered by default.
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -2,7 +2,9 @@ package task | |||
|
||||
import ( | ||||
"context" | ||||
"fmt" | ||||
|
||||
"github.com/microsoft/durabletask-go/internal/helpers" | ||||
"github.com/microsoft/durabletask-go/internal/protos" | ||||
"google.golang.org/protobuf/types/known/wrapperspb" | ||||
) | ||||
|
@@ -52,6 +54,10 @@ type activityContext struct { | |||
type Activity func(ctx ActivityContext) (any, error) | ||||
|
||||
func newTaskActivityContext(ctx context.Context, taskID int32, ts *protos.TaskScheduledEvent) *activityContext { | ||||
ctx, err := helpers.ContextFromTraceContext(ctx, ts.ParentTraceContext) | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would expect that the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, for examples running not through the grpc we did have the trace context, but if we run it through grpc we are missing it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can actually set the context in durabletask-go/client/worker_grpc.go Line 92 in fc57567
|
||||
if err != nil { | ||||
fmt.Printf("%v: failed to parse trace context: %v", ts.Name, err) | ||||
} | ||||
return &activityContext{ | ||||
TaskID: taskID, | ||||
Name: ts.Name, | ||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,6 +11,7 @@ import ( | |
|
||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
"go.opentelemetry.io/otel" | ||
|
||
"github.com/microsoft/durabletask-go/api" | ||
"github.com/microsoft/durabletask-go/backend" | ||
|
@@ -19,6 +20,8 @@ import ( | |
"github.com/microsoft/durabletask-go/task" | ||
) | ||
|
||
var tracer = otel.Tracer("orchestration-test") | ||
|
||
func Test_EmptyOrchestration(t *testing.T) { | ||
// Registration | ||
r := task.NewTaskRegistry() | ||
|
@@ -207,6 +210,56 @@ func Test_SingleActivity(t *testing.T) { | |
) | ||
} | ||
|
||
func Test_SingleActivity_TaskSpan(t *testing.T) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There should be a test added in https://github.com/microsoft/durabletask-go/blob/main/tests/grpc/grpc_test.go as well to validate we are correctly passing the context information over grpc stream. |
||
// Registration | ||
r := task.NewTaskRegistry() | ||
r.AddOrchestratorN("SingleActivity", func(ctx *task.OrchestrationContext) (any, error) { | ||
var input string | ||
if err := ctx.GetInput(&input); err != nil { | ||
return nil, err | ||
} | ||
var output string | ||
err := ctx.CallActivity("SayHello", task.WithActivityInput(input)).Await(&output) | ||
return output, err | ||
}) | ||
r.AddActivityN("SayHello", func(ctx task.ActivityContext) (any, error) { | ||
var name string | ||
if err := ctx.GetInput(&name); err != nil { | ||
return nil, err | ||
} | ||
_, childSpan := tracer.Start(ctx.Context(), "activityChild") | ||
childSpan.End() | ||
return fmt.Sprintf("Hello, %s!", name), nil | ||
}) | ||
|
||
// Initialization | ||
ctx := context.Background() | ||
exporter := initTracing() | ||
client, worker := initTaskHubWorker(ctx, r) | ||
defer worker.Shutdown(ctx) | ||
|
||
// Run the orchestration | ||
id, err := client.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("世界")) | ||
if assert.NoError(t, err) { | ||
metadata, err := client.WaitForOrchestrationCompletion(ctx, id) | ||
if assert.NoError(t, err) { | ||
assert.Equal(t, protos.OrchestrationStatus_ORCHESTRATION_STATUS_COMPLETED, metadata.RuntimeStatus) | ||
assert.Equal(t, `"Hello, 世界!"`, metadata.SerializedOutput) | ||
} | ||
} | ||
|
||
// Validate the exported OTel traces | ||
spans := exporter.GetSpans().Snapshots() | ||
assertSpanSequence(t, spans, | ||
assertOrchestratorCreated("SingleActivity", id), | ||
assertSpan("activityChild"), | ||
assertActivity("SayHello", id, 0), | ||
assertOrchestratorExecuted("SingleActivity", id, "COMPLETED"), | ||
) | ||
// assert child-parent relationship | ||
assert.Equal(t, spans[1].Parent().SpanID(), spans[2].SpanContext().SpanID()) | ||
} | ||
|
||
func Test_ActivityChain(t *testing.T) { | ||
// Registration | ||
r := task.NewTaskRegistry() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need to reset the
ParentTraceContext
here as the newly created context above already has the span information. It can be extracted in theExecuteActivity
itself.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I still need to set the
ParentTraceContext
as I need to populate it heredurabletask-go/backend/executor.go
Line 151 in d11688c
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can actually get span from context itself: https://pkg.go.dev/go.opentelemetry.io/otel/trace#SpanFromContext and then get traceContext from span, but i guess it's easier updating the event itself.
My issue was updating the event was that it should't cause any issue in case of errors/retries(like recursively adding a new span under activity span), but it doesn't seem anything like that happening for now, so no issues.