refactor: Event per log, streamline data handling #1209

Merged · 9 commits · Feb 28, 2025
50 changes: 16 additions & 34 deletions src/connectors/kafka/processor.rs
@@ -16,10 +16,9 @@
*
*/

use std::{collections::HashMap, sync::Arc};
use std::sync::Arc;

use async_trait::async_trait;
use chrono::Utc;
use futures_util::StreamExt;
use rdkafka::consumer::{CommitMode, Consumer};
use serde_json::Value;
@@ -58,38 +57,10 @@ impl ParseableSinkProcessor {
let stream = PARSEABLE.get_stream(stream_name)?;
let schema = stream.get_schema_raw();
let time_partition = stream.get_time_partition();
let custom_partition = stream.get_custom_partition();
let static_schema_flag = stream.get_static_schema_flag();
let schema_version = stream.get_schema_version();

let (json_vec, total_payload_size) = Self::json_vec(records);
let batch_json_event = json::Event {
data: Value::Array(json_vec),
};

let (rb, is_first) = batch_json_event.into_recordbatch(
&schema,
Utc::now(),
static_schema_flag,
time_partition.as_ref(),
schema_version,
)?;

let p_event = ParseableEvent {
rb,
stream_name: stream_name.to_string(),
origin_format: "json",
origin_size: total_payload_size,
is_first_event: is_first,
parsed_timestamp: Utc::now().naive_utc(),
time_partition: None,
custom_partition_values: HashMap::new(),
stream_type: StreamType::UserDefined,
};

Ok(p_event)
}

fn json_vec(records: &[ConsumerRecord]) -> (Vec<Value>, u64) {
let mut json_vec = Vec::with_capacity(records.len());
let mut total_payload_size = 0u64;

@@ -100,19 +71,30 @@
}
}

(json_vec, total_payload_size)
let p_event = json::Event::new(Value::Array(json_vec)).into_event(
stream_name.to_string(),
total_payload_size,
&schema,
static_schema_flag,
custom_partition.as_ref(),
time_partition.as_ref(),
schema_version,
StreamType::UserDefined,
)?;

Ok(p_event)
}
}

#[async_trait]
impl Processor<Vec<ConsumerRecord>, ()> for ParseableSinkProcessor {
async fn process(&self, records: Vec<ConsumerRecord>) -> anyhow::Result<()> {
let len = records.len();
debug!("Processing {} records", len);
debug!("Processing {len} records");

self.build_event_from_chunk(&records).await?.process()?;

debug!("Processed {} records", len);
debug!("Processed {len} records");
Ok(())
}
}
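The processor change above folds the old `json_vec` helper into `build_event_from_chunk` and hands the aggregated payload straight to `json::Event::new(...).into_event(...)`. The batching half can be illustrated standalone; since the loop body is collapsed in the diff, the parsing and size-tracking details below are assumed, and the `RawRecord` type stands in for Parseable's `ConsumerRecord`, so treat this as a sketch of the flow rather than the connector's actual code.

```rust
use serde_json::Value;

// Hypothetical stand-in for a consumed record; Parseable's real
// `ConsumerRecord` also carries topic/partition/offset metadata.
struct RawRecord {
    payload: Option<Vec<u8>>,
}

// Parse each payload into JSON, collect the values into one array, and track
// the total payload size in bytes: the shape of the batching step that feeds
// `json::Event::new(Value::Array(...))`.
fn batch_json(records: &[RawRecord]) -> (Value, u64) {
    let mut json_vec = Vec::with_capacity(records.len());
    let mut total_payload_size = 0u64;

    for record in records {
        if let Some(payload) = &record.payload {
            total_payload_size += payload.len() as u64;
            if let Ok(value) = serde_json::from_slice::<Value>(payload) {
                json_vec.push(value);
            }
        }
    }

    (Value::Array(json_vec), total_payload_size)
}

fn main() {
    let records = vec![
        RawRecord { payload: Some(br#"{"level":"info"}"#.to_vec()) },
        RawRecord { payload: Some(br#"{"level":"error"}"#.to_vec()) },
        RawRecord { payload: None },
    ];
    let (batched, size) = batch_json(&records);
    assert_eq!(batched.as_array().unwrap().len(), 2);
    println!("batched {size} bytes: {batched}");
}
```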
137 changes: 134 additions & 3 deletions src/event/format/json.rs
@@ -23,22 +23,38 @@ use anyhow::anyhow;
use arrow_array::RecordBatch;
use arrow_json::reader::{infer_json_schema_from_iterator, ReaderBuilder};
use arrow_schema::{DataType, Field, Fields, Schema};
use chrono::{DateTime, NaiveDateTime, Utc};
use datafusion::arrow::util::bit_util::round_upto_multiple_of_64;
use itertools::Itertools;
use serde_json::Value;
use std::{collections::HashMap, sync::Arc};
use tracing::error;

use super::EventFormat;
use crate::{metadata::SchemaVersion, utils::arrow::get_field};
use crate::{metadata::SchemaVersion, storage::StreamType, utils::arrow::get_field};

pub struct Event {
pub data: Value,
pub json: Value,
pub p_timestamp: DateTime<Utc>,
}

impl Event {
pub fn new(json: Value) -> Self {
Self {
json,
p_timestamp: Utc::now(),
}
}
}

impl EventFormat for Event {
type Data = Vec<Value>;

/// Returns the time at ingestion, i.e. the `p_timestamp` value
fn get_p_timestamp(&self) -> DateTime<Utc> {
self.p_timestamp
}

// convert the incoming json to a vector of json values
// also extract the arrow schema, tags and metadata from the incoming json
fn to_data(
Expand All @@ -52,7 +68,7 @@ impl EventFormat for Event {
// incoming event may be a single json or a json array
// but Data (type defined above) is a vector of json values
// hence we need to convert the incoming event to a vector of json values
let value_arr = match self.data {
let value_arr = match self.json {
Value::Array(arr) => arr,
value @ Value::Object(_) => vec![value],
_ => unreachable!("flatten would have failed beforehand"),
@@ -120,6 +136,87 @@ impl EventFormat for Event {
Ok(None) => unreachable!("all records are added to one rb"),
}
}

/// Converts a JSON event into a Parseable Event
fn into_event(
self,
stream_name: String,
origin_size: u64,
storage_schema: &HashMap<String, Arc<Field>>,
static_schema_flag: bool,
custom_partitions: Option<&String>,
time_partition: Option<&String>,
schema_version: SchemaVersion,
stream_type: StreamType,
) -> Result<super::Event, anyhow::Error> {
let custom_partition_values = match custom_partitions.as_ref() {
Some(custom_partition) => {
let custom_partitions = custom_partition.split(',').collect_vec();
extract_custom_partition_values(&self.json, &custom_partitions)
}
None => HashMap::new(),
};

let parsed_timestamp = match time_partition {
Some(time_partition) => extract_and_parse_time(&self.json, time_partition)?,
_ => self.p_timestamp.naive_utc(),
};

let (rb, is_first_event) = self.into_recordbatch(
storage_schema,
static_schema_flag,
time_partition,
schema_version,
)?;

Ok(super::Event {
rb,
stream_name,
origin_format: "json",
origin_size,
is_first_event,
parsed_timestamp,
time_partition: None,
custom_partition_values,
stream_type,
})
}
}

/// Extracts custom partition values from the provided JSON object
/// e.g. `json: {"status": 400, "msg": "Hello, World!"}, custom_partition_list: ["status"]` returns `{"status" => 400}`
pub fn extract_custom_partition_values(
json: &Value,
custom_partition_list: &[&str],
) -> HashMap<String, String> {
let mut custom_partition_values: HashMap<String, String> = HashMap::new();
for custom_partition_field in custom_partition_list {
let custom_partition_value = json.get(custom_partition_field.trim()).unwrap().to_owned();
let custom_partition_value = match custom_partition_value {
e @ Value::Number(_) | e @ Value::Bool(_) => e.to_string(),
Value::String(s) => s,
_ => "".to_string(),
};
custom_partition_values.insert(
custom_partition_field.trim().to_string(),
custom_partition_value,
);
}
custom_partition_values
}

/// Returns the parsed timestamp of the designated time partition from the JSON object
/// e.g. `json: {"timestamp": "2025-05-15T15:30:00Z"}` returns `2025-05-15T15:30:00`
fn extract_and_parse_time(
json: &Value,
time_partition: &str,
) -> Result<NaiveDateTime, anyhow::Error> {
let current_time = json
.get(time_partition)
.ok_or_else(|| anyhow!("Missing field for time partition in json: {time_partition}"))?;
let parsed_time: DateTime<Utc> = serde_json::from_value(current_time.clone())?;

Ok(parsed_time.naive_utc())
}

// Returns arrow schema with the fields that are present in the request body
@@ -225,3 +322,37 @@ fn valid_type(data_type: &DataType, value: &Value, schema_version: SchemaVersion
}
}
}

#[cfg(test)]
mod tests {
use std::str::FromStr;

use serde_json::json;

use super::*;

#[test]
fn parse_time_partition_from_value() {
let json = json!({"timestamp": "2025-05-15T15:30:00Z"});
let parsed = extract_and_parse_time(&json, "timestamp");

let expected = NaiveDateTime::from_str("2025-05-15T15:30:00").unwrap();
assert_eq!(parsed.unwrap(), expected);
}

#[test]
fn time_partition_not_in_json() {
let json = json!({"hello": "world!"});
let parsed = extract_and_parse_time(&json, "timestamp");

assert!(parsed.is_err());
}

#[test]
fn time_partition_not_parseable_as_datetime() {
let json = json!({"timestamp": "not time"});
let parsed = extract_and_parse_time(&json, "timestamp");

assert!(parsed.is_err());
}
}
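`extract_and_parse_time` gets unit tests above, but the new `extract_custom_partition_values` helper does not, so here is a standalone mirror of its contract for illustration: trim the listed field names, stringify numbers and booleans, and pass strings through. One deliberate difference: the helper in the diff unwraps the field lookup (a missing partition field panics), while this sketch substitutes an empty string to stay panic-free.

```rust
use std::collections::HashMap;

use serde_json::{json, Value};

// Standalone mirror of the helper's behavior, keyed by the trimmed field name.
// Missing fields fall back to an empty string here instead of panicking.
fn custom_partition_values(json: &Value, custom_partition_list: &[&str]) -> HashMap<String, String> {
    let mut values = HashMap::new();
    for field in custom_partition_list {
        let value = match json.get(field.trim()) {
            Some(Value::String(s)) => s.clone(),
            Some(v) if v.is_number() || v.is_boolean() => v.to_string(),
            _ => String::new(),
        };
        values.insert(field.trim().to_string(), value);
    }
    values
}

fn main() {
    let event = json!({"status": 400, "msg": "Hello, World!"});
    let partitions = custom_partition_values(&event, &["status"]);
    assert_eq!(partitions.get("status"), Some(&"400".to_string()));
    println!("{partitions:?}");
}
```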
21 changes: 19 additions & 2 deletions src/event/format/mod.rs
@@ -32,10 +32,11 @@ use serde_json::Value;

use crate::{
metadata::SchemaVersion,
storage::StreamType,
utils::arrow::{get_field, get_timestamp_array, replace_columns},
};

use super::DEFAULT_TIMESTAMP_KEY;
use super::{Event, DEFAULT_TIMESTAMP_KEY};

pub mod json;

@@ -105,14 +106,17 @@ pub trait EventFormat: Sized {

fn decode(data: Self::Data, schema: Arc<Schema>) -> Result<RecordBatch, AnyError>;

/// Returns the UTC time at ingestion
fn get_p_timestamp(&self) -> DateTime<Utc>;

fn into_recordbatch(
self,
storage_schema: &HashMap<String, Arc<Field>>,
p_timestamp: DateTime<Utc>,
static_schema_flag: bool,
time_partition: Option<&String>,
schema_version: SchemaVersion,
) -> Result<(RecordBatch, bool), AnyError> {
let p_timestamp = self.get_p_timestamp();
let (data, mut schema, is_first) =
self.to_data(storage_schema, time_partition, schema_version)?;

@@ -173,6 +177,19 @@ pub trait EventFormat: Sized {
}
true
}

#[allow(clippy::too_many_arguments)]
fn into_event(
self,
stream_name: String,
origin_size: u64,
storage_schema: &HashMap<String, Arc<Field>>,
static_schema_flag: bool,
custom_partitions: Option<&String>,
time_partition: Option<&String>,
schema_version: SchemaVersion,
stream_type: StreamType,
) -> Result<Event, AnyError>;
}

pub fn get_existing_field_names(
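The trait change in this file moves the ingestion timestamp from an `into_recordbatch` parameter to a value the event carries itself, exposed through the new `get_p_timestamp` method, and adds `into_event` as a required method. The sketch below is not Parseable's `EventFormat`; it is a minimal, self-contained illustration of the pattern, where the timestamp is captured once at construction and read back through a getter.

```rust
use chrono::{DateTime, Utc};

// Illustrative trait only; the real `EventFormat` has more methods and richer
// signatures. The point is the shape of the change: the default method reads
// the ingestion timestamp from the value itself instead of taking a parameter.
trait TimestampedFormat: Sized {
    fn get_p_timestamp(&self) -> DateTime<Utc>;

    fn describe(self) -> String {
        // Callers used to pass `Utc::now()` in; now the timestamp captured at
        // construction time is used everywhere.
        format!("ingested at {}", self.get_p_timestamp())
    }
}

struct JsonEvent {
    p_timestamp: DateTime<Utc>,
}

impl JsonEvent {
    fn new() -> Self {
        Self { p_timestamp: Utc::now() }
    }
}

impl TimestampedFormat for JsonEvent {
    fn get_p_timestamp(&self) -> DateTime<Utc> {
        self.p_timestamp
    }
}

fn main() {
    let event = JsonEvent::new();
    println!("{}", event.describe());
}
```

Capturing `Utc::now()` once in the constructor is what lets `into_recordbatch` and `into_event` agree on the same `p_timestamp` without threading it through every call site.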
2 changes: 1 addition & 1 deletion src/event/mod.rs
@@ -81,7 +81,7 @@ impl Event {
self.origin_format,
self.origin_size,
self.rb.num_rows(),
self.parsed_timestamp,
self.parsed_timestamp.date(),
);

crate::livetail::LIVETAIL.process(&self.stream_name, &self.rb);
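The one-line change to `src/event/mod.rs` swaps `self.parsed_timestamp` for `self.parsed_timestamp.date()` in the stats call, so only the calendar date is recorded there. A quick chrono check of what that truncation produces (the timestamp literal is arbitrary):

```rust
use chrono::NaiveDateTime;

fn main() {
    // `parsed_timestamp` carries date and time; `.date()` keeps only the
    // calendar date portion, which is what the stats call now records.
    let parsed_timestamp =
        NaiveDateTime::parse_from_str("2025-05-15T15:30:00", "%Y-%m-%dT%H:%M:%S").unwrap();
    assert_eq!(parsed_timestamp.date().to_string(), "2025-05-15");
    println!("{}", parsed_timestamp.date());
}
```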