Skip to content

Commit

Permalink
fix: database error not correctly propagated (#7)
Browse files Browse the repository at this point in the history
  • Loading branch information
marcbinz authored Sep 19, 2023
1 parent ae1a20d commit baae299
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 13 deletions.
2 changes: 1 addition & 1 deletion methods.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func (c *Client) send(ctx context.Context, req request) (_ []byte, err error) {
return nil, fmt.Errorf("channel closed")
}

return res, nil
return res.data, res.err
}
}

Expand Down
12 changes: 6 additions & 6 deletions response.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,6 @@ func (c *Client) handleMessage(data []byte) {

c.logger.DebugContext(c.connCtx, "Received message.", "res", res)

if res.Error != nil {
c.logger.ErrorContext(c.connCtx, "Received error response.", "error", res.Error)
return
}

if res.ID == "" {
c.handleLiveQuery(res)
return
Expand All @@ -131,9 +126,14 @@ func (c *Client) handleResult(res *response) {
return
}

var err error
if res.Error != nil {
err = fmt.Errorf("(%d) %s", res.Error.Code, res.Error.Message)
}

select {

case outCh <- res.Result:
case outCh <- &output{data: res.Result, err: err}:
return

case <-c.connCtx.Done():
Expand Down
17 changes: 11 additions & 6 deletions stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,37 +37,42 @@ func (p *bufPool) Put(b *bytes.Buffer) {
// -- REQUESTS
//

type output struct {
data []byte
err error
}

type requests struct {
store sync.Map
}

func (r *requests) prepare() (string, <-chan []byte) {
func (r *requests) prepare() (string, <-chan *output) {
key := uuid.New()
ch := make(chan []byte)
ch := make(chan *output)

r.store.Store(key.String(), ch)

return key.String(), ch
}

func (r *requests) get(key string) (chan<- []byte, bool) {
func (r *requests) get(key string) (chan<- *output, bool) {
val, ok := r.store.Load(key)
if !ok {
return nil, false
}

return val.(chan []byte), true
return val.(chan *output), true
}

func (r *requests) cleanup(key string) {
if ch, ok := r.store.LoadAndDelete(key); ok {
close(ch.(chan []byte))
close(ch.(chan *output))
}
}

func (r *requests) reset() {
r.store.Range(func(key, ch any) bool {
close(ch.(chan []byte))
close(ch.(chan *output))
r.store.Delete(key)
return true
})
Expand Down

0 comments on commit baae299

Please sign in to comment.