Changes required for Parseable Enterprise #1215

Open
parmesant wants to merge 3 commits into main from enterprise-changes

Conversation

parmesant
Contributor

@parmesant parmesant commented Feb 28, 2025

Fixes #XXXX.

Description


This PR has:

  • been tested to ensure log ingestion and log query works.
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious to an unfamiliar reader.
  • added documentation for new or modified features or behaviors.

Summary by CodeRabbit

  • New Features
    • Introduced a new utils module for enhanced utilities that support time-based filtering and efficient file retrieval from storage.
    • Added a new Index mode, restricting access to specific endpoints and improving middleware functionality.
    • Expanded external integration by exposing additional functionality for buffered access and metadata retrieval across various storage solutions.
  • Improvements
    • Streamlined file error notifications to enhance clarity when files are unavailable.
    • Enhanced accessibility of several modules, allowing for broader usage across the codebase.
    • Introduced a new error handling case for unsupported operations in Index mode, providing clearer feedback.


coderabbitai bot commented Feb 28, 2025

Walkthrough

This pull request introduces a new public enterprise utility module along with its associated functions for generating time filters and retrieving parquet file paths from an object storage system. It also adjusts module visibility in multiple parts of the codebase, simplifies log formatting in the stream processing module, and adds new asynchronous methods—get_buffered_reader and head—to various object storage implementations including Azure, LocalFS, and S3. Additionally, the S3 client type is simplified by removing a wrapper. Overall, the changes extend public APIs and add new functionality for object metadata and buffered reading.

Changes

File(s) and change summary:

  • src/enterprise/mod.rs and src/lib.rs: Added new public module enterprise and updated module visibility (catalog, query, utils) to public; added pub mod utils; in enterprise.
  • src/enterprise/utils.rs: New file defining public functions create_time_filter and fetch_parquet_file_paths, and a private async function collect_manifest_files for time-based filtering and parquet file retrieval.
  • src/parseable/streams.rs: Simplified warning log formatting in the convert_disk_files_to_parquet method from a multi-line block to a single line.
  • src/query/stream_schema_provider.rs: Updated the ManifestExt trait visibility from private to public.
  • src/storage/azure_blob.rs: Added async methods get_buffered_reader and head (currently unimplemented) to the BlobStore implementation; updated import to include ObjectMeta.
  • src/storage/localfs.rs: Added async methods get_buffered_reader and head (currently unimplemented) to the LocalFS implementation of the ObjectStorage trait.
  • src/storage/object_storage.rs: Added async methods get_buffered_reader and head to the ObjectStorage trait definition.
  • src/storage/s3.rs: Removed the LimitStore wrapper from the S3 client type, simplifying it from LimitStore<AmazonS3> to AmazonS3; added async methods get_buffered_reader and head; added necessary BufReader import.
  • src/catalog/mod.rs: Introduced error handling for Mode::Index in remove_manifest_from_snapshot and get_first_event functions.
  • src/handlers/http/middleware.rs: Added Mode::Index to ModeFilterMiddleware for endpoint access restrictions.
  • src/main.rs: Handled Mode::Index case in the main application to print a message and exit.
  • src/option.rs: Added Index variant to Mode enum and updated mode function to handle it.
  • src/parseable/mod.rs: Enhanced get_server_mode_string method to return a string for Mode::Index.
  • src/storage/store_metadata.rs: Added Mode::Index case to resolve_parseable_metadata function.
  • src/storage/mod.rs: Introduced JoinError variant to ObjectStorageError enum for better error handling.

Sequence Diagram(s)

sequenceDiagram
    participant Caller
    participant Fetcher as fetch_parquet_file_paths
    participant Filter as create_time_filter
    participant OS as ObjectStorage
    participant Collector as collect_manifest_files

    Caller->>Fetcher: Call fetch_parquet_file_paths(stream, time_range)
    Fetcher->>Filter: Call create_time_filter(time_range, partition, table)
    Filter-->>Fetcher: Return time filter expressions
    Fetcher->>OS: Retrieve snapshot and manifest URLs
    OS-->>Fetcher: Return matching objects
    Fetcher->>Collector: Spawn async tasks to collect manifest files
    Collector-->>Fetcher: Return list of manifests
    Fetcher->>Caller: Return mapping of parquet paths by date
sequenceDiagram
    participant Client
    participant Storage as ObjectStorage (S3, LocalFS, AzureBlob)

    Client->>Storage: Call get_buffered_reader(path)
    Storage-->>Client: Return BufReader (data stream)
    Client->>Storage: Call head(path)
    Storage-->>Client: Return ObjectMeta (metadata)

Suggested labels

for next release

Suggested reviewers

  • nikhilsinhaparseable
  • nitisht

Poem

In the code garden under the moonlit sky,
A little rabbit hops with a gleam in its eye.
New modules bloom and functions dance,
Like ASCII carrots, they enhance the chance.
With a skip and a hop, the changes sing—
CodeRabbit cheers for every new spring!
🐇✨


@nitisht nitisht requested a review from de-sh February 28, 2025 18:41
@nitisht nitisht marked this pull request as ready for review February 28, 2025 18:41

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

🧹 Nitpick comments (5)
src/storage/azure_blob.rs (2)

427-432: Unimplemented get_buffered_reader method
This stub method currently returns unimplemented!(). Consider implementing a basic version to match the functionality in other storage backends, such as S3. Otherwise, ensure proper error handling or documentation clarifying that it's not yet supported.

Do you want me to propose a reference implementation that mirrors the approach in s3.rs?


433-435: Unimplemented head method
Without a working head implementation, calls to retrieve object metadata will fail on Azure storage. Consider returning a NotImplemented error or implementing the metadata retrieval logic.
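
For instance, a minimal gracefully-failing sketch, assuming the trait signature implied by this PR (a RelativePath in, an ObjectStorageError out) and reusing the UnhandledError variant that appears in the catalog suggestion later in this review:

    async fn head(&self, path: &RelativePath) -> Result<ObjectMeta, ObjectStorageError> {
        // Fail recoverably instead of panicking via unimplemented!().
        Err(ObjectStorageError::UnhandledError(Box::new(std::io::Error::new(
            std::io::ErrorKind::Unsupported,
            format!("head() is not yet supported for Azure Blob (path: {path})"),
        ))))
    }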

src/storage/object_storage.rs (1)

79-83: Trait methods for buffered reading and metadata (API consistency)
Adding get_buffered_reader and head to the ObjectStorage trait enforces consistency across providers. Ensure that all existing implementations (e.g., localfs, azure, s3) follow through with a valid or at least a gracefully failing implementation.
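
For reference, the trait addition presumably looks roughly like the sketch below; the signatures are inferred from the BufReader and ObjectMeta imports named in this review and may differ from the actual code:

    #[async_trait]
    pub trait ObjectStorage: Send + Sync {
        // ...existing methods...

        /// Stream an object with buffering, for bodies too large to hold in memory.
        async fn get_buffered_reader(
            &self,
            path: &RelativePath,
        ) -> Result<BufReader, ObjectStorageError>;

        /// Fetch object metadata (size, last-modified, etag) without downloading the body.
        async fn head(&self, path: &RelativePath) -> Result<ObjectMeta, ObjectStorageError>;
    }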

src/enterprise/utils.rs (2)

63-135: fetch_parquet_file_paths for retrieving and filtering manifest files
This function merges snapshots, fetches manifest files, and applies time-based filters. Consider additional checks:
• Handling partial JSON parse failures gracefully (skip vs. abort).
• Ensuring date path parsing remains robust for different file naming conventions.


137-161: Async fetching of manifests in collect_manifest_files
Spawning tasks for each manifest file concurrently can improve performance. However, ensure partial failures (e.g., a single corrupted file) do not affect overall success. Consider adding timeouts or concurrency limits if the number of files is large.
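
One way to bound concurrency is a tokio Semaphore around the spawned fetches; the constant and its value below are illustrative, not part of the PR:

    use std::sync::Arc;
    use tokio::sync::Semaphore;

    const MAX_CONCURRENT_FETCHES: usize = 16; // hypothetical cap

    let semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_FETCHES));
    for url in manifest_urls {
        let path = RelativePathBuf::from_path(PathBuf::from(url)).expect("relative path");
        let storage = Arc::clone(&storage);
        let semaphore = Arc::clone(&semaphore);
        tasks.push(tokio::task::spawn(async move {
            // Holding the permit for the duration of the request caps
            // the number of in-flight object store calls.
            let _permit = semaphore.acquire_owned().await.expect("semaphore never closed");
            storage.get_object(&path).await
        }));
    }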

📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between f7d366e and 48bd263.

📒 Files selected for processing (9)
  • src/enterprise/mod.rs (1 hunks)
  • src/enterprise/utils.rs (1 hunks)
  • src/lib.rs (2 hunks)
  • src/parseable/streams.rs (1 hunks)
  • src/query/stream_schema_provider.rs (1 hunks)
  • src/storage/azure_blob.rs (2 hunks)
  • src/storage/localfs.rs (2 hunks)
  • src/storage/object_storage.rs (2 hunks)
  • src/storage/s3.rs (3 hunks)
✅ Files skipped from review due to trivial changes (1)
  • src/enterprise/mod.rs
🔇 Additional comments (10)
src/lib.rs (1)

24-24: Module visibility changes and new enterprise module addition.

These changes make several internal modules public and add a new enterprise module. This expands the library's public API surface, which is aligned with the PR objectives for Parseable Enterprise.

Make sure that:

  1. These modules' public APIs are well-documented
  2. You've considered API stability implications
  3. The new enterprise module follows the same design patterns as other modules

Also applies to: 29-29, 41-41, 49-49

src/parseable/streams.rs (1)

516-517: Simplified warning log format.

The warning log has been simplified from a multi-line format to a single line, which makes the logging more concise.

src/query/stream_schema_provider.rs (1)

897-897: ❓ Verification inconclusive

Made ManifestExt trait public.

This change makes the ManifestExt trait accessible outside its module, which is consistent with making the query module public in lib.rs. The trait provides useful methods for filtering and pruning data based on query conditions.


🏁 Script executed:

#!/bin/bash
# Check if the ManifestExt trait is being used in the new enterprise module
rg -A 2 -B 2 "ManifestExt" --include="*.rs" ./src/enterprise/

Length of output: 503


ManifestExt Trait Made Public – Further Verification Required

The change to make the ManifestExt trait public remains valid as it aligns with exposing the query module. However, the verification command intended to check its usage in the enterprise module returned an error due to an unsupported --include flag. Please manually verify or update the search command (for example, using --glob "*.rs") to confirm that the trait is referenced as expected in the src/enterprise/ code.

src/storage/localfs.rs (1)

31-31: Added new ObjectStore imports for buffered reading and metadata.

The import for BufReader and ObjectMeta supports the new methods being added to the ObjectStorage trait implementation.

src/storage/azure_blob.rs (1)

38-41: Potential usage of newly imported BufReader
The buffered::BufReader import suggests upcoming buffered reading support, but it is not yet referenced in the existing code (beyond the added trait method). Ensure that usage is fully integrated and tested to avoid dead code or missing functionality.

src/storage/object_storage.rs (1)

36-37: New imports for trait enhancements
The import of BufReader and ObjectMeta is essential for the added methods (get_buffered_reader and head). This extension allows for more versatile and standardized object I/O operations in the trait.

src/storage/s3.rs (2)

40-43: Updated imports for buffered reading and object metadata retrieval
The addition of BufReader, BackoffConfig, and ObjectMeta imports aligns with new S3 functionalities for metadata and streamed reads. Ensure consistent modern error handling patterns across the code.


333-333: Removal of LimitStore wrapper
Switching the client field to use AmazonS3 directly can simplify architecture but may remove concurrency or rate-limiting controls that LimitStore provided. Verify if this is an intentional design choice and whether it affects performance under high load.

src/enterprise/utils.rs (2)

1-19: Initial imports for enterprise utilities
Necessary crates like datafusion, itertools, and relative_path are introduced. This setup appears aligned with upcoming filtering and file path generation. Verify that none of these additions significantly increase build overhead or introduce overlapping functionality with existing imports.


20-61: create_time_filter for generating start/end expressions
The logic handles both partitioned and non-partitioned timestamps. Verify that the inclusive/exclusive bounds (Bound::Included vs. Bound::Excluded) match the desired time slice. Also consider adding unit tests covering edge cases.
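
For reference, the half-open convention in question, sketched with std::ops::Bound (illustrative only; the real filter is built as DataFusion expressions, and the start/end field names on TimeRange are assumptions):

    use std::ops::Bound;

    // [start, end): inclusive lower, exclusive upper. Consecutive windows
    // then tile the timeline with no overlap and no gaps.
    let lower = Bound::Included(time_range.start);
    let upper = Bound::Excluded(time_range.end);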

@nitisht nitisht marked this pull request as draft February 28, 2025 18:46
@parmesant parmesant force-pushed the enterprise-changes branch from 3ea414e to 03687d6 Compare March 1, 2025 09:18
@parmesant parmesant force-pushed the enterprise-changes branch from 03687d6 to 79321a4 Compare March 1, 2025 09:19
@parmesant parmesant marked this pull request as ready for review March 1, 2025 09:19
Copy link

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

♻️ Duplicate comments (1)
src/catalog/mod.rs (1)

354-354: ⚠️ Potential issue

Replace unimplemented!() with a proper error handling approach

Similar to the previous comment, using unimplemented!() will cause a runtime panic if this code path is executed.

Replace with a more graceful error handling approach:

- Mode::Index => unimplemented!(),
+ Mode::Index => Err(ObjectStorageError::UnhandledError(Box::new(
+     std::io::Error::new(
+         std::io::ErrorKind::Unsupported,
+         "Index mode not yet implemented for get_first_event"
+     )
+ ))),
🧹 Nitpick comments (4)
src/main.rs (1)

42-45: Consider adding clarity about program termination

The implementation correctly informs users that indexing is an enterprise feature, but it might be helpful to explicitly mention that the program will exit after displaying this message.

Additionally, consider whether an exit code of 0 (which indicates successful execution) is the most appropriate choice for this scenario.
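
A sketch of how that could look; the message text and exit code are placeholders rather than the PR's actual strings:

    use std::process::exit;

    Mode::Index => {
        eprintln!("Indexing is an enterprise feature; this build will now exit.");
        // A non-zero code (e.g. 64, EX_USAGE) distinguishes "mode not
        // available in this build" from a successful run.
        exit(64);
    }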

src/handlers/http/middleware.rs (1)

361-378: Mode::Index middleware implementation follows established patterns

The implementation correctly restricts access to endpoints except those containing "create" or "delete" when in Index mode, following the same pattern as other mode implementations.

However, you might want to consider documenting what "Index API" specifically refers to, either in a comment or in external documentation, to make it clearer for other developers what functionality is available in this mode.

src/storage/s3.rs (1)

330-330: Consider maintaining a concurrency limit as in the datafusion runtime setup.

The construct_client method now uses a direct AmazonS3 client rather than the LimitStore<AmazonS3> wrapper. If you wish to preserve the concurrency-limiting behavior from the datafusion runtime in this path, reintroduce a concurrency wrapper or clarify if this new usage is intentionally unlimited.
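
If the limiting behavior should be preserved, the object_store crate's LimitStore can wrap the client again; the builder variable and the cap below are illustrative:

    use object_store::limit::LimitStore;

    const MAX_CONCURRENT_S3_REQUESTS: usize = 32; // hypothetical limit

    let s3: AmazonS3 = builder.build()?; // assumes an AmazonS3Builder in scope
    let client = LimitStore::new(s3, MAX_CONCURRENT_S3_REQUESTS);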

src/enterprise/utils.rs (1)

20-61: Ensure the time bounds align with the intended semantics.

Currently, the lower bound is inclusive, and the upper bound is exclusive. Confirm that these match the expected time range filtering semantics.

📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between 48bd263 and 79321a4.

📒 Files selected for processing (15)
  • src/catalog/mod.rs (2 hunks)
  • src/enterprise/mod.rs (1 hunks)
  • src/enterprise/utils.rs (1 hunks)
  • src/handlers/http/middleware.rs (1 hunks)
  • src/lib.rs (2 hunks)
  • src/main.rs (2 hunks)
  • src/option.rs (2 hunks)
  • src/parseable/mod.rs (1 hunks)
  • src/parseable/streams.rs (1 hunks)
  • src/query/stream_schema_provider.rs (1 hunks)
  • src/storage/azure_blob.rs (2 hunks)
  • src/storage/localfs.rs (2 hunks)
  • src/storage/object_storage.rs (4 hunks)
  • src/storage/s3.rs (3 hunks)
  • src/storage/store_metadata.rs (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (5)
  • src/enterprise/mod.rs
  • src/query/stream_schema_provider.rs
  • src/storage/azure_blob.rs
  • src/lib.rs
  • src/parseable/streams.rs
⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: coverage
🔇 Additional comments (12)
src/storage/store_metadata.rs (1)

162-167: Implementation looks good for Index mode

The implementation for Mode::Index correctly follows the same pattern as Mode::Ingest, updating the server mode and staging directory in the metadata.

src/main.rs (1)

1-2: Added import for process exit

Import correctly added for the new exit functionality.

src/option.rs (2)

25-25: Index mode added to Mode enum

The new Mode::Index variant is correctly added to the enum.


132-132: Parsing for "index" string correctly implemented

The mode function is properly updated to handle the new "index" string input.
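
Taken together, the two changes presumably look roughly like the sketch below; variant names other than Index are assumed from the modes mentioned elsewhere in this review:

    pub enum Mode {
        Query,
        Ingest,
        Index,
        All,
    }

    // in the mode() parser:
    match s {
        "query" => Ok(Mode::Query),
        "ingest" => Ok(Mode::Ingest),
        "index" => Ok(Mode::Index),
        "all" => Ok(Mode::All),
        other => Err(format!("unknown mode: {other}")),
    }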

src/parseable/mod.rs (1)

246-246: LGTM: Mode::Index variant correctly added

This change adds proper handling for the new Mode::Index variant, which now returns "Distributed (Index)" in the string representation of server modes.

src/storage/object_storage.rs (3)

79-83: LGTM: New storage interface methods enhance functionality

The addition of get_buffered_reader and head methods to the ObjectStorage trait provides more efficient ways to interact with stored objects. The get_buffered_reader allows streaming data with buffering for better memory usage, while the head method enables checking metadata without downloading the entire object.


823-826: LGTM: Schema path generation updated for Index mode

The schema path generation function has been properly updated to include the new Mode::Index variant in its control flow.


841-846: LGTM: Stream JSON path generation updated for Index mode

The stream JSON path generation function has been properly updated to include the new Mode::Index variant in its control flow.

src/storage/localfs.rs (1)

107-125: LGTM: Proper error handling for unimplemented methods

The implementation of the new get_buffered_reader and head methods correctly returns meaningful error messages instead of causing runtime panics. This approach allows the code to gracefully handle these cases while indicating that the functionality is not yet supported.

This implementation follows the best practice suggested in previous code reviews, providing clear error messages that will help developers understand why the operation failed.

src/storage/s3.rs (3)

40-43: Great addition of the BufReader and related imports.

These imports enable buffered read capabilities, which is crucial for streaming large objects efficiently. No immediate concerns here.


558-568: Check for potential error states and large object edge cases.

The get_buffered_reader method gracefully returns an error if the head call fails. This is correct. However, for extremely large objects, ensure that partial reads or network timeouts are handled appropriately upstream, especially in high-latency environments.
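
At call sites, a timeout wrapper is one way to keep a stalled connection from hanging the reader setup; this sketch assumes the error variants used elsewhere in this review:

    use std::time::Duration;
    use tokio::time::timeout;

    let reader = timeout(Duration::from_secs(30), storage.get_buffered_reader(&path))
        .await
        // Elapsed -> storage error; the inner Result is unwrapped by the second `?`.
        .map_err(|_| ObjectStorageError::UnhandledError(Box::new(std::io::Error::new(
            std::io::ErrorKind::TimedOut,
            "timed out opening buffered reader",
        ))))??;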


569-571: Direct pass-through of metadata retrieval looks good.

This head method is straightforward and properly propagates errors. No issues found.

Comment on lines 63 to 135
pub async fn fetch_parquet_file_paths(
    stream: &str,
    time_range: &TimeRange,
) -> Result<HashMap<RelativePathBuf, Vec<File>>, ObjectStorageError> {
    let glob_storage = PARSEABLE.storage.get_object_store();

    let object_store_format = glob_storage.get_object_store_format(stream).await.unwrap();

    let time_partition = object_store_format.time_partition;

    let time_filter_expr = create_time_filter(time_range, time_partition.clone(), stream);

    let time_filters = extract_primary_filter(&time_filter_expr, &time_partition);

    let mut merged_snapshot: snapshot::Snapshot = snapshot::Snapshot::default();

    let path = RelativePathBuf::from_iter([stream, STREAM_ROOT_DIRECTORY]);
    let obs = glob_storage
        .get_objects(
            Some(&path),
            Box::new(|file_name| file_name.ends_with("stream.json")),
        )
        .await;
    if let Ok(obs) = obs {
        for ob in obs {
            if let Ok(object_store_format) = serde_json::from_slice::<ObjectStoreFormat>(&ob) {
                let snapshot = object_store_format.snapshot;
                for manifest in snapshot.manifest_list {
                    merged_snapshot.manifest_list.push(manifest);
                }
            }
        }
    }

    let manifest_files = collect_manifest_files(
        glob_storage,
        merged_snapshot
            .manifests(&time_filters)
            .into_iter()
            .sorted_by_key(|file| file.time_lower_bound)
            .map(|item| item.manifest_path)
            .collect(),
    )
    .await
    .unwrap();

    let mut parquet_files: HashMap<RelativePathBuf, Vec<File>> = HashMap::new();

    let mut selected_files = manifest_files
        .into_iter()
        .flat_map(|file| file.files)
        .rev()
        .collect_vec();

    for filter in time_filter_expr {
        selected_files.retain(|file| !file.can_be_pruned(&filter))
    }

    selected_files
        .into_iter()
        .map(|file| {
            let date = file.file_path.split("/").collect_vec();

            let date = date.as_slice()[1..4].iter().map(|s| s.to_string());

            let date = RelativePathBuf::from_iter(date);

            parquet_files.entry(date).or_default().push(file);
        })
        .for_each(|_| {});

    Ok(parquet_files)
}

⚠️ Potential issue

Avoid using unwrap() when collecting manifest files.

On line 107, collect_manifest_files(...).await.unwrap() can cause a panic if an error occurs. Instead, consider propagating or handling the error gracefully to prevent unexpected runtime failures.

--- a/src/enterprise/utils.rs
+++ b/src/enterprise/utils.rs
@@ -106,7 +106,11 @@ pub async fn fetch_parquet_file_paths(
     .await
-    .unwrap();
+    .map_err(|err| {
+        // Log or handle error
+        eprintln!("Failed to collect manifest files: {:?}", err);
+        err
+    })?;

 // Then decide how the function returns or logs the error
📝 Committable suggestion (review carefully before committing; ensure it accurately replaces the highlighted code and test it)

pub async fn fetch_parquet_file_paths(
    stream: &str,
    time_range: &TimeRange,
) -> Result<HashMap<RelativePathBuf, Vec<File>>, ObjectStorageError> {
    let glob_storage = PARSEABLE.storage.get_object_store();

    let object_store_format = glob_storage.get_object_store_format(stream).await.unwrap();

    let time_partition = object_store_format.time_partition;

    let time_filter_expr = create_time_filter(time_range, time_partition.clone(), stream);

    let time_filters = extract_primary_filter(&time_filter_expr, &time_partition);

    let mut merged_snapshot: snapshot::Snapshot = snapshot::Snapshot::default();

    let path = RelativePathBuf::from_iter([stream, STREAM_ROOT_DIRECTORY]);
    let obs = glob_storage
        .get_objects(
            Some(&path),
            Box::new(|file_name| file_name.ends_with("stream.json")),
        )
        .await;
    if let Ok(obs) = obs {
        for ob in obs {
            if let Ok(object_store_format) = serde_json::from_slice::<ObjectStoreFormat>(&ob) {
                let snapshot = object_store_format.snapshot;
                for manifest in snapshot.manifest_list {
                    merged_snapshot.manifest_list.push(manifest);
                }
            }
        }
    }

    let manifest_files = collect_manifest_files(
        glob_storage,
        merged_snapshot
            .manifests(&time_filters)
            .into_iter()
            .sorted_by_key(|file| file.time_lower_bound)
            .map(|item| item.manifest_path)
            .collect(),
    )
    .await
    .map_err(|err| {
        // Log or handle error
        eprintln!("Failed to collect manifest files: {:?}", err);
        err
    })?;

    let mut parquet_files: HashMap<RelativePathBuf, Vec<File>> = HashMap::new();

    let mut selected_files = manifest_files
        .into_iter()
        .flat_map(|file| file.files)
        .rev()
        .collect_vec();

    for filter in time_filter_expr {
        selected_files.retain(|file| !file.can_be_pruned(&filter))
    }

    selected_files
        .into_iter()
        .map(|file| {
            let date = file.file_path.split("/").collect_vec();

            let date = date.as_slice()[1..4].iter().map(|s| s.to_string());

            let date = RelativePathBuf::from_iter(date);

            parquet_files.entry(date).or_default().push(file);
        })
        .for_each(|_| {});

    Ok(parquet_files)
}

Comment on lines 137 to 160
async fn collect_manifest_files(
    storage: Arc<dyn ObjectStorage>,
    manifest_urls: Vec<String>,
) -> Result<Vec<Manifest>, object_store::Error> {
    let mut tasks = Vec::new();
    manifest_urls.into_iter().for_each(|path| {
        let path = RelativePathBuf::from_path(PathBuf::from(path)).unwrap();
        let storage = Arc::clone(&storage);
        tasks.push(tokio::task::spawn(async move {
            storage.get_object(&path).await.unwrap()
        }));
    });

    let mut op = Vec::new();
    for task in tasks {
        let file = task.await.unwrap();
        op.push(file);
    }

    Ok(op
        .into_iter()
        .map(|res| serde_json::from_slice(&res).unwrap())
        .collect())
}

⚠️ Potential issue

Handle unwrapping in spawned tasks to avoid panics.

Within collect_manifest_files, both the spawned task (storage.get_object(&path).await.unwrap()) and the task join (task.await.unwrap()) use unwrap(). Additionally, deserialization (serde_json::from_slice(&res).unwrap()) can also panic. Use error handling or propagate errors back to the caller to ensure robust, crash-free behavior.

--- a/src/enterprise/utils.rs
+++ b/src/enterprise/utils.rs
@@ -145,7 +145,7 @@ async fn collect_manifest_files(
         tasks.push(tokio::task::spawn(async move {
-            storage.get_object(&path).await.unwrap()
+            storage.get_object(&path).await
         }));
     });

@@ -152,7 +152,11 @@ async fn collect_manifest_files(
 for task in tasks {
-    let file = task.await.unwrap();
+    let file = match task.await {
+        Ok(Ok(bytes)) => bytes,
+        Ok(Err(e)) | Err(e) => return Err(e.into()), // handle or propagate
+    };

     op.push(file);
 }

@@ -158,7 +162,11 @@ async fn collect_manifest_files(
     .map(|res| serde_json::from_slice(&res).unwrap())
+    .map(|res| {
+        serde_json::from_slice(&res)
+            .map_err(|e| object_store::Error::Generic { store: "S3".to_string(), source: Box::new(e) })
+    })
+    .collect::<Result<Vec<_>, _>>()?
📝 Committable suggestion (review carefully before committing; ensure it accurately replaces the highlighted code and test it)

async fn collect_manifest_files(
    storage: Arc<dyn ObjectStorage>,
    manifest_urls: Vec<String>,
) -> Result<Vec<Manifest>, object_store::Error> {
    let mut tasks = Vec::new();
    manifest_urls.into_iter().for_each(|path| {
        let path = RelativePathBuf::from_path(PathBuf::from(path)).unwrap();
        let storage = Arc::clone(&storage);
        tasks.push(tokio::task::spawn(async move {
            storage.get_object(&path).await
        }));
    });

    let mut op = Vec::new();
    for task in tasks {
        let file = match task.await {
            Ok(Ok(bytes)) => bytes,
            Ok(Err(e)) | Err(e) => return Err(e.into()), // handle or propagate
        };
        op.push(file);
    }

    Ok(op
        .into_iter()
        .map(|res| {
            serde_json::from_slice(&res).map_err(|e| object_store::Error::Generic {
                store: "S3".to_string(),
                source: Box::new(e),
            })
        })
        .collect::<Result<Vec<_>, _>>()?)
}


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (1)
src/enterprise/utils.rs (1)

157-157: ⚠️ Potential issue

Avoid panics caused by .expect(...).

Deserialization errors in .expect("Data is invalid for Manifest") will panic if invalid data is encountered. Propagate the error upward or handle it gracefully to improve robustness.

Suggested refactor:

 .map(|res| {
-    serde_json::from_slice(&res).expect("Data is invalid for Manifest")
+    serde_json::from_slice(&res).map_err(|e| {
+        ObjectStorageError::Generic(format!("Invalid Manifest data: {:?}", e))
+    })?
})
🧹 Nitpick comments (2)
src/enterprise/utils.rs (2)

20-61: Add unit tests for time filter edge cases.

While the create_time_filter function appears conceptually sound, it would be good to have unit tests verifying behavior for boundary conditions (e.g., zero-width time ranges or invalid time partitions). This will improve reliability and reduce the risk of off-by-one or range-related errors.
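
A possible starting point is sketched below; the TimeRange field names and the create_time_filter signature are assumptions to be adjusted to the PR's actual definitions:

    #[cfg(test)]
    mod tests {
        use super::*;
        use chrono::{TimeZone, Utc};

        #[test]
        fn zero_width_range_still_produces_well_formed_filters() {
            let t = Utc.with_ymd_and_hms(2025, 3, 1, 0, 0, 0).unwrap();
            let range = TimeRange { start: t, end: t }; // hypothetical constructor
            let filters = create_time_filter(&range, None, "demo-stream");
            // With an inclusive lower and exclusive upper bound, a zero-width
            // range should match no rows, but the filter expressions
            // themselves must still be produced.
            assert!(!filters.is_empty());
        }
    }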


63-159: Consider adding integration tests for parallel manifest fetching.

While the asynchronous fetch approach is straightforward, verifying concurrency performance and error handling in real-world conditions would be beneficial. An integration test or robust mocking could ensure correct handling of partial failures and network timeouts.

📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between 79321a4 and 6a9fc23.

📒 Files selected for processing (3)
  • src/catalog/mod.rs (2 hunks)
  • src/enterprise/utils.rs (1 hunks)
  • src/storage/mod.rs (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/catalog/mod.rs
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: coverage
🔇 Additional comments (2)
src/storage/mod.rs (2)

23-23: Import added for new error handling capability.

The import for JoinError from tokio::task module is correctly added to support the new error variant.


259-260: Well-structured error variant for async task handling.

The new JoinError variant in the ObjectStorageError enum is properly implemented with the #[from] attribute for automatic conversion from tokio::task::JoinError. This addition enhances the error handling capabilities for async operations in the object storage system, which aligns with the PR's goal of implementing necessary changes for Parseable Enterprise.
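
Concretely, with thiserror (which the error enums here appear to use; if not, the From impl would be written by hand):

    use tokio::task::JoinError;

    #[derive(Debug, thiserror::Error)]
    pub enum ObjectStorageError {
        // ...existing variants...

        /// A spawned object-storage task panicked or was cancelled.
        #[error("task join error: {0}")]
        JoinError(#[from] JoinError),
    }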

Comment on lines +125 to +127
let date = date.as_slice()[1..4].iter().map(|s| s.to_string());

let date = RelativePathBuf::from_iter(date);

⚠️ Potential issue

Check for potential out-of-bounds slice access.

Splitting on "/" and calling date.as_slice()[1..4] can panic if the path has fewer than four segments. Handle or verify the path structure before slicing to prevent a runtime panic.

Possible fix:

- let date = date.as_slice()[1..4].iter().map(|s| s.to_string());
+ if date.len() < 4 {
+     // Decide whether to skip this file or handle the error
+     return Err(
+         ObjectStorageError::Generic(format!("Unexpected path format: {:?}", date))
+     );
+ }
+ let date = date.as_slice()[1..4].iter().map(|s| s.to_string());
📝 Committable suggestion (review carefully before committing; ensure it accurately replaces the highlighted code and test it)

if date.len() < 4 {
    // Decide whether to skip this file or handle the error
    return Err(
        ObjectStorageError::Generic(format!("Unexpected path format: {:?}", date))
    );
}
let date = date.as_slice()[1..4].iter().map(|s| s.to_string());

let date = RelativePathBuf::from_iter(date);
