Skip to content

Commit

Permalink
Multi db projection (#410)
Browse files Browse the repository at this point in the history
* chore: initial projection query DSL implementation

* feat: support complex queries

* feat: add support to multi arch images

* chore: update deps

* fix: projection repo start and some tests

* fix: add child spec to native projection adapter

* feat: added more tests

* fix: String.chars not implemented

* fix: convert to chars

* try fix

* handle tuple args

* fix: assert tests

* fix: formatting

* translate comments

* feat: add support to group by

* feat: added support to having clause

* refactor: fix some compilation warnings

* trying nimble_parsec to parse queries

* some progress in query dsl parser

* feat: create projection tables from protobuf data

* refactor: minor adjust

* feat: added dynamic table creator

* refactor: rename module

* working projection table with upsert and query

* refact

* refact

* merge

* querying and table creation working on projection actor initialization

* saving state after callback

* pagination to queries

* tests fix

* remove ununsed code

* fix

* fix test

* fix some errors

* working for mariaDB and native throwing exception

* mix format

* ci with mariadb and postgres statestores test

* ci changes

* ci fix

* ci fix again

* ci fix again

* password for postgres

* ci again

* run otp 25

* longblob

* ci fix

* ci fix

* remove double mariadb

* ci fix

* ci

* user admin

* ci fix

* ci fix

* comment mariadb

---------

Co-authored-by: Adriano Santos <[email protected]>
Co-authored-by: Elias <[email protected]>
  • Loading branch information
3 people authored Jan 3, 2025
1 parent ba0b650 commit bffcab3
Show file tree
Hide file tree
Showing 20 changed files with 1,811 additions and 1,023 deletions.
85 changes: 81 additions & 4 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
otp: [25. 26]
otp: [25]
elixir: [1.15]

env:
Expand All @@ -25,6 +25,59 @@ jobs:
- name: Install Protoc
uses: arduino/setup-protoc@v3

- name: Install and Configure Postgres on Port 5232
run: |
sudo apt-get update
sudo apt-get install -y postgresql postgresql-contrib
sudo service postgresql start
# Set the password for the 'postgres' user to 'postgres'
sudo -u postgres psql -c "ALTER USER postgres WITH PASSWORD 'postgres';"
# Allow password authentication (uncomment in pg_hba.conf)
sudo sed -i "s/^#host all all 127.0.0.1\/32 md5/host all all 127.0.0.1\/32 md5/" /etc/postgresql/*/main/pg_hba.conf
sudo sed -i "s/^#host all all ::1\/128 md5/host all all ::1\/128 md5/" /etc/postgresql/*/main/pg_hba.conf
# Increase max_connections in postgresql.conf
sudo sed -i "s/^#max_connections = [0-9]*/max_connections = 500/" /etc/postgresql/*/main/postgresql.conf
# Restart PostgreSQL to apply changes
sudo service postgresql restart
# Create the database
sudo -u postgres psql -c "CREATE DATABASE \"eigr-functions-db\";"
- name: Shutdown Ubuntu MySQL (SUDO)
run: sudo service mysql stop

# - name: Set up MariaDB
# uses: getong/[email protected]
# with:
# host port: 3307
# container port: 3307
# character set server: 'utf8'
# collation server: 'utf8_general_ci'
# mariadb version: '10.4.10'
# mysql database: 'eigr-functions-db'
#
# - name: Wait for MariaDB to be Ready
# run: |
# for i in {1..10}; do
# if mysqladmin ping -h127.0.0.1 -P3307 --silent; then
# echo "MariaDB is ready!"
# break
# fi
# echo "Waiting for MariaDB..."
# sleep 5
# done
#
# - name: Set up MariaDB User
# run: |
# # Create 'admin' user with password 'admin' and grant privileges
# mysql -h127.0.0.1 -P3307 -uroot -e "CREATE USER IF NOT EXISTS 'admin'@'%' IDENTIFIED BY 'admin';"
# mysql -h127.0.0.1 -P3307 -uroot -e "GRANT ALL PRIVILEGES ON *.* TO 'admin'@'%' WITH GRANT OPTION;"
# mysql -h127.0.0.1 -P3307 -uroot -e "FLUSH PRIVILEGES;"

- name: Install NATS with JetStream
run: |
wget https://github.com/nats-io/nats-server/releases/download/v2.10.0/nats-server-v2.10.0-linux-amd64.tar.gz
Expand Down Expand Up @@ -66,13 +119,36 @@ jobs:
MIX_ENV=test PROXY_DATABASE_TYPE=native SPAWN_SUPERVISORS_STATE_HANDOFF_CONTROLLER=nats SPAWN_USE_INTERNAL_NATS=true SPAWN_PUBSUB_ADAPTER=nats PROXY_CLUSTER_STRATEGY=gossip PROXY_DATABASE_POOL_SIZE=15 PROXY_HTTP_PORT=9005 SPAWN_STATESTORE_KEY=3Jnb0hZiHIzHTOih7t2cTEPEpY98Tu1wvQkPfq/XwqE= elixir --name [email protected] -S mix test
cd ../../
- name: Run tests spawn_statestores
- name: Run tests spawn_statestores_postgres
run: |
cd spawn_statestores/statestores
cd spawn_statestores/statestores_postgres
mix deps.get
MIX_ENV=test PROXY_CLUSTER_STRATEGY=gossip PROXY_HTTP_PORT=9005 SPAWN_STATESTORE_KEY=3Jnb0hZiHIzHTOih7t2cTEPEpY98Tu1wvQkPfq/XwqE= elixir --name [email protected] -S mix test
MIX_ENV=test \
PROXY_DATABASE_TYPE=postgres \
PROXY_DATABASE_PORT=5432 \
PROXY_DATABASE_USERNAME=postgres \
PROXY_DATABASE_SECRET=postgres \
PROXY_CLUSTER_STRATEGY=gossip \
PROXY_HTTP_PORT=9005 \
SPAWN_STATESTORE_KEY=3Jnb0hZiHIzHTOih7t2cTEPEpY98Tu1wvQkPfq/XwqE= \
elixir --name [email protected] -S mix test
cd ../../
# - name: Run tests spawn_statestores_mariadb
# run: |
# cd spawn_statestores/statestores_mariadb
# mix deps.get
# MIX_ENV=test \
# PROXY_DATABASE_TYPE=mariadb \
# PROXY_DATABASE_PORT=3307 \
# PROXY_DATABASE_USERNAME=admin \
# PROXY_DATABASE_SECRET=admin \
# PROXY_CLUSTER_STRATEGY=gossip \
# PROXY_HTTP_PORT=9005 \
# SPAWN_STATESTORE_KEY=3Jnb0hZiHIzHTOih7t2cTEPEpY98Tu1wvQkPfq/XwqE= \
# elixir --name [email protected] -S mix test
# cd ../../

- name: Run tests statestores_native
run: |
cd spawn_statestores/statestores_native
Expand Down Expand Up @@ -100,3 +176,4 @@ jobs:
# mix deps.get
# MIX_ENV=test PROXY_DATABASE_TYPE=mysql PROXY_CLUSTER_STRATEGY=gossip PROXY_HTTP_PORT=9005 SPAWN_STATESTORE_KEY=3Jnb0hZiHIzHTOih7t2cTEPEpY98Tu1wvQkPfq/XwqE= elixir --name [email protected] -S mix test
# cd ../../

7 changes: 3 additions & 4 deletions lib/actors/actor/entity/invocation.ex
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ defmodule Actors.Actor.Entity.Invocation do
}

alias Spawn.Utils.Nats
alias Statestores.Manager.StateManager

import Spawn.Utils.AnySerializer,
only: [any_pack!: 1, any_unpack!: 2, normalize_package_name: 1]
Expand Down Expand Up @@ -474,8 +475,7 @@ defmodule Actors.Actor.Entity.Invocation do
|> normalize_package_name()

{:ok, results} =
Statestores.Projection.Query.DynamicTableDataHandler.query(
load_projection_adapter(),
StateManager.projection_query(
state_type,
view.query,
any_unpack!(request.payload |> elem(1), view.input_type),
Expand Down Expand Up @@ -637,8 +637,7 @@ defmodule Actors.Actor.Entity.Invocation do
Macro.underscore(id.parent)
end

Statestores.Projection.Query.DynamicTableDataHandler.upsert(
load_projection_adapter(),
StateManager.projection_upsert(
state_type,
table_name,
any_unpack!(response.updated_context.state, state_type)
Expand Down
7 changes: 1 addition & 6 deletions lib/actors/actor/entity/lifecycle.ex
Original file line number Diff line number Diff line change
Expand Up @@ -321,12 +321,7 @@ defmodule Actors.Actor.Entity.Lifecycle do
Macro.underscore(actor.id.parent)
end

:ok =
DynamicTableCreator.create_or_update_table(
load_projection_adapter(),
state_type,
table_name
)
:ok = StateManager.projection_create_or_update_table(state_type, table_name)

StreamInitiator.init_projection_stream(actor)
end
Expand Down
86 changes: 18 additions & 68 deletions lib/actors/actor/state_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,24 @@ if Code.ensure_loaded?(Statestores.Supervisor) do
alias Statestores.Schemas.Snapshot
alias Statestores.Manager.StateManager, as: StateStoreManager

def projection_create_or_update_table(projection_type, table_name) do
StateStoreManager.projection_create_or_update_table(projection_type, table_name)
end

def projection_upsert(projection_type, table_name, data) do
StateStoreManager.projection_upsert(projection_type, table_name, data)
end

def projection_query(projection_type, query, params, opts) do
StateStoreManager.projection_query(projection_type, query, params, opts)
end

def is_new?(_old_hash, new_state) when is_nil(new_state), do: false

def is_new?(old_hash, new_state) do
with bytes_from_state <- Any.encode(new_state),
hash <- :crypto.hash(:sha256, bytes_from_state) do
old_hash != hash
else
_ ->
false
end
catch
_kind, error ->
Expand Down Expand Up @@ -78,69 +87,6 @@ if Code.ensure_loaded?(Statestores.Supervisor) do
{:error, error}
end

@spec load_all(ActorId.t()) :: {:ok, term()} | :not_found | {:error, term()}
def load_all(%ActorId{} = actor_id) do
key = generate_key(actor_id)

snapshots = StateStoreManager.load_all(key)

results =
Enum.map(snapshots, fn %Snapshot{
status: status,
node: node,
revision: rev,
tags: tags,
data_type: type,
data: data
} = _event ->
revision = if is_nil(rev), do: 0, else: rev

{%ActorState{tags: tags, state: %Google.Protobuf.Any{type_url: type, value: data}},
revision, status, node}
end)

if Enum.empty?(results) do
:not_found
else
{:ok, results}
end
catch
_kind, error ->
{:error, error}
end

@spec load_by_interval(ActorId.t(), String.t(), String.t()) ::
{:ok, term()} | :not_found | {:error, term()}
def load_by_interval(%ActorId{} = actor_id, time_start, time_end) do
key = generate_key(actor_id)

snapshots = StateStoreManager.load_by_interval(key, time_start, time_end)

results =
Enum.map(snapshots, fn %Snapshot{
status: status,
node: node,
revision: rev,
tags: tags,
data_type: type,
data: data
} = _event ->
revision = if is_nil(rev), do: 0, else: rev

{%ActorState{tags: tags, state: %Google.Protobuf.Any{type_url: type, value: data}},
revision, status, node}
end)

if Enum.empty?(results) do
:not_found
else
{:ok, results}
end
catch
_kind, error ->
{:error, error}
end

@spec save(ActorId.t(), Spawn.Actors.ActorState.t(), Keyword.t()) ::
{:ok, Spawn.Actors.ActorState.t()}
| {:error, any(), Spawn.Actors.ActorState.t()}
Expand Down Expand Up @@ -285,10 +231,14 @@ else
def is_new?(_old_hash, _new_state), do: raise(@not_loaded_message)
def load(_actor_id), do: raise(@not_loaded_message)
def load(_actor_id, _), do: raise(@not_loaded_message)
def load_all(_), do: raise(@not_loaded_message)
def load_by_interval(_, _, _), do: raise(@not_loaded_message)
def save(_actor_id, _state), do: raise(@not_loaded_message)
def save(_actor_id, _state, _opts), do: raise(@not_loaded_message)
def save_async(_actor_id, _state, _timeout), do: raise(@not_loaded_message)

def projection_create_or_update_table(_projection_type, _table_name),
do: raise(@not_loaded_message)

def projection_upsert(_projection_type, _table_name, _data), do: raise(@not_loaded_message)
def projection_query(_projection_type, _query, _params, _opts), do: raise(@not_loaded_message)
end
end
19 changes: 14 additions & 5 deletions lib/actors/actor/state_manager_behaviour.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,19 @@ defmodule Actors.Actor.StateManager.Behaviour do
to be saved to persistent storage using database drivers.
"""

@type projection_type :: module()
@type table_name :: String.t()
@type data :: struct()
@type query :: String.t()
@type params :: struct()
@type opts :: Keyword.t()

@callback is_new?(String.t(), any()) :: {:error, term()} | boolean()

@callback load(String.t()) :: {:ok, term()} | {:not_found, %{}} | {:error, term()}

@callback load(String.t(), number()) :: {:ok, term()} | {:not_found, %{}} | {:error, term()}

@callback load_all(String.t()) :: {:ok, term()} | {:not_found, %{}} | {:error, term()}

@callback load_by_interval(String.t(), String.t(), String.t()) ::
{:ok, term()} | {:not_found, %{}} | {:error, term()}

@callback save(String.t(), term(), Keyword.t()) ::
{:ok, term(), String.t()}
| {:error, term(), term(), term()}
Expand All @@ -24,4 +26,11 @@ defmodule Actors.Actor.StateManager.Behaviour do
{:ok, term(), String.t()}
| {:error, term(), term(), term()}
| {:error, term(), term()}

@callback projection_create_or_update_table(projection_type(), table_name()) :: :ok

@callback projection_upsert(projection_type(), table_name(), data()) :: :ok

@callback projection_query(projection_type(), query(), params(), opts()) ::
{:error, term()} | {:ok, data()}
end
2 changes: 1 addition & 1 deletion spawn_activators/activator/mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@
"poly1305": {:hex, :poly1305, "1.0.4", "7cdc8961a0a6e00a764835918cdb8ade868044026df8ef5d718708ea6cc06611", [:mix], [{:chacha20, "~> 1.0", [hex: :chacha20, repo: "hexpm", optional: false]}, {:equivalex, "~> 1.0", [hex: :equivalex, repo: "hexpm", optional: false]}], "hexpm", "e14e684661a5195e149b3139db4a1693579d4659d65bba115a307529c47dbc3b"},
"poolboy": {:hex, :poolboy, "1.5.2", "392b007a1693a64540cead79830443abf5762f5d30cf50bc95cb2c1aaafa006b", [:rebar3], [], "hexpm", "dad79704ce5440f3d5a3681c8590b9dc25d1a561e8f5a9c995281012860901e3"},
"postgrex": {:hex, :postgrex, "0.19.3", "a0bda6e3bc75ec07fca5b0a89bffd242ca209a4822a9533e7d3e84ee80707e19", [:mix], [{:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "d31c28053655b78f47f948c85bb1cf86a9c1f8ead346ba1aa0d0df017fa05b61"},
"protobuf": {:hex, :protobuf, "0.12.0", "58c0dfea5f929b96b5aa54ec02b7130688f09d2de5ddc521d696eec2a015b223", [:mix], [{:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "75fa6cbf262062073dd51be44dd0ab940500e18386a6c4e87d5819a58964dc45"},
"protobuf": {:hex, :protobuf, "0.13.0", "7a9d9aeb039f68a81717eb2efd6928fdf44f03d2c0dfdcedc7b560f5f5aae93d", [:mix], [{:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "21092a223e3c6c144c1a291ab082a7ead32821ba77073b72c68515aa51fef570"},
"protobuf_generate": {:hex, :protobuf_generate, "0.1.3", "57841bc60e2135e190748119d83f78669ee7820c0ad6555ada3cd3cd7df93143", [:mix], [{:protobuf, "~> 0.12", [hex: :protobuf, repo: "hexpm", optional: false]}], "hexpm", "dae4139b00ba77a279251a0ceb5593b1bae745e333b4ce1ab7e81e8e4906016b"},
"rabbit_common": {:hex, :rabbit_common, "3.12.13", "a163432b377411d6033344d5f6a8b12443d67c897c9374b9738cc609cab3161c", [:make, :rebar3], [{:credentials_obfuscation, "3.4.0", [hex: :credentials_obfuscation, repo: "hexpm", optional: false]}, {:recon, "2.5.3", [hex: :recon, repo: "hexpm", optional: false]}, {:thoas, "1.0.0", [hex: :thoas, repo: "hexpm", optional: false]}], "hexpm", "26a400f76976e66efd9cdab29a36dd4b129466d431c4e014aae9d2e36fefef44"},
"ranch": {:hex, :ranch, "1.8.0", "8c7a100a139fd57f17327b6413e4167ac559fbc04ca7448e9be9057311597a1d", [:make, :rebar3], [], "hexpm", "49fbcfd3682fab1f5d109351b61257676da1a2fdbe295904176d5e521a2ddfe5"},
Expand Down
2 changes: 1 addition & 1 deletion spawn_operator/spawn_operator/mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@
"poly1305": {:hex, :poly1305, "1.0.4", "7cdc8961a0a6e00a764835918cdb8ade868044026df8ef5d718708ea6cc06611", [:mix], [{:chacha20, "~> 1.0", [hex: :chacha20, repo: "hexpm", optional: false]}, {:equivalex, "~> 1.0", [hex: :equivalex, repo: "hexpm", optional: false]}], "hexpm", "e14e684661a5195e149b3139db4a1693579d4659d65bba115a307529c47dbc3b"},
"poolboy": {:hex, :poolboy, "1.5.2", "392b007a1693a64540cead79830443abf5762f5d30cf50bc95cb2c1aaafa006b", [:rebar3], [], "hexpm", "dad79704ce5440f3d5a3681c8590b9dc25d1a561e8f5a9c995281012860901e3"},
"postgrex": {:hex, :postgrex, "0.19.3", "a0bda6e3bc75ec07fca5b0a89bffd242ca209a4822a9533e7d3e84ee80707e19", [:mix], [{:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "d31c28053655b78f47f948c85bb1cf86a9c1f8ead346ba1aa0d0df017fa05b61"},
"protobuf": {:hex, :protobuf, "0.12.0", "58c0dfea5f929b96b5aa54ec02b7130688f09d2de5ddc521d696eec2a015b223", [:mix], [{:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "75fa6cbf262062073dd51be44dd0ab940500e18386a6c4e87d5819a58964dc45"},
"protobuf": {:hex, :protobuf, "0.13.0", "7a9d9aeb039f68a81717eb2efd6928fdf44f03d2c0dfdcedc7b560f5f5aae93d", [:mix], [{:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "21092a223e3c6c144c1a291ab082a7ead32821ba77073b72c68515aa51fef570"},
"protobuf_generate": {:hex, :protobuf_generate, "0.1.2", "45b9a9ae8606333cdea993ceaaecd799d206cdfe23348d37c06207eac76cbee6", [:mix], [{:protobuf, "~> 0.12", [hex: :protobuf, repo: "hexpm", optional: false]}], "hexpm", "55b0ff8385703317ca90e1bd30a2ece99e80ae0c73e6ebcfb374e84e57870d61"},
"ranch": {:hex, :ranch, "1.8.0", "8c7a100a139fd57f17327b6413e4167ac559fbc04ca7448e9be9057311597a1d", [:make, :rebar3], [], "hexpm", "49fbcfd3682fab1f5d109351b61257676da1a2fdbe295904176d5e521a2ddfe5"},
"retry": {:hex, :retry, "0.18.0", "dc58ebe22c95aa00bc2459f9e0c5400e6005541cf8539925af0aa027dc860543", [:mix], [], "hexpm", "9483959cc7bf69c9e576d9dfb2b678b71c045d3e6f39ab7c9aa1489df4492d73"},
Expand Down
Loading

0 comments on commit bffcab3

Please sign in to comment.