From 0ebb21fc49cc2760457bf0388fa2b4e4e685b6a7 Mon Sep 17 00:00:00 2001 From: Paul Maidment Date: Thu, 16 Jan 2025 10:49:11 +0200 Subject: [PATCH] [MGMT-14453](https://issues.redhat.com//browse/MGMT-14453): Fix bugs in the installer cache This PR is for the purpose of resolving multiple bugs within the installer cache, due to the poor condition of the current cache, it makes sense to fix this in a single PR. * https://issues.redhat.com/browse/MGMT-14452 Installer cache removes in-used cached image when out of space * https://issues.redhat.com/browse/MGMT-14453 INSTALLER_CACHE_CAPACITY small value cause to assisted-service crash * https://issues.redhat.com/browse/MGMT-14457 Installer cache - fails to install when running parallel with same version * Additionally, the cache did not respect limits, so this has been addressed here. Fixes: I have implemented fixes for each of the following issues. * Mutex was ineffective as not instantiated correctly, leading to [MGMT-14452](https://issues.redhat.com//browse/MGMT-14452), [MGMT-14453](https://issues.redhat.com//browse/MGMT-14453). * Naming convention for hardlinks changed to be UUID based to resolve [MGMT-14457](https://issues.redhat.com//browse/MGMT-14457). * Any time we either extract or use a release, the modified time must be updated, not only for cached releases. This was causing premature pruning of hardlinks. * LRU cache order updated to be based on microseconds instead of seconds. * Eviction checks updated to consider max release size and also cache threshold. * We now check there is enough space before writing. * During eviction - releases without hard links will be evicted before releases with hard links. 
--- cmd/main.go | 30 +- internal/ignition/installmanifests.go | 8 +- internal/ignition/installmanifests_test.go | 162 ++++--- internal/installercache/installercache.go | 275 +++++++++--- .../installercache/installercache_test.go | 418 +++++++++++++++--- internal/metrics/disk_stats_helper.go | 14 +- internal/metrics/metricsManager_test.go | 2 +- internal/metrics/os_disk_stats_helper_test.go | 3 +- pkg/generator/generator.go | 33 +- 9 files changed, 747 insertions(+), 198 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index 4c4dc08bca0..7bafcb94a50 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -15,6 +15,7 @@ import ( "time" "github.com/NYTimes/gziphandler" + "github.com/alecthomas/units" "github.com/go-openapi/runtime" "github.com/go-openapi/strfmt" "github.com/go-openapi/swag" @@ -39,6 +40,7 @@ import ( "github.com/openshift/assisted-service/internal/ignition" "github.com/openshift/assisted-service/internal/infraenv" installcfg "github.com/openshift/assisted-service/internal/installcfg/builder" + "github.com/openshift/assisted-service/internal/installercache" internaljson "github.com/openshift/assisted-service/internal/json" "github.com/openshift/assisted-service/internal/manifests" "github.com/openshift/assisted-service/internal/metrics" @@ -172,6 +174,18 @@ var Options struct { // EnableXattrFallback is a boolean flag to enable en emulated fallback methoid of xattr on systems that do not support xattr. EnableXattrFallback bool `envconfig:"ENABLE_XATTR_FALLBACK" default:"true"` + + // InstallerCacheCapacityGiB is the capacity of the installer cache in GiB + InstallerCacheCapacityGiB uint `envconfig:"INSTALLER_CACHE_CAPACITY_GIB" default:"0"` + + // InstallerCacheMaxReleaseSizeGiB is the expected maximum size of a single release in GiB + InstallerCacheMaxReleaseSizeGiB uint `envconfig:"INSTALLER_CACHE_MAX_RELEASE_SIZE_GIB" default:"2"` + + // InstallerCacheEvictionThresholdPercent is the percentage of capacity at which the cache will start to evict releases. 
+ InstallerCacheEvictionThresholdPercent uint `envconfig:"INSTALLER_CACHE_EVICTION_THRESHOLD_PERCENT" default:"80"` + + // ReleaseFetchRetryIntervalSeconds is the number of seconds that the cache should wait before retrying the fetch of a release if unable to do so for capacity reasons. + ReleaseFetchRetryIntervalSeconds uint `envconfig:"INSTALLER_CACHE_RELEASE_FETCH_RETRY_INTERVAL_SECONDS" default:"30"` } func InitLogs(logLevel, logFormat string) *logrus.Logger { @@ -315,7 +329,8 @@ func main() { metricsManagerConfig := &metrics.MetricsManagerConfig{ DirectoryUsageMonitorConfig: metrics.DirectoryUsageMonitorConfig{ Directories: []string{Options.WorkDir}}} - metricsManager := metrics.NewMetricsManager(prometheusRegistry, eventsHandler, metrics.NewOSDiskStatsHelper(), metricsManagerConfig, log) + diskStatsHelper := metrics.NewOSDiskStatsHelper(logrus.New()) + metricsManager := metrics.NewMetricsManager(prometheusRegistry, eventsHandler, diskStatsHelper, metricsManagerConfig, log) if ocmClient != nil { //inject the metric server to the ocm client for purpose of //performance monitoring the calls to ACM. 
This could not be done @@ -488,7 +503,18 @@ func main() { failOnError(err, "failed to create valid bm config S3 endpoint URL from %s", Options.BMConfig.S3EndpointURL) Options.BMConfig.S3EndpointURL = newUrl - generator := generator.New(log, objectHandler, Options.GeneratorConfig, Options.WorkDir, providerRegistry, manifestsApi, eventsHandler) + installGeneratorDirectoryConfig := generator.InstallGeneratorDirectoryConfig{WorkDir: Options.WorkDir} + installerCacheConfig := installercache.InstallerCacheConfig{ + CacheDir: filepath.Join(installGeneratorDirectoryConfig.GetWorkingDirectory(), "installercache"), + MaxCapacity: int64(Options.InstallerCacheCapacityGiB) * int64(units.GiB), + MaxReleaseSize: int64(Options.InstallerCacheMaxReleaseSizeGiB) * int64(units.GiB), + ReleaseFetchRetryInterval: time.Duration(Options.ReleaseFetchRetryIntervalSeconds) * time.Second, + InstallerCacheEvictionThreshold: float64(Options.InstallerCacheEvictionThresholdPercent) / 100, + } + installerCache, err := installercache.New(installerCacheConfig, eventsHandler, diskStatsHelper, log) + failOnError(err, "failed to instantiate installercache") + + generator := generator.New(installGeneratorDirectoryConfig, log, objectHandler, Options.GeneratorConfig, providerRegistry, manifestsApi, eventsHandler, installerCache) var crdUtils bminventory.CRDUtils if ctrlMgr != nil { crdUtils = controllers.NewCRDUtils(ctrlMgr.GetClient(), hostApi) diff --git a/internal/ignition/installmanifests.go b/internal/ignition/installmanifests.go index 88d0e77df49..6adf98d581d 100644 --- a/internal/ignition/installmanifests.go +++ b/internal/ignition/installmanifests.go @@ -85,7 +85,6 @@ type installerGenerator struct { cluster *common.Cluster releaseImage string releaseImageMirror string - installerDir string serviceCACert string encodedDhcpFileContents string s3Client s3wrapper.API @@ -110,16 +109,15 @@ var fileNames = [...]string{ } // NewGenerator returns a generator that can generate ignition files -func 
NewGenerator(workDir string, installerDir string, cluster *common.Cluster, releaseImage string, releaseImageMirror string, +func NewGenerator(workDir string, cluster *common.Cluster, releaseImage string, releaseImageMirror string, serviceCACert string, installInvoker string, s3Client s3wrapper.API, log logrus.FieldLogger, providerRegistry registry.ProviderRegistry, - installerReleaseImageOverride, clusterTLSCertOverrideDir string, storageCapacityLimit int64, manifestApi manifestsapi.ManifestsAPI, eventsHandler eventsapi.Handler) Generator { + installerReleaseImageOverride, clusterTLSCertOverrideDir string, manifestApi manifestsapi.ManifestsAPI, eventsHandler eventsapi.Handler, installerCache *installercache.Installers) Generator { return &installerGenerator{ cluster: cluster, log: log, releaseImage: releaseImage, releaseImageMirror: releaseImageMirror, workDir: workDir, - installerDir: installerDir, serviceCACert: serviceCACert, s3Client: s3Client, enableMetal3Provisioning: true, @@ -127,7 +125,7 @@ func NewGenerator(workDir string, installerDir string, cluster *common.Cluster, providerRegistry: providerRegistry, installerReleaseImageOverride: installerReleaseImageOverride, clusterTLSCertOverrideDir: clusterTLSCertOverrideDir, - installerCache: installercache.New(installerDir, storageCapacityLimit, eventsHandler, log), + installerCache: installerCache, manifestApi: manifestApi, } } diff --git a/internal/ignition/installmanifests_test.go b/internal/ignition/installmanifests_test.go index 04061852eb7..97c212ac1dd 100644 --- a/internal/ignition/installmanifests_test.go +++ b/internal/ignition/installmanifests_test.go @@ -10,6 +10,7 @@ import ( "os" "path/filepath" "strings" + "time" config_32 "github.com/coreos/ignition/v2/config/v3_2" config_32_types "github.com/coreos/ignition/v2/config/v3_2/types" @@ -25,7 +26,9 @@ import ( "github.com/openshift/assisted-service/internal/constants" eventsapi "github.com/openshift/assisted-service/internal/events/api" 
"github.com/openshift/assisted-service/internal/host/hostutil" + "github.com/openshift/assisted-service/internal/installercache" manifestsapi "github.com/openshift/assisted-service/internal/manifests/api" + "github.com/openshift/assisted-service/internal/metrics" "github.com/openshift/assisted-service/internal/network" "github.com/openshift/assisted-service/models" "github.com/openshift/assisted-service/pkg/s3wrapper" @@ -78,29 +81,37 @@ var _ = Describe("Bootstrap Ignition Update", func() { }` var ( - err error - examplePath string - db *gorm.DB - dbName string - bmh *bmh_v1alpha1.BareMetalHost - config *config_32_types.Config - mockS3Client *s3wrapper.MockAPI - workDir string - cluster *common.Cluster - ctrl *gomock.Controller - manifestsAPI *manifestsapi.MockManifestsAPI - eventsHandler eventsapi.Handler + err error + examplePath string + db *gorm.DB + dbName string + bmh *bmh_v1alpha1.BareMetalHost + config *config_32_types.Config + mockS3Client *s3wrapper.MockAPI + workDir string + cluster *common.Cluster + ctrl *gomock.Controller + manifestsAPI *manifestsapi.MockManifestsAPI + eventsHandler *eventsapi.MockHandler + installerCache *installercache.Installers ) BeforeEach(func() { // setup temp workdir workDir, err = os.MkdirTemp("", "bootstrap-ignition-update-test-") Expect(err).NotTo(HaveOccurred()) + Expect(err).NotTo(HaveOccurred()) examplePath = filepath.Join(workDir, "example1.ign") var err1 error err1 = os.WriteFile(examplePath, []byte(bootstrap1), 0600) Expect(err1).NotTo(HaveOccurred()) ctrl = gomock.NewController(GinkgoT()) + installerCacheConfig := installercache.InstallerCacheConfig{ + CacheDir: filepath.Join(workDir, "some-dir", "installercache"), + MaxCapacity: int64(5), + MaxReleaseSize: int64(5), + } + installerCache, err = installercache.New(installerCacheConfig, eventsHandler, metrics.NewOSDiskStatsHelper(logrus.New()), logrus.New()) mockS3Client = s3wrapper.NewMockAPI(ctrl) manifestsAPI = manifestsapi.NewMockManifestsAPI(ctrl) eventsHandler = 
eventsapi.NewMockHandler(ctrl) @@ -113,7 +124,7 @@ var _ = Describe("Bootstrap Ignition Update", func() { }, } db, dbName = common.PrepareTestDB() - g := NewGenerator(workDir, "", cluster, "", "", "", "", mockS3Client, logrus.New(), nil, "", "", 5, manifestsAPI, eventsHandler).(*installerGenerator) + g := NewGenerator(workDir, cluster, "", "", "", "", mockS3Client, logrus.New(), nil, "", "", manifestsAPI, eventsHandler, installerCache).(*installerGenerator) Expect(g.updateBootstrap(context.Background(), examplePath)).To(Succeed()) @@ -241,16 +252,17 @@ SV4bRR9i0uf+xQ/oYRvugQ25Q7EahO5hJIWRf4aULbk36Zpw3++v2KFnF26zqwB6 -----END CERTIFICATE-----` var ( - masterPath string - workerPath string - caCertPath string - dbName string - db *gorm.DB - cluster *common.Cluster - workDir string - ctrl *gomock.Controller - manifestsAPI *manifestsapi.MockManifestsAPI - eventsHandler eventsapi.Handler + masterPath string + workerPath string + caCertPath string + dbName string + db *gorm.DB + cluster *common.Cluster + workDir string + ctrl *gomock.Controller + manifestsAPI *manifestsapi.MockManifestsAPI + eventsHandler eventsapi.Handler + installerCache *installercache.Installers ) BeforeEach(func() { @@ -274,6 +286,13 @@ SV4bRR9i0uf+xQ/oYRvugQ25Q7EahO5hJIWRf4aULbk36Zpw3++v2KFnF26zqwB6 ctrl = gomock.NewController(GinkgoT()) manifestsAPI = manifestsapi.NewMockManifestsAPI(ctrl) eventsHandler = eventsapi.NewMockHandler(ctrl) + installerCacheConfig := installercache.InstallerCacheConfig{ + CacheDir: filepath.Join(workDir, "some-dir", "installercache"), + MaxCapacity: int64(5), + MaxReleaseSize: int64(5), + } + installerCache, err = installercache.New(installerCacheConfig, eventsHandler, metrics.NewOSDiskStatsHelper(logrus.New()), logrus.New()) + Expect(err).NotTo(HaveOccurred()) }) AfterEach(func() { @@ -283,7 +302,7 @@ SV4bRR9i0uf+xQ/oYRvugQ25Q7EahO5hJIWRf4aULbk36Zpw3++v2KFnF26zqwB6 Describe("update ignitions", func() { It("with ca cert file", func() { - g := NewGenerator(workDir, "", 
cluster, "", "", caCertPath, "", nil, logrus.New(), nil, "", "", 5, manifestsAPI, eventsHandler).(*installerGenerator) + g := NewGenerator(workDir, cluster, "", "", caCertPath, "", nil, logrus.New(), nil, "", "", manifestsAPI, eventsHandler, installerCache).(*installerGenerator) err := g.updateIgnitions() Expect(err).NotTo(HaveOccurred()) @@ -305,7 +324,7 @@ SV4bRR9i0uf+xQ/oYRvugQ25Q7EahO5hJIWRf4aULbk36Zpw3++v2KFnF26zqwB6 Expect(file.Path).To(Equal(common.HostCACertPath)) }) It("with no ca cert file", func() { - g := NewGenerator(workDir, "", cluster, "", "", "", "", nil, logrus.New(), nil, "", "", 5, manifestsAPI, eventsHandler).(*installerGenerator) + g := NewGenerator(workDir, cluster, "", "", "", "", nil, logrus.New(), nil, "", "", manifestsAPI, eventsHandler, installerCache).(*installerGenerator) err := g.updateIgnitions() Expect(err).NotTo(HaveOccurred()) @@ -324,7 +343,7 @@ SV4bRR9i0uf+xQ/oYRvugQ25Q7EahO5hJIWRf4aULbk36Zpw3++v2KFnF26zqwB6 }) Context("DHCP generation", func() { It("Definitions only", func() { - g := NewGenerator(workDir, "", cluster, "", "", "", "", nil, logrus.New(), nil, "", "", 5, manifestsAPI, eventsHandler).(*installerGenerator) + g := NewGenerator(workDir, cluster, "", "", "", "", nil, logrus.New(), nil, "", "", manifestsAPI, eventsHandler, installerCache).(*installerGenerator) g.encodedDhcpFileContents = "data:,abc" err := g.updateIgnitions() @@ -342,7 +361,7 @@ SV4bRR9i0uf+xQ/oYRvugQ25Q7EahO5hJIWRf4aULbk36Zpw3++v2KFnF26zqwB6 }) }) It("Definitions+leases", func() { - g := NewGenerator(workDir, "", cluster, "", "", "", "", nil, logrus.New(), nil, "", "", 5, manifestsAPI, eventsHandler).(*installerGenerator) + g := NewGenerator(workDir, cluster, "", "", "", "", nil, logrus.New(), nil, "", "", manifestsAPI, eventsHandler, installerCache).(*installerGenerator) g.encodedDhcpFileContents = "data:,abc" cluster.ApiVipLease = "api" @@ -429,14 +448,15 @@ var _ = Describe("createHostIgnitions", func() { }` var ( - dbName string - db *gorm.DB - 
mockS3Client *s3wrapper.MockAPI - cluster *common.Cluster - ctrl *gomock.Controller - workDir string - manifestsAPI *manifestsapi.MockManifestsAPI - eventsHandler eventsapi.Handler + dbName string + db *gorm.DB + mockS3Client *s3wrapper.MockAPI + cluster *common.Cluster + ctrl *gomock.Controller + workDir string + manifestsAPI *manifestsapi.MockManifestsAPI + eventsHandler eventsapi.Handler + installerCache *installercache.Installers ) BeforeEach(func() { @@ -457,6 +477,13 @@ var _ = Describe("createHostIgnitions", func() { manifestsAPI = manifestsapi.NewMockManifestsAPI(ctrl) eventsHandler = eventsapi.NewMockHandler(ctrl) cluster = testCluster() + installerCacheConfig := installercache.InstallerCacheConfig{ + CacheDir: filepath.Join(workDir, "some-dir", "installercache"), + MaxCapacity: int64(5), + MaxReleaseSize: int64(5), + } + installerCache, err = installercache.New(installerCacheConfig, eventsHandler, metrics.NewOSDiskStatsHelper(logrus.New()), logrus.New()) + Expect(err).NotTo(HaveOccurred()) }) AfterEach(func() { @@ -492,7 +519,7 @@ var _ = Describe("createHostIgnitions", func() { host.ID = &id } - g := NewGenerator(workDir, "", cluster, "", "", "", "", nil, logrus.New(), nil, "", "", 5, manifestsAPI, eventsHandler).(*installerGenerator) + g := NewGenerator(workDir, cluster, "", "", "", "", nil, logrus.New(), nil, "", "", manifestsAPI, eventsHandler, installerCache).(*installerGenerator) err := g.createHostIgnitions() Expect(err).NotTo(HaveOccurred()) @@ -546,7 +573,7 @@ var _ = Describe("createHostIgnitions", func() { host.ID = &id } - g := NewGenerator(workDir, "", cluster, "", "", "", "", nil, logrus.New(), nil, "", "", 5, manifestsAPI, eventsHandler).(*installerGenerator) + g := NewGenerator(workDir, cluster, "", "", "", "", nil, logrus.New(), nil, "", "", manifestsAPI, eventsHandler, installerCache).(*installerGenerator) err := g.createHostIgnitions() Expect(err).NotTo(HaveOccurred()) @@ -590,7 +617,7 @@ var _ = Describe("createHostIgnitions", func() { 
host.ID = &id } - g := NewGenerator(workDir, "", cluster, "", "", "", "", nil, logrus.New(), nil, "", "", 5, manifestsAPI, eventsHandler).(*installerGenerator) + g := NewGenerator(workDir, cluster, "", "", "", "", nil, logrus.New(), nil, "", "", manifestsAPI, eventsHandler, installerCache).(*installerGenerator) err := g.createHostIgnitions() Expect(err).NotTo(HaveOccurred()) @@ -638,7 +665,7 @@ var _ = Describe("createHostIgnitions", func() { host.ID = &id } - g := NewGenerator(workDir, "", cluster, "", "", "", "", nil, logrus.New(), nil, "", "", 5, manifestsAPI, eventsHandler).(*installerGenerator) + g := NewGenerator(workDir, cluster, "", "", "", "", nil, logrus.New(), nil, "", "", manifestsAPI, eventsHandler, installerCache).(*installerGenerator) g.nodeIpAllocations = make(map[strfmt.UUID]*network.NodeIpAllocation) for i, h := range cluster.Hosts { g.nodeIpAllocations[*h.ID] = &network.NodeIpAllocation{ @@ -694,7 +721,7 @@ var _ = Describe("createHostIgnitions", func() { host.ID = &id } - g := NewGenerator(workDir, "", cluster, "", "", "", "", nil, logrus.New(), nil, "", "", 5, manifestsAPI, eventsHandler).(*installerGenerator) + g := NewGenerator(workDir, cluster, "", "", "", "", nil, logrus.New(), nil, "", "", manifestsAPI, eventsHandler, installerCache).(*installerGenerator) g.nodeIpAllocations = make(map[strfmt.UUID]*network.NodeIpAllocation) for i, h := range cluster.Hosts { @@ -754,7 +781,7 @@ var _ = Describe("createHostIgnitions", func() { host.ID = &id } - g := NewGenerator(workDir, "", cluster, "", "", "", "", nil, logrus.New(), nil, "", "", 5, manifestsAPI, eventsHandler).(*installerGenerator) + g := NewGenerator(workDir, cluster, "", "", "", "", nil, logrus.New(), nil, "", "", manifestsAPI, eventsHandler, installerCache).(*installerGenerator) err := g.createHostIgnitions() Expect(err).NotTo(HaveOccurred()) @@ -787,7 +814,7 @@ var _ = Describe("createHostIgnitions", func() { IgnitionConfigOverrides: `{"ignition": {"version": "3.2.0"}, "storage": 
{"files": [{"path": "/tmp/example", "contents": {"source": "data:text/plain;base64,aGVscGltdHJhcHBlZGluYXN3YWdnZXJzcGVj"}}]}}`, }} - g := NewGenerator(workDir, "", cluster, "", "", "", "", nil, logrus.New(), nil, "", "", 5, manifestsAPI, eventsHandler).(*installerGenerator) + g := NewGenerator(workDir, cluster, "", "", "", "", nil, logrus.New(), nil, "", "", manifestsAPI, eventsHandler, installerCache).(*installerGenerator) err := g.createHostIgnitions() Expect(err).NotTo(HaveOccurred()) @@ -858,7 +885,7 @@ spec: MachineConfigPoolName: "infra", }} - g := NewGenerator(workDir, "", cluster, "", "", "", "", mockS3Client, logrus.New(), nil, "", "", 5, manifestsAPI, eventsHandler).(*installerGenerator) + g := NewGenerator(workDir, cluster, "", "", "", "", mockS3Client, logrus.New(), nil, "", "", manifestsAPI, eventsHandler, installerCache).(*installerGenerator) mockS3Client.EXPECT().ListObjectsByPrefixWithMetadata(gomock.Any(), filepath.Join(clusterID.String(), constants.ManifestFolder, models.ManifestFolderOpenshift)).Return([]s3wrapper.ObjectInfo{{Path: "mcp.yaml"}}, nil).Times(1) mockS3Client.EXPECT().ListObjectsByPrefixWithMetadata(gomock.Any(), filepath.Join(clusterID.String(), constants.ManifestFolder, models.ManifestFolderManifests)).Times(1) mockS3Client.EXPECT().Download(gomock.Any(), gomock.Any()).Return(io.NopCloser(strings.NewReader(mcp)), int64(0), nil) @@ -884,7 +911,7 @@ spec: MachineConfigPoolName: "infra", }} - g := NewGenerator(workDir, "", cluster, "", "", "", "", mockS3Client, logrus.New(), nil, "", "", 5, manifestsAPI, eventsHandler).(*installerGenerator) + g := NewGenerator(workDir, cluster, "", "", "", "", mockS3Client, logrus.New(), nil, "", "", manifestsAPI, eventsHandler, installerCache).(*installerGenerator) mockS3Client.EXPECT().ListObjectsByPrefixWithMetadata(gomock.Any(), filepath.Join(clusterID.String(), constants.ManifestFolder, models.ManifestFolderOpenshift)).Return([]s3wrapper.ObjectInfo{{Path: "mcp.yaml"}}, nil).Times(1) 
mockS3Client.EXPECT().ListObjectsByPrefixWithMetadata(gomock.Any(), filepath.Join(clusterID.String(), constants.ManifestFolder, models.ManifestFolderManifests)).Times(1) mockS3Client.EXPECT().Download(gomock.Any(), gomock.Any()).Return(io.NopCloser(strings.NewReader(mc)), int64(0), nil) @@ -1709,10 +1736,11 @@ var _ = Describe("Set kubelet node ip", func() { var _ = Describe("Bare metal host generation", func() { var ( - workDir string - ctrl *gomock.Controller - manifestsAPI *manifestsapi.MockManifestsAPI - eventsHandler eventsapi.Handler + workDir string + ctrl *gomock.Controller + manifestsAPI *manifestsapi.MockManifestsAPI + eventsHandler eventsapi.Handler + installerCache *installercache.Installers ) BeforeEach(func() { @@ -1722,6 +1750,14 @@ var _ = Describe("Bare metal host generation", func() { ctrl = gomock.NewController(GinkgoT()) manifestsAPI = manifestsapi.NewMockManifestsAPI(ctrl) eventsHandler = eventsapi.NewMockHandler(ctrl) + installerCacheConfig := installercache.InstallerCacheConfig{ + CacheDir: filepath.Join(workDir, "some-dir", "installercache"), + MaxCapacity: int64(5), + MaxReleaseSize: int64(5), + ReleaseFetchRetryInterval: 1 * time.Millisecond, + } + installerCache, err = installercache.New(installerCacheConfig, eventsHandler, metrics.NewOSDiskStatsHelper(logrus.New()), logrus.New()) + Expect(err).NotTo(HaveOccurred()) }) AfterEach(func() { @@ -1734,7 +1770,6 @@ var _ = Describe("Bare metal host generation", func() { // Create the generator: generator := NewGenerator( workDir, - "", testCluster(), "", "", @@ -1745,9 +1780,9 @@ var _ = Describe("Bare metal host generation", func() { nil, "", "", - 5, manifestsAPI, eventsHandler, + installerCache, ).(*installerGenerator) // The default host inventory used by these tests has two NICs, each with @@ -1807,14 +1842,15 @@ var _ = Describe("Bare metal host generation", func() { var _ = Describe("Import Cluster TLS Certs for ephemeral installer", func() { var ( - certDir string - dbName string - db 
*gorm.DB - cluster *common.Cluster - workDir string - ctrl *gomock.Controller - manifestsAPI *manifestsapi.MockManifestsAPI - eventsHandler eventsapi.Handler + certDir string + dbName string + db *gorm.DB + cluster *common.Cluster + workDir string + ctrl *gomock.Controller + manifestsAPI *manifestsapi.MockManifestsAPI + eventsHandler eventsapi.Handler + installerCache *installercache.Installers ) certFiles := []string{"test-cert.crt", "test-cert.key"} @@ -1845,6 +1881,14 @@ var _ = Describe("Import Cluster TLS Certs for ephemeral installer", func() { ctrl = gomock.NewController(GinkgoT()) manifestsAPI = manifestsapi.NewMockManifestsAPI(ctrl) eventsHandler = eventsapi.NewMockHandler(ctrl) + installerCacheConfig := installercache.InstallerCacheConfig{ + CacheDir: filepath.Join(workDir, "some-dir", "installercache"), + MaxCapacity: int64(5), + MaxReleaseSize: int64(5), + ReleaseFetchRetryInterval: 1 * time.Millisecond, + } + installerCache, err = installercache.New(installerCacheConfig, eventsHandler, metrics.NewOSDiskStatsHelper(logrus.New()), logrus.New()) + Expect(err).NotTo(HaveOccurred()) }) AfterEach(func() { @@ -1853,7 +1897,7 @@ var _ = Describe("Import Cluster TLS Certs for ephemeral installer", func() { }) It("copies the tls cert files", func() { - g := NewGenerator(workDir, "", cluster, "", "", "", "", nil, logrus.New(), nil, "", certDir, 5, manifestsAPI, eventsHandler).(*installerGenerator) + g := NewGenerator(workDir, cluster, "", "", "", "", nil, logrus.New(), nil, "", certDir, manifestsAPI, eventsHandler, installerCache).(*installerGenerator) err := g.importClusterTLSCerts(context.Background()) Expect(err).NotTo(HaveOccurred()) diff --git a/internal/installercache/installercache.go b/internal/installercache/installercache.go index 50f1018d41a..2cc36939777 100644 --- a/internal/installercache/installercache.go +++ b/internal/installercache/installercache.go @@ -8,10 +8,13 @@ import ( "path/filepath" "strings" "sync" + "syscall" "time" 
"github.com/go-openapi/strfmt" + "github.com/google/uuid" eventsapi "github.com/openshift/assisted-service/internal/events/api" + "github.com/openshift/assisted-service/internal/metrics" "github.com/openshift/assisted-service/internal/oc" "github.com/openshift/assisted-service/models" "github.com/pkg/errors" @@ -19,8 +22,7 @@ import ( ) var ( - DeleteGracePeriod time.Duration = -5 * time.Minute - CacheLimitThreshold = 0.8 + linkPruningGracePeriod time.Duration = 5 * time.Minute ) // Installers implements a thread safe LRU cache for ocp install binaries @@ -29,11 +31,26 @@ var ( type Installers struct { sync.Mutex log logrus.FieldLogger - // total capcity of the allowed storage (in bytes) - storageCapacity int64 // parent directory of the binary cache - cacheDir string - eventsHandler eventsapi.Handler + eventsHandler eventsapi.Handler + diskStatsHelper metrics.DiskStatsHelper + config InstallerCacheConfig +} + +type InstallerCacheConfig struct { + CacheDir string + MaxCapacity int64 + MaxReleaseSize int64 + ReleaseFetchRetryInterval time.Duration + InstallerCacheEvictionThreshold float64 +} + +type errorInsufficientCacheCapacity struct { + Message string +} + +func (e *errorInsufficientCacheCapacity) Error() string { + return e.Message } type fileInfo struct { @@ -43,7 +60,8 @@ type fileInfo struct { func (fi *fileInfo) Compare(other *fileInfo) bool { //oldest file will be first in queue - return fi.info.ModTime().Unix() < other.info.ModTime().Unix() + // Using micoseconds to make sure that the comparison is granular enough + return fi.info.ModTime().UnixMicro() < other.info.ModTime().UnixMicro() } const ( @@ -65,11 +83,9 @@ type Release struct { extractDuration float64 } -// Cleanup is called to signal that the caller has finished using the relase and that resources may be released. +// Cleanup is called to signal that the caller has finished using the release and that resources may be released. 
func (rl *Release) Cleanup(ctx context.Context) { - if err := os.Remove(rl.Path); err != nil { - logrus.New().WithError(err).Errorf("Failed to delete release link %s", rl.Path) - } + logrus.New().Infof("Cleaning up release %s", rl.Path) rl.eventsHandler.V2AddMetricsEvent( ctx, &rl.clusterID, nil, nil, "", models.EventSeverityInfo, metricEventInstallerCacheRelease, @@ -80,22 +96,54 @@ func (rl *Release) Cleanup(ctx context.Context) { "cached", rl.cached, "extract_duration", rl.extractDuration, ) + if err := os.Remove(rl.Path); err != nil { + logrus.New().WithError(err).Errorf("Failed to delete release link %s", rl.Path) + } } // New constructs an installer cache with a given storage capacity -func New(cacheDir string, storageCapacity int64, eventsHandler eventsapi.Handler, log logrus.FieldLogger) *Installers { +func New(config InstallerCacheConfig, eventsHandler eventsapi.Handler, diskStatsHelper metrics.DiskStatsHelper, log logrus.FieldLogger) (*Installers, error) { + if config.InstallerCacheEvictionThreshold == 0 { + return nil, errors.New("config.InstallerCacheEvictionThreshold must not be zero") + } + if config.MaxCapacity > 0 && config.MaxReleaseSize == 0 { + return nil, fmt.Errorf("config.MaxReleaseSize (%d bytes) must not be zero", config.MaxReleaseSize) + } + if config.MaxCapacity > 0 && config.MaxReleaseSize > config.MaxCapacity { + return nil, fmt.Errorf("config.MaxReleaseSize (%d bytes) must not be greater than config.MaxCapacity (%d bytes)", config.MaxReleaseSize, config.MaxCapacity) + } return &Installers{ log: log, - storageCapacity: storageCapacity, - cacheDir: cacheDir, eventsHandler: eventsHandler, - } + diskStatsHelper: diskStatsHelper, + config: config, + }, nil } // Get returns the path to an openshift-baremetal-install binary extracted from // the referenced release image. Tries the mirror release image first if it's set. It is safe for concurrent use. A cache of // binaries is maintained to reduce re-downloading of the same release. 
func (i *Installers) Get(ctx context.Context, releaseID, releaseIDMirror, pullSecret string, ocRelease oc.Release, ocpVersion string, clusterID strfmt.UUID) (*Release, error) { + for { + select { + case <-ctx.Done(): + return nil, errors.Errorf("context cancelled or timed out while fetching release %s", releaseID) + default: + release, err := i.get(releaseID, releaseIDMirror, pullSecret, ocRelease, ocpVersion, clusterID) + if err == nil { + return release, nil + } + _, isCapacityError := err.(*errorInsufficientCacheCapacity) + if !isCapacityError { + return nil, errors.Wrapf(err, "failed to get installer path for release %s", releaseID) + } + i.log.WithError(err).Errorf("insufficient installer cache capacity for release %s", releaseID) + time.Sleep(i.config.ReleaseFetchRetryInterval) + } + } +} + +func (i *Installers) get(releaseID, releaseIDMirror, pullSecret string, ocRelease oc.Release, ocpVersion string, clusterID strfmt.UUID) (*Release, error) { i.Lock() defer i.Unlock() @@ -106,43 +154,108 @@ func (i *Installers) Get(ctx context.Context, releaseID, releaseIDMirror, pullSe startTime: time.Now(), } - var workdir, binary, path string - var err error - - workdir, binary, path, err = ocRelease.GetReleaseBinaryPath(releaseID, i.cacheDir, ocpVersion) + workdir, binary, path, err := ocRelease.GetReleaseBinaryPath(releaseID, i.config.CacheDir, ocpVersion) if err != nil { return nil, err } if _, err = os.Stat(path); os.IsNotExist(err) { + i.log.Infof("release %s - not found in cache - attempting extraction", releaseID) + // Cache eviction and space checks + evictionAlreadyPerformed := false + for { + if i.config.MaxCapacity == 0 { + i.log.Info("cache eviction is disabled -- moving directly to extraction") + break + } + i.log.Infof("checking for space to extract release %s", releaseID) + // Determine actual space usage, accounting for hardlinks. 
+ var usedBytes uint64 + usedBytes, _, err = i.diskStatsHelper.GetDiskUsage(i.config.CacheDir) + if err != nil { + if os.IsNotExist(err) { + // Installer cache directory will not exist prior to first extraction + i.log.WithError(err).Warnf("skipping capacity check as first extraction will trigger creation of directory") + break + } + return nil, errors.Wrapf(err, "could not determine disk usage information for cache dir %s", i.config.CacheDir) + } + shouldEvict, isBlocked := i.checkEvictionStatus(int64(usedBytes)) + // If we have already been around once, we don't want to 'double' evict + if shouldEvict && !evictionAlreadyPerformed { + i.evict() + } + if !isBlocked { + break + } + if evictionAlreadyPerformed { + // We still don't have enough capacity after eviction, so we need to exit and retry + return nil, &errorInsufficientCacheCapacity{ + Message: fmt.Sprintf("insufficient capacity in %s to store release", i.config.CacheDir), + } + } + evictionAlreadyPerformed = true + } + i.log.Infof("space available for release %s -- starting extraction", releaseID) extractStartTime := time.Now() - //evict older files if necessary - i.evict() - - //extract the binary - _, err = ocRelease.Extract(i.log, releaseID, releaseIDMirror, i.cacheDir, pullSecret, ocpVersion) + _, err = ocRelease.Extract(i.log, releaseID, releaseIDMirror, i.config.CacheDir, pullSecret, ocpVersion) if err != nil { return nil, err } release.extractDuration = time.Since(extractStartTime).Seconds() + i.log.Infof("finished extraction of %s in %d seconds", releaseID, release.extractDuration) } else { + i.log.Infof("fetching release %s from cache", releaseID) release.cached = true - //update the file mtime to signal it was recently used - err = os.Chtimes(path, time.Now(), time.Now()) - if err != nil { - return nil, errors.Wrap(err, fmt.Sprintf("Failed to update release binary %s", path)) - } + + } + + // update the file mtime to signal it was recently used + // do this, no matter where we sourced the binary 
(cache or otherwise) as it would appear that the binary + // can have a modtime - upon extraction - that is significantly in the past + // this can lead to premature link pruning in some scenarios. + err = os.Chtimes(path, time.Now(), time.Now()) + if err != nil { + return nil, errors.Wrap(err, fmt.Sprintf("failed to update release binary %s", path)) } + // return a new hard link to the binary file // the caller should delete the hard link when // it finishes working with the file - link := filepath.Join(workdir, "ln_"+fmt.Sprint(time.Now().Unix())+ - "_"+binary) - err = os.Link(path, link) - if err != nil { - return nil, errors.Wrap(err, fmt.Sprintf("Failed to create hard link to binary %s", path)) + i.log.Infof("attempting hardlink creation for release %s", releaseID) + for { + link := filepath.Join(workdir, fmt.Sprintf("ln_%s_%s", uuid.NewString(), binary)) + err = os.Link(path, link) + if err == nil { + i.log.Infof("created hardlink %s to release %s at path %s", link, releaseID, path) + release.Path = link + return release, nil + } + if !os.IsExist(err) { + return nil, errors.Wrap(err, fmt.Sprintf("failed to create hard link to binary %s", path)) + } + } +} + +func (i *Installers) shouldEvict(totalUsed int64) bool { + shouldEvict, _ := i.checkEvictionStatus(totalUsed) + return shouldEvict +} + +func (i *Installers) checkEvictionStatus(totalUsed int64) (shouldEvict bool, isBlocked bool) { + if i.config.MaxCapacity == 0 { + // The cache eviction is completely disabled. + return false, false + } + if i.config.MaxCapacity-totalUsed < i.config.MaxReleaseSize { + // We are badly blocked, not enough room for even one release more. + return true, true } - release.Path = link - return release, nil + if (float64(totalUsed) / float64(i.config.MaxCapacity)) >= i.config.InstallerCacheEvictionThreshold { + // We need to evict some items in order to keep cache efficient and within capacity. + return true, false + } + // We have enough space. 
+ return false, false } // Walk through the cacheDir and list the files recursively. @@ -151,11 +264,6 @@ func (i *Installers) Get(ctx context.Context, releaseID, releaseIDMirror, pullSe // // Locking must be done outside evict() to avoid contentions. func (i *Installers) evict() { - //if cache limit is undefined skip eviction - if i.storageCapacity == 0 { - return - } - // store the file paths files := NewPriorityQueue(&fileInfo{}) links := make([]*fileInfo, 0) @@ -183,33 +291,34 @@ func (i *Installers) evict() { return nil } - err := filepath.Walk(i.cacheDir, visit) + err := filepath.Walk(i.config.CacheDir, visit) if err != nil { if !os.IsNotExist(err) { //ignore first invocation where the cacheDir does not exist - i.log.WithError(err).Errorf("release binary eviction failed to inspect directory %s", i.cacheDir) + i.log.WithError(err).Errorf("release binary eviction failed to inspect directory %s", i.config.CacheDir) } return } - //prune the hard links just in case the deletion of resources - //in ignition.go did not succeeded as expected - for idx := 0; idx < len(links); idx++ { - finfo := links[idx] - //Allow a grace period of 5 minutes from the link creation time - //to ensure the link is not being used. - grace := time.Now().Add(DeleteGracePeriod).Unix() - if finfo.info.ModTime().Unix() < grace { - os.Remove(finfo.path) - } - } + // TODO: We might want to consider if we need to do this longer term, in theory every hardlink should be automatically freed. + // For now, moved to a function so that this can be tested in unit tests. 
+ i.pruneExpiredHardLinks(links, linkPruningGracePeriod) - //delete the oldest file if necessary - for totalSize >= int64(float64(i.storageCapacity)*CacheLimitThreshold) { - finfo, _ := files.Pop() - totalSize -= finfo.info.Size() - //remove the file - if err := i.evictFile(finfo.path); err != nil { - i.log.WithError(err).Errorf("failed to evict file %s", finfo.path) + // delete the oldest file if necessary + // prefer files without hardlinks first because + // 1: files without hardlinks are likely to be least recently used anyway + // 2: files without hardlinks will immediately free up storage + queues := i.splitQueueOnHardLinks(files) + for _, q := range queues { + for i.shouldEvict(totalSize) { + if q.Len() == 0 { // If we have cleaned out this queue then break the loop + break + } + finfo, _ := q.Pop() + totalSize -= finfo.info.Size() + //remove the file + if err := i.evictFile(finfo.path); err != nil { + i.log.WithError(err).Errorf("failed to evict file %s", finfo.path) + } } } } @@ -232,3 +341,47 @@ func (i *Installers) evictFile(filePath string) error { } return nil } + +// pruneExpiredHardLinks removes any hardlinks that have been around for too long +// the grace period is used to determine which links should be removed. 
+func (i *Installers) pruneExpiredHardLinks(links []*fileInfo, gracePeriod time.Duration) { + for idx := 0; idx < len(links); idx++ { + finfo := links[idx] + graceTime := time.Now().Add(-1 * gracePeriod) + grace := graceTime.Unix() + if finfo.info.ModTime().Unix() < grace { + i.log.Infof("Found expired hardlink -- pruning %s", finfo.info.Name()) + i.log.Infof("Mod time %s", finfo.info.ModTime().Format("2006-01-02 15:04:05")) + i.log.Infof("Grace time %s", graceTime.Format("2006-01-02 15:04:05")) + os.Remove(finfo.path) + } + } +} + +// splitQueueOnHardLinks Splits the provided *PriorityQueue[*fileInfo] into two queues +// withoutHardLinks will present a queue of files that do not have associated hardlinks +// withHardLinks will present a queue of files that do have associated hardlinks +// This is to allow us to prioritize deletion, favouring files without hardlinks first as these will have an immediate impact on storage. +func (i *Installers) splitQueueOnHardLinks(in *PriorityQueue[*fileInfo]) []*PriorityQueue[*fileInfo] { + withoutHardLinks := &PriorityQueue[*fileInfo]{} + withHardLinks := &PriorityQueue[*fileInfo]{} + for { + if in.Len() == 0 { + break + } + fi, _ := in.Pop() + stat, ok := fi.info.Sys().(*syscall.Stat_t) + if !ok { + // If we do encounter an error while performing stat - let's fall back to the original queue + // it's not optimal, but we don't break anything. 
+ i.log.Errorf("encountered error while trying to split queues - using original queue") + return []*PriorityQueue[*fileInfo]{in} + } + if stat.Nlink == 0 { + withoutHardLinks.Add(fi) + continue + } + withHardLinks.Add(fi) + } + return []*PriorityQueue[*fileInfo]{withoutHardLinks, withHardLinks} +} diff --git a/internal/installercache/installercache_test.go b/internal/installercache/installercache_test.go index a1d8744911a..00f6718c326 100644 --- a/internal/installercache/installercache_test.go +++ b/internal/installercache/installercache_test.go @@ -2,8 +2,13 @@ package installercache import ( "context" + "fmt" + "io/fs" "os" "path/filepath" + "strings" + "sync" + "syscall" "testing" "time" @@ -13,6 +18,7 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" eventsapi "github.com/openshift/assisted-service/internal/events/api" + "github.com/openshift/assisted-service/internal/metrics" "github.com/openshift/assisted-service/internal/oc" "github.com/openshift/assisted-service/models" "github.com/sirupsen/logrus" @@ -59,27 +65,35 @@ var _ = Describe("release event", func() { var _ = Describe("installer cache", func() { var ( - ctrl *gomock.Controller - mockRelease *oc.MockRelease - manager *Installers - cacheDir string - eventsHandler *eventsapi.MockHandler - ctx context.Context + ctrl *gomock.Controller + mockRelease *oc.MockRelease + manager *Installers + cacheDir string + eventsHandler *eventsapi.MockHandler + ctx context.Context + diskStatsHelper metrics.DiskStatsHelper ) BeforeEach(func() { - DeleteGracePeriod = 1 * time.Millisecond ctrl = gomock.NewController(GinkgoT()) + diskStatsHelper = metrics.NewOSDiskStatsHelper(logrus.New()) mockRelease = oc.NewMockRelease(ctrl) eventsHandler = eventsapi.NewMockHandler(ctrl) - var err error cacheDir, err = os.MkdirTemp("/tmp", "cacheDir") Expect(err).NotTo(HaveOccurred()) Expect(os.Mkdir(filepath.Join(cacheDir, "quay.io"), 0755)).To(Succeed()) Expect(os.Mkdir(filepath.Join(filepath.Join(cacheDir, "quay.io"), 
"release-dev"), 0755)).To(Succeed()) - manager = New(cacheDir, 12, eventsHandler, logrus.New()) + installerCacheConfig := InstallerCacheConfig{ + CacheDir: cacheDir, + MaxCapacity: int64(12), + MaxReleaseSize: int64(5), + ReleaseFetchRetryInterval: 1 * time.Millisecond, + InstallerCacheEvictionThreshold: 1, + } + manager, err = New(installerCacheConfig, eventsHandler, diskStatsHelper, logrus.New()) + Expect(err).NotTo(HaveOccurred()) ctx = context.TODO() }) @@ -87,26 +101,38 @@ var _ = Describe("installer cache", func() { os.RemoveAll(cacheDir) }) - expectEventSent := func() { + expectEventsSent := func() { eventsHandler.EXPECT().V2AddMetricsEvent( ctx, gomock.Any(), nil, nil, "", models.EventSeverityInfo, metricEventInstallerCacheRelease, gomock.Any(), gomock.Any(), - ).Times(1) + ).AnyTimes() } - testGet := func(releaseID, version string, clusterID strfmt.UUID, expectCached bool) (string, string) { - workdir := filepath.Join(cacheDir, "quay.io", "release-dev") + mockReleaseCalls := func(releaseID string, version string) { + workdir := filepath.Join(manager.config.CacheDir, "quay.io", "release-dev") fname := filepath.Join(workdir, releaseID) mockRelease.EXPECT().GetReleaseBinaryPath( gomock.Any(), gomock.Any(), version). - Return(workdir, releaseID, fname, nil) + Return(workdir, releaseID, fname, nil).AnyTimes() + mockRelease.EXPECT().Extract(gomock.Any(), releaseID, - gomock.Any(), cacheDir, gomock.Any(), version). + gomock.Any(), manager.config.CacheDir, gomock.Any(), version). 
DoAndReturn(func(log logrus.FieldLogger, releaseImage string, releaseImageMirror string, cacheDir string, pullSecret string, version string) (string, error) { - err := os.WriteFile(fname, []byte("abcde"), 0600) + dir, _ := filepath.Split(fname) + err := os.MkdirAll(dir, 0700) + Expect(err).ToNot(HaveOccurred()) + time.Sleep(10 * time.Millisecond) // Add a small amount of latency to simulate extraction + err = os.WriteFile(fname, []byte("abcde"), 0600) return "", err - }) + }).AnyTimes() + } + + testGet := func(releaseID, version string, clusterID strfmt.UUID, expectCached bool) (string, string) { + workdir := filepath.Join(manager.config.CacheDir, "quay.io", "release-dev") + fname := filepath.Join(workdir, releaseID) + mockReleaseCalls(releaseID, version) + expectEventsSent() l, err := manager.Get(ctx, releaseID, "mirror", "pull-secret", mockRelease, version, clusterID) Expect(err).ShouldNot(HaveOccurred()) Expect(l.releaseID).To(Equal(releaseID)) @@ -119,19 +145,162 @@ var _ = Describe("installer cache", func() { } Expect(l.Path).ShouldNot(BeEmpty()) - time.Sleep(1 * time.Second) + time.Sleep(10 * time.Millisecond) Expect(l.startTime.Before(time.Now())).To(BeTrue()) + l.Cleanup(context.TODO()) return fname, l.Path } - It("evicts the oldest file", func() { + + type launchParams struct { + releaseID string + version string + clusterID strfmt.UUID + } + + // runParallelTest launches a batch of fetches from the installer cache in order to simulate multiple requests to the same node + // releases are automatically cleaned up as they are gathered + // returns the first error encountered or nil if no error encountered. 
+ runParallelTest := func(params []launchParams) error { + var wg sync.WaitGroup + var firstError error + var errMutex sync.Mutex + for _, param := range params { + mockReleaseCalls(param.releaseID, param.version) + wg.Add(1) + go func(param launchParams) { + defer wg.Done() + release, err := manager.Get(ctx, param.releaseID, "mirror", "pull-secret", mockRelease, param.version, param.clusterID) + if err != nil { + errMutex.Lock() + if firstError == nil { + firstError = err + } + errMutex.Unlock() + return + } + // Simulate calls to release cleanup to ensure that we can test capacity limits + // Add a very small delay so that we can simulate usage of the release prior to release. + if release != nil { + time.Sleep(2 * time.Millisecond) + release.Cleanup(context.TODO()) + } + }(param) + } + wg.Wait() + return firstError + } + + getUsedBytesForDirectory := func(directory string) uint64 { + var totalBytes uint64 + seenInodes := make(map[uint64]bool) + err := filepath.Walk(directory, func(path string, fileInfo os.FileInfo, err error) error { + if err != nil { + if _, ok := err.(*os.PathError); ok { + // Something deleted the file before we could walk it + // count this as zero bytes + return nil + } + return err + } + stat, ok := fileInfo.Sys().(*syscall.Stat_t) + Expect(ok).To(BeTrue()) + if !fileInfo.IsDir() && !seenInodes[stat.Ino] { + totalBytes += uint64(fileInfo.Size()) + seenInodes[stat.Ino] = true + } + return nil + }) + Expect(err).ToNot(HaveOccurred()) + return totalBytes + } + + It("Should raise error on construction if max release size is larger than cache and cache is enabled", func() { + var err error + installerCacheConfig := InstallerCacheConfig{ + CacheDir: cacheDir, + MaxCapacity: int64(5), + MaxReleaseSize: int64(10), + ReleaseFetchRetryInterval: 1 * time.Millisecond, + InstallerCacheEvictionThreshold: 0.8, + } + manager, err = New(installerCacheConfig, eventsHandler, diskStatsHelper, logrus.New()) + Expect(err).To(HaveOccurred()) + 
Expect(err.Error()).To(Equal("config.MaxReleaseSize (10 bytes) must not be greater than config.MaxCapacity (5 bytes)")) + }) + + It("Should raise error on construction if max release size is zero and cache is enabled", func() { + var err error + installerCacheConfig := InstallerCacheConfig{ + CacheDir: cacheDir, + MaxCapacity: int64(5), + MaxReleaseSize: int64(0), + ReleaseFetchRetryInterval: 1 * time.Millisecond, + InstallerCacheEvictionThreshold: 0.8, + } + manager, err = New(installerCacheConfig, eventsHandler, diskStatsHelper, logrus.New()) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(Equal("config.MaxReleaseSize (0 bytes) must not be zero")) + }) + + It("Should not raise error on construction if max release size is larger than cache and cache eviction is disabled", func() { + var err error + installerCacheConfig := InstallerCacheConfig{ + CacheDir: cacheDir, + MaxCapacity: int64(0), + MaxReleaseSize: int64(10), + ReleaseFetchRetryInterval: 1 * time.Millisecond, + InstallerCacheEvictionThreshold: 0.8, + } + manager, err = New(installerCacheConfig, eventsHandler, diskStatsHelper, logrus.New()) + Expect(err).ToNot(HaveOccurred()) + }) + + It("Should not raise error on construction if max release size is zero and cache eviction is disabled", func() { + var err error + installerCacheConfig := InstallerCacheConfig{ + CacheDir: cacheDir, + MaxCapacity: int64(0), + MaxReleaseSize: int64(0), + ReleaseFetchRetryInterval: 1 * time.Millisecond, + InstallerCacheEvictionThreshold: 0.8, + } + manager, err = New(installerCacheConfig, eventsHandler, diskStatsHelper, logrus.New()) + Expect(err).ToNot(HaveOccurred()) + }) + + It("Should not accept InstallerCacheEvictionThreshold of zero", func() { + var err error + installerCacheConfig := InstallerCacheConfig{ + CacheDir: cacheDir, + MaxCapacity: int64(0), + MaxReleaseSize: int64(0), + ReleaseFetchRetryInterval: 1 * time.Millisecond, + InstallerCacheEvictionThreshold: 0, + } + manager, err = 
New(installerCacheConfig, eventsHandler, diskStatsHelper, logrus.New()) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(Equal("config.InstallerCacheEvictionThreshold must not be zero")) + }) + + It("when cache limit is zero - eviction is skipped", func() { + var err error + installerCacheConfig := InstallerCacheConfig{ + CacheDir: cacheDir, + MaxCapacity: int64(0), + MaxReleaseSize: int64(5), + ReleaseFetchRetryInterval: 1 * time.Millisecond, + InstallerCacheEvictionThreshold: 0.8, + } + manager, err = New(installerCacheConfig, eventsHandler, diskStatsHelper, logrus.New()) + Expect(err).ToNot(HaveOccurred()) clusterId := strfmt.UUID(uuid.New().String()) r1, l1 := testGet("4.8", "4.8.0", clusterId, false) r2, l2 := testGet("4.9", "4.9.0", clusterId, false) r3, l3 := testGet("4.10", "4.10.0", clusterId, false) - By("verify that the oldest file was deleted") - _, err := os.Stat(r1) - Expect(os.IsNotExist(err)).To(BeTrue()) + By("verify that the no file was deleted") + _, err = os.Stat(r1) + Expect(os.IsNotExist(err)).To(BeFalse()) _, err = os.Stat(r2) Expect(os.IsNotExist(err)).To(BeFalse()) _, err = os.Stat(r3) @@ -161,23 +330,34 @@ var _ = Describe("installer cache", func() { Expect(os.IsNotExist(err)).To(BeTrue()) _, err = os.Stat(r3) Expect(os.IsNotExist(err)).To(BeFalse()) + // Now measure disk usage, we should be under the cache size + Expect(getUsedBytesForDirectory(manager.config.CacheDir)).To(BeNumerically("<=", manager.config.MaxCapacity)) }) - It("when cache limit is not set eviction is skipped", func() { - manager.storageCapacity = 0 + It("evicts the oldest file", func() { clusterId := strfmt.UUID(uuid.New().String()) - r1, _ := testGet("4.8", "4.8.0", clusterId, false) - r2, _ := testGet("4.9", "4.9.0", clusterId, false) - r3, _ := testGet("4.10", "4.10.0", clusterId, false) + r1, l1 := testGet("4.8", "4.8.0", clusterId, false) + r2, l2 := testGet("4.9", "4.9.0", clusterId, false) + r3, l3 := testGet("4.10", "4.10.0", clusterId, false) - 
By("verify that the no file was deleted") + By("verify that the oldest file was deleted") _, err := os.Stat(r1) - Expect(os.IsNotExist(err)).To(BeFalse()) + Expect(os.IsNotExist(err)).To(BeTrue()) _, err = os.Stat(r2) Expect(os.IsNotExist(err)).To(BeFalse()) _, err = os.Stat(r3) Expect(os.IsNotExist(err)).To(BeFalse()) + By("verify that the links were purged") + manager.evict() + _, err = os.Stat(l1) + Expect(os.IsNotExist(err)).To(BeTrue()) + _, err = os.Stat(l2) + Expect(os.IsNotExist(err)).To(BeTrue()) + _, err = os.Stat(l3) + Expect(os.IsNotExist(err)).To(BeTrue()) + // Now measure disk usage, we should be under the cache size + Expect(getUsedBytesForDirectory(manager.config.CacheDir)).To(BeNumerically("<=", manager.config.MaxCapacity)) }) It("extracts from the mirror", func() { @@ -185,18 +365,7 @@ var _ = Describe("installer cache", func() { releaseMirrorID := "4.10-mirror" clusterID := strfmt.UUID(uuid.New().String()) version := "4.10.0" - workdir := filepath.Join(cacheDir, "quay.io", "release-dev") - fname := filepath.Join(workdir, releaseID) - - mockRelease.EXPECT().GetReleaseBinaryPath( - releaseID, gomock.Any(), version). - Return(workdir, releaseID, fname, nil) - mockRelease.EXPECT().Extract(gomock.Any(), releaseID, - gomock.Any(), cacheDir, gomock.Any(), version). 
- DoAndReturn(func(log logrus.FieldLogger, releaseImage string, releaseImageMirror string, cacheDir string, pullSecret string, version string) (string, error) { - err := os.WriteFile(fname, []byte("abcde"), 0600) - return "", err - }) + mockReleaseCalls(releaseID, version) l, err := manager.Get(ctx, releaseID, releaseMirrorID, "pull-secret", mockRelease, version, clusterID) Expect(err).ShouldNot(HaveOccurred()) Expect(l.releaseID).To(Equal(releaseID)) @@ -205,8 +374,10 @@ var _ = Describe("installer cache", func() { Expect(l.cached).To(BeFalse()) Expect(l.extractDuration).ShouldNot(BeZero()) Expect(l.Path).ShouldNot(BeEmpty()) - expectEventSent() + expectEventsSent() l.Cleanup(ctx) + // Now measure disk usage, we should be under the cache size + Expect(getUsedBytesForDirectory(manager.config.CacheDir)).To(BeNumerically("<=", manager.config.MaxCapacity)) }) It("extracts without a mirror", func() { @@ -214,18 +385,7 @@ var _ = Describe("installer cache", func() { releaseMirrorID := "" version := "4.10.0" clusterID := strfmt.UUID(uuid.NewString()) - workdir := filepath.Join(cacheDir, "quay.io", "release-dev") - fname := filepath.Join(workdir, releaseID) - - mockRelease.EXPECT().GetReleaseBinaryPath( - releaseID, gomock.Any(), version). - Return(workdir, releaseID, fname, nil) - mockRelease.EXPECT().Extract(gomock.Any(), releaseID, - gomock.Any(), cacheDir, gomock.Any(), version). 
- DoAndReturn(func(log logrus.FieldLogger, releaseImage string, releaseImageMirror string, cacheDir string, pullSecret string, version string) (string, error) { - err := os.WriteFile(fname, []byte("abcde"), 0600) - return "", err - }) + mockReleaseCalls(releaseID, version) l, err := manager.Get(ctx, releaseID, releaseMirrorID, "pull-secret", mockRelease, version, clusterID) Expect(err).ShouldNot(HaveOccurred()) Expect(l.releaseID).To(Equal(releaseID)) @@ -234,8 +394,156 @@ var _ = Describe("installer cache", func() { Expect(l.cached).To(BeFalse()) Expect(l.extractDuration).ShouldNot(BeZero()) Expect(l.Path).ShouldNot(BeEmpty()) - expectEventSent() + expectEventsSent() l.Cleanup(ctx) + // Now measure disk usage, we should be under the cache size + Expect(getUsedBytesForDirectory(manager.config.CacheDir)).To(BeNumerically("<=", manager.config.MaxCapacity)) + }) + + It("should correctly handle multiple requests for the same release at the same time", func() { + var err error + installerCacheConfig := InstallerCacheConfig{ + CacheDir: cacheDir, + MaxCapacity: int64(10), + MaxReleaseSize: int64(5), + ReleaseFetchRetryInterval: 1 * time.Millisecond, + InstallerCacheEvictionThreshold: 0.8, + } + manager, err = New(installerCacheConfig, eventsHandler, diskStatsHelper, logrus.New()) + Expect(err).ToNot(HaveOccurred()) + params := []launchParams{} + for i := 0; i < 10; i++ { + params = append(params, launchParams{releaseID: "4.17.11-x86_64", version: "4.17.11", clusterID: strfmt.UUID(uuid.NewString())}) + } + expectEventsSent() + err = runParallelTest(params) + Expect(err).ToNot(HaveOccurred()) + // Now measure disk usage, we should be under the cache size + Expect(getUsedBytesForDirectory(manager.config.CacheDir)).To(BeNumerically("<=", manager.config.MaxCapacity)) + }) + + It("should consistently handle multiple requests for different releases at the same time", func() { + var err error + installerCacheConfig := InstallerCacheConfig{ + CacheDir: cacheDir, + MaxCapacity: 
int64(25), + MaxReleaseSize: int64(5), + ReleaseFetchRetryInterval: 1 * time.Millisecond, + InstallerCacheEvictionThreshold: 0.8, + } + manager, err = New(installerCacheConfig, eventsHandler, diskStatsHelper, logrus.New()) + Expect(err).ToNot(HaveOccurred()) + for i := 0; i < 10; i++ { + params := []launchParams{} + params = append(params, launchParams{releaseID: "4.17.11-x86_64", version: "4.17.11", clusterID: strfmt.UUID(uuid.NewString())}) + params = append(params, launchParams{releaseID: "4.18.11-x86_64", version: "4.18.11", clusterID: strfmt.UUID(uuid.NewString())}) + params = append(params, launchParams{releaseID: "4.19.11-x86_64", version: "4.19.11", clusterID: strfmt.UUID(uuid.NewString())}) + params = append(params, launchParams{releaseID: "4.20.11-x86_64", version: "4.20.11", clusterID: strfmt.UUID(uuid.NewString())}) + params = append(params, launchParams{releaseID: "4.21.11-x86_64", version: "4.21.11", clusterID: strfmt.UUID(uuid.NewString())}) + expectEventsSent() + err = runParallelTest(params) + Expect(err).ToNot(HaveOccurred()) + } + // Now measure disk usage, we should be under the cache size + Expect(getUsedBytesForDirectory(manager.config.CacheDir)).To(BeNumerically("<=", manager.config.MaxCapacity)) + }) + + It("should maintain cache within threshold", func() { + var err error + installerCacheConfig := InstallerCacheConfig{ + CacheDir: cacheDir, + MaxCapacity: int64(500), + MaxReleaseSize: int64(5), + ReleaseFetchRetryInterval: 1 * time.Millisecond, + InstallerCacheEvictionThreshold: 0.8, + } + manager, err = New(installerCacheConfig, eventsHandler, diskStatsHelper, logrus.New()) + Expect(err).ToNot(HaveOccurred()) + for i := 0; i < 85; i++ { // Deliberately generate a number of requests that will be above the percentage + params := []launchParams{} + params = append(params, launchParams{releaseID: fmt.Sprintf("release-%d", i), version: fmt.Sprintf("release-%d.0.1", i), clusterID: strfmt.UUID(uuid.NewString())}) + expectEventsSent() + err = 
runParallelTest(params) + Expect(err).ToNot(HaveOccurred()) + } + // Ensure that we hold within the correct percentage of the cache size + Expect(getUsedBytesForDirectory(manager.config.CacheDir)).To(BeNumerically("<=", float64(manager.config.MaxCapacity)*installerCacheConfig.InstallerCacheEvictionThreshold)) + }) + + It("should stay within the cache limit where there is only sufficient space for one release", func() { + var err error + installerCacheConfig := InstallerCacheConfig{ + CacheDir: cacheDir, + MaxCapacity: int64(5), + MaxReleaseSize: int64(5), + ReleaseFetchRetryInterval: 1 * time.Millisecond, + InstallerCacheEvictionThreshold: 0.8, + } + manager, err = New(installerCacheConfig, eventsHandler, diskStatsHelper, logrus.New()) + Expect(err).ToNot(HaveOccurred()) + expectEventsSent() + // Force a scenario where one of the requests will fail because of a pending release cleanup + params := []launchParams{} + params = append(params, launchParams{releaseID: "4.17.11-x86_64", version: "4.17.11", clusterID: strfmt.UUID(uuid.NewString())}) + params = append(params, launchParams{releaseID: "4.18.11-x86_64", version: "4.18.11", clusterID: strfmt.UUID(uuid.NewString())}) + err = runParallelTest(params) + Expect(err).ToNot(HaveOccurred()) + // Now measure disk usage, we should be under the cache size + Expect(getUsedBytesForDirectory(manager.config.CacheDir)).To(BeNumerically("<=", manager.config.MaxCapacity)) + + // After we have collected all results from the last parallel test, the cleanup should have occurred as part of the test. + // Now assert that a retry would work, there should be enough space for another release + // use a brand new release ID to prove we are not hitting cache here. 
+ params = []launchParams{} + params = append(params, launchParams{releaseID: "4.19.11-x86_64", version: "4.19.11", clusterID: strfmt.UUID(uuid.NewString())}) + err = runParallelTest(params) + Expect(err).ToNot(HaveOccurred()) + }) + + It("should remove expired links while leaving non expired links intact", func() { + + numberOfLinks := 10 + numberOfExpiredLinks := 5 + + directory, err := os.MkdirTemp("", "testPruneExpiredHardLinks") + Expect(err).ToNot(HaveOccurred()) + + defer os.RemoveAll(directory) + + for i := 0; i < numberOfLinks; i++ { + var someFile *os.File + someFile, err = os.CreateTemp(directory, "somefile") + Expect(err).ToNot(HaveOccurred()) + linkPath := filepath.Join(directory, fmt.Sprintf("ln_%s", uuid.NewString())) + err = os.Link(someFile.Name(), linkPath) + Expect(err).ToNot(HaveOccurred()) + if i > numberOfExpiredLinks-1 { + err = os.Chtimes(linkPath, time.Now().Add(-10*time.Minute), time.Now().Add(-10*time.Minute)) + Expect(err).ToNot(HaveOccurred()) + } + } + + links := make([]*fileInfo, 0) + err = filepath.Walk(directory, func(path string, info fs.FileInfo, err error) error { + if strings.HasPrefix(info.Name(), "ln_") { + links = append(links, &fileInfo{path, info}) + } + return nil + }) + Expect(err).ToNot(HaveOccurred()) + Expect(len(links)).To(Equal(10)) + + manager.pruneExpiredHardLinks(links, linkPruningGracePeriod) + + linkCount := 0 + err = filepath.Walk(directory, func(path string, info fs.FileInfo, err error) error { + if strings.HasPrefix(info.Name(), "ln_") { + linkCount++ + } + return nil + }) + Expect(err).ToNot(HaveOccurred()) + Expect(linkCount).To(Equal(numberOfLinks - numberOfExpiredLinks)) }) }) diff --git a/internal/metrics/disk_stats_helper.go b/internal/metrics/disk_stats_helper.go index 148a6bc65ce..bb3c9223407 100644 --- a/internal/metrics/disk_stats_helper.go +++ b/internal/metrics/disk_stats_helper.go @@ -7,6 +7,7 @@ import ( "syscall" "github.com/pkg/errors" + "github.com/sirupsen/logrus" "golang.org/x/sys/unix" ) 
@@ -15,11 +16,12 @@ type DiskStatsHelper interface { } type OSDiskStatsHelper struct { + log *logrus.Logger } //go:generate mockgen -source=disk_stats_helper.go -package=metrics -destination=mock_disk_stats_helper.go -func NewOSDiskStatsHelper() *OSDiskStatsHelper { - return &OSDiskStatsHelper{} +func NewOSDiskStatsHelper(log *logrus.Logger) *OSDiskStatsHelper { + return &OSDiskStatsHelper{log: log} } func (c *OSDiskStatsHelper) getUsedBytesInDirectory(directory string) (uint64, error) { @@ -28,6 +30,14 @@ func (c *OSDiskStatsHelper) getUsedBytesInDirectory(directory string) (uint64, e seenInodes := make(map[uint64]bool) err := filepath.Walk(directory, func(path string, fileInfo os.FileInfo, err error) error { if err != nil { + if _, ok := err.(*os.PathError); ok { + // it's possible to encounter a path error if a file has been deleted since we obtained the file list + // in cases where deletion occurs outside of Mutex protection, such as when we clean up an installercache.Release, this can be expected + // we can safely count this as 'zero sized' in these cases + c.log.WithError(err).Warnf("could not stat file %s because it was deleted before we could walk it - assuming zero size", path) + // return nil to ignore the error. + return nil + } return err } // We need to ensure that the size check is based on inodes and not just the sizes gleaned from files. 
diff --git a/internal/metrics/metricsManager_test.go b/internal/metrics/metricsManager_test.go index 97370938271..085cf2df932 100644 --- a/internal/metrics/metricsManager_test.go +++ b/internal/metrics/metricsManager_test.go @@ -47,7 +47,7 @@ var _ = DescribeTable( metricsManagerConfig := &MetricsManagerConfig{ DirectoryUsageMonitorConfig: DirectoryUsageMonitorConfig{ Directories: []string{"/data"}}} - manager := NewMetricsManager(server.Registry(), handler, NewOSDiskStatsHelper(), metricsManagerConfig, logrus.New()) + manager := NewMetricsManager(server.Registry(), handler, NewOSDiskStatsHelper(logrus.New()), metricsManagerConfig, logrus.New()) manager.ReportHostInstallationMetrics( ctx, "4.10.18", diff --git a/internal/metrics/os_disk_stats_helper_test.go b/internal/metrics/os_disk_stats_helper_test.go index 69ab42f6fb4..823c45551c0 100644 --- a/internal/metrics/os_disk_stats_helper_test.go +++ b/internal/metrics/os_disk_stats_helper_test.go @@ -7,13 +7,14 @@ import ( "github.com/google/uuid" . "github.com/onsi/ginkgo" . 
"github.com/onsi/gomega" + "github.com/sirupsen/logrus" ) var _ = Describe("OS disk stats helper", func() { var ( tempDir string - diskStatsHelper *OSDiskStatsHelper = NewOSDiskStatsHelper() + diskStatsHelper *OSDiskStatsHelper = NewOSDiskStatsHelper(logrus.New()) ) BeforeEach(func() { diff --git a/pkg/generator/generator.go b/pkg/generator/generator.go index 49b963dbe19..a08c2fb89f3 100644 --- a/pkg/generator/generator.go +++ b/pkg/generator/generator.go @@ -8,6 +8,7 @@ import ( "github.com/openshift/assisted-service/internal/common" eventsapi "github.com/openshift/assisted-service/internal/events/api" "github.com/openshift/assisted-service/internal/ignition" + "github.com/openshift/assisted-service/internal/installercache" manifestsapi "github.com/openshift/assisted-service/internal/manifests/api" "github.com/openshift/assisted-service/internal/provider/registry" logutil "github.com/openshift/assisted-service/pkg/log" @@ -21,11 +22,10 @@ type InstallConfigGenerator interface { } type Config struct { - ServiceCACertPath string `envconfig:"SERVICE_CA_CERT_PATH" default:""` - ReleaseImageMirror string `envconfig:"OPENSHIFT_INSTALL_RELEASE_IMAGE_MIRROR" default:""` - DummyIgnition bool `envconfig:"DUMMY_IGNITION"` - InstallInvoker string `envconfig:"INSTALL_INVOKER" default:"assisted-installer"` - InstallerCacheCapacity int64 `envconfig:"INSTALLER_CACHE_CAPACITY"` + ServiceCACertPath string `envconfig:"SERVICE_CA_CERT_PATH" default:""` + ReleaseImageMirror string `envconfig:"OPENSHIFT_INSTALL_RELEASE_IMAGE_MIRROR" default:""` + DummyIgnition bool `envconfig:"DUMMY_IGNITION"` + InstallInvoker string `envconfig:"INSTALL_INVOKER" default:"assisted-installer"` // Directory containing pre-generated TLS certs/keys for the ephemeral installer ClusterTLSCertOverrideDir string `envconfig:"EPHEMERAL_INSTALLER_CLUSTER_TLS_CERTS_OVERRIDE_DIR" default:""` @@ -39,18 +39,29 @@ type installGenerator struct { providerRegistry registry.ProviderRegistry manifestApi 
manifestsapi.ManifestsAPI eventsHandler eventsapi.Handler + installerCache *installercache.Installers } -func New(log logrus.FieldLogger, s3Client s3wrapper.API, cfg Config, workDir string, - providerRegistry registry.ProviderRegistry, manifestApi manifestsapi.ManifestsAPI, eventsHandler eventsapi.Handler) *installGenerator { +type InstallGeneratorDirectoryConfig struct { + WorkDir string +} + +// GetWorkingDirectory determines the working directory path for the generator. +func (c *InstallGeneratorDirectoryConfig) GetWorkingDirectory() string { + return filepath.Join(c.WorkDir, "install-config-generate") +} + +func New(directoryConfig InstallGeneratorDirectoryConfig, log logrus.FieldLogger, s3Client s3wrapper.API, cfg Config, + providerRegistry registry.ProviderRegistry, manifestApi manifestsapi.ManifestsAPI, eventsHandler eventsapi.Handler, installerCache *installercache.Installers) *installGenerator { return &installGenerator{ Config: cfg, log: log, s3Client: s3Client, - workDir: filepath.Join(workDir, "install-config-generate"), + workDir: directoryConfig.GetWorkingDirectory(), providerRegistry: providerRegistry, manifestApi: manifestApi, eventsHandler: eventsHandler, + installerCache: installerCache, } } @@ -71,15 +82,13 @@ func (k *installGenerator) GenerateInstallConfig(ctx context.Context, cluster co } }() - installerCacheDir := filepath.Join(k.workDir, "installercache") - // runs openshift-install to generate ignition files, then modifies them as necessary var generator ignition.Generator if k.Config.DummyIgnition { generator = ignition.NewDummyGenerator(clusterWorkDir, &cluster, k.s3Client, log) } else { - generator = ignition.NewGenerator(clusterWorkDir, installerCacheDir, &cluster, releaseImage, k.Config.ReleaseImageMirror, - k.Config.ServiceCACertPath, k.Config.InstallInvoker, k.s3Client, log, k.providerRegistry, installerReleaseImageOverride, k.Config.ClusterTLSCertOverrideDir, k.InstallerCacheCapacity, k.manifestApi, k.eventsHandler) + generator = 
ignition.NewGenerator(clusterWorkDir, &cluster, releaseImage, k.Config.ReleaseImageMirror, + k.Config.ServiceCACertPath, k.Config.InstallInvoker, k.s3Client, log, k.providerRegistry, installerReleaseImageOverride, k.Config.ClusterTLSCertOverrideDir, k.manifestApi, k.eventsHandler, k.installerCache) } err = generator.Generate(ctx, cfg) if err != nil {