From f6005708e95f70d2221a78e9d031c2f16fc7b2e7 Mon Sep 17 00:00:00 2001 From: Yordis Prieto Date: Thu, 23 Jan 2025 19:11:13 -0500 Subject: [PATCH] feat: add annotations to subscriptions table --- .../statements/insert_subscription.sql.eex | 7 ++- lib/event_store/storage/subscription.ex | 56 ++++++++++++++++--- priv/event_store/migrations/v1.4.0.sql | 7 +++ .../storage/subscription_persistence_test.exs | 45 ++++++++++++++- 4 files changed, 103 insertions(+), 12 deletions(-) create mode 100644 priv/event_store/migrations/v1.4.0.sql diff --git a/lib/event_store/sql/statements/insert_subscription.sql.eex b/lib/event_store/sql/statements/insert_subscription.sql.eex index 41821ff1..e0b630e0 100644 --- a/lib/event_store/sql/statements/insert_subscription.sql.eex +++ b/lib/event_store/sql/statements/insert_subscription.sql.eex @@ -2,7 +2,8 @@ INSERT INTO "<%= schema %>".subscriptions ( stream_uuid, subscription_name, - last_seen + last_seen, + annotations ) -VALUES ($1, $2, $3) -RETURNING subscription_id, stream_uuid, subscription_name, last_seen, created_at; +VALUES ($1, $2, $3, $4) +RETURNING subscription_id, stream_uuid, subscription_name, last_seen, created_at, annotations; diff --git a/lib/event_store/storage/subscription.ex b/lib/event_store/storage/subscription.ex index 160294d0..69b40d8f 100644 --- a/lib/event_store/storage/subscription.ex +++ b/lib/event_store/storage/subscription.ex @@ -12,15 +12,40 @@ defmodule EventStore.Storage.Subscription do QuerySubscription } + @typedoc """ + A subscription to an event stream. + + * `subscription_id` - Unique identifier for the subscription + * `stream_uuid` - The stream being subscribed to + * `subscription_name` - Name of the subscription + * `last_seen` - Last event seen by this subscription + * `created_at` - When the subscription was created + * `annotations` - Arbitrary non-identifying metadata attached to the + subscription, inspired by Kubernetes annotations. These are key-value pairs + that can be used to store auxiliary information about a subscription that + is not directly part of its core functionality. For example: + * Build/release information (team owner, git sha, etc.) + * Client-specific configuration + * Debugging info + * Tool information + """ @type t :: %EventStore.Storage.Subscription{ subscription_id: non_neg_integer(), stream_uuid: String.t(), subscription_name: String.t(), last_seen: non_neg_integer() | nil, - created_at: DateTime.t() + created_at: DateTime.t(), + annotations: map() } - defstruct [:subscription_id, :stream_uuid, :subscription_name, :last_seen, :created_at] + defstruct [ + :subscription_id, + :stream_uuid, + :subscription_name, + :last_seen, + :created_at, + :annotations + ] defdelegate subscriptions(conn, opts), to: QueryAllSubscriptions, as: :execute @@ -49,7 +74,17 @@ defmodule EventStore.Storage.Subscription do do: Subscription.Delete.execute(conn, stream_uuid, subscription_name, opts) defp create_subscription(conn, stream_uuid, subscription_name, start_from, opts) do - case CreateSubscription.execute(conn, stream_uuid, subscription_name, start_from, opts) do + annotations = Keyword.get(opts, :annotations, %{}) + {schema, opts} = Keyword.pop(opts, :schema) + + case CreateSubscription.execute( + conn, + stream_uuid, + subscription_name, + start_from, + annotations, + opts + ) do {:ok, %Subscription{}} = reply -> reply @@ -96,7 +131,7 @@ defmodule EventStore.Storage.Subscription do defmodule CreateSubscription do @moduledoc false - def execute(conn, stream_uuid, subscription_name, start_from, opts) do + def execute(conn, stream_uuid, subscription_name, start_from, annotations, opts) do Logger.debug( "Attempting to create subscription on stream " <> inspect(stream_uuid) <> @@ -107,7 +142,12 @@ defmodule EventStore.Storage.Subscription do query = Statements.insert_subscription(schema) - case Postgrex.query(conn, query, [stream_uuid, subscription_name, start_from], opts) do + case Postgrex.query( + conn, + query, + [stream_uuid, subscription_name, start_from, annotations], + opts + ) do {:ok, %Postgrex.Result{rows: rows}} -> Logger.debug( "Created subscription on stream \"#{stream_uuid}\" named \"#{subscription_name}\"" @@ -200,7 +240,8 @@ defmodule EventStore.Storage.Subscription do stream_uuid, subscription_name, last_seen, - created_at + created_at, + annotations ] = row %Subscription{ @@ -208,7 +249,8 @@ defmodule EventStore.Storage.Subscription do stream_uuid: stream_uuid, subscription_name: subscription_name, last_seen: last_seen, - created_at: created_at + created_at: created_at, + annotations: annotations || %{} } end end diff --git a/priv/event_store/migrations/v1.4.0.sql b/priv/event_store/migrations/v1.4.0.sql new file mode 100644 index 00000000..394fdccd --- /dev/null +++ b/priv/event_store/migrations/v1.4.0.sql @@ -0,0 +1,7 @@ +-- Add annotations field to subscriptions table + +ALTER TABLE subscriptions +ADD COLUMN annotations jsonb DEFAULT '{}'::jsonb NOT NULL; + +-- Add index on annotations for better query performance +CREATE INDEX subscriptions_annotations_idx ON subscriptions USING gin (annotations); \ No newline at end of file diff --git a/test/storage/subscription_persistence_test.exs b/test/storage/subscription_persistence_test.exs index c438c88d..96388eae 100644 --- a/test/storage/subscription_persistence_test.exs +++ b/test/storage/subscription_persistence_test.exs @@ -69,16 +69,57 @@ defmodule EventStore.Storage.SubscriptionPersistenceTest do verify_subscription(subscription, 1) end + test "create subscription with annotations", context do + annotations = %{"key" => "value", "nested" => %{"data" => 123}} + {:ok, subscription} = subscribe_to_stream(context, annotations: annotations) + + verify_subscription(subscription) + assert subscription.annotations == annotations + end + + test "create subscription with default empty annotations", context do + {:ok, subscription} = subscribe_to_stream(context) + + verify_subscription(subscription) + assert subscription.annotations == %{} + end + + test "create subscription when already exists preserves annotations", context do + # First create with annotations + initial_annotations = %{"key" => "value", "metadata" => %{"version" => 1}} + {:ok, subscription1} = subscribe_to_stream(context, annotations: initial_annotations) + + # Then try different scenarios that should all preserve the original annotations + # No annotations provided + {:ok, subscription2} = subscribe_to_stream(context) + # Explicit nil + {:ok, subscription3} = subscribe_to_stream(context, annotations: nil) + # Empty map + {:ok, subscription4} = subscribe_to_stream(context, annotations: %{}) + + # Verify all subscriptions are the same + assert subscription1.subscription_id == subscription2.subscription_id + assert subscription2.subscription_id == subscription3.subscription_id + assert subscription3.subscription_id == subscription4.subscription_id + + # Verify annotations are preserved in all cases + assert subscription1.annotations == initial_annotations + assert subscription2.annotations == initial_annotations + assert subscription3.annotations == initial_annotations + assert subscription4.annotations == initial_annotations + end + def ack_last_seen_event(context, last_seen) do %{conn: conn, schema: schema} = context Storage.ack_last_seen_event(conn, @all_stream, @subscription_name, last_seen, schema: schema) end - defp subscribe_to_stream(context) do + defp subscribe_to_stream(context, opts \\ []) do %{conn: conn, schema: schema} = context + opts = Keyword.merge([schema: schema], opts) - Storage.subscribe_to_stream(conn, @all_stream, @subscription_name, schema: schema) + Storage.subscribe_to_stream(conn, @all_stream, @subscription_name, opts) end defp delete_subscription(context) do