Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provider-SQL reduce statement and connection overhead #197

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitmodules
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[submodule "build"]
path = build
url = https://github.com/upbound/build
url = https://github.com/crossplane/build
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ dev: $(KIND) $(KUBECTL)
@$(KIND) create cluster --name=$(PROJECT_NAME)-dev
@$(KUBECTL) cluster-info --context kind-$(PROJECT_NAME)-dev
@$(INFO) Installing Crossplane CRDs
@$(KUBECTL) apply -k https://github.com/crossplane/crossplane//cluster?ref=master
@$(KUBECTL) apply -k https://github.com/crossplane/crossplane//cluster?ref=release-1.16 --server-side
@$(INFO) Installing Provider SQL CRDs
@$(KUBECTL) apply -R -f package/crds
@$(INFO) Starting Provider SQL controllers
Expand Down
2 changes: 2 additions & 0 deletions apis/postgresql/v1alpha1/role_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ type RoleConfigurationParameter struct {

// A RoleObservation represents the observed state of a PostgreSQL role.
type RoleObservation struct {
// ConnectionLimit represents the applied connectionlimit
ConnectionLimit *int32 `json:"connectionLimit,omitempty"`
// PrivilegesAsClauses represents the applied privileges state, taking into account
// any defaults applied by Postgres, and expressed as a list of ROLE PRIVILEGE clauses.
PrivilegesAsClauses []string `json:"privilegesAsClauses,omitempty"`
Expand Down
5 changes: 5 additions & 0 deletions apis/postgresql/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions package/crds/postgresql.sql.crossplane.io_roles.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,10 @@ spec:
type: string
type: object
type: array
connectionLimit:
description: ConnectionLimit represents the applied connectionlimit
format: int32
type: integer
privilegesAsClauses:
description: |-
PrivilegesAsClauses represents the applied privileges state, taking into account
Expand Down
71 changes: 59 additions & 12 deletions pkg/clients/postgresql/postgresql.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"database/sql"
"errors"
"net/url"
"time"

"github.com/crossplane-contrib/provider-sql/pkg/clients/xsql"
"github.com/lib/pq"
Expand All @@ -20,6 +21,8 @@ const (
)

type postgresDB struct {
db *sql.DB
err error
dsn string
endpoint string
port string
Expand All @@ -38,14 +41,46 @@ func New(creds map[string][]byte, database, sslmode string) xsql.DB {
password := string(creds[xpv1.ResourceCredentialsSecretPasswordKey])
dsn := DSN(username, password, endpoint, port, database, sslmode)

db, err := openDB(dsn, true)

return postgresDB{
db: db,
err: err,
dsn: dsn,
endpoint: endpoint,
port: port,
sslmode: sslmode,
}
}

// openDB returns a new database connection
func openDB(dsn string, setLimits bool) (*sql.DB, error) {
db, err := sql.Open("postgres", dsn)
if err != nil {
return nil, err
}

// Since we are now using connection pooling, establish some sensible defaults for connections
// Ideally these parameters would be set in the config section for the provider, but that
// can be deferred to a later time.
if setLimits {
db.SetMaxOpenConns(5)
db.SetMaxIdleConns(2)
db.SetConnMaxIdleTime(2 * time.Minute)
db.SetConnMaxLifetime(10 * time.Minute)
}

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

err = db.PingContext(ctx)
if err != nil {
return nil, err
}

return db, nil
}

// DSN returns the DSN URL
func DSN(username, password, endpoint, port, database, sslmode string) string {
// Use net/url UserPassword to encode the username and password
Expand All @@ -63,20 +98,23 @@ func DSN(username, password, endpoint, port, database, sslmode string) string {
// ExecTx executes an array of queries, committing if all are successful and
// rolling back immediately on failure.
func (c postgresDB) ExecTx(ctx context.Context, ql []xsql.Query) error {
d, err := sql.Open("postgres", c.dsn)
if c.db == nil || c.err != nil {
return c.err
}

err := c.db.PingContext(ctx)
if err != nil {
return err
}

tx, err := d.BeginTx(ctx, nil)
tx, err := c.db.BeginTx(ctx, nil)
if err != nil {
return err
}

// Rollback or Commit based on error state. Defer close in defer to make
// sure the connection is always closed.
defer func() {
defer d.Close() //nolint:errcheck
if err != nil {
tx.Rollback() //nolint:errcheck
return
Expand All @@ -94,37 +132,46 @@ func (c postgresDB) ExecTx(ctx context.Context, ql []xsql.Query) error {

// Exec the supplied query.
func (c postgresDB) Exec(ctx context.Context, q xsql.Query) error {
d, err := sql.Open("postgres", c.dsn)
if c.db == nil || c.err != nil {
return c.err
}

err := c.db.PingContext(ctx)
if err != nil {
return err
}
defer d.Close() //nolint:errcheck

_, err = d.ExecContext(ctx, q.String, q.Parameters...)
_, err = c.db.ExecContext(ctx, q.String, q.Parameters...)
return err
}

// Query the supplied query.
func (c postgresDB) Query(ctx context.Context, q xsql.Query) (*sql.Rows, error) {
d, err := sql.Open("postgres", c.dsn)
if c.err != nil || c.db == nil {
return nil, c.err
}

err := c.db.PingContext(ctx)
if err != nil {
return nil, err
}
defer d.Close() //nolint:errcheck

rows, err := d.QueryContext(ctx, q.String, q.Parameters...)
rows, err := c.db.QueryContext(ctx, q.String, q.Parameters...)
return rows, err
}

// Scan the results of the supplied query into the supplied destination.
func (c postgresDB) Scan(ctx context.Context, q xsql.Query, dest ...interface{}) error {
db, err := sql.Open("postgres", c.dsn)
if c.db == nil || c.err != nil {
return c.err
}

err := c.db.PingContext(ctx)
if err != nil {
return err
}
defer db.Close() //nolint:errcheck

return db.QueryRowContext(ctx, q.String, q.Parameters...).Scan(dest...)
return c.db.QueryRowContext(ctx, q.String, q.Parameters...).Scan(dest...)
}

// GetConnectionDetails returns the connection details for a user of this DB
Expand Down
10 changes: 7 additions & 3 deletions pkg/controller/postgresql/role/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ func (c *external) Observe(ctx context.Context, mg resource.Managed) (managed.Ex
Replication: new(bool),
BypassRls: new(bool),
},
ConnectionLimit: new(int32),
}

query := "SELECT " +
Expand Down Expand Up @@ -252,6 +253,8 @@ func (c *external) Observe(ctx context.Context, mg resource.Managed) (managed.Ex
}
cr.Status.AtProvider.ConfigurationParameters = observed.ConfigurationParameters

cr.Status.AtProvider.ConnectionLimit = observed.ConnectionLimit

_, pwdChanged, err := c.getPassword(ctx, cr)
if err != nil {
return managed.ExternalObservation{}, err
Expand Down Expand Up @@ -398,10 +401,11 @@ func (c *external) Update(ctx context.Context, mg resource.Managed) (managed.Ext
// Update state to reflect the current configuration parameters
cr.Status.AtProvider.ConfigurationParameters = cr.Spec.ForProvider.ConfigurationParameters
}
cl := cr.Spec.ForProvider.ConnectionLimit
if cl != nil {
newCl := cr.Spec.ForProvider.ConnectionLimit
currCl := cr.Status.AtProvider.ConnectionLimit
if (newCl != nil && currCl != nil) && (int64(*currCl) != int64(*newCl)) {
if err := c.db.Exec(ctx, xsql.Query{
String: fmt.Sprintf("ALTER ROLE %s CONNECTION LIMIT %d", crn, int64(*cl)),
String: fmt.Sprintf("ALTER ROLE %s CONNECTION LIMIT %d", crn, int64(*newCl)),
}); err != nil {
return managed.ExternalUpdate{}, errors.Wrap(err, errUpdateRole)
}
Expand Down