From c727ef114cba47ef5e1e06f87a0b8b7e440daa51 Mon Sep 17 00:00:00 2001 From: Tomasz Stolarczyk Date: Wed, 25 Sep 2024 00:30:08 +0200 Subject: [PATCH] Introduce a new write model alongside the old one The idea for plugging in a new write model is that we want to begin its stream with migration event, that's why we check whether migration event is already in the stream We also use an advisory lock to avoid race conditions when adding the migration event. --- .../app/models/inventory/product.rb | 8 +++ .../app/models/inventory/product_service.rb | 52 +++++++++++++++++-- .../models/inventory/stock_level_migrated.rb | 6 +++ .../test/integration/orders_test.rb | 32 ++++++++++++ 4 files changed, 94 insertions(+), 4 deletions(-) create mode 100644 rails_application/app/models/inventory/stock_level_migrated.rb diff --git a/rails_application/app/models/inventory/product.rb b/rails_application/app/models/inventory/product.rb index 5204693b..1f0b1e70 100644 --- a/rails_application/app/models/inventory/product.rb +++ b/rails_application/app/models/inventory/product.rb @@ -20,6 +20,10 @@ def withdraw(quantity) apply(StockLevelDecreased.new(data: { id:, quantity: })) end + def migration_event(quantity) + apply(StockLevelMigrated.new(data: { id:, quantity: })) + end + on StockLevelIncreased do |event| @stock_level += event.data[:quantity] end @@ -27,5 +31,9 @@ def withdraw(quantity) on StockLevelDecreased do |event| @stock_level -= event.data[:quantity] end + + on StockLevelMigrated do |event| + @stock_level = event.data[:quantity] + end end end diff --git a/rails_application/app/models/inventory/product_service.rb b/rails_application/app/models/inventory/product_service.rb index 8a9defcf..50d62fc1 100644 --- a/rails_application/app/models/inventory/product_service.rb +++ b/rails_application/app/models/inventory/product_service.rb @@ -3,14 +3,58 @@ module Inventory class ProductService + def initialize + @repository = Infra::AggregateRootRepository.new(event_store) + end + def decrement_stock_level(product_id) - product = ::Product.find(product_id) - product.decrement!(:stock_level) + ApplicationRecord.with_advisory_lock("change_stock_level_for_#{product_id}") do + product = ::Product.find(product_id) + product_stream = event_store.read.stream("Inventory::Product$#{product_id}").to_a + + if product_stream.any? { |event| event.event_type == "Inventory::StockLevelMigrated" } + with_inventory_product(product_id) do |aggregate| + aggregate.withdraw(1) + end + else + with_inventory_product(product_id) do |aggregate| + aggregate.migration_event(product.stock_level) + aggregate.withdraw(1) + end + end + product.decrement!(:stock_level) + end end def increment_stock_level(product_id) - product = ::Product.find(product_id) - product.increment!(:stock_level) + ApplicationRecord.with_advisory_lock("change_stock_level_for_#{product_id}") do + product = ::Product.find(product_id) + product_stream = event_store.read.stream("Inventory::Product$#{product_id}").to_a + + if product_stream.any? { |event| event.event_type == "Inventory::StockLevelMigrated" } + with_inventory_product(product_id) do |aggregate| + aggregate.supply(1) + end + else + with_inventory_product(product_id) do |aggregate| + aggregate.migration_event(product.stock_level) + aggregate.supply(1) + end + end + product.increment!(:stock_level) + end + end + + private + + def event_store + Rails.configuration.event_store + end + + def with_inventory_product(product_id) + @repository.with_aggregate(Inventory::Product, product_id) do |product| + yield(product) + end end end end diff --git a/rails_application/app/models/inventory/stock_level_migrated.rb b/rails_application/app/models/inventory/stock_level_migrated.rb new file mode 100644 index 00000000..ac95573e --- /dev/null +++ b/rails_application/app/models/inventory/stock_level_migrated.rb @@ -0,0 +1,6 @@ +# frozen_string_literal: true + +module Inventory + StockLevelMigrated = Class.new(Infra::Event) +end + diff --git a/rails_application/test/integration/orders_test.rb b/rails_application/test/integration/orders_test.rb index f82aaa7d..4880e652 100644 --- a/rails_application/test/integration/orders_test.rb +++ b/rails_application/test/integration/orders_test.rb @@ -40,6 +40,13 @@ def test_happy_path assert_changes -> { Product.find(async_remote_id).stock_level }, from: 10, to: 9 do post "/orders/#{order_id}/add_item?product_id=#{async_remote_id}" end + assert_expected_events_in_stream( + inventory_product_stream_name(async_remote_id), + [ + Inventory::StockLevelMigrated.new(data: { id: async_remote_id, quantity: 10 }), + Inventory::StockLevelDecreased.new(data: { id: async_remote_id, quantity: 1 }) + ] + ) assert_changes -> { Product.find(fearless_id).stock_level }, from: 10, to: 8 do post "/orders/#{order_id}/add_item?product_id=#{fearless_id}" @@ -47,6 +54,17 @@ def test_happy_path post "/orders/#{order_id}/remove_item?product_id=#{fearless_id}" post "/orders/#{order_id}/add_item?product_id=#{fearless_id}" end + assert_expected_events_in_stream( + inventory_product_stream_name(fearless_id), + [ + Inventory::StockLevelMigrated.new(data: { id: fearless_id, quantity: 10 }), + Inventory::StockLevelDecreased.new(data: { id: fearless_id, quantity: 1 }), + Inventory::StockLevelDecreased.new(data: { id: fearless_id, quantity: 1 }), + Inventory::StockLevelIncreased.new(data: { id: fearless_id, quantity: 1 }), + Inventory::StockLevelDecreased.new(data: { id: fearless_id, quantity: 1 }) + ] + ) + get "/orders/#{order_id}/edit" assert_remove_buttons_visible(async_remote_id, fearless_id, order_id) @@ -323,4 +341,18 @@ def apply_discount_10_percent(order_id) post "/orders/#{order_id}/update_discount?amount=10" end + + def inventory_product_stream_name(product_id) + "Inventory::Product$#{product_id}" + end + + def assert_expected_events_in_stream(stream_name, expected, event_store: Rails.configuration.event_store) + actual = + event_store + .read + .stream(stream_name) + .map { |event| { data: event.data, type: event.event_type } } + expected = expected.map { |event| { data: event.data, type: event.class.to_s } } + assert_equal(expected, actual) + end end