1mod builder;
15mod reactor;
16
17use crate::experimental::clock::Timed;
18
19pub use crate::experimental::event::builder::{SampleDataRecord, sample_data_record};
20pub use crate::experimental::event::reactor::{
21 And, AndChain, Context, Fail, FilterMapDataRecord, Inspect, IntoReactor, MapError, MapResponse,
22 Or, OrChain, Reactor, Respond, Then, ThenChain, WithState, and, fail, filter_map_data_record,
23 map_data_record, map_state, on_data_record, or, respond, then, with_state,
24};
25
26pub trait ReactorExt<T, S = ()>: Reactor<T, S> {
28 fn react_to_data_record(&mut self, record: T) -> Result<Self::Response, Self::Error>
37 where
38 S: Default,
39 {
40 self.react(Timed::now(DataEvent { record }.into()), Context::from_state(&mut S::default()))
41 }
42}
43
44impl<R, T> ReactorExt<T> for R where R: Reactor<T, ()> {}
45
46impl<T> Timed<Event<T>> {
47 pub(crate) fn to_timed_sample(&self) -> Option<Timed<T>>
48 where
49 T: Clone,
50 {
51 self.clone()
52 .map(|event| match event {
53 Event::Data(DataEvent { record, .. }) => Some(record),
54 _ => None,
55 })
56 .transpose()
57 }
58
59 pub fn as_data_record(&self) -> Option<&T> {
60 self.inner().as_data_record()
61 }
62
63 pub fn map_data_record<U, F>(self, f: F) -> Timed<Event<U>>
64 where
65 F: FnOnce(T) -> U,
66 {
67 self.map(move |event| event.map_data_record(f))
68 }
69
70 pub fn filter_map_data_record<U, F>(self, f: F) -> Option<Timed<Event<U>>>
71 where
72 F: FnOnce(T) -> Option<U>,
73 {
74 self.map_data_record(f).map(Event::transpose).transpose()
75 }
76}
77
78#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
83pub enum Event<T> {
84 System(SystemEvent),
85 Data(DataEvent<T>),
86}
87
88impl<T> Event<T> {
89 pub fn from_data_record(record: T) -> Self {
90 Event::Data(DataEvent { record })
91 }
92
93 pub fn map_data_record<U, F>(self, f: F) -> Event<U>
94 where
95 F: FnOnce(T) -> U,
96 {
97 match self {
98 Event::System(event) => Event::System(event),
99 Event::Data(event) => Event::Data(event.map(f)),
100 }
101 }
102
103 pub fn as_data_record(&self) -> Option<&T> {
104 match self {
105 Event::System(_) => None,
106 Event::Data(ref event) => Some(&event.record),
107 }
108 }
109}
110
111impl<T> Event<Option<T>> {
112 pub fn transpose(self) -> Option<Event<T>> {
113 match self {
114 Event::System(event) => Some(Event::System(event)),
115 Event::Data(event) => event.record.map(|record| Event::Data(DataEvent { record })),
116 }
117 }
118}
119
120impl<T> From<SystemEvent> for Event<T> {
121 fn from(event: SystemEvent) -> Self {
122 Event::System(event)
123 }
124}
125
126impl<T> From<DataEvent<T>> for Event<T> {
127 fn from(event: DataEvent<T>) -> Self {
128 Event::Data(event)
129 }
130}
131
132#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
142pub enum SystemEvent {
143 Suspend(SuspendEvent),
144}
145
146impl From<SuspendEvent> for SystemEvent {
147 fn from(event: SuspendEvent) -> Self {
148 SystemEvent::Suspend(event)
149 }
150}
151
152#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
154pub enum SuspendEvent {
155 Sleep,
163 Wake,
167}
168
169#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
171pub struct DataEvent<T> {
172 pub record: T,
182}
183
184impl<T> DataEvent<T> {
185 pub fn map<U, F>(self, f: F) -> DataEvent<U>
186 where
187 F: FnOnce(T) -> U,
188 {
189 DataEvent { record: f(self.record) }
190 }
191}
192
193#[cfg(test)]
194pub(crate) mod harness {
195 use fuchsia_async as fasync;
196 use fuchsia_inspect::{Inspector, Node};
197 use futures::Future;
198 use futures::task::Poll;
199 use std::fmt::Debug;
200 use std::marker::PhantomData;
201 use std::pin::Pin;
202
203 use crate::experimental::clock::Timed;
204 use crate::experimental::event::{self, Context, Event, Reactor};
205 use crate::experimental::series::SamplingProfile;
206 use crate::experimental::series::interpolation::LastSample;
207 use crate::experimental::series::statistic::{FoldError, Max, Sum};
208 use crate::experimental::serve::TimeMatrixClient;
209
210 pub const TIME_ZERO: fasync::MonotonicInstant = fasync::MonotonicInstant::from_nanos(0);
211 pub const TIME_ONE_SECOND: fasync::MonotonicInstant =
212 fasync::MonotonicInstant::from_nanos(1_000_000_000);
213
214 pub const TEST_NODE_NAME: &str = "event_test_node";
215
216 pub trait ReactorExt<T, S = ()>: Reactor<T, S> {
217 fn assert_observes_event(
226 self,
227 expected: Event<T>,
228 ) -> impl Reactor<T, S, Response = Self::Response, Error = Self::Error>
229 where
230 Self: Sized,
231 T: Clone + Debug + PartialEq,
232 {
233 #[derive(Debug)]
234 struct Assertion<T, R>
235 where
236 T: Debug,
237 {
238 reactor: R,
239 expected: Event<T>,
240 is_observed: bool,
241 }
242
243 impl<T, R> Drop for Assertion<T, R>
244 where
245 T: Debug,
246 {
247 fn drop(&mut self) {
248 assert!(
249 self.is_observed,
250 "reactor never received an expected event before drop: {:?}",
251 self.expected,
252 );
253 }
254 }
255
256 impl<T, S, R> Reactor<T, S> for Assertion<T, R>
257 where
258 T: Debug + PartialEq,
259 R: Reactor<T, S>,
260 {
261 type Response = R::Response;
262 type Error = R::Error;
263
264 fn react(
265 &mut self,
266 event: Timed<Event<T>>,
267 context: Context<'_, S>,
268 ) -> Result<Self::Response, Self::Error> {
269 if &self.expected == event.inner() {
270 self.is_observed = true;
271 }
272 self.reactor.react(event, context)
273 }
274 }
275
276 Assertion { reactor: self, expected, is_observed: false }
277 }
278
279 fn assert_reacts_times(
289 self,
290 n: usize,
291 ) -> impl Reactor<T, S, Response = Self::Response, Error = Self::Error>
292 where
293 Self: Sized,
294 {
295 #[derive(Debug)]
296 struct Assertion<T, R> {
297 reactor: R,
298 observed: usize,
299 expected: usize,
300 phantom: PhantomData<fn() -> T>,
301 }
302
303 impl<T, R> Drop for Assertion<T, R> {
304 fn drop(&mut self) {
305 assert!(
306 self.observed == self.expected,
307 "reactor received unexpected number of events on drop: \
308 observed {}, but expected {}",
309 self.observed,
310 self.expected,
311 );
312 }
313 }
314
315 impl<T, S, R> Reactor<T, S> for Assertion<T, R>
316 where
317 R: Reactor<T, S>,
318 {
319 type Response = R::Response;
320 type Error = R::Error;
321
322 fn react(
323 &mut self,
324 event: Timed<Event<T>>,
325 context: Context<'_, S>,
326 ) -> Result<Self::Response, Self::Error> {
327 self.observed =
328 self.observed.checked_add(1).expect("overflow in observed event count");
329 assert!(
330 self.observed <= self.expected,
331 "reactor received unexpected number of events before drop: \
332 observed {}, but expected {}",
333 self.observed,
334 self.expected,
335 );
336 self.reactor.react(event, context)
337 }
338 }
339
340 Assertion { reactor: self, observed: 0, expected: n, phantom: PhantomData }
341 }
342 }
343
344 impl<T, S, R> ReactorExt<T, S> for R where R: Reactor<T, S> {}
345
346 #[derive(Clone, Copy, Debug)]
348 pub struct TxCount {
349 pub failed: u64,
350 pub retried: u64,
351 }
352
353 pub fn executor_at_time_zero() -> fasync::TestExecutor {
355 let executor = fasync::TestExecutor::new_with_fake_time();
356 executor.set_fake_time(TIME_ZERO);
357 executor
358 }
359
360 pub fn inspector_and_test_node() -> (Inspector, Node) {
362 let inspector = Inspector::default();
363 let node = inspector.root().create_child(TEST_NODE_NAME);
364 (inspector, node)
365 }
366
367 pub fn sample_tx_count<'client, 'record>(
371 client: &'client TimeMatrixClient,
372 ) -> impl Reactor<&'record TxCount, (), Response = (), Error = FoldError> {
373 event::on_data_record::<&TxCount, _>(event::then((
374 event::map_data_record(
375 |count: &TxCount, _| count.failed,
376 event::then((
377 event::sample_data_record(Sum::<u64>::default()).in_time_matrix::<LastSample>(
378 &client,
379 "tx_failed_sum",
380 SamplingProfile::granular(),
381 LastSample::or(0u64),
382 ),
383 event::sample_data_record(Max::<u64>::default()).in_time_matrix::<LastSample>(
384 &client,
385 "tx_failed_max",
386 SamplingProfile::granular(),
387 LastSample::or(0u64),
388 ),
389 )),
390 ),
391 event::map_data_record(
392 |count: &TxCount, _| count.retried,
393 event::sample_data_record(Sum::<u64>::default()).in_time_matrix::<LastSample>(
394 &client,
395 "tx_retried_sum",
396 SamplingProfile::granular(),
397 LastSample::or(0u64),
398 ),
399 ),
400 )))
401 }
402
403 pub const fn respond(_: Timed<Event<()>>, _: Context<'_, ()>) -> Result<(), ()> {
405 Ok(())
406 }
407
408 pub const fn fail(_: Timed<Event<()>>, _: Context<'_, ()>) -> Result<(), ()> {
410 Err(())
411 }
412
413 pub fn assert_inspect_time_matrix_server_polls_pending(
415 executor: &mut fasync::TestExecutor,
416 server: &mut Pin<&mut impl Future>,
417 ) {
418 let Poll::Pending = executor.run_until_stalled(server) else {
419 panic!("time matrix inspection server terminated unexpectedly");
420 };
421 }
422}
423
424#[cfg(test)]
425mod tests {
426 use diagnostics_assertions::{AnyBytesProperty, assert_data_tree};
427 use std::pin::pin;
428
429 use crate::experimental::clock::Timed;
430 use crate::experimental::event::harness::{self, ReactorExt as _};
431 use crate::experimental::event::{
432 self, Context, DataEvent, Event, Reactor, ReactorExt as _, SuspendEvent, SystemEvent,
433 };
434 use crate::experimental::series::SamplingProfile;
435 use crate::experimental::series::interpolation::LastSample;
436 use crate::experimental::series::metadata::BitSetMap;
437 use crate::experimental::series::statistic::{Max, Sum, Union};
438 use crate::experimental::serve;
439
440 #[test]
441 #[should_panic]
442 fn observes_event_assertion_observes_no_such_event_then_panics() {
443 let _executor = harness::executor_at_time_zero();
444
445 let mut reactor = harness::respond.assert_observes_event(Event::from_data_record(()));
446 let _ = reactor.react(
447 Timed::now(SystemEvent::Suspend(SuspendEvent::Sleep).into()),
448 Context::from_state(&mut ()),
449 );
450 }
451
452 #[test]
453 #[should_panic]
454 fn reacts_times_assertion_reacts_too_few_times_then_panics() {
455 let _executor = harness::executor_at_time_zero();
456
457 let mut reactor = harness::respond.assert_reacts_times(2);
458 let _ = reactor.react_to_data_record(());
459 }
460
461 #[test]
462 #[should_panic]
463 fn reacts_times_assertion_reacts_too_many_times_then_panics() {
464 let _executor = harness::executor_at_time_zero();
465
466 let mut reactor = harness::respond.assert_reacts_times(1);
467 let _ = reactor.react_to_data_record(());
468 let _ = reactor.react_to_data_record(());
469 }
470
471 #[test]
472 fn then_combinator_reacts_then_subsequent_reacts_on_ok_and_err() {
473 let _executor = harness::executor_at_time_zero();
474
475 let mut reactor =
476 harness::respond.assert_reacts_times(1).then(harness::respond.assert_reacts_times(1));
477 let _ = reactor.react_to_data_record(());
478
479 let mut reactor =
480 harness::fail.assert_reacts_times(1).then(harness::respond.assert_reacts_times(1));
481 let _ = reactor.react_to_data_record(());
482
483 let mut reactor = event::then((
484 harness::respond.assert_reacts_times(1),
485 harness::fail.assert_reacts_times(1),
486 harness::respond.assert_reacts_times(1),
487 ));
488 let _ = reactor.react_to_data_record(());
489 }
490
491 #[test]
492 fn and_combinator_reacts_then_subsequent_reacts_only_on_ok() {
493 let _executor = harness::executor_at_time_zero();
494
495 let mut reactor =
496 harness::respond.assert_reacts_times(1).and(harness::respond.assert_reacts_times(1));
497 let _ = reactor.react_to_data_record(());
498
499 let mut reactor =
500 harness::fail.assert_reacts_times(1).and(harness::respond.assert_reacts_times(0));
501 let _ = reactor.react_to_data_record(());
502
503 let mut reactor = event::and((
504 harness::respond.assert_reacts_times(1),
505 harness::fail.assert_reacts_times(1),
506 harness::respond.assert_reacts_times(0),
507 ));
508 let _ = reactor.react_to_data_record(());
509 }
510
511 #[test]
512 fn or_combinator_reacts_then_subsequent_reacts_only_on_err() {
513 let _executor = harness::executor_at_time_zero();
514
515 let mut reactor =
516 harness::respond.assert_reacts_times(1).or(harness::respond.assert_reacts_times(0));
517 let _ = reactor.react_to_data_record(());
518
519 let mut reactor =
520 harness::fail.assert_reacts_times(1).or(harness::fail.assert_reacts_times(1));
521 let _ = reactor.react_to_data_record(());
522
523 let mut reactor = event::or((
524 harness::fail.assert_reacts_times(1),
525 harness::respond.assert_reacts_times(1),
526 harness::respond.assert_reacts_times(0),
527 ));
528 let _ = reactor.react_to_data_record(());
529 }
530
531 #[test]
532 fn map_data_record_then_subtree_reacts_to_mapped_record() {
533 let _executor = harness::executor_at_time_zero();
534
535 #[derive(Debug, Eq, PartialEq)]
536 struct Thread {
537 nominal: u128,
538 tpi: u128,
539 }
540
541 let thread = Thread { nominal: 1, tpi: 8 };
542 let mut observed = None;
543 let mut reactor = event::on_data_record::<&Thread, _>(event::map_data_record(
544 |thread: &Thread, _| &thread.tpi,
545 |event: Timed<Event<&u128>>, _: Context<'_, ()>| {
546 let (_, event) = event.into();
547 if let Event::Data(DataEvent { record: tpi, .. }) = event {
548 observed = Some(*tpi);
549 }
550 Ok::<_, ()>(())
551 },
552 ));
553 let _ = reactor.react_to_data_record(&thread);
554 assert_eq!(observed, Some(8));
555 }
556
557 #[test]
558 fn retain_record_with_filter_map_data_record_then_subtree_reacts_to_mapped_record() {
559 const RECORD: i8 = 0;
560
561 let _executor = harness::executor_at_time_zero();
562
563 let mut observed = None;
564 let mut reactor = event::on_data_record::<usize, _>(event::filter_map_data_record(
565 |_: usize, _| Some(RECORD),
567 |event: Timed<Event<i8>>, _: Context<'_, ()>| {
568 let (_, event) = event.into();
569 if let Event::Data(DataEvent { record, .. }) = event {
570 observed = Some(record);
571 }
572 Ok::<_, ()>(())
573 },
574 ));
575 let _ = reactor.react_to_data_record(0usize);
576 assert_eq!(observed, Some(RECORD));
577 }
578
579 #[test]
580 fn discard_record_with_filter_map_data_record_then_subtree_does_not_react_to_mapped_record() {
581 let _executor = harness::executor_at_time_zero();
582
583 let mut reactor = event::on_data_record::<usize, _>(event::filter_map_data_record(
584 |_: usize, _| None::<()>,
586 harness::respond.assert_reacts_times(0),
587 ));
588 let _ = reactor.react_to_data_record(0usize);
589 }
590
591 #[test]
594 fn discard_record_with_filter_map_data_record_then_subtree_reacts_to_system_event() {
595 const SYSTEM_EVENT: SystemEvent = SystemEvent::Suspend(SuspendEvent::Sleep);
596
597 let _executor = harness::executor_at_time_zero();
598
599 let mut observed = None;
600 let mut reactor = event::on_data_record::<usize, _>(event::filter_map_data_record(
601 |_: usize, _| None::<i8>,
603 |event: Timed<Event<i8>>, _: Context<'_, ()>| {
604 let (_, event) = event.into();
605 if let Event::System(event) = event {
606 observed = Some(event);
607 }
608 Ok::<_, ()>(())
609 },
610 ));
611 let _ = reactor.react(Timed::now(SYSTEM_EVENT.into()), Context::from_state(&mut ()));
612 assert_eq!(observed, Some(SYSTEM_EVENT));
614 }
615
616 #[test]
617 fn with_state_then_subtree_reacts_to_state() {
618 let _executor = harness::executor_at_time_zero();
619
620 #[derive(Debug, Eq, PartialEq)]
621 struct ReactorState {
622 n: u128,
623 }
624
625 let mut observed = None;
626 let mut reactor = event::on_data_record::<(), _>(event::with_state(
627 ReactorState { n: 8 },
628 |_: Timed<Event<()>>, context: Context<'_, ReactorState>| {
629 observed = Some(context.state.n);
630 Ok::<_, ()>(())
631 },
632 ));
633 let _ = reactor.react_to_data_record(());
634 assert_eq!(observed, Some(8));
635 }
636
637 #[test]
638 fn write_state_then_subtree_reacts_to_written_state() {
639 let _executor = harness::executor_at_time_zero();
640
641 let mut reactor = event::on_data_record::<(), _>(event::with_state(
642 String::from("hello"),
643 event::then((
644 {
645 |_: Timed<Event<()>>, context: Context<'_, String>| {
646 assert_eq!(context.state, "hello");
647 *context.state = String::from("goodbye");
648 Ok::<_, ()>(())
649 }
650 }
651 .assert_reacts_times(1),
652 {
653 |_: Timed<Event<()>>, context: Context<'_, String>| {
654 assert_eq!(context.state, "goodbye");
655 Ok::<_, ()>(())
656 }
657 }
658 .assert_reacts_times(1),
659 )),
660 ));
661 let _ = reactor.react_to_data_record(());
662 }
663
664 #[test]
665 fn map_state_then_subtree_reacts_to_mapped_state() {
666 let _executor = harness::executor_at_time_zero();
667
668 #[derive(Debug, Eq, PartialEq)]
669 struct ReactorState {
670 n: u128,
671 }
672
673 let mut observed = None;
674 let mut reactor = event::on_data_record::<(), _>(event::map_state(
675 |_| ReactorState { n: 8 },
676 |_: Timed<Event<()>>, context: Context<'_, ReactorState>| {
677 observed = Some(context.state.n);
678 Ok::<_, ()>(())
679 },
680 ));
681 let _ = reactor.react_to_data_record(());
682 assert_eq!(observed, Some(8));
683 }
684
685 #[test]
686 fn construct_reactor_with_samplers_then_inspect_data_tree_contains_buffers() {
687 let mut executor = harness::executor_at_time_zero();
688 let (inspector, node) = harness::inspector_and_test_node();
689
690 let (client, server) = serve::serve_time_matrix_inspection(node);
691 let mut server = pin!(server);
692 let _reactor = harness::sample_tx_count(&client);
693
694 executor.set_fake_time(harness::TIME_ONE_SECOND);
695 harness::assert_inspect_time_matrix_server_polls_pending(&mut executor, &mut server);
696 assert_data_tree!(
697 @executor executor,
698 inspector,
699 root: contains {
700 event_test_node: {
701 tx_failed_sum: {
702 "type": "gauge",
703 "data": AnyBytesProperty,
704 },
705 tx_failed_max: {
706 "type": "gauge",
707 "data": AnyBytesProperty,
708 },
709 tx_retried_sum: {
710 "type": "gauge",
711 "data": AnyBytesProperty,
712 },
713 },
714 }
715 );
716 }
717
718 #[test]
719 fn construct_reactor_with_metadata_then_inspect_data_tree_contains_metadata() {
720 use Connectivity::Idle;
721
722 #[derive(Clone, Copy, Debug, Eq, PartialEq)]
723 #[repr(u64)]
724 enum Connectivity {
725 Idle = 1 << 0,
726 Disconnected = 1 << 1,
727 Connected = 1 << 2,
728 }
729
730 let mut executor = harness::executor_at_time_zero();
731 let (inspector, node) = harness::inspector_and_test_node();
732
733 let (client, server) = serve::serve_time_matrix_inspection(node);
734 let mut server = pin!(server);
735 let _reactor = event::on_data_record::<Connectivity, _>(event::map_data_record(
736 |connectivity, _| connectivity as u64,
737 event::sample_data_record(Union::<u64>::default())
738 .with_metadata(BitSetMap::from_ordered(["idle", "disconnected", "connected"]))
739 .in_time_matrix::<LastSample>(
740 &client,
741 "connectivity",
742 SamplingProfile::granular(),
743 LastSample::or(Idle as u64),
744 ),
745 ));
746
747 executor.set_fake_time(harness::TIME_ONE_SECOND);
748 harness::assert_inspect_time_matrix_server_polls_pending(&mut executor, &mut server);
749 assert_data_tree!(
750 @executor executor,
751 inspector,
752 root: contains {
753 event_test_node: {
754 connectivity: {
755 "type": "bitset",
756 "data": AnyBytesProperty,
757 metadata: {
758 index: {
759 "0": "idle",
760 "1": "disconnected",
761 "2": "connected",
762 }
763 }
764 },
765 },
766 }
767 );
768 }
769
770 #[test]
771 fn sample_data_record_fields_with_reactor_then_reacts_one_time_with_mapped_fields() {
772 let executor = harness::executor_at_time_zero();
773 let (_inspector, node) = harness::inspector_and_test_node();
774
775 let (client, _server) = serve::serve_time_matrix_inspection(node);
776 let mut reactor = event::on_data_record::<&harness::TxCount, _>(event::then((
777 event::map_data_record(
778 |count: &harness::TxCount, _| count.failed,
779 event::then((
780 event::sample_data_record(Sum::<u64>::default())
781 .in_time_matrix::<LastSample>(
782 &client,
783 "tx_failed_sum",
784 SamplingProfile::granular(),
785 LastSample::or(0u64),
786 )
787 .assert_observes_event(Event::from_data_record(1))
788 .assert_reacts_times(1),
789 event::sample_data_record(Max::<u64>::default())
790 .in_time_matrix::<LastSample>(
791 &client,
792 "tx_failed_max",
793 SamplingProfile::granular(),
794 LastSample::or(0u64),
795 )
796 .assert_observes_event(Event::from_data_record(1))
797 .assert_reacts_times(1),
798 )),
799 ),
800 event::map_data_record(
801 |count: &harness::TxCount, _| count.retried,
802 event::sample_data_record(Sum::<u64>::default())
803 .in_time_matrix::<LastSample>(
804 &client,
805 "tx_retried_sum",
806 SamplingProfile::granular(),
807 LastSample::or(0u64),
808 )
809 .assert_observes_event(Event::from_data_record(3))
810 .assert_reacts_times(1),
811 ),
812 )));
813
814 executor.set_fake_time(harness::TIME_ONE_SECOND);
815 reactor.react_to_data_record(&harness::TxCount { failed: 1, retried: 3 }).unwrap();
816 }
817}