blocking/
lib.rs

1//! A thread pool for isolating blocking I/O in async programs.
2//!
3//! Sometimes there's no way to avoid blocking I/O. Consider files or stdin, which have weak async
4//! support on modern operating systems. While [IOCP], [AIO], and [io_uring] are possible
5//! solutions, they're not always available or ideal.
6//!
7//! Since blocking is not allowed inside futures, we must move blocking I/O onto a special thread
8//! pool provided by this crate. The pool dynamically spawns and stops threads depending on the
9//! current number of running I/O jobs.
10//!
11//! Note that there is a limit on the number of active threads. Once that limit is hit, a running
12//! job has to finish before others get a chance to run. When a thread is idle, it waits for the
13//! next job or shuts down after a certain timeout.
14//!
//! The default number of threads (set to 500) can be altered by setting the `BLOCKING_MAX_THREADS`
//! environment variable to a value between 1 and 10000.
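//!
//! For example, a program could raise the limit at startup (the value shown here is arbitrary), as
//! long as this runs before the first task is spawned onto the pool:
//!
//! ```no_run
//! // The pool reads the variable lazily, when the first blocking task is spawned.
//! std::env::set_var("BLOCKING_MAX_THREADS", "1000");
//! ```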
17//!
18//! [IOCP]: https://en.wikipedia.org/wiki/Input/output_completion_port
19//! [AIO]: http://man7.org/linux/man-pages/man2/io_submit.2.html
20//! [io_uring]: https://lwn.net/Articles/776703
21//!
22//! # Examples
23//!
24//! Read the contents of a file:
25//!
26//! ```no_run
27//! use blocking::unblock;
28//! use std::fs;
29//!
30//! # futures_lite::future::block_on(async {
31//! let contents = unblock(|| fs::read_to_string("file.txt")).await?;
32//! println!("{}", contents);
33//! # std::io::Result::Ok(()) });
34//! ```
35//!
36//! Read a file and pipe its contents to stdout:
37//!
38//! ```no_run
39//! use blocking::{unblock, Unblock};
40//! use futures_lite::io;
41//! use std::fs::File;
42//!
43//! # futures_lite::future::block_on(async {
44//! let input = unblock(|| File::open("file.txt")).await?;
45//! let input = Unblock::new(input);
46//! let mut output = Unblock::new(std::io::stdout());
47//!
48//! io::copy(input, &mut output).await?;
49//! # std::io::Result::Ok(()) });
50//! ```
51//!
52//! Iterate over the contents of a directory:
53//!
54//! ```no_run
55//! use blocking::Unblock;
56//! use futures_lite::prelude::*;
57//! use std::fs;
58//!
59//! # futures_lite::future::block_on(async {
60//! let mut dir = Unblock::new(fs::read_dir(".")?);
61//! while let Some(item) = dir.next().await {
62//!     println!("{}", item?.file_name().to_string_lossy());
63//! }
64//! # std::io::Result::Ok(()) });
65//! ```
66//!
67//! Spawn a process:
68//!
69//! ```no_run
70//! use blocking::unblock;
71//! use std::process::Command;
72//!
73//! # futures_lite::future::block_on(async {
74//! let out = unblock(|| Command::new("dir").output()).await?;
75//! # std::io::Result::Ok(()) });
76//! ```
77
78#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
79
80use std::any::Any;
81use std::collections::VecDeque;
82use std::env;
83use std::fmt;
84use std::io::{self, Read, Seek, SeekFrom, Write};
85use std::mem;
86use std::panic;
87use std::pin::Pin;
88use std::slice;
89use std::sync::atomic::{self, AtomicBool, AtomicUsize, Ordering};
90use std::sync::{Arc, Condvar, Mutex, MutexGuard};
91use std::task::{Context, Poll};
92use std::thread;
93use std::time::Duration;
94
95use async_channel::{bounded, Receiver};
96use async_lock::OnceCell;
97use async_task::Runnable;
98use atomic_waker::AtomicWaker;
99use futures_lite::{future, prelude::*, ready};
100
101#[doc(no_inline)]
102pub use async_task::Task;
103
104/// Default value for max threads that Executor can grow to
105const DEFAULT_MAX_THREADS: usize = 500;
106
107/// Minimum value for max threads config
108const MIN_MAX_THREADS: usize = 1;
109
110/// Maximum value for max threads config
111const MAX_MAX_THREADS: usize = 10000;
112
/// Environment variable that allows overriding the default value for max threads.
114const MAX_THREADS_ENV: &str = "BLOCKING_MAX_THREADS";
115
116/// The blocking executor.
117struct Executor {
118    /// Inner state of the executor.
119    inner: Mutex<Inner>,
120
121    /// Used to put idle threads to sleep and wake them up when new work comes in.
122    cvar: Condvar,
123
124    /// Maximum number of threads in the pool
125    thread_limit: usize,
126}
127
128/// Inner state of the blocking executor.
129struct Inner {
130    /// Number of idle threads in the pool.
131    ///
132    /// Idle threads are sleeping, waiting to get a task to run.
133    idle_count: usize,
134
135    /// Total number of threads in the pool.
136    ///
137    /// This is the number of idle threads + the number of active threads.
138    thread_count: usize,
139
140    /// The queue of blocking tasks.
141    queue: VecDeque<Runnable>,
142}
143
144impl Executor {
145    fn max_threads() -> usize {
146        match env::var(MAX_THREADS_ENV) {
147            Ok(v) => v
148                .parse::<usize>()
                .map(|v| v.clamp(MIN_MAX_THREADS, MAX_MAX_THREADS))
150                .unwrap_or(DEFAULT_MAX_THREADS),
151            Err(_) => DEFAULT_MAX_THREADS,
152        }
153    }
154
155    /// Spawns a future onto this executor.
156    ///
157    /// Returns a [`Task`] handle for the spawned task.
158    fn spawn<T: Send + 'static>(future: impl Future<Output = T> + Send + 'static) -> Task<T> {
159        static EXECUTOR: OnceCell<Executor> = OnceCell::new();
160
161        let (runnable, task) = async_task::spawn(future, |r| {
162            // Initialize the executor if we haven't already.
163            let executor = EXECUTOR.get_or_init_blocking(|| {
164                let thread_limit = Self::max_threads();
165                Executor {
166                    inner: Mutex::new(Inner {
167                        idle_count: 0,
168                        thread_count: 0,
169                        queue: VecDeque::new(),
170                    }),
171                    cvar: Condvar::new(),
172                    thread_limit,
173                }
174            });
175
176            // Schedule the task on our executor.
177            executor.schedule(r)
178        });
179        runnable.schedule();
180        task
181    }
182
183    /// Runs the main loop on the current thread.
184    ///
185    /// This function runs blocking tasks until it becomes idle and times out.
186    fn main_loop(&'static self) {
187        let mut inner = self.inner.lock().unwrap();
188        loop {
189            // This thread is not idle anymore because it's going to run tasks.
190            inner.idle_count -= 1;
191
192            // Run tasks in the queue.
193            while let Some(runnable) = inner.queue.pop_front() {
194                // We have found a task - grow the pool if needed.
195                self.grow_pool(inner);
196
197                // Run the task.
198                panic::catch_unwind(|| runnable.run()).ok();
199
200                // Re-lock the inner state and continue.
201                inner = self.inner.lock().unwrap();
202            }
203
204            // This thread is now becoming idle.
205            inner.idle_count += 1;
206
207            // Put the thread to sleep until another task is scheduled.
208            let timeout = Duration::from_millis(500);
209            let (lock, res) = self.cvar.wait_timeout(inner, timeout).unwrap();
210            inner = lock;
211
212            // If there are no tasks after a while, stop this thread.
213            if res.timed_out() && inner.queue.is_empty() {
214                inner.idle_count -= 1;
215                inner.thread_count -= 1;
216                break;
217            }
218        }
219    }
220
221    /// Schedules a runnable task for execution.
222    fn schedule(&'static self, runnable: Runnable) {
223        let mut inner = self.inner.lock().unwrap();
224        inner.queue.push_back(runnable);
225
226        // Notify a sleeping thread and spawn more threads if needed.
227        self.cvar.notify_one();
228        self.grow_pool(inner);
229    }
230
231    /// Spawns more blocking threads if the pool is overloaded with work.
232    fn grow_pool(&'static self, mut inner: MutexGuard<'static, Inner>) {
233        // If runnable tasks greatly outnumber idle threads and there aren't too many threads
234        // already, then be aggressive: wake all idle threads and spawn one more thread.
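        // For example, with 2 idle threads the pool only grows once more than 10 tasks are queued.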
235        while inner.queue.len() > inner.idle_count * 5 && inner.thread_count < self.thread_limit {
236            // The new thread starts in idle state.
237            inner.idle_count += 1;
238            inner.thread_count += 1;
239
240            // Notify all existing idle threads because we need to hurry up.
241            self.cvar.notify_all();
242
243            // Generate a new thread ID.
244            static ID: AtomicUsize = AtomicUsize::new(1);
245            let id = ID.fetch_add(1, Ordering::Relaxed);
246
247            // Spawn the new thread.
248            thread::Builder::new()
249                .name(format!("blocking-{}", id))
250                .spawn(move || self.main_loop())
251                .unwrap();
252        }
253    }
254}
255
256/// Runs blocking code on a thread pool.
257///
258/// # Examples
259///
260/// Read the contents of a file:
261///
262/// ```no_run
263/// use blocking::unblock;
264/// use std::fs;
265///
266/// # futures_lite::future::block_on(async {
267/// let contents = unblock(|| fs::read_to_string("file.txt")).await?;
268/// # std::io::Result::Ok(()) });
269/// ```
270///
271/// Spawn a process:
272///
273/// ```no_run
274/// use blocking::unblock;
275/// use std::process::Command;
276///
277/// # futures_lite::future::block_on(async {
278/// let out = unblock(|| Command::new("dir").output()).await?;
279/// # std::io::Result::Ok(()) });
280/// ```
281pub fn unblock<T, F>(f: F) -> Task<T>
282where
283    F: FnOnce() -> T + Send + 'static,
284    T: Send + 'static,
285{
286    Executor::spawn(async move { f() })
287}
288
289/// Runs blocking I/O on a thread pool.
290///
291/// Blocking I/O must be isolated from async code. This type moves blocking I/O operations onto a
292/// special thread pool while exposing a familiar async interface.
293///
294/// This type implements traits [`Stream`], [`AsyncRead`], [`AsyncWrite`], or [`AsyncSeek`] if the
295/// inner type implements [`Iterator`], [`Read`], [`Write`], or [`Seek`], respectively.
296///
297/// # Caveats
298///
299/// [`Unblock`] is a low-level primitive, and as such it comes with some caveats.
300///
301/// For higher-level primitives built on top of [`Unblock`], look into [`async-fs`] or
302/// [`async-process`] (on Windows).
303///
304/// [`async-fs`]: https://github.com/smol-rs/async-fs
305/// [`async-process`]: https://github.com/smol-rs/async-process
306///
307/// [`Unblock`] communicates with I/O operations on the thread pool through a pipe. That means an
308/// async read/write operation simply receives/sends some bytes from/into the pipe. When in reading
309/// mode, the thread pool reads bytes from the I/O handle and forwards them into the pipe until it
310/// becomes full. When in writing mode, the thread pool reads bytes from the pipe and forwards them
311/// into the I/O handle.
312///
313/// Use [`Unblock::with_capacity()`] to configure the capacity of the pipe.
314///
315/// ### Reading
316///
317/// If you create an [`Unblock`]`<`[`Stdin`][`std::io::Stdin`]`>`, read some bytes from it,
318/// and then drop it, a blocked read operation may keep hanging on the thread pool. The next
319/// attempt to read from stdin will lose bytes read by the hanging operation. This is a difficult
320/// problem to solve, so make sure you only use a single stdin handle for the duration of the
321/// entire program.
322///
323/// ### Writing
324///
325/// If writing data through the [`AsyncWrite`] trait, make sure to flush before dropping the
326/// [`Unblock`] handle or some buffered data might get lost.
327///
328/// ### Seeking
329///
330/// Because of buffering in the pipe, if [`Unblock`] wraps a [`File`][`std::fs::File`], a single
/// read operation may move the file cursor farther than the span of the operation. In fact,
/// reading just keeps going in the background until the pipe gets full. Keep this in mind when
/// using [`AsyncSeek`] with [relative][`SeekFrom::Current`] offsets.
334///
335/// # Examples
336///
337/// ```
338/// use blocking::Unblock;
339/// use futures_lite::prelude::*;
340///
341/// # futures_lite::future::block_on(async {
342/// let mut stdout = Unblock::new(std::io::stdout());
343/// stdout.write_all(b"Hello world!").await?;
344/// stdout.flush().await?;
345/// # std::io::Result::Ok(()) });
346/// ```
347pub struct Unblock<T> {
348    state: State<T>,
349    cap: Option<usize>,
350}
351
352impl<T> Unblock<T> {
353    /// Wraps a blocking I/O handle into the async [`Unblock`] interface.
354    ///
355    /// # Examples
356    ///
357    /// ```no_run
358    /// use blocking::Unblock;
359    ///
360    /// let stdin = Unblock::new(std::io::stdin());
361    /// ```
362    pub fn new(io: T) -> Unblock<T> {
363        Unblock {
364            state: State::Idle(Some(Box::new(io))),
365            cap: None,
366        }
367    }
368
369    /// Wraps a blocking I/O handle into the async [`Unblock`] interface with a custom buffer
370    /// capacity.
371    ///
372    /// When communicating with the inner [`Stream`]/[`Read`]/[`Write`] type from async code, data
373    /// transferred between blocking and async code goes through a buffer of limited capacity. This
374    /// constructor configures that capacity.
375    ///
376    /// The default capacity is:
377    ///
378    /// * For [`Iterator`] types: 8192 items.
379    /// * For [`Read`]/[`Write`] types: 8 MB.
380    ///
381    /// # Examples
382    ///
383    /// ```no_run
384    /// use blocking::Unblock;
385    ///
386    /// let stdout = Unblock::with_capacity(64 * 1024, std::io::stdout());
387    /// ```
388    pub fn with_capacity(cap: usize, io: T) -> Unblock<T> {
389        Unblock {
390            state: State::Idle(Some(Box::new(io))),
391            cap: Some(cap),
392        }
393    }
394
395    /// Gets a mutable reference to the blocking I/O handle.
396    ///
397    /// This is an async method because the I/O handle might be on the thread pool and needs to
398    /// be moved onto the current thread before we can get a reference to it.
399    ///
400    /// # Examples
401    ///
402    /// ```no_run
403    /// use blocking::{unblock, Unblock};
404    /// use std::fs::File;
405    ///
406    /// # futures_lite::future::block_on(async {
407    /// let file = unblock(|| File::create("file.txt")).await?;
408    /// let mut file = Unblock::new(file);
409    ///
410    /// let metadata = file.get_mut().await.metadata()?;
411    /// # std::io::Result::Ok(()) });
412    /// ```
413    pub async fn get_mut(&mut self) -> &mut T {
414        // Wait for the running task to stop and ignore I/O errors if there are any.
415        future::poll_fn(|cx| self.poll_stop(cx)).await.ok();
416
417        // Assume idle state and get a reference to the inner value.
418        match &mut self.state {
419            State::Idle(t) => t.as_mut().expect("inner value was taken out"),
420            State::WithMut(..)
421            | State::Streaming(..)
422            | State::Reading(..)
423            | State::Writing(..)
424            | State::Seeking(..) => {
425                unreachable!("when stopped, the state machine must be in idle state");
426            }
427        }
428    }
429
430    /// Performs a blocking operation on the I/O handle.
431    ///
432    /// # Examples
433    ///
434    /// ```no_run
435    /// use blocking::{unblock, Unblock};
436    /// use std::fs::File;
437    ///
438    /// # futures_lite::future::block_on(async {
439    /// let file = unblock(|| File::create("file.txt")).await?;
440    /// let mut file = Unblock::new(file);
441    ///
442    /// let metadata = file.with_mut(|f| f.metadata()).await?;
443    /// # std::io::Result::Ok(()) });
444    /// ```
445    pub async fn with_mut<R, F>(&mut self, op: F) -> R
446    where
447        F: FnOnce(&mut T) -> R + Send + 'static,
448        R: Send + 'static,
449        T: Send + 'static,
450    {
451        // Wait for the running task to stop and ignore I/O errors if there are any.
452        future::poll_fn(|cx| self.poll_stop(cx)).await.ok();
453
454        // Assume idle state and take out the inner value.
455        let mut t = match &mut self.state {
456            State::Idle(t) => t.take().expect("inner value was taken out"),
457            State::WithMut(..)
458            | State::Streaming(..)
459            | State::Reading(..)
460            | State::Writing(..)
461            | State::Seeking(..) => {
462                unreachable!("when stopped, the state machine must be in idle state");
463            }
464        };
465
466        let (sender, receiver) = bounded(1);
467        let task = Executor::spawn(async move {
468            sender.try_send(op(&mut t)).ok();
469            t
470        });
471        self.state = State::WithMut(task);
472
473        receiver
474            .recv()
475            .await
476            .expect("`Unblock::with_mut()` operation has panicked")
477    }
478
479    /// Extracts the inner blocking I/O handle.
480    ///
481    /// This is an async method because the I/O handle might be on the thread pool and needs to
482    /// be moved onto the current thread before we can extract it.
483    ///
484    /// # Examples
485    ///
486    /// ```no_run
487    /// use blocking::{unblock, Unblock};
488    /// use futures_lite::prelude::*;
489    /// use std::fs::File;
490    ///
491    /// # futures_lite::future::block_on(async {
492    /// let file = unblock(|| File::create("file.txt")).await?;
493    /// let file = Unblock::new(file);
494    ///
495    /// let file = file.into_inner().await;
496    /// # std::io::Result::Ok(()) });
497    /// ```
498    pub async fn into_inner(self) -> T {
499        // There's a bug in rustdoc causing it to render `mut self` as `__arg0: Self`, so we just
500        // bind `self` to a local mutable variable.
501        let mut this = self;
502
503        // Wait for the running task to stop and ignore I/O errors if there are any.
504        future::poll_fn(|cx| this.poll_stop(cx)).await.ok();
505
506        // Assume idle state and extract the inner value.
507        match &mut this.state {
508            State::Idle(t) => *t.take().expect("inner value was taken out"),
509            State::WithMut(..)
510            | State::Streaming(..)
511            | State::Reading(..)
512            | State::Writing(..)
513            | State::Seeking(..) => {
514                unreachable!("when stopped, the state machine must be in idle state");
515            }
516        }
517    }
518
519    /// Waits for the running task to stop.
520    ///
521    /// On success, the state machine is moved into the idle state.
522    fn poll_stop(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
523        loop {
524            match &mut self.state {
525                State::Idle(_) => return Poll::Ready(Ok(())),
526
527                State::WithMut(task) => {
528                    // Poll the task to wait for it to finish.
529                    let io = ready!(Pin::new(task).poll(cx));
530                    self.state = State::Idle(Some(io));
531                }
532
533                State::Streaming(any, task) => {
534                    // Drop the receiver to close the channel. This stops the `send()` operation in
535                    // the task, after which the task returns the iterator back.
536                    any.take();
537
538                    // Poll the task to retrieve the iterator.
539                    let iter = ready!(Pin::new(task).poll(cx));
540                    self.state = State::Idle(Some(iter));
541                }
542
543                State::Reading(reader, task) => {
544                    // Drop the reader to close the pipe. This stops copying inside the task, after
545                    // which the task returns the I/O handle back.
546                    reader.take();
547
548                    // Poll the task to retrieve the I/O handle.
549                    let (res, io) = ready!(Pin::new(task).poll(cx));
550                    // Make sure to move into the idle state before reporting errors.
551                    self.state = State::Idle(Some(io));
552                    res?;
553                }
554
555                State::Writing(writer, task) => {
                    // Drop the writer to close the pipe. This stops copying inside the task, after
                    // which the task flushes the I/O handle and returns it back.
558                    writer.take();
559
560                    // Poll the task to retrieve the I/O handle.
561                    let (res, io) = ready!(Pin::new(task).poll(cx));
562                    // Make sure to move into the idle state before reporting errors.
563                    self.state = State::Idle(Some(io));
564                    res?;
565                }
566
567                State::Seeking(task) => {
568                    // Poll the task to wait for it to finish.
569                    let (_, res, io) = ready!(Pin::new(task).poll(cx));
570                    // Make sure to move into the idle state before reporting errors.
571                    self.state = State::Idle(Some(io));
572                    res?;
573                }
574            }
575        }
576    }
577}
578
579impl<T: fmt::Debug> fmt::Debug for Unblock<T> {
580    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
581        struct Closed;
582        impl fmt::Debug for Closed {
583            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
584                f.write_str("<closed>")
585            }
586        }
587
588        struct Blocked;
589        impl fmt::Debug for Blocked {
590            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
591                f.write_str("<blocked>")
592            }
593        }
594
595        match &self.state {
596            State::Idle(None) => f.debug_struct("Unblock").field("io", &Closed).finish(),
597            State::Idle(Some(io)) => {
598                let io: &T = io;
599                f.debug_struct("Unblock").field("io", io).finish()
600            }
601            State::WithMut(..)
602            | State::Streaming(..)
603            | State::Reading(..)
604            | State::Writing(..)
605            | State::Seeking(..) => f.debug_struct("Unblock").field("io", &Blocked).finish(),
606        }
607    }
608}
609
610/// Current state of a blocking task.
611enum State<T> {
612    /// There is no blocking task.
613    ///
614    /// The inner value is readily available, unless it has already been extracted. The value is
615    /// extracted out by [`Unblock::into_inner()`], [`AsyncWrite::poll_close()`], or by awaiting
616    /// [`Unblock`].
617    Idle(Option<Box<T>>),
618
619    /// A [`Unblock::with_mut()`] closure was spawned and is still running.
620    WithMut(Task<Box<T>>),
621
622    /// The inner value is an [`Iterator`] currently iterating in a task.
623    ///
624    /// The `dyn Any` value here is a `mpsc::Receiver<<T as Iterator>::Item>`.
625    Streaming(Option<Box<dyn Any + Send + Sync>>, Task<Box<T>>),
626
627    /// The inner value is a [`Read`] currently reading in a task.
628    Reading(Option<Reader>, Task<(io::Result<()>, Box<T>)>),
629
630    /// The inner value is a [`Write`] currently writing in a task.
631    Writing(Option<Writer>, Task<(io::Result<()>, Box<T>)>),
632
633    /// The inner value is a [`Seek`] currently seeking in a task.
634    Seeking(Task<(SeekFrom, io::Result<u64>, Box<T>)>),
635}
636
637impl<T: Iterator + Send + 'static> Stream for Unblock<T>
638where
639    T::Item: Send + 'static,
640{
641    type Item = T::Item;
642
643    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>> {
644        loop {
645            match &mut self.state {
646                // If not in idle or active streaming state, stop the running task.
647                State::WithMut(..)
648                | State::Streaming(None, _)
649                | State::Reading(..)
650                | State::Writing(..)
651                | State::Seeking(..) => {
652                    // Wait for the running task to stop.
653                    ready!(self.poll_stop(cx)).ok();
654                }
655
656                // If idle, start a streaming task.
657                State::Idle(iter) => {
658                    // Take the iterator out to run it on a blocking task.
659                    let mut iter = iter.take().expect("inner iterator was taken out");
660
661                    // This channel capacity seems to work well in practice. If it's too low, there
662                    // will be too much synchronization between tasks. If too high, memory
663                    // consumption increases.
664                    let (sender, receiver) = bounded(self.cap.unwrap_or(8 * 1024)); // 8192 items
665
666                    // Spawn a blocking task that runs the iterator and returns it when done.
667                    let task = Executor::spawn(async move {
668                        for item in &mut iter {
669                            if sender.send(item).await.is_err() {
670                                break;
671                            }
672                        }
673                        iter
674                    });
675
676                    // Move into the busy state and poll again.
677                    self.state = State::Streaming(Some(Box::new(receiver)), task);
678                }
679
680                // If streaming, receive an item.
681                State::Streaming(Some(any), task) => {
682                    let receiver = any.downcast_mut::<Receiver<T::Item>>().unwrap();
683
684                    // Poll the channel.
685                    let opt = ready!(Pin::new(receiver).poll_next(cx));
686
687                    // If the channel is closed, retrieve the iterator back from the blocking task.
688                    // This is not really a required step, but it's cleaner to drop the iterator on
689                    // the same thread that created it.
690                    if opt.is_none() {
691                        // Poll the task to retrieve the iterator.
692                        let iter = ready!(Pin::new(task).poll(cx));
693                        self.state = State::Idle(Some(iter));
694                    }
695
696                    return Poll::Ready(opt);
697                }
698            }
699        }
700    }
701}
702
703impl<T: Read + Send + 'static> AsyncRead for Unblock<T> {
704    fn poll_read(
705        mut self: Pin<&mut Self>,
706        cx: &mut Context<'_>,
707        buf: &mut [u8],
708    ) -> Poll<io::Result<usize>> {
709        loop {
710            match &mut self.state {
711                // If not in idle or active reading state, stop the running task.
712                State::WithMut(..)
713                | State::Reading(None, _)
714                | State::Streaming(..)
715                | State::Writing(..)
716                | State::Seeking(..) => {
717                    // Wait for the running task to stop.
718                    ready!(self.poll_stop(cx))?;
719                }
720
721                // If idle, start a reading task.
722                State::Idle(io) => {
723                    // Take the I/O handle out to read it on a blocking task.
724                    let mut io = io.take().expect("inner value was taken out");
725
726                    // This pipe capacity seems to work well in practice. If it's too low, there
727                    // will be too much synchronization between tasks. If too high, memory
728                    // consumption increases.
729                    let (reader, mut writer) = pipe(self.cap.unwrap_or(8 * 1024 * 1024)); // 8 MB
730
731                    // Spawn a blocking task that reads and returns the I/O handle when done.
732                    let task = Executor::spawn(async move {
733                        // Copy bytes from the I/O handle into the pipe until the pipe is closed or
734                        // an error occurs.
735                        loop {
736                            match future::poll_fn(|cx| writer.fill(cx, &mut io)).await {
737                                Ok(0) => return (Ok(()), io),
738                                Ok(_) => {}
739                                Err(err) => return (Err(err), io),
740                            }
741                        }
742                    });
743
744                    // Move into the busy state and poll again.
745                    self.state = State::Reading(Some(reader), task);
746                }
747
748                // If reading, read bytes from the pipe.
749                State::Reading(Some(reader), task) => {
750                    // Poll the pipe.
751                    let n = ready!(reader.drain(cx, buf))?;
752
753                    // If the pipe is closed, retrieve the I/O handle back from the blocking task.
754                    // This is not really a required step, but it's cleaner to drop the handle on
755                    // the same thread that created it.
756                    if n == 0 {
757                        // Poll the task to retrieve the I/O handle.
758                        let (res, io) = ready!(Pin::new(task).poll(cx));
759                        // Make sure to move into the idle state before reporting errors.
760                        self.state = State::Idle(Some(io));
761                        res?;
762                    }
763
764                    return Poll::Ready(Ok(n));
765                }
766            }
767        }
768    }
769}
770
771impl<T: Write + Send + 'static> AsyncWrite for Unblock<T> {
772    fn poll_write(
773        mut self: Pin<&mut Self>,
774        cx: &mut Context<'_>,
775        buf: &[u8],
776    ) -> Poll<io::Result<usize>> {
777        loop {
778            match &mut self.state {
779                // If not in idle or active writing state, stop the running task.
780                State::WithMut(..)
781                | State::Writing(None, _)
782                | State::Streaming(..)
783                | State::Reading(..)
784                | State::Seeking(..) => {
785                    // Wait for the running task to stop.
786                    ready!(self.poll_stop(cx))?;
787                }
788
789                // If idle, start the writing task.
790                State::Idle(io) => {
791                    // Take the I/O handle out to write on a blocking task.
792                    let mut io = io.take().expect("inner value was taken out");
793
794                    // This pipe capacity seems to work well in practice. If it's too low, there will
795                    // be too much synchronization between tasks. If too high, memory consumption
796                    // increases.
797                    let (mut reader, writer) = pipe(self.cap.unwrap_or(8 * 1024 * 1024)); // 8 MB
798
799                    // Spawn a blocking task that writes and returns the I/O handle when done.
800                    let task = Executor::spawn(async move {
801                        // Copy bytes from the pipe into the I/O handle until the pipe is closed or an
802                        // error occurs. Flush the I/O handle at the end.
803                        loop {
804                            match future::poll_fn(|cx| reader.drain(cx, &mut io)).await {
805                                Ok(0) => return (io.flush(), io),
806                                Ok(_) => {}
807                                Err(err) => {
808                                    io.flush().ok();
809                                    return (Err(err), io);
810                                }
811                            }
812                        }
813                    });
814
815                    // Move into the busy state and poll again.
816                    self.state = State::Writing(Some(writer), task);
817                }
818
819                // If writing, write more bytes into the pipe.
820                State::Writing(Some(writer), _) => return writer.fill(cx, buf),
821            }
822        }
823    }
824
825    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
826        loop {
827            match &mut self.state {
828                // If not in idle state, stop the running task.
829                State::WithMut(..)
830                | State::Streaming(..)
831                | State::Writing(..)
832                | State::Reading(..)
833                | State::Seeking(..) => {
834                    // Wait for the running task to stop.
835                    ready!(self.poll_stop(cx))?;
836                }
837
838                // Idle implies flushed.
839                State::Idle(_) => return Poll::Ready(Ok(())),
840            }
841        }
842    }
843
844    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
845        // First, make sure the I/O handle is flushed.
846        ready!(Pin::new(&mut self).poll_flush(cx))?;
847
848        // Then move into the idle state with no I/O handle, thus dropping it.
849        self.state = State::Idle(None);
850        Poll::Ready(Ok(()))
851    }
852}
853
854impl<T: Seek + Send + 'static> AsyncSeek for Unblock<T> {
855    fn poll_seek(
856        mut self: Pin<&mut Self>,
857        cx: &mut Context<'_>,
858        pos: SeekFrom,
859    ) -> Poll<io::Result<u64>> {
860        loop {
861            match &mut self.state {
862                // If not in idle state, stop the running task.
863                State::WithMut(..)
864                | State::Streaming(..)
865                | State::Reading(..)
866                | State::Writing(..) => {
867                    // Wait for the running task to stop.
868                    ready!(self.poll_stop(cx))?;
869                }
870
871                State::Idle(io) => {
872                    // Take the I/O handle out to seek on a blocking task.
873                    let mut io = io.take().expect("inner value was taken out");
874
875                    let task = Executor::spawn(async move {
876                        let res = io.seek(pos);
877                        (pos, res, io)
878                    });
879                    self.state = State::Seeking(task);
880                }
881
882                State::Seeking(task) => {
883                    // Poll the task to wait for it to finish.
884                    let (original_pos, res, io) = ready!(Pin::new(task).poll(cx));
885                    // Make sure to move into the idle state before reporting errors.
886                    self.state = State::Idle(Some(io));
887                    let current = res?;
888
889                    // If the `pos` argument matches the original one, return the result.
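                    // Otherwise the caller is now requesting a different seek, so loop back around
                    // and start a new seek operation from the idle state.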
890                    if original_pos == pos {
891                        return Poll::Ready(Ok(current));
892                    }
893                }
894            }
895        }
896    }
897}
898
899/// Creates a bounded single-producer single-consumer pipe.
900///
901/// A pipe is a ring buffer of `cap` bytes that can be asynchronously read from and written to.
902///
/// When the writer is dropped, remaining bytes in the pipe can still be read. After that, attempts
/// to read will result in `Ok(0)`, i.e. they will always 'successfully' read 0 bytes.
///
/// When the reader is dropped, the pipe is closed and no more bytes can be written into it.
/// Further writes will result in `Ok(0)`, i.e. they will always 'successfully' write 0 bytes.
908fn pipe(cap: usize) -> (Reader, Writer) {
909    assert!(cap > 0, "capacity must be positive");
910    assert!(cap.checked_mul(2).is_some(), "capacity is too large");
911
912    // Allocate the ring buffer.
913    let mut v = Vec::with_capacity(cap);
914    let buffer = v.as_mut_ptr();
915    mem::forget(v);
916
917    let inner = Arc::new(Pipe {
918        head: AtomicUsize::new(0),
919        tail: AtomicUsize::new(0),
920        reader: AtomicWaker::new(),
921        writer: AtomicWaker::new(),
922        closed: AtomicBool::new(false),
923        buffer,
924        cap,
925    });
926
927    let r = Reader {
928        inner: inner.clone(),
929        head: 0,
930        tail: 0,
931    };
932
933    let w = Writer {
934        inner,
935        head: 0,
936        tail: 0,
937        zeroed_until: 0,
938    };
939
940    (r, w)
941}
942
943/// The reading side of a pipe.
944struct Reader {
945    /// The inner ring buffer.
946    inner: Arc<Pipe>,
947
948    /// The head index, moved by the reader, in the range `0..2*cap`.
949    ///
950    /// This index always matches `inner.head`.
951    head: usize,
952
953    /// The tail index, moved by the writer, in the range `0..2*cap`.
954    ///
    /// This index is a snapshot of `inner.tail` that might become stale at any point.
956    tail: usize,
957}
958
959/// The writing side of a pipe.
960struct Writer {
961    /// The inner ring buffer.
962    inner: Arc<Pipe>,
963
964    /// The head index, moved by the reader, in the range `0..2*cap`.
965    ///
    /// This index is a snapshot of `inner.head` that might become stale at any point.
967    head: usize,
968
969    /// The tail index, moved by the writer, in the range `0..2*cap`.
970    ///
971    /// This index always matches `inner.tail`.
972    tail: usize,
973
974    /// How many bytes at the beginning of the buffer have been zeroed.
975    ///
976    /// The pipe allocates an uninitialized buffer, and we must be careful about passing
977    /// uninitialized data to user code. Zeroing the buffer right after allocation would be too
978    /// expensive, so we zero it in smaller chunks as the writer makes progress.
979    zeroed_until: usize,
980}
981
982unsafe impl Send for Reader {}
983unsafe impl Send for Writer {}
984
985/// The inner ring buffer.
986///
987/// Head and tail indices are in the range `0..2*cap`, even though they really map onto the
988/// `0..cap` range. The distance between head and tail indices is never more than `cap`.
989///
990/// The reason why indices are not in the range `0..cap` is because we need to distinguish between
991/// the pipe being empty and being full. If head and tail were in `0..cap`, then `head == tail`
992/// could mean the pipe is either empty or full, but we don't know which!
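///
/// For example, with `cap = 4`: `head == 1` and `tail == 5` means the pipe holds 4 bytes (full),
/// while `head == tail` means it is empty.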
993struct Pipe {
994    /// The head index, moved by the reader, in the range `0..2*cap`.
995    head: AtomicUsize,
996
997    /// The tail index, moved by the writer, in the range `0..2*cap`.
998    tail: AtomicUsize,
999
1000    /// A waker representing the blocked reader.
1001    reader: AtomicWaker,
1002
1003    /// A waker representing the blocked writer.
1004    writer: AtomicWaker,
1005
1006    /// Set to `true` if the reader or writer was dropped.
1007    closed: AtomicBool,
1008
1009    /// The byte buffer.
1010    buffer: *mut u8,
1011
1012    /// The buffer capacity.
1013    cap: usize,
1014}
1015
1016unsafe impl Sync for Pipe {}
1017unsafe impl Send for Pipe {}
1018
1019impl Drop for Pipe {
1020    fn drop(&mut self) {
1021        // Deallocate the byte buffer.
1022        unsafe {
1023            Vec::from_raw_parts(self.buffer, 0, self.cap);
1024        }
1025    }
1026}
1027
1028impl Drop for Reader {
1029    fn drop(&mut self) {
1030        // Dropping closes the pipe and then wakes the writer.
1031        self.inner.closed.store(true, Ordering::SeqCst);
1032        self.inner.writer.wake();
1033    }
1034}
1035
1036impl Drop for Writer {
1037    fn drop(&mut self) {
1038        // Dropping closes the pipe and then wakes the reader.
1039        self.inner.closed.store(true, Ordering::SeqCst);
1040        self.inner.reader.wake();
1041    }
1042}
1043
1044impl Reader {
1045    /// Reads bytes from this reader and writes into blocking `dest`.
1046    fn drain(&mut self, cx: &mut Context<'_>, mut dest: impl Write) -> Poll<io::Result<usize>> {
1047        let cap = self.inner.cap;
1048
1049        // Calculates the distance between two indices.
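        // For example, with `cap = 4`, `distance(6, 1) == 2 * 4 - (6 - 1) == 3` bytes in the pipe.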
1050        let distance = |a: usize, b: usize| {
1051            if a <= b {
1052                b - a
1053            } else {
1054                2 * cap - (a - b)
1055            }
1056        };
1057
1058        // If the pipe appears to be empty...
1059        if distance(self.head, self.tail) == 0 {
1060            // Reload the tail in case it's become stale.
1061            self.tail = self.inner.tail.load(Ordering::Acquire);
1062
1063            // If the pipe is now really empty...
1064            if distance(self.head, self.tail) == 0 {
1065                // Register the waker.
1066                self.inner.reader.register(cx.waker());
1067                atomic::fence(Ordering::SeqCst);
1068
1069                // Reload the tail after registering the waker.
1070                self.tail = self.inner.tail.load(Ordering::Acquire);
1071
1072                // If the pipe is still empty...
1073                if distance(self.head, self.tail) == 0 {
1074                    // Check whether the pipe is closed or just empty.
1075                    if self.inner.closed.load(Ordering::Relaxed) {
1076                        return Poll::Ready(Ok(0));
1077                    } else {
1078                        return Poll::Pending;
1079                    }
1080                }
1081            }
1082        }
1083
1084        // The pipe is not empty so remove the waker.
1085        self.inner.reader.take();
1086
1087        // Yield with some small probability - this improves fairness.
1088        ready!(maybe_yield(cx));
1089
1090        // Given an index in `0..2*cap`, returns the real index in `0..cap`.
1091        let real_index = |i: usize| {
1092            if i < cap {
1093                i
1094            } else {
1095                i - cap
1096            }
1097        };
1098
1099        // Number of bytes read so far.
1100        let mut count = 0;
1101
1102        loop {
1103            // Calculate how many bytes to read in this iteration.
1104            let n = (128 * 1024) // Not too many bytes in one go - better to wake the writer soon!
1105                .min(distance(self.head, self.tail)) // No more than bytes in the pipe.
1106                .min(cap - real_index(self.head)); // Don't go past the buffer boundary.
1107
1108            // Create a slice of data in the pipe buffer.
1109            let pipe_slice =
1110                unsafe { slice::from_raw_parts(self.inner.buffer.add(real_index(self.head)), n) };
1111
1112            // Copy bytes from the pipe buffer into `dest`.
1113            let n = dest.write(pipe_slice)?;
1114            count += n;
1115
1116            // If pipe is empty or `dest` is full, return.
1117            if n == 0 {
1118                return Poll::Ready(Ok(count));
1119            }
1120
1121            // Move the head forward.
1122            if self.head + n < 2 * cap {
1123                self.head += n;
1124            } else {
1125                self.head = 0;
1126            }
1127
1128            // Store the current head index.
1129            self.inner.head.store(self.head, Ordering::Release);
1130
1131            // Wake the writer because the pipe is not full.
1132            self.inner.writer.wake();
1133        }
1134    }
1135}
1136
1137impl Writer {
1138    /// Reads bytes from blocking `src` and writes into this writer.
1139    fn fill(&mut self, cx: &mut Context<'_>, mut src: impl Read) -> Poll<io::Result<usize>> {
1140        // Just a quick check if the pipe is closed, which is why a relaxed load is okay.
1141        if self.inner.closed.load(Ordering::Relaxed) {
1142            return Poll::Ready(Ok(0));
1143        }
1144
1145        // Calculates the distance between two indices.
1146        let cap = self.inner.cap;
1147        let distance = |a: usize, b: usize| {
1148            if a <= b {
1149                b - a
1150            } else {
1151                2 * cap - (a - b)
1152            }
1153        };
1154
1155        // If the pipe appears to be full...
1156        if distance(self.head, self.tail) == cap {
1157            // Reload the head in case it's become stale.
1158            self.head = self.inner.head.load(Ordering::Acquire);
1159
            // If the pipe is now really full...
1161            if distance(self.head, self.tail) == cap {
1162                // Register the waker.
1163                self.inner.writer.register(cx.waker());
1164                atomic::fence(Ordering::SeqCst);
1165
1166                // Reload the head after registering the waker.
1167                self.head = self.inner.head.load(Ordering::Acquire);
1168
1169                // If the pipe is still full...
1170                if distance(self.head, self.tail) == cap {
1171                    // Check whether the pipe is closed or just full.
1172                    if self.inner.closed.load(Ordering::Relaxed) {
1173                        return Poll::Ready(Ok(0));
1174                    } else {
1175                        return Poll::Pending;
1176                    }
1177                }
1178            }
1179        }
1180
1181        // The pipe is not full so remove the waker.
1182        self.inner.writer.take();
1183
1184        // Yield with some small probability - this improves fairness.
1185        ready!(maybe_yield(cx));
1186
1187        // Given an index in `0..2*cap`, returns the real index in `0..cap`.
1188        let real_index = |i: usize| {
1189            if i < cap {
1190                i
1191            } else {
1192                i - cap
1193            }
1194        };
1195
1196        // Number of bytes written so far.
1197        let mut count = 0;
1198
1199        loop {
1200            // Calculate how many bytes to write in this iteration.
1201            let n = (128 * 1024) // Not too many bytes in one go - better to wake the reader soon!
1202                .min(self.zeroed_until * 2 + 4096) // Don't zero too many bytes when starting.
1203                .min(cap - distance(self.head, self.tail)) // No more than space in the pipe.
1204                .min(cap - real_index(self.tail)); // Don't go past the buffer boundary.
1205
1206            // Create a slice of available space in the pipe buffer.
1207            let pipe_slice_mut = unsafe {
1208                let from = real_index(self.tail);
1209                let to = from + n;
1210
1211                // Make sure all bytes in the slice are initialized.
1212                if self.zeroed_until < to {
1213                    self.inner
1214                        .buffer
1215                        .add(self.zeroed_until)
1216                        .write_bytes(0u8, to - self.zeroed_until);
1217                    self.zeroed_until = to;
1218                }
1219
1220                slice::from_raw_parts_mut(self.inner.buffer.add(from), n)
1221            };
1222
            // Copy bytes from `src` into the pipe buffer.
1224            let n = src.read(pipe_slice_mut)?;
1225            count += n;
1226
1227            // If the pipe is full or closed, or `src` is empty, return.
1228            if n == 0 || self.inner.closed.load(Ordering::Relaxed) {
1229                return Poll::Ready(Ok(count));
1230            }
1231
1232            // Move the tail forward.
1233            if self.tail + n < 2 * cap {
1234                self.tail += n;
1235            } else {
1236                self.tail = 0;
1237            }
1238
1239            // Store the current tail index.
1240            self.inner.tail.store(self.tail, Ordering::Release);
1241
1242            // Wake the reader because the pipe is not empty.
1243            self.inner.reader.wake();
1244        }
1245    }
1246}
1247
/// Yields with a small probability (roughly 1 in 100 calls) to improve fairness.
1249fn maybe_yield(cx: &mut Context<'_>) -> Poll<()> {
1250    if fastrand::usize(..100) == 0 {
1251        cx.waker().wake_by_ref();
1252        Poll::Pending
1253    } else {
1254        Poll::Ready(())
1255    }
1256}
1257
1258#[cfg(test)]
1259mod tests {
1260    use super::*;
1261    #[test]
1262    fn test_max_threads() {
1263        // properly set env var
1264        env::set_var(MAX_THREADS_ENV, "100");
1265        assert_eq!(100, Executor::max_threads());
1266
1267        // passed value below minimum, so we set it to minimum
1268        env::set_var(MAX_THREADS_ENV, "0");
1269        assert_eq!(1, Executor::max_threads());
1270
1271        // passed value above maximum, so we set to allowed maximum
1272        env::set_var(MAX_THREADS_ENV, "50000");
1273        assert_eq!(10000, Executor::max_threads());
1274
        // empty value is not a valid number, so use the default
1276        env::set_var(MAX_THREADS_ENV, "");
1277        assert_eq!(500, Executor::max_threads());
1278
1279        // not a number, use default
1280        env::set_var(MAX_THREADS_ENV, "NOTINT");
1281        assert_eq!(500, Executor::max_threads());
1282    }
1283}