feat: pipeline dispatcher part 2: execution (#5409)
* fmt: correct format

* test: add negative tests

* feat: Add pipeline dispatching and execution output handling

* refactor: Enhance ingest function to correctly process original data values
and custom table names during pipeline execution, while optimizing the
management of transformed rows and multiple dispatched pipelines

* refactor: call greptime_identity with intermediate values

* fix: typo

* test: port tests to refactored apis

* refactor: adapt dryrun api call

* refactor: move pipeline execution code to a separate module

* refactor: update otlp pipeline execution path

* fmt: format imports

* fix: compilation

* fix: resolve residual issues

* refactor: address review comments

* chore: use BTreeMap for the pipeline intermediate state (trait modified accordingly)

* refactor: update dispatcher to accept BTreeMap

* refactor: update identity pipeline

* refactor: use new input for pipeline

* chore: wip

* refactor: use updated prepare api

* refactor: improve error and header name

* feat: port flatten to new api

* chore: update pipeline api

* chore: fix transform and some pipeline tests

* refactor: reimplement cmcd

* refactor: update csv processor

* fmt: update format

* chore: fix regex and dissect processor

* chore: fix test

* test: add integration test for http pipeline

* refactor: improve regex pipeline

* refactor: improve required field check

* refactor: rename table_part to table_suffix

* fix: resolve merge issue

---------

Co-authored-by: paomian <[email protected]>
sunng87 and paomian authored Feb 8, 2025
1 parent 0de0fd8 commit 480b05c
Showing 42 changed files with 2,236 additions and 2,936 deletions.
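
For context, the dispatcher that this PR now executes is declared in the pipeline's YAML definition. A sketch of such a configuration using the `table_suffix` key this PR renames `table_part` to — the per-rule keys (`value`, `pipeline`, `table_suffix`) come from the `src/pipeline/src/dispatcher.rs` diff below, while the top-level `dispatcher` key and the `protocol` field name are assumptions for illustration:

```yaml
# Hypothetical dispatcher section; rule schema per src/pipeline/src/dispatcher.rs.
dispatcher:
  field: protocol               # intermediate-state key to match on
  rules:
    - value: http               # if protocol == "http" ...
      pipeline: http_pipeline   # ... run this pipeline ...
      table_suffix: http_log    # ... and use this table-name suffix
    - value: db
      pipeline: db_pipeline
      table_suffix: db_log
```

Per the dispatcher doc comment below, rows whose field value matches no rule continue through the current pipeline, and a matching rule with no `pipeline` falls back to the built-in `greptime_identity`.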
612 changes: 435 additions & 177 deletions Cargo.lock

Large diffs are not rendered by default.

16 changes: 13 additions & 3 deletions src/frontend/src/instance/otlp.rs
@@ -20,11 +20,11 @@ use common_telemetry::tracing;
 use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
 use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
 use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
-use pipeline::PipelineWay;
+use pipeline::{GreptimePipelineParams, PipelineWay};
 use servers::error::{self, AuthSnafu, InFlightWriteBytesExceededSnafu, Result as ServerResult};
 use servers::interceptor::{OpenTelemetryProtocolInterceptor, OpenTelemetryProtocolInterceptorRef};
 use servers::otlp;
-use servers::query_handler::OpenTelemetryProtocolHandler;
+use servers::query_handler::{OpenTelemetryProtocolHandler, PipelineHandlerRef};
 use session::context::QueryContextRef;
 use snafu::ResultExt;
@@ -112,8 +112,10 @@ impl OpenTelemetryProtocolHandler for Instance {
     #[tracing::instrument(skip_all)]
     async fn logs(
         &self,
+        pipeline_handler: PipelineHandlerRef,
         request: ExportLogsServiceRequest,
         pipeline: PipelineWay,
+        pipeline_params: GreptimePipelineParams,
         table_name: String,
         ctx: QueryContextRef,
     ) -> ServerResult<Output> {
@@ -128,7 +130,15 @@
             .get::<OpenTelemetryProtocolInterceptorRef<servers::error::Error>>();
         interceptor_ref.pre_execute(ctx.clone())?;
 
-        let (requests, rows) = otlp::logs::to_grpc_insert_requests(request, pipeline, table_name)?;
+        let (requests, rows) = otlp::logs::to_grpc_insert_requests(
+            request,
+            pipeline,
+            pipeline_params,
+            table_name,
+            &ctx,
+            pipeline_handler,
+        )
+        .await?;
 
         let _guard = if let Some(limiter) = &self.limiter {
             let result = limiter.limit_row_inserts(&requests);
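
The handler change means OTLP log conversion can itself look up and run dispatched pipelines, which is why `to_grpc_insert_requests` became `async` and now receives the query context plus a `PipelineHandlerRef`. A hypothetical call site under those assumptions — argument names and order come from the diff above, while the wrapper function, its generic bound, the table name, and the `Output` import (elided in the hunk) are illustrative:

```rust
use common_query::Output; // assumed import; the hunk above elides it
use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
use pipeline::{GreptimePipelineParams, PipelineWay};
use servers::error::Result as ServerResult;
use servers::query_handler::{OpenTelemetryProtocolHandler, PipelineHandlerRef};
use session::context::QueryContextRef;

// Hypothetical wrapper showing the new `logs` signature in use.
async fn ingest_otlp_logs<H: OpenTelemetryProtocolHandler>(
    instance: &H,
    pipeline_handler: PipelineHandlerRef,
    request: ExportLogsServiceRequest,
    pipeline: PipelineWay,
    pipeline_params: GreptimePipelineParams,
    ctx: QueryContextRef,
) -> ServerResult<Output> {
    instance
        .logs(
            pipeline_handler,
            request,
            pipeline,
            pipeline_params,
            "opentelemetry_logs".to_string(), // hypothetical table name
            ctx,
        )
        .await
}
```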
11 changes: 6 additions & 5 deletions src/pipeline/benches/processor.rs
@@ -13,21 +13,22 @@
 // limitations under the License.
 
 use criterion::{black_box, criterion_group, criterion_main, Criterion};
-use pipeline::{parse, Content, GreptimeTransformer, Pipeline, Result};
+use pipeline::{json_to_intermediate_state, parse, Content, GreptimeTransformer, Pipeline, Result};
 use serde_json::{Deserializer, Value};
 
 fn processor_mut(
     pipeline: &Pipeline<GreptimeTransformer>,
     input_values: Vec<Value>,
 ) -> Result<Vec<greptime_proto::v1::Row>> {
-    let mut payload = pipeline.init_intermediate_state();
     let mut result = Vec::with_capacity(input_values.len());
 
     for v in input_values {
-        pipeline.prepare(v, &mut payload)?;
-        let r = pipeline.exec_mut(&mut payload)?;
+        let mut payload = json_to_intermediate_state(v).unwrap();
+        let r = pipeline
+            .exec_mut(&mut payload)?
+            .into_transformed()
+            .expect("expect transformed result");
        result.push(r);
-        pipeline.reset_intermediate_state(&mut payload);
     }
 
     Ok(result)
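
The bench keeps only the happy path: `exec_mut` now returns an execution output that may be either transformed rows or a dispatch decision, and `into_transformed()` yields nothing in the latter case. A minimal sketch of handling both outcomes with a stand-in enum — the crate's actual output type, variant names, and table-name separator are not shown in this diff, so they are assumptions here:

```rust
// Stand-in for the pipeline's execution output; illustrative only.
enum ExecOutput {
    // Rows are ready for the originally requested table.
    Transformed(Vec<u64>), // placeholder row type
    // A dispatcher rule matched: re-route to another pipeline and table.
    Dispatched {
        table_suffix: String,
        pipeline: Option<String>,
    },
}

// Decide the destination table and pipeline for one execution output.
fn route(out: ExecOutput, base_table: &str) -> (String, String) {
    match out {
        ExecOutput::Transformed(_rows) => {
            (base_table.to_string(), "<current pipeline>".to_string())
        }
        ExecOutput::Dispatched { table_suffix, pipeline } => {
            // The rule's suffix extends the requested table name; the exact
            // separator used by the real implementation is an assumption.
            let table = format!("{base_table}_{table_suffix}");
            // An unset `pipeline` falls back to greptime_identity, per the
            // dispatcher doc comment in this PR.
            (table, pipeline.unwrap_or_else(|| "greptime_identity".to_string()))
        }
    }
}
```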
43 changes: 32 additions & 11 deletions src/pipeline/src/dispatcher.rs
@@ -12,18 +12,20 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::BTreeMap;
+
+use common_telemetry::debug;
 use snafu::OptionExt;
 use yaml_rust::Yaml;
 
-use crate::etl::error::{Error, Result};
-use crate::etl_error::{
-    FieldRequiredForDispatcherSnafu, TablePartRequiredForDispatcherRuleSnafu,
+use crate::etl::error::{
+    Error, FieldRequiredForDispatcherSnafu, Result, TableSuffixRequiredForDispatcherRuleSnafu,
     ValueRequiredForDispatcherRuleSnafu,
 };
 use crate::Value;
 
 const FIELD: &str = "field";
-const TABLE_PARTIAL: &str = "table_part";
+const TABLE_SUFFIX: &str = "table_suffix";
 const PIPELINE: &str = "pipeline";
 const VALUE: &str = "value";
 const RULES: &str = "rules";
@@ -39,10 +41,10 @@ const RULES: &str = "rules";
 /// rules:
 ///   - value: http
 ///     pipeline: http_pipeline
-///     table_part: http_log
+///     table_suffix: http_log
 ///   - value: db
 ///     pipeline: db_pipeline
-///     table_part: db_log
+///     table_suffix: db_log
 /// ```
 ///
 /// If none of the rules match the value, this pipeline will continue to process
@@ -58,12 +60,12 @@ pub(crate) struct Dispatcher {
 /// - `value`: for pattern matching
 /// - `pipeline`: the pipeline to call, if it's unspecified, we use default
 ///   `greptime_identity`
-/// - `table_part`: the table name segment that we use to construct full table
+/// - `table_suffix`: the table name segment that we use to construct full table
 ///   name
 #[derive(Debug, PartialEq)]
 pub(crate) struct Rule {
     pub value: Value,
-    pub table_part: String,
+    pub table_suffix: String,
     pub pipeline: Option<String>,
 }
 
@@ -80,10 +82,11 @@ impl TryFrom<&Yaml> for Dispatcher {
             rules
                 .iter()
                 .map(|rule| {
-                    let table_part = rule[TABLE_PARTIAL]
+                    let table_part = rule[TABLE_SUFFIX]
                         .as_str()
                         .map(|s| s.to_string())
-                        .context(TablePartRequiredForDispatcherRuleSnafu)?;
+                        .context(TableSuffixRequiredForDispatcherRuleSnafu)?;
+
                     let pipeline = rule[PIPELINE].as_str().map(|s| s.to_string());
 
                     if rule[VALUE].is_badvalue() {
@@ -93,7 +96,7 @@
 
                     Ok(Rule {
                         value,
-                        table_part,
+                        table_suffix: table_part,
                         pipeline,
                     })
                 })
@@ -105,3 +108,21 @@
         Ok(Dispatcher { field, rules })
     }
 }
+
+impl Dispatcher {
+    /// Execute the dispatcher and return the matched rule, if any.
+    pub(crate) fn exec(&self, data: &BTreeMap<String, Value>) -> Option<&Rule> {
+        if let Some(value) = data.get(&self.field) {
+            for rule in &self.rules {
+                if rule.value == *value {
+                    return Some(rule);
+                }
+            }
+
+            None
+        } else {
+            debug!("field {} not found in keys {:?}", &self.field, data.keys());
+            None
+        }
+    }
+}
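
To make the matching semantics concrete — first match wins in a linear scan over the rules — here is a test-style sketch of `exec` as one might write it inside this module. It assumes the crate's `Value` has a `String` variant that compares with `PartialEq`, as the `rule.value == *value` comparison above implies:

```rust
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use super::{Dispatcher, Rule};
    use crate::Value;

    #[test]
    fn dispatch_picks_first_matching_rule() {
        let dispatcher = Dispatcher {
            field: "protocol".to_string(),
            rules: vec![
                Rule {
                    value: Value::String("http".to_string()),
                    table_suffix: "http_log".to_string(),
                    pipeline: None,
                },
                Rule {
                    value: Value::String("db".to_string()),
                    table_suffix: "db_log".to_string(),
                    pipeline: Some("db_pipeline".to_string()),
                },
            ],
        };

        // A matching field value returns the corresponding rule.
        let mut data = BTreeMap::new();
        data.insert("protocol".to_string(), Value::String("db".to_string()));
        let rule = dispatcher.exec(&data).expect("a rule should match");
        assert_eq!(rule.table_suffix, "db_log");

        // A missing field (or no matching value) yields None.
        assert!(dispatcher.exec(&BTreeMap::new()).is_none());
    }
}
```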