Skip to content

Commit

Permalink
extension: use the existing connection instead of creating new unix s…
Browse files Browse the repository at this point in the history
…ocket listener
  • Loading branch information
sisungo committed Jul 22, 2024
1 parent 39a0630 commit 8041b6f
Show file tree
Hide file tree
Showing 14 changed files with 94 additions and 123 deletions.
4 changes: 4 additions & 0 deletions airup-sdk/src/blocking/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ impl Connection {
.recv::<crate::rpc::Response>()?
.into_result())
}

pub fn into_inner(self) -> rpc::Connection {
self.underlying
}
}
impl Deref for Connection {
type Target = rpc::Connection;
Expand Down
5 changes: 5 additions & 0 deletions airup-sdk/src/blocking/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ impl Connection {
ciborium::into_writer(obj, &mut buffer)?;
self.0.send(&buffer)
}

/// Returns the underlying message protocol.
pub fn into_inner(self) -> MessageProto<UnixStream> {
self.0
}
}
impl Deref for Connection {
type Target = MessageProto<UnixStream>;
Expand Down
4 changes: 4 additions & 0 deletions airup-sdk/src/nonblocking/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ impl Connection {
.await?
.into_result())
}

pub fn into_inner(self) -> rpc::Connection {
self.underlying
}
}
impl Deref for Connection {
type Target = rpc::Connection;
Expand Down
5 changes: 5 additions & 0 deletions airup-sdk/src/nonblocking/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ impl Connection {
ciborium::into_writer(obj, &mut buffer)?;
self.0.send(&buffer.into_inner()).await
}

/// Returns the underlying message protocol.
pub fn into_inner(self) -> MessageProto<UnixStream> {
self.0
}
}
impl Deref for Connection {
type Target = MessageProto<UnixStream>;
Expand Down
5 changes: 5 additions & 0 deletions airup-sdk/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,11 @@ impl<T> MessageProto<T> {
pub fn size_limit(&self) -> usize {
self.size_limit
}

/// Gets the inner stream.
pub fn into_inner(self) -> T {
self.inner
}
}
impl<T> AsRef<T> for MessageProto<T> {
fn as_ref(&self) -> &T {
Expand Down
5 changes: 0 additions & 5 deletions airup-sdk/src/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,11 +187,6 @@ pub trait ConnectionExt<'a>: crate::Connection {
self.invoke("system.trigger_event", event)
}

/// Loads an extension.
fn register_extension(&'a mut self, name: &'a str, path: &'a str) -> Self::Invoke<'a, ()> {
self.invoke("system.register_extension", (name, path))
}

/// Unloads an extension.
fn unregister_extension(&'a mut self, name: &'a str) -> Self::Invoke<'a, ()> {
self.invoke("system.unregister_extension", name)
Expand Down
7 changes: 3 additions & 4 deletions airupd/src/extension.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@ impl Extensions {
}

/// Registers an extension.
pub async fn register(&self, name: String, path: &str) -> Result<(), airup_sdk::Error> {
pub async fn register(&self, name: String, conn: UnixStream) -> Result<(), airup_sdk::Error> {
let mut lock = self.0.write().await;
if lock.contains_key(&name) {
return Err(airup_sdk::Error::Exists);
}
lock.insert(
name.clone(),
Arc::new(
Extension::new(name, path)
Extension::new(name, conn)
.await
.map_err(|x| airup_sdk::Error::Io {
message: x.to_string(),
Expand Down Expand Up @@ -78,9 +78,8 @@ struct Extension {
}
impl Extension {
/// Creates a new [`Extension`] instance, hosting the extension.
async fn new(name: String, path: &str) -> std::io::Result<Self> {
async fn new(name: String, connection: UnixStream) -> std::io::Result<Self> {
let (tx, rx) = mpsc::channel(8);
let connection = UnixStream::connect(path).await?;
ExtensionHost {
name,
connection,
Expand Down
1 change: 1 addition & 0 deletions airupd/src/rpc/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
mod debug;
mod info;
pub mod session;
mod system;

use airup_sdk::{
Expand Down
29 changes: 29 additions & 0 deletions airupd/src/rpc/api/session.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
//! Session management APIs.
use crate::app::airupd;
use airup_sdk::rpc::{Request, Response};

async fn send_error(
session: &mut crate::rpc::Session,
error: airup_sdk::Error,
) -> anyhow::Result<()> {
session
.conn
.send(&Response::new(Err::<(), _>(error)))
.await?;
Ok(())
}

pub async fn invoke(mut session: crate::rpc::Session, req: Request) {
_ = match &req.method[..] {
"session.into_extension" => into_extension(session, req).await,
_ => send_error(&mut session, airup_sdk::Error::NotImplemented).await,
};
}

async fn into_extension(session: crate::rpc::Session, req: Request) -> anyhow::Result<()> {
let name: String = req.extract_params()?;
let conn = session.conn.into_inner().into_inner();
airupd().extensions.register(name, conn).await?;
Ok(())
}
6 changes: 0 additions & 6 deletions airupd/src/rpc/api/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ pub(super) fn init<H: BuildHasher>(methods: &mut HashMap<&'static str, Method, H
enter_milestone,
set_instance_name,
trigger_event,
register_extension,
unregister_extension,
]
)
Expand Down Expand Up @@ -137,11 +136,6 @@ async fn trigger_event(event: Event) -> Result<(), Error> {
Ok(())
}

#[airupfx::macros::api]
async fn register_extension(name: String, path: String) -> Result<(), Error> {
airupd().extensions.register(name, &path).await
}

#[airupfx::macros::api]
async fn unregister_extension(name: String) -> Result<(), Error> {
airupd().extensions.unregister(&name).await
Expand Down
32 changes: 7 additions & 25 deletions airupd/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,19 +99,20 @@ impl Session {
}

/// Starts the session task.
fn start(mut self) {
fn start(self) {
tokio::spawn(async move {
if let Err(err) = self.run().await {
tracing::debug!("{} disconnected: {}", self.audit_name().await, err);
}
_ = self.run().await;
});
}

/// Runs the session in place.
async fn run(&mut self) -> anyhow::Result<()> {
tracing::debug!("{} established", self.audit_name().await);
async fn run(mut self) -> anyhow::Result<()> {
loop {
let req = self.conn.recv_req().await?;
if req.method.strip_prefix("session.").is_some() {
api::session::invoke(self, req).await;
return Ok(());
}
let resp = match req.method.strip_prefix("extapi.") {
Some(method) => {
airupd()
Expand All @@ -124,23 +125,4 @@ impl Session {
self.conn.send(&resp).await?;
}
}

/// Returns audit-style name of the IPC session.
async fn audit_name(&self) -> String {
let cred = self.conn.as_ref().peer_cred().ok();
let uid = cred
.as_ref()
.map(|x| x.uid().to_string())
.unwrap_or_else(|| "?".into());
let gid = cred
.as_ref()
.map(|x| x.gid().to_string())
.unwrap_or_else(|| "?".into());
let pid = cred
.as_ref()
.and_then(|x| x.pid())
.map(|x| x.to_string())
.unwrap_or_else(|| "?".into());
format!("ipc_session(uid={}, gid={}, pid={})", uid, gid, pid)
}
}
82 changes: 25 additions & 57 deletions airupfx/airupfx-extensions/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,17 @@
use airup_sdk::{
info::ConnectionExt,
nonblocking::rpc::{MessageProtoRecvExt, MessageProtoSendExt},
rpc::MessageProto,
rpc::{MessageProto, Request},
system::{ConnectionExt as _, Event},
};
use airupfx_signal::SIGTERM;
use ciborium::cbor;
use std::{collections::HashMap, future::Future, path::Path, pin::Pin, sync::Arc};
use tokio::net::{
unix::{OwnedReadHalf, OwnedWriteHalf},
UnixListener,
};
use std::{collections::HashMap, future::Future, pin::Pin, sync::Arc};
use tokio::net::unix::{OwnedReadHalf, OwnedWriteHalf};

/// An extension server.
#[derive(Debug)]
pub struct Server {
listener: UnixListener,
path: String,
extension_name: String,
service_name: String,
rpc_methods: HashMap<&'static str, Method>,
Expand All @@ -39,20 +34,14 @@ impl Server {
.to_string();
_ = std::fs::remove_file(&extension_socket_path);

Ok(Self::with_config(extension_name, service_name, extension_socket_path).await?)
Ok(Self::with_config(extension_name, service_name).await?)
}

pub async fn with_config<P: AsRef<Path>>(
pub async fn with_config(
extension_name: impl Into<String>,
service_name: impl Into<String>,
path: P,
) -> std::io::Result<Self> {
let listener = UnixListener::bind(path.as_ref())?;
airupfx_fs::set_permission(path.as_ref(), airupfx_fs::Permission::Socket).await?;

Ok(Self {
listener,
path: path.as_ref().display().to_string(),
extension_name: extension_name.into(),
service_name: service_name.into(),
rpc_methods: HashMap::with_capacity(16),
Expand All @@ -66,35 +55,21 @@ impl Server {
}

/// Runs the extension server.
pub async fn run(self) -> ! {
pub async fn run(self) -> anyhow::Result<()> {
let rpc_methods = Arc::new(self.rpc_methods);

let extension_name = self.extension_name.clone();
tokio::spawn(async move {
let mut airup_rpc_conn =
airup_sdk::nonblocking::Connection::connect(airup_sdk::socket_path()).await?;

match airup_rpc_conn
.register_extension(&extension_name, &self.path)
.await
{
Ok(Ok(())) => (),
Ok(Err(err)) => {
eprintln!("error: api failure: {err}");
std::process::exit(1);
}
Err(err) => {
eprintln!("error: rpc failure: {err}");
std::process::exit(1);
}
}

_ = airup_rpc_conn
.trigger_event(&Event::new("notify_active".into(), self.service_name))
.await;
let mut extension_conn =
airup_sdk::nonblocking::Connection::connect(airup_sdk::socket_path()).await?;

Ok::<(), anyhow::Error>(())
});
extension_conn
.send(&Request::new("session.into_extension", extension_name))
.await?;

_ = airup_sdk::nonblocking::Connection::connect(airup_sdk::socket_path())
.await?
.trigger_event(&Event::new("notify_active".into(), self.service_name))
.await;

let extension_name = self.extension_name.clone();
_ = airupfx_signal::signal(SIGTERM, |_| async move {
Expand All @@ -109,19 +84,16 @@ impl Server {
std::process::exit(0);
});

loop {
let Ok((stream, _)) = self.listener.accept().await else {
continue;
};
let (rx, tx) = stream.into_split();

Session {
rx: MessageProto::new(rx, 6 * 1024 * 1024),
tx: Arc::new(MessageProto::new(tx, 6 * 1024 * 1024).into()),
rpc_methods: rpc_methods.clone(),
}
.run_on_the_fly();
let stream = extension_conn.into_inner().into_inner().into_inner();
let (rx, tx) = stream.into_split();

Session {
rx: MessageProto::new(rx, 6 * 1024 * 1024),
tx: Arc::new(MessageProto::new(tx, 6 * 1024 * 1024).into()),
rpc_methods: rpc_methods.clone(),
}
.run()
.await
}
}

Expand All @@ -132,10 +104,6 @@ struct Session {
rpc_methods: Arc<HashMap<&'static str, Method>>,
}
impl Session {
pub fn run_on_the_fly(self) {
tokio::spawn(self.run());
}

pub async fn run(mut self) -> anyhow::Result<()> {
let mut buf = Vec::with_capacity(4096);
loop {
Expand Down
16 changes: 3 additions & 13 deletions docs/en-US/api_manual/rpc/system.md
Original file line number Diff line number Diff line change
Expand Up @@ -160,25 +160,15 @@ Module `system` provides methods for managing the system.

**Description**: Sets the server's instance name. If the string parameter was empty, it restores the default instance name.

## Method: `system.load_extension`
## Method: `system.unregister_extension`

**Name**: `system.load_extension`

**Parameters**: `string (name of extension)`, `string (path of the extension's socket)`

**Return Value**: `null`

**Description**: Loads an Airup extension.

## Method: `system.unload_extension`

**Name**: `system.unload_extension`
**Name**: `system.unregister_extension`

**Parameters**: `string (name of extension)`

**Return Value**: `null`

**Description**: Unloads an Airup extension.
**Description**: Unregisters an Airup extension.

**Possible Errors**:

Expand Down
Loading

0 comments on commit 8041b6f

Please sign in to comment.