use std::mem;
use pin_project_lite::pin_project;
use tokio::sync::watch;
use super::{task, Future, Pin, Poll};
pub(crate) fn channel() -> (Signal, Watch) {
let (tx, rx) = watch::channel(());
(Signal { tx }, Watch { rx })
}
pub(crate) struct Signal {
tx: watch::Sender<()>,
}
pub(crate) struct Draining(Pin<Box<dyn Future<Output = ()> + Send + Sync>>);
#[derive(Clone)]
pub(crate) struct Watch {
rx: watch::Receiver<()>,
}
pin_project! {
#[allow(missing_debug_implementations)]
pub struct Watching<F, FN> {
#[pin]
future: F,
state: State<FN>,
watch: Pin<Box<dyn Future<Output = ()> + Send + Sync>>,
_rx: watch::Receiver<()>,
}
}
enum State<F> {
Watch(F),
Draining,
}
impl Signal {
pub(crate) fn drain(self) -> Draining {
let _ = self.tx.send(());
Draining(Box::pin(async move { self.tx.closed().await }))
}
}
impl Future for Draining {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut self.as_mut().0).poll(cx)
}
}
impl Watch {
pub(crate) fn watch<F, FN>(self, future: F, on_drain: FN) -> Watching<F, FN>
where
F: Future,
FN: FnOnce(Pin<&mut F>),
{
let Self { mut rx } = self;
let _rx = rx.clone();
Watching {
future,
state: State::Watch(on_drain),
watch: Box::pin(async move {
let _ = rx.changed().await;
}),
_rx,
}
}
}
impl<F, FN> Future for Watching<F, FN>
where
F: Future,
FN: FnOnce(Pin<&mut F>),
{
type Output = F::Output;
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
let mut me = self.project();
loop {
match mem::replace(me.state, State::Draining) {
State::Watch(on_drain) => {
match Pin::new(&mut me.watch).poll(cx) {
Poll::Ready(()) => {
on_drain(me.future.as_mut());
}
Poll::Pending => {
*me.state = State::Watch(on_drain);
return me.future.poll(cx);
}
}
}
State::Draining => return me.future.poll(cx),
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
struct TestMe {
draining: bool,
finished: bool,
poll_cnt: usize,
}
impl Future for TestMe {
type Output = ();
fn poll(mut self: Pin<&mut Self>, _: &mut task::Context<'_>) -> Poll<Self::Output> {
self.poll_cnt += 1;
if self.finished {
Poll::Ready(())
} else {
Poll::Pending
}
}
}
#[test]
fn watch() {
let mut mock = tokio_test::task::spawn(());
mock.enter(|cx, _| {
let (tx, rx) = channel();
let fut = TestMe {
draining: false,
finished: false,
poll_cnt: 0,
};
let mut watch = rx.watch(fut, |mut fut| {
fut.draining = true;
});
assert_eq!(watch.future.poll_cnt, 0);
assert!(Pin::new(&mut watch).poll(cx).is_pending());
assert_eq!(watch.future.poll_cnt, 1);
assert!(Pin::new(&mut watch).poll(cx).is_pending());
assert_eq!(watch.future.poll_cnt, 2);
let mut draining = tx.drain();
assert!(!watch.future.draining);
assert_eq!(watch.future.poll_cnt, 2);
assert!(Pin::new(&mut watch).poll(cx).is_pending());
assert_eq!(watch.future.poll_cnt, 3);
assert!(watch.future.draining);
assert!(Pin::new(&mut draining).poll(cx).is_pending());
watch.future.finished = true;
assert!(Pin::new(&mut watch).poll(cx).is_ready());
assert_eq!(watch.future.poll_cnt, 4);
drop(watch);
assert!(Pin::new(&mut draining).poll(cx).is_ready());
})
}
#[test]
fn watch_clones() {
let mut mock = tokio_test::task::spawn(());
mock.enter(|cx, _| {
let (tx, rx) = channel();
let fut1 = TestMe {
draining: false,
finished: false,
poll_cnt: 0,
};
let fut2 = TestMe {
draining: false,
finished: false,
poll_cnt: 0,
};
let watch1 = rx.clone().watch(fut1, |mut fut| {
fut.draining = true;
});
let watch2 = rx.watch(fut2, |mut fut| {
fut.draining = true;
});
let mut draining = tx.drain();
assert!(Pin::new(&mut draining).poll(cx).is_pending());
drop(watch1);
assert!(Pin::new(&mut draining).poll(cx).is_pending());
drop(watch2);
assert!(Pin::new(&mut draining).poll(cx).is_ready());
});
}
}