Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add TaskContext to tasks to be track e2e instant transfer information #204

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 51 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,52 +5,97 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres
to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

#### 1.43.0 - 2024/08/09

- Added support for task context

You will need to do the following migration:

Postgres:

```
ALTER TABLE tw_task_data
ADD COLUMN task_context_format SMALLINT,
ADD COLUMN task_context BYTEA;
```

MariaDB:

```
ALTER TABLE tw_task_data
ADD COLUMN task_context_format SMALLINT,
ADD COLUMN task_context BLOB,
ALGORITHM=INPLACE, LOCK=NONE;
```

#### 1.42.0 - 2024/07/16

### Added

- Support for Spring Boot 3.3.

### Removed

- Support for spring boot 3.1 and 2.7 versions.

#### 1.41.6 - 2024/04/17

### Added
- `/getTaskTypes` endpoint may be disabled through configuration property `tw-tasks.core.tasks-management.enable-get-task-types: false`. Services with extreme amount of tasks might benefit from this.

- `/getTaskTypes` endpoint may be disabled through configuration property `tw-tasks.core.tasks-management.enable-get-task-types: false`. Services with
extreme amount of tasks might benefit from this.

#### 1.41.5 - 2024/04/05

### Changed

* Use static methods to create BeanPostProcessors.

#### 1.41.4 - 2024/04/02

### Changed

- `/getTaskTypes` endpoint accepts optional query parameter `status` to filter only types of tasks in the particular status(es).
- Fixed a bug with `taskType` and `taskSubType` filters on query endpoints when multiple values are supplied, where it would consider only one value.

#### 1.41.3 - 2024/02/29

### Changed

* Add compatibility with Spring Boot 3.2.
* Update dependencies

#### 1.41.2 - 2024/02/16

### Changed
* Kafka producer instantiation will be attempted up to 5 times with a 500ms delay between each attempt. In some cases, it has been observed that the CI fails to start the Kafka producer because the kafka docker container itself seems to not be fully up & accessible yet.

* Kafka producer instantiation will be attempted up to 5 times with a 500ms delay between each attempt. In some cases, it has been observed that the
CI fails to start the Kafka producer because the kafka docker container itself seems to not be fully up & accessible yet.

#### 1.41.1 - 2023/12/19

### Changed

- When building a Spring `ResponseEntity` with an explicit status, provide an integer derived from the `HttpStatus` enum, rather than providing the
`HttpStatus` directly, to handle binary incompatibility between Spring 5 and 6 causing NoSuchMethod errors when tw-tasks is used with Spring 6

#### 1.41.0 - 2023/11/16

### Added

- Added `taskType` and `taskSubType` parameters to management query endpoints.
- Added `/getTaskTypes` endpoint to retrieve list of registered task types and sub-types

## 1.40.6 - 2023/11/16

### Fixed

* NullPointerException in TaskManagementService.getTaskData in case task is not found

#### 1.40.5 - 2023/10/30

### Added

- Setting METADATA_MAX_AGE_CONFIG to two minutes for producer

#### 1.40.4 - 2023/10/06
Expand All @@ -75,7 +120,8 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

### Added

* introduced a new configuration parameter `tw-tasks.core.no-op-task-types` that allows a default no operation task handler to pick up deprecated task types in your service.
* introduced a new configuration parameter `tw-tasks.core.no-op-task-types` that allows a default no operation task handler to pick up deprecated task
types in your service.

#### 1.40.1 - 2023/07/12

Expand All @@ -86,6 +132,7 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
#### 1.40.0 - 2023/06/12

### Added

* CronJob annotation for Spring bean's methods

#### 1.39.2 - 2023/06/06
Expand All @@ -99,6 +146,7 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
#### 1.39.1 - 2023/04/19

### Changed

* Kafka consumer offset duration is always considered as positive since we cannot reset the offsets to future timestamps.
* Both `PT1H` and `-PT1H` are treated the same ie `PT1H`. This value gets subtracted by now() timestamp.
* Added second kafka consumer for the tests in `SeekToDurationOnRebalanceListenerIntTest` class
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
version=1.42.0
version=1.43.0
org.gradle.internal.http.socketTimeout=120000
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package com.transferwise.tasks.testapp;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Strings;
import com.transferwise.tasks.BaseIntTest;
import com.transferwise.tasks.CompressionAlgorithm;
Expand All @@ -10,16 +12,20 @@
import com.transferwise.tasks.ITasksService.AddTaskResponse;
import com.transferwise.tasks.TasksProperties;
import com.transferwise.tasks.dao.ITaskDao;
import com.transferwise.tasks.dao.ITaskDaoDataSerializer;
import com.transferwise.tasks.dao.ITaskDaoDataSerializer.SerializedData;
import com.transferwise.tasks.dao.ITaskSqlMapper;
import com.transferwise.tasks.dao.MySqlTaskTypesMapper;
import com.transferwise.tasks.domain.FullTaskRecord;
import com.transferwise.tasks.domain.Task;
import com.transferwise.tasks.domain.TaskContext;
import com.transferwise.tasks.test.ITestTasksService;
import com.transferwise.tasks.test.dao.ITestTaskDao;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Stream;
import lombok.SneakyThrows;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -43,6 +49,10 @@ public class TaskDataIntTest extends BaseIntTest {
private TasksProperties tasksProperties;
@Autowired
private JdbcTemplate jdbcTemplate;
@Autowired
private ObjectMapper objectMapper;
@Autowired
private ITaskDaoDataSerializer taskDataSerializer;

private CompressionAlgorithm originalAlgorithm;
private int originalMinSize;
Expand Down Expand Up @@ -137,4 +147,59 @@ void oldDataFieldIsNotSetForNewTasks() {
assertThat(oldData).isEqualTo("");
}

@ParameterizedTest
@MethodSource("providedAlgorithms")
@SneakyThrows
void testWritingTaskContextData() {
testTasksService.stopProcessing();

String data = "Hello World!";
var context = new TaskContext().setContextMap(Map.of("adam-jones", "jambi"));
AddTaskRequest addTaskRequest = new AddTaskRequest()
.setType("test_data")
.setData(data.getBytes(StandardCharsets.UTF_8))
.setTaskContext(context);
AddTaskResponse addTaskResponse = testTasksService.addTask(addTaskRequest);
UUID taskId = addTaskResponse.getTaskId();

FullTaskRecord fullTaskRecord = taskDao.getTask(taskId, FullTaskRecord.class);
assertThat(fullTaskRecord.getData()).isEqualTo(data.getBytes(StandardCharsets.UTF_8));

byte[] contextBlob = jdbcTemplate
.queryForObject("select task_context from tw_task_data where task_id=?", byte[].class, taskSqlMapper.uuidToSqlTaskId(taskId));

TaskContext contextData = objectMapper.readValue(contextBlob, TaskContext.class);
assertEquals(Map.of("adam-jones", "jambi"), contextData.getContextMap());

}


@ParameterizedTest
@MethodSource("providedAlgorithms")
void testReadingTaskContextData(String algorithm) {
testTasksService.stopProcessing();

String data = "Hello World!";
var context = new TaskContext().setContextMap(Map.of("danny-carey", "the-grudge"));
AddTaskRequest addTaskRequest = new AddTaskRequest()
.setType("test_data")
.setData(data.getBytes(StandardCharsets.UTF_8))
.setTaskContext(context)
.setCompression(algorithm == null ? null : new CompressionRequest().setAlgorithm(CompressionAlgorithm.valueOf(algorithm)));
AddTaskResponse addTaskResponse = testTasksService.addTask(addTaskRequest);
UUID taskId = addTaskResponse.getTaskId();

FullTaskRecord fullTaskRecord = taskDao.getTask(taskId, FullTaskRecord.class);
assertThat(fullTaskRecord.getData()).isEqualTo(data.getBytes(StandardCharsets.UTF_8));
assertEquals(Map.of("danny-carey", "the-grudge"), fullTaskRecord.getTaskContext().getContextMap());

Task taskRecord = taskDao.getTask(taskId, Task.class);
assertEquals(new TaskContext().setContextMap(Map.of("danny-carey", "the-grudge")), taskRecord.getTaskContext());

}

private static Stream<String> providedAlgorithms() {
return Stream.of("GZIP", "LZ4", null, "NONE");
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.transferwise.tasks.testapp;

import static org.junit.jupiter.api.Assertions.assertEquals;

import com.transferwise.common.baseutils.UuidUtils;
import com.transferwise.tasks.BaseIntTest;
import com.transferwise.tasks.ITasksService.AddTaskRequest;
import com.transferwise.tasks.TasksService;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;

public class TaskInterceptionIntTest extends BaseIntTest {

@Autowired
private TasksService tasksService;

@Test
void jambiTaskIsInterceptedCorrectly() {
testTasksService.resetAndDeleteTasksWithTypes("test");
tasksService.addTask(new AddTaskRequest().setTaskId(UuidUtils.generatePrefixCombUuid()).setType("test").setSubType("Jambi"));
tasksService.addTask(new AddTaskRequest().setTaskId(UuidUtils.generatePrefixCombUuid()).setType("test").setSubType("Jambi"));
Awaitility.await().until(() -> testTasksService.getFinishedTasks("test", "Jambi").size() == 2);
assertEquals(2, meterRegistry.counter("tool", "song", "eulogy").count());

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@
import com.transferwise.tasks.helpers.kafka.messagetotask.IKafkaMessageHandler;
import com.transferwise.tasks.impl.jobs.interfaces.IJob;
import com.transferwise.tasks.processing.ITaskProcessingInterceptor;
import com.transferwise.tasks.processing.ITaskRegistrationDecorator;
import com.transferwise.tasks.testapp.testbeans.JambiTaskInterceptor;
import com.transferwise.tasks.testapp.testbeans.JambiTaskRegistrationDecorator;
import com.transferwise.tasks.utils.LogUtils;
import io.micrometer.core.instrument.MeterRegistry;
import java.time.ZonedDateTime;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -136,4 +140,14 @@ public IKafkaListenerConsumerPropertiesProvider twTasksKafkaListenerSpringKafkaC
return props;
};
}

@Bean
ITaskRegistrationDecorator jambiRegistrationInterceptor() {
return new JambiTaskRegistrationDecorator();
}

@Bean
ITaskProcessingInterceptor jambiProcessingInterceptor(MeterRegistry meterRegistry) {
return new JambiTaskInterceptor(meterRegistry);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.transferwise.tasks.testapp.testbeans;

import com.transferwise.tasks.domain.Task;
import com.transferwise.tasks.processing.ITaskProcessingInterceptor;
import io.micrometer.core.instrument.MeterRegistry;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class JambiTaskInterceptor implements ITaskProcessingInterceptor {

private final MeterRegistry meterRegistry;

@Override
public void doProcess(Task task, Runnable processor) {
if ("Jambi".equals(task.getSubType())) {
if (task.getTaskContext() != null) {
var name = task.getTaskContext().get("adam-jones", String.class);
meterRegistry.counter("tool", "song", name).increment();
}
}
processor.run();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.transferwise.tasks.testapp.testbeans;

import com.transferwise.tasks.ITasksService.AddTaskRequest;
import com.transferwise.tasks.domain.TaskContext;
import com.transferwise.tasks.processing.ITaskRegistrationDecorator;
import java.util.Map;
import org.springframework.stereotype.Component;

@Component
public class JambiTaskRegistrationDecorator implements ITaskRegistrationDecorator {

@Override
public AddTaskRequest decorate(AddTaskRequest request) {
if ("Jambi".equals(request.getSubType())) {
return request.setTaskContext(new TaskContext().setContextMap(Map.of("adam-jones", "eulogy")));
}
return request;
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.transferwise.tasks;

import com.transferwise.tasks.ITasksService.AddTaskResponse.Result;
import com.transferwise.tasks.domain.TaskContext;
import com.transferwise.tasks.domain.TaskStatus;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.time.Duration;
Expand Down Expand Up @@ -45,6 +46,7 @@ class AddTaskRequest {
private boolean warnWhenTaskExists;
private Duration expectedQueueTime;
private CompressionRequest compression;
private TaskContext taskContext;

@Data
@Accessors(chain = true)
Expand Down Expand Up @@ -95,6 +97,7 @@ class ResumeTaskRequest {
@Data
@Accessors(chain = true)
class RescheduleTaskRequest {

private UUID taskId;
private long version;
private ZonedDateTime runAfterTime;
Expand All @@ -103,6 +106,7 @@ class RescheduleTaskRequest {
@Data
@Accessors(chain = true)
class RescheduleTaskResponse {

private UUID taskId;
private Result result;

Expand All @@ -116,12 +120,14 @@ public enum Result {
@Data
@Accessors(chain = true)
class GetTaskRequest {

private UUID taskId;
}

@Data
@Accessors(chain = true)
class GetTaskResponse {

private UUID taskId;
private String type;
private long version;
Expand Down
Loading
Loading