Skip to content

Commit

Permalink
add topic path
Browse files Browse the repository at this point in the history
  • Loading branch information
VPolka committed Nov 14, 2024
1 parent 3c1861b commit 32c0f06
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 4 deletions.
2 changes: 1 addition & 1 deletion cmd/aardappel/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func doMain(ctx context.Context, config configInit.Config, srcDb *ydb.Driver, ds
xlog.Fatal(ctx, "Unable to init dst table")
}
xlog.Debug(ctx, "Start reading")
go topicReader.ReadTopic(ctx, uint32(i), reader, prc)
go topicReader.ReadTopic(ctx, config.Streams[i].SrcTopic, uint32(i), reader, prc)
}

lockExecutor := func(fn func(context.Context, table.Session, table.Transaction) error) error {
Expand Down
6 changes: 3 additions & 3 deletions internal/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type TopicData struct {
Resolved json.RawMessage `json:"resolved"`
}

func ReadTopic(ctx context.Context, readerId uint32, reader *topicreader.Reader, channel processor.Channel) {
func ReadTopic(ctx context.Context, topicPath string, readerId uint32, reader *topicreader.Reader, channel processor.Channel) {
var mu sync.Mutex
lastHb := make(map[int64]uint64)
verifyStream := func(part int64, data types.TxData) {
Expand All @@ -34,8 +34,8 @@ func ReadTopic(ctx context.Context, readerId uint32, reader *topicreader.Reader,
hb := lastHb[part]
if hb != 0 && data.Step < hb {
errString := fmt.Sprintf("Unexpected step_id in stream, last hb step_id: %v,"+
"got tx {\"key\":%v,\"ts\":[%v,%v]}",
lastHb[part], serializeKey(data.KeyValues), data.Step, data.TxId)
"got tx {\"topic\":\"%v\",\"key\":%v,\"ts\":[%v,%v]}",
lastHb[part], topicPath, serializeKey(data.KeyValues), data.Step, data.TxId)
stopErr := channel.SaveReplicationState(ctx, processor.REPLICATION_FATAL_ERROR, errString)
if stopErr != nil {
xlog.Fatal(ctx, errString,
Expand Down

0 comments on commit 32c0f06

Please sign in to comment.