Skip to content

Commit

Permalink
source-http-ingest: support path parameters
Browse files Browse the repository at this point in the history
Introduces support for capturing path parameters from incoming requests.  The
paths in the endpoint config are now allowed to specify parameters using the
OpenAPI syntax.  For example, the path
`/vendors/{vendorId}/products/{productId}` can now be used to extract the
`vendorId` and `productId`, which will be added to a `/_meta/pathParams` JSON
object. The OpenAPI syntax was chosen because it's well-known and documented,
and because it's simple and limited. This makes it easy to parse and less
likely to result in surprising behavior. The syntax is documented at:
https://swagger.io/docs/specification/v3_0/paths-and-operations/#path-templating

The `/_meta/reqPath` property was also added in order to record the configured
path at which the request was handled.
  • Loading branch information
psFried committed Dec 12, 2024
1 parent f5d43b2 commit 68e82ac
Show file tree
Hide file tree
Showing 9 changed files with 330 additions and 53 deletions.
1 change: 1 addition & 0 deletions source-http-ingest/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions source-http-ingest/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ async-trait = "0.1"
axum = "0.7"
futures = "0.3"
http = "1.0"
itertools = "0.13"
schemars = "0.8"
serde = "1.0"
serde_json = { version = "1.0", features = ["raw_value", "float_roundtrip"] }
Expand Down
64 changes: 60 additions & 4 deletions source-http-ingest/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
pub mod server;
pub mod transactor;

use std::collections::HashSet;
use std::collections::{HashMap, HashSet};

use anyhow::Context;

Expand Down Expand Up @@ -37,8 +37,15 @@ pub struct EndpointConfig {

/// List of URL paths to accept requests at.
///
/// Discovery will return a separate collection for each given path. Paths must be provided
/// without any percent encoding, and should not include any query parameters or fragment.
/// Discovery will return a separate collection for each given path. Paths
/// must be provided without any percent encoding, and should not include
/// any query parameters or fragment.
///
/// Paths can include path parameters, following the syntax of OpenAPI path
/// templating. For example, `/vendors/{vendorId}/products/{productId}`,
/// which would accept a request to `/vendors/abc/products/123`. This would
/// result in captured data with
/// `"_meta": { "pathParams": { "vendorId": "abc", "productId": "123"}, ...}`
#[serde(default)]
#[schemars(default = "paths_schema_default", schema_with = "paths_schema")]
paths: Vec<String>,
Expand Down Expand Up @@ -297,6 +304,7 @@ async fn do_validate(
.collect::<HashSet<_>>();
let mut output = Vec::with_capacity(bindings.len());
let mut typed_bindings = Vec::with_capacity(bindings.len());
let mut existing_paths = HashMap::new();
for ValidateBinding {
collection,
resource_config_json,
Expand All @@ -311,6 +319,18 @@ async fn do_validate(

if let Some(rp) = &resource_config.path {
endpoint_paths.remove(rp.as_str());

// Normalize all parameters to a constant name, so that we can check for conflicts.
// This is meant to catch cases where a user enters `/a/{foo}` and `/a/{bar}`, which
// would otherwise fail when we try to start the server.
let path_ident = server::transform_path_params(rp, |_| String::from("{parameter}"));
if let Some(prev) = existing_paths.insert(path_ident, rp.to_owned()) {
anyhow::bail!(
"path parameter conflict: {} and {} both match requests for the same paths. Remove or change one of the paths",
prev,
rp,
);
}
}
output.push(ValidatedBinding {
resource_path: resource_config.resource_path(),
Expand Down Expand Up @@ -375,6 +395,32 @@ fn discovered_webhook_collection(path: Option<&str>) -> DiscoveredBinding {
let mut resource_config = ResourceConfig::default();
resource_config.stream = path.map(|s| s.to_owned());
resource_config.path = path.map(|s| s.to_owned());

let path_params = path
.map(|p| server::openapi_path_parameters(p).collect::<Vec<_>>())
.unwrap_or_default();

// reqPath is _not_ required so that we don't break existing tasks where
// some of the data won't have it.
let mut meta_required = vec![server::properties::ID, server::properties::TS];
// But pathParams _can_ be required if they're configured, because no
// pre-existing task configuration can include them.
if !path_params.is_empty() {
meta_required.push(server::properties::PATH_PARAMS);
}
let path_params_properties = path_params
.iter()
.map(|n| {
(
n.to_string(),
serde_json::json!({
"type": "string",
"description": "The value of the path parameter",
}),
)
})
.collect::<serde_json::Map<_, _>>();

DiscoveredBinding {
disable: false,
recommended_name: path.map(|p| p.trim_matches('/')).unwrap_or("webhook-data").to_string(),
Expand All @@ -400,9 +446,19 @@ fn discovered_webhook_collection(path: Option<&str>) -> DiscoveredBinding {
"type": "string",
"format": "date-time",
"description": "Timestamp of when the request was received by the connector"
},
"reqPath": {
"type": "string",
"description": "The configured path at which the request was received. Will include parameter placeholders if the path has them"
},
"pathParams": {
"type": "object",
"description": "Parameters extracted from the path of the request, if configured",
"properties": path_params_properties,
"required": path_params,
}
},
"required": ["webhookId", "receivedAt"]
"required": meta_required,
}
},
"required": ["_meta"]
Expand Down
Loading

0 comments on commit 68e82ac

Please sign in to comment.