Changes required for Parseable Enterprise #1215
base: main
Conversation
Walkthrough
This pull request introduces a new public enterprise utility module along with its associated functions for generating time filters and retrieving parquet file paths from an object storage system. It also adjusts module visibility in multiple parts of the codebase, simplifies log formatting in the stream processing module, and adds new asynchronous methods (get_buffered_reader and head) to the object storage trait.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Caller
    participant Fetcher as fetch_parquet_file_paths
    participant Filter as create_time_filter
    participant OS as ObjectStorage
    participant Collector as collect_manifest_files
    Caller->>Fetcher: Call fetch_parquet_file_paths(stream, time_range)
    Fetcher->>Filter: Call create_time_filter(time_range, partition, table)
    Filter-->>Fetcher: Return time filter expressions
    Fetcher->>OS: Retrieve snapshot and manifest URLs
    OS-->>Fetcher: Return matching objects
    Fetcher->>Collector: Spawn async tasks to collect manifest files
    Collector-->>Fetcher: Return list of manifests
    Fetcher->>Caller: Return mapping of parquet paths by date
```
```mermaid
sequenceDiagram
    participant Client
    participant Storage as ObjectStorage (S3, LocalFS, AzureBlob)
    Client->>Storage: Call get_buffered_reader(path)
    Storage-->>Client: Return BufReader (data stream)
    Client->>Storage: Call head(path)
    Storage-->>Client: Return ObjectMeta (metadata)
```
Actionable comments posted: 3
🧹 Nitpick comments (5)
src/storage/azure_blob.rs (2)
427-432: Unimplemented get_buffered_reader method
This stub method currently returns unimplemented!(). Consider implementing a basic version to match the functionality in other storage backends, such as S3. Otherwise, ensure proper error handling or documentation clarifying that it's not yet supported. Do you want me to propose a reference implementation that mirrors the approach in s3.rs?
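If so, a minimal sketch of such a mirror, assuming object_store's buffered BufReader and a to_object_store_path helper analogous to the S3 backend's (both names are assumptions, not the PR's actual code):

```rust
// Inside the Azure backend's `impl ObjectStorage` block (hedged sketch;
// `to_object_store_path` and the `?` error conversion are assumptions).
async fn get_buffered_reader(
    &self,
    path: &RelativePath,
) -> Result<BufReader, ObjectStorageError> {
    let path = to_object_store_path(path);
    // head() first, since object_store's BufReader is built from ObjectMeta.
    let meta = self.client.head(&path).await?;
    Ok(BufReader::new(Arc::new(self.client.clone()), &meta))
}
```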
433-435: Unimplemented head method
Without a working head implementation, calls to retrieve object metadata will fail on Azure storage. Consider returning a NotImplemented error or implementing the metadata retrieval logic.
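A hedged sketch of the gracefully-failing option, borrowing the UnhandledError variant this PR already uses elsewhere:

```rust
// Inside the Azure backend's `impl ObjectStorage` block (sketch only;
// the exact ObjectStorageError variant is an assumption).
use object_store::ObjectMeta;
use relative_path::RelativePath;

async fn head(&self, _path: &RelativePath) -> Result<ObjectMeta, ObjectStorageError> {
    // Fail loudly but recoverably instead of panicking via unimplemented!().
    Err(ObjectStorageError::UnhandledError(Box::new(std::io::Error::new(
        std::io::ErrorKind::Unsupported,
        "head is not yet supported for Azure Blob storage",
    ))))
}
```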
src/storage/object_storage.rs (1)
79-83: Trait methods for buffered reading and metadata (API consistency)
Adding get_buffered_reader and head to the ObjectStorage trait enforces consistency across providers. Ensure that all existing implementations (e.g., localfs, azure, s3) follow through with a valid or at least a gracefully failing implementation.
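For orientation, the trait additions might read roughly as follows (signatures are inferred from the review text; the async_trait attribute is an assumption about the codebase):

```rust
// Hedged sketch; ObjectStorageError is the crate's own error type.
use async_trait::async_trait;
use object_store::{buffered::BufReader, ObjectMeta};
use relative_path::RelativePath;

#[async_trait]
pub trait ObjectStorage: Send + Sync {
    // ...existing methods elided...

    /// Stream an object through a buffered reader instead of loading it whole.
    async fn get_buffered_reader(
        &self,
        path: &RelativePath,
    ) -> Result<BufReader, ObjectStorageError>;

    /// Fetch object metadata without downloading the payload.
    async fn head(&self, path: &RelativePath) -> Result<ObjectMeta, ObjectStorageError>;
}
```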
src/enterprise/utils.rs (2)
63-135: fetch_parquet_file_paths for retrieving and filtering manifest files
This function merges snapshots, fetches manifest files, and applies time-based filters. Consider additional checks:
- Handling partial JSON parse failures gracefully (skip vs. abort).
- Ensuring date path parsing remains robust across different file naming conventions.
137-161: Async fetching of manifests in collect_manifest_files
Spawning a task per manifest file can improve performance through concurrency. However, ensure partial failures (e.g., a single corrupted file) do not affect overall success, and consider adding timeouts or concurrency limits if the number of files is large; a bounded-concurrency sketch follows.
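A hedged sketch of a bounded variant using the futures crate's buffer_unordered; the function name, the 16-task cap, and the Bytes return type are assumptions rather than the PR's code:

```rust
// ObjectStorage and ObjectStorageError are the crate's own types.
use std::sync::Arc;

use bytes::Bytes;
use futures::stream::{self, StreamExt, TryStreamExt};
use relative_path::RelativePathBuf;

const MAX_CONCURRENT_FETCHES: usize = 16;

async fn collect_manifest_files_bounded(
    storage: Arc<dyn ObjectStorage>,
    manifest_paths: Vec<RelativePathBuf>,
) -> Result<Vec<Bytes>, ObjectStorageError> {
    stream::iter(manifest_paths)
        .map(|path| {
            let storage = Arc::clone(&storage);
            // Each fetch is an async future; buffer_unordered caps how many
            // run at once while keeping overall throughput high.
            async move { storage.get_object(&path).await }
        })
        .buffer_unordered(MAX_CONCURRENT_FETCHES)
        .try_collect()
        .await
}
```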
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (9)
- src/enterprise/mod.rs (1 hunks)
- src/enterprise/utils.rs (1 hunks)
- src/lib.rs (2 hunks)
- src/parseable/streams.rs (1 hunks)
- src/query/stream_schema_provider.rs (1 hunks)
- src/storage/azure_blob.rs (2 hunks)
- src/storage/localfs.rs (2 hunks)
- src/storage/object_storage.rs (2 hunks)
- src/storage/s3.rs (3 hunks)
✅ Files skipped from review due to trivial changes (1)
- src/enterprise/mod.rs
🔇 Additional comments (10)
src/lib.rs (1)
24-24: Module visibility changes and new enterprise module addition.
These changes make several internal modules public and add a new enterprise module. This expands the library's public API surface, which is aligned with the PR objectives for Parseable Enterprise.
Make sure that:
- These modules' public APIs are well-documented
- You've considered API stability implications
- The new enterprise module follows the same design patterns as other modules
Also applies to: 29-29, 41-41, 49-49
src/parseable/streams.rs (1)
516-517: Simplified warning log format.
The warning log has been simplified from a multi-line format to a single line, which makes the logging more concise.
src/query/stream_schema_provider.rs (1)
897-897: ❓ Verification inconclusive
Made ManifestExt trait public.
This change makes the ManifestExt trait accessible outside its module, which is consistent with making the query module public in lib.rs. The trait provides useful methods for filtering and pruning data based on query conditions.
🏁 Script executed:

```shell
#!/bin/bash
# Check if the ManifestExt trait is being used in the new enterprise module
rg -A 2 -B 2 "ManifestExt" --include="*.rs" ./src/enterprise/
```

Length of output: 503
ManifestExt Trait Made Public – Further Verification Required
The change to make the ManifestExt trait public remains valid, as it aligns with exposing the query module. However, the verification command intended to check its usage in the enterprise module returned an error due to an unsupported --include flag. Please manually verify or update the search command (for example, using --glob "*.rs") to confirm that the trait is referenced as expected in the src/enterprise/ code.
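For reference, ripgrep's flag for this is --glob (or -g), so a corrected invocation would be:

```shell
rg -A 2 -B 2 "ManifestExt" --glob "*.rs" ./src/enterprise/
```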
src/storage/localfs.rs (1)
31-31: Added new ObjectStore imports for buffered reading and metadata.
The imports for BufReader and ObjectMeta support the new methods being added to the ObjectStorage trait implementation.
src/storage/azure_blob.rs (1)
38-41: Potential usage of newly imported BufReader
The buffered::BufReader import suggests upcoming buffered reading support, but it is not yet referenced in the existing code (beyond the added trait method). Ensure that usage is fully integrated and tested to avoid dead code or missing functionality.
src/storage/object_storage.rs (1)
36-37: New imports for trait enhancements
The imports of BufReader and ObjectMeta are essential for the added methods (get_buffered_reader and head). This extension allows for more versatile and standardized object I/O operations in the trait.
src/storage/s3.rs (2)
40-43: Updated imports for buffered reading and object metadata retrieval
The addition of the BufReader, BackoffConfig, and ObjectMeta imports aligns with new S3 functionality for metadata and streamed reads. Ensure consistent, modern error-handling patterns across the code.
333-333: Removal of LimitStore wrapper
Switching the client field to use AmazonS3 directly can simplify the architecture but may remove the concurrency or rate-limiting controls that LimitStore provided. Verify that this is an intentional design choice and whether it affects performance under high load.
src/enterprise/utils.rs (2)
1-19: Initial imports for enterprise utilities
Necessary crates such as datafusion, itertools, and relative_path are introduced. This setup appears aligned with the upcoming filtering and file-path generation. Verify that none of these additions significantly increase build overhead or introduce functionality that overlaps with existing imports.
20-61: create_time_filter for generating start/end expressions
The logic handles both partitioned and non-partitioned timestamps. Verify that the inclusive/exclusive bounds (Bound::Included vs. Bound::Excluded) match the desired time slice. Also consider adding unit tests covering edge cases.
Force-pushed from 3ea414e to 03687d6
Force-pushed from 03687d6 to 79321a4
Actionable comments posted: 3
♻️ Duplicate comments (1)
src/catalog/mod.rs (1)
354-354: ⚠️ Potential issue
Replace unimplemented!() with a proper error-handling approach
Similar to the previous comment, using unimplemented!() will cause a runtime panic if this code path is executed. Replace it with a more graceful error-handling approach:

```diff
- Mode::Index => unimplemented!(),
+ Mode::Index => Err(ObjectStorageError::UnhandledError(Box::new(
+     std::io::Error::new(
+         std::io::ErrorKind::Unsupported,
+         "Index mode not yet implemented for get_first_event",
+     ),
+ ))),
```
🧹 Nitpick comments (4)
src/main.rs (1)
42-45: Consider adding clarity about program termination
The implementation correctly informs users that indexing is an enterprise feature, but it might be helpful to explicitly mention that the program will exit after displaying this message. Additionally, consider whether an exit code of 0 (which indicates successful execution) is the most appropriate choice for this scenario.
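A minimal sketch of the suggested behavior, shown as a standalone program; the message text and exit code 64 are illustrative choices, not the PR's values:

```rust
use std::process::exit;

fn main() {
    // A non-zero code lets callers distinguish "feature unavailable"
    // from a successful run; 64 loosely follows the EX_USAGE convention.
    eprintln!("Indexing is a Parseable Enterprise feature; exiting.");
    exit(64);
}
```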
src/handlers/http/middleware.rs (1)
361-378: Mode::Index middleware implementation follows established patterns
The implementation correctly restricts access to all endpoints except those containing "create" or "delete" when in Index mode, following the same pattern as other mode implementations. However, you might want to document what "Index API" specifically refers to, either in a comment or in external documentation, so that other developers know what functionality is available in this mode.
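Reduced to a plain predicate, the described gate looks roughly like the following (the route strings are hypothetical, and the real middleware wraps this in actix-web service logic):

```rust
fn is_allowed_in_index_mode(path: &str) -> bool {
    // Only endpoints whose path mentions create or delete pass in Index mode.
    path.contains("create") || path.contains("delete")
}

fn main() {
    assert!(is_allowed_in_index_mode("/api/v1/logstream/demo/create"));
    assert!(!is_allowed_in_index_mode("/api/v1/query"));
}
```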
src/storage/s3.rs (1)
330-330: Consider maintaining a concurrency limit as in the datafusion runtime setup.
The construct_client method now uses a direct AmazonS3 client rather than the LimitStore<AmazonS3> wrapper. If you wish to preserve the concurrency-limiting behavior from the datafusion runtime in this path, reintroduce a concurrency wrapper or clarify whether this new usage is intentionally unlimited.
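If the limit should be preserved, a sketch of re-wrapping the client with object_store's LimitStore; MAX_CONCURRENT_REQUESTS is an assumed tuning constant, not a value from the PR:

```rust
use object_store::aws::AmazonS3;
use object_store::limit::LimitStore;

const MAX_CONCURRENT_REQUESTS: usize = 32;

fn wrap_with_limit(client: AmazonS3) -> LimitStore<AmazonS3> {
    // LimitStore queues requests beyond the configured maximum,
    // restoring the throttling behavior the direct client drops.
    LimitStore::new(client, MAX_CONCURRENT_REQUESTS)
}
```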
src/enterprise/utils.rs (1)
20-61: Ensure the time bounds align with the intended semantics.
Currently, the lower bound is inclusive and the upper bound is exclusive. Confirm that these match the expected time-range filtering semantics.
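As a self-contained illustration of the half-open interval [start, end) that Bound::Included / Bound::Excluded encode (values are arbitrary):

```rust
use std::ops::Bound;

fn in_range(ts: i64, lower: Bound<i64>, upper: Bound<i64>) -> bool {
    (match lower {
        Bound::Included(s) => ts >= s,
        Bound::Excluded(s) => ts > s,
        Bound::Unbounded => true,
    }) && (match upper {
        Bound::Included(e) => ts <= e,
        Bound::Excluded(e) => ts < e,
        Bound::Unbounded => true,
    })
}

fn main() {
    // A record stamped exactly at `end` falls outside the slice.
    assert!(in_range(1_000, Bound::Included(1_000), Bound::Excluded(2_000)));
    assert!(!in_range(2_000, Bound::Included(1_000), Bound::Excluded(2_000)));
}
```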
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (15)
- src/catalog/mod.rs (2 hunks)
- src/enterprise/mod.rs (1 hunks)
- src/enterprise/utils.rs (1 hunks)
- src/handlers/http/middleware.rs (1 hunks)
- src/lib.rs (2 hunks)
- src/main.rs (2 hunks)
- src/option.rs (2 hunks)
- src/parseable/mod.rs (1 hunks)
- src/parseable/streams.rs (1 hunks)
- src/query/stream_schema_provider.rs (1 hunks)
- src/storage/azure_blob.rs (2 hunks)
- src/storage/localfs.rs (2 hunks)
- src/storage/object_storage.rs (4 hunks)
- src/storage/s3.rs (3 hunks)
- src/storage/store_metadata.rs (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (5)
- src/enterprise/mod.rs
- src/query/stream_schema_provider.rs
- src/storage/azure_blob.rs
- src/lib.rs
- src/parseable/streams.rs
⏰ Context from checks skipped due to timeout of 90000ms (1)
- GitHub Check: coverage
🔇 Additional comments (12)
src/storage/store_metadata.rs (1)
162-167: Implementation looks good for Index mode
The implementation for Mode::Index correctly follows the same pattern as Mode::Ingest, updating the server mode and staging directory in the metadata.
src/main.rs (1)
1-2: Added import for process exit
The import is correctly added for the new exit functionality.
src/option.rs (2)
25-25: Index mode added to Mode enum
The new Mode::Index variant is correctly added to the enum.
132-132: Parsing for the "index" string correctly implemented
The mode function is properly updated to handle the new "index" string input.
src/parseable/mod.rs (1)
246-246: LGTM: Mode::Index variant correctly added
This change adds proper handling for the new Mode::Index variant, which now returns "Distributed (Index)" in the string representation of server modes.
79-83
: LGTM: New storage interface methods enhance functionalityThe addition of
get_buffered_reader
andhead
methods to theObjectStorage
trait provides more efficient ways to interact with stored objects. Theget_buffered_reader
allows streaming data with buffering for better memory usage, while thehead
method enables checking metadata without downloading the entire object.
823-826
: LGTM: Schema path generation updated for Index modeThe schema path generation function has been properly updated to include the new
Mode::Index
variant in its control flow.
841-846
: LGTM: Stream JSON path generation updated for Index modeThe stream JSON path generation function has been properly updated to include the new
Mode::Index
variant in its control flow.src/storage/localfs.rs (1)
src/storage/localfs.rs (1)
107-125: LGTM: Proper error handling for unimplemented methods
The implementations of the new get_buffered_reader and head methods correctly return meaningful error messages instead of causing runtime panics. This allows the code to handle these cases gracefully while indicating that the functionality is not yet supported. The approach follows the best practice suggested in previous code reviews, providing clear error messages that will help developers understand why the operation failed.
src/storage/s3.rs (3)
40-43: Great addition of the BufReader and related imports.
These imports enable buffered read capabilities, which are crucial for streaming large objects efficiently. No immediate concerns here.
558-568: Check for potential error states and large-object edge cases.
The get_buffered_reader method gracefully returns an error if the head call fails, which is correct. However, for extremely large objects, ensure that partial reads and network timeouts are handled appropriately upstream, especially in high-latency environments; see the deadline-guarded read sketch below.
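One hedged way to bound such reads upstream, assuming tokio is available; the 30-second budget and error mapping are illustrative:

```rust
use std::time::Duration;

use object_store::buffered::BufReader;
use tokio::io::AsyncReadExt;
use tokio::time::timeout;

async fn read_with_deadline(mut reader: BufReader) -> std::io::Result<Vec<u8>> {
    let mut buf = Vec::new();
    // Outer Result is the deadline expiring; inner Result is the I/O itself.
    timeout(Duration::from_secs(30), reader.read_to_end(&mut buf))
        .await
        .map_err(|_| std::io::Error::new(std::io::ErrorKind::TimedOut, "object read timed out"))??;
    Ok(buf)
}
```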
569-571: Direct pass-through of metadata retrieval looks good.
This head method is straightforward and properly propagates errors. No issues found.
src/enterprise/utils.rs (Outdated)
```rust
pub async fn fetch_parquet_file_paths(
    stream: &str,
    time_range: &TimeRange,
) -> Result<HashMap<RelativePathBuf, Vec<File>>, ObjectStorageError> {
    let glob_storage = PARSEABLE.storage.get_object_store();

    let object_store_format = glob_storage.get_object_store_format(stream).await.unwrap();

    let time_partition = object_store_format.time_partition;

    let time_filter_expr = create_time_filter(time_range, time_partition.clone(), stream);

    let time_filters = extract_primary_filter(&time_filter_expr, &time_partition);

    let mut merged_snapshot: snapshot::Snapshot = snapshot::Snapshot::default();

    let path = RelativePathBuf::from_iter([stream, STREAM_ROOT_DIRECTORY]);
    let obs = glob_storage
        .get_objects(
            Some(&path),
            Box::new(|file_name| file_name.ends_with("stream.json")),
        )
        .await;
    if let Ok(obs) = obs {
        for ob in obs {
            if let Ok(object_store_format) = serde_json::from_slice::<ObjectStoreFormat>(&ob) {
                let snapshot = object_store_format.snapshot;
                for manifest in snapshot.manifest_list {
                    merged_snapshot.manifest_list.push(manifest);
                }
            }
        }
    }

    let manifest_files = collect_manifest_files(
        glob_storage,
        merged_snapshot
            .manifests(&time_filters)
            .into_iter()
            .sorted_by_key(|file| file.time_lower_bound)
            .map(|item| item.manifest_path)
            .collect(),
    )
    .await
    .unwrap();

    let mut parquet_files: HashMap<RelativePathBuf, Vec<File>> = HashMap::new();

    let mut selected_files = manifest_files
        .into_iter()
        .flat_map(|file| file.files)
        .rev()
        .collect_vec();

    for filter in time_filter_expr {
        selected_files.retain(|file| !file.can_be_pruned(&filter))
    }

    selected_files
        .into_iter()
        .map(|file| {
            let date = file.file_path.split("/").collect_vec();

            let date = date.as_slice()[1..4].iter().map(|s| s.to_string());

            let date = RelativePathBuf::from_iter(date);

            parquet_files.entry(date).or_default().push(file);
        })
        .for_each(|_| {});

    Ok(parquet_files)
}
```
Avoid using unwrap() when collecting manifest files.
On line 107, collect_manifest_files(...).await.unwrap() can cause a panic if an error occurs. Instead, consider propagating or handling the error gracefully to prevent unexpected runtime failures.

```diff
--- a/src/enterprise/utils.rs
+++ b/src/enterprise/utils.rs
@@ -106,7 +106,11 @@ pub async fn fetch_parquet_file_paths(
     .await
-    .unwrap();
+    .map_err(|err| {
+        // Log or handle error
+        eprintln!("Failed to collect manifest files: {:?}", err);
+        err
+    })?;
```

Then decide how the function returns or logs the error.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```rust
pub async fn fetch_parquet_file_paths(
    stream: &str,
    time_range: &TimeRange,
) -> Result<HashMap<RelativePathBuf, Vec<File>>, ObjectStorageError> {
    let glob_storage = PARSEABLE.storage.get_object_store();
    let object_store_format = glob_storage.get_object_store_format(stream).await.unwrap();
    let time_partition = object_store_format.time_partition;
    let time_filter_expr = create_time_filter(time_range, time_partition.clone(), stream);
    let time_filters = extract_primary_filter(&time_filter_expr, &time_partition);
    let mut merged_snapshot: snapshot::Snapshot = snapshot::Snapshot::default();
    let path = RelativePathBuf::from_iter([stream, STREAM_ROOT_DIRECTORY]);
    let obs = glob_storage
        .get_objects(
            Some(&path),
            Box::new(|file_name| file_name.ends_with("stream.json")),
        )
        .await;
    if let Ok(obs) = obs {
        for ob in obs {
            if let Ok(object_store_format) = serde_json::from_slice::<ObjectStoreFormat>(&ob) {
                let snapshot = object_store_format.snapshot;
                for manifest in snapshot.manifest_list {
                    merged_snapshot.manifest_list.push(manifest);
                }
            }
        }
    }
    let manifest_files = collect_manifest_files(
        glob_storage,
        merged_snapshot
            .manifests(&time_filters)
            .into_iter()
            .sorted_by_key(|file| file.time_lower_bound)
            .map(|item| item.manifest_path)
            .collect(),
    )
    .await
    .map_err(|err| {
        // Log or handle error
        eprintln!("Failed to collect manifest files: {:?}", err);
        err
    })?;
    let mut parquet_files: HashMap<RelativePathBuf, Vec<File>> = HashMap::new();
    let mut selected_files = manifest_files
        .into_iter()
        .flat_map(|file| file.files)
        .rev()
        .collect_vec();
    for filter in time_filter_expr {
        selected_files.retain(|file| !file.can_be_pruned(&filter))
    }
    selected_files
        .into_iter()
        .map(|file| {
            let date = file.file_path.split("/").collect_vec();
            let date = date.as_slice()[1..4].iter().map(|s| s.to_string());
            let date = RelativePathBuf::from_iter(date);
            parquet_files.entry(date).or_default().push(file);
        })
        .for_each(|_| {});
    Ok(parquet_files)
}
```
src/enterprise/utils.rs (Outdated)
```rust
async fn collect_manifest_files(
    storage: Arc<dyn ObjectStorage>,
    manifest_urls: Vec<String>,
) -> Result<Vec<Manifest>, object_store::Error> {
    let mut tasks = Vec::new();
    manifest_urls.into_iter().for_each(|path| {
        let path = RelativePathBuf::from_path(PathBuf::from(path)).unwrap();
        let storage = Arc::clone(&storage);
        tasks.push(tokio::task::spawn(async move {
            storage.get_object(&path).await.unwrap()
        }));
    });

    let mut op = Vec::new();
    for task in tasks {
        let file = task.await.unwrap();
        op.push(file);
    }

    Ok(op
        .into_iter()
        .map(|res| serde_json::from_slice(&res).unwrap())
        .collect())
}
```
Handle unwrapping in spawned tasks to avoid panics.
Within collect_manifest_files, both the spawned task (storage.get_object(&path).await.unwrap()) and the task join (task.await.unwrap()) use unwrap(). Additionally, deserialization (serde_json::from_slice(&res).unwrap()) can also panic. Use error handling or propagate errors back to the caller to ensure robust, crash-free behavior.

```diff
--- a/src/enterprise/utils.rs
+++ b/src/enterprise/utils.rs
@@ -145,7 +145,7 @@ async fn collect_manifest_files(
         tasks.push(tokio::task::spawn(async move {
-            storage.get_object(&path).await.unwrap()
+            storage.get_object(&path).await
         }));
     });
@@ -152,7 +152,11 @@ async fn collect_manifest_files(
     for task in tasks {
-        let file = task.await.unwrap();
+        let file = match task.await {
+            Ok(Ok(bytes)) => bytes,
+            Ok(Err(e)) | Err(e) => return Err(e.into()), // handle or propagate
+        };
         op.push(file);
     }
@@ -158,7 +162,11 @@ async fn collect_manifest_files(
-        .map(|res| serde_json::from_slice(&res).unwrap())
+        .map(|res| {
+            serde_json::from_slice(&res)
+                .map_err(|e| object_store::Error::Generic { store: "S3".to_string(), source: Box::new(e) })
+        })
+        .collect::<Result<Vec<_>, _>>()?
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```rust
async fn collect_manifest_files(
    storage: Arc<dyn ObjectStorage>,
    manifest_urls: Vec<String>,
) -> Result<Vec<Manifest>, object_store::Error> {
    let mut tasks = Vec::new();
    manifest_urls.into_iter().for_each(|path| {
        let path = RelativePathBuf::from_path(PathBuf::from(path)).unwrap();
        let storage = Arc::clone(&storage);
        tasks.push(tokio::task::spawn(async move {
            storage.get_object(&path).await
        }));
    });
    let mut op = Vec::new();
    for task in tasks {
        let file = match task.await {
            Ok(Ok(bytes)) => bytes,
            Ok(Err(e)) | Err(e) => return Err(e.into()), // handle or propagate
        };
        op.push(file);
    }
    Ok(op
        .into_iter()
        .map(|res| {
            serde_json::from_slice(&res).map_err(|e| object_store::Error::Generic {
                store: "S3".to_string(),
                source: Box::new(e),
            })
        })
        .collect::<Result<Vec<_>, _>>()?)
}
```
Actionable comments posted: 1
♻️ Duplicate comments (1)
src/enterprise/utils.rs (1)
157-157: ⚠️ Potential issue
Avoid panics caused by .expect(...)
Deserialization errors in .expect("Data is invalid for Manifest") will panic if invalid data is encountered. Propagate the error upward or handle it gracefully to improve robustness. Suggested refactor:

```diff
 .map(|res| {
-    serde_json::from_slice(&res).expect("Data is invalid for Manifest")
+    serde_json::from_slice(&res).map_err(|e| {
+        ObjectStorageError::Generic(format!("Invalid Manifest data: {:?}", e))
+    })?
 })
```
🧹 Nitpick comments (2)
src/enterprise/utils.rs (2)
20-61: Add unit tests for time filter edge cases.
While the create_time_filter function appears conceptually sound, it would be good to have unit tests verifying behavior for boundary conditions (e.g., zero-width time ranges or invalid time partitions). This will improve reliability and reduce the risk of off-by-one or range-related errors; a hedged test sketch follows.
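A hedged test sketch; TimeRange's shape and create_time_filter's exact return type are assumptions based on the review text, not the PR's real API:

```rust
#[cfg(test)]
mod time_filter_tests {
    use super::*;
    use chrono::{Duration, TimeZone, Utc};

    #[test]
    fn hour_wide_range_produces_start_and_end_filters() {
        let start = Utc.with_ymd_and_hms(2024, 1, 1, 0, 0, 0).unwrap();
        // Assumed constructor; adjust to however TimeRange is actually built.
        let range = TimeRange {
            start,
            end: start + Duration::hours(1),
        };
        let filters = create_time_filter(&range, None, "demo_stream");
        assert!(!filters.is_empty(), "expected start/end expressions");
    }
}
```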
63-159: Consider adding integration tests for parallel manifest fetching.
While the asynchronous fetch approach is straightforward, verifying concurrency performance and error handling under real-world conditions would be beneficial. An integration test or robust mocking could ensure correct handling of partial failures and network timeouts.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- src/catalog/mod.rs (2 hunks)
- src/enterprise/utils.rs (1 hunks)
- src/storage/mod.rs (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- src/catalog/mod.rs
⏰ Context from checks skipped due to timeout of 90000ms (10)
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: coverage
🔇 Additional comments (2)
src/storage/mod.rs (2)
23-23: Import added for new error-handling capability.
The import of JoinError from the tokio::task module is correctly added to support the new error variant.
259-260: Well-structured error variant for async task handling.
The new JoinError variant in the ObjectStorageError enum is properly implemented with the #[from] attribute for automatic conversion from tokio::task::JoinError. This addition enhances the error-handling capabilities for async operations in the object storage system, which aligns with the PR's goal of implementing changes required for Parseable Enterprise.
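For reference, a sketch of the described variant, assuming the enum derives thiserror::Error as the #[from]/#[error] attributes suggest:

```rust
use tokio::task::JoinError;

#[derive(Debug, thiserror::Error)]
pub enum ObjectStorageError {
    // ...existing variants elided...
    /// Automatic conversion lets `?` propagate failed task joins directly.
    #[error("joining async task failed: {0}")]
    JoinError(#[from] JoinError),
}
```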
```rust
let date = date.as_slice()[1..4].iter().map(|s| s.to_string());

let date = RelativePathBuf::from_iter(date);
```
Check for potential out-of-bounds slice access.
Splitting on "/" and calling date.as_slice()[1..4] can panic if the path has fewer than four segments. Handle or verify the path structure before slicing to prevent a runtime panic.
Possible fix:

```diff
-    let date = date.as_slice()[1..4].iter().map(|s| s.to_string());
+    if date.len() < 4 {
+        // Decide whether to skip this file or handle the error
+        return Err(
+            ObjectStorageError::Generic(format!("Unexpected path format: {:?}", date))
+        );
+    }
+    let date = date.as_slice()[1..4].iter().map(|s| s.to_string());
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```rust
if date.len() < 4 {
    // Decide whether to skip this file or handle the error
    return Err(
        ObjectStorageError::Generic(format!("Unexpected path format: {:?}", date))
    );
}
let date = date.as_slice()[1..4].iter().map(|s| s.to_string());

let date = RelativePathBuf::from_iter(date);
```
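For comparison, a panic-free variant of the same extraction using slice::get, shown standalone with hypothetical paths:

```rust
// Returns None instead of panicking when the path is too short.
fn date_components(file_path: &str) -> Option<Vec<String>> {
    let parts: Vec<&str> = file_path.split('/').collect();
    parts.get(1..4).map(|s| s.iter().map(|p| p.to_string()).collect())
}

fn main() {
    assert_eq!(
        date_components("stream/date=2024-01-01/hour=00/minute=00/file.parquet"),
        Some(vec!["date=2024-01-01".into(), "hour=00".into(), "minute=00".into()])
    );
    assert_eq!(date_components("too/short"), None);
}
```

Whether short paths should be skipped silently or surfaced as errors is a policy choice for the caller.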
Fixes #XXXX.
Description
This PR has:
Summary by CodeRabbit
- Introduced a utils module for enhanced utilities that support time-based filtering and efficient file retrieval from storage.
- Added an Index mode, restricting access to specific endpoints and improving middleware functionality.
- Updated messaging for Index mode, providing clearer feedback.