Skip to content

Commit

Permalink
Limit automatic retries to one
Browse files Browse the repository at this point in the history
  • Loading branch information
marlena-b committed Sep 27, 2024
1 parent c135faf commit 5ac22b0
Show file tree
Hide file tree
Showing 10 changed files with 124 additions and 64 deletions.
18 changes: 10 additions & 8 deletions ecommerce/pricing/lib/pricing/services.rb
Original file line number Diff line number Diff line change
Expand Up @@ -114,27 +114,29 @@ def call(command)
end

class OnCalculateTotalValue
include Infra::Retry

def initialize(event_store)
@repository = Infra::AggregateRootRepository.new(event_store)
@event_store = event_store
end

def call(command)
@repository.with_aggregate(Offer, command.aggregate_id) do |order|
order.calculate_total_value(PricingCatalog.new(@event_store), time_promotions_discount)
with_retry do
@repository.with_aggregate(Offer, command.aggregate_id) do |order|
order.calculate_total_value(PricingCatalog.new(@event_store), time_promotions_discount)
end
end
rescue RubyEventStore::WrongExpectedEventVersion
retry
end



def calculate_sub_amounts(command)
@repository.with_aggregate(Offer, command.aggregate_id) do |order|
order.calculate_sub_amounts(PricingCatalog.new(@event_store), time_promotions_discount)
with_retry do
@repository.with_aggregate(Offer, command.aggregate_id) do |order|
order.calculate_sub_amounts(PricingCatalog.new(@event_store), time_promotions_discount)
end
end
rescue RubyEventStore::WrongExpectedEventVersion
retry
end

private
Expand Down
22 changes: 12 additions & 10 deletions ecommerce/processes/lib/processes/order_item_invoicing_process.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
module Processes
class OrderItemInvoicingProcess
include Infra::Retry

def initialize(event_store, command_bus)
@event_store = event_store
@command_bus = command_bus
Expand All @@ -26,16 +28,16 @@ def call(event)
attr_reader :event_store, :command_bus

def build_state(event)
stream_name = "OrderInvoicingProcess$#{event.data.fetch(:order_id)}$#{event.data.fetch(:product_id)}"
past = event_store.read.stream(stream_name).to_a
last_stored = past.size - 1
event_store.link(event.event_id, stream_name: stream_name, expected_version: last_stored)
ProcessState.new.tap do |state|
past.each { |ev| state.call(ev) }
state.call(event)
with_retry do
stream_name = "OrderInvoicingProcess$#{event.data.fetch(:order_id)}$#{event.data.fetch(:product_id)}"
past = event_store.read.stream(stream_name).to_a
last_stored = past.size - 1
event_store.link(event.event_id, stream_name: stream_name, expected_version: last_stored)
ProcessState.new.tap do |state|
past.each { |ev| state.call(ev) }
state.call(event)
end
end
rescue RubyEventStore::WrongExpectedEventVersion
retry
end

class ProcessState
Expand Down Expand Up @@ -84,4 +86,4 @@ def call
distributed_amounts
end
end
end
end
22 changes: 12 additions & 10 deletions ecommerce/processes/lib/processes/release_payment_process.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
module Processes
class ReleasePaymentProcess
include Infra::Retry

def initialize(event_store, command_bus)
@event_store = event_store
@command_bus = command_bus
Expand All @@ -19,16 +21,16 @@ def release_payment(state)
attr_reader :command_bus, :event_store

def build_state(event)
stream_name = "PaymentProcess$#{event.data.fetch(:order_id)}"
past_events = event_store.read.stream(stream_name).to_a
last_stored = past_events.size - 1
event_store.link(event.event_id, stream_name: stream_name, expected_version: last_stored)
ProcessState.new.tap do |state|
past_events.each { |ev| state.call(ev) }
state.call(event)
with_retry do
stream_name = "PaymentProcess$#{event.data.fetch(:order_id)}"
past_events = event_store.read.stream(stream_name).to_a
last_stored = past_events.size - 1
event_store.link(event.event_id, stream_name: stream_name, expected_version: last_stored)
ProcessState.new.tap do |state|
past_events.each { |ev| state.call(ev) }
state.call(event)
end
end
rescue RubyEventStore::WrongExpectedEventVersion
retry
end

class ProcessState
Expand Down Expand Up @@ -60,4 +62,4 @@ def release?
end
end
end
end
end
13 changes: 8 additions & 5 deletions ecommerce/processes/lib/processes/reservation_process.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
module Processes
class ReservationProcess
include Infra::Retry

def initialize
@event_store = Configuration.event_store
@command_bus = Configuration.command_bus
Expand Down Expand Up @@ -72,12 +74,13 @@ def stream_name(order_id)

def build_state(event)
stream_name = stream_name(event.data.fetch(:order_id))
past_events = nil
begin
past_events = event_store.read.stream(stream_name).to_a
last_stored = past_events.size - 1
event_store.link(event.event_id, stream_name: stream_name, expected_version: last_stored)
rescue RubyEventStore::WrongExpectedEventVersion
retry
with_retry do
past_events = event_store.read.stream(stream_name).to_a
last_stored = past_events.size - 1
event_store.link(event.event_id, stream_name: stream_name, expected_version: last_stored)
end
rescue RubyEventStore::EventDuplicatedInStream
return
end
Expand Down
22 changes: 12 additions & 10 deletions ecommerce/processes/lib/processes/shipment_process.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
module Processes
class ShipmentProcess
include Infra::Retry

def initialize(event_store, command_bus)
@event_store = event_store
@command_bus = command_bus
Expand Down Expand Up @@ -33,16 +35,16 @@ def authorize_shipment(state)
attr_reader :command_bus, :event_store

def build_state(event)
stream_name = "ShipmentProcess$#{event.data.fetch(:order_id)}"
past_events = event_store.read.stream(stream_name).to_a
last_stored = past_events.size - 1
event_store.link(event.event_id, stream_name: stream_name, expected_version: last_stored)
ProcessState.new.tap do |state|
past_events.each { |ev| state.call(ev) }
state.call(event)
with_retry do
stream_name = "ShipmentProcess$#{event.data.fetch(:order_id)}"
past_events = event_store.read.stream(stream_name).to_a
last_stored = past_events.size - 1
event_store.link(event.event_id, stream_name: stream_name, expected_version: last_stored)
ProcessState.new.tap do |state|
past_events.each { |ev| state.call(ev) }
state.call(event)
end
end
rescue RubyEventStore::WrongExpectedEventVersion
retry
end

class ProcessState
Expand Down Expand Up @@ -78,4 +80,4 @@ def authorize?
end
end
end
end
end
19 changes: 10 additions & 9 deletions ecommerce/processes/lib/processes/three_plus_one_free.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
module Processes
class ThreePlusOneFree
include Infra::Retry

def initialize(event_store, command_bus)
@event_store = event_store
Expand All @@ -25,16 +26,16 @@ def call(event)
private

def build_state(event)
stream_name = "ThreePlusOneFreeProcess$#{event.data.fetch(:order_id)}"
past_events = @event_store.read.stream(stream_name).to_a
last_stored = past_events.size - 1
@event_store.link(event.event_id, stream_name: stream_name, expected_version: last_stored)
ProcessState.new(event.data.fetch(:order_id)).tap do |state|
past_events.each { |ev| state.call(ev) }
state.call(event)
with_retry do
stream_name = "ThreePlusOneFreeProcess$#{event.data.fetch(:order_id)}"
past_events = @event_store.read.stream(stream_name).to_a
last_stored = past_events.size - 1
@event_store.link(event.event_id, stream_name: stream_name, expected_version: last_stored)
ProcessState.new(event.data.fetch(:order_id)).tap do |state|
past_events.each { |ev| state.call(ev) }
state.call(event)
end
end
rescue RubyEventStore::WrongExpectedEventVersion
retry
end

def make_or_remove_free_product(state)
Expand Down
1 change: 1 addition & 0 deletions infra/lib/infra.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,6 @@
require_relative "infra/event"
require_relative "infra/event_store"
require_relative "infra/process"
require_relative "infra/retry"
require_relative "infra/types"
require_relative "infra/testing"
9 changes: 9 additions & 0 deletions infra/lib/infra/retry.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
module Infra
module Retry
def with_retry
yield
rescue RubyEventStore::WrongExpectedEventVersion
yield
end
end
end
36 changes: 36 additions & 0 deletions infra/test/retry_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
require_relative "test_helper"

module Infra
class RetryTest < Minitest::Test
cover "Infra::Retry"

include Infra::Retry

def test_no_error
result = with_retry { true }
assert result
end

def test_retries_once
attempts = 0
with_retry do
attempts += 1
raise RubyEventStore::WrongExpectedEventVersion if attempts == 1
end

assert_equal 2, attempts
end

def test_fails_after_two_attempts
attempts = 0
assert_raises RubyEventStore::WrongExpectedEventVersion do
with_retry do
attempts += 1
raise RubyEventStore::WrongExpectedEventVersion
end
end

assert_equal 2, attempts
end
end
end
26 changes: 14 additions & 12 deletions rails_application/app/read_models/single_table_read_model.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ def copy_handler(event, sequence_of_keys, column)
end

class ReadModelHandler
include Infra::Retry

def initialize(*args)
if args.present?
@event_store = args[0]
Expand All @@ -64,18 +66,18 @@ def concurrent_safely(event)
stream_name = "#{active_record_name}$#{record_id(event)}$#{event.event_type}"
read_scope = event_store.read.as_at.stream(stream_name)
begin
last_event = read_scope.last
return if last_event && last_event.timestamp > event.timestamp
ApplicationRecord.with_advisory_lock(active_record_name, record_id(event)) do
yield
event_store.link(
event.event_id,
stream_name: stream_name,
expected_version: last_event ? read_scope.to(last_event.event_id).count : -1
)
with_retry do
last_event = read_scope.last
return if last_event && last_event.timestamp > event.timestamp
ApplicationRecord.with_advisory_lock(active_record_name, record_id(event)) do
yield
event_store.link(
event.event_id,
stream_name: stream_name,
expected_version: last_event ? read_scope.to(last_event.event_id).count : -1
)
end
end
rescue RubyEventStore::WrongExpectedEventVersion
retry
rescue RubyEventStore::EventDuplicatedInStream
end
end
Expand Down Expand Up @@ -114,4 +116,4 @@ def call(event)

private
attr_reader :sequence_of_keys, :column
end
end

0 comments on commit 5ac22b0

Please sign in to comment.