Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Persist since token periodically #209

Merged
merged 4 commits into from
Jul 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 17 additions & 9 deletions sync2/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ type PollerID struct {
DeviceID string
}

// alias time.Sleep so tests can monkey patch it out
// alias time.Sleep/time.Since so tests can monkey patch them out
var timeSleep = time.Sleep
var timeSince = time.Since

// log at most once every duration. Always logs before terminating.
var logInterval = 30 * time.Second
Expand Down Expand Up @@ -427,9 +428,10 @@ func (p *poller) Terminate() {
}

type pollLoopState struct {
firstTime bool
failCount int
since string
firstTime bool
failCount int
since string
lastStoredSince time.Time // The time we last stored the since token in the database
}

// Poll will block forever, repeatedly calling v2 sync. Do this in a goroutine.
Expand Down Expand Up @@ -463,6 +465,8 @@ func (p *poller) Poll(since string) {
firstTime: true,
failCount: 0,
since: since,
// Setting time.Time{} causes the first poll loop iteration to immediately store the since token.
lastStoredSince: time.Time{},
}
for !p.terminated.Load() {
ctx, task := internal.StartTask(ctx, "Poll")
Expand Down Expand Up @@ -508,7 +512,7 @@ func (p *poller) poll(ctx context.Context, s *pollLoopState) error {
p.numOutstandingSyncReqs.Dec()
}
region.End()
p.trackRequestDuration(time.Since(start), s.since == "", s.firstTime)
p.trackRequestDuration(timeSince(start), s.since == "", s.firstTime)
if p.terminated.Load() {
return fmt.Errorf("poller terminated")
}
Expand Down Expand Up @@ -544,14 +548,18 @@ func (p *poller) poll(ctx context.Context, s *pollLoopState) error {
wasFirst := s.firstTime

s.since = resp.NextBatch
// persist the since token (TODO: this could get slow if we hammer the DB too much)
p.receiver.UpdateDeviceSince(ctx, p.userID, p.deviceID, s.since)
// Persist the since token if more than one minute has passed since we
// last stored it OR the response contains to-device messages
if timeSince(s.lastStoredSince) > time.Minute || len(resp.ToDevice.Events) > 0 {
p.receiver.UpdateDeviceSince(ctx, p.userID, p.deviceID, s.since)
s.lastStoredSince = time.Now()
}

if s.firstTime {
s.firstTime = false
p.wg.Done()
}
p.trackProcessDuration(time.Since(start), wasInitial, wasFirst)
p.trackProcessDuration(timeSince(start), wasInitial, wasFirst)
p.maybeLogStats(false)
return nil
}
Expand Down Expand Up @@ -727,7 +735,7 @@ func (p *poller) parseRoomsResponse(ctx context.Context, res *SyncResponse) {
}

func (p *poller) maybeLogStats(force bool) {
if !force && time.Since(p.lastLogged) < logInterval {
if !force && timeSince(p.lastLogged) < logInterval {
// only log at most once every logInterval
return
}
Expand Down
136 changes: 131 additions & 5 deletions sync2/poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,9 @@ func TestPollerPollFromExisting(t *testing.T) {
json.RawMessage(`{"event":10}`),
},
}
toDeviceResponses := [][]json.RawMessage{
{}, {}, {}, {json.RawMessage(`{}`)},
}
hasPolledSuccessfully := make(chan struct{})
accumulator, client := newMocks(func(authHeader, since string) (*SyncResponse, int, error) {
if since == "" {
Expand All @@ -295,6 +298,10 @@ func TestPollerPollFromExisting(t *testing.T) {
var joinResp SyncV2JoinResponse
joinResp.Timeline.Events = roomTimelineResponses[sinceInt]
return &SyncResponse{
// Add in dummy toDevice messages, so the poller actually persists the since token. (Which
// it only does for the first poll, after 1min (this test doesn't run that long) OR there are
// ToDevice messages in the response)
ToDevice: EventsResponse{Events: toDeviceResponses[sinceInt]},
NextBatch: fmt.Sprintf("%d", sinceInt+1),
Rooms: struct {
Join map[string]SyncV2JoinResponse `json:"join"`
Expand Down Expand Up @@ -336,6 +343,121 @@ func TestPollerPollFromExisting(t *testing.T) {
}
}

// Check that the since token in the database
// 1. is updated if it is the first iteration of poll
// 2. is NOT updated for random events
// 3. is updated if the syncV2 response contains ToDevice messages
// 4. is updated if at least 1min has passed since we last stored a token
func TestPollerPollUpdateDeviceSincePeriodically(t *testing.T) {
pid := PollerID{UserID: "@alice:localhost", DeviceID: "FOOBAR"}

syncResponses := make(chan *SyncResponse, 1)
syncCalledWithSince := make(chan string)
accumulator, client := newMocks(func(authHeader, since string) (*SyncResponse, int, error) {
Copy link
Member

@kegsay kegsay Jul 19, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please check that client.DoSyncV2 is called with the correct since tokens.

if since != "" {
syncCalledWithSince <- since
}
return <-syncResponses, 200, nil
})
accumulator.updateSinceCalled = make(chan struct{}, 1)
poller := newPoller(pid, "Authorization: hello world", client, accumulator, zerolog.New(os.Stderr), false)
defer poller.Terminate()
go func() {
poller.Poll("0")
}()

hasPolledSuccessfully := make(chan struct{})

go func() {
poller.WaitUntilInitialSync()
close(hasPolledSuccessfully)
}()

// 1. Initial poll updates the database
next := "1"
syncResponses <- &SyncResponse{NextBatch: next}
mustEqualSince(t, <-syncCalledWithSince, "0")

select {
case <-hasPolledSuccessfully:
break
case <-time.After(time.Second):
t.Errorf("WaitUntilInitialSync failed to fire")
}
// Also check that UpdateDeviceSince was called
select {
case <-accumulator.updateSinceCalled:
case <-time.After(time.Millisecond * 100): // give the Poller some time to process the response
t.Fatalf("did not receive call to UpdateDeviceSince in time")
}

if got := accumulator.pollerIDToSince[pid]; got != next {
t.Fatalf("expected since to be updated to %s, but got %s", next, got)
}

// The since token used by calls to doSyncV2
wantSinceFromSync := next

// 2. Second request updates the state but NOT the database
syncResponses <- &SyncResponse{NextBatch: "2"}
mustEqualSince(t, <-syncCalledWithSince, wantSinceFromSync)

select {
case <-accumulator.updateSinceCalled:
t.Fatalf("unexpected call to UpdateDeviceSince")
case <-time.After(time.Millisecond * 100):
}

if got := accumulator.pollerIDToSince[pid]; got != next {
t.Fatalf("expected since to be updated to %s, but got %s", next, got)
}

// 3. Sync response contains a toDevice message and should be stored in the database
wantSinceFromSync = "2"
next = "3"
syncResponses <- &SyncResponse{
NextBatch: next,
ToDevice: EventsResponse{Events: []json.RawMessage{{}}},
}
mustEqualSince(t, <-syncCalledWithSince, wantSinceFromSync)

select {
case <-accumulator.updateSinceCalled:
case <-time.After(time.Millisecond * 100):
t.Fatalf("did not receive call to UpdateDeviceSince in time")
}

if got := accumulator.pollerIDToSince[pid]; got != next {
t.Fatalf("expected since to be updated to %s, but got %s", wantSinceFromSync, got)
}
wantSinceFromSync = next

// 4. ... some time has passed, this triggers the 1min limit
timeSince = func(d time.Time) time.Duration {
return time.Minute * 2
}
next = "10"
syncResponses <- &SyncResponse{NextBatch: next}
mustEqualSince(t, <-syncCalledWithSince, wantSinceFromSync)

select {
case <-accumulator.updateSinceCalled:
case <-time.After(time.Millisecond * 100):
t.Fatalf("did not receive call to UpdateDeviceSince in time")
}

if got := accumulator.pollerIDToSince[pid]; got != next {
t.Fatalf("expected since to be updated to %s, but got %s", wantSinceFromSync, got)
}
}

// mustEqualSince aborts the calling test when the since token observed on a
// client.DoSyncV2 call (gotSince) differs from the one the test expected
// (expectedSince). It is marked as a helper so failures are attributed to the caller.
func mustEqualSince(t *testing.T, gotSince, expectedSince string) {
	t.Helper()
	if gotSince == expectedSince {
		// Tokens match: nothing to report.
		return
	}
	t.Fatalf("client.DoSyncV2 using unexpected since token: %s, want %s", gotSince, expectedSince)
}

// Tests that the poller backs off in 2,4,8,etc second increments to a variety of errors
func TestPollerBackoff(t *testing.T) {
deviceID := "FOOBAR"
Expand Down Expand Up @@ -453,11 +575,12 @@ func (c *mockClient) WhoAmI(authHeader string) (string, string, error) {
}

type mockDataReceiver struct {
states map[string][]json.RawMessage
timelines map[string][]json.RawMessage
pollerIDToSince map[PollerID]string
incomingProcess chan struct{}
unblockProcess chan struct{}
states map[string][]json.RawMessage
timelines map[string][]json.RawMessage
pollerIDToSince map[PollerID]string
incomingProcess chan struct{}
unblockProcess chan struct{}
updateSinceCalled chan struct{}
}

func (a *mockDataReceiver) Accumulate(ctx context.Context, userID, deviceID, roomID, prevBatch string, timeline []json.RawMessage) {
Expand All @@ -479,6 +602,9 @@ func (a *mockDataReceiver) SetTyping(ctx context.Context, roomID string, ephEven
}
// UpdateDeviceSince records since as the latest token for the given user/device pair.
// If a test has installed updateSinceCalled, it then sends one signal on that channel
// so the test can observe that the call happened.
func (s *mockDataReceiver) UpdateDeviceSince(ctx context.Context, userID, deviceID, since string) {
	key := PollerID{UserID: userID, DeviceID: deviceID}
	s.pollerIDToSince[key] = since

	if s.updateSinceCalled == nil {
		// No test is listening for update notifications.
		return
	}
	// The map write above happens before this send, so a test that receives the
	// signal can then read the stored token.
	s.updateSinceCalled <- struct{}{}
}
func (s *mockDataReceiver) AddToDeviceMessages(ctx context.Context, userID, deviceID string, msgs []json.RawMessage) {
}
Expand Down
Loading