diff --git a/cmd/main.go b/cmd/main.go index 4c4dc08bca0..7bafcb94a50 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -15,6 +15,7 @@ import ( "time" "github.com/NYTimes/gziphandler" + "github.com/alecthomas/units" "github.com/go-openapi/runtime" "github.com/go-openapi/strfmt" "github.com/go-openapi/swag" @@ -39,6 +40,7 @@ import ( "github.com/openshift/assisted-service/internal/ignition" "github.com/openshift/assisted-service/internal/infraenv" installcfg "github.com/openshift/assisted-service/internal/installcfg/builder" + "github.com/openshift/assisted-service/internal/installercache" internaljson "github.com/openshift/assisted-service/internal/json" "github.com/openshift/assisted-service/internal/manifests" "github.com/openshift/assisted-service/internal/metrics" @@ -172,6 +174,18 @@ var Options struct { // EnableXattrFallback is a boolean flag to enable en emulated fallback methoid of xattr on systems that do not support xattr. EnableXattrFallback bool `envconfig:"ENABLE_XATTR_FALLBACK" default:"true"` + + // InstallerCacheCapacityGiB is the capacity of the installer cache in GiB + InstallerCacheCapacityGiB uint `envconfig:"INSTALLER_CACHE_CAPACITY_GIB" default:"0"` + + // InstallerCacheMaxReleaseSizeGiB is the expected maximum size of a single release in GiB + InstallerCacheMaxReleaseSizeGiB uint `envconfig:"INSTALLER_CACHE_MAX_RELEASE_SIZE_GIB" default:"2"` + + // InstallerCacheEvictionThresholdPercent is the percentage of capacity at which the cache will start to evict releases. + InstallerCacheEvictionThresholdPercent uint `envconfig:"INSTALLER_CACHE_EVICTION_THRESHOLD_PERCENT" default:"80"` + + // ReleaseFetchRetryIntervalSeconds is the number of seconds that the cache should wait before retrying the fetch of a release if unable to do so for capacity reasons. + ReleaseFetchRetryIntervalSeconds uint `envconfig:"INSTALLER_CACHE_RELEASE_FETCH_RETRY_INTERVAL_SECONDS" default:"30"` } func InitLogs(logLevel, logFormat string) *logrus.Logger { @@ -315,7 +329,8 @@ func main() { metricsManagerConfig := &metrics.MetricsManagerConfig{ DirectoryUsageMonitorConfig: metrics.DirectoryUsageMonitorConfig{ Directories: []string{Options.WorkDir}}} - metricsManager := metrics.NewMetricsManager(prometheusRegistry, eventsHandler, metrics.NewOSDiskStatsHelper(), metricsManagerConfig, log) + diskStatsHelper := metrics.NewOSDiskStatsHelper(logrus.New()) + metricsManager := metrics.NewMetricsManager(prometheusRegistry, eventsHandler, diskStatsHelper, metricsManagerConfig, log) if ocmClient != nil { //inject the metric server to the ocm client for purpose of //performance monitoring the calls to ACM. This could not be done @@ -488,7 +503,18 @@ func main() { failOnError(err, "failed to create valid bm config S3 endpoint URL from %s", Options.BMConfig.S3EndpointURL) Options.BMConfig.S3EndpointURL = newUrl - generator := generator.New(log, objectHandler, Options.GeneratorConfig, Options.WorkDir, providerRegistry, manifestsApi, eventsHandler) + installGeneratorDirectoryConfig := generator.InstallGeneratorDirectoryConfig{WorkDir: Options.WorkDir} + installerCacheConfig := installercache.InstallerCacheConfig{ + CacheDir: filepath.Join(installGeneratorDirectoryConfig.GetWorkingDirectory(), "installercache"), + MaxCapacity: int64(Options.InstallerCacheCapacityGiB) * int64(units.GiB), + MaxReleaseSize: int64(Options.InstallerCacheMaxReleaseSizeGiB) * int64(units.GiB), + ReleaseFetchRetryInterval: time.Duration(Options.ReleaseFetchRetryIntervalSeconds) * time.Second, + InstallerCacheEvictionThreshold: float64(Options.InstallerCacheEvictionThresholdPercent) / 100, + } + installerCache, err := installercache.New(installerCacheConfig, eventsHandler, diskStatsHelper, log) + failOnError(err, "failed to instantiate installercache") + + generator := generator.New(installGeneratorDirectoryConfig, log, objectHandler, Options.GeneratorConfig, providerRegistry, manifestsApi, eventsHandler, installerCache) var crdUtils bminventory.CRDUtils if ctrlMgr != nil { crdUtils = controllers.NewCRDUtils(ctrlMgr.GetClient(), hostApi) diff --git a/internal/ignition/installmanifests.go b/internal/ignition/installmanifests.go index 88d0e77df49..6adf98d581d 100644 --- a/internal/ignition/installmanifests.go +++ b/internal/ignition/installmanifests.go @@ -85,7 +85,6 @@ type installerGenerator struct { cluster *common.Cluster releaseImage string releaseImageMirror string - installerDir string serviceCACert string encodedDhcpFileContents string s3Client s3wrapper.API @@ -110,16 +109,15 @@ var fileNames = [...]string{ } // NewGenerator returns a generator that can generate ignition files -func NewGenerator(workDir string, installerDir string, cluster *common.Cluster, releaseImage string, releaseImageMirror string, +func NewGenerator(workDir string, cluster *common.Cluster, releaseImage string, releaseImageMirror string, serviceCACert string, installInvoker string, s3Client s3wrapper.API, log logrus.FieldLogger, providerRegistry registry.ProviderRegistry, - installerReleaseImageOverride, clusterTLSCertOverrideDir string, storageCapacityLimit int64, manifestApi manifestsapi.ManifestsAPI, eventsHandler eventsapi.Handler) Generator { + installerReleaseImageOverride, clusterTLSCertOverrideDir string, manifestApi manifestsapi.ManifestsAPI, eventsHandler eventsapi.Handler, installerCache *installercache.Installers) Generator { return &installerGenerator{ cluster: cluster, log: log, releaseImage: releaseImage, releaseImageMirror: releaseImageMirror, workDir: workDir, - installerDir: installerDir, serviceCACert: serviceCACert, s3Client: s3Client, enableMetal3Provisioning: true, @@ -127,7 +125,7 @@ func NewGenerator(workDir string, installerDir string, cluster *common.Cluster, providerRegistry: providerRegistry, installerReleaseImageOverride: installerReleaseImageOverride, clusterTLSCertOverrideDir: clusterTLSCertOverrideDir, - installerCache: installercache.New(installerDir, storageCapacityLimit, eventsHandler, log), + installerCache: installerCache, manifestApi: manifestApi, } } diff --git a/internal/ignition/installmanifests_test.go b/internal/ignition/installmanifests_test.go index 04061852eb7..97c212ac1dd 100644 --- a/internal/ignition/installmanifests_test.go +++ b/internal/ignition/installmanifests_test.go @@ -10,6 +10,7 @@ import ( "os" "path/filepath" "strings" + "time" config_32 "github.com/coreos/ignition/v2/config/v3_2" config_32_types "github.com/coreos/ignition/v2/config/v3_2/types" @@ -25,7 +26,9 @@ import ( "github.com/openshift/assisted-service/internal/constants" eventsapi "github.com/openshift/assisted-service/internal/events/api" "github.com/openshift/assisted-service/internal/host/hostutil" + "github.com/openshift/assisted-service/internal/installercache" manifestsapi "github.com/openshift/assisted-service/internal/manifests/api" + "github.com/openshift/assisted-service/internal/metrics" "github.com/openshift/assisted-service/internal/network" "github.com/openshift/assisted-service/models" "github.com/openshift/assisted-service/pkg/s3wrapper" @@ -78,29 +81,37 @@ var _ = Describe("Bootstrap Ignition Update", func() { }` var ( - err error - examplePath string - db *gorm.DB - dbName string - bmh *bmh_v1alpha1.BareMetalHost - config *config_32_types.Config - mockS3Client *s3wrapper.MockAPI - workDir string - cluster *common.Cluster - ctrl *gomock.Controller - manifestsAPI *manifestsapi.MockManifestsAPI - eventsHandler eventsapi.Handler + err error + examplePath string + db *gorm.DB + dbName string + bmh *bmh_v1alpha1.BareMetalHost + config *config_32_types.Config + mockS3Client *s3wrapper.MockAPI + workDir string + cluster *common.Cluster + ctrl *gomock.Controller + manifestsAPI *manifestsapi.MockManifestsAPI + eventsHandler *eventsapi.MockHandler + installerCache *installercache.Installers ) BeforeEach(func() { // setup temp workdir workDir, err = os.MkdirTemp("", "bootstrap-ignition-update-test-") Expect(err).NotTo(HaveOccurred()) + Expect(err).NotTo(HaveOccurred()) examplePath = filepath.Join(workDir, "example1.ign") var err1 error err1 = os.WriteFile(examplePath, []byte(bootstrap1), 0600) Expect(err1).NotTo(HaveOccurred()) ctrl = gomock.NewController(GinkgoT()) + installerCacheConfig := installercache.InstallerCacheConfig{ + CacheDir: filepath.Join(workDir, "some-dir", "installercache"), + MaxCapacity: int64(5), + MaxReleaseSize: int64(5), + } + installerCache, err = installercache.New(installerCacheConfig, eventsHandler, metrics.NewOSDiskStatsHelper(logrus.New()), logrus.New()) mockS3Client = s3wrapper.NewMockAPI(ctrl) manifestsAPI = manifestsapi.NewMockManifestsAPI(ctrl) eventsHandler = eventsapi.NewMockHandler(ctrl) @@ -113,7 +124,7 @@ var _ = Describe("Bootstrap Ignition Update", func() { }, } db, dbName = common.PrepareTestDB() - g := NewGenerator(workDir, "", cluster, "", "", "", "", mockS3Client, logrus.New(), nil, "", "", 5, manifestsAPI, eventsHandler).(*installerGenerator) + g := NewGenerator(workDir, cluster, "", "", "", "", mockS3Client, logrus.New(), nil, "", "", manifestsAPI, eventsHandler, installerCache).(*installerGenerator) Expect(g.updateBootstrap(context.Background(), examplePath)).To(Succeed()) @@ -241,16 +252,17 @@ SV4bRR9i0uf+xQ/oYRvugQ25Q7EahO5hJIWRf4aULbk36Zpw3++v2KFnF26zqwB6 -----END CERTIFICATE-----` var ( - masterPath string - workerPath string - caCertPath string - dbName string - db *gorm.DB - cluster *common.Cluster - workDir string - ctrl *gomock.Controller - manifestsAPI *manifestsapi.MockManifestsAPI - eventsHandler eventsapi.Handler + masterPath string + workerPath string + caCertPath string + dbName string + db *gorm.DB + cluster *common.Cluster + workDir string + ctrl *gomock.Controller + manifestsAPI *manifestsapi.MockManifestsAPI + eventsHandler eventsapi.Handler + installerCache *installercache.Installers ) BeforeEach(func() { @@ -274,6 +286,13 @@ SV4bRR9i0uf+xQ/oYRvugQ25Q7EahO5hJIWRf4aULbk36Zpw3++v2KFnF26zqwB6 ctrl = gomock.NewController(GinkgoT()) manifestsAPI = manifestsapi.NewMockManifestsAPI(ctrl) eventsHandler = eventsapi.NewMockHandler(ctrl) + installerCacheConfig := installercache.InstallerCacheConfig{ + CacheDir: filepath.Join(workDir, "some-dir", "installercache"), + MaxCapacity: int64(5), + MaxReleaseSize: int64(5), + } + installerCache, err = installercache.New(installerCacheConfig, eventsHandler, metrics.NewOSDiskStatsHelper(logrus.New()), logrus.New()) + Expect(err).NotTo(HaveOccurred()) }) AfterEach(func() { @@ -283,7 +302,7 @@ SV4bRR9i0uf+xQ/oYRvugQ25Q7EahO5hJIWRf4aULbk36Zpw3++v2KFnF26zqwB6 Describe("update ignitions", func() { It("with ca cert file", func() { - g := NewGenerator(workDir, "", cluster, "", "", caCertPath, "", nil, logrus.New(), nil, "", "", 5, manifestsAPI, eventsHandler).(*installerGenerator) + g := NewGenerator(workDir, cluster, "", "", caCertPath, "", nil, logrus.New(), nil, "", "", manifestsAPI, eventsHandler, installerCache).(*installerGenerator) err := g.updateIgnitions() Expect(err).NotTo(HaveOccurred()) @@ -305,7 +324,7 @@ SV4bRR9i0uf+xQ/oYRvugQ25Q7EahO5hJIWRf4aULbk36Zpw3++v2KFnF26zqwB6 Expect(file.Path).To(Equal(common.HostCACertPath)) }) It("with no ca cert file", func() { - g := NewGenerator(workDir, "", cluster, "", "", "", "", nil, logrus.New(), nil, "", "", 5, manifestsAPI, eventsHandler).(*installerGenerator) + g := NewGenerator(workDir, cluster, "", "", "", "", nil, logrus.New(), nil, "", "", manifestsAPI, eventsHandler, installerCache).(*installerGenerator) err := g.updateIgnitions() Expect(err).NotTo(HaveOccurred()) @@ -324,7 +343,7 @@ SV4bRR9i0uf+xQ/oYRvugQ25Q7EahO5hJIWRf4aULbk36Zpw3++v2KFnF26zqwB6 }) Context("DHCP generation", func() { It("Definitions only", func() { - g := NewGenerator(workDir, "", cluster, "", "", "", "", nil, logrus.New(), nil, "", "", 5, manifestsAPI, eventsHandler).(*installerGenerator) + g := NewGenerator(workDir, cluster, "", "", "", "", nil, logrus.New(), nil, "", "", manifestsAPI, eventsHandler, installerCache).(*installerGenerator) g.encodedDhcpFileContents = "data:,abc" err := g.updateIgnitions() @@ -342,7 +361,7 @@ SV4bRR9i0uf+xQ/oYRvugQ25Q7EahO5hJIWRf4aULbk36Zpw3++v2KFnF26zqwB6 }) }) It("Definitions+leases", func() { - g := NewGenerator(workDir, "", cluster, "", "", "", "", nil, logrus.New(), nil, "", "", 5, manifestsAPI, eventsHandler).(*installerGenerator) + g := NewGenerator(workDir, cluster, "", "", "", "", nil, logrus.New(), nil, "", "", manifestsAPI, eventsHandler, installerCache).(*installerGenerator) g.encodedDhcpFileContents = "data:,abc" cluster.ApiVipLease = "api" @@ -429,14 +448,15 @@ var _ = Describe("createHostIgnitions", func() { }` var ( - dbName string - db *gorm.DB - mockS3Client *s3wrapper.MockAPI - cluster *common.Cluster - ctrl *gomock.Controller - workDir string - manifestsAPI *manifestsapi.MockManifestsAPI - eventsHandler eventsapi.Handler + dbName string + db *gorm.DB + mockS3Client *s3wrapper.MockAPI + cluster *common.Cluster + ctrl *gomock.Controller + workDir string + manifestsAPI *manifestsapi.MockManifestsAPI + eventsHandler eventsapi.Handler + installerCache *installercache.Installers ) BeforeEach(func() { @@ -457,6 +477,13 @@ var _ = Describe("createHostIgnitions", func() { manifestsAPI = manifestsapi.NewMockManifestsAPI(ctrl) eventsHandler = eventsapi.NewMockHandler(ctrl) cluster = testCluster() + installerCacheConfig := installercache.InstallerCacheConfig{ + CacheDir: filepath.Join(workDir, "some-dir", "installercache"), + MaxCapacity: int64(5), + MaxReleaseSize: int64(5), + } + installerCache, err = installercache.New(installerCacheConfig, eventsHandler, metrics.NewOSDiskStatsHelper(logrus.New()), logrus.New()) + Expect(err).NotTo(HaveOccurred()) }) AfterEach(func() { @@ -492,7 +519,7 @@ var _ = Describe("createHostIgnitions", func() { host.ID = &id } - g := NewGenerator(workDir, "", cluster, "", "", "", "", nil, logrus.New(), nil, "", "", 5, manifestsAPI, eventsHandler).(*installerGenerator) + g := NewGenerator(workDir, cluster, "", "", "", "", nil, logrus.New(), nil, "", "", manifestsAPI, eventsHandler, installerCache).(*installerGenerator) err := g.createHostIgnitions() Expect(err).NotTo(HaveOccurred()) @@ -546,7 +573,7 @@ var _ = Describe("createHostIgnitions", func() { host.ID = &id } - g := NewGenerator(workDir, "", cluster, "", "", "", "", nil, logrus.New(), nil, "", "", 5, manifestsAPI, eventsHandler).(*installerGenerator) + g := NewGenerator(workDir, cluster, "", "", "", "", nil, logrus.New(), nil, "", "", manifestsAPI, eventsHandler, installerCache).(*installerGenerator) err := g.createHostIgnitions() Expect(err).NotTo(HaveOccurred()) @@ -590,7 +617,7 @@ var _ = Describe("createHostIgnitions", func() { host.ID = &id } - g := NewGenerator(workDir, "", cluster, "", "", "", "", nil, logrus.New(), nil, "", "", 5, manifestsAPI, eventsHandler).(*installerGenerator) + g := NewGenerator(workDir, cluster, "", "", "", "", nil, logrus.New(), nil, "", "", manifestsAPI, eventsHandler, installerCache).(*installerGenerator) err := g.createHostIgnitions() Expect(err).NotTo(HaveOccurred()) @@ -638,7 +665,7 @@ var _ = Describe("createHostIgnitions", func() { host.ID = &id } - g := NewGenerator(workDir, "", cluster, "", "", "", "", nil, logrus.New(), nil, "", "", 5, manifestsAPI, eventsHandler).(*installerGenerator) + g := NewGenerator(workDir, cluster, "", "", "", "", nil, logrus.New(), nil, "", "", manifestsAPI, eventsHandler, installerCache).(*installerGenerator) g.nodeIpAllocations = make(map[strfmt.UUID]*network.NodeIpAllocation) for i, h := range cluster.Hosts { g.nodeIpAllocations[*h.ID] = &network.NodeIpAllocation{ @@ -694,7 +721,7 @@ var _ = Describe("createHostIgnitions", func() { host.ID = &id } - g := NewGenerator(workDir, "", cluster, "", "", "", "", nil, logrus.New(), nil, "", "", 5, manifestsAPI, eventsHandler).(*installerGenerator) + g := NewGenerator(workDir, cluster, "", "", "", "", nil, logrus.New(), nil, "", "", manifestsAPI, eventsHandler, installerCache).(*installerGenerator) g.nodeIpAllocations = make(map[strfmt.UUID]*network.NodeIpAllocation) for i, h := range cluster.Hosts { @@ -754,7 +781,7 @@ var _ = Describe("createHostIgnitions", func() { host.ID = &id } - g := NewGenerator(workDir, "", cluster, "", "", "", "", nil, logrus.New(), nil, "", "", 5, manifestsAPI, eventsHandler).(*installerGenerator) + g := NewGenerator(workDir, cluster, "", "", "", "", nil, logrus.New(), nil, "", "", manifestsAPI, eventsHandler, installerCache).(*installerGenerator) err := g.createHostIgnitions() Expect(err).NotTo(HaveOccurred()) @@ -787,7 +814,7 @@ var _ = Describe("createHostIgnitions", func() { IgnitionConfigOverrides: `{"ignition": {"version": "3.2.0"}, "storage": {"files": [{"path": "/tmp/example", "contents": {"source": "data:text/plain;base64,aGVscGltdHJhcHBlZGluYXN3YWdnZXJzcGVj"}}]}}`, }} - g := NewGenerator(workDir, "", cluster, "", "", "", "", nil, logrus.New(), nil, "", "", 5, manifestsAPI, eventsHandler).(*installerGenerator) + g := NewGenerator(workDir, cluster, "", "", "", "", nil, logrus.New(), nil, "", "", manifestsAPI, eventsHandler, installerCache).(*installerGenerator) err := g.createHostIgnitions() Expect(err).NotTo(HaveOccurred()) @@ -858,7 +885,7 @@ spec: MachineConfigPoolName: "infra", }} - g := NewGenerator(workDir, "", cluster, "", "", "", "", mockS3Client, logrus.New(), nil, "", "", 5, manifestsAPI, eventsHandler).(*installerGenerator) + g := NewGenerator(workDir, cluster, "", "", "", "", mockS3Client, logrus.New(), nil, "", "", manifestsAPI, eventsHandler, installerCache).(*installerGenerator) mockS3Client.EXPECT().ListObjectsByPrefixWithMetadata(gomock.Any(), filepath.Join(clusterID.String(), constants.ManifestFolder, models.ManifestFolderOpenshift)).Return([]s3wrapper.ObjectInfo{{Path: "mcp.yaml"}}, nil).Times(1) mockS3Client.EXPECT().ListObjectsByPrefixWithMetadata(gomock.Any(), filepath.Join(clusterID.String(), constants.ManifestFolder, models.ManifestFolderManifests)).Times(1) mockS3Client.EXPECT().Download(gomock.Any(), gomock.Any()).Return(io.NopCloser(strings.NewReader(mcp)), int64(0), nil) @@ -884,7 +911,7 @@ spec: MachineConfigPoolName: "infra", }} - g := NewGenerator(workDir, "", cluster, "", "", "", "", mockS3Client, logrus.New(), nil, "", "", 5, manifestsAPI, eventsHandler).(*installerGenerator) + g := NewGenerator(workDir, cluster, "", "", "", "", mockS3Client, logrus.New(), nil, "", "", manifestsAPI, eventsHandler, installerCache).(*installerGenerator) mockS3Client.EXPECT().ListObjectsByPrefixWithMetadata(gomock.Any(), filepath.Join(clusterID.String(), constants.ManifestFolder, models.ManifestFolderOpenshift)).Return([]s3wrapper.ObjectInfo{{Path: "mcp.yaml"}}, nil).Times(1) mockS3Client.EXPECT().ListObjectsByPrefixWithMetadata(gomock.Any(), filepath.Join(clusterID.String(), constants.ManifestFolder, models.ManifestFolderManifests)).Times(1) mockS3Client.EXPECT().Download(gomock.Any(), gomock.Any()).Return(io.NopCloser(strings.NewReader(mc)), int64(0), nil) @@ -1709,10 +1736,11 @@ var _ = Describe("Set kubelet node ip", func() { var _ = Describe("Bare metal host generation", func() { var ( - workDir string - ctrl *gomock.Controller - manifestsAPI *manifestsapi.MockManifestsAPI - eventsHandler eventsapi.Handler + workDir string + ctrl *gomock.Controller + manifestsAPI *manifestsapi.MockManifestsAPI + eventsHandler eventsapi.Handler + installerCache *installercache.Installers ) BeforeEach(func() { @@ -1722,6 +1750,14 @@ var _ = Describe("Bare metal host generation", func() { ctrl = gomock.NewController(GinkgoT()) manifestsAPI = manifestsapi.NewMockManifestsAPI(ctrl) eventsHandler = eventsapi.NewMockHandler(ctrl) + installerCacheConfig := installercache.InstallerCacheConfig{ + CacheDir: filepath.Join(workDir, "some-dir", "installercache"), + MaxCapacity: int64(5), + MaxReleaseSize: int64(5), + ReleaseFetchRetryInterval: 1 * time.Millisecond, + } + installerCache, err = installercache.New(installerCacheConfig, eventsHandler, metrics.NewOSDiskStatsHelper(logrus.New()), logrus.New()) + Expect(err).NotTo(HaveOccurred()) }) AfterEach(func() { @@ -1734,7 +1770,6 @@ var _ = Describe("Bare metal host generation", func() { // Create the generator: generator := NewGenerator( workDir, - "", testCluster(), "", "", @@ -1745,9 +1780,9 @@ var _ = Describe("Bare metal host generation", func() { nil, "", "", - 5, manifestsAPI, eventsHandler, + installerCache, ).(*installerGenerator) // The default host inventory used by these tests has two NICs, each with @@ -1807,14 +1842,15 @@ var _ = Describe("Bare metal host generation", func() { var _ = Describe("Import Cluster TLS Certs for ephemeral installer", func() { var ( - certDir string - dbName string - db *gorm.DB - cluster *common.Cluster - workDir string - ctrl *gomock.Controller - manifestsAPI *manifestsapi.MockManifestsAPI - eventsHandler eventsapi.Handler + certDir string + dbName string + db *gorm.DB + cluster *common.Cluster + workDir string + ctrl *gomock.Controller + manifestsAPI *manifestsapi.MockManifestsAPI + eventsHandler eventsapi.Handler + installerCache *installercache.Installers ) certFiles := []string{"test-cert.crt", "test-cert.key"} @@ -1845,6 +1881,14 @@ var _ = Describe("Import Cluster TLS Certs for ephemeral installer", func() { ctrl = gomock.NewController(GinkgoT()) manifestsAPI = manifestsapi.NewMockManifestsAPI(ctrl) eventsHandler = eventsapi.NewMockHandler(ctrl) + installerCacheConfig := installercache.InstallerCacheConfig{ + CacheDir: filepath.Join(workDir, "some-dir", "installercache"), + MaxCapacity: int64(5), + MaxReleaseSize: int64(5), + ReleaseFetchRetryInterval: 1 * time.Millisecond, + } + installerCache, err = installercache.New(installerCacheConfig, eventsHandler, metrics.NewOSDiskStatsHelper(logrus.New()), logrus.New()) + Expect(err).NotTo(HaveOccurred()) }) AfterEach(func() { @@ -1853,7 +1897,7 @@ var _ = Describe("Import Cluster TLS Certs for ephemeral installer", func() { }) It("copies the tls cert files", func() { - g := NewGenerator(workDir, "", cluster, "", "", "", "", nil, logrus.New(), nil, "", certDir, 5, manifestsAPI, eventsHandler).(*installerGenerator) + g := NewGenerator(workDir, cluster, "", "", "", "", nil, logrus.New(), nil, "", certDir, manifestsAPI, eventsHandler, installerCache).(*installerGenerator) err := g.importClusterTLSCerts(context.Background()) Expect(err).NotTo(HaveOccurred()) diff --git a/internal/installercache/installercache.go b/internal/installercache/installercache.go index 50f1018d41a..2cc36939777 100644 --- a/internal/installercache/installercache.go +++ b/internal/installercache/installercache.go @@ -8,10 +8,13 @@ import ( "path/filepath" "strings" "sync" + "syscall" "time" "github.com/go-openapi/strfmt" + "github.com/google/uuid" eventsapi "github.com/openshift/assisted-service/internal/events/api" + "github.com/openshift/assisted-service/internal/metrics" "github.com/openshift/assisted-service/internal/oc" "github.com/openshift/assisted-service/models" "github.com/pkg/errors" @@ -19,8 +22,7 @@ import ( ) var ( - DeleteGracePeriod time.Duration = -5 * time.Minute - CacheLimitThreshold = 0.8 + linkPruningGracePeriod time.Duration = 5 * time.Minute ) // Installers implements a thread safe LRU cache for ocp install binaries @@ -29,11 +31,26 @@ var ( type Installers struct { sync.Mutex log logrus.FieldLogger - // total capcity of the allowed storage (in bytes) - storageCapacity int64 // parent directory of the binary cache - cacheDir string - eventsHandler eventsapi.Handler + eventsHandler eventsapi.Handler + diskStatsHelper metrics.DiskStatsHelper + config InstallerCacheConfig +} + +type InstallerCacheConfig struct { + CacheDir string + MaxCapacity int64 + MaxReleaseSize int64 + ReleaseFetchRetryInterval time.Duration + InstallerCacheEvictionThreshold float64 +} + +type errorInsufficientCacheCapacity struct { + Message string +} + +func (e *errorInsufficientCacheCapacity) Error() string { + return e.Message } type fileInfo struct { @@ -43,7 +60,8 @@ type fileInfo struct { func (fi *fileInfo) Compare(other *fileInfo) bool { //oldest file will be first in queue - return fi.info.ModTime().Unix() < other.info.ModTime().Unix() + // Using micoseconds to make sure that the comparison is granular enough + return fi.info.ModTime().UnixMicro() < other.info.ModTime().UnixMicro() } const ( @@ -65,11 +83,9 @@ type Release struct { extractDuration float64 } -// Cleanup is called to signal that the caller has finished using the relase and that resources may be released. +// Cleanup is called to signal that the caller has finished using the release and that resources may be released. func (rl *Release) Cleanup(ctx context.Context) { - if err := os.Remove(rl.Path); err != nil { - logrus.New().WithError(err).Errorf("Failed to delete release link %s", rl.Path) - } + logrus.New().Infof("Cleaning up release %s", rl.Path) rl.eventsHandler.V2AddMetricsEvent( ctx, &rl.clusterID, nil, nil, "", models.EventSeverityInfo, metricEventInstallerCacheRelease, @@ -80,22 +96,54 @@ func (rl *Release) Cleanup(ctx context.Context) { "cached", rl.cached, "extract_duration", rl.extractDuration, ) + if err := os.Remove(rl.Path); err != nil { + logrus.New().WithError(err).Errorf("Failed to delete release link %s", rl.Path) + } } // New constructs an installer cache with a given storage capacity -func New(cacheDir string, storageCapacity int64, eventsHandler eventsapi.Handler, log logrus.FieldLogger) *Installers { +func New(config InstallerCacheConfig, eventsHandler eventsapi.Handler, diskStatsHelper metrics.DiskStatsHelper, log logrus.FieldLogger) (*Installers, error) { + if config.InstallerCacheEvictionThreshold == 0 { + return nil, errors.New("config.InstallerCacheEvictionThreshold must not be zero") + } + if config.MaxCapacity > 0 && config.MaxReleaseSize == 0 { + return nil, fmt.Errorf("config.MaxReleaseSize (%d bytes) must not be zero", config.MaxReleaseSize) + } + if config.MaxCapacity > 0 && config.MaxReleaseSize > config.MaxCapacity { + return nil, fmt.Errorf("config.MaxReleaseSize (%d bytes) must not be greater than config.MaxCapacity (%d bytes)", config.MaxReleaseSize, config.MaxCapacity) + } return &Installers{ log: log, - storageCapacity: storageCapacity, - cacheDir: cacheDir, eventsHandler: eventsHandler, - } + diskStatsHelper: diskStatsHelper, + config: config, + }, nil } // Get returns the path to an openshift-baremetal-install binary extracted from // the referenced release image. Tries the mirror release image first if it's set. It is safe for concurrent use. A cache of // binaries is maintained to reduce re-downloading of the same release. func (i *Installers) Get(ctx context.Context, releaseID, releaseIDMirror, pullSecret string, ocRelease oc.Release, ocpVersion string, clusterID strfmt.UUID) (*Release, error) { + for { + select { + case <-ctx.Done(): + return nil, errors.Errorf("context cancelled or timed out while fetching release %s", releaseID) + default: + release, err := i.get(releaseID, releaseIDMirror, pullSecret, ocRelease, ocpVersion, clusterID) + if err == nil { + return release, nil + } + _, isCapacityError := err.(*errorInsufficientCacheCapacity) + if !isCapacityError { + return nil, errors.Wrapf(err, "failed to get installer path for release %s", releaseID) + } + i.log.WithError(err).Errorf("insufficient installer cache capacity for release %s", releaseID) + time.Sleep(i.config.ReleaseFetchRetryInterval) + } + } +} + +func (i *Installers) get(releaseID, releaseIDMirror, pullSecret string, ocRelease oc.Release, ocpVersion string, clusterID strfmt.UUID) (*Release, error) { i.Lock() defer i.Unlock() @@ -106,43 +154,108 @@ func (i *Installers) Get(ctx context.Context, releaseID, releaseIDMirror, pullSe startTime: time.Now(), } - var workdir, binary, path string - var err error - - workdir, binary, path, err = ocRelease.GetReleaseBinaryPath(releaseID, i.cacheDir, ocpVersion) + workdir, binary, path, err := ocRelease.GetReleaseBinaryPath(releaseID, i.config.CacheDir, ocpVersion) if err != nil { return nil, err } if _, err = os.Stat(path); os.IsNotExist(err) { + i.log.Infof("release %s - not found in cache - attempting extraction", releaseID) + // Cache eviction and space checks + evictionAlreadyPerformed := false + for { + if i.config.MaxCapacity == 0 { + i.log.Info("cache eviction is disabled -- moving directly to extraction") + break + } + i.log.Infof("checking for space to extract release %s", releaseID) + // Determine actual space usage, accounting for hardlinks. + var usedBytes uint64 + usedBytes, _, err = i.diskStatsHelper.GetDiskUsage(i.config.CacheDir) + if err != nil { + if os.IsNotExist(err) { + // Installer cache directory will not exist prior to first extraction + i.log.WithError(err).Warnf("skipping capacity check as first extraction will trigger creation of directory") + break + } + return nil, errors.Wrapf(err, "could not determine disk usage information for cache dir %s", i.config.CacheDir) + } + shouldEvict, isBlocked := i.checkEvictionStatus(int64(usedBytes)) + // If we have already been around once, we don't want to 'double' evict + if shouldEvict && !evictionAlreadyPerformed { + i.evict() + } + if !isBlocked { + break + } + if evictionAlreadyPerformed { + // We still don't have enough capacity after eviction, so we need to exit and retry + return nil, &errorInsufficientCacheCapacity{ + Message: fmt.Sprintf("insufficient capacity in %s to store release", i.config.CacheDir), + } + } + evictionAlreadyPerformed = true + } + i.log.Infof("space available for release %s -- starting extraction", releaseID) extractStartTime := time.Now() - //evict older files if necessary - i.evict() - - //extract the binary - _, err = ocRelease.Extract(i.log, releaseID, releaseIDMirror, i.cacheDir, pullSecret, ocpVersion) + _, err = ocRelease.Extract(i.log, releaseID, releaseIDMirror, i.config.CacheDir, pullSecret, ocpVersion) if err != nil { return nil, err } release.extractDuration = time.Since(extractStartTime).Seconds() + i.log.Infof("finished extraction of %s in %d seconds", releaseID, release.extractDuration) } else { + i.log.Infof("fetching release %s from cache", releaseID) release.cached = true - //update the file mtime to signal it was recently used - err = os.Chtimes(path, time.Now(), time.Now()) - if err != nil { - return nil, errors.Wrap(err, fmt.Sprintf("Failed to update release binary %s", path)) - } + + } + + // update the file mtime to signal it was recently used + // do this, no matter where we sourced the binary (cache or otherwise) as it would appear that the binary + // can have a modtime - upon extraction - that is significantly in the past + // this can lead to premature link pruning in some scenarios. + err = os.Chtimes(path, time.Now(), time.Now()) + if err != nil { + return nil, errors.Wrap(err, fmt.Sprintf("failed to update release binary %s", path)) } + // return a new hard link to the binary file // the caller should delete the hard link when // it finishes working with the file - link := filepath.Join(workdir, "ln_"+fmt.Sprint(time.Now().Unix())+ - "_"+binary) - err = os.Link(path, link) - if err != nil { - return nil, errors.Wrap(err, fmt.Sprintf("Failed to create hard link to binary %s", path)) + i.log.Infof("attempting hardlink creation for release %s", releaseID) + for { + link := filepath.Join(workdir, fmt.Sprintf("ln_%s_%s", uuid.NewString(), binary)) + err = os.Link(path, link) + if err == nil { + i.log.Infof("created hardlink %s to release %s at path %s", link, releaseID, path) + release.Path = link + return release, nil + } + if !os.IsExist(err) { + return nil, errors.Wrap(err, fmt.Sprintf("failed to create hard link to binary %s", path)) + } + } +} + +func (i *Installers) shouldEvict(totalUsed int64) bool { + shouldEvict, _ := i.checkEvictionStatus(totalUsed) + return shouldEvict +} + +func (i *Installers) checkEvictionStatus(totalUsed int64) (shouldEvict bool, isBlocked bool) { + if i.config.MaxCapacity == 0 { + // The cache eviction is completely disabled. + return false, false + } + if i.config.MaxCapacity-totalUsed < i.config.MaxReleaseSize { + // We are badly blocked, not enough room for even one release more. + return true, true } - release.Path = link - return release, nil + if (float64(totalUsed) / float64(i.config.MaxCapacity)) >= i.config.InstallerCacheEvictionThreshold { + // We need to evict some items in order to keep cache efficient and within capacity. + return true, false + } + // We have enough space. + return false, false } // Walk through the cacheDir and list the files recursively. @@ -151,11 +264,6 @@ func (i *Installers) Get(ctx context.Context, releaseID, releaseIDMirror, pullSe // // Locking must be done outside evict() to avoid contentions. func (i *Installers) evict() { - //if cache limit is undefined skip eviction - if i.storageCapacity == 0 { - return - } - // store the file paths files := NewPriorityQueue(&fileInfo{}) links := make([]*fileInfo, 0) @@ -183,33 +291,34 @@ func (i *Installers) evict() { return nil } - err := filepath.Walk(i.cacheDir, visit) + err := filepath.Walk(i.config.CacheDir, visit) if err != nil { if !os.IsNotExist(err) { //ignore first invocation where the cacheDir does not exist - i.log.WithError(err).Errorf("release binary eviction failed to inspect directory %s", i.cacheDir) + i.log.WithError(err).Errorf("release binary eviction failed to inspect directory %s", i.config.CacheDir) } return } - //prune the hard links just in case the deletion of resources - //in ignition.go did not succeeded as expected - for idx := 0; idx < len(links); idx++ { - finfo := links[idx] - //Allow a grace period of 5 minutes from the link creation time - //to ensure the link is not being used. - grace := time.Now().Add(DeleteGracePeriod).Unix() - if finfo.info.ModTime().Unix() < grace { - os.Remove(finfo.path) - } - } + // TODO: We might want to consider if we need to do this longer term, in theory every hardlink should be automatically freed. + // For now, moved to a function so that this can be tested in unit tests. + i.pruneExpiredHardLinks(links, linkPruningGracePeriod) - //delete the oldest file if necessary - for totalSize >= int64(float64(i.storageCapacity)*CacheLimitThreshold) { - finfo, _ := files.Pop() - totalSize -= finfo.info.Size() - //remove the file - if err := i.evictFile(finfo.path); err != nil { - i.log.WithError(err).Errorf("failed to evict file %s", finfo.path) + // delete the oldest file if necessary + // prefer files without hardlinks first because + // 1: files without hardlinks are likely to be least recently used anyway + // 2: files without hardlinks will immediately free up storage + queues := i.splitQueueOnHardLinks(files) + for _, q := range queues { + for i.shouldEvict(totalSize) { + if q.Len() == 0 { // If we have cleaned out this queue then break the loop + break + } + finfo, _ := q.Pop() + totalSize -= finfo.info.Size() + //remove the file + if err := i.evictFile(finfo.path); err != nil { + i.log.WithError(err).Errorf("failed to evict file %s", finfo.path) + } } } } @@ -232,3 +341,47 @@ func (i *Installers) evictFile(filePath string) error { } return nil } + +// pruneExpiredHardLinks removes any hardlinks that have been around for too long +// the grace period is used to determine which links should be removed. +func (i *Installers) pruneExpiredHardLinks(links []*fileInfo, gracePeriod time.Duration) { + for idx := 0; idx < len(links); idx++ { + finfo := links[idx] + graceTime := time.Now().Add(-1 * gracePeriod) + grace := graceTime.Unix() + if finfo.info.ModTime().Unix() < grace { + i.log.Infof("Found expired hardlink -- pruning %s", finfo.info.Name()) + i.log.Infof("Mod time %s", finfo.info.ModTime().Format("2006-01-02 15:04:05")) + i.log.Infof("Grace time %s", graceTime.Format("2006-01-02 15:04:05")) + os.Remove(finfo.path) + } + } +} + +// splitQueueOnHardLinks Splits the provided *PriorityQueue[*fileInfo] into two queues +// withoutHardLinks will present a queue of files that do not have associated hardlinks +// withHardLinks will present a queue of files that do have associated hardlinks +// This is to allow us to prioritize deletion, favouring files without hardlinks first as these will have an immediate impact on storage. +func (i *Installers) splitQueueOnHardLinks(in *PriorityQueue[*fileInfo]) []*PriorityQueue[*fileInfo] { + withoutHardLinks := &PriorityQueue[*fileInfo]{} + withHardLinks := &PriorityQueue[*fileInfo]{} + for { + if in.Len() == 0 { + break + } + fi, _ := in.Pop() + stat, ok := fi.info.Sys().(*syscall.Stat_t) + if !ok { + // If we do encounter an error while performing stat - let's fall back to the original queue + // it's not optimal, but we don't break anything. + i.log.Errorf("encountered error while trying to split queues - using original queue") + return []*PriorityQueue[*fileInfo]{in} + } + if stat.Nlink == 0 { + withoutHardLinks.Add(fi) + continue + } + withHardLinks.Add(fi) + } + return []*PriorityQueue[*fileInfo]{withoutHardLinks, withHardLinks} +} diff --git a/internal/installercache/installercache_test.go b/internal/installercache/installercache_test.go index a1d8744911a..00f6718c326 100644 --- a/internal/installercache/installercache_test.go +++ b/internal/installercache/installercache_test.go @@ -2,8 +2,13 @@ package installercache import ( "context" + "fmt" + "io/fs" "os" "path/filepath" + "strings" + "sync" + "syscall" "testing" "time" @@ -13,6 +18,7 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" eventsapi "github.com/openshift/assisted-service/internal/events/api" + "github.com/openshift/assisted-service/internal/metrics" "github.com/openshift/assisted-service/internal/oc" "github.com/openshift/assisted-service/models" "github.com/sirupsen/logrus" @@ -59,27 +65,35 @@ var _ = Describe("release event", func() { var _ = Describe("installer cache", func() { var ( - ctrl *gomock.Controller - mockRelease *oc.MockRelease - manager *Installers - cacheDir string - eventsHandler *eventsapi.MockHandler - ctx context.Context + ctrl *gomock.Controller + mockRelease *oc.MockRelease + manager *Installers + cacheDir string + eventsHandler *eventsapi.MockHandler + ctx context.Context + diskStatsHelper metrics.DiskStatsHelper ) BeforeEach(func() { - DeleteGracePeriod = 1 * time.Millisecond ctrl = gomock.NewController(GinkgoT()) + diskStatsHelper = metrics.NewOSDiskStatsHelper(logrus.New()) mockRelease = oc.NewMockRelease(ctrl) eventsHandler = eventsapi.NewMockHandler(ctrl) - var err error cacheDir, err = os.MkdirTemp("/tmp", "cacheDir") Expect(err).NotTo(HaveOccurred()) Expect(os.Mkdir(filepath.Join(cacheDir, "quay.io"), 0755)).To(Succeed()) Expect(os.Mkdir(filepath.Join(filepath.Join(cacheDir, "quay.io"), "release-dev"), 0755)).To(Succeed()) - manager = New(cacheDir, 12, eventsHandler, logrus.New()) + installerCacheConfig := InstallerCacheConfig{ + CacheDir: cacheDir, + MaxCapacity: int64(12), + MaxReleaseSize: int64(5), + ReleaseFetchRetryInterval: 1 * time.Millisecond, + InstallerCacheEvictionThreshold: 1, + } + manager, err = New(installerCacheConfig, eventsHandler, diskStatsHelper, logrus.New()) + Expect(err).NotTo(HaveOccurred()) ctx = context.TODO() }) @@ -87,26 +101,38 @@ var _ = Describe("installer cache", func() { os.RemoveAll(cacheDir) }) - expectEventSent := func() { + expectEventsSent := func() { eventsHandler.EXPECT().V2AddMetricsEvent( ctx, gomock.Any(), nil, nil, "", models.EventSeverityInfo, metricEventInstallerCacheRelease, gomock.Any(), gomock.Any(), - ).Times(1) + ).AnyTimes() } - testGet := func(releaseID, version string, clusterID strfmt.UUID, expectCached bool) (string, string) { - workdir := filepath.Join(cacheDir, "quay.io", "release-dev") + mockReleaseCalls := func(releaseID string, version string) { + workdir := filepath.Join(manager.config.CacheDir, "quay.io", "release-dev") fname := filepath.Join(workdir, releaseID) mockRelease.EXPECT().GetReleaseBinaryPath( gomock.Any(), gomock.Any(), version). - Return(workdir, releaseID, fname, nil) + Return(workdir, releaseID, fname, nil).AnyTimes() + mockRelease.EXPECT().Extract(gomock.Any(), releaseID, - gomock.Any(), cacheDir, gomock.Any(), version). + gomock.Any(), manager.config.CacheDir, gomock.Any(), version). DoAndReturn(func(log logrus.FieldLogger, releaseImage string, releaseImageMirror string, cacheDir string, pullSecret string, version string) (string, error) { - err := os.WriteFile(fname, []byte("abcde"), 0600) + dir, _ := filepath.Split(fname) + err := os.MkdirAll(dir, 0700) + Expect(err).ToNot(HaveOccurred()) + time.Sleep(10 * time.Millisecond) // Add a small amount of latency to simulate extraction + err = os.WriteFile(fname, []byte("abcde"), 0600) return "", err - }) + }).AnyTimes() + } + + testGet := func(releaseID, version string, clusterID strfmt.UUID, expectCached bool) (string, string) { + workdir := filepath.Join(manager.config.CacheDir, "quay.io", "release-dev") + fname := filepath.Join(workdir, releaseID) + mockReleaseCalls(releaseID, version) + expectEventsSent() l, err := manager.Get(ctx, releaseID, "mirror", "pull-secret", mockRelease, version, clusterID) Expect(err).ShouldNot(HaveOccurred()) Expect(l.releaseID).To(Equal(releaseID)) @@ -119,19 +145,162 @@ var _ = Describe("installer cache", func() { } Expect(l.Path).ShouldNot(BeEmpty()) - time.Sleep(1 * time.Second) + time.Sleep(10 * time.Millisecond) Expect(l.startTime.Before(time.Now())).To(BeTrue()) + l.Cleanup(context.TODO()) return fname, l.Path } - It("evicts the oldest file", func() { + + type launchParams struct { + releaseID string + version string + clusterID strfmt.UUID + } + + // runParallelTest launches a batch of fetches from the installer cache in order to simulate multiple requests to the same node + // releases are automatically cleaned up as they are gathered + // returns the first error encountered or nil if no error encountered. + runParallelTest := func(params []launchParams) error { + var wg sync.WaitGroup + var firstError error + var errMutex sync.Mutex + for _, param := range params { + mockReleaseCalls(param.releaseID, param.version) + wg.Add(1) + go func(param launchParams) { + defer wg.Done() + release, err := manager.Get(ctx, param.releaseID, "mirror", "pull-secret", mockRelease, param.version, param.clusterID) + if err != nil { + errMutex.Lock() + if firstError == nil { + firstError = err + } + errMutex.Unlock() + return + } + // Simulate calls to release cleanup to ensure that we can test capacity limits + // Add a very small delay so that we can simulate usage of the release prior to release. + if release != nil { + time.Sleep(2 * time.Millisecond) + release.Cleanup(context.TODO()) + } + }(param) + } + wg.Wait() + return firstError + } + + getUsedBytesForDirectory := func(directory string) uint64 { + var totalBytes uint64 + seenInodes := make(map[uint64]bool) + err := filepath.Walk(directory, func(path string, fileInfo os.FileInfo, err error) error { + if err != nil { + if _, ok := err.(*os.PathError); ok { + // Something deleted the file before we could walk it + // count this as zero bytes + return nil + } + return err + } + stat, ok := fileInfo.Sys().(*syscall.Stat_t) + Expect(ok).To(BeTrue()) + if !fileInfo.IsDir() && !seenInodes[stat.Ino] { + totalBytes += uint64(fileInfo.Size()) + seenInodes[stat.Ino] = true + } + return nil + }) + Expect(err).ToNot(HaveOccurred()) + return totalBytes + } + + It("Should raise error on construction if max release size is larger than cache and cache is enabled", func() { + var err error + installerCacheConfig := InstallerCacheConfig{ + CacheDir: cacheDir, + MaxCapacity: int64(5), + MaxReleaseSize: int64(10), + ReleaseFetchRetryInterval: 1 * time.Millisecond, + InstallerCacheEvictionThreshold: 0.8, + } + manager, err = New(installerCacheConfig, eventsHandler, diskStatsHelper, logrus.New()) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(Equal("config.MaxReleaseSize (10 bytes) must not be greater than config.MaxCapacity (5 bytes)")) + }) + + It("Should raise error on construction if max release size is zero and cache is enabled", func() { + var err error + installerCacheConfig := InstallerCacheConfig{ + CacheDir: cacheDir, + MaxCapacity: int64(5), + MaxReleaseSize: int64(0), + ReleaseFetchRetryInterval: 1 * time.Millisecond, + InstallerCacheEvictionThreshold: 0.8, + } + manager, err = New(installerCacheConfig, eventsHandler, diskStatsHelper, logrus.New()) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(Equal("config.MaxReleaseSize (0 bytes) must not be zero")) + }) + + It("Should not raise error on construction if max release size is larger than cache and cache eviction is disabled", func() { + var err error + installerCacheConfig := InstallerCacheConfig{ + CacheDir: cacheDir, + MaxCapacity: int64(0), + MaxReleaseSize: int64(10), + ReleaseFetchRetryInterval: 1 * time.Millisecond, + InstallerCacheEvictionThreshold: 0.8, + } + manager, err = New(installerCacheConfig, eventsHandler, diskStatsHelper, logrus.New()) + Expect(err).ToNot(HaveOccurred()) + }) + + It("Should not raise error on construction if max release size is zero and cache eviction is disabled", func() { + var err error + installerCacheConfig := InstallerCacheConfig{ + CacheDir: cacheDir, + MaxCapacity: int64(0), + MaxReleaseSize: int64(0), + ReleaseFetchRetryInterval: 1 * time.Millisecond, + InstallerCacheEvictionThreshold: 0.8, + } + manager, err = New(installerCacheConfig, eventsHandler, diskStatsHelper, logrus.New()) + Expect(err).ToNot(HaveOccurred()) + }) + + It("Should not accept InstallerCacheEvictionThreshold of zero", func() { + var err error + installerCacheConfig := InstallerCacheConfig{ + CacheDir: cacheDir, + MaxCapacity: int64(0), + MaxReleaseSize: int64(0), + ReleaseFetchRetryInterval: 1 * time.Millisecond, + InstallerCacheEvictionThreshold: 0, + } + manager, err = New(installerCacheConfig, eventsHandler, diskStatsHelper, logrus.New()) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(Equal("config.InstallerCacheEvictionThreshold must not be zero")) + }) + + It("when cache limit is zero - eviction is skipped", func() { + var err error + installerCacheConfig := InstallerCacheConfig{ + CacheDir: cacheDir, + MaxCapacity: int64(0), + MaxReleaseSize: int64(5), + ReleaseFetchRetryInterval: 1 * time.Millisecond, + InstallerCacheEvictionThreshold: 0.8, + } + manager, err = New(installerCacheConfig, eventsHandler, diskStatsHelper, logrus.New()) + Expect(err).ToNot(HaveOccurred()) clusterId := strfmt.UUID(uuid.New().String()) r1, l1 := testGet("4.8", "4.8.0", clusterId, false) r2, l2 := testGet("4.9", "4.9.0", clusterId, false) r3, l3 := testGet("4.10", "4.10.0", clusterId, false) - By("verify that the oldest file was deleted") - _, err := os.Stat(r1) - Expect(os.IsNotExist(err)).To(BeTrue()) + By("verify that the no file was deleted") + _, err = os.Stat(r1) + Expect(os.IsNotExist(err)).To(BeFalse()) _, err = os.Stat(r2) Expect(os.IsNotExist(err)).To(BeFalse()) _, err = os.Stat(r3) @@ -161,23 +330,34 @@ var _ = Describe("installer cache", func() { Expect(os.IsNotExist(err)).To(BeTrue()) _, err = os.Stat(r3) Expect(os.IsNotExist(err)).To(BeFalse()) + // Now measure disk usage, we should be under the cache size + Expect(getUsedBytesForDirectory(manager.config.CacheDir)).To(BeNumerically("<=", manager.config.MaxCapacity)) }) - It("when cache limit is not set eviction is skipped", func() { - manager.storageCapacity = 0 + It("evicts the oldest file", func() { clusterId := strfmt.UUID(uuid.New().String()) - r1, _ := testGet("4.8", "4.8.0", clusterId, false) - r2, _ := testGet("4.9", "4.9.0", clusterId, false) - r3, _ := testGet("4.10", "4.10.0", clusterId, false) + r1, l1 := testGet("4.8", "4.8.0", clusterId, false) + r2, l2 := testGet("4.9", "4.9.0", clusterId, false) + r3, l3 := testGet("4.10", "4.10.0", clusterId, false) - By("verify that the no file was deleted") + By("verify that the oldest file was deleted") _, err := os.Stat(r1) - Expect(os.IsNotExist(err)).To(BeFalse()) + Expect(os.IsNotExist(err)).To(BeTrue()) _, err = os.Stat(r2) Expect(os.IsNotExist(err)).To(BeFalse()) _, err = os.Stat(r3) Expect(os.IsNotExist(err)).To(BeFalse()) + By("verify that the links were purged") + manager.evict() + _, err = os.Stat(l1) + Expect(os.IsNotExist(err)).To(BeTrue()) + _, err = os.Stat(l2) + Expect(os.IsNotExist(err)).To(BeTrue()) + _, err = os.Stat(l3) + Expect(os.IsNotExist(err)).To(BeTrue()) + // Now measure disk usage, we should be under the cache size + Expect(getUsedBytesForDirectory(manager.config.CacheDir)).To(BeNumerically("<=", manager.config.MaxCapacity)) }) It("extracts from the mirror", func() { @@ -185,18 +365,7 @@ var _ = Describe("installer cache", func() { releaseMirrorID := "4.10-mirror" clusterID := strfmt.UUID(uuid.New().String()) version := "4.10.0" - workdir := filepath.Join(cacheDir, "quay.io", "release-dev") - fname := filepath.Join(workdir, releaseID) - - mockRelease.EXPECT().GetReleaseBinaryPath( - releaseID, gomock.Any(), version). - Return(workdir, releaseID, fname, nil) - mockRelease.EXPECT().Extract(gomock.Any(), releaseID, - gomock.Any(), cacheDir, gomock.Any(), version). - DoAndReturn(func(log logrus.FieldLogger, releaseImage string, releaseImageMirror string, cacheDir string, pullSecret string, version string) (string, error) { - err := os.WriteFile(fname, []byte("abcde"), 0600) - return "", err - }) + mockReleaseCalls(releaseID, version) l, err := manager.Get(ctx, releaseID, releaseMirrorID, "pull-secret", mockRelease, version, clusterID) Expect(err).ShouldNot(HaveOccurred()) Expect(l.releaseID).To(Equal(releaseID)) @@ -205,8 +374,10 @@ var _ = Describe("installer cache", func() { Expect(l.cached).To(BeFalse()) Expect(l.extractDuration).ShouldNot(BeZero()) Expect(l.Path).ShouldNot(BeEmpty()) - expectEventSent() + expectEventsSent() l.Cleanup(ctx) + // Now measure disk usage, we should be under the cache size + Expect(getUsedBytesForDirectory(manager.config.CacheDir)).To(BeNumerically("<=", manager.config.MaxCapacity)) }) It("extracts without a mirror", func() { @@ -214,18 +385,7 @@ var _ = Describe("installer cache", func() { releaseMirrorID := "" version := "4.10.0" clusterID := strfmt.UUID(uuid.NewString()) - workdir := filepath.Join(cacheDir, "quay.io", "release-dev") - fname := filepath.Join(workdir, releaseID) - - mockRelease.EXPECT().GetReleaseBinaryPath( - releaseID, gomock.Any(), version). - Return(workdir, releaseID, fname, nil) - mockRelease.EXPECT().Extract(gomock.Any(), releaseID, - gomock.Any(), cacheDir, gomock.Any(), version). - DoAndReturn(func(log logrus.FieldLogger, releaseImage string, releaseImageMirror string, cacheDir string, pullSecret string, version string) (string, error) { - err := os.WriteFile(fname, []byte("abcde"), 0600) - return "", err - }) + mockReleaseCalls(releaseID, version) l, err := manager.Get(ctx, releaseID, releaseMirrorID, "pull-secret", mockRelease, version, clusterID) Expect(err).ShouldNot(HaveOccurred()) Expect(l.releaseID).To(Equal(releaseID)) @@ -234,8 +394,156 @@ var _ = Describe("installer cache", func() { Expect(l.cached).To(BeFalse()) Expect(l.extractDuration).ShouldNot(BeZero()) Expect(l.Path).ShouldNot(BeEmpty()) - expectEventSent() + expectEventsSent() l.Cleanup(ctx) + // Now measure disk usage, we should be under the cache size + Expect(getUsedBytesForDirectory(manager.config.CacheDir)).To(BeNumerically("<=", manager.config.MaxCapacity)) + }) + + It("should correctly handle multiple requests for the same release at the same time", func() { + var err error + installerCacheConfig := InstallerCacheConfig{ + CacheDir: cacheDir, + MaxCapacity: int64(10), + MaxReleaseSize: int64(5), + ReleaseFetchRetryInterval: 1 * time.Millisecond, + InstallerCacheEvictionThreshold: 0.8, + } + manager, err = New(installerCacheConfig, eventsHandler, diskStatsHelper, logrus.New()) + Expect(err).ToNot(HaveOccurred()) + params := []launchParams{} + for i := 0; i < 10; i++ { + params = append(params, launchParams{releaseID: "4.17.11-x86_64", version: "4.17.11", clusterID: strfmt.UUID(uuid.NewString())}) + } + expectEventsSent() + err = runParallelTest(params) + Expect(err).ToNot(HaveOccurred()) + // Now measure disk usage, we should be under the cache size + Expect(getUsedBytesForDirectory(manager.config.CacheDir)).To(BeNumerically("<=", manager.config.MaxCapacity)) + }) + + It("should consistently handle multiple requests for different releases at the same time", func() { + var err error + installerCacheConfig := InstallerCacheConfig{ + CacheDir: cacheDir, + MaxCapacity: int64(25), + MaxReleaseSize: int64(5), + ReleaseFetchRetryInterval: 1 * time.Millisecond, + InstallerCacheEvictionThreshold: 0.8, + } + manager, err = New(installerCacheConfig, eventsHandler, diskStatsHelper, logrus.New()) + Expect(err).ToNot(HaveOccurred()) + for i := 0; i < 10; i++ { + params := []launchParams{} + params = append(params, launchParams{releaseID: "4.17.11-x86_64", version: "4.17.11", clusterID: strfmt.UUID(uuid.NewString())}) + params = append(params, launchParams{releaseID: "4.18.11-x86_64", version: "4.18.11", clusterID: strfmt.UUID(uuid.NewString())}) + params = append(params, launchParams{releaseID: "4.19.11-x86_64", version: "4.19.11", clusterID: strfmt.UUID(uuid.NewString())}) + params = append(params, launchParams{releaseID: "4.20.11-x86_64", version: "4.20.11", clusterID: strfmt.UUID(uuid.NewString())}) + params = append(params, launchParams{releaseID: "4.21.11-x86_64", version: "4.21.11", clusterID: strfmt.UUID(uuid.NewString())}) + expectEventsSent() + err = runParallelTest(params) + Expect(err).ToNot(HaveOccurred()) + } + // Now measure disk usage, we should be under the cache size + Expect(getUsedBytesForDirectory(manager.config.CacheDir)).To(BeNumerically("<=", manager.config.MaxCapacity)) + }) + + It("should maintain cache within threshold", func() { + var err error + installerCacheConfig := InstallerCacheConfig{ + CacheDir: cacheDir, + MaxCapacity: int64(500), + MaxReleaseSize: int64(5), + ReleaseFetchRetryInterval: 1 * time.Millisecond, + InstallerCacheEvictionThreshold: 0.8, + } + manager, err = New(installerCacheConfig, eventsHandler, diskStatsHelper, logrus.New()) + Expect(err).ToNot(HaveOccurred()) + for i := 0; i < 85; i++ { // Deliberately generate a number of requests that will be above the percentage + params := []launchParams{} + params = append(params, launchParams{releaseID: fmt.Sprintf("release-%d", i), version: fmt.Sprintf("release-%d.0.1", i), clusterID: strfmt.UUID(uuid.NewString())}) + expectEventsSent() + err = runParallelTest(params) + Expect(err).ToNot(HaveOccurred()) + } + // Ensure that we hold within the correct percentage of the cache size + Expect(getUsedBytesForDirectory(manager.config.CacheDir)).To(BeNumerically("<=", float64(manager.config.MaxCapacity)*installerCacheConfig.InstallerCacheEvictionThreshold)) + }) + + It("should stay within the cache limit where there is only sufficient space for one release", func() { + var err error + installerCacheConfig := InstallerCacheConfig{ + CacheDir: cacheDir, + MaxCapacity: int64(5), + MaxReleaseSize: int64(5), + ReleaseFetchRetryInterval: 1 * time.Millisecond, + InstallerCacheEvictionThreshold: 0.8, + } + manager, err = New(installerCacheConfig, eventsHandler, diskStatsHelper, logrus.New()) + Expect(err).ToNot(HaveOccurred()) + expectEventsSent() + // Force a scenario where one of the requests will fail because of a pending release cleanup + params := []launchParams{} + params = append(params, launchParams{releaseID: "4.17.11-x86_64", version: "4.17.11", clusterID: strfmt.UUID(uuid.NewString())}) + params = append(params, launchParams{releaseID: "4.18.11-x86_64", version: "4.18.11", clusterID: strfmt.UUID(uuid.NewString())}) + err = runParallelTest(params) + Expect(err).ToNot(HaveOccurred()) + // Now measure disk usage, we should be under the cache size + Expect(getUsedBytesForDirectory(manager.config.CacheDir)).To(BeNumerically("<=", manager.config.MaxCapacity)) + + // After we have collected all results from the last parallel test, the cleanup should have occurred as part of the test. + // Now assert that a retry would work, there should be enough space for another release + // use a brand new release ID to prove we are not hitting cache here. + params = []launchParams{} + params = append(params, launchParams{releaseID: "4.19.11-x86_64", version: "4.19.11", clusterID: strfmt.UUID(uuid.NewString())}) + err = runParallelTest(params) + Expect(err).ToNot(HaveOccurred()) + }) + + It("should remove expired links while leaving non expired links intact", func() { + + numberOfLinks := 10 + numberOfExpiredLinks := 5 + + directory, err := os.MkdirTemp("", "testPruneExpiredHardLinks") + Expect(err).ToNot(HaveOccurred()) + + defer os.RemoveAll(directory) + + for i := 0; i < numberOfLinks; i++ { + var someFile *os.File + someFile, err = os.CreateTemp(directory, "somefile") + Expect(err).ToNot(HaveOccurred()) + linkPath := filepath.Join(directory, fmt.Sprintf("ln_%s", uuid.NewString())) + err = os.Link(someFile.Name(), linkPath) + Expect(err).ToNot(HaveOccurred()) + if i > numberOfExpiredLinks-1 { + err = os.Chtimes(linkPath, time.Now().Add(-10*time.Minute), time.Now().Add(-10*time.Minute)) + Expect(err).ToNot(HaveOccurred()) + } + } + + links := make([]*fileInfo, 0) + err = filepath.Walk(directory, func(path string, info fs.FileInfo, err error) error { + if strings.HasPrefix(info.Name(), "ln_") { + links = append(links, &fileInfo{path, info}) + } + return nil + }) + Expect(err).ToNot(HaveOccurred()) + Expect(len(links)).To(Equal(10)) + + manager.pruneExpiredHardLinks(links, linkPruningGracePeriod) + + linkCount := 0 + err = filepath.Walk(directory, func(path string, info fs.FileInfo, err error) error { + if strings.HasPrefix(info.Name(), "ln_") { + linkCount++ + } + return nil + }) + Expect(err).ToNot(HaveOccurred()) + Expect(linkCount).To(Equal(numberOfLinks - numberOfExpiredLinks)) }) }) diff --git a/internal/metrics/disk_stats_helper.go b/internal/metrics/disk_stats_helper.go index 148a6bc65ce..bb3c9223407 100644 --- a/internal/metrics/disk_stats_helper.go +++ b/internal/metrics/disk_stats_helper.go @@ -7,6 +7,7 @@ import ( "syscall" "github.com/pkg/errors" + "github.com/sirupsen/logrus" "golang.org/x/sys/unix" ) @@ -15,11 +16,12 @@ type DiskStatsHelper interface { } type OSDiskStatsHelper struct { + log *logrus.Logger } //go:generate mockgen -source=disk_stats_helper.go -package=metrics -destination=mock_disk_stats_helper.go -func NewOSDiskStatsHelper() *OSDiskStatsHelper { - return &OSDiskStatsHelper{} +func NewOSDiskStatsHelper(log *logrus.Logger) *OSDiskStatsHelper { + return &OSDiskStatsHelper{log: log} } func (c *OSDiskStatsHelper) getUsedBytesInDirectory(directory string) (uint64, error) { @@ -28,6 +30,14 @@ func (c *OSDiskStatsHelper) getUsedBytesInDirectory(directory string) (uint64, e seenInodes := make(map[uint64]bool) err := filepath.Walk(directory, func(path string, fileInfo os.FileInfo, err error) error { if err != nil { + if _, ok := err.(*os.PathError); ok { + // it's possible to encounter a path error if a file has been deleted since we obtained the file list + // in cases where deletion occurs outside of Mutex protection, such as when we clean up an installercache.Release, this can be expected + // we can safely count this as 'zero sized' in these cases + c.log.WithError(err).Warnf("could not stat file %s because it was deleted before we could walk it - assuming zero size", path) + // return nil to ignore the error. + return nil + } return err } // We need to ensure that the size check is based on inodes and not just the sizes gleaned from files. diff --git a/internal/metrics/metricsManager_test.go b/internal/metrics/metricsManager_test.go index 97370938271..085cf2df932 100644 --- a/internal/metrics/metricsManager_test.go +++ b/internal/metrics/metricsManager_test.go @@ -47,7 +47,7 @@ var _ = DescribeTable( metricsManagerConfig := &MetricsManagerConfig{ DirectoryUsageMonitorConfig: DirectoryUsageMonitorConfig{ Directories: []string{"/data"}}} - manager := NewMetricsManager(server.Registry(), handler, NewOSDiskStatsHelper(), metricsManagerConfig, logrus.New()) + manager := NewMetricsManager(server.Registry(), handler, NewOSDiskStatsHelper(logrus.New()), metricsManagerConfig, logrus.New()) manager.ReportHostInstallationMetrics( ctx, "4.10.18", diff --git a/internal/metrics/os_disk_stats_helper_test.go b/internal/metrics/os_disk_stats_helper_test.go index 69ab42f6fb4..823c45551c0 100644 --- a/internal/metrics/os_disk_stats_helper_test.go +++ b/internal/metrics/os_disk_stats_helper_test.go @@ -7,13 +7,14 @@ import ( "github.com/google/uuid" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + "github.com/sirupsen/logrus" ) var _ = Describe("OS disk stats helper", func() { var ( tempDir string - diskStatsHelper *OSDiskStatsHelper = NewOSDiskStatsHelper() + diskStatsHelper *OSDiskStatsHelper = NewOSDiskStatsHelper(logrus.New()) ) BeforeEach(func() { diff --git a/pkg/generator/generator.go b/pkg/generator/generator.go index 49b963dbe19..a08c2fb89f3 100644 --- a/pkg/generator/generator.go +++ b/pkg/generator/generator.go @@ -8,6 +8,7 @@ import ( "github.com/openshift/assisted-service/internal/common" eventsapi "github.com/openshift/assisted-service/internal/events/api" "github.com/openshift/assisted-service/internal/ignition" + "github.com/openshift/assisted-service/internal/installercache" manifestsapi "github.com/openshift/assisted-service/internal/manifests/api" "github.com/openshift/assisted-service/internal/provider/registry" logutil "github.com/openshift/assisted-service/pkg/log" @@ -21,11 +22,10 @@ type InstallConfigGenerator interface { } type Config struct { - ServiceCACertPath string `envconfig:"SERVICE_CA_CERT_PATH" default:""` - ReleaseImageMirror string `envconfig:"OPENSHIFT_INSTALL_RELEASE_IMAGE_MIRROR" default:""` - DummyIgnition bool `envconfig:"DUMMY_IGNITION"` - InstallInvoker string `envconfig:"INSTALL_INVOKER" default:"assisted-installer"` - InstallerCacheCapacity int64 `envconfig:"INSTALLER_CACHE_CAPACITY"` + ServiceCACertPath string `envconfig:"SERVICE_CA_CERT_PATH" default:""` + ReleaseImageMirror string `envconfig:"OPENSHIFT_INSTALL_RELEASE_IMAGE_MIRROR" default:""` + DummyIgnition bool `envconfig:"DUMMY_IGNITION"` + InstallInvoker string `envconfig:"INSTALL_INVOKER" default:"assisted-installer"` // Directory containing pre-generated TLS certs/keys for the ephemeral installer ClusterTLSCertOverrideDir string `envconfig:"EPHEMERAL_INSTALLER_CLUSTER_TLS_CERTS_OVERRIDE_DIR" default:""` @@ -39,18 +39,29 @@ type installGenerator struct { providerRegistry registry.ProviderRegistry manifestApi manifestsapi.ManifestsAPI eventsHandler eventsapi.Handler + installerCache *installercache.Installers } -func New(log logrus.FieldLogger, s3Client s3wrapper.API, cfg Config, workDir string, - providerRegistry registry.ProviderRegistry, manifestApi manifestsapi.ManifestsAPI, eventsHandler eventsapi.Handler) *installGenerator { +type InstallGeneratorDirectoryConfig struct { + WorkDir string +} + +// GetWorkingDirectory determines the working directory path for the generator. +func (c *InstallGeneratorDirectoryConfig) GetWorkingDirectory() string { + return filepath.Join(c.WorkDir, "install-config-generate") +} + +func New(directoryConfig InstallGeneratorDirectoryConfig, log logrus.FieldLogger, s3Client s3wrapper.API, cfg Config, + providerRegistry registry.ProviderRegistry, manifestApi manifestsapi.ManifestsAPI, eventsHandler eventsapi.Handler, installerCache *installercache.Installers) *installGenerator { return &installGenerator{ Config: cfg, log: log, s3Client: s3Client, - workDir: filepath.Join(workDir, "install-config-generate"), + workDir: directoryConfig.GetWorkingDirectory(), providerRegistry: providerRegistry, manifestApi: manifestApi, eventsHandler: eventsHandler, + installerCache: installerCache, } } @@ -71,15 +82,13 @@ func (k *installGenerator) GenerateInstallConfig(ctx context.Context, cluster co } }() - installerCacheDir := filepath.Join(k.workDir, "installercache") - // runs openshift-install to generate ignition files, then modifies them as necessary var generator ignition.Generator if k.Config.DummyIgnition { generator = ignition.NewDummyGenerator(clusterWorkDir, &cluster, k.s3Client, log) } else { - generator = ignition.NewGenerator(clusterWorkDir, installerCacheDir, &cluster, releaseImage, k.Config.ReleaseImageMirror, - k.Config.ServiceCACertPath, k.Config.InstallInvoker, k.s3Client, log, k.providerRegistry, installerReleaseImageOverride, k.Config.ClusterTLSCertOverrideDir, k.InstallerCacheCapacity, k.manifestApi, k.eventsHandler) + generator = ignition.NewGenerator(clusterWorkDir, &cluster, releaseImage, k.Config.ReleaseImageMirror, + k.Config.ServiceCACertPath, k.Config.InstallInvoker, k.s3Client, log, k.providerRegistry, installerReleaseImageOverride, k.Config.ClusterTLSCertOverrideDir, k.manifestApi, k.eventsHandler, k.installerCache) } err = generator.Generate(ctx, cfg) if err != nil {