Skip to content

Commit

Permalink
materializations: don't persist a spec in the destination system
Browse files Browse the repository at this point in the history
Validate and Apply request messages have had the last successfully
validated/applied specification included with them for a while now. We can use
this prior spec, rather than needing to store and retrieve it out of the
destination metadata table.

This does introduce a tiny difference in behavior, where currently Validate runs
against the last successfully Applied (persisted) spec, rather than the last
successfully Validate spec. I can't think of any practical difference this would
make, and this model has been running successfully with other materializations
already, such as Iceberg.
  • Loading branch information
williamhbaker committed Jan 9, 2025
1 parent 8a0344d commit cbd025c
Show file tree
Hide file tree
Showing 54 changed files with 111 additions and 1,028 deletions.
6 changes: 1 addition & 5 deletions materialize-bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,14 +154,10 @@ func newBigQueryDriver() *sql.Driver {
"bucket_path": cfg.BucketPath,
}).Info("creating bigquery endpoint")

var metaBase sql.TablePath = []string{cfg.ProjectID, cfg.Dataset}
var metaSpecs, metaCheckpoints = sql.MetaTables(metaBase)

return &sql.Endpoint{
Config: cfg,
Dialect: bqDialect,
MetaSpecs: &metaSpecs,
MetaCheckpoints: &metaCheckpoints,
MetaCheckpoints: sql.FlowCheckpointsTable([]string{cfg.ProjectID, cfg.Dataset}),
NewClient: newClient,
CreateTableTemplate: tplCreateTargetTable,
NewResource: newTableConfig,
Expand Down
17 changes: 2 additions & 15 deletions materialize-bigquery/bigquery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/bradleyjkemp/cupaloy"
boilerplate "github.com/estuary/connectors/materialize-boilerplate"
sql "github.com/estuary/connectors/materialize-sql"
pf "github.com/estuary/flow/go/protocols/flow"
pm "github.com/estuary/flow/go/protocols/materialize"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -73,18 +72,12 @@ func TestValidateAndApply(t *testing.T) {
t.Helper()
return dumpSchema(t, ctx, client, cfg, resourceConfig)
},
func(t *testing.T, materialization pf.Materialization) {
func(t *testing.T) {
t.Helper()

_, _ = client.query(ctx, fmt.Sprintf(
"drop table %s;",
bqDialect.Identifier(cfg.ProjectID, cfg.Dataset, resourceConfig.Table),
))

_, _ = client.query(ctx, fmt.Sprintf(
"delete from %s where materialization = 'test/sqlite'",
bqDialect.Identifier(cfg.ProjectID, cfg.Dataset, sql.DefaultFlowMaterializations),
))
},
)
}
Expand Down Expand Up @@ -186,18 +179,12 @@ func TestValidateAndApplyMigrations(t *testing.T) {

return b.String()
},
func(t *testing.T, materialization pf.Materialization) {
func(t *testing.T) {
t.Helper()

_, _ = client.query(ctx, fmt.Sprintf(
"drop table %s;",
bqDialect.Identifier(cfg.ProjectID, cfg.Dataset, resourceConfig.Table),
))

_, _ = client.query(ctx, fmt.Sprintf(
"delete from %s where materialization = 'test/sqlite'",
bqDialect.Identifier(cfg.ProjectID, cfg.Dataset, sql.DefaultFlowMaterializations),
))
},
)
}
Expand Down
37 changes: 0 additions & 37 deletions materialize-bigquery/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package connector

import (
"context"
dbSql "database/sql"
"encoding/base64"
"errors"
"fmt"
Expand All @@ -16,7 +15,6 @@ import (
storage "cloud.google.com/go/storage"
boilerplate "github.com/estuary/connectors/materialize-boilerplate"
sql "github.com/estuary/connectors/materialize-sql"
pf "github.com/estuary/flow/go/protocols/flow"
"github.com/google/uuid"
log "github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -113,11 +111,6 @@ func (c *client) InfoSchema(ctx context.Context, resourcePaths [][]string) (*boi
return is, nil
}

func (c *client) PutSpec(ctx context.Context, updateSpec sql.MetaSpecsUpdate) error {
_, err := c.query(ctx, updateSpec.ParameterizedQuery, updateSpec.Parameters...)
return err
}

func (c *client) CreateTable(ctx context.Context, tc sql.TableCreate) error {
_, err := c.query(ctx, tc.TableCreateSql)
return err
Expand Down Expand Up @@ -316,36 +309,6 @@ func preReqs(ctx context.Context, conf any, tenant string) *sql.PrereqErr {
return errs
}

func (c *client) FetchSpecAndVersion(ctx context.Context, specs sql.Table, materialization pf.Materialization) (specB64, version string, err error) {
job, err := c.query(ctx, fmt.Sprintf(
"SELECT version, spec FROM %s WHERE materialization=%s;",
specs.Identifier,
specs.Keys[0].Placeholder,
), materialization.String())
if err != nil {
return "", "", err
}

var data struct {
Version string `bigquery:"version"`
SpecB64 string `bigquery:"spec"`
}

if err := c.fetchOne(ctx, job, &data); err == errNotFound {
return "", "", dbSql.ErrNoRows
} else if err != nil {
return "", "", err
}

log.WithFields(log.Fields{
"table": specs.Identifier,
"materialization": materialization.String(),
"version": data.Version,
}).Info("existing materialization spec loaded")

return data.SpecB64, data.Version, nil
}

func (c *client) ExecStatements(ctx context.Context, statements []string) error {
_, err := c.query(ctx, strings.Join(statements, "\n"))
return err
Expand Down
24 changes: 6 additions & 18 deletions materialize-boilerplate/.snapshots/TestApply
Original file line number Diff line number Diff line change
@@ -1,30 +1,18 @@
* new materialization:
create meta tables
create resource for collection "key/value"
put spec with version "aVersion"
create resource for ["key_value"]

* remove required field:
create meta tables
update resource for collection "key/value" [new projections: 0, newly nullable fields: 1, newly delta updates: false]
put spec with version "aVersion"
update resource for ["key_value"] [new projections: 0, newly nullable fields: 1, newly delta updates: false]

* add required field:
create meta tables
update resource for collection "key/value" [new projections: 1, newly nullable fields: 0, newly delta updates: false]
put spec with version "aVersion"
update resource for ["key_value"] [new projections: 1, newly nullable fields: 0, newly delta updates: false]

* add binding:
create meta tables
create resource for collection "extra/collection"
put spec with version "aVersion"
create resource for ["extra_collection"]

* replace binding:
create meta tables
delete resource ["key_value"]
create resource for collection "key/value"
put spec with version "aVersion"
create resource for ["key_value"]

* field is newly nullable:
create meta tables
update resource for collection "key/value" [new projections: 0, newly nullable fields: 1, newly delta updates: false]
put spec with version "aVersion"
update resource for ["key_value"] [new projections: 0, newly nullable fields: 1, newly delta updates: false]
44 changes: 2 additions & 42 deletions materialize-boilerplate/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,6 @@ type BindingUpdate struct {
// resources based on binding changes. Many of these functions should return an ActionApplyFn, which
// may be executed concurrently.
type Applier interface {
// CreateMetaTables is called to create the tables (or the equivalent endpoint concept) that
// store a persisted spec and any other metadata the materialization needs to persist.
CreateMetaTables(ctx context.Context, spec *pf.MaterializationSpec) (string, ActionApplyFn, error)

// LoadSpec loads the persisted spec from the metadata table.
LoadSpec(ctx context.Context, materialization pf.Materialization) (*pf.MaterializationSpec, error)

// PutSpec upserts a spec into the metadata table.
PutSpec(ctx context.Context, spec *pf.MaterializationSpec, version string, exists bool) (string, ActionApplyFn, error)

// CreateResource creates a new resource in the endpoint. It is called only if the resource does
// not already exist, either because it is brand new or because it was previously deleted as
// part of a resource replacement.
Expand Down Expand Up @@ -82,11 +72,6 @@ func ApplyChanges(ctx context.Context, req *pm.Request_Apply, applier Applier, i
return nil, fmt.Errorf("validating request: %w", err)
}

storedSpec, err := applier.LoadSpec(ctx, req.Materialization.Name)
if err != nil {
return nil, fmt.Errorf("getting stored spec: %w", err)
}

actionDescriptions := []string{}
actions := []ActionApplyFn{}

Expand All @@ -97,22 +82,11 @@ func ApplyChanges(ctx context.Context, req *pm.Request_Apply, applier Applier, i
}
}

// TODO(whb): We will eventually stop persisting specs for materializations, and instead include
// the previous spec as part of the protocol. When that happens this can go away. Then, if
// individual materializations still need individual metadata tables (ex: SQL materializations),
// they should create them as needed separately from the Applier. For now, we always call
// CreateMetaTables, and materializations are free to either create them or not.
desc, action, err := applier.CreateMetaTables(ctx, req.Materialization)
if err != nil {
return nil, fmt.Errorf("getting CreateMetaTables action: %w", err)
}
addAction(desc, action)

for bindingIdx, binding := range req.Materialization.Bindings {
// The existing binding spec is used to extract various properties that can't be learned
// from introspecting the destination system, such as the backfill counter and if the
// materialization was previously delta updates.
existingBinding, err := findExistingBinding(binding.ResourcePath, storedSpec)
existingBinding, err := findExistingBinding(binding.ResourcePath, req.LastMaterialization)
if err != nil {
return nil, fmt.Errorf("finding existing binding: %w", err)
}
Expand Down Expand Up @@ -147,7 +121,7 @@ func ApplyChanges(ctx context.Context, req *pm.Request_Apply, applier Applier, i
desc = append(desc, createDesc)
}

action = func(ctx context.Context) error {
action := func(ctx context.Context) error {
if err := deleteAction(ctx); err != nil {
return err
}
Expand Down Expand Up @@ -241,19 +215,5 @@ func ApplyChanges(ctx context.Context, req *pm.Request_Apply, applier Applier, i
}
}

// Only update the spec after all other actions have completed successfully.
desc, action, err = applier.PutSpec(ctx, req.Materialization, req.Version, storedSpec != nil)
if err != nil {
return nil, fmt.Errorf("getting PutSpec action: %w", err)
}
// Although all current materializations always do persist a spec, its possible that some may
// not in the future as we transition to runtime provided specs for apply.
if action != nil {
actionDescriptions = append(actionDescriptions, desc)
if err := action(ctx); err != nil {
return nil, fmt.Errorf("updating persisted specification: %w", err)
}
}

return &pm.Response_Applied{ActionDescription: strings.Join(actionDescriptions, "\n")}, nil
}
65 changes: 10 additions & 55 deletions materialize-boilerplate/apply_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,62 +52,37 @@ func TestApply(t *testing.T) {
name: "new materialization",
originalSpec: nil,
newSpec: loadApplySpec(t, "base.flow.proto"),
want: testResults{
createdMetaTables: true,
putSpec: true,
createdResources: 1,
},
want: testResults{createdResources: 1},
},
{
name: "remove required field",
originalSpec: loadApplySpec(t, "base.flow.proto"),
newSpec: loadApplySpec(t, "remove-required.flow.proto"),
want: testResults{
createdMetaTables: true,
putSpec: true,
nullabledProjections: 1,
},
want: testResults{nullabledProjections: 1},
},
{
name: "add required field",
originalSpec: loadApplySpec(t, "base.flow.proto"),
newSpec: loadApplySpec(t, "add-new-required.flow.proto"),
want: testResults{
createdMetaTables: true,
putSpec: true,
addedProjections: 1,
},
want: testResults{addedProjections: 1},
},
{
name: "add binding",
originalSpec: loadApplySpec(t, "base.flow.proto"),
newSpec: loadApplySpec(t, "add-new-binding.flow.proto"),
want: testResults{
createdMetaTables: true,
putSpec: true,
createdResources: 1,
},
want: testResults{createdResources: 1},
},
{
name: "replace binding",
originalSpec: loadApplySpec(t, "base.flow.proto"),
newSpec: loadApplySpec(t, "replace-original-binding.flow.proto"),
want: testResults{
createdMetaTables: true,
putSpec: true,
deletedResources: 1,
createdResources: 1,
},
want: testResults{deletedResources: 1, createdResources: 1},
},
{
name: "field is newly nullable",
originalSpec: loadApplySpec(t, "base.flow.proto"),
newSpec: loadApplySpec(t, "make-nullable.flow.proto"),
want: testResults{
createdMetaTables: true,
putSpec: true,
nullabledProjections: 1,
},
want: testResults{nullabledProjections: 1},
},
}

Expand All @@ -120,7 +95,7 @@ func TestApply(t *testing.T) {
}
is := testInfoSchemaFromSpec(t, tt.originalSpec, simpleTestTransform)

req := &pm.Request_Apply{Materialization: tt.newSpec, Version: "aVersion"}
req := &pm.Request_Apply{Materialization: tt.newSpec, Version: "aVersion", LastMaterialization: tt.originalSpec}

// Not concurrent.
got, err := ApplyChanges(ctx, req, app, is, false)
Expand All @@ -147,8 +122,6 @@ func TestApply(t *testing.T) {
}

type testResults struct {
createdMetaTables bool
putSpec bool
createdResources int
deletedResources int
addedProjections int
Expand All @@ -164,17 +137,10 @@ type testApplier struct {
results testResults
}

func (a *testApplier) CreateMetaTables(ctx context.Context, spec *pf.MaterializationSpec) (string, ActionApplyFn, error) {
return "create meta tables", func(ctx context.Context) error {
a.results.createdMetaTables = true
return nil
}, nil
}

func (a *testApplier) CreateResource(ctx context.Context, spec *pf.MaterializationSpec, bindingIndex int) (string, ActionApplyFn, error) {
binding := spec.Bindings[bindingIndex]

return fmt.Sprintf("create resource for collection %q", binding.Collection.Name.String()), func(ctx context.Context) error {
return fmt.Sprintf("create resource for %q", binding.ResourcePath), func(ctx context.Context) error {
a.mu.Lock()
defer a.mu.Unlock()

Expand All @@ -183,17 +149,6 @@ func (a *testApplier) CreateResource(ctx context.Context, spec *pf.Materializati
}, nil
}

func (a *testApplier) LoadSpec(ctx context.Context, materialization pf.Materialization) (*pf.MaterializationSpec, error) {
return a.storedSpec, nil
}

func (a *testApplier) PutSpec(ctx context.Context, spec *pf.MaterializationSpec, version string, exists bool) (string, ActionApplyFn, error) {
return fmt.Sprintf("put spec with version %q", version), func(ctx context.Context) error {
a.results.putSpec = true
return nil
}, nil
}

func (a *testApplier) DeleteResource(ctx context.Context, path []string) (string, ActionApplyFn, error) {
return fmt.Sprintf("delete resource %q", path), func(ctx context.Context) error {
a.mu.Lock()
Expand All @@ -214,8 +169,8 @@ func (a *testApplier) UpdateResource(ctx context.Context, spec *pf.Materializati
}

action := fmt.Sprintf(
"update resource for collection %q [new projections: %d, newly nullable fields: %d, newly delta updates: %t]",
binding.Collection.Name.String(),
"update resource for %q [new projections: %d, newly nullable fields: %d, newly delta updates: %t]",
binding.ResourcePath,
len(bindingUpdate.NewProjections),
len(bindingUpdate.NewlyNullableFields),
bindingUpdate.NewlyDeltaUpdates,
Expand Down
Loading

0 comments on commit cbd025c

Please sign in to comment.