Skip to content

Commit

Permalink
Adjust CRDB watch to set the checkpoint frequency equal to the resolved
Browse files Browse the repository at this point in the history
Should reduce watch pressure from the caller to CRDB
  • Loading branch information
josephschorr committed Oct 23, 2024
1 parent fcd2670 commit bbfbe20
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 3 deletions.
11 changes: 10 additions & 1 deletion internal/datastore/crdb/crdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ func newCRDBDatastore(ctx context.Context, url string, options ...Option) (datas
),
CommonDecoder: revisions.CommonDecoder{Kind: revisions.HybridLogicalClock},
dburl: url,
version: version,
watchBufferLength: config.watchBufferLength,
watchBufferWriteTimeout: config.watchBufferWriteTimeout,
watchConnectTimeout: config.watchConnectTimeout,
Expand Down Expand Up @@ -286,6 +287,7 @@ type crdbDatastore struct {
revisions.CommonDecoder

dburl string
version crdbVersion
readPool, writePool *pool.RetryPool
watchBufferLength uint16
watchBufferWriteTimeout time.Duration
Expand Down Expand Up @@ -548,6 +550,13 @@ func (cds *crdbDatastore) features(ctx context.Context) (*datastore.Features, er
defer cancel()
time.AfterFunc(1*time.Second, cancel)

var query string
if cds.version.Major >= 22 {
query = fmt.Sprintf(cds.beginChangefeedQuery, cds.tableTupleName(), head, "1s", "1s")
} else {
query = fmt.Sprintf(cds.beginChangefeedQuery, cds.tableTupleName(), head, "1s")
}

_ = cds.writePool.ExecFunc(streamCtx, func(ctx context.Context, tag pgconn.CommandTag, err error) error {
if err != nil && errors.Is(err, context.Canceled) {
features.Watch.Status = datastore.FeatureSupported
Expand All @@ -557,7 +566,7 @@ func (cds *crdbDatastore) features(ctx context.Context) (*datastore.Features, er
features.Watch.Reason = fmt.Sprintf("Range feeds must be enabled in CockroachDB and the user must have permission to create them in order to enable the Watch API: %s", err.Error())
}
return nil
}, fmt.Sprintf(cds.beginChangefeedQuery, cds.tableTupleName(), head, "1s"))
}, query)

<-streamCtx.Done()

Expand Down
10 changes: 8 additions & 2 deletions internal/datastore/crdb/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
)

const (
queryChangefeed = "EXPERIMENTAL CHANGEFEED FOR %s WITH updated, cursor = '%s', resolved = '%s', min_checkpoint_frequency = '0';"
queryChangefeed = "EXPERIMENTAL CHANGEFEED FOR %s WITH updated, cursor = '%s', resolved = '%s', min_checkpoint_frequency = '%s';"
queryChangefeedPreV22 = "EXPERIMENTAL CHANGEFEED FOR %s WITH updated, cursor = '%s', resolved = '%s';"
)

Expand Down Expand Up @@ -140,7 +140,13 @@ func (cds *crdbDatastore) watch(
}

resolvedDurationString := strconv.FormatInt(resolvedDuration.Milliseconds(), 10) + "ms"
interpolated := fmt.Sprintf(cds.beginChangefeedQuery, strings.Join(tableNames, ","), afterRevision, resolvedDurationString)

var interpolated string
if cds.version.Major >= 22 {
interpolated = fmt.Sprintf(cds.beginChangefeedQuery, strings.Join(tableNames, ","), afterRevision, resolvedDurationString, resolvedDurationString)
} else {
interpolated = fmt.Sprintf(cds.beginChangefeedQuery, strings.Join(tableNames, ","), afterRevision, resolvedDurationString)
}

sendError := func(err error) {
if errors.Is(ctx.Err(), context.Canceled) {
Expand Down

0 comments on commit bbfbe20

Please sign in to comment.