Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add trace support into task #55

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions backend/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ func (p *activityProcessor) ProcessWorkItem(ctx context.Context, wi WorkItem) er
}()
}

// set the parent trace context to be the newly created activity span
ts.ParentTraceContext = helpers.TraceContextFromSpan(span)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to reset the ParentTraceContext here as the newly created context above already has the span information. It can be extracted in the ExecuteActivity itself.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I still need to set the ParentTraceContext as I need to populate it here

ParentTraceContext: task.ParentTraceContext,

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can actually get span from context itself: https://pkg.go.dev/go.opentelemetry.io/otel/trace#SpanFromContext and then get traceContext from span, but i guess it's easier updating the event itself.

My issue was updating the event was that it should't cause any issue in case of errors/retries(like recursively adding a new span under activity span), but it doesn't seem anything like that happening for now, so no issues.

// Execute the activity and get its result
result, err := p.executor.ExecuteActivity(ctx, awi.InstanceID, awi.NewEvent)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions backend/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ func (executor *grpcExecutor) ExecuteActivity(ctx context.Context, iid api.Insta
Input: task.Input,
OrchestrationInstance: &protos.OrchestrationInstance{InstanceId: string(iid)},
TaskId: e.EventId,
ParentTraceContext: task.ParentTraceContext,
},
},
}
Expand Down
2 changes: 1 addition & 1 deletion backend/runtimestate.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (s *OrchestrationRuntimeState) ApplyActions(actions []*protos.OrchestratorA
}
} else if createtimer := action.GetCreateTimer(); createtimer != nil {
s.AddEvent(helpers.NewTimerCreatedEvent(action.Id, createtimer.FireAt))
s.pendingTimers = append(s.pendingTimers, helpers.NewTimerFiredEvent(action.Id, createtimer.FireAt, currentTraceContext))
s.pendingTimers = append(s.pendingTimers, helpers.NewTimerFiredEvent(action.Id, createtimer.FireAt))
} else if scheduleTask := action.GetScheduleTask(); scheduleTask != nil {
scheduledEvent := helpers.NewTaskScheduledEvent(
action.Id,
Expand Down
2 changes: 1 addition & 1 deletion client/worker_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (c *TaskHubGrpcClient) processActivityWorkItem(
executor backend.Executor,
req *protos.ActivityRequest,
) {
var tc *protos.TraceContext = nil // TODO: How to populate trace context?
var tc *protos.TraceContext = req.ParentTraceContext
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need more help understanding why we need to pass ParentTraceContext on this request object instead of just getting it directly from the gRPC context. Is it because work items are received on a stream and gRPC doesn't automatically populate any distributed trace context?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think

  1. the grpc context cannot directly used to pass through key-value pair (span is usually stored as key-val pair in context in go), instead it needs a bit more work https://www.hward.com/golang-grpc-context-client-server/.
  2. we obtain the stream at
    stream, err := c.client.GetWorkItems(ctx, &req)
    , at this point we don't have any trace context set up in the context, the context basically is an empty context. The stream should not have any trace info therefore.

I tried a local run to assert this
This is the context we build up on grpc server side
image

This is the context at

ctx context.Context,

image

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copying from here: #31 (comment)

Agree @kaibocai. I looked into it as well. We can easily propagate context for a normal grpc request(non-streaming) using otel interceptors(e.g. here) but for streaming requests, context is propagated at the start only i.e. when the stream is created. Once the stream creation is done, then since no context is passed during stream.send() method, and because we can't update stream context, the newer context(updated after stream creation) is never passed to client/server.

Here in DTF-Go as well at the time of stream creation, there is nothing in context(since no orchestration/acitivity are created yet). We have the updated context(with all span info) when activity/orchestration workitem needs to be sent to the app but this context is not passed while sending the workItem. So best way to do it is to encode trace context in the message itself.

event := helpers.NewTaskScheduledEvent(req.TaskId, req.Name, req.Version, req.Input, tc)
result, err := executor.ExecuteActivity(ctx, api.InstanceID(req.OrchestrationInstance.InstanceId), event)

Expand Down
1 change: 0 additions & 1 deletion internal/helpers/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,6 @@ func NewTimerCreatedEvent(eventID int32, fireAt *timestamppb.Timestamp) *protos.
func NewTimerFiredEvent(
timerID int32,
fireAt *timestamppb.Timestamp,
parentTraceContext *protos.TraceContext,
) *protos.HistoryEvent {
return &protos.HistoryEvent{
EventId: -1,
Expand Down
2,347 changes: 1,180 additions & 1,167 deletions internal/protos/orchestrator_service.pb.go

Large diffs are not rendered by default.

116 changes: 44 additions & 72 deletions internal/protos/orchestrator_service_grpc.pb.go

Large diffs are not rendered by default.

12 changes: 12 additions & 0 deletions samples/distributedtracing/distributedtracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"github.com/microsoft/durabletask-go/task"
)

var tracer = otel.Tracer("distributedtracing-example")

func main() {
// Tracing can be configured independently of the orchestration code.
tp, err := ConfigureZipkinTracing()
Expand Down Expand Up @@ -136,6 +138,16 @@ func DoWorkActivity(ctx task.ActivityContext) (any, error) {
return "", err
}

_, childSpan := tracer.Start(ctx.Context(), "activity-subwork")
// Simulate doing some sub work
select {
case <-time.After(2 * time.Second):
// Ok
case <-ctx.Context().Done():
return nil, ctx.Context().Err()
}
childSpan.End()

// Simulate doing work
select {
case <-time.After(duration):
Expand Down
2 changes: 1 addition & 1 deletion submodules/durabletask-protobuf
6 changes: 6 additions & 0 deletions task/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package task

import (
"context"
"fmt"

"github.com/microsoft/durabletask-go/internal/helpers"
"github.com/microsoft/durabletask-go/internal/protos"
"google.golang.org/protobuf/types/known/wrapperspb"
)
Expand Down Expand Up @@ -52,6 +54,10 @@ type activityContext struct {
type Activity func(ctx ActivityContext) (any, error)

func newTaskActivityContext(ctx context.Context, taskID int32, ts *protos.TaskScheduledEvent) *activityContext {
ctx, err := helpers.ContextFromTraceContext(ctx, ts.ParentTraceContext)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would expect that the ctx argument already has the correct trace context, being set much earlier than this.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, for examples running not through the grpc we did have the trace context, but if we run it through grpc we are missing it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can actually set the context in processWorkItem itself:

func (c *TaskHubGrpcClient) processActivityWorkItem(

if err != nil {
fmt.Printf("%v: failed to parse trace context: %v", ts.Name, err)
}
return &activityContext{
TaskID: taskID,
Name: ts.Name,
Expand Down
53 changes: 53 additions & 0 deletions tests/orchestrations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel"

"github.com/microsoft/durabletask-go/api"
"github.com/microsoft/durabletask-go/backend"
Expand All @@ -19,6 +20,8 @@ import (
"github.com/microsoft/durabletask-go/task"
)

var tracer = otel.Tracer("orchestration-test")

func Test_EmptyOrchestration(t *testing.T) {
// Registration
r := task.NewTaskRegistry()
Expand Down Expand Up @@ -207,6 +210,56 @@ func Test_SingleActivity(t *testing.T) {
)
}

func Test_SingleActivity_TaskSpan(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There should be a test added in https://github.com/microsoft/durabletask-go/blob/main/tests/grpc/grpc_test.go as well to validate we are correctly passing the context information over grpc stream.

// Registration
r := task.NewTaskRegistry()
r.AddOrchestratorN("SingleActivity", func(ctx *task.OrchestrationContext) (any, error) {
var input string
if err := ctx.GetInput(&input); err != nil {
return nil, err
}
var output string
err := ctx.CallActivity("SayHello", task.WithActivityInput(input)).Await(&output)
return output, err
})
r.AddActivityN("SayHello", func(ctx task.ActivityContext) (any, error) {
var name string
if err := ctx.GetInput(&name); err != nil {
return nil, err
}
_, childSpan := tracer.Start(ctx.Context(), "activityChild")
childSpan.End()
return fmt.Sprintf("Hello, %s!", name), nil
})

// Initialization
ctx := context.Background()
exporter := initTracing()
client, worker := initTaskHubWorker(ctx, r)
defer worker.Shutdown(ctx)

// Run the orchestration
id, err := client.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("世界"))
if assert.NoError(t, err) {
metadata, err := client.WaitForOrchestrationCompletion(ctx, id)
if assert.NoError(t, err) {
assert.Equal(t, protos.OrchestrationStatus_ORCHESTRATION_STATUS_COMPLETED, metadata.RuntimeStatus)
assert.Equal(t, `"Hello, 世界!"`, metadata.SerializedOutput)
}
}

// Validate the exported OTel traces
spans := exporter.GetSpans().Snapshots()
assertSpanSequence(t, spans,
assertOrchestratorCreated("SingleActivity", id),
assertSpan("activityChild"),
assertActivity("SayHello", id, 0),
assertOrchestratorExecuted("SingleActivity", id, "COMPLETED"),
)
// assert child-parent relationship
assert.Equal(t, spans[1].Parent().SpanID(), spans[2].SpanContext().SpanID())
}

func Test_ActivityChain(t *testing.T) {
// Registration
r := task.NewTaskRegistry()
Expand Down
Loading