From 45ed49da67c0cfa258e2f51c20a29f836264847c Mon Sep 17 00:00:00 2001 From: "Eric J. Holmes" Date: Mon, 25 Jun 2018 18:44:02 -0700 Subject: [PATCH 1/3] Dyanmo Client --- .circleci/config.yml | 27 +- dynamodb_client/base_table_client.go | 529 +++++++++++++++++++++ dynamodb_client/base_table_client_test.go | 93 ++++ dynamodb_client/dynamo.go | 138 ++++++ dynamodb_client/dynamo_attribute_values.go | 48 ++ 5 files changed, 834 insertions(+), 1 deletion(-) create mode 100644 dynamodb_client/base_table_client.go create mode 100644 dynamodb_client/base_table_client_test.go create mode 100644 dynamodb_client/dynamo.go create mode 100644 dynamodb_client/dynamo_attribute_values.go diff --git a/.circleci/config.yml b/.circleci/config.yml index 0ccff280..e2474932 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -22,6 +22,31 @@ jobs: - checkout # specify any bash command here prefixed with `run: ` + - restore_cache: + keys: + - dynamodb-installed + - run: + name: Install Java + command: 'sudo apt-get update && sudo apt-get install default-jre default-jdk' + - run: + name: Setup Container + command: | + test -f DynamoDBLocal.jar || curl -k -L -o dynamodb-local.tgz http://dynamodb-local.s3-website-us-west-2.amazonaws.com/dynamodb_local_latest.tar.gz + test -f DynamoDBLocal.jar || tar -xzf dynamodb-local.tgz + - save_cache: + key: dynamodb-installed + paths: + - DynamoDBLocal_lib + - DynamoDBLocal.jar + - run: + name: Launch Dynamodb + command: java -Djava.library.path=./DynamoDBLocal_lib -jar DynamoDBLocal.jar -sharedDb + background: true - run: make deps - run: go get -v -t -d ./... - - run: go test ./... + - run: + name: Run Tests + command: go test ./... + environment: + AWS_ACCESS_KEY_ID: DUMMY_KEY + AWS_SECRET_ACCESS_KEY: DUMMY_SECRET diff --git a/dynamodb_client/base_table_client.go b/dynamodb_client/base_table_client.go new file mode 100644 index 00000000..74a223a7 --- /dev/null +++ b/dynamodb_client/base_table_client.go @@ -0,0 +1,529 @@ +package dynamodb_client + +import ( + "context" + "fmt" + "strconv" + + dd_opentracing "github.com/DataDog/dd-trace-go/opentracing" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/request" + "github.com/aws/aws-sdk-go/service/dynamodb" + "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface" + opentracing "github.com/opentracing/opentracing-go" +) + +var ( + BuildQueryHashKeyName = ":hash_key" + AwsErrNotFound = fmt.Errorf("item not found") + AwsErrNotProcessed = fmt.Errorf("item not processed") +) + +type PrimaryKey struct { + PartitionKey *Key + SortKey *Key +} + +type Key struct { + Type string + Name string +} + +// BaseTableClient implements the TableClient interface. +type BaseTableClient struct { + desc *dynamodb.TableDescription + client dynamodbiface.DynamoDBAPI + pk *PrimaryKey + tableName string + serviceName string +} + +// TableClient defines a context aware table based interface to manage a dynamodb +// table. +type TableClient interface { + // Reads + GetItem(ctx context.Context, hashKey, rangeKey string) (DynamoItem, error) + // Writes + PutItem(ctx context.Context, hashKey, rangeKey string, item DynamoItem) error + DeleteItem(ctx context.Context, hashKey, rangeKey string) error + // Table management + Create(ctx context.Context) error + Delete(ctx context.Context) error + + // LOOKING FOR SOMETHING??? + // This implementation was pulled from another project. The functions above have had tests added. + // If you want to use any of the functionality below, please uncomment the declaration and write a test + // for it. The implementation is already in this file, though without tests we don't know if it is correct. + + // Reads + // Query(ctx context.Context, opts QueryOpts) ([]map[string]*dynamodb.AttributeValue, error) + // BatchGetDocument(ctx context.Context, keys []DynamoKey, consistentRead bool, v interface{}) ([]error, error) + // Writes + // ConditionExpressionPutItem(ctx context.Context, hashKey, rangeKey string, item DynamoItem, expression AwsExpression) error + // ConditionExpressionDeleteItem(ctx context.Context, hashKey, rangeKey string, expr AwsExpression) error + // ConditionExpressionUpdateAttributes(ctx context.Context, hashKey, rangeKey string, expr AwsExpression) error + // BatchPutDocument(ctx context.Context, keys []DynamoKey, items []DynamoItem) ([]error, error) + // BatchDeleteDocument(ctx context.Context, keys []DynamoKey) ([]error, error) + // Table Management + // UpdateStreamConfiguration(ctx context.Context) error +} + +type TableClientOpts struct { + Desc *dynamodb.TableDescription + Client dynamodbiface.DynamoDBAPI + ServiceName string +} + +func NewBaseTableClient(opts TableClientOpts) (*BaseTableClient, error) { + tableName := aws.StringValue(opts.Desc.TableName) + pk, err := buildPrimaryKey(opts.Desc) + if err != nil { + return nil, fmt.Errorf("Error building primary key for %s: %v", tableName, err) + } + + table := &BaseTableClient{ + desc: opts.Desc, + client: opts.Client, + pk: &pk, + tableName: tableName, + serviceName: opts.ServiceName, + } + + return table, nil +} + +func (dt *BaseTableClient) Query(ctx context.Context, opts QueryOpts) ([]map[string]*dynamodb.AttributeValue, error) { + input := dt.buildQuery(opts) + + req, output := dt.client.QueryRequest(input) + err := dt.sendWithTracing(ctx, req) + + return output.Items, err +} + +func (dt *BaseTableClient) PutItem(ctx context.Context, hashKey, rangeKey string, item DynamoItem) error { + item = dt.addPrimaryKey(hashKey, rangeKey, item) + + input := &dynamodb.PutItemInput{ + Item: item, + TableName: dt.desc.TableName, + } + + req, _ := dt.client.PutItemRequest(input) + err := dt.sendWithTracing(ctx, req) + + return err +} + +func (dt *BaseTableClient) DeleteItem(ctx context.Context, hashKey, rangeKey string) error { + key := dt.addPrimaryKey(hashKey, rangeKey, DynamoItem{}) + + input := &dynamodb.DeleteItemInput{ + TableName: dt.desc.TableName, + Key: key, + } + + req, _ := dt.client.DeleteItemRequest(input) + err := dt.sendWithTracing(ctx, req) + + return err +} + +//func (dt *BaseTableClient) ConditionExpressionDeleteItem(ctx context.Context, hashKey, rangeKey string, expr AwsExpression) error { +// key := dt.addPrimaryKey(hashKey, rangeKey, DynamoItem{}) +// +// input := &dynamodb.DeleteItemInput{ +// TableName: dt.desc.TableName, +// Key: key, +// ConditionExpression: aws.String(expr.ConditionExpression), +// ExpressionAttributeNames: expr.ExpressionAttributeNames, +// ExpressionAttributeValues: expr.ExpressionAttributeValues, +// } +// +// req, output := dt.client.DeleteItemRequest(input) +// err := dt.sendWithTracing(ctx, req) +// +// return err +//} +// +//func (dt *BaseTableClient) ConditionExpressionPutItem(ctx context.Context, hashKey, rangeKey string, item DynamoItem, expr AwsExpression) error { +// item = dt.addPrimaryKey(hashKey, rangeKey, item) +// +// input := &dynamodb.PutItemInput{ +// TableName: dt.desc.TableName, +// Item: item, +// ConditionExpression: aws.String(expr.ConditionExpression), +// ExpressionAttributeNames: expr.ExpressionAttributeNames, +// ExpressionAttributeValues: expr.ExpressionAttributeValues, +// } +// +// req, output := dt.client.PutItemRequest(input) +// err := dt.sendWithTracing(ctx, req) +// +// return err +//} +// +//func (dt *BaseTableClient) ConditionExpressionUpdateAttributes(ctx context.Context, hashKey, rangeKey string, expr AwsExpression) error { +// key := dt.addPrimaryKey(hashKey, rangeKey, DynamoItem{}) +// +// input := &dynamodb.UpdateItemInput{ +// TableName: dt.desc.TableName, +// Key: key, +// ConditionExpression: aws.String(expr.ConditionExpression), +// ExpressionAttributeNames: expr.ExpressionAttributeNames, +// ExpressionAttributeValues: expr.ExpressionAttributeValues, +// UpdateExpression: aws.String(expr.UpdateExpression), +// } +// +// req, output := dt.client.UpdateItemRequest(input) +// err := dt.sendWithTracing(ctx, req) +// +// return err +//} +// +func (dt *BaseTableClient) GetItem(ctx context.Context, hashKey, rangeKey string) (DynamoItem, error) { + item := dt.addPrimaryKey(hashKey, rangeKey, DynamoItem{}) + + input := &dynamodb.GetItemInput{ + TableName: dt.desc.TableName, + Key: item, + ConsistentRead: aws.Bool(false), + } + + req, output := dt.client.GetItemRequest(input) + err := dt.sendWithTracing(ctx, req) + + if output != nil { + // Do we still want this? + //dt.recordConsumedCapacity(ctx, "GetItem", hashKey, output.ConsumedCapacity) + } + + if isEmptyGetItemOutput(output) { + return nil, AwsErrNotFound + } + + return output.Item, err +} + +func (dt *BaseTableClient) Create(ctx context.Context) error { + pt := &dynamodb.ProvisionedThroughput{ + ReadCapacityUnits: dt.desc.ProvisionedThroughput.ReadCapacityUnits, + WriteCapacityUnits: dt.desc.ProvisionedThroughput.WriteCapacityUnits, + } + + var localSecondaryIndexes []*dynamodb.LocalSecondaryIndex + for _, desc := range dt.desc.LocalSecondaryIndexes { + localSecondaryIndexes = append(localSecondaryIndexes, &dynamodb.LocalSecondaryIndex{ + IndexName: desc.IndexName, + KeySchema: desc.KeySchema, + Projection: desc.Projection, + }) + } + + var globalSecondaryIndexes []*dynamodb.GlobalSecondaryIndex + for _, desc := range dt.desc.GlobalSecondaryIndexes { + gpt := &dynamodb.ProvisionedThroughput{ + ReadCapacityUnits: desc.ProvisionedThroughput.ReadCapacityUnits, + WriteCapacityUnits: desc.ProvisionedThroughput.WriteCapacityUnits, + } + globalSecondaryIndexes = append(globalSecondaryIndexes, &dynamodb.GlobalSecondaryIndex{ + IndexName: desc.IndexName, + KeySchema: desc.KeySchema, + Projection: desc.Projection, + ProvisionedThroughput: gpt, + }) + } + + input := &dynamodb.CreateTableInput{ + AttributeDefinitions: dt.desc.AttributeDefinitions, + ProvisionedThroughput: pt, + KeySchema: dt.desc.KeySchema, + TableName: dt.desc.TableName, + LocalSecondaryIndexes: localSecondaryIndexes, + GlobalSecondaryIndexes: globalSecondaryIndexes, + StreamSpecification: dt.desc.StreamSpecification, + } + + req, _ := dt.client.CreateTableRequest(input) + err := dt.sendWithTracing(ctx, req) + + return err +} + +func (dt *BaseTableClient) Delete(ctx context.Context) error { + input := &dynamodb.DeleteTableInput{ + TableName: dt.desc.TableName, + } + + req, _ := dt.client.DeleteTableRequest(input) + err := dt.sendWithTracing(ctx, req) + + return err +} + +//func (dt *BaseTableClient) UpdateStreamConfiguration(ctx context.Context) error { +// if dt.desc.StreamSpecification == nil { +// return nil +// } +// tableName := *dt.desc.TableName +// enable := dt.desc.StreamSpecification.StreamEnabled != nil && *dt.desc.StreamSpecification.StreamEnabled +// input := &dynamodb.UpdateTableInput{ +// TableName: dt.desc.TableName, +// StreamSpecification: dt.desc.StreamSpecification, +// } +// +// req, _ := dt.client.UpdateTableRequest(input) +// err := dt.sendWithTracing(ctx, req) +// +// if err != nil { +// if awsErr, ok := err.(awserr.Error); ok { +// if awsErr.Code() == "ValidationException" { // stream already configured +// if enable { +// logger.Debug(ctx, "stream", tableName, "already enabled") +// } else { +// logger.Debug(ctx, "stream", tableName, "already disabled") +// } +// return nil +// } +// } +// return fmt.Errorf("Error creating stream %s: %v", tableName, err) +// } +// +// if enable { +// logger.Debug(ctx, "stream", tableName, "enabled") +// } else { +// logger.Debug(ctx, "stream", tableName, "disabled") +// } +// return nil +//} + +func (dt *BaseTableClient) BatchGetDocument(ctx context.Context, keys []DynamoKey, consistentRead bool) (*dynamodb.BatchGetItemOutput, *request.Request, error) { + keysSlice := make([]map[string]*dynamodb.AttributeValue, len(keys)) + for i, key := range keys { + keysSlice[i] = dt.addPrimaryKey(key.HashKey, key.RangeKey, DynamoItem{}) + } + + requestItems := map[string]*dynamodb.KeysAndAttributes{ + *dt.desc.TableName: &dynamodb.KeysAndAttributes{ + ConsistentRead: aws.Bool(consistentRead), + Keys: keysSlice, + }, + } + + input := &dynamodb.BatchGetItemInput{ + RequestItems: requestItems, + } + + req, output := dt.client.BatchGetItemRequest(input) + err := dt.sendWithTracing(ctx, req) + + return output, req, err +} + +func (dt *BaseTableClient) BatchPutDocument(ctx context.Context, keys []DynamoKey, items []DynamoItem) (*dynamodb.BatchWriteItemOutput, *request.Request, error) { + if len(keys) != len(items) { + return nil, nil, fmt.Errorf("keys and items must have same length") + } + + writeRequests := make([]*dynamodb.WriteRequest, len(keys)) + for index, key := range keys { + item := dt.addPrimaryKey(key.HashKey, key.RangeKey, items[index]) + + writeRequests[index] = &dynamodb.WriteRequest{ + PutRequest: &dynamodb.PutRequest{ + Item: item, + }, + } + } + + requestItems := map[string][]*dynamodb.WriteRequest{ + *dt.desc.TableName: writeRequests, + } + + input := &dynamodb.BatchWriteItemInput{ + RequestItems: requestItems, + } + + req, output := dt.client.BatchWriteItemRequest(input) + err := dt.sendWithTracing(ctx, req) + + return output, req, err +} + +func (dt *BaseTableClient) BatchDeleteDocument(ctx context.Context, keys []DynamoKey) (*dynamodb.BatchWriteItemOutput, *request.Request, error) { + writeRequests := make([]*dynamodb.WriteRequest, len(keys)) + for i, key := range keys { + item := dt.addPrimaryKey(key.HashKey, key.RangeKey, DynamoItem{}) + + writeRequests[i] = &dynamodb.WriteRequest{ + DeleteRequest: &dynamodb.DeleteRequest{ + Key: item, + }, + } + } + + requestItems := map[string][]*dynamodb.WriteRequest{ + *dt.desc.TableName: writeRequests, + } + + input := &dynamodb.BatchWriteItemInput{ + RequestItems: requestItems, + } + + req, output := dt.client.BatchWriteItemRequest(input) + err := dt.sendWithTracing(ctx, req) + + return output, req, err +} + +// Builds a PrimaryKey from a TableDescription +func buildPrimaryKey(t *dynamodb.TableDescription) (pk PrimaryKey, err error) { + for _, k := range t.KeySchema { + ad := findAttributeDefinitionByName(t.AttributeDefinitions, aws.StringValue(k.AttributeName)) + if ad == nil { + return pk, fmt.Errorf("An inconsistency found in TableDescription") + } + + switch aws.StringValue(k.KeyType) { + case dynamodb.KeyTypeHash: + pk.PartitionKey = &Key{Type: aws.StringValue(ad.AttributeType), Name: aws.StringValue(k.AttributeName)} + case dynamodb.KeyTypeRange: + pk.SortKey = &Key{Type: aws.StringValue(ad.AttributeType), Name: aws.StringValue(k.AttributeName)} + default: + return pk, fmt.Errorf("key type not supported") + } + } + return +} + +// Finds Attribute Definition matching the passed in name +func findAttributeDefinitionByName(ads []*dynamodb.AttributeDefinition, name string) *dynamodb.AttributeDefinition { + for _, ad := range ads { + if aws.StringValue(ad.AttributeName) == name { + return ad + } + } + return nil +} + +// Adds HashKey and RangeKey to a DynamoItem +func (dt *BaseTableClient) addPrimaryKey(hashKey, rangeKey string, item DynamoItem) DynamoItem { + item[dt.pk.PartitionKey.Name] = dt.pk.PartitionKey.newAttributeValue(hashKey) + + if dt.pk.hasSortKey() { + item[dt.pk.SortKey.Name] = dt.pk.SortKey.newAttributeValue(rangeKey) + } + + return item +} + +func (pk *PrimaryKey) hasSortKey() bool { + if pk.SortKey != nil { + return true + } + return false +} + +// Builds an attribute value from a PrimaryKey attribute +func (k *Key) newAttributeValue(value string) *dynamodb.AttributeValue { + switch k.Type { + case dynamodb.ScalarAttributeTypeS: + return NewStringAttributeValue(value) + case dynamodb.ScalarAttributeTypeN: + return NewNumberAttributeValue(value) + case dynamodb.ScalarAttributeTypeB: + b, _ := strconv.ParseBool(value) + return NewBoolAttributeValue(b) + default: + return nil + } +} + +func isEmptyGetItemOutput(gio *dynamodb.GetItemOutput) bool { + return gio.Item == nil +} + +func isValidConsumedCapacityLevel(level string) bool { + switch level { + case dynamodb.ReturnConsumedCapacityIndexes: + return true + case dynamodb.ReturnConsumedCapacityTotal: + return true + case dynamodb.ReturnConsumedCapacityNone: + return true + default: + return false + } +} + +func (dt *BaseTableClient) buildQuery(opts QueryOpts) *dynamodb.QueryInput { + qi := &dynamodb.QueryInput{TableName: dt.desc.TableName} + + // Copy over most fields from opts + if opts.Limit != 0 { + qi.Limit = aws.Int64(opts.Limit) + } + if opts.Descending { + qi.ScanIndexForward = aws.Bool(false) + } + if opts.IndexName != "" { + qi.IndexName = aws.String(opts.IndexName) + } + + if opts.FilterExpression != "" { + qi.FilterExpression = aws.String(opts.FilterExpression) + } + + if opts.ProjectionExpression != "" { + qi.ProjectionExpression = aws.String(opts.ProjectionExpression) + } + qi.ExpressionAttributeValues = opts.ExpressionAttributeValues + qi.ExpressionAttributeNames = opts.ExpressionAttributeNames + + // HashKeys are added as key conditions + keyCondition := "" + if opts.HashKey != "" { + keyCondition += fmt.Sprintf("%s = %s", dt.pk.PartitionKey.Name, BuildQueryHashKeyName) + + if qi.ExpressionAttributeValues == nil { + qi.ExpressionAttributeValues = map[string]*dynamodb.AttributeValue{} + } + qi.ExpressionAttributeValues[BuildQueryHashKeyName] = NewStringAttributeValue(opts.HashKey) + + if opts.KeyConditionExpression != "" { + keyCondition += " AND " + } + } + keyCondition += opts.KeyConditionExpression + qi.KeyConditionExpression = aws.String(keyCondition) + return qi +} + +func (dt *BaseTableClient) sendWithTracing(ctx context.Context, r *request.Request) error { + span, ctx := opentracing.StartSpanFromContext(ctx, "client.request") + defer span.Finish() + r.HTTPRequest = r.HTTPRequest.WithContext(ctx) + + span.SetTag(dd_opentracing.SpanType, "db") + span.SetTag(dd_opentracing.ServiceName, dt.serviceName) + span.SetTag(dd_opentracing.ResourceName, r.Operation.Name) + span.SetTag("http.method", r.Operation.HTTPMethod) + span.SetTag("http.url", r.ClientInfo.Endpoint+r.Operation.HTTPPath) + span.SetTag("out.host", r.ClientInfo.Endpoint) + span.SetTag("aws.operation", r.Operation.Name) + span.SetTag("aws.table_name", dt.tableName) + + err := r.Send() + + span.SetTag("aws.retry_count", r.RetryCount) + + if r.HTTPResponse != nil { + span.SetTag("http.status_code", r.HTTPResponse.StatusCode) + } + + if err != nil { + span.SetTag(dd_opentracing.Error, err) + } + + return err +} diff --git a/dynamodb_client/base_table_client_test.go b/dynamodb_client/base_table_client_test.go new file mode 100644 index 00000000..91ea3c19 --- /dev/null +++ b/dynamodb_client/base_table_client_test.go @@ -0,0 +1,93 @@ +package dynamodb_client_test + +import ( + "context" + "os" + "testing" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/dynamodb" + "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute" + . "github.com/remind101/pkg/dynamodb_client" + "github.com/stretchr/testify/assert" +) + +var testTableName = "test_table" +var AllTableDescriptions = map[string]*dynamodb.TableDescription{ + testTableName: &dynamodb.TableDescription{ + TableName: aws.String(testTableName), + AttributeDefinitions: []*dynamodb.AttributeDefinition{ + &dynamodb.AttributeDefinition{ + AttributeName: aws.String("user_uuid"), + AttributeType: aws.String("S"), + }, + &dynamodb.AttributeDefinition{ + AttributeName: aws.String("range_uuid"), + AttributeType: aws.String("S"), + }, + }, + KeySchema: []*dynamodb.KeySchemaElement{ + &dynamodb.KeySchemaElement{ + AttributeName: aws.String("user_uuid"), + KeyType: aws.String("HASH"), + }, + &dynamodb.KeySchemaElement{ + AttributeName: aws.String("range_uuid"), + KeyType: aws.String("RANGE"), + }, + }, + ProvisionedThroughput: &dynamodb.ProvisionedThroughputDescription{ + ReadCapacityUnits: aws.Int64(1), + WriteCapacityUnits: aws.Int64(1), + }, + }, +} + +func fetchEnv(key string, dval string) string { + if val := os.Getenv(key); val != "" { + return val + } + return dval +} + +func newFreshDynamoClient() *DynamoClient { + params := DynamoConnectionParams{ + RegionName: "us-west-2", + LocalDynamoURL: fetchEnv("LOCAL_DYNAMO_URL", "http://127.0.0.1:8000"), + Scope: "client-test", + TableDescriptions: AllTableDescriptions, + } + c := NewDynamoDBClient(params) + c.DeleteTables() + c.CreateTables() + return c +} + +func TestTableClient(t *testing.T) { + dClient := newFreshDynamoClient() + _, err := dClient.GetClientForTable("client-test") + assert.NotNil(t, err) + _, err = dClient.GetClientForTable(testTableName) + assert.Nil(t, err) +} + +func TestCRUDItem(t *testing.T) { + dClient := newFreshDynamoClient() + tClient, _ := dClient.GetClientForTable(testTableName) + data, err := tClient.GetItem(context.Background(), "user-a", "range-a") + assert.Nil(t, data) + data, err = dynamodbattribute.MarshalMap(map[string]string{ + "user_uuid": "user-a", + "range_uuid": "range-a", + "beans": "cool", + }) + assert.Nil(t, err) + err = tClient.PutItem(context.Background(), "user-a", "range-a", data) + assert.Nil(t, err) + data, err = tClient.GetItem(context.Background(), "user-a", "range-a") + assert.NotNil(t, data) + var parsedData map[string]string + err = dynamodbattribute.UnmarshalMap(data, &parsedData) + assert.Nil(t, err) + assert.Equal(t, parsedData["beans"], "cool") +} diff --git a/dynamodb_client/dynamo.go b/dynamodb_client/dynamo.go new file mode 100644 index 00000000..497ac1dd --- /dev/null +++ b/dynamodb_client/dynamo.go @@ -0,0 +1,138 @@ +package dynamodb_client + +import ( + "context" + "fmt" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/dynamodb" + "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface" + "github.com/remind101/pkg/logger" +) + +type DynamoClient struct { + dynamodbiface.DynamoDBAPI + TableDescriptions map[string]*dynamodb.TableDescription + ServiceName string +} + +type DynamoConnectionParams struct { + RegionName string + LocalDynamoURL string // if not "", URL of local Dynamo to point to. + Scope string // configures the table name per env + TableDescriptions map[string]*dynamodb.TableDescription + ServiceName string // Name to use in apm +} + +func NewDynamoDBClient(params DynamoConnectionParams) *DynamoClient { + if params.RegionName == "" { + params.RegionName = "us-east-1" + } + config := aws.Config{ + Region: aws.String(params.RegionName), + Endpoint: aws.String(params.LocalDynamoURL), + } + svc := dynamodb.New(session.New(), &config) + //svc.Handlers.Retry.PushFrontNamed(CheckThrottleHandler) + + return &DynamoClient{ + svc, + scopedTabledDescriptions(params.TableDescriptions, params.Scope), + params.ServiceName, + } +} + +func scopedTabledDescriptions(tds map[string]*dynamodb.TableDescription, scope string) map[string]*dynamodb.TableDescription { + scopedTds := make(map[string]*dynamodb.TableDescription, len(tds)) + for name, td := range tds { + newTd := *td + newTd.TableName = aws.String(fmt.Sprintf("%s-%s", scope, name)) + scopedTds[name] = &newTd + } + return scopedTds +} + +type DynamoKey struct { + HashKey string + RangeKey string +} + +type AwsExpression struct { + // http://docs.aws.amazon.com/sdk-for-go/api/service/dynamodb/#UpdateItemInput + ConditionExpression string + UpdateExpression string + ExpressionAttributeNames map[string]*string + ExpressionAttributeValues map[string]*dynamodb.AttributeValue +} + +type QueryOpts struct { + HashKey string + Limit int64 + Descending bool + ExpressionAttributeNames map[string]*string + ExpressionAttributeValues map[string]*dynamodb.AttributeValue + ProjectionExpression string + KeyConditionExpression string + FilterExpression string + IndexName string +} + +func (dc *DynamoClient) GetClientForTable(name string) (TableClient, error) { + if val, ok := dc.TableDescriptions[name]; ok { + return NewBaseTableClient(TableClientOpts{ + Desc: val, + Client: dc, + ServiceName: dc.ServiceName, + }) + } else { + return nil, fmt.Errorf("Table not found") + } +} + +func (dc *DynamoClient) CreateTables() error { + for tableName, _ := range dc.TableDescriptions { + tableClient, err := dc.GetClientForTable(tableName) + if err != nil { + return err + } + fmt.Printf("Creating table %s \n", tableName) + err = tableClient.Create(context.Background()) + if err != nil { + if awsErr, ok := err.(awserr.Error); ok { + fmt.Printf("%s and %s\n", awsErr.Error(), awsErr.Code()) + if awsErr.Code() == "ResourceInUseException" { // table already exists + logger.Debug(context.Background(), "table", tableName, "already exists") + continue + } + } + return fmt.Errorf("Error creating table %s: %v", tableName, err) + } + logger.Debug(context.Background(), "table", tableName, "created successfully") + } + return nil +} + +func (dc *DynamoClient) DeleteTables() error { + for tableName, _ := range dc.TableDescriptions { + tableClient, err := dc.GetClientForTable(tableName) + if err != nil { + return err + } + fmt.Printf("Deleting table %s \n", tableName) + tableClient.Delete(context.Background()) + if err != nil { + if awsErr, ok := err.(awserr.Error); ok { + fmt.Printf("%s and %s\n", awsErr.Error(), awsErr.Code()) + if awsErr.Code() == "ResourceInUseException" { // table doesn't exist + logger.Debug(context.Background(), "table", tableName, "doesn't exist") + continue + } + } + return fmt.Errorf("Error creating table %s: %v", tableName, err) + } + logger.Debug(context.Background(), "table", tableName, "created successfully") + } + return nil +} diff --git a/dynamodb_client/dynamo_attribute_values.go b/dynamodb_client/dynamo_attribute_values.go new file mode 100644 index 00000000..0f94e5c9 --- /dev/null +++ b/dynamodb_client/dynamo_attribute_values.go @@ -0,0 +1,48 @@ +package dynamodb_client + +import ( + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/dynamodb" +) + +type DynamoItem map[string]*dynamodb.AttributeValue + +func NewStringAttributeValue(s string) *dynamodb.AttributeValue { + return &dynamodb.AttributeValue{S: aws.String(s)} +} + +func NewNumberAttributeValue(n string) *dynamodb.AttributeValue { + return &dynamodb.AttributeValue{N: aws.String(n)} +} + +func NewBoolAttributeValue(b bool) *dynamodb.AttributeValue { + return &dynamodb.AttributeValue{BOOL: aws.Bool(b)} +} + +func NewStringSetAttributeValue(ss []string) *dynamodb.AttributeValue { + return &dynamodb.AttributeValue{SS: aws.StringSlice(ss)} +} + +func NewListAttributeValue(l []*dynamodb.AttributeValue) *dynamodb.AttributeValue { + return &dynamodb.AttributeValue{L: l} +} + +func NewMapAttributeValue(m map[string]*dynamodb.AttributeValue) *dynamodb.AttributeValue { + return &dynamodb.AttributeValue{M: m} +} + +func NewByteAttributeValue(b []byte) *dynamodb.AttributeValue { + return &dynamodb.AttributeValue{B: b} +} + +func NewBinarySetAttributeValue(bs [][]byte) *dynamodb.AttributeValue { + return &dynamodb.AttributeValue{BS: bs} +} + +func NewNumberSetAttributeValue(ns []string) *dynamodb.AttributeValue { + return &dynamodb.AttributeValue{NS: aws.StringSlice(ns)} +} + +func NewNullAttributeValue(isNull bool) *dynamodb.AttributeValue { + return &dynamodb.AttributeValue{NULL: aws.Bool(isNull)} +} From 8339ebe68c8141bc0636cf23e85f48491ea6d8ab Mon Sep 17 00:00:00 2001 From: "Eric J. Holmes" Date: Fri, 22 Jun 2018 21:52:18 -0700 Subject: [PATCH 2/3] Remove sendWithTracing. --- dynamodb_client/base_table_client.go | 58 ++++++++-------------------- 1 file changed, 16 insertions(+), 42 deletions(-) diff --git a/dynamodb_client/base_table_client.go b/dynamodb_client/base_table_client.go index 74a223a7..0b3d9a6c 100644 --- a/dynamodb_client/base_table_client.go +++ b/dynamodb_client/base_table_client.go @@ -5,12 +5,10 @@ import ( "fmt" "strconv" - dd_opentracing "github.com/DataDog/dd-trace-go/opentracing" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/request" "github.com/aws/aws-sdk-go/service/dynamodb" "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface" - opentracing "github.com/opentracing/opentracing-go" ) var ( @@ -96,7 +94,7 @@ func (dt *BaseTableClient) Query(ctx context.Context, opts QueryOpts) ([]map[str input := dt.buildQuery(opts) req, output := dt.client.QueryRequest(input) - err := dt.sendWithTracing(ctx, req) + err := dt.send(ctx, req) return output.Items, err } @@ -110,7 +108,7 @@ func (dt *BaseTableClient) PutItem(ctx context.Context, hashKey, rangeKey string } req, _ := dt.client.PutItemRequest(input) - err := dt.sendWithTracing(ctx, req) + err := dt.send(ctx, req) return err } @@ -124,7 +122,7 @@ func (dt *BaseTableClient) DeleteItem(ctx context.Context, hashKey, rangeKey str } req, _ := dt.client.DeleteItemRequest(input) - err := dt.sendWithTracing(ctx, req) + err := dt.send(ctx, req) return err } @@ -141,7 +139,7 @@ func (dt *BaseTableClient) DeleteItem(ctx context.Context, hashKey, rangeKey str // } // // req, output := dt.client.DeleteItemRequest(input) -// err := dt.sendWithTracing(ctx, req) +// err := dt.send(ctx, req) // // return err //} @@ -158,7 +156,7 @@ func (dt *BaseTableClient) DeleteItem(ctx context.Context, hashKey, rangeKey str // } // // req, output := dt.client.PutItemRequest(input) -// err := dt.sendWithTracing(ctx, req) +// err := dt.send(ctx, req) // // return err //} @@ -176,7 +174,7 @@ func (dt *BaseTableClient) DeleteItem(ctx context.Context, hashKey, rangeKey str // } // // req, output := dt.client.UpdateItemRequest(input) -// err := dt.sendWithTracing(ctx, req) +// err := dt.send(ctx, req) // // return err //} @@ -191,7 +189,7 @@ func (dt *BaseTableClient) GetItem(ctx context.Context, hashKey, rangeKey string } req, output := dt.client.GetItemRequest(input) - err := dt.sendWithTracing(ctx, req) + err := dt.send(ctx, req) if output != nil { // Do we still want this? @@ -245,7 +243,7 @@ func (dt *BaseTableClient) Create(ctx context.Context) error { } req, _ := dt.client.CreateTableRequest(input) - err := dt.sendWithTracing(ctx, req) + err := dt.send(ctx, req) return err } @@ -256,7 +254,7 @@ func (dt *BaseTableClient) Delete(ctx context.Context) error { } req, _ := dt.client.DeleteTableRequest(input) - err := dt.sendWithTracing(ctx, req) + err := dt.send(ctx, req) return err } @@ -273,7 +271,7 @@ func (dt *BaseTableClient) Delete(ctx context.Context) error { // } // // req, _ := dt.client.UpdateTableRequest(input) -// err := dt.sendWithTracing(ctx, req) +// err := dt.send(ctx, req) // // if err != nil { // if awsErr, ok := err.(awserr.Error); ok { @@ -315,7 +313,7 @@ func (dt *BaseTableClient) BatchGetDocument(ctx context.Context, keys []DynamoKe } req, output := dt.client.BatchGetItemRequest(input) - err := dt.sendWithTracing(ctx, req) + err := dt.send(ctx, req) return output, req, err } @@ -345,7 +343,7 @@ func (dt *BaseTableClient) BatchPutDocument(ctx context.Context, keys []DynamoKe } req, output := dt.client.BatchWriteItemRequest(input) - err := dt.sendWithTracing(ctx, req) + err := dt.send(ctx, req) return output, req, err } @@ -371,7 +369,7 @@ func (dt *BaseTableClient) BatchDeleteDocument(ctx context.Context, keys []Dynam } req, output := dt.client.BatchWriteItemRequest(input) - err := dt.sendWithTracing(ctx, req) + err := dt.send(ctx, req) return output, req, err } @@ -499,31 +497,7 @@ func (dt *BaseTableClient) buildQuery(opts QueryOpts) *dynamodb.QueryInput { return qi } -func (dt *BaseTableClient) sendWithTracing(ctx context.Context, r *request.Request) error { - span, ctx := opentracing.StartSpanFromContext(ctx, "client.request") - defer span.Finish() - r.HTTPRequest = r.HTTPRequest.WithContext(ctx) - - span.SetTag(dd_opentracing.SpanType, "db") - span.SetTag(dd_opentracing.ServiceName, dt.serviceName) - span.SetTag(dd_opentracing.ResourceName, r.Operation.Name) - span.SetTag("http.method", r.Operation.HTTPMethod) - span.SetTag("http.url", r.ClientInfo.Endpoint+r.Operation.HTTPPath) - span.SetTag("out.host", r.ClientInfo.Endpoint) - span.SetTag("aws.operation", r.Operation.Name) - span.SetTag("aws.table_name", dt.tableName) - - err := r.Send() - - span.SetTag("aws.retry_count", r.RetryCount) - - if r.HTTPResponse != nil { - span.SetTag("http.status_code", r.HTTPResponse.StatusCode) - } - - if err != nil { - span.SetTag(dd_opentracing.Error, err) - } - - return err +func (dt *BaseTableClient) send(ctx context.Context, r *request.Request) error { + r.SetContext(ctx) + return r.Send() } From 46305d4d4837b69b39c17e5ea79b127a1e999e46 Mon Sep 17 00:00:00 2001 From: "Eric J. Holmes" Date: Mon, 25 Jun 2018 18:58:15 -0700 Subject: [PATCH 3/3] Allow NewDynamoDBClient to accept a client.ConfigProvider. --- dynamodb_client/base_table_client_test.go | 3 ++- dynamodb_client/dynamo.go | 6 +++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/dynamodb_client/base_table_client_test.go b/dynamodb_client/base_table_client_test.go index 91ea3c19..13b1951f 100644 --- a/dynamodb_client/base_table_client_test.go +++ b/dynamodb_client/base_table_client_test.go @@ -6,6 +6,7 @@ import ( "testing" "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/dynamodb" "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute" . "github.com/remind101/pkg/dynamodb_client" @@ -57,7 +58,7 @@ func newFreshDynamoClient() *DynamoClient { Scope: "client-test", TableDescriptions: AllTableDescriptions, } - c := NewDynamoDBClient(params) + c := NewDynamoDBClient(session.New(), params) c.DeleteTables() c.CreateTables() return c diff --git a/dynamodb_client/dynamo.go b/dynamodb_client/dynamo.go index 497ac1dd..83044ec8 100644 --- a/dynamodb_client/dynamo.go +++ b/dynamodb_client/dynamo.go @@ -6,7 +6,7 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" - "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/aws/client" "github.com/aws/aws-sdk-go/service/dynamodb" "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface" "github.com/remind101/pkg/logger" @@ -26,7 +26,7 @@ type DynamoConnectionParams struct { ServiceName string // Name to use in apm } -func NewDynamoDBClient(params DynamoConnectionParams) *DynamoClient { +func NewDynamoDBClient(c client.ConfigProvider, params DynamoConnectionParams) *DynamoClient { if params.RegionName == "" { params.RegionName = "us-east-1" } @@ -34,7 +34,7 @@ func NewDynamoDBClient(params DynamoConnectionParams) *DynamoClient { Region: aws.String(params.RegionName), Endpoint: aws.String(params.LocalDynamoURL), } - svc := dynamodb.New(session.New(), &config) + svc := dynamodb.New(c, &config) //svc.Handlers.Retry.PushFrontNamed(CheckThrottleHandler) return &DynamoClient{