From 765fcf427b24897c938f7945d118806ad3ab7620 Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Wed, 9 Aug 2023 14:32:50 -0400 Subject: [PATCH] Significantly reduce the number of goroutines used in the checking stream We now only spawn a goroutine once the minimum number of resources to be checked is available for checking, rather than always --- internal/graph/checkingresourcestream.go | 110 ++++++++++++++--------- 1 file changed, 66 insertions(+), 44 deletions(-) diff --git a/internal/graph/checkingresourcestream.go b/internal/graph/checkingresourcestream.go index 6deaf08424..8d650c7a47 100644 --- a/internal/graph/checkingresourcestream.go +++ b/internal/graph/checkingresourcestream.go @@ -50,17 +50,31 @@ type resourceQueue struct { beingProcessed map[uint64]possibleResource } +type processingStatus int + +const ( + publishDirectly processingStatus = iota + awaitingMoreResources + readyForProcessing +) + // addPossibleResource queues a resource for processing (if a check is required) or for // immediate publishing (if a check is not required). -func (rq *resourceQueue) addPossibleResource(pr possibleResource) { +func (rq *resourceQueue) addPossibleResource(pr possibleResource) processingStatus { rq.lock.Lock() defer rq.lock.Unlock() if pr.lookupResult != nil { rq.toPublish[pr.orderingIndex] = pr - } else { - rq.toProcess[pr.orderingIndex] = pr + return publishDirectly + } + + rq.toProcess[pr.orderingIndex] = pr + if len(rq.toProcess) < int(datastore.FilterMaximumIDCount) { + return awaitingMoreResources } + + return readyForProcessing } // updateToBePublished marks a resource as ready for publishing. @@ -155,9 +169,9 @@ type checkingResourceStream struct { // rq is the resourceQueue for managing the state of all resources returned by the reachable resources call. 
rq *resourceQueue - // reachableResourceAvailable is a channel which indicates to the processing worker(s) that work is available + // reachableResourcesAreAvailableForProcessing is a channel which indicates to the processing worker(s) that work is available // for processing. - reachableResourceAvailable chan struct{} + reachableResourcesAreAvailableForProcessing chan struct{} // reachableResourcesCompleted is a channel used to indicate to each processing worker that reachable resources has // been completed, and that all further processing work should be done before shutting down. @@ -179,9 +193,9 @@ type checkingResourceStream struct { // and waitForPublishing() (after reachable resources has completed). reachableResourcesCount uint64 - // lastResourceCursor is the cursor from the last received reachable resource result. Should *only* be accessed from queue() - // and waitForPublishing() (after reachable resources has completed). - lastResourceCursor *v1.Cursor + // lastReachableResourceCursor is the cursor from the last received reachable resource result. Should *only* be accessed from + // Publish() and waitForPublishing() (after reachable resources has completed). + lastReachableResourceCursor *v1.Cursor // dispatchesToBeReported is the number of dispatches that were skipped from being reported due // to a resource being filtered, and whose count has to be attached to the next outgoing result. 
@@ -241,9 +255,9 @@ func newCheckingResourceStream( beingProcessed: map[uint64]possibleResource{}, toPublish: map[uint64]possibleResource{}, }, - reachableResourceAvailable: make(chan struct{}, concurrencyLimit), - reachableResourcesCompleted: make(chan struct{}, concurrencyLimit), - availableForPublishing: make(chan bool, concurrencyLimit), + reachableResourcesAreAvailableForProcessing: make(chan struct{}, concurrencyLimit), + reachableResourcesCompleted: make(chan struct{}, concurrencyLimit), + availableForPublishing: make(chan bool, concurrencyLimit), orderingIndexToBePublished: 0, reachableResourcesCount: 0, @@ -269,9 +283,15 @@ func (crs *checkingResourceStream) waitForPublishing() (uint64, *v1.Cursor, erro crs.reachableResourcesCompleted <- struct{}{} } - // Wait for all processing to complete. + // Wait for all existing processing to complete. crs.processingWaitGroup.Wait() + // Run a final processing call to ensure there are no remaining items. + _, err := crs.runProcess(true) + if err != nil { + return 0, nil, err + } + // Mark publishing as ready for final publishing. select { case crs.availableForPublishing <- false: @@ -285,7 +305,7 @@ func (crs *checkingResourceStream) waitForPublishing() (uint64, *v1.Cursor, erro // Wait for any remaining publishing to complete. 
crs.publishingWaitGroup.Wait() - return crs.reachableResourcesCount, crs.lastResourceCursor, crs.err + return crs.reachableResourcesCount, crs.lastReachableResourceCursor, crs.err } // resourcePublisher is the goroutine that publishes resources to the parent stream once they've been @@ -393,7 +413,7 @@ func (crs *checkingResourceStream) process() { } return - case <-crs.reachableResourceAvailable: + case <-crs.reachableResourcesAreAvailableForProcessing: for { ok, err := crs.runProcess(false) if err != nil { @@ -413,6 +433,7 @@ func (crs *checkingResourceStream) runProcess(alwaysProcess bool) (bool, error) // Collect any resources that need to be checked, up to the configured limit, and issue a check. // If a resource does not require a check, simply place on the toPublish queue. toCheck := mapz.NewMultiMap[string, possibleResource]() + toProcess := crs.rq.selectResourcesToProcess(alwaysProcess) if len(toProcess) == 0 { return false, nil @@ -426,11 +447,6 @@ func (crs *checkingResourceStream) runProcess(alwaysProcess bool) (bool, error) toCheck.Add(current.reachableResult.Resource.ResourceId, current) } - if toCheck.IsEmpty() { - crs.availableForPublishing <- true - return true, nil - } - // Issue the bulk check over all the resources. results, checkResultMetadata, err := computed.ComputeBulkCheck( crs.ctx, @@ -551,8 +567,8 @@ func (crs *checkingResourceStream) spawnIfAvailable() { } } -// queue queues a reachable resources result to be processed by one of the processing worker(s), before publishing. -func (crs *checkingResourceStream) queue(result *v1.DispatchReachableResourcesResponse) bool { +// Publish implements the Stream interface and is invoked by the ReachableResources call. 
+func (crs *checkingResourceStream) Publish(result *v1.DispatchReachableResourcesResponse) error { currentResource := possibleResource{ reachableResult: result, lookupResult: nil, @@ -573,48 +589,54 @@ func (crs *checkingResourceStream) queue(result *v1.DispatchReachableResourcesRe } } - crs.rq.addPossibleResource(currentResource) crs.reachableResourcesCount++ - crs.lastResourceCursor = result.AfterResponseCursor + crs.lastReachableResourceCursor = result.AfterResponseCursor + + status := crs.rq.addPossibleResource(currentResource) + + switch status { + case publishDirectly: + // If the resource found already has permission (i.e. a check is not required), immediately + // publish it, rather than going through a processing worker. This saves a step for better + // performance. + if result.Resource.ResultStatus != v1.ReachableResource_HAS_PERMISSION { + return spiceerrors.MustBugf("got invalid resource for publish directly") + } - // If the resource found already has permission (i.e. a check is not required), immediately - // publish it, rather than going through a processing worker. This saves a step for better - // performance. - if result.Resource.ResultStatus == v1.ReachableResource_HAS_PERMISSION { select { case crs.availableForPublishing <- true: - return true + return nil case <-crs.reachableContext.Done(): - return false + return nil case <-crs.ctx.Done(): crs.setError(crs.ctx.Err()) - return false + return nil } - } else { + + case awaitingMoreResources: + // If an insufficient number of resources has been collected for checking, we're done. + return nil + + case readyForProcessing: + // Otherwise, kick off a worker to process the resources. 
select { - case crs.reachableResourceAvailable <- struct{}{}: - return true + case crs.reachableResourcesAreAvailableForProcessing <- struct{}{}: + crs.spawnIfAvailable() + return nil case <-crs.reachableContext.Done(): - return false + return nil case <-crs.ctx.Done(): crs.setError(crs.ctx.Err()) - return false + return nil } - } -} -// Publish implements the Stream interface and is invoked by the ReachableResources call. -func (crs *checkingResourceStream) Publish(result *v1.DispatchReachableResourcesResponse) error { - // Queue the result to be processed by the parallel workers. - wasQueued := crs.queue(result) - if wasQueued { - crs.spawnIfAvailable() + default: + return spiceerrors.MustBugf("unknown resource add state") } - return nil } // Context implements the Stream interface.