Skip to content

Commit

Permalink
subset refactor - handle double references (#1967)
Browse files Browse the repository at this point in the history
  • Loading branch information
alishakawaguchi authored May 20, 2024
1 parent 450f1ac commit ee79e50
Show file tree
Hide file tree
Showing 8 changed files with 1,178 additions and 490 deletions.
59 changes: 59 additions & 0 deletions backend/pkg/sqlmanager/mock_SqlDatabase.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

35 changes: 35 additions & 0 deletions backend/pkg/sqlmanager/mysql-manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,41 @@ func (m *MysqlManager) GetForeignKeyConstraintsMap(ctx context.Context, schemas
return constraints, err
}

// Key is schema.table value is list of tables that key depends on
func (m *MysqlManager) GetForeignKeyReferencesMap(ctx context.Context, schemas []string) (map[string][]*ColumnConstraint, error) {
fkConstraints, err := m.GetForeignKeyConstraints(ctx, schemas)
if err != nil {
return nil, err
}
groupedFks := map[string][]*ForeignKeyConstraintsRow{} // grouped by constraint name
for _, row := range fkConstraints {
groupedFks[row.ConstraintName] = append(groupedFks[row.ConstraintName], row)
}
constraints := map[string][]*ColumnConstraint{}
for _, fks := range groupedFks {
cols := []string{}
notNullable := []bool{}
fkCols := []string{}
for _, fk := range fks {
cols = append(cols, fk.ColumnName)
notNullable = append(notNullable, !fk.IsNullable)
fkCols = append(fkCols, fk.ForeignColumnName)
}
row := fks[0]
tableName := BuildTable(row.SchemaName, row.TableName)
constraints[tableName] = append(constraints[tableName], &ColumnConstraint{
Columns: cols,
NotNullable: notNullable,
ForeignKey: &ReferenceKey{
Table: BuildTable(row.ForeignSchemaName, row.ForeignTableName),
Columns: fkCols,
},
})
}

return constraints, err
}

func (m *MysqlManager) GetPrimaryKeyConstraints(ctx context.Context, schemas []string) ([]*PrimaryKey, error) {
holder := make([][]*mysql_queries.GetPrimaryKeyConstraintsRow, len(schemas))
errgrp, errctx := errgroup.WithContext(ctx)
Expand Down
41 changes: 41 additions & 0 deletions backend/pkg/sqlmanager/postgres-manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,47 @@ func (p *PostgresManager) GetForeignKeyConstraintsMap(ctx context.Context, schem
return tableConstraints, nil
}

// Key is schema.table value is list of tables that key depends on
func (p *PostgresManager) GetForeignKeyReferencesMap(ctx context.Context, schemas []string) (map[string][]*ColumnConstraint, error) {
if len(schemas) == 0 {
return map[string][]*ColumnConstraint{}, nil
}
rows, err := p.querier.GetTableConstraintsBySchema(ctx, p.pool, schemas)
if err != nil && !nucleusdb.IsNoRows(err) {
return nil, err
} else if err != nil && nucleusdb.IsNoRows(err) {
return map[string][]*ColumnConstraint{}, nil
}

constraintRows := []*pg_queries.GetTableConstraintsBySchemaRow{}
for _, row := range rows {
if row.ConstraintType != "f" {
continue
}
constraintRows = append(constraintRows, row)
}
tableConstraints := map[string][]*ColumnConstraint{}
for _, row := range constraintRows {
if len(row.ConstraintColumns) != len(row.ForeignColumnNames) {
return nil, fmt.Errorf("length of columns was not equal to length of foreign key cols: %d %d", len(row.ConstraintColumns), len(row.ForeignColumnNames))
}
if len(row.ConstraintColumns) != len(row.Notnullable) {
return nil, fmt.Errorf("length of columns was not equal to length of not nullable cols: %d %d", len(row.ConstraintColumns), len(row.Notnullable))
}

tableName := BuildTable(row.SchemaName, row.TableName)
tableConstraints[tableName] = append(tableConstraints[tableName], &ColumnConstraint{
Columns: row.ConstraintColumns,
NotNullable: row.Notnullable,
ForeignKey: &ReferenceKey{
Table: BuildTable(row.ForeignSchemaName, row.ForeignTableName),
Columns: row.ForeignColumnNames,
},
})
}
return tableConstraints, nil
}

func (p *PostgresManager) GetPrimaryKeyConstraints(ctx context.Context, schemas []string) ([]*PrimaryKey, error) {
if len(schemas) == 0 {
return []*PrimaryKey{}, nil
Expand Down
11 changes: 11 additions & 0 deletions backend/pkg/sqlmanager/sql-manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,16 @@ type ForeignConstraint struct {
ForeignKey *ForeignKey
}

type ReferenceKey struct {
Table string
Columns []string
}
type ColumnConstraint struct {
Columns []string
NotNullable []bool
ForeignKey *ReferenceKey
}

type ColumnInfo struct {
OrdinalPosition int32 // Specifies the sequence or order in which each column is defined within the table. Starts at 1 for the first column.
ColumnDefault string // Specifies the default value for a column, if any is set.
Expand All @@ -51,6 +61,7 @@ type SqlDatabase interface {
GetSchemaColumnMap(ctx context.Context) (map[string]map[string]*ColumnInfo, error) // ex: {public.users: { id: struct{}{}, created_at: struct{}{}}}
GetForeignKeyConstraints(ctx context.Context, schemas []string) ([]*ForeignKeyConstraintsRow, error)
GetForeignKeyConstraintsMap(ctx context.Context, schemas []string) (map[string][]*ForeignConstraint, error)
GetForeignKeyReferencesMap(ctx context.Context, schemas []string) (map[string][]*ColumnConstraint, error)
GetPrimaryKeyConstraints(ctx context.Context, schemas []string) ([]*PrimaryKey, error)
GetPrimaryKeyConstraintsMap(ctx context.Context, schemas []string) (map[string][]string, error)
GetUniqueConstraintsMap(ctx context.Context, schemas []string) (map[string][][]string, error)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ func (b *benthosBuilder) GenerateBenthosConfigs(
return nil, errors.New(haltOnSchemaAdditionErrMsg)
}

// todo should use GetForeignKeyReferencesMap instead
tableDependencyMap, err := db.Db.GetForeignKeyConstraintsMap(ctx, uniqueSchemas)
if err != nil {
return nil, fmt.Errorf("unable to retrieve database foreign key constraints: %w", err)
Expand Down Expand Up @@ -176,11 +177,15 @@ func (b *benthosBuilder) GenerateBenthosConfigs(
if err != nil {
return nil, err
}
tableConstraintsSource = getForeignKeyToSourceMap(tableDependencyMap)

// reverse of table dependency
// map of foreign key to source table + column
tableConstraintsSource = getForeignKeyToSourceMap(tableDependencyMap)
tableQueryMap, err := buildSelectQueryMap(db.Driver, groupedTableMapping, sourceTableOpts, tableDependencyMap, runConfigs, sqlSourceOpts.SubsetByForeignKeyConstraints)
fkReferenceMap, err := db.Db.GetForeignKeyReferencesMap(ctx, uniqueSchemas)
if err != nil {
return nil, fmt.Errorf("unable to retrieve database foreign key constraints: %w", err)
}
tableQueryMap, err := buildSelectQueryMap(db.Driver, groupedTableMapping, sourceTableOpts, fkReferenceMap, runConfigs, sqlSourceOpts.SubsetByForeignKeyConstraints)
if err != nil {
return nil, fmt.Errorf("unable to build select queries: %w", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -727,6 +727,9 @@ func Test_BenthosBuilder_GenerateBenthosConfigs_PrimaryKey_Transformer_Pg_Pg(t *
mockSqlDb.On("GetForeignKeyConstraintsMap", mock.Anything, []string{"public"}).Return(map[string][]*sql_manager.ForeignConstraint{
"public.orders": {{Column: "buyer_id", IsNullable: false, ForeignKey: &sql_manager.ForeignKey{Table: "public.users", Column: "id"}}},
}, nil)
mockSqlDb.On("GetForeignKeyReferencesMap", mock.Anything, []string{"public"}).Return(map[string][]*sql_manager.ColumnConstraint{
"public.orders": {{Columns: []string{"buyer_id"}, NotNullable: []bool{true}, ForeignKey: &sql_manager.ReferenceKey{Table: "public.users", Columns: []string{"id"}}}},
}, nil)
bbuilder := newBenthosBuilder(mockSqlManager, mockJobClient, mockConnectionClient, mockTransformerClient, mockJobId, mockRunId, redisConfig, false)

resp, err := bbuilder.GenerateBenthosConfigs(
Expand Down Expand Up @@ -1000,6 +1003,9 @@ func Test_BenthosBuilder_GenerateBenthosConfigs_PrimaryKey_Passthrough_Pg_Pg(t *
mockSqlDb.On("GetForeignKeyConstraintsMap", mock.Anything, []string{"public"}).Return(map[string][]*sql_manager.ForeignConstraint{
"public.orders": {{Column: "buyer_id", IsNullable: false, ForeignKey: &sql_manager.ForeignKey{Table: "public.users", Column: "id"}}},
}, nil)
mockSqlDb.On("GetForeignKeyReferencesMap", mock.Anything, []string{"public"}).Return(map[string][]*sql_manager.ColumnConstraint{
"public.orders": {{Columns: []string{"buyer_id"}, NotNullable: []bool{true}, ForeignKey: &sql_manager.ReferenceKey{Table: "public.users", Columns: []string{"id"}}}},
}, nil)

bbuilder := newBenthosBuilder(mockSqlManager, mockJobClient, mockConnectionClient, mockTransformerClient, mockJobId, mockRunId, nil, false)

Expand Down Expand Up @@ -1240,6 +1246,9 @@ func Test_BenthosBuilder_GenerateBenthosConfigs_CircularDependency_PrimaryKey_Tr
mockSqlDb.On("GetForeignKeyConstraintsMap", mock.Anything, []string{"public"}).Return(map[string][]*sql_manager.ForeignConstraint{
"public.jobs": {{Column: "parent_id", IsNullable: true, ForeignKey: &sql_manager.ForeignKey{Table: "public.jobs", Column: "id"}}},
}, nil)
mockSqlDb.On("GetForeignKeyReferencesMap", mock.Anything, []string{"public"}).Return(map[string][]*sql_manager.ColumnConstraint{
"public.jobs": {{Columns: []string{"parent_id"}, NotNullable: []bool{false}, ForeignKey: &sql_manager.ReferenceKey{Table: "public.jobs", Columns: []string{"id"}}}},
}, nil)

bbuilder := newBenthosBuilder(mockSqlManager, mockJobClient, mockConnectionClient, mockTransformerClient, mockJobId, mockRunId, redisConfig, false)

Expand Down Expand Up @@ -1524,6 +1533,9 @@ func Test_BenthosBuilder_GenerateBenthosConfigs_Basic_Pg_Pg_With_Constraints(t *
mockSqlDb.On("GetForeignKeyConstraintsMap", mock.Anything, []string{"public"}).Return(map[string][]*sql_manager.ForeignConstraint{
"public.user_account_associations": {{Column: "user_id", IsNullable: false, ForeignKey: &sql_manager.ForeignKey{Table: "public.users", Column: "id"}}},
}, nil)
mockSqlDb.On("GetForeignKeyReferencesMap", mock.Anything, []string{"public"}).Return(map[string][]*sql_manager.ColumnConstraint{
"public.user_account_associations": {{Columns: []string{"user_id"}, NotNullable: []bool{true}, ForeignKey: &sql_manager.ReferenceKey{Table: "public.users", Columns: []string{"id"}}}},
}, nil)

bbuilder := newBenthosBuilder(mockSqlManager, mockJobClient, mockConnectionClient, mockTransformerClient, mockJobId, mockRunId, nil, false)

Expand Down Expand Up @@ -1690,6 +1702,10 @@ func Test_BenthosBuilder_GenerateBenthosConfigs_Basic_Pg_Pg_With_Circular_Depend
"public.user_account_associations": {{Column: "user_id", IsNullable: false, ForeignKey: &sql_manager.ForeignKey{Table: "public.users", Column: "id"}}},
"public.users": {{Column: "user_assoc_id", IsNullable: true, ForeignKey: &sql_manager.ForeignKey{Table: "public.user_account_associations", Column: "id"}}},
}, nil)
mockSqlDb.On("GetForeignKeyReferencesMap", mock.Anything, []string{"public"}).Return(map[string][]*sql_manager.ColumnConstraint{
"public.user_account_associations": {{Columns: []string{"user_id"}, NotNullable: []bool{true}, ForeignKey: &sql_manager.ReferenceKey{Table: "public.users", Columns: []string{"id"}}}},
"public.users": {{Columns: []string{"user_assoc_id"}, NotNullable: []bool{false}, ForeignKey: &sql_manager.ReferenceKey{Table: "public.user_account_associations", Columns: []string{"id"}}}},
}, nil)

bbuilder := newBenthosBuilder(mockSqlManager, mockJobClient, mockConnectionClient, mockTransformerClient, mockJobId, mockRunId, nil, false)

Expand Down Expand Up @@ -2034,6 +2050,11 @@ func Test_BenthosBuilder_GenerateBenthosConfigs_Basic_Pg_Pg_With_Circular_Depend
"public.users": {{Column: "user_assoc_id", IsNullable: true, ForeignKey: &sql_manager.ForeignKey{Table: "public.user_account_associations", Column: "id"}}},
}, nil)

mockSqlDb.On("GetForeignKeyReferencesMap", mock.Anything, []string{"public"}).Return(map[string][]*sql_manager.ColumnConstraint{
"public.user_account_associations": {{Columns: []string{"user_id"}, NotNullable: []bool{true}, ForeignKey: &sql_manager.ReferenceKey{Table: "public.users", Columns: []string{"id"}}}},
"public.users": {{Columns: []string{"user_assoc_id"}, NotNullable: []bool{false}, ForeignKey: &sql_manager.ReferenceKey{Table: "public.user_account_associations", Columns: []string{"id"}}}},
}, nil)

bbuilder := newBenthosBuilder(mockSqlManager, mockJobClient, mockConnectionClient, mockTransformerClient, mockJobId, mockRunId, nil, false)

resp, err := bbuilder.GenerateBenthosConfigs(
Expand Down Expand Up @@ -2390,6 +2411,9 @@ func Test_BenthosBuilder_GenerateBenthosConfigs_Basic_Mysql_Mysql(t *testing.T)
mockSqlDb.On("GetForeignKeyConstraintsMap", mock.Anything, []string{"public"}).Return(map[string][]*sql_manager.ForeignConstraint{
"public.user_account_associations": {{Column: "user_id", IsNullable: false, ForeignKey: &sql_manager.ForeignKey{Table: "public.users", Column: "id"}}},
}, nil)
mockSqlDb.On("GetForeignKeyReferencesMap", mock.Anything, []string{"public"}).Return(map[string][]*sql_manager.ColumnConstraint{
"public.user_account_associations": {{Columns: []string{"user_id"}, NotNullable: []bool{true}, ForeignKey: &sql_manager.ReferenceKey{Table: "public.users", Columns: []string{"id"}}}},
}, nil)

bbuilder := newBenthosBuilder(mockSqlManager, mockJobClient, mockConnectionClient, mockTransformersClient, mockJobId, mockRunId, nil, false)

Expand Down Expand Up @@ -2649,6 +2673,10 @@ func Test_BenthosBuilder_GenerateBenthosConfigs_Basic_Mysql_Mysql_With_Circular_
"public.user_account_associations": {{Column: "user_id", IsNullable: false, ForeignKey: &sql_manager.ForeignKey{Table: "public.users", Column: "id"}}},
"public.users": {{Column: "user_assoc_id", IsNullable: true, ForeignKey: &sql_manager.ForeignKey{Table: "public.user_account_associations", Column: "id"}}},
}, nil)
mockSqlDb.On("GetForeignKeyReferencesMap", mock.Anything, []string{"public"}).Return(map[string][]*sql_manager.ColumnConstraint{
"public.user_account_associations": {{Columns: []string{"user_id"}, NotNullable: []bool{true}, ForeignKey: &sql_manager.ReferenceKey{Table: "public.users", Columns: []string{"id"}}}},
"public.users": {{Columns: []string{"user_assoc_id"}, NotNullable: []bool{false}, ForeignKey: &sql_manager.ReferenceKey{Table: "public.user_account_associations", Columns: []string{"id"}}}},
}, nil)

bbuilder := newBenthosBuilder(mockSqlManager, mockJobClient, mockConnectionClient, mockTransformerClient, mockJobId, mockRunId, nil, false)

Expand Down
Loading

0 comments on commit ee79e50

Please sign in to comment.