Skip to content

Commit

Permalink
Fixes MDCs handling and reduces context switches. (#211)
Browse files Browse the repository at this point in the history
Fixes MDCs handling and reduces context switches.
  • Loading branch information
onukristo authored Oct 31, 2024
1 parent b576433 commit 3d9d10d
Show file tree
Hide file tree
Showing 22 changed files with 317 additions and 211 deletions.
43 changes: 25 additions & 18 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,19 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres
to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

#### 1.44.0 - 2024/10/16
## 1.45.0 - 2024/10/30

### Changed

- Context switches are reduced.
- MDC values are cleared more aggressively.

## 1.44.0 - 2024/10/16

- When registering cron tasks, log error if job already exists, but task is in error state.
- If silent mode is turned on, then this log will not appear.
- If silent mode is turned on, then this log will not appear.

#### 1.43.0 - 2024/08/09
## 1.43.0 - 2024/08/09

- Added support for task context

Expand All @@ -34,7 +41,7 @@ ALTER TABLE tw_task_data WAIT 2
LOCK = NONE;
```

#### 1.42.0 - 2024/07/16
## 1.42.0 - 2024/07/16

### Added

Expand All @@ -44,48 +51,48 @@ ALTER TABLE tw_task_data WAIT 2

- Support for spring boot 3.1 and 2.7 versions.

#### 1.41.6 - 2024/04/17
## 1.41.6 - 2024/04/17

### Added

- `/getTaskTypes` endpoint may be disabled through configuration property `tw-tasks.core.tasks-management.enable-get-task-types: false`. Services with
extreme amount of tasks might benefit from this.

#### 1.41.5 - 2024/04/05
## 1.41.5 - 2024/04/05

### Changed

* Use static methods to create BeanPostProcessors.

#### 1.41.4 - 2024/04/02
## 1.41.4 - 2024/04/02

### Changed

- `/getTaskTypes` endpoint accepts optional query parameter `status` to filter only types of tasks in the particular status(es).
- Fixed a bug with `taskType` and `taskSubType` filters on query endpoints when multiple values are supplied, where it would consider only one value.

#### 1.41.3 - 2024/02/29
## 1.41.3 - 2024/02/29

### Changed

* Add compatibility with Spring Boot 3.2.
* Update dependencies

#### 1.41.2 - 2024/02/16
## 1.41.2 - 2024/02/16

### Changed

* Kafka producer instantiation will be attempted up to 5 times with a 500ms delay between each attempt. In some cases, it has been observed that the
CI fails to start the Kafka producer because the kafka docker container itself seems to not be fully up & accessible yet.

#### 1.41.1 - 2023/12/19
## 1.41.1 - 2023/12/19

### Changed

- When building a Spring `ResponseEntity` with an explicit status, provide an integer derived from the `HttpStatus` enum, rather than providing the
`HttpStatus` directly, to handle binary incompatibility between Spring 5 and 6 causing NoSuchMethod errors when tw-tasks is used with Spring 6

#### 1.41.0 - 2023/11/16
## 1.41.0 - 2023/11/16

### Added

Expand All @@ -98,19 +105,19 @@ ALTER TABLE tw_task_data WAIT 2

* NullPointerException in TaskManagementService.getTaskData in case task is not found

#### 1.40.5 - 2023/10/30
## 1.40.5 - 2023/10/30

### Added

- Setting METADATA_MAX_AGE_CONFIG to two minutes for producer

#### 1.40.4 - 2023/10/06
## 1.40.4 - 2023/10/06

### Fixed

* Monitoring queries for Postgres finding approximate table sizes in the databases were using a wrong schema and thus no records were found.

#### 1.40.3 - 2023/08/01
## 1.40.3 - 2023/08/01

### Added

Expand All @@ -122,26 +129,26 @@ ALTER TABLE tw_task_data WAIT 2
* Build against Spring Boot 2.7.11 --> 2.7.14
* Build against Spring Boot 2.6.14 --> 2.6.15

#### 1.40.2 - 2023/07/14
## 1.40.2 - 2023/07/14

### Added

* introduced a new configuration parameter `tw-tasks.core.no-op-task-types` that allows a default no operation task handler to pick up deprecated task
types in your service.

#### 1.40.1 - 2023/07/12
## 1.40.1 - 2023/07/12

### Fixed

* `commitSync` operation sometimes reporting a WakeupException.

#### 1.40.0 - 2023/06/12
## 1.40.0 - 2023/06/12

### Added

* CronJob annotation for Spring bean's methods

#### 1.39.2 - 2023/06/06
## 1.39.2 - 2023/06/06

### Fixed

Expand Down
7 changes: 7 additions & 0 deletions TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,10 @@ topics configurations. Could refactor the properties to more hierarhical structu
20. Add metric for how long it takes a task from adding to processing. Or scheduling time to processing.

23. Start using Avro or other binary messages for triggering queue. This Json crap is expensive?

25. Check automatically if the concurrency policy returned is the same instance.
Unfortunately, it is quite common for services to create a new instance everytime we ask it, for let's say SimpleConcurrencyPolicy, and with
doing that, losing any concurrency control.
We could allow to still return a separate instance, if for example a special property is set.

26. Add DSL from Wkp example.
6 changes: 2 additions & 4 deletions build.common.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ group = "com.transferwise.tasks"
apply plugin: "java-library"
apply plugin: "checkstyle"
apply plugin: "idea"
apply plugin: "com.github.spotbugs"

apply from: "../build.libraries.gradle"

Expand All @@ -16,7 +15,7 @@ repositories {
}

configurations {
all {
configureEach {
exclude(group: 'junit', module: 'junit')
exclude(group: 'org.junit.vintage', module: 'junit-vintage-engine')

Expand Down Expand Up @@ -134,7 +133,6 @@ test {
}

tasks.findAll { it.name.startsWith("spotbugs") }*.configure {
effort = "max"
excludeFilter = file('../spotbugs-exclude.xml')
reports {
xml.required = true
Expand All @@ -143,7 +141,7 @@ tasks.findAll { it.name.startsWith("spotbugs") }*.configure {
}


tasks.withType(Checkstyle) {
tasks.withType(Checkstyle).configureEach {
config = resources.text.fromFile(file('../google_checks.xml'))

maxWarnings = 0
Expand Down
18 changes: 13 additions & 5 deletions build.gradle
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
import org.eclipse.jgit.api.errors.RefAlreadyExistsException
import com.github.spotbugs.snom.Confidence
import com.github.spotbugs.snom.Effort

buildscript {
if (!project.hasProperty("springBootVersion")) {
ext.springBootVersion = System.getenv("SPRING_BOOT_VERSION") ?: "3.2.2"
}
dependencies {
classpath "com.avast.gradle:gradle-docker-compose-plugin:0.16.4"
classpath "com.avast.gradle:gradle-docker-compose-plugin:0.17.10"
}
}

plugins {
id "com.github.spotbugs" version "5.0.14" apply false
id "com.github.spotbugs" version "6.0.+"
id 'org.springframework.boot' version "$springBootVersion" apply false
id "idea"
id 'org.ajoberstar.grgit' version '5.2.0'
id 'io.github.gradle-nexus.publish-plugin' version "1.1.0"
id 'org.ajoberstar.grgit' version '5.3.0'
id 'io.github.gradle-nexus.publish-plugin' version "2.0.0"

}

Expand All @@ -36,7 +38,7 @@ idea.module {
excludeDirs += file('logs2')
}

task tagRelease {
tasks.register('tagRelease') {
doLast {
try {
grgit.tag.add {
Expand Down Expand Up @@ -65,3 +67,9 @@ nexusPublishing {
tasks.findByName("initializeSonatypeStagingRepository").setOnlyIf {
System.getenv("OSS_SIGNING_KEY")
}

spotbugs {
effort = Effort.valueOf('MAX')
reportLevel = Confidence.valueOf('DEFAULT')
}

10 changes: 5 additions & 5 deletions build.libraries.gradle
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
ext {
libraries = [
// version defined
awaitility : 'org.awaitility:awaitility:4.2.0',
awaitility : 'org.awaitility:awaitility:4.2.2',
twGafferJtaStarter : 'com.transferwise.common:tw-gaffer-jta-starter:2.2.0',
twGafferJtaJakartaStarter : 'com.transferwise.common:tw-gaffer-jta-jakarta-starter:3.0.0',
apacheCuratorRecipies : "org.apache.curator:curator-recipes:5.5.0",
apacheCuratorRecipies : "org.apache.curator:curator-recipes:5.7.1",
apacheCommonsCollections : "org.apache.commons:commons-collections4:4.4",
commonsIo : "commons-io:commons-io:2.15.1",
guava : "com.google.guava:guava:33.0.0-jre",
guava : "com.google.guava:guava:33.3.1-jre",
jakartaValidationApi : 'jakarta.validation:jakarta.validation-api:3.0.2',
javaxValidationApi : "javax.validation:validation-api:2.0.1.Final",
semver4j : "com.vdurmont:semver4j:3.1.0",
Expand All @@ -20,10 +20,10 @@ ext {
twIncidents : 'com.transferwise.common:tw-incidents:1.2.2',
twLeaderSelector : "com.transferwise.common:tw-leader-selector:1.10.2",
twLeaderSelectorStarter : "com.transferwise.common:tw-leader-selector-starter:1.10.2",
twBaseUtils : "com.transferwise.common:tw-base-utils:1.12.4",
twBaseUtils : "com.transferwise.common:tw-base-utils:1.13.0",
twEntryPointsStarter : 'com.transferwise.common:tw-entrypoints-starter:2.16.0',
twSpyqlStarter : 'com.transferwise.common:tw-spyql-starter:1.6.2',
lubenZstd : 'com.github.luben:zstd-jni:1.5.5-2',
lubenZstd : 'com.github.luben:zstd-jni:1.5.6-7',
lz4Java : 'org.lz4:lz4-java:1.8.0',

// versions managed by spring-boot-dependencies platform
Expand Down
3 changes: 1 addition & 2 deletions demoapp/docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ services:
# --character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci --transaction-isolation=READ-COMMITTED"

mariadb:
image: mariadb:10.6
image: mariadb:10.11
hostname: mysql
ports:
- "13307:3306"
Expand All @@ -78,7 +78,6 @@ services:
--innodb_sync_array_size=16
--innodb_log_file_size=10g
--query_cache_type=0
--innodb_adaptive_hash_index=0
--collation-server=utf8mb4_unicode_ci --transaction-isolation=READ-COMMITTED
--innodb_autoinc_lock_mode=2
"
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
version=1.44.0
version=1.45.0
org.gradle.internal.http.socketTimeout=120000
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-8.1.1-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-8.10.2-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.transferwise.tasks.core.autoconfigure;

import com.transferwise.tasks.helpers.CoreMetricsTemplate;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.env.EnvironmentPostProcessor;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.MapPropertySource;
import org.springframework.core.env.PropertySource;

@Slf4j
public class TwTasksEnvironmentPostProcessor implements EnvironmentPostProcessor {

private static final String PROPERTY_SOURCE_KEY = TwTasksEnvironmentPostProcessor.class.getName();
static final String TW_OBS_BASE_EXTREMUM_CONFIG_PATH = "transferwise.observability.base.metrics.local-extremum-gauge-names.tw-tasks";

@Override
public void postProcessEnvironment(ConfigurableEnvironment environment, SpringApplication application) {
PropertySource<?> propertySource = environment.getPropertySources().get(PROPERTY_SOURCE_KEY);
if (propertySource == null) {
final HashMap<String, Object> map = new HashMap<>();

// Calculate last minute min/max using tw-observability-base local extremums.
Set<String> gaugeNames = new HashSet<>();
gaugeNames.add(CoreMetricsTemplate.GAUGE_METRIC_PROCESSING_ONGOING_TASKS_GRABBINGS_COUNT);
gaugeNames.add(CoreMetricsTemplate.GAUGE_PROCESSING_RUNNING_TASKS_COUNT);
gaugeNames.add(CoreMetricsTemplate.GAUGE_PROCESSING_IN_PROGRESS_TASKS_GRABBING_COUNT);
gaugeNames.add(CoreMetricsTemplate.GAUGE_TASKS_ONGOING_PROCESSINGS_COUNT);

map.put(TW_OBS_BASE_EXTREMUM_CONFIG_PATH, gaugeNames);

MapPropertySource mapPropertySource = new MapPropertySource(PROPERTY_SOURCE_KEY, map);
environment.getPropertySources().addLast(mapPropertySource);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.transferwise.tasks.core.autoconfigure.TwTasksCoreAutoConfiguration
org.springframework.context.ApplicationListener=com.transferwise.tasks.TwTasksApplicationListener
org.springframework.boot.env.EnvironmentPostProcessor=com.transferwise.tasks.core.autoconfigure.TwTasksEnvironmentPostProcessor
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Semaphore;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;

Expand Down Expand Up @@ -58,7 +59,9 @@ protected void registerUniqueBucketIds() {
for (String bucketId : bucketIds) {
globalProcessingState.getBuckets().put(bucketId,
new GlobalProcessingState.Bucket(priorityManager.getHighestPriority(), priorityManager.getLowestPriority())
.setBucketId(bucketId));
.setBucketId(bucketId)
.setTasksGrabbingSemaphore(new Semaphore(tasksProperties.getTaskGrabbingMaxConcurrency()))
);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ public JdbcTaskDao(DataSource dataSource, ITaskSqlMapper sqlMapper) {
protected String insertTaskSql;
protected String insertUniqueTaskKeySql;
protected String insertTaskDataSql;
protected String insertTaskContext;
protected String setToBeRetriedSql;
protected String setToBeRetriedSql1;
protected String grabForProcessingWithStatusAssertionSql;
Expand Down Expand Up @@ -286,7 +285,7 @@ public InsertTaskResponse insertTask(InsertTaskRequest request) {
SerializedData serializedContext = taskDataSerializer.serialize(contextBlob, request.getCompression());
jdbcTemplate.update(
insertTaskDataSql,
args(taskId, Integer.valueOf(serializedData.getDataFormat()), serializedData.getData(), serializedContext.getDataFormat(),
args(taskId, serializedData.getDataFormat(), serializedData.getData(), serializedContext.getDataFormat(),
serializedContext.getData())
);
if (request.getData() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,20 @@ public class EntryPointsService implements IEntryPointsService {
@Autowired
private UnitOfWorkManager unitOfWorkManager;

@Autowired
private MdcService mdcService;

@Override
public <T> T continueOrCreate(String group, String name, Supplier<T> supplier) {
if (TwContext.current().isEntryPoint()) {
return supplier.get();
return mdcService.with(() -> supplier.get());
}

return unitOfWorkManager.createEntryPoint(group, name).toContext().execute(supplier);
return createEntrypoint(group, name, supplier);
}

@Override
public <T> T createEntrypoint(String group, String name, Supplier<T> supplier) {
return unitOfWorkManager.createEntryPoint(group, name).toContext().execute(mdcService.with(() -> supplier));
}
}
Loading

0 comments on commit 3d9d10d

Please sign in to comment.