Skip to content

Commit

Permalink
Add dedicated task executor for the task invoker
Browse files Browse the repository at this point in the history
  • Loading branch information
filiphr committed Oct 27, 2022
1 parent e0e7120 commit baac040
Show file tree
Hide file tree
Showing 14 changed files with 597 additions and 285 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.flowable.common.engine.api.FlowableException;
import org.flowable.common.engine.api.async.AsyncTaskExecutor;
import org.flowable.common.engine.api.async.AsyncTaskInvoker;
import org.flowable.common.engine.impl.async.AsyncTaskExecutorConfiguration;
import org.flowable.common.engine.impl.async.DefaultAsyncTaskExecutor;
import org.flowable.common.engine.impl.async.DefaultAsyncTaskInvoker;
import org.flowable.common.engine.impl.history.HistoryLevel;
Expand All @@ -48,12 +49,12 @@
public class ServiceTaskWithFuturesNoQueueCapacityTest extends FlowableCmmnTestCase {

protected AsyncTaskInvoker originalAsyncTaskInvoker;
protected AsyncTaskExecutor originalAsyncTaskExecutor;
protected AsyncTaskExecutor originalAsyncTaskInvokerTaskExecutor;

@Before
public void setUp() {
this.originalAsyncTaskInvoker = this.cmmnEngineConfiguration.getAsyncTaskInvoker();
this.originalAsyncTaskExecutor = this.cmmnEngineConfiguration.getAsyncTaskExecutor();
this.originalAsyncTaskInvokerTaskExecutor = this.cmmnEngineConfiguration.getAsyncTaskInvokerTaskExecutor();
}

@After
Expand All @@ -62,13 +63,13 @@ public void tearDown() {
this.cmmnEngineConfiguration.setAsyncTaskInvoker(this.originalAsyncTaskInvoker);
}

AsyncTaskExecutor currentAsyncTaskExecutor = this.cmmnEngineConfiguration.getAsyncTaskExecutor();
AsyncTaskExecutor currentAsyncTaskExecutor = this.cmmnEngineConfiguration.getAsyncTaskInvokerTaskExecutor();

if (this.originalAsyncTaskExecutor != null) {
this.cmmnEngineConfiguration.setAsyncTaskExecutor(this.originalAsyncTaskExecutor);
if (this.originalAsyncTaskInvokerTaskExecutor != null) {
this.cmmnEngineConfiguration.setAsyncTaskInvokerTaskExecutor(this.originalAsyncTaskInvokerTaskExecutor);
}

if (this.originalAsyncTaskExecutor != currentAsyncTaskExecutor) {
if (this.originalAsyncTaskInvokerTaskExecutor != currentAsyncTaskExecutor) {
// If they are different shut down the current one
currentAsyncTaskExecutor.shutdown();
}
Expand All @@ -78,12 +79,14 @@ public void tearDown() {
@CmmnDeployment
public void testDelegateExpression() {

DefaultAsyncTaskExecutor asyncTaskExecutor = new DefaultAsyncTaskExecutor();
asyncTaskExecutor.setCorePoolSize(4);
asyncTaskExecutor.setMaxPoolSize(4);
asyncTaskExecutor.setQueueSize(1);
AsyncTaskExecutorConfiguration executorConfiguration = new AsyncTaskExecutorConfiguration();
executorConfiguration.setCorePoolSize(4);
executorConfiguration.setMaxPoolSize(4);
executorConfiguration.setQueueSize(1);
executorConfiguration.setThreadNamePrefix("flowable-async-task-invoker-thread-");
DefaultAsyncTaskExecutor asyncTaskExecutor = new DefaultAsyncTaskExecutor(executorConfiguration);
asyncTaskExecutor.start();
cmmnEngineConfiguration.setAsyncTaskExecutor(asyncTaskExecutor);
cmmnEngineConfiguration.setAsyncTaskInvokerTaskExecutor(asyncTaskExecutor);
cmmnEngineConfiguration.setAsyncTaskInvoker(new DefaultAsyncTaskInvoker(asyncTaskExecutor));

String currentThreadName = Thread.currentThread().getName();
Expand All @@ -110,10 +113,10 @@ public void testDelegateExpression() {
"executionThread7", "executionThread8", "executionThread9"
)
.containsValues(
"flowable-async-job-executor-thread-1",
"flowable-async-job-executor-thread-2",
"flowable-async-job-executor-thread-3",
"flowable-async-job-executor-thread-4",
"flowable-async-task-invoker-thread-1",
"flowable-async-task-invoker-thread-2",
"flowable-async-task-invoker-thread-3",
"flowable-async-task-invoker-thread-4",
currentThreadName
);
}
Expand All @@ -127,12 +130,12 @@ public void testDelegateExpressionWithRejectAsyncTaskInvoker() {
asyncTaskExecutor.setMaxPoolSize(4);
asyncTaskExecutor.setQueueSize(1);
asyncTaskExecutor.start();
cmmnEngineConfiguration.setAsyncTaskExecutor(asyncTaskExecutor);
cmmnEngineConfiguration.setAsyncTaskInvokerTaskExecutor(asyncTaskExecutor);
cmmnEngineConfiguration.setAsyncTaskInvoker(new AsyncTaskInvoker() {

@Override
public <T> CompletableFuture<T> submit(Callable<T> task) {
return cmmnEngineConfiguration.getAsyncTaskExecutor().submit(task);
return cmmnEngineConfiguration.getAsyncTaskInvokerTaskExecutor().submit(task);
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,14 +272,14 @@ public void testDelegateExpressionWithFutureJavaDelegate() {
assertThat(historicVariables.get("executionThreadName1"))
.asInstanceOf(STRING)
.isNotEqualTo(currentThreadName)
.startsWith("flowable-async-job-executor-thread-");
.startsWith("flowable-async-task-invoker-thread-");

assertThat(historicVariables.get("executionThreadName2"))
.asInstanceOf(STRING)
.isNotEqualTo(currentThreadName)
// The executions should be done on different threads
.isNotEqualTo(historicVariables.get("executionThreadName1"))
.startsWith("flowable-async-job-executor-thread-");
.startsWith("flowable-async-task-invoker-thread-");
}
}

Expand Down Expand Up @@ -522,14 +522,14 @@ public void testClassWithFutureJavaDelegate() {
assertThat(historicVariables.get("executionThreadName1"))
.asInstanceOf(STRING)
.isNotEqualTo(currentThreadName)
.startsWith("flowable-async-job-executor-thread-");
.startsWith("flowable-async-task-invoker-thread-");

assertThat(historicVariables.get("executionThreadName2"))
.asInstanceOf(STRING)
.isNotEqualTo(currentThreadName)
// The executions should be done on different threads
.isNotEqualTo(historicVariables.get("executionThreadName1"))
.startsWith("flowable-async-job-executor-thread-");
.startsWith("flowable-async-task-invoker-thread-");
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.flowable.common.engine.impl.async;

import java.time.Duration;

/**
* @author Filip Hrisafov
*/
public class AsyncTaskExecutorConfiguration {

/**
* The minimal number of threads that are kept alive in the thread pool for
* job execution
*/
protected int corePoolSize = 8;

/**
* The maximum number of threads that are kept alive in the thread pool for
* job execution
*/
protected int maxPoolSize = 8;

/**
* The time a thread used for job execution must be kept
* alive before it is destroyed. Default setting is 5 seconds. Having a non-default
* setting of 0 takes resources, but in the case of many job executions it
* avoids creating new threads all the time.
*/
protected Duration keepAlive = Duration.ofSeconds(5);

/**
* The size of the queue on which jobs to be executed are placed
*/
protected int queueSize = 2048;

/**
* Whether core threads can time out (which is needed to scale down the threads)
*/
protected boolean allowCoreThreadTimeout = true;

/**
* The time that is waited to gracefully shut down the
* thread pool used for job execution
*/
protected Duration awaitTerminationPeriod = Duration.ofSeconds(60);

/**
* The naming pattern for the thread pool threads.
*/
protected String threadPoolNamingPattern;

public int getCorePoolSize() {
return corePoolSize;
}

public void setCorePoolSize(int corePoolSize) {
this.corePoolSize = corePoolSize;
}

public int getMaxPoolSize() {
return maxPoolSize;
}

public void setMaxPoolSize(int maxPoolSize) {
this.maxPoolSize = maxPoolSize;
}

public Duration getKeepAlive() {
return keepAlive;
}

public void setKeepAlive(Duration keepAlive) {
this.keepAlive = keepAlive;
}

public int getQueueSize() {
return queueSize;
}

public void setQueueSize(int queueSize) {
this.queueSize = queueSize;
}

public boolean isAllowCoreThreadTimeout() {
return allowCoreThreadTimeout;
}

public void setAllowCoreThreadTimeout(boolean allowCoreThreadTimeout) {
this.allowCoreThreadTimeout = allowCoreThreadTimeout;
}

public Duration getAwaitTerminationPeriod() {
return awaitTerminationPeriod;
}

public void setAwaitTerminationPeriod(Duration awaitTerminationPeriod) {
this.awaitTerminationPeriod = awaitTerminationPeriod;
}

public String getThreadPoolNamingPattern() {
return threadPoolNamingPattern;
}

public void setThreadPoolNamingPattern(String threadPoolNamingPattern) {
this.threadPoolNamingPattern = threadPoolNamingPattern;
}

public void setThreadNamePrefix(String prefix) {
if (prefix == null) {
this.threadPoolNamingPattern = "%d";
} else {
this.threadPoolNamingPattern = prefix + "%d";
}
}
}
Loading

0 comments on commit baac040

Please sign in to comment.