Skip to content

Commit

Permalink
Minimal rubicon 3.x changes
Browse files Browse the repository at this point in the history
  • Loading branch information
fasterthanlime committed Aug 6, 2024
1 parent ab53bf0 commit 7865a33
Show file tree
Hide file tree
Showing 10 changed files with 46 additions and 25 deletions.
1 change: 1 addition & 0 deletions tokio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ pin-project-lite = "0.2.11"
bytes = { version = "1.0.0", optional = true }
mio = { version = "1.0.1", optional = true, default-features = false }
parking_lot = { version = "0.12.0", optional = true }
rubicon = "3.4.0"

[target.'cfg(not(target_family = "wasm"))'.dependencies]
socket2 = { version = "0.5.5", optional = true, features = [ "all" ] }
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/macros/thread_local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@ macro_rules! tokio_thread_local {
#[cfg(not(all(loom, test)))]
macro_rules! tokio_thread_local {
($($tts:tt)+) => {
::std::thread_local!{ $($tts)+ }
::rubicon::thread_local!{ $($tts)+ }
}
}
12 changes: 8 additions & 4 deletions tokio/src/process/unix/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,17 +68,21 @@ cfg_not_has_const_mutex_new! {
fn get_orphan_queue() -> &'static OrphanQueueImpl<StdChild> {
use crate::util::once_cell::OnceCell;

static ORPHAN_QUEUE: OnceCell<OrphanQueueImpl<StdChild>> = OnceCell::new();
rubicon::process_local! {
static TOKIO_PROCESS_UNIX_ORPHAN_QUEUE: OnceCell<OrphanQueueImpl<StdChild>> = OnceCell::new();
}

ORPHAN_QUEUE.get(OrphanQueueImpl::new)
TOKIO_PROCESS_UNIX_ORPHAN_QUEUE.get(OrphanQueueImpl::new)
}
}

cfg_has_const_mutex_new! {
fn get_orphan_queue() -> &'static OrphanQueueImpl<StdChild> {
static ORPHAN_QUEUE: OrphanQueueImpl<StdChild> = OrphanQueueImpl::new();
rubicon::process_local! {
static TOKIO_PROCESS_UNIX_ORPHAN_QUEUE: OrphanQueueImpl<StdChild> = OrphanQueueImpl::new();
}

&ORPHAN_QUEUE
&TOKIO_PROCESS_UNIX_ORPHAN_QUEUE
}
}

Expand Down
8 changes: 5 additions & 3 deletions tokio/src/runtime/task/id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,16 @@ impl Id {

#[cfg(all(test, loom))]
crate::loom::lazy_static! {
static ref NEXT_ID: StaticAtomicU64 = StaticAtomicU64::new(1);
static ref TOKIO_TASK_NEXT_ID: StaticAtomicU64 = StaticAtomicU64::new(1);
}

#[cfg(not(all(test, loom)))]
static NEXT_ID: StaticAtomicU64 = StaticAtomicU64::new(1);
rubicon::process_local! {
static TOKIO_TASK_NEXT_ID: StaticAtomicU64 = StaticAtomicU64::new(1);
}

loop {
let id = NEXT_ID.fetch_add(1, Relaxed);
let id = TOKIO_TASK_NEXT_ID.fetch_add(1, Relaxed);
if let Some(id) = NonZeroU64::new(id) {
return Self(id);
}
Expand Down
12 changes: 8 additions & 4 deletions tokio/src/runtime/task/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@ use std::num::NonZeroU64;
cfg_has_atomic_u64! {
use std::sync::atomic::AtomicU64;

static NEXT_OWNED_TASKS_ID: AtomicU64 = AtomicU64::new(1);
rubicon::process_local! {
static TOKIO_NEXT_OWNED_TASKS_ID: AtomicU64 = AtomicU64::new(1);
}

fn get_next_id() -> NonZeroU64 {
loop {
let id = NEXT_OWNED_TASKS_ID.fetch_add(1, Ordering::Relaxed);
let id = TOKIO_NEXT_OWNED_TASKS_ID.fetch_add(1, Ordering::Relaxed);
if let Some(id) = NonZeroU64::new(id) {
return id;
}
Expand All @@ -43,11 +45,13 @@ cfg_has_atomic_u64! {
cfg_not_has_atomic_u64! {
use std::sync::atomic::AtomicU32;

static NEXT_OWNED_TASKS_ID: AtomicU32 = AtomicU32::new(1);
rubicon::process_local! {
static TOKIO_NEXT_OWNED_TASKS_ID: AtomicU32 = AtomicU32::new(1);
}

fn get_next_id() -> NonZeroU64 {
loop {
let id = NEXT_OWNED_TASKS_ID.fetch_add(1, Ordering::Relaxed);
let id = TOKIO_NEXT_OWNED_TASKS_ID.fetch_add(1, Ordering::Relaxed);
if let Some(id) = NonZeroU64::new(u64::from(id)) {
return id;
}
Expand Down
8 changes: 5 additions & 3 deletions tokio/src/runtime/task/waker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,12 @@ unsafe fn wake_by_ref(ptr: *const ()) {
raw.wake_by_ref();
}

static WAKER_VTABLE: RawWakerVTable =
RawWakerVTable::new(clone_waker, wake_by_val, wake_by_ref, drop_waker);
rubicon::process_local! {
static TOKIO_RUNTIME_TASK_WAKER_VTABLE: RawWakerVTable =
RawWakerVTable::new(clone_waker, wake_by_val, wake_by_ref, drop_waker);
}

fn raw_waker(header: NonNull<Header>) -> RawWaker {
let ptr = header.as_ptr() as *const ();
RawWaker::new(ptr, &WAKER_VTABLE)
RawWaker::new(ptr, &TOKIO_RUNTIME_TASK_WAKER_VTABLE)
}
8 changes: 5 additions & 3 deletions tokio/src/runtime/thread_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,18 @@ impl ThreadId {
pub(crate) fn next() -> Self {
use crate::loom::sync::atomic::{Ordering::Relaxed, StaticAtomicU64};

static NEXT_ID: StaticAtomicU64 = StaticAtomicU64::new(0);
rubicon::process_local! {
static TOKIO_RUNTIME_THREAD_NEXT_ID: StaticAtomicU64 = StaticAtomicU64::new(0);
}

let mut last = NEXT_ID.load(Relaxed);
let mut last = TOKIO_RUNTIME_THREAD_NEXT_ID.load(Relaxed);
loop {
let id = match last.checked_add(1) {
Some(id) => id,
None => exhausted(),
};

match NEXT_ID.compare_exchange_weak(last, id, Relaxed, Relaxed) {
match TOKIO_RUNTIME_THREAD_NEXT_ID.compare_exchange_weak(last, id, Relaxed, Relaxed) {
Ok(_) => return ThreadId(NonZeroU64::new(id).unwrap()),
Err(id) => last = id,
}
Expand Down
6 changes: 4 additions & 2 deletions tokio/src/signal/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,11 @@ where
OsExtraData: 'static + Send + Sync + Init,
OsStorage: 'static + Send + Sync + Init,
{
static GLOBALS: OnceCell<Globals> = OnceCell::new();
rubicon::process_local! {
static TOKIO_SIGNAL_REGISTRY_GLOBALS: OnceCell<Globals> = OnceCell::new();
}

GLOBALS.get(globals_init)
TOKIO_SIGNAL_REGISTRY_GLOBALS.get(globals_init)
}

#[cfg(all(test, not(loom)))]
Expand Down
6 changes: 4 additions & 2 deletions tokio/src/signal/windows/sys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,13 @@ impl Init for OsExtraData {
}

fn global_init() -> io::Result<()> {
static INIT: Once = Once::new();
rubicon::process_local! {
static TOKIO_SIGNAL_WINDOWS_INIT: Once = Once::new();
}

let mut init = None;

INIT.call_once(|| unsafe {
TOKIO_SIGNAL_WINDOWS_INIT.call_once(|| unsafe {
let rc = console::SetConsoleCtrlHandler(Some(handler), 1);
let ret = if rc == 0 {
Err(io::Error::last_os_error())
Expand Down
8 changes: 5 additions & 3 deletions tokio/src/time/clock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@ cfg_test_util! {
// A static is used so we can avoid accessing the thread-local as well. The
// `std` AtomicBool is used directly because loom does not support static
// atomics.
static DID_PAUSE_CLOCK: StdAtomicBool = StdAtomicBool::new(false);
rubicon::process_local! {
static TOKIO_TIME_DID_PAUSE_CLOCK: StdAtomicBool = StdAtomicBool::new(false);
}

#[derive(Debug)]
struct Inner {
Expand Down Expand Up @@ -210,7 +212,7 @@ cfg_test_util! {

/// Returns the current instant, factoring in frozen time.
pub(crate) fn now() -> Instant {
if !DID_PAUSE_CLOCK.load(Ordering::Acquire) {
if !TOKIO_TIME_DID_PAUSE_CLOCK.load(Ordering::Acquire) {
return Instant::from_std(std::time::Instant::now());
}

Expand Down Expand Up @@ -257,7 +259,7 @@ cfg_test_util! {
}

// Track that we paused the clock
DID_PAUSE_CLOCK.store(true, Ordering::Release);
TOKIO_TIME_DID_PAUSE_CLOCK.store(true, Ordering::Release);

let elapsed = match inner.unfrozen.as_ref() {
Some(v) => v.elapsed(),
Expand Down

0 comments on commit 7865a33

Please sign in to comment.