Skip to content

Commit

Permalink
Fix Error handling and add test coverage for Thrift2ProtoAdapter
Browse files Browse the repository at this point in the history
  • Loading branch information
natemort committed Nov 7, 2024
1 parent b5d8ef8 commit 0f000cd
Show file tree
Hide file tree
Showing 5 changed files with 1,085 additions and 124 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@
import io.grpc.StatusRuntimeException;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import org.apache.thrift.TException;
Expand Down Expand Up @@ -203,7 +204,7 @@ public RestartWorkflowExecutionResponse RestartWorkflowExecution(
RestartWorkflowExecutionRequest restartRequest)
throws BadRequestError, ServiceBusyError, DomainNotActiveError, LimitExceededError,
EntityNotExistsError, ClientVersionNotSupportedError, TException {
throw new IllegalArgumentException("unimplemented");
throw new UnsupportedOperationException("unimplemented");
}

@Override
Expand Down Expand Up @@ -851,7 +852,7 @@ public void DeprecateDomain(
public void RestartWorkflowExecution(
RestartWorkflowExecutionRequest restartRequest, AsyncMethodCallback resultHandler)
throws TException {
throw new IllegalArgumentException("unimplemented");
throw new UnsupportedOperationException("unimplemented");
}

@Override
Expand Down Expand Up @@ -880,7 +881,7 @@ public void StartWorkflowExecutionAsync(
resultHandler.onComplete(
ResponseMapper.startWorkflowExecutionAsyncResponse(response));
} catch (Exception e) {
resultHandler.onError(e);
handleException(resultHandler, e);
}
},
ForkJoinPool.commonPool());
Expand Down Expand Up @@ -1003,7 +1004,7 @@ public void SignalWorkflowExecution(
com.uber.cadence.api.v1.SignalWorkflowExecutionResponse response = resultFuture.get();
resultHandler.onComplete(null);
} catch (Exception e) {
resultHandler.onError(e);
handleException(resultHandler, e);
}
},
ForkJoinPool.commonPool());
Expand All @@ -1025,7 +1026,7 @@ public void SignalWithStartWorkflowExecutionAsync(
SignalWithStartWorkflowExecutionAsyncRequest signalWithStartRequest,
AsyncMethodCallback resultHandler)
throws TException {
throw new IllegalArgumentException("unimplemented");
throw new UnsupportedOperationException("unimplemented");
}

@Override
Expand Down Expand Up @@ -1199,7 +1200,7 @@ public void StartWorkflowExecutionWithTimeout(
com.uber.cadence.api.v1.StartWorkflowExecutionResponse response = resultFuture.get();
resultHandler.onComplete(ResponseMapper.startWorkflowExecutionResponse(response));
} catch (Exception e) {
resultHandler.onError(e);
handleException(resultHandler, e);
}
},
ForkJoinPool.commonPool());
Expand Down Expand Up @@ -1230,7 +1231,7 @@ public void StartWorkflowExecutionAsyncWithTimeout(
resultHandler.onComplete(
ResponseMapper.startWorkflowExecutionAsyncResponse(response));
} catch (Exception e) {
resultHandler.onError(e);
handleException(resultHandler, e);
}
},
ForkJoinPool.commonPool());
Expand Down Expand Up @@ -1276,7 +1277,7 @@ public void GetWorkflowExecutionHistoryWithTimeout(
resultHandler.onComplete(
ResponseMapper.getWorkflowExecutionHistoryResponse(response));
} catch (Exception e) {
resultHandler.onError(e);
handleException(resultHandler, e);
}
},
ForkJoinPool.commonPool());
Expand All @@ -1293,4 +1294,13 @@ public void SignalWorkflowExecutionWithTimeout(
throws TException {
throw new UnsupportedOperationException("not implemented");
}

private void handleException(AsyncMethodCallback callback, Exception exception) {
if (exception instanceof ExecutionException
&& exception.getCause() instanceof StatusRuntimeException) {
callback.onError(ErrorMapper.Error(((StatusRuntimeException) exception.getCause())));
} else {
callback.onError(exception);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.uber.cadence.internal.compatibility.proto.serviceclient;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.protobuf.ByteString;
import com.uber.cadence.api.v1.*;
Expand Down Expand Up @@ -53,7 +54,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class GrpcServiceStubs implements IGrpcServiceStubs {
@VisibleForTesting
public final class GrpcServiceStubs implements IGrpcServiceStubs {

private static final Logger log = LoggerFactory.getLogger(GrpcServiceStubs.class);
private static final Metadata.Key<String> LIBRARY_VERSION_HEADER_KEY =
Expand Down Expand Up @@ -91,7 +93,8 @@ final class GrpcServiceStubs implements IGrpcServiceStubs {
private final MetaAPIGrpc.MetaAPIBlockingStub metaBlockingStub;
private final MetaAPIGrpc.MetaAPIFutureStub metaFutureStub;

GrpcServiceStubs(ClientOptions options) {
@VisibleForTesting
public GrpcServiceStubs(ClientOptions options, boolean enableLogging) {
this.options = options;
if (options.getGRPCChannel() != null) {
this.channel = options.getGRPCChannel();
Expand Down Expand Up @@ -124,7 +127,7 @@ final class GrpcServiceStubs implements IGrpcServiceStubs {
MetadataUtils.newAttachHeadersInterceptor(headers),
newOpenTelemetryInterceptor(),
newOpenTracingInterceptor(options.getTracer()));
if (log.isTraceEnabled()) {
if (log.isTraceEnabled() || enableLogging) {
interceptedChannel = ClientInterceptors.intercept(interceptedChannel, tracingInterceptor);
}
if (options.getAuthProvider() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ public interface IGrpcServiceStubs {
/** Returns gRPC stubs with default options domain service. */
static IGrpcServiceStubs newInstance() {
return new GrpcServiceStubs(
ClientOptions.newBuilder().setPort(DEFAULT_LOCAL_CADENCE_SERVER_GRPC_PORT).build());
ClientOptions.newBuilder().setPort(DEFAULT_LOCAL_CADENCE_SERVER_GRPC_PORT).build(), false);
}

/** Returns gRPC stubs with given options domain service. */
static IGrpcServiceStubs newInstance(ClientOptions options) {
return new GrpcServiceStubs(options);
return new GrpcServiceStubs(options, false);
}

ClientOptions getOptions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1324,6 +1324,65 @@ public final class ProtoObjects {
public static final UpdateDomainResponse UPDATE_DOMAIN_RESPONSE =
UpdateDomainResponse.newBuilder().setDomain(DOMAIN).build();

public static final GetSearchAttributesRequest GET_SEARCH_ATTRIBUTES_REQUEST =
GetSearchAttributesRequest.getDefaultInstance();

public static final RegisterDomainResponse REGISTER_DOMAIN_RESPONSE =
RegisterDomainResponse.getDefaultInstance();

public static final DeprecateDomainResponse DEPRECATE_DOMAIN_RESPONSE =
DeprecateDomainResponse.getDefaultInstance();

public static final SignalWorkflowExecutionResponse SIGNAL_WORKFLOW_EXECUTION_RESPONSE =
SignalWorkflowExecutionResponse.getDefaultInstance();

public static final RequestCancelWorkflowExecutionResponse
REQUEST_CANCEL_WORKFLOW_EXECUTION_RESPONSE =
RequestCancelWorkflowExecutionResponse.getDefaultInstance();

public static final TerminateWorkflowExecutionResponse TERMINATE_WORKFLOW_EXECUTION_RESPONSE =
TerminateWorkflowExecutionResponse.getDefaultInstance();

public static final GetClusterInfoRequest GET_CLUSTER_INFO_REQUEST =
GetClusterInfoRequest.getDefaultInstance();

public static final RespondDecisionTaskFailedResponse RESPOND_DECISION_TASK_FAILED_RESPONSE =
RespondDecisionTaskFailedResponse.getDefaultInstance();

public static final RespondActivityTaskCompletedResponse
RESPOND_ACTIVITY_TASK_COMPLETED_RESPONSE =
RespondActivityTaskCompletedResponse.getDefaultInstance();

public static final RespondActivityTaskCompletedByIDResponse
RESPOND_ACTIVITY_TASK_COMPLETED_BY_ID_RESPONSE =
RespondActivityTaskCompletedByIDResponse.getDefaultInstance();

public static final RespondActivityTaskFailedResponse RESPOND_ACTIVITY_TASK_FAILED_RESPONSE =
RespondActivityTaskFailedResponse.getDefaultInstance();

public static final RespondActivityTaskFailedByIDResponse
RESPOND_ACTIVITY_TASK_FAILED_BY_ID_RESPONSE =
RespondActivityTaskFailedByIDResponse.getDefaultInstance();

public static final RespondActivityTaskCanceledResponse RESPOND_ACTIVITY_TASK_CANCELED_RESPONSE =
RespondActivityTaskCanceledResponse.getDefaultInstance();

public static final RespondActivityTaskCanceledByIDResponse
RESPOND_ACTIVITY_TASK_CANCELED_BY_ID_RESPONSE =
RespondActivityTaskCanceledByIDResponse.getDefaultInstance();

public static final RespondQueryTaskCompletedResponse RESPOND_QUERY_TASK_COMPLETED_RESPONSE =
RespondQueryTaskCompletedResponse.getDefaultInstance();

public static final ResetStickyTaskListResponse RESET_STICKY_TASK_LIST_RESPONSE =
ResetStickyTaskListResponse.getDefaultInstance();

public static final RefreshWorkflowTasksRequest REFRESH_WORKFLOW_TASKS_REQUEST =
RefreshWorkflowTasksRequest.getDefaultInstance();

public static final RefreshWorkflowTasksResponse REFRESH_WORKFLOW_TASKS_RESPONSE =
RefreshWorkflowTasksResponse.getDefaultInstance();

private ProtoObjects() {}

private static Payload payload(String value) {
Expand Down
Loading

0 comments on commit 0f000cd

Please sign in to comment.