
refactor: further streamline, associate w/ Parseable
de-sh committed Mar 1, 2025
1 parent 303ba35 commit c2faefc
Showing 5 changed files with 73 additions and 125 deletions.
47 changes: 13 additions & 34 deletions src/connectors/kafka/processor.rs
@@ -26,12 +26,7 @@ use tokio_stream::wrappers::ReceiverStream;
 use tracing::{debug, error};
 
 use crate::{
-    connectors::common::processor::Processor,
-    event::{
-        format::{json, EventFormat, LogSource},
-        Event as ParseableEvent,
-    },
-    parseable::PARSEABLE,
+    connectors::common::processor::Processor, event::format::LogSource, parseable::PARSEABLE,
     storage::StreamType,
 };

@@ -41,10 +36,7 @@ use super::{config::BufferConfig, ConsumerRecord, StreamConsumer, TopicPartition
 pub struct ParseableSinkProcessor;
 
 impl ParseableSinkProcessor {
-    async fn build_event_from_chunk(
-        &self,
-        records: &[ConsumerRecord],
-    ) -> anyhow::Result<ParseableEvent> {
+    async fn process_event_from_chunk(&self, records: &[ConsumerRecord]) -> anyhow::Result<u64> {
         let stream_name = records
             .first()
             .map(|r| r.topic.as_str())
@@ -54,14 +46,6 @@ impl ParseableSinkProcessor {
             .create_stream_if_not_exists(stream_name, StreamType::UserDefined, LogSource::Json)
             .await?;
 
-        let stream = PARSEABLE.get_stream(stream_name)?;
-        let schema = stream.get_schema_raw();
-        let time_partition = stream.get_time_partition();
-        let time_partition_limit = stream.get_time_partition_limit();
-        let custom_partition = stream.get_custom_partition();
-        let static_schema_flag = stream.get_static_schema_flag();
-        let schema_version = stream.get_schema_version();
-
         let mut json_vec = Vec::with_capacity(records.len());
         let mut total_payload_size = 0u64;
 
@@ -72,20 +56,15 @@ impl ParseableSinkProcessor {
             }
         }
 
-        let p_event = json::Event::new(Value::Array(json_vec)).into_event(
-            stream_name.to_string(),
-            total_payload_size,
-            &schema,
-            static_schema_flag,
-            custom_partition.as_ref(),
-            time_partition.as_ref(),
-            time_partition_limit,
-            schema_version,
-            &LogSource::Custom("Kafka".to_owned()),
-            StreamType::UserDefined,
-        )?;
-
-        Ok(p_event)
+        PARSEABLE
+            .get_or_create_stream(stream_name)
+            .push_logs(
+                Value::Array(json_vec),
+                &LogSource::Custom("Kafka".to_owned()),
+            )
+            .await?;
+
+        Ok(total_payload_size)
     }
 }

@@ -95,9 +74,9 @@ impl Processor<Vec<ConsumerRecord>, ()> for ParseableSinkProcessor {
         let len = records.len();
         debug!("Processing {len} records");
 
-        self.build_event_from_chunk(&records).await?.process()?;
+        let size = self.process_event_from_chunk(&records).await?;
 
-        debug!("Processed {len} records");
+        debug!("Processed {len} records, size = {size} Bytes");
         Ok(())
     }
 }
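
Taken together, the processor now collapses each chunk of consumer records into a single JSON array and hands it to the stream named after the topic, returning the accumulated payload size for the debug log. A rough standalone sketch of that flow, with a hypothetical simplified record type standing in for the crate's `ConsumerRecord` (the `push_logs` call mirrors the diff above, and the sketch assumes it lives inside the crate alongside this code):

```rust
use serde_json::Value;

use crate::{event::format::LogSource, parseable::PARSEABLE};

/// Hypothetical, simplified stand-in for the connector's `ConsumerRecord`.
struct Record {
    topic: String,
    payload: Option<Vec<u8>>,
}

/// Sketch of the chunk-processing path, mirroring `process_event_from_chunk`.
async fn push_chunk(records: &[Record]) -> anyhow::Result<u64> {
    // The Kafka topic name doubles as the Parseable stream name.
    let stream_name = records.first().map(|r| r.topic.as_str()).unwrap_or_default();

    let mut json_vec = Vec::with_capacity(records.len());
    let mut total_payload_size = 0u64;
    for record in records {
        if let Some(payload) = &record.payload {
            total_payload_size += payload.len() as u64;
            json_vec.push(serde_json::from_slice::<Value>(payload)?);
        }
    }

    // One array-valued push per chunk, tagged with a custom "Kafka" log source.
    PARSEABLE
        .get_or_create_stream(stream_name)
        .push_logs(
            Value::Array(json_vec),
            &LogSource::Custom("Kafka".to_owned()),
        )
        .await?;

    Ok(total_payload_size)
}
```
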
52 changes: 26 additions & 26 deletions src/handlers/http/ingest.rs
@@ -28,17 +28,15 @@ use serde_json::Value;
 
 use crate::event;
 use crate::event::error::EventError;
-use crate::event::format::{self, EventFormat, LogSource};
+use crate::event::format::LogSource;
 use crate::handlers::{LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY};
-use crate::metadata::SchemaVersion;
 use crate::option::Mode;
 use crate::parseable::{StreamNotFound, PARSEABLE};
 use crate::storage::{ObjectStorageError, StreamType};
 use crate::utils::header_parsing::ParseHeaderError;
 use crate::utils::json::flatten::JsonFlattenError;
 
 use super::logstream::error::{CreateStreamError, StreamError};
-use super::modal::utils::ingest_utils::push_logs;
 use super::users::dashboards::DashboardError;
 use super::users::filters::FiltersError;

@@ -72,31 +70,21 @@ pub async fn ingest(req: HttpRequest, Json(json): Json<Value>) -> Result<HttpRes
         return Err(PostError::OtelNotSupported);
     }
 
-    push_logs(&stream_name, json, &log_source).await?;
+    PARSEABLE
+        .get_or_create_stream(&stream_name)
+        .push_logs(json, &log_source)
+        .await?;
 
     Ok(HttpResponse::Ok().finish())
 }
 
 pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<(), PostError> {
-    let size: usize = body.len();
     let json: Value = serde_json::from_slice(&body)?;
-    let schema = PARSEABLE.get_stream(&stream_name)?.get_schema_raw();
 
-    // For internal streams, use old schema
-    format::json::Event::new(json)
-        .into_event(
-            stream_name,
-            size as u64,
-            &schema,
-            false,
-            None,
-            None,
-            None,
-            SchemaVersion::V0,
-            &LogSource::Pmeta,
-            StreamType::Internal,
-        )?
-        .process()?;
+    PARSEABLE
+        .get_stream(&stream_name)?
+        .push_logs(json, &LogSource::Pmeta)
+        .await?;
 
     Ok(())
 }
@@ -125,7 +113,10 @@ pub async fn handle_otel_logs_ingestion(
         .create_stream_if_not_exists(&stream_name, StreamType::UserDefined, LogSource::OtelLogs)
         .await?;
 
-    push_logs(&stream_name, json, &log_source).await?;
+    PARSEABLE
+        .get_or_create_stream(&stream_name)
+        .push_logs(json, &log_source)
+        .await?;
 
     Ok(HttpResponse::Ok().finish())
 }
@@ -156,7 +147,10 @@ pub async fn handle_otel_metrics_ingestion(
     )
     .await?;
 
-    push_logs(&stream_name, json, &log_source).await?;
+    PARSEABLE
+        .get_or_create_stream(&stream_name)
+        .push_logs(json, &log_source)
+        .await?;
 
     Ok(HttpResponse::Ok().finish())
 }
@@ -184,7 +178,10 @@ pub async fn handle_otel_traces_ingestion(
         .create_stream_if_not_exists(&stream_name, StreamType::UserDefined, LogSource::OtelTraces)
         .await?;
 
-    push_logs(&stream_name, json, &log_source).await?;
+    PARSEABLE
+        .get_or_create_stream(&stream_name)
+        .push_logs(json, &log_source)
+        .await?;
 
     Ok(HttpResponse::Ok().finish())
 }
@@ -233,7 +230,10 @@ pub async fn post_event(
         return Err(PostError::OtelNotSupported);
     }
 
-    push_logs(&stream_name, json, &log_source).await?;
+    PARSEABLE
+        .get_or_create_stream(&stream_name)
+        .push_logs(json, &log_source)
+        .await?;
 
     Ok(HttpResponse::Ok().finish())
 }
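
All of the public handlers above now share the same call into `PARSEABLE`, replacing the deleted `ingest_utils::push_logs` helper, while `ingest_internal_stream` alone uses the fallible `get_stream` lookup. A hypothetical side-by-side of the two flavours, not part of the commit; it assumes it lives inside the crate and that a missing stream converts into `anyhow::Error`:

```rust
use serde_json::Value;

use crate::{event::format::LogSource, parseable::PARSEABLE};

/// Hypothetical illustration of the two lookup flavours used in the handlers above.
async fn push_both(stream_name: &str, user_json: Value, pmeta_json: Value) -> anyhow::Result<()> {
    // User-facing ingestion: create the stream on first use, then push.
    PARSEABLE
        .get_or_create_stream(stream_name)
        .push_logs(user_json, &LogSource::Json)
        .await?;

    // Internal "pmeta" stream: expected to already exist, so the fallible
    // lookup is used and a missing stream surfaces as an error.
    PARSEABLE
        .get_stream(stream_name)?
        .push_logs(pmeta_json, &LogSource::Pmeta)
        .await?;

    Ok(())
}
```
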
63 changes: 0 additions & 63 deletions src/handlers/http/modal/utils/ingest_utils.rs

This file was deleted.

1 change: 0 additions & 1 deletion src/handlers/http/modal/utils/mod.rs
@@ -16,6 +16,5 @@
  *
  */
 
-pub mod ingest_utils;
 pub mod logstream_utils;
 pub mod rbac_utils;
35 changes: 34 additions & 1 deletion src/parseable/streams.rs
@@ -42,12 +42,16 @@ use parquet::{
 };
 use rand::distributions::DistString;
 use relative_path::RelativePathBuf;
+use serde_json::Value;
 use tokio::task::JoinSet;
 use tracing::{error, info, trace, warn};
 
 use crate::{
     cli::Options,
-    event::DEFAULT_TIMESTAMP_KEY,
+    event::{
+        format::{json, EventFormat, LogSource},
+        DEFAULT_TIMESTAMP_KEY,
+    },
     metadata::{LogStreamMetadata, SchemaVersion},
     metrics,
     option::Mode,
@@ -109,6 +113,35 @@ impl Stream {
         })
     }
 
+    pub async fn push_logs(&self, json: Value, log_source: &LogSource) -> anyhow::Result<()> {
+        let time_partition = self.get_time_partition();
+        let time_partition_limit = self.get_time_partition_limit();
+        let static_schema_flag = self.get_static_schema_flag();
+        let custom_partition = self.get_custom_partition();
+        let schema_version = self.get_schema_version();
+        let schema = self.get_schema_raw();
+        let stream_type = self.get_stream_type();
+
+        let origin_size = serde_json::to_vec(&json).unwrap().len() as u64; // string length need not be the same as byte length
+
+        json::Event::new(json)
+            .into_event(
+                self.stream_name.to_owned(),
+                origin_size,
+                &schema,
+                static_schema_flag,
+                custom_partition.as_ref(),
+                time_partition.as_ref(),
+                time_partition_limit,
+                schema_version,
+                log_source,
+                stream_type,
+            )?
+            .process()?;
+
+        Ok(())
+    }
+
     // Concatenates record batches and puts them in memory store for each event.
     pub fn push(
         &self,

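One detail in the new `push_logs`: `origin_size` is taken from the serialized byte length rather than a character count, as the inline comment notes. A small self-contained check of why the two differ (plain `serde_json`, independent of the crate):

```rust
use serde_json::json;

fn main() {
    let value = json!({ "msg": "café" });

    // Serialized form is `{"msg":"café"}`: 14 characters but 15 bytes,
    // because `é` occupies two bytes in UTF-8.
    let bytes = serde_json::to_vec(&value).expect("serializable");
    let text = String::from_utf8(bytes.clone()).expect("valid UTF-8");

    assert_eq!(text.chars().count(), 14);
    assert_eq!(bytes.len(), 15);

    // This byte length is what `push_logs` records as `origin_size`.
    println!("origin_size = {}", bytes.len() as u64);
}
```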