Skip to content

Commit

Permalink
Merge Nexus into Master (#2270)
Browse files Browse the repository at this point in the history
Nexus Support to the Java SDK
  • Loading branch information
Quinn-With-Two-Ns authored Oct 15, 2024
1 parent af7b5b7 commit eff3ca2
Show file tree
Hide file tree
Showing 158 changed files with 10,784 additions and 190 deletions.
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ ext {
// Platforms
grpcVersion = '1.54.1' // [1.38.0,) Needed for io.grpc.protobuf.services.HealthStatusManager
jacksonVersion = '2.14.2' // [2.9.0,)
nexusVersion = '0.2.0-alpha'
// we don't upgrade to 1.10.x because it requires kotlin 1.6. Users may use 1.10.x in their environments though.
micrometerVersion = project.hasProperty("edgeDepsTest") ? '1.10.5' : '1.9.9' // [1.0.0,)

Expand Down
2 changes: 2 additions & 0 deletions docker/github/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@ services:
- "6934:6934"
- "6935:6935"
- "6939:6939"
- "7243:7243"
environment:
- "CASSANDRA_SEEDS=cassandra"
- "ENABLE_ES=true"
- "ES_SEEDS=elasticsearch"
- "DYNAMIC_CONFIG_FILE_PATH=config/dynamicconfig/development.yaml"
- "FRONTEND_HTTP_PORT=7243"
depends_on:
- cassandra
- elasticsearch
Expand Down
10 changes: 9 additions & 1 deletion docker/github/dynamicconfig/development.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,12 @@ history.MaxBufferedQueryCount:
worker.buildIdScavengerEnabled:
- value: true
worker.removableBuildIdDurationSinceDefault:
- value: 1
- value: 1
system.enableNexus:
- value: true
component.nexusoperations.callback.endpoint.template:
- value: http://localhost:7243/namespaces/{{.NamespaceName}}/nexus/callback
component.callbacks.allowedAddresses:
- value:
- Pattern: "localhost:7243"
AllowInsecure: true
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.workflow

import io.temporal.kotlin.TemporalDsl

/**
* @see NexusOperationOptions
*/
inline fun NexusOperationOptions(
options: @TemporalDsl NexusOperationOptions.Builder.() -> Unit
): NexusOperationOptions {
return NexusOperationOptions.newBuilder().apply(options).build()
}

/**
* Create a new instance of [NexusOperationOptions], optionally overriding some of its properties.
*/
inline fun NexusOperationOptions.copy(
overrides: @TemporalDsl NexusOperationOptions.Builder.() -> Unit
): NexusOperationOptions {
return toBuilder().apply(overrides).build()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.workflow

import io.temporal.kotlin.TemporalDsl

/**
* @see NexusServiceOptions
*/
inline fun NexusServiceOptions(
options: @TemporalDsl NexusServiceOptions.Builder.() -> Unit
): NexusServiceOptions {
return NexusServiceOptions.newBuilder().apply(options).build()
}

/**
* Create a new instance of [NexusServiceOptions], optionally overriding some of its properties.
*/
inline fun NexusServiceOptions.copy(
overrides: @TemporalDsl NexusServiceOptions.Builder.() -> Unit
): NexusServiceOptions {
return toBuilder().apply(overrides).build()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.nexus

import io.temporal.workflow.NexusOperationOptions
import org.junit.Assert.assertEquals
import org.junit.Test
import java.time.Duration

class NexusOperationOptionsExtTest {

@Test
fun `OperationOptions DSL should be equivalent to builder`() {
val dslOperationOptions = NexusOperationOptions {
setScheduleToCloseTimeout(Duration.ofMinutes(1))
}

val builderOperationOptions = NexusOperationOptions.newBuilder()
.setScheduleToCloseTimeout(Duration.ofMinutes(1))
.build()

assertEquals(builderOperationOptions, dslOperationOptions)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.nexus

import io.temporal.workflow.NexusOperationOptions
import io.temporal.workflow.NexusServiceOptions
import org.junit.Assert.assertEquals
import org.junit.Test
import java.time.Duration

class NexusServiceOptionsExtTest {

@Test
fun `ServiceOptions DSL should be equivalent to builder`() {
val dslServiceOptions = NexusServiceOptions {
setEndpoint("TestEndpoint")
setOperationOptions(
NexusOperationOptions {
setScheduleToCloseTimeout(Duration.ofMinutes(1))
}
)
setOperationMethodOptions(
mapOf(
"test" to NexusOperationOptions {
setScheduleToCloseTimeout(Duration.ofMinutes(2))
}
)
)
}

val builderServiceOptions = NexusServiceOptions.newBuilder()
.setEndpoint("TestEndpoint")
.setOperationOptions(
NexusOperationOptions.newBuilder()
.setScheduleToCloseTimeout(Duration.ofMinutes(1))
.build()
)
.setOperationMethodOptions(
mapOf(
"test" to NexusOperationOptions.newBuilder()
.setScheduleToCloseTimeout(Duration.ofMinutes(2))
.build()
)
)
.build()

assertEquals(builderServiceOptions, dslServiceOptions)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.workflow

import io.nexusrpc.Operation
import io.nexusrpc.Service
import io.nexusrpc.handler.OperationContext
import io.nexusrpc.handler.OperationHandler
import io.nexusrpc.handler.OperationImpl
import io.nexusrpc.handler.OperationStartDetails
import io.nexusrpc.handler.ServiceImpl
import io.nexusrpc.handler.SynchronousOperationFunction
import io.temporal.client.WorkflowClientOptions
import io.temporal.client.WorkflowOptions
import io.temporal.common.converter.DefaultDataConverter
import io.temporal.common.converter.JacksonJsonPayloadConverter
import io.temporal.common.converter.KotlinObjectMapperFactory
import io.temporal.internal.async.FunctionWrappingUtil
import io.temporal.internal.sync.AsyncInternal
import io.temporal.testing.internal.SDKTestWorkflowRule
import org.junit.Assert.assertTrue
import org.junit.Rule
import org.junit.Test
import java.time.Duration

class KotlinAsyncNexusTest {

@Rule
@JvmField
var testWorkflowRule: SDKTestWorkflowRule = SDKTestWorkflowRule.newBuilder()
.setWorkflowTypes(WorkflowImpl::class.java)
.setNexusServiceImplementation(TestNexusServiceImpl())
.setWorkflowClientOptions(
WorkflowClientOptions.newBuilder()
.setDataConverter(DefaultDataConverter(JacksonJsonPayloadConverter(KotlinObjectMapperFactory.new())))
.build()
)
.build()

@Service
interface TestNexusService {
@Operation
fun operation(): String?
}

@ServiceImpl(service = TestNexusService::class)
class TestNexusServiceImpl {
@OperationImpl
fun operation(): OperationHandler<Void, String> {
// Implemented inline
return OperationHandler.sync<Void, String>(
SynchronousOperationFunction<Void, String> { ctx: OperationContext, details: OperationStartDetails, _: Void? -> "Hello Kotlin" }
)
}
}

@WorkflowInterface
interface TestWorkflow {
@WorkflowMethod
fun execute()
}

class WorkflowImpl : TestWorkflow {
override fun execute() {
val nexusService = Workflow.newNexusServiceStub(
TestNexusService::class.java,
NexusServiceOptions {
setOperationOptions(
NexusOperationOptions {
setScheduleToCloseTimeout(Duration.ofSeconds(10))
}
)
}
)
assertTrue(
"This has to be true to make Async.function(nexusService::operation) work correctly as expected",
AsyncInternal.isAsync(nexusService::operation)
)
assertTrue(
"This has to be true to make Async.function(nexusService::operation) work correctly as expected",
AsyncInternal.isAsync(FunctionWrappingUtil.temporalJavaFunctionalWrapper(nexusService::operation))
)
Async.function(nexusService::operation).get()
}
}

@Test
fun asyncNexusWorkflowTest() {
val client = testWorkflowRule.workflowClient
val options = WorkflowOptions.newBuilder().setTaskQueue(testWorkflowRule.taskQueue).build()
val workflowStub = client.newWorkflowStub(TestWorkflow::class.java, options)
workflowStub.execute()
}
}
1 change: 1 addition & 0 deletions temporal-sdk/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ dependencies {
api project(':temporal-serviceclient')
api "com.google.code.gson:gson:$gsonVersion"
api "io.micrometer:micrometer-core"
api "io.nexusrpc:nexus-sdk:$nexusVersion"

implementation "com.google.guava:guava:$guavaVersion"
api "com.fasterxml.jackson.core:jackson-databind"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.temporal.common.interceptors.WorkflowClientCallsInterceptor;
import io.temporal.common.interceptors.WorkflowClientInterceptor;
import io.temporal.internal.WorkflowThreadMarker;
import io.temporal.internal.client.NexusStartWorkflowRequest;
import io.temporal.internal.client.RootWorkflowClientInvoker;
import io.temporal.internal.client.WorkerFactoryRegistry;
import io.temporal.internal.client.WorkflowClientInternal;
Expand Down Expand Up @@ -567,4 +568,16 @@ public void registerWorkerFactory(WorkerFactory workerFactory) {
public void deregisterWorkerFactory(WorkerFactory workerFactory) {
workerFactoryRegistry.deregister(workerFactory);
}

@Override
public WorkflowExecution startNexus(NexusStartWorkflowRequest request, Functions.Proc workflow) {
enforceNonWorkflowThread();
WorkflowInvocationHandler.initAsyncInvocation(InvocationType.START_NEXUS, request);
try {
workflow.apply();
return WorkflowInvocationHandler.getAsyncInvocationResult(WorkflowExecution.class);
} finally {
WorkflowInvocationHandler.closeAsyncInvocation();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ private static String getMessage(WorkflowExecution execution, String workflowTyp
+ execution.getWorkflowId()
+ "', runId='"
+ execution.getRunId()
+ (workflowType == null ? "" : "', workflowType='" + workflowType + '\'')
+ '}';
+ (workflowType == null ? "" : "', workflowType='" + workflowType + '\'');
}
}
Loading

0 comments on commit eff3ca2

Please sign in to comment.