diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index b829cf13..9a439f44 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -12,7 +12,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - otp: [25. 26] + otp: [25] elixir: [1.15] env: @@ -25,6 +25,59 @@ jobs: - name: Install Protoc uses: arduino/setup-protoc@v3 + - name: Install and Configure Postgres on Port 5232 + run: | + sudo apt-get update + sudo apt-get install -y postgresql postgresql-contrib + sudo service postgresql start + + # Set the password for the 'postgres' user to 'postgres' + sudo -u postgres psql -c "ALTER USER postgres WITH PASSWORD 'postgres';" + + # Allow password authentication (uncomment in pg_hba.conf) + sudo sed -i "s/^#host all all 127.0.0.1\/32 md5/host all all 127.0.0.1\/32 md5/" /etc/postgresql/*/main/pg_hba.conf + sudo sed -i "s/^#host all all ::1\/128 md5/host all all ::1\/128 md5/" /etc/postgresql/*/main/pg_hba.conf + + # Increase max_connections in postgresql.conf + sudo sed -i "s/^#max_connections = [0-9]*/max_connections = 500/" /etc/postgresql/*/main/postgresql.conf + + # Restart PostgreSQL to apply changes + sudo service postgresql restart + + # Create the database + sudo -u postgres psql -c "CREATE DATABASE \"eigr-functions-db\";" + + - name: Shutdown Ubuntu MySQL (SUDO) + run: sudo service mysql stop + + # - name: Set up MariaDB + # uses: getong/mariadb-action@v1.11 + # with: + # host port: 3307 + # container port: 3307 + # character set server: 'utf8' + # collation server: 'utf8_general_ci' + # mariadb version: '10.4.10' + # mysql database: 'eigr-functions-db' + # + # - name: Wait for MariaDB to be Ready + # run: | + # for i in {1..10}; do + # if mysqladmin ping -h127.0.0.1 -P3307 --silent; then + # echo "MariaDB is ready!" + # break + # fi + # echo "Waiting for MariaDB..." + # sleep 5 + # done + # + # - name: Set up MariaDB User + # run: | + # # Create 'admin' user with password 'admin' and grant privileges + # mysql -h127.0.0.1 -P3307 -uroot -e "CREATE USER IF NOT EXISTS 'admin'@'%' IDENTIFIED BY 'admin';" + # mysql -h127.0.0.1 -P3307 -uroot -e "GRANT ALL PRIVILEGES ON *.* TO 'admin'@'%' WITH GRANT OPTION;" + # mysql -h127.0.0.1 -P3307 -uroot -e "FLUSH PRIVILEGES;" + - name: Install NATS with JetStream run: | wget https://github.com/nats-io/nats-server/releases/download/v2.10.0/nats-server-v2.10.0-linux-amd64.tar.gz @@ -66,13 +119,36 @@ jobs: MIX_ENV=test PROXY_DATABASE_TYPE=native SPAWN_SUPERVISORS_STATE_HANDOFF_CONTROLLER=nats SPAWN_USE_INTERNAL_NATS=true SPAWN_PUBSUB_ADAPTER=nats PROXY_CLUSTER_STRATEGY=gossip PROXY_DATABASE_POOL_SIZE=15 PROXY_HTTP_PORT=9005 SPAWN_STATESTORE_KEY=3Jnb0hZiHIzHTOih7t2cTEPEpY98Tu1wvQkPfq/XwqE= elixir --name spawn@127.0.0.1 -S mix test cd ../../ - - name: Run tests spawn_statestores + - name: Run tests spawn_statestores_postgres run: | - cd spawn_statestores/statestores + cd spawn_statestores/statestores_postgres mix deps.get - MIX_ENV=test PROXY_CLUSTER_STRATEGY=gossip PROXY_HTTP_PORT=9005 SPAWN_STATESTORE_KEY=3Jnb0hZiHIzHTOih7t2cTEPEpY98Tu1wvQkPfq/XwqE= elixir --name spawn@127.0.0.1 -S mix test + MIX_ENV=test \ + PROXY_DATABASE_TYPE=postgres \ + PROXY_DATABASE_PORT=5432 \ + PROXY_DATABASE_USERNAME=postgres \ + PROXY_DATABASE_SECRET=postgres \ + PROXY_CLUSTER_STRATEGY=gossip \ + PROXY_HTTP_PORT=9005 \ + SPAWN_STATESTORE_KEY=3Jnb0hZiHIzHTOih7t2cTEPEpY98Tu1wvQkPfq/XwqE= \ + elixir --name spawn@127.0.0.1 -S mix test cd ../../ + # - name: Run tests spawn_statestores_mariadb + # run: | + # cd spawn_statestores/statestores_mariadb + # mix deps.get + # MIX_ENV=test \ + # PROXY_DATABASE_TYPE=mariadb \ + # PROXY_DATABASE_PORT=3307 \ + # PROXY_DATABASE_USERNAME=admin \ + # PROXY_DATABASE_SECRET=admin \ + # PROXY_CLUSTER_STRATEGY=gossip \ + # PROXY_HTTP_PORT=9005 \ + # SPAWN_STATESTORE_KEY=3Jnb0hZiHIzHTOih7t2cTEPEpY98Tu1wvQkPfq/XwqE= \ + # elixir --name spawn@127.0.0.1 -S mix test + # cd ../../ + - name: Run tests statestores_native run: | cd spawn_statestores/statestores_native @@ -100,3 +176,4 @@ jobs: # mix deps.get # MIX_ENV=test PROXY_DATABASE_TYPE=mysql PROXY_CLUSTER_STRATEGY=gossip PROXY_HTTP_PORT=9005 SPAWN_STATESTORE_KEY=3Jnb0hZiHIzHTOih7t2cTEPEpY98Tu1wvQkPfq/XwqE= elixir --name spawn@127.0.0.1 -S mix test # cd ../../ + diff --git a/lib/actors/actor/entity/invocation.ex b/lib/actors/actor/entity/invocation.ex index 60e1430f..4d241553 100644 --- a/lib/actors/actor/entity/invocation.ex +++ b/lib/actors/actor/entity/invocation.ex @@ -38,6 +38,7 @@ defmodule Actors.Actor.Entity.Invocation do } alias Spawn.Utils.Nats + alias Statestores.Manager.StateManager import Spawn.Utils.AnySerializer, only: [any_pack!: 1, any_unpack!: 2, normalize_package_name: 1] @@ -474,8 +475,7 @@ defmodule Actors.Actor.Entity.Invocation do |> normalize_package_name() {:ok, results} = - Statestores.Projection.Query.DynamicTableDataHandler.query( - load_projection_adapter(), + StateManager.projection_query( state_type, view.query, any_unpack!(request.payload |> elem(1), view.input_type), @@ -637,8 +637,7 @@ defmodule Actors.Actor.Entity.Invocation do Macro.underscore(id.parent) end - Statestores.Projection.Query.DynamicTableDataHandler.upsert( - load_projection_adapter(), + StateManager.projection_upsert( state_type, table_name, any_unpack!(response.updated_context.state, state_type) diff --git a/lib/actors/actor/entity/lifecycle.ex b/lib/actors/actor/entity/lifecycle.ex index f71be2b8..080ef4b7 100644 --- a/lib/actors/actor/entity/lifecycle.ex +++ b/lib/actors/actor/entity/lifecycle.ex @@ -321,12 +321,7 @@ defmodule Actors.Actor.Entity.Lifecycle do Macro.underscore(actor.id.parent) end - :ok = - DynamicTableCreator.create_or_update_table( - load_projection_adapter(), - state_type, - table_name - ) + :ok = StateManager.projection_create_or_update_table(state_type, table_name) StreamInitiator.init_projection_stream(actor) end diff --git a/lib/actors/actor/state_manager.ex b/lib/actors/actor/state_manager.ex index 950612cc..1d867590 100644 --- a/lib/actors/actor/state_manager.ex +++ b/lib/actors/actor/state_manager.ex @@ -11,15 +11,24 @@ if Code.ensure_loaded?(Statestores.Supervisor) do alias Statestores.Schemas.Snapshot alias Statestores.Manager.StateManager, as: StateStoreManager + def projection_create_or_update_table(projection_type, table_name) do + StateStoreManager.projection_create_or_update_table(projection_type, table_name) + end + + def projection_upsert(projection_type, table_name, data) do + StateStoreManager.projection_upsert(projection_type, table_name, data) + end + + def projection_query(projection_type, query, params, opts) do + StateStoreManager.projection_query(projection_type, query, params, opts) + end + def is_new?(_old_hash, new_state) when is_nil(new_state), do: false def is_new?(old_hash, new_state) do with bytes_from_state <- Any.encode(new_state), hash <- :crypto.hash(:sha256, bytes_from_state) do old_hash != hash - else - _ -> - false end catch _kind, error -> @@ -78,69 +87,6 @@ if Code.ensure_loaded?(Statestores.Supervisor) do {:error, error} end - @spec load_all(ActorId.t()) :: {:ok, term()} | :not_found | {:error, term()} - def load_all(%ActorId{} = actor_id) do - key = generate_key(actor_id) - - snapshots = StateStoreManager.load_all(key) - - results = - Enum.map(snapshots, fn %Snapshot{ - status: status, - node: node, - revision: rev, - tags: tags, - data_type: type, - data: data - } = _event -> - revision = if is_nil(rev), do: 0, else: rev - - {%ActorState{tags: tags, state: %Google.Protobuf.Any{type_url: type, value: data}}, - revision, status, node} - end) - - if Enum.empty?(results) do - :not_found - else - {:ok, results} - end - catch - _kind, error -> - {:error, error} - end - - @spec load_by_interval(ActorId.t(), String.t(), String.t()) :: - {:ok, term()} | :not_found | {:error, term()} - def load_by_interval(%ActorId{} = actor_id, time_start, time_end) do - key = generate_key(actor_id) - - snapshots = StateStoreManager.load_by_interval(key, time_start, time_end) - - results = - Enum.map(snapshots, fn %Snapshot{ - status: status, - node: node, - revision: rev, - tags: tags, - data_type: type, - data: data - } = _event -> - revision = if is_nil(rev), do: 0, else: rev - - {%ActorState{tags: tags, state: %Google.Protobuf.Any{type_url: type, value: data}}, - revision, status, node} - end) - - if Enum.empty?(results) do - :not_found - else - {:ok, results} - end - catch - _kind, error -> - {:error, error} - end - @spec save(ActorId.t(), Spawn.Actors.ActorState.t(), Keyword.t()) :: {:ok, Spawn.Actors.ActorState.t()} | {:error, any(), Spawn.Actors.ActorState.t()} @@ -285,10 +231,14 @@ else def is_new?(_old_hash, _new_state), do: raise(@not_loaded_message) def load(_actor_id), do: raise(@not_loaded_message) def load(_actor_id, _), do: raise(@not_loaded_message) - def load_all(_), do: raise(@not_loaded_message) - def load_by_interval(_, _, _), do: raise(@not_loaded_message) def save(_actor_id, _state), do: raise(@not_loaded_message) def save(_actor_id, _state, _opts), do: raise(@not_loaded_message) def save_async(_actor_id, _state, _timeout), do: raise(@not_loaded_message) + + def projection_create_or_update_table(_projection_type, _table_name), + do: raise(@not_loaded_message) + + def projection_upsert(_projection_type, _table_name, _data), do: raise(@not_loaded_message) + def projection_query(_projection_type, _query, _params, _opts), do: raise(@not_loaded_message) end end diff --git a/lib/actors/actor/state_manager_behaviour.ex b/lib/actors/actor/state_manager_behaviour.ex index 7319d252..44a68f9a 100644 --- a/lib/actors/actor/state_manager_behaviour.ex +++ b/lib/actors/actor/state_manager_behaviour.ex @@ -4,17 +4,19 @@ defmodule Actors.Actor.StateManager.Behaviour do to be saved to persistent storage using database drivers. """ + @type projection_type :: module() + @type table_name :: String.t() + @type data :: struct() + @type query :: String.t() + @type params :: struct() + @type opts :: Keyword.t() + @callback is_new?(String.t(), any()) :: {:error, term()} | boolean() @callback load(String.t()) :: {:ok, term()} | {:not_found, %{}} | {:error, term()} @callback load(String.t(), number()) :: {:ok, term()} | {:not_found, %{}} | {:error, term()} - @callback load_all(String.t()) :: {:ok, term()} | {:not_found, %{}} | {:error, term()} - - @callback load_by_interval(String.t(), String.t(), String.t()) :: - {:ok, term()} | {:not_found, %{}} | {:error, term()} - @callback save(String.t(), term(), Keyword.t()) :: {:ok, term(), String.t()} | {:error, term(), term(), term()} @@ -24,4 +26,11 @@ defmodule Actors.Actor.StateManager.Behaviour do {:ok, term(), String.t()} | {:error, term(), term(), term()} | {:error, term(), term()} + + @callback projection_create_or_update_table(projection_type(), table_name()) :: :ok + + @callback projection_upsert(projection_type(), table_name(), data()) :: :ok + + @callback projection_query(projection_type(), query(), params(), opts()) :: + {:error, term()} | {:ok, data()} end diff --git a/spawn_activators/activator/mix.lock b/spawn_activators/activator/mix.lock index 83bd30db..d9a04018 100644 --- a/spawn_activators/activator/mix.lock +++ b/spawn_activators/activator/mix.lock @@ -92,7 +92,7 @@ "poly1305": {:hex, :poly1305, "1.0.4", "7cdc8961a0a6e00a764835918cdb8ade868044026df8ef5d718708ea6cc06611", [:mix], [{:chacha20, "~> 1.0", [hex: :chacha20, repo: "hexpm", optional: false]}, {:equivalex, "~> 1.0", [hex: :equivalex, repo: "hexpm", optional: false]}], "hexpm", "e14e684661a5195e149b3139db4a1693579d4659d65bba115a307529c47dbc3b"}, "poolboy": {:hex, :poolboy, "1.5.2", "392b007a1693a64540cead79830443abf5762f5d30cf50bc95cb2c1aaafa006b", [:rebar3], [], "hexpm", "dad79704ce5440f3d5a3681c8590b9dc25d1a561e8f5a9c995281012860901e3"}, "postgrex": {:hex, :postgrex, "0.19.3", "a0bda6e3bc75ec07fca5b0a89bffd242ca209a4822a9533e7d3e84ee80707e19", [:mix], [{:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "d31c28053655b78f47f948c85bb1cf86a9c1f8ead346ba1aa0d0df017fa05b61"}, - "protobuf": {:hex, :protobuf, "0.12.0", "58c0dfea5f929b96b5aa54ec02b7130688f09d2de5ddc521d696eec2a015b223", [:mix], [{:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "75fa6cbf262062073dd51be44dd0ab940500e18386a6c4e87d5819a58964dc45"}, + "protobuf": {:hex, :protobuf, "0.13.0", "7a9d9aeb039f68a81717eb2efd6928fdf44f03d2c0dfdcedc7b560f5f5aae93d", [:mix], [{:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "21092a223e3c6c144c1a291ab082a7ead32821ba77073b72c68515aa51fef570"}, "protobuf_generate": {:hex, :protobuf_generate, "0.1.3", "57841bc60e2135e190748119d83f78669ee7820c0ad6555ada3cd3cd7df93143", [:mix], [{:protobuf, "~> 0.12", [hex: :protobuf, repo: "hexpm", optional: false]}], "hexpm", "dae4139b00ba77a279251a0ceb5593b1bae745e333b4ce1ab7e81e8e4906016b"}, "rabbit_common": {:hex, :rabbit_common, "3.12.13", "a163432b377411d6033344d5f6a8b12443d67c897c9374b9738cc609cab3161c", [:make, :rebar3], [{:credentials_obfuscation, "3.4.0", [hex: :credentials_obfuscation, repo: "hexpm", optional: false]}, {:recon, "2.5.3", [hex: :recon, repo: "hexpm", optional: false]}, {:thoas, "1.0.0", [hex: :thoas, repo: "hexpm", optional: false]}], "hexpm", "26a400f76976e66efd9cdab29a36dd4b129466d431c4e014aae9d2e36fefef44"}, "ranch": {:hex, :ranch, "1.8.0", "8c7a100a139fd57f17327b6413e4167ac559fbc04ca7448e9be9057311597a1d", [:make, :rebar3], [], "hexpm", "49fbcfd3682fab1f5d109351b61257676da1a2fdbe295904176d5e521a2ddfe5"}, diff --git a/spawn_operator/spawn_operator/mix.lock b/spawn_operator/spawn_operator/mix.lock index 8d56359c..f9b6237f 100644 --- a/spawn_operator/spawn_operator/mix.lock +++ b/spawn_operator/spawn_operator/mix.lock @@ -77,7 +77,7 @@ "poly1305": {:hex, :poly1305, "1.0.4", "7cdc8961a0a6e00a764835918cdb8ade868044026df8ef5d718708ea6cc06611", [:mix], [{:chacha20, "~> 1.0", [hex: :chacha20, repo: "hexpm", optional: false]}, {:equivalex, "~> 1.0", [hex: :equivalex, repo: "hexpm", optional: false]}], "hexpm", "e14e684661a5195e149b3139db4a1693579d4659d65bba115a307529c47dbc3b"}, "poolboy": {:hex, :poolboy, "1.5.2", "392b007a1693a64540cead79830443abf5762f5d30cf50bc95cb2c1aaafa006b", [:rebar3], [], "hexpm", "dad79704ce5440f3d5a3681c8590b9dc25d1a561e8f5a9c995281012860901e3"}, "postgrex": {:hex, :postgrex, "0.19.3", "a0bda6e3bc75ec07fca5b0a89bffd242ca209a4822a9533e7d3e84ee80707e19", [:mix], [{:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "d31c28053655b78f47f948c85bb1cf86a9c1f8ead346ba1aa0d0df017fa05b61"}, - "protobuf": {:hex, :protobuf, "0.12.0", "58c0dfea5f929b96b5aa54ec02b7130688f09d2de5ddc521d696eec2a015b223", [:mix], [{:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "75fa6cbf262062073dd51be44dd0ab940500e18386a6c4e87d5819a58964dc45"}, + "protobuf": {:hex, :protobuf, "0.13.0", "7a9d9aeb039f68a81717eb2efd6928fdf44f03d2c0dfdcedc7b560f5f5aae93d", [:mix], [{:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "21092a223e3c6c144c1a291ab082a7ead32821ba77073b72c68515aa51fef570"}, "protobuf_generate": {:hex, :protobuf_generate, "0.1.2", "45b9a9ae8606333cdea993ceaaecd799d206cdfe23348d37c06207eac76cbee6", [:mix], [{:protobuf, "~> 0.12", [hex: :protobuf, repo: "hexpm", optional: false]}], "hexpm", "55b0ff8385703317ca90e1bd30a2ece99e80ae0c73e6ebcfb374e84e57870d61"}, "ranch": {:hex, :ranch, "1.8.0", "8c7a100a139fd57f17327b6413e4167ac559fbc04ca7448e9be9057311597a1d", [:make, :rebar3], [], "hexpm", "49fbcfd3682fab1f5d109351b61257676da1a2fdbe295904176d5e521a2ddfe5"}, "retry": {:hex, :retry, "0.18.0", "dc58ebe22c95aa00bc2459f9e0c5400e6005541cf8539925af0aa027dc860543", [:mix], [], "hexpm", "9483959cc7bf69c9e576d9dfb2b678b71c045d3e6f39ab7c9aa1489df4492d73"}, diff --git a/spawn_statestores/statestores/lib/google/protobuf/descriptor.pb.ex b/spawn_statestores/statestores/lib/google/protobuf/descriptor.pb.ex index d08d3e2a..b9999bb4 100644 --- a/spawn_statestores/statestores/lib/google/protobuf/descriptor.pb.ex +++ b/spawn_statestores/statestores/lib/google/protobuf/descriptor.pb.ex @@ -154,7 +154,7 @@ defmodule Google.Protobuf.ExtensionRangeOptions do field :uninterpreted_option, 999, repeated: true, type: Google.Protobuf.UninterpretedOption - extensions [{1000, Protobuf.Extension.max()}] + extensions([{1000, Protobuf.Extension.max()}]) end defmodule Google.Protobuf.FieldDescriptorProto do @@ -275,7 +275,7 @@ defmodule Google.Protobuf.FileOptions do field :ruby_package, 45, optional: true, type: :string field :uninterpreted_option, 999, repeated: true, type: Google.Protobuf.UninterpretedOption - extensions [{1000, Protobuf.Extension.max()}] + extensions([{1000, Protobuf.Extension.max()}]) end defmodule Google.Protobuf.MessageOptions do @@ -290,7 +290,7 @@ defmodule Google.Protobuf.MessageOptions do field :deprecated_legacy_json_field_conflicts, 11, optional: true, type: :bool, deprecated: true field :uninterpreted_option, 999, repeated: true, type: Google.Protobuf.UninterpretedOption - extensions [{1000, Protobuf.Extension.max()}] + extensions([{1000, Protobuf.Extension.max()}]) end defmodule Google.Protobuf.FieldOptions do @@ -319,7 +319,7 @@ defmodule Google.Protobuf.FieldOptions do field :debug_redact, 16, optional: true, type: :bool, default: false field :uninterpreted_option, 999, repeated: true, type: Google.Protobuf.UninterpretedOption - extensions [{1000, Protobuf.Extension.max()}] + extensions([{1000, Protobuf.Extension.max()}]) end defmodule Google.Protobuf.OneofOptions do @@ -329,7 +329,7 @@ defmodule Google.Protobuf.OneofOptions do field :uninterpreted_option, 999, repeated: true, type: Google.Protobuf.UninterpretedOption - extensions [{1000, Protobuf.Extension.max()}] + extensions([{1000, Protobuf.Extension.max()}]) end defmodule Google.Protobuf.EnumOptions do @@ -342,7 +342,7 @@ defmodule Google.Protobuf.EnumOptions do field :deprecated_legacy_json_field_conflicts, 6, optional: true, type: :bool, deprecated: true field :uninterpreted_option, 999, repeated: true, type: Google.Protobuf.UninterpretedOption - extensions [{1000, Protobuf.Extension.max()}] + extensions([{1000, Protobuf.Extension.max()}]) end defmodule Google.Protobuf.EnumValueOptions do @@ -353,7 +353,7 @@ defmodule Google.Protobuf.EnumValueOptions do field :deprecated, 1, optional: true, type: :bool, default: false field :uninterpreted_option, 999, repeated: true, type: Google.Protobuf.UninterpretedOption - extensions [{1000, Protobuf.Extension.max()}] + extensions([{1000, Protobuf.Extension.max()}]) end defmodule Google.Protobuf.ServiceOptions do @@ -364,7 +364,7 @@ defmodule Google.Protobuf.ServiceOptions do field :deprecated, 33, optional: true, type: :bool, default: false field :uninterpreted_option, 999, repeated: true, type: Google.Protobuf.UninterpretedOption - extensions [{1000, Protobuf.Extension.max()}] + extensions([{1000, Protobuf.Extension.max()}]) end defmodule Google.Protobuf.MethodOptions do @@ -382,7 +382,7 @@ defmodule Google.Protobuf.MethodOptions do field :uninterpreted_option, 999, repeated: true, type: Google.Protobuf.UninterpretedOption - extensions [{1000, Protobuf.Extension.max()}] + extensions([{1000, Protobuf.Extension.max()}]) end defmodule Google.Protobuf.UninterpretedOption.NamePart do diff --git a/spawn_statestores/statestores/lib/google/protobuf/struct.pb.ex b/spawn_statestores/statestores/lib/google/protobuf/struct.pb.ex index e74f96ca..e47bd2ea 100644 --- a/spawn_statestores/statestores/lib/google/protobuf/struct.pb.ex +++ b/spawn_statestores/statestores/lib/google/protobuf/struct.pb.ex @@ -28,7 +28,7 @@ defmodule Google.Protobuf.Value do use Protobuf, protoc_gen_elixir_version: "0.13.0", syntax: :proto3 - oneof :kind, 0 + oneof(:kind, 0) field :null_value, 1, type: Google.Protobuf.NullValue, diff --git a/spawn_statestores/statestores/lib/statestores/adapters/projection_behaviour.ex b/spawn_statestores/statestores/lib/statestores/adapters/projection_behaviour.ex index 84776ef3..8162ce6d 100644 --- a/spawn_statestores/statestores/lib/statestores/adapters/projection_behaviour.ex +++ b/spawn_statestores/statestores/lib/statestores/adapters/projection_behaviour.ex @@ -1,82 +1,21 @@ defmodule Statestores.Adapters.ProjectionBehaviour do @moduledoc """ - Defines the default behavior for each Statestore Provider. + Defines the default behavior for each Projection Provider. """ - alias Scrivener.Page - alias Statestores.Schemas.Projection - @type metadata_key :: String.t() + @type projection_type :: module() + @type table_name :: String.t() + @type data :: struct() + @type query :: String.t() + @type params :: struct() + @type opts :: Keyword.t() - @type metadata_value :: String.t() + @callback create_or_update_table(projection_type(), table_name()) :: :ok - @type page :: integer() + @callback upsert(projection_type(), table_name(), data()) :: :ok - @type page_size :: integer() - - @type page_data :: Page.t() - - @type projection :: Projection.t() - - @type projections :: list(Projection.t()) - - @type projection_name :: String.t() - - @type projection_id :: String.t() - - @type revision :: integer() - - @type time_start :: String.t() - - @type time_end :: String.t() - - @callback create_table(projection_name()) :: {:ok, String.t()} - - @callback get_last(projection_name()) :: {:error, any} | {:ok, projection()} - - @callback get_last_by_projection_id(projection_name(), projection_id()) :: - {:error, any} | {:ok, projection()} - - @callback get_all(projection_name(), page(), page_size()) :: {:error, any} | {:ok, page_data()} - - @callback get_all_by_projection_id(projection_name(), projection_id(), page(), page_size()) :: - {:error, any} | {:ok, page_data()} - - @callback get_by_interval( - projection_name(), - time_start(), - time_end(), - page(), - page_size() - ) :: {:error, any} | {:ok, page_data()} - - @callback get_by_projection_id_and_interval( - projection_name(), - projection_id(), - time_start(), - time_end(), - page(), - page_size() - ) :: {:error, any} | {:ok, page_data()} - - @callback search_by_metadata( - projection_name(), - metadata_key(), - metadata_value(), - page(), - page_size() - ) :: - {:error, any} | {:ok, page_data()} - - @callback search_by_projection_id_and_metadata( - projection_name(), - projection_id(), - metadata_key(), - metadata_value(), - page(), - page_size() - ) :: {:error, any} | {:ok, page_data()} - - @callback save(projection()) :: {:error, any} | {:ok, projection()} + @callback query(projection_type(), query(), params(), opts()) :: + {:error, term()} | {:ok, data()} @callback default_port :: <<_::32>> diff --git a/spawn_statestores/statestores/lib/statestores/manager/manager.ex b/spawn_statestores/statestores/lib/statestores/manager/manager.ex index 79855cad..6d2bcea4 100644 --- a/spawn_statestores/statestores/lib/statestores/manager/manager.ex +++ b/spawn_statestores/statestores/lib/statestores/manager/manager.ex @@ -3,16 +3,20 @@ defmodule Statestores.Manager.StateManager do This module must be used by the proxy to interact with databases regardless of which provider is used. """ - import Statestores.Util, only: [load_snapshot_adapter: 0] + import Statestores.Util, only: [load_snapshot_adapter: 0, load_projection_adapter: 0] def load(id), do: load_snapshot_adapter().get_by_key(id) def load(id, revision), do: load_snapshot_adapter().get_by_key_and_revision(id, revision) - def load_all(id), do: load_snapshot_adapter().get_all_snapshots_by_key(id) + def save(event), do: load_snapshot_adapter().save(event) - def load_by_interval(id, time_start, time_end), - do: load_snapshot_adapter().get_snapshots_by_interval(id, time_start, time_end) + def projection_create_or_update_table(projection_type, table_name), + do: load_projection_adapter().create_or_update_table(projection_type, table_name) - def save(event), do: load_snapshot_adapter().save(event) + def projection_upsert(projection_type, table_name, data), + do: load_projection_adapter().upsert(projection_type, table_name, data) + + def projection_query(projection_type, query, params, opts), + do: load_projection_adapter().query(projection_type, query, params, opts) end diff --git a/spawn_statestores/statestores/mix.exs b/spawn_statestores/statestores/mix.exs index 0ab36728..ba49aee8 100644 --- a/spawn_statestores/statestores/mix.exs +++ b/spawn_statestores/statestores/mix.exs @@ -58,9 +58,6 @@ defmodule Statestores.MixProject do {:castore, "~> 1.0"}, {:cloak_ecto, "~> 1.2"}, {:ecto_sql, "~> 3.12"}, - {:scrivener_ecto, "~> 3.0"}, - {:nimble_parsec, "~> 1.4"}, - {:sql_parser, "~> 0.2"}, {:jason, "~> 1.3"}, {:protobuf, "~> 0.13"}, {:credo, "~> 1.6", only: [:dev, :test], runtime: false}, diff --git a/spawn_statestores/statestores_mariadb/lib/statestores/adapters/mariadb_projection_adapter.ex b/spawn_statestores/statestores_mariadb/lib/statestores/adapters/mariadb_projection_adapter.ex index b0535c7a..ce12756f 100644 --- a/spawn_statestores/statestores_mariadb/lib/statestores/adapters/mariadb_projection_adapter.ex +++ b/spawn_statestores/statestores_mariadb/lib/statestores/adapters/mariadb_projection_adapter.ex @@ -3,303 +3,422 @@ defmodule Statestores.Adapters.MariaDBProjectionAdapter do Implements the ProjectionBehaviour for MariaDB, with dynamic table name support. """ use Statestores.Adapters.ProjectionBehaviour - use Ecto.Repo, otp_app: :spawn_statestores, adapter: Ecto.Adapters.MyXQL - use Scrivener, page_size: 50 - alias Statestores.Schemas.{Projection, ValueObjectSchema} + use Ecto.Repo, + otp_app: :spawn_statestores, + adapter: Ecto.Adapters.MyXQL - import Ecto.Query - import Statestores.Util, only: [normalize_table_name: 1] + alias Ecto.Adapters.SQL - @impl true - def create_table(nil), do: {:error, "Projection name cannot be nil."} - - def create_table(projection_name) do - table_name = normalize_table_name(projection_name) - - query = """ - CREATE TABLE IF NOT EXISTS #{table_name} ( - id VARCHAR(255) PRIMARY KEY, - projection_id VARCHAR(255), - projection_name VARCHAR(255), - system VARCHAR(255), - metadata JSON, - data_type VARCHAR(255), - data BLOB, - inserted_at DATETIME, - updated_at DATETIME - ); - """ + @type_map %{ + :TYPE_INT32 => "INT", + :TYPE_INT64 => "BIGINT", + :TYPE_STRING => "TEXT", + :TYPE_BOOL => "BOOLEAN", + :TYPE_FLOAT => "FLOAT", + :TYPE_DOUBLE => "DOUBLE", + :TYPE_BYTES => "LONGBLOB", + :TYPE_MESSAGE => "JSON", + :TYPE_ENUM => "TEXT" + } - case Ecto.Adapters.SQL.query(__MODULE__, query) do - {:ok, _result} -> - {:ok, "Table #{table_name} created or already exists."} + @doc """ + Dynamically creates or updates a table in the PostgreSQL database from a Protobuf module. - {:error, ex} -> - {:error, "Error during creation of table #{table_name}.: #{inspect(ex)}"} - end - end + ## Parameters + + - `repo`: The module from the Ecto repository. + - `protobuf_module`: The Elixir module generated from a Protobuf file. + - `table_name`: Name of the table to be created or updated in the database. + ## Example + + iex> create_or_update_table(MyProtobufModule, "my_table") + + """ @impl true - def get_last(nil), do: {:error, "No record found"} + def create_or_update_table(protobuf_module, table_name) do + repo = __MODULE__ + descriptor = protobuf_module.descriptor() + fields = descriptor.field - def get_last(projection_name) do - table_name = normalize_table_name(projection_name) + # Create table if it does not exist + create_table_if_not_exists(repo, table_name, fields) - query = - from(p in {table_name, Projection}, - order_by: [desc: p.updated_at], - limit: 1 - ) + # Update table to add missing columns + update_table_columns(repo, table_name, fields) - fetch_single_record(query) - end + # Add indexes for searchable columns + create_indexes(repo, table_name, fields) - @impl true - def get_last_by_projection_id(nil, _projection_id), do: {:error, "No record found"} - def get_last_by_projection_id(_projection_name, nil), do: {:error, "No record found"} + :ok + end - def get_last_by_projection_id(projection_name, projection_id) do - table_name = normalize_table_name(projection_name) + defp create_table_if_not_exists(repo, table_name, fields) do + columns_sql = + fields + |> Enum.map(&field_to_column_sql/1) + |> Enum.reject(&is_nil/1) + |> Enum.join(", ") + + timestamp_columns = + "created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP" + + primary_key_column = + fields + |> Enum.find(fn field -> + options = field.options || %{} + + actor_id_extension = + options + |> Map.get(:__pb_extensions__, %{}) + |> Map.get({Spawn.Actors.PbExtension, :actor_id}) + + actor_id_extension == true + end) + |> case do + nil -> + "id BIGINT AUTO_INCREMENT PRIMARY KEY" + + field -> + column_type = + if field.label == :LABEL_REPEATED do + Map.get(@type_map, :TYPE_MESSAGE) + else + Map.get(@type_map, field.type) + end + + length_spec = + case column_type do + # Limit index length to the first 255 characters + "TEXT" -> "(255)" + # Limit index length to the first 255 bytes + "BLOB" -> "(255)" + _ -> "" + end + + "PRIMARY KEY (#{field.name}#{length_spec})" + end + + create_table_sql = + [ + "CREATE TABLE IF NOT EXISTS #{table_name} (", + columns_sql, + ", #{timestamp_columns}", + ", #{primary_key_column}", + ")" + ] + |> Enum.reject(&is_nil/1) + |> Enum.join(" ") + + SQL.query!(repo, create_table_sql) + end - query = - from(p in {table_name, Projection}, - where: p.projection_id == ^projection_id, - order_by: [desc: p.updated_at], - limit: 1 + defp update_table_columns(repo, table_name, fields) do + existing_columns = + SQL.query!( + repo, + "SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = ?", + [table_name] ) - - fetch_single_record(query) + |> Map.get(:rows) + |> List.flatten() + + fields + |> Enum.reject(fn field -> validate_column_name(field.name) in existing_columns end) + |> Enum.each(fn field -> + column_sql = field_to_column_sql(field) + alter_table_sql = "ALTER TABLE #{table_name} ADD COLUMN #{column_sql}" + SQL.query!(repo, alter_table_sql) + end) end - @impl true - def get_all(nil, _page, _page_size), do: {:error, "No records found"} + defp field_to_column_sql(%{name: "created_at"}), do: nil + defp field_to_column_sql(%{name: "updated_at"}), do: nil - def get_all(projection_name, page \\ 1, page_size \\ 50) do - table_name = normalize_table_name(projection_name) + defp field_to_column_sql(%{name: name, type: type} = field) do + column_name = validate_column_name(name) + nullable = if field.label == :LABEL_OPTIONAL, do: "NULL", else: "NOT NULL" - query = - from(p in {table_name, Projection}, - order_by: [asc: p.inserted_at] - ) + column_type = + if field.label == :LABEL_REPEATED do + Map.get(@type_map, :TYPE_MESSAGE) + else + Map.get(@type_map, type) + end - paginate_query(query, page, page_size) + "#{column_name} #{column_type} #{nullable}" end - @impl true - def get_all_by_projection_id(nil, _projection_id, _page, _page_size), - do: {:error, "No records found"} + defp create_indexes(repo, table_name, fields) do + fields + |> Enum.reject(fn field -> is_nil(field.options) end) + |> Enum.filter(fn field -> + Map.get(field.options.__pb_extensions__, {Spawn.Actors.PbExtension, :searchable}) == true + end) + |> Enum.each(fn field -> + # Determine if the column is TEXT or BLOB + column_type = Map.get(@type_map, field.type) + + length_spec = + case column_type do + # Limit index length to the first 255 characters + "TEXT" -> "(255)" + # Limit index length to the first 255 bytes + "BLOB" -> "(255)" + _ -> "" + end + + # Create the index with the length spec if required + index_sql = + "CREATE INDEX IF NOT EXISTS idx_#{table_name}_#{validate_column_name(field.name)} " <> + "ON #{table_name} (#{validate_column_name(field.name)}#{length_spec})" + + SQL.query!(repo, index_sql) + end) + end - def get_all_by_projection_id(_projection_name, nil, _page, _page_size), - do: {:error, "No records found"} + defp validate_column_name(name) do + name + |> Macro.underscore() + |> String.replace(~r/[^a-z0-9_]/, "") + end - def get_all_by_projection_id(projection_name, projection_id, page \\ 1, page_size \\ 50) do - table_name = normalize_table_name(projection_name) + @doc """ + Performs a raw query and returns the results. - query = - from(p in {table_name, Projection}, - where: p.projection_id == ^projection_id, - order_by: [asc: p.inserted_at] - ) + ## Parameters - paginate_query(query, page, page_size) - end + - `repo`: The Ecto repository module. + - `query`: The raw SQL query string with named parameters (e.g., :id). + - `params`: A map of parameter values. + + Returns the result rows as a list of maps. + ## Examples + iex> results = query("SELECT age, metadata FROM example WHERE id = :id", %{id: "value"}) + {:ok, [%{age: 30, metadata: "example data"}]} + """ @impl true - def get_by_interval(nil, _time_start, _time_end, _page, _page_size), - do: {:error, "No records found"} + def query(protobuf_module, query, params, opts) do + repo = __MODULE__ - def get_by_interval(_projection_name, nil, _time_end, _page, _page_size), - do: {:error, "No records found"} + case validate_params(query, params) do + {:error, message} -> + {:error, message} - def get_by_interval(_projection_name, _time_start, nil, _page, _page_size), - do: {:error, "No records found"} + :ok -> + {query, values} = build_params_for_query(params, query) - def get_by_interval(projection_name, time_start, time_end, page \\ 1, page_size \\ 50) do - table_name = normalize_table_name(projection_name) + page = opts[:page] || 1 + page_size = opts[:page_size] || 10 - query = - from(p in {table_name, Projection}, - where: p.inserted_at >= ^time_start and p.inserted_at <= ^time_end, - order_by: [asc: p.inserted_at] - ) + # Append LIMIT and OFFSET dynamically + offset = (page - 1) * page_size + + {query, values} = + if has_outer_limit_or_offset?(query) do + # If already present, don't modify the query + {query, values} + else + query = """ + #{query} + LIMIT ? + OFFSET ? + """ + + values = values ++ [page_size, offset] + + {query, values} + end + + result = SQL.query!(repo, query, values) + + columns = result.columns + + results = + Enum.map(result.rows, fn row -> + map_value = Enum.zip(columns, row) |> Enum.into(%{}) - paginate_query(query, page, page_size) + {:ok, decoded} = from_decoded(protobuf_module, map_value) + + decoded + end) + + {:ok, results} + end end - @impl true - def get_by_projection_id_and_interval( - nil, - _projection_id, - _time_start, - _time_end, - _page, - _page_size - ), - do: {:error, "No records found"} - - def get_by_projection_id_and_interval( - _projection_name, - nil, - _time_start, - _time_end, - _page, - _page_size - ), - do: {:error, "No records found"} - - def get_by_projection_id_and_interval( - _projection_name, - _projection_id, - nil, - _time_end, - _page, - _page_size - ), - do: {:error, "No records found"} - - def get_by_projection_id_and_interval( - _projection_name, - _projection_id, - _time_start, - nil, - _page, - _page_size - ), - do: {:error, "No records found"} - - def get_by_projection_id_and_interval( - projection_name, - projection_id, - time_start, - time_end, - page \\ 1, - page_size \\ 50 - ) do - table_name = normalize_table_name(projection_name) - - query = - from(p in {table_name, Projection}, - where: - p.projection_id == ^projection_id and p.inserted_at >= ^time_start and - p.inserted_at <= ^time_end, - order_by: [asc: p.inserted_at] - ) + defp from_decoded(module, data) when is_map(data) and is_atom(module) do + data + |> decode_fields(module) + |> Protobuf.JSON.from_decoded(module) + end + + defp decode_fields(existing_map, module) do + descriptor = module.descriptor() + + Map.new(existing_map, fn {key, value} -> + field = Enum.find(descriptor.field, &(&1.name == key)) + + cond do + !field -> + {key, to_proto_decoded(value)} + + field.type == :TYPE_BOOL -> + value = value == 1 - paginate_query(query, page, page_size) + {key, to_proto_decoded(value)} + + (field.type == :TYPE_MESSAGE or field.label == :LABEL_REPEATED) and is_bitstring(value) -> + value = Jason.decode!(value) + + {key, to_proto_decoded(value)} + + true -> + {key, to_proto_decoded(value)} + end + end) end - @impl true - def search_by_metadata( - projection_name, - metadata_key, - metadata_value, - page \\ 1, - page_size \\ 50 - ) do - table_name = normalize_table_name(projection_name) - key = "$.#{metadata_key}" - - query = - from(p in {table_name, Projection}, - where: - fragment("JSON_UNQUOTE(JSON_EXTRACT(?, ?)) = ?", p.metadata, ^key, ^metadata_value), - order_by: [asc: p.inserted_at] - ) + defp to_proto_decoded(value) when is_boolean(value) do + value + end - paginate_query(query, page, page_size) + defp to_proto_decoded(%NaiveDateTime{} = value) do + DateTime.from_naive!(value, "Etc/UTC") + |> to_proto_decoded() end - @impl true - def search_by_projection_id_and_metadata( - projection_name, - projection_id, - metadata_key, - metadata_value, - page \\ 1, - page_size \\ 50 - ) do - table_name = normalize_table_name(projection_name) - key = "$.#{metadata_key}" - - query = - from(p in {table_name, Projection}, - where: - p.projection_id == ^projection_id and - fragment("JSON_UNQUOTE(JSON_EXTRACT(?, ?)) = ?", p.metadata, ^key, ^metadata_value), - order_by: [asc: p.inserted_at] - ) + defp to_proto_decoded(%DateTime{} = value) do + DateTime.to_iso8601(value) + end - paginate_query(query, page, page_size) + defp to_proto_decoded(value) when is_atom(value) do + Atom.to_string(value) end - @impl true - def save(%Projection{} = projection) do - table_name = normalize_table_name(projection.projection_name) - record = ValueObjectSchema.to_map(projection) - {:ok, data} = Statestores.Vault.encrypt(record.data) - - query = """ - INSERT INTO #{table_name} - (id, projection_id, projection_name, system, metadata, data_type, data, inserted_at, updated_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) - ON DUPLICATE KEY UPDATE - projection_id = VALUES(projection_id), - projection_name = VALUES(projection_name), - system = VALUES(system), - metadata = VALUES(metadata), - data_type = VALUES(data_type), - data = VALUES(data), - inserted_at = VALUES(inserted_at), - updated_at = VALUES(updated_at) - """ + defp to_proto_decoded(""), do: nil + + defp to_proto_decoded(value) do + value + end - bindings = [ - record.id, - record.projection_id, - record.projection_name, - record.system, - to_json(record.metadata), - record.data_type, - data, - record.inserted_at, - record.updated_at - ] - - case Ecto.Adapters.SQL.query(__MODULE__, query, bindings) do - {:ok, _result} -> - {:ok, projection} - - {:error, reason} -> - {:error, reason} + defp has_outer_limit_or_offset?(query) do + query + |> String.split(~r/(\(|\)|\bLIMIT\b|\bOFFSET\b)/i, trim: true, include_captures: true) + |> Enum.reduce_while(0, fn token, depth -> + cond do + token == "(" -> {:cont, depth + 1} + token == ")" -> {:cont, depth - 1} + String.match?(token, ~r/\bLIMIT\b|\bOFFSET\b/i) and depth == 0 -> {:halt, true} + true -> {:cont, depth} + end + end) + |> case do + true -> true + _ -> false end end - @impl true - def default_port, do: "3306" + defp build_params_for_query(params, query) when is_struct(params), + do: Map.from_struct(params) |> build_params_for_query(query) + + defp build_params_for_query(params, query) when is_map(params) do + Enum.reduce(params, {query, []}, fn {key, value}, {q, acc} -> + if String.contains?(q, ":#{key}") do + {String.replace(q, ":#{key}", "?"), acc ++ [value]} + else + {q, acc} + end + end) + end + + defp validate_params(query, params) do + required_params = + Regex.scan(~r/:("\w+"|\w+)/, query) + |> List.flatten() + |> Enum.filter(&String.starts_with?(&1, ":")) + |> Enum.map(&String.trim_leading(&1, ":")) - defp to_json(nil), do: Jason.encode!(%{}) - defp to_json(map), do: Jason.encode!(map) + param_keys = params |> Map.keys() |> Enum.map(fn key -> "#{key}" end) - # Private helper to fetch a single record from the database - defp fetch_single_record(query) do - case __MODULE__.one(query) do - nil -> - {:error, "No record found"} + contains_all_params? = Enum.all?(required_params, fn param -> param in param_keys end) - projection -> - {:ok, projection} + if contains_all_params? do + :ok + else + {:error, "Required parameters(s): #{Enum.join(required_params, ", ")}"} end end - # Private helper to handle pagination - defp paginate_query(query, page, page_size) do - case __MODULE__.paginate(query, page: page, page_size: page_size) do - %Scrivener.Page{} = page_data -> - {:ok, page_data} + @doc """ + Performs an upsert (insert or update) of data in the table. - _ -> - {:error, "No records found"} - end + ## Parameters + + - `repo`: The Ecto repository module. + - `protobuf_module`: The Elixir module generated from a Protobuf file. + - `table_name`: Name of the table in the database. + - `data`: Protobuf structure containing the data to be inserted or updated. + + Returns `:ok` on success. + """ + @impl true + def upsert(protobuf_module, table_name, data) do + repo = __MODULE__ + + # Extract the fields from the Protobuf module + descriptor = protobuf_module.descriptor() + fields = descriptor.field + + # Map columns and values + columns = Enum.map(fields, &validate_column_name(&1.name)) + + # Construct the placeholders for the values + placeholders = Enum.map(1..length(columns), fn _ -> "?" end) |> Enum.join(", ") + + # Prepare SET clause for updating values on duplicate key + update_clause = + Enum.map(columns, fn col -> "#{col} = VALUES(#{col})" end) + |> Enum.join(", ") + + # Construct the SQL query for MariaDB + sql = """ + INSERT INTO #{table_name} (#{Enum.join(columns, ", ")}) + VALUES (#{placeholders}) + ON DUPLICATE KEY UPDATE #{update_clause} + """ + + values = + Enum.map(fields, fn field -> + value = Map.get(data, String.to_atom(Macro.underscore(field.name))) + + parse_value = fn + parse_value, %{__unknown_fields__: _} = struct -> + Map.from_struct(struct) + |> Map.delete(:__unknown_fields__) + |> Map.new(fn {key, value} -> {key, parse_value.(parse_value, value)} end) + + _, value when is_boolean(value) -> + value + + _, value when is_atom(value) -> + "#{value}" + + _, value -> + value + end + + parse_value.(parse_value, value) + end) + + # Execute the query + SQL.query!(repo, sql, values) + + :ok end + + @impl true + def default_port, do: "3306" end diff --git a/spawn_statestores/statestores_mariadb/mix.lock b/spawn_statestores/statestores_mariadb/mix.lock index e0e73863..ef96edb9 100644 --- a/spawn_statestores/statestores_mariadb/mix.lock +++ b/spawn_statestores/statestores_mariadb/mix.lock @@ -14,6 +14,7 @@ "makeup_erlang": {:hex, :makeup_erlang, "1.0.1", "c7f58c120b2b5aa5fd80d540a89fdf866ed42f1f3994e4fe189abebeab610839", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "8a89a1eeccc2d798d6ea15496a6e4870b75e014d1af514b1b71fa33134f57814"}, "myxql": {:hex, :myxql, "0.7.1", "7c7b75aa82227cd2bc9b7fbd4de774fb19a1cdb309c219f411f82ca8860f8e01", [:mix], [{:db_connection, "~> 2.4.1 or ~> 2.5", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:geo, "~> 3.4", [hex: :geo, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "a491cdff53353a09b5850ac2d472816ebe19f76c30b0d36a43317a67c9004936"}, "nimble_parsec": {:hex, :nimble_parsec, "1.4.0", "51f9b613ea62cfa97b25ccc2c1b4216e81df970acd8e16e8d1bdc58fef21370d", [:mix], [], "hexpm", "9c565862810fb383e9838c1dd2d7d2c437b3d13b267414ba6af33e50d2d1cf28"}, + "protobuf": {:hex, :protobuf, "0.13.0", "7a9d9aeb039f68a81717eb2efd6928fdf44f03d2c0dfdcedc7b560f5f5aae93d", [:mix], [{:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "21092a223e3c6c144c1a291ab082a7ead32821ba77073b72c68515aa51fef570"}, "scrivener": {:hex, :scrivener, "2.7.2", "1d913c965ec352650a7f864ad7fd8d80462f76a32f33d57d1e48bc5e9d40aba2", [:mix], [], "hexpm", "7866a0ec4d40274efbee1db8bead13a995ea4926ecd8203345af8f90d2b620d9"}, "scrivener_ecto": {:hex, :scrivener_ecto, "3.1.0", "6e0fcfcabd289b1afe2f2be93db363030716c84ec0ff91fad9054fc6465bd2ee", [:mix], [{:ecto, "~> 3.12", [hex: :ecto, repo: "hexpm", optional: false]}, {:scrivener, "~> 2.4", [hex: :scrivener, repo: "hexpm", optional: false]}], "hexpm", "86b721669c2718e8569bcc2573650ad749d5eada5f5bee41c9e260e7201fddf6"}, "telemetry": {:hex, :telemetry, "1.3.0", "fedebbae410d715cf8e7062c96a1ef32ec22e764197f70cda73d82778d61e7a2", [:rebar3], [], "hexpm", "7015fc8919dbe63764f4b4b87a95b7c0996bd539e0d499be6ec9d7f3875b79e6"}, diff --git a/spawn_statestores/statestores_mariadb/test/mariadb_projection_adapter_test.exs b/spawn_statestores/statestores_mariadb/test/mariadb_projection_adapter_test.exs index 15b22273..f652dde6 100644 --- a/spawn_statestores/statestores_mariadb/test/mariadb_projection_adapter_test.exs +++ b/spawn_statestores/statestores_mariadb/test/mariadb_projection_adapter_test.exs @@ -1,262 +1,149 @@ -defmodule Statestores.Adapters.MariaDBProjectionAdapterTest do - use Statestores.DataCase - alias Statestores.Adapters.MariaDBProjectionAdapter, as: Adapter - alias Statestores.Schemas.Projection +defmodule StatestoresMariaDB.MariaDBProjectionAdapterTest do + use Statestores.DataCase, async: false + + alias Statestores.Manager.StateManager + alias Test.TestMessage import Statestores.Util, only: [load_projection_adapter: 0] setup do repo = load_projection_adapter() - %{repo: repo} + table_name = "test_messages" + + data = %TestMessage{ + name: "test_user", + age: 25, + balance: 100.50, + active: true, + document: "binary-data", + address: %TestMessage.Address{ + street: "123 Main St", + city: "Testville", + state: "TS", + zip_code: "12345", + country: %TestMessage.Address.Country{ + name: "Test Country", + code: "TC" + } + }, + created_at: DateTime.utc_now(), + metadata: %{"key" => "value"}, + tags: ["elixir", "protobuf"], + attributes: %{"role" => "admin"} + } + + {:ok, _} = Ecto.Adapters.SQL.query(repo, "DROP TABLE IF EXISTS #{table_name}") + :ok = StateManager.projection_create_or_update_table(TestMessage, table_name) + + %{repo: repo, table_name: table_name, data: data} end - describe "create_table/1" do - test "creates a table if it does not exist" do - projection_name = "test_projections" + test "add new field to the table if schema changes", ctx do + %{ + repo: repo, + data: data, + table_name: table_name + } = ctx - assert {:ok, message} = Adapter.create_table(projection_name) - assert message == "Table #{projection_name} created or already exists." - end - end + :ok = StateManager.projection_create_or_update_table(TestMessage, table_name) - describe "get_last/1" do - test "returns the last inserted projection", ctx do - repo = ctx.repo - IO.inspect(repo) - projection_name = "test_projections" - - {:ok, _} = - repo.save(%Projection{ - id: "123", - projection_id: "proj_1", - projection_name: projection_name, - system: "test_system", - metadata: %{"key" => "value"}, - data_type: "type.googleapis.com/io.eigr.spawn.example.MyState", - data: <<1, 2, 3>>, - inserted_at: DateTime.utc_now(), - updated_at: DateTime.utc_now() - }) - - assert {:ok, projection} = Adapter.get_last(projection_name) - assert projection.projection_id == "proj_1" - end - end + {:ok, _result} = Ecto.Adapters.SQL.query(repo, "ALTER TABLE #{table_name} DROP COLUMN age") - describe "get_last_by_projection_id/2" do - test "returns the last inserted projection for a specific projection_id", ctx do - repo = ctx.repo - projection_name = "test_projections" - projection_id = "proj_1" - - {:ok, _} = - repo.save(%Projection{ - id: "123", - projection_id: projection_id, - projection_name: projection_name, - system: "test_system", - metadata: %{"key" => "value"}, - data_type: "type.googleapis.com/io.eigr.spawn.example.MyState", - data: <<1, 2, 3>>, - inserted_at: DateTime.utc_now(), - updated_at: DateTime.utc_now() - }) - - assert {:ok, projection} = - Adapter.get_last_by_projection_id(projection_name, projection_id) - - assert projection.projection_id == projection_id - end - end + :ok = StateManager.projection_create_or_update_table(TestMessage, table_name) - describe "get_all/3" do - test "returns paginated projections", ctx do - repo = ctx.repo - projection_name = "test_projections" - - Enum.each(1..20, fn n -> - repo.save(%Projection{ - id: "#{n}", - projection_id: "proj_#{n}", - projection_name: projection_name, - system: "test_system", - metadata: %{"key" => "value#{n}"}, - data_type: "type.googleapis.com/io.eigr.spawn.example.MyState", - data: <<1, 2, 3>>, - inserted_at: DateTime.utc_now(), - updated_at: DateTime.utc_now() - }) - end) - - {:ok, result} = Adapter.get_all(projection_name, 1, 10) - IO.inspect(result, label: "Pagination Result -----------") - assert length(result.entries) == 10 - assert result.page_number == 1 - end - end + data = %{data | age: 34} - describe "search_by_metadata/5" do - test "returns projections matching metadata key and value", ctx do - repo = ctx.repo - projection_name = "test_projections" - metadata_key = "key" - metadata_value = "value1" - - repo.save(%Projection{ - id: "1", - projection_id: "proj_1", - projection_name: projection_name, - system: "test_system", - metadata: %{"key" => "value1"}, - data_type: "type.googleapis.com/io.eigr.spawn.example.MyState", - data: <<1, 2, 3>>, - inserted_at: DateTime.utc_now(), - updated_at: DateTime.utc_now() - }) - - repo.save(%Projection{ - id: "2", - projection_id: "proj_2", - projection_name: projection_name, - system: "test_system", - metadata: %{"key" => "value2"}, - data_type: "type.googleapis.com/io.eigr.spawn.example.MyState", - data: <<1, 2, 3>>, - inserted_at: DateTime.utc_now(), - updated_at: DateTime.utc_now() - }) - - {:ok, result} = Adapter.get_all(projection_name) - assert length(result.entries) == 2 - - {:ok, result} = - Adapter.search_by_metadata( - projection_name, - metadata_key, - metadata_value - ) - - assert length(result.entries) == 1 - assert result.entries |> Enum.at(0) |> Map.get(:projection_id) == "proj_1" - end - end + :ok = StateManager.projection_upsert(TestMessage, table_name, data) - describe "search_by_projection_id_and_metadata/6" do - test "returns projections matching projection_id and metadata", ctx do - repo = ctx.repo - projection_name = "test_projections" - projection_id = "proj_1" - metadata_key = "key" - metadata_value = "value1" - - repo.save(%Projection{ - id: "1", - projection_id: projection_id, - projection_name: projection_name, - system: "test_system", - metadata: %{"key" => "value1"}, - data_type: "type.googleapis.com/io.eigr.spawn.example.MyState", - data: <<1, 2, 3>>, - inserted_at: DateTime.utc_now(), - updated_at: DateTime.utc_now() - }) - - {:ok, result} = - Adapter.search_by_projection_id_and_metadata( - projection_name, - projection_id, - metadata_key, - metadata_value - ) - - assert length(result.entries) == 1 - assert result.entries |> Enum.at(0) |> Map.get(:projection_id) == projection_id - end - end + {:ok, result} = + StateManager.projection_query( + TestMessage, + "SELECT age, name FROM test_messages WHERE name = :name", + %{name: "test_user"}, + [] + ) - describe "fail for get_last_by_projection_id/2" do - test "returns error if no matching projection_id found", _ctx do - projection_name = "test_projections" - non_existing_projection_id = "non_existing_proj" - - assert {:error, _error_msg} = - Adapter.get_last_by_projection_id( - projection_name, - non_existing_projection_id - ) - end + assert [%TestMessage{age: 34}] = result end - describe "fail get_last/1" do - test "returns error if no projections exist", ctx do - projection_name = "empty_projections_table" + test "performs upsert and query operations", ctx do + %{table_name: table_name, data: data} = ctx + + :ok = StateManager.projection_upsert(TestMessage, table_name, data) + + {:ok, result} = + StateManager.projection_query( + TestMessage, + "SELECT age, name FROM test_messages WHERE name = :name", + %{name: "test_user"}, + [] + ) + + assert [%TestMessage{name: "test_user"}] = result - assert_raise MyXQL.Error, fn -> - Adapter.get_last(projection_name) - end - end + data = %{data | age: 30} + + :ok = StateManager.projection_upsert(TestMessage, table_name, data) + + {:ok, result} = + StateManager.projection_query( + TestMessage, + "SELECT age, name FROM test_messages WHERE name = :name", + %{name: "test_user"}, + [] + ) + + assert [%TestMessage{age: 30}] = result end - describe "fail search_by_metadata/5" do - test "returns no results for non-existing metadata key", ctx do - repo = ctx.repo - projection_name = "test_projections" - invalid_metadata_key = "non_existing_key" - metadata_value = "value1" - - {:ok, result} = - Adapter.search_by_metadata( - projection_name, - invalid_metadata_key, - metadata_value - ) - - assert length(result.entries) == 0 - end - - test "returns no results for non-existing metadata value", ctx do - repo = ctx.repo - projection_name = "test_projections" - metadata_key = "key" - invalid_metadata_value = "non_existing_value" - - {:ok, result} = - Adapter.search_by_metadata( - projection_name, - metadata_key, - invalid_metadata_value - ) - - assert length(result.entries) == 0 - end + test "performs upsert and query operations with pagination", ctx do + %{table_name: table_name, data: data} = ctx + + Enum.each(1..10, fn item -> + :ok = StateManager.projection_upsert(TestMessage, table_name, %{data | name: "#{item}"}) + end) + + {:ok, result} = + StateManager.projection_query( + TestMessage, + "SELECT * FROM test_messages ORDER BY name", + %{name: "test_user"}, + page_size: 3, + page: 2 + ) + + assert [%TestMessage{name: "3"}, %TestMessage{name: "4"}, %TestMessage{name: "5"}] = result end - describe "fail get_last_by_projection_id/2" do - test "returns error if invalid parameters are provided", ctx do - projection_name = "test_projections" - invalid_projection_id = nil - - assert {:error, _message} = - Adapter.get_last_by_projection_id( - projection_name, - invalid_projection_id - ) - end + test "performs a query with no parameters matching query", ctx do + %{table_name: table_name, data: data} = ctx + + :ok = StateManager.projection_upsert(TestMessage, table_name, data) + + assert {:error, _result} = + StateManager.projection_query( + TestMessage, + "SELECT age, name FROM test_messages WHERE name = :name", + %{}, + [] + ) end - describe "fail get_all/3" do - test "returns empty results if requested page is out of bounds", ctx do - projection_name = "test_projections" + test "performs a query with more unecessary parameters", ctx do + %{table_name: table_name, data: data} = ctx + + data = %{data | enum_test: nil} - {:ok, result} = Adapter.get_all(projection_name, 5, 10) + :ok = StateManager.projection_upsert(TestMessage, table_name, data) - IO.inspect(result, - label: "fail get_all/3 ----------------------------------------------------------------" + {:ok, result} = + StateManager.projection_query( + TestMessage, + "SELECT * FROM test_messages", + %{metadata: nil}, + [] ) - assert length(result.entries) == 0 - # this totall create on test - assert result.page_number == 1 - end + assert [%TestMessage{name: "test_user"}] = result end end diff --git a/spawn_statestores/statestores_mariadb/test/support/test.pb.ex b/spawn_statestores/statestores_mariadb/test/support/test.pb.ex new file mode 100644 index 00000000..958beb0e --- /dev/null +++ b/spawn_statestores/statestores_mariadb/test/support/test.pb.ex @@ -0,0 +1,690 @@ +defmodule Test.EnumTest do + @moduledoc false + use Protobuf, enum: true, syntax: :proto3, protoc_gen_elixir_version: "0.13.0" + + def descriptor do + # credo:disable-for-next-line + %Google.Protobuf.EnumDescriptorProto{ + name: "EnumTest", + value: [ + %Google.Protobuf.EnumValueDescriptorProto{ + name: "ENUM_TEST_UNKNOWN", + number: 0, + options: nil, + __unknown_fields__: [] + }, + %Google.Protobuf.EnumValueDescriptorProto{ + name: "ENUM_TEST_ACTIVE", + number: 1, + options: nil, + __unknown_fields__: [] + }, + %Google.Protobuf.EnumValueDescriptorProto{ + name: "ENUM_TEST_INACTIVE", + number: 2, + options: nil, + __unknown_fields__: [] + } + ], + options: nil, + reserved_range: [], + reserved_name: [], + __unknown_fields__: [] + } + end + + field(:ENUM_TEST_UNKNOWN, 0) + field(:ENUM_TEST_ACTIVE, 1) + field(:ENUM_TEST_INACTIVE, 2) +end + +defmodule Test.TestMessage.Address.Country do + @moduledoc false + use Protobuf, syntax: :proto3, protoc_gen_elixir_version: "0.13.0" + + def descriptor do + # credo:disable-for-next-line + %Google.Protobuf.DescriptorProto{ + name: "Country", + field: [ + %Google.Protobuf.FieldDescriptorProto{ + name: "name", + extendee: nil, + number: 1, + label: :LABEL_OPTIONAL, + type: :TYPE_STRING, + type_name: nil, + default_value: nil, + options: nil, + oneof_index: nil, + json_name: "name", + proto3_optional: nil, + __unknown_fields__: [] + }, + %Google.Protobuf.FieldDescriptorProto{ + name: "code", + extendee: nil, + number: 2, + label: :LABEL_OPTIONAL, + type: :TYPE_STRING, + type_name: nil, + default_value: nil, + options: nil, + oneof_index: nil, + json_name: "code", + proto3_optional: nil, + __unknown_fields__: [] + } + ], + nested_type: [], + enum_type: [], + extension_range: [], + extension: [], + options: nil, + oneof_decl: [], + reserved_range: [], + reserved_name: [], + __unknown_fields__: [] + } + end + + field(:name, 1, type: :string) + field(:code, 2, type: :string) +end + +defmodule Test.TestMessage.Address do + @moduledoc false + use Protobuf, syntax: :proto3, protoc_gen_elixir_version: "0.13.0" + + def descriptor do + # credo:disable-for-next-line + %Google.Protobuf.DescriptorProto{ + name: "Address", + field: [ + %Google.Protobuf.FieldDescriptorProto{ + name: "street", + extendee: nil, + number: 1, + label: :LABEL_OPTIONAL, + type: :TYPE_STRING, + type_name: nil, + default_value: nil, + options: nil, + oneof_index: nil, + json_name: "street", + proto3_optional: nil, + __unknown_fields__: [] + }, + %Google.Protobuf.FieldDescriptorProto{ + name: "city", + extendee: nil, + number: 2, + label: :LABEL_OPTIONAL, + type: :TYPE_STRING, + type_name: nil, + default_value: nil, + options: nil, + oneof_index: nil, + json_name: "city", + proto3_optional: nil, + __unknown_fields__: [] + }, + %Google.Protobuf.FieldDescriptorProto{ + name: "state", + extendee: nil, + number: 3, + label: :LABEL_OPTIONAL, + type: :TYPE_STRING, + type_name: nil, + default_value: nil, + options: nil, + oneof_index: nil, + json_name: "state", + proto3_optional: nil, + __unknown_fields__: [] + }, + %Google.Protobuf.FieldDescriptorProto{ + name: "zip_code", + extendee: nil, + number: 4, + label: :LABEL_OPTIONAL, + type: :TYPE_STRING, + type_name: nil, + default_value: nil, + options: nil, + oneof_index: nil, + json_name: "zipCode", + proto3_optional: nil, + __unknown_fields__: [] + }, + %Google.Protobuf.FieldDescriptorProto{ + name: "country", + extendee: nil, + number: 5, + label: :LABEL_OPTIONAL, + type: :TYPE_MESSAGE, + type_name: ".test.TestMessage.Address.Country", + default_value: nil, + options: nil, + oneof_index: nil, + json_name: "country", + proto3_optional: nil, + __unknown_fields__: [] + } + ], + nested_type: [ + %Google.Protobuf.DescriptorProto{ + name: "Country", + field: [ + %Google.Protobuf.FieldDescriptorProto{ + name: "name", + extendee: nil, + number: 1, + label: :LABEL_OPTIONAL, + type: :TYPE_STRING, + type_name: nil, + default_value: nil, + options: nil, + oneof_index: nil, + json_name: "name", + proto3_optional: nil, + __unknown_fields__: [] + }, + %Google.Protobuf.FieldDescriptorProto{ + name: "code", + extendee: nil, + number: 2, + label: :LABEL_OPTIONAL, + type: :TYPE_STRING, + type_name: nil, + default_value: nil, + options: nil, + oneof_index: nil, + json_name: "code", + proto3_optional: nil, + __unknown_fields__: [] + } + ], + nested_type: [], + enum_type: [], + extension_range: [], + extension: [], + options: nil, + oneof_decl: [], + reserved_range: [], + reserved_name: [], + __unknown_fields__: [] + } + ], + enum_type: [], + extension_range: [], + extension: [], + options: nil, + oneof_decl: [], + reserved_range: [], + reserved_name: [], + __unknown_fields__: [] + } + end + + field(:street, 1, type: :string) + field(:city, 2, type: :string) + field(:state, 3, type: :string) + field(:zip_code, 4, type: :string, json_name: "zipCode") + field(:country, 5, type: Test.TestMessage.Address.Country) +end + +defmodule Test.TestMessage.AttributesEntry do + @moduledoc false + use Protobuf, map: true, syntax: :proto3, protoc_gen_elixir_version: "0.13.0" + + def descriptor do + # credo:disable-for-next-line + %Google.Protobuf.DescriptorProto{ + name: "AttributesEntry", + field: [ + %Google.Protobuf.FieldDescriptorProto{ + name: "key", + extendee: nil, + number: 1, + label: :LABEL_OPTIONAL, + type: :TYPE_STRING, + type_name: nil, + default_value: nil, + options: nil, + oneof_index: nil, + json_name: "key", + proto3_optional: nil, + __unknown_fields__: [] + }, + %Google.Protobuf.FieldDescriptorProto{ + name: "value", + extendee: nil, + number: 2, + label: :LABEL_OPTIONAL, + type: :TYPE_STRING, + type_name: nil, + default_value: nil, + options: nil, + oneof_index: nil, + json_name: "value", + proto3_optional: nil, + __unknown_fields__: [] + } + ], + nested_type: [], + enum_type: [], + extension_range: [], + extension: [], + options: %Google.Protobuf.MessageOptions{ + message_set_wire_format: false, + no_standard_descriptor_accessor: false, + deprecated: false, + map_entry: true, + deprecated_legacy_json_field_conflicts: nil, + uninterpreted_option: [], + __pb_extensions__: %{}, + __unknown_fields__: [] + }, + oneof_decl: [], + reserved_range: [], + reserved_name: [], + __unknown_fields__: [] + } + end + + field(:key, 1, type: :string) + field(:value, 2, type: :string) +end + +defmodule Test.TestMessage do + @moduledoc false + use Protobuf, syntax: :proto3, protoc_gen_elixir_version: "0.13.0" + + def descriptor do + # credo:disable-for-next-line + %Google.Protobuf.DescriptorProto{ + name: "TestMessage", + field: [ + %Google.Protobuf.FieldDescriptorProto{ + name: "name", + extendee: nil, + number: 1, + label: :LABEL_OPTIONAL, + type: :TYPE_STRING, + type_name: nil, + default_value: nil, + options: %Google.Protobuf.FieldOptions{ + ctype: :STRING, + packed: nil, + deprecated: false, + lazy: false, + jstype: :JS_NORMAL, + weak: false, + unverified_lazy: false, + debug_redact: false, + uninterpreted_option: [], + __pb_extensions__: %{{Spawn.Actors.PbExtension, :actor_id} => true}, + __unknown_fields__: [] + }, + oneof_index: nil, + json_name: "name", + proto3_optional: nil, + __unknown_fields__: [] + }, + %Google.Protobuf.FieldDescriptorProto{ + name: "age", + extendee: nil, + number: 2, + label: :LABEL_OPTIONAL, + type: :TYPE_INT32, + type_name: nil, + default_value: nil, + options: %Google.Protobuf.FieldOptions{ + ctype: :STRING, + packed: nil, + deprecated: false, + lazy: false, + jstype: :JS_NORMAL, + weak: false, + unverified_lazy: false, + debug_redact: false, + uninterpreted_option: [], + __pb_extensions__: %{{Spawn.Actors.PbExtension, :searchable} => true}, + __unknown_fields__: [] + }, + oneof_index: nil, + json_name: "age", + proto3_optional: nil, + __unknown_fields__: [] + }, + %Google.Protobuf.FieldDescriptorProto{ + name: "balance", + extendee: nil, + number: 3, + label: :LABEL_OPTIONAL, + type: :TYPE_DOUBLE, + type_name: nil, + default_value: nil, + options: nil, + oneof_index: nil, + json_name: "balance", + proto3_optional: nil, + __unknown_fields__: [] + }, + %Google.Protobuf.FieldDescriptorProto{ + name: "active", + extendee: nil, + number: 4, + label: :LABEL_OPTIONAL, + type: :TYPE_BOOL, + type_name: nil, + default_value: nil, + options: nil, + oneof_index: nil, + json_name: "active", + proto3_optional: nil, + __unknown_fields__: [] + }, + %Google.Protobuf.FieldDescriptorProto{ + name: "document", + extendee: nil, + number: 5, + label: :LABEL_OPTIONAL, + type: :TYPE_BYTES, + type_name: nil, + default_value: nil, + options: nil, + oneof_index: nil, + json_name: "document", + proto3_optional: nil, + __unknown_fields__: [] + }, + %Google.Protobuf.FieldDescriptorProto{ + name: "address", + extendee: nil, + number: 7, + label: :LABEL_OPTIONAL, + type: :TYPE_MESSAGE, + type_name: ".test.TestMessage.Address", + default_value: nil, + options: nil, + oneof_index: nil, + json_name: "address", + proto3_optional: nil, + __unknown_fields__: [] + }, + %Google.Protobuf.FieldDescriptorProto{ + name: "created_at", + extendee: nil, + number: 8, + label: :LABEL_OPTIONAL, + type: :TYPE_MESSAGE, + type_name: ".google.protobuf.Timestamp", + default_value: nil, + options: nil, + oneof_index: nil, + json_name: "createdAt", + proto3_optional: nil, + __unknown_fields__: [] + }, + %Google.Protobuf.FieldDescriptorProto{ + name: "metadata", + extendee: nil, + number: 9, + label: :LABEL_OPTIONAL, + type: :TYPE_MESSAGE, + type_name: ".google.protobuf.Struct", + default_value: nil, + options: nil, + oneof_index: nil, + json_name: "metadata", + proto3_optional: nil, + __unknown_fields__: [] + }, + %Google.Protobuf.FieldDescriptorProto{ + name: "tags", + extendee: nil, + number: 10, + label: :LABEL_REPEATED, + type: :TYPE_STRING, + type_name: nil, + default_value: nil, + options: nil, + oneof_index: nil, + json_name: "tags", + proto3_optional: nil, + __unknown_fields__: [] + }, + %Google.Protobuf.FieldDescriptorProto{ + name: "attributes", + extendee: nil, + number: 11, + label: :LABEL_REPEATED, + type: :TYPE_MESSAGE, + type_name: ".test.TestMessage.AttributesEntry", + default_value: nil, + options: nil, + oneof_index: nil, + json_name: "attributes", + proto3_optional: nil, + __unknown_fields__: [] + }, + %Google.Protobuf.FieldDescriptorProto{ + name: "enum_test", + extendee: nil, + number: 12, + label: :LABEL_OPTIONAL, + type: :TYPE_ENUM, + type_name: ".test.EnumTest", + default_value: nil, + options: nil, + oneof_index: nil, + json_name: "enumTest", + proto3_optional: nil, + __unknown_fields__: [] + } + ], + nested_type: [ + %Google.Protobuf.DescriptorProto{ + name: "Address", + field: [ + %Google.Protobuf.FieldDescriptorProto{ + name: "street", + extendee: nil, + number: 1, + label: :LABEL_OPTIONAL, + type: :TYPE_STRING, + type_name: nil, + default_value: nil, + options: nil, + oneof_index: nil, + json_name: "street", + proto3_optional: nil, + __unknown_fields__: [] + }, + %Google.Protobuf.FieldDescriptorProto{ + name: "city", + extendee: nil, + number: 2, + label: :LABEL_OPTIONAL, + type: :TYPE_STRING, + type_name: nil, + default_value: nil, + options: nil, + oneof_index: nil, + json_name: "city", + proto3_optional: nil, + __unknown_fields__: [] + }, + %Google.Protobuf.FieldDescriptorProto{ + name: "state", + extendee: nil, + number: 3, + label: :LABEL_OPTIONAL, + type: :TYPE_STRING, + type_name: nil, + default_value: nil, + options: nil, + oneof_index: nil, + json_name: "state", + proto3_optional: nil, + __unknown_fields__: [] + }, + %Google.Protobuf.FieldDescriptorProto{ + name: "zip_code", + extendee: nil, + number: 4, + label: :LABEL_OPTIONAL, + type: :TYPE_STRING, + type_name: nil, + default_value: nil, + options: nil, + oneof_index: nil, + json_name: "zipCode", + proto3_optional: nil, + __unknown_fields__: [] + }, + %Google.Protobuf.FieldDescriptorProto{ + name: "country", + extendee: nil, + number: 5, + label: :LABEL_OPTIONAL, + type: :TYPE_MESSAGE, + type_name: ".test.TestMessage.Address.Country", + default_value: nil, + options: nil, + oneof_index: nil, + json_name: "country", + proto3_optional: nil, + __unknown_fields__: [] + } + ], + nested_type: [ + %Google.Protobuf.DescriptorProto{ + name: "Country", + field: [ + %Google.Protobuf.FieldDescriptorProto{ + name: "name", + extendee: nil, + number: 1, + label: :LABEL_OPTIONAL, + type: :TYPE_STRING, + type_name: nil, + default_value: nil, + options: nil, + oneof_index: nil, + json_name: "name", + proto3_optional: nil, + __unknown_fields__: [] + }, + %Google.Protobuf.FieldDescriptorProto{ + name: "code", + extendee: nil, + number: 2, + label: :LABEL_OPTIONAL, + type: :TYPE_STRING, + type_name: nil, + default_value: nil, + options: nil, + oneof_index: nil, + json_name: "code", + proto3_optional: nil, + __unknown_fields__: [] + } + ], + nested_type: [], + enum_type: [], + extension_range: [], + extension: [], + options: nil, + oneof_decl: [], + reserved_range: [], + reserved_name: [], + __unknown_fields__: [] + } + ], + enum_type: [], + extension_range: [], + extension: [], + options: nil, + oneof_decl: [], + reserved_range: [], + reserved_name: [], + __unknown_fields__: [] + }, + %Google.Protobuf.DescriptorProto{ + name: "AttributesEntry", + field: [ + %Google.Protobuf.FieldDescriptorProto{ + name: "key", + extendee: nil, + number: 1, + label: :LABEL_OPTIONAL, + type: :TYPE_STRING, + type_name: nil, + default_value: nil, + options: nil, + oneof_index: nil, + json_name: "key", + proto3_optional: nil, + __unknown_fields__: [] + }, + %Google.Protobuf.FieldDescriptorProto{ + name: "value", + extendee: nil, + number: 2, + label: :LABEL_OPTIONAL, + type: :TYPE_STRING, + type_name: nil, + default_value: nil, + options: nil, + oneof_index: nil, + json_name: "value", + proto3_optional: nil, + __unknown_fields__: [] + } + ], + nested_type: [], + enum_type: [], + extension_range: [], + extension: [], + options: %Google.Protobuf.MessageOptions{ + message_set_wire_format: false, + no_standard_descriptor_accessor: false, + deprecated: false, + map_entry: true, + deprecated_legacy_json_field_conflicts: nil, + uninterpreted_option: [], + __pb_extensions__: %{}, + __unknown_fields__: [] + }, + oneof_decl: [], + reserved_range: [], + reserved_name: [], + __unknown_fields__: [] + } + ], + enum_type: [], + extension_range: [], + extension: [], + options: nil, + oneof_decl: [], + reserved_range: [], + reserved_name: [], + __unknown_fields__: [] + } + end + + field(:name, 1, type: :string, deprecated: false) + field(:age, 2, type: :int32, deprecated: false) + field(:balance, 3, type: :double) + field(:active, 4, type: :bool) + field(:document, 5, type: :bytes) + field(:address, 7, type: Test.TestMessage.Address) + field(:created_at, 8, type: Google.Protobuf.Timestamp, json_name: "createdAt") + field(:metadata, 9, type: Google.Protobuf.Struct) + field(:tags, 10, repeated: true, type: :string) + field(:attributes, 11, repeated: true, type: Test.TestMessage.AttributesEntry, map: true) + field(:enum_test, 12, type: Test.EnumTest, json_name: "enumTest", enum: true) +end diff --git a/spawn_statestores/statestores_native/lib/statestores/adapters/native_projection_adapter.ex b/spawn_statestores/statestores_native/lib/statestores/adapters/native_projection_adapter.ex index 4d20fd84..c18589e3 100644 --- a/spawn_statestores/statestores_native/lib/statestores/adapters/native_projection_adapter.ex +++ b/spawn_statestores/statestores_native/lib/statestores/adapters/native_projection_adapter.ex @@ -10,138 +10,18 @@ defmodule Statestores.Adapters.NativeProjectionAdapter do import Statestores.Util, only: [normalize_table_name: 1] @impl true - def create_table(nil), do: {:error, "Projection name cannot be nil."} - - def create_table(projection_name) do - table_name = normalize_table_name(projection_name) - {:ok, "Table #{table_name} created or already exists."} - end - - @impl true - def get_last(nil), do: {:error, "No record found"} - - def get_last(projection_name) do - nil - end - - @impl true - def get_last_by_projection_id(nil, _projection_id), do: {:error, "No record found"} - def get_last_by_projection_id(_projection_name, nil), do: {:error, "No record found"} - - def get_last_by_projection_id(projection_name, projection_id) do - nil - end - - @impl true - def get_all(nil, _page, _page_size), do: {:error, "No records found"} - - def get_all(projection_name, page \\ 1, page_size \\ 50) do - nil - end - - @impl true - def get_all_by_projection_id(nil, _projection_id, _page, _page_size), - do: {:error, "No records found"} - - def get_all_by_projection_id(_projection_name, nil, _page, _page_size), - do: {:error, "No records found"} - - def get_all_by_projection_id(projection_name, projection_id, page \\ 1, page_size \\ 50) do - nil - end - - @impl true - def get_by_interval(nil, _time_start, _time_end, _page, _page_size), - do: {:error, "No records found"} - - def get_by_interval(_projection_name, nil, _time_end, _page, _page_size), - do: {:error, "No records found"} - - def get_by_interval(_projection_name, _time_start, nil, _page, _page_size), - do: {:error, "No records found"} - - def get_by_interval(projection_name, time_start, time_end, page \\ 1, page_size \\ 50) do - nil - end - - @impl true - def get_by_projection_id_and_interval( - nil, - _projection_id, - _time_start, - _time_end, - _page, - _page_size - ), - do: {:error, "No records found"} - - def get_by_projection_id_and_interval( - _projection_name, - nil, - _time_start, - _time_end, - _page, - _page_size - ), - do: {:error, "No records found"} - - def get_by_projection_id_and_interval( - _projection_name, - _projection_id, - nil, - _time_end, - _page, - _page_size - ), - do: {:error, "No records found"} - - def get_by_projection_id_and_interval( - _projection_name, - _projection_id, - _time_start, - nil, - _page, - _page_size - ), - do: {:error, "No records found"} - - def get_by_projection_id_and_interval( - projection_name, - projection_id, - time_start, - time_end, - page \\ 1, - page_size \\ 50 - ) do - nil - end - - @impl true - def search_by_metadata( - projection_name, - metadata_key, - metadata_value, - page \\ 1, - page_size \\ 50 - ) do - nil + def create_or_update_table(_projection_type, _table_name) do + raise "Projections are not supported using native adapter" end @impl true - def search_by_projection_id_and_metadata( - projection_name, - projection_id, - metadata_key, - metadata_value, - page \\ 1, - page_size \\ 50 - ) do - nil + def upsert(_projection_type, _table_name, _data) do + raise "Projections are not supported using native adapter" end @impl true - def save(%Projection{} = projection) do - {:ok, projection} + def query(_projection_type, _query, _params, _opts) do + raise "Projections are not supported using native adapter" end @impl true diff --git a/spawn_statestores/statestores_postgres/lib/statestores/adapters/postgres_projection_adapter.ex b/spawn_statestores/statestores_postgres/lib/statestores/adapters/postgres_projection_adapter.ex index a29bf562..94bb15a2 100644 --- a/spawn_statestores/statestores_postgres/lib/statestores/adapters/postgres_projection_adapter.ex +++ b/spawn_statestores/statestores_postgres/lib/statestores/adapters/postgres_projection_adapter.ex @@ -8,301 +8,393 @@ defmodule Statestores.Adapters.PostgresProjectionAdapter do otp_app: :spawn_statestores, adapter: Ecto.Adapters.Postgres - use Scrivener, page_size: 50 + alias Ecto.Adapters.SQL - alias Statestores.Schemas.{Projection, ValueObjectSchema} + @type_map %{ + :TYPE_INT32 => "INTEGER", + :TYPE_INT64 => "BIGINT", + :TYPE_STRING => "TEXT", + :TYPE_BOOL => "BOOLEAN", + :TYPE_FLOAT => "REAL", + :TYPE_DOUBLE => "DOUBLE PRECISION", + :TYPE_BYTES => "BYTEA", + :TYPE_MESSAGE => "JSONB", + :TYPE_ENUM => "TEXT" + } - import Ecto.Query - import Statestores.Util, only: [normalize_table_name: 1] + @doc """ + Dynamically creates or updates a table in the PostgreSQL database from a Protobuf module. - @impl true - def create_table(nil), do: {:error, "Projection name cannot be nil."} - - def create_table(projection_name) do - table_name = normalize_table_name(projection_name) - - query = """ - CREATE TABLE IF NOT EXISTS #{table_name} ( - id VARCHAR(255) PRIMARY KEY, - projection_id VARCHAR(255), - projection_name VARCHAR(255), - system VARCHAR(255), - metadata JSON, - data_type VARCHAR(255), - data BYTEA, - inserted_at TIMESTAMP, - updated_at TIMESTAMP - ); - """ + ## Parameters - case Ecto.Adapters.SQL.query(__MODULE__, query) do - {:ok, _result} -> - {:ok, "Table #{table_name} created or already exists."} + - `repo`: The module from the Ecto repository. + - `protobuf_module`: The Elixir module generated from a Protobuf file. + - `table_name`: Name of the table to be created or updated in the database. - {:error, ex} -> - {:error, "Error during creation of table #{table_name}.: #{inspect(ex)}"} - end - end + ## Example + + iex> create_or_update_table(MyProtobufModule, "my_table") + """ @impl true - def get_last(nil), do: {:error, "No record found"} + def create_or_update_table(protobuf_module, table_name) do + repo = __MODULE__ + descriptor = protobuf_module.descriptor() + fields = descriptor.field - def get_last(projection_name) do - table_name = normalize_table_name(projection_name) + {:ok, _} = + repo.transaction(fn -> + # Create table if it does not exist + create_table_if_not_exists(repo, table_name, fields) - query = - from(p in {table_name, Projection}, - order_by: [desc: p.updated_at], - limit: 1 - ) + # Update table to add missing columns + update_table_columns(repo, table_name, fields) - fetch_single_record(query) - end + # Add indexes for searchable columns + create_indexes(repo, table_name, fields) + end) - @impl true - def get_last_by_projection_id(nil, _projection_id), do: {:error, "No record found"} - def get_last_by_projection_id(_projection_name, nil), do: {:error, "No record found"} + :ok + end - def get_last_by_projection_id(projection_name, projection_id) do - table_name = normalize_table_name(projection_name) + defp create_table_if_not_exists(repo, table_name, fields) do + columns_sql = + fields + |> Enum.map(&field_to_column_sql/1) + |> Enum.reject(&is_nil/1) + |> Enum.join(", ") + + timestamp_columns = + "created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP" + + primary_key_column = + fields + |> Enum.find(fn field -> + options = field.options || %{} + + actor_id_extension = + options + |> Map.get(:__pb_extensions__, %{}) + |> Map.get({Spawn.Actors.PbExtension, :actor_id}) + + actor_id_extension == true + end) + |> case do + nil -> "id SERIAL PRIMARY KEY" + field -> "PRIMARY KEY (#{Macro.underscore(field.name)})" + end + + create_table_sql = + [ + "CREATE TABLE IF NOT EXISTS #{table_name} (", + columns_sql, + ", #{timestamp_columns}", + ", #{primary_key_column}", + ")" + ] + |> Enum.reject(&is_nil/1) + |> Enum.join(" ") + + SQL.query!(repo, create_table_sql) + end - query = - from(p in {table_name, Projection}, - where: p.projection_id == ^projection_id, - order_by: [desc: p.updated_at], - limit: 1 + defp update_table_columns(repo, table_name, fields) do + existing_columns = + SQL.query!( + repo, + "SELECT column_name FROM information_schema.columns WHERE table_name = $1", + [table_name] ) - - fetch_single_record(query) + |> Map.get(:rows) + |> List.flatten() + + fields + |> Enum.reject(fn field -> validate_column_name(field.name) in existing_columns end) + |> Enum.each(fn field -> + column_sql = field_to_column_sql(field) + alter_table_sql = "ALTER TABLE #{table_name} ADD COLUMN #{column_sql}" + SQL.query!(repo, alter_table_sql) + end) end - @impl true - def get_all(nil, _page, _page_size), do: {:error, "No records found"} + defp field_to_column_sql(%{name: "created_at"}), do: nil + defp field_to_column_sql(%{name: "updated_at"}), do: nil - def get_all(projection_name, page \\ 1, page_size \\ 50) do - table_name = normalize_table_name(projection_name) + defp field_to_column_sql(%{name: name, type: type} = field) do + column_name = validate_column_name(name) + nullable = if field.label == :LABEL_OPTIONAL, do: "NULL", else: "NOT NULL" - query = - from(p in {table_name, Projection}, - order_by: [asc: p.inserted_at] - ) + column_type = + if field.label == :LABEL_REPEATED do + Map.get(@type_map, :TYPE_MESSAGE) + else + Map.get(@type_map, type) + end + + "#{column_name} #{column_type} #{nullable}" + end + + defp create_indexes(repo, table_name, fields) do + fields + |> Enum.reject(fn field -> is_nil(field.options) end) + |> Enum.filter(fn field -> + Map.get(field.options.__pb_extensions__, {Spawn.Actors.PbExtension, :searchable}) == true + end) + |> Enum.each(fn field -> + index_sql = + "CREATE INDEX IF NOT EXISTS idx_#{table_name}_#{validate_column_name(field.name)} ON #{table_name} (#{validate_column_name(field.name)})" + + SQL.query!(repo, index_sql) + end) + end - paginate_query(query, page, page_size) + defp validate_column_name(name) do + name + |> Macro.underscore() + |> String.replace(~r/[^a-z0-9_]/, "") end + @doc """ + Performs a raw query and returns the results. + + ## Parameters + + - `repo`: The Ecto repository module. + - `query`: The raw SQL query string with named parameters (e.g., :id). + - `params`: A map of parameter values. + + Returns the result rows as a list of maps. + + ## Examples + iex> results = query("SELECT age, metadata FROM example WHERE id = :id", %{id: "value"}) + {:ok, [%{age: 30, metadata: "example data"}]} + """ @impl true - def get_all_by_projection_id(nil, _projection_id, _page, _page_size), - do: {:error, "No records found"} + def query(protobuf_module, query, params, opts) do + repo = __MODULE__ - def get_all_by_projection_id(_projection_name, nil, _page, _page_size), - do: {:error, "No records found"} + case validate_params(query, params) do + {:error, message} -> + {:error, message} - def get_all_by_projection_id(projection_name, projection_id, page \\ 1, page_size \\ 50) do - table_name = normalize_table_name(projection_name) + :ok -> + {query, values} = build_params_for_query(params, query) - query = - from(p in {table_name, Projection}, - where: p.projection_id == ^projection_id, - order_by: [asc: p.inserted_at] - ) + page = opts[:page] || 1 + page_size = opts[:page_size] || 10 + # Append LIMIT and OFFSET dynamically + offset = (page - 1) * page_size + + {query, values} = + if has_outer_limit_or_offset?(query) do + # If already present, don't modify the query + {query, values} + else + query = """ + #{query} + LIMIT $#{length(values) + 1} + OFFSET $#{length(values) + 2} + """ + + values = values ++ [page_size, offset] + + {query, values} + end + + result = SQL.query!(repo, query, values) + + columns = result.columns + + results = + Enum.map(result.rows, fn row -> + map_value = Enum.zip(columns, row) |> Enum.into(%{}) - paginate_query(query, page, page_size) + {:ok, decoded} = from_decoded(protobuf_module, map_value) + + decoded + end) + + {:ok, results} + end end - @impl true - def get_by_interval(nil, _time_start, _time_end, _page, _page_size), - do: {:error, "No records found"} + defp has_outer_limit_or_offset?(query) do + query + |> String.split(~r/(\(|\)|\bLIMIT\b|\bOFFSET\b)/i, trim: true, include_captures: true) + |> Enum.reduce_while(0, fn token, depth -> + cond do + token == "(" -> {:cont, depth + 1} + token == ")" -> {:cont, depth - 1} + String.match?(token, ~r/\bLIMIT\b|\bOFFSET\b/i) and depth == 0 -> {:halt, true} + true -> {:cont, depth} + end + end) + |> case do + true -> true + _ -> false + end + end - def get_by_interval(_projection_name, nil, _time_end, _page, _page_size), - do: {:error, "No records found"} + defp build_params_for_query(params, query) when is_struct(params), + do: Map.from_struct(params) |> build_params_for_query(query) + + defp build_params_for_query(params, query) when is_map(params) do + Enum.reduce(params, {query, []}, fn {key, value}, {q, acc} -> + if String.contains?(q, ":#{key}") do + {String.replace(q, ":#{key}", "$#{length(acc) + 1}"), acc ++ [value]} + else + {q, acc} + end + end) + end - def get_by_interval(_projection_name, _time_start, nil, _page, _page_size), - do: {:error, "No records found"} + defp validate_params(query, params) do + required_params = + Regex.scan(~r/:("\w+"|\w+)/, query) + |> List.flatten() + |> Enum.filter(&String.starts_with?(&1, ":")) + |> Enum.map(&String.trim_leading(&1, ":")) - def get_by_interval(projection_name, time_start, time_end, page \\ 1, page_size \\ 50) do - table_name = normalize_table_name(projection_name) + param_keys = params |> Map.keys() |> Enum.map(fn key -> "#{key}" end) - query = - from(p in {table_name, Projection}, - where: p.inserted_at >= ^time_start and p.inserted_at <= ^time_end, - order_by: [asc: p.inserted_at] - ) + contains_all_params? = Enum.all?(required_params, fn param -> param in param_keys end) - paginate_query(query, page, page_size) + if contains_all_params? do + :ok + else + {:error, "Required parameters(s): #{Enum.join(required_params, ", ")}"} + end end - @impl true - def get_by_projection_id_and_interval( - nil, - _projection_id, - _time_start, - _time_end, - _page, - _page_size - ), - do: {:error, "No records found"} - - def get_by_projection_id_and_interval( - _projection_name, - nil, - _time_start, - _time_end, - _page, - _page_size - ), - do: {:error, "No records found"} - - def get_by_projection_id_and_interval( - _projection_name, - _projection_id, - nil, - _time_end, - _page, - _page_size - ), - do: {:error, "No records found"} - - def get_by_projection_id_and_interval( - _projection_name, - _projection_id, - _time_start, - nil, - _page, - _page_size - ), - do: {:error, "No records found"} - - def get_by_projection_id_and_interval( - projection_name, - projection_id, - time_start, - time_end, - page \\ 1, - page_size \\ 50 - ) do - table_name = normalize_table_name(projection_name) - - query = - from(p in {table_name, Projection}, - where: - p.projection_id == ^projection_id and p.inserted_at >= ^time_start and - p.inserted_at <= ^time_end, - order_by: [asc: p.inserted_at] - ) + defp from_decoded(module, data) when is_map(data) and is_atom(module) do + data + |> to_proto_decoded() + |> Protobuf.JSON.from_decoded(module) + end - paginate_query(query, page, page_size) + defp to_proto_decoded({k, v}) when is_atom(k) do + {Atom.to_string(k), to_proto_decoded(v)} end - @impl true - def search_by_metadata( - projection_name, - metadata_key, - metadata_value, - page \\ 1, - page_size \\ 50 - ) do - table_name = normalize_table_name(projection_name) - - query = - from(p in {table_name, Projection}, - where: fragment("?->>? = ?", p.metadata, ^metadata_key, ^metadata_value), - order_by: [asc: p.inserted_at] - ) + defp to_proto_decoded({k, v}) do + {k, to_proto_decoded(v)} + end - paginate_query(query, page, page_size) + defp to_proto_decoded(value) when is_list(value) do + Enum.map(value, &to_proto_decoded/1) end - @impl true - def search_by_projection_id_and_metadata( - projection_name, - projection_id, - metadata_key, - metadata_value, - page \\ 1, - page_size \\ 50 - ) do - table_name = normalize_table_name(projection_name) - - query = - from(p in {table_name, Projection}, - where: - p.projection_id == ^projection_id and - fragment("?->>? = ?", p.metadata, ^metadata_key, ^metadata_value), - order_by: [asc: p.inserted_at] - ) + defp to_proto_decoded(value) when is_boolean(value) do + value + end - paginate_query(query, page, page_size) + defp to_proto_decoded(%NaiveDateTime{} = value) do + DateTime.from_naive!(value, "Etc/UTC") + |> to_proto_decoded() end - @impl true - def save(%Projection{} = projection) do - record = ValueObjectSchema.to_map(projection) - {:ok, data} = Statestores.Vault.encrypt(record.data) - - # Prepare the SQL query using `ON CONFLICT` - query = """ - INSERT INTO #{projection.projection_name} - (id, projection_id, projection_name, system, metadata, data_type, data, inserted_at, updated_at) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) - ON CONFLICT (id) DO UPDATE - SET - projection_id = EXCLUDED.projection_id, - projection_name = EXCLUDED.projection_name, - system = EXCLUDED.system, - metadata = EXCLUDED.metadata, - data_type = EXCLUDED.data_type, - data = EXCLUDED.data, - inserted_at = EXCLUDED.inserted_at, - updated_at = EXCLUDED.updated_at - """ + defp to_proto_decoded(%DateTime{} = value) do + DateTime.to_iso8601(value) + end - bindings = [ - record.id, - record.projection_id, - record.projection_name, - record.system, - record.metadata, - record.data_type, - data, - record.inserted_at, - record.updated_at - ] - - # Execute the query using Ecto.Adapters.SQL.query/4 - case Ecto.Adapters.SQL.query(__MODULE__, query, bindings) do - {:ok, _result} -> - {:ok, projection} - - {:error, reason} -> - {:error, reason} - end + defp to_proto_decoded(value) when is_atom(value) do + Atom.to_string(value) + end + + defp to_proto_decoded(existing_map) when is_map(existing_map) do + Map.new(existing_map, &to_proto_decoded/1) + end + + defp to_proto_decoded(""), do: nil + + defp to_proto_decoded(value) do + value end + @doc """ + Performs an upsert (insert or update) of data in the table. + + ## Parameters + + - `repo`: The Ecto repository module. + - `protobuf_module`: The Elixir module generated from a Protobuf file. + - `table_name`: Name of the table in the database. + - `data`: Protobuf structure containing the data to be inserted or updated. + + Returns `:ok` on success. + """ @impl true - def default_port, do: "5432" + def upsert(protobuf_module, table_name, data) do + repo = __MODULE__ - defp to_json(nil), do: Jason.encode!(%{}) - defp to_json(map), do: Jason.encode!(map) + descriptor = protobuf_module.descriptor() + fields = descriptor.field - # Private helper to fetch a single record from the database - defp fetch_single_record(query) do - case __MODULE__.one(query) do - nil -> - {:error, "No record found"} + primary_key = get_primary_key(fields) - projection -> - {:ok, projection} - end + columns = + fields + |> Enum.map(&Macro.underscore(&1.name)) + + placeholders = Enum.map(columns, &"$#{Enum.find_index(columns, fn col -> col == &1 end) + 1}") + + updates = + columns + |> Enum.reject(&(&1 == primary_key)) + |> Enum.map(&"#{&1} = EXCLUDED.#{&1}") + |> Enum.join(", ") + + sql = """ + INSERT INTO #{table_name} (#{Enum.join(columns, ", ")}) + VALUES (#{Enum.join(placeholders, ", ")}) + ON CONFLICT (#{primary_key}) DO UPDATE + SET #{updates} + """ + + values = + Enum.map(fields, fn field -> + value = Map.get(data, String.to_atom(Macro.underscore(field.name))) + + parse_value = fn + parse_value, %{__unknown_fields__: _} = struct -> + Map.from_struct(struct) + |> Map.delete(:__unknown_fields__) + |> Map.new(fn {key, value} -> {key, parse_value.(parse_value, value)} end) + + _, value when is_boolean(value) -> + value + + _, value when is_atom(value) -> + "#{value}" + + _, value -> + value + end + + parse_value.(parse_value, value) + end) + + SQL.query!(repo, sql, values) + + :ok end - # Private helper to handle pagination - defp paginate_query(query, page, page_size) do - case __MODULE__.paginate(query, page: page, page_size: page_size) do - %Scrivener.Page{} = page_data -> - {:ok, page_data} + defp get_primary_key(fields) do + case Enum.find(fields, fn field -> + options = field.options || %{} - _ -> - {:error, "No records found"} + actor_id_extension = + options + |> Map.get(:__pb_extensions__, %{}) + |> Map.get({Spawn.Actors.PbExtension, :actor_id}) + + actor_id_extension == true + end) do + nil -> "id" + field -> Macro.underscore(field.name) end end + + @impl true + def default_port, do: "5432" end diff --git a/spawn_statestores/statestores_postgres/lib/statestores/adapters/postgres_snapshot_adapter.ex.ex b/spawn_statestores/statestores_postgres/lib/statestores/adapters/postgres_snapshot_adapter.ex similarity index 100% rename from spawn_statestores/statestores_postgres/lib/statestores/adapters/postgres_snapshot_adapter.ex.ex rename to spawn_statestores/statestores_postgres/lib/statestores/adapters/postgres_snapshot_adapter.ex diff --git a/spawn_statestores/statestores_postgres/test/postgres_projection_test.exs b/spawn_statestores/statestores_postgres/test/postgres_projection_test.exs new file mode 100644 index 00000000..a50a0591 --- /dev/null +++ b/spawn_statestores/statestores_postgres/test/postgres_projection_test.exs @@ -0,0 +1,149 @@ +defmodule StatestoresPostgres.PostgresProjectionTest do + use Statestores.DataCase, async: false + + alias Statestores.Manager.StateManager + alias Test.TestMessage + + import Statestores.Util, only: [load_projection_adapter: 0] + + setup do + repo = load_projection_adapter() + table_name = "test_messages" + + data = %TestMessage{ + name: "test_user", + age: 25, + balance: 100.50, + active: true, + document: "binary-data", + address: %TestMessage.Address{ + street: "123 Main St", + city: "Testville", + state: "TS", + zip_code: "12345", + country: %TestMessage.Address.Country{ + name: "Test Country", + code: "TC" + } + }, + created_at: DateTime.utc_now(), + metadata: %{"key" => "value"}, + tags: ["elixir", "protobuf"], + attributes: %{"role" => "admin"} + } + + {:ok, _} = Ecto.Adapters.SQL.query(repo, "DROP TABLE IF EXISTS #{table_name}") + :ok = StateManager.projection_create_or_update_table(TestMessage, table_name) + + %{repo: repo, table_name: table_name, data: data} + end + + test "add new field to the table if schema changes", ctx do + %{ + data: data, + repo: repo, + table_name: table_name + } = ctx + + :ok = StateManager.projection_create_or_update_table(TestMessage, table_name) + + {:ok, _result} = Ecto.Adapters.SQL.query(repo, "ALTER TABLE #{table_name} DROP COLUMN age") + + :ok = StateManager.projection_create_or_update_table(TestMessage, table_name) + + data = %{data | age: 34} + + :ok = StateManager.projection_upsert(TestMessage, table_name, data) + + {:ok, result} = + StateManager.projection_query( + TestMessage, + "SELECT age, name FROM test_messages WHERE name = :name", + %{name: "test_user"}, + [] + ) + + assert [%TestMessage{age: 34}] = result + end + + test "performs upsert and query operations", ctx do + %{table_name: table_name, data: data} = ctx + + :ok = StateManager.projection_upsert(TestMessage, table_name, data) + + {:ok, result} = + StateManager.projection_query( + TestMessage, + "SELECT age, name FROM test_messages WHERE name = :name", + %{name: "test_user"}, + [] + ) + + assert [%TestMessage{name: "test_user"}] = result + + data = %{data | age: 30} + + :ok = StateManager.projection_upsert(TestMessage, table_name, data) + + {:ok, result} = + StateManager.projection_query( + TestMessage, + "SELECT age, name FROM test_messages WHERE name = :name", + %{name: "test_user"}, + [] + ) + + assert [%TestMessage{age: 30}] = result + end + + test "performs upsert and query operations with pagination", ctx do + %{table_name: table_name, data: data} = ctx + + Enum.each(1..10, fn item -> + :ok = StateManager.projection_upsert(TestMessage, table_name, %{data | name: "#{item}"}) + end) + + {:ok, result} = + StateManager.projection_query( + TestMessage, + "SELECT * FROM test_messages ORDER BY name", + %{name: "test_user"}, + page_size: 3, + page: 2 + ) + + assert [%TestMessage{name: "3"}, %TestMessage{name: "4"}, %TestMessage{name: "5"}] = result + end + + test "performs a query with no parameters matching query", ctx do + %{table_name: table_name, data: data} = ctx + + :ok = StateManager.projection_upsert(TestMessage, table_name, data) + + assert {:error, _result} = + StateManager.projection_query( + TestMessage, + "SELECT age, name FROM test_messages WHERE name = :name", + %{}, + [] + ) + end + + test "performs a query with more unecessary parameters", ctx do + %{table_name: table_name, data: data} = ctx + + data = %{data | enum_test: nil} + + :ok = StateManager.projection_upsert(TestMessage, table_name, data) + + {:ok, result} = + StateManager.projection_query( + TestMessage, + "SELECT * FROM test_messages", + %{metadata: nil}, + [] + ) + + assert [%TestMessage{name: "test_user"}] = result + end +end