From 843141fed17d50e795bbf3a3b5efe16e5cd5fbd1 Mon Sep 17 00:00:00 2001 From: Arsene Date: Sat, 28 Oct 2023 20:45:42 +0000 Subject: [PATCH] feat: implementation of events stream (#49) --- actor.go | 41 ++++++--- actor_test.go | 69 ++++++++++++-- engine.go | 30 +++++++ engine_test.go | 24 +++++ entity.go | 2 +- eventstream/message.go | 47 ++++++++++ eventstream/stream.go | 180 +++++++++++++++++++++++++++++++++++++ eventstream/stream_test.go | 144 +++++++++++++++++++++++++++++ eventstream/subscriber.go | 160 +++++++++++++++++++++++++++++++++ example/main.go | 4 + go.sum | 4 - projection/runner.go | 28 ++---- projection/runner_test.go | 34 +++++-- readme.md | 1 + 14 files changed, 722 insertions(+), 46 deletions(-) create mode 100644 eventstream/message.go create mode 100644 eventstream/stream.go create mode 100644 eventstream/stream_test.go create mode 100644 eventstream/subscriber.go diff --git a/actor.go b/actor.go index 412ec82..ecf98e5 100644 --- a/actor.go +++ b/actor.go @@ -31,13 +31,19 @@ import ( "github.com/pkg/errors" "github.com/tochemey/ego/egopb" "github.com/tochemey/ego/eventstore" + "github.com/tochemey/ego/eventstream" "github.com/tochemey/ego/internal/telemetry" "github.com/tochemey/goakt/actors" "go.uber.org/atomic" + "golang.org/x/sync/errgroup" "google.golang.org/protobuf/types/known/anypb" "google.golang.org/protobuf/types/known/timestamppb" ) +var ( + eventsTopic = "topic.events.%d" +) + // actor is an event sourced based actor type actor[T State] struct { EntityBehavior[T] @@ -49,19 +55,21 @@ type actor[T State] struct { eventsCounter *atomic.Uint64 lastCommandTime time.Time mu sync.RWMutex + eventsStream eventstream.Stream } // enforce compilation error var _ actors.Actor = &actor[State]{} // newActor creates an instance of actor provided the eventSourcedHandler and the events store -func newActor[T State](behavior EntityBehavior[T], eventsStore eventstore.EventsStore) *actor[T] { +func newActor[T State](behavior EntityBehavior[T], eventsStore eventstore.EventsStore, eventsStream eventstream.Stream) *actor[T] { // create an instance of entity and return it return &actor[T]{ eventsStore: eventsStore, EntityBehavior: behavior, eventsCounter: atomic.NewUint64(0), mu: sync.RWMutex{}, + eventsStream: eventsStream, } } @@ -69,8 +77,8 @@ func newActor[T State](behavior EntityBehavior[T], eventsStore eventstore.Events // At this stage we connect to the various stores func (entity *actor[T]) PreStart(ctx context.Context) error { // add a span context - //ctx, span := telemetry.SpanContext(ctx, "PreStart") - //defer span.End() + ctx, span := telemetry.SpanContext(ctx, "PreStart") + defer span.End() // acquire the lock entity.mu.Lock() // release lock when done @@ -82,7 +90,7 @@ func (entity *actor[T]) PreStart(ctx context.Context) error { } // call the connect method of the journal store - if err := entity.eventsStore.Connect(ctx); err != nil { + if err := entity.eventsStore.Ping(ctx); err != nil { return fmt.Errorf("failed to connect to the events store: %v", err) } @@ -124,10 +132,6 @@ func (entity *actor[T]) PostStop(ctx context.Context) error { // release lock when done defer entity.mu.Unlock() - // disconnect the journal - if err := entity.eventsStore.Disconnect(ctx); err != nil { - return fmt.Errorf("failed to disconnect the events store: %v", err) - } return nil } @@ -274,8 +278,25 @@ func (entity *actor[T]) processCommandAndReply(ctx actors.ReceiveContext, comman // create a journal list journals := []*egopb.Event{envelope} - // TODO persist the event in batch using a child actor - if err := entity.eventsStore.WriteEvents(goCtx, journals); err != nil { + // define the topic for the given shard + topic := fmt.Sprintf(eventsTopic, shardNumber) + + // publish to the event stream and persist the event to the events store + eg, goCtx := errgroup.WithContext(goCtx) + + // publish the message to the topic + eg.Go(func() error { + entity.eventsStream.Publish(topic, envelope) + return nil + }) + + // persist the event to the events store + eg.Go(func() error { + return entity.eventsStore.WriteEvents(goCtx, journals) + }) + + // handle the persistence error + if err := eg.Wait(); err != nil { // send an error reply entity.sendErrorReply(ctx, err) return diff --git a/actor_test.go b/actor_test.go index 066a802..9ea9a99 100644 --- a/actor_test.go +++ b/actor_test.go @@ -33,6 +33,7 @@ import ( "github.com/tochemey/ego/egopb" "github.com/tochemey/ego/eventstore/memory" pgeventstore "github.com/tochemey/ego/eventstore/postgres" + "github.com/tochemey/ego/eventstream" testpb "github.com/tochemey/ego/test/data/pb/v1" "github.com/tochemey/goakt/actors" "github.com/tochemey/goakt/log" @@ -64,8 +65,15 @@ func TestActor(t *testing.T) { // create the persistence behavior behavior := NewAccountEntityBehavior(persistenceID) + // connect the event store + err = eventStore.Connect(ctx) + require.NoError(t, err) + + // create an instance of events stream + eventStream := eventstream.New() + // create the persistence actor using the behavior previously created - actor := newActor[*testpb.Account](behavior, eventStore) + actor := newActor[*testpb.Account](behavior, eventStore, eventStream) // spawn the actor pid, _ := actorSystem.Spawn(ctx, behavior.ID(), actor) require.NotNil(t, pid) @@ -123,6 +131,12 @@ func TestActor(t *testing.T) { } assert.True(t, proto.Equal(expected, resultingState)) + // disconnect the events store + err = eventStore.Disconnect(ctx) + require.NoError(t, err) + + // close the stream + eventStream.Close() // stop the actor system err = actorSystem.Stop(ctx) assert.NoError(t, err) @@ -150,8 +164,15 @@ func TestActor(t *testing.T) { // create the persistence behavior behavior := NewAccountEntityBehavior(persistenceID) + // connect the event store + err = eventStore.Connect(ctx) + require.NoError(t, err) + + // create an instance of events stream + eventStream := eventstream.New() + // create the persistence actor using the behavior previously created - persistentActor := newActor[*testpb.Account](behavior, eventStore) + persistentActor := newActor[*testpb.Account](behavior, eventStore, eventStream) // spawn the actor pid, _ := actorSystem.Spawn(ctx, behavior.ID(), persistentActor) require.NotNil(t, pid) @@ -198,6 +219,10 @@ func TestActor(t *testing.T) { errorReply := commandReply.GetReply().(*egopb.CommandReply_ErrorReply) assert.Equal(t, "command sent to the wrong entity", errorReply.ErrorReply.GetMessage()) + // disconnect the event store + require.NoError(t, eventStore.Disconnect(ctx)) + // close the stream + eventStream.Close() // stop the actor system err = actorSystem.Stop(ctx) assert.NoError(t, err) @@ -225,8 +250,15 @@ func TestActor(t *testing.T) { // create the persistence behavior behavior := NewAccountEntityBehavior(persistenceID) + // connect the event store + err = eventStore.Connect(ctx) + require.NoError(t, err) + + // create an instance of events stream + eventStream := eventstream.New() + // create the persistence actor using the behavior previously created - persistentActor := newActor[*testpb.Account](behavior, eventStore) + persistentActor := newActor[*testpb.Account](behavior, eventStore, eventStream) // spawn the actor pid, _ := actorSystem.Spawn(ctx, behavior.ID(), persistentActor) require.NotNil(t, pid) @@ -243,6 +275,12 @@ func TestActor(t *testing.T) { errorReply := commandReply.GetReply().(*egopb.CommandReply_ErrorReply) assert.Equal(t, "unhandled command", errorReply.ErrorReply.GetMessage()) + + // disconnect from the event store + require.NoError(t, eventStore.Disconnect(ctx)) + + // close the stream + eventStream.Close() // stop the actor system err = actorSystem.Stop(ctx) assert.NoError(t, err) @@ -292,8 +330,15 @@ func TestActor(t *testing.T) { // create the persistence behavior behavior := NewAccountEntityBehavior(persistenceID) + // connect the event store + err = eventStore.Connect(ctx) + require.NoError(t, err) + + // create an instance of event stream + eventStream := eventstream.New() + // create the persistence actor using the behavior previously created - persistentActor := newActor[*testpb.Account](behavior, eventStore) + persistentActor := newActor[*testpb.Account](behavior, eventStore, eventStream) // spawn the actor pid, _ := actorSystem.Spawn(ctx, behavior.ID(), persistentActor) require.NotNil(t, pid) @@ -379,7 +424,10 @@ func TestActor(t *testing.T) { // free resources assert.NoError(t, schemaUtils.DropTable(ctx)) + assert.NoError(t, eventStore.Disconnect(ctx)) testContainer.Cleanup() + // close the stream + eventStream.Close() err = actorSystem.Stop(ctx) assert.NoError(t, err) }) @@ -405,8 +453,15 @@ func TestActor(t *testing.T) { // create the persistence behavior behavior := NewAccountEntityBehavior(persistenceID) + // connect the event store + err = eventStore.Connect(ctx) + require.NoError(t, err) + + // create an instance of event stream + eventStream := eventstream.New() + // create the persistence actor using the behavior previously created - actor := newActor[*testpb.Account](behavior, eventStore) + actor := newActor[*testpb.Account](behavior, eventStore, eventStream) // spawn the actor pid, _ := actorSystem.Spawn(ctx, behavior.ID(), actor) require.NotNil(t, pid) @@ -487,6 +542,10 @@ func TestActor(t *testing.T) { } assert.True(t, proto.Equal(expected, resultingState)) + // disconnect from the event store + assert.NoError(t, eventStore.Disconnect(ctx)) + // close the stream + eventStream.Close() // stop the actor system err = actorSystem.Stop(ctx) assert.NoError(t, err) diff --git a/engine.go b/engine.go index 71ef446..971ec55 100644 --- a/engine.go +++ b/engine.go @@ -24,10 +24,13 @@ package ego import ( "context" + "fmt" "time" "github.com/pkg/errors" "github.com/tochemey/ego/eventstore" + "github.com/tochemey/ego/eventstream" + egotel "github.com/tochemey/ego/internal/telemetry" "github.com/tochemey/ego/projection" "github.com/tochemey/goakt/actors" "github.com/tochemey/goakt/discovery" @@ -49,6 +52,7 @@ type Engine struct { partitionsCount uint64 // partitionsCount specifies the number of partitions started atomic.Bool + eventStream eventstream.Stream // define the list of projections projections []*Projection projectionRunners []*projection.Runner @@ -64,6 +68,7 @@ func NewEngine(name string, eventsStore eventstore.EventsStore, opts ...Option) logger: log.DefaultLogger, telemetry: telemetry.New(), projections: make([]*Projection, 0), + eventStream: eventstream.New(), } // apply the various options for _, opt := range opts { @@ -144,6 +149,8 @@ func (x *Engine) Start(ctx context.Context) error { func (x *Engine) Stop(ctx context.Context) error { // set the started to false x.started.Store(false) + // close the event stream + x.eventStream.Close() // stop the projections if len(x.projectionRunners) > 0 { // simply iterate the list of projections and start them @@ -159,3 +166,26 @@ func (x *Engine) Stop(ctx context.Context) error { // stop the actor system and return the possible error return x.actorSystem.Stop(ctx) } + +// Subscribe creates an events subscriber +func (x *Engine) Subscribe(ctx context.Context) (eventstream.Subscriber, error) { + // add a span context + ctx, span := egotel.SpanContext(ctx, "Subscribe") + defer span.End() + + // first check whether the ego engine has started or not + if !x.started.Load() { + return nil, errors.New("eGo engine has not started") + } + // create the subscriber + subscriber := x.eventStream.AddSubscriber() + // subscribe to all the topics + for i := 0; i < int(x.partitionsCount); i++ { + // create the topic + topic := fmt.Sprintf(eventsTopic, i) + // subscribe to the topic + x.eventStream.Subscribe(subscriber, topic) + } + // return the subscriber + return subscriber, nil +} diff --git a/engine_test.go b/engine_test.go index fc8e95f..1d0097f 100644 --- a/engine_test.go +++ b/engine_test.go @@ -33,6 +33,7 @@ import ( "github.com/pkg/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/tochemey/ego/egopb" "github.com/tochemey/ego/eventstore/memory" samplepb "github.com/tochemey/ego/example/pbs/sample/pb/v1" "github.com/tochemey/goakt/discovery" @@ -46,6 +47,7 @@ func TestEgo(t *testing.T) { ctx := context.TODO() // create the event store eventStore := memory.NewEventsStore() + require.NoError(t, eventStore.Connect(ctx)) nodePorts := dynaport.Get(3) gossipPort := nodePorts[0] @@ -87,6 +89,11 @@ func TestEgo(t *testing.T) { // wait for the cluster to fully start time.Sleep(time.Second) + // subscribe to events + subscriber, err := e.Subscribe(ctx) + require.NoError(t, err) + require.NotNil(t, subscriber) + require.NoError(t, err) // create a persistence id entityID := uuid.NewString() @@ -126,13 +133,29 @@ func TestEgo(t *testing.T) { assert.Equal(t, entityID, newState.GetAccountId()) assert.EqualValues(t, 2, revision) + for message := range subscriber.Iterator() { + payload := message.Payload() + envelope, ok := payload.(*egopb.Event) + event := envelope.GetEvent() + require.True(t, ok) + switch envelope.GetSequenceNumber() { + case 1: + assert.True(t, event.MessageIs(new(samplepb.AccountCreated))) + case 2: + assert.True(t, event.MessageIs(new(samplepb.AccountCredited))) + } + } + // free resources + assert.NoError(t, eventStore.Disconnect(ctx)) assert.NoError(t, e.Stop(ctx)) }) t.Run("With no cluster enabled", func(t *testing.T) { ctx := context.TODO() // create the event store eventStore := memory.NewEventsStore() + // connect to the event store + require.NoError(t, eventStore.Connect(ctx)) // create the ego engine e := NewEngine("Sample", eventStore) // start ego engine @@ -173,6 +196,7 @@ func TestEgo(t *testing.T) { assert.EqualValues(t, 2, revision) // free resources + assert.NoError(t, eventStore.Disconnect(ctx)) assert.NoError(t, e.Stop(ctx)) }) } diff --git a/entity.go b/entity.go index cb0f20e..a509e43 100644 --- a/entity.go +++ b/entity.go @@ -59,7 +59,7 @@ func NewEntity[T State](ctx context.Context, behavior EntityBehavior[T], engine } // create the instance of the actor - pid, err := engine.actorSystem.Spawn(ctx, behavior.ID(), newActor(behavior, engine.eventsStore)) + pid, err := engine.actorSystem.Spawn(ctx, behavior.ID(), newActor(behavior, engine.eventsStore, engine.eventStream)) // return the error in case there is one if err != nil { return nil, err diff --git a/eventstream/message.go b/eventstream/message.go new file mode 100644 index 0000000..62ec6ee --- /dev/null +++ b/eventstream/message.go @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2022-2023 Tochemey + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package eventstream + +// Message defines the stream message +type Message struct { + topic string + payload any +} + +// Topic returns the message topic +func (m Message) Topic() string { + return m.topic +} + +// Payload returns the message payload +func (m Message) Payload() any { + return m.payload +} + +// NewMessage creates an instance of Stream Message +func NewMessage(topic string, payload any) *Message { + return &Message{ + topic: topic, + payload: payload, + } +} diff --git a/eventstream/stream.go b/eventstream/stream.go new file mode 100644 index 0000000..4ec3651 --- /dev/null +++ b/eventstream/stream.go @@ -0,0 +1,180 @@ +/* + * Copyright (c) 2022-2023 Tochemey + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package eventstream + +import ( + "sync" +) + +// Subscribers defines the map of subscribers +type Subscribers map[string]Subscriber + +type Stream interface { + // AddSubscriber adds a subscriber + AddSubscriber() Subscriber + // RemoveSubscriber removes a subscriber + RemoveSubscriber(sub Subscriber) + // SubscribersCount returns the number of subscribers for a given topic + SubscribersCount(topic string) int + // Subscribe subscribes a subscriber to a topic + Subscribe(sub Subscriber, topic string) + // Unsubscribe removes a subscriber from a topic + Unsubscribe(sub Subscriber, topic string) + // Publish publishes a message to a topic + Publish(topic string, msg any) + // Broadcast notifies all subscribers of a given topic of a new message + Broadcast(msg any, topics []string) + // Close closes the stream + Close() +} + +// EventsStream defines the stream broker +type EventsStream struct { + subs Subscribers + topics map[string]Subscribers + mu sync.Mutex +} + +// enforce a compilation error +var _ Stream = (*EventsStream)(nil) + +// New creates an instance of EventsStream +func New() Stream { + return &EventsStream{ + subs: Subscribers{}, + topics: map[string]Subscribers{}, + mu: sync.Mutex{}, + } +} + +// AddSubscriber adds a subscriber +func (b *EventsStream) AddSubscriber() Subscriber { + b.mu.Lock() + defer b.mu.Unlock() + c := newSubscriber() + b.subs[c.ID()] = c + return c +} + +// RemoveSubscriber removes a subscriber +func (b *EventsStream) RemoveSubscriber(sub Subscriber) { + // remove subscriber to the broker. + //unsubscribe to all topics which s is subscribed to. + for _, topic := range sub.Topics() { + b.Unsubscribe(sub, topic) + } + b.mu.Lock() + // remove subscriber from list of subscribers. + delete(b.subs, sub.ID()) + b.mu.Unlock() + sub.Shutdown() +} + +// Broadcast notifies all subscribers of a given topic of a new message +func (b *EventsStream) Broadcast(msg any, topics []string) { + // broadcast message to all topics. + for _, topic := range topics { + for _, consumer := range b.topics[topic] { + m := NewMessage(topic, msg) + if !consumer.Active() { + return + } + go (func(s Subscriber) { + s.signal(m) + })(consumer) + } + } +} + +// SubscribersCount returns the number of subscribers for a given topic +func (b *EventsStream) SubscribersCount(topic string) int { + // get total subscribers subscribed to given topic. + b.mu.Lock() + defer b.mu.Unlock() + return len(b.topics[topic]) +} + +// Subscribe subscribes a subscriber to a topic +func (b *EventsStream) Subscribe(sub Subscriber, topic string) { + // subscribe to given topic + b.mu.Lock() + defer b.mu.Unlock() + + // only subscribe active consumer + if !sub.Active() { + return + } + + if b.topics[topic] == nil { + b.topics[topic] = Subscribers{} + } + sub.subscribe(topic) + b.topics[topic][sub.ID()] = sub +} + +// Unsubscribe removes a subscriber from a topic +func (b *EventsStream) Unsubscribe(sub Subscriber, topic string) { + // unsubscribe to given topic + b.mu.Lock() + defer b.mu.Unlock() + + // only unsubscribe active subscriber + if !sub.Active() { + return + } + + delete(b.topics[topic], sub.ID()) + sub.unsubscribe(topic) +} + +// Publish publishes a message to a topic +func (b *EventsStream) Publish(topic string, msg any) { + // publish the message to given topic. + b.mu.Lock() + bTopics := b.topics[topic] + b.mu.Unlock() + for _, consumer := range bTopics { + m := NewMessage(topic, msg) + if !consumer.Active() { + return + } + go (func(s Subscriber) { + s.signal(m) + })(consumer) + } +} + +// Close closes the stream +func (b *EventsStream) Close() { + // acquire the lock + b.mu.Lock() + // release the lock once done + defer b.mu.Unlock() + for _, sub := range b.subs { + if sub.Active() { + sub.Shutdown() + } + } + b.subs = Subscribers{} + b.topics = map[string]Subscribers{} +} diff --git a/eventstream/stream_test.go b/eventstream/stream_test.go new file mode 100644 index 0000000..4dc7fe9 --- /dev/null +++ b/eventstream/stream_test.go @@ -0,0 +1,144 @@ +/* + * Copyright (c) 2022-2023 Tochemey + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package eventstream + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestStream(t *testing.T) { + t.Run("With Subscription", func(t *testing.T) { + broker := New() + + // add consumer + cons := broker.AddSubscriber() + require.NotNil(t, cons) + broker.Subscribe(cons, "t1") + broker.Subscribe(cons, "t2") + + require.EqualValues(t, 1, broker.SubscribersCount("t1")) + require.EqualValues(t, 1, broker.SubscribersCount("t2")) + + // remove the consumer + broker.RemoveSubscriber(cons) + assert.Zero(t, broker.SubscribersCount("t1")) + assert.Zero(t, broker.SubscribersCount("t2")) + + broker.Subscribe(cons, "t3") + assert.Zero(t, broker.SubscribersCount("t3")) + + t.Cleanup(func() { + broker.Close() + }) + }) + t.Run("With Unsubscription", func(t *testing.T) { + broker := New() + + // add consumer + cons := broker.AddSubscriber() + require.NotNil(t, cons) + broker.Subscribe(cons, "t1") + broker.Subscribe(cons, "t2") + + require.EqualValues(t, 1, broker.SubscribersCount("t1")) + require.EqualValues(t, 1, broker.SubscribersCount("t2")) + + // Unsubscribe the consumer + broker.Unsubscribe(cons, "t1") + assert.Zero(t, broker.SubscribersCount("t1")) + require.EqualValues(t, 1, broker.SubscribersCount("t2")) + + broker.Subscribe(cons, "t3") + require.EqualValues(t, 1, broker.SubscribersCount("t3")) + + // remove the consumer + broker.RemoveSubscriber(cons) + broker.Subscribe(cons, "t4") + assert.Zero(t, broker.SubscribersCount("t4")) + + t.Cleanup(func() { + broker.Close() + }) + }) + t.Run("With Publication", func(t *testing.T) { + broker := New() + + // add consumer + cons := broker.AddSubscriber() + require.NotNil(t, cons) + broker.Subscribe(cons, "t1") + broker.Subscribe(cons, "t2") + + require.EqualValues(t, 1, broker.SubscribersCount("t1")) + require.EqualValues(t, 1, broker.SubscribersCount("t2")) + + broker.Publish("t1", "hi") + broker.Publish("t2", "hello") + + time.Sleep(time.Second) + + var messages []*Message + for message := range cons.Iterator() { + messages = append(messages, message) + } + + assert.Len(t, messages, 2) + assert.Len(t, cons.Topics(), 2) + + t.Cleanup(func() { + broker.Close() + }) + }) + t.Run("With Broadcast", func(t *testing.T) { + broker := New() + + // add consumer + cons := broker.AddSubscriber() + require.NotNil(t, cons) + broker.Subscribe(cons, "t1") + broker.Subscribe(cons, "t2") + + require.EqualValues(t, 1, broker.SubscribersCount("t1")) + require.EqualValues(t, 1, broker.SubscribersCount("t2")) + + broker.Broadcast("hi", []string{"t1", "t2"}) + + time.Sleep(time.Second) + + var messages []*Message + for message := range cons.Iterator() { + messages = append(messages, message) + } + + assert.Len(t, messages, 2) + assert.Len(t, cons.Topics(), 2) + + t.Cleanup(func() { + broker.Close() + }) + }) +} diff --git a/eventstream/subscriber.go b/eventstream/subscriber.go new file mode 100644 index 0000000..f9022f8 --- /dev/null +++ b/eventstream/subscriber.go @@ -0,0 +1,160 @@ +/* + * Copyright (c) 2022-2023 Tochemey + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package eventstream + +import ( + "sync" + + "github.com/google/uuid" + "github.com/tochemey/goakt/pkg/queue" +) + +// Subscriber defines the Subscriber Interface +type Subscriber interface { + Topics() []string + Iterator() chan *Message + Shutdown() + signal(message *Message) + subscribe(topic string) + unsubscribe(topic string) + Active() bool + ID() string +} + +// subscriber defines the subscriber +type subscriber struct { + // id defines the subscriber id + id string + // sem represents a lock + sem sync.Mutex + // messages of the subscriber + messages *queue.Unbounded[*Message] + // topics define the topic the subscriber subscribed to + topics map[string]bool + // states whether the given subscriber is active or not + active bool +} + +var _ Subscriber = &subscriber{} + +// newSubscriber creates an instance of a stream consumer +// and returns the consumer id and its reference. +// The type of messages the consumer will consume is past as type +// parameter +func newSubscriber() *subscriber { + // create the consumer id + id := uuid.NewString() + return &subscriber{ + id: id, + sem: sync.Mutex{}, + messages: queue.NewUnbounded[*Message](), + topics: make(map[string]bool), + active: true, + } +} + +// ID return consumer id +func (x *subscriber) ID() string { + // acquire the lock + x.sem.Lock() + // release the lock once done + defer x.sem.Unlock() + return x.id +} + +// Active checks whether the consumer is active +func (x *subscriber) Active() bool { + // acquire the lock + x.sem.Lock() + // release the lock once done + defer x.sem.Unlock() + return x.active +} + +// Topics returns the list of topics the consumer has subscribed to +func (x *subscriber) Topics() []string { + // acquire the lock + x.sem.Lock() + // release the lock once done + defer x.sem.Unlock() + var topics []string + for topic := range x.topics { + topics = append(topics, topic) + } + return topics +} + +// Shutdown shutdowns the consumer +func (x *subscriber) Shutdown() { + // acquire the lock + x.sem.Lock() + // release the lock once done + defer x.sem.Unlock() + x.active = false + x.messages.Close() +} + +func (x *subscriber) Iterator() chan *Message { + out := make(chan *Message, x.messages.Len()) + defer close(out) + for { + msg, ok := x.messages.Pop() + if !ok { + break + } + out <- msg + } + return out +} + +// signal is used to push a message to the subscriber +func (x *subscriber) signal(message *Message) { + // acquire the lock + x.sem.Lock() + // release the lock once done + defer x.sem.Unlock() + // only receive message when active + if x.active { + x.messages.Push(message) + } +} + +// subscribe subscribes the subscriber to a given topic +func (x *subscriber) subscribe(topic string) { + // acquire the lock + x.sem.Lock() + // set the topic + x.topics[topic] = true + // release the lock + x.sem.Unlock() +} + +// unsubscribe unsubscribes the subscriber from the give topic +func (x *subscriber) unsubscribe(topic string) { + // acquire the lock + x.sem.Lock() + // remove the topic from the consumer topics + delete(x.topics, topic) + // release the lock + x.sem.Unlock() +} diff --git a/example/main.go b/example/main.go index 6739396..e0c0ec4 100644 --- a/example/main.go +++ b/example/main.go @@ -42,6 +42,8 @@ func main() { ctx := context.Background() // create the event store eventStore := memory.NewEventsStore() + // connect the event store + _ = eventStore.Connect(ctx) // create the ego engine e := ego.NewEngine("Sample", eventStore) // start ego engine @@ -78,6 +80,8 @@ func main() { signal.Notify(interruptSignal, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) <-interruptSignal + // disconnect the event store + _ = eventStore.Disconnect(ctx) // stop the actor system _ = e.Stop(ctx) os.Exit(0) diff --git a/go.sum b/go.sum index 5801578..d11e303 100644 --- a/go.sum +++ b/go.sum @@ -131,8 +131,6 @@ github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20230912144702-c363fe2c2ed8 h1:gpptm606MZYGaMHMsB4Srmb6EbW/IVHnt04rcMXnkBQ= -github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4= -github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4= github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gotestyourself/gotestyourself v2.2.0+incompatible h1:AQwinXlbQR2HvPjQZOmDhRqsv5mZf+Jb1RnSLxcqZcI= @@ -321,8 +319,6 @@ go.opentelemetry.io/otel/trace v1.19.0 h1:DFVQmlVbfVeOuBRrwdtaehRrWiL1JoVs9CPIQ1 go.opentelemetry.io/otel/trace v1.19.0/go.mod h1:mfaSyvGyEJEI0nyV2I4qhNQnbBOUUmYZpYojqMnX2vo= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= -go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= -go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= diff --git a/projection/runner.go b/projection/runner.go index c951213..f6513e4 100644 --- a/projection/runner.go +++ b/projection/runner.go @@ -58,7 +58,7 @@ type Runner struct { // stop signal stopSignal chan struct{} // started status - isStarted *atomic.Bool + started *atomic.Bool // refresh interval. Events are fetched with this interval // the default value is 1s refreshInterval time.Duration @@ -87,7 +87,7 @@ func NewRunner(name string, offsetsStore: offsetStore, recovery: NewRecovery(), stopSignal: make(chan struct{}, 1), - isStarted: atomic.NewBool(false), + started: atomic.NewBool(false), refreshInterval: time.Second, maxBufferSize: 500, startingOffset: time.Time{}, @@ -108,7 +108,7 @@ func (x *Runner) Start(ctx context.Context) error { ctx, span := telemetry.SpanContext(ctx, "PreStart") defer span.End() - if x.isStarted.Load() { + if x.started.Load() { return nil } @@ -118,7 +118,7 @@ func (x *Runner) Start(ctx context.Context) error { } // call the connect method of the journal store - if err := x.offsetsStore.Connect(ctx); err != nil { + if err := x.offsetsStore.Ping(ctx); err != nil { return fmt.Errorf("failed to connect to the offsets store: %v", err) } @@ -128,7 +128,7 @@ func (x *Runner) Start(ctx context.Context) error { } // call the connect method of the journal store - if err := x.eventsStore.Connect(ctx); err != nil { + if err := x.eventsStore.Ping(ctx); err != nil { return fmt.Errorf("failed to connect to the events store: %v", err) } @@ -168,7 +168,7 @@ func (x *Runner) Start(ctx context.Context) error { } // set the started status - x.isStarted.Store(true) + x.started.Store(true) // start processing go x.processingLoop(ctx) @@ -183,25 +183,15 @@ func (x *Runner) Stop(ctx context.Context) error { defer span.End() // check whether it is stopped or not - if !x.isStarted.Load() { + if !x.started.Load() { return nil } // send the stop x.stopSignal <- struct{}{} - // disconnect the events store - if err := x.eventsStore.Disconnect(ctx); err != nil { - return fmt.Errorf("failed to disconnect the events store: %v", err) - } - - // disconnect the offset store - if err := x.offsetsStore.Disconnect(ctx); err != nil { - return fmt.Errorf("failed to disconnect the offsets store: %v", err) - } - // set the started status to false - x.isStarted.Store(false) + x.started.Store(false) return nil } @@ -296,7 +286,7 @@ func (x *Runner) doProcess(ctx context.Context, shard uint64) error { ctx, span := telemetry.SpanContext(ctx, "HandleShard") defer span.End() - if !x.isStarted.Load() { + if !x.started.Load() { return nil } diff --git a/projection/runner_test.go b/projection/runner_test.go index 81982d6..50b8923 100644 --- a/projection/runner_test.go +++ b/projection/runner_test.go @@ -54,10 +54,12 @@ func TestProjection(t *testing.T) { // set up the event store journalStore := memory.NewEventsStore() assert.NotNil(t, journalStore) + require.NoError(t, journalStore.Connect(ctx)) // set up the offset store offsetStore := memoffsetstore.NewOffsetStore() assert.NotNil(t, offsetStore) + require.NoError(t, offsetStore.Disconnect(ctx)) // set up the projection // create a handler that return successfully @@ -92,7 +94,7 @@ func TestProjection(t *testing.T) { } require.NoError(t, journalStore.WriteEvents(ctx, journals)) - require.True(t, projection.isStarted.Load()) + require.True(t, projection.started.Load()) // wait for the data to be persisted by the database since this an eventual consistency case time.Sleep(time.Second) @@ -112,6 +114,8 @@ func TestProjection(t *testing.T) { assert.EqualValues(t, 10, handler.EventsCount()) // free resources + assert.NoError(t, journalStore.Disconnect(ctx)) + assert.NoError(t, offsetStore.Disconnect(ctx)) assert.NoError(t, projection.Stop(ctx)) }) t.Run("with failed handler with fail strategy", func(t *testing.T) { @@ -123,10 +127,12 @@ func TestProjection(t *testing.T) { // set up the event store journalStore := memory.NewEventsStore() assert.NotNil(t, journalStore) + require.NoError(t, journalStore.Connect(ctx)) // set up the offset store offsetStore := memoffsetstore.NewOffsetStore() assert.NotNil(t, offsetStore) + require.NoError(t, offsetStore.Disconnect(ctx)) // set up the projection // create a handler that return successfully @@ -159,14 +165,16 @@ func TestProjection(t *testing.T) { } require.NoError(t, journalStore.WriteEvents(ctx, journals)) - require.True(t, projection.isStarted.Load()) + require.True(t, projection.started.Load()) // wait for the data to be persisted by the database since this an eventual consistency case time.Sleep(time.Second) // here due to the default recovery strategy the projection is stopped - require.False(t, projection.isStarted.Load()) + require.False(t, projection.started.Load()) // free resources + assert.NoError(t, journalStore.Disconnect(ctx)) + assert.NoError(t, offsetStore.Disconnect(ctx)) assert.NoError(t, projection.Stop(ctx)) }) t.Run("with failed handler and retry_fail strategy", func(t *testing.T) { @@ -178,10 +186,12 @@ func TestProjection(t *testing.T) { // set up the event store journalStore := memory.NewEventsStore() assert.NotNil(t, journalStore) + require.NoError(t, journalStore.Connect(ctx)) // set up the offset store offsetStore := memoffsetstore.NewOffsetStore() assert.NotNil(t, offsetStore) + require.NoError(t, offsetStore.Disconnect(ctx)) // set up the projection // create a handler that return successfully @@ -220,15 +230,17 @@ func TestProjection(t *testing.T) { } require.NoError(t, journalStore.WriteEvents(ctx, journals)) - require.True(t, projection.isStarted.Load()) + require.True(t, projection.started.Load()) // wait for the data to be persisted by the database since this an eventual consistency case time.Sleep(1 * time.Second) // let us grab the current offset - require.False(t, projection.isStarted.Load()) + require.False(t, projection.started.Load()) // free resources + assert.NoError(t, journalStore.Disconnect(ctx)) + assert.NoError(t, offsetStore.Disconnect(ctx)) assert.NoError(t, projection.Stop(ctx)) }) t.Run("with failed handler and skip strategy", func(t *testing.T) { @@ -241,10 +253,12 @@ func TestProjection(t *testing.T) { // set up the event store journalStore := memory.NewEventsStore() assert.NotNil(t, journalStore) + require.NoError(t, journalStore.Connect(ctx)) // set up the offset store offsetStore := memoffsetstore.NewOffsetStore() assert.NotNil(t, offsetStore) + require.NoError(t, offsetStore.Connect(ctx)) // set up the projection // create a handler that return successfully @@ -283,7 +297,7 @@ func TestProjection(t *testing.T) { } require.NoError(t, journalStore.WriteEvents(ctx, journals)) - require.True(t, projection.isStarted.Load()) + require.True(t, projection.started.Load()) // wait for the data to be persisted by the database since this an eventual consistency case time.Sleep(time.Second) @@ -300,6 +314,8 @@ func TestProjection(t *testing.T) { assert.EqualValues(t, 5, handler.counter.Load()) // free resource + assert.NoError(t, journalStore.Disconnect(ctx)) + assert.NoError(t, offsetStore.Disconnect(ctx)) assert.NoError(t, projection.Stop(ctx)) }) t.Run("with failed handler and skip retry strategy", func(t *testing.T) { @@ -312,10 +328,12 @@ func TestProjection(t *testing.T) { // set up the event store journalStore := memory.NewEventsStore() assert.NotNil(t, journalStore) + require.NoError(t, journalStore.Connect(ctx)) // set up the offset store offsetStore := memoffsetstore.NewOffsetStore() assert.NotNil(t, offsetStore) + require.NoError(t, offsetStore.Connect(ctx)) // set up the projection // create a handler that return successfully @@ -354,7 +372,7 @@ func TestProjection(t *testing.T) { } require.NoError(t, journalStore.WriteEvents(ctx, journals)) - require.True(t, projection.isStarted.Load()) + require.True(t, projection.started.Load()) // wait for the data to be persisted by the database since this an eventual consistency case time.Sleep(time.Second) @@ -371,6 +389,8 @@ func TestProjection(t *testing.T) { assert.EqualValues(t, 5, handler.counter.Load()) // free resource + assert.NoError(t, journalStore.Disconnect(ctx)) + assert.NoError(t, offsetStore.Disconnect(ctx)) assert.NoError(t, projection.Stop(ctx)) }) } diff --git a/readme.md b/readme.md index 1e628e0..f242830 100644 --- a/readme.md +++ b/readme.md @@ -30,6 +30,7 @@ Under the hood, ego leverages [goakt](https://github.com/Tochemey/goakt) to scal - Built-in offset stores: - [Postgres](./offsetstore/postgres/postgres.go): Schema can be found [here](./resources/offsetstore_postgres.sql) - [Memory](./offsetstore/memory/memory.go) (for testing purpose only) +- Events Subscription: One can subscribe to events that are emitted on the Write Model instead of using the projection - Examples (check the [examples](./example)) ### Installation