crossbeam_channel/flavors/
tick.rs
1use std::thread;
6use std::time::{Duration, Instant};
7
8use crossbeam_utils::atomic::AtomicCell;
9
10use crate::context::Context;
11use crate::err::{RecvTimeoutError, TryRecvError};
12use crate::select::{Operation, SelectHandle, Token};
13
14pub(crate) type TickToken = Option<Instant>;
16
17pub(crate) struct Channel {
19 delivery_time: AtomicCell<Instant>,
21
22 duration: Duration,
24}
25
26impl Channel {
27 #[inline]
29 pub(crate) fn new(dur: Duration) -> Self {
30 Channel {
31 delivery_time: AtomicCell::new(Instant::now() + dur),
32 duration: dur,
33 }
34 }
35
36 #[inline]
38 pub(crate) fn try_recv(&self) -> Result<Instant, TryRecvError> {
39 loop {
40 let now = Instant::now();
41 let delivery_time = self.delivery_time.load();
42
43 if now < delivery_time {
44 return Err(TryRecvError::Empty);
45 }
46
47 if self
48 .delivery_time
49 .compare_exchange(delivery_time, now + self.duration)
50 .is_ok()
51 {
52 return Ok(delivery_time);
53 }
54 }
55 }
56
57 #[inline]
59 pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<Instant, RecvTimeoutError> {
60 loop {
61 let delivery_time = self.delivery_time.load();
62 let now = Instant::now();
63
64 if let Some(d) = deadline {
65 if d < delivery_time {
66 if now < d {
67 thread::sleep(d - now);
68 }
69 return Err(RecvTimeoutError::Timeout);
70 }
71 }
72
73 if self
74 .delivery_time
75 .compare_exchange(delivery_time, delivery_time.max(now) + self.duration)
76 .is_ok()
77 {
78 if now < delivery_time {
79 thread::sleep(delivery_time - now);
80 }
81 return Ok(delivery_time);
82 }
83 }
84 }
85
86 #[inline]
88 pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<Instant, ()> {
89 token.tick.ok_or(())
90 }
91
92 #[inline]
94 pub(crate) fn is_empty(&self) -> bool {
95 Instant::now() < self.delivery_time.load()
96 }
97
98 #[inline]
100 pub(crate) fn is_full(&self) -> bool {
101 !self.is_empty()
102 }
103
104 #[inline]
106 pub(crate) fn len(&self) -> usize {
107 if self.is_empty() {
108 0
109 } else {
110 1
111 }
112 }
113
114 #[allow(clippy::unnecessary_wraps)] #[inline]
117 pub(crate) fn capacity(&self) -> Option<usize> {
118 Some(1)
119 }
120}
121
122impl SelectHandle for Channel {
123 #[inline]
124 fn try_select(&self, token: &mut Token) -> bool {
125 match self.try_recv() {
126 Ok(msg) => {
127 token.tick = Some(msg);
128 true
129 }
130 Err(TryRecvError::Disconnected) => {
131 token.tick = None;
132 true
133 }
134 Err(TryRecvError::Empty) => false,
135 }
136 }
137
138 #[inline]
139 fn deadline(&self) -> Option<Instant> {
140 Some(self.delivery_time.load())
141 }
142
143 #[inline]
144 fn register(&self, _oper: Operation, _cx: &Context) -> bool {
145 self.is_ready()
146 }
147
148 #[inline]
149 fn unregister(&self, _oper: Operation) {}
150
151 #[inline]
152 fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
153 self.try_select(token)
154 }
155
156 #[inline]
157 fn is_ready(&self) -> bool {
158 !self.is_empty()
159 }
160
161 #[inline]
162 fn watch(&self, _oper: Operation, _cx: &Context) -> bool {
163 self.is_ready()
164 }
165
166 #[inline]
167 fn unwatch(&self, _oper: Operation) {}
168}