Revert "materialize-s3-iceberg: run table appends concurrently"
This reverts commit a79a235.

I'm seeing intermittent failures when making many concurrent requests to REST
catalogs, and the relevant error information doesn't seem to be making it
through the Go/Python interface. Most likely some kind of retry mechanism needs
to be implemented. I am reverting this for now until that can be investigated,
since the intermittent crashes it is causing are probably worse than the
latency boost it offered.
williamhbaker committed Dec 21, 2024
1 parent a79a235 commit fc1e495
Showing 3 changed files with 64 additions and 111 deletions.
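The retry mechanism the commit message alludes to would most naturally wrap the whole load/append/commit cycle on the Python side. Below is a minimal sketch, assuming the intermittent REST catalog failures surface as PyIceberg's CommitFailedException; the with_retries helper and its backoff parameters are hypothetical and not part of this commit:

import random
import time
from typing import Callable

from pyiceberg.exceptions import CommitFailedException


def with_retries(op: Callable[[], None], attempts: int = 5) -> None:
    # Hypothetical helper: re-run a full catalog operation on transient
    # commit failures, with exponential backoff plus jitter between tries.
    for attempt in range(attempts):
        try:
            op()
            return
        except CommitFailedException:
            if attempt == attempts - 1:
                raise  # out of retries; let the real error propagate to Go
            time.sleep(0.5 * 2**attempt + random.random() * 0.1)

Retrying the whole cycle rather than just commit_transaction() matters: each attempt re-reads the table's checkpoint property, so the fencing check described in append_files below still holds.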
30 changes: 11 additions & 19 deletions materialize-s3-iceberg/catalog.go
@@ -211,42 +211,34 @@ func (c *catalog) UpdateResource(_ context.Context, spec *pf.MaterializationSpec
 	}, nil
 }
 
-type tableAppend struct {
-	Table              string   `json:"table"`
-	PreviousCheckpoint string   `json:"prev_checkpoint"`
-	NextCheckpoint     string   `json:"next_checkpoint"`
-	FilePaths          []string `json:"file_paths"`
-}
-
 func (c *catalog) appendFiles(
 	ctx context.Context,
 	materialization string,
-	tableAppends []tableAppend,
+	tablePath []string,
+	filePaths []string,
+	prevCheckpoint string,
+	nextCheckpoint string,
 ) error {
-	input, err := json.Marshal(tableAppends)
-	if err != nil {
-		return nil
-	}
+	fqn := pathToFQN(tablePath)
 
 	b, err := runIcebergctl(
 		ctx,
 		c.cfg,
 		"append-files",
 		materialization,
-		string(input),
+		fqn,
+		prevCheckpoint,
+		nextCheckpoint,
+		strings.Join(filePaths, ","),
 	)
 	if err != nil {
 		return err
 	}
 
 	if len(b) > 0 {
-		output := make(map[string]string)
-		if err := json.Unmarshal(b, &output); err != nil {
-			return err
-		}
-
 		log.WithFields(log.Fields{
-			"output": output,
+			"table":  fqn,
+			"output": string(b),
 		}).Info("append files")
 	}
 
122 changes: 43 additions & 79 deletions materialize-s3-iceberg/iceberg-ctl/iceberg_ctl/__main__.py
@@ -298,88 +298,25 @@ def alter_table(
         update.update_column(path=c, required=False)
 
 
-class TableAppend(BaseModel):
-    table: str
-    prev_checkpoint: str
-    next_checkpoint: str
-    file_paths: list[str]
-
-
-async def append_to_table(
-    catalog: Catalog, materialization: str, table_append: TableAppend
-) -> str:
-    table = table_append.table
-    prev_checkpoint = table_append.prev_checkpoint
-    next_checkpoint = table_append.next_checkpoint
-
-    tbl = catalog.load_table(table)
-    checkpoints = TypeAdapter(dict[str, str]).validate_json(
-        tbl.properties.get("flow_checkpoints_v1", "{}")
-    )
-    cp = checkpoints.get(
-        materialization, ""
-    )  # prev_checkpoint will be unset if this is the first commit to the table
-
-    if cp == next_checkpoint:
-        return f"checkpoint is already '{next_checkpoint}'"
-    elif cp != "" and cp != prev_checkpoint:
-        # An absent checkpoint table property is allowed to accommodate cases
-        # where the user may have manually dropped the table and the
-        # materialization automatically re-created it, outside the normal
-        # backfill counter increment process.
-        raise Exception(
-            f"checkpoint from snapshot ({cp}) did not match either previous ({prev_checkpoint}) or next ({next_checkpoint}) checkpoint"
-        )
-
-    # Files are only added if the table checkpoint property has the prior checkpoint. The checkpoint
-    # property is updated to the current checkpoint in an atomic operation with appending the files.
-    # Note that this is not 100% correct exactly-once semantics, since there is a potential race
-    # between retrieving the table properties and appending the files, where a zombie process could
-    # append the same files concurrently. In principle Iceberg catalogs support the atomic
-    # operations necessary for true exactly-once semantics, but we'd need to work with the catalog
-    # at a lower level than PyIceberg currently makes available.
-    checkpoints[materialization] = next_checkpoint
-    txn = tbl.transaction()
-    txn.add_files(table_append.file_paths)
-    txn.set_properties({"flow_checkpoints_v1": json.dumps(checkpoints)})
-    txn.commit_transaction()
-
-    tbl = catalog.load_table(table)
-    return f"updated flow_checkpoints_v1 property to {next_checkpoint}"
-
-
-async def run_appends(
-    catalog: Catalog, materialization: str, table_appends: list[TableAppend]
-) -> dict[str, str]:
-    sem = asyncio.Semaphore(5)
-
-    async def run_append(table_append: TableAppend) -> tuple[str, str]:
-        async with sem:
-            return (
-                table_append.table,
-                await append_to_table(catalog, materialization, table_append),
-            )
-
-    return dict(
-        await asyncio.gather(
-            *(run_append(table_append) for table_append in table_appends)
-        )
-    )
-
-
 @run.command()
 @click.argument("materialization", type=str)
-@click.argument("table-appends", type=str)
+@click.argument("table", type=str)
+@click.argument("prev-checkpoint", type=str)
+@click.argument("next-checkpoint", type=str)
+@click.argument("file-paths", type=str)
 @click.pass_context
 def append_files(
     ctx: Context,
     materialization: str,
-    table_appends: str,
+    table: str,
+    prev_checkpoint: str,
+    next_checkpoint: str,
+    file_paths: str,
 ):
     '''
-    Appends files per the provided list.
+    Appends files at "file-paths" to the table.
 
-    The "prev_checkpoint" and "next_checkpoint" properties are used to provide a best-effort
+    The "prev-checkpoint" and "next-checkpoint" arguments are used to provide a best-effort
     avoidance of duplicating data from appending the same files that have previously been appended.
     A possible scenario is this: Files are successfully appended to Table1 and Table2 but not Table3
     in response to the connector receiving a StartCommit message, but the connector is restarted
@@ -389,10 +389,10 @@ def append_files(
     need to have the files appended.
 
     When a table is updated to append files, its "checkpoint" property is updated to
-    "next_checkpoint", and only tables with "checkpoint" equal to "prev_checkpoint" are appended to.
+    "next-checkpoint", and only tables with "checkpoint" equal to "prev-checkpoint" are appended to.
 
     The previously described scenario would then play out like this:
-    1) The materialization connector persists values for "prev_checkpoint" and "next_checkpoint" of
+    1) The materialization connector persists values for "prev-checkpoint" and "next-checkpoint" of
     "0001" and "0002", respectively, in its driver checkpoint via StartedCommit.
 
     2) During the partial completion of the transaction, Table1 and Table2 are updated to have a
@@ -410,12 +410,39 @@ def append_files(
     catalog = ctx.obj["catalog"]
     assert isinstance(catalog, Catalog)
 
-    res = asyncio.run(
-        run_appends(catalog, materialization, TypeAdapter(list[TableAppend]).validate_json(table_appends))
-    )
-    print(json.dumps(res))
+    tbl = catalog.load_table(table)
+    checkpoints = TypeAdapter(dict[str, str]).validate_json(tbl.properties.get("flow_checkpoints_v1", "{}"))
+    cp = checkpoints.get(materialization, "")  # prev_checkpoint will be unset if this is the first commit to the table
+
+    if cp == next_checkpoint:
+        print(f"checkpoint is already '{next_checkpoint}'")
+        return  # already appended these files
+    elif cp != "" and cp != prev_checkpoint:
+        # An absent checkpoint table property is allowed to accommodate cases
+        # where the user may have manually dropped the table and the
+        # materialization automatically re-created it, outside the normal
+        # backfill counter increment process.
+        raise Exception(
+            f"checkpoint from snapshot ({cp}) did not match either previous ({prev_checkpoint}) or next ({next_checkpoint}) checkpoint"
+        )
+
+    # Files are only added if the table checkpoint property has the prior checkpoint. The checkpoint
+    # property is updated to the current checkpoint in an atomic operation with appending the files.
+    # Note that this is not 100% correct exactly-once semantics, since there is a potential race
+    # between retrieving the table properties and appending the files, where a zombie process could
+    # append the same files concurrently. In principle Iceberg catalogs support the atomic
+    # operations necessary for true exactly-once semantics, but we'd need to work with the catalog
+    # at a lower level than PyIceberg currently makes available.
+    checkpoints[materialization] = next_checkpoint
+    txn = tbl.transaction()
+    txn.add_files(file_paths.split(","))
+    txn.set_properties({"flow_checkpoints_v1": json.dumps(checkpoints)})
+    txn.commit_transaction()
+
+    # TODO(whb): This additional logging should not really be necessary, but is
+    # included for now to assist in troubleshooting potential errors.
+    tbl = catalog.load_table(table)
+    print(f"{table} updated with flow_checkpoints_v1 property of {tbl.properties.get('flow_checkpoints_v1')}")
 
-
-
 if __name__ == "__main__":
     run(auto_envvar_prefix="ICEBERG")
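To make the docstring's recovery scenario concrete, here is an illustrative walk-through of the fencing logic, with plain dicts standing in for table properties; the per-materialization keying of flow_checkpoints_v1 is omitted for brevity, and the table names and checkpoint values are invented:

prev_checkpoint, next_checkpoint = "0001", "0002"

# State after the partially completed transaction: Table1 and Table2
# advanced to "0002", but the append to Table3 never happened.
tables = {
    "Table1": {"flow_checkpoints_v1": "0002"},
    "Table2": {"flow_checkpoints_v1": "0002"},
    "Table3": {"flow_checkpoints_v1": "0001"},
}

# On restart, the connector replays append-files for every table.
for name, props in tables.items():
    cp = props["flow_checkpoints_v1"]
    if cp == next_checkpoint:
        print(f"{name}: checkpoint is already '{next_checkpoint}'")  # no-op
    elif cp == "" or cp == prev_checkpoint:
        # Append the files and advance the checkpoint in one atomic commit.
        props["flow_checkpoints_v1"] = next_checkpoint
        print(f"{name}: files appended, checkpoint now '{next_checkpoint}'")
    else:
        raise Exception(f"{name}: checkpoint {cp} matches neither previous nor next")

Only Table3 receives the files on replay; Table1 and Table2 are skipped as already committed.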
23 changes: 10 additions & 13 deletions materialize-s3-iceberg/transactor.go
@@ -247,31 +247,28 @@ func (t *transactor) Store(it *m.StoreIterator) (m.StartCommitFunc, error) {
 }
 
 func (t *transactor) Acknowledge(ctx context.Context) (*pf.ConnectorState, error) {
-	var appends []tableAppend
-
 	for _, b := range t.bindings {
 		bindingState := t.state.BindingStates[b.stateKey]
 
 		if len(bindingState.FileKeys) == 0 {
 			continue // no data for this binding
 		}
 
-		appends = append(appends, tableAppend{
-			Table:              pathToFQN(b.path),
-			PreviousCheckpoint: bindingState.PreviousCheckpoint,
-			NextCheckpoint:     bindingState.CurrentCheckpoint,
-			FilePaths:          bindingState.FileKeys,
+		ll := log.WithFields(log.Fields{
+			"table":              pathToFQN(b.path),
+			"previousCheckpoint": bindingState.PreviousCheckpoint,
+			"currentCheckpoint":  bindingState.CurrentCheckpoint,
 		})
 
-		bindingState.FileKeys = nil // reset for next txn
-	}
-
-	if len(appends) > 0 {
+		ll.Info("starting appendFiles for table")
 		appendCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
 		defer cancel()
-		if err := t.catalog.appendFiles(appendCtx, t.materialization, appends); err != nil {
-			return nil, fmt.Errorf("appendFiles: %w", err)
+		if err := t.catalog.appendFiles(appendCtx, t.materialization, b.path, bindingState.FileKeys, bindingState.PreviousCheckpoint, bindingState.CurrentCheckpoint); err != nil {
+			return nil, fmt.Errorf("appendFiles for %s: %w", b.path, err)
 		}
+		ll.Info("finished appendFiles for table")
+
+		bindingState.FileKeys = nil // reset for next txn
 	}
 
 	checkpointJSON, err := json.Marshal(t.state)
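If the concurrent path from the reverted commit is ever reinstated, the deleted run_appends pattern would compose naturally with per-table retries. A speculative sketch, reusing the semaphore bound of 5 from the deleted code; do_append stands in for the per-table load/append/commit and is hypothetical:

import asyncio
import random

from pyiceberg.exceptions import CommitFailedException


async def run_appends_with_retries(table_appends, do_append, attempts=5):
    # Bound concurrent catalog commits, as the deleted run_appends did.
    sem = asyncio.Semaphore(5)

    async def one(table_append):
        async with sem:
            for attempt in range(attempts):
                try:
                    return await do_append(table_append)
                except CommitFailedException:
                    if attempt == attempts - 1:
                        raise  # give up; surface the error to the Go side
                    # Back off before retrying this table's commit.
                    await asyncio.sleep(0.5 * 2**attempt + random.random() * 0.1)

    return await asyncio.gather(*(one(ta) for ta in table_appends))

Whether retries belong here or lower down in the catalog client is exactly the open question the commit message defers.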
