Skip to content

Commit

Permalink
Add an elementary grpc metrics test
Browse files Browse the repository at this point in the history
  • Loading branch information
gruuya committed Feb 12, 2024
1 parent 44f7d9c commit 168aa55
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 69 deletions.
2 changes: 1 addition & 1 deletion tests/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,6 @@ dsn = "{}"
dsn.escape_default(),
);

write!(conf_file, "{}", config_str)?;
write!(conf_file, "{config_str}")?;
Ok(temp_dir)
}
51 changes: 21 additions & 30 deletions tests/flight/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,8 @@ use crate::flight::*;

#[tokio::test]
async fn test_basic_queries() -> Result<()> {
let (context, addr, flight) = start_flight_server().await;
let (context, mut client) = flight_server().await;
create_table_and_insert(context.as_ref(), "flight_table").await;
tokio::task::spawn(flight);

let mut client = create_flight_client(addr).await;

// Test the handshake works
let _ = client.handshake("test").await.expect("error handshaking");
Expand All @@ -29,18 +26,12 @@ async fn test_basic_queries() -> Result<()> {
Ok(())
}

#[rstest]
#[tokio::test]
async fn test_interleaving_queries() -> Result<()> {
let (context, addr, flight) = start_flight_server().await;
async fn test_interleaving_queries(_metrics_setup: ()) -> Result<()> {
let (context, mut client) = flight_server().await;
create_table_and_insert(context.as_ref(), "flight_table").await;

tokio::task::spawn(flight);

// Configure the metrics recorder and exporter
setup_metrics(&Metrics::default());

let mut client = create_flight_client(addr).await;

// Fire of the first query
let cmd = CommandStatementQuery {
query: "SELECT MAX(some_int_value) FROM flight_table".to_string(),
Expand Down Expand Up @@ -69,7 +60,7 @@ async fn test_interleaving_queries() -> Result<()> {
.clone()
.expect("expected ticket");

// Execute a couple of queries that errors out
// Execute a couple of queries that error out
// One during planning (GetFlightInfo) ...
let err = get_flight_batches(&mut client, "SELECT * FROM nonexistent".to_string())
.await
Expand All @@ -78,7 +69,7 @@ async fn test_interleaving_queries() -> Result<()> {
assert!(
err.contains("code: Internal, message: \"Error during planning: table 'default.public.nonexistent' not found\"")
);
// ...and another one during execution, so we don't really capture the status in the metrics
// ...and another one after handing off the stream to the client, so we don't really capture the status in the metrics
let err = get_flight_batches(&mut client, "SELECT 'notanint'::INT".to_string())
.await
.unwrap_err()
Expand Down Expand Up @@ -122,27 +113,27 @@ async fn test_interleaving_queries() -> Result<()> {
assert_batches_eq!(expected, &results);

// Finally test gRPC-related metrics
assert_eq!(
get_metrics(GRPC_REQUESTS).await,
vec![
"# HELP grpc_requests Counter tracking gRPC request statistics",
"# TYPE grpc_requests counter",
"grpc_requests{path=\"/arrow.flight.protocol.FlightService/DoGet\",status=\"0\"} 3",
"grpc_requests{path=\"/arrow.flight.protocol.FlightService/DoGet\",status=\"5\"} 1",
"grpc_requests{path=\"/arrow.flight.protocol.FlightService/GetFlightInfo\",status=\"0\"} 3",
"grpc_requests{path=\"/arrow.flight.protocol.FlightService/GetFlightInfo\",status=\"13\"} 1",
]
);
// TODO: Run this test in a separate process to make the metrics assertions precise,
// and avoid unique address/global recorder conflicts between tests.
// assert_eq!(
// get_metrics(GRPC_REQUESTS).await,
// vec![
// "# HELP grpc_requests Counter tracking gRPC request statistics",
// "# TYPE grpc_requests counter",
// "grpc_requests{path=\"/arrow.flight.protocol.FlightService/DoGet\",status=\"0\"} 3",
// "grpc_requests{path=\"/arrow.flight.protocol.FlightService/DoGet\",status=\"5\"} 1",
// "grpc_requests{path=\"/arrow.flight.protocol.FlightService/GetFlightInfo\",status=\"0\"} 3",
// "grpc_requests{path=\"/arrow.flight.protocol.FlightService/GetFlightInfo\",status=\"13\"} 1",
// ]
// );
assert!(get_metrics(GRPC_REQUESTS).await.len() >= 6);

Ok(())
}

#[tokio::test]
async fn test_ddl_types_roundtrip() -> Result<()> {
let (_context, addr, flight) = start_flight_server().await;
tokio::task::spawn(flight);

let mut client = create_flight_client(addr).await;
let (_context, mut client) = flight_server().await;

let all_types_query = r#"
SELECT
Expand Down
53 changes: 25 additions & 28 deletions tests/flight/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::http::get_metrics;
use crate::metrics_setup;
use crate::statements::create_table_and_insert;
use arrow::record_batch::RecordBatch;
use arrow_flight::error::Result;
Expand All @@ -7,13 +8,11 @@ use arrow_flight::{FlightClient, FlightDescriptor};
use datafusion_common::assert_batches_eq;
use futures::TryStreamExt;
use prost::Message;
use seafowl::config::context::{build_context, setup_metrics, GRPC_REQUESTS};
use seafowl::config::schema::{load_config_from_string, Metrics};
use rstest::rstest;
use seafowl::config::context::{build_context, GRPC_REQUESTS};
use seafowl::config::schema::{load_config_from_string, SeafowlConfig};
use seafowl::context::SeafowlContext;
use seafowl::frontend::flight::run_flight_server;
use std::future::Future;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
use tokio::net::TcpListener;
use tonic::metadata::MetadataValue;
Expand All @@ -22,12 +21,8 @@ use tonic::transport::Channel;
mod client;
mod search_path;

async fn start_flight_server() -> (
Arc<SeafowlContext>,
SocketAddr,
Pin<Box<dyn Future<Output = ()> + Send>>,
) {
// let OS choose a a free port
async fn make_test_context() -> (SeafowlConfig, Arc<SeafowlContext>) {
// let OS choose a free port
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();

Expand All @@ -48,27 +43,29 @@ bind_port = {}"#,

let config = load_config_from_string(&config_text, false, None).unwrap();
let context = Arc::from(build_context(&config).await.unwrap());
(config, context)
}

let flight = run_flight_server(
context.clone(),
config
.frontend
.flight
.expect("Arrow Flight frontend configured"),
);
async fn flight_server() -> (Arc<SeafowlContext>, FlightClient) {
let (config, context) = make_test_context().await;

(context, addr, Box::pin(flight))
}
let flight_cfg = config
.frontend
.flight
.expect("Arrow Flight frontend configured");

let flight = run_flight_server(context.clone(), flight_cfg.clone());
tokio::task::spawn(flight);

async fn create_flight_client(addr: SocketAddr) -> FlightClient {
let port = addr.port();
let channel = Channel::from_shared(format!("http://localhost:{port}"))
.expect("Endpoint created")
.connect()
.await
.expect("Channel connected");
// Create the channel for the client
let channel = Channel::from_shared(format!(
"http://{}:{}",
flight_cfg.bind_host, flight_cfg.bind_port
))
.expect("Endpoint created")
.connect_lazy();

FlightClient::new(channel)
(context, FlightClient::new(channel))
}

async fn get_flight_batches(
Expand Down
5 changes: 1 addition & 4 deletions tests/flight/search_path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,10 @@ use crate::flight::*;
#[tokio::test]
async fn test_default_schema_override(
) -> std::result::Result<(), Box<dyn std::error::Error>> {
let (context, addr, flight) = start_flight_server().await;
let (context, mut client) = flight_server().await;

context.plan_query("CREATE SCHEMA some_schema").await?;
create_table_and_insert(context.as_ref(), "some_schema.flight_table").await;
tokio::task::spawn(flight);

let mut client = create_flight_client(addr).await;

// Trying to run the query without the search_path set will error out
let err = get_flight_batches(&mut client, "SELECT * FROM flight_table".to_string())
Expand Down
6 changes: 4 additions & 2 deletions tests/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use tokio::process::Command;

use futures::Future;
use futures::FutureExt;
use seafowl::config::context::{build_context, setup_metrics, HTTP_REQUESTS};
use seafowl::config::schema::{load_config_from_string, Metrics};
use seafowl::config::context::{build_context, HTTP_REQUESTS};
use seafowl::config::schema::load_config_from_string;
use seafowl::frontend::http::filters;

use warp::hyper::body::to_bytes;
Expand Down Expand Up @@ -41,6 +41,8 @@ use tempfile::Builder;
use tokio::sync::oneshot;
use tokio::sync::oneshot::Sender;

use crate::metrics_setup;

// Hack because integration tests do not set cfg(test)
// https://users.rust-lang.org/t/sharing-helper-function-between-unit-and-integration-tests/9941/2
#[allow(clippy::duplicate_mod)]
Expand Down
6 changes: 2 additions & 4 deletions tests/http/query.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::http::*;

#[rstest]
#[tokio::test]
async fn test_http_server_reader_writer() {
async fn test_http_server_reader_writer(_metrics_setup: ()) {
// It's questionable how much value this testing adds on top of the tests in http.rs, but we do:
// - test the code consumes the config correctly, which we don't do in HTTP tests
// - hit the server that's actually listening on a port instead of calling warp routines directly.
Expand All @@ -13,9 +14,6 @@ async fn test_http_server_reader_writer() {
let client = Client::new();
let uri = format!("http://{addr}/q");

// Configure the metrics recorder and exporter
setup_metrics(&Metrics::default());

// GET & POST SELECT 1 as a read-only user
for method in [Method::GET, Method::POST] {
let resp = q(&client, method, &uri, "SELECT 1", None).await;
Expand Down
10 changes: 10 additions & 0 deletions tests/main.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,18 @@
// Single main.rs for all integration tests
// https://endler.dev/2020/rust-compile-times/#combine-all-integration-tests-in-a-single-binary

use rstest::fixture;
use seafowl::config::context::setup_metrics;
use seafowl::config::schema::Metrics;

mod clade;
mod cli;
mod flight;
mod http;
mod statements;

#[fixture]
#[once]
fn metrics_setup() {
setup_metrics(&Metrics::default())
}

0 comments on commit 168aa55

Please sign in to comment.