Skip to content

Commit

Permalink
Introduce parallel rollout processing (eclipse-hawkbit#2248)
Browse files Browse the repository at this point in the history
* Introduce parallel rollout processing

Signed-off-by: Denislav Prinov <[email protected]>

* Moving the ThreadPoolTaskExecutor initialization in RolloutScheduler. Changing to previous default behaviour when the thread pool size is <=1

Signed-off-by: Denislav Prinov <[email protected]>

* Refactoring

Signed-off-by: Denislav Prinov <[email protected]>

* Refactoring based on review comments

Signed-off-by: Denislav Prinov <[email protected]>

* License header fix

Signed-off-by: Denislav Prinov <[email protected]>

---------

Signed-off-by: Denislav Prinov <[email protected]>
  • Loading branch information
denislavprinov authored Feb 6, 2025
1 parent 46caed1 commit 2d90737
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@
import org.eclipse.hawkbit.utils.TenantConfigHelper;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
Expand Down Expand Up @@ -992,8 +993,8 @@ AutoCleanupScheduler autoCleanupScheduler(final SystemManagement systemManagemen
@Profile("!test")
@ConditionalOnProperty(prefix = "hawkbit.rollout.scheduler", name = "enabled", matchIfMissing = true)
RolloutScheduler rolloutScheduler(final SystemManagement systemManagement,
final RolloutHandler rolloutHandler, final SystemSecurityContext systemSecurityContext) {
return new RolloutScheduler(rolloutHandler, systemManagement, systemSecurityContext);
final RolloutHandler rolloutHandler, final SystemSecurityContext systemSecurityContext, @Value("${hawkbit.rollout.executor.thread-pool.size:1}") final int threadPoolSize) {
return new RolloutScheduler(rolloutHandler, systemManagement, systemSecurityContext, threadPoolSize);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/**
* Copyright (c) 2025 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*/

package org.eclipse.hawkbit.repository.jpa.rollout;

import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

public class BlockWhenFullPolicy implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
// Because queueCapacity=0 => SynchronousQueue
// This put(...) call blocks if both threads are busy,
// until a thread is free
executor.getQueue().put(r);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RejectedExecutionException("Interrupted while waiting to queue task", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.eclipse.hawkbit.repository.SystemManagement;
import org.eclipse.hawkbit.security.SystemSecurityContext;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

/**
* Scheduler to schedule the {@link RolloutHandler#handleAll()}. The
Expand All @@ -28,12 +29,16 @@ public class RolloutScheduler {
private final SystemManagement systemManagement;
private final RolloutHandler rolloutHandler;
private final SystemSecurityContext systemSecurityContext;
private final ThreadPoolTaskExecutor rolloutTaskExecutor;

public RolloutScheduler(
final RolloutHandler rolloutHandler, final SystemManagement systemManagement, final SystemSecurityContext systemSecurityContext) {
final RolloutHandler rolloutHandler, final SystemManagement systemManagement, final SystemSecurityContext systemSecurityContext,
final int threadPoolSize) {
this.systemManagement = systemManagement;
this.rolloutHandler = rolloutHandler;
this.systemSecurityContext = systemSecurityContext;
rolloutTaskExecutor = threadPoolTaskExecutor(threadPoolSize);

}

/**
Expand All @@ -43,14 +48,58 @@ public RolloutScheduler(
@Scheduled(initialDelayString = PROP_SCHEDULER_DELAY_PLACEHOLDER, fixedDelayString = PROP_SCHEDULER_DELAY_PLACEHOLDER)
public void runningRolloutScheduler() {
log.debug("rollout schedule checker has been triggered.");

// run this code in system code privileged to have the necessary permission to query and create entities.
// run this code in system code privileged to have the necessary
// permission to query and create entities.
systemSecurityContext.runAsSystem(() -> {
// workaround eclipselink that is currently not possible to execute a query without multi-tenancy if MultiTenant
// annotation is used. https://bugs.eclipse.org/bugs/show_bug.cgi?id=355458. So, iterate through all tenants and execute the rollout
// check for each tenant separately.
systemManagement.forEachTenant(tenant -> rolloutHandler.handleAll());
// workaround eclipselink that is currently not possible to
// execute a query without multi-tenancy if MultiTenant
// annotation is used.
// https://bugs.eclipse.org/bugs/show_bug.cgi?id=355458. So
// iterate through all tenants and execute the rollout check for
// each tenant seperately.

systemManagement.forEachTenant(tenant -> {
if (rolloutTaskExecutor == null) {
handleAll(tenant);
} else {
handleAllAsync(tenant);
}
});
return null;
});
}

private void handleAll(final String tenant) {
log.trace("Handling rollout for tenant: {}", tenant);
try {
rolloutHandler.handleAll();
} catch (Exception e) {
log.error("Error processing rollout for tenant {}", tenant, e);
}
}

private void handleAllAsync(final String tenant) {
rolloutTaskExecutor.submit(() -> systemSecurityContext.runAsSystemAsTenant(() -> {
handleAll(tenant);
return null;
}, tenant));

}

private ThreadPoolTaskExecutor threadPoolTaskExecutor (final int threadPoolSize) {
if (threadPoolSize <= 1) {
return null;
}

ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(threadPoolSize);
executor.setMaxPoolSize(threadPoolSize);
executor.setQueueCapacity(0); // forces a Synchronous Queue
// This policy will block the submitter until a worker thread is free
executor.setRejectedExecutionHandler(new BlockWhenFullPolicy());
executor.setThreadNamePrefix("rollout-exec-");
executor.initialize();
return executor;
}

}

0 comments on commit 2d90737

Please sign in to comment.