diff --git a/materialize-s3-iceberg/catalog.go b/materialize-s3-iceberg/catalog.go index a3693175ef..dc0aa76b14 100644 --- a/materialize-s3-iceberg/catalog.go +++ b/materialize-s3-iceberg/catalog.go @@ -211,34 +211,42 @@ func (c *catalog) UpdateResource(_ context.Context, spec *pf.MaterializationSpec }, nil } +type tableAppend struct { + Table string `json:"table"` + PreviousCheckpoint string `json:"prev_checkpoint"` + NextCheckpoint string `json:"next_checkpoint"` + FilePaths []string `json:"file_paths"` +} + func (c *catalog) appendFiles( ctx context.Context, materialization string, - tablePath []string, - filePaths []string, - prevCheckpoint string, - nextCheckpoint string, + tableAppends []tableAppend, ) error { - fqn := pathToFQN(tablePath) + input, err := json.Marshal(tableAppends) + if err != nil { + return nil + } b, err := runIcebergctl( ctx, c.cfg, "append-files", materialization, - fqn, - prevCheckpoint, - nextCheckpoint, - strings.Join(filePaths, ","), + string(input), ) if err != nil { return err } if len(b) > 0 { + output := make(map[string]string) + if err := json.Unmarshal(b, &output); err != nil { + return err + } + log.WithFields(log.Fields{ - "table": fqn, - "output": string(b), + "output": output, }).Info("append files") } diff --git a/materialize-s3-iceberg/iceberg-ctl/iceberg_ctl/__main__.py b/materialize-s3-iceberg/iceberg-ctl/iceberg_ctl/__main__.py index 98747cfd98..9d04050788 100644 --- a/materialize-s3-iceberg/iceberg-ctl/iceberg_ctl/__main__.py +++ b/materialize-s3-iceberg/iceberg-ctl/iceberg_ctl/__main__.py @@ -298,25 +298,88 @@ def alter_table( update.update_column(path=c, required=False) +class TableAppend(BaseModel): + table: str + prev_checkpoint: str + next_checkpoint: str + file_paths: list[str] + + +async def append_to_table( + catalog: Catalog, materialization: str, table_append: TableAppend +) -> str: + table = table_append.table + prev_checkpoint = table_append.prev_checkpoint + next_checkpoint = table_append.next_checkpoint + + tbl = catalog.load_table(table) + checkpoints = TypeAdapter(dict[str, str]).validate_json( + tbl.properties.get("flow_checkpoints_v1", "{}") + ) + cp = checkpoints.get( + materialization, "" + ) # prev_checkpoint will be unset if this is the first commit to the table + + if cp == next_checkpoint: + return f"checkpoint is already '{next_checkpoint}'" + elif cp != "" and cp != prev_checkpoint: + # An absent checkpoint table property is allowed to accommodate cases + # where the user may have manually dropped the table and the + # materialization automatically re-created it, outside the normal + # backfill counter increment process. + raise Exception( + f"checkpoint from snapshot ({cp}) did not match either previous ({prev_checkpoint}) or next ({next_checkpoint}) checkpoint" + ) + + # Files are only added if the table checkpoint property has the prior checkpoint. The checkpoint + # property is updated to the current checkpoint in an atomic operation with appending the files. + # Note that this is not 100% correct exactly-once semantics, since there is a potential race + # between retrieving the table properties and appending the files, where a zombie process could + # append the same files concurrently. In principal Iceberg catalogs support the atomic + # operations necessary for true exactly-once semantics, but we'd need to work with the catalog + # at a lower level than PyIceberg currently makes available. + checkpoints[materialization] = next_checkpoint + txn = tbl.transaction() + txn.add_files(table_append.file_paths) + txn.set_properties({"flow_checkpoints_v1": json.dumps(checkpoints)}) + txn.commit_transaction() + + tbl = catalog.load_table(table) + return f"updated flow_checkpoints_v1 property to {next_checkpoint}" + + +async def run_appends( + catalog: Catalog, materialization: str, table_appends: list[TableAppend] +) -> dict[str, str]: + sem = asyncio.Semaphore(5) + + async def run_append(table_append: TableAppend) -> tuple[str, str]: + async with sem: + return ( + table_append.table, + await append_to_table(catalog, materialization, table_append), + ) + + return dict( + await asyncio.gather( + *(run_append(table_append) for table_append in table_appends) + ) + ) + + @run.command() @click.argument("materialization", type=str) -@click.argument("table", type=str) -@click.argument("prev-checkpoint", type=str) -@click.argument("next-checkpoint", type=str) -@click.argument("file-paths", type=str) +@click.argument("table-appends", type=str) @click.pass_context def append_files( ctx: Context, materialization: str, - table: str, - prev_checkpoint: str, - next_checkpoint: str, - file_paths: str, + table_appends: str, ): ''' - Appends files at "file-paths" to the table. + Appends files per the provided list. - The "prev-checkpoint" and "next-checkpoint" arguments are used to provide a best-effort + The "prev_checkpoint" and "next_checkpoint" properties are used to provide a best-effort avoidance of duplicating data from appending the same files that have previously been appended. A possible scenario is this: Files are successfully appended to Table1 and Table2 but not Table3 in response to the connector receiving a StartCommit message, but the connector is restarted @@ -326,10 +389,10 @@ def append_files( need to have the files appended. When a table is updated to append files, its "checkpoint" property is updated to - "next-checkpoint", and only tables with "checkpoint" equal to "prev-checkpoint" are appended to. + "next_checkpoint", and only tables with "checkpoint" equal to "prev_checkpoint" are appended to. The previously described scenario would then play out like this: - 1) The materialization connector persists values for "prev-checkpoint" and "next-checkpoint" of + 1) The materialization connector persists values for "prev_checkpoint" and "next_checkpoint" of "0001" and "0002", respectively, in its driver checkpoint via StartedCommit. 2) During the partial completion of the transaction, Table1 and Table2 are updated to have a @@ -347,39 +410,12 @@ def append_files( catalog = ctx.obj["catalog"] assert isinstance(catalog, Catalog) - tbl = catalog.load_table(table) - checkpoints = TypeAdapter(dict[str, str]).validate_json(tbl.properties.get("flow_checkpoints_v1", "{}")) - cp = checkpoints.get(materialization, "") # prev_checkpoint will be unset if this is the first commit to the table - if cp == next_checkpoint: - print(f"checkpoint is already '{next_checkpoint}'") - return # already appended these files - elif cp != "" and cp != prev_checkpoint: - # An absent checkpoint table property is allowed to accommodate cases - # where the user may have manually dropped the table and the - # materialization automatically re-created it, outside the normal - # backfill counter increment process. - raise Exception( - f"checkpoint from snapshot ({cp}) did not match either previous ({prev_checkpoint}) or next ({next_checkpoint}) checkpoint" - ) - - # Files are only added if the table checkpoint property has the prior checkpoint. The checkpoint - # property is updated to the current checkpoint in an atomic operation with appending the files. - # Note that this is not 100% correct exactly-once semantics, since there is a potential race - # between retrieving the table properties and appending the files, where a zombie process could - # append the same files concurrently. In principal Iceberg catalogs support the atomic - # operations necessary for true exactly-once semantics, but we'd need to work with the catalog - # at a lower level than PyIceberg currently makes available. - checkpoints[materialization] = next_checkpoint - txn = tbl.transaction() - txn.add_files(file_paths.split(",")) - txn.set_properties({"flow_checkpoints_v1": json.dumps(checkpoints)}) - txn.commit_transaction() + res = asyncio.run( + run_appends(catalog, materialization, TypeAdapter(list[TableAppend]).validate_json(table_appends)) + ) + print(json.dumps(res)) - # TODO(whb): This additional logging should not really be necessary, but is - # included for now to assist in troubleshooting potential errors. - tbl = catalog.load_table(table) - print(f"{table} updated with flow_checkpoints_v1 property of {tbl.properties.get("flow_checkpoints_v1")}") if __name__ == "__main__": run(auto_envvar_prefix="ICEBERG") diff --git a/materialize-s3-iceberg/transactor.go b/materialize-s3-iceberg/transactor.go index 1098ed933b..027e433a00 100644 --- a/materialize-s3-iceberg/transactor.go +++ b/materialize-s3-iceberg/transactor.go @@ -247,6 +247,8 @@ func (t *transactor) Store(it *m.StoreIterator) (m.StartCommitFunc, error) { } func (t *transactor) Acknowledge(ctx context.Context) (*pf.ConnectorState, error) { + var appends []tableAppend + for _, b := range t.bindings { bindingState := t.state.BindingStates[b.stateKey] @@ -254,21 +256,22 @@ func (t *transactor) Acknowledge(ctx context.Context) (*pf.ConnectorState, error continue // no data for this binding } - ll := log.WithFields(log.Fields{ - "table": pathToFQN(b.path), - "previousCheckoint": bindingState.PreviousCheckpoint, - "currentCheckpoint": bindingState.CurrentCheckpoint, + appends = append(appends, tableAppend{ + Table: pathToFQN(b.path), + PreviousCheckpoint: bindingState.PreviousCheckpoint, + NextCheckpoint: bindingState.CurrentCheckpoint, + FilePaths: bindingState.FileKeys, }) - ll.Info("starting appendFiles for table") + bindingState.FileKeys = nil // reset for next txn + } + + if len(appends) > 0 { appendCtx, cancel := context.WithTimeout(ctx, 5*time.Minute) defer cancel() - if err := t.catalog.appendFiles(appendCtx, t.materialization, b.path, bindingState.FileKeys, bindingState.PreviousCheckpoint, bindingState.CurrentCheckpoint); err != nil { - return nil, fmt.Errorf("appendFiles for %s: %w", b.path, err) + if err := t.catalog.appendFiles(appendCtx, t.materialization, appends); err != nil { + return nil, fmt.Errorf("appendFiles: %w", err) } - ll.Info("finished appendFiles for table") - - bindingState.FileKeys = nil // reset for next txn } checkpointJSON, err := json.Marshal(t.state)