async_helpers/hanging_get/
async_server.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::responding_channel as responding;
6use async_utils::hanging_get::error::HangingGetServerError;
7use async_utils::stream::{StreamItem, WithEpitaph};
8use core::hash::Hash;
9use futures::channel::mpsc;
10use futures::{select, SinkExt, StreamExt};
11use std::collections::HashMap;
12
13/// Default value that can be passed to `HangingGetBroker::new` by clients.
14/// If passed in, this will be used for all MPSC channels created by the broker.
15pub const DEFAULT_CHANNEL_SIZE: usize = 128;
16
17/// A `Send` wrapper for a `HangingGet` that can receive messages via an async channel.
18/// The `HangingGetBroker` is the primary way of implementing server side hanging get using
19/// this library. It manages all state and reacts to inputs sent over channels.
20///
21/// ### Example Usage:
22///
23/// Assuming some fidl protocol with a hanging get method:
24///
25/// ```fidl
26/// protocol SheepCounter {
27///     /// Returns the current number of sheep that have jumped the fence
28///     /// when that number changes.
29///     WatchCount() -> (uint64 count);
30/// }
31/// ```
32///
33/// A server implementation might include the following:
34///
35/// ```rust
36/// let broker = HangingGetBroker::new(
37///     0u64, // Initial state
38///     |s, o: SheepCounterWatchCountResponder| {
39///         o.send(s.clone()).unwrap();
40///         true
41///     }, // notify function with fidl auto-generated responder
42///     DEFAULT_CHANNEL_SIZE, // Size of channels used by Publishers and Subscribers
43/// );
44///
45/// // Create a new publisher that can be used to publish updates to the state
46/// let mut publisher = broker.new_publisher();
47/// // Create a new registrar that can be used to register subscribers
48/// let mut registrar = broker.new_registrar();
49///
50/// // Spawn broker as an async task that will run until there are not any more
51/// // `SubscriptionRegistrar`, `Publisher`, or `Subscriber` objects that can update the system.
52/// fuchsia_async::Task::spawn(broker.run()).detach();
53///
54/// // Spawn a background task to count sheep
55/// fuchsia_async::Task::spawn(async move {
56///     let interval = fuchsia_async::Interval::new(1.second);
57///     loop {
58///         interval.next.await();
59///         publisher.update(|sheep_count| *sheep_count += 1);
60///     }
61/// }).detach();
62///
63/// // Create a new `ServiceFs` and register SheepCounter fidl service
64/// let mut fs = ServiceFs::new();
65/// fs.dir("svc").add_fidl_service(|s: SheepCounterRequestStream| s);
66///
67/// // SubscriptionRegistrar new client connections sequentially
68/// while let Some(mut stream) = fs.next().await {
69///
70///     // Create a new subscriber associated with this client's request stream
71///     let mut subscriber = registrar.new_subscriber().await.unwrap();
72///
73///     // SubscriptionRegistrar requests from this client by registering new observers
74///     fuchsia_async::Task::spawn(async move {
75///         while let Some(Ok(SheepCounterWatchCountRequest { responder })) = stream.next().await {
76///             subscriber.register(responder).await.unwrap();
77///         }
78///     }).detach();
79/// }
80/// ```
81pub struct HangingGetBroker<S, O: Unpin + 'static, F: Fn(&S, O) -> bool> {
82    inner: HangingGet<S, subscriber_key::Key, O, F>,
83    publisher: Publisher<S>,
84    updates: mpsc::Receiver<UpdateFn<S>>,
85    registrar: SubscriptionRegistrar<O>,
86    subscription_requests: responding::Receiver<(), Subscriber<O>>,
87    /// A `subscriber_key::Key` held by the broker to track the next unique key that the broker can
88    /// assign to a `Subscriber`.
89    subscriber_key_generator: subscriber_key::Generator,
90    channel_size: usize,
91}
92
93impl<S, O, F> HangingGetBroker<S, O, F>
94where
95    S: Clone + Send,
96    O: Send + Unpin + 'static,
97    F: Fn(&S, O) -> bool,
98{
99    /// Create a new broker.
100    /// `state` is the initial state of the HangingGet
101    /// `notify` is a `Fn` that is used to notify observers of state.
102    /// `channel_size` is the maximum queue size of unprocessed messages from an individual object.
103    pub fn new(state: S, notify: F, channel_size: usize) -> Self {
104        let (sender, updates) = mpsc::channel(channel_size);
105        let publisher = Publisher { sender };
106        let (sender, subscription_requests) = responding::channel(channel_size);
107        let registrar = SubscriptionRegistrar { sender };
108        Self {
109            inner: HangingGet::new(state, notify),
110            publisher,
111            updates,
112            registrar,
113            subscription_requests,
114            subscriber_key_generator: subscriber_key::Generator::default(),
115            channel_size,
116        }
117    }
118
119    /// Create a new `Publisher` that can be used to communicate state updates
120    /// with this `HangingGetBroker` from another thread or async task.
121    pub fn new_publisher(&self) -> Publisher<S> {
122        self.publisher.clone()
123    }
124
125    /// Create a new `SubscriptionRegistrar` that can be used to register new subscribers
126    /// with this `HangingGetBroker` from another thread or async task.
127    pub fn new_registrar(&self) -> SubscriptionRegistrar<O> {
128        self.registrar.clone()
129    }
130
131    /// Consume `HangingGetBroker`, returning a Future object that can be polled to drive updates
132    /// to the HangingGet object. The Future completes when there are no remaining
133    /// `SubscriptionRegistrars` for this object.
134    pub async fn run(mut self) {
135        // Drop internally held references to incoming registrar.
136        // They are no longer externally reachable and will prevent the
137        // select! macro from completing if they are not dropped.
138        drop(self.publisher);
139        drop(self.registrar);
140
141        // A combined stream of all active subscribers which yields
142        // `observer` objects from those subscribers as they request
143        // observations.
144        let mut subscriptions = futures::stream::SelectAll::new();
145
146        loop {
147            select! {
148                // An update has been sent to the broker from a `Publisher`.
149                update = self.updates.next() => {
150                    if let Some(update) = update {
151                        self.inner.update(update)
152                    }
153                }
154                // A request for a new subscriber as been requested from a `SubscriptionRegistrar`.
155                subscriber = self.subscription_requests.next() => {
156                    if let Some((_, responder)) = subscriber {
157                        let (sender, receiver) = responding::channel(self.channel_size);
158                        let key = self.subscriber_key_generator.next().unwrap();
159                        if let Ok(()) = responder.respond(sender.into()) {
160                            subscriptions.push(receiver.map(move |o| (key, o)).with_epitaph(key));
161                        }
162                    }
163                }
164                // An observation request has been made by a `Subscriber`.
165                observer = subscriptions.next() => {
166                    match observer {
167                        Some(StreamItem::Item((key, (observer, responder)))) => {
168                            let _ = responder.respond(self.inner.subscribe(key, observer));
169                        },
170                        Some(StreamItem::Epitaph(key)) => {
171                            self.inner.unsubscribe(key);
172                        },
173                        None => (),
174                    }
175                }
176                // There are no live objects that can inject new inputs into the system.
177                complete => break,
178            }
179        }
180    }
181}
182
183/// A cheaply copyable handle that can be used to register new `Subscriber`s with
184/// the `HangingGetBroker`.
185pub struct SubscriptionRegistrar<O> {
186    sender: responding::Sender<(), Subscriber<O>>,
187}
188
189impl<O> Clone for SubscriptionRegistrar<O> {
190    fn clone(&self) -> Self {
191        Self { sender: self.sender.clone() }
192    }
193}
194
195impl<O> SubscriptionRegistrar<O> {
196    /// Register a new subscriber
197    pub async fn new_subscriber(&mut self) -> Result<Subscriber<O>, HangingGetServerError> {
198        Ok(self.sender.request(()).await?)
199    }
200}
201
202/// A `Subscriber` can be used to register observation requests with the `HangingGetBroker`.
203/// These observations will be fulfilled when the state changes or immediately the first time
204/// a `Subscriber` registers an observation.
205pub struct Subscriber<O> {
206    sender: responding::Sender<O, Result<(), HangingGetServerError>>,
207}
208
209impl<O> From<responding::Sender<O, Result<(), HangingGetServerError>>> for Subscriber<O> {
210    fn from(sender: responding::Sender<O, Result<(), HangingGetServerError>>) -> Self {
211        Self { sender }
212    }
213}
214
215impl<O> Subscriber<O> {
216    /// Register a new observation.
217    /// Errors occur when:
218    /// * A `Subscriber` is sending observation requests at too high a rate.
219    /// * The `HangingGetBroker` has been dropped by the server.
220    pub async fn register(&mut self, observation: O) -> Result<(), HangingGetServerError> {
221        self.sender.request(observation).await?
222    }
223}
224
225/// `FnOnce` type that can be used by library consumers to make in-place modifications to
226/// the hanging get state.
227type UpdateFn<S> = Box<dyn FnOnce(&mut S) -> bool + Send + 'static>;
228
229/// A `Publisher` is used to make changes to the state contained within a `HangingGetBroker`.
230/// It is designed to be cheaply cloned and `Send`.
231pub struct Publisher<S> {
232    sender: mpsc::Sender<UpdateFn<S>>,
233}
234
235impl<S> Clone for Publisher<S> {
236    fn clone(&self) -> Self {
237        Publisher { sender: self.sender.clone() }
238    }
239}
240
241impl<S> Publisher<S>
242where
243    S: Send + 'static,
244{
245    /// `set` is a specialized form of `update` that sets the hanging get state to the value
246    /// passed in as the `state` parameter.
247    pub async fn set(&mut self, state: S) -> Result<(), HangingGetServerError> {
248        Ok(self
249            .sender
250            .send(Box::new(move |s| {
251                *s = state;
252                true
253            }))
254            .await?)
255    }
256
257    /// Pass a function to the hanging get that can update the hanging get state in place.
258    /// `update` should return false if the state was not updated.
259    pub async fn update<F>(&mut self, update: F) -> Result<(), HangingGetServerError>
260    where
261        F: FnOnce(&mut S) -> bool + Send + 'static,
262    {
263        Ok(self.sender.send(Box::new(update)).await?)
264    }
265}
266
267/// *Deprecated*: New code should use [`async_utils::hanging_get::server::HangingGet`].
268///
269/// TODO(<https://fxbug.dev/42055741>): Remove this struct.
270///
271/// A `HangingGet` object manages some internal state `S` and notifies observers of type `O`
272/// when their view of the state is outdated.
273///
274/// `S` - the type of State to be watched
275/// `O` - the type of Observers of `S`
276/// `F` - the type of observer notification behavior, where `F: Fn(&S, O)`
277/// `K` - the Key by which Observers are identified
278///
279/// While it _can_ be used directly, most API consumers will want to use the higher level
280/// `HangingGetBroker` object. `HangingGetBroker` and its companion types provide `Send`
281/// for use from multiple threads or async tasks.
282pub struct HangingGet<S, K, O, F: Fn(&S, O) -> bool> {
283    state: S,
284    notify: F,
285    observers: HashMap<K, Window<O>>,
286}
287
288impl<S, K, O, F> HangingGet<S, K, O, F>
289where
290    K: Eq + Hash,
291    F: Fn(&S, O) -> bool,
292{
293    fn notify_all(&mut self) {
294        for window in self.observers.values_mut() {
295            window.notify(&self.notify, &self.state);
296        }
297    }
298
299    /// Create a new `HangingGet`.
300    /// `state` is the initial state of the HangingGet
301    /// `notify` is a `Fn` that is used to notify observers of state.
302    pub fn new(state: S, notify: F) -> Self {
303        Self { state, notify, observers: HashMap::new() }
304    }
305
306    /// Set the internal state value to `state` and notify all observers.
307    /// Note that notification is not conditional on the _value_ set by calling the `set` function.
308    /// Notification will occur regardless of whether the `state` value differs from the value
309    /// currently stored by `HangingGet`.
310    pub fn set(&mut self, state: S) {
311        self.state = state;
312        self.notify_all();
313    }
314
315    /// Modify the internal state in-place using the `state_update` function. Notify all
316    /// observers if `state_update` returns true.
317    pub fn update(&mut self, state_update: impl FnOnce(&mut S) -> bool) {
318        if state_update(&mut self.state) {
319            self.notify_all();
320        }
321    }
322
323    /// Register an observer as a subscriber of the state. Observers are grouped by key and
324    /// all observers will the same key are assumed to have received the latest state update.
325    /// If an observer with a previously unseen key subscribes, it is immediately notified
326    /// to the stated. If an observer with a known key subscribes, it will only be
327    /// notified when the state is updated since last sent to an observer with the same
328    /// key. All unresolved observers will be resolved to the same value immediately after the state
329    /// is updated.
330    pub fn subscribe(&mut self, key: K, observer: O) -> Result<(), HangingGetServerError> {
331        self.observers.entry(key).or_insert_with(Window::new).observe(
332            observer,
333            &self.notify,
334            &self.state,
335        )
336    }
337
338    /// Deregister all observers that subscribed with `key`. If an observer is subsequently
339    /// subscribed with the same `key` value, it will be treated as a previously unseen `key`.
340    pub fn unsubscribe(&mut self, key: K) {
341        drop(self.observers.remove(&key));
342    }
343}
344
345/// Window tracks all observers for a given `key` and whether their view of the state is
346/// `dirty` or not.
347struct Window<O> {
348    dirty: bool,
349    observer: Option<O>,
350}
351
352impl<O> Window<O> {
353    /// Create a new `Window` without an `observer` and an initial `dirty` value of `true`.
354    pub fn new() -> Self {
355        Window { dirty: true, observer: None }
356    }
357
358    /// Register a new observer. The observer will be notified immediately if the `Window`
359    /// has a dirty view of the state. The observer will be stored for future notification
360    /// if the `Window` does not have a dirty view.
361    pub fn observe<S>(
362        &mut self,
363        observer: O,
364        f: impl Fn(&S, O) -> bool,
365        current_state: &S,
366    ) -> Result<(), HangingGetServerError> {
367        if self.observer.is_some() {
368            return Err(HangingGetServerError::MultipleObservers);
369        }
370        self.observer = Some(observer);
371        if self.dirty {
372            self.notify(f, current_state);
373        }
374        Ok(())
375    }
376
377    /// Notify the observer _if and only if_ the `Window` has a dirty view of `state`.
378    /// If an observer was present and notified, the `Window` no longer has a dirty view
379    /// after this method returns.
380    pub fn notify<S>(&mut self, f: impl Fn(&S, O) -> bool, state: &S) {
381        match self.observer.take() {
382            Some(observer) => {
383                if f(state, observer) {
384                    self.dirty = false;
385                }
386            }
387            None => self.dirty = true,
388        }
389    }
390}
391
392/// Submodule used to keep the internals of `Key`s and `Generator`s inaccessable.
393mod subscriber_key {
394    /// Manages the creation and distribution of unique `Key`s
395    pub struct Generator {
396        next: Key,
397    }
398
399    impl Default for Generator {
400        fn default() -> Self {
401            Self { next: Key(0) }
402        }
403    }
404
405    impl Generator {
406        /// Get a unique key.
407        /// Returns `None` if 2^64-2 keys have already been generated.
408        pub fn next(&mut self) -> Option<Key> {
409            let key = self.next.clone();
410            if let Some(next) = self.next.0.checked_add(1) {
411                self.next.0 = next;
412                Some(key)
413            } else {
414                None
415            }
416        }
417    }
418
419    /// An internal per-subscriber key that is intended to be unique.
420    #[derive(PartialEq, Eq, Hash, Debug, Clone, Copy)]
421    pub struct Key(u64);
422}
423
424#[cfg(test)]
425mod tests {
426    use super::*;
427    use async_utils::hanging_get::test_util::TestObserver;
428    use fuchsia_async as fasync;
429    use futures::channel::oneshot;
430    use std::pin::pin;
431    use std::task::Poll;
432
433    const TEST_CHANNEL_SIZE: usize = 128;
434
435    #[test]
436    fn subscriber_key_generator_creates_unique_keys() {
437        let mut gen = subscriber_key::Generator::default();
438        let key1 = gen.next();
439        let key2 = gen.next();
440        assert!(key1 != key2);
441    }
442
443    #[test]
444    fn window_add_first_observer_notifies() {
445        let state = 0;
446        let mut window = Window::new();
447        window.observe(TestObserver::expect_value(state), TestObserver::observe, &state).unwrap();
448    }
449
450    #[test]
451    fn window_add_second_observer_does_not_notify() {
452        let state = 0;
453        let mut window = Window::new();
454        window.observe(TestObserver::expect_value(state), TestObserver::observe, &state).unwrap();
455
456        // Second observer added without updating the value
457        window.observe(TestObserver::expect_no_value(), TestObserver::observe, &state).unwrap();
458    }
459
460    #[test]
461    fn window_add_second_observer_notifies_after_notify_call() {
462        let mut state = 0;
463        let mut window = Window::new();
464        window.observe(TestObserver::expect_value(state), TestObserver::observe, &state).unwrap();
465
466        state = 1;
467        window.notify(TestObserver::observe, &state);
468
469        // Second observer added without updating the value
470        window.observe(TestObserver::expect_value(state), TestObserver::observe, &state).unwrap();
471    }
472
473    #[test]
474    fn window_add_multiple_observers_are_notified() {
475        let mut state = 0;
476        let mut window = Window::new();
477        window.observe(TestObserver::expect_value(state), TestObserver::observe, &state).unwrap();
478
479        // Second observer added without updating the value
480        let o1 = TestObserver::expect_value(1);
481        let o2 = TestObserver::expect_no_value();
482        window.observe(o1.clone(), TestObserver::observe, &state).unwrap();
483        let result = window.observe(o2.clone(), TestObserver::observe, &state);
484        assert_eq!(result.unwrap_err(), HangingGetServerError::MultipleObservers);
485        assert!(!o1.has_value());
486        state = 1;
487        window.notify(TestObserver::observe, &state);
488    }
489
490    #[test]
491    fn window_dirty_flag_state() {
492        let state = 0;
493        let mut window = Window::new();
494        let o = TestObserver::expect_value(state);
495        window.observe(o, TestObserver::observe, &state).unwrap();
496        assert!(window.observer.is_none());
497        assert!(!window.dirty);
498        window.notify(TestObserver::observe, &state);
499        assert!(window.dirty);
500        let o = TestObserver::expect_value(state);
501        window.observe(o, TestObserver::observe, &state).unwrap();
502        assert!(!window.dirty);
503    }
504
505    #[test]
506    fn window_dirty_flag_respects_consumed_flag() {
507        let state = 0;
508        let mut window = Window::new();
509
510        let o = TestObserver::expect_value(state);
511        window.observe(o, TestObserver::observe_incomplete, &state).unwrap();
512        assert!(window.dirty);
513    }
514
515    #[test]
516    fn hanging_get_subscribe() {
517        let mut hanging = HangingGet::new(0, TestObserver::observe);
518        let o = TestObserver::expect_value(0);
519        assert!(!o.has_value());
520        hanging.subscribe(0, o.clone()).unwrap();
521    }
522
523    #[test]
524    fn hanging_get_subscribe_then_set() {
525        let mut hanging = HangingGet::new(0, TestObserver::observe);
526        let o = TestObserver::expect_value(0);
527        hanging.subscribe(0, o.clone()).unwrap();
528
529        // No change without a new subscription
530        hanging.set(1);
531    }
532
533    #[test]
534    fn hanging_get_subscribe_twice_then_set() {
535        let mut hanging = HangingGet::new(0, TestObserver::observe);
536        hanging.subscribe(0, TestObserver::expect_value(0)).unwrap();
537
538        hanging.subscribe(0, TestObserver::expect_value(1)).unwrap();
539        hanging.set(1);
540    }
541
542    #[test]
543    fn hanging_get_subscribe_multiple_then_set() {
544        let mut hanging = HangingGet::new(0, TestObserver::observe);
545        hanging.subscribe(0, TestObserver::expect_value(0)).unwrap();
546
547        // A second subscription with the same client key should not be notified
548        let o2 = TestObserver::expect_value(1);
549        hanging.subscribe(0, o2.clone()).unwrap();
550        assert!(!o2.has_value());
551
552        // A third subscription will queue up along the other waiting observer
553        let _ = hanging.subscribe(0, TestObserver::expect_no_value()).unwrap_err();
554
555        // Set should notify all observers to the change
556        hanging.set(1);
557    }
558
559    #[test]
560    fn hanging_get_subscribe_with_two_clients_then_set() {
561        let mut hanging = HangingGet::new(0, TestObserver::observe);
562        hanging.subscribe(0, TestObserver::expect_value(0)).unwrap();
563        hanging.subscribe(0, TestObserver::expect_value(1)).unwrap();
564        hanging.subscribe(1, TestObserver::expect_value(0)).unwrap();
565        hanging.subscribe(1, TestObserver::expect_value(1)).unwrap();
566        hanging.set(1);
567    }
568
569    #[test]
570    fn hanging_get_unsubscribe() {
571        let mut hanging = HangingGet::new(0, TestObserver::observe);
572        hanging.subscribe(0, TestObserver::expect_value(0)).unwrap();
573        hanging.subscribe(0, TestObserver::expect_no_value()).unwrap();
574        hanging.unsubscribe(0);
575        hanging.set(1);
576    }
577
578    #[test]
579    fn hanging_get_unsubscribe_one_of_many() {
580        let mut hanging = HangingGet::new(0, TestObserver::observe);
581
582        hanging.subscribe(0, TestObserver::expect_value(0)).unwrap();
583        hanging.subscribe(0, TestObserver::expect_no_value()).unwrap();
584        hanging.subscribe(1, TestObserver::expect_value(0)).unwrap();
585        hanging.subscribe(1, TestObserver::expect_no_value()).unwrap();
586
587        // Unsubscribe one of the two observers
588        hanging.unsubscribe(0);
589        assert!(!hanging.observers.contains_key(&0));
590        assert!(hanging.observers.contains_key(&1));
591    }
592
593    #[fasync::run_until_stalled(test)]
594    async fn publisher_set_value() {
595        let (sender, mut receiver) = mpsc::channel(128);
596        let mut p = Publisher { sender };
597        let mut value = 1i32;
598        p.set(2i32).await.unwrap();
599        let f = receiver.next().await.unwrap();
600        assert_eq!(true, f(&mut value));
601        assert_eq!(value, 2);
602    }
603
604    #[fasync::run_until_stalled(test)]
605    async fn publisher_update_value() {
606        let (sender, mut receiver) = mpsc::channel(128);
607        let mut p = Publisher { sender };
608        let mut value = 1i32;
609        p.update(|v| {
610            *v += 1;
611            true
612        })
613        .await
614        .unwrap();
615        let f = receiver.next().await.unwrap();
616        assert_eq!(true, f(&mut value));
617        assert_eq!(value, 2);
618    }
619
620    #[test]
621    fn pub_sub_empty_completes() {
622        let mut ex = fasync::TestExecutor::new();
623        let broker = HangingGetBroker::new(
624            0i32,
625            |s, o: oneshot::Sender<_>| o.send(s.clone()).map(|()| true).unwrap(),
626            TEST_CHANNEL_SIZE,
627        );
628        let publisher = broker.new_publisher();
629        let registrar = broker.new_registrar();
630        let broker_future = broker.run();
631        let mut broker_future = pin!(broker_future);
632
633        // Broker future is still pending when registrars are live.
634        assert_eq!(ex.run_until_stalled(&mut broker_future), Poll::Pending);
635
636        drop(publisher);
637        drop(registrar);
638
639        // Broker future completes when registrars are dropped.
640        assert_eq!(ex.run_until_stalled(&mut broker_future), Poll::Ready(()));
641    }
642
643    #[fasync::run_until_stalled(test)]
644    async fn pub_sub_updates_and_observes() {
645        let broker = HangingGetBroker::new(
646            0i32,
647            |s, o: oneshot::Sender<_>| o.send(s.clone()).map(|()| true).unwrap(),
648            TEST_CHANNEL_SIZE,
649        );
650        let mut publisher = broker.new_publisher();
651        let mut registrar = broker.new_registrar();
652        let fut = async move {
653            let mut subscriber = registrar.new_subscriber().await.unwrap();
654
655            // Initial observation is immediate
656            let (sender, receiver) = oneshot::channel();
657            subscriber.register(sender).await.unwrap();
658            assert_eq!(receiver.await.unwrap(), 0);
659
660            // Subsequent observations do not happen until after an update
661            let (sender, mut receiver) = oneshot::channel();
662            subscriber.register(sender).await.unwrap();
663            assert!(receiver.try_recv().unwrap().is_none());
664            publisher.set(1).await.unwrap();
665            assert_eq!(receiver.await.unwrap(), 1);
666        };
667
668        // Broker future will complete when `fut` has complete
669        futures::join!(fut, broker.run());
670    }
671
672    #[fasync::run_until_stalled(test)]
673    async fn pub_sub_multiple_subscribers() {
674        let broker = HangingGetBroker::new(
675            0i32,
676            |s, o: oneshot::Sender<_>| o.send(s.clone()).map(|()| true).unwrap(),
677            TEST_CHANNEL_SIZE,
678        );
679        let mut publisher = broker.new_publisher();
680        let mut registrar = broker.new_registrar();
681        let fut = async move {
682            let mut sub1 = registrar.new_subscriber().await.unwrap();
683            let mut sub2 = registrar.new_subscriber().await.unwrap();
684
685            // Initial observation for subscribers is immediate
686            let (sender, receiver) = oneshot::channel();
687            sub1.register(sender).await.unwrap();
688            assert_eq!(receiver.await.unwrap(), 0);
689
690            let (sender, receiver) = oneshot::channel();
691            sub2.register(sender).await.unwrap();
692            assert_eq!(receiver.await.unwrap(), 0);
693
694            // Subsequent observations do not happen until after an update
695            let (sender, mut recv1) = oneshot::channel();
696            sub1.register(sender).await.unwrap();
697            assert!(recv1.try_recv().unwrap().is_none());
698
699            let (sender, mut recv2) = oneshot::channel();
700            sub2.register(sender).await.unwrap();
701            assert!(recv2.try_recv().unwrap().is_none());
702
703            publisher.set(1).await.unwrap();
704            assert_eq!(recv1.await.unwrap(), 1);
705            assert_eq!(recv2.await.unwrap(), 1);
706        };
707
708        // Broker future will complete when `fut` has complete
709        futures::join!(fut, broker.run());
710    }
711}