Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added tournament termination function and flag for guarding against race condition on End callback #729

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions server/core_tournament.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,29 @@ func TournamentDelete(ctx context.Context, cache LeaderboardCache, rankCache Lea
return nil
}

func TournamentTerminate(ctx context.Context, cache LeaderboardCache, rankCache LeaderboardRankCache, scheduler LeaderboardScheduler, leaderboardId string) error {
ts := time.Now().Unix()
tMinusOne := time.Unix(ts-1, 0).UTC()

if err := TournamentDelete(ctx, cache, rankCache, scheduler, leaderboardId); err != nil {
return err
}
leaderboard := cache.Get(leaderboardId)
if leaderboard == nil || !leaderboard.IsTournament() {
// If it does not exist treat it as success.
return nil
}

if leaderboard.EndTime > 0 && leaderboard.EndTime < ts {
// Tournament has ended, callback has already run.
return nil
}

scheduler.QueueCallback(&LeaderboardSchedulerCallback{id: leaderboardId, leaderboard: leaderboard, ts: ts, t: tMinusOne, callbackType: End})

return nil
}

func TournamentAddAttempt(ctx context.Context, logger *zap.Logger, db *sql.DB, cache LeaderboardCache, leaderboardId string, owner string, count int) error {
if count == 0 {
// No-op.
Expand Down
36 changes: 19 additions & 17 deletions server/leaderboard_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,23 +43,25 @@ const (
)

type Leaderboard struct {
Id string
Authoritative bool
SortOrder int
Operator int
ResetScheduleStr string
ResetSchedule *cronexpr.Expression
Metadata string
CreateTime int64
Category int
Description string
Duration int
EndTime int64
JoinRequired bool
MaxSize int
MaxNumScore int
Title string
StartTime int64
mu sync.Mutex // Guarding endCallbackInvoked
Id string
Authoritative bool
SortOrder int
Operator int
ResetScheduleStr string
ResetSchedule *cronexpr.Expression
Metadata string
CreateTime int64
Category int
Description string
Duration int
EndTime int64
JoinRequired bool
MaxSize int
MaxNumScore int
Title string
StartTime int64
endCallbackInvoked bool
}

func (l *Leaderboard) IsTournament() bool {
Expand Down
56 changes: 45 additions & 11 deletions server/leaderboard_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,19 @@ import (
"go.uber.org/zap"
)

type CallbackType int

const (
Expiry CallbackType = iota // Reset
End
)

type LeaderboardSchedulerCallback struct {
id string
leaderboard *Leaderboard
ts int64
t time.Time
id string
leaderboard *Leaderboard
ts int64
t time.Time
callbackType CallbackType
}

type LeaderboardScheduler interface {
Expand All @@ -39,6 +47,7 @@ type LeaderboardScheduler interface {
Resume()
Stop()
Update()
QueueCallback(callback *LeaderboardSchedulerCallback)
}

type LocalLeaderboardScheduler struct {
Expand Down Expand Up @@ -279,6 +288,10 @@ func (ls *LocalLeaderboardScheduler) Update() {
ls.logger.Info("Leaderboard scheduler update", zap.Duration("end_active", endActiveDuration), zap.Int("end_active_count", len(endActiveLeaderboardIds)), zap.Duration("expiry", expiryDuration), zap.Int("expiry_count", len(expiryLeaderboardIds)))
}

func (ls *LocalLeaderboardScheduler) QueueCallback(c *LeaderboardSchedulerCallback) {
ls.queue <- c
}

func (ls *LocalLeaderboardScheduler) queueEndActiveElapse(t time.Time, ids []string) {
if ls.active.Load() != 1 {
// Not active.
Expand Down Expand Up @@ -311,8 +324,15 @@ func (ls *LocalLeaderboardScheduler) queueEndActiveElapse(t time.Time, ids []str
// Process the current set of tournament ends.
for _, id := range ids {
currentId := id

leaderboard := ls.cache.Get(id)
if leaderboard == nil {
// Cached entry was deleted before it reached the scheduler here.
continue
}

// Will block if the queue is full.
ls.queue <- &LeaderboardSchedulerCallback{id: currentId, ts: ts, t: tMinusOne}
ls.queue <- &LeaderboardSchedulerCallback{id: currentId, leaderboard: leaderboard, ts: ts, t: tMinusOne, callbackType: End}
}
}()
}
Expand Down Expand Up @@ -357,7 +377,7 @@ func (ls *LocalLeaderboardScheduler) queueExpiryElapse(t time.Time, ids []string
continue
}
// Will block if queue is full.
ls.queue <- &LeaderboardSchedulerCallback{id: currentId, leaderboard: leaderboard, ts: ts, t: tMinusOne}
ls.queue <- &LeaderboardSchedulerCallback{id: currentId, leaderboard: leaderboard, ts: ts, t: tMinusOne, callbackType: Expiry}
}
}()
}
Expand All @@ -368,7 +388,7 @@ func (ls *LocalLeaderboardScheduler) invokeCallback() {
case <-ls.ctx.Done():
return
case callback := <-ls.queue:
if callback.leaderboard != nil {
if callback.callbackType == Expiry {
if callback.leaderboard.IsTournament() {
// Tournament, fetch most up to date info for size etc.
// Some processing is needed even if there is no runtime callback registered for tournament reset.
Expand Down Expand Up @@ -412,6 +432,11 @@ WHERE id = $1`
}
}
} else {
// Skip processing if there is no tournament end callback registered.
if ls.fnTournamentEnd == nil {
continue
}

query := `SELECT
id, sort_order, operator, reset_schedule, metadata, create_time,
category, description, duration, end_time, max_size, max_num_score, title, size, start_time
Expand All @@ -427,10 +452,19 @@ WHERE id = $1`
continue
}

// fnTournamentEnd cannot be nil here, if it was the callback would not be queued at all.
if err := ls.fnTournamentEnd(ls.ctx, tournament, int64(tournament.EndActive), int64(tournament.NextReset)); err != nil {
ls.logger.Warn("Failed to invoke tournament end callback", zap.Error(err))
}
func() {
callback.leaderboard.mu.Lock()
defer callback.leaderboard.mu.Unlock()
if callback.leaderboard.endCallbackInvoked {
// already activated once
return
}
// fnTournamentEnd cannot be nil here, if it was the callback would not be queued at all.
if err := ls.fnTournamentEnd(ls.ctx, tournament, int64(tournament.EndActive), int64(tournament.NextReset)); err != nil {
ls.logger.Warn("Failed to invoke tournament end callback", zap.Error(err))
}
callback.leaderboard.endCallbackInvoked = true
}()
}
}
}
Expand Down