add dead letter queue
VPolka committed Dec 16, 2024
1 parent 750780d commit 51e3efc
Showing 9 changed files with 216 additions and 74 deletions.
6 changes: 6 additions & 0 deletions cmd/aardappel/config.yaml
@@ -32,9 +32,11 @@ streams:
- src_topic: "producer1/cf1" # Path of the cdc source topic for the 1st replica table
consumer: "c1" # Topic consumer name in a cdc source topic for the 1st replica table
dst_table: "/Root/test/table1_rep" # Path of the 1st replica table (should exist and have compatible schema)
problem_strategy: "stop" # What to do on some problem (stop, continue)
- src_topic: "producer2/cf1" # Path of the cdc source topic for the 2nd replica table
consumer: "c1" # Topic consumer name in a cdc source topic for the 2nd replica table
dst_table: "/Root/test/table2_rep" # Path of the 2nd replica table (should exist and have compatible schema)
problem_strategy: "continue" # What to do on some problem (stop, continue)

## Max timeout to analyse a missed heartbeat, in seconds. If we don't get a quorum within this interval,
## a warning message will be written to the log.
@@ -48,3 +50,7 @@ log_level: "debug"
cmd_topic:
path: aardappel_command
consumer: c1

## Dead letter queue. This topic receives events that were not processed because they could not be applied consistently.
dead_letter_queue:
path: dlq
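
For reference, events that land in the dead letter queue can be inspected with an ordinary topic reader. A minimal sketch using the ydb-go-sdk; the endpoint, database, and the assumption that consumer "c1" is registered on the dlq topic are illustrative, not part of this commit:

package main

import (
	"context"
	"fmt"
	"io"

	"github.com/ydb-platform/ydb-go-sdk/v3"
	"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions"
)

func main() {
	ctx := context.Background()
	db, err := ydb.Open(ctx, "grpc://localhost:2136/local") // assumed endpoint/database
	if err != nil {
		panic(err)
	}
	defer db.Close(ctx)

	// Consumer "c1" must be added to the "dlq" topic beforehand.
	reader, err := db.Topic().StartReader("c1", topicoptions.ReadTopic("dlq"))
	if err != nil {
		panic(err)
	}
	for {
		msg, err := reader.ReadMessage(ctx)
		if err != nil {
			panic(err)
		}
		body, err := io.ReadAll(msg) // topicreader.Message implements io.Reader
		if err != nil {
			panic(err)
		}
		fmt.Println(string(body))
		// Note: offsets are not committed here, so messages will be re-read on restart.
	}
}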
29 changes: 19 additions & 10 deletions cmd/aardappel/main.go
@@ -76,7 +76,7 @@ func DoReplication(ctx context.Context, prc *processor.Processor, dstTables []*d
mon.RequestSize(stats.RequestSize)
mon.QuorumWaitingDuration(float64(stats.QuorumWaitingDurationMs) / 1000)
}
xlog.Debug(ctx, "Replication step ok", zap.Int("modifications", stats.ModificationsCount),
zap.Float32("mps", perSecond),
zap.Uint64("last quorum HB step", stats.LastHeartBeat.Step),
zap.Uint64("last quorum HB tx_id", stats.LastHeartBeat.TxId),
@@ -88,7 +88,6 @@ func DoReplication(ctx context.Context, prc *processor.Processor, dstTables []*d
func createReplicaStateTable(ctx context.Context, client *client.TableClient, stateTable string) error {
query := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %v (id Utf8, step_id Uint64, tx_id Uint64, state Utf8, stage Utf8, "+
"last_msg Utf8, lock_owner Utf8, lock_deadline Timestamp, PRIMARY KEY(id))", stateTable)
// used to hang; now it fails with a timeout error instead
return client.Do(ctx, func(ctx context.Context, s table.Session) error {
return s.ExecuteSchemeQuery(ctx, query, nil)
})
@@ -102,7 +101,6 @@ func initReplicaStateTable(ctx context.Context, client *client.TableClient, stat
)
initQuery := fmt.Sprintf("INSERT INTO %v (id, step_id, tx_id, state, stage) VALUES ($instanceId,0,0, $state, $stage)",
stateTable)
// used to hang; now it fails reporting that the table could not be initialized due to a timeout
err := client.DoTx(ctx,
func(ctx context.Context, tx table.TransactionActor) error {
_, err := tx.Execute(ctx, initQuery, param)
@@ -121,7 +119,6 @@ func doMain(ctx context.Context, config configInit.Config, srcDb *client.TopicCl
var streamDbgInfos []string
topicPartsCountMap := make(map[int]int)
for i := 0; i < len(config.Streams); i++ {
// used to hang; now it fails with a timeout error when trying to describe the topic
desc, err := srcDb.Describe(ctx, config.Streams[i].SrcTopic)
if err != nil {
xlog.Fatal(ctx, "Unable to describe topic",
@@ -146,6 +143,17 @@ func doMain(ctx context.Context, config configInit.Config, srcDb *client.TopicCl
ctx, config.InstanceId, config.CmdQueue.Path, config.CmdQueue.Consumer, dstDb.TopicClient)
}

var dlQueue *processor.DLQueue
if config.DLQueue != nil {
xlog.Info(ctx, "Dead letter queue present in config",
zap.String("path", config.DLQueue.Path))
topicWriter, err := dstDb.TopicClient.StartWriter(config.DLQueue.Path)
if err != nil {
xlog.Fatal(ctx, "Unable to start writer for dlq", zap.Error(err))
}
dlQueue = processor.NewDlQueue(ctx, topicWriter)
}

prc, err := processor.NewProcessor(ctx, totalPartitions, config.StateTable, dstDb.TableClient, config.InstanceId, config.KeyFilter)
if err != nil {
xlog.Fatal(ctx, "Unable to create processor", zap.Error(err))
@@ -160,7 +168,6 @@ func doMain(ctx context.Context, config configInit.Config, srcDb *client.TopicCl

for i := 0; i < len(config.Streams); i++ {
startCb, updateCb := topicReader.MakeTopicReaderGuard()
// does not hang and returns no error here; failures surface later
reader, err := srcDb.StartReader(config.Streams[i].Consumer, config.Streams[i].SrcTopic,
topicoptions.WithReaderGetPartitionStartOffset(startCb))

@@ -175,9 +182,14 @@ func doMain(ctx context.Context, config configInit.Config, srcDb *client.TopicCl
if err != nil {
xlog.Fatal(ctx, "Unable to init dst table")
}

streamInfo := topicReader.StreamInfo{
Id: uint32(i),
TopicPath: config.Streams[i].SrcTopic,
PartCount: topicPartsCountMap[i],
ProblemStrategy: config.Streams[i].ProblemStrategy}
xlog.Debug(ctx, "Start reading")
go topicReader.ReadTopic(ctx, streamInfo, reader, prc, conflictHandler, updateCb, dlQueue)
}
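
Judging from the call site above, topicReader.StreamInfo bundles the per-stream parameters that were previously passed individually; its layout (reconstructed from the literal, not shown in this diff) is presumably close to:

// internal/topic_reader (assumed layout)
type StreamInfo struct {
	Id              uint32
	TopicPath       string
	PartCount       int
	ProblemStrategy string
}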

lockExecutor := func(fn func(context.Context, table.Session, table.Transaction) error) error {
@@ -262,7 +274,6 @@ func main() {
dstOpts = append(dstOpts, ydb.WithBalancer(balancers.SingleConn()))
}

// does not hang and returns no error here; failures surface later
dstDb, err := client.NewYdbClient(ctx, config.DstConnectionString, dstOpts...)
if err != nil {
xlog.Fatal(ctx, "Unable to connect to dst cluster", zap.Error(err))
@@ -290,7 +301,6 @@ func main() {
config.InstanceId, owner,
time.Duration(config.MaxExpHbInterval*2)*time.Second)

// terminates with the log shown at https://paste.nebius.dev/paste/1dc40974-6f27-417f-94da-5bf95d5d4a39.html
lockChannel := locker.LockerContext(ctx)
var cont bool
cont = true
@@ -304,7 +314,6 @@ func main() {
continue
}
lockErrCnt = 0
// does not hang and returns no error here; failures surface later
srcDb, err := client.NewYdbClient(lockCtx, config.SrcConnectionString, srcOpts...)
if err != nil {
xlog.Fatal(ctx, "Unable to connect to src cluster", zap.Error(err))
97 changes: 77 additions & 20 deletions internal/config/config.go
@@ -1,18 +1,22 @@
package config

import (
"aardappel/internal/types"
"aardappel/internal/util/xlog"
"context"
"errors"
"fmt"
"go.uber.org/zap"
"gopkg.in/yaml.v3"
"os"
"strings"
)

type Stream struct {
SrcTopic        string `yaml:"src_topic"`
DstTable        string `yaml:"dst_table"`
Consumer        string `yaml:"consumer"`
ProblemStrategy string `yaml:"problem_strategy"`
}

type MonServer struct {
@@ -24,6 +28,10 @@ type CmdQueue struct {
Consumer string `yaml:"consumer"`
}

type DLQueue struct {
Path string `yaml:"path"`
}

type KeyFilter struct {
Path string `yaml:"table_path"`
}
@@ -45,6 +53,22 @@ type Config struct {
MonServer *MonServer `yaml:"mon_server"`
CmdQueue *CmdQueue `yaml:"cmd_queue"`
KeyFilter *KeyFilter `yaml:"key_filter"`
DLQueue *DLQueue `yaml:"dead_letter_queue"`
}

func verifyStreamProblemStrategy(configStrategy *string) error {
if *configStrategy == "" {
*configStrategy = types.ProblemStrategyStop
return nil
}
streamProblemStrategies := []string{types.ProblemStrategyStop, types.ProblemStrategyContinue}
for _, strategy := range streamProblemStrategies {
if strings.EqualFold(strategy, *configStrategy) {
*configStrategy = strategy
return nil
}
}
return errors.New("unknown stream problem strategy '" + *configStrategy + "'")
}
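
The ProblemStrategy constants come from internal/types, which this diff does not show. A plausible sketch of what the check above assumes; the concrete string values are guesses consistent with the config comments, not code from the repository:

package types

// Canonical stream problem strategies. verifyStreamProblemStrategy
// normalizes the configured string to one of these values, so the
// rest of the code can compare with plain ==.
const (
	ProblemStrategyStop     = "STOP"
	ProblemStrategyContinue = "CONTINUE"
)

Normalizing once at config-load time keeps case-insensitive comparison out of the hot path.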

func (config Config) ToString() (string, error) {
@@ -55,24 +79,57 @@ func (config Config) ToString() (string, error) {
return string(data), nil
}

func (config Config) verify() error {
	for i := range config.Streams {
		err := verifyStreamProblemStrategy(&config.Streams[i].ProblemStrategy)
		if err != nil {
			return err
		}
	}
	return nil
}

func InitConfig(ctx context.Context, confPath string) (Config, error) {
if len(confPath) == 0 {
return Config{}, errors.New("configuration file path is empty")
}
confTxt, err := os.ReadFile(confPath)
if err != nil {
xlog.Error(ctx, "Unable to read configuration file",
zap.String("config_path", confPath),
zap.Error(err))
return Config{}, fmt.Errorf("unable to read configuration file: %w", err)
}
var config Config
err = yaml.Unmarshal(confTxt, &config)
if err != nil {
xlog.Error(ctx, "Unable to parse configuration file",
zap.String("config_path", confPath),
zap.Error(err))
return Config{}, fmt.Errorf("unable to parse configuration file: %w", err)
}
err = config.verify()
if err != nil {
xlog.Error(ctx, "Unable to verify configuration file", zap.Error(err))
return Config{}, fmt.Errorf("unable to verify configuration file: %w", err)
}
a := DLQueue{Path: "dlq_test"}
config.DLQueue = &a
config.StateTable = "vpolka_state_table"
config.Streams = make([]Stream, 2)
config.Streams[0] = Stream{
SrcTopic: "fake_cdc1",
DstTable: "/testing-global/aardappel-test/vpolka_table1",
Consumer: "c1",
ProblemStrategy: types.ProblemStrategyContinue,
}
config.Streams[1] = Stream{
SrcTopic: "fake_cdc2",
DstTable: "/testing-global/aardappel-test/vpolka_table2",
Consumer: "c1",
ProblemStrategy: types.ProblemStrategyContinue,
}
config.LogLevel = "info"
config.MaxExpHbInterval = 604800
return config, nil
}
1 change: 0 additions & 1 deletion internal/dst_table/dst_table.go
@@ -46,7 +46,6 @@ func NewDstTable(client *client.TableClient, tablePath string) *DstTable {

func (dstTable *DstTable) DescribeTable(ctx context.Context) (*options.Description, error) {
var desc options.Description
// used to hang; now it fails reporting that the table cannot be initialized
err := dstTable.client.Do(ctx, func(ctx context.Context, s table.Session) error {
var err error
desc, err = s.DescribeTable(ctx, dstTable.tablePath)
1 change: 0 additions & 1 deletion internal/hb_tracker/hb_tracker.go
@@ -85,7 +85,6 @@ func (ht *HeartBeatTracker) AddHb(data types.HbData) error {
if ok {
if types.NewPosition(hb).LessThan(*types.NewPosition(data)) {
// Got new heartbeat for stream - we can commit previous one
// On network loss the error is ignored; aardappel terminates only if an error occurs elsewhere
err := hb.CommitTopic()
if err != nil {
return fmt.Errorf("unable to commit topic during update HB %w, stepId: %d, txId: %d", err,
32 changes: 21 additions & 11 deletions internal/processor/processor.go
@@ -115,6 +115,27 @@ func NewCmdQueueConflictHandler(ctx context.Context, instanceId string, path str
return &handler
}

type DLQueue struct {
Writer *client.TopicWriter
Lock sync.Mutex
}

func NewDlQueue(ctx context.Context, writer *client.TopicWriter) *DLQueue {
var queue DLQueue
queue.Writer = writer
return &queue
}

func (queue *DLQueue) Write(ctx context.Context, msg string) error {
	queue.Lock.Lock()
	defer queue.Lock.Unlock()
	err := queue.Writer.Write(ctx, msg)
	if err != nil {
		xlog.Error(ctx, "Unable to write into dead letter queue", zap.Error(err), zap.String("msg", msg))
	}
	// Best effort: the error is logged but not propagated, so a failed
	// dead-letter write does not stop replication.
	return nil
}
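
The mutex exists because one writer is shared by all per-stream reader goroutines. A self-contained sketch of the same pattern with a stand-in writer (fakeWriter is illustrative; client.TopicWriter is the real dependency):

package main

import (
	"context"
	"fmt"
	"sync"
)

// Stand-in for client.TopicWriter; not safe for concurrent use on its own.
type fakeWriter struct{ out []string }

func (w *fakeWriter) Write(_ context.Context, msg string) error {
	w.out = append(w.out, msg) // would race without external locking
	return nil
}

// Same shape as processor.DLQueue: a writer guarded by a mutex.
type DLQueue struct {
	Writer *fakeWriter
	Lock   sync.Mutex
}

func (q *DLQueue) Write(ctx context.Context, msg string) error {
	q.Lock.Lock()
	defer q.Lock.Unlock()
	return q.Writer.Write(ctx, msg)
}

func main() {
	q := &DLQueue{Writer: &fakeWriter{}}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ { // e.g. one goroutine per stream reader
		wg.Add(1)
		go func(stream int) {
			defer wg.Done()
			_ = q.Write(context.Background(), fmt.Sprintf("unapplied event from stream %d", stream))
		}(i)
	}
	wg.Wait()
	fmt.Println(len(q.Writer.out), "events dead-lettered")
}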

func readWithTimeout(ctx context.Context, reader *client.TopicReader) (*topicreader.Message, error, bool) {
timingCtx, cancel := context.WithTimeout(ctx, time.Second*5)
defer cancel()
@@ -129,7 +150,6 @@ func (this *CmdQueueConflictHandler) Handle(ctx context.Context, streamTopicPath
this.Lock.Lock()
defer this.Lock.Unlock()

// A failure here only happens if something fails elsewhere...
reader, err := this.Topic.StartReader(this.Consumer, this.Path)
if err != nil {
xlog.Fatal(ctx, "Unable to read from specified command topic",
@@ -248,7 +268,6 @@ func selectReplicationState(ctx context.Context, client *client.TableClient, sta
var lastMsg *string
var stage *string

// Used to hang; now it fails immediately with an error that the processor could not be created due to a timeout
err := client.DoTx(ctx,
func(ctx context.Context, tx table.TransactionActor) error {
res, err := tx.Execute(ctx, stateQuery, param)
@@ -348,7 +367,6 @@ func (processor *Processor) EnqueueHb(ctx context.Context, hb types.HbData) {
zap.Int64("partition_id:", hb.StreamId.PartitionId),
zap.Bool("willSkip", types.NewPosition(hb).LessThan(lastPosition)))
if types.NewPosition(hb).LessThan(lastPosition) {
// The error goes unnoticed unless something fails elsewhere
err := hb.CommitTopic()
if err != nil {
xlog.Fatal(ctx, "unable to commit topic", zap.Error(err))
@@ -385,7 +403,6 @@ func (processor *Processor) SaveReplicationState(ctx context.Context, status str
stopQuery := fmt.Sprintf("UPSERT INTO %v (id, state, last_msg) VALUES ($instanceId,$state,$lastError)",
processor.stateTablePath)

// Used to hang; not anymore — xlog.Fatal follows, so aardappel should terminate cleanly
return processor.dstServerClient.DoTx(ctx,
func(ctx context.Context, tx table.TransactionActor) error {
_, err := tx.Execute(ctx, stopQuery, param)
@@ -401,7 +418,6 @@ func (processor *Processor) EnqueueTx(ctx context.Context, tx types.TxData) {
zap.Uint32("reader_id", tx.TableId),
zap.Bool("willSkip", (types.Position{tx.Step, tx.TxId}.LessThan(lastPosition))))
if (types.Position{tx.Step, tx.TxId}.LessThan(lastPosition)) {
// The error goes unnoticed unless something terminates elsewhere...
err := tx.CommitTopic()
if err != nil {
xlog.Fatal(ctx, "unable to commit topic", zap.Error(err))
@@ -653,7 +669,6 @@ func (processor *Processor) DoInitialScan(ctx context.Context, dstTables []*dst_
}

for i := 0; i < len(txs); i++ {
// The error goes unnoticed unless it is detected elsewhere
err := txs[i].CommitTopic()
if err != nil {
return nil, fmt.Errorf("%w; Unable to commit topic fot dataTx", err)
@@ -664,7 +679,6 @@ func (processor *Processor) DoInitialScan(ctx context.Context, dstTables []*dst_
xlog.Debug(ctx, "commit hb in topic",
zap.Uint64("step", processor.initialScanPos.Step),
zap.Uint64("tx_id", processor.initialScanPos.TxId))
// The error goes unnoticed unless something stops elsewhere
err := processor.initialScanPos.CommitTopic()
if err != nil {
return nil, fmt.Errorf("%w; Unable to commit topic fot hb", err)
@@ -711,15 +725,13 @@ func (processor *Processor) PushAsSingleTx(ctx context.Context, data dst_table.P
)
param := append(data.Parameters, *stateParam...)

// Terminates via the timeout
ctxWithTimeout, cancel := context.WithTimeout(ctx, client.DEFAULT_TIMEOUT)
defer cancel()
_, err := tx.Execute(ctxWithTimeout, data.Query+processor.stateStoreQuery, &param, options.WithCommit())
return client.HandleRequestError(ctx, err)
}

func (processor *Processor) PushTxs(ctx context.Context, data dst_table.PushQuery, tx table.Transaction) error {
// Terminates via the timeout
ctxWithTimeout, cancel := context.WithTimeout(ctx, client.DEFAULT_TIMEOUT)
defer cancel()
_, err := tx.Execute(ctxWithTimeout, data.Query, &data.Parameters, options.WithCommit())
@@ -757,7 +769,6 @@ func (processor *Processor) DoReplication(ctx context.Context, dstTables []*dst_
processor.lastPosition.Store(*types.NewPosition(batch.Hb))

for i := 0; i < len(batch.TxData); i++ {
// The error goes unnoticed unless something fails elsewhere
err := batch.TxData[i].CommitTopic()
if err != nil {
return nil, fmt.Errorf("%w; Unable to commit topic fot dataTx", err)
@@ -767,7 +778,6 @@ func (processor *Processor) DoReplication(ctx context.Context, dstTables []*dst_
xlog.Debug(ctx, "commit hb in topic",
zap.Uint64("step", batch.Hb.Step),
zap.Uint64("tx_id", batch.Hb.TxId))
// The error goes unnoticed unless something fails elsewhere
err = batch.Hb.CommitTopic()
if err != nil {
return nil, fmt.Errorf("%w; Unable to commit topic fot hb", err)
