Skip to content

Commit

Permalink
feat: export + init cli flags
Browse files Browse the repository at this point in the history
  • Loading branch information
David Ragot committed Feb 26, 2024
1 parent 693044a commit 91365c3
Show file tree
Hide file tree
Showing 9 changed files with 70 additions and 73 deletions.
3 changes: 2 additions & 1 deletion components/fctl/cmd/wallets/balances/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ package balances

import (
"fmt"
"math/big"

"github.com/formancehq/fctl/cmd/wallets/internal"
fctl "github.com/formancehq/fctl/pkg"
"github.com/formancehq/formance-sdk-go/v2/pkg/models/operations"
"github.com/formancehq/formance-sdk-go/v2/pkg/models/shared"
"github.com/pkg/errors"
"github.com/pterm/pterm"
"github.com/spf13/cobra"
"math/big"
)

type CreateStore struct {
Expand Down
1 change: 0 additions & 1 deletion components/ledger/libs/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ toolchain go1.21.5

require (
github.com/IBM/sarama v1.42.1
github.com/Shopify/sarama v1.38.1
github.com/ThreeDotsLabs/watermill v1.3.5
github.com/ThreeDotsLabs/watermill-http/v2 v2.1.0
github.com/ThreeDotsLabs/watermill-kafka/v3 v3.0.0
Expand Down
4 changes: 0 additions & 4 deletions components/ledger/libs/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,6 @@ github.com/Microsoft/go-winio v0.6.0 h1:slsWYD/zyx7lCXoZVlvQrj0hPTM1HI4+v1sIda2y
github.com/Microsoft/go-winio v0.6.0/go.mod h1:cTAf44im0RAYeL23bpB+fzCyDH2MJiz2BO69KH/soAE=
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 h1:TngWCqHvy9oXAN6lEVMRuU21PR1EtLVZJmdB18Gu3Rw=
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5/go.mod h1:lmUJ/7eu/Q8D7ML55dXQrVaamCz2vxCfdQBasLZfHKk=
github.com/Shopify/sarama v1.38.1 h1:lqqPUPQZ7zPqYlWpTh+LQ9bhYNu2xJL6k1SJN4WVe2A=
github.com/Shopify/sarama v1.38.1/go.mod h1:iwv9a67Ha8VNa+TifujYoWGxWnu2kNVAQdSdZ4X2o5g=
github.com/Shopify/toxiproxy/v2 v2.5.0 h1:i4LPT+qrSlKNtQf5QliVjdP08GyAH8+BUIc9gT0eahc=
github.com/Shopify/toxiproxy/v2 v2.5.0/go.mod h1:yhM2epWtAmel9CB8r2+L+PCmhH6yH2pITaPAo7jxJl0=
github.com/ThreeDotsLabs/watermill v1.2.0/go.mod h1:IuVxGk/kgCN0cex2S94BLglUiB0PwOm8hbUhm6g2Nx4=
github.com/ThreeDotsLabs/watermill v1.3.5 h1:50JEPEhMGZQMh08ct0tfO1PsgMOAOhV3zxK2WofkbXg=
github.com/ThreeDotsLabs/watermill v1.3.5/go.mod h1:O/u/Ptyrk5MPTxSeWM5vzTtZcZfxXfO9PK9eXTYiFZY=
Expand Down
10 changes: 10 additions & 0 deletions components/ledger/libs/publish/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,16 @@ func InitCLIFlags(cmd *cobra.Command, options ...func(*ConfigDefault)) {
cmd.PersistentFlags().Int(PublisherKafkaSASLScramSHASizeFlag, values.PublisherKafkaSASLScramSHASize, "SASL SCRAM SHA size")
cmd.PersistentFlags().Bool(PublisherKafkaTLSEnabledFlag, values.PublisherKafkaTLSEnabled, "Enable TLS to connect on kafka")

// NATS
InitNatsCliFlags(cmd, options...)
}

func InitNatsCliFlags(cmd *cobra.Command, options ...func(*ConfigDefault)) {
values := defaultConfigValues
for _, option := range options {
option(&values)
}

// NATS
cmd.PersistentFlags().Bool(PublisherNatsEnabledFlag, values.PublisherNatsEnabled, "Publish write events to nats")
cmd.PersistentFlags().String(PublisherNatsClientIDFlag, values.PublisherNatsClientID, "Nats client ID")
Expand Down
56 changes: 25 additions & 31 deletions components/ledger/libs/publish/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,20 +40,11 @@ func NatsModule(url, serviceName string, natsOptions ...nats.Option) fx.Option {
}
return fx.Options(
fx.Provide(NewNatsConn),
fx.Provide(newNatsDefaultCallbacks),
fx.Provide(NewNatsDefaultCallbacks),
fx.Provide(NewNatsPublisherWithConn),
fx.Provide(NewNatsSubscriberWithConn),
fx.Provide(func(natsCallbacks NATSCallbacks) wNats.PublisherConfig {
natsOptions = append(natsOptions,
nats.ConnectHandler(natsCallbacks.ConnectedCB),
nats.DisconnectErrHandler(natsCallbacks.DisconnectedErrCB),
nats.DiscoveredServersHandler(natsCallbacks.DiscoveredServersCB),
nats.ErrorHandler(natsCallbacks.AsyncErrorCB),
nats.ReconnectHandler(natsCallbacks.ReconnectedCB),
nats.DisconnectHandler(natsCallbacks.DisconnectedCB),
nats.ClosedHandler(natsCallbacks.ClosedCB),
)

natsOptions = AppendCallBacks(natsOptions, natsCallbacks)
return wNats.PublisherConfig{
NatsOptions: natsOptions,
URL: url,
Expand All @@ -63,16 +54,7 @@ func NatsModule(url, serviceName string, natsOptions ...nats.Option) fx.Option {
}
}),
fx.Provide(func(natsCallbacks NATSCallbacks) wNats.SubscriberConfig {
natsOptions = append(natsOptions,
nats.ConnectHandler(natsCallbacks.ConnectedCB),
nats.DisconnectErrHandler(natsCallbacks.DisconnectedErrCB),
nats.DiscoveredServersHandler(natsCallbacks.DiscoveredServersCB),
nats.ErrorHandler(natsCallbacks.AsyncErrorCB),
nats.ReconnectHandler(natsCallbacks.ReconnectedCB),
nats.DisconnectHandler(natsCallbacks.DisconnectedCB),
nats.ClosedHandler(natsCallbacks.ClosedCB),
)

natsOptions = AppendCallBacks(natsOptions, natsCallbacks)
return wNats.SubscriberConfig{
NatsOptions: natsOptions,
Unmarshaler: &wNats.NATSMarshaler{},
Expand Down Expand Up @@ -106,43 +88,55 @@ type NATSCallbacks interface {
AsyncErrorCB(nc *nats.Conn, sub *nats.Subscription, err error)
}

type natsDefaultCallbacks struct {
func AppendCallBacks(natsOptions []nats.Option, c NATSCallbacks) []nats.Option {
return append(natsOptions,
nats.ConnectHandler(c.ConnectedCB),
nats.DisconnectErrHandler(c.DisconnectedErrCB),
nats.DiscoveredServersHandler(c.DiscoveredServersCB),
nats.ErrorHandler(c.AsyncErrorCB),
nats.ReconnectHandler(c.ReconnectedCB),
nats.DisconnectHandler(c.DisconnectedCB),
nats.ClosedHandler(c.ClosedCB),
)
}

type NatsDefaultCallbacks struct {
logger logging.Logger
shutdowner fx.Shutdowner
}

func newNatsDefaultCallbacks(logger logging.Logger, shutdowner fx.Shutdowner) NATSCallbacks {
return &natsDefaultCallbacks{
func NewNatsDefaultCallbacks(logger logging.Logger, shutdowner fx.Shutdowner) NATSCallbacks {
return &NatsDefaultCallbacks{
logger: logger,
shutdowner: shutdowner,
}
}

func (c *natsDefaultCallbacks) ClosedCB(nc *nats.Conn) {
func (c *NatsDefaultCallbacks) ClosedCB(nc *nats.Conn) {
c.logger.Infof("nats connection closed: %s", nc.Opts.Name)
c.shutdowner.Shutdown()
}

func (c *natsDefaultCallbacks) DisconnectedCB(nc *nats.Conn) {
func (c *NatsDefaultCallbacks) DisconnectedCB(nc *nats.Conn) {
c.logger.Infof("nats connection disconnected: %s", nc.Opts.Name)
}

func (c *natsDefaultCallbacks) DiscoveredServersCB(nc *nats.Conn) {
func (c *NatsDefaultCallbacks) DiscoveredServersCB(nc *nats.Conn) {
c.logger.Infof("nats server discovered: %s", nc.Opts.Name)
}

func (c *natsDefaultCallbacks) ReconnectedCB(nc *nats.Conn) {
func (c *NatsDefaultCallbacks) ReconnectedCB(nc *nats.Conn) {
c.logger.Infof("nats connection reconnected: %s", nc.Opts.Name)
}

func (c *natsDefaultCallbacks) DisconnectedErrCB(nc *nats.Conn, err error) {
func (c *NatsDefaultCallbacks) DisconnectedErrCB(nc *nats.Conn, err error) {
c.logger.Errorf("nats connection disconnected error for %s: %v", nc.Opts.Name, err)
}

func (c *natsDefaultCallbacks) ConnectedCB(nc *nats.Conn) {
func (c *NatsDefaultCallbacks) ConnectedCB(nc *nats.Conn) {
c.logger.Infof("nats connection done: %s", nc.Opts.Name)
}

func (c *natsDefaultCallbacks) AsyncErrorCB(nc *nats.Conn, sub *nats.Subscription, err error) {
func (c *NatsDefaultCallbacks) AsyncErrorCB(nc *nats.Conn, sub *nats.Subscription, err error) {
c.logger.Errorf("nats async error for %s with subject %s: %v", nc.Opts.Name, sub.Subject, err)
}
1 change: 0 additions & 1 deletion libs/go-libs/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ toolchain go1.21.5

require (
github.com/IBM/sarama v1.42.1
github.com/Shopify/sarama v1.38.1
github.com/ThreeDotsLabs/watermill v1.3.5
github.com/ThreeDotsLabs/watermill-http/v2 v2.1.0
github.com/ThreeDotsLabs/watermill-kafka/v3 v3.0.0
Expand Down
4 changes: 0 additions & 4 deletions libs/go-libs/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,6 @@ github.com/Microsoft/go-winio v0.6.0 h1:slsWYD/zyx7lCXoZVlvQrj0hPTM1HI4+v1sIda2y
github.com/Microsoft/go-winio v0.6.0/go.mod h1:cTAf44im0RAYeL23bpB+fzCyDH2MJiz2BO69KH/soAE=
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 h1:TngWCqHvy9oXAN6lEVMRuU21PR1EtLVZJmdB18Gu3Rw=
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5/go.mod h1:lmUJ/7eu/Q8D7ML55dXQrVaamCz2vxCfdQBasLZfHKk=
github.com/Shopify/sarama v1.38.1 h1:lqqPUPQZ7zPqYlWpTh+LQ9bhYNu2xJL6k1SJN4WVe2A=
github.com/Shopify/sarama v1.38.1/go.mod h1:iwv9a67Ha8VNa+TifujYoWGxWnu2kNVAQdSdZ4X2o5g=
github.com/Shopify/toxiproxy/v2 v2.5.0 h1:i4LPT+qrSlKNtQf5QliVjdP08GyAH8+BUIc9gT0eahc=
github.com/Shopify/toxiproxy/v2 v2.5.0/go.mod h1:yhM2epWtAmel9CB8r2+L+PCmhH6yH2pITaPAo7jxJl0=
github.com/ThreeDotsLabs/watermill v1.2.0/go.mod h1:IuVxGk/kgCN0cex2S94BLglUiB0PwOm8hbUhm6g2Nx4=
github.com/ThreeDotsLabs/watermill v1.3.5 h1:50JEPEhMGZQMh08ct0tfO1PsgMOAOhV3zxK2WofkbXg=
github.com/ThreeDotsLabs/watermill v1.3.5/go.mod h1:O/u/Ptyrk5MPTxSeWM5vzTtZcZfxXfO9PK9eXTYiFZY=
Expand Down
8 changes: 8 additions & 0 deletions libs/go-libs/publish/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,14 @@ func InitCLIFlags(cmd *cobra.Command, options ...func(*ConfigDefault)) {
cmd.PersistentFlags().Int(PublisherKafkaSASLScramSHASizeFlag, values.PublisherKafkaSASLScramSHASize, "SASL SCRAM SHA size")
cmd.PersistentFlags().Bool(PublisherKafkaTLSEnabledFlag, values.PublisherKafkaTLSEnabled, "Enable TLS to connect on kafka")

InitNatsCLIFlags(cmd, options...)
}

func InitNatsCLIFlags(cmd *cobra.Command, options ...func(*ConfigDefault)) {
values := defaultConfigValues
for _, option := range options {
option(&values)
}
// NATS
cmd.PersistentFlags().Bool(PublisherNatsEnabledFlag, values.PublisherNatsEnabled, "Publish write events to nats")
cmd.PersistentFlags().String(PublisherNatsClientIDFlag, values.PublisherNatsClientID, "Nats client ID")
Expand Down
56 changes: 25 additions & 31 deletions libs/go-libs/publish/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,20 +40,11 @@ func NatsModule(url, serviceName string, natsOptions ...nats.Option) fx.Option {
}
return fx.Options(
fx.Provide(NewNatsConn),
fx.Provide(newNatsDefaultCallbacks),
fx.Provide(NewNatsDefaultCallbacks),
fx.Provide(NewNatsPublisherWithConn),
fx.Provide(NewNatsSubscriberWithConn),
fx.Provide(func(natsCallbacks NATSCallbacks) wNats.PublisherConfig {
natsOptions = append(natsOptions,
nats.ConnectHandler(natsCallbacks.ConnectedCB),
nats.DisconnectErrHandler(natsCallbacks.DisconnectedErrCB),
nats.DiscoveredServersHandler(natsCallbacks.DiscoveredServersCB),
nats.ErrorHandler(natsCallbacks.AsyncErrorCB),
nats.ReconnectHandler(natsCallbacks.ReconnectedCB),
nats.DisconnectHandler(natsCallbacks.DisconnectedCB),
nats.ClosedHandler(natsCallbacks.ClosedCB),
)

natsOptions = AppendCallBacks(natsOptions, natsCallbacks)
return wNats.PublisherConfig{
NatsOptions: natsOptions,
URL: url,
Expand All @@ -63,16 +54,7 @@ func NatsModule(url, serviceName string, natsOptions ...nats.Option) fx.Option {
}
}),
fx.Provide(func(natsCallbacks NATSCallbacks) wNats.SubscriberConfig {
natsOptions = append(natsOptions,
nats.ConnectHandler(natsCallbacks.ConnectedCB),
nats.DisconnectErrHandler(natsCallbacks.DisconnectedErrCB),
nats.DiscoveredServersHandler(natsCallbacks.DiscoveredServersCB),
nats.ErrorHandler(natsCallbacks.AsyncErrorCB),
nats.ReconnectHandler(natsCallbacks.ReconnectedCB),
nats.DisconnectHandler(natsCallbacks.DisconnectedCB),
nats.ClosedHandler(natsCallbacks.ClosedCB),
)

natsOptions = AppendCallBacks(natsOptions, natsCallbacks)
return wNats.SubscriberConfig{
NatsOptions: natsOptions,
Unmarshaler: &wNats.NATSMarshaler{},
Expand Down Expand Up @@ -106,43 +88,55 @@ type NATSCallbacks interface {
AsyncErrorCB(nc *nats.Conn, sub *nats.Subscription, err error)
}

type natsDefaultCallbacks struct {
func AppendCallBacks(natsOptions []nats.Option, c NATSCallbacks) []nats.Option {
return append(natsOptions,
nats.ConnectHandler(c.ConnectedCB),
nats.DisconnectErrHandler(c.DisconnectedErrCB),
nats.DiscoveredServersHandler(c.DiscoveredServersCB),
nats.ErrorHandler(c.AsyncErrorCB),
nats.ReconnectHandler(c.ReconnectedCB),
nats.DisconnectHandler(c.DisconnectedCB),
nats.ClosedHandler(c.ClosedCB),
)
}

type NatsDefaultCallbacks struct {
logger logging.Logger
shutdowner fx.Shutdowner
}

func newNatsDefaultCallbacks(logger logging.Logger, shutdowner fx.Shutdowner) NATSCallbacks {
return &natsDefaultCallbacks{
func NewNatsDefaultCallbacks(logger logging.Logger, shutdowner fx.Shutdowner) NATSCallbacks {
return &NatsDefaultCallbacks{
logger: logger,
shutdowner: shutdowner,
}
}

func (c *natsDefaultCallbacks) ClosedCB(nc *nats.Conn) {
func (c *NatsDefaultCallbacks) ClosedCB(nc *nats.Conn) {
c.logger.Infof("nats connection closed: %s", nc.Opts.Name)
c.shutdowner.Shutdown()
}

func (c *natsDefaultCallbacks) DisconnectedCB(nc *nats.Conn) {
func (c *NatsDefaultCallbacks) DisconnectedCB(nc *nats.Conn) {
c.logger.Infof("nats connection disconnected: %s", nc.Opts.Name)
}

func (c *natsDefaultCallbacks) DiscoveredServersCB(nc *nats.Conn) {
func (c *NatsDefaultCallbacks) DiscoveredServersCB(nc *nats.Conn) {
c.logger.Infof("nats server discovered: %s", nc.Opts.Name)
}

func (c *natsDefaultCallbacks) ReconnectedCB(nc *nats.Conn) {
func (c *NatsDefaultCallbacks) ReconnectedCB(nc *nats.Conn) {
c.logger.Infof("nats connection reconnected: %s", nc.Opts.Name)
}

func (c *natsDefaultCallbacks) DisconnectedErrCB(nc *nats.Conn, err error) {
func (c *NatsDefaultCallbacks) DisconnectedErrCB(nc *nats.Conn, err error) {
c.logger.Errorf("nats connection disconnected error for %s: %v", nc.Opts.Name, err)
}

func (c *natsDefaultCallbacks) ConnectedCB(nc *nats.Conn) {
func (c *NatsDefaultCallbacks) ConnectedCB(nc *nats.Conn) {
c.logger.Infof("nats connection done: %s", nc.Opts.Name)
}

func (c *natsDefaultCallbacks) AsyncErrorCB(nc *nats.Conn, sub *nats.Subscription, err error) {
func (c *NatsDefaultCallbacks) AsyncErrorCB(nc *nats.Conn, sub *nats.Subscription, err error) {
c.logger.Errorf("nats async error for %s with subject %s: %v", nc.Opts.Name, sub.Subject, err)
}

0 comments on commit 91365c3

Please sign in to comment.