crossbeam_channel/
channel.rs

1//! The channel interface.
2
3use std::fmt;
4use std::iter::FusedIterator;
5use std::mem;
6use std::panic::{RefUnwindSafe, UnwindSafe};
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9
10use crate::context::Context;
11use crate::counter;
12use crate::err::{
13    RecvError, RecvTimeoutError, SendError, SendTimeoutError, TryRecvError, TrySendError,
14};
15use crate::flavors;
16use crate::select::{Operation, SelectHandle, Token};
17
18/// Creates a channel of unbounded capacity.
19///
20/// This channel has a growable buffer that can hold any number of messages at a time.
21///
22/// # Examples
23///
24/// ```
25/// use std::thread;
26/// use crossbeam_channel::unbounded;
27///
28/// let (s, r) = unbounded();
29///
30/// // Computes the n-th Fibonacci number.
31/// fn fib(n: i32) -> i32 {
32///     if n <= 1 {
33///         n
34///     } else {
35///         fib(n - 1) + fib(n - 2)
36///     }
37/// }
38///
39/// // Spawn an asynchronous computation.
40/// thread::spawn(move || s.send(fib(20)).unwrap());
41///
42/// // Print the result of the computation.
43/// println!("{}", r.recv().unwrap());
44/// ```
45pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
46    let (s, r) = counter::new(flavors::list::Channel::new());
47    let s = Sender {
48        flavor: SenderFlavor::List(s),
49    };
50    let r = Receiver {
51        flavor: ReceiverFlavor::List(r),
52    };
53    (s, r)
54}
55
56/// Creates a channel of bounded capacity.
57///
58/// This channel has a buffer that can hold at most `cap` messages at a time.
59///
60/// A special case is zero-capacity channel, which cannot hold any messages. Instead, send and
61/// receive operations must appear at the same time in order to pair up and pass the message over.
62///
63/// # Examples
64///
65/// A channel of capacity 1:
66///
67/// ```
68/// use std::thread;
69/// use std::time::Duration;
70/// use crossbeam_channel::bounded;
71///
72/// let (s, r) = bounded(1);
73///
74/// // This call returns immediately because there is enough space in the channel.
75/// s.send(1).unwrap();
76///
77/// thread::spawn(move || {
78///     // This call blocks the current thread because the channel is full.
79///     // It will be able to complete only after the first message is received.
80///     s.send(2).unwrap();
81/// });
82///
83/// thread::sleep(Duration::from_secs(1));
84/// assert_eq!(r.recv(), Ok(1));
85/// assert_eq!(r.recv(), Ok(2));
86/// ```
87///
88/// A zero-capacity channel:
89///
90/// ```
91/// use std::thread;
92/// use std::time::Duration;
93/// use crossbeam_channel::bounded;
94///
95/// let (s, r) = bounded(0);
96///
97/// thread::spawn(move || {
98///     // This call blocks the current thread until a receive operation appears
99///     // on the other side of the channel.
100///     s.send(1).unwrap();
101/// });
102///
103/// thread::sleep(Duration::from_secs(1));
104/// assert_eq!(r.recv(), Ok(1));
105/// ```
106pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
107    if cap == 0 {
108        let (s, r) = counter::new(flavors::zero::Channel::new());
109        let s = Sender {
110            flavor: SenderFlavor::Zero(s),
111        };
112        let r = Receiver {
113            flavor: ReceiverFlavor::Zero(r),
114        };
115        (s, r)
116    } else {
117        let (s, r) = counter::new(flavors::array::Channel::with_capacity(cap));
118        let s = Sender {
119            flavor: SenderFlavor::Array(s),
120        };
121        let r = Receiver {
122            flavor: ReceiverFlavor::Array(r),
123        };
124        (s, r)
125    }
126}
127
128/// Creates a receiver that delivers a message after a certain duration of time.
129///
130/// The channel is bounded with capacity of 1 and never gets disconnected. Exactly one message will
131/// be sent into the channel after `duration` elapses. The message is the instant at which it is
132/// sent.
133///
134/// # Examples
135///
136/// Using an `after` channel for timeouts:
137///
138/// ```
139/// use std::time::Duration;
140/// use crossbeam_channel::{after, select, unbounded};
141///
142/// let (s, r) = unbounded::<i32>();
143/// let timeout = Duration::from_millis(100);
144///
145/// select! {
146///     recv(r) -> msg => println!("received {:?}", msg),
147///     recv(after(timeout)) -> _ => println!("timed out"),
148/// }
149/// ```
150///
151/// When the message gets sent:
152///
153/// ```
154/// use std::thread;
155/// use std::time::{Duration, Instant};
156/// use crossbeam_channel::after;
157///
158/// // Converts a number of milliseconds into a `Duration`.
159/// let ms = |ms| Duration::from_millis(ms);
160///
161/// // Returns `true` if `a` and `b` are very close `Instant`s.
162/// let eq = |a, b| a + ms(50) > b && b + ms(50) > a;
163///
164/// let start = Instant::now();
165/// let r = after(ms(100));
166///
167/// thread::sleep(ms(500));
168///
169/// // This message was sent 100 ms from the start and received 500 ms from the start.
170/// assert!(eq(r.recv().unwrap(), start + ms(100)));
171/// assert!(eq(Instant::now(), start + ms(500)));
172/// ```
173pub fn after(duration: Duration) -> Receiver<Instant> {
174    Receiver {
175        flavor: ReceiverFlavor::At(Arc::new(flavors::at::Channel::new_timeout(duration))),
176    }
177}
178
179/// Creates a receiver that delivers a message at a certain instant in time.
180///
181/// The channel is bounded with capacity of 1 and never gets disconnected. Exactly one message will
182/// be sent into the channel at the moment in time `when`. The message is the instant at which it
183/// is sent, which is the same as `when`. If `when` is in the past, the message will be delivered
184/// instantly to the receiver.
185///
186/// # Examples
187///
188/// Using an `at` channel for timeouts:
189///
190/// ```
191/// use std::time::{Instant, Duration};
192/// use crossbeam_channel::{at, select, unbounded};
193///
194/// let (s, r) = unbounded::<i32>();
195/// let deadline = Instant::now() + Duration::from_millis(500);
196///
197/// select! {
198///     recv(r) -> msg => println!("received {:?}", msg),
199///     recv(at(deadline)) -> _ => println!("timed out"),
200/// }
201/// ```
202///
203/// When the message gets sent:
204///
205/// ```
206/// use std::time::{Duration, Instant};
207/// use crossbeam_channel::at;
208///
209/// // Converts a number of milliseconds into a `Duration`.
210/// let ms = |ms| Duration::from_millis(ms);
211///
212/// let start = Instant::now();
213/// let end = start + ms(100);
214///
215/// let r = at(end);
216///
217/// // This message was sent 100 ms from the start
218/// assert_eq!(r.recv().unwrap(), end);
219/// assert!(Instant::now() > start + ms(100));
220/// ```
221pub fn at(when: Instant) -> Receiver<Instant> {
222    Receiver {
223        flavor: ReceiverFlavor::At(Arc::new(flavors::at::Channel::new_deadline(when))),
224    }
225}
226
227/// Creates a receiver that never delivers messages.
228///
229/// The channel is bounded with capacity of 0 and never gets disconnected.
230///
231/// # Examples
232///
233/// Using a `never` channel to optionally add a timeout to [`select!`]:
234///
235/// ```
236/// use std::thread;
237/// use std::time::Duration;
238/// use crossbeam_channel::{after, select, never, unbounded};
239///
240/// let (s, r) = unbounded();
241///
242/// thread::spawn(move || {
243///     thread::sleep(Duration::from_secs(1));
244///     s.send(1).unwrap();
245/// });
246///
247/// // Suppose this duration can be a `Some` or a `None`.
248/// let duration = Some(Duration::from_millis(100));
249///
250/// // Create a channel that times out after the specified duration.
251/// let timeout = duration
252///     .map(|d| after(d))
253///     .unwrap_or(never());
254///
255/// select! {
256///     recv(r) -> msg => assert_eq!(msg, Ok(1)),
257///     recv(timeout) -> _ => println!("timed out"),
258/// }
259/// ```
260pub fn never<T>() -> Receiver<T> {
261    Receiver {
262        flavor: ReceiverFlavor::Never(flavors::never::Channel::new()),
263    }
264}
265
266/// Creates a receiver that delivers messages periodically.
267///
268/// The channel is bounded with capacity of 1 and never gets disconnected. Messages will be
269/// sent into the channel in intervals of `duration`. Each message is the instant at which it is
270/// sent.
271///
272/// # Examples
273///
274/// Using a `tick` channel to periodically print elapsed time:
275///
276/// ```
277/// use std::time::{Duration, Instant};
278/// use crossbeam_channel::tick;
279///
280/// let start = Instant::now();
281/// let ticker = tick(Duration::from_millis(100));
282///
283/// for _ in 0..5 {
284///     ticker.recv().unwrap();
285///     println!("elapsed: {:?}", start.elapsed());
286/// }
287/// ```
288///
289/// When messages get sent:
290///
291/// ```
292/// use std::thread;
293/// use std::time::{Duration, Instant};
294/// use crossbeam_channel::tick;
295///
296/// // Converts a number of milliseconds into a `Duration`.
297/// let ms = |ms| Duration::from_millis(ms);
298///
299/// // Returns `true` if `a` and `b` are very close `Instant`s.
300/// let eq = |a, b| a + ms(50) > b && b + ms(50) > a;
301///
302/// let start = Instant::now();
303/// let r = tick(ms(100));
304///
305/// // This message was sent 100 ms from the start and received 100 ms from the start.
306/// assert!(eq(r.recv().unwrap(), start + ms(100)));
307/// assert!(eq(Instant::now(), start + ms(100)));
308///
309/// thread::sleep(ms(500));
310///
311/// // This message was sent 200 ms from the start and received 600 ms from the start.
312/// assert!(eq(r.recv().unwrap(), start + ms(200)));
313/// assert!(eq(Instant::now(), start + ms(600)));
314///
315/// // This message was sent 700 ms from the start and received 700 ms from the start.
316/// assert!(eq(r.recv().unwrap(), start + ms(700)));
317/// assert!(eq(Instant::now(), start + ms(700)));
318/// ```
319pub fn tick(duration: Duration) -> Receiver<Instant> {
320    Receiver {
321        flavor: ReceiverFlavor::Tick(Arc::new(flavors::tick::Channel::new(duration))),
322    }
323}
324
325/// The sending side of a channel.
326///
327/// # Examples
328///
329/// ```
330/// use std::thread;
331/// use crossbeam_channel::unbounded;
332///
333/// let (s1, r) = unbounded();
334/// let s2 = s1.clone();
335///
336/// thread::spawn(move || s1.send(1).unwrap());
337/// thread::spawn(move || s2.send(2).unwrap());
338///
339/// let msg1 = r.recv().unwrap();
340/// let msg2 = r.recv().unwrap();
341///
342/// assert_eq!(msg1 + msg2, 3);
343/// ```
pub struct Sender<T> {
    // Which channel implementation this handle is attached to; every method dispatches on it.
    flavor: SenderFlavor<T>,
}
347
/// Sender flavors.
///
/// One variant per channel implementation a `Sender` can be attached to. Each variant wraps the
/// `counter::Sender` half produced by `counter::new` for that implementation.
enum SenderFlavor<T> {
    /// Bounded channel based on a preallocated array.
    ///
    /// Created by `bounded` when the requested capacity is non-zero.
    Array(counter::Sender<flavors::array::Channel<T>>),

    /// Unbounded channel implemented as a linked list.
    ///
    /// Created by `unbounded`.
    List(counter::Sender<flavors::list::Channel<T>>),

    /// Zero-capacity channel.
    ///
    /// Created by `bounded` when the requested capacity is zero.
    Zero(counter::Sender<flavors::zero::Channel<T>>),
}
359
// A `Sender<T>` may be moved to or shared between threads whenever the message type itself is
// `Send`.
// NOTE(review): soundness depends on the flavor implementations synchronizing all access to the
// shared channel state internally; that code is not visible from this file — confirm there.
unsafe impl<T: Send> Send for Sender<T> {}
unsafe impl<T: Send> Sync for Sender<T> {}

// Senders are declared unwind-safe for every `T`, with no bound on the message type.
impl<T> UnwindSafe for Sender<T> {}
impl<T> RefUnwindSafe for Sender<T> {}
365
366impl<T> Sender<T> {
367    /// Attempts to send a message into the channel without blocking.
368    ///
369    /// This method will either send a message into the channel immediately or return an error if
370    /// the channel is full or disconnected. The returned error contains the original message.
371    ///
372    /// If called on a zero-capacity channel, this method will send the message only if there
373    /// happens to be a receive operation on the other side of the channel at the same time.
374    ///
375    /// # Examples
376    ///
377    /// ```
378    /// use crossbeam_channel::{bounded, TrySendError};
379    ///
380    /// let (s, r) = bounded(1);
381    ///
382    /// assert_eq!(s.try_send(1), Ok(()));
383    /// assert_eq!(s.try_send(2), Err(TrySendError::Full(2)));
384    ///
385    /// drop(r);
386    /// assert_eq!(s.try_send(3), Err(TrySendError::Disconnected(3)));
387    /// ```
388    pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
389        match &self.flavor {
390            SenderFlavor::Array(chan) => chan.try_send(msg),
391            SenderFlavor::List(chan) => chan.try_send(msg),
392            SenderFlavor::Zero(chan) => chan.try_send(msg),
393        }
394    }
395
396    /// Blocks the current thread until a message is sent or the channel is disconnected.
397    ///
398    /// If the channel is full and not disconnected, this call will block until the send operation
399    /// can proceed. If the channel becomes disconnected, this call will wake up and return an
400    /// error. The returned error contains the original message.
401    ///
402    /// If called on a zero-capacity channel, this method will wait for a receive operation to
403    /// appear on the other side of the channel.
404    ///
405    /// # Examples
406    ///
407    /// ```
408    /// use std::thread;
409    /// use std::time::Duration;
410    /// use crossbeam_channel::{bounded, SendError};
411    ///
412    /// let (s, r) = bounded(1);
413    /// assert_eq!(s.send(1), Ok(()));
414    ///
415    /// thread::spawn(move || {
416    ///     assert_eq!(r.recv(), Ok(1));
417    ///     thread::sleep(Duration::from_secs(1));
418    ///     drop(r);
419    /// });
420    ///
421    /// assert_eq!(s.send(2), Ok(()));
422    /// assert_eq!(s.send(3), Err(SendError(3)));
423    /// ```
424    pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
425        match &self.flavor {
426            SenderFlavor::Array(chan) => chan.send(msg, None),
427            SenderFlavor::List(chan) => chan.send(msg, None),
428            SenderFlavor::Zero(chan) => chan.send(msg, None),
429        }
430        .map_err(|err| match err {
431            SendTimeoutError::Disconnected(msg) => SendError(msg),
432            SendTimeoutError::Timeout(_) => unreachable!(),
433        })
434    }
435
436    /// Waits for a message to be sent into the channel, but only for a limited time.
437    ///
438    /// If the channel is full and not disconnected, this call will block until the send operation
439    /// can proceed or the operation times out. If the channel becomes disconnected, this call will
440    /// wake up and return an error. The returned error contains the original message.
441    ///
442    /// If called on a zero-capacity channel, this method will wait for a receive operation to
443    /// appear on the other side of the channel.
444    ///
445    /// # Examples
446    ///
447    /// ```
448    /// use std::thread;
449    /// use std::time::Duration;
450    /// use crossbeam_channel::{bounded, SendTimeoutError};
451    ///
452    /// let (s, r) = bounded(0);
453    ///
454    /// thread::spawn(move || {
455    ///     thread::sleep(Duration::from_secs(1));
456    ///     assert_eq!(r.recv(), Ok(2));
457    ///     drop(r);
458    /// });
459    ///
460    /// assert_eq!(
461    ///     s.send_timeout(1, Duration::from_millis(500)),
462    ///     Err(SendTimeoutError::Timeout(1)),
463    /// );
464    /// assert_eq!(
465    ///     s.send_timeout(2, Duration::from_secs(1)),
466    ///     Ok(()),
467    /// );
468    /// assert_eq!(
469    ///     s.send_timeout(3, Duration::from_millis(500)),
470    ///     Err(SendTimeoutError::Disconnected(3)),
471    /// );
472    /// ```
473    pub fn send_timeout(&self, msg: T, timeout: Duration) -> Result<(), SendTimeoutError<T>> {
474        self.send_deadline(msg, Instant::now() + timeout)
475    }
476
477    /// Waits for a message to be sent into the channel, but only until a given deadline.
478    ///
479    /// If the channel is full and not disconnected, this call will block until the send operation
480    /// can proceed or the operation times out. If the channel becomes disconnected, this call will
481    /// wake up and return an error. The returned error contains the original message.
482    ///
483    /// If called on a zero-capacity channel, this method will wait for a receive operation to
484    /// appear on the other side of the channel.
485    ///
486    /// # Examples
487    ///
488    /// ```
489    /// use std::thread;
490    /// use std::time::{Duration, Instant};
491    /// use crossbeam_channel::{bounded, SendTimeoutError};
492    ///
493    /// let (s, r) = bounded(0);
494    ///
495    /// thread::spawn(move || {
496    ///     thread::sleep(Duration::from_secs(1));
497    ///     assert_eq!(r.recv(), Ok(2));
498    ///     drop(r);
499    /// });
500    ///
501    /// let now = Instant::now();
502    ///
503    /// assert_eq!(
504    ///     s.send_deadline(1, now + Duration::from_millis(500)),
505    ///     Err(SendTimeoutError::Timeout(1)),
506    /// );
507    /// assert_eq!(
508    ///     s.send_deadline(2, now + Duration::from_millis(1500)),
509    ///     Ok(()),
510    /// );
511    /// assert_eq!(
512    ///     s.send_deadline(3, now + Duration::from_millis(2000)),
513    ///     Err(SendTimeoutError::Disconnected(3)),
514    /// );
515    /// ```
516    pub fn send_deadline(&self, msg: T, deadline: Instant) -> Result<(), SendTimeoutError<T>> {
517        match &self.flavor {
518            SenderFlavor::Array(chan) => chan.send(msg, Some(deadline)),
519            SenderFlavor::List(chan) => chan.send(msg, Some(deadline)),
520            SenderFlavor::Zero(chan) => chan.send(msg, Some(deadline)),
521        }
522    }
523
524    /// Returns `true` if the channel is empty.
525    ///
526    /// Note: Zero-capacity channels are always empty.
527    ///
528    /// # Examples
529    ///
530    /// ```
531    /// use crossbeam_channel::unbounded;
532    ///
533    /// let (s, r) = unbounded();
534    /// assert!(s.is_empty());
535    ///
536    /// s.send(0).unwrap();
537    /// assert!(!s.is_empty());
538    /// ```
539    pub fn is_empty(&self) -> bool {
540        match &self.flavor {
541            SenderFlavor::Array(chan) => chan.is_empty(),
542            SenderFlavor::List(chan) => chan.is_empty(),
543            SenderFlavor::Zero(chan) => chan.is_empty(),
544        }
545    }
546
547    /// Returns `true` if the channel is full.
548    ///
549    /// Note: Zero-capacity channels are always full.
550    ///
551    /// # Examples
552    ///
553    /// ```
554    /// use crossbeam_channel::bounded;
555    ///
556    /// let (s, r) = bounded(1);
557    ///
558    /// assert!(!s.is_full());
559    /// s.send(0).unwrap();
560    /// assert!(s.is_full());
561    /// ```
562    pub fn is_full(&self) -> bool {
563        match &self.flavor {
564            SenderFlavor::Array(chan) => chan.is_full(),
565            SenderFlavor::List(chan) => chan.is_full(),
566            SenderFlavor::Zero(chan) => chan.is_full(),
567        }
568    }
569
570    /// Returns the number of messages in the channel.
571    ///
572    /// # Examples
573    ///
574    /// ```
575    /// use crossbeam_channel::unbounded;
576    ///
577    /// let (s, r) = unbounded();
578    /// assert_eq!(s.len(), 0);
579    ///
580    /// s.send(1).unwrap();
581    /// s.send(2).unwrap();
582    /// assert_eq!(s.len(), 2);
583    /// ```
584    pub fn len(&self) -> usize {
585        match &self.flavor {
586            SenderFlavor::Array(chan) => chan.len(),
587            SenderFlavor::List(chan) => chan.len(),
588            SenderFlavor::Zero(chan) => chan.len(),
589        }
590    }
591
592    /// If the channel is bounded, returns its capacity.
593    ///
594    /// # Examples
595    ///
596    /// ```
597    /// use crossbeam_channel::{bounded, unbounded};
598    ///
599    /// let (s, _) = unbounded::<i32>();
600    /// assert_eq!(s.capacity(), None);
601    ///
602    /// let (s, _) = bounded::<i32>(5);
603    /// assert_eq!(s.capacity(), Some(5));
604    ///
605    /// let (s, _) = bounded::<i32>(0);
606    /// assert_eq!(s.capacity(), Some(0));
607    /// ```
608    pub fn capacity(&self) -> Option<usize> {
609        match &self.flavor {
610            SenderFlavor::Array(chan) => chan.capacity(),
611            SenderFlavor::List(chan) => chan.capacity(),
612            SenderFlavor::Zero(chan) => chan.capacity(),
613        }
614    }
615
616    /// Returns `true` if senders belong to the same channel.
617    ///
618    /// # Examples
619    ///
620    /// ```rust
621    /// use crossbeam_channel::unbounded;
622    ///
623    /// let (s, _) = unbounded::<usize>();
624    ///
625    /// let s2 = s.clone();
626    /// assert!(s.same_channel(&s2));
627    ///
628    /// let (s3, _) = unbounded();
629    /// assert!(!s.same_channel(&s3));
630    /// ```
631    pub fn same_channel(&self, other: &Sender<T>) -> bool {
632        match (&self.flavor, &other.flavor) {
633            (SenderFlavor::Array(ref a), SenderFlavor::Array(ref b)) => a == b,
634            (SenderFlavor::List(ref a), SenderFlavor::List(ref b)) => a == b,
635            (SenderFlavor::Zero(ref a), SenderFlavor::Zero(ref b)) => a == b,
636            _ => false,
637        }
638    }
639}
640
641impl<T> Drop for Sender<T> {
642    fn drop(&mut self) {
643        unsafe {
644            match &self.flavor {
645                SenderFlavor::Array(chan) => chan.release(|c| c.disconnect()),
646                SenderFlavor::List(chan) => chan.release(|c| c.disconnect_senders()),
647                SenderFlavor::Zero(chan) => chan.release(|c| c.disconnect()),
648            }
649        }
650    }
651}
652
653impl<T> Clone for Sender<T> {
654    fn clone(&self) -> Self {
655        let flavor = match &self.flavor {
656            SenderFlavor::Array(chan) => SenderFlavor::Array(chan.acquire()),
657            SenderFlavor::List(chan) => SenderFlavor::List(chan.acquire()),
658            SenderFlavor::Zero(chan) => SenderFlavor::Zero(chan.acquire()),
659        };
660
661        Sender { flavor }
662    }
663}
664
665impl<T> fmt::Debug for Sender<T> {
666    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
667        f.pad("Sender { .. }")
668    }
669}
670
671/// The receiving side of a channel.
672///
673/// # Examples
674///
675/// ```
676/// use std::thread;
677/// use std::time::Duration;
678/// use crossbeam_channel::unbounded;
679///
680/// let (s, r) = unbounded();
681///
682/// thread::spawn(move || {
683///     let _ = s.send(1);
684///     thread::sleep(Duration::from_secs(1));
685///     let _ = s.send(2);
686/// });
687///
688/// assert_eq!(r.recv(), Ok(1)); // Received immediately.
689/// assert_eq!(r.recv(), Ok(2)); // Received after 1 second.
690/// ```
pub struct Receiver<T> {
    // Which channel implementation this handle is attached to; every method dispatches on it.
    flavor: ReceiverFlavor<T>,
}
694
/// Receiver flavors.
///
/// One variant per channel implementation a `Receiver` can be attached to. The first three mirror
/// `SenderFlavor`; the remaining ones are receiver-only channels with no sending side.
enum ReceiverFlavor<T> {
    /// Bounded channel based on a preallocated array.
    Array(counter::Receiver<flavors::array::Channel<T>>),

    /// Unbounded channel implemented as a linked list.
    List(counter::Receiver<flavors::list::Channel<T>>),

    /// Zero-capacity channel.
    Zero(counter::Receiver<flavors::zero::Channel<T>>),

    /// The at flavor, created by both `after` and `at`.
    ///
    /// The channel type is not generic over `T`: it produces `Instant` messages, and the receive
    /// methods reinterpret its results as `T`.
    At(Arc<flavors::at::Channel>),

    /// The tick flavor, created by `tick`.
    ///
    /// Like the at flavor, the channel type is not generic over `T` and produces `Instant`
    /// messages.
    Tick(Arc<flavors::tick::Channel>),

    /// The never flavor, created by `never`.
    Never(flavors::never::Channel<T>),
}
715
// A `Receiver<T>` may be moved to or shared between threads whenever the message type itself is
// `Send`.
// NOTE(review): soundness depends on the flavor implementations synchronizing all access to the
// shared channel state internally; that code is not visible from this file — confirm there.
unsafe impl<T: Send> Send for Receiver<T> {}
unsafe impl<T: Send> Sync for Receiver<T> {}

// Receivers are declared unwind-safe for every `T`, with no bound on the message type.
impl<T> UnwindSafe for Receiver<T> {}
impl<T> RefUnwindSafe for Receiver<T> {}
721
722impl<T> Receiver<T> {
723    /// Attempts to receive a message from the channel without blocking.
724    ///
725    /// This method will either receive a message from the channel immediately or return an error
726    /// if the channel is empty.
727    ///
728    /// If called on a zero-capacity channel, this method will receive a message only if there
729    /// happens to be a send operation on the other side of the channel at the same time.
730    ///
731    /// # Examples
732    ///
733    /// ```
734    /// use crossbeam_channel::{unbounded, TryRecvError};
735    ///
736    /// let (s, r) = unbounded();
737    /// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
738    ///
739    /// s.send(5).unwrap();
740    /// drop(s);
741    ///
742    /// assert_eq!(r.try_recv(), Ok(5));
743    /// assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected));
744    /// ```
745    pub fn try_recv(&self) -> Result<T, TryRecvError> {
746        match &self.flavor {
747            ReceiverFlavor::Array(chan) => chan.try_recv(),
748            ReceiverFlavor::List(chan) => chan.try_recv(),
749            ReceiverFlavor::Zero(chan) => chan.try_recv(),
750            ReceiverFlavor::At(chan) => {
751                let msg = chan.try_recv();
752                unsafe {
753                    mem::transmute_copy::<Result<Instant, TryRecvError>, Result<T, TryRecvError>>(
754                        &msg,
755                    )
756                }
757            }
758            ReceiverFlavor::Tick(chan) => {
759                let msg = chan.try_recv();
760                unsafe {
761                    mem::transmute_copy::<Result<Instant, TryRecvError>, Result<T, TryRecvError>>(
762                        &msg,
763                    )
764                }
765            }
766            ReceiverFlavor::Never(chan) => chan.try_recv(),
767        }
768    }
769
770    /// Blocks the current thread until a message is received or the channel is empty and
771    /// disconnected.
772    ///
773    /// If the channel is empty and not disconnected, this call will block until the receive
774    /// operation can proceed. If the channel is empty and becomes disconnected, this call will
775    /// wake up and return an error.
776    ///
777    /// If called on a zero-capacity channel, this method will wait for a send operation to appear
778    /// on the other side of the channel.
779    ///
780    /// # Examples
781    ///
782    /// ```
783    /// use std::thread;
784    /// use std::time::Duration;
785    /// use crossbeam_channel::{unbounded, RecvError};
786    ///
787    /// let (s, r) = unbounded();
788    ///
789    /// thread::spawn(move || {
790    ///     thread::sleep(Duration::from_secs(1));
791    ///     s.send(5).unwrap();
792    ///     drop(s);
793    /// });
794    ///
795    /// assert_eq!(r.recv(), Ok(5));
796    /// assert_eq!(r.recv(), Err(RecvError));
797    /// ```
798    pub fn recv(&self) -> Result<T, RecvError> {
799        match &self.flavor {
800            ReceiverFlavor::Array(chan) => chan.recv(None),
801            ReceiverFlavor::List(chan) => chan.recv(None),
802            ReceiverFlavor::Zero(chan) => chan.recv(None),
803            ReceiverFlavor::At(chan) => {
804                let msg = chan.recv(None);
805                unsafe {
806                    mem::transmute_copy::<
807                        Result<Instant, RecvTimeoutError>,
808                        Result<T, RecvTimeoutError>,
809                    >(&msg)
810                }
811            }
812            ReceiverFlavor::Tick(chan) => {
813                let msg = chan.recv(None);
814                unsafe {
815                    mem::transmute_copy::<
816                        Result<Instant, RecvTimeoutError>,
817                        Result<T, RecvTimeoutError>,
818                    >(&msg)
819                }
820            }
821            ReceiverFlavor::Never(chan) => chan.recv(None),
822        }
823        .map_err(|_| RecvError)
824    }
825
826    /// Waits for a message to be received from the channel, but only for a limited time.
827    ///
828    /// If the channel is empty and not disconnected, this call will block until the receive
829    /// operation can proceed or the operation times out. If the channel is empty and becomes
830    /// disconnected, this call will wake up and return an error.
831    ///
832    /// If called on a zero-capacity channel, this method will wait for a send operation to appear
833    /// on the other side of the channel.
834    ///
835    /// # Examples
836    ///
837    /// ```
838    /// use std::thread;
839    /// use std::time::Duration;
840    /// use crossbeam_channel::{unbounded, RecvTimeoutError};
841    ///
842    /// let (s, r) = unbounded();
843    ///
844    /// thread::spawn(move || {
845    ///     thread::sleep(Duration::from_secs(1));
846    ///     s.send(5).unwrap();
847    ///     drop(s);
848    /// });
849    ///
850    /// assert_eq!(
851    ///     r.recv_timeout(Duration::from_millis(500)),
852    ///     Err(RecvTimeoutError::Timeout),
853    /// );
854    /// assert_eq!(
855    ///     r.recv_timeout(Duration::from_secs(1)),
856    ///     Ok(5),
857    /// );
858    /// assert_eq!(
859    ///     r.recv_timeout(Duration::from_secs(1)),
860    ///     Err(RecvTimeoutError::Disconnected),
861    /// );
862    /// ```
863    pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
864        self.recv_deadline(Instant::now() + timeout)
865    }
866
867    /// Waits for a message to be received from the channel, but only before a given deadline.
868    ///
869    /// If the channel is empty and not disconnected, this call will block until the receive
870    /// operation can proceed or the operation times out. If the channel is empty and becomes
871    /// disconnected, this call will wake up and return an error.
872    ///
873    /// If called on a zero-capacity channel, this method will wait for a send operation to appear
874    /// on the other side of the channel.
875    ///
876    /// # Examples
877    ///
878    /// ```
879    /// use std::thread;
880    /// use std::time::{Instant, Duration};
881    /// use crossbeam_channel::{unbounded, RecvTimeoutError};
882    ///
883    /// let (s, r) = unbounded();
884    ///
885    /// thread::spawn(move || {
886    ///     thread::sleep(Duration::from_secs(1));
887    ///     s.send(5).unwrap();
888    ///     drop(s);
889    /// });
890    ///
891    /// let now = Instant::now();
892    ///
893    /// assert_eq!(
894    ///     r.recv_deadline(now + Duration::from_millis(500)),
895    ///     Err(RecvTimeoutError::Timeout),
896    /// );
897    /// assert_eq!(
898    ///     r.recv_deadline(now + Duration::from_millis(1500)),
899    ///     Ok(5),
900    /// );
901    /// assert_eq!(
902    ///     r.recv_deadline(now + Duration::from_secs(5)),
903    ///     Err(RecvTimeoutError::Disconnected),
904    /// );
905    /// ```
906    pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
907        match &self.flavor {
908            ReceiverFlavor::Array(chan) => chan.recv(Some(deadline)),
909            ReceiverFlavor::List(chan) => chan.recv(Some(deadline)),
910            ReceiverFlavor::Zero(chan) => chan.recv(Some(deadline)),
911            ReceiverFlavor::At(chan) => {
912                let msg = chan.recv(Some(deadline));
913                unsafe {
914                    mem::transmute_copy::<
915                        Result<Instant, RecvTimeoutError>,
916                        Result<T, RecvTimeoutError>,
917                    >(&msg)
918                }
919            }
920            ReceiverFlavor::Tick(chan) => {
921                let msg = chan.recv(Some(deadline));
922                unsafe {
923                    mem::transmute_copy::<
924                        Result<Instant, RecvTimeoutError>,
925                        Result<T, RecvTimeoutError>,
926                    >(&msg)
927                }
928            }
929            ReceiverFlavor::Never(chan) => chan.recv(Some(deadline)),
930        }
931    }
932
933    /// Returns `true` if the channel is empty.
934    ///
935    /// Note: Zero-capacity channels are always empty.
936    ///
937    /// # Examples
938    ///
939    /// ```
940    /// use crossbeam_channel::unbounded;
941    ///
942    /// let (s, r) = unbounded();
943    ///
944    /// assert!(r.is_empty());
945    /// s.send(0).unwrap();
946    /// assert!(!r.is_empty());
947    /// ```
948    pub fn is_empty(&self) -> bool {
949        match &self.flavor {
950            ReceiverFlavor::Array(chan) => chan.is_empty(),
951            ReceiverFlavor::List(chan) => chan.is_empty(),
952            ReceiverFlavor::Zero(chan) => chan.is_empty(),
953            ReceiverFlavor::At(chan) => chan.is_empty(),
954            ReceiverFlavor::Tick(chan) => chan.is_empty(),
955            ReceiverFlavor::Never(chan) => chan.is_empty(),
956        }
957    }
958
959    /// Returns `true` if the channel is full.
960    ///
961    /// Note: Zero-capacity channels are always full.
962    ///
963    /// # Examples
964    ///
965    /// ```
966    /// use crossbeam_channel::bounded;
967    ///
968    /// let (s, r) = bounded(1);
969    ///
970    /// assert!(!r.is_full());
971    /// s.send(0).unwrap();
972    /// assert!(r.is_full());
973    /// ```
974    pub fn is_full(&self) -> bool {
975        match &self.flavor {
976            ReceiverFlavor::Array(chan) => chan.is_full(),
977            ReceiverFlavor::List(chan) => chan.is_full(),
978            ReceiverFlavor::Zero(chan) => chan.is_full(),
979            ReceiverFlavor::At(chan) => chan.is_full(),
980            ReceiverFlavor::Tick(chan) => chan.is_full(),
981            ReceiverFlavor::Never(chan) => chan.is_full(),
982        }
983    }
984
985    /// Returns the number of messages in the channel.
986    ///
987    /// # Examples
988    ///
989    /// ```
990    /// use crossbeam_channel::unbounded;
991    ///
992    /// let (s, r) = unbounded();
993    /// assert_eq!(r.len(), 0);
994    ///
995    /// s.send(1).unwrap();
996    /// s.send(2).unwrap();
997    /// assert_eq!(r.len(), 2);
998    /// ```
999    pub fn len(&self) -> usize {
1000        match &self.flavor {
1001            ReceiverFlavor::Array(chan) => chan.len(),
1002            ReceiverFlavor::List(chan) => chan.len(),
1003            ReceiverFlavor::Zero(chan) => chan.len(),
1004            ReceiverFlavor::At(chan) => chan.len(),
1005            ReceiverFlavor::Tick(chan) => chan.len(),
1006            ReceiverFlavor::Never(chan) => chan.len(),
1007        }
1008    }
1009
1010    /// If the channel is bounded, returns its capacity.
1011    ///
1012    /// # Examples
1013    ///
1014    /// ```
1015    /// use crossbeam_channel::{bounded, unbounded};
1016    ///
1017    /// let (_, r) = unbounded::<i32>();
1018    /// assert_eq!(r.capacity(), None);
1019    ///
1020    /// let (_, r) = bounded::<i32>(5);
1021    /// assert_eq!(r.capacity(), Some(5));
1022    ///
1023    /// let (_, r) = bounded::<i32>(0);
1024    /// assert_eq!(r.capacity(), Some(0));
1025    /// ```
1026    pub fn capacity(&self) -> Option<usize> {
1027        match &self.flavor {
1028            ReceiverFlavor::Array(chan) => chan.capacity(),
1029            ReceiverFlavor::List(chan) => chan.capacity(),
1030            ReceiverFlavor::Zero(chan) => chan.capacity(),
1031            ReceiverFlavor::At(chan) => chan.capacity(),
1032            ReceiverFlavor::Tick(chan) => chan.capacity(),
1033            ReceiverFlavor::Never(chan) => chan.capacity(),
1034        }
1035    }
1036
1037    /// A blocking iterator over messages in the channel.
1038    ///
1039    /// Each call to [`next`] blocks waiting for the next message and then returns it. However, if
1040    /// the channel becomes empty and disconnected, it returns [`None`] without blocking.
1041    ///
1042    /// [`next`]: Iterator::next
1043    ///
1044    /// # Examples
1045    ///
1046    /// ```
1047    /// use std::thread;
1048    /// use crossbeam_channel::unbounded;
1049    ///
1050    /// let (s, r) = unbounded();
1051    ///
1052    /// thread::spawn(move || {
1053    ///     s.send(1).unwrap();
1054    ///     s.send(2).unwrap();
1055    ///     s.send(3).unwrap();
1056    ///     drop(s); // Disconnect the channel.
1057    /// });
1058    ///
1059    /// // Collect all messages from the channel.
1060    /// // Note that the call to `collect` blocks until the sender is dropped.
1061    /// let v: Vec<_> = r.iter().collect();
1062    ///
1063    /// assert_eq!(v, [1, 2, 3]);
1064    /// ```
1065    pub fn iter(&self) -> Iter<'_, T> {
1066        Iter { receiver: self }
1067    }
1068
1069    /// A non-blocking iterator over messages in the channel.
1070    ///
1071    /// Each call to [`next`] returns a message if there is one ready to be received. The iterator
1072    /// never blocks waiting for the next message.
1073    ///
1074    /// [`next`]: Iterator::next
1075    ///
1076    /// # Examples
1077    ///
1078    /// ```
1079    /// use std::thread;
1080    /// use std::time::Duration;
1081    /// use crossbeam_channel::unbounded;
1082    ///
1083    /// let (s, r) = unbounded::<i32>();
1084    ///
1085    /// thread::spawn(move || {
1086    ///     s.send(1).unwrap();
1087    ///     thread::sleep(Duration::from_secs(1));
1088    ///     s.send(2).unwrap();
1089    ///     thread::sleep(Duration::from_secs(2));
1090    ///     s.send(3).unwrap();
1091    /// });
1092    ///
1093    /// thread::sleep(Duration::from_secs(2));
1094    ///
1095    /// // Collect all messages from the channel without blocking.
1096    /// // The third message hasn't been sent yet so we'll collect only the first two.
1097    /// let v: Vec<_> = r.try_iter().collect();
1098    ///
1099    /// assert_eq!(v, [1, 2]);
1100    /// ```
1101    pub fn try_iter(&self) -> TryIter<'_, T> {
1102        TryIter { receiver: self }
1103    }
1104
1105    /// Returns `true` if receivers belong to the same channel.
1106    ///
1107    /// # Examples
1108    ///
1109    /// ```rust
1110    /// use crossbeam_channel::unbounded;
1111    ///
1112    /// let (_, r) = unbounded::<usize>();
1113    ///
1114    /// let r2 = r.clone();
1115    /// assert!(r.same_channel(&r2));
1116    ///
1117    /// let (_, r3) = unbounded();
1118    /// assert!(!r.same_channel(&r3));
1119    /// ```
1120    pub fn same_channel(&self, other: &Receiver<T>) -> bool {
1121        match (&self.flavor, &other.flavor) {
1122            (ReceiverFlavor::Array(a), ReceiverFlavor::Array(b)) => a == b,
1123            (ReceiverFlavor::List(a), ReceiverFlavor::List(b)) => a == b,
1124            (ReceiverFlavor::Zero(a), ReceiverFlavor::Zero(b)) => a == b,
1125            (ReceiverFlavor::At(a), ReceiverFlavor::At(b)) => Arc::ptr_eq(a, b),
1126            (ReceiverFlavor::Tick(a), ReceiverFlavor::Tick(b)) => Arc::ptr_eq(a, b),
1127            (ReceiverFlavor::Never(_), ReceiverFlavor::Never(_)) => true,
1128            _ => false,
1129        }
1130    }
1131}
1132
1133impl<T> Drop for Receiver<T> {
1134    fn drop(&mut self) {
1135        unsafe {
1136            match &self.flavor {
1137                ReceiverFlavor::Array(chan) => chan.release(|c| c.disconnect()),
1138                ReceiverFlavor::List(chan) => chan.release(|c| c.disconnect_receivers()),
1139                ReceiverFlavor::Zero(chan) => chan.release(|c| c.disconnect()),
1140                ReceiverFlavor::At(_) => {}
1141                ReceiverFlavor::Tick(_) => {}
1142                ReceiverFlavor::Never(_) => {}
1143            }
1144        }
1145    }
1146}
1147
1148impl<T> Clone for Receiver<T> {
1149    fn clone(&self) -> Self {
1150        let flavor = match &self.flavor {
1151            ReceiverFlavor::Array(chan) => ReceiverFlavor::Array(chan.acquire()),
1152            ReceiverFlavor::List(chan) => ReceiverFlavor::List(chan.acquire()),
1153            ReceiverFlavor::Zero(chan) => ReceiverFlavor::Zero(chan.acquire()),
1154            ReceiverFlavor::At(chan) => ReceiverFlavor::At(chan.clone()),
1155            ReceiverFlavor::Tick(chan) => ReceiverFlavor::Tick(chan.clone()),
1156            ReceiverFlavor::Never(_) => ReceiverFlavor::Never(flavors::never::Channel::new()),
1157        };
1158
1159        Receiver { flavor }
1160    }
1161}
1162
1163impl<T> fmt::Debug for Receiver<T> {
1164    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1165        f.pad("Receiver { .. }")
1166    }
1167}
1168
1169impl<'a, T> IntoIterator for &'a Receiver<T> {
1170    type Item = T;
1171    type IntoIter = Iter<'a, T>;
1172
1173    fn into_iter(self) -> Self::IntoIter {
1174        self.iter()
1175    }
1176}
1177
1178impl<T> IntoIterator for Receiver<T> {
1179    type Item = T;
1180    type IntoIter = IntoIter<T>;
1181
1182    fn into_iter(self) -> Self::IntoIter {
1183        IntoIter { receiver: self }
1184    }
1185}
1186
1187/// A blocking iterator over messages in a channel.
1188///
1189/// Each call to [`next`] blocks waiting for the next message and then returns it. However, if the
1190/// channel becomes empty and disconnected, it returns [`None`] without blocking.
1191///
1192/// [`next`]: Iterator::next
1193///
1194/// # Examples
1195///
1196/// ```
1197/// use std::thread;
1198/// use crossbeam_channel::unbounded;
1199///
1200/// let (s, r) = unbounded();
1201///
1202/// thread::spawn(move || {
1203///     s.send(1).unwrap();
1204///     s.send(2).unwrap();
1205///     s.send(3).unwrap();
1206///     drop(s); // Disconnect the channel.
1207/// });
1208///
1209/// // Collect all messages from the channel.
1210/// // Note that the call to `collect` blocks until the sender is dropped.
1211/// let v: Vec<_> = r.iter().collect();
1212///
1213/// assert_eq!(v, [1, 2, 3]);
1214/// ```
pub struct Iter<'a, T> {
    // Borrowed receiver; `next` pulls each message from it with the blocking `recv`.
    receiver: &'a Receiver<T>,
}
1218
// Marker impl: promises that once `next` has returned `None` it keeps returning `None`.
// NOTE(review): presumably holds because `recv` keeps failing once it has failed; confirm.
impl<T> FusedIterator for Iter<'_, T> {}
1220
1221impl<T> Iterator for Iter<'_, T> {
1222    type Item = T;
1223
1224    fn next(&mut self) -> Option<Self::Item> {
1225        self.receiver.recv().ok()
1226    }
1227}
1228
1229impl<T> fmt::Debug for Iter<'_, T> {
1230    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1231        f.pad("Iter { .. }")
1232    }
1233}
1234
1235/// A non-blocking iterator over messages in a channel.
1236///
1237/// Each call to [`next`] returns a message if there is one ready to be received. The iterator
1238/// never blocks waiting for the next message.
1239///
1240/// [`next`]: Iterator::next
1241///
1242/// # Examples
1243///
1244/// ```
1245/// use std::thread;
1246/// use std::time::Duration;
1247/// use crossbeam_channel::unbounded;
1248///
1249/// let (s, r) = unbounded::<i32>();
1250///
1251/// thread::spawn(move || {
1252///     s.send(1).unwrap();
1253///     thread::sleep(Duration::from_secs(1));
1254///     s.send(2).unwrap();
1255///     thread::sleep(Duration::from_secs(2));
1256///     s.send(3).unwrap();
1257/// });
1258///
1259/// thread::sleep(Duration::from_secs(2));
1260///
1261/// // Collect all messages from the channel without blocking.
1262/// // The third message hasn't been sent yet so we'll collect only the first two.
1263/// let v: Vec<_> = r.try_iter().collect();
1264///
1265/// assert_eq!(v, [1, 2]);
1266/// ```
pub struct TryIter<'a, T> {
    // Borrowed receiver; `next` polls it with the non-blocking `try_recv`.
    receiver: &'a Receiver<T>,
}
1270
1271impl<T> Iterator for TryIter<'_, T> {
1272    type Item = T;
1273
1274    fn next(&mut self) -> Option<Self::Item> {
1275        self.receiver.try_recv().ok()
1276    }
1277}
1278
1279impl<T> fmt::Debug for TryIter<'_, T> {
1280    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1281        f.pad("TryIter { .. }")
1282    }
1283}
1284
1285/// A blocking iterator over messages in a channel.
1286///
1287/// Each call to [`next`] blocks waiting for the next message and then returns it. However, if the
1288/// channel becomes empty and disconnected, it returns [`None`] without blocking.
1289///
1290/// [`next`]: Iterator::next
1291///
1292/// # Examples
1293///
1294/// ```
1295/// use std::thread;
1296/// use crossbeam_channel::unbounded;
1297///
1298/// let (s, r) = unbounded();
1299///
1300/// thread::spawn(move || {
1301///     s.send(1).unwrap();
1302///     s.send(2).unwrap();
1303///     s.send(3).unwrap();
1304///     drop(s); // Disconnect the channel.
1305/// });
1306///
1307/// // Collect all messages from the channel.
1308/// // Note that the call to `collect` blocks until the sender is dropped.
1309/// let v: Vec<_> = r.into_iter().collect();
1310///
1311/// assert_eq!(v, [1, 2, 3]);
1312/// ```
pub struct IntoIter<T> {
    // Owned receiver; `next` pulls each message from it with the blocking `recv`.
    receiver: Receiver<T>,
}
1316
// Marker impl: promises that once `next` has returned `None` it keeps returning `None`.
// NOTE(review): presumably holds because `recv` keeps failing once it has failed; confirm.
impl<T> FusedIterator for IntoIter<T> {}
1318
1319impl<T> Iterator for IntoIter<T> {
1320    type Item = T;
1321
1322    fn next(&mut self) -> Option<Self::Item> {
1323        self.receiver.recv().ok()
1324    }
1325}
1326
1327impl<T> fmt::Debug for IntoIter<T> {
1328    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1329        f.pad("IntoIter { .. }")
1330    }
1331}
1332
1333impl<T> SelectHandle for Sender<T> {
1334    fn try_select(&self, token: &mut Token) -> bool {
1335        match &self.flavor {
1336            SenderFlavor::Array(chan) => chan.sender().try_select(token),
1337            SenderFlavor::List(chan) => chan.sender().try_select(token),
1338            SenderFlavor::Zero(chan) => chan.sender().try_select(token),
1339        }
1340    }
1341
1342    fn deadline(&self) -> Option<Instant> {
1343        None
1344    }
1345
1346    fn register(&self, oper: Operation, cx: &Context) -> bool {
1347        match &self.flavor {
1348            SenderFlavor::Array(chan) => chan.sender().register(oper, cx),
1349            SenderFlavor::List(chan) => chan.sender().register(oper, cx),
1350            SenderFlavor::Zero(chan) => chan.sender().register(oper, cx),
1351        }
1352    }
1353
1354    fn unregister(&self, oper: Operation) {
1355        match &self.flavor {
1356            SenderFlavor::Array(chan) => chan.sender().unregister(oper),
1357            SenderFlavor::List(chan) => chan.sender().unregister(oper),
1358            SenderFlavor::Zero(chan) => chan.sender().unregister(oper),
1359        }
1360    }
1361
1362    fn accept(&self, token: &mut Token, cx: &Context) -> bool {
1363        match &self.flavor {
1364            SenderFlavor::Array(chan) => chan.sender().accept(token, cx),
1365            SenderFlavor::List(chan) => chan.sender().accept(token, cx),
1366            SenderFlavor::Zero(chan) => chan.sender().accept(token, cx),
1367        }
1368    }
1369
1370    fn is_ready(&self) -> bool {
1371        match &self.flavor {
1372            SenderFlavor::Array(chan) => chan.sender().is_ready(),
1373            SenderFlavor::List(chan) => chan.sender().is_ready(),
1374            SenderFlavor::Zero(chan) => chan.sender().is_ready(),
1375        }
1376    }
1377
1378    fn watch(&self, oper: Operation, cx: &Context) -> bool {
1379        match &self.flavor {
1380            SenderFlavor::Array(chan) => chan.sender().watch(oper, cx),
1381            SenderFlavor::List(chan) => chan.sender().watch(oper, cx),
1382            SenderFlavor::Zero(chan) => chan.sender().watch(oper, cx),
1383        }
1384    }
1385
1386    fn unwatch(&self, oper: Operation) {
1387        match &self.flavor {
1388            SenderFlavor::Array(chan) => chan.sender().unwatch(oper),
1389            SenderFlavor::List(chan) => chan.sender().unwatch(oper),
1390            SenderFlavor::Zero(chan) => chan.sender().unwatch(oper),
1391        }
1392    }
1393}
1394
1395impl<T> SelectHandle for Receiver<T> {
1396    fn try_select(&self, token: &mut Token) -> bool {
1397        match &self.flavor {
1398            ReceiverFlavor::Array(chan) => chan.receiver().try_select(token),
1399            ReceiverFlavor::List(chan) => chan.receiver().try_select(token),
1400            ReceiverFlavor::Zero(chan) => chan.receiver().try_select(token),
1401            ReceiverFlavor::At(chan) => chan.try_select(token),
1402            ReceiverFlavor::Tick(chan) => chan.try_select(token),
1403            ReceiverFlavor::Never(chan) => chan.try_select(token),
1404        }
1405    }
1406
1407    fn deadline(&self) -> Option<Instant> {
1408        match &self.flavor {
1409            ReceiverFlavor::Array(_) => None,
1410            ReceiverFlavor::List(_) => None,
1411            ReceiverFlavor::Zero(_) => None,
1412            ReceiverFlavor::At(chan) => chan.deadline(),
1413            ReceiverFlavor::Tick(chan) => chan.deadline(),
1414            ReceiverFlavor::Never(chan) => chan.deadline(),
1415        }
1416    }
1417
1418    fn register(&self, oper: Operation, cx: &Context) -> bool {
1419        match &self.flavor {
1420            ReceiverFlavor::Array(chan) => chan.receiver().register(oper, cx),
1421            ReceiverFlavor::List(chan) => chan.receiver().register(oper, cx),
1422            ReceiverFlavor::Zero(chan) => chan.receiver().register(oper, cx),
1423            ReceiverFlavor::At(chan) => chan.register(oper, cx),
1424            ReceiverFlavor::Tick(chan) => chan.register(oper, cx),
1425            ReceiverFlavor::Never(chan) => chan.register(oper, cx),
1426        }
1427    }
1428
1429    fn unregister(&self, oper: Operation) {
1430        match &self.flavor {
1431            ReceiverFlavor::Array(chan) => chan.receiver().unregister(oper),
1432            ReceiverFlavor::List(chan) => chan.receiver().unregister(oper),
1433            ReceiverFlavor::Zero(chan) => chan.receiver().unregister(oper),
1434            ReceiverFlavor::At(chan) => chan.unregister(oper),
1435            ReceiverFlavor::Tick(chan) => chan.unregister(oper),
1436            ReceiverFlavor::Never(chan) => chan.unregister(oper),
1437        }
1438    }
1439
1440    fn accept(&self, token: &mut Token, cx: &Context) -> bool {
1441        match &self.flavor {
1442            ReceiverFlavor::Array(chan) => chan.receiver().accept(token, cx),
1443            ReceiverFlavor::List(chan) => chan.receiver().accept(token, cx),
1444            ReceiverFlavor::Zero(chan) => chan.receiver().accept(token, cx),
1445            ReceiverFlavor::At(chan) => chan.accept(token, cx),
1446            ReceiverFlavor::Tick(chan) => chan.accept(token, cx),
1447            ReceiverFlavor::Never(chan) => chan.accept(token, cx),
1448        }
1449    }
1450
1451    fn is_ready(&self) -> bool {
1452        match &self.flavor {
1453            ReceiverFlavor::Array(chan) => chan.receiver().is_ready(),
1454            ReceiverFlavor::List(chan) => chan.receiver().is_ready(),
1455            ReceiverFlavor::Zero(chan) => chan.receiver().is_ready(),
1456            ReceiverFlavor::At(chan) => chan.is_ready(),
1457            ReceiverFlavor::Tick(chan) => chan.is_ready(),
1458            ReceiverFlavor::Never(chan) => chan.is_ready(),
1459        }
1460    }
1461
1462    fn watch(&self, oper: Operation, cx: &Context) -> bool {
1463        match &self.flavor {
1464            ReceiverFlavor::Array(chan) => chan.receiver().watch(oper, cx),
1465            ReceiverFlavor::List(chan) => chan.receiver().watch(oper, cx),
1466            ReceiverFlavor::Zero(chan) => chan.receiver().watch(oper, cx),
1467            ReceiverFlavor::At(chan) => chan.watch(oper, cx),
1468            ReceiverFlavor::Tick(chan) => chan.watch(oper, cx),
1469            ReceiverFlavor::Never(chan) => chan.watch(oper, cx),
1470        }
1471    }
1472
1473    fn unwatch(&self, oper: Operation) {
1474        match &self.flavor {
1475            ReceiverFlavor::Array(chan) => chan.receiver().unwatch(oper),
1476            ReceiverFlavor::List(chan) => chan.receiver().unwatch(oper),
1477            ReceiverFlavor::Zero(chan) => chan.receiver().unwatch(oper),
1478            ReceiverFlavor::At(chan) => chan.unwatch(oper),
1479            ReceiverFlavor::Tick(chan) => chan.unwatch(oper),
1480            ReceiverFlavor::Never(chan) => chan.unwatch(oper),
1481        }
1482    }
1483}
1484
1485/// Writes a message into the channel.
1486pub(crate) unsafe fn write<T>(s: &Sender<T>, token: &mut Token, msg: T) -> Result<(), T> {
1487    match &s.flavor {
1488        SenderFlavor::Array(chan) => chan.write(token, msg),
1489        SenderFlavor::List(chan) => chan.write(token, msg),
1490        SenderFlavor::Zero(chan) => chan.write(token, msg),
1491    }
1492}
1493
1494/// Reads a message from the channel.
1495pub(crate) unsafe fn read<T>(r: &Receiver<T>, token: &mut Token) -> Result<T, ()> {
1496    match &r.flavor {
1497        ReceiverFlavor::Array(chan) => chan.read(token),
1498        ReceiverFlavor::List(chan) => chan.read(token),
1499        ReceiverFlavor::Zero(chan) => chan.read(token),
1500        ReceiverFlavor::At(chan) => {
1501            mem::transmute_copy::<Result<Instant, ()>, Result<T, ()>>(&chan.read(token))
1502        }
1503        ReceiverFlavor::Tick(chan) => {
1504            mem::transmute_copy::<Result<Instant, ()>, Result<T, ()>>(&chan.read(token))
1505        }
1506        ReceiverFlavor::Never(chan) => chan.read(token),
1507    }
1508}