Skip to content

Commit

Permalink
feat: Custom Flattening for OTEL logs, metrics and traces (#1043)
Browse files Browse the repository at this point in the history
custom flattening for OTEL data

add proto files for metrics and trace
add compiled rust files for metrics and trace protobuf files
add separate handlers for OTEL logs, metrics and traces
custom flattening added for OTEL logs and metrics

custom flattening for OTEL traces

use endpoints
`/v1/logs` for OTEL logs
`/v1/metrics` for OTEL metrics
`/v1/traces` for OTEL traces

add custom header X-P-Log-Source when using endpoint `api/v1/ingest`
`otel-logs` for OTEL logs
`otel-metrics` for OTEL metrics
`otel-traces` for OTEL traces

---------

Signed-off-by: Nikhil Sinha <[email protected]>
Co-authored-by: Devdutt Shenoi <[email protected]>
  • Loading branch information
nikhilsinhaparseable and de-sh authored Jan 6, 2025
1 parent 450bac2 commit 622b9a2
Show file tree
Hide file tree
Showing 23 changed files with 1,548 additions and 894 deletions.
150 changes: 107 additions & 43 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,10 @@ humantime-serde = "1.1"
itertools = "0.13.0"
num_cpus = "1.15"
once_cell = "1.17.1"
opentelemetry-proto = {git = "https://github.com/parseablehq/opentelemetry-rust", branch="fix-metrics-u64-serialization"}
prometheus = { version = "0.13", features = ["process"] }
rand = "0.8.5"
rdkafka = {version = "0.36.2", default-features = false, features = ["tokio"]}
rdkafka = { version = "0.36.2", default-features = false, features = ["tokio"] }
regex = "1.7.3"
relative-path = { version = "1.7", features = ["serde"] }
reqwest = { version = "0.11.27", default-features = false, features = [
Expand All @@ -80,7 +81,7 @@ serde = { version = "1.0", features = ["rc", "derive"] }
serde_json = "1.0"
static-files = "0.2"
sysinfo = "0.31.4"
thiserror = "1.0.64"
thiserror = "2.0.0"
thread-priority = "1.0.0"
tokio = { version = "1.28", default-features = false, features = [
"sync",
Expand Down
13 changes: 11 additions & 2 deletions src/event/format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use serde_json::Value;
use std::{collections::HashMap, sync::Arc};
use tracing::error;

use super::{EventFormat, Metadata, Tags};
use super::{EventFormat, LogSource, Metadata, Tags};
use crate::{
metadata::SchemaVersion,
utils::{arrow::get_field, json::flatten_json_body},
Expand All @@ -52,8 +52,17 @@ impl EventFormat for Event {
static_schema_flag: Option<&String>,
time_partition: Option<&String>,
schema_version: SchemaVersion,
log_source: &LogSource,
) -> Result<(Self::Data, Vec<Arc<Field>>, bool, Tags, Metadata), anyhow::Error> {
let data = flatten_json_body(self.data, None, None, None, schema_version, false)?;
let data = flatten_json_body(
self.data,
None,
None,
None,
schema_version,
false,
log_source,
)?;
let stream_schema = schema;

// incoming event may be a single json or a json array
Expand Down
35 changes: 35 additions & 0 deletions src/event/format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,38 @@ type Tags = String;
type Metadata = String;
type EventSchema = Vec<Arc<Field>>;

/// Source of the logs, used to perform special processing for certain sources
#[derive(Default, Debug, Clone, PartialEq, Eq)]
pub enum LogSource {
// AWS Kinesis sends logs in the format of a json array
Kinesis,
// OpenTelemetry sends logs according to the specification as explained here
// https://github.com/open-telemetry/opentelemetry-proto/tree/v1.0.0/opentelemetry/proto/logs/v1
OtelLogs,
// OpenTelemetry sends metrics according to the specification as explained here
// https://github.com/open-telemetry/opentelemetry-proto/tree/v1.0.0/opentelemetry/proto/metrics/v1
OtelMetrics,
// OpenTelemetry sends traces according to the specification as explained here
// https://github.com/open-telemetry/opentelemetry-proto/blob/v1.0.0/opentelemetry/proto/trace/v1/trace.proto
OtelTraces,
#[default]
// Json object or array; this is the variant returned by `LogSource::default()`
Json,
// Any other source name, stored verbatim as the string that was supplied
Custom(String),
}

impl From<&str> for LogSource {
    /// Converts a log-source name into the matching [`LogSource`] variant.
    ///
    /// Matching is exact and case-sensitive:
    ///
    /// | input            | result                    |
    /// |------------------|---------------------------|
    /// | `"kinesis"`      | [`LogSource::Kinesis`]     |
    /// | `"otel-logs"`    | [`LogSource::OtelLogs`]    |
    /// | `"otel-metrics"` | [`LogSource::OtelMetrics`] |
    /// | `"otel-traces"`  | [`LogSource::OtelTraces`]  |
    /// | `"json"`         | [`LogSource::Json`]        |
    /// | anything else    | [`LogSource::Custom`] holding a copy of the input |
    ///
    /// The conversion is infallible; unrecognised names are preserved rather
    /// than rejected.
    fn from(s: &str) -> Self {
        match s {
            "kinesis" => LogSource::Kinesis,
            "otel-logs" => LogSource::OtelLogs,
            "otel-metrics" => LogSource::OtelMetrics,
            "otel-traces" => LogSource::OtelTraces,
            // An explicit "json" must resolve to the same variant as
            // `LogSource::default()`; without this arm it would become
            // `Custom("json")` and compare unequal to the default source.
            "json" => LogSource::Json,
            custom => LogSource::Custom(custom.to_owned()),
        }
    }
}

// Global Trait for event format
// This trait is implemented by all the event formats
pub trait EventFormat: Sized {
Expand All @@ -54,6 +86,7 @@ pub trait EventFormat: Sized {
static_schema_flag: Option<&String>,
time_partition: Option<&String>,
schema_version: SchemaVersion,
log_source: &LogSource,
) -> Result<(Self::Data, EventSchema, bool, Tags, Metadata), AnyError>;

fn decode(data: Self::Data, schema: Arc<Schema>) -> Result<RecordBatch, AnyError>;
Expand All @@ -64,12 +97,14 @@ pub trait EventFormat: Sized {
static_schema_flag: Option<&String>,
time_partition: Option<&String>,
schema_version: SchemaVersion,
log_source: &LogSource,
) -> Result<(RecordBatch, bool), AnyError> {
let (data, mut schema, is_first, tags, metadata) = self.to_data(
storage_schema,
static_schema_flag,
time_partition,
schema_version,
log_source,
)?;

// DEFAULT_TAGS_KEY, DEFAULT_METADATA_KEY and DEFAULT_TIMESTAMP_KEY are reserved field names
Expand Down
Loading

0 comments on commit 622b9a2

Please sign in to comment.