use crate::runtime::{EHandle, Time, WakeupTime};
use fuchsia_zircon as zx;
use futures::stream::FusedStream;
use futures::task::{AtomicWaker, Context};
use futures::{FutureExt, Stream};
use std::cmp;
use std::collections::BinaryHeap;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Weak};
use std::task::Poll;
impl WakeupTime for std::time::Instant {
fn into_time(self) -> Time {
let now_as_instant = std::time::Instant::now();
let now_as_time = Time::now();
now_as_time + self.saturating_duration_since(now_as_instant).into()
}
}
impl WakeupTime for Time {
fn into_time(self) -> Time {
self
}
}
impl WakeupTime for zx::MonotonicTime {
fn into_time(self) -> Time {
self.into()
}
}
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct Timer {
waker_and_bool: Arc<(AtomicWaker, AtomicBool)>,
}
impl Unpin for Timer {}
impl Timer {
pub fn new<WT>(time: WT) -> Self
where
WT: WakeupTime,
{
let waker_and_bool = Arc::new((AtomicWaker::new(), AtomicBool::new(false)));
let this = Timer { waker_and_bool };
EHandle::register_timer(time.into_time(), this.handle());
this
}
fn handle(&self) -> TimerHandle {
TimerHandle { inner: Arc::downgrade(&self.waker_and_bool) }
}
pub fn reset(&mut self, time: Time) {
assert!(self.did_fire());
self.waker_and_bool.1.store(false, Ordering::SeqCst);
EHandle::register_timer(time, self.handle());
}
fn did_fire(&self) -> bool {
self.waker_and_bool.1.load(Ordering::SeqCst)
}
fn register_task(&self, cx: &mut Context<'_>) {
self.waker_and_bool.0.register(cx.waker());
}
}
impl Future for Timer {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.did_fire() {
return Poll::Ready(());
}
self.register_task(cx);
if self.did_fire() {
Poll::Ready(())
} else {
Poll::Pending
}
}
}
pub(crate) struct TimerHandle {
inner: Weak<(AtomicWaker, AtomicBool)>,
}
impl TimerHandle {
pub fn is_defunct(&self) -> bool {
self.inner.upgrade().is_none()
}
pub fn wake(&self) {
if let Some(wb) = self.inner.upgrade() {
wb.1.store(true, Ordering::SeqCst);
wb.0.wake();
}
}
}
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Interval {
timer: Timer,
next: Time,
duration: zx::Duration,
}
impl Interval {
pub fn new(duration: zx::Duration) -> Self {
let next = Time::after(duration);
Interval { timer: Timer::new(next), next, duration }
}
}
impl Unpin for Interval {}
impl FusedStream for Interval {
fn is_terminated(&self) -> bool {
false
}
}
impl Stream for Interval {
type Item = ();
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = &mut *self;
match this.timer.poll_unpin(cx) {
Poll::Ready(()) => {
this.timer.register_task(cx);
this.next += this.duration;
this.timer.reset(this.next);
Poll::Ready(Some(()))
}
Poll::Pending => {
this.timer.register_task(cx);
Poll::Pending
}
}
}
}
#[derive(Default)]
pub(crate) struct TimerHeap {
inner: BinaryHeap<TimeWaker>,
}
impl TimerHeap {
pub fn add_timer(&mut self, time: Time, handle: TimerHandle) {
self.inner.push(TimeWaker { time, handle })
}
pub fn next_deadline(&mut self) -> Option<&TimeWaker> {
while self.inner.peek().map(|t| t.handle.is_defunct()).unwrap_or_default() {
self.inner.pop();
}
self.inner.peek()
}
pub fn pop(&mut self) -> Option<TimeWaker> {
self.inner.pop()
}
}
pub(crate) struct TimeWaker {
time: Time,
handle: TimerHandle,
}
impl TimeWaker {
pub fn wake(&self) {
self.handle.wake();
}
pub fn time(&self) -> Time {
self.time
}
}
impl Ord for TimeWaker {
fn cmp(&self, other: &Self) -> cmp::Ordering {
self.time.cmp(&other.time).reverse() }
}
impl PartialOrd for TimeWaker {
fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
Some(self.cmp(other))
}
}
impl PartialEq for TimeWaker {
fn eq(&self, other: &Self) -> bool {
self.time == other.time
}
}
impl Eq for TimeWaker {}
#[cfg(test)]
mod test {
use super::*;
use crate::{LocalExecutor, SendExecutor, TestExecutor};
use assert_matches::assert_matches;
use fuchsia_zircon::prelude::*;
use fuchsia_zircon::Duration;
use futures::future::Either;
use futures::prelude::*;
#[test]
fn shorter_fires_first() {
let mut exec = LocalExecutor::new();
let shorter = Timer::new(Time::after(100.millis()));
let longer = Timer::new(Time::after(1.second()));
match exec.run_singlethreaded(future::select(shorter, longer)) {
Either::Left(_) => {}
Either::Right(_) => panic!("wrong timer fired"),
}
}
#[test]
fn shorter_fires_first_multithreaded() {
let mut exec = SendExecutor::new(4);
let shorter = Timer::new(Time::after(100.millis()));
let longer = Timer::new(Time::after(1.second()));
match exec.run(future::select(shorter, longer)) {
Either::Left(_) => {}
Either::Right(_) => panic!("wrong timer fired"),
}
}
#[test]
fn fires_after_timeout() {
let mut exec = TestExecutor::new_with_fake_time();
exec.set_fake_time(Time::from_nanos(0));
let deadline = Time::after(5.seconds());
let mut future = Timer::new(deadline);
assert_eq!(Poll::Pending, exec.run_until_stalled(&mut future));
exec.set_fake_time(deadline);
assert_eq!(Poll::Ready(()), exec.run_until_stalled(&mut future));
}
#[test]
fn timer_before_now_fires_immediately() {
let mut exec = TestExecutor::new();
let now = Time::now();
let before = Timer::new(now - Duration::from_nanos(1));
let after = Timer::new(now + Duration::from_nanos(1));
assert_matches!(
exec.run_singlethreaded(futures::future::select(before, after)),
Either::Left(_),
"Timer in the past should fire first"
);
}
#[test]
fn interval() {
let mut exec = TestExecutor::new_with_fake_time();
let start = Time::from_nanos(0);
exec.set_fake_time(start);
let counter = Arc::new(::std::sync::atomic::AtomicUsize::new(0));
let mut future = {
let counter = counter.clone();
Interval::new(5.seconds())
.map(move |()| {
counter.fetch_add(1, Ordering::SeqCst);
})
.collect::<()>()
};
assert_eq!(Poll::Pending, exec.run_until_stalled(&mut future));
assert_eq!(0, counter.load(Ordering::SeqCst));
let first_deadline = TestExecutor::next_timer().expect("Expected a pending timeout (1)");
assert!(first_deadline >= 5.seconds() + start);
exec.set_fake_time(first_deadline);
assert_eq!(Poll::Pending, exec.run_until_stalled(&mut future));
assert_eq!(1, counter.load(Ordering::SeqCst));
assert_eq!(Poll::Pending, exec.run_until_stalled(&mut future));
assert_eq!(1, counter.load(Ordering::SeqCst));
let second_deadline = TestExecutor::next_timer().expect("Expected a pending timeout (2)");
exec.set_fake_time(second_deadline);
assert_eq!(Poll::Pending, exec.run_until_stalled(&mut future));
assert_eq!(2, counter.load(Ordering::SeqCst));
assert_eq!(second_deadline, first_deadline + 5.seconds());
}
#[test]
fn timer_fake_time() {
let mut exec = TestExecutor::new_with_fake_time();
exec.set_fake_time(Time::from_nanos(0));
let mut timer = Timer::new(Time::after(1.seconds()));
assert_eq!(Poll::Pending, exec.run_until_stalled(&mut timer));
exec.set_fake_time(Time::after(1.seconds()));
assert_eq!(Poll::Ready(()), exec.run_until_stalled(&mut timer));
}
}