-
Notifications
You must be signed in to change notification settings - Fork 1.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(http source): Add custom response header configuration #20811
base: master
Are you sure you want to change the base?
Changes from 12 commits
abe8f47
87d8679
d3c2c3c
38f210d
a76b5fa
27352cc
dc670fd
8859463
1d3445c
01d7e41
aa06e83
63c97ed
46d23f8
0da494a
1ea1d69
a4f9137
0e36d4c
6e802fc
21c5c4c
bbce50d
0eb0663
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
The `http_server` source now allows configuring custom headers to be added to responses via the `custom_response_headers` option. | ||
|
||
authors: chriscancompute |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,7 +6,7 @@ use http::StatusCode; | |
use http_serde; | ||
use tokio_util::codec::Decoder as _; | ||
use vrl::value::{kind::Collection, Kind}; | ||
use warp::http::{HeaderMap, HeaderValue}; | ||
use warp::http::{HeaderMap, HeaderName, HeaderValue}; | ||
|
||
use vector_lib::codecs::{ | ||
decoding::{DeserializerConfig, FramingConfig}, | ||
|
@@ -99,6 +99,14 @@ pub struct SimpleHttpConfig { | |
#[configurable(metadata(docs::examples = "*"))] | ||
headers: Vec<String>, | ||
|
||
/// Custom response headers to be added to the HTTP response | ||
#[serde(default)] | ||
#[configurable(metadata(docs::examples = "example_custom_response_headers()"))] | ||
#[configurable(metadata( | ||
docs::additional_props_description = "A custom response header key-values pair" | ||
))] | ||
custom_response_headers: HashMap<String, Vec<String>>, | ||
|
||
/// A list of URL query parameters to include in the log event. | ||
/// | ||
/// These override any values included in the body with conflicting names. | ||
|
@@ -170,6 +178,13 @@ pub struct SimpleHttpConfig { | |
keepalive: KeepaliveConfig, | ||
} | ||
|
||
fn example_custom_response_headers() -> HashMap<String, Vec<String>> { | ||
HashMap::<String, Vec<String>>::from_iter([( | ||
"Access-Control-Allow-Origin".to_string(), | ||
vec!["my-cool-server".to_string(), "my-other-server".to_string()], | ||
)]) | ||
} | ||
|
||
impl SimpleHttpConfig { | ||
/// Builds the `schema::Definition` for this source using the provided `LogNamespace`. | ||
fn schema_definition(&self, log_namespace: LogNamespace) -> Definition { | ||
|
@@ -265,6 +280,7 @@ impl Default for SimpleHttpConfig { | |
address: "0.0.0.0:8080".parse().unwrap(), | ||
encoding: None, | ||
headers: Vec::new(), | ||
custom_response_headers: HashMap::new(), | ||
query_parameters: Vec::new(), | ||
tls: None, | ||
auth: None, | ||
|
@@ -355,6 +371,7 @@ impl SourceConfig for SimpleHttpConfig { | |
|
||
let source = SimpleHttpSource { | ||
headers: build_param_matcher(&remove_duplicates(self.headers.clone(), "headers"))?, | ||
custom_response_headers: self.custom_response_headers.clone(), | ||
query_parameters: remove_duplicates(self.query_parameters.clone(), "query_parameters"), | ||
path_key: self.path_key.clone(), | ||
host_key: self.host_key.clone(), | ||
|
@@ -403,6 +420,7 @@ impl SourceConfig for SimpleHttpConfig { | |
#[derive(Clone)] | ||
struct SimpleHttpSource { | ||
headers: Vec<HttpConfigParamKind>, | ||
custom_response_headers: HashMap<String, Vec<String>>, | ||
query_parameters: Vec<String>, | ||
path_key: OptionalValuePath, | ||
host_key: OptionalValuePath, | ||
|
@@ -544,10 +562,31 @@ impl HttpSource for SimpleHttpSource { | |
fn enable_source_ip(&self) -> bool { | ||
self.host_key.path.is_some() | ||
} | ||
|
||
/// Enriches the warp::reply::Reply with custom headers | ||
/// | ||
/// This method adds the custom headers specified in the configuration | ||
/// to the HTTP response. | ||
fn enrich_reply<T: warp::Reply + 'static>(&self, reply: T) -> Box<dyn warp::Reply> { | ||
let mut response = reply.into_response(); | ||
let header_map = response.headers_mut(); | ||
|
||
for (key, values) in &self.custom_response_headers { | ||
let header_name: HeaderName = key.parse().unwrap(); | ||
if let Some((first, rest)) = values.split_first() { | ||
header_map.insert(header_name.clone(), first.parse().unwrap()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think you can just use
https://docs.rs/http/0.2.12/http/header/struct.HeaderMap.html#method.append There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. From the docs you point, I understand that if you append without inserting first, it would fail Thats why I suggested to insert the first and append the rest There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think it fails. It just returns See the implementation: https://docs.rs/http/0.2.12/src/http/header/map.rs.html#1415-1449 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh, you're right, didn't see the implementation. I think I understood wrong the documentation. I see that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For sure :) I can see why you would have been confused. |
||
for value in rest { | ||
header_map.append(header_name.clone(), value.parse().unwrap()); | ||
} | ||
} | ||
} | ||
Box::new(response) | ||
} | ||
} | ||
|
||
#[cfg(test)] | ||
mod tests { | ||
use std::collections::HashMap; | ||
use std::str::FromStr; | ||
use std::{io::Write, net::SocketAddr}; | ||
|
||
|
@@ -591,6 +630,7 @@ mod tests { | |
#[allow(clippy::too_many_arguments)] | ||
async fn source<'a>( | ||
headers: Vec<String>, | ||
custom_response_headers: HashMap<String, Vec<String>>, | ||
query_parameters: Vec<String>, | ||
path_key: &'a str, | ||
host_key: &'a str, | ||
|
@@ -619,6 +659,7 @@ mod tests { | |
SimpleHttpConfig { | ||
address, | ||
headers, | ||
custom_response_headers, | ||
encoding: None, | ||
query_parameters, | ||
response_code, | ||
|
@@ -730,6 +771,7 @@ mod tests { | |
|
||
let (rx, addr) = source( | ||
vec![], | ||
HashMap::new(), | ||
vec![], | ||
"http_path", | ||
"remote_ip", | ||
|
@@ -776,6 +818,7 @@ mod tests { | |
let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async move { | ||
let (rx, addr) = source( | ||
vec![], | ||
HashMap::new(), | ||
vec![], | ||
"http_path", | ||
"remote_ip", | ||
|
@@ -815,6 +858,7 @@ mod tests { | |
let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async move { | ||
let (rx, addr) = source( | ||
vec![], | ||
HashMap::new(), | ||
vec![], | ||
"http_path", | ||
"remote_ip", | ||
|
@@ -848,6 +892,7 @@ mod tests { | |
let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async { | ||
let (rx, addr) = source( | ||
vec![], | ||
HashMap::new(), | ||
vec![], | ||
"http_path", | ||
"remote_ip", | ||
|
@@ -886,6 +931,7 @@ mod tests { | |
let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async { | ||
let (rx, addr) = source( | ||
vec![], | ||
HashMap::new(), | ||
vec![], | ||
"http_path", | ||
"remote_ip", | ||
|
@@ -931,6 +977,7 @@ mod tests { | |
let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async { | ||
let (rx, addr) = source( | ||
vec![], | ||
HashMap::new(), | ||
vec![], | ||
"http_path", | ||
"remote_ip", | ||
|
@@ -982,6 +1029,7 @@ mod tests { | |
let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async { | ||
let (rx, addr) = source( | ||
vec![], | ||
HashMap::new(), | ||
vec![], | ||
"http_path", | ||
"remote_ip", | ||
|
@@ -1068,6 +1116,7 @@ mod tests { | |
"X-*".to_string(), | ||
"AbsentHeader".to_string(), | ||
], | ||
HashMap::new(), | ||
vec![], | ||
"http_path", | ||
"remote_ip", | ||
|
@@ -1112,6 +1161,7 @@ mod tests { | |
|
||
let (rx, addr) = source( | ||
vec!["*".to_string()], | ||
HashMap::new(), | ||
vec![], | ||
"http_path", | ||
"remote_ip", | ||
|
@@ -1145,11 +1195,65 @@ mod tests { | |
} | ||
} | ||
|
||
#[tokio::test] | ||
async fn http_custom_response_headers() { | ||
async fn send(address: SocketAddr, body: &str) -> reqwest::Response { | ||
reqwest::Client::new() | ||
.post(&format!("http://{}/", address)) | ||
.body(body.to_owned()) | ||
.send() | ||
.await | ||
.unwrap() | ||
} | ||
|
||
assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async { | ||
let mut custom_headers: HashMap<String, Vec<String>> = HashMap::new(); | ||
custom_headers.insert( | ||
"Access-Control-Allow-Origin".to_string(), | ||
vec!["example.com".to_string(), "example2.com".to_string()], | ||
); | ||
|
||
let (rx, addr) = source( | ||
vec!["*".to_string()], | ||
custom_headers, | ||
vec![], | ||
"http_path", | ||
"remote_ip", | ||
"/", | ||
"POST", | ||
StatusCode::OK, | ||
true, | ||
EventStatus::Delivered, | ||
true, | ||
None, | ||
Some(JsonDeserializerConfig::default().into()), | ||
) | ||
.await; | ||
|
||
spawn_collect_n( | ||
async move { | ||
let response = send(addr, "{\"key1\":\"value1\"}").await; | ||
let response_headers = response.headers(); | ||
let view = response_headers.get_all("Access-Control-Allow-Origin"); | ||
let mut iter = view.iter(); | ||
assert_eq!(&"example.com", iter.next().unwrap()); | ||
assert_eq!(&"example2.com", iter.next().unwrap()); | ||
assert!(iter.next().is_none()); | ||
}, | ||
rx, | ||
1, | ||
) | ||
.await | ||
}) | ||
.await; | ||
} | ||
|
||
#[tokio::test] | ||
async fn http_query() { | ||
let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async { | ||
let (rx, addr) = source( | ||
vec![], | ||
HashMap::new(), | ||
vec![ | ||
"source".to_string(), | ||
"region".to_string(), | ||
|
@@ -1206,6 +1310,7 @@ mod tests { | |
|
||
let (rx, addr) = source( | ||
vec![], | ||
HashMap::new(), | ||
vec![], | ||
"http_path", | ||
"remote_ip", | ||
|
@@ -1237,6 +1342,7 @@ mod tests { | |
let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async { | ||
let (rx, addr) = source( | ||
vec![], | ||
HashMap::new(), | ||
vec![], | ||
"vector_http_path", | ||
"vector_remote_ip", | ||
|
@@ -1278,6 +1384,7 @@ mod tests { | |
let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async { | ||
let (rx, addr) = source( | ||
vec![], | ||
HashMap::new(), | ||
vec![], | ||
"vector_http_path", | ||
"vector_remote_ip", | ||
|
@@ -1339,6 +1446,7 @@ mod tests { | |
components::init_test(); | ||
let (_rx, addr) = source( | ||
vec![], | ||
HashMap::new(), | ||
vec![], | ||
"vector_http_path", | ||
"vector_remote_ip", | ||
|
@@ -1364,6 +1472,7 @@ mod tests { | |
assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async move { | ||
let (rx, addr) = source( | ||
vec![], | ||
HashMap::new(), | ||
vec![], | ||
"http_path", | ||
"remote_ip", | ||
|
@@ -1398,6 +1507,7 @@ mod tests { | |
assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async { | ||
let (rx, addr) = source( | ||
vec![], | ||
HashMap::new(), | ||
vec![], | ||
"http_path", | ||
"remote_ip", | ||
|
@@ -1429,6 +1539,7 @@ mod tests { | |
let events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async { | ||
let (rx, addr) = source( | ||
vec![], | ||
HashMap::new(), | ||
vec![], | ||
"http_path", | ||
"remote_ip", | ||
|
@@ -1462,6 +1573,7 @@ mod tests { | |
components::init_test(); | ||
let (_rx, addr) = source( | ||
vec![], | ||
HashMap::new(), | ||
vec![], | ||
"http_path", | ||
"remote_ip", | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we do this parsing and validation earlier? That is: parse the configured headers into a
HeaderMap
during source start time? I think that would have the advantage of: