Skip to content

Commit

Permalink
docs: enhance readme (#50)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey authored Oct 29, 2023
1 parent 98fc19a commit 568b28a
Show file tree
Hide file tree
Showing 2 changed files with 185 additions and 24 deletions.
133 changes: 133 additions & 0 deletions projection/actor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Copyright (c) 2022-2023 Tochemey
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package projection

import (
"context"
"testing"
"time"

"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tochemey/ego/egopb"
"github.com/tochemey/ego/eventstore/memory"
memoffsetstore "github.com/tochemey/ego/offsetstore/memory"
testpb "github.com/tochemey/ego/test/data/pb/v1"
"github.com/tochemey/goakt/actors"
"github.com/tochemey/goakt/log"
"go.uber.org/goleak"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/timestamppb"
)

func TestActor(t *testing.T) {
t.Run("With happy path", func(t *testing.T) {
defer goleak.VerifyNone(t)
ctx := context.TODO()
// create an actor system
actorSystem, err := actors.NewActorSystem("TestActorSystem",
actors.WithPassivationDisabled(),
actors.WithLogger(log.DiscardLogger),
actors.WithActorInitMaxRetries(3))
require.NoError(t, err)
assert.NotNil(t, actorSystem)

// start the actor system
err = actorSystem.Start(ctx)
require.NoError(t, err)

projectionName := "db-writer"
persistenceID := uuid.NewString()
shardNumber := uint64(9)
logger := log.DefaultLogger

// set up the event store
journalStore := memory.NewEventsStore()
assert.NotNil(t, journalStore)
require.NoError(t, journalStore.Connect(ctx))

// set up the offset store
offsetStore := memoffsetstore.NewOffsetStore()
assert.NotNil(t, offsetStore)
require.NoError(t, offsetStore.Disconnect(ctx))

handler := NewDiscardHandler(logger)

// create the actor
actor := New(projectionName, handler, journalStore, offsetStore, WithRefreshInterval(time.Millisecond))
// spawn the actor
pid, err := actorSystem.Spawn(ctx, persistenceID, actor)
require.NoError(t, err)
require.NotNil(t, pid)

// start the projection
require.NoError(t, actors.Tell(ctx, pid, Start))

// persist some events
state, err := anypb.New(new(testpb.Account))
assert.NoError(t, err)
event, err := anypb.New(&testpb.AccountCredited{})
assert.NoError(t, err)

count := 10
timestamp := timestamppb.Now()
journals := make([]*egopb.Event, count)
for i := 0; i < count; i++ {
seqNr := i + 1
journals[i] = &egopb.Event{
PersistenceId: persistenceID,
SequenceNumber: uint64(seqNr),
IsDeleted: false,
Event: event,
ResultingState: state,
Timestamp: timestamp.AsTime().Unix(),
Shard: shardNumber,
}
}

require.NoError(t, journalStore.WriteEvents(ctx, journals))

// wait for the data to be persisted by the database since this an eventual consistency case
time.Sleep(time.Second)

// create the projection id
projectionID := &egopb.ProjectionId{
ProjectionName: projectionName,
ShardNumber: shardNumber,
}

// let us grab the current offset
actual, err := offsetStore.GetCurrentOffset(ctx, projectionID)
assert.NoError(t, err)
assert.NotNil(t, actual)
assert.EqualValues(t, journals[9].GetTimestamp(), actual.GetValue())

assert.EqualValues(t, 10, handler.EventsCount())

// free resources
assert.NoError(t, journalStore.Disconnect(ctx))
assert.NoError(t, offsetStore.Disconnect(ctx))
assert.NoError(t, actorSystem.Stop(ctx))
})
}
76 changes: 52 additions & 24 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,34 +4,62 @@
[![codecov](https://codecov.io/gh/Tochemey/ego/branch/main/graph/badge.svg?token=Z5b9gM6Mnt)](https://codecov.io/gh/Tochemey/ego)
[![GitHub tag (with filter)](https://img.shields.io/github/v/tag/tochemey/ego)](https://github.com/Tochemey/ego/tags)

eGo is a minimal library that help build event-sourcing and CQRS application through a simple interface, and it allows developers to describe their commands, events and states are defined using google protocol buffers.
eGo is a minimal library that help build event-sourcing and CQRS application through a simple interface, and it allows developers to describe their **_commands_**, **_events_** and **_states_** **_are defined using google protocol buffers_**.
Under the hood, ego leverages [Go-Akt](https://github.com/Tochemey/goakt) to scale out and guarantee performant, reliable persistence.

### Features

- Write Model:
- Commands handler: The command handlers define how to handle each incoming command,
which validations must be applied, and finally, which events will be persisted if any. When there is no event to be persisted a nil can
be returned as a no-op. Command handlers are the meat of the event sourced actor.
They encode the business rules of your event sourced actor and act as a guardian of the Aggregate consistency.
The command handler must first validate that the incoming command can be applied to the current model state.
Any decision should be solely based on the data passed in the commands and the state of the Behavior.
In case of successful validation, one or more events expressing the mutations are persisted. Once the events are persisted, they are applied to the state producing a new valid state.
- Events handler: The event handlers are used to mutate the state of the Aggregate by applying the events to it.
Event handlers must be pure functions as they will be used when instantiating the Aggregate and replaying the event store.
- Extensible events store
- Built-in events store:
- [Postgres](./eventstore/postgres/postgres.go): Schema can be found [here](./resources/eventstore_postgres.sql)
- [Memory](./eventstore/memory/memory.go) (for testing purpose only)
- [Cluster Mode](https://github.com/Tochemey/goakt#clustering)
- Projection: Helps build read model with a timestamp-based offset.
- Runner: Helps consume and handle events persisted by entity. It depends on the [Offset Store](./offsetstore/iface.go) to track consumers' offset.
- Extensible Offset store: Helps store offsets of events consumed and processed by projections
- Built-in offset stores:
- [Postgres](./offsetstore/postgres/postgres.go): Schema can be found [here](./resources/offsetstore_postgres.sql)
- [Memory](./offsetstore/memory/memory.go) (for testing purpose only)
- Events Subscription: One can subscribe to events that are emitted on the Write Model instead of using the projection
- Examples (check the [examples](./example))
#### Domain Entity/Aggregate Root

The aggregate root is crucial for maintaining data consistency, especially in distributed systems. It defines how to handle the various commands (requests to perform actions) that are always directed at the aggregate root.
In eGo commands sent the aggregate root are processed in order. When a command is processed, it may result in the generation of events, which are then stored in an event store. Every event persisted has a revision number
and timestamp that can help track it. The aggregate root in eGo is responsible for defining how to handle events that are the result of command handlers. The end result of events handling is to build the new state of the aggregate root.
When running in cluster mode, aggregate root are sharded.

- Commands handler: The command handlers define how to handle each incoming command,
which validations must be applied, and finally, which events will be persisted if any. When there is no event to be persisted a nil can
be returned as a no-op. Command handlers are the meat of the event sourced actor.
They encode the business rules of your event sourced actor and act as a guardian of the Aggregate consistency.
The command handler must first validate that the incoming command can be applied to the current model state.
Any decision should be solely based on the data passed in the commands and the state of the Behavior.
In case of successful validation, one or more events expressing the mutations are persisted. Once the events are persisted, they are applied to the state producing a new valid state.
- Events handler: The event handlers are used to mutate the state of the Aggregate by applying the events to it.
Event handlers must be pure functions as they will be used when instantiating the Aggregate and replaying the event store.

To define an Aggregate Root, one needs to:
1. the state of the aggregate root using google protocol buffers message
2. the various commands that will be handled by the aggregate root
3. the various events that are result of the command handlers and that will be handled by the aggregate root to return the new state of the aggregate root
2. implements the [`EntityBehavior[T State]`](./behavior.go) interface where T is the generated golang struct of the prior defined aggregate root state.

#### Events Stream

Every event handled by Aggregate Root are pushed to an events stream. That enables real-time processing of events without having to interact with the events store

#### Projection

One can add a projection to the eGo engine to help build a read model. Projections in eGo rely on an offset store to track how far they have consumed events
persisted by the write model. The offset used in eGo is a timestamp-based offset.

#### Events Store

One can implement a custom events store. See [EventsStore](./eventstore/iface.go). eGo comes packaged with two events store:
- [Postgres](./eventstore/postgres/postgres.go): Schema can be found [here](./resources/eventstore_postgres.sql)
- [Memory](./eventstore/memory/memory.go) (for testing purpose only)

#### Offsets Store

One can implement a custom offsets store. See [OffsetStore](./offsetstore/iface.go). eGo comes packaged with two offset store:
- [Postgres](./offsetstore/postgres/postgres.go): Schema can be found [here](./resources/offsetstore_postgres.sql)
- [Memory](./offsetstore/memory/memory.go) (for testing purpose only)

#### Cluster

The cluster mode heavily relies on [Go-Akt](https://github.com/Tochemey/goakt#clustering) clustering.

#### Examples

Check the [examples](./example)

### Installation

Expand Down

0 comments on commit 568b28a

Please sign in to comment.