crossbeam_utils/
thread.rs

1//! Threads that can borrow variables from the stack.
2//!
3//! Create a scope when spawned threads need to access variables on the stack:
4//!
5//! ```
6//! use crossbeam_utils::thread;
7//!
8//! let people = vec![
9//!     "Alice".to_string(),
10//!     "Bob".to_string(),
11//!     "Carol".to_string(),
12//! ];
13//!
14//! thread::scope(|s| {
15//!     for person in &people {
16//!         s.spawn(move |_| {
17//!             println!("Hello, {}!", person);
18//!         });
19//!     }
20//! }).unwrap();
21//! ```
22//!
23//! # Why scoped threads?
24//!
25//! Suppose we wanted to re-write the previous example using plain threads:
26//!
27//! ```compile_fail,E0597
28//! use std::thread;
29//!
30//! let people = vec![
31//!     "Alice".to_string(),
32//!     "Bob".to_string(),
33//!     "Carol".to_string(),
34//! ];
35//!
36//! let mut threads = Vec::new();
37//!
38//! for person in &people {
39//!     threads.push(thread::spawn(move || {
40//!         println!("Hello, {}!", person);
41//!     }));
42//! }
43//!
44//! for thread in threads {
45//!     thread.join().unwrap();
46//! }
47//! ```
48//!
49//! This doesn't work because the borrow checker complains about `people` not living long enough:
50//!
51//! ```text
52//! error[E0597]: `people` does not live long enough
53//!   --> src/main.rs:12:20
54//!    |
55//! 12 |     for person in &people {
56//!    |                    ^^^^^^ borrowed value does not live long enough
57//! ...
58//! 21 | }
59//!    | - borrowed value only lives until here
60//!    |
61//!    = note: borrowed value must be valid for the static lifetime...
62//! ```
63//!
//! The problem here is that spawned threads are not allowed to borrow variables on the stack
//! because the compiler cannot prove they will be joined before `people` is destroyed.
66//!
67//! Scoped threads are a mechanism to guarantee to the compiler that spawned threads will be joined
68//! before the scope ends.
69//!
70//! # How scoped threads work
71//!
72//! If a variable is borrowed by a thread, the thread must complete before the variable is
73//! destroyed. Threads spawned using [`std::thread::spawn`] can only borrow variables with the
74//! `'static` lifetime because the borrow checker cannot be sure when the thread will complete.
75//!
76//! A scope creates a clear boundary between variables outside the scope and threads inside the
77//! scope. Whenever a scope spawns a thread, it promises to join the thread before the scope ends.
78//! This way we guarantee to the borrow checker that scoped threads only live within the scope and
79//! can safely access variables outside it.
80//!
81//! # Nesting scoped threads
82//!
83//! Sometimes scoped threads need to spawn more threads within the same scope. This is a little
84//! tricky because argument `s` lives *inside* the invocation of `thread::scope()` and as such
85//! cannot be borrowed by scoped threads:
86//!
87//! ```compile_fail,E0373,E0521
88//! use crossbeam_utils::thread;
89//!
90//! thread::scope(|s| {
91//!     s.spawn(|_| {
92//!         // Not going to compile because we're trying to borrow `s`,
93//!         // which lives *inside* the scope! :(
94//!         s.spawn(|_| println!("nested thread"));
95//!     });
96//! });
97//! ```
98//!
99//! Fortunately, there is a solution. Every scoped thread is passed a reference to its scope as an
100//! argument, which can be used for spawning nested threads:
101//!
102//! ```
103//! use crossbeam_utils::thread;
104//!
105//! thread::scope(|s| {
106//!     // Note the `|s|` here.
107//!     s.spawn(|s| {
108//!         // Yay, this works because we're using a fresh argument `s`! :)
109//!         s.spawn(|_| println!("nested thread"));
110//!     });
111//! }).unwrap();
112//! ```
113
114use std::fmt;
115use std::io;
116use std::marker::PhantomData;
117use std::mem;
118use std::panic;
119use std::sync::{Arc, Mutex};
120use std::thread;
121
122use crate::sync::WaitGroup;
123use cfg_if::cfg_if;
124
/// A vector guarded by a mutex and shared across threads through an `Arc`.
type SharedVec<T> = Arc<Mutex<Vec<T>>>;

/// An optional value guarded by a mutex and shared across threads through an `Arc`.
type SharedOption<T> = Arc<Mutex<Option<T>>>;
127
128/// Creates a new scope for spawning threads.
129///
130/// All child threads that haven't been manually joined will be automatically joined just before
131/// this function invocation ends. If all joined threads have successfully completed, `Ok` is
132/// returned with the return value of `f`. If any of the joined threads has panicked, an `Err` is
133/// returned containing errors from panicked threads. Note that if panics are implemented by
134/// aborting the process, no error is returned; see the notes of [std::panic::catch_unwind].
135///
136/// # Examples
137///
138/// ```
139/// use crossbeam_utils::thread;
140///
141/// let var = vec![1, 2, 3];
142///
143/// thread::scope(|s| {
144///     s.spawn(|_| {
145///         println!("A child thread borrowing `var`: {:?}", var);
146///     });
147/// }).unwrap();
148/// ```
149pub fn scope<'env, F, R>(f: F) -> thread::Result<R>
150where
151    F: FnOnce(&Scope<'env>) -> R,
152{
153    let wg = WaitGroup::new();
154    let scope = Scope::<'env> {
155        handles: SharedVec::default(),
156        wait_group: wg.clone(),
157        _marker: PhantomData,
158    };
159
160    // Execute the scoped function, but catch any panics.
161    let result = panic::catch_unwind(panic::AssertUnwindSafe(|| f(&scope)));
162
163    // Wait until all nested scopes are dropped.
164    drop(scope.wait_group);
165    wg.wait();
166
167    // Join all remaining spawned threads.
168    let panics: Vec<_> = scope
169        .handles
170        .lock()
171        .unwrap()
172        // Filter handles that haven't been joined, join them, and collect errors.
173        .drain(..)
174        .filter_map(|handle| handle.lock().unwrap().take())
175        .filter_map(|handle| handle.join().err())
176        .collect();
177
178    // If `f` has panicked, resume unwinding.
179    // If any of the child threads have panicked, return the panic errors.
180    // Otherwise, everything is OK and return the result of `f`.
181    match result {
182        Err(err) => panic::resume_unwind(err),
183        Ok(res) => {
184            if panics.is_empty() {
185                Ok(res)
186            } else {
187                Err(Box::new(panics))
188            }
189        }
190    }
191}
192
193/// A scope for spawning threads.
194pub struct Scope<'env> {
195    /// The list of the thread join handles.
196    handles: SharedVec<SharedOption<thread::JoinHandle<()>>>,
197
198    /// Used to wait until all subscopes all dropped.
199    wait_group: WaitGroup,
200
201    /// Borrows data with invariant lifetime `'env`.
202    _marker: PhantomData<&'env mut &'env ()>,
203}
204
205unsafe impl Sync for Scope<'_> {}
206
207impl<'env> Scope<'env> {
208    /// Spawns a scoped thread.
209    ///
210    /// This method is similar to the [`spawn`] function in Rust's standard library. The difference
211    /// is that this thread is scoped, meaning it's guaranteed to terminate before the scope exits,
212    /// allowing it to reference variables outside the scope.
213    ///
214    /// The scoped thread is passed a reference to this scope as an argument, which can be used for
215    /// spawning nested threads.
216    ///
217    /// The returned [handle](ScopedJoinHandle) can be used to manually
218    /// [join](ScopedJoinHandle::join) the thread before the scope exits.
219    ///
220    /// This will create a thread using default parameters of [`ScopedThreadBuilder`], if you want to specify the
221    /// stack size or the name of the thread, use this API instead.
222    ///
223    /// [`spawn`]: std::thread::spawn
224    ///
225    /// # Panics
226    ///
227    /// Panics if the OS fails to create a thread; use [`ScopedThreadBuilder::spawn`]
228    /// to recover from such errors.
229    ///
230    /// # Examples
231    ///
232    /// ```
233    /// use crossbeam_utils::thread;
234    ///
235    /// thread::scope(|s| {
236    ///     let handle = s.spawn(|_| {
237    ///         println!("A child thread is running");
238    ///         42
239    ///     });
240    ///
241    ///     // Join the thread and retrieve its result.
242    ///     let res = handle.join().unwrap();
243    ///     assert_eq!(res, 42);
244    /// }).unwrap();
245    /// ```
246    pub fn spawn<'scope, F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T>
247    where
248        F: FnOnce(&Scope<'env>) -> T,
249        F: Send + 'env,
250        T: Send + 'env,
251    {
252        self.builder()
253            .spawn(f)
254            .expect("failed to spawn scoped thread")
255    }
256
257    /// Creates a builder that can configure a thread before spawning.
258    ///
259    /// # Examples
260    ///
261    /// ```
262    /// use crossbeam_utils::thread;
263    ///
264    /// thread::scope(|s| {
265    ///     s.builder()
266    ///         .spawn(|_| println!("A child thread is running"))
267    ///         .unwrap();
268    /// }).unwrap();
269    /// ```
270    pub fn builder<'scope>(&'scope self) -> ScopedThreadBuilder<'scope, 'env> {
271        ScopedThreadBuilder {
272            scope: self,
273            builder: thread::Builder::new(),
274        }
275    }
276}
277
278impl fmt::Debug for Scope<'_> {
279    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
280        f.pad("Scope { .. }")
281    }
282}
283
284/// Configures the properties of a new thread.
285///
286/// The two configurable properties are:
287///
288/// - [`name`]: Specifies an [associated name for the thread][naming-threads].
289/// - [`stack_size`]: Specifies the [desired stack size for the thread][stack-size].
290///
291/// The [`spawn`] method will take ownership of the builder and return an [`io::Result`] of the
292/// thread handle with the given configuration.
293///
294/// The [`Scope::spawn`] method uses a builder with default configuration and unwraps its return
295/// value. You may want to use this builder when you want to recover from a failure to launch a
296/// thread.
297///
298/// # Examples
299///
300/// ```
301/// use crossbeam_utils::thread;
302///
303/// thread::scope(|s| {
304///     s.builder()
305///         .spawn(|_| println!("Running a child thread"))
306///         .unwrap();
307/// }).unwrap();
308/// ```
309///
310/// [`name`]: ScopedThreadBuilder::name
311/// [`stack_size`]: ScopedThreadBuilder::stack_size
312/// [`spawn`]: ScopedThreadBuilder::spawn
313/// [`io::Result`]: std::io::Result
314/// [naming-threads]: std::thread#naming-threads
315/// [stack-size]: std::thread#stack-size
316#[derive(Debug)]
317pub struct ScopedThreadBuilder<'scope, 'env> {
318    scope: &'scope Scope<'env>,
319    builder: thread::Builder,
320}
321
322impl<'scope, 'env> ScopedThreadBuilder<'scope, 'env> {
323    /// Sets the name for the new thread.
324    ///
325    /// The name must not contain null bytes (`\0`).
326    ///
327    /// For more information about named threads, see [here][naming-threads].
328    ///
329    /// # Examples
330    ///
331    /// ```
332    /// use crossbeam_utils::thread;
333    /// use std::thread::current;
334    ///
335    /// thread::scope(|s| {
336    ///     s.builder()
337    ///         .name("my thread".to_string())
338    ///         .spawn(|_| assert_eq!(current().name(), Some("my thread")))
339    ///         .unwrap();
340    /// }).unwrap();
341    /// ```
342    ///
343    /// [naming-threads]: std::thread#naming-threads
344    pub fn name(mut self, name: String) -> ScopedThreadBuilder<'scope, 'env> {
345        self.builder = self.builder.name(name);
346        self
347    }
348
349    /// Sets the size of the stack for the new thread.
350    ///
351    /// The stack size is measured in bytes.
352    ///
353    /// For more information about the stack size for threads, see [here][stack-size].
354    ///
355    /// # Examples
356    ///
357    /// ```
358    /// use crossbeam_utils::thread;
359    ///
360    /// thread::scope(|s| {
361    ///     s.builder()
362    ///         .stack_size(32 * 1024)
363    ///         .spawn(|_| println!("Running a child thread"))
364    ///         .unwrap();
365    /// }).unwrap();
366    /// ```
367    ///
368    /// [stack-size]: std::thread#stack-size
369    pub fn stack_size(mut self, size: usize) -> ScopedThreadBuilder<'scope, 'env> {
370        self.builder = self.builder.stack_size(size);
371        self
372    }
373
374    /// Spawns a scoped thread with this configuration.
375    ///
376    /// The scoped thread is passed a reference to this scope as an argument, which can be used for
377    /// spawning nested threads.
378    ///
379    /// The returned handle can be used to manually join the thread before the scope exits.
380    ///
381    /// # Errors
382    ///
383    /// Unlike the [`Scope::spawn`] method, this method yields an
384    /// [`io::Result`] to capture any failure to create the thread at
385    /// the OS level.
386    ///
387    /// [`io::Result`]: std::io::Result
388    ///
389    /// # Panics
390    ///
391    /// Panics if a thread name was set and it contained null bytes.
392    ///
393    /// # Examples
394    ///
395    /// ```
396    /// use crossbeam_utils::thread;
397    ///
398    /// thread::scope(|s| {
399    ///     let handle = s.builder()
400    ///         .spawn(|_| {
401    ///             println!("A child thread is running");
402    ///             42
403    ///         })
404    ///         .unwrap();
405    ///
406    ///     // Join the thread and retrieve its result.
407    ///     let res = handle.join().unwrap();
408    ///     assert_eq!(res, 42);
409    /// }).unwrap();
410    /// ```
411    pub fn spawn<F, T>(self, f: F) -> io::Result<ScopedJoinHandle<'scope, T>>
412    where
413        F: FnOnce(&Scope<'env>) -> T,
414        F: Send + 'env,
415        T: Send + 'env,
416    {
417        // The result of `f` will be stored here.
418        let result = SharedOption::default();
419
420        // Spawn the thread and grab its join handle and thread handle.
421        let (handle, thread) = {
422            let result = Arc::clone(&result);
423
424            // A clone of the scope that will be moved into the new thread.
425            let scope = Scope::<'env> {
426                handles: Arc::clone(&self.scope.handles),
427                wait_group: self.scope.wait_group.clone(),
428                _marker: PhantomData,
429            };
430
431            // Spawn the thread.
432            let handle = {
433                let closure = move || {
434                    // Make sure the scope is inside the closure with the proper `'env` lifetime.
435                    let scope: Scope<'env> = scope;
436
437                    // Run the closure.
438                    let res = f(&scope);
439
440                    // Store the result if the closure didn't panic.
441                    *result.lock().unwrap() = Some(res);
442                };
443
444                // Allocate `closure` on the heap and erase the `'env` bound.
445                let closure: Box<dyn FnOnce() + Send + 'env> = Box::new(closure);
446                let closure: Box<dyn FnOnce() + Send + 'static> =
447                    unsafe { mem::transmute(closure) };
448
449                // Finally, spawn the closure.
450                self.builder.spawn(closure)?
451            };
452
453            let thread = handle.thread().clone();
454            let handle = Arc::new(Mutex::new(Some(handle)));
455            (handle, thread)
456        };
457
458        // Add the handle to the shared list of join handles.
459        self.scope.handles.lock().unwrap().push(Arc::clone(&handle));
460
461        Ok(ScopedJoinHandle {
462            handle,
463            result,
464            thread,
465            _marker: PhantomData,
466        })
467    }
468}
469
470unsafe impl<T> Send for ScopedJoinHandle<'_, T> {}
471unsafe impl<T> Sync for ScopedJoinHandle<'_, T> {}
472
473/// A handle that can be used to join its scoped thread.
474///
475/// This struct is created by the [`Scope::spawn`] method and the
476/// [`ScopedThreadBuilder::spawn`] method.
477pub struct ScopedJoinHandle<'scope, T> {
478    /// A join handle to the spawned thread.
479    handle: SharedOption<thread::JoinHandle<()>>,
480
481    /// Holds the result of the inner closure.
482    result: SharedOption<T>,
483
484    /// A handle to the the spawned thread.
485    thread: thread::Thread,
486
487    /// Borrows the parent scope with lifetime `'scope`.
488    _marker: PhantomData<&'scope ()>,
489}
490
491impl<T> ScopedJoinHandle<'_, T> {
492    /// Waits for the thread to finish and returns its result.
493    ///
494    /// If the child thread panics, an error is returned. Note that if panics are implemented by
495    /// aborting the process, no error is returned; see the notes of [std::panic::catch_unwind].
496    ///
497    /// # Panics
498    ///
499    /// This function may panic on some platforms if a thread attempts to join itself or otherwise
500    /// may create a deadlock with joining threads.
501    ///
502    /// # Examples
503    ///
504    /// ```
505    /// use crossbeam_utils::thread;
506    ///
507    /// thread::scope(|s| {
508    ///     let handle1 = s.spawn(|_| println!("I'm a happy thread :)"));
509    ///     let handle2 = s.spawn(|_| panic!("I'm a sad thread :("));
510    ///
511    ///     // Join the first thread and verify that it succeeded.
512    ///     let res = handle1.join();
513    ///     assert!(res.is_ok());
514    ///
515    ///     // Join the second thread and verify that it panicked.
516    ///     let res = handle2.join();
517    ///     assert!(res.is_err());
518    /// }).unwrap();
519    /// ```
520    pub fn join(self) -> thread::Result<T> {
521        // Take out the handle. The handle will surely be available because the root scope waits
522        // for nested scopes before joining remaining threads.
523        let handle = self.handle.lock().unwrap().take().unwrap();
524
525        // Join the thread and then take the result out of its inner closure.
526        handle
527            .join()
528            .map(|()| self.result.lock().unwrap().take().unwrap())
529    }
530
531    /// Returns a handle to the underlying thread.
532    ///
533    /// # Examples
534    ///
535    /// ```
536    /// use crossbeam_utils::thread;
537    ///
538    /// thread::scope(|s| {
539    ///     let handle = s.spawn(|_| println!("A child thread is running"));
540    ///     println!("The child thread ID: {:?}", handle.thread().id());
541    /// }).unwrap();
542    /// ```
543    pub fn thread(&self) -> &thread::Thread {
544        &self.thread
545    }
546}
547
548cfg_if! {
549    if #[cfg(unix)] {
550        use std::os::unix::thread::{JoinHandleExt, RawPthread};
551
552        impl<T> JoinHandleExt for ScopedJoinHandle<'_, T> {
553            fn as_pthread_t(&self) -> RawPthread {
554                // Borrow the handle. The handle will surely be available because the root scope waits
555                // for nested scopes before joining remaining threads.
556                let handle = self.handle.lock().unwrap();
557                handle.as_ref().unwrap().as_pthread_t()
558            }
559            fn into_pthread_t(self) -> RawPthread {
560                self.as_pthread_t()
561            }
562        }
563    } else if #[cfg(windows)] {
564        use std::os::windows::io::{AsRawHandle, IntoRawHandle, RawHandle};
565
566        impl<T> AsRawHandle for ScopedJoinHandle<'_, T> {
567            fn as_raw_handle(&self) -> RawHandle {
568                // Borrow the handle. The handle will surely be available because the root scope waits
569                // for nested scopes before joining remaining threads.
570                let handle = self.handle.lock().unwrap();
571                handle.as_ref().unwrap().as_raw_handle()
572            }
573        }
574
575        impl<T> IntoRawHandle for ScopedJoinHandle<'_, T> {
576            fn into_raw_handle(self) -> RawHandle {
577                self.as_raw_handle()
578            }
579        }
580    }
581}
582
583impl<T> fmt::Debug for ScopedJoinHandle<'_, T> {
584    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
585        f.pad("ScopedJoinHandle { .. }")
586    }
587}