Skip to content

Commit

Permalink
sqlcapture: Temporarily initialize KeyColumns
Browse files Browse the repository at this point in the history
This commit adds a temporary bit of migration logic which will
fill in `state.KeyColumns` for any already-active only-changes
bindings with a nil value for that state property.

This means that preexisting bindings will behave the same as
any new ones initialized after the corresponding change to the
activatePendingStreams logic.

Since this will immediately add the missing property as soon
as a task restarts and it will persist thereafter, the migration
can probably be removed in just a few days at most.
  • Loading branch information
willdonnelly committed Dec 13, 2024
1 parent 260b22d commit 027f511
Showing 1 changed file with 20 additions and 0 deletions.
20 changes: 20 additions & 0 deletions sqlcapture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,26 @@ func (c *Capture) reconcileStateWithBindings(_ context.Context) error {
}
}

// This is part of a migration dated 2024-12-12 and can be removed within a few days. The
// only purpose here is to make sure that preexisting only-changes bindings have an
// initialized StateKey value, the same as activatePendingStreams has been modified to
// produce going forward.
for _, binding := range c.Bindings {
if state, ok := c.State.Streams[binding.StateKey]; ok && binding.Resource.Mode == BackfillModeOnlyChanges && state.Mode == TableStateActive && state.KeyColumns == nil {
if !slices.Equal(binding.CollectionKey, c.Database.FallbackCollectionKey()) {
for _, ptr := range binding.CollectionKey {
state.KeyColumns = append(state.KeyColumns, collectionKeyToPrimaryKey(ptr))
}
}
if len(binding.Resource.PrimaryKey) > 0 {
logrus.WithFields(logrus.Fields{"stateKey": binding.StateKey, "key": binding.Resource.PrimaryKey}).Debug("key overriden by resource config")
state.KeyColumns = binding.Resource.PrimaryKey
}
logrus.WithField("stateKey", binding.StateKey).WithField("key", state.KeyColumns).Info("initialized missing KeyColumns state for only-changes binding")
state.dirty = true
}
}

// If all bindings are new (or there are no bindings), reset the replication cursor. This is
// safe because logically if no streams are currently active then we can't miss any events of
// interest when the replication stream jumps ahead, and doing this allows the user an easy
Expand Down

0 comments on commit 027f511

Please sign in to comment.