Skip to content

Commit

Permalink
Merge pull request scylladb#1157 from muzarski/run_query_errors
Browse files Browse the repository at this point in the history
session: internal request execution API error types refactor
  • Loading branch information
wprzytula authored Jan 15, 2025
2 parents 36a6775 + e4bb929 commit 54402c7
Show file tree
Hide file tree
Showing 15 changed files with 434 additions and 349 deletions.
46 changes: 26 additions & 20 deletions scylla/src/client/pager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ use crate::cluster::{ClusterState, NodeRef};
use crate::cql_to_rust::{FromRow, FromRowError};
use crate::deserialize::DeserializeOwnedRow;
use crate::errors::ProtocolError;
use crate::errors::{QueryError, UserRequestError};
use crate::errors::{QueryError, RequestAttemptError};
use crate::frame::response::result;
use crate::network::Connection;
use crate::observability::driver_tracing::RequestSpan;
use crate::observability::history::{self, HistoryListener};
use crate::observability::metrics::Metrics;
use crate::policies::load_balancing::{self, RoutingInfo};
use crate::policies::retry::{QueryInfo, RetryDecision, RetrySession};
use crate::policies::retry::{RequestInfo, RetryDecision, RetrySession};
use crate::response::query_result::ColumnSpecs;
use crate::response::{NonErrorQueryResponse, QueryResponse};
use crate::statement::{prepared_statement::PreparedStatement, query::Query};
Expand Down Expand Up @@ -135,7 +135,7 @@ struct PagerWorker<'a, QueryFunc, SpanCreatorFunc> {
sender: ProvingSender<Result<ReceivedPage, QueryError>>,

// Closure used to perform a single page query
// AsyncFn(Arc<Connection>, Option<Arc<[u8]>>) -> Result<QueryResponse, UserRequestError>
// AsyncFn(Arc<Connection>, Option<Arc<[u8]>>) -> Result<QueryResponse, RequestAttemptError>
page_query: QueryFunc,

statement_info: RoutingInfo<'a>,
Expand All @@ -158,7 +158,7 @@ struct PagerWorker<'a, QueryFunc, SpanCreatorFunc> {
impl<QueryFunc, QueryFut, SpanCreator> PagerWorker<'_, QueryFunc, SpanCreator>
where
QueryFunc: Fn(Arc<Connection>, Consistency, PagingState) -> QueryFut,
QueryFut: Future<Output = Result<QueryResponse, UserRequestError>>,
QueryFut: Future<Output = Result<QueryResponse, RequestAttemptError>>,
SpanCreator: Fn() -> RequestSpan,
{
// Contract: this function MUST send at least one item through self.sender
Expand Down Expand Up @@ -199,14 +199,14 @@ where
'same_node_retries: loop {
trace!(parent: &span, "Execution started");
// Query pages until an error occurs
let queries_result: Result<PageSendAttemptedProof, QueryError> = self
let queries_result: Result<PageSendAttemptedProof, RequestAttemptError> = self
.query_pages(&connection, current_consistency, node)
.instrument(span.clone())
.await;

last_error = match queries_result {
let request_error: RequestAttemptError = match queries_result {
Ok(proof) => {
trace!(parent: &span, "Query succeeded");
trace!(parent: &span, "Request succeeded");
// query_pages returned Ok, so we are guaranteed
// that it attempted to send at least one page
// through self.sender and we can safely return now.
Expand All @@ -216,15 +216,15 @@ where
trace!(
parent: &span,
error = %error,
"Query failed"
"Request failed"
);
error
}
};

// Use retry policy to decide what to do next
let query_info = QueryInfo {
error: &last_error,
let query_info = RequestInfo {
error: &request_error,
is_idempotent: self.query_is_idempotent,
consistency: self.query_consistency,
};
Expand All @@ -234,7 +234,10 @@ where
parent: &span,
retry_decision = format!("{:?}", retry_decision).as_str()
);

last_error = request_error.into_query_error();
self.log_attempt_error(&last_error, &retry_decision);

match retry_decision {
RetryDecision::RetrySameNode(cl) => {
self.metrics.inc_retries_num();
Expand Down Expand Up @@ -278,7 +281,7 @@ where
connection: &Arc<Connection>,
consistency: Consistency,
node: NodeRef<'_>,
) -> Result<PageSendAttemptedProof, QueryError> {
) -> Result<PageSendAttemptedProof, RequestAttemptError> {
loop {
let request_span = (self.span_creator)();
match self
Expand All @@ -298,7 +301,7 @@ where
consistency: Consistency,
node: NodeRef<'_>,
request_span: &RequestSpan,
) -> Result<ControlFlow<PageSendAttemptedProof, ()>, QueryError> {
) -> Result<ControlFlow<PageSendAttemptedProof, ()>, RequestAttemptError> {
self.metrics.inc_total_paged_queries();
let query_start = std::time::Instant::now();

Expand Down Expand Up @@ -329,7 +332,7 @@ where
self.log_query_success();
self.execution_profile
.load_balancing_policy
.on_query_success(&self.statement_info, elapsed, node);
.on_request_success(&self.statement_info, elapsed, node);

request_span.record_raw_rows_fields(&rows);

Expand Down Expand Up @@ -359,11 +362,10 @@ where
Ok(ControlFlow::Continue(()))
}
Err(err) => {
let err = err.into();
self.metrics.inc_failed_paged_queries();
self.execution_profile
.load_balancing_policy
.on_query_failure(&self.statement_info, elapsed, node, &err);
.on_request_failure(&self.statement_info, elapsed, node, &err);
Err(err)
}
Ok(NonErrorQueryResponse {
Expand All @@ -381,10 +383,10 @@ where
Ok(response) => {
self.metrics.inc_failed_paged_queries();
let err =
ProtocolError::UnexpectedResponse(response.response.to_response_kind()).into();
RequestAttemptError::UnexpectedResponse(response.response.to_response_kind());
self.execution_profile
.load_balancing_policy
.on_query_failure(&self.statement_info, elapsed, node, &err);
.on_request_failure(&self.statement_info, elapsed, node, &err);
Err(err)
}
}
Expand Down Expand Up @@ -482,7 +484,7 @@ struct SingleConnectionPagerWorker<Fetcher> {
impl<Fetcher, FetchFut> SingleConnectionPagerWorker<Fetcher>
where
Fetcher: Fn(PagingState) -> FetchFut + Send + Sync,
FetchFut: Future<Output = Result<QueryResponse, UserRequestError>> + Send,
FetchFut: Future<Output = Result<QueryResponse, RequestAttemptError>> + Send,
{
async fn work(mut self) -> PageSendAttemptedProof {
match self.do_work().await {
Expand All @@ -497,8 +499,12 @@ where
async fn do_work(&mut self) -> Result<PageSendAttemptedProof, QueryError> {
let mut paging_state = PagingState::start();
loop {
let result = (self.fetcher)(paging_state).await?;
let response = result.into_non_error_query_response()?;
let result = (self.fetcher)(paging_state)
.await
.map_err(RequestAttemptError::into_query_error)?;
let response = result
.into_non_error_query_response()
.map_err(RequestAttemptError::into_query_error)?;
match response.response {
NonErrorResponse::Result(result::Result::Rows((rows, paging_state_response))) => {
let (proof, send_result) = self
Expand Down
54 changes: 32 additions & 22 deletions scylla/src/client/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::cluster::node::CloudEndpoint;
use crate::cluster::node::{InternalKnownNode, KnownNode, NodeRef};
use crate::cluster::{Cluster, ClusterNeatDebug, ClusterState};
use crate::errors::{
BadQuery, NewSessionError, ProtocolError, QueryError, TracingProtocolError, UserRequestError,
BadQuery, NewSessionError, ProtocolError, QueryError, RequestAttemptError, TracingProtocolError,
};
use crate::frame::response::result;
#[cfg(feature = "ssl")]
Expand All @@ -28,7 +28,7 @@ use crate::observability::tracing::TracingInfo;
use crate::policies::address_translator::AddressTranslator;
use crate::policies::host_filter::HostFilter;
use crate::policies::load_balancing::{self, RoutingInfo};
use crate::policies::retry::{QueryInfo, RetryDecision, RetrySession};
use crate::policies::retry::{RequestInfo, RetryDecision, RetrySession};
use crate::policies::speculative_execution;
use crate::prepared_statement::PreparedStatement;
use crate::query::Query;
Expand Down Expand Up @@ -1138,7 +1138,6 @@ where
)
.await
.and_then(QueryResponse::into_non_error_query_response)
.map_err(Into::into)
} else {
let prepared = connection.prepare(query_ref).await?;
let serialized = prepared.serialize_values(values_ref)?;
Expand All @@ -1154,7 +1153,6 @@ where
)
.await
.and_then(QueryResponse::into_non_error_query_response)
.map_err(Into::into)
}
}
},
Expand All @@ -1175,7 +1173,9 @@ where
self.handle_set_keyspace_response(&response).await?;
self.handle_auto_await_schema_agreement(&response).await?;

let (result, paging_state_response) = response.into_query_result_and_paging_state()?;
let (result, paging_state_response) = response
.into_query_result_and_paging_state()
.map_err(RequestAttemptError::into_query_error)?;
span.record_result_fields(&result);

Ok((result, paging_state_response))
Expand Down Expand Up @@ -1303,9 +1303,10 @@ where

// Safety: there is at least one node in the cluster, and `Cluster::iter_working_connections()`
// returns either an error or an iterator with at least one connection, so there will be at least one result.
let first_ok: Result<PreparedStatement, UserRequestError> =
let first_ok: Result<PreparedStatement, RequestAttemptError> =
results.by_ref().find_or_first(Result::is_ok).unwrap();
let mut prepared: PreparedStatement = first_ok?;
let mut prepared: PreparedStatement =
first_ok.map_err(RequestAttemptError::into_query_error)?;

// Validate prepared ids equality
for statement in results.flatten() {
Expand Down Expand Up @@ -1456,7 +1457,6 @@ where
)
.await
.and_then(QueryResponse::into_non_error_query_response)
.map_err(Into::into)
}
},
&span,
Expand All @@ -1476,7 +1476,9 @@ where
self.handle_set_keyspace_response(&response).await?;
self.handle_auto_await_schema_agreement(&response).await?;

let (result, paging_state_response) = response.into_query_result_and_paging_state()?;
let (result, paging_state_response) = response
.into_query_result_and_paging_state()
.map_err(RequestAttemptError::into_query_error)?;
span.record_result_fields(&result);

Ok((result, paging_state_response))
Expand Down Expand Up @@ -1556,7 +1558,7 @@ where

let span = RequestSpan::new_batch();

let run_request_result = self
let run_request_result: RunRequestResult<NonErrorQueryResponse> = self
.run_request(
statement_info,
&batch.config,
Expand All @@ -1577,6 +1579,7 @@ where
serial_consistency,
)
.await
.and_then(QueryResponse::into_non_error_query_response)
}
},
&span,
Expand All @@ -1586,7 +1589,8 @@ where

let result = match run_request_result {
RunRequestResult::IgnoredWriteError => QueryResult::mock_empty(),
RunRequestResult::Completed(result) => {
RunRequestResult::Completed(non_error_query_response) => {
let result = non_error_query_response.into_query_result()?;
span.record_result_fields(&result);
result
}
Expand Down Expand Up @@ -1844,7 +1848,7 @@ where
request_span: &'a RequestSpan,
) -> Result<RunRequestResult<ResT>, QueryError>
where
QueryFut: Future<Output = Result<ResT, QueryError>>,
QueryFut: Future<Output = Result<ResT, RequestAttemptError>>,
ResT: AllowedRunRequestResTType,
{
let history_listener_and_id: Option<(&'a dyn HistoryListener, history::QueryId)> =
Expand Down Expand Up @@ -2008,7 +2012,7 @@ where
mut context: ExecuteRequestContext<'a>,
) -> Option<Result<RunRequestResult<ResT>, QueryError>>
where
QueryFut: Future<Output = Result<ResT, QueryError>>,
QueryFut: Future<Output = Result<ResT, RequestAttemptError>>,
ResT: AllowedRunRequestResTType,
{
let mut last_error: Option<QueryError> = None;
Expand Down Expand Up @@ -2045,18 +2049,18 @@ where
);
let attempt_id: Option<history::AttemptId> =
context.log_attempt_start(connection.get_connect_address());
let request_result: Result<ResT, QueryError> =
let request_result: Result<ResT, RequestAttemptError> =
run_request_once(connection, current_consistency, execution_profile)
.instrument(span.clone())
.await;

let elapsed = request_start.elapsed();
last_error = match request_result {
let request_error: RequestAttemptError = match request_result {
Ok(response) => {
trace!(parent: &span, "Request succeeded");
let _ = self.metrics.log_query_latency(elapsed.as_millis() as u64);
context.log_attempt_success(&attempt_id);
execution_profile.load_balancing_policy.on_query_success(
execution_profile.load_balancing_policy.on_request_success(
context.query_info,
elapsed,
node,
Expand All @@ -2070,20 +2074,19 @@ where
"Request failed"
);
self.metrics.inc_failed_nonpaged_queries();
execution_profile.load_balancing_policy.on_query_failure(
execution_profile.load_balancing_policy.on_request_failure(
context.query_info,
elapsed,
node,
&e,
);
Some(e)
e
}
};

let the_error: &QueryError = last_error.as_ref().unwrap();
// Use retry policy to decide what to do next
let query_info = QueryInfo {
error: the_error,
let query_info = RequestInfo {
error: &request_error,
is_idempotent: context.is_idempotent,
consistency: context
.consistency_set_on_statement
Expand All @@ -2095,7 +2098,14 @@ where
parent: &span,
retry_decision = format!("{:?}", retry_decision).as_str()
);
context.log_attempt_error(&attempt_id, the_error, &retry_decision);

last_error = Some(request_error.into_query_error());
context.log_attempt_error(
&attempt_id,
last_error.as_ref().unwrap(),
&retry_decision,
);

match retry_decision {
RetryDecision::RetrySameNode(new_cl) => {
self.metrics.inc_retries_num();
Expand Down
4 changes: 2 additions & 2 deletions scylla/src/client/session_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::cluster::metadata::{CollectionType, ColumnKind, CqlType, NativeType,
use crate::deserialize::DeserializeOwnedValue;
use crate::errors::{BadKeyspaceName, BadQuery, DbError, QueryError};
use crate::observability::tracing::TracingInfo;
use crate::policies::retry::{QueryInfo, RetryDecision, RetryPolicy, RetrySession};
use crate::policies::retry::{RequestInfo, RetryDecision, RetryPolicy, RetrySession};
use crate::prepared_statement::PreparedStatement;
use crate::query::Query;
use crate::routing::partitioner::{
Expand Down Expand Up @@ -2725,7 +2725,7 @@ async fn test_iter_works_when_retry_policy_returns_ignore_write_error() {

struct MyRetrySession(Arc<AtomicBool>);
impl RetrySession for MyRetrySession {
fn decide_should_retry(&mut self, _: QueryInfo) -> RetryDecision {
fn decide_should_retry(&mut self, _: RequestInfo) -> RetryDecision {
self.0.store(true, Ordering::Relaxed);
RetryDecision::IgnoreWriteError
}
Expand Down
7 changes: 5 additions & 2 deletions scylla/src/cluster/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
use crate::client::pager::QueryPager;
use crate::cluster::node::resolve_contact_points;
use crate::deserialize::DeserializeOwnedRow;
use crate::errors::{DbError, NewSessionError, QueryError};
use crate::errors::{DbError, NewSessionError, QueryError, RequestAttemptError};
use crate::frame::response::event::Event;
use crate::network::{Connection, ConnectionConfig, NodeConnectionPool, PoolConfig, PoolSize};
use crate::policies::host_filter::HostFilter;
Expand Down Expand Up @@ -986,7 +986,10 @@ where
let mut query = Query::new(query_str);
query.set_page_size(METADATA_QUERY_PAGE_SIZE);

let prepared = conn.prepare(&query).await?;
let prepared = conn
.prepare(&query)
.await
.map_err(RequestAttemptError::into_query_error)?;
let serialized_values = prepared.serialize_values(&keyspaces)?;
conn.execute_iter(prepared, serialized_values).await
}
Expand Down
Loading

0 comments on commit 54402c7

Please sign in to comment.