Skip to content

Commit

Permalink
source-mongodb: fix various bugs related to batch bindings
Browse files Browse the repository at this point in the history
Fixes a handful of bugs I found when doing some final Q/A tests, including:

* Don't panic when getting server info if there are 0 configured bindings
* Discovery must discover bindings in batch mode if the server doesn't support
  change streams
* Correctly get server info in Validate by querying the database of the first
  configured binding, and treat an empty resource capture mode as its implicit
  default when checking it against change stream requirements

Also re-ordered the resource configuration fields so that the cursor field comes
before the poll schedule, which should hopefully make a little more intuitive sense.
  • Loading branch information
williamhbaker committed Sep 20, 2024
1 parent da48bf4 commit d127dc6
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 31 deletions.
4 changes: 2 additions & 2 deletions source-mongodb/.snapshots/TestDiscoverBatchCollections
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ Binding 1:
"database": "db",
"collection": "timeseries",
"captureMode": "Batch Incremental",
"pollSchedule": "5m",
"cursorField": "ts"
"cursorField": "ts",
"pollSchedule": "5m"
},
"document_schema_json": {
"if": {
Expand Down
14 changes: 7 additions & 7 deletions source-mongodb/.snapshots/TestSpec
Original file line number Diff line number Diff line change
Expand Up @@ -122,18 +122,18 @@
"title": "Capture Mode",
"order": 2
},
"cursorField": {
"type": "string",
"title": "Cursor Field",
"description": "The name of the field to use as a cursor for batch-mode bindings. For best performance this field should be indexed. When used with 'Batch Incremental' mode documents added to the collection are expected to always have the cursor field and for it to be strictly increasing.",
"order": 3
},
"pollSchedule": {
"type": "string",
"title": "Polling Schedule",
"description": "When and how often to poll batch collections (overrides the connector default setting). Accepts a Go duration string like '5m' or '6h' for frequency-based polling or a string like 'daily at 12:34Z' to poll at a specific time (specified in UTC) every day. Defaults to '24h' if unset.",
"order": 3,
"order": 4,
"pattern": "^([-+]?([0-9]+([.][0-9]+)?(h|m|s|ms))+|daily at [0-9][0-9]?:[0-9]{2}Z)$"
},
"cursorField": {
"type": "string",
"title": "Cursor Field",
"description": "The name of the field to use as a cursor for batch-mode bindings. For best performance this field should be indexed. When used with 'Batch Incremental' mode documents added to the collection are expected to always have the cursor field and for it to be strictly increasing.",
"order": 4
}
},
"type": "object",
Expand Down
2 changes: 1 addition & 1 deletion source-mongodb/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func (d *driver) Discover(ctx context.Context, req *pc.Request_Discover) (*pc.Re
var mode = captureModeChangeStream
var cursor string
var pollSchedule string
if !collectionType.canChangeStream() {
if !collectionType.canChangeStream() || !serverInfo.supportsChangeStreams {
mode = captureModeSnapshot
cursor = idProperty
}
Expand Down
25 changes: 14 additions & 11 deletions source-mongodb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ type resource struct {
Database string `json:"database" jsonschema:"title=Database name" jsonschema_extras:"order=0"`
Collection string `json:"collection" jsonschema:"title=Collection name" jsonschema_extras:"order=1"`
Mode captureMode `json:"captureMode,omitempty" jsonschema:"title=Capture Mode,enum=Change Stream Incremental,enum=Batch Snapshot,enum=Batch Incremental" jsonschema_extras:"order=2"`
PollSchedule string `json:"pollSchedule,omitempty" jsonschema:"title=Polling Schedule,description=When and how often to poll batch collections (overrides the connector default setting). Accepts a Go duration string like '5m' or '6h' for frequency-based polling or a string like 'daily at 12:34Z' to poll at a specific time (specified in UTC) every day. Defaults to '24h' if unset." jsonschema_extras:"pattern=^([-+]?([0-9]+([.][0-9]+)?(h|m|s|ms))+|daily at [0-9][0-9]?:[0-9]{2}Z)$,order=3"`
Cursor string `json:"cursorField,omitempty" jsonschema:"title=Cursor Field,description=The name of the field to use as a cursor for batch-mode bindings. For best performance this field should be indexed. When used with 'Batch Incremental' mode documents added to the collection are expected to always have the cursor field and for it to be strictly increasing." jsonschema_extras:"order=4"`
Cursor string `json:"cursorField,omitempty" jsonschema:"title=Cursor Field,description=The name of the field to use as a cursor for batch-mode bindings. For best performance this field should be indexed. When used with 'Batch Incremental' mode documents added to the collection are expected to always have the cursor field and for it to be strictly increasing." jsonschema_extras:"order=3"`
PollSchedule string `json:"pollSchedule,omitempty" jsonschema:"title=Polling Schedule,description=When and how often to poll batch collections (overrides the connector default setting). Accepts a Go duration string like '5m' or '6h' for frequency-based polling or a string like 'daily at 12:34Z' to poll at a specific time (specified in UTC) every day. Defaults to '24h' if unset." jsonschema_extras:"pattern=^([-+]?([0-9]+([.][0-9]+)?(h|m|s|ms))+|daily at [0-9][0-9]?:[0-9]{2}Z)$,order=4"`
}

func (r resource) Validate() error {
Expand Down Expand Up @@ -298,11 +298,6 @@ func (d *driver) Validate(ctx context.Context, req *pc.Request_Validate) (*pc.Re
return nil, fmt.Errorf("getting list of databases: %w", err)
}

serverInfo, err := getServerInfo(ctx, client, existingDatabases[0])
if err != nil {
return nil, fmt.Errorf("getting server info: %w", err)
}

var lastResources []resource
var lastBackfillCounters []uint32
if req.LastCapture != nil {
Expand All @@ -318,13 +313,21 @@ func (d *driver) Validate(ctx context.Context, req *pc.Request_Validate) (*pc.Re

var dbCollectionSpecs = make(map[string][]*mongo.CollectionSpecification)
var bindings = []*pc.Response_Validated_Binding{}
var servInf *serverInfo

for _, binding := range req.Bindings {
var res resource
if err := pf.UnmarshalStrict(binding.ResourceConfigJson, &res); err != nil {
return nil, fmt.Errorf("error parsing resource config: %w", err)
}

if servInf == nil {
servInf, err = getServerInfo(ctx, client, res.Database)
if err != nil {
return nil, fmt.Errorf("getting server info: %w", err)
}
}

var resourcePath = []string{res.Database, res.Collection}

// Validate changes to the resource spec.
Expand Down Expand Up @@ -374,10 +377,10 @@ func (d *driver) Validate(ctx context.Context, req *pc.Request_Validate) (*pc.Re
return nil, fmt.Errorf("unsupported collection type: %w", err)
}

if !serverInfo.supportsChangeStreams && res.Mode == captureModeChangeStream {
return nil, fmt.Errorf("binding %q is configured with mode '%s', but the server does not support change streams", resourcePath, res.Mode)
} else if res.Mode == captureModeChangeStream && !collectionType.canChangeStream() {
return nil, fmt.Errorf("binding %q is configured with mode '%s', but the collection type '%s' does not support change streams", resourcePath, res.Mode, collectionType)
if !servInf.supportsChangeStreams && res.getMode() == captureModeChangeStream {
return nil, fmt.Errorf("binding %q is configured with mode '%s', but the server does not support change streams", resourcePath, res.getMode())
} else if res.getMode() == captureModeChangeStream && !collectionType.canChangeStream() {
return nil, fmt.Errorf("binding %q is configured with mode '%s', but the collection type '%s' does not support change streams", resourcePath, res.getMode(), collectionType)
}

bindings = append(bindings, &pc.Response_Validated_Binding{
Expand Down
20 changes: 10 additions & 10 deletions source-mongodb/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,16 +107,6 @@ func (d *driver) Pull(open *pc.Request_Open, stream *boilerplate.PullOutput) err
}
}()

serverInfo, err := getServerInfo(ctx, client, allBindings[0].resource.Database)
if err != nil {
return fmt.Errorf("checking server info: %w", err)
}
log.WithFields(log.Fields{
"version": serverInfo.version,
"supportsChangeStreams": serverInfo.supportsChangeStreams,
"supportsPreImages": serverInfo.supportsPreImages,
}).Info("connected to database")

var prevState captureState
if err := pf.UnmarshalStrict(open.StateJson, &prevState); err != nil {
return fmt.Errorf("unmarshalling previous state: %w", err)
Expand Down Expand Up @@ -152,6 +142,16 @@ func (d *driver) Pull(open *pc.Request_Open, stream *boilerplate.PullOutput) err
return nil
}

serverInfo, err := getServerInfo(ctx, client, allBindings[0].resource.Database)
if err != nil {
return fmt.Errorf("checking server info: %w", err)
}
log.WithFields(log.Fields{
"version": serverInfo.version,
"supportsChangeStreams": serverInfo.supportsChangeStreams,
"supportsPreImages": serverInfo.supportsPreImages,
}).Info("connected to database")

coordinator := newBatchStreamCoordinator(changeStreamBindings, func(ctx context.Context) (primitive.Timestamp, error) {
return getClusterOpTime(ctx, client)
})
Expand Down

0 comments on commit d127dc6

Please sign in to comment.