Skip to content

Commit

Permalink
wip: implement pooling
Browse files Browse the repository at this point in the history
  • Loading branch information
frairon committed Dec 12, 2023
1 parent 2f90eeb commit d573de5
Show file tree
Hide file tree
Showing 26 changed files with 400 additions and 305 deletions.
9 changes: 9 additions & 0 deletions codec.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,16 @@
package goka

import "io"

// Codec decodes and encodes from and to []byte
type Codec interface {
Encode(value interface{}) (data []byte, err error)
Decode(data []byte) (value interface{}, err error)
DecodeP(data []byte) (value interface{}, closer io.Closer, err error)
}

type FuncCloser func() error

func (f FuncCloser) Close() error {
return f()
}
11 changes: 11 additions & 0 deletions codec/closer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package codec

type Closer interface {
Close()
}

type nullCloser struct{}

func (n *nullCloser) Close() error {}

var NoopCloser = new(nullCloser)
16 changes: 16 additions & 0 deletions codec/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package codec

import (
"fmt"
"io"
"strconv"
)

Expand All @@ -23,6 +24,11 @@ func (d *Bytes) Decode(data []byte) (interface{}, error) {
return data, nil
}

// Decode of defaultCodec simply returns the data
func (d *Bytes) DecodeP(data []byte) (interface{}, io.Closer, error) {
return data, NoopCloser, nil
}

// String is a commonly used codec to encode and decode string <-> []byte
type String struct{}

Expand All @@ -40,6 +46,10 @@ func (c *String) Decode(data []byte) (interface{}, error) {
return string(data), nil
}

func (c *String) DecodeP(data []byte) (interface{}, io.Closer, error) {
return string(data), NoopCloser, nil
}

// Int64 is a commonly used codec to encode and decode string <-> []byte
type Int64 struct{}

Expand All @@ -60,3 +70,9 @@ func (c *Int64) Decode(data []byte) (interface{}, error) {
}
return intVal, nil
}

func (c *Int64) DecodeP(data []byte) (interface{}, io.Closer, error) {
dec, err := c.Decode(data)

return dec, NoopCloser, err
}
5 changes: 5 additions & 0 deletions examples/2-clicks/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ func (jc *userCodec) Decode(data []byte) (interface{}, error) {
return &c, nil
}

func (jc *userCodec) DecodeP(data []byte) (interface{}, io.closer, error) {
dec, err := jc.Decode(data)
return dec, codec.NoopCloser, err
}

func runEmitter() {
emitter, err := goka.NewEmitter(brokers, topic,
new(codec.String))
Expand Down
12 changes: 12 additions & 0 deletions examples/3-messaging/blocker/blocker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package blocker
import (
"context"
"encoding/json"

"github.com/lovoo/goka"
"github.com/lovoo/goka/codec"
"github.com/lovoo/goka/examples/3-messaging/topicinit"
)

Expand All @@ -28,6 +30,11 @@ func (c *BlockEventCodec) Decode(data []byte) (interface{}, error) {
return &m, json.Unmarshal(data, &m)
}

func (c *BlockEventCodec) DecodeP(data []byte) (interface{}, io.closer, error) {
dec, err := c.Decode(data)
return dec, codec.NoopCloser, err
}

type BlockValue struct {
Blocked bool
}
Expand All @@ -42,6 +49,11 @@ func (c *BlockValueCodec) Decode(data []byte) (interface{}, error) {
return &m, json.Unmarshal(data, &m)
}

func (c *BlockValueCodec) DecodeP(data []byte) (interface{}, io.closer, error) {
dec, err := c.Decode(data)
return dec, codec.NoopCloser, err
}

func block(ctx goka.Context, msg interface{}) {
var s *BlockValue
if v := ctx.Value(); v == nil {
Expand Down
9 changes: 8 additions & 1 deletion examples/3-messaging/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package collector
import (
"context"
"encoding/json"

"github.com/lovoo/goka"
"github.com/lovoo/goka/examples/3-messaging"
"github.com/lovoo/goka/codec"
messaging "github.com/lovoo/goka/examples/3-messaging"
"github.com/lovoo/goka/examples/3-messaging/topicinit"
)

Expand All @@ -27,6 +29,11 @@ func (c *MessageListCodec) Decode(data []byte) (interface{}, error) {
return m, err
}

func (c *MessageListCodec) DecodeP(data []byte) (interface{}, io.closer, error) {
dec, err := c.Decode(data)
return dec, codec.NoopCloser, err
}

func collect(ctx goka.Context, msg interface{}) {
var ml []messaging.Message
if v := ctx.Value(); v != nil {
Expand Down
13 changes: 9 additions & 4 deletions examples/3-messaging/detector/detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package detector
import (
"context"
"encoding/json"

"github.com/lovoo/goka"
"github.com/lovoo/goka/examples/3-messaging"
"github.com/lovoo/goka/codec"
messaging "github.com/lovoo/goka/examples/3-messaging"
"github.com/lovoo/goka/examples/3-messaging/blocker"
"github.com/lovoo/goka/examples/3-messaging/topicinit"
)
Expand All @@ -14,9 +16,7 @@ const (
maxRate = 0.5
)

var (
group goka.Group = "detector"
)
var group goka.Group = "detector"

type Counters struct {
Sent int
Expand All @@ -34,6 +34,11 @@ func (c *CountersCodec) Decode(data []byte) (interface{}, error) {
return &m, json.Unmarshal(data, &m)
}

func (c *CountersCodec) DecodeP(data []byte) (interface{}, io.closer, error) {
dec, err := c.Decode(data)
return dec, codec.NoopCloser, err
}

func getValue(ctx goka.Context) *Counters {
if v := ctx.Value(); v != nil {
return v.(*Counters)
Expand Down
11 changes: 11 additions & 0 deletions examples/3-messaging/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"

"github.com/lovoo/goka"
"github.com/lovoo/goka/codec"
)

var (
Expand All @@ -28,6 +29,11 @@ func (c *MessageCodec) Decode(data []byte) (interface{}, error) {
return &m, json.Unmarshal(data, &m)
}

func (c *MessageCodec) DecodeP(data []byte) (interface{}, io.closer, error) {
dec, err := c.Decode(data)
return dec, codec.NoopCloser, err
}

type MessageListCodec struct{}

func (c *MessageListCodec) Encode(value interface{}) ([]byte, error) {
Expand All @@ -39,3 +45,8 @@ func (c *MessageListCodec) Decode(data []byte) (interface{}, error) {
err := json.Unmarshal(data, &m)
return m, err
}

func (c *MessageListCodec) DecodeP(data []byte) (interface{}, io.closer, error) {
dec, err := c.Decode(data)
return dec, codec.NoopCloser, err
}
15 changes: 9 additions & 6 deletions examples/5-multiple/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ func (jc *userCodec) Decode(data []byte) (interface{}, error) {
return &c, nil
}

func (jc *userCodec) DecodeP(data []byte) (interface{}, io.closer, error) {
dec, err := jc.Decode(data)
return dec, codec.NoopCloser, err
}

func runEmitter(ctx context.Context) (rerr error) {
emitterA, err := goka.NewEmitter(brokers, inputA, new(codec.String))
if err != nil {
Expand Down Expand Up @@ -118,8 +123,8 @@ func process(ctx goka.Context, msg interface{}) {
func runProcessor(ctx context.Context,
monitor *monitor.Server,
query *query.Server,
groupInitialized chan struct{}) error {

groupInitialized chan struct{},
) error {
tmc := goka.NewTopicManagerConfig()
tm, err := goka.NewTopicManager(brokers, goka.DefaultConfig(), tmc)
if err != nil {
Expand Down Expand Up @@ -161,8 +166,8 @@ func runView(ctx context.Context,
errg *multierr.ErrGroup,
root *mux.Router,
monitor *monitor.Server,
groupInitialized chan struct{}) error {

groupInitialized chan struct{},
) error {
<-groupInitialized

view, err := goka.NewView(brokers,
Expand All @@ -183,7 +188,6 @@ func runView(ctx context.Context,

server := &http.Server{Addr: ":0", Handler: root}
errg.Go(func() error {

root.HandleFunc("/{key}", func(w http.ResponseWriter, r *http.Request) {
value, _ := view.Get(mux.Vars(r)["key"])
data, _ := json.Marshal(value)
Expand Down Expand Up @@ -225,7 +229,6 @@ func pprofInit(root *mux.Router) {
}

func main() {

cfg := goka.DefaultConfig()
cfg.Consumer.Offsets.Initial = sarama.OffsetOldest
cfg.Version = sarama.V2_4_0_0
Expand Down
11 changes: 10 additions & 1 deletion examples/7-redis/codec.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package main

import "encoding/json"
import (
"encoding/json"

"github.com/lovoo/goka/codec"
)

type Codec struct{}

Expand All @@ -16,3 +20,8 @@ func (c *Codec) Decode(data []byte) (interface{}, error) {
err := json.Unmarshal(data, event)
return event, err
}

func (c *Codec) DecodeP(data []byte) (interface{}, io.closer, error) {
dec, err := c.Decode(data)
return dec, codec.NoopCloser, err
}
19 changes: 11 additions & 8 deletions examples/8-monitoring/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ func (jc *userCodec) Decode(data []byte) (interface{}, error) {
return &c, nil
}

func (jc *userCodec) DecodeP(data []byte) (interface{}, io.closer, error) {
dec, err := jc.Decode(data)
return dec, codec.NoopCloser, err
}

func runEmitter(ctx context.Context) (rerr error) {
emitter, err := goka.NewEmitter(brokers, topic,
new(codec.String))
Expand Down Expand Up @@ -103,12 +108,13 @@ func process(ctx goka.Context, msg interface{}) {
ctx.SetValue(u)
fmt.Printf("[proc] key: %s clicks: %d, msg: %v\n", ctx.Key(), u.Clicks, msg)
}

func runStatelessProcessor(ctx context.Context, monitor *monitor.Server) error {
g := goka.DefineGroup(group+"-stateless",
goka.Input(topic,
new(codec.String),
func(ctx goka.Context, msg interface{}) {
//ignored
// ignored
}),
)
p, err := goka.NewProcessor(brokers, g)
Expand All @@ -124,8 +130,8 @@ func runStatelessProcessor(ctx context.Context, monitor *monitor.Server) error {

func runJoinProcessor(ctx context.Context,
monitor *monitor.Server,
joinGroupInitialized chan struct{}) error {

joinGroupInitialized chan struct{},
) error {
g := goka.DefineGroup(joinGroup,
goka.Input(topic,
new(codec.String),
Expand Down Expand Up @@ -160,11 +166,10 @@ func runProcessor(ctx context.Context,
monitor *monitor.Server,
query *query.Server,
actions *actions.Server,
joinGroupInitialized chan struct{}) error {

joinGroupInitialized chan struct{},
) error {
// helper function that waits the configured number of times
waitVisitor := func(ctx goka.Context, value interface{}) {

waitTime, ok := value.(int64)
if !ok {
return
Expand Down Expand Up @@ -245,7 +250,6 @@ func runView(errg *multierr.ErrGroup, ctx context.Context, root *mux.Router, mon
server := &http.Server{Addr: ":9095", Handler: root}

errg.Go(func() error {

root.HandleFunc("/{key}", func(w http.ResponseWriter, r *http.Request) {
value, _ := view.Get(mux.Vars(r)["key"])
data, _ := json.Marshal(value)
Expand Down Expand Up @@ -287,7 +291,6 @@ func pprofInit(root *mux.Router) {
}

func main() {

cfg := goka.DefaultConfig()
cfg.Consumer.Offsets.Initial = sarama.OffsetOldest
cfg.Version = sarama.V2_4_0_0
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/lovoo/goka
go 1.20

require (
github.com/IBM/sarama v1.41.3
github.com/IBM/sarama v1.41.2
github.com/go-stack/stack v1.8.1
github.com/golang/mock v1.6.0
github.com/gorilla/mux v1.8.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
github.com/IBM/sarama v1.41.2 h1:ZDBZfGPHAD4uuAtSv4U22fRZBgst0eEwGFzLj0fb85c=
github.com/IBM/sarama v1.41.2/go.mod h1:xdpu7sd6OE1uxNdjYTSKUfY8FaKkJES9/+EyjSgiGQk=
github.com/IBM/sarama v1.41.3 h1:MWBEJ12vHC8coMjdEXFq/6ftO6DUZnQlFYcxtOJFa7c=
github.com/IBM/sarama v1.41.3/go.mod h1:Xxho9HkHd4K/MDUo/T/sOqwtX/17D33++E9Wib6hUdQ=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
Expand Down
5 changes: 5 additions & 0 deletions integrationtest/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package integrationtest
import (
"context"
"fmt"
"io"
"strings"
"testing"
"time"
Expand All @@ -22,6 +23,10 @@ func (fc *failingDecode) Decode(_ []byte) (interface{}, error) {
return nil, fmt.Errorf("Error decoding")
}

func (fc *failingDecode) DecodeP(_ []byte) (interface{}, io.Closer, error) {
return nil, codec.NoopCloser, fmt.Errorf("Error decoding")
}

func (fc *failingDecode) Encode(msg interface{}) ([]byte, error) {
return fc.codec.Encode(msg)
}
Expand Down
4 changes: 1 addition & 3 deletions mockbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@ import (
"github.com/lovoo/goka/storage"
)

var (
errProducerBuilder error = errors.New("building producer failed on purpose")
)
var errProducerBuilder error = errors.New("building producer failed on purpose")

type builderMock struct {
ctrl *gomock.Controller
Expand Down
Loading

0 comments on commit d573de5

Please sign in to comment.