From 622b9a255b81d22c3e75aed1cdf5994d8063d3a2 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha <131262146+nikhilsinhaparseable@users.noreply.github.com> Date: Mon, 6 Jan 2025 08:19:47 +0530 Subject: [PATCH] feat: Custom Flattening for OTEL logs, metrics and traces (#1043) custom flattening for OTEL data add proto files for metrics and trace add compiled rust files for metrics and trace protobuf files add separate handlers for OTEL logs, metrics and traces custom flattening added for OTEL logs and metrics custom flattening for OTEL traces use endpoints `/v1/logs` for OTEL logs `/v1/metrics` for OTEL metrics `/v1/traces` for OTEL traces add custom header X-P-Log-Source when using endpint `api/v1/ingest` `otel-logs` for OTEL logs `otel-metrics` for OTEL metrics `otel-traces` for OTEL traces --------- Signed-off-by: Nikhil Sinha <131262146+nikhilsinhaparseable@users.noreply.github.com> Co-authored-by: Devdutt Shenoi --- Cargo.lock | 150 +++-- Cargo.toml | 5 +- src/event/format/json.rs | 13 +- src/event/format/mod.rs | 35 ++ src/handlers/http/ingest.rs | 185 ++++++- src/handlers/http/modal/server.rs | 6 +- src/handlers/http/modal/utils/ingest_utils.rs | 55 +- .../otel/opentelemetry.proto.common.v1.rs | 95 ---- .../http/otel/opentelemetry.proto.logs.v1.rs | 291 ---------- .../otel/opentelemetry.proto.resource.v1.rs | 36 -- .../http/otel/opentelemetry/proto/README.md | 2 - .../proto/common/v1/common.proto | 81 --- .../opentelemetry/proto/logs/v1/logs.proto | 203 ------- .../proto/resource/v1/resource.proto | 37 -- src/kafka.rs | 38 +- src/lib.rs | 2 +- src/{handlers/http/otel/proto.rs => otel.rs} | 24 +- src/otel/logs.rs | 162 ++++++ src/otel/metrics.rs | 524 ++++++++++++++++++ src/otel/otel_utils.rs | 162 ++++++ src/otel/traces.rs | 304 ++++++++++ src/utils/header_parsing.rs | 2 + src/utils/json/mod.rs | 30 +- 23 files changed, 1548 insertions(+), 894 deletions(-) delete mode 100644 src/handlers/http/otel/opentelemetry.proto.common.v1.rs delete mode 100644 
src/handlers/http/otel/opentelemetry.proto.logs.v1.rs delete mode 100644 src/handlers/http/otel/opentelemetry.proto.resource.v1.rs delete mode 100644 src/handlers/http/otel/opentelemetry/proto/README.md delete mode 100644 src/handlers/http/otel/opentelemetry/proto/common/v1/common.proto delete mode 100644 src/handlers/http/otel/opentelemetry/proto/logs/v1/logs.proto delete mode 100644 src/handlers/http/otel/opentelemetry/proto/resource/v1/resource.proto rename src/{handlers/http/otel/proto.rs => otel.rs} (64%) create mode 100644 src/otel/logs.rs create mode 100644 src/otel/metrics.rs create mode 100644 src/otel/otel_utils.rs create mode 100644 src/otel/traces.rs diff --git a/Cargo.lock b/Cargo.lock index 32fffdbaf..053897936 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -81,7 +81,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e01ed3140b2f8d422c68afa1ed2e85d996ea619c988ac834d255db32138655cb" dependencies = [ "quote", - "syn 2.0.79", + "syn 2.0.87", ] [[package]] @@ -218,7 +218,7 @@ dependencies = [ "actix-router", "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.87", ] [[package]] @@ -248,7 +248,7 @@ dependencies = [ "pin-project", "prometheus", "quanta", - "thiserror", + "thiserror 1.0.64", ] [[package]] @@ -712,7 +712,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.87", ] [[package]] @@ -723,7 +723,7 @@ checksum = "a27b8a3a6e1a44fa4c8baf1f653e4172e81486d4941f2237e20dc2d0cf4ddff1" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.87", ] [[package]] @@ -990,7 +990,7 @@ dependencies = [ "semver", "serde", "serde_json", - "thiserror", + "thiserror 1.0.64", ] [[package]] @@ -1106,7 +1106,7 @@ dependencies = [ "heck 0.5.0", "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.87", ] [[package]] @@ -1336,7 +1336,7 @@ dependencies = [ "proc-macro2", "quote", "strsim", - "syn 2.0.79", + "syn 2.0.87", ] [[package]] @@ 
-1347,7 +1347,7 @@ checksum = "a668eda54683121533a393014d8692171709ff57a7d61f187b6e782719f8933f" dependencies = [ "darling_core", "quote", - "syn 2.0.79", + "syn 2.0.87", ] [[package]] @@ -1776,7 +1776,7 @@ checksum = "67e77553c4162a157adbf834ebae5b415acbecbeafc7a74b0e886657506a7611" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.87", ] [[package]] @@ -1789,7 +1789,7 @@ dependencies = [ "proc-macro2", "quote", "rustc_version", - "syn 2.0.79", + "syn 2.0.87", ] [[package]] @@ -1811,7 +1811,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.87", ] [[package]] @@ -1978,7 +1978,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.87", ] [[package]] @@ -2938,7 +2938,7 @@ dependencies = [ "proc-macro-crate", "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.87", ] [[package]] @@ -3009,7 +3009,7 @@ dependencies = [ "reqwest 0.12.8", "serde", "serde_json", - "thiserror", + "thiserror 1.0.64", "url", "validator", ] @@ -3020,6 +3020,49 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" +[[package]] +name = "opentelemetry" +version = "0.27.1" +source = "git+https://github.com/parseablehq/opentelemetry-rust?branch=fix-metrics-u64-serialization#7e84c98d75ae16993a37bd5ff75a9768d652fe8f" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "pin-project-lite", + "thiserror 2.0.9", + "tracing", +] + +[[package]] +name = "opentelemetry-proto" +version = "0.27.0" +source = "git+https://github.com/parseablehq/opentelemetry-rust?branch=fix-metrics-u64-serialization#7e84c98d75ae16993a37bd5ff75a9768d652fe8f" +dependencies = [ + "hex", + "opentelemetry", + "opentelemetry_sdk", + "prost", + "serde", + "tonic", +] + +[[package]] +name = 
"opentelemetry_sdk" +version = "0.27.1" +source = "git+https://github.com/parseablehq/opentelemetry-rust?branch=fix-metrics-u64-serialization#7e84c98d75ae16993a37bd5ff75a9768d652fe8f" +dependencies = [ + "async-trait", + "futures-channel", + "futures-executor", + "futures-util", + "glob", + "opentelemetry", + "percent-encoding", + "rand", + "serde_json", + "thiserror 2.0.9", +] + [[package]] name = "ordered-float" version = "2.10.1" @@ -3161,6 +3204,7 @@ dependencies = [ "object_store", "once_cell", "openid", + "opentelemetry-proto", "parquet", "path-clean", "prometheus", @@ -3183,7 +3227,7 @@ dependencies = [ "sha2", "static-files", "sysinfo", - "thiserror", + "thiserror 2.0.9", "thread-priority", "tokio", "tokio-stream", @@ -3311,7 +3355,7 @@ checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.87", ] [[package]] @@ -3351,7 +3395,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5ac2cf0f2e4f42b49f5ffd07dae8d746508ef7526c13940e5f524012ae6c6550" dependencies = [ "proc-macro2", - "syn 2.0.79", + "syn 2.0.87", ] [[package]] @@ -3423,7 +3467,7 @@ dependencies = [ "parking_lot", "procfs", "protobuf", - "thiserror", + "thiserror 1.0.64", ] [[package]] @@ -3465,7 +3509,7 @@ dependencies = [ "prost", "prost-types", "regex", - "syn 2.0.79", + "syn 2.0.87", "tempfile", ] @@ -3479,7 +3523,7 @@ dependencies = [ "itertools 0.13.0", "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.87", ] [[package]] @@ -3536,7 +3580,7 @@ dependencies = [ "rustc-hash", "rustls 0.23.13", "socket2", - "thiserror", + "thiserror 1.0.64", "tokio", "tracing", ] @@ -3553,7 +3597,7 @@ dependencies = [ "rustc-hash", "rustls 0.23.13", "slab", - "thiserror", + "thiserror 1.0.64", "tinyvec", "tracing", ] @@ -3864,7 +3908,7 @@ dependencies = [ "regex", "relative-path", "rustc_version", - "syn 2.0.79", + "syn 2.0.87", "unicode-ident", ] @@ -4117,7 +4161,7 @@ checksum = 
"e88edab869b01783ba905e7d0153f9fc1a6505a96e4ad3018011eedb838566d9" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.87", ] [[package]] @@ -4140,7 +4184,7 @@ checksum = "6c64451ba24fc7a6a2d60fc75dd9c83c90903b19028d4eff35e88fc1e86564e9" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.87", ] [[package]] @@ -4282,7 +4326,7 @@ dependencies = [ "heck 0.5.0", "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.87", ] [[package]] @@ -4325,7 +4369,7 @@ checksum = "01b2e185515564f15375f593fb966b5718bc624ba77fe49fa4616ad619690554" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.87", ] [[package]] @@ -4370,7 +4414,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.79", + "syn 2.0.87", ] [[package]] @@ -4391,9 +4435,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.79" +version = "2.0.87" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89132cd0bf050864e1d38dc3bbc07a0eb8e7530af26344d3d2bbbef83499f590" +checksum = "25aa4ce346d03a6dcd68dd8b4010bcb74e54e62c90c573f394c46eae99aba32d" dependencies = [ "proc-macro2", "quote", @@ -4468,7 +4512,16 @@ version = "1.0.64" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d50af8abc119fb8bb6dbabcfa89656f46f84aa0ac7688088608076ad2b459a84" dependencies = [ - "thiserror-impl", + "thiserror-impl 1.0.64", +] + +[[package]] +name = "thiserror" +version = "2.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f072643fd0190df67a8bab670c20ef5d8737177d6ac6b2e9a236cb096206b2cc" +dependencies = [ + "thiserror-impl 2.0.9", ] [[package]] @@ -4479,7 +4532,18 @@ checksum = "08904e7672f5eb876eaaf87e0ce17857500934f4981c4a0ab2b4aa98baac7fc3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.87", +] + +[[package]] +name = "thiserror-impl" +version = "2.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"7b50fa271071aae2e6ee85f842e2e28ba8cd2c5fb67f11fcb1fd70b276f9e7d4" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.87", ] [[package]] @@ -4601,7 +4665,7 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.87", ] [[package]] @@ -4831,7 +4895,7 @@ checksum = "395ae124c09f9e6918a2310af6038fba074bcf474ac352496d5910dd59a2226d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.87", ] [[package]] @@ -4962,7 +5026,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f4e71ddbefed856d5881821d6ada4e606bbb91fd332296963ed596e2ad2100f3" dependencies = [ "libc", - "thiserror", + "thiserror 1.0.64", "windows 0.52.0", ] @@ -5037,7 +5101,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.87", ] [[package]] @@ -5124,7 +5188,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.87", "wasm-bindgen-shared", ] @@ -5158,7 +5222,7 @@ checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.87", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -5297,7 +5361,7 @@ checksum = "9107ddc059d5b6fbfbffdfa7a7fe3e22a226def0b2608f72e9d552763d3e1ad7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.87", ] [[package]] @@ -5308,7 +5372,7 @@ checksum = "29bee4b38ea3cde66011baa44dba677c432a78593e202392d1e9070cf2a7fca7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.87", ] [[package]] @@ -5606,7 +5670,7 @@ checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.87", ] [[package]] @@ -5628,7 +5692,7 @@ dependencies = [ "flate2", "indexmap 2.5.0", "memchr", - "thiserror", + "thiserror 1.0.64", "zopfli", ] diff --git a/Cargo.toml b/Cargo.toml index 21938aadd..d57b900ec 
100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -64,9 +64,10 @@ humantime-serde = "1.1" itertools = "0.13.0" num_cpus = "1.15" once_cell = "1.17.1" +opentelemetry-proto = {git = "https://github.com/parseablehq/opentelemetry-rust", branch="fix-metrics-u64-serialization"} prometheus = { version = "0.13", features = ["process"] } rand = "0.8.5" -rdkafka = {version = "0.36.2", default-features = false, features = ["tokio"]} +rdkafka = { version = "0.36.2", default-features = false, features = ["tokio"] } regex = "1.7.3" relative-path = { version = "1.7", features = ["serde"] } reqwest = { version = "0.11.27", default-features = false, features = [ @@ -80,7 +81,7 @@ serde = { version = "1.0", features = ["rc", "derive"] } serde_json = "1.0" static-files = "0.2" sysinfo = "0.31.4" -thiserror = "1.0.64" +thiserror = "2.0.0" thread-priority = "1.0.0" tokio = { version = "1.28", default-features = false, features = [ "sync", diff --git a/src/event/format/json.rs b/src/event/format/json.rs index 6d1cf3419..ab9116eb7 100644 --- a/src/event/format/json.rs +++ b/src/event/format/json.rs @@ -29,7 +29,7 @@ use serde_json::Value; use std::{collections::HashMap, sync::Arc}; use tracing::error; -use super::{EventFormat, Metadata, Tags}; +use super::{EventFormat, LogSource, Metadata, Tags}; use crate::{ metadata::SchemaVersion, utils::{arrow::get_field, json::flatten_json_body}, @@ -52,8 +52,17 @@ impl EventFormat for Event { static_schema_flag: Option<&String>, time_partition: Option<&String>, schema_version: SchemaVersion, + log_source: &LogSource, ) -> Result<(Self::Data, Vec>, bool, Tags, Metadata), anyhow::Error> { - let data = flatten_json_body(self.data, None, None, None, schema_version, false)?; + let data = flatten_json_body( + self.data, + None, + None, + None, + schema_version, + false, + log_source, + )?; let stream_schema = schema; // incoming event may be a single json or a json array diff --git a/src/event/format/mod.rs b/src/event/format/mod.rs index b3cb8e4dd..593e82f1e 
100644 --- a/src/event/format/mod.rs +++ b/src/event/format/mod.rs @@ -43,6 +43,38 @@ type Tags = String; type Metadata = String; type EventSchema = Vec>; +/// Source of the logs, used to perform special processing for certain sources +#[derive(Default, Debug, Clone, PartialEq, Eq)] +pub enum LogSource { + // AWS Kinesis sends logs in the format of a json array + Kinesis, + // OpenTelemetry sends logs according to the specification as explained here + // https://github.com/open-telemetry/opentelemetry-proto/tree/v1.0.0/opentelemetry/proto/logs/v1 + OtelLogs, + // OpenTelemetry sends traces according to the specification as explained here + // https://github.com/open-telemetry/opentelemetry-proto/blob/v1.0.0/opentelemetry/proto/trace/v1/trace.proto + OtelMetrics, + // OpenTelemetry sends traces according to the specification as explained here + // https://github.com/open-telemetry/opentelemetry-proto/tree/v1.0.0/opentelemetry/proto/metrics/v1 + OtelTraces, + #[default] + // Json object or array + Json, + Custom(String), +} + +impl From<&str> for LogSource { + fn from(s: &str) -> Self { + match s { + "kinesis" => LogSource::Kinesis, + "otel-logs" => LogSource::OtelLogs, + "otel-metrics" => LogSource::OtelMetrics, + "otel-traces" => LogSource::OtelTraces, + custom => LogSource::Custom(custom.to_owned()), + } + } +} + // Global Trait for event format // This trait is implemented by all the event formats pub trait EventFormat: Sized { @@ -54,6 +86,7 @@ pub trait EventFormat: Sized { static_schema_flag: Option<&String>, time_partition: Option<&String>, schema_version: SchemaVersion, + log_source: &LogSource, ) -> Result<(Self::Data, EventSchema, bool, Tags, Metadata), AnyError>; fn decode(data: Self::Data, schema: Arc) -> Result; @@ -64,12 +97,14 @@ pub trait EventFormat: Sized { static_schema_flag: Option<&String>, time_partition: Option<&String>, schema_version: SchemaVersion, + log_source: &LogSource, ) -> Result<(RecordBatch, bool), AnyError> { let (data, mut schema, 
is_first, tags, metadata) = self.to_data( storage_schema, static_schema_flag, time_partition, schema_version, + log_source, )?; // DEFAULT_TAGS_KEY, DEFAULT_METADATA_KEY and DEFAULT_TIMESTAMP_KEY are reserved field names diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index ad6fd0089..3b5c327c7 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -20,16 +20,20 @@ use super::logstream::error::{CreateStreamError, StreamError}; use super::modal::utils::ingest_utils::{flatten_and_push_logs, push_logs}; use super::users::dashboards::DashboardError; use super::users::filters::FiltersError; +use crate::event::format::LogSource; use crate::event::{ self, error::EventError, format::{self, EventFormat}, }; use crate::handlers::http::modal::utils::logstream_utils::create_stream_and_schema_from_storage; -use crate::handlers::STREAM_NAME_HEADER_KEY; +use crate::handlers::{LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY}; use crate::metadata::error::stream_info::MetadataError; use crate::metadata::{SchemaVersion, STREAM_INFO}; use crate::option::{Mode, CONFIG}; +use crate::otel::logs::flatten_otel_logs; +use crate::otel::metrics::flatten_otel_metrics; +use crate::otel::traces::flatten_otel_traces; use crate::storage::{ObjectStorageError, StreamType}; use crate::utils::header_parsing::ParseHeaderError; use actix_web::{http::header::ContentType, HttpRequest, HttpResponse}; @@ -38,6 +42,10 @@ use arrow_schema::Schema; use bytes::Bytes; use chrono::Utc; use http::StatusCode; +use nom::AsBytes; +use opentelemetry_proto::tonic::logs::v1::LogsData; +use opentelemetry_proto::tonic::metrics::v1::MetricsData; +use opentelemetry_proto::tonic::trace::v1::TracesData; use serde_json::Value; use std::collections::HashMap; use std::sync::Arc; @@ -85,7 +93,13 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result< metadata: String::default(), }; // For internal streams, use old schema - event.into_recordbatch(&schema, None, None, 
SchemaVersion::V0)? + event.into_recordbatch( + &schema, + None, + None, + SchemaVersion::V0, + &LogSource::default(), + )? }; event::Event { rb, @@ -106,21 +120,102 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result< // Handler for POST /v1/logs to ingest OTEL logs // ingests events by extracting stream name from header // creates if stream does not exist -pub async fn handle_otel_ingestion( +pub async fn handle_otel_logs_ingestion( req: HttpRequest, body: Bytes, ) -> Result { - if let Some((_, stream_name)) = req - .headers() - .iter() - .find(|&(key, _)| key == STREAM_NAME_HEADER_KEY) - { - let stream_name = stream_name.to_str().unwrap(); - create_stream_if_not_exists(stream_name, &StreamType::UserDefined.to_string()).await?; - push_logs(stream_name, &req, &body).await?; - } else { + let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else { return Err(PostError::Header(ParseHeaderError::MissingStreamName)); + }; + + let Some(log_source) = req.headers().get(LOG_SOURCE_KEY) else { + return Err(PostError::Header(ParseHeaderError::MissingLogSource)); + }; + let log_source = LogSource::from(log_source.to_str().unwrap()); + if log_source != LogSource::OtelLogs { + return Err(PostError::Invalid(anyhow::anyhow!( + "Please use x-p-log-source: otel-logs for ingesting otel logs" + ))); + } + + let stream_name = stream_name.to_str().unwrap().to_owned(); + create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?; + + //custom flattening required for otel logs + let logs: LogsData = serde_json::from_slice(body.as_bytes())?; + let mut json = flatten_otel_logs(&logs); + for record in json.iter_mut() { + let body: Bytes = serde_json::to_vec(record).unwrap().into(); + push_logs(&stream_name, &req, &body, &log_source).await?; + } + + Ok(HttpResponse::Ok().finish()) +} + +// Handler for POST /v1/metrics to ingest OTEL metrics +// ingests events by extracting stream name from header +// creates if stream 
does not exist +pub async fn handle_otel_metrics_ingestion( + req: HttpRequest, + body: Bytes, +) -> Result { + let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else { + return Err(PostError::Header(ParseHeaderError::MissingStreamName)); + }; + let Some(log_source) = req.headers().get(LOG_SOURCE_KEY) else { + return Err(PostError::Header(ParseHeaderError::MissingLogSource)); + }; + let log_source = LogSource::from(log_source.to_str().unwrap()); + if log_source != LogSource::OtelMetrics { + return Err(PostError::Invalid(anyhow::anyhow!( + "Please use x-p-log-source: otel-metrics for ingesting otel metrics" + ))); + } + let stream_name = stream_name.to_str().unwrap().to_owned(); + create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?; + + //custom flattening required for otel metrics + let metrics: MetricsData = serde_json::from_slice(body.as_bytes())?; + let mut json = flatten_otel_metrics(metrics); + for record in json.iter_mut() { + let body: Bytes = serde_json::to_vec(record).unwrap().into(); + push_logs(&stream_name, &req, &body, &log_source).await?; + } + + Ok(HttpResponse::Ok().finish()) +} + +// Handler for POST /v1/traces to ingest OTEL traces +// ingests events by extracting stream name from header +// creates if stream does not exist +pub async fn handle_otel_traces_ingestion( + req: HttpRequest, + body: Bytes, +) -> Result { + let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else { + return Err(PostError::Header(ParseHeaderError::MissingStreamName)); + }; + + let Some(log_source) = req.headers().get(LOG_SOURCE_KEY) else { + return Err(PostError::Header(ParseHeaderError::MissingLogSource)); + }; + let log_source = LogSource::from(log_source.to_str().unwrap()); + if log_source != LogSource::OtelTraces { + return Err(PostError::Invalid(anyhow::anyhow!( + "Please use x-p-log-source: otel-traces for ingesting otel traces" + ))); + } + let stream_name = stream_name.to_str().unwrap().to_owned(); 
+ create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?; + + //custom flattening required for otel traces + let traces: TracesData = serde_json::from_slice(body.as_bytes())?; + let mut json = flatten_otel_traces(&traces); + for record in json.iter_mut() { + let body: Bytes = serde_json::to_vec(record).unwrap().into(); + push_logs(&stream_name, &req, &body, &log_source).await?; } + Ok(HttpResponse::Ok().finish()) } @@ -280,7 +375,7 @@ mod tests { use serde_json::json; use crate::{ - event, + event::{self, format::LogSource}, handlers::{http::modal::utils::ingest_utils::into_event_batch, PREFIX_META, PREFIX_TAGS}, metadata::SchemaVersion, }; @@ -329,6 +424,7 @@ mod tests { None, None, SchemaVersion::V0, + &LogSource::default(), ) .unwrap(); @@ -379,6 +475,7 @@ mod tests { None, None, SchemaVersion::V0, + &LogSource::default(), ) .unwrap(); @@ -412,7 +509,16 @@ mod tests { let req = TestRequest::default().to_http_request(); - let (rb, _) = into_event_batch(&req, &json, schema, None, None, SchemaVersion::V0).unwrap(); + let (rb, _) = into_event_batch( + &req, + &json, + schema, + None, + None, + SchemaVersion::V0, + &LogSource::default(), + ) + .unwrap(); assert_eq!(rb.num_rows(), 1); assert_eq!(rb.num_columns(), 5); @@ -444,7 +550,16 @@ mod tests { let req = TestRequest::default().to_http_request(); - assert!(into_event_batch(&req, &json, schema, None, None, SchemaVersion::V0).is_err()); + assert!(into_event_batch( + &req, + &json, + schema, + None, + None, + SchemaVersion::V0, + &LogSource::default() + ) + .is_err()); } #[test] @@ -462,7 +577,16 @@ mod tests { let req = TestRequest::default().to_http_request(); - let (rb, _) = into_event_batch(&req, &json, schema, None, None, SchemaVersion::V0).unwrap(); + let (rb, _) = into_event_batch( + &req, + &json, + schema, + None, + None, + SchemaVersion::V0, + &LogSource::default(), + ) + .unwrap(); assert_eq!(rb.num_rows(), 1); assert_eq!(rb.num_columns(), 3); @@ -480,7 +604,8 @@ mod tests { 
HashMap::default(), None, None, - SchemaVersion::V0 + SchemaVersion::V0, + &LogSource::default() ) .is_err()) } @@ -512,6 +637,7 @@ mod tests { None, None, SchemaVersion::V0, + &LogSource::default(), ) .unwrap(); @@ -568,6 +694,7 @@ mod tests { None, None, SchemaVersion::V0, + &LogSource::default(), ) .unwrap(); @@ -617,7 +744,16 @@ mod tests { ); let req = TestRequest::default().to_http_request(); - let (rb, _) = into_event_batch(&req, &json, schema, None, None, SchemaVersion::V0).unwrap(); + let (rb, _) = into_event_batch( + &req, + &json, + schema, + None, + None, + SchemaVersion::V0, + &LogSource::default(), + ) + .unwrap(); assert_eq!(rb.num_rows(), 3); assert_eq!(rb.num_columns(), 6); @@ -666,7 +802,16 @@ mod tests { .into_iter(), ); - assert!(into_event_batch(&req, &json, schema, None, None, SchemaVersion::V0).is_err()); + assert!(into_event_batch( + &req, + &json, + schema, + None, + None, + SchemaVersion::V0, + &LogSource::default() + ) + .is_err()); } #[test] @@ -701,6 +846,7 @@ mod tests { None, None, SchemaVersion::V0, + &LogSource::default(), ) .unwrap(); @@ -781,6 +927,7 @@ mod tests { None, None, SchemaVersion::V1, + &LogSource::default(), ) .unwrap(); diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index d50edb96e..34a3d00af 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -438,7 +438,7 @@ impl Server { web::resource("/logs") .route( web::post() - .to(ingest::handle_otel_ingestion) + .to(ingest::handle_otel_logs_ingestion) .authorize_for_stream(Action::Ingest), ) .app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)), @@ -447,7 +447,7 @@ impl Server { web::resource("/metrics") .route( web::post() - .to(ingest::handle_otel_ingestion) + .to(ingest::handle_otel_metrics_ingestion) .authorize_for_stream(Action::Ingest), ) .app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)), @@ -456,7 +456,7 @@ impl Server { web::resource("/traces") .route( 
web::post() - .to(ingest::handle_otel_ingestion) + .to(ingest::handle_otel_traces_ingestion) .authorize_for_stream(Action::Ingest), ) .app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)), diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs index 4ea637c00..31cff6d9f 100644 --- a/src/handlers/http/modal/utils/ingest_utils.rs +++ b/src/handlers/http/modal/utils/ingest_utils.rs @@ -16,23 +16,22 @@ * */ -use std::{collections::HashMap, sync::Arc}; - use actix_web::HttpRequest; use anyhow::anyhow; use arrow_schema::Field; use bytes::Bytes; use chrono::{DateTime, NaiveDateTime, Utc}; use serde_json::Value; +use std::{collections::HashMap, sync::Arc}; use crate::{ event::{ self, - format::{self, EventFormat}, + format::{self, EventFormat, LogSource}, }, handlers::{ http::{ingest::PostError, kinesis}, - LOG_SOURCE_KEY, LOG_SOURCE_KINESIS, PREFIX_META, PREFIX_TAGS, SEPARATOR, + LOG_SOURCE_KEY, PREFIX_META, PREFIX_TAGS, SEPARATOR, }, metadata::{SchemaVersion, STREAM_INFO}, storage::StreamType, @@ -47,16 +46,26 @@ pub async fn flatten_and_push_logs( let log_source = req .headers() .get(LOG_SOURCE_KEY) - .map(|header| header.to_str().unwrap_or_default()) + .map(|h| h.to_str().unwrap_or("")) + .map(LogSource::from) .unwrap_or_default(); - if log_source == LOG_SOURCE_KINESIS { - let json = kinesis::flatten_kinesis_logs(&body); - for record in json.iter() { - let body: Bytes = serde_json::to_vec(record).unwrap().into(); - push_logs(stream_name, &req, &body).await?; + + match log_source { + LogSource::Kinesis => { + let json = kinesis::flatten_kinesis_logs(&body); + for record in json.iter() { + let body: Bytes = serde_json::to_vec(record).unwrap().into(); + push_logs(stream_name, &req, &body, &LogSource::default()).await?; + } + } + LogSource::OtelLogs | LogSource::OtelMetrics | LogSource::OtelTraces => { + return Err(PostError::Invalid(anyhow!( + "Please use endpoints `/v1/logs` for otel logs, `/v1/metrics` 
for otel metrics and `/v1/traces` for otel traces" + ))); + } + _ => { + push_logs(stream_name, &req, &body, &log_source).await?; } - } else { - push_logs(stream_name, &req, &body).await?; } Ok(()) } @@ -65,6 +74,7 @@ pub async fn push_logs( stream_name: &str, req: &HttpRequest, body: &Bytes, + log_source: &LogSource, ) -> Result<(), PostError> { let time_partition = STREAM_INFO.get_time_partition(stream_name)?; let time_partition_limit = STREAM_INFO.get_time_partition_limit(stream_name)?; @@ -88,6 +98,7 @@ pub async fn push_logs( &HashMap::new(), size, schema_version, + log_source, ) .await?; } else { @@ -97,6 +108,7 @@ pub async fn push_logs( None, custom_partition.as_ref(), schema_version, + log_source, )?; let custom_partition = custom_partition.unwrap(); let custom_partition_list = custom_partition.split(',').collect::>(); @@ -116,6 +128,7 @@ pub async fn push_logs( &custom_partition_values, size, schema_version, + log_source, ) .await?; } @@ -127,6 +140,7 @@ pub async fn push_logs( time_partition_limit, None, schema_version, + log_source, )?; for value in data { parsed_timestamp = get_parsed_timestamp(&value, time_partition.as_ref().unwrap())?; @@ -141,6 +155,7 @@ pub async fn push_logs( &HashMap::new(), size, schema_version, + log_source, ) .await?; } @@ -151,6 +166,7 @@ pub async fn push_logs( time_partition_limit, custom_partition.as_ref(), schema_version, + log_source, )?; let custom_partition = custom_partition.unwrap(); let custom_partition_list = custom_partition.split(',').collect::>(); @@ -171,6 +187,7 @@ pub async fn push_logs( &custom_partition_values, size, schema_version, + log_source, ) .await?; } @@ -190,6 +207,7 @@ pub async fn create_process_record_batch( custom_partition_values: &HashMap, origin_size: u64, schema_version: SchemaVersion, + log_source: &LogSource, ) -> Result<(), PostError> { let (rb, is_first_event) = get_stream_schema( stream_name, @@ -198,6 +216,7 @@ pub async fn create_process_record_batch( static_schema_flag, 
time_partition, schema_version, + log_source, )?; event::Event { rb, @@ -223,6 +242,7 @@ pub fn get_stream_schema( static_schema_flag: Option<&String>, time_partition: Option<&String>, schema_version: SchemaVersion, + log_source: &LogSource, ) -> Result<(arrow_array::RecordBatch, bool), PostError> { let hash_map = STREAM_INFO.read().unwrap(); let schema = hash_map @@ -237,6 +257,7 @@ pub fn get_stream_schema( static_schema_flag, time_partition, schema_version, + log_source, ) } @@ -247,6 +268,7 @@ pub fn into_event_batch( static_schema_flag: Option<&String>, time_partition: Option<&String>, schema_version: SchemaVersion, + log_source: &LogSource, ) -> Result<(arrow_array::RecordBatch, bool), PostError> { let tags = collect_labelled_headers(req, PREFIX_TAGS, SEPARATOR)?; let metadata = collect_labelled_headers(req, PREFIX_META, SEPARATOR)?; @@ -255,8 +277,13 @@ pub fn into_event_batch( tags, metadata, }; - let (rb, is_first) = - event.into_recordbatch(&schema, static_schema_flag, time_partition, schema_version)?; + let (rb, is_first) = event.into_recordbatch( + &schema, + static_schema_flag, + time_partition, + schema_version, + log_source, + )?; Ok((rb, is_first)) } diff --git a/src/handlers/http/otel/opentelemetry.proto.common.v1.rs b/src/handlers/http/otel/opentelemetry.proto.common.v1.rs deleted file mode 100644 index bc40d0720..000000000 --- a/src/handlers/http/otel/opentelemetry.proto.common.v1.rs +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Parseable Server (C) 2022 - 2024 Parseable, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * - */ - - // This file was generated by protoc-gen-rust-protobuf. The file was edited after the generation. - // All the repeated fields were changed to Option> and the `oneof` fields were changed to Option. - - use serde::{Deserialize, Serialize}; - #[derive(Serialize, Deserialize, Debug, Clone)] - /// AnyValue is used to represent any type of attribute value. AnyValue may contain a - /// primitive value such as a string or integer or it may contain an arbitrary nested - /// object containing arrays, key-value lists and primitives. - pub struct AnyValue { - /// The value is one of the listed fields. It is valid for all values to be unspecified - /// in which case this AnyValue is considered to be "empty". - pub value: Value, - } - - #[derive(Serialize, Deserialize, Debug, Clone)] - pub struct Value { - #[serde(rename = "stringValue")] - pub str_val: Option, - #[serde(rename = "boolValue")] - pub bool_val: Option, - #[serde(rename = "intValue")] - pub int_val: Option, - #[serde(rename = "doubleValue")] - pub double_val: Option, - #[serde(rename = "arrayValue")] - pub array_val: Option, - #[serde(rename = "keyVauleList")] - pub kv_list_val: Option, - #[serde(rename = "bytesValue")] - pub bytes_val: Option, - } - - #[derive(Serialize, Deserialize, Debug, Clone)] - /// ArrayValue is a list of AnyValue messages. We need ArrayValue as a message - /// since oneof in AnyValue does not allow repeated fields. - pub struct ArrayValue { - /// Array of values. The array may be empty (contain 0 elements). - pub values: Vec, - } - - #[derive(Serialize, Deserialize, Debug, Clone)] - /// KeyValueList is a list of KeyValue messages. We need KeyValueList as a message - /// since `oneof` in AnyValue does not allow repeated fields. 
Everywhere else where we need - /// a list of KeyValue messages (e.g. in Span) we use `repeated KeyValue` directly to - /// avoid unnecessary extra wrapping (which slows down the protocol). The 2 approaches - /// are semantically equivalent. - pub struct KeyValueList { - /// A collection of key/value pairs of key-value pairs. The list may be empty (may - /// contain 0 elements). - /// The keys MUST be unique (it is not allowed to have more than one - /// value with the same key). - pub values: Vec, - } - - #[derive(Serialize, Deserialize, Debug, Clone)] - /// KeyValue is a key-value pair that is used to store Span attributes, Link - /// attributes, etc. - pub struct KeyValue { - pub key: String, - pub value: Option, - } - - #[derive(Serialize, Deserialize, Debug)] - /// InstrumentationScope is a message representing the instrumentation scope information - /// such as the fully qualified name and version. - pub struct InstrumentationScope { - /// An empty instrumentation scope name means the name is unknown. - pub name: Option, - pub version: Option, - /// Additional attributes that describe the scope. \[Optional\]. - /// Attribute keys MUST be unique (it is not allowed to have more than one - /// attribute with the same key). - pub attributes: Option>, - #[serde(rename = "droppedAttributesCount")] - pub dropped_attributes_count: Option, - } - \ No newline at end of file diff --git a/src/handlers/http/otel/opentelemetry.proto.logs.v1.rs b/src/handlers/http/otel/opentelemetry.proto.logs.v1.rs deleted file mode 100644 index dc63286e3..000000000 --- a/src/handlers/http/otel/opentelemetry.proto.logs.v1.rs +++ /dev/null @@ -1,291 +0,0 @@ -/* - * Parseable Server (C) 2022 - 2024 Parseable, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. 
- * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * - */ - -// This file was generated by protoc-gen-rust-protobuf. The file was edited after the generation. - // All the repeated fields were changed to Option>. - - use crate::handlers::http::otel::proto::common::v1::InstrumentationScope; - use crate::handlers::http::otel::proto::common::v1::KeyValue; - use crate::handlers::http::otel::proto::common::v1::Value; - use crate::handlers::http::otel::proto::resource::v1::Resource; - use serde::{Deserialize, Serialize}; - - #[derive(Serialize, Deserialize, Debug)] - /// LogsData represents the logs data that can be stored in a persistent storage, - /// OR can be embedded by other protocols that transfer OTLP logs data but do not - /// implement the OTLP protocol. - /// - /// The main difference between this message and collector protocol is that - /// in this message there will not be any "control" or "metadata" specific to - /// OTLP protocol. - /// - /// When new fields are added into this message, the OTLP request MUST be updated - /// as well. - pub struct LogsData { - /// An array of ResourceLogs. - /// For data coming from a single resource this array will typically contain - /// one element. Intermediary nodes that receive data from multiple origins - /// typically batch the data before forwarding further and in that case this - /// array will contain multiple elements. - #[serde(rename = "resourceLogs")] - pub resource_logs: Option>, - } - - #[derive(Serialize, Deserialize, Debug)] - /// A collection of ScopeLogs from a Resource. - pub struct ResourceLogs { - /// The resource for the logs in this message. 
- /// If this field is not set then resource info is unknown. - pub resource: Option, - /// A list of ScopeLogs that originate from a resource. - #[serde(rename = "scopeLogs")] - pub scope_logs: Option>, - /// This schema_url applies to the data in the "resource" field. It does not apply - /// to the data in the "scope_logs" field which have their own schema_url field. - #[serde(rename = "schemaUrl")] - pub schema_url: Option, - } - - #[derive(Serialize, Deserialize, Debug)] - /// A collection of Logs produced by a Scope. - pub struct ScopeLogs { - /// The instrumentation scope information for the logs in this message. - /// Semantically when InstrumentationScope isn't set, it is equivalent with - /// an empty instrumentation scope name (unknown). - pub scope: Option, - /// A list of log records. - #[serde(rename = "logRecords")] - pub log_records: Vec, - /// This schema_url applies to all logs in the "logs" field. - #[serde(rename = "schemaUrl")] - pub schema_url: Option, - } - - #[derive(Serialize, Deserialize, Debug)] - /// A log record according to OpenTelemetry Log Data Model: - /// - pub struct LogRecord { - /// time_unix_nano is the time when the event occurred. - /// Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January 1970. - /// Value of 0 indicates unknown or missing timestamp. - #[serde(rename = "timeUnixNano")] - pub time_unix_nano: Option, - /// Time when the event was observed by the collection system. - /// For events that originate in OpenTelemetry (e.g. using OpenTelemetry Logging SDK) - /// this timestamp is typically set at the generation time and is equal to Timestamp. - /// For events originating externally and collected by OpenTelemetry (e.g. using - /// Collector) this is the time when OpenTelemetry's code observed the event measured - /// by the clock of the OpenTelemetry code. This field MUST be set once the event is - /// observed by OpenTelemetry. 
- /// - /// For converting OpenTelemetry log data to formats that support only one timestamp or - /// when receiving OpenTelemetry log data by recipients that support only one timestamp - /// internally the following logic is recommended: - /// - Use time_unix_nano if it is present, otherwise use observed_time_unix_nano. - /// - /// Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January 1970. - /// Value of 0 indicates unknown or missing timestamp. - #[serde(rename = "observedTimeUnixNano")] - pub observed_time_unix_nano: Option, - /// Numerical value of the severity, normalized to values described in Log Data Model. - /// \[Optional\]. - #[serde(rename = "severityNumber")] - pub severity_number: Option, - /// The severity text (also known as log level). The original string representation as - /// it is known at the source. \[Optional\]. - #[serde(rename = "severityText")] - pub severity_text: Option, - /// A value containing the body of the log record. Can be for example a human-readable - /// string message (including multi-line) describing the event in a free form or it can - /// be a structured data composed of arrays and maps of other values. \[Optional\]. - pub body: Option, - /// Additional attributes that describe the specific event occurrence. \[Optional\]. - /// Attribute keys MUST be unique (it is not allowed to have more than one - /// attribute with the same key). - pub attributes: Option>, - #[serde(rename = "droppedAttributesCount")] - pub dropped_attributes_count: Option, - /// Flags, a bit field. 8 least significant bits are the trace flags as - /// defined in W3C Trace Context specification. 24 most significant bits are reserved - /// and must be set to 0. Readers must not assume that 24 most significant bits - /// will be zero and must correctly mask the bits when reading 8-bit trace flag (use - /// flags & LOG_RECORD_FLAGS_TRACE_FLAGS_MASK). \[Optional\]. - pub flags: Option, - /// A unique identifier for a trace. 
All logs from the same trace share - /// the same `trace_id`. The ID is a 16-byte array. An ID with all zeroes OR - /// of length other than 16 bytes is considered invalid (empty string in OTLP/JSON - /// is zero-length and thus is also invalid). - /// - /// This field is optional. - /// - /// The receivers SHOULD assume that the log record is not associated with a - /// trace if any of the following is true: - /// - the field is not present, - /// - the field contains an invalid value. - #[serde(rename = "traceId")] - pub trace_id: Option, - /// A unique identifier for a span within a trace, assigned when the span - /// is created. The ID is an 8-byte array. An ID with all zeroes OR of length - /// other than 8 bytes is considered invalid (empty string in OTLP/JSON - /// is zero-length and thus is also invalid). - /// - /// This field is optional. If the sender specifies a valid span_id then it SHOULD also - /// specify a valid trace_id. - /// - /// The receivers SHOULD assume that the log record is not associated with a - /// span if any of the following is true: - /// - the field is not present, - /// - the field contains an invalid value. - #[serde(rename = "spanId")] - pub span_id: Option, - } - /// Possible values for LogRecord.SeverityNumber. - #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] - #[repr(i32)] - pub enum SeverityNumber { - /// UNSPECIFIED is the default SeverityNumber, it MUST NOT be used. - Unspecified = 0, - Trace = 1, - Trace2 = 2, - Trace3 = 3, - Trace4 = 4, - Debug = 5, - Debug2 = 6, - Debug3 = 7, - Debug4 = 8, - Info = 9, - Info2 = 10, - Info3 = 11, - Info4 = 12, - Warn = 13, - Warn2 = 14, - Warn3 = 15, - Warn4 = 16, - Error = 17, - Error2 = 18, - Error3 = 19, - Error4 = 20, - Fatal = 21, - Fatal2 = 22, - Fatal3 = 23, - Fatal4 = 24, - } - impl SeverityNumber { - /// String value of the enum field names used in the ProtoBuf definition. 
- /// - /// The values are not transformed in any way and thus are considered stable - /// (if the ProtoBuf definition does not change) and safe for programmatic use. - pub fn as_str_name(severity_number: i32) -> &'static str { - match severity_number { - 0 => "SEVERITY_NUMBER_UNSPECIFIED", - 1 => "SEVERITY_NUMBER_TRACE", - 2 => "SEVERITY_NUMBER_TRACE2", - 3 => "SEVERITY_NUMBER_TRACE3", - 4 => "SEVERITY_NUMBER_TRACE4", - 5 => "SEVERITY_NUMBER_DEBUG", - 6 => "SEVERITY_NUMBER_DEBUG2", - 7 => "SEVERITY_NUMBER_DEBUG3", - 8 => "SEVERITY_NUMBER_DEBUG4", - 9 => "SEVERITY_NUMBER_INFO", - 10 => "SEVERITY_NUMBER_INFO2", - 11 => "SEVERITY_NUMBER_INFO3", - 12 => "SEVERITY_NUMBER_INFO4", - 13 => "SEVERITY_NUMBER_WARN", - 14 => "SEVERITY_NUMBER_WARN2", - 15 => "SEVERITY_NUMBER_WARN3", - 16 => "SEVERITY_NUMBER_WARN4", - 17 => "SEVERITY_NUMBER_ERROR", - 18 => "SEVERITY_NUMBER_ERROR2", - 19 => "SEVERITY_NUMBER_ERROR3", - 20 => "SEVERITY_NUMBER_ERROR4", - 21 => "SEVERITY_NUMBER_FATAL", - 22 => "SEVERITY_NUMBER_FATAL2", - 23 => "SEVERITY_NUMBER_FATAL3", - 24 => "SEVERITY_NUMBER_FATAL4", - _ => "Invalid severity number", - } - } - /// Creates an enum from field names used in the ProtoBuf definition. 
- pub fn from_str_name(value: &str) -> ::core::option::Option { - match value { - "SEVERITY_NUMBER_UNSPECIFIED" => Some(Self::Unspecified), - "SEVERITY_NUMBER_TRACE" => Some(Self::Trace), - "SEVERITY_NUMBER_TRACE2" => Some(Self::Trace2), - "SEVERITY_NUMBER_TRACE3" => Some(Self::Trace3), - "SEVERITY_NUMBER_TRACE4" => Some(Self::Trace4), - "SEVERITY_NUMBER_DEBUG" => Some(Self::Debug), - "SEVERITY_NUMBER_DEBUG2" => Some(Self::Debug2), - "SEVERITY_NUMBER_DEBUG3" => Some(Self::Debug3), - "SEVERITY_NUMBER_DEBUG4" => Some(Self::Debug4), - "SEVERITY_NUMBER_INFO" => Some(Self::Info), - "SEVERITY_NUMBER_INFO2" => Some(Self::Info2), - "SEVERITY_NUMBER_INFO3" => Some(Self::Info3), - "SEVERITY_NUMBER_INFO4" => Some(Self::Info4), - "SEVERITY_NUMBER_WARN" => Some(Self::Warn), - "SEVERITY_NUMBER_WARN2" => Some(Self::Warn2), - "SEVERITY_NUMBER_WARN3" => Some(Self::Warn3), - "SEVERITY_NUMBER_WARN4" => Some(Self::Warn4), - "SEVERITY_NUMBER_ERROR" => Some(Self::Error), - "SEVERITY_NUMBER_ERROR2" => Some(Self::Error2), - "SEVERITY_NUMBER_ERROR3" => Some(Self::Error3), - "SEVERITY_NUMBER_ERROR4" => Some(Self::Error4), - "SEVERITY_NUMBER_FATAL" => Some(Self::Fatal), - "SEVERITY_NUMBER_FATAL2" => Some(Self::Fatal2), - "SEVERITY_NUMBER_FATAL3" => Some(Self::Fatal3), - "SEVERITY_NUMBER_FATAL4" => Some(Self::Fatal4), - _ => None, - } - } - } - /// LogRecordFlags is defined as a protobuf 'uint32' type and is to be used as - /// bit-fields. Each non-zero value defined in this enum is a bit-mask. - /// To extract the bit-field, for example, use an expression like: - /// - /// (logRecord.flags & LOG_RECORD_FLAGS_TRACE_FLAGS_MASK) - /// - #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] - #[repr(i32)] - pub enum LogRecordFlags { - /// The zero value for the enum. Should not be used for comparisons. - /// Instead use bitwise "and" with the appropriate mask as shown above. - DoNotUse = 0, - /// Bits 0-7 are used for trace flags. 
- TraceFlagsMask = 255, - } - impl LogRecordFlags { - /// String value of the enum field names used in the ProtoBuf definition. - /// - /// The values are not transformed in any way and thus are considered stable - /// (if the ProtoBuf definition does not change) and safe for programmatic use. - pub fn as_str_name(flag: u32) -> &'static str { - match flag { - 0 => "LOG_RECORD_FLAGS_DO_NOT_USE", - 255 => "LOG_RECORD_FLAGS_TRACE_FLAGS_MASK", - _ => "Invalid flag", - } - } - /// Creates an enum from field names used in the ProtoBuf definition. - pub fn from_str_name(value: &str) -> ::core::option::Option { - match value { - "LOG_RECORD_FLAGS_DO_NOT_USE" => Some(Self::DoNotUse), - "LOG_RECORD_FLAGS_TRACE_FLAGS_MASK" => Some(Self::TraceFlagsMask), - _ => None, - } - } - } - \ No newline at end of file diff --git a/src/handlers/http/otel/opentelemetry.proto.resource.v1.rs b/src/handlers/http/otel/opentelemetry.proto.resource.v1.rs deleted file mode 100644 index 2f102628a..000000000 --- a/src/handlers/http/otel/opentelemetry.proto.resource.v1.rs +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Parseable Server (C) 2022 - 2024 Parseable, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * - */ -// This file was generated by protoc-gen-rust-protobuf. The file was edited after the generation. 
-// All the repeated fields were changed to Option> - -use crate::handlers::http::otel::proto::common::v1::KeyValue; -use serde::{Deserialize, Serialize}; - -#[derive(Serialize, Deserialize, Debug)] -/// Resource information. -pub struct Resource { - /// Set of attributes that describe the resource. - /// Attribute keys MUST be unique (it is not allowed to have more than one - /// attribute with the same key). - #[serde(rename = "attributes")] - pub attributes: Option>, - /// dropped_attributes_count is the number of dropped attributes. If the value is 0, then - /// no attributes were dropped. - #[serde(rename = "droppedAttributesCount")] - pub dropped_attributes_count: Option, -} diff --git a/src/handlers/http/otel/opentelemetry/proto/README.md b/src/handlers/http/otel/opentelemetry/proto/README.md deleted file mode 100644 index d0281330e..000000000 --- a/src/handlers/http/otel/opentelemetry/proto/README.md +++ /dev/null @@ -1,2 +0,0 @@ -The following protobuf definitions are vendored from: -https://github.com/open-telemetry/opentelemetry-proto/tree/v1.0.0/opentelemetry/proto diff --git a/src/handlers/http/otel/opentelemetry/proto/common/v1/common.proto b/src/handlers/http/otel/opentelemetry/proto/common/v1/common.proto deleted file mode 100644 index f7ee8f265..000000000 --- a/src/handlers/http/otel/opentelemetry/proto/common/v1/common.proto +++ /dev/null @@ -1,81 +0,0 @@ -// Copyright 2019, OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-// See the License for the specific language governing permissions and -// limitations under the License. - -syntax = "proto3"; - -package opentelemetry.proto.common.v1; - -option csharp_namespace = "OpenTelemetry.Proto.Common.V1"; -option java_multiple_files = true; -option java_package = "io.opentelemetry.proto.common.v1"; -option java_outer_classname = "CommonProto"; -option go_package = "go.opentelemetry.io/proto/otlp/common/v1"; - -// AnyValue is used to represent any type of attribute value. AnyValue may contain a -// primitive value such as a string or integer or it may contain an arbitrary nested -// object containing arrays, key-value lists and primitives. -message AnyValue { - // The value is one of the listed fields. It is valid for all values to be unspecified - // in which case this AnyValue is considered to be "empty". - oneof value { - string string_value = 1; - bool bool_value = 2; - int64 int_value = 3; - double double_value = 4; - ArrayValue array_value = 5; - KeyValueList kvlist_value = 6; - bytes bytes_value = 7; - } -} - -// ArrayValue is a list of AnyValue messages. We need ArrayValue as a message -// since oneof in AnyValue does not allow repeated fields. -message ArrayValue { - // Array of values. The array may be empty (contain 0 elements). - repeated AnyValue values = 1; -} - -// KeyValueList is a list of KeyValue messages. We need KeyValueList as a message -// since `oneof` in AnyValue does not allow repeated fields. Everywhere else where we need -// a list of KeyValue messages (e.g. in Span) we use `repeated KeyValue` directly to -// avoid unnecessary extra wrapping (which slows down the protocol). The 2 approaches -// are semantically equivalent. -message KeyValueList { - // A collection of key/value pairs of key-value pairs. The list may be empty (may - // contain 0 elements). - // The keys MUST be unique (it is not allowed to have more than one - // value with the same key). 
- repeated KeyValue values = 1; -} - -// KeyValue is a key-value pair that is used to store Span attributes, Link -// attributes, etc. -message KeyValue { - string key = 1; - AnyValue value = 2; -} - -// InstrumentationScope is a message representing the instrumentation scope information -// such as the fully qualified name and version. -message InstrumentationScope { - // An empty instrumentation scope name means the name is unknown. - string name = 1; - string version = 2; - - // Additional attributes that describe the scope. [Optional]. - // Attribute keys MUST be unique (it is not allowed to have more than one - // attribute with the same key). - repeated KeyValue attributes = 3; - uint32 dropped_attributes_count = 4; -} diff --git a/src/handlers/http/otel/opentelemetry/proto/logs/v1/logs.proto b/src/handlers/http/otel/opentelemetry/proto/logs/v1/logs.proto deleted file mode 100644 index 0b4b64972..000000000 --- a/src/handlers/http/otel/opentelemetry/proto/logs/v1/logs.proto +++ /dev/null @@ -1,203 +0,0 @@ -// Copyright 2020, OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -syntax = "proto3"; - -package opentelemetry.proto.logs.v1; - -import "opentelemetry/proto/common/v1/common.proto"; -import "opentelemetry/proto/resource/v1/resource.proto"; - -option csharp_namespace = "OpenTelemetry.Proto.Logs.V1"; -option java_multiple_files = true; -option java_package = "io.opentelemetry.proto.logs.v1"; -option java_outer_classname = "LogsProto"; -option go_package = "go.opentelemetry.io/proto/otlp/logs/v1"; - -// LogsData represents the logs data that can be stored in a persistent storage, -// OR can be embedded by other protocols that transfer OTLP logs data but do not -// implement the OTLP protocol. -// -// The main difference between this message and collector protocol is that -// in this message there will not be any "control" or "metadata" specific to -// OTLP protocol. -// -// When new fields are added into this message, the OTLP request MUST be updated -// as well. -message LogsData { - // An array of ResourceLogs. - // For data coming from a single resource this array will typically contain - // one element. Intermediary nodes that receive data from multiple origins - // typically batch the data before forwarding further and in that case this - // array will contain multiple elements. - repeated ResourceLogs resource_logs = 1; -} - -// A collection of ScopeLogs from a Resource. -message ResourceLogs { - reserved 1000; - - // The resource for the logs in this message. - // If this field is not set then resource info is unknown. - opentelemetry.proto.resource.v1.Resource resource = 1; - - // A list of ScopeLogs that originate from a resource. - repeated ScopeLogs scope_logs = 2; - - // This schema_url applies to the data in the "resource" field. It does not apply - // to the data in the "scope_logs" field which have their own schema_url field. - string schema_url = 3; -} - -// A collection of Logs produced by a Scope. -message ScopeLogs { - // The instrumentation scope information for the logs in this message. 
- // Semantically when InstrumentationScope isn't set, it is equivalent with - // an empty instrumentation scope name (unknown). - opentelemetry.proto.common.v1.InstrumentationScope scope = 1; - - // A list of log records. - repeated LogRecord log_records = 2; - - // This schema_url applies to all logs in the "logs" field. - string schema_url = 3; -} - -// Possible values for LogRecord.SeverityNumber. -enum SeverityNumber { - // UNSPECIFIED is the default SeverityNumber, it MUST NOT be used. - SEVERITY_NUMBER_UNSPECIFIED = 0; - SEVERITY_NUMBER_TRACE = 1; - SEVERITY_NUMBER_TRACE2 = 2; - SEVERITY_NUMBER_TRACE3 = 3; - SEVERITY_NUMBER_TRACE4 = 4; - SEVERITY_NUMBER_DEBUG = 5; - SEVERITY_NUMBER_DEBUG2 = 6; - SEVERITY_NUMBER_DEBUG3 = 7; - SEVERITY_NUMBER_DEBUG4 = 8; - SEVERITY_NUMBER_INFO = 9; - SEVERITY_NUMBER_INFO2 = 10; - SEVERITY_NUMBER_INFO3 = 11; - SEVERITY_NUMBER_INFO4 = 12; - SEVERITY_NUMBER_WARN = 13; - SEVERITY_NUMBER_WARN2 = 14; - SEVERITY_NUMBER_WARN3 = 15; - SEVERITY_NUMBER_WARN4 = 16; - SEVERITY_NUMBER_ERROR = 17; - SEVERITY_NUMBER_ERROR2 = 18; - SEVERITY_NUMBER_ERROR3 = 19; - SEVERITY_NUMBER_ERROR4 = 20; - SEVERITY_NUMBER_FATAL = 21; - SEVERITY_NUMBER_FATAL2 = 22; - SEVERITY_NUMBER_FATAL3 = 23; - SEVERITY_NUMBER_FATAL4 = 24; -} - -// LogRecordFlags is defined as a protobuf 'uint32' type and is to be used as -// bit-fields. Each non-zero value defined in this enum is a bit-mask. -// To extract the bit-field, for example, use an expression like: -// -// (logRecord.flags & LOG_RECORD_FLAGS_TRACE_FLAGS_MASK) -// -enum LogRecordFlags { - // The zero value for the enum. Should not be used for comparisons. - // Instead use bitwise "and" with the appropriate mask as shown above. - LOG_RECORD_FLAGS_DO_NOT_USE = 0; - - // Bits 0-7 are used for trace flags. - LOG_RECORD_FLAGS_TRACE_FLAGS_MASK = 0x000000FF; - - // Bits 8-31 are reserved for future use. 
-} - -// A log record according to OpenTelemetry Log Data Model: -// https://github.com/open-telemetry/oteps/blob/main/text/logs/0097-log-data-model.md -message LogRecord { - reserved 4; - - // time_unix_nano is the time when the event occurred. - // Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January 1970. - // Value of 0 indicates unknown or missing timestamp. - fixed64 time_unix_nano = 1; - - // Time when the event was observed by the collection system. - // For events that originate in OpenTelemetry (e.g. using OpenTelemetry Logging SDK) - // this timestamp is typically set at the generation time and is equal to Timestamp. - // For events originating externally and collected by OpenTelemetry (e.g. using - // Collector) this is the time when OpenTelemetry's code observed the event measured - // by the clock of the OpenTelemetry code. This field MUST be set once the event is - // observed by OpenTelemetry. - // - // For converting OpenTelemetry log data to formats that support only one timestamp or - // when receiving OpenTelemetry log data by recipients that support only one timestamp - // internally the following logic is recommended: - // - Use time_unix_nano if it is present, otherwise use observed_time_unix_nano. - // - // Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January 1970. - // Value of 0 indicates unknown or missing timestamp. - fixed64 observed_time_unix_nano = 11; - - // Numerical value of the severity, normalized to values described in Log Data Model. - // [Optional]. - SeverityNumber severity_number = 2; - - // The severity text (also known as log level). The original string representation as - // it is known at the source. [Optional]. - string severity_text = 3; - - // A value containing the body of the log record. 
Can be for example a human-readable - // string message (including multi-line) describing the event in a free form or it can - // be a structured data composed of arrays and maps of other values. [Optional]. - opentelemetry.proto.common.v1.AnyValue body = 5; - - // Additional attributes that describe the specific event occurrence. [Optional]. - // Attribute keys MUST be unique (it is not allowed to have more than one - // attribute with the same key). - repeated opentelemetry.proto.common.v1.KeyValue attributes = 6; - uint32 dropped_attributes_count = 7; - - // Flags, a bit field. 8 least significant bits are the trace flags as - // defined in W3C Trace Context specification. 24 most significant bits are reserved - // and must be set to 0. Readers must not assume that 24 most significant bits - // will be zero and must correctly mask the bits when reading 8-bit trace flag (use - // flags & LOG_RECORD_FLAGS_TRACE_FLAGS_MASK). [Optional]. - fixed32 flags = 8; - - // A unique identifier for a trace. All logs from the same trace share - // the same `trace_id`. The ID is a 16-byte array. An ID with all zeroes OR - // of length other than 16 bytes is considered invalid (empty string in OTLP/JSON - // is zero-length and thus is also invalid). - // - // This field is optional. - // - // The receivers SHOULD assume that the log record is not associated with a - // trace if any of the following is true: - // - the field is not present, - // - the field contains an invalid value. - bytes trace_id = 9; - - // A unique identifier for a span within a trace, assigned when the span - // is created. The ID is an 8-byte array. An ID with all zeroes OR of length - // other than 8 bytes is considered invalid (empty string in OTLP/JSON - // is zero-length and thus is also invalid). - // - // This field is optional. If the sender specifies a valid span_id then it SHOULD also - // specify a valid trace_id. 
- // - // The receivers SHOULD assume that the log record is not associated with a - // span if any of the following is true: - // - the field is not present, - // - the field contains an invalid value. - bytes span_id = 10; -} diff --git a/src/handlers/http/otel/opentelemetry/proto/resource/v1/resource.proto b/src/handlers/http/otel/opentelemetry/proto/resource/v1/resource.proto deleted file mode 100644 index 6637560bc..000000000 --- a/src/handlers/http/otel/opentelemetry/proto/resource/v1/resource.proto +++ /dev/null @@ -1,37 +0,0 @@ -// Copyright 2019, OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -syntax = "proto3"; - -package opentelemetry.proto.resource.v1; - -import "opentelemetry/proto/common/v1/common.proto"; - -option csharp_namespace = "OpenTelemetry.Proto.Resource.V1"; -option java_multiple_files = true; -option java_package = "io.opentelemetry.proto.resource.v1"; -option java_outer_classname = "ResourceProto"; -option go_package = "go.opentelemetry.io/proto/otlp/resource/v1"; - -// Resource information. -message Resource { - // Set of attributes that describe the resource. - // Attribute keys MUST be unique (it is not allowed to have more than one - // attribute with the same key). - repeated opentelemetry.proto.common.v1.KeyValue attributes = 1; - - // dropped_attributes_count is the number of dropped attributes. If the value is 0, then - // no attributes were dropped. 
- uint32 dropped_attributes_count = 2; -} diff --git a/src/kafka.rs b/src/kafka.rs index f65b954c6..b917eca83 100644 --- a/src/kafka.rs +++ b/src/kafka.rs @@ -33,6 +33,7 @@ use std::sync::Arc; use std::{collections::HashMap, fmt::Debug}; use tracing::{debug, error, info, warn}; +use crate::event::format::LogSource; use crate::option::CONFIG; use crate::{ event::{ @@ -90,38 +91,6 @@ pub enum KafkaError { DoNotPrintError, } -// // Commented out functions -// // Might come in handy later -// fn parse_auto_env(key: &'static str) -> Result, ::Err> -// where -// T: FromStr, -// { -// Ok(if let Ok(val) = env::var(key) { -// Some(val.parse::()?) -// } else { -// None -// }) -// } - -// fn handle_duration_env_prefix(key: &'static str) -> Result, ParseIntError> { -// if let Ok(raw_secs) = env::var(format!("{key}_S")) { -// Ok(Some(Duration::from_secs(u64::from_str(&raw_secs)?))) -// } else if let Ok(raw_secs) = env::var(format!("{key}_M")) { -// Ok(Some(Duration::from_secs(u64::from_str(&raw_secs)? * 60))) -// } else { -// Ok(None) -// } -// } - -// fn parse_i32_env(key: &'static str) -> Result, KafkaError> { -// parse_auto_env::(key).map_err(|raw| KafkaError::ParseIntError(key, raw)) -// } - -// fn parse_duration_env_prefixed(key_prefix: &'static str) -> Result, KafkaError> { -// handle_duration_env_prefix(key_prefix) -// .map_err(|raw| KafkaError::ParseDurationError(key_prefix, raw)) -// } - fn setup_consumer() -> Result<(StreamConsumer, Vec), KafkaError> { if let Some(topics) = &CONFIG.parseable.kafka_topics { // topics can be a comma separated list of topics to subscribe to @@ -147,10 +116,6 @@ fn setup_consumer() -> Result<(StreamConsumer, Vec), KafkaError> { conf.set("client.id", val); } - // if let Some(val) = get_flag_env_val("a")? 
{ - // conf.set("api.version.request", val.to_string()); - // } - if let Some(ssl_protocol) = CONFIG.parseable.kafka_security_protocol.as_ref() { conf.set("security.protocol", serde_json::to_string(&ssl_protocol)?); } @@ -234,6 +199,7 @@ async fn ingest_message(msg: BorrowedMessage<'_>) -> Result<(), KafkaError> { static_schema_flag.as_ref(), time_partition.as_ref(), schema_version, + &LogSource::default(), ) .map_err(|err| KafkaError::PostError(PostError::CustomError(err.to_string())))?; diff --git a/src/lib.rs b/src/lib.rs index 5c8e09274..951bb432a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -33,6 +33,7 @@ pub mod metrics; pub mod migration; mod oidc; pub mod option; +pub mod otel; mod query; pub mod rbac; mod response; @@ -43,7 +44,6 @@ pub mod sync; pub mod users; mod utils; mod validator; - pub use handlers::http::modal::{ ingest_server::IngestServer, query_server::QueryServer, server::Server, ParseableServer, }; diff --git a/src/handlers/http/otel/proto.rs b/src/otel.rs similarity index 64% rename from src/handlers/http/otel/proto.rs rename to src/otel.rs index 9322bfcc5..11f98d89e 100644 --- a/src/handlers/http/otel/proto.rs +++ b/src/otel.rs @@ -16,23 +16,7 @@ * */ -/// Common types used across all event types. -pub mod common { - pub mod v1 { - include!("opentelemetry.proto.common.v1.rs"); - } -} - -/// Generated types used for logs. -pub mod logs { - pub mod v1 { - include!("opentelemetry.proto.logs.v1.rs"); - } -} - -/// Generated types used in resources. -pub mod resource { - pub mod v1 { - include!("opentelemetry.proto.resource.v1.rs"); - } -} +pub mod logs; +pub mod metrics; +pub mod otel_utils; +pub mod traces; diff --git a/src/otel/logs.rs b/src/otel/logs.rs new file mode 100644 index 000000000..fcdffe1af --- /dev/null +++ b/src/otel/logs.rs @@ -0,0 +1,162 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. 
+ * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use opentelemetry_proto::tonic::logs::v1::LogRecord; +use opentelemetry_proto::tonic::logs::v1::LogsData; +use opentelemetry_proto::tonic::logs::v1::ScopeLogs; +use opentelemetry_proto::tonic::logs::v1::SeverityNumber; +use serde_json::Value; +use std::collections::BTreeMap; + +use super::otel_utils::collect_json_from_values; +use super::otel_utils::convert_epoch_nano_to_timestamp; +use super::otel_utils::insert_attributes; + +/// otel log event has severity number +/// there is a mapping of severity number to severity text provided in proto +/// this function fetches the severity text from the severity number +/// and adds it to the flattened json +fn flatten_severity(severity_number: i32) -> BTreeMap { + let mut severity_json: BTreeMap = BTreeMap::new(); + severity_json.insert( + "severity_number".to_string(), + Value::Number(severity_number.into()), + ); + let severity = SeverityNumber::try_from(severity_number).unwrap(); + severity_json.insert( + "severity_text".to_string(), + Value::String(severity.as_str_name().to_string()), + ); + severity_json +} + +/// this function flattens the `LogRecord` object +/// and returns a `BTreeMap` of the flattened json +/// this function is called recursively for each log record object in the otel logs +pub fn flatten_log_record(log_record: &LogRecord) -> BTreeMap { + let mut 
log_record_json: BTreeMap = BTreeMap::new(); + log_record_json.insert( + "time_unix_nano".to_string(), + Value::String(convert_epoch_nano_to_timestamp( + log_record.time_unix_nano as i64, + )), + ); + log_record_json.insert( + "observed_time_unix_nano".to_string(), + Value::String(convert_epoch_nano_to_timestamp( + log_record.observed_time_unix_nano as i64, + )), + ); + + log_record_json.extend(flatten_severity(log_record.severity_number)); + + if log_record.body.is_some() { + let body = &log_record.body; + let body_json = collect_json_from_values(body, &"body".to_string()); + for key in body_json.keys() { + log_record_json.insert(key.to_owned(), body_json[key].to_owned()); + } + } + insert_attributes(&mut log_record_json, &log_record.attributes); + log_record_json.insert( + "log_record_dropped_attributes_count".to_string(), + Value::Number(log_record.dropped_attributes_count.into()), + ); + + log_record_json.insert( + "flags".to_string(), + Value::Number((log_record.flags).into()), + ); + log_record_json.insert( + "span_id".to_string(), + Value::String(hex::encode(&log_record.span_id)), + ); + log_record_json.insert( + "trace_id".to_string(), + Value::String(hex::encode(&log_record.trace_id)), + ); + + log_record_json +} + +/// this function flattens the `ScopeLogs` object +/// and returns a `Vec` of `BTreeMap` of the flattened json +fn flatten_scope_log(scope_log: &ScopeLogs) -> Vec> { + let mut vec_scope_log_json = Vec::new(); + let mut scope_log_json = BTreeMap::new(); + + if let Some(scope) = &scope_log.scope { + scope_log_json.insert("scope_name".to_string(), Value::String(scope.name.clone())); + scope_log_json.insert( + "scope_version".to_string(), + Value::String(scope.version.clone()), + ); + insert_attributes(&mut scope_log_json, &scope.attributes); + scope_log_json.insert( + "scope_dropped_attributes_count".to_string(), + Value::Number(scope.dropped_attributes_count.into()), + ); + } + scope_log_json.insert( + "scope_log_schema_url".to_string(), + 
Value::String(scope_log.schema_url.clone()), + ); + + for log_record in &scope_log.log_records { + let log_record_json = flatten_log_record(log_record); + let mut combined_json = scope_log_json.clone(); + combined_json.extend(log_record_json); + vec_scope_log_json.push(combined_json); + } + + vec_scope_log_json +} + +/// this function performs the custom flattening of the otel logs +/// and returns a `Vec` of `BTreeMap` of the flattened json +pub fn flatten_otel_logs(message: &LogsData) -> Vec> { + let mut vec_otel_json = Vec::new(); + for record in &message.resource_logs { + let mut resource_log_json = BTreeMap::new(); + + if let Some(resource) = &record.resource { + insert_attributes(&mut resource_log_json, &resource.attributes); + resource_log_json.insert( + "resource_dropped_attributes_count".to_string(), + Value::Number(resource.dropped_attributes_count.into()), + ); + } + + let mut vec_resource_logs_json = Vec::new(); + for scope_log in &record.scope_logs { + vec_resource_logs_json.extend(flatten_scope_log(scope_log)); + } + resource_log_json.insert( + "schema_url".to_string(), + Value::String(record.schema_url.clone()), + ); + + for resource_logs_json in &mut vec_resource_logs_json { + resource_logs_json.extend(resource_log_json.clone()); + } + + vec_otel_json.extend(vec_resource_logs_json); + } + + vec_otel_json +} diff --git a/src/otel/metrics.rs b/src/otel/metrics.rs new file mode 100644 index 000000000..f5aa1c072 --- /dev/null +++ b/src/otel/metrics.rs @@ -0,0 +1,524 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. 
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use std::collections::BTreeMap; + +use opentelemetry_proto::tonic::metrics::v1::number_data_point::Value as NumberDataPointValue; +use opentelemetry_proto::tonic::metrics::v1::{ + exemplar::Value as ExemplarValue, exponential_histogram_data_point::Buckets, metric, Exemplar, + ExponentialHistogram, Gauge, Histogram, Metric, MetricsData, NumberDataPoint, Sum, Summary, +}; +use serde_json::Value; + +use super::otel_utils::{ + convert_epoch_nano_to_timestamp, insert_attributes, insert_number_if_some, +}; + +/// otel metrics event has json array for exemplar +/// this function flatten the exemplar json array +/// and returns a `BTreeMap` of the exemplar json +/// this function is reused in all json objects that have exemplar +fn flatten_exemplar(exemplars: &[Exemplar]) -> BTreeMap { + let mut exemplar_json = BTreeMap::new(); + for exemplar in exemplars { + insert_attributes(&mut exemplar_json, &exemplar.filtered_attributes); + exemplar_json.insert( + "exemplar_time_unix_nano".to_string(), + Value::String(convert_epoch_nano_to_timestamp( + exemplar.time_unix_nano as i64, + )), + ); + exemplar_json.insert( + "exemplar_span_id".to_string(), + Value::String(hex::encode(&exemplar.span_id)), + ); + exemplar_json.insert( + "exemplar_trace_id".to_string(), + Value::String(hex::encode(&exemplar.trace_id)), + ); + if let Some(value) = &exemplar.value { + match value { + ExemplarValue::AsDouble(double_val) => { + exemplar_json.insert( + "exemplar_value".to_string(), + Value::Number(serde_json::Number::from_f64(*double_val).unwrap()), + ); + } + ExemplarValue::AsInt(int_val) => { + 
exemplar_json.insert( + "exemplar_value".to_string(), + Value::Number(serde_json::Number::from(*int_val)), + ); + } + } + } + } + exemplar_json +} + +/// otel metrics event has json array for number data points +/// this function flatten the number data points json array +/// and returns a `Vec` of `BTreeMap` of the flattened json +/// this function is reused in all json objects that have number data points +fn flatten_number_data_points(data_points: &[NumberDataPoint]) -> Vec> { + data_points + .iter() + .map(|data_point| { + let mut data_point_json = BTreeMap::new(); + insert_attributes(&mut data_point_json, &data_point.attributes); + data_point_json.insert( + "start_time_unix_nano".to_string(), + Value::String(convert_epoch_nano_to_timestamp( + data_point.start_time_unix_nano as i64, + )), + ); + data_point_json.insert( + "time_unix_nano".to_string(), + Value::String(convert_epoch_nano_to_timestamp( + data_point.time_unix_nano as i64, + )), + ); + let exemplar_json = flatten_exemplar(&data_point.exemplars); + for (key, value) in exemplar_json { + data_point_json.insert(key, value); + } + data_point_json.extend(flatten_data_point_flags(data_point.flags)); + if let Some(value) = &data_point.value { + match value { + NumberDataPointValue::AsDouble(double_val) => { + data_point_json.insert( + "data_point_value".to_string(), + Value::Number(serde_json::Number::from_f64(*double_val).unwrap()), + ); + } + NumberDataPointValue::AsInt(int_val) => { + data_point_json.insert( + "data_point_value".to_string(), + Value::Number(serde_json::Number::from(*int_val)), + ); + } + } + } + data_point_json + }) + .collect() +} + +/// otel metrics event has json object for gauge +/// each gauge object has json array for data points +/// this function flatten the gauge json object +/// and returns a `Vec` of `BTreeMap` for each data point +fn flatten_gauge(gauge: &Gauge) -> Vec> { + let mut vec_gauge_json = Vec::new(); + let data_points_json = 
flatten_number_data_points(&gauge.data_points); + for data_point_json in data_points_json { + let mut gauge_json = BTreeMap::new(); + for (key, value) in &data_point_json { + gauge_json.insert(key.clone(), value.clone()); + } + vec_gauge_json.push(gauge_json); + } + vec_gauge_json +} + +/// otel metrics event has json object for sum +/// each sum object has json array for data points +/// this function flatten the sum json object +/// and returns a `Vec` of `BTreeMap` for each data point +fn flatten_sum(sum: &Sum) -> Vec> { + let mut vec_sum_json = Vec::new(); + let data_points_json = flatten_number_data_points(&sum.data_points); + for data_point_json in data_points_json { + let mut sum_json = BTreeMap::new(); + for (key, value) in &data_point_json { + sum_json.insert(key.clone(), value.clone()); + } + vec_sum_json.push(sum_json); + } + let mut sum_json = BTreeMap::new(); + sum_json.extend(flatten_aggregation_temporality(sum.aggregation_temporality)); + sum_json.insert("is_monotonic".to_string(), Value::Bool(sum.is_monotonic)); + for data_point_json in &mut vec_sum_json { + for (key, value) in &sum_json { + data_point_json.insert(key.clone(), value.clone()); + } + } + vec_sum_json +} + +/// otel metrics event has json object for histogram +/// each histogram object has json array for data points +/// this function flatten the histogram json object +/// and returns a `Vec` of `BTreeMap` for each data point +fn flatten_histogram(histogram: &Histogram) -> Vec> { + let mut data_points_json = Vec::new(); + for data_point in &histogram.data_points { + let mut data_point_json = BTreeMap::new(); + insert_attributes(&mut data_point_json, &data_point.attributes); + data_point_json.insert( + "start_time_unix_nano".to_string(), + Value::String(convert_epoch_nano_to_timestamp( + data_point.start_time_unix_nano as i64, + )), + ); + data_point_json.insert( + "time_unix_nano".to_string(), + Value::String(convert_epoch_nano_to_timestamp( + data_point.time_unix_nano as i64, + )), + 
); + data_point_json.insert( + "data_point_count".to_string(), + Value::Number(data_point.count.into()), + ); + insert_number_if_some(&mut data_point_json, "data_point_sum", &data_point.sum); + data_point_json.insert( + "data_point_bucket_counts".to_string(), + Value::Array( + data_point + .bucket_counts + .iter() + .map(|&count| Value::Number(count.into())) + .collect(), + ), + ); + data_point_json.insert( + "data_point_explicit_bounds".to_string(), + Value::Array( + data_point + .explicit_bounds + .iter() + .map(|bound| Value::String(bound.to_string())) + .collect(), + ), + ); + let exemplar_json = flatten_exemplar(&data_point.exemplars); + for (key, value) in exemplar_json { + data_point_json.insert(key.to_string(), value); + } + data_point_json.extend(flatten_data_point_flags(data_point.flags)); + insert_number_if_some(&mut data_point_json, "min", &data_point.min); + insert_number_if_some(&mut data_point_json, "max", &data_point.max); + data_points_json.push(data_point_json); + } + let mut histogram_json = BTreeMap::new(); + histogram_json.extend(flatten_aggregation_temporality( + histogram.aggregation_temporality, + )); + for data_point_json in &mut data_points_json { + for (key, value) in &histogram_json { + data_point_json.insert(key.clone(), value.clone()); + } + } + data_points_json +} + +/// otel metrics event has json object for buckets +/// this function flatten the buckets json object +/// and returns a `BTreeMap` of the flattened json +fn flatten_buckets(bucket: &Buckets) -> BTreeMap { + let mut bucket_json = BTreeMap::new(); + bucket_json.insert("offset".to_string(), Value::Number(bucket.offset.into())); + bucket_json.insert( + "bucket_count".to_string(), + Value::Array( + bucket + .bucket_counts + .iter() + .map(|&count| Value::Number(count.into())) + .collect(), + ), + ); + bucket_json +} + +/// otel metrics event has json object for exponential histogram +/// each exponential histogram object has json array for data points +/// this function 
flatten the exponential histogram json object +/// and returns a `Vec` of `BTreeMap` for each data point +fn flatten_exp_histogram(exp_histogram: &ExponentialHistogram) -> Vec> { + let mut data_points_json = Vec::new(); + for data_point in &exp_histogram.data_points { + let mut data_point_json = BTreeMap::new(); + insert_attributes(&mut data_point_json, &data_point.attributes); + data_point_json.insert( + "start_time_unix_nano".to_string(), + Value::String(convert_epoch_nano_to_timestamp( + data_point.start_time_unix_nano as i64, + )), + ); + data_point_json.insert( + "time_unix_nano".to_string(), + Value::String(convert_epoch_nano_to_timestamp( + data_point.time_unix_nano as i64, + )), + ); + data_point_json.insert( + "data_point_count".to_string(), + Value::Number(data_point.count.into()), + ); + insert_number_if_some(&mut data_point_json, "data_point_sum", &data_point.sum); + data_point_json.insert( + "data_point_scale".to_string(), + Value::Number(data_point.scale.into()), + ); + data_point_json.insert( + "data_point_zero_count".to_string(), + Value::Number(data_point.zero_count.into()), + ); + if let Some(positive) = &data_point.positive { + let positive_json = flatten_buckets(positive); + for (key, value) in positive_json { + data_point_json.insert(format!("positive_{}", key), value); + } + } + if let Some(negative) = &data_point.negative { + let negative_json = flatten_buckets(negative); + for (key, value) in negative_json { + data_point_json.insert(format!("negative_{}", key), value); + } + } + let exemplar_json = flatten_exemplar(&data_point.exemplars); + for (key, value) in exemplar_json { + data_point_json.insert(key, value); + } + data_points_json.push(data_point_json); + } + let mut exp_histogram_json = BTreeMap::new(); + exp_histogram_json.extend(flatten_aggregation_temporality( + exp_histogram.aggregation_temporality, + )); + for data_point_json in &mut data_points_json { + for (key, value) in &exp_histogram_json { + 
data_point_json.insert(key.clone(), value.clone()); + } + } + data_points_json +} + +/// otel metrics event has json object for summary +/// each summary object has json array for data points +/// this function flatten the summary json object +/// and returns a `Vec` of `BTreeMap` for each data point +fn flatten_summary(summary: &Summary) -> Vec> { + let mut data_points_json = Vec::new(); + for data_point in &summary.data_points { + let mut data_point_json = BTreeMap::new(); + insert_attributes(&mut data_point_json, &data_point.attributes); + data_point_json.insert( + "start_time_unix_nano".to_string(), + Value::String(convert_epoch_nano_to_timestamp( + data_point.start_time_unix_nano as i64, + )), + ); + data_point_json.insert( + "time_unix_nano".to_string(), + Value::String(convert_epoch_nano_to_timestamp( + data_point.time_unix_nano as i64, + )), + ); + data_point_json.insert( + "data_point_count".to_string(), + Value::Number(data_point.count.into()), + ); + data_point_json.insert( + "data_point_sum".to_string(), + Value::Number(serde_json::Number::from_f64(data_point.sum).unwrap()), + ); + data_point_json.insert( + "data_point_quantile_values".to_string(), + Value::Array( + data_point + .quantile_values + .iter() + .map(|quantile_value| { + Value::Object( + vec![ + ( + "quantile", + Value::Number( + serde_json::Number::from_f64(quantile_value.quantile) + .unwrap(), + ), + ), + ( + "value", + Value::Number( + serde_json::Number::from_f64(quantile_value.value).unwrap(), + ), + ), + ] + .into_iter() + .map(|(k, v)| (k.to_string(), v)) + .collect(), + ) + }) + .collect(), + ), + ); + data_points_json.push(data_point_json); + } + data_points_json +} + +/// this function flattens the `Metric` object +/// each metric object has json object for gauge, sum, histogram, exponential histogram, summary +/// this function flatten the metric json object +/// and returns a `Vec` of `BTreeMap` of the flattened json +/// this function is called recursively for each metric record 
object in the otel metrics event +pub fn flatten_metrics_record(metrics_record: &Metric) -> Vec> { + let mut data_points_json = Vec::new(); + let mut metric_json = BTreeMap::new(); + + match &metrics_record.data { + Some(metric::Data::Gauge(gauge)) => { + data_points_json.extend(flatten_gauge(gauge)); + } + Some(metric::Data::Sum(sum)) => { + data_points_json.extend(flatten_sum(sum)); + } + Some(metric::Data::Histogram(histogram)) => { + data_points_json.extend(flatten_histogram(histogram)); + } + Some(metric::Data::ExponentialHistogram(exp_histogram)) => { + data_points_json.extend(flatten_exp_histogram(exp_histogram)); + } + Some(metric::Data::Summary(summary)) => { + data_points_json.extend(flatten_summary(summary)); + } + None => {} + } + metric_json.insert( + "metric_name".to_string(), + Value::String(metrics_record.name.clone()), + ); + metric_json.insert( + "metric_description".to_string(), + Value::String(metrics_record.description.clone()), + ); + metric_json.insert( + "metric_unit".to_string(), + Value::String(metrics_record.unit.clone()), + ); + insert_attributes(&mut metric_json, &metrics_record.metadata); + for data_point_json in &mut data_points_json { + for (key, value) in &metric_json { + data_point_json.insert(key.clone(), value.clone()); + } + } + if data_points_json.is_empty() { + data_points_json.push(metric_json); + } + data_points_json +} + +/// this function performs the custom flattening of the otel metrics +/// and returns a `Vec` of `BTreeMap` of the flattened json +pub fn flatten_otel_metrics(message: MetricsData) -> Vec> { + let mut vec_otel_json = Vec::new(); + for record in &message.resource_metrics { + let mut resource_metrics_json = BTreeMap::new(); + if let Some(resource) = &record.resource { + insert_attributes(&mut resource_metrics_json, &resource.attributes); + resource_metrics_json.insert( + "resource_dropped_attributes_count".to_string(), + Value::Number(resource.dropped_attributes_count.into()), + ); + } + let mut 
vec_scope_metrics_json = Vec::new(); + for scope_metric in &record.scope_metrics { + let mut scope_metrics_json = BTreeMap::new(); + for metrics_record in &scope_metric.metrics { + vec_scope_metrics_json.extend(flatten_metrics_record(metrics_record)); + } + if let Some(scope) = &scope_metric.scope { + scope_metrics_json + .insert("scope_name".to_string(), Value::String(scope.name.clone())); + scope_metrics_json.insert( + "scope_version".to_string(), + Value::String(scope.version.clone()), + ); + insert_attributes(&mut scope_metrics_json, &scope.attributes); + scope_metrics_json.insert( + "scope_dropped_attributes_count".to_string(), + Value::Number(scope.dropped_attributes_count.into()), + ); + } + scope_metrics_json.insert( + "scope_metrics_schema_url".to_string(), + Value::String(scope_metric.schema_url.clone()), + ); + + for scope_metric_json in &mut vec_scope_metrics_json { + for (key, value) in &scope_metrics_json { + scope_metric_json.insert(key.clone(), value.clone()); + } + } + } + resource_metrics_json.insert( + "resource_metrics_schema_url".to_string(), + Value::String(record.schema_url.clone()), + ); + for resource_metric_json in &mut vec_scope_metrics_json { + for (key, value) in &resource_metrics_json { + resource_metric_json.insert(key.clone(), value.clone()); + } + } + vec_otel_json.extend(vec_scope_metrics_json); + } + vec_otel_json +} + +/// otel metrics event has json object for aggregation temporality +/// there is a mapping of aggregation temporality to its description provided in proto +/// this function fetches the description from the aggregation temporality +/// and adds it to the flattened json +fn flatten_aggregation_temporality(aggregation_temporality: i32) -> BTreeMap { + let mut aggregation_temporality_json = BTreeMap::new(); + aggregation_temporality_json.insert( + "aggregation_temporality".to_string(), + Value::Number(aggregation_temporality.into()), + ); + let description = match aggregation_temporality { + 0 => 
"AGGREGATION_TEMPORALITY_UNSPECIFIED", + 1 => "AGGREGATION_TEMPORALITY_DELTA", + 2 => "AGGREGATION_TEMPORALITY_CUMULATIVE", + _ => "", + }; + aggregation_temporality_json.insert( + "aggregation_temporality_description".to_string(), + Value::String(description.to_string()), + ); + + aggregation_temporality_json +} + +fn flatten_data_point_flags(flags: u32) -> BTreeMap { + let mut data_point_flags_json = BTreeMap::new(); + data_point_flags_json.insert("data_point_flags".to_string(), Value::Number(flags.into())); + let description = match flags { + 0 => "DATA_POINT_FLAGS_DO_NOT_USE", + 1 => "DATA_POINT_FLAGS_NO_RECORDED_VALUE_MASK", + _ => "", + }; + data_point_flags_json.insert( + "data_point_flags_description".to_string(), + Value::String(description.to_string()), + ); + data_point_flags_json +} diff --git a/src/otel/otel_utils.rs b/src/otel/otel_utils.rs new file mode 100644 index 000000000..3ac051771 --- /dev/null +++ b/src/otel/otel_utils.rs @@ -0,0 +1,162 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ * + */ + +use chrono::DateTime; +use opentelemetry_proto::tonic::common::v1::{any_value::Value as OtelValue, AnyValue, KeyValue}; +use serde_json::Value; +use std::collections::BTreeMap; +// Value can be one of types - String, Bool, Int, Double, ArrayValue, AnyValue, KeyValueList, Byte +pub fn collect_json_from_value(key: &String, value: OtelValue) -> BTreeMap { + let mut value_json: BTreeMap = BTreeMap::new(); + match value { + OtelValue::StringValue(str_val) => { + value_json.insert(key.to_string(), Value::String(str_val)); + } + OtelValue::BoolValue(bool_val) => { + value_json.insert(key.to_string(), Value::Bool(bool_val)); + } + OtelValue::IntValue(int_val) => { + value_json.insert(key.to_string(), Value::Number(int_val.into())); + } + OtelValue::DoubleValue(double_val) => { + if let Some(number) = serde_json::Number::from_f64(double_val) { + value_json.insert(key.to_string(), Value::Number(number)); + } + } + OtelValue::ArrayValue(array_val) => { + let values = &array_val.values; + for value in values { + let array_value_json = collect_json_from_anyvalue(key, value.clone()); + for key in array_value_json.keys() { + value_json.insert( + format!( + "{}_{}", + key.to_owned(), + value_to_string(array_value_json[key].to_owned()) + ), + array_value_json[key].to_owned(), + ); + } + } + } + OtelValue::KvlistValue(kv_list_val) => { + for key_value in kv_list_val.values { + let value = key_value.value; + if value.is_some() { + let value = value.unwrap(); + let key_value_json = collect_json_from_anyvalue(key, value.clone()); + + for key in key_value_json.keys() { + value_json.insert( + format!( + "{}_{}_{}", + key.to_owned(), + key_value.key, + value_to_string(key_value_json[key].to_owned()) + ), + key_value_json[key].to_owned(), + ); + } + } + } + } + OtelValue::BytesValue(bytes_val) => { + value_json.insert( + key.to_string(), + Value::String(String::from_utf8_lossy(&bytes_val).to_string()), + ); + } + } + + value_json +} + +pub fn collect_json_from_anyvalue(key: 
&String, value: AnyValue) -> BTreeMap { + collect_json_from_value(key, value.value.unwrap()) +} + +// traverse through Value by calling function collect_json_from_anyvalue +pub fn collect_json_from_values( + values: &Option, + key: &String, +) -> BTreeMap { + let mut value_json: BTreeMap = BTreeMap::new(); + + for value in values.iter() { + value_json = collect_json_from_anyvalue(key, value.clone()); + } + + value_json +} + +pub fn value_to_string(value: serde_json::Value) -> String { + match value.clone() { + e @ Value::Number(_) | e @ Value::Bool(_) => e.to_string(), + Value::String(s) => s, + _ => "".to_string(), + } +} + +pub fn flatten_attributes(attributes: &Vec) -> BTreeMap { + let mut attributes_json: BTreeMap = BTreeMap::new(); + for attribute in attributes { + let key = &attribute.key; + let value = &attribute.value; + let value_json = collect_json_from_values(value, &key.to_string()); + for key in value_json.keys() { + attributes_json.insert(key.to_owned(), value_json[key].to_owned()); + } + } + attributes_json +} + +pub fn insert_if_some( + map: &mut BTreeMap, + key: &str, + option: &Option, +) { + if let Some(value) = option { + map.insert(key.to_string(), Value::String(value.to_string())); + } +} + +pub fn insert_number_if_some(map: &mut BTreeMap, key: &str, option: &Option) { + if let Some(value) = option { + if let Some(number) = serde_json::Number::from_f64(*value) { + map.insert(key.to_string(), Value::Number(number)); + } + } +} + +pub fn insert_bool_if_some(map: &mut BTreeMap, key: &str, option: &Option) { + if let Some(value) = option { + map.insert(key.to_string(), Value::Bool(*value)); + } +} + +pub fn insert_attributes(map: &mut BTreeMap, attributes: &Vec) { + let attributes_json = flatten_attributes(attributes); + for (key, value) in attributes_json { + map.insert(key, value); + } +} + +pub fn convert_epoch_nano_to_timestamp(epoch_ns: i64) -> String { + let dt = DateTime::from_timestamp_nanos(epoch_ns).naive_utc(); + 
dt.format("%Y-%m-%dT%H:%M:%S%.6fZ").to_string() +} diff --git a/src/otel/traces.rs b/src/otel/traces.rs new file mode 100644 index 000000000..8ba137b33 --- /dev/null +++ b/src/otel/traces.rs @@ -0,0 +1,304 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use opentelemetry_proto::tonic::trace::v1::span::Event; +use opentelemetry_proto::tonic::trace::v1::span::Link; +use opentelemetry_proto::tonic::trace::v1::ScopeSpans; +use opentelemetry_proto::tonic::trace::v1::Span; +use opentelemetry_proto::tonic::trace::v1::Status; +use opentelemetry_proto::tonic::trace::v1::TracesData; +use serde_json::Value; +use std::collections::BTreeMap; + +use super::otel_utils::convert_epoch_nano_to_timestamp; +use super::otel_utils::insert_attributes; + +/// this function flattens the `ScopeSpans` object +/// and returns a `Vec` of `BTreeMap` of the flattened json +fn flatten_scope_span(scope_span: &ScopeSpans) -> Vec> { + let mut vec_scope_span_json = Vec::new(); + let mut scope_span_json = BTreeMap::new(); + + for span in &scope_span.spans { + let span_record_json = flatten_span_record(span); + vec_scope_span_json.extend(span_record_json); + } + + if let Some(scope) = &scope_span.scope { + scope_span_json.insert("scope_name".to_string(), Value::String(scope.name.clone())); + scope_span_json.insert( + 
"scope_version".to_string(), + Value::String(scope.version.clone()), + ); + insert_attributes(&mut scope_span_json, &scope.attributes); + scope_span_json.insert( + "scope_dropped_attributes_count".to_string(), + Value::Number(scope.dropped_attributes_count.into()), + ); + + for span_json in &mut vec_scope_span_json { + for (key, value) in &scope_span_json { + span_json.insert(key.clone(), value.clone()); + } + } + } + + for span_json in &mut vec_scope_span_json { + span_json.insert( + "schema_url".to_string(), + Value::String(scope_span.schema_url.clone()), + ); + } + + vec_scope_span_json +} + +/// this function performs the custom flattening of the otel traces event +/// and returns a `Vec` of `BTreeMap` of the flattened json +pub fn flatten_otel_traces(message: &TracesData) -> Vec> { + let mut vec_otel_json = Vec::new(); + + for record in &message.resource_spans { + let mut resource_span_json = BTreeMap::new(); + + if let Some(resource) = &record.resource { + insert_attributes(&mut resource_span_json, &resource.attributes); + resource_span_json.insert( + "resource_dropped_attributes_count".to_string(), + Value::Number(resource.dropped_attributes_count.into()), + ); + } + + let mut vec_resource_spans_json = Vec::new(); + for scope_span in &record.scope_spans { + let scope_span_json = flatten_scope_span(scope_span); + vec_resource_spans_json.extend(scope_span_json); + } + + resource_span_json.insert( + "schema_url".to_string(), + Value::String(record.schema_url.clone()), + ); + + for resource_spans_json in &mut vec_resource_spans_json { + for (key, value) in &resource_span_json { + resource_spans_json.insert(key.clone(), value.clone()); + } + } + + vec_otel_json.extend(vec_resource_spans_json); + } + + vec_otel_json +} + +/// otel traces has json array of events +/// this function flattens the `Event` object +/// and returns a `Vec` of `BTreeMap` of the flattened json +fn flatten_events(events: &[Event]) -> Vec> { + events + .iter() + .map(|event| { + let mut 
event_json = BTreeMap::new(); + event_json.insert( + "event_time_unix_nano".to_string(), + Value::String( + convert_epoch_nano_to_timestamp(event.time_unix_nano as i64).to_string(), + ), + ); + event_json.insert("event_name".to_string(), Value::String(event.name.clone())); + insert_attributes(&mut event_json, &event.attributes); + event_json.insert( + "event_dropped_attributes_count".to_string(), + Value::Number(event.dropped_attributes_count.into()), + ); + event_json + }) + .collect() +} + +/// otel traces has json array of links +/// this function flattens the `Link` object +/// and returns a `Vec` of `BTreeMap` of the flattened json +fn flatten_links(links: &[Link]) -> Vec> { + links + .iter() + .map(|link| { + let mut link_json = BTreeMap::new(); + link_json.insert( + "link_span_id".to_string(), + Value::String(hex::encode(&link.span_id)), + ); + link_json.insert( + "link_trace_id".to_string(), + Value::String(hex::encode(&link.trace_id)), + ); + + insert_attributes(&mut link_json, &link.attributes); + link_json.insert( + "link_dropped_attributes_count".to_string(), + Value::Number(link.dropped_attributes_count.into()), + ); + link_json + }) + .collect() +} + +/// otel trace event has status +/// there is a mapping of status code to status description provided in proto +/// this function fetches the status description from the status code +/// and adds it to the flattened json +fn flatten_status(status: &Status) -> BTreeMap { + let mut status_json = BTreeMap::new(); + status_json.insert( + "span_status_message".to_string(), + Value::String(status.message.clone()), + ); + status_json.insert( + "span_status_code".to_string(), + Value::Number(status.code.into()), + ); + let description = match status.code { + 0 => "STATUS_CODE_UNSET", + 1 => "STATUS_CODE_OK", + 2 => "STATUS_CODE_ERROR", + _ => "", + }; + status_json.insert( + "span_status_description".to_string(), + Value::String(description.to_string()), + ); + + status_json +} + +/// otel log event has flags 
+/// there is a mapping of flags to flags description provided in proto
+/// this function fetches the flags description from the flags
+/// and adds it to the flattened json
+fn flatten_flags(flags: u32) -> BTreeMap<String, Value> { + let mut flags_json = BTreeMap::new(); + flags_json.insert("span_flags".to_string(), Value::Number(flags.into())); + let description = match flags { + 0 => "SPAN_FLAGS_DO_NOT_USE", + 255 => "SPAN_FLAGS_TRACE_FLAGS_MASK", + 256 => "SPAN_FLAGS_CONTEXT_HAS_IS_REMOTE_MASK", + 512 => "SPAN_FLAGS_CONTEXT_IS_REMOTE_MASK", + _ => "", + }; + flags_json.insert( + "span_flags_description".to_string(), + Value::String(description.to_string()), + ); + + flags_json +} + +/// otel span event has kind +/// there is a mapping of kind to kind description provided in proto +/// this function fetches the kind description from the kind +/// and adds it to the flattened json +fn flatten_kind(kind: i32) -> BTreeMap<String, Value> { + let mut kind_json = BTreeMap::new(); + kind_json.insert("span_kind".to_string(), Value::Number(kind.into())); + let description = match kind { + 0 => "SPAN_KIND_UNSPECIFIED", + 1 => "SPAN_KIND_INTERNAL", + 2 => "SPAN_KIND_SERVER", + 3 => "SPAN_KIND_CLIENT", + 4 => "SPAN_KIND_PRODUCER", + 5 => "SPAN_KIND_CONSUMER", + _ => "", + }; + kind_json.insert( + "span_kind_description".to_string(), + Value::String(description.to_string()), + ); + + kind_json +} + +/// this function flattens the `Span` object +/// and returns a `Vec` of `BTreeMap` of the flattened json +/// this function is called for each span record object in the otel traces event +fn flatten_span_record(span_record: &Span) -> Vec<BTreeMap<String, Value>> { + let mut span_records_json = Vec::new(); + + let mut span_record_json = BTreeMap::new(); + span_record_json.insert( + "span_trace_id".to_string(), + Value::String(hex::encode(&span_record.trace_id)), + ); + span_record_json.insert( + "span_span_id".to_string(), + Value::String(hex::encode(&span_record.span_id)), + ); + span_record_json.insert( +
"span_trace_state".to_string(), + Value::String(span_record.trace_state.clone()), + ); + span_record_json.insert( + "span_parent_span_id".to_string(), + Value::String(hex::encode(&span_record.parent_span_id)), + ); + span_record_json.extend(flatten_flags(span_record.flags)); + span_record_json.insert( + "span_name".to_string(), + Value::String(span_record.name.clone()), + ); + span_record_json.extend(flatten_kind(span_record.kind)); + span_record_json.insert( + "span_start_time_unix_nano".to_string(), + Value::String(convert_epoch_nano_to_timestamp( + span_record.start_time_unix_nano as i64, + )), + ); + span_record_json.insert( + "span_end_time_unix_nano".to_string(), + Value::String(convert_epoch_nano_to_timestamp( + span_record.end_time_unix_nano as i64, + )), + ); + insert_attributes(&mut span_record_json, &span_record.attributes); + span_record_json.insert( + "span_dropped_attributes_count".to_string(), + Value::Number(span_record.dropped_attributes_count.into()), + ); + span_records_json.extend(flatten_events(&span_record.events)); + span_record_json.insert( + "span_dropped_events_count".to_string(), + Value::Number(span_record.dropped_events_count.into()), + ); + span_records_json.extend(flatten_links(&span_record.links)); + span_record_json.insert( + "span_dropped_links_count".to_string(), + Value::Number(span_record.dropped_links_count.into()), + ); + + if let Some(status) = &span_record.status { + span_record_json.extend(flatten_status(status)); + } + + for span_json in &mut span_records_json { + for (key, value) in &span_record_json { + span_json.insert(key.clone(), value.clone()); + } + } + + span_records_json +} diff --git a/src/utils/header_parsing.rs b/src/utils/header_parsing.rs index 8d4feab1e..89caf4d9e 100644 --- a/src/utils/header_parsing.rs +++ b/src/utils/header_parsing.rs @@ -68,6 +68,8 @@ pub enum ParseHeaderError { SeperatorInValue(char), #[error("Stream name not found in header [x-p-stream]")] MissingStreamName, + #[error("Log source not 
found in header [x-p-log-source]")] + MissingLogSource, } impl ResponseError for ParseHeaderError { diff --git a/src/utils/json/mod.rs b/src/utils/json/mod.rs index 0d3ac1e79..4c794afba 100644 --- a/src/utils/json/mod.rs +++ b/src/utils/json/mod.rs @@ -22,6 +22,7 @@ use flatten::{convert_to_array, generic_flattening, has_more_than_four_levels}; use serde_json; use serde_json::Value; +use crate::event::format::LogSource; use crate::metadata::SchemaVersion; pub mod flatten; @@ -36,15 +37,20 @@ pub fn flatten_json_body( custom_partition: Option<&String>, schema_version: SchemaVersion, validation_required: bool, + log_source: &LogSource, ) -> Result<Value, anyhow::Error> { // Flatten the json body only if new schema and has less than 4 levels of nesting - let mut nested_value = - if schema_version == SchemaVersion::V0 || has_more_than_four_levels(&body, 1) { - body - } else { - let flattened_json = generic_flattening(&body)?; - convert_to_array(flattened_json)? - }; + let mut nested_value = if schema_version == SchemaVersion::V1 + && !has_more_than_four_levels(&body, 1) + && matches!( + log_source, + LogSource::Json | LogSource::Custom(_) | LogSource::Kinesis + ) { + let flattened_json = generic_flattening(&body)?; + convert_to_array(flattened_json)?
+ } else { + body + }; flatten::flatten( &mut nested_value, "_", @@ -62,6 +68,7 @@ pub fn convert_array_to_object( time_partition_limit: Option<NonZeroU32>, custom_partition: Option<&String>, schema_version: SchemaVersion, + log_source: &LogSource, ) -> Result<Vec<Value>, anyhow::Error> { let data = flatten_json_body( body, @@ -70,6 +77,7 @@ custom_partition, schema_version, true, + log_source, )?; let value_arr = match data { Value::Array(arr) => arr, @@ -101,6 +109,8 @@ pub fn convert_to_string(value: &Value) -> Value { #[cfg(test)] mod tests { + use crate::event::format::LogSource; + + use super::flatten_json_body; use serde_json::json; @@ -115,7 +125,8 @@ mod tests { None, None, crate::metadata::SchemaVersion::V1, - false + false, + &LogSource::default() ) .unwrap(), expected @@ -133,7 +144,8 @@ None, None, crate::metadata::SchemaVersion::V1, - false + false, + &LogSource::default() ) .unwrap(), expected