Skip to content

Commit

Permalink
Add timeout for dynamodb ring kv (#6544)
Browse files Browse the repository at this point in the history
* add dynamodb kv with timeout enforced

Signed-off-by: yeya24 <[email protected]>

* add tests

Signed-off-by: yeya24 <[email protected]>

* docs

Signed-off-by: Ben Ye <[email protected]>

* update changelog

Signed-off-by: Ben Ye <[email protected]>

---------

Signed-off-by: yeya24 <[email protected]>
Signed-off-by: Ben Ye <[email protected]>
  • Loading branch information
yeya24 authored Jan 28, 2025
1 parent de5cfe1 commit b4953a3
Show file tree
Hide file tree
Showing 6 changed files with 160 additions and 1 deletion.
4 changes: 4 additions & 0 deletions docs/blocks-storage/compactor.md
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,10 @@ compactor:
# CLI flag: -compactor.ring.dynamodb.max-cas-retries
[max_cas_retries: <int> | default = 10]

# Timeout of dynamoDbClient requests. Default is 2m.
# CLI flag: -compactor.ring.dynamodb.timeout
[timeout: <duration> | default = 2m]

# The consul_config configures the consul client.
# The CLI flags prefix for this block config is: compactor.ring
[consul: <consul_config>]
Expand Down
4 changes: 4 additions & 0 deletions docs/blocks-storage/store-gateway.md
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,10 @@ store_gateway:
# CLI flag: -store-gateway.sharding-ring.dynamodb.max-cas-retries
[max_cas_retries: <int> | default = 10]

# Timeout of dynamoDbClient requests. Default is 2m.
# CLI flag: -store-gateway.sharding-ring.dynamodb.timeout
[timeout: <duration> | default = 2m]

# The consul_config configures the consul client.
# The CLI flags prefix for this block config is:
# store-gateway.sharding-ring
Expand Down
28 changes: 28 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,10 @@ sharding_ring:
# CLI flag: -alertmanager.sharding-ring.dynamodb.max-cas-retries
[max_cas_retries: <int> | default = 10]
# Timeout of dynamoDbClient requests. Default is 2m.
# CLI flag: -alertmanager.sharding-ring.dynamodb.timeout
[timeout: <duration> | default = 2m]
# The consul_config configures the consul client.
# The CLI flags prefix for this block config is: alertmanager.sharding-ring
[consul: <consul_config>]
Expand Down Expand Up @@ -2286,6 +2290,10 @@ sharding_ring:
# CLI flag: -compactor.ring.dynamodb.max-cas-retries
[max_cas_retries: <int> | default = 10]
# Timeout of dynamoDbClient requests. Default is 2m.
# CLI flag: -compactor.ring.dynamodb.timeout
[timeout: <duration> | default = 2m]
# The consul_config configures the consul client.
# The CLI flags prefix for this block config is: compactor.ring
[consul: <consul_config>]
Expand Down Expand Up @@ -2595,6 +2603,10 @@ ha_tracker:
# CLI flag: -distributor.ha-tracker.dynamodb.max-cas-retries
[max_cas_retries: <int> | default = 10]
# Timeout of dynamoDbClient requests. Default is 2m.
# CLI flag: -distributor.ha-tracker.dynamodb.timeout
[timeout: <duration> | default = 2m]
# The consul_config configures the consul client.
# The CLI flags prefix for this block config is: distributor.ha-tracker
[consul: <consul_config>]
Expand Down Expand Up @@ -2689,6 +2701,10 @@ ring:
# CLI flag: -distributor.ring.dynamodb.max-cas-retries
[max_cas_retries: <int> | default = 10]
# Timeout of dynamoDbClient requests. Default is 2m.
# CLI flag: -distributor.ring.dynamodb.timeout
[timeout: <duration> | default = 2m]
# The consul_config configures the consul client.
# The CLI flags prefix for this block config is: distributor.ring
[consul: <consul_config>]
Expand Down Expand Up @@ -3017,6 +3033,10 @@ lifecycler:
# CLI flag: -dynamodb.max-cas-retries
[max_cas_retries: <int> | default = 10]
# Timeout of dynamoDbClient requests. Default is 2m.
# CLI flag: -dynamodb.timeout
[timeout: <duration> | default = 2m]
# The consul_config configures the consul client.
[consul: <consul_config>]
Expand Down Expand Up @@ -4674,6 +4694,10 @@ ring:
# CLI flag: -ruler.ring.dynamodb.max-cas-retries
[max_cas_retries: <int> | default = 10]

# Timeout of dynamoDbClient requests. Default is 2m.
# CLI flag: -ruler.ring.dynamodb.timeout
[timeout: <duration> | default = 2m]

# The consul_config configures the consul client.
# The CLI flags prefix for this block config is: ruler.ring
[consul: <consul_config>]
Expand Down Expand Up @@ -5665,6 +5689,10 @@ sharding_ring:
# CLI flag: -store-gateway.sharding-ring.dynamodb.max-cas-retries
[max_cas_retries: <int> | default = 10]
# Timeout of dynamoDbClient requests. Default is 2m.
# CLI flag: -store-gateway.sharding-ring.dynamodb.timeout
[timeout: <duration> | default = 2m]
# The consul_config configures the consul client.
# The CLI flags prefix for this block config is: store-gateway.sharding-ring
[consul: <consul_config>]
Expand Down
9 changes: 8 additions & 1 deletion pkg/ring/kv/dynamodb/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type Config struct {
TTL time.Duration `yaml:"ttl"`
PullerSyncTime time.Duration `yaml:"puller_sync_time"`
MaxCasRetries int `yaml:"max_cas_retries"`
Timeout time.Duration `yaml:"timeout"`
}

type Client struct {
Expand Down Expand Up @@ -53,6 +54,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, prefix string) {
f.DurationVar(&cfg.TTL, prefix+"dynamodb.ttl-time", 0, "Time to expire items on dynamodb.")
f.DurationVar(&cfg.PullerSyncTime, prefix+"dynamodb.puller-sync-time", 60*time.Second, "Time to refresh local ring with information on dynamodb.")
f.IntVar(&cfg.MaxCasRetries, prefix+"dynamodb.max-cas-retries", maxCasRetries, "Maximum number of retries for DDB KV CAS.")
f.DurationVar(&cfg.Timeout, prefix+"dynamodb.timeout", 2*time.Minute, "Timeout of dynamoDbClient requests. Default is 2m.")
}

func NewClient(cfg Config, cc codec.Codec, logger log.Logger, registerer prometheus.Registerer) (*Client, error) {
Expand All @@ -69,8 +71,13 @@ func NewClient(cfg Config, cc codec.Codec, logger log.Logger, registerer prometh
MaxRetries: cfg.MaxCasRetries,
}

var kv dynamoDbClient
kv = dynamodbInstrumentation{kv: dynamoDB, ddbMetrics: ddbMetrics}
if cfg.Timeout > 0 {
kv = newDynamodbKVWithTimeout(kv, cfg.Timeout)
}
c := &Client{
kv: dynamodbInstrumentation{kv: dynamoDB, ddbMetrics: ddbMetrics},
kv: kv,
codec: cc,
logger: ddbLog(logger),
ddbMetrics: ddbMetrics,
Expand Down
77 changes: 77 additions & 0 deletions pkg/ring/kv/dynamodb/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,29 @@ func Test_UpdateStaleData(t *testing.T) {

}

// Test_DynamodbKVWithTimeout checks that every operation exposed by the
// timeout wrapper returns context.DeadlineExceeded when the wrapped client
// takes longer to answer than the configured timeout.
func Test_DynamodbKVWithTimeout(t *testing.T) {
	const (
		backendDelay  = time.Second * 5
		clientTimeout = time.Second
	)

	// The backend only answers after backendDelay, which is well past the
	// per-request timeout enforced by the wrapper under test.
	mock := NewDynamodbClientMock()
	slowBackend := newDynamodbKVWithDelay(mock, backendDelay)
	client := newDynamodbKVWithTimeout(slowBackend, clientTimeout)

	ctx := context.Background()
	target := dynamodbKey{primaryKey: key}

	_, _, listErr := client.List(ctx, target)
	require.True(t, errors.Is(listErr, context.DeadlineExceeded))

	deleteErr := client.Delete(ctx, target)
	require.True(t, errors.Is(deleteErr, context.DeadlineExceeded))

	_, _, queryErr := client.Query(ctx, target, true)
	require.True(t, errors.Is(queryErr, context.DeadlineExceeded))

	putErr := client.Put(ctx, target, []byte{})
	require.True(t, errors.Is(putErr, context.DeadlineExceeded))

	batchErr := client.Batch(ctx, nil, nil)
	require.True(t, errors.Is(batchErr, context.DeadlineExceeded))
}

// NewClientMock makes a new local dynamodb client.
func NewClientMock(ddbClient dynamoDbClient, cc codec.Codec, logger log.Logger, registerer prometheus.Registerer, time time.Duration, config backoff.Config) *Client {
return &Client{
Expand Down Expand Up @@ -429,3 +452,57 @@ func (m *DescMock) FindDifference(that codec.MultiKey) (interface{}, []string, e
}
return args.Get(0), args.Get(1).([]string), err
}

// dynamodbKVWithDelayAndContextCheck is a test double that wraps another
// dynamoDbClient and postpones every call by a fixed delay. If the caller's
// context is done before the delay elapses, the call returns the context
// error and the wrapped client is not invoked.
type dynamodbKVWithDelayAndContextCheck struct {
	ddbClient dynamoDbClient
	delay     time.Duration
}

// newDynamodbKVWithDelay wraps client so that each operation waits for delay
// before being forwarded.
func newDynamodbKVWithDelay(client dynamoDbClient, delay time.Duration) *dynamodbKVWithDelayAndContextCheck {
	return &dynamodbKVWithDelayAndContextCheck{ddbClient: client, delay: delay}
}

// wait blocks until d.delay has elapsed or ctx is done, whichever happens
// first. It returns ctx.Err() when the context finished first and nil when
// the delay elapsed.
func (d *dynamodbKVWithDelayAndContextCheck) wait(ctx context.Context) error {
	// Use an explicit timer instead of time.After so it can be stopped as
	// soon as the context wins the race, rather than lingering until it fires.
	timer := time.NewTimer(d.delay)
	defer timer.Stop()

	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-timer.C:
		return nil
	}
}

// List waits for the configured delay and then forwards to the wrapped
// client, or returns the context error if ctx is done first.
func (d *dynamodbKVWithDelayAndContextCheck) List(ctx context.Context, key dynamodbKey) ([]string, float64, error) {
	if err := d.wait(ctx); err != nil {
		return nil, 0, err
	}
	return d.ddbClient.List(ctx, key)
}

// Query waits for the configured delay and then forwards to the wrapped
// client, or returns the context error if ctx is done first.
func (d *dynamodbKVWithDelayAndContextCheck) Query(ctx context.Context, key dynamodbKey, isPrefix bool) (map[string][]byte, float64, error) {
	if err := d.wait(ctx); err != nil {
		return nil, 0, err
	}
	return d.ddbClient.Query(ctx, key, isPrefix)
}

// Delete waits for the configured delay and then forwards to the wrapped
// client, or returns the context error if ctx is done first.
func (d *dynamodbKVWithDelayAndContextCheck) Delete(ctx context.Context, key dynamodbKey) error {
	if err := d.wait(ctx); err != nil {
		return err
	}
	return d.ddbClient.Delete(ctx, key)
}

// Put waits for the configured delay and then forwards to the wrapped
// client, or returns the context error if ctx is done first.
func (d *dynamodbKVWithDelayAndContextCheck) Put(ctx context.Context, key dynamodbKey, data []byte) error {
	if err := d.wait(ctx); err != nil {
		return err
	}
	return d.ddbClient.Put(ctx, key, data)
}

// Batch waits for the configured delay and then forwards to the wrapped
// client, or returns the context error if ctx is done first.
func (d *dynamodbKVWithDelayAndContextCheck) Batch(ctx context.Context, put map[dynamodbKey][]byte, delete []dynamodbKey) error {
	if err := d.wait(ctx); err != nil {
		return err
	}
	return d.ddbClient.Batch(ctx, put, delete)
}
39 changes: 39 additions & 0 deletions pkg/ring/kv/dynamodb/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,45 @@ func (kv dynamodbKV) generatePutItemRequest(key dynamodbKey, data []byte) map[st
return item
}

// dynamodbKVWithTimeout decorates a dynamoDbClient so that every call runs
// under a context whose deadline is at most timeout from the moment the call
// starts.
type dynamodbKVWithTimeout struct {
	ddbClient dynamoDbClient
	timeout   time.Duration
}

// newDynamodbKVWithTimeout wraps client so that each operation is bounded by
// timeout.
func newDynamodbKVWithTimeout(client dynamoDbClient, timeout time.Duration) *dynamodbKVWithTimeout {
	wrapped := dynamodbKVWithTimeout{
		ddbClient: client,
		timeout:   timeout,
	}
	return &wrapped
}

// withTimeout derives a child of parent that is cancelled after d.timeout.
// The caller must invoke the returned cancel function to release the
// context's resources.
func (d *dynamodbKVWithTimeout) withTimeout(parent context.Context) (context.Context, context.CancelFunc) {
	return context.WithTimeout(parent, d.timeout)
}

// List forwards to the wrapped client under a timeout-bound context.
func (d *dynamodbKVWithTimeout) List(ctx context.Context, key dynamodbKey) ([]string, float64, error) {
	bounded, release := d.withTimeout(ctx)
	defer release()
	return d.ddbClient.List(bounded, key)
}

// Query forwards to the wrapped client under a timeout-bound context.
func (d *dynamodbKVWithTimeout) Query(ctx context.Context, key dynamodbKey, isPrefix bool) (map[string][]byte, float64, error) {
	bounded, release := d.withTimeout(ctx)
	defer release()
	return d.ddbClient.Query(bounded, key, isPrefix)
}

// Delete forwards to the wrapped client under a timeout-bound context.
func (d *dynamodbKVWithTimeout) Delete(ctx context.Context, key dynamodbKey) error {
	bounded, release := d.withTimeout(ctx)
	defer release()
	return d.ddbClient.Delete(bounded, key)
}

// Put forwards to the wrapped client under a timeout-bound context.
func (d *dynamodbKVWithTimeout) Put(ctx context.Context, key dynamodbKey, data []byte) error {
	bounded, release := d.withTimeout(ctx)
	defer release()
	return d.ddbClient.Put(bounded, key, data)
}

// Batch forwards to the wrapped client under a timeout-bound context.
func (d *dynamodbKVWithTimeout) Batch(ctx context.Context, put map[dynamodbKey][]byte, delete []dynamodbKey) error {
	bounded, release := d.withTimeout(ctx)
	defer release()
	return d.ddbClient.Batch(bounded, put, delete)
}

func generateItemKey(key dynamodbKey) map[string]*dynamodb.AttributeValue {
resp := map[string]*dynamodb.AttributeValue{
primaryKey: {
Expand Down

0 comments on commit b4953a3

Please sign in to comment.