diff --git a/ext/store/maxcompute/client.go b/ext/store/maxcompute/client.go index 6085cfcf18..02f77f6154 100644 --- a/ext/store/maxcompute/client.go +++ b/ext/store/maxcompute/client.go @@ -38,19 +38,24 @@ func NewClient(svcAccount string) (*MaxComputeClient, error) { aliAccount := account.NewAliyunAccount(cred.AccessID, cred.AccessKey) odpsIns := odps.NewOdps(aliAccount, cred.Endpoint) - odpsIns.SetDefaultProjectName(cred.ProjectName) return &MaxComputeClient{odpsIns}, nil } -func (c *MaxComputeClient) TableHandleFrom() TableResourceHandle { +func (c *MaxComputeClient) TableHandleFrom(projectSchema ProjectSchema) TableResourceHandle { + c.SetDefaultProjectName(projectSchema.Project) + c.SetCurrentSchemaName(projectSchema.Schema) + s := c.Schemas() t := c.Tables() - return NewTableHandle(c, &t) + return NewTableHandle(c, s, t) } -func (c *MaxComputeClient) ViewHandleFrom() TableResourceHandle { +func (c *MaxComputeClient) ViewHandleFrom(projectSchema ProjectSchema) TableResourceHandle { + c.SetDefaultProjectName(projectSchema.Project) + c.SetCurrentSchemaName(projectSchema.Schema) + s := c.Schemas() t := c.Tables() - return NewViewHandle(c, &t) + return NewViewHandle(c, s, t) } func collectMaxComputeCredential(jsonData []byte) (*maxComputeCredentials, error) { diff --git a/ext/store/maxcompute/client_test.go b/ext/store/maxcompute/client_test.go index 65b9fd2e7f..bf9647e378 100644 --- a/ext/store/maxcompute/client_test.go +++ b/ext/store/maxcompute/client_test.go @@ -35,7 +35,10 @@ func TestMaxComputeClient(t *testing.T) { client, err := maxcompute.NewClient(testCredJSON) assert.Nil(t, err) - tableHandle := client.TableHandleFrom() + projectSchema, err := maxcompute.ProjectSchemaFrom("proj", "schema") + assert.Nil(t, err) + + tableHandle := client.TableHandleFrom(projectSchema) assert.NotNil(t, tableHandle) }) }) @@ -44,7 +47,10 @@ func TestMaxComputeClient(t *testing.T) { client, err := maxcompute.NewClient(testCredJSON) assert.Nil(t, err) - viewHandle := client.ViewHandleFrom() + projectSchema, err := maxcompute.ProjectSchemaFrom("proj", "schema") + assert.Nil(t, err) + + viewHandle := client.ViewHandleFrom(projectSchema) assert.NotNil(t, viewHandle) }) }) diff --git a/ext/store/maxcompute/maxcompute.go b/ext/store/maxcompute/maxcompute.go index 67d892bf40..9787ba0059 100644 --- a/ext/store/maxcompute/maxcompute.go +++ b/ext/store/maxcompute/maxcompute.go @@ -17,8 +17,6 @@ const ( store = "MaxComputeStore" maxcomputeID = "maxcompute" - - TableNameSections = 3 ) type ResourceHandle interface { @@ -32,8 +30,8 @@ type TableResourceHandle interface { } type Client interface { - TableHandleFrom() TableResourceHandle - ViewHandleFrom() TableResourceHandle + TableHandleFrom(projectSchema ProjectSchema) TableResourceHandle + ViewHandleFrom(projectSchema ProjectSchema) TableResourceHandle } type ClientProvider interface { @@ -63,13 +61,18 @@ func (m MaxCompute) Create(ctx context.Context, res *resource.Resource) error { return err } + projectSchema, _, err := getCompleteComponentName(res) + if err != nil { + return err + } + switch res.Kind() { case KindTable: - handle := odpsClient.TableHandleFrom() + handle := odpsClient.TableHandleFrom(projectSchema) return handle.Create(res) case KindView: - handle := odpsClient.ViewHandleFrom() + handle := odpsClient.ViewHandleFrom(projectSchema) return handle.Create(res) default: @@ -77,11 +80,11 @@ func (m MaxCompute) Create(ctx context.Context, res *resource.Resource) error { } } -func (m MaxCompute) Update(ctx context.Context, resource *resource.Resource) error { +func (m MaxCompute) Update(ctx context.Context, res *resource.Resource) error { spanCtx, span := startChildSpan(ctx, "maxcompute/UpdateResource") defer span.End() - account, err := m.secretProvider.GetSecret(spanCtx, resource.Tenant(), accountKey) + account, err := m.secretProvider.GetSecret(spanCtx, res.Tenant(), accountKey) if err != nil { return err } @@ -91,17 +94,22 @@ func (m MaxCompute) Update(ctx context.Context, resource *resource.Resource) err return err } - switch resource.Kind() { + projectSchema, _, err := getCompleteComponentName(res) + if err != nil { + return err + } + + switch res.Kind() { case KindTable: - handle := odpsClient.TableHandleFrom() - return handle.Update(resource) + handle := odpsClient.TableHandleFrom(projectSchema) + return handle.Update(res) case KindView: - handle := odpsClient.ViewHandleFrom() - return handle.Update(resource) + handle := odpsClient.ViewHandleFrom(projectSchema) + return handle.Update(res) default: - return errors.InvalidArgument(store, "invalid kind for maxcompute resource "+resource.Kind()) + return errors.InvalidArgument(store, "invalid kind for maxcompute resource "+res.Kind()) } } @@ -164,7 +172,12 @@ func (m MaxCompute) Exist(ctx context.Context, tnnt tenant.Tenant, urn resource. return false, err } - kindToHandleFn := map[string]func() TableResourceHandle{ + projectSchema, err := ProjectSchemaFor(name) + if err != nil { + return false, err + } + + kindToHandleFn := map[string]func(projectSchema ProjectSchema) TableResourceHandle{ KindTable: client.TableHandleFrom, KindView: client.ViewHandleFrom, } @@ -175,7 +188,7 @@ func (m MaxCompute) Exist(ctx context.Context, tnnt tenant.Tenant, urn resource. return true, err } - if resourceHandleFn().Exists(resourceName) { + if resourceHandleFn(projectSchema).Exists(resourceName) { return true, nil } } diff --git a/ext/store/maxcompute/maxcompute_test.go b/ext/store/maxcompute/maxcompute_test.go index 570e84c336..fb57034c1c 100644 --- a/ext/store/maxcompute/maxcompute_test.go +++ b/ext/store/maxcompute/maxcompute_test.go @@ -15,8 +15,9 @@ import ( func TestMaxComputeStore(t *testing.T) { ctx := context.Background() - tableName := "test_table" - tnnt, _ := tenant.NewTenant("proj", "ns") + projectName, schemaName, tableName := "proj", "schema", "test_table" + fullName := projectName + "." + schemaName + "." + tableName + tnnt, _ := tenant.NewTenant(projectName, "ns") pts, _ := tenant.NewPlainTextSecret("secret_name", "secret_value") store := resource.MaxCompute metadata := resource.Metadata{ @@ -36,7 +37,7 @@ func TestMaxComputeStore(t *testing.T) { clientProvider := new(mockClientProvider) mcStore := maxcompute.NewMaxComputeDataStore(secretProvider, clientProvider) - res, err := resource.NewResource(tableName, maxcompute.KindTable, store, tnnt, &metadata, spec) + res, err := resource.NewResource(fullName, maxcompute.KindTable, store, tnnt, &metadata, spec) assert.Nil(t, err) err = mcStore.Create(ctx, res) @@ -50,17 +51,38 @@ func TestMaxComputeStore(t *testing.T) { defer secretProvider.AssertExpectations(t) clientProvider := new(mockClientProvider) - clientProvider.On("Get", "secret_value").Return(nil, errors.New("error in client")) + clientProvider.On("Get", pts.Value()).Return(nil, errors.New("error in client")) defer clientProvider.AssertExpectations(t) mcStore := maxcompute.NewMaxComputeDataStore(secretProvider, clientProvider) - res, err := resource.NewResource(tableName, maxcompute.KindTable, store, tnnt, &metadata, spec) + res, err := resource.NewResource(fullName, maxcompute.KindTable, store, tnnt, &metadata, spec) assert.Nil(t, err) err = mcStore.Create(ctx, res) assert.NotNil(t, err) assert.EqualError(t, err, "error in client") }) + t.Run("returns error when schema name is empty", func(t *testing.T) { + secretProvider := new(mockSecretProvider) + secretProvider.On("GetSecret", mock.Anything, tnnt, "DATASTORE_MAXCOMPUTE"). + Return(pts, nil) + defer secretProvider.AssertExpectations(t) + + client := new(mockClient) + defer client.AssertExpectations(t) + + clientProvider := new(mockClientProvider) + clientProvider.On("Get", pts.Value()).Return(client, nil) + defer clientProvider.AssertExpectations(t) + mcStore := maxcompute.NewMaxComputeDataStore(secretProvider, clientProvider) + + res, err := resource.NewResource(projectName, maxcompute.KindTable, store, tnnt, &metadata, spec) + assert.Nil(t, err) + + err = mcStore.Create(ctx, res) + assert.NotNil(t, err) + assert.ErrorContains(t, err, "invalid schema name: "+projectName) + }) t.Run("returns error when kind is invalid", func(t *testing.T) { secretProvider := new(mockSecretProvider) secretProvider.On("GetSecret", mock.Anything, tnnt, "DATASTORE_MAXCOMPUTE"). @@ -71,12 +93,12 @@ func TestMaxComputeStore(t *testing.T) { defer client.AssertExpectations(t) clientProvider := new(mockClientProvider) - clientProvider.On("Get", "secret_value").Return(client, nil) + clientProvider.On("Get", pts.Value()).Return(client, nil) defer clientProvider.AssertExpectations(t) mcStore := maxcompute.NewMaxComputeDataStore(secretProvider, clientProvider) - res, err := resource.NewResource(tableName, "unknown", store, tnnt, &metadata, spec) + res, err := resource.NewResource(fullName, "unknown", store, tnnt, &metadata, spec) assert.Nil(t, err) err = mcStore.Create(ctx, res) @@ -89,7 +111,7 @@ func TestMaxComputeStore(t *testing.T) { Return(pts, nil) defer secretProvider.AssertExpectations(t) - res, err := resource.NewResource(tableName, maxcompute.KindTable, store, tnnt, &metadata, spec) + res, err := resource.NewResource(fullName, maxcompute.KindTable, store, tnnt, &metadata, spec) assert.Nil(t, err) tableHandle := new(mockTableResourceHandle) @@ -97,11 +119,11 @@ func TestMaxComputeStore(t *testing.T) { defer tableHandle.AssertExpectations(t) client := new(mockClient) - client.On("TableHandleFrom").Return(tableHandle) + client.On("TableHandleFrom", mock.Anything).Return(tableHandle) defer client.AssertExpectations(t) clientProvider := new(mockClientProvider) - clientProvider.On("Get", "secret_value").Return(client, nil) + clientProvider.On("Get", pts.Value()).Return(client, nil) defer clientProvider.AssertExpectations(t) mcStore := maxcompute.NewMaxComputeDataStore(secretProvider, clientProvider) @@ -114,7 +136,7 @@ func TestMaxComputeStore(t *testing.T) { Return(pts, nil) defer secretProvider.AssertExpectations(t) - res, err := resource.NewResource(tableName, maxcompute.KindView, store, tnnt, &metadata, spec) + res, err := resource.NewResource(fullName, maxcompute.KindView, store, tnnt, &metadata, spec) assert.Nil(t, err) tableHandle := new(mockTableResourceHandle) @@ -122,11 +144,11 @@ func TestMaxComputeStore(t *testing.T) { defer tableHandle.AssertExpectations(t) client := new(mockClient) - client.On("ViewHandleFrom").Return(tableHandle) + client.On("ViewHandleFrom", mock.Anything).Return(tableHandle) defer client.AssertExpectations(t) clientProvider := new(mockClientProvider) - clientProvider.On("Get", "secret_value").Return(client, nil) + clientProvider.On("Get", pts.Value()).Return(client, nil) defer clientProvider.AssertExpectations(t) mcStore := maxcompute.NewMaxComputeDataStore(secretProvider, clientProvider) @@ -144,7 +166,7 @@ func TestMaxComputeStore(t *testing.T) { clientProvider := new(mockClientProvider) mcStore := maxcompute.NewMaxComputeDataStore(secretProvider, clientProvider) - res, err := resource.NewResource(tableName, maxcompute.KindTable, store, tnnt, &metadata, spec) + res, err := resource.NewResource(fullName, maxcompute.KindTable, store, tnnt, &metadata, spec) assert.Nil(t, err) err = mcStore.Update(ctx, res) @@ -158,17 +180,38 @@ func TestMaxComputeStore(t *testing.T) { defer secretProvider.AssertExpectations(t) clientProvider := new(mockClientProvider) - clientProvider.On("Get", "secret_value").Return(nil, errors.New("error in client")) + clientProvider.On("Get", pts.Value()).Return(nil, errors.New("error in client")) defer clientProvider.AssertExpectations(t) mcStore := maxcompute.NewMaxComputeDataStore(secretProvider, clientProvider) - res, err := resource.NewResource(tableName, maxcompute.KindTable, store, tnnt, &metadata, spec) + res, err := resource.NewResource(fullName, maxcompute.KindTable, store, tnnt, &metadata, spec) assert.Nil(t, err) err = mcStore.Update(ctx, res) assert.NotNil(t, err) assert.EqualError(t, err, "error in client") }) + t.Run("returns error when schema name is empty", func(t *testing.T) { + secretProvider := new(mockSecretProvider) + secretProvider.On("GetSecret", mock.Anything, tnnt, "DATASTORE_MAXCOMPUTE"). + Return(pts, nil) + defer secretProvider.AssertExpectations(t) + + client := new(mockClient) + defer client.AssertExpectations(t) + + clientProvider := new(mockClientProvider) + clientProvider.On("Get", pts.Value()).Return(client, nil) + defer clientProvider.AssertExpectations(t) + mcStore := maxcompute.NewMaxComputeDataStore(secretProvider, clientProvider) + + res, err := resource.NewResource(projectName, maxcompute.KindTable, store, tnnt, &metadata, spec) + assert.Nil(t, err) + + err = mcStore.Update(ctx, res) + assert.NotNil(t, err) + assert.ErrorContains(t, err, "invalid schema name: "+projectName) + }) t.Run("returns error when kind is invalid", func(t *testing.T) { secretProvider := new(mockSecretProvider) secretProvider.On("GetSecret", mock.Anything, tnnt, "DATASTORE_MAXCOMPUTE"). @@ -179,12 +222,12 @@ func TestMaxComputeStore(t *testing.T) { defer client.AssertExpectations(t) clientProvider := new(mockClientProvider) - clientProvider.On("Get", "secret_value").Return(client, nil) + clientProvider.On("Get", pts.Value()).Return(client, nil) defer clientProvider.AssertExpectations(t) mcStore := maxcompute.NewMaxComputeDataStore(secretProvider, clientProvider) - res, err := resource.NewResource(tableName, "unknown", store, tnnt, &metadata, spec) + res, err := resource.NewResource(fullName, "unknown", store, tnnt, &metadata, spec) assert.Nil(t, err) err = mcStore.Update(ctx, res) @@ -197,7 +240,7 @@ func TestMaxComputeStore(t *testing.T) { Return(pts, nil) defer secretProvider.AssertExpectations(t) - res, err := resource.NewResource(tableName, maxcompute.KindTable, store, tnnt, &metadata, spec) + res, err := resource.NewResource(fullName, maxcompute.KindTable, store, tnnt, &metadata, spec) assert.Nil(t, err) tableHandle := new(mockTableResourceHandle) @@ -205,11 +248,11 @@ func TestMaxComputeStore(t *testing.T) { defer tableHandle.AssertExpectations(t) client := new(mockClient) - client.On("TableHandleFrom").Return(tableHandle) + client.On("TableHandleFrom", mock.Anything).Return(tableHandle) defer client.AssertExpectations(t) clientProvider := new(mockClientProvider) - clientProvider.On("Get", "secret_value").Return(client, nil) + clientProvider.On("Get", pts.Value()).Return(client, nil) defer clientProvider.AssertExpectations(t) mcStore := maxcompute.NewMaxComputeDataStore(secretProvider, clientProvider) @@ -222,7 +265,7 @@ func TestMaxComputeStore(t *testing.T) { Return(pts, nil) defer secretProvider.AssertExpectations(t) - res, err := resource.NewResource(tableName, maxcompute.KindView, store, tnnt, &metadata, spec) + res, err := resource.NewResource(fullName, maxcompute.KindView, store, tnnt, &metadata, spec) assert.Nil(t, err) tableHandle := new(mockTableResourceHandle) @@ -230,11 +273,11 @@ func TestMaxComputeStore(t *testing.T) { defer tableHandle.AssertExpectations(t) client := new(mockClient) - client.On("ViewHandleFrom").Return(tableHandle) + client.On("ViewHandleFrom", mock.Anything).Return(tableHandle) defer client.AssertExpectations(t) clientProvider := new(mockClientProvider) - clientProvider.On("Get", "secret_value").Return(client, nil) + clientProvider.On("Get", pts.Value()).Return(client, nil) defer clientProvider.AssertExpectations(t) mcStore := maxcompute.NewMaxComputeDataStore(secretProvider, clientProvider) @@ -248,7 +291,7 @@ func TestMaxComputeStore(t *testing.T) { } specWithoutValues := map[string]any{"a": "b"} t.Run("returns error when resource kind is invalid", func(t *testing.T) { - res, err := resource.NewResource(tableName, "unknown", store, tnnt, &metadata, invalidSpec) + res, err := resource.NewResource(fullName, "unknown", store, tnnt, &metadata, invalidSpec) assert.Nil(t, err) mcStore := maxcompute.NewMaxComputeDataStore(nil, nil) @@ -258,61 +301,61 @@ func TestMaxComputeStore(t *testing.T) { }) t.Run("for table", func(t *testing.T) { t.Run("returns error when cannot decode table", func(t *testing.T) { - res, err := resource.NewResource(tableName, maxcompute.KindTable, store, tnnt, &metadata, invalidSpec) + res, err := resource.NewResource(fullName, maxcompute.KindTable, store, tnnt, &metadata, invalidSpec) assert.Nil(t, err) - assert.Equal(t, tableName, res.FullName()) + assert.Equal(t, fullName, res.FullName()) mcStore := maxcompute.NewMaxComputeDataStore(nil, nil) err = mcStore.Validate(res) assert.NotNil(t, err) - assert.ErrorContains(t, err, "not able to decode spec for "+tableName) + assert.ErrorContains(t, err, "not able to decode spec for "+fullName) }) t.Run("returns error when decode empty table schema", func(t *testing.T) { - res, err := resource.NewResource(tableName, maxcompute.KindTable, store, tnnt, &metadata, specWithoutValues) + res, err := resource.NewResource(fullName, maxcompute.KindTable, store, tnnt, &metadata, specWithoutValues) assert.Nil(t, err) - assert.Equal(t, tableName, res.FullName()) + assert.Equal(t, fullName, res.FullName()) mcStore := maxcompute.NewMaxComputeDataStore(nil, nil) err = mcStore.Validate(res) assert.NotNil(t, err) - assert.ErrorContains(t, err, "empty schema for table "+tableName) + assert.ErrorContains(t, err, "empty schema for table "+fullName) }) }) t.Run("for view", func(t *testing.T) { t.Run("returns error when cannot decode view", func(t *testing.T) { - res, err := resource.NewResource(tableName, maxcompute.KindView, store, tnnt, &metadata, invalidSpec) + res, err := resource.NewResource(fullName, maxcompute.KindView, store, tnnt, &metadata, invalidSpec) assert.Nil(t, err) - assert.Equal(t, tableName, res.FullName()) + assert.Equal(t, fullName, res.FullName()) mcStore := maxcompute.NewMaxComputeDataStore(nil, nil) err = mcStore.Validate(res) assert.NotNil(t, err) - assert.ErrorContains(t, err, "not able to decode spec for "+tableName) + assert.ErrorContains(t, err, "not able to decode spec for "+fullName) }) t.Run("returns error when decode empty view schema", func(t *testing.T) { - res, err := resource.NewResource(tableName, maxcompute.KindView, store, tnnt, &metadata, specWithoutValues) + res, err := resource.NewResource(fullName, maxcompute.KindView, store, tnnt, &metadata, specWithoutValues) assert.Nil(t, err) - assert.Equal(t, tableName, res.FullName()) + assert.Equal(t, fullName, res.FullName()) mcStore := maxcompute.NewMaxComputeDataStore(nil, nil) err = mcStore.Validate(res) assert.NotNil(t, err) - assert.ErrorContains(t, err, "view query is empty for "+tableName) + assert.ErrorContains(t, err, "view query is empty for "+fullName) }) }) }) t.Run("GetURN", func(t *testing.T) { spec := map[string]any{ "description": "resource", - "project": "proj", - "database": "schema", + "project": projectName, + "database": schemaName, "name": tableName, } t.Run("returns urn for resource", func(t *testing.T) { - expectedURN, err := resource.ParseURN("maxcompute://proj.schema." + tableName) + expectedURN, err := resource.ParseURN("maxcompute://" + projectName + "." + schemaName + "." + tableName) assert.NoError(t, err) - res, err := resource.NewResource("proj.schema."+tableName, maxcompute.KindTable, store, tnnt, &metadata, spec) + res, err := resource.NewResource(projectName+"."+schemaName+"."+tableName, maxcompute.KindTable, store, tnnt, &metadata, spec) assert.NoError(t, err) mcStore := maxcompute.NewMaxComputeDataStore(nil, nil) @@ -349,7 +392,7 @@ func TestMaxComputeStore(t *testing.T) { mcStore := maxcompute.NewMaxComputeDataStore(secretProvider, clientProvider) - urn, err := resource.NewURN("maxcompute", "project.table") + urn, err := resource.NewURN("maxcompute", "project.schema.table") assert.NoError(t, err) actualExist, actualError := mcStore.Exist(ctx, tnnt, urn) @@ -362,18 +405,40 @@ func TestMaxComputeStore(t *testing.T) { defer secretProvider.AssertExpectations(t) clientProvider := new(mockClientProvider) - clientProvider.On("Get", "secret_value").Return(nil, errors.New("error in client")) + clientProvider.On("Get", pts.Value()).Return(nil, errors.New("error in client")) defer clientProvider.AssertExpectations(t) mcStore := maxcompute.NewMaxComputeDataStore(secretProvider, clientProvider) - urn, err := resource.NewURN("maxcompute", "project.table") + urn, err := resource.NewURN("maxcompute", "project.schema.table") assert.NoError(t, err) actualExist, actualError := mcStore.Exist(ctx, tnnt, urn) assert.False(t, actualExist) assert.ErrorContains(t, actualError, "error in client") }) + t.Run("returns error when schema name is empty", func(t *testing.T) { + secretProvider := new(mockSecretProvider) + secretProvider.On("GetSecret", mock.Anything, tnnt, "DATASTORE_MAXCOMPUTE"). + Return(pts, nil) + defer secretProvider.AssertExpectations(t) + + client := new(mockClient) + defer client.AssertExpectations(t) + + clientProvider := new(mockClientProvider) + clientProvider.On("Get", pts.Value()).Return(client, nil) + defer clientProvider.AssertExpectations(t) + + mcStore := maxcompute.NewMaxComputeDataStore(secretProvider, clientProvider) + + urn, err := resource.NewURN("maxcompute", projectName) + assert.NoError(t, err) + + actualExist, actualError := mcStore.Exist(ctx, tnnt, urn) + assert.False(t, actualExist) + assert.ErrorContains(t, actualError, "invalid schema name: "+projectName) + }) t.Run("returns true and error when resource name is invalid", func(t *testing.T) { secretProvider := new(mockSecretProvider) secretProvider.On("GetSecret", mock.Anything, tnnt, "DATASTORE_MAXCOMPUTE").Return(pts, nil) @@ -391,12 +456,12 @@ func TestMaxComputeStore(t *testing.T) { mcStore := maxcompute.NewMaxComputeDataStore(secretProvider, clientProvider) - urn, err := resource.NewURN("maxcompute", "table") + urn, err := resource.NewURN("maxcompute", "project.table") assert.NoError(t, err) actualExist, actualError := mcStore.Exist(ctx, tnnt, urn) assert.True(t, actualExist) - assert.ErrorContains(t, actualError, "invalid resource name: table") + assert.ErrorContains(t, actualError, "invalid resource name: project.table") }) t.Run("returns true and error when resource name is empty", func(t *testing.T) { secretProvider := new(mockSecretProvider) @@ -415,12 +480,12 @@ func TestMaxComputeStore(t *testing.T) { mcStore := maxcompute.NewMaxComputeDataStore(secretProvider, clientProvider) - urn, err := resource.NewURN("maxcompute", "project.") + urn, err := resource.NewURN("maxcompute", "project.schema.") assert.NoError(t, err) actualExist, actualError := mcStore.Exist(ctx, tnnt, urn) assert.True(t, actualExist) - assert.ErrorContains(t, actualError, "invalid resource name: project.") + assert.ErrorContains(t, actualError, "invalid resource name: project.schema.") }) t.Run("returns true and nil when schema table resource does exist", func(t *testing.T) { secretProvider := new(mockSecretProvider) @@ -446,9 +511,9 @@ func TestMaxComputeStore(t *testing.T) { urn, err := resource.NewURN("maxcompute", "project.schema.table") assert.NoError(t, err) - client.On("TableHandleFrom").Return(tableHandle).Maybe() + client.On("TableHandleFrom", mock.Anything).Return(tableHandle).Maybe() tableHandle.On("Exists", mock.Anything).Return(false).Maybe() - client.On("ViewHandleFrom").Return(viewHandle) + client.On("ViewHandleFrom", mock.Anything).Return(viewHandle) viewHandle.On("Exists", mock.Anything).Return(true) actualExist, actualError := mcStore.Exist(ctx, tnnt, urn) @@ -479,9 +544,9 @@ func TestMaxComputeStore(t *testing.T) { urn, err := resource.NewURN("maxcompute", "project.schema.table") assert.NoError(t, err) - client.On("TableHandleFrom").Return(tableHandle).Maybe() + client.On("TableHandleFrom", mock.Anything).Return(tableHandle).Maybe() tableHandle.On("Exists", mock.Anything).Return(false).Maybe() - client.On("ViewHandleFrom").Return(viewHandle).Maybe() + client.On("ViewHandleFrom", mock.Anything).Return(viewHandle).Maybe() viewHandle.On("Exists", mock.Anything).Return(false).Maybe() actualExist, actualError := mcStore.Exist(ctx, tnnt, urn) @@ -514,13 +579,13 @@ type mockClient struct { mock.Mock } -func (m *mockClient) TableHandleFrom() maxcompute.TableResourceHandle { - args := m.Called() +func (m *mockClient) TableHandleFrom(projectSchema maxcompute.ProjectSchema) maxcompute.TableResourceHandle { + args := m.Called(projectSchema) return args.Get(0).(maxcompute.TableResourceHandle) } -func (m *mockClient) ViewHandleFrom() maxcompute.TableResourceHandle { - args := m.Called() +func (m *mockClient) ViewHandleFrom(projectSchema maxcompute.ProjectSchema) maxcompute.TableResourceHandle { + args := m.Called(projectSchema) return args.Get(0).(maxcompute.TableResourceHandle) } diff --git a/ext/store/maxcompute/resource_urn.go b/ext/store/maxcompute/resource_urn.go index 7ded6b8bad..9d56f89a3a 100644 --- a/ext/store/maxcompute/resource_urn.go +++ b/ext/store/maxcompute/resource_urn.go @@ -10,6 +10,14 @@ import ( "github.com/goto/optimus/internal/errors" ) +const ( + EntityProject = "project" + EntitySchema = "schema" + + ProjectSchemaSections = 2 + TableNameSections = 3 +) + type ResourceURN struct { Project string `mapstructure:"project"` Schema string `mapstructure:"database"` @@ -86,6 +94,34 @@ type ProjectSchema struct { Schema string } +func ProjectSchemaFrom(project, schemaName string) (ProjectSchema, error) { + if project == "" { + return ProjectSchema{}, errors.InvalidArgument(EntityProject, "maxcompute project name is empty") + } + + if schemaName == "" { + return ProjectSchema{}, errors.InvalidArgument(EntitySchema, "maxcompute schema name is empty") + } + + return ProjectSchema{ + Project: project, + Schema: schemaName, + }, nil +} + +func (ps ProjectSchema) FullName() string { + return ps.Project + "." + ps.Schema +} + +func ProjectSchemaFor(name resource.Name) (ProjectSchema, error) { + parts := strings.Split(name.String(), ".") + if len(parts) < ProjectSchemaSections { + return ProjectSchema{}, errors.InvalidArgument(EntitySchema, "invalid schema name: "+name.String()) + } + + return ProjectSchemaFrom(parts[0], parts[1]) +} + type ResourceURNs []ResourceURN func (n ResourceURNs) GroupByProjectschema() map[ProjectSchema][]string { @@ -116,18 +152,39 @@ func URNFor(res *resource.Resource) (resource.URN, error) { func getURNComponent(res *resource.Resource) (ResourceURN, error) { var spec ResourceURN if err := mapstructure.Decode(res.Spec(), &spec); err != nil { - return spec, err + msg := fmt.Sprintf("%s: not able to decode spec for %s", err, res.FullName()) + return spec, errors.InvalidArgument(resource.EntityResource, msg) } return spec, nil } -func getComponentName(res *resource.Resource) (resource.Name, error) { - component, err := getURNComponent(res) +func getCompleteComponentName(res *resource.Resource) (ProjectSchema, resource.Name, error) { + if res.Version() == resource.ResourceSpecV2 { + mcURN, err := getURNComponent(res) + if err != nil { + return ProjectSchema{}, "", err + } + + projectSchema, err := ProjectSchemaFrom(mcURN.Project, mcURN.Schema) + if err != nil { + return ProjectSchema{}, "", err + } + + return projectSchema, resource.Name(mcURN.Name), nil + } + + projectSchema, err := ProjectSchemaFor(res.Name()) + if err != nil { + return ProjectSchema{}, "", err + } + + resourceName, err := resourceNameFor(res.Name()) if err != nil { - return "", err + return ProjectSchema{}, "", err } - return resource.Name(component.Name), nil + + return projectSchema, resource.Name(resourceName), nil } func resourceNameFor(name resource.Name) (string, error) { @@ -136,5 +193,9 @@ func resourceNameFor(name resource.Name) (string, error) { return "", errors.InvalidArgument(resource.EntityResource, "invalid resource name: "+name.String()) } + if parts[2] == "" { + return "", errors.InvalidArgument(resource.EntityResource, "invalid resource name: "+name.String()) + } + return parts[2], nil } diff --git a/ext/store/maxcompute/schema_test.go b/ext/store/maxcompute/schema_test.go index 1589384dd9..1814f0064d 100644 --- a/ext/store/maxcompute/schema_test.go +++ b/ext/store/maxcompute/schema_test.go @@ -141,7 +141,7 @@ func TestSchemaToMaxComputeColumn(t *testing.T) { SortBy: []maxcompute.SortColumn{{Name: "name", Order: "asc"}}, } - err := schema.ToMaxComputeColumns(emptyPartitionColumnName, clusterColumns, &builder) + err := schema.ToMaxComputeColumns(emptyPartitionColumnName, clusterColumns, builder) assert.NotNil(t, err) assert.ErrorContains(t, err, "number of cluster buckets is needed for hash type clustering") }) @@ -164,7 +164,7 @@ func TestSchemaToMaxComputeColumn(t *testing.T) { Buckets: 5, } - err := schema.ToMaxComputeColumns(emptyPartitionColumnName, clusterColumns, &builder) + err := schema.ToMaxComputeColumns(emptyPartitionColumnName, clusterColumns, builder) assert.NotNil(t, err) assert.ErrorContains(t, err, fmt.Sprintf("cluster column %s not found in normal column", invalidClusterColumn)) }) @@ -188,7 +188,7 @@ func TestSchemaToMaxComputeColumn(t *testing.T) { Buckets: 5, } - err := schema.ToMaxComputeColumns(emptyPartitionColumnName, clusterColumns, &builder) + err := schema.ToMaxComputeColumns(emptyPartitionColumnName, clusterColumns, builder) assert.NotNil(t, err) assert.ErrorContains(t, err, fmt.Sprintf("sort column %s not found in cluster column", invalidSortClusterColumn)) }) @@ -265,7 +265,7 @@ func TestSchemaToMaxComputeColumn(t *testing.T) { Buckets: 5, } - err := schema.ToMaxComputeColumns(partitionColumnName, clusterColumns, &builder) + err := schema.ToMaxComputeColumns(partitionColumnName, clusterColumns, builder) assert.Nil(t, err) }) } diff --git a/ext/store/maxcompute/table.go b/ext/store/maxcompute/table.go index 6e860189fd..cf468cce07 100644 --- a/ext/store/maxcompute/table.go +++ b/ext/store/maxcompute/table.go @@ -20,15 +20,21 @@ type ColumnRecord struct { type McSQLExecutor interface { ExecSQlWithHints(sql string, hints map[string]string) (*odps.Instance, error) + CurrentSchemaName() string +} + +type McSchema interface { + Create(schemaName string, createIfNotExists bool, comment string) error } type McTable interface { Create(schema tableschema.TableSchema, createIfNotExists bool, hints, alias map[string]string) error - BatchLoadTables(tableNames []string) ([]odps.Table, error) + BatchLoadTables(tableNames []string) ([]*odps.Table, error) } type TableHandle struct { mcSQLExecutor McSQLExecutor + mcSchema McSchema mcTable McTable } @@ -37,18 +43,22 @@ func (t TableHandle) Create(res *resource.Resource) error { if err != nil { return err } - name, err := getComponentName(res) + + _, table.Name, err = getCompleteComponentName(res) if err != nil { return err } - table.Name = name - schema, err := buildTableSchema(table) + if err := t.mcSchema.Create(t.mcSQLExecutor.CurrentSchemaName(), true, ""); err != nil { + return errors.InternalError(EntitySchema, "error while creating schema on maxcompute", err) + } + + tableSchema, err := buildTableSchema(table) if err != nil { return errors.AddErrContext(err, EntityTable, "failed to build table schema to create for "+res.FullName()) } - err = t.mcTable.Create(schema, false, table.Hints, nil) + err = t.mcTable.Create(tableSchema, false, table.Hints, nil) if err != nil { if strings.Contains(err.Error(), "Table or view already exists") { return errors.AlreadyExists(EntityTable, "table already exists on maxcompute: "+res.FullName()) @@ -59,7 +69,7 @@ func (t TableHandle) Create(res *resource.Resource) error { } func (t TableHandle) Update(res *resource.Resource) error { - tableName, err := getComponentName(res) + projectSchema, tableName, err := getCompleteComponentName(res) if err != nil { return err } @@ -68,11 +78,7 @@ func (t TableHandle) Update(res *resource.Resource) error { if err != nil { return errors.InternalError(EntityTable, "error while get table on maxcompute", err) } - - existingSchema, err := existing[0].GetSchema() - if err != nil { - return errors.AddErrContext(err, EntityTable, "failed to get old table schema to update for "+res.FullName()) - } + existingSchema := existing[0].Schema() table, err := ConvertSpecTo[Table](res) if err != nil { @@ -84,12 +90,12 @@ func (t TableHandle) Update(res *resource.Resource) error { } table.Hints["odps.sql.schema.evolution.json.enable"] = "true" - schema, err := buildTableSchema(table) + tableSchema, err := buildTableSchema(table) if err != nil { return errors.AddErrContext(err, EntityTable, "failed to build table schema to update for "+res.FullName()) } - sqlTasks, err := generateUpdateQuery(schema, *existingSchema) + sqlTasks, err := generateUpdateQuery(tableSchema, existingSchema, projectSchema.Schema) if err != nil { return errors.AddErrContext(err, EntityTable, "invalid schema for table "+res.FullName()) } @@ -100,8 +106,7 @@ func (t TableHandle) Update(res *resource.Resource) error { return errors.AddErrContext(err, EntityTable, "failed to create sql task to update for "+res.FullName()) } - err = ins.WaitForSuccess() - if err != nil { + if err = ins.WaitForSuccess(); err != nil { return errors.InternalError(EntityTable, "error while execute sql query on maxcompute", err) } } @@ -121,7 +126,7 @@ func buildTableSchema(t *Table) (tableschema.TableSchema, error) { Comment(t.Description). Lifecycle(t.Lifecycle) - err := populateColumns(t, &builder) + err := populateColumns(t, builder) if err != nil { return tableschema.TableSchema{}, err } @@ -138,20 +143,20 @@ func populateColumns(t *Table, schemaBuilder *tableschema.SchemaBuilder) error { return t.Schema.ToMaxComputeColumns(partitionColNames, t.Cluster, schemaBuilder) } -func generateUpdateQuery(incoming, existing tableschema.TableSchema) ([]string, error) { +func generateUpdateQuery(incoming, existing tableschema.TableSchema, schemaName string) ([]string, error) { var sqlTasks []string if incoming.Comment != existing.Comment { - sqlTasks = append(sqlTasks, fmt.Sprintf("alter table %s set comment '%s';", existing.TableName, incoming.Comment)) + sqlTasks = append(sqlTasks, fmt.Sprintf("alter table %s.%s set comment '%s';", schemaName, existing.TableName, incoming.Comment)) } if incoming.Lifecycle != existing.Lifecycle { - sqlTasks = append(sqlTasks, fmt.Sprintf("alter table %s set lifecycle %d;", existing.TableName, incoming.Lifecycle)) + sqlTasks = append(sqlTasks, fmt.Sprintf("alter table %s.%s set lifecycle %d;", schemaName, existing.TableName, incoming.Lifecycle)) } _, incomingFlattenSchema := flattenSchema(incoming, false) existingFlattenSchema, _ := flattenSchema(existing, true) - if err := getNormalColumnDifferences(existing.TableName, incomingFlattenSchema, existingFlattenSchema, &sqlTasks); err != nil { + if err := getNormalColumnDifferences(existing.TableName, schemaName, incomingFlattenSchema, existingFlattenSchema, &sqlTasks); err != nil { return []string{}, err } @@ -228,7 +233,7 @@ func specifyColumnStructure(parent, columnName string, isArrayStruct bool) strin return fmt.Sprintf("%s.%s", parent, columnName) } -func getNormalColumnDifferences(tableName string, incoming []ColumnRecord, existing map[string]tableschema.Column, sqlTasks *[]string) error { +func getNormalColumnDifferences(tableName, schemaName string, incoming []ColumnRecord, existing map[string]tableschema.Column, sqlTasks *[]string) error { var columnAddition []string for _, incomingColumnRecord := range incoming { columnFound, ok := existing[incomingColumnRecord.columnStructure] @@ -250,7 +255,7 @@ func getNormalColumnDifferences(tableName string, incoming []ColumnRecord, exist if columnFound.IsNullable && !incomingColumnRecord.columnValue.IsNullable { return fmt.Errorf("unable to modify column mode from nullable to required") } else if !columnFound.IsNullable && incomingColumnRecord.columnValue.IsNullable { - *sqlTasks = append(*sqlTasks, fmt.Sprintf("alter table %s change column %s null;", tableName, columnFound.Name)) + *sqlTasks = append(*sqlTasks, fmt.Sprintf("alter table %s.%s change column %s null;", schemaName, tableName, columnFound.Name)) } if columnFound.Type.ID() != incomingColumnRecord.columnValue.Type.ID() { @@ -258,8 +263,8 @@ func getNormalColumnDifferences(tableName string, incoming []ColumnRecord, exist } if incomingColumnRecord.columnValue.Comment != columnFound.Comment { - *sqlTasks = append(*sqlTasks, fmt.Sprintf("alter table %s change column %s %s %s comment '%s';", - tableName, columnFound.Name, incomingColumnRecord.columnValue.Name, columnFound.Type, incomingColumnRecord.columnValue.Comment)) + *sqlTasks = append(*sqlTasks, fmt.Sprintf("alter table %s.%s change column %s %s %s comment '%s';", + schemaName, tableName, columnFound.Name, incomingColumnRecord.columnValue.Name, columnFound.Type, incomingColumnRecord.columnValue.Comment)) } delete(existing, incomingColumnRecord.columnStructure) } @@ -272,7 +277,7 @@ func getNormalColumnDifferences(tableName string, incoming []ColumnRecord, exist if len(columnAddition) > 0 { for _, segment := range columnAddition { - addColumnQuery := fmt.Sprintf("alter table %s add column ", tableName) + segment + ";" + addColumnQuery := fmt.Sprintf("alter table %s.%s add column ", schemaName, tableName) + segment + ";" *sqlTasks = append(*sqlTasks, addColumnQuery) } } @@ -280,6 +285,6 @@ func getNormalColumnDifferences(tableName string, incoming []ColumnRecord, exist return nil } -func NewTableHandle(mcSQLExecutor McSQLExecutor, mc McTable) *TableHandle { - return &TableHandle{mcSQLExecutor: mcSQLExecutor, mcTable: mc} +func NewTableHandle(mcSQLExecutor McSQLExecutor, mcSchema McSchema, mcTable McTable) *TableHandle { + return &TableHandle{mcSQLExecutor: mcSQLExecutor, mcSchema: mcSchema, mcTable: mcTable} } diff --git a/ext/store/maxcompute/table_test.go b/ext/store/maxcompute/table_test.go index fd89f9e2bb..3120427ee7 100644 --- a/ext/store/maxcompute/table_test.go +++ b/ext/store/maxcompute/table_test.go @@ -27,16 +27,25 @@ func (m *mockMaxComputeTable) Create(schema tableschema.TableSchema, createIfNot return args.Error(0) } -func (m *mockMaxComputeTable) BatchLoadTables(tableNames []string) ([]odps.Table, error) { +func (m *mockMaxComputeTable) BatchLoadTables(tableNames []string) ([]*odps.Table, error) { args := m.Called(tableNames) - return args.Get(0).([]odps.Table), args.Error(1) + return args.Get(0).([]*odps.Table), args.Error(1) +} + +type mockMaxComputeSchema struct { + mock.Mock +} + +func (m *mockMaxComputeSchema) Create(schemaName string, createIfNotExists bool, comment string) error { + args := m.Called(schemaName, createIfNotExists, comment) + return args.Error(0) } type mockOdpsIns struct { mock.Mock } -func (m *mockOdpsIns) ExecSQl(sql string) (*odps.Instance, error) { // nolint +func (m *mockOdpsIns) ExecSQl(sql string, hints ...map[string]string) (*odps.Instance, error) { // nolint args := m.Called(sql) return args.Get(0).(*odps.Instance), args.Error(1) } @@ -46,6 +55,11 @@ func (m *mockOdpsIns) ExecSQlWithHints(sql string, hints map[string]string) (*od return args.Get(0).(*odps.Instance), args.Error(1) } +func (m *mockOdpsIns) CurrentSchemaName() string { + args := m.Called() + return args.String(0) +} + func TestTableHandle(t *testing.T) { accessID, accessKey, endpoint := "LNRJ5tH1XMSINW5J3TjYAvfX", "lAZBJhdkNbwVj3bej5BuhjwbdV0nSp", "http://service.ap-southeast-5.maxcompute.aliyun.com/api" projectName, schemaName, tableName := "proj", "schema", "test_table" @@ -58,15 +72,18 @@ func TestTableHandle(t *testing.T) { Labels: map[string]string{"owner": "optimus"}, } - normalTables := []odps.Table{ - odps.NewTable(odps.NewOdps(account.NewAliyunAccount(accessID, accessKey), endpoint), projectName, tableName), + odpsInstance := odps.NewInstance(odps.NewOdps(account.NewAliyunAccount(accessID, accessKey), endpoint), projectName, "") + + normalTables := []*odps.Table{ + odps.NewTable(odps.NewOdps(account.NewAliyunAccount(accessID, accessKey), endpoint), projectName, schemaName, tableName), } t.Run("Create", func(t *testing.T) { t.Run("returns error when cannot convert spec", func(t *testing.T) { table := new(mockMaxComputeTable) + schema := new(mockMaxComputeSchema) odpsIns := new(mockOdpsIns) - tableHandle := maxcompute.NewTableHandle(odpsIns, table) + tableHandle := maxcompute.NewTableHandle(odpsIns, schema, table) spec := map[string]any{"description": []string{"test create"}} res, err := resource.NewResource(fullName, maxcompute.KindTable, mcStore, tnnt, &metadata, spec) @@ -76,10 +93,63 @@ func TestTableHandle(t *testing.T) { assert.NotNil(t, err) assert.ErrorContains(t, err, "not able to decode spec for "+fullName) }) - t.Run("returns error when use invalid schema data type", func(t *testing.T) { + t.Run("returns error when table name is empty", func(t *testing.T) { table := new(mockMaxComputeTable) + schema := new(mockMaxComputeSchema) odpsIns := new(mockOdpsIns) - tableHandle := maxcompute.NewTableHandle(odpsIns, table) + tableHandle := maxcompute.NewTableHandle(odpsIns, schema, table) + + spec := map[string]any{ + "description": "test create", + "schema": []map[string]any{ + { + "name": "customer_id", + "type": "STRING", + }, + }, + } + res, err := resource.NewResource(projectName+"."+schemaName, maxcompute.KindTable, mcStore, tnnt, &metadata, spec) + assert.Nil(t, err) + + err = tableHandle.Create(res) + assert.NotNil(t, err) + assert.ErrorContains(t, err, "invalid resource name: "+projectName+"."+schemaName) + }) + t.Run("returns error when failed to create schema", func(t *testing.T) { + table := new(mockMaxComputeTable) + schema := new(mockMaxComputeSchema) + schema.On("Create", schemaName, true, mock.Anything).Return(fmt.Errorf("error while creating schema on maxcompute")) + defer schema.AssertExpectations(t) + odpsIns := new(mockOdpsIns) + odpsIns.On("CurrentSchemaName").Return(schemaName) + defer odpsIns.AssertExpectations(t) + tableHandle := maxcompute.NewTableHandle(odpsIns, schema, table) + + spec := map[string]any{ + "description": "test create", + "schema": []map[string]any{ + { + "name": "customer_id", + "type": "STRING", + }, + }, + } + res, err := resource.NewResource(fullName, maxcompute.KindTable, mcStore, tnnt, &metadata, spec) + assert.Nil(t, err) + + err = tableHandle.Create(res) + assert.NotNil(t, err) + assert.ErrorContains(t, err, "error while creating schema on maxcompute") + }) + t.Run("returns error when use invalid table schema data type", func(t *testing.T) { + table := new(mockMaxComputeTable) + schema := new(mockMaxComputeSchema) + schema.On("Create", schemaName, true, mock.Anything).Return(nil) + defer schema.AssertExpectations(t) + odpsIns := new(mockOdpsIns) + odpsIns.On("CurrentSchemaName").Return(schemaName) + defer odpsIns.AssertExpectations(t) + tableHandle := maxcompute.NewTableHandle(odpsIns, schema, table) spec := map[string]any{ "description": "test create", @@ -105,8 +175,13 @@ func TestTableHandle(t *testing.T) { table := new(mockMaxComputeTable) table.On("Create", mock.Anything, false, emptyStringMap, emptyStringMap).Return(existTableErr) defer table.AssertExpectations(t) + schema := new(mockMaxComputeSchema) + schema.On("Create", schemaName, true, mock.Anything).Return(nil) + defer schema.AssertExpectations(t) odpsIns := new(mockOdpsIns) - tableHandle := maxcompute.NewTableHandle(odpsIns, table) + odpsIns.On("CurrentSchemaName").Return(schemaName) + defer odpsIns.AssertExpectations(t) + tableHandle := maxcompute.NewTableHandle(odpsIns, schema, table) spec := map[string]any{ "description": "test create", @@ -131,8 +206,13 @@ func TestTableHandle(t *testing.T) { table := new(mockMaxComputeTable) table.On("Create", mock.Anything, false, emptyStringMap, emptyStringMap).Return(errors.New("some error")) defer table.AssertExpectations(t) + schema := new(mockMaxComputeSchema) + schema.On("Create", schemaName, true, mock.Anything).Return(nil) + defer schema.AssertExpectations(t) odpsIns := new(mockOdpsIns) - tableHandle := maxcompute.NewTableHandle(odpsIns, table) + odpsIns.On("CurrentSchemaName").Return(schemaName) + defer odpsIns.AssertExpectations(t) + tableHandle := maxcompute.NewTableHandle(odpsIns, schema, table) spec := map[string]any{ "description": "test create", @@ -157,8 +237,13 @@ func TestTableHandle(t *testing.T) { table := new(mockMaxComputeTable) table.On("Create", mock.Anything, false, emptyStringMap, emptyStringMap).Return(nil) defer table.AssertExpectations(t) + schema := new(mockMaxComputeSchema) + schema.On("Create", schemaName, true, mock.Anything).Return(nil) + defer schema.AssertExpectations(t) odpsIns := new(mockOdpsIns) - tableHandle := maxcompute.NewTableHandle(odpsIns, table) + odpsIns.On("CurrentSchemaName").Return(schemaName) + defer odpsIns.AssertExpectations(t) + tableHandle := maxcompute.NewTableHandle(odpsIns, schema, table) spec := map[string]any{ "description": "test create", @@ -189,12 +274,35 @@ func TestTableHandle(t *testing.T) { }) t.Run("Update", func(t *testing.T) { + t.Run("returns error when table name is empty", func(t *testing.T) { + table := new(mockMaxComputeTable) + schema := new(mockMaxComputeSchema) + odpsIns := new(mockOdpsIns) + tableHandle := maxcompute.NewTableHandle(odpsIns, schema, table) + + spec := map[string]any{ + "description": "test create", + "schema": []map[string]any{ + { + "name": "customer_id", + "type": "STRING", + }, + }, + } + res, err := resource.NewResource(projectName+"."+schemaName, maxcompute.KindTable, mcStore, tnnt, &metadata, spec) + assert.Nil(t, err) + + err = tableHandle.Update(res) + assert.NotNil(t, err) + assert.ErrorContains(t, err, "invalid resource name: "+projectName+"."+schemaName) + }) t.Run("returns error when table is not found on maxcompute", func(t *testing.T) { table := new(mockMaxComputeTable) - table.On("BatchLoadTables", mock.Anything).Return([]odps.Table{}, fmt.Errorf("table not found")) + table.On("BatchLoadTables", mock.Anything).Return(normalTables, fmt.Errorf("table not found")) defer table.AssertExpectations(t) + schema := new(mockMaxComputeSchema) odpsIns := new(mockOdpsIns) - tableHandle := maxcompute.NewTableHandle(odpsIns, table) + tableHandle := maxcompute.NewTableHandle(odpsIns, schema, table) spec := map[string]any{"description": []string{"test update"}} res, err := resource.NewResource(fullName, maxcompute.KindTable, mcStore, tnnt, &metadata, spec) @@ -204,12 +312,13 @@ func TestTableHandle(t *testing.T) { assert.NotNil(t, err) assert.ErrorContains(t, err, "error while get table on maxcompute") }) - t.Run("returns error when get table schema", func(t *testing.T) { + t.Run("returns error when cannot convert spec", func(t *testing.T) { table := new(mockMaxComputeTable) table.On("BatchLoadTables", mock.Anything).Return(normalTables, nil) defer table.AssertExpectations(t) + schema := new(mockMaxComputeSchema) odpsIns := new(mockOdpsIns) - tableHandle := maxcompute.NewTableHandle(odpsIns, table) + tableHandle := maxcompute.NewTableHandle(odpsIns, schema, table) spec := map[string]any{"description": []string{"test update"}} res, err := resource.NewResource(fullName, maxcompute.KindTable, mcStore, tnnt, &metadata, spec) @@ -217,17 +326,145 @@ func TestTableHandle(t *testing.T) { err = tableHandle.Update(res) assert.NotNil(t, err) - assert.ErrorContains(t, err, "failed to get old table schema to update for "+fullName) + assert.ErrorContains(t, err, "not able to decode spec for "+fullName) + }) + t.Run("returns error when use invalid table schema", func(t *testing.T) { + table := new(mockMaxComputeTable) + table.On("BatchLoadTables", mock.Anything).Return(normalTables, nil) + defer table.AssertExpectations(t) + schema := new(mockMaxComputeSchema) + odpsIns := new(mockOdpsIns) + tableHandle := maxcompute.NewTableHandle(odpsIns, schema, table) + + spec := map[string]any{ + "description": "test update", + "schema": []map[string]any{ + { + "name": "customer_id", + "type": "STRING_ERROR", + }, + }, + } + res, err := resource.NewResource(fullName, maxcompute.KindTable, mcStore, tnnt, &metadata, spec) + assert.Nil(t, err) + + err = tableHandle.Update(res) + assert.NotNil(t, err) + assert.ErrorContains(t, err, "failed to build table schema to update for "+fullName) + }) + t.Run("return error when update the resource for new required column", func(t *testing.T) { + table := new(mockMaxComputeTable) + table.On("BatchLoadTables", mock.Anything).Return(normalTables, nil) + defer table.AssertExpectations(t) + schema := new(mockMaxComputeSchema) + odpsIns := new(mockOdpsIns) + tableHandle := maxcompute.NewTableHandle(odpsIns, schema, table) + + spec := map[string]any{ + "description": "test update", + "schema": []map[string]any{ + { + "required": true, + "name": "customer_id", + "type": "STRING", + }, + { + "name": "customer_address", + "type": "STRUCT", + "description": "customer address", + "struct": []map[string]any{ + { + "name": "zipcode", + "type": "ARRAY", + "description": "address zipcode", + "array": map[string]any{ + "type": "STRUCT", + "struct": []map[string]any{ + { + "name": "inside_1", + "type": "STRING", + }, + { + "name": "inside_2", + "type": "STRING", + }, + }, + }, + }, + }, + }, + }, + "lifecycle": 1, + } + res, err := resource.NewResource(fullName, maxcompute.KindTable, mcStore, tnnt, &metadata, spec) + assert.Nil(t, err) + + err = tableHandle.Update(res) + assert.NotNil(t, err) + assert.ErrorContains(t, err, "invalid schema for table "+fullName) + }) + t.Run("returns error when table update query is invalid", func(t *testing.T) { + table := new(mockMaxComputeTable) + table.On("BatchLoadTables", mock.Anything).Return(normalTables, nil) + defer table.AssertExpectations(t) + schema := new(mockMaxComputeSchema) + odpsIns := new(mockOdpsIns) + odpsIns.On("ExecSQlWithHints", mock.Anything, mock.Anything).Return(odpsInstance, fmt.Errorf("sql task is invalid")) + defer odpsIns.AssertExpectations(t) + tableHandle := maxcompute.NewTableHandle(odpsIns, schema, table) + + spec := map[string]any{ + "schema": []map[string]any{ + { + "name": "customer_id", + "type": "STRING", + "description": "customer test", + "default_value": "customer_test", + }, + }, + } + res, err := resource.NewResource(fullName, maxcompute.KindTable, mcStore, tnnt, &metadata, spec) + assert.Nil(t, err) + + err = tableHandle.Update(res) + assert.NotNil(t, err) + assert.ErrorContains(t, err, "failed to create sql task to update for "+fullName) + }) + t.Run("returns error when view creation returns error", func(t *testing.T) { + table := new(mockMaxComputeTable) + table.On("BatchLoadTables", mock.Anything).Return(normalTables, nil) + defer table.AssertExpectations(t) + schema := new(mockMaxComputeSchema) + odpsIns := new(mockOdpsIns) + odpsIns.On("ExecSQlWithHints", mock.Anything, mock.Anything).Return(odpsInstance, nil) + defer odpsIns.AssertExpectations(t) + tableHandle := maxcompute.NewTableHandle(odpsIns, schema, table) + + spec := map[string]any{ + "schema": []map[string]any{ + { + "name": "customer_id", + "type": "STRING", + }, + }, + } + res, err := resource.NewResource(fullName, maxcompute.KindTable, mcStore, tnnt, &metadata, spec) + assert.Nil(t, err) + + err = tableHandle.Update(res) + assert.NotNil(t, err) + assert.ErrorContains(t, err, "error while execute sql query on maxcompute") }) }) t.Run("Exists", func(t *testing.T) { t.Run("returns false when error in checking existing tables", func(t *testing.T) { table := new(mockMaxComputeTable) - table.On("BatchLoadTables", mock.Anything).Return([]odps.Table{}, errors.New("error in get")) + table.On("BatchLoadTables", mock.Anything).Return(normalTables, errors.New("error in get")) defer table.AssertExpectations(t) + schema := new(mockMaxComputeSchema) odpsIns := new(mockOdpsIns) - tableHandle := maxcompute.NewTableHandle(odpsIns, table) + tableHandle := maxcompute.NewTableHandle(odpsIns, schema, table) exists := tableHandle.Exists(tableName) assert.False(t, exists) @@ -236,8 +473,9 @@ func TestTableHandle(t *testing.T) { table := new(mockMaxComputeTable) table.On("BatchLoadTables", mock.Anything).Return(normalTables, nil) defer table.AssertExpectations(t) + schema := new(mockMaxComputeSchema) odpsIns := new(mockOdpsIns) - tableHandle := maxcompute.NewTableHandle(odpsIns, table) + tableHandle := maxcompute.NewTableHandle(odpsIns, schema, table) exists := tableHandle.Exists(tableName) assert.True(t, exists) diff --git a/ext/store/maxcompute/view.go b/ext/store/maxcompute/view.go index 5703d99fe0..c5795f7b80 100644 --- a/ext/store/maxcompute/view.go +++ b/ext/store/maxcompute/view.go @@ -12,15 +12,21 @@ import ( ) type ViewSQLExecutor interface { - ExecSQl(sql string) (*odps.Instance, error) + ExecSQl(sql string, hints ...map[string]string) (*odps.Instance, error) + CurrentSchemaName() string +} + +type ViewSchema interface { + Create(schemaName string, createIfNotExists bool, comment string) error } type ViewTable interface { - BatchLoadTables(tableNames []string) ([]odps.Table, error) + BatchLoadTables(tableNames []string) ([]*odps.Table, error) } type ViewHandle struct { viewSQLExecutor ViewSQLExecutor + viewSchema ViewSchema viewTable ViewTable } @@ -30,7 +36,16 @@ func (v ViewHandle) Create(res *resource.Resource) error { return err } - view.Name, err = getComponentName(res) + projectSchema, viewName, err := getCompleteComponentName(res) + if err != nil { + return err + } + + if err := v.viewSchema.Create(v.viewSQLExecutor.CurrentSchemaName(), true, ""); err != nil { + return errors.InternalError(EntitySchema, "error while creating schema on maxcompute", err) + } + + view.Name, err = resource.NameFrom(projectSchema.Schema + "." + viewName.String()) if err != nil { return err } @@ -45,8 +60,7 @@ func (v ViewHandle) Create(res *resource.Resource) error { return errors.AddErrContext(err, EntityView, "failed to create sql task to create view "+res.FullName()) } - err = inst.WaitForSuccess() - if err != nil { + if err = inst.WaitForSuccess(); err != nil { if strings.Contains(err.Error(), "Table or view already exists") { return errors.AlreadyExists(EntityView, "view already exists on maxcompute: "+res.FullName()) } @@ -57,7 +71,7 @@ func (v ViewHandle) Create(res *resource.Resource) error { } func (v ViewHandle) Update(res *resource.Resource) error { - viewName, err := getComponentName(res) + projectSchema, viewName, err := getCompleteComponentName(res) if err != nil { return err } @@ -71,7 +85,11 @@ func (v ViewHandle) Update(res *resource.Resource) error { if err != nil { return err } - view.Name = viewName + + view.Name, err = resource.NameFrom(projectSchema.Schema + "." + viewName.String()) + if err != nil { + return err + } sql, err := ToViewSQL(view) if err != nil { @@ -83,8 +101,7 @@ func (v ViewHandle) Update(res *resource.Resource) error { return errors.AddErrContext(err, EntityView, "failed to create sql task to update view "+res.FullName()) } - err = inst.WaitForSuccess() - if err != nil { + if err = inst.WaitForSuccess(); err != nil { return errors.InternalError(EntityView, "failed to update view "+res.FullName(), err) } @@ -123,6 +140,6 @@ func ToViewSQL(v *View) (string, error) { return out.String(), nil } -func NewViewHandle(viewSQLExecutor ViewSQLExecutor, view ViewTable) *ViewHandle { - return &ViewHandle{viewSQLExecutor: viewSQLExecutor, viewTable: view} +func NewViewHandle(viewSQLExecutor ViewSQLExecutor, viewSchema ViewSchema, viewTable ViewTable) *ViewHandle { + return &ViewHandle{viewSQLExecutor: viewSQLExecutor, viewSchema: viewSchema, viewTable: viewTable} } diff --git a/ext/store/maxcompute/view_test.go b/ext/store/maxcompute/view_test.go index e015b35d19..06b81ca3a9 100644 --- a/ext/store/maxcompute/view_test.go +++ b/ext/store/maxcompute/view_test.go @@ -29,15 +29,16 @@ func TestViewHandle(t *testing.T) { odpsInstance := odps.NewInstance(odps.NewOdps(account.NewAliyunAccount(accessID, accessKey), endpoint), projectName, "") - normalTables := []odps.Table{ - odps.NewTable(odps.NewOdps(account.NewAliyunAccount(accessID, accessKey), endpoint), projectName, tableName), + normalTables := []*odps.Table{ + odps.NewTable(odps.NewOdps(account.NewAliyunAccount(accessID, accessKey), endpoint), projectName, schemaName, tableName), } t.Run("Create", func(t *testing.T) { t.Run("returns error when cannot convert spec", func(t *testing.T) { table := new(mockMaxComputeTable) + schema := new(mockMaxComputeSchema) odpsIns := new(mockOdpsIns) - viewHandle := maxcompute.NewViewHandle(odpsIns, table) + viewHandle := maxcompute.NewViewHandle(odpsIns, schema, table) spec := map[string]any{"description": []string{"test create"}} res, err := resource.NewResource(fullName, maxcompute.KindView, mcStore, tnnt, &metadata, spec) @@ -47,12 +48,33 @@ func TestViewHandle(t *testing.T) { assert.NotNil(t, err) assert.ErrorContains(t, err, "not able to decode spec for "+fullName) }) - t.Run("returns error when view query is invalid", func(t *testing.T) { + t.Run("returns error when view name is empty", func(t *testing.T) { table := new(mockMaxComputeTable) + schema := new(mockMaxComputeSchema) odpsIns := new(mockOdpsIns) - odpsIns.On("ExecSQl", mock.Anything).Return(&odpsInstance, fmt.Errorf("sql task is invalid")) + viewHandle := maxcompute.NewViewHandle(odpsIns, schema, table) + + spec := map[string]any{ + "description": "test create", + "columns": []string{"customer_id", "customer_name", "product_name"}, + "view_query": "select * from test_customer;", + } + res, err := resource.NewResource(projectName+"."+schemaName, maxcompute.KindView, mcStore, tnnt, &metadata, spec) + assert.Nil(t, err) + + err = viewHandle.Create(res) + assert.NotNil(t, err) + assert.ErrorContains(t, err, "invalid resource name: "+projectName+"."+schemaName) + }) + t.Run("returns error when failed to create schema", func(t *testing.T) { + table := new(mockMaxComputeTable) + schema := new(mockMaxComputeSchema) + schema.On("Create", schemaName, true, mock.Anything).Return(fmt.Errorf("error while creating schema on maxcompute")) + defer schema.AssertExpectations(t) + odpsIns := new(mockOdpsIns) + odpsIns.On("CurrentSchemaName").Return(schemaName) defer odpsIns.AssertExpectations(t) - viewHandle := maxcompute.NewViewHandle(odpsIns, table) + viewHandle := maxcompute.NewViewHandle(odpsIns, schema, table) spec := map[string]any{ "description": "test create", @@ -62,16 +84,43 @@ func TestViewHandle(t *testing.T) { res, err := resource.NewResource(fullName, maxcompute.KindView, mcStore, tnnt, &metadata, spec) assert.Nil(t, err) + err = viewHandle.Create(res) + assert.NotNil(t, err) + assert.ErrorContains(t, err, "error while creating schema on maxcompute") + }) + t.Run("returns error when view query is invalid", func(t *testing.T) { + table := new(mockMaxComputeTable) + schema := new(mockMaxComputeSchema) + schema.On("Create", schemaName, true, mock.Anything).Return(nil) + defer schema.AssertExpectations(t) + odpsIns := new(mockOdpsIns) + odpsIns.On("CurrentSchemaName").Return(schemaName) + odpsIns.On("ExecSQl", mock.Anything).Return(odpsInstance, fmt.Errorf("sql task is invalid")) + defer odpsIns.AssertExpectations(t) + viewHandle := maxcompute.NewViewHandle(odpsIns, schema, table) + + spec := map[string]any{ + "description": "test create", + "columns": []string{"customer_id", "customer_name", "product_name"}, + "view_query": "select from test_customer;", + } + res, err := resource.NewResource(fullName, maxcompute.KindView, mcStore, tnnt, &metadata, spec) + assert.Nil(t, err) + err = viewHandle.Create(res) assert.NotNil(t, err) assert.ErrorContains(t, err, "failed to create sql task to create view "+fullName) }) t.Run("returns error when view creation returns error", func(t *testing.T) { table := new(mockMaxComputeTable) + schema := new(mockMaxComputeSchema) + schema.On("Create", schemaName, true, mock.Anything).Return(nil) + defer schema.AssertExpectations(t) odpsIns := new(mockOdpsIns) - odpsIns.On("ExecSQl", mock.Anything).Return(&odpsInstance, nil) + odpsIns.On("CurrentSchemaName").Return(schemaName) + odpsIns.On("ExecSQl", mock.Anything).Return(odpsInstance, nil) defer odpsIns.AssertExpectations(t) - viewHandle := maxcompute.NewViewHandle(odpsIns, table) + viewHandle := maxcompute.NewViewHandle(odpsIns, schema, table) spec := map[string]any{ "description": "test create", @@ -87,12 +136,31 @@ func TestViewHandle(t *testing.T) { }) t.Run("Update", func(t *testing.T) { + t.Run("returns error when view name is empty", func(t *testing.T) { + table := new(mockMaxComputeTable) + schema := new(mockMaxComputeSchema) + odpsIns := new(mockOdpsIns) + viewHandle := maxcompute.NewViewHandle(odpsIns, schema, table) + + spec := map[string]any{ + "description": "test update", + "columns": []string{"customer_id", "customer_name", "product_name"}, + "view_query": "select * from test_customer;", + } + res, err := resource.NewResource(projectName+"."+schemaName, maxcompute.KindView, mcStore, tnnt, &metadata, spec) + assert.Nil(t, err) + + err = viewHandle.Update(res) + assert.NotNil(t, err) + assert.ErrorContains(t, err, "invalid resource name: "+projectName+"."+schemaName) + }) t.Run("returns error when view is not found", func(t *testing.T) { table := new(mockMaxComputeTable) - table.On("BatchLoadTables", mock.Anything).Return([]odps.Table{}, fmt.Errorf("view is not found")) + table.On("BatchLoadTables", mock.Anything).Return([]*odps.Table{}, fmt.Errorf("view is not found")) defer table.AssertExpectations(t) + schema := new(mockMaxComputeSchema) odpsIns := new(mockOdpsIns) - viewHandle := maxcompute.NewViewHandle(odpsIns, table) + viewHandle := maxcompute.NewViewHandle(odpsIns, schema, table) spec := map[string]any{"description": []string{"test update"}} res, err := resource.NewResource(fullName, maxcompute.KindView, mcStore, tnnt, &metadata, spec) @@ -104,10 +172,11 @@ func TestViewHandle(t *testing.T) { }) t.Run("returns error when cannot convert spec", func(t *testing.T) { table := new(mockMaxComputeTable) - table.On("BatchLoadTables", mock.Anything).Return([]odps.Table{}, nil) + table.On("BatchLoadTables", mock.Anything).Return([]*odps.Table{}, nil) defer table.AssertExpectations(t) + schema := new(mockMaxComputeSchema) odpsIns := new(mockOdpsIns) - viewHandle := maxcompute.NewViewHandle(odpsIns, table) + viewHandle := maxcompute.NewViewHandle(odpsIns, schema, table) spec := map[string]any{"description": []string{"test update"}} res, err := resource.NewResource(fullName, maxcompute.KindView, mcStore, tnnt, &metadata, spec) @@ -119,12 +188,13 @@ func TestViewHandle(t *testing.T) { }) t.Run("returns error when view query is invalid", func(t *testing.T) { table := new(mockMaxComputeTable) - table.On("BatchLoadTables", mock.Anything).Return([]odps.Table{}, nil) + table.On("BatchLoadTables", mock.Anything).Return([]*odps.Table{}, nil) defer table.AssertExpectations(t) + schema := new(mockMaxComputeSchema) odpsIns := new(mockOdpsIns) - odpsIns.On("ExecSQl", mock.Anything).Return(&odpsInstance, fmt.Errorf("sql task is invalid")) + odpsIns.On("ExecSQl", mock.Anything).Return(odpsInstance, fmt.Errorf("sql task is invalid")) defer odpsIns.AssertExpectations(t) - viewHandle := maxcompute.NewViewHandle(odpsIns, table) + viewHandle := maxcompute.NewViewHandle(odpsIns, schema, table) spec := map[string]any{ "description": "test update", @@ -140,12 +210,13 @@ func TestViewHandle(t *testing.T) { }) t.Run("returns error when view creation returns error", func(t *testing.T) { table := new(mockMaxComputeTable) - table.On("BatchLoadTables", mock.Anything).Return([]odps.Table{}, nil) + table.On("BatchLoadTables", mock.Anything).Return([]*odps.Table{}, nil) defer table.AssertExpectations(t) + schema := new(mockMaxComputeSchema) odpsIns := new(mockOdpsIns) - odpsIns.On("ExecSQl", mock.Anything).Return(&odpsInstance, nil) + odpsIns.On("ExecSQl", mock.Anything).Return(odpsInstance, nil) defer odpsIns.AssertExpectations(t) - viewHandle := maxcompute.NewViewHandle(odpsIns, table) + viewHandle := maxcompute.NewViewHandle(odpsIns, schema, table) spec := map[string]any{ "description": "test update", @@ -163,10 +234,11 @@ func TestViewHandle(t *testing.T) { t.Run("Exists", func(t *testing.T) { t.Run("returns false when error in checking existing view", func(t *testing.T) { table := new(mockMaxComputeTable) - table.On("BatchLoadTables", mock.Anything).Return([]odps.Table{}, errors.New("error in get")) + table.On("BatchLoadTables", mock.Anything).Return([]*odps.Table{}, errors.New("error in get")) defer table.AssertExpectations(t) + schema := new(mockMaxComputeSchema) odpsIns := new(mockOdpsIns) - viewHandle := maxcompute.NewViewHandle(odpsIns, table) + viewHandle := maxcompute.NewViewHandle(odpsIns, schema, table) exists := viewHandle.Exists(tableName) assert.False(t, exists) @@ -175,8 +247,9 @@ func TestViewHandle(t *testing.T) { table := new(mockMaxComputeTable) table.On("BatchLoadTables", mock.Anything).Return(normalTables, nil) defer table.AssertExpectations(t) + schema := new(mockMaxComputeSchema) odpsIns := new(mockOdpsIns) - viewHandle := maxcompute.NewViewHandle(odpsIns, table) + viewHandle := maxcompute.NewViewHandle(odpsIns, schema, table) exists := viewHandle.Exists(tableName) assert.True(t, exists) @@ -198,13 +271,13 @@ func TestToViewSQL(t *testing.T) { name: "create_view", args: args{ v: &maxcompute.View{ - Name: "Test_View1", + Name: "schema.test_view", Description: "Create Test View", Columns: []string{"a", "b", "c"}, ViewQuery: "select a, b, c from t1", }, }, - want: `create or replace view Test_View1 + want: `create or replace view schema.test_view (a, b, c) comment 'Create Test View' as @@ -215,12 +288,12 @@ func TestToViewSQL(t *testing.T) { name: "create_view_missing_description", args: args{ v: &maxcompute.View{ - Name: "Test_View1", + Name: "schema.test_view", Columns: []string{"a", "b", "c"}, ViewQuery: "select a, b, c from t1", }, }, - want: `create or replace view Test_View1 + want: `create or replace view schema.test_view (a, b, c) as select a, b, c from t1;`, diff --git a/go.mod b/go.mod index aaf2508829..d10d6e325b 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/MakeNowJust/heredoc v1.0.0 github.com/PagerDuty/go-pagerduty v1.5.1 github.com/aliyun/alibabacloud-oss-go-sdk-v2 v1.1.1 - github.com/aliyun/aliyun-odps-go-sdk v0.3.7 + github.com/aliyun/aliyun-odps-go-sdk v0.3.13 github.com/briandowns/spinner v1.18.0 github.com/charmbracelet/bubbles v0.13.0 github.com/charmbracelet/bubbletea v0.22.1 @@ -55,7 +55,7 @@ require ( go.opentelemetry.io/otel/trace v1.7.0 go.uber.org/automaxprocs v1.5.1 gocloud.dev v0.26.0 - golang.org/x/net v0.10.0 + golang.org/x/net v0.20.0 golang.org/x/oauth2 v0.6.0 google.golang.org/api v0.103.0 google.golang.org/genproto v0.0.0-20221117204609-8f9c96812029 @@ -73,6 +73,9 @@ require ( cloud.google.com/go/storage v1.27.0 // indirect github.com/alecthomas/chroma v0.8.2 // indirect github.com/alessio/shellescape v1.4.1 // indirect + github.com/alibabacloud-go/debug v1.0.1 // indirect + github.com/alibabacloud-go/tea v1.2.2 // indirect + github.com/aliyun/credentials-go v1.3.10 // indirect github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129 // indirect github.com/atotto/clipboard v0.1.4 // indirect github.com/aws/aws-sdk-go v1.43.31 // indirect @@ -118,6 +121,7 @@ require ( github.com/jackc/puddle/v2 v2.1.2 // indirect github.com/jeremywohl/flatten v1.0.1 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect + github.com/json-iterator/go v1.1.12 // indirect github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect github.com/klauspost/compress v1.15.9 // indirect github.com/lucasb-eyer/go-colorful v1.2.0 // indirect @@ -132,6 +136,8 @@ require ( github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/mitchellh/go-testing-interface v1.0.0 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect github.com/muesli/ansi v0.0.0-20211018074035-2e021307bc4b // indirect github.com/muesli/cancelreader v0.2.2 // indirect github.com/muesli/reflow v0.3.0 // indirect @@ -161,11 +167,11 @@ require ( go.uber.org/multierr v1.8.0 // indirect go.uber.org/ratelimit v0.2.0 // indirect go.uber.org/zap v1.21.0 // indirect - golang.org/x/crypto v0.12.0 // indirect + golang.org/x/crypto v0.18.0 // indirect golang.org/x/sync v0.1.0 // indirect - golang.org/x/sys v0.11.0 // indirect - golang.org/x/term v0.11.0 // indirect - golang.org/x/text v0.12.0 // indirect + golang.org/x/sys v0.16.0 // indirect + golang.org/x/term v0.16.0 // indirect + golang.org/x/text v0.14.0 // indirect golang.org/x/time v0.4.0 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect google.golang.org/appengine v1.6.7 // indirect diff --git a/go.sum b/go.sum index 5a0c8fd040..5242d3814a 100644 --- a/go.sum +++ b/go.sum @@ -207,12 +207,17 @@ github.com/alessio/shellescape v1.4.1 h1:V7yhSDDn8LP4lc4jS8pFkt0zCnzVJlG5JXy9BVK github.com/alessio/shellescape v1.4.1/go.mod h1:PZAiSCk0LJaZkiCSkPv8qIobYglO3FPpyFjDCtHLS30= github.com/alexflint/go-filemutex v0.0.0-20171022225611-72bdc8eae2ae/go.mod h1:CgnQgUtFrFz9mxFNtED3jI5tLDjKlOM+oUF/sTk6ps0= github.com/alexflint/go-filemutex v1.1.0/go.mod h1:7P4iRhttt/nUvUOrYIhcpMzv2G6CY9UnI16Z+UJqRyk= +github.com/alibabacloud-go/debug v1.0.0/go.mod h1:8gfgZCCAC3+SCzjWtY053FrOcd4/qlH6IHTI4QyICOc= +github.com/alibabacloud-go/debug v1.0.1 h1:MsW9SmUtbb1Fnt3ieC6NNZi6aEwrXfDksD4QA6GSbPg= +github.com/alibabacloud-go/debug v1.0.1/go.mod h1:8gfgZCCAC3+SCzjWtY053FrOcd4/qlH6IHTI4QyICOc= +github.com/alibabacloud-go/tea v1.2.2 h1:aTsR6Rl3ANWPfqeQugPglfurloyBJY85eFy7Gc1+8oU= +github.com/alibabacloud-go/tea v1.2.2/go.mod h1:CF3vOzEMAG+bR4WOql8gc2G9H3EkH3ZLAQdpmpXMgwk= github.com/aliyun/alibabacloud-oss-go-sdk-v2 v1.1.1 h1:Sc2T9vs8SCq0DErL/QIY3ZVx8F+dm+DIv4RFP9NLwC4= github.com/aliyun/alibabacloud-oss-go-sdk-v2 v1.1.1/go.mod h1:FTzydeQVmR24FI0D6XWUOMKckjXehM/jgMn1xC+DA9M= -github.com/aliyun/aliyun-odps-go-sdk v0.3.4 h1:IwidtZJUmFjlwBRb/24LGsYn/PSeIAcV7r5Ia09dvkE= -github.com/aliyun/aliyun-odps-go-sdk v0.3.4/go.mod h1:o2yLh138hfeBZThn+rorDVNhoaFsPwFSF+CgE69yaw8= -github.com/aliyun/aliyun-odps-go-sdk v0.3.7 h1:mG+pmrQPLOwy5ycI54zJ9lcgpI7GHV8cPfX0iDM8WbM= -github.com/aliyun/aliyun-odps-go-sdk v0.3.7/go.mod h1:o2yLh138hfeBZThn+rorDVNhoaFsPwFSF+CgE69yaw8= +github.com/aliyun/aliyun-odps-go-sdk v0.3.13 h1:frPJCVxhlHceH8077tNJQEuLjPBIbnzq0FE/1HtxrFY= +github.com/aliyun/aliyun-odps-go-sdk v0.3.13/go.mod h1:t/tgF/iN5aAs/gLL7sEI8/qdax4NuFCKEjO3OJbHZqI= +github.com/aliyun/credentials-go v1.3.10 h1:45Xxrae/evfzQL9V10zL3xX31eqgLWEaIdCoPipOEQA= +github.com/aliyun/credentials-go v1.3.10/go.mod h1:Jm6d+xIgwJVLVWT561vy67ZRP4lPTQxMbEYRuT2Ti1U= github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129 h1:MzBOUgng9orim59UnfUTLRjMpd09C5uEVQ6RPGeCaVI= github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129/go.mod h1:rFgpPQZYZ8vdbc+48xibu8ALc3yeyd64IhHS+PU6Yyg= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= @@ -1042,6 +1047,7 @@ github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/u github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= @@ -1202,9 +1208,11 @@ github.com/moby/term v0.0.0-20210610120745-9d4ed1856297/go.mod h1:vgPCkQMyxTZ7ID github.com/moby/term v0.0.0-20210619224110-3f7ff695adc6 h1:dcztxKSvZ4Id8iPpHERQBbIJfabdt4wUm5qy3wOL2Zc= github.com/moby/term v0.0.0-20210619224110-3f7ff695adc6/go.mod h1:E2VnQOmVuvZB6UYnnDB0qG5Nq/1tD9acaOpo6xmt0Kw= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/modocache/gover v0.0.0-20171022184752-b58185e213c5/go.mod h1:caMODM3PzxT8aQXRPkAt8xlV/e7d7w8GM5g0fa5F0D8= github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= @@ -1673,8 +1681,8 @@ golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3/go.mod h1:IxCIyHEi3zRg3s0 golang.org/x/crypto v0.0.0-20220331220935-ae2d96664a29/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw= -golang.org/x/crypto v0.12.0 h1:tFM/ta59kqch6LlvYnPa0yx5a83cL2nHflFhYKvv9Yk= -golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw= +golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc= +golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -1818,8 +1826,9 @@ golang.org/x/net v0.0.0-20220919232410-f2f64ebce3c1/go.mod h1:YDH+HFinaLZZlnHAfS golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= -golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo= +golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY= golang.org/x/oauth2 v0.0.0-20180227000427-d7d64896b5ff/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20181106182150-f42d05182288/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -2013,8 +2022,9 @@ golang.org/x/sys v0.0.0-20220818161305-2296e01440c6/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM= -golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= +golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= @@ -2024,8 +2034,9 @@ golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuX golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U= -golang.org/x/term v0.11.0 h1:F9tnn/DA/Im8nCwm+fX+1/eBwi4qFjRT++MhtVC4ZX0= -golang.org/x/term v0.11.0/go.mod h1:zC9APTIj3jG3FdV/Ons+XE1riIZXG4aZ4GTHiPZJPIU= +golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= +golang.org/x/term v0.16.0 h1:m+B6fahuftsE9qjo0VWp2FW0mB3MTJvR0BaMQrq0pmE= +golang.org/x/term v0.16.0/go.mod h1:yn7UURbUtPyrVJPGPq404EukNFxcm/foM+bV/bfcDsY= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -2038,8 +2049,9 @@ golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= -golang.org/x/text v0.12.0 h1:k+n5B8goJNdU7hSvEtMUz3d1Q6D/XW4COJSJR6fN0mc= -golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=