Skip to content

Commit

Permalink
Significantly reduce the number of goroutines used in the checking st…
Browse files Browse the repository at this point in the history
…ream

We now spawn a goroutine only once the minimum number of resources to be checked is available, rather than always doing so.
  • Loading branch information
josephschorr committed Aug 9, 2023
1 parent 41cec17 commit 765fcf4
Showing 1 changed file with 66 additions and 44 deletions.
110 changes: 66 additions & 44 deletions internal/graph/checkingresourcestream.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,31 @@ type resourceQueue struct {
beingProcessed map[uint64]possibleResource
}

// processingStatus is the outcome reported by addPossibleResource, telling the
// caller what (if anything) should happen next for the resource just added.
type processingStatus int

const (
	// publishDirectly indicates the resource already carried a lookup result
	// and was placed straight onto the publish queue; no check is required.
	publishDirectly processingStatus = iota

	// awaitingMoreResources indicates the resource was queued for processing,
	// but fewer than datastore.FilterMaximumIDCount resources are waiting, so
	// no processing needs to be kicked off yet.
	awaitingMoreResources

	// readyForProcessing indicates the resource was queued for processing and
	// enough resources are now waiting to warrant processing them.
	readyForProcessing
)

// addPossibleResource queues a resource for processing (if a check is required) or for
// immediate publishing (if a check is not required).
func (rq *resourceQueue) addPossibleResource(pr possibleResource) {
func (rq *resourceQueue) addPossibleResource(pr possibleResource) processingStatus {
rq.lock.Lock()
defer rq.lock.Unlock()

if pr.lookupResult != nil {
rq.toPublish[pr.orderingIndex] = pr
} else {
rq.toProcess[pr.orderingIndex] = pr
return publishDirectly
}

rq.toProcess[pr.orderingIndex] = pr
if len(rq.toProcess) < int(datastore.FilterMaximumIDCount) {
return awaitingMoreResources
}

return readyForProcessing
}

// updateToBePublished marks a resource as ready for publishing.
Expand Down Expand Up @@ -155,9 +169,9 @@ type checkingResourceStream struct {
// rq is the resourceQueue for managing the state of all resources returned by the reachable resources call.
rq *resourceQueue

// reachableResourceAvailable is a channel which indicates to the processing worker(s) that work is available
// reachableResourcesAreAvailableForProcessing is a channel which indicates to the processing worker(s) that work is available
// for processing.
reachableResourceAvailable chan struct{}
reachableResourcesAreAvailableForProcessing chan struct{}

// reachableResourcesCompleted is a channel used to indicate to each processing worker that reachable resources has
// been completed, and that all further processing work should be done before shutting down.
Expand All @@ -179,9 +193,9 @@ type checkingResourceStream struct {
// and waitForPublishing() (after reachable resources has completed).
reachableResourcesCount uint64

// lastResourceCursor is the cursor from the last received reachable resource result. Should *only* be accessed from queue()
// and waitForPublishing() (after reachable resources has completed).
lastResourceCursor *v1.Cursor
// lastReachableResourceCursor is the cursor from the last received reachable resource result. Should *only* be accessed from
// Publish() and waitForPublishing() (after reachable resources has completed).
lastReachableResourceCursor *v1.Cursor

// dispatchesToBeReported is the number of dispatches that were skipped from being reported due
// to a resource being filtered, and whose count has to be attached to the next outgoing result.
Expand Down Expand Up @@ -241,9 +255,9 @@ func newCheckingResourceStream(
beingProcessed: map[uint64]possibleResource{},
toPublish: map[uint64]possibleResource{},
},
reachableResourceAvailable: make(chan struct{}, concurrencyLimit),
reachableResourcesCompleted: make(chan struct{}, concurrencyLimit),
availableForPublishing: make(chan bool, concurrencyLimit),
reachableResourcesAreAvailableForProcessing: make(chan struct{}, concurrencyLimit),
reachableResourcesCompleted: make(chan struct{}, concurrencyLimit),
availableForPublishing: make(chan bool, concurrencyLimit),

orderingIndexToBePublished: 0,
reachableResourcesCount: 0,
Expand All @@ -269,9 +283,15 @@ func (crs *checkingResourceStream) waitForPublishing() (uint64, *v1.Cursor, erro
crs.reachableResourcesCompleted <- struct{}{}
}

// Wait for all processing to complete.
// Wait for all existing processing to complete.
crs.processingWaitGroup.Wait()

// Run a final processing call to ensure there are no remaining items.
_, err := crs.runProcess(true)
if err != nil {
return 0, nil, err
}

// Mark publishing as ready for final publishing.
select {
case crs.availableForPublishing <- false:
Expand All @@ -285,7 +305,7 @@ func (crs *checkingResourceStream) waitForPublishing() (uint64, *v1.Cursor, erro
// Wait for any remaining publishing to complete.
crs.publishingWaitGroup.Wait()

return crs.reachableResourcesCount, crs.lastResourceCursor, crs.err
return crs.reachableResourcesCount, crs.lastReachableResourceCursor, crs.err
}

// resourcePublisher is the goroutine that publishes resources to the parent stream once they've been
Expand Down Expand Up @@ -393,7 +413,7 @@ func (crs *checkingResourceStream) process() {
}
return

case <-crs.reachableResourceAvailable:
case <-crs.reachableResourcesAreAvailableForProcessing:
for {
ok, err := crs.runProcess(false)
if err != nil {
Expand All @@ -413,6 +433,7 @@ func (crs *checkingResourceStream) runProcess(alwaysProcess bool) (bool, error)
// Collect any resources that need to be checked, up to the configured limit, and issue a check.
// If a resource does not require a check, simply place on the toPublish queue.
toCheck := mapz.NewMultiMap[string, possibleResource]()

toProcess := crs.rq.selectResourcesToProcess(alwaysProcess)
if len(toProcess) == 0 {
return false, nil
Expand All @@ -426,11 +447,6 @@ func (crs *checkingResourceStream) runProcess(alwaysProcess bool) (bool, error)
toCheck.Add(current.reachableResult.Resource.ResourceId, current)
}

if toCheck.IsEmpty() {
crs.availableForPublishing <- true
return true, nil
}

// Issue the bulk check over all the resources.
results, checkResultMetadata, err := computed.ComputeBulkCheck(
crs.ctx,
Expand Down Expand Up @@ -551,8 +567,8 @@ func (crs *checkingResourceStream) spawnIfAvailable() {
}
}

// queue queues a reachable resources result to be processed by one of the processing worker(s), before publishing.
func (crs *checkingResourceStream) queue(result *v1.DispatchReachableResourcesResponse) bool {
// Publish implements the Stream interface and is invoked by the ReachableResources call.
func (crs *checkingResourceStream) Publish(result *v1.DispatchReachableResourcesResponse) error {
currentResource := possibleResource{
reachableResult: result,
lookupResult: nil,
Expand All @@ -573,48 +589,54 @@ func (crs *checkingResourceStream) queue(result *v1.DispatchReachableResourcesRe
}
}

crs.rq.addPossibleResource(currentResource)
crs.reachableResourcesCount++
crs.lastResourceCursor = result.AfterResponseCursor
crs.lastReachableResourceCursor = result.AfterResponseCursor

status := crs.rq.addPossibleResource(currentResource)

switch status {
case publishDirectly:
// If the resource found already has permission (i.e. a check is not required), immediately
// publish it, rather than going through a processing worker. This saves a step for better
// performance.
if result.Resource.ResultStatus != v1.ReachableResource_HAS_PERMISSION {
return spiceerrors.MustBugf("got invalid resource for publish directly")
}

// If the resource found already has permission (i.e. a check is not required), immediately
// publish it, rather than going through a processing worker. This saves a step for better
// performance.
if result.Resource.ResultStatus == v1.ReachableResource_HAS_PERMISSION {
select {
case crs.availableForPublishing <- true:
return true
return nil

case <-crs.reachableContext.Done():
return false
return nil

case <-crs.ctx.Done():
crs.setError(crs.ctx.Err())
return false
return nil
}
} else {

case awaitingMoreResources:
// If an insufficient number of resources has been collected for checking, we're done.
return nil

case readyForProcessing:
// Otherwise, kick off a worker to process the resources.
select {
case crs.reachableResourceAvailable <- struct{}{}:
return true
case crs.reachableResourcesAreAvailableForProcessing <- struct{}{}:
crs.spawnIfAvailable()
return nil

case <-crs.reachableContext.Done():
return false
return nil

case <-crs.ctx.Done():
crs.setError(crs.ctx.Err())
return false
return nil
}
}
}

// Publish implements the Stream interface and is invoked by the ReachableResources call.
func (crs *checkingResourceStream) Publish(result *v1.DispatchReachableResourcesResponse) error {
// Queue the result to be processed by the parallel workers.
wasQueued := crs.queue(result)
if wasQueued {
crs.spawnIfAvailable()
default:
return spiceerrors.MustBugf("unknown resource add state")
}
return nil
}

// Context implements the Stream interface.
Expand Down

0 comments on commit 765fcf4

Please sign in to comment.