Skip to content

Commit

Permalink
Increaes RecordHeartbeat frequency in VerifyReplicationTask (#4771)
Browse files Browse the repository at this point in the history
<!-- Describe what has changed in this PR -->
**What changed?**


<!-- Tell your future self why have you made these changes -->
**Why?**


<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->
**How did you test it?**


<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->
**Potential risks**


<!-- Is this PR a hotfix candidate or require that a notification be
sent to the broader community? (Yes/No) -->
**Is hotfix candidate?**
  • Loading branch information
hehaifengcn committed Aug 14, 2023
1 parent 194a893 commit 1eaaaf8
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 38 deletions.
26 changes: 16 additions & 10 deletions service/worker/migration/activities.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,9 +590,17 @@ func (a *activities) verifyReplicationTasks(
details *replicationTasksHeartbeatDetails,
remoteClient adminservice.AdminServiceClient,
ns *namespace.Namespace,
heartbeat func(details replicationTasksHeartbeatDetails),
) (bool, []SkippedWorkflowExecution, error) {
start := time.Now()
progress := false
defer func() {
if progress {
// Update CheckPoint where there is a progress
details.CheckPoint = time.Now()
}

heartbeat(*details)
a.forceReplicationMetricsHandler.Timer(metrics.VerifyReplicationTasksLatency.GetMetricName()).Record(time.Since(start))
}()

Expand Down Expand Up @@ -636,6 +644,9 @@ func (a *activities) verifyReplicationTasks(

return false, skippedList, errors.WithMessage(err, "remoteClient.DescribeMutableState call failed")
}

heartbeat(*details)
progress = true
}

return true, skippedList, nil
Expand Down Expand Up @@ -682,24 +693,19 @@ func (a *activities) VerifyReplicationTasks(ctx context.Context, request *verify
// - more than checkSkipThreshold, it checks if outstanding workflow execution can be skipped locally (#2 and #3)
// - more than NonRetryableTimeout, it means potentially #4. The activity returns
// non-retryable error and force-replication will fail.

for {
// Since replication has a lag, sleep first.
time.Sleep(request.VerifyInterval)

lastIndex := details.NextIndex
verified, skippedList, err := a.verifyReplicationTasks(ctx, request, &details, remoteClient, nsEntry)
verified, skippedList, err := a.verifyReplicationTasks(ctx, request, &details, remoteClient, nsEntry,
func(d replicationTasksHeartbeatDetails) {
activity.RecordHeartbeat(ctx, d)
})

if err != nil {
return response, err
}

if lastIndex < details.NextIndex {
// Update CheckPoint where there is a progress
details.CheckPoint = time.Now()
}

activity.RecordHeartbeat(ctx, details)

if len(skippedList) > 0 {
response.SkippedWorkflowExecutions = append(response.SkippedWorkflowExecutions, skippedList...)
response.SkippedWorkflowCount = len(response.SkippedWorkflowExecutions)
Expand Down
66 changes: 38 additions & 28 deletions service/worker/migration/activities_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,9 +340,7 @@ func (s *activitiesSuite) TestVerifyReplicationTasks_AlreadyVerified() {
_, err := env.ExecuteActivity(s.a.VerifyReplicationTasks, &request)
s.NoError(err)

s.Greater(len(iceptor.replicationRecordedHeartbeats), 0)
lastHeartBeat := iceptor.replicationRecordedHeartbeats[len(iceptor.replicationRecordedHeartbeats)-1]
s.Equal(len(request.Executions), lastHeartBeat.NextIndex)
s.Equal(len(iceptor.replicationRecordedHeartbeats), 1)
}

type executionState int
Expand Down Expand Up @@ -383,6 +381,14 @@ func createExecutions(mockClient *adminservicemock.MockAdminServiceClient, state
return executions
}

type mockHeartBeatRecorder struct {
lastHeartBeat replicationTasksHeartbeatDetails
}

func (m *mockHeartBeatRecorder) hearbeat(details replicationTasksHeartbeatDetails) {
m.lastHeartBeat = details
}

func (s *activitiesSuite) Test_verifyReplicationTasks() {
request := verifyReplicationTasksRequest{
Namespace: mockedNamespace,
Expand All @@ -393,36 +399,36 @@ func (s *activitiesSuite) Test_verifyReplicationTasks() {
ctx := context.TODO()

var tests = []struct {
executionStates []executionState
nextIndex int
expectedVerified bool
expectedErr error
expectedIndex int
remoteExecutionStates []executionState
nextIndex int
expectedVerified bool
expectedErr error
expectedIndex int
}{
{
expectedVerified: true,
expectedErr: nil,
},
{
executionStates: []executionState{executionFound, executionFound, executionFound, executionFound},
nextIndex: 0,
expectedVerified: true,
expectedErr: nil,
expectedIndex: 4,
remoteExecutionStates: []executionState{executionFound, executionFound, executionFound, executionFound},
nextIndex: 0,
expectedVerified: true,
expectedErr: nil,
expectedIndex: 4,
},
{
executionStates: []executionState{executionFound, executionFound, executionFound, executionFound},
nextIndex: 2,
expectedVerified: true,
expectedErr: nil,
expectedIndex: 4,
remoteExecutionStates: []executionState{executionFound, executionFound, executionFound, executionFound},
nextIndex: 2,
expectedVerified: true,
expectedErr: nil,
expectedIndex: 4,
},
{
executionStates: []executionState{executionFound, executionFound, executionNotfound},
nextIndex: 0,
expectedVerified: false,
expectedErr: nil,
expectedIndex: 2,
remoteExecutionStates: []executionState{executionFound, executionFound, executionNotfound},
nextIndex: 0,
expectedVerified: false,
expectedErr: nil,
expectedIndex: 2,
},
}

Expand All @@ -432,20 +438,22 @@ func (s *activitiesSuite) Test_verifyReplicationTasks() {
}).Return(&completeState, nil).AnyTimes()

for _, tc := range tests {
var recorder mockHeartBeatRecorder
mockRemoteAdminClient := adminservicemock.NewMockAdminServiceClient(s.controller)
request.Executions = createExecutions(mockRemoteAdminClient, tc.executionStates, tc.nextIndex)
request.Executions = createExecutions(mockRemoteAdminClient, tc.remoteExecutionStates, tc.nextIndex)
details := replicationTasksHeartbeatDetails{
NextIndex: tc.nextIndex,
}

verified, _, err := s.a.verifyReplicationTasks(ctx, &request, &details, mockRemoteAdminClient, &testNamespace)
verified, _, err := s.a.verifyReplicationTasks(ctx, &request, &details, mockRemoteAdminClient, &testNamespace, recorder.hearbeat)
if tc.expectedErr == nil {
s.NoError(err)
}
s.Equal(tc.expectedVerified, verified)
s.Equal(tc.expectedIndex, details.NextIndex)
s.GreaterOrEqual(len(tc.executionStates), details.NextIndex)
if details.NextIndex < len(tc.executionStates) && tc.executionStates[details.NextIndex] == executionNotfound {
s.GreaterOrEqual(len(tc.remoteExecutionStates), details.NextIndex)
s.Equal(recorder.lastHeartBeat, details)
if details.NextIndex < len(tc.remoteExecutionStates) && tc.remoteExecutionStates[details.NextIndex] == executionNotfound {
s.Equal(execution1, details.LastNotFoundWorkflowExecution)
}
}
Expand Down Expand Up @@ -476,6 +484,7 @@ func (s *activitiesSuite) Test_verifyReplicationTasksSkipRetention() {
}

for _, tc := range tests {
var recorder mockHeartBeatRecorder
deleteTime := time.Now().Add(tc.deleteDiff)
retention := time.Hour
closeTime := deleteTime.Add(-retention)
Expand Down Expand Up @@ -512,9 +521,10 @@ func (s *activitiesSuite) Test_verifyReplicationTasksSkipRetention() {

details := replicationTasksHeartbeatDetails{}
ctx := context.TODO()
verified, _, err := s.a.verifyReplicationTasks(ctx, &request, &details, mockRemoteAdminClient, ns)
verified, _, err := s.a.verifyReplicationTasks(ctx, &request, &details, mockRemoteAdminClient, ns, recorder.hearbeat)
s.NoError(err)
s.Equal(tc.verified, verified)
s.Equal(recorder.lastHeartBeat, details)
}
}

Expand Down

0 comments on commit 1eaaaf8

Please sign in to comment.