Skip to content

Commit

Permalink
network: remove ws net proto 2.1 (#6081)
Browse files Browse the repository at this point in the history
  • Loading branch information
algorandskiy authored Aug 8, 2024
1 parent 602d950 commit 595ec23
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 182 deletions.
55 changes: 10 additions & 45 deletions network/msgCompressor.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,28 +31,6 @@ var zstdCompressionMagic = [4]byte{0x28, 0xb5, 0x2f, 0xfd}

const zstdCompressionLevel = zstd.BestSpeed

// checkCanCompress checks if there is an proposal payload message and peers supporting compression
func checkCanCompress(request broadcastRequest, peers []*wsPeer) bool {
canCompress := false
hasPP := false
for _, tag := range request.tags {
if tag == protocol.ProposalPayloadTag {
hasPP = true
break
}
}
// if have proposal payload check if there are any peers supporting compression
if hasPP {
for _, peer := range peers {
if peer.pfProposalCompressionSupported() {
canCompress = true
break
}
}
}
return canCompress
}

// zstdCompressMsg returns a concatenation of a tag and compressed data
func zstdCompressMsg(tbytes []byte, d []byte) ([]byte, string) {
bound := zstd.CompressBound(len(d))
Expand Down Expand Up @@ -89,13 +67,7 @@ type wsPeerMsgDataConverter struct {
ppdec zstdProposalDecompressor
}

type zstdProposalDecompressor struct {
active bool
}

func (dec zstdProposalDecompressor) enabled() bool {
return dec.active
}
type zstdProposalDecompressor struct{}

func (dec zstdProposalDecompressor) accept(data []byte) bool {
return len(data) > 4 && bytes.Equal(data[:4], zstdCompressionMagic[:])
Expand Down Expand Up @@ -126,18 +98,16 @@ func (dec zstdProposalDecompressor) convert(data []byte) ([]byte, error) {

func (c *wsPeerMsgDataConverter) convert(tag protocol.Tag, data []byte) ([]byte, error) {
if tag == protocol.ProposalPayloadTag {
if c.ppdec.enabled() {
// sender might support compressed payload but fail to compress for whatever reason,
// in this case it sends non-compressed payload - the receiver decompress only if it is compressed.
if c.ppdec.accept(data) {
res, err := c.ppdec.convert(data)
if err != nil {
return nil, fmt.Errorf("peer %s: %w", c.origin, err)
}
return res, nil
// sender might support compressed payload but fail to compress for whatever reason,
// in this case it sends non-compressed payload - the receiver decompress only if it is compressed.
if c.ppdec.accept(data) {
res, err := c.ppdec.convert(data)
if err != nil {
return nil, fmt.Errorf("peer %s: %w", c.origin, err)
}
c.log.Warnf("peer %s supported zstd but sent non-compressed data", c.origin)
return res, nil
}
c.log.Warnf("peer %s supported zstd but sent non-compressed data", c.origin)
}
return data, nil
}
Expand All @@ -148,11 +118,6 @@ func makeWsPeerMsgDataConverter(wp *wsPeer) *wsPeerMsgDataConverter {
origin: wp.originAddress,
}

if wp.pfProposalCompressionSupported() {
c.ppdec = zstdProposalDecompressor{
active: true,
}
}

c.ppdec = zstdProposalDecompressor{}
return &c
}
39 changes: 2 additions & 37 deletions network/msgCompressor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,37 +48,6 @@ func TestZstdDecompress(t *testing.T) {
require.Nil(t, decompressed)
}

func TestCheckCanCompress(t *testing.T) {
partitiontest.PartitionTest(t)

req := broadcastRequest{}
peers := []*wsPeer{}
r := checkCanCompress(req, peers)
require.False(t, r)

req.tags = []protocol.Tag{protocol.AgreementVoteTag}
r = checkCanCompress(req, peers)
require.False(t, r)

req.tags = []protocol.Tag{protocol.AgreementVoteTag, protocol.ProposalPayloadTag}
r = checkCanCompress(req, peers)
require.False(t, r)

peer1 := wsPeer{
features: 0,
}
peers = []*wsPeer{&peer1}
r = checkCanCompress(req, peers)
require.False(t, r)

peer2 := wsPeer{
features: pfCompressedProposal,
}
peers = []*wsPeer{&peer1, &peer2}
r = checkCanCompress(req, peers)
require.True(t, r)
}

func TestZstdCompressMsg(t *testing.T) {
partitiontest.PartitionTest(t)

Expand Down Expand Up @@ -108,7 +77,7 @@ func TestWsPeerMsgDataConverterConvert(t *testing.T) {
partitiontest.PartitionTest(t)

c := wsPeerMsgDataConverter{}
c.ppdec = zstdProposalDecompressor{active: false}
c.ppdec = zstdProposalDecompressor{}
tag := protocol.AgreementVoteTag
data := []byte("data")

Expand All @@ -117,13 +86,9 @@ func TestWsPeerMsgDataConverterConvert(t *testing.T) {
require.Equal(t, data, r)

tag = protocol.ProposalPayloadTag
r, err = c.convert(tag, data)
require.NoError(t, err)
require.Equal(t, data, r)

l := converterTestLogger{}
c.log = &l
c.ppdec = zstdProposalDecompressor{active: true}
c.ppdec = zstdProposalDecompressor{}
r, err = c.convert(tag, data)
require.NoError(t, err)
require.Equal(t, data, r)
Expand Down
69 changes: 12 additions & 57 deletions network/wsNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,6 @@ var peers = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_peers", De
var incomingPeers = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_incoming_peers", Description: "Number of active incoming peers."})
var outgoingPeers = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_outgoing_peers", Description: "Number of active outgoing peers."})

var networkPrioBatchesPPWithCompression = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_prio_batches_wpp_comp_sent_total", Description: "number of prio compressed batches with PP"})
var networkPrioBatchesPPWithoutCompression = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_pp_prio_batches_wpp_non_comp_sent_total", Description: "number of prio non-compressed batches with PP"})
var networkPrioPPCompressedSize = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_prio_pp_compressed_size_total", Description: "cumulative size of all compressed PP"})
var networkPrioPPNonCompressedSize = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_prio_pp_non_compressed_size_total", Description: "cumulative size of all non-compressed PP"})

// peerDisconnectionAckDuration defines the time we would wait for the peer disconnection to complete.
const peerDisconnectionAckDuration = 5 * time.Second

Expand Down Expand Up @@ -1062,6 +1057,7 @@ func (wn *WebsocketNetwork) ServeHTTP(response http.ResponseWriter, request *htt
wn.setHeaders(responseHeader)
responseHeader.Set(ProtocolVersionHeader, matchingVersion)
responseHeader.Set(GenesisHeader, wn.GenesisID)
// set the features we support
responseHeader.Set(PeerFeaturesHeader, PeerFeatureProposalCompression)
var challenge string
if wn.prioScheme != nil {
Expand Down Expand Up @@ -1391,21 +1387,10 @@ func (wn *WebsocketNetwork) getPeersChangeCounter() int32 {
}

// preparePeerData prepares batches of data for sending.
// It performs optional zstd compression for proposal massages
func (wn *msgBroadcaster) preparePeerData(request broadcastRequest, prio bool, peers []*wsPeer) ([][]byte, [][]byte, []crypto.Digest, bool) {
// determine if there is a payload proposal and peers supporting compressed payloads
wantCompression := false
containsPrioPPTag := false
if prio {
wantCompression = checkCanCompress(request, peers)
}

// It performs zstd compression for proposal massages if they this is a prio request and has proposal.
func (wn *msgBroadcaster) preparePeerData(request broadcastRequest, prio bool) ([][]byte, []crypto.Digest) {
digests := make([]crypto.Digest, len(request.data))
data := make([][]byte, len(request.data))
var dataCompressed [][]byte
if wantCompression {
dataCompressed = make([][]byte, len(request.data))
}
for i, d := range request.data {
tbytes := []byte(request.tags[i])
mbytes := make([]byte, len(tbytes)+len(d))
Expand All @@ -1416,29 +1401,15 @@ func (wn *msgBroadcaster) preparePeerData(request broadcastRequest, prio bool, p
digests[i] = crypto.Hash(mbytes)
}

if prio {
if request.tags[i] == protocol.ProposalPayloadTag {
networkPrioPPNonCompressedSize.AddUint64(uint64(len(d)), nil)
containsPrioPPTag = true
}
}

if wantCompression {
if request.tags[i] == protocol.ProposalPayloadTag {
compressed, logMsg := zstdCompressMsg(tbytes, d)
if len(logMsg) > 0 {
wn.log.Warn(logMsg)
} else {
networkPrioPPCompressedSize.AddUint64(uint64(len(compressed)), nil)
}
dataCompressed[i] = compressed
} else {
// otherwise reuse non-compressed from above
dataCompressed[i] = mbytes
if prio && request.tags[i] == protocol.ProposalPayloadTag {
compressed, logMsg := zstdCompressMsg(tbytes, d)
if len(logMsg) > 0 {
wn.log.Warn(logMsg)
}
data[i] = compressed
}
}
return data, dataCompressed, digests, containsPrioPPTag
return data, digests
}

// prio is set if the broadcast is a high-priority broadcast.
Expand All @@ -1455,7 +1426,7 @@ func (wn *msgBroadcaster) innerBroadcast(request broadcastRequest, prio bool, pe
}

start := time.Now()
data, dataWithCompression, digests, containsPrioPPTag := wn.preparePeerData(request, prio, peers)
data, digests := wn.preparePeerData(request, prio)

// first send to all the easy outbound peers who don't block, get them started.
sentMessageCount := 0
Expand All @@ -1466,23 +1437,7 @@ func (wn *msgBroadcaster) innerBroadcast(request broadcastRequest, prio bool, pe
if peer == request.except {
continue
}
var ok bool
if peer.pfProposalCompressionSupported() && len(dataWithCompression) > 0 {
// if this peer supports compressed proposals and compressed data batch is filled out, use it
ok = peer.writeNonBlockMsgs(request.ctx, dataWithCompression, prio, digests, request.enqueueTime)
if prio {
if containsPrioPPTag {
networkPrioBatchesPPWithCompression.Inc(nil)
}
}
} else {
ok = peer.writeNonBlockMsgs(request.ctx, data, prio, digests, request.enqueueTime)
if prio {
if containsPrioPPTag {
networkPrioBatchesPPWithoutCompression.Inc(nil)
}
}
}
ok := peer.writeNonBlockMsgs(request.ctx, data, prio, digests, request.enqueueTime)
if ok {
sentMessageCount++
continue
Expand Down Expand Up @@ -1951,7 +1906,7 @@ const ProtocolVersionHeader = "X-Algorand-Version"
const ProtocolAcceptVersionHeader = "X-Algorand-Accept-Version"

// SupportedProtocolVersions contains the list of supported protocol versions by this node ( in order of preference ).
var SupportedProtocolVersions = []string{"2.2", "2.1"}
var SupportedProtocolVersions = []string{"2.2"}

// ProtocolVersion is the current version attached to the ProtocolVersionHeader header
/* Version history:
Expand Down
46 changes: 12 additions & 34 deletions network/wsNetwork_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,15 +429,12 @@ func TestWebsocketProposalPayloadCompression(t *testing.T) {
}

var tests []testDef = []testDef{
// two old nodes
{[]string{"2.1"}, "2.1", []string{"2.1"}, "2.1"},

// two new nodes with overwritten config
{[]string{"2.2"}, "2.2", []string{"2.2"}, "2.2"},

// old node + new node
{[]string{"2.1"}, "2.1", []string{"2.2", "2.1"}, "2.2"},
{[]string{"2.2", "2.1"}, "2.2", []string{"2.1"}, "2.1"},
{[]string{"2.2", "2.1"}, "2.1", []string{"2.2"}, "2.2"},

// combinations
{[]string{"2.2", "2.1"}, "2.1", []string{"2.2", "2.1"}, "2.1"},
Expand Down Expand Up @@ -1101,7 +1098,7 @@ func TestDupFilter(t *testing.T) {
defer netC.Stop()

makeMsg := func(n int) []byte {
// We cannot harcode the msgSize to messageFilterSize + 1 because max allowed AV message is smaller than that.
// We cannot hardcode the msgSize to messageFilterSize + 1 because max allowed AV message is smaller than that.
// We also cannot use maxSize for PP since it's a compressible tag but trying to compress random data will expand it.
if messageFilterSize+1 < n {
n = messageFilterSize + 1
Expand Down Expand Up @@ -1387,7 +1384,7 @@ func TestPeeringWithIdentityChallenge(t *testing.T) {
assert.Equal(t, 0, len(netB.GetPeers(PeersConnectedOut)))
// netA never attempts to set identity as it never sees a verified identity
assert.Equal(t, 1, netA.identityTracker.(*mockIdentityTracker).getSetCount())
// no connecton => netB does attepmt to add the identity to the tracker
// no connection => netB does attempt to add the identity to the tracker
// and it would not end up being added
assert.Equal(t, 1, netB.identityTracker.(*mockIdentityTracker).getSetCount())
assert.Equal(t, 1, netB.identityTracker.(*mockIdentityTracker).getInsertCount())
Expand Down Expand Up @@ -1608,7 +1605,7 @@ func TestPeeringReceiverIdentityChallengeOnly(t *testing.T) {
assert.Equal(t, 0, netB.identityTracker.(*mockIdentityTracker).getSetCount())
}

// TestPeeringIncorrectDeduplicationName confirm that if the reciever can't match
// TestPeeringIncorrectDeduplicationName confirm that if the receiver can't match
// the Address in the challenge to its PublicAddress, identities aren't exchanged, but peering continues
func TestPeeringIncorrectDeduplicationName(t *testing.T) {
partitiontest.PartitionTest(t)
Expand Down Expand Up @@ -1665,7 +1662,7 @@ func TestPeeringIncorrectDeduplicationName(t *testing.T) {

// bi-directional connection would now work since netB detects to be connected to netA in tryConnectReserveAddr,
// so force it.
// this second connection should set identities, because the reciever address matches now
// this second connection should set identities, because the receiver address matches now
_, ok = netB.tryConnectReserveAddr(addrA)
assert.False(t, ok)
netB.wg.Add(1)
Expand Down Expand Up @@ -2504,9 +2501,9 @@ func TestWebsocketNetwork_checkServerResponseVariables(t *testing.T) {
}

func (wn *WebsocketNetwork) broadcastWithTimestamp(tag protocol.Tag, data []byte, when time.Time) error {
msgArr := make([][]byte, 1, 1)
msgArr := make([][]byte, 1)
msgArr[0] = data
tagArr := make([]protocol.Tag, 1, 1)
tagArr := make([]protocol.Tag, 1)
tagArr[0] = tag
request := broadcastRequest{tags: tagArr, data: msgArr, enqueueTime: when, ctx: context.Background()}

Expand Down Expand Up @@ -3711,48 +3708,29 @@ func TestPreparePeerData(t *testing.T) {
data: [][]byte{[]byte("test"), []byte("data")},
}

peers := []*wsPeer{}
wn := WebsocketNetwork{}
data, comp, digests, seenPrioPPTag := wn.broadcaster.preparePeerData(req, false, peers)
data, digests := wn.broadcaster.preparePeerData(req, false)
require.NotEmpty(t, data)
require.Empty(t, comp)
require.NotEmpty(t, digests)
require.Equal(t, len(req.data), len(digests))
require.Equal(t, len(data), len(digests))
require.False(t, seenPrioPPTag)

for i := range data {
require.Equal(t, append([]byte(req.tags[i]), req.data[i]...), data[i])
}

// compression
peer1 := wsPeer{
features: 0,
}
peer2 := wsPeer{
features: pfCompressedProposal,
}
peers = []*wsPeer{&peer1, &peer2}
data, comp, digests, seenPrioPPTag = wn.broadcaster.preparePeerData(req, true, peers)
data, digests = wn.broadcaster.preparePeerData(req, true)
require.NotEmpty(t, data)
require.NotEmpty(t, comp)
require.NotEmpty(t, digests)
require.Equal(t, len(req.data), len(digests))
require.Equal(t, len(data), len(digests))
require.Equal(t, len(comp), len(digests))
require.True(t, seenPrioPPTag)

for i := range data {
require.Equal(t, append([]byte(req.tags[i]), req.data[i]...), data[i])
}

for i := range comp {
if req.tags[i] != protocol.ProposalPayloadTag {
require.Equal(t, append([]byte(req.tags[i]), req.data[i]...), comp[i])
require.Equal(t, data[i], comp[i])
require.Equal(t, append([]byte(req.tags[i]), req.data[i]...), data[i])
require.Equal(t, data[i], data[i])
} else {
require.NotEqual(t, data[i], comp[i])
require.Equal(t, append([]byte(req.tags[i]), zstdCompressionMagic[:]...), comp[i][:len(req.tags[i])+len(zstdCompressionMagic)])
require.Equal(t, append([]byte(req.tags[i]), zstdCompressionMagic[:]...), data[i][:len(req.tags[i])+len(zstdCompressionMagic)])
}
}
}
Expand Down
Loading

0 comments on commit 595ec23

Please sign in to comment.