Skip to content

Commit

Permalink
Fix opaque errors in HTTP frontend streaming failures
Browse files Browse the repository at this point in the history
  • Loading branch information
gruuya committed Feb 5, 2024
1 parent 6c0d05e commit 7e50702
Showing 1 changed file with 36 additions and 8 deletions.
44 changes: 36 additions & 8 deletions src/frontend/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,11 @@ struct QueryBody {
}

/// Convert rows from a `RecordBatch` to their JSON Lines byte representation, with newlines at the end
fn batch_to_json(batch: RecordBatch) -> Result<Vec<u8>, ArrowError> {
fn batch_to_json(
maybe_batch: Result<RecordBatch, DataFusionError>,
) -> Result<Vec<u8>, ArrowError> {
let mut buf = Vec::new();
for row in record_batches_to_json_rows(&[&batch])? {
for row in record_batches_to_json_rows(&[&maybe_batch?])? {
buf.extend(
serde_json::to_vec(&row)
.map_err(|error| ArrowError::JsonError(error.to_string()))?,
Expand All @@ -140,10 +142,14 @@ async fn plan_to_response(
context: Arc<SeafowlContext>,
plan: Arc<dyn ExecutionPlan>,
) -> Result<Response, DataFusionError> {
let stream = context
.execute_stream(plan)
.await?
.map(|maybe_batch| batch_to_json(maybe_batch?));
let stream = context.execute_stream(plan).await?.map(|maybe_batch| {
batch_to_json(maybe_batch)
// Seems like at this point wrap/hyper don't really handle the stream error well,
// i.e. the status code returned is 200 even when stream fails.
// To at least make this more transparent convert the error message to payload,
// otherwise the client just gets an opaque empty reply.
.or_else(|e| Ok::<Vec<u8>, ArrowError>(e.to_string().into_bytes()))
});
let body = hyper::Body::wrap_stream(stream);
Ok(Response::new(body))
}
Expand Down Expand Up @@ -1121,10 +1127,32 @@ pub mod tests {
let resp =
query_uncached_endpoint(&handler, "SELECT 'notanint'::int", new_db, None)
.await;
assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
assert_eq!(resp.status(), StatusCode::OK);

let error_msg = String::from_utf8_lossy(resp.body());
assert!(error_msg.contains("Arrow error: Cast error: Cannot cast string 'notanint' to value of Int32 type"));
assert_eq!(
error_msg,
"Cast error: Cannot cast string 'notanint' to value of Int32 type"
);
}

#[rstest]
#[tokio::test]
async fn test_error_json_conversion(
#[values(None, Some("test_db"))] new_db: Option<&str>,
) {
let context = in_memory_context_with_single_table(new_db).await;
let handler = filters(context, http_config_from_access_policy(free_for_all()));

let resp =
query_uncached_endpoint(&handler, "SELECT 1::NUMERIC", new_db, None).await;
assert_eq!(resp.status(), StatusCode::OK);

let error_msg = String::from_utf8_lossy(resp.body());
assert_eq!(
error_msg,
"Json error: data type Decimal128(38, 10) not supported in nested map for json writer"
);
}

#[rstest]
Expand Down

0 comments on commit 7e50702

Please sign in to comment.