Skip to content

Latest commit

 

History

History
155 lines (112 loc) · 8.79 KB

subscriptions.md

File metadata and controls

155 lines (112 loc) · 8.79 KB

Subscriptions

In order to process new events in your microservices you have to have the ability to listen for them. pg_eventstore implements a subscription feature for this matter. It is implemented as a background thread that pulls new events according to your filters from time to time (see subscription_pull_interval setting option under Configuration chapter).

PgEventstore::Subscription

pg_eventstore stores various subscription information in the database. The corresponding object that describes the database records is the PgEventstore::Subscription object. It is used in the config.subscription_restart_terminator setting for example. You can find its attributes summary here.

PgEventstore::SubscriptionsSet

pg_eventstore also stores information about which subscriptions are set. The corresponding object that describes the database records is PgEventstore::SubscriptionsSet. You can find its attributes summary here.

This record is created when you start your subscriptions. All subscriptions created using a single subscriptions manager instance are locked using a single PgEventstore::SubscriptionsSet. When subscriptions are locked, they can't be managed anywhere else. When you stop your subscriptions, the PgEventstore::SubscriptionsSet is deleted, unlocking the subscriptions. The SubscriptionSet also holds information about the state, number of restarts, the restart interval and last error of the background runner which is responsible for pulling the subscription's events. You can set the max number of restarts and the restarts interval of your subscriptions set via config.subscriptions_set_max_retries and config.subscriptions_set_retries_interval settings. See Configuration chapter for more info.

Creating a subscription

First step you need to do is to create a PgEventstore::SubscriptionsManager object and provide the subscription_set keyword argument. Optionally you can provide a config name to use, override the config.subscriptions_set_max_retries and config.subscriptions_set_retries_interval settings:

subscriptions_manager = PgEventstore.subscriptions_manager(subscription_set: 'SubscriptionsOfMyAwesomeMicroservice')
another_subscriptions_manager = PgEventstore.subscriptions_manager(:my_custom_config, subscription_set: 'SubscriptionsOfMyAwesomeMicroservice', max_retries: 5, retries_interval: 2)

The required subscription_set option groups your subscriptions into a set. For example, you could refer to your service's name in the subscription set name.

Now we can use the #subscribe method to create the subscription:

subscriptions_manager.subscribe('MyAwesomeSubscription', handler: proc { |event| puts event })

First argument is the subscription's name. It must be unique within the subscriptions set. Second argument is your subscription's handler where you will be processing your events as they arrive. The example shows the minimum set of arguments required to create the subscription.

In the given state it will be listening to all events from all streams. You can define various filters by providing the :filter key of options argument:

subscriptions_manager.subscribe(
  'MyAwesomeSubscription', 
  handler: proc { |event| puts event }, 
  options: { filter: { streams: [{ context: 'MyAwesomeContext' }], event_types: ['Foo', 'Bar'] } }
)

:filter supports the same options as the #read method supports when reading from the "all" stream. See "all" stream filtering section of Reading events chapter.

After you added all necessary subscriptions, it is time to start them:

subscriptions_manager.start
# => PgEventstore::BasicRunner

After calling #start all subscriptions are locked behind the given subscriptions set and can't be locked by any other subscriptions set. This measure is needed to prevent running the same subscription under the same subscription set using different processes/subscription managers. Such situation will lead to a malformed subscription state and will break its position, meaning the same event will be processed several times.

If, for some reason, you want to lock already locked subscription - you can provide force_lock: true:

subscriptions_manager = PgEventstore.subscriptions_manager(subscription_set: 'SubscriptionsOfMyAwesomeMicroservice', force_lock: true)
subscriptions_manager.start

A complete example of the subscription setup process looks like this:

PgEventstore.configure do |config|
  config.pg_uri = ENV.fetch('PG_EVENTSTORE_URI') { 'postgresql://postgres:postgres@localhost:5532/eventstore' }  
end

subscriptions_manager = PgEventstore.subscriptions_manager(
  subscription_set: 'MyAwesomeSubscriptions'
)
subscriptions_manager.subscribe(
  'Foo events Subscription', 
  handler: proc { |event| p "Foo events Subscription: #{event.inspect}" }, 
  options: { filter: { event_types: ['Foo'] } }
)
subscriptions_manager.subscribe(
  '"BarCtx" context Subscription',
  handler: proc { |event| p "'BarCtx' context Subscription: #{event.inspect}" }, 
  options: { filter: { streams: [{ context: 'BarCtx' }] } 
  }
)
subscriptions_manager.start

Persist this script into a file(let's say subscriptions.rb). Now it is time to start the process which will be processing those subscriptions. pg_eventstore has CLI for that purpose:

# -r ./subscriptions.rb will load our subscriptions definitions
pg-eventstore subscriptions start -r ./subscriptions.rb

After running that test subscriptions you can open another ruby console and test posting different events:

require 'pg_eventstore'

PgEventstore.configure do |config|
  config.pg_uri = ENV.fetch('PG_EVENTSTORE_URI') { 'postgresql://postgres:postgres@localhost:5532/eventstore' }
end

foo_stream = PgEventstore::Stream.new(context: 'FooCtx', stream_name: 'MyAwesomeStream', stream_id: '1')
bar_stream = PgEventstore::Stream.new(context: 'BarCtx', stream_name: 'MyAwesomeStream', stream_id: '1')
PgEventstore.client.append_to_stream(foo_stream, PgEventstore::Event.new(type: 'Foo', data: { foo: :bar }))
PgEventstore.client.append_to_stream(bar_stream, PgEventstore::Event.new(type: 'Foo', data: { foo: :bar }))

You will then see the output of your subscription handlers. To gracefully stop the subscriptions process, use kill -TERM <pid> command.

Overriding Subscription config values

You can override subscription_pull_interval, subscription_max_retries, subscription_retries_interval, subscription_restart_terminator, failed_subscription_notifier and subscription_graceful_shutdown_timeout config values (see Configuration chapter for details) for the specific subscription by providing the corresponding arguments. Example:

subscriptions_manager.subscribe(
  'MyAwesomeSubscription', 
  handler: proc { |event| puts event },
  # overrides config.subscription_pull_interval
  pull_interval: 0.5,
  # overrides config.subscription_max_retries
  max_retries: 10,
  # overrides config.subscription_retries_interval
  retries_interval: 2,
  # overrides config.subscription_restart_terminator
  restart_terminator: proc { |subscription| subscription.last_error['class'] == 'NoMethodError' },
  # overrides config.failed_subscription_notifier
  failed_subscription_notifier: proc { |_subscription, err| p err },
  # overrides config.subscription_graceful_shutdown_timeout
  graceful_shutdown_timeout: 20
)

Middlewares

If you would like to skip some of your registered middlewares from processing events after they are being pulled by the subscription, you should use the :middlewares argument which allows you to override the list of middlewares you would like to use.

Let's say you have these registered middlewares:

PgEventstore.configure do |config|
  config.middlewares = { foo: FooMiddleware.new, bar: BarMiddleware.new, baz: BazMiddleware.new }
end

And you want to skip FooMiddleware and BazMiddleware. You simply have to provide an array of corresponding middleware keys you would like to use when creating the subscription:

subscriptions_manager.subscribe('MyAwesomeSubscription', handler: proc { |event| puts event }, middlewares: %i[bar])

See the Writing middleware chapter for info about what is middleware and how to implement it.

How many subscriptions I should put in one process?

It depends on the nature of your subscription handlers. If they spend more time on ruby code execution than on IO operations, you should limit the number of subscriptions per single process. This can be especially noticed when you rebuild the read models of your microservice, processing all events from the start.