refactor: Event per log, streamline data handling #1209
Conversation
**Walkthrough**

The changes refactor event-processing logic across multiple modules. In the Kafka connector, the event-building method now directly creates the event.
Actionable comments posted: 4
🧹 Nitpick comments (3)
src/event/format/mod.rs (1)
177-188
: **Consider documenting the new trait method.**

The new `into_event` method is a significant addition to the trait, but lacks documentation explaining its purpose, parameters, and return value. Add documentation for the method:

```diff
+    /// Converts the format into an Event with the provided context.
+    ///
+    /// # Arguments
+    /// * `stream_name` - Name of the stream
+    /// * `origin_size` - Original size of the data in bytes
+    /// * `storage_schema` - Schema map from storage
+    /// * `static_schema_flag` - Whether schema is static
+    /// * `custom_partitions` - Optional custom partitions
+    /// * `time_partition` - Optional time partition
+    /// * `schema_version` - Version of the schema
+    /// * `stream_type` - Type of the stream
+    ///
+    /// # Returns
+    /// A Result containing either the Event or an error
     fn into_event(
         self,
         stream_name: String,
         origin_size: u64,
         storage_schema: &HashMap<String, Arc<Field>>,
         static_schema_flag: bool,
         custom_partitions: Option<&String>,
         time_partition: Option<&String>,
         schema_version: SchemaVersion,
         stream_type: StreamType,
     ) -> Result<Event, AnyError>;
```

src/event/format/json.rs (2)
135-177
: **Implementation logic is straightforward, but consider clarifying error messages.**

The method cleanly processes the JSON into a RecordBatch and sets the relevant fields. However, if an error occurs during `into_recordbatch`, you might consider providing more context in the returned error message for easier debugging.
325-331
: **Fix misspelling in test name.**

Rename the function to improve readability and consistency:

```diff
- fn parse_time_parition_from_value() {
+ fn parse_time_partition_from_value() {
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
- src/connectors/kafka/processor.rs (3 hunks)
- src/event/format/json.rs (4 hunks)
- src/event/format/mod.rs (2 hunks)
- src/handlers/http/ingest.rs (14 hunks)
- src/handlers/http/modal/utils/ingest_utils.rs (2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (10)
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: coverage
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
🔇 Additional comments (13)
src/event/format/mod.rs (2)
35-35
: **LGTM!**

The import of `StreamType` from the storage module is appropriate for the new `into_event` method.
39-39
: **LGTM!**

The reorganized import of `Event` and `DEFAULT_TIMESTAMP_KEY` from the parent module improves code organization.

src/connectors/kafka/processor.rs (2)
74-84
: **LGTM! Streamlined event creation.**

The refactored code eliminates intermediate steps and directly creates the event, making it more efficient and easier to understand.
93-93
: **LGTM! Improved logging format.**

The debug logging statements now use a more concise format string syntax (illustrated below).
Also applies to: 97-97
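For context, the "concise format string syntax" here most likely refers to Rust's inline format-argument capture (stable since Rust 1.58), which works in `log`/`tracing` macros as well; a minimal illustration with invented variable names:

```rust
fn main() {
    let partition = 3;
    let offset = 42;
    // Positional style: arguments listed after the format string.
    println!("processed partition {} at offset {}", partition, offset);
    // Inline-capture style: variables named directly inside the braces.
    // The same syntax works in debug!()/info!() from the tracing crate.
    println!("processed partition {partition} at offset {offset}");
}
```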
src/handlers/http/ingest.rs (2)
82-97
: **LGTM! Simplified internal stream ingestion.**

The refactored code streamlines event creation for internal streams, making it more consistent with other ingestion paths.
383-385
: **LGTM! Consistent test refactoring.**

The test cases have been updated to use the new event creation pattern consistently throughout the file, maintaining test coverage while simplifying the code.
Also applies to: 411-413, 443-445, 475-477, 493-495, 534-536, 582-584, 631-633, 680-682, 720-722, 803-805
src/handlers/http/modal/utils/ingest_utils.rs (2)
25-25
: No issues found with the updated imports.
31-31
: Imports look good.

src/event/format/json.rs (5)
26-26
: The import of chrono is valid.
34-34
: The additional imports look appropriate.
37-39
: The new struct fields provide a clear separation of data and ingestion timestamp.
41-47
: Constructor implementation looks succinct and clear.
200-210
: Timestamp parsing implementation is correct.
Actionable comments posted: 0
♻️ Duplicate comments (1)
src/event/format/json.rs (1)
181-199
: ⚠️ **Potential issue**

Prevent panics due to `.unwrap()` in custom partition extraction.

When a partition field is not present in the JSON, calling `.unwrap()` will panic. Replace this with safe error handling or a fallback to an empty value to avoid crashes.

```diff
- let custom_partition_value = json.get(custom_partition_field.trim()).unwrap().to_owned();
+ let custom_partition_value = match json.get(custom_partition_field.trim()) {
+     Some(value) => value.clone(),
+     None => Value::Null,
+ };
```
🧹 Nitpick comments (2)
src/event/format/json.rs (2)
135-179
: **Consider enhancing error handling in the conversion process.**

While the implementation is thorough, consider adding more specific error types or context to help with debugging:

```diff
 fn into_event(
     self,
     stream_name: String,
     origin_size: u64,
     storage_schema: &HashMap<String, Arc<Field>>,
     static_schema_flag: bool,
     custom_partitions: Option<&String>,
     time_partition: Option<&String>,
     schema_version: SchemaVersion,
     stream_type: StreamType,
 ) -> Result<super::Event, anyhow::Error> {
+    // Add context to errors from into_recordbatch
     let (rb, is_first_event) = self.into_recordbatch(
         storage_schema,
         static_schema_flag,
         time_partition,
         schema_version,
-    )?;
+    ).with_context(|| format!("Failed to convert JSON to RecordBatch for stream {}", stream_name))?;

     Ok(super::Event {
         rb,
         stream_name,
         origin_format: "json",
         origin_size,
         is_first_event,
         parsed_timestamp,
         time_partition: None,
         custom_partition_values,
         stream_type,
     })
 }
```
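One caveat with the suggested diff: `.with_context(...)` comes from the `anyhow::Context` trait, so the file would also need `use anyhow::Context;` in scope for the change to compile.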
317-349
: **Consider adding more edge cases to the test suite.**

While the current tests cover basic scenarios, consider adding tests for:
- Timestamps in different formats (e.g., Unix timestamp)
- Timestamps in different timezones
- Invalid timezone specifications
Example test case:
```rust
#[test]
fn parse_unix_timestamp() {
    let json = json!({"timestamp": 1621123200});
    let parsed = get_parsed_timestamp(&json, "timestamp");
    let expected = NaiveDateTime::from_timestamp_opt(1621123200, 0).unwrap();
    assert_eq!(parsed.unwrap(), expected);
}
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- src/event/format/json.rs (4 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (7)
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: coverage
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
🔇 Additional comments (2)
src/event/format/json.rs (2)
36-48
: **LGTM! Clean struct definition with proper initialization.**

The `Event` struct changes and its constructor implementation are well-designed. The `ingestion_time` field addition helps track when events enter the system.
201-211
: **LGTM! Robust timestamp parsing implementation.**

The function properly handles missing fields and parsing errors with clear error messages.
Actionable comments posted: 1
♻️ Duplicate comments (2)
src/handlers/http/modal/utils/ingest_utils.rs (1)
113-114
: ⚠️ **Potential issue**

Avoid potential panic with `.unwrap()`.

Using `.unwrap()` when serializing to JSON may crash if an error occurs. Recommended fix:

```diff
- let origin_size = serde_json::to_vec(&json).unwrap().len() as u64;
+ let origin_size = serde_json::to_vec(&json)?.len() as u64;
```

src/event/format/json.rs (1)
185-203
: ⚠️ **Potential issue**

Potential panic on `.unwrap()` in `get_custom_partition_values`.

Calling `.unwrap()` for a missing key may crash. Either return an error or use a fallback (a fuller sketch follows the diff):

```diff
- let custom_partition_value = json.get(custom_partition_field.trim()).unwrap().to_owned();
+ let custom_partition_value = match json.get(custom_partition_field.trim()) {
+     Some(val) => val.to_owned(),
+     None => Value::Null,
+ };
```
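For illustration, a panic-free version of the whole helper could look like the sketch below. The function name comes from this review; the comma-separated partition list and the tuple return type are assumptions about the surrounding code, not the project's actual signature.

```rust
use serde_json::{json, Value};

// Hedged sketch: extract custom partition values without unwrap(), falling
// back to Value::Null when a field is absent from the event JSON.
fn get_custom_partition_values(json: &Value, custom_partitions: &str) -> Vec<(String, String)> {
    custom_partitions
        .split(',')
        .map(|field| {
            let field = field.trim();
            let value = json.get(field).cloned().unwrap_or(Value::Null);
            // Strip surrounding quotes from string values for cleaner labels.
            (field.to_owned(), value.to_string().trim_matches('"').to_owned())
        })
        .collect()
}

fn main() {
    let event = json!({"region": "eu", "tier": 1});
    // "missing" is absent, so it maps to null instead of panicking.
    let values = get_custom_partition_values(&event, "region, tier, missing");
    assert_eq!(values[0], ("region".into(), "eu".into()));
    assert_eq!(values[2], ("missing".into(), "null".into()));
}
```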
🧹 Nitpick comments (3)
src/event/format/mod.rs (1)
109-110
: **Consider elaborating the ingestion timestamp purpose**

The `get_p_timestamp()` method is helpful, but clarifying in the docstring whether this represents the actual ingestion time or a fallback to the current system time would enhance maintainability.

src/handlers/http/modal/utils/ingest_utils.rs (1)
117-127
: **Long argument list to `into_event`**

The call passes numerous parameters, mirroring the method signature in the trait. Consider a small struct to bundle these arguments for improved readability; a sketch follows below.
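As a rough illustration of that suggestion, the arguments could be gathered into a context struct. Everything below mirrors the trait signature quoted earlier in this review, with placeholder types standing in for the project's real ones:

```rust
use std::{collections::HashMap, sync::Arc};

// Placeholder stand-ins for the project's real types (sketch only).
struct Field;
struct SchemaVersion;
struct StreamType;

/// Hypothetical bundle of the arguments currently passed to `into_event`.
struct IntoEventContext<'a> {
    stream_name: String,
    origin_size: u64,
    storage_schema: &'a HashMap<String, Arc<Field>>,
    static_schema_flag: bool,
    custom_partitions: Option<&'a String>,
    time_partition: Option<&'a String>,
    schema_version: SchemaVersion,
    stream_type: StreamType,
}

// The trait method would then shrink to a single parameter:
// fn into_event(self, ctx: IntoEventContext<'_>) -> Result<Event, AnyError>;

fn main() {}
```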
src/event/format/json.rs (1)

139-182

: **Large method body**

While this adds clear custom partition extraction and timestamp processing, consider partial refactoring to reduce complexity and improve testability (e.g., a smaller helper function for the partition/timestamp logic).
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- src/event/format/json.rs (4 hunks)
- src/event/format/mod.rs (3 hunks)
- src/handlers/http/ingest.rs (14 hunks)
- src/handlers/http/modal/utils/ingest_utils.rs (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- src/handlers/http/ingest.rs
⏰ Context from checks skipped due to timeout of 90000ms (10)
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: coverage
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
🔇 Additional comments (13)
src/event/format/mod.rs (2)
35-35
: **No concerns on new import**

This import of `StreamType` appears straightforward and necessary for the upcoming changes.
39-39
: **Approved import alias**

Bringing `Event` and `DEFAULT_TIMESTAMP_KEY` into scope is clear and conventional.

src/handlers/http/modal/utils/ingest_utils.rs (3)
19-19
: **Approved use import**

Importing `chrono::Utc` is consistent with other parts of the codebase for timestamp handling.
26-26
: **No issues with expanded imports**

Inclusion of `json`, `EventFormat`, and `LogSource` aligns with usage in this file.
32-32
: **Approved usage of PARSEABLE**

Refactoring references to the global `PARSEABLE` struct is consistent with the existing architecture.

src/event/format/json.rs (8)
26-26
: **Approved new time imports**

Introducing `DateTime`, `NaiveDateTime`, and `Utc` is necessary for deeper timestamp handling.
34-34
: **Approved additional imports**

Refactoring includes references to `StreamType`; no issues detected.
38-39
: **Enhanced Event struct**

Adding `json` and `p_timestamp` clarifies how the event data and ingestion time are tracked.
41-47
: **Convenient constructor implementation**

Using `Utc::now()` in `new` ensures each event records the ingestion time by default (sketched below).
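Sketched out, the pattern being approved looks roughly like this (field and method names are taken from this review; the real definition lives in src/event/format/json.rs):

```rust
use chrono::{DateTime, Utc};
use serde_json::{json, Value};

struct Event {
    json: Value,
    p_timestamp: DateTime<Utc>,
}

impl Event {
    fn new(json: Value) -> Self {
        Self {
            json,
            // Stamp the ingestion time once, at construction.
            p_timestamp: Utc::now(),
        }
    }
}

fn main() {
    let event = Event::new(json!({"level": "info"}));
    println!("ingested {} at {}", event.json, event.p_timestamp);
}
```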
53-55
: **Sufficient explicit timestamp retrieval**

The `get_p_timestamp` method is straightforward and follows the trait requirement.
70-70
: **Approved approach to handle single-object or array**

Matching on `self.json` to unify into a `Vec<Value>` is suitably concise (see the sketch below).
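That approach reads roughly like the following sketch, a minimal reconstruction from the review text rather than the exact project code:

```rust
use serde_json::{json, Value};

// Normalize a payload into a Vec<Value>, whether the client sent a single
// JSON object or an array of objects.
fn to_records(json: Value) -> Vec<Value> {
    match json {
        Value::Array(values) => values,
        value => vec![value],
    }
}

fn main() {
    assert_eq!(to_records(json!({"a": 1})).len(), 1);
    assert_eq!(to_records(json!([{"a": 1}, {"a": 2}])).len(), 2);
}
```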
205-215
: Timestamp parsing logic
This function gracefully returns an error if parsing fails. No issues identified.
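A plausible shape for that function, reconstructed from the review comments and the test names elsewhere in this thread (`extract_and_parse_time`); the exact error handling is an assumption:

```rust
use anyhow::{anyhow, Result};
use chrono::{DateTime, NaiveDateTime, Utc};
use serde_json::{json, Value};

// Pull the time-partition field out of the event JSON and parse it,
// returning a descriptive error instead of panicking on bad input.
fn extract_and_parse_time(json: &Value, time_partition: &str) -> Result<NaiveDateTime> {
    let raw = json
        .get(time_partition)
        .ok_or_else(|| anyhow!("field {time_partition:?} is missing"))?
        .as_str()
        .ok_or_else(|| anyhow!("field {time_partition:?} is not a string"))?;
    let parsed: DateTime<Utc> = raw.parse()?;
    Ok(parsed.naive_utc())
}

fn main() -> Result<()> {
    let event = json!({"timestamp": "2025-05-15T15:30:00Z"});
    println!("{}", extract_and_parse_time(&event, "timestamp")?);
    Ok(())
}
```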
321-353
: Tests align with new functionality
Coverage for valid, missing, and invalid timestamps is thorough.
Signed-off-by: Devdutt Shenoi <[email protected]>
Actionable comments posted: 0
♻️ Duplicate comments (1)
src/event/format/json.rs (1)
194-194
: ⚠️ **Potential issue**

Prevent panics due to `.unwrap()` in custom partition extraction.

When a partition field is not present in the JSON, calling `.unwrap()` will panic. Replace this with safe error handling to avoid crashes.

```diff
- let custom_partition_value = json.get(custom_partition_field.trim()).unwrap().to_owned();
+ let custom_partition_value = match json.get(custom_partition_field.trim()) {
+     Some(value) => value.clone(),
+     None => continue, // Skip this field if not found
+ };
```
🧹 Nitpick comments (1)
src/event/format/json.rs (1)
326-341
: **Fix test naming and improve error handling in tests.**

The test name contains a typo ("parition" instead of "partition") and uses `unwrap()` on the result.

```diff
- fn parse_time_parition_from_value() {
+ fn parse_time_partition_from_value() {
      let json = json!({"timestamp": "2025-05-15T15:30:00Z"});
      let parsed = extract_and_parse_time(&json, "timestamp");
      let expected = NaiveDateTime::from_str("2025-05-15T15:30:00").unwrap();
-     assert_eq!(parsed.unwrap(), expected);
+     assert!(parsed.is_ok());
+     assert_eq!(parsed.unwrap(), expected);
  }
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- src/event/format/json.rs (4 hunks)
- src/event/mod.rs (1 hunks)
- src/metadata.rs (2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (10)
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: coverage
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
🔇 Additional comments (9)
src/event/mod.rs (1)
84-84
: **LGTM - Improved data granularity for statistics tracking.**

The change from using the full timestamp to just the date part for statistics tracking makes sense. This aligns with the parameter type update in the `update_stats` function and focuses metrics collection at the day level.

src/metadata.rs (3)
40-40
: **Parameter type change simplifies date handling.**

Changing from `NaiveDateTime` to `NaiveDate` streamlines the date handling in the metrics system. This is a good refactoring that removes the need for additional date extraction logic in this function (sketched below).
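Illustrated with placeholder names (only the `NaiveDate` parameter type is confirmed by this review; the body and call site are assumptions):

```rust
use chrono::{NaiveDate, NaiveDateTime};

// Callers now hand over just the date; no extraction logic is needed here.
fn update_stats(date: NaiveDate) {
    let label = date.to_string(); // e.g. "2025-05-15", reused for metric labels
    println!("updating day-level stats for {label}");
}

fn main() {
    let ts: NaiveDateTime = "2025-05-15T15:30:00".parse().unwrap();
    update_stats(ts.date()); // call sites pass timestamp.date()
}
```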
42-42
: **LGTM - Consistent string conversion.**

Keeping the string conversion in this method ensures consistent formatting across all metrics calls.
47-54
: **LGTM - Consistent handling of date metric labels.**

The updates to the metrics labels correctly use the new date string format.
src/event/format/json.rs (5)
37-39
: **Field name improvement and timestamp tracking added.**

Renaming `data` to `json` better reflects the field's purpose. The addition of `p_timestamp` enables tracking the ingestion time, which is valuable for diagnostics and data lineage.
42-47
: **LGTM - Convenient constructor pattern added.**

The new constructor method simplifies object creation and automatically captures the current timestamp.
53-56
: **LGTM - Accessor maintains encapsulation.**

The getter method for `p_timestamp` provides a clean interface for accessing this field.
140-184
: **LGTM - Streamlined event creation process.**

The new `into_event` method creates a complete event from JSON data in one step, simplifying the event creation process across different ingestion paths.
344-357
: **LGTM - Test coverage for edge cases.**

Good tests for error handling when the timestamp field is missing or invalid.
Fixes #XXXX.
Description
This PR has:
Summary by CodeRabbit
New Features
Refactor