Skip to content

Commit

Permalink
feat: implementation of events stream
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey committed Oct 28, 2023
1 parent a56f44c commit 3457c50
Show file tree
Hide file tree
Showing 11 changed files with 195 additions and 44 deletions.
41 changes: 31 additions & 10 deletions actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,19 @@ import (
"github.com/pkg/errors"
"github.com/tochemey/ego/egopb"
"github.com/tochemey/ego/eventstore"
"github.com/tochemey/ego/eventstream"
"github.com/tochemey/ego/internal/telemetry"
"github.com/tochemey/goakt/actors"
"go.uber.org/atomic"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/timestamppb"
)

var (
eventsTopic = "topic.events.%d"
)

// actor is an event sourced based actor
type actor[T State] struct {
EntityBehavior[T]
Expand All @@ -49,28 +55,30 @@ type actor[T State] struct {
eventsCounter *atomic.Uint64
lastCommandTime time.Time
mu sync.RWMutex
eventsStream eventstream.Stream
}

// enforce compilation error
var _ actors.Actor = &actor[State]{}

// newActor creates an instance of actor provided the eventSourcedHandler and the events store
func newActor[T State](behavior EntityBehavior[T], eventsStore eventstore.EventsStore) *actor[T] {
func newActor[T State](behavior EntityBehavior[T], eventsStore eventstore.EventsStore, eventsStream eventstream.Stream) *actor[T] {
// create an instance of entity and return it
return &actor[T]{
eventsStore: eventsStore,
EntityBehavior: behavior,
eventsCounter: atomic.NewUint64(0),
mu: sync.RWMutex{},
eventsStream: eventsStream,
}
}

// PreStart pre-starts the actor
// At this stage we connect to the various stores
func (entity *actor[T]) PreStart(ctx context.Context) error {
// add a span context
//ctx, span := telemetry.SpanContext(ctx, "PreStart")
//defer span.End()
ctx, span := telemetry.SpanContext(ctx, "PreStart")
defer span.End()
// acquire the lock
entity.mu.Lock()
// release lock when done
Expand All @@ -82,7 +90,7 @@ func (entity *actor[T]) PreStart(ctx context.Context) error {
}

// call the connect method of the journal store
if err := entity.eventsStore.Connect(ctx); err != nil {
if err := entity.eventsStore.Ping(ctx); err != nil {
return fmt.Errorf("failed to connect to the events store: %v", err)
}

Expand Down Expand Up @@ -124,10 +132,6 @@ func (entity *actor[T]) PostStop(ctx context.Context) error {
// release lock when done
defer entity.mu.Unlock()

// disconnect the journal
if err := entity.eventsStore.Disconnect(ctx); err != nil {
return fmt.Errorf("failed to disconnect the events store: %v", err)
}
return nil
}

Expand Down Expand Up @@ -274,8 +278,25 @@ func (entity *actor[T]) processCommandAndReply(ctx actors.ReceiveContext, comman
// create a journal list
journals := []*egopb.Event{envelope}

// TODO persist the event in batch using a child actor
if err := entity.eventsStore.WriteEvents(goCtx, journals); err != nil {
// define the topic for the given shard
topic := fmt.Sprintf(eventsTopic, shardNumber)

// publish to the event stream and persist the event to the events store
eg, goCtx := errgroup.WithContext(goCtx)

// publish the message to the topic
eg.Go(func() error {
entity.eventsStream.Publish(topic, envelope)
return nil
})

// persist the event to the events store
eg.Go(func() error {
return entity.eventsStore.WriteEvents(goCtx, journals)
})

// handle the persistence error
if err := eg.Wait(); err != nil {
// send an error reply
entity.sendErrorReply(ctx, err)
return
Expand Down
69 changes: 64 additions & 5 deletions actor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/tochemey/ego/egopb"
"github.com/tochemey/ego/eventstore/memory"
pgeventstore "github.com/tochemey/ego/eventstore/postgres"
"github.com/tochemey/ego/eventstream"
testpb "github.com/tochemey/ego/test/data/pb/v1"
"github.com/tochemey/goakt/actors"
"github.com/tochemey/goakt/log"
Expand Down Expand Up @@ -64,8 +65,15 @@ func TestActor(t *testing.T) {
// create the persistence behavior
behavior := NewAccountEntityBehavior(persistenceID)

// connect the event store
err = eventStore.Connect(ctx)
require.NoError(t, err)

// create an instance of events stream
eventStream := eventstream.New()

// create the persistence actor using the behavior previously created
actor := newActor[*testpb.Account](behavior, eventStore)
actor := newActor[*testpb.Account](behavior, eventStore, eventStream)
// spawn the actor
pid, _ := actorSystem.Spawn(ctx, behavior.ID(), actor)
require.NotNil(t, pid)
Expand Down Expand Up @@ -123,6 +131,12 @@ func TestActor(t *testing.T) {
}
assert.True(t, proto.Equal(expected, resultingState))

// disconnect the events store
err = eventStore.Disconnect(ctx)
require.NoError(t, err)

// close the stream
eventStream.Close()
// stop the actor system
err = actorSystem.Stop(ctx)
assert.NoError(t, err)
Expand Down Expand Up @@ -150,8 +164,15 @@ func TestActor(t *testing.T) {
// create the persistence behavior
behavior := NewAccountEntityBehavior(persistenceID)

// connect the event store
err = eventStore.Connect(ctx)
require.NoError(t, err)

// create an instance of events stream
eventStream := eventstream.New()

// create the persistence actor using the behavior previously created
persistentActor := newActor[*testpb.Account](behavior, eventStore)
persistentActor := newActor[*testpb.Account](behavior, eventStore, eventStream)
// spawn the actor
pid, _ := actorSystem.Spawn(ctx, behavior.ID(), persistentActor)
require.NotNil(t, pid)
Expand Down Expand Up @@ -198,6 +219,10 @@ func TestActor(t *testing.T) {
errorReply := commandReply.GetReply().(*egopb.CommandReply_ErrorReply)
assert.Equal(t, "command sent to the wrong entity", errorReply.ErrorReply.GetMessage())

// disconnect the event store
require.NoError(t, eventStore.Disconnect(ctx))
// close the stream
eventStream.Close()
// stop the actor system
err = actorSystem.Stop(ctx)
assert.NoError(t, err)
Expand Down Expand Up @@ -225,8 +250,15 @@ func TestActor(t *testing.T) {
// create the persistence behavior
behavior := NewAccountEntityBehavior(persistenceID)

// connect the event store
err = eventStore.Connect(ctx)
require.NoError(t, err)

// create an instance of events stream
eventStream := eventstream.New()

// create the persistence actor using the behavior previously created
persistentActor := newActor[*testpb.Account](behavior, eventStore)
persistentActor := newActor[*testpb.Account](behavior, eventStore, eventStream)
// spawn the actor
pid, _ := actorSystem.Spawn(ctx, behavior.ID(), persistentActor)
require.NotNil(t, pid)
Expand All @@ -243,6 +275,12 @@ func TestActor(t *testing.T) {

errorReply := commandReply.GetReply().(*egopb.CommandReply_ErrorReply)
assert.Equal(t, "unhandled command", errorReply.ErrorReply.GetMessage())

// disconnect from the event store
require.NoError(t, eventStore.Disconnect(ctx))

// close the stream
eventStream.Close()
// stop the actor system
err = actorSystem.Stop(ctx)
assert.NoError(t, err)
Expand Down Expand Up @@ -292,8 +330,15 @@ func TestActor(t *testing.T) {
// create the persistence behavior
behavior := NewAccountEntityBehavior(persistenceID)

// connect the event store
err = eventStore.Connect(ctx)
require.NoError(t, err)

// create an instance of event stream
eventStream := eventstream.New()

// create the persistence actor using the behavior previously created
persistentActor := newActor[*testpb.Account](behavior, eventStore)
persistentActor := newActor[*testpb.Account](behavior, eventStore, eventStream)
// spawn the actor
pid, _ := actorSystem.Spawn(ctx, behavior.ID(), persistentActor)
require.NotNil(t, pid)
Expand Down Expand Up @@ -379,7 +424,10 @@ func TestActor(t *testing.T) {

// free resources
assert.NoError(t, schemaUtils.DropTable(ctx))
assert.NoError(t, eventStore.Disconnect(ctx))
testContainer.Cleanup()
// close the stream
eventStream.Close()
err = actorSystem.Stop(ctx)
assert.NoError(t, err)
})
Expand All @@ -405,8 +453,15 @@ func TestActor(t *testing.T) {
// create the persistence behavior
behavior := NewAccountEntityBehavior(persistenceID)

// connect the event store
err = eventStore.Connect(ctx)
require.NoError(t, err)

// create an instance of event stream
eventStream := eventstream.New()

// create the persistence actor using the behavior previously created
actor := newActor[*testpb.Account](behavior, eventStore)
actor := newActor[*testpb.Account](behavior, eventStore, eventStream)
// spawn the actor
pid, _ := actorSystem.Spawn(ctx, behavior.ID(), actor)
require.NotNil(t, pid)
Expand Down Expand Up @@ -487,6 +542,10 @@ func TestActor(t *testing.T) {
}
assert.True(t, proto.Equal(expected, resultingState))

// disconnect from the event store
assert.NoError(t, eventStore.Disconnect(ctx))
// close the stream
eventStream.Close()
// stop the actor system
err = actorSystem.Stop(ctx)
assert.NoError(t, err)
Expand Down
30 changes: 30 additions & 0 deletions engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@ package ego

import (
"context"
"fmt"
"time"

"github.com/pkg/errors"
"github.com/tochemey/ego/eventstore"
"github.com/tochemey/ego/eventstream"
egotel "github.com/tochemey/ego/internal/telemetry"
"github.com/tochemey/ego/projection"
"github.com/tochemey/goakt/actors"
"github.com/tochemey/goakt/discovery"
Expand All @@ -49,6 +52,7 @@ type Engine struct {
partitionsCount uint64 // partitionsCount specifies the number of partitions
started atomic.Bool

eventStream eventstream.Stream
// define the list of projections
projections []*Projection
projectionRunners []*projection.Runner
Expand All @@ -64,6 +68,7 @@ func NewEngine(name string, eventsStore eventstore.EventsStore, opts ...Option)
logger: log.DefaultLogger,
telemetry: telemetry.New(),
projections: make([]*Projection, 0),
eventStream: eventstream.New(),
}
// apply the various options
for _, opt := range opts {
Expand Down Expand Up @@ -144,6 +149,8 @@ func (x *Engine) Start(ctx context.Context) error {
func (x *Engine) Stop(ctx context.Context) error {
// set the started to false
x.started.Store(false)
// close the event stream
x.eventStream.Close()
// stop the projections
if len(x.projectionRunners) > 0 {
// simply iterate the list of projections and start them
Expand All @@ -159,3 +166,26 @@ func (x *Engine) Stop(ctx context.Context) error {
// stop the actor system and return the possible error
return x.actorSystem.Stop(ctx)
}

// Subscribe creates an events subscriber
func (x *Engine) Subscribe(ctx context.Context) (eventstream.Subscriber, error) {
// add a span context
ctx, span := egotel.SpanContext(ctx, "Subscribe")
defer span.End()

// first check whether the ego engine has started or not
if !x.started.Load() {
return nil, errors.New("eGo engine has not started")
}
// create the subscriber
subscriber := x.eventStream.AddSubscriber()
// subscribe to all the topics
for i := 0; i < int(x.partitionsCount); i++ {
// create the topic
topic := fmt.Sprintf(eventsTopic, i)
// subscribe to the topic
x.eventStream.Subscribe(subscriber, topic)
}
// return the subscriber
return subscriber, nil
}
24 changes: 24 additions & 0 deletions engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tochemey/ego/egopb"
"github.com/tochemey/ego/eventstore/memory"
samplepb "github.com/tochemey/ego/example/pbs/sample/pb/v1"
"github.com/tochemey/goakt/discovery"
Expand All @@ -46,6 +47,7 @@ func TestEgo(t *testing.T) {
ctx := context.TODO()
// create the event store
eventStore := memory.NewEventsStore()
require.NoError(t, eventStore.Connect(ctx))

nodePorts := dynaport.Get(3)
gossipPort := nodePorts[0]
Expand Down Expand Up @@ -87,6 +89,11 @@ func TestEgo(t *testing.T) {
// wait for the cluster to fully start
time.Sleep(time.Second)

// subscribe to events
subscriber, err := e.Subscribe(ctx)
require.NoError(t, err)
require.NotNil(t, subscriber)

require.NoError(t, err)
// create a persistence id
entityID := uuid.NewString()
Expand Down Expand Up @@ -126,13 +133,29 @@ func TestEgo(t *testing.T) {
assert.Equal(t, entityID, newState.GetAccountId())
assert.EqualValues(t, 2, revision)

for message := range subscriber.Iterator() {
payload := message.Payload()
envelope, ok := payload.(*egopb.Event)
event := envelope.GetEvent()
require.True(t, ok)
switch envelope.GetSequenceNumber() {
case 1:
assert.True(t, event.MessageIs(new(samplepb.AccountCreated)))
case 2:
assert.True(t, event.MessageIs(new(samplepb.AccountCredited)))
}
}

// free resources
assert.NoError(t, eventStore.Disconnect(ctx))
assert.NoError(t, e.Stop(ctx))
})
t.Run("With no cluster enabled", func(t *testing.T) {
ctx := context.TODO()
// create the event store
eventStore := memory.NewEventsStore()
// connect to the event store
require.NoError(t, eventStore.Connect(ctx))
// create the ego engine
e := NewEngine("Sample", eventStore)
// start ego engine
Expand Down Expand Up @@ -173,6 +196,7 @@ func TestEgo(t *testing.T) {
assert.EqualValues(t, 2, revision)

// free resources
assert.NoError(t, eventStore.Disconnect(ctx))
assert.NoError(t, e.Stop(ctx))
})
}
Expand Down
2 changes: 1 addition & 1 deletion entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func NewEntity[T State](ctx context.Context, behavior EntityBehavior[T], engine
}

// create the instance of the actor
pid, err := engine.actorSystem.Spawn(ctx, behavior.ID(), newActor(behavior, engine.eventsStore))
pid, err := engine.actorSystem.Spawn(ctx, behavior.ID(), newActor(behavior, engine.eventsStore, engine.eventStream))
// return the error in case there is one
if err != nil {
return nil, err
Expand Down
Loading

0 comments on commit 3457c50

Please sign in to comment.