From de0d32dd2c10415e9a88f9788935d4a166242068 Mon Sep 17 00:00:00 2001 From: Erin Power Date: Wed, 26 Feb 2025 10:45:22 +0100 Subject: [PATCH] chore: remove old CLI & Components --- docs/src/services/proxy/filters.md | 2 +- docs/src/services/proxy/filters/capture.md | 2 +- .../src/services/proxy/filters/concatenate.md | 2 +- docs/src/services/proxy/filters/debug.md | 2 +- docs/src/services/proxy/filters/firewall.md | 2 +- .../services/proxy/filters/load_balancer.md | 2 +- .../proxy/filters/local_rate_limit.md | 2 +- docs/src/services/proxy/filters/match.md | 2 +- .../services/proxy/filters/token_router.md | 4 +- docs/src/services/xds/providers/filesystem.md | 2 +- src/cli.rs | 81 +--------- src/cli/agent.rs | 108 ------------- src/cli/manage.rs | 70 --------- src/cli/proxy.rs | 147 ------------------ src/cli/relay.rs | 79 ---------- src/components.rs | 3 - src/components/agent.rs | 111 ------------- src/components/manage.rs | 93 ----------- src/components/proxy.rs | 2 +- src/components/proxy/sessions.rs | 2 +- src/components/relay.rs | 88 ----------- src/config.rs | 48 +++--- src/config/serialization.rs | 16 +- src/config/watch/fs.rs | 4 +- src/lib.rs | 5 +- tests/capture.rs | 2 +- tests/concatenate.rs | 2 +- tests/filter_order.rs | 2 +- tests/filters.rs | 8 +- tests/firewall.rs | 2 +- tests/health.rs | 2 +- tests/load_balancer.rs | 2 +- tests/local_rate_limit.rs | 2 +- tests/match.rs | 2 +- tests/metrics.rs | 4 +- tests/no_filter.rs | 2 +- tests/qcmp.rs | 95 ----------- tests/token_router.rs | 6 +- 38 files changed, 73 insertions(+), 937 deletions(-) delete mode 100644 src/cli/agent.rs delete mode 100644 src/cli/manage.rs delete mode 100644 src/cli/proxy.rs delete mode 100644 src/cli/relay.rs delete mode 100644 src/components/agent.rs delete mode 100644 src/components/manage.rs delete mode 100644 src/components/relay.rs delete mode 100644 tests/qcmp.rs diff --git a/docs/src/services/proxy/filters.md b/docs/src/services/proxy/filters.md index 300953bbb6..d6bf7bccd5 100644 --- a/docs/src/services/proxy/filters.md +++ b/docs/src/services/proxy/filters.md @@ -52,7 +52,7 @@ clusters: - address: 127.0.0.1:7001 # "; # let config = quilkin::config::Config::from_reader(yaml.as_bytes()).unwrap(); -# assert_eq!(config.filters.load().len(), 2); +# assert_eq!(config.filters().unwrap().load().len(), 2); # } ``` diff --git a/docs/src/services/proxy/filters/capture.md b/docs/src/services/proxy/filters/capture.md index 19e989488b..85d85416df 100644 --- a/docs/src/services/proxy/filters/capture.md +++ b/docs/src/services/proxy/filters/capture.md @@ -45,7 +45,7 @@ clusters: - address: 127.0.0.1:7001 # "; # let config = quilkin::config::Config::from_reader(yaml.as_bytes()).unwrap(); -# assert_eq!(config.filters.load().len(), 1); +# assert_eq!(config.filters().unwrap().load().len(), 1); ``` ## Configuration Options ([Rust Doc](../../../../api/quilkin/filters/capture/struct.Config.html)) diff --git a/docs/src/services/proxy/filters/concatenate.md b/docs/src/services/proxy/filters/concatenate.md index e036c6ef56..0014bbc098 100644 --- a/docs/src/services/proxy/filters/concatenate.md +++ b/docs/src/services/proxy/filters/concatenate.md @@ -23,7 +23,7 @@ clusters: - address: 127.0.0.1:7001 # "; # let config = quilkin::config::Config::from_reader(yaml.as_bytes()).unwrap(); -# assert_eq!(config.filters.load().len(), 1); +# assert_eq!(config.filters().unwrap().load().len(), 1); ``` ## Configuration Options ([Rust Doc](../../../../api/quilkin/filters/concatenate/struct.Config.html)) diff --git a/docs/src/services/proxy/filters/debug.md b/docs/src/services/proxy/filters/debug.md index 8ae5b04f93..1857cce06f 100644 --- a/docs/src/services/proxy/filters/debug.md +++ b/docs/src/services/proxy/filters/debug.md @@ -22,7 +22,7 @@ clusters: - address: 127.0.0.1:7001 # "; # let config = quilkin::config::Config::from_reader(yaml.as_bytes()).unwrap(); -# assert_eq!(config.filters.load().len(), 1); +# assert_eq!(config.filters().unwrap().load().len(), 1); ``` ## Configuration Options ([Rust Doc](../../../../api/quilkin/filters/debug/struct.Config.html)) diff --git a/docs/src/services/proxy/filters/firewall.md b/docs/src/services/proxy/filters/firewall.md index a7352b4281..8891cbe187 100644 --- a/docs/src/services/proxy/filters/firewall.md +++ b/docs/src/services/proxy/filters/firewall.md @@ -33,7 +33,7 @@ clusters: - address: 127.0.0.1:7001 # "; # let config = quilkin::config::Config::from_reader(yaml.as_bytes()).unwrap(); -# assert_eq!(config.filters.load().len(), 1); +# assert_eq!(config.filters().unwrap().load().len(), 1); ``` ## Configuration Options ([Rust Doc](../../../../api/quilkin/filters/firewall/struct.Config.html)) diff --git a/docs/src/services/proxy/filters/load_balancer.md b/docs/src/services/proxy/filters/load_balancer.md index c40db0e834..7182cc8751 100644 --- a/docs/src/services/proxy/filters/load_balancer.md +++ b/docs/src/services/proxy/filters/load_balancer.md @@ -22,7 +22,7 @@ clusters: - address: 127.0.0.1:7001 # "; # let config = quilkin::config::Config::from_reader(yaml.as_bytes()).unwrap(); -# assert_eq!(config.filters.load().len(), 1); +# assert_eq!(config.filters().unwrap().load().len(), 1); # } ``` diff --git a/docs/src/services/proxy/filters/local_rate_limit.md b/docs/src/services/proxy/filters/local_rate_limit.md index 1827d9b052..8fa9df0740 100644 --- a/docs/src/services/proxy/filters/local_rate_limit.md +++ b/docs/src/services/proxy/filters/local_rate_limit.md @@ -26,7 +26,7 @@ clusters: - address: 127.0.0.1:7001 # "; # let config = quilkin::config::Config::from_reader(yaml.as_bytes()).unwrap(); -# assert_eq!(config.filters.load().len(), 1); +# assert_eq!(config.filters().unwrap().load().len(), 1); # } ``` To configure a rate limiter, we specify the maximum rate at which the proxy is allowed to forward packets. In the example above, we configured the proxy to forward a maximum of 1000 packets per second). diff --git a/docs/src/services/proxy/filters/match.md b/docs/src/services/proxy/filters/match.md index f81fc22f78..72b2811fe2 100644 --- a/docs/src/services/proxy/filters/match.md +++ b/docs/src/services/proxy/filters/match.md @@ -36,7 +36,7 @@ filters: name: quilkin.filters.drop.v1alpha1.Drop # "; # let config = quilkin::config::Config::from_reader(yaml.as_bytes()).unwrap(); -# assert_eq!(config.filters.load().len(), 2); +# assert_eq!(config.filters().unwrap().load().len(), 2); ``` diff --git a/docs/src/services/proxy/filters/token_router.md b/docs/src/services/proxy/filters/token_router.md index d92d26d9a8..45286038e4 100644 --- a/docs/src/services/proxy/filters/token_router.md +++ b/docs/src/services/proxy/filters/token_router.md @@ -34,7 +34,7 @@ clusters: - bmt1eTcweA== # "; # let config = quilkin::config::Config::from_reader(yaml.as_bytes()).unwrap(); -# assert_eq!(config.filters.load().len(), 1); +# assert_eq!(config.filters().unwrap().load().len(), 1); ``` View the [CaptureBytes](capture.md) filter documentation for more details. @@ -96,7 +96,7 @@ clusters: - bmt1eTcweA== # "; # let config = quilkin::config::Config::from_reader(yaml.as_bytes()).unwrap(); -# assert_eq!(config.filters.load().len(), 2); +# assert_eq!(config.filters().unwrap().load().len(), 2); ``` On the game client side the [Concatenate](concatenate.md) filter could also be used to add authentication diff --git a/docs/src/services/xds/providers/filesystem.md b/docs/src/services/xds/providers/filesystem.md index 5c061feed9..c5d417e2a3 100644 --- a/docs/src/services/xds/providers/filesystem.md +++ b/docs/src/services/xds/providers/filesystem.md @@ -39,7 +39,7 @@ clusters: - 'MXg3aWp5Ng==' # "; # let config = quilkin::config::Config::from_reader(yaml.as_bytes()).unwrap(); -# assert_eq!(config.filters.load().len(), 1); +# assert_eq!(config.filters().unwrap().load().len(), 1); ``` [configuration]: ../../../services/proxy/configuration.md diff --git a/src/cli.rs b/src/cli.rs index d1c34e15a6..cc3a11d7fa 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -19,39 +19,19 @@ use std::{path::PathBuf, sync::Arc}; use clap::builder::TypedValueParser; use clap::crate_version; -use crate::Config; use strum_macros::{Display, EnumString}; pub use self::{ - agent::Agent, generate_config_schema::GenerateConfigSchema, - manage::Manage, - proxy::Proxy, qcmp::Qcmp, - relay::Relay, service::{Finalizer, Service}, }; -macro_rules! define_port { - ($port:expr_2021) => { - pub const PORT: u16 = $port; - - pub fn default_port() -> u16 { - PORT - } - }; -} - -pub mod agent; pub mod generate_config_schema; -pub mod manage; -pub mod proxy; pub mod qcmp; -pub mod relay; -mod service; +pub(crate) mod service; const ETC_CONFIG_PATH: &str = "/etc/quilkin/quilkin.yaml"; -const PORT_ENV_VAR: &str = "QUILKIN_PORT"; #[derive(Debug, clap::Parser)] #[command(next_help_heading = "Administration Options")] @@ -186,13 +166,9 @@ impl LogFormats { /// The various Quilkin commands. #[derive(Clone, Debug, clap::Subcommand)] pub enum Commands { - Agent(Agent), GenerateConfigSchema(GenerateConfigSchema), - Manage(Manage), #[clap(subcommand)] Qcmp(Qcmp), - Proxy(Proxy), - Relay(Relay), } impl Cli { @@ -244,47 +220,8 @@ impl Cli { self.providers .spawn_providers(&config, ready.clone(), locality.clone()); - match self.command { - Some(Commands::Agent(agent)) => { - let old_ready = agent::Ready { - provider_is_healthy: ready.clone(), - relay_is_healthy: ready.clone(), - ..<_>::default() - }; - agent.run(locality, config, old_ready, shutdown_rx).await - } - - Some(Commands::Proxy(runner)) => { - let old_ready = proxy::Ready { - xds_is_healthy: parking_lot::RwLock::from(Some(ready.clone())).into(), - ..<_>::default() - }; - runner.run(config, old_ready, None, shutdown_rx).await - } - - Some(Commands::Manage(manager)) => { - let old_ready = agent::Ready { - provider_is_healthy: ready.clone(), - is_manage: true, - ..<_>::default() - }; - manager.run(locality, config, old_ready, shutdown_rx).await - } - - Some(Commands::Relay(relay)) => { - let old_ready = relay::Ready { - provider_is_healthy: ready.clone(), - ..<_>::default() - }; - - relay.run(locality, config, old_ready, shutdown_rx).await - } - None => { - self.service.spawn_services(&config, &shutdown_rx)?; - shutdown_rx.changed().await.map_err(From::from) - } - Some(_) => unreachable!(), - } + self.service.spawn_services(&config, &shutdown_rx)?; + shutdown_rx.changed().await.map_err(From::from) } /// Searches for the configuration file, and panics if not found. @@ -294,12 +231,7 @@ impl Cli { let file = loop { let Some(path) = paths.next() else { - let cfg = if matches!(self.command, Some(Commands::Agent(..))) { - Config::default_agent() - } else { - Config::default_non_agent() - }; - return Ok(Arc::new(cfg)); + return Ok(<_>::default()); }; match std::fs::File::open(path) { @@ -315,10 +247,7 @@ impl Cli { } }; - Ok(Arc::new(crate::Config::from_reader( - file, - matches!(self.command, Some(Commands::Agent(..))), - )?)) + Ok(Arc::new(crate::Config::from_reader(file)?)) } } diff --git a/src/cli/agent.rs b/src/cli/agent.rs deleted file mode 100644 index 60fa15bafb..0000000000 --- a/src/cli/agent.rs +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Copyright 2023 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -use std::sync::Arc; - -use crate::components::agent; -pub use agent::Ready; - -define_port!(7600); - -/// Runs Quilkin as a relay service that runs a Manager Discovery Service -/// (mDS) for accepting cluster and configuration information from xDS -/// management services, and exposing it as a single merged xDS service for -/// proxy services. -#[derive(clap::Args, Clone, Debug)] -pub struct Agent { - /// Port for QCMP service. - #[clap(short, long, env = "QCMP_PORT", default_value_t = PORT)] - pub qcmp_port: u16, - /// One or more `quilkin relay` endpoints to push configuration changes to. - #[clap(short, long, env = "QUILKIN_MANAGEMENT_SERVER")] - pub relay: Vec, - /// The configuration source for a management server. - #[clap(subcommand)] - pub provider: Option, - /// If specified, filters the available gameserver addresses to the one that - /// matches the specified type - #[clap(long)] - pub address_type: Option, - /// If specified, additionally filters the gameserver address by its ip kind - #[clap(long, requires("address_type"), value_enum)] - pub ip_kind: Option, - /// The ICAO code for the agent. - #[clap(short, long, env, default_value_t = crate::config::IcaoCode::default())] - pub icao_code: crate::config::IcaoCode, -} - -impl clap::ValueEnum for crate::config::AddrKind { - fn value_variants<'a>() -> &'a [Self] { - &[Self::Ipv4, Self::Ipv6, Self::Any] - } - - fn to_possible_value(&self) -> Option { - use clap::builder::PossibleValue as pv; - Some(match self { - Self::Ipv4 => pv::new("v4"), - Self::Ipv6 => pv::new("v6"), - Self::Any => pv::new("any"), - }) - } -} - -impl Default for Agent { - fn default() -> Self { - Self { - qcmp_port: PORT, - relay: <_>::default(), - provider: <_>::default(), - icao_code: <_>::default(), - address_type: None, - ip_kind: None, - } - } -} - -impl Agent { - #[tracing::instrument(skip_all)] - pub async fn run( - self, - locality: Option, - config: Arc, - ready: Ready, - shutdown_rx: crate::signal::ShutdownRx, - ) -> crate::Result<()> { - let icao_code = Some(self.icao_code); - - agent::Agent { - locality, - port: self.qcmp_port, - icao_code, - relay_servers: self.relay, - provider: self.provider, - address_selector: self.address_type.map(|at| crate::config::AddressSelector { - name: at, - kind: self.ip_kind.unwrap_or(crate::config::AddrKind::Any), - }), - } - .run(crate::components::RunArgs { - config, - ready, - shutdown_rx, - }) - .await - } -} diff --git a/src/cli/manage.rs b/src/cli/manage.rs deleted file mode 100644 index 93854cd06c..0000000000 --- a/src/cli/manage.rs +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Copyright 2022 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -use crate::components::manage; -pub use manage::Ready; - -define_port!(7800); - -/// Runs Quilkin as a xDS management server, using `provider` as -/// a configuration source. -#[derive(clap::Args, Clone, Debug)] -pub struct Manage { - /// One or more `quilkin relay` endpoints to push configuration changes to. - #[clap(short, long, env = "QUILKIN_MANAGEMENT_SERVER")] - pub relay: Vec, - /// The TCP port to listen to, to serve discovery responses. - #[clap(short, long, env = super::PORT_ENV_VAR, default_value_t = PORT)] - pub port: u16, - /// The configuration source for a management server. - #[clap(subcommand)] - pub provider: crate::config::Providers, - /// If specified, filters the available gameserver addresses to the one that - /// matches the specified type - #[clap(long)] - pub address_type: Option, - /// If specified, additionally filters the gameserver address by its ip kind - #[clap(long, requires("address_type"), value_enum, default_value_t=crate::config::AddrKind::Any)] - pub ip_kind: crate::config::AddrKind, -} - -impl Manage { - #[tracing::instrument(skip_all)] - pub async fn run( - self, - locality: Option, - config: std::sync::Arc, - ready: Ready, - shutdown_rx: crate::signal::ShutdownRx, - ) -> crate::Result<()> { - manage::Manage { - locality, - port: self.port, - provider: self.provider, - relay_servers: self.relay, - address_selector: self.address_type.map(|at| crate::config::AddressSelector { - name: at, - kind: self.ip_kind, - }), - } - .run(crate::components::RunArgs { - config, - ready, - shutdown_rx, - }) - .await - } -} diff --git a/src/cli/proxy.rs b/src/cli/proxy.rs deleted file mode 100644 index 762e7d9623..0000000000 --- a/src/cli/proxy.rs +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Copyright 2021 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -use std::net::SocketAddr; -use tonic::transport::Endpoint; - -#[cfg(doc)] -use crate::filters::FilterFactory; - -use crate::signal::ShutdownRx; - -pub use crate::{cli::service::XdpOptions, components::proxy::Ready}; - -define_port!(7777); - -const QCMP_PORT: u16 = 7600; - -/// Run Quilkin as a UDP reverse proxy. -#[derive(clap::Args, Clone, Debug)] -pub struct Proxy { - /// One or more `quilkin manage` endpoints to listen to for config changes - #[clap(short, long, env = "QUILKIN_MANAGEMENT_SERVER", conflicts_with("to"))] - pub management_server: Vec, - /// The remote URL or local file path to retrieve the Maxmind database. - #[clap(long, env)] - pub mmdb: Option, - /// The port to listen on. - #[clap(short, long, env = super::PORT_ENV_VAR, default_value_t = PORT)] - pub port: u16, - /// The port to listen on. - #[clap(short, long, env = "QUILKIN_QCMP_PORT", default_value_t = QCMP_PORT)] - pub qcmp_port: u16, - /// One or more socket addresses to forward packets to. - #[clap(long, env = "QUILKIN_DEST")] - pub to: Vec, - /// Assigns dynamic tokens to each address in the `--to` argument - /// - /// Format is `:` - #[clap(long, env = "QUILKIN_DEST_TOKENS", requires("to"))] - pub to_tokens: Option, - /// The interval in seconds at which the relay will send a discovery request - /// to an management server after receiving no updates. - #[clap(long, env = "QUILKIN_IDLE_REQUEST_INTERVAL_SECS")] - pub idle_request_interval_secs: Option, - /// Number of worker threads used to process packets. - /// - /// If not specified defaults to number of cpus. Has no effect if XDP is used, - /// as the number of workers is always the same as the NIC queue size. - #[clap(short, long, env = "QUILKIN_WORKERS")] - pub workers: Option, - #[clap(flatten)] - pub xdp_opts: XdpOptions, - #[clap(long = "termination-timeout")] - pub termination_timeout: Option, -} - -impl Default for Proxy { - fn default() -> Self { - Self { - management_server: <_>::default(), - mmdb: <_>::default(), - port: PORT, - qcmp_port: QCMP_PORT, - to: <_>::default(), - to_tokens: None, - idle_request_interval_secs: None, - workers: None, - xdp_opts: Default::default(), - termination_timeout: None, - } - } -} - -impl Proxy { - /// Start and run a proxy. - #[tracing::instrument(skip_all)] - pub async fn run( - self, - config: std::sync::Arc, - ready: Ready, - initialized: Option>, - shutdown_rx: ShutdownRx, - ) -> crate::Result<()> { - tracing::info!(port = self.port, proxy_id = config.id(), "Starting proxy"); - - // The number of worker tasks to spawn. Each task gets a dedicated queue to - // consume packets off. - let num_workers = self.workers.unwrap_or_else(|| { - std::num::NonZeroUsize::new(num_cpus::get()) - .expect("num_cpus returned 0, which should be impossible") - }); - - let socket = crate::net::raw_socket_with_reuse(self.port)?; - let qcmp = crate::net::raw_socket_with_reuse(self.qcmp_port)?; - let phoenix = crate::net::TcpListener::bind(Some(self.qcmp_port))?; - - let to_tokens = self - .to_tokens - .map(|tt| { - let Some((count, length)) = tt.split_once(':') else { - eyre::bail!("--to-tokens `{tt}` is invalid, it must have a `:` separator") - }; - - let count = count.parse()?; - let length = length.parse()?; - - Ok(crate::components::proxy::ToTokens { count, length }) - }) - .transpose()?; - - crate::components::proxy::Proxy { - management_servers: self.management_server, - mmdb: self.mmdb, - to: self.to, - to_tokens, - num_workers, - socket: Some(socket), - qcmp, - phoenix, - notifier: None, - xdp: self.xdp_opts, - termination_timeout: self.termination_timeout, - } - .run( - crate::components::RunArgs { - config, - ready, - shutdown_rx, - }, - initialized, - ) - .await - } -} diff --git a/src/cli/relay.rs b/src/cli/relay.rs deleted file mode 100644 index f07ef0cc9e..0000000000 --- a/src/cli/relay.rs +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Copyright 2023 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -use std::sync::Arc; - -use crate::{ - components::relay, - config::{Config, Providers}, -}; -pub use relay::Ready; - -pub const PORT: u16 = 7900; - -/// Runs Quilkin as a relay service that runs a Manager Discovery Service -/// (mDS) for accepting cluster and configuration information from xDS -/// management services, and exposing it as a single merged xDS service for -/// proxy services. -#[derive(clap::Args, Clone, Debug)] -pub struct Relay { - /// Port for mDS service. - #[clap(short, long, env = "QUILKIN_MDS_PORT", default_value_t = PORT)] - pub mds_port: u16, - /// Port for xDS management_server service - #[clap(short, long, env = super::PORT_ENV_VAR, default_value_t = super::manage::PORT)] - pub xds_port: u16, - /// The interval in seconds at which the relay will send a discovery request - /// to an management server after receiving no updates. - #[clap(long, env = "QUILKIN_IDLE_REQUEST_INTERVAL_SECS")] - pub idle_request_interval_secs: Option, - #[clap(subcommand)] - pub providers: Option, -} - -impl Default for Relay { - fn default() -> Self { - Self { - mds_port: PORT, - xds_port: super::manage::PORT, - idle_request_interval_secs: None, - providers: None, - } - } -} - -impl Relay { - pub async fn run( - self, - locality: Option, - config: Arc, - ready: Ready, - shutdown_rx: crate::signal::ShutdownRx, - ) -> crate::Result<()> { - relay::Relay { - xds_port: self.xds_port, - mds_port: self.mds_port, - locality, - provider: self.providers, - } - .run(crate::components::RunArgs { - config, - ready, - shutdown_rx, - }) - .await - } -} diff --git a/src/components.rs b/src/components.rs index 5609172c1d..0e56ccb2b2 100644 --- a/src/components.rs +++ b/src/components.rs @@ -15,10 +15,7 @@ */ pub mod admin; -pub mod agent; -pub mod manage; pub mod proxy; -pub mod relay; /// Args common across all components pub struct RunArgs { diff --git a/src/components/agent.rs b/src/components/agent.rs deleted file mode 100644 index a8bba4e723..0000000000 --- a/src/components/agent.rs +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Copyright 2024 Google LLC All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -use super::RunArgs; -use crate::config::{IcaoCode, Providers}; -pub use crate::net::{DualStackLocalSocket, endpoint::Locality}; -use std::sync::{ - Arc, - atomic::{AtomicBool, Ordering}, -}; - -#[derive(Clone, Debug, Default)] -pub struct Ready { - pub provider_is_healthy: Arc, - pub relay_is_healthy: Arc, - /// If true, only care about the provider being healthy, not the relay - pub is_manage: bool, -} - -impl Ready { - #[inline] - pub fn is_ready(&self) -> bool { - self.provider_is_healthy.load(Ordering::SeqCst) - && (self.is_manage || self.relay_is_healthy.load(Ordering::SeqCst)) - } -} - -pub struct Agent { - pub locality: Option, - pub port: u16, - pub icao_code: Option, - pub relay_servers: Vec, - pub provider: Option, - pub address_selector: Option, -} - -impl Agent { - #[tracing::instrument(skip_all)] - pub async fn run( - self, - RunArgs { - config, - ready, - mut shutdown_rx, - }: RunArgs, - ) -> crate::Result<()> { - if let Some(agent) = config.dyn_cfg.agent() { - agent.qcmp_port.store(self.port.into()); - agent - .icao_code - .store(self.icao_code.unwrap_or_default().into()); - } else { - eyre::bail!("agent configuration missing"); - } - - let _mds_task = if !self.relay_servers.is_empty() { - let Some(provider) = self.provider else { - return Err(eyre::eyre!("no configuration provider given")); - }; - - let _provider_task = match provider { - Providers::Agones { - gameservers_namespace, - .. - } => crate::config::providersv2::Providers::default() - .agones() - .agones_namespace(gameservers_namespace), - - Providers::File { path } => crate::config::providersv2::Providers::default() - .fs() - .fs_path(path), - } - .spawn_providers(&config, ready.provider_is_healthy.clone(), self.locality); - - let task = crate::net::xds::client::MdsClient::connect(config.id(), self.relay_servers); - - tokio::select! { - result = task => { - let client = result?; - - // Attempt to connect to a delta stream if the relay has one - // available, otherwise fallback to the regular aggregated stream - Some(client.delta_stream(config.clone(), ready.relay_is_healthy.clone()).await.map_err(|_| eyre::eyre!("failed to acquire delta stream"))?) - } - _ = shutdown_rx.changed() => return Ok(()), - } - } else { - tracing::info!("no relay servers given"); - None - }; - - crate::cli::Service::default() - .qcmp() - .qcmp_port(self.port) - .spawn_services(&config, &shutdown_rx)?; - shutdown_rx.changed().await.map_err(From::from) - } -} diff --git a/src/components/manage.rs b/src/components/manage.rs deleted file mode 100644 index 14fd40ae53..0000000000 --- a/src/components/manage.rs +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Copyright 2024 Google LLC All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -use super::RunArgs; -pub use super::agent::Ready; -pub use crate::{ - config::Providers, - net::{DualStackLocalSocket, endpoint::Locality}, -}; - -pub struct Manage { - pub locality: Option, - pub relay_servers: Vec, - pub provider: Providers, - pub port: u16, - pub address_selector: Option, -} - -impl Manage { - #[tracing::instrument(skip_all)] - pub async fn run( - self, - RunArgs { - config, - ready, - mut shutdown_rx, - }: RunArgs, - ) -> crate::Result<()> { - let Some(clusters) = config.dyn_cfg.clusters() else { - eyre::bail!("clusters were not configured, this is a configuration issue"); - }; - - if let Some(locality) = &self.locality { - clusters.modify(|map| map.update_unlocated_endpoints(locality.clone())); - } - - let provider_task = match self.provider { - Providers::Agones { - config_namespace, - gameservers_namespace, - } => crate::config::providersv2::Providers::default() - .k8s() - .k8s_namespace(config_namespace.unwrap_or_default()) - .agones() - .agones_namespace(gameservers_namespace), - Providers::File { path } => crate::config::providersv2::Providers::default() - .fs() - .fs_path(path), - } - .spawn_providers(&config, ready.provider_is_healthy.clone(), self.locality); - - let _relay_stream = if !self.relay_servers.is_empty() { - tracing::info!("connecting to relay server"); - let client = - crate::net::xds::client::MdsClient::connect(config.id(), self.relay_servers) - .await?; - - // Attempt to connect to a delta stream if the relay has one - // available, otherwise fallback to the regular aggregated stream - Some( - client - .delta_stream(config.clone(), ready.relay_is_healthy.clone()) - .await - .map_err(|_| eyre::eyre!("failed to acquire delta stream"))?, - ) - } else { - None - }; - - crate::cli::Service::default() - .xds() - .xds_port(self.port) - .spawn_services(&config, &shutdown_rx)?; - - tokio::select! { - result = provider_task => result?, - result = shutdown_rx.changed() => result.map_err(From::from), - } - } -} diff --git a/src/components/proxy.rs b/src/components/proxy.rs index f426d157c0..d852d45faf 100644 --- a/src/components/proxy.rs +++ b/src/components/proxy.rs @@ -72,7 +72,7 @@ pub struct Proxy { pub qcmp: socket2::Socket, pub phoenix: crate::net::TcpListener, pub notifier: Option>, - pub xdp: crate::cli::proxy::XdpOptions, + pub xdp: crate::cli::service::XdpOptions, pub termination_timeout: Option, } diff --git a/src/components/proxy/sessions.rs b/src/components/proxy/sessions.rs index 2541cdb59d..643349cae4 100644 --- a/src/components/proxy/sessions.rs +++ b/src/components/proxy/sessions.rs @@ -581,7 +581,7 @@ mod tests { let (pending_sends, _srecv) = crate::net::queue(1).unwrap(); ( SessionPool::new( - Arc::new(Config::default_agent()), + Arc::new(Config::default()), vec![pending_sends.clone()], Arc::new(BufferPool::default()), ), diff --git a/src/components/relay.rs b/src/components/relay.rs deleted file mode 100644 index 537b0a5fd9..0000000000 --- a/src/components/relay.rs +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Copyright 2024 Google LLC All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -use super::RunArgs; -use crate::config::Providers; -use std::sync::{ - Arc, - atomic::{AtomicBool, Ordering}, -}; - -#[derive(Clone, Debug)] -pub struct Ready { - pub idle_request_interval: std::time::Duration, - pub provider_is_healthy: Arc, -} - -impl Default for Ready { - fn default() -> Self { - Self { - idle_request_interval: crate::components::admin::IDLE_REQUEST_INTERVAL, - provider_is_healthy: Default::default(), - } - } -} - -impl Ready { - #[inline] - pub fn is_ready(&self) -> bool { - self.provider_is_healthy.load(Ordering::SeqCst) - } -} - -pub struct Relay { - pub xds_port: u16, - pub mds_port: u16, - pub locality: Option, - pub provider: Option, -} - -impl Relay { - #[tracing::instrument(skip_all)] - pub async fn run( - self, - RunArgs { - config, - ready, - mut shutdown_rx, - }: RunArgs, - ) -> crate::Result<()> { - let _provider_task = match self.provider { - Some(Providers::Agones { - config_namespace, .. - }) => crate::config::providersv2::Providers::default() - .k8s() - .k8s_namespace(config_namespace.unwrap_or_default()) - .spawn_providers(&config, ready.provider_is_healthy.clone(), self.locality), - - Some(Providers::File { path }) => crate::config::providersv2::Providers::default() - .fs() - .fs_path(path) - .spawn_providers(&config, ready.provider_is_healthy.clone(), self.locality), - - None => tokio::spawn(std::future::pending()), - }; - - crate::cli::Service::default() - .xds() - .xds_port(self.xds_port) - .mds() - .mds_port(self.mds_port) - .spawn_services(&config, &shutdown_rx)?; - - shutdown_rx.changed().await.map_err(From::from) - } -} diff --git a/src/config.rs b/src/config.rs index 5de6c4a352..900c2caf09 100644 --- a/src/config.rs +++ b/src/config.rs @@ -60,6 +60,13 @@ pub struct Config { pub dyn_cfg: DynamicConfig, } +impl std::ops::Deref for Config { + type Target = DynamicConfig; + fn deref(&self) -> &Self::Target { + &self.dyn_cfg + } +} + #[cfg(test)] impl<'de> Deserialize<'de> for Config { fn deserialize(deserializer: D) -> Result @@ -851,25 +858,18 @@ impl Config { crate::metrics::apply_clusters(clusters); } - pub fn default_agent() -> Self { - let mut typemap = default_typemap(); - insert_default::(&mut typemap); - insert_default::(&mut typemap); - insert_default::(&mut typemap); - - Self { - dyn_cfg: DynamicConfig { - id: default_id(), - version: Version::default(), - typemap, - }, - } + #[inline] + pub fn id(&self) -> String { + String::clone(&self.dyn_cfg.id.load()) } +} - pub fn default_non_agent() -> Self { +impl Default for Config { + fn default() -> Self { let mut typemap = default_typemap(); insert_default::(&mut typemap); insert_default::(&mut typemap); + insert_default::(&mut typemap); insert_default::(&mut typemap); Self { @@ -880,11 +880,6 @@ impl Config { }, } } - - #[inline] - pub fn id(&self) -> String { - String::clone(&self.dyn_cfg.id.load()) - } } #[derive(Default, Debug, Deserialize, Serialize)] @@ -1279,3 +1274,18 @@ pub enum AddrKind { Ipv6, Any, } + +impl clap::ValueEnum for AddrKind { + fn value_variants<'a>() -> &'a [Self] { + &[Self::Ipv4, Self::Ipv6, Self::Any] + } + + fn to_possible_value(&self) -> Option { + use clap::builder::PossibleValue as pv; + Some(match self { + Self::Ipv4 => pv::new("v4"), + Self::Ipv6 => pv::new("v6"), + Self::Any => pv::new("any"), + }) + } +} diff --git a/src/config/serialization.rs b/src/config/serialization.rs index 50856b37a1..59b3a6ddb1 100644 --- a/src/config/serialization.rs +++ b/src/config/serialization.rs @@ -4,7 +4,7 @@ use super::*; impl Config { /// Attempts to deserialize `input` as a YAML object representing `Self`. - pub fn from_reader(input: R, is_agent: bool) -> Result { + pub fn from_reader(input: R) -> Result { #[derive(Deserialize)] #[serde(deny_unknown_fields)] struct AllConfig { @@ -19,12 +19,6 @@ impl Config { let cfg: AllConfig = serde_yaml::from_reader(input)?; - if cfg.datacenters.is_some() && (cfg.icao_code.is_some() || cfg.qcmp_port.is_some()) { - eyre::bail!("`datacenters` and `icao_code` or `qcmp_port` were present"); - } else if is_agent && cfg.datacenters.is_some() { - eyre::bail!("`datacenters` was set even though this is an agent"); - } - let mut typemap = default_typemap(); if let Some(filters) = cfg.filters { typemap.insert::(Slot::new(filters)); @@ -130,7 +124,7 @@ mod tests { #[test] fn deserialise_client() { - let config = Config::default_non_agent(); + let config = Config::default(); config.dyn_cfg.clusters().unwrap().modify(|clusters| { clusters.insert_default([Endpoint::new("127.0.0.1:25999".parse().unwrap())].into()) }); @@ -140,7 +134,7 @@ mod tests { #[test] fn deserialise_server() { - let config = Config::default_non_agent(); + let config = Config::default(); config.dyn_cfg.clusters().unwrap().modify(|clusters| { clusters.insert_default( [ @@ -241,7 +235,7 @@ mod tests { ], }] })) - .unwrap_or_else(|_| Config::default_agent()); + .unwrap_or_else(|_| Config::default()); let value = config.dyn_cfg.clusters().unwrap().read(); assert_eq!( @@ -325,7 +319,7 @@ dynamic: ]; for config in configs { - let result = Config::from_reader(config.as_bytes(), false); + let result = Config::from_reader(config.as_bytes()); let error = result.unwrap_err(); println!("here: {}", error); assert!(format!("{error:?}").contains("unknown field")); diff --git a/src/config/watch/fs.rs b/src/config/watch/fs.rs index 43ca10714a..52255e4877 100644 --- a/src/config/watch/fs.rs +++ b/src/config/watch/fs.rs @@ -95,8 +95,8 @@ mod tests { #[tokio::test] async fn basic() { - let source = Arc::new(crate::Config::default_non_agent()); - let dest = Arc::new(crate::Config::default_non_agent()); + let source = Arc::new(crate::Config::default()); + let dest = Arc::new(crate::Config::default()); let tmp_dir = tempfile::tempdir().unwrap(); let file_path = tmp_dir.into_path().join("config.yaml"); tokio::fs::write(&file_path, serde_yaml::to_string(&source).unwrap()) diff --git a/src/lib.rs b/src/lib.rs index bdbf486a9e..b9a54c3299 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -41,10 +41,7 @@ pub use quilkin_proto as generated; pub type Result = std::result::Result; #[doc(inline)] -pub use self::{ - cli::{Cli, Proxy}, - config::Config, -}; +pub use self::{cli::Cli, config::Config}; pub use quilkin_macros::include_proto; diff --git a/tests/capture.rs b/tests/capture.rs index 8674a8fcd3..a79716a75a 100644 --- a/tests/capture.rs +++ b/tests/capture.rs @@ -34,7 +34,7 @@ async fn token_router() { let mut echo = t.run_echo_server(AddressType::Random).await; quilkin::test::map_to_localhost(&mut echo); - let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent()); + let server_config = std::sync::Arc::new(quilkin::Config::default()); server_config.dyn_cfg.filters().unwrap().store( quilkin::filters::FilterChain::try_create([ Filter { diff --git a/tests/concatenate.rs b/tests/concatenate.rs index 72f656e619..dfa9336a5d 100644 --- a/tests/concatenate.rs +++ b/tests/concatenate.rs @@ -35,7 +35,7 @@ bytes: YWJj #abc "; let echo = t.run_echo_server(AddressType::Random).await; - let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent()); + let server_config = std::sync::Arc::new(quilkin::Config::default()); server_config .dyn_cfg .clusters() diff --git a/tests/filter_order.rs b/tests/filter_order.rs index d3f9de428b..5e859792ee 100644 --- a/tests/filter_order.rs +++ b/tests/filter_order.rs @@ -52,7 +52,7 @@ async fn multiple_mutations() { .await; quilkin::test::map_to_localhost(&mut echo); - let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent()); + let server_config = std::sync::Arc::new(quilkin::Config::default()); server_config .dyn_cfg .clusters() diff --git a/tests/filters.rs b/tests/filters.rs index 0058d21e92..7a2cf21c8f 100644 --- a/tests/filters.rs +++ b/tests/filters.rs @@ -37,7 +37,7 @@ async fn test_filter() { let echo = t.run_echo_server(AddressType::Random).await; // create server configuration - let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent()); + let server_config = std::sync::Arc::new(quilkin::Config::default()); server_config.dyn_cfg.filters().unwrap().store( quilkin::filters::FilterChain::try_create([Filter { name: "TestFilter".to_string(), @@ -57,7 +57,7 @@ async fn test_filter() { let server_port = t.run_server(server_config, None, None).await; // create a local client - let client_config = std::sync::Arc::new(quilkin::Config::default_non_agent()); + let client_config = std::sync::Arc::new(quilkin::Config::default()); client_config .dyn_cfg .clusters() @@ -125,7 +125,7 @@ async fn debug_filter() { tracing::trace!(%echo, "running echo server"); // create server configuration - let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent()); + let server_config = std::sync::Arc::new(quilkin::Config::default()); server_config .dyn_cfg .clusters() @@ -144,7 +144,7 @@ async fn debug_filter() { let server_port = t.run_server(server_config, None, None).await; // create a local client - let client_config = std::sync::Arc::new(quilkin::Config::default_non_agent()); + let client_config = std::sync::Arc::new(quilkin::Config::default()); client_config .dyn_cfg .clusters() diff --git a/tests/firewall.rs b/tests/firewall.rs index 24134bfc3b..c9c2355832 100644 --- a/tests/firewall.rs +++ b/tests/firewall.rs @@ -197,7 +197,7 @@ async fn test(t: &mut TestHelper, yaml: &str, address_type: AddressType) -> mpsc .replace("%2", echo.port().to_string().as_str()); tracing::info!(config = yaml.as_str(), "Config"); - let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent()); + let server_config = std::sync::Arc::new(quilkin::Config::default()); server_config.dyn_cfg.filters().unwrap().store( quilkin::filters::FilterChain::try_create([Filter { name: Firewall::factory().name().into(), diff --git a/tests/health.rs b/tests/health.rs index 66ec3dfdc7..87fa3bd35c 100644 --- a/tests/health.rs +++ b/tests/health.rs @@ -26,7 +26,7 @@ async fn health_server() { let mut t = TestHelper::default(); // create server configuration - let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent()); + let server_config = std::sync::Arc::new(quilkin::Config::default()); server_config .dyn_cfg .clusters() diff --git a/tests/load_balancer.rs b/tests/load_balancer.rs index 7c8e7f66fa..320e6c989c 100644 --- a/tests/load_balancer.rs +++ b/tests/load_balancer.rs @@ -47,7 +47,7 @@ policy: ROUND_ROBIN ); } - let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent()); + let server_config = std::sync::Arc::new(quilkin::Config::default()); server_config .dyn_cfg .clusters() diff --git a/tests/local_rate_limit.rs b/tests/local_rate_limit.rs index 27600f69ec..95a251aa0a 100644 --- a/tests/local_rate_limit.rs +++ b/tests/local_rate_limit.rs @@ -36,7 +36,7 @@ period: 1 "; let echo = t.run_echo_server(AddressType::Random).await; - let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent()); + let server_config = std::sync::Arc::new(quilkin::Config::default()); server_config .dyn_cfg .clusters() diff --git a/tests/match.rs b/tests/match.rs index 6c44ee6df3..fb2184f05d 100644 --- a/tests/match.rs +++ b/tests/match.rs @@ -58,7 +58,7 @@ on_read: bytes: YWJj # abc "; - let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent()); + let server_config = std::sync::Arc::new(quilkin::Config::default()); server_config .dyn_cfg .clusters() diff --git a/tests/metrics.rs b/tests/metrics.rs index fbea9f7a6a..c6c0661745 100644 --- a/tests/metrics.rs +++ b/tests/metrics.rs @@ -33,7 +33,7 @@ async fn metrics_server() { .port(); // create server configuration - let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent()); + let server_config = std::sync::Arc::new(quilkin::Config::default()); server_config .dyn_cfg .clusters() @@ -48,7 +48,7 @@ async fn metrics_server() { .await; // create a local client - let client_config = std::sync::Arc::new(quilkin::Config::default_non_agent()); + let client_config = std::sync::Arc::new(quilkin::Config::default()); client_config .dyn_cfg .clusters() diff --git a/tests/no_filter.rs b/tests/no_filter.rs index 665d0058fc..cfe20b2edb 100644 --- a/tests/no_filter.rs +++ b/tests/no_filter.rs @@ -32,7 +32,7 @@ async fn echo() { let server2 = t.run_echo_server(AddressType::Random).await; // create server configuration - let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent()); + let server_config = std::sync::Arc::new(quilkin::Config::default()); server_config .dyn_cfg .clusters() diff --git a/tests/qcmp.rs b/tests/qcmp.rs deleted file mode 100644 index 223b2ebfc2..0000000000 --- a/tests/qcmp.rs +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Copyright 2023 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -use std::net::{IpAddr, Ipv4Addr, SocketAddr}; - -use tokio::time::Duration; - -use quilkin::{ - codec::qcmp::Protocol, - test::{AddressType, TestHelper}, -}; - -#[tokio::test] -#[cfg_attr(target_os = "macos", ignore)] -async fn proxy_ping() { - let mut t = TestHelper::default(); - let qcmp = quilkin::net::raw_socket_with_reuse(0).unwrap(); - let qcmp_port = quilkin::net::socket_port(&qcmp); - let server_proxy = quilkin::components::proxy::Proxy { - qcmp, - to: vec![(Ipv4Addr::UNSPECIFIED, 0).into()], - ..<_>::default() - }; - let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent()); - t.run_server(server_config, Some(server_proxy), None).await; - ping(qcmp_port).await; -} - -#[tokio::test] -#[cfg_attr(target_os = "macos", ignore)] -async fn agent_ping() { - let qcmp_port = quilkin::test::available_addr(AddressType::Random) - .await - .port(); - let agent = quilkin::cli::Agent { - qcmp_port, - ..<_>::default() - }; - let server_config = std::sync::Arc::new(quilkin::Config::default_agent()); - let (_tx, rx) = quilkin::signal::channel(quilkin::signal::ShutdownKind::Testing); - tokio::spawn(async move { - agent - .run(None, server_config, Default::default(), rx) - .await - .expect("Agent should run") - }); - ping(qcmp_port).await; -} - -async fn ping(port: u16) { - tokio::time::sleep(std::time::Duration::from_millis(250)).await; - let socket = tokio::net::UdpSocket::bind((Ipv4Addr::UNSPECIFIED, 0)) - .await - .unwrap(); - let local_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port); - let ping = Protocol::ping(); - - let mut ping_packet = quilkin::codec::qcmp::QcmpPacket::default(); - socket - .send_to(ping.encode(&mut ping_packet), &local_addr) - .await - .unwrap(); - let mut buf = [0; quilkin::codec::qcmp::MAX_QCMP_PACKET_LEN]; - let (size, _) = tokio::time::timeout(Duration::from_secs(1), socket.recv_from(&mut buf)) - .await - .unwrap() - .unwrap(); - let recv_time = quilkin::time::UtcTimestamp::now(); - let reply = Protocol::parse(&buf[..size]).unwrap().unwrap(); - - assert_eq!(ping.nonce(), reply.nonce()); - const MAX: std::time::Duration = std::time::Duration::from_millis(50); - - // If it takes longer than 50 milliseconds locally, it's likely that there - // is bug. - let delay = reply.round_trip_delay(recv_time).unwrap(); - assert!( - MAX > delay.duration(), - "Delay {:?} greater than {MAX:?}", - delay.duration(), - ); -} diff --git a/tests/token_router.rs b/tests/token_router.rs index a49c89cc14..f8d17c58ff 100644 --- a/tests/token_router.rs +++ b/tests/token_router.rs @@ -89,7 +89,7 @@ async fn multiple_clients() { let mut success = 0; let mut failed = 0; for _ in 0..limit { - match timeout(Duration::from_millis(60), a_rx.recv()).await { + match timeout(Duration::from_millis(20), a_rx.recv()).await { Ok(packet) => { assert_eq!("A", packet.unwrap()); success += 1; @@ -98,7 +98,7 @@ async fn multiple_clients() { failed += 1; } } - match timeout(Duration::from_millis(60), b_rx.recv()).await { + match timeout(Duration::from_millis(20), b_rx.recv()).await { Ok(packet) => { assert_eq!("B", packet.unwrap()); success += 1; @@ -135,7 +135,7 @@ quilkin.dev: - YWJj # abc "; - let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent()); + let server_config = std::sync::Arc::new(quilkin::Config::default()); server_config .dyn_cfg .clusters()