From 51e3efc83d51d15d608bdea99bdad375fdb95f7c Mon Sep 17 00:00:00 2001
From: Polina Volosnikova
Date: Tue, 10 Dec 2024 15:50:41 +0100
Subject: [PATCH] add dead letter queue

---
 cmd/aardappel/config.yaml         |  6 ++
 cmd/aardappel/main.go             | 29 +++++----
 internal/config/config.go         | 79 ++++++++++++++++++++------
 internal/dst_table/dst_table.go   |  1 -
 internal/hb_tracker/hb_tracker.go |  1 -
 internal/processor/processor.go   | 32 ++++++----
 internal/reader/reader.go         | 83 +++++++++++++++++++-------
 internal/types/types.go           |  5 ++
 internal/util/ydb/client.go       | 36 +++++++++---
 9 files changed, 198 insertions(+), 74 deletions(-)

diff --git a/cmd/aardappel/config.yaml b/cmd/aardappel/config.yaml
index 5251a59..fd367d0 100644
--- a/cmd/aardappel/config.yaml
+++ b/cmd/aardappel/config.yaml
@@ -32,9 +32,11 @@ streams:
   - src_topic: "producer1/cf1"        # Path of the cdc source topic for the 1st replica table
     consumer: "c1"                    # Topic consumer name in a cdc source topic for the 1st replica table
     dst_table: "/Root/test/table1_rep" # Path of the 1st replica table (should exist and have compatible schema)
+    problem_strategy: "stop"          # What to do when a problem occurs (stop, continue)
   - src_topic: "producer2/cf1"        # Path of the cdc source topic for the 2nd replica table
     consumer: "c1"                    # Topic consumer name in a cdc source topic for the 2nd replica table
     dst_table: "/Root/test/table2_rep" # Path of the 2nd replica table (should exist and have compatible schema)
+    problem_strategy: "continue"      # What to do when a problem occurs (stop, continue)
 
 ## Max timeout to analise missed heartbeat in seconds. If we don't get quorum during this interval
 ## the warning message will be written to the log.
@@ -48,3 +50,7 @@ log_level: "debug"
 cmd_topic:
   path: aardappel_command
   consumer: c1
+
+## Dead letter queue. In this topic you may get events that haven't been processed because they could not be applied consistently
+dead_letter_queue:
+  path: dlq
diff --git a/cmd/aardappel/main.go b/cmd/aardappel/main.go
index 65958c5..27b73a5 100644
--- a/cmd/aardappel/main.go
+++ b/cmd/aardappel/main.go
@@ -76,7 +76,7 @@ func DoReplication(ctx context.Context, prc *processor.Processor, dstTables []*d
 		mon.RequestSize(stats.RequestSize)
 		mon.QuorumWaitingDuration(float64(stats.QuorumWaitingDurationMs) / 1000)
 	}
-	xlog.Info(ctx, "Replication step ok", zap.Int("modifications", stats.ModificationsCount),
+	xlog.Debug(ctx, "Replication step ok", zap.Int("modifications", stats.ModificationsCount),
 		zap.Float32("mps", perSecond),
 		zap.Uint64("last quorum HB step", stats.LastHeartBeat.Step),
 		zap.Uint64("last quorum HB tx_id", stats.LastHeartBeat.TxId),
@@ -88,7 +88,6 @@ func DoReplication(ctx context.Context, prc *processor.Processor, dstTables []*d
 func createReplicaStateTable(ctx context.Context, client *client.TableClient, stateTable string) error {
 	query := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %v (id Utf8, step_id Uint64, tx_id Uint64, state Utf8, stage Utf8, "+
 		"last_msg Utf8, lock_owner Utf8, lock_deadline Timestamp, PRIMARY KEY(id))", stateTable)
-	// раньше зависало, теперь нет, упадет ч тем, что не сможет получить таймаут
 	return client.Do(ctx, func(ctx context.Context, s table.Session) error {
 		return s.ExecuteSchemeQuery(ctx, query, nil)
 	})
@@ -102,7 +101,6 @@ func initReplicaStateTable(ctx context.Context, client *client.TableClient, stat
 	)
 	initQuery := fmt.Sprintf("INSERT INTO %v (id, step_id, tx_id, state, stage) VALUES ($instanceId,0,0, $state, $stage)",
 		stateTable)
-	// раньше зависало, теперь упадет, что не удалось заинить таблицу из-за таймаута
 	err := client.DoTx(ctx,
 		func(ctx context.Context, tx table.TransactionActor) error {
 			_, err := tx.Execute(ctx, initQuery, param)
@@ -121,7 +119,6 @@ func doMain(ctx context.Context, config configInit.Config, srcDb *client.TopicCl
 	var streamDbgInfos []string
 	topicPartsCountMap := make(map[int]int)
 	for i := 0; i < len(config.Streams); i++ {
-		// раньше зависало, теперь нет, упадет с ошибкой таймаута при попытке описать топик
 		desc, err := srcDb.Describe(ctx, config.Streams[i].SrcTopic)
 		if err != nil {
 			xlog.Fatal(ctx, "Unable to describe topic",
@@ -146,6 +143,17 @@ func doMain(ctx context.Context, config configInit.Config, srcDb *client.TopicCl
 			ctx, config.InstanceId, config.CmdQueue.Path, config.CmdQueue.Consumer, dstDb.TopicClient)
 	}
 
+	var dlQueue *processor.DLQueue
+	if config.DLQueue != nil {
+		xlog.Info(ctx, "Dead letter queue present in config",
+			zap.String("path", config.DLQueue.Path))
+		topicWriter, err := dstDb.TopicClient.StartWriter(config.DLQueue.Path)
+		if err != nil {
+			xlog.Fatal(ctx, "Unable to start writer for dead letter queue", zap.Error(err))
+		}
+		dlQueue = processor.NewDlQueue(ctx, topicWriter)
+	}
+
 	prc, err := processor.NewProcessor(ctx, totalPartitions, config.StateTable, dstDb.TableClient, config.InstanceId, config.KeyFilter)
 	if err != nil {
 		xlog.Fatal(ctx, "Unable to create processor", zap.Error(err))
@@ -160,7 +168,6 @@ func doMain(ctx context.Context, config configInit.Config, srcDb *client.TopicCl
 
 	for i := 0; i < len(config.Streams); i++ {
 		startCb, updateCb := topicReader.MakeTopicReaderGuard()
-		// не зависнет, ошибки не будет, упадет дальше
 		reader, err := srcDb.StartReader(config.Streams[i].Consumer, config.Streams[i].SrcTopic,
 			topicoptions.WithReaderGetPartitionStartOffset(startCb))
 
@@ -175,9 +182,14 @@ func doMain(ctx context.Context, config configInit.Config, srcDb *client.TopicCl
 		if err != nil {
 			xlog.Fatal(ctx, "Unable to init dst table")
 		}
+
+		streamInfo := topicReader.StreamInfo{
+			Id:              uint32(i),
+			TopicPath:       config.Streams[i].SrcTopic,
+			PartCount:       topicPartsCountMap[i],
+			ProblemStrategy: config.Streams[i].ProblemStrategy}
 		xlog.Debug(ctx, "Start reading")
-		go topicReader.ReadTopic(ctx, config.Streams[i].SrcTopic,
-			uint32(i), reader, prc, topicPartsCountMap[i], conflictHandler, updateCb)
+		go topicReader.ReadTopic(ctx, streamInfo, reader, prc, conflictHandler, updateCb, dlQueue)
 	}
 
 	lockExecutor := func(fn func(context.Context, table.Session, table.Transaction) error) error {
@@ -262,7 +274,6 @@ func main() {
 		dstOpts = append(dstOpts, ydb.WithBalancer(balancers.SingleConn()))
 	}
 
-	// не зависнет, ошибку не вернет, упадет дальше
 	dstDb, err := client.NewYdbClient(ctx, config.DstConnectionString, dstOpts...)
 	if err != nil {
 		xlog.Fatal(ctx, "Unable to connect to dst cluster", zap.Error(err))
@@ -290,7 +301,6 @@ func main() {
 		config.InstanceId, owner, time.Duration(config.MaxExpHbInterval*2)*time.Second)
 
-	// завершится с логом https://paste.nebius.dev/paste/1dc40974-6f27-417f-94da-5bf95d5d4a39.html
 	lockChannel := locker.LockerContext(ctx)
 	var cont bool
 	cont = true
@@ -304,7 +314,6 @@ func main() {
 			continue
 		}
 		lockErrCnt = 0
-		// не зависнет, ошибку не вернет, упадет дальше
 		srcDb, err := client.NewYdbClient(lockCtx, config.SrcConnectionString, srcOpts...)
 		if err != nil {
 			xlog.Fatal(ctx, "Unable to connect to src cluster", zap.Error(err))
diff --git a/internal/config/config.go b/internal/config/config.go
index 4e36e76..38b3060 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -1,18 +1,22 @@
 package config
 
 import (
+	"aardappel/internal/types"
 	"aardappel/internal/util/xlog"
 	"context"
 	"errors"
+	"fmt"
 	"go.uber.org/zap"
 	"gopkg.in/yaml.v3"
 	"os"
+	"strings"
 )
 
 type Stream struct {
-	SrcTopic string `yaml:"src_topic"`
-	DstTable string `yaml:"dst_table"`
-	Consumer string `yaml:"consumer"`
+	SrcTopic        string `yaml:"src_topic"`
+	DstTable        string `yaml:"dst_table"`
+	Consumer        string `yaml:"consumer"`
+	ProblemStrategy string `yaml:"problem_strategy"`
 }
 
 type MonServer struct {
@@ -24,6 +28,10 @@ type CmdQueue struct {
 	Consumer string `yaml:"consumer"`
 }
 
+type DLQueue struct {
+	Path string `yaml:"path"`
+}
+
 type KeyFilter struct {
 	Path string `yaml:"table_path"`
 }
@@ -45,6 +53,22 @@ type Config struct {
 	MonServer        *MonServer `yaml:"mon_server"`
 	CmdQueue         *CmdQueue  `yaml:"cmd_queue"`
 	KeyFilter        *KeyFilter `yaml:"key_filter"`
+	DLQueue          *DLQueue   `yaml:"dead_letter_queue"`
+}
+
+func verifyStreamProblemStrategy(configStrategy *string) error {
+	if *configStrategy == "" {
+		*configStrategy = types.ProblemStrategyStop
+		return nil
+	}
+	streamProblemStrategies := []string{types.ProblemStrategyStop, types.ProblemStrategyContinue}
+	for _, strategy := range streamProblemStrategies {
+		if strings.EqualFold(strategy, *configStrategy) {
+			*configStrategy = strategy
+			return nil
+		}
+	}
+	return errors.New("unknown stream problem strategy '" + *configStrategy + "'")
 }
 
 func (config Config) ToString() (string, error) {
@@ -55,24 +79,39 @@
 	return string(data), nil
 }
 
-func InitConfig(ctx context.Context, confPath string) (Config, error) {
-	if len(confPath) != 0 {
-		confTxt, err := os.ReadFile(confPath)
+func (config Config) verify() error {
+	for i := range config.Streams {
+		err := verifyStreamProblemStrategy(&config.Streams[i].ProblemStrategy)
 		if err != nil {
-			xlog.Error(ctx, "Unable to read configuration file",
-				zap.String("config_path", confPath),
-				zap.Error(err))
-			return Config{}, err
+			return err
 		}
-		var config Config
-		err = yaml.Unmarshal(confTxt, &config)
-		if err != nil {
-			xlog.Error(ctx, "Unable to parse configuration file",
-				zap.String("config_path", confPath),
-				zap.Error(err))
-			return Config{}, err
-		}
-		return config, nil
 	}
-	return Config{}, errors.New("configuration file path is empty")
+	return nil
+}
+
+func InitConfig(ctx context.Context, confPath string) (Config, error) {
+	if len(confPath) == 0 {
+		return Config{}, errors.New("configuration file path is empty")
+	}
+	confTxt, err := os.ReadFile(confPath)
+	if err != nil {
+		xlog.Error(ctx, "Unable to read configuration file",
+			zap.String("config_path", confPath),
+			zap.Error(err))
+		return Config{}, fmt.Errorf("unable to read configuration file: %w", err)
+	}
+	var config Config
+	err = yaml.Unmarshal(confTxt, &config)
+	if err != nil {
+		xlog.Error(ctx, "Unable to parse configuration file",
+			zap.String("config_path", confPath),
+			zap.Error(err))
+		return Config{}, fmt.Errorf("unable to parse configuration file: %w", err)
+	}
+	err = config.verify()
+	if err != nil {
+		xlog.Error(ctx, "Unable to verify configuration file", zap.Error(err))
+		return Config{}, fmt.Errorf("unable to verify configuration file: %w", err)
+	}
+	return config, nil
 }
diff --git a/internal/dst_table/dst_table.go b/internal/dst_table/dst_table.go
index bb5f89f..770baa3 100644
--- a/internal/dst_table/dst_table.go
+++ b/internal/dst_table/dst_table.go
@@ -46,7 +46,6 @@ func NewDstTable(client *client.TableClient, tablePath string) *DstTable {
 
 func (dstTable *DstTable) DescribeTable(ctx context.Context) (*options.Description, error) {
 	var desc options.Description
-	// раньше зависало, теперь упадет с тем, что невозможно заинить таблицу
 	err := dstTable.client.Do(ctx, func(ctx context.Context, s table.Session) error {
 		var err error
 		desc, err = s.DescribeTable(ctx, dstTable.tablePath)
diff --git a/internal/hb_tracker/hb_tracker.go b/internal/hb_tracker/hb_tracker.go
index 9974857..75947e1 100644
--- a/internal/hb_tracker/hb_tracker.go
+++ b/internal/hb_tracker/hb_tracker.go
@@ -85,7 +85,6 @@ func (ht *HeartBeatTracker) AddHb(data types.HbData) error {
 	if ok {
 		if types.NewPosition(hb).LessThan(*types.NewPosition(data)) {
 			// Got new heartbeat for stream - we can commit previous one
-			// При потери сети ошибка игнорируется, ардапл завершится, только если в другом месте будет ошибка
 			err := hb.CommitTopic()
 			if err != nil {
 				return fmt.Errorf("unable to commit topic during update HB %w, stepId: %d, txId: %d", err,
diff --git a/internal/processor/processor.go b/internal/processor/processor.go
index e6f8371..f007f3f 100644
--- a/internal/processor/processor.go
+++ b/internal/processor/processor.go
@@ -115,6 +115,27 @@ func NewCmdQueueConflictHandler(ctx context.Context, instanceId string, path str
 	return &handler
 }
 
+type DLQueue struct {
+	Writer *client.TopicWriter
+	Lock   sync.Mutex
+}
+
+func NewDlQueue(ctx context.Context, writer *client.TopicWriter) *DLQueue {
+	var queue DLQueue
+	queue.Writer = writer
+	return &queue
+}
+
+func (queue *DLQueue) Write(ctx context.Context, msg string) error {
+	queue.Lock.Lock()
+	defer queue.Lock.Unlock()
+	err := queue.Writer.Write(ctx, msg)
+	if err != nil {
+		xlog.Error(ctx, "Unable to write into dead letter queue", zap.Error(err), zap.String("msg", msg))
+	}
+	return err
+}
+
 func readWithTimeout(ctx context.Context, reader *client.TopicReader) (*topicreader.Message, error, bool) {
 	timingCtx, cancel := context.WithTimeout(ctx, time.Second*5)
 	defer cancel()
@@ -129,7 +150,6 @@ func (this *CmdQueueConflictHandler) Handle(ctx context.Context, streamTopicPath
 	this.Lock.Lock()
 	defer this.Lock.Unlock()
 
-	// Если тут упадет, то только если в другом месте упадет...
 	reader, err := this.Topic.StartReader(this.Consumer, this.Path)
 	if err != nil {
 		xlog.Fatal(ctx, "Unable to read from specified command topic",
@@ -248,7 +268,6 @@ func selectReplicationState(ctx context.Context, client *client.TableClient, sta
 	var lastMsg *string
 	var stage *string
 
-	// Раньше зависало, сейчас упадет сразу с ошибкой, что не удалось создать процессов из-за таймаута
 	err := client.DoTx(ctx,
 		func(ctx context.Context, tx table.TransactionActor) error {
 			res, err := tx.Execute(ctx, stateQuery, param)
@@ -348,7 +367,6 @@ func (processor *Processor) EnqueueHb(ctx context.Context, hb types.HbData) {
 		zap.Int64("partition_id:", hb.StreamId.PartitionId),
 		zap.Bool("willSkip", types.NewPosition(hb).LessThan(lastPosition)))
 	if types.NewPosition(hb).LessThan(lastPosition) {
-		// Ошибку будет не заметно, только если в другом месте упадет
 		err := hb.CommitTopic()
 		if err != nil {
 			xlog.Fatal(ctx, "unable to commit topic", zap.Error(err))
@@ -385,7 +403,6 @@ func (processor *Processor) SaveReplicationState(ctx context.Context, status str
 	stopQuery := fmt.Sprintf("UPSERT INTO %v (id, state, last_msg) VALUES ($instanceId,$state,$lastError)",
 		processor.stateTablePath)
 
-	// Раньше зависало, сейчас нет, после этого xlog.fatal, так что ардапель должен нормально завершиться
 	return processor.dstServerClient.DoTx(ctx,
 		func(ctx context.Context, tx table.TransactionActor) error {
 			_, err := tx.Execute(ctx, stopQuery, param)
@@ -401,7 +418,6 @@ func (processor *Processor) EnqueueTx(ctx context.Context, tx types.TxData) {
 		zap.Uint32("reader_id", tx.TableId),
 		zap.Bool("willSkip", (types.Position{tx.Step, tx.TxId}.LessThan(lastPosition))))
 	if (types.Position{tx.Step, tx.TxId}.LessThan(lastPosition)) {
-		// Ошибка незаметна, ток если где то в другом месте завершится...
err := tx.CommitTopic() if err != nil { xlog.Fatal(ctx, "unable to commit topic", zap.Error(err)) @@ -653,7 +669,6 @@ func (processor *Processor) DoInitialScan(ctx context.Context, dstTables []*dst_ } for i := 0; i < len(txs); i++ { - // Ошибка незаметна, так что ток если в другом месте обнаружится err := txs[i].CommitTopic() if err != nil { return nil, fmt.Errorf("%w; Unable to commit topic fot dataTx", err) @@ -664,7 +679,6 @@ func (processor *Processor) DoInitialScan(ctx context.Context, dstTables []*dst_ xlog.Debug(ctx, "commit hb in topic", zap.Uint64("step", processor.initialScanPos.Step), zap.Uint64("tx_id", processor.initialScanPos.TxId)) - // Ошибка не заметна, ток если в другом месте стопнется err := processor.initialScanPos.CommitTopic() if err != nil { return nil, fmt.Errorf("%w; Unable to commit topic fot hb", err) @@ -711,7 +725,6 @@ func (processor *Processor) PushAsSingleTx(ctx context.Context, data dst_table.P ) param := append(data.Parameters, *stateParam...) - // Завершится из-за таймаута ctxWithTimeout, cancel := context.WithTimeout(ctx, client.DEFAULT_TIMEOUT) defer cancel() _, err := tx.Execute(ctxWithTimeout, data.Query+processor.stateStoreQuery, ¶m, options.WithCommit()) @@ -719,7 +732,6 @@ func (processor *Processor) PushAsSingleTx(ctx context.Context, data dst_table.P } func (processor *Processor) PushTxs(ctx context.Context, data dst_table.PushQuery, tx table.Transaction) error { - // Завершится из-за таймаута ctxWithTimeout, cancel := context.WithTimeout(ctx, client.DEFAULT_TIMEOUT) defer cancel() _, err := tx.Execute(ctxWithTimeout, data.Query, &data.Parameters, options.WithCommit()) @@ -757,7 +769,6 @@ func (processor *Processor) DoReplication(ctx context.Context, dstTables []*dst_ processor.lastPosition.Store(*types.NewPosition(batch.Hb)) for i := 0; i < len(batch.TxData); i++ { - // Ошбика не заметна, ток если в другом месте упадет err := batch.TxData[i].CommitTopic() if err != nil { return nil, fmt.Errorf("%w; Unable to commit topic fot dataTx", err) @@ -767,7 +778,6 @@ func (processor *Processor) DoReplication(ctx context.Context, dstTables []*dst_ xlog.Debug(ctx, "commit hb in topic", zap.Uint64("step", batch.Hb.Step), zap.Uint64("tx_id", batch.Hb.TxId)) - // Ошибка не заметна, ток если в другом месте упадет err = batch.Hb.CommitTopic() if err != nil { return nil, fmt.Errorf("%w; Unable to commit topic fot hb", err) diff --git a/internal/reader/reader.go b/internal/reader/reader.go index 649f9b6..724fdd4 100644 --- a/internal/reader/reader.go +++ b/internal/reader/reader.go @@ -15,6 +15,13 @@ import ( "sync" ) +type StreamInfo struct { + Id uint32 + TopicPath string + PartCount int + ProblemStrategy string +} + type TopicData struct { Update json.RawMessage `json:"update"` Erase json.RawMessage `json:"erase"` @@ -73,21 +80,20 @@ func MakeTopicReaderGuard() (topicoptions.GetPartitionStartOffsetFunc, UpdateOff func serializeKey(key []json.RawMessage) string { data, err := json.Marshal(key) if err != nil { - return "underfined" + return "undefined" } return string(data) } -func WriteAllProblemTxsUntilNextHb(ctx context.Context, topicPath string, readerId uint32, reader *client.TopicReader, channel processor.Channel, lastHb map[int64]types.Position, hb types.Position, partsCount int) { +func WriteAllProblemTxsUntilNextHb(ctx context.Context, streamInfo StreamInfo, reader *client.TopicReader, channel processor.Channel, lastHb map[int64]types.Position, hb types.Position, dlQueue *processor.DLQueue) { var partsIsDone map[int64]bool for part, partHb := range 
lastHb { if hb.LessThan(partHb) { partsIsDone[part] = true } } - for ctx.Err() == nil && len(partsIsDone) < partsCount { - // done? после этой функции безусовный xlog.fatal - msg, err := reader.ReadMessageWithTimeout(ctx) + for ctx.Err() == nil && len(partsIsDone) < streamInfo.PartCount { + msg, err := reader.ReadMessage(ctx) if err != nil { xlog.Error(ctx, "Unable to read message", zap.Error(err)) return @@ -105,20 +111,29 @@ func WriteAllProblemTxsUntilNextHb(ctx context.Context, topicPath string, reader return } if topicData.Update != nil || topicData.Erase != nil { - data, err := rd.ParseTxData(ctx, jsonData, readerId) + data, err := rd.ParseTxData(ctx, jsonData, streamInfo.Id) if err != nil { xlog.Error(ctx, "ParseTxData: Error parsing tx data", zap.Error(err)) return } if partHb, ok := lastHb[msg.PartitionID()]; ok && (types.Position{data.Step, data.TxId}.LessThan(partHb)) { + txInfo := fmt.Sprintf("{\"topic\":\"%v\",\"key\":%v,\"ts\":[%v,%v]}", streamInfo.TopicPath, serializeKey(data.KeyValues), data.Step, data.TxId) errString := fmt.Sprintf("Unexpected timestamp in stream, last hb timestamp:[%v,%v],"+ - "got tx {\"topic\":\"%v\",\"key\":%v,\"ts\":[%v,%v]}", - hb.Step, hb.TxId, topicPath, serializeKey(data.KeyValues), data.Step, data.TxId) + "got tx %v", hb.Step, hb.TxId, txInfo) xlog.Error(ctx, errString) + + if dlQueue != nil { + dlQueue.Lock.Lock() + err := dlQueue.Writer.Write(ctx, errString) + if err != nil { + xlog.Error(ctx, "Unable to write into dead letter queue", zap.Error(err), zap.String("tx", txInfo)) + } + dlQueue.Lock.Unlock() + } } } else if topicData.Resolved != nil { - data, err := rd.ParseHBData(ctx, jsonData, types.StreamId{readerId, msg.PartitionID()}) + data, err := rd.ParseHBData(ctx, jsonData, types.StreamId{streamInfo.Id, msg.PartitionID()}) if err != nil { xlog.Error(ctx, "ParseTxData: Error parsing hb data", zap.Error(err)) return @@ -131,8 +146,8 @@ func WriteAllProblemTxsUntilNextHb(ctx context.Context, topicPath string, reader } } -func ReadTopic(ctx context.Context, topicPath string, readerId uint32, reader *client.TopicReader, - channel processor.Channel, partsCount int, handler processor.ConflictHandler, updateOffsetCb UpdateOffsetFunc) { +func ReadTopic(ctx context.Context, streamInfo StreamInfo, reader *client.TopicReader, channel processor.Channel, + handler processor.ConflictHandler, updateOffsetCb UpdateOffsetFunc, dlQueue *processor.DLQueue) { var mu sync.Mutex lastHb := make(map[int64]types.Position) // returns true - pass item, false - skip item @@ -143,16 +158,16 @@ func ReadTopic(ctx context.Context, topicPath string, readerId uint32, reader *c if handler == nil { xlog.Error(ctx, "Command topic is not configured, unable to receive external instructions on actions, stopping processing.") } else { - rv := handler.Handle(ctx, topicPath, key, data.Step, data.TxId) + rv := handler.Handle(ctx, streamInfo.TopicPath, key, data.Step, data.TxId) if rv >= 0 { if rv == 0 { - xlog.Info(ctx, "skip message", zap.String("topic", topicPath), + xlog.Info(ctx, "skip message", zap.String("topic", streamInfo.TopicPath), zap.String("key", key), zap.Uint64("step_id", data.Step), zap.Uint64("tx_id", data.TxId)) return false } else { - xlog.Info(ctx, "apply out of order message", zap.String("topic", topicPath), + xlog.Info(ctx, "apply out of order message", zap.String("topic", streamInfo.TopicPath), zap.String("key", key), zap.Uint64("step_id", data.Step), zap.Uint64("tx_id", data.TxId)) @@ -161,29 +176,53 @@ func ReadTopic(ctx context.Context, topicPath string, 
readerId uint32, reader *c
 			}
 		}
 
+		txInfo := fmt.Sprintf("{\"topic\":\"%v\",\"key\":%v,\"ts\":[%v,%v]}", streamInfo.TopicPath, key, data.Step, data.TxId)
 		errString := fmt.Sprintf("Unexpected timestamp in stream, last hb ts:[%v,%v], "+
-			"got tx {\"topic\":\"%v\",\"key\":%v,\"ts\":[%v,%v]}",
-			lastHb[part].Step, lastHb[part].TxId, topicPath, key, data.Step, data.TxId)
+			"got tx %v", lastHb[part].Step, lastHb[part].TxId, txInfo)
+		xlog.Error(ctx, errString)
+		if streamInfo.ProblemStrategy == types.ProblemStrategyContinue {
+			if dlQueue != nil {
+				xlog.Debug(ctx, "Start writing problem tx to dead letter queue")
+				err := dlQueue.Write(ctx, errString)
+				if err != nil {
+					xlog.Fatal(ctx, "Unable to write into dead letter queue", zap.Error(err), zap.String("tx", txInfo))
+				}
+			} else {
+				xlog.Error(ctx, "Dead letter queue is not configured, problem tx will not be preserved")
+			}
+			xlog.Info(ctx, "skip message", zap.String("topic", streamInfo.TopicPath),
+				zap.String("key", key),
+				zap.Uint64("step_id", data.Step),
+				zap.Uint64("tx_id", data.TxId))
+			return false
+		}
 		stopErr := channel.SaveReplicationState(ctx, processor.REPLICATION_FATAL_ERROR, errString)
 		if stopErr != nil {
 			xlog.Fatal(ctx, errString,
 				zap.NamedError("this issue was not stored in the state table due to double error", stopErr))
 		}
-		WriteAllProblemTxsUntilNextHb(ctx, topicPath, readerId, reader, channel, lastHb, hb, partsCount)
+		if dlQueue != nil {
+			xlog.Debug(ctx, "Start writing problem tx to dead letter queue")
+			err := dlQueue.Write(ctx, errString)
+			if err != nil {
+				xlog.Fatal(ctx, "Unable to write into dead letter queue", zap.Error(err), zap.String("tx", txInfo))
+			}
+		} else {
+			xlog.Error(ctx, "Dead letter queue is not configured, problem tx will not be preserved")
+		}
+		WriteAllProblemTxsUntilNextHb(ctx, streamInfo, reader, channel, lastHb, hb, dlQueue)
 		xlog.Fatal(ctx, errString)
 		}
 		return true
 	}
 
 	defer func() {
-		// не зависал, если случается разрыв сети на этом моменте, то как будто ничего страшного, так как оно закрывается если ардапель стопается?
err := reader.Close(ctx) xlog.Error(ctx, "stop reader call returns", zap.Error(err)) }() for ctx.Err() == nil { - // Завершится с сообщением об таймауте при чтении - msg, err := reader.ReadMessageWithTimeout(ctx) + msg, err := reader.ReadMessage(ctx) if err != nil { xlog.Fatal(ctx, "Unable to read message", zap.Error(err)) return @@ -204,7 +243,7 @@ func ReadTopic(ctx context.Context, topicPath string, readerId uint32, reader *c return } if topicData.Update != nil || topicData.Erase != nil { - data, err := rd.ParseTxData(ctx, jsonData, readerId) + data, err := rd.ParseTxData(ctx, jsonData, streamInfo.Id) if err != nil { xlog.Error(ctx, "ParseTxData: Error parsing tx data", zap.Error(err)) return @@ -228,7 +267,7 @@ func ReadTopic(ctx context.Context, topicPath string, readerId uint32, reader *c // Add tx to txQueue } else if topicData.Resolved != nil { - data, err := rd.ParseHBData(ctx, jsonData, types.StreamId{readerId, msg.PartitionID()}) + data, err := rd.ParseHBData(ctx, jsonData, types.StreamId{streamInfo.Id, msg.PartitionID()}) if err != nil { xlog.Error(ctx, "ParseTxData: Error parsing hb data", zap.Error(err)) return diff --git a/internal/types/types.go b/internal/types/types.go index 249101a..864a989 100644 --- a/internal/types/types.go +++ b/internal/types/types.go @@ -7,9 +7,14 @@ import "encoding/json" type TxOperationType uint8 const ( + // topic message type TxOperationUpdate TxOperationType = 0 TxOperationErase TxOperationType = 1 TxOperationUnknown TxOperationType = 2 + + // topic problem strategy + ProblemStrategyStop = "STOP" + ProblemStrategyContinue = "CONTINUE" ) func (o TxOperationType) String() string { diff --git a/internal/util/ydb/client.go b/internal/util/ydb/client.go index 97b19e0..7bb6652 100644 --- a/internal/util/ydb/client.go +++ b/internal/util/ydb/client.go @@ -11,7 +11,9 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions" "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicreader" "github.com/ydb-platform/ydb-go-sdk/v3/topic/topictypes" + "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicwriter" "go.uber.org/zap" + "strings" "time" ) @@ -71,20 +73,26 @@ func (r *TopicReader) ReadMessage(ctx context.Context) (*topicreader.Message, er return r.reader.ReadMessage(ctx) } -func (r *TopicReader) ReadMessageWithTimeout(ctx context.Context) (*topicreader.Message, error) { - ctxWithTimeout, cancel := context.WithTimeout(ctx, 10*DEFAULT_TIMEOUT) - defer cancel() - msg, err := r.reader.ReadMessage(ctxWithTimeout) +func (r *TopicReader) Commit(ctx context.Context, msg *topicreader.Message) error { + err := r.reader.Commit(ctx, msg) if err != nil { - return nil, HandleRequestError(ctxWithTimeout, err) + return HandleRequestError(ctx, err) } - return msg, nil + return nil } -func (r *TopicReader) Commit(ctx context.Context, msg *topicreader.Message) error { - err := r.reader.Commit(ctx, msg) +type TopicWriter struct { + writer *topicwriter.Writer +} + +func (w *TopicWriter) Write(ctx context.Context, msg string) error { + ctxWithTimeout, cancel := context.WithTimeout(ctx, DEFAULT_TIMEOUT) + defer cancel() + err := w.writer.Write(ctxWithTimeout, + topicwriter.Message{Data: strings.NewReader(msg)}, + ) if err != nil { - return HandleRequestError(ctx, err) + return HandleRequestError(ctxWithTimeout, err) } return nil } @@ -103,6 +111,16 @@ func (c *TopicClient) StartReader(consumer string, path string, opts ...topicopt return &topicReader, nil } +func (c *TopicClient) StartWriter(path string) (*TopicWriter, error) { + writer, err := 
c.client.StartWriter(path) + if err != nil { + return nil, err + } + var topicWriter TopicWriter + topicWriter.writer = writer + return &topicWriter, nil +} + func (c *TopicClient) Describe(ctx context.Context, path string) (topictypes.TopicDescription, error) { ctxWithTimeout, cancel := context.WithTimeout(ctx, DEFAULT_TIMEOUT) defer cancel()