From 0a16cde9fd85f3cfeb91c42e5e59c8141fdecca3 Mon Sep 17 00:00:00 2001 From: sukun Date: Mon, 16 Sep 2024 01:30:59 +0530 Subject: [PATCH] gc signed peer records too --- core/peerstore/peerstore.go | 55 +++++--------- p2p/host/peerstore/pstoremem/addr_book.go | 91 ++++++++++++----------- 2 files changed, 66 insertions(+), 80 deletions(-) diff --git a/core/peerstore/peerstore.go b/core/peerstore/peerstore.go index 0ef09df9fe..7f844f1335 100644 --- a/core/peerstore/peerstore.go +++ b/core/peerstore/peerstore.go @@ -122,17 +122,9 @@ type AddrBook interface { } // CertifiedAddrBook manages "self-certified" addresses for remote peers. -// Self-certified addresses are contained in peer.PeerRecords -// which are wrapped in a record.Envelope and signed by the peer -// to whom they belong. -// -// Certified addresses (CA) are generally more secure than uncertified -// addresses (UA). Consequently, CAs beat and displace UAs. When the -// peerstore learns CAs for a peer, it will reject UAs for the same peer -// (as long as the former haven't expired). -// Furthermore, peer records act like sequenced snapshots of CAs. Therefore, -// processing a peer record that's newer than the last one seen overwrites -// all addresses with the incoming ones. +// Self-certified addresses are contained in signed peer.PeerRecords. +// Certified addresses are generally more secure than uncertified +// addresses. // // This interface is most useful when combined with AddrBook. // To test whether a given AddrBook / Peerstore implementation supports @@ -143,36 +135,23 @@ type AddrBook interface { // cab.ConsumePeerRecord(signedPeerRecord, aTTL) // } type CertifiedAddrBook interface { - // ConsumePeerRecord adds addresses from a signed peer.PeerRecord (contained in - // a record.Envelope), which will expire after the given TTL. - // - // The 'accepted' return value indicates that the record was successfully processed - // and integrated into the CertifiedAddrBook state. If 'accepted' is false but no - // error is returned, it means that the record was ignored, most likely because - // a newer record exists for the same peer. - // - // Signed records added via this method will be stored without - // alteration as long as the address TTLs remain valid. The Envelopes - // containing the PeerRecords can be retrieved by calling GetPeerRecord(peerID). + // ConsumePeerRecord adds addresses from a signed peer.PeerRecord, which will expire when + // all addresses associated with the peer have expired. The addresses in provided signed + // peer.PeerRecord are expired after `ttl` duration. // - // If the signed PeerRecord belongs to a peer that already has certified - // addresses in the CertifiedAddrBook, the new addresses will replace the - // older ones, if the new record has a higher sequence number than the - // existing record. Attempting to add a peer record with a - // sequence number that's <= an existing record for the same peer will not - // result in an error, but the record will be ignored, and the 'accepted' - // bool return value will be false. + // The `accepted` return value indicates that the record was successfully processed. If + // `accepted` is false but no error is returned, it means that the record was ignored, most + // likely because a newer record exists for the same peer. // - // If the CertifiedAddrBook is also an AddrBook (which is most likely the case), - // adding certified addresses for a peer will *replace* any - // existing non-certified addresses for that peer, and only the certified - // addresses will be returned from AddrBook.Addrs thereafter. + // If the signed peer.PeerRecord belongs to a peer that already has certified addresses in + // the CertifiedAddrBook, and if the new record has a higher sequence number than the + // existing record, the new addresses will be added and the older ones will be kept + // unchanged. Attempting to add a peer record with a sequence number that's lower than an + // existing record will not result in an error, but the record will be ignored, and the + // `accepted` return value will be false. // - // Likewise, once certified addresses have been added for a given peer, - // any non-certified addresses added via AddrBook.AddAddrs or - // AddrBook.SetAddrs will be ignored. AddrBook.SetAddrs may still be used - // to update the TTL of certified addresses that have previously been - // added via ConsumePeerRecord. + // The Envelopes containing the PeerRecords can be retrieved by calling + // GetPeerRecord(peerID). ConsumePeerRecord(s *record.Envelope, ttl time.Duration) (accepted bool, err error) // GetPeerRecord returns an Envelope containing a PeerRecord for the diff --git a/p2p/host/peerstore/pstoremem/addr_book.go b/p2p/host/peerstore/pstoremem/addr_book.go index d7f5d8d6f7..8019828f3e 100644 --- a/p2p/host/peerstore/pstoremem/addr_book.go +++ b/p2p/host/peerstore/pstoremem/addr_book.go @@ -16,7 +16,7 @@ import ( ma "github.com/multiformats/go-multiaddr" ) -var SignedPeerRecordBound = 1_000 +var SignedPeerRecordBound = 100_000 var log = logging.Logger("peerstore") @@ -42,13 +42,13 @@ type peerRecordState struct { var _ heap.Interface = &peerAddrs{} type peerAddrs struct { - addrs map[peer.ID]map[string]*expiringAddr // peer.ID -> addr.Bytes() -> *expiringAddr + Addrs map[peer.ID]map[string]*expiringAddr // peer.ID -> addr.Bytes() -> *expiringAddr expiringHeap []*expiringAddr } func newPeerAddrs() peerAddrs { return peerAddrs{ - addrs: make(map[peer.ID]map[string]*expiringAddr), + Addrs: make(map[peer.ID]map[string]*expiringAddr), } } @@ -63,10 +63,10 @@ func (pa *peerAddrs) Swap(i, j int) { } func (pa *peerAddrs) Push(x any) { a := x.(*expiringAddr) - if _, ok := pa.addrs[a.Peer]; !ok { - pa.addrs[a.Peer] = make(map[string]*expiringAddr) + if _, ok := pa.Addrs[a.Peer]; !ok { + pa.Addrs[a.Peer] = make(map[string]*expiringAddr) } - pa.addrs[a.Peer][string(a.Addr.Bytes())] = a + pa.Addrs[a.Peer][string(a.Addr.Bytes())] = a a.heapIndex = len(pa.expiringHeap) pa.expiringHeap = append(pa.expiringHeap, a) } @@ -77,10 +77,10 @@ func (pa *peerAddrs) Pop() any { a.heapIndex = -1 pa.expiringHeap = old[0 : n-1] - if m, ok := pa.addrs[a.Peer]; ok { + if m, ok := pa.Addrs[a.Peer]; ok { delete(m, string(a.Addr.Bytes())) if len(m) == 0 { - delete(pa.addrs, a.Peer) + delete(pa.Addrs, a.Peer) } } @@ -94,16 +94,16 @@ func (pa *peerAddrs) Fix(a *expiringAddr) { func (pa *peerAddrs) Delete(a *expiringAddr) { heap.Remove(pa, a.heapIndex) a.heapIndex = -1 - if m, ok := pa.addrs[a.Peer]; ok { + if m, ok := pa.Addrs[a.Peer]; ok { delete(m, string(a.Addr.Bytes())) if len(m) == 0 { - delete(pa.addrs, a.Peer) + delete(pa.Addrs, a.Peer) } } } func (pa *peerAddrs) FindAddr(p peer.ID, addrBytes ma.Multiaddr) (*expiringAddr, bool) { - if m, ok := pa.addrs[p]; ok { + if m, ok := pa.Addrs[p]; ok { v, ok := m[string(addrBytes.Bytes())] return v, ok } @@ -117,10 +117,12 @@ func (pa *peerAddrs) NextExpiry() time.Time { return pa.expiringHeap[len(pa.expiringHeap)-1].Expires } -func (pa *peerAddrs) gc(now time.Time) { - for len(pa.expiringHeap) > 0 && now.After(pa.NextExpiry()) { - heap.Pop(pa) +func (pa *peerAddrs) PopIfExpired(now time.Time) (*expiringAddr, bool) { + if len(pa.expiringHeap) > 0 && now.After(pa.NextExpiry()) { + a := heap.Pop(pa) + return a.(*expiringAddr), true } + return nil, false } type clock interface { @@ -201,14 +203,20 @@ func (mab *memoryAddrBook) gc() { now := mab.clock.Now() mab.mu.Lock() defer mab.mu.Unlock() - mab.addrs.gc(now) + for { + ea, ok := mab.addrs.PopIfExpired(now) + if !ok { + return + } + mab.maybeDeleteSignedPeerRecordUnlocked(ea.Peer) + } } func (mab *memoryAddrBook) PeersWithAddrs() peer.IDSlice { mab.mu.RLock() defer mab.mu.RUnlock() - peers := make(peer.IDSlice, 0, len(mab.addrs.addrs)) - for pid := range mab.addrs.addrs { + peers := make(peer.IDSlice, 0, len(mab.addrs.Addrs)) + for pid := range mab.addrs.Addrs { peers = append(peers, pid) } return peers @@ -219,22 +227,15 @@ func (mab *memoryAddrBook) AddAddr(p peer.ID, addr ma.Multiaddr, ttl time.Durati mab.AddAddrs(p, []ma.Multiaddr{addr}, ttl) } -// AddAddrs gives memoryAddrBook addresses to use, with a given ttl -// (time-to-live), after which the address is no longer valid. +// AddAddrs adds `addrs` for peer `p`, which will expire after the given `ttl`. // This function never reduces the TTL or expiration of an address. func (mab *memoryAddrBook) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) { - // if we have a valid peer record, ignore unsigned addrs - // peerRec := mab.GetPeerRecord(p) - // if peerRec != nil { - // return - // } mab.addAddrs(p, addrs, ttl) } var ErrTooManyRecords = fmt.Errorf("too many signed peer records. Dropping this one") -// ConsumePeerRecord adds addresses from a signed peer.PeerRecord (contained in -// a record.Envelope), which will expire after the given TTL. +// ConsumePeerRecord adds addresses from a signed peer.PeerRecord, which will expire after the given TTL. // See https://godoc.org/github.com/libp2p/go-libp2p/core/peerstore#CertifiedAddrBook for more details. func (mab *memoryAddrBook) ConsumePeerRecord(recordEnvelope *record.Envelope, ttl time.Duration) (bool, error) { r, err := recordEnvelope.Record() @@ -249,13 +250,13 @@ func (mab *memoryAddrBook) ConsumePeerRecord(recordEnvelope *record.Envelope, tt return false, fmt.Errorf("signing key does not match PeerID in PeerRecord") } - // ensure seq is greater than, or equal to, the last received mab.mu.Lock() defer mab.mu.Unlock() if (len(mab.signedPeerRecords)) >= SignedPeerRecordBound { return false, ErrTooManyRecords } + // ensure seq is greater than or equal to the last received lastState, found := mab.signedPeerRecords[rec.PeerID] if found && lastState.Seq > rec.Seq { return false, nil @@ -268,6 +269,12 @@ func (mab *memoryAddrBook) ConsumePeerRecord(recordEnvelope *record.Envelope, tt return true, nil } +func (mab *memoryAddrBook) maybeDeleteSignedPeerRecordUnlocked(p peer.ID) { + if len(mab.addrs.Addrs[p]) == 0 { + delete(mab.signedPeerRecords, p) + } +} + func (mab *memoryAddrBook) addAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) { mab.mu.Lock() defer mab.mu.Unlock() @@ -276,6 +283,8 @@ func (mab *memoryAddrBook) addAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Du } func (mab *memoryAddrBook) addAddrsUnlocked(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) { + defer mab.maybeDeleteSignedPeerRecordUnlocked(p) + // if ttl is zero, exit. nothing to do. if ttl <= 0 { return @@ -293,8 +302,6 @@ func (mab *memoryAddrBook) addAddrsUnlocked(p peer.ID, addrs []ma.Multiaddr, ttl log.Warnf("Was passed p2p address with a different peerId. found: %s, expected: %s", addrPid, p) continue } - // find the highest TTL and Expiry time between - // existing records and function args a, found := mab.addrs.FindAddr(p, addr) if !found { // not found, announce it. @@ -330,6 +337,8 @@ func (mab *memoryAddrBook) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Du mab.mu.Lock() defer mab.mu.Unlock() + defer mab.maybeDeleteSignedPeerRecordUnlocked(p) + exp := mab.clock.Now().Add(ttl) for _, addr := range addrs { addr, addrPid := peer.SplitAddr(addr) @@ -343,7 +352,6 @@ func (mab *memoryAddrBook) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Du } if a, found := mab.addrs.FindAddr(p, addr); found { - // re-set all of them for new ttl. if ttl > 0 { a.Addr = addr a.Expires = exp @@ -367,9 +375,11 @@ func (mab *memoryAddrBook) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Du func (mab *memoryAddrBook) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL time.Duration) { mab.mu.Lock() defer mab.mu.Unlock() - exp := mab.clock.Now().Add(newTTL) - for _, a := range mab.addrs.addrs[p] { + defer mab.maybeDeleteSignedPeerRecordUnlocked(p) + + exp := mab.clock.Now().Add(newTTL) + for _, a := range mab.addrs.Addrs[p] { if oldTTL == a.TTL { if newTTL == 0 { mab.addrs.Delete(a) @@ -386,11 +396,10 @@ func (mab *memoryAddrBook) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL t func (mab *memoryAddrBook) Addrs(p peer.ID) []ma.Multiaddr { mab.mu.RLock() defer mab.mu.RUnlock() - - if _, ok := mab.addrs.addrs[p]; !ok { + if _, ok := mab.addrs.Addrs[p]; !ok { return nil } - return validAddrs(mab.clock.Now(), mab.addrs.addrs[p]) + return validAddrs(mab.clock.Now(), mab.addrs.Addrs[p]) } func validAddrs(now time.Time, amap map[string]*expiringAddr) []ma.Multiaddr { @@ -414,13 +423,11 @@ func (mab *memoryAddrBook) GetPeerRecord(p peer.ID) *record.Envelope { mab.mu.RLock() defer mab.mu.RUnlock() - if _, ok := mab.addrs.addrs[p]; !ok { + if _, ok := mab.addrs.Addrs[p]; !ok { return nil } - // although the signed record gets garbage collected when all addrs inside it are expired, - // we may be in between the expiration time and the GC interval - // so, we check to see if we have any valid signed addrs before returning the record - if len(validAddrs(mab.clock.Now(), mab.addrs.addrs[p])) == 0 { + // The record may have expired, but not gargage collected. + if len(validAddrs(mab.clock.Now(), mab.addrs.Addrs[p])) == 0 { return nil } @@ -437,7 +444,7 @@ func (mab *memoryAddrBook) ClearAddrs(p peer.ID) { defer mab.mu.Unlock() delete(mab.signedPeerRecords, p) - for _, a := range mab.addrs.addrs[p] { + for _, a := range mab.addrs.Addrs[p] { mab.addrs.Delete(a) } } @@ -448,7 +455,7 @@ func (mab *memoryAddrBook) AddrStream(ctx context.Context, p peer.ID) <-chan ma. var initial []ma.Multiaddr mab.mu.RLock() - if m, ok := mab.addrs.addrs[p]; ok { + if m, ok := mab.addrs.Addrs[p]; ok { initial = make([]ma.Multiaddr, 0, len(m)) for _, a := range m { initial = append(initial, a.Addr)