Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: fetch remote schema async #5347

Open
wants to merge 22 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (m *Manager) addColumns(ctx context.Context, namespace, tableName string, c
return fmt.Errorf("creating snowflake manager: %w", err)
}
defer func() {
snowflakeManager.Cleanup(ctx)
snowflakeManager.Close()
}()
if err = snowflakeManager.AddColumns(ctx, tableName, columns); err != nil {
return fmt.Errorf("adding column: %w, %w", errAuthz, err)
Expand Down Expand Up @@ -160,7 +160,7 @@ func (m *Manager) handleSchemaError(
return nil, fmt.Errorf("creating snowflake manager: %w", err)
}
defer func() {
snowflakeManager.Cleanup(ctx)
snowflakeManager.Close()
}()
if err := snowflakeManager.CreateSchema(ctx); err != nil {
return nil, fmt.Errorf("creating schema: %w, %w", errAuthz, err)
Expand Down Expand Up @@ -188,7 +188,7 @@ func (m *Manager) handleTableError(
return nil, fmt.Errorf("creating snowflake manager: %w", err)
}
defer func() {
snowflakeManager.Cleanup(ctx)
snowflakeManager.Close()
}()
if err := snowflakeManager.CreateTable(ctx, channelReq.TableConfig.Table, eventSchema); err != nil {
return nil, fmt.Errorf("creating table: %w, %w", errAuthz, err)
Expand Down
2 changes: 1 addition & 1 deletion router/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1154,7 +1154,7 @@ func (w *worker) AvailableSlots() int {
return cap(w.input) - len(w.input) - w.inputReservations
}

// Reserve tries to reserve a slot in the worker's input channel, if available
// ReserveSlot tries to reserve a slot in the worker's input channel, if available
func (w *worker) ReserveSlot() *workerSlot {
if w.AvailableSlots() > 0 {
w.inputReservations++
Expand Down
9 changes: 6 additions & 3 deletions warehouse/integrations/azure-synapse/azure-synapse.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@
tableSchemaInUpload,
)
previousColumnKeys := warehouseutils.SortColumnKeysFromColumnMap(
as.Uploader.GetTableSchemaInWarehouse(
as.Uploader.GetTableSchema(

Check warning on line 281 in warehouse/integrations/azure-synapse/azure-synapse.go

View check run for this annotation

Codecov / codecov/patch

warehouse/integrations/azure-synapse/azure-synapse.go#L281

Added line #L281 was not covered by tests
tableName,
),
)
Expand Down Expand Up @@ -608,7 +608,7 @@
defer as.dropStagingTable(ctx, unionStagingTableName)
defer as.dropStagingTable(ctx, identifyStagingTable)

userColMap := as.Uploader.GetTableSchemaInWarehouse(warehouseutils.UsersTable)
userColMap := as.Uploader.GetTableSchema(warehouseutils.UsersTable)

Check warning on line 611 in warehouse/integrations/azure-synapse/azure-synapse.go

View check run for this annotation

Codecov / codecov/patch

warehouse/integrations/azure-synapse/azure-synapse.go#L611

Added line #L611 was not covered by tests
var userColNames, firstValProps []string
for colName := range userColMap {
if colName == "id" {
Expand Down Expand Up @@ -946,10 +946,13 @@
logfield.Error, err,
)
}
_ = as.DB.Close()
}
}

func (as *AzureSynapse) Close() {
_ = as.DB.Close()

Check warning on line 953 in warehouse/integrations/azure-synapse/azure-synapse.go

View check run for this annotation

Codecov / codecov/patch

warehouse/integrations/azure-synapse/azure-synapse.go#L952-L953

Added lines #L952 - L953 were not covered by tests
}

func (*AzureSynapse) LoadIdentityMergeRulesTable(_ context.Context) (err error) {
return
}
Expand Down
2 changes: 1 addition & 1 deletion warehouse/integrations/azure-synapse/azure_synapse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -736,7 +736,7 @@ func newMockUploader(
mockUploader.EXPECT().UseRudderStorage().Return(false).AnyTimes()
mockUploader.EXPECT().GetLoadFilesMetadata(gomock.Any(), gomock.Any()).Return(loadFiles, nil).AnyTimes()
mockUploader.EXPECT().GetTableSchemaInUpload(tableName).Return(schemaInUpload).AnyTimes()
mockUploader.EXPECT().GetTableSchemaInWarehouse(tableName).Return(schemaInWarehouse).AnyTimes()
mockUploader.EXPECT().GetTableSchema(tableName).Return(schemaInWarehouse).AnyTimes()

return mockUploader
}
8 changes: 6 additions & 2 deletions warehouse/integrations/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,7 @@ func (bq *BigQuery) LoadUserTables(ctx context.Context) (errorMap map[string]err
return fmt.Sprintf("FIRST_VALUE(`%[1]s` IGNORE NULLS) OVER (PARTITION BY id ORDER BY received_at DESC ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS `%[1]s`", column)
}

userColMap := bq.uploader.GetTableSchemaInWarehouse(warehouseutils.UsersTable)
userColMap := bq.uploader.GetTableSchema(warehouseutils.UsersTable)
var userColNames, firstValProps []string
for colName := range userColMap {
if colName == "id" {
Expand Down Expand Up @@ -705,7 +705,7 @@ func (bq *BigQuery) createAndLoadStagingUsersTable(ctx context.Context, stagingT
gcsRef.MaxBadRecords = 0
gcsRef.IgnoreUnknownValues = false

usersSchema := getTableSchema(bq.uploader.GetTableSchemaInWarehouse(warehouseutils.UsersTable))
usersSchema := getTableSchema(bq.uploader.GetTableSchema(warehouseutils.UsersTable))
metaData := &bigquery.TableMetadata{
Schema: usersSchema,
}
Expand Down Expand Up @@ -1000,7 +1000,11 @@ func (bq *BigQuery) Cleanup(ctx context.Context) {
obskit.Error(err),
)
}
}
}

func (bq *BigQuery) Close() {
if bq.db != nil {
_ = bq.db.Close()
}
}
Expand Down
3 changes: 2 additions & 1 deletion warehouse/integrations/bigquery/bigquery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,7 @@ func TestIntegration(t *testing.T) {
t.Setenv("RSERVER_WAREHOUSE_BIGQUERY_ENABLE_DELETE_BY_JOBS", "true")
t.Setenv("RSERVER_WAREHOUSE_BIGQUERY_MAX_PARALLEL_LOADS", "8")
t.Setenv("RSERVER_WAREHOUSE_BIGQUERY_SLOW_QUERY_THRESHOLD", "0s")
t.Setenv("RSERVER_WAREHOUSE_SYNC_SCHEMA_FREQUENCY", "1s")

whth.BootstrapSvc(t, workspaceConfig, httpPort, jobsDBPort)

Expand Down Expand Up @@ -1469,7 +1470,7 @@ func newMockUploader(
},
).AnyTimes()
mockUploader.EXPECT().GetTableSchemaInUpload(tableName).Return(schemaInUpload).AnyTimes()
mockUploader.EXPECT().GetTableSchemaInWarehouse(tableName).Return(schemaInWarehouse).AnyTimes()
mockUploader.EXPECT().GetTableSchema(tableName).Return(schemaInWarehouse).AnyTimes()

return mockUploader
}
5 changes: 4 additions & 1 deletion warehouse/integrations/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -895,7 +895,7 @@ func (ch *Clickhouse) AddColumns(ctx context.Context, tableName string, columnsI
}

func (ch *Clickhouse) CreateSchema(ctx context.Context) error {
if !ch.Uploader.IsWarehouseSchemaEmpty() {
if !ch.Uploader.IsSchemaEmpty() {
return nil
}

Expand Down Expand Up @@ -1050,6 +1050,9 @@ func (ch *Clickhouse) LoadTable(ctx context.Context, tableName string) (*types.L
}

func (ch *Clickhouse) Cleanup(_ context.Context) {
}

func (ch *Clickhouse) Close() {
if ch.DB != nil {
_ = ch.DB.Close()
}
Expand Down
11 changes: 4 additions & 7 deletions warehouse/integrations/clickhouse/clickhouse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,18 @@ import (
"testing"
"time"

"github.com/samber/lo"

"github.com/rudderlabs/rudder-go-kit/stats"

"go.uber.org/mock/gomock"

clickhousestd "github.com/ClickHouse/clickhouse-go"
"github.com/google/uuid"
"github.com/samber/lo"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"

"github.com/rudderlabs/compose-test/compose"
"github.com/rudderlabs/compose-test/testcompose"
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/filemanager"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
kithelper "github.com/rudderlabs/rudder-go-kit/testhelper"

backendconfig "github.com/rudderlabs/rudder-server/backend-config"
Expand Down Expand Up @@ -1222,7 +1219,7 @@ func newMockUploader(
u.EXPECT().GetTableSchemaInUpload(gomock.Any()).Return(tableSchema).AnyTimes()
u.EXPECT().GetLoadFilesMetadata(gomock.Any(), gomock.Any()).Return(metadata, nil).AnyTimes()
u.EXPECT().UseRudderStorage().Return(false).AnyTimes()
u.EXPECT().IsWarehouseSchemaEmpty().Return(true).AnyTimes()
u.EXPECT().IsSchemaEmpty().Return(true).AnyTimes()

return u
}
2 changes: 2 additions & 0 deletions warehouse/integrations/datalake/datalake.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ func (d *Datalake) LoadIdentityMappingsTable(context.Context) error {
func (*Datalake) Cleanup(context.Context) {
}

func (*Datalake) Close() {}

func (*Datalake) IsEmpty(context.Context, model.Warehouse) (bool, error) {
return false, nil
}
Expand Down
11 changes: 7 additions & 4 deletions warehouse/integrations/deltalake/deltalake.go
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,7 @@ func (d *Deltalake) LoadTable(
tableName string,
) (*types.LoadTableStats, error) {
uploadTableSchema := d.Uploader.GetTableSchemaInUpload(tableName)
warehouseTableSchema := d.Uploader.GetTableSchemaInWarehouse(tableName)
warehouseTableSchema := d.Uploader.GetTableSchema(tableName)

loadTableStat, _, err := d.loadTable(
ctx,
Expand Down Expand Up @@ -980,9 +980,9 @@ func (d *Deltalake) hasAWSCredentials() bool {
func (d *Deltalake) LoadUserTables(ctx context.Context) map[string]error {
var (
identifiesSchemaInUpload = d.Uploader.GetTableSchemaInUpload(warehouseutils.IdentifiesTable)
identifiesSchemaInWarehouse = d.Uploader.GetTableSchemaInWarehouse(warehouseutils.IdentifiesTable)
identifiesSchemaInWarehouse = d.Uploader.GetTableSchema(warehouseutils.IdentifiesTable)
usersSchemaInUpload = d.Uploader.GetTableSchemaInUpload(warehouseutils.UsersTable)
usersSchemaInWarehouse = d.Uploader.GetTableSchemaInWarehouse(warehouseutils.UsersTable)
usersSchemaInWarehouse = d.Uploader.GetTableSchema(warehouseutils.UsersTable)
)

d.logger.Infow("started loading for identifies and users tables",
Expand Down Expand Up @@ -1253,10 +1253,13 @@ func (d *Deltalake) Cleanup(ctx context.Context) {
logfield.Error, err.Error(),
)
}
_ = d.DB.Close()
}
}

func (d *Deltalake) Close() {
_ = d.DB.Close()
}

// IsEmpty checks if the warehouse is empty or not
func (*Deltalake) IsEmpty(context.Context, model.Warehouse) (bool, error) {
return false, nil
Expand Down
2 changes: 1 addition & 1 deletion warehouse/integrations/deltalake/deltalake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1478,7 +1478,7 @@ func newMockUploader(
).AnyTimes()
mockUploader.EXPECT().GetSampleLoadFileLocation(gomock.Any(), gomock.Any()).Return(loadFiles[0].Location, nil).AnyTimes()
mockUploader.EXPECT().GetTableSchemaInUpload(tableName).Return(schemaInUpload).AnyTimes()
mockUploader.EXPECT().GetTableSchemaInWarehouse(tableName).Return(schemaInWarehouse).AnyTimes()
mockUploader.EXPECT().GetTableSchema(tableName).Return(schemaInWarehouse).AnyTimes()
mockUploader.EXPECT().GetLoadFileType().Return(loadFileType).AnyTimes()

return mockUploader
Expand Down
4 changes: 2 additions & 2 deletions warehouse/integrations/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"fmt"
"time"

"github.com/rudderlabs/rudder-server/warehouse/integrations/types"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
Expand All @@ -22,6 +20,7 @@ import (
"github.com/rudderlabs/rudder-server/warehouse/integrations/postgres"
"github.com/rudderlabs/rudder-server/warehouse/integrations/redshift"
"github.com/rudderlabs/rudder-server/warehouse/integrations/snowflake"
"github.com/rudderlabs/rudder-server/warehouse/integrations/types"
"github.com/rudderlabs/rudder-server/warehouse/internal/model"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)
Expand All @@ -38,6 +37,7 @@ type Manager interface {
LoadIdentityMergeRulesTable(ctx context.Context) error
LoadIdentityMappingsTable(ctx context.Context) error
Cleanup(ctx context.Context)
Close()
IsEmpty(ctx context.Context, warehouse model.Warehouse) (bool, error)
TestConnection(ctx context.Context, warehouse model.Warehouse) error
DownloadIdentityRules(ctx context.Context, gzWriter *misc.GZipWriter) error
Expand Down
8 changes: 5 additions & 3 deletions warehouse/integrations/mssql/mssql.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,7 +628,7 @@
defer ms.dropStagingTable(ctx, unionStagingTableName)
defer ms.dropStagingTable(ctx, identifyStagingTable)

userColMap := ms.Uploader.GetTableSchemaInWarehouse(warehouseutils.UsersTable)
userColMap := ms.Uploader.GetTableSchema(warehouseutils.UsersTable)

Check warning on line 631 in warehouse/integrations/mssql/mssql.go

View check run for this annotation

Codecov / codecov/patch

warehouse/integrations/mssql/mssql.go#L631

Added line #L631 was not covered by tests
var userColNames, firstValProps []string
for colName := range userColMap {
if colName == "id" {
Expand Down Expand Up @@ -961,11 +961,13 @@
logfield.Error, err,
)
}

_ = ms.DB.Close()
}
}

func (ms *MSSQL) Close() {
_ = ms.DB.Close()

Check warning on line 968 in warehouse/integrations/mssql/mssql.go

View check run for this annotation

Codecov / codecov/patch

warehouse/integrations/mssql/mssql.go#L967-L968

Added lines #L967 - L968 were not covered by tests
}

func (*MSSQL) LoadIdentityMergeRulesTable(context.Context) (err error) {
return
}
Expand Down
2 changes: 1 addition & 1 deletion warehouse/integrations/mssql/mssql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -772,7 +772,7 @@ func newMockUploader(
mockUploader.EXPECT().UseRudderStorage().Return(false).AnyTimes()
mockUploader.EXPECT().GetLoadFilesMetadata(gomock.Any(), gomock.Any()).Return(loadFiles, nil).AnyTimes()
mockUploader.EXPECT().GetTableSchemaInUpload(tableName).Return(schemaInUpload).AnyTimes()
mockUploader.EXPECT().GetTableSchemaInWarehouse(tableName).Return(schemaInWarehouse).AnyTimes()
mockUploader.EXPECT().GetTableSchema(tableName).Return(schemaInWarehouse).AnyTimes()

return mockUploader
}
2 changes: 1 addition & 1 deletion warehouse/integrations/postgres/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ func (pg *Postgres) LoadUserTables(ctx context.Context) map[string]error {

identifiesSchemaInUpload := pg.Uploader.GetTableSchemaInUpload(warehouseutils.IdentifiesTable)
usersSchemaInUpload := pg.Uploader.GetTableSchemaInUpload(warehouseutils.UsersTable)
usersSchemaInWarehouse := pg.Uploader.GetTableSchemaInWarehouse(warehouseutils.UsersTable)
usersSchemaInWarehouse := pg.Uploader.GetTableSchema(warehouseutils.UsersTable)

var loadingError loadUsersTableResponse
_ = pg.DB.WithTx(ctx, func(tx *sqlmiddleware.Tx) error {
Expand Down
2 changes: 1 addition & 1 deletion warehouse/integrations/postgres/load_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ func TestLoadUsersTable(t *testing.T) {
}
mockUploader := mockuploader.NewMockUploader(ctrl)
mockUploader.EXPECT().GetTableSchemaInUpload(gomock.Any()).AnyTimes().DoAndReturn(f)
mockUploader.EXPECT().GetTableSchemaInWarehouse(gomock.Any()).AnyTimes().DoAndReturn(f)
mockUploader.EXPECT().GetTableSchema(gomock.Any()).AnyTimes().DoAndReturn(f)
mockUploader.EXPECT().CanAppend().AnyTimes().Return(true)

pg.DB = db
Expand Down
3 changes: 3 additions & 0 deletions warehouse/integrations/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,9 @@ func (pg *Postgres) FetchSchema(ctx context.Context) (model.Schema, error) {
}

func (pg *Postgres) Cleanup(context.Context) {
}

func (pg *Postgres) Close() {
if pg.DB != nil {
_ = pg.DB.Close()
}
Expand Down
6 changes: 3 additions & 3 deletions warehouse/integrations/postgres/postgres_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1324,8 +1324,8 @@ func TestIntegration(t *testing.T) {
mockUploader.EXPECT().GetLoadFilesMetadata(gomock.Any(), whutils.GetLoadFilesOptions{Table: whutils.IdentifiesTable}).Return([]whutils.LoadFile{{Location: identifiesUploadOutput.Location}}, nil).AnyTimes()
mockUploader.EXPECT().GetTableSchemaInUpload(whutils.UsersTable).Return(usersTableSchema).AnyTimes()
mockUploader.EXPECT().GetTableSchemaInUpload(whutils.IdentifiesTable).Return(IdentifiesTableSchema).AnyTimes()
mockUploader.EXPECT().GetTableSchemaInWarehouse(whutils.UsersTable).Return(usersTableSchema).AnyTimes()
mockUploader.EXPECT().GetTableSchemaInWarehouse(whutils.IdentifiesTable).Return(IdentifiesTableSchema).AnyTimes()
mockUploader.EXPECT().GetTableSchema(whutils.UsersTable).Return(usersTableSchema).AnyTimes()
mockUploader.EXPECT().GetTableSchema(whutils.IdentifiesTable).Return(IdentifiesTableSchema).AnyTimes()
mockUploader.EXPECT().CanAppend().Return(true).AnyTimes()

primaryPG := postgres.New(config.New(), logger.NOP, stats.NOP)
Expand Down Expand Up @@ -1420,7 +1420,7 @@ func mockUploader(
mockUploader.EXPECT().UseRudderStorage().Return(false).AnyTimes()
mockUploader.EXPECT().GetLoadFilesMetadata(gomock.Any(), gomock.Any()).Return(loadFiles, nil).AnyTimes() // Try removing this
mockUploader.EXPECT().GetTableSchemaInUpload(tableName).Return(schemaInUpload).AnyTimes()
mockUploader.EXPECT().GetTableSchemaInWarehouse(tableName).Return(schemaInWarehouse).AnyTimes()
mockUploader.EXPECT().GetTableSchema(tableName).Return(schemaInWarehouse).AnyTimes()
mockUploader.EXPECT().CanAppend().Return(true).AnyTimes()

return mockUploader
Expand Down
16 changes: 11 additions & 5 deletions warehouse/integrations/redshift/redshift.go
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,7 @@
_, identifyStagingTable, err = rs.loadTable(ctx,
warehouseutils.IdentifiesTable,
rs.Uploader.GetTableSchemaInUpload(warehouseutils.IdentifiesTable),
rs.Uploader.GetTableSchemaInWarehouse(warehouseutils.IdentifiesTable),
rs.Uploader.GetTableSchema(warehouseutils.IdentifiesTable),
true,
)
if err != nil {
Expand All @@ -760,7 +760,7 @@
_, _, err := rs.loadTable(ctx,
warehouseutils.UsersTable,
rs.Uploader.GetTableSchemaInUpload(warehouseutils.UsersTable),
rs.Uploader.GetTableSchemaInWarehouse(warehouseutils.UsersTable),
rs.Uploader.GetTableSchema(warehouseutils.UsersTable),
false,
)
if err != nil {
Expand All @@ -775,7 +775,7 @@
}
}

userColMap := rs.Uploader.GetTableSchemaInWarehouse(warehouseutils.UsersTable)
userColMap := rs.Uploader.GetTableSchema(warehouseutils.UsersTable)
for colName := range userColMap {
// do not reference uuid in queries as it can be an autoincrement field set by segment compatible tables
if colName == "id" || colName == "user_id" || colName == "uuid" {
Expand Down Expand Up @@ -1294,10 +1294,16 @@
logfield.Error, err.Error(),
)
}
_ = rs.DB.Close()
}
}

func (rs *Redshift) Close() {
if rs.DB == nil {
return
}

Check warning on line 1303 in warehouse/integrations/redshift/redshift.go

View check run for this annotation

Codecov / codecov/patch

warehouse/integrations/redshift/redshift.go#L1302-L1303

Added lines #L1302 - L1303 were not covered by tests
Comment on lines +1301 to +1303

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor:

if rs.DB != nil {
    _ = rs.DB.Close()
}

CC: @achettyiitr common interface might help reduce some duplicate code per warehouse. Just keeping a note of it.

_ = rs.DB.Close()
}

func (*Redshift) IsEmpty(context.Context, model.Warehouse) (empty bool, err error) {
return
}
Expand All @@ -1311,7 +1317,7 @@
ctx,
tableName,
rs.Uploader.GetTableSchemaInUpload(tableName),
rs.Uploader.GetTableSchemaInWarehouse(tableName),
rs.Uploader.GetTableSchema(tableName),
false,
)
return loadTableStat, err
Expand Down
Loading
Loading