From 0de0fd80b018d2f4f1005214babb853be6e3db2a Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Sat, 8 Feb 2025 11:46:31 +0800 Subject: [PATCH] feat: move pipelines to the first-class endpoint (#5480) * feat: move pipelines to the first-class endpoint Signed-off-by: Ruihang Xia * change endpoints Signed-off-by: Ruihang Xia * prefix path with / Signed-off-by: Ruihang Xia * update integration result Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- src/servers/src/http.rs | 31 +++++++++++++++++++++++++++++-- tests-integration/tests/http.rs | 28 ++++++++++++++-------------- 2 files changed, 43 insertions(+), 16 deletions(-) diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index ea522efdc2a8..67e3da18173b 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -605,9 +605,16 @@ impl HttpServerBuilder { log_validator: validator, ingest_interceptor, }; + let router = self.router.nest( + &format!("/{HTTP_API_VERSION}"), + HttpServer::route_pipelines(log_state.clone()), + ); + // deprecated since v0.11.0. Use `/logs` and `/pipelines` instead. + let router = router.nest( &format!("/{HTTP_API_VERSION}/events"), - HttpServer::route_log(log_state.clone()), + #[allow(deprecated)] + HttpServer::route_log_deprecated(log_state.clone()), ); let router = router.nest( @@ -893,7 +900,8 @@ impl HttpServer { .with_state(log_state) } - fn route_log(log_state: LogState) -> Router { + #[deprecated(since = "0.11.0", note = "Use `route_pipelines()` instead.")] + fn route_log_deprecated(log_state: LogState) -> Router { Router::new() .route("/logs", routing::post(event::log_ingester)) .route( @@ -912,6 +920,25 @@ impl HttpServer { .with_state(log_state) } + fn route_pipelines(log_state: LogState) -> Router { + Router::new() + .route("/ingest", routing::post(event::log_ingester)) + .route( + "/pipelines/{pipeline_name}", + routing::post(event::add_pipeline), + ) + .route( + "/pipelines/{pipeline_name}", + routing::delete(event::delete_pipeline), + ) + .route("/pipelines/_dryrun", routing::post(event::pipeline_dryrun)) + .layer( + ServiceBuilder::new() + .layer(RequestDecompressionLayer::new().pass_through_unaccepted(true)), + ) + .with_state(log_state) + } + fn route_sql(api_state: ApiState) -> Router { Router::new() .route("/sql", routing::get(handler::sql).post(handler::sql)) diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index f60fa3a35af9..14fd45089e3e 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -1228,7 +1228,7 @@ transform: // 1. create pipeline let res = client - .post("/v1/events/pipelines/greptime_guagua") + .post("/v1/pipelines/greptime_guagua") .header("Content-Type", "application/x-yaml") .body(body) .send() @@ -1243,7 +1243,7 @@ transform: ); let res = client - .post("/v1/events/pipelines/test") + .post("/v1/pipelines/test") .header("Content-Type", "application/x-yaml") .body(body) .send() @@ -1287,7 +1287,7 @@ transform: ] "#; let res = client - .post("/v1/events/logs?db=public&table=logs1&pipeline_name=test") + .post("/v1/ingest?db=public&table=logs1&pipeline_name=test") .header("Content-Type", "application/json") .body(data_body) .send() @@ -1298,7 +1298,7 @@ transform: // 3. remove pipeline let res = client - .delete(format!("/v1/events/pipelines/test?version={}", encoded).as_str()) + .delete(format!("/v1/pipelines/test?version={}", encoded).as_str()) .send() .await; @@ -1316,7 +1316,7 @@ transform: // 4. write data failed let res = client - .post("/v1/events/logs?db=public&table=logs1&pipeline_name=test") + .post("/v1/ingest?db=public&table=logs1&pipeline_name=test") .header("Content-Type", "application/json") .body(data_body) .send() @@ -1337,7 +1337,7 @@ pub async fn test_identify_pipeline(store_type: StorageType) { let body = r#"{"__time__":1453809242,"__topic__":"","__source__":"10.170.***.***","ip":"10.200.**.***","time":"26/Jan/2016:19:54:02 +0800","url":"POST/PutData?Category=YunOsAccountOpLog&AccessKeyId=&Date=Fri%2C%2028%20Jun%202013%2006%3A53%3A30%20GMT&Topic=raw&Signature=HTTP/1.1","status":"200","user-agent":"aliyun-sdk-java"} {"__time__":1453809242,"__topic__":"","__source__":"10.170.***.***","ip":"10.200.**.***","time":"26/Jan/2016:19:54:02 +0800","url":"POST/PutData?Category=YunOsAccountOpLog&AccessKeyId=&Date=Fri%2C%2028%20Jun%202013%2006%3A53%3A30%20GMT&Topic=raw&Signature=HTTP/1.1","status":"200","user-agent":"aliyun-sdk-java","hasagei":"hasagei","dongdongdong":"guaguagua"}"#; let res = client - .post("/v1/events/logs?db=public&table=logs&pipeline_name=greptime_identity") + .post("/v1/ingest?db=public&table=logs&pipeline_name=greptime_identity") .header("Content-Type", "application/json") .body(body) .send() @@ -1403,7 +1403,7 @@ pub async fn test_identify_pipeline_with_flatten(store_type: StorageType) { HeaderValue::from_static("flatten_json_object=true"), ), ], - "/v1/events/logs?table=logs&pipeline_name=greptime_identity", + "/v1/ingest?table=logs&pipeline_name=greptime_identity", body.as_bytes().to_vec(), false, ) @@ -1464,7 +1464,7 @@ transform: // 1. create pipeline let res = client - .post("/v1/events/pipelines/test") + .post("/v1/pipelines/test") .header("Content-Type", "application/x-yaml") .body(pipeline_content) .send() @@ -1580,7 +1580,7 @@ transform: ] "#; let res = client - .post("/v1/events/pipelines/dryrun?pipeline_name=test") + .post("/v1/pipelines/_dryrun?pipeline_name=test") .header("Content-Type", "application/json") .body(data_body) .send() @@ -1610,7 +1610,7 @@ transform: } "#; let res = client - .post("/v1/events/pipelines/dryrun") + .post("/v1/pipelines/_dryrun") .header("Content-Type", "application/json") .body(body) .send() @@ -1638,7 +1638,7 @@ transform: }); body["pipeline"] = json!(pipeline_content); let res = client - .post("/v1/events/pipelines/dryrun") + .post("/v1/pipelines/_dryrun") .header("Content-Type", "application/json") .body(body.to_string()) .send() @@ -1666,7 +1666,7 @@ transform: ] }); let res = client - .post("/v1/events/pipelines/dryrun") + .post("/v1/pipelines/_dryrun") .header("Content-Type", "application/json") .body(body.to_string()) .send() @@ -1707,7 +1707,7 @@ transform: // 1. create pipeline let res = client - .post("/v1/events/pipelines/test") + .post("/v1/pipelines/test") .header("Content-Type", "application/x-yaml") .body(body) .send() @@ -1742,7 +1742,7 @@ transform: 2024-05-25 20:16:37.218 hello world "#; let res = client - .post("/v1/events/logs?db=public&table=logs1&pipeline_name=test") + .post("/v1/ingest?db=public&table=logs1&pipeline_name=test") .header("Content-Type", "text/plain") .body(data_body) .send()