From cbd025cdb62b7efa9801a03d687aaca76881e6f0 Mon Sep 17 00:00:00 2001 From: Will Baker Date: Thu, 9 Jan 2025 17:27:58 -0500 Subject: [PATCH] materializations: don't persist a spec in the destination system Validate and Apply request messages have had the last successfully validated/applied specification included with them for a while now. We can use this prior spec, rather than needing to store and retrieve it out of the destination metadata table. This does introduce a tiny difference in behavior, where currently Validate runs against the last successfully Applied (persisted) spec, rather than the last successfully Validated spec. I can't think of any practical difference this would make, and this model has been running successfully with other materializations already, such as Iceberg. --- materialize-bigquery/bigquery.go | 6 +- materialize-bigquery/bigquery_test.go | 17 +- materialize-bigquery/client.go | 37 ----- materialize-boilerplate/.snapshots/TestApply | 24 +-- materialize-boilerplate/apply.go | 44 +---- materialize-boilerplate/apply_test.go | 65 ++------ materialize-boilerplate/test_support.go | 8 +- materialize-boilerplate/validate.go | 12 +- materialize-databricks/client.go | 23 --- materialize-databricks/driver.go | 4 - materialize-databricks/driver_test.go | 19 +-- materialize-dynamodb/apply.go | 16 -- materialize-dynamodb/driver.go | 5 +- materialize-dynamodb/driver_test.go | 3 +- materialize-elasticsearch/apply.go | 16 -- materialize-elasticsearch/client.go | 82 --------- materialize-elasticsearch/driver.go | 7 +- materialize-elasticsearch/driver_test.go | 6 +- materialize-mongodb/apply.go | 48 ------ materialize-motherduck/client.go | 10 -- materialize-motherduck/driver.go | 5 +- materialize-motherduck/driver_test.go | 17 +- materialize-mysql/client.go | 10 -- materialize-mysql/driver.go | 6 +- materialize-mysql/driver_test.go | 26 +-- materialize-postgres/client.go | 9 - materialize-postgres/driver.go | 4 +- materialize-postgres/driver_test.go | 19 
+-- materialize-redshift/client.go | 56 ------- materialize-redshift/driver.go | 4 +- materialize-redshift/driver_test.go | 19 +-- materialize-s3-iceberg/catalog.go | 24 +-- materialize-s3-iceberg/driver.go | 6 +- materialize-s3-iceberg/driver_test.go | 4 +- .../.snapshots/TestSpecification | 2 +- materialize-snowflake/client.go | 10 -- materialize-snowflake/config.go | 2 +- materialize-snowflake/snowflake.go | 8 +- materialize-snowflake/snowflake_test.go | 19 +-- materialize-sql/.snapshots/TestTableTemplate | 14 +- materialize-sql/apply.go | 156 ------------------ materialize-sql/driver.go | 54 +++--- materialize-sql/endpoint.go | 8 - materialize-sql/meta_tables.go | 70 +------- materialize-sql/std_sql.go | 22 --- materialize-sql/templating_test.go | 4 +- materialize-sql/test_support.go | 9 +- materialize-sqlite/sqlite.go | 11 -- materialize-sqlserver/client.go | 10 -- materialize-sqlserver/driver.go | 5 +- materialize-sqlserver/driver_test.go | 17 +- materialize-starburst/client.go | 36 ---- materialize-starburst/starburst.go | 3 - materialize-starburst/starburst_test.go | 18 +- 54 files changed, 111 insertions(+), 1028 deletions(-) diff --git a/materialize-bigquery/bigquery.go b/materialize-bigquery/bigquery.go index fd8e3e4aee..bab0fc01f2 100644 --- a/materialize-bigquery/bigquery.go +++ b/materialize-bigquery/bigquery.go @@ -154,14 +154,10 @@ func newBigQueryDriver() *sql.Driver { "bucket_path": cfg.BucketPath, }).Info("creating bigquery endpoint") - var metaBase sql.TablePath = []string{cfg.ProjectID, cfg.Dataset} - var metaSpecs, metaCheckpoints = sql.MetaTables(metaBase) - return &sql.Endpoint{ Config: cfg, Dialect: bqDialect, - MetaSpecs: &metaSpecs, - MetaCheckpoints: &metaCheckpoints, + MetaCheckpoints: sql.FlowCheckpointsTable([]string{cfg.ProjectID, cfg.Dataset}), NewClient: newClient, CreateTableTemplate: tplCreateTargetTable, NewResource: newTableConfig, diff --git a/materialize-bigquery/bigquery_test.go b/materialize-bigquery/bigquery_test.go index 
d98cfccd63..8ea7824c6f 100644 --- a/materialize-bigquery/bigquery_test.go +++ b/materialize-bigquery/bigquery_test.go @@ -14,7 +14,6 @@ import ( "github.com/bradleyjkemp/cupaloy" boilerplate "github.com/estuary/connectors/materialize-boilerplate" sql "github.com/estuary/connectors/materialize-sql" - pf "github.com/estuary/flow/go/protocols/flow" pm "github.com/estuary/flow/go/protocols/materialize" "github.com/google/uuid" "github.com/stretchr/testify/require" @@ -73,18 +72,12 @@ func TestValidateAndApply(t *testing.T) { t.Helper() return dumpSchema(t, ctx, client, cfg, resourceConfig) }, - func(t *testing.T, materialization pf.Materialization) { + func(t *testing.T) { t.Helper() - _, _ = client.query(ctx, fmt.Sprintf( "drop table %s;", bqDialect.Identifier(cfg.ProjectID, cfg.Dataset, resourceConfig.Table), )) - - _, _ = client.query(ctx, fmt.Sprintf( - "delete from %s where materialization = 'test/sqlite'", - bqDialect.Identifier(cfg.ProjectID, cfg.Dataset, sql.DefaultFlowMaterializations), - )) }, ) } @@ -186,18 +179,12 @@ func TestValidateAndApplyMigrations(t *testing.T) { return b.String() }, - func(t *testing.T, materialization pf.Materialization) { + func(t *testing.T) { t.Helper() - _, _ = client.query(ctx, fmt.Sprintf( "drop table %s;", bqDialect.Identifier(cfg.ProjectID, cfg.Dataset, resourceConfig.Table), )) - - _, _ = client.query(ctx, fmt.Sprintf( - "delete from %s where materialization = 'test/sqlite'", - bqDialect.Identifier(cfg.ProjectID, cfg.Dataset, sql.DefaultFlowMaterializations), - )) }, ) } diff --git a/materialize-bigquery/client.go b/materialize-bigquery/client.go index a9808830e5..405acee548 100644 --- a/materialize-bigquery/client.go +++ b/materialize-bigquery/client.go @@ -2,7 +2,6 @@ package connector import ( "context" - dbSql "database/sql" "encoding/base64" "errors" "fmt" @@ -16,7 +15,6 @@ import ( storage "cloud.google.com/go/storage" boilerplate "github.com/estuary/connectors/materialize-boilerplate" sql 
"github.com/estuary/connectors/materialize-sql" - pf "github.com/estuary/flow/go/protocols/flow" "github.com/google/uuid" log "github.com/sirupsen/logrus" "golang.org/x/sync/errgroup" @@ -113,11 +111,6 @@ func (c *client) InfoSchema(ctx context.Context, resourcePaths [][]string) (*boi return is, nil } -func (c *client) PutSpec(ctx context.Context, updateSpec sql.MetaSpecsUpdate) error { - _, err := c.query(ctx, updateSpec.ParameterizedQuery, updateSpec.Parameters...) - return err -} - func (c *client) CreateTable(ctx context.Context, tc sql.TableCreate) error { _, err := c.query(ctx, tc.TableCreateSql) return err @@ -316,36 +309,6 @@ func preReqs(ctx context.Context, conf any, tenant string) *sql.PrereqErr { return errs } -func (c *client) FetchSpecAndVersion(ctx context.Context, specs sql.Table, materialization pf.Materialization) (specB64, version string, err error) { - job, err := c.query(ctx, fmt.Sprintf( - "SELECT version, spec FROM %s WHERE materialization=%s;", - specs.Identifier, - specs.Keys[0].Placeholder, - ), materialization.String()) - if err != nil { - return "", "", err - } - - var data struct { - Version string `bigquery:"version"` - SpecB64 string `bigquery:"spec"` - } - - if err := c.fetchOne(ctx, job, &data); err == errNotFound { - return "", "", dbSql.ErrNoRows - } else if err != nil { - return "", "", err - } - - log.WithFields(log.Fields{ - "table": specs.Identifier, - "materialization": materialization.String(), - "version": data.Version, - }).Info("existing materialization spec loaded") - - return data.SpecB64, data.Version, nil -} - func (c *client) ExecStatements(ctx context.Context, statements []string) error { _, err := c.query(ctx, strings.Join(statements, "\n")) return err diff --git a/materialize-boilerplate/.snapshots/TestApply b/materialize-boilerplate/.snapshots/TestApply index 43c6c8c7ff..60e6adfb0e 100644 --- a/materialize-boilerplate/.snapshots/TestApply +++ b/materialize-boilerplate/.snapshots/TestApply @@ -1,30 +1,18 @@ * new 
materialization: -create meta tables -create resource for collection "key/value" -put spec with version "aVersion" +create resource for ["key_value"] * remove required field: -create meta tables -update resource for collection "key/value" [new projections: 0, newly nullable fields: 1, newly delta updates: false] -put spec with version "aVersion" +update resource for ["key_value"] [new projections: 0, newly nullable fields: 1, newly delta updates: false] * add required field: -create meta tables -update resource for collection "key/value" [new projections: 1, newly nullable fields: 0, newly delta updates: false] -put spec with version "aVersion" +update resource for ["key_value"] [new projections: 1, newly nullable fields: 0, newly delta updates: false] * add binding: -create meta tables -create resource for collection "extra/collection" -put spec with version "aVersion" +create resource for ["extra_collection"] * replace binding: -create meta tables delete resource ["key_value"] -create resource for collection "key/value" -put spec with version "aVersion" +create resource for ["key_value"] * field is newly nullable: -create meta tables -update resource for collection "key/value" [new projections: 0, newly nullable fields: 1, newly delta updates: false] -put spec with version "aVersion" +update resource for ["key_value"] [new projections: 0, newly nullable fields: 1, newly delta updates: false] diff --git a/materialize-boilerplate/apply.go b/materialize-boilerplate/apply.go index 5e4815f0f0..c20e82ad66 100644 --- a/materialize-boilerplate/apply.go +++ b/materialize-boilerplate/apply.go @@ -43,16 +43,6 @@ type BindingUpdate struct { // resources based on binding changes. Many of these functions should return an ActionApplyFn, which // may be executed concurrently. 
type Applier interface { - // CreateMetaTables is called to create the tables (or the equivalent endpoint concept) that - // store a persisted spec and any other metadata the materialization needs to persist. - CreateMetaTables(ctx context.Context, spec *pf.MaterializationSpec) (string, ActionApplyFn, error) - - // LoadSpec loads the persisted spec from the metadata table. - LoadSpec(ctx context.Context, materialization pf.Materialization) (*pf.MaterializationSpec, error) - - // PutSpec upserts a spec into the metadata table. - PutSpec(ctx context.Context, spec *pf.MaterializationSpec, version string, exists bool) (string, ActionApplyFn, error) - // CreateResource creates a new resource in the endpoint. It is called only if the resource does // not already exist, either because it is brand new or because it was previously deleted as // part of a resource replacement. @@ -82,11 +72,6 @@ func ApplyChanges(ctx context.Context, req *pm.Request_Apply, applier Applier, i return nil, fmt.Errorf("validating request: %w", err) } - storedSpec, err := applier.LoadSpec(ctx, req.Materialization.Name) - if err != nil { - return nil, fmt.Errorf("getting stored spec: %w", err) - } - actionDescriptions := []string{} actions := []ActionApplyFn{} @@ -97,22 +82,11 @@ func ApplyChanges(ctx context.Context, req *pm.Request_Apply, applier Applier, i } } - // TODO(whb): We will eventually stop persisting specs for materializations, and instead include - // the previous spec as part of the protocol. When that happens this can go away. Then, if - // individual materializations still need individual metadata tables (ex: SQL materializations), - // they should create them as needed separately from the Applier. For now, we always call - // CreateMetaTables, and materializations are free to either create them or not. 
- desc, action, err := applier.CreateMetaTables(ctx, req.Materialization) - if err != nil { - return nil, fmt.Errorf("getting CreateMetaTables action: %w", err) - } - addAction(desc, action) - for bindingIdx, binding := range req.Materialization.Bindings { // The existing binding spec is used to extract various properties that can't be learned // from introspecting the destination system, such as the backfill counter and if the // materialization was previously delta updates. - existingBinding, err := findExistingBinding(binding.ResourcePath, storedSpec) + existingBinding, err := findExistingBinding(binding.ResourcePath, req.LastMaterialization) if err != nil { return nil, fmt.Errorf("finding existing binding: %w", err) } @@ -147,7 +121,7 @@ func ApplyChanges(ctx context.Context, req *pm.Request_Apply, applier Applier, i desc = append(desc, createDesc) } - action = func(ctx context.Context) error { + action := func(ctx context.Context) error { if err := deleteAction(ctx); err != nil { return err } @@ -241,19 +215,5 @@ func ApplyChanges(ctx context.Context, req *pm.Request_Apply, applier Applier, i } } - // Only update the spec after all other actions have completed successfully. - desc, action, err = applier.PutSpec(ctx, req.Materialization, req.Version, storedSpec != nil) - if err != nil { - return nil, fmt.Errorf("getting PutSpec action: %w", err) - } - // Although all current materializations always do persist a spec, its possible that some may - // not in the future as we transition to runtime provided specs for apply. 
- if action != nil { - actionDescriptions = append(actionDescriptions, desc) - if err := action(ctx); err != nil { - return nil, fmt.Errorf("updating persisted specification: %w", err) - } - } - return &pm.Response_Applied{ActionDescription: strings.Join(actionDescriptions, "\n")}, nil } diff --git a/materialize-boilerplate/apply_test.go b/materialize-boilerplate/apply_test.go index d4bac11a85..922597bc39 100644 --- a/materialize-boilerplate/apply_test.go +++ b/materialize-boilerplate/apply_test.go @@ -52,62 +52,37 @@ func TestApply(t *testing.T) { name: "new materialization", originalSpec: nil, newSpec: loadApplySpec(t, "base.flow.proto"), - want: testResults{ - createdMetaTables: true, - putSpec: true, - createdResources: 1, - }, + want: testResults{createdResources: 1}, }, { name: "remove required field", originalSpec: loadApplySpec(t, "base.flow.proto"), newSpec: loadApplySpec(t, "remove-required.flow.proto"), - want: testResults{ - createdMetaTables: true, - putSpec: true, - nullabledProjections: 1, - }, + want: testResults{nullabledProjections: 1}, }, { name: "add required field", originalSpec: loadApplySpec(t, "base.flow.proto"), newSpec: loadApplySpec(t, "add-new-required.flow.proto"), - want: testResults{ - createdMetaTables: true, - putSpec: true, - addedProjections: 1, - }, + want: testResults{addedProjections: 1}, }, { name: "add binding", originalSpec: loadApplySpec(t, "base.flow.proto"), newSpec: loadApplySpec(t, "add-new-binding.flow.proto"), - want: testResults{ - createdMetaTables: true, - putSpec: true, - createdResources: 1, - }, + want: testResults{createdResources: 1}, }, { name: "replace binding", originalSpec: loadApplySpec(t, "base.flow.proto"), newSpec: loadApplySpec(t, "replace-original-binding.flow.proto"), - want: testResults{ - createdMetaTables: true, - putSpec: true, - deletedResources: 1, - createdResources: 1, - }, + want: testResults{deletedResources: 1, createdResources: 1}, }, { name: "field is newly nullable", originalSpec: 
loadApplySpec(t, "base.flow.proto"), newSpec: loadApplySpec(t, "make-nullable.flow.proto"), - want: testResults{ - createdMetaTables: true, - putSpec: true, - nullabledProjections: 1, - }, + want: testResults{nullabledProjections: 1}, }, } @@ -120,7 +95,7 @@ func TestApply(t *testing.T) { } is := testInfoSchemaFromSpec(t, tt.originalSpec, simpleTestTransform) - req := &pm.Request_Apply{Materialization: tt.newSpec, Version: "aVersion"} + req := &pm.Request_Apply{Materialization: tt.newSpec, Version: "aVersion", LastMaterialization: tt.originalSpec} // Not concurrent. got, err := ApplyChanges(ctx, req, app, is, false) @@ -147,8 +122,6 @@ func TestApply(t *testing.T) { } type testResults struct { - createdMetaTables bool - putSpec bool createdResources int deletedResources int addedProjections int @@ -164,17 +137,10 @@ type testApplier struct { results testResults } -func (a *testApplier) CreateMetaTables(ctx context.Context, spec *pf.MaterializationSpec) (string, ActionApplyFn, error) { - return "create meta tables", func(ctx context.Context) error { - a.results.createdMetaTables = true - return nil - }, nil -} - func (a *testApplier) CreateResource(ctx context.Context, spec *pf.MaterializationSpec, bindingIndex int) (string, ActionApplyFn, error) { binding := spec.Bindings[bindingIndex] - return fmt.Sprintf("create resource for collection %q", binding.Collection.Name.String()), func(ctx context.Context) error { + return fmt.Sprintf("create resource for %q", binding.ResourcePath), func(ctx context.Context) error { a.mu.Lock() defer a.mu.Unlock() @@ -183,17 +149,6 @@ func (a *testApplier) CreateResource(ctx context.Context, spec *pf.Materializati }, nil } -func (a *testApplier) LoadSpec(ctx context.Context, materialization pf.Materialization) (*pf.MaterializationSpec, error) { - return a.storedSpec, nil -} - -func (a *testApplier) PutSpec(ctx context.Context, spec *pf.MaterializationSpec, version string, exists bool) (string, ActionApplyFn, error) { - return 
fmt.Sprintf("put spec with version %q", version), func(ctx context.Context) error { - a.results.putSpec = true - return nil - }, nil -} - func (a *testApplier) DeleteResource(ctx context.Context, path []string) (string, ActionApplyFn, error) { return fmt.Sprintf("delete resource %q", path), func(ctx context.Context) error { a.mu.Lock() @@ -214,8 +169,8 @@ func (a *testApplier) UpdateResource(ctx context.Context, spec *pf.Materializati } action := fmt.Sprintf( - "update resource for collection %q [new projections: %d, newly nullable fields: %d, newly delta updates: %t]", - binding.Collection.Name.String(), + "update resource for %q [new projections: %d, newly nullable fields: %d, newly delta updates: %t]", + binding.ResourcePath, len(bindingUpdate.NewProjections), len(bindingUpdate.NewlyNullableFields), bindingUpdate.NewlyDeltaUpdates, diff --git a/materialize-boilerplate/test_support.go b/materialize-boilerplate/test_support.go index afdcfb3f2e..fb0371365e 100644 --- a/materialize-boilerplate/test_support.go +++ b/materialize-boilerplate/test_support.go @@ -45,7 +45,7 @@ func RunValidateAndApplyTestCases( config any, resourceConfig any, dumpSchema func(t *testing.T) string, - cleanup func(t *testing.T, materialization pf.Materialization), + cleanup func(t *testing.T), ) { ctx := context.Background() var snap strings.Builder @@ -57,7 +57,7 @@ func RunValidateAndApplyTestCases( require.NoError(t, err) t.Run("validate and apply many different types of fields", func(t *testing.T) { - defer cleanup(t, pf.Materialization("test/sqlite")) + defer cleanup(t) fixture := loadSpec(t, "big-schema.flow.proto") @@ -155,7 +155,7 @@ func RunValidateAndApplyTestCases( for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - defer cleanup(t, pf.Materialization("test/sqlite")) + defer cleanup(t) initial := loadSpec(t, "base.flow.proto") @@ -178,7 +178,7 @@ func RunValidateAndApplyTestCases( }) t.Run("validate and apply fields with challenging names", func(t *testing.T) { - 
defer cleanup(t, pf.Materialization("test/sqlite")) + defer cleanup(t) fixture := loadSpec(t, "challenging-fields.flow.proto") diff --git a/materialize-boilerplate/validate.go b/materialize-boilerplate/validate.go index d1abc3e2af..0af39a8011 100644 --- a/materialize-boilerplate/validate.go +++ b/materialize-boilerplate/validate.go @@ -63,9 +63,9 @@ func (v Validator) ValidateBinding( backfill uint32, boundCollection pf.CollectionSpec, fieldConfigJsonMap map[string]json.RawMessage, - storedSpec *pf.MaterializationSpec, + lastSpec *pf.MaterializationSpec, ) (map[string]*pm.Response_Validated_Constraint, error) { - existingBinding, err := findExistingBinding(path, storedSpec) + existingBinding, err := findExistingBinding(path, lastSpec) if err != nil { return nil, err } @@ -389,12 +389,12 @@ func (v Validator) ambiguousFieldIsSelected(p pf.Projection, fieldSelection []st return false } -// findExistingBinding locates a binding within an existing stored specification. -func findExistingBinding(resourcePath []string, storedSpec *pf.MaterializationSpec) (*pf.MaterializationSpec_Binding, error) { - if storedSpec == nil { +// findExistingBinding locates a binding within a previously applied or validated specification.
+func findExistingBinding(resourcePath []string, lastSpec *pf.MaterializationSpec) (*pf.MaterializationSpec_Binding, error) { + if lastSpec == nil { return nil, nil // Binding is trivially not found } - for _, existingBinding := range storedSpec.Bindings { + for _, existingBinding := range lastSpec.Bindings { if slices.Equal(resourcePath, existingBinding.ResourcePath) { return existingBinding, nil } diff --git a/materialize-databricks/client.go b/materialize-databricks/client.go index 0b13227d2a..186b5957a3 100644 --- a/materialize-databricks/client.go +++ b/materialize-databricks/client.go @@ -18,7 +18,6 @@ import ( dbsqlerr "github.com/databricks/databricks-sql-go/errors" boilerplate "github.com/estuary/connectors/materialize-boilerplate" sql "github.com/estuary/connectors/materialize-sql" - pf "github.com/estuary/flow/go/protocols/flow" log "github.com/sirupsen/logrus" _ "github.com/databricks/databricks-sql-go" @@ -123,11 +122,6 @@ func (c *client) InfoSchema(ctx context.Context, resourcePaths [][]string) (*boi return is, nil } -func (c *client) PutSpec(ctx context.Context, updateSpec sql.MetaSpecsUpdate) error { - _, err := c.db.ExecContext(ctx, updateSpec.QueryString) - return err -} - func (c *client) CreateTable(ctx context.Context, tc sql.TableCreate) error { _, err := c.db.ExecContext(ctx, tc.TableCreateSql) return err @@ -303,23 +297,6 @@ func preReqs(ctx context.Context, conf any, tenant string) *sql.PrereqErr { return errs } -func (c *client) FetchSpecAndVersion(ctx context.Context, specs sql.Table, materialization pf.Materialization) (string, string, error) { - var version, spec string - - if err := c.db.QueryRowContext( - ctx, - fmt.Sprintf( - "SELECT version, spec FROM %s WHERE materialization = %s;", - specs.Identifier, - databricksDialect.Literal(materialization.String()), - ), - ).Scan(&version, &spec); err != nil { - return "", "", err - } - - return spec, version, nil -} - func (c *client) ExecStatements(ctx context.Context, statements 
[]string) error { return sql.StdSQLExecStatements(ctx, c.db, statements) } diff --git a/materialize-databricks/driver.go b/materialize-databricks/driver.go index 7bbe42840d..7469ab89e7 100644 --- a/materialize-databricks/driver.go +++ b/materialize-databricks/driver.go @@ -86,13 +86,9 @@ func newDatabricksDriver() *sql.Driver { "catalog": cfg.CatalogName, }).Info("connecting to databricks") - var metaBase sql.TablePath = []string{cfg.SchemaName} - var metaSpecs, _ = sql.MetaTables(metaBase) - return &sql.Endpoint{ Config: cfg, Dialect: databricksDialect, - MetaSpecs: &metaSpecs, MetaCheckpoints: nil, NewClient: newClient, CreateTableTemplate: tplCreateTargetTable, diff --git a/materialize-databricks/driver_test.go b/materialize-databricks/driver_test.go index 61504d7c11..a895f1452f 100644 --- a/materialize-databricks/driver_test.go +++ b/materialize-databricks/driver_test.go @@ -12,7 +12,6 @@ import ( boilerplate "github.com/estuary/connectors/materialize-boilerplate" sql "github.com/estuary/connectors/materialize-sql" - pf "github.com/estuary/flow/go/protocols/flow" "github.com/stretchr/testify/require" _ "github.com/databricks/databricks-sql-go" @@ -75,16 +74,9 @@ func TestValidateAndApply(t *testing.T) { return sch }, - func(t *testing.T, materialization pf.Materialization) { + func(t *testing.T) { t.Helper() - _, _ = db.ExecContext(ctx, fmt.Sprintf("drop table %s;", databricksDialect.Identifier(resourceConfig.Schema, resourceConfig.Table))) - - _, _ = db.ExecContext(ctx, fmt.Sprintf( - "delete from %s where materialization = %s", - databricksDialect.Identifier(cfg.SchemaName, sql.DefaultFlowMaterializations), - databricksDialect.Literal(materialization.String()), - )) }, ) } @@ -143,16 +135,9 @@ func TestValidateAndApplyMigrations(t *testing.T) { return rows }, - func(t *testing.T, materialization pf.Materialization) { + func(t *testing.T) { t.Helper() - _, _ = db.ExecContext(ctx, fmt.Sprintf("drop table %s;", databricksDialect.Identifier(resourceConfig.Schema, 
resourceConfig.Table))) - - _, _ = db.ExecContext(ctx, fmt.Sprintf( - "delete from %s where materialization = %s", - databricksDialect.Identifier(sql.DefaultFlowMaterializations), - databricksDialect.Literal(materialization.String()), - )) }, ) } diff --git a/materialize-dynamodb/apply.go b/materialize-dynamodb/apply.go index d2400b63e9..0e843f3e86 100644 --- a/materialize-dynamodb/apply.go +++ b/materialize-dynamodb/apply.go @@ -17,14 +17,6 @@ import ( type ddbApplier struct { client *client cfg config - // TODO(whb): Including the lastSpec from the validate or apply request is a - // temporary hack until we get around to removing the "load/persist a spec - // in the destination" concept more thoroughly. - lastSpec *pf.MaterializationSpec -} - -func (e *ddbApplier) CreateMetaTables(ctx context.Context, spec *pf.MaterializationSpec) (string, boilerplate.ActionApplyFn, error) { - return "", nil, nil } func (e *ddbApplier) CreateResource(ctx context.Context, spec *pf.MaterializationSpec, bindingIndex int) (string, boilerplate.ActionApplyFn, error) { @@ -38,14 +30,6 @@ func (e *ddbApplier) CreateResource(ctx context.Context, spec *pf.Materializatio }, nil } -func (e *ddbApplier) LoadSpec(ctx context.Context, materialization pf.Materialization) (*pf.MaterializationSpec, error) { - return e.lastSpec, nil -} - -func (e *ddbApplier) PutSpec(ctx context.Context, spec *pf.MaterializationSpec, version string, _ bool) (string, boilerplate.ActionApplyFn, error) { - return "", nil, nil -} - func (e *ddbApplier) DeleteResource(ctx context.Context, path []string) (string, boilerplate.ActionApplyFn, error) { return fmt.Sprintf("delete table %q", path[0]), func(ctx context.Context) error { return deleteTable(ctx, e.client, path[0]) diff --git a/materialize-dynamodb/driver.go b/materialize-dynamodb/driver.go index eb3ee01436..93fffda6b9 100644 --- a/materialize-dynamodb/driver.go +++ b/materialize-dynamodb/driver.go @@ -262,9 +262,8 @@ func (d driver) Apply(ctx context.Context, req 
*pm.Request_Apply) (*pm.Response_ } return boilerplate.ApplyChanges(ctx, req, &ddbApplier{ - client: client, - cfg: cfg, - lastSpec: req.LastMaterialization, + client: client, + cfg: cfg, }, is, true) } diff --git a/materialize-dynamodb/driver_test.go b/materialize-dynamodb/driver_test.go index 56c9123d0f..a0a0af788e 100644 --- a/materialize-dynamodb/driver_test.go +++ b/materialize-dynamodb/driver_test.go @@ -16,7 +16,6 @@ import ( "github.com/bradleyjkemp/cupaloy" boilerplate "github.com/estuary/connectors/materialize-boilerplate" - pf "github.com/estuary/flow/go/protocols/flow" pm "github.com/estuary/flow/go/protocols/materialize" "github.com/stretchr/testify/require" ) @@ -71,7 +70,7 @@ func TestValidateAndApply(t *testing.T) { return out.String() }, - func(t *testing.T, materialization pf.Materialization) { + func(t *testing.T) { t.Helper() for _, table := range []string{resourceConfig.Table} { diff --git a/materialize-elasticsearch/apply.go b/materialize-elasticsearch/apply.go index 677594acfe..b3b16f60d3 100644 --- a/materialize-elasticsearch/apply.go +++ b/materialize-elasticsearch/apply.go @@ -14,12 +14,6 @@ type elasticApplier struct { cfg config } -func (e *elasticApplier) CreateMetaTables(ctx context.Context, spec *pf.MaterializationSpec) (string, boilerplate.ActionApplyFn, error) { - return fmt.Sprintf("create index %q", defaultFlowMaterializations), func(ctx context.Context) error { - return e.client.createMetaIndex(ctx, e.cfg.Advanced.Replicas) - }, nil -} - func (e *elasticApplier) CreateResource(ctx context.Context, spec *pf.MaterializationSpec, bindingIndex int) (string, boilerplate.ActionApplyFn, error) { binding := spec.Bindings[bindingIndex] @@ -38,16 +32,6 @@ func (e *elasticApplier) CreateResource(ctx context.Context, spec *pf.Materializ }, nil } -func (e *elasticApplier) LoadSpec(ctx context.Context, materialization pf.Materialization) (*pf.MaterializationSpec, error) { - return e.client.getSpec(ctx, materialization) -} - -func (e 
*elasticApplier) PutSpec(ctx context.Context, spec *pf.MaterializationSpec, version string, _ bool) (string, boilerplate.ActionApplyFn, error) { - return fmt.Sprintf("update stored materialization spec and set version = %s", version), func(ctx context.Context) error { - return e.client.putSpec(ctx, spec, version) - }, nil -} - func (e *elasticApplier) DeleteResource(ctx context.Context, path []string) (string, boilerplate.ActionApplyFn, error) { return fmt.Sprintf("delete index %q", path[0]), func(ctx context.Context) error { return e.client.deleteIndex(ctx, path[0]) diff --git a/materialize-elasticsearch/client.go b/materialize-elasticsearch/client.go index ee22069f9c..481d49c17c 100644 --- a/materialize-elasticsearch/client.go +++ b/materialize-elasticsearch/client.go @@ -2,102 +2,20 @@ package main import ( "context" - "encoding/base64" "encoding/json" "fmt" - "io" "net/http" - "net/url" "strings" elasticsearch "github.com/elastic/go-elasticsearch/v8" "github.com/elastic/go-elasticsearch/v8/esutil" boilerplate "github.com/estuary/connectors/materialize-boilerplate" - pf "github.com/estuary/flow/go/protocols/flow" - "github.com/tidwall/gjson" -) - -const ( - defaultFlowMaterializations = "flow_materializations_v2" ) type client struct { es *elasticsearch.Client } -func (c *client) createMetaIndex(ctx context.Context, replicas *int) error { - props := map[string]property{ - "version": {Type: elasticTypeKeyword, Index: boolPtr(false)}, - // Binary mappings are never indexed, and to specify index: false on such a mapping results - // in an error. - "specBytes": {Type: elasticTypeBinary}, - } - - // The meta index will always be created with the default number of shards. - // Long term we plan to not require a meta index at all. 
- return c.createIndex(ctx, defaultFlowMaterializations, nil, replicas, props) -} - -func (c *client) putSpec(ctx context.Context, spec *pf.MaterializationSpec, version string) error { - specBytes, err := spec.Marshal() - if err != nil { - return fmt.Errorf("marshalling spec: %w", err) - } - - res, err := c.es.Index( - defaultFlowMaterializations, - esutil.NewJSONReader(map[string]string{ - "specBytes": base64.StdEncoding.EncodeToString(specBytes), - "version": version, - }), - c.es.Index.WithContext(ctx), - c.es.Index.WithDocumentID(url.PathEscape(spec.Name.String())), - ) - if err != nil { - return fmt.Errorf("putSpec: %w", err) - } - defer res.Body.Close() - if res.IsError() { - return fmt.Errorf("putSpec error response [%s] %s", res.Status(), res.String()) - } - - return nil -} - -func (c *client) getSpec(ctx context.Context, materialization pf.Materialization) (*pf.MaterializationSpec, error) { - res, err := c.es.Get( - defaultFlowMaterializations, - url.PathEscape(string(materialization)), - c.es.Get.WithContext(ctx), - ) - if err != nil { - return nil, fmt.Errorf("getSpec spec: %w", err) - } - defer res.Body.Close() - - if res.StatusCode == http.StatusNotFound { - return nil, nil - } else if res.IsError() { - return nil, fmt.Errorf("getSpec error response [%s] %s", res.Status(), res.String()) - } - - var spec pf.MaterializationSpec - - if jsonBytes, err := io.ReadAll(res.Body); err != nil { - return nil, fmt.Errorf("reading response body: %w", err) - } else if loc := gjson.GetBytes(jsonBytes, "_source.specBytes"); !loc.Exists() { - return nil, fmt.Errorf("malformed response: '_source.specBytes' does not exist") - } else if specBytes, err := base64.StdEncoding.DecodeString(loc.String()); err != nil { - return nil, fmt.Errorf("base64.Decode: %w", err) - } else if err := spec.Unmarshal(specBytes); err != nil { - return nil, fmt.Errorf("spec.Unmarshal: %w", err) - } else if err := spec.Validate(); err != nil { - return nil, fmt.Errorf("validating spec: %w", err) 
- } - - return &spec, nil -} - type createIndexParams struct { Settings indexSettings `json:"settings,omitempty"` Mappings indexMappings `json:"mappings"` diff --git a/materialize-elasticsearch/driver.go b/materialize-elasticsearch/driver.go index fede893bc5..10a83b4bcb 100644 --- a/materialize-elasticsearch/driver.go +++ b/materialize-elasticsearch/driver.go @@ -392,11 +392,6 @@ func (driver) Validate(ctx context.Context, req *pm.Request_Validate) (*pm.Respo return nil, cerrors.NewUserError(nil, fmt.Sprintf("could not connect to endpoint: received status code %d", p.StatusCode)) } - storedSpec, err := client.getSpec(ctx, req.Name) - if err != nil { - return nil, fmt.Errorf("getting spec: %w", err) - } - is, err := client.infoSchema(ctx) if err != nil { return nil, fmt.Errorf("getting infoSchema for validate: %w", err) @@ -422,7 +417,7 @@ func (driver) Validate(ctx context.Context, req *pm.Request_Validate) (*pm.Respo binding.Backfill, binding.Collection, binding.FieldConfigJsonMap, - storedSpec, + req.LastMaterialization, ) if err != nil { return nil, fmt.Errorf("validating binding: %w", err) diff --git a/materialize-elasticsearch/driver_test.go b/materialize-elasticsearch/driver_test.go index 51f815a4d7..d45c9f521b 100644 --- a/materialize-elasticsearch/driver_test.go +++ b/materialize-elasticsearch/driver_test.go @@ -13,7 +13,6 @@ import ( "github.com/bradleyjkemp/cupaloy" boilerplate "github.com/estuary/connectors/materialize-boilerplate" - pf "github.com/estuary/flow/go/protocols/flow" pm "github.com/estuary/flow/go/protocols/materialize" "github.com/stretchr/testify/require" "github.com/tidwall/gjson" @@ -82,10 +81,9 @@ func TestValidateAndApply(t *testing.T) { return out.String() }, - func(t *testing.T, materialization pf.Materialization) { + func(t *testing.T) { t.Helper() - - _, err := client.es.Indices.Delete([]string{defaultFlowMaterializations, resourceConfig.Index}) + _, err := client.es.Indices.Delete([]string{resourceConfig.Index}) require.NoError(t, 
err) }, ) diff --git a/materialize-mongodb/apply.go b/materialize-mongodb/apply.go index 421cd777b4..d6852b5a73 100644 --- a/materialize-mongodb/apply.go +++ b/materialize-mongodb/apply.go @@ -6,10 +6,7 @@ import ( boilerplate "github.com/estuary/connectors/materialize-boilerplate" pf "github.com/estuary/flow/go/protocols/flow" - proto "github.com/gogo/protobuf/proto" - "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" - "go.mongodb.org/mongo-driver/mongo/options" ) const specCollection = "flow_materializations" @@ -19,56 +16,11 @@ type mongoApplier struct { cfg config } -func (e *mongoApplier) CreateMetaTables(ctx context.Context, spec *pf.MaterializationSpec) (string, boilerplate.ActionApplyFn, error) { - // No-op since new collections are automatically created when data is added to them. - return "", nil, nil -} - func (e *mongoApplier) CreateResource(ctx context.Context, spec *pf.MaterializationSpec, bindingIndex int) (string, boilerplate.ActionApplyFn, error) { // No-op since new collections are automatically created when data is added to them. 
return "", nil, nil } -func (a *mongoApplier) LoadSpec(ctx context.Context, materialization pf.Materialization) (*pf.MaterializationSpec, error) { - metaCollection := a.client.Database(a.cfg.Database).Collection(specCollection) - - type savedSpec struct { - Id string `bson:"uuid"` - Spec []byte `bson:"spec"` - } - - var s savedSpec - if err := metaCollection.FindOne(ctx, bson.D{{Key: idField, Value: bson.D{{Key: "$eq", Value: materialization}}}}).Decode(&s); err != nil { - if err == mongo.ErrNoDocuments { - return nil, nil - } - return nil, fmt.Errorf("could not find spec: %w", err) - } - - var existing pf.MaterializationSpec - if err := proto.Unmarshal(s.Spec, &existing); err != nil { - return nil, fmt.Errorf("parsing existing materialization spec: %w", err) - } - - return &existing, nil -} - -func (a *mongoApplier) PutSpec(ctx context.Context, spec *pf.MaterializationSpec, version string, _ bool) (string, boilerplate.ActionApplyFn, error) { - bs, err := proto.Marshal(spec) - if err != nil { - return "", nil, fmt.Errorf("encoding existing materialization spec: %w", err) - } - - return "update persisted spec", func(ctx context.Context) error { - metaCollection := a.client.Database(a.cfg.Database).Collection(specCollection) - opts := options.Replace().SetUpsert(true) - if _, err := metaCollection.ReplaceOne(ctx, bson.D{{Key: idField, Value: spec.Name.String()}}, bson.D{{Key: "spec", Value: bs}}, opts); err != nil { - return fmt.Errorf("upserting spec: %w", err) - } - return nil - }, nil -} - func (a *mongoApplier) DeleteResource(ctx context.Context, path []string) (string, boilerplate.ActionApplyFn, error) { return fmt.Sprintf("drop collection %q", path[1]), func(ctx context.Context) error { return a.client.Database(path[0]).Collection(path[1]).Drop(ctx) diff --git a/materialize-motherduck/client.go b/materialize-motherduck/client.go index cc59f7bcab..4bd11fcd3c 100644 --- a/materialize-motherduck/client.go +++ b/materialize-motherduck/client.go @@ -15,7 +15,6 @@ 
import ( awsHttp "github.com/aws/smithy-go/transport/http" boilerplate "github.com/estuary/connectors/materialize-boilerplate" sql "github.com/estuary/connectors/materialize-sql" - pf "github.com/estuary/flow/go/protocols/flow" "github.com/google/uuid" _ "github.com/marcboeker/go-duckdb" @@ -48,11 +47,6 @@ func (c *client) InfoSchema(ctx context.Context, resourcePaths [][]string) (*boi return sql.StdFetchInfoSchema(ctx, c.db, c.ep.Dialect, c.cfg.Database, resourcePaths) } -func (c *client) PutSpec(ctx context.Context, updateSpec sql.MetaSpecsUpdate) error { - _, err := c.db.ExecContext(ctx, updateSpec.QueryString) - return err -} - func (c *client) CreateTable(ctx context.Context, tc sql.TableCreate) error { _, err := c.db.ExecContext(ctx, tc.TableCreateSql) return err @@ -202,10 +196,6 @@ func preReqs(ctx context.Context, conf any, tenant string) *sql.PrereqErr { return errs } -func (c *client) FetchSpecAndVersion(ctx context.Context, specs sql.Table, materialization pf.Materialization) (string, string, error) { - return sql.StdFetchSpecAndVersion(ctx, c.db, specs, materialization) -} - func (c *client) ExecStatements(ctx context.Context, statements []string) error { return sql.StdSQLExecStatements(ctx, c.db, statements) } diff --git a/materialize-motherduck/driver.go b/materialize-motherduck/driver.go index 031522ad5c..b7fab8b155 100644 --- a/materialize-motherduck/driver.go +++ b/materialize-motherduck/driver.go @@ -163,13 +163,10 @@ func newDuckDriver() *sql.Driver { "database": cfg.Database, }).Info("opening database") - metaSpecs, metaCheckpoints := sql.MetaTables([]string{cfg.Database, cfg.Schema}) - return &sql.Endpoint{ Config: cfg, Dialect: duckDialect, - MetaSpecs: &metaSpecs, - MetaCheckpoints: &metaCheckpoints, + MetaCheckpoints: sql.FlowCheckpointsTable([]string{cfg.Database, cfg.Schema}), NewClient: newClient, CreateTableTemplate: tplCreateTargetTable, NewResource: newTableConfig, diff --git a/materialize-motherduck/driver_test.go 
b/materialize-motherduck/driver_test.go index 604257fc89..0fbd1dcd18 100644 --- a/materialize-motherduck/driver_test.go +++ b/materialize-motherduck/driver_test.go @@ -11,7 +11,6 @@ import ( "github.com/bradleyjkemp/cupaloy" boilerplate "github.com/estuary/connectors/materialize-boilerplate" sql "github.com/estuary/connectors/materialize-sql" - pf "github.com/estuary/flow/go/protocols/flow" pm "github.com/estuary/flow/go/protocols/materialize" "github.com/google/uuid" "github.com/stretchr/testify/require" @@ -76,7 +75,7 @@ func TestValidateAndApply(t *testing.T) { return sch }, - func(t *testing.T, materialization pf.Materialization) { + func(t *testing.T) { t.Helper() db, err := cfg.db(ctx) @@ -84,12 +83,6 @@ func TestValidateAndApply(t *testing.T) { defer db.Close() _, _ = db.ExecContext(ctx, fmt.Sprintf("drop table %s;", duckDialect.Identifier(cfg.Database, resourceConfig.Schema, resourceConfig.Table))) - - _, _ = db.ExecContext(ctx, fmt.Sprintf( - "delete from %s where materialization = %s", - duckDialect.Identifier(cfg.Database, cfg.Schema, sql.DefaultFlowMaterializations), - duckDialect.Literal(materialization.String()), - )) }, ) } @@ -157,7 +150,7 @@ func TestValidateAndApplyMigrations(t *testing.T) { return rows }, - func(t *testing.T, materialization pf.Materialization) { + func(t *testing.T) { t.Helper() db, err := cfg.db(ctx) @@ -165,12 +158,6 @@ func TestValidateAndApplyMigrations(t *testing.T) { defer db.Close() _, _ = db.ExecContext(ctx, fmt.Sprintf("drop table %s;", duckDialect.Identifier(cfg.Database, resourceConfig.Schema, resourceConfig.Table))) - - _, _ = db.ExecContext(ctx, fmt.Sprintf( - "delete from %s where materialization = %s", - duckDialect.Identifier(cfg.Database, cfg.Schema, sql.DefaultFlowMaterializations), - duckDialect.Literal(materialization.String()), - )) }, ) } diff --git a/materialize-mysql/client.go b/materialize-mysql/client.go index b381812b33..6b27e62d34 100644 --- a/materialize-mysql/client.go +++ 
b/materialize-mysql/client.go @@ -11,7 +11,6 @@ import ( boilerplate "github.com/estuary/connectors/materialize-boilerplate" sql "github.com/estuary/connectors/materialize-sql" - "github.com/estuary/flow/go/protocols/flow" "github.com/go-sql-driver/mysql" ) @@ -238,11 +237,6 @@ func (c *client) DeleteTable(ctx context.Context, path []string) (string, boiler }, nil } -func (c *client) PutSpec(ctx context.Context, updateSpec sql.MetaSpecsUpdate) error { - _, err := c.db.ExecContext(ctx, updateSpec.ParameterizedQuery, updateSpec.Parameters...) - return err -} - func (c *client) InstallFence(ctx context.Context, checkpoints sql.Table, fence sql.Fence) (sql.Fence, error) { return sql.StdInstallFence(ctx, c.db, checkpoints, fence) } @@ -251,10 +245,6 @@ func (c *client) ExecStatements(ctx context.Context, statements []string) error return sql.StdSQLExecStatements(ctx, c.db, statements) } -func (c *client) FetchSpecAndVersion(ctx context.Context, specs sql.Table, materialization flow.Materialization) (string, string, error) { - return sql.StdFetchSpecAndVersion(ctx, c.db, specs, materialization) -} - func (c *client) Close() { c.db.Close() } diff --git a/materialize-mysql/driver.go b/materialize-mysql/driver.go index 21d026b883..a967579dbf 100644 --- a/materialize-mysql/driver.go +++ b/materialize-mysql/driver.go @@ -263,9 +263,6 @@ func newMysqlDriver() *sql.Driver { "user": cfg.User, }).Info("opening database") - var metaBase sql.TablePath - var metaSpecs, metaCheckpoints = sql.MetaTables(metaBase) - if cfg.Advanced.SSLMode == "verify_ca" || cfg.Advanced.SSLMode == "verify_identity" { if err := registerCustomSSL(cfg); err != nil { return nil, fmt.Errorf("error registering custom ssl: %w", err) @@ -325,8 +322,7 @@ func newMysqlDriver() *sql.Driver { return &sql.Endpoint{ Config: cfg, Dialect: dialect, - MetaSpecs: &metaSpecs, - MetaCheckpoints: &metaCheckpoints, + MetaCheckpoints: sql.FlowCheckpointsTable(nil), NewClient: prepareNewClient(tzLocation), 
CreateTableTemplate: templates.createTargetTable, NewResource: newTableConfig, diff --git a/materialize-mysql/driver_test.go b/materialize-mysql/driver_test.go index 7635f231ab..7782614607 100644 --- a/materialize-mysql/driver_test.go +++ b/materialize-mysql/driver_test.go @@ -71,15 +71,9 @@ func TestValidateAndApply(t *testing.T) { return sch }, - func(t *testing.T, materialization pf.Materialization) { + func(t *testing.T) { t.Helper() - _, _ = db.ExecContext(ctx, fmt.Sprintf("drop table %s;", testDialect.Identifier(resourceConfig.Table))) - - _, _ = db.ExecContext(ctx, fmt.Sprintf( - "delete from %s where materialization = 'test/sqlite'", - testDialect.Identifier(sql.DefaultFlowMaterializations), - )) }, ) } @@ -137,16 +131,9 @@ func TestValidateAndApplyMigrations(t *testing.T) { return rows }, - func(t *testing.T, materialization pf.Materialization) { + func(t *testing.T) { t.Helper() - _, _ = db.ExecContext(ctx, fmt.Sprintf("drop table %s;", testDialect.Identifier(resourceConfig.Table))) - - _, _ = db.ExecContext(ctx, fmt.Sprintf( - "delete from %s where materialization = %s", - testDialect.Identifier(sql.DefaultFlowMaterializations), - testDialect.Literal(materialization.String()), - )) }, ) } @@ -204,16 +191,9 @@ func TestValidateAndApplyMigrationsMariaDB(t *testing.T) { return rows }, - func(t *testing.T, materialization pf.Materialization) { + func(t *testing.T) { t.Helper() - _, _ = db.ExecContext(ctx, fmt.Sprintf("drop table %s;", testDialect.Identifier(resourceConfig.Table))) - - _, _ = db.ExecContext(ctx, fmt.Sprintf( - "delete from %s where materialization = %s", - testDialect.Identifier(sql.DefaultFlowMaterializations), - testDialect.Literal(materialization.String()), - )) }, ) } diff --git a/materialize-postgres/client.go b/materialize-postgres/client.go index 739c55a62a..36bcee916a 100644 --- a/materialize-postgres/client.go +++ b/materialize-postgres/client.go @@ -88,11 +88,6 @@ func (c *client) InfoSchema(ctx context.Context, resourcePaths 
[][]string) (*boi return sql.StdFetchInfoSchema(ctx, c.db, pgDialect, catalog, resourcePaths) } -func (c *client) PutSpec(ctx context.Context, updateSpec sql.MetaSpecsUpdate) error { - _, err := c.db.ExecContext(ctx, updateSpec.ParameterizedQuery, updateSpec.Parameters...) - return err -} - func (c *client) CreateTable(ctx context.Context, tc sql.TableCreate) error { var res tableConfig if tc.ResourceConfigJson != nil { @@ -179,10 +174,6 @@ func (c *client) CreateSchema(ctx context.Context, schemaName string) error { return sql.StdCreateSchema(ctx, c.db, pgDialect, schemaName) } -func (c *client) FetchSpecAndVersion(ctx context.Context, specs sql.Table, materialization pf.Materialization) (string, string, error) { - return sql.StdFetchSpecAndVersion(ctx, c.db, specs, materialization) -} - func (c *client) ExecStatements(ctx context.Context, statements []string) error { return sql.StdSQLExecStatements(ctx, c.db, statements) } diff --git a/materialize-postgres/driver.go b/materialize-postgres/driver.go index da1f691a14..d8e24e4328 100644 --- a/materialize-postgres/driver.go +++ b/materialize-postgres/driver.go @@ -225,13 +225,11 @@ func newPostgresDriver() *sql.Driver { if cfg.Schema != "" { metaBase = append(metaBase, cfg.Schema) } - var metaSpecs, metaCheckpoints = sql.MetaTables(metaBase) return &sql.Endpoint{ Config: cfg, Dialect: pgDialect, - MetaSpecs: &metaSpecs, - MetaCheckpoints: &metaCheckpoints, + MetaCheckpoints: sql.FlowCheckpointsTable(metaBase), NewClient: newClient, CreateTableTemplate: tplCreateTargetTable, NewResource: newTableConfig, diff --git a/materialize-postgres/driver_test.go b/materialize-postgres/driver_test.go index 38f85f9989..7f50776147 100644 --- a/materialize-postgres/driver_test.go +++ b/materialize-postgres/driver_test.go @@ -11,7 +11,6 @@ import ( boilerplate "github.com/estuary/connectors/materialize-boilerplate" sql "github.com/estuary/connectors/materialize-sql" - pf "github.com/estuary/flow/go/protocols/flow" 
"github.com/stretchr/testify/require" _ "github.com/jackc/pgx/v5/stdlib" @@ -54,16 +53,9 @@ func TestValidateAndApply(t *testing.T) { return sch }, - func(t *testing.T, materialization pf.Materialization) { + func(t *testing.T) { t.Helper() - _, _ = db.ExecContext(ctx, fmt.Sprintf("drop table %s;", pgDialect.Identifier(resourceConfig.Schema, resourceConfig.Table))) - - _, _ = db.ExecContext(ctx, fmt.Sprintf( - "delete from %s where materialization = %s", - pgDialect.Identifier(cfg.Schema, sql.DefaultFlowMaterializations), - pgDialect.Literal(materialization.String()), - )) }, ) } @@ -122,16 +114,9 @@ func TestValidateAndApplyMigrations(t *testing.T) { return rows }, - func(t *testing.T, materialization pf.Materialization) { + func(t *testing.T) { t.Helper() - _, _ = db.ExecContext(ctx, fmt.Sprintf("drop table %s;", pgDialect.Identifier(resourceConfig.Schema, resourceConfig.Table))) - - _, _ = db.ExecContext(ctx, fmt.Sprintf( - "delete from %s where materialization = %s", - pgDialect.Identifier(cfg.Schema, sql.DefaultFlowMaterializations), - pgDialect.Literal(materialization.String()), - )) }, ) } diff --git a/materialize-redshift/client.go b/materialize-redshift/client.go index fe3808832d..6fcbbd853f 100644 --- a/materialize-redshift/client.go +++ b/materialize-redshift/client.go @@ -20,7 +20,6 @@ import ( awsHttp "github.com/aws/smithy-go/transport/http" boilerplate "github.com/estuary/connectors/materialize-boilerplate" sql "github.com/estuary/connectors/materialize-sql" - pf "github.com/estuary/flow/go/protocols/flow" "github.com/google/uuid" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgconn" @@ -63,35 +62,6 @@ func (c *client) InfoSchema(ctx context.Context, resourcePaths [][]string) (*boi return sql.StdFetchInfoSchema(ctx, c.db, c.ep.Dialect, catalog, resourcePaths) } -func (c *client) PutSpec(ctx context.Context, updateSpec sql.MetaSpecsUpdate) error { - // Compress the spec bytes to store, since we are using a VARBYTE column with a limit of 1MB. 
- // TODO(whb): This will go away when we start passing in the last validated spec to Validate - // calls and stop needing to persist a spec at all. - // updateSpec.Parameters - specB64 := updateSpec.Parameters[1] // The second parameter is the spec - specBytes, err := base64.StdEncoding.DecodeString(specB64.(string)) - if err != nil { - return fmt.Errorf("decoding base64 spec prior to compressing: %w", err) - } - - // Sanity check that this is indeed a spec. By all rights this is totally unnecessary but it - // makes me feel a little better about indexing updateSpec.Parameters up above. - var spec pf.MaterializationSpec - if err := spec.Unmarshal(specBytes); err != nil { - return fmt.Errorf("application logic error - specBytes was not a spec: %w", err) - } - - compressed, err := compressBytes(specBytes) - if err != nil { - return fmt.Errorf("compressing spec bytes: %w", err) - } - - updateSpec.Parameters[1] = base64.StdEncoding.EncodeToString(compressed) - - _, err = c.db.ExecContext(ctx, updateSpec.ParameterizedQuery, updateSpec.Parameters...) 
- return err -} - func (c *client) CreateTable(ctx context.Context, tc sql.TableCreate) error { _, err := c.db.ExecContext(ctx, tc.TableCreateSql) return err @@ -257,32 +227,6 @@ func preReqs(ctx context.Context, conf any, tenant string) *sql.PrereqErr { return errs } -func (c *client) FetchSpecAndVersion(ctx context.Context, specs sql.Table, materialization pf.Materialization) (string, string, error) { - var version, spec string - - if err := c.db.QueryRowContext( - ctx, - fmt.Sprintf( - "SELECT version, spec FROM %s WHERE materialization = %s;", - specs.Identifier, - specs.Keys[0].Placeholder, - ), - materialization.String(), - ).Scan(&version, &spec); err != nil { - return "", "", err - } - - if hexBytes, err := hex.DecodeString(spec); err != nil { - return "", "", fmt.Errorf("hex.DecodeString: %w", err) - } else if specBytes, err := base64.StdEncoding.DecodeString(string(hexBytes)); err != nil { - return "", "", fmt.Errorf("base64.DecodeString: %w", err) - } else if specBytes, err = maybeDecompressBytes(specBytes); err != nil { - return "", "", fmt.Errorf("decompressing spec: %w", err) - } else { - return base64.StdEncoding.EncodeToString(specBytes), version, nil - } -} - func (c *client) ExecStatements(ctx context.Context, statements []string) error { return c.withDB(func(db *stdsql.DB) error { return sql.StdSQLExecStatements(ctx, db, statements) }) } diff --git a/materialize-redshift/driver.go b/materialize-redshift/driver.go index 00e0f503b9..0f5b552158 100644 --- a/materialize-redshift/driver.go +++ b/materialize-redshift/driver.go @@ -232,7 +232,6 @@ func newRedshiftDriver() *sql.Driver { if cfg.Schema != "" { metaBase = append(metaBase, cfg.Schema) } - metaSpecs, metaCheckpoints := sql.MetaTables(metaBase) db, err := stdsql.Open("pgx", cfg.toURI()) if err != nil { @@ -256,8 +255,7 @@ func newRedshiftDriver() *sql.Driver { return &sql.Endpoint{ Config: cfg, Dialect: dialect, - MetaSpecs: &metaSpecs, - MetaCheckpoints: &metaCheckpoints, + MetaCheckpoints: 
sql.FlowCheckpointsTable(metaBase), NewClient: newClient, CreateTableTemplate: templates.createTargetTable, NewResource: newTableConfig, diff --git a/materialize-redshift/driver_test.go b/materialize-redshift/driver_test.go index 47caa0ed3c..05be370ed8 100644 --- a/materialize-redshift/driver_test.go +++ b/materialize-redshift/driver_test.go @@ -13,7 +13,6 @@ import ( "github.com/bradleyjkemp/cupaloy" boilerplate "github.com/estuary/connectors/materialize-boilerplate" sql "github.com/estuary/connectors/materialize-sql" - pf "github.com/estuary/flow/go/protocols/flow" pm "github.com/estuary/flow/go/protocols/materialize" "github.com/google/uuid" "github.com/jackc/pgx/v5" @@ -79,16 +78,9 @@ func TestValidateAndApply(t *testing.T) { return sch }, - func(t *testing.T, materialization pf.Materialization) { + func(t *testing.T) { t.Helper() - _, _ = db.ExecContext(ctx, fmt.Sprintf("drop table %s;", testDialect.Identifier(resourceConfig.Schema, resourceConfig.Table))) - - _, _ = db.ExecContext(ctx, fmt.Sprintf( - "delete from %s where materialization = %s", - testDialect.Identifier(cfg.Schema, sql.DefaultFlowMaterializations), - testDialect.Literal(materialization.String()), - )) }, ) } @@ -147,16 +139,9 @@ func TestValidateAndApplyMigrations(t *testing.T) { return rows }, - func(t *testing.T, materialization pf.Materialization) { + func(t *testing.T) { t.Helper() - _, _ = db.ExecContext(ctx, fmt.Sprintf("drop table %s;", testDialect.Identifier(resourceConfig.Schema, resourceConfig.Table))) - - _, _ = db.ExecContext(ctx, fmt.Sprintf( - "delete from %s where materialization = %s", - testDialect.Identifier(cfg.Schema, sql.DefaultFlowMaterializations), - testDialect.Literal(materialization.String()), - )) }, ) } diff --git a/materialize-s3-iceberg/catalog.go b/materialize-s3-iceberg/catalog.go index a3693175ef..c634cf6729 100644 --- a/materialize-s3-iceberg/catalog.go +++ b/materialize-s3-iceberg/catalog.go @@ -12,20 +12,14 @@ import ( ) type catalog struct { - cfg *config - 
// TODO(whb): Including the lastSpec from the validate or apply request is a temporary hack - // until we get around to removing the "load/persist a spec in the destination" concept more - // thoroughly. As of this writing, the iceberg materialization is the first one to actually use - // the lastSpec from the validate or apply request. - lastSpec *pf.MaterializationSpec + cfg *config resourcePaths [][]string } -func newCatalog(cfg config, resourcePaths [][]string, lastSpec *pf.MaterializationSpec) *catalog { +func newCatalog(cfg config, resourcePaths [][]string) *catalog { return &catalog{ cfg: &cfg, resourcePaths: resourcePaths, - lastSpec: lastSpec, } } @@ -244,17 +238,3 @@ func (c *catalog) appendFiles( return nil } - -// These functions are vestigial from the age of persisting specs in the destination. - -func (c *catalog) CreateMetaTables(ctx context.Context, spec *pf.MaterializationSpec) (string, boilerplate.ActionApplyFn, error) { - return "", nil, nil -} - -func (c *catalog) LoadSpec(ctx context.Context, materialization pf.Materialization) (*pf.MaterializationSpec, error) { - return c.lastSpec, nil -} - -func (c *catalog) PutSpec(ctx context.Context, spec *pf.MaterializationSpec, version string, exists bool) (string, boilerplate.ActionApplyFn, error) { - return "", nil, nil -} diff --git a/materialize-s3-iceberg/driver.go b/materialize-s3-iceberg/driver.go index f4fbe01c52..6a8b1f941a 100644 --- a/materialize-s3-iceberg/driver.go +++ b/materialize-s3-iceberg/driver.go @@ -256,7 +256,7 @@ func (driver) Validate(ctx context.Context, req *pm.Request_Validate) (*pm.Respo resourcePaths = append(resourcePaths, res.path()) } - catalog := newCatalog(cfg, resourcePaths, req.LastMaterialization) + catalog := newCatalog(cfg, resourcePaths) is, err := catalog.infoSchema(ctx) if err != nil { @@ -304,7 +304,7 @@ func (driver) Apply(ctx context.Context, req *pm.Request_Apply) (*pm.Response_Ap resourcePaths = append(resourcePaths, b.ResourcePath) } - catalog := 
newCatalog(cfg, resourcePaths, req.LastMaterialization) + catalog := newCatalog(cfg, resourcePaths) existingNamespaces, err := catalog.listNamespaces(ctx) if err != nil { @@ -361,7 +361,7 @@ func (d driver) NewTransactor(ctx context.Context, open pm.Request_Open, _ *boil }) } - catalog := newCatalog(cfg, resourcePaths, open.Materialization) + catalog := newCatalog(cfg, resourcePaths) tablePaths, err := catalog.tablePaths(ctx, resourcePaths) if err != nil { return nil, nil, nil, fmt.Errorf("looking up table paths: %w", err) diff --git a/materialize-s3-iceberg/driver_test.go b/materialize-s3-iceberg/driver_test.go index aa3c8bd6ee..1d90db1286 100644 --- a/materialize-s3-iceberg/driver_test.go +++ b/materialize-s3-iceberg/driver_test.go @@ -10,7 +10,6 @@ import ( "github.com/bradleyjkemp/cupaloy" boilerplate "github.com/estuary/connectors/materialize-boilerplate" - pf "github.com/estuary/flow/go/protocols/flow" pm "github.com/estuary/flow/go/protocols/materialize" "github.com/stretchr/testify/require" ) @@ -94,9 +93,8 @@ func TestValidateAndApply(t *testing.T) { return out.String() }, - func(t *testing.T, materialization pf.Materialization) { + func(t *testing.T) { t.Helper() - _, do, _ := catalog.DeleteResource(ctx, resourceConfig.path()) do(ctx) }, diff --git a/materialize-snowflake/.snapshots/TestSpecification b/materialize-snowflake/.snapshots/TestSpecification index c9b28a491d..6baa3c5ee0 100644 --- a/materialize-snowflake/.snapshots/TestSpecification +++ b/materialize-snowflake/.snapshots/TestSpecification @@ -19,7 +19,7 @@ "schema": { "type": "string", "title": "Schema", - "description": "Database schema for bound collection tables (unless overridden within the binding resource configuration) as well as associated materialization metadata tables.", + "description": "Database schema for bound collection tables (unless overridden within the binding resource configuration).", "order": 4 }, "warehouse": { diff --git a/materialize-snowflake/client.go 
b/materialize-snowflake/client.go index 55aed0297b..d84e15c66b 100644 --- a/materialize-snowflake/client.go +++ b/materialize-snowflake/client.go @@ -10,7 +10,6 @@ import ( boilerplate "github.com/estuary/connectors/materialize-boilerplate" sql "github.com/estuary/connectors/materialize-sql" - pf "github.com/estuary/flow/go/protocols/flow" sf "github.com/snowflakedb/gosnowflake" ) @@ -58,11 +57,6 @@ func (c *client) InfoSchema(ctx context.Context, resourcePaths [][]string) (is * return sql.StdFetchInfoSchema(ctx, c.db, c.ep.Dialect, catalog, resourcePaths) } -func (c *client) PutSpec(ctx context.Context, updateSpec sql.MetaSpecsUpdate) error { - _, err := c.db.ExecContext(ctx, updateSpec.ParameterizedQuery, updateSpec.Parameters...) - return err -} - // The error message returned from Snowflake for the multi-statement table // creation queries is mostly a bunch of garbled nonsense about Javascript // execution errors if the user doesn't have permission to create tables in the @@ -179,10 +173,6 @@ func preReqs(ctx context.Context, conf any, tenant string) *sql.PrereqErr { return errs } -func (c *client) FetchSpecAndVersion(ctx context.Context, specs sql.Table, materialization pf.Materialization) (string, string, error) { - return sql.StdFetchSpecAndVersion(ctx, c.db, specs, materialization) -} - func (c *client) ExecStatements(ctx context.Context, statements []string) error { return sql.StdSQLExecStatements(ctx, c.db, statements) } diff --git a/materialize-snowflake/config.go b/materialize-snowflake/config.go index 8884a7afa4..56693bfa0e 100644 --- a/materialize-snowflake/config.go +++ b/materialize-snowflake/config.go @@ -23,7 +23,7 @@ import ( type config struct { Host string `json:"host" jsonschema:"title=Host (Account URL),description=The Snowflake Host used for the connection. Must include the account identifier and end in .snowflakecomputing.com. Example: orgname-accountname.snowflakecomputing.com (do not include the protocol)." 
jsonschema_extras:"order=0,pattern=^[^/:]+.snowflakecomputing.com$"` Database string `json:"database" jsonschema:"title=Database,description=The SQL database to connect to." jsonschema_extras:"order=3"` - Schema string `json:"schema" jsonschema:"title=Schema,description=Database schema for bound collection tables (unless overridden within the binding resource configuration) as well as associated materialization metadata tables." jsonschema_extras:"order=4"` + Schema string `json:"schema" jsonschema:"title=Schema,description=Database schema for bound collection tables (unless overridden within the binding resource configuration)." jsonschema_extras:"order=4"` Warehouse string `json:"warehouse,omitempty" jsonschema:"title=Warehouse,description=The Snowflake virtual warehouse used to execute queries. Uses the default warehouse for the Snowflake user if left blank." jsonschema_extras:"order=5"` Role string `json:"role,omitempty" jsonschema:"title=Role,description=The user role used to perform actions." jsonschema_extras:"order=6"` Account string `json:"account,omitempty" jsonschema:"title=Account,description=Optional Snowflake account identifier." 
jsonschema_extras:"order=7"` diff --git a/materialize-snowflake/snowflake.go b/materialize-snowflake/snowflake.go index b00757bc7b..280c840a21 100644 --- a/materialize-snowflake/snowflake.go +++ b/materialize-snowflake/snowflake.go @@ -97,9 +97,6 @@ func newSnowflakeDriver() *sql.Driver { "tenant": tenant, }).Info("opening Snowflake") - var metaBase sql.TablePath = []string{parsed.Schema} - var metaSpecs, _ = sql.MetaTables(metaBase) - dsn, err := parsed.toURI(tenant) if err != nil { return nil, fmt.Errorf("building snowflake dsn: %w", err) @@ -120,9 +117,8 @@ func newSnowflakeDriver() *sql.Driver { var templates = renderTemplates(dialect) return &sql.Endpoint{ - Config: parsed, - Dialect: dialect, - MetaSpecs: &metaSpecs, + Config: parsed, + Dialect: dialect, // Snowflake does not use the checkpoint table, instead we use the recovery log // as the authoritative checkpoint and idempotent apply pattern MetaCheckpoints: nil, diff --git a/materialize-snowflake/snowflake_test.go b/materialize-snowflake/snowflake_test.go index d326e6bccc..72768e61ae 100644 --- a/materialize-snowflake/snowflake_test.go +++ b/materialize-snowflake/snowflake_test.go @@ -12,7 +12,6 @@ import ( "github.com/bradleyjkemp/cupaloy" boilerplate "github.com/estuary/connectors/materialize-boilerplate" sql "github.com/estuary/connectors/materialize-sql" - pf "github.com/estuary/flow/go/protocols/flow" pm "github.com/estuary/flow/go/protocols/materialize" "github.com/stretchr/testify/require" @@ -82,16 +81,9 @@ func TestValidateAndApply(t *testing.T) { return sch }, - func(t *testing.T, materialization pf.Materialization) { + func(t *testing.T) { t.Helper() - _, _ = db.ExecContext(ctx, fmt.Sprintf("drop table %s;", testDialect.Identifier(resourceConfig.Schema, resourceConfig.Table))) - - _, _ = db.ExecContext(ctx, fmt.Sprintf( - "delete from %s where materialization = %s", - testDialect.Identifier(cfg.Schema, sql.DefaultFlowMaterializations), - testDialect.Literal(materialization.String()), - )) }, ) 
} @@ -153,16 +145,9 @@ func TestValidateAndApplyMigrations(t *testing.T) { return rows }, - func(t *testing.T, materialization pf.Materialization) { + func(t *testing.T) { t.Helper() - _, _ = db.ExecContext(ctx, fmt.Sprintf("drop table %s;", testDialect.Identifier(resourceConfig.Schema, resourceConfig.Table))) - - _, _ = db.ExecContext(ctx, fmt.Sprintf( - "delete from %s where materialization = %s", - testDialect.Identifier(cfg.Schema, sql.DefaultFlowMaterializations), - testDialect.Literal(materialization.String()), - )) }, ) } diff --git a/materialize-sql/.snapshots/TestTableTemplate b/materialize-sql/.snapshots/TestTableTemplate index 17cc58b6f6..ca0eba37d7 100644 --- a/materialize-sql/.snapshots/TestTableTemplate +++ b/materialize-sql/.snapshots/TestTableTemplate @@ -1,5 +1,5 @@ - CREATE TABLE one."reserved".checkpoints ( + CREATE TABLE one."reserved".checkpoints.flow_checkpoints_v1 ( materialization TEXT NOT NULL, key_begin BIGINT NOT NULL, key_end BIGINT NOT NULL, @@ -9,10 +9,10 @@ ); - COMMENT ON TABLE one."reserved".checkpoints IS 'This table holds Flow processing checkpoints used for exactly-once processing of materializations'; - COMMENT ON COLUMN one."reserved".checkpoints.materialization IS 'The name of the materialization.'; - COMMENT ON COLUMN one."reserved".checkpoints.key_begin IS 'The inclusive lower-bound key hash covered by this checkpoint.'; - COMMENT ON COLUMN one."reserved".checkpoints.key_end IS 'The inclusive upper-bound key hash covered by this checkpoint.'; - COMMENT ON COLUMN one."reserved".checkpoints.fence IS 'This nonce is used to uniquely identify unique process assignments of a shard and prevent them from conflicting.'; - COMMENT ON COLUMN one."reserved".checkpoints.checkpoint IS 'Checkpoint of the Flow consumer shard, encoded as base64 protobuf.'; + COMMENT ON TABLE one."reserved".checkpoints.flow_checkpoints_v1 IS 'This table holds Flow processing checkpoints used for exactly-once processing of materializations'; + COMMENT ON 
COLUMN one."reserved".checkpoints.flow_checkpoints_v1.materialization IS 'The name of the materialization.'; + COMMENT ON COLUMN one."reserved".checkpoints.flow_checkpoints_v1.key_begin IS 'The inclusive lower-bound key hash covered by this checkpoint.'; + COMMENT ON COLUMN one."reserved".checkpoints.flow_checkpoints_v1.key_end IS 'The inclusive upper-bound key hash covered by this checkpoint.'; + COMMENT ON COLUMN one."reserved".checkpoints.flow_checkpoints_v1.fence IS 'This nonce is used to uniquely identify unique process assignments of a shard and prevent them from conflicting.'; + COMMENT ON COLUMN one."reserved".checkpoints.flow_checkpoints_v1.checkpoint IS 'Checkpoint of the Flow consumer shard, encoded as base64 protobuf.'; diff --git a/materialize-sql/apply.go b/materialize-sql/apply.go index 682ef1f733..cbd5a4c26c 100644 --- a/materialize-sql/apply.go +++ b/materialize-sql/apply.go @@ -2,12 +2,9 @@ package sql import ( "context" - "database/sql" - "encoding/base64" "encoding/json" "fmt" "slices" - "strings" boilerplate "github.com/estuary/connectors/materialize-boilerplate" pf "github.com/estuary/flow/go/protocols/flow" @@ -41,14 +38,6 @@ type TableAlter struct { ColumnTypeChanges []ColumnTypeMigration } -// MetaSpecsUpdate is an endpoint-specific parameterized query and parameters needed to persist a -// new or updated materialization spec. -type MetaSpecsUpdate struct { - ParameterizedQuery string - Parameters []interface{} - QueryString string // For endpoints that do not support parameterized queries. 
-} - var _ boilerplate.Applier = (*sqlApplier)(nil) type sqlApplier struct { @@ -67,51 +56,6 @@ func newSqlApplier(client Client, is *boilerplate.InfoSchema, endpoint *Endpoint } } -func (a *sqlApplier) CreateMetaTables(ctx context.Context, spec *pf.MaterializationSpec) (string, boilerplate.ActionApplyFn, error) { - if (a.endpoint.MetaCheckpoints == nil || a.is.HasResource(a.endpoint.MetaCheckpoints.Path)) && - (a.endpoint.MetaSpecs == nil || a.is.HasResource(a.endpoint.MetaSpecs.Path)) { - // If this materialization does not use the checkpoints or specs table OR it does and - // they already exist, there is nothing more to do here. - return "", nil, nil - } - - var creates []TableCreate - var actionDesc []string - - for _, meta := range []*TableShape{a.endpoint.MetaSpecs, a.endpoint.MetaCheckpoints} { - if meta == nil { - continue - } - resolved, err := ResolveTable(*meta, a.endpoint.Dialect) - if err != nil { - return "", nil, err - } - createStatement, err := RenderTableTemplate(resolved, a.endpoint.CreateTableTemplate) - if err != nil { - return "", nil, err - } - creates = append(creates, TableCreate{ - Table: resolved, - TableCreateSql: createStatement, - ResourceConfigJson: nil, // not applicable for meta tables - }) - actionDesc = append(actionDesc, createStatement) - } - - if len(creates) == 0 { - return "", nil, nil - } - - return strings.Join(actionDesc, "\n"), func(ctx context.Context) error { - for _, c := range creates { - if err := a.client.CreateTable(ctx, c); err != nil { - return fmt.Errorf("creating meta table %s: %w", c.Identifier, err) - } - } - return nil - }, nil -} - func (a *sqlApplier) CreateResource(ctx context.Context, spec *pf.MaterializationSpec, bindingIndex int) (string, boilerplate.ActionApplyFn, error) { table, err := getTable(a.endpoint, spec, bindingIndex) if err != nil { @@ -140,69 +84,6 @@ func (a *sqlApplier) CreateResource(ctx context.Context, spec *pf.Materializatio }, nil } -func (a *sqlApplier) LoadSpec(ctx 
context.Context, materialization pf.Materialization) (*pf.MaterializationSpec, error) { - spec, _, err := loadSpec(ctx, a.client, a.endpoint, materialization) - if err != nil { - return nil, err - } - - return spec, nil -} - -func (a *sqlApplier) PutSpec(ctx context.Context, spec *pf.MaterializationSpec, version string, exists bool) (string, boilerplate.ActionApplyFn, error) { - specBytes, err := spec.Marshal() - if err != nil { - panic(err) // Cannot fail to marshal. - } - - var description string - var specUpdate MetaSpecsUpdate - if a.endpoint.MetaSpecs != nil { - // Insert or update the materialization specification. Both parameterized queries and - // literal query strings are supported. A parameterized query is generally preferable, but - // some endpoints don't have support for those. - var paramArgs = []interface{}{ - a.endpoint.Identifier(a.endpoint.MetaSpecs.Path...), - a.endpoint.Placeholder(0), - a.endpoint.Placeholder(1), - a.endpoint.Placeholder(2), - } - var params = []interface{}{ - version, - base64.StdEncoding.EncodeToString(specBytes), - spec.Name.String(), - } - var queryStringArgs = []interface{}{ - paramArgs[0], - a.endpoint.Literal(params[0].(string)), - a.endpoint.Literal(params[1].(string)), - a.endpoint.Literal(params[2].(string)), - } - var descriptionArgs = []interface{}{ - paramArgs[0], - a.endpoint.Literal(params[0].(string)), - a.endpoint.Literal("(a-base64-encoded-value)"), - a.endpoint.Literal(params[2].(string)), - } - - var q string - if exists { - q = "UPDATE %[1]s SET version = %[2]s, spec = %[3]s WHERE materialization = %[4]s;" - } else { - q = "INSERT INTO %[1]s (version, spec, materialization) VALUES (%[2]s, %[3]s, %[4]s);" - } - - specUpdate.ParameterizedQuery = fmt.Sprintf(q, paramArgs...) - specUpdate.Parameters = params - specUpdate.QueryString = fmt.Sprintf(q, queryStringArgs...) - description = fmt.Sprintf(q, descriptionArgs...) 
- } - - return description, func(ctx context.Context) error { - return a.client.PutSpec(ctx, specUpdate) - }, nil -} - func (a *sqlApplier) DeleteResource(ctx context.Context, path []string) (string, boilerplate.ActionApplyFn, error) { return a.client.DeleteTable(ctx, path) } @@ -317,40 +198,3 @@ func getTable(endpoint *Endpoint, spec *pf.MaterializationSpec, bindingIndex int tableShape := BuildTableShape(spec, bindingIndex, resource) return ResolveTable(tableShape, endpoint.Dialect) } - -// loadSpec loads the named MaterializationSpec and its version that's stored within the Endpoint, -// if any. -func loadSpec(ctx context.Context, client Client, endpoint *Endpoint, materialization pf.Materialization) (*pf.MaterializationSpec, string, error) { - var ( - err error - metaSpecs Table - spec = new(pf.MaterializationSpec) - specB64, version string - ) - - if endpoint.MetaSpecs == nil { - return nil, "", nil - } - if metaSpecs, err = ResolveTable(*endpoint.MetaSpecs, endpoint.Dialect); err != nil { - return nil, "", fmt.Errorf("resolving specifications table: %w", err) - } - specB64, version, err = client.FetchSpecAndVersion(ctx, metaSpecs, materialization) - - if err == sql.ErrNoRows { - return nil, "", nil - } else if err != nil { - log.WithFields(log.Fields{ - "table": endpoint.MetaSpecs.Path, - "err": err, - }).Info("failed to query materialization spec (the table may not be initialized?)") - return nil, "", nil - } else if specBytes, err := base64.StdEncoding.DecodeString(specB64); err != nil { - return nil, version, fmt.Errorf("base64.Decode: %w", err) - } else if err = spec.Unmarshal(specBytes); err != nil { - return nil, version, fmt.Errorf("spec.Unmarshal: %w", err) - } else if err = spec.Validate(); err != nil { - return nil, version, fmt.Errorf("validating spec: %w", err) - } - - return spec, version, nil -} diff --git a/materialize-sql/driver.go b/materialize-sql/driver.go index de78b9e190..0befe0daae 100644 --- a/materialize-sql/driver.go +++ 
b/materialize-sql/driver.go @@ -72,12 +72,11 @@ func (d *Driver) Spec(ctx context.Context, req *pm.Request_Spec) (*pm.Response_S func (d *Driver) Validate(ctx context.Context, req *pm.Request_Validate) (*pm.Response_Validated, error) { var ( - err error - endpoint *Endpoint - client Client - loadedSpec *pf.MaterializationSpec - resp = new(pm.Response_Validated) - conf = d.EndpointSpecType + err error + endpoint *Endpoint + client Client + resp = new(pm.Response_Validated) + conf = d.EndpointSpecType ) if err = req.Validate(); err != nil { @@ -95,10 +94,6 @@ func (d *Driver) Validate(ctx context.Context, req *pm.Request_Validate) (*pm.Re } defer client.Close() - if loadedSpec, _, err = loadSpec(ctx, client, endpoint, req.Name); err != nil { - return nil, fmt.Errorf("loading current applied materialization spec: %w", err) - } - resources := make([]Resource, 0, len(req.Bindings)) resourcePaths := make([][]string, 0, len(req.Bindings)) for _, b := range req.Bindings { @@ -109,9 +104,6 @@ func (d *Driver) Validate(ctx context.Context, req *pm.Request_Validate) (*pm.Re resources = append(resources, res) resourcePaths = append(resourcePaths, res.Path()) } - if endpoint.MetaSpecs != nil { - resourcePaths = append(resourcePaths, endpoint.MetaSpecs.Path) - } if endpoint.MetaCheckpoints != nil { resourcePaths = append(resourcePaths, endpoint.MetaCheckpoints.Path) } @@ -146,7 +138,7 @@ func (d *Driver) Validate(ctx context.Context, req *pm.Request_Validate) (*pm.Re bindingSpec.Backfill, bindingSpec.Collection, bindingSpec.FieldConfigJsonMap, - loadedSpec, + req.LastMaterialization, ) if err != nil { return nil, err @@ -188,9 +180,6 @@ func (d *Driver) Apply(ctx context.Context, req *pm.Request_Apply) (*pm.Response for _, b := range req.Materialization.Bindings { resourcePaths = append(resourcePaths, b.ResourcePath) } - if endpoint.MetaSpecs != nil { - resourcePaths = append(resourcePaths, endpoint.MetaSpecs.Path) - } if endpoint.MetaCheckpoints != nil { resourcePaths = 
append(resourcePaths, endpoint.MetaCheckpoints.Path) } @@ -200,6 +189,22 @@ func (d *Driver) Apply(ctx context.Context, req *pm.Request_Apply) (*pm.Response return nil, err } + if endpoint.MetaCheckpoints != nil && !is.HasResource(endpoint.MetaCheckpoints.Path) { + if resolved, err := ResolveTable(*endpoint.MetaCheckpoints, endpoint.Dialect); err != nil { + return nil, err + } else if createStatement, err := RenderTableTemplate(resolved, endpoint.CreateTableTemplate); err != nil { + return nil, err + } else if err := client.CreateTable(ctx, TableCreate{ + Table: resolved, + TableCreateSql: createStatement, + ResourceConfigJson: nil, // not applicable for meta tables + }); err != nil { + return nil, fmt.Errorf("creating checkpoints table: %w", err) + } else { + log.WithField("table", resolved.Identifier).Info("created checkpoints table") + } + } + if sm, ok := client.(SchemaManager); ok { // Create any schemas that don't already exist, if the endpoint supports schemas. existingSchemas, err := sm.ListSchemas(ctx) @@ -226,7 +231,6 @@ func (d *Driver) Apply(ctx context.Context, req *pm.Request_Apply) (*pm.Response } func (d *Driver) NewTransactor(ctx context.Context, open pm.Request_Open, be *boilerplate.BindingEvents) (m.Transactor, *pm.Response_Opened, *boilerplate.MaterializeOptions, error) { - var loadedVersion string var conf = d.EndpointSpecType if err := json.Unmarshal(open.Materialization.ConfigJson, conf); err != nil { @@ -247,20 +251,6 @@ func (d *Driver) NewTransactor(ctx context.Context, open pm.Request_Open, be *bo defer client.Close() var resourcePaths [][]string - if endpoint.MetaSpecs != nil { - resourcePaths = append(resourcePaths, endpoint.MetaSpecs.Path) - - if _, loadedVersion, err = loadSpec(ctx, client, endpoint, open.Materialization.Name); err != nil { - return nil, nil, nil, fmt.Errorf("loading prior applied materialization spec: %w", err) - } else if loadedVersion == "" { - return nil, nil, nil, fmt.Errorf("materialization has not been 
applied") - } else if loadedVersion != open.Version { - return nil, nil, nil, fmt.Errorf( - "applied and current materializations are different versions (applied: %s vs current: %s)", - loadedVersion, open.Version) - } - } - var tables []Table for index, spec := range open.Materialization.Bindings { resourcePaths = append(resourcePaths, spec.ResourcePath) diff --git a/materialize-sql/endpoint.go b/materialize-sql/endpoint.go index e3da9acd20..13d6dc944a 100644 --- a/materialize-sql/endpoint.go +++ b/materialize-sql/endpoint.go @@ -13,9 +13,6 @@ import ( ) type Client interface { - // FetchSpecAndVersion retrieves the materialization from Table `specs`, - // or returns sql.ErrNoRows if no such spec exists. - FetchSpecAndVersion(ctx context.Context, specs Table, materialization pf.Materialization) (specB64, version string, _ error) ExecStatements(ctx context.Context, statements []string) error InstallFence(ctx context.Context, checkpoints Table, fence Fence) (Fence, error) @@ -25,9 +22,6 @@ type Client interface { // the relevant schemas. InfoSchema(ctx context.Context, resourcePaths [][]string) (*boilerplate.InfoSchema, error) - // PutSpec executes the MetaSpecsUpdate to upsert the spec into the metadata table. - PutSpec(ctx context.Context, spec MetaSpecsUpdate) error - // CreateTable creates a table in the destination system. CreateTable(ctx context.Context, tc TableCreate) error @@ -97,8 +91,6 @@ type Endpoint struct { Config interface{} // Dialect of the Endpoint. Dialect - // MetaSpecs is the specification meta-table of the Endpoint. - MetaSpecs *TableShape // MetaCheckpoints is the checkpoints meta-table of the Endpoint. // It's optional, and won't be created or used if it's nil. 
MetaCheckpoints *TableShape diff --git a/materialize-sql/meta_tables.go b/materialize-sql/meta_tables.go index 13e34dc165..39f40c8491 100644 --- a/materialize-sql/meta_tables.go +++ b/materialize-sql/meta_tables.go @@ -7,22 +7,13 @@ import ( const ( // DefaultFlowCheckpoints is the default table for checkpoints. DefaultFlowCheckpoints = "flow_checkpoints_v1" - // DefaultFlowMaterializations is the default table for materialization specs. - DefaultFlowMaterializations = "flow_materializations_v2" ) -// MetaTables returns *Table configurations for Flow materialization metadata tables, -// having the given `base` TablePath extended with their well-known table names. -func MetaTables(base TablePath) (specs, checkpoints TableShape) { - return FlowMaterializationsTable(append(base, DefaultFlowMaterializations)...), - FlowCheckpointsTable(append(base, DefaultFlowCheckpoints)...) -} - // FlowCheckpointsTable returns the TableShape that (optionally) holds fenced // materialization checkpoints, where supported by the endpoint and connector. -func FlowCheckpointsTable(path ...string) TableShape { - return TableShape{ - Path: path, +func FlowCheckpointsTable(base TablePath) *TableShape { + return &TableShape{ + Path: append(base, DefaultFlowCheckpoints), Binding: -1, Source: "", Comment: "This table holds Flow processing checkpoints used for exactly-once processing of materializations", @@ -93,58 +84,3 @@ func FlowCheckpointsTable(path ...string) TableShape { Document: nil, } } - -// FlowMaterializationsTable returns the TableShape that holds MaterializationSpecs. 
-func FlowMaterializationsTable(path ...string) TableShape { - - return TableShape{ - Path: path, - Binding: -1, - Source: "", - Comment: "This table is the source of truth for all materializations into this system.", - DeltaUpdates: false, - Keys: []Projection{ - { - Projection: flow.Projection{ - Field: "materialization", - Inference: flow.Inference{ - Types: []string{"string"}, - Exists: flow.Inference_MUST, - String_: &flow.Inference_String{}, - }, - }, - RawFieldConfig: nil, - Comment: "The name of the materialization.", - }, - }, - Values: []Projection{ - { - Projection: flow.Projection{ - Field: "version", - Inference: flow.Inference{ - Types: []string{"string"}, - Exists: flow.Inference_MUST, - String_: &flow.Inference_String{}, - }, - }, - RawFieldConfig: nil, - Comment: "Version of the materialization.", - }, - { - Projection: flow.Projection{ - Field: "spec", - Inference: flow.Inference{ - Types: []string{"string"}, - Exists: flow.Inference_MUST, - String_: &flow.Inference_String{ - ContentType: "application/x-protobuf; proto=flow.MaterializationSpec", - }, - }, - }, - RawFieldConfig: nil, - Comment: "Specification of the materialization, encoded as base64 protobuf.", - }, - }, - Document: nil, - } -} diff --git a/materialize-sql/std_sql.go b/materialize-sql/std_sql.go index 021c4abf4d..1ea9f25220 100644 --- a/materialize-sql/std_sql.go +++ b/materialize-sql/std_sql.go @@ -13,31 +13,9 @@ import ( "time" boilerplate "github.com/estuary/connectors/materialize-boilerplate" - pf "github.com/estuary/flow/go/protocols/flow" log "github.com/sirupsen/logrus" ) -// StdFetchSpecAndVersion is a convenience for Client implementations which -// use Go's standard `sql.DB` type under the hood. -func StdFetchSpecAndVersion(ctx context.Context, db *sql.DB, specs Table, materialization pf.Materialization) (spec, version string, err error) { - // Fail-fast: surface a connection issue. 
- if err = db.PingContext(ctx); err != nil { - err = fmt.Errorf("connecting to DB: %w", err) - return - } - err = db.QueryRowContext( - ctx, - fmt.Sprintf( - "SELECT version, spec FROM %s WHERE materialization = %s;", - specs.Identifier, - specs.Keys[0].Placeholder, - ), - materialization.String(), - ).Scan(&version, &spec) - - return -} - // StdSQLExecStatements is a convenience for Client implementations which // use Go's standard `sql.DB` type under the hood. func StdSQLExecStatements(ctx context.Context, db *sql.DB, statements []string) error { diff --git a/materialize-sql/templating_test.go b/materialize-sql/templating_test.go index c37afc38f9..513698c35e 100644 --- a/materialize-sql/templating_test.go +++ b/materialize-sql/templating_test.go @@ -54,9 +54,9 @@ func newTestDialect() Dialect { func TestTableTemplate(t *testing.T) { var ( - shape = FlowCheckpointsTable("one", "reserved", "checkpoints") + shape = FlowCheckpointsTable([]string{"one", "reserved", "checkpoints"}) dialect = newTestDialect() - table, err = ResolveTable(shape, dialect) + table, err = ResolveTable(*shape, dialect) ) assert.NoError(t, err) diff --git a/materialize-sql/test_support.go b/materialize-sql/test_support.go index c17a1111bf..3941a2924a 100644 --- a/materialize-sql/test_support.go +++ b/materialize-sql/test_support.go @@ -42,8 +42,9 @@ func RunFenceTestCases( var runTest = func(t *testing.T, ranges ...uint32) { var ctx = context.Background() - var metaShape = FlowCheckpointsTable(checkpointsPath...) 
- var metaTable, err = ResolveTable(metaShape, dialect) + var metaShape = FlowCheckpointsTable(checkpointsPath) + metaShape.Path = checkpointsPath + var metaTable, err = ResolveTable(*metaShape, dialect) require.NoError(t, err) createSQL, err := RenderTableTemplate(metaTable, createTableTpl) @@ -319,7 +320,7 @@ func RunValidateAndApplyMigrationsTests( dumpSchema func(t *testing.T) string, insertData func(t *testing.T, cols []string, values []string), dumpData func(t *testing.T) string, - cleanup func(t *testing.T, materialization pf.Materialization), + cleanup func(t *testing.T), ) { ctx := context.Background() var snap strings.Builder @@ -331,7 +332,7 @@ func RunValidateAndApplyMigrationsTests( require.NoError(t, err) t.Run("validate and apply migratable type changes", func(t *testing.T) { - defer cleanup(t, pf.Materialization("test/sqlite")) + defer cleanup(t) fixture := loadValidateSpec(t, "base.flow.proto") diff --git a/materialize-sqlite/sqlite.go b/materialize-sqlite/sqlite.go index a0126c83da..ab230b64a9 100644 --- a/materialize-sqlite/sqlite.go +++ b/materialize-sqlite/sqlite.go @@ -79,7 +79,6 @@ func NewSQLiteDriver() *sql.Driver { return &sql.Endpoint{ Config: config{path: path}, Dialect: sqliteDialect, - MetaSpecs: nil, MetaCheckpoints: nil, NewClient: newClient, CreateTableTemplate: tplCreateTargetTable, @@ -125,16 +124,6 @@ func (c *client) DeleteTable(ctx context.Context, path []string) (string, boiler return "", nil, nil } -// We don't use specs table for sqlite since it is ephemeral and won't be -// persisted between ApplyUpsert and Transactions calls -func (c *client) FetchSpecAndVersion(ctx context.Context, specs sql.Table, materialization pf.Materialization) (specB64, version string, err error) { - return "", "", stdsql.ErrNoRows -} - -func (c *client) PutSpec(ctx context.Context, updateSpec sql.MetaSpecsUpdate) error { - return nil -} - func (c *client) ExecStatements(ctx context.Context, statements []string) error { return 
sql.StdSQLExecStatements(ctx, c.db, statements) } diff --git a/materialize-sqlserver/client.go b/materialize-sqlserver/client.go index d9cd489d2e..db1a9da90a 100644 --- a/materialize-sqlserver/client.go +++ b/materialize-sqlserver/client.go @@ -14,7 +14,6 @@ import ( boilerplate "github.com/estuary/connectors/materialize-boilerplate" sql "github.com/estuary/connectors/materialize-sql" - "github.com/estuary/flow/go/protocols/flow" mssqldb "github.com/microsoft/go-mssqldb" ) @@ -222,11 +221,6 @@ func (c *client) DeleteTable(ctx context.Context, path []string) (string, boiler }, nil } -func (c *client) PutSpec(ctx context.Context, updateSpec sql.MetaSpecsUpdate) error { - _, err := c.db.ExecContext(ctx, updateSpec.ParameterizedQuery, updateSpec.Parameters...) - return err -} - func (c *client) InstallFence(ctx context.Context, checkpoints sql.Table, fence sql.Fence) (sql.Fence, error) { return installFence(ctx, c.ep.Dialect, c.db, checkpoints, fence, base64.StdEncoding.DecodeString) } @@ -235,10 +229,6 @@ func (c *client) ExecStatements(ctx context.Context, statements []string) error return sql.StdSQLExecStatements(ctx, c.db, statements) } -func (c *client) FetchSpecAndVersion(ctx context.Context, specs sql.Table, materialization flow.Materialization) (string, string, error) { - return sql.StdFetchSpecAndVersion(ctx, c.db, specs, materialization) -} - func (c *client) Close() { c.db.Close() } diff --git a/materialize-sqlserver/driver.go b/materialize-sqlserver/driver.go index 40a7758e04..457eada0f7 100644 --- a/materialize-sqlserver/driver.go +++ b/materialize-sqlserver/driver.go @@ -181,8 +181,6 @@ func newSqlServerDriver() *sql.Driver { if cfg.Schema != "" { metaBase = append(metaBase, cfg.Schema) } - var metaSpecs, metaCheckpoints = sql.MetaTables(metaBase) - db, err := stdsql.Open("sqlserver", cfg.ToURI()) if err != nil { return nil, fmt.Errorf("opening db: %w", err) @@ -209,8 +207,7 @@ func newSqlServerDriver() *sql.Driver { return &sql.Endpoint{ Config: cfg, 
Dialect: dialect, - MetaSpecs: &metaSpecs, - MetaCheckpoints: &metaCheckpoints, + MetaCheckpoints: sql.FlowCheckpointsTable(metaBase), NewClient: newClient, CreateTableTemplate: templates.createTargetTable, NewResource: newTableConfig, diff --git a/materialize-sqlserver/driver_test.go b/materialize-sqlserver/driver_test.go index 039058eb69..22cd53e286 100644 --- a/materialize-sqlserver/driver_test.go +++ b/materialize-sqlserver/driver_test.go @@ -59,15 +59,9 @@ func TestValidateAndApply(t *testing.T) { return sch }, - func(t *testing.T, materialization pf.Materialization) { + func(t *testing.T) { t.Helper() - _, _ = db.ExecContext(ctx, fmt.Sprintf("drop table %s;", testDialect.Identifier(resourceConfig.Table))) - - _, _ = db.ExecContext(ctx, fmt.Sprintf( - "delete from %s where materialization = 'test/sqlite'", - testDialect.Identifier("flow_materializations_v2"), - )) }, ) } @@ -133,16 +127,9 @@ func TestValidateAndApplyMigrations(t *testing.T) { return rows }, - func(t *testing.T, materialization pf.Materialization) { + func(t *testing.T) { t.Helper() - _, _ = db.ExecContext(ctx, fmt.Sprintf("drop table %s;", testDialect.Identifier(resourceConfig.Table))) - - _, _ = db.ExecContext(ctx, fmt.Sprintf( - "delete from %s where materialization = %s", - testDialect.Identifier(sql.DefaultFlowMaterializations), - testDialect.Literal(materialization.String()), - )) }, ) } diff --git a/materialize-starburst/client.go b/materialize-starburst/client.go index 924340407e..9818115664 100644 --- a/materialize-starburst/client.go +++ b/materialize-starburst/client.go @@ -9,7 +9,6 @@ import ( boilerplate "github.com/estuary/connectors/materialize-boilerplate" sql "github.com/estuary/connectors/materialize-sql" - pf "github.com/estuary/flow/go/protocols/flow" ) var _ sql.SchemaManager = (*client)(nil) @@ -111,13 +110,6 @@ func (c *client) InfoSchema(ctx context.Context, resourcePaths [][]string) (*boi return is, nil } -func (c *client) PutSpec(ctx context.Context, updateSpec 
sql.MetaSpecsUpdate) error { - // Normalize query by removing trailing ';' as Trino does not accept it. - updateSpec.QueryString = strings.TrimRight(updateSpec.QueryString, ";") - _, err := c.db.ExecContext(ctx, updateSpec.QueryString) - return err -} - func (c *client) CreateTable(ctx context.Context, tc sql.TableCreate) error { _, err := c.db.ExecContext(ctx, tc.TableCreateSql) return err @@ -186,34 +178,6 @@ func preReqs(ctx context.Context, conf any, tenant string) *sql.PrereqErr { return errs } -func (c *client) FetchSpecAndVersion(ctx context.Context, specs sql.Table, materialization pf.Materialization) (spec, version string, err error) { - - fetchVersionAndSpecQuery, err := sql.RenderTableTemplate(specs, c.templates.fetchVersionAndSpec) - if err != nil { - return "", "", err - } - - // QueryRowContext cannot be used until Trino driver issue is fixed https://github.com/trinodb/trino-go-client/issues/102 - rows, err := c.db.QueryContext(ctx, - fetchVersionAndSpecQuery, - materialization.String()) - if err != nil { - return "", "", fmt.Errorf("quering spec and version faield: %w", err) - } - var numberOfResults int - for rows.Next() { - numberOfResults++ - if err := rows.Scan(&version, &spec); err != nil { - return "", "", fmt.Errorf("quering spec and version faield: %w", err) - } - } - if numberOfResults != 1 { - return "", "", fmt.Errorf("quering spec and version should return exactly one result number of results: %d", numberOfResults) - } - - return -} - func (c *client) ExecStatements(ctx context.Context, statements []string) error { return sql.StdSQLExecStatements(ctx, c.db, statements) } diff --git a/materialize-starburst/starburst.go b/materialize-starburst/starburst.go index e63930df82..9806b63651 100644 --- a/materialize-starburst/starburst.go +++ b/materialize-starburst/starburst.go @@ -117,14 +117,11 @@ func newStarburstDriver() *sql.Driver { "schema": cfg.Schema, }).Info("opening Starburst") - var metaBase sql.TablePath - var metaSpecs, _ = 
sql.MetaTables(metaBase) var templates = renderTemplates(starburstTrinoDialect) return &sql.Endpoint{ Config: cfg, Dialect: starburstTrinoDialect, - MetaSpecs: &metaSpecs, NewClient: newClient, CreateTableTemplate: templates.createTargetTable, NewResource: newTableConfig, diff --git a/materialize-starburst/starburst_test.go b/materialize-starburst/starburst_test.go index 0406d18973..d250310d63 100644 --- a/materialize-starburst/starburst_test.go +++ b/materialize-starburst/starburst_test.go @@ -7,16 +7,15 @@ import ( stdsql "database/sql" "encoding/json" "fmt" - "github.com/bradleyjkemp/cupaloy" - boilerplate "github.com/estuary/connectors/materialize-boilerplate" - sql "github.com/estuary/connectors/materialize-sql" - pf "github.com/estuary/flow/go/protocols/flow" - pm "github.com/estuary/flow/go/protocols/materialize" "os" "slices" "strings" "testing" + "github.com/bradleyjkemp/cupaloy" + boilerplate "github.com/estuary/connectors/materialize-boilerplate" + pm "github.com/estuary/flow/go/protocols/materialize" + "github.com/stretchr/testify/require" ) @@ -80,16 +79,9 @@ func TestValidateAndApply(t *testing.T) { return sch }, - func(t *testing.T, materialization pf.Materialization) { + func(t *testing.T) { t.Helper() - _, _ = db.ExecContext(ctx, fmt.Sprintf("drop table %s", targetTableDialect.Identifier(resourceConfig.Schema, resourceConfig.Table))) - - _, _ = db.ExecContext(ctx, fmt.Sprintf( - "delete from %s where materialization = %s", - targetTableDialect.Identifier(cfg.Schema, sql.DefaultFlowMaterializations), - targetTableDialect.Literal(materialization.String()), - )) }, ) }