From 485e89575bcb9a9db9c25613ceb9147bd24fcc8c Mon Sep 17 00:00:00 2001 From: jzaddach Date: Wed, 5 May 2021 16:33:00 +0200 Subject: [PATCH 1/9] Return Err instead of panicking when index for OsSender is invalid --- src/ipc.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/ipc.rs b/src/ipc.rs index a5ae1ba2..391d281a 100644 --- a/src/ipc.rs +++ b/src/ipc.rs @@ -870,9 +870,7 @@ fn deserialize_os_ipc_sender<'de, D>(deserializer: D) -> Result where D: Deserializer<'de> { let index: usize = Deserialize::deserialize(deserializer)?; OS_IPC_CHANNELS_FOR_DESERIALIZATION.with(|os_ipc_channels_for_deserialization| { - // FIXME(pcwalton): This could panic if the data was corrupt and the index was out of - // bounds. We should return an `Err` result instead. - Ok(os_ipc_channels_for_deserialization.borrow_mut()[index].to_sender()) + os_ipc_channels_for_deserialization.borrow_mut().get_mut(index).map(|x| x.to_sender()).ok_or(serde::de::Error::invalid_value(serde::de::Unexpected::Unsigned(index as u64), &"index for OsSender")) }) } @@ -896,6 +894,6 @@ fn deserialize_os_ipc_receiver<'de, D>(deserializer: D) OS_IPC_CHANNELS_FOR_DESERIALIZATION.with(|os_ipc_channels_for_deserialization| { // FIXME(pcwalton): This could panic if the data was corrupt and the index was out // of bounds. We should return an `Err` result instead. - Ok(os_ipc_channels_for_deserialization.borrow_mut()[index].to_receiver()) + os_ipc_channels_for_deserialization.borrow_mut().get_mut(index).map(|x| x.to_receiver()).ok_or(serde::de::Error::invalid_value(serde::de::Unexpected::Unsigned(index as u64), &"index for OsReceiver")) }) } From be2b95a4a3d64d852d9243d5f289028a6278945e Mon Sep 17 00:00:00 2001 From: jzaddach Date: Thu, 6 May 2021 07:19:06 +0200 Subject: [PATCH 2/9] Added ability to send arbitrary fds/handles --- Cargo.toml | 2 +- benches/bench.rs | 4 +- src/ipc.rs | 129 ++++++++++----- src/platform/common/fd.rs | 100 +++++++++++ src/platform/common/mod.rs | 1 + src/platform/macos/mach_sys.rs | 6 + src/platform/macos/mod.rs | 154 +++++++++++++---- src/platform/mod.rs | 10 ++ src/platform/test.rs | 291 +++++++++++++++++---------------- src/platform/unix/mod.rs | 127 +++++++------- src/platform/windows/handle.rs | 193 ++++++++++++++++++++++ src/platform/windows/mod.rs | 196 ++++++---------------- 12 files changed, 788 insertions(+), 425 deletions(-) create mode 100644 src/platform/common/fd.rs create mode 100644 src/platform/common/mod.rs create mode 100644 src/platform/windows/handle.rs diff --git a/Cargo.toml b/Cargo.toml index ead0f959..58477791 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,4 +36,4 @@ sc = { version = "0.2.2", optional = true } crossbeam-utils = "0.7" [target.'cfg(target_os = "windows")'.dependencies] -winapi = {version = "0.3.7", features = ["minwindef", "ioapiset", "memoryapi", "namedpipeapi", "handleapi", "fileapi", "impl-default"]} +winapi = {version = "0.3.7", features = ["minwindef", "ioapiset", "memoryapi", "namedpipeapi", "handleapi", "fileapi", "impl-default", "std"]} diff --git a/benches/bench.rs b/benches/bench.rs index 38c665ba..444a3d29 100644 --- a/benches/bench.rs +++ b/benches/bench.rs @@ -46,7 +46,7 @@ mod platform { let wait_rx = wait_rx.lock().unwrap(); let tx = tx; for _ in 0..ITERATIONS { - tx.send(&data, vec![], vec![]).unwrap(); + tx.send(&data, vec![], vec![], vec![]).unwrap(); if ITERATIONS > 1 { // Prevent beginning of the next send // from overlapping with receive of last fragment, @@ -72,7 +72,7 @@ mod platform { } else { b.iter(|| { for _ in 0..ITERATIONS { - tx.send(&data, vec![], vec![]).unwrap(); + tx.send(&data, vec![], vec![], vec![]).unwrap(); rx.recv().unwrap(); } 0 diff --git a/src/ipc.rs b/src/ipc.rs index 391d281a..0b96fe4a 100644 --- a/src/ipc.rs +++ b/src/ipc.rs @@ -9,6 +9,7 @@ use crate::platform::{self, OsIpcChannel, OsIpcReceiver, OsIpcReceiverSet, OsIpcSender}; use crate::platform::{OsIpcOneShotServer, OsIpcSelectionResult, OsIpcSharedMemory, OsOpaqueIpcChannel}; +use crate::platform::Descriptor; use bincode; use serde::{Deserialize, Deserializer, Serialize, Serializer}; @@ -29,6 +30,9 @@ thread_local! { static OS_IPC_SHARED_MEMORY_REGIONS_FOR_DESERIALIZATION: RefCell>> = RefCell::new(Vec::new()) } +thread_local! { + static OS_IPC_DESCRIPTORS_FOR_DESERIALIZATION: RefCell> = RefCell::new(Vec::new()) +} thread_local! { static OS_IPC_CHANNELS_FOR_SERIALIZATION: RefCell> = RefCell::new(Vec::new()) } @@ -36,6 +40,9 @@ thread_local! { static OS_IPC_SHARED_MEMORY_REGIONS_FOR_SERIALIZATION: RefCell> = RefCell::new(Vec::new()) } +thread_local! { + static OS_IPC_DESCRIPTORS_FOR_SERIALIZATION: RefCell> = RefCell::new(Vec::new()) +} #[derive(Debug)] pub enum IpcError { @@ -247,17 +254,17 @@ pub struct IpcReceiver { impl IpcReceiver where T: for<'de> Deserialize<'de> + Serialize { /// Blocking receive. pub fn recv(&self) -> Result { - let (data, os_ipc_channels, os_ipc_shared_memory_regions) = self.os_receiver.recv()?; - OpaqueIpcMessage::new(data, os_ipc_channels, os_ipc_shared_memory_regions) + let (data, os_ipc_channels, os_ipc_shared_memory_regions, os_ipc_descriptors) = self.os_receiver.recv()?; + OpaqueIpcMessage::new(data, os_ipc_channels, os_ipc_shared_memory_regions, os_ipc_descriptors) .to() .map_err(IpcError::Bincode) } /// Non-blocking receive pub fn try_recv(&self) -> Result { - let (data, os_ipc_channels, os_ipc_shared_memory_regions) = + let (data, os_ipc_channels, os_ipc_shared_memory_regions, os_ipc_descriptors) = self.os_receiver.try_recv()?; - OpaqueIpcMessage::new(data, os_ipc_channels, os_ipc_shared_memory_regions) + OpaqueIpcMessage::new(data, os_ipc_channels, os_ipc_shared_memory_regions, os_ipc_descriptors) .to() .map_err(IpcError::Bincode) .map_err(TryRecvError::IpcError) @@ -341,24 +348,30 @@ impl IpcSender where T: Serialize { let mut bytes = Vec::with_capacity(4096); OS_IPC_CHANNELS_FOR_SERIALIZATION.with(|os_ipc_channels_for_serialization| { OS_IPC_SHARED_MEMORY_REGIONS_FOR_SERIALIZATION.with( - |os_ipc_shared_memory_regions_for_serialization| { - let old_os_ipc_channels = - mem::replace(&mut *os_ipc_channels_for_serialization.borrow_mut(), Vec::new()); - let old_os_ipc_shared_memory_regions = - mem::replace(&mut *os_ipc_shared_memory_regions_for_serialization.borrow_mut(), - Vec::new()); - let os_ipc_shared_memory_regions; - let os_ipc_channels; - { - bincode::serialize_into(&mut bytes, &data)?; - os_ipc_channels = - mem::replace(&mut *os_ipc_channels_for_serialization.borrow_mut(), - old_os_ipc_channels); - os_ipc_shared_memory_regions = mem::replace( - &mut *os_ipc_shared_memory_regions_for_serialization.borrow_mut(), - old_os_ipc_shared_memory_regions); - }; - Ok(self.os_sender.send(&bytes[..], os_ipc_channels, os_ipc_shared_memory_regions)?) + |os_ipc_shared_memory_regions_for_serialization| { + OS_IPC_DESCRIPTORS_FOR_SERIALIZATION.with( + |os_ipc_descriptors_for_serialization| { + let old_os_ipc_channels = + mem::replace(&mut *os_ipc_channels_for_serialization.borrow_mut(), Vec::new()); + let old_os_ipc_shared_memory_regions = + mem::replace(&mut *os_ipc_shared_memory_regions_for_serialization.borrow_mut(), + Vec::new()); + let old_os_ipc_descriptors = mem::replace(&mut *os_ipc_descriptors_for_serialization.borrow_mut(), Vec::new()); + let os_ipc_shared_memory_regions; + let os_ipc_channels; + let os_ipc_descriptors; + { + bincode::serialize_into(&mut bytes, &data)?; + os_ipc_channels = + mem::replace(&mut *os_ipc_channels_for_serialization.borrow_mut(), + old_os_ipc_channels); + os_ipc_shared_memory_regions = mem::replace( + &mut *os_ipc_shared_memory_regions_for_serialization.borrow_mut(), + old_os_ipc_shared_memory_regions); + os_ipc_descriptors = mem::replace(&mut *os_ipc_descriptors_for_serialization.borrow_mut(), old_os_ipc_descriptors); + }; + Ok(self.os_sender.send(&bytes[..], os_ipc_channels, os_ipc_shared_memory_regions, os_ipc_descriptors)?) + }) }) }) } @@ -463,7 +476,8 @@ impl IpcReceiverSet { OsIpcSelectionResult::DataReceived(os_receiver_id, data, os_ipc_channels, - os_ipc_shared_memory_regions) => { + os_ipc_shared_memory_regions, + os_ipc_descriptors) => { IpcSelectionResult::MessageReceived(os_receiver_id, OpaqueIpcMessage { data: data, os_ipc_channels: os_ipc_channels, @@ -472,6 +486,7 @@ impl IpcReceiverSet { |os_ipc_shared_memory_region| { Some(os_ipc_shared_memory_region) }).collect(), + os_ipc_descriptors, }) } OsIpcSelectionResult::ChannelClosed(os_receiver_id) => { @@ -606,6 +621,7 @@ pub struct OpaqueIpcMessage { data: Vec, os_ipc_channels: Vec, os_ipc_shared_memory_regions: Vec>, + os_ipc_descriptors: Vec, } impl Debug for OpaqueIpcMessage { @@ -620,7 +636,8 @@ impl Debug for OpaqueIpcMessage { impl OpaqueIpcMessage { fn new(data: Vec, os_ipc_channels: Vec, - os_ipc_shared_memory_regions: Vec) + os_ipc_shared_memory_regions: Vec, + os_ipc_descriptors: Vec) -> OpaqueIpcMessage { OpaqueIpcMessage { data: data, @@ -630,6 +647,7 @@ impl OpaqueIpcMessage { .map(|os_ipc_shared_memory_region| { Some(os_ipc_shared_memory_region) }).collect(), + os_ipc_descriptors, } } @@ -638,18 +656,24 @@ impl OpaqueIpcMessage { OS_IPC_CHANNELS_FOR_DESERIALIZATION.with(|os_ipc_channels_for_deserialization| { OS_IPC_SHARED_MEMORY_REGIONS_FOR_DESERIALIZATION.with( |os_ipc_shared_memory_regions_for_deserialization| { - mem::swap(&mut *os_ipc_channels_for_deserialization.borrow_mut(), - &mut self.os_ipc_channels); - mem::swap(&mut *os_ipc_shared_memory_regions_for_deserialization.borrow_mut(), - &mut self.os_ipc_shared_memory_regions); - let result = bincode::deserialize(&self.data[..]); - mem::swap(&mut *os_ipc_shared_memory_regions_for_deserialization.borrow_mut(), - &mut self.os_ipc_shared_memory_regions); - mem::swap(&mut *os_ipc_channels_for_deserialization.borrow_mut(), - &mut self.os_ipc_channels); - /* Error check comes after doing cleanup, - * since we need the cleanup both in the success and the error cases. */ - Ok(result?) + OS_IPC_DESCRIPTORS_FOR_DESERIALIZATION.with(|os_ipc_descriptors_for_deserialization| { + mem::swap(&mut *os_ipc_channels_for_deserialization.borrow_mut(), + &mut self.os_ipc_channels); + mem::swap(&mut *os_ipc_shared_memory_regions_for_deserialization.borrow_mut(), + &mut self.os_ipc_shared_memory_regions); + mem::swap(&mut *os_ipc_descriptors_for_deserialization.borrow_mut(), + &mut self.os_ipc_descriptors); + let result = bincode::deserialize(&self.data[..]); + mem::swap(&mut *os_ipc_shared_memory_regions_for_deserialization.borrow_mut(), + &mut self.os_ipc_shared_memory_regions); + mem::swap(&mut *os_ipc_channels_for_deserialization.borrow_mut(), + &mut self.os_ipc_channels); + mem::swap(&mut *os_ipc_descriptors_for_deserialization.borrow_mut(), + &mut self.os_ipc_descriptors); + /* Error check comes after doing cleanup, + * since we need the cleanup both in the success and the error cases. */ + Ok(result?) + }) }) }) } @@ -761,7 +785,7 @@ impl IpcOneShotServer where T: for<'de> Deserialize<'de> + Serialize { } pub fn accept(self) -> Result<(IpcReceiver,T), bincode::Error> { - let (os_receiver, data, os_channels, os_shared_memory_regions) = + let (os_receiver, data, os_channels, os_shared_memory_regions, os_ipc_descriptors) = self.os_server.accept()?; let value = OpaqueIpcMessage { data: data, @@ -770,6 +794,7 @@ impl IpcOneShotServer where T: for<'de> Deserialize<'de> + Serialize { .map(|os_shared_memory_region| { Some(os_shared_memory_region) }).collect(), + os_ipc_descriptors, }.to()?; Ok((IpcReceiver { os_receiver: os_receiver, @@ -789,7 +814,7 @@ impl IpcBytesReceiver { #[inline] pub fn recv(&self) -> Result, IpcError> { match self.os_receiver.recv() { - Ok((data, _, _)) => Ok(data), + Ok((data, _, _, _)) => Ok(data), Err(err) => Err(err.into()), } } @@ -797,7 +822,7 @@ impl IpcBytesReceiver { /// Non-blocking receive pub fn try_recv(&self) -> Result, TryRecvError> { match self.os_receiver.try_recv() { - Ok((data, _, _)) => Ok(data), + Ok((data, _, _, _)) => Ok(data), Err(err) => Err(err.into()), } } @@ -850,7 +875,7 @@ impl Serialize for IpcBytesSender { impl IpcBytesSender { #[inline] pub fn send(&self, data: &[u8]) -> Result<(), io::Error> { - self.os_sender.send(data, vec![], vec![]).map_err(|e| io::Error::from(e)) + self.os_sender.send(data, vec![], vec![], vec![]).map_err(|e| io::Error::from(e)) } } @@ -897,3 +922,27 @@ fn deserialize_os_ipc_receiver<'de, D>(deserializer: D) os_ipc_channels_for_deserialization.borrow_mut().get_mut(index).map(|x| x.to_receiver()).ok_or(serde::de::Error::invalid_value(serde::de::Unexpected::Unsigned(index as u64), &"index for OsReceiver")) }) } + + +impl Serialize for Descriptor { + fn serialize(&self, serializer: S) -> Result where S: Serializer { + let index = OS_IPC_DESCRIPTORS_FOR_SERIALIZATION.with(|os_ipc_descriptors_for_serialization| { + let mut os_ipc_descriptors_for_serialization = + os_ipc_descriptors_for_serialization.borrow_mut(); + let index = os_ipc_descriptors_for_serialization.len(); + os_ipc_descriptors_for_serialization.push(self.consume()); + index + }); + index.serialize(serializer) + } +} + +impl<'de> Deserialize<'de> for Descriptor { + fn deserialize(deserializer: D) -> Result where D: Deserializer<'de> { + let index: usize = Deserialize::deserialize(deserializer)?; + + OS_IPC_DESCRIPTORS_FOR_DESERIALIZATION.with(|os_ipc_descriptors_for_deserialization| { + os_ipc_descriptors_for_deserialization.borrow_mut().get_mut(index).map(|x| x.consume()).ok_or(serde::de::Error::invalid_value(serde::de::Unexpected::Unsigned(index as u64), &"index for Descriptor")) + }) + } +} diff --git a/src/platform/common/fd.rs b/src/platform/common/fd.rs new file mode 100644 index 00000000..493c36aa --- /dev/null +++ b/src/platform/common/fd.rs @@ -0,0 +1,100 @@ +// use std::ops::{ +// Deref, +// DerefMut, +// }; +use std::os::unix::io::{ + AsRawFd, + RawFd, + IntoRawFd, + FromRawFd, +}; +use std::fmt; +use std::cmp::{PartialEq}; +use std::mem; +use std::fs::File; +use std::cell::RefCell; + +pub struct OwnedFd(RefCell); + +impl Drop for OwnedFd { + fn drop(&mut self) { + if *self.0.borrow() != -1 { + unsafe { + let _ = libc::close(*self.0.borrow()); + } + } + } +} + +// impl Deref for OwnedFd { +// type Target = RawFd; + +// fn deref(&self) -> &Self::Target { +// &self.0 +// } +// } + +// impl DerefMut for OwnedFd { +// fn deref_mut(&mut self) -> &mut Self::Target { +// &mut self.0 +// } +// } + +impl IntoRawFd for OwnedFd { + fn into_raw_fd(self) -> RawFd { + let fd = *self.0.borrow(); + mem::forget(self); + fd + } +} + +impl AsRawFd for OwnedFd { + fn as_raw_fd(& self) -> RawFd { + *self.0.borrow() + } +} + +impl FromRawFd for OwnedFd { + unsafe fn from_raw_fd(fd: RawFd) -> OwnedFd { + OwnedFd::new(fd) + } +} + +impl Into for OwnedFd { + fn into(self) -> File { + unsafe { + File::from_raw_fd(self.into_raw_fd()) + } + } +} + +impl From for OwnedFd { + fn from(file: File) -> Self { + OwnedFd::new(file.into_raw_fd()) + } +} + +impl OwnedFd { + pub fn new(fd: RawFd) -> OwnedFd { + OwnedFd(RefCell::new(fd)) + } + + pub fn consume(&self) -> OwnedFd { + let fd = self.0.replace(-1); + OwnedFd::new(fd) + } +} + +impl PartialEq for OwnedFd { + fn eq(&self, other: &Self) -> bool { + *self.0.borrow() == *other.0.borrow() + } +} + +impl fmt::Debug for OwnedFd { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_tuple("") + .field(&self.0) + .finish() + } +} diff --git a/src/platform/common/mod.rs b/src/platform/common/mod.rs new file mode 100644 index 00000000..99821ee4 --- /dev/null +++ b/src/platform/common/mod.rs @@ -0,0 +1 @@ +pub mod fd; \ No newline at end of file diff --git a/src/platform/macos/mach_sys.rs b/src/platform/macos/mach_sys.rs index acdbe9d8..e0e26dc5 100644 --- a/src/platform/macos/mach_sys.rs +++ b/src/platform/macos/mach_sys.rs @@ -33,6 +33,11 @@ pub struct mach_vm_info_region { pub type mach_vm_info_region_t = mach_vm_info_region; +extern "C" { + pub fn fileport_makeport(fd: std::os::raw::c_int, port: *mut fileport_t) -> std::os::raw::c_int; + pub fn fileport_makefd(port: fileport_t) -> std::os::raw::c_int; +} + // Code below this line is automatically generated. pub type boolean_t = ::libc::c_uint; @@ -62,6 +67,7 @@ pub type mach_port_mscount_t = natural_t; pub type mach_port_msgcount_t = natural_t; pub type mach_port_rights_t = natural_t; pub type mach_port_srights_t = ::libc::c_uint; +pub type fileport_t = mach_port_type_t; #[repr(C)] #[derive(Copy)] pub struct Struct_mach_port_status { diff --git a/src/platform/macos/mod.rs b/src/platform/macos/mod.rs index 46db8916..003b1f68 100644 --- a/src/platform/macos/mod.rs +++ b/src/platform/macos/mod.rs @@ -13,6 +13,8 @@ use self::mach_sys::{mach_msg_ool_descriptor_t, mach_msg_port_descriptor_t, mach use self::mach_sys::{mach_msg_timeout_t, mach_port_limits_t, mach_port_msgcount_t}; use self::mach_sys::{mach_port_right_t, mach_port_t, mach_task_self_, vm_inherit_t}; use self::mach_sys::mach_port_deallocate; +use self::mach_sys::fileport_t; +use crate::platform::Descriptor; use bincode; use libc::{self, c_char, c_uint, c_void, size_t}; @@ -30,6 +32,12 @@ use std::slice; use std::sync::RwLock; use std::usize; +use std::os::raw::c_int; + +use std::os::unix::io::AsRawFd; + +use crate::platform::common::fd::OwnedFd; + mod mach_sys; /// The size that we preallocate on the stack to receive messages. If the message is larger than @@ -197,6 +205,31 @@ fn mach_port_extract_right( Err(error.into()) } +fn mach_fileport_makeport(fd: c_int) -> Result { + let mut port = MACH_PORT_NULL; + let error = unsafe { + mach_sys::fileport_makeport(fd, & mut port) + }; + if error == KERN_SUCCESS { + Ok(port) + } + else { + Err(error.into()) + } +} + +fn mach_fileport_makefd(port: fileport_t) -> Result { + let fd = unsafe { + mach_sys::fileport_makefd(port) + }; + if fd >= 0 { + Ok(fd) + } + else { + Err(fd.into()) + } +} + impl OsIpcReceiver { fn new() -> Result { let port = mach_port_allocate(MACH_PORT_RIGHT_RECEIVE)?; @@ -325,12 +358,12 @@ impl OsIpcReceiver { } fn recv_with_blocking_mode(&self, blocking_mode: BlockingMode) - -> Result<(Vec, Vec, Vec), + -> Result<(Vec, Vec, Vec, Vec), MachError> { select(self.port.get(), blocking_mode).and_then(|result| { match result { - OsIpcSelectionResult::DataReceived(_, data, channels, shared_memory_regions) => { - Ok((data, channels, shared_memory_regions)) + OsIpcSelectionResult::DataReceived(_, data, channels, shared_memory_regions, descriptors) => { + Ok((data, channels, shared_memory_regions, descriptors)) } OsIpcSelectionResult::ChannelClosed(_) => Err(MachError::from(MACH_NOTIFY_NO_SENDERS)), } @@ -338,12 +371,12 @@ impl OsIpcReceiver { } pub fn recv(&self) - -> Result<(Vec, Vec, Vec),MachError> { + -> Result<(Vec, Vec, Vec, Vec),MachError> { self.recv_with_blocking_mode(BlockingMode::Blocking) } pub fn try_recv(&self) - -> Result<(Vec, Vec, Vec),MachError> { + -> Result<(Vec, Vec, Vec, Vec),MachError> { self.recv_with_blocking_mode(BlockingMode::Nonblocking) } } @@ -473,7 +506,8 @@ impl OsIpcSender { pub fn send(&self, data: &[u8], ports: Vec, - mut shared_memory_regions: Vec) + mut shared_memory_regions: Vec, + descriptors: Vec) -> Result<(),MachError> { let mut data = SendData::from(data); if let Some(data) = data.take_shared_memory() { @@ -481,7 +515,7 @@ impl OsIpcSender { } unsafe { - let size = Message::size_of(&data, ports.len(), shared_memory_regions.len()); + let size = Message::size_of(&data, ports.len(), shared_memory_regions.len(), descriptors.len()); let message = libc::malloc(size as size_t) as *mut Message; (*message).header.msgh_bits = (MACH_MSG_TYPE_COPY_SEND as u32) | MACH_MSGH_BITS_COMPLEX; @@ -491,7 +525,7 @@ impl OsIpcSender { (*message).header.msgh_reserved = 0; (*message).header.msgh_id = 0; (*message).body.msgh_descriptor_count = - (ports.len() + shared_memory_regions.len()) as u32; + (ports.len() + shared_memory_regions.len() + descriptors.len()) as u32; let mut port_descriptor_dest = message.offset(1) as *mut mach_msg_port_descriptor_t; for outgoing_port in &ports { @@ -519,7 +553,28 @@ impl OsIpcSender { shared_memory_descriptor_dest = shared_memory_descriptor_dest.offset(1); } - let is_inline_dest = shared_memory_descriptor_dest as *mut bool; + // See https://chromium.googlesource.com/chromium/src/+/refs/heads/main/mojo/core/channel_mac.cc#393 + let descriptor_count = descriptors.len(); + let mut port_descriptor_dest = shared_memory_descriptor_dest as *mut mach_msg_port_descriptor_t; + for descriptor in &descriptors { + (*port_descriptor_dest).name = mach_fileport_makeport(descriptor.as_raw_fd())?; + (*port_descriptor_dest).pad1 = 0; + (*port_descriptor_dest).pad2 = 0; + + (*port_descriptor_dest).disposition = MACH_MSG_TYPE_MOVE_SEND; + (*port_descriptor_dest).type_ = MACH_MSG_PORT_DESCRIPTOR; + port_descriptor_dest = port_descriptor_dest.offset(1); + } + + let mut port_counts = port_descriptor_dest as *mut usize; + *port_counts = ports.len(); + port_counts = port_counts.offset(1); + *port_counts = shared_memory_regions.len(); + port_counts = port_counts.offset(1); + *port_counts = descriptor_count; + port_counts = port_counts.offset(1); + + let is_inline_dest = port_counts as *mut bool; *is_inline_dest = data.is_inline(); if data.is_inline() { @@ -552,7 +607,7 @@ impl OsIpcSender { *max_inline_size = inline_len; } } - return self.send(inline_data, ports, shared_memory_regions); + return self.send(inline_data, ports, shared_memory_regions, descriptors); } if os_result != MACH_MSG_SUCCESS { return Err(MachError::from(os_result)) @@ -563,6 +618,10 @@ impl OsIpcSender { for shared_memory_region in shared_memory_regions { mem::forget(shared_memory_region); } + + for descriptor in descriptors { + mem::forget(descriptor); + } Ok(()) } } @@ -649,15 +708,15 @@ impl Drop for OsIpcReceiverSet { } pub enum OsIpcSelectionResult { - DataReceived(u64, Vec, Vec, Vec), + DataReceived(u64, Vec, Vec, Vec, Vec), ChannelClosed(u64), } impl OsIpcSelectionResult { - pub fn unwrap(self) -> (u64, Vec, Vec, Vec) { + pub fn unwrap(self) -> (u64, Vec, Vec, Vec, Vec) { match self { - OsIpcSelectionResult::DataReceived(id, data, channels, shared_memory_regions) => { - (id, data, channels, shared_memory_regions) + OsIpcSelectionResult::DataReceived(id, data, channels, shared_memory_regions, descriptors) => { + (id, data, channels, shared_memory_regions, descriptors) } OsIpcSelectionResult::ChannelClosed(id) => { panic!("OsIpcSelectionResult::unwrap(): receiver ID {} was closed!", id) @@ -730,35 +789,58 @@ fn select(port: mach_port_t, blocking_mode: BlockingMode) return Ok(OsIpcSelectionResult::ChannelClosed(local_port as u64)) } - let (mut ports, mut shared_memory_regions) = (Vec::new(), Vec::new()); + let (mut ports, mut shared_memory_regions, mut descriptors) = (Vec::new(), Vec::new(), Vec::new()); let mut port_descriptor = message.offset(1) as *mut mach_msg_port_descriptor_t; let mut descriptors_remaining = (*message).body.msgh_descriptor_count; + let mut raw_ports = Vec::new(); while descriptors_remaining > 0 { - if (*port_descriptor).type_ != MACH_MSG_PORT_DESCRIPTOR { - break + match (*port_descriptor).type_ { + MACH_MSG_OOL_DESCRIPTOR => { + let shared_memory_descriptor = port_descriptor as *mut mach_msg_ool_descriptor_t; + shared_memory_regions.push(OsIpcSharedMemory::from_raw_parts( + (*shared_memory_descriptor).address as *mut u8, + (*shared_memory_descriptor).size as usize)); + port_descriptor = shared_memory_descriptor.offset(1) as *mut mach_msg_port_descriptor_t; + + }, + MACH_MSG_PORT_DESCRIPTOR => { + raw_ports.push((*port_descriptor).name); + port_descriptor = port_descriptor.offset(1); + }, + _ => { + panic!("Unexpected mach message type"); + }, } - ports.push(OsOpaqueIpcChannel::from_name((*port_descriptor).name)); - port_descriptor = port_descriptor.offset(1); + descriptors_remaining -= 1; } - let mut shared_memory_descriptor = port_descriptor as *mut mach_msg_ool_descriptor_t; - while descriptors_remaining > 0 { - debug_assert!((*shared_memory_descriptor).type_ == MACH_MSG_OOL_DESCRIPTOR); - shared_memory_regions.push(OsIpcSharedMemory::from_raw_parts( - (*shared_memory_descriptor).address as *mut u8, - (*shared_memory_descriptor).size as usize)); - shared_memory_descriptor = shared_memory_descriptor.offset(1); - descriptors_remaining -= 1; + let mut port_counts = port_descriptor as *mut usize; + let port_count = *port_counts; + port_counts = port_counts.offset(1); + let shared_memory_region_count = *port_counts; + port_counts = port_counts.offset(1); + let descriptor_count = *port_counts; + + assert_eq!(raw_ports.len(), port_count + descriptor_count, "Mismatch in expected and transmitted number of ports"); + assert_eq!(shared_memory_regions.len(), shared_memory_region_count, "Mismatch in expected and transmitted number of shared memory regions"); + + for idx in 0 .. port_count { + ports.push(OsOpaqueIpcChannel::from_name(raw_ports[idx])); + } + + for idx in port_count .. (port_count + descriptor_count) { + let fd = mach_fileport_makefd(raw_ports[idx])?; + descriptors.push(OwnedFd::new(fd)); } - let has_inline_data_ptr = shared_memory_descriptor as *mut bool; + let has_inline_data_ptr = port_counts.offset(1) as *mut bool; let has_inline_data = *has_inline_data_ptr; let payload = if has_inline_data { let payload_size_ptr = has_inline_data_ptr.offset(1) as *mut usize; let payload_size = *payload_size_ptr; let max_payload_size = message as usize + ((*message).header.msgh_size as usize) - - (shared_memory_descriptor as usize); + (has_inline_data_ptr as usize); assert!(payload_size <= max_payload_size); let payload_ptr = payload_size_ptr.offset(1) as *mut u8; slice::from_raw_parts(payload_ptr, payload_size).to_vec() @@ -774,7 +856,8 @@ fn select(port: mach_port_t, blocking_mode: BlockingMode) Ok(OsIpcSelectionResult::DataReceived(local_port as u64, payload, ports, - shared_memory_regions)) + shared_memory_regions, + descriptors)) } } @@ -802,9 +885,10 @@ impl OsIpcOneShotServer { pub fn accept(self) -> Result<(OsIpcReceiver, Vec, Vec, - Vec),MachError> { - let (bytes, channels, shared_memory_regions) = self.receiver.recv()?; - Ok((self.receiver.consume(), bytes, channels, shared_memory_regions)) + Vec, + Vec),MachError> { + let (bytes, channels, shared_memory_regions, descriptors) = self.receiver.recv()?; + Ok((self.receiver.consume(), bytes, channels, shared_memory_regions, descriptors)) } } @@ -930,10 +1014,12 @@ struct Message { } impl Message { - fn size_of(data: &SendData, port_length: usize, shared_memory_length: usize) -> usize { + fn size_of(data: &SendData, port_length: usize, shared_memory_length: usize, descriptors_length: usize) -> usize { let mut size = mem::size_of::() + mem::size_of::() * port_length + mem::size_of::() * shared_memory_length + + mem::size_of::() * descriptors_length + + mem::size_of::() * 3 + mem::size_of::(); if data.is_inline() { diff --git a/src/platform/mod.rs b/src/platform/mod.rs index 47c4e024..0ae0b0d8 100644 --- a/src/platform/mod.rs +++ b/src/platform/mod.rs @@ -7,6 +7,12 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +#[cfg(all(not(feature = "force-inprocess"), any(target_os = "linux", + target_os = "openbsd", + target_os = "freebsd", + target_os = "macos")))] +mod common; + #[cfg(all(not(feature = "force-inprocess"), any(target_os = "linux", target_os = "openbsd", target_os = "freebsd")))] @@ -16,6 +22,7 @@ mod unix; target_os = "freebsd")))] mod os { pub use super::unix::*; + pub type Descriptor = super::common::fd::OwnedFd; } #[cfg(all(not(feature = "force-inprocess"), target_os = "macos"))] @@ -23,6 +30,7 @@ mod macos; #[cfg(all(not(feature = "force-inprocess"), target_os = "macos"))] mod os { pub use super::macos::*; + pub type Descriptor = super::common::fd::OwnedFd; } #[cfg(all(not(feature = "force-inprocess"), target_os = "windows"))] @@ -30,6 +38,7 @@ mod windows; #[cfg(all(not(feature = "force-inprocess"), target_os = "windows"))] mod os { pub use super::windows::*; + pub type Descriptor = super::windows::handle::WinHandle; } #[cfg(any( @@ -48,6 +57,7 @@ mod os { pub use self::os::{OsIpcChannel, OsIpcOneShotServer, OsIpcReceiver, OsIpcReceiverSet}; pub use self::os::{OsIpcSelectionResult, OsIpcSender, OsIpcSharedMemory}; pub use self::os::{OsOpaqueIpcChannel, channel}; +pub use self::os::{Descriptor}; #[cfg(test)] mod test; diff --git a/src/platform/test.rs b/src/platform/test.rs index c4716f41..1daaa005 100644 --- a/src/platform/test.rs +++ b/src/platform/test.rs @@ -34,10 +34,10 @@ use crate::test::{get_channel_name_arg, spawn_server}; fn simple() { let (tx, rx) = platform::channel().unwrap(); let data: &[u8] = b"1234567"; - tx.send(data, Vec::new(), Vec::new()).unwrap(); - let (received_data, received_channels, received_shared_memory) = rx.recv().unwrap(); - assert_eq!((&received_data[..], received_channels, received_shared_memory), - (data, Vec::new(), Vec::new())); + tx.send(data, Vec::new(), Vec::new(), Vec::new()).unwrap(); + let (received_data, received_channels, received_shared_memory, descriptors) = rx.recv().unwrap(); + assert_eq!((&received_data[..], received_channels, received_shared_memory, descriptors), + (data, Vec::new(), Vec::new(), Vec::new())); } #[test] @@ -45,15 +45,15 @@ fn sender_transfer() { let (super_tx, super_rx) = platform::channel().unwrap(); let (sub_tx, sub_rx) = platform::channel().unwrap(); let data: &[u8] = b"foo"; - super_tx.send(data, vec![OsIpcChannel::Sender(sub_tx)], vec![]).unwrap(); - let (_, mut received_channels, _) = super_rx.recv().unwrap(); + super_tx.send(data, vec![OsIpcChannel::Sender(sub_tx)], vec![], vec![]).unwrap(); + let (_, mut received_channels, _, _) = super_rx.recv().unwrap(); assert_eq!(received_channels.len(), 1); let sub_tx = received_channels.pop().unwrap().to_sender(); - sub_tx.send(data, vec![], vec![]).unwrap(); - let (received_data, received_channels, received_shared_memory_regions) = + sub_tx.send(data, vec![], vec![], vec![]).unwrap(); + let (received_data, received_channels, received_shared_memory_regions, descriptors) = sub_rx.recv().unwrap(); - assert_eq!((&received_data[..], received_channels, received_shared_memory_regions), - (data, vec![], vec![])); + assert_eq!((&received_data[..], received_channels, received_shared_memory_regions, descriptors), + (data, vec![], vec![], vec![])); } #[test] @@ -61,15 +61,15 @@ fn receiver_transfer() { let (super_tx, super_rx) = platform::channel().unwrap(); let (sub_tx, sub_rx) = platform::channel().unwrap(); let data: &[u8] = b"foo"; - super_tx.send(data, vec![OsIpcChannel::Receiver(sub_rx)], vec![]).unwrap(); - let (_, mut received_channels, _) = super_rx.recv().unwrap(); + super_tx.send(data, vec![OsIpcChannel::Receiver(sub_rx)], vec![], vec![]).unwrap(); + let (_, mut received_channels, _, _) = super_rx.recv().unwrap(); assert_eq!(received_channels.len(), 1); let sub_rx = received_channels.pop().unwrap().to_receiver(); - sub_tx.send(data, vec![], vec![]).unwrap(); - let (received_data, received_channels, received_shared_memory_regions) = + sub_tx.send(data, vec![], vec![], vec![]).unwrap(); + let (received_data, received_channels, received_shared_memory_regions, descriptors) = sub_rx.recv().unwrap(); - assert_eq!((&received_data[..], received_channels, received_shared_memory_regions), - (data, vec![], vec![])); + assert_eq!((&received_data[..], received_channels, received_shared_memory_regions, descriptors), + (data, vec![], vec![], vec![])); } #[test] @@ -80,23 +80,23 @@ fn multisender_transfer() { let data: &[u8] = b"asdfasdf"; super_tx.send(data, vec![OsIpcChannel::Sender(sub0_tx), OsIpcChannel::Sender(sub1_tx)], - vec![]).unwrap(); - let (_, mut received_channels, _) = super_rx.recv().unwrap(); + vec![], vec![]).unwrap(); + let (_, mut received_channels, _, _) = super_rx.recv().unwrap(); assert_eq!(received_channels.len(), 2); let sub0_tx = received_channels.remove(0).to_sender(); - sub0_tx.send(data, vec![], vec![]).unwrap(); - let (received_data, received_subchannels, received_shared_memory_regions) = + sub0_tx.send(data, vec![], vec![], vec![]).unwrap(); + let (received_data, received_subchannels, received_shared_memory_regions, descriptors) = sub0_rx.recv().unwrap(); - assert_eq!((&received_data[..], received_subchannels, received_shared_memory_regions), - (data, vec![], vec![])); + assert_eq!((&received_data[..], received_subchannels, received_shared_memory_regions, descriptors), + (data, vec![], vec![], vec![])); let sub1_tx = received_channels.remove(0).to_sender(); - sub1_tx.send(data, vec![], vec![]).unwrap(); - let (received_data, received_subchannels, received_shared_memory_regions) = + sub1_tx.send(data, vec![], vec![], vec![]).unwrap(); + let (received_data, received_subchannels, received_shared_memory_regions, descriptors) = sub1_rx.recv().unwrap(); - assert_eq!((&received_data[..], received_subchannels, received_shared_memory_regions), - (data, vec![], vec![])); + assert_eq!((&received_data[..], received_subchannels, received_shared_memory_regions, descriptors), + (data, vec![], vec![], vec![])); } #[test] @@ -104,11 +104,11 @@ fn medium_data() { let data: Vec = (0..65536).map(|i| (i % 251) as u8).collect(); let data: &[u8] = &data[..]; let (tx, rx) = platform::channel().unwrap(); - tx.send(data, vec![], vec![]).unwrap(); - let (received_data, received_channels, received_shared_memory_regions) = + tx.send(data, vec![], vec![], vec![]).unwrap(); + let (received_data, received_channels, received_shared_memory_regions, descriptors) = rx.recv().unwrap(); - assert_eq!((&received_data[..], received_channels, received_shared_memory_regions), - (&data[..], vec![], vec![])); + assert_eq!((&received_data[..], received_channels, received_shared_memory_regions, descriptors), + (&data[..], vec![], vec![], vec![])); } #[test] @@ -117,15 +117,15 @@ fn medium_data_with_sender_transfer() { let data: &[u8] = &data[..]; let (super_tx, super_rx) = platform::channel().unwrap(); let (sub_tx, sub_rx) = platform::channel().unwrap(); - super_tx.send(data, vec![OsIpcChannel::Sender(sub_tx)], vec![]).unwrap(); - let (_, mut received_channels, _) = super_rx.recv().unwrap(); + super_tx.send(data, vec![OsIpcChannel::Sender(sub_tx)], vec![], vec![]).unwrap(); + let (_, mut received_channels, _, _) = super_rx.recv().unwrap(); assert_eq!(received_channels.len(), 1); let sub_tx = received_channels.pop().unwrap().to_sender(); - sub_tx.send(data, vec![], vec![]).unwrap(); - let (received_data, received_channels, received_shared_memory_regions) = + sub_tx.send(data, vec![], vec![], vec![]).unwrap(); + let (received_data, received_channels, received_shared_memory_regions, descriptors) = sub_rx.recv().unwrap(); - assert_eq!((&received_data[..], received_channels, received_shared_memory_regions), - (data, vec![], vec![])); + assert_eq!((&received_data[..], received_channels, received_shared_memory_regions, descriptors), + (data, vec![], vec![], vec![])); } fn check_big_data(size: u32) { @@ -133,15 +133,15 @@ fn check_big_data(size: u32) { let thread = thread::spawn(move || { let data: Vec = (0.. size).map(|i| (i % 251) as u8).collect(); let data: &[u8] = &data[..]; - tx.send(data, vec![], vec![]).unwrap(); + tx.send(data, vec![], vec![], vec![]).unwrap(); }); - let (received_data, received_channels, received_shared_memory_regions) = + let (received_data, received_channels, received_shared_memory_regions, descriptors) = rx.recv().unwrap(); let data: Vec = (0.. size).map(|i| (i % 251) as u8).collect(); let data: &[u8] = &data[..]; assert_eq!(received_data.len(), data.len()); - assert_eq!((&received_data[..], received_channels, received_shared_memory_regions), - (&data[..], vec![], vec![])); + assert_eq!((&received_data[..], received_channels, received_shared_memory_regions, descriptors), + (&data[..], vec![], vec![], vec![])); thread.join().unwrap(); } @@ -166,9 +166,9 @@ fn big_data_with_sender_transfer() { let thread = thread::spawn(move || { let data: Vec = (0.. 1024 * 1024).map(|i| (i % 251) as u8).collect(); let data: &[u8] = &data[..]; - super_tx.send(data, vec![OsIpcChannel::Sender(sub_tx)], vec![]).unwrap(); + super_tx.send(data, vec![OsIpcChannel::Sender(sub_tx)], vec![], vec![]).unwrap(); }); - let (received_data, mut received_channels, received_shared_memory_regions) = + let (received_data, mut received_channels, received_shared_memory_regions, descriptors) = super_rx.recv().unwrap(); let data: Vec = (0.. 1024 * 1024).map(|i| (i % 251) as u8).collect(); let data: &[u8] = &data[..]; @@ -176,16 +176,17 @@ fn big_data_with_sender_transfer() { assert_eq!(&received_data[..], &data[..]); assert_eq!(received_channels.len(), 1); assert_eq!(received_shared_memory_regions.len(), 0); + assert_eq!(descriptors.len(), 0); let data: Vec = (0..65536).map(|i| (i % 251) as u8).collect(); let data: &[u8] = &data[..]; let sub_tx = received_channels[0].to_sender(); - sub_tx.send(data, vec![], vec![]).unwrap(); - let (received_data, received_channels, received_shared_memory_regions) = + sub_tx.send(data, vec![], vec![], vec![]).unwrap(); + let (received_data, received_channels, received_shared_memory_regions, descriptors) = sub_rx.recv().unwrap(); assert_eq!(received_data.len(), data.len()); - assert_eq!((&received_data[..], received_channels, received_shared_memory_regions), - (&data[..], vec![], vec![])); + assert_eq!((&received_data[..], received_channels, received_shared_memory_regions, descriptors), + (&data[..], vec![], vec![], vec![])); thread.join().unwrap(); } @@ -196,24 +197,25 @@ fn with_n_fds(n: usize, size: usize) { let (super_tx, super_rx) = platform::channel().unwrap(); let data: Vec = (0..size).map(|i| (i % 251) as u8).collect(); - super_tx.send(&data[..], sender_fds, vec![]).unwrap(); - let (received_data, received_channels, received_shared_memory_regions) = + super_tx.send(&data[..], sender_fds, vec![], vec![]).unwrap(); + let (received_data, received_channels, received_shared_memory_regions, descriptors) = super_rx.recv().unwrap(); assert_eq!(received_data.len(), data.len()); assert_eq!(&received_data[..], &data[..]); assert_eq!(received_channels.len(), receivers.len()); assert_eq!(received_shared_memory_regions.len(), 0); + assert_eq!(descriptors.len(), 0); let data: Vec = (0..65536).map(|i| (i % 251) as u8).collect(); for (mut sender_fd, sub_rx) in received_channels.into_iter().zip(receivers.into_iter()) { let sub_tx = sender_fd.to_sender(); - sub_tx.send(&data[..], vec![], vec![]).unwrap(); - let (received_data, received_channels, received_shared_memory_regions) = + sub_tx.send(&data[..], vec![], vec![], vec![]).unwrap(); + let (received_data, received_channels, received_shared_memory_regions, descriptors) = sub_rx.recv().unwrap(); assert_eq!(received_data.len(), data.len()); - assert_eq!((&received_data[..], received_channels, received_shared_memory_regions), - (&data[..], vec![], vec![])); + assert_eq!((&received_data[..], received_channels, received_shared_memory_regions, descriptors), + (&data[..], vec![], vec![], vec![])); } } @@ -307,9 +309,9 @@ macro_rules! create_big_data_with_n_fds { let thread = thread::spawn(move || { let data: Vec = (0.. 1024 * 1024).map(|i| (i % 251) as u8).collect(); let data: &[u8] = &data[..]; - super_tx.send(data, sender_fds, vec![]).unwrap(); + super_tx.send(data, sender_fds, vec![], vec![]).unwrap(); }); - let (received_data, received_channels, received_shared_memory_regions) = + let (received_data, received_channels, received_shared_memory_regions, received_descriptors) = super_rx.recv().unwrap(); thread.join().unwrap(); @@ -319,17 +321,18 @@ macro_rules! create_big_data_with_n_fds { assert_eq!(&received_data[..], &data[..]); assert_eq!(received_channels.len(), receivers.len()); assert_eq!(received_shared_memory_regions.len(), 0); + assert_eq!(received_descriptors.len(), 0); let data: Vec = (0..65536).map(|i| (i % 251) as u8).collect(); let data: &[u8] = &data[..]; for (mut sender_fd, sub_rx) in received_channels.into_iter().zip(receivers.into_iter()) { let sub_tx = sender_fd.to_sender(); - sub_tx.send(data, vec![], vec![]).unwrap(); - let (received_data, received_channels, received_shared_memory_regions) = + sub_tx.send(data, vec![], vec![], vec![]).unwrap(); + let (received_data, received_channels, received_shared_memory_regions, received_descriptors) = sub_rx.recv().unwrap(); assert_eq!(received_data.len(), data.len()); - assert_eq!((&received_data[..], received_channels, received_shared_memory_regions), - (&data[..], vec![], vec![])); + assert_eq!((&received_data[..], received_channels, received_shared_memory_regions, received_descriptors), + (&data[..], vec![], vec![], vec![])); } } ) @@ -354,21 +357,21 @@ fn concurrent_senders() { thread::spawn(move || { let data: Vec = (0.. 1024 * 1024).map(|j| (j % 13) as u8 | i << 4).collect(); let data: &[u8] = &data[..]; - tx.send(data, vec![], vec![]).unwrap(); + tx.send(data, vec![], vec![], vec![]).unwrap(); }) }).collect(); let mut received_vals: Vec = vec![]; for _ in 0..num_senders { - let (received_data, received_channels, received_shared_memory_regions) = + let (received_data, received_channels, received_shared_memory_regions, received_descriptors) = rx.recv().unwrap(); let val = received_data[0] >> 4; received_vals.push(val); let data: Vec = (0.. 1024 * 1024).map(|j| (j % 13) as u8 | val << 4).collect(); let data: &[u8] = &data[..]; assert_eq!(received_data.len(), data.len()); - assert_eq!((&received_data[..], received_channels, received_shared_memory_regions), - (&data[..], vec![], vec![])); + assert_eq!((&received_data[..], received_channels, received_shared_memory_regions, received_descriptors), + (&data[..], vec![], vec![], vec![])); } assert!(rx.try_recv().is_err()); // There should be no further messages pending. received_vals.sort(); @@ -388,24 +391,24 @@ fn receiver_set() { let rx1_id = rx_set.add(rx1).unwrap(); let data: &[u8] = b"1234567"; - tx0.send(data, vec![], vec![]).unwrap(); - let (received_id, received_data, _, _) = + tx0.send(data, vec![], vec![], vec![]).unwrap(); + let (received_id, received_data, _, _, _) = rx_set.select().unwrap().into_iter().next().unwrap().unwrap(); assert_eq!(received_id, rx0_id); assert_eq!(received_data, data); - tx1.send(data, vec![], vec![]).unwrap(); - let (received_id, received_data, _, _) = + tx1.send(data, vec![], vec![], vec![]).unwrap(); + let (received_id, received_data, _, _, _) = rx_set.select().unwrap().into_iter().next().unwrap().unwrap(); assert_eq!(received_id, rx1_id); assert_eq!(received_data, data); - tx0.send(data, vec![], vec![]).unwrap(); - tx1.send(data, vec![], vec![]).unwrap(); + tx0.send(data, vec![], vec![], vec![]).unwrap(); + tx1.send(data, vec![], vec![], vec![]).unwrap(); let (mut received0, mut received1) = (false, false); while !received0 || !received1 { for result in rx_set.select().unwrap().into_iter() { - let (received_id, received_data, _, _) = result.unwrap(); + let (received_id, received_data, _, _, _) = result.unwrap(); assert_eq!(received_data, data); assert!(received_id == rx0_id || received_id == rx1_id); if received_id == rx0_id { @@ -430,19 +433,19 @@ fn receiver_set_eintr() { let rx_id = rx_set.add(rx0).unwrap(); // Let the parent know we're ready let tx1 = OsIpcSender::connect(name).unwrap(); - tx1.send(b" Ready! ", vec![OsIpcChannel::Sender(tx0)], vec![]).unwrap(); + tx1.send(b" Ready! ", vec![OsIpcChannel::Sender(tx0)], vec![], vec![]).unwrap(); // Send the result of the select back to the parent let result = rx_set.select().unwrap(); let mut result_iter = result.into_iter(); - let (id, data, _, _) = result_iter.next().unwrap().unwrap(); + let (id, data, _, _, _) = result_iter.next().unwrap().unwrap(); assert_eq!(rx_id, id); assert_eq!(b"Test".as_ref(), &*data); assert!(result_iter.next().is_none()); - tx1.send(b"Success!", vec![], vec![]).unwrap(); + tx1.send(b"Success!", vec![], vec![], vec![]).unwrap(); }) }; // Wait until the child is ready - let (server, res, mut channels, _) = server.accept().unwrap(); + let (server, res, mut channels, _, _) = server.accept().unwrap(); assert!(res == b" Ready! "); let tx1 = channels.first_mut().unwrap().to_sender(); unsafe { @@ -451,8 +454,8 @@ fn receiver_set_eintr() { kill(child_pid, SIGCONT); } // The interrupt shouldn't affect the following send - tx1.send(b"Test", vec![], vec![]).unwrap(); - let (res, _, _) = server.recv().unwrap(); + tx1.send(b"Test", vec![], vec![], vec![]).unwrap(); + let (res, _, _, _) = server.recv().unwrap(); assert!(res == b"Success!"); child_pid.wait(); } @@ -464,8 +467,8 @@ fn receiver_set_empty() { let rx_id = rx_set.add(rx).unwrap(); let data: &[u8] = b""; - tx.send(data, vec![], vec![]).unwrap(); - let (received_id, received_data, _, _) = + tx.send(data, vec![], vec![], vec![]).unwrap(); + let (received_id, received_data, _, _, _) = rx_set.select().unwrap().into_iter().next().unwrap().unwrap(); assert_eq!(received_id, rx_id); assert!(received_data.is_empty()); @@ -511,12 +514,12 @@ fn receiver_set_medium_data() { let data0: Vec = (0..65536).map(|offset| (offset % 127) as u8).collect(); let data1: Vec = (0..65536).map(|offset| (offset % 127) as u8 | 0x80).collect(); - tx0.send(&*data0, vec![], vec![]).unwrap(); - tx1.send(&*data1, vec![], vec![]).unwrap(); + tx0.send(&*data0, vec![], vec![], vec![]).unwrap(); + tx1.send(&*data1, vec![], vec![], vec![]).unwrap(); let (mut received0, mut received1) = (false, false); while !received0 || !received1 { for result in rx_set.select().unwrap().into_iter() { - let (received_id, mut received_data, _, _) = result.unwrap(); + let (received_id, mut received_data, _, _, _) = result.unwrap(); received_data.truncate(65536); assert!(received_id == rx0_id || received_id == rx1_id); if received_id == rx0_id { @@ -545,18 +548,18 @@ fn receiver_set_big_data() { let (reference_data0, reference_data1) = (data0.clone(), data1.clone()); let thread0 = thread::spawn(move || { - tx0.send(&*data0, vec![], vec![]).unwrap(); + tx0.send(&*data0, vec![], vec![], vec![]).unwrap(); tx0 // Don't close just yet -- the receiver-side test code below doesn't expect that... }); let thread1 = thread::spawn(move || { - tx1.send(&*data1, vec![], vec![]).unwrap(); + tx1.send(&*data1, vec![], vec![], vec![]).unwrap(); tx1 }); let (mut received0, mut received1) = (false, false); while !received0 || !received1 { for result in rx_set.select().unwrap().into_iter() { - let (received_id, mut received_data, _, _) = result.unwrap(); + let (received_id, mut received_data, _, _, _) = result.unwrap(); received_data.truncate(1024 * 1024); assert!(received_id == rx0_id || received_id == rx1_id); if received_id == rx0_id { @@ -601,7 +604,7 @@ fn receiver_set_concurrent() { // The `macos` back-end won't receive exact size unless it's a multiple of 4... // (See https://github.com/servo/ipc-channel/pull/79 etc. ) let msg_size = msg_size & !3; - tx.send(&data[0..msg_size], vec![], vec![]).unwrap(); + tx.send(&data[0..msg_size], vec![], vec![], vec![]).unwrap(); } }); (thread, (rx_id, (reference_data, chan_index, 0usize))) @@ -610,7 +613,7 @@ fn receiver_set_concurrent() { while !receiver_records.is_empty() { for result in rx_set.select().unwrap().into_iter() { match result { - platform::OsIpcSelectionResult::DataReceived(rx_id, data, _, _) => { + platform::OsIpcSelectionResult::DataReceived(rx_id, data, _, _, _) => { let &mut (ref reference_data, chan_index, ref mut msg_index) = receiver_records.get_mut(&rx_id).unwrap(); let msg_size = (*msg_index * 99991 + chan_index * 90001) % max_msg_size; @@ -640,13 +643,13 @@ fn server_accept_first() { thread::spawn(move || { thread::sleep(Duration::from_millis(30)); let tx = OsIpcSender::connect(name).unwrap(); - tx.send(data, vec![], vec![]).unwrap(); + tx.send(data, vec![], vec![], vec![]).unwrap(); }); - let (_, received_data, received_channels, received_shared_memory_regions) = + let (_, received_data, received_channels, received_shared_memory_regions, received_descriptors) = server.accept().unwrap(); - assert_eq!((&received_data[..], received_channels, received_shared_memory_regions), - (data, vec![], vec![])); + assert_eq!((&received_data[..], received_channels, received_shared_memory_regions, received_descriptors), + (data, vec![], vec![], vec![])); } #[test] @@ -656,15 +659,15 @@ fn server_connect_first() { thread::spawn(move || { let tx = OsIpcSender::connect(name).unwrap(); - tx.send(data, vec![], vec![]).unwrap(); + tx.send(data, vec![], vec![], vec![]).unwrap(); }); thread::sleep(Duration::from_millis(30)); - let (_, mut received_data, received_channels, received_shared_memory_regions) = + let (_, mut received_data, received_channels, received_shared_memory_regions, received_descriptors) = server.accept().unwrap(); received_data.truncate(7); - assert_eq!((&received_data[..], received_channels, received_shared_memory_regions), - (data, vec![], vec![])); + assert_eq!((&received_data[..], received_channels, received_shared_memory_regions, received_descriptors), + (data, vec![], vec![], vec![])); } #[cfg(not(any(feature = "force-inprocess", target_os = "android", target_os = "ios")))] @@ -675,7 +678,7 @@ fn cross_process_spawn() { let channel_name = get_channel_name_arg("server"); if let Some(channel_name) = channel_name { let tx = OsIpcSender::connect(channel_name).unwrap(); - tx.send(data, vec![], vec![]).unwrap(); + tx.send(data, vec![], vec![], vec![]).unwrap(); unsafe { libc::exit(0); } } @@ -683,11 +686,11 @@ fn cross_process_spawn() { let (server, name) = OsIpcOneShotServer::new().unwrap(); let mut child_pid = spawn_server("cross_process_spawn", &[("server", &*name)]); - let (_, received_data, received_channels, received_shared_memory_regions) = + let (_, received_data, received_channels, received_shared_memory_regions, received_descriptors) = server.accept().unwrap(); child_pid.wait().expect("failed to wait on child"); - assert_eq!((&received_data[..], received_channels, received_shared_memory_regions), - (data, vec![], vec![])); + assert_eq!((&received_data[..], received_channels, received_shared_memory_regions, received_descriptors), + (data, vec![], vec![], vec![])); } #[cfg(not(any(feature = "force-inprocess", target_os = "windows", target_os = "android", target_os = "ios")))] @@ -698,14 +701,14 @@ fn cross_process_fork() { let child_pid = unsafe { fork(|| { let tx = OsIpcSender::connect(name).unwrap(); - tx.send(data, vec![], vec![]).unwrap(); + tx.send(data, vec![], vec![], vec![]).unwrap(); })}; - let (_, received_data, received_channels, received_shared_memory_regions) = + let (_, received_data, received_channels, received_shared_memory_regions, received_descriptors) = server.accept().unwrap(); child_pid.wait(); - assert_eq!((&received_data[..], received_channels, received_shared_memory_regions), - (data, vec![], vec![])); + assert_eq!((&received_data[..], received_channels, received_shared_memory_regions, received_descriptors), + (data, vec![], vec![], vec![])); } #[cfg(not(any(feature = "force-inprocess", target_os = "android", target_os = "ios")))] @@ -716,10 +719,10 @@ fn cross_process_sender_transfer_spawn() { let super_tx = OsIpcSender::connect(channel_name).unwrap(); let (sub_tx, sub_rx) = platform::channel().unwrap(); let data: &[u8] = b"foo"; - super_tx.send(data, vec![OsIpcChannel::Sender(sub_tx)], vec![]).unwrap(); + super_tx.send(data, vec![OsIpcChannel::Sender(sub_tx)], vec![], vec![]).unwrap(); sub_rx.recv().unwrap(); let data: &[u8] = b"bar"; - super_tx.send(data, vec![], vec![]).unwrap(); + super_tx.send(data, vec![], vec![], vec![]).unwrap(); unsafe { libc::exit(0); } } @@ -727,18 +730,18 @@ fn cross_process_sender_transfer_spawn() { let (server, name) = OsIpcOneShotServer::new().unwrap(); let mut child_pid = spawn_server("cross_process_sender_transfer_spawn", &[("server", &*name)]); - let (super_rx, _, mut received_channels, _) = server.accept().unwrap(); + let (super_rx, _, mut received_channels, _, _) = server.accept().unwrap(); assert_eq!(received_channels.len(), 1); let sub_tx = received_channels[0].to_sender(); let data: &[u8] = b"baz"; - sub_tx.send(data, vec![], vec![]).unwrap(); + sub_tx.send(data, vec![], vec![], vec![]).unwrap(); let data: &[u8] = b"bar"; - let (received_data, received_channels, received_shared_memory_regions) = + let (received_data, received_channels, received_shared_memory_regions, received_descriptors) = super_rx.recv().unwrap(); child_pid.wait().expect("failed to wait on child"); - assert_eq!((&received_data[..], received_channels, received_shared_memory_regions), - (data, vec![], vec![])); + assert_eq!((&received_data[..], received_channels, received_shared_memory_regions, received_descriptors), + (data, vec![], vec![], vec![])); } #[cfg(not(any(feature = "force-inprocess", target_os = "windows", target_os = "android", target_os = "ios")))] @@ -750,24 +753,24 @@ fn cross_process_sender_transfer_fork() { let super_tx = OsIpcSender::connect(name).unwrap(); let (sub_tx, sub_rx) = platform::channel().unwrap(); let data: &[u8] = b"foo"; - super_tx.send(data, vec![OsIpcChannel::Sender(sub_tx)], vec![]).unwrap(); + super_tx.send(data, vec![OsIpcChannel::Sender(sub_tx)], vec![], vec![]).unwrap(); sub_rx.recv().unwrap(); let data: &[u8] = b"bar"; - super_tx.send(data, vec![], vec![]).unwrap(); + super_tx.send(data, vec![], vec![], vec![]).unwrap(); })}; - let (super_rx, _, mut received_channels, _) = server.accept().unwrap(); + let (super_rx, _, mut received_channels, _, _) = server.accept().unwrap(); assert_eq!(received_channels.len(), 1); let sub_tx = received_channels[0].to_sender(); let data: &[u8] = b"baz"; - sub_tx.send(data, vec![], vec![]).unwrap(); + sub_tx.send(data, vec![], vec![], vec![]).unwrap(); let data: &[u8] = b"bar"; - let (received_data, received_channels, received_shared_memory_regions) = + let (received_data, received_channels, received_shared_memory_regions, received_descriptors) = super_rx.recv().unwrap(); child_pid.wait(); - assert_eq!((&received_data[..], received_channels, received_shared_memory_regions), - (data, vec![], vec![])); + assert_eq!((&received_data[..], received_channels, received_shared_memory_regions, received_descriptors), + (data, vec![], vec![], vec![])); } #[test] @@ -787,7 +790,7 @@ fn no_receiver_notification() { drop(receiver); let data: &[u8] = b"1234567"; loop { - if let Err(err) = sender.send(data, vec![], vec![]) { + if let Err(err) = sender.send(data, vec![], vec![], vec![]) { // We don't have an actual method for distinguishing a "broken pipe" error -- // but at least it's not supposed to signal the same condition as closing the sender. assert!(!err.channel_is_closed()); @@ -807,12 +810,12 @@ fn no_receiver_notification_pending() { let (sender, receiver) = platform::channel().unwrap(); let data: &[u8] = b"1234567"; - let result = sender.send(data, vec![], vec![]); + let result = sender.send(data, vec![], vec![], vec![]); assert!(result.is_ok()); drop(receiver); loop { - if let Err(err) = sender.send(data, vec![], vec![]) { + if let Err(err) = sender.send(data, vec![], vec![], vec![]) { // We don't have an actual method for distinguishing a "broken pipe" error -- // but at least it's not supposed to signal the same condition as closing the sender. assert!(!err.channel_is_closed()); @@ -835,7 +838,7 @@ fn no_receiver_notification_delayed() { let data: &[u8] = b"1234567"; loop { - if let Err(err) = sender.send(data, vec![], vec![]) { + if let Err(err) = sender.send(data, vec![], vec![], vec![]) { // We don't have an actual method for distinguishing a "broken pipe" error -- // but at least it's not supposed to signal the same condition as closing the sender. assert!(!err.channel_is_closed()); @@ -851,11 +854,12 @@ fn shared_memory() { let (tx, rx) = platform::channel().unwrap(); let data: &[u8] = b"1234567"; let shmem_data = OsIpcSharedMemory::from_byte(0xba, 1024 * 1024); - tx.send(data, vec![], vec![shmem_data]).unwrap(); - let (received_data, received_channels, received_shared_memory) = rx.recv().unwrap(); + tx.send(data, vec![], vec![shmem_data], vec![]).unwrap(); + let (received_data, received_channels, received_shared_memory, received_descriptors) = rx.recv().unwrap(); assert_eq!((&received_data[..], received_channels), (data, Vec::new())); assert_eq!(received_shared_memory[0].len(), 1024 * 1024); assert!(received_shared_memory[0].iter().all(|byte| *byte == 0xba)); + assert_eq!(received_descriptors.len(), 0); } #[test] @@ -870,10 +874,10 @@ fn try_recv() { let (tx, rx) = platform::channel().unwrap(); assert!(rx.try_recv().is_err()); let data: &[u8] = b"1234567"; - tx.send(data, Vec::new(), Vec::new()).unwrap(); - let (received_data, received_channels, received_shared_memory) = rx.try_recv().unwrap(); - assert_eq!((&received_data[..], received_channels, received_shared_memory), - (data, Vec::new(), Vec::new())); + tx.send(data, Vec::new(), Vec::new(), Vec::new()).unwrap(); + let (received_data, received_channels, received_shared_memory, received_descriptors) = rx.try_recv().unwrap(); + assert_eq!((&received_data[..], received_channels, received_shared_memory, received_descriptors), + (data, Vec::new(), Vec::new(), Vec::new())); assert!(rx.try_recv().is_err()); } @@ -932,7 +936,7 @@ fn try_recv_large() { let thread = thread::spawn(move || { let data: Vec = (0.. 1024 * 1024).map(|i| (i % 251) as u8).collect(); let data: &[u8] = &data[..]; - tx.send(data, vec![], vec![]).unwrap(); + tx.send(data, vec![], vec![], vec![]).unwrap(); }); let mut result; @@ -941,12 +945,12 @@ fn try_recv_large() { result.is_err() } {} thread.join().unwrap(); - let (received_data, received_channels, received_shared_memory) = result.unwrap(); + let (received_data, received_channels, received_shared_memory, received_descriptors) = result.unwrap(); let data: Vec = (0.. 1024 * 1024).map(|i| (i % 251) as u8).collect(); let data: &[u8] = &data[..]; - assert_eq!((&received_data[..], received_channels, received_shared_memory), - (data, vec![], vec![])); + assert_eq!((&received_data[..], received_channels, received_shared_memory, received_descriptors), + (data, vec![], vec![], vec![])); assert!(rx.try_recv().is_err()); } @@ -995,7 +999,7 @@ fn try_recv_large_delayed() { let data: Vec = (0..msg_size).map(|j| (j % 13) as u8 | i << 4).collect(); let data: &[u8] = &data[..]; delay(thread_delay); - tx.send(data, vec![], vec![]).unwrap(); + tx.send(data, vec![], vec![], vec![]).unwrap(); }) }).collect(); @@ -1006,7 +1010,7 @@ fn try_recv_large_delayed() { result = rx.try_recv(); result.is_err() } {} - let (received_data, received_channels, received_shared_memory) = result.unwrap(); + let (received_data, received_channels, received_shared_memory, received_descriptors) = result.unwrap(); let val = received_data[0] >> 4; received_vals.push(val); @@ -1015,6 +1019,7 @@ fn try_recv_large_delayed() { assert_eq!(received_data.len(), data.len()); assert_eq!((&received_data[..], received_channels, received_shared_memory), (data, vec![], vec![])); + assert_eq!(received_descriptors.len(), 0); } assert!(rx.try_recv().is_err()); // There should be no further messages pending. received_vals.sort(); @@ -1072,24 +1077,24 @@ fn cross_process_two_step_transfer_spawn() { let (sub_tx, sub_rx) = platform::channel().unwrap(); // send the other process the tx side, so it can send us the channels - super_tx.send(&[], vec![OsIpcChannel::Sender(sub_tx)], vec![]).unwrap(); + super_tx.send(&[], vec![OsIpcChannel::Sender(sub_tx)], vec![], vec![]).unwrap(); // get two_rx from the other process - let (_, mut received_channels, _) = sub_rx.recv().unwrap(); + let (_, mut received_channels, _, _) = sub_rx.recv().unwrap(); assert_eq!(received_channels.len(), 1); let two_rx = received_channels[0].to_receiver(); // get one_rx from two_rx's buffer - let (_, mut received_channels, _) = two_rx.recv().unwrap(); + let (_, mut received_channels, _, _) = two_rx.recv().unwrap(); assert_eq!(received_channels.len(), 1); let one_rx = received_channels[0].to_receiver(); // get a cookie from one_rx - let (data, _, _) = one_rx.recv().unwrap(); + let (data, _, _, _) = one_rx.recv().unwrap(); assert_eq!(&data[..], cookie); // finally, send a cookie back - super_tx.send(&data, vec![], vec![]).unwrap(); + super_tx.send(&data, vec![], vec![], vec![]).unwrap(); // terminate unsafe { libc::exit(0); } @@ -1098,12 +1103,12 @@ fn cross_process_two_step_transfer_spawn() { // create channel 1 let (one_tx, one_rx) = platform::channel().unwrap(); // put data in channel 1's pipe - one_tx.send(cookie, vec![], vec![]).unwrap(); + one_tx.send(cookie, vec![], vec![], vec![]).unwrap(); // create channel 2 let (two_tx, two_rx) = platform::channel().unwrap(); // put channel 1's rx end in channel 2's pipe - two_tx.send(&[], vec![OsIpcChannel::Receiver(one_rx)], vec![]).unwrap(); + two_tx.send(&[], vec![OsIpcChannel::Receiver(one_rx)], vec![], vec![]).unwrap(); // create a one-shot server, and spawn another process let (server, name) = OsIpcOneShotServer::new().unwrap(); @@ -1111,19 +1116,19 @@ fn cross_process_two_step_transfer_spawn() { &[("server", &*name)]); // The other process will have sent us a transmit channel in received channels - let (super_rx, _, mut received_channels, _) = server.accept().unwrap(); + let (super_rx, _, mut received_channels, _, _) = server.accept().unwrap(); assert_eq!(received_channels.len(), 1); let sub_tx = received_channels[0].to_sender(); // Send the outer payload channel, so the server can use it to // retrive the inner payload and the cookie - sub_tx.send(&[], vec![OsIpcChannel::Receiver(two_rx)], vec![]).unwrap(); + sub_tx.send(&[], vec![OsIpcChannel::Receiver(two_rx)], vec![], vec![]).unwrap(); // Then we wait for the cookie to make its way back to us - let (received_data, received_channels, received_shared_memory_regions) = + let (received_data, received_channels, received_shared_memory_regions, received_descriptors) = super_rx.recv().unwrap(); let child_exit_code = child_pid.wait().expect("failed to wait on child"); assert!(child_exit_code.success()); - assert_eq!((&received_data[..], received_channels, received_shared_memory_regions), - (cookie, vec![], vec![])); + assert_eq!((&received_data[..], received_channels, received_shared_memory_regions, received_descriptors), + (cookie, vec![], vec![], vec![])); } diff --git a/src/platform/unix/mod.rs b/src/platform/unix/mod.rs index 55de4833..ce961205 100644 --- a/src/platform/unix/mod.rs +++ b/src/platform/unix/mod.rs @@ -8,12 +8,13 @@ // except according to those terms. use crate::ipc; +use crate::platform::Descriptor; use bincode; use fnv::FnvHasher; use libc::{EAGAIN, EWOULDBLOCK}; use libc::{self, MAP_FAILED, MAP_SHARED, PROT_READ, PROT_WRITE, SOCK_SEQPACKET, SOL_SOCKET}; -use libc::{SO_LINGER, S_IFMT, S_IFSOCK, c_char, c_int, c_void, getsockopt}; -use libc::{iovec, mode_t, msghdr, off_t, recvmsg, sendmsg}; +use libc::{SO_LINGER, c_char, c_int, c_void, getsockopt}; +use libc::{iovec, msghdr, off_t, recvmsg, sendmsg}; use libc::{setsockopt, size_t, sockaddr, sockaddr_un, socketpair, socklen_t, sa_family_t}; use std::cell::Cell; use std::cmp; @@ -35,6 +36,7 @@ use std::thread; use mio::unix::EventedFd; use mio::{Poll, Token, Events, Ready, PollOpt}; use tempfile::{Builder, TempDir}; +use std::os::unix::io::AsRawFd; const MAX_FDS_IN_CMSG: u32 = 64; @@ -140,12 +142,12 @@ impl OsIpcReceiver { } pub fn recv(&self) - -> Result<(Vec, Vec, Vec),UnixError> { + -> Result<(Vec, Vec, Vec, Vec),UnixError> { recv(self.fd.get(), BlockingMode::Blocking) } pub fn try_recv(&self) - -> Result<(Vec, Vec, Vec),UnixError> { + -> Result<(Vec, Vec, Vec, Vec),UnixError> { recv(self.fd.get(), BlockingMode::Nonblocking) } } @@ -234,9 +236,16 @@ impl OsIpcSender { pub fn send(&self, data: &[u8], channels: Vec, - shared_memory_regions: Vec) + shared_memory_regions: Vec, + descriptors: Vec) -> Result<(),UnixError> { + let header = Header { + total_size: data.len(), + channel_fd_num: channels.len(), + shared_memory_fd_num: shared_memory_regions.len(), + descriptor_num: descriptors.len(), + }; let mut fds = Vec::new(); for channel in channels.iter() { fds.push(channel.fd()); @@ -245,13 +254,17 @@ impl OsIpcSender { fds.push(shared_memory_region.store.fd()); } + for descriptor in descriptors.iter() { + fds.push(descriptor.as_raw_fd()); + } + // `len` is the total length of the message. // Its value will be sent as a message header before the payload data. // // Not to be confused with the length of the data to send in this packet // (i.e. the length of the data buffer passed in), // which in a fragmented send will be smaller than the total message length. - fn send_first_fragment(sender_fd: c_int, fds: &[c_int], data_buffer: &[u8], len: usize) + fn send_first_fragment(sender_fd: c_int, fds: &[c_int], data_buffer: &[u8], header: &Header) -> Result<(),UnixError> { let result = unsafe { let cmsg_length = mem::size_of_val(fds); @@ -276,8 +289,9 @@ impl OsIpcSender { // whether it already got the entire message, // or needs to receive additional fragments -- and if so, how much. iovec { - iov_base: &len as *const _ as *mut c_void, - iov_len: mem::size_of_val(&len), + //TODO: Header serialization is not really nicely done + iov_base: header as *const _ as *mut c_void, + iov_len: mem::size_of::
(), }, iovec { iov_base: data_buffer.as_ptr() as *mut c_void, @@ -296,7 +310,7 @@ impl OsIpcSender { } else { Err(UnixError::last()) } - }; + } fn send_followup_fragment(sender_fd: c_int, data_buffer: &[u8]) -> Result<(),UnixError> { let result = unsafe { @@ -338,7 +352,7 @@ impl OsIpcSender { // If the message is small enough, try sending it in a single fragment. if data.len() <= Self::get_max_fragment_size() { - match send_first_fragment(self.fd.0, &fds[..], data, data.len()) { + match send_first_fragment(self.fd.0, &fds[..], data, &header) { Ok(_) => return Ok(()), Err(error) => { // ENOBUFS means the kernel failed to allocate a buffer large enough @@ -377,7 +391,7 @@ impl OsIpcSender { // This fragment always uses the full allowable buffer size. end_byte_position = Self::first_fragment_size(sendbuf_size); - send_first_fragment(self.fd.0, &fds[..], &data[..end_byte_position], data.len()) + send_first_fragment(self.fd.0, &fds[..], &data[..end_byte_position], &header) } else { // Followup fragment. No header; but offset by amount of data already sent. @@ -499,12 +513,13 @@ impl OsIpcReceiverSet { match (evt.readiness().is_readable(), self.pollfds.get(&evt_token)) { (true, Some(&poll_entry)) => { match recv(poll_entry.fd, BlockingMode::Blocking) { - Ok((data, channels, shared_memory_regions)) => { + Ok((data, channels, shared_memory_regions, descriptors)) => { selection_results.push(OsIpcSelectionResult::DataReceived( poll_entry.id, data, channels, - shared_memory_regions)); + shared_memory_regions, + descriptors)); } Err(err) if err.channel_is_closed() => { self.pollfds.remove(&evt_token).unwrap(); @@ -532,15 +547,15 @@ impl OsIpcReceiverSet { } pub enum OsIpcSelectionResult { - DataReceived(u64, Vec, Vec, Vec), + DataReceived(u64, Vec, Vec, Vec, Vec), ChannelClosed(u64), } impl OsIpcSelectionResult { - pub fn unwrap(self) -> (u64, Vec, Vec, Vec) { + pub fn unwrap(self) -> (u64, Vec, Vec, Vec, Vec) { match self { - OsIpcSelectionResult::DataReceived(id, data, channels, shared_memory_regions) => { - (id, data, channels, shared_memory_regions) + OsIpcSelectionResult::DataReceived(id, data, channels, shared_memory_regions, descriptors) => { + (id, data, channels, shared_memory_regions, descriptors) } OsIpcSelectionResult::ChannelClosed(id) => { panic!("OsIpcSelectionResult::unwrap(): receiver ID {} was closed!", id) @@ -607,7 +622,8 @@ impl OsIpcOneShotServer { let socket_path = temp_dir.path().join("socket"); let path_string = socket_path.to_str().unwrap(); - let (sockaddr, len) = new_sockaddr_un(CString::new(path_string).unwrap().as_ptr()); + let path_c_string = CString::new(path_string).unwrap(); + let (sockaddr, len) = new_sockaddr_un(path_c_string.as_ptr()); if libc::bind(fd, &sockaddr as *const _ as *const sockaddr, len as socklen_t) != 0 { return Err(UnixError::last()); } @@ -626,7 +642,8 @@ impl OsIpcOneShotServer { pub fn accept(self) -> Result<(OsIpcReceiver, Vec, Vec, - Vec),UnixError> { + Vec, + Vec),UnixError> { unsafe { let sockaddr: *mut sockaddr = ptr::null_mut(); let sockaddr_len: *mut socklen_t = ptr::null_mut(); @@ -637,8 +654,8 @@ impl OsIpcOneShotServer { make_socket_lingering(client_fd)?; let receiver = OsIpcReceiver::from_fd(client_fd); - let (data, channels, shared_memory_regions) = receiver.recv()?; - Ok((receiver, data, channels, shared_memory_regions)) + let (data, channels, shared_memory_regions, descriptors) = receiver.recv()?; + Ok((receiver, data, channels, shared_memory_regions, descriptors)) } } } @@ -693,7 +710,7 @@ impl BackingStore { pub unsafe fn map_file(&self, length: Option) -> (*mut u8, size_t) { let length = length.unwrap_or_else(|| { - let mut st = mem::uninitialized(); + let mut st = mem::MaybeUninit::uninit().assume_init(); assert!(libc::fstat(self.fd, &mut st) == 0); st.st_size as size_t }); @@ -893,26 +910,35 @@ enum BlockingMode { Nonblocking, } +#[derive(Default, Debug, Clone, Copy)] +#[repr(C)] struct Header { + total_size: usize, + channel_fd_num: usize, + shared_memory_fd_num: usize, + descriptor_num: usize, +} + fn recv(fd: c_int, blocking_mode: BlockingMode) - -> Result<(Vec, Vec, Vec),UnixError> { + -> Result<(Vec, Vec, Vec, Vec),UnixError> { - let (mut channels, mut shared_memory_regions) = (Vec::new(), Vec::new()); + let (mut channels, mut shared_memory_regions, mut descriptors) = (Vec::new(), Vec::new(), Vec::new()); // First fragments begins with a header recording the total data length. // // We use this to determine whether we already got the entire message, // or need to receive additional fragments -- and if so, how much. - let mut total_size = 0usize; + let mut header; let mut main_data_buffer; unsafe { + header = Header::default(); // Allocate a buffer without initialising the memory. main_data_buffer = Vec::with_capacity(OsIpcSender::get_max_fragment_size()); main_data_buffer.set_len(OsIpcSender::get_max_fragment_size()); let mut iovec = [ iovec { - iov_base: &mut total_size as *mut _ as *mut c_void, - iov_len: mem::size_of_val(&total_size), + iov_base: &mut header as *mut _ as *mut c_void, + iov_len: mem::size_of::
(), }, iovec { iov_base: main_data_buffer.as_mut_ptr() as *mut c_void, @@ -922,7 +948,7 @@ fn recv(fd: c_int, blocking_mode: BlockingMode) let mut cmsg = UnixCmsg::new(&mut iovec); let bytes_read = cmsg.recv(fd, blocking_mode)?; - main_data_buffer.set_len(bytes_read - mem::size_of_val(&total_size)); + main_data_buffer.set_len(bytes_read - mem::size_of::
()); let cmsg_fds = CMSG_DATA(cmsg.cmsg_buffer) as *const c_int; let cmsg_length = cmsg.msghdr.msg_controllen; @@ -931,19 +957,23 @@ fn recv(fd: c_int, blocking_mode: BlockingMode) } else { (cmsg.cmsg_len() - CMSG_ALIGN(mem::size_of::())) / mem::size_of::() }; - for index in 0..channel_length { - let fd = *cmsg_fds.offset(index as isize); - if is_socket(fd) { - channels.push(OsOpaqueIpcChannel::from_fd(fd)); - continue - } - shared_memory_regions.push(OsIpcSharedMemory::from_fd(fd)); + assert_eq!(header.channel_fd_num + header.shared_memory_fd_num + header.descriptor_num, channel_length); + for index in 0..header.channel_fd_num { + channels.push(OsOpaqueIpcChannel::from_fd(*cmsg_fds.offset(index as isize))); + } + + for index in header.channel_fd_num .. (header.channel_fd_num + header.shared_memory_fd_num) { + shared_memory_regions.push(OsIpcSharedMemory::from_fd(*cmsg_fds.offset(index as isize))); + } + + for index in (header.channel_fd_num + header.shared_memory_fd_num) .. (header.channel_fd_num + header.shared_memory_fd_num + header.descriptor_num) { + descriptors.push(Descriptor::new(*cmsg_fds.offset(index as isize))); } } - if total_size == main_data_buffer.len() { + if header.total_size == main_data_buffer.len() { // Fast path: no fragments. - return Ok((main_data_buffer, channels, shared_memory_regions)) + return Ok((main_data_buffer, channels, shared_memory_regions, descriptors)) } // Reassemble fragments. @@ -954,13 +984,13 @@ fn recv(fd: c_int, blocking_mode: BlockingMode) // Extend the buffer to hold the entire message, without initialising the memory. let len = main_data_buffer.len(); - main_data_buffer.reserve_exact(total_size - len); + main_data_buffer.reserve_exact(header.total_size - len); // Receive followup fragments directly into the main buffer. - while main_data_buffer.len() < total_size { + while main_data_buffer.len() < header.total_size { let write_pos = main_data_buffer.len(); let end_pos = cmp::min(write_pos + OsIpcSender::fragment_size(*SYSTEM_SENDBUF_SIZE), - total_size); + header.total_size); let result = unsafe { assert!(end_pos <= main_data_buffer.capacity()); main_data_buffer.set_len(end_pos); @@ -986,7 +1016,7 @@ fn recv(fd: c_int, blocking_mode: BlockingMode) }; } - Ok((main_data_buffer, channels, shared_memory_regions)) + Ok((main_data_buffer, channels, shared_memory_regions, descriptors)) } // https://github.com/servo/ipc-channel/issues/192 @@ -1082,16 +1112,6 @@ impl UnixCmsg { } } -fn is_socket(fd: c_int) -> bool { - unsafe { - let mut st = mem::uninitialized(); - if libc::fstat(fd, &mut st) != 0 { - return false - } - S_ISSOCK(st.st_mode as mode_t) - } -} - // FFI stuff follows: #[cfg(all(feature="memfd", target_os="linux"))] @@ -1120,11 +1140,6 @@ fn CMSG_SPACE(length: size_t) -> size_t { CMSG_ALIGN(length) + CMSG_ALIGN(mem::size_of::()) } -#[allow(non_snake_case)] -fn S_ISSOCK(mode: mode_t) -> bool { - (mode & S_IFMT) == S_IFSOCK -} - #[repr(C)] struct cmsghdr { cmsg_len: MsgControlLen, diff --git a/src/platform/windows/handle.rs b/src/platform/windows/handle.rs new file mode 100644 index 00000000..913ad2cd --- /dev/null +++ b/src/platform/windows/handle.rs @@ -0,0 +1,193 @@ +use std::mem; +use std::thread; +// use std::ops::{Deref, DerefMut}; +use std::os::windows::io::{ + AsRawHandle, + RawHandle, + IntoRawHandle, + FromRawHandle, +}; +use std::fs::File; +use std::ffi::CString; +use std::default::Default; +use std::cell::{RefCell}; + +use winapi::um::winnt::{HANDLE}; +use winapi::um::handleapi::{INVALID_HANDLE_VALUE}; + +use super::{WinError, CURRENT_PROCESS_HANDLE}; + +#[derive(Debug)] +pub struct WinHandle { + h: RefCell, +} + +unsafe impl Send for WinHandle { } +unsafe impl Sync for WinHandle { } + +impl Drop for WinHandle { + fn drop(&mut self) { + unsafe { + if self.is_valid() { + let result = winapi::um::handleapi::CloseHandle(*self.h.borrow()); + assert!(thread::panicking() || result != 0); + } + } + } +} + +impl Default for WinHandle { + fn default() -> WinHandle { + WinHandle { h: RefCell::new(INVALID_HANDLE_VALUE) } + } +} + +// impl Deref for WinHandle { +// type Target = WinHandle; + +// fn deref(&self) -> &Self::Target { +// &self.h +// } +// } + +// impl DerefMut for WinHandle { +// fn deref_mut(&mut self) -> &mut Self::Target { +// &mut self.h +// } +// } + +impl IntoRawHandle for WinHandle { + fn into_raw_handle(self) -> RawHandle { + let handle = *self.h.borrow(); + mem::forget(self); + handle + } +} + +impl AsRawHandle for WinHandle { + fn as_raw_handle(& self) -> RawHandle { + *self.h.borrow() + } +} + +impl FromRawHandle for WinHandle { + unsafe fn from_raw_handle(handle: RawHandle) -> WinHandle { + WinHandle::new(handle) + } +} + +impl Into for WinHandle { + fn into(self) -> File { + unsafe { + File::from_raw_handle(self.into_raw_handle()) + } + } +} + +impl From for WinHandle { + fn from(file: File) -> Self { + WinHandle::new(file.into_raw_handle()) + } +} + +const WINDOWS_APP_MODULE_NAME: &'static str = "api-ms-win-core-handle-l1-1-0"; +const COMPARE_OBJECT_HANDLES_FUNCTION_NAME: &'static str = "CompareObjectHandles"; + +lazy_static! { + static ref WINDOWS_APP_MODULE_NAME_CSTRING: CString = CString::new(WINDOWS_APP_MODULE_NAME).unwrap(); + static ref COMPARE_OBJECT_HANDLES_FUNCTION_NAME_CSTRING: CString = CString::new(COMPARE_OBJECT_HANDLES_FUNCTION_NAME).unwrap(); +} + +#[cfg(feature = "windows-shared-memory-equality")] +impl PartialEq for WinHandle { + fn eq(&self, other: &WinHandle) -> bool { + unsafe { + // Calling LoadLibraryA every time seems to be ok since libraries are refcounted and multiple calls won't produce multiple instances. + let module_handle = winapi::um::libloaderapi::LoadLibraryA(WINDOWS_APP_MODULE_NAME_CSTRING.as_ptr()); + if module_handle.is_null() { + panic!("Error loading library {}. {}", WINDOWS_APP_MODULE_NAME, WinError::error_string(GetLastError())); + } + let proc = winapi::um::libloaderapi::GetProcAddress(module_handle, COMPARE_OBJECT_HANDLES_FUNCTION_NAME_CSTRING.as_ptr()); + if proc.is_null() { + panic!("Error calling GetProcAddress to use {}. {}", COMPARE_OBJECT_HANDLES_FUNCTION_NAME, WinError::error_string(GetLastError())); + } + let compare_object_handles: unsafe extern "stdcall" fn(HANDLE, HANDLE) -> winapi::shared::minwindef::BOOL = std::mem::transmute(proc); + compare_object_handles(self.h, other.h) != 0 + } + } +} + +impl WinHandle { + pub fn new(h: HANDLE) -> WinHandle { + WinHandle { h: RefCell::new(h) } + } + + pub fn invalid() -> WinHandle { + WinHandle { h: RefCell::new(INVALID_HANDLE_VALUE) } + } + + pub fn is_valid(&self) -> bool { + *self.h.borrow() != INVALID_HANDLE_VALUE + } + + pub(crate) fn as_raw(&self) -> HANDLE { + *self.h.borrow() + } + + pub(crate) fn take_raw(&mut self) -> HANDLE { + self.h.replace(INVALID_HANDLE_VALUE) + } + + pub(crate) fn take(&mut self) -> WinHandle { + WinHandle::new(self.take_raw()) + } + + pub(crate) fn consume(&self) -> WinHandle { + WinHandle::new(self.h.replace(INVALID_HANDLE_VALUE)) + } +} + +/// Duplicate a given handle from this process to the target one, passing the +/// given flags to DuplicateHandle. +/// +/// Unlike win32 DuplicateHandle, this will preserve INVALID_HANDLE_VALUE (which is +/// also the pseudohandle for the current process). +pub fn dup_handle_to_process_with_flags(handle: &WinHandle, other_process: &WinHandle, flags: winapi::shared::minwindef::DWORD) + -> Result +{ + if !handle.is_valid() { + return Ok(WinHandle::invalid()); + } + + unsafe { + let mut new_handle: HANDLE = INVALID_HANDLE_VALUE; + let ok = winapi::um::handleapi::DuplicateHandle(CURRENT_PROCESS_HANDLE.as_raw(), handle.as_raw(), + other_process.as_raw(), &mut new_handle, + 0, winapi::shared::minwindef::FALSE, flags); + if ok == winapi::shared::minwindef::FALSE { + Err(WinError::last("DuplicateHandle")) + } else { + Ok(WinHandle::new(new_handle)) + } + } +} + +/// Duplicate a handle in the current process. +pub fn dup_handle(handle: &WinHandle) -> Result { + dup_handle_to_process(handle, &WinHandle::new(CURRENT_PROCESS_HANDLE.as_raw())) +} + +/// Duplicate a handle to the target process. +pub fn dup_handle_to_process(handle: &WinHandle, other_process: &WinHandle) -> Result { + dup_handle_to_process_with_flags(handle, other_process, winapi::um::winnt::DUPLICATE_SAME_ACCESS) +} + +/// Duplicate a handle to the target process, closing the source handle. +pub fn move_handle_to_process(handle: WinHandle, other_process: &WinHandle) -> Result { + let result = dup_handle_to_process_with_flags(&handle, other_process, + winapi::um::winnt::DUPLICATE_CLOSE_SOURCE | winapi::um::winnt::DUPLICATE_SAME_ACCESS); + // Since the handle was moved to another process, the original is no longer valid; + // so we probably shouldn't try to close it explicitly? + mem::forget(handle); + result +} \ No newline at end of file diff --git a/src/platform/windows/mod.rs b/src/platform/windows/mod.rs index f4ebcd86..3e87f7cc 100644 --- a/src/platform/windows/mod.rs +++ b/src/platform/windows/mod.rs @@ -14,7 +14,6 @@ use crate::ipc; use libc::intptr_t; use std::cell::{Cell, RefCell}; use std::cmp::PartialEq; -use std::default::Default; use std::env; use std::error::Error as StdError; use std::ffi::CString; @@ -27,13 +26,18 @@ use std::ptr; use std::slice; use std::thread; use uuid::Uuid; -use winapi::um::winnt::{HANDLE}; -use winapi::um::handleapi::{INVALID_HANDLE_VALUE}; use winapi::shared::minwindef::{LPVOID}; use winapi; + +use winapi::um::winnt::{HANDLE}; +use winapi::um::handleapi::{INVALID_HANDLE_VALUE}; + mod aliased_cell; +pub mod handle; use self::aliased_cell::AliasedCell; +use crate::platform::Descriptor; +use self::handle::{WinHandle, dup_handle, dup_handle_to_process, move_handle_to_process}; lazy_static! { static ref CURRENT_PROCESS_ID: winapi::shared::ntdef::ULONG = unsafe { winapi::um::processthreadsapi::GetCurrentProcessId() }; @@ -180,7 +184,7 @@ impl<'data> Message<'data> { /// make sure that the message was originally sent to it, and was not sitting /// in another channel's buffer when that channel got transferred to another /// process. On Windows, we duplicate handles on the sender side to a specific -/// reciever. If the wrong receiver gets it, those handles are not valid. +/// receiver. If the wrong receiver gets it, those handles are not valid. /// /// TODO(vlad): We could attempt to recover from the above situation by /// duplicating from the intended target process to ourselves (the receiver). @@ -199,6 +203,7 @@ struct OutOfBandMessage { channel_handles: Vec, shmem_handles: Vec<(intptr_t, u64)>, // handle and size big_data_receiver_handle: Option<(intptr_t, u64)>, // handle and size + descriptor_handles: Vec, } impl OutOfBandMessage { @@ -208,13 +213,15 @@ impl OutOfBandMessage { channel_handles: vec![], shmem_handles: vec![], big_data_receiver_handle: None, + descriptor_handles: vec![], } } fn needs_to_be_sent(&self) -> bool { !self.channel_handles.is_empty() || !self.shmem_handles.is_empty() || - self.big_data_receiver_handle.is_some() + self.big_data_receiver_handle.is_some() || + !self.descriptor_handles.is_empty() } } @@ -225,7 +232,8 @@ impl serde::Serialize for OutOfBandMessage { ((self.target_process_id, &self.channel_handles, &self.shmem_handles, - &self.big_data_receiver_handle)).serialize(serializer) + &self.big_data_receiver_handle, + &self.descriptor_handles)).serialize(serializer) } } @@ -233,13 +241,14 @@ impl<'de> serde::Deserialize<'de> for OutOfBandMessage { fn deserialize(deserializer: D) -> Result where D: serde::Deserializer<'de> { - let (target_process_id, channel_handles, shmem_handles, big_data_receiver_handle) = + let (target_process_id, channel_handles, shmem_handles, big_data_receiver_handle, descriptor_handles) = serde::Deserialize::deserialize(deserializer)?; Ok(OutOfBandMessage { target_process_id: target_process_id, channel_handles: channel_handles, shmem_handles: shmem_handles, - big_data_receiver_handle: big_data_receiver_handle + big_data_receiver_handle: big_data_receiver_handle, + descriptor_handles }) } } @@ -252,129 +261,6 @@ fn make_pipe_name(pipe_id: &Uuid) -> CString { CString::new(format!("\\\\.\\pipe\\rust-ipc-{}", pipe_id.to_string())).unwrap() } -/// Duplicate a given handle from this process to the target one, passing the -/// given flags to DuplicateHandle. -/// -/// Unlike win32 DuplicateHandle, this will preserve INVALID_HANDLE_VALUE (which is -/// also the pseudohandle for the current process). -fn dup_handle_to_process_with_flags(handle: &WinHandle, other_process: &WinHandle, flags: winapi::shared::minwindef::DWORD) - -> Result -{ - if !handle.is_valid() { - return Ok(WinHandle::invalid()); - } - - unsafe { - let mut new_handle: HANDLE = INVALID_HANDLE_VALUE; - let ok = winapi::um::handleapi::DuplicateHandle(CURRENT_PROCESS_HANDLE.as_raw(), handle.as_raw(), - other_process.as_raw(), &mut new_handle, - 0, winapi::shared::minwindef::FALSE, flags); - if ok == winapi::shared::minwindef::FALSE { - Err(WinError::last("DuplicateHandle")) - } else { - Ok(WinHandle::new(new_handle)) - } - } -} - -/// Duplicate a handle in the current process. -fn dup_handle(handle: &WinHandle) -> Result { - dup_handle_to_process(handle, &WinHandle::new(CURRENT_PROCESS_HANDLE.as_raw())) -} - -/// Duplicate a handle to the target process. -fn dup_handle_to_process(handle: &WinHandle, other_process: &WinHandle) -> Result { - dup_handle_to_process_with_flags(handle, other_process, winapi::um::winnt::DUPLICATE_SAME_ACCESS) -} - -/// Duplicate a handle to the target process, closing the source handle. -fn move_handle_to_process(handle: WinHandle, other_process: &WinHandle) -> Result { - let result = dup_handle_to_process_with_flags(&handle, other_process, - winapi::um::winnt::DUPLICATE_CLOSE_SOURCE | winapi::um::winnt::DUPLICATE_SAME_ACCESS); - // Since the handle was moved to another process, the original is no longer valid; - // so we probably shouldn't try to close it explicitly? - mem::forget(handle); - result -} - -#[derive(Debug)] -struct WinHandle { - h: HANDLE -} - -unsafe impl Send for WinHandle { } -unsafe impl Sync for WinHandle { } - -impl Drop for WinHandle { - fn drop(&mut self) { - unsafe { - if self.is_valid() { - let result = winapi::um::handleapi::CloseHandle(self.h); - assert!(thread::panicking() || result != 0); - } - } - } -} - -impl Default for WinHandle { - fn default() -> WinHandle { - WinHandle { h: INVALID_HANDLE_VALUE } - } -} - -const WINDOWS_APP_MODULE_NAME: &'static str = "api-ms-win-core-handle-l1-1-0"; -const COMPARE_OBJECT_HANDLES_FUNCTION_NAME: &'static str = "CompareObjectHandles"; - -lazy_static! { - static ref WINDOWS_APP_MODULE_NAME_CSTRING: CString = CString::new(WINDOWS_APP_MODULE_NAME).unwrap(); - static ref COMPARE_OBJECT_HANDLES_FUNCTION_NAME_CSTRING: CString = CString::new(COMPARE_OBJECT_HANDLES_FUNCTION_NAME).unwrap(); -} - -#[cfg(feature = "windows-shared-memory-equality")] -impl PartialEq for WinHandle { - fn eq(&self, other: &WinHandle) -> bool { - unsafe { - // Calling LoadLibraryA every time seems to be ok since libraries are refcounted and multiple calls won't produce multiple instances. - let module_handle = winapi::um::libloaderapi::LoadLibraryA(WINDOWS_APP_MODULE_NAME_CSTRING.as_ptr()); - if module_handle.is_null() { - panic!("Error loading library {}. {}", WINDOWS_APP_MODULE_NAME, WinError::error_string(GetLastError())); - } - let proc = winapi::um::libloaderapi::GetProcAddress(module_handle, COMPARE_OBJECT_HANDLES_FUNCTION_NAME_CSTRING.as_ptr()); - if proc.is_null() { - panic!("Error calling GetProcAddress to use {}. {}", COMPARE_OBJECT_HANDLES_FUNCTION_NAME, WinError::error_string(GetLastError())); - } - let compare_object_handles: unsafe extern "stdcall" fn(HANDLE, HANDLE) -> winapi::shared::minwindef::BOOL = std::mem::transmute(proc); - compare_object_handles(self.h, other.h) != 0 - } - } -} - -impl WinHandle { - fn new(h: HANDLE) -> WinHandle { - WinHandle { h: h } - } - - fn invalid() -> WinHandle { - WinHandle { h: INVALID_HANDLE_VALUE } - } - - fn is_valid(&self) -> bool { - self.h != INVALID_HANDLE_VALUE - } - - fn as_raw(&self) -> HANDLE { - self.h - } - - fn take_raw(&mut self) -> HANDLE { - mem::replace(&mut self.h, INVALID_HANDLE_VALUE) - } - - fn take(&mut self) -> WinHandle { - WinHandle::new(self.take_raw()) - } -} - /// Helper struct for all data being aliased by the kernel during async reads. #[derive(Debug)] struct AsyncData { @@ -756,7 +642,7 @@ impl MessageReader { } } - fn get_message(&mut self) -> Result, Vec, Vec)>, + fn get_message(&mut self) -> Result, Vec, Vec, Vec)>, WinError> { // Never touch the buffer while it's still mutably aliased by the kernel! if self.r#async.is_some() { @@ -769,6 +655,7 @@ impl MessageReader { let mut channels: Vec = vec![]; let mut shmems: Vec = vec![]; let mut big_data = None; + let mut descriptors: Vec = vec![]; if let Some(oob) = message.oob_data() { win32_trace!("[$ {:?}] msg with total {} bytes, {} channels, {} shmems, big data handle {:?}", @@ -785,6 +672,10 @@ impl MessageReader { ).unwrap()); } + for handle in oob.descriptor_handles { + descriptors.push(WinHandle::new(handle as HANDLE)); + } + if oob.big_data_receiver_handle.is_some() { let (handle, big_data_size) = oob.big_data_receiver_handle.unwrap(); let receiver = OsIpcReceiver::from_handle(WinHandle::new(handle as HANDLE)); @@ -797,7 +688,7 @@ impl MessageReader { win32_trace!("[$ {:?}] get_message success -> {} bytes, {} channels, {} shmems", self.handle, buf_data.len(), channels.len(), shmems.len()); drain_bytes = Some(message.size()); - result = Some((buf_data, channels, shmems)); + result = Some((buf_data, channels, shmems, descriptors)); } else { drain_bytes = None; result = None; @@ -1006,7 +897,7 @@ impl OsIpcReceiver { // the implementation in select() is used. It does much the same thing, but across multiple // channels. fn receive_message(&self, mut blocking_mode: BlockingMode) - -> Result<(Vec, Vec, Vec),WinError> { + -> Result<(Vec, Vec, Vec, Vec),WinError> { let mut reader = self.reader.borrow_mut(); assert!(reader.entry_id.is_none(), "receive_message is only valid before this OsIpcReceiver was added to a Set"); @@ -1015,8 +906,8 @@ impl OsIpcReceiver { loop { // First, try to fetch a message, in case we have one pending // in the reader's receive buffer - if let Some((data, channels, shmems)) = reader.get_message()? { - return Ok((data, channels, shmems)); + if let Some((data, channels, shmems, descriptors)) = reader.get_message()? { + return Ok((data, channels, shmems, descriptors)); } // Then, issue a read if we don't have one already in flight. @@ -1037,13 +928,13 @@ impl OsIpcReceiver { } pub fn recv(&self) - -> Result<(Vec, Vec, Vec),WinError> { + -> Result<(Vec, Vec, Vec, Vec),WinError> { win32_trace!("recv"); self.receive_message(BlockingMode::Blocking) } pub fn try_recv(&self) - -> Result<(Vec, Vec, Vec),WinError> { + -> Result<(Vec, Vec, Vec, Vec),WinError> { win32_trace!("try_recv"); self.receive_message(BlockingMode::Nonblocking) } @@ -1217,7 +1108,8 @@ impl OsIpcSender { pub fn send(&self, data: &[u8], ports: Vec, - shared_memory_regions: Vec) + shared_memory_regions: Vec, + descriptors: Vec) -> Result<(),WinError> { // We limit the max size we can send here; we can fix this @@ -1225,7 +1117,7 @@ impl OsIpcSender { // to. assert!(data.len() <= u32::max_value() as usize); - let (server_h, server_pid) = if !shared_memory_regions.is_empty() || !ports.is_empty() { + let (server_h, server_pid) = if !shared_memory_regions.is_empty() || !ports.is_empty() || !descriptors.is_empty() { self.get_pipe_server_process_handle_and_pid()? } else { (WinHandle::invalid(), 0) @@ -1257,6 +1149,11 @@ impl OsIpcSender { } } + for descriptor in descriptors { + let mut raw_remote_handle = move_handle_to_process(descriptor, &server_h)?; + oob.descriptor_handles.push(raw_remote_handle.take_raw() as intptr_t); + } + // Do we need to fragment? let big_data_sender: Option = if OsIpcSender::needs_fragmentation(data.len(), &oob) { @@ -1323,7 +1220,7 @@ impl OsIpcSender { } pub enum OsIpcSelectionResult { - DataReceived(u64, Vec, Vec, Vec), + DataReceived(u64, Vec, Vec, Vec, Vec), ChannelClosed(u64), } @@ -1506,9 +1403,9 @@ impl OsIpcReceiverSet { if !closed { // Drain as many messages as we can. - while let Some((data, channels, shmems)) = reader.get_message()? { + while let Some((data, channels, shmems, descriptors)) = reader.get_message()? { win32_trace!("[# {:?}] receiver {:?} ({}) got a message", self.iocp.as_raw(), reader.get_raw_handle(), reader.entry_id.unwrap()); - selection_results.push(OsIpcSelectionResult::DataReceived(reader.entry_id.unwrap(), data, channels, shmems)); + selection_results.push(OsIpcSelectionResult::DataReceived(reader.entry_id.unwrap(), data, channels, shmems, descriptors)); } win32_trace!("[# {:?}] receiver {:?} ({}) -- no message", self.iocp.as_raw(), reader.get_raw_handle(), reader.entry_id.unwrap()); @@ -1545,10 +1442,10 @@ impl OsIpcReceiverSet { } impl OsIpcSelectionResult { - pub fn unwrap(self) -> (u64, Vec, Vec, Vec) { + pub fn unwrap(self) -> (u64, Vec, Vec, Vec, Vec) { match self { - OsIpcSelectionResult::DataReceived(id, data, channels, shared_memory_regions) => { - (id, data, channels, shared_memory_regions) + OsIpcSelectionResult::DataReceived(id, data, channels, shared_memory_regions, descriptors) => { + (id, data, channels, shared_memory_regions, descriptors) } OsIpcSelectionResult::ChannelClosed(id) => { panic!("OsIpcSelectionResult::unwrap(): receiver ID {} was closed!", id) @@ -1685,11 +1582,12 @@ impl OsIpcOneShotServer { pub fn accept(self) -> Result<(OsIpcReceiver, Vec, Vec, - Vec),WinError> { + Vec, + Vec),WinError> { let receiver = self.receiver; receiver.accept()?; - let (data, channels, shmems) = receiver.recv()?; - Ok((receiver, data, channels, shmems)) + let (data, channels, shmems, descriptors) = receiver.recv()?; + Ok((receiver, data, channels, shmems, descriptors)) } } From 72a679626c709bdb0f57acf140c4154aef1a5478 Mon Sep 17 00:00:00 2001 From: Jonas Zaddach Date: Sun, 16 May 2021 21:32:19 +0200 Subject: [PATCH 3/9] Add test for descriptor transfer --- src/test.rs | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/src/test.rs b/src/test.rs index 7303c8fb..d50bb64c 100644 --- a/src/test.rs +++ b/src/test.rs @@ -652,3 +652,27 @@ fn test_receiver_stream() { _ => panic!("Stream should have 5"), }; } + +#[test] +fn test_transfer_descriptor() { + let person = ("Patrick Walton".to_owned(), 29); + let person_clone = person.clone(); + let temp_file_path = std::env::temp_dir().join("ipc-channel-test.txt" ); + let mut file = std::fs::File::create(& temp_file_path).unwrap(); + let text = "This is a text string"; + use std::io::Write; + file.write(text.as_bytes()); + std::mem::drop(file); + let file = std::fs::File::open(& temp_file_path).unwrap(); + + let person_and_descriptor = (person, crate::platform::Descriptor::from(file)); + let (tx, rx) = ipc::channel().unwrap(); + tx.send(person_and_descriptor).unwrap(); + let received_person_and_descriptor = rx.recv().unwrap(); + assert_eq!(received_person_and_descriptor.0, person_clone); + let mut file: std::fs::File = received_person_and_descriptor.1.into(); + use std::io::Read; + let mut read_text = String::new(); + file.read_to_string(&mut read_text); + assert_eq!(text, read_text); +} From 7e4d1590dbf99cc405d7fc77a2a435f6f3e29035 Mon Sep 17 00:00:00 2001 From: Jonas Zaddach Date: Thu, 22 Jul 2021 18:35:23 +0200 Subject: [PATCH 4/9] Fixed missing unwraps in test --- src/test.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test.rs b/src/test.rs index d50bb64c..f9d80f2b 100644 --- a/src/test.rs +++ b/src/test.rs @@ -661,7 +661,7 @@ fn test_transfer_descriptor() { let mut file = std::fs::File::create(& temp_file_path).unwrap(); let text = "This is a text string"; use std::io::Write; - file.write(text.as_bytes()); + file.write(text.as_bytes()).unwrap(); std::mem::drop(file); let file = std::fs::File::open(& temp_file_path).unwrap(); @@ -673,6 +673,6 @@ fn test_transfer_descriptor() { let mut file: std::fs::File = received_person_and_descriptor.1.into(); use std::io::Read; let mut read_text = String::new(); - file.read_to_string(&mut read_text); + file.read_to_string(&mut read_text).unwrap(); assert_eq!(text, read_text); } From 7254e51bed29ccd47ee1547ea597fc1ebe3e51e9 Mon Sep 17 00:00:00 2001 From: Jonas Zaddach Date: Sun, 25 Jul 2021 15:02:08 +0200 Subject: [PATCH 5/9] Fix failing tests and move descriptor to own module --- src/descriptor.rs | 170 ++++++++++++++++++++++++++++++++++ src/ipc.rs | 16 ++-- src/lib.rs | 1 + src/platform/common/fd.rs | 100 -------------------- src/platform/common/mod.rs | 1 - src/platform/inprocess/mod.rs | 35 +++---- src/platform/macos/mod.rs | 20 ++-- src/platform/mod.rs | 10 -- src/platform/unix/mod.rs | 18 ++-- src/platform/windows/mod.rs | 163 ++++++++++++++++++++++++++++---- src/test.rs | 2 +- 11 files changed, 362 insertions(+), 174 deletions(-) create mode 100644 src/descriptor.rs delete mode 100644 src/platform/common/fd.rs delete mode 100644 src/platform/common/mod.rs diff --git a/src/descriptor.rs b/src/descriptor.rs new file mode 100644 index 00000000..867b7e95 --- /dev/null +++ b/src/descriptor.rs @@ -0,0 +1,170 @@ +use std::io; +use std::thread; +use std::mem; +use std::default::Default; +use std::fs::File; +use std::cell::RefCell; + +#[cfg(windows)] +pub use { + std::os::windows::io::RawHandle as RawDescriptor, + std::os::windows::io::AsRawHandle, + std::os::windows::io::IntoRawHandle, + std::os::windows::io::FromRawHandle, +}; + +#[cfg(unix)] +pub use { + std::os::unix::io::RawFd as RawDescriptor, + std::os::unix::io::AsRawFd, + std::os::unix::io::IntoRawFd, + std::os::unix::io::FromRawFd, +}; + +#[cfg(windows)] +const INVALID_RAW_DESCRIPTOR: RawDescriptor = winapi::um::handleapi::INVALID_HANDLE_VALUE; + +#[cfg(windows)] +fn raw_descriptor_close(descriptor: &RawDescriptor) -> Result<(), io::Error> { + unsafe { + let result = winapi::um::handleapi::CloseHandle(*descriptor); + if result == 0 { + Err(io::Error::last_os_error()) + } + else { + Ok(()) + } + } +} + +#[cfg(unix)] +const INVALID_RAW_DESCRIPTOR: RawDescriptor = -1; + +#[cfg(unix)] +fn raw_descriptor_close(descriptor: &RawDescriptor) -> Result<(), io::Error> { + unsafe { + let result = libc::close(*descriptor); + if result == 0 { + Ok(()) + } + else { + Err(io::Error::last_os_error()) + } + } +} + +#[derive(Debug)] +pub struct OwnedDescriptor(RefCell); + +unsafe impl Send for OwnedDescriptor { } +unsafe impl Sync for OwnedDescriptor { } + +impl Drop for OwnedDescriptor { + fn drop(&mut self) { + if *self.0.borrow() != INVALID_RAW_DESCRIPTOR { + let result = raw_descriptor_close(&*self.0.borrow()); + assert!( thread::panicking() || result.is_ok() ); + } + } +} + +impl OwnedDescriptor { + pub fn new(descriptor: RawDescriptor) -> OwnedDescriptor { + OwnedDescriptor(RefCell::new(descriptor)) + } + + pub fn consume(& self) -> OwnedDescriptor { + OwnedDescriptor::new(self.0.replace(INVALID_RAW_DESCRIPTOR)) + } +} + +impl Default for OwnedDescriptor { + fn default() -> OwnedDescriptor { + OwnedDescriptor::new(INVALID_RAW_DESCRIPTOR) + } +} + +#[cfg(windows)] +impl IntoRawHandle for OwnedDescriptor { + fn into_raw_handle(self) -> RawDescriptor { + let handle = *self.0.borrow(); + mem::forget(self); + handle + } +} + +#[cfg(windows)] +impl AsRawHandle for OwnedDescriptor { + fn as_raw_handle(& self) -> RawDescriptor { + *self.0.borrow() + } +} + +#[cfg(windows)] +impl FromRawHandle for OwnedDescriptor { + unsafe fn from_raw_handle(handle: RawDescriptor) -> OwnedDescriptor { + OwnedDescriptor::new(handle) + } +} + +#[cfg(windows)] +impl Into for OwnedDescriptor { + fn into(self) -> File { + unsafe { + File::from_raw_handle(self.into_raw_handle()) + } + } +} + +#[cfg(windows)] +impl From for OwnedDescriptor { + fn from(file: File) -> Self { + OwnedDescriptor::new(file.into_raw_handle()) + } +} + +#[cfg(unix)] +impl IntoRawFd for OwnedDescriptor { + fn into_raw_fd(self) -> RawDescriptor { + let fd = self.0.replace(INVALID_RAW_DESCRIPTOR); + mem::forget(self); + fd + } +} + +#[cfg(unix)] +impl AsRawFd for OwnedDescriptor { + fn as_raw_fd(& self) -> RawDescriptor { + *self.0.borrow() + } +} + +#[cfg(unix)] +impl FromRawFd for OwnedDescriptor { + unsafe fn from_raw_fd(fd: RawDescriptor) -> OwnedDescriptor { + OwnedDescriptor::new(fd) + } +} + +#[cfg(unix)] +impl Into for OwnedDescriptor { + fn into(self) -> File { + unsafe { + File::from_raw_fd(self.into_raw_fd()) + } + } +} + +#[cfg(unix)] +impl From for OwnedDescriptor { + fn from(file: File) -> Self { + OwnedDescriptor::new(file.into_raw_fd()) + } +} + +#[cfg(unix)] +impl PartialEq for OwnedDescriptor { + fn eq(&self, other: &Self) -> bool { + self.0 == other.0 + } +} diff --git a/src/ipc.rs b/src/ipc.rs index 0b96fe4a..3f5bf64d 100644 --- a/src/ipc.rs +++ b/src/ipc.rs @@ -9,7 +9,7 @@ use crate::platform::{self, OsIpcChannel, OsIpcReceiver, OsIpcReceiverSet, OsIpcSender}; use crate::platform::{OsIpcOneShotServer, OsIpcSelectionResult, OsIpcSharedMemory, OsOpaqueIpcChannel}; -use crate::platform::Descriptor; +use crate::descriptor::OwnedDescriptor; use bincode; use serde::{Deserialize, Deserializer, Serialize, Serializer}; @@ -31,7 +31,7 @@ thread_local! { RefCell>> = RefCell::new(Vec::new()) } thread_local! { - static OS_IPC_DESCRIPTORS_FOR_DESERIALIZATION: RefCell> = RefCell::new(Vec::new()) + static OS_IPC_DESCRIPTORS_FOR_DESERIALIZATION: RefCell> = RefCell::new(Vec::new()) } thread_local! { static OS_IPC_CHANNELS_FOR_SERIALIZATION: RefCell> = RefCell::new(Vec::new()) @@ -41,7 +41,7 @@ thread_local! { RefCell::new(Vec::new()) } thread_local! { - static OS_IPC_DESCRIPTORS_FOR_SERIALIZATION: RefCell> = RefCell::new(Vec::new()) + static OS_IPC_DESCRIPTORS_FOR_SERIALIZATION: RefCell> = RefCell::new(Vec::new()) } #[derive(Debug)] @@ -621,7 +621,7 @@ pub struct OpaqueIpcMessage { data: Vec, os_ipc_channels: Vec, os_ipc_shared_memory_regions: Vec>, - os_ipc_descriptors: Vec, + os_ipc_descriptors: Vec, } impl Debug for OpaqueIpcMessage { @@ -637,7 +637,7 @@ impl OpaqueIpcMessage { fn new(data: Vec, os_ipc_channels: Vec, os_ipc_shared_memory_regions: Vec, - os_ipc_descriptors: Vec) + os_ipc_descriptors: Vec) -> OpaqueIpcMessage { OpaqueIpcMessage { data: data, @@ -924,7 +924,7 @@ fn deserialize_os_ipc_receiver<'de, D>(deserializer: D) } -impl Serialize for Descriptor { +impl Serialize for OwnedDescriptor { fn serialize(&self, serializer: S) -> Result where S: Serializer { let index = OS_IPC_DESCRIPTORS_FOR_SERIALIZATION.with(|os_ipc_descriptors_for_serialization| { let mut os_ipc_descriptors_for_serialization = @@ -937,12 +937,12 @@ impl Serialize for Descriptor { } } -impl<'de> Deserialize<'de> for Descriptor { +impl<'de> Deserialize<'de> for OwnedDescriptor { fn deserialize(deserializer: D) -> Result where D: Deserializer<'de> { let index: usize = Deserialize::deserialize(deserializer)?; OS_IPC_DESCRIPTORS_FOR_DESERIALIZATION.with(|os_ipc_descriptors_for_deserialization| { - os_ipc_descriptors_for_deserialization.borrow_mut().get_mut(index).map(|x| x.consume()).ok_or(serde::de::Error::invalid_value(serde::de::Unexpected::Unsigned(index as u64), &"index for Descriptor")) + os_ipc_descriptors_for_deserialization.borrow_mut().get_mut(index).map(|x| x.consume()).ok_or(serde::de::Error::invalid_value(serde::de::Unexpected::Unsigned(index as u64), &"index for OwnedDescriptor")) }) } } diff --git a/src/lib.rs b/src/lib.rs index f7011662..bae5a892 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -86,6 +86,7 @@ extern crate winapi; pub mod ipc; pub mod platform; pub mod router; +pub mod descriptor; #[cfg(test)] mod test; diff --git a/src/platform/common/fd.rs b/src/platform/common/fd.rs deleted file mode 100644 index 493c36aa..00000000 --- a/src/platform/common/fd.rs +++ /dev/null @@ -1,100 +0,0 @@ -// use std::ops::{ -// Deref, -// DerefMut, -// }; -use std::os::unix::io::{ - AsRawFd, - RawFd, - IntoRawFd, - FromRawFd, -}; -use std::fmt; -use std::cmp::{PartialEq}; -use std::mem; -use std::fs::File; -use std::cell::RefCell; - -pub struct OwnedFd(RefCell); - -impl Drop for OwnedFd { - fn drop(&mut self) { - if *self.0.borrow() != -1 { - unsafe { - let _ = libc::close(*self.0.borrow()); - } - } - } -} - -// impl Deref for OwnedFd { -// type Target = RawFd; - -// fn deref(&self) -> &Self::Target { -// &self.0 -// } -// } - -// impl DerefMut for OwnedFd { -// fn deref_mut(&mut self) -> &mut Self::Target { -// &mut self.0 -// } -// } - -impl IntoRawFd for OwnedFd { - fn into_raw_fd(self) -> RawFd { - let fd = *self.0.borrow(); - mem::forget(self); - fd - } -} - -impl AsRawFd for OwnedFd { - fn as_raw_fd(& self) -> RawFd { - *self.0.borrow() - } -} - -impl FromRawFd for OwnedFd { - unsafe fn from_raw_fd(fd: RawFd) -> OwnedFd { - OwnedFd::new(fd) - } -} - -impl Into for OwnedFd { - fn into(self) -> File { - unsafe { - File::from_raw_fd(self.into_raw_fd()) - } - } -} - -impl From for OwnedFd { - fn from(file: File) -> Self { - OwnedFd::new(file.into_raw_fd()) - } -} - -impl OwnedFd { - pub fn new(fd: RawFd) -> OwnedFd { - OwnedFd(RefCell::new(fd)) - } - - pub fn consume(&self) -> OwnedFd { - let fd = self.0.replace(-1); - OwnedFd::new(fd) - } -} - -impl PartialEq for OwnedFd { - fn eq(&self, other: &Self) -> bool { - *self.0.borrow() == *other.0.borrow() - } -} - -impl fmt::Debug for OwnedFd { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_tuple("") - .field(&self.0) - .finish() - } -} diff --git a/src/platform/common/mod.rs b/src/platform/common/mod.rs deleted file mode 100644 index 99821ee4..00000000 --- a/src/platform/common/mod.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod fd; \ No newline at end of file diff --git a/src/platform/inprocess/mod.rs b/src/platform/inprocess/mod.rs index 7d5e369f..8b4a69f2 100644 --- a/src/platform/inprocess/mod.rs +++ b/src/platform/inprocess/mod.rs @@ -21,6 +21,7 @@ use std::cmp::{PartialEq}; use std::ops::{Deref, RangeFrom}; use std::usize; use uuid::Uuid; +use crate::descriptor::OwnedDescriptor; #[derive(Clone)] struct ServerRecord { @@ -52,7 +53,7 @@ lazy_static! { static ref ONE_SHOT_SERVERS: Mutex> = Mutex::new(HashMap::new()); } -struct ChannelMessage(Vec, Vec, Vec); +struct ChannelMessage(Vec, Vec, Vec, Vec); pub fn channel() -> Result<(OsIpcSender, OsIpcReceiver), ChannelError> { let (base_sender, base_receiver) = crossbeam_channel::unbounded::(); @@ -85,12 +86,12 @@ impl OsIpcReceiver { pub fn recv( &self - ) -> Result<(Vec, Vec, Vec), ChannelError> { + ) -> Result<(Vec, Vec, Vec, Vec), ChannelError> { let r = self.receiver.borrow(); let r = r.as_ref().unwrap(); match r.recv() { - Ok(ChannelMessage(d, c, s)) => { - Ok((d, c.into_iter().map(OsOpaqueIpcChannel::new).collect(), s)) + Ok(ChannelMessage(d, c, s, fd)) => { + Ok((d, c.into_iter().map(OsOpaqueIpcChannel::new).collect(), s, fd)) } Err(_) => Err(ChannelError::ChannelClosedError), } @@ -98,12 +99,12 @@ impl OsIpcReceiver { pub fn try_recv( &self - ) -> Result<(Vec, Vec, Vec), ChannelError> { + ) -> Result<(Vec, Vec, Vec, Vec), ChannelError> { let r = self.receiver.borrow(); let r = r.as_ref().unwrap(); match r.try_recv() { - Ok(ChannelMessage(d, c, s)) => { - Ok((d, c.into_iter().map(OsOpaqueIpcChannel::new).collect(), s)) + Ok(ChannelMessage(d, c, s, fd)) => { + Ok((d, c.into_iter().map(OsOpaqueIpcChannel::new).collect(), s, fd)) }, Err(e) => { match e { @@ -149,10 +150,11 @@ impl OsIpcSender { data: &[u8], ports: Vec, shared_memory_regions: Vec, + descriptors: Vec, ) -> Result<(), ChannelError> { Ok(self.sender .borrow() - .send(ChannelMessage(data.to_vec(), ports, shared_memory_regions)).map_err(|_| ChannelError::BrokenPipeError)?) + .send(ChannelMessage(data.to_vec(), ports, shared_memory_regions, descriptors)).map_err(|_| ChannelError::BrokenPipeError)?) } } @@ -198,9 +200,9 @@ impl OsIpcReceiverSet { let res = select.select(); let r_index = res.index(); let r_id = self.receiver_ids[r_index]; - if let Ok(ChannelMessage(data, channels, shmems)) = res.recv(&borrows[r_index as usize]) { + if let Ok(ChannelMessage(data, channels, shmems, descriptors)) = res.recv(&borrows[r_index as usize]) { let channels = channels.into_iter().map(OsOpaqueIpcChannel::new).collect(); - return Ok(vec![OsIpcSelectionResult::DataReceived(r_id, data, channels, shmems)]) + return Ok(vec![OsIpcSelectionResult::DataReceived(r_id, data, channels, shmems, descriptors)]) } else { Remove(r_index, r_id) } @@ -212,15 +214,15 @@ impl OsIpcReceiverSet { } pub enum OsIpcSelectionResult { - DataReceived(u64, Vec, Vec, Vec), + DataReceived(u64, Vec, Vec, Vec, Vec), ChannelClosed(u64), } impl OsIpcSelectionResult { - pub fn unwrap(self) -> (u64, Vec, Vec, Vec) { + pub fn unwrap(self) -> (u64, Vec, Vec, Vec, Vec) { match self { - OsIpcSelectionResult::DataReceived(id, data, channels, shared_memory_regions) => { - (id, data, channels, shared_memory_regions) + OsIpcSelectionResult::DataReceived(id, data, channels, shared_memory_regions, descriptors) => { + (id, data, channels, shared_memory_regions, descriptors) } OsIpcSelectionResult::ChannelClosed(id) => { panic!("OsIpcSelectionResult::unwrap(): receiver ID {} was closed!", id) @@ -255,6 +257,7 @@ impl OsIpcOneShotServer { Vec, Vec, Vec, + Vec, ), ChannelError, > { @@ -266,8 +269,8 @@ impl OsIpcOneShotServer { .clone(); record.accept(); ONE_SHOT_SERVERS.lock().unwrap().remove(&self.name).unwrap(); - let (data, channels, shmems) = self.receiver.recv()?; - Ok((self.receiver, data, channels, shmems)) + let (data, channels, shmems, descriptors) = self.receiver.recv()?; + Ok((self.receiver, data, channels, shmems, descriptors)) } } diff --git a/src/platform/macos/mod.rs b/src/platform/macos/mod.rs index 003b1f68..1d8b4226 100644 --- a/src/platform/macos/mod.rs +++ b/src/platform/macos/mod.rs @@ -14,7 +14,7 @@ use self::mach_sys::{mach_msg_timeout_t, mach_port_limits_t, mach_port_msgcount_ use self::mach_sys::{mach_port_right_t, mach_port_t, mach_task_self_, vm_inherit_t}; use self::mach_sys::mach_port_deallocate; use self::mach_sys::fileport_t; -use crate::platform::Descriptor; +use crate::descriptor::OwnedDescriptor; use bincode; use libc::{self, c_char, c_uint, c_void, size_t}; @@ -36,8 +36,6 @@ use std::os::raw::c_int; use std::os::unix::io::AsRawFd; -use crate::platform::common::fd::OwnedFd; - mod mach_sys; /// The size that we preallocate on the stack to receive messages. If the message is larger than @@ -358,7 +356,7 @@ impl OsIpcReceiver { } fn recv_with_blocking_mode(&self, blocking_mode: BlockingMode) - -> Result<(Vec, Vec, Vec, Vec), + -> Result<(Vec, Vec, Vec, Vec), MachError> { select(self.port.get(), blocking_mode).and_then(|result| { match result { @@ -371,12 +369,12 @@ impl OsIpcReceiver { } pub fn recv(&self) - -> Result<(Vec, Vec, Vec, Vec),MachError> { + -> Result<(Vec, Vec, Vec, Vec),MachError> { self.recv_with_blocking_mode(BlockingMode::Blocking) } pub fn try_recv(&self) - -> Result<(Vec, Vec, Vec, Vec),MachError> { + -> Result<(Vec, Vec, Vec, Vec),MachError> { self.recv_with_blocking_mode(BlockingMode::Nonblocking) } } @@ -507,7 +505,7 @@ impl OsIpcSender { data: &[u8], ports: Vec, mut shared_memory_regions: Vec, - descriptors: Vec) + descriptors: Vec) -> Result<(),MachError> { let mut data = SendData::from(data); if let Some(data) = data.take_shared_memory() { @@ -708,12 +706,12 @@ impl Drop for OsIpcReceiverSet { } pub enum OsIpcSelectionResult { - DataReceived(u64, Vec, Vec, Vec, Vec), + DataReceived(u64, Vec, Vec, Vec, Vec), ChannelClosed(u64), } impl OsIpcSelectionResult { - pub fn unwrap(self) -> (u64, Vec, Vec, Vec, Vec) { + pub fn unwrap(self) -> (u64, Vec, Vec, Vec, Vec) { match self { OsIpcSelectionResult::DataReceived(id, data, channels, shared_memory_regions, descriptors) => { (id, data, channels, shared_memory_regions, descriptors) @@ -831,7 +829,7 @@ fn select(port: mach_port_t, blocking_mode: BlockingMode) for idx in port_count .. (port_count + descriptor_count) { let fd = mach_fileport_makefd(raw_ports[idx])?; - descriptors.push(OwnedFd::new(fd)); + descriptors.push(OwnedDescriptor::new(fd)); } let has_inline_data_ptr = port_counts.offset(1) as *mut bool; @@ -886,7 +884,7 @@ impl OsIpcOneShotServer { Vec, Vec, Vec, - Vec),MachError> { + Vec),MachError> { let (bytes, channels, shared_memory_regions, descriptors) = self.receiver.recv()?; Ok((self.receiver.consume(), bytes, channels, shared_memory_regions, descriptors)) } diff --git a/src/platform/mod.rs b/src/platform/mod.rs index 0ae0b0d8..47c4e024 100644 --- a/src/platform/mod.rs +++ b/src/platform/mod.rs @@ -7,12 +7,6 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -#[cfg(all(not(feature = "force-inprocess"), any(target_os = "linux", - target_os = "openbsd", - target_os = "freebsd", - target_os = "macos")))] -mod common; - #[cfg(all(not(feature = "force-inprocess"), any(target_os = "linux", target_os = "openbsd", target_os = "freebsd")))] @@ -22,7 +16,6 @@ mod unix; target_os = "freebsd")))] mod os { pub use super::unix::*; - pub type Descriptor = super::common::fd::OwnedFd; } #[cfg(all(not(feature = "force-inprocess"), target_os = "macos"))] @@ -30,7 +23,6 @@ mod macos; #[cfg(all(not(feature = "force-inprocess"), target_os = "macos"))] mod os { pub use super::macos::*; - pub type Descriptor = super::common::fd::OwnedFd; } #[cfg(all(not(feature = "force-inprocess"), target_os = "windows"))] @@ -38,7 +30,6 @@ mod windows; #[cfg(all(not(feature = "force-inprocess"), target_os = "windows"))] mod os { pub use super::windows::*; - pub type Descriptor = super::windows::handle::WinHandle; } #[cfg(any( @@ -57,7 +48,6 @@ mod os { pub use self::os::{OsIpcChannel, OsIpcOneShotServer, OsIpcReceiver, OsIpcReceiverSet}; pub use self::os::{OsIpcSelectionResult, OsIpcSender, OsIpcSharedMemory}; pub use self::os::{OsOpaqueIpcChannel, channel}; -pub use self::os::{Descriptor}; #[cfg(test)] mod test; diff --git a/src/platform/unix/mod.rs b/src/platform/unix/mod.rs index ce961205..810c96b5 100644 --- a/src/platform/unix/mod.rs +++ b/src/platform/unix/mod.rs @@ -7,8 +7,8 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +use crate::descriptor::OwnedDescriptor; use crate::ipc; -use crate::platform::Descriptor; use bincode; use fnv::FnvHasher; use libc::{EAGAIN, EWOULDBLOCK}; @@ -142,12 +142,12 @@ impl OsIpcReceiver { } pub fn recv(&self) - -> Result<(Vec, Vec, Vec, Vec),UnixError> { + -> Result<(Vec, Vec, Vec, Vec),UnixError> { recv(self.fd.get(), BlockingMode::Blocking) } pub fn try_recv(&self) - -> Result<(Vec, Vec, Vec, Vec),UnixError> { + -> Result<(Vec, Vec, Vec, Vec),UnixError> { recv(self.fd.get(), BlockingMode::Nonblocking) } } @@ -237,7 +237,7 @@ impl OsIpcSender { data: &[u8], channels: Vec, shared_memory_regions: Vec, - descriptors: Vec) + descriptors: Vec) -> Result<(),UnixError> { let header = Header { @@ -547,12 +547,12 @@ impl OsIpcReceiverSet { } pub enum OsIpcSelectionResult { - DataReceived(u64, Vec, Vec, Vec, Vec), + DataReceived(u64, Vec, Vec, Vec, Vec), ChannelClosed(u64), } impl OsIpcSelectionResult { - pub fn unwrap(self) -> (u64, Vec, Vec, Vec, Vec) { + pub fn unwrap(self) -> (u64, Vec, Vec, Vec, Vec) { match self { OsIpcSelectionResult::DataReceived(id, data, channels, shared_memory_regions, descriptors) => { (id, data, channels, shared_memory_regions, descriptors) @@ -643,7 +643,7 @@ impl OsIpcOneShotServer { Vec, Vec, Vec, - Vec),UnixError> { + Vec),UnixError> { unsafe { let sockaddr: *mut sockaddr = ptr::null_mut(); let sockaddr_len: *mut socklen_t = ptr::null_mut(); @@ -919,7 +919,7 @@ enum BlockingMode { } fn recv(fd: c_int, blocking_mode: BlockingMode) - -> Result<(Vec, Vec, Vec, Vec),UnixError> { + -> Result<(Vec, Vec, Vec, Vec),UnixError> { let (mut channels, mut shared_memory_regions, mut descriptors) = (Vec::new(), Vec::new(), Vec::new()); @@ -967,7 +967,7 @@ fn recv(fd: c_int, blocking_mode: BlockingMode) } for index in (header.channel_fd_num + header.shared_memory_fd_num) .. (header.channel_fd_num + header.shared_memory_fd_num + header.descriptor_num) { - descriptors.push(Descriptor::new(*cmsg_fds.offset(index as isize))); + descriptors.push(OwnedDescriptor::new(*cmsg_fds.offset(index as isize))); } } diff --git a/src/platform/windows/mod.rs b/src/platform/windows/mod.rs index 3e87f7cc..4e3e36a0 100644 --- a/src/platform/windows/mod.rs +++ b/src/platform/windows/mod.rs @@ -14,6 +14,7 @@ use crate::ipc; use libc::intptr_t; use std::cell::{Cell, RefCell}; use std::cmp::PartialEq; +use std::default::Default; use std::env; use std::error::Error as StdError; use std::ffi::CString; @@ -22,22 +23,19 @@ use std::io; use std::marker::{Send, Sync, PhantomData}; use std::mem; use std::ops::{Deref, DerefMut, RangeFrom}; +use std::os::windows::io::IntoRawHandle; use std::ptr; use std::slice; use std::thread; use uuid::Uuid; -use winapi::shared::minwindef::{LPVOID}; -use winapi; - - use winapi::um::winnt::{HANDLE}; use winapi::um::handleapi::{INVALID_HANDLE_VALUE}; +use winapi::shared::minwindef::{LPVOID}; +use winapi; mod aliased_cell; -pub mod handle; use self::aliased_cell::AliasedCell; -use crate::platform::Descriptor; -use self::handle::{WinHandle, dup_handle, dup_handle_to_process, move_handle_to_process}; +use crate::descriptor::OwnedDescriptor; lazy_static! { static ref CURRENT_PROCESS_ID: winapi::shared::ntdef::ULONG = unsafe { winapi::um::processthreadsapi::GetCurrentProcessId() }; @@ -261,6 +259,135 @@ fn make_pipe_name(pipe_id: &Uuid) -> CString { CString::new(format!("\\\\.\\pipe\\rust-ipc-{}", pipe_id.to_string())).unwrap() } +/// Duplicate a given handle from this process to the target one, passing the +/// given flags to DuplicateHandle. +/// +/// Unlike win32 DuplicateHandle, this will preserve INVALID_HANDLE_VALUE (which is +/// also the pseudohandle for the current process). +fn dup_handle_to_process_with_flags(handle: &WinHandle, other_process: &WinHandle, flags: winapi::shared::minwindef::DWORD) + -> Result +{ + if !handle.is_valid() { + return Ok(WinHandle::invalid()); + } + + unsafe { + let mut new_handle: HANDLE = INVALID_HANDLE_VALUE; + let ok = winapi::um::handleapi::DuplicateHandle(CURRENT_PROCESS_HANDLE.as_raw(), handle.as_raw(), + other_process.as_raw(), &mut new_handle, + 0, winapi::shared::minwindef::FALSE, flags); + if ok == winapi::shared::minwindef::FALSE { + Err(WinError::last("DuplicateHandle")) + } else { + Ok(WinHandle::new(new_handle)) + } + } +} + +/// Duplicate a handle in the current process. +fn dup_handle(handle: &WinHandle) -> Result { + dup_handle_to_process(handle, &WinHandle::new(CURRENT_PROCESS_HANDLE.as_raw())) +} + +/// Duplicate a handle to the target process. +fn dup_handle_to_process(handle: &WinHandle, other_process: &WinHandle) -> Result { + dup_handle_to_process_with_flags(handle, other_process, winapi::um::winnt::DUPLICATE_SAME_ACCESS) +} + +/// Duplicate a handle to the target process, closing the source handle. +fn move_handle_to_process(handle: WinHandle, other_process: &WinHandle) -> Result { + let result = dup_handle_to_process_with_flags(&handle, other_process, + winapi::um::winnt::DUPLICATE_CLOSE_SOURCE | winapi::um::winnt::DUPLICATE_SAME_ACCESS); + // Since the handle was moved to another process, the original is no longer valid; + // so we probably shouldn't try to close it explicitly? + mem::forget(handle); + result +} + +#[derive(Debug)] +struct WinHandle { + h: HANDLE +} + +unsafe impl Send for WinHandle { } +unsafe impl Sync for WinHandle { } + +impl Drop for WinHandle { + fn drop(&mut self) { + unsafe { + if self.is_valid() { + let result = winapi::um::handleapi::CloseHandle(self.h); + assert!(thread::panicking() || result != 0); + } + } + } +} + +impl Default for WinHandle { + fn default() -> WinHandle { + WinHandle { h: INVALID_HANDLE_VALUE } + } +} + +impl From for WinHandle { + fn from(descriptor: OwnedDescriptor) -> WinHandle { + WinHandle::new(descriptor.into_raw_handle()) + } +} + +const WINDOWS_APP_MODULE_NAME: &'static str = "api-ms-win-core-handle-l1-1-0"; +const COMPARE_OBJECT_HANDLES_FUNCTION_NAME: &'static str = "CompareObjectHandles"; + +lazy_static! { + static ref WINDOWS_APP_MODULE_NAME_CSTRING: CString = CString::new(WINDOWS_APP_MODULE_NAME).unwrap(); + static ref COMPARE_OBJECT_HANDLES_FUNCTION_NAME_CSTRING: CString = CString::new(COMPARE_OBJECT_HANDLES_FUNCTION_NAME).unwrap(); +} + +#[cfg(feature = "windows-shared-memory-equality")] +impl PartialEq for WinHandle { + fn eq(&self, other: &WinHandle) -> bool { + unsafe { + // Calling LoadLibraryA every time seems to be ok since libraries are refcounted and multiple calls won't produce multiple instances. + let module_handle = winapi::um::libloaderapi::LoadLibraryA(WINDOWS_APP_MODULE_NAME_CSTRING.as_ptr()); + if module_handle.is_null() { + panic!("Error loading library {}. {}", WINDOWS_APP_MODULE_NAME, WinError::error_string(GetLastError())); + } + let proc = winapi::um::libloaderapi::GetProcAddress(module_handle, COMPARE_OBJECT_HANDLES_FUNCTION_NAME_CSTRING.as_ptr()); + if proc.is_null() { + panic!("Error calling GetProcAddress to use {}. {}", COMPARE_OBJECT_HANDLES_FUNCTION_NAME, WinError::error_string(GetLastError())); + } + let compare_object_handles: unsafe extern "stdcall" fn(HANDLE, HANDLE) -> winapi::shared::minwindef::BOOL = std::mem::transmute(proc); + compare_object_handles(self.h, other.h) != 0 + } + } +} + +impl WinHandle { + fn new(h: HANDLE) -> WinHandle { + WinHandle { h: h } + } + + fn invalid() -> WinHandle { + WinHandle { h: INVALID_HANDLE_VALUE } + } + + fn is_valid(&self) -> bool { + self.h != INVALID_HANDLE_VALUE + } + + fn as_raw(&self) -> HANDLE { + self.h + } + + fn take_raw(&mut self) -> HANDLE { + mem::replace(&mut self.h, INVALID_HANDLE_VALUE) + } + + fn take(&mut self) -> WinHandle { + WinHandle::new(self.take_raw()) + } +} + /// Helper struct for all data being aliased by the kernel during async reads. #[derive(Debug)] struct AsyncData { @@ -642,7 +769,7 @@ impl MessageReader { } } - fn get_message(&mut self) -> Result, Vec, Vec, Vec)>, + fn get_message(&mut self) -> Result, Vec, Vec, Vec)>, WinError> { // Never touch the buffer while it's still mutably aliased by the kernel! if self.r#async.is_some() { @@ -655,7 +782,7 @@ impl MessageReader { let mut channels: Vec = vec![]; let mut shmems: Vec = vec![]; let mut big_data = None; - let mut descriptors: Vec = vec![]; + let mut descriptors: Vec = vec![]; if let Some(oob) = message.oob_data() { win32_trace!("[$ {:?}] msg with total {} bytes, {} channels, {} shmems, big data handle {:?}", @@ -673,7 +800,7 @@ impl MessageReader { } for handle in oob.descriptor_handles { - descriptors.push(WinHandle::new(handle as HANDLE)); + descriptors.push(OwnedDescriptor::new(handle as HANDLE)); } if oob.big_data_receiver_handle.is_some() { @@ -897,7 +1024,7 @@ impl OsIpcReceiver { // the implementation in select() is used. It does much the same thing, but across multiple // channels. fn receive_message(&self, mut blocking_mode: BlockingMode) - -> Result<(Vec, Vec, Vec, Vec),WinError> { + -> Result<(Vec, Vec, Vec, Vec),WinError> { let mut reader = self.reader.borrow_mut(); assert!(reader.entry_id.is_none(), "receive_message is only valid before this OsIpcReceiver was added to a Set"); @@ -928,13 +1055,13 @@ impl OsIpcReceiver { } pub fn recv(&self) - -> Result<(Vec, Vec, Vec, Vec),WinError> { + -> Result<(Vec, Vec, Vec, Vec),WinError> { win32_trace!("recv"); self.receive_message(BlockingMode::Blocking) } pub fn try_recv(&self) - -> Result<(Vec, Vec, Vec, Vec),WinError> { + -> Result<(Vec, Vec, Vec, Vec),WinError> { win32_trace!("try_recv"); self.receive_message(BlockingMode::Nonblocking) } @@ -1109,7 +1236,7 @@ impl OsIpcSender { data: &[u8], ports: Vec, shared_memory_regions: Vec, - descriptors: Vec) + descriptors: Vec) -> Result<(),WinError> { // We limit the max size we can send here; we can fix this @@ -1150,7 +1277,7 @@ impl OsIpcSender { } for descriptor in descriptors { - let mut raw_remote_handle = move_handle_to_process(descriptor, &server_h)?; + let mut raw_remote_handle = move_handle_to_process(descriptor.into(), &server_h)?; oob.descriptor_handles.push(raw_remote_handle.take_raw() as intptr_t); } @@ -1220,7 +1347,7 @@ impl OsIpcSender { } pub enum OsIpcSelectionResult { - DataReceived(u64, Vec, Vec, Vec, Vec), + DataReceived(u64, Vec, Vec, Vec, Vec), ChannelClosed(u64), } @@ -1442,7 +1569,7 @@ impl OsIpcReceiverSet { } impl OsIpcSelectionResult { - pub fn unwrap(self) -> (u64, Vec, Vec, Vec, Vec) { + pub fn unwrap(self) -> (u64, Vec, Vec, Vec, Vec) { match self { OsIpcSelectionResult::DataReceived(id, data, channels, shared_memory_regions, descriptors) => { (id, data, channels, shared_memory_regions, descriptors) @@ -1583,7 +1710,7 @@ impl OsIpcOneShotServer { Vec, Vec, Vec, - Vec),WinError> { + Vec),WinError> { let receiver = self.receiver; receiver.accept()?; let (data, channels, shmems, descriptors) = receiver.recv()?; diff --git a/src/test.rs b/src/test.rs index f9d80f2b..609b99a7 100644 --- a/src/test.rs +++ b/src/test.rs @@ -665,7 +665,7 @@ fn test_transfer_descriptor() { std::mem::drop(file); let file = std::fs::File::open(& temp_file_path).unwrap(); - let person_and_descriptor = (person, crate::platform::Descriptor::from(file)); + let person_and_descriptor = (person, crate::descriptor::OwnedDescriptor::from(file)); let (tx, rx) = ipc::channel().unwrap(); tx.send(person_and_descriptor).unwrap(); let received_person_and_descriptor = rx.recv().unwrap(); From bfe46c9b75cc7e2031f2f5939b161add2c7d42b9 Mon Sep 17 00:00:00 2001 From: Jonas Zaddach Date: Mon, 26 Jul 2021 12:29:51 +0200 Subject: [PATCH 6/9] Fix bug in Unix fragmentation --- src/platform/unix/mod.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/platform/unix/mod.rs b/src/platform/unix/mod.rs index 810c96b5..f02e0570 100644 --- a/src/platform/unix/mod.rs +++ b/src/platform/unix/mod.rs @@ -216,7 +216,7 @@ impl OsIpcSender { /// /// This one is smaller than regular fragments, because it carries the message (size) header. fn first_fragment_size(sendbuf_size: usize) -> usize { - (Self::fragment_size(sendbuf_size) - mem::size_of::()) + (Self::fragment_size(sendbuf_size) - mem::size_of::
()) & (!8usize + 1) // Ensure optimal alignment. } @@ -240,7 +240,7 @@ impl OsIpcSender { descriptors: Vec) -> Result<(),UnixError> { - let header = Header { + let mut header = Header { total_size: data.len(), channel_fd_num: channels.len(), shared_memory_fd_num: shared_memory_regions.len(), @@ -380,6 +380,7 @@ impl OsIpcSender { let (dedicated_tx, dedicated_rx) = channel()?; // Extract FD handle without consuming the Receiver, so the FD doesn't get closed. fds.push(dedicated_rx.fd.get()); + header.channel_fd_num += 1; // Split up the packet into fragments. let mut byte_position = 0; From 96b17bdcd4c75736140ab9e8faf5e72e9c477575 Mon Sep 17 00:00:00 2001 From: Jonas Zaddach Date: Mon, 26 Jul 2021 13:02:59 +0200 Subject: [PATCH 7/9] Implement PartialEq for OwnedDescriptor on windows --- src/descriptor.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/descriptor.rs b/src/descriptor.rs index 867b7e95..a448b69f 100644 --- a/src/descriptor.rs +++ b/src/descriptor.rs @@ -123,6 +123,16 @@ impl From for OwnedDescriptor { } } +#[cfg(windows)] +impl PartialEq for OwnedDescriptor { + fn eq(&self, other: &Self) -> bool { + unsafe { + winapi::um::handleapi::CompareObjectHandles(*self.0.borrow(), *other.0.borrow()) + != winapi::shared::minwindef::FALSE + } + } +} + #[cfg(unix)] impl IntoRawFd for OwnedDescriptor { fn into_raw_fd(self) -> RawDescriptor { From 6b00662ef62f94279dc301961c7e9a754aa838c1 Mon Sep 17 00:00:00 2001 From: Jonas Zaddach Date: Mon, 26 Jul 2021 13:31:04 +0200 Subject: [PATCH 8/9] CompareObjectHandles still not linking properly, fixed with cludge --- src/descriptor.rs | 32 +++++++++++++++++++++++++------- 1 file changed, 25 insertions(+), 7 deletions(-) diff --git a/src/descriptor.rs b/src/descriptor.rs index a448b69f..fa7d92f3 100644 --- a/src/descriptor.rs +++ b/src/descriptor.rs @@ -1,9 +1,10 @@ -use std::io; -use std::thread; -use std::mem; +use std::cell::RefCell; use std::default::Default; +use std::ffi::CString; use std::fs::File; -use std::cell::RefCell; +use std::io; +use std::mem; +use std::thread; #[cfg(windows)] pub use { @@ -123,12 +124,29 @@ impl From for OwnedDescriptor { } } -#[cfg(windows)] +const WINDOWS_APP_MODULE_NAME: &'static str = "api-ms-win-core-handle-l1-1-0"; +const COMPARE_OBJECT_HANDLES_FUNCTION_NAME: &'static str = "CompareObjectHandles"; + +lazy_static! { + static ref WINDOWS_APP_MODULE_NAME_CSTRING: CString = CString::new(WINDOWS_APP_MODULE_NAME).unwrap(); + static ref COMPARE_OBJECT_HANDLES_FUNCTION_NAME_CSTRING: CString = CString::new(COMPARE_OBJECT_HANDLES_FUNCTION_NAME).unwrap(); +} + +#[cfg(feature = "windows-shared-memory-equality")] impl PartialEq for OwnedDescriptor { fn eq(&self, other: &Self) -> bool { unsafe { - winapi::um::handleapi::CompareObjectHandles(*self.0.borrow(), *other.0.borrow()) - != winapi::shared::minwindef::FALSE + // Calling LoadLibraryA every time seems to be ok since libraries are refcounted and multiple calls won't produce multiple instances. + let module_handle = winapi::um::libloaderapi::LoadLibraryA(WINDOWS_APP_MODULE_NAME_CSTRING.as_ptr()); + if module_handle.is_null() { + panic!("Error loading library {}. {}", WINDOWS_APP_MODULE_NAME, WinError::error_string(GetLastError())); + } + let proc = winapi::um::libloaderapi::GetProcAddress(module_handle, COMPARE_OBJECT_HANDLES_FUNCTION_NAME_CSTRING.as_ptr()); + if proc.is_null() { + panic!("Error calling GetProcAddress to use {}. {}", COMPARE_OBJECT_HANDLES_FUNCTION_NAME, WinError::error_string(GetLastError())); + } + let compare_object_handles: unsafe extern "stdcall" fn(HANDLE, HANDLE) -> winapi::shared::minwindef::BOOL = std::mem::transmute(proc); + compare_object_handles(*self.0.borrow(), *other.0.borrow()) != 0 } } } From bb49277006e154bd84e5f0ec4784eabe7dc9724a Mon Sep 17 00:00:00 2001 From: Jonas Zaddach Date: Mon, 26 Jul 2021 13:53:53 +0200 Subject: [PATCH 9/9] Fix imports for used winapi functions --- src/descriptor.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/src/descriptor.rs b/src/descriptor.rs index fa7d92f3..1dff0e96 100644 --- a/src/descriptor.rs +++ b/src/descriptor.rs @@ -1,6 +1,5 @@ use std::cell::RefCell; use std::default::Default; -use std::ffi::CString; use std::fs::File; use std::io; use std::mem; @@ -128,8 +127,8 @@ const WINDOWS_APP_MODULE_NAME: &'static str = "api-ms-win-core-handle-l1-1-0"; const COMPARE_OBJECT_HANDLES_FUNCTION_NAME: &'static str = "CompareObjectHandles"; lazy_static! { - static ref WINDOWS_APP_MODULE_NAME_CSTRING: CString = CString::new(WINDOWS_APP_MODULE_NAME).unwrap(); - static ref COMPARE_OBJECT_HANDLES_FUNCTION_NAME_CSTRING: CString = CString::new(COMPARE_OBJECT_HANDLES_FUNCTION_NAME).unwrap(); + static ref WINDOWS_APP_MODULE_NAME_CSTRING: std::ffi::CString = std::ffi::CString::new(WINDOWS_APP_MODULE_NAME).unwrap(); + static ref COMPARE_OBJECT_HANDLES_FUNCTION_NAME_CSTRING: std::ffi::CString = std::ffi::CString::new(COMPARE_OBJECT_HANDLES_FUNCTION_NAME).unwrap(); } #[cfg(feature = "windows-shared-memory-equality")] @@ -139,13 +138,13 @@ impl PartialEq for OwnedDescriptor { // Calling LoadLibraryA every time seems to be ok since libraries are refcounted and multiple calls won't produce multiple instances. let module_handle = winapi::um::libloaderapi::LoadLibraryA(WINDOWS_APP_MODULE_NAME_CSTRING.as_ptr()); if module_handle.is_null() { - panic!("Error loading library {}. {}", WINDOWS_APP_MODULE_NAME, WinError::error_string(GetLastError())); + panic!("Error loading library {}. {}", WINDOWS_APP_MODULE_NAME, std::io::Error::last_os_error()); } let proc = winapi::um::libloaderapi::GetProcAddress(module_handle, COMPARE_OBJECT_HANDLES_FUNCTION_NAME_CSTRING.as_ptr()); if proc.is_null() { - panic!("Error calling GetProcAddress to use {}. {}", COMPARE_OBJECT_HANDLES_FUNCTION_NAME, WinError::error_string(GetLastError())); + panic!("Error calling GetProcAddress to use {}. {}", COMPARE_OBJECT_HANDLES_FUNCTION_NAME, std::io::Error::last_os_error()); } - let compare_object_handles: unsafe extern "stdcall" fn(HANDLE, HANDLE) -> winapi::shared::minwindef::BOOL = std::mem::transmute(proc); + let compare_object_handles: unsafe extern "stdcall" fn(RawDescriptor, RawDescriptor) -> winapi::shared::minwindef::BOOL = std::mem::transmute(proc); compare_object_handles(*self.0.borrow(), *other.0.borrow()) != 0 } }