Skip to content

Commit

Permalink
add dynamodb kv with timeout enforced
Browse files Browse the repository at this point in the history
Signed-off-by: yeya24 <[email protected]>
  • Loading branch information
yeya24 committed Jan 24, 2025
1 parent 60b5b09 commit d1d0fac
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 1 deletion.
8 changes: 7 additions & 1 deletion pkg/ring/kv/dynamodb/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type Config struct {
TTL time.Duration `yaml:"ttl"`
PullerSyncTime time.Duration `yaml:"puller_sync_time"`
MaxCasRetries int `yaml:"max_cas_retries"`
Timeout time.Duration `yaml:"timeout"`
}

type Client struct {
Expand Down Expand Up @@ -69,8 +70,13 @@ func NewClient(cfg Config, cc codec.Codec, logger log.Logger, registerer prometh
MaxRetries: cfg.MaxCasRetries,
}

var kv dynamoDbClient
kv = dynamodbInstrumentation{kv: dynamoDB, ddbMetrics: ddbMetrics}
if cfg.Timeout > 0 {
kv = newDynamoDbKVWithTimeout(kv, cfg.Timeout)
}
c := &Client{
kv: dynamodbInstrumentation{kv: dynamoDB, ddbMetrics: ddbMetrics},
kv: kv,
codec: cc,
logger: ddbLog(logger),
ddbMetrics: ddbMetrics,
Expand Down
39 changes: 39 additions & 0 deletions pkg/ring/kv/dynamodb/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,45 @@ func (kv dynamodbKV) generatePutItemRequest(key dynamodbKey, data []byte) map[st
return item
}

type dynamoDbKVWithTimeout struct {
ddbClient dynamoDbClient
timeout time.Duration
}

func newDynamoDbKVWithTimeout(client dynamoDbClient, timeout time.Duration) *dynamoDbKVWithTimeout {
return &dynamoDbKVWithTimeout{ddbClient: client, timeout: timeout}
}

func (d *dynamoDbKVWithTimeout) List(ctx context.Context, key dynamodbKey) ([]string, float64, error) {
ctx, cancel := context.WithTimeout(ctx, d.timeout)
defer cancel()
return d.ddbClient.List(ctx, key)
}

func (d *dynamoDbKVWithTimeout) Query(ctx context.Context, key dynamodbKey, isPrefix bool) (map[string][]byte, float64, error) {
ctx, cancel := context.WithTimeout(ctx, d.timeout)
defer cancel()
return d.ddbClient.Query(ctx, key, isPrefix)
}

func (d *dynamoDbKVWithTimeout) Delete(ctx context.Context, key dynamodbKey) error {
ctx, cancel := context.WithTimeout(ctx, d.timeout)
defer cancel()
return d.ddbClient.Delete(ctx, key)
}

func (d *dynamoDbKVWithTimeout) Put(ctx context.Context, key dynamodbKey, data []byte) error {
ctx, cancel := context.WithTimeout(ctx, d.timeout)
defer cancel()
return d.ddbClient.Put(ctx, key, data)
}

func (d *dynamoDbKVWithTimeout) Batch(ctx context.Context, put map[dynamodbKey][]byte, delete []dynamodbKey) error {
ctx, cancel := context.WithTimeout(ctx, d.timeout)
defer cancel()
return d.ddbClient.Batch(ctx, put, delete)
}

func generateItemKey(key dynamodbKey) map[string]*dynamodb.AttributeValue {
resp := map[string]*dynamodb.AttributeValue{
primaryKey: {
Expand Down

0 comments on commit d1d0fac

Please sign in to comment.