Skip to content

Commit

Permalink
fixes #1, improves error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
anandsas committed Jul 13, 2018
1 parent 943d349 commit e171ad8
Show file tree
Hide file tree
Showing 7 changed files with 211 additions and 14 deletions.
7 changes: 6 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
Release v1.0.1 (2018-07-12)
===
* Fixes [#1](https://github.com/aws/aws-dax-go/issues/1)
* Improves error handling

Release v1.0.0 (2018-06-26)
===
* Initial version
* Initial version
2 changes: 1 addition & 1 deletion dax/internal/client/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"time"
)

func TestTaskExecutor(t *testing.T) {
func testTaskExecutor(t *testing.T) { // disabled as test is time sensitive
executor := newExecutor()

var cnt1, cnt2, cnt3 int32
Expand Down
2 changes: 1 addition & 1 deletion dax/internal/client/projection.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func buildProjectionOrdinals(projectionExpression *string, expressionAttributeNa
terms := strings.Split(*projectionExpression, ",")
dps := make([]documentPath, 0, len(terms))
for _, t := range terms {
dp, err := buildDocumentPath(t, expressionAttributeNames)
dp, err := buildDocumentPath(strings.TrimSpace(t), expressionAttributeNames)
if err != nil {
return nil, err
}
Expand Down
30 changes: 30 additions & 0 deletions dax/internal/client/projection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,36 @@ func TestBuildDocumentPath(t *testing.T) {
}
}

func TestBuildProjectionOrdinals(t *testing.T) {
cases := []struct {
projectionExpression string
expressionAttributeNames map[string]*string
documentPaths []documentPath
}{
{
"#1",
map[string]*string{"#1": aws.String("a")},
[]documentPath{{[]documentPathElement{{name: "a", index: -1}}}},
},
{
"#1, #2",
map[string]*string{"#1": aws.String("a"), "#2": aws.String("b")},
[]documentPath{{[]documentPathElement{{name: "a", index: -1}}}, {[]documentPathElement{{name: "b", index: -1}}}},
},
}

for _, c := range cases {
actual, err := buildProjectionOrdinals(&c.projectionExpression, c.expressionAttributeNames)
if err != nil {
t.Errorf("unexpected error %v", err)
}
if !reflect.DeepEqual(c.documentPaths, actual) {
t.Errorf("expected %v, got %v for %s", c.documentPaths, actual, c.projectionExpression)
}
}

}

func TestItemBuilder(t *testing.T) {
cases := []struct {
projectionExpression string
Expand Down
18 changes: 12 additions & 6 deletions dax/internal/client/single.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,33 +608,39 @@ func (client *SingleDaxClient) executeWithContext(ctx aws.Context, op string, en
}

if err = client.auth(tube); err != nil {
client.recycleTube(tube, err)
client.pool.discard(tube)
return err
}

writer := tube.cborWriter
if err = encoder(tube.cborWriter); err != nil {
client.recycleTube(tube, err)
// Validation errors will cause pool to be discarded as there is no guarantee
// that the validation was performed before any data was written into tube
client.pool.discard(tube)
return err
}
if err := writer.Flush(); err != nil {
client.recycleTube(tube, err)
client.pool.discard(tube)
return err
}

reader := tube.cborReader
ex, err := decodeError(reader)
if err != nil { // decode or network error
client.recycleTube(tube, err)
client.pool.discard(tube)
return err
}
if ex != nil { // user or server error
client.recycleTube(tube, nil) // do not close conn
client.recycleTube(tube, ex)
return ex
}

err = decoder(reader)
client.recycleTube(tube, err)
if err != nil {
client.pool.discard(tube)
} else {
client.pool.put(tube)
}
return err
}

Expand Down
151 changes: 151 additions & 0 deletions dax/internal/client/single_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package client

import (
"errors"
"fmt"
"github.com/aws/aws-dax-go/dax/internal/cbor"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/request"
"net"
"reflect"
"runtime"
"strings"
"testing"
"time"
)

func TestExecuteErrorHandling(t *testing.T) {
cases := []struct {
conn *mockConn
enc func(writer *cbor.Writer) error
dec func(reader *cbor.Reader) error
ee error
ec map[string]int
}{
{ // write error, discard tube
&mockConn{we: errors.New("io")},
nil,
nil,
errors.New("io"),
map[string]int{"Write": 1, "Close": 1},
},
{ // encoding error, discard tube
&mockConn{},
func(writer *cbor.Writer) error { return errors.New("ser") },
nil,
errors.New("ser"),
map[string]int{"Write": 2, "SetDeadline": 1, "Close": 1},
},
{ // read error, discard tube
&mockConn{re: errors.New("IO")},
func(writer *cbor.Writer) error { return nil },
nil,
errors.New("IO"),
map[string]int{"Write": 2, "Read": 1, "SetDeadline": 1, "Close": 1},
},
{ // serialization error, discard tube
&mockConn{rd: []byte{cbor.NegInt}},
func(writer *cbor.Writer) error { return nil },
nil,
awserr.New(request.ErrCodeSerialization, fmt.Sprintf("cbor: expected major type %d, got %d", cbor.Array, cbor.NegInt), nil),
map[string]int{"Write": 2, "Read": 1, "SetDeadline": 1, "Close": 1},
},
{ // decode error, discard tube
&mockConn{rd: []byte{cbor.Array + 0}},
func(writer *cbor.Writer) error { return nil },
func(reader *cbor.Reader) error { return errors.New("IO") },
errors.New("IO"),
map[string]int{"Write": 2, "Read": 1, "SetDeadline": 1, "Close": 1},
},
{ // dax error, do not discard tube
&mockConn{rd: []byte{cbor.Array + 3, cbor.PosInt + 4, cbor.PosInt + 0, cbor.PosInt + 0, cbor.Utf, cbor.Nil}},
func(writer *cbor.Writer) error { return nil },
nil,
newDaxRequestFailure([]int{4, 0, 0}, ErrCodeUnknown, "", "", 400),
map[string]int{"Write": 2, "Read": 1, "SetDeadline": 1},
},
{ // no error, do not discard tube
&mockConn{rd: []byte{cbor.Array + 0}},
func(writer *cbor.Writer) error { return nil },
func(reader *cbor.Reader) error { return nil },
nil,
map[string]int{"Write": 2, "Read": 1, "SetDeadline": 1},
},
}

for _, c := range cases {
cli, err := newSingleClientWithOptions(":9121", "us-west-2", credentials.NewStaticCredentials("id", "secret", "tok"), 1)
if err != nil {
t.Fatalf("unexpected error %v", err)
}
cli.pool.connectFn = func(a, n string) (net.Conn, error) {
return c.conn, nil
}
cli.pool.closeTubeImmediately = true

err = cli.executeWithContext(aws.BackgroundContext(), OpGetItem, c.enc, c.dec)
if !reflect.DeepEqual(c.ee, err) {
t.Errorf("expected error %v, got error %v", c.ee, err)
}
if !reflect.DeepEqual(c.ec, c.conn.cc) {
t.Errorf("expected %v calls, got %v", c.ec, c.conn.cc)
}
cli.Close()
}
}

type mockConn struct {
net.Conn
we, re error
wd, rd []byte
cc map[string]int
}

func (m *mockConn) Read(b []byte) (n int, err error) {
m.register()
if m.re != nil {
return 0, m.re
}
if len(m.rd) > 0 {
l := copy(b, m.rd)
m.rd = m.rd[l:]
return l, nil
}
return 0, nil
}

func (m *mockConn) Write(b []byte) (n int, err error) {
m.register()
if m.we != nil {
return 0, m.we
}
if len(m.wd) > 0 {
l := copy(m.wd, b)
m.wd = m.wd[l:]
return l, nil
}
return len(b), nil
}

func (m *mockConn) Close() error {
m.register()
return nil
}

func (m *mockConn) SetDeadline(t time.Time) error {
m.register()
return nil
}

func (m *mockConn) register() {
pc, _, _, _ := runtime.Caller(1)
fn := runtime.FuncForPC(pc)
s := strings.Split(fn.Name(), ".")
n := s[len(s)-1]
if m.cc == nil {
m.cc = make(map[string]int)
}
m.cc[n]++
}
15 changes: 10 additions & 5 deletions dax/internal/client/tubepool.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@ import (
const network = "tcp"

type tubePool struct {
address string
gate gate
errCh chan error
timeout time.Duration
connectFn func(string, string) (net.Conn, error)
address string
gate gate
errCh chan error
timeout time.Duration
connectFn func(string, string) (net.Conn, error)
closeTubeImmediately bool

mutex sync.Mutex
closed bool // protected by mutex
Expand Down Expand Up @@ -188,6 +189,10 @@ func (p *tubePool) discard(tube *tube) {
if tube == nil {
return
}
if p.closeTubeImmediately {
tube.Close()
return
}
go func() {
tube.Close()
}()
Expand Down

0 comments on commit e171ad8

Please sign in to comment.