To process new events in your microservices, you need a way to listen for them. pg_eventstore implements a subscription feature for this purpose. It is implemented as a background thread that periodically pulls new events matching your filters (see the subscription_pull_interval setting in the Configuration chapter).
pg_eventstore stores various subscription information in the database. The object that describes the corresponding database records is PgEventstore::Subscription. It is used, for example, in the config.subscription_restart_terminator setting. You can find its attributes summary here.
pg_eventstore also stores information about subscriptions sets. The object that describes the corresponding database records is PgEventstore::SubscriptionsSet. You can find its attributes summary here.
This record is created when you start your subscriptions. All subscriptions created using a single subscriptions manager instance are locked using a single PgEventstore::SubscriptionsSet. While subscriptions are locked, they can't be managed anywhere else. When you stop your subscriptions, the PgEventstore::SubscriptionsSet is deleted, unlocking the subscriptions. The PgEventstore::SubscriptionsSet also holds the state, the number of restarts, the restart interval and the last error of the background runner that is responsible for pulling the subscriptions' events. You can set the maximum number of restarts and the restart interval of your subscriptions set via the config.subscriptions_set_max_retries and config.subscriptions_set_retries_interval settings. See the Configuration chapter for more info.
The first step is to create a PgEventstore::SubscriptionsManager object and provide the subscription_set keyword argument. Optionally, you can provide the name of the config to use and override the config.subscriptions_set_max_retries and config.subscriptions_set_retries_interval settings:
subscriptions_manager = PgEventstore.subscriptions_manager(subscription_set: 'SubscriptionsOfMyAwesomeMicroservice')
another_subscriptions_manager = PgEventstore.subscriptions_manager(:my_custom_config, subscription_set: 'SubscriptionsOfMyAwesomeMicroservice', max_retries: 5, retries_interval: 2)
The required subscription_set option groups your subscriptions into a set. For example, you could refer to your service's name in the subscription set name.
Now we can use the #subscribe method to create the subscription:
subscriptions_manager.subscribe('MyAwesomeSubscription', handler: proc { |event| puts event })
The first argument is the subscription's name. It must be unique within the subscriptions set. The second argument is your subscription's handler, where you process events as they arrive. The example shows the minimum set of arguments required to create a subscription.
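Because the handler is an ordinary proc that receives one event at a time, it can be exercised without a database. The following is a minimal sketch, not part of pg_eventstore: StubEvent is a hypothetical stand-in for the event object, and its type and data attributes mirror the keyword arguments passed to PgEventstore::Event.new elsewhere in this chapter.

```ruby
# Hypothetical stand-in for an event; not part of pg_eventstore.
StubEvent = Struct.new(:type, :data, keyword_init: true)

processed = []

# The same kind of proc you would pass as `handler:` to #subscribe.
handler = proc { |event| processed << "#{event.type}: #{event.data[:foo]}" }

# Simulate two events arriving one after another.
handler.call(StubEvent.new(type: 'Foo', data: { foo: :bar }))
handler.call(StubEvent.new(type: 'Bar', data: { foo: :baz }))

puts processed.inspect
```

Keeping the handler free of subscription-specific state makes it easy to test in isolation like this.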
In this state the subscription listens to all events from all streams. You can define various filters by providing the :filter key of the options argument:
subscriptions_manager.subscribe(
  'MyAwesomeSubscription',
  handler: proc { |event| puts event },
  options: { filter: { streams: [{ context: 'MyAwesomeContext' }], event_types: ['Foo', 'Bar'] } }
)
:filter supports the same options as the #read method supports when reading from the "all" stream. See the "all" stream filtering section of the Reading events chapter.
After you have added all the necessary subscriptions, it is time to start them:
subscriptions_manager.start
# => PgEventstore::BasicRunner
After calling #start, all subscriptions are locked behind the given subscriptions set and can't be locked by any other subscriptions set. This prevents the same subscription from being run under the same subscriptions set by different processes or subscriptions managers. Such a situation would corrupt the subscription's state and break its position, so the same event would be processed several times.
If, for some reason, you want to lock an already locked subscription, you can provide force_lock: true:
subscriptions_manager = PgEventstore.subscriptions_manager(subscription_set: 'SubscriptionsOfMyAwesomeMicroservice', force_lock: true)
subscriptions_manager.start
A complete example of the subscription setup process looks like this:
PgEventstore.configure do |config|
  config.pg_uri = ENV.fetch('PG_EVENTSTORE_URI') { 'postgresql://postgres:postgres@localhost:5532/eventstore' }
end
subscriptions_manager = PgEventstore.subscriptions_manager(
  subscription_set: 'MyAwesomeSubscriptions'
)
subscriptions_manager.subscribe(
  'Foo events Subscription',
  handler: proc { |event| p "Foo events Subscription: #{event.inspect}" },
  options: { filter: { event_types: ['Foo'] } }
)
subscriptions_manager.subscribe(
  '"BarCtx" context Subscription',
  handler: proc { |event| p "'BarCtx' context Subscription: #{event.inspect}" },
  options: { filter: { streams: [{ context: 'BarCtx' }] } }
)
subscriptions_manager.start
Save this script to a file (for example, subscriptions.rb). Now it is time to start the process that will run those subscriptions. pg_eventstore has a CLI for that purpose:
# -r ./subscriptions.rb will load our subscriptions definitions
pg-eventstore subscriptions start -r ./subscriptions.rb
After starting these test subscriptions, you can open another Ruby console and try publishing different events:
require 'pg_eventstore'
PgEventstore.configure do |config|
  config.pg_uri = ENV.fetch('PG_EVENTSTORE_URI') { 'postgresql://postgres:postgres@localhost:5532/eventstore' }
end
foo_stream = PgEventstore::Stream.new(context: 'FooCtx', stream_name: 'MyAwesomeStream', stream_id: '1')
bar_stream = PgEventstore::Stream.new(context: 'BarCtx', stream_name: 'MyAwesomeStream', stream_id: '1')
PgEventstore.client.append_to_stream(foo_stream, PgEventstore::Event.new(type: 'Foo', data: { foo: :bar }))
PgEventstore.client.append_to_stream(bar_stream, PgEventstore::Event.new(type: 'Foo', data: { foo: :bar }))
You will then see the output of your subscription handlers. To gracefully stop the subscriptions process, use the kill -TERM <pid> command.
You can override the subscription_pull_interval, subscription_max_retries, subscription_retries_interval, subscription_restart_terminator, failed_subscription_notifier and subscription_graceful_shutdown_timeout config values (see the Configuration chapter for details) for a specific subscription by providing the corresponding arguments. Example:
subscriptions_manager.subscribe(
  'MyAwesomeSubscription',
  handler: proc { |event| puts event },
  # overrides config.subscription_pull_interval
  pull_interval: 0.5,
  # overrides config.subscription_max_retries
  max_retries: 10,
  # overrides config.subscription_retries_interval
  retries_interval: 2,
  # overrides config.subscription_restart_terminator
  restart_terminator: proc { |subscription| subscription.last_error['class'] == 'NoMethodError' },
  # overrides config.failed_subscription_notifier
  failed_subscription_notifier: proc { |_subscription, err| p err },
  # overrides config.subscription_graceful_shutdown_timeout
  graceful_shutdown_timeout: 20
)
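The restart_terminator proc from the example above can be tried out on its own. The sketch below assumes that a truthy return value means the subscription should not be restarted again (check the Configuration chapter for the exact contract). StubSubscription is a hypothetical stand-in that exposes only the last_error attribute the proc reads, and the error class names are arbitrary examples.

```ruby
# Hypothetical stand-in exposing only the attribute the terminator reads.
StubSubscription = Struct.new(:last_error, keyword_init: true)

# Same proc as in the #subscribe example above.
restart_terminator = proc { |subscription| subscription.last_error['class'] == 'NoMethodError' }

# Assumption: last_error is a hash with the error class name under the 'class' key.
programming_error = StubSubscription.new(last_error: { 'class' => 'NoMethodError' })
transient_error = StubSubscription.new(last_error: { 'class' => 'Timeout::Error' })

restart_terminator.call(programming_error) # => true
restart_terminator.call(transient_error)   # => false
```

Distinguishing programming errors from transient ones this way avoids retrying a failure that a restart cannot fix.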
If you would like some of your registered middlewares to be skipped when processing events pulled by the subscription, use the :middlewares argument, which lets you override the list of middlewares to use.
Let's say you have these registered middlewares:
PgEventstore.configure do |config|
  config.middlewares = { foo: FooMiddleware.new, bar: BarMiddleware.new, baz: BazMiddleware.new }
end
And you want to skip FooMiddleware and BazMiddleware. You simply have to provide an array of the keys of the middlewares you would like to use when creating the subscription:
subscriptions_manager.subscribe('MyAwesomeSubscription', handler: proc { |event| puts event }, middlewares: %i[bar])
See the Writing middleware chapter for info about what middleware is and how to implement it.
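Conceptually, the :middlewares argument narrows the registered middlewares down to the listed keys. The following plain-Ruby sketch illustrates that selection only. It is not the gem's internal implementation, and the three classes are empty, hypothetical stand-ins for the middlewares named above.

```ruby
# Hypothetical empty stand-ins for the registered middleware classes.
FooMiddleware = Class.new
BarMiddleware = Class.new
BazMiddleware = Class.new

registered = { foo: FooMiddleware.new, bar: BarMiddleware.new, baz: BazMiddleware.new }

# Keep only the middlewares whose keys were requested for this subscription.
selected = registered.slice(*%i[bar])

selected.keys # => [:bar]
```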
How many subscriptions you should run in a single process depends on the nature of your subscription handlers. If they spend more time executing Ruby code than waiting on IO operations, you should limit the number of subscriptions per process. This is especially noticeable when you rebuild the read models of your microservice, processing all events from the start.
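The underlying reason is that subscriptions run as threads, and on CRuby only one thread executes Ruby code at a time, while threads waiting on IO let others run. The sketch below illustrates the difference under simple assumptions: sleep stands in for an IO wait, a multiplication loop stands in for CPU-heavy handler code, and elapsed_in_threads is a hypothetical helper. Actual timings depend on your machine and Ruby implementation.

```ruby
require 'benchmark'

# Runs the given block in `count` threads and returns the elapsed wall-clock time.
def elapsed_in_threads(count, &work)
  Benchmark.realtime { Array.new(count) { Thread.new(&work) }.each(&:join) }
end

io_bound = proc { sleep 0.2 }                       # waiting lets other threads run
cpu_bound = proc { 1_000_000.times { |i| i * i } }  # pure Ruby work competes for the interpreter

io_time = elapsed_in_threads(4, &io_bound)
cpu_single = elapsed_in_threads(1, &cpu_bound)
cpu_time = elapsed_in_threads(4, &cpu_bound)

puts format('4 IO-bound threads:  %.2fs', io_time)
puts format('1 CPU-bound thread:  %.2fs', cpu_single)
puts format('4 CPU-bound threads: %.2fs', cpu_time)
```

On CRuby the four IO-bound threads typically finish in about the time of a single wait, whereas four CPU-bound threads take noticeably longer than one. Splitting CPU-heavy subscriptions across several processes sidesteps this limit.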