crossbeam_channel/channel.rs
1//! The channel interface.
2
3use std::fmt;
4use std::iter::FusedIterator;
5use std::mem;
6use std::panic::{RefUnwindSafe, UnwindSafe};
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9
10use crate::context::Context;
11use crate::counter;
12use crate::err::{
13 RecvError, RecvTimeoutError, SendError, SendTimeoutError, TryRecvError, TrySendError,
14};
15use crate::flavors;
16use crate::select::{Operation, SelectHandle, Token};
17
18/// Creates a channel of unbounded capacity.
19///
20/// This channel has a growable buffer that can hold any number of messages at a time.
21///
22/// # Examples
23///
24/// ```
25/// use std::thread;
26/// use crossbeam_channel::unbounded;
27///
28/// let (s, r) = unbounded();
29///
30/// // Computes the n-th Fibonacci number.
31/// fn fib(n: i32) -> i32 {
32/// if n <= 1 {
33/// n
34/// } else {
35/// fib(n - 1) + fib(n - 2)
36/// }
37/// }
38///
39/// // Spawn an asynchronous computation.
40/// thread::spawn(move || s.send(fib(20)).unwrap());
41///
42/// // Print the result of the computation.
43/// println!("{}", r.recv().unwrap());
44/// ```
45pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
46 let (s, r) = counter::new(flavors::list::Channel::new());
47 let s = Sender {
48 flavor: SenderFlavor::List(s),
49 };
50 let r = Receiver {
51 flavor: ReceiverFlavor::List(r),
52 };
53 (s, r)
54}
55
56/// Creates a channel of bounded capacity.
57///
58/// This channel has a buffer that can hold at most `cap` messages at a time.
59///
/// A special case is the zero-capacity channel, which cannot hold any messages. Instead, send and
/// receive operations must appear at the same time in order to pair up and pass the message over.
62///
63/// # Examples
64///
65/// A channel of capacity 1:
66///
67/// ```
68/// use std::thread;
69/// use std::time::Duration;
70/// use crossbeam_channel::bounded;
71///
72/// let (s, r) = bounded(1);
73///
74/// // This call returns immediately because there is enough space in the channel.
75/// s.send(1).unwrap();
76///
77/// thread::spawn(move || {
78/// // This call blocks the current thread because the channel is full.
79/// // It will be able to complete only after the first message is received.
80/// s.send(2).unwrap();
81/// });
82///
83/// thread::sleep(Duration::from_secs(1));
84/// assert_eq!(r.recv(), Ok(1));
85/// assert_eq!(r.recv(), Ok(2));
86/// ```
87///
88/// A zero-capacity channel:
89///
90/// ```
91/// use std::thread;
92/// use std::time::Duration;
93/// use crossbeam_channel::bounded;
94///
95/// let (s, r) = bounded(0);
96///
97/// thread::spawn(move || {
98/// // This call blocks the current thread until a receive operation appears
99/// // on the other side of the channel.
100/// s.send(1).unwrap();
101/// });
102///
103/// thread::sleep(Duration::from_secs(1));
104/// assert_eq!(r.recv(), Ok(1));
105/// ```
106pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
107 if cap == 0 {
108 let (s, r) = counter::new(flavors::zero::Channel::new());
109 let s = Sender {
110 flavor: SenderFlavor::Zero(s),
111 };
112 let r = Receiver {
113 flavor: ReceiverFlavor::Zero(r),
114 };
115 (s, r)
116 } else {
117 let (s, r) = counter::new(flavors::array::Channel::with_capacity(cap));
118 let s = Sender {
119 flavor: SenderFlavor::Array(s),
120 };
121 let r = Receiver {
122 flavor: ReceiverFlavor::Array(r),
123 };
124 (s, r)
125 }
126}
127
128/// Creates a receiver that delivers a message after a certain duration of time.
129///
130/// The channel is bounded with capacity of 1 and never gets disconnected. Exactly one message will
131/// be sent into the channel after `duration` elapses. The message is the instant at which it is
132/// sent.
133///
134/// # Examples
135///
136/// Using an `after` channel for timeouts:
137///
138/// ```
139/// use std::time::Duration;
140/// use crossbeam_channel::{after, select, unbounded};
141///
142/// let (s, r) = unbounded::<i32>();
143/// let timeout = Duration::from_millis(100);
144///
145/// select! {
146/// recv(r) -> msg => println!("received {:?}", msg),
147/// recv(after(timeout)) -> _ => println!("timed out"),
148/// }
149/// ```
150///
151/// When the message gets sent:
152///
153/// ```
154/// use std::thread;
155/// use std::time::{Duration, Instant};
156/// use crossbeam_channel::after;
157///
158/// // Converts a number of milliseconds into a `Duration`.
159/// let ms = |ms| Duration::from_millis(ms);
160///
161/// // Returns `true` if `a` and `b` are very close `Instant`s.
162/// let eq = |a, b| a + ms(50) > b && b + ms(50) > a;
163///
164/// let start = Instant::now();
165/// let r = after(ms(100));
166///
167/// thread::sleep(ms(500));
168///
169/// // This message was sent 100 ms from the start and received 500 ms from the start.
170/// assert!(eq(r.recv().unwrap(), start + ms(100)));
171/// assert!(eq(Instant::now(), start + ms(500)));
172/// ```
173pub fn after(duration: Duration) -> Receiver<Instant> {
174 Receiver {
175 flavor: ReceiverFlavor::At(Arc::new(flavors::at::Channel::new_timeout(duration))),
176 }
177}
178
179/// Creates a receiver that delivers a message at a certain instant in time.
180///
181/// The channel is bounded with capacity of 1 and never gets disconnected. Exactly one message will
182/// be sent into the channel at the moment in time `when`. The message is the instant at which it
183/// is sent, which is the same as `when`. If `when` is in the past, the message will be delivered
184/// instantly to the receiver.
185///
186/// # Examples
187///
188/// Using an `at` channel for timeouts:
189///
190/// ```
191/// use std::time::{Instant, Duration};
192/// use crossbeam_channel::{at, select, unbounded};
193///
194/// let (s, r) = unbounded::<i32>();
195/// let deadline = Instant::now() + Duration::from_millis(500);
196///
197/// select! {
198/// recv(r) -> msg => println!("received {:?}", msg),
199/// recv(at(deadline)) -> _ => println!("timed out"),
200/// }
201/// ```
202///
203/// When the message gets sent:
204///
205/// ```
206/// use std::time::{Duration, Instant};
207/// use crossbeam_channel::at;
208///
209/// // Converts a number of milliseconds into a `Duration`.
210/// let ms = |ms| Duration::from_millis(ms);
211///
212/// let start = Instant::now();
213/// let end = start + ms(100);
214///
215/// let r = at(end);
216///
217/// // This message was sent 100 ms from the start
218/// assert_eq!(r.recv().unwrap(), end);
219/// assert!(Instant::now() > start + ms(100));
220/// ```
221pub fn at(when: Instant) -> Receiver<Instant> {
222 Receiver {
223 flavor: ReceiverFlavor::At(Arc::new(flavors::at::Channel::new_deadline(when))),
224 }
225}
226
227/// Creates a receiver that never delivers messages.
228///
229/// The channel is bounded with capacity of 0 and never gets disconnected.
230///
231/// # Examples
232///
233/// Using a `never` channel to optionally add a timeout to [`select!`]:
234///
235/// ```
236/// use std::thread;
237/// use std::time::Duration;
238/// use crossbeam_channel::{after, select, never, unbounded};
239///
240/// let (s, r) = unbounded();
241///
242/// thread::spawn(move || {
243/// thread::sleep(Duration::from_secs(1));
244/// s.send(1).unwrap();
245/// });
246///
247/// // Suppose this duration can be a `Some` or a `None`.
248/// let duration = Some(Duration::from_millis(100));
249///
250/// // Create a channel that times out after the specified duration.
251/// let timeout = duration
252/// .map(|d| after(d))
253/// .unwrap_or(never());
254///
255/// select! {
256/// recv(r) -> msg => assert_eq!(msg, Ok(1)),
257/// recv(timeout) -> _ => println!("timed out"),
258/// }
259/// ```
260pub fn never<T>() -> Receiver<T> {
261 Receiver {
262 flavor: ReceiverFlavor::Never(flavors::never::Channel::new()),
263 }
264}
265
266/// Creates a receiver that delivers messages periodically.
267///
268/// The channel is bounded with capacity of 1 and never gets disconnected. Messages will be
269/// sent into the channel in intervals of `duration`. Each message is the instant at which it is
270/// sent.
271///
272/// # Examples
273///
274/// Using a `tick` channel to periodically print elapsed time:
275///
276/// ```
277/// use std::time::{Duration, Instant};
278/// use crossbeam_channel::tick;
279///
280/// let start = Instant::now();
281/// let ticker = tick(Duration::from_millis(100));
282///
283/// for _ in 0..5 {
284/// ticker.recv().unwrap();
285/// println!("elapsed: {:?}", start.elapsed());
286/// }
287/// ```
288///
289/// When messages get sent:
290///
291/// ```
292/// use std::thread;
293/// use std::time::{Duration, Instant};
294/// use crossbeam_channel::tick;
295///
296/// // Converts a number of milliseconds into a `Duration`.
297/// let ms = |ms| Duration::from_millis(ms);
298///
299/// // Returns `true` if `a` and `b` are very close `Instant`s.
300/// let eq = |a, b| a + ms(50) > b && b + ms(50) > a;
301///
302/// let start = Instant::now();
303/// let r = tick(ms(100));
304///
305/// // This message was sent 100 ms from the start and received 100 ms from the start.
306/// assert!(eq(r.recv().unwrap(), start + ms(100)));
307/// assert!(eq(Instant::now(), start + ms(100)));
308///
309/// thread::sleep(ms(500));
310///
311/// // This message was sent 200 ms from the start and received 600 ms from the start.
312/// assert!(eq(r.recv().unwrap(), start + ms(200)));
313/// assert!(eq(Instant::now(), start + ms(600)));
314///
315/// // This message was sent 700 ms from the start and received 700 ms from the start.
316/// assert!(eq(r.recv().unwrap(), start + ms(700)));
317/// assert!(eq(Instant::now(), start + ms(700)));
318/// ```
319pub fn tick(duration: Duration) -> Receiver<Instant> {
320 Receiver {
321 flavor: ReceiverFlavor::Tick(Arc::new(flavors::tick::Channel::new(duration))),
322 }
323}
324
325/// The sending side of a channel.
326///
327/// # Examples
328///
329/// ```
330/// use std::thread;
331/// use crossbeam_channel::unbounded;
332///
333/// let (s1, r) = unbounded();
334/// let s2 = s1.clone();
335///
336/// thread::spawn(move || s1.send(1).unwrap());
337/// thread::spawn(move || s2.send(2).unwrap());
338///
339/// let msg1 = r.recv().unwrap();
340/// let msg2 = r.recv().unwrap();
341///
342/// assert_eq!(msg1 + msg2, 3);
343/// ```
pub struct Sender<T> {
    /// The channel implementation this sender is attached to; every method dispatches on it.
    flavor: SenderFlavor<T>,
}
347
/// Sender flavors.
///
/// One variant per channel implementation that has a sending side. Each variant wraps the
/// `counter::Sender` handle returned by `counter::new` for that implementation.
enum SenderFlavor<T> {
    /// Bounded channel based on a preallocated array.
    ///
    /// Created by `bounded` when the requested capacity is non-zero.
    Array(counter::Sender<flavors::array::Channel<T>>),

    /// Unbounded channel implemented as a linked list.
    ///
    /// Created by `unbounded`.
    List(counter::Sender<flavors::list::Channel<T>>),

    /// Zero-capacity channel.
    ///
    /// Created by `bounded` when the requested capacity is zero.
    Zero(counter::Sender<flavors::zero::Channel<T>>),
}
359
// A `Sender<T>` may be moved to, and shared between, threads whenever the message type `T`
// can itself be sent across threads.
// NOTE(review): the soundness of these `unsafe impl`s rests on the flavor implementations
// in `flavors::*`, which are not visible from this file — confirm there.
unsafe impl<T: Send> Send for Sender<T> {}
unsafe impl<T: Send> Sync for Sender<T> {}

// Senders are declared unwind-safe for every `T`, with no bound on the message type.
impl<T> UnwindSafe for Sender<T> {}
impl<T> RefUnwindSafe for Sender<T> {}
365
366impl<T> Sender<T> {
367 /// Attempts to send a message into the channel without blocking.
368 ///
369 /// This method will either send a message into the channel immediately or return an error if
370 /// the channel is full or disconnected. The returned error contains the original message.
371 ///
372 /// If called on a zero-capacity channel, this method will send the message only if there
373 /// happens to be a receive operation on the other side of the channel at the same time.
374 ///
375 /// # Examples
376 ///
377 /// ```
378 /// use crossbeam_channel::{bounded, TrySendError};
379 ///
380 /// let (s, r) = bounded(1);
381 ///
382 /// assert_eq!(s.try_send(1), Ok(()));
383 /// assert_eq!(s.try_send(2), Err(TrySendError::Full(2)));
384 ///
385 /// drop(r);
386 /// assert_eq!(s.try_send(3), Err(TrySendError::Disconnected(3)));
387 /// ```
388 pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
389 match &self.flavor {
390 SenderFlavor::Array(chan) => chan.try_send(msg),
391 SenderFlavor::List(chan) => chan.try_send(msg),
392 SenderFlavor::Zero(chan) => chan.try_send(msg),
393 }
394 }
395
396 /// Blocks the current thread until a message is sent or the channel is disconnected.
397 ///
398 /// If the channel is full and not disconnected, this call will block until the send operation
399 /// can proceed. If the channel becomes disconnected, this call will wake up and return an
400 /// error. The returned error contains the original message.
401 ///
402 /// If called on a zero-capacity channel, this method will wait for a receive operation to
403 /// appear on the other side of the channel.
404 ///
405 /// # Examples
406 ///
407 /// ```
408 /// use std::thread;
409 /// use std::time::Duration;
410 /// use crossbeam_channel::{bounded, SendError};
411 ///
412 /// let (s, r) = bounded(1);
413 /// assert_eq!(s.send(1), Ok(()));
414 ///
415 /// thread::spawn(move || {
416 /// assert_eq!(r.recv(), Ok(1));
417 /// thread::sleep(Duration::from_secs(1));
418 /// drop(r);
419 /// });
420 ///
421 /// assert_eq!(s.send(2), Ok(()));
422 /// assert_eq!(s.send(3), Err(SendError(3)));
423 /// ```
424 pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
425 match &self.flavor {
426 SenderFlavor::Array(chan) => chan.send(msg, None),
427 SenderFlavor::List(chan) => chan.send(msg, None),
428 SenderFlavor::Zero(chan) => chan.send(msg, None),
429 }
430 .map_err(|err| match err {
431 SendTimeoutError::Disconnected(msg) => SendError(msg),
432 SendTimeoutError::Timeout(_) => unreachable!(),
433 })
434 }
435
436 /// Waits for a message to be sent into the channel, but only for a limited time.
437 ///
438 /// If the channel is full and not disconnected, this call will block until the send operation
439 /// can proceed or the operation times out. If the channel becomes disconnected, this call will
440 /// wake up and return an error. The returned error contains the original message.
441 ///
442 /// If called on a zero-capacity channel, this method will wait for a receive operation to
443 /// appear on the other side of the channel.
444 ///
445 /// # Examples
446 ///
447 /// ```
448 /// use std::thread;
449 /// use std::time::Duration;
450 /// use crossbeam_channel::{bounded, SendTimeoutError};
451 ///
452 /// let (s, r) = bounded(0);
453 ///
454 /// thread::spawn(move || {
455 /// thread::sleep(Duration::from_secs(1));
456 /// assert_eq!(r.recv(), Ok(2));
457 /// drop(r);
458 /// });
459 ///
460 /// assert_eq!(
461 /// s.send_timeout(1, Duration::from_millis(500)),
462 /// Err(SendTimeoutError::Timeout(1)),
463 /// );
464 /// assert_eq!(
465 /// s.send_timeout(2, Duration::from_secs(1)),
466 /// Ok(()),
467 /// );
468 /// assert_eq!(
469 /// s.send_timeout(3, Duration::from_millis(500)),
470 /// Err(SendTimeoutError::Disconnected(3)),
471 /// );
472 /// ```
473 pub fn send_timeout(&self, msg: T, timeout: Duration) -> Result<(), SendTimeoutError<T>> {
474 self.send_deadline(msg, Instant::now() + timeout)
475 }
476
477 /// Waits for a message to be sent into the channel, but only until a given deadline.
478 ///
479 /// If the channel is full and not disconnected, this call will block until the send operation
480 /// can proceed or the operation times out. If the channel becomes disconnected, this call will
481 /// wake up and return an error. The returned error contains the original message.
482 ///
483 /// If called on a zero-capacity channel, this method will wait for a receive operation to
484 /// appear on the other side of the channel.
485 ///
486 /// # Examples
487 ///
488 /// ```
489 /// use std::thread;
490 /// use std::time::{Duration, Instant};
491 /// use crossbeam_channel::{bounded, SendTimeoutError};
492 ///
493 /// let (s, r) = bounded(0);
494 ///
495 /// thread::spawn(move || {
496 /// thread::sleep(Duration::from_secs(1));
497 /// assert_eq!(r.recv(), Ok(2));
498 /// drop(r);
499 /// });
500 ///
501 /// let now = Instant::now();
502 ///
503 /// assert_eq!(
504 /// s.send_deadline(1, now + Duration::from_millis(500)),
505 /// Err(SendTimeoutError::Timeout(1)),
506 /// );
507 /// assert_eq!(
508 /// s.send_deadline(2, now + Duration::from_millis(1500)),
509 /// Ok(()),
510 /// );
511 /// assert_eq!(
512 /// s.send_deadline(3, now + Duration::from_millis(2000)),
513 /// Err(SendTimeoutError::Disconnected(3)),
514 /// );
515 /// ```
516 pub fn send_deadline(&self, msg: T, deadline: Instant) -> Result<(), SendTimeoutError<T>> {
517 match &self.flavor {
518 SenderFlavor::Array(chan) => chan.send(msg, Some(deadline)),
519 SenderFlavor::List(chan) => chan.send(msg, Some(deadline)),
520 SenderFlavor::Zero(chan) => chan.send(msg, Some(deadline)),
521 }
522 }
523
524 /// Returns `true` if the channel is empty.
525 ///
526 /// Note: Zero-capacity channels are always empty.
527 ///
528 /// # Examples
529 ///
530 /// ```
531 /// use crossbeam_channel::unbounded;
532 ///
533 /// let (s, r) = unbounded();
534 /// assert!(s.is_empty());
535 ///
536 /// s.send(0).unwrap();
537 /// assert!(!s.is_empty());
538 /// ```
539 pub fn is_empty(&self) -> bool {
540 match &self.flavor {
541 SenderFlavor::Array(chan) => chan.is_empty(),
542 SenderFlavor::List(chan) => chan.is_empty(),
543 SenderFlavor::Zero(chan) => chan.is_empty(),
544 }
545 }
546
547 /// Returns `true` if the channel is full.
548 ///
549 /// Note: Zero-capacity channels are always full.
550 ///
551 /// # Examples
552 ///
553 /// ```
554 /// use crossbeam_channel::bounded;
555 ///
556 /// let (s, r) = bounded(1);
557 ///
558 /// assert!(!s.is_full());
559 /// s.send(0).unwrap();
560 /// assert!(s.is_full());
561 /// ```
562 pub fn is_full(&self) -> bool {
563 match &self.flavor {
564 SenderFlavor::Array(chan) => chan.is_full(),
565 SenderFlavor::List(chan) => chan.is_full(),
566 SenderFlavor::Zero(chan) => chan.is_full(),
567 }
568 }
569
570 /// Returns the number of messages in the channel.
571 ///
572 /// # Examples
573 ///
574 /// ```
575 /// use crossbeam_channel::unbounded;
576 ///
577 /// let (s, r) = unbounded();
578 /// assert_eq!(s.len(), 0);
579 ///
580 /// s.send(1).unwrap();
581 /// s.send(2).unwrap();
582 /// assert_eq!(s.len(), 2);
583 /// ```
584 pub fn len(&self) -> usize {
585 match &self.flavor {
586 SenderFlavor::Array(chan) => chan.len(),
587 SenderFlavor::List(chan) => chan.len(),
588 SenderFlavor::Zero(chan) => chan.len(),
589 }
590 }
591
592 /// If the channel is bounded, returns its capacity.
593 ///
594 /// # Examples
595 ///
596 /// ```
597 /// use crossbeam_channel::{bounded, unbounded};
598 ///
599 /// let (s, _) = unbounded::<i32>();
600 /// assert_eq!(s.capacity(), None);
601 ///
602 /// let (s, _) = bounded::<i32>(5);
603 /// assert_eq!(s.capacity(), Some(5));
604 ///
605 /// let (s, _) = bounded::<i32>(0);
606 /// assert_eq!(s.capacity(), Some(0));
607 /// ```
608 pub fn capacity(&self) -> Option<usize> {
609 match &self.flavor {
610 SenderFlavor::Array(chan) => chan.capacity(),
611 SenderFlavor::List(chan) => chan.capacity(),
612 SenderFlavor::Zero(chan) => chan.capacity(),
613 }
614 }
615
616 /// Returns `true` if senders belong to the same channel.
617 ///
618 /// # Examples
619 ///
620 /// ```rust
621 /// use crossbeam_channel::unbounded;
622 ///
623 /// let (s, _) = unbounded::<usize>();
624 ///
625 /// let s2 = s.clone();
626 /// assert!(s.same_channel(&s2));
627 ///
628 /// let (s3, _) = unbounded();
629 /// assert!(!s.same_channel(&s3));
630 /// ```
631 pub fn same_channel(&self, other: &Sender<T>) -> bool {
632 match (&self.flavor, &other.flavor) {
633 (SenderFlavor::Array(ref a), SenderFlavor::Array(ref b)) => a == b,
634 (SenderFlavor::List(ref a), SenderFlavor::List(ref b)) => a == b,
635 (SenderFlavor::Zero(ref a), SenderFlavor::Zero(ref b)) => a == b,
636 _ => false,
637 }
638 }
639}
640
641impl<T> Drop for Sender<T> {
642 fn drop(&mut self) {
643 unsafe {
644 match &self.flavor {
645 SenderFlavor::Array(chan) => chan.release(|c| c.disconnect()),
646 SenderFlavor::List(chan) => chan.release(|c| c.disconnect_senders()),
647 SenderFlavor::Zero(chan) => chan.release(|c| c.disconnect()),
648 }
649 }
650 }
651}
652
653impl<T> Clone for Sender<T> {
654 fn clone(&self) -> Self {
655 let flavor = match &self.flavor {
656 SenderFlavor::Array(chan) => SenderFlavor::Array(chan.acquire()),
657 SenderFlavor::List(chan) => SenderFlavor::List(chan.acquire()),
658 SenderFlavor::Zero(chan) => SenderFlavor::Zero(chan.acquire()),
659 };
660
661 Sender { flavor }
662 }
663}
664
665impl<T> fmt::Debug for Sender<T> {
666 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
667 f.pad("Sender { .. }")
668 }
669}
670
671/// The receiving side of a channel.
672///
673/// # Examples
674///
675/// ```
676/// use std::thread;
677/// use std::time::Duration;
678/// use crossbeam_channel::unbounded;
679///
680/// let (s, r) = unbounded();
681///
682/// thread::spawn(move || {
683/// let _ = s.send(1);
684/// thread::sleep(Duration::from_secs(1));
685/// let _ = s.send(2);
686/// });
687///
688/// assert_eq!(r.recv(), Ok(1)); // Received immediately.
689/// assert_eq!(r.recv(), Ok(2)); // Received after 1 second.
690/// ```
pub struct Receiver<T> {
    /// The channel implementation this receiver is attached to; every method dispatches on it.
    flavor: ReceiverFlavor<T>,
}
694
/// Receiver flavors.
///
/// The first three variants mirror the sender flavors. The remaining three are receiver-only
/// channels that have no `Sender` counterpart in this file.
enum ReceiverFlavor<T> {
    /// Bounded channel based on a preallocated array.
    Array(counter::Receiver<flavors::array::Channel<T>>),

    /// Unbounded channel implemented as a linked list.
    List(counter::Receiver<flavors::list::Channel<T>>),

    /// Zero-capacity channel.
    Zero(counter::Receiver<flavors::zero::Channel<T>>),

    /// The at flavor, built by both `after` and `at`.
    ///
    /// The channel type is not generic over `T`; both constructors return `Receiver<Instant>`.
    At(Arc<flavors::at::Channel>),

    /// The tick flavor, built by `tick`.
    ///
    /// The channel type is not generic over `T`; `tick` returns `Receiver<Instant>`.
    Tick(Arc<flavors::tick::Channel>),

    /// The never flavor, built by `never`.
    Never(flavors::never::Channel<T>),
}
715
// A `Receiver<T>` may be moved to, and shared between, threads whenever the message type `T`
// can itself be sent across threads.
// NOTE(review): the soundness of these `unsafe impl`s rests on the flavor implementations
// in `flavors::*`, which are not visible from this file — confirm there.
unsafe impl<T: Send> Send for Receiver<T> {}
unsafe impl<T: Send> Sync for Receiver<T> {}

// Receivers are declared unwind-safe for every `T`, with no bound on the message type.
impl<T> UnwindSafe for Receiver<T> {}
impl<T> RefUnwindSafe for Receiver<T> {}
721
722impl<T> Receiver<T> {
723 /// Attempts to receive a message from the channel without blocking.
724 ///
725 /// This method will either receive a message from the channel immediately or return an error
726 /// if the channel is empty.
727 ///
728 /// If called on a zero-capacity channel, this method will receive a message only if there
729 /// happens to be a send operation on the other side of the channel at the same time.
730 ///
731 /// # Examples
732 ///
733 /// ```
734 /// use crossbeam_channel::{unbounded, TryRecvError};
735 ///
736 /// let (s, r) = unbounded();
737 /// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
738 ///
739 /// s.send(5).unwrap();
740 /// drop(s);
741 ///
742 /// assert_eq!(r.try_recv(), Ok(5));
743 /// assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected));
744 /// ```
745 pub fn try_recv(&self) -> Result<T, TryRecvError> {
746 match &self.flavor {
747 ReceiverFlavor::Array(chan) => chan.try_recv(),
748 ReceiverFlavor::List(chan) => chan.try_recv(),
749 ReceiverFlavor::Zero(chan) => chan.try_recv(),
750 ReceiverFlavor::At(chan) => {
751 let msg = chan.try_recv();
752 unsafe {
753 mem::transmute_copy::<Result<Instant, TryRecvError>, Result<T, TryRecvError>>(
754 &msg,
755 )
756 }
757 }
758 ReceiverFlavor::Tick(chan) => {
759 let msg = chan.try_recv();
760 unsafe {
761 mem::transmute_copy::<Result<Instant, TryRecvError>, Result<T, TryRecvError>>(
762 &msg,
763 )
764 }
765 }
766 ReceiverFlavor::Never(chan) => chan.try_recv(),
767 }
768 }
769
770 /// Blocks the current thread until a message is received or the channel is empty and
771 /// disconnected.
772 ///
773 /// If the channel is empty and not disconnected, this call will block until the receive
774 /// operation can proceed. If the channel is empty and becomes disconnected, this call will
775 /// wake up and return an error.
776 ///
777 /// If called on a zero-capacity channel, this method will wait for a send operation to appear
778 /// on the other side of the channel.
779 ///
780 /// # Examples
781 ///
782 /// ```
783 /// use std::thread;
784 /// use std::time::Duration;
785 /// use crossbeam_channel::{unbounded, RecvError};
786 ///
787 /// let (s, r) = unbounded();
788 ///
789 /// thread::spawn(move || {
790 /// thread::sleep(Duration::from_secs(1));
791 /// s.send(5).unwrap();
792 /// drop(s);
793 /// });
794 ///
795 /// assert_eq!(r.recv(), Ok(5));
796 /// assert_eq!(r.recv(), Err(RecvError));
797 /// ```
798 pub fn recv(&self) -> Result<T, RecvError> {
799 match &self.flavor {
800 ReceiverFlavor::Array(chan) => chan.recv(None),
801 ReceiverFlavor::List(chan) => chan.recv(None),
802 ReceiverFlavor::Zero(chan) => chan.recv(None),
803 ReceiverFlavor::At(chan) => {
804 let msg = chan.recv(None);
805 unsafe {
806 mem::transmute_copy::<
807 Result<Instant, RecvTimeoutError>,
808 Result<T, RecvTimeoutError>,
809 >(&msg)
810 }
811 }
812 ReceiverFlavor::Tick(chan) => {
813 let msg = chan.recv(None);
814 unsafe {
815 mem::transmute_copy::<
816 Result<Instant, RecvTimeoutError>,
817 Result<T, RecvTimeoutError>,
818 >(&msg)
819 }
820 }
821 ReceiverFlavor::Never(chan) => chan.recv(None),
822 }
823 .map_err(|_| RecvError)
824 }
825
826 /// Waits for a message to be received from the channel, but only for a limited time.
827 ///
828 /// If the channel is empty and not disconnected, this call will block until the receive
829 /// operation can proceed or the operation times out. If the channel is empty and becomes
830 /// disconnected, this call will wake up and return an error.
831 ///
832 /// If called on a zero-capacity channel, this method will wait for a send operation to appear
833 /// on the other side of the channel.
834 ///
835 /// # Examples
836 ///
837 /// ```
838 /// use std::thread;
839 /// use std::time::Duration;
840 /// use crossbeam_channel::{unbounded, RecvTimeoutError};
841 ///
842 /// let (s, r) = unbounded();
843 ///
844 /// thread::spawn(move || {
845 /// thread::sleep(Duration::from_secs(1));
846 /// s.send(5).unwrap();
847 /// drop(s);
848 /// });
849 ///
850 /// assert_eq!(
851 /// r.recv_timeout(Duration::from_millis(500)),
852 /// Err(RecvTimeoutError::Timeout),
853 /// );
854 /// assert_eq!(
855 /// r.recv_timeout(Duration::from_secs(1)),
856 /// Ok(5),
857 /// );
858 /// assert_eq!(
859 /// r.recv_timeout(Duration::from_secs(1)),
860 /// Err(RecvTimeoutError::Disconnected),
861 /// );
862 /// ```
863 pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
864 self.recv_deadline(Instant::now() + timeout)
865 }
866
867 /// Waits for a message to be received from the channel, but only before a given deadline.
868 ///
869 /// If the channel is empty and not disconnected, this call will block until the receive
870 /// operation can proceed or the operation times out. If the channel is empty and becomes
871 /// disconnected, this call will wake up and return an error.
872 ///
873 /// If called on a zero-capacity channel, this method will wait for a send operation to appear
874 /// on the other side of the channel.
875 ///
876 /// # Examples
877 ///
878 /// ```
879 /// use std::thread;
880 /// use std::time::{Instant, Duration};
881 /// use crossbeam_channel::{unbounded, RecvTimeoutError};
882 ///
883 /// let (s, r) = unbounded();
884 ///
885 /// thread::spawn(move || {
886 /// thread::sleep(Duration::from_secs(1));
887 /// s.send(5).unwrap();
888 /// drop(s);
889 /// });
890 ///
891 /// let now = Instant::now();
892 ///
893 /// assert_eq!(
894 /// r.recv_deadline(now + Duration::from_millis(500)),
895 /// Err(RecvTimeoutError::Timeout),
896 /// );
897 /// assert_eq!(
898 /// r.recv_deadline(now + Duration::from_millis(1500)),
899 /// Ok(5),
900 /// );
901 /// assert_eq!(
902 /// r.recv_deadline(now + Duration::from_secs(5)),
903 /// Err(RecvTimeoutError::Disconnected),
904 /// );
905 /// ```
906 pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
907 match &self.flavor {
908 ReceiverFlavor::Array(chan) => chan.recv(Some(deadline)),
909 ReceiverFlavor::List(chan) => chan.recv(Some(deadline)),
910 ReceiverFlavor::Zero(chan) => chan.recv(Some(deadline)),
911 ReceiverFlavor::At(chan) => {
912 let msg = chan.recv(Some(deadline));
913 unsafe {
914 mem::transmute_copy::<
915 Result<Instant, RecvTimeoutError>,
916 Result<T, RecvTimeoutError>,
917 >(&msg)
918 }
919 }
920 ReceiverFlavor::Tick(chan) => {
921 let msg = chan.recv(Some(deadline));
922 unsafe {
923 mem::transmute_copy::<
924 Result<Instant, RecvTimeoutError>,
925 Result<T, RecvTimeoutError>,
926 >(&msg)
927 }
928 }
929 ReceiverFlavor::Never(chan) => chan.recv(Some(deadline)),
930 }
931 }
932
933 /// Returns `true` if the channel is empty.
934 ///
935 /// Note: Zero-capacity channels are always empty.
936 ///
937 /// # Examples
938 ///
939 /// ```
940 /// use crossbeam_channel::unbounded;
941 ///
942 /// let (s, r) = unbounded();
943 ///
944 /// assert!(r.is_empty());
945 /// s.send(0).unwrap();
946 /// assert!(!r.is_empty());
947 /// ```
948 pub fn is_empty(&self) -> bool {
949 match &self.flavor {
950 ReceiverFlavor::Array(chan) => chan.is_empty(),
951 ReceiverFlavor::List(chan) => chan.is_empty(),
952 ReceiverFlavor::Zero(chan) => chan.is_empty(),
953 ReceiverFlavor::At(chan) => chan.is_empty(),
954 ReceiverFlavor::Tick(chan) => chan.is_empty(),
955 ReceiverFlavor::Never(chan) => chan.is_empty(),
956 }
957 }
958
959 /// Returns `true` if the channel is full.
960 ///
961 /// Note: Zero-capacity channels are always full.
962 ///
963 /// # Examples
964 ///
965 /// ```
966 /// use crossbeam_channel::bounded;
967 ///
968 /// let (s, r) = bounded(1);
969 ///
970 /// assert!(!r.is_full());
971 /// s.send(0).unwrap();
972 /// assert!(r.is_full());
973 /// ```
974 pub fn is_full(&self) -> bool {
975 match &self.flavor {
976 ReceiverFlavor::Array(chan) => chan.is_full(),
977 ReceiverFlavor::List(chan) => chan.is_full(),
978 ReceiverFlavor::Zero(chan) => chan.is_full(),
979 ReceiverFlavor::At(chan) => chan.is_full(),
980 ReceiverFlavor::Tick(chan) => chan.is_full(),
981 ReceiverFlavor::Never(chan) => chan.is_full(),
982 }
983 }
984
985 /// Returns the number of messages in the channel.
986 ///
987 /// # Examples
988 ///
989 /// ```
990 /// use crossbeam_channel::unbounded;
991 ///
992 /// let (s, r) = unbounded();
993 /// assert_eq!(r.len(), 0);
994 ///
995 /// s.send(1).unwrap();
996 /// s.send(2).unwrap();
997 /// assert_eq!(r.len(), 2);
998 /// ```
999 pub fn len(&self) -> usize {
1000 match &self.flavor {
1001 ReceiverFlavor::Array(chan) => chan.len(),
1002 ReceiverFlavor::List(chan) => chan.len(),
1003 ReceiverFlavor::Zero(chan) => chan.len(),
1004 ReceiverFlavor::At(chan) => chan.len(),
1005 ReceiverFlavor::Tick(chan) => chan.len(),
1006 ReceiverFlavor::Never(chan) => chan.len(),
1007 }
1008 }
1009
1010 /// If the channel is bounded, returns its capacity.
1011 ///
1012 /// # Examples
1013 ///
1014 /// ```
1015 /// use crossbeam_channel::{bounded, unbounded};
1016 ///
1017 /// let (_, r) = unbounded::<i32>();
1018 /// assert_eq!(r.capacity(), None);
1019 ///
1020 /// let (_, r) = bounded::<i32>(5);
1021 /// assert_eq!(r.capacity(), Some(5));
1022 ///
1023 /// let (_, r) = bounded::<i32>(0);
1024 /// assert_eq!(r.capacity(), Some(0));
1025 /// ```
1026 pub fn capacity(&self) -> Option<usize> {
1027 match &self.flavor {
1028 ReceiverFlavor::Array(chan) => chan.capacity(),
1029 ReceiverFlavor::List(chan) => chan.capacity(),
1030 ReceiverFlavor::Zero(chan) => chan.capacity(),
1031 ReceiverFlavor::At(chan) => chan.capacity(),
1032 ReceiverFlavor::Tick(chan) => chan.capacity(),
1033 ReceiverFlavor::Never(chan) => chan.capacity(),
1034 }
1035 }
1036
1037 /// A blocking iterator over messages in the channel.
1038 ///
1039 /// Each call to [`next`] blocks waiting for the next message and then returns it. However, if
1040 /// the channel becomes empty and disconnected, it returns [`None`] without blocking.
1041 ///
1042 /// [`next`]: Iterator::next
1043 ///
1044 /// # Examples
1045 ///
1046 /// ```
1047 /// use std::thread;
1048 /// use crossbeam_channel::unbounded;
1049 ///
1050 /// let (s, r) = unbounded();
1051 ///
1052 /// thread::spawn(move || {
1053 /// s.send(1).unwrap();
1054 /// s.send(2).unwrap();
1055 /// s.send(3).unwrap();
1056 /// drop(s); // Disconnect the channel.
1057 /// });
1058 ///
1059 /// // Collect all messages from the channel.
1060 /// // Note that the call to `collect` blocks until the sender is dropped.
1061 /// let v: Vec<_> = r.iter().collect();
1062 ///
1063 /// assert_eq!(v, [1, 2, 3]);
1064 /// ```
1065 pub fn iter(&self) -> Iter<'_, T> {
1066 Iter { receiver: self }
1067 }
1068
1069 /// A non-blocking iterator over messages in the channel.
1070 ///
1071 /// Each call to [`next`] returns a message if there is one ready to be received. The iterator
1072 /// never blocks waiting for the next message.
1073 ///
1074 /// [`next`]: Iterator::next
1075 ///
1076 /// # Examples
1077 ///
1078 /// ```
1079 /// use std::thread;
1080 /// use std::time::Duration;
1081 /// use crossbeam_channel::unbounded;
1082 ///
1083 /// let (s, r) = unbounded::<i32>();
1084 ///
1085 /// thread::spawn(move || {
1086 /// s.send(1).unwrap();
1087 /// thread::sleep(Duration::from_secs(1));
1088 /// s.send(2).unwrap();
1089 /// thread::sleep(Duration::from_secs(2));
1090 /// s.send(3).unwrap();
1091 /// });
1092 ///
1093 /// thread::sleep(Duration::from_secs(2));
1094 ///
1095 /// // Collect all messages from the channel without blocking.
1096 /// // The third message hasn't been sent yet so we'll collect only the first two.
1097 /// let v: Vec<_> = r.try_iter().collect();
1098 ///
1099 /// assert_eq!(v, [1, 2]);
1100 /// ```
1101 pub fn try_iter(&self) -> TryIter<'_, T> {
1102 TryIter { receiver: self }
1103 }
1104
1105 /// Returns `true` if receivers belong to the same channel.
1106 ///
1107 /// # Examples
1108 ///
1109 /// ```rust
1110 /// use crossbeam_channel::unbounded;
1111 ///
1112 /// let (_, r) = unbounded::<usize>();
1113 ///
1114 /// let r2 = r.clone();
1115 /// assert!(r.same_channel(&r2));
1116 ///
1117 /// let (_, r3) = unbounded();
1118 /// assert!(!r.same_channel(&r3));
1119 /// ```
1120 pub fn same_channel(&self, other: &Receiver<T>) -> bool {
1121 match (&self.flavor, &other.flavor) {
1122 (ReceiverFlavor::Array(a), ReceiverFlavor::Array(b)) => a == b,
1123 (ReceiverFlavor::List(a), ReceiverFlavor::List(b)) => a == b,
1124 (ReceiverFlavor::Zero(a), ReceiverFlavor::Zero(b)) => a == b,
1125 (ReceiverFlavor::At(a), ReceiverFlavor::At(b)) => Arc::ptr_eq(a, b),
1126 (ReceiverFlavor::Tick(a), ReceiverFlavor::Tick(b)) => Arc::ptr_eq(a, b),
1127 (ReceiverFlavor::Never(_), ReceiverFlavor::Never(_)) => true,
1128 _ => false,
1129 }
1130 }
1131}
1132
1133impl<T> Drop for Receiver<T> {
1134 fn drop(&mut self) {
1135 unsafe {
1136 match &self.flavor {
1137 ReceiverFlavor::Array(chan) => chan.release(|c| c.disconnect()),
1138 ReceiverFlavor::List(chan) => chan.release(|c| c.disconnect_receivers()),
1139 ReceiverFlavor::Zero(chan) => chan.release(|c| c.disconnect()),
1140 ReceiverFlavor::At(_) => {}
1141 ReceiverFlavor::Tick(_) => {}
1142 ReceiverFlavor::Never(_) => {}
1143 }
1144 }
1145 }
1146}
1147
1148impl<T> Clone for Receiver<T> {
1149 fn clone(&self) -> Self {
1150 let flavor = match &self.flavor {
1151 ReceiverFlavor::Array(chan) => ReceiverFlavor::Array(chan.acquire()),
1152 ReceiverFlavor::List(chan) => ReceiverFlavor::List(chan.acquire()),
1153 ReceiverFlavor::Zero(chan) => ReceiverFlavor::Zero(chan.acquire()),
1154 ReceiverFlavor::At(chan) => ReceiverFlavor::At(chan.clone()),
1155 ReceiverFlavor::Tick(chan) => ReceiverFlavor::Tick(chan.clone()),
1156 ReceiverFlavor::Never(_) => ReceiverFlavor::Never(flavors::never::Channel::new()),
1157 };
1158
1159 Receiver { flavor }
1160 }
1161}
1162
1163impl<T> fmt::Debug for Receiver<T> {
1164 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1165 f.pad("Receiver { .. }")
1166 }
1167}
1168
1169impl<'a, T> IntoIterator for &'a Receiver<T> {
1170 type Item = T;
1171 type IntoIter = Iter<'a, T>;
1172
1173 fn into_iter(self) -> Self::IntoIter {
1174 self.iter()
1175 }
1176}
1177
1178impl<T> IntoIterator for Receiver<T> {
1179 type Item = T;
1180 type IntoIter = IntoIter<T>;
1181
1182 fn into_iter(self) -> Self::IntoIter {
1183 IntoIter { receiver: self }
1184 }
1185}
1186
1187/// A blocking iterator over messages in a channel.
1188///
1189/// Each call to [`next`] blocks waiting for the next message and then returns it. However, if the
1190/// channel becomes empty and disconnected, it returns [`None`] without blocking.
1191///
1192/// [`next`]: Iterator::next
1193///
1194/// # Examples
1195///
1196/// ```
1197/// use std::thread;
1198/// use crossbeam_channel::unbounded;
1199///
1200/// let (s, r) = unbounded();
1201///
1202/// thread::spawn(move || {
1203/// s.send(1).unwrap();
1204/// s.send(2).unwrap();
1205/// s.send(3).unwrap();
1206/// drop(s); // Disconnect the channel.
1207/// });
1208///
1209/// // Collect all messages from the channel.
1210/// // Note that the call to `collect` blocks until the sender is dropped.
1211/// let v: Vec<_> = r.iter().collect();
1212///
1213/// assert_eq!(v, [1, 2, 3]);
1214/// ```
1215pub struct Iter<'a, T> {
1216 receiver: &'a Receiver<T>,
1217}
1218
1219impl<T> FusedIterator for Iter<'_, T> {}
1220
1221impl<T> Iterator for Iter<'_, T> {
1222 type Item = T;
1223
1224 fn next(&mut self) -> Option<Self::Item> {
1225 self.receiver.recv().ok()
1226 }
1227}
1228
1229impl<T> fmt::Debug for Iter<'_, T> {
1230 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1231 f.pad("Iter { .. }")
1232 }
1233}
1234
1235/// A non-blocking iterator over messages in a channel.
1236///
1237/// Each call to [`next`] returns a message if there is one ready to be received. The iterator
1238/// never blocks waiting for the next message.
1239///
1240/// [`next`]: Iterator::next
1241///
1242/// # Examples
1243///
1244/// ```
1245/// use std::thread;
1246/// use std::time::Duration;
1247/// use crossbeam_channel::unbounded;
1248///
1249/// let (s, r) = unbounded::<i32>();
1250///
1251/// thread::spawn(move || {
1252/// s.send(1).unwrap();
1253/// thread::sleep(Duration::from_secs(1));
1254/// s.send(2).unwrap();
1255/// thread::sleep(Duration::from_secs(2));
1256/// s.send(3).unwrap();
1257/// });
1258///
1259/// thread::sleep(Duration::from_secs(2));
1260///
1261/// // Collect all messages from the channel without blocking.
1262/// // The third message hasn't been sent yet so we'll collect only the first two.
1263/// let v: Vec<_> = r.try_iter().collect();
1264///
1265/// assert_eq!(v, [1, 2]);
1266/// ```
1267pub struct TryIter<'a, T> {
1268 receiver: &'a Receiver<T>,
1269}
1270
1271impl<T> Iterator for TryIter<'_, T> {
1272 type Item = T;
1273
1274 fn next(&mut self) -> Option<Self::Item> {
1275 self.receiver.try_recv().ok()
1276 }
1277}
1278
1279impl<T> fmt::Debug for TryIter<'_, T> {
1280 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1281 f.pad("TryIter { .. }")
1282 }
1283}
1284
1285/// A blocking iterator over messages in a channel.
1286///
1287/// Each call to [`next`] blocks waiting for the next message and then returns it. However, if the
1288/// channel becomes empty and disconnected, it returns [`None`] without blocking.
1289///
1290/// [`next`]: Iterator::next
1291///
1292/// # Examples
1293///
1294/// ```
1295/// use std::thread;
1296/// use crossbeam_channel::unbounded;
1297///
1298/// let (s, r) = unbounded();
1299///
1300/// thread::spawn(move || {
1301/// s.send(1).unwrap();
1302/// s.send(2).unwrap();
1303/// s.send(3).unwrap();
1304/// drop(s); // Disconnect the channel.
1305/// });
1306///
1307/// // Collect all messages from the channel.
1308/// // Note that the call to `collect` blocks until the sender is dropped.
1309/// let v: Vec<_> = r.into_iter().collect();
1310///
1311/// assert_eq!(v, [1, 2, 3]);
1312/// ```
1313pub struct IntoIter<T> {
1314 receiver: Receiver<T>,
1315}
1316
1317impl<T> FusedIterator for IntoIter<T> {}
1318
1319impl<T> Iterator for IntoIter<T> {
1320 type Item = T;
1321
1322 fn next(&mut self) -> Option<Self::Item> {
1323 self.receiver.recv().ok()
1324 }
1325}
1326
1327impl<T> fmt::Debug for IntoIter<T> {
1328 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1329 f.pad("IntoIter { .. }")
1330 }
1331}
1332
1333impl<T> SelectHandle for Sender<T> {
1334 fn try_select(&self, token: &mut Token) -> bool {
1335 match &self.flavor {
1336 SenderFlavor::Array(chan) => chan.sender().try_select(token),
1337 SenderFlavor::List(chan) => chan.sender().try_select(token),
1338 SenderFlavor::Zero(chan) => chan.sender().try_select(token),
1339 }
1340 }
1341
1342 fn deadline(&self) -> Option<Instant> {
1343 None
1344 }
1345
1346 fn register(&self, oper: Operation, cx: &Context) -> bool {
1347 match &self.flavor {
1348 SenderFlavor::Array(chan) => chan.sender().register(oper, cx),
1349 SenderFlavor::List(chan) => chan.sender().register(oper, cx),
1350 SenderFlavor::Zero(chan) => chan.sender().register(oper, cx),
1351 }
1352 }
1353
1354 fn unregister(&self, oper: Operation) {
1355 match &self.flavor {
1356 SenderFlavor::Array(chan) => chan.sender().unregister(oper),
1357 SenderFlavor::List(chan) => chan.sender().unregister(oper),
1358 SenderFlavor::Zero(chan) => chan.sender().unregister(oper),
1359 }
1360 }
1361
1362 fn accept(&self, token: &mut Token, cx: &Context) -> bool {
1363 match &self.flavor {
1364 SenderFlavor::Array(chan) => chan.sender().accept(token, cx),
1365 SenderFlavor::List(chan) => chan.sender().accept(token, cx),
1366 SenderFlavor::Zero(chan) => chan.sender().accept(token, cx),
1367 }
1368 }
1369
1370 fn is_ready(&self) -> bool {
1371 match &self.flavor {
1372 SenderFlavor::Array(chan) => chan.sender().is_ready(),
1373 SenderFlavor::List(chan) => chan.sender().is_ready(),
1374 SenderFlavor::Zero(chan) => chan.sender().is_ready(),
1375 }
1376 }
1377
1378 fn watch(&self, oper: Operation, cx: &Context) -> bool {
1379 match &self.flavor {
1380 SenderFlavor::Array(chan) => chan.sender().watch(oper, cx),
1381 SenderFlavor::List(chan) => chan.sender().watch(oper, cx),
1382 SenderFlavor::Zero(chan) => chan.sender().watch(oper, cx),
1383 }
1384 }
1385
1386 fn unwatch(&self, oper: Operation) {
1387 match &self.flavor {
1388 SenderFlavor::Array(chan) => chan.sender().unwatch(oper),
1389 SenderFlavor::List(chan) => chan.sender().unwatch(oper),
1390 SenderFlavor::Zero(chan) => chan.sender().unwatch(oper),
1391 }
1392 }
1393}
1394
1395impl<T> SelectHandle for Receiver<T> {
1396 fn try_select(&self, token: &mut Token) -> bool {
1397 match &self.flavor {
1398 ReceiverFlavor::Array(chan) => chan.receiver().try_select(token),
1399 ReceiverFlavor::List(chan) => chan.receiver().try_select(token),
1400 ReceiverFlavor::Zero(chan) => chan.receiver().try_select(token),
1401 ReceiverFlavor::At(chan) => chan.try_select(token),
1402 ReceiverFlavor::Tick(chan) => chan.try_select(token),
1403 ReceiverFlavor::Never(chan) => chan.try_select(token),
1404 }
1405 }
1406
1407 fn deadline(&self) -> Option<Instant> {
1408 match &self.flavor {
1409 ReceiverFlavor::Array(_) => None,
1410 ReceiverFlavor::List(_) => None,
1411 ReceiverFlavor::Zero(_) => None,
1412 ReceiverFlavor::At(chan) => chan.deadline(),
1413 ReceiverFlavor::Tick(chan) => chan.deadline(),
1414 ReceiverFlavor::Never(chan) => chan.deadline(),
1415 }
1416 }
1417
1418 fn register(&self, oper: Operation, cx: &Context) -> bool {
1419 match &self.flavor {
1420 ReceiverFlavor::Array(chan) => chan.receiver().register(oper, cx),
1421 ReceiverFlavor::List(chan) => chan.receiver().register(oper, cx),
1422 ReceiverFlavor::Zero(chan) => chan.receiver().register(oper, cx),
1423 ReceiverFlavor::At(chan) => chan.register(oper, cx),
1424 ReceiverFlavor::Tick(chan) => chan.register(oper, cx),
1425 ReceiverFlavor::Never(chan) => chan.register(oper, cx),
1426 }
1427 }
1428
1429 fn unregister(&self, oper: Operation) {
1430 match &self.flavor {
1431 ReceiverFlavor::Array(chan) => chan.receiver().unregister(oper),
1432 ReceiverFlavor::List(chan) => chan.receiver().unregister(oper),
1433 ReceiverFlavor::Zero(chan) => chan.receiver().unregister(oper),
1434 ReceiverFlavor::At(chan) => chan.unregister(oper),
1435 ReceiverFlavor::Tick(chan) => chan.unregister(oper),
1436 ReceiverFlavor::Never(chan) => chan.unregister(oper),
1437 }
1438 }
1439
1440 fn accept(&self, token: &mut Token, cx: &Context) -> bool {
1441 match &self.flavor {
1442 ReceiverFlavor::Array(chan) => chan.receiver().accept(token, cx),
1443 ReceiverFlavor::List(chan) => chan.receiver().accept(token, cx),
1444 ReceiverFlavor::Zero(chan) => chan.receiver().accept(token, cx),
1445 ReceiverFlavor::At(chan) => chan.accept(token, cx),
1446 ReceiverFlavor::Tick(chan) => chan.accept(token, cx),
1447 ReceiverFlavor::Never(chan) => chan.accept(token, cx),
1448 }
1449 }
1450
1451 fn is_ready(&self) -> bool {
1452 match &self.flavor {
1453 ReceiverFlavor::Array(chan) => chan.receiver().is_ready(),
1454 ReceiverFlavor::List(chan) => chan.receiver().is_ready(),
1455 ReceiverFlavor::Zero(chan) => chan.receiver().is_ready(),
1456 ReceiverFlavor::At(chan) => chan.is_ready(),
1457 ReceiverFlavor::Tick(chan) => chan.is_ready(),
1458 ReceiverFlavor::Never(chan) => chan.is_ready(),
1459 }
1460 }
1461
1462 fn watch(&self, oper: Operation, cx: &Context) -> bool {
1463 match &self.flavor {
1464 ReceiverFlavor::Array(chan) => chan.receiver().watch(oper, cx),
1465 ReceiverFlavor::List(chan) => chan.receiver().watch(oper, cx),
1466 ReceiverFlavor::Zero(chan) => chan.receiver().watch(oper, cx),
1467 ReceiverFlavor::At(chan) => chan.watch(oper, cx),
1468 ReceiverFlavor::Tick(chan) => chan.watch(oper, cx),
1469 ReceiverFlavor::Never(chan) => chan.watch(oper, cx),
1470 }
1471 }
1472
1473 fn unwatch(&self, oper: Operation) {
1474 match &self.flavor {
1475 ReceiverFlavor::Array(chan) => chan.receiver().unwatch(oper),
1476 ReceiverFlavor::List(chan) => chan.receiver().unwatch(oper),
1477 ReceiverFlavor::Zero(chan) => chan.receiver().unwatch(oper),
1478 ReceiverFlavor::At(chan) => chan.unwatch(oper),
1479 ReceiverFlavor::Tick(chan) => chan.unwatch(oper),
1480 ReceiverFlavor::Never(chan) => chan.unwatch(oper),
1481 }
1482 }
1483}
1484
1485/// Writes a message into the channel.
1486pub(crate) unsafe fn write<T>(s: &Sender<T>, token: &mut Token, msg: T) -> Result<(), T> {
1487 match &s.flavor {
1488 SenderFlavor::Array(chan) => chan.write(token, msg),
1489 SenderFlavor::List(chan) => chan.write(token, msg),
1490 SenderFlavor::Zero(chan) => chan.write(token, msg),
1491 }
1492}
1493
1494/// Reads a message from the channel.
1495pub(crate) unsafe fn read<T>(r: &Receiver<T>, token: &mut Token) -> Result<T, ()> {
1496 match &r.flavor {
1497 ReceiverFlavor::Array(chan) => chan.read(token),
1498 ReceiverFlavor::List(chan) => chan.read(token),
1499 ReceiverFlavor::Zero(chan) => chan.read(token),
1500 ReceiverFlavor::At(chan) => {
1501 mem::transmute_copy::<Result<Instant, ()>, Result<T, ()>>(&chan.read(token))
1502 }
1503 ReceiverFlavor::Tick(chan) => {
1504 mem::transmute_copy::<Result<Instant, ()>, Result<T, ()>>(&chan.read(token))
1505 }
1506 ReceiverFlavor::Never(chan) => chan.read(token),
1507 }
1508}