Skip to content

Commit

Permalink
Introduce a new write model alongside the old one
Browse files Browse the repository at this point in the history
The idea for plugging in a new write model is that we want to begin its
stream with migration event, that's why we check whether migration event
is already in the stream

We also use an advisory lock to avoid race conditions when adding the
migration event.

New tests were added to check whether the logic of
Inventory::ProductService works correctly for all 3 cases.
  • Loading branch information
stolarczykt authored and lukaszreszke committed Sep 25, 2024
1 parent 4478999 commit 0ed66a8
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 8 deletions.
2 changes: 1 addition & 1 deletion rails_application/app/controllers/supplies_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ def new
end

def create
Inventory::ProductService.new.supply(params[:product_id], params[:quantity].to_i)
Inventory::ProductService.new.supply(params[:product_id].to_i, params[:quantity].to_i)
redirect_to products_path, notice: "Stock level changed"
end
end
8 changes: 8 additions & 0 deletions rails_application/app/models/inventory/product.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,20 @@ def withdraw(quantity)
apply(StockLevelDecreased.new(data: { id:, quantity: }))
end

def migration_event(quantity)
apply(StockLevelMigrated.new(data: { id:, quantity: }))
end

on StockLevelIncreased do |event|
@stock_level += event.data[:quantity]
end

on StockLevelDecreased do |event|
@stock_level -= event.data[:quantity]
end

on StockLevelMigrated do |event|
@stock_level = event.data[:quantity]
end
end
end
71 changes: 64 additions & 7 deletions rails_application/app/models/inventory/product_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,77 @@
module Inventory
class ProductService

def initialize
@repository = Infra::AggregateRootRepository.new(event_store)
end

def decrement_stock_level(product_id)
product = ::Product.find(product_id)
product.decrement!(:stock_level)
ApplicationRecord.with_advisory_lock("change_stock_level_for_#{product_id}") do
product = ::Product.find(product_id)
product_stream = event_store.read.stream("Inventory::Product$#{product_id}").to_a

if product_stream.any? { |event| event.event_type == "Inventory::StockLevelMigrated" }
with_inventory_product(product_id) do |aggregate|
aggregate.withdraw(1)
end
else
with_inventory_product(product_id) do |aggregate|
aggregate.migration_event(product.stock_level)
aggregate.withdraw(1)
end
end
product.decrement!(:stock_level)
end
end

def increment_stock_level(product_id)
product = ::Product.find(product_id)
product.increment!(:stock_level)
ApplicationRecord.with_advisory_lock("change_stock_level_for_#{product_id}") do
product = ::Product.find(product_id)
product_stream = event_store.read.stream("Inventory::Product$#{product_id}").to_a

if product_stream.any? { |event| event.event_type == "Inventory::StockLevelMigrated" }
with_inventory_product(product_id) do |aggregate|
aggregate.supply(1)
end
else
with_inventory_product(product_id) do |aggregate|
aggregate.migration_event(product.stock_level)
aggregate.supply(1)
end
end
product.increment!(:stock_level)
end
end

def supply(product_id, quantity)
product = ::Product.find(product_id)
product.stock_level == nil ? product.stock_level = quantity : product.stock_level += quantity
product.save!
ApplicationRecord.with_advisory_lock("change_stock_level_for_#{product_id}") do
product = ::Product.find(product_id)
product.stock_level == nil ? product.stock_level = quantity : product.stock_level += quantity
product_stream = event_store.read.stream("Inventory::Product$#{product_id}").to_a

if product_stream.any? { |event| event.event_type == "Inventory::StockLevelMigrated" }
with_inventory_product(product_id) do |aggregate|
aggregate.supply(quantity)
end
else
with_inventory_product(product_id) do |aggregate|
aggregate.migration_event(product.stock_level)
end
end
product.save!
end
end

private

def event_store
Rails.configuration.event_store
end

def with_inventory_product(product_id)
@repository.with_aggregate(Inventory::Product, product_id) do |product|
yield(product)
end
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# frozen_string_literal: true

module Inventory
StockLevelMigrated = Class.new(Infra::Event)
end

86 changes: 86 additions & 0 deletions rails_application/test/integration/orders_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,31 @@ def test_happy_path
assert_changes -> { Product.find(async_remote_id).stock_level }, from: 10, to: 9 do
post "/orders/#{order_id}/add_item?product_id=#{async_remote_id}"
end
assert_expected_events_in_stream(
inventory_product_stream_name(async_remote_id),
[
Inventory::StockLevelMigrated.new(data: { id: async_remote_id, quantity: 10 }),
Inventory::StockLevelDecreased.new(data: { id: async_remote_id, quantity: 1 })
]
)

assert_changes -> { Product.find(fearless_id).stock_level }, from: 10, to: 8 do
post "/orders/#{order_id}/add_item?product_id=#{fearless_id}"
post "/orders/#{order_id}/add_item?product_id=#{fearless_id}"
post "/orders/#{order_id}/remove_item?product_id=#{fearless_id}"
post "/orders/#{order_id}/add_item?product_id=#{fearless_id}"
end
assert_expected_events_in_stream(
inventory_product_stream_name(fearless_id),
[
Inventory::StockLevelMigrated.new(data: { id: fearless_id, quantity: 10 }),
Inventory::StockLevelDecreased.new(data: { id: fearless_id, quantity: 1 }),
Inventory::StockLevelDecreased.new(data: { id: fearless_id, quantity: 1 }),
Inventory::StockLevelIncreased.new(data: { id: fearless_id, quantity: 1 }),
Inventory::StockLevelDecreased.new(data: { id: fearless_id, quantity: 1 })
]
)

get "/orders/#{order_id}/edit"
assert_remove_buttons_visible(async_remote_id, fearless_id, order_id)

Expand Down Expand Up @@ -74,6 +92,56 @@ def test_happy_path
verify_invoice_generation(order_id)
end

def test_cover_product_created_without_supply_is_migrated_through_add_item
order_id = new_order.id
post "/products", params: { product: { name: 'Async Remote', price: 39, vat_rate: 23, sku: sku = SecureRandom.uuid} }
async_remote = Product.find_by(sku:)
async_remote_id = async_remote.id

async_remote.stock_level = 10
async_remote.save!

post "/orders/#{order_id}/add_item?product_id=#{async_remote_id}"
post "/orders/#{order_id}/remove_item?product_id=#{async_remote_id}"

assert_expected_events_in_stream(
inventory_product_stream_name(async_remote_id),
[
Inventory::StockLevelMigrated.new(data: { id: async_remote_id, quantity: 10 }),
Inventory::StockLevelDecreased.new(data: { id: async_remote_id, quantity: 1 }),
Inventory::StockLevelIncreased.new(data: { id: async_remote_id, quantity: 1 }),
]
)
end

def test_cover_product_created_without_supply_is_migrated_through_remove_item
order = new_order
order_id = order.id
post "/products", params: { product: { name: 'Async Remote', price: 39, vat_rate: 23, sku: sku = SecureRandom.uuid} }
async_remote = Product.find_by(sku:)
async_remote_id = async_remote.id

async_remote.stock_level = 10
async_remote.save!

order.add_item(async_remote)
order.save!

assert event_store.read.stream(inventory_product_stream_name(async_remote_id)).to_a.empty?

post "/orders/#{order_id}/remove_item?product_id=#{async_remote_id}"

assert_expected_events_in_stream(
inventory_product_stream_name(async_remote_id),
[
Inventory::StockLevelMigrated.new(data: { id: async_remote_id, quantity: 10 }),
Inventory::StockLevelIncreased.new(data: { id: async_remote_id, quantity: 1 }),
]
)
end



def test_expiring_orders
order_id = new_order.id
async_remote_id = register_product("Async Remote", 39, 10)
Expand Down Expand Up @@ -323,4 +391,22 @@ def apply_discount_10_percent(order_id)

post "/orders/#{order_id}/update_discount?amount=10"
end

def inventory_product_stream_name(product_id)
"Inventory::Product$#{product_id}"
end

def assert_expected_events_in_stream(stream_name, expected, event_store: Rails.configuration.event_store)
actual =
event_store
.read
.stream(stream_name)
.map { |event| { data: event.data, type: event.event_type } }
expected = expected.map { |event| { data: event.data, type: event.class.to_s } }
assert_equal(expected, actual)
end

def event_store
Rails.configuration.event_store
end
end

0 comments on commit 0ed66a8

Please sign in to comment.