Skip to content

Commit

Permalink
Merge pull request #1895 from tisnik/mutiple-storage-types
Browse files Browse the repository at this point in the history
Mutiple storages in Aggregator
  • Loading branch information
tisnik authored Nov 30, 2023
2 parents 2d58df8 + 181596a commit e8da9d9
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 22 deletions.
45 changes: 32 additions & 13 deletions aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,28 +111,44 @@ func fillInInfoParams(params map[string]string) {

// createStorage function initializes connection to preconfigured storage,
// usually SQLite, PostgreSQL, or AWS RDS.
func createStorage() (storage.OCPRecommendationsStorage, error) {
func createStorage() (storage.OCPRecommendationsStorage, storage.DVORecommendationsStorage, error) {
storageCfg := conf.GetOCPRecommendationsStorageConfiguration()
redisCfg := conf.GetRedisConfiguration()
// fill-in the missing sub-structure to have the whole Storage
// configuration represented as one data structure
storageCfg.RedisConfiguration = redisCfg

var ocpStorage storage.OCPRecommendationsStorage
var dvoStorage storage.DVORecommendationsStorage
var err error

log.Info().Str("type", storageCfg.Type).Msg("Storage type")

// try to initialize connection to storage
dbStorage, err := storage.NewOCPRecommendationsStorage(storageCfg)
if err != nil {
log.Error().Err(err).Msg("storage.New")
return nil, err
backend := conf.GetStorageBackendConfiguration().Use
switch backend {
case types.OCPRecommendationsStorage:
ocpStorage, err = storage.NewOCPRecommendationsStorage(storageCfg)
if err != nil {
log.Error().Err(err).Msg("storage.NewOCPRecommendationsStorage")
return nil, nil, err
}
case types.DVORecommendationsStorage:
dvoStorage, err = storage.NewDVORecommendationsStorage(storageCfg)
if err != nil {
log.Error().Err(err).Msg("storage.NewDVORecommendationsStorage")
return nil, nil, err
}
default:
return nil, nil, fmt.Errorf("Unknown storage backend %s", backend)
}

return dbStorage, nil
return ocpStorage, dvoStorage, nil
}

// closeStorage function closes specified DBStorage with proper error checking
// whether the close operation was successful or not.
func closeStorage(storage storage.OCPRecommendationsStorage) {
func closeStorage(storage storage.Storage) {
err := storage.Close()
if err != nil {
// TODO: error state might be returned from this function
Expand Down Expand Up @@ -174,27 +190,29 @@ func prepareDBMigrations(dbStorage storage.OCPRecommendationsStorage) int {
// prepareDB function opens a connection to database and loads all available
// rule content into it.
func prepareDB() int {
dbStorage, err := createStorage()
// TODO: when migrations for DVO will be available, update the code below

ocpRecommendationsStorage, _, err := createStorage()
if err != nil {
log.Error().Err(err).Msg("Error creating storage")
return ExitStatusPrepareDbError
}
defer closeStorage(dbStorage)
defer closeStorage(ocpRecommendationsStorage)

// Ensure that the DB is at the latest migration version.
if exitCode := prepareDBMigrations(dbStorage); exitCode != ExitStatusOK {
if exitCode := prepareDBMigrations(ocpRecommendationsStorage); exitCode != ExitStatusOK {
return exitCode
}

// Initialize the database.
err = dbStorage.Init()
err = ocpRecommendationsStorage.Init()
if err != nil {
log.Error().Err(err).Msg("DB initialization error")
return ExitStatusPrepareDbError
}

// temporarily print some information from DB because of limited access to DB
dbStorage.PrintRuleDisableDebugInfo()
ocpRecommendationsStorage.PrintRuleDisableDebugInfo()

return ExitStatusOK
}
Expand Down Expand Up @@ -357,7 +375,8 @@ func printEnv() int {
// migrations. Non-OK exit code is returned as the last return value in case
// of an error. Otherwise, database and connection pointers are returned.
func getDBForMigrations() (storage.OCPRecommendationsStorage, *sql.DB, int) {
db, err := createStorage()
// use OCP recommendations storage only, unless migrations will be available for other storage(s) too
db, _, err := createStorage()
if err != nil {
log.Error().Err(err).Msg("Unable to prepare DB for migrations")
return nil, nil, ExitStatusPrepareDbError
Expand Down
4 changes: 2 additions & 2 deletions aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func TestCreateStorage(t *testing.T) {
os.Clearenv()
mustLoadConfiguration("tests/config1")

_, err := main.CreateStorage()
_, _, err := main.CreateStorage()
helpers.FailOnError(t, err)
}

Expand Down Expand Up @@ -116,7 +116,7 @@ func TestCreateStorage_BadDriver(t *testing.T) {
"INSIGHTS_RESULTS_AGGREGATOR__OCP_RECOMMENDATIONS_STORAGE__SQLITE_DATASOURCE": "/non/existing/path",
})

_, err := main.CreateStorage()
_, _, err := main.CreateStorage()
assert.EqualError(t, err, "driver non-existing-driver is not supported")
}

Expand Down
9 changes: 6 additions & 3 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,17 @@ var (
func startConsumer(brokerConf broker.Configuration) error {
defer finishConsumerInstanceInitialization()

dbStorage, err := createStorage()
// right now just the OCP recommendation storage is handled by consumer
ocpRecommendationsStorage, _, err := createStorage()
if err != nil {
return err
}

defer closeStorage(dbStorage)
defer closeStorage(ocpRecommendationsStorage)

consumerInstance, err = consumer.NewOCPRulesConsumer(brokerConf, dbStorage)
// when DVO consumer will be made, it will need to use DVO storage
// (see line that calls createStorage())
consumerInstance, err = consumer.NewOCPRulesConsumer(brokerConf, ocpRecommendationsStorage)
if err != nil {
log.Error().Err(err).Msg("Broker initialization error")
return err
Expand Down
9 changes: 5 additions & 4 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,16 @@ var (
func startServer() error {
defer finishServerInstanceInitialization()

dbStorage, err := createStorage()
// right now, just the OCP recommendations storage are handled properly
ocpRecommendationsStorage, _, err := createStorage()
if err != nil {
return err
}
defer closeStorage(dbStorage)
defer closeStorage(ocpRecommendationsStorage)

serverCfg := conf.GetServerConfiguration()

serverInstance = server.New(serverCfg, dbStorage)
serverInstance = server.New(serverCfg, ocpRecommendationsStorage)

// fill-in additional info used by /info endpoint handler
fillInInfoParams(serverInstance.InfoParams)
Expand All @@ -54,7 +55,7 @@ func startServer() error {
if conf.GetOCPRecommendationsStorageConfiguration().Type == types.SQLStorage {
// migration and DB versioning is now supported for SQL
// databases only
currentVersion, err := migration.GetDBVersion(dbStorage.GetConnection())
currentVersion, err := migration.GetDBVersion(ocpRecommendationsStorage.GetConnection())
if err != nil {
const msg = "Unable to retrieve DB migration version"
log.Error().Err(err).Msg(msg)
Expand Down
28 changes: 28 additions & 0 deletions storage/storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
Copyright © 2023 Red Hat, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
*/

package storage

import "database/sql"

// PostgreSQL database driver
// SQLite database driver

// Storage represents an interface to almost any database or storage system
type Storage interface {
Init() error
Close() error
GetConnection() *sql.DB
}
23 changes: 23 additions & 0 deletions types/storage_backends.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright 2023 Red Hat, Inc
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package types

const (
// OCPRecommendationsStorage represents OCP recommendations database schema
OCPRecommendationsStorage = "ocp_recommendations"

// DVORecommendationsStorage represents DVO recommendations database schema
DVORecommendationsStorage = "dvo_recommendations"
)

0 comments on commit e8da9d9

Please sign in to comment.