Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft: Add telemetry logs for cancelled changefeed events #142060

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions docs/generated/eventlog.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,22 @@ ChangefeedFailed events.
| `Transformation` | Flag representing whether the changefeed is using CDC queries. | no |


#### Common fields

| Field | Description | Sensitive |
|--|--|--|
| `Description` | The description of that would show up in the job's description field, redacted | yes |
| `SinkType` | The type of sink being emitted to (ex: kafka, nodelocal, webhook-https). | no |
| `NumTables` | The number of tables listed in the query that the changefeed is to run on. | no |
| `Resolved` | The behavior of emitted resolved spans (ex: yes, no, 10s) | no |
| `InitialScan` | The desired behavior of initial scans (ex: yes, no, only) | no |
| `Format` | The data format being emitted (ex: JSON, Avro). | no |
| `JobId` | The job id for enterprise changefeeds. | no |

### `cancel_changefeed`

An event of type `cancel_changefeed` is an event for changefeed cancellations.

#### Common fields

| Field | Description | Sensitive |
Expand Down
6 changes: 6 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -1577,6 +1577,7 @@ func (b *changefeedResumer) OnFailOrCancel(
if jobs.HasErrJobCanceled(
errors.DecodeError(ctx, *b.job.Payload().FinalResumeError),
) {
logChangefeedCancelledTelemetry(ctx)
telemetry.Count(`changefeed.enterprise.cancel`)
} else {
telemetry.Count(`changefeed.enterprise.fail`)
Expand Down Expand Up @@ -1699,6 +1700,11 @@ func logChangefeedFailedTelemetryDuringStartup(
log.StructuredEvent(ctx, severity.INFO, changefeedFailedEvent)
}

func logChangefeedCancelledTelemetry(ctx context.Context) {
changefeedCancelled := &eventpb.ChangefeedCancelled{}
log.StructuredEvent(ctx, severity.INFO, changefeedCancelled)
}

func makeCommonChangefeedEventDetails(
ctx context.Context, details jobspb.ChangefeedDetails, description string, jobID jobspb.JobID,
) eventpb.CommonChangefeedEventDetails {
Expand Down
5 changes: 5 additions & 0 deletions pkg/util/log/eventpb/changefeed_events.proto
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ message ChangefeedFailed {
string failure_type = 2 [(gogoproto.jsontag) = ",omitempty", (gogoproto.moretags) = "redact:\"nonsensitive\""];
}

// ChangefeedCancelled is an event for any changefeed cancellations
message ChangefeedCancelled {
CommonChangefeedEventDetails common = 1 [(gogoproto.nullable) = false, (gogoproto.jsontag) = "", (gogoproto.embed) = true];
}

// ChangefeedEmittedBytes is an event representing the bytes emitted by a changefeed over an interval.
message ChangefeedEmittedBytes {
CommonChangefeedEventDetails common = 1 [(gogoproto.nullable) = false, (gogoproto.jsontag) = "", (gogoproto.embed) = true];
Expand Down
Loading