diff --git a/src/storage/stream/event/event.go b/src/storage/stream/event/event.go index e7d87b7444..6d256e5a86 100644 --- a/src/storage/stream/event/event.go +++ b/src/storage/stream/event/event.go @@ -10,18 +10,19 @@ * limitations under the License. */ -// Package event TODO +// Package event defines event watch logics package event import "go.mongodb.org/mongo-driver/mongo" -// Event TODO +// Event is the struct for event watch type Event struct { database string + DBName string client *mongo.Client } -// NewEvent TODO -func NewEvent(client *mongo.Client, db string) (*Event, error) { - return &Event{client: client, database: db}, nil +// NewEvent new Event +func NewEvent(client *mongo.Client, db, dbName string) (*Event, error) { + return &Event{client: client, database: db, DBName: dbName}, nil } diff --git a/src/storage/stream/event/list.go b/src/storage/stream/event/list.go index 2700be2f05..6ae9fea4b5 100644 --- a/src/storage/stream/event/list.go +++ b/src/storage/stream/event/list.go @@ -14,14 +14,17 @@ package event import ( "context" - "fmt" "reflect" "time" + "configcenter/pkg/filter" + "configcenter/src/common" "configcenter/src/common/blog" - "configcenter/src/common/json" + "configcenter/src/common/mapstr" "configcenter/src/storage/stream/types" - "github.com/tidwall/gjson" + + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo/options" ) @@ -32,123 +35,220 @@ func (e *Event) List(ctx context.Context, opts *types.ListOptions) (ch chan *typ if err := opts.CheckSetDefault(); err != nil { return nil, err } - // prepare for list all the data. - totalCnt, err := e.client.Database(e.database). - Collection(opts.Collection). - CountDocuments(ctx, opts.Filter) - if err != nil { - return nil, fmt.Errorf("count db %s, collection: %s with filter: %+v failed, err: %v", - e.database, opts.Collection, opts.Filter, err) + + collOpts := make(map[string]types.WatchCollOptions) + for id, collOpt := range opts.CollOpts { + collOpts[id] = types.WatchCollOptions{CollectionOptions: collOpt} + } + + listOpts := &listOptions{ + collOptsInfo: parseCollOpts(collOpts), + pageSize: opts.PageSize, } eventChan := make(chan *types.Event, types.DefaultEventChanSize) go func() { - e.lister(ctx, false, totalCnt, opts, eventChan) + e.lister(ctx, false, listOpts, eventChan) }() return eventChan, nil } -// lister is try to list data with filter. withRetry is to control whether you need to retry list when an error encountered. -func (e *Event) lister(ctx context.Context, withRetry bool, cnt int64, opts *types.ListOptions, ch chan *types.Event) { +type listOptions struct { + collOptsInfo *parsedCollOptsInfo + pageSize *int +} - pageSize := *opts.PageSize +// lister is try to list data with filter. withRetry is to control whether you need to retry list when an error encountered. +func (e *Event) lister(ctx context.Context, withRetry bool, opts *listOptions, ch chan *types.Event) { reset := func() { // sleep a while and retry later time.Sleep(3 * time.Second) } - for start := 0; start < int(cnt); start += pageSize { + + // list collections + var collections []string + for { + var err error + collections, err = e.client.Database(e.database).ListCollectionNames(ctx, bson.M{}) + if err != nil { + if withRetry { + blog.Errorf("list db: %s collections failed, will *retry later*, err: %v", e.database, err) + reset() + continue + } + blog.Errorf("list db: %s collections failed, will exit list immediately.", e.database) + return + } + break + } + + for _, collection := range collections { + // get collection related task ids, fields, filters + taskIDs, fields, filters := make([]string, 0), make([]string, 0), make([]filter.RuleFactory, 0) + needAllFilter, needAllFields := false, false + for collRegex, regex := range opts.collOptsInfo.collRegexMap { + if !regex.MatchString(collection) { + continue + } + + taskIDs = append(taskIDs, opts.collOptsInfo.collRegexTasksMap[collRegex]...) + if opts.collOptsInfo.collCondMap[collRegex] == nil { + needAllFilter = true + } else if !needAllFilter { + filters = append(filters, opts.collOptsInfo.collCondMap[collRegex]) + } + if len(opts.collOptsInfo.collFieldsMap[collRegex]) == 0 { + needAllFields = true + } else if !needAllFields { + fields = append(fields, opts.collOptsInfo.collFieldsMap[collRegex]...) + } + } + + if len(taskIDs) == 0 { + continue + } findOpts := new(options.FindOptions) - findOpts.SetSkip(int64(start)) - findOpts.SetLimit(int64(pageSize)) - projection := make(map[string]int) - if len(opts.Fields) != 0 { - for _, field := range opts.Fields { + findOpts.SetLimit(int64(*opts.pageSize)) + if !needAllFields && len(fields) != 0 { + projection := make(map[string]int) + for _, field := range fields { if len(field) <= 0 { continue } projection[field] = 1 } + projection["_id"] = 1 findOpts.Projection = projection } + cond := make(mapstr.MapStr) + if !needAllFilter && len(filters) != 0 { + expr := filter.Expression{RuleFactory: &filter.CombinedRule{Condition: filter.Or, Rules: filters}} + var err error + cond, err = expr.ToMgo() + if err != nil { + if withRetry { + blog.Errorf("convert coll %s filter(%s) to mongo failed", collection, expr) + continue + } + blog.Errorf("convert coll %s filter(%s) to mongo failed, will exit list immediately.", collection, expr) + return + } + } + + // list data from this collection + needReturn := e.listOneColl(ctx, &listOneCollOptions{collection: collection, taskIDs: taskIDs, filter: cond, + findOpts: findOpts, ch: ch, withRetry: withRetry, reset: reset}) + if needReturn { + return + } + } + + // tell the user that the list operation has already done. + // we only send for once. + ch <- &types.Event{ + OperationType: types.ListDone, + } +} + +type listOneCollOptions struct { + collection string + taskIDs []string + filter mapstr.MapStr + findOpts *options.FindOptions + taskTypeMap map[string]reflect.Type + taskFilterMap map[string]*filter.Expression + ch chan *types.Event + withRetry bool + reset func() +} + +type mongoID struct { + Oid primitive.ObjectID `bson:"_id"` +} + +// listOneColl try to list data with filter from one collection, returns if list operation needs to exit +func (e *Event) listOneColl(ctx context.Context, opts *listOneCollOptions) bool { + for { retry: cursor, err := e.client.Database(e.database). - Collection(opts.Collection). - Find(ctx, opts.Filter, findOpts) + Collection(opts.collection). + Find(ctx, opts.filter, opts.findOpts) if err != nil { - blog.Errorf("list watch operation, but list db: %s, collection: %s failed, will *retry later*, err: %v", - e.database, opts.Collection, err) - reset() + blog.Errorf("list db: %s, coll: %s failed, will *retry later*, err: %v", e.database, opts.collection, err) + opts.reset() continue } + hasData := false for cursor.Next(ctx) { + hasData = true select { case <-ctx.Done(): blog.Errorf("received stopped lister signal, stop list db: %s, collection: %s, err: %v", e.database, - opts.Collection, ctx.Err()) - return + opts.collection, ctx.Err()) + return true default: - } - // create a new event struct for use - result := reflect.New(reflect.TypeOf(opts.EventStruct)).Elem() - err := cursor.Decode(result.Addr().Interface()) - if err != nil { - blog.Errorf("list watch operation, but list db: %s, collection: %s with cursor failed, will *retry later*, err: %v", - e.database, opts.Collection, err) + rawDoc := bson.Raw{} + if err := cursor.Decode(&rawDoc); err != nil { + blog.Errorf("list watch operation, but list db: %s, collection: %s with cursor failed, "+ + "will *retry later*, err: %v", e.database, opts.collection, err) cursor.Close(ctx) - if !withRetry { - blog.Warnf("list watch operation, but list db: %s, collection: %s with cursor failed, will exit list immediately.", - e.database, opts.Collection) - close(ch) - return + if !opts.withRetry { + blog.Warnf("list watch operation, but list db: %s, collection: %s with cursor failed, "+ + "will exit list immediately.", e.database, opts.collection) + close(opts.ch) + return true } - reset() + opts.reset() goto retry } - byt, _ := json.Marshal(result.Addr().Interface()) - oid := gjson.GetBytes(byt, "_id").String() + oidInfo := new(mongoID) + if err := bson.Unmarshal(rawDoc, &oidInfo); err != nil { + blog.Errorf("decode mongodb oid failed, err: %v, data: %s", err, rawDoc) + continue + } + opts.filter["_id"] = mapstr.MapStr{common.BKDBGT: oidInfo.Oid} - // send the event now - ch <- &types.Event{ - Oid: oid, - Document: result.Interface(), - OperationType: types.Lister, - DocBytes: byt, + for _, taskID := range opts.taskIDs { + parsed, isValid := parseDataForTask(rawDoc, taskID, opts.taskFilterMap, opts.taskTypeMap) + if !isValid { + continue + } + + parsed.Oid = oidInfo.Oid.Hex() + parsed.OperationType = types.Lister + parsed.Collection = opts.collection + opts.ch <- parsed } } if err := cursor.Err(); err != nil { - blog.Errorf("list watch operation, but list db: %s, collection: %s with cursor failed, will *retry later*, err: %v", - e.database, opts.Collection, err) + blog.Errorf("list watch operation, but list db: %s, collection: %s with cursor failed, "+ + "will *retry later*, err: %v", e.database, opts.collection, err) cursor.Close(ctx) - if !withRetry { - blog.Warnf("list watch operation, but list db: %s, collection: %s with cursor failed, will exit list immediately.", - e.database, opts.Collection) - close(ch) - return + if !opts.withRetry { + blog.Warnf("list watch operation, but list db: %s, collection: %s with cursor failed, "+ + "will exit list immediately.", e.database, opts.collection) + close(opts.ch) + return true } - reset() + opts.reset() goto retry } cursor.Close(ctx) - } - // tell the user that the list operation has already done. - // we only send for once. - ch <- &types.Event{ - Oid: "", - Document: reflect.New(reflect.TypeOf(opts.EventStruct)).Elem().Interface(), - OperationType: types.ListDone, + if !hasData { + return false + } } - } diff --git a/src/storage/stream/event/list_watch.go b/src/storage/stream/event/list_watch.go index 9e178c9ba9..1b4b9b056d 100644 --- a/src/storage/stream/event/list_watch.go +++ b/src/storage/stream/event/list_watch.go @@ -18,6 +18,7 @@ import ( "configcenter/src/common/blog" "configcenter/src/storage/stream/types" + "go.mongodb.org/mongo-driver/bson/primitive" ) @@ -30,7 +31,7 @@ func (e *Event) ListWatch(ctx context.Context, opts *types.ListWatchOptions) (*t eventChan := make(chan *types.Event, types.DefaultEventChanSize) go func() { - pipeline, streamOptions := generateOptions(&opts.Options) + pipeline, streamOptions, collOptsInfo := generateOptions(&opts.Options) // TODO: should use the mongodb cluster timestamp, if the time is not synchronise with // mongodb cluster time, then we may have to lost some events. @@ -50,70 +51,24 @@ func (e *Event) ListWatch(ctx context.Context, opts *types.ListWatchOptions) (*t // we watch the stream at first, so that we can know if we can watch success. // and, we do not read the event stream immediately, we wait until all the data // has been listed from database. - stream, err := e.client.Database(e.database). - Collection(opts.Collection). - Watch(ctx, pipeline, streamOptions) - if err != nil && isFatalError(err) { - // TODO: send alarm immediately. - blog.Errorf("mongodb watch collection: %s got a fatal error, skip resume token and retry, err: %v", - opts.Collection, err) - // reset the resume token, because we can not use the former resume token to watch success for now. - streamOptions.StartAfter = nil - opts.StartAfterToken = nil - // cause we have already got a fatal error, we can not try to watch from where we lost. - // so re-watch from 1 minutes ago to avoid lost events. - // Note: apparently, we may got duplicate events with this re-watch - startAtTime := uint32(time.Now().Unix()) - 60 - streamOptions.StartAtOperationTime = &primitive.Timestamp{ - T: startAtTime, - I: 0, - } - opts.StartAtTime = &types.TimeStamp{Sec: startAtTime} - - if opts.WatchFatalErrorCallback != nil { - err := opts.WatchFatalErrorCallback(types.TimeStamp{Sec: startAtTime}) - if err != nil { - blog.Errorf("do watch fatal error callback for coll %s failed, err: %v", opts.Collection, err) - } - } - - stream, err = e.client. - Database(e.database). - Collection(opts.Collection). - Watch(ctx, pipeline, streamOptions) - } - + stream, streamOptions, watchOpts, err := e.watch(ctx, pipeline, streamOptions, &opts.Options) if err != nil { blog.Fatalf("mongodb watch failed with conf: %+v, err: %v", *opts, err) } - // prepare for list all the data. - totalCnt, err := e.client.Database(e.database). - Collection(opts.Collection). - CountDocuments(ctx, opts.Filter) - if err != nil { - // close the event stream. - stream.Close(ctx) - - blog.Fatalf("count db %s, collection: %s with filter: %+v failed, err: %v", - e.database, opts.Collection, opts.Filter, err) - } - - listOptions := &types.ListOptions{ - Filter: opts.Filter, - EventStruct: opts.EventStruct, - Collection: opts.Collection, - PageSize: opts.PageSize, + listOpts := &listOptions{ + collOptsInfo: collOptsInfo, + pageSize: opts.PageSize, } go func() { // list all the data from the collection and send it as an event now. - e.lister(ctx, true, totalCnt, listOptions, eventChan) + e.lister(ctx, true, listOpts, eventChan) select { case <-ctx.Done(): - blog.Errorf("received stopped watch signal, stop list db: %s, collection: %s, err: %v", e.database, - opts.Collection, ctx.Err()) + blog.Errorf("received stopped watch signal, stop list db: %s, name: %s, err: %v", e.database, e.DBName, + ctx.Err()) return default: @@ -121,7 +76,15 @@ func (e *Event) ListWatch(ctx context.Context, opts *types.ListWatchOptions) (*t // all the data has already listed and send the event. // now, it's time to watch the event stream. - e.loopWatch(ctx, &opts.Options, streamOptions, stream, pipeline, eventChan) + loopOpts := &loopWatchOpts{ + Options: watchOpts, + streamOptions: streamOptions, + stream: stream, + pipeline: pipeline, + eventChan: eventChan, + collOptsInfo: collOptsInfo, + } + e.loopWatch(ctx, loopOpts) }() }() diff --git a/src/storage/stream/event/utils.go b/src/storage/stream/event/utils.go index f2ce40f350..cca6e8fdbd 100644 --- a/src/storage/stream/event/utils.go +++ b/src/storage/stream/event/utils.go @@ -14,67 +14,48 @@ package event import ( "reflect" + "regexp" + "strings" + "configcenter/pkg/filter" + "configcenter/src/common" + "configcenter/src/common/blog" + "configcenter/src/common/json" "configcenter/src/storage/stream/types" + "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" ) -var ( - esType = reflect.TypeOf(types.EventStream{}) -) - -// newEventStruct construct a change stream event data structure -// which can help us to adjust different kind of collection structure. -func newEventStruct(typ reflect.Type) reflect.Value { - f := reflect.StructOf([]reflect.StructField{ - { - Name: "EventStream", - Type: esType, - Anonymous: true, - Tag: `bson:",inline"`, - }, - { - Name: "FullDocument", - Type: typ, - Anonymous: false, - Tag: `bson:"fullDocument"`, - }, - }) - return reflect.New(f).Elem() -} - const fullDocPrefix = "fullDocument." var eventFields = []string{"_id", "operationType", "clusterTime", "ns", "documentKey", "updateDescription"} -func generateOptions(opts *types.Options) (mongo.Pipeline, *options.ChangeStreamOptions) { - - fields := make([]bson.E, 0) - if opts.OperationType != nil { - fields = append(fields, bson.E{Key: "operationType", Value: *opts.OperationType}) - } +func generateOptions(opts *types.Options) (mongo.Pipeline, *options.ChangeStreamOptions, *parsedCollOptsInfo) { + collOptsInfo := parseCollOpts(opts.CollOpts) - if opts.Collection == "" { - fields = append(fields, bson.E{Key: "ns.coll", Value: opts.CollectionFilter}) - } + allFilters := genWatchFilter(collOptsInfo.collCondMap, collOptsInfo.collOpTypeMap) - if opts.Filter != nil { - for k, v := range opts.Filter { - fields = append(fields, bson.E{Key: fullDocPrefix + k, Value: v}) + // if any options needs all fields, do not filter fields + allFields := make([]string, 0) + for _, fields := range collOptsInfo.collFieldsMap { + if len(fields) == 0 { + allFields = make([]string, 0) + break } + allFields = append(allFields, fields...) } var pipeline mongo.Pipeline - if len(fields) != 0 { - pipeline = []bson.D{{{Key: "$match", Value: fields}}} + if len(allFilters) != 0 { + pipeline = []bson.D{{{Key: "$match", Value: allFilters}}} } - if len(opts.Fields) != 0 { + if len(allFields) != 0 { project := make(map[string]int) - for _, f := range opts.Fields { + for _, f := range allFields { project[fullDocPrefix+f] = 1 } @@ -122,5 +103,154 @@ func generateOptions(opts *types.Options) (mongo.Pipeline, *options.ChangeStream var batchSize int32 = 2000 streamOptions.BatchSize = &batchSize - return pipeline, streamOptions + return pipeline, streamOptions, collOptsInfo +} + +// parsedCollOptsInfo is the parsed watch task and collection info generated by collection options +type parsedCollOptsInfo struct { + // taskTypeMap is watch task id to event data type map + taskTypeMap map[string]reflect.Type + // taskFilterMap is watch task id to filter map + taskFilterMap map[string]*filter.Expression + // collRegexMap is collection regex string value to collection regex expression map + collRegexMap map[string]*regexp.Regexp + // collRegexTasksMap is collection regex to watch task ids map + collRegexTasksMap map[string][]string + // collCondMap is collection regex to merged data filter condition map + collCondMap map[string]*filter.Expression + // collOpTypeMap is collection regex to merged operation types map + collOpTypeMap map[string][]types.OperType + // collFieldsMap is collection regex to merged fields map + collFieldsMap map[string][]string +} + +// parseCollOpts parse collection options to parsedCollOptsInfo +func parseCollOpts(collOpts map[string]types.WatchCollOptions) *parsedCollOptsInfo { + // generate watch task and collection mapping info by collection filter watch options + info := &parsedCollOptsInfo{ + taskTypeMap: make(map[string]reflect.Type), + taskFilterMap: make(map[string]*filter.Expression), + collRegexMap: make(map[string]*regexp.Regexp), + collRegexTasksMap: make(map[string][]string), + collCondMap: make(map[string]*filter.Expression), + collOpTypeMap: make(map[string][]types.OperType), + collFieldsMap: make(map[string][]string), + } + + for id, opt := range collOpts { + // generate watch task and collection mapping info + info.taskTypeMap[id] = reflect.Indirect(reflect.ValueOf(opt.EventStruct)).Type() + info.taskFilterMap[id] = opt.Filter + regex := opt.CollectionFilter.Regex + info.collRegexMap[regex] = regexp.MustCompile(regex) + info.collRegexTasksMap[regex] = append(info.collRegexTasksMap[regex], id) + + // merge collection condition with the same collection regex + cond := opt.Filter + collCond, exists := info.collCondMap[regex] + if exists { + if collCond == nil { + cond = nil + } else if cond != nil { + cond = &filter.Expression{RuleFactory: &filter.CombinedRule{Condition: filter.Or, + Rules: []filter.RuleFactory{collCond, cond}}} + } + } + info.collCondMap[regex] = cond + + // select all operation type if any options needs all types, otherwise, return types specified by all options + collOpTypes, exists := info.collOpTypeMap[regex] + if !exists || len(collOpTypes) != 0 { + if opt.OperationType == nil { + info.collOpTypeMap[regex] = make([]types.OperType, 0) + } else { + info.collOpTypeMap[regex] = append(collOpTypes, *opt.OperationType) + } + } + + // select all fields if any options needs all fields, otherwise, return fields specified by all options + collFields, exists := info.collFieldsMap[regex] + if !exists || len(collFields) != 0 { + if len(opt.Fields) == 0 { + info.collFieldsMap[regex] = make([]string, 0) + } else { + info.collFieldsMap[regex] = append(collFields, opt.Fields...) + } + } + } + + return info +} + +// genWatchFilter generate watch filter by collection to condition and operation type map +func genWatchFilter(collCondMap map[string]*filter.Expression, collOpTypeMap map[string][]types.OperType) [][]bson.E { + allFilters := make([][]bson.E, 0) + noFilterCollRegexes := make([]string, 0) + for regex, cond := range collCondMap { + // if the collection regex has no condition and no operation type filter, add to noFilterCollRegexes + if len(collOpTypeMap[regex]) == 0 && cond == nil { + noFilterCollRegexes = append(noFilterCollRegexes, regex) + continue + } + + // generate filter for collection regex with special condition + filters := []bson.E{{Key: "ns.coll", Value: bson.M{common.BKDBLIKE: regex}}} + + if len(collOpTypeMap[regex]) > 0 { + filters = append(filters, bson.E{Key: "operationType", Value: bson.M{common.BKDBIN: collOpTypeMap[regex]}}) + } + + if cond != nil { + mongoFilter, err := cond.ToMgo() + if err != nil { + blog.Errorf("convert coll(%s) filter(%s) to mongo filter failed, err: %v, skip", regex, cond, err) + continue + } + for k, v := range mongoFilter { + filters = append(filters, bson.E{Key: fullDocPrefix + k, Value: v}) + } + } + + allFilters = append(allFilters, filters) + } + + // merge all no filter collection regexes to one collection regex filter + if len(noFilterCollRegexes) != 0 { + allFilters = append(allFilters, []bson.E{ + {Key: "ns.coll", Value: bson.M{common.BKDBLIKE: strings.Join(noFilterCollRegexes, "|")}}, + }) + } + return allFilters +} + +// parseDataForTask parse event data for task, returns event data and matched flag +func parseDataForTask(rawDoc bson.Raw, taskID string, taskFilterMap map[string]*filter.Expression, + taskTypeMap map[string]reflect.Type) (*types.Event, bool) { + + // decode event data and get json value + doc := reflect.New(taskTypeMap[taskID]).Interface() + if err := bson.Unmarshal(rawDoc, doc); err != nil { + blog.Errorf("decode to struct: %T failed, err: %v, data: %s", doc, err, rawDoc) + return nil, false + } + byt, _ := json.Marshal(doc) + + // check if event data matches watch filter + if expr, exists := taskFilterMap[taskID]; exists && expr != nil { + matched, err := expr.Match(filter.JsonString(byt)) + if err != nil { + blog.Errorf("check if event data(%s) matches watch filter(%s) failed, err: %v", string(byt), expr, err) + return nil, false + } + + if !matched { + return nil, false + } + } + + return &types.Event{ + Document: doc, + DocBytes: byt, + TaskID: taskID, + }, true } diff --git a/src/storage/stream/event/watch.go b/src/storage/stream/event/watch.go index 1d53149e11..2b034f9141 100644 --- a/src/storage/stream/event/watch.go +++ b/src/storage/stream/event/watch.go @@ -15,19 +15,18 @@ package event import ( "context" "errors" - "reflect" "strings" "time" "configcenter/src/common/blog" - "configcenter/src/common/json" "configcenter/src/storage/stream/types" + "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" ) -// Watch TODO +// Watch mongodb change stream events func (e *Event) Watch(ctx context.Context, opts *types.WatchOptions) (*types.Watcher, error) { if err := opts.CheckSetDefault(); err != nil { return nil, err @@ -35,61 +34,12 @@ func (e *Event) Watch(ctx context.Context, opts *types.WatchOptions) (*types.Wat eventChan := make(chan *types.Event, types.DefaultEventChanSize) go func() { - pipeline, streamOptions := generateOptions(&opts.Options) - - blog.InfoJSON("start watch with pipeline: %s, options: %s, stream options: %s", pipeline, opts, streamOptions) - - var stream *mongo.ChangeStream - var err error - - if opts.Collection != "" { - stream, err = e.client. - Database(e.database). - Collection(opts.Collection). - Watch(ctx, pipeline, streamOptions) - } else { - stream, err = e.client. - Database(e.database). - Watch(ctx, pipeline, streamOptions) - } - - if err != nil && isFatalError(err) { - // TODO: send alarm immediately. - blog.Errorf("mongodb watch collection: %s got a fatal error, skip resume token and retry, err: %v", - opts.Collection, err) - // reset the resume token, because we can not use the former resume token to watch success for now. - streamOptions.StartAfter = nil - opts.StartAfterToken = nil - // cause we have already got a fatal error, we can not try to watch from where we lost. - // so re-watch from 1 minutes ago to avoid lost events. - // Note: apparently, we may got duplicate events with this re-watch - startAtTime := uint32(time.Now().Unix()) - 60 - streamOptions.StartAtOperationTime = &primitive.Timestamp{ - T: startAtTime, - I: 0, - } - opts.StartAtTime = &types.TimeStamp{Sec: startAtTime} + pipeline, streamOptions, collOptsInfo := generateOptions(&opts.Options) - if opts.WatchFatalErrorCallback != nil { - err := opts.WatchFatalErrorCallback(types.TimeStamp{Sec: startAtTime}) - if err != nil { - blog.Errorf("do watch fatal error callback for coll %s failed, err: %v", opts.Collection, err) - } - } - - blog.InfoJSON("start watch with pipeline: %s, options: %s, stream options: %s", pipeline, opts, streamOptions) - if opts.Collection != "" { - stream, err = e.client. - Database(e.database). - Collection(opts.Collection). - Watch(ctx, pipeline, streamOptions) - } else { - stream, err = e.client. - Database(e.database). - Watch(ctx, pipeline, streamOptions) - } - } + blog.InfoJSON("start watch db %s with pipeline: %s, options: %s, stream options: %s", e.DBName, pipeline, opts, + streamOptions) + stream, streamOptions, watchOpts, err := e.watch(ctx, pipeline, streamOptions, &opts.Options) if err != nil { if errors.Is(err, context.Canceled) { // if error is context cancelled, then loop watch will exit at the same time @@ -98,8 +48,15 @@ func (e *Event) Watch(ctx context.Context, opts *types.WatchOptions) (*types.Wat blog.Fatalf("mongodb watch failed with conf: %+v, err: %v", *opts, err) } - go e.loopWatch(ctx, &opts.Options, streamOptions, stream, pipeline, eventChan) - + loopOpts := &loopWatchOpts{ + Options: watchOpts, + streamOptions: streamOptions, + stream: stream, + pipeline: pipeline, + eventChan: eventChan, + collOptsInfo: collOptsInfo, + } + go e.loopWatch(ctx, loopOpts) }() watcher := &types.Watcher{ @@ -108,173 +65,108 @@ func (e *Event) Watch(ctx context.Context, opts *types.WatchOptions) (*types.Wat return watcher, nil } -func (e *Event) loopWatch(ctx context.Context, - opts *types.Options, - streamOptions *options.ChangeStreamOptions, - stream *mongo.ChangeStream, - pipeline mongo.Pipeline, - eventChan chan *types.Event) { +func (e *Event) watch(ctx context.Context, pipeline mongo.Pipeline, streamOptions *options.ChangeStreamOptions, + opts *types.Options) (*mongo.ChangeStream, *options.ChangeStreamOptions, *types.Options, error) { + + stream, err := e.client. + Database(e.database). + Watch(ctx, pipeline, streamOptions) + + if err != nil && isFatalError(err) { + // TODO: send alarm immediately. + blog.Errorf("mongodb watch db: %s got a fatal error, skip resume token and retry, err: %v", e.DBName, err) + // reset the resume token, because we can not use the former resume token to watch success for now. + streamOptions.StartAfter = nil + opts.StartAfterToken = nil + // cause we have already got a fatal error, we can not try to watch from where we lost. + // so re-watch from 1 minutes ago to avoid lost events. + // Note: apparently, we may got duplicate events with this re-watch + startAtTime := uint32(time.Now().Unix()) - 60 + streamOptions.StartAtOperationTime = &primitive.Timestamp{ + T: startAtTime, + I: 0, + } + opts.StartAtTime = &types.TimeStamp{Sec: startAtTime} + + if opts.WatchFatalErrorCallback != nil { + err := opts.WatchFatalErrorCallback(types.TimeStamp{Sec: startAtTime}) + if err != nil { + blog.Errorf("do watch fatal error callback for db %s failed, err: %v", e.DBName, err) + } + } + blog.InfoJSON("start watch db %s with pipeline: %s, options: %s, stream options: %s", e.DBName, pipeline, + opts, streamOptions) + + stream, err = e.client. + Database(e.database). + Watch(ctx, pipeline, streamOptions) + } + + return stream, streamOptions, opts, err +} + +type loopWatchOpts struct { + *types.Options + streamOptions *options.ChangeStreamOptions + stream *mongo.ChangeStream + pipeline mongo.Pipeline + eventChan chan *types.Event + currentToken types.EventToken + collOptsInfo *parsedCollOptsInfo +} + +func (e *Event) loopWatch(ctx context.Context, opts *loopWatchOpts) { retry := false - currentToken := types.EventToken{Data: ""} - typ := reflect.Indirect(reflect.ValueOf(opts.EventStruct)).Type() + opts.currentToken = types.EventToken{Data: ""} + + e.setCleaner(ctx, opts.eventChan) - e.setCleaner(ctx, eventChan, opts.Collection) + // init collection to task ids map + collTasksMap := make(map[string][]string) for { // no events, try cancel watch here. select { case <-ctx.Done(): - blog.Warnf("received stopped loop watch signal, stop watch db: %s, collection: %s, err: %v", e.database, - opts.Collection, ctx.Err()) + blog.Warnf("received stopped loop watch signal, stop watch db: %s, name: %s, err: %v", e.database, e.DBName, + ctx.Err()) - if stream != nil { - stream.Close(context.Background()) + if opts.stream != nil { + opts.stream.Close(context.Background()) } return default: - } if retry { - time.Sleep(5 * time.Second) - if len(currentToken.Data) != 0 { - // if error occurs, then retry watch and start from the last token. - // so that we can continue the event from where it just broken. - streamOptions.StartAtOperationTime = nil - streamOptions.SetStartAfter(currentToken) - } - - // if start at operation time and start after token is both set, use resume token instead of start time - if streamOptions.StartAtOperationTime != nil && streamOptions.StartAfter != nil { - blog.Infof("resume token and time is both set, discard the resume time, option: %+v", streamOptions) - streamOptions.StartAtOperationTime = nil - } - - blog.InfoJSON("retry watch with pipeline: %s, opts: %s, stream opts: %s", pipeline, opts, streamOptions) - - var err error - if opts.Collection != "" { - stream, err = e.client. - Database(e.database). - Collection(opts.Collection). - Watch(ctx, pipeline, streamOptions) - } else { - stream, err = e.client. - Database(e.database). - Watch(ctx, pipeline, streamOptions) - } - if err != nil { - if isFatalError(err) { - // TODO: send alarm immediately. - blog.Errorf("mongodb watch collection: %s got a fatal error, skip resume token and retry, err: %v", - opts.Collection, err) - // reset the resume token, because we can not use the former resume token to watch success for now. - streamOptions.StartAfter = nil - opts.StartAfterToken = nil - // because we have already got a fatal error, we can not try to watch from where we lost. - // so re-watch from 1 minutes ago to avoid lost events. - // Note: apparently, we may got duplicate events with this re-watch - startAtTime := uint32(time.Now().Unix()) - 60 - streamOptions.StartAtOperationTime = &primitive.Timestamp{ - T: startAtTime, - I: 0, - } - opts.StartAtTime = &types.TimeStamp{Sec: startAtTime} - currentToken.Data = "" - - if opts.WatchFatalErrorCallback != nil { - err := opts.WatchFatalErrorCallback(types.TimeStamp{Sec: startAtTime}) - if err != nil { - blog.Errorf("do watch fatal error callback for coll %s failed, err: %v", opts.Collection, err) - } - } - } - - blog.ErrorJSON("mongodb watch %s failed with opts: %s, pipeline: %s, streamOpts: %s, err: %s", - opts.Collection, opts, pipeline, streamOptions, err) - - retry = true + opts, retry = e.retryWatch(ctx, opts) + if retry { continue } - - // re-watch success, now we clean start at operation time options - streamOptions.StartAtOperationTime = nil } - for stream.Next(ctx) { + for opts.stream.Next(ctx) { // still have events, try cancel steam here. select { case <-ctx.Done(): - blog.Warnf("received stopped loop watch signal, stop loop next, watch db: %s, collection: %s, err: %v", - e.database, opts.Collection, ctx.Err()) - stream.Close(context.Background()) + blog.Warnf("received stopped loop watch signal, stop loop next, watch db: %s, db name: %s, err: %v", + e.database, e.DBName, ctx.Err()) + opts.stream.Close(context.Background()) return default: - - } - - newStruct := newEventStruct(typ) - if err := stream.Decode(newStruct.Addr().Interface()); err != nil { - blog.Errorf("watch collection %s, but decode to event struct: %v failed, err: %v", - opts.Collection, reflect.TypeOf(opts.EventStruct), err) - continue } - base := newStruct.Field(0).Interface().(types.EventStream) - - // if we received a invalid event, which is caused by collection drop, rename or drop database operation, - // we have to try re-watch again. otherwise, this may cause this process CPU high because of continue - // for loop cursor. - // https://docs.mongodb.com/manual/reference/change-events/#invalidate-event - if base.OperationType == types.Invalidate { - blog.ErrorJSON("mongodb watch received a invalid event, will retry watch again, options: %s", *opts) - - // clean the last resume token to force the next try watch from the beginning. otherwise we will - // receive the invalid event again. - streamOptions.StartAfter = nil - opts.StartAfterToken = nil - // cause we have already got a fatal error, we can not try to watch from where we lost. - // so re-watch from 1 minutes ago to avoid lost events. - // Note: apparently, we may got duplicate events with this re-watch - startAtTime := uint32(time.Now().Unix()) - 60 - streamOptions.StartAtOperationTime = &primitive.Timestamp{ - T: startAtTime, - I: 0, - } - opts.StartAtTime = &types.TimeStamp{Sec: startAtTime} - currentToken.Data = "" - - stream.Close(ctx) - retry = true + opts, retry = e.handleStreamEvent(ctx, opts, collTasksMap) + if retry { break } - - currentToken.Data = base.Token.Data - byt, _ := json.Marshal(newStruct.Field(1).Addr().Interface()) - - eventChan <- &types.Event{ - Oid: base.DocumentKey.ID.Hex(), - OperationType: base.OperationType, - Document: newStruct.Field(1).Addr().Interface(), - DocBytes: byt, - Collection: base.Namespace.Collection, - ClusterTime: types.TimeStamp{ - Sec: base.ClusterTime.T, - Nano: base.ClusterTime.I, - }, - Token: base.Token, - ChangeDesc: &types.ChangeDescription{ - UpdatedFields: base.UpdateDesc.UpdatedFields, - RemovedFields: base.UpdateDesc.RemovedFields, - }, - } } - if err := stream.Err(); err != nil { + if err := opts.stream.Err(); err != nil { blog.ErrorJSON("mongodb watch encountered a error, conf: %s, err: %s", *opts, err) - stream.Close(ctx) + opts.stream.Close(ctx) retry = true continue } @@ -283,19 +175,18 @@ func (e *Event) loopWatch(ctx context.Context, // setCleaner set up a monitor to close the cursor when the context is canceled. // this is useful to release stream resource when this watch is canceled outside with context is canceled. -func (e *Event) setCleaner(ctx context.Context, eventChan chan *types.Event, coll string) { +func (e *Event) setCleaner(ctx context.Context, eventChan chan *types.Event) { go func() { select { case <-ctx.Done(): - blog.Warnf("received stopped loop watch collection: %s signal, close cursor now, err: %v", - coll, ctx.Err()) + blog.Warnf("received stopped loop watch db: %s signal, close cursor now, err: %v", e.DBName, ctx.Err()) // even though we may already close the stream, but there may still have events in the stream's // batch cursor, so we need to consume a event, so that we can release the stream resource select { // try consume a event, so that stream.Next(ctx) can be called to release the stream resources. case <-eventChan: - blog.Warnf("received stopped loop watch collection: %s signal, consumed a event", coll) + blog.Warnf("received stopped loop watch db: %s signal, consumed a event", e.DBName) default: // no events, and stream resource will be recycled in the next round. @@ -306,8 +197,160 @@ func (e *Event) setCleaner(ctx context.Context, eventChan chan *types.Event, col }() } -// isFatalError TODO -// if watch encountered a fatal error, we should watch without resume token, which means from now. +func (e *Event) retryWatch(ctx context.Context, opts *loopWatchOpts) (*loopWatchOpts, bool) { + streamOptions := opts.streamOptions + + time.Sleep(5 * time.Second) + if len(opts.currentToken.Data) != 0 { + // if error occurs, then retry watch and start from the last token. + // so that we can continue the event from where it just broken. + streamOptions.StartAtOperationTime = nil + streamOptions.SetStartAfter(opts.currentToken) + } + + // if start at operation time and start after token is both set, use resume token instead of start time + if streamOptions.StartAtOperationTime != nil && streamOptions.StartAfter != nil { + blog.Infof("resume token and time is both set, discard the resume time, option: %+v", streamOptions) + streamOptions.StartAtOperationTime = nil + } + + blog.InfoJSON("retry watch db %s with pipeline: %s, opts: %s, stream opts: %s", e.DBName, opts.pipeline, + opts.Options, streamOptions) + + var err error + opts.stream, err = e.client. + Database(e.database). + Watch(ctx, opts.pipeline, streamOptions) + if err != nil { + if isFatalError(err) { + // TODO: send alarm immediately. + blog.Errorf("mongodb watch db: %s got a fatal error, skip resume token and retry, err: %v", + e.DBName, err) + // reset the resume token, because we can not use the former resume token to watch success for now. + streamOptions.StartAfter = nil + opts.StartAfterToken = nil + // because we have already got a fatal error, we can not try to watch from where we lost. + // so re-watch from 1 minutes ago to avoid lost events. + // Note: apparently, we may got duplicate events with this re-watch + startAtTime := uint32(time.Now().Unix()) - 60 + streamOptions.StartAtOperationTime = &primitive.Timestamp{ + T: startAtTime, + I: 0, + } + opts.StartAtTime = &types.TimeStamp{Sec: startAtTime} + opts.currentToken.Data = "" + + if opts.WatchFatalErrorCallback != nil { + err := opts.WatchFatalErrorCallback(types.TimeStamp{Sec: startAtTime}) + if err != nil { + blog.Errorf("do watch fatal error callback for db %s failed, err: %v", e.DBName, err) + } + } + } + + blog.ErrorJSON("mongodb watch db %s failed with opts: %s, pipeline: %s, streamOpts: %s, err: %s", + e.DBName, opts, opts.pipeline, streamOptions, err) + return nil, true + } + + // re-watch success, now we clean start at operation time options + streamOptions.StartAtOperationTime = nil + return opts, false +} + +func (e *Event) handleStreamEvent(ctx context.Context, opts *loopWatchOpts, collTasksMap map[string][]string) ( + *loopWatchOpts, bool) { + + event := new(types.RawEvent) + if err := opts.stream.Decode(event); err != nil { + blog.Errorf("watch db %s, but decode to raw event struct failed, err: %v", e.DBName, err) + return opts, true + } + + // if we received a invalid event, which is caused by collection drop, rename or drop database operation, + // we have to try re-watch again. otherwise, this may cause this process CPU high because of continue + // for loop cursor. + // https://docs.mongodb.com/manual/reference/change-events/#invalidate-event + if event.EventStream.OperationType == types.Invalidate { + blog.ErrorJSON("mongodb watch received a invalid event, will retry watch again, options: %s", *opts) + + // clean the last resume token to force the next try watch from the beginning. otherwise we will + // receive the invalid event again. + opts.streamOptions.StartAfter = nil + opts.StartAfterToken = nil + // cause we have already got a fatal error, we can not try to watch from where we lost. + // so re-watch from 1 minutes ago to avoid lost events. + // Note: apparently, we may got duplicate events with this re-watch + startAtTime := uint32(time.Now().Unix()) - 60 + opts.streamOptions.StartAtOperationTime = &primitive.Timestamp{ + T: startAtTime, + I: 0, + } + opts.StartAtTime = &types.TimeStamp{Sec: startAtTime} + opts.currentToken.Data = "" + + opts.stream.Close(ctx) + return opts, true + } + + opts.currentToken.Data = event.EventStream.Token.Data + + e.parseEvent(event, opts.eventChan, opts.collOptsInfo, collTasksMap) + + return opts, false +} + +func (e *Event) parseEvent(event *types.RawEvent, eventChan chan *types.Event, collOptsInfo *parsedCollOptsInfo, + collTasksMap map[string][]string) { + + base := event.EventStream + + // get the event task ids matching the collection name + taskIDs, exists := collTasksMap[base.Namespace.Collection] + if !exists { + for collRegex, regex := range collOptsInfo.collRegexMap { + if regex.MatchString(base.Namespace.Collection) { + taskIDs = append(taskIDs, collOptsInfo.collRegexTasksMap[collRegex]...) + } + } + collTasksMap[base.Namespace.Collection] = taskIDs + } + + if len(taskIDs) == 0 { + blog.Errorf("watch db %s, but get invalid event not matching any task, base: %+v", e.DBName, base) + return + } + + // decode the event data to the event data struct, use pre data for delete event + rawDoc := event.FullDoc + if base.OperationType == types.Delete { + rawDoc = event.PreFullDoc + } + + for _, taskID := range taskIDs { + parsed, isValid := parseDataForTask(rawDoc, taskID, collOptsInfo.taskFilterMap, collOptsInfo.taskTypeMap) + if !isValid { + continue + } + + parsed.Oid = base.DocumentKey.ID.Hex() + parsed.OperationType = base.OperationType + parsed.Collection = base.Namespace.Collection + parsed.ClusterTime = types.TimeStamp{ + Sec: base.ClusterTime.T, + Nano: base.ClusterTime.I, + } + parsed.Token = base.Token + parsed.ChangeDesc = &types.ChangeDescription{ + UpdatedFields: base.UpdateDesc.UpdatedFields, + RemovedFields: base.UpdateDesc.RemovedFields, + } + + eventChan <- parsed + } +} + +// isFatalError if watch encountered a fatal error, we should watch without resume token, which means from now. // errors like: // https://jira.mongodb.org/browse/SERVER-44610 // https://jira.mongodb.org/browse/SERVER-44733 diff --git a/src/storage/stream/loop/loop.go b/src/storage/stream/loop/loop.go deleted file mode 100644 index 545b01f441..0000000000 --- a/src/storage/stream/loop/loop.go +++ /dev/null @@ -1,2 +0,0 @@ -// Package loop TODO -package loop diff --git a/src/storage/stream/loop/loop_watch.go b/src/storage/stream/loop/loop_watch.go index ec200a4c67..1340a5eec4 100644 --- a/src/storage/stream/loop/loop_watch.go +++ b/src/storage/stream/loop/loop_watch.go @@ -10,6 +10,7 @@ * limitations under the License. */ +// Package loop defines loop watch logics package loop import ( @@ -47,8 +48,8 @@ func (lw *LoopsWatch) WithOne(opts *types.LoopOneOptions) error { startToken, err := opts.TokenHandler.GetStartWatchToken(context.Background()) if err != nil { - blog.Errorf("%s job, run loop watch %s, but get start token failed, err: %v", opts.Name, - opts.WatchOpt.Collection, err) + blog.Errorf("%s job, run loop watch %s, but get start token failed, err: %v", opts.Name, lw.streamWatch.DBName, + err) return err } @@ -97,7 +98,7 @@ func (lw *LoopsWatch) WithBatch(opts *types.LoopBatchOptions) error { startToken, err := opts.TokenHandler.GetStartWatchToken(context.Background()) if err != nil { blog.Errorf("%s job, run loop watch batch %s, but get start token failed, err: %v", opts.Name, - opts.WatchOpt.Collection, err) + lw.streamWatch.DBName, err) return err } @@ -174,8 +175,8 @@ func (lw *LoopsWatch) watchRetry(cancel context.CancelFunc, continue } - blog.Errorf("%s job, the former watch loop: %s failed, start retry again from token: %s.", - opts.Name, opts.WatchOpt.Collection, lastToken) + blog.Errorf("%s job, the former watch loop: %s failed, start retry again from token: %s.", opts.Name, + lw.streamWatch.DBName, lastToken) // set start after token if needed. if len(lastToken) != 0 { @@ -199,7 +200,7 @@ func (lw *LoopsWatch) watchRetry(cancel context.CancelFunc, // start handle loop jobs go doHandler(cancelCtx, watcher, retrySignal) - blog.Warnf("%s job, retry loop %s from token: %s success.", opts.Name, opts.WatchOpt.Collection, lastToken) + blog.Warnf("%s job, retry loop %s from token: %s success.", opts.Name, lw.streamWatch.DBName, lastToken) } } } @@ -225,7 +226,7 @@ func (lw *LoopsWatch) tryLoopWithBatch(ctxWithCancel context.Context, // stop the tick to release resource. ticker.Stop() blog.Warnf("%s job, master status has changed, try to re-watch again, collection:%s", opts.Name, - opts.WatchOpt.Collection) + lw.streamWatch.DBName) // trigger re-watch action now. close(retrySignal) @@ -235,7 +236,7 @@ func (lw *LoopsWatch) tryLoopWithBatch(ctxWithCancel context.Context, } if !loop { - blog.V(5).Infof("%s job, loop %s event, but not master, skip.", opts.Name, opts.WatchOpt.Collection) + blog.V(5).Infof("%s job, loop %s event, but not master, skip.", opts.Name, lw.streamWatch.DBName) time.Sleep(5 * time.Second) continue } @@ -248,8 +249,7 @@ func (lw *LoopsWatch) tryLoopWithBatch(ctxWithCancel context.Context, // stop the tick to release resource. ticker.Stop() - blog.Warnf("%s job, received cancel loop watch %s signal, exit loop.", opts.Name, - opts.WatchOpt.Collection) + blog.Warnf("%s job, received cancel loop watch %s signal, exit loop.", opts.Name, lw.streamWatch.DBName) // exist the goroutine return @@ -258,7 +258,7 @@ func (lw *LoopsWatch) tryLoopWithBatch(ctxWithCancel context.Context, if blog.V(4) { blog.Infof("%s job, received %s event, detail: %s, op-time: %s, rid: %s", opts.Name, - opts.WatchOpt.Collection, one.String(), one.ClusterTime.String(), one.ID()) + lw.streamWatch.DBName, one.String(), one.ClusterTime.String(), one.ID()) } // calculate event count, try to get more event for a batch @@ -291,7 +291,7 @@ func (lw *LoopsWatch) tryLoopWithBatch(ctxWithCancel context.Context, first := batchEvents[0] - blog.Infof("%s job, received %s batch %d events, first op-time: %s rid: %s.", opts.Name, opts.WatchOpt.Collection, + blog.Infof("%s job, received %s batch %d events, first op-time: %s rid: %s.", opts.Name, lw.streamWatch.DBName, len(batchEvents), first.ClusterTime.String(), first.ID()) retry := opts.EventHandler.DoBatch(batchEvents) @@ -299,7 +299,7 @@ func (lw *LoopsWatch) tryLoopWithBatch(ctxWithCancel context.Context, if retryObserver.canStillRetry() { blog.Warnf("%s job, received %s %d events in batch, but do batch failed, retry now, rid: %s", opts.Name, - opts.WatchOpt.Collection, len(batchEvents), first.ID()) + lw.streamWatch.DBName, len(batchEvents), first.ID()) // an error occurred, we need to retry it later. // tell the schedule to re-watch again. close(retrySignal) @@ -308,7 +308,7 @@ func (lw *LoopsWatch) tryLoopWithBatch(ctxWithCancel context.Context, } blog.Warnf("%s job, collection %s batch watch retry exceed max count, skip, rid: %s.", opts.Name, - opts.WatchOpt.Collection, first.ID()) + lw.streamWatch.DBName, first.ID()) // save the event token now. } @@ -319,7 +319,7 @@ func (lw *LoopsWatch) tryLoopWithBatch(ctxWithCancel context.Context, // update the last watched token for resume usage. if err := opts.TokenHandler.SetLastWatchToken(ctxWithCancel, last.Token.Data); err != nil { blog.Errorf("%s job, loop watch %s event, but set last token failed, err: %v, rid: %s, retry later.", - opts.Name, opts.WatchOpt.Collection, err, first.ID()) + opts.Name, lw.streamWatch.DBName, err, first.ID()) // retry later. close(retrySignal) @@ -346,7 +346,7 @@ func (lw *LoopsWatch) tryLoopWithOne(ctxWithCancel context.Context, select { case <-ctxWithCancel.Done(): blog.Warnf("%s job, received cancel loop watch %s signal, exit loop, exit loop", opts.Name, - opts.WatchOpt.Collection) + lw.streamWatch.DBName) return case <-opts.StopNotifier: @@ -358,7 +358,7 @@ func (lw *LoopsWatch) tryLoopWithOne(ctxWithCancel context.Context, reWatch, loop := observer.canLoop() if reWatch { - blog.Warnf("%s job, master status has changed, try to re-watch %s again", opts.Name, opts.WatchOpt.Collection) + blog.Warnf("%s job, master status has changed, try to re-watch %s again", opts.Name, lw.streamWatch.DBName) // trigger re-watch action now. close(retrySignal) // exit the for loop @@ -366,12 +366,12 @@ func (lw *LoopsWatch) tryLoopWithOne(ctxWithCancel context.Context, } if !loop { - blog.Infof("%s job, received %s %s event, but not master, skip. details: %s, rid: %s", - opts.Name, opts.WatchOpt.Collection, one.OperationType, one.String(), one.ID()) + blog.Infof("%s job, received %s %s event, but not master, skip. details: %s, rid: %s", opts.Name, + lw.streamWatch.DBName, one.OperationType, one.String(), one.ID()) continue } - blog.Infof("%s job, received %s event, type: %s, op-time: %s rid: %s", opts.Name, opts.WatchOpt.Collection, + blog.Infof("%s job, received %s event, type: %s, op-time: %s rid: %s", opts.Name, lw.streamWatch.DBName, one.OperationType, one.ClusterTime.String(), one.ID()) if blog.V(4) { @@ -381,7 +381,7 @@ func (lw *LoopsWatch) tryLoopWithOne(ctxWithCancel context.Context, retry := lw.tryOne(one, opts) if retry { if retryObserver.canStillRetry() { - blog.Warnf("%s job, retry watch %s later. rid: %s", opts.Name, opts.WatchOpt.Collection, one.ID()) + blog.Warnf("%s job, retry watch %s later. rid: %s", opts.Name, lw.streamWatch.DBName, one.ID()) // an error occurred, we need to retry it later. // tell the schedule to re-watch again. close(retrySignal) @@ -390,7 +390,7 @@ func (lw *LoopsWatch) tryLoopWithOne(ctxWithCancel context.Context, } blog.Warnf("%s job, retry %s event exceed max count, skip, detail: %s, rid: %s", opts.Name, - opts.WatchOpt.Collection, one.String(), one.ID()) + lw.streamWatch.DBName, one.String(), one.ID()) // save the event token now. } @@ -400,8 +400,7 @@ func (lw *LoopsWatch) tryLoopWithOne(ctxWithCancel context.Context, // update the last watched token for resume usage. if err := opts.TokenHandler.SetLastWatchToken(ctxWithCancel, one.Token.Data); err != nil { blog.Errorf("%s job, loop watch %s event, but set last watched token failed, err: %v, rid: %s, "+ - "retry later.", - opts.WatchOpt.Collection, err, one.ID()) + "retry later.", lw.streamWatch.DBName, err, one.ID()) // retry later. close(retrySignal) @@ -418,7 +417,7 @@ func (lw *LoopsWatch) tryOne(e *types.Event, opts *types.LoopOneOptions) (retry retry := opts.EventHandler.DoAdd(e) if retry { blog.Warnf("%s job, received %s %s event, but do add job failed, retry now, rid: %s", opts.Name, - opts.WatchOpt.Collection, e.OperationType, e.ID()) + lw.streamWatch.DBName, e.OperationType, e.ID()) return retry } @@ -427,7 +426,7 @@ func (lw *LoopsWatch) tryOne(e *types.Event, opts *types.LoopOneOptions) (retry retry := opts.EventHandler.DoUpdate(e) if retry { blog.Warnf("%s job, received %s %s event, but do update job failed, retry now, rid: %s", opts.Name, - opts.WatchOpt.Collection, e.OperationType, e.ID()) + lw.streamWatch.DBName, e.OperationType, e.ID()) return retry } @@ -436,19 +435,19 @@ func (lw *LoopsWatch) tryOne(e *types.Event, opts *types.LoopOneOptions) (retry retry := opts.EventHandler.DoDelete(e) if retry { blog.Warnf("%s job, received %s %s event, but do delete job failed, retry now, rid: %s", opts.Name, - opts.WatchOpt.Collection, e.OperationType, e.ID()) + lw.streamWatch.DBName, e.OperationType, e.ID()) return retry } case types.Invalidate: blog.Errorf("%s job, watch %s event, received invalid operation type, doc: %s, rid: %s", opts.Name, - opts.WatchOpt.Collection, e.DocBytes, e.ID()) + lw.streamWatch.DBName, e.DocBytes, e.ID()) return false default: blog.Errorf("%s job, watch %s event, received unsupported operation type, doc: %s, rid: %s", opts.Name, - opts.WatchOpt.Collection, e.DocBytes, e.ID()) + lw.streamWatch.DBName, e.DocBytes, e.ID()) return false } diff --git a/src/storage/stream/stream.go b/src/storage/stream/stream.go index c20f6841ba..e2ebf148c4 100644 --- a/src/storage/stream/stream.go +++ b/src/storage/stream/stream.go @@ -10,7 +10,7 @@ * limitations under the License. */ -// Package stream TODO +// Package stream defines mongodb change stream logics package stream import ( @@ -29,8 +29,7 @@ import ( "go.mongodb.org/mongo-driver/x/mongo/driver/connstring" ) -// Interface TODO -// Stream Interface defines all the functionality it have. +// Interface defines all the functionality it has. type Interface interface { List(ctx context.Context, opts *types.ListOptions) (ch chan *types.Event, err error) Watch(ctx context.Context, opts *types.WatchOptions) (*types.Watcher, error) @@ -68,14 +67,14 @@ func newEvent(conf local.MongoConf) (*event.Event, error) { return nil, err } - event, err := event.NewEvent(client, connStr.Database) + event, err := event.NewEvent(client, connStr.Database, conf.Name) if err != nil { return nil, fmt.Errorf("new event failed, err: %v", err) } return event, nil } -// LoopInterface TODO +// LoopInterface is the interface for event loop stream. type LoopInterface interface { WithOne(opts *types.LoopOneOptions) error WithBatch(opts *types.LoopBatchOptions) error diff --git a/src/storage/stream/types/types.go b/src/storage/stream/types/types.go index f9318f0997..4184248e3e 100644 --- a/src/storage/stream/types/types.go +++ b/src/storage/stream/types/types.go @@ -10,7 +10,7 @@ * limitations under the License. */ -// Package types TODO +// Package types defines event stream types package types import ( @@ -18,8 +18,12 @@ import ( "errors" "fmt" "reflect" + "regexp" "time" + "configcenter/pkg/filter" + "configcenter/src/common" + "github.com/tidwall/gjson" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/bsontype" @@ -66,35 +70,26 @@ const ( ListDone OperType = "listerDone" ) -// ListOptions TODO +// ListOptions is the option to list data from specified collections by filter type ListOptions struct { - // Filter helps you filter out which kind of data's change event you want - // to receive, such as the filter : - // {"bk_obj_id":"biz"} means you can only receives the data that has this kv. - // Note: the filter's key must be a exist document key filed in the collection's - // document - Filter map[string]interface{} - - // list the documents only with these fields. - Fields []string - - // EventStruct is the point data struct that the event decoded into. - // Note: must be a point value. - EventStruct interface{} - - // Collection defines which collection you want you watch. - Collection string + // CollOpts is the watch task id to list data options for different collections + CollOpts map[string]CollectionOptions // Step defines the list step when the client try to list all the data defines in the // namespace. default value is `DefaultListStep`, value range [200,2000] PageSize *int } -// CheckSetDefault TODO +// CheckSetDefault validate list options, and set default value for not set fields func (opts *ListOptions) CheckSetDefault() error { - if reflect.ValueOf(opts.EventStruct).Kind() != reflect.Ptr || - reflect.ValueOf(opts.EventStruct).IsNil() { - return fmt.Errorf("invalid EventStruct field, must be a pointer and not nil") + if len(opts.CollOpts) == 0 { + return errors.New("invalid Namespace field, database and collection can not be empty") + } + + for id, opt := range opts.CollOpts { + if err := opt.Validate(); err != nil { + return fmt.Errorf("collection options[%s] is invalid, err: %v", id, err) + } } if opts.PageSize != nil { @@ -104,14 +99,83 @@ func (opts *ListOptions) CheckSetDefault() error { } else { opts.PageSize = &defaultListPageSize } + return nil +} + +// CollectionOptions is the options for collections with the same watch filter +type CollectionOptions struct { + // CollectionFilter helps you filter out which kind of collection's change event you want to receive, + // such as the filter : {"$regex":"_HostBase$"} means you can only receive events from collections + // that ends with the suffix _HostBase + CollectionFilter *CollectionFilter + + // Filter helps you filter out which kind of data's change event you want to receive, + // such as the filter: {"bk_obj_id":"biz"} means you can only receive the data that has this kv. + // Note: the filter's key must be an exist document key filed in the collection's document + Filter *filter.Expression + + // Fields defines which fields will be returned along with the events + // this is optional, if not set, all the fields will be returned. + Fields []string + + // EventStruct is the point data struct that the event decoded into. + // Note: must be a point value. + EventStruct interface{} +} + +// Validate CollectionOptions +func (opts *CollectionOptions) Validate() error { + if reflect.ValueOf(opts.EventStruct).Kind() != reflect.Ptr || + reflect.ValueOf(opts.EventStruct).IsNil() { + return errors.New("invalid EventStruct field, must be a pointer and not nil") + } - if len(opts.Collection) == 0 { + if opts.CollectionFilter == nil { return errors.New("invalid Namespace field, database and collection can not be empty") } + + if err := opts.CollectionFilter.Validate(); err != nil { + return err + } + + if opts.Filter != nil { + validOpt := filter.NewDefaultExprOpt(nil) + validOpt.IgnoreRuleFields = true + return opts.Filter.Validate(validOpt) + } + return nil +} + +// CollectionFilter is the collection filter for watch +type CollectionFilter struct { + Regex string +} + +// Validate CollectionFilter +func (c *CollectionFilter) Validate() error { + if c.Regex == "" { + return errors.New("collection filter has no regex") + } + + _, err := regexp.Compile(c.Regex) + if err != nil { + return fmt.Errorf("collection filter regex %s is invalid, err: %v", c.Regex, err) + } + return nil } -// Options TODO +// ToMongo convert to mongodb filter +func (c *CollectionFilter) ToMongo() interface{} { + return bson.M{common.BKDBLIKE: c.Regex} +} + +// Match checks if the collection name matches the filter +func (c *CollectionFilter) Match(coll string) bool { + return regexp.MustCompile(c.Regex).MatchString(coll) +} + +// Options is the options for watch change stream operation type Options struct { // reference doc: // https://docs.mongodb.com/manual/reference/method/db.collection.watch/#change-stream-with-full-document-update-lookup @@ -124,29 +188,8 @@ type Options struct { // default value is 1000ms MaxAwaitTime *time.Duration - // OperationType describe which kind of operation you want to watch, - // such as a "insert" operation or a "replace" operation. - // If you don't set, it will means watch all kinds of operations. - OperationType *OperType - - // Filter helps you filter out which kind of data's change event you want - // to receive, such as the filter : - // {"bk_obj_id":"biz"} means you can only receives the data that has this kv. - // Note: the filter's key must be a exist document key filed in the collection's - // document - Filter map[string]interface{} - - // CollectionFilter helps you filter out which kind of collection's change event you want to receive, - // such as the filter : {"$regex":"^cc_ObjectBase"} means you can only receive events from collections - // starts with the prefix cc_ObjectBase - CollectionFilter interface{} - - // EventStruct is the point data struct that the event decoded into. - // Note: must be a point value. - EventStruct interface{} - - // Collection defines which collection you want you watch. - Collection string + // CollOpts is the watch task id to watch options for different collections + CollOpts map[string]WatchCollOptions // StartAfterToken describe where you want to watch the event. // Note: the returned event doesn't contains the token represented, @@ -160,20 +203,20 @@ type Options struct { // WatchFatalErrorCallback the function to be called when watch failed with a fatal error // reset the resume token and set the start time for next watch in case it use the mistaken token again WatchFatalErrorCallback func(startAtTime TimeStamp) error `json:"-"` - - // Fields defines which fields will be returned along with the events - // this is optional, if not set, all the fields will be returned. - Fields []string } var defaultMaxAwaitTime = time.Second -// CheckSetDefault TODO -// CheckSet check the legal of each option, and set the default value +// CheckSetDefault check the legal of each option, and set the default value func (opts *Options) CheckSetDefault() error { - if reflect.ValueOf(opts.EventStruct).Kind() != reflect.Ptr || - reflect.ValueOf(opts.EventStruct).IsNil() { - return fmt.Errorf("invalid EventStruct field, must be a pointer and not nil") + if len(opts.CollOpts) == 0 { + return errors.New("invalid Namespace field, database and collection can not be empty") + } + + for i, opt := range opts.CollOpts { + if err := opt.Validate(); err != nil { + return fmt.Errorf("collection options[%s] is invalid, err: %v", i, err) + } } if opts.MajorityCommitted == nil { @@ -184,13 +227,19 @@ func (opts *Options) CheckSetDefault() error { if opts.MaxAwaitTime == nil { opts.MaxAwaitTime = &defaultMaxAwaitTime } - - if len(opts.Collection) == 0 && opts.CollectionFilter == nil { - return errors.New("invalid Namespace field, database and collection can not be empty") - } return nil } +// WatchCollOptions is the watch options for collections with the same watch filter +type WatchCollOptions struct { + // OperationType describe which kind of operation you want to watch, + // such as an "insert" operation or a "replace" operation. + // If you don't set, it will means watch all kinds of operations. + OperationType *OperType + + CollectionOptions +} + // TimeStamp TODO type TimeStamp struct { // the most significant 32 bits are a time_t value (seconds since the Unix epoch) @@ -283,6 +332,8 @@ type Event struct { DocBytes []byte OperationType OperType Collection string + // TaskID is the task id of the event, which is used to distribute event to event watch task + TaskID string // The timestamp from the oplog entry associated with the event. ClusterTime TimeStamp @@ -311,15 +362,13 @@ func (e *Event) ID() string { return fmt.Sprintf("%s-%d-%d", e.Oid, e.ClusterTime.Sec, e.ClusterTime.Nano) } -// EventToken TODO -// mongodb change stream token, which represent a event's identity. +// EventToken mongodb change stream token, which represent a event's identity. type EventToken struct { // Hex value of document's _id Data string `bson:"_data"` } -// EventStream TODO -// reference: +// EventStream reference: // https://docs.mongodb.com/manual/reference/change-events/ type EventStream struct { Token EventToken `bson:"_id"` @@ -350,6 +399,13 @@ type UpdateDescription struct { RemovedFields []string `json:"removedFields" bson:"removedFields"` } +// RawEvent is the change stream event struct with raw event data +type RawEvent struct { + EventStream `bson:",inline"` + FullDoc bson.Raw `bson:"fullDocument"` + PreFullDoc bson.Raw `bson:"fullDocumentBeforeChange"` +} + // EventInfo is mongodb event info type EventInfo struct { UpdatedFields map[string]interface{} `json:"update_fields,omitempty"`