feat: support merge append action #902
base: main
@@ -1182,6 +1182,12 @@ impl ManifestEntry { | |||
pub fn data_file(&self) -> &DataFile { | |||
&self.data_file | |||
} | |||
|
|||
/// get file sequence number |
/// get file sequence number | |
/// File sequence number indicating when the file was added. Inherited when null and status is 1 (added). |
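To illustrate the inheritance rule the suggested doc comment describes, here is a minimal sketch. `Status`, `Entry`, and `effective_file_sequence_number` are hypothetical stand-ins, not the crate's actual types or API, and returning `None` for a null value on a non-added entry is an assumption made for this example only.

```rust
/// Hypothetical, simplified entry status (illustrative only, not the crate's type).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Status {
    Existing = 0,
    Added = 1,
    Deleted = 2,
}

/// Hypothetical, simplified manifest entry (illustrative only).
struct Entry {
    status: Status,
    file_sequence_number: Option<i64>,
}

/// Resolve the file sequence number for an entry read from a manifest
/// whose own sequence number is `manifest_sequence_number`.
fn effective_file_sequence_number(entry: &Entry, manifest_sequence_number: i64) -> Option<i64> {
    match (entry.file_sequence_number, entry.status) {
        // An explicit value always wins.
        (Some(n), _) => Some(n),
        // Null + status 1 (added): inherit from the manifest.
        (None, Status::Added) => Some(manifest_sequence_number),
        // Assumption: nothing to inherit for other statuses.
        (None, _) => None,
    }
}
```

With these definitions, an added entry with a null value read from a manifest at sequence number 7 resolves to `Some(7)`, while an explicit value is returned unchanged.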
Overall, just some small nits. Implementing the default values for the new fields can be done after this PR (#737).
// Enable merge append for table | ||
let tx = Transaction::new(&table); | ||
table = tx | ||
.set_properties(HashMap::from([ |
Should we also try adding a test here for the MANIFEST_TARGET_SIZE_BYTES property?
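As a rough idea of what such a test could assert, here is a standalone sketch. `pack_by_target_size` is a hypothetical greedy packer written for this comment; it is not the PR's bin-packing logic and it ignores `min_count_to_merge`. The point is only that a smaller target size should yield more, smaller bins.

```rust
/// Hypothetical helper: greedily group manifest lengths (in bytes) into bins
/// whose total stays within `target_size_bytes`. A single manifest larger
/// than the target still gets a bin of its own.
fn pack_by_target_size(lengths: &[u64], target_size_bytes: u64) -> Vec<Vec<u64>> {
    let mut bins: Vec<Vec<u64>> = Vec::new();
    let mut current: Vec<u64> = Vec::new();
    let mut current_size = 0u64;

    for &len in lengths {
        // Close the current bin when adding this manifest would exceed the target.
        if !current.is_empty() && current_size + len > target_size_bytes {
            bins.push(std::mem::take(&mut current));
            current_size = 0;
        }
        current.push(len);
        current_size += len;
    }

    // Flush the last, partially filled bin.
    if !current.is_empty() {
        bins.push(current);
    }
    bins
}
```

A test against the real table would presumably set the property to a small value, append several times, and then check the number of manifests in the resulting snapshot; the exact setup depends on the test harness used in this PR.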
/// Finished building the action and apply it to the transaction. | ||
pub async fn apply(self) -> Result<Transaction<'a>> { | ||
if self.merge_enabled { | ||
let process = MergeManifsetProcess { |
let process = MergeManifsetProcess { | |
let process = MergeManifestProcess { |
} | ||
} | ||
|
||
impl ManifestProcess for MergeManifsetProcess { |
impl ManifestProcess for MergeManifsetProcess { | |
impl ManifestProcess for MergeManifestProcess { |
struct MergeManifsetProcess { | ||
target_size_bytes: u32, | ||
min_count_to_merge: u32, | ||
} | ||
|
||
impl MergeManifsetProcess { |
struct MergeManifsetProcess { | |
target_size_bytes: u32, | |
min_count_to_merge: u32, | |
} | |
impl MergeManifsetProcess { | |
struct MergeManifestProcess { | |
target_size_bytes: u32, | |
min_count_to_merge: u32, | |
} | |
impl MergeManifestProcess { |
for manifset_file in manifest_bin { | ||
let manifest_file = manifset_file.load_manifest(&file_io).await?; |
for manifset_file in manifest_bin { | |
let manifest_file = manifset_file.load_manifest(&file_io).await?; | |
for manifest_file in manifest_bin { | |
let manifest_file = manifest_file.load_manifest(&file_io).await?; |
Ok(merged_bins.into_iter().flatten().collect()) | ||
} | ||
|
||
async fn merge_manifeset<'a>( |
async fn merge_manifeset<'a>( | |
async fn merge_manifest<'a>( |
@@ -267,13 +376,174 @@ trait SnapshotProduceOperation: Send + Sync { | |||
struct DefaultManifestProcess; | |||
|
|||
impl ManifestProcess for DefaultManifestProcess { | |||
fn process_manifeset(&self, manifests: Vec<ManifestFile>) -> Vec<ManifestFile> { | |||
manifests | |||
async fn process_manifeset<'a>( |
async fn process_manifeset<'a>( | |
async fn process_manifest<'a>( |
} | ||
} | ||
|
||
trait ManifestProcess: Send + Sync { | ||
fn process_manifeset(&self, manifests: Vec<ManifestFile>) -> Vec<ManifestFile>; | ||
fn process_manifeset<'a>( |
fn process_manifeset<'a>( | |
fn process_manifest<'a>( |
} | ||
|
||
impl ManifestProcess for MergeManifsetProcess { | ||
async fn process_manifeset<'a>( |
async fn process_manifeset<'a>( | |
async fn process_manifest<'a>( |
return Ok(manifests); | ||
} | ||
|
||
let first_manifest = manifests[0].clone(); |
Would this be an expensive clone (it is not wrapped in an Arc)? Is there a way to avoid it?
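Two possible ways to avoid the deep copy, sketched with a hypothetical, cut-down `ManifestFile` (the real struct has more fields, and I have not checked what `first_manifest` is used for later in this function). The helper names `first_spec_id` and `share_first` are made up for illustration.

```rust
use std::sync::Arc;

/// Hypothetical, cut-down stand-in for the crate's `ManifestFile`.
#[derive(Debug, Clone)]
struct ManifestFile {
    manifest_path: String,
    manifest_length: i64,
    partition_spec_id: i32,
}

/// Option 1: borrow the first element and copy out only the small field
/// that is needed, instead of cloning the whole struct.
fn first_spec_id(manifests: &[ManifestFile]) -> Option<i32> {
    manifests.first().map(|m| m.partition_spec_id)
}

/// Option 2: if the manifests were held behind `Arc`, cloning the first one
/// only bumps a reference count and does not copy the path or other fields.
fn share_first(manifests: &[Arc<ManifestFile>]) -> Option<Arc<ManifestFile>> {
    manifests.first().cloned()
}
```

Option 1 only works if the borrow does not have to outlive `manifests`; Option 2 would mean changing how manifests are stored, which may be out of scope for this PR.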
This PR completes #736.