1mod builder;
15mod reactor;
16
17use crate::experimental::clock::Timed;
18
19pub use crate::experimental::event::builder::{SampleDataRecord, sample_data_record};
20pub use crate::experimental::event::reactor::{
21 And, AndChain, Context, Fail, FilterMapDataRecord, Inspect, IntoReactor, MapError, MapResponse,
22 Or, OrChain, Reactor, Respond, Then, ThenChain, WithState, and, fail, filter_map_data_record,
23 map_data_record, map_state, on_data_record, or, respond, then, with_state,
24};
25
26pub trait ReactorExt<T, S = ()>: Reactor<T, S> {
28 fn react_to_data_record(&mut self, record: T) -> Result<Self::Response, Self::Error>
37 where
38 S: Default,
39 {
40 self.react(Timed::now(DataEvent { record }.into()), Context::from_state(&mut S::default()))
41 }
42}
43
44impl<R, T> ReactorExt<T> for R where R: Reactor<T, ()> {}
45
46impl<T> Timed<Event<T>> {
47 pub(crate) fn to_timed_sample(&self) -> Option<Timed<T>>
48 where
49 T: Clone,
50 {
51 self.clone()
52 .map(|event| match event {
53 Event::Data(DataEvent { record, .. }) => Some(record),
54 _ => None,
55 })
56 .transpose()
57 }
58
59 pub fn as_data_record(&self) -> Option<&T> {
60 self.inner().as_data_record()
61 }
62
63 pub fn map_data_record<U, F>(self, f: F) -> Timed<Event<U>>
64 where
65 F: FnOnce(T) -> U,
66 {
67 self.map(move |event| event.map_data_record(f))
68 }
69
70 pub fn filter_map_data_record<U, F>(self, f: F) -> Option<Timed<Event<U>>>
71 where
72 F: FnOnce(T) -> Option<U>,
73 {
74 self.map_data_record(f).map(Event::transpose).transpose()
75 }
76}
77
78#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
83pub enum Event<T> {
84 System(SystemEvent),
85 Data(DataEvent<T>),
86}
87
88impl<T> Event<T> {
89 pub fn from_data_record(record: T) -> Self {
90 Event::Data(DataEvent { record })
91 }
92
93 pub fn map_data_record<U, F>(self, f: F) -> Event<U>
94 where
95 F: FnOnce(T) -> U,
96 {
97 match self {
98 Event::System(event) => Event::System(event),
99 Event::Data(event) => Event::Data(event.map(f)),
100 }
101 }
102
103 pub fn as_data_record(&self) -> Option<&T> {
104 match self {
105 Event::System(_) => None,
106 Event::Data(ref event) => Some(&event.record),
107 }
108 }
109}
110
111impl<T> Event<Option<T>> {
112 pub fn transpose(self) -> Option<Event<T>> {
113 match self {
114 Event::System(event) => Some(Event::System(event)),
115 Event::Data(event) => event.record.map(|record| Event::Data(DataEvent { record })),
116 }
117 }
118}
119
120impl<T> From<SystemEvent> for Event<T> {
121 fn from(event: SystemEvent) -> Self {
122 Event::System(event)
123 }
124}
125
126impl<T> From<DataEvent<T>> for Event<T> {
127 fn from(event: DataEvent<T>) -> Self {
128 Event::Data(event)
129 }
130}
131
132#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
142pub enum SystemEvent {
143 Suspend(SuspendEvent),
144}
145
146impl From<SuspendEvent> for SystemEvent {
147 fn from(event: SuspendEvent) -> Self {
148 SystemEvent::Suspend(event)
149 }
150}
151
152#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
154pub enum SuspendEvent {
155 Sleep,
163 Wake,
167}
168
169#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
171pub struct DataEvent<T> {
172 pub record: T,
182}
183
184impl<T> DataEvent<T> {
185 pub fn map<U, F>(self, f: F) -> DataEvent<U>
186 where
187 F: FnOnce(T) -> U,
188 {
189 DataEvent { record: f(self.record) }
190 }
191}
192
193#[cfg(test)]
194pub(crate) mod harness {
195 use fuchsia_async as fasync;
196 use fuchsia_inspect::{Inspector, Node};
197 use std::fmt::Debug;
198 use std::marker::PhantomData;
199
200 use crate::experimental::clock::Timed;
201 use crate::experimental::event::{self, Context, Event, Reactor};
202 use crate::experimental::inspect::TimeMatrixClient;
203 use crate::experimental::series::SamplingProfile;
204 use crate::experimental::series::interpolation::LastSample;
205 use crate::experimental::series::statistic::{FoldError, Max, Sum};
206
207 pub const TIME_ZERO: fasync::MonotonicInstant = fasync::MonotonicInstant::from_nanos(0);
208 pub const TIME_ONE_SECOND: fasync::MonotonicInstant =
209 fasync::MonotonicInstant::from_nanos(1_000_000_000);
210
211 pub const TEST_NODE_NAME: &str = "event_test_node";
212
213 pub trait ReactorExt<T, S = ()>: Reactor<T, S> {
214 fn assert_observes_event(
223 self,
224 expected: Event<T>,
225 ) -> impl Reactor<T, S, Response = Self::Response, Error = Self::Error>
226 where
227 Self: Sized,
228 T: Clone + Debug + PartialEq,
229 {
230 #[derive(Debug)]
231 struct Assertion<T, R>
232 where
233 T: Debug,
234 {
235 reactor: R,
236 expected: Event<T>,
237 is_observed: bool,
238 }
239
240 impl<T, R> Drop for Assertion<T, R>
241 where
242 T: Debug,
243 {
244 fn drop(&mut self) {
245 assert!(
246 self.is_observed,
247 "reactor never received an expected event before drop: {:?}",
248 self.expected,
249 );
250 }
251 }
252
253 impl<T, S, R> Reactor<T, S> for Assertion<T, R>
254 where
255 T: Debug + PartialEq,
256 R: Reactor<T, S>,
257 {
258 type Response = R::Response;
259 type Error = R::Error;
260
261 fn react(
262 &mut self,
263 event: Timed<Event<T>>,
264 context: Context<'_, S>,
265 ) -> Result<Self::Response, Self::Error> {
266 if &self.expected == event.inner() {
267 self.is_observed = true;
268 }
269 self.reactor.react(event, context)
270 }
271 }
272
273 Assertion { reactor: self, expected, is_observed: false }
274 }
275
276 fn assert_reacts_times(
286 self,
287 n: usize,
288 ) -> impl Reactor<T, S, Response = Self::Response, Error = Self::Error>
289 where
290 Self: Sized,
291 {
292 #[derive(Debug)]
293 struct Assertion<T, R> {
294 reactor: R,
295 observed: usize,
296 expected: usize,
297 phantom: PhantomData<fn() -> T>,
298 }
299
300 impl<T, R> Drop for Assertion<T, R> {
301 fn drop(&mut self) {
302 assert!(
303 self.observed == self.expected,
304 "reactor received unexpected number of events on drop: \
305 observed {}, but expected {}",
306 self.observed,
307 self.expected,
308 );
309 }
310 }
311
312 impl<T, S, R> Reactor<T, S> for Assertion<T, R>
313 where
314 R: Reactor<T, S>,
315 {
316 type Response = R::Response;
317 type Error = R::Error;
318
319 fn react(
320 &mut self,
321 event: Timed<Event<T>>,
322 context: Context<'_, S>,
323 ) -> Result<Self::Response, Self::Error> {
324 self.observed =
325 self.observed.checked_add(1).expect("overflow in observed event count");
326 assert!(
327 self.observed <= self.expected,
328 "reactor received unexpected number of events before drop: \
329 observed {}, but expected {}",
330 self.observed,
331 self.expected,
332 );
333 self.reactor.react(event, context)
334 }
335 }
336
337 Assertion { reactor: self, observed: 0, expected: n, phantom: PhantomData }
338 }
339 }
340
341 impl<T, S, R> ReactorExt<T, S> for R where R: Reactor<T, S> {}
342
343 #[derive(Clone, Copy, Debug)]
345 pub struct TxCount {
346 pub failed: u64,
347 pub retried: u64,
348 }
349
350 pub fn executor_at_time_zero() -> fasync::TestExecutor {
352 let executor = fasync::TestExecutor::new_with_fake_time();
353 executor.set_fake_time(TIME_ZERO);
354 executor
355 }
356
357 pub fn inspector_and_test_node() -> (Inspector, Node) {
359 let inspector = Inspector::default();
360 let node = inspector.root().create_child(TEST_NODE_NAME);
361 (inspector, node)
362 }
363
364 pub fn sample_tx_count<'client, 'record>(
368 client: &'client TimeMatrixClient,
369 ) -> impl Reactor<&'record TxCount, (), Response = (), Error = FoldError> {
370 event::on_data_record::<&TxCount, _>(event::then((
371 event::map_data_record(
372 |count: &TxCount, _| count.failed,
373 event::then((
374 event::sample_data_record(Sum::<u64>::default()).in_time_matrix::<LastSample>(
375 &client,
376 "tx_failed_sum",
377 SamplingProfile::granular(),
378 LastSample::or(0u64),
379 ),
380 event::sample_data_record(Max::<u64>::default()).in_time_matrix::<LastSample>(
381 &client,
382 "tx_failed_max",
383 SamplingProfile::granular(),
384 LastSample::or(0u64),
385 ),
386 )),
387 ),
388 event::map_data_record(
389 |count: &TxCount, _| count.retried,
390 event::sample_data_record(Sum::<u64>::default()).in_time_matrix::<LastSample>(
391 &client,
392 "tx_retried_sum",
393 SamplingProfile::granular(),
394 LastSample::or(0u64),
395 ),
396 ),
397 )))
398 }
399
400 pub const fn respond(_: Timed<Event<()>>, _: Context<'_, ()>) -> Result<(), ()> {
402 Ok(())
403 }
404
405 pub const fn fail(_: Timed<Event<()>>, _: Context<'_, ()>) -> Result<(), ()> {
407 Err(())
408 }
409}
410
411#[cfg(test)]
412mod tests {
413 use diagnostics_assertions::{AnyBytesProperty, assert_data_tree};
414
415 use crate::experimental::clock::Timed;
416 use crate::experimental::event::harness::{self, ReactorExt as _};
417 use crate::experimental::event::{
418 self, Context, DataEvent, Event, Reactor, ReactorExt as _, SuspendEvent, SystemEvent,
419 };
420 use crate::experimental::inspect::TimeMatrixClient;
421 use crate::experimental::series::SamplingProfile;
422 use crate::experimental::series::interpolation::LastSample;
423 use crate::experimental::series::metadata::BitSetMap;
424 use crate::experimental::series::statistic::{Max, Sum, Union};
425
426 #[test]
427 #[should_panic]
428 fn observes_event_assertion_observes_no_such_event_then_panics() {
429 let _executor = harness::executor_at_time_zero();
430
431 let mut reactor = harness::respond.assert_observes_event(Event::from_data_record(()));
432 let _ = reactor.react(
433 Timed::now(SystemEvent::Suspend(SuspendEvent::Sleep).into()),
434 Context::from_state(&mut ()),
435 );
436 }
437
438 #[test]
439 #[should_panic]
440 fn reacts_times_assertion_reacts_too_few_times_then_panics() {
441 let _executor = harness::executor_at_time_zero();
442
443 let mut reactor = harness::respond.assert_reacts_times(2);
444 let _ = reactor.react_to_data_record(());
445 }
446
447 #[test]
448 #[should_panic]
449 fn reacts_times_assertion_reacts_too_many_times_then_panics() {
450 let _executor = harness::executor_at_time_zero();
451
452 let mut reactor = harness::respond.assert_reacts_times(1);
453 let _ = reactor.react_to_data_record(());
454 let _ = reactor.react_to_data_record(());
455 }
456
457 #[test]
458 fn then_combinator_reacts_then_subsequent_reacts_on_ok_and_err() {
459 let _executor = harness::executor_at_time_zero();
460
461 let mut reactor =
462 harness::respond.assert_reacts_times(1).then(harness::respond.assert_reacts_times(1));
463 let _ = reactor.react_to_data_record(());
464
465 let mut reactor =
466 harness::fail.assert_reacts_times(1).then(harness::respond.assert_reacts_times(1));
467 let _ = reactor.react_to_data_record(());
468
469 let mut reactor = event::then((
470 harness::respond.assert_reacts_times(1),
471 harness::fail.assert_reacts_times(1),
472 harness::respond.assert_reacts_times(1),
473 ));
474 let _ = reactor.react_to_data_record(());
475 }
476
477 #[test]
478 fn and_combinator_reacts_then_subsequent_reacts_only_on_ok() {
479 let _executor = harness::executor_at_time_zero();
480
481 let mut reactor =
482 harness::respond.assert_reacts_times(1).and(harness::respond.assert_reacts_times(1));
483 let _ = reactor.react_to_data_record(());
484
485 let mut reactor =
486 harness::fail.assert_reacts_times(1).and(harness::respond.assert_reacts_times(0));
487 let _ = reactor.react_to_data_record(());
488
489 let mut reactor = event::and((
490 harness::respond.assert_reacts_times(1),
491 harness::fail.assert_reacts_times(1),
492 harness::respond.assert_reacts_times(0),
493 ));
494 let _ = reactor.react_to_data_record(());
495 }
496
497 #[test]
498 fn or_combinator_reacts_then_subsequent_reacts_only_on_err() {
499 let _executor = harness::executor_at_time_zero();
500
501 let mut reactor =
502 harness::respond.assert_reacts_times(1).or(harness::respond.assert_reacts_times(0));
503 let _ = reactor.react_to_data_record(());
504
505 let mut reactor =
506 harness::fail.assert_reacts_times(1).or(harness::fail.assert_reacts_times(1));
507 let _ = reactor.react_to_data_record(());
508
509 let mut reactor = event::or((
510 harness::fail.assert_reacts_times(1),
511 harness::respond.assert_reacts_times(1),
512 harness::respond.assert_reacts_times(0),
513 ));
514 let _ = reactor.react_to_data_record(());
515 }
516
517 #[test]
518 fn map_data_record_then_subtree_reacts_to_mapped_record() {
519 let _executor = harness::executor_at_time_zero();
520
521 #[derive(Debug, Eq, PartialEq)]
522 struct Thread {
523 nominal: u128,
524 tpi: u128,
525 }
526
527 let thread = Thread { nominal: 1, tpi: 8 };
528 let mut observed = None;
529 let mut reactor = event::on_data_record::<&Thread, _>(event::map_data_record(
530 |thread: &Thread, _| &thread.tpi,
531 |event: Timed<Event<&u128>>, _: Context<'_, ()>| {
532 let (_, event) = event.into();
533 if let Event::Data(DataEvent { record: tpi, .. }) = event {
534 observed = Some(*tpi);
535 }
536 Ok::<_, ()>(())
537 },
538 ));
539 let _ = reactor.react_to_data_record(&thread);
540 assert_eq!(observed, Some(8));
541 }
542
543 #[test]
544 fn retain_record_with_filter_map_data_record_then_subtree_reacts_to_mapped_record() {
545 const RECORD: i8 = 0;
546
547 let _executor = harness::executor_at_time_zero();
548
549 let mut observed = None;
550 let mut reactor = event::on_data_record::<usize, _>(event::filter_map_data_record(
551 |_: usize, _| Some(RECORD),
553 |event: Timed<Event<i8>>, _: Context<'_, ()>| {
554 let (_, event) = event.into();
555 if let Event::Data(DataEvent { record, .. }) = event {
556 observed = Some(record);
557 }
558 Ok::<_, ()>(())
559 },
560 ));
561 let _ = reactor.react_to_data_record(0usize);
562 assert_eq!(observed, Some(RECORD));
563 }
564
565 #[test]
566 fn discard_record_with_filter_map_data_record_then_subtree_does_not_react_to_mapped_record() {
567 let _executor = harness::executor_at_time_zero();
568
569 let mut reactor = event::on_data_record::<usize, _>(event::filter_map_data_record(
570 |_: usize, _| None::<()>,
572 harness::respond.assert_reacts_times(0),
573 ));
574 let _ = reactor.react_to_data_record(0usize);
575 }
576
577 #[test]
580 fn discard_record_with_filter_map_data_record_then_subtree_reacts_to_system_event() {
581 const SYSTEM_EVENT: SystemEvent = SystemEvent::Suspend(SuspendEvent::Sleep);
582
583 let _executor = harness::executor_at_time_zero();
584
585 let mut observed = None;
586 let mut reactor = event::on_data_record::<usize, _>(event::filter_map_data_record(
587 |_: usize, _| None::<i8>,
589 |event: Timed<Event<i8>>, _: Context<'_, ()>| {
590 let (_, event) = event.into();
591 if let Event::System(event) = event {
592 observed = Some(event);
593 }
594 Ok::<_, ()>(())
595 },
596 ));
597 let _ = reactor.react(Timed::now(SYSTEM_EVENT.into()), Context::from_state(&mut ()));
598 assert_eq!(observed, Some(SYSTEM_EVENT));
600 }
601
602 #[test]
603 fn with_state_then_subtree_reacts_to_state() {
604 let _executor = harness::executor_at_time_zero();
605
606 #[derive(Debug, Eq, PartialEq)]
607 struct ReactorState {
608 n: u128,
609 }
610
611 let mut observed = None;
612 let mut reactor = event::on_data_record::<(), _>(event::with_state(
613 ReactorState { n: 8 },
614 |_: Timed<Event<()>>, context: Context<'_, ReactorState>| {
615 observed = Some(context.state.n);
616 Ok::<_, ()>(())
617 },
618 ));
619 let _ = reactor.react_to_data_record(());
620 assert_eq!(observed, Some(8));
621 }
622
623 #[test]
624 fn write_state_then_subtree_reacts_to_written_state() {
625 let _executor = harness::executor_at_time_zero();
626
627 let mut reactor = event::on_data_record::<(), _>(event::with_state(
628 String::from("hello"),
629 event::then((
630 {
631 |_: Timed<Event<()>>, context: Context<'_, String>| {
632 assert_eq!(context.state, "hello");
633 *context.state = String::from("goodbye");
634 Ok::<_, ()>(())
635 }
636 }
637 .assert_reacts_times(1),
638 {
639 |_: Timed<Event<()>>, context: Context<'_, String>| {
640 assert_eq!(context.state, "goodbye");
641 Ok::<_, ()>(())
642 }
643 }
644 .assert_reacts_times(1),
645 )),
646 ));
647 let _ = reactor.react_to_data_record(());
648 }
649
650 #[test]
651 fn map_state_then_subtree_reacts_to_mapped_state() {
652 let _executor = harness::executor_at_time_zero();
653
654 #[derive(Debug, Eq, PartialEq)]
655 struct ReactorState {
656 n: u128,
657 }
658
659 let mut observed = None;
660 let mut reactor = event::on_data_record::<(), _>(event::map_state(
661 |_| ReactorState { n: 8 },
662 |_: Timed<Event<()>>, context: Context<'_, ReactorState>| {
663 observed = Some(context.state.n);
664 Ok::<_, ()>(())
665 },
666 ));
667 let _ = reactor.react_to_data_record(());
668 assert_eq!(observed, Some(8));
669 }
670
671 #[test]
672 fn construct_reactor_with_samplers_then_inspect_data_tree_contains_buffers() {
673 let mut executor = harness::executor_at_time_zero();
674 let (inspector, node) = harness::inspector_and_test_node();
675
676 let client = TimeMatrixClient::new(node);
677 let _reactor = harness::sample_tx_count(&client);
678
679 executor.set_fake_time(harness::TIME_ONE_SECOND);
680 assert_data_tree!(
681 @executor executor,
682 inspector,
683 root: contains {
684 event_test_node: {
685 tx_failed_sum: {
686 "type": "gauge",
687 "data": AnyBytesProperty,
688 },
689 tx_failed_max: {
690 "type": "gauge",
691 "data": AnyBytesProperty,
692 },
693 tx_retried_sum: {
694 "type": "gauge",
695 "data": AnyBytesProperty,
696 },
697 },
698 }
699 );
700 }
701
702 #[test]
703 fn construct_reactor_with_metadata_then_inspect_data_tree_contains_metadata() {
704 use Connectivity::Idle;
705
706 #[derive(Clone, Copy, Debug, Eq, PartialEq)]
707 #[repr(u64)]
708 enum Connectivity {
709 Idle = 1 << 0,
710 Disconnected = 1 << 1,
711 Connected = 1 << 2,
712 }
713
714 let mut executor = harness::executor_at_time_zero();
715 let (inspector, node) = harness::inspector_and_test_node();
716
717 let client = TimeMatrixClient::new(node);
718 let _reactor = event::on_data_record::<Connectivity, _>(event::map_data_record(
719 |connectivity, _| connectivity as u64,
720 event::sample_data_record(Union::<u64>::default())
721 .with_metadata(BitSetMap::from_ordered(["idle", "disconnected", "connected"]))
722 .in_time_matrix::<LastSample>(
723 &client,
724 "connectivity",
725 SamplingProfile::granular(),
726 LastSample::or(Idle as u64),
727 ),
728 ));
729
730 executor.set_fake_time(harness::TIME_ONE_SECOND);
731 assert_data_tree!(
732 @executor executor,
733 inspector,
734 root: contains {
735 event_test_node: {
736 connectivity: {
737 "type": "bitset",
738 "data": AnyBytesProperty,
739 metadata: {
740 index: {
741 "0": "idle",
742 "1": "disconnected",
743 "2": "connected",
744 }
745 }
746 },
747 },
748 }
749 );
750 }
751
752 #[test]
753 fn sample_data_record_fields_with_reactor_then_reacts_one_time_with_mapped_fields() {
754 let executor = harness::executor_at_time_zero();
755 let (_inspector, node) = harness::inspector_and_test_node();
756
757 let client = TimeMatrixClient::new(node);
758 let mut reactor = event::on_data_record::<&harness::TxCount, _>(event::then((
759 event::map_data_record(
760 |count: &harness::TxCount, _| count.failed,
761 event::then((
762 event::sample_data_record(Sum::<u64>::default())
763 .in_time_matrix::<LastSample>(
764 &client,
765 "tx_failed_sum",
766 SamplingProfile::granular(),
767 LastSample::or(0u64),
768 )
769 .assert_observes_event(Event::from_data_record(1))
770 .assert_reacts_times(1),
771 event::sample_data_record(Max::<u64>::default())
772 .in_time_matrix::<LastSample>(
773 &client,
774 "tx_failed_max",
775 SamplingProfile::granular(),
776 LastSample::or(0u64),
777 )
778 .assert_observes_event(Event::from_data_record(1))
779 .assert_reacts_times(1),
780 )),
781 ),
782 event::map_data_record(
783 |count: &harness::TxCount, _| count.retried,
784 event::sample_data_record(Sum::<u64>::default())
785 .in_time_matrix::<LastSample>(
786 &client,
787 "tx_retried_sum",
788 SamplingProfile::granular(),
789 LastSample::or(0u64),
790 )
791 .assert_observes_event(Event::from_data_record(3))
792 .assert_reacts_times(1),
793 ),
794 )));
795
796 executor.set_fake_time(harness::TIME_ONE_SECOND);
797 reactor.react_to_data_record(&harness::TxCount { failed: 1, retried: 3 }).unwrap();
798 }
799}