Skip to content

Commit

Permalink
feat: move pipelines to the first-class endpoint (#5480)
Browse files Browse the repository at this point in the history
* feat: move pipelines to the first-class endpoint

Signed-off-by: Ruihang Xia <[email protected]>

* change endpoints

Signed-off-by: Ruihang Xia <[email protected]>

* prefix path with /

Signed-off-by: Ruihang Xia <[email protected]>

* update integration result

Signed-off-by: Ruihang Xia <[email protected]>

---------

Signed-off-by: Ruihang Xia <[email protected]>
  • Loading branch information
waynexia authored Feb 8, 2025
1 parent 059cb6f commit 0de0fd8
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 16 deletions.
31 changes: 29 additions & 2 deletions src/servers/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -605,9 +605,16 @@ impl HttpServerBuilder {
log_validator: validator,
ingest_interceptor,
};

let router = self.router.nest(
&format!("/{HTTP_API_VERSION}"),
HttpServer::route_pipelines(log_state.clone()),
);
// deprecated since v0.11.0. Use `/logs` and `/pipelines` instead.
let router = router.nest(
&format!("/{HTTP_API_VERSION}/events"),
HttpServer::route_log(log_state.clone()),
#[allow(deprecated)]
HttpServer::route_log_deprecated(log_state.clone()),
);

let router = router.nest(
Expand Down Expand Up @@ -893,7 +900,8 @@ impl HttpServer {
.with_state(log_state)
}

fn route_log<S>(log_state: LogState) -> Router<S> {
#[deprecated(since = "0.11.0", note = "Use `route_pipelines()` instead.")]
fn route_log_deprecated<S>(log_state: LogState) -> Router<S> {
Router::new()
.route("/logs", routing::post(event::log_ingester))
.route(
Expand All @@ -912,6 +920,25 @@ impl HttpServer {
.with_state(log_state)
}

fn route_pipelines<S>(log_state: LogState) -> Router<S> {
Router::new()
.route("/ingest", routing::post(event::log_ingester))
.route(
"/pipelines/{pipeline_name}",
routing::post(event::add_pipeline),
)
.route(
"/pipelines/{pipeline_name}",
routing::delete(event::delete_pipeline),
)
.route("/pipelines/_dryrun", routing::post(event::pipeline_dryrun))
.layer(
ServiceBuilder::new()
.layer(RequestDecompressionLayer::new().pass_through_unaccepted(true)),
)
.with_state(log_state)
}

fn route_sql<S>(api_state: ApiState) -> Router<S> {
Router::new()
.route("/sql", routing::get(handler::sql).post(handler::sql))
Expand Down
28 changes: 14 additions & 14 deletions tests-integration/tests/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1228,7 +1228,7 @@ transform:

// 1. create pipeline
let res = client
.post("/v1/events/pipelines/greptime_guagua")
.post("/v1/pipelines/greptime_guagua")
.header("Content-Type", "application/x-yaml")
.body(body)
.send()
Expand All @@ -1243,7 +1243,7 @@ transform:
);

let res = client
.post("/v1/events/pipelines/test")
.post("/v1/pipelines/test")
.header("Content-Type", "application/x-yaml")
.body(body)
.send()
Expand Down Expand Up @@ -1287,7 +1287,7 @@ transform:
]
"#;
let res = client
.post("/v1/events/logs?db=public&table=logs1&pipeline_name=test")
.post("/v1/ingest?db=public&table=logs1&pipeline_name=test")
.header("Content-Type", "application/json")
.body(data_body)
.send()
Expand All @@ -1298,7 +1298,7 @@ transform:

// 3. remove pipeline
let res = client
.delete(format!("/v1/events/pipelines/test?version={}", encoded).as_str())
.delete(format!("/v1/pipelines/test?version={}", encoded).as_str())
.send()
.await;

Expand All @@ -1316,7 +1316,7 @@ transform:

// 4. write data failed
let res = client
.post("/v1/events/logs?db=public&table=logs1&pipeline_name=test")
.post("/v1/ingest?db=public&table=logs1&pipeline_name=test")
.header("Content-Type", "application/json")
.body(data_body)
.send()
Expand All @@ -1337,7 +1337,7 @@ pub async fn test_identify_pipeline(store_type: StorageType) {
let body = r#"{"__time__":1453809242,"__topic__":"","__source__":"10.170.***.***","ip":"10.200.**.***","time":"26/Jan/2016:19:54:02 +0800","url":"POST/PutData?Category=YunOsAccountOpLog&AccessKeyId=<yourAccessKeyId>&Date=Fri%2C%2028%20Jun%202013%2006%3A53%3A30%20GMT&Topic=raw&Signature=<yourSignature>HTTP/1.1","status":"200","user-agent":"aliyun-sdk-java"}
{"__time__":1453809242,"__topic__":"","__source__":"10.170.***.***","ip":"10.200.**.***","time":"26/Jan/2016:19:54:02 +0800","url":"POST/PutData?Category=YunOsAccountOpLog&AccessKeyId=<yourAccessKeyId>&Date=Fri%2C%2028%20Jun%202013%2006%3A53%3A30%20GMT&Topic=raw&Signature=<yourSignature>HTTP/1.1","status":"200","user-agent":"aliyun-sdk-java","hasagei":"hasagei","dongdongdong":"guaguagua"}"#;
let res = client
.post("/v1/events/logs?db=public&table=logs&pipeline_name=greptime_identity")
.post("/v1/ingest?db=public&table=logs&pipeline_name=greptime_identity")
.header("Content-Type", "application/json")
.body(body)
.send()
Expand Down Expand Up @@ -1403,7 +1403,7 @@ pub async fn test_identify_pipeline_with_flatten(store_type: StorageType) {
HeaderValue::from_static("flatten_json_object=true"),
),
],
"/v1/events/logs?table=logs&pipeline_name=greptime_identity",
"/v1/ingest?table=logs&pipeline_name=greptime_identity",
body.as_bytes().to_vec(),
false,
)
Expand Down Expand Up @@ -1464,7 +1464,7 @@ transform:

// 1. create pipeline
let res = client
.post("/v1/events/pipelines/test")
.post("/v1/pipelines/test")
.header("Content-Type", "application/x-yaml")
.body(pipeline_content)
.send()
Expand Down Expand Up @@ -1580,7 +1580,7 @@ transform:
]
"#;
let res = client
.post("/v1/events/pipelines/dryrun?pipeline_name=test")
.post("/v1/pipelines/_dryrun?pipeline_name=test")
.header("Content-Type", "application/json")
.body(data_body)
.send()
Expand Down Expand Up @@ -1610,7 +1610,7 @@ transform:
}
"#;
let res = client
.post("/v1/events/pipelines/dryrun")
.post("/v1/pipelines/_dryrun")
.header("Content-Type", "application/json")
.body(body)
.send()
Expand Down Expand Up @@ -1638,7 +1638,7 @@ transform:
});
body["pipeline"] = json!(pipeline_content);
let res = client
.post("/v1/events/pipelines/dryrun")
.post("/v1/pipelines/_dryrun")
.header("Content-Type", "application/json")
.body(body.to_string())
.send()
Expand Down Expand Up @@ -1666,7 +1666,7 @@ transform:
]
});
let res = client
.post("/v1/events/pipelines/dryrun")
.post("/v1/pipelines/_dryrun")
.header("Content-Type", "application/json")
.body(body.to_string())
.send()
Expand Down Expand Up @@ -1707,7 +1707,7 @@ transform:

// 1. create pipeline
let res = client
.post("/v1/events/pipelines/test")
.post("/v1/pipelines/test")
.header("Content-Type", "application/x-yaml")
.body(body)
.send()
Expand Down Expand Up @@ -1742,7 +1742,7 @@ transform:
2024-05-25 20:16:37.218 hello world
"#;
let res = client
.post("/v1/events/logs?db=public&table=logs1&pipeline_name=test")
.post("/v1/ingest?db=public&table=logs1&pipeline_name=test")
.header("Content-Type", "text/plain")
.body(data_body)
.send()
Expand Down

0 comments on commit 0de0fd8

Please sign in to comment.