diff --git a/pkg/ring/kv/dynamodb/client.go b/pkg/ring/kv/dynamodb/client.go index 71de47f0e5..8bdb072bb8 100644 --- a/pkg/ring/kv/dynamodb/client.go +++ b/pkg/ring/kv/dynamodb/client.go @@ -26,6 +26,7 @@ type Config struct { TTL time.Duration `yaml:"ttl"` PullerSyncTime time.Duration `yaml:"puller_sync_time"` MaxCasRetries int `yaml:"max_cas_retries"` + Timeout time.Duration `yaml:"timeout"` } type Client struct { @@ -69,8 +70,13 @@ func NewClient(cfg Config, cc codec.Codec, logger log.Logger, registerer prometh MaxRetries: cfg.MaxCasRetries, } + var kv dynamoDbClient + kv = dynamodbInstrumentation{kv: dynamoDB, ddbMetrics: ddbMetrics} + if cfg.Timeout > 0 { + kv = newDynamoDbKVWithTimeout(kv, cfg.Timeout) + } c := &Client{ - kv: dynamodbInstrumentation{kv: dynamoDB, ddbMetrics: ddbMetrics}, + kv: kv, codec: cc, logger: ddbLog(logger), ddbMetrics: ddbMetrics, diff --git a/pkg/ring/kv/dynamodb/dynamodb.go b/pkg/ring/kv/dynamodb/dynamodb.go index f54e5fe55b..ebda3aace2 100644 --- a/pkg/ring/kv/dynamodb/dynamodb.go +++ b/pkg/ring/kv/dynamodb/dynamodb.go @@ -259,6 +259,45 @@ func (kv dynamodbKV) generatePutItemRequest(key dynamodbKey, data []byte) map[st return item } +type dynamoDbKVWithTimeout struct { + ddbClient dynamoDbClient + timeout time.Duration +} + +func newDynamoDbKVWithTimeout(client dynamoDbClient, timeout time.Duration) *dynamoDbKVWithTimeout { + return &dynamoDbKVWithTimeout{ddbClient: client, timeout: timeout} +} + +func (d *dynamoDbKVWithTimeout) List(ctx context.Context, key dynamodbKey) ([]string, float64, error) { + ctx, cancel := context.WithTimeout(ctx, d.timeout) + defer cancel() + return d.ddbClient.List(ctx, key) +} + +func (d *dynamoDbKVWithTimeout) Query(ctx context.Context, key dynamodbKey, isPrefix bool) (map[string][]byte, float64, error) { + ctx, cancel := context.WithTimeout(ctx, d.timeout) + defer cancel() + return d.ddbClient.Query(ctx, key, isPrefix) +} + +func (d *dynamoDbKVWithTimeout) Delete(ctx context.Context, key dynamodbKey) error { + ctx, cancel := context.WithTimeout(ctx, d.timeout) + defer cancel() + return d.ddbClient.Delete(ctx, key) +} + +func (d *dynamoDbKVWithTimeout) Put(ctx context.Context, key dynamodbKey, data []byte) error { + ctx, cancel := context.WithTimeout(ctx, d.timeout) + defer cancel() + return d.ddbClient.Put(ctx, key, data) +} + +func (d *dynamoDbKVWithTimeout) Batch(ctx context.Context, put map[dynamodbKey][]byte, delete []dynamodbKey) error { + ctx, cancel := context.WithTimeout(ctx, d.timeout) + defer cancel() + return d.ddbClient.Batch(ctx, put, delete) +} + func generateItemKey(key dynamodbKey) map[string]*dynamodb.AttributeValue { resp := map[string]*dynamodb.AttributeValue{ primaryKey: {