crossbeam_channel/
select.rs

1//! Interface to the select mechanism.
2
3use std::fmt;
4use std::marker::PhantomData;
5use std::mem;
6use std::time::{Duration, Instant};
7
8use crossbeam_utils::Backoff;
9
10use crate::channel::{self, Receiver, Sender};
11use crate::context::Context;
12use crate::err::{ReadyTimeoutError, TryReadyError};
13use crate::err::{RecvError, SendError};
14use crate::err::{SelectTimeoutError, TrySelectError};
15use crate::flavors;
16use crate::utils;
17
18/// Temporary data that gets initialized during select or a blocking operation, and is consumed by
19/// `read` or `write`.
20///
21/// Each field contains data associated with a specific channel flavor.
22#[derive(Debug, Default)]
23pub struct Token {
24    pub at: flavors::at::AtToken,
25    pub array: flavors::array::ArrayToken,
26    pub list: flavors::list::ListToken,
27    pub never: flavors::never::NeverToken,
28    pub tick: flavors::tick::TickToken,
29    pub zero: flavors::zero::ZeroToken,
30}
31
32/// Identifier associated with an operation by a specific thread on a specific channel.
33#[derive(Debug, Clone, Copy, PartialEq, Eq)]
34pub struct Operation(usize);
35
36impl Operation {
37    /// Creates an operation identifier from a mutable reference.
38    ///
39    /// This function essentially just turns the address of the reference into a number. The
40    /// reference should point to a variable that is specific to the thread and the operation,
41    /// and is alive for the entire duration of select or blocking operation.
42    #[inline]
43    pub fn hook<T>(r: &mut T) -> Operation {
44        let val = r as *mut T as usize;
45        // Make sure that the pointer address doesn't equal the numerical representation of
46        // `Selected::{Waiting, Aborted, Disconnected}`.
47        assert!(val > 2);
48        Operation(val)
49    }
50}
51
52/// Current state of a select or a blocking operation.
53#[derive(Debug, Clone, Copy, PartialEq, Eq)]
54pub enum Selected {
55    /// Still waiting for an operation.
56    Waiting,
57
58    /// The attempt to block the current thread has been aborted.
59    Aborted,
60
61    /// An operation became ready because a channel is disconnected.
62    Disconnected,
63
64    /// An operation became ready because a message can be sent or received.
65    Operation(Operation),
66}
67
68impl From<usize> for Selected {
69    #[inline]
70    fn from(val: usize) -> Selected {
71        match val {
72            0 => Selected::Waiting,
73            1 => Selected::Aborted,
74            2 => Selected::Disconnected,
75            oper => Selected::Operation(Operation(oper)),
76        }
77    }
78}
79
80impl Into<usize> for Selected {
81    #[inline]
82    fn into(self) -> usize {
83        match self {
84            Selected::Waiting => 0,
85            Selected::Aborted => 1,
86            Selected::Disconnected => 2,
87            Selected::Operation(Operation(val)) => val,
88        }
89    }
90}
91
92/// A receiver or a sender that can participate in select.
93///
94/// This is a handle that assists select in executing an operation, registration, deciding on the
95/// appropriate deadline for blocking, etc.
96pub trait SelectHandle {
97    /// Attempts to select an operation and returns `true` on success.
98    fn try_select(&self, token: &mut Token) -> bool;
99
100    /// Returns a deadline for an operation, if there is one.
101    fn deadline(&self) -> Option<Instant>;
102
103    /// Registers an operation for execution and returns `true` if it is now ready.
104    fn register(&self, oper: Operation, cx: &Context) -> bool;
105
106    /// Unregisters an operation for execution.
107    fn unregister(&self, oper: Operation);
108
109    /// Attempts to select an operation the thread got woken up for and returns `true` on success.
110    fn accept(&self, token: &mut Token, cx: &Context) -> bool;
111
112    /// Returns `true` if an operation can be executed without blocking.
113    fn is_ready(&self) -> bool;
114
115    /// Registers an operation for readiness notification and returns `true` if it is now ready.
116    fn watch(&self, oper: Operation, cx: &Context) -> bool;
117
118    /// Unregisters an operation for readiness notification.
119    fn unwatch(&self, oper: Operation);
120}
121
122impl<T: SelectHandle> SelectHandle for &T {
123    fn try_select(&self, token: &mut Token) -> bool {
124        (**self).try_select(token)
125    }
126
127    fn deadline(&self) -> Option<Instant> {
128        (**self).deadline()
129    }
130
131    fn register(&self, oper: Operation, cx: &Context) -> bool {
132        (**self).register(oper, cx)
133    }
134
135    fn unregister(&self, oper: Operation) {
136        (**self).unregister(oper);
137    }
138
139    fn accept(&self, token: &mut Token, cx: &Context) -> bool {
140        (**self).accept(token, cx)
141    }
142
143    fn is_ready(&self) -> bool {
144        (**self).is_ready()
145    }
146
147    fn watch(&self, oper: Operation, cx: &Context) -> bool {
148        (**self).watch(oper, cx)
149    }
150
151    fn unwatch(&self, oper: Operation) {
152        (**self).unwatch(oper)
153    }
154}
155
156/// Determines when a select operation should time out.
157#[derive(Clone, Copy, Eq, PartialEq)]
158enum Timeout {
159    /// No blocking.
160    Now,
161
162    /// Block forever.
163    Never,
164
165    /// Time out after the time instant.
166    At(Instant),
167}
168
169/// Runs until one of the operations is selected, potentially blocking the current thread.
170///
171/// Successful receive operations will have to be followed up by `channel::read()` and successful
172/// send operations by `channel::write()`.
173fn run_select(
174    handles: &mut [(&dyn SelectHandle, usize, *const u8)],
175    timeout: Timeout,
176) -> Option<(Token, usize, *const u8)> {
177    if handles.is_empty() {
178        // Wait until the timeout and return.
179        match timeout {
180            Timeout::Now => return None,
181            Timeout::Never => {
182                utils::sleep_until(None);
183                unreachable!();
184            }
185            Timeout::At(when) => {
186                utils::sleep_until(Some(when));
187                return None;
188            }
189        }
190    }
191
192    // Shuffle the operations for fairness.
193    utils::shuffle(handles);
194
195    // Create a token, which serves as a temporary variable that gets initialized in this function
196    // and is later used by a call to `channel::read()` or `channel::write()` that completes the
197    // selected operation.
198    let mut token = Token::default();
199
200    // Try selecting one of the operations without blocking.
201    for &(handle, i, ptr) in handles.iter() {
202        if handle.try_select(&mut token) {
203            return Some((token, i, ptr));
204        }
205    }
206
207    loop {
208        // Prepare for blocking.
209        let res = Context::with(|cx| {
210            let mut sel = Selected::Waiting;
211            let mut registered_count = 0;
212            let mut index_ready = None;
213
214            if let Timeout::Now = timeout {
215                cx.try_select(Selected::Aborted).unwrap();
216            }
217
218            // Register all operations.
219            for (handle, i, _) in handles.iter_mut() {
220                registered_count += 1;
221
222                // If registration returns `false`, that means the operation has just become ready.
223                if handle.register(Operation::hook::<&dyn SelectHandle>(handle), cx) {
224                    // Try aborting select.
225                    sel = match cx.try_select(Selected::Aborted) {
226                        Ok(()) => {
227                            index_ready = Some(*i);
228                            Selected::Aborted
229                        }
230                        Err(s) => s,
231                    };
232                    break;
233                }
234
235                // If another thread has already selected one of the operations, stop registration.
236                sel = cx.selected();
237                if sel != Selected::Waiting {
238                    break;
239                }
240            }
241
242            if sel == Selected::Waiting {
243                // Check with each operation for how long we're allowed to block, and compute the
244                // earliest deadline.
245                let mut deadline: Option<Instant> = match timeout {
246                    Timeout::Now => return None,
247                    Timeout::Never => None,
248                    Timeout::At(when) => Some(when),
249                };
250                for &(handle, _, _) in handles.iter() {
251                    if let Some(x) = handle.deadline() {
252                        deadline = deadline.map(|y| x.min(y)).or(Some(x));
253                    }
254                }
255
256                // Block the current thread.
257                sel = cx.wait_until(deadline);
258            }
259
260            // Unregister all registered operations.
261            for (handle, _, _) in handles.iter_mut().take(registered_count) {
262                handle.unregister(Operation::hook::<&dyn SelectHandle>(handle));
263            }
264
265            match sel {
266                Selected::Waiting => unreachable!(),
267                Selected::Aborted => {
268                    // If an operation became ready during registration, try selecting it.
269                    if let Some(index_ready) = index_ready {
270                        for &(handle, i, ptr) in handles.iter() {
271                            if i == index_ready && handle.try_select(&mut token) {
272                                return Some((i, ptr));
273                            }
274                        }
275                    }
276                }
277                Selected::Disconnected => {}
278                Selected::Operation(_) => {
279                    // Find the selected operation.
280                    for (handle, i, ptr) in handles.iter_mut() {
281                        // Is this the selected operation?
282                        if sel == Selected::Operation(Operation::hook::<&dyn SelectHandle>(handle))
283                        {
284                            // Try selecting this operation.
285                            if handle.accept(&mut token, cx) {
286                                return Some((*i, *ptr));
287                            }
288                        }
289                    }
290                }
291            }
292
293            None
294        });
295
296        // Return if an operation was selected.
297        if let Some((i, ptr)) = res {
298            return Some((token, i, ptr));
299        }
300
301        // Try selecting one of the operations without blocking.
302        for &(handle, i, ptr) in handles.iter() {
303            if handle.try_select(&mut token) {
304                return Some((token, i, ptr));
305            }
306        }
307
308        match timeout {
309            Timeout::Now => return None,
310            Timeout::Never => {}
311            Timeout::At(when) => {
312                if Instant::now() >= when {
313                    return None;
314                }
315            }
316        }
317    }
318}
319
320/// Runs until one of the operations becomes ready, potentially blocking the current thread.
321fn run_ready(
322    handles: &mut [(&dyn SelectHandle, usize, *const u8)],
323    timeout: Timeout,
324) -> Option<usize> {
325    if handles.is_empty() {
326        // Wait until the timeout and return.
327        match timeout {
328            Timeout::Now => return None,
329            Timeout::Never => {
330                utils::sleep_until(None);
331                unreachable!();
332            }
333            Timeout::At(when) => {
334                utils::sleep_until(Some(when));
335                return None;
336            }
337        }
338    }
339
340    // Shuffle the operations for fairness.
341    utils::shuffle(handles);
342
343    loop {
344        let backoff = Backoff::new();
345        loop {
346            // Check operations for readiness.
347            for &(handle, i, _) in handles.iter() {
348                if handle.is_ready() {
349                    return Some(i);
350                }
351            }
352
353            if backoff.is_completed() {
354                break;
355            } else {
356                backoff.snooze();
357            }
358        }
359
360        // Check for timeout.
361        match timeout {
362            Timeout::Now => return None,
363            Timeout::Never => {}
364            Timeout::At(when) => {
365                if Instant::now() >= when {
366                    return None;
367                }
368            }
369        }
370
371        // Prepare for blocking.
372        let res = Context::with(|cx| {
373            let mut sel = Selected::Waiting;
374            let mut registered_count = 0;
375
376            // Begin watching all operations.
377            for (handle, _, _) in handles.iter_mut() {
378                registered_count += 1;
379                let oper = Operation::hook::<&dyn SelectHandle>(handle);
380
381                // If registration returns `false`, that means the operation has just become ready.
382                if handle.watch(oper, cx) {
383                    sel = match cx.try_select(Selected::Operation(oper)) {
384                        Ok(()) => Selected::Operation(oper),
385                        Err(s) => s,
386                    };
387                    break;
388                }
389
390                // If another thread has already chosen one of the operations, stop registration.
391                sel = cx.selected();
392                if sel != Selected::Waiting {
393                    break;
394                }
395            }
396
397            if sel == Selected::Waiting {
398                // Check with each operation for how long we're allowed to block, and compute the
399                // earliest deadline.
400                let mut deadline: Option<Instant> = match timeout {
401                    Timeout::Now => unreachable!(),
402                    Timeout::Never => None,
403                    Timeout::At(when) => Some(when),
404                };
405                for &(handle, _, _) in handles.iter() {
406                    if let Some(x) = handle.deadline() {
407                        deadline = deadline.map(|y| x.min(y)).or(Some(x));
408                    }
409                }
410
411                // Block the current thread.
412                sel = cx.wait_until(deadline);
413            }
414
415            // Unwatch all operations.
416            for (handle, _, _) in handles.iter_mut().take(registered_count) {
417                handle.unwatch(Operation::hook::<&dyn SelectHandle>(handle));
418            }
419
420            match sel {
421                Selected::Waiting => unreachable!(),
422                Selected::Aborted => {}
423                Selected::Disconnected => {}
424                Selected::Operation(_) => {
425                    for (handle, i, _) in handles.iter_mut() {
426                        let oper = Operation::hook::<&dyn SelectHandle>(handle);
427                        if sel == Selected::Operation(oper) {
428                            return Some(*i);
429                        }
430                    }
431                }
432            }
433
434            None
435        });
436
437        // Return if an operation became ready.
438        if res.is_some() {
439            return res;
440        }
441    }
442}
443
444/// Attempts to select one of the operations without blocking.
445#[inline]
446pub fn try_select<'a>(
447    handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
448) -> Result<SelectedOperation<'a>, TrySelectError> {
449    match run_select(handles, Timeout::Now) {
450        None => Err(TrySelectError),
451        Some((token, index, ptr)) => Ok(SelectedOperation {
452            token,
453            index,
454            ptr,
455            _marker: PhantomData,
456        }),
457    }
458}
459
460/// Blocks until one of the operations becomes ready and selects it.
461#[inline]
462pub fn select<'a>(
463    handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
464) -> SelectedOperation<'a> {
465    if handles.is_empty() {
466        panic!("no operations have been added to `Select`");
467    }
468
469    let (token, index, ptr) = run_select(handles, Timeout::Never).unwrap();
470    SelectedOperation {
471        token,
472        index,
473        ptr,
474        _marker: PhantomData,
475    }
476}
477
478/// Blocks for a limited time until one of the operations becomes ready and selects it.
479#[inline]
480pub fn select_timeout<'a>(
481    handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
482    timeout: Duration,
483) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
484    select_deadline(handles, Instant::now() + timeout)
485}
486
487/// Blocks until a given deadline, or until one of the operations becomes ready and selects it.
488#[inline]
489pub(crate) fn select_deadline<'a>(
490    handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
491    deadline: Instant,
492) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
493    match run_select(handles, Timeout::At(deadline)) {
494        None => Err(SelectTimeoutError),
495        Some((token, index, ptr)) => Ok(SelectedOperation {
496            token,
497            index,
498            ptr,
499            _marker: PhantomData,
500        }),
501    }
502}
503
504/// Selects from a set of channel operations.
505///
506/// `Select` allows you to define a set of channel operations, wait until any one of them becomes
507/// ready, and finally execute it. If multiple operations are ready at the same time, a random one
508/// among them is selected.
509///
510/// An operation is considered to be ready if it doesn't have to block. Note that it is ready even
511/// when it will simply return an error because the channel is disconnected.
512///
513/// The [`select!`] macro is a convenience wrapper around `Select`. However, it cannot select over a
514/// dynamically created list of channel operations.
515///
516/// Once a list of operations has been built with `Select`, there are two different ways of
517/// proceeding:
518///
519/// * Select an operation with [`try_select`], [`select`], or [`select_timeout`]. If successful,
520///   the returned selected operation has already begun and **must** be completed. If we don't
521///   complete it, a panic will occur.
522///
523/// * Wait for an operation to become ready with [`try_ready`], [`ready`], or [`ready_timeout`]. If
524///   successful, we may attempt to execute the operation, but are not obliged to. In fact, it's
525///   possible for another thread to make the operation not ready just before we try executing it,
526///   so it's wise to use a retry loop. However, note that these methods might return with success
527///   spuriously, so it's a good idea to always double check if the operation is really ready.
528///
529/// # Examples
530///
531/// Use [`select`] to receive a message from a list of receivers:
532///
533/// ```
534/// use crossbeam_channel::{Receiver, RecvError, Select};
535///
536/// fn recv_multiple<T>(rs: &[Receiver<T>]) -> Result<T, RecvError> {
537///     // Build a list of operations.
538///     let mut sel = Select::new();
539///     for r in rs {
540///         sel.recv(r);
541///     }
542///
543///     // Complete the selected operation.
544///     let oper = sel.select();
545///     let index = oper.index();
546///     oper.recv(&rs[index])
547/// }
548/// ```
549///
550/// Use [`ready`] to receive a message from a list of receivers:
551///
552/// ```
553/// use crossbeam_channel::{Receiver, RecvError, Select};
554///
555/// fn recv_multiple<T>(rs: &[Receiver<T>]) -> Result<T, RecvError> {
556///     // Build a list of operations.
557///     let mut sel = Select::new();
558///     for r in rs {
559///         sel.recv(r);
560///     }
561///
562///     loop {
563///         // Wait until a receive operation becomes ready and try executing it.
564///         let index = sel.ready();
565///         let res = rs[index].try_recv();
566///
567///         // If the operation turns out not to be ready, retry.
568///         if let Err(e) = res {
569///             if e.is_empty() {
570///                 continue;
571///             }
572///         }
573///
574///         // Success!
575///         return res.map_err(|_| RecvError);
576///     }
577/// }
578/// ```
579///
580/// [`try_select`]: Select::try_select
581/// [`select`]: Select::select
582/// [`select_timeout`]: Select::select_timeout
583/// [`try_ready`]: Select::try_ready
584/// [`ready`]: Select::ready
585/// [`ready_timeout`]: Select::ready_timeout
586pub struct Select<'a> {
587    /// A list of senders and receivers participating in selection.
588    handles: Vec<(&'a dyn SelectHandle, usize, *const u8)>,
589
590    /// The next index to assign to an operation.
591    next_index: usize,
592}
593
594unsafe impl Send for Select<'_> {}
595unsafe impl Sync for Select<'_> {}
596
597impl<'a> Select<'a> {
598    /// Creates an empty list of channel operations for selection.
599    ///
600    /// # Examples
601    ///
602    /// ```
603    /// use crossbeam_channel::Select;
604    ///
605    /// let mut sel = Select::new();
606    ///
607    /// // The list of operations is empty, which means no operation can be selected.
608    /// assert!(sel.try_select().is_err());
609    /// ```
610    pub fn new() -> Select<'a> {
611        Select {
612            handles: Vec::with_capacity(4),
613            next_index: 0,
614        }
615    }
616
617    /// Adds a send operation.
618    ///
619    /// Returns the index of the added operation.
620    ///
621    /// # Examples
622    ///
623    /// ```
624    /// use crossbeam_channel::{unbounded, Select};
625    ///
626    /// let (s, r) = unbounded::<i32>();
627    ///
628    /// let mut sel = Select::new();
629    /// let index = sel.send(&s);
630    /// ```
631    pub fn send<T>(&mut self, s: &'a Sender<T>) -> usize {
632        let i = self.next_index;
633        let ptr = s as *const Sender<_> as *const u8;
634        self.handles.push((s, i, ptr));
635        self.next_index += 1;
636        i
637    }
638
639    /// Adds a receive operation.
640    ///
641    /// Returns the index of the added operation.
642    ///
643    /// # Examples
644    ///
645    /// ```
646    /// use crossbeam_channel::{unbounded, Select};
647    ///
648    /// let (s, r) = unbounded::<i32>();
649    ///
650    /// let mut sel = Select::new();
651    /// let index = sel.recv(&r);
652    /// ```
653    pub fn recv<T>(&mut self, r: &'a Receiver<T>) -> usize {
654        let i = self.next_index;
655        let ptr = r as *const Receiver<_> as *const u8;
656        self.handles.push((r, i, ptr));
657        self.next_index += 1;
658        i
659    }
660
661    /// Removes a previously added operation.
662    ///
663    /// This is useful when an operation is selected because the channel got disconnected and we
664    /// want to try again to select a different operation instead.
665    ///
666    /// If new operations are added after removing some, the indices of removed operations will not
667    /// be reused.
668    ///
669    /// # Panics
670    ///
671    /// An attempt to remove a non-existing or already removed operation will panic.
672    ///
673    /// # Examples
674    ///
675    /// ```
676    /// use crossbeam_channel::{unbounded, Select};
677    ///
678    /// let (s1, r1) = unbounded::<i32>();
679    /// let (_, r2) = unbounded::<i32>();
680    ///
681    /// let mut sel = Select::new();
682    /// let oper1 = sel.recv(&r1);
683    /// let oper2 = sel.recv(&r2);
684    ///
685    /// // Both operations are initially ready, so a random one will be executed.
686    /// let oper = sel.select();
687    /// assert_eq!(oper.index(), oper2);
688    /// assert!(oper.recv(&r2).is_err());
689    /// sel.remove(oper2);
690    ///
691    /// s1.send(10).unwrap();
692    ///
693    /// let oper = sel.select();
694    /// assert_eq!(oper.index(), oper1);
695    /// assert_eq!(oper.recv(&r1), Ok(10));
696    /// ```
697    pub fn remove(&mut self, index: usize) {
698        assert!(
699            index < self.next_index,
700            "index out of bounds; {} >= {}",
701            index,
702            self.next_index,
703        );
704
705        let i = self
706            .handles
707            .iter()
708            .enumerate()
709            .find(|(_, (_, i, _))| *i == index)
710            .expect("no operation with this index")
711            .0;
712
713        self.handles.swap_remove(i);
714    }
715
716    /// Attempts to select one of the operations without blocking.
717    ///
718    /// If an operation is ready, it is selected and returned. If multiple operations are ready at
719    /// the same time, a random one among them is selected. If none of the operations are ready, an
720    /// error is returned.
721    ///
722    /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
723    /// even when it will simply return an error because the channel is disconnected.
724    ///
725    /// The selected operation must be completed with [`SelectedOperation::send`]
726    /// or [`SelectedOperation::recv`].
727    ///
728    /// # Examples
729    ///
730    /// ```
731    /// use crossbeam_channel::{unbounded, Select};
732    ///
733    /// let (s1, r1) = unbounded();
734    /// let (s2, r2) = unbounded();
735    ///
736    /// s1.send(10).unwrap();
737    /// s2.send(20).unwrap();
738    ///
739    /// let mut sel = Select::new();
740    /// let oper1 = sel.recv(&r1);
741    /// let oper2 = sel.recv(&r2);
742    ///
743    /// // Both operations are initially ready, so a random one will be executed.
744    /// let oper = sel.try_select();
745    /// match oper {
746    ///     Err(_) => panic!("both operations should be ready"),
747    ///     Ok(oper) => match oper.index() {
748    ///         i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
749    ///         i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
750    ///         _ => unreachable!(),
751    ///     }
752    /// }
753    /// ```
754    pub fn try_select(&mut self) -> Result<SelectedOperation<'a>, TrySelectError> {
755        try_select(&mut self.handles)
756    }
757
758    /// Blocks until one of the operations becomes ready and selects it.
759    ///
760    /// Once an operation becomes ready, it is selected and returned. If multiple operations are
761    /// ready at the same time, a random one among them is selected.
762    ///
763    /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
764    /// even when it will simply return an error because the channel is disconnected.
765    ///
766    /// The selected operation must be completed with [`SelectedOperation::send`]
767    /// or [`SelectedOperation::recv`].
768    ///
769    /// # Panics
770    ///
771    /// Panics if no operations have been added to `Select`.
772    ///
773    /// # Examples
774    ///
775    /// ```
776    /// use std::thread;
777    /// use std::time::Duration;
778    /// use crossbeam_channel::{unbounded, Select};
779    ///
780    /// let (s1, r1) = unbounded();
781    /// let (s2, r2) = unbounded();
782    ///
783    /// thread::spawn(move || {
784    ///     thread::sleep(Duration::from_secs(1));
785    ///     s1.send(10).unwrap();
786    /// });
787    /// thread::spawn(move || s2.send(20).unwrap());
788    ///
789    /// let mut sel = Select::new();
790    /// let oper1 = sel.recv(&r1);
791    /// let oper2 = sel.recv(&r2);
792    ///
793    /// // The second operation will be selected because it becomes ready first.
794    /// let oper = sel.select();
795    /// match oper.index() {
796    ///     i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
797    ///     i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
798    ///     _ => unreachable!(),
799    /// }
800    /// ```
801    pub fn select(&mut self) -> SelectedOperation<'a> {
802        select(&mut self.handles)
803    }
804
805    /// Blocks for a limited time until one of the operations becomes ready and selects it.
806    ///
807    /// If an operation becomes ready, it is selected and returned. If multiple operations are
808    /// ready at the same time, a random one among them is selected. If none of the operations
809    /// become ready for the specified duration, an error is returned.
810    ///
811    /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
812    /// even when it will simply return an error because the channel is disconnected.
813    ///
814    /// The selected operation must be completed with [`SelectedOperation::send`]
815    /// or [`SelectedOperation::recv`].
816    ///
817    /// # Examples
818    ///
819    /// ```
820    /// use std::thread;
821    /// use std::time::Duration;
822    /// use crossbeam_channel::{unbounded, Select};
823    ///
824    /// let (s1, r1) = unbounded();
825    /// let (s2, r2) = unbounded();
826    ///
827    /// thread::spawn(move || {
828    ///     thread::sleep(Duration::from_secs(1));
829    ///     s1.send(10).unwrap();
830    /// });
831    /// thread::spawn(move || s2.send(20).unwrap());
832    ///
833    /// let mut sel = Select::new();
834    /// let oper1 = sel.recv(&r1);
835    /// let oper2 = sel.recv(&r2);
836    ///
837    /// // The second operation will be selected because it becomes ready first.
838    /// let oper = sel.select_timeout(Duration::from_millis(500));
839    /// match oper {
840    ///     Err(_) => panic!("should not have timed out"),
841    ///     Ok(oper) => match oper.index() {
842    ///         i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
843    ///         i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
844    ///         _ => unreachable!(),
845    ///     }
846    /// }
847    /// ```
848    pub fn select_timeout(
849        &mut self,
850        timeout: Duration,
851    ) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
852        select_timeout(&mut self.handles, timeout)
853    }
854
855    /// Blocks until a given deadline, or until one of the operations becomes ready and selects it.
856    ///
857    /// If an operation becomes ready, it is selected and returned. If multiple operations are
858    /// ready at the same time, a random one among them is selected. If none of the operations
859    /// become ready before the given deadline, an error is returned.
860    ///
861    /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
862    /// even when it will simply return an error because the channel is disconnected.
863    ///
864    /// The selected operation must be completed with [`SelectedOperation::send`]
865    /// or [`SelectedOperation::recv`].
866    ///
867    /// # Examples
868    ///
869    /// ```
870    /// use std::thread;
871    /// use std::time::{Instant, Duration};
872    /// use crossbeam_channel::{unbounded, Select};
873    ///
874    /// let (s1, r1) = unbounded();
875    /// let (s2, r2) = unbounded();
876    ///
877    /// thread::spawn(move || {
878    ///     thread::sleep(Duration::from_secs(1));
879    ///     s1.send(10).unwrap();
880    /// });
881    /// thread::spawn(move || s2.send(20).unwrap());
882    ///
883    /// let mut sel = Select::new();
884    /// let oper1 = sel.recv(&r1);
885    /// let oper2 = sel.recv(&r2);
886    ///
887    /// let deadline = Instant::now() + Duration::from_millis(500);
888    ///
889    /// // The second operation will be selected because it becomes ready first.
890    /// let oper = sel.select_deadline(deadline);
891    /// match oper {
892    ///     Err(_) => panic!("should not have timed out"),
893    ///     Ok(oper) => match oper.index() {
894    ///         i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
895    ///         i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
896    ///         _ => unreachable!(),
897    ///     }
898    /// }
899    /// ```
900    pub fn select_deadline(
901        &mut self,
902        deadline: Instant,
903    ) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
904        select_deadline(&mut self.handles, deadline)
905    }
906
907    /// Attempts to find a ready operation without blocking.
908    ///
909    /// If an operation is ready, its index is returned. If multiple operations are ready at the
910    /// same time, a random one among them is chosen. If none of the operations are ready, an error
911    /// is returned.
912    ///
913    /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
914    /// even when it will simply return an error because the channel is disconnected.
915    ///
916    /// Note that this method might return with success spuriously, so it's a good idea to always
917    /// double check if the operation is really ready.
918    ///
919    /// # Examples
920    ///
921    /// ```
922    /// use crossbeam_channel::{unbounded, Select};
923    ///
924    /// let (s1, r1) = unbounded();
925    /// let (s2, r2) = unbounded();
926    ///
927    /// s1.send(10).unwrap();
928    /// s2.send(20).unwrap();
929    ///
930    /// let mut sel = Select::new();
931    /// let oper1 = sel.recv(&r1);
932    /// let oper2 = sel.recv(&r2);
933    ///
934    /// // Both operations are initially ready, so a random one will be chosen.
935    /// match sel.try_ready() {
936    ///     Err(_) => panic!("both operations should be ready"),
937    ///     Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
938    ///     Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
939    ///     Ok(_) => unreachable!(),
940    /// }
941    /// ```
942    pub fn try_ready(&mut self) -> Result<usize, TryReadyError> {
943        match run_ready(&mut self.handles, Timeout::Now) {
944            None => Err(TryReadyError),
945            Some(index) => Ok(index),
946        }
947    }
948
949    /// Blocks until one of the operations becomes ready.
950    ///
951    /// Once an operation becomes ready, its index is returned. If multiple operations are ready at
952    /// the same time, a random one among them is chosen.
953    ///
954    /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
955    /// even when it will simply return an error because the channel is disconnected.
956    ///
957    /// Note that this method might return with success spuriously, so it's a good idea to always
958    /// double check if the operation is really ready.
959    ///
960    /// # Panics
961    ///
962    /// Panics if no operations have been added to `Select`.
963    ///
964    /// # Examples
965    ///
966    /// ```
967    /// use std::thread;
968    /// use std::time::Duration;
969    /// use crossbeam_channel::{unbounded, Select};
970    ///
971    /// let (s1, r1) = unbounded();
972    /// let (s2, r2) = unbounded();
973    ///
974    /// thread::spawn(move || {
975    ///     thread::sleep(Duration::from_secs(1));
976    ///     s1.send(10).unwrap();
977    /// });
978    /// thread::spawn(move || s2.send(20).unwrap());
979    ///
980    /// let mut sel = Select::new();
981    /// let oper1 = sel.recv(&r1);
982    /// let oper2 = sel.recv(&r2);
983    ///
984    /// // The second operation will be selected because it becomes ready first.
985    /// match sel.ready() {
986    ///     i if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
987    ///     i if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
988    ///     _ => unreachable!(),
989    /// }
990    /// ```
991    pub fn ready(&mut self) -> usize {
992        if self.handles.is_empty() {
993            panic!("no operations have been added to `Select`");
994        }
995
996        run_ready(&mut self.handles, Timeout::Never).unwrap()
997    }
998
999    /// Blocks for a limited time until one of the operations becomes ready.
1000    ///
1001    /// If an operation becomes ready, its index is returned. If multiple operations are ready at
1002    /// the same time, a random one among them is chosen. If none of the operations become ready
1003    /// for the specified duration, an error is returned.
1004    ///
1005    /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
1006    /// even when it will simply return an error because the channel is disconnected.
1007    ///
1008    /// Note that this method might return with success spuriously, so it's a good idea to double
1009    /// check if the operation is really ready.
1010    ///
1011    /// # Examples
1012    ///
1013    /// ```
1014    /// use std::thread;
1015    /// use std::time::Duration;
1016    /// use crossbeam_channel::{unbounded, Select};
1017    ///
1018    /// let (s1, r1) = unbounded();
1019    /// let (s2, r2) = unbounded();
1020    ///
1021    /// thread::spawn(move || {
1022    ///     thread::sleep(Duration::from_secs(1));
1023    ///     s1.send(10).unwrap();
1024    /// });
1025    /// thread::spawn(move || s2.send(20).unwrap());
1026    ///
1027    /// let mut sel = Select::new();
1028    /// let oper1 = sel.recv(&r1);
1029    /// let oper2 = sel.recv(&r2);
1030    ///
1031    /// // The second operation will be selected because it becomes ready first.
1032    /// match sel.ready_timeout(Duration::from_millis(500)) {
1033    ///     Err(_) => panic!("should not have timed out"),
1034    ///     Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
1035    ///     Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
1036    ///     Ok(_) => unreachable!(),
1037    /// }
1038    /// ```
1039    pub fn ready_timeout(&mut self, timeout: Duration) -> Result<usize, ReadyTimeoutError> {
1040        self.ready_deadline(Instant::now() + timeout)
1041    }
1042
1043    /// Blocks until a given deadline, or until one of the operations becomes ready.
1044    ///
1045    /// If an operation becomes ready, its index is returned. If multiple operations are ready at
1046    /// the same time, a random one among them is chosen. If none of the operations become ready
1047    /// before the deadline, an error is returned.
1048    ///
1049    /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
1050    /// even when it will simply return an error because the channel is disconnected.
1051    ///
1052    /// Note that this method might return with success spuriously, so it's a good idea to double
1053    /// check if the operation is really ready.
1054    ///
1055    /// # Examples
1056    ///
1057    /// ```
1058    /// use std::thread;
1059    /// use std::time::{Duration, Instant};
1060    /// use crossbeam_channel::{unbounded, Select};
1061    ///
1062    /// let deadline = Instant::now() + Duration::from_millis(500);
1063    ///
1064    /// let (s1, r1) = unbounded();
1065    /// let (s2, r2) = unbounded();
1066    ///
1067    /// thread::spawn(move || {
1068    ///     thread::sleep(Duration::from_secs(1));
1069    ///     s1.send(10).unwrap();
1070    /// });
1071    /// thread::spawn(move || s2.send(20).unwrap());
1072    ///
1073    /// let mut sel = Select::new();
1074    /// let oper1 = sel.recv(&r1);
1075    /// let oper2 = sel.recv(&r2);
1076    ///
1077    /// // The second operation will be selected because it becomes ready first.
1078    /// match sel.ready_deadline(deadline) {
1079    ///     Err(_) => panic!("should not have timed out"),
1080    ///     Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
1081    ///     Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
1082    ///     Ok(_) => unreachable!(),
1083    /// }
1084    /// ```
1085    pub fn ready_deadline(&mut self, deadline: Instant) -> Result<usize, ReadyTimeoutError> {
1086        match run_ready(&mut self.handles, Timeout::At(deadline)) {
1087            None => Err(ReadyTimeoutError),
1088            Some(index) => Ok(index),
1089        }
1090    }
1091}
1092
1093impl<'a> Clone for Select<'a> {
1094    fn clone(&self) -> Select<'a> {
1095        Select {
1096            handles: self.handles.clone(),
1097            next_index: self.next_index,
1098        }
1099    }
1100}
1101
1102impl<'a> Default for Select<'a> {
1103    fn default() -> Select<'a> {
1104        Select::new()
1105    }
1106}
1107
1108impl fmt::Debug for Select<'_> {
1109    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1110        f.pad("Select { .. }")
1111    }
1112}
1113
1114/// A selected operation that needs to be completed.
1115///
1116/// To complete the operation, call [`send`] or [`recv`].
1117///
1118/// # Panics
1119///
1120/// Forgetting to complete the operation is an error and might lead to deadlocks. If a
1121/// `SelectedOperation` is dropped without completion, a panic occurs.
1122///
1123/// [`send`]: SelectedOperation::send
1124/// [`recv`]: SelectedOperation::recv
1125#[must_use]
1126pub struct SelectedOperation<'a> {
1127    /// Token needed to complete the operation.
1128    token: Token,
1129
1130    /// The index of the selected operation.
1131    index: usize,
1132
1133    /// The address of the selected `Sender` or `Receiver`.
1134    ptr: *const u8,
1135
1136    /// Indicates that `Sender`s and `Receiver`s are borrowed.
1137    _marker: PhantomData<&'a ()>,
1138}
1139
1140impl SelectedOperation<'_> {
1141    /// Returns the index of the selected operation.
1142    ///
1143    /// # Examples
1144    ///
1145    /// ```
1146    /// use crossbeam_channel::{bounded, Select};
1147    ///
1148    /// let (s1, r1) = bounded::<()>(0);
1149    /// let (s2, r2) = bounded::<()>(0);
1150    /// let (s3, r3) = bounded::<()>(1);
1151    ///
1152    /// let mut sel = Select::new();
1153    /// let oper1 = sel.send(&s1);
1154    /// let oper2 = sel.recv(&r2);
1155    /// let oper3 = sel.send(&s3);
1156    ///
1157    /// // Only the last operation is ready.
1158    /// let oper = sel.select();
1159    /// assert_eq!(oper.index(), 2);
1160    /// assert_eq!(oper.index(), oper3);
1161    ///
1162    /// // Complete the operation.
1163    /// oper.send(&s3, ()).unwrap();
1164    /// ```
1165    pub fn index(&self) -> usize {
1166        self.index
1167    }
1168
1169    /// Completes the send operation.
1170    ///
1171    /// The passed [`Sender`] reference must be the same one that was used in [`Select::send`]
1172    /// when the operation was added.
1173    ///
1174    /// # Panics
1175    ///
1176    /// Panics if an incorrect [`Sender`] reference is passed.
1177    ///
1178    /// # Examples
1179    ///
1180    /// ```
1181    /// use crossbeam_channel::{bounded, Select, SendError};
1182    ///
1183    /// let (s, r) = bounded::<i32>(0);
1184    /// drop(r);
1185    ///
1186    /// let mut sel = Select::new();
1187    /// let oper1 = sel.send(&s);
1188    ///
1189    /// let oper = sel.select();
1190    /// assert_eq!(oper.index(), oper1);
1191    /// assert_eq!(oper.send(&s, 10), Err(SendError(10)));
1192    /// ```
1193    pub fn send<T>(mut self, s: &Sender<T>, msg: T) -> Result<(), SendError<T>> {
1194        assert!(
1195            s as *const Sender<T> as *const u8 == self.ptr,
1196            "passed a sender that wasn't selected",
1197        );
1198        let res = unsafe { channel::write(s, &mut self.token, msg) };
1199        mem::forget(self);
1200        res.map_err(SendError)
1201    }
1202
1203    /// Completes the receive operation.
1204    ///
1205    /// The passed [`Receiver`] reference must be the same one that was used in [`Select::recv`]
1206    /// when the operation was added.
1207    ///
1208    /// # Panics
1209    ///
1210    /// Panics if an incorrect [`Receiver`] reference is passed.
1211    ///
1212    /// # Examples
1213    ///
1214    /// ```
1215    /// use crossbeam_channel::{bounded, Select, RecvError};
1216    ///
1217    /// let (s, r) = bounded::<i32>(0);
1218    /// drop(s);
1219    ///
1220    /// let mut sel = Select::new();
1221    /// let oper1 = sel.recv(&r);
1222    ///
1223    /// let oper = sel.select();
1224    /// assert_eq!(oper.index(), oper1);
1225    /// assert_eq!(oper.recv(&r), Err(RecvError));
1226    /// ```
1227    pub fn recv<T>(mut self, r: &Receiver<T>) -> Result<T, RecvError> {
1228        assert!(
1229            r as *const Receiver<T> as *const u8 == self.ptr,
1230            "passed a receiver that wasn't selected",
1231        );
1232        let res = unsafe { channel::read(r, &mut self.token) };
1233        mem::forget(self);
1234        res.map_err(|_| RecvError)
1235    }
1236}
1237
1238impl fmt::Debug for SelectedOperation<'_> {
1239    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1240        f.pad("SelectedOperation { .. }")
1241    }
1242}
1243
1244impl Drop for SelectedOperation<'_> {
1245    fn drop(&mut self) {
1246        panic!("dropped `SelectedOperation` without completing the operation");
1247    }
1248}