Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

relay: add pullBinlogs interface for dm-worker to get local/grpc relay log #2216

Closed
wants to merge 10 commits into from
Closed
1 change: 1 addition & 0 deletions _utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,7 @@ ErrWorkerDDLLockOpNotFound,[code=40075:class=dm-worker:scope=internal:level=high
ErrWorkerTLSConfigNotValid,[code=40076:class=dm-worker:scope=internal:level=high], "Message: TLS config not valid, Workaround: Please check the `ssl-ca`, `ssl-cert` and `ssl-key` config in worker configuration file."
ErrWorkerFailConnectMaster,[code=40077:class=dm-worker:scope=internal:level=high], "Message: cannot join with master endpoints: %v, error: %v, Workaround: Please check network connection of worker and check worker name is unique."
ErrWorkerRelayConfigChanging,[code=40079:class=dm-worker:scope=internal:level=low], "Message: relay config of worker %s is changed too frequently, last relay source %s:, new relay source %s, Workaround: Please try again later"
ErrWorkerPullBinlogsInvalidRequest,[code=40080:class=dm-worker:scope=internal:level=low], "Message: request for PullBinlogs is invalid, reason: %s, Workaround: Please check the worker's status using `dmctl list-member`"
ErrTracerParseFlagSet,[code=42001:class=dm-tracer:scope=internal:level=medium], "Message: parse dm-tracer config flag set"
ErrTracerConfigTomlTransform,[code=42002:class=dm-tracer:scope=internal:level=medium], "Message: config toml transform, Workaround: Please check the configuration file has correct TOML format."
ErrTracerConfigInvalidFlag,[code=42003:class=dm-tracer:scope=internal:level=medium], "Message: '%s' is an invalid flag"
Expand Down
2 changes: 1 addition & 1 deletion cmd/dm-syncer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func main() {
log.L().Info("", zap.Stringer("dm-syncer conf", conf))
})

sync := syncer.NewSyncer(conf, nil) // do not support shard DDL for singleton syncer.
sync := syncer.NewSyncer(conf, nil, nil) // do not support shard DDL, relay log for singleton syncer.
sc := make(chan os.Signal, 1)
signal.Notify(sc, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)

Expand Down
1,022 changes: 884 additions & 138 deletions dm/pb/dmworker.pb.go

Large diffs are not rendered by default.

279 changes: 278 additions & 1 deletion dm/pbmock/dmworker.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 28 additions & 0 deletions dm/proto/dmworker.proto
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ service Worker {
rpc HandleError(HandleWorkerErrorRequest) returns(CommonWorkerResponse) {}

rpc GetWorkerCfg(GetWorkerCfgRequest) returns(GetWorkerCfgResponse) {}

// Sends binlog stream from a given location.
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
rpc PullBinlogs(PullBinlogReq) returns (stream PullBinlogResp) {}
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
}

enum TaskOp {
Expand Down Expand Up @@ -372,4 +375,29 @@ message GetWorkerCfgRequest {

message GetWorkerCfgResponse {
string cfg = 1;
}

// The Location describes the starts position of a binlog, could be either gtid or position way.
message Location {
// The suffix of binlog file, like .000001 .000002
string name = 1;
Ehco1996 marked this conversation as resolved.
Show resolved Hide resolved

// The binlog offset in a file.
uint32 pos = 2;

// The binlog offset in a file.
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
string gtid = 3;
}

message PullBinlogReq {
// Specifies which source of binlog to pull.
string source = 1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the value of source? source-name or server-id?

there's another layer of sequence in relay dir, can it handle that?

<deploy_dir>/relay_log/
|-- 7e427cc0-091c-11e9-9e45-72b7c59d52d7.000001

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Source is source-name. UUID is the detail that relay to think about. For the dm-worker layer I think we'd better use source.
  2. Current relay can handle this so I think it's okay.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

relay log filename maybe the same under different relay dir, for pos-based sync, it maybe ambiguous

<deploy_dir>/relay_log/
|-- 7e427cc0-091c-11e9-9e45-72b7c59d52d7.000001
--|_mysql-000001
|-- another-server-under-load-balancer.000001
--|_mysql-000001

Copy link
Contributor Author

@lichunzhu lichunzhu Oct 15, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see what you mean. Relay replicated by binlog position shouldn't support switch MySQL leader/follower. So I think using Source doesn't have a problem.


// The position from which the binlog will be sent.
Location startFrom = 2;
}

message PullBinlogResp {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to include a error field here? What if the source is not exist in the target worker?

// The binlog entity that send in a stream
bytes Payload = 1;
Ehco1996 marked this conversation as resolved.
Show resolved Hide resolved
}
Loading