diff --git a/README.md b/README.md
index f7eb1756..efcb65db 100644
--- a/README.md
+++ b/README.md
@@ -38,6 +38,7 @@ MIT License
   - [Reading from all streams](guides/Usage.md#reading-from-all-streams)
   - [Stream from all streams](guides/Usage.md#stream-from-all-streams)
   - [Linking events between streams](guides/Usage.md#linking-events-between-streams)
+  - [Trimming events from streams](guides/Usage.md#trimming-events-from-streams)
 - [Subscriptions](guides/Subscriptions.md)
   - [Transient subscriptions](guides/Subscriptions.md#transient-subscriptions)
   - [Persistent subscriptions](guides/Subscriptions.md#persistent-subscriptions)
diff --git a/guides/Usage.md b/guides/Usage.md
index 6f718f01..cf41110c 100644
--- a/guides/Usage.md
+++ b/guides/Usage.md
@@ -156,3 +156,34 @@ alias MyApp.EventStore
 ```
 
 You can also pass a list of `event_ids` instead of recorded event structs to link events.
+
+## Trimming events from streams
+
+Stream trimming permanently deletes all events in a stream before a given version. This supports patterns such as 'tombstoning' or 'closing the books', where a single event captures a rollup of the stream's state so far.
+
+Trim an existing stream up to its 50th event, deleting events 1 to 49 and keeping event 50:
+
+```elixir
+alias MyApp.EventStore
+
+:ok = EventStore.trim_stream(stream_uuid, 50)
+```
+
+Given a stream containing 50 events, append 3 new events and trim the stream up to the last one (version 53) in a single call:
+
+```elixir
+alias MyApp.EventStore
+
+events = [withdrawn, deposited, closed_for_the_day]
+expected_version = 50
+:ok = EventStore.append_to_stream(stream_uuid, expected_version, events, trim_stream_to_version: 53)
+```
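+
+Trimming is a hard delete: trimmed events are permanently removed from the stream, from any stream they are linked to, and from the `$all` stream.
+
+Trimming requires hard deletes to be enabled for the event store, and a trim requested as part of an append (via `trim_stream_to_version`) must use an explicit expected version rather than `:any_version`. For example, hard deletes can be enabled in config (adjust to match how your event store is configured, or pass the option when starting the event store):
+
+```elixir
+# config/config.exs
+config :my_app, MyApp.EventStore, enable_hard_deletes: true
+```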
diff --git a/lib/event_store.ex b/lib/event_store.ex
index a02560a8..74599c0b 100644
--- a/lib/event_store.ex
+++ b/lib/event_store.ex
@@ -294,7 +294,7 @@ defmodule EventStore do
     Supervisor.stop(supervisor, :normal, timeout)
   end
 
-  @accepted_overrides_append_to_stream [:created_at_override]
+  @append_to_stream_overrides [:created_at_override, :trim_stream_to_version]
 
   def append_to_stream(stream_uuid, expected_version, events, opts \\ [])
 
@@ -302,7 +302,7 @@
     do: {:error, :cannot_append_to_all_stream}
 
   def append_to_stream(stream_uuid, expected_version, events, opts) do
-    overrides = Keyword.take(opts, @accepted_overrides_append_to_stream)
+    overrides = Keyword.take(opts, @append_to_stream_overrides)
     {conn, opts} = parse_opts(opts)
 
     opts = Keyword.merge(opts, overrides)
@@ -409,6 +409,12 @@
   def stream_all_backward(start_version, opts),
     do: stream_backward(@all_stream, start_version, opts)
 
+  def trim_stream(stream_uuid, cutoff_version, expected_version \\ :any_version, opts \\ []) do
+    {conn, opts} = parse_opts(opts)
+
+    Stream.trim_stream(conn, stream_uuid, cutoff_version, expected_version, opts)
+  end
+
   def delete_stream(stream_uuid, expected_version, type \\ :soft, opts \\ [])
 
   def delete_stream(@all_stream, _expected_version, _type, _opts),
diff --git a/lib/event_store/sql/statements.ex b/lib/event_store/sql/statements.ex
index c9801731..e586a43c 100644
--- a/lib/event_store/sql/statements.ex
+++ b/lib/event_store/sql/statements.ex
@@ -23,6 +23,7 @@ defmodule EventStore.Sql.Statements do
     {:subscription_ack, [:schema]},
     {:insert_snapshot, [:schema]},
     {:delete_snapshot, [:schema]},
+    {:trim_stream, [:schema, :stream_id]},
     {:query_all_subscriptions, [:schema]},
     {:query_snapshot, [:schema]},
     {:query_stream_info, [:schema]},
diff --git a/lib/event_store/sql/statements/insert_events.sql.eex b/lib/event_store/sql/statements/insert_events.sql.eex
index 27141d76..ec198542 100644
--- a/lib/event_store/sql/statements/insert_events.sql.eex
+++ b/lib/event_store/sql/statements/insert_events.sql.eex
@@ -1,4 +1,4 @@
-<%#
+<% #
 # Elixir template variables:
 # schema - string
 # stream_id - integer
@@ -21,7 +21,7 @@
 %>
 WITH
-  <%#
+  <% #
   # create a table variable with:
   # event_id - uuid - the id for the new event
   # index - integer - the increase in the stream version for any stream it is linked to
 
diff --git a/lib/event_store/sql/statements/trim_stream.sql.eex b/lib/event_store/sql/statements/trim_stream.sql.eex
new file mode 100644
index 00000000..b334bcb9
--- /dev/null
+++ b/lib/event_store/sql/statements/trim_stream.sql.eex
@@ -0,0 +1,31 @@
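+<% #
+# Elixir template variables:
+# schema - string
+# stream_id - integer or nil - when nil, $1 is the stream uuid rather than the stream id
+%>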
+WITH stream_info AS (
+  <%= if stream_id do %>
+    SELECT $1::bigint AS stream_id
+  <% else %>
+    SELECT stream_id
+    FROM "<%= schema %>".streams
+    WHERE stream_uuid = $1
+  <% end %>
+),
+deleted_stream_events AS (
+  DELETE FROM "<%= schema %>".stream_events AS stream_events
+  USING stream_info AS s
+  WHERE stream_events.stream_id = s.stream_id
+    AND stream_version < $2
+  RETURNING event_id
+),
+linked_events AS (
+  DELETE FROM "<%= schema %>".stream_events
+  WHERE event_id IN (SELECT event_id FROM deleted_stream_events)
+),
+events AS (
+  DELETE FROM "<%= schema %>".events
+  WHERE event_id IN (SELECT event_id FROM deleted_stream_events)
+)
+
+SELECT count(*) FROM deleted_stream_events;
diff --git a/lib/event_store/storage.ex b/lib/event_store/storage.ex
index 0f2af5ac..a2055b17 100644
--- a/lib/event_store/storage.ex
+++ b/lib/event_store/storage.ex
@@ -8,7 +8,8 @@ defmodule EventStore.Storage do
     Reader,
     Snapshot,
     Stream,
-    Subscription
+    Subscription,
+    TrimStream
   }
 
   @doc """
@@ -101,4 +102,11 @@
   Delete an existing snapshot for a given source.
   """
   defdelegate delete_snapshot(conn, source_uuid, opts), to: Snapshot
+
+  @doc """
+  Trim an existing stream, permanently deleting all events before the given cutoff version.
+  """
+  defdelegate trim_stream(conn, stream_id, stream_uuid, cutoff_version, opts),
+    to: TrimStream,
+    as: :trim
 end
diff --git a/lib/event_store/storage/trim_stream.ex b/lib/event_store/storage/trim_stream.ex
new file mode 100644
index 00000000..be41ab7a
--- /dev/null
+++ b/lib/event_store/storage/trim_stream.ex
@@ -0,0 +1,31 @@
+defmodule EventStore.Storage.TrimStream do
+  @moduledoc false
+
+  require Logger
+
+  alias EventStore.Sql.Statements
+
+  def trim(conn, stream_id, stream_uuid, cutoff_version, opts) do
+    {schema, opts} = Keyword.pop(opts, :schema)
+
+    query = Statements.trim_stream(schema, stream_id)
+
+    stream_id_or_uuid = stream_id || stream_uuid
+
+    case Postgrex.query(conn, query, [stream_id_or_uuid, cutoff_version], opts) do
+      {:ok, %Postgrex.Result{num_rows: 1, rows: [[num_events]]}} ->
+        Logger.debug("Trimmed #{num_events} events from stream #{inspect(stream_id)}")
+        :ok
+
+      {:ok, %Postgrex.Result{num_rows: 0}} ->
+        Logger.warning("Failed to trim stream #{inspect(stream_id)} due to: stream not found")
+
+        {:error, :stream_not_found}
+
+      {:error, error} = reply ->
+        Logger.warning("Failed to trim stream #{inspect(stream_id)} due to: " <> inspect(error))
+
+        reply
+    end
+  end
+end
diff --git a/lib/event_store/streams/stream.ex b/lib/event_store/streams/stream.ex
index 28cb554d..65310a4a 100644
--- a/lib/event_store/streams/stream.ex
+++ b/lib/event_store/streams/stream.ex
@@ -4,41 +4,53 @@ defmodule EventStore.Streams.Stream do
   alias EventStore.{EventData, RecordedEvent, Storage, UUID}
   alias EventStore.Streams.StreamInfo
 
-  def append_to_stream(conn, stream_uuid, expected_version, events, opts)
-      when length(events) < 1000 do
-    {serializer, new_opts} = Keyword.pop(opts, :serializer)
-
-    with {:ok, stream} <- stream_info(conn, stream_uuid, expected_version, new_opts),
-         :ok <- do_append_to_storage(conn, stream, events, expected_version, serializer, new_opts) do
-      :ok
+  def trim_stream(conn, stream_uuid, cutoff_version, expected_version, opts) do
+    if Keyword.fetch!(opts, :enable_hard_deletes) do
+      transaction(
+        conn,
+        fn transaction ->
+          with {:ok, %StreamInfo{} = stream} <-
+                 stream_info(transaction, stream_uuid, expected_version, opts),
+               :ok <- do_trim_stream(transaction, stream, cutoff_version, opts) do
+            :ok
+          else
+            {:error, error} -> Postgrex.rollback(transaction, error)
+          end
+        end,
+        opts
+      )
+    else
+      {:error, :not_supported}
     end
-    |> maybe_retry_once(conn, stream_uuid, expected_version, events, opts)
   end
 
   def append_to_stream(conn, stream_uuid, expected_version, events, opts) do
-    {serializer, new_opts} = Keyword.pop(opts, :serializer)
+    with :ok <- validate_append_opts(opts, expected_version) do
+      {serializer, new_opts} = Keyword.pop(opts, :serializer)
 
-    transaction(
-      conn,
-      fn transaction ->
-        with {:ok, stream} <- stream_info(transaction, stream_uuid, expected_version, new_opts),
-             :ok <-
-               do_append_to_storage(
-                 transaction,
-                 stream,
-                 events,
-                 expected_version,
-                 serializer,
-                 new_opts
-               ) do
-          :ok
-        else
-          {:error, error} -> Postgrex.rollback(transaction, error)
-        end
-      end,
-      new_opts
-    )
-    |> maybe_retry_once(conn, stream_uuid, expected_version, events, opts)
+      transaction(
+        conn,
+        fn transaction ->
+          with {:ok, stream} <- stream_info(transaction, stream_uuid, expected_version, new_opts),
+               :ok <-
+                 do_append_to_storage(
+                   transaction,
+                   stream,
+                   events,
+                   expected_version,
+                   serializer,
+                   new_opts
+                 ),
+               :ok <- maybe_trim_stream(transaction, stream, new_opts) do
+            :ok
+          else
+            {:error, error} -> Postgrex.rollback(transaction, error)
+          end
+        end,
+        new_opts
+      )
+      |> maybe_retry_once(conn, stream_uuid, expected_version, events, opts)
+    end
   end
 
   def link_to_stream(conn, stream_uuid, expected_version, events_or_event_ids, opts) do
@@ -323,6 +335,36 @@ defmodule EventStore.Streams.Stream do
     end
   end
 
+  defp validate_append_opts(opts, expected_version) do
+    trim_version = Keyword.get(opts, :trim_stream_to_version, :no_trim)
+    hard_deletes_allowed? = Keyword.get(opts, :enable_hard_deletes, false)
+
+    case {trim_version, expected_version, hard_deletes_allowed?} do
+      {:no_trim, _, _} -> :ok
+      {_, :any_version, _} -> {:error, :cannot_trim_stream_with_any_version}
+      {_, _version, false} -> {:error, :cannot_trim_when_hard_deletes_not_enabled}
+      {_, _, _} -> :ok
+    end
+  end
+
+  defp maybe_trim_stream(transaction, %StreamInfo{} = stream, opts) do
+    case Keyword.get(opts, :trim_stream_to_version) do
+      nil ->
+        :ok
+
+      cutoff_version ->
+        do_trim_stream(transaction, stream, cutoff_version, opts)
+    end
+  end
+
+  defp do_trim_stream(transaction, %StreamInfo{} = stream, cutoff_version, opts) do
+    opts = query_opts(opts)
+
+    with :ok <- set_enable_hard_deletes(transaction) do
+      Storage.trim_stream(transaction, stream.stream_id, stream.stream_uuid, cutoff_version, opts)
+    end
+  end
+
   defp set_enable_hard_deletes(conn) do
     query = "SET SESSION eventstore.enable_hard_deletes TO 'on';"
 
@@ -348,7 +390,8 @@
     end
   end
 
-  defp maybe_retry_once(error, _conn, _stream_uuid, _expected_version, _events, _opts), do: error
+  defp maybe_retry_once(ok_or_error, _conn, _stream_uuid, _expected_version, _events, _opts),
+    do: ok_or_error
 
   defp transaction(conn, transaction_fun, opts) do
     case Postgrex.transaction(conn, transaction_fun, opts) do
diff --git a/test/event_store_test.exs b/test/event_store_test.exs
index 630c0775..553ffeed 100644
--- a/test/event_store_test.exs
+++ b/test/event_store_test.exs
@@ -65,6 +65,87 @@ defmodule EventStore.EventStoreTest do
     end
   end
 
+  describe "trimming the event stream" do
+    setup(tags) do
+      hard_deletes? = Map.get(tags, :enable_hard_deletes, true)
+      stop_supervised!(TestEventStore)
+      start_supervised!({TestEventStore, enable_hard_deletes: hard_deletes?})
+      :ok
+    end
+
+    test "should not allow trimming with :any_version" do
+      stream_uuid = UUID.uuid4()
+      events = EventFactory.create_events(2)
+
+      assert {:error, :cannot_trim_stream_with_any_version} =
+               EventStore.append_to_stream(stream_uuid, :any_version, events,
+                 trim_stream_to_version: 2
+               )
+    end
+
+    @tag enable_hard_deletes: false
+    test "should not allow trimming when hard_deletes are disabled" do
+      stream_uuid = UUID.uuid4()
+      events = EventFactory.create_events(2)
+
+      assert {:error, :cannot_trim_when_hard_deletes_not_enabled} =
+               EventStore.append_to_stream(stream_uuid, 0, events, trim_stream_to_version: 2)
+    end
+
+    test "should trim up to the given version" do
+      # Given a stream exists with 2 events
+      stream_uuid = UUID.uuid4()
+      events = EventFactory.create_events(2)
+      assert :ok = EventStore.append_to_stream(stream_uuid, 0, events)
+
+      # When we trim the stream to the 2nd event
+      assert :ok = EventStore.trim_stream(stream_uuid, 2)
+
+      # Then the stream has a single event in it, at version 2
+      assert {:ok, [event]} = EventStore.read_stream_forward(stream_uuid)
+      assert event.stream_version == 2
+
+      # And so does the $all stream
+      assert {:ok, [event]} = EventStore.read_stream_forward("$all")
+      assert event.stream_version == 2
+    end
+
+    test "should trim up to the given event when the stream exists" do
+      # Given an existing stream with an event
+      stream_uuid = UUID.uuid4()
+      events = EventFactory.create_events(1)
+      assert :ok = EventStore.append_to_stream(stream_uuid, 0, events)
+
+      # When we append 2 events and ask the stream to be trimmed up to the 3rd event
+      events = EventFactory.create_events(2)
+      assert :ok = EventStore.append_to_stream(stream_uuid, 1, events, trim_stream_to_version: 3)
+
+      # Then the stream has a single event in it, at version 3
+      assert {:ok, [event]} = EventStore.read_stream_forward(stream_uuid)
+      assert event.stream_version == 3
+
+      # And so does the $all stream
+      assert {:ok, [event]} = EventStore.read_stream_forward("$all")
+      assert event.stream_version == 3
+    end
+
+    test "should trim up to the given event even when the stream doesn't exist" do
+      # When we append 2 events and ask the stream to be trimmed up to the 2nd event
+      stream_uuid = UUID.uuid4()
+      events = EventFactory.create_events(2)
+
+      assert :ok = EventStore.append_to_stream(stream_uuid, 0, events, trim_stream_to_version: 2)
+
+      # Then the stream has a single event in it, at version 2
+      assert {:ok, [event]} = EventStore.read_stream_forward(stream_uuid)
+      assert event.stream_version == 2
+
+      # And so does the $all stream
+      assert {:ok, [event]} = EventStore.read_stream_forward("$all")
+      assert event.stream_version == 2
+    end
+  end
+
   describe "link to event store" do
     setup do
       source_stream_uuid = UUID.uuid4()