Skip to content

Commit

Permalink
feat: publish by labels (#2698)
Browse files Browse the repository at this point in the history
  • Loading branch information
AlkaidChan authored Oct 23, 2023
1 parent c8bd99a commit 043bdae
Show file tree
Hide file tree
Showing 12 changed files with 2,679 additions and 2,387 deletions.
32 changes: 18 additions & 14 deletions bcs-services/bcs-bscp/cmd/config-server/service/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,15 @@ func (s *Service) Publish(ctx context.Context, req *pbcs.PublishReq) (
}

r := &pbds.PublishReq{
BizId: req.BizId,
AppId: req.AppId,
ReleaseId: req.ReleaseId,
Memo: req.Memo,
All: req.All,
Default: req.Default,
Groups: req.Groups,
BizId: req.BizId,
AppId: req.AppId,
ReleaseId: req.ReleaseId,
Memo: req.Memo,
All: req.All,
GrayPublishMode: req.GrayPublishMode,
Default: req.Default,
Groups: req.Groups,
Labels: req.Labels,
}
rp, err := s.client.DS.Publish(grpcKit.RpcCtx(), r)
if err != nil {
Expand Down Expand Up @@ -75,13 +77,15 @@ func (s *Service) GenerateReleaseAndPublish(ctx context.Context, req *pbcs.Gener
}

r := &pbds.GenerateReleaseAndPublishReq{
BizId: req.BizId,
AppId: req.AppId,
ReleaseName: req.ReleaseName,
ReleaseMemo: req.ReleaseMemo,
All: req.All,
Groups: req.Groups,
Variables: req.Variables,
BizId: req.BizId,
AppId: req.AppId,
ReleaseName: req.ReleaseName,
ReleaseMemo: req.ReleaseMemo,
Variables: req.Variables,
All: req.All,
GrayPublishMode: req.GrayPublishMode,
Groups: req.Groups,
Labels: req.Labels,
}
rp, err := s.client.DS.GenerateReleaseAndPublish(grpcKit.RpcCtx(), r)
if err != nil {
Expand Down
213 changes: 153 additions & 60 deletions bcs-services/bcs-bscp/cmd/data-service/service/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,70 @@ import (
"context"
"errors"
"fmt"
"strings"
"time"

"google.golang.org/protobuf/types/known/structpb"
"gorm.io/gorm"

"bscp.io/pkg/dal/gen"
"bscp.io/pkg/dal/table"
"bscp.io/pkg/kit"
"bscp.io/pkg/logs"
pbgroup "bscp.io/pkg/protocol/core/group"
pbds "bscp.io/pkg/protocol/data-service"
"bscp.io/pkg/runtime/selector"
"bscp.io/pkg/types"
)

// Publish exec publish strategy.
func (s *Service) Publish(ctx context.Context, req *pbds.PublishReq) (*pbds.PublishResp, error) {
grpcKit := kit.FromGrpcContext(ctx)

kt := kit.FromGrpcContext(ctx)
groupIDs := make([]uint32, 0)
tx := s.dao.GenQuery().Begin()

if !req.All {
if req.GrayPublishMode == "" {
// !NOTE: Compatible with previous pipelined plugins version
req.GrayPublishMode = table.PublishByGroups.String()
}
publishMode := table.GrayPublishMode(req.GrayPublishMode)
if err := publishMode.Validate(); err != nil {
if rErr := tx.Rollback(); rErr != nil {
logs.Errorf("transaction rollback failed, err: %v, rid: %s", rErr, grpcKit.Rid)
}
return nil, err
}
// validate and query group ids.
if publishMode == table.PublishByGroups {
for _, groupID := range req.Groups {
if groupID == 0 {
groupIDs = append(groupIDs, groupID)
continue
}
group, e := s.dao.Group().Get(grpcKit, groupID, req.BizId)
if e != nil {
if rErr := tx.Rollback(); rErr != nil {
logs.Errorf("transaction rollback failed, err: %v, rid: %s", rErr, grpcKit.Rid)
}
return nil, fmt.Errorf("group %d not exist", groupID)
}
groupIDs = append(groupIDs, group.ID)
}
}
if publishMode == table.PublishByLabels {
groupID, err := s.createGroupByLabels(grpcKit, tx, req.BizId, req.AppId, req.Labels)
if err != nil {
logs.Errorf("create group by labels failed, err: %v, rid: %s", err, grpcKit.Rid)
if rErr := tx.Rollback(); rErr != nil {
logs.Errorf("transaction rollback failed, err: %v, rid: %s", rErr, grpcKit.Rid)
}
return nil, err
}
groupIDs = append(groupIDs, groupID)
}
}

opt := &types.PublishOption{
BizID: req.BizId,
Expand All @@ -39,19 +88,22 @@ func (s *Service) Publish(ctx context.Context, req *pbds.PublishReq) (*pbds.Publ
All: req.All,
Default: req.Default,
Memo: req.Memo,
Groups: req.Groups,
Groups: groupIDs,
Revision: &table.CreatedRevision{
Creator: kt.User,
Creator: grpcKit.User,
},
}

if err := s.validatePublishGroups(kt, opt); err != nil {
pshID, err := s.dao.Publish().PublishWithTx(grpcKit, tx, opt)
if err != nil {
logs.Errorf("publish strategy failed, err: %v, rid: %s", err, grpcKit.Rid)
if rErr := tx.Rollback(); rErr != nil {
logs.Errorf("transaction rollback failed, err: %v, rid: %s", rErr, grpcKit.Rid)
}
return nil, err
}

pshID, err := s.dao.Publish().Publish(kt, opt)
if err != nil {
logs.Errorf("publish strategy failed, err: %v, rid: %s", err, kt.Rid)
if err := tx.Commit(); err != nil {
logs.Errorf("commit transaction failed, err: %v, rid: %s", err, grpcKit.Rid)
return nil, err
}

Expand All @@ -66,28 +118,15 @@ func (s *Service) GenerateReleaseAndPublish(ctx context.Context, req *pbds.Gener

grpcKit := kit.FromGrpcContext(ctx)

groupIDs := make([]uint32, 0)

// step1: validate and query group ids.
if !req.All {
for _, name := range req.Groups {
group, e := s.dao.Group().GetByName(grpcKit, req.BizId, name)
if e != nil {
return nil, fmt.Errorf("group %s not exist", name)
}
groupIDs = append(groupIDs, group.ID)
}
}

// Note: need to change batch operator to query config item and it's commit.
// step2: query app's all config items.
// query app's all config items.
cfgItems, err := s.getAppConfigItems(grpcKit)
if err != nil {
logs.Errorf("query app config item list failed, err: %v, rid: %s", err, grpcKit.Rid)
return nil, err
}

// step3: get app template revisions which are template config items
// get app template revisions which are template config items
tmplRevisions, err := s.getAppTmplRevisions(grpcKit)
if err != nil {
logs.Errorf("get app template revisions failed, err: %v, rid: %s", err, grpcKit.Rid)
Expand All @@ -103,11 +142,50 @@ func (s *Service) GenerateReleaseAndPublish(ctx context.Context, req *pbds.Gener
return nil, fmt.Errorf("release name %s already exists", req.ReleaseName)
}

// step4: begin transaction to create release and released config item.
groupIDs := make([]uint32, 0)

tx := s.dao.GenQuery().Begin()
// step5: create release.

if !req.All {
if req.GrayPublishMode == "" {
// !NOTE: Compatible with previous pipelined plugins version
req.GrayPublishMode = table.PublishByGroups.String()
}
publishMode := table.GrayPublishMode(req.GrayPublishMode)
if e := publishMode.Validate(); e != nil {
if rErr := tx.Rollback(); rErr != nil {
logs.Errorf("transaction rollback failed, err: %v, rid: %s", rErr, grpcKit.Rid)
}
return nil, e
}
// validate and query group ids.
if publishMode == table.PublishByGroups {
for _, name := range req.Groups {
group, e := s.dao.Group().GetByName(grpcKit, req.BizId, name)
if e != nil {
if rErr := tx.Rollback(); rErr != nil {
logs.Errorf("transaction rollback failed, err: %v, rid: %s", rErr, grpcKit.Rid)
}
return nil, fmt.Errorf("group %s not exist", name)
}
groupIDs = append(groupIDs, group.ID)
}
}
if publishMode == table.PublishByLabels {
groupID, e := s.createGroupByLabels(grpcKit, tx, req.BizId, req.AppId, req.Labels)
if e != nil {
logs.Errorf("create group by labels failed, err: %v, rid: %s", e, grpcKit.Rid)
if rErr := tx.Rollback(); rErr != nil {
logs.Errorf("transaction rollback failed, err: %v, rid: %s", rErr, grpcKit.Rid)
}
return nil, e
}
groupIDs = append(groupIDs, groupID)
}
}

// create release.
release := &table.Release{
// Spec: req.Spec.ReleaseSpec(),
Spec: &table.ReleaseSpec{
Name: req.ReleaseName,
Memo: req.ReleaseMemo,
Expand All @@ -128,7 +206,7 @@ func (s *Service) GenerateReleaseAndPublish(ctx context.Context, req *pbds.Gener
}
return nil, err
}
// step6: create released hook.
// create released hook.
if err = s.createReleasedHook(grpcKit, tx, req.BizId, req.AppId, releaseID); err != nil {
logs.Errorf("create released hook failed, err: %v, rid: %s", err, grpcKit.Rid)
if rErr := tx.Rollback(); rErr != nil {
Expand All @@ -137,7 +215,7 @@ func (s *Service) GenerateReleaseAndPublish(ctx context.Context, req *pbds.Gener
return nil, err
}

// step7: do template and non-template config item related operations for create release.
// do template and non-template config item related operations for create release.
if err = s.doConfigItemOperations(grpcKit, req.Variables, tx, release.ID, tmplRevisions, cfgItems); err != nil {
if rErr := tx.Rollback(); rErr != nil {
logs.Errorf("transaction rollback failed, err: %v, rid: %s", rErr, grpcKit.Rid)
Expand All @@ -146,7 +224,7 @@ func (s *Service) GenerateReleaseAndPublish(ctx context.Context, req *pbds.Gener
return nil, err
}

// step8: publish with transaction.
// publish with transaction.
kt := kit.FromGrpcContext(ctx)

opt := &types.PublishOption{
Expand All @@ -160,9 +238,6 @@ func (s *Service) GenerateReleaseAndPublish(ctx context.Context, req *pbds.Gener
Creator: kt.User,
},
}
if e := s.validatePublishGroups(kt, opt); e != nil {
return nil, e
}
pshID, err := s.dao.Publish().PublishWithTx(kt, tx, opt)
if err != nil {
logs.Errorf("publish strategy failed, err: %v, rid: %s", err, kt.Rid)
Expand All @@ -172,14 +247,59 @@ func (s *Service) GenerateReleaseAndPublish(ctx context.Context, req *pbds.Gener
return nil, err
}

// step9: commit transaction.
// commit transaction.
if err = tx.Commit(); err != nil {
logs.Errorf("commit transaction failed, err: %v, rid: %s", err, kt.Rid)
return nil, err
}
return &pbds.PublishResp{PublishedStrategyHistoryId: pshID}, nil
}

func (s *Service) createGroupByLabels(grpcKit *kit.Kit, tx *gen.QueryTx, bizID, appID uint32,
labels []*structpb.Struct) (uint32, error) {
timeStr := time.Now().Format("20060102150405.000")
timeStr = strings.ReplaceAll(timeStr, ".", "")
elements := make([]selector.Element, 0)
for _, label := range labels {
element, err := pbgroup.UnmarshalElement(label)
if err != nil {
return 0, fmt.Errorf("unmarshal group label failed, err: %v", err)
}
elements = append(elements, *element)
}
group := table.Group{
Spec: &table.GroupSpec{
Name: fmt.Sprintf("g_%s", timeStr),
Public: false,
Mode: table.Custom,
Selector: &selector.Selector{
LabelsAnd: elements,
},
},
Attachment: &table.GroupAttachment{
BizID: bizID,
},
Revision: &table.Revision{
Creator: grpcKit.User,
Reviser: grpcKit.User,
},
}
groupID, err := s.dao.Group().CreateWithTx(grpcKit, tx, &group)
if err != nil {
return 0, err
}
if err := s.dao.GroupAppBind().BatchCreateWithTx(grpcKit, tx, []*table.GroupAppBind{
{
GroupID: groupID,
AppID: appID,
BizID: bizID,
},
}); err != nil {
return 0, err
}
return groupID, nil
}

func (s *Service) createReleasedHook(grpcKit *kit.Kit, tx *gen.QueryTx, bizID, appID, releaseID uint32) error {
pre, err := s.dao.ReleasedHook().Get(grpcKit, bizID, appID, 0, table.PreHook)
if err == nil {
Expand Down Expand Up @@ -207,30 +327,3 @@ func (s *Service) createReleasedHook(grpcKit *kit.Kit, tx *gen.QueryTx, bizID, a
}
return nil
}

func (s *Service) validatePublishGroups(kt *kit.Kit, opt *types.PublishOption) error {
for _, groupID := range opt.Groups {
// frontend would set groupID 0 as default.
if groupID == 0 {
opt.Default = true
continue
}
group, e := s.dao.Group().Get(kt, groupID, opt.BizID)
if e != nil {
if e == gorm.ErrRecordNotFound {
return fmt.Errorf("group %d not exists", groupID)
}
return e
}
if group.Spec.Public {
continue
}
if _, e := s.dao.GroupAppBind().Get(kt, groupID, opt.AppID, opt.BizID); e != nil {
if e == gorm.ErrRecordNotFound {
return fmt.Errorf("group %d not bind app %d", groupID, opt.AppID)
}
return e
}
}
return nil
}
Loading

0 comments on commit 043bdae

Please sign in to comment.