Skip to content

Commit

Permalink
feat: add annotations to subscriptions table
Browse files Browse the repository at this point in the history
  • Loading branch information
yordis committed Jan 24, 2025
1 parent 362f087 commit f600570
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 12 deletions.
7 changes: 4 additions & 3 deletions lib/event_store/sql/statements/insert_subscription.sql.eex
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ INSERT INTO "<%= schema %>".subscriptions
(
stream_uuid,
subscription_name,
last_seen
last_seen,
annotations
)
VALUES ($1, $2, $3)
RETURNING subscription_id, stream_uuid, subscription_name, last_seen, created_at;
VALUES ($1, $2, $3, $4)
RETURNING subscription_id, stream_uuid, subscription_name, last_seen, created_at, annotations;
56 changes: 49 additions & 7 deletions lib/event_store/storage/subscription.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,40 @@ defmodule EventStore.Storage.Subscription do
QuerySubscription
}

@typedoc """
A subscription to an event stream.
* `subscription_id` - Unique identifier for the subscription
* `stream_uuid` - The stream being subscribed to
* `subscription_name` - Name of the subscription
* `last_seen` - Last event seen by this subscription
* `created_at` - When the subscription was created
* `annotations` - Arbitrary non-identifying metadata attached to the
subscription, inspired by Kubernetes annotations. These are key-value pairs
that can be used to store auxiliary information about a subscription that
is not directly part of its core functionality. For example:
* Build/release information (team owner, git sha, etc.)
* Client-specific configuration
* Debugging info
* Tool information
"""
@type t :: %EventStore.Storage.Subscription{
subscription_id: non_neg_integer(),
stream_uuid: String.t(),
subscription_name: String.t(),
last_seen: non_neg_integer() | nil,
created_at: DateTime.t()
created_at: DateTime.t(),
annotations: map()
}

defstruct [:subscription_id, :stream_uuid, :subscription_name, :last_seen, :created_at]
defstruct [
:subscription_id,
:stream_uuid,
:subscription_name,
:last_seen,
:created_at,
:annotations
]

defdelegate subscriptions(conn, opts), to: QueryAllSubscriptions, as: :execute

Expand Down Expand Up @@ -49,7 +74,17 @@ defmodule EventStore.Storage.Subscription do
do: Subscription.Delete.execute(conn, stream_uuid, subscription_name, opts)

defp create_subscription(conn, stream_uuid, subscription_name, start_from, opts) do
case CreateSubscription.execute(conn, stream_uuid, subscription_name, start_from, opts) do
annotations = Keyword.get(opts, :annotations, %{})
{schema, opts} = Keyword.pop(opts, :schema)

Check warning on line 78 in lib/event_store/storage/subscription.ex

View workflow job for this annotation

GitHub Actions / Build and test (1.15.x, 26)

variable "schema" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 78 in lib/event_store/storage/subscription.ex

View workflow job for this annotation

GitHub Actions / Build and test (1.15.x, 26)

variable "schema" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 78 in lib/event_store/storage/subscription.ex

View workflow job for this annotation

GitHub Actions / Build and test (1.15.x, 26)

variable "schema" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 78 in lib/event_store/storage/subscription.ex

View workflow job for this annotation

GitHub Actions / Build and test (1.14.x, 26, ignore)

variable "schema" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 78 in lib/event_store/storage/subscription.ex

View workflow job for this annotation

GitHub Actions / Build and test (1.14.x, 26, ignore)

variable "schema" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 78 in lib/event_store/storage/subscription.ex

View workflow job for this annotation

GitHub Actions / Build and test (1.14.x, 26, ignore)

variable "schema" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 78 in lib/event_store/storage/subscription.ex

View workflow job for this annotation

GitHub Actions / Build and test (1.13.x, 25, ignore)

variable "schema" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 78 in lib/event_store/storage/subscription.ex

View workflow job for this annotation

GitHub Actions / Build and test (1.13.x, 25, ignore)

variable "schema" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 78 in lib/event_store/storage/subscription.ex

View workflow job for this annotation

GitHub Actions / Build and test (1.13.x, 25, ignore)

variable "schema" is unused (if the variable is not meant to be used, prefix it with an underscore)

case CreateSubscription.execute(
conn,
stream_uuid,
subscription_name,
start_from,
annotations,
opts
) do
{:ok, %Subscription{}} = reply ->
reply

Expand Down Expand Up @@ -96,7 +131,7 @@ defmodule EventStore.Storage.Subscription do
defmodule CreateSubscription do
@moduledoc false

def execute(conn, stream_uuid, subscription_name, start_from, opts) do
def execute(conn, stream_uuid, subscription_name, start_from, annotations, opts) do
Logger.debug(
"Attempting to create subscription on stream " <>
inspect(stream_uuid) <>
Expand All @@ -107,7 +142,12 @@ defmodule EventStore.Storage.Subscription do

query = Statements.insert_subscription(schema)

case Postgrex.query(conn, query, [stream_uuid, subscription_name, start_from], opts) do
case Postgrex.query(
conn,
query,
[stream_uuid, subscription_name, start_from, annotations],
opts
) do
{:ok, %Postgrex.Result{rows: rows}} ->
Logger.debug(
"Created subscription on stream \"#{stream_uuid}\" named \"#{subscription_name}\""
Expand Down Expand Up @@ -200,15 +240,17 @@ defmodule EventStore.Storage.Subscription do
stream_uuid,
subscription_name,
last_seen,
created_at
created_at,
annotations
] = row

%Subscription{
subscription_id: subscription_id,
stream_uuid: stream_uuid,
subscription_name: subscription_name,
last_seen: last_seen,
created_at: created_at
created_at: created_at,
annotations: annotations || %{}
}
end
end
Expand Down
7 changes: 7 additions & 0 deletions priv/event_store/migrations/v1.4.0.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- Add annotations field to subscriptions table

ALTER TABLE subscriptions
ADD COLUMN annotations jsonb DEFAULT '{}'::jsonb NOT NULL;

-- Add index on annotations for better query performance
CREATE INDEX subscriptions_annotations_idx ON subscriptions USING gin (annotations);
45 changes: 43 additions & 2 deletions test/storage/subscription_persistence_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -69,16 +69,57 @@ defmodule EventStore.Storage.SubscriptionPersistenceTest do
verify_subscription(subscription, 1)
end

test "create subscription with annotations", context do
annotations = %{"key" => "value", "nested" => %{"data" => 123}}
{:ok, subscription} = subscribe_to_stream(context, annotations: annotations)

verify_subscription(subscription)
assert subscription.annotations == annotations
end

test "create subscription with default empty annotations", context do
{:ok, subscription} = subscribe_to_stream(context)

verify_subscription(subscription)
assert subscription.annotations == %{}
end

test "create subscription when already exists preserves annotations", context do
# First create with annotations
initial_annotations = %{"key" => "value", "metadata" => %{"version" => 1}}
{:ok, subscription1} = subscribe_to_stream(context, annotations: initial_annotations)

# Then try different scenarios that should all preserve the original annotations
# No annotations provided
{:ok, subscription2} = subscribe_to_stream(context)
# Explicit nil
{:ok, subscription3} = subscribe_to_stream(context, annotations: nil)
# Empty map
{:ok, subscription4} = subscribe_to_stream(context, annotations: %{})

# Verify all subscriptions are the same
assert subscription1.subscription_id == subscription2.subscription_id
assert subscription2.subscription_id == subscription3.subscription_id
assert subscription3.subscription_id == subscription4.subscription_id

# Verify annotations are preserved in all cases
assert subscription1.annotations == initial_annotations
assert subscription2.annotations == initial_annotations
assert subscription3.annotations == initial_annotations
assert subscription4.annotations == initial_annotations
end

def ack_last_seen_event(context, last_seen) do
%{conn: conn, schema: schema} = context

Storage.ack_last_seen_event(conn, @all_stream, @subscription_name, last_seen, schema: schema)
end

defp subscribe_to_stream(context) do
defp subscribe_to_stream(context, opts \\ []) do
%{conn: conn, schema: schema} = context
opts = Keyword.merge([schema: schema], opts)

Storage.subscribe_to_stream(conn, @all_stream, @subscription_name, schema: schema)
Storage.subscribe_to_stream(conn, @all_stream, @subscription_name, opts)
end

defp delete_subscription(context) do
Expand Down

0 comments on commit f600570

Please sign in to comment.