Skip to content

Commit

Permalink
Introduce a new write model alongside the old one
Browse files Browse the repository at this point in the history
The idea for plugging in a new write model is that we want to begin its
stream with migration event, that's why we check whether migration event
is already in the stream

We also use an advisory lock to avoid race conditions when adding the
migration event.
  • Loading branch information
stolarczykt committed Sep 24, 2024
1 parent baf91d7 commit c727ef1
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 4 deletions.
8 changes: 8 additions & 0 deletions rails_application/app/models/inventory/product.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,20 @@ def withdraw(quantity)
apply(StockLevelDecreased.new(data: { id:, quantity: }))
end

def migration_event(quantity)
apply(StockLevelMigrated.new(data: { id:, quantity: }))
end

on StockLevelIncreased do |event|
@stock_level += event.data[:quantity]
end

on StockLevelDecreased do |event|
@stock_level -= event.data[:quantity]
end

on StockLevelMigrated do |event|
@stock_level = event.data[:quantity]
end
end
end
52 changes: 48 additions & 4 deletions rails_application/app/models/inventory/product_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,58 @@
module Inventory
class ProductService

def initialize
@repository = Infra::AggregateRootRepository.new(event_store)
end

def decrement_stock_level(product_id)
product = ::Product.find(product_id)
product.decrement!(:stock_level)
ApplicationRecord.with_advisory_lock("change_stock_level_for_#{product_id}") do
product = ::Product.find(product_id)
product_stream = event_store.read.stream("Inventory::Product$#{product_id}").to_a

if product_stream.any? { |event| event.event_type == "Inventory::StockLevelMigrated" }
with_inventory_product(product_id) do |aggregate|
aggregate.withdraw(1)
end
else
with_inventory_product(product_id) do |aggregate|
aggregate.migration_event(product.stock_level)
aggregate.withdraw(1)
end
end
product.decrement!(:stock_level)
end
end

def increment_stock_level(product_id)
product = ::Product.find(product_id)
product.increment!(:stock_level)
ApplicationRecord.with_advisory_lock("change_stock_level_for_#{product_id}") do
product = ::Product.find(product_id)
product_stream = event_store.read.stream("Inventory::Product$#{product_id}").to_a

if product_stream.any? { |event| event.event_type == "Inventory::StockLevelMigrated" }
with_inventory_product(product_id) do |aggregate|
aggregate.supply(1)
end
else
with_inventory_product(product_id) do |aggregate|
aggregate.migration_event(product.stock_level)
aggregate.supply(1)
end
end
product.increment!(:stock_level)
end
end

private

def event_store
Rails.configuration.event_store
end

def with_inventory_product(product_id)
@repository.with_aggregate(Inventory::Product, product_id) do |product|
yield(product)
end
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# frozen_string_literal: true

module Inventory
StockLevelMigrated = Class.new(Infra::Event)
end

32 changes: 32 additions & 0 deletions rails_application/test/integration/orders_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,31 @@ def test_happy_path
assert_changes -> { Product.find(async_remote_id).stock_level }, from: 10, to: 9 do
post "/orders/#{order_id}/add_item?product_id=#{async_remote_id}"
end
assert_expected_events_in_stream(
inventory_product_stream_name(async_remote_id),
[
Inventory::StockLevelMigrated.new(data: { id: async_remote_id, quantity: 10 }),
Inventory::StockLevelDecreased.new(data: { id: async_remote_id, quantity: 1 })
]
)

assert_changes -> { Product.find(fearless_id).stock_level }, from: 10, to: 8 do
post "/orders/#{order_id}/add_item?product_id=#{fearless_id}"
post "/orders/#{order_id}/add_item?product_id=#{fearless_id}"
post "/orders/#{order_id}/remove_item?product_id=#{fearless_id}"
post "/orders/#{order_id}/add_item?product_id=#{fearless_id}"
end
assert_expected_events_in_stream(
inventory_product_stream_name(fearless_id),
[
Inventory::StockLevelMigrated.new(data: { id: fearless_id, quantity: 10 }),
Inventory::StockLevelDecreased.new(data: { id: fearless_id, quantity: 1 }),
Inventory::StockLevelDecreased.new(data: { id: fearless_id, quantity: 1 }),
Inventory::StockLevelIncreased.new(data: { id: fearless_id, quantity: 1 }),
Inventory::StockLevelDecreased.new(data: { id: fearless_id, quantity: 1 })
]
)

get "/orders/#{order_id}/edit"
assert_remove_buttons_visible(async_remote_id, fearless_id, order_id)

Expand Down Expand Up @@ -323,4 +341,18 @@ def apply_discount_10_percent(order_id)

post "/orders/#{order_id}/update_discount?amount=10"
end

def inventory_product_stream_name(product_id)
"Inventory::Product$#{product_id}"
end

def assert_expected_events_in_stream(stream_name, expected, event_store: Rails.configuration.event_store)
actual =
event_store
.read
.stream(stream_name)
.map { |event| { data: event.data, type: event.event_type } }
expected = expected.map { |event| { data: event.data, type: event.class.to_s } }
assert_equal(expected, actual)
end
end

0 comments on commit c727ef1

Please sign in to comment.