From 2a6cdf9c22c85918062064aebe800829685fb0b6 Mon Sep 17 00:00:00 2001 From: Fernando Takagi Date: Tue, 30 Nov 2021 16:09:05 -0300 Subject: [PATCH 1/3] Add tournament termination function and guarding against race conditions on End callback --- server/core_tournament.go | 23 ++++++++++++++++ server/leaderboard_cache.go | 36 +++++++++++++------------ server/leaderboard_scheduler.go | 47 ++++++++++++++++++++++++++++----- 3 files changed, 82 insertions(+), 24 deletions(-) diff --git a/server/core_tournament.go b/server/core_tournament.go index 52a9577bca..45e7d43ed0 100644 --- a/server/core_tournament.go +++ b/server/core_tournament.go @@ -84,6 +84,29 @@ func TournamentDelete(ctx context.Context, cache LeaderboardCache, rankCache Lea return nil } +func TournamentTerminate(ctx context.Context, cache LeaderboardCache, rankCache LeaderboardRankCache, scheduler LeaderboardScheduler, leaderboardId string) error { + ts := time.Now().Unix() + tMinusOne := time.Unix(ts-1, 0).UTC() + + if err := TournamentDelete(ctx, cache, rankCache, scheduler, leaderboardId); err != nil { + return err + } + leaderboard := cache.Get(leaderboardId) + if leaderboard == nil || !leaderboard.IsTournament() { + // If it does not exist treat it as success. + return nil + } + + if leaderboard.EndTime > 0 && leaderboard.EndTime < ts { + // Tournament has ended, callback has already run. + return nil + } + + scheduler.QueueCallback(&LeaderboardSchedulerCallback{id: leaderboardId, leaderboard: leaderboard, ts: ts, t: tMinusOne, callbackType: End}) + + return nil +} + func TournamentAddAttempt(ctx context.Context, logger *zap.Logger, db *sql.DB, cache LeaderboardCache, leaderboardId string, owner string, count int) error { if count == 0 { // No-op. diff --git a/server/leaderboard_cache.go b/server/leaderboard_cache.go index 98729b8acc..8ba839d4c8 100644 --- a/server/leaderboard_cache.go +++ b/server/leaderboard_cache.go @@ -43,23 +43,25 @@ const ( ) type Leaderboard struct { - Id string - Authoritative bool - SortOrder int - Operator int - ResetScheduleStr string - ResetSchedule *cronexpr.Expression - Metadata string - CreateTime int64 - Category int - Description string - Duration int - EndTime int64 - JoinRequired bool - MaxSize int - MaxNumScore int - Title string - StartTime int64 + sync.Mutex // Guarding EndCallbackInvoked + Id string + Authoritative bool + SortOrder int + Operator int + ResetScheduleStr string + ResetSchedule *cronexpr.Expression + Metadata string + CreateTime int64 + Category int + Description string + Duration int + EndTime int64 + JoinRequired bool + MaxSize int + MaxNumScore int + Title string + StartTime int64 + EndCallbackInvoked bool } func (l *Leaderboard) IsTournament() bool { diff --git a/server/leaderboard_scheduler.go b/server/leaderboard_scheduler.go index adec8cbb3b..5de4150500 100644 --- a/server/leaderboard_scheduler.go +++ b/server/leaderboard_scheduler.go @@ -26,11 +26,19 @@ import ( "go.uber.org/zap" ) +type CallbackType int + +const ( + Expiry CallbackType = iota // Reset + End +) + type LeaderboardSchedulerCallback struct { - id string - leaderboard *Leaderboard - ts int64 - t time.Time + id string + leaderboard *Leaderboard + ts int64 + t time.Time + callbackType CallbackType } type LeaderboardScheduler interface { @@ -39,6 +47,7 @@ type LeaderboardScheduler interface { Resume() Stop() Update() + QueueCallback(callback *LeaderboardSchedulerCallback) } type LocalLeaderboardScheduler struct { @@ -279,6 +288,10 @@ func (ls *LocalLeaderboardScheduler) Update() { ls.logger.Info("Leaderboard scheduler update", zap.Duration("end_active", endActiveDuration), zap.Int("end_active_count", len(endActiveLeaderboardIds)), zap.Duration("expiry", expiryDuration), zap.Int("expiry_count", len(expiryLeaderboardIds))) } +func (ls *LocalLeaderboardScheduler) QueueCallback(c *LeaderboardSchedulerCallback) { + ls.queue <- c +} + func (ls *LocalLeaderboardScheduler) queueEndActiveElapse(t time.Time, ids []string) { if ls.active.Load() != 1 { // Not active. @@ -311,8 +324,15 @@ func (ls *LocalLeaderboardScheduler) queueEndActiveElapse(t time.Time, ids []str // Process the current set of tournament ends. for _, id := range ids { currentId := id + + leaderboard := ls.cache.Get(id) + if leaderboard == nil { + // Cached entry was deleted before it reached the scheduler here. + continue + } + // Will block if the queue is full. - ls.queue <- &LeaderboardSchedulerCallback{id: currentId, ts: ts, t: tMinusOne} + ls.queue <- &LeaderboardSchedulerCallback{id: currentId, leaderboard: leaderboard, ts: ts, t: tMinusOne, callbackType: End} } }() } @@ -357,7 +377,7 @@ func (ls *LocalLeaderboardScheduler) queueExpiryElapse(t time.Time, ids []string continue } // Will block if queue is full. - ls.queue <- &LeaderboardSchedulerCallback{id: currentId, leaderboard: leaderboard, ts: ts, t: tMinusOne} + ls.queue <- &LeaderboardSchedulerCallback{id: currentId, leaderboard: leaderboard, ts: ts, t: tMinusOne, callbackType: Expiry} } }() } @@ -368,7 +388,7 @@ func (ls *LocalLeaderboardScheduler) invokeCallback() { case <-ls.ctx.Done(): return case callback := <-ls.queue: - if callback.leaderboard != nil { + if callback.callbackType == Expiry { if callback.leaderboard.IsTournament() { // Tournament, fetch most up to date info for size etc. // Some processing is needed even if there is no runtime callback registered for tournament reset. @@ -412,6 +432,11 @@ WHERE id = $1` } } } else { + // Skip processing if there is no tournament end callback registered. + if ls.fnTournamentEnd == nil { + continue + } + query := `SELECT id, sort_order, operator, reset_schedule, metadata, create_time, category, description, duration, end_time, max_size, max_num_score, title, size, start_time @@ -427,10 +452,18 @@ WHERE id = $1` continue } + callback.leaderboard.Lock() + if callback.leaderboard.EndCallbackInvoked { + callback.leaderboard.Unlock() + // already activated once + continue + } // fnTournamentEnd cannot be nil here, if it was the callback would not be queued at all. if err := ls.fnTournamentEnd(ls.ctx, tournament, int64(tournament.EndActive), int64(tournament.NextReset)); err != nil { ls.logger.Warn("Failed to invoke tournament end callback", zap.Error(err)) } + callback.leaderboard.EndCallbackInvoked = true + callback.leaderboard.Unlock() } } } From cc0e9bf2ed08dd19b4abc6823ade879c08be8919 Mon Sep 17 00:00:00 2001 From: Fernando Takagi Date: Wed, 1 Dec 2021 10:41:16 -0300 Subject: [PATCH 2/3] Change to safer locking on tournament end callback --- server/leaderboard_scheduler.go | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/server/leaderboard_scheduler.go b/server/leaderboard_scheduler.go index 5de4150500..93d8cd89f4 100644 --- a/server/leaderboard_scheduler.go +++ b/server/leaderboard_scheduler.go @@ -452,18 +452,19 @@ WHERE id = $1` continue } - callback.leaderboard.Lock() - if callback.leaderboard.EndCallbackInvoked { - callback.leaderboard.Unlock() - // already activated once - continue - } - // fnTournamentEnd cannot be nil here, if it was the callback would not be queued at all. - if err := ls.fnTournamentEnd(ls.ctx, tournament, int64(tournament.EndActive), int64(tournament.NextReset)); err != nil { - ls.logger.Warn("Failed to invoke tournament end callback", zap.Error(err)) - } - callback.leaderboard.EndCallbackInvoked = true - callback.leaderboard.Unlock() + func() { + callback.leaderboard.Lock() + defer callback.leaderboard.Unlock() + if callback.leaderboard.EndCallbackInvoked { + // already activated once + return + } + // fnTournamentEnd cannot be nil here, if it was the callback would not be queued at all. + if err := ls.fnTournamentEnd(ls.ctx, tournament, int64(tournament.EndActive), int64(tournament.NextReset)); err != nil { + ls.logger.Warn("Failed to invoke tournament end callback", zap.Error(err)) + } + callback.leaderboard.EndCallbackInvoked = true + }() } } } From 5cfcac4535fe1f14684002981b3f9e8d7281928a Mon Sep 17 00:00:00 2001 From: Fernando Takagi Date: Wed, 1 Dec 2021 10:49:21 -0300 Subject: [PATCH 3/3] Change better encapsulation for mutable state --- server/leaderboard_cache.go | 4 ++-- server/leaderboard_scheduler.go | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/server/leaderboard_cache.go b/server/leaderboard_cache.go index 8ba839d4c8..745446e9be 100644 --- a/server/leaderboard_cache.go +++ b/server/leaderboard_cache.go @@ -43,7 +43,7 @@ const ( ) type Leaderboard struct { - sync.Mutex // Guarding EndCallbackInvoked + mu sync.Mutex // Guarding endCallbackInvoked Id string Authoritative bool SortOrder int @@ -61,7 +61,7 @@ type Leaderboard struct { MaxNumScore int Title string StartTime int64 - EndCallbackInvoked bool + endCallbackInvoked bool } func (l *Leaderboard) IsTournament() bool { diff --git a/server/leaderboard_scheduler.go b/server/leaderboard_scheduler.go index 93d8cd89f4..1a82ac7ad2 100644 --- a/server/leaderboard_scheduler.go +++ b/server/leaderboard_scheduler.go @@ -453,9 +453,9 @@ WHERE id = $1` } func() { - callback.leaderboard.Lock() - defer callback.leaderboard.Unlock() - if callback.leaderboard.EndCallbackInvoked { + callback.leaderboard.mu.Lock() + defer callback.leaderboard.mu.Unlock() + if callback.leaderboard.endCallbackInvoked { // already activated once return } @@ -463,7 +463,7 @@ WHERE id = $1` if err := ls.fnTournamentEnd(ls.ctx, tournament, int64(tournament.EndActive), int64(tournament.NextReset)); err != nil { ls.logger.Warn("Failed to invoke tournament end callback", zap.Error(err)) } - callback.leaderboard.EndCallbackInvoked = true + callback.leaderboard.endCallbackInvoked = true }() } }