Skip to content

Commit

Permalink
Merge pull request #1490 from josephschorr/checking-resource-improvem…
Browse files Browse the repository at this point in the history
…ents

Significantly reduce the number of goroutines used in the checking stream
  • Loading branch information
ecordell authored Aug 10, 2023
2 parents 3e9b5b8 + 765fcf4 commit 3b37d79
Showing 1 changed file with 66 additions and 44 deletions.
110 changes: 66 additions & 44 deletions internal/graph/checkingresourcestream.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,31 @@ type resourceQueue struct {
beingProcessed map[uint64]possibleResource
}

type processingStatus int

const (
publishDirectly processingStatus = iota
awaitingMoreResources
readyForProcessing
)

// addPossibleResource queues a resource for processing (if a check is required) or for
// immediate publishing (if a check is not required).
func (rq *resourceQueue) addPossibleResource(pr possibleResource) {
func (rq *resourceQueue) addPossibleResource(pr possibleResource) processingStatus {
rq.lock.Lock()
defer rq.lock.Unlock()

if pr.lookupResult != nil {
rq.toPublish[pr.orderingIndex] = pr
} else {
rq.toProcess[pr.orderingIndex] = pr
return publishDirectly
}

rq.toProcess[pr.orderingIndex] = pr
if len(rq.toProcess) < int(datastore.FilterMaximumIDCount) {
return awaitingMoreResources
}

return readyForProcessing
}

// updateToBePublished marks a resource as ready for publishing.
Expand Down Expand Up @@ -155,9 +169,9 @@ type checkingResourceStream struct {
// rq is the resourceQueue for managing the state of all resources returned by the reachable resources call.
rq *resourceQueue

// reachableResourceAvailable is a channel which indicates to the processing worker(s) that work is available
// reachableResourcesAreAvailableForProcessing is a channel which indicates to the processing worker(s) that work is available
// for processing.
reachableResourceAvailable chan struct{}
reachableResourcesAreAvailableForProcessing chan struct{}

// reachableResourcesCompleted is a channel used to indicate to each processing worker that reachable resources has
// been completed, and that all further processing work should be done before shutting down.
Expand All @@ -179,9 +193,9 @@ type checkingResourceStream struct {
// and waitForPublishing() (after reachable resources has completed).
reachableResourcesCount uint64

// lastResourceCursor is the cursor from the last received reachable resource result. Should *only* be accessed from queue()
// and waitForPublishing() (after reachable resources has completed).
lastResourceCursor *v1.Cursor
// lastReachableResourceCursor is the cursor from the last received reachable resource result. Should *only* be accessed from
// Publish() and waitForPublishing() (after reachable resources has completed).
lastReachableResourceCursor *v1.Cursor

// dispatchesToBeReported is the number of dispatches that were skipped from being reported due
// to a resource being filtered, and whose count has to be attached to the next outgoing result.
Expand Down Expand Up @@ -241,9 +255,9 @@ func newCheckingResourceStream(
beingProcessed: map[uint64]possibleResource{},
toPublish: map[uint64]possibleResource{},
},
reachableResourceAvailable: make(chan struct{}, concurrencyLimit),
reachableResourcesCompleted: make(chan struct{}, concurrencyLimit),
availableForPublishing: make(chan bool, concurrencyLimit),
reachableResourcesAreAvailableForProcessing: make(chan struct{}, concurrencyLimit),
reachableResourcesCompleted: make(chan struct{}, concurrencyLimit),
availableForPublishing: make(chan bool, concurrencyLimit),

orderingIndexToBePublished: 0,
reachableResourcesCount: 0,
Expand All @@ -269,9 +283,15 @@ func (crs *checkingResourceStream) waitForPublishing() (uint64, *v1.Cursor, erro
crs.reachableResourcesCompleted <- struct{}{}
}

// Wait for all processing to complete.
// Wait for all existing processing to complete.
crs.processingWaitGroup.Wait()

// Run a final processing call to ensure there are no remaining items.
_, err := crs.runProcess(true)
if err != nil {
return 0, nil, err
}

// Mark publishing as ready for final publishing.
select {
case crs.availableForPublishing <- false:
Expand All @@ -285,7 +305,7 @@ func (crs *checkingResourceStream) waitForPublishing() (uint64, *v1.Cursor, erro
// Wait for any remaining publishing to complete.
crs.publishingWaitGroup.Wait()

return crs.reachableResourcesCount, crs.lastResourceCursor, crs.err
return crs.reachableResourcesCount, crs.lastReachableResourceCursor, crs.err
}

// resourcePublisher is the goroutine that publishes resources to the parent stream once they've been
Expand Down Expand Up @@ -393,7 +413,7 @@ func (crs *checkingResourceStream) process() {
}
return

case <-crs.reachableResourceAvailable:
case <-crs.reachableResourcesAreAvailableForProcessing:
for {
ok, err := crs.runProcess(false)
if err != nil {
Expand All @@ -413,6 +433,7 @@ func (crs *checkingResourceStream) runProcess(alwaysProcess bool) (bool, error)
// Collect any resources that need to be checked, up to the configured limit, and issue a check.
// If a resource does not require a check, simply place on the toPublish queue.
toCheck := mapz.NewMultiMap[string, possibleResource]()

toProcess := crs.rq.selectResourcesToProcess(alwaysProcess)
if len(toProcess) == 0 {
return false, nil
Expand All @@ -426,11 +447,6 @@ func (crs *checkingResourceStream) runProcess(alwaysProcess bool) (bool, error)
toCheck.Add(current.reachableResult.Resource.ResourceId, current)
}

if toCheck.IsEmpty() {
crs.availableForPublishing <- true
return true, nil
}

// Issue the bulk check over all the resources.
results, checkResultMetadata, err := computed.ComputeBulkCheck(
crs.ctx,
Expand Down Expand Up @@ -551,8 +567,8 @@ func (crs *checkingResourceStream) spawnIfAvailable() {
}
}

// queue queues a reachable resources result to be processed by one of the processing worker(s), before publishing.
func (crs *checkingResourceStream) queue(result *v1.DispatchReachableResourcesResponse) bool {
// Publish implements the Stream interface and is invoked by the ReachableResources call.
func (crs *checkingResourceStream) Publish(result *v1.DispatchReachableResourcesResponse) error {
currentResource := possibleResource{
reachableResult: result,
lookupResult: nil,
Expand All @@ -573,48 +589,54 @@ func (crs *checkingResourceStream) queue(result *v1.DispatchReachableResourcesRe
}
}

crs.rq.addPossibleResource(currentResource)
crs.reachableResourcesCount++
crs.lastResourceCursor = result.AfterResponseCursor
crs.lastReachableResourceCursor = result.AfterResponseCursor

status := crs.rq.addPossibleResource(currentResource)

switch status {
case publishDirectly:
// If the resource found already has permission (i.e. a check is not required), immediately
// publish it, rather than going through a processing worker. This saves a step for better
// performance.
if result.Resource.ResultStatus != v1.ReachableResource_HAS_PERMISSION {
return spiceerrors.MustBugf("got invalid resource for publish directly")
}

// If the resource found already has permission (i.e. a check is not required), immediately
// publish it, rather than going through a processing worker. This saves a step for better
// performance.
if result.Resource.ResultStatus == v1.ReachableResource_HAS_PERMISSION {
select {
case crs.availableForPublishing <- true:
return true
return nil

case <-crs.reachableContext.Done():
return false
return nil

case <-crs.ctx.Done():
crs.setError(crs.ctx.Err())
return false
return nil
}
} else {

case awaitingMoreResources:
// If an insufficient amount of resources have been collected for Checking, we're done.
return nil

case readyForProcessing:
// Otherwise, kick off a worker to process the resources.
select {
case crs.reachableResourceAvailable <- struct{}{}:
return true
case crs.reachableResourcesAreAvailableForProcessing <- struct{}{}:
crs.spawnIfAvailable()
return nil

case <-crs.reachableContext.Done():
return false
return nil

case <-crs.ctx.Done():
crs.setError(crs.ctx.Err())
return false
return nil
}
}
}

// Publish implements the Stream interface and is invoked by the ReachableResources call.
func (crs *checkingResourceStream) Publish(result *v1.DispatchReachableResourcesResponse) error {
// Queue the result to be processed by the parallel workers.
wasQueued := crs.queue(result)
if wasQueued {
crs.spawnIfAvailable()
default:
return spiceerrors.MustBugf("unknown resource add state")
}
return nil
}

// Context implements the Stream interface.
Expand Down

0 comments on commit 3b37d79

Please sign in to comment.