Skip to content

Commit

Permalink
Command queue config option (#44)
Browse files Browse the repository at this point in the history
* Config option for command queue

(cherry picked from commit ad5d2b4)
  • Loading branch information
dcherednik authored and VPolka committed Nov 18, 2024
1 parent 4a18b76 commit 8d4b519
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 0 deletions.
7 changes: 7 additions & 0 deletions cmd/aardappel/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,12 @@ func doMain(ctx context.Context, config configInit.Config, srcDb *ydb.Driver, ds
xlog.Debug(ctx, "All topics described",
zap.Int("total parts", totalPartitions))

if config.CmdQueue != nil {
xlog.Debug(ctx, "Command queue present in config",
zap.String("path", config.CmdQueue.Path),
zap.String("consumer", config.CmdQueue.Consumer))
}

prc, err := processor.NewProcessor(ctx, totalPartitions, config.StateTable, dstDb.Table(), config.InstanceId)
if err != nil {
xlog.Fatal(ctx, "Unable to create processor", zap.Error(err))
Expand All @@ -133,6 +139,7 @@ func doMain(ctx context.Context, config configInit.Config, srcDb *ydb.Driver, ds
xlog.Info(ctx, "start heartbeat tracker guard timer", zap.Uint32("timeout in seconds", config.MaxExpHbInterval))
prc.StartHbGuard(ctx, config.MaxExpHbInterval, streamDbgInfos)
}

var dstTables []*dst_table.DstTable
for i := 0; i < len(config.Streams); i++ {
reader, err := srcDb.Topic().StartReader(config.Streams[i].Consumer, topicoptions.ReadTopic(config.Streams[i].SrcTopic))
Expand Down
6 changes: 6 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ type MonServer struct {
Listen string `yaml:"listen"`
}

type CmdQueue struct {
Path string `yaml:"path"`
Consumer string `yaml:"consumer"`
}

type Config struct {
SrcConnectionString string `yaml:"src_connection_string"`
SrcClientBalancer bool `yaml:"src_client_balancer"`
Expand All @@ -34,6 +39,7 @@ type Config struct {
MaxExpHbInterval uint32 `yaml:"max_expected_heartbeat_interval"`
LogLevel string `yaml:"log_level"`
MonServer *MonServer `yaml:"mon_server"`
CmdQueue *CmdQueue `yaml:"cmd_queue"`
}

func (config Config) ToString() (string, error) {
Expand Down

0 comments on commit 8d4b519

Please sign in to comment.