-
Notifications
You must be signed in to change notification settings - Fork 910
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use assertion function #7433
base: priority
Are you sure you want to change the base?
Use assertion function #7433
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -36,9 +36,11 @@ import ( | |
"go.temporal.io/server/common" | ||
"go.temporal.io/server/common/backoff" | ||
"go.temporal.io/server/common/clock" | ||
"go.temporal.io/server/common/log" | ||
"go.temporal.io/server/common/metrics" | ||
"go.temporal.io/server/common/primitives/timestamp" | ||
"go.temporal.io/server/common/quotas" | ||
"go.temporal.io/server/common/softassert" | ||
"go.temporal.io/server/common/tqid" | ||
"go.temporal.io/server/common/util" | ||
) | ||
|
@@ -59,7 +61,8 @@ type priTaskMatcher struct { | |
fwdr *priForwarder | ||
validator taskValidator | ||
metricsHandler metrics.Handler // namespace metric scope | ||
numPartitions func() int // number of task queue partitions | ||
logger log.Logger | ||
numPartitions func() int // number of task queue partitions | ||
|
||
limiterLock sync.Mutex | ||
adminNsRate float64 | ||
|
@@ -118,12 +121,14 @@ func newPriTaskMatcher( | |
partition tqid.Partition, | ||
fwdr *priForwarder, | ||
validator taskValidator, | ||
logger log.Logger, | ||
metricsHandler metrics.Handler, | ||
) *priTaskMatcher { | ||
tm := &priTaskMatcher{ | ||
config: config, | ||
data: newMatcherData(config), | ||
data: newMatcherData(config, logger), | ||
tqCtx: tqCtx, | ||
logger: logger, | ||
metricsHandler: metricsHandler, | ||
partition: partition, | ||
fwdr: fwdr, | ||
|
@@ -178,7 +183,7 @@ func (tm *priTaskMatcher) forwardTasks(lim quotas.RateLimiter, retrier backoff.R | |
if res.ctxErr != nil { | ||
return // task queue closing | ||
} | ||
bugIf(res.task == nil, "bug: bad match result in forwardTasks") | ||
softassert.That(tm.logger, res.task != nil, "expected a task from match") | ||
|
||
err := tm.forwardTask(res.task) | ||
|
||
|
@@ -247,11 +252,13 @@ func (tm *priTaskMatcher) validateTasksOnRoot(lim quotas.RateLimiter, retrier ba | |
if res.ctxErr != nil { | ||
return // task queue closing | ||
} | ||
bugIf(res.task == nil, "bug: bad match result in validateTasksOnRoot") | ||
softassert.That(tm.logger, res.task != nil, "expected a task from match") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this too |
||
|
||
task := res.task | ||
bugIf(task.forwardCtx != nil || task.isSyncMatchTask() || task.source != enumsspb.TASK_SOURCE_DB_BACKLOG, | ||
"bug: validator got a sync task") | ||
softassert.That(tm.logger, task.forwardCtx == nil, "expected non-forwarded task") | ||
softassert.That(tm.logger, !task.isSyncMatchTask(), "expected non-sync match task") | ||
softassert.That(tm.logger, task.source == enumsspb.TASK_SOURCE_DB_BACKLOG, "expected backlog task") | ||
|
||
maybeValid := tm.validator == nil || tm.validator.maybeValidate(task.event.AllocatedTaskInfo, tm.partition.TaskType()) | ||
if !maybeValid { | ||
// We found an invalid one, complete it and go back for another immediately. | ||
|
@@ -276,7 +283,7 @@ func (tm *priTaskMatcher) forwardPolls() { | |
if res.ctxErr != nil { | ||
return // task queue closing | ||
} | ||
bugIf(res.poller == nil, "bug: bad match result in forwardPolls") | ||
softassert.That(tm.logger, res.poller != nil, "expected a poller from match") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. needs to continue or return |
||
|
||
poller := res.poller | ||
// We need to use the real source poller context since it has the poller id and | ||
|
@@ -324,7 +331,7 @@ func (tm *priTaskMatcher) forwardPolls() { | |
func (tm *priTaskMatcher) Offer(ctx context.Context, task *internalTask) (bool, error) { | ||
finish := func() (bool, error) { | ||
res, ok := task.getResponse() | ||
bugIf(!ok, "Offer must be given a sync match task") | ||
softassert.That(tm.logger, ok, "expected a sync match task") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. needs to return error? |
||
if res.forwarded { | ||
if res.forwardErr == nil { | ||
// task was remotely sync matched on the parent partition | ||
|
@@ -364,7 +371,7 @@ func (tm *priTaskMatcher) Offer(ctx context.Context, task *internalTask) (bool, | |
if res.ctxErr != nil { | ||
return false, res.ctxErr | ||
} | ||
bugIf(res.poller == nil, "bug: bad match result in Offer") | ||
softassert.That(tm.logger, res.poller != nil, "expeced poller from match") | ||
return finish() | ||
} | ||
|
||
|
@@ -403,9 +410,9 @@ again: | |
} | ||
return nil, res.ctxErr | ||
} | ||
bugIf(res.poller == nil, "bug: bad match result in syncOfferTask") | ||
softassert.That(tm.logger, res.poller != nil, "expected poller from match") | ||
response, ok := task.getResponse() | ||
bugIf(!ok, "OfferQuery/OfferNexusTask must be given a sync match task") | ||
softassert.That(tm.logger, ok, "expected a sync match task") | ||
// Note: if task was not forwarded, this will just be the zero value and nil. | ||
// That's intended: the query/nexus handler in matchingEngine will wait for the real | ||
// result separately. | ||
|
@@ -553,7 +560,7 @@ func (tm *priTaskMatcher) poll( | |
} | ||
return nil, errNoTasks | ||
} | ||
bugIf(res.task == nil, "bug: bad match result in poll") | ||
softassert.That(tm.logger, res.task != nil, "expected task from match") | ||
|
||
task := res.task | ||
pollWasForwarded = task.isStarted() | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -38,10 +38,12 @@ import ( | |
"go.temporal.io/server/common" | ||
"go.temporal.io/server/common/backoff" | ||
"go.temporal.io/server/common/clock" | ||
"go.temporal.io/server/common/log" | ||
"go.temporal.io/server/common/log/tag" | ||
"go.temporal.io/server/common/metrics" | ||
"go.temporal.io/server/common/persistence" | ||
serviceerrors "go.temporal.io/server/common/serviceerror" | ||
"go.temporal.io/server/common/softassert" | ||
"go.temporal.io/server/common/util" | ||
"golang.org/x/sync/semaphore" | ||
) | ||
|
@@ -63,6 +65,7 @@ type ( | |
backlogMgr *priBacklogManagerImpl | ||
subqueue int | ||
notifyC chan struct{} // Used as signal to notify pump of new tasks | ||
logger log.Logger | ||
|
||
lock sync.Mutex | ||
|
||
|
@@ -98,6 +101,7 @@ func newPriTaskReader( | |
backlogMgr: backlogMgr, | ||
subqueue: subqueue, | ||
notifyC: make(chan struct{}, 1), | ||
logger: backlogMgr.logger, | ||
retrier: backoff.NewRetrier( | ||
common.CreateReadTaskRetryPolicy(), | ||
clock.NewRealTimeSource(), | ||
|
@@ -410,7 +414,7 @@ func (tr *priTaskReader) signalNewTasks(resp subqueueCreateTasksResponse) { | |
// adding these tasks to outstandingTasks. So they should definitely not be there. | ||
for _, t := range resp.tasks { | ||
_, found := tr.outstandingTasks.Get(t.TaskId) | ||
bugIf(found, "bug: newly-written task already present in outstanding tasks") | ||
softassert.That(tr.logger, !found, "newly-written task already present in outstanding tasks") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this should probably do a slices.DeleteFunc and remove the ones that are found |
||
} | ||
|
||
tr.recordNewTasksLocked(resp.tasks) | ||
|
@@ -445,8 +449,8 @@ func (tr *priTaskReader) getLoadedTasks() int { | |
|
||
func (tr *priTaskReader) ackTaskLocked(taskId int64) int64 { | ||
wasAlreadyAcked, found := tr.outstandingTasks.Get(taskId) | ||
bugIf(!found, "bug: completed task not found in outstandingTasks") | ||
bugIf(wasAlreadyAcked.(bool), "bug: completed task was already acked") | ||
softassert.That(tr.logger, found, "completed task not found in oustandingTasks") | ||
softassert.That(tr.logger, !wasAlreadyAcked.(bool), "completed task was already acked") | ||
Comment on lines
+452
to
+453
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. probably just return if either of these are true? |
||
|
||
tr.outstandingTasks.Put(taskId, true) | ||
tr.loadedTasks-- | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should continue or return on failure, otherwise it'll just panic immediately