Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
fabrizzio-dotCMS committed Oct 4, 2024
1 parent 81172fc commit 149c9ef
Show file tree
Hide file tree
Showing 7 changed files with 284 additions and 187 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.dotcms.jobs.business.api;

import com.dotcms.jobs.business.error.JobProcessorInstantiationException;
import com.dotcms.jobs.business.processor.JobProcessor;
import com.dotmarketing.util.Logger;
import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class JobProcessorFactory {

public JobProcessorFactory() {
// Default constructor for CDI
}

/**
* Creates a new instance of the specified job processor class.
*
* @param processorClass The class of the job processor to create.
* @return An optional containing the new job processor instance, or an empty optional if the
* processor could not be created.
*/
JobProcessor newInstance(
Class<? extends JobProcessor> processorClass) {
try {
return processorClass.getDeclaredConstructor().newInstance();
} catch (Exception e) {
Logger.error(this, "Error creating job processor", e);
throw new JobProcessorInstantiationException(processorClass, e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
* jobQueueManagerAPI.setRetryStrategy("contentImport", contentImportRetryStrategy);
*
* // Register job processors
* jobQueueManagerAPI.registerProcessor("contentImport", new ContentImportJobProcessor());
* jobQueueManagerAPI.registerProcessor("contentImport", ContentImportJobProcessor.class);
*
* // Start the job queue manager
* jobQueueManagerAPI.start();
Expand Down Expand Up @@ -113,6 +113,7 @@ public class JobQueueManagerAPIImpl implements JobQueueManagerAPI {

private final RealTimeJobMonitor realTimeJobMonitor;
private final EventProducer eventProducer;
private final JobProcessorFactory jobProcessorFactory;

/**
* Constructs a new JobQueueManagerAPIImpl.
Expand All @@ -138,7 +139,8 @@ public JobQueueManagerAPIImpl(JobQueue jobQueue,
CircuitBreaker circuitBreaker,
RetryStrategy defaultRetryStrategy,
RealTimeJobMonitor realTimeJobMonitor,
EventProducer eventProducer) {
EventProducer eventProducer,
JobProcessorFactory jobProcessorFactory) {

this.jobQueue = jobQueue;
this.threadPoolSize = jobQueueConfig.getThreadPoolSize();
Expand All @@ -157,6 +159,7 @@ public JobQueueManagerAPIImpl(JobQueue jobQueue,
// Events
this.realTimeJobMonitor = realTimeJobMonitor;
this.eventProducer = eventProducer;
this.jobProcessorFactory = jobProcessorFactory;
}

@Override
Expand Down Expand Up @@ -229,14 +232,13 @@ public void close() throws Exception {

@Override
public void registerProcessor(final String queueName, final Class<? extends JobProcessor> processor) {
final String queueNameLower = queueName.toLowerCase();
final Class<? extends JobProcessor> jobProcessor = processors.get(queueNameLower);
final Class<? extends JobProcessor> jobProcessor = processors.get(queueName);
if (null != jobProcessor) {
Logger.warn(this, String.format(
"Job processor [%s] already registered for queue: [%s] is getting overridden.",
jobProcessor.getName(), queueName));
}
processors.put(queueNameLower, processor);
processors.put(queueName, processor);
}

@Override
Expand All @@ -248,20 +250,19 @@ public Map<String,Class<? extends JobProcessor>> getQueueNames() {
@Override
public String createJob(final String queueName, final Map<String, Object> parameters)
throws JobProcessorNotFoundException, DotDataException {
final String queueNameLower = queueName.toLowerCase();
final Class<? extends JobProcessor> clazz = processors.get(queueNameLower);
final Class<? extends JobProcessor> clazz = processors.get(queueName);
if (null == clazz) {
final var error = new JobProcessorNotFoundException(queueName);
Logger.error(JobQueueManagerAPIImpl.class, error);
throw error;
}

//first attempt instantiating the processor, cuz if we cant no use to create an entry in the db
final var processor = newInstanceOfProcessor(queueNameLower).orElseThrow();
final var processor = newProcessorInstance(queueName);
// now that we know we can instantiate the processor, we can add it to the map of instances
// But first we need the job id
try {
final String jobId = jobQueue.createJob(queueNameLower, parameters);
final String jobId = jobQueue.createJob(queueName, parameters);
addInstanceRef(jobId, processor);
eventProducer.getEvent(JobCreatedEvent.class).fire(
new JobCreatedEvent(jobId, queueName, LocalDateTime.now(), parameters)
Expand Down Expand Up @@ -298,9 +299,8 @@ public JobPaginatedResult getJobs(final int page, final int pageSize) throws Dot
@Override
public JobPaginatedResult getActiveJobs(String queueName, int page, int pageSize)
throws JobQueueDataException {
final String queueNameLower = queueName.toLowerCase();
try {
return jobQueue.getActiveJobs(queueNameLower, page, pageSize);
return jobQueue.getActiveJobs(queueName, page, pageSize);
} catch (JobQueueDataException e) {
throw new JobQueueDataException("Error fetching active jobs", e);
}
Expand Down Expand Up @@ -329,31 +329,28 @@ public void cancelJob(final String jobId) throws DotDataException {
} catch (JobQueueDataException e) {
throw new DotDataException("Error fetching job", e);
}
final var processor = processorInstancesByJobId.get(jobId);
if (processor instanceof Cancellable) {

try {

Logger.info(this, "Cancelling job " + jobId);

((Cancellable) processor).cancel(job);
handleJobCancelling(job, processor);
} catch (Exception e) {
final var error = new DotDataException("Error cancelling job " + jobId, e);
Logger.error(JobQueueManagerAPIImpl.class, error);
throw error;
final Optional<JobProcessor> instance = getInstance(jobId);
if (instance.isPresent()) {
final var processor = instance.get();
if (processor instanceof Cancellable) {
try {
Logger.info(this, "Cancelling job " + jobId);
((Cancellable) processor).cancel(job);
handleJobCancelling(job, processor);
} catch (Exception e) {
final var error = new DotDataException("Error cancelling job " + jobId, e);
Logger.error(JobQueueManagerAPIImpl.class, error);
throw error;
}
} else {
final var error = new DotDataException("Job " + jobId + " cannot be canceled");
Logger.error(JobQueueManagerAPIImpl.class, error);
throw error;
}
} else {

if (processor == null) {
final var error = new JobProcessorNotFoundException(job.queueName(), jobId);
Logger.error(JobQueueManagerAPIImpl.class, error);
throw error;
}

final var error = new DotDataException(jobId, "Job " + jobId + " cannot be canceled");
Logger.error(JobQueueManagerAPIImpl.class, error);
throw error;
Logger.error(this, "No processor found for job " + jobId);
throw new JobProcessorNotFoundException(job.queueName(), jobId);
}
}

Expand Down Expand Up @@ -634,8 +631,9 @@ private void handleNonRetryableFailedJob(final Job job) throws DotDataException
*/
private void processJob(final Job job) throws DotDataException {

final JobProcessor processor = processorInstancesByJobId.get(job.id());
if (processor != null) {
final Optional<JobProcessor> optional = getProcessorInstance(job);
if (optional.isPresent()) {
final JobProcessor processor = optional.get();

final ProgressTracker progressTracker = new DefaultProgressTracker();
Job runningJob = job.markAsRunning().withProgressTracker(progressTracker);
Expand Down Expand Up @@ -672,6 +670,8 @@ private void processJob(final Job job) throws DotDataException {
} else {
handleJobCompletion(runningJob, processor);
}
//Free up resources
removeInstanceRef(runningJob.id());
} catch (Exception e) {

Logger.error(this,
Expand All @@ -690,22 +690,36 @@ private void processJob(final Job job) throws DotDataException {
}
}

/**
* Get an instance of a processor for a job.
* If an instance already exists, it will be returned. otherwise a new instance will be created.
* @param job The job to get the processor for
* @return The processor instance
*/
private Optional<JobProcessor> getProcessorInstance(final Job job) {
try {
return Optional.of(processorInstancesByJobId.computeIfAbsent(job.id(),
id -> newProcessorInstance(job.queueName()))
);
} catch (Exception e){
Logger.error(this, "Error getting processor instance", e);
}
return Optional.empty();
}


/**
* Creates a new instance of a JobProcessor for a specific queue.
* @param queueName
* @param queueName The name of the queue
* @return An optional containing the new JobProcessor instance, or an empty optional if the processor could not be created.
*/
Optional<JobProcessor> newInstanceOfProcessor(final String queueName) {
private JobProcessor newProcessorInstance(final String queueName) {
final var processorClass = processors.get(queueName);
if (processorClass != null) {
try {
return Optional.of(processorClass.getDeclaredConstructor().newInstance());
} catch (Exception e) {
Logger.error(this, "Error creating job processor", e);
throw new JobProcessorInstantiationException(processorClass,e);
}
return jobProcessorFactory.newInstance(processorClass);
} else {
throw new JobProcessorNotFoundException(queueName);
}
return Optional.empty();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ String createJob(String queueName, JobParams form, HttpServletRequest request)
final HashMap <String, Object>in = new HashMap<>(form.getParams());
handleUploadIfPresent(form, in, request);
try {
return jobQueueManagerAPI.createJob(queueName, Map.copyOf(in));
return jobQueueManagerAPI.createJob(queueName.toLowerCase(), Map.copyOf(in));
} catch (JobProcessorNotFoundException e) {
Logger.error(this.getClass(), "Error creating job", e);
throw new DoesNotExistException(e.getMessage());
Expand Down Expand Up @@ -186,7 +186,7 @@ JobPaginatedResult getJobs(int page, int pageSize) {
*/
JobPaginatedResult getActiveJobs(String queueName, int page, int pageSize) {
try {
return jobQueueManagerAPI.getActiveJobs(queueName, page, pageSize);
return jobQueueManagerAPI.getActiveJobs(queueName.toLowerCase(), page, pageSize);
} catch (JobQueueDataException e) {
Logger.error(this.getClass(), "Error fetching active jobs", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class JobQueueManagerAPICDITest {
JobQueue.class, RetryStrategy.class, CircuitBreaker.class,
JobQueueProducer.class, JobQueueConfigProducer.class,
RetryStrategyProducer.class, RealTimeJobMonitor.class,
EventProducer.class)
EventProducer.class, JobProcessorFactory.class)
);

@Inject
Expand Down
Loading

0 comments on commit 149c9ef

Please sign in to comment.