async_stream/
yielder.rs

1use std::cell::Cell;
2use std::future::Future;
3use std::marker::PhantomData;
4use std::pin::Pin;
5use std::ptr;
6use std::task::{Context, Poll};
7
/// Sending half of a yielder pair.
///
/// The handle is zero-sized: it only carries the item type `T` at the type
/// level. Values are handed over through [`Sender::send`].
pub struct Sender<T> {
    _p: PhantomData<T>,
}

// Implemented by hand instead of `#[derive(Debug)]`: the derive would require
// `T: Debug` even though no `T` is ever stored in the handle. The output is
// the same as what the derive produces.
impl<T> std::fmt::Debug for Sender<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Sender").field("_p", &self._p).finish()
    }
}
12
/// Receiving half of a yielder pair.
///
/// The handle is zero-sized: it only carries the item type `T` at the type
/// level. A destination slot is registered through `Receiver::enter`.
pub struct Receiver<T> {
    _p: PhantomData<T>,
}

// Implemented by hand instead of `#[derive(Debug)]`: the derive would require
// `T: Debug` even though no `T` is ever stored in the handle. The output is
// the same as what the derive produces.
impl<T> std::fmt::Debug for Receiver<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Receiver").field("_p", &self._p).finish()
    }
}
17
18pub(crate) struct Enter<'a, T> {
19    _rx: &'a mut Receiver<T>,
20    prev: *mut (),
21}
22
23// Note: It is considered unsound for anyone other than our macros to call
24// this function. This is a private API intended only for calls from our
25// macros, and users should never call it, but some people tend to
26// misinterpret it as fine to call unless it is marked unsafe.
27#[doc(hidden)]
28pub unsafe fn pair<T>() -> (Sender<T>, Receiver<T>) {
29    let tx = Sender { _p: PhantomData };
30    let rx = Receiver { _p: PhantomData };
31    (tx, rx)
32}
33
// Per-thread, type-erased pointer to the `Option<T>` slot that should receive
// the next sent value. It is null while no slot is registered.
//
// TODO: Ensure wakers match?
thread_local! {
    static STORE: Cell<*mut ()> = Cell::new(ptr::null_mut());
}
38
39// ===== impl Sender =====
40
41impl<T> Sender<T> {
42    pub fn send(&mut self, value: T) -> impl Future<Output = ()> {
43        Send { value: Some(value) }
44    }
45}
46
/// Future returned by `Sender::send`.
///
/// `value` is `Some` until the item has been moved into the destination slot
/// and `None` afterwards.
struct Send<T> {
    value: Option<T>,
}

// `Send` is `Unpin` for every `T`, not only for `T: Unpin`. `poll` relies on
// this to reach `value` mutably through `Pin<&mut Self>`.
impl<T> Unpin for Send<T> {}
52
53impl<T> Future for Send<T> {
54    type Output = ();
55
56    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
57        if self.value.is_none() {
58            return Poll::Ready(());
59        }
60
61        STORE.with(|cell| {
62            let ptr = cell.get() as *mut Option<T>;
63            let option_ref = unsafe { ptr.as_mut() }.expect("invalid usage");
64
65            if option_ref.is_none() {
66                *option_ref = self.value.take();
67            }
68
69            Poll::Pending
70        })
71    }
72}
73
74// ===== impl Receiver =====
75
76impl<T> Receiver<T> {
77    pub(crate) fn enter<'a>(&'a mut self, dst: &'a mut Option<T>) -> Enter<'a, T> {
78        let prev = STORE.with(|cell| {
79            let prev = cell.get();
80            cell.set(dst as *mut _ as *mut ());
81            prev
82        });
83
84        Enter { _rx: self, prev }
85    }
86}
87
88// ===== impl Enter =====
89
90impl<'a, T> Drop for Enter<'a, T> {
91    fn drop(&mut self) {
92        STORE.with(|cell| cell.set(self.prev));
93    }
94}