Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka. timeout #1250

Open
akuz0 opened this issue Nov 2, 2024 · 4 comments
Open

Kafka. timeout #1250

akuz0 opened this issue Nov 2, 2024 · 4 comments

Comments

@akuz0
Copy link
Contributor

akuz0 commented Nov 2, 2024

Citrus Version
4.3.3

Question
test case

  1. SUT sends an event in the topic
  2. Validate the event in the topic

Why can't I subtract events? And why poll it is called once?
Maybe I'm doing something wrong.

What I've tried so far
If you run a group of tests, all the tests after the first one fall
image
I think, problem
Group coordinator MY_KAFKA_IP (id: 2147483646 rack: null) is unavailable or invalid due to cause: session timed out without receiving a heartbeat response. isDisconnected: false. Rediscovery will be attempted.

log
2024-11-02 20:49:31 DEBUG AbstractFetch:194 - [Consumer clientId=citrus_kafka_consumer_008e7fa5-49f5-422d-9ef2-011d009f913a, groupId=citrus_kafka_group] Fetch read_uncommitted at offset 114 for partition ePC-0 returned fetch data PartitionData(partitionIndex=0, errorCode=0, highWatermark=116, lastStableOffset=116, logStartOffset=0, divergingEpoch=EpochEndOffset(epoch=-1, endOffset=-1), currentLeader=LeaderIdAndEpoch(leaderId=-1, leaderEpoch=-1), snapshotId=SnapshotId(endOffset=-1, epoch=-1), abortedTransactions=null, preferredReadReplica=-1, records=MemoryRecords(size=661, buffer=java.nio.HeapByteBuffer[pos=0 lim=661 cap=664])) 2024-11-02 20:49:31 DEBUG AbstractFetch:278 - [Consumer clientId=citrus_kafka_consumer_008e7fa5-49f5-422d-9ef2-011d009f913a, groupId=citrus_kafka_group] Removing pending request for fetch session: 613051 for node: IP:9092 (id: 1 rack: null) 2024-11-02 20:49:31 DEBUG NetworkClient:954 - [Consumer clientId=citrus_kafka_consumer_008e7fa5-49f5-422d-9ef2-011d009f913a, groupId=citrus_kafka_group] Received OFFSET_COMMIT response from node 2147483646 for request with header RequestHeader(apiKey=OFFSET_COMMIT, apiVersion=8, clientId=citrus_kafka_consumer_008e7fa5-49f5-422d-9ef2-011d009f913a, correlationId=16, headerVersion=2): OffsetCommitResponseData(throttleTimeMs=0, topics=[OffsetCommitResponseTopic(name='ePC', partitions=[OffsetCommitResponsePartition(partitionIndex=0, errorCode=0)])]) 2024-11-02 20:51:28 DEBUG ConsumerCoordinator:1326 - [Consumer clientId=citrus_kafka_consumer_008e7fa5-49f5-422d-9ef2-011d009f913a, groupId=citrus_kafka_group] Committed offset 114 for partition -0 2024-11-02 20:51:28 INFO ConsumerCoordinator:999 - [Consumer clientId=citrus_kafka_consumer_008e7fa5-49f5-422d-9ef2-011d009f913a, groupId=citrus_kafka_group] Group coordinator MY_KAFKA_IP (id: 2147483646 rack: null) is unavailable or invalid due to cause: session timed out without receiving a heartbeat response. isDisconnected: false. Rediscovery will be attempted. 2024-11-02 20:51:28 INFO ConsumerCoordinator:1012 - [Consumer clientId=citrus_kafka_consumer_008e7fa5-49f5-422d-9ef2-011d009f913a, groupId=citrus_kafka_group] Requesting disconnect from last known coordinator IP:9092 (id: 2147483646 rack: null) 2024-11-02 20:51:28 INFO NetworkClient:343 - [Consumer clientId=citrus_kafka_consumer_008e7fa5-49f5-422d-9ef2-011d009f913a, groupId=citrus_kafka_group] Client requested disconnect from node 2147483646

Additional information
If in debug mode, when citrus send throw, i use poll and see my event. Why poll it is called once?

for example while

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        executorService.submit(() -> {
            processRecord(record); // Process the record asynchronously
        });
    }
}

https://kafka.apache.org/26/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

image

I can use while in code, but i have timeout exception instante

while (true) {
 testActionRunner.run(
                        receive()
                                .selector("citrus_kafka_messageKey ="+Id)
                                .endpoint(topic)
);
}
@akuz0
Copy link
Contributor Author

akuz0 commented Nov 5, 2024

I`m solved it, but i think it not correctly
image

   HashMap<String, Object> properties = new HashMap<>();
        properties.put("max.poll.records", 2);

@bbortt
Copy link
Collaborator

bbortt commented Nov 5, 2024

is it possible that both your tests use the same Id? I have a gut feeling that this is related to the fact that a citrus endpoint receive message can only return exactly one Message. if multiple match the same Id that would explain the problem. your selector must be unique!

@akuz0
Copy link
Contributor Author

akuz0 commented Nov 7, 2024

@bbortt maybe, how set selector by message body for Kafka? jsonPath works?

.selector(Collections.singletonMap("jsonPath:$.data.commandId", id))

@bbortt
Copy link
Collaborator

bbortt commented Nov 7, 2024

@akuz0 no, I've focused on header-based filtering for now. I can take this as a feature request, if you need it? I think JSON-path-based value (message body) matching sounds reasonable.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants