From 110fa55ef52faf646a11a6e6f5db8d4e0cdc72cc Mon Sep 17 00:00:00 2001 From: Neng Li Date: Tue, 6 Aug 2024 17:14:05 +0800 Subject: [PATCH] fix(interactive): Fix receiver state error after reconnect in a short time (#4097) ## What do these changes do? When a network receiver or sender thread exits, it now removes the entry in REMOTE_RECV_REGISTER / REMOTE_MSG_SENDER only if that entry still refers to its own inbox register / outbox sender (checked by pointer equality). If the entry refers to a different one, for example one registered after a quick reconnect to the same peer, it is left in place. ## Related issue number Fixes https://github.com/alibaba/GraphScope/issues/4096 --- .../executor/engine/pegasus/network/src/receive/mod.rs | 9 +++++++-- .../engine/pegasus/network/src/receive/net_rx.rs | 4 ++++ .../executor/engine/pegasus/network/src/send/mod.rs | 10 ++++++++-- 3 files changed, 19 insertions(+), 4 deletions(-) diff --git a/interactive_engine/executor/engine/pegasus/network/src/receive/mod.rs b/interactive_engine/executor/engine/pegasus/network/src/receive/mod.rs index ba8c651a83e0..3307391635e2 100644 --- a/interactive_engine/executor/engine/pegasus/network/src/receive/mod.rs +++ b/interactive_engine/executor/engine/pegasus/network/src/receive/mod.rs @@ -82,10 +82,15 @@ fn add_remote_register(local: u64, remote: u64, register: InboxRegister) { lock.insert((local, remote), register); } -fn remove_remote_register(local: u64, remote: u64) -> Option<InboxRegister> { +fn remove_remote_register(local: u64, remote: u64, other: InboxRegister) -> Option<InboxRegister> { let mut lock = REMOTE_RECV_REGISTER .write() .expect("failure to lock REMOTE_RECV_REGISTER"); + if let Some(register) = lock.get(&(local, remote)) { + if !register.from_same_receiver(&other) { + return None; + } + } lock.remove(&(local, remote)) } @@ -135,7 +140,7 @@ pub fn start_net_receiver( break; } } - remove_remote_register(local, remote.id); + remove_remote_register(local, remote.id, net_recv.get_inbox_register()); info!("IPC receiver recv from {:?} exit;", remote); }) .expect("start net recv thread failure;"); diff --git a/interactive_engine/executor/engine/pegasus/network/src/receive/net_rx.rs b/interactive_engine/executor/engine/pegasus/network/src/receive/net_rx.rs index a49794f45664..844e562e3954 100644 --- 
a/interactive_engine/executor/engine/pegasus/network/src/receive/net_rx.rs +++ b/interactive_engine/executor/engine/pegasus/network/src/receive/net_rx.rs @@ -278,6 +278,10 @@ impl InboxRegister { pub(crate) fn register(&self, channel_id: u128, tx: &MessageSender) -> Result<(), NetError> { self.inner.register(channel_id, tx.clone()) } + + pub(crate) fn from_same_receiver(&self, other: &InboxRegister) -> bool { + Arc::ptr_eq(&self.inner, &other.inner) + } } #[cfg(test)] diff --git a/interactive_engine/executor/engine/pegasus/network/src/send/mod.rs b/interactive_engine/executor/engine/pegasus/network/src/send/mod.rs index fd7e58dad6d6..f6d3fecceba5 100644 --- a/interactive_engine/executor/engine/pegasus/network/src/send/mod.rs +++ b/interactive_engine/executor/engine/pegasus/network/src/send/mod.rs @@ -159,10 +159,16 @@ pub(crate) fn add_remote_sender(local_id: u64, server: &Server, tx: &Arc>) { let mut lock = REMOTE_MSG_SENDER .write() .expect("REMOTE_MSG_SENDER write lock poisoned"); + if let Some((_, tx)) = lock.get(&(local_id, remote_id)) { + let weak = Arc::downgrade(other); + if !Weak::ptr_eq(tx, &weak) { + return; + } + } lock.remove(&(local_id, remote_id)); } @@ -275,5 +281,5 @@ fn busy_send( } } info!("IPC sender to {:?} exit;", remote); - remove_remote_sender(local, remote); + remove_remote_sender(local, remote, net_tx.get_outbox_tx().as_ref().expect("")); }