Skip to content

Commit

Permalink
Add workflow collections (#2569)
Browse files Browse the repository at this point in the history
* Add workflow collections

These are named and shared key-value storages

* Increase timestamp precision

* Use a single return pattern on the same get function

* Optimize all ops once the names are stable

There is no user update on collection name

* Changelog and formatting

* Add stream_all allowing equivalent queries to HGETALL and HSCAN

* Add stream_match for wildcard prefix queries

* Add pg_trgm and a GIN index on the key to allow multi-wildcard matching
  • Loading branch information
jyeshe committed Oct 22, 2024
1 parent 1ff1ff1 commit fd14f8f
Show file tree
Hide file tree
Showing 9 changed files with 610 additions and 0 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ and this project adheres to
Arcade videos [#2563](https://github.com/OpenFn/lightning/issues/2563)
- Store user preferences in database
[#2564](https://github.com/OpenFn/lightning/issues/2564)
- Introduces collections, a programmatic workflow data sharing resource.
[#2551](https://github.com/OpenFn/lightning/issues/2551)

### Changed

Expand Down
2 changes: 2 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ config :lightning, :default_retention_period, nil

config :lightning, Lightning.Runtime.RuntimeManager, start: false

# Default `limit` used by Lightning.Collections.stream_all/3 and stream_match/4
# when the caller does not pass one. Read at compile time by that module.
config :lightning, Lightning.Collections, query_all_limit: 1_000

# Import environment specific config. This must remain at the bottom
# of this file so it overrides the configuration defined above.
import_config "#{config_env()}.exs"
2 changes: 2 additions & 0 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -161,3 +161,5 @@ config :lightning, :github_app,
FaFp+DyAe+b4nDwuJaW2LURbr8AEZga7oQj0uYxcYw==
-----END RSA PRIVATE KEY-----
"""

# Test override of the default `limit` used by Lightning.Collections.stream_all/3
# and stream_match/4 (smaller than the value set in config.exs).
config :lightning, Lightning.Collections, query_all_limit: 50
105 changes: 105 additions & 0 deletions lib/lightning/collections.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
defmodule Lightning.Collections do
@moduledoc """
Access to collections of unique key-value pairs shared across multiple workflows.
"""
import Ecto.Query

alias Lightning.Collections.Collection
alias Lightning.Collections.Item
alias Lightning.Repo

# Default `limit` for `stream_all/3` and `stream_match/4`, resolved at compile
# time. The path form makes compilation fail when `:query_all_limit` is missing
# from the module's config, instead of silently storing `nil` here.
@query_all_limit Application.compile_env!(:lightning, [
                   __MODULE__,
                   :query_all_limit
                 ])

@doc """
Looks up a collection by its name.

Returns `{:ok, collection}` when a collection with that name exists and
`{:error, :collection_not_found}` otherwise.
"""
@spec get_collection(String.t()) ::
        {:ok, Collection.t()} | {:error, :collection_not_found}
def get_collection(name) do
  if found = Repo.get_by(Collection, name: name) do
    {:ok, found}
  else
    {:error, :collection_not_found}
  end
end

@doc """
Inserts a new collection called `name` for the project `project_id`.

Returns `{:ok, collection}` on success, or `{:error, changeset}` when the
changeset built by `Collection.changeset/2` is invalid or the insert fails.
"""
@spec create_collection(Ecto.UUID.t(), String.t()) ::
        {:ok, Collection.t()} | {:error, Ecto.Changeset.t()}
def create_collection(project_id, name) do
  attrs = %{project_id: project_id, name: name}
  changeset = Collection.changeset(%Collection{}, attrs)

  Repo.insert(changeset)
end

@doc """
Deletes the collection with the given id.

Returns `{:error, :collection_not_found}` when no collection has that id;
otherwise returns whatever `Repo.delete/1` returns for the record.
"""
@spec delete_collection(Ecto.UUID.t()) ::
        {:ok, Collection.t()}
        | {:error, Ecto.Changeset.t()}
        | {:error, :collection_not_found}
def delete_collection(collection_id) do
  Collection
  |> Repo.get(collection_id)
  |> case do
    nil -> {:error, :collection_not_found}
    existing -> Repo.delete(existing)
  end
end

@doc """
Fetches the item stored under `key` in the given collection.

Returns the `Item` struct, or `nil` when the collection has no item with
that key.
"""
@spec get(Collection.t(), String.t()) :: Item.t() | nil
def get(%{id: collection_id}, key) do
  Repo.get_by(Item, collection_id: collection_id, key: key)
end

@doc """
Returns a lazy stream over the items of a collection.

  * `cursor` - when not `nil`, only items whose key is greater than this
    value are returned.
  * `limit` - maximum number of items; defaults to the configured
    `:query_all_limit`.

The result comes from `Repo.stream/1`.
NOTE(review): Ecto SQL adapters generally require such a stream to be
enumerated inside a transaction - confirm at the call sites.
"""
@spec stream_all(Collection.t(), String.t() | nil, integer()) :: Enum.t()
def stream_all(%{id: collection_id}, cursor \\ nil, limit \\ @query_all_limit) do
  query = stream_query(collection_id, cursor, limit)

  Repo.stream(query)
end

@doc """
Returns a lazy stream over the items of a collection whose key matches
`pattern`.

`*` in `pattern` is the only wildcard and matches any run of characters.
Every other character is matched literally, including `%`, `_` and `\\`,
which are escaped before the pattern is handed to SQL `LIKE`.

  * `cursor` - when not `nil`, only items whose key is greater than this
    value are returned.
  * `limit` - maximum number of items; defaults to the configured
    `:query_all_limit`.

The result comes from `Repo.stream/1`.
NOTE(review): Ecto SQL adapters generally require such a stream to be
enumerated inside a transaction - confirm at the call sites.
"""
@spec stream_match(Collection.t(), String.t(), String.t() | nil, integer()) ::
        Enum.t()
def stream_match(
      %{id: collection_id},
      pattern,
      cursor \\ nil,
      limit \\ @query_all_limit
    ) do
  like_pattern =
    pattern
    # The backslash is the escape character, so it has to be escaped first;
    # otherwise the escapes added below would themselves be escaped.
    |> String.replace("\\", "\\\\")
    |> String.replace("%", "\\%")
    # `_` matches any single character in LIKE; escape it so an underscore
    # in a key pattern only matches an underscore.
    |> String.replace("_", "\\_")
    # Only now translate the user-facing wildcard into the SQL one.
    |> String.replace("*", "%")

  collection_id
  |> stream_query(cursor, limit)
  |> where([i], like(i.key, ^like_pattern))
  |> Repo.stream()
end

@doc """
Stores `value` under `key` in the given collection.

If an item with that key already exists in the collection it is updated,
otherwise a new item is inserted. Returns `{:ok, item}` or
`{:error, changeset}`.

The lookup and the write are two separate Repo calls.
NOTE(review): two concurrent puts of the same new key could presumably both
take the insert path; the loser would then be reported through the unique
constraint declared in `Item.changeset/2` - confirm this is acceptable.
"""
@spec put(Collection.t(), String.t(), String.t()) ::
        {:ok, Item.t()} | {:error, Ecto.Changeset.t()}
def put(%{id: collection_id}, key, value) do
  existing = Repo.get_by(Item, collection_id: collection_id, key: key)

  (existing || %Item{})
  |> Item.changeset(%{collection_id: collection_id, key: key, value: value})
  |> Repo.insert_or_update()
end

@doc """
Removes the item stored under `key` from the given collection.

Returns `{:error, :not_found}` when the collection has no item with that
key; otherwise returns the result of `Repo.delete/1` for the item.
"""
@spec delete(Collection.t(), String.t()) ::
        {:ok, Item.t()} | {:error, :not_found}
def delete(%{id: collection_id}, key) do
  Item
  |> Repo.get_by(collection_id: collection_id, key: key)
  |> case do
    nil -> {:error, :not_found}
    found -> Repo.delete(found)
  end
end

# Builds the base query shared by the streaming functions: items of one
# collection, sorted by key, capped at `limit`, optionally starting after
# `cursor`.
#
# The cursor is a key (`key > cursor`), so rows must come back in key order.
# Without an explicit ORDER BY the database may return any `limit` rows, and
# paging with the last key seen could skip or repeat items.
defp stream_query(collection_id, cursor, limit) do
  base =
    Item
    |> where([i], i.collection_id == ^collection_id)
    |> order_by([i], asc: i.key)
    |> limit(^limit)

  case cursor do
    nil -> base
    cursor_key -> where(base, [i], i.key > ^cursor_key)
  end
end
end
31 changes: 31 additions & 0 deletions lib/lightning/collections/collection.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
defmodule Lightning.Collections.Collection do
  @moduledoc """
  Schema for a named collection that belongs to a project.
  """
  use Lightning.Schema

  import Ecto.Changeset

  @type t :: %__MODULE__{
          id: Ecto.UUID.t(),
          project_id: Ecto.UUID.t(),
          name: String.t(),
          inserted_at: NaiveDateTime.t(),
          updated_at: NaiveDateTime.t()
        }

  schema "collections" do
    field :name, :string
    belongs_to :project, Lightning.Projects.Project

    timestamps()
  end

  # Casts `:project_id` and `:name`, requires both, and registers the unique
  # constraint on `:name` so a duplicate name becomes a changeset error.
  @doc false
  def changeset(entry, attrs) do
    fields = [:project_id, :name]

    changeset = cast(entry, attrs, fields)
    changeset = validate_required(changeset, fields)

    unique_constraint(changeset, [:name])
  end
end
35 changes: 35 additions & 0 deletions lib/lightning/collections/item.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
defmodule Lightning.Collections.Item do
  @moduledoc """
  Schema for a single key-value entry that belongs to a collection.
  """
  use Lightning.Schema

  import Ecto.Changeset

  @type t :: %__MODULE__{
          id: Ecto.UUID.t(),
          collection_id: Ecto.UUID.t(),
          key: String.t(),
          value: String.t(),
          inserted_at: NaiveDateTime.t(),
          updated_at: NaiveDateTime.t()
        }

  schema "collections_items" do
    field :key, :string
    field :value, :string

    belongs_to :collection, Lightning.Collections.Collection

    timestamps()
  end

  # Casts and requires `:collection_id`, `:key` and `:value`, then registers
  # the `[:collection_id, :key]` unique constraint and the `:collection_id`
  # foreign key constraint so violations become changeset errors.
  @doc false
  def changeset(entry, attrs) do
    fields = [:collection_id, :key, :value]

    changeset = cast(entry, attrs, fields)
    changeset = validate_required(changeset, fields)
    changeset = unique_constraint(changeset, [:collection_id, :key])

    foreign_key_constraint(changeset, :collection_id)
  end
end
36 changes: 36 additions & 0 deletions priv/repo/migrations/20241009021209_create_collections.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
defmodule Lightning.Repo.Migrations.CreateCollections do
  use Ecto.Migration

  # Creates the `collections` and `collections_items` tables, their unique
  # indexes, and a trigram GIN index on item keys. Both `execute/2` calls
  # provide a rollback statement, so the migration is reversible.
  def change do
    create table(:collections, primary_key: false) do
      add :id, :binary_id, primary_key: true
      add :name, :string

      # `:null` is an option of `add/3`. Passed to `references/2` it is
      # ignored and the column stays nullable.
      add :project_id,
          references(:projects, on_delete: :delete_all, type: :binary_id),
          null: false

      timestamps()
    end

    create unique_index(:collections, [:name])

    create table(:collections_items, primary_key: false) do
      add :id, :binary_id, primary_key: true
      add :key, :string
      # NOTE(review): `:string` is presumably a length-bounded varchar on
      # Postgres - confirm stored values always fit, or consider `:text`.
      add :value, :string

      # Same as above: `null: false` must be given to `add/3` to take effect.
      add :collection_id,
          references(:collections, type: :binary_id, on_delete: :delete_all),
          null: false

      timestamps(type: :naive_datetime_usec)
    end

    # The `gin_trgm_ops` operator class used below comes from pg_trgm.
    execute "CREATE EXTENSION IF NOT EXISTS pg_trgm",
            "DROP EXTENSION IF EXISTS pg_trgm"

    create unique_index(:collections_items, [:collection_id, :key])

    execute "CREATE INDEX collections_items_key_trgm_idx ON collections_items USING GIN (key gin_trgm_ops)",
            "DROP INDEX IF EXISTS collections_items_key_trgm_idx"
  end
end
Loading

0 comments on commit fd14f8f

Please sign in to comment.