refactor: Event per log, streamline data handling (#1209)
---------

Signed-off-by: Devdutt Shenoi <[email protected]>
de-sh authored Feb 28, 2025
1 parent 1b4ea73 commit f7d366e
Showing 7 changed files with 244 additions and 279 deletions.
src/connectors/kafka/processor.rs (16 additions, 34 deletions)
@@ -16,10 +16,9 @@
  *
  */
 
-use std::{collections::HashMap, sync::Arc};
+use std::sync::Arc;
 
 use async_trait::async_trait;
-use chrono::Utc;
 use futures_util::StreamExt;
 use rdkafka::consumer::{CommitMode, Consumer};
 use serde_json::Value;
@@ -58,38 +57,10 @@ impl ParseableSinkProcessor {
         let stream = PARSEABLE.get_stream(stream_name)?;
         let schema = stream.get_schema_raw();
         let time_partition = stream.get_time_partition();
+        let custom_partition = stream.get_custom_partition();
         let static_schema_flag = stream.get_static_schema_flag();
         let schema_version = stream.get_schema_version();
 
-        let (json_vec, total_payload_size) = Self::json_vec(records);
-        let batch_json_event = json::Event {
-            data: Value::Array(json_vec),
-        };
-
-        let (rb, is_first) = batch_json_event.into_recordbatch(
-            &schema,
-            Utc::now(),
-            static_schema_flag,
-            time_partition.as_ref(),
-            schema_version,
-        )?;
-
-        let p_event = ParseableEvent {
-            rb,
-            stream_name: stream_name.to_string(),
-            origin_format: "json",
-            origin_size: total_payload_size,
-            is_first_event: is_first,
-            parsed_timestamp: Utc::now().naive_utc(),
-            time_partition: None,
-            custom_partition_values: HashMap::new(),
-            stream_type: StreamType::UserDefined,
-        };
-
-        Ok(p_event)
-    }
-
-    fn json_vec(records: &[ConsumerRecord]) -> (Vec<Value>, u64) {
         let mut json_vec = Vec::with_capacity(records.len());
         let mut total_payload_size = 0u64;
 
@@ -100,19 +71,30 @@ impl ParseableSinkProcessor {
             }
         }
 
-        (json_vec, total_payload_size)
+        let p_event = json::Event::new(Value::Array(json_vec)).into_event(
+            stream_name.to_string(),
+            total_payload_size,
+            &schema,
+            static_schema_flag,
+            custom_partition.as_ref(),
+            time_partition.as_ref(),
+            schema_version,
+            StreamType::UserDefined,
+        )?;
+
+        Ok(p_event)
     }
 }
 
 #[async_trait]
 impl Processor<Vec<ConsumerRecord>, ()> for ParseableSinkProcessor {
     async fn process(&self, records: Vec<ConsumerRecord>) -> anyhow::Result<()> {
         let len = records.len();
-        debug!("Processing {} records", len);
+        debug!("Processing {len} records");
 
         self.build_event_from_chunk(&records).await?.process()?;
 
-        debug!("Processed {} records", len);
+        debug!("Processed {len} records");
         Ok(())
     }
 }
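For orientation, the processor above now batches every payload in the chunk into a single `Value::Array` and hands it to `json::Event::new` in one call. Below is a standalone sketch of that batching step, runnable with only `serde_json`; `Record` is an illustrative stand-in for the connector's `ConsumerRecord`, and the loop body (collapsed in the view above) is assumed to tally size before parsing:

```rust
use serde_json::Value;

/// Simplified stand-in for the connector's ConsumerRecord.
struct Record {
    payload: Option<Vec<u8>>,
}

/// Collect each record's JSON payload into one array, tracking total bytes,
/// mirroring the loop now inlined in build_event_from_chunk.
fn batch_records(records: &[Record]) -> (Value, u64) {
    let mut json_vec = Vec::with_capacity(records.len());
    let mut total_payload_size = 0u64;

    for payload in records.iter().filter_map(|r| r.payload.as_ref()) {
        total_payload_size += payload.len() as u64;
        // A payload that fails to parse is skipped, but it still counts
        // toward the total size (it was tallied before parsing).
        if let Ok(json) = serde_json::from_slice::<Value>(payload) {
            json_vec.push(json);
        }
    }

    (Value::Array(json_vec), total_payload_size)
}

fn main() {
    let records = vec![
        Record { payload: Some(br#"{"level":"info"}"#.to_vec()) },
        Record { payload: Some(b"not json".to_vec()) },
        Record { payload: None },
    ];
    let (batch, size) = batch_records(&records);
    println!("{batch} ({size} bytes)"); // [{"level":"info"}] (24 bytes)
}
```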
src/event/format/json.rs (134 additions, 3 deletions)
@@ -23,22 +23,38 @@ use anyhow::anyhow;
 use arrow_array::RecordBatch;
 use arrow_json::reader::{infer_json_schema_from_iterator, ReaderBuilder};
 use arrow_schema::{DataType, Field, Fields, Schema};
+use chrono::{DateTime, NaiveDateTime, Utc};
 use datafusion::arrow::util::bit_util::round_upto_multiple_of_64;
+use itertools::Itertools;
 use serde_json::Value;
 use std::{collections::HashMap, sync::Arc};
 use tracing::error;
 
 use super::EventFormat;
-use crate::{metadata::SchemaVersion, utils::arrow::get_field};
+use crate::{metadata::SchemaVersion, storage::StreamType, utils::arrow::get_field};
 
 pub struct Event {
-    pub data: Value,
+    pub json: Value,
+    pub p_timestamp: DateTime<Utc>,
+}
+
+impl Event {
+    pub fn new(json: Value) -> Self {
+        Self {
+            json,
+            p_timestamp: Utc::now(),
+        }
+    }
 }
 
 impl EventFormat for Event {
     type Data = Vec<Value>;
 
+    /// Returns the time at ingestion, i.e. the `p_timestamp` value
+    fn get_p_timestamp(&self) -> DateTime<Utc> {
+        self.p_timestamp
+    }
+
     // convert the incoming json to a vector of json values
     // also extract the arrow schema, tags and metadata from the incoming json
     fn to_data(
@@ -52,7 +68,7 @@ impl EventFormat for Event {
         // incoming event may be a single json or a json array
         // but Data (type defined above) is a vector of json values
         // hence we need to convert the incoming event to a vector of json values
-        let value_arr = match self.data {
+        let value_arr = match self.json {
             Value::Array(arr) => arr,
             value @ Value::Object(_) => vec![value],
             _ => unreachable!("flatten would have failed beforehand"),
@@ -120,6 +136,87 @@ impl EventFormat for Event {
             Ok(None) => unreachable!("all records are added to one rb"),
         }
     }
+
+    /// Converts a JSON event into a Parseable Event
+    fn into_event(
+        self,
+        stream_name: String,
+        origin_size: u64,
+        storage_schema: &HashMap<String, Arc<Field>>,
+        static_schema_flag: bool,
+        custom_partitions: Option<&String>,
+        time_partition: Option<&String>,
+        schema_version: SchemaVersion,
+        stream_type: StreamType,
+    ) -> Result<super::Event, anyhow::Error> {
+        let custom_partition_values = match custom_partitions.as_ref() {
+            Some(custom_partition) => {
+                let custom_partitions = custom_partition.split(',').collect_vec();
+                extract_custom_partition_values(&self.json, &custom_partitions)
+            }
+            None => HashMap::new(),
+        };
+
+        let parsed_timestamp = match time_partition {
+            Some(time_partition) => extract_and_parse_time(&self.json, time_partition)?,
+            _ => self.p_timestamp.naive_utc(),
+        };
+
+        let (rb, is_first_event) = self.into_recordbatch(
+            storage_schema,
+            static_schema_flag,
+            time_partition,
+            schema_version,
+        )?;
+
+        Ok(super::Event {
+            rb,
+            stream_name,
+            origin_format: "json",
+            origin_size,
+            is_first_event,
+            parsed_timestamp,
+            time_partition: None,
+            custom_partition_values,
+            stream_type,
+        })
+    }
 }
+
+/// Extracts custom partition values from provided JSON object
+/// e.g. `json: {"status": 400, "msg": "Hello, World!"}, custom_partition_list: ["status"]` returns `{"status" => 400}`
+pub fn extract_custom_partition_values(
+    json: &Value,
+    custom_partition_list: &[&str],
+) -> HashMap<String, String> {
+    let mut custom_partition_values: HashMap<String, String> = HashMap::new();
+    for custom_partition_field in custom_partition_list {
+        let custom_partition_value = json.get(custom_partition_field.trim()).unwrap().to_owned();
+        let custom_partition_value = match custom_partition_value {
+            e @ Value::Number(_) | e @ Value::Bool(_) => e.to_string(),
+            Value::String(s) => s,
+            _ => "".to_string(),
+        };
+        custom_partition_values.insert(
+            custom_partition_field.trim().to_string(),
+            custom_partition_value,
+        );
+    }
+    custom_partition_values
+}
+
+/// Returns the parsed timestamp of designated time partition from json object
+/// e.g. `json: {"timestamp": "2025-05-15T15:30:00Z"}` returns `2025-05-15T15:30:00`
+fn extract_and_parse_time(
+    json: &Value,
+    time_partition: &str,
+) -> Result<NaiveDateTime, anyhow::Error> {
+    let current_time = json
+        .get(time_partition)
+        .ok_or_else(|| anyhow!("Missing field for time partition in json: {time_partition}"))?;
+    let parsed_time: DateTime<Utc> = serde_json::from_value(current_time.clone())?;
+
+    Ok(parsed_time.naive_utc())
+}
 
 // Returns arrow schema with the fields that are present in the request body
Expand Down Expand Up @@ -225,3 +322,37 @@ fn valid_type(data_type: &DataType, value: &Value, schema_version: SchemaVersion
}
}
}

#[cfg(test)]
mod tests {
use std::str::FromStr;

use serde_json::json;

use super::*;

#[test]
fn parse_time_parition_from_value() {
let json = json!({"timestamp": "2025-05-15T15:30:00Z"});
let parsed = extract_and_parse_time(&json, "timestamp");

let expected = NaiveDateTime::from_str("2025-05-15T15:30:00").unwrap();
assert_eq!(parsed.unwrap(), expected);
}

#[test]
fn time_parition_not_in_json() {
let json = json!({"hello": "world!"});
let parsed = extract_and_parse_time(&json, "timestamp");

assert!(parsed.is_err());
}

#[test]
fn time_parition_not_parseable_as_datetime() {
let json = json!({"timestamp": "not time"});
let parsed = extract_and_parse_time(&json, "timestamp");

assert!(parsed.is_err());
}
}
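The new tests exercise `extract_and_parse_time` but not `extract_custom_partition_values`. A sketch of a companion test follows, hypothetical but written against the function as committed above; note that the `unwrap()` on the field lookup means a missing partition field panics rather than returning an error:

```rust
#[test]
fn extract_custom_partitions_from_json() {
    let json = json!({"status": 400, "msg": "Hello, World!"});

    // Numbers and bools are stringified; strings are taken as-is.
    let values = extract_custom_partition_values(&json, &["status"]);

    assert_eq!(values.get("status"), Some(&"400".to_string()));
}
```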
src/event/format/mod.rs (19 additions, 2 deletions)
@@ -32,10 +32,11 @@ use serde_json::Value;
 
 use crate::{
     metadata::SchemaVersion,
+    storage::StreamType,
     utils::arrow::{get_field, get_timestamp_array, replace_columns},
 };
 
-use super::DEFAULT_TIMESTAMP_KEY;
+use super::{Event, DEFAULT_TIMESTAMP_KEY};
 
 pub mod json;
@@ -105,14 +106,17 @@
 
     fn decode(data: Self::Data, schema: Arc<Schema>) -> Result<RecordBatch, AnyError>;
 
+    /// Returns the UTC time at ingestion
+    fn get_p_timestamp(&self) -> DateTime<Utc>;
+
     fn into_recordbatch(
         self,
         storage_schema: &HashMap<String, Arc<Field>>,
-        p_timestamp: DateTime<Utc>,
         static_schema_flag: bool,
         time_partition: Option<&String>,
         schema_version: SchemaVersion,
     ) -> Result<(RecordBatch, bool), AnyError> {
+        let p_timestamp = self.get_p_timestamp();
         let (data, mut schema, is_first) =
             self.to_data(storage_schema, time_partition, schema_version)?;
Expand Down Expand Up @@ -173,6 +177,19 @@ pub trait EventFormat: Sized {
}
true
}

#[allow(clippy::too_many_arguments)]
fn into_event(
self,
stream_name: String,
origin_size: u64,
storage_schema: &HashMap<String, Arc<Field>>,
static_schema_flag: bool,
custom_partitions: Option<&String>,
time_partition: Option<&String>,
schema_version: SchemaVersion,
stream_type: StreamType,
) -> Result<Event, AnyError>;
}

pub fn get_existing_field_names(
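Taken together, the trait now owns the ingestion timestamp (`get_p_timestamp`) instead of having callers thread it through `into_recordbatch`, and `into_event` becomes the single entry point from raw JSON to a processed event. A minimal sketch of the resulting call shape inside the crate, mirroring the Kafka processor above; the `use` paths are assumptions from the file tree, and error handling is condensed:

```rust
use serde_json::Value;

use crate::{
    event::format::{json, EventFormat},
    parseable::PARSEABLE, // assumed path for the global handle used above
    storage::StreamType,
};

fn ingest(data: Value, stream_name: &str) -> anyhow::Result<()> {
    let stream = PARSEABLE.get_stream(stream_name)?;

    // One call now covers partition extraction, RecordBatch conversion,
    // and event assembly; p_timestamp was captured by Event::new.
    json::Event::new(data)
        .into_event(
            stream_name.to_string(),
            0, // origin_size: payload bytes, tracked by the real caller
            &stream.get_schema_raw(),
            stream.get_static_schema_flag(),
            stream.get_custom_partition().as_ref(),
            stream.get_time_partition().as_ref(),
            stream.get_schema_version(),
            StreamType::UserDefined,
        )?
        .process()?;

    Ok(())
}
```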
src/event/mod.rs (1 addition, 1 deletion)
@@ -81,7 +81,7 @@ impl Event {
             self.origin_format,
             self.origin_size,
             self.rb.num_rows(),
-            self.parsed_timestamp,
+            self.parsed_timestamp.date(),
         );
 
         crate::livetail::LIVETAIL.process(&self.stream_name, &self.rb);
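The lone change here trims the ingestion log line: `parsed_timestamp` is now printed as a date rather than a full timestamp. A standalone illustration of what `.date()` drops, using the same chrono types as the commit's tests:

```rust
use std::str::FromStr;

use chrono::NaiveDateTime;

fn main() {
    let parsed_timestamp = NaiveDateTime::from_str("2025-05-15T15:30:00").unwrap();

    println!("{}", parsed_timestamp);        // 2025-05-15 15:30:00
    println!("{}", parsed_timestamp.date()); // 2025-05-15, what the log now shows
}
```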