diff --git a/pkg/cache/draft.go b/pkg/cache/draft.go index cb8e5bca..6510783d 100644 --- a/pkg/cache/draft.go +++ b/pkg/cache/draft.go @@ -16,6 +16,7 @@ package cache import ( "context" + "fmt" "github.com/nephio-project/porch/pkg/repository" ) @@ -28,9 +29,20 @@ type cachedDraft struct { var _ repository.PackageDraft = &cachedDraft{} func (cd *cachedDraft) Close(ctx context.Context) (repository.PackageRevision, error) { - if closed, err := cd.PackageDraft.Close(ctx); err != nil { + closed, err := cd.PackageDraft.Close(ctx) + if err != nil { return nil, err - } else { - return cd.cache.update(ctx, closed) } + + err = cd.cache.reconcileCache(ctx, "close-draft") + if err != nil { + return nil, err + } + + cpr := cd.cache.getPackageRevision(closed.Key()) + if cpr == nil { + return nil, fmt.Errorf("closed draft not found") + } + + return cpr, nil } diff --git a/pkg/cache/repository.go b/pkg/cache/repository.go index d825d5f8..acc34567 100644 --- a/pkg/cache/repository.go +++ b/pkg/cache/repository.go @@ -54,16 +54,30 @@ type cachedRepository struct { lastVersion string - mutex sync.Mutex + // We use separate mutexes for cache map changes and for the overall + // reconcile process. We want update, delete, and reconcile + // to all block on the reconcileMutex, which could be held for a long time + // during reconcile. For much of that time (during reconcile) we do NOT + // want to block reads. There are a few protected areas where we touch map + // entries where we need to block reads, so those will also grab the general + // mutex. + // + // Any code that needs to hold both locks MUST get the reconcileMutex first, + // or we could end up with deadlocks + mutex sync.RWMutex + reconcileMutex sync.Mutex cachedPackageRevisions map[repository.PackageRevisionKey]*cachedPackageRevision - cachedPackages map[repository.PackageKey]*cachedPackage + + // not ideal but this is another cache, used by the underlying storage to avoid + // reloading. 
Would be best to combine these somehow, but not doing that now. + // Eventual CRD-based redesign should make this entire repo cache obsolete + packageRevisionCache repository.PackageRevisionCache // TODO: Currently we support repositories with homogenous content (only packages xor functions). Model this more optimally? cachedFunctions []repository.Function // Error encountered on repository refresh by the refresh goroutine. // This is returned back by the cache to the background goroutine when it calls periodicall to resync repositories. refreshRevisionsError error - refreshPkgsError error objectNotifier objectNotifier @@ -88,12 +102,16 @@ func newRepository(id string, repoSpec *configapi.Repository, repo repository.Re return r } +func (r *cachedRepository) nn() string { + return r.repoSpec.Namespace + "/" + r.repoSpec.Name +} + func (r *cachedRepository) Version(ctx context.Context) (string, error) { return r.repo.Version(ctx) } func (r *cachedRepository) ListPackageRevisions(ctx context.Context, filter repository.ListPackageRevisionFilter) ([]repository.PackageRevision, error) { - packages, err := r.getPackageRevisions(ctx, filter, false) + packages, err := r.getPackageRevisions(ctx, filter) if err != nil { return nil, err } @@ -109,21 +127,23 @@ func (r *cachedRepository) ListFunctions(ctx context.Context) ([]repository.Func return functions, nil } -func (r *cachedRepository) getRefreshError() error { - r.mutex.Lock() - defer r.mutex.Unlock() +func (r *cachedRepository) getPackageRevision(key repository.PackageRevisionKey) *cachedPackageRevision { + r.mutex.RLock() + defer r.mutex.RUnlock() + + cpr, _ := r.cachedPackageRevisions[key] + return cpr +} - // TODO: This should also check r.refreshPkgsError when - // the package resource is fully supported. 
+func (r *cachedRepository) getRefreshError() error { + r.mutex.RLock() + defer r.mutex.RUnlock() return r.refreshRevisionsError } -func (r *cachedRepository) getPackageRevisions(ctx context.Context, filter repository.ListPackageRevisionFilter, forceRefresh bool) ([]repository.PackageRevision, error) { - r.mutex.Lock() - defer r.mutex.Unlock() - - _, packageRevisions, err := r.getCachedPackages(ctx, forceRefresh) +func (r *cachedRepository) getPackageRevisions(ctx context.Context, filter repository.ListPackageRevisionFilter) ([]repository.PackageRevision, error) { + packageRevisions, err := r.getCachedPackageRevisions(ctx) if err != nil { return nil, err } @@ -131,44 +151,44 @@ func (r *cachedRepository) getPackageRevisions(ctx context.Context, filter repos return toPackageRevisionSlice(packageRevisions, filter), nil } -func (r *cachedRepository) getPackages(ctx context.Context, filter repository.ListPackageFilter, forceRefresh bool) ([]repository.Package, error) { - r.mutex.Lock() - defer r.mutex.Unlock() - - packages, _, err := r.getCachedPackages(ctx, forceRefresh) +// getCachedPackageRevisions returns the cache contents, blocking until +// the cache is loaded +// caller must NOT hold the lock +// returned *map* is a copy and can be operated on without locks +// map entries are NOT copies and should not be modified +func (r *cachedRepository) getCachedPackageRevisions(ctx context.Context) (map[repository.PackageRevisionKey]*cachedPackageRevision, error) { + err := r.blockUntilLoaded(ctx) if err != nil { return nil, err } - return toPackageSlice(packages, filter), nil + r.mutex.RLock() + defer r.mutex.RUnlock() + + packageRevisions := make(map[repository.PackageRevisionKey]*cachedPackageRevision, len(r.cachedPackageRevisions)) + for k, v := range r.cachedPackageRevisions { + packageRevisions[k] = v + } + + return packageRevisions, r.refreshRevisionsError } -// getCachedPackages returns cachedPackages; fetching it if not cached or if forceRefresh. 
-// mutex must be held. -func (r *cachedRepository) getCachedPackages(ctx context.Context, forceRefresh bool) (map[repository.PackageKey]*cachedPackage, map[repository.PackageRevisionKey]*cachedPackageRevision, error) { - // must hold mutex - packages := r.cachedPackages - packageRevisions := r.cachedPackageRevisions - err := r.refreshRevisionsError - - if forceRefresh { - packages = nil - packageRevisions = nil - - if gitRepo, isGitRepo := r.repo.(git.GitRepository); isGitRepo { - // TODO: Figure out a way to do this without the cache layer - // needing to know what type of repo we are working with. - if err := gitRepo.UpdateDeletionProposedCache(); err != nil { - return nil, nil, err +// blocks waiting until the cache is loaded +func (r *cachedRepository) blockUntilLoaded(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return fmt.Errorf("repo %s: stopped waiting for load because context is done: %v", r.nn(), ctx.Err()) + default: + r.mutex.RLock() + if r.cachedPackageRevisions != nil { + r.mutex.RUnlock() + return nil } + r.mutex.RUnlock() + time.Sleep(1 * time.Second) } } - - if packages == nil { - packages, packageRevisions, err = r.refreshAllCachedPackages(ctx) - } - - return packages, packageRevisions, err } func (r *cachedRepository) getFunctions(ctx context.Context, force bool) ([]repository.Function, error) { @@ -206,6 +226,11 @@ func (r *cachedRepository) CreatePackageRevision(ctx context.Context, obj *v1alp return nil, err } + // reconciliation is faster now, so force it immediately + if err := r.reconcileCache(ctx, "create"); err != nil { + klog.Warningf("error reconciling cache after creating %v in %s: %v", created, r.nn(), err) + } + return &cachedDraft{ PackageDraft: created, cache: r, @@ -220,48 +245,17 @@ func (r *cachedRepository) UpdatePackageRevision(ctx context.Context, old reposi return nil, err } + // reconciliation is faster now, so force it immediately + if err := r.reconcileCache(ctx, "update"); err != nil { + 
klog.Warningf("error reconciling cache after updating %v in %s: %v", unwrapped.Key(), r.nn(), err) + } + return &cachedDraft{ PackageDraft: created, cache: r, }, nil } -func (r *cachedRepository) update(ctx context.Context, updated repository.PackageRevision) (*cachedPackageRevision, error) { - r.mutex.Lock() - defer r.mutex.Unlock() - - // TODO: Technically we only need this package, not all packages - if _, _, err := r.getCachedPackages(ctx, false); err != nil { - klog.Warningf("failed to get cached packages: %v", err) - // TODO: Invalidate all watches? We're dropping an add/update event - return nil, err - } - - k := updated.Key() - // previous := r.cachedPackageRevisions[k] - - if v1alpha1.LifecycleIsPublished(updated.Lifecycle()) { - oldKey := repository.PackageRevisionKey{ - Repository: k.Repository, - Package: k.Package, - WorkspaceName: k.WorkspaceName, - } - if _, ok := r.cachedPackageRevisions[oldKey]; ok { - delete(r.cachedPackageRevisions, oldKey) - } - } - - cached := &cachedPackageRevision{PackageRevision: updated} - r.cachedPackageRevisions[k] = cached - - // Recompute latest package revisions. - // TODO: Just updated package? - identifyLatestRevisions(r.cachedPackageRevisions) - - // TODO: Update the latest revisions for the r.cachedPackages - return cached, nil -} - func (r *cachedRepository) DeletePackageRevision(ctx context.Context, old repository.PackageRevision) error { // Unwrap unwrapped := old.(*cachedPackageRevision).PackageRevision @@ -269,52 +263,35 @@ func (r *cachedRepository) DeletePackageRevision(ctx context.Context, old reposi return err } - r.mutex.Lock() - if r.cachedPackages != nil { - k := old.Key() - // previous := r.cachedPackages[k] - delete(r.cachedPackageRevisions, k) - - // Recompute latest package revisions. - // TODO: Only for affected object / key? 
- identifyLatestRevisions(r.cachedPackageRevisions) + // reconciliation is faster now, so force it immediately + if err := r.reconcileCache(ctx, "delete"); err != nil { + klog.Warningf("error reconciling cache after deleting %v in %s: %v", unwrapped.Key(), r.nn(), err) } - r.mutex.Unlock() - return nil } func (r *cachedRepository) ListPackages(ctx context.Context, filter repository.ListPackageFilter) ([]repository.Package, error) { - packages, err := r.getPackages(ctx, filter, false) - if err != nil { - return nil, err - } - - return packages, nil + return nil, fmt.Errorf("not implemented") } func (r *cachedRepository) CreatePackage(ctx context.Context, obj *v1alpha1.Package) (repository.Package, error) { - klog.Infoln("cachedRepository::CreatePackage") - return r.repo.CreatePackage(ctx, obj) + return nil, fmt.Errorf("not implemented") } func (r *cachedRepository) DeletePackage(ctx context.Context, old repository.Package) error { - // Unwrap - unwrapped := old.(*cachedPackage).Package - if err := r.repo.DeletePackage(ctx, unwrapped); err != nil { - return err - } - - // TODO: Do something more efficient than a full cache flush - r.flush() - - return nil + return fmt.Errorf("not implemented") } func (r *cachedRepository) Close() error { r.cancel() + r.reconcileMutex.Lock() + defer r.reconcileMutex.Unlock() + + r.mutex.Lock() + defer r.mutex.Unlock() + // Make sure that watch events are sent for packagerevisions that are // removed as part of closing the repository. sent := 0 @@ -326,13 +303,15 @@ func (r *cachedRepository) Close() error { // There isn't really any correct way to handle finalizers here. We are removing // the repository, so we have to just delete the PackageRevision regardless of any // finalizers. 
- klog.Infof("repo %s: deleting packagerev %s/%s because repository is closed", r.id, nn.Namespace, nn.Name) + klog.Infof("repo %s: deleting packagerev %s/%s because repository is closed", r.nn(), nn.Namespace, nn.Name) pkgRevMeta, err := r.metadataStore.Delete(context.TODO(), nn, true) if err != nil { // There isn't much use in returning an error here, so we just log it // and create a PackageRevisionMeta with just name and namespace. This // makes sure that the Delete event is sent. - klog.Warningf("Error looking up PackageRev CR for %s: %v") + if !apierrors.IsNotFound(err) { + klog.Warningf("Error deleting PackageRev CR %s/%s: %s", nn.Namespace, nn.Name, err) + } pkgRevMeta = meta.PackageRevisionMeta{ Name: nn.Name, Namespace: nn.Namespace, @@ -340,7 +319,7 @@ func (r *cachedRepository) Close() error { } sent += r.objectNotifier.NotifyPackageRevisionChange(watch.Deleted, pr, pkgRevMeta) } - klog.Infof("repo %s: sent %d notifications for %d package revisions during close", r.id, sent, len(r.cachedPackageRevisions)) + klog.Infof("repo %s: sent %d notifications for %d package revisions during close", r.nn(), sent, len(r.cachedPackageRevisions)) return r.repo.Close() } @@ -350,7 +329,7 @@ func (r *cachedRepository) pollForever(ctx context.Context, repoSyncFrequency ti for { select { case <-ctx.Done(): - klog.V(2).Infof("repo %s: exiting repository poller, because context is done: %v", r.id, ctx.Err()) + klog.V(2).Infof("repo %s: exiting repository poller, because context is done: %v", r.nn(), ctx.Err()) return default: r.pollOnce(ctx) @@ -360,77 +339,90 @@ func (r *cachedRepository) pollForever(ctx context.Context, repoSyncFrequency ti } func (r *cachedRepository) pollOnce(ctx context.Context) { - start := time.Now() - klog.Infof("repo %s: poll started", r.id) - defer func() { klog.Infof("repo %s: poll finished in %f secs", r.id, time.Since(start).Seconds()) }() ctx, span := tracer.Start(ctx, "Repository::pollOnce", trace.WithAttributes()) defer span.End() - if _, 
err := r.getPackageRevisions(ctx, repository.ListPackageRevisionFilter{}, true); err != nil { - klog.Warningf("error polling repo packages %s: %v", r.id, err) + if err := r.reconcileCache(ctx, "poll"); err != nil { + klog.Warningf("error polling repo packages %s: %v", r.nn(), err) } - // TODO: Uncomment when package resources are fully supported - //if _, err := r.getPackages(ctx, repository.ListPackageRevisionFilter{}, true); err != nil { - // klog.Warningf("error polling repo packages %s: %v", r.id, err) - //} if _, err := r.getFunctions(ctx, true); err != nil { - klog.Warningf("error polling repo functions %s: %v", r.id, err) + klog.Warningf("error polling repo functions %s: %v", r.nn(), err) } } -func (r *cachedRepository) flush() { - r.mutex.Lock() - defer r.mutex.Unlock() - - r.cachedPackageRevisions = nil - r.cachedPackages = nil -} - -// refreshAllCachedPackages updates the cached map for this repository with all the newPackages, -// it also triggers notifications for all package changes. -// mutex must be held. -func (r *cachedRepository) refreshAllCachedPackages(ctx context.Context) (map[repository.PackageKey]*cachedPackage, map[repository.PackageRevisionKey]*cachedPackageRevision, error) { - // TODO: Avoid simultaneous fetches? - // TODO: Push-down partial refresh? 
+// reconcileCache updates the cached map for this repository +// it also triggers notifications for all package changes +// caller must NOT hold any locks +func (r *cachedRepository) reconcileCache(ctx context.Context, reason string) error { + // if this is not a package repo, just set the cache to "loaded" and return + if r.repoSpec.Spec.Content != configapi.RepositoryContentPackage { + r.mutex.Lock() + defer r.mutex.Unlock() + if r.cachedPackageRevisions != nil { + return nil + } + r.cachedPackageRevisions = make(map[repository.PackageRevisionKey]*cachedPackageRevision) + return nil + } start := time.Now() - defer func() { klog.Infof("repo %s: refresh finished in %f secs", r.id, time.Since(start).Seconds()) }() + defer func() { + klog.Infof("repo %s: reconcile for %s finished in %f secs", r.nn(), reason, time.Since(start).Seconds()) + }() curVer, err := r.Version(ctx) if err != nil { - return nil, nil, err + return err } if curVer == r.lastVersion { - return r.cachedPackages, r.cachedPackageRevisions, nil + return nil + } + + // get the reconcile lock first, to block any repo-level mutations + r.reconcileMutex.Lock() + defer r.reconcileMutex.Unlock() + + if gitRepo, isGitRepo := r.repo.(git.GitRepository); isGitRepo { + // TODO: Figure out a way to do this without the cache layer + // needing to know what type of repo we are working with. + if err := gitRepo.UpdateDeletionProposedCache(); err != nil { + return err + } } - // Look up all existing PackageRevCRs so we an compare those to the - // actual Packagerevisions found in git/oci, and add/prune PackageRevCRs + // Look up all existing PackageRevCRs so we can compare those to the + // actual PackageRevisions found in git/oci, and add/prune PackageRevCRs // as necessary. existingPkgRevCRs, err := r.metadataStore.List(ctx, r.repoSpec) if err != nil { - return nil, nil, err + return err } + // Create a map so we can quickly check if a specific PackageRevisionMeta exists. 
- existingPkgRevCRsMap := make(map[string]meta.PackageRevisionMeta) + pkgRevCRsMap := make(map[string]meta.PackageRevisionMeta) for i := range existingPkgRevCRs { pr := existingPkgRevCRs[i] - existingPkgRevCRsMap[pr.Name] = pr + pkgRevCRsMap[pr.Name] = pr } - // TODO: Can we avoid holding the lock for the ListPackageRevisions / identifyLatestRevisions section? - newPackageRevisions, err := r.repo.ListPackageRevisions(ctx, repository.ListPackageRevisionFilter{}) + ctxWithCache := repository.ContextWithPackageRevisionCache(ctx, r.packageRevisionCache) + newPackageRevisions, err := r.repo.ListPackageRevisions(ctxWithCache, repository.ListPackageRevisionFilter{}) if err != nil { - return nil, nil, fmt.Errorf("error listing packages: %w", err) + return fmt.Errorf("error listing packages: %w", err) } // Build mapping from kubeObjectName to PackageRevisions for new PackageRevisions. + // and also recreate packageRevisionCache + prc := make(repository.PackageRevisionCache, len(newPackageRevisions)) newPackageRevisionNames := make(map[string]*cachedPackageRevision, len(newPackageRevisions)) for _, newPackage := range newPackageRevisions { + cid := newPackage.CachedIdentifier() + prc[cid.Key] = repository.PackageRevisionCacheEntry{Version: cid.Version, PackageRevision: newPackage} + kname := newPackage.KubeObjectName() if newPackageRevisionNames[kname] != nil { - klog.Warningf("repo %s: found duplicate packages with name %v", kname) + klog.Warningf("repo %s: found duplicate packages with name %v", r.nn(), kname) } pkgRev := &cachedPackageRevision{ @@ -441,18 +433,23 @@ func (r *cachedRepository) refreshAllCachedPackages(ctx context.Context) (map[re } // Build mapping from kubeObjectName to PackageRevisions for existing PackageRevisions + // Grab the RLock while we create this map + r.mutex.RLock() oldPackageRevisionNames := make(map[string]*cachedPackageRevision, len(r.cachedPackageRevisions)) for _, oldPackage := range r.cachedPackageRevisions { 
oldPackageRevisionNames[oldPackage.KubeObjectName()] = oldPackage } + r.mutex.RUnlock() + + addMeta := 0 + delMeta := 0 // We go through all PackageRev CRs that represents PackageRevisions // in the current repo and make sure they all have a corresponding // PackageRevision. The ones that doesn't is removed. for _, prm := range existingPkgRevCRs { if _, found := newPackageRevisionNames[prm.Name]; !found { - klog.Infof("repo %s: deleting PackageRev %s/%s because parent PackageRevision was not found", - r.id, prm.Namespace, prm.Name) + delMeta += 1 if _, err := r.metadataStore.Delete(ctx, types.NamespacedName{ Name: prm.Name, Namespace: prm.Namespace, @@ -460,7 +457,7 @@ func (r *cachedRepository) refreshAllCachedPackages(ctx context.Context) (map[re if !apierrors.IsNotFound(err) { // This will be retried the next time the sync runs. klog.Warningf("repo %s: unable to delete PackageRev CR for %s/%s: %w", - r.id, prm.Name, prm.Namespace, err) + r.nn(), prm.Name, prm.Namespace, err) } } } @@ -469,28 +466,55 @@ func (r *cachedRepository) refreshAllCachedPackages(ctx context.Context) (map[re // We go through all the PackageRevisions and make sure they have // a corresponding PackageRev CR. for pkgRevName, pkgRev := range newPackageRevisionNames { - if _, found := existingPkgRevCRsMap[pkgRevName]; !found { + if _, found := pkgRevCRsMap[pkgRevName]; !found { pkgRevMeta := meta.PackageRevisionMeta{ Name: pkgRevName, Namespace: r.repoSpec.Namespace, } - if _, err := r.metadataStore.Create(ctx, pkgRevMeta, r.repoSpec.Name, pkgRev.UID()); err != nil { + addMeta += 1 + if created, err := r.metadataStore.Create(ctx, pkgRevMeta, r.repoSpec.Name, pkgRev.UID()); err != nil { // TODO: We should try to find a way to make these errors available through // either the repository CR or the PackageRevision CR. This will be // retried on the next sync. 
klog.Warningf("unable to create PackageRev CR for %s/%s: %w", r.repoSpec.Namespace, pkgRevName, err) + } else { + // add to the cache for notifications later + pkgRevCRsMap[pkgRevName] = created } } } + // fix up the isLatestRevision in the new maps + newPackageRevisionMap := make(map[repository.PackageRevisionKey]*cachedPackageRevision, len(newPackageRevisions)) + for _, newPackage := range newPackageRevisions { + k := newPackage.Key() + pkgRev := &cachedPackageRevision{ + PackageRevision: newPackage, + isLatestRevision: false, + } + newPackageRevisionMap[k] = pkgRev + } + + identifyLatestRevisions(newPackageRevisionMap) + + // hold the RW lock while swap in the new packages + // we do this now, *before* sending notifications, so that + // anyone responding to the notification will get the new values + r.mutex.Lock() + r.cachedPackageRevisions = newPackageRevisionMap + r.packageRevisionCache = prc + r.lastVersion = curVer + r.mutex.Unlock() + // Send notification for packages that changed. 
addSent := 0 modSent := 0 for kname, newPackage := range newPackageRevisionNames { oldPackage := oldPackageRevisionNames[kname] - metaPackage, found := existingPkgRevCRsMap[newPackage.KubeObjectName()] + metaPackage, found := pkgRevCRsMap[newPackage.KubeObjectName()] if !found { + // should never happen klog.Warningf("no PackageRev CR found for PackageRevision %s", newPackage.KubeObjectName()) } if oldPackage == nil { @@ -503,57 +527,16 @@ func (r *cachedRepository) refreshAllCachedPackages(ctx context.Context) (map[re } delSent := 0 - // Send notifications for packages that was deleted in the SoT + // Send notifications for packages that were deleted in the SoT for kname, oldPackage := range oldPackageRevisionNames { if newPackageRevisionNames[kname] == nil { - nn := types.NamespacedName{ + metaPackage := meta.PackageRevisionMeta{ Name: oldPackage.KubeObjectName(), Namespace: oldPackage.KubeObjectNamespace(), } - klog.Infof("repo %s: deleting PackageRev %s/%s because PackageRevision was removed from SoT", - r.id, nn.Namespace, nn.Name) - metaPackage, err := r.metadataStore.Delete(ctx, nn, true) - if err != nil { - if !apierrors.IsNotFound(err) { - klog.Warningf("repo %s: error deleting PkgRevMeta %s: %v", r.id, nn, err) - } - metaPackage = meta.PackageRevisionMeta{ - Name: nn.Name, - Namespace: nn.Namespace, - } - } delSent += r.objectNotifier.NotifyPackageRevisionChange(watch.Deleted, oldPackage, metaPackage) } } - klog.Infof("repo %s: addSent %d, modSent %d, delSent for %d old and %d new repo packages", r.id, addSent, modSent, len(oldPackageRevisionNames), len(newPackageRevisionNames)) - - newPackageRevisionMap := make(map[repository.PackageRevisionKey]*cachedPackageRevision, len(newPackageRevisions)) - for _, newPackage := range newPackageRevisions { - k := newPackage.Key() - pkgRev := &cachedPackageRevision{ - PackageRevision: newPackage, - isLatestRevision: false, - } - newPackageRevisionMap[k] = pkgRev - } - - identifyLatestRevisions(newPackageRevisionMap) - 
- newPackageMap := make(map[repository.PackageKey]*cachedPackage) - - for _, newPackageRevision := range newPackageRevisionMap { - if !newPackageRevision.isLatestRevision { - continue - } - // TODO: Build package? - // newPackage := &cachedPackage{ - // } - // newPackageMap[newPackage.Key()] = newPackage - } - - r.cachedPackageRevisions = newPackageRevisionMap - r.cachedPackages = newPackageMap - r.lastVersion = curVer - - return newPackageMap, newPackageRevisionMap, nil + klog.Infof("repo %s: addMeta %d, delMeta %d, addSent %d, modSent %d, delSent %d for %d in-cache and %d in-storage package revisions", r.nn(), addMeta, delMeta, addSent, modSent, delSent, len(oldPackageRevisionNames), len(newPackageRevisionNames)) + return nil } diff --git a/pkg/engine/engine.go b/pkg/engine/engine.go index 76bd2468..c7f0a14b 100644 --- a/pkg/engine/engine.go +++ b/pkg/engine/engine.go @@ -308,7 +308,7 @@ func (cad *cadEngine) CreatePackageRevision(ctx context.Context, repositoryObj * return nil, fmt.Errorf("error listing package revisions: %w", err) } - if err := ensureUniqueWorkspaceName(obj, revs); err != nil { + if err := ensureUniqueWorkspaceName(repositoryObj, obj, revs); err != nil { return nil, err } @@ -351,9 +351,20 @@ func (cad *cadEngine) CreatePackageRevision(ctx context.Context, repositoryObj * } // The workspaceName must be unique, because it used to generate the package revision's metadata.name. 
-func ensureUniqueWorkspaceName(obj *api.PackageRevision, existingRevs []repository.PackageRevision) error { +func ensureUniqueWorkspaceName(repositoryObj *configapi.Repository, obj *api.PackageRevision, existingRevs []repository.PackageRevision) error { + // HACK + // It's ok for the "main" revision to have the same workspace name + // So ignore main revisions in this calculation + mainRev := "" + if repositoryObj.Spec.Git != nil { + mainRev = repositoryObj.Spec.Git.Branch + } + for _, r := range existingRevs { k := r.Key() + if mainRev != "" && k.Revision == mainRev { + continue + } if k.WorkspaceName == obj.Spec.WorkspaceName { return fmt.Errorf("package revision workspaceNames must be unique; package revision with name %s in repo %s with "+ "workspaceName %s already exists", obj.Spec.PackageName, obj.Spec.RepositoryName, obj.Spec.WorkspaceName) diff --git a/pkg/engine/fake/packagerevision.go b/pkg/engine/fake/packagerevision.go index 697e9614..3b66027a 100644 --- a/pkg/engine/fake/packagerevision.go +++ b/pkg/engine/fake/packagerevision.go @@ -35,6 +35,10 @@ type PackageRevision struct { Kptfile kptfile.KptFile } +func (pr *PackageRevision) CachedIdentifier() repository.CachedIdentifier { + return repository.CachedIdentifier{Key: pr.Key().String(), Version: pr.Key().Revision} +} + func (pr *PackageRevision) KubeObjectName() string { return pr.Name } diff --git a/pkg/engine/watchermanager.go b/pkg/engine/watchermanager.go index 5a2d74eb..9589efe2 100644 --- a/pkg/engine/watchermanager.go +++ b/pkg/engine/watchermanager.go @@ -65,6 +65,18 @@ func (r *watcherManager) WatchPackageRevisions(ctx context.Context, filter repos r.mutex.Lock() defer r.mutex.Unlock() + // reap any dead watchers + for i, watcher := range r.watchers { + if watcher == nil { + continue + } + if err := watcher.isDoneFunction(); err != nil { + klog.Infof("stopping watcher in reaper: %v", err) + r.watchers[i] = nil + continue + } + } + w := &watcher{ isDoneFunction: ctx.Err, callback: callback, 
diff --git a/pkg/git/git.go b/pkg/git/git.go index d86de001..e0921b3c 100644 --- a/pkg/git/git.go +++ b/pkg/git/git.go @@ -278,6 +278,14 @@ func (r *gitRepository) listPackageRevisions(ctx context.Context, filter reposit mainBranch := r.branch.RefInLocal() // Looking for the registered branch + // if a cache is available, use it + cache := repository.PackageRevisionCacheFromContext(ctx) + draftCache := 0 + tagCache := 0 + mainCache := 0 + draftLoaded := 0 + tagLoaded := 0 + mainLoaded := 0 for { ref, err := refs.Next() if err == io.EOF { @@ -290,9 +298,27 @@ func (r *gitRepository) listPackageRevisions(ctx context.Context, filter reposit continue case isProposedBranchNameInLocal(ref.Name()), isDraftBranchNameInLocal(ref.Name()): - draft, err := r.loadDraft(ctx, ref) - if err != nil { - return nil, fmt.Errorf("failed to load package draft %q: %w", name.String(), err) + var draft *gitPackageRevision + if entry, ok := cache[ref.Name().String()]; ok { + if entry.Version == ref.Hash().String() { + dd, good := entry.PackageRevision.(*gitPackageRevision) + if !good { + klog.Warningf("Found current cached branch %s version %s, but it is not a gitPackageRevision", ref.Name(), entry.Version) + } else { + draft = dd + draftCache += 1 + } + } + } + + if draft == nil { + draft, err = r.loadDraft(ctx, ref) + if err != nil { + return nil, fmt.Errorf("failed to load package draft %q: %w", name.String(), err) + } + if draft != nil { + draftLoaded += 1 + } } if draft != nil { drafts = append(drafts, draft) @@ -300,24 +326,63 @@ func (r *gitRepository) listPackageRevisions(ctx context.Context, filter reposit klog.Warningf("no package draft found for ref %v", ref) } case isTagInLocalRepo(ref.Name()): - tagged, err := r.loadTaggedPackages(ctx, ref) - if err != nil { - // this tag is not associated with any package (e.g. 
could be a release tag) - continue + var tagged *gitPackageRevision + if entry, ok := cache[ref.Name().String()]; ok { + if entry.Version == ref.Hash().String() { + dd, good := entry.PackageRevision.(*gitPackageRevision) + if !good { + klog.Warningf("Found current cached branch %s version %s, but it is not a gitPackageRevision", ref.Name(), entry.Version) + } else { + tagged = dd + tagCache += 1 + } + } } - for _, p := range tagged { - if filter.Matches(p) { - result = append(result, p) + if tagged == nil { + tagged, err = r.loadTaggedPackage(ctx, ref) + if err != nil { + // this tag is not associated with any package (e.g. could be a release tag) + continue } + if tagged != nil { + tagLoaded += 1 + } + } + if tagged != nil && filter.Matches(tagged) { + result = append(result, tagged) } } } if main != nil { + // Look for any package whose cached identifier starts with main.Name() + // There will be one for each pacakge found in main, but they all will have the same + // hash. If that matches main.Hash() there is no change in main and so we can just + // copy all the packages rather than rediscovering. + var mainpkgs []*gitPackageRevision + for k, v := range cache { + if strings.Index(k, main.Name().String()) == 0 { + if v.Version != main.Hash().String() { + continue + } + gpr, ok := v.PackageRevision.(*gitPackageRevision) + if !ok { + klog.Warningf("Found current cached main package %s version %s, but it is not a gitPackageRevision", k, v.Version) + } else { + mainpkgs = append(mainpkgs, gpr) + mainCache += 1 + } + } + } + // TODO: ignore packages that are unchanged in main branch, compared to a tagged version? 
- mainpkgs, err := r.discoverFinalizedPackages(ctx, main) - if err != nil { - return nil, err + if len(mainpkgs) == 0 { + mp, err := r.discoverFinalizedPackages(ctx, main) + if err != nil { + return nil, err + } + mainpkgs = mp + mainLoaded = len(mainpkgs) } for _, p := range mainpkgs { if filter.Matches(p) { @@ -332,6 +397,8 @@ func (r *gitRepository) listPackageRevisions(ctx context.Context, filter reposit } } + klog.Infof("repo %s/%s: %d draftCache, %d draftLoaded, %d tagCache, %d tagLoaded, %d mainCache, %d mainLoaded", r.namespace, r.name, + draftCache, draftLoaded, tagCache, tagLoaded, mainCache, mainLoaded) return result, nil } @@ -753,8 +820,8 @@ func parseDraftName(draft *plumbing.Reference) (name string, workspaceName v1alp return name, workspaceName, nil } -func (r *gitRepository) loadTaggedPackages(ctx context.Context, tag *plumbing.Reference) ([]*gitPackageRevision, error) { - ctx, span := tracer.Start(ctx, "gitRepository::loadTaggedPackages", trace.WithAttributes()) +func (r *gitRepository) loadTaggedPackage(ctx context.Context, tag *plumbing.Reference) (*gitPackageRevision, error) { + ctx, span := tracer.Start(ctx, "gitRepository::loadTaggedPackage", trace.WithAttributes()) defer span.End() name, ok := getTagNameInLocalRepo(tag.Name()) @@ -803,9 +870,7 @@ func (r *gitRepository) loadTaggedPackages(ctx context.Context, tag *plumbing.Re return nil, err } - return []*gitPackageRevision{ - packageRevision, - }, nil + return packageRevision, nil } @@ -1072,8 +1137,6 @@ func (r *gitRepository) pushAndCleanup(ctx context.Context, ph *pushRefSpecBuild return err } - klog.Infof("pushing refs: %v", specs) - if err := r.doGitWithAuth(ctx, func(auth transport.AuthMethod) error { return r.repo.Push(&git.PushOptions{ RemoteName: OriginName, @@ -1698,9 +1761,6 @@ func (r *gitRepository) discoverPackagesInTree(commit *object.Commit, opt Discov return nil, err } - if opt.FilterPrefix == "" { - klog.Infof("discovered %d packages @%v", len(t.packages), commit.Hash) - } 
	return t, nil
}
diff --git a/pkg/git/package.go b/pkg/git/package.go
index c1e44082..f5d9c899 100644
--- a/pkg/git/package.go
+++ b/pkg/git/package.go
@@ -78,6 +78,18 @@ func (p *gitPackageRevision) UID() types.UID {
 	return p.uid()
 }
 
+func (p *gitPackageRevision) CachedIdentifier() repository.CachedIdentifier {
+	if p.ref != nil {
+		k := p.ref.Name().String()
+		if p.revision == string(p.repo.branch) {
+			k += ":" + p.path
+		}
+		return repository.CachedIdentifier{Key: k, Version: p.ref.Hash().String()}
+	}
+
+	return repository.CachedIdentifier{}
+}
+
 func (p *gitPackageRevision) ResourceVersion() string {
 	return p.commit.String()
 }
diff --git a/pkg/oci/oci.go b/pkg/oci/oci.go
index bb57eee6..596a2919 100644
--- a/pkg/oci/oci.go
+++ b/pkg/oci/oci.go
@@ -402,6 +402,10 @@ type ociPackageRevision struct {
 	lifecycle v1alpha1.PackageRevisionLifecycle
 }
 
+func (p *ociPackageRevision) CachedIdentifier() repository.CachedIdentifier {
+	return repository.CachedIdentifier{Key: p.packageName + ":" + string(p.workspaceName), Version: p.resourceVersion}
+}
+
 var _ repository.PackageRevision = &ociPackageRevision{}
 
 func (p *ociPackageRevision) GetResources(ctx context.Context) (*v1alpha1.PackageRevisionResources, error) {
diff --git a/pkg/repository/repository.go b/pkg/repository/repository.go
index 64deca28..4bbba603 100644
--- a/pkg/repository/repository.go
+++ b/pkg/repository/repository.go
@@ -48,6 +48,37 @@ func (n PackageKey) String() string {
 	return fmt.Sprintf("Repository: %q, Package: %q", n.Repository, n.Package)
 }
 
+// CachedIdentifier is used by a cache and underlying storage
+// implementation to avoid unnecessary reloads
+type CachedIdentifier struct {
+	// Key uniquely identifies the resource in the underlying storage
+	Key string
+
+	// Version uniquely identifies the version of the resource in the underlying storage
+	Version string
+}
+
+type PackageRevisionCacheEntry struct {
+	Version         string
+	PackageRevision PackageRevision
+}
+
+type PackageRevisionCache 
map[string]PackageRevisionCacheEntry
+
+type packageCacheKey struct{}
+
+func ContextWithPackageRevisionCache(ctx context.Context, cache PackageRevisionCache) context.Context {
+	return context.WithValue(ctx, packageCacheKey{}, cache)
+}
+
+func PackageRevisionCacheFromContext(ctx context.Context) PackageRevisionCache {
+	cache, ok := ctx.Value(packageCacheKey{}).(PackageRevisionCache)
+	if !ok {
+		cache = make(PackageRevisionCache)
+	}
+	return cache
+}
+
 // PackageRevision is an abstract package version.
 // We have a single object for both Revision and Resources, because conceptually they are one object.
 // The best way we've found (so far) to represent them in k8s is as two resources, but they map to the same object.
@@ -67,6 +98,9 @@ type PackageRevision interface {
 	// Key returns the "primary key" of the package.
 	Key() PackageRevisionKey
 
+	// CachedIdentifier returns a unique identifier for this package revision and version
+	CachedIdentifier() CachedIdentifier
+
 	// Lifecycle returns the current lifecycle state of the package.
 	Lifecycle() v1alpha1.PackageRevisionLifecycle
 
diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go
index cf65752e..348c9670 100644
--- a/test/e2e/e2e_test.go
+++ b/test/e2e/e2e_test.go
@@ -2568,7 +2568,8 @@ func (t *PorchSuite) mustNotExist(ctx context.Context, obj client.Object) {
 // provided name and namespace is ready, i.e. the Ready condition is true.
 // It also queries for Functions and PackageRevisions, to ensure these are also
 // ready - this is an artifact of the way we've implemented the aggregated apiserver,
-// where the first fetch can sometimes be synchronous.
+// where the first fetch will block on the cache loading. Wait up to two minutes for the
+// package revisions and functions. 
func (t *PorchSuite) waitUntilRepositoryReady(ctx context.Context, name, namespace string) { nn := types.NamespacedName{ Name: name, @@ -2593,7 +2594,7 @@ func (t *PorchSuite) waitUntilRepositoryReady(ctx context.Context, name, namespa } // While we're using an aggregated apiserver, make sure we can query the generated objects - if err := wait.PollImmediateWithContext(ctx, time.Second, 10*time.Second, func(ctx context.Context) (bool, error) { + if err := wait.PollImmediateWithContext(ctx, time.Second, 120*time.Second, func(ctx context.Context) (bool, error) { var revisions porchapi.PackageRevisionList if err := t.client.List(ctx, &revisions, client.InNamespace(nn.Namespace)); err != nil { innerErr = err @@ -2605,7 +2606,7 @@ func (t *PorchSuite) waitUntilRepositoryReady(ctx context.Context, name, namespa } // Check for functions also (until we move them to CRDs) - if err := wait.PollImmediateWithContext(ctx, time.Second, 10*time.Second, func(ctx context.Context) (bool, error) { + if err := wait.PollImmediateWithContext(ctx, time.Second, 120*time.Second, func(ctx context.Context) (bool, error) { var functions porchapi.FunctionList if err := t.client.List(ctx, &functions, client.InNamespace(nn.Namespace)); err != nil { innerErr = err