diff --git a/components/fctl/cmd/wallets/balances/create.go b/components/fctl/cmd/wallets/balances/create.go index 8b11bf02ce..32e6913ee6 100644 --- a/components/fctl/cmd/wallets/balances/create.go +++ b/components/fctl/cmd/wallets/balances/create.go @@ -2,6 +2,8 @@ package balances import ( "fmt" + "math/big" + "github.com/formancehq/fctl/cmd/wallets/internal" fctl "github.com/formancehq/fctl/pkg" "github.com/formancehq/formance-sdk-go/v2/pkg/models/operations" @@ -9,7 +11,6 @@ import ( "github.com/pkg/errors" "github.com/pterm/pterm" "github.com/spf13/cobra" - "math/big" ) type CreateStore struct { diff --git a/components/ledger/libs/go.mod b/components/ledger/libs/go.mod index 8df9a98978..823c15a392 100644 --- a/components/ledger/libs/go.mod +++ b/components/ledger/libs/go.mod @@ -6,7 +6,6 @@ toolchain go1.21.5 require ( github.com/IBM/sarama v1.42.1 - github.com/Shopify/sarama v1.38.1 github.com/ThreeDotsLabs/watermill v1.3.5 github.com/ThreeDotsLabs/watermill-http/v2 v2.1.0 github.com/ThreeDotsLabs/watermill-kafka/v3 v3.0.0 diff --git a/components/ledger/libs/go.sum b/components/ledger/libs/go.sum index 1cb227df56..c8e77c8547 100644 --- a/components/ledger/libs/go.sum +++ b/components/ledger/libs/go.sum @@ -47,10 +47,6 @@ github.com/Microsoft/go-winio v0.6.0 h1:slsWYD/zyx7lCXoZVlvQrj0hPTM1HI4+v1sIda2y github.com/Microsoft/go-winio v0.6.0/go.mod h1:cTAf44im0RAYeL23bpB+fzCyDH2MJiz2BO69KH/soAE= github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 h1:TngWCqHvy9oXAN6lEVMRuU21PR1EtLVZJmdB18Gu3Rw= github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5/go.mod h1:lmUJ/7eu/Q8D7ML55dXQrVaamCz2vxCfdQBasLZfHKk= -github.com/Shopify/sarama v1.38.1 h1:lqqPUPQZ7zPqYlWpTh+LQ9bhYNu2xJL6k1SJN4WVe2A= -github.com/Shopify/sarama v1.38.1/go.mod h1:iwv9a67Ha8VNa+TifujYoWGxWnu2kNVAQdSdZ4X2o5g= -github.com/Shopify/toxiproxy/v2 v2.5.0 h1:i4LPT+qrSlKNtQf5QliVjdP08GyAH8+BUIc9gT0eahc= -github.com/Shopify/toxiproxy/v2 v2.5.0/go.mod h1:yhM2epWtAmel9CB8r2+L+PCmhH6yH2pITaPAo7jxJl0= github.com/ThreeDotsLabs/watermill v1.2.0/go.mod h1:IuVxGk/kgCN0cex2S94BLglUiB0PwOm8hbUhm6g2Nx4= github.com/ThreeDotsLabs/watermill v1.3.5 h1:50JEPEhMGZQMh08ct0tfO1PsgMOAOhV3zxK2WofkbXg= github.com/ThreeDotsLabs/watermill v1.3.5/go.mod h1:O/u/Ptyrk5MPTxSeWM5vzTtZcZfxXfO9PK9eXTYiFZY= diff --git a/components/ledger/libs/publish/cli.go b/components/ledger/libs/publish/cli.go index 21bf296a4f..0032d8a1c7 100644 --- a/components/ledger/libs/publish/cli.go +++ b/components/ledger/libs/publish/cli.go @@ -96,6 +96,16 @@ func InitCLIFlags(cmd *cobra.Command, options ...func(*ConfigDefault)) { cmd.PersistentFlags().Int(PublisherKafkaSASLScramSHASizeFlag, values.PublisherKafkaSASLScramSHASize, "SASL SCRAM SHA size") cmd.PersistentFlags().Bool(PublisherKafkaTLSEnabledFlag, values.PublisherKafkaTLSEnabled, "Enable TLS to connect on kafka") + // NATS + InitNatsCliFlags(cmd, options...) +} + +func InitNatsCliFlags(cmd *cobra.Command, options ...func(*ConfigDefault)) { + values := defaultConfigValues + for _, option := range options { + option(&values) + } + // NATS cmd.PersistentFlags().Bool(PublisherNatsEnabledFlag, values.PublisherNatsEnabled, "Publish write events to nats") cmd.PersistentFlags().String(PublisherNatsClientIDFlag, values.PublisherNatsClientID, "Nats client ID") diff --git a/components/ledger/libs/publish/nats.go b/components/ledger/libs/publish/nats.go index dd6bd1eb66..f8857c797a 100644 --- a/components/ledger/libs/publish/nats.go +++ b/components/ledger/libs/publish/nats.go @@ -40,20 +40,11 @@ func NatsModule(url, serviceName string, natsOptions ...nats.Option) fx.Option { } return fx.Options( fx.Provide(NewNatsConn), - fx.Provide(newNatsDefaultCallbacks), + fx.Provide(NewNatsDefaultCallbacks), fx.Provide(NewNatsPublisherWithConn), fx.Provide(NewNatsSubscriberWithConn), fx.Provide(func(natsCallbacks NATSCallbacks) wNats.PublisherConfig { - natsOptions = append(natsOptions, - nats.ConnectHandler(natsCallbacks.ConnectedCB), - nats.DisconnectErrHandler(natsCallbacks.DisconnectedErrCB), - nats.DiscoveredServersHandler(natsCallbacks.DiscoveredServersCB), - nats.ErrorHandler(natsCallbacks.AsyncErrorCB), - nats.ReconnectHandler(natsCallbacks.ReconnectedCB), - nats.DisconnectHandler(natsCallbacks.DisconnectedCB), - nats.ClosedHandler(natsCallbacks.ClosedCB), - ) - + natsOptions = AppendCallBacks(natsOptions, natsCallbacks) return wNats.PublisherConfig{ NatsOptions: natsOptions, URL: url, @@ -63,16 +54,7 @@ func NatsModule(url, serviceName string, natsOptions ...nats.Option) fx.Option { } }), fx.Provide(func(natsCallbacks NATSCallbacks) wNats.SubscriberConfig { - natsOptions = append(natsOptions, - nats.ConnectHandler(natsCallbacks.ConnectedCB), - nats.DisconnectErrHandler(natsCallbacks.DisconnectedErrCB), - nats.DiscoveredServersHandler(natsCallbacks.DiscoveredServersCB), - nats.ErrorHandler(natsCallbacks.AsyncErrorCB), - nats.ReconnectHandler(natsCallbacks.ReconnectedCB), - nats.DisconnectHandler(natsCallbacks.DisconnectedCB), - nats.ClosedHandler(natsCallbacks.ClosedCB), - ) - + natsOptions = AppendCallBacks(natsOptions, natsCallbacks) return wNats.SubscriberConfig{ NatsOptions: natsOptions, Unmarshaler: &wNats.NATSMarshaler{}, @@ -106,43 +88,55 @@ type NATSCallbacks interface { AsyncErrorCB(nc *nats.Conn, sub *nats.Subscription, err error) } -type natsDefaultCallbacks struct { +func AppendCallBacks(natsOptions []nats.Option, c NATSCallbacks) []nats.Option { + return append(natsOptions, + nats.ConnectHandler(c.ConnectedCB), + nats.DisconnectErrHandler(c.DisconnectedErrCB), + nats.DiscoveredServersHandler(c.DiscoveredServersCB), + nats.ErrorHandler(c.AsyncErrorCB), + nats.ReconnectHandler(c.ReconnectedCB), + nats.DisconnectHandler(c.DisconnectedCB), + nats.ClosedHandler(c.ClosedCB), + ) +} + +type NatsDefaultCallbacks struct { logger logging.Logger shutdowner fx.Shutdowner } -func newNatsDefaultCallbacks(logger logging.Logger, shutdowner fx.Shutdowner) NATSCallbacks { - return &natsDefaultCallbacks{ +func NewNatsDefaultCallbacks(logger logging.Logger, shutdowner fx.Shutdowner) NATSCallbacks { + return &NatsDefaultCallbacks{ logger: logger, shutdowner: shutdowner, } } -func (c *natsDefaultCallbacks) ClosedCB(nc *nats.Conn) { +func (c *NatsDefaultCallbacks) ClosedCB(nc *nats.Conn) { c.logger.Infof("nats connection closed: %s", nc.Opts.Name) c.shutdowner.Shutdown() } -func (c *natsDefaultCallbacks) DisconnectedCB(nc *nats.Conn) { +func (c *NatsDefaultCallbacks) DisconnectedCB(nc *nats.Conn) { c.logger.Infof("nats connection disconnected: %s", nc.Opts.Name) } -func (c *natsDefaultCallbacks) DiscoveredServersCB(nc *nats.Conn) { +func (c *NatsDefaultCallbacks) DiscoveredServersCB(nc *nats.Conn) { c.logger.Infof("nats server discovered: %s", nc.Opts.Name) } -func (c *natsDefaultCallbacks) ReconnectedCB(nc *nats.Conn) { +func (c *NatsDefaultCallbacks) ReconnectedCB(nc *nats.Conn) { c.logger.Infof("nats connection reconnected: %s", nc.Opts.Name) } -func (c *natsDefaultCallbacks) DisconnectedErrCB(nc *nats.Conn, err error) { +func (c *NatsDefaultCallbacks) DisconnectedErrCB(nc *nats.Conn, err error) { c.logger.Errorf("nats connection disconnected error for %s: %v", nc.Opts.Name, err) } -func (c *natsDefaultCallbacks) ConnectedCB(nc *nats.Conn) { +func (c *NatsDefaultCallbacks) ConnectedCB(nc *nats.Conn) { c.logger.Infof("nats connection done: %s", nc.Opts.Name) } -func (c *natsDefaultCallbacks) AsyncErrorCB(nc *nats.Conn, sub *nats.Subscription, err error) { +func (c *NatsDefaultCallbacks) AsyncErrorCB(nc *nats.Conn, sub *nats.Subscription, err error) { c.logger.Errorf("nats async error for %s with subject %s: %v", nc.Opts.Name, sub.Subject, err) } diff --git a/libs/go-libs/go.mod b/libs/go-libs/go.mod index 8df9a98978..823c15a392 100644 --- a/libs/go-libs/go.mod +++ b/libs/go-libs/go.mod @@ -6,7 +6,6 @@ toolchain go1.21.5 require ( github.com/IBM/sarama v1.42.1 - github.com/Shopify/sarama v1.38.1 github.com/ThreeDotsLabs/watermill v1.3.5 github.com/ThreeDotsLabs/watermill-http/v2 v2.1.0 github.com/ThreeDotsLabs/watermill-kafka/v3 v3.0.0 diff --git a/libs/go-libs/go.sum b/libs/go-libs/go.sum index 1cb227df56..c8e77c8547 100644 --- a/libs/go-libs/go.sum +++ b/libs/go-libs/go.sum @@ -47,10 +47,6 @@ github.com/Microsoft/go-winio v0.6.0 h1:slsWYD/zyx7lCXoZVlvQrj0hPTM1HI4+v1sIda2y github.com/Microsoft/go-winio v0.6.0/go.mod h1:cTAf44im0RAYeL23bpB+fzCyDH2MJiz2BO69KH/soAE= github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 h1:TngWCqHvy9oXAN6lEVMRuU21PR1EtLVZJmdB18Gu3Rw= github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5/go.mod h1:lmUJ/7eu/Q8D7ML55dXQrVaamCz2vxCfdQBasLZfHKk= -github.com/Shopify/sarama v1.38.1 h1:lqqPUPQZ7zPqYlWpTh+LQ9bhYNu2xJL6k1SJN4WVe2A= -github.com/Shopify/sarama v1.38.1/go.mod h1:iwv9a67Ha8VNa+TifujYoWGxWnu2kNVAQdSdZ4X2o5g= -github.com/Shopify/toxiproxy/v2 v2.5.0 h1:i4LPT+qrSlKNtQf5QliVjdP08GyAH8+BUIc9gT0eahc= -github.com/Shopify/toxiproxy/v2 v2.5.0/go.mod h1:yhM2epWtAmel9CB8r2+L+PCmhH6yH2pITaPAo7jxJl0= github.com/ThreeDotsLabs/watermill v1.2.0/go.mod h1:IuVxGk/kgCN0cex2S94BLglUiB0PwOm8hbUhm6g2Nx4= github.com/ThreeDotsLabs/watermill v1.3.5 h1:50JEPEhMGZQMh08ct0tfO1PsgMOAOhV3zxK2WofkbXg= github.com/ThreeDotsLabs/watermill v1.3.5/go.mod h1:O/u/Ptyrk5MPTxSeWM5vzTtZcZfxXfO9PK9eXTYiFZY= diff --git a/libs/go-libs/publish/cli.go b/libs/go-libs/publish/cli.go index 21bf296a4f..591339fc87 100644 --- a/libs/go-libs/publish/cli.go +++ b/libs/go-libs/publish/cli.go @@ -96,6 +96,14 @@ func InitCLIFlags(cmd *cobra.Command, options ...func(*ConfigDefault)) { cmd.PersistentFlags().Int(PublisherKafkaSASLScramSHASizeFlag, values.PublisherKafkaSASLScramSHASize, "SASL SCRAM SHA size") cmd.PersistentFlags().Bool(PublisherKafkaTLSEnabledFlag, values.PublisherKafkaTLSEnabled, "Enable TLS to connect on kafka") + InitNatsCLIFlags(cmd, options...) +} + +func InitNatsCLIFlags(cmd *cobra.Command, options ...func(*ConfigDefault)) { + values := defaultConfigValues + for _, option := range options { + option(&values) + } // NATS cmd.PersistentFlags().Bool(PublisherNatsEnabledFlag, values.PublisherNatsEnabled, "Publish write events to nats") cmd.PersistentFlags().String(PublisherNatsClientIDFlag, values.PublisherNatsClientID, "Nats client ID") diff --git a/libs/go-libs/publish/nats.go b/libs/go-libs/publish/nats.go index dd6bd1eb66..07f3d5ca8c 100644 --- a/libs/go-libs/publish/nats.go +++ b/libs/go-libs/publish/nats.go @@ -40,20 +40,11 @@ func NatsModule(url, serviceName string, natsOptions ...nats.Option) fx.Option { } return fx.Options( fx.Provide(NewNatsConn), - fx.Provide(newNatsDefaultCallbacks), + fx.Provide(NewNatsDefaultCallbacks), fx.Provide(NewNatsPublisherWithConn), fx.Provide(NewNatsSubscriberWithConn), fx.Provide(func(natsCallbacks NATSCallbacks) wNats.PublisherConfig { - natsOptions = append(natsOptions, - nats.ConnectHandler(natsCallbacks.ConnectedCB), - nats.DisconnectErrHandler(natsCallbacks.DisconnectedErrCB), - nats.DiscoveredServersHandler(natsCallbacks.DiscoveredServersCB), - nats.ErrorHandler(natsCallbacks.AsyncErrorCB), - nats.ReconnectHandler(natsCallbacks.ReconnectedCB), - nats.DisconnectHandler(natsCallbacks.DisconnectedCB), - nats.ClosedHandler(natsCallbacks.ClosedCB), - ) - + natsOptions = AppendNatsCallBacks(natsOptions, natsCallbacks) return wNats.PublisherConfig{ NatsOptions: natsOptions, URL: url, @@ -63,16 +54,7 @@ func NatsModule(url, serviceName string, natsOptions ...nats.Option) fx.Option { } }), fx.Provide(func(natsCallbacks NATSCallbacks) wNats.SubscriberConfig { - natsOptions = append(natsOptions, - nats.ConnectHandler(natsCallbacks.ConnectedCB), - nats.DisconnectErrHandler(natsCallbacks.DisconnectedErrCB), - nats.DiscoveredServersHandler(natsCallbacks.DiscoveredServersCB), - nats.ErrorHandler(natsCallbacks.AsyncErrorCB), - nats.ReconnectHandler(natsCallbacks.ReconnectedCB), - nats.DisconnectHandler(natsCallbacks.DisconnectedCB), - nats.ClosedHandler(natsCallbacks.ClosedCB), - ) - + natsOptions = AppendNatsCallBacks(natsOptions, natsCallbacks) return wNats.SubscriberConfig{ NatsOptions: natsOptions, Unmarshaler: &wNats.NATSMarshaler{}, @@ -106,43 +88,55 @@ type NATSCallbacks interface { AsyncErrorCB(nc *nats.Conn, sub *nats.Subscription, err error) } -type natsDefaultCallbacks struct { +func AppendNatsCallBacks(natsOptions []nats.Option, c NATSCallbacks) []nats.Option { + return append(natsOptions, + nats.ConnectHandler(c.ConnectedCB), + nats.DisconnectErrHandler(c.DisconnectedErrCB), + nats.DiscoveredServersHandler(c.DiscoveredServersCB), + nats.ErrorHandler(c.AsyncErrorCB), + nats.ReconnectHandler(c.ReconnectedCB), + nats.DisconnectHandler(c.DisconnectedCB), + nats.ClosedHandler(c.ClosedCB), + ) +} + +type NatsDefaultCallbacks struct { logger logging.Logger shutdowner fx.Shutdowner } -func newNatsDefaultCallbacks(logger logging.Logger, shutdowner fx.Shutdowner) NATSCallbacks { - return &natsDefaultCallbacks{ +func NewNatsDefaultCallbacks(logger logging.Logger, shutdowner fx.Shutdowner) NATSCallbacks { + return &NatsDefaultCallbacks{ logger: logger, shutdowner: shutdowner, } } -func (c *natsDefaultCallbacks) ClosedCB(nc *nats.Conn) { +func (c *NatsDefaultCallbacks) ClosedCB(nc *nats.Conn) { c.logger.Infof("nats connection closed: %s", nc.Opts.Name) c.shutdowner.Shutdown() } -func (c *natsDefaultCallbacks) DisconnectedCB(nc *nats.Conn) { +func (c *NatsDefaultCallbacks) DisconnectedCB(nc *nats.Conn) { c.logger.Infof("nats connection disconnected: %s", nc.Opts.Name) } -func (c *natsDefaultCallbacks) DiscoveredServersCB(nc *nats.Conn) { +func (c *NatsDefaultCallbacks) DiscoveredServersCB(nc *nats.Conn) { c.logger.Infof("nats server discovered: %s", nc.Opts.Name) } -func (c *natsDefaultCallbacks) ReconnectedCB(nc *nats.Conn) { +func (c *NatsDefaultCallbacks) ReconnectedCB(nc *nats.Conn) { c.logger.Infof("nats connection reconnected: %s", nc.Opts.Name) } -func (c *natsDefaultCallbacks) DisconnectedErrCB(nc *nats.Conn, err error) { +func (c *NatsDefaultCallbacks) DisconnectedErrCB(nc *nats.Conn, err error) { c.logger.Errorf("nats connection disconnected error for %s: %v", nc.Opts.Name, err) } -func (c *natsDefaultCallbacks) ConnectedCB(nc *nats.Conn) { +func (c *NatsDefaultCallbacks) ConnectedCB(nc *nats.Conn) { c.logger.Infof("nats connection done: %s", nc.Opts.Name) } -func (c *natsDefaultCallbacks) AsyncErrorCB(nc *nats.Conn, sub *nats.Subscription, err error) { +func (c *NatsDefaultCallbacks) AsyncErrorCB(nc *nats.Conn, sub *nats.Subscription, err error) { c.logger.Errorf("nats async error for %s with subject %s: %v", nc.Opts.Name, sub.Subject, err) }