1//! Channel that delivers a message at a certain moment in time.
2//!
3//! Messages cannot be sent into this kind of channel; they are materialized on demand.
45use std::sync::atomic::{AtomicBool, Ordering};
6use std::thread;
7use std::time::{Duration, Instant};
89use crate::context::Context;
10use crate::err::{RecvTimeoutError, TryRecvError};
11use crate::select::{Operation, SelectHandle, Token};
12use crate::utils;
/// Result of a receive operation.
///
/// `Some(instant)` carries the received message, which for this flavor is the delivery instant;
/// `None` means there is no message to read.
/// NOTE(review): presumably this is the type of the `at` field of `Token` — confirm in
/// `crate::select`.
pub(crate) type AtToken = Option<Instant>;
/// Channel that delivers a message at a certain moment in time.
///
/// The channel holds at most one message. The message becomes available once
/// `delivery_time` has been reached and can be received exactly once; its value is the
/// delivery instant itself.
pub(crate) struct Channel {
    /// The instant at which the message will be delivered.
    delivery_time: Instant,

    /// `true` if the message has been received.
    received: AtomicBool,
}
2526impl Channel {
27/// Creates a channel that delivers a message at a certain instant in time.
28#[inline]
29pub(crate) fn new_deadline(when: Instant) -> Self {
30 Channel {
31 delivery_time: when,
32 received: AtomicBool::new(false),
33 }
34 }
35/// Creates a channel that delivers a message after a certain duration of time.
36#[inline]
37pub(crate) fn new_timeout(dur: Duration) -> Self {
38Self::new_deadline(Instant::now() + dur)
39 }
4041/// Attempts to receive a message without blocking.
42#[inline]
43pub(crate) fn try_recv(&self) -> Result<Instant, TryRecvError> {
44// We use relaxed ordering because this is just an optional optimistic check.
45if self.received.load(Ordering::Relaxed) {
46// The message has already been received.
47return Err(TryRecvError::Empty);
48 }
4950if Instant::now() < self.delivery_time {
51// The message was not delivered yet.
52return Err(TryRecvError::Empty);
53 }
5455// Try receiving the message if it is still available.
56if !self.received.swap(true, Ordering::SeqCst) {
57// Success! Return delivery time as the message.
58Ok(self.delivery_time)
59 } else {
60// The message was already received.
61Err(TryRecvError::Empty)
62 }
63 }
6465/// Receives a message from the channel.
66#[inline]
67pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<Instant, RecvTimeoutError> {
68// We use relaxed ordering because this is just an optional optimistic check.
69if self.received.load(Ordering::Relaxed) {
70// The message has already been received.
71utils::sleep_until(deadline);
72return Err(RecvTimeoutError::Timeout);
73 }
7475// Wait until the message is received or the deadline is reached.
76loop {
77let now = Instant::now();
7879let deadline = match deadline {
80// Check if we can receive the next message.
81_ if now >= self.delivery_time => break,
82// Check if the timeout deadline has been reached.
83Some(d) if now >= d => return Err(RecvTimeoutError::Timeout),
8485// Sleep until one of the above happens
86Some(d) if d < self.delivery_time => d,
87_ => self.delivery_time,
88 };
8990 thread::sleep(deadline - now);
91 }
9293// Try receiving the message if it is still available.
94if !self.received.swap(true, Ordering::SeqCst) {
95// Success! Return the message, which is the instant at which it was delivered.
96Ok(self.delivery_time)
97 } else {
98// The message was already received. Block forever.
99utils::sleep_until(None);
100unreachable!()
101 }
102 }
103104/// Reads a message from the channel.
105#[inline]
106pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<Instant, ()> {
107 token.at.ok_or(())
108 }
109110/// Returns `true` if the channel is empty.
111#[inline]
112pub(crate) fn is_empty(&self) -> bool {
113// We use relaxed ordering because this is just an optional optimistic check.
114if self.received.load(Ordering::Relaxed) {
115return true;
116 }
117118// If the delivery time hasn't been reached yet, the channel is empty.
119if Instant::now() < self.delivery_time {
120return true;
121 }
122123// The delivery time has been reached. The channel is empty only if the message has already
124 // been received.
125self.received.load(Ordering::SeqCst)
126 }
127128/// Returns `true` if the channel is full.
129#[inline]
130pub(crate) fn is_full(&self) -> bool {
131 !self.is_empty()
132 }
133134/// Returns the number of messages in the channel.
135#[inline]
136pub(crate) fn len(&self) -> usize {
137if self.is_empty() {
1380
139} else {
1401
141}
142 }
143144/// Returns the capacity of the channel.
145#[allow(clippy::unnecessary_wraps)] // This is intentional.
146#[inline]
147pub(crate) fn capacity(&self) -> Option<usize> {
148Some(1)
149 }
150}
151152impl SelectHandle for Channel {
153#[inline]
154fn try_select(&self, token: &mut Token) -> bool {
155match self.try_recv() {
156Ok(msg) => {
157 token.at = Some(msg);
158true
159}
160Err(TryRecvError::Disconnected) => {
161 token.at = None;
162true
163}
164Err(TryRecvError::Empty) => false,
165 }
166 }
167168#[inline]
169fn deadline(&self) -> Option<Instant> {
170// We use relaxed ordering because this is just an optional optimistic check.
171if self.received.load(Ordering::Relaxed) {
172None
173} else {
174Some(self.delivery_time)
175 }
176 }
177178#[inline]
179fn register(&self, _oper: Operation, _cx: &Context) -> bool {
180self.is_ready()
181 }
182183#[inline]
184fn unregister(&self, _oper: Operation) {}
185186#[inline]
187fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
188self.try_select(token)
189 }
190191#[inline]
192fn is_ready(&self) -> bool {
193 !self.is_empty()
194 }
195196#[inline]
197fn watch(&self, _oper: Operation, _cx: &Context) -> bool {
198self.is_ready()
199 }
200201#[inline]
202fn unwatch(&self, _oper: Operation) {}
203}