Skip to content

Commit

Permalink
fix(interactive): Fix receiver state error after reconnect in a short…
Browse files Browse the repository at this point in the history
… time (#4097)

<!--
Thanks for your contribution! please review
https://github.com/alibaba/GraphScope/blob/main/CONTRIBUTING.md before
opening an issue.
-->

## What do these changes do?

<!-- Please give a short brief about these changes. -->

## Related issue number

<!-- Are there any issues opened that will be resolved by merging this
change? -->

Fixes
#4096
  • Loading branch information
lnfjpt authored Aug 6, 2024
1 parent 74755c3 commit 110fa55
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,15 @@ fn add_remote_register(local: u64, remote: u64, register: InboxRegister) {
lock.insert((local, remote), register);
}

fn remove_remote_register(local: u64, remote: u64) -> Option<InboxRegister> {
fn remove_remote_register(local: u64, remote: u64, other: InboxRegister) -> Option<InboxRegister> {
let mut lock = REMOTE_RECV_REGISTER
.write()
.expect("failure to lock REMOTE_RECV_REGISTER");
if let Some(register) = lock.get(&(local, remote)) {
if !register.from_same_receiver(&other) {
return None;
}
}
lock.remove(&(local, remote))
}

Expand Down Expand Up @@ -135,7 +140,7 @@ pub fn start_net_receiver(
break;
}
}
remove_remote_register(local, remote.id);
remove_remote_register(local, remote.id, net_recv.get_inbox_register());
info!("IPC receiver recv from {:?} exit;", remote);
})
.expect("start net recv thread failure;");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,10 @@ impl InboxRegister {
pub(crate) fn register(&self, channel_id: u128, tx: &MessageSender<Payload>) -> Result<(), NetError> {
self.inner.register(channel_id, tx.clone())
}

pub(crate) fn from_same_receiver(&self, other: &InboxRegister) -> bool {
Arc::ptr_eq(&self.inner, &other.inner)
}
}

#[cfg(test)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,16 @@ pub(crate) fn add_remote_sender(local_id: u64, server: &Server, tx: &Arc<Sender<
lock.insert((local_id, server.id), (server.addr, tx));
}

pub(crate) fn remove_remote_sender(local_id: u64, remote_id: u64) {
pub(crate) fn remove_remote_sender(local_id: u64, remote_id: u64, other: &Arc<Sender<NetData>>) {
let mut lock = REMOTE_MSG_SENDER
.write()
.expect("REMOTE_MSG_SENDER write lock poisoned");
if let Some((_, tx)) = lock.get(&(local_id, remote_id)) {
let weak = Arc::downgrade(other);
if !Weak::ptr_eq(tx, &weak) {
return;
}
}
lock.remove(&(local_id, remote_id));
}

Expand Down Expand Up @@ -275,5 +281,5 @@ fn busy_send<W: Write>(
}
}
info!("IPC sender to {:?} exit;", remote);
remove_remote_sender(local, remote);
remove_remote_sender(local, remote, net_tx.get_outbox_tx().as_ref().expect(""));
}

0 comments on commit 110fa55

Please sign in to comment.