tokio/sync/
watch.rs

1#![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))]
2
3//! A multi-producer, multi-consumer channel that only retains the *last* sent
4//! value.
5//!
6//! This channel is useful for watching for changes to a value from multiple
7//! points in the code base, for example, changes to configuration values.
8//!
9//! # Usage
10//!
11//! [`channel`] returns a [`Sender`] / [`Receiver`] pair. These are the producer
12//! and consumer halves of the channel. The channel is created with an initial
13//! value.
14//!
15//! Each [`Receiver`] independently tracks the last value *seen* by its caller.
16//!
17//! To access the **current** value stored in the channel and mark it as *seen*
18//! by a given [`Receiver`], use [`Receiver::borrow_and_update()`].
19//!
20//! To access the current value **without** marking it as *seen*, use
21//! [`Receiver::borrow()`]. (If the value has already been marked *seen*,
22//! [`Receiver::borrow()`] is equivalent to [`Receiver::borrow_and_update()`].)
23//!
24//! For more information on when to use these methods, see
25//! [here](#borrow_and_update-versus-borrow).
26//!
27//! ## Change notifications
28//!
29//! The [`Receiver`] half provides an asynchronous [`changed`] method. This
30//! method is ready when a new, *unseen* value is sent via the [`Sender`] half.
31//!
32//! * [`Receiver::changed()`] returns `Ok(())` on receiving a new value, or
33//!   `Err(`[`error::RecvError`]`)` if the [`Sender`] has been dropped.
34//! * If the current value is *unseen* when calling [`changed`], then
35//!   [`changed`] will return immediately. If the current value is *seen*, then
36//!   it will sleep until either a new message is sent via the [`Sender`] half,
37//!   or the [`Sender`] is dropped.
38//! * On completion, the [`changed`] method marks the new value as *seen*.
39//! * At creation, the initial value is considered *seen*. In other words,
40//!   [`Receiver::changed()`] will not return until a subsequent value is sent.
41//! * New [`Receiver`] instances can be created with [`Sender::subscribe()`].
42//!   The current value at the time the [`Receiver`] is created is considered
43//!   *seen*.
44//!
45//! ## `borrow_and_update` versus `borrow`
46//!
47//! If the receiver intends to await notifications from [`changed`] in a loop,
48//! [`Receiver::borrow_and_update()`] should be preferred over
49//! [`Receiver::borrow()`].  This avoids a potential race where a new value is
50//! sent between [`changed`] being ready and the value being read. (If
51//! [`Receiver::borrow()`] is used, the loop may run twice with the same value.)
52//!
53//! If the receiver is only interested in the current value, and does not intend
54//! to wait for changes, then [`Receiver::borrow()`] can be used. It may be more
55//! convenient to use [`borrow`](Receiver::borrow) since it's an `&self`
56//! method---[`borrow_and_update`](Receiver::borrow_and_update) requires `&mut
57//! self`.
58//!
59//! # Examples
60//!
61//! The following example prints `hello! world! `.
62//!
63//! ```
64//! use tokio::sync::watch;
65//! use tokio::time::{Duration, sleep};
66//!
67//! # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
68//! let (tx, mut rx) = watch::channel("hello");
69//!
70//! tokio::spawn(async move {
71//!     // Use the equivalent of a "do-while" loop so the initial value is
72//!     // processed before awaiting the `changed()` future.
73//!     loop {
74//!         println!("{}! ", *rx.borrow_and_update());
75//!         if rx.changed().await.is_err() {
76//!             break;
77//!         }
78//!     }
79//! });
80//!
81//! sleep(Duration::from_millis(100)).await;
82//! tx.send("world")?;
83//! # Ok(())
84//! # }
85//! ```
86//!
87//! # Closing
88//!
89//! [`Sender::is_closed`] and [`Sender::closed`] allow the producer to detect
90//! when all [`Receiver`] handles have been dropped. This indicates that there
91//! is no further interest in the values being produced and work can be stopped.
92//!
//! The value in the channel will not be dropped until all senders and all
//! receivers have been dropped.
95//!
96//! # Thread safety
97//!
98//! Both [`Sender`] and [`Receiver`] are thread safe. They can be moved to other
99//! threads and can be used in a concurrent environment. Clones of [`Receiver`]
100//! handles may be moved to separate threads and also used concurrently.
101//!
102//! [`Sender`]: crate::sync::watch::Sender
103//! [`Receiver`]: crate::sync::watch::Receiver
104//! [`changed`]: crate::sync::watch::Receiver::changed
105//! [`Receiver::changed()`]: crate::sync::watch::Receiver::changed
106//! [`Receiver::borrow()`]: crate::sync::watch::Receiver::borrow
107//! [`Receiver::borrow_and_update()`]:
108//!     crate::sync::watch::Receiver::borrow_and_update
109//! [`channel`]: crate::sync::watch::channel
110//! [`Sender::is_closed`]: crate::sync::watch::Sender::is_closed
111//! [`Sender::closed`]: crate::sync::watch::Sender::closed
112//! [`Sender::subscribe()`]: crate::sync::watch::Sender::subscribe
113
114use crate::sync::notify::Notify;
115
116use crate::loom::sync::atomic::AtomicUsize;
117use crate::loom::sync::atomic::Ordering::{AcqRel, Relaxed};
118use crate::loom::sync::{Arc, RwLock, RwLockReadGuard};
119use std::fmt;
120use std::mem;
121use std::ops;
122use std::panic;
123
/// Receives values from the associated [`Sender`](struct@Sender).
///
/// Instances are created by the [`channel`](fn@channel) function.
///
/// Each `Receiver` remembers the channel version it last observed. That
/// version is compared against the shared state to decide whether the current
/// value counts as *seen* or *changed* for this particular receiver.
///
/// To turn this receiver into a `Stream`, you can use the [`WatchStream`]
/// wrapper.
///
/// [`WatchStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.WatchStream.html
#[derive(Debug)]
pub struct Receiver<T> {
    /// Pointer to the shared state
    shared: Arc<Shared<T>>,

    /// Last observed version
    version: Version,
}
140
/// Sends values to the associated [`Receiver`](struct@Receiver).
///
/// Instances are created by the [`channel`](fn@channel) function.
///
/// A `Sender` can be cloned; every clone refers to the same shared state.
#[derive(Debug)]
pub struct Sender<T> {
    /// Pointer to the shared state
    shared: Arc<Shared<T>>,
}
148
149impl<T> Clone for Sender<T> {
150    fn clone(&self) -> Self {
151        self.shared.ref_count_tx.fetch_add(1, Relaxed);
152
153        Self {
154            shared: self.shared.clone(),
155        }
156    }
157}
158
/// A reference to the inner value of a watch channel.
///
/// Outstanding borrows hold a read lock on the inner value. This means that
/// long-lived borrows could cause the producer half to block. It is recommended
/// to keep the borrow as short-lived as possible. Additionally, if you are
/// running in an environment that allows `!Send` futures, you must ensure that
/// the returned `Ref` type is never held alive across an `.await` point,
/// otherwise, it can lead to a deadlock.
///
/// The priority policy of the lock is dependent on the underlying lock
/// implementation, and this type does not guarantee that any particular policy
/// will be used. In particular, a producer which is waiting to acquire the lock
/// in `send` might or might not block concurrent calls to `borrow`, e.g.:
///
/// <details><summary>Potential deadlock example</summary>
///
/// ```text
/// // Task 1 (on thread A)    |  // Task 2 (on thread B)
/// let _ref1 = rx.borrow();   |
///                            |  // will block
///                            |  let _ = tx.send(());
/// // may deadlock            |
/// let _ref2 = rx.borrow();   |
/// ```
/// </details>
#[derive(Debug)]
pub struct Ref<'a, T> {
    /// Read guard on the shared value; the read lock is held for as long as
    /// this `Ref` is alive.
    inner: RwLockReadGuard<'a, T>,

    /// Whether the value was considered changed (not yet seen) at the moment
    /// this `Ref` was created.
    has_changed: bool,
}
189
190impl<'a, T> Ref<'a, T> {
191    /// Indicates if the borrowed value is considered as _changed_ since the last
192    /// time it has been marked as seen.
193    ///
194    /// Unlike [`Receiver::has_changed()`], this method does not fail if the channel is closed.
195    ///
196    /// When borrowed from the [`Sender`] this function will always return `false`.
197    ///
198    /// # Examples
199    ///
200    /// ```
201    /// use tokio::sync::watch;
202    ///
203    /// #[tokio::main]
204    /// async fn main() {
205    ///     let (tx, mut rx) = watch::channel("hello");
206    ///
207    ///     tx.send("goodbye").unwrap();
208    ///     // The sender does never consider the value as changed.
209    ///     assert!(!tx.borrow().has_changed());
210    ///
211    ///     // Drop the sender immediately, just for testing purposes.
212    ///     drop(tx);
213    ///
214    ///     // Even if the sender has already been dropped...
215    ///     assert!(rx.has_changed().is_err());
216    ///     // ...the modified value is still readable and detected as changed.
217    ///     assert_eq!(*rx.borrow(), "goodbye");
218    ///     assert!(rx.borrow().has_changed());
219    ///
220    ///     // Read the changed value and mark it as seen.
221    ///     {
222    ///         let received = rx.borrow_and_update();
223    ///         assert_eq!(*received, "goodbye");
224    ///         assert!(received.has_changed());
225    ///         // Release the read lock when leaving this scope.
226    ///     }
227    ///
228    ///     // Now the value has already been marked as seen and could
229    ///     // never be modified again (after the sender has been dropped).
230    ///     assert!(!rx.borrow().has_changed());
231    /// }
232    /// ```
233    pub fn has_changed(&self) -> bool {
234        self.has_changed
235    }
236}
237
/// State shared by every `Sender` and `Receiver` handle of one channel.
struct Shared<T> {
    /// The most recent value.
    value: RwLock<T>,

    /// The current version.
    ///
    /// The lowest bit represents a "closed" state. The rest of the bits
    /// represent the current version.
    state: AtomicState,

    /// Tracks the number of `Receiver` instances.
    ref_count_rx: AtomicUsize,

    /// Tracks the number of `Sender` instances.
    ref_count_tx: AtomicUsize,

    /// Notifies waiting receivers that the value changed.
    notify_rx: big_notify::BigNotify,

    /// Notifies any task listening for `Receiver` dropped events.
    notify_tx: Notify,
}
260
261impl<T: fmt::Debug> fmt::Debug for Shared<T> {
262    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
263        let state = self.state.load();
264        f.debug_struct("Shared")
265            .field("value", &self.value)
266            .field("version", &state.version())
267            .field("is_closed", &state.is_closed())
268            .field("ref_count_rx", &self.ref_count_rx)
269            .finish()
270    }
271}
272
pub mod error {
    //! Watch error types.

    use std::error::Error;
    use std::fmt;

    /// Error produced when sending a value fails.
    ///
    /// The wrapped `T` is exposed through the public tuple field.
    #[derive(PartialEq, Eq, Clone, Copy)]
    pub struct SendError<T>(pub T);

    // ===== impl SendError =====

    impl<T> fmt::Debug for SendError<T> {
        /// Prints only the type name, so `T` does not need to implement
        /// `Debug`.
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            let mut builder = f.debug_struct("SendError");
            builder.finish_non_exhaustive()
        }
    }

    impl<T> fmt::Display for SendError<T> {
        fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
            fmt.write_str("channel closed")
        }
    }

    impl<T> Error for SendError<T> {}

    /// Error produced when receiving a change notification.
    ///
    /// The inner unit field is only visible to the parent module, so values
    /// of this type cannot be constructed from outside of it.
    #[derive(Debug, Clone)]
    pub struct RecvError(pub(super) ());

    // ===== impl RecvError =====

    impl fmt::Display for RecvError {
        fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
            fmt.write_str("channel closed")
        }
    }

    impl Error for RecvError {}
}
313
314mod big_notify {
315    use super::Notify;
316    use crate::sync::notify::Notified;
317
318    // To avoid contention on the lock inside the `Notify`, we store multiple
319    // copies of it. Then, we use either circular access or randomness to spread
320    // out threads over different `Notify` objects.
321    //
322    // Some simple benchmarks show that randomness performs slightly better than
323    // circular access (probably due to contention on `next`), so we prefer to
324    // use randomness when Tokio is compiled with a random number generator.
325    //
326    // When the random number generator is not available, we fall back to
327    // circular access.
328
329    pub(super) struct BigNotify {
330        #[cfg(not(all(not(loom), feature = "sync", any(feature = "rt", feature = "macros"))))]
331        next: std::sync::atomic::AtomicUsize,
332        inner: [Notify; 8],
333    }
334
335    impl BigNotify {
336        pub(super) fn new() -> Self {
337            Self {
338                #[cfg(not(all(
339                    not(loom),
340                    feature = "sync",
341                    any(feature = "rt", feature = "macros")
342                )))]
343                next: std::sync::atomic::AtomicUsize::new(0),
344                inner: Default::default(),
345            }
346        }
347
348        pub(super) fn notify_waiters(&self) {
349            for notify in &self.inner {
350                notify.notify_waiters();
351            }
352        }
353
354        /// This function implements the case where randomness is not available.
355        #[cfg(not(all(not(loom), feature = "sync", any(feature = "rt", feature = "macros"))))]
356        pub(super) fn notified(&self) -> Notified<'_> {
357            let i = self.next.fetch_add(1, std::sync::atomic::Ordering::Relaxed) % 8;
358            self.inner[i].notified()
359        }
360
361        /// This function implements the case where randomness is available.
362        #[cfg(all(not(loom), feature = "sync", any(feature = "rt", feature = "macros")))]
363        pub(super) fn notified(&self) -> Notified<'_> {
364            let i = crate::runtime::context::thread_rng_n(8) as usize;
365            self.inner[i].notified()
366        }
367    }
368}
369
370use self::state::{AtomicState, Version};
371mod state {
372    use crate::loom::sync::atomic::AtomicUsize;
373    use crate::loom::sync::atomic::Ordering;
374
375    const CLOSED_BIT: usize = 1;
376
377    // Using 2 as the step size preserves the `CLOSED_BIT`.
378    const STEP_SIZE: usize = 2;
379
380    /// The version part of the state. The lowest bit is always zero.
381    #[derive(Copy, Clone, Debug, Eq, PartialEq)]
382    pub(super) struct Version(usize);
383
384    /// Snapshot of the state. The first bit is used as the CLOSED bit.
385    /// The remaining bits are used as the version.
386    ///
387    /// The CLOSED bit tracks whether the Sender has been dropped. Dropping all
388    /// receivers does not set it.
389    #[derive(Copy, Clone, Debug)]
390    pub(super) struct StateSnapshot(usize);
391
392    /// The state stored in an atomic integer.
393    ///
394    /// The `Sender` uses `Release` ordering for storing a new state
395    /// and the `Receiver`s use `Acquire` ordering for loading the
396    /// current state. This ensures that written values are seen by
397    /// the `Receiver`s for a proper handover.
398    #[derive(Debug)]
399    pub(super) struct AtomicState(AtomicUsize);
400
401    impl Version {
402        /// Decrements the version.
403        pub(super) fn decrement(&mut self) {
404            // Using a wrapping decrement here is required to ensure that the
405            // operation is consistent with `std::sync::atomic::AtomicUsize::fetch_add()`
406            // which wraps on overflow.
407            self.0 = self.0.wrapping_sub(STEP_SIZE);
408        }
409
410        pub(super) const INITIAL: Self = Version(0);
411    }
412
413    impl StateSnapshot {
414        /// Extract the version from the state.
415        pub(super) fn version(self) -> Version {
416            Version(self.0 & !CLOSED_BIT)
417        }
418
419        /// Is the closed bit set?
420        pub(super) fn is_closed(self) -> bool {
421            (self.0 & CLOSED_BIT) == CLOSED_BIT
422        }
423    }
424
425    impl AtomicState {
426        /// Create a new `AtomicState` that is not closed and which has the
427        /// version set to `Version::INITIAL`.
428        pub(super) fn new() -> Self {
429            AtomicState(AtomicUsize::new(Version::INITIAL.0))
430        }
431
432        /// Load the current value of the state.
433        ///
434        /// Only used by the receiver and for debugging purposes.
435        ///
436        /// The receiver side (read-only) uses `Acquire` ordering for a proper handover
437        /// of the shared value with the sender side (single writer). The state is always
438        /// updated after modifying and before releasing the (exclusive) lock on the
439        /// shared value.
440        pub(super) fn load(&self) -> StateSnapshot {
441            StateSnapshot(self.0.load(Ordering::Acquire))
442        }
443
444        /// Increment the version counter.
445        pub(super) fn increment_version_while_locked(&self) {
446            // Use `Release` ordering to ensure that the shared value
447            // has been written before updating the version. The shared
448            // value is still protected by an exclusive lock during this
449            // method.
450            self.0.fetch_add(STEP_SIZE, Ordering::Release);
451        }
452
453        /// Set the closed bit in the state.
454        pub(super) fn set_closed(&self) {
455            self.0.fetch_or(CLOSED_BIT, Ordering::Release);
456        }
457    }
458}
459
460/// Creates a new watch channel, returning the "send" and "receive" handles.
461///
462/// All values sent by [`Sender`] will become visible to the [`Receiver`] handles.
463/// Only the last value sent is made available to the [`Receiver`] half. All
464/// intermediate values are dropped.
465///
466/// # Examples
467///
468/// The following example prints `hello! world! `.
469///
470/// ```
471/// use tokio::sync::watch;
472/// use tokio::time::{Duration, sleep};
473///
474/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
475/// let (tx, mut rx) = watch::channel("hello");
476///
477/// tokio::spawn(async move {
478///     // Use the equivalent of a "do-while" loop so the initial value is
479///     // processed before awaiting the `changed()` future.
480///     loop {
481///         println!("{}! ", *rx.borrow_and_update());
482///         if rx.changed().await.is_err() {
483///             break;
484///         }
485///     }
486/// });
487///
488/// sleep(Duration::from_millis(100)).await;
489/// tx.send("world")?;
490/// # Ok(())
491/// # }
492/// ```
493///
494/// [`Sender`]: struct@Sender
495/// [`Receiver`]: struct@Receiver
496pub fn channel<T>(init: T) -> (Sender<T>, Receiver<T>) {
497    let shared = Arc::new(Shared {
498        value: RwLock::new(init),
499        state: AtomicState::new(),
500        ref_count_rx: AtomicUsize::new(1),
501        ref_count_tx: AtomicUsize::new(1),
502        notify_rx: big_notify::BigNotify::new(),
503        notify_tx: Notify::new(),
504    });
505
506    let tx = Sender {
507        shared: shared.clone(),
508    };
509
510    let rx = Receiver {
511        shared,
512        version: Version::INITIAL,
513    };
514
515    (tx, rx)
516}
517
518impl<T> Receiver<T> {
519    fn from_shared(version: Version, shared: Arc<Shared<T>>) -> Self {
520        // No synchronization necessary as this is only used as a counter and
521        // not memory access.
522        shared.ref_count_rx.fetch_add(1, Relaxed);
523
524        Self { shared, version }
525    }
526
527    /// Returns a reference to the most recently sent value.
528    ///
529    /// This method does not mark the returned value as seen, so future calls to
530    /// [`changed`] may return immediately even if you have already seen the
531    /// value with a call to `borrow`.
532    ///
533    /// Outstanding borrows hold a read lock on the inner value. This means that
534    /// long-lived borrows could cause the producer half to block. It is recommended
535    /// to keep the borrow as short-lived as possible. Additionally, if you are
536    /// running in an environment that allows `!Send` futures, you must ensure that
537    /// the returned `Ref` type is never held alive across an `.await` point,
538    /// otherwise, it can lead to a deadlock.
539    ///
540    /// The priority policy of the lock is dependent on the underlying lock
541    /// implementation, and this type does not guarantee that any particular policy
542    /// will be used. In particular, a producer which is waiting to acquire the lock
543    /// in `send` might or might not block concurrent calls to `borrow`, e.g.:
544    ///
545    /// <details><summary>Potential deadlock example</summary>
546    ///
547    /// ```text
548    /// // Task 1 (on thread A)    |  // Task 2 (on thread B)
549    /// let _ref1 = rx.borrow();   |
550    ///                            |  // will block
551    ///                            |  let _ = tx.send(());
552    /// // may deadlock            |
553    /// let _ref2 = rx.borrow();   |
554    /// ```
555    /// </details>
556    ///
557    /// For more information on when to use this method versus
558    /// [`borrow_and_update`], see [here](self#borrow_and_update-versus-borrow).
559    ///
560    /// [`changed`]: Receiver::changed
561    /// [`borrow_and_update`]: Receiver::borrow_and_update
562    ///
563    /// # Examples
564    ///
565    /// ```
566    /// use tokio::sync::watch;
567    ///
568    /// let (_, rx) = watch::channel("hello");
569    /// assert_eq!(*rx.borrow(), "hello");
570    /// ```
571    pub fn borrow(&self) -> Ref<'_, T> {
572        let inner = self.shared.value.read().unwrap();
573
574        // After obtaining a read-lock no concurrent writes could occur
575        // and the loaded version matches that of the borrowed reference.
576        let new_version = self.shared.state.load().version();
577        let has_changed = self.version != new_version;
578
579        Ref { inner, has_changed }
580    }
581
582    /// Returns a reference to the most recently sent value and marks that value
583    /// as seen.
584    ///
585    /// This method marks the current value as seen. Subsequent calls to [`changed`]
586    /// will not return immediately until the [`Sender`] has modified the shared
587    /// value again.
588    ///
589    /// Outstanding borrows hold a read lock on the inner value. This means that
590    /// long-lived borrows could cause the producer half to block. It is recommended
591    /// to keep the borrow as short-lived as possible. Additionally, if you are
592    /// running in an environment that allows `!Send` futures, you must ensure that
593    /// the returned `Ref` type is never held alive across an `.await` point,
594    /// otherwise, it can lead to a deadlock.
595    ///
596    /// The priority policy of the lock is dependent on the underlying lock
597    /// implementation, and this type does not guarantee that any particular policy
598    /// will be used. In particular, a producer which is waiting to acquire the lock
599    /// in `send` might or might not block concurrent calls to `borrow`, e.g.:
600    ///
601    /// <details><summary>Potential deadlock example</summary>
602    ///
603    /// ```text
604    /// // Task 1 (on thread A)                |  // Task 2 (on thread B)
605    /// let _ref1 = rx1.borrow_and_update();   |
606    ///                                        |  // will block
607    ///                                        |  let _ = tx.send(());
608    /// // may deadlock                        |
609    /// let _ref2 = rx2.borrow_and_update();   |
610    /// ```
611    /// </details>
612    ///
613    /// For more information on when to use this method versus [`borrow`], see
614    /// [here](self#borrow_and_update-versus-borrow).
615    ///
616    /// [`changed`]: Receiver::changed
617    /// [`borrow`]: Receiver::borrow
618    pub fn borrow_and_update(&mut self) -> Ref<'_, T> {
619        let inner = self.shared.value.read().unwrap();
620
621        // After obtaining a read-lock no concurrent writes could occur
622        // and the loaded version matches that of the borrowed reference.
623        let new_version = self.shared.state.load().version();
624        let has_changed = self.version != new_version;
625
626        // Mark the shared value as seen by updating the version
627        self.version = new_version;
628
629        Ref { inner, has_changed }
630    }
631
632    /// Checks if this channel contains a message that this receiver has not yet
633    /// seen. The new value is not marked as seen.
634    ///
635    /// Although this method is called `has_changed`, it does not check new
636    /// messages for equality, so this call will return true even if the new
637    /// message is equal to the old message.
638    ///
639    /// Returns an error if the channel has been closed.
640    /// # Examples
641    ///
642    /// ```
643    /// use tokio::sync::watch;
644    ///
645    /// #[tokio::main]
646    /// async fn main() {
647    ///     let (tx, mut rx) = watch::channel("hello");
648    ///
649    ///     tx.send("goodbye").unwrap();
650    ///
651    ///     assert!(rx.has_changed().unwrap());
652    ///     assert_eq!(*rx.borrow_and_update(), "goodbye");
653    ///
654    ///     // The value has been marked as seen
655    ///     assert!(!rx.has_changed().unwrap());
656    ///
657    ///     drop(tx);
658    ///     // The `tx` handle has been dropped
659    ///     assert!(rx.has_changed().is_err());
660    /// }
661    /// ```
662    pub fn has_changed(&self) -> Result<bool, error::RecvError> {
663        // Load the version from the state
664        let state = self.shared.state.load();
665        if state.is_closed() {
666            // The sender has dropped.
667            return Err(error::RecvError(()));
668        }
669        let new_version = state.version();
670
671        Ok(self.version != new_version)
672    }
673
674    /// Marks the state as changed.
675    ///
676    /// After invoking this method [`has_changed()`](Self::has_changed)
677    /// returns `true` and [`changed()`](Self::changed) returns
678    /// immediately, regardless of whether a new value has been sent.
679    ///
680    /// This is useful for triggering an initial change notification after
681    /// subscribing to synchronize new receivers.
682    pub fn mark_changed(&mut self) {
683        self.version.decrement();
684    }
685
686    /// Marks the state as unchanged.
687    ///
688    /// The current value will be considered seen by the receiver.
689    ///
690    /// This is useful if you are not interested in the current value
691    /// visible in the receiver.
692    pub fn mark_unchanged(&mut self) {
693        let current_version = self.shared.state.load().version();
694        self.version = current_version;
695    }
696
    /// Waits for a change notification, then marks the newest value as seen.
    ///
    /// If the newest value in the channel has not yet been marked seen when
    /// this method is called, the method marks that value seen and returns
    /// immediately. If the newest value has already been marked seen, then the
    /// method sleeps until a new message is sent by the [`Sender`] connected to
    /// this `Receiver`, or until the [`Sender`] is dropped.
    ///
    /// This method returns an error if and only if the [`Sender`] is dropped.
    ///
    /// For more information, see
    /// [*Change notifications*](self#change-notifications) in the module-level documentation.
    ///
    /// # Errors
    ///
    /// Returns [`error::RecvError`] when the channel is closed, as described
    /// above.
    ///
    /// # Cancel safety
    ///
    /// This method is cancel safe. If you use it as the event in a
    /// [`tokio::select!`](crate::select) statement and some other branch
    /// completes first, then it is guaranteed that no values have been marked
    /// seen by this call to `changed`.
    ///
    /// [`Sender`]: struct@Sender
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio::sync::watch;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let (tx, mut rx) = watch::channel("hello");
    ///
    ///     tokio::spawn(async move {
    ///         tx.send("goodbye").unwrap();
    ///     });
    ///
    ///     assert!(rx.changed().await.is_ok());
    ///     assert_eq!(*rx.borrow_and_update(), "goodbye");
    ///
    ///     // The `tx` handle has been dropped
    ///     assert!(rx.changed().await.is_err());
    /// }
    /// ```
    pub async fn changed(&mut self) -> Result<(), error::RecvError> {
        // Delegate to `changed_impl`, handing it the shared state and this
        // receiver's own version.
        changed_impl(&self.shared, &mut self.version).await
    }
742
    /// Waits for a value that satisfies the provided condition.
    ///
    /// This method will call the provided closure whenever something is sent on
    /// the channel. Once the closure returns `true`, this method will return a
    /// reference to the value that was passed to the closure.
    ///
    /// Before `wait_for` starts waiting for changes, it will call the closure
    /// on the current value. If the closure returns `true` when given the
    /// current value, then `wait_for` will immediately return a reference to
    /// the current value. This is the case even if the current value is already
    /// considered seen.
    ///
    /// The watch channel only keeps track of the most recent value, so if
    /// several messages are sent faster than `wait_for` is able to call the
    /// closure, then it may skip some updates. Whenever the closure is called,
    /// it will be called with the most recent value.
    ///
    /// When this function returns, the value that was passed to the closure
    /// when it returned `true` will be considered seen. Values for which the
    /// closure returned `false` are marked as seen as well, because each value
    /// is marked seen before the closure is invoked on it.
    ///
    /// If the channel is closed, then `wait_for` will return a `RecvError`.
    /// Once this happens, no more messages can ever be sent on the channel.
    /// When an error is returned, it is guaranteed that the closure has been
    /// called on the last value, and that it returned `false` for that value.
    /// (If the closure returned `true`, then the last value would have been
    /// returned instead of the error.)
    ///
    /// Like the `borrow` method, the returned borrow holds a read lock on the
    /// inner value. This means that long-lived borrows could cause the producer
    /// half to block. It is recommended to keep the borrow as short-lived as
    /// possible. See the documentation of `borrow` for more information on
    /// this.
    ///
    /// # Panics
    ///
    /// A panic raised by the closure is forwarded to the caller. The read lock
    /// is released before the panic is resumed.
    ///
    /// [`Receiver::changed()`]: crate::sync::watch::Receiver::changed
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio::sync::watch;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let (tx, _rx) = watch::channel("hello");
    ///
    ///     tx.send("goodbye").unwrap();
    ///
    ///     // Here we subscribe a second receiver. If we used `changed`, we
    ///     // would first have to check the current value and only then wait
    ///     // for changes, or else `changed` would hang.
    ///     let mut rx2 = tx.subscribe();
    ///
    ///     // Instead of `changed` we use `wait_for`, which automatically
    ///     // checks the current value and then waits for changes until the
    ///     // closure returns true.
    ///     assert!(rx2.wait_for(|val| *val == "goodbye").await.is_ok());
    ///     assert_eq!(*rx2.borrow(), "goodbye");
    /// }
    /// ```
    pub async fn wait_for(
        &mut self,
        mut f: impl FnMut(&T) -> bool,
    ) -> Result<Ref<'_, T>, error::RecvError> {
        // Set once `changed_impl` reports an error. One more pass over the
        // loop body is still made afterwards, so the closure can be run on a
        // final value that has not been checked yet.
        let mut closed = false;
        loop {
            // Inner scope: unless the guard is returned inside a `Ref`, it is
            // dropped at the end of this block, before the `.await` below.
            {
                let inner = self.shared.value.read().unwrap();

                let new_version = self.shared.state.load().version();
                let has_changed = self.version != new_version;
                // The value is marked as seen before the closure runs on it.
                self.version = new_version;

                // After the channel was reported closed, only consult the
                // closure if the value differs from the one checked last.
                if !closed || has_changed {
                    // Catch a panic from the closure so the read guard can be
                    // released before the panic continues to unwind.
                    let result = panic::catch_unwind(panic::AssertUnwindSafe(|| f(&inner)));
                    match result {
                        Ok(true) => {
                            return Ok(Ref { inner, has_changed });
                        }
                        Ok(false) => {
                            // Skip the value.
                        }
                        Err(panicked) => {
                            // Drop the read-lock to avoid poisoning it.
                            drop(inner);
                            // Forward the panic to the caller.
                            panic::resume_unwind(panicked);
                            // Unreachable
                        }
                    };
                }
            }

            if closed {
                return Err(error::RecvError(()));
            }

            // Wait for the value to change.
            closed = changed_impl(&self.shared, &mut self.version).await.is_err();
        }
    }
844
845    /// Returns `true` if receivers belong to the same channel.
846    ///
847    /// # Examples
848    ///
849    /// ```
850    /// let (tx, rx) = tokio::sync::watch::channel(true);
851    /// let rx2 = rx.clone();
852    /// assert!(rx.same_channel(&rx2));
853    ///
854    /// let (tx3, rx3) = tokio::sync::watch::channel(true);
855    /// assert!(!rx3.same_channel(&rx2));
856    /// ```
857    pub fn same_channel(&self, other: &Self) -> bool {
858        Arc::ptr_eq(&self.shared, &other.shared)
859    }
860
861    cfg_process_driver! {
862        pub(crate) fn try_has_changed(&mut self) -> Option<Result<(), error::RecvError>> {
863            maybe_changed(&self.shared, &mut self.version)
864        }
865    }
866}
867
868fn maybe_changed<T>(
869    shared: &Shared<T>,
870    version: &mut Version,
871) -> Option<Result<(), error::RecvError>> {
872    // Load the version from the state
873    let state = shared.state.load();
874    let new_version = state.version();
875
876    if *version != new_version {
877        // Observe the new version and return
878        *version = new_version;
879        return Some(Ok(()));
880    }
881
882    if state.is_closed() {
883        // The sender has been dropped.
884        return Some(Err(error::RecvError(())));
885    }
886
887    None
888}
889
890async fn changed_impl<T>(
891    shared: &Shared<T>,
892    version: &mut Version,
893) -> Result<(), error::RecvError> {
894    crate::trace::async_trace_leaf().await;
895
896    loop {
897        // In order to avoid a race condition, we first request a notification,
898        // **then** check the current value's version. If a new version exists,
899        // the notification request is dropped.
900        let notified = shared.notify_rx.notified();
901
902        if let Some(ret) = maybe_changed(shared, version) {
903            return ret;
904        }
905
906        notified.await;
907        // loop around again in case the wake-up was spurious
908    }
909}
910
911impl<T> Clone for Receiver<T> {
912    fn clone(&self) -> Self {
913        let version = self.version;
914        let shared = self.shared.clone();
915
916        Self::from_shared(version, shared)
917    }
918}
919
920impl<T> Drop for Receiver<T> {
921    fn drop(&mut self) {
922        // No synchronization necessary as this is only used as a counter and
923        // not memory access.
924        if 1 == self.shared.ref_count_rx.fetch_sub(1, Relaxed) {
925            // This is the last `Receiver` handle, tasks waiting on `Sender::closed()`
926            self.shared.notify_tx.notify_waiters();
927        }
928    }
929}
930
931impl<T> Sender<T> {
932    /// Creates the sending-half of the [`watch`] channel.
933    ///
934    /// See documentation of [`watch::channel`] for errors when calling this function.
935    /// Beware that attempting to send a value when there are no receivers will
936    /// return an error.
937    ///
938    /// [`watch`]: crate::sync::watch
    /// [`watch::channel`]: crate::sync::watch::channel
940    ///
941    /// # Examples
942    /// ```
943    /// let sender = tokio::sync::watch::Sender::new(0u8);
944    /// assert!(sender.send(3).is_err());
945    /// let _rec = sender.subscribe();
946    /// assert!(sender.send(4).is_ok());
947    /// ```
948    pub fn new(init: T) -> Self {
949        let (tx, _) = channel(init);
950        tx
951    }
952
953    /// Sends a new value via the channel, notifying all receivers.
954    ///
955    /// This method fails if the channel is closed, which is the case when
956    /// every receiver has been dropped. It is possible to reopen the channel
957    /// using the [`subscribe`] method. However, when `send` fails, the value
958    /// isn't made available for future receivers (but returned with the
959    /// [`SendError`]).
960    ///
961    /// To always make a new value available for future receivers, even if no
962    /// receiver currently exists, one of the other send methods
963    /// ([`send_if_modified`], [`send_modify`], or [`send_replace`]) can be
964    /// used instead.
965    ///
966    /// [`subscribe`]: Sender::subscribe
967    /// [`SendError`]: error::SendError
968    /// [`send_if_modified`]: Sender::send_if_modified
969    /// [`send_modify`]: Sender::send_modify
970    /// [`send_replace`]: Sender::send_replace
971    pub fn send(&self, value: T) -> Result<(), error::SendError<T>> {
972        // This is pretty much only useful as a hint anyway, so synchronization isn't critical.
973        if 0 == self.receiver_count() {
974            return Err(error::SendError(value));
975        }
976
977        self.send_replace(value);
978        Ok(())
979    }
980
981    /// Modifies the watched value **unconditionally** in-place,
982    /// notifying all receivers.
983    ///
984    /// This can be useful for modifying the watched value, without
985    /// having to allocate a new instance. Additionally, this
986    /// method permits sending values even when there are no receivers.
987    ///
988    /// Prefer to use the more versatile function [`Self::send_if_modified()`]
989    /// if the value is only modified conditionally during the mutable borrow
990    /// to prevent unneeded change notifications for unmodified values.
991    ///
992    /// # Panics
993    ///
994    /// This function panics when the invocation of the `modify` closure panics.
995    /// No receivers are notified when panicking. All changes of the watched
996    /// value applied by the closure before panicking will be visible in
997    /// subsequent calls to `borrow`.
998    ///
999    /// # Examples
1000    ///
1001    /// ```
1002    /// use tokio::sync::watch;
1003    ///
1004    /// struct State {
1005    ///     counter: usize,
1006    /// }
1007    /// let (state_tx, state_rx) = watch::channel(State { counter: 0 });
1008    /// state_tx.send_modify(|state| state.counter += 1);
1009    /// assert_eq!(state_rx.borrow().counter, 1);
1010    /// ```
1011    pub fn send_modify<F>(&self, modify: F)
1012    where
1013        F: FnOnce(&mut T),
1014    {
1015        self.send_if_modified(|value| {
1016            modify(value);
1017            true
1018        });
1019    }
1020
1021    /// Modifies the watched value **conditionally** in-place,
1022    /// notifying all receivers only if modified.
1023    ///
1024    /// This can be useful for modifying the watched value, without
1025    /// having to allocate a new instance. Additionally, this
1026    /// method permits sending values even when there are no receivers.
1027    ///
1028    /// The `modify` closure must return `true` if the value has actually
1029    /// been modified during the mutable borrow. It should only return `false`
1030    /// if the value is guaranteed to be unmodified despite the mutable
1031    /// borrow.
1032    ///
1033    /// Receivers are only notified if the closure returned `true`. If the
1034    /// closure has modified the value but returned `false` this results
1035    /// in a *silent modification*, i.e. the modified value will be visible
1036    /// in subsequent calls to `borrow`, but receivers will not receive
1037    /// a change notification.
1038    ///
1039    /// Returns the result of the closure, i.e. `true` if the value has
1040    /// been modified and `false` otherwise.
1041    ///
1042    /// # Panics
1043    ///
1044    /// This function panics when the invocation of the `modify` closure panics.
1045    /// No receivers are notified when panicking. All changes of the watched
1046    /// value applied by the closure before panicking will be visible in
1047    /// subsequent calls to `borrow`.
1048    ///
1049    /// # Examples
1050    ///
1051    /// ```
1052    /// use tokio::sync::watch;
1053    ///
1054    /// struct State {
1055    ///     counter: usize,
1056    /// }
1057    /// let (state_tx, mut state_rx) = watch::channel(State { counter: 1 });
1058    /// let inc_counter_if_odd = |state: &mut State| {
1059    ///     if state.counter % 2 == 1 {
1060    ///         state.counter += 1;
1061    ///         return true;
1062    ///     }
1063    ///     false
1064    /// };
1065    ///
1066    /// assert_eq!(state_rx.borrow().counter, 1);
1067    ///
1068    /// assert!(!state_rx.has_changed().unwrap());
1069    /// assert!(state_tx.send_if_modified(inc_counter_if_odd));
1070    /// assert!(state_rx.has_changed().unwrap());
1071    /// assert_eq!(state_rx.borrow_and_update().counter, 2);
1072    ///
1073    /// assert!(!state_rx.has_changed().unwrap());
1074    /// assert!(!state_tx.send_if_modified(inc_counter_if_odd));
1075    /// assert!(!state_rx.has_changed().unwrap());
1076    /// assert_eq!(state_rx.borrow_and_update().counter, 2);
1077    /// ```
1078    pub fn send_if_modified<F>(&self, modify: F) -> bool
1079    where
1080        F: FnOnce(&mut T) -> bool,
1081    {
1082        {
1083            // Acquire the write lock and update the value.
1084            let mut lock = self.shared.value.write().unwrap();
1085
1086            // Update the value and catch possible panic inside func.
1087            let result = panic::catch_unwind(panic::AssertUnwindSafe(|| modify(&mut lock)));
1088            match result {
1089                Ok(modified) => {
1090                    if !modified {
1091                        // Abort, i.e. don't notify receivers if unmodified
1092                        return false;
1093                    }
1094                    // Continue if modified
1095                }
1096                Err(panicked) => {
1097                    // Drop the lock to avoid poisoning it.
1098                    drop(lock);
1099                    // Forward the panic to the caller.
1100                    panic::resume_unwind(panicked);
1101                    // Unreachable
1102                }
1103            };
1104
1105            self.shared.state.increment_version_while_locked();
1106
1107            // Release the write lock.
1108            //
1109            // Incrementing the version counter while holding the lock ensures
1110            // that receivers are able to figure out the version number of the
1111            // value they are currently looking at.
1112            drop(lock);
1113        }
1114
1115        self.shared.notify_rx.notify_waiters();
1116
1117        true
1118    }
1119
1120    /// Sends a new value via the channel, notifying all receivers and returning
1121    /// the previous value in the channel.
1122    ///
1123    /// This can be useful for reusing the buffers inside a watched value.
1124    /// Additionally, this method permits sending values even when there are no
1125    /// receivers.
1126    ///
1127    /// # Examples
1128    ///
1129    /// ```
1130    /// use tokio::sync::watch;
1131    ///
1132    /// let (tx, _rx) = watch::channel(1);
1133    /// assert_eq!(tx.send_replace(2), 1);
1134    /// assert_eq!(tx.send_replace(3), 2);
1135    /// ```
1136    pub fn send_replace(&self, mut value: T) -> T {
1137        // swap old watched value with the new one
1138        self.send_modify(|old| mem::swap(old, &mut value));
1139
1140        value
1141    }
1142
    /// Returns a reference to the most recently sent value.
1144    ///
1145    /// Outstanding borrows hold a read lock on the inner value. This means that
1146    /// long-lived borrows could cause the producer half to block. It is recommended
1147    /// to keep the borrow as short-lived as possible. Additionally, if you are
1148    /// running in an environment that allows `!Send` futures, you must ensure that
1149    /// the returned `Ref` type is never held alive across an `.await` point,
1150    /// otherwise, it can lead to a deadlock.
1151    ///
1152    /// # Examples
1153    ///
1154    /// ```
1155    /// use tokio::sync::watch;
1156    ///
1157    /// let (tx, _) = watch::channel("hello");
1158    /// assert_eq!(*tx.borrow(), "hello");
1159    /// ```
1160    pub fn borrow(&self) -> Ref<'_, T> {
1161        let inner = self.shared.value.read().unwrap();
1162
1163        // The sender/producer always sees the current version
1164        let has_changed = false;
1165
1166        Ref { inner, has_changed }
1167    }
1168
1169    /// Checks if the channel has been closed. This happens when all receivers
1170    /// have dropped.
1171    ///
1172    /// # Examples
1173    ///
1174    /// ```
1175    /// let (tx, rx) = tokio::sync::watch::channel(());
1176    /// assert!(!tx.is_closed());
1177    ///
1178    /// drop(rx);
1179    /// assert!(tx.is_closed());
1180    /// ```
1181    pub fn is_closed(&self) -> bool {
1182        self.receiver_count() == 0
1183    }
1184
1185    /// Completes when all receivers have dropped.
1186    ///
1187    /// This allows the producer to get notified when interest in the produced
1188    /// values is canceled and immediately stop doing work. Once a channel is
1189    /// closed, the only way to reopen it is to call [`Sender::subscribe`] to
1190    /// get a new receiver.
1191    ///
1192    /// If the channel becomes closed for a brief amount of time (e.g., the last
1193    /// receiver is dropped and then `subscribe` is called), then this call to
1194    /// `closed` might return, but it is also possible that it does not "notice"
1195    /// that the channel was closed for a brief amount of time.
1196    ///
1197    /// # Cancel safety
1198    ///
1199    /// This method is cancel safe.
1200    ///
1201    /// # Examples
1202    ///
1203    /// ```
1204    /// use tokio::sync::watch;
1205    ///
1206    /// #[tokio::main]
1207    /// async fn main() {
1208    ///     let (tx, rx) = watch::channel("hello");
1209    ///
1210    ///     tokio::spawn(async move {
1211    ///         // use `rx`
1212    ///         drop(rx);
1213    ///     });
1214    ///
1215    ///     // Waits for `rx` to drop
1216    ///     tx.closed().await;
1217    ///     println!("the `rx` handles dropped")
1218    /// }
1219    /// ```
1220    pub async fn closed(&self) {
1221        crate::trace::async_trace_leaf().await;
1222
1223        while self.receiver_count() > 0 {
1224            let notified = self.shared.notify_tx.notified();
1225
1226            if self.receiver_count() == 0 {
1227                return;
1228            }
1229
1230            notified.await;
1231            // The channel could have been reopened in the meantime by calling
1232            // `subscribe`, so we loop again.
1233        }
1234    }
1235
1236    /// Creates a new [`Receiver`] connected to this `Sender`.
1237    ///
1238    /// All messages sent before this call to `subscribe` are initially marked
1239    /// as seen by the new `Receiver`.
1240    ///
1241    /// This method can be called even if there are no other receivers. In this
1242    /// case, the channel is reopened.
1243    ///
1244    /// # Examples
1245    ///
    /// The new receiver will receive messages sent on this `Sender`.
1247    ///
1248    /// ```
1249    /// use tokio::sync::watch;
1250    ///
1251    /// #[tokio::main]
1252    /// async fn main() {
1253    ///     let (tx, _rx) = watch::channel(0u64);
1254    ///
1255    ///     tx.send(5).unwrap();
1256    ///
1257    ///     let rx = tx.subscribe();
1258    ///     assert_eq!(5, *rx.borrow());
1259    ///
1260    ///     tx.send(10).unwrap();
1261    ///     assert_eq!(10, *rx.borrow());
1262    /// }
1263    /// ```
1264    ///
    /// The most recent message is considered seen by the new receiver, so this
    /// test is guaranteed to pass.
1267    ///
1268    /// ```
1269    /// use tokio::sync::watch;
1270    /// use tokio::time::Duration;
1271    ///
1272    /// #[tokio::main]
1273    /// async fn main() {
1274    ///     let (tx, _rx) = watch::channel(0u64);
1275    ///     tx.send(5).unwrap();
1276    ///     let mut rx = tx.subscribe();
1277    ///
1278    ///     tokio::spawn(async move {
1279    ///         // by spawning and sleeping, the message is sent after `main`
1280    ///         // hits the call to `changed`.
1281    ///         # if false {
1282    ///         tokio::time::sleep(Duration::from_millis(10)).await;
1283    ///         # }
1284    ///         tx.send(100).unwrap();
1285    ///     });
1286    ///
1287    ///     rx.changed().await.unwrap();
1288    ///     assert_eq!(100, *rx.borrow());
1289    /// }
1290    /// ```
1291    pub fn subscribe(&self) -> Receiver<T> {
1292        let shared = self.shared.clone();
1293        let version = shared.state.load().version();
1294
1295        // The CLOSED bit in the state tracks only whether the sender is
1296        // dropped, so we do not need to unset it if this reopens the channel.
1297        Receiver::from_shared(version, shared)
1298    }
1299
1300    /// Returns the number of receivers that currently exist.
1301    ///
1302    /// # Examples
1303    ///
1304    /// ```
1305    /// use tokio::sync::watch;
1306    ///
1307    /// #[tokio::main]
1308    /// async fn main() {
1309    ///     let (tx, rx1) = watch::channel("hello");
1310    ///
1311    ///     assert_eq!(1, tx.receiver_count());
1312    ///
1313    ///     let mut _rx2 = rx1.clone();
1314    ///
1315    ///     assert_eq!(2, tx.receiver_count());
1316    /// }
1317    /// ```
1318    pub fn receiver_count(&self) -> usize {
1319        self.shared.ref_count_rx.load(Relaxed)
1320    }
1321}
1322
1323impl<T> Drop for Sender<T> {
1324    fn drop(&mut self) {
1325        if self.shared.ref_count_tx.fetch_sub(1, AcqRel) == 1 {
1326            self.shared.state.set_closed();
1327            self.shared.notify_rx.notify_waiters();
1328        }
1329    }
1330}
1331
1332// ===== impl Ref =====
1333
1334impl<T> ops::Deref for Ref<'_, T> {
1335    type Target = T;
1336
1337    fn deref(&self) -> &T {
1338        self.inner.deref()
1339    }
1340}
1341
// Loom model tests: each `loom::model` closure is run under loom's scheduler,
// with senders and receivers moved across threads.
#[cfg(all(test, loom))]
mod tests {
    use futures::future::FutureExt;
    use loom::thread;

    // test for https://github.com/tokio-rs/tokio/issues/3168
    //
    // No assertions are made; the test passes if every explored run completes
    // without a panic.
    #[test]
    fn watch_spurious_wakeup() {
        loom::model(|| {
            let (send, mut recv) = crate::sync::watch::channel(0i32);

            send.send(1).unwrap();

            // Send concurrently with the poll of `changed` below; the sender
            // is handed back through the join handle.
            let send_thread = thread::spawn(move || {
                send.send(2).unwrap();
                send
            });

            // `now_or_never` makes a single non-blocking attempt at `changed`.
            recv.changed().now_or_never();

            let send = send_thread.join().unwrap();
            // Now the receiver polls on another thread while this thread sends.
            let recv_thread = thread::spawn(move || {
                recv.changed().now_or_never();
                recv.changed().now_or_never();
                recv
            });

            send.send(3).unwrap();

            let mut recv = recv_thread.join().unwrap();
            let send_thread = thread::spawn(move || {
                send.send(2).unwrap();
            });

            recv.changed().now_or_never();

            send_thread.join().unwrap();
        });
    }

    // Checks that `borrow` on both halves yields the most recently sent value
    // while sends and `changed` polls happen on different threads.
    #[test]
    fn watch_borrow() {
        loom::model(|| {
            let (send, mut recv) = crate::sync::watch::channel(0i32);

            // Both halves start out seeing the initial value.
            assert!(send.borrow().eq(&0));
            assert!(recv.borrow().eq(&0));

            send.send(1).unwrap();
            assert!(send.borrow().eq(&1));

            let send_thread = thread::spawn(move || {
                send.send(2).unwrap();
                send
            });

            recv.changed().now_or_never();

            let send = send_thread.join().unwrap();
            let recv_thread = thread::spawn(move || {
                recv.changed().now_or_never();
                recv.changed().now_or_never();
                recv
            });

            send.send(3).unwrap();

            // Both threads are joined here, so `3` is the last value sent.
            let recv = recv_thread.join().unwrap();
            assert!(recv.borrow().eq(&3));
            assert!(send.borrow().eq(&3));

            send.send(2).unwrap();

            // The receiver is read from a thread spawned after the send; this
            // thread is not joined.
            thread::spawn(move || {
                assert!(recv.borrow().eq(&2));
            });
            assert!(send.borrow().eq(&2));
        });
    }
}