Skip to content

Commit

Permalink
add metric for catchup failure and increase catchup time to 2 minutes
Browse files Browse the repository at this point in the history
  • Loading branch information
eparker-tulip committed Dec 11, 2024
1 parent e6eaeea commit 2f8841b
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 17 deletions.
2 changes: 1 addition & 1 deletion default.nix
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

buildGoModule {
pname = "oplogtoredis";
version = "3.8.0";
version = "3.8.1";
src = builtins.path { path = ./.; };

postInstall = ''
Expand Down
4 changes: 2 additions & 2 deletions lib/config/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ type oplogtoredisConfiguration struct {
HTTPServerAddr string `default:"0.0.0.0:9000" envconfig:"HTTP_SERVER_ADDR"`
BufferSize int `default:"10000" split_words:"true"`
TimestampFlushInterval time.Duration `default:"1s" split_words:"true"`
MaxCatchUp time.Duration `default:"60s" split_words:"true"`
RedisDedupeExpiration time.Duration `default:"120s" split_words:"true"`
MaxCatchUp time.Duration `default:"120s" split_words:"true"`
RedisDedupeExpiration time.Duration `default:"150s" split_words:"true"`
RedisMetadataPrefix string `default:"oplogtoredis::" split_words:"true"`
MongoConnectTimeout time.Duration `default:"10s" split_words:"true"`
MongoQueryTimeout time.Duration `default:"5s" split_words:"true"`
Expand Down
20 changes: 10 additions & 10 deletions lib/config/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ var envTests = map[string]struct {
HTTPServerAddr: "0.0.0.0:9000",
BufferSize: 10000,
TimestampFlushInterval: time.Second,
MaxCatchUp: time.Minute,
RedisDedupeExpiration: 2 * time.Minute,
MaxCatchUp: 2 * time.Minute,
RedisDedupeExpiration: 2 * time.Minute + 30 * time.Second,
RedisMetadataPrefix: "oplogtoredis::",
},
},
Expand Down Expand Up @@ -109,42 +109,42 @@ func TestParseEnv(t *testing.T) {

func checkConfigExpectation(t *testing.T, expectedConfig *oplogtoredisConfiguration) {
if expectedConfig.MongoURL != MongoURL() {
t.Errorf("Incorrect Mongo URL. Got \"%s\", Expected \"%s\"",
t.Errorf("Incorrect Mongo URL. Expected \"%s\", Got \"%s\"",
expectedConfig.MongoURL, MongoURL())
}

if expectedConfig.RedisURL != strings.Join(RedisURL()[:], "") {
t.Errorf("Incorrect Redis URL. Got \"%s\", Expected \"%s\"",
t.Errorf("Incorrect Redis URL. Expected \"%s\", Got \"%s\"",
expectedConfig.RedisURL, RedisURL())
}

if expectedConfig.HTTPServerAddr != HTTPServerAddr() {
t.Errorf("Incorrect HTTPServerAddr. Got \"%s\", Expected \"%s\"",
t.Errorf("Incorrect HTTPServerAddr. Expected \"%s\", Got \"%s\"",
expectedConfig.HTTPServerAddr, HTTPServerAddr())
}

if expectedConfig.BufferSize != BufferSize() {
t.Errorf("Incorrect BufferSize. Got %d, Expected %d",
t.Errorf("Incorrect BufferSize. Expected %d, Got %d",
expectedConfig.BufferSize, BufferSize())
}

if expectedConfig.TimestampFlushInterval != TimestampFlushInterval() {
t.Errorf("Incorrect TimestampFlushInterval. Got %d, Expected %d",
t.Errorf("Incorrect TimestampFlushInterval. Expected %d, Got %d",
expectedConfig.TimestampFlushInterval, TimestampFlushInterval())
}

if expectedConfig.MaxCatchUp != MaxCatchUp() {
t.Errorf("Incorrect MaxCatchUp. Got %d, Expected %d",
t.Errorf("Incorrect MaxCatchUp. Expected %d, Got %d",
expectedConfig.MaxCatchUp, MaxCatchUp())
}

if expectedConfig.RedisDedupeExpiration != RedisDedupeExpiration() {
t.Errorf("Incorrect RedisDedupeExpiration. Got %d, Expected %d",
t.Errorf("Incorrect RedisDedupeExpiration. Expected %d, Got %d",
expectedConfig.RedisDedupeExpiration, RedisDedupeExpiration())
}

if expectedConfig.RedisMetadataPrefix != RedisMetadataPrefix() {
t.Errorf("Incorrect RedisMetadataPrefix. Got \"%s\", Expected \"%s\"",
t.Errorf("Incorrect RedisMetadataPrefix. Expected \"%s\", Got \"%s\"",
expectedConfig.RedisMetadataPrefix, RedisMetadataPrefix())
}
}
16 changes: 12 additions & 4 deletions lib/oplog/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,13 @@ var (
Name: "last_received_staleness",
Help: "Gauge recording the difference between this server's clock and the timestamp on the last read oplog entry.",
}, []string{"ordinal"})

metricOplogFailedResume = promauto.NewCounter(prometheus.CounterOpts{
Namespace: "otr",
Subsystem: "oplog",
Name: "resume_failed",
Help: "Failures to resume tailing, gaps in publications",
})
)

func init() {
Expand Down Expand Up @@ -441,20 +448,21 @@ func (tailer *Tailer) getStartTime(maxOrdinal int, getTimestampOfLastOplogEntry
ts, tsTime, redisErr := redispub.FirstLastProcessedTimestamp(tailer.RedisClients[0], tailer.RedisPrefix, maxOrdinal)

if redisErr == nil {
// we have a last write time, check that it's not too far in the
// past
// we have a last write time, check that it's not too far in the past
if tsTime.After(time.Now().Add(-1 * tailer.MaxCatchUp)) {
log.Log.Infof("Found last processed timestamp, resuming oplog tailing from %d", tsTime.Unix())
log.Log.Infof("Found last processed timestamp (%d), resuming oplog tailing from %ds ago", tsTime.Unix(), time.Since(tsTime) / time.Second)
return ts
}

log.Log.Warnf("Found last processed timestamp, but it was too far in the past (%d). Will start from end of oplog", tsTime.Unix())
log.Log.Warnf("Found last processed timestamp (%d), but it was too far in the past (%ds ago). Will start from end of oplog", tsTime.Unix(), time.Since(tsTime) / time.Second)
}

if (redisErr != nil) && (redisErr != redis.Nil) {
log.Log.Errorw("Error querying Redis for last processed timestamp. Will start from end of oplog.",
"error", redisErr)
}

metricOplogFailedResume.Inc()

mongoOplogEndTimestamp, mongoErr := getTimestampOfLastOplogEntry()
if mongoErr == nil {
Expand Down

0 comments on commit 2f8841b

Please sign in to comment.