-
Notifications
You must be signed in to change notification settings - Fork 36
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add workflow collections #2569
Merged
Merged
Add workflow collections #2569
Changes from all commits
Commits
Show all changes
9 commits
Select commit
Hold shift + click to select a range
744a635
Add workflow collections
jyeshe e00c6d4
Increase timestamp precision
jyeshe 2397b6e
Use single return patter on same get function
jyeshe eea968f
Optimize all ops once the names are stable
jyeshe 77c5728
Changelog and formatting
jyeshe d2adc7d
Add stream_all allowing equivalent queries to HGETALL and HSCAN
jyeshe 47f8607
Add stream_match for wildcard prefix queries
jyeshe e276fbd
Add pg_trigram and GIN index on the key to allow multi wildcard
jyeshe 9049069
Merge branch 'main' into collections
jyeshe File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
defmodule Lightning.Collections do | ||
@moduledoc """ | ||
Access to collections of unique key-value pairs shared across multiple workflows. | ||
""" | ||
import Ecto.Query | ||
|
||
alias Lightning.Collections.Collection | ||
alias Lightning.Collections.Item | ||
alias Lightning.Repo | ||
|
||
@query_all_limit Application.compile_env!(:lightning, __MODULE__)[ | ||
:query_all_limit | ||
] | ||
|
||
@spec get_collection(String.t()) :: | ||
{:ok, Collection.t()} | {:error, :collection_not_found} | ||
def get_collection(name) do | ||
case Repo.get_by(Collection, name: name) do | ||
nil -> {:error, :collection_not_found} | ||
collection -> {:ok, collection} | ||
end | ||
end | ||
|
||
@spec create_collection(Ecto.UUID.t(), String.t()) :: | ||
{:ok, Collection.t()} | {:error, Ecto.Changeset.t()} | ||
def create_collection(project_id, name) do | ||
%Collection{} | ||
|> Collection.changeset(%{project_id: project_id, name: name}) | ||
|> Repo.insert() | ||
end | ||
|
||
@spec delete_collection(Ecto.UUID.t()) :: | ||
{:ok, Collection.t()} | ||
| {:error, Ecto.Changeset.t()} | ||
| {:error, :collection_not_found} | ||
def delete_collection(collection_id) do | ||
case Repo.get(Collection, collection_id) do | ||
nil -> {:error, :collection_not_found} | ||
collection -> Repo.delete(collection) | ||
end | ||
end | ||
|
||
@spec get(Collection.t(), String.t()) :: Item.t() | ||
def get(%{id: collection_id}, key) do | ||
Repo.get_by(Item, collection_id: collection_id, key: key) | ||
end | ||
|
||
@spec stream_all(Collection.t(), String.t() | nil, integer()) :: Enum.t() | ||
def stream_all(%{id: collection_id}, cursor \\ nil, limit \\ @query_all_limit) do | ||
collection_id | ||
|> stream_query(cursor, limit) | ||
|> Repo.stream() | ||
end | ||
|
||
@spec stream_match(Collection.t(), String.t(), String.t() | nil, integer()) :: | ||
Enum.t() | ||
def stream_match( | ||
%{id: collection_id}, | ||
pattern, | ||
cursor \\ nil, | ||
limit \\ @query_all_limit | ||
) do | ||
pattern = | ||
pattern | ||
|> String.replace("\\", "\\\\") | ||
|> String.replace("%", "\\%") | ||
|> String.replace("*", "%") | ||
|
||
collection_id | ||
|> stream_query(cursor, limit) | ||
|> where([i], like(i.key, ^pattern)) | ||
|> Repo.stream() | ||
end | ||
|
||
@spec put(Collection.t(), String.t(), String.t()) :: | ||
{:ok, Item.t()} | {:error, Ecto.Changeset.t()} | ||
def put(%{id: collection_id}, key, value) do | ||
with nil <- Repo.get_by(Item, collection_id: collection_id, key: key) do | ||
%Item{} | ||
end | ||
|> Item.changeset(%{collection_id: collection_id, key: key, value: value}) | ||
|> Repo.insert_or_update() | ||
end | ||
|
||
@spec delete(Collection.t(), String.t()) :: | ||
{:ok, Item.t()} | {:error, :not_found} | ||
def delete(%{id: collection_id}, key) do | ||
case Repo.get_by(Item, collection_id: collection_id, key: key) do | ||
nil -> {:error, :not_found} | ||
item -> Repo.delete(item) | ||
end | ||
end | ||
|
||
defp stream_query(collection_id, cursor, limit) do | ||
Item | ||
|> where([i], i.collection_id == ^collection_id) | ||
|> limit(^limit) | ||
|> then(fn query -> | ||
case cursor do | ||
nil -> query | ||
cursor_key -> where(query, [i], i.key > ^cursor_key) | ||
end | ||
end) | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
defmodule Lightning.Collections.Collection do | ||
@moduledoc """ | ||
Collection referenced by name associated to a project. | ||
""" | ||
use Lightning.Schema | ||
|
||
import Ecto.Changeset | ||
|
||
@type t :: %__MODULE__{ | ||
id: Ecto.UUID.t(), | ||
project_id: Ecto.UUID.t(), | ||
name: String.t(), | ||
inserted_at: NaiveDateTime.t(), | ||
updated_at: NaiveDateTime.t() | ||
} | ||
|
||
schema "collections" do | ||
field :name, :string | ||
belongs_to :project, Lightning.Projects.Project | ||
|
||
timestamps() | ||
end | ||
|
||
@doc false | ||
def changeset(entry, attrs) do | ||
entry | ||
|> cast(attrs, [:project_id, :name]) | ||
|> validate_required([:project_id, :name]) | ||
|> unique_constraint([:name]) | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
defmodule Lightning.Collections.Item do | ||
@moduledoc """ | ||
A key value entry of a collection bound to a project. | ||
""" | ||
use Lightning.Schema | ||
|
||
import Ecto.Changeset | ||
|
||
@type t :: %__MODULE__{ | ||
id: Ecto.UUID.t(), | ||
collection_id: Ecto.UUID.t(), | ||
key: String.t(), | ||
value: String.t(), | ||
inserted_at: NaiveDateTime.t(), | ||
updated_at: NaiveDateTime.t() | ||
} | ||
|
||
schema "collections_items" do | ||
field :key, :string | ||
field :value, :string | ||
|
||
belongs_to :collection, Lightning.Collections.Collection | ||
|
||
timestamps() | ||
end | ||
|
||
@doc false | ||
def changeset(entry, attrs) do | ||
entry | ||
|> cast(attrs, [:collection_id, :key, :value]) | ||
|> validate_required([:collection_id, :key, :value]) | ||
|> unique_constraint([:collection_id, :key]) | ||
|> foreign_key_constraint(:collection_id) | ||
end | ||
end |
36 changes: 36 additions & 0 deletions
36
priv/repo/migrations/20241009021209_create_collections.exs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
defmodule Lightning.Repo.Migrations.CreateCollections do | ||
use Ecto.Migration | ||
|
||
def change do | ||
create table(:collections, primary_key: false) do | ||
add :id, :binary_id, primary_key: true | ||
add :name, :string | ||
|
||
add :project_id, | ||
references(:projects, on_delete: :delete_all, type: :binary_id, null: false) | ||
|
||
timestamps() | ||
end | ||
|
||
create unique_index(:collections, [:name]) | ||
|
||
create table(:collections_items, primary_key: false) do | ||
add :id, :binary_id, primary_key: true | ||
add :key, :string | ||
add :value, :string | ||
|
||
add :collection_id, | ||
references(:collections, type: :binary_id, on_delete: :delete_all, null: false) | ||
|
||
timestamps(type: :naive_datetime_usec) | ||
end | ||
|
||
execute "CREATE EXTENSION IF NOT EXISTS pg_trgm", | ||
"DROP EXTENSION IF EXISTS pg_trgm" | ||
|
||
create unique_index(:collections_items, [:collection_id, :key]) | ||
|
||
execute "CREATE INDEX collections_items_key_trgm_idx ON collections_items USING GIN (key gin_trgm_ops)", | ||
"DROP INDEX IF EXISTS collections_items_key_trgm_idx" | ||
end | ||
end |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice one, do you think we should put an index on updated_at and inserted_at as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @stuartc I would like to play a bit more with the BRIN indexes because I am trying to avoid adding more cost to the job execution. May we add it during the "query by filter" PR? Will have better stats.