From cdcc7cf0a883491d904e09a61f47b9d32d0b730e Mon Sep 17 00:00:00 2001 From: Nick Z <2420177+nickzelei@users.noreply.github.com> Date: Wed, 23 Oct 2024 15:24:23 -0700 Subject: [PATCH 1/4] refactoring CLI to use connection tunnel manager --- cli/internal/cmds/neosync/sync/sync.go | 202 +++++++++++++++--- cli/internal/cmds/neosync/sync/ui.go | 17 +- .../connection-tunnel-manager/manager.go | 0 .../connection-tunnel-manager/manager_test.go | 0 .../mock_ConnectionProvider.go | 0 .../providers/mongo}/mongo-pool-provider.go | 24 +-- .../pool/providers/sql}/sql-pool-provider.go | 22 +- .../providers/sql}/sql-pool-provider_test.go | 6 +- .../providers/mongoprovider/provider.go | 2 +- .../providers/provider.go | 2 +- .../providers/provider_test.go | 2 +- .../providers/sqlprovider/provider.go | 2 +- worker/pkg/benthos/mongodb/output.go | 20 -- worker/pkg/benthos/sql/output_sql_insert.go | 27 --- worker/pkg/benthos/sql/output_sql_update.go | 53 ----- .../datasync/activities/sync/activity.go | 14 +- 16 files changed, 221 insertions(+), 172 deletions(-) rename {worker/internal => internal}/connection-tunnel-manager/manager.go (100%) rename {worker/internal => internal}/connection-tunnel-manager/manager_test.go (100%) rename {worker/internal => internal}/connection-tunnel-manager/mock_ConnectionProvider.go (100%) rename {worker/pkg/workflows/datasync/activities/sync => internal/connection-tunnel-manager/pool/providers/mongo}/mongo-pool-provider.go (70%) rename {worker/pkg/workflows/datasync/activities/sync => internal/connection-tunnel-manager/pool/providers/sql}/sql-pool-provider.go (77%) rename {worker/pkg/workflows/datasync/activities/sync => internal/connection-tunnel-manager/pool/providers/sql}/sql-pool-provider_test.go (72%) rename {worker/internal => internal}/connection-tunnel-manager/providers/mongoprovider/provider.go (92%) rename {worker/internal => internal}/connection-tunnel-manager/providers/provider.go (94%) rename {worker/internal => 
internal}/connection-tunnel-manager/providers/provider_test.go (97%) rename {worker/internal => internal}/connection-tunnel-manager/providers/sqlprovider/provider.go (92%) diff --git a/cli/internal/cmds/neosync/sync/sync.go b/cli/internal/cmds/neosync/sync/sync.go index cf2c8886cd..2c5ce1494f 100644 --- a/cli/internal/cmds/neosync/sync/sync.go +++ b/cli/internal/cmds/neosync/sync/sync.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "log/slog" "os" "slices" "strings" @@ -12,6 +13,7 @@ import ( "connectrpc.com/connect" charmlog "github.com/charmbracelet/log" + "github.com/google/uuid" mysql_queries "github.com/nucleuscloud/neosync/backend/gen/go/db/dbschemas/mysql" pg_queries "github.com/nucleuscloud/neosync/backend/gen/go/db/dbschemas/postgresql" mgmtv1alpha1 "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1" @@ -31,13 +33,19 @@ import ( "github.com/nucleuscloud/neosync/cli/internal/serverconfig" "github.com/nucleuscloud/neosync/cli/internal/userconfig" "github.com/nucleuscloud/neosync/cli/internal/version" - neosyncbenthos_dynamodb "github.com/nucleuscloud/neosync/worker/pkg/benthos/dynamodb" + connectiontunnelmanager "github.com/nucleuscloud/neosync/internal/connection-tunnel-manager" + pool_sql_provider "github.com/nucleuscloud/neosync/internal/connection-tunnel-manager/pool/providers/sql" + "github.com/nucleuscloud/neosync/internal/connection-tunnel-manager/providers" + "github.com/nucleuscloud/neosync/internal/connection-tunnel-manager/providers/mongoprovider" + "github.com/nucleuscloud/neosync/internal/connection-tunnel-manager/providers/sqlprovider" "github.com/spf13/cobra" "golang.org/x/sync/errgroup" "gopkg.in/yaml.v2" _ "github.com/nucleuscloud/neosync/cli/internal/benthos/inputs" + benthos_environment "github.com/nucleuscloud/neosync/worker/pkg/benthos/environment" _ "github.com/nucleuscloud/neosync/worker/pkg/benthos/sql" + "github.com/warpstreamlabs/bento/public/bloblang" _ 
"github.com/warpstreamlabs/bento/public/components/aws" _ "github.com/warpstreamlabs/bento/public/components/io" _ "github.com/warpstreamlabs/bento/public/components/pure" @@ -54,6 +62,7 @@ type DriverType string const ( postgresDriver DriverType = "postgres" mysqlDriver DriverType = "mysql" + mssqlDriver DriverType = "mssql" awsS3Connection ConnectionType = "awsS3" gcpCloudStorageConnection ConnectionType = "gcpCloudStorage" @@ -330,24 +339,24 @@ func sync( sqlConnector := &sqlconnect.SqlOpenConnector{} sqlmanagerclient := sqlmanager.NewSqlManager(pgpoolmap, pgquerier, mysqlpoolmap, mysqlquerier, mssqlpoolmap, mssqlquerier, sqlConnector) - logger.Debug("Retrieving neosync connection") + logger.Debug("Retrieving neosync source connection") connResp, err := connectionclient.GetConnection(ctx, connect.NewRequest(&mgmtv1alpha1.GetConnectionRequest{ Id: cmd.Source.ConnectionId, })) if err != nil { return err } - connection := connResp.Msg.GetConnection() - connectionType, err := getConnectionType(connection) + sourceConnection := connResp.Msg.GetConnection() + sourceConnectionType, err := getConnectionType(sourceConnection) if err != nil { return err } - logger.Debug(fmt.Sprintf("Source connection type: %s", connectionType)) + logger.Debug(fmt.Sprintf("Source connection type: %s", sourceConnectionType)) - if connectionType == awsS3Connection && (cmd.Source.ConnectionOpts.JobId == nil || *cmd.Source.ConnectionOpts.JobId == "") && (cmd.Source.ConnectionOpts.JobRunId == nil || *cmd.Source.ConnectionOpts.JobRunId == "") { + if sourceConnectionType == awsS3Connection && (cmd.Source.ConnectionOpts.JobId == nil || *cmd.Source.ConnectionOpts.JobId == "") && (cmd.Source.ConnectionOpts.JobRunId == nil || *cmd.Source.ConnectionOpts.JobRunId == "") { return errors.New("S3 source connection type requires job-id or job-run-id.") } - if connectionType == gcpCloudStorageConnection && (cmd.Source.ConnectionOpts.JobId == nil || *cmd.Source.ConnectionOpts.JobId == "") && 
(cmd.Source.ConnectionOpts.JobRunId == nil || *cmd.Source.ConnectionOpts.JobRunId == "") { + if sourceConnectionType == gcpCloudStorageConnection && (cmd.Source.ConnectionOpts.JobId == nil || *cmd.Source.ConnectionOpts.JobId == "") && (cmd.Source.ConnectionOpts.JobRunId == nil || *cmd.Source.ConnectionOpts.JobRunId == "") { return errors.New("GCP Cloud Storage source connection type requires job-id or job-run-id") } @@ -355,7 +364,7 @@ func sync( return fmt.Errorf("truncate cascade is only supported in postgres") } - if connectionType == mysqlConnection || connectionType == postgresConnection { + if sourceConnectionType == mysqlConnection || sourceConnectionType == postgresConnection { if cmd.Destination.Driver == "" { return fmt.Errorf("must provide destination-driver") } @@ -368,7 +377,7 @@ func sync( } } - if connectionType == awsDynamoDBConnection { + if sourceConnectionType == awsDynamoDBConnection { if cmd.AwsDynamoDbDestination == nil { return fmt.Errorf("must provide destination aws credentials") } @@ -408,21 +417,74 @@ func sync( return errors.New("Account Id not found. Please use account switch command to set account.") } - if connection.AccountId != *accountId { + if sourceConnection.AccountId != *accountId { return fmt.Errorf("Connection not found. 
AccountId: %s", *accountId) } } } logger.Debug("Checking if source and destination are compatible") - err = areSourceAndDestCompatible(connection, cmd.Destination.Driver) + err = areSourceAndDestCompatible(sourceConnection, cmd.Destination.Driver) + if err != nil { + return err + } + + connectionprovider := providers.NewProvider( + mongoprovider.NewProvider(), + sqlprovider.NewProvider(sqlConnector), + ) + tunnelmanager := connectiontunnelmanager.NewConnectionTunnelManager(connectionprovider) + session := uuid.NewString() + // might not need this in cli context + defer func() { + tunnelmanager.ReleaseSession(session) + }() + + destConnection := cmdConfigToDestinationConnection(cmd) + dsnToConnIdMap := &syncmap.Map{} + var sqlDsn string + if cmd.Destination != nil { + sqlDsn = cmd.Destination.ConnectionUrl + } + dsnToConnIdMap.Store(sqlDsn, destConnection.Id) + stopChan := make(chan error, 3) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + go func() { + for { + select { + case <-ctx.Done(): + return + case <-stopChan: + cancel() + return + } + } + }() + benv, err := benthos_environment.NewEnvironment( + slog.Default(), + benthos_environment.WithSqlConfig(&benthos_environment.SqlConfig{ + Provider: pool_sql_provider.NewProvider(pool_sql_provider.GetSqlPoolProviderGetter( + tunnelmanager, + dsnToConnIdMap, + map[string]*mgmtv1alpha1.Connection{ + destConnection.Id: destConnection, + }, + session, + slog.Default(), + )), + IsRetry: false, + }), + benthos_environment.WithStopChannel(stopChan), + benthos_environment.WithBlobEnv(bloblang.NewEnvironment()), + ) if err != nil { return err } logger.Info("Retrieving connection schema...") var schemaConfig *schemaConfig - switch connectionType { + switch sourceConnectionType { case awsS3Connection: var cfg *mgmtv1alpha1.AwsS3SchemaConfig if cmd.Source.ConnectionOpts.JobRunId != nil && *cmd.Source.ConnectionOpts.JobRunId != "" { @@ -436,7 +498,7 @@ func sync( }, } - schemaCfg, err := getDestinationSchemaConfig(ctx, 
connectiondataclient, sqlmanagerclient, connection, cmd, s3Config, logger) + schemaCfg, err := getDestinationSchemaConfig(ctx, connectiondataclient, sqlmanagerclient, sourceConnection, cmd, s3Config, logger) if err != nil { return err } @@ -459,7 +521,7 @@ func sync( }, } - schemaCfg, err := getDestinationSchemaConfig(ctx, connectiondataclient, sqlmanagerclient, connection, cmd, gcpConfig, logger) + schemaCfg, err := getDestinationSchemaConfig(ctx, connectiondataclient, sqlmanagerclient, sourceConnection, cmd, gcpConfig, logger) if err != nil { return err } @@ -475,7 +537,7 @@ func sync( MysqlConfig: &mgmtv1alpha1.MysqlSchemaConfig{}, }, } - schemaCfg, err := getConnectionSchemaConfig(ctx, logger, connectiondataclient, connection, cmd, mysqlCfg) + schemaCfg, err := getConnectionSchemaConfig(ctx, logger, connectiondataclient, sourceConnection, cmd, mysqlCfg) if err != nil { return err } @@ -491,7 +553,7 @@ func sync( PgConfig: &mgmtv1alpha1.PostgresSchemaConfig{}, }, } - schemaCfg, err := getConnectionSchemaConfig(ctx, logger, connectiondataclient, connection, cmd, postgresConfig) + schemaCfg, err := getConnectionSchemaConfig(ctx, logger, connectiondataclient, sourceConnection, cmd, postgresConfig) if err != nil { return err } @@ -506,7 +568,7 @@ func sync( DynamodbConfig: &mgmtv1alpha1.DynamoDBSchemaConfig{}, }, } - schemaCfg, err := getConnectionSchemaConfig(ctx, logger, connectiondataclient, connection, cmd, dynamoConfig) + schemaCfg, err := getConnectionSchemaConfig(ctx, logger, connectiondataclient, sourceConnection, cmd, dynamoConfig) if err != nil { return err } @@ -524,7 +586,7 @@ func sync( configs = append(configs, benthosConfig) } - return runSync(ctx, outputType, [][]*benthosConfigResponse{configs}, logger) + return runSync(ctx, outputType, benv, [][]*benthosConfigResponse{configs}, logger) default: return fmt.Errorf("this connection type is not currently supported") } @@ -544,7 +606,7 @@ func sync( logger.Infof("Generating %d sync configs...", 
syncConfigCount) configs := []*benthosConfigResponse{} for _, cfg := range syncConfigs { - benthosConfig := generateBenthosConfig(cmd, connectionType, serverconfig.GetApiBaseUrl(), cfg, token) + benthosConfig := generateBenthosConfig(cmd, sourceConnectionType, serverconfig.GetApiBaseUrl(), cfg, token) configs = append(configs, benthosConfig) } @@ -554,7 +616,7 @@ func sync( return nil } - return runSync(ctx, outputType, groupedConfigs, logger) + return runSync(ctx, outputType, benv, groupedConfigs, logger) } func areSourceAndDestCompatible(connection *mgmtv1alpha1.Connection, destinationDriver DriverType) error { @@ -574,32 +636,33 @@ func areSourceAndDestCompatible(connection *mgmtv1alpha1.Connection, destination return nil } -func syncData(ctx context.Context, cfg *benthosConfigResponse, logger *charmlog.Logger, outputType output.OutputType) error { +var ( + // Hack that locks the instanced bento stream builder build step that causes data races if done in parallel + streamBuilderMu syncmap.Mutex +) + +func syncData(ctx context.Context, benv *service.Environment, cfg *benthosConfigResponse, logger *charmlog.Logger, outputType output.OutputType) error { configbits, err := yaml.Marshal(cfg.Config) if err != nil { return err } - env := service.NewEnvironment() - - err = neosyncbenthos_dynamodb.RegisterDynamoDbOutput(env) - if err != nil { - return fmt.Errorf("unable to register dynamodb output to benthos instance: %w", err) - } - + benthosStreamMutex := syncmap.Mutex{} var benthosStream *service.Stream go func() { for { //nolint select { case <-ctx.Done(): + benthosStreamMutex.Lock() if benthosStream != nil { // this must be here because stream.Run(ctx) doesn't seem to fully obey a canceled context when // a sink is in an error state. We want to explicitly call stop here because the workflow has been canceled. 
- err := benthosStream.Stop(ctx) + err := benthosStream.StopWithin(1 * time.Millisecond) if err != nil { logger.Error(err.Error()) } } + benthosStreamMutex.Unlock() return } } @@ -610,7 +673,8 @@ func syncData(ctx context.Context, cfg *benthosConfigResponse, logger *charmlog. if len(split) != 0 { runType = split[len(split)-1] } - streambldr := env.NewStreamBuilder() + streamBuilderMu.Lock() + streambldr := benv.NewStreamBuilder() if outputType == output.PlainOutput { streambldr.SetPrintLogger(logger.With("benthos", "true", "table", cfg.Table, "runType", runType).StandardLog()) } @@ -621,19 +685,97 @@ func syncData(ctx context.Context, cfg *benthosConfigResponse, logger *charmlog. } stream, err := streambldr.Build() + streamBuilderMu.Unlock() if err != nil { return err } + benthosStreamMutex.Lock() benthosStream = stream + benthosStreamMutex.Unlock() err = stream.Run(ctx) if err != nil { return fmt.Errorf("unable to run benthos stream: %w", err) } + benthosStreamMutex.Lock() benthosStream = nil + benthosStreamMutex.Unlock() return nil } +func cmdConfigToDestinationConnection(cmd *cmdConfig) *mgmtv1alpha1.Connection { + destId := uuid.NewString() + if cmd.Destination != nil { + switch cmd.Destination.Driver { + case postgresDriver: + return &mgmtv1alpha1.Connection{ + Id: destId, + Name: destId, + ConnectionConfig: &mgmtv1alpha1.ConnectionConfig{ + Config: &mgmtv1alpha1.ConnectionConfig_PgConfig{ + PgConfig: &mgmtv1alpha1.PostgresConnectionConfig{ + ConnectionConfig: &mgmtv1alpha1.PostgresConnectionConfig_Url{ + Url: cmd.Destination.ConnectionUrl, + }, + }, + }, + }, + } + case mysqlDriver: + return &mgmtv1alpha1.Connection{ + Id: destId, + Name: destId, + ConnectionConfig: &mgmtv1alpha1.ConnectionConfig{ + Config: &mgmtv1alpha1.ConnectionConfig_MysqlConfig{ + MysqlConfig: &mgmtv1alpha1.MysqlConnectionConfig{ + ConnectionConfig: &mgmtv1alpha1.MysqlConnectionConfig_Url{ + Url: cmd.Destination.ConnectionUrl, + }, + }, + }, + }, + } + case mssqlDriver: + return 
&mgmtv1alpha1.Connection{ + Id: destId, + Name: destId, + ConnectionConfig: &mgmtv1alpha1.ConnectionConfig{ + Config: &mgmtv1alpha1.ConnectionConfig_MssqlConfig{ + MssqlConfig: &mgmtv1alpha1.MssqlConnectionConfig{ + ConnectionConfig: &mgmtv1alpha1.MssqlConnectionConfig_Url{ + Url: cmd.Destination.ConnectionUrl, + }, + }, + }, + }, + } + } + } else if cmd.AwsDynamoDbDestination != nil { + creds := &mgmtv1alpha1.AwsS3Credentials{} + if cmd.AwsDynamoDbDestination.AwsCredConfig != nil { + cfg := cmd.AwsDynamoDbDestination.AwsCredConfig + creds.Profile = cfg.Profile + creds.AccessKeyId = cfg.AccessKeyID + creds.SecretAccessKey = cfg.SecretAccessKey + creds.SessionToken = cfg.SessionToken + creds.RoleArn = cfg.RoleARN + creds.RoleExternalId = cfg.RoleExternalID + } + return &mgmtv1alpha1.Connection{ + Id: destId, + Name: destId, + ConnectionConfig: &mgmtv1alpha1.ConnectionConfig{ + Config: &mgmtv1alpha1.ConnectionConfig_DynamodbConfig{ + DynamodbConfig: &mgmtv1alpha1.DynamoDBConnectionConfig{ + Credentials: creds, + }, + }, + }, + } + } + return &mgmtv1alpha1.Connection{} +} + func runDestinationInitStatements( ctx context.Context, logger *charmlog.Logger, diff --git a/cli/internal/cmds/neosync/sync/ui.go b/cli/internal/cmds/neosync/sync/ui.go index cfcbc52555..8f2cab68e3 100644 --- a/cli/internal/cmds/neosync/sync/ui.go +++ b/cli/internal/cmds/neosync/sync/ui.go @@ -18,6 +18,7 @@ import ( _ "github.com/warpstreamlabs/bento/public/components/io" _ "github.com/warpstreamlabs/bento/public/components/pure" _ "github.com/warpstreamlabs/bento/public/components/pure/extended" + "github.com/warpstreamlabs/bento/public/service" "github.com/charmbracelet/bubbles/spinner" tea "github.com/charmbracelet/bubbletea" @@ -28,6 +29,7 @@ import ( type model struct { ctx context.Context logger *charmlog.Logger + benv *service.Environment groupedConfigs [][]*benthosConfigResponse tableSynced int index int @@ -50,7 +52,7 @@ var ( durationStyle = dotStyle ) -func newModel(ctx context.Context, 
groupedConfigs [][]*benthosConfigResponse, logger *charmlog.Logger, outputType output.OutputType) *model { +func newModel(ctx context.Context, benv *service.Environment, groupedConfigs [][]*benthosConfigResponse, logger *charmlog.Logger, outputType output.OutputType) *model { s := spinner.New() s.Style = lipgloss.NewStyle().Foreground(lipgloss.Color("63")) return &model{ @@ -61,11 +63,12 @@ func newModel(ctx context.Context, groupedConfigs [][]*benthosConfigResponse, lo totalConfigCount: getConfigCount(groupedConfigs), logger: logger, outputType: outputType, + benv: benv, } } func (m *model) Init() tea.Cmd { - return tea.Batch(m.syncConfigs(m.ctx, m.groupedConfigs[m.index]), m.spinner.Tick) + return tea.Batch(m.syncConfigs(m.ctx, m.benv, m.groupedConfigs[m.index]), m.spinner.Tick) } func (m *model) Update(msg tea.Msg) (tea.Model, tea.Cmd) { @@ -95,7 +98,7 @@ func (m *model) Update(msg tea.Msg) (tea.Model, tea.Cmd) { m.index++ return m, tea.Batch( tea.Println(strings.Join(successStrs, " \n")), - m.syncConfigs(m.ctx, m.groupedConfigs[m.index]), + m.syncConfigs(m.ctx, m.benv, m.groupedConfigs[m.index]), ) case spinner.TickMsg: var cmd tea.Cmd @@ -135,7 +138,7 @@ func (m *model) View() string { type syncedDataMsg map[string]string -func (m *model) syncConfigs(ctx context.Context, configs []*benthosConfigResponse) tea.Cmd { +func (m *model) syncConfigs(ctx context.Context, benv *service.Environment, configs []*benthosConfigResponse) tea.Cmd { return func() tea.Msg { messageMap := syncmap.Map{} errgrp, errctx := errgroup.WithContext(ctx) @@ -145,7 +148,7 @@ func (m *model) syncConfigs(ctx context.Context, configs []*benthosConfigRespons errgrp.Go(func() error { start := time.Now() m.logger.Infof("Syncing table %s", cfg.Name) - err := syncData(errctx, cfg, m.logger, m.outputType) + err := syncData(errctx, benv, cfg, m.logger, m.outputType) if err != nil { fmt.Printf("Error syncing table: %s", err.Error()) //nolint:forbidigo return err @@ -190,7 +193,7 @@ func 
getConfigCount(groupedConfigs [][]*benthosConfigResponse) int { return count } -func runSync(ctx context.Context, outputType output.OutputType, groupedConfigs [][]*benthosConfigResponse, logger *charmlog.Logger) error { +func runSync(ctx context.Context, outputType output.OutputType, benv *service.Environment, groupedConfigs [][]*benthosConfigResponse, logger *charmlog.Logger) error { var opts []tea.ProgramOption if outputType == output.PlainOutput { // Plain mode don't render the TUI @@ -200,7 +203,7 @@ func runSync(ctx context.Context, outputType output.OutputType, groupedConfigs [ // TUI mode, discard log output logger.SetOutput(io.Discard) } - if _, err := tea.NewProgram(newModel(ctx, groupedConfigs, logger, outputType), opts...).Run(); err != nil { + if _, err := tea.NewProgram(newModel(ctx, benv, groupedConfigs, logger, outputType), opts...).Run(); err != nil { logger.Error("Error syncing data:", err) os.Exit(1) } diff --git a/worker/internal/connection-tunnel-manager/manager.go b/internal/connection-tunnel-manager/manager.go similarity index 100% rename from worker/internal/connection-tunnel-manager/manager.go rename to internal/connection-tunnel-manager/manager.go diff --git a/worker/internal/connection-tunnel-manager/manager_test.go b/internal/connection-tunnel-manager/manager_test.go similarity index 100% rename from worker/internal/connection-tunnel-manager/manager_test.go rename to internal/connection-tunnel-manager/manager_test.go diff --git a/worker/internal/connection-tunnel-manager/mock_ConnectionProvider.go b/internal/connection-tunnel-manager/mock_ConnectionProvider.go similarity index 100% rename from worker/internal/connection-tunnel-manager/mock_ConnectionProvider.go rename to internal/connection-tunnel-manager/mock_ConnectionProvider.go diff --git a/worker/pkg/workflows/datasync/activities/sync/mongo-pool-provider.go b/internal/connection-tunnel-manager/pool/providers/mongo/mongo-pool-provider.go similarity index 70% rename from 
worker/pkg/workflows/datasync/activities/sync/mongo-pool-provider.go rename to internal/connection-tunnel-manager/pool/providers/mongo/mongo-pool-provider.go index e5f24981fe..75797a3608 100644 --- a/worker/pkg/workflows/datasync/activities/sync/mongo-pool-provider.go +++ b/internal/connection-tunnel-manager/pool/providers/mongo/mongo-pool-provider.go @@ -1,4 +1,4 @@ -package sync_activity +package pool_mongo_provider import ( "errors" @@ -7,36 +7,36 @@ import ( "sync" mgmtv1alpha1 "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1" - connectiontunnelmanager "github.com/nucleuscloud/neosync/worker/internal/connection-tunnel-manager" + connectiontunnelmanager "github.com/nucleuscloud/neosync/internal/connection-tunnel-manager" neosync_benthos_mongodb "github.com/nucleuscloud/neosync/worker/pkg/benthos/mongodb" ) -type mongoConnectionGetter = func(url string) (neosync_benthos_mongodb.MongoClient, error) +type Getter = func(url string) (neosync_benthos_mongodb.MongoClient, error) -// wrapper used for benthos sql-based connections to retrieve the connection they need -type mongoPoolPovider struct { - getter mongoConnectionGetter +// wrapper used for benthos mongo-based connections to retrieve the connection they need +type Provider struct { + getter Getter } -var _ neosync_benthos_mongodb.MongoPoolProvider = &mongoPoolPovider{} +var _ neosync_benthos_mongodb.MongoPoolProvider = (*Provider)(nil) -func newMongoPoolProvider(getter mongoConnectionGetter) *mongoPoolPovider { - return &mongoPoolPovider{getter: getter} +func NewProvider(getter Getter) *Provider { + return &Provider{getter: getter} } -func (p *mongoPoolPovider) GetClient(url string) (neosync_benthos_mongodb.MongoClient, error) { +func (p *Provider) GetClient(url string) (neosync_benthos_mongodb.MongoClient, error) { return p.getter(url) } // Returns a function that converts a raw DSN directly to the relevant pooled sql client. 
// Allows sharing connections across activities for effective pooling and SSH tunnel management. -func getMongoPoolProviderGetter( +func GetMongoPoolProviderGetter( tunnelmanager connectiontunnelmanager.Interface[any], dsnToConnectionIdMap *sync.Map, connectionMap map[string]*mgmtv1alpha1.Connection, session string, slogger *slog.Logger, -) mongoConnectionGetter { +) Getter { return func(url string) (neosync_benthos_mongodb.MongoClient, error) { connid, ok := dsnToConnectionIdMap.Load(url) if !ok { diff --git a/worker/pkg/workflows/datasync/activities/sync/sql-pool-provider.go b/internal/connection-tunnel-manager/pool/providers/sql/sql-pool-provider.go similarity index 77% rename from worker/pkg/workflows/datasync/activities/sync/sql-pool-provider.go rename to internal/connection-tunnel-manager/pool/providers/sql/sql-pool-provider.go index 3b9c9a4a7f..da87cac4e1 100644 --- a/worker/pkg/workflows/datasync/activities/sync/sql-pool-provider.go +++ b/internal/connection-tunnel-manager/pool/providers/sql/sql-pool-provider.go @@ -1,4 +1,4 @@ -package sync_activity +package pool_sql_provider import ( "errors" @@ -7,34 +7,36 @@ import ( "sync" mgmtv1alpha1 "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1" - connectiontunnelmanager "github.com/nucleuscloud/neosync/worker/internal/connection-tunnel-manager" + connectiontunnelmanager "github.com/nucleuscloud/neosync/internal/connection-tunnel-manager" neosync_benthos_sql "github.com/nucleuscloud/neosync/worker/pkg/benthos/sql" ) -type sqlConnectionGetter = func(dsn string) (neosync_benthos_sql.SqlDbtx, error) +type Getter = func(dsn string) (neosync_benthos_sql.SqlDbtx, error) // wrapper used for benthos sql-based connections to retrieve the connection they need -type sqlPoolProvider struct { - getter sqlConnectionGetter +type Provider struct { + getter Getter } -func newSqlPoolProvider(getter sqlConnectionGetter) *sqlPoolProvider { - return &sqlPoolProvider{getter: getter} +var _ 
neosync_benthos_sql.DbPoolProvider = (*Provider)(nil) + +func NewProvider(getter Getter) *Provider { + return &Provider{getter: getter} } -func (p *sqlPoolProvider) GetDb(driver, dsn string) (neosync_benthos_sql.SqlDbtx, error) { +func (p *Provider) GetDb(driver, dsn string) (neosync_benthos_sql.SqlDbtx, error) { return p.getter(dsn) } // Returns a function that converts a raw DSN directly to the relevant pooled sql client. // Allows sharing connections across activities for effective pooling and SSH tunnel management. -func getSqlPoolProviderGetter( +func GetSqlPoolProviderGetter( tunnelmanager connectiontunnelmanager.Interface[any], dsnToConnectionIdMap *sync.Map, connectionMap map[string]*mgmtv1alpha1.Connection, session string, slogger *slog.Logger, -) sqlConnectionGetter { +) Getter { return func(dsn string) (neosync_benthos_sql.SqlDbtx, error) { connid, ok := dsnToConnectionIdMap.Load(dsn) if !ok { diff --git a/worker/pkg/workflows/datasync/activities/sync/sql-pool-provider_test.go b/internal/connection-tunnel-manager/pool/providers/sql/sql-pool-provider_test.go similarity index 72% rename from worker/pkg/workflows/datasync/activities/sync/sql-pool-provider_test.go rename to internal/connection-tunnel-manager/pool/providers/sql/sql-pool-provider_test.go index fe67fa682a..5ebbccc322 100644 --- a/worker/pkg/workflows/datasync/activities/sync/sql-pool-provider_test.go +++ b/internal/connection-tunnel-manager/pool/providers/sql/sql-pool-provider_test.go @@ -1,4 +1,4 @@ -package sync_activity +package pool_sql_provider import ( "testing" @@ -8,11 +8,11 @@ import ( ) func Test_newPoolProvider(t *testing.T) { - assert.NotNil(t, newSqlPoolProvider(nil)) + assert.NotNil(t, NewProvider(nil)) } func Test_newPoolProvider_GetDb(t *testing.T) { - provider := newSqlPoolProvider(func(dsn string) (neosync_benthos_sql.SqlDbtx, error) { + provider := NewProvider(func(dsn string) (neosync_benthos_sql.SqlDbtx, error) { return neosync_benthos_sql.NewMockSqlDbtx(t), nil }) 
assert.NotNil(t, provider) diff --git a/worker/internal/connection-tunnel-manager/providers/mongoprovider/provider.go b/internal/connection-tunnel-manager/providers/mongoprovider/provider.go similarity index 92% rename from worker/internal/connection-tunnel-manager/providers/mongoprovider/provider.go rename to internal/connection-tunnel-manager/providers/mongoprovider/provider.go index 162ad41b91..3ccf790af0 100644 --- a/worker/internal/connection-tunnel-manager/providers/mongoprovider/provider.go +++ b/internal/connection-tunnel-manager/providers/mongoprovider/provider.go @@ -5,7 +5,7 @@ import ( "errors" mgmtv1alpha1 "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1" - connectiontunnelmanager "github.com/nucleuscloud/neosync/worker/internal/connection-tunnel-manager" + connectiontunnelmanager "github.com/nucleuscloud/neosync/internal/connection-tunnel-manager" neosync_benthos_mongodb "github.com/nucleuscloud/neosync/worker/pkg/benthos/mongodb" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" diff --git a/worker/internal/connection-tunnel-manager/providers/provider.go b/internal/connection-tunnel-manager/providers/provider.go similarity index 94% rename from worker/internal/connection-tunnel-manager/providers/provider.go rename to internal/connection-tunnel-manager/providers/provider.go index e8c647e39a..3438fb0c62 100644 --- a/worker/internal/connection-tunnel-manager/providers/provider.go +++ b/internal/connection-tunnel-manager/providers/provider.go @@ -4,7 +4,7 @@ import ( "fmt" mgmtv1alpha1 "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1" - connectiontunnelmanager "github.com/nucleuscloud/neosync/worker/internal/connection-tunnel-manager" + connectiontunnelmanager "github.com/nucleuscloud/neosync/internal/connection-tunnel-manager" neosync_benthos_mongodb "github.com/nucleuscloud/neosync/worker/pkg/benthos/mongodb" neosync_benthos_sql 
"github.com/nucleuscloud/neosync/worker/pkg/benthos/sql" ) diff --git a/worker/internal/connection-tunnel-manager/providers/provider_test.go b/internal/connection-tunnel-manager/providers/provider_test.go similarity index 97% rename from worker/internal/connection-tunnel-manager/providers/provider_test.go rename to internal/connection-tunnel-manager/providers/provider_test.go index cc8acf6489..d0e5aa1f5d 100644 --- a/worker/internal/connection-tunnel-manager/providers/provider_test.go +++ b/internal/connection-tunnel-manager/providers/provider_test.go @@ -4,7 +4,7 @@ import ( "testing" mgmtv1alpha1 "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1" - connectiontunnelmanager "github.com/nucleuscloud/neosync/worker/internal/connection-tunnel-manager" + connectiontunnelmanager "github.com/nucleuscloud/neosync/internal/connection-tunnel-manager" neosync_benthos_mongodb "github.com/nucleuscloud/neosync/worker/pkg/benthos/mongodb" neosync_benthos_sql "github.com/nucleuscloud/neosync/worker/pkg/benthos/sql" "github.com/stretchr/testify/mock" diff --git a/worker/internal/connection-tunnel-manager/providers/sqlprovider/provider.go b/internal/connection-tunnel-manager/providers/sqlprovider/provider.go similarity index 92% rename from worker/internal/connection-tunnel-manager/providers/sqlprovider/provider.go rename to internal/connection-tunnel-manager/providers/sqlprovider/provider.go index f2b6d26f1f..f284ddc9eb 100644 --- a/worker/internal/connection-tunnel-manager/providers/sqlprovider/provider.go +++ b/internal/connection-tunnel-manager/providers/sqlprovider/provider.go @@ -5,7 +5,7 @@ import ( mgmtv1alpha1 "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1" "github.com/nucleuscloud/neosync/backend/pkg/sqlconnect" - connectiontunnelmanager "github.com/nucleuscloud/neosync/worker/internal/connection-tunnel-manager" + connectiontunnelmanager "github.com/nucleuscloud/neosync/internal/connection-tunnel-manager" neosync_benthos_sql 
"github.com/nucleuscloud/neosync/worker/pkg/benthos/sql" ) diff --git a/worker/pkg/benthos/mongodb/output.go b/worker/pkg/benthos/mongodb/output.go index b0d6b1dda1..b19600934a 100644 --- a/worker/pkg/benthos/mongodb/output.go +++ b/worker/pkg/benthos/mongodb/output.go @@ -77,26 +77,6 @@ func outputSpec() *service.ConfigSpec { return spec } -// func init() { -// err := service.RegisterBatchOutput( -// "mongodb", outputSpec(), -// func(conf *service.ParsedConfig, mgr *service.Resources) (out service.BatchOutput, batchPol service.BatchPolicy, mif int, err error) { -// if batchPol, err = conf.FieldBatchPolicy(moFieldBatching); err != nil { -// return -// } -// if mif, err = conf.FieldMaxInFlight(); err != nil { -// return -// } -// if out, err = newOutputWriter(conf, mgr); err != nil { -// return -// } -// return -// }) -// if err != nil { -// panic(err) -// } -// } - func RegisterPooledMongoDbOutput(env *service.Environment, clientProvider MongoPoolProvider) error { return env.RegisterBatchOutput( "pooled_mongodb", outputSpec(), diff --git a/worker/pkg/benthos/sql/output_sql_insert.go b/worker/pkg/benthos/sql/output_sql_insert.go index d79ba06c65..2c7a4c23ff 100644 --- a/worker/pkg/benthos/sql/output_sql_insert.go +++ b/worker/pkg/benthos/sql/output_sql_insert.go @@ -5,7 +5,6 @@ import ( "encoding/json" "fmt" "log/slog" - "os" "strings" "sync" @@ -64,32 +63,6 @@ func RegisterPooledSqlInsertOutput(env *service.Environment, dbprovider DbPoolPr ) } -func init() { - dbprovider := NewDbPoolProvider() - err := service.RegisterBatchOutput( - "pooled_sql_insert", sqlInsertOutputSpec(), - func(conf *service.ParsedConfig, mgr *service.Resources) (service.BatchOutput, service.BatchPolicy, int, error) { - batchPolicy, err := conf.FieldBatchPolicy("batching") - if err != nil { - return nil, batchPolicy, -1, err - } - - maxInFlight, err := conf.FieldInt("max_in_flight") - if err != nil { - return nil, service.BatchPolicy{}, -1, err - } - logger := 
slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{})) - out, err := newInsertOutput(conf, mgr, dbprovider, false, logger) - if err != nil { - return nil, service.BatchPolicy{}, -1, err - } - return out, batchPolicy, maxInFlight, nil - }) - if err != nil { - panic(err) - } -} - var _ service.BatchOutput = &pooledInsertOutput{} type pooledInsertOutput struct { diff --git a/worker/pkg/benthos/sql/output_sql_update.go b/worker/pkg/benthos/sql/output_sql_update.go index 849dbcc43c..7444e60aeb 100644 --- a/worker/pkg/benthos/sql/output_sql_update.go +++ b/worker/pkg/benthos/sql/output_sql_update.go @@ -27,34 +27,6 @@ type DbPoolProvider interface { GetDb(driver, dsn string) (SqlDbtx, error) } -type dbPoolProvider struct { - pools map[string]*sql.DB - mu sync.Mutex -} - -func NewDbPoolProvider() *dbPoolProvider { - return &dbPoolProvider{ - pools: make(map[string]*sql.DB), - } -} -func (p *dbPoolProvider) GetDb(driver, dsn string) (SqlDbtx, error) { - p.mu.Lock() - defer p.mu.Unlock() - - if db, exists := p.pools[dsn]; exists { - return db, nil - } - - db, err := sql.Open(driver, dsn) - if err != nil { - return nil, err - } - db.SetMaxOpenConns(10) - - p.pools[dsn] = db - return db, nil -} - func sqlUpdateOutputSpec() *service.ConfigSpec { return service.NewConfigSpec(). Field(service.NewStringField("driver")). 
@@ -92,31 +64,6 @@ func RegisterPooledSqlUpdateOutput(env *service.Environment, dbprovider DbPoolPr ) } -func init() { - dbprovider := NewDbPoolProvider() - err := service.RegisterBatchOutput( - "pooled_sql_update", sqlUpdateOutputSpec(), - func(conf *service.ParsedConfig, mgr *service.Resources) (service.BatchOutput, service.BatchPolicy, int, error) { - batchPolicy, err := conf.FieldBatchPolicy("batching") - if err != nil { - return nil, batchPolicy, -1, err - } - - maxInFlight, err := conf.FieldInt("max_in_flight") - if err != nil { - return nil, service.BatchPolicy{}, -1, err - } - out, err := newUpdateOutput(conf, mgr, dbprovider) - if err != nil { - return nil, service.BatchPolicy{}, -1, err - } - return out, batchPolicy, maxInFlight, nil - }) - if err != nil { - panic(err) - } -} - var _ service.BatchOutput = &pooledUpdateOutput{} type pooledUpdateOutput struct { diff --git a/worker/pkg/workflows/datasync/activities/sync/activity.go b/worker/pkg/workflows/datasync/activities/sync/activity.go index e7fe0bc2c6..e13d1f5542 100644 --- a/worker/pkg/workflows/datasync/activities/sync/activity.go +++ b/worker/pkg/workflows/datasync/activities/sync/activity.go @@ -21,10 +21,12 @@ import ( neosynclogger "github.com/nucleuscloud/neosync/backend/pkg/logger" "github.com/nucleuscloud/neosync/backend/pkg/metrics" "github.com/nucleuscloud/neosync/backend/pkg/sqlconnect" - connectiontunnelmanager "github.com/nucleuscloud/neosync/worker/internal/connection-tunnel-manager" - "github.com/nucleuscloud/neosync/worker/internal/connection-tunnel-manager/providers" - "github.com/nucleuscloud/neosync/worker/internal/connection-tunnel-manager/providers/mongoprovider" - "github.com/nucleuscloud/neosync/worker/internal/connection-tunnel-manager/providers/sqlprovider" + connectiontunnelmanager "github.com/nucleuscloud/neosync/internal/connection-tunnel-manager" + pool_mongo_provider "github.com/nucleuscloud/neosync/internal/connection-tunnel-manager/pool/providers/mongo" + 
pool_sql_provider "github.com/nucleuscloud/neosync/internal/connection-tunnel-manager/pool/providers/sql" + "github.com/nucleuscloud/neosync/internal/connection-tunnel-manager/providers" + "github.com/nucleuscloud/neosync/internal/connection-tunnel-manager/providers/mongoprovider" + "github.com/nucleuscloud/neosync/internal/connection-tunnel-manager/providers/sqlprovider" benthos_environment "github.com/nucleuscloud/neosync/worker/pkg/benthos/environment" _ "github.com/nucleuscloud/neosync/worker/pkg/benthos/redis" _ "github.com/nucleuscloud/neosync/worker/pkg/benthos/transformers" @@ -266,11 +268,11 @@ func (a *Activity) Sync(ctx context.Context, req *SyncRequest, metadata *SyncMet slogger, benthos_environment.WithMeter(a.meter), benthos_environment.WithSqlConfig(&benthos_environment.SqlConfig{ - Provider: newSqlPoolProvider(getSqlPoolProviderGetter(tunnelmanager, &dsnToConnectionIdMap, connectionMap, session, slogger)), + Provider: pool_sql_provider.NewProvider(pool_sql_provider.GetSqlPoolProviderGetter(tunnelmanager, &dsnToConnectionIdMap, connectionMap, session, slogger)), IsRetry: isRetry, }), benthos_environment.WithMongoConfig(&benthos_environment.MongoConfig{ - Provider: newMongoPoolProvider(getMongoPoolProviderGetter(tunnelmanager, &dsnToConnectionIdMap, connectionMap, session, slogger)), + Provider: pool_mongo_provider.NewProvider(pool_mongo_provider.GetMongoPoolProviderGetter(tunnelmanager, &dsnToConnectionIdMap, connectionMap, session, slogger)), }), benthos_environment.WithStopChannel(stopActivityChan), benthos_environment.WithBlobEnv(bloblang.NewEnvironment()), From e9c6a78563a2a51001be2fe05c05cd146b8220fc Mon Sep 17 00:00:00 2001 From: Nick Z <2420177+nickzelei@users.noreply.github.com> Date: Wed, 23 Oct 2024 15:31:41 -0700 Subject: [PATCH 2/4] Adds connection limit --- cli/internal/cmds/neosync/sync/sync.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/cli/internal/cmds/neosync/sync/sync.go b/cli/internal/cmds/neosync/sync/sync.go 
index 2c5ce1494f..112b40f4c2 100644 --- a/cli/internal/cmds/neosync/sync/sync.go +++ b/cli/internal/cmds/neosync/sync/sync.go @@ -45,6 +45,7 @@ import ( _ "github.com/nucleuscloud/neosync/cli/internal/benthos/inputs" benthos_environment "github.com/nucleuscloud/neosync/worker/pkg/benthos/environment" _ "github.com/nucleuscloud/neosync/worker/pkg/benthos/sql" + "github.com/nucleuscloud/neosync/worker/pkg/workflows/datasync/activities/shared" "github.com/warpstreamlabs/bento/public/bloblang" _ "github.com/warpstreamlabs/bento/public/components/aws" _ "github.com/warpstreamlabs/bento/public/components/io" @@ -717,6 +718,9 @@ func cmdConfigToDestinationConnection(cmd *cmdConfig) *mgmtv1alpha1.Connection { ConnectionConfig: &mgmtv1alpha1.PostgresConnectionConfig_Url{ Url: cmd.Destination.ConnectionUrl, }, + ConnectionOptions: &mgmtv1alpha1.SqlConnectionOptions{ + MaxConnectionLimit: shared.Ptr(int32(25)), + }, }, }, }, @@ -731,6 +735,9 @@ func cmdConfigToDestinationConnection(cmd *cmdConfig) *mgmtv1alpha1.Connection { ConnectionConfig: &mgmtv1alpha1.MysqlConnectionConfig_Url{ Url: cmd.Destination.ConnectionUrl, }, + ConnectionOptions: &mgmtv1alpha1.SqlConnectionOptions{ + MaxConnectionLimit: shared.Ptr(int32(25)), + }, }, }, }, @@ -745,6 +752,9 @@ func cmdConfigToDestinationConnection(cmd *cmdConfig) *mgmtv1alpha1.Connection { ConnectionConfig: &mgmtv1alpha1.MssqlConnectionConfig_Url{ Url: cmd.Destination.ConnectionUrl, }, + ConnectionOptions: &mgmtv1alpha1.SqlConnectionOptions{ + MaxConnectionLimit: shared.Ptr(int32(25)), + }, }, }, }, From 5f733acc34d27c5474164ffd239d6ad5c7db22c8 Mon Sep 17 00:00:00 2001 From: Nick Z <2420177+nickzelei@users.noreply.github.com> Date: Wed, 23 Oct 2024 15:33:43 -0700 Subject: [PATCH 3/4] returns error --- cli/internal/cmds/neosync/sync/ui.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cli/internal/cmds/neosync/sync/ui.go b/cli/internal/cmds/neosync/sync/ui.go index 8f2cab68e3..abbcb94bde 100644 ---
a/cli/internal/cmds/neosync/sync/ui.go +++ b/cli/internal/cmds/neosync/sync/ui.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "io" - "os" "strings" syncmap "sync" "time" @@ -205,7 +204,7 @@ func runSync(ctx context.Context, outputType output.OutputType, benv *service.En } if _, err := tea.NewProgram(newModel(ctx, benv, groupedConfigs, logger, outputType), opts...).Run(); err != nil { logger.Error("Error syncing data:", err) - os.Exit(1) + return fmt.Errorf("unable to finish syncing data: %w", err) } return nil } From 59d57159515821ef4287c5b1c791023ad4c97453 Mon Sep 17 00:00:00 2001 From: Nick Z <2420177+nickzelei@users.noreply.github.com> Date: Wed, 23 Oct 2024 16:01:28 -0700 Subject: [PATCH 4/4] updates cli sync to use slog.Logger --- cli/internal/cmds/neosync/sync/sync.go | 38 +++++++++++---------- cli/internal/cmds/neosync/sync/sync_test.go | 13 +++---- cli/internal/cmds/neosync/sync/ui.go | 21 ++++++------ 3 files changed, 35 insertions(+), 37 deletions(-) diff --git a/cli/internal/cmds/neosync/sync/sync.go b/cli/internal/cmds/neosync/sync/sync.go index 112b40f4c2..ba3a474518 100644 --- a/cli/internal/cmds/neosync/sync/sync.go +++ b/cli/internal/cmds/neosync/sync/sync.go @@ -304,10 +304,12 @@ func sync( if cmd.Debug { logLevel = charmlog.DebugLevel } - logger := charmlog.NewWithOptions(os.Stderr, charmlog.Options{ + charmlogger := charmlog.NewWithOptions(os.Stderr, charmlog.Options{ ReportTimestamp: true, Level: logLevel, }) + logger := slog.New(charmlogger) + logger.Info("Starting sync") isAuthEnabled, err := auth.IsAuthEnabled(ctx) if err != nil { @@ -463,7 +465,7 @@ func sync( } }() benv, err := benthos_environment.NewEnvironment( - slog.Default(), + logger, benthos_environment.WithSqlConfig(&benthos_environment.SqlConfig{ Provider: pool_sql_provider.NewProvider(pool_sql_provider.GetSqlPoolProviderGetter( tunnelmanager, @@ -472,7 +474,7 @@ func sync( destConnection.Id: destConnection, }, session, - slog.Default(), + logger, )), IsRetry: false, }), @@ -604,7 
+606,7 @@ func sync( } syncConfigCount := len(syncConfigs) - logger.Infof("Generating %d sync configs...", syncConfigCount) + logger.Info(fmt.Sprintf("Generating %d sync configs...", syncConfigCount)) configs := []*benthosConfigResponse{} for _, cfg := range syncConfigs { benthosConfig := generateBenthosConfig(cmd, sourceConnectionType, serverconfig.GetApiBaseUrl(), cfg, token) @@ -642,7 +644,7 @@ var ( streamBuilderMu syncmap.Mutex ) -func syncData(ctx context.Context, benv *service.Environment, cfg *benthosConfigResponse, logger *charmlog.Logger, outputType output.OutputType) error { +func syncData(ctx context.Context, benv *service.Environment, cfg *benthosConfigResponse, logger *slog.Logger, outputType output.OutputType) error { configbits, err := yaml.Marshal(cfg.Config) if err != nil { return err @@ -677,7 +679,7 @@ func syncData(ctx context.Context, benv *service.Environment, cfg *benthosConfig streamBuilderMu.Lock() streambldr := benv.NewStreamBuilder() if outputType == output.PlainOutput { - streambldr.SetPrintLogger(logger.With("benthos", "true", "table", cfg.Table, "runType", runType).StandardLog()) + streambldr.SetLogger(logger.With("benthos", "true", "table", cfg.Table, "runType", runType)) } err = streambldr.SetYAML(string(configbits)) @@ -788,7 +790,7 @@ func cmdConfigToDestinationConnection(cmd *cmdConfig) *mgmtv1alpha1.Connection { func runDestinationInitStatements( ctx context.Context, - logger *charmlog.Logger, + logger *slog.Logger, sqlmanagerclient sqlmanager.SqlManagerClient, cmd *cmdConfig, syncConfigs []*tabledependency.RunConfig, @@ -803,13 +805,13 @@ func runDestinationInitStatements( if cmd.Destination.InitSchema { if len(schemaConfig.InitSchemaStatements) != 0 { for _, block := range schemaConfig.InitSchemaStatements { - logger.Infof("[%s] found %d statements to execute during schema initialization", block.Label, len(block.Statements)) + logger.Info(fmt.Sprintf("[%s] found %d statements to execute during schema initialization", 
block.Label, len(block.Statements))) if len(block.Statements) == 0 { continue } err = db.Db.BatchExec(ctx, batchSize, block.Statements, &sql_manager.BatchExecOpts{}) if err != nil { - logger.Error("Error creating tables:", err) + logger.Error(fmt.Sprintf("Error creating tables: %v", err)) return fmt.Errorf("unable to exec pg %s statements: %w", block.Label, err) } } @@ -829,7 +831,7 @@ func runDestinationInitStatements( err = db.Db.BatchExec(ctx, batchSize, orderedInitStatements, &sql_manager.BatchExecOpts{}) if err != nil { - logger.Error("Error creating tables:", err) + logger.Error(fmt.Sprintf("Error creating tables: %v", err)) return err } } @@ -845,7 +847,7 @@ func runDestinationInitStatements( } err = db.Db.BatchExec(ctx, batchSize, truncateCascadeStmts, &sql_manager.BatchExecOpts{}) if err != nil { - logger.Error("Error truncate cascade tables:", err) + logger.Error(fmt.Sprintf("Error truncate cascade tables: %v", err)) return err } } else if cmd.Destination.TruncateBeforeInsert { @@ -859,7 +861,7 @@ func runDestinationInitStatements( } err = db.Db.Exec(ctx, orderedTruncateStatement) if err != nil { - logger.Error("Error truncating tables:", err) + logger.Error(fmt.Sprintf("Error truncating tables: %v", err)) return err } } @@ -875,7 +877,7 @@ func runDestinationInitStatements( disableFkChecks := sql_manager.DisableForeignKeyChecks err = db.Db.BatchExec(ctx, batchSize, orderedTableTruncateStatements, &sql_manager.BatchExecOpts{Prefix: &disableFkChecks}) if err != nil { - logger.Error("Error truncating tables:", err) + logger.Error(fmt.Sprintf("Error truncating tables: %v", err)) return err } } @@ -884,7 +886,7 @@ func runDestinationInitStatements( func buildSyncConfigs( schemaConfig *schemaConfig, - logger *charmlog.Logger, + logger *slog.Logger, ) []*tabledependency.RunConfig { tableColMap := getTableColMap(schemaConfig.Schemas) if len(tableColMap) == 0 { @@ -921,7 +923,7 @@ func buildDependencyMap(syncConfigs []*tabledependency.RunConfig) map[string][]s 
func getTableInitStatementMap( ctx context.Context, - logger *charmlog.Logger, + logger *slog.Logger, connectiondataclient mgmtv1alpha1connect.ConnectionDataServiceClient, connectionId string, opts *sqlDestinationConfig, @@ -1073,7 +1075,7 @@ func generateBenthosConfig( Columns: syncConfig.InsertColumns(), } } -func groupConfigsByDependency(configs []*benthosConfigResponse, logger *charmlog.Logger) [][]*benthosConfigResponse { +func groupConfigsByDependency(configs []*benthosConfigResponse, logger *slog.Logger) [][]*benthosConfigResponse { groupedConfigs := [][]*benthosConfigResponse{} configMap := map[string]*benthosConfigResponse{} queuedMap := map[string][]string{} // map -> table to cols @@ -1145,7 +1147,7 @@ type schemaConfig struct { func getConnectionSchemaConfig( ctx context.Context, - logger *charmlog.Logger, + logger *slog.Logger, connectiondataclient mgmtv1alpha1connect.ConnectionDataServiceClient, connection *mgmtv1alpha1.Connection, cmd *cmdConfig, @@ -1230,7 +1232,7 @@ func getDestinationSchemaConfig( connection *mgmtv1alpha1.Connection, cmd *cmdConfig, sc *mgmtv1alpha1.ConnectionSchemaConfig, - logger *charmlog.Logger, + logger *slog.Logger, ) (*schemaConfig, error) { schemaResp, err := connectiondataclient.GetConnectionSchema(ctx, connect.NewRequest(&mgmtv1alpha1.GetConnectionSchemaRequest{ ConnectionId: connection.Id, diff --git a/cli/internal/cmds/neosync/sync/sync_test.go b/cli/internal/cmds/neosync/sync/sync_test.go index b2dc4f90e0..a7163ec4be 100644 --- a/cli/internal/cmds/neosync/sync/sync_test.go +++ b/cli/internal/cmds/neosync/sync/sync_test.go @@ -1,10 +1,10 @@ package sync_cmd import ( - "os" + "io" + "log/slog" "testing" - charmlog "github.com/charmbracelet/log" tabledependency "github.com/nucleuscloud/neosync/backend/pkg/table-dependency" "github.com/stretchr/testify/require" ) @@ -103,9 +103,7 @@ func Test_groupConfigsByDependency(t *testing.T) { }, } - logger := charmlog.NewWithOptions(os.Stderr, charmlog.Options{ - ReportTimestamp: 
true, - }) + logger := slog.New(slog.NewTextHandler(io.Discard, nil)) for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -133,10 +131,7 @@ func Test_groupConfigsByDependency_Error(t *testing.T) { {Name: "public.b", DependsOn: []*tabledependency.DependsOn{{Table: "public.c", Columns: []string{"id"}}}, Table: "public.b", Columns: []string{"id", "c_id"}}, {Name: "public.c", DependsOn: []*tabledependency.DependsOn{{Table: "public.a", Columns: []string{"id"}}}, Table: "public.c", Columns: []string{"id", "a_id"}}, } - logger := charmlog.NewWithOptions(os.Stderr, charmlog.Options{ - ReportTimestamp: true, - }) - groups := groupConfigsByDependency(configs, logger) + groups := groupConfigsByDependency(configs, slog.New(slog.NewTextHandler(io.Discard, nil))) require.Nil(t, groups) } diff --git a/cli/internal/cmds/neosync/sync/ui.go b/cli/internal/cmds/neosync/sync/ui.go index abbcb94bde..7535f9ed47 100644 --- a/cli/internal/cmds/neosync/sync/ui.go +++ b/cli/internal/cmds/neosync/sync/ui.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io" + "log/slog" "strings" syncmap "sync" "time" @@ -22,12 +23,11 @@ import ( "github.com/charmbracelet/bubbles/spinner" tea "github.com/charmbracelet/bubbletea" "github.com/charmbracelet/lipgloss" - charmlog "github.com/charmbracelet/log" ) type model struct { ctx context.Context - logger *charmlog.Logger + logger *slog.Logger benv *service.Environment groupedConfigs [][]*benthosConfigResponse tableSynced int @@ -51,7 +51,7 @@ var ( durationStyle = dotStyle ) -func newModel(ctx context.Context, benv *service.Environment, groupedConfigs [][]*benthosConfigResponse, logger *charmlog.Logger, outputType output.OutputType) *model { +func newModel(ctx context.Context, benv *service.Environment, groupedConfigs [][]*benthosConfigResponse, logger *slog.Logger, outputType output.OutputType) *model { s := spinner.New() s.Style = lipgloss.NewStyle().Foreground(lipgloss.Color("63")) return &model{ @@ -87,7 +87,7 @@ func (m *model) Update(msg 
tea.Msg) (tea.Model, tea.Cmd) { } if m.totalConfigCount == m.tableSynced { m.done = true - m.logger.Infof("Done! Completed %d tables.", m.tableSynced) + m.logger.Info(fmt.Sprintf("Done! Completed %d tables.", m.tableSynced)) return m, tea.Sequence( tea.Println(strings.Join(successStrs, " \n")), tea.Quit, @@ -146,7 +146,7 @@ func (m *model) syncConfigs(ctx context.Context, benv *service.Environment, conf cfg := cfg errgrp.Go(func() error { start := time.Now() - m.logger.Infof("Syncing table %s", cfg.Name) + m.logger.Info(fmt.Sprintf("Syncing table %s", cfg.Name)) err := syncData(errctx, benv, cfg, m.logger, m.outputType) if err != nil { fmt.Printf("Error syncing table: %s", err.Error()) //nolint:forbidigo @@ -154,7 +154,7 @@ func (m *model) syncConfigs(ctx context.Context, benv *service.Environment, conf } duration := time.Since(start) messageMap.Store(cfg.Name, duration) - m.logger.Infof("Finished syncing table %s %s", cfg.Name, duration.String()) + m.logger.Info(fmt.Sprintf("Finished syncing table %s %s", cfg.Name, duration.String())) return nil }) } @@ -192,18 +192,19 @@ func getConfigCount(groupedConfigs [][]*benthosConfigResponse) int { return count } -func runSync(ctx context.Context, outputType output.OutputType, benv *service.Environment, groupedConfigs [][]*benthosConfigResponse, logger *charmlog.Logger) error { +func runSync(ctx context.Context, outputType output.OutputType, benv *service.Environment, groupedConfigs [][]*benthosConfigResponse, logger *slog.Logger) error { var opts []tea.ProgramOption + var synclogger = logger if outputType == output.PlainOutput { // Plain mode don't render the TUI opts = []tea.ProgramOption{tea.WithoutRenderer(), tea.WithInput(nil)} } else { fmt.Println(bold.Render(" \n Completed Tables")) //nolint:forbidigo // TUI mode, discard log output - logger.SetOutput(io.Discard) + synclogger = slog.New(slog.NewJSONHandler(io.Discard, nil)) } - if _, err := tea.NewProgram(newModel(ctx, benv, groupedConfigs, logger, outputType), 
opts...).Run(); err != nil { - logger.Error("Error syncing data:", err) + if _, err := tea.NewProgram(newModel(ctx, benv, groupedConfigs, synclogger, outputType), opts...).Run(); err != nil { + logger.Error(fmt.Sprintf("Error syncing data: %v", err)) return fmt.Errorf("unable to finish syncing data: %w", err) } return nil