Skip to content

Commit

Permalink
Merge pull request #4150 from Coduz/feat-jobInstanceDataJobEndCallback
Browse files Browse the repository at this point in the history
✨ [Job Engine] Added JobDataCleanupJobEndCallback to delete JobInstanceData after JobExecution has ended
  • Loading branch information
Coduz authored Dec 12, 2024
2 parents 38c4367 + 0335034 commit ca9f887
Show file tree
Hide file tree
Showing 8 changed files with 641 additions and 496 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*******************************************************************************
* Copyright (c) 2024, 2024 Eurotech and/or its affiliates and others
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* Eurotech - initial API and implementation
*******************************************************************************/
package org.eclipse.kapua.job.engine.jbatch.overrides.callback;

import com.ibm.jbatch.container.callback.JobEndCallback;
import com.ibm.jbatch.container.servicesmanager.ServicesManager;
import com.ibm.jbatch.container.servicesmanager.ServicesManagerImpl;
import org.eclipse.kapua.job.engine.jbatch.driver.JbatchDriver;
import org.eclipse.kapua.job.engine.jbatch.persistence.JPAPersistenceManagerImpl;
import org.eclipse.kapua.job.engine.jbatch.persistence.jpa.JpaJobInstanceData;
import org.eclipse.kapua.model.id.KapuaId;
import org.eclipse.kapua.service.job.Job;
import org.eclipse.kapua.service.job.execution.JobExecution;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* {@link JpaJobInstanceData} cleanup {@link JobEndCallback}.
* <p>
* This {@link JobEndCallback} removes {@link JpaJobInstanceData} after the {@link JobExecution} ends it execution.
* This is done to avoid that {@link Job}s that run a lot of times (thousands or more) fill up the {@link JpaJobInstanceData}
* table making {@link JbatchDriver#isRunningJob(KapuaId, KapuaId)} tanking a lot of time to execute.
* <p>
* {@link JpaJobInstanceData} aren't required nor useful once the {@link JobExecution} ends its processing so they can be safely deleted.
*
* @since 2.1.0
*/
public class JobDataCleanupJobEndCallback implements JobEndCallback {

private static final Logger LOG = LoggerFactory.getLogger(JobDataCleanupJobEndCallback.class);

private final JPAPersistenceManagerImpl persistenceService;
private long jobExecutionId;

/**
* Constructor.
*
* @since 2.1.0
*/
public JobDataCleanupJobEndCallback() {
ServicesManager servicesManager = ServicesManagerImpl.getInstance();
persistenceService = (JPAPersistenceManagerImpl) servicesManager.getPersistenceManagerService();
}

@Override
public void setExecutionId(long jobExecutionId) {
this.jobExecutionId = jobExecutionId;
}

@Override
public void done(long jobExecutionId) {
try {
long jobInstanceId = persistenceService.getJobInstanceIdByExecutionId(jobExecutionId);

LOG.info("Deleting JobInstanceData {} after JobExecution {} has completed...", jobInstanceId, jobExecutionId);
persistenceService.deleteJobInstanceData(jobInstanceId);
LOG.info("Deleting JobInstanceData {} after JobExecution {} has completed... DONE!", jobInstanceId, jobExecutionId);
}
catch (Exception e) {
LOG.error("Deleting JobInstanceData for JobExecution {} has completed... ERROR! This will leave JobInstanceData in the DB until the Job gets deleted", jobExecutionId, e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*******************************************************************************
* Copyright (c) 2024, 2024 Eurotech and/or its affiliates and others
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* Eurotech - initial API and implementation
*******************************************************************************/
package org.eclipse.kapua.job.engine.jbatch.overrides.callback;

import com.ibm.jbatch.container.callback.IJobEndCallbackService;
import com.ibm.jbatch.container.callback.JobEndCallback;
import com.ibm.jbatch.container.callback.JobEndCallbackManagerImpl;

/**
* Kapua {@link IJobEndCallbackService} which extends default {@link JobEndCallbackManagerImpl} to register custom {@link JobEndCallback}s
*
* @since 2.1.0
*/
public class KapuaJobEndCallbackManagerImpl extends JobEndCallbackManagerImpl implements IJobEndCallbackService {

/**
* Constructor
*
* @since 2.1.0
*/
public KapuaJobEndCallbackManagerImpl() {
super();

// Required to delete JobInstanceData after JobExecution ends
registerJobEndCallback(new JobDataCleanupJobEndCallback());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,14 @@
* <pre>
* JobInstanceData (aka: JobInstace)
* |
* |-- as one --- JobStatus
* |-- as many -- ExecutionInstanceData (aka: JobExecution)
* |-- has one --- JobStatus
* |-- has many -- ExecutionInstanceData (aka: JobExecution)
* |
* |-- as many -- StepExecutionInstanceData (aka: JobStepExecution)
* |-- has many -- StepExecutionInstanceData (aka: JobStepExecution)
* | |
* | |-- as one --- StepStatus
* | |-- has one --- StepStatus
* |
* |-- as many -- CheckpointData
* |-- has many -- CheckpointData
* </pre>
*
* @since 1.2.0
Expand Down Expand Up @@ -220,6 +220,20 @@ public String getJobCurrentTag(long jobInstanceId) {
return "NOTSET";
}

/**
* Deletes {@link JpaJobInstanceData} by its {@link JpaJobInstanceData#getId()}.
*
* @param jobInstanceId The {@link JpaJobInstanceData#getId()} to delete.
* @since 2.1.0
*/
public void deleteJobInstanceData(long jobInstanceId) {
try {
txManager.execute(tx -> jobInstanceDataRepository.deleteById(tx, jobInstanceId));
} catch (Exception e) {
throw new PersistenceException(e);
}
}

/**
* Deletes {@link JobInstance}s by name.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@
@NamedQuery(name = "JobInstanceData.countByNameTagApp",
query = "SELECT COUNT(jid) FROM JobInstanceData jid WHERE jid.name = :name AND jid.appTag = :appTag"),
@NamedQuery(name = "JobInstanceData.deleteByName",
query = "DELETE FROM JobInstanceData jid WHERE jid.name = :name")
query = "DELETE FROM JobInstanceData jid WHERE jid.name = :name"),
@NamedQuery(name = "JobInstanceData.deleteById",
query = "DELETE FROM JobInstanceData jid WHERE jid.id = :id")

})
public class JpaJobInstanceData implements Serializable {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,22 @@
public interface JpaJobInstanceDataRepository {
JpaJobInstanceData create(TxContext tx, String name, String appTag, String jobXml);

int deleteByName(TxContext tx, String jobName);

JpaJobInstanceData find(TxContext tx, long id);

Integer getJobInstanceCount(TxContext tx, String jobName, String appTag);

List<Long> getJobInstanceIds(TxContext tx, String jobName, String appTag, Integer offset, Integer limit);

List<JpaJobInstanceData> getExternalJobInstanceData(TxContext tx);

int deleteByName(TxContext tx, String jobName);

/**
* Deletes {@link JpaJobInstanceData} by its {@link JpaJobInstanceData#getId()}.
*
* @param tx The {@link TxContext}
* @param jobInstanceId The {@link JpaJobInstanceData#getId()} to delete.
* @since 2.1.0
*/
int deleteById(TxContext tx, long jobInstanceId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,6 @@ public JpaJobInstanceData create(TxContext tx, String name, String appTag, Strin
return jpaJobInstanceData;
}

@Override
public int deleteByName(TxContext tx, String jobName) {
final EntityManager em = JpaAwareTxContext.extractEntityManager(tx);

TypedQuery<Integer> deleteByNameQuery = em.createNamedQuery("JobInstanceData.deleteByName", Integer.class);
deleteByNameQuery.setParameter("name", jobName);
return deleteByNameQuery.executeUpdate();
}

@Override
public JpaJobInstanceData find(TxContext tx, long id) {
final EntityManager em = JpaAwareTxContext.extractEntityManager(tx);
Expand Down Expand Up @@ -102,4 +93,22 @@ public List<JpaJobInstanceData> getExternalJobInstanceData(TxContext tx) {
final List<JpaJobInstanceData> queryResult = selectQuery.getResultList();
return queryResult;
}

@Override
public int deleteByName(TxContext tx, String jobName) {
final EntityManager em = JpaAwareTxContext.extractEntityManager(tx);

TypedQuery<Integer> deleteByNameQuery = em.createNamedQuery("JobInstanceData.deleteByName", Integer.class);
deleteByNameQuery.setParameter("name", jobName);
return deleteByNameQuery.executeUpdate();
}

@Override
public int deleteById(TxContext tx, long jobInstanceId) {
final EntityManager em = JpaAwareTxContext.extractEntityManager(tx);

TypedQuery<Integer> deleteByNameQuery = em.createNamedQuery("JobInstanceData.deleteById", Integer.class);
deleteByNameQuery.setParameter("id", jobInstanceId);
return deleteByNameQuery.executeUpdate();
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
J2SE_MODE=true
JOBXML_LOADER_SERVICE=com.ibm.jbatch.container.services.impl.DirectoryJobXMLLoaderServiceImpl
CONTAINER_ARTIFACT_FACTORY_SERVICE=org.eclipse.kapua.job.engine.jbatch.KapuaDelegatingBatchArtifactFactoryImpl
# com.ibm.jbatch.container.services.impl.DelegatingBatchArtifactFactoryImpl
CALLBACK_SERVICE=org.eclipse.kapua.job.engine.jbatch.overrides.callback.KapuaJobEndCallbackManagerImpl
BATCH_THREADPOOL_SERVICE=com.ibm.jbatch.container.services.impl.GrowableThreadPoolServiceImpl
PERSISTENCE_MANAGEMENT_SERVICE=org.eclipse.kapua.job.engine.jbatch.persistence.JPAPersistenceManagerImpl
Loading

0 comments on commit ca9f887

Please sign in to comment.