From 7865a331896e63115eff0d9387bdb4903231822a Mon Sep 17 00:00:00 2001 From: Amos Wenger Date: Tue, 6 Aug 2024 09:29:02 +0200 Subject: [PATCH] Minimal rubicon 3.x changes --- tokio/Cargo.toml | 1 + tokio/src/macros/thread_local.rs | 2 +- tokio/src/process/unix/mod.rs | 12 ++++++++---- tokio/src/runtime/task/id.rs | 8 +++++--- tokio/src/runtime/task/list.rs | 12 ++++++++---- tokio/src/runtime/task/waker.rs | 8 +++++--- tokio/src/runtime/thread_id.rs | 8 +++++--- tokio/src/signal/registry.rs | 6 ++++-- tokio/src/signal/windows/sys.rs | 6 ++++-- tokio/src/time/clock.rs | 8 +++++--- 10 files changed, 46 insertions(+), 25 deletions(-) diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 12a8c698570..8dc3e2969de 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -94,6 +94,7 @@ pin-project-lite = "0.2.11" bytes = { version = "1.0.0", optional = true } mio = { version = "1.0.1", optional = true, default-features = false } parking_lot = { version = "0.12.0", optional = true } +rubicon = "3.4.0" [target.'cfg(not(target_family = "wasm"))'.dependencies] socket2 = { version = "0.5.5", optional = true, features = [ "all" ] } diff --git a/tokio/src/macros/thread_local.rs b/tokio/src/macros/thread_local.rs index 6d4a6aacbff..39335f77966 100644 --- a/tokio/src/macros/thread_local.rs +++ b/tokio/src/macros/thread_local.rs @@ -13,6 +13,6 @@ macro_rules! tokio_thread_local { #[cfg(not(all(loom, test)))] macro_rules! tokio_thread_local { ($($tts:tt)+) => { - ::std::thread_local!{ $($tts)+ } + ::rubicon::thread_local!{ $($tts)+ } } } diff --git a/tokio/src/process/unix/mod.rs b/tokio/src/process/unix/mod.rs index c9d1035f53d..42f41b1a8ab 100644 --- a/tokio/src/process/unix/mod.rs +++ b/tokio/src/process/unix/mod.rs @@ -68,17 +68,21 @@ cfg_not_has_const_mutex_new! { fn get_orphan_queue() -> &'static OrphanQueueImpl { use crate::util::once_cell::OnceCell; - static ORPHAN_QUEUE: OnceCell> = OnceCell::new(); + rubicon::process_local! { + static TOKIO_PROCESS_UNIX_ORPHAN_QUEUE: OnceCell> = OnceCell::new(); + } - ORPHAN_QUEUE.get(OrphanQueueImpl::new) + TOKIO_PROCESS_UNIX_ORPHAN_QUEUE.get(OrphanQueueImpl::new) } } cfg_has_const_mutex_new! { fn get_orphan_queue() -> &'static OrphanQueueImpl { - static ORPHAN_QUEUE: OrphanQueueImpl = OrphanQueueImpl::new(); + rubicon::process_local! { + static TOKIO_PROCESS_UNIX_ORPHAN_QUEUE: OrphanQueueImpl = OrphanQueueImpl::new(); + } - &ORPHAN_QUEUE + &TOKIO_PROCESS_UNIX_ORPHAN_QUEUE } } diff --git a/tokio/src/runtime/task/id.rs b/tokio/src/runtime/task/id.rs index 9c3b1403ec2..0ff81495af7 100644 --- a/tokio/src/runtime/task/id.rs +++ b/tokio/src/runtime/task/id.rs @@ -79,14 +79,16 @@ impl Id { #[cfg(all(test, loom))] crate::loom::lazy_static! { - static ref NEXT_ID: StaticAtomicU64 = StaticAtomicU64::new(1); + static ref TOKIO_TASK_NEXT_ID: StaticAtomicU64 = StaticAtomicU64::new(1); } #[cfg(not(all(test, loom)))] - static NEXT_ID: StaticAtomicU64 = StaticAtomicU64::new(1); + rubicon::process_local! { + static TOKIO_TASK_NEXT_ID: StaticAtomicU64 = StaticAtomicU64::new(1); + } loop { - let id = NEXT_ID.fetch_add(1, Relaxed); + let id = TOKIO_TASK_NEXT_ID.fetch_add(1, Relaxed); if let Some(id) = NonZeroU64::new(id) { return Self(id); } diff --git a/tokio/src/runtime/task/list.rs b/tokio/src/runtime/task/list.rs index 988d422836d..69571b85c99 100644 --- a/tokio/src/runtime/task/list.rs +++ b/tokio/src/runtime/task/list.rs @@ -28,11 +28,13 @@ use std::num::NonZeroU64; cfg_has_atomic_u64! { use std::sync::atomic::AtomicU64; - static NEXT_OWNED_TASKS_ID: AtomicU64 = AtomicU64::new(1); + rubicon::process_local! { + static TOKIO_NEXT_OWNED_TASKS_ID: AtomicU64 = AtomicU64::new(1); + } fn get_next_id() -> NonZeroU64 { loop { - let id = NEXT_OWNED_TASKS_ID.fetch_add(1, Ordering::Relaxed); + let id = TOKIO_NEXT_OWNED_TASKS_ID.fetch_add(1, Ordering::Relaxed); if let Some(id) = NonZeroU64::new(id) { return id; } @@ -43,11 +45,13 @@ cfg_has_atomic_u64! { cfg_not_has_atomic_u64! { use std::sync::atomic::AtomicU32; - static NEXT_OWNED_TASKS_ID: AtomicU32 = AtomicU32::new(1); + rubicon::process_local! { + static TOKIO_NEXT_OWNED_TASKS_ID: AtomicU32 = AtomicU32::new(1); + } fn get_next_id() -> NonZeroU64 { loop { - let id = NEXT_OWNED_TASKS_ID.fetch_add(1, Ordering::Relaxed); + let id = TOKIO_NEXT_OWNED_TASKS_ID.fetch_add(1, Ordering::Relaxed); if let Some(id) = NonZeroU64::new(u64::from(id)) { return id; } diff --git a/tokio/src/runtime/task/waker.rs b/tokio/src/runtime/task/waker.rs index 2a1568fe8f7..28a7483a96c 100644 --- a/tokio/src/runtime/task/waker.rs +++ b/tokio/src/runtime/task/waker.rs @@ -93,10 +93,12 @@ unsafe fn wake_by_ref(ptr: *const ()) { raw.wake_by_ref(); } -static WAKER_VTABLE: RawWakerVTable = - RawWakerVTable::new(clone_waker, wake_by_val, wake_by_ref, drop_waker); +rubicon::process_local! { + static TOKIO_RUNTIME_TASK_WAKER_VTABLE: RawWakerVTable = + RawWakerVTable::new(clone_waker, wake_by_val, wake_by_ref, drop_waker); +} fn raw_waker(header: NonNull
) -> RawWaker { let ptr = header.as_ptr() as *const (); - RawWaker::new(ptr, &WAKER_VTABLE) + RawWaker::new(ptr, &TOKIO_RUNTIME_TASK_WAKER_VTABLE) } diff --git a/tokio/src/runtime/thread_id.rs b/tokio/src/runtime/thread_id.rs index ef392897963..aef1e138125 100644 --- a/tokio/src/runtime/thread_id.rs +++ b/tokio/src/runtime/thread_id.rs @@ -7,16 +7,18 @@ impl ThreadId { pub(crate) fn next() -> Self { use crate::loom::sync::atomic::{Ordering::Relaxed, StaticAtomicU64}; - static NEXT_ID: StaticAtomicU64 = StaticAtomicU64::new(0); + rubicon::process_local! { + static TOKIO_RUNTIME_THREAD_NEXT_ID: StaticAtomicU64 = StaticAtomicU64::new(0); + } - let mut last = NEXT_ID.load(Relaxed); + let mut last = TOKIO_RUNTIME_THREAD_NEXT_ID.load(Relaxed); loop { let id = match last.checked_add(1) { Some(id) => id, None => exhausted(), }; - match NEXT_ID.compare_exchange_weak(last, id, Relaxed, Relaxed) { + match TOKIO_RUNTIME_THREAD_NEXT_ID.compare_exchange_weak(last, id, Relaxed, Relaxed) { Ok(_) => return ThreadId(NonZeroU64::new(id).unwrap()), Err(id) => last = id, } diff --git a/tokio/src/signal/registry.rs b/tokio/src/signal/registry.rs index 3fff8df9303..cedf4d31a9e 100644 --- a/tokio/src/signal/registry.rs +++ b/tokio/src/signal/registry.rs @@ -164,9 +164,11 @@ where OsExtraData: 'static + Send + Sync + Init, OsStorage: 'static + Send + Sync + Init, { - static GLOBALS: OnceCell = OnceCell::new(); + rubicon::process_local! { + static TOKIO_SIGNAL_REGISTRY_GLOBALS: OnceCell = OnceCell::new(); + } - GLOBALS.get(globals_init) + TOKIO_SIGNAL_REGISTRY_GLOBALS.get(globals_init) } #[cfg(all(test, not(loom)))] diff --git a/tokio/src/signal/windows/sys.rs b/tokio/src/signal/windows/sys.rs index 26e6bdf8182..bcd1c73cacd 100644 --- a/tokio/src/signal/windows/sys.rs +++ b/tokio/src/signal/windows/sys.rs @@ -88,11 +88,13 @@ impl Init for OsExtraData { } fn global_init() -> io::Result<()> { - static INIT: Once = Once::new(); + rubicon::process_local! { + static TOKIO_SIGNAL_WINDOWS_INIT: Once = Once::new(); + } let mut init = None; - INIT.call_once(|| unsafe { + TOKIO_SIGNAL_WINDOWS_INIT.call_once(|| unsafe { let rc = console::SetConsoleCtrlHandler(Some(handler), 1); let ret = if rc == 0 { Err(io::Error::last_os_error()) diff --git a/tokio/src/time/clock.rs b/tokio/src/time/clock.rs index 50884f972fa..8d5a0d7c69e 100644 --- a/tokio/src/time/clock.rs +++ b/tokio/src/time/clock.rs @@ -74,7 +74,9 @@ cfg_test_util! { // A static is used so we can avoid accessing the thread-local as well. The // `std` AtomicBool is used directly because loom does not support static // atomics. - static DID_PAUSE_CLOCK: StdAtomicBool = StdAtomicBool::new(false); + rubicon::process_local! { + static TOKIO_TIME_DID_PAUSE_CLOCK: StdAtomicBool = StdAtomicBool::new(false); + } #[derive(Debug)] struct Inner { @@ -210,7 +212,7 @@ cfg_test_util! { /// Returns the current instant, factoring in frozen time. pub(crate) fn now() -> Instant { - if !DID_PAUSE_CLOCK.load(Ordering::Acquire) { + if !TOKIO_TIME_DID_PAUSE_CLOCK.load(Ordering::Acquire) { return Instant::from_std(std::time::Instant::now()); } @@ -257,7 +259,7 @@ cfg_test_util! { } // Track that we paused the clock - DID_PAUSE_CLOCK.store(true, Ordering::Release); + TOKIO_TIME_DID_PAUSE_CLOCK.store(true, Ordering::Release); let elapsed = match inner.unfrozen.as_ref() { Some(v) => v.elapsed(),