async_utils/hanging_get/
server.rs

1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::hanging_get::error::HangingGetServerError;
6use core::hash::Hash;
7use fuchsia_sync::Mutex;
8use std::collections::HashMap;
9use std::sync::Arc;
10
11/// A broker of updates to some state, using the hanging get pattern.
12///
13/// The broker stores a state, and mediates state changes between
14/// [Publisher]s and [Subscriber]s.  Any [Publisher] can change the state at any
15/// time.  When the state is changed, all [Subscriber]s are notified with the new
16/// state.
17///
18/// To follow the hanging get pattern, when a [Subscriber] is first created,
19/// the current state is sent immediately.  Subsequent updates to the same
20/// subscriber are only sent when the state is modified.
21///
22/// * Use [HangingGet::new] to create a new broker for a single state item.
23/// * Use [HangingGet::new_publisher] to create an object that allows you to
24///   update the state.
25/// * Use [HangingGet::new_subscriber] to create an object that monitors for
26///   updates to the state.
27///
28/// ## Type parameters
29///
30/// * `S`: the type of the stored hanging get state.  This is the state that gets
31///   communicated to subscribers.
32/// * `O`: The type of the object used to notify of state change.
33/// * `F`: The type of a function used to send the new state content to an instance
34///   of `O`.  `F` gets passed the content of the new state, the object that it
35///   needs to notify, and is expected to return `true` if the notification was
36///   a success; otherwise it must return `false`.
37pub struct HangingGet<S, O, F: Fn(&S, O) -> bool> {
38    inner: Arc<Mutex<HangingGetInner<S, subscriber_key::Key, O, F>>>,
39    /// A [subscriber_key::Key] held by the broker to track the next unique key that the broker can
40    /// assign to a [Subscriber].
41    subscriber_key_generator: subscriber_key::Generator,
42}
43
44impl<S, O, F> HangingGet<S, O, F>
45where
46    F: Fn(&S, O) -> bool,
47{
48    /// Create a new broker.
49    ///
50    /// ## Args:
51    ///
52    /// * `state` is the initial state of the [HangingGet].
53    /// * `notify` is a function to notify observers of state of the state change.
54    pub fn new(state: S, notify: F) -> Self {
55        Self {
56            inner: Arc::new(Mutex::new(HangingGetInner::new(Some(state), notify))),
57            subscriber_key_generator: subscriber_key::Generator::default(),
58        }
59    }
60
61    /// Create a new broker, but delays any subscribers from being notified until the value is
62    /// initialized.
63    ///
64    /// ## Args:
65    ///
66    /// * `notify` is a function to notify observers of state of the state change.
67    ///
68    /// # Disclaimer:
69    /// This initialier is more prone to cause hangs if code is not properly setup. This
70    /// should only be used for patterns where the is no useful default state.
71    pub fn new_unknown_state(notify: F) -> Self {
72        Self {
73            inner: Arc::new(Mutex::new(HangingGetInner::new(None, notify))),
74            subscriber_key_generator: subscriber_key::Generator::default(),
75        }
76    }
77
78    /// Create a new [Publisher] that can make atomic updates to the state value.
79    pub fn new_publisher(&self) -> Publisher<S, O, F> {
80        Publisher { inner: self.inner.clone() }
81    }
82
83    /// Create a new [Subscriber] that represents a single hanging get client.
84    ///
85    /// The newly-created subscriber will be notified with the current state
86    /// immediately.  After the first notification, the subscriber will be
87    /// notified only if the state changes.
88    pub fn new_subscriber(&mut self) -> Subscriber<S, O, F> {
89        Subscriber { inner: self.inner.clone(), key: self.subscriber_key_generator.next().unwrap() }
90    }
91}
92
93/// A `Subscriber` can be used to register observation requests with the `HangingGet`.
94/// These will be notified when the state changes or immediately the first time
95/// a `Subscriber` registers an observation.
96///
97/// ## Type parameters
98///
99/// See [HangingGet] for the explanation of the type parameters `S`, `O`, `F`.
100pub struct Subscriber<S, O, F: Fn(&S, O) -> bool> {
101    inner: Arc<Mutex<HangingGetInner<S, subscriber_key::Key, O, F>>>,
102    key: subscriber_key::Key,
103}
104
105impl<S, O, F> Subscriber<S, O, F>
106where
107    F: Fn(&S, O) -> bool,
108{
109    /// Register a new observation.
110    ///
111    /// Errors occur when:
112    /// * A Subscriber attempts to register an observation when there is already an outstanding
113    ///   observation waiting on updates.
114    ///
115    /// Returns `observation` on error.
116    pub fn register(&self, observation: O) -> Result<(), HangingGetServerError> {
117        self.register2(observation).map_err(|_| HangingGetServerError::MultipleObservers)
118    }
119    /// Register a new observation.
120    ///
121    /// Errors occur when:
122    /// * A Subscriber attempts to register an observation when there is already an outstanding
123    ///   observation waiting on updates.
124    ///
125    /// Returns `observation` on error.
126    pub fn register2(&self, observation: O) -> Result<(), O> {
127        self.inner.lock().subscribe(self.key, observation)
128    }
129}
130
131/// A `Publisher` is used to make changes to the state contained within a `HangingGet`.
132/// It is designed to be cheaply cloned and `Send`.
133///
134/// ## Type parameters
135///
136/// See [HangingGet] for the explanation of the generic types `S`, `O`, `F`.
137pub struct Publisher<S, O, F: Fn(&S, O) -> bool> {
138    inner: Arc<Mutex<HangingGetInner<S, subscriber_key::Key, O, F>>>,
139}
140
141impl<S, O, F: Fn(&S, O) -> bool> Clone for Publisher<S, O, F> {
142    /// Clones this [Publisher].
143    ///
144    /// It is cheap to clone a [Publisher].
145    fn clone(&self) -> Self {
146        Self { inner: self.inner.clone() }
147    }
148}
149
150impl<S, O, F> Publisher<S, O, F>
151where
152    F: Fn(&S, O) -> bool,
153{
154    /// Set the stored state to `S`. Subscribers will be updated.
155    pub fn set(&self, state: S) {
156        self.inner.lock().set(state)
157    }
158
159    /// Pass a function to the hanging get that can update the hanging get state in place.
160    ///
161    /// Any subscriber that has registered an observer will immediately be notified of the
162    /// update.
163    ///
164    /// ## Type parameters
165    ///
166    /// * `UpdateFn`: an update function: gets passed the new state, and returns `true`
167    ///   if the state has been updated with success.
168    pub fn update<UpdateFn>(&self, update: UpdateFn)
169    where
170        UpdateFn: FnOnce(&mut Option<S>) -> bool,
171    {
172        self.inner.lock().update(update)
173    }
174}
175
176/// A [HangingGetInner] object manages some internal state `S` and notifies observers of type `O`
177/// when their view of the state is outdated.
178///
179/// While it _can_ be used directly, most API consumers will want to use the higher level
180/// [HangingGet] object. `HangingGet` and its companion types provide `Send`
181/// for use from multiple threads or async tasks.
182///
183/// ## Type parameters
184///
185/// * `K` - the Key by which Observers are identified.
186/// * For other type args see [HangingGet].
187pub struct HangingGetInner<S, K, O, F: Fn(&S, O) -> bool> {
188    state: Option<S>,
189    notify: F,
190    observers: HashMap<K, Window<O>>,
191}
192
193impl<S, K, O, F> HangingGetInner<S, K, O, F>
194where
195    K: Eq + Hash,
196    F: Fn(&S, O) -> bool,
197{
198    fn notify_all(&mut self) {
199        for window in self.observers.values_mut() {
200            window.notify(&self.notify, self.state.as_ref().unwrap());
201        }
202    }
203
204    /// Create a new `HangingGetInner`.
205    /// `state` is the initial state of the HangingGetInner
206    /// `notify` is a `Fn` that is used to notify observers of state.
207    pub fn new(state: Option<S>, notify: F) -> Self {
208        Self { state, notify, observers: HashMap::new() }
209    }
210
211    /// Set the internal state value to `state` and notify all observers.
212    /// Note that notification is not conditional on the _value_ set by calling the `set` function.
213    /// Notification will occur regardless of whether the `state` value differs from the value
214    /// currently stored by `HangingGetInner`.
215    pub fn set(&mut self, state: S) {
216        self.state = Some(state);
217        self.notify_all();
218    }
219
220    /// Modify the internal state in-place using the `state_update` function. Notify all
221    /// observers if `state_update` returns true.
222    pub fn update(&mut self, state_update: impl FnOnce(&mut Option<S>) -> bool) {
223        if state_update(&mut self.state) {
224            self.notify_all();
225        }
226    }
227
228    /// Register an observer as a subscriber of the state.
229    ///
230    /// Observers are grouped by key and all observers will the same key are assumed to have
231    /// received the latest state update. If an observer with a previously unseen key subscribes,
232    /// it is immediately notified to the stated. If an observer with a known key subscribes, it
233    /// will only be notified when the state is updated since last sent to an observer with the same
234    /// key. All unresolved observers will be resolved to the same value immediately after the state
235    /// is updated. If there is no stored state, then the notification will be delayed until an
236    /// update is made.
237    ///
238    /// Returns `observer` on error.
239    pub fn subscribe(&mut self, key: K, observer: O) -> Result<(), O> {
240        let entry = self.observers.entry(key).or_insert_with(Window::new);
241        entry.observe(observer, &self.notify, self.state.as_ref())
242    }
243
244    /// Deregister all observers that subscribed with `key`. If an observer is subsequently
245    /// subscribed with the same `key` value, it will be treated as a previously unseen `key`.
246    pub fn unsubscribe(&mut self, key: K) {
247        drop(self.observers.remove(&key));
248    }
249}
250
251/// Window tracks all observers for a given `key` and whether their view of the state is
252/// `dirty` or not.
253struct Window<O> {
254    dirty: bool,
255    observer: Option<O>,
256}
257
258impl<O> Window<O> {
259    /// Create a new `Window` without an `observer` and an initial `dirty` value of `true`.
260    pub fn new() -> Self {
261        Window { dirty: true, observer: None }
262    }
263
264    /// Register a new observer. The observer will be notified immediately if the `Window`
265    /// has a dirty view of the state. The observer will be stored for future notification
266    /// if the `Window` does not have a dirty view.
267    ///
268    /// Returns `Err(observer)` if the window already has an observer.
269    pub fn observe<S>(
270        &mut self,
271        observer: O,
272        f: impl Fn(&S, O) -> bool,
273        current_state: Option<&S>,
274    ) -> Result<(), O> {
275        if self.observer.is_some() {
276            return Err(observer);
277        }
278        self.observer = Some(observer);
279        if let Some(current_state) = current_state {
280            if self.dirty {
281                self.notify(f, current_state);
282            }
283        }
284        Ok(())
285    }
286
287    /// Notify the observer _if and only if_ the `Window` has a dirty view of `state`.
288    /// If an observer was present and notified, the `Window` no longer has a dirty view
289    /// after this method returns.
290    pub fn notify<S>(&mut self, f: impl Fn(&S, O) -> bool, state: &S) {
291        match self.observer.take() {
292            Some(observer) => {
293                if f(state, observer) {
294                    self.dirty = false;
295                }
296            }
297            None => self.dirty = true,
298        }
299    }
300}
301
302/// Submodule used to keep the internals of `Key`s and `Generator`s inaccessable.
303mod subscriber_key {
304    /// Manages the creation and distribution of unique `Key`s
305    pub struct Generator {
306        next: Key,
307    }
308
309    impl Default for Generator {
310        fn default() -> Self {
311            Self { next: Key(0) }
312        }
313    }
314
315    impl Generator {
316        /// Get a unique key.
317        /// Returns `None` if 2^64-2 keys have already been generated.
318        pub fn next(&mut self) -> Option<Key> {
319            let key = self.next.clone();
320            if let Some(next) = self.next.0.checked_add(1) {
321                self.next.0 = next;
322                Some(key)
323            } else {
324                None
325            }
326        }
327    }
328
329    /// An internal per-subscriber key that is intended to be unique.
330    #[derive(PartialEq, Eq, Hash, Debug, Clone, Copy)]
331    pub struct Key(u64);
332}
333
334#[cfg(test)]
335mod tests {
336    use super::*;
337    use crate::hanging_get::test_util::TestObserver;
338    use crate::PollExt;
339    use fuchsia_async as fasync;
340    use futures::channel::oneshot;
341
342    #[test]
343    fn subscriber_key_generator_creates_unique_keys() {
344        let mut gen = subscriber_key::Generator::default();
345        let key1 = gen.next();
346        let key2 = gen.next();
347        assert!(key1 != key2);
348    }
349
350    #[test]
351    fn window_add_first_observer_notifies() {
352        let state = 0;
353        let mut window = Window::new();
354        window
355            .observe(TestObserver::expect_value(state), TestObserver::observe, Some(&state))
356            .unwrap();
357    }
358
359    #[test]
360    fn window_none_state_does_not_notify() {
361        let mut window = Window::new();
362        window
363            .observe::<i32>(TestObserver::expect_no_value(), TestObserver::observe, None)
364            .unwrap();
365    }
366
367    #[test]
368    fn window_add_second_observer_does_not_notify() {
369        let state = 0;
370        let mut window = Window::new();
371        window
372            .observe(TestObserver::expect_value(state), TestObserver::observe, Some(&state))
373            .unwrap();
374
375        // Second observer added without updating the value
376        window
377            .observe(TestObserver::expect_no_value(), TestObserver::observe, Some(&state))
378            .unwrap();
379    }
380
381    #[test]
382    fn window_add_second_observer_notifies_after_notify_call() {
383        let mut state = 0;
384        let mut window = Window::new();
385        window
386            .observe(TestObserver::expect_value(state), TestObserver::observe, Some(&state))
387            .unwrap();
388
389        state = 1;
390        window.notify(TestObserver::observe, &state);
391
392        // Second observer added without updating the value
393        window
394            .observe(TestObserver::expect_value(state), TestObserver::observe, Some(&state))
395            .unwrap();
396    }
397
398    #[test]
399    fn window_add_multiple_observers_are_notified() {
400        let mut state = 0;
401        let mut window = Window::new();
402        window
403            .observe(TestObserver::expect_value(state), TestObserver::observe, Some(&state))
404            .unwrap();
405
406        // Second observer added without updating the value
407        let o1 = TestObserver::expect_value(1);
408        let o2 = TestObserver::expect_no_value();
409        window.observe(o1.clone(), TestObserver::observe, Some(&state)).unwrap();
410        let result = window.observe(o2.clone(), TestObserver::observe, Some(&state));
411        assert_eq!(result, Err(o2));
412        assert!(!o1.has_value());
413        state = 1;
414        window.notify(TestObserver::observe, &state);
415    }
416
417    #[test]
418    fn window_dirty_flag_state() {
419        let state = 0;
420        let mut window = Window::new();
421        let o = TestObserver::expect_value(state);
422        window.observe(o, TestObserver::observe, Some(&state)).unwrap();
423        assert!(window.observer.is_none());
424        assert!(!window.dirty);
425        window.notify(TestObserver::observe, &state);
426        assert!(window.dirty);
427        let o = TestObserver::expect_value(state);
428        window.observe(o, TestObserver::observe, Some(&state)).unwrap();
429        assert!(!window.dirty);
430    }
431
432    #[test]
433    fn window_dirty_flag_respects_consumed_flag() {
434        let state = 0;
435        let mut window = Window::new();
436
437        let o = TestObserver::expect_value(state);
438        window.observe(o, TestObserver::observe_incomplete, Some(&state)).unwrap();
439        assert!(window.dirty);
440    }
441
442    #[test]
443    fn hanging_get_inner_subscribe() {
444        let mut hanging = HangingGetInner::new(Some(0), TestObserver::observe);
445        let o = TestObserver::expect_value(0);
446        assert!(!o.has_value());
447        hanging.subscribe(0, o.clone()).unwrap();
448    }
449
450    #[test]
451    fn hanging_get_inner_subscribe_then_set() {
452        let mut hanging = HangingGetInner::new(Some(0), TestObserver::observe);
453        let o = TestObserver::expect_value(0);
454        hanging.subscribe(0, o.clone()).unwrap();
455
456        // No change without a new subscription
457        hanging.set(1);
458    }
459
460    #[test]
461    fn hanging_get_inner_subscribe_twice_then_set() {
462        let mut hanging = HangingGetInner::new(Some(0), TestObserver::observe);
463        hanging.subscribe(0, TestObserver::expect_value(0)).unwrap();
464
465        hanging.subscribe(0, TestObserver::expect_value(1)).unwrap();
466        hanging.set(1);
467    }
468
469    #[test]
470    fn hanging_get_inner_subscribe_multiple_then_set() {
471        let mut hanging = HangingGetInner::new(Some(0), TestObserver::observe);
472        hanging.subscribe(0, TestObserver::expect_value(0)).unwrap();
473
474        // A second subscription with the same client key should not be notified
475        let o2 = TestObserver::expect_value(1);
476        hanging.subscribe(0, o2.clone()).unwrap();
477        assert!(!o2.has_value());
478
479        // A third subscription will queue up along the other waiting observer
480        let _ = hanging.subscribe(0, TestObserver::expect_no_value()).unwrap_err();
481
482        // Set should notify all observers to the change
483        hanging.set(1);
484    }
485
486    #[test]
487    fn hanging_get_inner_subscribe_with_two_clients_then_set() {
488        let mut hanging = HangingGetInner::new(Some(0), TestObserver::observe);
489        hanging.subscribe(0, TestObserver::expect_value(0)).unwrap();
490        hanging.subscribe(0, TestObserver::expect_value(1)).unwrap();
491        hanging.subscribe(1, TestObserver::expect_value(0)).unwrap();
492        hanging.subscribe(1, TestObserver::expect_value(1)).unwrap();
493        hanging.set(1);
494    }
495
496    #[test]
497    fn hanging_get_inner_unsubscribe() {
498        let mut hanging = HangingGetInner::new(Some(0), TestObserver::observe);
499        hanging.subscribe(0, TestObserver::expect_value(0)).unwrap();
500        hanging.subscribe(0, TestObserver::expect_no_value()).unwrap();
501        hanging.unsubscribe(0);
502        hanging.set(1);
503    }
504
505    #[test]
506    fn hanging_get_inner_unsubscribe_one_of_many() {
507        let mut hanging = HangingGetInner::new(Some(0), TestObserver::observe);
508
509        hanging.subscribe(0, TestObserver::expect_value(0)).unwrap();
510        hanging.subscribe(0, TestObserver::expect_no_value()).unwrap();
511        hanging.subscribe(1, TestObserver::expect_value(0)).unwrap();
512        hanging.subscribe(1, TestObserver::expect_no_value()).unwrap();
513
514        // Unsubscribe one of the two observers
515        hanging.unsubscribe(0);
516        assert!(!hanging.observers.contains_key(&0));
517        assert!(hanging.observers.contains_key(&1));
518    }
519
520    #[test]
521    fn hanging_get_inner_delayed_subscribe() {
522        let mut hanging = HangingGetInner::new(None, TestObserver::<u8>::observe);
523        let o = TestObserver::expect_no_value();
524        assert!(!o.has_value());
525        hanging.subscribe(0, o.clone()).unwrap();
526    }
527
528    #[test]
529    fn hanging_get_inner_delayed_subscribe_then_set() {
530        let mut hanging = HangingGetInner::new(None, TestObserver::observe);
531        let o = TestObserver::expect_value(1);
532        hanging.subscribe(0, o.clone()).unwrap();
533
534        // Initial value now set.
535        hanging.set(1);
536    }
537
538    #[test]
539    fn hanging_get_inner_delayed_subscribe_twice_then_set() {
540        let mut hanging = HangingGetInner::new(None, TestObserver::observe);
541        hanging.subscribe(0, TestObserver::expect_value(1)).unwrap();
542
543        // Since the first result is delayed, subscribing again is an error.
544        let o = TestObserver::expect_no_value();
545        let result = hanging.subscribe(0, o.clone());
546        assert_eq!(result, Err(o));
547        hanging.set(1);
548    }
549
550    #[test]
551    fn hanging_get_inner_delayed_subscribe_multiple_then_set() {
552        let mut hanging = HangingGetInner::new(None, TestObserver::observe);
553        hanging.subscribe(0, TestObserver::expect_value(1)).unwrap();
554
555        // A second subscription with the same client key should fail while the subscription hasn't
556        // been notified
557        let o2 = TestObserver::expect_no_value();
558        let result = hanging.subscribe(0, o2.clone());
559        assert_eq!(result, Err(o2.clone()));
560        assert!(!o2.has_value());
561
562        // A third subscription will also fail for the same reason.
563        let o3 = TestObserver::expect_no_value();
564        let result = hanging.subscribe(0, o3.clone());
565        assert_eq!(result, Err(o3));
566
567        // Set should notify all observers to the change
568        hanging.set(1);
569    }
570
571    #[test]
572    fn hanging_get_inner_delayed_subscribe_with_two_clients_then_set() {
573        let mut hanging = HangingGetInner::new(None, TestObserver::observe);
574        hanging.subscribe(0, TestObserver::expect_value(1)).unwrap();
575        let o = TestObserver::expect_no_value();
576        let result = hanging.subscribe(0, o.clone());
577        assert_eq!(result, Err(o));
578        hanging.subscribe(1, TestObserver::expect_value(1)).unwrap();
579        let o2 = TestObserver::expect_no_value();
580        let result = hanging.subscribe(1, o2.clone());
581        assert_eq!(result, Err(o2));
582        hanging.set(1);
583    }
584
585    #[test]
586    fn hanging_get_inner_delayed_unsubscribe() {
587        let mut hanging = HangingGetInner::new(None, TestObserver::observe);
588        hanging.subscribe(0, TestObserver::expect_no_value()).unwrap();
589        let o = TestObserver::expect_no_value();
590        let result = hanging.subscribe(0, o.clone());
591        assert_eq!(result, Err(o));
592        hanging.unsubscribe(0);
593        hanging.set(1);
594    }
595
596    #[test]
597    fn hanging_get_inner_delayed_unsubscribe_one_of_many() {
598        let mut hanging = HangingGetInner::new(None, TestObserver::observe);
599
600        hanging.subscribe(0, TestObserver::<i32>::expect_no_value()).unwrap();
601        let o = TestObserver::expect_no_value();
602        let result = hanging.subscribe(0, o.clone());
603        assert_eq!(result, Err(o));
604        hanging.subscribe(1, TestObserver::expect_no_value()).unwrap();
605        let o2 = TestObserver::expect_no_value();
606        let result = hanging.subscribe(1, o2.clone());
607        assert_eq!(result, Err(o2));
608
609        // Unsubscribe one of the two observers
610        hanging.unsubscribe(0);
611        assert!(!hanging.observers.contains_key(&0));
612        assert!(hanging.observers.contains_key(&1));
613    }
614
615    #[test]
616    fn sync_pub_sub_updates_and_observes() {
617        let mut ex = fasync::TestExecutor::new();
618        let mut broker = HangingGet::new(0i32, |s, o: oneshot::Sender<_>| {
619            o.send(s.clone()).map(|()| true).unwrap()
620        });
621        let publisher = broker.new_publisher();
622        let subscriber = broker.new_subscriber();
623
624        // Initial observation is immediate
625        let (sender, mut receiver) = oneshot::channel();
626        subscriber.register(sender).unwrap();
627        let observation =
628            ex.run_until_stalled(&mut receiver).expect("received initial observation");
629        assert_eq!(observation, Ok(0));
630
631        // Subsequent observations do not happen until after an update
632        let (sender, mut receiver) = oneshot::channel();
633        subscriber.register(sender).unwrap();
634        assert!(ex.run_until_stalled(&mut receiver).is_pending());
635
636        publisher.set(1);
637
638        let observation =
639            ex.run_until_stalled(&mut receiver).expect("received subsequent observation");
640        assert_eq!(observation, Ok(1));
641    }
642
643    #[test]
644    fn sync_pub_sub_multiple_subscribers() {
645        let mut ex = fasync::TestExecutor::new();
646        let mut broker = HangingGet::new(0i32, |s, o: oneshot::Sender<_>| {
647            o.send(s.clone()).map(|()| true).unwrap()
648        });
649        let publisher = broker.new_publisher();
650
651        let sub1 = broker.new_subscriber();
652        let sub2 = broker.new_subscriber();
653
654        // Initial observation for subscribers is immediate
655        let (sender, mut receiver) = oneshot::channel();
656        sub1.register(sender).unwrap();
657        let observation =
658            ex.run_until_stalled(&mut receiver).expect("received initial observation");
659        assert_eq!(observation, Ok(0));
660
661        let (sender, mut receiver) = oneshot::channel();
662        sub2.register(sender).unwrap();
663        let observation =
664            ex.run_until_stalled(&mut receiver).expect("received initial observation");
665        assert_eq!(observation, Ok(0));
666
667        // Subsequent observations do not happen until after an update
668        let (sender, mut recv1) = oneshot::channel();
669        sub1.register(sender).unwrap();
670        assert!(ex.run_until_stalled(&mut recv1).is_pending());
671
672        let (sender, mut recv2) = oneshot::channel();
673        sub2.register(sender).unwrap();
674        assert!(ex.run_until_stalled(&mut recv2).is_pending());
675
676        publisher.set(1);
677        let obs1 =
678            ex.run_until_stalled(&mut recv1).expect("receiver 1 received subsequent observation");
679        assert_eq!(obs1, Ok(1));
680        let obs2 =
681            ex.run_until_stalled(&mut recv2).expect("receiver 2 received subsequent observation");
682        assert_eq!(obs2, Ok(1));
683    }
684
685    #[test]
686    fn sync_pub_sub_delayed_updates_and_observes() {
687        let mut ex = fasync::TestExecutor::new();
688        let mut broker = HangingGet::<i32, _, _>::new_unknown_state(|s, o: oneshot::Sender<_>| {
689            o.send(s.clone()).map(|()| true).unwrap()
690        });
691        let publisher = broker.new_publisher();
692        let subscriber = broker.new_subscriber();
693
694        // Initial observation is delayed due to lack of initial state.
695        let (sender, mut recv1) = oneshot::channel();
696        subscriber.register(sender).unwrap();
697        assert!(ex.run_until_stalled(&mut recv1).is_pending());
698
699        // Subsequent registration fails since the original registration is still pending its
700        // observation.
701        let (sender, mut receiver) = oneshot::channel();
702        assert!(subscriber.register(sender).is_err());
703        assert!(ex.run_until_stalled(&mut receiver).expect("sender closed").is_err());
704
705        // Initial observation received now that the initial value is set.
706        publisher.set(1);
707
708        let observation =
709            ex.run_until_stalled(&mut recv1).expect("received subsequent observation");
710        assert_eq!(observation, Ok(1));
711    }
712
713    #[test]
714    fn sync_pub_sub_delayed_multiple_subscribers() {
715        let mut ex = fasync::TestExecutor::new();
716        let mut broker = HangingGet::<i32, _, _>::new_unknown_state(|s, o: oneshot::Sender<_>| {
717            o.send(s.clone()).map(|()| true).unwrap()
718        });
719        let publisher = broker.new_publisher();
720
721        let sub1 = broker.new_subscriber();
722        let sub2 = broker.new_subscriber();
723
724        // Initial observation for subscribers delayed due to lack of initial state.
725        let (sender, mut recv1) = oneshot::channel();
726        sub1.register(sender).unwrap();
727        assert!(ex.run_until_stalled(&mut recv1).is_pending());
728
729        let (sender, mut recv2) = oneshot::channel();
730        sub2.register(sender).unwrap();
731        assert!(ex.run_until_stalled(&mut recv2).is_pending());
732
733        // Subsequent registrations fails since the original registrations are still pending their
734        // observations.
735        let (sender, mut recv3) = oneshot::channel();
736        assert!(sub1.register(sender).is_err());
737        assert!(ex.run_until_stalled(&mut recv3).expect("sender 3 closed").is_err());
738
739        let (sender, mut recv4) = oneshot::channel();
740        assert!(sub2.register(sender).is_err());
741        assert!(ex.run_until_stalled(&mut recv4).expect("sender 4 closed").is_err());
742
743        // Initial observations received now that initial value is set.
744        publisher.set(1);
745        let obs1 =
746            ex.run_until_stalled(&mut recv1).expect("receiver 1 received subsequent observation");
747        assert_eq!(obs1, Ok(1));
748        let obs2 =
749            ex.run_until_stalled(&mut recv2).expect("receiver 2 received subsequent observation");
750        assert_eq!(obs2, Ok(1));
751    }
752}