From f7d366e809fadb1e3b4f2a57f6b24bde58cb6dfe Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Fri, 28 Feb 2025 12:30:29 +0530
Subject: [PATCH] refactor: `Event` per log, streamline data handling (#1209)

---------

Signed-off-by: Devdutt Shenoi
---
 src/connectors/kafka/processor.rs             |  50 ++----
 src/event/format/json.rs                      | 137 +++++++++++++++-
 src/event/format/mod.rs                       |  21 ++-
 src/event/mod.rs                              |   2 +-
 src/handlers/http/ingest.rs                   | 148 ++++++------
 src/handlers/http/modal/utils/ingest_utils.rs | 155 ++----------
 src/metadata.rs                               |  10 +-
 7 files changed, 244 insertions(+), 279 deletions(-)

diff --git a/src/connectors/kafka/processor.rs b/src/connectors/kafka/processor.rs
index 6c5f6e398..b74754003 100644
--- a/src/connectors/kafka/processor.rs
+++ b/src/connectors/kafka/processor.rs
@@ -16,10 +16,9 @@
  *
  */
 
-use std::{collections::HashMap, sync::Arc};
+use std::sync::Arc;
 
 use async_trait::async_trait;
-use chrono::Utc;
 use futures_util::StreamExt;
 use rdkafka::consumer::{CommitMode, Consumer};
 use serde_json::Value;
@@ -58,38 +57,10 @@ impl ParseableSinkProcessor {
         let stream = PARSEABLE.get_stream(stream_name)?;
         let schema = stream.get_schema_raw();
         let time_partition = stream.get_time_partition();
+        let custom_partition = stream.get_custom_partition();
         let static_schema_flag = stream.get_static_schema_flag();
         let schema_version = stream.get_schema_version();
 
-        let (json_vec, total_payload_size) = Self::json_vec(records);
-        let batch_json_event = json::Event {
-            data: Value::Array(json_vec),
-        };
-
-        let (rb, is_first) = batch_json_event.into_recordbatch(
-            &schema,
-            Utc::now(),
-            static_schema_flag,
-            time_partition.as_ref(),
-            schema_version,
-        )?;
-
-        let p_event = ParseableEvent {
-            rb,
-            stream_name: stream_name.to_string(),
-            origin_format: "json",
-            origin_size: total_payload_size,
-            is_first_event: is_first,
-            parsed_timestamp: Utc::now().naive_utc(),
-            time_partition: None,
-            custom_partition_values: HashMap::new(),
-            stream_type: StreamType::UserDefined,
-        };
-
-        Ok(p_event)
-    }
-
-    fn json_vec(records: &[ConsumerRecord]) -> (Vec<Value>, u64) {
         let mut json_vec = Vec::with_capacity(records.len());
         let mut total_payload_size = 0u64;
 
@@ -100,7 +71,18 @@ impl ParseableSinkProcessor {
             }
         }
 
-        (json_vec, total_payload_size)
+        let p_event = json::Event::new(Value::Array(json_vec)).into_event(
+            stream_name.to_string(),
+            total_payload_size,
+            &schema,
+            static_schema_flag,
+            custom_partition.as_ref(),
+            time_partition.as_ref(),
+            schema_version,
+            StreamType::UserDefined,
+        )?;
+
+        Ok(p_event)
     }
 }
 
@@ -108,11 +90,11 @@
 impl Processor<Vec<ConsumerRecord>, ()> for ParseableSinkProcessor {
     async fn process(&self, records: Vec<ConsumerRecord>) -> anyhow::Result<()> {
         let len = records.len();
-        debug!("Processing {} records", len);
+        debug!("Processing {len} records");
 
         self.build_event_from_chunk(&records).await?.process()?;
 
-        debug!("Processed {} records", len);
+        debug!("Processed {len} records");
         Ok(())
     }
 }
diff --git a/src/event/format/json.rs b/src/event/format/json.rs
index 5006be142..c28b701de 100644
--- a/src/event/format/json.rs
+++ b/src/event/format/json.rs
@@ -23,6 +23,7 @@ use anyhow::anyhow;
 use arrow_array::RecordBatch;
 use arrow_json::reader::{infer_json_schema_from_iterator, ReaderBuilder};
 use arrow_schema::{DataType, Field, Fields, Schema};
+use chrono::{DateTime, NaiveDateTime, Utc};
 use datafusion::arrow::util::bit_util::round_upto_multiple_of_64;
 use itertools::Itertools;
 use serde_json::Value;
@@ -30,15 +31,30 @@ use std::{collections::HashMap, sync::Arc};
 use tracing::error;
 
 use super::EventFormat;
-use crate::{metadata::SchemaVersion, utils::arrow::get_field};
+use crate::{metadata::SchemaVersion, storage::StreamType, utils::arrow::get_field};
 
 pub struct Event {
-    pub data: Value,
+    pub json: Value,
+    pub p_timestamp: DateTime<Utc>,
+}
+
+impl Event {
+    pub fn new(json: Value) -> Self {
+        Self {
+            json,
+            p_timestamp: Utc::now(),
+        }
+    }
 }
 
 impl EventFormat for Event {
     type Data = Vec<Value>;
 
+    /// Returns the time at ingestion, i.e. the `p_timestamp` value
+    fn get_p_timestamp(&self) -> DateTime<Utc> {
+        self.p_timestamp
+    }
+
     // convert the incoming json to a vector of json values
     // also extract the arrow schema, tags and metadata from the incoming json
     fn to_data(
@@ -52,7 +68,7 @@ impl EventFormat for Event {
         // incoming event may be a single json or a json array
         // but Data (type defined above) is a vector of json values
         // hence we need to convert the incoming event to a vector of json values
-        let value_arr = match self.data {
+        let value_arr = match self.json {
             Value::Array(arr) => arr,
             value @ Value::Object(_) => vec![value],
             _ => unreachable!("flatten would have failed beforehand"),
@@ -120,6 +136,87 @@ impl EventFormat for Event {
             Ok(None) => unreachable!("all records are added to one rb"),
         }
     }
+
+    /// Converts a JSON event into a Parseable Event
+    fn into_event(
+        self,
+        stream_name: String,
+        origin_size: u64,
+        storage_schema: &HashMap<String, Arc<Field>>,
+        static_schema_flag: bool,
+        custom_partitions: Option<&String>,
+        time_partition: Option<&String>,
+        schema_version: SchemaVersion,
+        stream_type: StreamType,
+    ) -> Result<super::Event, anyhow::Error> {
+        let custom_partition_values = match custom_partitions.as_ref() {
+            Some(custom_partition) => {
+                let custom_partitions = custom_partition.split(',').collect_vec();
+                extract_custom_partition_values(&self.json, &custom_partitions)
+            }
+            None => HashMap::new(),
+        };
+
+        let parsed_timestamp = match time_partition {
+            Some(time_partition) => extract_and_parse_time(&self.json, time_partition)?,
+            _ => self.p_timestamp.naive_utc(),
+        };
+
+        let (rb, is_first_event) = self.into_recordbatch(
+            storage_schema,
+            static_schema_flag,
+            time_partition,
+            schema_version,
+        )?;
+
+        Ok(super::Event {
+            rb,
+            stream_name,
+            origin_format: "json",
+            origin_size,
+            is_first_event,
+            parsed_timestamp,
+            time_partition: None,
+            custom_partition_values,
+            stream_type,
+        })
+    }
+}
+
+/// Extracts custom partition values from provided JSON object
+/// e.g. `json: {"status": 400, "msg": "Hello, World!"}, custom_partition_list: ["status"]` returns `{"status" => 400}`
+pub fn extract_custom_partition_values(
+    json: &Value,
+    custom_partition_list: &[&str],
+) -> HashMap<String, String> {
+    let mut custom_partition_values: HashMap<String, String> = HashMap::new();
+    for custom_partition_field in custom_partition_list {
+        let custom_partition_value = json.get(custom_partition_field.trim()).unwrap().to_owned();
+        let custom_partition_value = match custom_partition_value {
+            e @ Value::Number(_) | e @ Value::Bool(_) => e.to_string(),
+            Value::String(s) => s,
+            _ => "".to_string(),
+        };
+        custom_partition_values.insert(
+            custom_partition_field.trim().to_string(),
+            custom_partition_value,
+        );
+    }
+    custom_partition_values
+}
+
+/// Returns the parsed timestamp of designated time partition from json object
`json: {"timestamp": "2025-05-15T15:30:00Z"}` returns `2025-05-15T15:30:00` +fn extract_and_parse_time( + json: &Value, + time_partition: &str, +) -> Result { + let current_time = json + .get(time_partition) + .ok_or_else(|| anyhow!("Missing field for time partition in json: {time_partition}"))?; + let parsed_time: DateTime = serde_json::from_value(current_time.clone())?; + + Ok(parsed_time.naive_utc()) } // Returns arrow schema with the fields that are present in the request body @@ -225,3 +322,37 @@ fn valid_type(data_type: &DataType, value: &Value, schema_version: SchemaVersion } } } + +#[cfg(test)] +mod tests { + use std::str::FromStr; + + use serde_json::json; + + use super::*; + + #[test] + fn parse_time_parition_from_value() { + let json = json!({"timestamp": "2025-05-15T15:30:00Z"}); + let parsed = extract_and_parse_time(&json, "timestamp"); + + let expected = NaiveDateTime::from_str("2025-05-15T15:30:00").unwrap(); + assert_eq!(parsed.unwrap(), expected); + } + + #[test] + fn time_parition_not_in_json() { + let json = json!({"hello": "world!"}); + let parsed = extract_and_parse_time(&json, "timestamp"); + + assert!(parsed.is_err()); + } + + #[test] + fn time_parition_not_parseable_as_datetime() { + let json = json!({"timestamp": "not time"}); + let parsed = extract_and_parse_time(&json, "timestamp"); + + assert!(parsed.is_err()); + } +} diff --git a/src/event/format/mod.rs b/src/event/format/mod.rs index e7f0707e8..ce90cfc52 100644 --- a/src/event/format/mod.rs +++ b/src/event/format/mod.rs @@ -32,10 +32,11 @@ use serde_json::Value; use crate::{ metadata::SchemaVersion, + storage::StreamType, utils::arrow::{get_field, get_timestamp_array, replace_columns}, }; -use super::DEFAULT_TIMESTAMP_KEY; +use super::{Event, DEFAULT_TIMESTAMP_KEY}; pub mod json; @@ -105,14 +106,17 @@ pub trait EventFormat: Sized { fn decode(data: Self::Data, schema: Arc) -> Result; + /// Returns the UTC time at ingestion + fn get_p_timestamp(&self) -> DateTime; + fn into_recordbatch( self, storage_schema: &HashMap>, - p_timestamp: DateTime, static_schema_flag: bool, time_partition: Option<&String>, schema_version: SchemaVersion, ) -> Result<(RecordBatch, bool), AnyError> { + let p_timestamp = self.get_p_timestamp(); let (data, mut schema, is_first) = self.to_data(storage_schema, time_partition, schema_version)?; @@ -173,6 +177,19 @@ pub trait EventFormat: Sized { } true } + + #[allow(clippy::too_many_arguments)] + fn into_event( + self, + stream_name: String, + origin_size: u64, + storage_schema: &HashMap>, + static_schema_flag: bool, + custom_partitions: Option<&String>, + time_partition: Option<&String>, + schema_version: SchemaVersion, + stream_type: StreamType, + ) -> Result; } pub fn get_existing_field_names( diff --git a/src/event/mod.rs b/src/event/mod.rs index cbf0a4a5a..29a4a0899 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -81,7 +81,7 @@ impl Event { self.origin_format, self.origin_size, self.rb.num_rows(), - self.parsed_timestamp, + self.parsed_timestamp.date(), ); crate::livetail::LIVETAIL.process(&self.stream_name, &self.rb); diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index 0ca1f11a5..0523e8757 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -26,6 +26,7 @@ use chrono::Utc; use http::StatusCode; use serde_json::Value; +use crate::event; use crate::event::error::EventError; use crate::event::format::{self, EventFormat, LogSource}; use crate::handlers::{LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY}; @@ -35,7 +36,6 @@ use 
 use crate::parseable::{StreamNotFound, PARSEABLE};
 use crate::storage::{ObjectStorageError, StreamType};
 use crate::utils::header_parsing::ParseHeaderError;
 use crate::utils::json::flatten::JsonFlattenError;
-use crate::{event, LOCK_EXPECT};
 
 use super::logstream::error::{CreateStreamError, StreamError};
 use super::modal::utils::ingest_utils::flatten_and_push_logs;
@@ -79,34 +79,22 @@ pub async fn ingest(req: HttpRequest, Json(json): Json<Value>) -> Result<HttpResponse, PostError>
 
 pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<(), PostError> {
     let size: usize = body.len();
-    let now = Utc::now();
-    let (rb, is_first) = {
-        let body_val: Value = serde_json::from_slice(&body)?;
-        let hash_map = PARSEABLE.streams.read().unwrap();
-        let schema = hash_map
-            .get(&stream_name)
-            .ok_or_else(|| StreamNotFound(stream_name.clone()))?
-            .metadata
-            .read()
-            .expect(LOCK_EXPECT)
-            .schema
-            .clone();
-        let event = format::json::Event { data: body_val };
-        // For internal streams, use old schema
-        event.into_recordbatch(&schema, now, false, None, SchemaVersion::V0)?
-    };
-    event::Event {
-        rb,
-        stream_name,
-        origin_format: "json",
-        origin_size: size as u64,
-        is_first_event: is_first,
-        parsed_timestamp: now.naive_utc(),
-        time_partition: None,
-        custom_partition_values: HashMap::new(),
-        stream_type: StreamType::Internal,
-    }
-    .process()?;
+    let json: Value = serde_json::from_slice(&body)?;
+    let schema = PARSEABLE.get_stream(&stream_name)?.get_schema_raw();
+
+    // For internal streams, use old schema
+    format::json::Event::new(json)
+        .into_event(
+            stream_name,
+            size as u64,
+            &schema,
+            false,
+            None,
+            None,
+            SchemaVersion::V0,
+            StreamType::Internal,
+        )?
+        .process()?;
 
     Ok(())
 }
@@ -351,12 +339,11 @@ mod tests {
     use arrow::datatypes::Int64Type;
    use arrow_array::{ArrayRef, Float64Array, Int64Array, ListArray, StringArray};
     use arrow_schema::{DataType, Field};
-    use chrono::Utc;
     use serde_json::json;
     use std::{collections::HashMap, sync::Arc};
 
     use crate::{
-        handlers::http::modal::utils::ingest_utils::into_event_batch,
+        event::format::{json, EventFormat},
         metadata::SchemaVersion,
         utils::json::{convert_array_to_object, flatten::convert_to_array},
     };
@@ -393,15 +380,9 @@
             "b": "hello",
         });
 
-        let (rb, _) = into_event_batch(
-            json,
-            HashMap::default(),
-            Utc::now(),
-            false,
-            None,
-            SchemaVersion::V0,
-        )
-        .unwrap();
+        let (rb, _) = json::Event::new(json)
+            .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0)
+            .unwrap();
 
         assert_eq!(rb.num_rows(), 1);
         assert_eq!(rb.num_columns(), 4);
@@ -427,15 +408,9 @@
             "c": null
         });
 
-        let (rb, _) = into_event_batch(
-            json,
-            HashMap::default(),
-            Utc::now(),
-            false,
-            None,
-            SchemaVersion::V0,
-        )
-        .unwrap();
+        let (rb, _) = json::Event::new(json)
+            .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0)
+            .unwrap();
 
         assert_eq!(rb.num_rows(), 1);
         assert_eq!(rb.num_columns(), 3);
@@ -465,8 +440,9 @@
             .into_iter(),
         );
 
-        let (rb, _) =
-            into_event_batch(json, schema, Utc::now(), false, None, SchemaVersion::V0).unwrap();
+        let (rb, _) = json::Event::new(json)
+            .into_recordbatch(&schema, false, None, SchemaVersion::V0)
+            .unwrap();
 
         assert_eq!(rb.num_rows(), 1);
         assert_eq!(rb.num_columns(), 3);
@@ -496,9 +472,9 @@
             .into_iter(),
         );
 
-        assert!(
-            into_event_batch(json, schema, Utc::now(), false, None, SchemaVersion::V0,).is_err()
-        );
+        assert!(json::Event::new(json)
+            .into_recordbatch(&schema, false, None, SchemaVersion::V0,)
+            .is_err());
     }
 
     #[test]
@@ -514,8 +490,9 @@
             .into_iter(),
         );
 
-        let (rb, _) =
-            into_event_batch(json, schema, Utc::now(), false, None, SchemaVersion::V0).unwrap();
+        let (rb, _) = json::Event::new(json)
+            .into_recordbatch(&schema, false, None, SchemaVersion::V0)
+            .unwrap();
 
         assert_eq!(rb.num_rows(), 1);
         assert_eq!(rb.num_columns(), 1);
@@ -554,15 +531,9 @@
             },
         ]);
 
-        let (rb, _) = into_event_batch(
-            json,
-            HashMap::default(),
-            Utc::now(),
-            false,
-            None,
-            SchemaVersion::V0,
-        )
-        .unwrap();
+        let (rb, _) = json::Event::new(json)
+            .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0)
+            .unwrap();
 
         assert_eq!(rb.num_rows(), 3);
         assert_eq!(rb.num_columns(), 4);
@@ -608,15 +579,9 @@
             },
         ]);
 
-        let (rb, _) = into_event_batch(
-            json,
-            HashMap::default(),
-            Utc::now(),
-            false,
-            None,
-            SchemaVersion::V0,
-        )
-        .unwrap();
+        let (rb, _) = json::Event::new(json)
+            .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0)
+            .unwrap();
 
         assert_eq!(rb.num_rows(), 3);
         assert_eq!(rb.num_columns(), 4);
@@ -663,8 +628,9 @@
             .into_iter(),
         );
 
-        let (rb, _) =
-            into_event_batch(json, schema, Utc::now(), false, None, SchemaVersion::V0).unwrap();
+        let (rb, _) = json::Event::new(json)
+            .into_recordbatch(&schema, false, None, SchemaVersion::V0)
+            .unwrap();
 
         assert_eq!(rb.num_rows(), 3);
         assert_eq!(rb.num_columns(), 4);
@@ -711,9 +677,9 @@
             .into_iter(),
         );
 
-        assert!(
-            into_event_batch(json, schema, Utc::now(), false, None, SchemaVersion::V0,).is_err()
-        );
+        assert!(json::Event::new(json)
+            .into_recordbatch(&schema, false, None, SchemaVersion::V0,)
+            .is_err());
     }
 
     #[test]
@@ -751,15 +717,9 @@
         )
         .unwrap();
 
-        let (rb, _) = into_event_batch(
-            flattened_json,
-            HashMap::default(),
-            Utc::now(),
-            false,
-            None,
-            SchemaVersion::V0,
-        )
-        .unwrap();
+        let (rb, _) = json::Event::new(flattened_json)
+            .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0)
+            .unwrap();
         assert_eq!(rb.num_rows(), 4);
         assert_eq!(rb.num_columns(), 5);
         assert_eq!(
@@ -840,15 +800,9 @@
         )
         .unwrap();
 
-        let (rb, _) = into_event_batch(
-            flattened_json,
-            HashMap::default(),
-            Utc::now(),
-            false,
-            None,
-            SchemaVersion::V1,
-        )
-        .unwrap();
+        let (rb, _) = json::Event::new(flattened_json)
+            .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V1)
+            .unwrap();
         assert_eq!(rb.num_rows(), 4);
         assert_eq!(rb.num_columns(), 5);
diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs
index b6286d67b..84d5ae117 100644
--- a/src/handlers/http/modal/utils/ingest_utils.rs
+++ b/src/handlers/http/modal/utils/ingest_utils.rs
@@ -16,30 +16,22 @@
  *
  */
 
-use arrow_schema::Field;
-use chrono::{DateTime, NaiveDateTime, Utc};
-use itertools::Itertools;
+use chrono::Utc;
 use opentelemetry_proto::tonic::{
     logs::v1::LogsData, metrics::v1::MetricsData, trace::v1::TracesData,
 };
 use serde_json::Value;
-use std::{collections::HashMap, sync::Arc};
 
 use crate::{
-    event::{
-        format::{json, EventFormat, LogSource},
-        Event,
-    },
+    event::format::{json, EventFormat, LogSource},
     handlers::http::{
         ingest::PostError,
         kinesis::{flatten_kinesis_logs, Message},
     },
-    metadata::SchemaVersion,
     otel::{logs::flatten_otel_logs, metrics::flatten_otel_metrics, traces::flatten_otel_traces},
-    parseable::{StreamNotFound, PARSEABLE},
+    parseable::PARSEABLE,
     storage::StreamType,
     utils::json::{convert_array_to_object, flatten::convert_to_array},
-    LOCK_EXPECT,
 };
 
 pub async fn flatten_and_push_logs(
@@ -118,132 +110,21 @@ async fn push_logs(
         )?)?]
     };
 
-    for value in data {
-        let origin_size = serde_json::to_vec(&value).unwrap().len() as u64; // string length need not be the same as byte length
-        let parsed_timestamp = match time_partition.as_ref() {
-            Some(time_partition) => get_parsed_timestamp(&value, time_partition)?,
-            _ => p_timestamp.naive_utc(),
-        };
-        let custom_partition_values = match custom_partition.as_ref() {
-            Some(custom_partition) => {
-                let custom_partitions = custom_partition.split(',').collect_vec();
-                get_custom_partition_values(&value, &custom_partitions)
-            }
-            None => HashMap::new(),
-        };
-        let schema = PARSEABLE
-            .streams
-            .read()
-            .unwrap()
-            .get(stream_name)
-            .ok_or_else(|| StreamNotFound(stream_name.to_owned()))?
-            .metadata
-            .read()
-            .expect(LOCK_EXPECT)
-            .schema
-            .clone();
-        let (rb, is_first_event) = into_event_batch(
-            value,
-            schema,
-            p_timestamp,
-            static_schema_flag,
-            time_partition.as_ref(),
-            schema_version,
-        )?;
-
-        Event {
-            rb,
-            stream_name: stream_name.to_owned(),
-            origin_format: "json",
-            origin_size,
-            is_first_event,
-            parsed_timestamp,
-            time_partition: time_partition.clone(),
-            custom_partition_values,
-            stream_type: StreamType::UserDefined,
-        }
-        .process()?;
+    for json in data {
+        let origin_size = serde_json::to_vec(&json).unwrap().len() as u64; // string length need not be the same as byte length
+        let schema = PARSEABLE.get_stream(stream_name)?.get_schema_raw();
+        json::Event { json, p_timestamp }
+            .into_event(
+                stream_name.to_owned(),
+                origin_size,
+                &schema,
+                static_schema_flag,
+                custom_partition.as_ref(),
+                time_partition.as_ref(),
+                schema_version,
+                StreamType::UserDefined,
+            )?
+            .process()?;
     }
     Ok(())
 }
-
-pub fn into_event_batch(
-    data: Value,
-    schema: HashMap<String, Arc<Field>>,
-    p_timestamp: DateTime<Utc>,
-    static_schema_flag: bool,
-    time_partition: Option<&String>,
-    schema_version: SchemaVersion,
-) -> Result<(arrow_array::RecordBatch, bool), PostError> {
-    let (rb, is_first) = json::Event { data }.into_recordbatch(
-        &schema,
-        p_timestamp,
-        static_schema_flag,
-        time_partition,
-        schema_version,
-    )?;
-    Ok((rb, is_first))
-}
-
-pub fn get_custom_partition_values(
-    json: &Value,
-    custom_partition_list: &[&str],
-) -> HashMap<String, String> {
-    let mut custom_partition_values: HashMap<String, String> = HashMap::new();
-    for custom_partition_field in custom_partition_list {
-        let custom_partition_value = json.get(custom_partition_field.trim()).unwrap().to_owned();
-        let custom_partition_value = match custom_partition_value {
-            e @ Value::Number(_) | e @ Value::Bool(_) => e.to_string(),
-            Value::String(s) => s,
-            _ => "".to_string(),
-        };
-        custom_partition_values.insert(
-            custom_partition_field.trim().to_string(),
-            custom_partition_value,
-        );
-    }
-    custom_partition_values
-}
-
-fn get_parsed_timestamp(json: &Value, time_partition: &str) -> Result<NaiveDateTime, PostError> {
-    let current_time = json
-        .get(time_partition)
-        .ok_or_else(|| PostError::MissingTimePartition(time_partition.to_string()))?;
-    let parsed_time: DateTime<Utc> = serde_json::from_value(current_time.clone())?;
-
-    Ok(parsed_time.naive_utc())
-}
-
-#[cfg(test)]
-mod tests {
-    use std::str::FromStr;
-
-    use serde_json::json;
-
-    use super::*;
-
-    #[test]
-    fn parse_time_parition_from_value() {
-        let json = json!({"timestamp": "2025-05-15T15:30:00Z"});
-        let parsed = get_parsed_timestamp(&json, "timestamp");
-
-        let expected = NaiveDateTime::from_str("2025-05-15T15:30:00").unwrap();
-        assert_eq!(parsed.unwrap(), expected);
-    }
-
-    #[test]
-    fn time_parition_not_in_json() {
-        let json = json!({"hello": "world!"});
-        let parsed = get_parsed_timestamp(&json, "timestamp");
-
-        matches!(parsed, Err(PostError::MissingTimePartition(_)));
-    }
-
-    #[test]
-    fn time_parition_not_parseable_as_datetime() {
-        let json = json!({"timestamp": "not time"});
-        let parsed = get_parsed_timestamp(&json, "timestamp");
-
-        matches!(parsed, Err(PostError::SerdeError(_)));
-    }
-}
diff --git a/src/metadata.rs b/src/metadata.rs
index a29fdfee2..f4d2e2225 100644
--- a/src/metadata.rs
+++ b/src/metadata.rs
@@ -17,7 +17,7 @@
  */
 
 use arrow_schema::{DataType, Field, Schema, TimeUnit};
-use chrono::{Local, NaiveDateTime};
+use chrono::{Local, NaiveDate};
 use serde::{Deserialize, Serialize};
 use std::collections::HashMap;
 use std::num::NonZeroU32;
@@ -37,20 +37,20 @@ pub fn update_stats(
     origin: &'static str,
     size: u64,
     num_rows: usize,
-    parsed_timestamp: NaiveDateTime,
+    parsed_date: NaiveDate,
 ) {
-    let parsed_date = parsed_timestamp.date().to_string();
+    let parsed_date = parsed_date.to_string();
     EVENTS_INGESTED
         .with_label_values(&[stream_name, origin])
         .add(num_rows as i64);
     EVENTS_INGESTED_DATE
-        .with_label_values(&[stream_name, origin, parsed_date.as_str()])
+        .with_label_values(&[stream_name, origin, &parsed_date])
         .add(num_rows as i64);
     EVENTS_INGESTED_SIZE
         .with_label_values(&[stream_name, origin])
         .add(size as i64);
     EVENTS_INGESTED_SIZE_DATE
-        .with_label_values(&[stream_name, origin, parsed_date.as_str()])
+        .with_label_values(&[stream_name, origin, &parsed_date])
         .add(size as i64);
     LIFETIME_EVENTS_INGESTED
         .with_label_values(&[stream_name, origin])
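
Illustrative only, not part of the patch: a minimal caller-side sketch of the single-call flow this refactor introduces, built from the APIs visible in the diff (`PARSEABLE.get_stream`, the stream getters, `json::Event::new`, `EventFormat::into_event`, `Event::process`); the stream name and payload here are hypothetical.

    // Sketch: ingesting one JSON log after this refactor, per the `into_event` signature above.
    fn ingest_one(stream_name: &str, payload: serde_json::Value) -> anyhow::Result<()> {
        use crate::event::format::{json, EventFormat};
        use crate::parseable::PARSEABLE;
        use crate::storage::StreamType;

        // Look up the stream once and read its ingest-time settings from the handle.
        let stream = PARSEABLE.get_stream(stream_name)?;
        // Size is measured on the serialized payload, as push_logs does above.
        let origin_size = serde_json::to_vec(&payload)?.len() as u64;

        // One `Event` per log: `new` captures p_timestamp, `into_event` builds the
        // record batch plus partition values, and `process` commits it to the stream.
        json::Event::new(payload)
            .into_event(
                stream_name.to_owned(),
                origin_size,
                &stream.get_schema_raw(),
                stream.get_static_schema_flag(),
                stream.get_custom_partition().as_ref(),
                stream.get_time_partition().as_ref(),
                stream.get_schema_version(),
                StreamType::UserDefined,
            )?
            .process()?;
        Ok(())
    }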