1use crate::hanging_get::error::HangingGetServerError;
6use core::hash::Hash;
7use fuchsia_sync::Mutex;
8use std::collections::HashMap;
9use std::sync::Arc;
10
11pub struct HangingGet<S, O, F: Fn(&S, O) -> bool> {
38 inner: Arc<Mutex<HangingGetInner<S, subscriber_key::Key, O, F>>>,
39 subscriber_key_generator: subscriber_key::Generator,
42}
43
44impl<S, O, F> HangingGet<S, O, F>
45where
46 F: Fn(&S, O) -> bool,
47{
48 pub fn new(state: S, notify: F) -> Self {
55 Self {
56 inner: Arc::new(Mutex::new(HangingGetInner::new(Some(state), notify))),
57 subscriber_key_generator: subscriber_key::Generator::default(),
58 }
59 }
60
61 pub fn new_unknown_state(notify: F) -> Self {
72 Self {
73 inner: Arc::new(Mutex::new(HangingGetInner::new(None, notify))),
74 subscriber_key_generator: subscriber_key::Generator::default(),
75 }
76 }
77
78 pub fn new_publisher(&self) -> Publisher<S, O, F> {
80 Publisher { inner: self.inner.clone() }
81 }
82
83 pub fn new_subscriber(&mut self) -> Subscriber<S, O, F> {
89 Subscriber { inner: self.inner.clone(), key: self.subscriber_key_generator.next().unwrap() }
90 }
91}
92
93pub struct Subscriber<S, O, F: Fn(&S, O) -> bool> {
101 inner: Arc<Mutex<HangingGetInner<S, subscriber_key::Key, O, F>>>,
102 key: subscriber_key::Key,
103}
104
105impl<S, O, F> Subscriber<S, O, F>
106where
107 F: Fn(&S, O) -> bool,
108{
109 pub fn register(&self, observation: O) -> Result<(), HangingGetServerError> {
117 self.register2(observation).map_err(|_| HangingGetServerError::MultipleObservers)
118 }
119 pub fn register2(&self, observation: O) -> Result<(), O> {
127 self.inner.lock().subscribe(self.key, observation)
128 }
129}
130
131pub struct Publisher<S, O, F: Fn(&S, O) -> bool> {
138 inner: Arc<Mutex<HangingGetInner<S, subscriber_key::Key, O, F>>>,
139}
140
141impl<S, O, F: Fn(&S, O) -> bool> Clone for Publisher<S, O, F> {
142 fn clone(&self) -> Self {
146 Self { inner: self.inner.clone() }
147 }
148}
149
150impl<S, O, F> Publisher<S, O, F>
151where
152 F: Fn(&S, O) -> bool,
153{
154 pub fn set(&self, state: S) {
156 self.inner.lock().set(state)
157 }
158
159 pub fn update<UpdateFn>(&self, update: UpdateFn)
169 where
170 UpdateFn: FnOnce(&mut Option<S>) -> bool,
171 {
172 self.inner.lock().update(update)
173 }
174}
175
176pub struct HangingGetInner<S, K, O, F: Fn(&S, O) -> bool> {
188 state: Option<S>,
189 notify: F,
190 observers: HashMap<K, Window<O>>,
191}
192
193impl<S, K, O, F> HangingGetInner<S, K, O, F>
194where
195 K: Eq + Hash,
196 F: Fn(&S, O) -> bool,
197{
198 fn notify_all(&mut self) {
199 for window in self.observers.values_mut() {
200 window.notify(&self.notify, self.state.as_ref().unwrap());
201 }
202 }
203
204 pub fn new(state: Option<S>, notify: F) -> Self {
208 Self { state, notify, observers: HashMap::new() }
209 }
210
211 pub fn set(&mut self, state: S) {
216 self.state = Some(state);
217 self.notify_all();
218 }
219
220 pub fn update(&mut self, state_update: impl FnOnce(&mut Option<S>) -> bool) {
223 if state_update(&mut self.state) {
224 self.notify_all();
225 }
226 }
227
228 pub fn subscribe(&mut self, key: K, observer: O) -> Result<(), O> {
240 let entry = self.observers.entry(key).or_insert_with(Window::new);
241 entry.observe(observer, &self.notify, self.state.as_ref())
242 }
243
244 pub fn unsubscribe(&mut self, key: K) {
247 drop(self.observers.remove(&key));
248 }
249}
250
/// Per-subscriber view of the state: at most one pending observer plus a flag
/// recording whether the subscriber still has to be handed the latest state.
struct Window<O> {
    /// `true` when the latest state has not been delivered to this subscriber.
    dirty: bool,
    /// Observer waiting for the next notification, if any.
    observer: Option<O>,
}

impl<O> Window<O> {
    /// Creates a window with no observer. It starts dirty so that the first
    /// observer registered against a known state is notified immediately.
    pub fn new() -> Self {
        Window { dirty: true, observer: None }
    }

    /// Registers `observer` in this window.
    ///
    /// If `current_state` is `Some` and the window is dirty, the observer is
    /// passed to `f` right away; otherwise it is stored until the next `notify`.
    ///
    /// # Errors
    ///
    /// Returns `Err(observer)` if an observer is already pending.
    pub fn observe<S>(
        &mut self,
        observer: O,
        f: impl Fn(&S, O) -> bool,
        current_state: Option<&S>,
    ) -> Result<(), O> {
        if self.observer.is_some() {
            return Err(observer);
        }
        self.observer = Some(observer);
        if let Some(current_state) = current_state {
            if self.dirty {
                self.notify(f, current_state);
            }
        }
        Ok(())
    }

    /// Delivers `state` to the pending observer, if there is one, by calling `f`.
    ///
    /// The window ends up clean only when an observer was present and `f`
    /// returned `true`. With no observer, or when `f` returns `false`, the window
    /// is marked dirty so the next registered observer is notified immediately.
    /// Previously a clean window stayed clean when `f` returned `false`, so the
    /// subscriber never received that state.
    pub fn notify<S>(&mut self, f: impl Fn(&S, O) -> bool, state: &S) {
        self.dirty = match self.observer.take() {
            Some(observer) => !f(state, observer),
            None => true,
        };
    }
}
301
mod subscriber_key {
    /// Hands out subscriber keys in increasing order, starting from `Key(0)`.
    pub struct Generator {
        /// The key that the next successful call to `next` will return.
        next: Key,
    }

    impl Default for Generator {
        /// Starts the sequence at key zero.
        fn default() -> Self {
            Self { next: Key(0) }
        }
    }

    impl Generator {
        /// Returns the next unused key and advances the generator.
        ///
        /// Returns `None`, leaving the generator untouched, once advancing the
        /// internal counter would overflow `u64`.
        pub fn next(&mut self) -> Option<Key> {
            // Bail out before handing anything out if the counter cannot advance.
            let following = self.next.0.checked_add(1)?;
            let issued = self.next;
            self.next = Key(following);
            Some(issued)
        }
    }

    /// Opaque identifier of a subscriber; only this module can construct one.
    #[derive(PartialEq, Eq, Hash, Debug, Clone, Copy)]
    pub struct Key(u64);
}
333
// Unit tests for the key generator, `Window`, `HangingGetInner`, and the public
// `HangingGet` / `Publisher` / `Subscriber` API.
//
// NOTE(review): `TestObserver` comes from `test_util` and is not visible here.
// The comments below assume `expect_value(v)` builds an observer that must be
// notified with `v`, `expect_no_value()` one that must never be notified, and
// that `TestObserver::observe` reports the state as consumed -- confirm against
// `test_util`.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::hanging_get::test_util::TestObserver;
    use crate::PollExt;
    use fuchsia_async as fasync;
    use futures::channel::oneshot;

    // Two consecutive keys from the same generator must differ.
    #[test]
    fn subscriber_key_generator_creates_unique_keys() {
        let mut gen = subscriber_key::Generator::default();
        let key1 = gen.next();
        let key2 = gen.next();
        assert!(key1 != key2);
    }

    // A new window starts dirty, so the first observer registered against a
    // known state is passed to the notify function immediately.
    #[test]
    fn window_add_first_observer_notifies() {
        let state = 0;
        let mut window = Window::new();
        window
            .observe(TestObserver::expect_value(state), TestObserver::observe, Some(&state))
            .unwrap();
    }

    // With no state available the observer is only stored, never notified.
    #[test]
    fn window_none_state_does_not_notify() {
        let mut window = Window::new();
        window
            .observe::<i32>(TestObserver::expect_no_value(), TestObserver::observe, None)
            .unwrap();
    }

    // After the first observer has been served, a second observer registered
    // against the same state is accepted but left pending.
    #[test]
    fn window_add_second_observer_does_not_notify() {
        let state = 0;
        let mut window = Window::new();
        window
            .observe(TestObserver::expect_value(state), TestObserver::observe, Some(&state))
            .unwrap();

        // The window is expected to be clean here, so this observer is not notified.
        window
            .observe(TestObserver::expect_no_value(), TestObserver::observe, Some(&state))
            .unwrap();
    }

    // A `notify` with no pending observer marks the window dirty, so the next
    // observer is served immediately with the new state.
    #[test]
    fn window_add_second_observer_notifies_after_notify_call() {
        let mut state = 0;
        let mut window = Window::new();
        window
            .observe(TestObserver::expect_value(state), TestObserver::observe, Some(&state))
            .unwrap();

        state = 1;
        window.notify(TestObserver::observe, &state);

        // The window is dirty, so this observer is notified with the updated state.
        window
            .observe(TestObserver::expect_value(state), TestObserver::observe, Some(&state))
            .unwrap();
    }

    // Only one observer can be pending per window: a further one is rejected and
    // handed back, while the pending one is served by the next `notify`.
    #[test]
    fn window_add_multiple_observers_are_notified() {
        let mut state = 0;
        let mut window = Window::new();
        window
            .observe(TestObserver::expect_value(state), TestObserver::observe, Some(&state))
            .unwrap();

        // `o1` becomes the pending observer; `o2` must be rejected.
        let o1 = TestObserver::expect_value(1);
        let o2 = TestObserver::expect_no_value();
        window.observe(o1.clone(), TestObserver::observe, Some(&state)).unwrap();
        let result = window.observe(o2.clone(), TestObserver::observe, Some(&state));
        assert_eq!(result, Err(o2));
        assert!(!o1.has_value());
        state = 1;
        window.notify(TestObserver::observe, &state);
    }

    // Tracks the `dirty` flag and `observer` slot across observe/notify calls.
    #[test]
    fn window_dirty_flag_state() {
        let state = 0;
        let mut window = Window::new();
        let o = TestObserver::expect_value(state);
        window.observe(o, TestObserver::observe, Some(&state)).unwrap();
        // The observer was consumed by the immediate notification.
        assert!(window.observer.is_none());
        assert!(!window.dirty);
        // Notifying with no pending observer dirties the window.
        window.notify(TestObserver::observe, &state);
        assert!(window.dirty);
        let o = TestObserver::expect_value(state);
        window.observe(o, TestObserver::observe, Some(&state)).unwrap();
        assert!(!window.dirty);
    }

    // When the notify function does not report success the window stays dirty.
    // NOTE(review): relies on `TestObserver::observe_incomplete` returning `false`.
    #[test]
    fn window_dirty_flag_respects_consumed_flag() {
        let state = 0;
        let mut window = Window::new();

        let o = TestObserver::expect_value(state);
        window.observe(o, TestObserver::observe_incomplete, Some(&state)).unwrap();
        assert!(window.dirty);
    }

    // Subscribing with a fresh key against a known state succeeds.
    #[test]
    fn hanging_get_inner_subscribe() {
        let mut hanging = HangingGetInner::new(Some(0), TestObserver::observe);
        let o = TestObserver::expect_value(0);
        assert!(!o.has_value());
        hanging.subscribe(0, o.clone()).unwrap();
    }

    // The observer is served with the initial state on subscribe; the later
    // `set` finds no pending observer for that key.
    #[test]
    fn hanging_get_inner_subscribe_then_set() {
        let mut hanging = HangingGetInner::new(Some(0), TestObserver::observe);
        let o = TestObserver::expect_value(0);
        hanging.subscribe(0, o.clone()).unwrap();

        hanging.set(1);
    }

    // The second observer under the same key waits for the next state change.
    #[test]
    fn hanging_get_inner_subscribe_twice_then_set() {
        let mut hanging = HangingGetInner::new(Some(0), TestObserver::observe);
        hanging.subscribe(0, TestObserver::expect_value(0)).unwrap();

        hanging.subscribe(0, TestObserver::expect_value(1)).unwrap();
        hanging.set(1);
    }

    // A third observer under the same key is rejected while the second is pending.
    #[test]
    fn hanging_get_inner_subscribe_multiple_then_set() {
        let mut hanging = HangingGetInner::new(Some(0), TestObserver::observe);
        hanging.subscribe(0, TestObserver::expect_value(0)).unwrap();

        // Accepted but not yet notified: the key's window is clean.
        let o2 = TestObserver::expect_value(1);
        hanging.subscribe(0, o2.clone()).unwrap();
        assert!(!o2.has_value());

        // Rejected: an observer is already pending for key 0.
        let _ = hanging.subscribe(0, TestObserver::expect_no_value()).unwrap_err();

        hanging.set(1);
    }

    // Each key has its own window, so both keys get the initial state and then
    // the update.
    #[test]
    fn hanging_get_inner_subscribe_with_two_clients_then_set() {
        let mut hanging = HangingGetInner::new(Some(0), TestObserver::observe);
        hanging.subscribe(0, TestObserver::expect_value(0)).unwrap();
        hanging.subscribe(0, TestObserver::expect_value(1)).unwrap();
        hanging.subscribe(1, TestObserver::expect_value(0)).unwrap();
        hanging.subscribe(1, TestObserver::expect_value(1)).unwrap();
        hanging.set(1);
    }

    // Unsubscribing removes the key's window together with its pending observer,
    // so the later `set` does not reach that observer.
    #[test]
    fn hanging_get_inner_unsubscribe() {
        let mut hanging = HangingGetInner::new(Some(0), TestObserver::observe);
        hanging.subscribe(0, TestObserver::expect_value(0)).unwrap();
        hanging.subscribe(0, TestObserver::expect_no_value()).unwrap();
        hanging.unsubscribe(0);
        hanging.set(1);
    }

    // Unsubscribing one key leaves the other key's window in place.
    #[test]
    fn hanging_get_inner_unsubscribe_one_of_many() {
        let mut hanging = HangingGetInner::new(Some(0), TestObserver::observe);

        hanging.subscribe(0, TestObserver::expect_value(0)).unwrap();
        hanging.subscribe(0, TestObserver::expect_no_value()).unwrap();
        hanging.subscribe(1, TestObserver::expect_value(0)).unwrap();
        hanging.subscribe(1, TestObserver::expect_no_value()).unwrap();

        hanging.unsubscribe(0);
        assert!(!hanging.observers.contains_key(&0));
        assert!(hanging.observers.contains_key(&1));
    }

    // "Delayed" tests start with no state (`None`): subscribing stores the
    // observer without notifying it.
    #[test]
    fn hanging_get_inner_delayed_subscribe() {
        let mut hanging = HangingGetInner::new(None, TestObserver::<u8>::observe);
        let o = TestObserver::expect_no_value();
        assert!(!o.has_value());
        hanging.subscribe(0, o.clone()).unwrap();
    }

    // The stored observer is served by the first `set`.
    #[test]
    fn hanging_get_inner_delayed_subscribe_then_set() {
        let mut hanging = HangingGetInner::new(None, TestObserver::observe);
        let o = TestObserver::expect_value(1);
        hanging.subscribe(0, o.clone()).unwrap();

        hanging.set(1);
    }

    // With no state the first observer stays pending, so a second one under the
    // same key is rejected and handed back.
    #[test]
    fn hanging_get_inner_delayed_subscribe_twice_then_set() {
        let mut hanging = HangingGetInner::new(None, TestObserver::observe);
        hanging.subscribe(0, TestObserver::expect_value(1)).unwrap();

        let o = TestObserver::expect_no_value();
        let result = hanging.subscribe(0, o.clone());
        assert_eq!(result, Err(o));
        hanging.set(1);
    }

    // Every further observer under the same key is rejected while the first is
    // still pending.
    #[test]
    fn hanging_get_inner_delayed_subscribe_multiple_then_set() {
        let mut hanging = HangingGetInner::new(None, TestObserver::observe);
        hanging.subscribe(0, TestObserver::expect_value(1)).unwrap();

        let o2 = TestObserver::expect_no_value();
        let result = hanging.subscribe(0, o2.clone());
        assert_eq!(result, Err(o2.clone()));
        assert!(!o2.has_value());

        let o3 = TestObserver::expect_no_value();
        let result = hanging.subscribe(0, o3.clone());
        assert_eq!(result, Err(o3));

        hanging.set(1);
    }

    // Rejection is per key: each key keeps exactly one pending observer, and
    // both are served by `set`.
    #[test]
    fn hanging_get_inner_delayed_subscribe_with_two_clients_then_set() {
        let mut hanging = HangingGetInner::new(None, TestObserver::observe);
        hanging.subscribe(0, TestObserver::expect_value(1)).unwrap();
        let o = TestObserver::expect_no_value();
        let result = hanging.subscribe(0, o.clone());
        assert_eq!(result, Err(o));
        hanging.subscribe(1, TestObserver::expect_value(1)).unwrap();
        let o2 = TestObserver::expect_no_value();
        let result = hanging.subscribe(1, o2.clone());
        assert_eq!(result, Err(o2));
        hanging.set(1);
    }

    // Unsubscribing before any state is set drops the pending observer, so the
    // later `set` does not reach it.
    #[test]
    fn hanging_get_inner_delayed_unsubscribe() {
        let mut hanging = HangingGetInner::new(None, TestObserver::observe);
        hanging.subscribe(0, TestObserver::expect_no_value()).unwrap();
        let o = TestObserver::expect_no_value();
        let result = hanging.subscribe(0, o.clone());
        assert_eq!(result, Err(o));
        hanging.unsubscribe(0);
        hanging.set(1);
    }

    // Unsubscribing one key with no state set leaves the other key registered.
    #[test]
    fn hanging_get_inner_delayed_unsubscribe_one_of_many() {
        let mut hanging = HangingGetInner::new(None, TestObserver::observe);

        hanging.subscribe(0, TestObserver::<i32>::expect_no_value()).unwrap();
        let o = TestObserver::expect_no_value();
        let result = hanging.subscribe(0, o.clone());
        assert_eq!(result, Err(o));
        hanging.subscribe(1, TestObserver::expect_no_value()).unwrap();
        let o2 = TestObserver::expect_no_value();
        let result = hanging.subscribe(1, o2.clone());
        assert_eq!(result, Err(o2));

        hanging.unsubscribe(0);
        assert!(!hanging.observers.contains_key(&0));
        assert!(hanging.observers.contains_key(&1));
    }

    // End-to-end through the public API, using oneshot senders as observers: the
    // first registration is answered at once, the second only after `set`.
    #[test]
    fn sync_pub_sub_updates_and_observes() {
        let mut ex = fasync::TestExecutor::new();
        let mut broker = HangingGet::new(0i32, |s, o: oneshot::Sender<_>| {
            o.send(s.clone()).map(|()| true).unwrap()
        });
        let publisher = broker.new_publisher();
        let subscriber = broker.new_subscriber();

        // Initial observation is delivered immediately.
        let (sender, mut receiver) = oneshot::channel();
        subscriber.register(sender).unwrap();
        let observation =
            ex.run_until_stalled(&mut receiver).expect("received initial observation");
        assert_eq!(observation, Ok(0));

        // Subsequent observation hangs until the state changes.
        let (sender, mut receiver) = oneshot::channel();
        subscriber.register(sender).unwrap();
        assert!(ex.run_until_stalled(&mut receiver).is_pending());

        publisher.set(1);

        let observation =
            ex.run_until_stalled(&mut receiver).expect("received subsequent observation");
        assert_eq!(observation, Ok(1));
    }

    // Two subscribers each get the initial state and then the update.
    #[test]
    fn sync_pub_sub_multiple_subscribers() {
        let mut ex = fasync::TestExecutor::new();
        let mut broker = HangingGet::new(0i32, |s, o: oneshot::Sender<_>| {
            o.send(s.clone()).map(|()| true).unwrap()
        });
        let publisher = broker.new_publisher();

        let sub1 = broker.new_subscriber();
        let sub2 = broker.new_subscriber();

        // Initial observations are delivered immediately to both subscribers.
        let (sender, mut receiver) = oneshot::channel();
        sub1.register(sender).unwrap();
        let observation =
            ex.run_until_stalled(&mut receiver).expect("received initial observation");
        assert_eq!(observation, Ok(0));

        let (sender, mut receiver) = oneshot::channel();
        sub2.register(sender).unwrap();
        let observation =
            ex.run_until_stalled(&mut receiver).expect("received initial observation");
        assert_eq!(observation, Ok(0));

        // Subsequent observations hang until the state changes.
        let (sender, mut recv1) = oneshot::channel();
        sub1.register(sender).unwrap();
        assert!(ex.run_until_stalled(&mut recv1).is_pending());

        let (sender, mut recv2) = oneshot::channel();
        sub2.register(sender).unwrap();
        assert!(ex.run_until_stalled(&mut recv2).is_pending());

        publisher.set(1);
        let obs1 =
            ex.run_until_stalled(&mut recv1).expect("receiver 1 received subsequent observation");
        assert_eq!(obs1, Ok(1));
        let obs2 =
            ex.run_until_stalled(&mut recv2).expect("receiver 2 received subsequent observation");
        assert_eq!(obs2, Ok(1));
    }

    // Broker created without a state: the first registration hangs, a second one
    // from the same subscriber is refused, and `set` serves the first.
    #[test]
    fn sync_pub_sub_delayed_updates_and_observes() {
        let mut ex = fasync::TestExecutor::new();
        let mut broker = HangingGet::<i32, _, _>::new_unknown_state(|s, o: oneshot::Sender<_>| {
            o.send(s.clone()).map(|()| true).unwrap()
        });
        let publisher = broker.new_publisher();
        let subscriber = broker.new_subscriber();

        // No state yet, so the first observation stays pending.
        let (sender, mut recv1) = oneshot::channel();
        subscriber.register(sender).unwrap();
        assert!(ex.run_until_stalled(&mut recv1).is_pending());

        // A second registration fails; its sender is dropped, so the receiver
        // resolves to an error.
        let (sender, mut receiver) = oneshot::channel();
        assert!(subscriber.register(sender).is_err());
        assert!(ex.run_until_stalled(&mut receiver).expect("sender closed").is_err());

        publisher.set(1);

        // The original observer receives the first published state.
        let observation =
            ex.run_until_stalled(&mut recv1).expect("received subsequent observation");
        assert_eq!(observation, Ok(1));
    }

    // Same as above with two subscribers: each keeps one pending observer,
    // extra registrations are refused, and both pending observers get the state.
    #[test]
    fn sync_pub_sub_delayed_multiple_subscribers() {
        let mut ex = fasync::TestExecutor::new();
        let mut broker = HangingGet::<i32, _, _>::new_unknown_state(|s, o: oneshot::Sender<_>| {
            o.send(s.clone()).map(|()| true).unwrap()
        });
        let publisher = broker.new_publisher();

        let sub1 = broker.new_subscriber();
        let sub2 = broker.new_subscriber();

        // No state yet, so both initial observations stay pending.
        let (sender, mut recv1) = oneshot::channel();
        sub1.register(sender).unwrap();
        assert!(ex.run_until_stalled(&mut recv1).is_pending());

        let (sender, mut recv2) = oneshot::channel();
        sub2.register(sender).unwrap();
        assert!(ex.run_until_stalled(&mut recv2).is_pending());

        // Further registrations are refused while an observer is pending.
        let (sender, mut recv3) = oneshot::channel();
        assert!(sub1.register(sender).is_err());
        assert!(ex.run_until_stalled(&mut recv3).expect("sender 3 closed").is_err());

        let (sender, mut recv4) = oneshot::channel();
        assert!(sub2.register(sender).is_err());
        assert!(ex.run_until_stalled(&mut recv4).expect("sender 4 closed").is_err());

        publisher.set(1);
        // Both pending observers receive the first published state.
        let obs1 =
            ex.run_until_stalled(&mut recv1).expect("receiver 1 received subsequent observation");
        assert_eq!(obs1, Ok(1));
        let obs2 =
            ex.run_until_stalled(&mut recv2).expect("receiver 2 received subsequent observation");
        assert_eq!(obs2, Ok(1));
    }
}