diff --git a/internal/datastore/crdb/crdb.go b/internal/datastore/crdb/crdb.go index 16f67030f1..d2093a258e 100644 --- a/internal/datastore/crdb/crdb.go +++ b/internal/datastore/crdb/crdb.go @@ -200,6 +200,7 @@ func newCRDBDatastore(ctx context.Context, url string, options ...Option) (datas ), CommonDecoder: revisions.CommonDecoder{Kind: revisions.HybridLogicalClock}, dburl: url, + version: version, watchBufferLength: config.watchBufferLength, watchBufferWriteTimeout: config.watchBufferWriteTimeout, watchConnectTimeout: config.watchConnectTimeout, @@ -286,6 +287,7 @@ type crdbDatastore struct { revisions.CommonDecoder dburl string + version crdbVersion readPool, writePool *pool.RetryPool watchBufferLength uint16 watchBufferWriteTimeout time.Duration @@ -548,6 +550,13 @@ func (cds *crdbDatastore) features(ctx context.Context) (*datastore.Features, er defer cancel() time.AfterFunc(1*time.Second, cancel) + var query string + if cds.version.Major >= 22 { + query = fmt.Sprintf(cds.beginChangefeedQuery, cds.tableTupleName(), head, "1s", "1s") + } else { + query = fmt.Sprintf(cds.beginChangefeedQuery, cds.tableTupleName(), head, "1s") + } + _ = cds.writePool.ExecFunc(streamCtx, func(ctx context.Context, tag pgconn.CommandTag, err error) error { if err != nil && errors.Is(err, context.Canceled) { features.Watch.Status = datastore.FeatureSupported @@ -557,7 +566,7 @@ func (cds *crdbDatastore) features(ctx context.Context) (*datastore.Features, er features.Watch.Reason = fmt.Sprintf("Range feeds must be enabled in CockroachDB and the user must have permission to create them in order to enable the Watch API: %s", err.Error()) } return nil - }, fmt.Sprintf(cds.beginChangefeedQuery, cds.tableTupleName(), head, "1s")) + }, query) <-streamCtx.Done() diff --git a/internal/datastore/crdb/watch.go b/internal/datastore/crdb/watch.go index 883d14978a..3e93e23ff6 100644 --- a/internal/datastore/crdb/watch.go +++ b/internal/datastore/crdb/watch.go @@ -24,7 +24,7 @@ import ( ) const ( - queryChangefeed = "EXPERIMENTAL CHANGEFEED FOR %s WITH updated, cursor = '%s', resolved = '%s', min_checkpoint_frequency = '0';" + queryChangefeed = "EXPERIMENTAL CHANGEFEED FOR %s WITH updated, cursor = '%s', resolved = '%s', min_checkpoint_frequency = '%s';" queryChangefeedPreV22 = "EXPERIMENTAL CHANGEFEED FOR %s WITH updated, cursor = '%s', resolved = '%s';" ) @@ -140,7 +140,13 @@ func (cds *crdbDatastore) watch( } resolvedDurationString := strconv.FormatInt(resolvedDuration.Milliseconds(), 10) + "ms" - interpolated := fmt.Sprintf(cds.beginChangefeedQuery, strings.Join(tableNames, ","), afterRevision, resolvedDurationString) + + var interpolated string + if cds.version.Major >= 22 { + interpolated = fmt.Sprintf(cds.beginChangefeedQuery, strings.Join(tableNames, ","), afterRevision, resolvedDurationString, resolvedDurationString) + } else { + interpolated = fmt.Sprintf(cds.beginChangefeedQuery, strings.Join(tableNames, ","), afterRevision, resolvedDurationString) + } sendError := func(err error) { if errors.Is(ctx.Err(), context.Canceled) {