From e22aa819be107eed9069231444a5743e79941526 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Tue, 11 Feb 2025 11:22:10 -0800 Subject: [PATCH] feat: support server-side keep-alive for mysql and pg protocols (#5496) * feat: support server-side keep-alive for mysql and pg protocols Signed-off-by: Ruihang Xia * update config.md Signed-off-by: Ruihang Xia * update config to use humantime for keep-alive configuration Signed-off-by: Ruihang Xia * chore: Update socket2 dependency Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- Cargo.lock | 1 + config/config.md | 4 ++++ config/frontend.example.toml | 6 ++++++ config/standalone.example.toml | 6 ++++++ src/frontend/src/server.rs | 2 ++ src/frontend/src/service_config/mysql.rs | 7 +++++++ src/frontend/src/service_config/postgres.rs | 7 +++++++ src/servers/Cargo.toml | 1 + src/servers/src/mysql/server.rs | 9 ++++++++- src/servers/src/postgres/server.rs | 8 +++++++- src/servers/src/server.rs | 16 +++++++++++++++- src/servers/tests/mysql/mysql_server_test.rs | 1 + src/servers/tests/postgres/mod.rs | 1 + tests-integration/src/test_util.rs | 2 ++ tests-integration/tests/http.rs | 2 ++ 15 files changed, 70 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 45d242094d5c..2920b50303a0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10561,6 +10561,7 @@ dependencies = [ "session", "snafu 0.8.5", "snap", + "socket2", "sql", "store-api", "strum 0.25.0", diff --git a/config/config.md b/config/config.md index 0ad4ba95c1ae..5f348e0a6d28 100644 --- a/config/config.md +++ b/config/config.md @@ -40,6 +40,7 @@ | `mysql.enable` | Bool | `true` | Whether to enable. | | `mysql.addr` | String | `127.0.0.1:4002` | The addr to bind the MySQL server. | | `mysql.runtime_size` | Integer | `2` | The number of server worker threads. | +| `mysql.keep_alive` | String | `0s` | Server-side keep-alive time.
Set to 0 (default) to disable. | | `mysql.tls` | -- | -- | -- | | `mysql.tls.mode` | String | `disable` | TLS mode, refer to https://www.postgresql.org/docs/current/libpq-ssl.html
- `disable` (default value)
- `prefer`
- `require`
- `verify-ca`
- `verify-full` | | `mysql.tls.cert_path` | String | Unset | Certificate file path. | @@ -49,6 +50,7 @@ | `postgres.enable` | Bool | `true` | Whether to enable | | `postgres.addr` | String | `127.0.0.1:4003` | The addr to bind the PostgresSQL server. | | `postgres.runtime_size` | Integer | `2` | The number of server worker threads. | +| `postgres.keep_alive` | String | `0s` | Server-side keep-alive time.
Set to 0 (default) to disable. | | `postgres.tls` | -- | -- | PostgresSQL server TLS options, see `mysql.tls` section. | | `postgres.tls.mode` | String | `disable` | TLS mode. | | `postgres.tls.cert_path` | String | Unset | Certificate file path. | @@ -234,6 +236,7 @@ | `mysql.enable` | Bool | `true` | Whether to enable. | | `mysql.addr` | String | `127.0.0.1:4002` | The addr to bind the MySQL server. | | `mysql.runtime_size` | Integer | `2` | The number of server worker threads. | +| `mysql.keep_alive` | String | `0s` | Server-side keep-alive time.
Set to 0 (default) to disable. | | `mysql.tls` | -- | -- | -- | | `mysql.tls.mode` | String | `disable` | TLS mode, refer to https://www.postgresql.org/docs/current/libpq-ssl.html
- `disable` (default value)
- `prefer`
- `require`
- `verify-ca`
- `verify-full` | | `mysql.tls.cert_path` | String | Unset | Certificate file path. | @@ -243,6 +246,7 @@ | `postgres.enable` | Bool | `true` | Whether to enable | | `postgres.addr` | String | `127.0.0.1:4003` | The addr to bind the PostgresSQL server. | | `postgres.runtime_size` | Integer | `2` | The number of server worker threads. | +| `postgres.keep_alive` | String | `0s` | Server-side keep-alive time.
Set to 0 (default) to disable. | | `postgres.tls` | -- | -- | PostgresSQL server TLS options, see `mysql.tls` section. | | `postgres.tls.mode` | String | `disable` | TLS mode. | | `postgres.tls.cert_path` | String | Unset | Certificate file path. | diff --git a/config/frontend.example.toml b/config/frontend.example.toml index 741eaec500e4..addea0454a05 100644 --- a/config/frontend.example.toml +++ b/config/frontend.example.toml @@ -74,6 +74,9 @@ enable = true addr = "127.0.0.1:4002" ## The number of server worker threads. runtime_size = 2 +## Server-side keep-alive time. +## Set to 0 (default) to disable. +keep_alive = "0s" # MySQL server TLS options. [mysql.tls] @@ -105,6 +108,9 @@ enable = true addr = "127.0.0.1:4003" ## The number of server worker threads. runtime_size = 2 +## Server-side keep-alive time. +## Set to 0 (default) to disable. +keep_alive = "0s" ## PostgresSQL server TLS options, see `mysql.tls` section. [postgres.tls] diff --git a/config/standalone.example.toml b/config/standalone.example.toml index d751f23a7104..005ff282f6e5 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -78,6 +78,9 @@ enable = true addr = "127.0.0.1:4002" ## The number of server worker threads. runtime_size = 2 +## Server-side keep-alive time. +## Set to 0 (default) to disable. +keep_alive = "0s" # MySQL server TLS options. [mysql.tls] @@ -109,6 +112,9 @@ enable = true addr = "127.0.0.1:4003" ## The number of server worker threads. runtime_size = 2 +## Server-side keep-alive time. +## Set to 0 (default) to disable. +keep_alive = "0s" ## PostgresSQL server TLS options, see `mysql.tls` section. [postgres.tls] diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs index 298c48c73c84..cb3284c9f860 100644 --- a/src/frontend/src/server.rs +++ b/src/frontend/src/server.rs @@ -227,6 +227,7 @@ where Arc::new(MysqlSpawnConfig::new( opts.tls.should_force_tls(), tls_server_config, + opts.keep_alive.as_secs(), opts.reject_no_database.unwrap_or(false), )), ); @@ -248,6 +249,7 @@ where ServerSqlQueryHandlerAdapter::arc(instance.clone()), opts.tls.should_force_tls(), tls_server_config, + opts.keep_alive.as_secs(), common_runtime::global_runtime(), user_provider.clone(), )) as Box; diff --git a/src/frontend/src/service_config/mysql.rs b/src/frontend/src/service_config/mysql.rs index 623cedd149b4..20753554fff2 100644 --- a/src/frontend/src/service_config/mysql.rs +++ b/src/frontend/src/service_config/mysql.rs @@ -23,6 +23,12 @@ pub struct MysqlOptions { #[serde(default = "Default::default")] pub tls: TlsOption, pub reject_no_database: Option, + /// Server-side keep-alive time. + /// + /// Set to 0 (default) to disable. + #[serde(default = "Default::default")] + #[serde(with = "humantime_serde")] + pub keep_alive: std::time::Duration, } impl Default for MysqlOptions { @@ -33,6 +39,7 @@ impl Default for MysqlOptions { runtime_size: 2, tls: TlsOption::default(), reject_no_database: None, + keep_alive: std::time::Duration::from_secs(0), } } } diff --git a/src/frontend/src/service_config/postgres.rs b/src/frontend/src/service_config/postgres.rs index e27f2b7bcb91..015c968386c5 100644 --- a/src/frontend/src/service_config/postgres.rs +++ b/src/frontend/src/service_config/postgres.rs @@ -22,6 +22,12 @@ pub struct PostgresOptions { pub runtime_size: usize, #[serde(default = "Default::default")] pub tls: TlsOption, + /// Server-side keep-alive time. + /// + /// Set to 0 (default) to disable. + #[serde(default = "Default::default")] + #[serde(with = "humantime_serde")] + pub keep_alive: std::time::Duration, } impl Default for PostgresOptions { @@ -31,6 +37,7 @@ impl Default for PostgresOptions { addr: "127.0.0.1:4003".to_string(), runtime_size: 2, tls: Default::default(), + keep_alive: std::time::Duration::from_secs(0), } } } diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index 450a8529a08d..514c99f80ebf 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -76,6 +76,7 @@ notify.workspace = true object-pool = "0.5" once_cell.workspace = true openmetrics-parser = "0.4" +socket2 = "0.5" # use crates.io version after current revision is merged in next release # opensrv-mysql = "0.7.0" opensrv-mysql = { git = "https://github.com/datafuselabs/opensrv", rev = "6bbc3b65e6b19212c4f7fc4f40c20daf6f452deb" } diff --git a/src/servers/src/mysql/server.rs b/src/servers/src/mysql/server.rs index dae01b3f1a41..12b9c689a1d2 100644 --- a/src/servers/src/mysql/server.rs +++ b/src/servers/src/mysql/server.rs @@ -72,6 +72,8 @@ pub struct MysqlSpawnConfig { // tls config force_tls: bool, tls: Arc, + // keep-alive config + keep_alive_secs: u64, // other shim config reject_no_database: bool, } @@ -80,11 +82,13 @@ impl MysqlSpawnConfig { pub fn new( force_tls: bool, tls: Arc, + keep_alive_secs: u64, reject_no_database: bool, ) -> MysqlSpawnConfig { MysqlSpawnConfig { force_tls, tls, + keep_alive_secs, reject_no_database, } } @@ -218,7 +222,10 @@ impl Server for MysqlServer { } async fn start(&self, listening: SocketAddr) -> Result { - let (stream, addr) = self.base_server.bind(listening).await?; + let (stream, addr) = self + .base_server + .bind(listening, self.spawn_config.keep_alive_secs) + .await?; let io_runtime = self.base_server.io_runtime(); let join_handle = common_runtime::spawn_global(self.accept(io_runtime, stream)); diff --git a/src/servers/src/postgres/server.rs b/src/servers/src/postgres/server.rs index 70f74a32ece3..4ffb3ce2c01c 100644 --- a/src/servers/src/postgres/server.rs +++ b/src/servers/src/postgres/server.rs @@ -35,6 +35,7 @@ pub struct PostgresServer { base_server: BaseTcpServer, make_handler: Arc, tls_server_config: Arc, + keep_alive_secs: u64, } impl PostgresServer { @@ -43,6 +44,7 @@ impl PostgresServer { query_handler: ServerSqlQueryHandlerRef, force_tls: bool, tls_server_config: Arc, + keep_alive_secs: u64, io_runtime: Runtime, user_provider: Option, ) -> PostgresServer { @@ -58,6 +60,7 @@ impl PostgresServer { base_server: BaseTcpServer::create_server("Postgres", io_runtime), make_handler, tls_server_config, + keep_alive_secs, } } @@ -116,7 +119,10 @@ impl Server for PostgresServer { } async fn start(&self, listening: SocketAddr) -> Result { - let (stream, addr) = self.base_server.bind(listening).await?; + let (stream, addr) = self + .base_server + .bind(listening, self.keep_alive_secs) + .await?; let io_runtime = self.base_server.io_runtime(); let join_handle = common_runtime::spawn_global(self.accept(io_runtime, stream)); diff --git a/src/servers/src/server.rs b/src/servers/src/server.rs index 5d93c7e8af91..35a5c618594a 100644 --- a/src/servers/src/server.rs +++ b/src/servers/src/server.rs @@ -144,6 +144,7 @@ impl AcceptTask { &mut self, addr: SocketAddr, name: &str, + keep_alive_secs: u64, ) -> Result<(Abortable, SocketAddr)> { match self.abort_registration.take() { Some(registration) => { @@ -157,6 +158,15 @@ impl AcceptTask { let addr = listener.local_addr()?; info!("{name} server started at {addr}"); + // set keep-alive + if keep_alive_secs > 0 { + let socket_ref = socket2::SockRef::from(&listener); + let keep_alive = socket2::TcpKeepalive::new() + .with_time(std::time::Duration::from_secs(keep_alive_secs)) + .with_interval(std::time::Duration::from_secs(keep_alive_secs)); + socket_ref.set_tcp_keepalive(&keep_alive)?; + } + let stream = TcpListenerStream::new(listener); let stream = Abortable::new(stream, registration); Ok((stream, addr)) @@ -205,12 +215,16 @@ impl BaseTcpServer { task.shutdown(&self.name).await } + /// Bind the server to the given address and set the keep-alive time. + /// + /// If `keep_alive_secs` is 0, the keep-alive will not be set. pub(crate) async fn bind( &self, addr: SocketAddr, + keep_alive_secs: u64, ) -> Result<(Abortable, SocketAddr)> { let mut task = self.accept_task.lock().await; - task.bind(addr, &self.name).await + task.bind(addr, &self.name, keep_alive_secs).await } pub(crate) async fn start_with(&self, join_handle: JoinHandle<()>) -> Result<()> { diff --git a/src/servers/tests/mysql/mysql_server_test.rs b/src/servers/tests/mysql/mysql_server_test.rs index a9f7f8309aa8..cb800e4e4d01 100644 --- a/src/servers/tests/mysql/mysql_server_test.rs +++ b/src/servers/tests/mysql/mysql_server_test.rs @@ -71,6 +71,7 @@ fn create_mysql_server(table: TableRef, opts: MysqlOpts<'_>) -> Result); diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 75ea27b0cd0a..41867295198f 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -923,6 +923,7 @@ watch = false enable = true addr = "127.0.0.1:4002" runtime_size = 2 +keep_alive = "0s" [mysql.tls] mode = "disable" @@ -934,6 +935,7 @@ watch = false enable = true addr = "127.0.0.1:4003" runtime_size = 2 +keep_alive = "0s" [postgres.tls] mode = "disable"