blocking/lib.rs
1//! A thread pool for isolating blocking I/O in async programs.
2//!
3//! Sometimes there's no way to avoid blocking I/O. Consider files or stdin, which have weak async
4//! support on modern operating systems. While [IOCP], [AIO], and [io_uring] are possible
5//! solutions, they're not always available or ideal.
6//!
7//! Since blocking is not allowed inside futures, we must move blocking I/O onto a special thread
8//! pool provided by this crate. The pool dynamically spawns and stops threads depending on the
9//! current number of running I/O jobs.
10//!
11//! Note that there is a limit on the number of active threads. Once that limit is hit, a running
12//! job has to finish before others get a chance to run. When a thread is idle, it waits for the
13//! next job or shuts down after a certain timeout.
14//!
//! The default number of threads (set to 500) can be altered by setting the
//! `BLOCKING_MAX_THREADS` environment variable to a value between 1 and 10000. Values outside
//! that range are clamped to it, and values that are not an integer are ignored.
17//!
18//! [IOCP]: https://en.wikipedia.org/wiki/Input/output_completion_port
19//! [AIO]: http://man7.org/linux/man-pages/man2/io_submit.2.html
20//! [io_uring]: https://lwn.net/Articles/776703
21//!
22//! # Examples
23//!
24//! Read the contents of a file:
25//!
26//! ```no_run
27//! use blocking::unblock;
28//! use std::fs;
29//!
30//! # futures_lite::future::block_on(async {
31//! let contents = unblock(|| fs::read_to_string("file.txt")).await?;
32//! println!("{}", contents);
33//! # std::io::Result::Ok(()) });
34//! ```
35//!
36//! Read a file and pipe its contents to stdout:
37//!
38//! ```no_run
39//! use blocking::{unblock, Unblock};
40//! use futures_lite::io;
41//! use std::fs::File;
42//!
43//! # futures_lite::future::block_on(async {
44//! let input = unblock(|| File::open("file.txt")).await?;
45//! let input = Unblock::new(input);
46//! let mut output = Unblock::new(std::io::stdout());
47//!
48//! io::copy(input, &mut output).await?;
49//! # std::io::Result::Ok(()) });
50//! ```
51//!
52//! Iterate over the contents of a directory:
53//!
54//! ```no_run
55//! use blocking::Unblock;
56//! use futures_lite::prelude::*;
57//! use std::fs;
58//!
59//! # futures_lite::future::block_on(async {
60//! let mut dir = Unblock::new(fs::read_dir(".")?);
61//! while let Some(item) = dir.next().await {
62//! println!("{}", item?.file_name().to_string_lossy());
63//! }
64//! # std::io::Result::Ok(()) });
65//! ```
66//!
67//! Spawn a process:
68//!
69//! ```no_run
70//! use blocking::unblock;
71//! use std::process::Command;
72//!
73//! # futures_lite::future::block_on(async {
74//! let out = unblock(|| Command::new("dir").output()).await?;
75//! # std::io::Result::Ok(()) });
76//! ```
77
78#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
79
80use std::any::Any;
81use std::collections::VecDeque;
82use std::env;
83use std::fmt;
84use std::io::{self, Read, Seek, SeekFrom, Write};
85use std::mem;
86use std::panic;
87use std::pin::Pin;
88use std::slice;
89use std::sync::atomic::{self, AtomicBool, AtomicUsize, Ordering};
90use std::sync::{Arc, Condvar, Mutex, MutexGuard};
91use std::task::{Context, Poll};
92use std::thread;
93use std::time::Duration;
94
95use async_channel::{bounded, Receiver};
96use async_lock::OnceCell;
97use async_task::Runnable;
98use atomic_waker::AtomicWaker;
99use futures_lite::{future, prelude::*, ready};
100
101#[doc(no_inline)]
102pub use async_task::Task;
103
/// Upper bound on pool threads used when `BLOCKING_MAX_THREADS` is unset or not a valid integer.
const DEFAULT_MAX_THREADS: usize = 500;

/// Lowest thread limit `BLOCKING_MAX_THREADS` may configure; smaller values are raised to this.
const MIN_MAX_THREADS: usize = 1;

/// Highest thread limit `BLOCKING_MAX_THREADS` may configure; larger values are lowered to this.
const MAX_MAX_THREADS: usize = 10_000;

/// Name of the environment variable that overrides [`DEFAULT_MAX_THREADS`].
const MAX_THREADS_ENV: &str = "BLOCKING_MAX_THREADS";
115
116/// The blocking executor.
117struct Executor {
118 /// Inner state of the executor.
119 inner: Mutex<Inner>,
120
121 /// Used to put idle threads to sleep and wake them up when new work comes in.
122 cvar: Condvar,
123
124 /// Maximum number of threads in the pool
125 thread_limit: usize,
126}
127
128/// Inner state of the blocking executor.
129struct Inner {
130 /// Number of idle threads in the pool.
131 ///
132 /// Idle threads are sleeping, waiting to get a task to run.
133 idle_count: usize,
134
135 /// Total number of threads in the pool.
136 ///
137 /// This is the number of idle threads + the number of active threads.
138 thread_count: usize,
139
140 /// The queue of blocking tasks.
141 queue: VecDeque<Runnable>,
142}
143
144impl Executor {
145 fn max_threads() -> usize {
146 match env::var(MAX_THREADS_ENV) {
147 Ok(v) => v
148 .parse::<usize>()
149 .map(|v| v.max(MIN_MAX_THREADS).min(MAX_MAX_THREADS))
150 .unwrap_or(DEFAULT_MAX_THREADS),
151 Err(_) => DEFAULT_MAX_THREADS,
152 }
153 }
154
155 /// Spawns a future onto this executor.
156 ///
157 /// Returns a [`Task`] handle for the spawned task.
158 fn spawn<T: Send + 'static>(future: impl Future<Output = T> + Send + 'static) -> Task<T> {
159 static EXECUTOR: OnceCell<Executor> = OnceCell::new();
160
161 let (runnable, task) = async_task::spawn(future, |r| {
162 // Initialize the executor if we haven't already.
163 let executor = EXECUTOR.get_or_init_blocking(|| {
164 let thread_limit = Self::max_threads();
165 Executor {
166 inner: Mutex::new(Inner {
167 idle_count: 0,
168 thread_count: 0,
169 queue: VecDeque::new(),
170 }),
171 cvar: Condvar::new(),
172 thread_limit,
173 }
174 });
175
176 // Schedule the task on our executor.
177 executor.schedule(r)
178 });
179 runnable.schedule();
180 task
181 }
182
183 /// Runs the main loop on the current thread.
184 ///
185 /// This function runs blocking tasks until it becomes idle and times out.
186 fn main_loop(&'static self) {
187 let mut inner = self.inner.lock().unwrap();
188 loop {
189 // This thread is not idle anymore because it's going to run tasks.
190 inner.idle_count -= 1;
191
192 // Run tasks in the queue.
193 while let Some(runnable) = inner.queue.pop_front() {
194 // We have found a task - grow the pool if needed.
195 self.grow_pool(inner);
196
197 // Run the task.
198 panic::catch_unwind(|| runnable.run()).ok();
199
200 // Re-lock the inner state and continue.
201 inner = self.inner.lock().unwrap();
202 }
203
204 // This thread is now becoming idle.
205 inner.idle_count += 1;
206
207 // Put the thread to sleep until another task is scheduled.
208 let timeout = Duration::from_millis(500);
209 let (lock, res) = self.cvar.wait_timeout(inner, timeout).unwrap();
210 inner = lock;
211
212 // If there are no tasks after a while, stop this thread.
213 if res.timed_out() && inner.queue.is_empty() {
214 inner.idle_count -= 1;
215 inner.thread_count -= 1;
216 break;
217 }
218 }
219 }
220
221 /// Schedules a runnable task for execution.
222 fn schedule(&'static self, runnable: Runnable) {
223 let mut inner = self.inner.lock().unwrap();
224 inner.queue.push_back(runnable);
225
226 // Notify a sleeping thread and spawn more threads if needed.
227 self.cvar.notify_one();
228 self.grow_pool(inner);
229 }
230
231 /// Spawns more blocking threads if the pool is overloaded with work.
232 fn grow_pool(&'static self, mut inner: MutexGuard<'static, Inner>) {
233 // If runnable tasks greatly outnumber idle threads and there aren't too many threads
234 // already, then be aggressive: wake all idle threads and spawn one more thread.
235 while inner.queue.len() > inner.idle_count * 5 && inner.thread_count < self.thread_limit {
236 // The new thread starts in idle state.
237 inner.idle_count += 1;
238 inner.thread_count += 1;
239
240 // Notify all existing idle threads because we need to hurry up.
241 self.cvar.notify_all();
242
243 // Generate a new thread ID.
244 static ID: AtomicUsize = AtomicUsize::new(1);
245 let id = ID.fetch_add(1, Ordering::Relaxed);
246
247 // Spawn the new thread.
248 thread::Builder::new()
249 .name(format!("blocking-{}", id))
250 .spawn(move || self.main_loop())
251 .unwrap();
252 }
253 }
254}
255
256/// Runs blocking code on a thread pool.
257///
258/// # Examples
259///
260/// Read the contents of a file:
261///
262/// ```no_run
263/// use blocking::unblock;
264/// use std::fs;
265///
266/// # futures_lite::future::block_on(async {
267/// let contents = unblock(|| fs::read_to_string("file.txt")).await?;
268/// # std::io::Result::Ok(()) });
269/// ```
270///
271/// Spawn a process:
272///
273/// ```no_run
274/// use blocking::unblock;
275/// use std::process::Command;
276///
277/// # futures_lite::future::block_on(async {
278/// let out = unblock(|| Command::new("dir").output()).await?;
279/// # std::io::Result::Ok(()) });
280/// ```
281pub fn unblock<T, F>(f: F) -> Task<T>
282where
283 F: FnOnce() -> T + Send + 'static,
284 T: Send + 'static,
285{
286 Executor::spawn(async move { f() })
287}
288
289/// Runs blocking I/O on a thread pool.
290///
291/// Blocking I/O must be isolated from async code. This type moves blocking I/O operations onto a
292/// special thread pool while exposing a familiar async interface.
293///
294/// This type implements traits [`Stream`], [`AsyncRead`], [`AsyncWrite`], or [`AsyncSeek`] if the
295/// inner type implements [`Iterator`], [`Read`], [`Write`], or [`Seek`], respectively.
296///
297/// # Caveats
298///
299/// [`Unblock`] is a low-level primitive, and as such it comes with some caveats.
300///
301/// For higher-level primitives built on top of [`Unblock`], look into [`async-fs`] or
302/// [`async-process`] (on Windows).
303///
304/// [`async-fs`]: https://github.com/smol-rs/async-fs
305/// [`async-process`]: https://github.com/smol-rs/async-process
306///
307/// [`Unblock`] communicates with I/O operations on the thread pool through a pipe. That means an
308/// async read/write operation simply receives/sends some bytes from/into the pipe. When in reading
309/// mode, the thread pool reads bytes from the I/O handle and forwards them into the pipe until it
310/// becomes full. When in writing mode, the thread pool reads bytes from the pipe and forwards them
311/// into the I/O handle.
312///
313/// Use [`Unblock::with_capacity()`] to configure the capacity of the pipe.
314///
315/// ### Reading
316///
317/// If you create an [`Unblock`]`<`[`Stdin`][`std::io::Stdin`]`>`, read some bytes from it,
318/// and then drop it, a blocked read operation may keep hanging on the thread pool. The next
319/// attempt to read from stdin will lose bytes read by the hanging operation. This is a difficult
320/// problem to solve, so make sure you only use a single stdin handle for the duration of the
321/// entire program.
322///
323/// ### Writing
324///
325/// If writing data through the [`AsyncWrite`] trait, make sure to flush before dropping the
326/// [`Unblock`] handle or some buffered data might get lost.
327///
328/// ### Seeking
329///
330/// Because of buffering in the pipe, if [`Unblock`] wraps a [`File`][`std::fs::File`], a single
331/// read operation may move the file cursor farther than is the span of the operation. In fact,
332/// reading just keeps going in the background until the pipe gets full. Keep this mind when
333/// using [`AsyncSeek`] with [relative][`SeekFrom::Current`] offsets.
334///
335/// # Examples
336///
337/// ```
338/// use blocking::Unblock;
339/// use futures_lite::prelude::*;
340///
341/// # futures_lite::future::block_on(async {
342/// let mut stdout = Unblock::new(std::io::stdout());
343/// stdout.write_all(b"Hello world!").await?;
344/// stdout.flush().await?;
345/// # std::io::Result::Ok(()) });
346/// ```
347pub struct Unblock<T> {
348 state: State<T>,
349 cap: Option<usize>,
350}
351
352impl<T> Unblock<T> {
353 /// Wraps a blocking I/O handle into the async [`Unblock`] interface.
354 ///
355 /// # Examples
356 ///
357 /// ```no_run
358 /// use blocking::Unblock;
359 ///
360 /// let stdin = Unblock::new(std::io::stdin());
361 /// ```
362 pub fn new(io: T) -> Unblock<T> {
363 Unblock {
364 state: State::Idle(Some(Box::new(io))),
365 cap: None,
366 }
367 }
368
369 /// Wraps a blocking I/O handle into the async [`Unblock`] interface with a custom buffer
370 /// capacity.
371 ///
372 /// When communicating with the inner [`Stream`]/[`Read`]/[`Write`] type from async code, data
373 /// transferred between blocking and async code goes through a buffer of limited capacity. This
374 /// constructor configures that capacity.
375 ///
376 /// The default capacity is:
377 ///
378 /// * For [`Iterator`] types: 8192 items.
379 /// * For [`Read`]/[`Write`] types: 8 MB.
380 ///
381 /// # Examples
382 ///
383 /// ```no_run
384 /// use blocking::Unblock;
385 ///
386 /// let stdout = Unblock::with_capacity(64 * 1024, std::io::stdout());
387 /// ```
388 pub fn with_capacity(cap: usize, io: T) -> Unblock<T> {
389 Unblock {
390 state: State::Idle(Some(Box::new(io))),
391 cap: Some(cap),
392 }
393 }
394
395 /// Gets a mutable reference to the blocking I/O handle.
396 ///
397 /// This is an async method because the I/O handle might be on the thread pool and needs to
398 /// be moved onto the current thread before we can get a reference to it.
399 ///
400 /// # Examples
401 ///
402 /// ```no_run
403 /// use blocking::{unblock, Unblock};
404 /// use std::fs::File;
405 ///
406 /// # futures_lite::future::block_on(async {
407 /// let file = unblock(|| File::create("file.txt")).await?;
408 /// let mut file = Unblock::new(file);
409 ///
410 /// let metadata = file.get_mut().await.metadata()?;
411 /// # std::io::Result::Ok(()) });
412 /// ```
413 pub async fn get_mut(&mut self) -> &mut T {
414 // Wait for the running task to stop and ignore I/O errors if there are any.
415 future::poll_fn(|cx| self.poll_stop(cx)).await.ok();
416
417 // Assume idle state and get a reference to the inner value.
418 match &mut self.state {
419 State::Idle(t) => t.as_mut().expect("inner value was taken out"),
420 State::WithMut(..)
421 | State::Streaming(..)
422 | State::Reading(..)
423 | State::Writing(..)
424 | State::Seeking(..) => {
425 unreachable!("when stopped, the state machine must be in idle state");
426 }
427 }
428 }
429
430 /// Performs a blocking operation on the I/O handle.
431 ///
432 /// # Examples
433 ///
434 /// ```no_run
435 /// use blocking::{unblock, Unblock};
436 /// use std::fs::File;
437 ///
438 /// # futures_lite::future::block_on(async {
439 /// let file = unblock(|| File::create("file.txt")).await?;
440 /// let mut file = Unblock::new(file);
441 ///
442 /// let metadata = file.with_mut(|f| f.metadata()).await?;
443 /// # std::io::Result::Ok(()) });
444 /// ```
445 pub async fn with_mut<R, F>(&mut self, op: F) -> R
446 where
447 F: FnOnce(&mut T) -> R + Send + 'static,
448 R: Send + 'static,
449 T: Send + 'static,
450 {
451 // Wait for the running task to stop and ignore I/O errors if there are any.
452 future::poll_fn(|cx| self.poll_stop(cx)).await.ok();
453
454 // Assume idle state and take out the inner value.
455 let mut t = match &mut self.state {
456 State::Idle(t) => t.take().expect("inner value was taken out"),
457 State::WithMut(..)
458 | State::Streaming(..)
459 | State::Reading(..)
460 | State::Writing(..)
461 | State::Seeking(..) => {
462 unreachable!("when stopped, the state machine must be in idle state");
463 }
464 };
465
466 let (sender, receiver) = bounded(1);
467 let task = Executor::spawn(async move {
468 sender.try_send(op(&mut t)).ok();
469 t
470 });
471 self.state = State::WithMut(task);
472
473 receiver
474 .recv()
475 .await
476 .expect("`Unblock::with_mut()` operation has panicked")
477 }
478
479 /// Extracts the inner blocking I/O handle.
480 ///
481 /// This is an async method because the I/O handle might be on the thread pool and needs to
482 /// be moved onto the current thread before we can extract it.
483 ///
484 /// # Examples
485 ///
486 /// ```no_run
487 /// use blocking::{unblock, Unblock};
488 /// use futures_lite::prelude::*;
489 /// use std::fs::File;
490 ///
491 /// # futures_lite::future::block_on(async {
492 /// let file = unblock(|| File::create("file.txt")).await?;
493 /// let file = Unblock::new(file);
494 ///
495 /// let file = file.into_inner().await;
496 /// # std::io::Result::Ok(()) });
497 /// ```
498 pub async fn into_inner(self) -> T {
499 // There's a bug in rustdoc causing it to render `mut self` as `__arg0: Self`, so we just
500 // bind `self` to a local mutable variable.
501 let mut this = self;
502
503 // Wait for the running task to stop and ignore I/O errors if there are any.
504 future::poll_fn(|cx| this.poll_stop(cx)).await.ok();
505
506 // Assume idle state and extract the inner value.
507 match &mut this.state {
508 State::Idle(t) => *t.take().expect("inner value was taken out"),
509 State::WithMut(..)
510 | State::Streaming(..)
511 | State::Reading(..)
512 | State::Writing(..)
513 | State::Seeking(..) => {
514 unreachable!("when stopped, the state machine must be in idle state");
515 }
516 }
517 }
518
519 /// Waits for the running task to stop.
520 ///
521 /// On success, the state machine is moved into the idle state.
522 fn poll_stop(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
523 loop {
524 match &mut self.state {
525 State::Idle(_) => return Poll::Ready(Ok(())),
526
527 State::WithMut(task) => {
528 // Poll the task to wait for it to finish.
529 let io = ready!(Pin::new(task).poll(cx));
530 self.state = State::Idle(Some(io));
531 }
532
533 State::Streaming(any, task) => {
534 // Drop the receiver to close the channel. This stops the `send()` operation in
535 // the task, after which the task returns the iterator back.
536 any.take();
537
538 // Poll the task to retrieve the iterator.
539 let iter = ready!(Pin::new(task).poll(cx));
540 self.state = State::Idle(Some(iter));
541 }
542
543 State::Reading(reader, task) => {
544 // Drop the reader to close the pipe. This stops copying inside the task, after
545 // which the task returns the I/O handle back.
546 reader.take();
547
548 // Poll the task to retrieve the I/O handle.
549 let (res, io) = ready!(Pin::new(task).poll(cx));
550 // Make sure to move into the idle state before reporting errors.
551 self.state = State::Idle(Some(io));
552 res?;
553 }
554
555 State::Writing(writer, task) => {
556 // Drop the writer to close the pipe. This stops copying inside the task, after
557 // which the task flushes the I/O handle and
558 writer.take();
559
560 // Poll the task to retrieve the I/O handle.
561 let (res, io) = ready!(Pin::new(task).poll(cx));
562 // Make sure to move into the idle state before reporting errors.
563 self.state = State::Idle(Some(io));
564 res?;
565 }
566
567 State::Seeking(task) => {
568 // Poll the task to wait for it to finish.
569 let (_, res, io) = ready!(Pin::new(task).poll(cx));
570 // Make sure to move into the idle state before reporting errors.
571 self.state = State::Idle(Some(io));
572 res?;
573 }
574 }
575 }
576 }
577}
578
579impl<T: fmt::Debug> fmt::Debug for Unblock<T> {
580 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
581 struct Closed;
582 impl fmt::Debug for Closed {
583 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
584 f.write_str("<closed>")
585 }
586 }
587
588 struct Blocked;
589 impl fmt::Debug for Blocked {
590 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
591 f.write_str("<blocked>")
592 }
593 }
594
595 match &self.state {
596 State::Idle(None) => f.debug_struct("Unblock").field("io", &Closed).finish(),
597 State::Idle(Some(io)) => {
598 let io: &T = io;
599 f.debug_struct("Unblock").field("io", io).finish()
600 }
601 State::WithMut(..)
602 | State::Streaming(..)
603 | State::Reading(..)
604 | State::Writing(..)
605 | State::Seeking(..) => f.debug_struct("Unblock").field("io", &Blocked).finish(),
606 }
607 }
608}
609
610/// Current state of a blocking task.
611enum State<T> {
612 /// There is no blocking task.
613 ///
614 /// The inner value is readily available, unless it has already been extracted. The value is
615 /// extracted out by [`Unblock::into_inner()`], [`AsyncWrite::poll_close()`], or by awaiting
616 /// [`Unblock`].
617 Idle(Option<Box<T>>),
618
619 /// A [`Unblock::with_mut()`] closure was spawned and is still running.
620 WithMut(Task<Box<T>>),
621
622 /// The inner value is an [`Iterator`] currently iterating in a task.
623 ///
624 /// The `dyn Any` value here is a `mpsc::Receiver<<T as Iterator>::Item>`.
625 Streaming(Option<Box<dyn Any + Send + Sync>>, Task<Box<T>>),
626
627 /// The inner value is a [`Read`] currently reading in a task.
628 Reading(Option<Reader>, Task<(io::Result<()>, Box<T>)>),
629
630 /// The inner value is a [`Write`] currently writing in a task.
631 Writing(Option<Writer>, Task<(io::Result<()>, Box<T>)>),
632
633 /// The inner value is a [`Seek`] currently seeking in a task.
634 Seeking(Task<(SeekFrom, io::Result<u64>, Box<T>)>),
635}
636
637impl<T: Iterator + Send + 'static> Stream for Unblock<T>
638where
639 T::Item: Send + 'static,
640{
641 type Item = T::Item;
642
643 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>> {
644 loop {
645 match &mut self.state {
646 // If not in idle or active streaming state, stop the running task.
647 State::WithMut(..)
648 | State::Streaming(None, _)
649 | State::Reading(..)
650 | State::Writing(..)
651 | State::Seeking(..) => {
652 // Wait for the running task to stop.
653 ready!(self.poll_stop(cx)).ok();
654 }
655
656 // If idle, start a streaming task.
657 State::Idle(iter) => {
658 // Take the iterator out to run it on a blocking task.
659 let mut iter = iter.take().expect("inner iterator was taken out");
660
661 // This channel capacity seems to work well in practice. If it's too low, there
662 // will be too much synchronization between tasks. If too high, memory
663 // consumption increases.
664 let (sender, receiver) = bounded(self.cap.unwrap_or(8 * 1024)); // 8192 items
665
666 // Spawn a blocking task that runs the iterator and returns it when done.
667 let task = Executor::spawn(async move {
668 for item in &mut iter {
669 if sender.send(item).await.is_err() {
670 break;
671 }
672 }
673 iter
674 });
675
676 // Move into the busy state and poll again.
677 self.state = State::Streaming(Some(Box::new(receiver)), task);
678 }
679
680 // If streaming, receive an item.
681 State::Streaming(Some(any), task) => {
682 let receiver = any.downcast_mut::<Receiver<T::Item>>().unwrap();
683
684 // Poll the channel.
685 let opt = ready!(Pin::new(receiver).poll_next(cx));
686
687 // If the channel is closed, retrieve the iterator back from the blocking task.
688 // This is not really a required step, but it's cleaner to drop the iterator on
689 // the same thread that created it.
690 if opt.is_none() {
691 // Poll the task to retrieve the iterator.
692 let iter = ready!(Pin::new(task).poll(cx));
693 self.state = State::Idle(Some(iter));
694 }
695
696 return Poll::Ready(opt);
697 }
698 }
699 }
700 }
701}
702
703impl<T: Read + Send + 'static> AsyncRead for Unblock<T> {
704 fn poll_read(
705 mut self: Pin<&mut Self>,
706 cx: &mut Context<'_>,
707 buf: &mut [u8],
708 ) -> Poll<io::Result<usize>> {
709 loop {
710 match &mut self.state {
711 // If not in idle or active reading state, stop the running task.
712 State::WithMut(..)
713 | State::Reading(None, _)
714 | State::Streaming(..)
715 | State::Writing(..)
716 | State::Seeking(..) => {
717 // Wait for the running task to stop.
718 ready!(self.poll_stop(cx))?;
719 }
720
721 // If idle, start a reading task.
722 State::Idle(io) => {
723 // Take the I/O handle out to read it on a blocking task.
724 let mut io = io.take().expect("inner value was taken out");
725
726 // This pipe capacity seems to work well in practice. If it's too low, there
727 // will be too much synchronization between tasks. If too high, memory
728 // consumption increases.
729 let (reader, mut writer) = pipe(self.cap.unwrap_or(8 * 1024 * 1024)); // 8 MB
730
731 // Spawn a blocking task that reads and returns the I/O handle when done.
732 let task = Executor::spawn(async move {
733 // Copy bytes from the I/O handle into the pipe until the pipe is closed or
734 // an error occurs.
735 loop {
736 match future::poll_fn(|cx| writer.fill(cx, &mut io)).await {
737 Ok(0) => return (Ok(()), io),
738 Ok(_) => {}
739 Err(err) => return (Err(err), io),
740 }
741 }
742 });
743
744 // Move into the busy state and poll again.
745 self.state = State::Reading(Some(reader), task);
746 }
747
748 // If reading, read bytes from the pipe.
749 State::Reading(Some(reader), task) => {
750 // Poll the pipe.
751 let n = ready!(reader.drain(cx, buf))?;
752
753 // If the pipe is closed, retrieve the I/O handle back from the blocking task.
754 // This is not really a required step, but it's cleaner to drop the handle on
755 // the same thread that created it.
756 if n == 0 {
757 // Poll the task to retrieve the I/O handle.
758 let (res, io) = ready!(Pin::new(task).poll(cx));
759 // Make sure to move into the idle state before reporting errors.
760 self.state = State::Idle(Some(io));
761 res?;
762 }
763
764 return Poll::Ready(Ok(n));
765 }
766 }
767 }
768 }
769}
770
771impl<T: Write + Send + 'static> AsyncWrite for Unblock<T> {
772 fn poll_write(
773 mut self: Pin<&mut Self>,
774 cx: &mut Context<'_>,
775 buf: &[u8],
776 ) -> Poll<io::Result<usize>> {
777 loop {
778 match &mut self.state {
779 // If not in idle or active writing state, stop the running task.
780 State::WithMut(..)
781 | State::Writing(None, _)
782 | State::Streaming(..)
783 | State::Reading(..)
784 | State::Seeking(..) => {
785 // Wait for the running task to stop.
786 ready!(self.poll_stop(cx))?;
787 }
788
789 // If idle, start the writing task.
790 State::Idle(io) => {
791 // Take the I/O handle out to write on a blocking task.
792 let mut io = io.take().expect("inner value was taken out");
793
794 // This pipe capacity seems to work well in practice. If it's too low, there will
795 // be too much synchronization between tasks. If too high, memory consumption
796 // increases.
797 let (mut reader, writer) = pipe(self.cap.unwrap_or(8 * 1024 * 1024)); // 8 MB
798
799 // Spawn a blocking task that writes and returns the I/O handle when done.
800 let task = Executor::spawn(async move {
801 // Copy bytes from the pipe into the I/O handle until the pipe is closed or an
802 // error occurs. Flush the I/O handle at the end.
803 loop {
804 match future::poll_fn(|cx| reader.drain(cx, &mut io)).await {
805 Ok(0) => return (io.flush(), io),
806 Ok(_) => {}
807 Err(err) => {
808 io.flush().ok();
809 return (Err(err), io);
810 }
811 }
812 }
813 });
814
815 // Move into the busy state and poll again.
816 self.state = State::Writing(Some(writer), task);
817 }
818
819 // If writing, write more bytes into the pipe.
820 State::Writing(Some(writer), _) => return writer.fill(cx, buf),
821 }
822 }
823 }
824
825 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
826 loop {
827 match &mut self.state {
828 // If not in idle state, stop the running task.
829 State::WithMut(..)
830 | State::Streaming(..)
831 | State::Writing(..)
832 | State::Reading(..)
833 | State::Seeking(..) => {
834 // Wait for the running task to stop.
835 ready!(self.poll_stop(cx))?;
836 }
837
838 // Idle implies flushed.
839 State::Idle(_) => return Poll::Ready(Ok(())),
840 }
841 }
842 }
843
844 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
845 // First, make sure the I/O handle is flushed.
846 ready!(Pin::new(&mut self).poll_flush(cx))?;
847
848 // Then move into the idle state with no I/O handle, thus dropping it.
849 self.state = State::Idle(None);
850 Poll::Ready(Ok(()))
851 }
852}
853
854impl<T: Seek + Send + 'static> AsyncSeek for Unblock<T> {
855 fn poll_seek(
856 mut self: Pin<&mut Self>,
857 cx: &mut Context<'_>,
858 pos: SeekFrom,
859 ) -> Poll<io::Result<u64>> {
860 loop {
861 match &mut self.state {
862 // If not in idle state, stop the running task.
863 State::WithMut(..)
864 | State::Streaming(..)
865 | State::Reading(..)
866 | State::Writing(..) => {
867 // Wait for the running task to stop.
868 ready!(self.poll_stop(cx))?;
869 }
870
871 State::Idle(io) => {
872 // Take the I/O handle out to seek on a blocking task.
873 let mut io = io.take().expect("inner value was taken out");
874
875 let task = Executor::spawn(async move {
876 let res = io.seek(pos);
877 (pos, res, io)
878 });
879 self.state = State::Seeking(task);
880 }
881
882 State::Seeking(task) => {
883 // Poll the task to wait for it to finish.
884 let (original_pos, res, io) = ready!(Pin::new(task).poll(cx));
885 // Make sure to move into the idle state before reporting errors.
886 self.state = State::Idle(Some(io));
887 let current = res?;
888
889 // If the `pos` argument matches the original one, return the result.
890 if original_pos == pos {
891 return Poll::Ready(Ok(current));
892 }
893 }
894 }
895 }
896 }
897}
898
899/// Creates a bounded single-producer single-consumer pipe.
900///
901/// A pipe is a ring buffer of `cap` bytes that can be asynchronously read from and written to.
902///
903/// When the sender is dropped, remaining bytes in the pipe can still be read. After that, attempts
904/// to read will result in `Ok(0)`, i.e. they will always 'successfully' read 0 bytes.
905///
906/// When the receiver is dropped, the pipe is closed and no more bytes and be written into it.
907/// Further writes will result in `Ok(0)`, i.e. they will always 'successfully' write 0 bytes.
908fn pipe(cap: usize) -> (Reader, Writer) {
909 assert!(cap > 0, "capacity must be positive");
910 assert!(cap.checked_mul(2).is_some(), "capacity is too large");
911
912 // Allocate the ring buffer.
913 let mut v = Vec::with_capacity(cap);
914 let buffer = v.as_mut_ptr();
915 mem::forget(v);
916
917 let inner = Arc::new(Pipe {
918 head: AtomicUsize::new(0),
919 tail: AtomicUsize::new(0),
920 reader: AtomicWaker::new(),
921 writer: AtomicWaker::new(),
922 closed: AtomicBool::new(false),
923 buffer,
924 cap,
925 });
926
927 let r = Reader {
928 inner: inner.clone(),
929 head: 0,
930 tail: 0,
931 };
932
933 let w = Writer {
934 inner,
935 head: 0,
936 tail: 0,
937 zeroed_until: 0,
938 };
939
940 (r, w)
941}
942
943/// The reading side of a pipe.
944struct Reader {
945 /// The inner ring buffer.
946 inner: Arc<Pipe>,
947
948 /// The head index, moved by the reader, in the range `0..2*cap`.
949 ///
950 /// This index always matches `inner.head`.
951 head: usize,
952
953 /// The tail index, moved by the writer, in the range `0..2*cap`.
954 ///
955 /// This index is a snapshot of `index.tail` that might become stale at any point.
956 tail: usize,
957}
958
959/// The writing side of a pipe.
960struct Writer {
961 /// The inner ring buffer.
962 inner: Arc<Pipe>,
963
964 /// The head index, moved by the reader, in the range `0..2*cap`.
965 ///
966 /// This index is a snapshot of `index.head` that might become stale at any point.
967 head: usize,
968
969 /// The tail index, moved by the writer, in the range `0..2*cap`.
970 ///
971 /// This index always matches `inner.tail`.
972 tail: usize,
973
974 /// How many bytes at the beginning of the buffer have been zeroed.
975 ///
976 /// The pipe allocates an uninitialized buffer, and we must be careful about passing
977 /// uninitialized data to user code. Zeroing the buffer right after allocation would be too
978 /// expensive, so we zero it in smaller chunks as the writer makes progress.
979 zeroed_until: usize,
980}
981
// SAFETY: `Reader` and `Writer` hold only an `Arc<Pipe>` plus plain `usize` indices. `pipe()`
// creates exactly one of each and neither is `Clone`. Each end touches the shared buffer only
// through the head/tail protocol (head moved by the reader, tail moved by the writer), so moving
// an end to another thread does not create a second accessor.
// NOTE(review): given the `Send + Sync` impls on `Pipe`, `Arc<Pipe>` is already `Send`, so these
// impls look redundant with the auto-derived ones; kept for explicitness.
unsafe impl Send for Reader {}
unsafe impl Send for Writer {}
984
/// The inner ring buffer.
///
/// Head and tail indices are in the range `0..2*cap`, even though they really map onto the
/// `0..cap` range. The distance between head and tail indices is never more than `cap`.
///
/// The reason why indices are not in the range `0..cap` is because we need to distinguish between
/// the pipe being empty and being full. If head and tail were in `0..cap`, then `head == tail`
/// could mean the pipe is either empty or full, but we don't know which!
///
/// Shared by exactly one `Reader` and one `Writer` through an `Arc`.
struct Pipe {
    /// The head index, moved by the reader, in the range `0..2*cap`.
    head: AtomicUsize,

    /// The tail index, moved by the writer, in the range `0..2*cap`.
    tail: AtomicUsize,

    /// A waker representing the blocked reader.
    reader: AtomicWaker,

    /// A waker representing the blocked writer.
    writer: AtomicWaker,

    /// Set to `true` if the reader or writer was dropped.
    closed: AtomicBool,

    /// The byte buffer.
    ///
    /// Allocated in `pipe()` as a `Vec<u8>` with capacity `cap` whose ownership was given up,
    /// and freed in `Drop for Pipe`. Its contents start out uninitialized; the writer zeroes
    /// them progressively (see `Writer::zeroed_until`).
    buffer: *mut u8,

    /// The buffer capacity.
    cap: usize,
}
1015
1016unsafe impl Sync for Pipe {}
1017unsafe impl Send for Pipe {}
1018
1019impl Drop for Pipe {
1020 fn drop(&mut self) {
1021 // Deallocate the byte buffer.
1022 unsafe {
1023 Vec::from_raw_parts(self.buffer, 0, self.cap);
1024 }
1025 }
1026}
1027
1028impl Drop for Reader {
1029 fn drop(&mut self) {
1030 // Dropping closes the pipe and then wakes the writer.
1031 self.inner.closed.store(true, Ordering::SeqCst);
1032 self.inner.writer.wake();
1033 }
1034}
1035
1036impl Drop for Writer {
1037 fn drop(&mut self) {
1038 // Dropping closes the pipe and then wakes the reader.
1039 self.inner.closed.store(true, Ordering::SeqCst);
1040 self.inner.reader.wake();
1041 }
1042}
1043
1044impl Reader {
1045 /// Reads bytes from this reader and writes into blocking `dest`.
1046 fn drain(&mut self, cx: &mut Context<'_>, mut dest: impl Write) -> Poll<io::Result<usize>> {
1047 let cap = self.inner.cap;
1048
1049 // Calculates the distance between two indices.
1050 let distance = |a: usize, b: usize| {
1051 if a <= b {
1052 b - a
1053 } else {
1054 2 * cap - (a - b)
1055 }
1056 };
1057
1058 // If the pipe appears to be empty...
1059 if distance(self.head, self.tail) == 0 {
1060 // Reload the tail in case it's become stale.
1061 self.tail = self.inner.tail.load(Ordering::Acquire);
1062
1063 // If the pipe is now really empty...
1064 if distance(self.head, self.tail) == 0 {
1065 // Register the waker.
1066 self.inner.reader.register(cx.waker());
1067 atomic::fence(Ordering::SeqCst);
1068
1069 // Reload the tail after registering the waker.
1070 self.tail = self.inner.tail.load(Ordering::Acquire);
1071
1072 // If the pipe is still empty...
1073 if distance(self.head, self.tail) == 0 {
1074 // Check whether the pipe is closed or just empty.
1075 if self.inner.closed.load(Ordering::Relaxed) {
1076 return Poll::Ready(Ok(0));
1077 } else {
1078 return Poll::Pending;
1079 }
1080 }
1081 }
1082 }
1083
1084 // The pipe is not empty so remove the waker.
1085 self.inner.reader.take();
1086
1087 // Yield with some small probability - this improves fairness.
1088 ready!(maybe_yield(cx));
1089
1090 // Given an index in `0..2*cap`, returns the real index in `0..cap`.
1091 let real_index = |i: usize| {
1092 if i < cap {
1093 i
1094 } else {
1095 i - cap
1096 }
1097 };
1098
1099 // Number of bytes read so far.
1100 let mut count = 0;
1101
1102 loop {
1103 // Calculate how many bytes to read in this iteration.
1104 let n = (128 * 1024) // Not too many bytes in one go - better to wake the writer soon!
1105 .min(distance(self.head, self.tail)) // No more than bytes in the pipe.
1106 .min(cap - real_index(self.head)); // Don't go past the buffer boundary.
1107
1108 // Create a slice of data in the pipe buffer.
1109 let pipe_slice =
1110 unsafe { slice::from_raw_parts(self.inner.buffer.add(real_index(self.head)), n) };
1111
1112 // Copy bytes from the pipe buffer into `dest`.
1113 let n = dest.write(pipe_slice)?;
1114 count += n;
1115
1116 // If pipe is empty or `dest` is full, return.
1117 if n == 0 {
1118 return Poll::Ready(Ok(count));
1119 }
1120
1121 // Move the head forward.
1122 if self.head + n < 2 * cap {
1123 self.head += n;
1124 } else {
1125 self.head = 0;
1126 }
1127
1128 // Store the current head index.
1129 self.inner.head.store(self.head, Ordering::Release);
1130
1131 // Wake the writer because the pipe is not full.
1132 self.inner.writer.wake();
1133 }
1134 }
1135}
1136
impl Writer {
    /// Reads bytes from blocking `src` and writes into this writer.
    ///
    /// Returns `Poll::Ready(Ok(n))` with the number of bytes copied into the pipe.
    /// `Poll::Ready(Ok(0))` is returned when the pipe is closed, or when `src` reports
    /// end-of-file before any byte is copied.
    ///
    /// Returns `Poll::Pending` while the pipe is full and still open (the writer waker is
    /// registered). It can also return `Poll::Pending` when yielding for fairness, in which case
    /// the task has already been re-woken.
    ///
    /// Errors from `src.read` are propagated as `Poll::Ready(Err(_))`.
    fn fill(&mut self, cx: &mut Context<'_>, mut src: impl Read) -> Poll<io::Result<usize>> {
        // Just a quick check if the pipe is closed, which is why a relaxed load is okay.
        if self.inner.closed.load(Ordering::Relaxed) {
            return Poll::Ready(Ok(0));
        }

        // Calculates the distance between two indices.
        let cap = self.inner.cap;
        let distance = |a: usize, b: usize| {
            if a <= b {
                b - a
            } else {
                2 * cap - (a - b)
            }
        };

        // If the pipe appears to be full...
        if distance(self.head, self.tail) == cap {
            // Reload the head in case it's become stale.
            self.head = self.inner.head.load(Ordering::Acquire);

            // If the pipe is now really full...
            if distance(self.head, self.tail) == cap {
                // Register the waker.
                self.inner.writer.register(cx.waker());
                // Pairs with the reader's head store + wake: either we see the new head below,
                // or the reader sees our registered waker and wakes us.
                atomic::fence(Ordering::SeqCst);

                // Reload the head after registering the waker.
                self.head = self.inner.head.load(Ordering::Acquire);

                // If the pipe is still full...
                if distance(self.head, self.tail) == cap {
                    // Check whether the pipe is closed or just full.
                    if self.inner.closed.load(Ordering::Relaxed) {
                        return Poll::Ready(Ok(0));
                    } else {
                        return Poll::Pending;
                    }
                }
            }
        }

        // The pipe is not full so remove the waker.
        self.inner.writer.take();

        // Yield with some small probability - this improves fairness.
        ready!(maybe_yield(cx));

        // Given an index in `0..2*cap`, returns the real index in `0..cap`.
        let real_index = |i: usize| {
            if i < cap {
                i
            } else {
                i - cap
            }
        };

        // Number of bytes written so far.
        let mut count = 0;

        loop {
            // Calculate how many bytes to write in this iteration.
            let n = (128 * 1024) // Not too many bytes in one go - better to wake the reader soon!
                .min(self.zeroed_until * 2 + 4096) // Don't zero too many bytes when starting.
                .min(cap - distance(self.head, self.tail)) // No more than space in the pipe.
                .min(cap - real_index(self.tail)); // Don't go past the buffer boundary.

            // Create a slice of available space in the pipe buffer.
            // SAFETY: `from..to` lies within the buffer (bounded by `cap - real_index(tail)`) and
            // inside the free space the reader has released, so the reader is not reading it.
            let pipe_slice_mut = unsafe {
                let from = real_index(self.tail);
                let to = from + n;

                // Make sure all bytes in the slice are initialized.
                if self.zeroed_until < to {
                    self.inner
                        .buffer
                        .add(self.zeroed_until)
                        .write_bytes(0u8, to - self.zeroed_until);
                    self.zeroed_until = to;
                }

                slice::from_raw_parts_mut(self.inner.buffer.add(from), n)
            };

            // Copy bytes from `src` into the pipe buffer.
            let n = src.read(pipe_slice_mut)?;
            count += n;

            // If the pipe is full or closed, or `src` is empty, return.
            // (`n == 0` covers both an empty slice, i.e. no space left, and `src` at EOF.)
            if n == 0 || self.inner.closed.load(Ordering::Relaxed) {
                return Poll::Ready(Ok(count));
            }

            // Move the tail forward, wrapping at `2*cap`.
            if self.tail + n < 2 * cap {
                self.tail += n;
            } else {
                self.tail = 0;
            }

            // Store the current tail index.
            self.inner.tail.store(self.tail, Ordering::Release);

            // Wake the reader because the pipe is not empty.
            self.inner.reader.wake();
        }
    }
}
1247
1248/// Yield with some small probability.
1249fn maybe_yield(cx: &mut Context<'_>) -> Poll<()> {
1250 if fastrand::usize(..100) == 0 {
1251 cx.waker().wake_by_ref();
1252 Poll::Pending
1253 } else {
1254 Poll::Ready(())
1255 }
1256}
1257
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks how `BLOCKING_MAX_THREADS` is parsed and clamped into the allowed range.
    #[test]
    fn test_max_threads() {
        // A valid value within bounds is used as-is.
        env::set_var(MAX_THREADS_ENV, "100");
        assert_eq!(100, Executor::max_threads());

        // A value below the minimum is clamped up to the minimum.
        env::set_var(MAX_THREADS_ENV, "0");
        assert_eq!(1, Executor::max_threads());

        // A value above the maximum is clamped down to the maximum.
        env::set_var(MAX_THREADS_ENV, "50000");
        assert_eq!(10000, Executor::max_threads());

        // An empty value is not a number, so the default is used.
        env::set_var(MAX_THREADS_ENV, "");
        assert_eq!(500, Executor::max_threads());

        // A non-numeric value falls back to the default.
        env::set_var(MAX_THREADS_ENV, "NOTINT");
        assert_eq!(500, Executor::max_threads());

        // The variable being entirely absent also uses the default. Removing it last leaves the
        // process environment clean for any other test.
        env::remove_var(MAX_THREADS_ENV);
        assert_eq!(500, Executor::max_threads());
    }
}