From 595ec23e5a49413d1744b7f3620a45cbd849ff42 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy <65323360+algorandskiy@users.noreply.github.com> Date: Thu, 8 Aug 2024 16:10:30 -0400 Subject: [PATCH] network: remove ws net proto 2.1 (#6081) --- network/msgCompressor.go | 55 +++++----------------------- network/msgCompressor_test.go | 39 +------------------- network/wsNetwork.go | 69 ++++++----------------------------- network/wsNetwork_test.go | 46 ++++++----------------- network/wsPeer.go | 16 ++++---- 5 files changed, 43 insertions(+), 182 deletions(-) diff --git a/network/msgCompressor.go b/network/msgCompressor.go index 831b509aef..61108cad99 100644 --- a/network/msgCompressor.go +++ b/network/msgCompressor.go @@ -31,28 +31,6 @@ var zstdCompressionMagic = [4]byte{0x28, 0xb5, 0x2f, 0xfd} const zstdCompressionLevel = zstd.BestSpeed -// checkCanCompress checks if there is an proposal payload message and peers supporting compression -func checkCanCompress(request broadcastRequest, peers []*wsPeer) bool { - canCompress := false - hasPP := false - for _, tag := range request.tags { - if tag == protocol.ProposalPayloadTag { - hasPP = true - break - } - } - // if have proposal payload check if there are any peers supporting compression - if hasPP { - for _, peer := range peers { - if peer.pfProposalCompressionSupported() { - canCompress = true - break - } - } - } - return canCompress -} - // zstdCompressMsg returns a concatenation of a tag and compressed data func zstdCompressMsg(tbytes []byte, d []byte) ([]byte, string) { bound := zstd.CompressBound(len(d)) @@ -89,13 +67,7 @@ type wsPeerMsgDataConverter struct { ppdec zstdProposalDecompressor } -type zstdProposalDecompressor struct { - active bool -} - -func (dec zstdProposalDecompressor) enabled() bool { - return dec.active -} +type zstdProposalDecompressor struct{} func (dec zstdProposalDecompressor) accept(data []byte) bool { return len(data) > 4 && bytes.Equal(data[:4], zstdCompressionMagic[:]) @@ -126,18 +98,16 @@ func (dec zstdProposalDecompressor) convert(data []byte) ([]byte, error) { func (c *wsPeerMsgDataConverter) convert(tag protocol.Tag, data []byte) ([]byte, error) { if tag == protocol.ProposalPayloadTag { - if c.ppdec.enabled() { - // sender might support compressed payload but fail to compress for whatever reason, - // in this case it sends non-compressed payload - the receiver decompress only if it is compressed. - if c.ppdec.accept(data) { - res, err := c.ppdec.convert(data) - if err != nil { - return nil, fmt.Errorf("peer %s: %w", c.origin, err) - } - return res, nil + // sender might support compressed payload but fail to compress for whatever reason, + // in this case it sends non-compressed payload - the receiver decompress only if it is compressed. + if c.ppdec.accept(data) { + res, err := c.ppdec.convert(data) + if err != nil { + return nil, fmt.Errorf("peer %s: %w", c.origin, err) } - c.log.Warnf("peer %s supported zstd but sent non-compressed data", c.origin) + return res, nil } + c.log.Warnf("peer %s supported zstd but sent non-compressed data", c.origin) } return data, nil } @@ -148,11 +118,6 @@ func makeWsPeerMsgDataConverter(wp *wsPeer) *wsPeerMsgDataConverter { origin: wp.originAddress, } - if wp.pfProposalCompressionSupported() { - c.ppdec = zstdProposalDecompressor{ - active: true, - } - } - + c.ppdec = zstdProposalDecompressor{} return &c } diff --git a/network/msgCompressor_test.go b/network/msgCompressor_test.go index 3b08b5fc0e..172cf05a98 100644 --- a/network/msgCompressor_test.go +++ b/network/msgCompressor_test.go @@ -48,37 +48,6 @@ func TestZstdDecompress(t *testing.T) { require.Nil(t, decompressed) } -func TestCheckCanCompress(t *testing.T) { - partitiontest.PartitionTest(t) - - req := broadcastRequest{} - peers := []*wsPeer{} - r := checkCanCompress(req, peers) - require.False(t, r) - - req.tags = []protocol.Tag{protocol.AgreementVoteTag} - r = checkCanCompress(req, peers) - require.False(t, r) - - req.tags = []protocol.Tag{protocol.AgreementVoteTag, protocol.ProposalPayloadTag} - r = checkCanCompress(req, peers) - require.False(t, r) - - peer1 := wsPeer{ - features: 0, - } - peers = []*wsPeer{&peer1} - r = checkCanCompress(req, peers) - require.False(t, r) - - peer2 := wsPeer{ - features: pfCompressedProposal, - } - peers = []*wsPeer{&peer1, &peer2} - r = checkCanCompress(req, peers) - require.True(t, r) -} - func TestZstdCompressMsg(t *testing.T) { partitiontest.PartitionTest(t) @@ -108,7 +77,7 @@ func TestWsPeerMsgDataConverterConvert(t *testing.T) { partitiontest.PartitionTest(t) c := wsPeerMsgDataConverter{} - c.ppdec = zstdProposalDecompressor{active: false} + c.ppdec = zstdProposalDecompressor{} tag := protocol.AgreementVoteTag data := []byte("data") @@ -117,13 +86,9 @@ func TestWsPeerMsgDataConverterConvert(t *testing.T) { require.Equal(t, data, r) tag = protocol.ProposalPayloadTag - r, err = c.convert(tag, data) - require.NoError(t, err) - require.Equal(t, data, r) - l := converterTestLogger{} c.log = &l - c.ppdec = zstdProposalDecompressor{active: true} + c.ppdec = zstdProposalDecompressor{} r, err = c.convert(tag, data) require.NoError(t, err) require.Equal(t, data, r) diff --git a/network/wsNetwork.go b/network/wsNetwork.go index 917d1b6e64..2af3a9b6bf 100644 --- a/network/wsNetwork.go +++ b/network/wsNetwork.go @@ -146,11 +146,6 @@ var peers = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_peers", De var incomingPeers = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_incoming_peers", Description: "Number of active incoming peers."}) var outgoingPeers = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_outgoing_peers", Description: "Number of active outgoing peers."}) -var networkPrioBatchesPPWithCompression = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_prio_batches_wpp_comp_sent_total", Description: "number of prio compressed batches with PP"}) -var networkPrioBatchesPPWithoutCompression = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_pp_prio_batches_wpp_non_comp_sent_total", Description: "number of prio non-compressed batches with PP"}) -var networkPrioPPCompressedSize = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_prio_pp_compressed_size_total", Description: "cumulative size of all compressed PP"}) -var networkPrioPPNonCompressedSize = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_prio_pp_non_compressed_size_total", Description: "cumulative size of all non-compressed PP"}) - // peerDisconnectionAckDuration defines the time we would wait for the peer disconnection to complete. const peerDisconnectionAckDuration = 5 * time.Second @@ -1062,6 +1057,7 @@ func (wn *WebsocketNetwork) ServeHTTP(response http.ResponseWriter, request *htt wn.setHeaders(responseHeader) responseHeader.Set(ProtocolVersionHeader, matchingVersion) responseHeader.Set(GenesisHeader, wn.GenesisID) + // set the features we support responseHeader.Set(PeerFeaturesHeader, PeerFeatureProposalCompression) var challenge string if wn.prioScheme != nil { @@ -1391,21 +1387,10 @@ func (wn *WebsocketNetwork) getPeersChangeCounter() int32 { } // preparePeerData prepares batches of data for sending. -// It performs optional zstd compression for proposal massages -func (wn *msgBroadcaster) preparePeerData(request broadcastRequest, prio bool, peers []*wsPeer) ([][]byte, [][]byte, []crypto.Digest, bool) { - // determine if there is a payload proposal and peers supporting compressed payloads - wantCompression := false - containsPrioPPTag := false - if prio { - wantCompression = checkCanCompress(request, peers) - } - +// It performs zstd compression for proposal massages if they this is a prio request and has proposal. +func (wn *msgBroadcaster) preparePeerData(request broadcastRequest, prio bool) ([][]byte, []crypto.Digest) { digests := make([]crypto.Digest, len(request.data)) data := make([][]byte, len(request.data)) - var dataCompressed [][]byte - if wantCompression { - dataCompressed = make([][]byte, len(request.data)) - } for i, d := range request.data { tbytes := []byte(request.tags[i]) mbytes := make([]byte, len(tbytes)+len(d)) @@ -1416,29 +1401,15 @@ func (wn *msgBroadcaster) preparePeerData(request broadcastRequest, prio bool, p digests[i] = crypto.Hash(mbytes) } - if prio { - if request.tags[i] == protocol.ProposalPayloadTag { - networkPrioPPNonCompressedSize.AddUint64(uint64(len(d)), nil) - containsPrioPPTag = true - } - } - - if wantCompression { - if request.tags[i] == protocol.ProposalPayloadTag { - compressed, logMsg := zstdCompressMsg(tbytes, d) - if len(logMsg) > 0 { - wn.log.Warn(logMsg) - } else { - networkPrioPPCompressedSize.AddUint64(uint64(len(compressed)), nil) - } - dataCompressed[i] = compressed - } else { - // otherwise reuse non-compressed from above - dataCompressed[i] = mbytes + if prio && request.tags[i] == protocol.ProposalPayloadTag { + compressed, logMsg := zstdCompressMsg(tbytes, d) + if len(logMsg) > 0 { + wn.log.Warn(logMsg) } + data[i] = compressed } } - return data, dataCompressed, digests, containsPrioPPTag + return data, digests } // prio is set if the broadcast is a high-priority broadcast. @@ -1455,7 +1426,7 @@ func (wn *msgBroadcaster) innerBroadcast(request broadcastRequest, prio bool, pe } start := time.Now() - data, dataWithCompression, digests, containsPrioPPTag := wn.preparePeerData(request, prio, peers) + data, digests := wn.preparePeerData(request, prio) // first send to all the easy outbound peers who don't block, get them started. sentMessageCount := 0 @@ -1466,23 +1437,7 @@ func (wn *msgBroadcaster) innerBroadcast(request broadcastRequest, prio bool, pe if peer == request.except { continue } - var ok bool - if peer.pfProposalCompressionSupported() && len(dataWithCompression) > 0 { - // if this peer supports compressed proposals and compressed data batch is filled out, use it - ok = peer.writeNonBlockMsgs(request.ctx, dataWithCompression, prio, digests, request.enqueueTime) - if prio { - if containsPrioPPTag { - networkPrioBatchesPPWithCompression.Inc(nil) - } - } - } else { - ok = peer.writeNonBlockMsgs(request.ctx, data, prio, digests, request.enqueueTime) - if prio { - if containsPrioPPTag { - networkPrioBatchesPPWithoutCompression.Inc(nil) - } - } - } + ok := peer.writeNonBlockMsgs(request.ctx, data, prio, digests, request.enqueueTime) if ok { sentMessageCount++ continue @@ -1951,7 +1906,7 @@ const ProtocolVersionHeader = "X-Algorand-Version" const ProtocolAcceptVersionHeader = "X-Algorand-Accept-Version" // SupportedProtocolVersions contains the list of supported protocol versions by this node ( in order of preference ). -var SupportedProtocolVersions = []string{"2.2", "2.1"} +var SupportedProtocolVersions = []string{"2.2"} // ProtocolVersion is the current version attached to the ProtocolVersionHeader header /* Version history: diff --git a/network/wsNetwork_test.go b/network/wsNetwork_test.go index 6af3a697fc..0128c28fc2 100644 --- a/network/wsNetwork_test.go +++ b/network/wsNetwork_test.go @@ -429,15 +429,12 @@ func TestWebsocketProposalPayloadCompression(t *testing.T) { } var tests []testDef = []testDef{ - // two old nodes - {[]string{"2.1"}, "2.1", []string{"2.1"}, "2.1"}, - // two new nodes with overwritten config {[]string{"2.2"}, "2.2", []string{"2.2"}, "2.2"}, // old node + new node {[]string{"2.1"}, "2.1", []string{"2.2", "2.1"}, "2.2"}, - {[]string{"2.2", "2.1"}, "2.2", []string{"2.1"}, "2.1"}, + {[]string{"2.2", "2.1"}, "2.1", []string{"2.2"}, "2.2"}, // combinations {[]string{"2.2", "2.1"}, "2.1", []string{"2.2", "2.1"}, "2.1"}, @@ -1101,7 +1098,7 @@ func TestDupFilter(t *testing.T) { defer netC.Stop() makeMsg := func(n int) []byte { - // We cannot harcode the msgSize to messageFilterSize + 1 because max allowed AV message is smaller than that. + // We cannot hardcode the msgSize to messageFilterSize + 1 because max allowed AV message is smaller than that. // We also cannot use maxSize for PP since it's a compressible tag but trying to compress random data will expand it. if messageFilterSize+1 < n { n = messageFilterSize + 1 @@ -1387,7 +1384,7 @@ func TestPeeringWithIdentityChallenge(t *testing.T) { assert.Equal(t, 0, len(netB.GetPeers(PeersConnectedOut))) // netA never attempts to set identity as it never sees a verified identity assert.Equal(t, 1, netA.identityTracker.(*mockIdentityTracker).getSetCount()) - // no connecton => netB does attepmt to add the identity to the tracker + // no connection => netB does attempt to add the identity to the tracker // and it would not end up being added assert.Equal(t, 1, netB.identityTracker.(*mockIdentityTracker).getSetCount()) assert.Equal(t, 1, netB.identityTracker.(*mockIdentityTracker).getInsertCount()) @@ -1608,7 +1605,7 @@ func TestPeeringReceiverIdentityChallengeOnly(t *testing.T) { assert.Equal(t, 0, netB.identityTracker.(*mockIdentityTracker).getSetCount()) } -// TestPeeringIncorrectDeduplicationName confirm that if the reciever can't match +// TestPeeringIncorrectDeduplicationName confirm that if the receiver can't match // the Address in the challenge to its PublicAddress, identities aren't exchanged, but peering continues func TestPeeringIncorrectDeduplicationName(t *testing.T) { partitiontest.PartitionTest(t) @@ -1665,7 +1662,7 @@ func TestPeeringIncorrectDeduplicationName(t *testing.T) { // bi-directional connection would now work since netB detects to be connected to netA in tryConnectReserveAddr, // so force it. - // this second connection should set identities, because the reciever address matches now + // this second connection should set identities, because the receiver address matches now _, ok = netB.tryConnectReserveAddr(addrA) assert.False(t, ok) netB.wg.Add(1) @@ -2504,9 +2501,9 @@ func TestWebsocketNetwork_checkServerResponseVariables(t *testing.T) { } func (wn *WebsocketNetwork) broadcastWithTimestamp(tag protocol.Tag, data []byte, when time.Time) error { - msgArr := make([][]byte, 1, 1) + msgArr := make([][]byte, 1) msgArr[0] = data - tagArr := make([]protocol.Tag, 1, 1) + tagArr := make([]protocol.Tag, 1) tagArr[0] = tag request := broadcastRequest{tags: tagArr, data: msgArr, enqueueTime: when, ctx: context.Background()} @@ -3711,48 +3708,29 @@ func TestPreparePeerData(t *testing.T) { data: [][]byte{[]byte("test"), []byte("data")}, } - peers := []*wsPeer{} wn := WebsocketNetwork{} - data, comp, digests, seenPrioPPTag := wn.broadcaster.preparePeerData(req, false, peers) + data, digests := wn.broadcaster.preparePeerData(req, false) require.NotEmpty(t, data) - require.Empty(t, comp) require.NotEmpty(t, digests) require.Equal(t, len(req.data), len(digests)) require.Equal(t, len(data), len(digests)) - require.False(t, seenPrioPPTag) for i := range data { require.Equal(t, append([]byte(req.tags[i]), req.data[i]...), data[i]) } - // compression - peer1 := wsPeer{ - features: 0, - } - peer2 := wsPeer{ - features: pfCompressedProposal, - } - peers = []*wsPeer{&peer1, &peer2} - data, comp, digests, seenPrioPPTag = wn.broadcaster.preparePeerData(req, true, peers) + data, digests = wn.broadcaster.preparePeerData(req, true) require.NotEmpty(t, data) - require.NotEmpty(t, comp) require.NotEmpty(t, digests) require.Equal(t, len(req.data), len(digests)) require.Equal(t, len(data), len(digests)) - require.Equal(t, len(comp), len(digests)) - require.True(t, seenPrioPPTag) for i := range data { - require.Equal(t, append([]byte(req.tags[i]), req.data[i]...), data[i]) - } - - for i := range comp { if req.tags[i] != protocol.ProposalPayloadTag { - require.Equal(t, append([]byte(req.tags[i]), req.data[i]...), comp[i]) - require.Equal(t, data[i], comp[i]) + require.Equal(t, append([]byte(req.tags[i]), req.data[i]...), data[i]) + require.Equal(t, data[i], data[i]) } else { - require.NotEqual(t, data[i], comp[i]) - require.Equal(t, append([]byte(req.tags[i]), zstdCompressionMagic[:]...), comp[i][:len(req.tags[i])+len(zstdCompressionMagic)]) + require.Equal(t, append([]byte(req.tags[i]), zstdCompressionMagic[:]...), data[i][:len(req.tags[i])+len(zstdCompressionMagic)]) } } } diff --git a/network/wsPeer.go b/network/wsPeer.go index 2b302f071f..88a0c615f9 100644 --- a/network/wsPeer.go +++ b/network/wsPeer.go @@ -746,7 +746,7 @@ func (wp *wsPeer) handleMessageOfInterest(msg IncomingMessage) (close bool, reas wp.log.Warnf("wsPeer handleMessageOfInterest: could not unmarshall message from: %s %v", wp.conn.RemoteAddrString(), err) return true, disconnectBadData } - msgs := make([]sendMessage, 1, 1) + msgs := make([]sendMessage, 1) msgs[0] = sendMessage{ data: nil, enqueued: time.Now(), @@ -911,8 +911,8 @@ func (wp *wsPeer) writeLoopCleanup(reason disconnectReason) { } func (wp *wsPeer) writeNonBlock(ctx context.Context, data []byte, highPrio bool, digest crypto.Digest, msgEnqueueTime time.Time) bool { - msgs := make([][]byte, 1, 1) - digests := make([]crypto.Digest, 1, 1) + msgs := make([][]byte, 1) + digests := make([]crypto.Digest, 1) msgs[0] = data digests[0] = digest return wp.writeNonBlockMsgs(ctx, msgs, highPrio, digests, msgEnqueueTime) @@ -1090,7 +1090,7 @@ func (wp *wsPeer) Request(ctx context.Context, tag Tag, topics Topics) (resp *Re defer wp.getAndRemoveResponseChannel(hash) // Send serializedMsg - msg := make([]sendMessage, 1, 1) + msg := make([]sendMessage, 1) msg[0] = sendMessage{ data: append([]byte(tag), serializedMsg...), enqueued: time.Now(), @@ -1166,10 +1166,6 @@ func (wp *wsPeer) sendMessagesOfInterest(messagesOfInterestGeneration uint32, me } } -func (wp *wsPeer) pfProposalCompressionSupported() bool { - return wp.features&pfCompressedProposal != 0 -} - func (wp *wsPeer) OnClose(f func()) { if wp.closers == nil { wp.closers = []func(){} @@ -1180,7 +1176,9 @@ func (wp *wsPeer) OnClose(f func()) { //msgp:ignore peerFeatureFlag type peerFeatureFlag int -const pfCompressedProposal peerFeatureFlag = 1 +const ( + pfCompressedProposal peerFeatureFlag = 1 << iota +) // versionPeerFeatures defines protocol version when peer features were introduced const versionPeerFeatures = "2.2"