From 32c0f063a0a92aa7b850a747728dc64a776e9e4f Mon Sep 17 00:00:00 2001 From: Polina Volosnikova Date: Thu, 14 Nov 2024 13:48:36 +0100 Subject: [PATCH] add topic path --- cmd/aardappel/main.go | 2 +- internal/reader/reader.go | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/cmd/aardappel/main.go b/cmd/aardappel/main.go index f732659..270f783 100644 --- a/cmd/aardappel/main.go +++ b/cmd/aardappel/main.go @@ -148,7 +148,7 @@ func doMain(ctx context.Context, config configInit.Config, srcDb *ydb.Driver, ds xlog.Fatal(ctx, "Unable to init dst table") } xlog.Debug(ctx, "Start reading") - go topicReader.ReadTopic(ctx, uint32(i), reader, prc) + go topicReader.ReadTopic(ctx, config.Streams[i].SrcTopic, uint32(i), reader, prc) } lockExecutor := func(fn func(context.Context, table.Session, table.Transaction) error) error { diff --git a/internal/reader/reader.go b/internal/reader/reader.go index e363912..666db55 100644 --- a/internal/reader/reader.go +++ b/internal/reader/reader.go @@ -20,7 +20,7 @@ type TopicData struct { Resolved json.RawMessage `json:"resolved"` } -func ReadTopic(ctx context.Context, readerId uint32, reader *topicreader.Reader, channel processor.Channel) { +func ReadTopic(ctx context.Context, topicPath string, readerId uint32, reader *topicreader.Reader, channel processor.Channel) { var mu sync.Mutex lastHb := make(map[int64]uint64) verifyStream := func(part int64, data types.TxData) { @@ -34,8 +34,8 @@ func ReadTopic(ctx context.Context, readerId uint32, reader *topicreader.Reader, hb := lastHb[part] if hb != 0 && data.Step < hb { errString := fmt.Sprintf("Unexpected step_id in stream, last hb step_id: %v,"+ - "got tx {\"key\":%v,\"ts\":[%v,%v]}", - lastHb[part], serializeKey(data.KeyValues), data.Step, data.TxId) + "got tx {\"topic\":\"%v\",\"key\":%v,\"ts\":[%v,%v]}", + lastHb[part], topicPath, serializeKey(data.KeyValues), data.Step, data.TxId) stopErr := channel.SaveReplicationState(ctx, processor.REPLICATION_FATAL_ERROR, errString) if stopErr != nil { xlog.Fatal(ctx, errString,