-
Notifications
You must be signed in to change notification settings - Fork 55
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support gRPC client stream/unary oneway #1384
base: develop
Are you sure you want to change the base?
Conversation
...fka.spec/src/test/java/io/aklivity/zilla/specs/binding/grpc/kafka/streams/GrpcProduceIT.java
Outdated
Show resolved
Hide resolved
...fka.spec/src/test/java/io/aklivity/zilla/specs/binding/grpc/kafka/streams/GrpcProduceIT.java
Outdated
Show resolved
Hide resolved
...ka.spec/src/test/java/io/aklivity/zilla/specs/binding/grpc/kafka/streams/KafkaProduceIT.java
Outdated
Show resolved
Hide resolved
...ka.spec/src/test/java/io/aklivity/zilla/specs/binding/grpc/kafka/streams/KafkaProduceIT.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where are we implicitly sending the empty message in the grpc reply to the client?
Naturally that is not coming from Kafka because we are doing produce only oneway.
...ity/zilla/specs/binding/grpc/kafka/streams/kafka/produce/client.stream.rpc.oneway/client.rpt
Outdated
Show resolved
Hide resolved
...io/aklivity/zilla/runtime/binding/grpc/kafka/internal/config/GrpcKafkaWithProduceResult.java
Outdated
Show resolved
Hide resolved
...java/io/aklivity/zilla/runtime/binding/grpc/kafka/internal/stream/GrpcKafkaProxyFactory.java
Outdated
Show resolved
Hide resolved
...java/io/aklivity/zilla/runtime/binding/grpc/kafka/internal/stream/GrpcKafkaProxyFactory.java
Show resolved
Hide resolved
...ity/zilla/specs/binding/grpc/kafka/streams/kafka/produce/client.stream.rpc.oneway/server.rpt
Outdated
Show resolved
Hide resolved
protected void onKafkaData( | ||
long traceId, | ||
long authorization, | ||
long budgetId, | ||
int reserved, | ||
int flags, | ||
OctetsFW payload, | ||
KafkaDataExFW kafkaDataEx) | ||
{ | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please follow the same design as HttpKafkaProduceNoReply
, where the onKafkaData
method in super type is non-abstract and empty implementation. Then subclasses are not forced to override the method where it is not needed.
Note: AFAIK, KafkaProduceProxy
does not actually call onKafkaData
on HttpProxy
delegate.
@@ -1707,7 +2116,8 @@ private void doKafkaData( | |||
|
|||
private void doKafkaEnd( | |||
long traceId, | |||
long authorization) | |||
long authorization, | |||
boolean tombstone) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's avoid changing the signature of doKafkaEnd
to handle two different use cases that are not needed by the same implementation instance.
Instead, our overall design approach here has been to use strategy objects dedicated to each use case, adjusting the behavior as needed in each implementation class.
Suggest introducing a KafkaProduceNoReplyProxy
that is a subclass of KafkaProduceProxy
where doKafkaEnd
is protected and overridden in the subclass to get the desired behavior.
Then the type of the delegate in the Grpc stream remains KafkaProduceProxy
but instantiates KafkaProduceProxy
for GrpcProduceProxy
and KafkaProduceNoReplyProxy
for GrpcProduceNoReplyProxy
. In each case calling delegate.doKafkaEnd(traceId, authorization)
causes the desired behavior due to the method override.
Fixes #642