Skip to content

Commit

Permalink
Don't ignore errors on query status API
Browse files Browse the repository at this point in the history
We may be observing issues trying to run Hybrid at scale that are not visible to us due to the nature how query status handles those. If error is not a mismatch, it is just silently ignored.
This could lead to a variety of problems - most importantly it will mask the failures and made other shards think that everything is OK, and they can proceed with finalizing results.

This PR changes this and makes this API return an error if there is at least one error that is not a status mismatch. If all of them are, the behaviour is the same as before
  • Loading branch information
akoshelev committed Dec 11, 2024
1 parent ff37b8a commit c464883
Showing 1 changed file with 31 additions and 30 deletions.
61 changes: 31 additions & 30 deletions ipa-core/src/query/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,13 +369,14 @@ impl Processor {
/// [`QueryStatusError::DifferentStatus`] and retrieve it's internal state. Returns [`None`]
/// if not possible.
#[cfg(feature = "in-memory-infra")]
fn downcast_state_error(box_error: crate::error::BoxError) -> Option<QueryStatus> {
fn downcast_state_error(box_error: &crate::error::BoxError) -> Option<QueryStatus> {
use crate::helpers::ApiError;
let api_error = box_error.downcast::<ApiError>().ok()?;
if let ApiError::QueryStatus(QueryStatusError::DifferentStatus { my_status, .. }) =
*api_error
let api_error = box_error.downcast_ref::<ApiError>();
if let Some(ApiError::QueryStatus(QueryStatusError::DifferentStatus {
my_status, ..
})) = api_error
{
return Some(my_status);
return Some(*my_status);
}
None
}
Expand All @@ -386,7 +387,7 @@ impl Processor {
/// of relying on errors.
#[cfg(feature = "in-memory-infra")]
fn get_state_from_error(
error: crate::helpers::InMemoryTransportError<ShardIndex>,
error: &crate::helpers::InMemoryTransportError<ShardIndex>,
) -> Option<QueryStatus> {
if let crate::helpers::InMemoryTransportError::Rejected { inner, .. } = error {
return Self::downcast_state_error(inner);
Expand All @@ -399,8 +400,8 @@ impl Processor {
/// TODO: Ideally broadcast should return a value, that we could use to parse the state instead
/// of relying on errors.
#[cfg(feature = "real-world-infra")]
fn get_state_from_error(shard_error: crate::net::ShardError) -> Option<QueryStatus> {
if let crate::net::Error::ShardQueryStatusMismatch { error, .. } = shard_error.source {
fn get_state_from_error(shard_error: &crate::net::ShardError) -> Option<QueryStatus> {
if let crate::net::Error::ShardQueryStatusMismatch { error, .. } = &shard_error.source {

Check warning on line 404 in ipa-core/src/query/processor.rs

View check run for this annotation

Codecov / codecov/patch

ipa-core/src/query/processor.rs#L403-L404

Added lines #L403 - L404 were not covered by tests
return Some(error.actual);
}
None
Expand Down Expand Up @@ -431,17 +432,14 @@ impl Processor {

let shard_responses = shard_transport.broadcast(shard_query_status_req).await;
if let Err(e) = shard_responses {
// The following silently ignores the cases where the query isn't found.
// TODO: this code is a ticking bomb - it ignores all errors, not just when
// query is not found. If there is no handler, handler responded with an error, etc.
// Moreover, any error may result in client mistakenly assuming that the status
// is completed.
let states: Vec<_> = e
.failures
.into_iter()
.filter_map(|(_si, e)| Self::get_state_from_error(e))
.collect();
status = states.into_iter().fold(status, min_status);
for (shard, failure) in &e.failures {
if let Some(other) = Self::get_state_from_error(failure) {
status = min_status(status, other);
} else {
tracing::error!("failed to get status from shard {shard}: {failure:?}");
return Err(e.into());
}
}
}

Ok(status)
Expand Down Expand Up @@ -1205,16 +1203,25 @@ mod tests {
/// * From the standpoint of leader shard in Helper 1
/// * On query_status
///
/// If one of my shards hasn't received the query yet (NoSuchQuery) the leader shouldn't
/// return an error but instead with the min state.
/// If one of my shards hasn't received the query yet (NoSuchQuery) the leader should
/// return an error despite other shards returning their status
#[tokio::test]
#[should_panic(
expected = "(ShardIndex(3), Rejected { dest: ShardIndex(3), inner: QueryStatus(NoSuchQuery(QueryId)) })"
)]
async fn status_query_doesnt_exist() {
fn shard_handle(si: ShardIndex) -> Arc<dyn RequestHandler<ShardIndex>> {
create_handler(move |_| async move {
if si == ShardIndex::from(3) {
Err(ApiError::QueryStatus(QueryStatusError::NoSuchQuery(
QueryId,
)))
} else if si == ShardIndex::from(2) {
Err(ApiError::QueryStatus(QueryStatusError::DifferentStatus {
query_id: QueryId,
my_status: QueryStatus::Running,
other_status: QueryStatus::Preparing,
}))
} else {
Ok(HelperResponse::ok())
}
Expand All @@ -1237,16 +1244,10 @@ mod tests {
req,
)
.unwrap();
let r = t
.processor
t.processor
.query_status(t.shard_transport.clone_ref(), QueryId)
.await;
if let Err(e) = r {
panic!("Unexpected error {e}");
}
if let Ok(st) = r {
assert_eq!(QueryStatus::AwaitingInputs, st);
}
.await
.unwrap();
}

/// Context:
Expand Down

0 comments on commit c464883

Please sign in to comment.