crossbeam_channel/flavors/
tick.rs

1//! Channel that delivers messages periodically.
2//!
3//! Messages cannot be sent into this kind of channel; they are materialized on demand.
4
5use std::thread;
6use std::time::{Duration, Instant};
7
8use crossbeam_utils::atomic::AtomicCell;
9
10use crate::context::Context;
11use crate::err::{RecvTimeoutError, TryRecvError};
12use crate::select::{Operation, SelectHandle, Token};
13
/// Slot in a select `Token` used by the tick flavor.
///
/// `Some(instant)` carries the message picked up by a successful receive; `None` makes
/// `Channel::read` report an error.
pub(crate) type TickToken = Option<Instant>;
16
17/// Channel that delivers messages periodically.
18pub(crate) struct Channel {
19    /// The instant at which the next message will be delivered.
20    delivery_time: AtomicCell<Instant>,
21
22    /// The time interval in which messages get delivered.
23    duration: Duration,
24}
25
26impl Channel {
27    /// Creates a channel that delivers messages periodically.
28    #[inline]
29    pub(crate) fn new(dur: Duration) -> Self {
30        Channel {
31            delivery_time: AtomicCell::new(Instant::now() + dur),
32            duration: dur,
33        }
34    }
35
36    /// Attempts to receive a message without blocking.
37    #[inline]
38    pub(crate) fn try_recv(&self) -> Result<Instant, TryRecvError> {
39        loop {
40            let now = Instant::now();
41            let delivery_time = self.delivery_time.load();
42
43            if now < delivery_time {
44                return Err(TryRecvError::Empty);
45            }
46
47            if self
48                .delivery_time
49                .compare_exchange(delivery_time, now + self.duration)
50                .is_ok()
51            {
52                return Ok(delivery_time);
53            }
54        }
55    }
56
57    /// Receives a message from the channel.
58    #[inline]
59    pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<Instant, RecvTimeoutError> {
60        loop {
61            let delivery_time = self.delivery_time.load();
62            let now = Instant::now();
63
64            if let Some(d) = deadline {
65                if d < delivery_time {
66                    if now < d {
67                        thread::sleep(d - now);
68                    }
69                    return Err(RecvTimeoutError::Timeout);
70                }
71            }
72
73            if self
74                .delivery_time
75                .compare_exchange(delivery_time, delivery_time.max(now) + self.duration)
76                .is_ok()
77            {
78                if now < delivery_time {
79                    thread::sleep(delivery_time - now);
80                }
81                return Ok(delivery_time);
82            }
83        }
84    }
85
86    /// Reads a message from the channel.
87    #[inline]
88    pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<Instant, ()> {
89        token.tick.ok_or(())
90    }
91
92    /// Returns `true` if the channel is empty.
93    #[inline]
94    pub(crate) fn is_empty(&self) -> bool {
95        Instant::now() < self.delivery_time.load()
96    }
97
98    /// Returns `true` if the channel is full.
99    #[inline]
100    pub(crate) fn is_full(&self) -> bool {
101        !self.is_empty()
102    }
103
104    /// Returns the number of messages in the channel.
105    #[inline]
106    pub(crate) fn len(&self) -> usize {
107        if self.is_empty() {
108            0
109        } else {
110            1
111        }
112    }
113
114    /// Returns the capacity of the channel.
115    #[allow(clippy::unnecessary_wraps)] // This is intentional.
116    #[inline]
117    pub(crate) fn capacity(&self) -> Option<usize> {
118        Some(1)
119    }
120}
121
122impl SelectHandle for Channel {
123    #[inline]
124    fn try_select(&self, token: &mut Token) -> bool {
125        match self.try_recv() {
126            Ok(msg) => {
127                token.tick = Some(msg);
128                true
129            }
130            Err(TryRecvError::Disconnected) => {
131                token.tick = None;
132                true
133            }
134            Err(TryRecvError::Empty) => false,
135        }
136    }
137
138    #[inline]
139    fn deadline(&self) -> Option<Instant> {
140        Some(self.delivery_time.load())
141    }
142
143    #[inline]
144    fn register(&self, _oper: Operation, _cx: &Context) -> bool {
145        self.is_ready()
146    }
147
148    #[inline]
149    fn unregister(&self, _oper: Operation) {}
150
151    #[inline]
152    fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
153        self.try_select(token)
154    }
155
156    #[inline]
157    fn is_ready(&self) -> bool {
158        !self.is_empty()
159    }
160
161    #[inline]
162    fn watch(&self, _oper: Operation, _cx: &Context) -> bool {
163        self.is_ready()
164    }
165
166    #[inline]
167    fn unwatch(&self, _oper: Operation) {}
168}