Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trim stream to version #294

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ MIT License
- [Reading from all streams](guides/Usage.md#reading-from-all-streams)
- [Stream from all streams](guides/Usage.md#stream-from-all-streams)
- [Linking events between streams](guides/Usage.md#linking-events-between-streams)
- [Trimming events from streams](guides/Usage.md#trimming-events-from-streams)
- [Subscriptions](guides/Subscriptions.md)
- [Transient subscriptions](guides/Subscriptions.md#transient-subscriptions)
- [Persistent subscriptions](guides/Subscriptions.md#persistent-subscriptions)
Expand Down
22 changes: 22 additions & 0 deletions guides/Usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,3 +156,25 @@ alias MyApp.EventStore
```

You can also pass a list of `event_ids` instead of recorded event structs to link events.

## Trimming events from streams

Stream trimming allows you to permanently delete events from a stream up to a given version. This allows one form of 'Tombstoning' or 'Closing the books' where an event can be a rollup of the state so far.

Trim an existing stream up to the 50th event:

```elixir
alias MyApp.EventStore

:ok = EventStore.trim_stream(stream_uuid, 50)
```

Given a stream with 50 events, append 3 new events, and trim up to the last one:

```elixir
alias MyApp.EventStore

events = [withdrawn, deposited, closed_for_the_day]
expected_version = 50
:ok = EventStore.append_to_stream(stream_uuid, expected_version, events, trim_stream_to_version: 53)
```
10 changes: 8 additions & 2 deletions lib/event_store.ex
Original file line number Diff line number Diff line change
Expand Up @@ -294,15 +294,15 @@ defmodule EventStore do
Supervisor.stop(supervisor, :normal, timeout)
end

@accepted_overrides_append_to_stream [:created_at_override]
@append_to_stream_overrides [:created_at_override, :trim_stream_to_version]

def append_to_stream(stream_uuid, expected_version, events, opts \\ [])

def append_to_stream(@all_stream, _expected_version, _events, _opts),
do: {:error, :cannot_append_to_all_stream}

def append_to_stream(stream_uuid, expected_version, events, opts) do
overrides = Keyword.take(opts, @accepted_overrides_append_to_stream)
overrides = Keyword.take(opts, @append_to_stream_overrides)
{conn, opts} = parse_opts(opts)
opts = Keyword.merge(opts, overrides)

Expand Down Expand Up @@ -409,6 +409,12 @@ defmodule EventStore do
def stream_all_backward(start_version, opts),
do: stream_backward(@all_stream, start_version, opts)

def trim_stream(stream_uuid, cutoff_version, expected_version \\ :any_version, opts \\ []) do
{conn, opts} = parse_opts(opts)

Stream.trim_stream(conn, stream_uuid, cutoff_version, expected_version, opts)
end

def delete_stream(stream_uuid, expected_version, type \\ :soft, opts \\ [])

def delete_stream(@all_stream, _expected_version, _type, _opts),
Expand Down
1 change: 1 addition & 0 deletions lib/event_store/sql/statements.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ defmodule EventStore.Sql.Statements do
{:subscription_ack, [:schema]},
{:insert_snapshot, [:schema]},
{:delete_snapshot, [:schema]},
{:trim_stream, [:schema, :stream_id]},
{:query_all_subscriptions, [:schema]},
{:query_snapshot, [:schema]},
{:query_stream_info, [:schema]},
Expand Down
4 changes: 2 additions & 2 deletions lib/event_store/sql/statements/insert_events.sql.eex
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<%#
<% #
# Elixir template variables:
# schema - string
# stream_id - integer
Expand All @@ -21,7 +21,7 @@
%>

WITH
<%#
<% #
# create a table variable with:
# event_id - uuid - the id for the new event
# index - integer - the increase in the stream version for any stream it is linked to
Expand Down
26 changes: 26 additions & 0 deletions lib/event_store/sql/statements/trim_stream.sql.eex
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
WITH stream_info AS (
<%= if stream_id do %>
SELECT $1::bigint AS stream_id
<% else %>
SELECT stream_id
FROM "<%= schema %>".streams
WHERE stream_uuid = $1
<% end %>
),
deleted_stream_events AS (
DELETE FROM "<%= schema %>".stream_events AS stream_events
USING stream_info as s
WHERE stream_events.stream_id = s.stream_id
AND stream_version < $2
RETURNING event_id
),
linked_events AS (
DELETE FROM "<%= schema %>".stream_events
WHERE event_id IN (SELECT event_id FROM deleted_stream_events)
),
events AS (
DELETE FROM "<%= schema %>".events
WHERE event_id IN (SELECT event_id FROM deleted_stream_events)
)

select count(*) from deleted_stream_events;
10 changes: 9 additions & 1 deletion lib/event_store/storage.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ defmodule EventStore.Storage do
Reader,
Snapshot,
Stream,
Subscription
Subscription,
TrimStream
}

@doc """
Expand Down Expand Up @@ -101,4 +102,11 @@ defmodule EventStore.Storage do
Delete an existing snapshot for a given source.
"""
defdelegate delete_snapshot(conn, source_uuid, opts), to: Snapshot

@doc """
Trim an existing stream up to the cutoff event
"""
defdelegate trim_stream(conn, stream_id, stream_uuid, cutoff_version, otps),
to: TrimStream,
as: :trim
end
31 changes: 31 additions & 0 deletions lib/event_store/storage/trim_stream.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
defmodule EventStore.Storage.TrimStream do
@moduledoc false

require Logger

alias EventStore.Sql.Statements

def trim(conn, stream_id, stream_uuid, cutoff_version, opts) do
{schema, opts} = Keyword.pop(opts, :schema)

query = Statements.trim_stream(schema, stream_id)

stream_id_or_uuid = stream_id || stream_uuid

case Postgrex.query(conn, query, [stream_id_or_uuid, cutoff_version], opts) do
{:ok, %Postgrex.Result{num_rows: 1, rows: [[num_events]]}} ->
Logger.debug("Trimmed #{num_events} events from stream #{inspect(stream_id)}")
:ok

{:ok, %Postgrex.Result{num_rows: 0}} ->
Logger.warning("Failed to trim stream #{inspect(stream_id)} due to: stream not found")

{:error, :stream_not_found}

{:error, error} = reply ->
Logger.warning("Failed to trim stream #{inspect(stream_id)} due to: " <> inspect(error))

reply
end
end
end
105 changes: 74 additions & 31 deletions lib/event_store/streams/stream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,41 +4,53 @@ defmodule EventStore.Streams.Stream do
alias EventStore.{EventData, RecordedEvent, Storage, UUID}
alias EventStore.Streams.StreamInfo

def append_to_stream(conn, stream_uuid, expected_version, events, opts)
when length(events) < 1000 do
{serializer, new_opts} = Keyword.pop(opts, :serializer)

with {:ok, stream} <- stream_info(conn, stream_uuid, expected_version, new_opts),
:ok <- do_append_to_storage(conn, stream, events, expected_version, serializer, new_opts) do
:ok
def trim_stream(conn, stream_uuid, cutoff_version, expected_version, opts) do
if Keyword.fetch!(opts, :enable_hard_deletes) do
transaction(
conn,
fn transaction ->
with {:ok, %StreamInfo{} = stream} <-
stream_info(transaction, stream_uuid, expected_version, opts),
:ok <- do_trim_stream(transaction, stream, cutoff_version, opts) do
:ok
else
{:error, error} -> Postgrex.rollback(transaction, error)
end
end,
opts
)
else
{:error, :not_supported}
end
|> maybe_retry_once(conn, stream_uuid, expected_version, events, opts)
end

def append_to_stream(conn, stream_uuid, expected_version, events, opts) do
{serializer, new_opts} = Keyword.pop(opts, :serializer)
with :ok <- validate_append_opts(opts, expected_version) do
{serializer, new_opts} = Keyword.pop(opts, :serializer)

transaction(
conn,
fn transaction ->
with {:ok, stream} <- stream_info(transaction, stream_uuid, expected_version, new_opts),
:ok <-
do_append_to_storage(
transaction,
stream,
events,
expected_version,
serializer,
new_opts
) do
:ok
else
{:error, error} -> Postgrex.rollback(transaction, error)
end
end,
new_opts
)
|> maybe_retry_once(conn, stream_uuid, expected_version, events, opts)
transaction(
conn,
fn transaction ->
with {:ok, stream} <- stream_info(transaction, stream_uuid, expected_version, new_opts),
:ok <-
do_append_to_storage(
transaction,
stream,
events,
expected_version,
serializer,
new_opts
),
:ok <- maybe_trim_stream(transaction, stream, new_opts) do
:ok
else
{:error, error} -> Postgrex.rollback(transaction, error)
end
end,
new_opts
)
|> maybe_retry_once(conn, stream_uuid, expected_version, events, opts)
end
end

def link_to_stream(conn, stream_uuid, expected_version, events_or_event_ids, opts) do
Expand Down Expand Up @@ -323,6 +335,36 @@ defmodule EventStore.Streams.Stream do
end
end

defp validate_append_opts(opts, expected_version) do
trim_version = Keyword.get(opts, :trim_stream_to_version, :no_trim)
hard_deletes_allowed? = Keyword.get(opts, :enable_hard_deletes, false)

case {trim_version, expected_version, hard_deletes_allowed?} do
{:no_trim, _, _} -> :ok
{_, :any_version, _} -> {:error, :cannot_trim_stream_with_any_version}
{_, _version, false} -> {:error, :cannot_trim_when_hard_deletes_not_enabled}
{_, _, _} -> :ok
end
end

defp maybe_trim_stream(transaction, %StreamInfo{} = stream, opts) do
case Keyword.get(opts, :trim_stream_to_version) do
nil ->
:ok

cutoff_version ->
do_trim_stream(transaction, stream, cutoff_version, opts)
end
end

defp do_trim_stream(transaction, %StreamInfo{} = stream, cutoff_version, opts) do
opts = query_opts(opts)

with :ok <- set_enable_hard_deletes(transaction) do
Storage.trim_stream(transaction, stream.stream_id, stream.stream_uuid, cutoff_version, opts)
end
end

defp set_enable_hard_deletes(conn) do
query = "SET SESSION eventstore.enable_hard_deletes TO 'on';"

Expand All @@ -348,7 +390,8 @@ defmodule EventStore.Streams.Stream do
end
end

defp maybe_retry_once(error, _conn, _stream_uuid, _expected_version, _events, _opts), do: error
defp maybe_retry_once(ok_or_error, _conn, _stream_uuid, _expected_version, _events, _opts),
do: ok_or_error

defp transaction(conn, transaction_fun, opts) do
case Postgrex.transaction(conn, transaction_fun, opts) do
Expand Down
81 changes: 81 additions & 0 deletions test/event_store_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,87 @@
end
end

describe "trimming the event stream" do
setup(tags) do
hard_deletes? = Map.get(tags, :enable_hard_deletes, true)
stop_supervised!(TestEventStore)
start_supervised!({TestEventStore, enable_hard_deletes: hard_deletes?})
:ok
end

test "should not allow trimming with :any_version" do
stream_uuid = UUID.uuid4()
events = EventFactory.create_events(2)

assert {:error, :cannot_trim_stream_with_any_version} =
EventStore.append_to_stream(stream_uuid, :any_version, events,
trim_stream_to_version: 2
)
end

@tag enable_hard_deletes: false
test "should not allow trimming when hard_deletes are disabled" do
stream_uuid = UUID.uuid4()
events = EventFactory.create_events(2)

assert {:error, :cannot_trim_when_hard_deletes_not_enabled} =
EventStore.append_to_stream(stream_uuid, 0, events, trim_stream_to_version: 2)
end

test "should trim up to the given version" do
# When a stream exists with 2 events
stream_uuid = UUID.uuid4()
events = EventFactory.create_events(2)
assert :ok = EventStore.append_to_stream(stream_uuid, 0, events)

# When we trim to stream to the 2nd event
assert :ok = EventStore.trim_stream(stream_uuid, 2)

# Then the stream has a single event in it, at version 2
assert {:ok, [event]} = EventStore.read_stream_forward(stream_uuid)
assert event.stream_version == 2

# And so does the $all stream
assert {:ok, [event]} = EventStore.read_stream_forward("$all")
assert event.stream_version == 2
end

test "should trim up to the event given when the stream exists" do
# Given an existing stream with an event
stream_uuid = UUID.uuid4()
events = EventFactory.create_events(1)
assert :ok = EventStore.append_to_stream(stream_uuid, 0, events)

# When we append 2 events and ask the stream to be trimmed up to the 3rd event
events = EventFactory.create_events(2)
assert :ok = EventStore.append_to_stream(stream_uuid, 1, events, trim_stream_to_version: 3)

# Then the stream has a single event in it, at version 3
assert {:ok, [event]} = EventStore.read_stream_forward(stream_uuid)
assert event.stream_version == 3

# And so does the $all stream
assert {:ok, [event]} = EventStore.read_stream_forward("$all")
assert event.stream_version == 3
end

test "should trim up to the event given even when the stream doesn't exist" do
# When we append 2 events and ask the stream to be trimmed up to the 2nd event
stream_uuid = UUID.uuid4()
events = EventFactory.create_events(2)

assert :ok = EventStore.append_to_stream(stream_uuid, 0, events, trim_stream_to_version: 2)

# Then the stream has a single event in it, at version 2
assert {:ok, [event]} = EventStore.read_stream_forward(stream_uuid)
assert event.stream_version == 2

# And so does the $all stream
assert {:ok, [event]} = EventStore.read_stream_forward("$all")
assert event.stream_version == 2
end
end

describe "link to event store" do
setup do
source_stream_uuid = UUID.uuid4()
Expand Down Expand Up @@ -167,7 +248,7 @@

{:ok, recorded_events} = EventStore.read_stream_backward(stream_uuid)

assert_recorded_events(stream_uuid, 10..1, Enum.reverse(events), recorded_events)

Check warning on line 251 in test/event_store_test.exs

View workflow job for this annotation

GitHub Actions / Build and test (1.17.x, 27)

10..1 has a default step of -1, please write 10..1//-1 instead

Check warning on line 251 in test/event_store_test.exs

View workflow job for this annotation

GitHub Actions / Build and test (1.17.x, 27)

10..1 has a default step of -1, please write 10..1//-1 instead

Check warning on line 251 in test/event_store_test.exs

View workflow job for this annotation

GitHub Actions / Build and test (1.17.x, 27)

10..1 has a default step of -1, please write 10..1//-1 instead
end

test "stream backward", %{stream_uuid: stream_uuid, events: events} do
Expand All @@ -176,7 +257,7 @@
recorded_events =
EventStore.stream_backward(stream_uuid, -1, batch_size: 5) |> Enum.to_list()

assert_recorded_events(stream_uuid, 10..1, Enum.reverse(events), recorded_events)

Check warning on line 260 in test/event_store_test.exs

View workflow job for this annotation

GitHub Actions / Build and test (1.17.x, 27)

10..1 has a default step of -1, please write 10..1//-1 instead

Check warning on line 260 in test/event_store_test.exs

View workflow job for this annotation

GitHub Actions / Build and test (1.17.x, 27)

10..1 has a default step of -1, please write 10..1//-1 instead

Check warning on line 260 in test/event_store_test.exs

View workflow job for this annotation

GitHub Actions / Build and test (1.17.x, 27)

10..1 has a default step of -1, please write 10..1//-1 instead
end

test "stream all backward", %{stream_uuid: stream_uuid, events: events} do
Expand All @@ -184,7 +265,7 @@

recorded_events = EventStore.stream_all_backward(-1, batch_size: 5) |> Enum.to_list()

assert_recorded_events(stream_uuid, 10..1, Enum.reverse(events), recorded_events)

Check warning on line 268 in test/event_store_test.exs

View workflow job for this annotation

GitHub Actions / Build and test (1.17.x, 27)

10..1 has a default step of -1, please write 10..1//-1 instead

Check warning on line 268 in test/event_store_test.exs

View workflow job for this annotation

GitHub Actions / Build and test (1.17.x, 27)

10..1 has a default step of -1, please write 10..1//-1 instead

Check warning on line 268 in test/event_store_test.exs

View workflow job for this annotation

GitHub Actions / Build and test (1.17.x, 27)

10..1 has a default step of -1, please write 10..1//-1 instead
end
end

Expand Down
Loading