Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - p2p: server: adjust deadline during long reads and writes #5463

Closed
wants to merge 9 commits into from
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ configuration is as follows:
* [#5467](https://github.com/spacemeshos/go-spacemesh/pull/5467)
Fix a bug that could cause ATX sync to stall because the limit of concurrent requests for dependencies was exhausted.
Fetching dependencies of an ATX is not limited anymore.
* [#5463](https://github.com/spacemeshos/go-spacemesh/pull/5463)
Adjust deadline during long reads and writes, reducing "i/o deadline exceeded" errors.

## Release v1.3.3

Expand Down
3 changes: 3 additions & 0 deletions fetch/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ type Config struct {
BatchSize, QueueSize int
MaxRetriesForRequest int
RequestTimeout time.Duration `mapstructure:"request-timeout"`
RequestHardTimeout time.Duration `mapstructure:"request-hard-timeout"`
EnableServerMetrics bool `mapstructure:"servers-metrics"`
ServersConfig map[string]ServerConfig `mapstructure:"servers"`
PeersRateThreshold float64 `mapstructure:"peers-rate-threshold"`
Expand All @@ -127,6 +128,7 @@ func DefaultConfig() Config {
QueueSize: 20,
BatchSize: 10,
RequestTimeout: 25 * time.Second,
RequestHardTimeout: 5 * time.Minute,
MaxRetriesForRequest: 100,
ServersConfig: map[string]ServerConfig{
// serves 1 MB of data
Expand Down Expand Up @@ -287,6 +289,7 @@ func (f *Fetch) registerServer(
) {
opts := []server.Opt{
server.WithTimeout(f.cfg.RequestTimeout),
server.WithHardTimeout(f.cfg.RequestHardTimeout),
server.WithLog(f.logger),
}
if f.cfg.EnableServerMetrics {
Expand Down
4 changes: 3 additions & 1 deletion fetch/fetch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func createFetch(tb testing.TB) *testFetch {
BatchSize: 3,
QueueSize: 1000,
RequestTimeout: 3 * time.Second,
RequestHardTimeout: 10 * time.Second,
MaxRetriesForRequest: 3,
GetAtxsConcurrency: DefaultConfig().GetAtxsConcurrency,
}
Expand Down Expand Up @@ -335,7 +336,8 @@ func TestFetch_PeerDroppedWhenMessageResultsInValidationReject(t *testing.T) {
BatchTimeout: 2000 * time.Minute, // make sure we never hit the batch timeout
BatchSize: 3,
QueueSize: 1000,
RequestTimeout: time.Second * time.Duration(3),
RequestTimeout: 3 * time.Second,
RequestHardTimeout: 10 * time.Second,
MaxRetriesForRequest: 3,
}
p2pconf := p2p.DefaultConfig()
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ require (
github.com/libp2p/go-libp2p-pubsub v0.10.0
github.com/libp2p/go-libp2p-record v0.2.0
github.com/libp2p/go-msgio v0.3.0
github.com/libp2p/go-yamux/v4 v4.0.1
github.com/mitchellh/mapstructure v1.5.0
github.com/multiformats/go-multiaddr v0.12.1
github.com/multiformats/go-varint v0.0.7
Expand Down Expand Up @@ -144,7 +145,6 @@ require (
github.com/libp2p/go-nat v0.2.0 // indirect
github.com/libp2p/go-netroute v0.2.1 // indirect
github.com/libp2p/go-reuseport v0.4.0 // indirect
github.com/libp2p/go-yamux/v4 v4.0.1 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd // indirect
Expand Down
5 changes: 3 additions & 2 deletions node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1320,9 +1320,10 @@ func getTestDefaultConfig(tb testing.TB) *config.Config {
cfg.DataDirParent = tmp
cfg.FileLock = filepath.Join(tmp, "LOCK")

cfg.FETCH.RequestTimeout = 10
cfg.FETCH.RequestTimeout = 10 * time.Second
cfg.FETCH.RequestHardTimeout = 20 * time.Second
cfg.FETCH.BatchSize = 5
cfg.FETCH.BatchTimeout = 5
cfg.FETCH.BatchTimeout = 5 * time.Second

cfg.Beacon = beacon.NodeSimUnitTestConfig()

Expand Down
150 changes: 150 additions & 0 deletions p2p/server/deadline_adjuster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package server

import (
"context"
"errors"
"fmt"
"time"

"github.com/jonboulle/clockwork"
"github.com/libp2p/go-yamux/v4"
)

const (
	// deadlineAdjusterChunkSize is the default chunkSize that
	// newDeadlineAdjuster assigns to a deadlineAdjuster: the largest slice
	// handed to the underlying stream in a single Read or Write call.
	deadlineAdjusterChunkSize = 4096
)

// deadlineAdjusterError carries a wrapped error together with the transfer
// statistics and timeout settings that were in effect when it was produced.
type deadlineAdjusterError struct {
	innerErr                error         // the error being decorated
	elapsed                 time.Duration // time spent since the adjuster's start
	totalRead, totalWritten int           // byte counters at the time of the error
	timeout, hardTimeout    time.Duration // configured timeouts, for diagnostics
}

// Unwrap returns the wrapped error so that errors.Is and errors.As can
// inspect it.
func (e *deadlineAdjusterError) Unwrap() error { return e.innerErr }

// Error formats the statistics followed by the wrapped error's message.
func (e *deadlineAdjusterError) Error() string {
	const layout = "%v elapsed, %d bytes read, %d bytes written, timeout %v, hard timeout %v: %v"
	return fmt.Sprintf(layout,
		e.elapsed, e.totalRead, e.totalWritten, e.timeout, e.hardTimeout, e.innerErr)
}

// deadlineAdjuster wraps a peerStream and moves its deadline forward as data
// is read and written, while never extending it past a hard deadline.
type deadlineAdjuster struct {
	// peerStream is the wrapped stream; Close and SetDeadline are promoted
	// from it, Read and Write are overridden by the adjuster.
	peerStream

	// timeout is added to the current time whenever the deadline is moved;
	// hardTimeout is added to the time of the first adjustment to obtain
	// hardDeadline.
	timeout, hardTimeout time.Duration

	// totalRead and totalWritten count the bytes transferred so far.
	totalRead, totalWritten int

	// start is the construction time, used to report elapsed time in errors.
	start time.Time
	// clock supplies the current time.
	clock clockwork.Clock

	// chunkSize caps the size of each underlying Read/Write call and spaces
	// out the deadline adjustments.
	chunkSize int
	// nextAdjustRead and nextAdjustWrite are the byte counts that totalRead
	// and totalWritten must exceed before the deadline is moved again.
	nextAdjustRead, nextAdjustWrite int

	// hardDeadline stays zero until the first adjustment sets it.
	hardDeadline time.Time
}

// newDeadlineAdjuster wraps stream in a deadlineAdjuster that uses the real
// clock and the default chunk size. Both adjustment thresholds start at -1 so
// that the very first adjustment always sets a deadline.
//
// NOTE(review): start is taken from time.Now() rather than from the clock
// field, so code that swaps in another clock after construction will see
// elapsed times measured against the wall-clock construction time.
func newDeadlineAdjuster(stream peerStream, timeout, hardTimeout time.Duration) *deadlineAdjuster {
	dadj := new(deadlineAdjuster)
	dadj.peerStream = stream
	dadj.timeout = timeout
	dadj.hardTimeout = hardTimeout
	dadj.start = time.Now()
	dadj.clock = clockwork.NewRealClock()
	dadj.chunkSize = deadlineAdjusterChunkSize
	dadj.nextAdjustRead = -1
	dadj.nextAdjustWrite = -1
	return dadj
}

// augmentError decorates timeout errors with transfer statistics.
//
// Errors that match neither context.DeadlineExceeded nor yamux.ErrTimeout are
// returned unchanged. For timeouts, the result is a *deadlineAdjusterError
// whose inner error is err prefixed with what (the operation that failed, e.g.
// "read" or "write"). The prefix is added with %w, so errors.Is / errors.As
// still reach the original error.
func (dadj *deadlineAdjuster) augmentError(what string, err error) error {
	isTimeout := errors.Is(err, context.DeadlineExceeded) || errors.Is(err, yamux.ErrTimeout)
	if !isTimeout {
		return err
	}

	return &deadlineAdjusterError{
		// Record which operation timed out; previously what was ignored.
		innerErr:     fmt.Errorf("%s: %w", what, err),
		elapsed:      dadj.clock.Now().Sub(dadj.start),
		totalRead:    dadj.totalRead,
		totalWritten: dadj.totalWritten,
		timeout:      dadj.timeout,
		hardTimeout:  dadj.hardTimeout,
	}
}

// adjust is called before every chunk is read or written. The first call
// fixes the hard deadline; later calls fail with yamux.ErrTimeout once the
// clock has passed it. Otherwise, if enough data has moved since the previous
// adjustment, the stream deadline is pushed to now+timeout, capped at the
// hard deadline.
func (dadj *deadlineAdjuster) adjust() error {
	now := dadj.clock.Now()

	switch {
	case dadj.hardDeadline.IsZero():
		dadj.hardDeadline = now.Add(dadj.hardTimeout)
	case now.After(dadj.hardDeadline):
		// emulate yamux timeout error
		return yamux.ErrTimeout
	}

	// Do not adjust the deadline too often: only after more than chunkSize
	// bytes have been transferred in either direction since the last time.
	readProgressed := dadj.totalRead > dadj.nextAdjustRead
	if readProgressed {
		dadj.nextAdjustRead = dadj.totalRead + dadj.chunkSize
	}
	writeProgressed := dadj.totalWritten > dadj.nextAdjustWrite
	if writeProgressed {
		dadj.nextAdjustWrite = dadj.totalWritten + dadj.chunkSize
	}
	if !readProgressed && !writeProgressed {
		return nil
	}

	newDeadline := now.Add(dadj.timeout)
	if newDeadline.After(dadj.hardDeadline) {
		newDeadline = dadj.hardDeadline
	}
	// We ignore the error returned by SetDeadline b/c the call
	// doesn't work for mock hosts
	_ = dadj.SetDeadline(newDeadline)

	return nil
}

func (dadj *deadlineAdjuster) Read(p []byte) (int, error) {
var n int
for n < len(p) {
if err := dadj.adjust(); err != nil {
return n, dadj.augmentError("read", err)
}

Check warning on line 119 in p2p/server/deadline_adjuster.go

View check run for this annotation

Codecov / codecov/patch

p2p/server/deadline_adjuster.go#L118-L119

Added lines #L118 - L119 were not covered by tests
to := min(len(p), n+dadj.chunkSize)
nCur, err := dadj.peerStream.Read(p[n:to])
n += nCur
dadj.totalRead += nCur
if err != nil {
return n, dadj.augmentError("read", err)
}
if n < to {
poszu marked this conversation as resolved.
Show resolved Hide resolved
// Short read, don't try to read more data
break
}
}
return n, nil
}

func (dadj *deadlineAdjuster) Write(p []byte) (n int, err error) {
var nCur int
for n < len(p) {
if err := dadj.adjust(); err != nil {
return n, dadj.augmentError("write", err)
}
to := min(len(p), n+dadj.chunkSize)
nCur, err = dadj.peerStream.Write(p[n:to])
n += nCur
dadj.totalWritten += nCur
if err != nil {
return n, dadj.augmentError("write", err)
}
poszu marked this conversation as resolved.
Show resolved Hide resolved
}
return n, nil
}
128 changes: 128 additions & 0 deletions p2p/server/deadline_adjuster_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package server

import (
"testing"
"time"

"github.com/jonboulle/clockwork"
"github.com/libp2p/go-yamux/v4"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"

"github.com/spacemeshos/go-spacemesh/p2p/server/mocks"
)

func TestDeadlineAdjuster(t *testing.T) {
ctrl := gomock.NewController(t)
s := mocks.NewMockpeerStream(ctrl)
clock := clockwork.NewFakeClock()

readChunks := []string{"xy", "ABCD", "EF", "0123", "4567", "89"}
writeChunks := []string{"foo", "abcd", "efgh", "ijk", "bbbc"}
poszu marked this conversation as resolved.
Show resolved Hide resolved

start := clock.Now()
var deadlines []int
s.EXPECT().
SetDeadline(gomock.Any()).
DoAndReturn(func(dt time.Time) error {
d := dt.Sub(start)
require.Equal(t, d, d.Truncate(time.Second))
deadlines = append(deadlines, int(d/time.Second))
return nil
}).
AnyTimes()

var readCalls []any
for _, str := range readChunks {
chunk := []byte(str)
readCalls = append(readCalls, s.EXPECT().
Read(gomock.Any()).
DoAndReturn(func(b []byte) (int, error) {
clock.Advance(time.Second)
copy(b, []byte(chunk))
return len(chunk), nil
}))
}
readCalls = append(readCalls, s.EXPECT().
Read(gomock.Any()).
DoAndReturn(func(b []byte) (int, error) {
clock.Advance(10 * time.Second)
return 1, yamux.ErrTimeout
}))
gomock.InOrder(readCalls...)

var writeCalls []any
for _, str := range writeChunks {
chunk := []byte(str)
writeCalls = append(writeCalls, s.EXPECT().
Write(chunk).DoAndReturn(func([]byte) (int, error) {
clock.Advance(time.Second)
return len(chunk), nil
}))
}
for i := 0; i < 2; i++ {
writeCalls = append(writeCalls, s.EXPECT().
Write(gomock.Any()).
DoAndReturn(func(b []byte) (int, error) {
clock.Advance(10 * time.Second)
return 2, yamux.ErrTimeout
}))
}
gomock.InOrder(writeCalls...)

dadj := newDeadlineAdjuster(s, 10*time.Second, 35*time.Second)
dadj.clock = clock
dadj.chunkSize = 4

b := make([]byte, 2)
n, err := dadj.Read(b)
require.NoError(t, err)
require.Equal(t, 2, n)
require.Equal(t, []byte("xy"), b)

b = make([]byte, 10)
n, err = dadj.Read(b) // short read
require.NoError(t, err)
require.Equal(t, 6, n)
require.Equal(t, []byte("ABCDEF"), b[:n])

b = make([]byte, 10)
n, err = dadj.Read(b)
require.NoError(t, err)
require.Equal(t, 10, n)
require.Equal(t, []byte("0123456789"), b)

n, err = dadj.Write([]byte("foo"))
require.NoError(t, err)
require.Equal(t, 3, n)

n, err = dadj.Write([]byte("abcdefghijk"))
require.NoError(t, err)
require.Equal(t, 11, n)

b = make([]byte, 2)
n, err = dadj.Read(b)
require.Equal(t, 1, n)
require.ErrorIs(t, err, yamux.ErrTimeout)
require.ErrorContains(t, err, "19 bytes read, 14 bytes written, timeout 10s, hard timeout 35s")

n, err = dadj.Write([]byte("bbbcdef"))
require.Equal(t, 6, n)
require.ErrorIs(t, err, yamux.ErrTimeout)
require.ErrorContains(t, err, "19 bytes read, 20 bytes written, timeout 10s, hard timeout 35s")

// this causes deadline to be set at the hard deadline
n, err = dadj.Write([]byte("dd"))
require.Equal(t, 2, n)
require.ErrorIs(t, err, yamux.ErrTimeout)
require.ErrorContains(t, err, "19 bytes read, 22 bytes written, timeout 10s, hard timeout 35s")

// this write doesn't even start as we're past the hard deadline
require.Equal(t, 41*time.Second, clock.Now().Sub(start))
n, err = dadj.Write([]byte("ddd"))
require.Equal(t, 0, n)
require.ErrorIs(t, err, yamux.ErrTimeout)
require.ErrorContains(t, err, "19 bytes read, 22 bytes written, timeout 10s, hard timeout 35s")

require.Equal(t, []int{10, 12, 14, 16, 18, 20, 35}, deadlines)
}
25 changes: 25 additions & 0 deletions p2p/server/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package server

import (
"context"
"io"
"time"

"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
)

//go:generate mockgen -typed -package=mocks -destination=./mocks/mocks.go -source=./interface.go

// Host is a subset of libp2p Host interface that needs to be implemented to be usable with server.
// The method signatures mirror the libp2p Host interface; refer to its
// documentation for their semantics.
type Host interface {
	SetStreamHandler(protocol.ID, network.StreamHandler)
	NewStream(context.Context, peer.ID, ...protocol.ID) (network.Stream, error)
	Network() network.Network
}

// peerStream is a stream that can be read, written and closed, and on which a
// deadline can be set. It is declared here so that a mock can be generated
// from this file.
type peerStream interface {
	io.ReadWriteCloser
	SetDeadline(time.Time) error
}
Loading
Loading