Skip to content

Commit

Permalink
feat: Update Firebolt connector to use the new Firebolt backend
Browse files Browse the repository at this point in the history
  • Loading branch information
ptiurin authored and mdibaiee committed Nov 7, 2024
1 parent ba0f13c commit 539415e
Show file tree
Hide file tree
Showing 12 changed files with 395 additions and 277 deletions.
1 change: 1 addition & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ jobs:
- name: Materialization connector ${{ matrix.connector }} integration tests
if: |
contains(fromJson('[
"materialize-firebolt",
"materialize-dynamodb",
"materialize-elasticsearch",
"materialize-google-sheets",
Expand Down
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ require (
github.com/apache/arrow/go/v16 v16.0.0 // indirect
github.com/apache/thrift v0.20.0 // indirect
github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect
github.com/astaxie/beego v1.12.3 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.13 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.2 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.40 // indirect
Expand All @@ -141,6 +142,7 @@ require (
github.com/elastic/elastic-transport-go/v8 v8.6.0 // indirect
github.com/fatih/color v1.17.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/firebolt-db/firebolt-go-sdk v1.2.0 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.2 // indirect
github.com/go-logr/logr v1.4.2 // indirect
Expand Down Expand Up @@ -178,6 +180,7 @@ require (
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
github.com/kr/fs v0.1.0 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/matishsiao/goInfo v0.0.0-20210923090445-da2e3fa8d45f // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/montanaflynn/stats v0.7.1 // indirect
Expand Down Expand Up @@ -216,6 +219,7 @@ require (
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.2 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
github.com/xwb1989/sqlparser v0.0.0-20180606152119-120387863bf2 // indirect
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
go.opencensus.io v0.24.0 // indirect
Expand Down
89 changes: 89 additions & 0 deletions go.sum

Large diffs are not rendered by default.

40 changes: 22 additions & 18 deletions materialize-firebolt/.snapshots/TestDriverSpec
Original file line number Diff line number Diff line change
Expand Up @@ -3,72 +3,76 @@
"$schema": "http://json-schema.org/draft/2020-12/schema",
"$id": "https://github.com/estuary/connectors/materialize-firebolt/config",
"properties": {
"engine_url": {
"type": "string",
"title": "Engine URL",
"description": "Engine URL of the Firebolt database, in the format: `\u003cengine-name\u003e.\u003corganisation\u003e.\u003cregion\u003e.app.firebolt.io`.",
"examples": [
"engine-name.organisation.region.app.firebolt.io"
],
"order": 0
},
"username": {
"type": "string",
"title": "Username",
"description": "Firebolt username.",
"order": 1
"order": 0
},
"password": {
"type": "string",
"title": "Password",
"description": "Firebolt password.",
"order": 2,
"order": 1,
"secret": true
},
"account_name": {
"type": "string",
"title": "Account Name",
"description": "Firebolt account within your organization.",
"order": 2
},
"engine_name": {
"type": "string",
"title": "Engine Name",
"description": "Engine Name to process your queries.",
"order": 3
},
"database": {
"type": "string",
"title": "Database",
"description": "Name of the Firebolt database.",
"order": 3
"order": 4
},
"s3_bucket": {
"type": "string",
"title": "S3 Bucket",
"description": "Name of S3 bucket where the intermediate files for external table will be stored.",
"order": 4
"order": 5
},
"s3_prefix": {
"type": "string",
"title": "S3 Prefix",
"description": "A prefix for files stored in the bucket. Example: my-prefix.",
"default": "/",
"order": 5
"order": 6
},
"aws_key_id": {
"type": "string",
"title": "AWS Key ID",
"description": "AWS Key ID for accessing the S3 bucket.",
"order": 6
"order": 7
},
"aws_secret_key": {
"type": "string",
"title": "AWS Secret Key",
"description": "AWS Secret Key for accessing the S3 bucket.",
"order": 7,
"order": 8,
"secret": true
},
"aws_region": {
"type": "string",
"title": "AWS Region",
"description": "AWS Region the bucket is in.",
"order": 8
"order": 9
}
},
"type": "object",
"required": [
"engine_url",
"username",
"password",
"account_name",
"engine_name",
"database",
"s3_bucket"
],
Expand Down
48 changes: 27 additions & 21 deletions materialize-firebolt/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,33 @@ import (
)

type config struct {
EngineURL string `json:"engine_url" jsonschema:"title=Engine URL,example=engine-name.organisation.region.app.firebolt.io" jsonschema_extras:"order=0"`
Username string `json:"username" jsonschema:"title=Username" jsonschema_extras:"order=1"`
Password string `json:"password" jsonschema:"title=Password" jsonschema_extras:"secret=true,order=2"`
Database string `json:"database" jsonschema:"title=Database" jsonschema_extras:"order=3"`
S3Bucket string `json:"s3_bucket" jsonschema:"title=S3 Bucket" jsonschema_extras:"order=4"`
S3Prefix string `json:"s3_prefix,omitempty" jsonschema:"title=S3 Prefix,default=/" jsonschema_extras:"order=5"`
AWSKeyId string `json:"aws_key_id,omitempty" jsonschema:"title=AWS Key ID" jsonschema_extras:"order=6"`
AWSSecretKey string `json:"aws_secret_key,omitempty" jsonschema:"title=AWS Secret Key" jsonschema_extras:"secret=true,order=7"`
AWSRegion string `json:"aws_region,omitempty" jsonschema:"title=AWS Region" jsonschema_extras:"order=8"`
ClientId string `json:"client_id" jsonschema:"title=Client ID" jsonschema_extras:"order=0"`
ClientSecret string `json:"client_secret" jsonschema:"title=Client Secret" jsonschema_extras:"secret=true,order=1"`
AccountName string `json:"account_name" jsonschema:"title=Account Name" jsonschema_extras:"order=2"`
EngineName string `json:"engine_name" jsonschema:"title=Engine Name" jsonschema_extras:"order=3"`
Database string `json:"database" jsonschema:"title=Database" jsonschema_extras:"order=4"`
S3Bucket string `json:"s3_bucket" jsonschema:"title=S3 Bucket" jsonschema_extras:"order=5"`
S3Prefix string `json:"s3_prefix,omitempty" jsonschema:"title=S3 Prefix,default=/" jsonschema_extras:"order=6"`
AWSKeyId string `json:"aws_key_id,omitempty" jsonschema:"title=AWS Key ID" jsonschema_extras:"order=7"`
AWSSecretKey string `json:"aws_secret_key,omitempty" jsonschema:"title=AWS Secret Key" jsonschema_extras:"secret=true,order=8"`
AWSRegion string `json:"aws_region,omitempty" jsonschema:"title=AWS Region" jsonschema_extras:"order=9"`
}

func (c config) Validate() error {
if c.EngineURL == "" {
return fmt.Errorf("missing required engine_url")
if c.EngineName == "" {
return fmt.Errorf("missing required engine_name")
}
if c.Database == "" {
return fmt.Errorf("missing required database")
}
if c.Username == "" {
return fmt.Errorf("missing required username")
if c.AccountName == "" {
return fmt.Errorf("missing required account_name")
}
if c.Password == "" {
return fmt.Errorf("missing required password")
if c.ClientId == "" {
return fmt.Errorf("missing required client_id")
}
if c.ClientSecret == "" {
return fmt.Errorf("missing required client_secret")
}
if c.S3Bucket == "" {
return fmt.Errorf("missing required bucket")
Expand All @@ -40,14 +44,16 @@ func (c config) Validate() error {
// which provides the jsonschema description of the fields
func (config) GetFieldDocString(fieldName string) string {
switch fieldName {
case "EngineURL":
return "Engine URL of the Firebolt database, in the format: `<engine-name>.<organisation>.<region>.app.firebolt.io`."
case "EngineName":
return "Engine Name to process your queries."
case "Database":
return "Name of the Firebolt database."
case "Username":
return "Firebolt username."
case "Password":
return "Firebolt password."
case "ClientID":
return "ID of your Firebolt service account."
case "ClientSecret":
return "Secret key of your Firebolt service account."
case "AccountName":
return "Firebolt account within your organization."
case "S3Bucket":
return "Name of S3 bucket where the intermediate files for external table will be stored."
case "S3Prefix":
Expand Down
21 changes: 12 additions & 9 deletions materialize-firebolt/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ import (
"fmt"
"strings"

"database/sql"

schemagen "github.com/estuary/connectors/go/schema-gen"
boilerplate "github.com/estuary/connectors/materialize-boilerplate"
"github.com/estuary/connectors/materialize-firebolt/firebolt"
"github.com/estuary/connectors/materialize-firebolt/schemalate"
pf "github.com/estuary/flow/go/protocols/flow"
pm "github.com/estuary/flow/go/protocols/materialize"
_ "github.com/firebolt-db/firebolt-go-sdk"
)

// driver implements the DriverServer interface.
Expand Down Expand Up @@ -108,12 +110,13 @@ func (d driver) Apply(ctx context.Context, req *pm.Request_Apply) (*pm.Response_
return nil, fmt.Errorf("parsing endpoint config: %w", err)
}

var fb, err = firebolt.New(firebolt.Config{
EngineURL: cfg.EngineURL,
Database: cfg.Database,
Username: cfg.Username,
Password: cfg.Password,
})
dsn := fmt.Sprintf("firebolt:///%s?account_name=%s&client_id=%s&client_secret=%s&engine=%s", cfg.Database, cfg.AccountName, cfg.ClientId, cfg.ClientSecret, cfg.EngineName)

// opening the firebolt driver
db, err := sql.Open("firebolt", dsn)
if err != nil {
fmt.Printf("error during opening a driver: %v\n", err)
}

if err != nil {
return nil, fmt.Errorf("creating firebolt client: %w", err)
Expand Down Expand Up @@ -151,12 +154,12 @@ func (d driver) Apply(ctx context.Context, req *pm.Request_Apply) (*pm.Response_
}

for i, bundle := range queries.Bindings {
_, err := fb.Query(bundle.CreateExternalTable)
_, err := db.Query(bundle.CreateExternalTable)
if err != nil {
return nil, fmt.Errorf("running external table creation query: %w", err)
}

_, err = fb.Query(bundle.CreateTable)
_, err = db.Query(bundle.CreateTable)
if err != nil {
return nil, fmt.Errorf("running table creation query: %w", err)
}
Expand Down
Loading

0 comments on commit 539415e

Please sign in to comment.