diff --git a/CHANGELOG.md b/CHANGELOG.md index ccc0f263..6e002033 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,52 +5,97 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +#### 1.43.0 - 2024/08/09 + +- Added support for task context + +You will need to do the following migration: + +Postgres: + +``` +ALTER TABLE tw_task_data +ADD COLUMN task_context_format SMALLINT, +ADD COLUMN task_context BYTEA; +``` + +MariaDB: + +``` +ALTER TABLE tw_task_data +ADD COLUMN task_context_format SMALLINT, +ADD COLUMN task_context BLOB, +ALGORITHM=INPLACE, LOCK=NONE; +``` + #### 1.42.0 - 2024/07/16 + ### Added + - Support for Spring Boot 3.3. ### Removed + - Support for spring boot 3.1 and 2.7 versions. #### 1.41.6 - 2024/04/17 + ### Added -- `/getTaskTypes` endpoint may be disabled through configuration property `tw-tasks.core.tasks-management.enable-get-task-types: false`. Services with extreme amount of tasks might benefit from this. + +- `/getTaskTypes` endpoint may be disabled through configuration property `tw-tasks.core.tasks-management.enable-get-task-types: false`. Services with + extreme amount of tasks might benefit from this. #### 1.41.5 - 2024/04/05 + ### Changed + * Use static methods to create BeanPostProcessors. #### 1.41.4 - 2024/04/02 + ### Changed + - `/getTaskTypes` endpoint accepts optional query parameter `status` to filter only types of tasks in the particular status(es). - Fixed a bug with `taskType` and `taskSubType` filters on query endpoints when multiple values are supplied, where it would consider only one value. #### 1.41.3 - 2024/02/29 + ### Changed + * Add compatibility with Spring Boot 3.2. * Update dependencies #### 1.41.2 - 2024/02/16 + ### Changed -* Kafka producer instantiation will be attempted up to 5 times with a 500ms delay between each attempt. In some cases, it has been observed that the CI fails to start the Kafka producer because the kafka docker container itself seems to not be fully up & accessible yet. + +* Kafka producer instantiation will be attempted up to 5 times with a 500ms delay between each attempt. In some cases, it has been observed that the + CI fails to start the Kafka producer because the kafka docker container itself seems to not be fully up & accessible yet. #### 1.41.1 - 2023/12/19 + ### Changed + - When building a Spring `ResponseEntity` with an explicit status, provide an integer derived from the `HttpStatus` enum, rather than providing the `HttpStatus` directly, to handle binary incompatibility between Spring 5 and 6 causing NoSuchMethod errors when tw-tasks is used with Spring 6 #### 1.41.0 - 2023/11/16 + ### Added + - Added `taskType` and `taskSubType` parameters to management query endpoints. - Added `/getTaskTypes` endpoint to retrieve list of registered task types and sub-types ## 1.40.6 - 2023/11/16 + ### Fixed * NullPointerException in TaskManagementService.getTaskData in case task is not found #### 1.40.5 - 2023/10/30 + ### Added + - Setting METADATA_MAX_AGE_CONFIG to two minutes for producer #### 1.40.4 - 2023/10/06 @@ -75,7 +120,8 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ### Added -* introduced a new configuration parameter `tw-tasks.core.no-op-task-types` that allows a default no operation task handler to pick up deprecated task types in your service. +* introduced a new configuration parameter `tw-tasks.core.no-op-task-types` that allows a default no operation task handler to pick up deprecated task + types in your service. #### 1.40.1 - 2023/07/12 @@ -86,6 +132,7 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). #### 1.40.0 - 2023/06/12 ### Added + * CronJob annotation for Spring bean's methods #### 1.39.2 - 2023/06/06 @@ -99,6 +146,7 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). #### 1.39.1 - 2023/04/19 ### Changed + * Kafka consumer offset duration is always considered as positive since we cannot reset the offsets to future timestamps. * Both `PT1H` and `-PT1H` are treated the same ie `PT1H`. This value gets subtracted by now() timestamp. * Added second kafka consumer for the tests in `SeekToDurationOnRebalanceListenerIntTest` class diff --git a/gradle.properties b/gradle.properties index 168f4a0e..cc94780d 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,2 +1,2 @@ -version=1.42.0 +version=1.43.0 org.gradle.internal.http.socketTimeout=120000 diff --git a/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskDataIntTest.java b/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskDataIntTest.java index 60c14ba8..c3b8539b 100644 --- a/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskDataIntTest.java +++ b/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskDataIntTest.java @@ -1,7 +1,9 @@ package com.transferwise.tasks.testapp; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Strings; import com.transferwise.tasks.BaseIntTest; import com.transferwise.tasks.CompressionAlgorithm; @@ -10,16 +12,20 @@ import com.transferwise.tasks.ITasksService.AddTaskResponse; import com.transferwise.tasks.TasksProperties; import com.transferwise.tasks.dao.ITaskDao; +import com.transferwise.tasks.dao.ITaskDaoDataSerializer; import com.transferwise.tasks.dao.ITaskDaoDataSerializer.SerializedData; import com.transferwise.tasks.dao.ITaskSqlMapper; import com.transferwise.tasks.dao.MySqlTaskTypesMapper; import com.transferwise.tasks.domain.FullTaskRecord; import com.transferwise.tasks.domain.Task; +import com.transferwise.tasks.domain.TaskContext; import com.transferwise.tasks.test.ITestTasksService; import com.transferwise.tasks.test.dao.ITestTaskDao; import java.nio.charset.StandardCharsets; +import java.util.Map; import java.util.UUID; import java.util.stream.Stream; +import lombok.SneakyThrows; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -43,6 +49,10 @@ public class TaskDataIntTest extends BaseIntTest { private TasksProperties tasksProperties; @Autowired private JdbcTemplate jdbcTemplate; + @Autowired + private ObjectMapper objectMapper; + @Autowired + private ITaskDaoDataSerializer taskDataSerializer; private CompressionAlgorithm originalAlgorithm; private int originalMinSize; @@ -137,4 +147,59 @@ void oldDataFieldIsNotSetForNewTasks() { assertThat(oldData).isEqualTo(""); } + @ParameterizedTest + @MethodSource("providedAlgorithms") + @SneakyThrows + void testWritingTaskContextData() { + testTasksService.stopProcessing(); + + String data = "Hello World!"; + var context = new TaskContext().setContextMap(Map.of("adam-jones", "jambi")); + AddTaskRequest addTaskRequest = new AddTaskRequest() + .setType("test_data") + .setData(data.getBytes(StandardCharsets.UTF_8)) + .setTaskContext(context); + AddTaskResponse addTaskResponse = testTasksService.addTask(addTaskRequest); + UUID taskId = addTaskResponse.getTaskId(); + + FullTaskRecord fullTaskRecord = taskDao.getTask(taskId, FullTaskRecord.class); + assertThat(fullTaskRecord.getData()).isEqualTo(data.getBytes(StandardCharsets.UTF_8)); + + byte[] contextBlob = jdbcTemplate + .queryForObject("select task_context from tw_task_data where task_id=?", byte[].class, taskSqlMapper.uuidToSqlTaskId(taskId)); + + TaskContext contextData = objectMapper.readValue(contextBlob, TaskContext.class); + assertEquals(Map.of("adam-jones", "jambi"), contextData.getContextMap()); + + } + + + @ParameterizedTest + @MethodSource("providedAlgorithms") + void testReadingTaskContextData(String algorithm) { + testTasksService.stopProcessing(); + + String data = "Hello World!"; + var context = new TaskContext().setContextMap(Map.of("danny-carey", "the-grudge")); + AddTaskRequest addTaskRequest = new AddTaskRequest() + .setType("test_data") + .setData(data.getBytes(StandardCharsets.UTF_8)) + .setTaskContext(context) + .setCompression(algorithm == null ? null : new CompressionRequest().setAlgorithm(CompressionAlgorithm.valueOf(algorithm))); + AddTaskResponse addTaskResponse = testTasksService.addTask(addTaskRequest); + UUID taskId = addTaskResponse.getTaskId(); + + FullTaskRecord fullTaskRecord = taskDao.getTask(taskId, FullTaskRecord.class); + assertThat(fullTaskRecord.getData()).isEqualTo(data.getBytes(StandardCharsets.UTF_8)); + assertEquals(Map.of("danny-carey", "the-grudge"), fullTaskRecord.getTaskContext().getContextMap()); + + Task taskRecord = taskDao.getTask(taskId, Task.class); + assertEquals(new TaskContext().setContextMap(Map.of("danny-carey", "the-grudge")), taskRecord.getTaskContext()); + + } + + private static Stream providedAlgorithms() { + return Stream.of("GZIP", "LZ4", null, "NONE"); + } + } diff --git a/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskInterceptionIntTest.java b/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskInterceptionIntTest.java new file mode 100644 index 00000000..160971eb --- /dev/null +++ b/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskInterceptionIntTest.java @@ -0,0 +1,28 @@ +package com.transferwise.tasks.testapp; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import com.transferwise.common.baseutils.UuidUtils; +import com.transferwise.tasks.BaseIntTest; +import com.transferwise.tasks.ITasksService.AddTaskRequest; +import com.transferwise.tasks.TasksService; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; + +public class TaskInterceptionIntTest extends BaseIntTest { + + @Autowired + private TasksService tasksService; + + @Test + void jambiTaskIsInterceptedCorrectly() { + testTasksService.resetAndDeleteTasksWithTypes("test"); + tasksService.addTask(new AddTaskRequest().setTaskId(UuidUtils.generatePrefixCombUuid()).setType("test").setSubType("Jambi")); + tasksService.addTask(new AddTaskRequest().setTaskId(UuidUtils.generatePrefixCombUuid()).setType("test").setSubType("Jambi")); + Awaitility.await().until(() -> testTasksService.getFinishedTasks("test", "Jambi").size() == 2); + assertEquals(2, meterRegistry.counter("tool", "song", "eulogy").count()); + + } + +} diff --git a/integration-tests/src/test/java/com/transferwise/tasks/testapp/config/TestConfiguration.java b/integration-tests/src/test/java/com/transferwise/tasks/testapp/config/TestConfiguration.java index 5e42c05b..2b8c4136 100644 --- a/integration-tests/src/test/java/com/transferwise/tasks/testapp/config/TestConfiguration.java +++ b/integration-tests/src/test/java/com/transferwise/tasks/testapp/config/TestConfiguration.java @@ -9,7 +9,11 @@ import com.transferwise.tasks.helpers.kafka.messagetotask.IKafkaMessageHandler; import com.transferwise.tasks.impl.jobs.interfaces.IJob; import com.transferwise.tasks.processing.ITaskProcessingInterceptor; +import com.transferwise.tasks.processing.ITaskRegistrationDecorator; +import com.transferwise.tasks.testapp.testbeans.JambiTaskInterceptor; +import com.transferwise.tasks.testapp.testbeans.JambiTaskRegistrationDecorator; import com.transferwise.tasks.utils.LogUtils; +import io.micrometer.core.instrument.MeterRegistry; import java.time.ZonedDateTime; import java.util.Arrays; import java.util.Collections; @@ -136,4 +140,14 @@ public IKafkaListenerConsumerPropertiesProvider twTasksKafkaListenerSpringKafkaC return props; }; } + + @Bean + ITaskRegistrationDecorator jambiRegistrationInterceptor() { + return new JambiTaskRegistrationDecorator(); + } + + @Bean + ITaskProcessingInterceptor jambiProcessingInterceptor(MeterRegistry meterRegistry) { + return new JambiTaskInterceptor(meterRegistry); + } } diff --git a/integration-tests/src/test/java/com/transferwise/tasks/testapp/testbeans/JambiTaskInterceptor.java b/integration-tests/src/test/java/com/transferwise/tasks/testapp/testbeans/JambiTaskInterceptor.java new file mode 100644 index 00000000..36945087 --- /dev/null +++ b/integration-tests/src/test/java/com/transferwise/tasks/testapp/testbeans/JambiTaskInterceptor.java @@ -0,0 +1,25 @@ +package com.transferwise.tasks.testapp.testbeans; + +import com.transferwise.tasks.domain.Task; +import com.transferwise.tasks.processing.ITaskProcessingInterceptor; +import io.micrometer.core.instrument.MeterRegistry; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Component; + +@Component +@RequiredArgsConstructor +public class JambiTaskInterceptor implements ITaskProcessingInterceptor { + + private final MeterRegistry meterRegistry; + + @Override + public void doProcess(Task task, Runnable processor) { + if ("Jambi".equals(task.getSubType())) { + if (task.getTaskContext() != null) { + var name = task.getTaskContext().get("adam-jones", String.class); + meterRegistry.counter("tool", "song", name).increment(); + } + } + processor.run(); + } +} diff --git a/integration-tests/src/test/java/com/transferwise/tasks/testapp/testbeans/JambiTaskRegistrationDecorator.java b/integration-tests/src/test/java/com/transferwise/tasks/testapp/testbeans/JambiTaskRegistrationDecorator.java new file mode 100644 index 00000000..2acf4e84 --- /dev/null +++ b/integration-tests/src/test/java/com/transferwise/tasks/testapp/testbeans/JambiTaskRegistrationDecorator.java @@ -0,0 +1,19 @@ +package com.transferwise.tasks.testapp.testbeans; + +import com.transferwise.tasks.ITasksService.AddTaskRequest; +import com.transferwise.tasks.domain.TaskContext; +import com.transferwise.tasks.processing.ITaskRegistrationDecorator; +import java.util.Map; +import org.springframework.stereotype.Component; + +@Component +public class JambiTaskRegistrationDecorator implements ITaskRegistrationDecorator { + + @Override + public AddTaskRequest decorate(AddTaskRequest request) { + if ("Jambi".equals(request.getSubType())) { + return request.setTaskContext(new TaskContext().setContextMap(Map.of("adam-jones", "eulogy"))); + } + return request; + } +} diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/ITasksService.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/ITasksService.java index dce558f7..0515004c 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/ITasksService.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/ITasksService.java @@ -1,6 +1,7 @@ package com.transferwise.tasks; import com.transferwise.tasks.ITasksService.AddTaskResponse.Result; +import com.transferwise.tasks.domain.TaskContext; import com.transferwise.tasks.domain.TaskStatus; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.time.Duration; @@ -45,6 +46,7 @@ class AddTaskRequest { private boolean warnWhenTaskExists; private Duration expectedQueueTime; private CompressionRequest compression; + private TaskContext taskContext; @Data @Accessors(chain = true) @@ -95,6 +97,7 @@ class ResumeTaskRequest { @Data @Accessors(chain = true) class RescheduleTaskRequest { + private UUID taskId; private long version; private ZonedDateTime runAfterTime; @@ -103,6 +106,7 @@ class RescheduleTaskRequest { @Data @Accessors(chain = true) class RescheduleTaskResponse { + private UUID taskId; private Result result; @@ -116,12 +120,14 @@ public enum Result { @Data @Accessors(chain = true) class GetTaskRequest { + private UUID taskId; } @Data @Accessors(chain = true) class GetTaskResponse { + private UUID taskId; private String type; private long version; diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/TasksService.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/TasksService.java index 51bf53d2..37e40738 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/TasksService.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/TasksService.java @@ -16,9 +16,12 @@ import com.transferwise.tasks.handler.interfaces.ITaskHandlerRegistry; import com.transferwise.tasks.helpers.ICoreMetricsTemplate; import com.transferwise.tasks.helpers.executors.IExecutorsHelper; +import com.transferwise.tasks.processing.ITaskRegistrationDecorator; import com.transferwise.tasks.triggering.ITasksExecutionTriggerer; import com.transferwise.tasks.utils.LogUtils; import java.time.ZonedDateTime; +import java.util.ArrayList; +import java.util.List; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -58,6 +61,8 @@ public class TasksService implements ITasksService, GracefulShutdownStrategy, In private IEnvironmentValidator environmentValidator; @Autowired private ICoreMetricsTemplate coreMetricsTemplate; + @Autowired(required = false) + private List taskRegistrationDecorators = new ArrayList<>(); private ExecutorService afterCommitExecutorService; private TxSyncAdapterFactory txSyncAdapterFactory; @@ -85,12 +90,18 @@ public void afterPropertiesSet() { @Override @EntryPoint(usesExisting = true) @Transactional(rollbackFor = Exception.class) - public AddTaskResponse addTask(AddTaskRequest request) { + public AddTaskResponse addTask(AddTaskRequest requestParam) { return entryPointsHelper.continueOrCreate(EntryPointsGroups.TW_TASKS_ENGINE, EntryPointsNames.ADD_TASK, () -> { + AddTaskRequest request = requestParam; mdcService.put(request.getTaskId(), 0L); mdcService.putType(request.getType()); mdcService.putSubType(request.getSubType()); + + for (ITaskRegistrationDecorator interceptor : taskRegistrationDecorators) { + request = interceptor.decorate(request); + } + ZonedDateTime now = ZonedDateTime.now(TwContextClockHolder.getClock()); final TaskStatus status = request.getRunAfterTime() == null || !request.getRunAfterTime().isAfter(now) ? TaskStatus.SUBMITTED : TaskStatus.WAITING; @@ -102,17 +113,19 @@ public AddTaskResponse addTask(AddTaskRequest request) { ZonedDateTime maxStuckTime = request.getExpectedQueueTime() == null ? now.plus(tasksProperties.getTaskStuckTimeout()) : now.plus(request.getExpectedQueueTime()); - byte[] data = request.getData(); + ITaskDao.InsertTaskResponse insertTaskResponse = taskDao.insertTask( - new ITaskDao.InsertTaskRequest().setData(data).setKey(request.getUniqueKey()) + new ITaskDao.InsertTaskRequest().setData(request.getData()).setKey(request.getUniqueKey()) .setRunAfterTime(request.getRunAfterTime()) .setSubType(request.getSubType()) .setType(request.getType()).setTaskId(request.getTaskId()) .setMaxStuckTime(maxStuckTime).setStatus(status).setPriority(priority) - .setCompression(request.getCompression())); + .setCompression(request.getCompression()) + .setTaskContext(request.getTaskContext()) + ); - coreMetricsTemplate - .registerTaskAdding(request.getType(), request.getUniqueKey(), insertTaskResponse.isInserted(), request.getRunAfterTime(), data); + coreMetricsTemplate.registerTaskAdding(request.getType(), request.getUniqueKey(), + insertTaskResponse.isInserted(), request.getRunAfterTime(), request.getData()); if (!insertTaskResponse.isInserted()) { coreMetricsTemplate.registerDuplicateTask(request.getType(), !request.isWarnWhenTaskExists()); diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/dao/ITaskDao.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/dao/ITaskDao.java index c5470d9c..4522fa1d 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/dao/ITaskDao.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/dao/ITaskDao.java @@ -4,6 +4,7 @@ import com.transferwise.tasks.domain.BaseTask; import com.transferwise.tasks.domain.IBaseTask; import com.transferwise.tasks.domain.Task; +import com.transferwise.tasks.domain.TaskContext; import com.transferwise.tasks.domain.TaskStatus; import com.transferwise.tasks.domain.TaskVersionId; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; @@ -54,6 +55,7 @@ class InsertTaskRequest { private ZonedDateTime maxStuckTime; private Integer priority; private CompressionRequest compression; + private TaskContext taskContext; } @Data diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/dao/JdbcTaskDao.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/dao/JdbcTaskDao.java index 558460b7..7b840a71 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/dao/JdbcTaskDao.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/dao/JdbcTaskDao.java @@ -2,6 +2,7 @@ import static com.transferwise.tasks.utils.TimeUtils.toZonedDateTime; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import com.transferwise.common.baseutils.ExceptionUtils; import com.transferwise.common.baseutils.UuidUtils; @@ -12,6 +13,7 @@ import com.transferwise.tasks.domain.BaseTask1; import com.transferwise.tasks.domain.FullTaskRecord; import com.transferwise.tasks.domain.Task; +import com.transferwise.tasks.domain.TaskContext; import com.transferwise.tasks.domain.TaskStatus; import com.transferwise.tasks.domain.TaskVersionId; import com.transferwise.tasks.helpers.ICoreMetricsTemplate; @@ -30,6 +32,7 @@ import java.time.Instant; import java.time.ZonedDateTime; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -74,6 +77,8 @@ public abstract class JdbcTaskDao implements ITaskDao, InitializingBean { protected ITaskDaoDataSerializer taskDataSerializer; @Autowired protected ICoreMetricsTemplate coreMetricsTemplate; + @Autowired + protected ObjectMapper objectMapper; private final ConcurrentHashMap sqlCache = new ConcurrentHashMap<>(); @@ -94,6 +99,7 @@ public JdbcTaskDao(DataSource dataSource, ITaskSqlMapper sqlMapper) { protected String insertTaskSql; protected String insertUniqueTaskKeySql; protected String insertTaskDataSql; + protected String insertTaskContext; protected String setToBeRetriedSql; protected String setToBeRetriedSql1; protected String grabForProcessingWithStatusAssertionSql; @@ -130,7 +136,7 @@ public JdbcTaskDao(DataSource dataSource, ITaskSqlMapper sqlMapper) { protected String getApproximateTaskDatasCountSql1; protected final int[] questionBuckets = {1, 5, 25, 125, 625}; - + static final byte[] NULL_BLOB = "./g".getBytes(StandardCharsets.UTF_8); protected final TaskStatus[] stuckStatuses = new TaskStatus[]{TaskStatus.NEW, TaskStatus.SUBMITTED, TaskStatus.WAITING, TaskStatus.PROCESSING}; protected ITwTaskTables twTaskTables(TasksProperties tasksProperties) { @@ -147,7 +153,7 @@ public void afterPropertiesSet() { insertTaskSql = "insert ignore into " + taskTable + "(id,type,sub_type,status,data,next_event_time" + ",state_time,time_created,time_updated,processing_tries_count,version,priority) values (?,?,?,?,?,?,?,?,?,?,?,?)"; insertUniqueTaskKeySql = "insert ignore into " + uniqueTaskKeyTable + "(task_id,key_hash,`key`) values (?, ?, ?)"; - insertTaskDataSql = "insert into " + taskDataTable + "(task_id,data_format,data) values (?,?,?)"; + insertTaskDataSql = "insert into " + taskDataTable + "(task_id,data_format,data,task_context_format,task_context) values (?,?,?,?,?)"; setToBeRetriedSql = "update " + taskTable + " set status=?,next_event_time=?,state_time=?,time_updated=?,version=? where id=? and version=?"; setToBeRetriedSql1 = "update " + taskTable + " set status=?,next_event_time=?" + ",processing_tries_count=?,state_time=?,time_updated=?,version=? where id=? and version=?"; @@ -174,11 +180,12 @@ public void afterPropertiesSet() { getStuckTasksCountGroupedSql = "select type, count(*) from (select type from " + taskTable + " where status=?" + " and next_event_time T getTask(UUID taskId, Class clazz) { } else if (clazz.equals(Task.class)) { List result = jdbcTemplate.query(getTaskSql1, args(taskId), (rs, rowNum) -> { byte[] data = getData(rs, 7, 9, 10); + TaskContext context = getContext(rs, 11, 12); return new Task().setId(sqlMapper.sqlTaskIdToUuid(rs.getObject(1))) .setVersion(rs.getLong(2)).setType(rs.getString(3)) .setStatus(rs.getString(4)).setPriority(rs.getInt(5)) .setSubType(rs.getString(6)).setData(data) - .setProcessingTriesCount(rs.getLong(8)); + .setProcessingTriesCount(rs.getLong(8)) + .setTaskContext(context); }); return (T) getFirst(result); } else if (clazz.equals(FullTaskRecord.class)) { List result = jdbcTemplate.query(getTaskSql2, args(taskId), (rs, rowNum) -> { byte[] data = getData(rs, 7, 12, 13); + TaskContext context = getContext(rs, 14, 15); return new FullTaskRecord().setId(sqlMapper.sqlTaskIdToUuid(rs.getObject(1))) .setVersion(rs.getLong(2)).setType(rs.getString(3)) .setStatus(rs.getString(4)).setPriority(rs.getInt(5)) @@ -475,7 +492,8 @@ public T getTask(UUID taskId, Class clazz) { .setProcessingTriesCount(rs.getLong(8)) .setStateTime(toZonedDateTime(rs.getTimestamp(9))) .setNextEventTime(toZonedDateTime(rs.getTimestamp(10))) - .setProcessingClientId(rs.getString(11)); + .setProcessingClientId(rs.getString(11)) + .setTaskContext(context); }); return (T) getFirst(result); } else { @@ -716,9 +734,22 @@ protected void assertIsolationLevel(Isolation isolation) { } } + protected TaskContext getContext(ResultSet rs, int contextFormatIdx, int contextIdx) throws SQLException { + return ExceptionUtils.doUnchecked(() -> { + var blob = taskDataSerializer.deserialize(new SerializedData() + .setDataFormat(rs.getInt(contextFormatIdx)) + .setData(rs.getBytes(contextIdx)) + ); + if (blob == null) { + return null; + } + return objectMapper.readValue(blob, TaskContext.class); + }); + } + protected byte[] getData(ResultSet rs, int deprecatedDataIdx, int dataFormatIdx, int dataIdx) throws SQLException { byte[] data = rs.getBytes(dataIdx); - if (data != null) { + if (data != null && !Arrays.equals(NULL_BLOB, data)) { return taskDataSerializer.deserialize(new SerializedData().setDataFormat(rs.getInt(dataFormatIdx)).setData(data)); } else { String deprecatedData = rs.getString(deprecatedDataIdx); diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/domain/FullTaskRecord.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/domain/FullTaskRecord.java index 4364e4bd..cc286d1d 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/domain/FullTaskRecord.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/domain/FullTaskRecord.java @@ -2,6 +2,7 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.time.ZonedDateTime; +import java.util.Objects; import java.util.UUID; import lombok.Data; import lombok.experimental.Accessors; @@ -22,7 +23,8 @@ public class FullTaskRecord implements ITask { private ZonedDateTime stateTime; private ZonedDateTime nextEventTime; private String processingClientId; - + private TaskContext taskContext; + @Override public ITaskVersionId getVersionId() { return new TaskVersionId(id, version); diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/domain/Task.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/domain/Task.java index 2e97e061..03f95d96 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/domain/Task.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/domain/Task.java @@ -1,6 +1,7 @@ package com.transferwise.tasks.domain; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.Objects; import java.util.UUID; import lombok.Data; import lombok.experimental.Accessors; @@ -18,6 +19,7 @@ public class Task implements ITask { private long version; private long processingTriesCount; private int priority; + private TaskContext taskContext; // TODO: We should create an interface instead. public BaseTask toBaseTask() { diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/domain/TaskContext.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/domain/TaskContext.java new file mode 100644 index 00000000..add371a7 --- /dev/null +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/domain/TaskContext.java @@ -0,0 +1,28 @@ +package com.transferwise.tasks.domain; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import lombok.Data; +import lombok.experimental.Accessors; + +@Data +@Accessors(chain = true) +public class TaskContext { + + public static final TaskContext EMPTY = new TaskContext(); + + private Map contextMap = new HashMap<>(); + + public T get(String key, Class cls) { + return Optional.ofNullable(contextMap.get(key)).map(cls::cast).orElse(null); + } + + public void merge(TaskContext taskContext) { + if (taskContext != null) { + if (taskContext.contextMap != null) { + contextMap.putAll(taskContext.contextMap); + } + } + } +} diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/processing/ITaskRegistrationDecorator.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/processing/ITaskRegistrationDecorator.java new file mode 100644 index 00000000..2f817d13 --- /dev/null +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/processing/ITaskRegistrationDecorator.java @@ -0,0 +1,8 @@ +package com.transferwise.tasks.processing; + +import com.transferwise.tasks.ITasksService.AddTaskRequest; + +public interface ITaskRegistrationDecorator { + + AddTaskRequest decorate(AddTaskRequest request); +} diff --git a/tw-tasks-core/src/main/resources/db/changelog/db.tw-tasks-mysql.xml b/tw-tasks-core/src/main/resources/db/changelog/db.tw-tasks-mysql.xml index 224c13b6..00628713 100644 --- a/tw-tasks-core/src/main/resources/db/changelog/db.tw-tasks-mysql.xml +++ b/tw-tasks-core/src/main/resources/db/changelog/db.tw-tasks-mysql.xml @@ -1,41 +1,47 @@ + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.0.xsd"> - CREATE TABLE tw_task ( - id BINARY(16) PRIMARY KEY NOT NULL, - status ENUM('NEW', 'WAITING', 'SUBMITTED', 'PROCESSING', 'DONE', 'ERROR', 'FAILED'), - -- Microsecond precision (6) is strongly recommended here to reduce the chance of gap locks deadlocking on tw_task_idx1 - next_event_time DATETIME(6) NOT NULL, - state_time DATETIME(6) NOT NULL, - version BIGINT NOT NULL, - priority INT NOT NULL DEFAULT 5, - processing_start_time DATETIME(6) NULL, - processing_tries_count BIGINT NOT NULL, - time_created DATETIME(6) NOT NULL, - time_updated DATETIME(6) NOT NULL, - type VARCHAR(250) CHARACTER SET latin1 NOT NULL, - sub_type VARCHAR(250) CHARACTER SET latin1 NULL, - processing_client_id VARCHAR(250) CHARACTER SET latin1 NULL, - data LONGTEXT NOT NULL); + CREATE TABLE tw_task + ( + id BINARY(16) PRIMARY KEY NOT NULL, + status ENUM('NEW', 'WAITING', 'SUBMITTED', 'PROCESSING', 'DONE', 'ERROR', 'FAILED'), + -- Microsecond precision (6) is strongly recommended here to reduce the chance of gap locks deadlocking on tw_task_idx1 + next_event_time DATETIME(6) NOT NULL, + state_time DATETIME(6) NOT NULL, + version BIGINT NOT NULL, + priority INT NOT NULL DEFAULT 5, + processing_start_time DATETIME(6) NULL, + processing_tries_count BIGINT NOT NULL, + time_created DATETIME(6) NOT NULL, + time_updated DATETIME(6) NOT NULL, + type VARCHAR(250) CHARACTER SET latin1 NOT NULL, + sub_type VARCHAR(250) CHARACTER SET latin1 NULL, + processing_client_id VARCHAR(250) CHARACTER SET latin1 NULL, + data LONGTEXT NOT NULL + ); CREATE INDEX tw_task_idx1 ON tw_task (status, next_event_time); - CREATE TABLE tw_task_data ( - task_id BINARY(16) PRIMARY KEY NOT NULL, - data_format INT NOT NULL, - data LONGBLOB NOT NULL + CREATE TABLE tw_task_data + ( + task_id BINARY(16) PRIMARY KEY NOT NULL, + data_format INT NOT NULL, + data LONGBLOB NOT NULL, + task_context_format SMALLINT, + task_context BLOB ); - CREATE TABLE unique_tw_task_key ( - task_id BINARY(16) PRIMARY KEY, - key_hash INT NOT NULL, - `key` VARCHAR(150) CHARACTER SET latin1 NOT NULL, - UNIQUE KEY uidx1 (key_hash, `key`) + CREATE TABLE unique_tw_task_key + ( + task_id BINARY(16) PRIMARY KEY, + key_hash INT NOT NULL, + `key` VARCHAR(150) CHARACTER SET latin1 NOT NULL, + UNIQUE KEY uidx1 (key_hash, `key`) ); diff --git a/tw-tasks-core/src/main/resources/db/changelog/db.tw-tasks-postgres.xml b/tw-tasks-core/src/main/resources/db/changelog/db.tw-tasks-postgres.xml index b7364a06..39cd09f2 100644 --- a/tw-tasks-core/src/main/resources/db/changelog/db.tw-tasks-postgres.xml +++ b/tw-tasks-core/src/main/resources/db/changelog/db.tw-tasks-postgres.xml @@ -4,41 +4,46 @@ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.0.xsd"> - - - CREATE TABLE tw_task ( - id UUID PRIMARY KEY, - type TEXT NOT NULL, - sub_type TEXT NULL, - status TEXT NOT NULL, - data TEXT NOT NULL, - next_event_time TIMESTAMPTZ(6) NOT NULL, - state_time TIMESTAMPTZ(3) NOT NULL, - processing_client_id TEXT NULL, - processing_start_time TIMESTAMPTZ(3) NULL, - time_created TIMESTAMPTZ(3) NOT NULL, - time_updated TIMESTAMPTZ(3) NOT NULL, - processing_tries_count BIGINT NOT NULL, - version BIGINT NOT NULL, - priority INT NOT NULL DEFAULT 5 - ); + + + CREATE TABLE tw_task + ( + id UUID PRIMARY KEY, + type TEXT NOT NULL, + sub_type TEXT NULL, + status TEXT NOT NULL, + data TEXT NOT NULL, + next_event_time TIMESTAMPTZ(6) NOT NULL, + state_time TIMESTAMPTZ(3) NOT NULL, + processing_client_id TEXT NULL, + processing_start_time TIMESTAMPTZ(3) NULL, + time_created TIMESTAMPTZ(3) NOT NULL, + time_updated TIMESTAMPTZ(3) NOT NULL, + processing_tries_count BIGINT NOT NULL, + version BIGINT NOT NULL, + priority INT NOT NULL DEFAULT 5 + ); - CREATE INDEX tw_task_idx1 ON tw_task (status, next_event_time); + CREATE INDEX tw_task_idx1 ON tw_task (status, next_event_time); - CREATE TABLE tw_task_data ( - task_id UUID PRIMARY KEY NOT NULL, - data_format INT NOT NULL, - data BYTEA NOT NULL - ) WITH (toast_tuple_target=8160); + CREATE TABLE tw_task_data + ( + task_id UUID PRIMARY KEY NOT NULL, + data_format INT NOT NULL, + data BYTEA NOT NULL, + task_context_format SMALLINT, + task_context BYTEA + ) WITH (toast_tuple_target = 8160); - ALTER TABLE tw_task_data ALTER COLUMN data SET STORAGE EXTERNAL; + ALTER TABLE tw_task_data ALTER COLUMN data SET STORAGE EXTERNAL; - CREATE TABLE unique_tw_task_key ( - task_id UUID PRIMARY KEY NOT NULL, - key_hash INT NOT NULL, - key TEXT NOT NULL, - unique (key_hash, key) - ); - - + CREATE TABLE unique_tw_task_key + ( + task_id UUID PRIMARY KEY NOT NULL, + key_hash INT NOT NULL, + key TEXT NOT NULL, + unique (key_hash, key) + ); + +