Skip to content

Commit

Permalink
Merge pull request #38 from vahidhashemian/add_internal_subscribe_to_…
Browse files Browse the repository at this point in the history
…memq_consumer_assign

Add an internal subscribe call to MemQ consumer's assign()
  • Loading branch information
vahidhashemian authored Jun 20, 2024
2 parents 7c90b23 + e1d1936 commit 6da931c
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,21 @@ void unsubscribe() throws Exception {
pscMemqConsumer.close();
}

@Test
void testAssignDoesNotRequireSubscribe() throws Exception {
PscMemqConsumer<byte[], byte[]> pscMemqConsumer = getPscMemqConsumer(
"testAssignDoesNotRequireSubscribe");

pscMemqConsumer.assign(Collections.emptySet());

MemqTopicUri uri1 = MemqTopicUri.validate(TopicUri.validate(testMemqTopic1));
TopicUriPartition topicUriPartition = TestUtils.getFinalizedTopicUriPartition(uri1, 0);

pscMemqConsumer.assign(Sets.newHashSet(topicUriPartition));

pscMemqConsumer.close();
}

@Test
void close() throws Exception {
PscMemqConsumer<byte[], byte[]> pscMemqConsumer = getPscMemqConsumer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,8 @@ public void assign(Set<TopicUriPartition> topicUriPartitions) throws ConsumerExc
currentAssignment.retainAll(topicUriPartitions);
currentAssignment.addAll(topicUriPartitions);
try {
memqConsumer.subscribe(topicUriPartitions.stream().map(TopicUriPartition::getTopicUri)
.map(TopicUri::getTopic).collect(Collectors.toSet()));
memqConsumer.assign(topicUriPartitions.stream().map(TopicUriPartition::getPartition)
.collect(Collectors.toList()));
} catch (Exception exception) {
Expand Down

0 comments on commit 6da931c

Please sign in to comment.