Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support gRPC client stream/unary oneway #1384

Open
wants to merge 4 commits into
base: develop
Choose a base branch
from

Conversation

ankitk-me
Copy link
Contributor

Fixes #642

@ankitk-me ankitk-me marked this pull request as draft January 28, 2025 07:37
@ankitk-me ankitk-me requested a review from jfallows January 28, 2025 07:38
@ankitk-me ankitk-me marked this pull request as ready for review January 29, 2025 11:32
Copy link
Contributor

@jfallows jfallows left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where are we implicitly sending the empty message in the grpc reply to the client?
Naturally that is not coming from Kafka because we are doing produce only oneway.

Comment on lines +1907 to +1916
protected void onKafkaData(
long traceId,
long authorization,
long budgetId,
int reserved,
int flags,
OctetsFW payload,
KafkaDataExFW kafkaDataEx)
{
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please follow the same design as HttpKafkaProduceNoReply, where the onKafkaData method in super type is non-abstract and empty implementation. Then subclasses are not forced to override the method where it is not needed.

Note: AFAIK, KafkaProduceProxy does not actually call onKafkaData on HttpProxy delegate.

@@ -1707,7 +2116,8 @@ private void doKafkaData(

private void doKafkaEnd(
long traceId,
long authorization)
long authorization,
boolean tombstone)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's avoid changing the signature of doKafkaEnd to handle two different use cases that are not needed by the same implementation instance.

Instead, our overall design approach here has been to use strategy objects dedicated to each use case, adjusting the behavior as needed in each implementation class.

Suggest introducing a KafkaProduceNoReplyProxy that is a subclass of KafkaProduceProxy where doKafkaEnd is protected and overridden in the subclass to get the desired behavior.

Then the type of the delegate in the Grpc stream remains KafkaProduceProxy but instantiates KafkaProduceProxy for GrpcProduceProxy and KafkaProduceNoReplyProxy for GrpcProduceNoReplyProxy. In each case calling delegate.doKafkaEnd(traceId, authorization) causes the desired behavior due to the method override.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Support gRPC client streaming to Kafka directly
2 participants