Add TaskContext to tasks to track e2e instant transfer information (
hussainkarafallah authored Sep 9, 2024
1 parent 0864b81 commit fc9c7d1
Showing 19 changed files with 404 additions and 87 deletions.
54 changes: 51 additions & 3 deletions CHANGELOG.md
@@ -5,52 +5,97 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres
to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

#### 1.43.0 - 2024/08/09

### Added

- Support for task context: a `TaskContext` carrying a map of key-value data can now be attached to tasks.

You will need to apply the following database migration:

Postgres:

```sql
ALTER TABLE tw_task_data
ADD COLUMN task_context_format SMALLINT,
ADD COLUMN task_context BYTEA;
```

MariaDB:

```sql
ALTER TABLE tw_task_data
ADD COLUMN task_context_format SMALLINT,
ADD COLUMN task_context BLOB,
ALGORITHM=INPLACE, LOCK=NONE;
```
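
For illustration, a context can be attached when a task is added and read back during processing. The sketch below follows the new integration tests in this commit; the wrapper class, task type, context key and payload are placeholders rather than library API, and `ITasksService` is assumed to be available as an injected bean.

```java
import com.transferwise.tasks.ITasksService;
import com.transferwise.tasks.ITasksService.AddTaskRequest;
import com.transferwise.tasks.domain.TaskContext;
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical wrapper class; only the TaskContext/AddTaskRequest calls come from this commit.
public class ExampleTaskCreator {

  private final ITasksService tasksService;

  public ExampleTaskCreator(ITasksService tasksService) {
    this.tasksService = tasksService;
  }

  public void addTaskWithContext() {
    // Attach arbitrary key-value context data to the task at registration time.
    var context = new TaskContext().setContextMap(Map.of("transfer-id", "example-value"));
    tasksService.addTask(new AddTaskRequest()
        .setType("example_task")
        .setData("payload".getBytes(StandardCharsets.UTF_8))
        .setTaskContext(context));
  }

  // During processing, e.g. in an ITaskProcessingInterceptor, the value can be read back with:
  //   String transferId = task.getTaskContext().get("transfer-id", String.class);
}
```

The context is persisted in the new `task_context_format` and `task_context` columns added by the migration above.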

#### 1.42.0 - 2024/07/16

### Added

- Support for Spring Boot 3.3.

### Removed

- Support for Spring Boot versions 3.1 and 2.7.

#### 1.41.6 - 2024/04/17

### Added

- `/getTaskTypes` endpoint may be disabled through the configuration property `tw-tasks.core.tasks-management.enable-get-task-types: false`. Services with an extreme number of tasks might benefit from this.

#### 1.41.5 - 2024/04/05

### Changed

* Use static methods to create BeanPostProcessors.

#### 1.41.4 - 2024/04/02

### Changed

- `/getTaskTypes` endpoint accepts an optional query parameter `status` to return only the types of tasks in the given status(es).
- Fixed a bug in the `taskType` and `taskSubType` filters on query endpoints: when multiple values were supplied, only one value was considered.

#### 1.41.3 - 2024/02/29

### Changed

* Added compatibility with Spring Boot 3.2.
* Updated dependencies.

#### 1.41.2 - 2024/02/16

### Changed

* Kafka producer instantiation will be attempted up to 5 times, with a 500 ms delay between attempts. In some cases it has been observed that the CI fails to start the Kafka producer because the Kafka docker container itself is not yet fully up and accessible.

#### 1.41.1 - 2023/12/19

### Changed

- When building a Spring `ResponseEntity` with an explicit status, provide an integer derived from the `HttpStatus` enum rather than the `HttpStatus` itself, to handle a binary incompatibility between Spring 5 and 6 that causes `NoSuchMethodError` failures when tw-tasks is used with Spring 6.

#### 1.41.0 - 2023/11/16

### Added

- Added `taskType` and `taskSubType` parameters to management query endpoints.
- Added `/getTaskTypes` endpoint to retrieve the list of registered task types and sub-types.

#### 1.40.6 - 2023/11/16

### Fixed

* `NullPointerException` in `TaskManagementService.getTaskData` when the task is not found.

#### 1.40.5 - 2023/10/30

### Added

- Set `METADATA_MAX_AGE_CONFIG` to two minutes for the producer.

#### 1.40.4 - 2023/10/06
@@ -75,7 +120,8 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

### Added

* Introduced a new configuration parameter `tw-tasks.core.no-op-task-types` that allows a default no-operation task handler to pick up deprecated task types in your service.

#### 1.40.1 - 2023/07/12

@@ -86,6 +132,7 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
#### 1.40.0 - 2023/06/12

### Added

* `CronJob` annotation for Spring bean methods.

#### 1.39.2 - 2023/06/06
@@ -99,6 +146,7 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
#### 1.39.1 - 2023/04/19

### Changed

* Kafka consumer offset duration is always treated as positive, since we cannot reset the offsets to future timestamps.
* Both `PT1H` and `-PT1H` are treated the same, i.e. as `PT1H`. This value is subtracted from the `now()` timestamp.
* Added a second Kafka consumer for the tests in the `SeekToDurationOnRebalanceListenerIntTest` class.
2 changes: 1 addition & 1 deletion gradle.properties
@@ -1,2 +1,2 @@
version=1.42.0
version=1.43.0
org.gradle.internal.http.socketTimeout=120000
TaskDataIntTest.java
@@ -1,7 +1,9 @@
package com.transferwise.tasks.testapp;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Strings;
import com.transferwise.tasks.BaseIntTest;
import com.transferwise.tasks.CompressionAlgorithm;
@@ -10,16 +12,20 @@
import com.transferwise.tasks.ITasksService.AddTaskResponse;
import com.transferwise.tasks.TasksProperties;
import com.transferwise.tasks.dao.ITaskDao;
import com.transferwise.tasks.dao.ITaskDaoDataSerializer;
import com.transferwise.tasks.dao.ITaskDaoDataSerializer.SerializedData;
import com.transferwise.tasks.dao.ITaskSqlMapper;
import com.transferwise.tasks.dao.MySqlTaskTypesMapper;
import com.transferwise.tasks.domain.FullTaskRecord;
import com.transferwise.tasks.domain.Task;
import com.transferwise.tasks.domain.TaskContext;
import com.transferwise.tasks.test.ITestTasksService;
import com.transferwise.tasks.test.dao.ITestTaskDao;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Stream;
import lombok.SneakyThrows;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -43,6 +49,10 @@ public class TaskDataIntTest extends BaseIntTest {
private TasksProperties tasksProperties;
@Autowired
private JdbcTemplate jdbcTemplate;
@Autowired
private ObjectMapper objectMapper;
@Autowired
private ITaskDaoDataSerializer taskDataSerializer;

private CompressionAlgorithm originalAlgorithm;
private int originalMinSize;
@@ -137,4 +147,59 @@ void oldDataFieldIsNotSetForNewTasks() {
assertThat(oldData).isEqualTo("");
}

@ParameterizedTest
@MethodSource("providedAlgorithms")
@SneakyThrows
void testWritingTaskContextData() {
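// Writes a task with a context map and asserts that the JSON blob persisted in tw_task_data.task_context deserializes back to the same map.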
testTasksService.stopProcessing();

String data = "Hello World!";
var context = new TaskContext().setContextMap(Map.of("adam-jones", "jambi"));
AddTaskRequest addTaskRequest = new AddTaskRequest()
.setType("test_data")
.setData(data.getBytes(StandardCharsets.UTF_8))
.setTaskContext(context);
AddTaskResponse addTaskResponse = testTasksService.addTask(addTaskRequest);
UUID taskId = addTaskResponse.getTaskId();

FullTaskRecord fullTaskRecord = taskDao.getTask(taskId, FullTaskRecord.class);
assertThat(fullTaskRecord.getData()).isEqualTo(data.getBytes(StandardCharsets.UTF_8));

byte[] contextBlob = jdbcTemplate
.queryForObject("select task_context from tw_task_data where task_id=?", byte[].class, taskSqlMapper.uuidToSqlTaskId(taskId));

TaskContext contextData = objectMapper.readValue(contextBlob, TaskContext.class);
assertEquals(Map.of("adam-jones", "jambi"), contextData.getContextMap());
}

@ParameterizedTest
@MethodSource("providedAlgorithms")
void testReadingTaskContextData(String algorithm) {
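// Adds a task with a context map under each compression setting and asserts the context is returned for both FullTaskRecord and Task reads.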
testTasksService.stopProcessing();

String data = "Hello World!";
var context = new TaskContext().setContextMap(Map.of("danny-carey", "the-grudge"));
AddTaskRequest addTaskRequest = new AddTaskRequest()
.setType("test_data")
.setData(data.getBytes(StandardCharsets.UTF_8))
.setTaskContext(context)
.setCompression(algorithm == null ? null : new CompressionRequest().setAlgorithm(CompressionAlgorithm.valueOf(algorithm)));
AddTaskResponse addTaskResponse = testTasksService.addTask(addTaskRequest);
UUID taskId = addTaskResponse.getTaskId();

FullTaskRecord fullTaskRecord = taskDao.getTask(taskId, FullTaskRecord.class);
assertThat(fullTaskRecord.getData()).isEqualTo(data.getBytes(StandardCharsets.UTF_8));
assertEquals(Map.of("danny-carey", "the-grudge"), fullTaskRecord.getTaskContext().getContextMap());

Task taskRecord = taskDao.getTask(taskId, Task.class);
assertEquals(new TaskContext().setContextMap(Map.of("danny-carey", "the-grudge")), taskRecord.getTaskContext());
}

private static Stream<String> providedAlgorithms() {
return Stream.of("GZIP", "LZ4", null, "NONE");
}

}
TaskInterceptionIntTest.java
@@ -0,0 +1,28 @@
package com.transferwise.tasks.testapp;

import static org.junit.jupiter.api.Assertions.assertEquals;

import com.transferwise.common.baseutils.UuidUtils;
import com.transferwise.tasks.BaseIntTest;
import com.transferwise.tasks.ITasksService.AddTaskRequest;
import com.transferwise.tasks.TasksService;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;

public class TaskInterceptionIntTest extends BaseIntTest {

@Autowired
private TasksService tasksService;

@Test
void jambiTaskIsInterceptedCorrectly() {
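// End-to-end check: JambiTaskRegistrationDecorator attaches the context at registration and JambiTaskInterceptor turns it into the expected counter increments.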
testTasksService.resetAndDeleteTasksWithTypes("test");
tasksService.addTask(new AddTaskRequest().setTaskId(UuidUtils.generatePrefixCombUuid()).setType("test").setSubType("Jambi"));
tasksService.addTask(new AddTaskRequest().setTaskId(UuidUtils.generatePrefixCombUuid()).setType("test").setSubType("Jambi"));
Awaitility.await().until(() -> testTasksService.getFinishedTasks("test", "Jambi").size() == 2);
assertEquals(2, meterRegistry.counter("tool", "song", "eulogy").count());
}

}
@@ -9,7 +9,11 @@
import com.transferwise.tasks.helpers.kafka.messagetotask.IKafkaMessageHandler;
import com.transferwise.tasks.impl.jobs.interfaces.IJob;
import com.transferwise.tasks.processing.ITaskProcessingInterceptor;
import com.transferwise.tasks.processing.ITaskRegistrationDecorator;
import com.transferwise.tasks.testapp.testbeans.JambiTaskInterceptor;
import com.transferwise.tasks.testapp.testbeans.JambiTaskRegistrationDecorator;
import com.transferwise.tasks.utils.LogUtils;
import io.micrometer.core.instrument.MeterRegistry;
import java.time.ZonedDateTime;
import java.util.Arrays;
import java.util.Collections;
@@ -136,4 +140,14 @@ public IKafkaListenerConsumerPropertiesProvider twTasksKafkaListenerSpringKafkaC
return props;
};
}

@Bean
ITaskRegistrationDecorator jambiRegistrationInterceptor() {
return new JambiTaskRegistrationDecorator();
}

@Bean
ITaskProcessingInterceptor jambiProcessingInterceptor(MeterRegistry meterRegistry) {
return new JambiTaskInterceptor(meterRegistry);
}
}
JambiTaskInterceptor.java
@@ -0,0 +1,25 @@
package com.transferwise.tasks.testapp.testbeans;

import com.transferwise.tasks.domain.Task;
import com.transferwise.tasks.processing.ITaskProcessingInterceptor;
import io.micrometer.core.instrument.MeterRegistry;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class JambiTaskInterceptor implements ITaskProcessingInterceptor {

private final MeterRegistry meterRegistry;

@Override
public void doProcess(Task task, Runnable processor) {
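// For "Jambi" tasks, read the value stored under "adam-jones" in the task context and record it as the "song" tag on the "tool" counter.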
if ("Jambi".equals(task.getSubType())) {
if (task.getTaskContext() != null) {
var name = task.getTaskContext().get("adam-jones", String.class);
meterRegistry.counter("tool", "song", name).increment();
}
}
processor.run();
}
}
JambiTaskRegistrationDecorator.java
@@ -0,0 +1,19 @@
package com.transferwise.tasks.testapp.testbeans;

import com.transferwise.tasks.ITasksService.AddTaskRequest;
import com.transferwise.tasks.domain.TaskContext;
import com.transferwise.tasks.processing.ITaskRegistrationDecorator;
import java.util.Map;
import org.springframework.stereotype.Component;

@Component
public class JambiTaskRegistrationDecorator implements ITaskRegistrationDecorator {

@Override
public AddTaskRequest decorate(AddTaskRequest request) {
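// Attach a context map to every "Jambi" task at registration time; JambiTaskInterceptor reads this value back during processing.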
if ("Jambi".equals(request.getSubType())) {
return request.setTaskContext(new TaskContext().setContextMap(Map.of("adam-jones", "eulogy")));
}
return request;
}
}
1 change: 1 addition & 0 deletions tw-tasks-core-test/build.gradle
@@ -9,4 +9,5 @@ dependencies {
implementation libraries.springBeans
implementation libraries.springJdbc
implementation libraries.springContext
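// Jackson is needed by JdbcTestTaskDao in this module to JSON-deserialize the task context blob.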
implementation libraries.jacksonDatabind
}
JdbcTestTaskDao.java
@@ -1,10 +1,14 @@
package com.transferwise.tasks.test.dao;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.transferwise.common.baseutils.jackson.DefaultJsonConverter;
import com.transferwise.common.baseutils.jackson.JsonConverter;
import com.transferwise.tasks.dao.ITaskDaoDataSerializer;
import com.transferwise.tasks.dao.ITaskDaoDataSerializer.SerializedData;
import com.transferwise.tasks.dao.ITaskSqlMapper;
import com.transferwise.tasks.dao.ITwTaskTables;
import com.transferwise.tasks.domain.Task;
import com.transferwise.tasks.domain.TaskContext;
import com.transferwise.tasks.domain.TaskStatus;
import com.transferwise.tasks.helpers.sql.ArgumentPreparedStatementSetter;
import com.transferwise.tasks.helpers.sql.CacheKey;
@@ -59,7 +63,7 @@ private static class Queries {
" and status in (??)"
};
getTasksByTypeAndStatusAndSubType = new String[]{
"select id,type,sub_type,t.data,status,version,processing_tries_count,priority,d.data_format,d.data"
"select id,type,sub_type,t.data,status,version,processing_tries_count,priority,d.data_format,d.data,d.task_context_format,d.task_context"
+ " from " + tasksTable + " t left join " + dataTable + " d on t.id=d.task_id"
+ " where type=?",
" and status in (??)",
@@ -74,6 +78,7 @@ private static class Queries {
private final ITaskSqlMapper sqlMapper;
private final ConcurrentHashMap<CacheKey, String> sqlCache;
private final ITaskDaoDataSerializer taskDataSerializer;
private final JsonConverter jsonConverter = new DefaultJsonConverter(new ObjectMapper());

public JdbcTestTaskDao(DataSource dataSource, ITwTaskTables tables, ITaskSqlMapper sqlMapper, ITaskDaoDataSerializer taskDataSerializer) {
this.sqlCache = new ConcurrentHashMap<>();
@@ -198,8 +203,16 @@ public List<Task> findTasksByTypeSubTypeAndStatus(String type, String subType, T
(rs, rowNum) -> {
byte[] data;
byte[] newData = rs.getBytes(10);
TaskContext context = null;
if (newData != null) {
data = taskDataSerializer.deserialize(new SerializedData().setDataFormat(rs.getInt(9)).setData(newData));
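// Columns 11 and 12 hold the new task_context_format and task_context blob; decode them with the same data serializer as the payload, then map the JSON to a TaskContext.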
var contextBlob = taskDataSerializer.deserialize(new SerializedData()
.setDataFormat(rs.getInt(11))
.setData(rs.getBytes(12))
);
if (contextBlob != null) {
context = jsonConverter.toObject(contextBlob, TaskContext.class);
}
} else {
data = rs.getBytes(4);
}
@@ -211,7 +224,8 @@
.setStatus(rs.getString(5))
.setVersion(rs.getLong(6))
.setProcessingTriesCount(rs.getLong(7))
.setPriority(rs.getInt(8));
.setPriority(rs.getInt(8))
.setTaskContext(context);
}
);
}