Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

master: add operate-source show #838

Merged
merged 11 commits into from
Jul 31, 2020
12 changes: 9 additions & 3 deletions dm/ctl/master/operate_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ import (
// NewOperateSourceCmd creates a OperateSource command
func NewOperateSourceCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "operate-source <operate-type> <config-file> [config-file ...] [--print-sample-config]",
Short: "create/update/stop upstream MySQL/MariaDB source",
Use: "operate-source <operate-type> [config-file ...] [--print-sample-config]",
Short: "create/update/stop/show upstream MySQL/MariaDB source",
Run: operateSourceFunc,
}
cmd.Flags().BoolP("print-sample-config", "p", false, "print sample config file of source")
Expand All @@ -46,6 +46,8 @@ func convertCmdType(t string) pb.SourceOp {
return pb.SourceOp_UpdateSource
case "stop":
return pb.SourceOp_StopSource
case "show":
return pb.SourceOp_ShowSource
default:
return pb.SourceOp_InvalidSourceOp
}
Expand Down Expand Up @@ -73,7 +75,7 @@ func operateSourceFunc(cmd *cobra.Command, _ []string) {
return
}

if len(cmd.Flags().Args()) < 2 {
if len(cmd.Flags().Args()) < 1 {
cmd.SetOut(os.Stdout)
cmd.Usage()
return
Expand All @@ -85,6 +87,10 @@ func operateSourceFunc(cmd *cobra.Command, _ []string) {
common.PrintLines("invalid operate '%s' on worker", cmdType)
return
}
if op != pb.SourceOp_ShowSource && len(cmd.Flags().Args()) == 1 {
common.PrintLines("operate-source create/update/stop should specify config-file(s)")
return
}

contents := make([]string, len(cmd.Flags().Args())-1)
for i := 1; i < len(cmd.Flags().Args()); i++ {
Expand Down
12 changes: 12 additions & 0 deletions dm/master/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,18 @@ func (s *Scheduler) RemoveTaskCfg(task string) error {
return nil
}

// GetSourceCfgIDs gets all added source ID
func (s *Scheduler) GetSourceCfgIDs() []string {
s.mu.RLock()
defer s.mu.RUnlock()

id := make([]string, 0, len(s.sourceCfgs))
for i := range s.sourceCfgs {
id = append(id, i)
}
return id
}

// GetSourceCfgByID gets source config by source ID.
func (s *Scheduler) GetSourceCfgByID(source string) *config.SourceConfig {
s.mu.RLock()
Expand Down
47 changes: 23 additions & 24 deletions dm/master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1426,7 +1426,10 @@ func (s *Server) OperateSource(ctx context.Context, req *pb.OperateSourceRequest
resp.Msg = err.Error()
return resp, nil
}
var workers []*scheduler.Worker

// boundM: sourceID -> worker are used to query status from worker, to return a more real status
boundM := map[string]*scheduler.Worker{}

switch req.Op {
case pb.SourceOp_StartSource:
var (
Expand Down Expand Up @@ -1458,18 +1461,16 @@ func (s *Server) OperateSource(ctx context.Context, req *pb.OperateSourceRequest
return resp, nil
}
// for start source, we should get worker after start source
workers = make([]*scheduler.Worker, len(started))
for i, sid := range started {
workers[i] = s.scheduler.GetWorkerBySource(sid)
for _, sid := range started {
boundM[sid] = s.scheduler.GetWorkerBySource(sid)
}
case pb.SourceOp_UpdateSource:
// TODO: support SourceOp_UpdateSource later
resp.Msg = "Update worker config is not supported by dm-ha now"
return resp, nil
case pb.SourceOp_StopSource:
workers = make([]*scheduler.Worker, len(cfgs))
for i, cfg := range cfgs {
workers[i] = s.scheduler.GetWorkerBySource(cfg.SourceID)
for _, cfg := range cfgs {
boundM[cfg.SourceID] = s.scheduler.GetWorkerBySource(cfg.SourceID)
err := s.scheduler.RemoveSourceCfg(cfg.SourceID)
// TODO(lance6716):
// user could not copy-paste same command if encounter error halfway:
Expand All @@ -1484,6 +1485,10 @@ func (s *Server) OperateSource(ctx context.Context, req *pb.OperateSourceRequest
return resp, nil
}
}
case pb.SourceOp_ShowSource:
for _, id := range s.scheduler.GetSourceCfgIDs() {
boundM[id] = s.scheduler.GetWorkerBySource(id)
}
default:
resp.Msg = terror.ErrMasterInvalidOperateOp.Generate(req.Op.String(), "source").Error()
return resp, nil
Expand All @@ -1492,7 +1497,7 @@ func (s *Server) OperateSource(ctx context.Context, req *pb.OperateSourceRequest
resp.Result = true
var noWorkerMsg string
switch req.Op {
case pb.SourceOp_StartSource:
case pb.SourceOp_StartSource, pb.SourceOp_ShowSource:
noWorkerMsg = "source is added but there is no free worker to bound"
case pb.SourceOp_StopSource:
noWorkerMsg = "source is stopped and hasn't bound to worker before being stopped"
Expand All @@ -1503,16 +1508,15 @@ func (s *Server) OperateSource(ctx context.Context, req *pb.OperateSourceRequest
workerToCheck []string
)

for i := range workers {
w := workers[i]
for id, w := range boundM {
if w == nil {
resp.Sources = append(resp.Sources, &pb.CommonWorkerResponse{
Result: true,
Msg: noWorkerMsg,
Source: cfgs[i].SourceID,
Source: id,
})
} else {
sourceToCheck = append(sourceToCheck, cfgs[i].SourceID)
sourceToCheck = append(sourceToCheck, id)
workerToCheck = append(workerToCheck, w.BaseInfo().Name)
}
}
Expand Down Expand Up @@ -1685,7 +1689,7 @@ func (s *Server) waitOperationOk(ctx context.Context, cli *scheduler.Worker, tas
case *pb.OperateSourceRequest:
req := masterReq.(*pb.OperateSourceRequest)
switch req.Op {
case pb.SourceOp_StartSource, pb.SourceOp_UpdateSource:
case pb.SourceOp_StartSource, pb.SourceOp_UpdateSource, pb.SourceOp_ShowSource:
expect = pb.Stage_Running
case pb.SourceOp_StopSource:
expect = pb.Stage_Stopped
Expand Down Expand Up @@ -1833,19 +1837,14 @@ func (s *Server) handleOperationResult(ctx context.Context, cli *scheduler.Worke
}

func sortCommonWorkerResults(sourceRespCh chan *pb.CommonWorkerResponse) []*pb.CommonWorkerResponse {
sourceRespMap := make(map[string]*pb.CommonWorkerResponse, cap(sourceRespCh))
sources := make([]string, 0, cap(sourceRespCh))
sourceResps := make([]*pb.CommonWorkerResponse, 0, cap(sourceRespCh))
for len(sourceRespCh) > 0 {
sourceResp := <-sourceRespCh
sourceRespMap[sourceResp.Source] = sourceResp
sources = append(sources, sourceResp.Source)
}
// TODO: simplify logic of response sort
sort.Strings(sources)
sourceResps := make([]*pb.CommonWorkerResponse, 0, len(sources))
for _, source := range sources {
sourceResps = append(sourceResps, sourceRespMap[source])
r := <-sourceRespCh
sourceResps = append(sourceResps, r)
}
sort.Slice(sourceResps, func(i, j int) bool {
return sourceResps[i].Source < sourceResps[j].Source
})
return sourceResps
}

Expand Down
Loading