Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add workflow collections #2569

Merged
merged 9 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ and this project adheres to

### Added

- Introduces collections, a programatic workflow data sharing resource.
[#2551](https://github.com/OpenFn/lightning/issues/2551)
- Notify users when a Kafka trigger can not persist a message to the database.
[#2386](https://github.com/OpenFn/lightning/issues/2386)

Expand Down
2 changes: 2 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ config :lightning, :default_retention_period, nil

config :lightning, Lightning.Runtime.RuntimeManager, start: false

config :lightning, Lightning.Collections, query_all_limit: 1_000

# Import environment specific config. This must remain at the bottom
# of this file so it overrides the configuration defined above.
import_config "#{config_env()}.exs"
2 changes: 2 additions & 0 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -161,3 +161,5 @@ config :lightning, :github_app,
FaFp+DyAe+b4nDwuJaW2LURbr8AEZga7oQj0uYxcYw==
-----END RSA PRIVATE KEY-----
"""

config :lightning, Lightning.Collections, query_all_limit: 50
105 changes: 105 additions & 0 deletions lib/lightning/collections.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
defmodule Lightning.Collections do
@moduledoc """
Access to collections of unique key-value pairs shared across multiple workflows.
"""
import Ecto.Query

alias Lightning.Collections.Collection
alias Lightning.Collections.Item
alias Lightning.Repo

@query_all_limit Application.compile_env!(:lightning, __MODULE__)[
:query_all_limit
]

@spec get_collection(String.t()) ::
{:ok, Collection.t()} | {:error, :collection_not_found}
def get_collection(name) do
case Repo.get_by(Collection, name: name) do
nil -> {:error, :collection_not_found}
collection -> {:ok, collection}
end
end

@spec create_collection(Ecto.UUID.t(), String.t()) ::
{:ok, Collection.t()} | {:error, Ecto.Changeset.t()}
def create_collection(project_id, name) do
%Collection{}
|> Collection.changeset(%{project_id: project_id, name: name})
|> Repo.insert()
end

@spec delete_collection(Ecto.UUID.t()) ::
{:ok, Collection.t()}
| {:error, Ecto.Changeset.t()}
| {:error, :collection_not_found}
def delete_collection(collection_id) do
case Repo.get(Collection, collection_id) do
nil -> {:error, :collection_not_found}
collection -> Repo.delete(collection)
end
end

@spec get(Collection.t(), String.t()) :: Item.t()
def get(%{id: collection_id}, key) do
Repo.get_by(Item, collection_id: collection_id, key: key)
end

@spec stream_all(Collection.t(), String.t() | nil, integer()) :: Enum.t()
def stream_all(%{id: collection_id}, cursor \\ nil, limit \\ @query_all_limit) do
collection_id
|> stream_query(cursor, limit)
|> Repo.stream()
end

@spec stream_match(Collection.t(), String.t(), String.t() | nil, integer()) ::
Enum.t()
def stream_match(
%{id: collection_id},
pattern,
cursor \\ nil,
limit \\ @query_all_limit
) do
pattern =
pattern
|> String.replace("\\", "\\\\")
|> String.replace("%", "\\%")
|> String.replace("*", "%")

collection_id
|> stream_query(cursor, limit)
|> where([i], like(i.key, ^pattern))
|> Repo.stream()
end

@spec put(Collection.t(), String.t(), String.t()) ::
{:ok, Item.t()} | {:error, Ecto.Changeset.t()}
def put(%{id: collection_id}, key, value) do
with nil <- Repo.get_by(Item, collection_id: collection_id, key: key) do
%Item{}
end
|> Item.changeset(%{collection_id: collection_id, key: key, value: value})
|> Repo.insert_or_update()
end

@spec delete(Collection.t(), String.t()) ::
{:ok, Item.t()} | {:error, :not_found}
def delete(%{id: collection_id}, key) do
case Repo.get_by(Item, collection_id: collection_id, key: key) do
nil -> {:error, :not_found}
item -> Repo.delete(item)
end
end

defp stream_query(collection_id, cursor, limit) do
Item
|> where([i], i.collection_id == ^collection_id)
|> limit(^limit)
|> then(fn query ->
case cursor do
nil -> query
cursor_key -> where(query, [i], i.key > ^cursor_key)
end
end)
end
end
31 changes: 31 additions & 0 deletions lib/lightning/collections/collection.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
defmodule Lightning.Collections.Collection do
@moduledoc """
Collection referenced by name associated to a project.
"""
use Lightning.Schema

import Ecto.Changeset

@type t :: %__MODULE__{
id: Ecto.UUID.t(),
project_id: Ecto.UUID.t(),
name: String.t(),
inserted_at: NaiveDateTime.t(),
updated_at: NaiveDateTime.t()
}

schema "collections" do
field :name, :string
belongs_to :project, Lightning.Projects.Project

timestamps()
end

@doc false
def changeset(entry, attrs) do
entry
|> cast(attrs, [:project_id, :name])
|> validate_required([:project_id, :name])
|> unique_constraint([:name])
end
end
35 changes: 35 additions & 0 deletions lib/lightning/collections/item.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
defmodule Lightning.Collections.Item do
@moduledoc """
A key value entry of a collection bound to a project.
"""
use Lightning.Schema

import Ecto.Changeset

@type t :: %__MODULE__{
id: Ecto.UUID.t(),
collection_id: Ecto.UUID.t(),
key: String.t(),
value: String.t(),
inserted_at: NaiveDateTime.t(),
updated_at: NaiveDateTime.t()
}

schema "collections_items" do
field :key, :string
field :value, :string

belongs_to :collection, Lightning.Collections.Collection

timestamps()
end

@doc false
def changeset(entry, attrs) do
entry
|> cast(attrs, [:collection_id, :key, :value])
|> validate_required([:collection_id, :key, :value])
|> unique_constraint([:collection_id, :key])
|> foreign_key_constraint(:collection_id)
end
end
36 changes: 36 additions & 0 deletions priv/repo/migrations/20241009021209_create_collections.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
defmodule Lightning.Repo.Migrations.CreateCollections do
use Ecto.Migration

def change do
create table(:collections, primary_key: false) do
add :id, :binary_id, primary_key: true
add :name, :string

add :project_id,
references(:projects, on_delete: :delete_all, type: :binary_id, null: false)

timestamps()
end

create unique_index(:collections, [:name])

create table(:collections_items, primary_key: false) do
add :id, :binary_id, primary_key: true
add :key, :string
add :value, :string

add :collection_id,
references(:collections, type: :binary_id, on_delete: :delete_all, null: false)

timestamps(type: :naive_datetime_usec)
end

execute "CREATE EXTENSION IF NOT EXISTS pg_trgm",
"DROP EXTENSION IF EXISTS pg_trgm"

create unique_index(:collections_items, [:collection_id, :key])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice one, do you think we should put an index on updated_at and inserted_at as well?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @stuartc I would like to play a bit more with the BRIN indexes because I am trying to avoid adding more cost to the job execution. May we add it during the "query by filter" PR? Will have better stats.


execute "CREATE INDEX collections_items_key_trgm_idx ON collections_items USING GIN (key gin_trgm_ops)",
"DROP INDEX IF EXISTS collections_items_key_trgm_idx"
end
end
Loading