Skip to content

Commit

Permalink
feat: support watch by database
Browse files Browse the repository at this point in the history
--story=120905990
  • Loading branch information
wcy00000000000000 committed Dec 10, 2024
1 parent 7004b81 commit c457c95
Show file tree
Hide file tree
Showing 9 changed files with 753 additions and 464 deletions.
11 changes: 6 additions & 5 deletions src/storage/stream/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,19 @@
* limitations under the License.
*/

// Package event TODO
// Package event defines event watch logics
package event

import "go.mongodb.org/mongo-driver/mongo"

// Event TODO
// Event is the struct for event watch
type Event struct {
database string
DBName string
client *mongo.Client
}

// NewEvent TODO
func NewEvent(client *mongo.Client, db string) (*Event, error) {
return &Event{client: client, database: db}, nil
// NewEvent new Event
func NewEvent(client *mongo.Client, db, dbName string) (*Event, error) {
return &Event{client: client, database: db, DBName: dbName}, nil
}
228 changes: 164 additions & 64 deletions src/storage/stream/event/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,17 @@ package event

import (
"context"
"fmt"
"reflect"
"time"

"configcenter/pkg/filter"
"configcenter/src/common"
"configcenter/src/common/blog"
"configcenter/src/common/json"
"configcenter/src/common/mapstr"
"configcenter/src/storage/stream/types"
"github.com/tidwall/gjson"

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo/options"
)

Expand All @@ -32,123 +35,220 @@ func (e *Event) List(ctx context.Context, opts *types.ListOptions) (ch chan *typ
if err := opts.CheckSetDefault(); err != nil {
return nil, err
}
// prepare for list all the data.
totalCnt, err := e.client.Database(e.database).
Collection(opts.Collection).
CountDocuments(ctx, opts.Filter)
if err != nil {
return nil, fmt.Errorf("count db %s, collection: %s with filter: %+v failed, err: %v",
e.database, opts.Collection, opts.Filter, err)

collOpts := make(map[string]types.WatchCollOptions)
for id, collOpt := range opts.CollOpts {
collOpts[id] = types.WatchCollOptions{CollectionOptions: collOpt}
}

listOpts := &listOptions{
collOptsInfo: parseCollOpts(collOpts),
pageSize: opts.PageSize,
}

eventChan := make(chan *types.Event, types.DefaultEventChanSize)

go func() {
e.lister(ctx, false, totalCnt, opts, eventChan)
e.lister(ctx, false, listOpts, eventChan)
}()

return eventChan, nil

}

// lister is try to list data with filter. withRetry is to control whether you need to retry list when an error encountered.
func (e *Event) lister(ctx context.Context, withRetry bool, cnt int64, opts *types.ListOptions, ch chan *types.Event) {
type listOptions struct {
collOptsInfo *parsedCollOptsInfo
pageSize *int
}

pageSize := *opts.PageSize
// lister is try to list data with filter. withRetry is to control whether you need to retry list when an error encountered.
func (e *Event) lister(ctx context.Context, withRetry bool, opts *listOptions, ch chan *types.Event) {
reset := func() {
// sleep a while and retry later
time.Sleep(3 * time.Second)
}
for start := 0; start < int(cnt); start += pageSize {

// list collections
var collections []string
for {
var err error
collections, err = e.client.Database(e.database).ListCollectionNames(ctx, bson.M{})
if err != nil {
if withRetry {
blog.Errorf("list db: %s collections failed, will *retry later*, err: %v", e.database, err)
reset()
continue
}
blog.Errorf("list db: %s collections failed, will exit list immediately.", e.database)
return
}
break
}

for _, collection := range collections {
// get collection related task ids, fields, filters
taskIDs, fields, filters := make([]string, 0), make([]string, 0), make([]filter.RuleFactory, 0)
needAllFilter, needAllFields := false, false
for collRegex, regex := range opts.collOptsInfo.collRegexMap {
if !regex.MatchString(collection) {
continue
}

taskIDs = append(taskIDs, opts.collOptsInfo.collRegexTasksMap[collRegex]...)
if opts.collOptsInfo.collCondMap[collRegex] == nil {
needAllFilter = true
} else if !needAllFilter {
filters = append(filters, opts.collOptsInfo.collCondMap[collRegex])
}
if len(opts.collOptsInfo.collFieldsMap[collRegex]) == 0 {
needAllFields = true
} else if !needAllFields {
fields = append(fields, opts.collOptsInfo.collFieldsMap[collRegex]...)
}
}

if len(taskIDs) == 0 {
continue
}

findOpts := new(options.FindOptions)
findOpts.SetSkip(int64(start))
findOpts.SetLimit(int64(pageSize))
projection := make(map[string]int)
if len(opts.Fields) != 0 {
for _, field := range opts.Fields {
findOpts.SetLimit(int64(*opts.pageSize))
if !needAllFields && len(fields) != 0 {
projection := make(map[string]int)
for _, field := range fields {
if len(field) <= 0 {
continue
}
projection[field] = 1
}
projection["_id"] = 1
findOpts.Projection = projection
}

cond := make(mapstr.MapStr)
if !needAllFilter && len(filters) != 0 {
expr := filter.Expression{RuleFactory: &filter.CombinedRule{Condition: filter.Or, Rules: filters}}
var err error
cond, err = expr.ToMgo()
if err != nil {
if withRetry {
blog.Errorf("convert coll %s filter(%s) to mongo failed", collection, expr)
continue
}
blog.Errorf("convert coll %s filter(%s) to mongo failed, will exit list immediately.", collection, expr)
return
}
}

// list data from this collection
needReturn := e.listOneColl(ctx, &listOneCollOptions{collection: collection, taskIDs: taskIDs, filter: cond,
findOpts: findOpts, ch: ch, withRetry: withRetry, reset: reset})
if needReturn {
return
}
}

// tell the user that the list operation has already done.
// we only send for once.
ch <- &types.Event{
OperationType: types.ListDone,
}
}

type listOneCollOptions struct {
collection string
taskIDs []string
filter mapstr.MapStr
findOpts *options.FindOptions
taskTypeMap map[string]reflect.Type
taskFilterMap map[string]*filter.Expression
ch chan *types.Event
withRetry bool
reset func()
}

type mongoID struct {
Oid primitive.ObjectID `bson:"_id"`
}

// listOneColl try to list data with filter from one collection, returns if list operation needs to exit
func (e *Event) listOneColl(ctx context.Context, opts *listOneCollOptions) bool {
for {
retry:
cursor, err := e.client.Database(e.database).
Collection(opts.Collection).
Find(ctx, opts.Filter, findOpts)
Collection(opts.collection).
Find(ctx, opts.filter, opts.findOpts)
if err != nil {
blog.Errorf("list watch operation, but list db: %s, collection: %s failed, will *retry later*, err: %v",
e.database, opts.Collection, err)
reset()
blog.Errorf("list db: %s, coll: %s failed, will *retry later*, err: %v", e.database, opts.collection, err)
opts.reset()
continue
}

hasData := false
for cursor.Next(ctx) {
hasData = true
select {
case <-ctx.Done():
blog.Errorf("received stopped lister signal, stop list db: %s, collection: %s, err: %v", e.database,
opts.Collection, ctx.Err())
return
opts.collection, ctx.Err())
return true
default:

}

// create a new event struct for use
result := reflect.New(reflect.TypeOf(opts.EventStruct)).Elem()
err := cursor.Decode(result.Addr().Interface())
if err != nil {
blog.Errorf("list watch operation, but list db: %s, collection: %s with cursor failed, will *retry later*, err: %v",
e.database, opts.Collection, err)
rawDoc := bson.Raw{}
if err := cursor.Decode(&rawDoc); err != nil {
blog.Errorf("list watch operation, but list db: %s, collection: %s with cursor failed, "+
"will *retry later*, err: %v", e.database, opts.collection, err)

cursor.Close(ctx)
if !withRetry {
blog.Warnf("list watch operation, but list db: %s, collection: %s with cursor failed, will exit list immediately.",
e.database, opts.Collection)
close(ch)
return
if !opts.withRetry {
blog.Warnf("list watch operation, but list db: %s, collection: %s with cursor failed, "+
"will exit list immediately.", e.database, opts.collection)
close(opts.ch)
return true
}

reset()
opts.reset()
goto retry
}

byt, _ := json.Marshal(result.Addr().Interface())
oid := gjson.GetBytes(byt, "_id").String()
oidInfo := new(mongoID)
if err := bson.Unmarshal(rawDoc, &oidInfo); err != nil {
blog.Errorf("decode mongodb oid failed, err: %v, data: %s", err, rawDoc)
continue
}
opts.filter["_id"] = mapstr.MapStr{common.BKDBGT: oidInfo.Oid}

// send the event now
ch <- &types.Event{
Oid: oid,
Document: result.Interface(),
OperationType: types.Lister,
DocBytes: byt,
for _, taskID := range opts.taskIDs {
parsed, isValid := parseDataForTask(rawDoc, taskID, opts.taskFilterMap, opts.taskTypeMap)
if !isValid {
continue
}

parsed.Oid = oidInfo.Oid.Hex()
parsed.OperationType = types.Lister
parsed.Collection = opts.collection
opts.ch <- parsed
}
}

if err := cursor.Err(); err != nil {
blog.Errorf("list watch operation, but list db: %s, collection: %s with cursor failed, will *retry later*, err: %v",
e.database, opts.Collection, err)
blog.Errorf("list watch operation, but list db: %s, collection: %s with cursor failed, "+
"will *retry later*, err: %v", e.database, opts.collection, err)
cursor.Close(ctx)
if !withRetry {
blog.Warnf("list watch operation, but list db: %s, collection: %s with cursor failed, will exit list immediately.",
e.database, opts.Collection)
close(ch)
return
if !opts.withRetry {
blog.Warnf("list watch operation, but list db: %s, collection: %s with cursor failed, "+
"will exit list immediately.", e.database, opts.collection)
close(opts.ch)
return true
}
reset()
opts.reset()
goto retry
}
cursor.Close(ctx)
}

// tell the user that the list operation has already done.
// we only send for once.
ch <- &types.Event{
Oid: "",
Document: reflect.New(reflect.TypeOf(opts.EventStruct)).Elem().Interface(),
OperationType: types.ListDone,
if !hasData {
return false
}
}

}
Loading

0 comments on commit c457c95

Please sign in to comment.