
refactor: Event per log, streamline data handling #1209

Merged

merged 9 commits from start-restruct into parseablehq:main on Feb 28, 2025

Conversation

de-sh (Contributor) commented on Feb 24, 2025

Fixes #XXXX.

Description

  1. Fixes custom partitioning in the Kafka connector code
  2. Streamlines data handling after the Event is created

This PR has:

  • been tested to ensure log ingestion and log query work.
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious to an unfamiliar reader.
  • added documentation for new or modified features or behaviors.

Summary by CodeRabbit

  • New Features

    • Enhanced event creation now includes metadata such as ingestion timestamps and supports custom partitioning, offering richer insights during data processing.
  • Refactor

    • Streamlined the event and log ingestion pipelines to simplify data conversion and improve overall efficiency.
    • Simplified the handling of log entries by directly utilizing the event structure, reducing complexity in the processing logic.
    • Updated event processing logic to use a more direct approach, eliminating unnecessary intermediate steps.
    • Adjusted date handling in event statistics to improve clarity and efficiency.

coderabbitai bot commented on Feb 24, 2025

Walkthrough

The changes refactor event-processing logic across multiple modules. In the Kafka connector, the event-building method now directly creates an event using json::Event::new and into_event, eliminating intermediate vector construction. The JSON event struct is updated with a renamed field and an added ingestion timestamp alongside new utility functions for partition extraction and timestamp parsing. The EventFormat trait now incorporates the new into_event method. Additionally, HTTP ingestion and modal utilities have been simplified by removing redundant steps and tests.
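
As a hedged sketch of that call pattern, with the surrounding variables assumed to be in scope and only the into_event signature quoted later in this review taken as given:

// Sketch only: json, stream metadata, and schema values are assumptions here.
let origin_size = serde_json::to_vec(&json)?.len() as u64;
let event = json::Event::new(json) // captures p_timestamp = Utc::now()
    .into_event(
        stream_name,
        origin_size,
        &storage_schema,
        static_schema_flag,
        custom_partitions.as_ref(),
        time_partition.as_ref(),
        schema_version,
        stream_type,
    )?;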

Changes

  • src/connectors/kafka/processor.rs: Updated build_event_from_chunk to directly construct events using json::Event::new and into_event, removing the json_vec logic and extra payload handling, with concise debug logging.
  • src/event/format/json.rs, src/event/format/mod.rs: Revised the Event struct (renamed data to json, added p_timestamp) with a new constructor new, an into_event method, and utility functions for custom-partition extraction and timestamp parsing. Extended the EventFormat trait with into_event.
  • src/handlers/http/ingest.rs, src/handlers/http/modal/utils/ingest_utils.rs: Refactored ingestion logic to streamline event creation by directly calling json::Event::new and into_event, removing intermediate processing steps, redundant functions, and tests.
  • src/event/mod.rs, src/metadata.rs: Updated the update_stats function to replace parsed_timestamp with parsed_date, changing the argument type from NaiveDateTime to NaiveDate for simplified date handling.

Possibly related PRs

  • refactor: capture ingestion time at receive #1210: The changes in the main PR are related to the modifications in the build_event_from_chunk method and the into_event method, both of which involve handling event creation and timestamp management.
  • refactor: process ain't async #1180: The changes in the main PR are related to the modifications in the process method of the ParseableSinkProcessor, as both involve significant updates to how events are processed and constructed.
  • refactor: use appropriate error types #1201: Related through overlapping changes to the build_event_from_chunk method and the event-creation process; that PR also touches the Event struct and its methods, notably the introduction of into_event and the error handling in the event-processing logic.

Suggested labels

for next release

Suggested reviewers

  • nikhilsinhaparseable

Poem

I hopped through lines of code today,
Simplifying events in my own bunny way.
No more vectors hopping about in a mess,
Just clear paths and timestamps that impress.
With each change, my whiskers twitch in glee—
A fluffy cheer from CodeRabbit, wild and free!
🐇💕


coderabbitai bot left a comment

Actionable comments posted: 4

🧹 Nitpick comments (3)
src/event/format/mod.rs (1)

177-188: Consider documenting the new trait method.

The new into_event method is a significant addition to the trait, but lacks documentation explaining its purpose, parameters, and return value.

Add documentation for the method:

+    /// Converts the format into an Event with the provided context.
+    ///
+    /// # Arguments
+    /// * `stream_name` - Name of the stream
+    /// * `origin_size` - Original size of the data in bytes
+    /// * `storage_schema` - Schema map from storage
+    /// * `static_schema_flag` - Whether schema is static
+    /// * `custom_partitions` - Optional custom partitions
+    /// * `time_partition` - Optional time partition
+    /// * `schema_version` - Version of the schema
+    /// * `stream_type` - Type of the stream
+    ///
+    /// # Returns
+    /// A Result containing either the Event or an error
     fn into_event(
         self,
         stream_name: String,
         origin_size: u64,
         storage_schema: &HashMap<String, Arc<Field>>,
         static_schema_flag: bool,
         custom_partitions: Option<&String>,
         time_partition: Option<&String>,
         schema_version: SchemaVersion,
         stream_type: StreamType,
     ) -> Result<Event, AnyError>;
src/event/format/json.rs (2)

135-177: Implementation logic is straightforward, but consider clarifying error messages.

The method cleanly processes the JSON into a recordbatch and sets relevant fields. However, if an error occurs during into_recordbatch, you might consider providing more context in the returned error message for easier debugging.


325-331: Fix misspelling in test name.

Rename the function to improve readability and consistency:

- fn parse_time_parition_from_value() {
+ fn parse_time_partition_from_value() {
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 245ec54 and aa37f9c.

📒 Files selected for processing (5)
  • src/connectors/kafka/processor.rs (3 hunks)
  • src/event/format/json.rs (4 hunks)
  • src/event/format/mod.rs (2 hunks)
  • src/handlers/http/ingest.rs (14 hunks)
  • src/handlers/http/modal/utils/ingest_utils.rs (2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: coverage
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
🔇 Additional comments (13)
src/event/format/mod.rs (2)

35-35: LGTM!

The import of StreamType from the storage module is appropriate for the new into_event method.


39-39: LGTM!

The reorganized import of Event and DEFAULT_TIMESTAMP_KEY from the parent module improves code organization.

src/connectors/kafka/processor.rs (2)

74-84: LGTM! Streamlined event creation.

The refactored code eliminates intermediate steps and directly creates the event, making it more efficient and easier to understand.


93-93: LGTM! Improved logging format.

The debug logging statements now use a more concise format string syntax.

Also applies to: 97-97

src/handlers/http/ingest.rs (2)

82-97: LGTM! Simplified internal stream ingestion.

The refactored code streamlines event creation for internal streams, making it more consistent with other ingestion paths.


383-385: LGTM! Consistent test refactoring.

The test cases have been updated to use the new event creation pattern consistently throughout the file, maintaining test coverage while simplifying the code.

Also applies to: 411-413, 443-445, 475-477, 493-495, 534-536, 582-584, 631-633, 680-682, 720-722, 803-805

src/handlers/http/modal/utils/ingest_utils.rs (2)

25-25: No issues found with the updated imports.


31-31: Imports look good.

src/event/format/json.rs (5)

26-26: The import of chrono is valid.


34-34: The additional imports look appropriate.


37-39: The new struct fields provide a clear separation of data and ingestion timestamp.


41-47: Constructor implementation looks succinct and clear.


200-210: Timestamp parsing implementation is correct.

coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (1)
src/event/format/json.rs (1)

181-199: ⚠️ Potential issue

Prevent panics due to .unwrap() in custom partition extraction.

When a partition field is not present in the JSON, calling .unwrap() will panic. Replace this with safe error handling or a fallback to an empty value to avoid crashes.

-        let custom_partition_value = json.get(custom_partition_field.trim()).unwrap().to_owned();
+        let custom_partition_value = match json.get(custom_partition_field.trim()) {
+            Some(value) => value.clone(),
+            None => Value::Null,
+        };
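
For illustration, a standalone example of the suggested fallback behavior; the field names here are hypothetical, not from the PR:

use serde_json::{json, Value};

fn main() {
    let record = json!({"region": "us-east-1"});
    // A missing key yields Value::Null instead of panicking via .unwrap().
    let tenant = record.get("tenant").cloned().unwrap_or(Value::Null);
    assert_eq!(tenant, Value::Null);
    let region = record.get("region").cloned().unwrap_or(Value::Null);
    assert_eq!(region, json!("us-east-1"));
}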
🧹 Nitpick comments (2)
src/event/format/json.rs (2)

135-179: Consider enhancing error handling in the conversion process.

While the implementation is thorough, consider adding more specific error types or context to help with debugging:

 fn into_event(
     self,
     stream_name: String,
     origin_size: u64,
     storage_schema: &HashMap<String, Arc<Field>>,
     static_schema_flag: bool,
     custom_partitions: Option<&String>,
     time_partition: Option<&String>,
     schema_version: SchemaVersion,
     stream_type: StreamType,
 ) -> Result<super::Event, anyhow::Error> {
+    // Add context to errors from into_recordbatch
+    // (requires use anyhow::Context; in scope for with_context)
     let (rb, is_first_event) = self.into_recordbatch(
         storage_schema,
         static_schema_flag,
         time_partition,
         schema_version,
-    )?;
+    ).with_context(|| format!("Failed to convert JSON to RecordBatch for stream {}", stream_name))?;

     Ok(super::Event {
         rb,
         stream_name,
         origin_format: "json",
         origin_size,
         is_first_event,
         parsed_timestamp,
         time_partition: None,
         custom_partition_values,
         stream_type,
     })
 }

317-349: Consider adding more edge cases to the test suite.

While the current tests cover basic scenarios, consider adding tests for:

  • Timestamps in different formats (e.g., Unix timestamp)
  • Timestamps in different timezones
  • Invalid timezone specifications

Example test case:

#[test]
fn parse_unix_timestamp() {
    let json = json!({"timestamp": 1621123200});
    let parsed = get_parsed_timestamp(&json, "timestamp");
    
    let expected = NaiveDateTime::from_timestamp_opt(1621123200, 0).unwrap();
    assert_eq!(parsed.unwrap(), expected);
}
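
Note that NaiveDateTime::from_timestamp_opt is deprecated in recent chrono releases; assuming chrono 0.4.31 or newer, an equivalent construction would be:

use chrono::DateTime;

let expected = DateTime::from_timestamp(1621123200, 0)
    .expect("in-range Unix timestamp")
    .naive_utc();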
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 6a19a1e and ac08646.

📒 Files selected for processing (1)
  • src/event/format/json.rs (4 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (7)
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: coverage
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
🔇 Additional comments (2)
src/event/format/json.rs (2)

36-48: LGTM! Clean struct definition with proper initialization.

The Event struct changes and its constructor implementation are well-designed. The p_timestamp field addition helps track when events enter the system.


201-211: LGTM! Robust timestamp parsing implementation.

The function properly handles missing fields and parsing errors with clear error messages.

coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (2)
src/handlers/http/modal/utils/ingest_utils.rs (1)

113-114: ⚠️ Potential issue

Avoid potential panic with .unwrap()
Using .unwrap() when serializing to JSON may crash if an error occurs. Recommended fix:

-        let origin_size = serde_json::to_vec(&json).unwrap().len() as u64;
+        let origin_size = serde_json::to_vec(&json)?.len() as u64;
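
Note that the ? form assumes the enclosing function returns a Result whose error type can absorb serde_json::Error, for example anyhow::Error or a custom error with a From<serde_json::Error> impl.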
src/event/format/json.rs (1)

185-203: ⚠️ Potential issue

Potential panic on .unwrap() in get_custom_partition_values
Calling .unwrap() for a missing key may crash. Either return an error or use a fallback:

-        let custom_partition_value = json.get(custom_partition_field.trim()).unwrap().to_owned();
+        let custom_partition_value = match json.get(custom_partition_field.trim()) {
+            Some(val) => val.to_owned(),
+            None => Value::Null,
+        };
🧹 Nitpick comments (3)
src/event/format/mod.rs (1)

109-110: Consider elaborating the ingestion timestamp purpose
The get_p_timestamp() method is helpful, but clarifying in the docstring whether this represents the actual ingestion time or a fallback to the current system time would enhance maintainability.

src/handlers/http/modal/utils/ingest_utils.rs (1)

117-127: Long argument list to into_event
The call passes numerous parameters, mirroring the method signature in the trait. Consider a small struct to bundle these arguments for improved readability.

src/event/format/json.rs (1)

139-182: Large method body
While this adds clear custom partition extraction and timestamp processing, consider partial refactoring to reduce complexity and improve testability (e.g., a smaller helper function for partition/timestamp logic).

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ac08646 and 660a86d.

📒 Files selected for processing (4)
  • src/event/format/json.rs (4 hunks)
  • src/event/format/mod.rs (3 hunks)
  • src/handlers/http/ingest.rs (14 hunks)
  • src/handlers/http/modal/utils/ingest_utils.rs (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/handlers/http/ingest.rs
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: coverage
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
🔇 Additional comments (13)
src/event/format/mod.rs (2)

35-35: No concerns on new import
This import of StreamType appears straightforward and necessary for the upcoming changes.


39-39: Approved import alias
Bringing Event and DEFAULT_TIMESTAMP_KEY into scope is clear and conventional.

src/handlers/http/modal/utils/ingest_utils.rs (3)

19-19: Approved use import
Importing chrono::Utc is consistent with other parts of the codebase for timestamp handling.


26-26: No issues with expanded imports
Inclusion of json, EventFormat, and LogSource aligns with usage in this file.


32-32: Approved usage of PARSEABLE
Refactoring references to the global PARSEABLE struct is consistent with the existing architecture.

src/event/format/json.rs (8)

26-26: Approved new time imports
Introducing DateTime, NaiveDateTime, and Utc is necessary for deeper timestamp handling.


34-34: Approved additional imports
Refactoring includes references to StreamType; no issues detected.


38-39: Enhanced Event struct
Adding json and p_timestamp clarifies how the event data and ingestion time are tracked.


41-47: Convenient constructor implementation
Using Utc::now() in new ensures each event records the ingestion time by default.
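
For reference, a sketch of that constructor as described in this review (field names from the reviewed diff; minor details may differ):

impl Event {
    pub fn new(json: Value) -> Self {
        Self {
            json,
            // Ingestion time is captured at construction.
            p_timestamp: Utc::now(),
        }
    }
}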


53-55: Sufficient explicit timestamp retrieval
The get_p_timestamp method is straightforward and follows the trait requirement.


70-70: Approved approach to handle single-object or array
Matching on self.json to unify into a Vec<Value> is suitably concise.
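
A self-contained illustration of that unification pattern, not the exact code from the file:

use serde_json::{json, Value};

// Treat a single JSON object and an array of objects uniformly.
fn to_vec(value: Value) -> Vec<Value> {
    match value {
        Value::Array(values) => values,
        other => vec![other],
    }
}

fn main() {
    assert_eq!(to_vec(json!({"a": 1})).len(), 1);
    assert_eq!(to_vec(json!([{"a": 1}, {"a": 2}])).len(), 2);
}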


205-215: Timestamp parsing logic
This function gracefully returns an error if parsing fails. No issues identified.
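
Inferred from the tests quoted in this review, the helper behaves roughly like this hedged sketch; the real implementation may differ:

use anyhow::anyhow;
use chrono::{DateTime, NaiveDateTime, Utc};
use serde_json::Value;

fn extract_and_parse_time(json: &Value, field: &str) -> anyhow::Result<NaiveDateTime> {
    let raw = json
        .get(field)
        .ok_or_else(|| anyhow!("field {field} is missing"))?
        .as_str()
        .ok_or_else(|| anyhow!("field {field} is not a string"))?;
    // "2025-05-15T15:30:00Z" parses to the naive value 2025-05-15T15:30:00.
    Ok(raw.parse::<DateTime<Utc>>()?.naive_utc())
}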


321-353: Tests align with new functionality
Coverage for valid, missing, and invalid timestamps is thorough.

coderabbitai bot previously approved these changes on Feb 25, 2025
Signed-off-by: Devdutt Shenoi <[email protected]> (commit pushed after the approval)
coderabbitai bot previously approved these changes on Feb 27, 2025

coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (1)
src/event/format/json.rs (1)

194-194: ⚠️ Potential issue

Prevent panics due to .unwrap() in custom partition extraction.

When a partition field is not present in the JSON, calling .unwrap() will panic. Replace this with safe error handling to avoid crashes.

- let custom_partition_value = json.get(custom_partition_field.trim()).unwrap().to_owned();
+ let custom_partition_value = match json.get(custom_partition_field.trim()) {
+     Some(value) => value.clone(),
+     None => continue, // Skip this field if not found
+ };
🧹 Nitpick comments (1)
src/event/format/json.rs (1)

326-341: Fix test naming and improve error handling in tests.

The test name contains a typo ("parition" instead of "partition") and uses unwrap() on the result.

- fn parse_time_parition_from_value() {
+ fn parse_time_partition_from_value() {
      let json = json!({"timestamp": "2025-05-15T15:30:00Z"});
      let parsed = extract_and_parse_time(&json, "timestamp");

      let expected = NaiveDateTime::from_str("2025-05-15T15:30:00").unwrap();
-     assert_eq!(parsed.unwrap(), expected);
+     assert!(parsed.is_ok());
+     assert_eq!(parsed.unwrap(), expected);
}
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 71ac640 and 69a3b4d.

📒 Files selected for processing (3)
  • src/event/format/json.rs (4 hunks)
  • src/event/mod.rs (1 hunks)
  • src/metadata.rs (2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: coverage
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
🔇 Additional comments (9)
src/event/mod.rs (1)

84-84: LGTM - Improved data granularity for statistics tracking.

The change from using the full timestamp to just the date part for statistics tracking makes sense. This aligns with the parameter type update in the update_stats function and focuses metrics collection at the day level.

src/metadata.rs (3)

40-40: Parameter type change simplifies date handling.

Changing from NaiveDateTime to NaiveDate streamlines the date handling in the metrics system. This is a good refactoring that removes the need for additional date extraction logic in this function.
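
To illustrate with plain chrono (a standalone sketch, not code from the PR):

use chrono::{NaiveDate, NaiveDateTime, Utc};

fn main() {
    // Callers already holding a full timestamp derive the day label once.
    let parsed_timestamp: NaiveDateTime = Utc::now().naive_utc();
    let parsed_date: NaiveDate = parsed_timestamp.date();
    // Metric labels then use the date's string form, e.g. "2025-02-28".
    println!("{parsed_date}");
}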


42-42: LGTM - Consistent string conversion.

Keeping the string conversion in this method ensures consistent formatting across all metrics calls.


47-54: LGTM - Consistent handling of date metric labels.

The updates to the metrics labels correctly use the new date string format.

src/event/format/json.rs (5)

37-39: Field name improvement and timestamp tracking added.

Renaming data to json better reflects the field's purpose. The addition of p_timestamp enables tracking the ingestion time, which is valuable for diagnostics and data lineage.


42-47: LGTM - Convenient constructor pattern added.

The new constructor method simplifies object creation and automatically captures the current timestamp.


53-56: LGTM - Accessor maintains encapsulation.

The getter method for p_timestamp provides a clean interface for accessing this field.


140-184: LGTM - Streamlined event creation process.

The new into_event method creates a complete event from JSON data in one step, simplifying the event creation process across different ingestion paths.


344-357: LGTM - Test coverage for edge cases.

Good tests for error handling when the timestamp field is missing or invalid.

nitisht merged commit f7d366e into parseablehq:main on Feb 28, 2025
14 checks passed
de-sh deleted the start-restruct branch on February 28, 2025 at 07:01