Skip to content

Commit

Permalink
Merge pull request #3362 from jsonwan/github_perf/cron
Browse files Browse the repository at this point in the history
perf: 定时任务服务支持优雅停机 #2852
  • Loading branch information
jsonwan authored Dec 27, 2024
2 parents 49729bb + 251e261 commit 09deee6
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import javax.annotation.PreDestroy;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;

/**
Expand All @@ -43,6 +45,8 @@ public class LoadCronJobRunner implements CommandLineRunner {
private final CronJobLoadingService cronJobLoadingService;
private final ThreadPoolExecutor crontabInitRunnerExecutor;

private Future<?> loadCronJobFuture;

@Autowired
public LoadCronJobRunner(CronJobLoadingService cronJobLoadingService,
@Qualifier("crontabInitRunnerExecutor") ThreadPoolExecutor crontabInitRunnerExecutor) {
Expand All @@ -52,9 +56,18 @@ public LoadCronJobRunner(CronJobLoadingService cronJobLoadingService,

@Override
public void run(String... args) {
crontabInitRunnerExecutor.submit(() -> {
loadCronJobFuture = crontabInitRunnerExecutor.submit(() -> {
log.info("loadCronToQuartzOnStartup");
cronJobLoadingService.loadAllCronJob();
});
}

@PreDestroy
public void destroy() {
log.info("destroy LoadCronJobRunner");
if (loadCronJobFuture != null) {
boolean result = loadCronJobFuture.cancel(true);
log.info("loadCronJobFuture cancel result:{}", result);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public interface CronJobBatchLoadService {
* @param limit 一批定时任务的数量
* @return 加载结果数据
*/
CronLoadResult batchLoadCronToQuartz(int start, int limit);
CronLoadResult batchLoadCronToQuartz(int start, int limit) throws InterruptedException;

@Data
class CronLoadResult {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,13 @@ public CronJobBatchLoadServiceImpl(CronJobService cronJobService) {

@Override
@JobTransactional(transactionManager = "jobCrontabTransactionManager", timeout = 30)
public CronLoadResult batchLoadCronToQuartz(int start, int limit) {
public CronLoadResult batchLoadCronToQuartz(int start, int limit) throws InterruptedException {
int successNum = 0;
int failedNum = 0;
List<CronJobBasicInfoDTO> failedCronList = new ArrayList<>();
List<CronJobBasicInfoDTO> cronJobBasicInfoList = cronJobService.listEnabledCronBasicInfoForUpdate(start, limit);
for (CronJobBasicInfoDTO cronJobBasicInfoDTO : cronJobBasicInfoList) {
checkInterrupt();
boolean result = false;
try {
result = cronJobService.addJobToQuartz(
Expand Down Expand Up @@ -93,4 +94,10 @@ public CronLoadResult batchLoadCronToQuartz(int start, int limit) {
cronLoadResult.setFailedCronList(failedCronList);
return cronLoadResult;
}

private void checkInterrupt() throws InterruptedException {
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("batchLoadCronToQuartz thread is interrupted, exit");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ public void loadAllCronJob() {
}
loadingCronToQuartz = true;
loadAllCronJobToQuartz();
} catch (InterruptedException e) {
log.info("loadAllCronJob interrupted, application may be closing");
} catch (Exception e) {
log.warn("Fail to loadAllCronJob", e);
} finally {
Expand All @@ -65,7 +67,7 @@ public void loadAllCronJob() {
}
}

private void loadAllCronJobToQuartz() {
private void loadAllCronJobToQuartz() throws InterruptedException {
int start = 0;
int limit = 100;
int currentFetchNum;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,25 @@

import com.tencent.bk.job.crontab.timer.AbstractQuartzTaskHandler;
import com.tencent.bk.job.crontab.timer.QuartzJob;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.quartz.*;
import org.quartz.JobDetail;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;

import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
* QuartzTaskHandler 的默认实现
**/
@Slf4j
@Component
public class DefaultQuartzTaskHandler extends AbstractQuartzTaskHandler {

Expand All @@ -60,6 +67,11 @@ public void addJob(QuartzJob quartzJob) throws SchedulerException {

Set<? extends Trigger> triggers = createTriggers(quartzJob);

if (scheduler.isShutdown()) {
log.info("scheduler is shutdown, ignore add job {}!", quartzJob.getKey().getName());
return;
}

if (CollectionUtils.isEmpty(triggers)) {
this.scheduler.addJob(jobDetail, false);
} else {
Expand All @@ -84,13 +96,27 @@ public void deleteJob(JobKey jobKey) throws SchedulerException {
Assert.notNull(jobKey, "jobKey cannot be empty!");
Assert.notNull(jobKey.getName(), "jobKey name cannot be empty!");

if (scheduler.isShutdown()) {
log.info("scheduler is shutdown, ignore delete job {}!", jobKey.getName());
return;
}

this.scheduler.deleteJob(jobKey);
}

@Override
public void deleteJob(List<JobKey> jobKeys) throws SchedulerException {
Assert.notNull(jobKeys, "jobKeys cannot be empty!");

if (scheduler.isShutdown()) {
log.info(
"scheduler is shutdown, ignore delete {} job keys: {}",
jobKeys.size(),
jobKeys.stream().map(JobKey::getName).collect(Collectors.toList())
);
return;
}

this.scheduler.deleteJobs(jobKeys);
}

Expand All @@ -99,6 +125,10 @@ public void deleteJob(List<JobKey> jobKeys) throws SchedulerException {
*/
@Override
public void pauseAll() throws SchedulerException {
if (scheduler.isShutdown()) {
log.info("scheduler is shutdown, ignore pauseAll!");
return;
}
this.scheduler.pauseAll();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,16 +93,14 @@ data:
shutdown-timeout: 100ms
quartz:
job-store-type: MEMORY
waitForJobsToCompleteOnShutdown: true
properties:
org:
quartz:
jobStore:
class: org.quartz.simpl.RAMJobStore
misfireThreshold: 60000
plugin:
shutdownhook:
class: org.quartz.plugins.management.ShutdownHookPlugin
cleanShutdown: true
triggHistory:
class: org.quartz.plugins.history.LoggingJobHistoryPlugin
scheduler:
Expand Down

0 comments on commit 09deee6

Please sign in to comment.