Skip to content

Commit

Permalink
feat: support event ack
Browse files Browse the repository at this point in the history
  • Loading branch information
RobertIndie committed Nov 10, 2024
1 parent 5c9d1d2 commit 9486f5f
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 40 deletions.
1 change: 0 additions & 1 deletion fs/contube/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ func (f *MemoryQueueFactory) NewSinkTube(ctx context.Context, configMap ConfigMa
if !ok {
return
}
event.Commit()
c <- event
}
}
Expand Down
36 changes: 24 additions & 12 deletions fs/runtime/external/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,19 +70,12 @@ func (f *functionServerImpl) RegisterSchema(ctx context.Context,
return &model.RegisterSchemaResponse{}, nil
}

func (f *functionServerImpl) Read(ctx context.Context, request *model.ReadRequest) (*model.Event, error) {
func (f *functionServerImpl) Read(ctx context.Context, _ *model.ReadRequest) (*model.Event, error) {
r, err := f.getFunctionRuntime(ctx)
if err != nil {
return nil, err
}
select {
case e := <-r.inputCh:
return &model.Event{
Payload: e.GetPayload(),
}, nil
case <-ctx.Done():
return nil, ctx.Err()
}
return r.ReadRecord(ctx)
}

func (f *functionServerImpl) Write(ctx context.Context, event *model.Event) (*model.WriteResponse, error) {
Expand Down Expand Up @@ -182,9 +175,10 @@ func (f *Factory) NewFunctionRuntime(instance api.FunctionInstance,
_ *funcModel.RuntimeConfig) (api.FunctionRuntime, error) {
def := instance.Definition()
r := &runtime{
inputCh: make(chan contube.Record),
funcCtx: instance.FunctionContext(),
log: instance.Logger(),
inputCh: make(chan contube.Record),
funcCtx: instance.FunctionContext(),
log: instance.Logger(),
recordsMap: make(map[int64]contube.Record),
}
f.server.runtimeMaps.Store(common.GetNamespacedName(def.Namespace, def.Name).String(), r)
f.log.Info("Creating new function runtime", "function", common.GetNamespacedName(def.Namespace, def.Name))
Expand Down Expand Up @@ -240,6 +234,7 @@ type runtime struct {
log *common.Logger

recordsMapMu sync.Mutex
recordIndex int64
recordsMap map[int64]contube.Record
}

Expand All @@ -251,6 +246,23 @@ func (r *runtime) Call(e contube.Record) (contube.Record, error) {
func (r *runtime) Stop() {
}

func (r *runtime) ReadRecord(ctx context.Context) (*model.Event, error) {
select {
case e := <-r.inputCh:
r.recordsMapMu.Lock()
defer r.recordsMapMu.Unlock()
eventId := r.recordIndex
r.recordIndex++
r.recordsMap[eventId] = e
return &model.Event{
Id: eventId,
Payload: e.GetPayload(),
}, nil
case <-ctx.Done():
return nil, ctx.Err()
}
}

// Ack acknowledges the processing of a record
// This is an idempotent operation
func (r *runtime) Ack(id int64) {
Expand Down
77 changes: 50 additions & 27 deletions fs/runtime/external/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import (
"fmt"
"net"
"os"
"sync/atomic"
"testing"
"time"

"github.com/functionstream/function-stream/fs/statestore"

Expand Down Expand Up @@ -98,11 +98,30 @@ func (f *TestSource) Handle(_ gofs.FunctionContext, emit func(context.Context, g
return nil
}

func runMockClient() {
type TestModules struct {
testFunction *TestFunction
testCounter *TestCounterFunction
testSource *TestSource
testSink *TestSink
}

func NewTestModules() *TestModules {
return &TestModules{
testFunction: &TestFunction{},
testCounter: &TestCounterFunction{},
testSource: &TestSource{},
testSink: &TestSink{
sinkCh: make(chan Counter),
},
}
}

func (t *TestModules) Run() {
err := gofs.NewFSClient().
Register(gofs.DefaultModule, gofs.WithFunction(&TestFunction{})).
Register("counter", gofs.WithFunction(&TestCounterFunction{})).
Register("test-source", gofs.WithSource(&TestSource{})).
Register(gofs.DefaultModule, gofs.WithFunction(t.testFunction)).
Register("counter", gofs.WithFunction(t.testCounter)).
Register("test-source", gofs.WithSource(t.testSource)).
Register("test-sink", gofs.WithSink(t.testSink)).
Run()
if err != nil {
log.Error(err, "failed to run mock client")
Expand All @@ -129,7 +148,7 @@ func TestExternalRuntime(t *testing.T) {
t.Fatal(err)
}

go runMockClient()
go NewTestModules().Run()

inputTopic := "input"
outputTopic := "output"
Expand Down Expand Up @@ -159,13 +178,13 @@ func TestExternalRuntime(t *testing.T) {
err = fm.StartFunction(f)
assert.NoError(t, err)

var acked atomic.Bool
acked := make(chan struct{})

event, err := contube.NewStructRecord(&Person{
Name: "test",
Money: 1,
}, func() {
acked.Store(true)
acked <- struct{}{}
})
assert.NoError(t, err)
err = fm.ProduceEvent(inputTopic, event)
Expand All @@ -178,7 +197,11 @@ func TestExternalRuntime(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, 2, p.Money)

assert.True(t, acked.Load())
select {
case <-acked:
case <-time.After(5 * time.Second):
t.Fatal("failed to ack event")
}

err = fm.DeleteFunction("", f.Name)
assert.NoError(t, err)
Expand Down Expand Up @@ -233,7 +256,7 @@ func TestNonDefaultModule(t *testing.T) {
err = fm.StartFunction(f)
assert.NoError(t, err)

go runMockClient()
go NewTestModules().Run()

event, err := contube.NewStructRecord(&Counter{
Count: 1,
Expand Down Expand Up @@ -298,7 +321,7 @@ func TestExternalSourceModule(t *testing.T) {
err = fm.StartFunction(f)
assert.NoError(t, err)

go runMockClient()
go NewTestModules().Run()

for i := 0; i < 10; i++ {
output, err := fm.ConsumeEvent(outputTopic)
Expand All @@ -322,15 +345,9 @@ func (f *TestSink) Init(_ gofs.FunctionContext) error {
return nil
}

func (f *TestSink) Handle(_ gofs.FunctionContext, event gofs.Event[Counter]) error {
func (f *TestSink) Handle(ctx gofs.FunctionContext, event gofs.Event[Counter]) error {
f.sinkCh <- *event.Data()
return nil
}

func newTestSink() *TestSink {
return &TestSink{
sinkCh: make(chan Counter),
}
return event.Ack(ctx)
}

func TestExternalSinkModule(t *testing.T) {
Expand Down Expand Up @@ -379,25 +396,31 @@ func TestExternalSinkModule(t *testing.T) {
err = fm.StartFunction(f)
assert.NoError(t, err)

sinkMod := newTestSink()
testMods := NewTestModules()
sinkMod := testMods.testSink

go func() {
err := gofs.NewFSClient().Register("test-sink", gofs.WithSink(sinkMod)).Run()
if err != nil {
log.Error(err, "failed to run mock client")
}
}()
go testMods.Run()

ackCh := make(chan struct{}, 100)

event, err := contube.NewStructRecord(&Counter{
Count: 1,
}, func() {})
}, func() {
ackCh <- struct{}{}
})
assert.NoError(t, err)
err = fm.ProduceEvent(inputTopic, event)
assert.NoError(t, err)

r := <-sinkMod.sinkCh
assert.Equal(t, 1, r.Count)

select {
case <-ackCh:
case <-time.After(5 * time.Second):
t.Fatal("failed to ack event")
}

err = fm.DeleteFunction("", f.Name)
assert.NoError(t, err)
}
Expand Down

0 comments on commit 9486f5f

Please sign in to comment.