Skip to content

Commit

Permalink
chore: integrate sqlconnect-go for databricks
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr committed Jul 25, 2024
1 parent dd6791d commit 6b90417
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 42 deletions.
19 changes: 10 additions & 9 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ require (
github.com/viney-shih/go-lock v1.1.2
github.com/xitongsys/parquet-go v1.5.1
github.com/xitongsys/parquet-go-source v0.0.0-20240122235623-d6294584ab18
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d
github.com/youmark/pkcs8 v0.0.0-20240424034433-3c2c7870ae76
go.etcd.io/etcd/api/v3 v3.5.15
go.etcd.io/etcd/client/v3 v3.5.15
go.uber.org/atomic v1.11.0
Expand All @@ -116,6 +116,7 @@ require (
require (
github.com/go-ini/ini v1.67.0 // indirect
github.com/hamba/avro/v2 v2.22.2-0.20240625062549-66aad10411d9 // indirect
github.com/rudderlabs/sqlconnect-go v1.7.1
)

require (
Expand Down Expand Up @@ -145,19 +146,19 @@ require (
github.com/apache/arrow/go/v15 v15.0.2 // indirect
github.com/apache/thrift v0.20.0 // indirect
github.com/ardielle/ardielle-go v1.5.2 // indirect
github.com/aws/aws-sdk-go-v2 v1.27.0 // indirect
github.com/aws/aws-sdk-go-v2 v1.30.3 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.2 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.17.16 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.17.27 // indirect
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.21 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.7 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.7 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.15 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.15 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.7 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.2 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.3 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.9 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.9 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.17 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.7 // indirect
github.com/aws/aws-sdk-go-v2/service/s3 v1.54.3 // indirect
github.com/aws/smithy-go v1.20.2 // indirect
github.com/aws/smithy-go v1.20.3 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bitfield/gotestdox v0.2.2 // indirect
github.com/bits-and-blooms/bitset v1.13.0 // indirect
Expand Down Expand Up @@ -308,7 +309,7 @@ require (
golang.org/x/mod v0.18.0 // indirect
golang.org/x/net v0.27.0 // indirect
golang.org/x/sys v0.22.0 // indirect
golang.org/x/term v0.20.0 // indirect
golang.org/x/term v0.22.0 // indirect
golang.org/x/text v0.16.0 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/tools v0.22.0 // indirect
Expand Down
11 changes: 11 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ github.com/aws/aws-sdk-go-v2 v1.16.2/go.mod h1:ytwTPBG6fXTZLxxeeCCWj2/EMYp/xDUgX
github.com/aws/aws-sdk-go-v2 v1.23.0/go.mod h1:i1XDttT4rnf6vxc9AuskLc6s7XBee8rlLilKlc03uAA=
github.com/aws/aws-sdk-go-v2 v1.27.0 h1:7bZWKoXhzI+mMR/HjdMx8ZCC5+6fY0lS5tr0bbgiLlo=
github.com/aws/aws-sdk-go-v2 v1.27.0/go.mod h1:ffIFB97e2yNsv4aTSGkqtHnppsIJzw7G7BReUZ3jCXM=
github.com/aws/aws-sdk-go-v2 v1.30.3/go.mod h1:nIQjQVp5sfpQcTc9mPSr1B0PaWK5ByX9MOoDadSN4lc=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.1/go.mod h1:n8Bs1ElDD2wJ9kCRTczA83gYbBmjSwZp3umc6zF4EeM=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.5.1/go.mod h1:t8PYl/6LzdAqsU4/9tz28V/kU+asFePvpOMkdul0gEQ=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.2 h1:x6xsQXGSmW6frevwDA+vi/wqhp1ct18mVXYN08/93to=
Expand All @@ -249,6 +250,7 @@ github.com/aws/aws-sdk-go-v2/credentials v1.11.2/go.mod h1:j8YsY9TXTm31k4eFhspiQ
github.com/aws/aws-sdk-go-v2/credentials v1.16.2/go.mod h1:sDdvGhXrSVT5yzBDR7qXz+rhbpiMpUYfF3vJ01QSdrc=
github.com/aws/aws-sdk-go-v2/credentials v1.17.16 h1:7d2QxY83uYl0l58ceyiSpxg9bSbStqBC6BeEeHEchwo=
github.com/aws/aws-sdk-go-v2/credentials v1.17.16/go.mod h1:Ae6li/6Yc6eMzysRL2BXlPYvnrLLBg3D11/AmOjw50k=
github.com/aws/aws-sdk-go-v2/credentials v1.17.27/go.mod h1:gniiwbGahQByxan6YjQUMcW4Aov6bLC3m+evgcoN4r4=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.3/go.mod h1:uk1vhHHERfSVCUnqSqz8O48LBYDSC+k6brng09jcMOk=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.14.4/go.mod h1:t4i+yGHMCcUNIX1x7YVYa6bH/Do7civ5I6cG/6PMfyA=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.3 h1:dQLK4TjtnlRGb0czOht2CevZ5l6RSyRWAnKeGd7VAFE=
Expand All @@ -261,10 +263,12 @@ github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.9/go.mod h1:AnVH5pvai0p
github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.3/go.mod h1:7sGSz1JCKHWWBHq98m6sMtWQikmYPpxjqOydDemiVoM=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.7 h1:lf/8VTF2cM+N4SLzaYJERKEWAXq8MOMpZfU6wEPWsPk=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.7/go.mod h1:4SjkU7QiqK2M9oozyMzfZ/23LmUY+h3oFqhdeP5OMiI=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.15/go.mod h1:U9ke74k1n2bf+RIgoX1SXFed1HLs51OgUSs+Ph0KJP8=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.3/go.mod h1:ssOhaLpRlh88H3UmEcsBoVKq309quMvm3Ds8e9d4eJM=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.5.3/go.mod h1:ify42Rb7nKeDDPkFjKn7q1bPscVPu/+gmHH8d2c+anU=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.7 h1:4OYVp0705xu8yjdyoWix0r9wPIRXnIzzOoUpQVHIJ/g=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.7/go.mod h1:vd7ESTEvI76T2Na050gODNmNU7+OyKrIKroYTu4ABiI=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.15/go.mod h1:ZQLZqhcu+JhSrA9/NXRm8SkDvsycE+JkV3WGY41e+IM=
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.10/go.mod h1:8DcYQcz0+ZJaSxANlHIsbbi6S+zMwjwdDqwW3r9AzaE=
github.com/aws/aws-sdk-go-v2/internal/ini v1.7.1/go.mod h1:6fQQgfuGmw8Al/3M2IgIllycxV7ZW7WCdVSqfBeUiCY=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 h1:hT8rVHwugYE2lEfdFE0QWVo81lF7jMrYJVDWI+f+VxU=
Expand All @@ -276,6 +280,7 @@ github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.1/go.mod h1:G
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.10.1/go.mod h1:l9ymW25HOqymeU2m1gbUQ3rUIsTwKs8gYHXkqDQUhiI=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.2 h1:Ji0DY1xUsUr3I8cHps0G+XM3WWU16lP6yG8qu1GAZAs=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.2/go.mod h1:5CsjAbs3NlGQyZNFACh+zztPDI7fU6eW9QsxjfnuBKg=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.3/go.mod h1:GlAeCkHwugxdHaueRr4nhPuY+WW+gR8UjlcqzPr1SPI=
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.3/go.mod h1:Seb8KNmD6kVTjwRjVEgOT5hPin6sq+v4C2ycJQDwuH8=
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.2.3/go.mod h1:R+/S1O4TYpcktbVwddeOYg+uwUfLhADP2S/x4QwsCTM=
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.9 h1:UXqEWQI0n+q0QixzU0yUUQBZXRd5037qdInTIHFTl98=
Expand All @@ -284,6 +289,7 @@ github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.3/go.mod h1:wlY
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.10.3/go.mod h1:Owv1I59vaghv1Ax8zz8ELY8DN7/Y0rGS+WWAmjgi950=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.9 h1:Wx0rlZoEJR7JwlSZcHnEa7CNjrSIyVxMFWGAaXy4fJY=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.9/go.mod h1:aVMHdE0aHO3v+f/iw01fmXV/5DbfQ3Bi9nN7nd9bE9Y=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.17/go.mod h1:RkZEx4l0EHYDJpWppMJ3nD9wZJAa8/0lq9aVC+r2UII=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.13.3/go.mod h1:Bm/v2IaN6rZ+Op7zX+bOUMdL4fsrYZiD0dsjLhNKwZc=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.16.3/go.mod h1:KZgs2ny8HsxRIRbDwgvJcHHBZPOzQr/+NtGwnP+w2ec=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.7 h1:uO5XR6QGBcmPyo2gxofYJLFkcVQ4izOoGDNenlZhTEk=
Expand Down Expand Up @@ -312,6 +318,7 @@ github.com/aws/smithy-go v1.11.2/go.mod h1:3xHYmszWVx2c0kIwQeEVf9uSm4fYZt67FBJnw
github.com/aws/smithy-go v1.17.0/go.mod h1:NukqUGpCZIILqqiV0NIjeFh24kd/FAa4beRb6nbIUPE=
github.com/aws/smithy-go v1.20.2 h1:tbp628ireGtzcHDDmLT/6ADHidqnwgF57XOXZe6tp4Q=
github.com/aws/smithy-go v1.20.2/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E=
github.com/aws/smithy-go v1.20.3/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
Expand Down Expand Up @@ -1129,6 +1136,8 @@ github.com/rudderlabs/rudder-schemas v0.5.0 h1:+140Amou92sB4ZXwsX3jgRIdjNfERJJgY
github.com/rudderlabs/rudder-schemas v0.5.0/go.mod h1:JoDTB9nCDXwRz+G+aYwP3Fj42HLssKARxsFFm+qqgb4=
github.com/rudderlabs/sql-tunnels v0.1.7 h1:wDCRl6zY4M5gfWazf7XkSTGQS3yjBzUiUgEMBIfHNDA=
github.com/rudderlabs/sql-tunnels v0.1.7/go.mod h1:5f7+YL49JHYgteP4rAgqKnr4K2OadB0oIpUS+Tt3sPM=
github.com/rudderlabs/sqlconnect-go v1.7.1 h1:mCyK/pKLZGT2tG0DHiWjtbxPRqTtxTyPJMAS/0ZsOIo=
github.com/rudderlabs/sqlconnect-go v1.7.1/go.mod h1:CB34S/Cb1v8N7jMDUjsGcp4MZ7t8aIAF31ZHaRXnWwo=
github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/ruudk/golang-pdf417 v0.0.0-20181029194003-1af4ab5afa58/go.mod h1:6lfFZQK844Gfx8o5WFuvpxWRwnSoipWe/p622j1v06w=
Expand Down Expand Up @@ -1274,6 +1283,7 @@ github.com/xtgo/uuid v0.0.0-20140804021211-a0b114877d4c h1:3lbZUMbMiGUW/LMkfsEAB
github.com/xtgo/uuid v0.0.0-20140804021211-a0b114877d4c/go.mod h1:UrdRz5enIKZ63MEE3IF9l2/ebyx59GyGgPi+tICQdmM=
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d h1:splanxYIlg+5LfHAM6xpdFEAYOk8iySO56hMFq6uLyA=
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA=
github.com/youmark/pkcs8 v0.0.0-20240424034433-3c2c7870ae76/go.mod h1:SQliXeA7Dhkt//vS29v3zpbEwoa+zb2Cn5xj5uO4K5U=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down Expand Up @@ -1565,6 +1575,7 @@ golang.org/x/telemetry v0.0.0-20240228155512-f48c80bd79b2/go.mod h1:TeRTkGYfJXct
golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58=
golang.org/x/term v0.20.0 h1:VnkxpohqXaOBYJtBmEppKUG6mXpi+4O6purfc2+sMhw=
golang.org/x/term v0.20.0/go.mod h1:8UkIAJTvZgivsXaD6/pH6U9ecQzZ45awqEOzuCvwpFY=
golang.org/x/term v0.22.0/go.mod h1:F3qCibpT5AMpCRfhfT53vVJwhLtIVHhB9XDjfFvnMI4=
golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk=
golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
Expand Down
83 changes: 50 additions & 33 deletions warehouse/integrations/deltalake/deltalake.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,19 @@ package deltalake
import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"regexp"
"strconv"
"strings"
"time"

"github.com/rudderlabs/sqlconnect-go/sqlconnect"
sqlconnectconfig "github.com/rudderlabs/sqlconnect-go/sqlconnect/config"

"github.com/rudderlabs/rudder-server/warehouse/integrations/types"

dbsql "github.com/databricks/databricks-sql-go"
dbsqllog "github.com/databricks/databricks-sql-go/logger"

"github.com/rudderlabs/rudder-go-kit/config"
Expand All @@ -28,23 +31,25 @@ import (
)

const (
host = "host"
port = "port"
path = "path"
token = "token"
catalog = "catalog"
useSTSTokens = "useSTSTokens"
userAgent = "Rudderstack"

provider = warehouseutils.DELTALAKE
configKeyHost = "host"
configKeyPort = "port"
configKeyPath = "path"
configKeyToken = "token"
configKeyCatalog = "catalog"
configKeyUseSTSTokens = "useSTSTokens"
)

// Corresponds to the max length set for event rudder-transformer
// https://github.com/rudderlabs/rudder-transformer/blob/fb8b818b2cbd05f784117b9f3040856dab1a7346/src/warehouse/v1/util.js#L34
tableNameLimit = 127
const (
provider = warehouseutils.DELTALAKE
tableNameLimit = 127 // Maximum table name length in rudder-transformer
)

const (
schemaNotFound = "[SCHEMA_NOT_FOUND]"
columnsAlreadyExists = "already exists in root"
)

const (
rudderStagingTableRegex = "^rudder_staging_.*$" // matches rudder_staging_* tables
nonRudderStagingTableRegex = "^(?!rudder_staging_.*$).*" // matches tables that do not start with rudder_staging_
)
Expand Down Expand Up @@ -185,38 +190,50 @@ func (d *Deltalake) Setup(_ context.Context, warehouse model.Warehouse, uploader

// connect connects to the warehouse
func (d *Deltalake) connect() (*sqlmiddleware.DB, error) {
port, err := strconv.Atoi(warehouseutils.GetConfigValue(port, d.Warehouse))
var (
host = warehouseutils.GetConfigValue(configKeyHost, d.Warehouse)
portString = warehouseutils.GetConfigValue(configKeyPort, d.Warehouse)
path = warehouseutils.GetConfigValue(configKeyPath, d.Warehouse)
token = warehouseutils.GetConfigValue(configKeyToken, d.Warehouse)
catalog = warehouseutils.GetConfigValue(configKeyCatalog, d.Warehouse)
timeout = d.connectTimeout
)

port, err := strconv.Atoi(portString)
if err != nil {
return nil, fmt.Errorf("port is not a number: %w", err)
}

connector, err := dbsql.NewConnector(
dbsql.WithServerHostname(warehouseutils.GetConfigValue(host, d.Warehouse)),
dbsql.WithPort(port),
dbsql.WithHTTPPath(warehouseutils.GetConfigValue(path, d.Warehouse)),
dbsql.WithAccessToken(warehouseutils.GetConfigValue(token, d.Warehouse)),
dbsql.WithSessionParams(map[string]string{
data := sqlconnectconfig.Databricks{
Host: host,
Port: port,
Path: path,
Token: token,
Catalog: catalog,
Timeout: timeout,
SessionParams: map[string]string{
"ansi_mode": "false",
}),
dbsql.WithUserAgentEntry(userAgent),
dbsql.WithTimeout(d.connectTimeout),
dbsql.WithInitialNamespace(
warehouseutils.GetConfigValue(catalog, d.Warehouse),
"",
),
dbsql.WithRetries(d.config.maxRetries, d.config.retryMinWait, d.config.retryMaxWait),
)
},
RetryAttempts: d.config.maxRetries,
MinRetryWaitTime: d.config.retryMinWait,
MaxRetryWaitTime: d.config.retryMaxWait,
}

credentialsJSON, err := json.Marshal(data)
if err != nil {
return nil, fmt.Errorf("creating connector: %w", err)
return nil, fmt.Errorf("marshalling credentials: %w", err)
}

sqlConnectDB, err := sqlconnect.NewDB("databricks", credentialsJSON)
if err != nil {
return nil, fmt.Errorf("creating sqlconnect db: %w", err)
}
if err = dbsqllog.SetLogLevel("disabled"); err != nil {
return nil, fmt.Errorf("setting log level: %w", err)
}

db := sql.OpenDB(connector)
middleware := sqlmiddleware.New(
db,
sqlConnectDB.SqlDB(),
sqlmiddleware.WithStats(d.stats),
sqlmiddleware.WithLogger(d.logger),
sqlmiddleware.WithKeyAndValues(
Expand Down Expand Up @@ -954,7 +971,7 @@ func (d *Deltalake) authQuery() (string, error) {
// canUseAuth reports whether the warehouse is configured to use
// RudderObjectStorage or STS tokens.
//
// It returns true when either condition holds:
//   - misc.IsConfiguredToUseRudderObjectStorage accepts the destination config, or
//   - the "useSTSTokens" destination setting, read as a bool string, equals "true".
func (d *Deltalake) canUseAuth() bool {
	canUseRudderStorage := misc.IsConfiguredToUseRudderObjectStorage(d.Warehouse.Destination.Config)
	// Single declaration only: the setting is looked up through the
	// configKeyUseSTSTokens constant, consistent with the other configKey* names.
	canUseSTSTokens := warehouseutils.GetConfigValueBoolString(configKeyUseSTSTokens, d.Warehouse) == "true"

	return canUseRudderStorage || canUseSTSTokens
}
Expand Down

0 comments on commit 6b90417

Please sign in to comment.