Skip to content

Commit

Permalink
Factored our error details processing; added to signal_with_start_wor…
Browse files Browse the repository at this point in the history
…kflow_execution
  • Loading branch information
cvanderschuere committed Feb 25, 2022
1 parent 937482e commit 5f0d69c
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 9 deletions.
34 changes: 27 additions & 7 deletions lib/temporal/connection/grpc.rb
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,14 @@ def start_workflow_execution(

client.start_workflow_execution(request)
rescue ::GRPC::AlreadyExists => e
cast_error = e.to_rpc_status&.details&.map do |any_error|
next unless any_error.type_url == 'type.googleapis.com/temporal.api.errordetails.v1.WorkflowExecutionAlreadyStartedFailure'

any_error.unpack(Temporal::Api::ErrorDetails::V1::WorkflowExecutionAlreadyStartedFailure)
end&.compact&.first
error_details_from(e).each do |error|
case error
when Temporal::Api::ErrorDetails::V1::WorkflowExecutionAlreadyStartedFailure
raise Temporal::WorkflowExecutionAlreadyStartedFailure.new(e.details, error.run_id)
end
end

raise Temporal::WorkflowExecutionAlreadyStartedFailure.new(e.details, cast_error&.run_id)
raise Temporal::ApiError, e.details # unhandled error type
end

SERVER_MAX_GET_WORKFLOW_EXECUTION_HISTORY_POLL = 30
Expand All @@ -138,7 +139,7 @@ def get_workflow_execution_history(
event_type: :all,
timeout: nil
)
if wait_for_new_event
if wait_for_new_event
if timeout.nil?
# This is an internal error. Wrappers should enforce this.
raise "You must specify a timeout when wait_for_new_event = true."
Expand Down Expand Up @@ -366,7 +367,17 @@ def signal_with_start_workflow_execution(
request.workflow_id_reuse_policy = policy
end


client.signal_with_start_workflow_execution(request)
rescue ::GRPC::AlreadyExists => e
error_details_from(e).each do |error|
case error
when Temporal::Api::ErrorDetails::V1::WorkflowExecutionAlreadyStartedFailure
raise Temporal::WorkflowExecutionAlreadyStartedFailure.new(e.details, error.run_id)
end
end

raise Temporal::ApiError, e.details # unhandled error type
end

def reset_workflow_execution(namespace:, workflow_id:, run_id:, reason:, workflow_task_event_id:)
Expand Down Expand Up @@ -488,6 +499,15 @@ def client
def can_poll?
@poll
end

def error_details_from(error)
error.to_rpc_status&.details&.map do |any_error|
type = Google::Protobuf::DescriptorPool.generated_pool.lookup any_error.type_url.split('/').last
next if type.nil?

any_error.unpack type.msgclass
end&.compact
end
end
end
end
24 changes: 22 additions & 2 deletions spec/unit/lib/temporal/grpc_client_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,8 @@
Temporal::Api::WorkflowService::V1::SignalWithStartWorkflowExecutionResponse.new(run_id: 'xxx')
end

before { allow(grpc_stub).to receive(:signal_with_start_workflow_execution).and_return(temporal_response) }

it 'starts a workflow with a signal with scalar arguments' do
allow(grpc_stub).to receive(:signal_with_start_workflow_execution).and_return(temporal_response)
subject.signal_with_start_workflow_execution(
namespace: namespace,
workflow_id: workflow_id,
Expand Down Expand Up @@ -79,6 +78,27 @@
expect(request.signal_input.payloads[0].data).to eq('"what do you get if you multiply six by nine?"')
end
end

it 'provides the existing run_id when the workflow is already started' do
allow(grpc_stub).to receive(:signal_with_start_workflow_execution).and_raise(already_started_error)

expect do
subject.signal_with_start_workflow_execution(
namespace: namespace,
workflow_id: workflow_id,
workflow_name: 'workflow_name',
task_queue: 'task_queue',
input: ['foo'],
execution_timeout: 1,
run_timeout: 2,
task_timeout: 3,
signal_name: 'the question',
signal_input: 'what do you get if you multiply six by nine?',
)
end.to raise_error(Temporal::WorkflowExecutionAlreadyStartedFailure) do |e|
expect(e.run_id).to eql(run_id)
end
end
end

describe "#list_namespaces" do
Expand Down

0 comments on commit 5f0d69c

Please sign in to comment.