Skip to content

Commit

Permalink
broadcastclient: check if compression was negotiated when connecting …
Browse files Browse the repository at this point in the history
…to feed server
  • Loading branch information
magicxyyz committed Nov 18, 2024
1 parent 36a7ab8 commit fb5886e
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 6 deletions.
30 changes: 25 additions & 5 deletions broadcastclient/broadcastclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,10 @@ type BroadcastClient struct {

chainId uint64

// Protects conn and shuttingDown
connMutex sync.Mutex
conn net.Conn
// Protects conn, shuttingDown and compression
connMutex sync.Mutex
conn net.Conn
compression bool

retryCount atomic.Int64

Expand Down Expand Up @@ -299,7 +300,7 @@ func (bc *BroadcastClient) connect(ctx context.Context, nextSeqNum arbutil.Messa
return nil, nil
}

conn, br, _, err := timeoutDialer.Dial(ctx, bc.websocketUrl)
conn, br, hs, err := timeoutDialer.Dial(ctx, bc.websocketUrl)
if errors.Is(err, ErrIncorrectFeedServerVersion) || errors.Is(err, ErrIncorrectChainId) {
return nil, err
}
Expand All @@ -325,6 +326,24 @@ func (bc *BroadcastClient) connect(ctx context.Context, nextSeqNum arbutil.Messa
return nil, ErrMissingFeedServerVersion
}

compressionNegotiated := false
for _, ext := range hs.Extensions {
if ext.Equal(deflateExt) {
compressionNegotiated = true
break
}
}
if !compressionNegotiated && config.EnableCompression {
log.Warn("Compression was not negotiated when connecting to feed server.")
}
if compressionNegotiated && !config.EnableCompression {
err := conn.Close()
if err != nil {
return nil, fmt.Errorf("error closing connection when negotiated disabled extension: %w", err)
}
return nil, errors.New("error dialing feed server: negotiated compression ws extension, but it is disabled")
}

var earlyFrameData io.Reader
if br != nil {
// Depending on how long the client takes to read the response, there may be
Expand All @@ -339,6 +358,7 @@ func (bc *BroadcastClient) connect(ctx context.Context, nextSeqNum arbutil.Messa

bc.connMutex.Lock()
bc.conn = conn
bc.compression = compressionNegotiated
bc.connMutex.Unlock()
log.Info("Feed connected", "feedServerVersion", feedServerVersion, "chainId", chainId, "requestedSeqNum", nextSeqNum)

Expand All @@ -362,7 +382,7 @@ func (bc *BroadcastClient) startBackgroundReader(earlyFrameData io.Reader) {
var op ws.OpCode
var err error
config := bc.config()
msg, op, err = wsbroadcastserver.ReadData(ctx, bc.conn, earlyFrameData, config.Timeout, ws.StateClientSide, config.EnableCompression, flateReader)
msg, op, err = wsbroadcastserver.ReadData(ctx, bc.conn, earlyFrameData, config.Timeout, ws.StateClientSide, bc.compression, flateReader)
if err != nil {
if bc.isShuttingDown() {
return
Expand Down
2 changes: 1 addition & 1 deletion wsbroadcastserver/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func ReadData(ctx context.Context, conn net.Conn, earlyFrameData io.Reader, time
var data []byte
if msg.IsCompressed() {
if !compression {
return nil, 0, errors.New("Received compressed frame even though compression is disabled")
return nil, 0, errors.New("Received compressed frame even though compression extension wasn't negotiated")
}
flateReader.Reset(&reader)
data, err = io.ReadAll(flateReader)
Expand Down

0 comments on commit fb5886e

Please sign in to comment.