parking_lot/once.rs

// Copyright 2016 Amanieu d'Antras
//
// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.
use crate::util::UncheckedOptionExt;
use core::{
fmt, mem,
sync::atomic::{fence, AtomicU8, Ordering},
};
use parking_lot_core::{self, SpinWait, DEFAULT_PARK_TOKEN, DEFAULT_UNPARK_TOKEN};
const DONE_BIT: u8 = 1;
const POISON_BIT: u8 = 2;
const LOCKED_BIT: u8 = 4;
const PARKED_BIT: u8 = 8;
/// Current state of a `Once`.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum OnceState {
/// A closure has not been executed yet
New,
/// A closure was executed but panicked.
Poisoned,
/// A thread is currently executing a closure.
InProgress,
/// A closure has completed successfully.
Done,
}
impl OnceState {
/// Returns whether the associated `Once` has been poisoned.
///
/// Once an initialization routine for a `Once` has panicked it will forever
/// indicate to future forced initialization routines that it is poisoned.
#[inline]
pub fn poisoned(self) -> bool {
match self {
OnceState::Poisoned => true,
_ => false,
}
}
/// Returns whether the associated `Once` has successfully executed a
/// closure.
#[inline]
pub fn done(self) -> bool {
match self {
OnceState::Done => true,
_ => false,
}
}
}
/// A synchronization primitive which can be used to run a one-time
/// initialization. Useful for one-time initialization for globals, FFI or
/// related functionality.
///
/// # Differences from the standard library `Once`
///
/// - Only requires 1 byte of space, instead of 1 word.
/// - Not required to be `'static`.
/// - Relaxed memory barriers in the fast path, which can significantly improve
/// performance on some architectures.
/// - Efficient handling of micro-contention using adaptive spinning.
///
/// # Examples
///
/// ```
/// use parking_lot::Once;
///
/// static START: Once = Once::new();
///
/// START.call_once(|| {
/// // run initialization here
/// });
/// ```
pub struct Once(AtomicU8);
impl Once {
/// Creates a new `Once` value.
#[inline]
pub const fn new() -> Once {
Once(AtomicU8::new(0))
}
/// Returns the current state of this `Once`.
#[inline]
pub fn state(&self) -> OnceState {
let state = self.0.load(Ordering::Acquire);
if state & DONE_BIT != 0 {
OnceState::Done
} else if state & LOCKED_BIT != 0 {
OnceState::InProgress
} else if state & POISON_BIT != 0 {
OnceState::Poisoned
} else {
OnceState::New
}
}
/// Performs an initialization routine once and only once. The given closure
/// will be executed if this is the first time `call_once` has been called,
/// and otherwise the routine will *not* be invoked.
///
/// This method will block the calling thread if another initialization
/// routine is currently running.
///
/// When this function returns, it is guaranteed that some initialization
/// has run and completed (it may not be the closure specified). It is also
/// guaranteed that any memory writes performed by the executed closure can
/// be reliably observed by other threads at this point (there is a
/// happens-before relation between the closure and code executing after the
/// return).
///
/// # Examples
///
/// ```
/// use parking_lot::Once;
///
/// static mut VAL: usize = 0;
/// static INIT: Once = Once::new();
///
/// // Accessing a `static mut` is unsafe much of the time, but if we do so
/// // in a synchronized fashion (e.g. write once or read all) then we're
/// // good to go!
/// //
/// // This function will only call `expensive_computation` once, and will
/// // otherwise always return the value returned from the first invocation.
/// fn get_cached_val() -> usize {
/// unsafe {
/// INIT.call_once(|| {
/// VAL = expensive_computation();
/// });
/// VAL
/// }
/// }
///
/// fn expensive_computation() -> usize {
/// // ...
/// # 2
/// }
/// ```
///
/// # Panics
///
/// The closure `f` will only be executed once if this is called
/// concurrently amongst many threads. If that closure panics, however, then
/// it will *poison* this `Once` instance, causing all future invocations of
/// `call_once` to also panic.
#[inline]
pub fn call_once<F>(&self, f: F)
where
F: FnOnce(),
{
if self.0.load(Ordering::Acquire) == DONE_BIT {
return;
}
let mut f = Some(f);
self.call_once_slow(false, &mut |_| unsafe { f.take().unchecked_unwrap()() });
}
/// Performs the same function as `call_once` except ignores poisoning.
///
/// If this `Once` has been poisoned (some initialization panicked) then
/// this function will continue to attempt to call initialization functions
/// until one of them doesn't panic.
///
/// The closure `f` is yielded a structure which can be used to query the
/// state of this `Once` (whether initialization has previously panicked or
/// not).
#[inline]
pub fn call_once_force<F>(&self, f: F)
where
F: FnOnce(OnceState),
{
if self.0.load(Ordering::Acquire) == DONE_BIT {
return;
}
let mut f = Some(f);
self.call_once_slow(true, &mut |state| unsafe {
f.take().unchecked_unwrap()(state)
});
}
// This is a non-generic function to reduce the monomorphization cost of
// using `call_once` (this isn't exactly a trivial or small implementation).
//
// Additionally, this is tagged with `#[cold]` as it should indeed be cold
// and it helps let LLVM know that calls to this function should be off the
// fast path. Essentially, this should help generate more straight line code
// in LLVM.
//
// Finally, this takes an `FnMut` instead of a `FnOnce` because there's
// currently no way to take an `FnOnce` and call it via virtual dispatch
// without some allocation overhead.
#[cold]
fn call_once_slow(&self, ignore_poison: bool, f: &mut dyn FnMut(OnceState)) {
let mut spinwait = SpinWait::new();
let mut state = self.0.load(Ordering::Relaxed);
loop {
// If another thread called the closure, we're done
if state & DONE_BIT != 0 {
// An acquire fence is needed here since we didn't load the
// state with Ordering::Acquire.
fence(Ordering::Acquire);
return;
}
// If the state has been poisoned and we aren't forcing, then panic
if state & POISON_BIT != 0 && !ignore_poison {
// Need the fence here as well for the same reason
fence(Ordering::Acquire);
panic!("Once instance has previously been poisoned");
}
// Grab the lock if it isn't locked, even if there is a queue on it.
// We also clear the poison bit since we are going to try running
// the closure again.
if state & LOCKED_BIT == 0 {
match self.0.compare_exchange_weak(
state,
(state | LOCKED_BIT) & !POISON_BIT,
Ordering::Acquire,
Ordering::Relaxed,
) {
Ok(_) => break,
Err(x) => state = x,
}
continue;
}
// If there is no queue, try spinning a few times
if state & PARKED_BIT == 0 && spinwait.spin() {
state = self.0.load(Ordering::Relaxed);
continue;
}
// Set the parked bit
if state & PARKED_BIT == 0 {
if let Err(x) = self.0.compare_exchange_weak(
state,
state | PARKED_BIT,
Ordering::Relaxed,
Ordering::Relaxed,
) {
state = x;
continue;
}
}
// Park our thread until we are woken up by the thread that owns the
// lock.
unsafe {
let addr = self as *const _ as usize;
let validate = || self.0.load(Ordering::Relaxed) == LOCKED_BIT | PARKED_BIT;
let before_sleep = || {};
let timed_out = |_, _| unreachable!();
parking_lot_core::park(
addr,
validate,
before_sleep,
timed_out,
DEFAULT_PARK_TOKEN,
None,
);
}
// Loop back and check if the done bit was set
spinwait.reset();
state = self.0.load(Ordering::Relaxed);
}
struct PanicGuard<'a>(&'a Once);
impl<'a> Drop for PanicGuard<'a> {
fn drop(&mut self) {
// Mark the state as poisoned, unlock it and unpark all threads.
let once = self.0;
let state = once.0.swap(POISON_BIT, Ordering::Release);
if state & PARKED_BIT != 0 {
unsafe {
let addr = once as *const _ as usize;
parking_lot_core::unpark_all(addr, DEFAULT_UNPARK_TOKEN);
}
}
}
}
// At this point we have the lock, so run the closure. Make sure we
// properly clean up if the closure panicks.
let guard = PanicGuard(self);
let once_state = if state & POISON_BIT != 0 {
OnceState::Poisoned
} else {
OnceState::New
};
f(once_state);
mem::forget(guard);
// Now unlock the state, set the done bit and unpark all threads
let state = self.0.swap(DONE_BIT, Ordering::Release);
if state & PARKED_BIT != 0 {
unsafe {
let addr = self as *const _ as usize;
parking_lot_core::unpark_all(addr, DEFAULT_UNPARK_TOKEN);
}
}
}
}
impl Default for Once {
#[inline]
fn default() -> Once {
Once::new()
}
}
impl fmt::Debug for Once {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Once")
.field("state", &self.state())
.finish()
}
}
#[cfg(test)]
mod tests {
use crate::Once;
use std::panic;
use std::sync::mpsc::channel;
use std::thread;
#[test]
fn smoke_once() {
static O: Once = Once::new();
let mut a = 0;
O.call_once(|| a += 1);
assert_eq!(a, 1);
O.call_once(|| a += 1);
assert_eq!(a, 1);
}
#[test]
fn stampede_once() {
static O: Once = Once::new();
static mut RUN: bool = false;
let (tx, rx) = channel();
for _ in 0..10 {
let tx = tx.clone();
thread::spawn(move || {
for _ in 0..4 {
thread::yield_now()
}
unsafe {
O.call_once(|| {
assert!(!RUN);
RUN = true;
});
assert!(RUN);
}
tx.send(()).unwrap();
});
}
unsafe {
O.call_once(|| {
assert!(!RUN);
RUN = true;
});
assert!(RUN);
}
for _ in 0..10 {
rx.recv().unwrap();
}
}
#[test]
fn poison_bad() {
static O: Once = Once::new();
// poison the once
let t = panic::catch_unwind(|| {
O.call_once(|| panic!());
});
assert!(t.is_err());
// poisoning propagates
let t = panic::catch_unwind(|| {
O.call_once(|| {});
});
assert!(t.is_err());
// we can subvert poisoning, however
let mut called = false;
O.call_once_force(|p| {
called = true;
assert!(p.poisoned())
});
assert!(called);
// once any success happens, we stop propagating the poison
O.call_once(|| {});
}
#[test]
fn wait_for_force_to_finish() {
static O: Once = Once::new();
// poison the once
let t = panic::catch_unwind(|| {
O.call_once(|| panic!());
});
assert!(t.is_err());
// make sure someone's waiting inside the once via a force
let (tx1, rx1) = channel();
let (tx2, rx2) = channel();
let t1 = thread::spawn(move || {
O.call_once_force(|p| {
assert!(p.poisoned());
tx1.send(()).unwrap();
rx2.recv().unwrap();
});
});
rx1.recv().unwrap();
// put another waiter on the once
let t2 = thread::spawn(|| {
let mut called = false;
O.call_once(|| {
called = true;
});
assert!(!called);
});
tx2.send(()).unwrap();
assert!(t1.join().is_ok());
assert!(t2.join().is_ok());
}
#[test]
fn test_once_debug() {
static O: Once = Once::new();
assert_eq!(format!("{:?}", O), "Once { state: New }");
}
}