From ad363ec1d5162b15aef88a404487a1aa2767608a Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Tue, 10 Sep 2024 15:36:31 -0700 Subject: [PATCH 1/2] Changes to better enable lifecycles --- p2p/host/blank/blank.go | 112 +++++++++++++++++++++++++++----------- p2p/protocol/ping/ping.go | 8 +++ 2 files changed, 87 insertions(+), 33 deletions(-) diff --git a/p2p/host/blank/blank.go b/p2p/host/blank/blank.go index 0cf6642f69..e87d27161e 100644 --- a/p2p/host/blank/blank.go +++ b/p2p/host/blank/blank.go @@ -24,20 +24,24 @@ import ( var log = logging.Logger("blankhost") -// BlankHost is the thinnest implementation of the host.Host interface +// BlankHost is a thin implementation of the host.Host interface type BlankHost struct { - n network.Network - mux *mstream.MultistreamMuxer[protocol.ID] - cmgr connmgr.ConnManager - eventbus event.Bus - emitters struct { + N network.Network + M *mstream.MultistreamMuxer[protocol.ID] + E event.Bus + ConnMgr connmgr.ConnManager + // SkipInitSignedRecord is a flag to skip the initialization of a signed record for the host + SkipInitSignedRecord bool + emitters struct { evtLocalProtocolsUpdated event.Emitter } + onStop []func() error } type config struct { - cmgr connmgr.ConnManager - eventBus event.Bus + cmgr connmgr.ConnManager + eventBus event.Bus + skipInitSignedRecord bool } type Option = func(cfg *config) @@ -54,6 +58,12 @@ func WithEventBus(eventBus event.Bus) Option { } } +func SkipInitSignedRecord(eventBus event.Bus) Option { + return func(cfg *config) { + cfg.skipInitSignedRecord = true + } +} + func NewBlankHost(n network.Network, options ...Option) *BlankHost { cfg := config{ cmgr: &connmgr.NullConnMgr{}, @@ -63,36 +73,72 @@ func NewBlankHost(n network.Network, options ...Option) *BlankHost { } bh := &BlankHost{ - n: n, - cmgr: cfg.cmgr, - mux: mstream.NewMultistreamMuxer[protocol.ID](), - eventbus: cfg.eventBus, + N: n, + ConnMgr: cfg.cmgr, + M: mstream.NewMultistreamMuxer[protocol.ID](), + E: cfg.eventBus, + + SkipInitSignedRecord: cfg.skipInitSignedRecord, + } + + if err := bh.Start(); err != nil { + log.Errorf("error creating blank host, err=%s", err) + return nil } - if bh.eventbus == nil { - bh.eventbus = eventbus.NewBus(eventbus.WithMetricsTracer(eventbus.NewMetricsTracer())) + + return bh +} + +func (bh *BlankHost) Start() error { + if bh.E == nil { + bh.E = eventbus.NewBus(eventbus.WithMetricsTracer(eventbus.NewMetricsTracer())) } // subscribe the connection manager to network notifications (has no effect with NullConnMgr) - n.Notify(bh.cmgr.Notifee()) + notifee := bh.ConnMgr.Notifee() + bh.N.Notify(notifee) + bh.onStop = append(bh.onStop, func() error { + bh.N.StopNotify(notifee) + return nil + }) var err error - if bh.emitters.evtLocalProtocolsUpdated, err = bh.eventbus.Emitter(&event.EvtLocalProtocolsUpdated{}); err != nil { - return nil + if bh.emitters.evtLocalProtocolsUpdated, err = bh.E.Emitter(&event.EvtLocalProtocolsUpdated{}); err != nil { + return err } + bh.onStop = append(bh.onStop, func() error { + bh.emitters.evtLocalProtocolsUpdated.Close() + return nil + }) - n.SetStreamHandler(bh.newStreamHandler) + bh.N.SetStreamHandler(bh.newStreamHandler) + bh.onStop = append(bh.onStop, func() error { + bh.N.SetStreamHandler(func(s network.Stream) { s.Reset() }) + return nil + }) // persist a signed peer record for self to the peerstore. - if err := bh.initSignedRecord(); err != nil { - log.Errorf("error creating blank host, err=%s", err) - return nil + if !bh.SkipInitSignedRecord { + if err := bh.initSignedRecord(); err != nil { + log.Errorf("error creating blank host, err=%s", err) + return err + } } - return bh + return nil +} + +func (bh *BlankHost) Stop() error { + var err error + for _, f := range bh.onStop { + err = errors.Join(err, f()) + } + bh.onStop = nil + return err } func (bh *BlankHost) initSignedRecord() error { - cab, ok := peerstore.GetCertifiedAddrBook(bh.n.Peerstore()) + cab, ok := peerstore.GetCertifiedAddrBook(bh.N.Peerstore()) if !ok { log.Error("peerstore does not support signed records") return errors.New("peerstore does not support signed records") @@ -114,7 +160,7 @@ func (bh *BlankHost) initSignedRecord() error { var _ host.Host = (*BlankHost)(nil) func (bh *BlankHost) Addrs() []ma.Multiaddr { - addrs, err := bh.n.InterfaceListenAddresses() + addrs, err := bh.N.InterfaceListenAddresses() if err != nil { log.Debug("error retrieving network interface addrs: ", err) return nil @@ -124,14 +170,14 @@ func (bh *BlankHost) Addrs() []ma.Multiaddr { } func (bh *BlankHost) Close() error { - return bh.n.Close() + return bh.N.Close() } func (bh *BlankHost) Connect(ctx context.Context, ai peer.AddrInfo) error { // absorb addresses into peerstore bh.Peerstore().AddAddrs(ai.ID, ai.Addrs, peerstore.TempAddrTTL) - cs := bh.n.ConnsToPeer(ai.ID) + cs := bh.N.ConnsToPeer(ai.ID) if len(cs) > 0 { return nil } @@ -144,15 +190,15 @@ func (bh *BlankHost) Connect(ctx context.Context, ai peer.AddrInfo) error { } func (bh *BlankHost) Peerstore() peerstore.Peerstore { - return bh.n.Peerstore() + return bh.N.Peerstore() } func (bh *BlankHost) ID() peer.ID { - return bh.n.LocalPeer() + return bh.N.LocalPeer() } func (bh *BlankHost) NewStream(ctx context.Context, p peer.ID, protos ...protocol.ID) (network.Stream, error) { - s, err := bh.n.NewStream(ctx, p) + s, err := bh.N.NewStream(ctx, p) if err != nil { return nil, fmt.Errorf("failed to open stream: %w", err) } @@ -216,18 +262,18 @@ func (bh *BlankHost) newStreamHandler(s network.Stream) { // TODO: i'm not sure this really needs to be here func (bh *BlankHost) Mux() protocol.Switch { - return bh.mux + return bh.M } // TODO: also not sure this fits... Might be better ways around this (leaky abstractions) func (bh *BlankHost) Network() network.Network { - return bh.n + return bh.N } func (bh *BlankHost) ConnManager() connmgr.ConnManager { - return bh.cmgr + return bh.ConnMgr } func (bh *BlankHost) EventBus() event.Bus { - return bh.eventbus + return bh.E } diff --git a/p2p/protocol/ping/ping.go b/p2p/protocol/ping/ping.go index a846f40499..c8d72e95d9 100644 --- a/p2p/protocol/ping/ping.go +++ b/p2p/protocol/ping/ping.go @@ -39,6 +39,14 @@ func NewPingService(h host.Host) *PingService { return ps } +func (p *PingService) Start() { + p.Host.SetStreamHandler(ID, p.PingHandler) +} + +func (p *PingService) Stop() { + p.Host.RemoveStreamHandler(ID) +} + func (p *PingService) PingHandler(s network.Stream) { if err := s.Scope().SetService(ServiceName); err != nil { log.Debugf("error attaching stream to ping service: %s", err) From 41af38d76bb8523c6b1ec81ce750f6726bfb6567 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Tue, 10 Sep 2024 15:36:42 -0700 Subject: [PATCH 2/2] Add Fx options --- fx/options.go | 362 +++++++++++++++++++++++++++++++++++++++++++++ fx/options_test.go | 235 +++++++++++++++++++++++++++++ fx/services.go | 25 ++++ 3 files changed, 622 insertions(+) create mode 100644 fx/options.go create mode 100644 fx/options_test.go create mode 100644 fx/services.go diff --git a/fx/options.go b/fx/options.go new file mode 100644 index 0000000000..3d89fd10a4 --- /dev/null +++ b/fx/options.go @@ -0,0 +1,362 @@ +package libp2pfx + +import ( + "context" + "crypto/rand" + "crypto/sha256" + "io" + "slices" + + "github.com/libp2p/go-libp2p/core/connmgr" + "github.com/libp2p/go-libp2p/core/crypto" + "github.com/libp2p/go-libp2p/core/event" + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/peerstore" + "github.com/libp2p/go-libp2p/core/pnet" + "github.com/libp2p/go-libp2p/core/protocol" + "github.com/libp2p/go-libp2p/core/sec" + "github.com/libp2p/go-libp2p/core/transport" + blankhost "github.com/libp2p/go-libp2p/p2p/host/blank" + "github.com/libp2p/go-libp2p/p2p/host/eventbus" + "github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem" + "github.com/libp2p/go-libp2p/p2p/muxer/yamux" + connmgrImpl "github.com/libp2p/go-libp2p/p2p/net/connmgr" + "github.com/libp2p/go-libp2p/p2p/net/swarm" + tptu "github.com/libp2p/go-libp2p/p2p/net/upgrader" + "github.com/libp2p/go-libp2p/p2p/security/noise" + tls "github.com/libp2p/go-libp2p/p2p/security/tls" + libp2pquic "github.com/libp2p/go-libp2p/p2p/transport/quic" + "github.com/libp2p/go-libp2p/p2p/transport/quicreuse" + "github.com/libp2p/go-libp2p/p2p/transport/tcp" + "github.com/multiformats/go-multiaddr" + mstream "github.com/multiformats/go-multistream" + "github.com/prometheus/client_golang/prometheus" + "github.com/quic-go/quic-go" + "go.uber.org/fx" + "golang.org/x/crypto/hkdf" +) + +type blankHostParams struct { + fx.In + Network network.Network + Mux *mstream.MultistreamMuxer[protocol.ID] + ConnMgr connmgr.ConnManager + EventBus event.Bus + Lifecycle fx.Lifecycle +} + +func BlankHost() fx.Option { + return fx.Provide(func(params blankHostParams) host.Host { + h := blankhost.BlankHost{ + N: params.Network, + M: params.Mux, + ConnMgr: params.ConnMgr, + E: params.EventBus, + // Users can do this manually, but can't opt out of it otherwise. + SkipInitSignedRecord: true, + } + params.Lifecycle.Append(fx.Hook{ + OnStart: func(context.Context) error { + return h.Start() + }, + OnStop: func(context.Context) error { + return h.Close() + }, + }) + return &h + }) +} + +type swarmParams struct { + fx.In + fx.Lifecycle + Local peer.ID + Peerstore peerstore.Peerstore + EventBus event.Bus + ListenAddrs []ListenAddr `group:"listenAddr"` +} + +func SwarmNetwork(opts ...swarm.Option) fx.Option { + return fx.Module("swarm", + fx.Provide(fx.Annotate(func(params swarmParams) (*swarm.Swarm, error) { + s, err := swarm.NewSwarm( + params.Local, + params.Peerstore, + params.EventBus, + opts..., + ) + if err != nil { + return nil, err + } + params.Lifecycle.Append(fx.StartStopHook( + func() error { + addrs := make([]multiaddr.Multiaddr, len(params.ListenAddrs)) + for i, a := range params.ListenAddrs { + addrs[i] = multiaddr.Multiaddr(a) + } + return s.Listen(addrs...) + }, + s.Close, + )) + return s, nil + }, fx.As(new(network.Network)), fx.As(fx.Self()))), + fx.Invoke( + fx.Annotate( + func(swrm *swarm.Swarm, tpts []transport.Transport) error { + for _, t := range tpts { + if err := swrm.AddTransport(t); err != nil { + return err + } + } + return nil + }, + fx.ParamTags("", `group:"transport"`), + )), + ) +} + +type peerIDRes struct { + fx.Out + Peer peer.ID + Key crypto.PrivKey +} + +func RandomPeerID() fx.Option { + return fx.Provide(func() (peerIDRes, error) { + priv, _, err := crypto.GenerateEd25519Key(rand.Reader) + if err != nil { + return peerIDRes{}, err + } + pid, err := peer.IDFromPrivateKey(priv) + if err != nil { + return peerIDRes{}, err + } + return peerIDRes{Peer: pid, Key: priv}, nil + }) +} + +var QUICTransport = fx.Provide( + fx.Annotate( + func(params struct { + fx.In + PrivKey crypto.PrivKey + QUICConnMgr *quicreuse.ConnManager + ConnGater connmgr.ConnectionGater + Rcmgr network.ResourceManager + }) (transport.Transport, error) { + return libp2pquic.NewTransport( + params.PrivKey, + params.QUICConnMgr, + nil, + params.ConnGater, + params.Rcmgr, + ) + }, + fx.ResultTags(`group:"transport"`), + ), +) + +func TCPTransport(opts ...tcp.Option) fx.Option { + return fx.Provide( + fx.Annotate( + func(p struct { + fx.In + Upgrader transport.Upgrader + Rcmgr network.ResourceManager + }) (transport.Transport, error) { + return tcp.NewTCPTransport(p.Upgrader, p.Rcmgr, opts...) + }, + fx.As(new(transport.Transport)), + fx.ResultTags(`group:"transport"`), + ), + ) +} + +func Upgrader(opts ...tptu.Option) fx.Option { + return fx.Provide( + func(p struct { + fx.In + Security []sec.SecureTransport + Muxers []tptu.StreamMuxer + Rcmgr network.ResourceManager + ConnGater connmgr.ConnectionGater + }) (transport.Upgrader, error) { + // Not supporting PSK here since it doesn't work on all transports, + // and there are better ways of authenticating peers. If you need + // PSK, provide the upgrader manually. + var psk pnet.PSK = nil + return tptu.New(p.Security, p.Muxers, psk, p.Rcmgr, p.ConnGater, opts...) + }, + ) +} + +// Security is a helper to provide a list of security transports in a specific order. +func Security(ss ...func() (protocol.ID, fx.Option)) fx.Option { + order := make(map[protocol.ID]int) + var opts []fx.Option + for i, s := range ss { + id, opt := s() + order[id] = i + opts = append(opts, opt) + } + opts = append(opts, + fx.Provide(fx.Annotate(func(unorderedSecurity []sec.SecureTransport) []sec.SecureTransport { + slices.SortFunc(unorderedSecurity, func(a, b sec.SecureTransport) int { + return order[a.ID()] - order[b.ID()] + }) + return unorderedSecurity + }, fx.ParamTags(`group:"unorderedSecurity"`))), + ) + return fx.Options(opts...) +} + +var Noise = func() (protocol.ID, fx.Option) { + return noise.ID, fx.Provide( + fx.Annotate( + func( + p struct { + fx.In + Privkey crypto.PrivKey + Muxers []tptu.StreamMuxer + }) (sec.SecureTransport, error) { + return noise.New(noise.ID, p.Privkey, p.Muxers) + }, + fx.ResultTags(`group:"unorderedSecurity"`), + ), + ) +} + +var TLS = func() (protocol.ID, fx.Option) { + return tls.ID, fx.Provide( + fx.Annotate( + func( + p struct { + fx.In + Privkey crypto.PrivKey + Muxers []tptu.StreamMuxer + }) (sec.SecureTransport, error) { + return tls.New(tls.ID, p.Privkey, p.Muxers) + }, + fx.ResultTags(`group:"unorderedSecurity"`), + ), + ) +} + +var Yamux = fx.Supply( + []tptu.StreamMuxer{{ + ID: yamux.ID, + Muxer: yamux.DefaultTransport, + }}, +) + +type MetricsConfig struct { + Disable bool + PrometheusRegister prometheus.Registerer +} + +var DisableMetrics = fx.Decorate(func(params struct { + fx.In + cfg *MetricsConfig `optional:"true"` +}) *MetricsConfig { + if params.cfg == nil { + params.cfg = new(MetricsConfig) + } + params.cfg.Disable = true + return params.cfg +}) + +var QUICReuseConnManager = fx.Provide( + func(metricsCfg MetricsConfig, key quic.StatelessResetKey, tokenGenerator quic.TokenGeneratorKey, lifecycle fx.Lifecycle) (*quicreuse.ConnManager, error) { + var opts []quicreuse.Option + if !metricsCfg.Disable { + opts = append(opts, quicreuse.EnableMetrics(metricsCfg.PrometheusRegister)) + } + cm, err := quicreuse.NewConnManager(key, tokenGenerator, opts...) + if err != nil { + return nil, err + } + lifecycle.Append(fx.StopHook(cm.Close)) + return cm, nil + }, + func(key crypto.PrivKey) (quic.StatelessResetKey, error) { + var statelessResetKey quic.StatelessResetKey + keyBytes, err := key.Raw() + if err != nil { + return statelessResetKey, err + } + + const statelessResetKeyInfo = "libp2p quic stateless reset key" + keyReader := hkdf.New(sha256.New, keyBytes, nil, []byte(statelessResetKeyInfo)) + if _, err := io.ReadFull(keyReader, statelessResetKey[:]); err != nil { + return statelessResetKey, err + } + return statelessResetKey, nil + }, + func(key crypto.PrivKey) (quic.TokenGeneratorKey, error) { + var tokenKey quic.TokenGeneratorKey + keyBytes, err := key.Raw() + if err != nil { + return tokenKey, err + } + + const tokenGeneratorKeyInfo = "libp2p quic token generator key" + keyReader := hkdf.New(sha256.New, keyBytes, nil, []byte(tokenGeneratorKeyInfo)) + if _, err := io.ReadFull(keyReader, tokenKey[:]); err != nil { + return tokenKey, err + } + return tokenKey, nil + }, +) + +func EventBus(opts ...eventbus.Option) fx.Option { + return fx.Supply(fx.Annotate(eventbus.NewBus(opts...), fx.As(new(event.Bus)))) +} + +func InMemoryPeerstore() fx.Option { + return fx.Provide(func() (peerstore.Peerstore, error) { + return pstoremem.NewPeerstore() + }) +} + +func ConnManager(low, hi int) fx.Option { + return fx.Provide(func() (connmgr.ConnManager, error) { + return connmgrImpl.NewConnManager(low, hi) + }) +} + +var DefaultConnManager = ConnManager(160, 192) + +var NullConnManager = fx.Provide(func() connmgr.ConnManager { + return connmgr.NullConnMgr{} +}) + +var NullResourceManager = fx.Provide(func() network.ResourceManager { + return &network.NullResourceManager{} +}) + +var NullConnectionGater = fx.Provide(func() connmgr.ConnectionGater { + return nil +}) + +var MultistreamMuxer = fx.Provide(func() *mstream.MultistreamMuxer[protocol.ID] { + return mstream.NewMultistreamMuxer[protocol.ID]() +}) + +// New type to specify that these are used for listening. +type ListenAddr multiaddr.Multiaddr + +func ListenAddrs(addrs ...multiaddr.Multiaddr) fx.Option { + return fx.Provide( + fx.Annotate( + func() []ListenAddr { + out := make([]ListenAddr, len(addrs)) + for i, a := range addrs { + out[i] = ListenAddr(a) + } + return out + }, + fx.ResultTags(`group:"listenAddr,flatten"`), + )) +} diff --git a/fx/options_test.go b/fx/options_test.go new file mode 100644 index 0000000000..c25628315e --- /dev/null +++ b/fx/options_test.go @@ -0,0 +1,235 @@ +package libp2pfx + +import ( + "context" + "io" + "testing" + + "github.com/libp2p/go-libp2p/core/event" + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/sec" + "github.com/libp2p/go-libp2p/p2p/protocol/identify" + "github.com/libp2p/go-libp2p/p2p/protocol/ping" + "github.com/multiformats/go-multiaddr" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/fx" +) + +func TestConstructHost(t *testing.T) { + app := fx.New( + BlankHost(), + SwarmNetwork(), + RandomPeerID(), + EventBus(), + InMemoryPeerstore(), + NullConnManager, + MultistreamMuxer, + fx.Invoke(func(h host.Host) { + require.NotNil(t, h) + }), + ) + require.NoError(t, app.Start(context.Background())) + defer assert.NoError(t, app.Stop(context.Background())) +} + +func echoTest(t *testing.T, h1, h2 host.Host) { + require.NoError(t, + h1.Connect(context.Background(), peer.AddrInfo{ + ID: h2.ID(), + Addrs: h2.Addrs(), + })) + require.NoError(t, + h2.Connect(context.Background(), peer.AddrInfo{ + ID: h1.ID(), + Addrs: h1.Addrs(), + })) + + h2.SetStreamHandler("/test", func(s network.Stream) { + defer s.Close() + io.Copy(s, s) + }) + + s, err := h1.NewStream(context.Background(), h2.ID(), "/test") + require.NoError(t, err) + + _, err = s.Write([]byte("hello")) + require.NoError(t, err) + require.NoError(t, s.CloseWrite()) + + b, err := io.ReadAll(s) + require.NoError(t, err) + require.Equal(t, "hello", string(b)) +} + +func TestQUICTransport(t *testing.T) { + newHost := func() host.Host { + var h host.Host + app := fx.New( + fx.NopLogger, + BlankHost(), + SwarmNetwork(), + RandomPeerID(), + EventBus(), + InMemoryPeerstore(), + MultistreamMuxer, + QUICTransport, + QUICReuseConnManager, + NullConnectionGater, + NullResourceManager, + NullConnManager, + fx.Supply(MetricsConfig{Disable: true}), + fx.Populate(&h), + ListenAddrs(multiaddr.StringCast("/ip4/127.0.0.1/udp/0/quic-v1")), + ) + + require.NoError(t, app.Start(context.Background())) + t.Cleanup(func() { app.Stop(context.Background()) }) + return h + } + + h1 := newHost() + h2 := newHost() + + echoTest(t, h1, h2) +} + +func TestTCPTransport(t *testing.T) { + newHost := func() host.Host { + var h host.Host + app := fx.New( + fx.NopLogger, + BlankHost(), + SwarmNetwork(), + RandomPeerID(), + EventBus(), + InMemoryPeerstore(), + MultistreamMuxer, + QUICReuseConnManager, + NullConnectionGater, + NullResourceManager, + NullConnManager, + fx.Supply(MetricsConfig{Disable: true}), + fx.Populate(&h), + Upgrader(), + TCPTransport(), + Yamux, + // TODO how to order? + Security( + TLS, + Noise, + ), + // Assert the security order is correct + ListenAddrs(multiaddr.StringCast("/ip4/127.0.0.1/tcp/0")), + ) + + require.NoError(t, app.Start(context.Background())) + t.Cleanup(func() { app.Stop(context.Background()) }) + return h + } + + h1 := newHost() + h2 := newHost() + t.Log(h1.Addrs(), h2.Addrs()) + + echoTest(t, h1, h2) +} + +func TestSecurityOrder(t *testing.T) { + app := fx.New( + RandomPeerID(), + Yamux, + Security( + TLS, + Noise, + ), + // Assert the security order is correct + fx.Invoke(func(security []sec.SecureTransport) { + ids := make([]string, len(security)) + for i, s := range security { + ids[i] = string(s.ID()) + } + assert.Equal(t, []string{"/tls/1.0.0", "/noise"}, ids) + }), + ) + require.NoError(t, app.Start(context.Background())) + require.NoError(t, app.Stop(context.Background())) +} + +func quicOptions() fx.Option { + return fx.Options( + fx.NopLogger, + BlankHost(), + SwarmNetwork(), + RandomPeerID(), + EventBus(), + InMemoryPeerstore(), + MultistreamMuxer, + QUICTransport, + QUICReuseConnManager, + NullConnectionGater, + NullResourceManager, + NullConnManager, + fx.Supply(MetricsConfig{Disable: true}), + ListenAddrs(multiaddr.StringCast("/ip4/127.0.0.1/udp/0/quic-v1")), + ) +} + +func TestPing(t *testing.T) { + newHost := func() (host.Host, *ping.PingService) { + var h host.Host + var s *ping.PingService + opts := quicOptions() + app := fx.New( + opts, + PingService, + fx.Populate(&h), + fx.Populate(&s), + ) + require.NoError(t, app.Start(context.Background())) + t.Cleanup(func() { assert.NoError(t, app.Stop(context.Background())) }) + return h, s + } + + h1, s := newHost() + h2, _ := newHost() + require.NoError(t, h1.Connect(context.Background(), peer.AddrInfo{h2.ID(), h2.Addrs()})) + res := <-s.Ping(context.Background(), h2.ID()) + require.NoError(t, res.Error) + t.Log(res.RTT) +} + +func TestIdentify(t *testing.T) { + newHost := func() (host.Host, identify.IDService, event.Bus) { + var h host.Host + var s identify.IDService + var eb event.Bus + opts := quicOptions() + app := fx.New( + opts, + IdentifyService(), + fx.Populate(&h, &s, &eb), + ) + require.NoError(t, app.Start(context.Background())) + t.Cleanup(func() { assert.NoError(t, app.Stop(context.Background())) }) + return h, s, eb + } + + h1, s, eb1 := newHost() + h2, _, eb2 := newHost() + sub1, err := eb1.Subscribe(new(event.EvtPeerIdentificationCompleted)) + require.NoError(t, err) + + sub2, err := eb2.Subscribe(new(event.EvtPeerIdentificationCompleted)) + require.NoError(t, err) + + require.NoError(t, h1.Connect(context.Background(), peer.AddrInfo{h2.ID(), h2.Addrs()})) + c := h1.Network().Conns()[0] + <-s.IdentifyWait(c) + res := (<-sub1.Out()).(event.EvtPeerIdentificationCompleted) + t.Log(res) + res = (<-sub2.Out()).(event.EvtPeerIdentificationCompleted) + t.Log(res) +} diff --git a/fx/services.go b/fx/services.go new file mode 100644 index 0000000000..d5d6916376 --- /dev/null +++ b/fx/services.go @@ -0,0 +1,25 @@ +package libp2pfx + +import ( + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/p2p/protocol/identify" + "github.com/libp2p/go-libp2p/p2p/protocol/ping" + "go.uber.org/fx" +) + +func IdentifyService(opts ...identify.Option) fx.Option { + return fx.Provide(func(l fx.Lifecycle, h host.Host) (identify.IDService, error) { + s, err := identify.NewIDService(h, opts...) + if err != nil { + return nil, err + } + l.Append(fx.StartStopHook(s.Start, s.Close)) + return s, nil + }) +} + +var PingService = fx.Provide(func(l fx.Lifecycle, h host.Host) *ping.PingService { + s := &ping.PingService{Host: h} + l.Append(fx.StartStopHook(s.Start, s.Stop)) + return s +})