feat(incremental): copy multiple tables in parallel (#1237) #1413
base: main
Conversation
Not a maintainer, but this is a much needed feature, thanks @AxelThevenot 🙌🏻
Copying large datasets with a lot of partitions currently performs quite poorly.
I looked into doing this with a single copy_table call, but it doesn't look like that's currently possible (unless we delete the partitions upfront and use write_append).
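For background on why a per-partition loop is needed at all: BigQuery addresses a single partition by appending a "$" decorator to the table ID (e.g. `proj.ds.events$20240101`). A minimal sketch of that ref construction, mirroring the string formatting used in the diff below (the helper name here is hypothetical, not part of the PR):

```python
def partition_ref(table_ref, partition_id=None):
    """Return a partition-decorated table reference, or the bare table ref.

    BigQuery's "$" decorator targets one partition of a partitioned table,
    e.g. "proj.ds.events$20240101" for the 2024-01-01 day partition.
    """
    return f"{table_ref}${partition_id}" if partition_id else table_ref

print(partition_ref("proj.ds.events", "20240101"))  # proj.ds.events$20240101
print(partition_ref("proj.ds.events"))              # proj.ds.events
```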
# Runs all the copy jobs in parallel
for source_ref in source_ref_array:
    for partition_id in partition_ids or [None]:
        source_ref_partition = (
            f"{source_ref}${partition_id}" if partition_id else source_ref
        )
        destination_ref_partition = (
            f"{destination_ref}${partition_id}" if partition_id else destination_ref
        )
        copy_job = client.copy_table(
            source_ref_partition,
            destination_ref_partition,
            job_config=CopyJobConfig(write_disposition=write_disposition),
            retry=self._retry.create_reopen_with_deadline(conn),
        )
        copy_jobs.append(copy_job)
Would being explicit here clarify the logic?
if partition_ids:
    copy_jobs = [
        client.copy_table(
            f"{source_ref}${partition_id}",
            f"{destination_ref}${partition_id}",
            job_config=CopyJobConfig(write_disposition=write_disposition),
            retry=self._retry.create_reopen_with_deadline(conn),
        )
        for partition_id in partition_ids
        for source_ref in source_ref_array
    ]
else:
    copy_jobs = [
        client.copy_table(
            source_ref_array,
            destination_ref,
            job_config=CopyJobConfig(write_disposition=write_disposition),
            retry=self._retry.create_reopen_with_deadline(conn),
        )
    ]
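Either version relies on `client.copy_table` being non-blocking: it submits a CopyJob and returns a handle immediately, so the jobs run concurrently on BigQuery's side and the caller only blocks when collecting results. A minimal sketch of that submit-then-wait pattern, using a thread pool as a stand-in for server-side jobs (all names here are illustrative, not the PR's code):

```python
from concurrent.futures import Future, ThreadPoolExecutor


def fake_copy_table(source: str, destination: str, pool: ThreadPoolExecutor) -> Future:
    # Stand-in for client.copy_table(): submit the work and return a job
    # handle immediately; the "copy" itself runs in the background.
    return pool.submit(lambda: f"copied {source} -> {destination}")


partition_ids = ["20240101", "20240102", "20240103"]
with ThreadPoolExecutor() as pool:
    # Submit all jobs first; none of these calls block.
    jobs = [
        fake_copy_table(f"ds.src${p}", f"ds.dst${p}", pool)
        for p in partition_ids
    ]
    # Analogue of copy_job.result(): wait for every job to finish.
    results = [job.result() for job in jobs]

print(results)
```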
copy_job = client.copy_table(
    source_ref_partition,
    destination_ref_partition,
    job_config=CopyJobConfig(write_disposition=write_disposition),
Will there be more than one element in source_ref_partition?
If we're ever in a scenario where source_ref_array has more than one element and write_disposition is set to WRITE_TRUNCATE, we'll be overwriting the same data.
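To make the hazard concrete, here is a toy simulation (an assumption-labeled model, not BigQuery itself) of copying several sources into one destination one job at a time with WRITE_TRUNCATE:

```python
def apply_copy(dest_rows, src_rows, write_disposition):
    """Toy model of a copy job's effect on the destination table."""
    if write_disposition == "WRITE_TRUNCATE":
        return list(src_rows)  # truncate: destination is fully replaced
    return list(dest_rows) + list(src_rows)  # WRITE_APPEND: rows accumulate


sources = [["a1", "a2"], ["b1", "b2"]]  # two source tables/partitions

dest = []
for src in sources:  # one truncating copy job per source
    dest = apply_copy(dest, src, "WRITE_TRUNCATE")
print(dest)  # only the last source survives: ['b1', 'b2']
```

Passing the whole list of sources to a single copy job avoids this, since the destination is then written once from all sources together rather than truncated repeatedly.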
resolves dbt-labs/dbt-adapters#559
N/A dbt-labs/docs.getdbt.com/#
Problem
Copy partitions and tables in parallel instead of sequentially, which is slow when managing large numbers of partitions.
Solution
Run the copy jobs in parallel and wait for the results.
Checklist