materialize-s3-iceberg: run table appends concurrently
For materializations with a large number of bindings, running each "table
append" action sequentially can end up taking quite a while.

These append actions can be run concurrently, which requires some restructuring
but should make commits complete quite a bit faster and significantly reduce latency.
williamhbaker committed Dec 20, 2024
1 parent 4de5a35 commit a79a235
Showing 3 changed files with 111 additions and 64 deletions.
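
At a high level, the restructuring replaces the per-table CLI arguments to iceberg-ctl with a single JSON list describing every table append. A minimal Python sketch of that payload shape, with field names taken from the tableAppend struct and TableAppend model in the diff below and purely illustrative values:

import json

# Illustrative only: the JSON list the Go connector now passes to the
# "append-files" command. Field names mirror the diff below; the table name,
# checkpoints, and file paths here are made up.
table_appends = [
    {
        "table": "some_namespace.some_table",  # hypothetical fully-qualified table name
        "prev_checkpoint": "0001",
        "next_checkpoint": "0002",
        "file_paths": ["s3://example-bucket/files/part-0.parquet"],  # hypothetical paths
    },
]

print(json.dumps(table_appends))
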
30 changes: 19 additions & 11 deletions materialize-s3-iceberg/catalog.go
@@ -211,34 +211,42 @@ func (c *catalog) UpdateResource(_ context.Context, spec *pf.MaterializationSpec
}, nil
}

type tableAppend struct {
Table string `json:"table"`
PreviousCheckpoint string `json:"prev_checkpoint"`
NextCheckpoint string `json:"next_checkpoint"`
FilePaths []string `json:"file_paths"`
}

func (c *catalog) appendFiles(
ctx context.Context,
materialization string,
tablePath []string,
filePaths []string,
prevCheckpoint string,
nextCheckpoint string,
tableAppends []tableAppend,
) error {
fqn := pathToFQN(tablePath)
input, err := json.Marshal(tableAppends)
if err != nil {
return err
}

b, err := runIcebergctl(
ctx,
c.cfg,
"append-files",
materialization,
fqn,
prevCheckpoint,
nextCheckpoint,
strings.Join(filePaths, ","),
string(input),
)
if err != nil {
return err
}

if len(b) > 0 {
output := make(map[string]string)
if err := json.Unmarshal(b, &output); err != nil {
return err
}

log.WithFields(log.Fields{
"table": fqn,
"output": string(b),
"output": output,
}).Info("append files")
}

122 changes: 79 additions & 43 deletions materialize-s3-iceberg/iceberg-ctl/iceberg_ctl/__main__.py
@@ -298,25 +298,88 @@ def alter_table(
update.update_column(path=c, required=False)


class TableAppend(BaseModel):
table: str
prev_checkpoint: str
next_checkpoint: str
file_paths: list[str]


async def append_to_table(
catalog: Catalog, materialization: str, table_append: TableAppend
) -> str:
table = table_append.table
prev_checkpoint = table_append.prev_checkpoint
next_checkpoint = table_append.next_checkpoint

tbl = catalog.load_table(table)
checkpoints = TypeAdapter(dict[str, str]).validate_json(
tbl.properties.get("flow_checkpoints_v1", "{}")
)
cp = checkpoints.get(
materialization, ""
) # prev_checkpoint will be unset if this is the first commit to the table

if cp == next_checkpoint:
return f"checkpoint is already '{next_checkpoint}'"
elif cp != "" and cp != prev_checkpoint:
# An absent checkpoint table property is allowed to accommodate cases
# where the user may have manually dropped the table and the
# materialization automatically re-created it, outside the normal
# backfill counter increment process.
raise Exception(
f"checkpoint from snapshot ({cp}) did not match either previous ({prev_checkpoint}) or next ({next_checkpoint}) checkpoint"
)

# Files are only added if the table checkpoint property has the prior checkpoint. The checkpoint
# property is updated to the current checkpoint in an atomic operation with appending the files.
# Note that this is not 100% correct exactly-once semantics, since there is a potential race
# between retrieving the table properties and appending the files, where a zombie process could
# append the same files concurrently. In principle Iceberg catalogs support the atomic
# operations necessary for true exactly-once semantics, but we'd need to work with the catalog
# at a lower level than PyIceberg currently makes available.
checkpoints[materialization] = next_checkpoint
txn = tbl.transaction()
txn.add_files(table_append.file_paths)
txn.set_properties({"flow_checkpoints_v1": json.dumps(checkpoints)})
txn.commit_transaction()

tbl = catalog.load_table(table)
return f"updated flow_checkpoints_v1 property to {next_checkpoint}"


async def run_appends(
catalog: Catalog, materialization: str, table_appends: list[TableAppend]
) -> dict[str, str]:
sem = asyncio.Semaphore(5)

async def run_append(table_append: TableAppend) -> tuple[str, str]:
async with sem:
return (
table_append.table,
await append_to_table(catalog, materialization, table_append),
)

return dict(
await asyncio.gather(
*(run_append(table_append) for table_append in table_appends)
)
)


@run.command()
@click.argument("materialization", type=str)
@click.argument("table", type=str)
@click.argument("prev-checkpoint", type=str)
@click.argument("next-checkpoint", type=str)
@click.argument("file-paths", type=str)
@click.argument("table-appends", type=str)
@click.pass_context
def append_files(
ctx: Context,
materialization: str,
table: str,
prev_checkpoint: str,
next_checkpoint: str,
file_paths: str,
table_appends: str,
):
'''
Appends files at "file-paths" to the table.
Appends files per the provided list.
The "prev-checkpoint" and "next-checkpoint" arguments are used to provide a best-effort
The "prev_checkpoint" and "next_checkpoint" properties are used to provide a best-effort
avoidance of duplicating data from appending the same files that have previously been appended.
A possible scenario is this: Files are successfully appended to Table1 and Table2 but not Table3
in response to the connector receiving a StartCommit message, but the connector is restarted
@@ -326,10 +389,10 @@ def append_files(
need to have the files appended.
When a table is updated to append files, its "checkpoint" property is updated to
"next-checkpoint", and only tables with "checkpoint" equal to "prev-checkpoint" are appended to.
"next_checkpoint", and only tables with "checkpoint" equal to "prev_checkpoint" are appended to.
The previously described scenario would then play out like this:
1) The materialization connector persists values for "prev-checkpoint" and "next-checkpoint" of
1) The materialization connector persists values for "prev_checkpoint" and "next_checkpoint" of
"0001" and "0002", respectively, in its driver checkpoint via StartedCommit.
2) During the partial completion of the transaction, Table1 and Table2 are updated to have a
@@ -347,39 +410,12 @@ catalog = ctx.obj["catalog"]
catalog = ctx.obj["catalog"]
assert isinstance(catalog, Catalog)

tbl = catalog.load_table(table)
checkpoints = TypeAdapter(dict[str, str]).validate_json(tbl.properties.get("flow_checkpoints_v1", "{}"))
cp = checkpoints.get(materialization, "") # prev_checkpoint will be unset if this is the first commit to the table

if cp == next_checkpoint:
print(f"checkpoint is already '{next_checkpoint}'")
return # already appended these files
elif cp != "" and cp != prev_checkpoint:
# An absent checkpoint table property is allowed to accommodate cases
# where the user may have manually dropped the table and the
# materialization automatically re-created it, outside the normal
# backfill counter increment process.
raise Exception(
f"checkpoint from snapshot ({cp}) did not match either previous ({prev_checkpoint}) or next ({next_checkpoint}) checkpoint"
)

# Files are only added if the table checkpoint property has the prior checkpoint. The checkpoint
# property is updated to the current checkpoint in an atomic operation with appending the files.
# Note that this is not 100% correct exactly-once semantics, since there is a potential race
# between retrieving the table properties and appending the files, where a zombie process could
# append the same files concurrently. In principle Iceberg catalogs support the atomic
# operations necessary for true exactly-once semantics, but we'd need to work with the catalog
# at a lower level than PyIceberg currently makes available.
checkpoints[materialization] = next_checkpoint
txn = tbl.transaction()
txn.add_files(file_paths.split(","))
txn.set_properties({"flow_checkpoints_v1": json.dumps(checkpoints)})
txn.commit_transaction()
res = asyncio.run(
run_appends(catalog, materialization, TypeAdapter(list[TableAppend]).validate_json(table_appends))
)
print(json.dumps(res))

# TODO(whb): This additional logging should not really be necessary, but is
# included for now to assist in troubleshooting potential errors.
tbl = catalog.load_table(table)
print(f"{table} updated with flow_checkpoints_v1 property of {tbl.properties.get("flow_checkpoints_v1")}")

if __name__ == "__main__":
run(auto_envvar_prefix="ICEBERG")
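
As a companion to the append_files docstring above, here is a minimal sketch of the per-table checkpoint guard that append_to_table applies, assuming the checkpoints dict has already been parsed from the table's flow_checkpoints_v1 property (the helper name should_append is illustrative and not part of the diff):

# Minimal sketch, assuming `checkpoints` was already parsed from the table's
# "flow_checkpoints_v1" property. The helper name is illustrative only.
def should_append(
    checkpoints: dict[str, str],
    materialization: str,
    prev_checkpoint: str,
    next_checkpoint: str,
) -> bool:
    cp = checkpoints.get(materialization, "")  # unset on first commit or a re-created table
    if cp == next_checkpoint:
        return False  # these files were already appended; skip
    if cp not in ("", prev_checkpoint):
        raise Exception(
            f"checkpoint from snapshot ({cp}) did not match either previous "
            f"({prev_checkpoint}) or next ({next_checkpoint}) checkpoint"
        )
    # Append the files and set the property to next_checkpoint in one transaction.
    return True
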
23 changes: 13 additions & 10 deletions materialize-s3-iceberg/transactor.go
@@ -247,28 +247,31 @@ func (t *transactor) Store(it *m.StoreIterator) (m.StartCommitFunc, error) {
}

func (t *transactor) Acknowledge(ctx context.Context) (*pf.ConnectorState, error) {
var appends []tableAppend

for _, b := range t.bindings {
bindingState := t.state.BindingStates[b.stateKey]

if len(bindingState.FileKeys) == 0 {
continue // no data for this binding
}

ll := log.WithFields(log.Fields{
"table": pathToFQN(b.path),
"previousCheckoint": bindingState.PreviousCheckpoint,
"currentCheckpoint": bindingState.CurrentCheckpoint,
appends = append(appends, tableAppend{
Table: pathToFQN(b.path),
PreviousCheckpoint: bindingState.PreviousCheckpoint,
NextCheckpoint: bindingState.CurrentCheckpoint,
FilePaths: bindingState.FileKeys,
})

ll.Info("starting appendFiles for table")
bindingState.FileKeys = nil // reset for next txn
}

if len(appends) > 0 {
appendCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
defer cancel()
if err := t.catalog.appendFiles(appendCtx, t.materialization, b.path, bindingState.FileKeys, bindingState.PreviousCheckpoint, bindingState.CurrentCheckpoint); err != nil {
return nil, fmt.Errorf("appendFiles for %s: %w", b.path, err)
if err := t.catalog.appendFiles(appendCtx, t.materialization, appends); err != nil {
return nil, fmt.Errorf("appendFiles: %w", err)
}
ll.Info("finished appendFiles for table")

bindingState.FileKeys = nil // reset for next txn
}

checkpointJSON, err := json.Marshal(t.state)
