windowed_stats/experimental/event/
mod.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Event reactors and combinators.
6//!
7//! This module provides APIs for constructing [`Reactor`] types that respond to [state and data
8//! events][`Event`] by configuring and sampling data with [time matrices][`TimeMatrix`].
9//!
10//! [`Event`]: crate::experimental::event::Event
11//! [`Reactor`]: crate::experimental::event::Reactor
12//! [`TimeMatrix`]: crate::experimental::series::TimeMatrix
13
14mod builder;
15mod reactor;
16
17use crate::experimental::clock::Timed;
18
19pub use crate::experimental::event::builder::{SampleDataRecord, sample_data_record};
20pub use crate::experimental::event::reactor::{
21    And, AndChain, Context, Fail, FilterMapDataRecord, Inspect, IntoReactor, MapError, MapResponse,
22    Or, OrChain, Reactor, Respond, Then, ThenChain, WithState, and, fail, filter_map_data_record,
23    map_data_record, map_state, on_data_record, or, respond, then, with_state,
24};
25
26/// Extension methods for [`Reactor`] types.
27pub trait ReactorExt<T, S = ()>: Reactor<T, S> {
28    /// Reacts to a [data record][`DataEvent::record`].
29    ///
30    /// This function constructs and [reacts to][`Reactor::react`] a data event with the given
31    /// record at [`Timestamp::now`].
32    ///
33    /// [`DataEvent::record`]: crate::experimental::event::DataEvent::record
34    /// [`Reactor::react`]: crate::experimental::event::Reactor::react
35    /// [`Timestamp::now`]: crate::experimental::clock::Timed
36    fn react_to_data_record(&mut self, record: T) -> Result<Self::Response, Self::Error>
37    where
38        S: Default,
39    {
40        self.react(Timed::now(DataEvent { record }.into()), Context::from_state(&mut S::default()))
41    }
42}
43
// Blanket implementation: every `Reactor` over the unit state `()` gets the `ReactorExt`
// extension methods.
impl<R, T> ReactorExt<T> for R where R: Reactor<T, ()> {}
45
46impl<T> Timed<Event<T>> {
47    pub(crate) fn to_timed_sample(&self) -> Option<Timed<T>>
48    where
49        T: Clone,
50    {
51        self.clone()
52            .map(|event| match event {
53                Event::Data(DataEvent { record, .. }) => Some(record),
54                _ => None,
55            })
56            .transpose()
57    }
58
59    pub fn as_data_record(&self) -> Option<&T> {
60        self.inner().as_data_record()
61    }
62
63    pub fn map_data_record<U, F>(self, f: F) -> Timed<Event<U>>
64    where
65        F: FnOnce(T) -> U,
66    {
67        self.map(move |event| event.map_data_record(f))
68    }
69
70    pub fn filter_map_data_record<U, F>(self, f: F) -> Option<Timed<Event<U>>>
71    where
72        F: FnOnce(T) -> Option<U>,
73    {
74        self.map_data_record(f).map(Event::transpose).transpose()
75    }
76}
77
/// An event that describes a change to [the environment][`SystemEvent`] or the arrival of a [data
/// record][`DataEvent::record`].
///
/// The type parameter `T` is the type of the data record carried by [`Event::Data`].
///
/// [`DataEvent::record`]: crate::experimental::event::DataEvent::record
/// [`Event::Data`]: crate::experimental::event::Event::Data
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum Event<T> {
    /// A change to the environment, described by a [`SystemEvent`].
    System(SystemEvent),
    /// The arrival of a data record of type `T`, carried by a [`DataEvent`].
    Data(DataEvent<T>),
}
87
88impl<T> Event<T> {
89    pub fn from_data_record(record: T) -> Self {
90        Event::Data(DataEvent { record })
91    }
92
93    pub fn map_data_record<U, F>(self, f: F) -> Event<U>
94    where
95        F: FnOnce(T) -> U,
96    {
97        match self {
98            Event::System(event) => Event::System(event),
99            Event::Data(event) => Event::Data(event.map(f)),
100        }
101    }
102
103    pub fn as_data_record(&self) -> Option<&T> {
104        match self {
105            Event::System(_) => None,
106            Event::Data(ref event) => Some(&event.record),
107        }
108    }
109}
110
111impl<T> Event<Option<T>> {
112    pub fn transpose(self) -> Option<Event<T>> {
113        match self {
114            Event::System(event) => Some(Event::System(event)),
115            Event::Data(event) => event.record.map(|record| Event::Data(DataEvent { record })),
116        }
117    }
118}
119
120impl<T> From<SystemEvent> for Event<T> {
121    fn from(event: SystemEvent) -> Self {
122        Event::System(event)
123    }
124}
125
126impl<T> From<DataEvent<T>> for Event<T> {
127    fn from(event: DataEvent<T>) -> Self {
128        Event::Data(event)
129    }
130}
131
/// Describes a change to the environment that may require reconfiguration.
///
/// System events may change the behavior of a [`Reactor`]. For example, some [`Reactor`]s that
/// configure a [`TimeMatrix`] may apply an alternative interpolation when a [`Sleep`] event is
/// received.
///
/// [`Reactor`]: crate::experimental::event::Reactor
/// [`Sleep`]: crate::experimental::event::SuspendEvent::Sleep
/// [`TimeMatrix`]: crate::experimental::series::TimeMatrix
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum SystemEvent {
    /// The system is entering or exiting a mode of suspended execution, as described by the
    /// wrapped [`SuspendEvent`].
    Suspend(SuspendEvent),
}
145
146impl From<SuspendEvent> for SystemEvent {
147    fn from(event: SuspendEvent) -> Self {
148        SystemEvent::Suspend(event)
149    }
150}
151
/// Describes entering and exiting a mode of suspended execution.
///
/// A suspend event is delivered to reactors wrapped in [`SystemEvent::Suspend`].
///
/// [`SystemEvent::Suspend`]: crate::experimental::event::SystemEvent::Suspend
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum SuspendEvent {
    /// Indicates that the system is entering a mode that suspends execution.
    ///
    /// This event describes a state in which any system clock is inactive. On [`Wake`], there may
    /// be an arbitrarily large difference between [`Timestamp::now`] before and after suspension.
    ///
    /// [`Timestamp::now`]: crate::experimental::clock::Timestamp::now
    /// [`Wake`]: crate::experimental::event::SuspendEvent::Wake
    Sleep,
    /// Indicates that the system has exited [`Sleep`].
    ///
    /// [`Sleep`]: crate::experimental::event::SuspendEvent::Sleep
    Wake,
}
168
/// Describes an arbitrary event with associated data of interest.
///
/// The type parameter `T` is the type of the [record][`DataEvent::record`].
///
/// [`DataEvent::record`]: crate::experimental::event::DataEvent::record
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct DataEvent<T> {
    /// The record associated with the event.
    ///
    /// This type typically describes information or metrics associated with the event and can be
    /// sampled by a [`Reactor`] using a [`TimeMatrix`] via combinators like
    /// [`sample_data_record`].
    ///
    /// [`Reactor`]: crate::experimental::event::Reactor
    /// [`sample_data_record`]: crate::experimental::event::sample_data_record
    /// [`TimeMatrix`]: crate::experimental::series::TimeMatrix
    pub record: T,
}
183
184impl<T> DataEvent<T> {
185    pub fn map<U, F>(self, f: F) -> DataEvent<U>
186    where
187        F: FnOnce(T) -> U,
188    {
189        DataEvent { record: f(self.record) }
190    }
191}
192
193#[cfg(test)]
194pub(crate) mod harness {
195    use fuchsia_async as fasync;
196    use fuchsia_inspect::{Inspector, Node};
197    use std::fmt::Debug;
198    use std::marker::PhantomData;
199
200    use crate::experimental::clock::Timed;
201    use crate::experimental::event::{self, Context, Event, Reactor};
202    use crate::experimental::inspect::TimeMatrixClient;
203    use crate::experimental::series::SamplingProfile;
204    use crate::experimental::series::interpolation::LastSample;
205    use crate::experimental::series::statistic::{FoldError, Max, Sum};
206
    /// The monotonic instant at zero nanoseconds.
    pub const TIME_ZERO: fasync::MonotonicInstant = fasync::MonotonicInstant::from_nanos(0);
    /// The monotonic instant at 1,000,000,000 nanoseconds (one second).
    pub const TIME_ONE_SECOND: fasync::MonotonicInstant =
        fasync::MonotonicInstant::from_nanos(1_000_000_000);

    /// The name of the Inspect child node created by `inspector_and_test_node`.
    pub const TEST_NODE_NAME: &str = "event_test_node";
212
213    pub trait ReactorExt<T, S = ()>: Reactor<T, S> {
214        /// Asserts that `self` observes the given [`Event`] at least once.
215        ///
216        /// If `self` is leaked, then this function asserts nothing.
217        ///
218        /// # Panics
219        ///
220        /// This function panics if `self` has not observed an event that is partially equivalent
221        /// to `expected` when dropped.
222        fn assert_observes_event(
223            self,
224            expected: Event<T>,
225        ) -> impl Reactor<T, S, Response = Self::Response, Error = Self::Error>
226        where
227            Self: Sized,
228            T: Clone + Debug + PartialEq,
229        {
230            #[derive(Debug)]
231            struct Assertion<T, R>
232            where
233                T: Debug,
234            {
235                reactor: R,
236                expected: Event<T>,
237                is_observed: bool,
238            }
239
240            impl<T, R> Drop for Assertion<T, R>
241            where
242                T: Debug,
243            {
244                fn drop(&mut self) {
245                    assert!(
246                        self.is_observed,
247                        "reactor never received an expected event before drop: {:?}",
248                        self.expected,
249                    );
250                }
251            }
252
253            impl<T, S, R> Reactor<T, S> for Assertion<T, R>
254            where
255                T: Debug + PartialEq,
256                R: Reactor<T, S>,
257            {
258                type Response = R::Response;
259                type Error = R::Error;
260
261                fn react(
262                    &mut self,
263                    event: Timed<Event<T>>,
264                    context: Context<'_, S>,
265                ) -> Result<Self::Response, Self::Error> {
266                    if &self.expected == event.inner() {
267                        self.is_observed = true;
268                    }
269                    self.reactor.react(event, context)
270                }
271            }
272
273            Assertion { reactor: self, expected, is_observed: false }
274        }
275
276        /// Asserts that `self` reacts to events exactly `n` times.
277        ///
278        /// If `self` is leaked, then this function only asserts that `self` reacts to no more than
279        /// `n` events.
280        ///
281        /// # Panics
282        ///
283        /// This function panics if `self` reacts to more than `n` events or fewer than `n` events
284        /// when dropped.
285        fn assert_reacts_times(
286            self,
287            n: usize,
288        ) -> impl Reactor<T, S, Response = Self::Response, Error = Self::Error>
289        where
290            Self: Sized,
291        {
292            #[derive(Debug)]
293            struct Assertion<T, R> {
294                reactor: R,
295                observed: usize,
296                expected: usize,
297                phantom: PhantomData<fn() -> T>,
298            }
299
300            impl<T, R> Drop for Assertion<T, R> {
301                fn drop(&mut self) {
302                    assert!(
303                        self.observed == self.expected,
304                        "reactor received unexpected number of events on drop: \
305                         observed {}, but expected {}",
306                        self.observed,
307                        self.expected,
308                    );
309                }
310            }
311
312            impl<T, S, R> Reactor<T, S> for Assertion<T, R>
313            where
314                R: Reactor<T, S>,
315            {
316                type Response = R::Response;
317                type Error = R::Error;
318
319                fn react(
320                    &mut self,
321                    event: Timed<Event<T>>,
322                    context: Context<'_, S>,
323                ) -> Result<Self::Response, Self::Error> {
324                    self.observed =
325                        self.observed.checked_add(1).expect("overflow in observed event count");
326                    assert!(
327                        self.observed <= self.expected,
328                        "reactor received unexpected number of events before drop: \
329                         observed {}, but expected {}",
330                        self.observed,
331                        self.expected,
332                    );
333                    self.reactor.react(event, context)
334                }
335            }
336
337            Assertion { reactor: self, observed: 0, expected: n, phantom: PhantomData }
338        }
339    }
340
    // Blanket implementation: every `Reactor` gets the assertion combinators, whatever its data
    // record type `T` and state type `S`.
    impl<T, S, R> ReactorExt<T, S> for R where R: Reactor<T, S> {}
342
    /// A data record with counts of transmission outcomes.
    #[derive(Clone, Copy, Debug)]
    pub struct TxCount {
        /// The count sampled by `sample_tx_count` into the `tx_failed_sum` and `tx_failed_max`
        /// time matrices.
        pub failed: u64,
        /// The count sampled by `sample_tx_count` into the `tx_retried_sum` time matrix.
        pub retried: u64,
    }
349
    /// Constructs an executor with its clock set to time zero.
    ///
    /// The executor is created with fake time, and that fake time is set to `TIME_ZERO` before
    /// the executor is returned.
    pub fn executor_at_time_zero() -> fasync::TestExecutor {
        let executor = fasync::TestExecutor::new_with_fake_time();
        executor.set_fake_time(TIME_ZERO);
        executor
    }
356
    /// Constructs an inspector and child node with the name defined by `TEST_NODE_NAME`.
    ///
    /// The node is created as a child of the inspector's root. Both are returned so that a test
    /// can write through the node and read the resulting tree back from the inspector.
    pub fn inspector_and_test_node() -> (Inspector, Node) {
        let inspector = Inspector::default();
        let node = inspector.root().create_child(TEST_NODE_NAME);
        (inspector, node)
    }
363
364    // This function demonstrates how `Reactor`s can be parameterized and returned from functions.
365    // Such `Reactor`s can be further composed as needed.
366    /// Constructs a `Reactor` that samples `TxCount` fields.
367    pub fn sample_tx_count<'client, 'record>(
368        client: &'client TimeMatrixClient,
369    ) -> impl Reactor<&'record TxCount, (), Response = (), Error = FoldError> {
370        event::on_data_record::<&TxCount, _>(event::then((
371            event::map_data_record(
372                |count: &TxCount, _| count.failed,
373                event::then((
374                    event::sample_data_record(Sum::<u64>::default()).in_time_matrix::<LastSample>(
375                        &client,
376                        "tx_failed_sum",
377                        SamplingProfile::granular(),
378                        LastSample::or(0u64),
379                    ),
380                    event::sample_data_record(Max::<u64>::default()).in_time_matrix::<LastSample>(
381                        &client,
382                        "tx_failed_max",
383                        SamplingProfile::granular(),
384                        LastSample::or(0u64),
385                    ),
386                )),
387            ),
388            event::map_data_record(
389                |count: &TxCount, _| count.retried,
390                event::sample_data_record(Sum::<u64>::default()).in_time_matrix::<LastSample>(
391                    &client,
392                    "tx_retried_sum",
393                    SamplingProfile::granular(),
394                    LastSample::or(0u64),
395                ),
396            ),
397        )))
398    }
399
    /// A `Reactor` of only the unit type `()` that always responds with `Ok`.
    ///
    /// Both the event and the context are ignored.
    pub const fn respond(_: Timed<Event<()>>, _: Context<'_, ()>) -> Result<(), ()> {
        Ok(())
    }
404
    /// A `Reactor` of only the unit type `()` that always fails with `Err`.
    ///
    /// Both the event and the context are ignored.
    pub const fn fail(_: Timed<Event<()>>, _: Context<'_, ()>) -> Result<(), ()> {
        Err(())
    }
409}
410
411#[cfg(test)]
412mod tests {
413    use diagnostics_assertions::{AnyBytesProperty, assert_data_tree};
414
415    use crate::experimental::clock::Timed;
416    use crate::experimental::event::harness::{self, ReactorExt as _};
417    use crate::experimental::event::{
418        self, Context, DataEvent, Event, Reactor, ReactorExt as _, SuspendEvent, SystemEvent,
419    };
420    use crate::experimental::inspect::TimeMatrixClient;
421    use crate::experimental::series::SamplingProfile;
422    use crate::experimental::series::interpolation::LastSample;
423    use crate::experimental::series::metadata::BitSetMap;
424    use crate::experimental::series::statistic::{Max, Sum, Union};
425
    // Checks the harness itself: `assert_observes_event` must fail the test when the expected
    // event is never delivered.
    #[test]
    #[should_panic]
    fn observes_event_assertion_observes_no_such_event_then_panics() {
        let _executor = harness::executor_at_time_zero();

        // A unit data record is expected, but only a system (sleep) event is delivered.
        let mut reactor = harness::respond.assert_observes_event(Event::from_data_record(()));
        let _ = reactor.react(
            Timed::now(SystemEvent::Suspend(SuspendEvent::Sleep).into()),
            Context::from_state(&mut ()),
        );
        // The expected panic comes from the assertion's `Drop` when `reactor` leaves scope here.
    }
437
    // Checks the harness itself: `assert_reacts_times` must fail the test when the reactor has
    // reacted fewer than `n` times by the time it is dropped.
    #[test]
    #[should_panic]
    fn reacts_times_assertion_reacts_too_few_times_then_panics() {
        let _executor = harness::executor_at_time_zero();

        // Two reactions are expected, but only one data record is delivered.
        let mut reactor = harness::respond.assert_reacts_times(2);
        let _ = reactor.react_to_data_record(());
    }
446
    // Checks the harness itself: `assert_reacts_times` must fail the test when the reactor
    // reacts more than `n` times.
    #[test]
    #[should_panic]
    fn reacts_times_assertion_reacts_too_many_times_then_panics() {
        let _executor = harness::executor_at_time_zero();

        // One reaction is expected; the second delivery exceeds the count while reacting.
        let mut reactor = harness::respond.assert_reacts_times(1);
        let _ = reactor.react_to_data_record(());
        let _ = reactor.react_to_data_record(());
    }
456
    // `then` must forward the event to every reactor, whether the preceding reactor responds
    // with `Ok` or `Err`. The counts are checked when each `reactor` is dropped at the end of the
    // test.
    #[test]
    fn then_combinator_reacts_then_subsequent_reacts_on_ok_and_err() {
        let _executor = harness::executor_at_time_zero();

        // `Ok` followed by a second reactor: both react.
        let mut reactor =
            harness::respond.assert_reacts_times(1).then(harness::respond.assert_reacts_times(1));
        let _ = reactor.react_to_data_record(());

        // `Err` followed by a second reactor: both still react.
        let mut reactor =
            harness::fail.assert_reacts_times(1).then(harness::respond.assert_reacts_times(1));
        let _ = reactor.react_to_data_record(());

        // The tuple form behaves the same way across three reactors.
        let mut reactor = event::then((
            harness::respond.assert_reacts_times(1),
            harness::fail.assert_reacts_times(1),
            harness::respond.assert_reacts_times(1),
        ));
        let _ = reactor.react_to_data_record(());
    }
476
    // `and` must forward the event to the next reactor only when the preceding reactor responds
    // with `Ok`. The counts are checked when each `reactor` is dropped at the end of the test.
    #[test]
    fn and_combinator_reacts_then_subsequent_reacts_only_on_ok() {
        let _executor = harness::executor_at_time_zero();

        // `Ok` first: the second reactor reacts.
        let mut reactor =
            harness::respond.assert_reacts_times(1).and(harness::respond.assert_reacts_times(1));
        let _ = reactor.react_to_data_record(());

        // `Err` first: the second reactor must not react.
        let mut reactor =
            harness::fail.assert_reacts_times(1).and(harness::respond.assert_reacts_times(0));
        let _ = reactor.react_to_data_record(());

        // Tuple form: the chain stops after the first `Err`.
        let mut reactor = event::and((
            harness::respond.assert_reacts_times(1),
            harness::fail.assert_reacts_times(1),
            harness::respond.assert_reacts_times(0),
        ));
        let _ = reactor.react_to_data_record(());
    }
496
    // `or` must forward the event to the next reactor only when the preceding reactor responds
    // with `Err`. The counts are checked when each `reactor` is dropped at the end of the test.
    #[test]
    fn or_combinator_reacts_then_subsequent_reacts_only_on_err() {
        let _executor = harness::executor_at_time_zero();

        // `Ok` first: the second reactor must not react.
        let mut reactor =
            harness::respond.assert_reacts_times(1).or(harness::respond.assert_reacts_times(0));
        let _ = reactor.react_to_data_record(());

        // `Err` first: the second reactor reacts.
        let mut reactor =
            harness::fail.assert_reacts_times(1).or(harness::fail.assert_reacts_times(1));
        let _ = reactor.react_to_data_record(());

        // Tuple form: the chain stops after the first `Ok`.
        let mut reactor = event::or((
            harness::fail.assert_reacts_times(1),
            harness::respond.assert_reacts_times(1),
            harness::respond.assert_reacts_times(0),
        ));
        let _ = reactor.react_to_data_record(());
    }
516
    // `map_data_record` must hand the mapped record (here, a borrowed field of the original
    // record) to the reactor beneath it.
    #[test]
    fn map_data_record_then_subtree_reacts_to_mapped_record() {
        let _executor = harness::executor_at_time_zero();

        #[derive(Debug, Eq, PartialEq)]
        struct Thread {
            nominal: u128,
            tpi: u128,
        }

        let thread = Thread { nominal: 1, tpi: 8 };
        // Written by the innermost reactor when it sees a data event.
        let mut observed = None;
        let mut reactor = event::on_data_record::<&Thread, _>(event::map_data_record(
            |thread: &Thread, _| &thread.tpi,
            |event: Timed<Event<&u128>>, _: Context<'_, ()>| {
                let (_, event) = event.into();
                if let Event::Data(DataEvent { record: tpi, .. }) = event {
                    observed = Some(*tpi);
                }
                Ok::<_, ()>(())
            },
        ));
        let _ = reactor.react_to_data_record(&thread);
        // The inner reactor saw `tpi` (8), not the whole `Thread`.
        assert_eq!(observed, Some(8));
    }
542
    // When the `filter_map_data_record` function returns `Some`, the reactor beneath it must
    // react to the mapped record.
    #[test]
    fn retain_record_with_filter_map_data_record_then_subtree_reacts_to_mapped_record() {
        const RECORD: i8 = 0;

        let _executor = harness::executor_at_time_zero();

        // Written by the innermost reactor when it sees a data event.
        let mut observed = None;
        let mut reactor = event::on_data_record::<usize, _>(event::filter_map_data_record(
            // Ignore the `usize` data record and map to `Some` constant `i8`.
            |_: usize, _| Some(RECORD),
            |event: Timed<Event<i8>>, _: Context<'_, ()>| {
                let (_, event) = event.into();
                if let Event::Data(DataEvent { record, .. }) = event {
                    observed = Some(record);
                }
                Ok::<_, ()>(())
            },
        ));
        let _ = reactor.react_to_data_record(0usize);
        assert_eq!(observed, Some(RECORD));
    }
564
    // When the `filter_map_data_record` function returns `None`, the reactor beneath it must not
    // react to the data event at all. The zero count is checked when `reactor` is dropped.
    #[test]
    fn discard_record_with_filter_map_data_record_then_subtree_does_not_react_to_mapped_record() {
        let _executor = harness::executor_at_time_zero();

        let mut reactor = event::on_data_record::<usize, _>(event::filter_map_data_record(
            // Ignore the `usize` data record, map to `()`, and return `None`.
            |_: usize, _| None::<()>,
            harness::respond.assert_reacts_times(0),
        ));
        let _ = reactor.react_to_data_record(0usize);
    }
576
    // It is important that discarding data events does not interfere with the observation of
    // system events.
    #[test]
    fn discard_record_with_filter_map_data_record_then_subtree_reacts_to_system_event() {
        const SYSTEM_EVENT: SystemEvent = SystemEvent::Suspend(SuspendEvent::Sleep);

        let _executor = harness::executor_at_time_zero();

        // Written by the innermost reactor when it sees a system event.
        let mut observed = None;
        let mut reactor = event::on_data_record::<usize, _>(event::filter_map_data_record(
            // Ignore the `usize` data record, map to `i8`, and return `None`.
            |_: usize, _| None::<i8>,
            |event: Timed<Event<i8>>, _: Context<'_, ()>| {
                let (_, event) = event.into();
                if let Event::System(event) = event {
                    observed = Some(event);
                }
                Ok::<_, ()>(())
            },
        ));
        // Deliver a system event directly; `react_to_data_record` only constructs data events.
        let _ = reactor.react(Timed::now(SYSTEM_EVENT.into()), Context::from_state(&mut ()));
        // Despite discarding any and all data records, the system event must be observed.
        assert_eq!(observed, Some(SYSTEM_EVENT));
    }
601
    // `with_state` must make the given state available through the `Context` of the reactor
    // beneath it.
    #[test]
    fn with_state_then_subtree_reacts_to_state() {
        let _executor = harness::executor_at_time_zero();

        #[derive(Debug, Eq, PartialEq)]
        struct ReactorState {
            n: u128,
        }

        // Written by the innermost reactor from the state it receives.
        let mut observed = None;
        let mut reactor = event::on_data_record::<(), _>(event::with_state(
            ReactorState { n: 8 },
            |_: Timed<Event<()>>, context: Context<'_, ReactorState>| {
                observed = Some(context.state.n);
                Ok::<_, ()>(())
            },
        ));
        let _ = reactor.react_to_data_record(());
        assert_eq!(observed, Some(8));
    }
622
    // A write to the state made by one reactor in a `then` chain must be visible to the next
    // reactor in the chain during the same reaction.
    #[test]
    fn write_state_then_subtree_reacts_to_written_state() {
        let _executor = harness::executor_at_time_zero();

        let mut reactor = event::on_data_record::<(), _>(event::with_state(
            String::from("hello"),
            event::then((
                // First reactor: sees the initial state and overwrites it.
                {
                    |_: Timed<Event<()>>, context: Context<'_, String>| {
                        assert_eq!(context.state, "hello");
                        *context.state = String::from("goodbye");
                        Ok::<_, ()>(())
                    }
                }
                .assert_reacts_times(1),
                // Second reactor: sees the state written by the first.
                {
                    |_: Timed<Event<()>>, context: Context<'_, String>| {
                        assert_eq!(context.state, "goodbye");
                        Ok::<_, ()>(())
                    }
                }
                .assert_reacts_times(1),
            )),
        ));
        let _ = reactor.react_to_data_record(());
    }
649
    // `map_state` must make the state produced by its function available through the `Context`
    // of the reactor beneath it.
    #[test]
    fn map_state_then_subtree_reacts_to_mapped_state() {
        let _executor = harness::executor_at_time_zero();

        #[derive(Debug, Eq, PartialEq)]
        struct ReactorState {
            n: u128,
        }

        // Written by the innermost reactor from the state it receives.
        let mut observed = None;
        let mut reactor = event::on_data_record::<(), _>(event::map_state(
            // Ignore the incoming state and produce a `ReactorState`.
            |_| ReactorState { n: 8 },
            |_: Timed<Event<()>>, context: Context<'_, ReactorState>| {
                observed = Some(context.state.n);
                Ok::<_, ()>(())
            },
        ));
        let _ = reactor.react_to_data_record(());
        assert_eq!(observed, Some(8));
    }
670
    // Constructing the `sample_tx_count` reactor, without delivering any event, must be enough
    // for the three time matrices to appear in the Inspect tree under the test node.
    #[test]
    fn construct_reactor_with_samplers_then_inspect_data_tree_contains_buffers() {
        let mut executor = harness::executor_at_time_zero();
        let (inspector, node) = harness::inspector_and_test_node();

        let client = TimeMatrixClient::new(node);
        // Kept alive (but unused) until the end of the test.
        let _reactor = harness::sample_tx_count(&client);

        executor.set_fake_time(harness::TIME_ONE_SECOND);
        // Only the presence and type of each matrix is checked; the buffer contents may be any
        // bytes.
        assert_data_tree!(
            @executor executor,
            inspector,
            root: contains {
                event_test_node: {
                    tx_failed_sum: {
                        "type": "gauge",
                        "data": AnyBytesProperty,
                    },
                    tx_failed_max: {
                        "type": "gauge",
                        "data": AnyBytesProperty,
                    },
                    tx_retried_sum: {
                        "type": "gauge",
                        "data": AnyBytesProperty,
                    },
                },
            }
        );
    }
701
    // Constructing a sampler with `BitSetMap` metadata must publish that metadata (a bit index to
    // label mapping) in the Inspect tree alongside the time matrix.
    #[test]
    fn construct_reactor_with_metadata_then_inspect_data_tree_contains_metadata() {
        use Connectivity::Idle;

        // Each variant is a distinct bit; bit positions 0, 1 and 2 line up with the order of the
        // labels given to `BitSetMap::from_ordered` below.
        #[derive(Clone, Copy, Debug, Eq, PartialEq)]
        #[repr(u64)]
        enum Connectivity {
            Idle = 1 << 0,
            Disconnected = 1 << 1,
            Connected = 1 << 2,
        }

        let mut executor = harness::executor_at_time_zero();
        let (inspector, node) = harness::inspector_and_test_node();

        let client = TimeMatrixClient::new(node);
        // Kept alive (but unused) until the end of the test.
        let _reactor = event::on_data_record::<Connectivity, _>(event::map_data_record(
            |connectivity, _| connectivity as u64,
            event::sample_data_record(Union::<u64>::default())
                .with_metadata(BitSetMap::from_ordered(["idle", "disconnected", "connected"]))
                .in_time_matrix::<LastSample>(
                    &client,
                    "connectivity",
                    SamplingProfile::granular(),
                    LastSample::or(Idle as u64),
                ),
        ));

        executor.set_fake_time(harness::TIME_ONE_SECOND);
        // The buffer contents may be any bytes; the type and the metadata index are exact.
        assert_data_tree!(
            @executor executor,
            inspector,
            root: contains {
                event_test_node: {
                    connectivity: {
                        "type": "bitset",
                        "data": AnyBytesProperty,
                        metadata: {
                            index: {
                                "0": "idle",
                                "1": "disconnected",
                                "2": "connected",
                            }
                        }
                    },
                },
            }
        );
    }
751
    // Delivering one `TxCount` record must reach each sampler exactly once, and each sampler must
    // see the field selected for it by `map_data_record`. This is the same reactor shape as
    // `harness::sample_tx_count`, with assertions attached to every sampler.
    #[test]
    fn sample_data_record_fields_with_reactor_then_reacts_one_time_with_mapped_fields() {
        let executor = harness::executor_at_time_zero();
        let (_inspector, node) = harness::inspector_and_test_node();

        let client = TimeMatrixClient::new(node);
        let mut reactor = event::on_data_record::<&harness::TxCount, _>(event::then((
            event::map_data_record(
                |count: &harness::TxCount, _| count.failed,
                event::then((
                    event::sample_data_record(Sum::<u64>::default())
                        .in_time_matrix::<LastSample>(
                            &client,
                            "tx_failed_sum",
                            SamplingProfile::granular(),
                            LastSample::or(0u64),
                        )
                        // `failed` is 1 in the record delivered below.
                        .assert_observes_event(Event::from_data_record(1))
                        .assert_reacts_times(1),
                    event::sample_data_record(Max::<u64>::default())
                        .in_time_matrix::<LastSample>(
                            &client,
                            "tx_failed_max",
                            SamplingProfile::granular(),
                            LastSample::or(0u64),
                        )
                        .assert_observes_event(Event::from_data_record(1))
                        .assert_reacts_times(1),
                )),
            ),
            event::map_data_record(
                |count: &harness::TxCount, _| count.retried,
                event::sample_data_record(Sum::<u64>::default())
                    .in_time_matrix::<LastSample>(
                        &client,
                        "tx_retried_sum",
                        SamplingProfile::granular(),
                        LastSample::or(0u64),
                    )
                    // `retried` is 3 in the record delivered below.
                    .assert_observes_event(Event::from_data_record(3))
                    .assert_reacts_times(1),
            ),
        )));

        executor.set_fake_time(harness::TIME_ONE_SECOND);
        reactor.react_to_data_record(&harness::TxCount { failed: 1, retried: 3 }).unwrap();
        // The observation and count assertions are checked when `reactor` is dropped here.
    }
799}