-
Notifications
You must be signed in to change notification settings - Fork 32
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Replay the unprocessed event from transport when the manager restarts #751
Conversation
4d5cec5
to
7979df9
Compare
/test ci/prow/test-unit |
@yanmxa: The specified target(s) for
Use In response to this:
Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository. |
/test test-unit |
1 similar comment
/test test-unit |
/retest |
/retest |
@@ -114,3 +114,11 @@ CREATE TABLE IF NOT EXISTS history.local_compliance_job_log ( | |||
offsets int8, | |||
error TEXT | |||
); | |||
|
|||
CREATE TABLE IF NOT EXISTS status.transport ( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need to handle the upgrade case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@clyang82 What do you mean the upgrade case?
The table hasn't been created before. So it will create the table if it database doesn't contain the table. And the operator will handle the upgrade.
|
||
CREATE TABLE IF NOT EXISTS status.transport ( | ||
-- transport name, it is the topic name for the kafka transport | ||
name character varying(254) PRIMARY KEY, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
topic name or managed hub cluster name?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's the topic name, cause the table only for transport
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hoh=# select * from status.transport;
name | payload | created_at | updated_at
------------------+--------------------------------+----------------------------+----------------------------
status.kind-hub1 | {"offset": 16, "partition": 0} | 2024-01-04 02:37:05.56802 | 2024-01-09 07:27:48.741223
status.kind-hub2 | {"offset": 16, "partition": 0} | 2024-01-04 02:37:05.56802 | 2024-01-09 07:27:48.741223
if c.withDatabasePosition { | ||
db := database.GetGorm() | ||
var positions []models.Transport | ||
err := db.Find(&positions).Error |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you need to find all or find by name?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The transport only caches the manager receiver topics. So It's okay to find all the topics for now.
Since all the topics are dynamically created and match the regular ^status.*
. We can also add the filter with the like statement where name ~ '^status*'
Signed-off-by: myan <[email protected]>
Signed-off-by: myan <[email protected]>
Signed-off-by: Meng Yan <[email protected]>
Signed-off-by: Meng Yan <[email protected]>
Signed-off-by: Meng Yan <[email protected]>
Signed-off-by: Meng Yan <[email protected]>
Signed-off-by: Meng Yan <[email protected]>
Signed-off-by: Meng Yan <[email protected]>
df1c47b
to
cd1df4e
Compare
Signed-off-by: myan <[email protected]>
Quality Gate passedKudos, no new issues were introduced! 0 New issues |
if err != nil { | ||
return nil, err | ||
} | ||
offsetToStart = append(offsetToStart, kafka.TopicPartition{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the offsetToStart may contain the old topicpartition information if the managed hub cluster is detached. Need to cleanup the table to keep it up-to-date.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: clyang82, yanmxa The full list of commands accepted by this bot can be found here. The pull request process is described here
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
Signed-off-by: myan [email protected]
Ref: https://issues.redhat.com/browse/ACM-9056
Blocked by cloudevents/sdk-go#988, cause the sarama binding cannot consume the event from the persist position.