crossbeam_channel/flavors/
at.rs

1//! Channel that delivers a message at a certain moment in time.
2//!
3//! Messages cannot be sent into this kind of channel; they are materialized on demand.
4
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::thread;
7use std::time::{Duration, Instant};
8
9use crate::context::Context;
10use crate::err::{RecvTimeoutError, TryRecvError};
11use crate::select::{Operation, SelectHandle, Token};
12use crate::utils;
13
14/// Result of a receive operation.
15pub(crate) type AtToken = Option<Instant>;
16
17/// Channel that delivers a message at a certain moment in time
18pub(crate) struct Channel {
19    /// The instant at which the message will be delivered.
20    delivery_time: Instant,
21
22    /// `true` if the message has been received.
23    received: AtomicBool,
24}
25
26impl Channel {
27    /// Creates a channel that delivers a message at a certain instant in time.
28    #[inline]
29    pub(crate) fn new_deadline(when: Instant) -> Self {
30        Channel {
31            delivery_time: when,
32            received: AtomicBool::new(false),
33        }
34    }
35    /// Creates a channel that delivers a message after a certain duration of time.
36    #[inline]
37    pub(crate) fn new_timeout(dur: Duration) -> Self {
38        Self::new_deadline(Instant::now() + dur)
39    }
40
41    /// Attempts to receive a message without blocking.
42    #[inline]
43    pub(crate) fn try_recv(&self) -> Result<Instant, TryRecvError> {
44        // We use relaxed ordering because this is just an optional optimistic check.
45        if self.received.load(Ordering::Relaxed) {
46            // The message has already been received.
47            return Err(TryRecvError::Empty);
48        }
49
50        if Instant::now() < self.delivery_time {
51            // The message was not delivered yet.
52            return Err(TryRecvError::Empty);
53        }
54
55        // Try receiving the message if it is still available.
56        if !self.received.swap(true, Ordering::SeqCst) {
57            // Success! Return delivery time as the message.
58            Ok(self.delivery_time)
59        } else {
60            // The message was already received.
61            Err(TryRecvError::Empty)
62        }
63    }
64
65    /// Receives a message from the channel.
66    #[inline]
67    pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<Instant, RecvTimeoutError> {
68        // We use relaxed ordering because this is just an optional optimistic check.
69        if self.received.load(Ordering::Relaxed) {
70            // The message has already been received.
71            utils::sleep_until(deadline);
72            return Err(RecvTimeoutError::Timeout);
73        }
74
75        // Wait until the message is received or the deadline is reached.
76        loop {
77            let now = Instant::now();
78
79            let deadline = match deadline {
80                // Check if we can receive the next message.
81                _ if now >= self.delivery_time => break,
82                // Check if the timeout deadline has been reached.
83                Some(d) if now >= d => return Err(RecvTimeoutError::Timeout),
84
85                // Sleep until one of the above happens
86                Some(d) if d < self.delivery_time => d,
87                _ => self.delivery_time,
88            };
89
90            thread::sleep(deadline - now);
91        }
92
93        // Try receiving the message if it is still available.
94        if !self.received.swap(true, Ordering::SeqCst) {
95            // Success! Return the message, which is the instant at which it was delivered.
96            Ok(self.delivery_time)
97        } else {
98            // The message was already received. Block forever.
99            utils::sleep_until(None);
100            unreachable!()
101        }
102    }
103
104    /// Reads a message from the channel.
105    #[inline]
106    pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<Instant, ()> {
107        token.at.ok_or(())
108    }
109
110    /// Returns `true` if the channel is empty.
111    #[inline]
112    pub(crate) fn is_empty(&self) -> bool {
113        // We use relaxed ordering because this is just an optional optimistic check.
114        if self.received.load(Ordering::Relaxed) {
115            return true;
116        }
117
118        // If the delivery time hasn't been reached yet, the channel is empty.
119        if Instant::now() < self.delivery_time {
120            return true;
121        }
122
123        // The delivery time has been reached. The channel is empty only if the message has already
124        // been received.
125        self.received.load(Ordering::SeqCst)
126    }
127
128    /// Returns `true` if the channel is full.
129    #[inline]
130    pub(crate) fn is_full(&self) -> bool {
131        !self.is_empty()
132    }
133
134    /// Returns the number of messages in the channel.
135    #[inline]
136    pub(crate) fn len(&self) -> usize {
137        if self.is_empty() {
138            0
139        } else {
140            1
141        }
142    }
143
144    /// Returns the capacity of the channel.
145    #[allow(clippy::unnecessary_wraps)] // This is intentional.
146    #[inline]
147    pub(crate) fn capacity(&self) -> Option<usize> {
148        Some(1)
149    }
150}
151
152impl SelectHandle for Channel {
153    #[inline]
154    fn try_select(&self, token: &mut Token) -> bool {
155        match self.try_recv() {
156            Ok(msg) => {
157                token.at = Some(msg);
158                true
159            }
160            Err(TryRecvError::Disconnected) => {
161                token.at = None;
162                true
163            }
164            Err(TryRecvError::Empty) => false,
165        }
166    }
167
168    #[inline]
169    fn deadline(&self) -> Option<Instant> {
170        // We use relaxed ordering because this is just an optional optimistic check.
171        if self.received.load(Ordering::Relaxed) {
172            None
173        } else {
174            Some(self.delivery_time)
175        }
176    }
177
178    #[inline]
179    fn register(&self, _oper: Operation, _cx: &Context) -> bool {
180        self.is_ready()
181    }
182
183    #[inline]
184    fn unregister(&self, _oper: Operation) {}
185
186    #[inline]
187    fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
188        self.try_select(token)
189    }
190
191    #[inline]
192    fn is_ready(&self) -> bool {
193        !self.is_empty()
194    }
195
196    #[inline]
197    fn watch(&self, _oper: Operation, _cx: &Context) -> bool {
198        self.is_ready()
199    }
200
201    #[inline]
202    fn unwatch(&self, _oper: Operation) {}
203}