Skip to content

Commit

Permalink
feat: sync external table with existing table schema
Browse files Browse the repository at this point in the history
  • Loading branch information
WantToBePro31 committed Jan 22, 2025
1 parent 37ddc80 commit b269743
Show file tree
Hide file tree
Showing 10 changed files with 184 additions and 67 deletions.
2 changes: 1 addition & 1 deletion ext/store/maxcompute/external_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func buildExternalTableSchema(t *ExternalTable, location string) (tableschema.Ta
func externalTableColumns(t *ExternalTable, schemaBuilder *tableschema.SchemaBuilder) error {
partitionColNames := map[string]struct{}{}

return t.Schema.ToMaxComputeColumns(partitionColNames, nil, schemaBuilder)
return t.Schema.ToMaxComputeColumns(partitionColNames, nil, schemaBuilder, "external")
}

func ToOtherExternalSQLString(projectName, schemaName string, serdeProperties map[string]string, schema tableschema.TableSchema, format string) (string, error) {
Expand Down
70 changes: 38 additions & 32 deletions ext/store/maxcompute/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (s Schema) Validate() error {
return nil
}

func (s Schema) ToMaxComputeColumns(partitionColumn map[string]struct{}, clusterColumn *Cluster, schemaBuilder *tableschema.SchemaBuilder) error {
func (s Schema) ToMaxComputeColumns(partitionColumn map[string]struct{}, clusterColumn *Cluster, schemaBuilder *tableschema.SchemaBuilder, tableType string) error {
mu := errors.NewMultiError("converting to max compute column")

clusterColumnAllowed := map[string]struct{}{}
Expand All @@ -45,47 +45,53 @@ func (s Schema) ToMaxComputeColumns(partitionColumn map[string]struct{}, cluster
}

if _, ok := partitionColumn[f.Name]; ok {
schemaBuilder.PartitionColumn(column)
schemaBuilder.PartitionColumn(tableschema.Column{
Name: "_partition_value",
Type: datatype.StringType,
GenerateExpression: tableschema.NewTruncTime(column.Name, tableschema.DAY),
})
} else {
schemaBuilder.Column(column)
clusterColumnAllowed[column.Name] = struct{}{}
}
schemaBuilder.Column(column)
}

if clusterColumn != nil && len(clusterColumn.Using) != 0 {
if clusterColumn.Type == "" {
clusterColumn.Type = tableschema.CLUSTER_TYPE.Hash
}
schemaBuilder.ClusterType(clusterColumn.Type)

if clusterColumn.Type == tableschema.CLUSTER_TYPE.Hash {
if clusterColumn.Buckets == 0 {
mu.Append(errors.InvalidArgument(resourceSchema, "number of cluster buckets is needed for hash type clustering"))
return mu.ToErr()
if tableType == "" || tableType == "common" {
if clusterColumn != nil && len(clusterColumn.Using) != 0 {
if clusterColumn.Type == "" {
clusterColumn.Type = tableschema.CLUSTER_TYPE.Hash
}
schemaBuilder.ClusterBucketNum(clusterColumn.Buckets)
}
schemaBuilder.ClusterType(clusterColumn.Type)

sortClusterAllowed := map[string]struct{}{}
for _, column := range clusterColumn.Using {
if _, ok := clusterColumnAllowed[column]; !ok {
mu.Append(errors.InvalidArgument(resourceSchema, fmt.Sprintf("cluster column %s not found in normal column", column)))
return mu.ToErr()
if clusterColumn.Type == tableschema.CLUSTER_TYPE.Hash {
if clusterColumn.Buckets == 0 {
mu.Append(errors.InvalidArgument(resourceSchema, "number of cluster buckets is needed for hash type clustering"))
return mu.ToErr()
}
schemaBuilder.ClusterBucketNum(clusterColumn.Buckets)
}
sortClusterAllowed[column] = struct{}{}
}
schemaBuilder.ClusterColumns(clusterColumn.Using)

if len(clusterColumn.SortBy) != 0 {
var sortClusterColumn []tableschema.SortColumn
for _, sortColumn := range clusterColumn.SortBy {
if _, ok := sortClusterAllowed[sortColumn.Name]; !ok {
mu.Append(errors.InvalidArgument(resourceSchema, fmt.Sprintf("sort column %s not found in cluster column", sortColumn.Name)))
sortClusterAllowed := map[string]struct{}{}
for _, column := range clusterColumn.Using {
if _, ok := clusterColumnAllowed[column]; !ok {
mu.Append(errors.InvalidArgument(resourceSchema, fmt.Sprintf("cluster column %s not found in normal column", column)))
return mu.ToErr()
}
sortClusterColumn = append(sortClusterColumn, tableschema.SortColumn{Name: sortColumn.Name, Order: tableschema.SortOrder(sortColumn.Order)})
sortClusterAllowed[column] = struct{}{}
}
schemaBuilder.ClusterColumns(clusterColumn.Using)

if len(clusterColumn.SortBy) != 0 {
var sortClusterColumn []tableschema.SortColumn
for _, sortColumn := range clusterColumn.SortBy {
if _, ok := sortClusterAllowed[sortColumn.Name]; !ok {
mu.Append(errors.InvalidArgument(resourceSchema, fmt.Sprintf("sort column %s not found in cluster column", sortColumn.Name)))
return mu.ToErr()
}
sortClusterColumn = append(sortClusterColumn, tableschema.SortColumn{Name: sortColumn.Name, Order: tableschema.SortOrder(sortColumn.Order)})
}
schemaBuilder.ClusterSortColumns(sortClusterColumn)
}
schemaBuilder.ClusterSortColumns(sortClusterColumn)
}
}

Expand Down Expand Up @@ -193,11 +199,11 @@ func (f *Field) ToColumn() (tableschema.Column, error) {
Type: dataType,
Comment: f.Description,
ExtendedLabels: nil,
IsNullable: true,
NotNull: false,
}

if f.Required {
c1.IsNullable = false
c1.NotNull = true
}

if f.DefaultValue != "" {
Expand Down
94 changes: 85 additions & 9 deletions ext/store/maxcompute/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestSchemaToMaxComputeColumn(t *testing.T) {
},
}

err := schema.ToMaxComputeColumns(emptyPartitionColumnName, nil, nil)
err := schema.ToMaxComputeColumns(emptyPartitionColumnName, nil, nil, "common")
assert.NotNil(t, err)
assert.ErrorContains(t, err, "unknown data type")
})
Expand All @@ -65,7 +65,7 @@ func TestSchemaToMaxComputeColumn(t *testing.T) {
},
}

err := schema.ToMaxComputeColumns(emptyPartitionColumnName, nil, nil)
err := schema.ToMaxComputeColumns(emptyPartitionColumnName, nil, nil, "common")
assert.NotNil(t, err)
assert.ErrorContains(t, err, "unknown data type")
})
Expand All @@ -83,7 +83,7 @@ func TestSchemaToMaxComputeColumn(t *testing.T) {
},
}

err := schema.ToMaxComputeColumns(emptyPartitionColumnName, nil, nil)
err := schema.ToMaxComputeColumns(emptyPartitionColumnName, nil, nil, "common")
assert.NotNil(t, err)
assert.ErrorContains(t, err, "unknown data type")
})
Expand Down Expand Up @@ -119,7 +119,7 @@ func TestSchemaToMaxComputeColumn(t *testing.T) {
},
}

err := schema.ToMaxComputeColumns(emptyPartitionColumnName, nil, nil)
err := schema.ToMaxComputeColumns(emptyPartitionColumnName, nil, nil, "common")
assert.NotNil(t, err)
assert.ErrorContains(t, err, "unknown data type")
})
Expand All @@ -141,7 +141,7 @@ func TestSchemaToMaxComputeColumn(t *testing.T) {
SortBy: []maxcompute.SortColumn{{Name: "name", Order: "asc"}},
}

err := schema.ToMaxComputeColumns(emptyPartitionColumnName, clusterColumns, builder)
err := schema.ToMaxComputeColumns(emptyPartitionColumnName, clusterColumns, builder, "common")
assert.NotNil(t, err)
assert.ErrorContains(t, err, "number of cluster buckets is needed for hash type clustering")
})
Expand All @@ -164,7 +164,7 @@ func TestSchemaToMaxComputeColumn(t *testing.T) {
Buckets: 5,
}

err := schema.ToMaxComputeColumns(emptyPartitionColumnName, clusterColumns, builder)
err := schema.ToMaxComputeColumns(emptyPartitionColumnName, clusterColumns, builder, "common")
assert.NotNil(t, err)
assert.ErrorContains(t, err, fmt.Sprintf("cluster column %s not found in normal column", invalidClusterColumn))
})
Expand All @@ -188,11 +188,11 @@ func TestSchemaToMaxComputeColumn(t *testing.T) {
Buckets: 5,
}

err := schema.ToMaxComputeColumns(emptyPartitionColumnName, clusterColumns, builder)
err := schema.ToMaxComputeColumns(emptyPartitionColumnName, clusterColumns, builder, "common")
assert.NotNil(t, err)
assert.ErrorContains(t, err, fmt.Sprintf("sort column %s not found in cluster column", invalidSortClusterColumn))
})
t.Run("return success when schema column is valid", func(t *testing.T) {
t.Run("return success when schema column is valid for common table", func(t *testing.T) {
builder := tableschema.NewSchemaBuilder()
schema := maxcompute.Schema{
{
Expand Down Expand Up @@ -265,7 +265,83 @@ func TestSchemaToMaxComputeColumn(t *testing.T) {
Buckets: 5,
}

err := schema.ToMaxComputeColumns(partitionColumnName, clusterColumns, builder)
err := schema.ToMaxComputeColumns(partitionColumnName, clusterColumns, builder, "common")
assert.Nil(t, err)
})
t.Run("return success when schema column is valid for delta table", func(t *testing.T) {
builder := tableschema.NewSchemaBuilder()
schema := maxcompute.Schema{
{
Name: "name",
Required: true,
DefaultValue: "test",
Type: "char",
Char: &maxcompute.Char{Length: 255},
Labels: []string{"owner", "member"},
},
{
Name: "introduction",
Type: "varchar",
VarChar: &maxcompute.VarChar{Length: 300},
},
{
Name: "age",
Type: "int",
},
{
Name: "weight",
Type: "decimal",
Decimal: &maxcompute.Decimal{Precision: 2, Scale: 1},
},
{
Name: "friends",
Type: "array",
ArraySchema: &maxcompute.Field{
Type: "string",
},
},
{
Name: "address",
Type: "struct",
StructSchema: []maxcompute.Field{
{
Name: "city",
Type: "string",
},
{
Name: "zip",
Type: "string",
},
},
},
{
Name: "other",
Type: "map",
MapSchema: &maxcompute.MapSchema{
Key: maxcompute.Field{
Type: "string",
},
Value: maxcompute.Field{
Type: "string",
},
},
},
{
Name: "data",
Type: "json",
},
}
partitionColumnName := map[string]struct{}{
"data": {},
}

clusterColumns := &maxcompute.Cluster{
Using: []string{"name", "age"},
SortBy: []maxcompute.SortColumn{{Name: "name", Order: "asc"}},
Buckets: 5,
}

err := schema.ToMaxComputeColumns(partitionColumnName, clusterColumns, builder, "delta")
assert.Nil(t, err)
})
}
Expand Down
19 changes: 11 additions & 8 deletions ext/store/maxcompute/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ package maxcompute

import (
"fmt"
"github.com/aliyun/aliyun-odps-go-sdk/odps/common"
"strings"

"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/common"
"github.com/aliyun/aliyun-odps-go-sdk/odps/datatype"
"github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema"

Expand Down Expand Up @@ -133,7 +133,10 @@ func buildTableSchema(t *Table) (tableschema.TableSchema, error) {
return tableschema.TableSchema{}, err
}

return builder.Build(), nil
if t.Type == "" || t.Type == "common" {
return builder.Build(), nil
}
return builder.TblProperties(map[string]string{"transactional": "true"}).Build(), nil
}

func populateColumns(t *Table, schemaBuilder *tableschema.SchemaBuilder) error {
Expand All @@ -142,7 +145,7 @@ func populateColumns(t *Table, schemaBuilder *tableschema.SchemaBuilder) error {
partitionColNames = utils.ListToMap(t.Partition.Columns)
}

return t.Schema.ToMaxComputeColumns(partitionColNames, t.Cluster, schemaBuilder)
return t.Schema.ToMaxComputeColumns(partitionColNames, t.Cluster, schemaBuilder, t.Type)
}

func generateUpdateQuery(incoming, existing tableschema.TableSchema, schemaName string) ([]string, error) {
Expand Down Expand Up @@ -181,7 +184,7 @@ func trackSchema(parent string, column tableschema.Column, columnCollection map[
Name: column.Name,
Type: column.Type,
Comment: column.Comment,
IsNullable: true,
NotNull: false,
HasDefaultValue: false,
}, columnCollection, columnList, isExistingTable)

Expand All @@ -198,7 +201,7 @@ func trackSchema(parent string, column tableschema.Column, columnCollection map[
tableschema.Column{
Name: field.Name,
Type: field.Type,
IsNullable: true,
NotNull: false,
HasDefaultValue: false,
},
columnCollection,
Expand Down Expand Up @@ -240,7 +243,7 @@ func getNormalColumnDifferences(tableName, schemaName string, incoming []ColumnR
for _, incomingColumnRecord := range incoming {
columnFound, ok := existing[incomingColumnRecord.columnStructure]
if !ok {
if !incomingColumnRecord.columnValue.IsNullable {
if incomingColumnRecord.columnValue.NotNull {
return fmt.Errorf("unable to add new required column")
}
segment := fmt.Sprintf("if not exists %s %s", incomingColumnRecord.columnStructure, incomingColumnRecord.columnValue.Type.Name())
Expand All @@ -251,9 +254,9 @@ func getNormalColumnDifferences(tableName, schemaName string, incoming []ColumnR
continue
}

if columnFound.IsNullable && !incomingColumnRecord.columnValue.IsNullable {
if !columnFound.NotNull && incomingColumnRecord.columnValue.NotNull {
return fmt.Errorf("unable to modify column mode from nullable to required")
} else if !columnFound.IsNullable && incomingColumnRecord.columnValue.IsNullable {
} else if columnFound.NotNull && !incomingColumnRecord.columnValue.NotNull {
*sqlTasks = append(*sqlTasks, fmt.Sprintf("alter table %s.%s change column %s null;", schemaName, tableName, columnFound.Name))
}

Expand Down
1 change: 1 addition & 0 deletions ext/store/maxcompute/table_spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type Table struct {
Cluster *Cluster `mapstructure:"cluster,omitempty"`
Partition *Partition `mapstructure:"partition,omitempty"`
Lifecycle int `mapstructure:"lifecycle,omitempty"`
Type string `mapstructure:"type,omitempty"`

Hints map[string]string `mapstructure:"hints,omitempty"`
ExtraConfig map[string]interface{} `mapstructure:",remain"`
Expand Down
Loading

0 comments on commit b269743

Please sign in to comment.