The easiest way to read a stream forwards is to supply a PgEventstore::Stream
object.
stream = PgEventstore::Stream.new(context: 'MyAwesomeContext', stream_name: 'User', stream_id: 'f37b82f2-4152-424d-ab6b-0cc6f0a53aae')
PgEventstore.client.read(stream)
# => [#<PgEventstore::Event 0x1>, #<PgEventstore::Event 0x1>, ...]
You can provide the :max_count
option. This option determines how many records to return in a response. Default is 1000
and it can be changed with the :max_count
configuration setting (see "Configuration" chapter):
stream = PgEventstore::Stream.new(context: 'MyAwesomeContext', stream_name: 'User', stream_id: 'f37b82f2-4152-424d-ab6b-0cc6f0a53aae')
PgEventstore.client.read(stream, options: { max_count: 100 })
When reading streams with projected events (links to other events) you can chose to resolve those links by setting resolve_link_tos
to true
, returning the original event instead of the "link" event.
stream = PgEventstore::Stream.new(context: 'MyAwesomeContext', stream_name: 'User', stream_id: 'f37b82f2-4152-424d-ab6b-0cc6f0a53aae')
PgEventstore.client.read(stream, options: { resolve_link_tos: true })
You can define from which revision number you would like to start to read events:
stream = PgEventstore::Stream.new(context: 'MyAwesomeContext', stream_name: 'User', stream_id: 'f37b82f2-4152-424d-ab6b-0cc6f0a53aae')
PgEventstore.client.read(stream, options: { from_revision: 2 })
As well as being able to read a stream forwards you can also go backwards. This can be achieved by providing the :direction
option:
stream = PgEventstore::Stream.new(context: 'MyAwesomeContext', stream_name: 'User', stream_id: 'f37b82f2-4152-424d-ab6b-0cc6f0a53aae')
PgEventstore.client.read(stream, options: { direction: 'Backwards' })
In case a stream with given name does not exist, a PgEventstore::StreamNotFoundError
error will be raised:
begin
stream = PgEventstore::Stream.new(context: 'non-existing-context', stream_name: 'User', stream_id: 'f37b82f2-4152-424d-ab6b-0cc6f0a53aae')
PgEventstore.client.read(stream, options: { max_count: 1 })
rescue PgEventstore::StreamNotFoundError => e
puts e.message # => Stream #<PgEventstore::Stream:0x01> does not exist.
puts e.stream # => #<PgEventstore::Stream:0x01>
end
"all" stream definition means that you don't scope your events when reading them from the database. To get the "all" PgEventstore::Stream
instance you have to call the all_stream
method:
PgEventstore::Stream.all_stream
Now you can use it to read from the "all" stream:
PgEventstore.client.read(PgEventstore::Stream.all_stream)
You can read from a specific position of the "all" stream. This is very similar to reading from a specific revision of a specific stream, but instead of the :from_revision
option you have to provide the :from_position
option:
PgEventstore.client.read(PgEventstore::Stream.all_stream, options: { from_position: 9023, direction: 'Backwards' })
"$streams"
is a special stream which consists of events with stream_revision == 0
. This allows you to effectively query all streams. Example:
stream = PgEventstore::Stream.system_stream("$streams")
PgEventstore.client.read(stream).map(&:stream) # => array of unique streams
If you would like to skip some of your registered middlewares from processing events after they being read from a stream - you should use the :middlewares
argument which allows you to override the list of middlewares you would like to use.
Let's say you have these registered middlewares:
PgEventstore.configure do |config|
config.middlewares = { foo: FooMiddleware.new, bar: BarMiddleware.new, baz: BazMiddleware.new }
end
And you want to skip FooMiddleware
and BazMiddleware
. You simply have to provide an array of corresponding middleware keys you would like to use:
PgEventstore.client.read(PgEventstore::Stream.all_stream, middlewares: %i[bar])
See Writing middleware chapter for info about what is middleware and how to implement it.
When reading events, you can additionally filter the result. Available attributes for filtering depend on the type of stream you are reading from. Reading from the "all" stream supports filters by stream attributes and event types. Reading from a specific stream supports filters by event types only.
Filtering events by their types:
stream = PgEventstore::Stream.new(context: 'MYAwesomeContext', stream_name: 'User', stream_id: 'f37b82f2-4152-424d-ab6b-0cc6f0a53aae')
PgEventstore.client.read(stream, options: { filter: { event_types: %w[Foo Bar] } })
Warning There is a restriction on a set of stream attributes that can be used when filtering an "all" stream result. Available combinations:
:context
:context
and:stream_name
:context
,:stream_name
and:stream_id
All other combinations, like providing only :stream_name
or providing :context
with :stream_id
will be ignored.
Filtering events by type:
PgEventstore.client.read(PgEventstore::Stream.all_stream, options: { filter: { event_types: %w[Foo Bar] } })
Filtering events by context:
PgEventstore.client.read(PgEventstore::Stream.all_stream, options: { filter: { streams: [{ context: 'MyAwesomeContext' }] } })
Filtering events by context and name:
PgEventstore.client.read(PgEventstore::Stream.all_stream, options: { filter: { streams: [{ context: 'MyAwesomeContext', stream_name: 'User' }] } })
Filtering events by stream context, stream name and stream id:
PgEventstore.client.read(PgEventstore::Stream.all_stream, options: { filter: { streams: [{ context: 'MyAwesomeContext', stream_name: 'User', stream_id: 'f37b82f2-4152-424d-ab6b-0cc6f0a53aae' }] } })
You can provide several sets of stream's attributes. The result will be a union of events that match those criteria. For example, next query will return all events that belong to streams with AnotherContext
context and all events that belong to streams with MyAwesomeContext
context and User
stream name:
PgEventstore.client.read(PgEventstore::Stream.all_stream, options: { filter: { streams: [{ context: 'AnotherContext' }, { context: 'MyAwesomeContext', stream_name: 'User' }] } })
You can also mix filtering by stream's attributes and event types. The result will be intersection of events matching stream's attributes and event's types. For example, next query will return events which type is either Foo
or Bar
and which belong to a stream with MyAwesomeContext
context:
PgEventstore.client.read(PgEventstore::Stream.all_stream, options: { filter: { streams: [{ context: 'MyAwesomeContext' }], event_types: %w[Foo Bar] } })
When reading from "$streams"
same rules apply as when reading from "all" stream. For example, read all streams which have context == "MyAwesomeContext"
and start from events with event type either "Foo"
or "Bar"
:
PgEventstore.client.read(PgEventstore::Stream.system_stream("$streams"), options: { filter: { streams: [{ context: 'MyAwesomeContext' }], event_types: %w[Foo Bar] } })
You can use #read_paginated
to iterate over all (filtered) events. It yields each batch of records that was found according to the filter options:
# Read from the specific stream
stream = PgEventstore::Stream.new(context: 'MyAwesomeContext', stream_name: 'User', stream_id: 'f37b82f2-4152-424d-ab6b-0cc6f0a53aae')
PgEventstore.client.read_paginated(stream).each do |events|
events.each do |event|
# iterate through events
end
end
# Read from "all" stream
PgEventstore.client.read_paginated(PgEventstore::Stream.all_stream).each do |events|
events.each do |event|
# iterate through events
end
end
Options are the same as for #read
method. Several examples:
# Read "Foo" events only from the specific stream
stream = PgEventstore::Stream.new(context: 'MyAwesomeContext', stream_name: 'User', stream_id: 'f37b82f2-4152-424d-ab6b-0cc6f0a53aae')
PgEventstore.client.read_paginated(stream, options: { filter: { event_types: ['Foo'] } }).each do |events|
events.each do |event|
# iterate through events
end
end
# Backwards read from "all" stream
PgEventstore.client.read_paginated(PgEventstore::Stream.all_stream, options: { direction: 'Backwards' }).each do |events|
events.each do |event|
# iterate through events
end
end
# Set batch size to 100
PgEventstore.client.read_paginated(PgEventstore::Stream.all_stream, options: { max_count: 100 }).each do |events|
events.each do |event|
# iterate through events
end
end
# Reading from projection stream
projection_stream = PgEventstore::Stream.new(context: 'MyAwesomeContext', stream_name: 'MyAwesomeProjection', stream_id: 'f37b82f2-4152-424d-ab6b-0cc6f0a53aae')
PgEventstore.client.read_paginated(projection_stream, options: { resolve_link_tos: true }).each do |events|
events.each do |event|
# iterate through events
end
end