vsock: Fix issues with sibling VM communication #385

Merged
merged 3 commits into from
Jul 20, 2023
Merged
8 changes: 1 addition & 7 deletions crates/vsock/src/main.rs
@@ -195,10 +195,6 @@ pub(crate) fn start_backend_server(
             VhostUserVsockBackend::new(config.clone(), cid_map.clone())
                 .map_err(BackendError::CouldNotCreateBackend)?,
         );
-        cid_map
-            .write()
-            .unwrap()
-            .insert(config.get_guest_cid(), backend.clone());
 
         let listener = Listener::new(config.get_socket_path(), true).unwrap();
 
@@ -236,7 +232,6 @@ pub(crate) fn start_backend_server(
 
         // No matter the result, we need to shut down the worker thread.
        backend.exit_event.write(1).unwrap();
-        cid_map.write().unwrap().remove(&config.get_guest_cid());
    }
 }
 
@@ -455,8 +450,7 @@ mod tests {
 
         let cid_map: Arc<RwLock<CidMap>> = Arc::new(RwLock::new(HashMap::new()));
 
-        let backend = Arc::new(VhostUserVsockBackend::new(config, cid_map.clone()).unwrap());
-        cid_map.write().unwrap().insert(CID, backend.clone());
+        let backend = Arc::new(VhostUserVsockBackend::new(config, cid_map).unwrap());
 
         let daemon = VhostUserDaemon::new(
             String::from("vhost-user-vsock"),
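The main.rs change above removes manual `cid_map` bookkeeping from `start_backend_server`: after this PR, registration happens when the thread is constructed and cleanup when it is dropped (see the vhu_vsock_thread.rs hunks below), so the map entry can never outlive its thread. A minimal sketch of that construct/drop pattern, using a toy `Thread` type in place of the real `VhostUserVsockThread` and a string in place of the real map value:

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

// Toy stand-in: the real map value is (raw_pkts_queue, EventFd).
type CidMap = HashMap<u64, &'static str>;

struct Thread {
    cid: u64,
    cid_map: Arc<RwLock<CidMap>>,
}

impl Thread {
    fn new(cid: u64, cid_map: Arc<RwLock<CidMap>>) -> Self {
        // Register on construction, exactly once, in one place...
        cid_map.write().unwrap().insert(cid, "sibling entry");
        Thread { cid, cid_map }
    }
}

impl Drop for Thread {
    fn drop(&mut self) {
        // ...and deregister on drop, so callers can't forget cleanup.
        self.cid_map.write().unwrap().remove(&self.cid);
    }
}

fn main() {
    let cid_map = Arc::new(RwLock::new(CidMap::new()));
    {
        let _t = Thread::new(3, cid_map.clone());
        assert!(cid_map.read().unwrap().contains_key(&3));
    } // _t dropped here, entry removed
    assert!(!cid_map.read().unwrap().contains_key(&3));
}
```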
27 changes: 13 additions & 14 deletions crates/vsock/src/thread_backend.rs
@@ -23,6 +23,8 @@ use crate::{
     vsock_conn::*,
 };
 
+pub(crate) type RawPktsQ = VecDeque<RawVsockPacket>;
+
 pub(crate) struct RawVsockPacket {
     pub header: [u8; PKT_HEADER_SIZE],
     pub data: Vec<u8>,
@@ -65,9 +67,9 @@ pub(crate) struct VsockThreadBackend {
     pub local_port_set: HashSet<u32>,
     tx_buffer_size: u32,
     /// Maps the guest CID to the corresponding backend. Used for sibling VM communication.
-    cid_map: Arc<RwLock<CidMap>>,
+    pub cid_map: Arc<RwLock<CidMap>>,
     /// Queue of raw vsock packets received from sibling VMs to be sent to the guest.
-    raw_pkts_queue: VecDeque<RawVsockPacket>,
+    pub raw_pkts_queue: Arc<RwLock<RawPktsQ>>,
 }
 
 impl VsockThreadBackend {
@@ -92,7 +94,7 @@ impl VsockThreadBackend {
             local_port_set: HashSet::new(),
             tx_buffer_size,
             cid_map,
-            raw_pkts_queue: VecDeque::new(),
+            raw_pkts_queue: Arc::new(RwLock::new(VecDeque::new())),
         }
     }
 
@@ -103,7 +105,7 @@ impl VsockThreadBackend {
 
     /// Checks if there are pending raw vsock packets to be sent to the guest.
     pub fn pending_raw_pkts(&self) -> bool {
-        !self.raw_pkts_queue.is_empty()
+        !self.raw_pkts_queue.read().unwrap().is_empty()
     }
 
     /// Deliver a vsock packet to the guest vsock driver.
@@ -178,14 +180,13 @@ impl VsockThreadBackend {
         if dst_cid != VSOCK_HOST_CID {
             let cid_map = self.cid_map.read().unwrap();
             if cid_map.contains_key(&dst_cid) {
-                let sibling_backend = cid_map.get(&dst_cid).unwrap();
-                let mut sibling_backend_thread = sibling_backend.threads[0].lock().unwrap();
+                let (sibling_raw_pkts_queue, sibling_event_fd) = cid_map.get(&dst_cid).unwrap();
 
-                sibling_backend_thread
-                    .thread_backend
-                    .raw_pkts_queue
+                sibling_raw_pkts_queue
+                    .write()
+                    .unwrap()
                     .push_back(RawVsockPacket::from_vsock_packet(pkt)?);
-                let _ = sibling_backend_thread.sibling_event_fd.write(1);
+                let _ = sibling_event_fd.write(1);
             } else {
                 warn!("vsock: dropping packet for unknown cid: {:?}", dst_cid);
             }
@@ -254,6 +255,8 @@ impl VsockThreadBackend {
     pub fn recv_raw_pkt<B: BitmapSlice>(&mut self, pkt: &mut VsockPacket<B>) -> Result<()> {
         let raw_vsock_pkt = self
             .raw_pkts_queue
+            .write()
+            .unwrap()
             .pop_front()
             .ok_or(Error::EmptyRawPktsQueue)?;
 
@@ -436,10 +439,6 @@ mod tests {
 
         let sibling_backend =
             Arc::new(VhostUserVsockBackend::new(sibling_config, cid_map.clone()).unwrap());
-        cid_map
-            .write()
-            .unwrap()
-            .insert(SIBLING_CID, sibling_backend.clone());
 
         let epoll_fd = epoll::create(false).unwrap();
         let mut vtp =
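The hunks above change how a packet reaches a sibling VM: instead of locking the sibling backend's whole worker thread, the sender now looks up the sibling's shared raw-packet queue and event fd in the `CidMap` (the new type appears in vhu_vsock.rs below), pushes the packet, and signals the sibling. A minimal, self-contained sketch of that delivery path, assuming the `vmm-sys-util` crate and a simplified `RawPkt` in place of `RawVsockPacket`:

```rust
use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, RwLock};

use vmm_sys_util::eventfd::{EventFd, EFD_NONBLOCK};

type RawPkt = Vec<u8>; // simplified stand-in for RawVsockPacket
type CidMap = HashMap<u64, (Arc<RwLock<VecDeque<RawPkt>>>, EventFd)>;

fn send_to_sibling(cid_map: &Arc<RwLock<CidMap>>, dst_cid: u64, pkt: RawPkt) {
    // Only a read lock on the map itself; each backend's queue has its
    // own RwLock, so delivery doesn't serialize unrelated backends.
    let map = cid_map.read().unwrap();
    if let Some((queue, event_fd)) = map.get(&dst_cid) {
        queue.write().unwrap().push_back(pkt);
        // Wake the sibling's thread so it drains the queue into its guest.
        let _ = event_fd.write(1);
    }
}

fn main() {
    let cid_map: Arc<RwLock<CidMap>> = Arc::new(RwLock::new(HashMap::new()));
    let queue = Arc::new(RwLock::new(VecDeque::new()));
    let event_fd = EventFd::new(EFD_NONBLOCK).unwrap();
    cid_map.write().unwrap().insert(3, (queue.clone(), event_fd));

    send_to_sibling(&cid_map, 3, vec![0u8; 44]);
    assert_eq!(queue.read().unwrap().len(), 1);
}
```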
5 changes: 3 additions & 2 deletions crates/vsock/src/vhu_vsock.rs
@@ -21,9 +21,10 @@ use vmm_sys_util::{
     eventfd::{EventFd, EFD_NONBLOCK},
 };
 
+use crate::thread_backend::RawPktsQ;
 use crate::vhu_vsock_thread::*;
 
-pub(crate) type CidMap = HashMap<u64, Arc<VhostUserVsockBackend>>;
+pub(crate) type CidMap = HashMap<u64, (Arc<RwLock<RawPktsQ>>, EventFd)>;
 
 const NUM_QUEUES: usize = 2;
 const QUEUE_SIZE: usize = 256;
@@ -316,7 +317,7 @@ impl VhostUserBackend<VringRwLock, ()> for VhostUserVsockBackend {
             }
         }
 
-        if device_event != EVT_QUEUE_EVENT && thread.thread_backend.pending_rx() {
+        if device_event != EVT_QUEUE_EVENT {
             thread.process_rx(vring_rx, evt_idx)?;
         }
 
101 changes: 72 additions & 29 deletions crates/vsock/src/vhu_vsock_thread.rs
@@ -68,6 +68,9 @@ pub(crate) struct VhostUserVsockThread {
     /// EventFd to notify this thread for custom events. Currently used to notify
     /// this thread to process raw vsock packets sent from a sibling VM.
     pub sibling_event_fd: EventFd,
+    /// Keeps track of which RX queue was processed first in the last iteration.
+    /// Used to alternate between the RX queues to prevent the starvation of one by the other.
+    last_processed: RxQueueType,
 }
 
 impl VhostUserVsockThread {
@@ -92,21 +95,31 @@ impl VhostUserVsockThread {
 
         let sibling_event_fd = EventFd::new(EFD_NONBLOCK).map_err(Error::EventFdCreate)?;
 
+        let thread_backend = VsockThreadBackend::new(
+            uds_path.clone(),
+            epoll_fd,
+            guest_cid,
+            tx_buffer_size,
+            cid_map.clone(),
+        );
+
+        cid_map.write().unwrap().insert(
+            guest_cid,
+            (
+                thread_backend.raw_pkts_queue.clone(),
+                sibling_event_fd.try_clone().unwrap(),
+            ),
+        );
+
         let thread = VhostUserVsockThread {
             mem: None,
             event_idx: false,
             host_sock: host_sock.as_raw_fd(),
-            host_sock_path: uds_path.clone(),
+            host_sock_path: uds_path,
             host_listener: host_sock,
             vring_worker: None,
             epoll_file,
-            thread_backend: VsockThreadBackend::new(
-                uds_path,
-                epoll_fd,
-                guest_cid,
-                tx_buffer_size,
-                cid_map,
-            ),
+            thread_backend,
             guest_cid,
             pool: ThreadPoolBuilder::new()
                 .pool_size(1)
@@ -115,6 +128,7 @@ impl VhostUserVsockThread {
             local_port: Wrapping(0),
             tx_buffer_size,
             sibling_event_fd,
+            last_processed: RxQueueType::Standard,
         };
 
         VhostUserVsockThread::epoll_register(epoll_fd, host_raw_fd, epoll::Events::EPOLLIN)?;
@@ -517,7 +531,7 @@ impl VhostUserVsockThread {
     }
 
     /// Wrapper to process rx queue based on whether event idx is enabled or not.
-    pub fn process_rx(&mut self, vring: &VringRwLock, event_idx: bool) -> Result<bool> {
+    fn process_unix_sockets(&mut self, vring: &VringRwLock, event_idx: bool) -> Result<bool> {
         if event_idx {
             // To properly handle EVENT_IDX we need to keep calling
             // process_rx_queue until it stops finding new requests
@@ -540,6 +554,50 @@ impl VhostUserVsockThread {
         Ok(false)
     }
 
+    /// Wrapper to process raw vsock packets queue based on whether event idx is enabled or not.
+    pub fn process_raw_pkts(&mut self, vring: &VringRwLock, event_idx: bool) -> Result<bool> {
+        if event_idx {
+            loop {
+                if !self.thread_backend.pending_raw_pkts() {
+                    break;
+                }
+                vring.disable_notification().unwrap();
+
+                self.process_rx_queue(vring, RxQueueType::RawPkts)?;
+                if !vring.enable_notification().unwrap() {
+                    break;
+                }
+            }
+        } else {
+            self.process_rx_queue(vring, RxQueueType::RawPkts)?;
+        }
+        Ok(false)
+    }
+
+    pub fn process_rx(&mut self, vring: &VringRwLock, event_idx: bool) -> Result<bool> {
+        match self.last_processed {
+            RxQueueType::Standard => {
+                if self.thread_backend.pending_raw_pkts() {
+                    self.process_raw_pkts(vring, event_idx)?;
+                    self.last_processed = RxQueueType::RawPkts;
+                }
+                if self.thread_backend.pending_rx() {
+                    self.process_unix_sockets(vring, event_idx)?;
+                }
+            }
+            RxQueueType::RawPkts => {
+                if self.thread_backend.pending_rx() {
+                    self.process_unix_sockets(vring, event_idx)?;
+                    self.last_processed = RxQueueType::Standard;
+                }
+                if self.thread_backend.pending_raw_pkts() {
+                    self.process_raw_pkts(vring, event_idx)?;
+                }
+            }
+        }
+        Ok(false)
+    }
+
     /// Process tx queue and send requests to the backend for processing.
     fn process_tx_queue(&mut self, vring: &VringRwLock) -> Result<bool> {
         let mut used_any = false;
@@ -635,31 +693,16 @@ impl VhostUserVsockThread {
         }
         Ok(false)
     }
-
-    /// Wrapper to process raw vsock packets queue based on whether event idx is enabled or not.
-    pub fn process_raw_pkts(&mut self, vring: &VringRwLock, event_idx: bool) -> Result<bool> {
-        if event_idx {
-            loop {
-                if !self.thread_backend.pending_raw_pkts() {
-                    break;
-                }
-                vring.disable_notification().unwrap();
-
-                self.process_rx_queue(vring, RxQueueType::RawPkts)?;
-                if !vring.enable_notification().unwrap() {
-                    break;
-                }
-            }
-        } else {
-            self.process_rx_queue(vring, RxQueueType::RawPkts)?;
-        }
-        Ok(false)
-    }
 }
 
 impl Drop for VhostUserVsockThread {
     fn drop(&mut self) {
         let _ = std::fs::remove_file(&self.host_sock_path);
+        self.thread_backend
+            .cid_map
+            .write()
+            .unwrap()
+            .remove(&self.guest_cid);
     }
 }
 #[cfg(test)]
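One final note on the new `process_rx`: it alternates which RX source is drained first, tracked by `last_processed`, so a steady stream of sibling raw packets cannot starve Unix-socket traffic, and vice versa. A small sketch of just that ordering logic, with booleans standing in for the real `pending_rx()`/`pending_raw_pkts()` checks:

```rust
#[derive(Clone, Copy, Debug, PartialEq)]
enum RxQueueType {
    Standard, // connections backed by host Unix sockets
    RawPkts,  // raw packets queued by sibling VMs
}

// Returns the order in which the two RX sources would be drained,
// updating `last_processed` the same way process_rx does: it only
// flips when the previously favored source yields its turn.
fn rx_order(last: &mut RxQueueType, std_pending: bool, raw_pending: bool) -> Vec<RxQueueType> {
    let mut order = Vec::new();
    match *last {
        RxQueueType::Standard => {
            if raw_pending {
                order.push(RxQueueType::RawPkts);
                *last = RxQueueType::RawPkts;
            }
            if std_pending {
                order.push(RxQueueType::Standard);
            }
        }
        RxQueueType::RawPkts => {
            if std_pending {
                order.push(RxQueueType::Standard);
                *last = RxQueueType::Standard;
            }
            if raw_pending {
                order.push(RxQueueType::RawPkts);
            }
        }
    }
    order
}

fn main() {
    let mut last = RxQueueType::Standard;
    // With both sources busy, the head of the order flips on each call.
    assert_eq!(rx_order(&mut last, true, true)[0], RxQueueType::RawPkts);
    assert_eq!(rx_order(&mut last, true, true)[0], RxQueueType::Standard);
}
```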