Skip to content

Commit

Permalink
api, cli, upstream(ticdc): support basic authentication via upstream …
Browse files Browse the repository at this point in the history
…tidb (#10583)

close #10544
  • Loading branch information
CharlesCheung96 authored and 3AceShowHand committed Feb 22, 2024
1 parent fa615c6 commit 203d570
Show file tree
Hide file tree
Showing 34 changed files with 1,314 additions and 149 deletions.
60 changes: 60 additions & 0 deletions cdc/api/middleware/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ import (
"github.com/pingcap/tiflow/cdc/api"
"github.com/pingcap/tiflow/cdc/capture"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/errors"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/upstream"
"go.uber.org/zap"
)

Expand All @@ -36,6 +38,7 @@ func LogMiddleware() gin.HandlerFunc {
start := time.Now()
path := c.Request.URL.Path
query := c.Request.URL.RawQuery
user, _, _ := c.Request.BasicAuth()
c.Next()

cost := time.Since(start)
Expand All @@ -53,6 +56,7 @@ func LogMiddleware() gin.HandlerFunc {
zap.String("query", query),
zap.String("ip", c.ClientIP()),
zap.String("user-agent", c.Request.UserAgent()), zap.String("client-version", version),
zap.String("username", user),
zap.Error(stdErr),
zap.Duration("duration", cost),
)
Expand Down Expand Up @@ -185,3 +189,59 @@ func CheckServerReadyMiddleware(capture capture.Capture) gin.HandlerFunc {
}
}
}

// AuthenticateMiddleware authenticates the request by query upstream TiDB.
func AuthenticateMiddleware(capture capture.Capture) gin.HandlerFunc {
return func(ctx *gin.Context) {
serverCfg := config.GetGlobalServerConfig()
if serverCfg.Security.ClientUserRequired {
up, err := getUpstream(capture)
if err != nil {
_ = ctx.Error(err)
ctx.Abort()
return
}

if err := verify(ctx, up); err != nil {
ctx.IndentedJSON(http.StatusUnauthorized, model.NewHTTPError(err))
ctx.Abort()
return
}
}
ctx.Next()
}
}

func getUpstream(capture capture.Capture) (*upstream.Upstream, error) {
m, err := capture.GetUpstreamManager()
if err != nil {
return nil, errors.Trace(err)
}
return m.GetDefaultUpstream()
}

func verify(ctx *gin.Context, up *upstream.Upstream) error {
// get the username and password from the authorization header
username, password, ok := ctx.Request.BasicAuth()
if !ok {
errMsg := "please specify the user and password via authorization header"
return errors.ErrCredentialNotFound.GenWithStackByArgs(errMsg)
}

allowed := false
serverCfg := config.GetGlobalServerConfig()
for _, user := range serverCfg.Security.ClientAllowedUser {
if user == username {
allowed = true
break
}
}
if !allowed {
errMsg := "The user is not allowed."
return errors.ErrUnauthorized.GenWithStackByArgs(username, errMsg)
}
if err := up.VerifyTiDBUser(ctx, username, password); err != nil {
return errors.ErrUnauthorized.GenWithStackByArgs(username, err.Error())
}
return nil
}
25 changes: 7 additions & 18 deletions cdc/api/owner/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,6 @@ import (
"go.uber.org/zap"
)

const (
// OpVarAdminJob is the key of admin job in HTTP API
OpVarAdminJob = "admin-job"
// OpVarChangefeedID is the key of changefeed ID in HTTP API
OpVarChangefeedID = "cf-id"
// OpVarTargetCaptureID is the key of to-capture ID in HTTP API
OpVarTargetCaptureID = "target-cp-id"
// OpVarTableID is the key of table ID in HTTP API
OpVarTableID = "table-id"
)

type commonResp struct {
Status bool `json:"status"`
Message string `json:"message"`
Expand Down Expand Up @@ -130,15 +119,15 @@ func (h *ownerAPI) handleChangefeedAdmin(w http.ResponseWriter, req *http.Reques
api.WriteError(w, http.StatusInternalServerError, err)
return
}
typeStr := req.Form.Get(OpVarAdminJob)
typeStr := req.Form.Get(api.OpVarAdminJob)
typ, err := strconv.ParseInt(typeStr, 10, 64)
if err != nil {
api.WriteError(w, http.StatusBadRequest,
cerror.ErrAPIInvalidParam.GenWithStack("invalid admin job type: %s", typeStr))
return
}
job := model.AdminJob{
CfID: model.DefaultChangeFeedID(req.Form.Get(OpVarChangefeedID)),
CfID: model.DefaultChangeFeedID(req.Form.Get(api.OpVarChangefeedID)),
Type: model.AdminJobType(typ),
}

Expand All @@ -158,7 +147,7 @@ func (h *ownerAPI) handleRebalanceTrigger(w http.ResponseWriter, req *http.Reque
api.WriteError(w, http.StatusInternalServerError, err)
return
}
changefeedID := model.DefaultChangeFeedID(req.Form.Get(OpVarChangefeedID))
changefeedID := model.DefaultChangeFeedID(req.Form.Get(api.OpVarChangefeedID))
if err := model.ValidateChangefeedID(changefeedID.ID); err != nil {
api.WriteError(w, http.StatusBadRequest,
cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed id: %s", changefeedID.ID))
Expand All @@ -182,19 +171,19 @@ func (h *ownerAPI) handleMoveTable(w http.ResponseWriter, req *http.Request) {
cerror.WrapError(cerror.ErrInternalServerError, err))
return
}
changefeedID := model.DefaultChangeFeedID(req.Form.Get(OpVarChangefeedID))
changefeedID := model.DefaultChangeFeedID(req.Form.Get(api.OpVarChangefeedID))
if err := model.ValidateChangefeedID(changefeedID.ID); err != nil {
api.WriteError(w, http.StatusBadRequest,
cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed id: %s", changefeedID.ID))
return
}
to := req.Form.Get(OpVarTargetCaptureID)
to := req.Form.Get(api.OpVarTargetCaptureID)
if err := model.ValidateChangefeedID(to); err != nil {
api.WriteError(w, http.StatusBadRequest,
cerror.ErrAPIInvalidParam.GenWithStack("invalid target capture id: %s", to))
return
}
tableIDStr := req.Form.Get(OpVarTableID)
tableIDStr := req.Form.Get(api.OpVarTableID)
tableID, err := strconv.ParseInt(tableIDStr, 10, 64)
if err != nil {
api.WriteError(w, http.StatusBadRequest,
Expand All @@ -219,7 +208,7 @@ func (h *ownerAPI) handleChangefeedQuery(w http.ResponseWriter, req *http.Reques
api.WriteError(w, http.StatusInternalServerError, err)
return
}
changefeedID := model.DefaultChangeFeedID(req.Form.Get(OpVarChangefeedID))
changefeedID := model.DefaultChangeFeedID(req.Form.Get(api.OpVarChangefeedID))
if err := model.ValidateChangefeedID(changefeedID.ID); err != nil {
api.WriteError(w, http.StatusBadRequest,
cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed id: %s", changefeedID.ID))
Expand Down
3 changes: 2 additions & 1 deletion cdc/api/status/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/gin-gonic/gin"
"github.com/pingcap/tiflow/cdc/api"
"github.com/pingcap/tiflow/cdc/api/middleware"
"github.com/pingcap/tiflow/cdc/capture"
"github.com/pingcap/tiflow/pkg/etcd"
"github.com/pingcap/tiflow/pkg/version"
Expand All @@ -44,7 +45,7 @@ type statusAPI struct {
func RegisterStatusAPIRoutes(router *gin.Engine, capture capture.Capture) {
statusAPI := statusAPI{capture: capture}
router.GET("/status", gin.WrapF(statusAPI.handleStatus))
router.GET("/debug/info", gin.WrapF(statusAPI.handleDebugInfo))
router.GET("/debug/info", middleware.AuthenticateMiddleware(capture), gin.WrapF(statusAPI.handleDebugInfo))
}

func (h *statusAPI) writeEtcdInfo(ctx context.Context, cli etcd.CDCEtcdClient, w io.Writer) {
Expand Down
22 changes: 22 additions & 0 deletions cdc/api/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,28 @@ var httpBadRequestError = []*errors.Error{
}

const (
// OpVarAdminJob is the key of admin job in HTTP API
OpVarAdminJob = "admin-job"
// OpVarChangefeedID is the key of changefeed ID in HTTP API
OpVarChangefeedID = "cf-id"
// OpVarTargetCaptureID is the key of to-capture ID in HTTP API
OpVarTargetCaptureID = "target-cp-id"
// OpVarTableID is the key of table ID in HTTP API
OpVarTableID = "table-id"

// APIOpVarChangefeedState is the key of changefeed state in HTTP API.
APIOpVarChangefeedState = "state"
// APIOpVarChangefeedID is the key of changefeed ID in HTTP API.
APIOpVarChangefeedID = "changefeed_id"
// APIOpVarCaptureID is the key of capture ID in HTTP API.
APIOpVarCaptureID = "capture_id"
// APIOpVarNamespace is the key of changefeed namespace in HTTP API.
APIOpVarNamespace = "namespace"
// APIOpVarTiCDCUser is the key of ticdc user in HTTP API.
APIOpVarTiCDCUser = "user"
// APIOpVarTiCDCPassword is the key of ticdc password in HTTP API.
APIOpVarTiCDCPassword = "password"

// forwardFromCapture is a header to be set when forwarding requests to owner
forwardFromCapture = "TiCDC-ForwardFromCapture"
// forwardTimes is a header to identify how many times the request has been forwarded
Expand Down
47 changes: 20 additions & 27 deletions cdc/api/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,6 @@ import (
"go.uber.org/zap"
)

const (
// apiOpVarChangefeedState is the key of changefeed state in HTTP API
apiOpVarChangefeedState = "state"
// apiOpVarChangefeedID is the key of changefeed ID in HTTP API
apiOpVarChangefeedID = "changefeed_id"
// apiOpVarCaptureID is the key of capture ID in HTTP API
apiOpVarCaptureID = "capture_id"
)

// OpenAPI provides capture APIs.
type OpenAPI struct {
capture capture.Capture
Expand Down Expand Up @@ -85,17 +76,19 @@ func RegisterOpenAPIRoutes(router *gin.Engine, api OpenAPI) {
controllerMiddleware := middleware.ForwardToControllerMiddleware(api.capture)
changefeedOwnerMiddleware := middleware.
ForwardToChangefeedOwnerMiddleware(api.capture, getChangefeedFromRequest)
authenticateMiddleware := middleware.AuthenticateMiddleware(api.capture)

// changefeed API
changefeedGroup := v1.Group("/changefeeds")
changefeedGroup.GET("", controllerMiddleware, api.ListChangefeed)
changefeedGroup.GET("/:changefeed_id", changefeedOwnerMiddleware, api.GetChangefeed)
changefeedGroup.POST("", controllerMiddleware, api.CreateChangefeed)
changefeedGroup.PUT("/:changefeed_id", changefeedOwnerMiddleware, api.UpdateChangefeed)
changefeedGroup.POST("/:changefeed_id/pause", changefeedOwnerMiddleware, api.PauseChangefeed)
changefeedGroup.POST("/:changefeed_id/resume", changefeedOwnerMiddleware, api.ResumeChangefeed)
changefeedGroup.DELETE("/:changefeed_id", controllerMiddleware, api.RemoveChangefeed)
changefeedGroup.POST("/:changefeed_id/tables/rebalance_table", changefeedOwnerMiddleware, api.RebalanceTables)
changefeedGroup.POST("/:changefeed_id/tables/move_table", changefeedOwnerMiddleware, api.MoveTable)
changefeedGroup.POST("", controllerMiddleware, authenticateMiddleware, api.CreateChangefeed)
changefeedGroup.PUT("/:changefeed_id", changefeedOwnerMiddleware, authenticateMiddleware, api.UpdateChangefeed)
changefeedGroup.POST("/:changefeed_id/pause", changefeedOwnerMiddleware, authenticateMiddleware, api.PauseChangefeed)
changefeedGroup.POST("/:changefeed_id/resume", changefeedOwnerMiddleware, authenticateMiddleware, api.ResumeChangefeed)
changefeedGroup.DELETE("/:changefeed_id", controllerMiddleware, authenticateMiddleware, api.RemoveChangefeed)
changefeedGroup.POST("/:changefeed_id/tables/rebalance_table", changefeedOwnerMiddleware, authenticateMiddleware, api.RebalanceTables)
changefeedGroup.POST("/:changefeed_id/tables/move_table", changefeedOwnerMiddleware, authenticateMiddleware, api.MoveTable)

// owner API
ownerGroup := v1.Group("/owner")
Expand Down Expand Up @@ -126,7 +119,7 @@ func RegisterOpenAPIRoutes(router *gin.Engine, api OpenAPI) {
// @Router /api/v1/changefeeds [get]
func (h *OpenAPI) ListChangefeed(c *gin.Context) {
ctx := c.Request.Context()
state := c.Query(apiOpVarChangefeedState)
state := c.Query(api.APIOpVarChangefeedState)
controller, err := h.capture.GetController()
if err != nil {
_ = c.Error(err)
Expand Down Expand Up @@ -202,7 +195,7 @@ func (h *OpenAPI) ListChangefeed(c *gin.Context) {
// @Router /api/v1/changefeeds/{changefeed_id} [get]
func (h *OpenAPI) GetChangefeed(c *gin.Context) {
ctx := c.Request.Context()
changefeedID := model.DefaultChangeFeedID(c.Param(apiOpVarChangefeedID))
changefeedID := model.DefaultChangeFeedID(c.Param(api.APIOpVarChangefeedID))
if err := model.ValidateChangefeedID(changefeedID.ID); err != nil {
_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s",
changefeedID.ID))
Expand Down Expand Up @@ -343,7 +336,7 @@ func (h *OpenAPI) CreateChangefeed(c *gin.Context) {
func (h *OpenAPI) PauseChangefeed(c *gin.Context) {
ctx := c.Request.Context()

changefeedID := model.DefaultChangeFeedID(c.Param(apiOpVarChangefeedID))
changefeedID := model.DefaultChangeFeedID(c.Param(api.APIOpVarChangefeedID))
if err := model.ValidateChangefeedID(changefeedID.ID); err != nil {
_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s",
changefeedID.ID))
Expand Down Expand Up @@ -380,7 +373,7 @@ func (h *OpenAPI) PauseChangefeed(c *gin.Context) {
// @Router /api/v1/changefeeds/{changefeed_id}/resume [post]
func (h *OpenAPI) ResumeChangefeed(c *gin.Context) {
ctx := c.Request.Context()
changefeedID := model.DefaultChangeFeedID(c.Param(apiOpVarChangefeedID))
changefeedID := model.DefaultChangeFeedID(c.Param(api.APIOpVarChangefeedID))
if err := model.ValidateChangefeedID(changefeedID.ID); err != nil {
_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s",
changefeedID.ID))
Expand Down Expand Up @@ -418,7 +411,7 @@ func (h *OpenAPI) ResumeChangefeed(c *gin.Context) {
// @Router /api/v1/changefeeds/{changefeed_id} [put]
func (h *OpenAPI) UpdateChangefeed(c *gin.Context) {
ctx := c.Request.Context()
changefeedID := model.DefaultChangeFeedID(c.Param(apiOpVarChangefeedID))
changefeedID := model.DefaultChangeFeedID(c.Param(api.APIOpVarChangefeedID))

if err := model.ValidateChangefeedID(changefeedID.ID); err != nil {
_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s",
Expand Down Expand Up @@ -487,7 +480,7 @@ func (h *OpenAPI) UpdateChangefeed(c *gin.Context) {
// @Router /api/v1/changefeeds/{changefeed_id} [delete]
func (h *OpenAPI) RemoveChangefeed(c *gin.Context) {
ctx := c.Request.Context()
changefeedID := model.DefaultChangeFeedID(c.Param(apiOpVarChangefeedID))
changefeedID := model.DefaultChangeFeedID(c.Param(api.APIOpVarChangefeedID))
if err := model.ValidateChangefeedID(changefeedID.ID); err != nil {
_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s",
changefeedID.ID))
Expand Down Expand Up @@ -558,7 +551,7 @@ func (h *OpenAPI) RemoveChangefeed(c *gin.Context) {
// @Router /api/v1/changefeeds/{changefeed_id}/tables/rebalance_table [post]
func (h *OpenAPI) RebalanceTables(c *gin.Context) {
ctx := c.Request.Context()
changefeedID := model.DefaultChangeFeedID(c.Param(apiOpVarChangefeedID))
changefeedID := model.DefaultChangeFeedID(c.Param(api.APIOpVarChangefeedID))

if err := model.ValidateChangefeedID(changefeedID.ID); err != nil {
_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s",
Expand Down Expand Up @@ -592,7 +585,7 @@ func (h *OpenAPI) RebalanceTables(c *gin.Context) {
// @Router /api/v1/changefeeds/{changefeed_id}/tables/move_table [post]
func (h *OpenAPI) MoveTable(c *gin.Context) {
ctx := c.Request.Context()
changefeedID := model.DefaultChangeFeedID(c.Param(apiOpVarChangefeedID))
changefeedID := model.DefaultChangeFeedID(c.Param(api.APIOpVarChangefeedID))
if err := model.ValidateChangefeedID(changefeedID.ID); err != nil {
_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s",
changefeedID.ID))
Expand Down Expand Up @@ -658,14 +651,14 @@ func (h *OpenAPI) ResignController(c *gin.Context) {
func (h *OpenAPI) GetProcessor(c *gin.Context) {
ctx := c.Request.Context()

changefeedID := model.DefaultChangeFeedID(c.Param(apiOpVarChangefeedID))
changefeedID := model.DefaultChangeFeedID(c.Param(api.APIOpVarChangefeedID))
if err := model.ValidateChangefeedID(changefeedID.ID); err != nil {
_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s",
changefeedID.ID))
return
}

captureID := c.Param(apiOpVarCaptureID)
captureID := c.Param(api.APIOpVarCaptureID)
if err := model.ValidateChangefeedID(captureID); err != nil {
_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid capture_id: %s", captureID))
return
Expand Down Expand Up @@ -955,6 +948,6 @@ func SetLogLevel(c *gin.Context) {
func getChangefeedFromRequest(ctx *gin.Context) model.ChangeFeedID {
return model.ChangeFeedID{
Namespace: model.DefaultNamespace,
ID: ctx.Param(apiOpVarChangefeedID),
ID: ctx.Param(api.APIOpVarChangefeedID),
}
}
Loading

0 comments on commit 203d570

Please sign in to comment.