diff --git a/crates/vsock/src/vhu_vsock.rs b/crates/vsock/src/vhu_vsock.rs index 27108ff1..3f3b7495 100644 --- a/crates/vsock/src/vhu_vsock.rs +++ b/crates/vsock/src/vhu_vsock.rs @@ -3,6 +3,7 @@ use std::{ collections::HashMap, io::{self, Result as IoResult}, + ops::DerefMut, sync::{Arc, Mutex, RwLock}, u16, u32, u64, u8, }; @@ -217,6 +218,7 @@ pub(crate) struct VhostUserVsockBackend { pub threads: Vec>, queues_per_thread: Vec, pub exit_event: EventFd, + last_processed: Mutex, } impl VhostUserVsockBackend { @@ -236,6 +238,7 @@ impl VhostUserVsockBackend { threads: vec![thread], queues_per_thread, exit_event: EventFd::new(EFD_NONBLOCK).map_err(Error::EventFdCreate)?, + last_processed: Mutex::new(RxQueueType::Standard), }) } } @@ -317,8 +320,30 @@ impl VhostUserBackend for VhostUserVsockBackend { } } - if device_event != EVT_QUEUE_EVENT && thread.thread_backend.pending_rx() { - thread.process_rx(vring_rx, evt_idx)?; + if device_event != EVT_QUEUE_EVENT { + let mut last_processed_lock = self.last_processed.lock().unwrap(); + let last_processed = last_processed_lock.deref_mut(); + + match last_processed { + RxQueueType::Standard => { + if thread.thread_backend.pending_raw_pkts() { + thread.process_raw_pkts(vring_rx, evt_idx)?; + *last_processed = RxQueueType::RawPkts; + } + if thread.thread_backend.pending_rx() { + thread.process_rx(vring_rx, evt_idx)?; + } + } + RxQueueType::RawPkts => { + if thread.thread_backend.pending_rx() { + thread.process_rx(vring_rx, evt_idx)?; + *last_processed = RxQueueType::Standard; + } + if thread.thread_backend.pending_raw_pkts() { + thread.process_raw_pkts(vring_rx, evt_idx)?; + } + } + } } Ok(false) diff --git a/crates/vsock/src/vhu_vsock_thread.rs b/crates/vsock/src/vhu_vsock_thread.rs index ed62a4ae..b2f5c96e 100644 --- a/crates/vsock/src/vhu_vsock_thread.rs +++ b/crates/vsock/src/vhu_vsock_thread.rs @@ -36,7 +36,7 @@ use crate::{ type ArcVhostBknd = Arc; -enum RxQueueType { +pub enum RxQueueType { Standard, RawPkts, }