-
Notifications
You must be signed in to change notification settings - Fork 219
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Confluent kafka binding #988
Conversation
dd224b3
to
5937f4b
Compare
2a2bdfd
to
d196693
Compare
@embano1 Thanks for your review! |
620866c
to
021e19a
Compare
Thy @yanmxa and sorry for my slow responses. I'm under the water at the moment at work and can't give you a concrete date when I'll be able to review. Perhaps @duglin and @lionelvillard can take another look? |
@yanmxa can you update the documentation: https://github.com/cloudevents/sdk-go/blob/main/docs/protocol_implementations.md? Also I wonder if it makes sense to indicate (maybe in the documentation) this is a new implementation and it hasn't been widely tested in the wild. WDYT? |
Thanks @lionelvillard! I update the protocol binding of the related document, #1008. PTAL~ |
a6ee37f
to
345f8fc
Compare
345f8fc
to
3cbec58
Compare
Thanks @duglin ! |
e133055
to
8a34203
Compare
8a34203
to
f2a38c7
Compare
@duglin @embano1 @lionelvillard PTAL~ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I haven't checked the whole PR because for some parts especially structured/binary handling, I'd like to see more tests for non-happy path scenarios and mis-use if possible. Could you please also add Confluent to the Github integration tests so we run this against Confluent images?
contentType = string(header.Value) | ||
} | ||
if k == specs.PrefixedSpecVersionName() { | ||
contentVersion = string(header.Value) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
question: what if header.Value
is 1.0.2
but MUST be 1.0
according to
Compliant event producers MUST use a value of 1.0 when referring to this version of the specification
https://github.com/cloudevents/spec/blob/v1.0.2/cloudevents/spec.md#specversion
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I‘m not sure how to handle the specversion with value 1.0.2. Maybe just validate the 'major' and 'minor' versions whether compliant with "1.0"? and if not, then print a warning message?
I checked the other protocol. Then all just assert whether the specversion key exists and ignore the value.
Do you have any other advice?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, IMHO this is a shortcoming in the other implementations. I'm fine leaving this here as is then given it's consistent with our other implementations. cc/ @duglin
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like being forgiving and just ignoring the patch version number - if we want to do anything different at all. I'm ok with the current choice of only looking for 0.3 and 1.0.
f2a38c7
to
91c54be
Compare
Thank @embano1! I added some tests for the misuse cases when trying to initialize the client. If others come up with scenarios beyond these, we can continually improve it in the future. Also, I updated the GitHub integration test workflow to add the kafka_confluent service(running on port 9192, the sarama running on 9092) with the confluent image. So the current confluent kafka protocol uses it for the tests. |
06adc56
to
af8817f
Compare
Please add a comment to the |
Done! |
Please add a doc string to the |
Also, why did you not use |
e9fafcb
to
6836a1c
Compare
@embano1 Thanks for your prompt feedback!
Could you please help me to check if the wording in the above document is appropriate? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just that one comment please
Can you please check your integration tests? They're not looking positive, but passing: https://github.com/cloudevents/sdk-go/actions/runs/8420863202/job/23056453953?pr=988#step:5:284 (example) |
This unit test is also not looking ok: https://github.com/cloudevents/sdk-go/actions/runs/8420863203/job/23056453818#step:4:3883 |
0ac67a3
to
20a4a89
Compare
The error log disappears in the unit test now. |
20a4a89
to
c70613e
Compare
Done! |
c70613e
to
194cce9
Compare
Signed-off-by: myan <[email protected]> add message implementation Signed-off-by: myan <[email protected]> add ut test Signed-off-by: myan <[email protected]> add integration test Signed-off-by: myan <[email protected]> add integration test and samples Signed-off-by: Meng Yan <[email protected]> offset Signed-off-by: Meng Yan <[email protected]> remove the ctx Signed-off-by: myan <[email protected]> review Signed-off-by: Meng Yan <[email protected]> remove Signed-off-by: Meng Yan <[email protected]> init consumer and producer on 1 client Signed-off-by: myan <[email protected]> reply the reviews Signed-off-by: myan <[email protected]> fix the ci Signed-off-by: myan <[email protected]> ci fix Signed-off-by: Meng Yan <[email protected]> add confluent test in github action Signed-off-by: Meng Yan <[email protected]> add the mis-used test case Signed-off-by: Meng Yan <[email protected]> remove the invalidated bootstrapserver Signed-off-by: Meng Yan <[email protected]> log kafka error message Signed-off-by: myan <[email protected]> review Signed-off-by: myan <[email protected]> Update protocol/kafka_confluent/v2/option.go Co-authored-by: Michael Gasch <[email protected]> Signed-off-by: myan <[email protected]> add the auto recover option Signed-off-by: myan <[email protected]> kafka error handler Signed-off-by: myan <[email protected]> remove the delievery chan Signed-off-by: myan <[email protected]> Update protocol/kafka_confluent/v2/protocol.go Co-authored-by: Michael Gasch <[email protected]> Update protocol/kafka_confluent/v2/protocol.go Co-authored-by: Michael Gasch <[email protected]> Update protocol/kafka_confluent/v2/option.go Co-authored-by: Michael Gasch <[email protected]> Update protocol/kafka_confluent/v2/protocol.go Co-authored-by: Michael Gasch <[email protected]> reply review Signed-off-by: myan <[email protected]> modify the git action Signed-off-by: Meng Yan <[email protected]> reply review Signed-off-by: Meng Yan <[email protected]> reply review Signed-off-by: Meng Yan <[email protected]> handle race condition between sender and closer Signed-off-by: Meng Yan <[email protected]> reply review Signed-off-by: Meng Yan <[email protected]> reply review1 Signed-off-by: Meng Yan <[email protected]> add defer close Signed-off-by: Meng Yan <[email protected]> add comment Signed-off-by: Meng Yan <[email protected]>
194cce9
to
5cd87ea
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@duglin can you please take a final look just overall whether we missed a README or general requirements for a new protocol binding? |
Sorry, been swamped. I'll try to give it a look today |
@lionelvillard too :-) |
@@ -27,6 +27,15 @@ jobs: | |||
- 9091:9091 | |||
- 9092:9092 | |||
|
|||
kafka_confluent: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we rename the one above from "kafka" to "kafka_sarama" to be consistent with the rest of the naming?
LGTM I'll let @embano1 hit the button. |
Signed-off-by: myan [email protected]
Adding the Kafka binding with https://github.com/confluentinc/confluent-kafka-go. This implementation will support the following features compared to the sarama binding.
Note: In the confluent samples, with configuration WithPollGoroutines(1) to ensure the order of the messages on the partition.
Since Kafka already natively utilizes consumer groups to enhance consumption efficiency, there is no need to use multiple goroutines to boost consumption speed.
Resolve: #918