MISAS Go

MISAS Go is an opinionated library for developing systems with a predefined architecture based on DDD, CQRS and ES. It provides a solid base for smaller teams to build advanced systems with fewer means. It is the Go implementation of MISAS.

Features

  • Domain-Driven Design
  • Event Sourcing
  • CQRS
  • Intra/Out of Process Messaging
  • Observability
    • Tracing using OpenTelemetry as automated instrumentation on the Command/Query/Event/Prediction buses.
    • Tracing using correlation IDs and causation IDs on messages, propagated to events.

Introduction

MISAS Go mostly provides a set of abstractions to implement DDD, CQRS and ES concepts according to MISAS. It also provides a few concrete implementations of these concepts for the most common use cases.

Batteries included

Here are some of the batteries this library includes to make working with MISAS systems in Go easier.

  • clock: Provides an abstraction of a clock.
  • postgresql/eventstore: Implementation of an event store using PostgreSQL.
  • postgresql/documentstore: Implementation of a document store using PostgreSQL to easily persist unstructured data.
  • postgresql/checkpointstore: Implementation of a checkpoint store using PostgreSQL to persist the last processed event during event processing.
  • postgresql/predictionstore: Implementation of a prediction store using PostgreSQL.
  • secret: Provides a string implementation that avoids leaking sensitive values to logs or external systems, by returning "REDACTED" when converted to a string.
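
As a rough illustration of the secret idea (not necessarily the library's actual type), a string wrapper can implement fmt.Stringer so that accidental logging prints a placeholder instead of the value:

// Illustrative sketch of a redacting string type.
type Secret struct {
	value string
}

// String implements fmt.Stringer so that loggers and fmt verbs print
// a placeholder rather than the sensitive value.
func (s Secret) String() string {
	return "REDACTED"
}

// Reveal returns the underlying sensitive value for intentional use.
func (s Secret) Reveal() string {
	return s.value
}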

Defining a System

At the core of the library is the concept of a System, which represents an information system. The System struct is used as a centralized point to define systems. Although entirely optional, using the System allows one to expressively define the dependencies between the core units of the system:

	utcClock := clock.NewUTCClock()

	s := system.New(
		// This information is reused in logs, tracing spans and as metadata for events.
		system.WithInformation(system.Information{
			Name:    "unit_test",
			Version: "1.0.0",
		}),
		system.WithEnvironment(system.Test),
		system.WithClock(utcClock),

		system.WithCommandHandling(
			system.WithCommandBus(
				command.NewInMemoryBus(),
			),
		),

		system.WithQueryHandling(
			system.WithQueryBus(
				query.NewInMemoryBus(),
			),
		),

		system.WithEventHandling(
			system.WithEventBus(
				event.NewInMemoryBus(),
			),
			system.WithEventStore(
				postgresql.NewEventStore("connectionString", utcClock),
			),
		),

		system.WithPredictionHandling(
			system.WithPredictionBus(
				prediction.NewInMemoryBus(),
			),
			system.WithPredictionStore(
				postgresql.NewPredictionStore(),
			),
		),

		system.WithInstrumentation(
			system.WithTracer(instrumentation.NewSystemTracer()),
			system.WithDefaultLogger(),
			system.WithJaegerTracingSpanExporter("urlToJaegerInstance"),
			system.WithCommandBusInstrumentation(), // Decorates the command bus adding automated instrumentation.
			system.WithQueryBusInstrumentation(), // Decorates the query bus adding automated instrumentation.
			system.WithEventBusInstrumentation(), // Decorates the event bus adding automated instrumentation.
			system.WithPredictionBusInstrumentation(), // Decorates the prediction bus adding automated instrumentation.
			system.WithEventStoreInstrumentation(), // Decorates the event store adding automated instrumentation.
		),

		// Subsystems allow separating the dependencies of the system into modules.
		system.WithSubsystems(
			func(s *system.Subsystem) {
				// Registers the events and command handlers of this subsystem.
				s.RegisterEvent(accountCreated{})
				s.RegisterCommandHandler(createAccount{}, createAccountCommandHandler)
			},
		),
	)

	// Entry points are procedures to start the system and its subsystems' interaction layers.
	// Depending on the needs of the system, one could define different entry points
	// starting different things (e.g. web server, message queue, etc.).
	mainEntryPoint := NewEntryPoint(
		// Name of the entry point. If instrumentation is enabled (see below), this name will be used in spans.
		"web_server",

		// Function to effectively start the entry point.
		func(ctx context.Context, s *System) error {
			return nil
		},

		// Function to stop the entry point.
		func(ctx context.Context, s *System) error {
			return nil
		},

		// Allows adding automated instrumentation on the entry point.
		WithEntryPointInstrumentation(),
	)

	// Allows running the system with the given entry point.
	if err := s.RunEntryPoint(mainEntryPoint); err != nil {
		panic(err)
	}

Command Processing

Command Handlers & Failures

Registering a Command Handler with the System

Aggregates

Implementing an event sourced Aggregate

An event sourced aggregate can be implemented as follows:

type User struct {
	EventSourcedAggregateBase

	ID           string
	EmailAddress string
}

func (u *User) ApplyEvent(evt event.Event) {
	switch e := evt.(type) {
	case UserRegisteredEvent:
		u.ID = e.ID
		u.EmailAddress = e.EmailAddress
	}
}

func RegisterUser(id string, emailAddress string) *User {
	u := &User{
		ID:           "",
		EmailAddress: "",
	}
	// Wire the aggregate's ApplyEvent method into the base, so that
	// recorded events are applied to this instance.
	u.EventSourcedAggregateBase = EventSourcedAggregateBase{
		ApplyEvent: u.ApplyEvent,
	}

	u.RecordEvent(UserRegisteredEvent{
		ID:           id,
		EmailAddress: emailAddress,
	})

	return u
}
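
To make the wiring above concrete, here is a minimal sketch of what a base like EventSourcedAggregateBase conceptually does (the library's actual implementation may differ): RecordEvent both applies the event to the aggregate and tracks it as an uncommitted change for a repository to persist.

// Illustrative sketch only; the recorded field and its handling are
// assumptions, not necessarily the library's API.
type EventSourcedAggregateBase struct {
	ApplyEvent func(evt event.Event) // callback into the concrete aggregate
	recorded   []event.Event         // uncommitted changes to persist
}

// RecordEvent applies the event to the aggregate's state and tracks it
// so that a repository can later append it to the event store.
func (b *EventSourcedAggregateBase) RecordEvent(evt event.Event) {
	b.ApplyEvent(evt)
	b.recorded = append(b.recorded, evt)
}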

Aggregate Repositories

When using the EventSourcedAggregate interface, the aggregate.EventStoreRepository helper makes it easy to implement event store based repositories for aggregates through composition:

type UserRepository struct {
	inner aggregate.EventStoreRepository
}

func NewUserRepository(es event.Store) *UserRepository {
	return &UserRepository{
		inner: aggregate.NewEventStoreRepository(es, func() aggregate.Aggregate {
			// This callback defines the initial state of an aggregate,
			// before its saved changes are applied when loading.
			return &User{}
		}),
	}
}

func (r *UserRepository) Add(ctx context.Context, u *User) error {
	return r.inner.Add(ctx, event.StreamID("user/"+u.ID), u)
}

func (r *UserRepository) Save(ctx context.Context, u *User, version Version) error {
	return r.inner.Save(ctx, event.StreamID("user/"+u.ID), u, version)
}

func (r *UserRepository) FindByID(ctx context.Context, id UserID) (*User, Version, error) {
	loaded, v, err := r.inner.Load(ctx, event.StreamID("user/"+string(id)))
	if err != nil {
		return nil, 0, err
	}

	return loaded.(*User), v, nil
}
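
A typical usage of such a repository in a command handler might look like the following sketch, where ChangeEmailAddress is a hypothetical command method on the aggregate:

// Sketch: load, mutate and save an aggregate through the repository.
func ChangeUserEmail(ctx context.Context, repo *UserRepository, id UserID, email string) error {
	u, version, err := repo.FindByID(ctx, id)
	if err != nil {
		return err
	}

	// Hypothetical command method that records an event on the aggregate.
	u.ChangeEmailAddress(email)

	// Saving with the loaded version allows detecting concurrent modifications.
	return repo.Save(ctx, u, version)
}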

Domain Errors

You can create domain errors like so:

const UserNotFoundErrorTypeName domain.ErrorTypeName = "user_not_found"

func UserNotFoundError(id UserID, cause error) error {
	return domain.NewError(
		domain.WithTypeName(UserNotFoundErrorTypeName),
		domain.WithMessage(fmt.Sprintf("user %s not found", string(id))),
		domain.WithCause(cause),
		domain.WithData(map[string]any{
			"id": string(id),
		}),
	)
}

You can test that an error has a given typeName:

domain.IsDomainErrorWithTypeName(err, UserNotFoundErrorTypeName)

Domain errors can also be tagged upon creation:

domain.NewError(
	// ...
	domain.WithTag("a tag"),
	domain.WithTags("another tag", "yet another tag"),
)

Tags allow grouping errors. For example, a system might have many different not found errors for specific types of resources, aggregates and views. Some components of the system might simply want to know whether an error qualifies as a not found error, without maintaining a list of all the ErrorTypeNames that qualify.

This is where tags come into play. There are a few tags available out of the box:

  • domain.NotFoundTag: When a resource, aggregate, view etc. was not found.
  • domain.AlreadyExistsTag: When a resource, aggregate, view etc. was expected not to exist, but was found.
  • domain.ValidationErrorTag: When an error represents a validation error.
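
A component could then branch on a tag rather than on specific type names. Note that the helper name below is an assumption by analogy with domain.IsDomainErrorWithTypeName; consult the package for the actual API:

// Hypothetical: treat any error tagged as not found as an HTTP 404.
if domain.IsDomainErrorWithTag(err, domain.NotFoundTag) {
	writer.WriteHeader(http.StatusNotFound)
}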

Query Processing

Query Handlers & Failures

Registering a Query Handler with the System

Event Processing

Event Store

Event Store Subscriptions

Event Processors

Event Handlers

eventBus.RegisterHandler(EventTypeName, Handler)
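
For instance, a handler for the accountCreated event from earlier could be registered as follows; the type name literal and handler signature are assumptions, not necessarily the library's API:

// Hypothetical registration of a concrete event handler.
eventBus.RegisterHandler("account_created", func(ctx context.Context, evt event.Event) error {
	// React to the event, e.g. update a read model.
	return nil
})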

Checkpoints

Event Handlers & Failures

When an event handler fails to process an event, there are two common strategies at our disposal:

  • Continued Processing: Ignore/log the failure and continue processing the next events.
  • Delayed Processing: Stop/retry the processing at the problematic event, until fixed.

Each strategy has its own pros and cons. Delayed Processing prevents any out-of-order processing of events and ensures that, once processing is done, the system will be fully consistent. However, it requires event handlers to be idempotent, since they may be called multiple times for the same events in case of retries. Continued Processing has the benefit of not blocking the processing of events, which minimizes the impact on other components of the system. However, it also means that event handlers must be implemented to tolerate inconsistencies in the data, since some events will have been skipped or only partially applied. This leads to a system that can be slightly inconsistent and requires close attention to these potential inconsistencies.

An interesting strategy is to use Delayed Processing combined with event processing partitions (e.g. one event processor per subsystem), which can often drastically reduce the bottlenecks caused by a problematic event.
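
As a rough sketch (not the library's API), the two strategies differ only in how the processing loop reacts to a handler failure:

// Illustrative processing loop; all names here are assumptions.
func processEvents(events []event.Event, handle func(event.Event) error, delayed bool) {
	for _, evt := range events {
		for {
			err := handle(evt)
			if err == nil {
				break
			}
			if !delayed {
				// Continued Processing: log the failure and move on.
				log.Printf("skipping event after failure: %v", err)
				break
			}
			// Delayed Processing: retry the same event until it succeeds,
			// which is why handlers must be idempotent.
			time.Sleep(time.Second)
		}
	}
}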

Registering Event Handlers with the System

Prediction Processing

Prediction Handlers & Failures

Projection Processing

Projectors & Failures

HTTP API

Define an Endpoint

func HomeEndpoint(r chi.Router) {
	r.Get("/", func(writer http.ResponseWriter, request *http.Request) {
		writer.WriteHeader(http.StatusOK)
		render.JSON(writer, request, NewSuccessResponse(nil))
	})
}

Start HTTP Web Server

func StartFrontendAPI() error {
	r := chi.NewRouter()
	r.Use(middleware.AllowContentType("application/json"))
	r.Use(middleware.CleanPath)
	r.Use(middleware.RealIP)
	r.Use(middleware.Recoverer)

	r.Use(middleware.Timeout(time.Second * 60))
	r.Use(render.SetContentType(render.ContentTypeJSON))

	HomeEndpoint(r)

	if err := http.ListenAndServe(":3000", r); err != nil {
		return err
	}
	return nil
}
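
Tying this back to the entry points shown earlier, the web server can then be started by the system (sketch; graceful shutdown is omitted):

// Run the HTTP API as the system's web_server entry point.
webServer := NewEntryPoint(
	"web_server",
	func(ctx context.Context, s *System) error {
		return StartFrontendAPI()
	},
	func(ctx context.Context, s *System) error {
		// A real implementation would shut the HTTP server down gracefully here.
		return nil
	},
	WithEntryPointInstrumentation(),
)

if err := s.RunEntryPoint(webServer); err != nil {
	panic(err)
}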
