Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
frairon committed Dec 12, 2023
1 parent d573de5 commit 66f3e58
Show file tree
Hide file tree
Showing 11 changed files with 33 additions and 13 deletions.
2 changes: 1 addition & 1 deletion codec/closer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ type Closer interface {

type nullCloser struct{}

func (n *nullCloser) Close() error {}
func (n *nullCloser) Close() error { return nil }

var NoopCloser = new(nullCloser)
15 changes: 13 additions & 2 deletions context.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ type cbContext struct {
// tracking statistics for the output topic
trackOutputStats func(ctx context.Context, topic string, size int)

deferreds []func()

msg *message
done bool
counters struct {
Expand Down Expand Up @@ -345,10 +347,13 @@ func (ctx *cbContext) valueForKey(key string) (interface{}, error) {
return nil, fmt.Errorf("Cannot access state in stateless processor")
}

data, err := ctx.table.Get(key)
data, closer, err := ctx.table.Get(key)
if err != nil {
return nil, fmt.Errorf("error reading value: %v", err)
} else if data == nil {
}
ctx.deferreds = append(ctx.deferreds, func() { _ = closer.Close() })

if data == nil {
return nil, nil
}

Expand Down Expand Up @@ -450,6 +455,12 @@ func (ctx *cbContext) tryCommit(err error) {
if ctx.errors.ErrorOrNil() != nil {
ctx.asyncFailer(fmt.Errorf("could not commit message with key '%s': %w", ctx.Key(), ctx.errors.ErrorOrNil()))
} else {

// execute deferred commit functions in reverse order
for i := len(ctx.deferreds) - 1; i >= 0; i-- {
ctx.deferreds[i]()
}

ctx.commit()
}

Expand Down
3 changes: 2 additions & 1 deletion examples/2-clicks/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"time"
Expand Down Expand Up @@ -58,7 +59,7 @@ func (jc *userCodec) Decode(data []byte) (interface{}, error) {
return &c, nil
}

func (jc *userCodec) DecodeP(data []byte) (interface{}, io.closer, error) {
func (jc *userCodec) DecodeP(data []byte) (interface{}, io.Closer, error) {
dec, err := jc.Decode(data)
return dec, codec.NoopCloser, err
}
Expand Down
5 changes: 3 additions & 2 deletions examples/3-messaging/blocker/blocker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package blocker
import (
"context"
"encoding/json"
"io"

"github.com/lovoo/goka"
"github.com/lovoo/goka/codec"
Expand Down Expand Up @@ -30,7 +31,7 @@ func (c *BlockEventCodec) Decode(data []byte) (interface{}, error) {
return &m, json.Unmarshal(data, &m)
}

func (c *BlockEventCodec) DecodeP(data []byte) (interface{}, io.closer, error) {
func (c *BlockEventCodec) DecodeP(data []byte) (interface{}, io.Closer, error) {
dec, err := c.Decode(data)
return dec, codec.NoopCloser, err
}
Expand All @@ -49,7 +50,7 @@ func (c *BlockValueCodec) Decode(data []byte) (interface{}, error) {
return &m, json.Unmarshal(data, &m)
}

func (c *BlockValueCodec) DecodeP(data []byte) (interface{}, io.closer, error) {
func (c *BlockValueCodec) DecodeP(data []byte) (interface{}, io.Closer, error) {
dec, err := c.Decode(data)
return dec, codec.NoopCloser, err
}
Expand Down
3 changes: 2 additions & 1 deletion examples/3-messaging/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package collector
import (
"context"
"encoding/json"
"io"

"github.com/lovoo/goka"
"github.com/lovoo/goka/codec"
Expand All @@ -29,7 +30,7 @@ func (c *MessageListCodec) Decode(data []byte) (interface{}, error) {
return m, err
}

func (c *MessageListCodec) DecodeP(data []byte) (interface{}, io.closer, error) {
func (c *MessageListCodec) DecodeP(data []byte) (interface{}, io.Closer, error) {
dec, err := c.Decode(data)
return dec, codec.NoopCloser, err
}
Expand Down
3 changes: 2 additions & 1 deletion examples/3-messaging/detector/detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package detector
import (
"context"
"encoding/json"
"io"

"github.com/lovoo/goka"
"github.com/lovoo/goka/codec"
Expand Down Expand Up @@ -34,7 +35,7 @@ func (c *CountersCodec) Decode(data []byte) (interface{}, error) {
return &m, json.Unmarshal(data, &m)
}

func (c *CountersCodec) DecodeP(data []byte) (interface{}, io.closer, error) {
func (c *CountersCodec) DecodeP(data []byte) (interface{}, io.Closer, error) {
dec, err := c.Decode(data)
return dec, codec.NoopCloser, err
}
Expand Down
5 changes: 3 additions & 2 deletions examples/3-messaging/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package messaging

import (
"encoding/json"
"io"

"github.com/lovoo/goka"
"github.com/lovoo/goka/codec"
Expand Down Expand Up @@ -29,7 +30,7 @@ func (c *MessageCodec) Decode(data []byte) (interface{}, error) {
return &m, json.Unmarshal(data, &m)
}

func (c *MessageCodec) DecodeP(data []byte) (interface{}, io.closer, error) {
func (c *MessageCodec) DecodeP(data []byte) (interface{}, io.Closer, error) {
dec, err := c.Decode(data)
return dec, codec.NoopCloser, err
}
Expand All @@ -46,7 +47,7 @@ func (c *MessageListCodec) Decode(data []byte) (interface{}, error) {
return m, err
}

func (c *MessageListCodec) DecodeP(data []byte) (interface{}, io.closer, error) {
func (c *MessageListCodec) DecodeP(data []byte) (interface{}, io.Closer, error) {
dec, err := c.Decode(data)
return dec, codec.NoopCloser, err
}
3 changes: 2 additions & 1 deletion examples/5-multiple/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"net/http/pprof"
Expand Down Expand Up @@ -67,7 +68,7 @@ func (jc *userCodec) Decode(data []byte) (interface{}, error) {
return &c, nil
}

func (jc *userCodec) DecodeP(data []byte) (interface{}, io.closer, error) {
func (jc *userCodec) DecodeP(data []byte) (interface{}, io.Closer, error) {
dec, err := jc.Decode(data)
return dec, codec.NoopCloser, err
}
Expand Down
3 changes: 2 additions & 1 deletion examples/7-redis/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"encoding/json"
"io"

"github.com/lovoo/goka/codec"
)
Expand All @@ -21,7 +22,7 @@ func (c *Codec) Decode(data []byte) (interface{}, error) {
return event, err
}

func (c *Codec) DecodeP(data []byte) (interface{}, io.closer, error) {
func (c *Codec) DecodeP(data []byte) (interface{}, io.Closer, error) {
dec, err := c.Decode(data)
return dec, codec.NoopCloser, err
}
3 changes: 2 additions & 1 deletion examples/8-monitoring/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"net/http/pprof"
Expand Down Expand Up @@ -63,7 +64,7 @@ func (jc *userCodec) Decode(data []byte) (interface{}, error) {
return &c, nil
}

func (jc *userCodec) DecodeP(data []byte) (interface{}, io.closer, error) {
func (jc *userCodec) DecodeP(data []byte) (interface{}, io.Closer, error) {
dec, err := jc.Decode(data)
return dec, codec.NoopCloser, err
}
Expand Down
1 change: 1 addition & 0 deletions mockstorage.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 66f3e58

Please sign in to comment.