Skip to content

Commit

Permalink
tw-tasks-kafka-listener-spring-boot-starter - Allow for the consumer …
Browse files Browse the repository at this point in the history
…assignment strategy to be overriden by kafka config (#220)

* Allow for the consumer assignment strategy to be overriden by kafka config

* update the changelog

* address comments

* bump containers versions in gha workflow
  • Loading branch information
maru-tw authored Jan 16, 2025
1 parent 269472f commit 5f9e939
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 8 deletions.
12 changes: 6 additions & 6 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@ jobs:
- name: "Install packages"
run: apt-get update && apt-get install -y git
- name: Checkout repository
uses: actions/checkout@v3
uses: actions/checkout@v4
- name: "Gradle cache"
uses: actions/cache@v3
uses: actions/cache@v4
with:
path: |
~/.gradle
Expand All @@ -92,7 +92,7 @@ jobs:
- name: "Test if publishing works"
run: GRADLE_USER_HOME=$HOME/.gradle ./gradlew publishToMavenLocal --console=plain --info --stacktrace
- name: "Publish Test Report"
uses: mikepenz/action-junit-report@v3
uses: mikepenz/action-junit-report@v4
if: always()
with:
check_name: Test Report-(${{ matrix.spring_boot_version }})
Expand All @@ -116,7 +116,7 @@ jobs:
tar -zcvf all-test-reports-${{ matrix.spring_boot_version }}.tar.gz **/build/reports
if: always()
- name: "Store test results"
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
if: always()
with:
name: all-test-reports-${{ matrix.spring_boot_version }}
Expand All @@ -141,9 +141,9 @@ jobs:
run: |
apt-get update && apt-get install -y git unzip
- name: Checkout repository
uses: actions/checkout@v3
uses: actions/checkout@v4
- name: "Gradle cache"
uses: actions/cache@v3
uses: actions/cache@v4
with:
path: |
~/.gradle
Expand Down
19 changes: 19 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,25 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres
to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## 1.49.0 - 2025/01/08

### Changed

- Allow for partition.assignment.strategy for consumer to be overridden in tw-tasks-kafka-listener-spring-boot-starter
- Nothing to do for most cases.

#### Consider the following only if you use tw-tasks-kafka-listener-spring-boot-starter

It is worth keeping an eye on:
- changes to assignors used, log `Successfully synced group in generation Generation`
- on assignment strategy failures on consumers in prod and [consumer state](https://dashboards.tw.ee/d/f7094f30-a509-4592-aced-37584a70132a/kafka-consumer-groups-and-lag-details-kminion?orgId=1&refresh=30s&viewPanel=14).

If you use `com.wise.kafka.assignors.CanaryAwareRangeAssignor`, consider setting this config:
```
spring.kafka.consumer.properties.partition.assignment.strategy:
com.wise.kafka.assignors.CanaryAwareRangeAssignor, org.apache.kafka.clients.consumer.RangeAssignor, org.apache.kafka.clients.consumer.CooperativeStickyAssignor
```

## 1.48.2 - 2024/12/20

### Changed
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
version=1.48.2
version=1.49.0
org.gradle.internal.http.socketTimeout=120000
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.transferwise.tasks.helpers.kafka.IKafkaListenerConsumerPropertiesProvider;
import com.vdurmont.semver4j.Semver;
import java.util.HashMap;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
Expand All @@ -19,7 +20,7 @@ public class SpringKafkaConsumerPropertiesProvider implements IKafkaListenerCons

@Override
public Map<String, Object> getProperties(int shard) {
var props = kafkaProperties.buildConsumerProperties(null);
var props = new HashMap<String, Object>();

try {
var kafkaClientsVersion = new Semver(AppInfoParser.getVersion());
Expand All @@ -31,6 +32,8 @@ public Map<String, Object> getProperties(int shard) {
log.error("Could not understand Kafka client version.", e);
}

props.putAll(kafkaProperties.buildConsumerProperties(null));

return props;
}
}

0 comments on commit 5f9e939

Please sign in to comment.