Skip to content

Commit

Permalink
Fix event_type array query parsing (#810)
Browse files Browse the repository at this point in the history
This fixes how `?event_types` query parameters are parsed in
`*/attempt/endpoint/*`, in addition to some adjacent refactors.

## Motivation

`axum::Query` doesn't support URL query array parameter parsing very
well. For example,
- `?a=1&a=2` gets rejected
- `?a[]=1&a[]=2` also gets rejected
- `?a[0]=1&a[1]=2` _also_ gets rejected

This is largely because `serde_urlencoded` doesn't support any of these
formats. See nox/serde_urlencoded#75

There is `serde_qs`, which supports the last two, but not the first
format. See samscott89/serde_qs#16

Unfortunately, we _need_ to support the first format because that is
what our internal API has supported up to today.

However, our OSS has been inconsistent. Specifically with
`?event_types`.

Most endpoints that took in `?event_types` used
`MessageListFetchOptions`, which manually parsed `?event_types` only
using the first format. Ideally, we'd like to support all 3.

In addition, `*/attempt/endpoint/*` didn't use
`MessageListFetchOptions`. Instead it relied on `axum::Query`
deserialization, so `?event_types` outright didn't work as expected with
this endpoint.

## Solution

This PR does a couple of small things to fix these issues:

1) `MessageListFetchOptions` is axed in favor of a more explicit
`EventTypesQuery`. `MessageListFetchOptions` had an extra timestamp
parameter, `before`, that is already parsed just fine by `axum::Query`.
2) `EventTypesQuery` is _similar_ to `MessageListFetchOptions` with
respect to how `event_types` is parsed, with a few exceptions:
    * `EventTypesQuery` validates the input.
    * `EventTypesQuery` handles all 3 formats, not just the first one.
    * `EventTypesQuery` should be a bit more performant, since it uses
`form_urlencoded`, which parses pairs as `Cow<str>`, so fewer
allocations. (Note that `form_urlencoded` is what `serde_urlencoded` uses
under the hood.)
3) Updates `*/attempt/endpoint/*` to use `EventTypesQuery`, so that
`?event_types` is parsed correctly. Tests are added to demonstrate and
validate this behavior.
  • Loading branch information
svix-gabriel authored Feb 6, 2023
2 parents 0ee7cdf + 3d7b3f8 commit 3dc517b
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 63 deletions.
1 change: 1 addition & 0 deletions server/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions server/svix-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ ipnet = { version = "2.5", features = ["serde"] }
urlencoding = "2.1.2"
strum_macros = "0.24"
strum = { version = "0.24", features = ["derive"] }
form_urlencoded = "1.1.0"

[dev-dependencies]
anyhow = "1.0.56"
29 changes: 7 additions & 22 deletions server/svix-server/src/v1/endpoints/attempt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::{
utils::{
apply_pagination, iterator_from_before_or_after, openapi_tag, ApplicationEndpointPath,
ApplicationMsgAttemptPath, ApplicationMsgEndpointPath, ApplicationMsgPath,
EmptyResponse, ListResponse, MessageListFetchOptions, ModelOut, PaginationLimit,
EmptyResponse, EventTypesQuery, ListResponse, ModelOut, PaginationLimit,
ReversibleIterator, ValidatedQuery,
},
},
Expand Down Expand Up @@ -225,8 +225,6 @@ pub struct ListAttemptsByEndpointQueryParameters {
status: Option<MessageStatus>,
status_code_class: Option<StatusCodeClass>,
#[validate]
event_types: Option<EventTypeNameSet>,
#[validate]
channel: Option<EventChannel>,
before: Option<DateTime<Utc>>,
after: Option<DateTime<Utc>>,
Expand Down Expand Up @@ -305,11 +303,11 @@ async fn list_attempts_by_endpoint(
ValidatedQuery(ListAttemptsByEndpointQueryParameters {
status,
status_code_class,
event_types,
channel,
before,
after,
}): ValidatedQuery<ListAttemptsByEndpointQueryParameters>,
EventTypesQuery(event_types): EventTypesQuery,
Path(ApplicationEndpointPath { endpoint_id, .. }): Path<ApplicationEndpointPath>,
permissions::Application { app }: permissions::Application,
) -> Result<Json<ListResponse<MessageAttemptOut>>> {
Expand Down Expand Up @@ -360,8 +358,6 @@ pub struct ListAttemptsByMsgQueryParameters {
status: Option<MessageStatus>,
status_code_class: Option<StatusCodeClass>,
#[validate]
event_types: Option<EventTypeNameSet>,
#[validate]
channel: Option<EventChannel>,
#[validate]
endpoint_id: Option<EndpointIdOrUid>,
Expand All @@ -376,13 +372,13 @@ async fn list_attempts_by_msg(
ValidatedQuery(ListAttemptsByMsgQueryParameters {
status,
status_code_class,
event_types,
channel,
endpoint_id,
before,
after,
}): ValidatedQuery<ListAttemptsByMsgQueryParameters>,
Path(ApplicationMsgPath { msg_id, .. }): Path<ApplicationMsgPath>,
EventTypesQuery(event_types): EventTypesQuery,
permissions::Application { app }: permissions::Application,
) -> Result<Json<ListResponse<MessageAttemptOut>>> {
let PaginationLimit(limit) = pagination.limit;
Expand Down Expand Up @@ -533,7 +529,7 @@ async fn list_attempts_for_endpoint(
before,
after,
}): ValidatedQuery<ListAttemptsForEndpointQueryParameters>,
list_filter: MessageListFetchOptions,
event_types_query: EventTypesQuery,
Path(ApplicationMsgEndpointPath {
app_id,
msg_id,
Expand All @@ -551,7 +547,7 @@ async fn list_attempts_for_endpoint(
before,
after,
}),
list_filter,
event_types_query,
Path(ApplicationMsgPath { app_id, msg_id }),
auth_app,
)
Expand Down Expand Up @@ -579,7 +575,7 @@ async fn list_messageattempts(
before,
after,
}): ValidatedQuery<AttemptListFetchOptions>,
list_filter: MessageListFetchOptions,
EventTypesQuery(event_types): EventTypesQuery,
Path(ApplicationMsgPath { msg_id, .. }): Path<ApplicationMsgPath>,
permissions::Application { app }: permissions::Application,
) -> Result<Json<ListResponse<MessageAttemptOut>>> {
Expand Down Expand Up @@ -611,7 +607,7 @@ async fn list_messageattempts(
query = query.filter(Expr::cust_with_values("channels ?? ?", vec![channel]));
}

if let Some(EventTypeNameSet(event_types)) = list_filter.event_types {
if let Some(EventTypeNameSet(event_types)) = event_types {
query = query.filter(message::Column::EventType.is_in(event_types));
}

Expand Down Expand Up @@ -810,8 +806,6 @@ mod tests {

const INVALID_CHANNEL: &str = "$$invalid-channel";
const VALID_CHANNEL: &str = "valid-channel";
const INVALID_EVENT_TYPES: &[&str] = &["valid-event-type", "&&invalid-event-type"];
const VALID_EVENT_TYPES: &[&str] = &["valid-event-type", "another-valid-event-type"];
const INVALID_ENDPOINT_ID: &str = "$$invalid-endpoint";
const VALID_ENDPOINT_ID: &str = "ep_valid-endpoint";

Expand All @@ -828,21 +822,13 @@ mod tests {

#[test]
fn test_list_attempts_by_endpoint_query_parameters_validation() {
let q: ListAttemptsByEndpointQueryParameters =
serde_json::from_value(json!({ "event_types": INVALID_EVENT_TYPES })).unwrap();
assert!(q.validate().is_err());

let q: ListAttemptsByEndpointQueryParameters =
serde_json::from_value(json!({ "channel": INVALID_CHANNEL })).unwrap();
assert!(q.validate().is_err());
}

#[test]
fn test_list_attempts_by_msg_query_parameters_validation() {
let q: ListAttemptsByMsgQueryParameters =
serde_json::from_value(json!({ "event_types": INVALID_EVENT_TYPES })).unwrap();
assert!(q.validate().is_err());

let q: ListAttemptsByMsgQueryParameters =
serde_json::from_value(json!({ "channel": INVALID_CHANNEL })).unwrap();
assert!(q.validate().is_err());
Expand All @@ -853,7 +839,6 @@ mod tests {

let q: ListAttemptsByMsgQueryParameters = serde_json::from_value(json!(
{
"event_types": VALID_EVENT_TYPES,
"channel": VALID_CHANNEL,
"endpoint_id": VALID_ENDPOINT_ID
}
Expand Down
12 changes: 7 additions & 5 deletions server/svix-server/src/v1/endpoints/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ use crate::{
queue::MessageTaskBatch,
v1::utils::{
apply_pagination, iterator_from_before_or_after, openapi_tag, validation_error,
ApplicationMsgPath, ListResponse, MessageListFetchOptions, ModelIn, ModelOut,
PaginationLimit, ReversibleIterator, ValidatedJson, ValidatedQuery,
ApplicationMsgPath, EventTypesQuery, ListResponse, ModelIn, ModelOut, PaginationLimit,
ReversibleIterator, ValidatedJson, ValidatedQuery,
},
AppState,
};
Expand Down Expand Up @@ -165,6 +165,7 @@ pub struct ListMessagesQueryParams {
#[serde(default = "default_true")]
with_content: bool,

before: Option<DateTime<Utc>>,
after: Option<DateTime<Utc>>,
}

Expand All @@ -174,24 +175,25 @@ async fn list_messages(
ValidatedQuery(ListMessagesQueryParams {
channel,
with_content,
before,
after,
}): ValidatedQuery<ListMessagesQueryParams>,
list_filter: MessageListFetchOptions,
EventTypesQuery(event_types): EventTypesQuery,
permissions::Application { app }: permissions::Application,
) -> Result<Json<ListResponse<MessageOut>>> {
let PaginationLimit(limit) = pagination.limit;

let mut query = message::Entity::secure_find(app.id);

if let Some(EventTypeNameSet(event_types)) = list_filter.event_types {
if let Some(EventTypeNameSet(event_types)) = event_types {
query = query.filter(message::Column::EventType.is_in(event_types));
}

if let Some(channel) = channel {
query = query.filter(Expr::cust_with_values("channels ?? ?", vec![channel]));
}

let iterator = iterator_from_before_or_after(pagination.iterator, list_filter.before, after);
let iterator = iterator_from_before_or_after(pagination.iterator, before, after);
let is_prev = matches!(iterator, Some(ReversibleIterator::Prev(_)));

let query = apply_pagination(query, message::Column::Id, limit, iterator);
Expand Down
62 changes: 26 additions & 36 deletions server/svix-server/src/v1/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use std::{
collections::HashSet,
error::Error as StdError,
ops::Deref,
str::FromStr,
time::{SystemTime, UNIX_EPOCH},
};

Expand Down Expand Up @@ -382,53 +381,44 @@ impl<T: JsonSchema> OperationInput for ValidatedQuery<T> {
}
}

/// This struct is slower than Query. Only use this if we need to pass arrays.
#[derive(Debug)]
pub struct MessageListFetchOptions {
pub event_types: Option<EventTypeNameSet>,
pub before: Option<DateTime<Utc>>,
}
// A special wrapper to handle query parameter lists. serde_qs and serde_urlencoded can't
// handle url query param arrays as flexibly as we need to support in our API
pub struct EventTypesQuery(pub Option<EventTypeNameSet>);

#[async_trait]
impl<S> FromRequestParts<S> for MessageListFetchOptions
impl<S> FromRequestParts<S> for EventTypesQuery
where
S: Send + Sync,
{
type Rejection = Error;

async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result<Self> {
let pairs: Vec<(String, String)> =
serde_urlencoded::from_str(parts.uri.query().unwrap_or_default())
.map_err(|err| HttpError::bad_request(None, Some(err.to_string())))?;

let mut before = None;
let mut event_types = EventTypeNameSet(HashSet::<EventTypeName>::new());
for (key, value) in pairs {
if key == "event_types" {
event_types.0.insert(EventTypeName(value));
} else if key == "before" {
before = Some(DateTime::<Utc>::from_str(&value).map_err(|_| {
HttpError::unprocessable_entity(vec![ValidationErrorItem {
loc: vec!["query".to_owned(), "before".to_owned()],
msg: "Unable to parse before".to_owned(),
ty: "value_error".to_owned(),
}])
})?);
}
}
let event_types = if event_types.0.is_empty() {
None
let pairs = form_urlencoded::parse(parts.uri.query().unwrap_or_default().as_bytes());

let event_types: HashSet<EventTypeName> = pairs
.filter_map(|(key, value)| {
// want to handle both `?event_types=`, `?event_types[]=`, and `?event_types[1]=`
if key == "event_types" || (key.starts_with("event_types[") && key.ends_with(']')) {
Some(EventTypeName(value.into_owned()))
} else {
None
}
})
.collect();

if event_types.is_empty() {
Ok(Self(None))
} else {
Some(event_types)
};
Ok(MessageListFetchOptions {
event_types,
before,
})
let event_types = EventTypeNameSet(event_types);
event_types.validate().map_err(|e| {
HttpError::unprocessable_entity(validation_errors(vec!["query".to_owned()], e))
})?;
Ok(Self(Some(event_types)))
}
}
}

impl OperationInput for MessageListFetchOptions {}
impl OperationInput for EventTypesQuery {}

pub async fn api_not_implemented() -> Result<()> {
Err(HttpError::not_implemented(None, None).into())
Expand Down
27 changes: 27 additions & 0 deletions server/svix-server/tests/e2e_attempt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,33 @@ async fn test_list_attempts_by_endpoint() {
.unwrap();
assert_eq!(obits_attempts.data.len(), 1);

let exploded_attempts: ListResponse<MessageAttemptOut> = client
.get(
&format!("api/v1/app/{app_id}/attempt/endpoint/{endp_id_2}/?event_types=user.exploded"),
StatusCode::OK,
)
.await
.unwrap();
assert_eq!(exploded_attempts.data.len(), 1);

let regular_attempts: ListResponse<MessageAttemptOut> = client
.get(
&format!("api/v1/app/{app_id}/attempt/endpoint/{endp_id_2}/?event_types[]=event.type"),
StatusCode::OK,
)
.await
.unwrap();
assert_eq!(regular_attempts.data.len(), 2);

let all_attempts: ListResponse<MessageAttemptOut> = client
.get(
&format!("api/v1/app/{app_id}/attempt/endpoint/{endp_id_2}/?event_types[0]=event.type&event_types[1]=user.exploded"),
StatusCode::OK,
)
.await
.unwrap();
assert_eq!(all_attempts.data.len(), 3);

receiver_1.jh.abort();
receiver_2.jh.abort();
}
Expand Down

0 comments on commit 3dc517b

Please sign in to comment.