diff --git a/airup-sdk/src/blocking/mod.rs b/airup-sdk/src/blocking/mod.rs index 87498ca..b6195dc 100644 --- a/airup-sdk/src/blocking/mod.rs +++ b/airup-sdk/src/blocking/mod.rs @@ -51,6 +51,10 @@ impl Connection { .recv::()? .into_result()) } + + pub fn into_inner(self) -> rpc::Connection { + self.underlying + } } impl Deref for Connection { type Target = rpc::Connection; diff --git a/airup-sdk/src/blocking/rpc.rs b/airup-sdk/src/blocking/rpc.rs index 80f0e62..5b2bf26 100644 --- a/airup-sdk/src/blocking/rpc.rs +++ b/airup-sdk/src/blocking/rpc.rs @@ -47,6 +47,11 @@ impl Connection { ciborium::into_writer(obj, &mut buffer)?; self.0.send(&buffer) } + + /// Returns the underlying message protocol. + pub fn into_inner(self) -> MessageProto { + self.0 + } } impl Deref for Connection { type Target = MessageProto; diff --git a/airup-sdk/src/nonblocking/mod.rs b/airup-sdk/src/nonblocking/mod.rs index 7e86259..794d7f7 100644 --- a/airup-sdk/src/nonblocking/mod.rs +++ b/airup-sdk/src/nonblocking/mod.rs @@ -54,6 +54,10 @@ impl Connection { .await? .into_result()) } + + pub fn into_inner(self) -> rpc::Connection { + self.underlying + } } impl Deref for Connection { type Target = rpc::Connection; diff --git a/airup-sdk/src/nonblocking/rpc.rs b/airup-sdk/src/nonblocking/rpc.rs index 17144bb..4e578f2 100644 --- a/airup-sdk/src/nonblocking/rpc.rs +++ b/airup-sdk/src/nonblocking/rpc.rs @@ -51,6 +51,11 @@ impl Connection { ciborium::into_writer(obj, &mut buffer)?; self.0.send(&buffer.into_inner()).await } + + /// Returns the underlying message protocol. + pub fn into_inner(self) -> MessageProto { + self.0 + } } impl Deref for Connection { type Target = MessageProto; diff --git a/airup-sdk/src/rpc.rs b/airup-sdk/src/rpc.rs index dd65ec0..737f53d 100644 --- a/airup-sdk/src/rpc.rs +++ b/airup-sdk/src/rpc.rs @@ -122,6 +122,11 @@ impl MessageProto { pub fn size_limit(&self) -> usize { self.size_limit } + + /// Gets the inner stream. + pub fn into_inner(self) -> T { + self.inner + } } impl AsRef for MessageProto { fn as_ref(&self) -> &T { diff --git a/airup-sdk/src/system.rs b/airup-sdk/src/system.rs index be73e37..cf8ce46 100644 --- a/airup-sdk/src/system.rs +++ b/airup-sdk/src/system.rs @@ -187,11 +187,6 @@ pub trait ConnectionExt<'a>: crate::Connection { self.invoke("system.trigger_event", event) } - /// Loads an extension. - fn register_extension(&'a mut self, name: &'a str, path: &'a str) -> Self::Invoke<'a, ()> { - self.invoke("system.register_extension", (name, path)) - } - /// Unloads an extension. fn unregister_extension(&'a mut self, name: &'a str) -> Self::Invoke<'a, ()> { self.invoke("system.unregister_extension", name) diff --git a/airupd/src/extension.rs b/airupd/src/extension.rs index bd69265..c43c156 100644 --- a/airupd/src/extension.rs +++ b/airupd/src/extension.rs @@ -23,7 +23,7 @@ impl Extensions { } /// Registers an extension. - pub async fn register(&self, name: String, path: &str) -> Result<(), airup_sdk::Error> { + pub async fn register(&self, name: String, conn: UnixStream) -> Result<(), airup_sdk::Error> { let mut lock = self.0.write().await; if lock.contains_key(&name) { return Err(airup_sdk::Error::Exists); @@ -31,7 +31,7 @@ impl Extensions { lock.insert( name.clone(), Arc::new( - Extension::new(name, path) + Extension::new(name, conn) .await .map_err(|x| airup_sdk::Error::Io { message: x.to_string(), @@ -78,9 +78,8 @@ struct Extension { } impl Extension { /// Creates a new [`Extension`] instance, hosting the extension. - async fn new(name: String, path: &str) -> std::io::Result { + async fn new(name: String, connection: UnixStream) -> std::io::Result { let (tx, rx) = mpsc::channel(8); - let connection = UnixStream::connect(path).await?; ExtensionHost { name, connection, diff --git a/airupd/src/rpc/api/mod.rs b/airupd/src/rpc/api/mod.rs index 3de7659..7b4cbdd 100644 --- a/airupd/src/rpc/api/mod.rs +++ b/airupd/src/rpc/api/mod.rs @@ -2,6 +2,7 @@ mod debug; mod info; +pub mod session; mod system; use airup_sdk::{ diff --git a/airupd/src/rpc/api/session.rs b/airupd/src/rpc/api/session.rs new file mode 100644 index 0000000..504e46b --- /dev/null +++ b/airupd/src/rpc/api/session.rs @@ -0,0 +1,29 @@ +//! Session management APIs. + +use crate::app::airupd; +use airup_sdk::rpc::{Request, Response}; + +async fn send_error( + session: &mut crate::rpc::Session, + error: airup_sdk::Error, +) -> anyhow::Result<()> { + session + .conn + .send(&Response::new(Err::<(), _>(error))) + .await?; + Ok(()) +} + +pub async fn invoke(mut session: crate::rpc::Session, req: Request) { + _ = match &req.method[..] { + "session.into_extension" => into_extension(session, req).await, + _ => send_error(&mut session, airup_sdk::Error::NotImplemented).await, + }; +} + +async fn into_extension(session: crate::rpc::Session, req: Request) -> anyhow::Result<()> { + let name: String = req.extract_params()?; + let conn = session.conn.into_inner().into_inner(); + airupd().extensions.register(name, conn).await?; + Ok(()) +} diff --git a/airupd/src/rpc/api/system.rs b/airupd/src/rpc/api/system.rs index 81ab228..425364e 100644 --- a/airupd/src/rpc/api/system.rs +++ b/airupd/src/rpc/api/system.rs @@ -29,7 +29,6 @@ pub(super) fn init(methods: &mut HashMap<&'static str, Method, H enter_milestone, set_instance_name, trigger_event, - register_extension, unregister_extension, ] ) @@ -137,11 +136,6 @@ async fn trigger_event(event: Event) -> Result<(), Error> { Ok(()) } -#[airupfx::macros::api] -async fn register_extension(name: String, path: String) -> Result<(), Error> { - airupd().extensions.register(name, &path).await -} - #[airupfx::macros::api] async fn unregister_extension(name: String) -> Result<(), Error> { airupd().extensions.unregister(&name).await diff --git a/airupd/src/rpc/mod.rs b/airupd/src/rpc/mod.rs index 3047976..340b870 100644 --- a/airupd/src/rpc/mod.rs +++ b/airupd/src/rpc/mod.rs @@ -99,19 +99,20 @@ impl Session { } /// Starts the session task. - fn start(mut self) { + fn start(self) { tokio::spawn(async move { - if let Err(err) = self.run().await { - tracing::debug!("{} disconnected: {}", self.audit_name().await, err); - } + _ = self.run().await; }); } /// Runs the session in place. - async fn run(&mut self) -> anyhow::Result<()> { - tracing::debug!("{} established", self.audit_name().await); + async fn run(mut self) -> anyhow::Result<()> { loop { let req = self.conn.recv_req().await?; + if req.method.strip_prefix("session.").is_some() { + api::session::invoke(self, req).await; + return Ok(()); + } let resp = match req.method.strip_prefix("extapi.") { Some(method) => { airupd() @@ -124,23 +125,4 @@ impl Session { self.conn.send(&resp).await?; } } - - /// Returns audit-style name of the IPC session. - async fn audit_name(&self) -> String { - let cred = self.conn.as_ref().peer_cred().ok(); - let uid = cred - .as_ref() - .map(|x| x.uid().to_string()) - .unwrap_or_else(|| "?".into()); - let gid = cred - .as_ref() - .map(|x| x.gid().to_string()) - .unwrap_or_else(|| "?".into()); - let pid = cred - .as_ref() - .and_then(|x| x.pid()) - .map(|x| x.to_string()) - .unwrap_or_else(|| "?".into()); - format!("ipc_session(uid={}, gid={}, pid={})", uid, gid, pid) - } } diff --git a/airupfx/airupfx-extensions/src/lib.rs b/airupfx/airupfx-extensions/src/lib.rs index 16d239d..c6fad20 100644 --- a/airupfx/airupfx-extensions/src/lib.rs +++ b/airupfx/airupfx-extensions/src/lib.rs @@ -4,22 +4,17 @@ use airup_sdk::{ info::ConnectionExt, nonblocking::rpc::{MessageProtoRecvExt, MessageProtoSendExt}, - rpc::MessageProto, + rpc::{MessageProto, Request}, system::{ConnectionExt as _, Event}, }; use airupfx_signal::SIGTERM; use ciborium::cbor; -use std::{collections::HashMap, future::Future, path::Path, pin::Pin, sync::Arc}; -use tokio::net::{ - unix::{OwnedReadHalf, OwnedWriteHalf}, - UnixListener, -}; +use std::{collections::HashMap, future::Future, pin::Pin, sync::Arc}; +use tokio::net::unix::{OwnedReadHalf, OwnedWriteHalf}; /// An extension server. #[derive(Debug)] pub struct Server { - listener: UnixListener, - path: String, extension_name: String, service_name: String, rpc_methods: HashMap<&'static str, Method>, @@ -39,20 +34,14 @@ impl Server { .to_string(); _ = std::fs::remove_file(&extension_socket_path); - Ok(Self::with_config(extension_name, service_name, extension_socket_path).await?) + Ok(Self::with_config(extension_name, service_name).await?) } - pub async fn with_config>( + pub async fn with_config( extension_name: impl Into, service_name: impl Into, - path: P, ) -> std::io::Result { - let listener = UnixListener::bind(path.as_ref())?; - airupfx_fs::set_permission(path.as_ref(), airupfx_fs::Permission::Socket).await?; - Ok(Self { - listener, - path: path.as_ref().display().to_string(), extension_name: extension_name.into(), service_name: service_name.into(), rpc_methods: HashMap::with_capacity(16), @@ -66,35 +55,21 @@ impl Server { } /// Runs the extension server. - pub async fn run(self) -> ! { + pub async fn run(self) -> anyhow::Result<()> { let rpc_methods = Arc::new(self.rpc_methods); let extension_name = self.extension_name.clone(); - tokio::spawn(async move { - let mut airup_rpc_conn = - airup_sdk::nonblocking::Connection::connect(airup_sdk::socket_path()).await?; - - match airup_rpc_conn - .register_extension(&extension_name, &self.path) - .await - { - Ok(Ok(())) => (), - Ok(Err(err)) => { - eprintln!("error: api failure: {err}"); - std::process::exit(1); - } - Err(err) => { - eprintln!("error: rpc failure: {err}"); - std::process::exit(1); - } - } - - _ = airup_rpc_conn - .trigger_event(&Event::new("notify_active".into(), self.service_name)) - .await; + let mut extension_conn = + airup_sdk::nonblocking::Connection::connect(airup_sdk::socket_path()).await?; - Ok::<(), anyhow::Error>(()) - }); + extension_conn + .send(&Request::new("session.into_extension", extension_name)) + .await?; + + _ = airup_sdk::nonblocking::Connection::connect(airup_sdk::socket_path()) + .await? + .trigger_event(&Event::new("notify_active".into(), self.service_name)) + .await; let extension_name = self.extension_name.clone(); _ = airupfx_signal::signal(SIGTERM, |_| async move { @@ -109,19 +84,16 @@ impl Server { std::process::exit(0); }); - loop { - let Ok((stream, _)) = self.listener.accept().await else { - continue; - }; - let (rx, tx) = stream.into_split(); - - Session { - rx: MessageProto::new(rx, 6 * 1024 * 1024), - tx: Arc::new(MessageProto::new(tx, 6 * 1024 * 1024).into()), - rpc_methods: rpc_methods.clone(), - } - .run_on_the_fly(); + let stream = extension_conn.into_inner().into_inner().into_inner(); + let (rx, tx) = stream.into_split(); + + Session { + rx: MessageProto::new(rx, 6 * 1024 * 1024), + tx: Arc::new(MessageProto::new(tx, 6 * 1024 * 1024).into()), + rpc_methods: rpc_methods.clone(), } + .run() + .await } } @@ -132,10 +104,6 @@ struct Session { rpc_methods: Arc>, } impl Session { - pub fn run_on_the_fly(self) { - tokio::spawn(self.run()); - } - pub async fn run(mut self) -> anyhow::Result<()> { let mut buf = Vec::with_capacity(4096); loop { diff --git a/docs/en-US/api_manual/rpc/system.md b/docs/en-US/api_manual/rpc/system.md index 9aa4f9e..7601524 100644 --- a/docs/en-US/api_manual/rpc/system.md +++ b/docs/en-US/api_manual/rpc/system.md @@ -160,25 +160,15 @@ Module `system` provides methods for managing the system. **Description**: Sets the server's instance name. If the string parameter was empty, it restores the default instance name. -## Method: `system.load_extension` +## Method: `system.unregister_extension` -**Name**: `system.load_extension` - -**Parameters**: `string (name of extension)`, `string (path of the extension's socket)` - -**Return Value**: `null` - -**Description**: Loads an Airup extension. - -## Method: `system.unload_extension` - -**Name**: `system.unload_extension` +**Name**: `system.unregister_extension` **Parameters**: `string (name of extension)` **Return Value**: `null` -**Description**: Unloads an Airup extension. +**Description**: Unregisters an Airup extension. **Possible Errors**: diff --git a/docs/zh-CN/api_manual/rpc/system.md b/docs/zh-CN/api_manual/rpc/system.md index 3ab4231..4b4c0dd 100644 --- a/docs/zh-CN/api_manual/rpc/system.md +++ b/docs/zh-CN/api_manual/rpc/system.md @@ -140,16 +140,6 @@ **描述**:触发指定事件。 -## `system.load_extension` 方法 - -**名称**:`system.load_extension` - -**参数**:`字符串 (扩展名称)`, `字符串 (扩展套接字路径)` - -**返回值**:`null` - -**描述**:加载一个 Airup 扩展。 - ## `system.set_instance_name` 方法 **名称**:`system.set_instance_name` @@ -170,15 +160,15 @@ **描述**:进入一个里程碑。 -## `system.unload_extension` 方法 +## `system.unregister_extension` 方法 -**名称**:`system.unload_extension` +**名称**:`system.unregister_extension` **参数**:`字符串 (扩展名称)` **返回值**:`null` -**描述**:卸载一个 Airup 扩展。 +**描述**:取消注册一个 Airup 扩展。 **可能的错误**: