windowed_stats/experimental/event/
mod.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Event reactors and combinators.
6//!
7//! This module provides APIs for constructing [`Reactor`] types that respond to [state and data
8//! events][`Event`] by configuring and sampling data with [time matrices][`TimeMatrix`].
9//!
10//! [`Event`]: crate::experimental::event::Event
11//! [`Reactor`]: crate::experimental::event::Reactor
12//! [`TimeMatrix`]: crate::experimental::series::TimeMatrix
13
14mod builder;
15mod reactor;
16
17use crate::experimental::clock::Timed;
18
19pub use crate::experimental::event::builder::{SampleDataRecord, sample_data_record};
20pub use crate::experimental::event::reactor::{
21    And, AndChain, Context, Fail, FilterMapDataRecord, Inspect, IntoReactor, MapError, MapResponse,
22    Or, OrChain, Reactor, Respond, Then, ThenChain, WithState, and, fail, filter_map_data_record,
23    map_data_record, map_state, on_data_record, or, respond, then, with_state,
24};
25
26/// Extension methods for [`Reactor`] types.
27pub trait ReactorExt<T, S = ()>: Reactor<T, S> {
28    /// Reacts to a [data record][`DataEvent::record`].
29    ///
30    /// This function constructs and [reacts to][`Reactor::react`] a data event with the given
31    /// record at [`Timestamp::now`].
32    ///
33    /// [`DataEvent::record`]: crate::experimental::event::DataEvent::record
34    /// [`Reactor::react`]: crate::experimental::event::Reactor::react
35    /// [`Timestamp::now`]: crate::experimental::clock::Timed
36    fn react_to_data_record(&mut self, record: T) -> Result<Self::Response, Self::Error>
37    where
38        S: Default,
39    {
40        self.react(Timed::now(DataEvent { record }.into()), Context::from_state(&mut S::default()))
41    }
42}
43
44impl<R, T> ReactorExt<T> for R where R: Reactor<T, ()> {}
45
46impl<T> Timed<Event<T>> {
47    pub(crate) fn to_timed_sample(&self) -> Option<Timed<T>>
48    where
49        T: Clone,
50    {
51        self.clone()
52            .map(|event| match event {
53                Event::Data(DataEvent { record, .. }) => Some(record),
54                _ => None,
55            })
56            .transpose()
57    }
58
59    pub fn as_data_record(&self) -> Option<&T> {
60        self.inner().as_data_record()
61    }
62
63    pub fn map_data_record<U, F>(self, f: F) -> Timed<Event<U>>
64    where
65        F: FnOnce(T) -> U,
66    {
67        self.map(move |event| event.map_data_record(f))
68    }
69
70    pub fn filter_map_data_record<U, F>(self, f: F) -> Option<Timed<Event<U>>>
71    where
72        F: FnOnce(T) -> Option<U>,
73    {
74        self.map_data_record(f).map(Event::transpose).transpose()
75    }
76}
77
/// An event that describes a change to [the environment][`SystemEvent`] or the arrival of a [data
/// record][`DataEvent::record`].
///
/// [`DataEvent::record`]: crate::experimental::event::DataEvent::record
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum Event<T> {
    /// A change to the environment, carried as a [`SystemEvent`].
    System(SystemEvent),
    /// The arrival of a data record of type `T`, carried as a [`DataEvent`].
    Data(DataEvent<T>),
}
87
88impl<T> Event<T> {
89    pub fn from_data_record(record: T) -> Self {
90        Event::Data(DataEvent { record })
91    }
92
93    pub fn map_data_record<U, F>(self, f: F) -> Event<U>
94    where
95        F: FnOnce(T) -> U,
96    {
97        match self {
98            Event::System(event) => Event::System(event),
99            Event::Data(event) => Event::Data(event.map(f)),
100        }
101    }
102
103    pub fn as_data_record(&self) -> Option<&T> {
104        match self {
105            Event::System(_) => None,
106            Event::Data(ref event) => Some(&event.record),
107        }
108    }
109}
110
111impl<T> Event<Option<T>> {
112    pub fn transpose(self) -> Option<Event<T>> {
113        match self {
114            Event::System(event) => Some(Event::System(event)),
115            Event::Data(event) => event.record.map(|record| Event::Data(DataEvent { record })),
116        }
117    }
118}
119
120impl<T> From<SystemEvent> for Event<T> {
121    fn from(event: SystemEvent) -> Self {
122        Event::System(event)
123    }
124}
125
126impl<T> From<DataEvent<T>> for Event<T> {
127    fn from(event: DataEvent<T>) -> Self {
128        Event::Data(event)
129    }
130}
131
/// Describes a change to the environment that may require reconfiguration.
///
/// System events may change the behavior of a [`Reactor`]. For example, some [`Reactor`]s that
/// configure a [`TimeMatrix`] may apply an alternative interpolation when a [`Sleep`] event is
/// received.
///
/// [`Reactor`]: crate::experimental::event::Reactor
/// [`Sleep`]: crate::experimental::event::SuspendEvent::Sleep
/// [`TimeMatrix`]: crate::experimental::series::TimeMatrix
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum SystemEvent {
    /// The system is entering or exiting suspended execution, as described by the wrapped
    /// [`SuspendEvent`].
    Suspend(SuspendEvent),
}
145
146impl From<SuspendEvent> for SystemEvent {
147    fn from(event: SuspendEvent) -> Self {
148        SystemEvent::Suspend(event)
149    }
150}
151
/// Describes entering and exiting a mode of suspended execution.
///
/// This type is carried by [`SystemEvent::Suspend`].
///
/// [`SystemEvent::Suspend`]: crate::experimental::event::SystemEvent::Suspend
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum SuspendEvent {
    /// Indicates that the system is entering a mode that suspends execution.
    ///
    /// This event describes a state in which any system clock is inactive. On [`Wake`], there may
    /// be an arbitrarily large difference between [`Timestamp::now`] before and after suspension.
    ///
    /// [`Timestamp::now`]: crate::experimental::clock::Timestamp::now
    /// [`Wake`]: crate::experimental::event::SuspendEvent::Wake
    Sleep,
    /// Indicates that the system has exited [`Sleep`].
    ///
    /// [`Sleep`]: crate::experimental::event::SuspendEvent::Sleep
    Wake,
}
168
/// An arbitrary event that carries a data record of interest.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct DataEvent<T> {
    /// The record associated with the event.
    ///
    /// Records typically describe information or metrics associated with the event. A
    /// [`Reactor`] can sample a record using a [`TimeMatrix`] via combinators like
    /// [`sample_data_record`].
    ///
    /// [`Reactor`]: crate::experimental::event::Reactor
    /// [`sample_data_record`]: crate::experimental::event::sample_data_record
    /// [`TimeMatrix`]: crate::experimental::series::TimeMatrix
    pub record: T,
}

impl<T> DataEvent<T> {
    /// Transforms the record with `f` and returns a `DataEvent` over the mapped record.
    pub fn map<U, F>(self, f: F) -> DataEvent<U>
    where
        F: FnOnce(T) -> U,
    {
        let DataEvent { record } = self;
        DataEvent { record: f(record) }
    }
}
192
193#[cfg(test)]
194pub(crate) mod harness {
195    use fuchsia_async as fasync;
196    use fuchsia_inspect::{Inspector, Node};
197    use futures::Future;
198    use futures::task::Poll;
199    use std::fmt::Debug;
200    use std::marker::PhantomData;
201    use std::pin::Pin;
202
203    use crate::experimental::clock::Timed;
204    use crate::experimental::event::{self, Context, Event, Reactor};
205    use crate::experimental::series::SamplingProfile;
206    use crate::experimental::series::interpolation::LastSample;
207    use crate::experimental::series::statistic::{FoldError, Max, Sum};
208    use crate::experimental::serve::TimeMatrixClient;
209
    /// The monotonic instant at zero nanoseconds.
    pub const TIME_ZERO: fasync::MonotonicInstant = fasync::MonotonicInstant::from_nanos(0);
    /// The monotonic instant at one second (`1_000_000_000` nanoseconds).
    pub const TIME_ONE_SECOND: fasync::MonotonicInstant =
        fasync::MonotonicInstant::from_nanos(1_000_000_000);

    /// The name of the Inspect child node created by `inspector_and_test_node`.
    pub const TEST_NODE_NAME: &str = "event_test_node";
215
216    pub trait ReactorExt<T, S = ()>: Reactor<T, S> {
217        /// Asserts that `self` observes the given [`Event`] at least once.
218        ///
219        /// If `self` is leaked, then this function asserts nothing.
220        ///
221        /// # Panics
222        ///
223        /// This function panics if `self` has not observed an event that is partially equivalent
224        /// to `expected` when dropped.
225        fn assert_observes_event(
226            self,
227            expected: Event<T>,
228        ) -> impl Reactor<T, S, Response = Self::Response, Error = Self::Error>
229        where
230            Self: Sized,
231            T: Clone + Debug + PartialEq,
232        {
233            #[derive(Debug)]
234            struct Assertion<T, R>
235            where
236                T: Debug,
237            {
238                reactor: R,
239                expected: Event<T>,
240                is_observed: bool,
241            }
242
243            impl<T, R> Drop for Assertion<T, R>
244            where
245                T: Debug,
246            {
247                fn drop(&mut self) {
248                    assert!(
249                        self.is_observed,
250                        "reactor never received an expected event before drop: {:?}",
251                        self.expected,
252                    );
253                }
254            }
255
256            impl<T, S, R> Reactor<T, S> for Assertion<T, R>
257            where
258                T: Debug + PartialEq,
259                R: Reactor<T, S>,
260            {
261                type Response = R::Response;
262                type Error = R::Error;
263
264                fn react(
265                    &mut self,
266                    event: Timed<Event<T>>,
267                    context: Context<'_, S>,
268                ) -> Result<Self::Response, Self::Error> {
269                    if &self.expected == event.inner() {
270                        self.is_observed = true;
271                    }
272                    self.reactor.react(event, context)
273                }
274            }
275
276            Assertion { reactor: self, expected, is_observed: false }
277        }
278
279        /// Asserts that `self` reacts to events exactly `n` times.
280        ///
281        /// If `self` is leaked, then this function only asserts that `self` reacts to no more than
282        /// `n` events.
283        ///
284        /// # Panics
285        ///
286        /// This function panics if `self` reacts to more than `n` events or fewer than `n` events
287        /// when dropped.
288        fn assert_reacts_times(
289            self,
290            n: usize,
291        ) -> impl Reactor<T, S, Response = Self::Response, Error = Self::Error>
292        where
293            Self: Sized,
294        {
295            #[derive(Debug)]
296            struct Assertion<T, R> {
297                reactor: R,
298                observed: usize,
299                expected: usize,
300                phantom: PhantomData<fn() -> T>,
301            }
302
303            impl<T, R> Drop for Assertion<T, R> {
304                fn drop(&mut self) {
305                    assert!(
306                        self.observed == self.expected,
307                        "reactor received unexpected number of events on drop: \
308                         observed {}, but expected {}",
309                        self.observed,
310                        self.expected,
311                    );
312                }
313            }
314
315            impl<T, S, R> Reactor<T, S> for Assertion<T, R>
316            where
317                R: Reactor<T, S>,
318            {
319                type Response = R::Response;
320                type Error = R::Error;
321
322                fn react(
323                    &mut self,
324                    event: Timed<Event<T>>,
325                    context: Context<'_, S>,
326                ) -> Result<Self::Response, Self::Error> {
327                    self.observed =
328                        self.observed.checked_add(1).expect("overflow in observed event count");
329                    assert!(
330                        self.observed <= self.expected,
331                        "reactor received unexpected number of events before drop: \
332                         observed {}, but expected {}",
333                        self.observed,
334                        self.expected,
335                    );
336                    self.reactor.react(event, context)
337                }
338            }
339
340            Assertion { reactor: self, observed: 0, expected: n, phantom: PhantomData }
341        }
342    }
343
344    impl<T, S, R> ReactorExt<T, S> for R where R: Reactor<T, S> {}
345
    /// A data record with counts of transmission outcomes.
    #[derive(Clone, Copy, Debug)]
    pub struct TxCount {
        /// The count of transmissions recorded as failed.
        pub failed: u64,
        /// The count of transmissions recorded as retried.
        pub retried: u64,
    }
352
353    /// Constructs an executor with its clock set to time zero.
354    pub fn executor_at_time_zero() -> fasync::TestExecutor {
355        let executor = fasync::TestExecutor::new_with_fake_time();
356        executor.set_fake_time(TIME_ZERO);
357        executor
358    }
359
360    /// Constructs an inspector and child node with the name defined by `TEST_NODE_NAME`.
361    pub fn inspector_and_test_node() -> (Inspector, Node) {
362        let inspector = Inspector::default();
363        let node = inspector.root().create_child(TEST_NODE_NAME);
364        (inspector, node)
365    }
366
    // This function demonstrates how `Reactor`s can be parameterized and returned from functions.
    // Such `Reactor`s can be further composed as needed.
    /// Constructs a `Reactor` that samples `TxCount` fields.
    ///
    /// The reactor maps each `TxCount` record to its fields and samples them into three time
    /// matrices created through `client`:
    ///
    /// - `tx_failed_sum`: a `Sum` of `failed`.
    /// - `tx_failed_max`: a `Max` of `failed`.
    /// - `tx_retried_sum`: a `Sum` of `retried`.
    ///
    /// Every matrix uses `SamplingProfile::granular()` and `LastSample::or(0u64)`.
    pub fn sample_tx_count<'client, 'record>(
        client: &'client TimeMatrixClient,
    ) -> impl Reactor<&'record TxCount, (), Response = (), Error = FoldError> {
        // NOTE(review): `client` is already a reference, so `&client` below is a
        // `&&TimeMatrixClient`; presumably `in_time_matrix` accepts this via deref coercion —
        // confirm before simplifying.
        event::on_data_record::<&TxCount, _>(event::then((
            // Sample the `failed` count as both a sum and a maximum.
            event::map_data_record(
                |count: &TxCount, _| count.failed,
                event::then((
                    event::sample_data_record(Sum::<u64>::default()).in_time_matrix::<LastSample>(
                        &client,
                        "tx_failed_sum",
                        SamplingProfile::granular(),
                        LastSample::or(0u64),
                    ),
                    event::sample_data_record(Max::<u64>::default()).in_time_matrix::<LastSample>(
                        &client,
                        "tx_failed_max",
                        SamplingProfile::granular(),
                        LastSample::or(0u64),
                    ),
                )),
            ),
            // Sample the `retried` count as a sum.
            event::map_data_record(
                |count: &TxCount, _| count.retried,
                event::sample_data_record(Sum::<u64>::default()).in_time_matrix::<LastSample>(
                    &client,
                    "tx_retried_sum",
                    SamplingProfile::granular(),
                    LastSample::or(0u64),
                ),
            ),
        )))
    }
402
    /// A `Reactor` of only the unit type `()` that always responds with `Ok`.
    ///
    /// The event and context are ignored and `Ok(())` is returned.
    pub const fn respond(_: Timed<Event<()>>, _: Context<'_, ()>) -> Result<(), ()> {
        Ok(())
    }
407
    /// A `Reactor` of only the unit type `()` that always fails with `Err`.
    ///
    /// The event and context are ignored and `Err(())` is returned.
    pub const fn fail(_: Timed<Event<()>>, _: Context<'_, ()>) -> Result<(), ()> {
        Err(())
    }
412
413    /// Asserts that an Inspect time matrix server future is `Pending` (not terminated).
414    pub fn assert_inspect_time_matrix_server_polls_pending(
415        executor: &mut fasync::TestExecutor,
416        server: &mut Pin<&mut impl Future>,
417    ) {
418        let Poll::Pending = executor.run_until_stalled(server) else {
419            panic!("time matrix inspection server terminated unexpectedly");
420        };
421    }
422}
423
424#[cfg(test)]
425mod tests {
426    use diagnostics_assertions::{AnyBytesProperty, assert_data_tree};
427    use std::pin::pin;
428
429    use crate::experimental::clock::Timed;
430    use crate::experimental::event::harness::{self, ReactorExt as _};
431    use crate::experimental::event::{
432        self, Context, DataEvent, Event, Reactor, ReactorExt as _, SuspendEvent, SystemEvent,
433    };
434    use crate::experimental::series::SamplingProfile;
435    use crate::experimental::series::interpolation::LastSample;
436    use crate::experimental::series::metadata::BitSetMap;
437    use crate::experimental::series::statistic::{Max, Sum, Union};
438    use crate::experimental::serve;
439
440    #[test]
441    #[should_panic]
442    fn observes_event_assertion_observes_no_such_event_then_panics() {
443        let _executor = harness::executor_at_time_zero();
444
445        let mut reactor = harness::respond.assert_observes_event(Event::from_data_record(()));
446        let _ = reactor.react(
447            Timed::now(SystemEvent::Suspend(SuspendEvent::Sleep).into()),
448            Context::from_state(&mut ()),
449        );
450    }
451
452    #[test]
453    #[should_panic]
454    fn reacts_times_assertion_reacts_too_few_times_then_panics() {
455        let _executor = harness::executor_at_time_zero();
456
457        let mut reactor = harness::respond.assert_reacts_times(2);
458        let _ = reactor.react_to_data_record(());
459    }
460
461    #[test]
462    #[should_panic]
463    fn reacts_times_assertion_reacts_too_many_times_then_panics() {
464        let _executor = harness::executor_at_time_zero();
465
466        let mut reactor = harness::respond.assert_reacts_times(1);
467        let _ = reactor.react_to_data_record(());
468        let _ = reactor.react_to_data_record(());
469    }
470
    // Checks that every reactor composed with `then` reacts exactly once to an event, whether
    // the preceding reactor responds with `Ok` or fails with `Err`. The counts are verified by
    // the `assert_reacts_times` wrappers when the reactors are dropped.
    #[test]
    fn then_combinator_reacts_then_subsequent_reacts_on_ok_and_err() {
        let _executor = harness::executor_at_time_zero();

        // `Ok` then `Ok`.
        let mut reactor =
            harness::respond.assert_reacts_times(1).then(harness::respond.assert_reacts_times(1));
        let _ = reactor.react_to_data_record(());

        // `Err` then `Ok`.
        let mut reactor =
            harness::fail.assert_reacts_times(1).then(harness::respond.assert_reacts_times(1));
        let _ = reactor.react_to_data_record(());

        // Tuple form with a failure in the middle.
        let mut reactor = event::then((
            harness::respond.assert_reacts_times(1),
            harness::fail.assert_reacts_times(1),
            harness::respond.assert_reacts_times(1),
        ));
        let _ = reactor.react_to_data_record(());
    }
490
    // Checks that a reactor composed with `and` reacts only if the preceding reactor responds
    // with `Ok`. Reactors after a failure are expected to react zero times, which is verified by
    // the `assert_reacts_times` wrappers when the reactors are dropped.
    #[test]
    fn and_combinator_reacts_then_subsequent_reacts_only_on_ok() {
        let _executor = harness::executor_at_time_zero();

        // `Ok` and then `Ok`: both react.
        let mut reactor =
            harness::respond.assert_reacts_times(1).and(harness::respond.assert_reacts_times(1));
        let _ = reactor.react_to_data_record(());

        // `Err` first: the second reactor must not react.
        let mut reactor =
            harness::fail.assert_reacts_times(1).and(harness::respond.assert_reacts_times(0));
        let _ = reactor.react_to_data_record(());

        // Tuple form: reactors after the failure must not react.
        let mut reactor = event::and((
            harness::respond.assert_reacts_times(1),
            harness::fail.assert_reacts_times(1),
            harness::respond.assert_reacts_times(0),
        ));
        let _ = reactor.react_to_data_record(());
    }
510
    // Checks that a reactor composed with `or` reacts only if the preceding reactor fails with
    // `Err`. Reactors after a success are expected to react zero times, which is verified by the
    // `assert_reacts_times` wrappers when the reactors are dropped.
    #[test]
    fn or_combinator_reacts_then_subsequent_reacts_only_on_err() {
        let _executor = harness::executor_at_time_zero();

        // `Ok` first: the second reactor must not react.
        let mut reactor =
            harness::respond.assert_reacts_times(1).or(harness::respond.assert_reacts_times(0));
        let _ = reactor.react_to_data_record(());

        // `Err` first: the second reactor reacts.
        let mut reactor =
            harness::fail.assert_reacts_times(1).or(harness::fail.assert_reacts_times(1));
        let _ = reactor.react_to_data_record(());

        // Tuple form: reactors after the first success must not react.
        let mut reactor = event::or((
            harness::fail.assert_reacts_times(1),
            harness::respond.assert_reacts_times(1),
            harness::respond.assert_reacts_times(0),
        ));
        let _ = reactor.react_to_data_record(());
    }
530
    // Checks that the reactor beneath `map_data_record` receives the mapped record (here, a
    // borrowed field of the original record) rather than the original record.
    #[test]
    fn map_data_record_then_subtree_reacts_to_mapped_record() {
        let _executor = harness::executor_at_time_zero();

        #[derive(Debug, Eq, PartialEq)]
        struct Thread {
            nominal: u128,
            tpi: u128,
        }

        let thread = Thread { nominal: 1, tpi: 8 };
        let mut observed = None;
        let mut reactor = event::on_data_record::<&Thread, _>(event::map_data_record(
            |thread: &Thread, _| &thread.tpi,
            |event: Timed<Event<&u128>>, _: Context<'_, ()>| {
                let (_, event) = event.into();
                // Record the mapped field so that it can be checked after reacting.
                if let Event::Data(DataEvent { record: tpi, .. }) = event {
                    observed = Some(*tpi);
                }
                Ok::<_, ()>(())
            },
        ));
        let _ = reactor.react_to_data_record(&thread);
        assert_eq!(observed, Some(8));
    }
556
    // Checks that when the `filter_map_data_record` function returns `Some`, the reactor beneath
    // it receives the mapped record.
    #[test]
    fn retain_record_with_filter_map_data_record_then_subtree_reacts_to_mapped_record() {
        const RECORD: i8 = 0;

        let _executor = harness::executor_at_time_zero();

        let mut observed = None;
        let mut reactor = event::on_data_record::<usize, _>(event::filter_map_data_record(
            // Ignore the `usize` data record and map to `Some` constant `i8`.
            |_: usize, _| Some(RECORD),
            |event: Timed<Event<i8>>, _: Context<'_, ()>| {
                let (_, event) = event.into();
                // Record the mapped value so that it can be checked after reacting.
                if let Event::Data(DataEvent { record, .. }) = event {
                    observed = Some(record);
                }
                Ok::<_, ()>(())
            },
        ));
        let _ = reactor.react_to_data_record(0usize);
        assert_eq!(observed, Some(RECORD));
    }
578
    // Checks that when the `filter_map_data_record` function returns `None`, the reactor beneath
    // it does not react to the data event. The zero count is verified by the
    // `assert_reacts_times` wrapper when the reactor is dropped.
    #[test]
    fn discard_record_with_filter_map_data_record_then_subtree_does_not_react_to_mapped_record() {
        let _executor = harness::executor_at_time_zero();

        let mut reactor = event::on_data_record::<usize, _>(event::filter_map_data_record(
            // Ignore the `usize` data record, map to `()`, and return `None`.
            |_: usize, _| None::<()>,
            harness::respond.assert_reacts_times(0),
        ));
        let _ = reactor.react_to_data_record(0usize);
    }
590
    // It is important that discarding data events does not interfere with the observation of
    // system events.
    //
    // This test sends only a system event through a `filter_map_data_record` whose function
    // always returns `None` and checks that the reactor beneath it still observes the event.
    #[test]
    fn discard_record_with_filter_map_data_record_then_subtree_reacts_to_system_event() {
        const SYSTEM_EVENT: SystemEvent = SystemEvent::Suspend(SuspendEvent::Sleep);

        let _executor = harness::executor_at_time_zero();

        let mut observed = None;
        let mut reactor = event::on_data_record::<usize, _>(event::filter_map_data_record(
            // Ignore the `usize` data record, map to `i8`, and return `None`.
            |_: usize, _| None::<i8>,
            |event: Timed<Event<i8>>, _: Context<'_, ()>| {
                let (_, event) = event.into();
                // Record the system event so that it can be checked after reacting.
                if let Event::System(event) = event {
                    observed = Some(event);
                }
                Ok::<_, ()>(())
            },
        ));
        let _ = reactor.react(Timed::now(SYSTEM_EVENT.into()), Context::from_state(&mut ()));
        // Despite discarding any and all data records, the system event must be observed.
        assert_eq!(observed, Some(SYSTEM_EVENT));
    }
615
    // Checks that the reactor beneath `with_state` receives the given state through its
    // `Context`.
    #[test]
    fn with_state_then_subtree_reacts_to_state() {
        let _executor = harness::executor_at_time_zero();

        #[derive(Debug, Eq, PartialEq)]
        struct ReactorState {
            n: u128,
        }

        let mut observed = None;
        let mut reactor = event::on_data_record::<(), _>(event::with_state(
            ReactorState { n: 8 },
            |_: Timed<Event<()>>, context: Context<'_, ReactorState>| {
                // Record the state so that it can be checked after reacting.
                observed = Some(context.state.n);
                Ok::<_, ()>(())
            },
        ));
        let _ = reactor.react_to_data_record(());
        assert_eq!(observed, Some(8));
    }
636
    // Checks that a write to the state by one reactor is visible to the next reactor in a `then`
    // chain: the first reactor replaces "hello" with "goodbye" and the second asserts that it
    // sees "goodbye". Each reactor must react exactly once.
    #[test]
    fn write_state_then_subtree_reacts_to_written_state() {
        let _executor = harness::executor_at_time_zero();

        let mut reactor = event::on_data_record::<(), _>(event::with_state(
            String::from("hello"),
            event::then((
                {
                    // Observes the initial state and overwrites it.
                    |_: Timed<Event<()>>, context: Context<'_, String>| {
                        assert_eq!(context.state, "hello");
                        *context.state = String::from("goodbye");
                        Ok::<_, ()>(())
                    }
                }
                .assert_reacts_times(1),
                {
                    // Observes the state written by the preceding reactor.
                    |_: Timed<Event<()>>, context: Context<'_, String>| {
                        assert_eq!(context.state, "goodbye");
                        Ok::<_, ()>(())
                    }
                }
                .assert_reacts_times(1),
            )),
        ));
        let _ = reactor.react_to_data_record(());
    }
663
    // Checks that the reactor beneath `map_state` receives the state produced by the mapping
    // function through its `Context`.
    #[test]
    fn map_state_then_subtree_reacts_to_mapped_state() {
        let _executor = harness::executor_at_time_zero();

        #[derive(Debug, Eq, PartialEq)]
        struct ReactorState {
            n: u128,
        }

        let mut observed = None;
        let mut reactor = event::on_data_record::<(), _>(event::map_state(
            |_| ReactorState { n: 8 },
            |_: Timed<Event<()>>, context: Context<'_, ReactorState>| {
                // Record the mapped state so that it can be checked after reacting.
                observed = Some(context.state.n);
                Ok::<_, ()>(())
            },
        ));
        let _ = reactor.react_to_data_record(());
        assert_eq!(observed, Some(8));
    }
684
    // Checks that merely constructing the `sample_tx_count` reactor (without reacting to any
    // event) produces a node for each of its time matrices in the Inspect data tree.
    #[test]
    fn construct_reactor_with_samplers_then_inspect_data_tree_contains_buffers() {
        let mut executor = harness::executor_at_time_zero();
        let (inspector, node) = harness::inspector_and_test_node();

        let (client, server) = serve::serve_time_matrix_inspection(node);
        let mut server = pin!(server);
        let _reactor = harness::sample_tx_count(&client);

        // Advance the fake clock and run the server until it stalls before reading the tree.
        executor.set_fake_time(harness::TIME_ONE_SECOND);
        harness::assert_inspect_time_matrix_server_polls_pending(&mut executor, &mut server);
        assert_data_tree!(
            @executor executor,
            inspector,
            root: contains {
                event_test_node: {
                    tx_failed_sum: {
                        "type": "gauge",
                        "data": AnyBytesProperty,
                    },
                    tx_failed_max: {
                        "type": "gauge",
                        "data": AnyBytesProperty,
                    },
                    tx_retried_sum: {
                        "type": "gauge",
                        "data": AnyBytesProperty,
                    },
                },
            }
        );
    }
717
    // Checks that constructing a sampler with `with_metadata` (without reacting to any event)
    // produces a time matrix node in the Inspect data tree that includes the metadata index.
    #[test]
    fn construct_reactor_with_metadata_then_inspect_data_tree_contains_metadata() {
        use Connectivity::Idle;

        // Each variant occupies a distinct bit so that values can be sampled with `Union`.
        #[derive(Clone, Copy, Debug, Eq, PartialEq)]
        #[repr(u64)]
        enum Connectivity {
            Idle = 1 << 0,
            Disconnected = 1 << 1,
            Connected = 1 << 2,
        }

        let mut executor = harness::executor_at_time_zero();
        let (inspector, node) = harness::inspector_and_test_node();

        let (client, server) = serve::serve_time_matrix_inspection(node);
        let mut server = pin!(server);
        let _reactor = event::on_data_record::<Connectivity, _>(event::map_data_record(
            |connectivity, _| connectivity as u64,
            event::sample_data_record(Union::<u64>::default())
                .with_metadata(BitSetMap::from_ordered(["idle", "disconnected", "connected"]))
                .in_time_matrix::<LastSample>(
                    &client,
                    "connectivity",
                    SamplingProfile::granular(),
                    LastSample::or(Idle as u64),
                ),
        ));

        // Advance the fake clock and run the server until it stalls before reading the tree.
        executor.set_fake_time(harness::TIME_ONE_SECOND);
        harness::assert_inspect_time_matrix_server_polls_pending(&mut executor, &mut server);
        assert_data_tree!(
            @executor executor,
            inspector,
            root: contains {
                event_test_node: {
                    connectivity: {
                        "type": "bitset",
                        "data": AnyBytesProperty,
                        metadata: {
                            index: {
                                "0": "idle",
                                "1": "disconnected",
                                "2": "connected",
                            }
                        }
                    },
                },
            }
        );
    }
769
    // Checks that a single `TxCount` record is mapped to its fields and that each sampler
    // reacts exactly once to a data event carrying the mapped field: `1` for `failed` and `3`
    // for `retried`. The assertions are verified by the harness wrappers when the reactor is
    // dropped.
    #[test]
    fn sample_data_record_fields_with_reactor_then_reacts_one_time_with_mapped_fields() {
        let executor = harness::executor_at_time_zero();
        let (_inspector, node) = harness::inspector_and_test_node();

        // The server is not polled in this test; only the reactions are checked.
        let (client, _server) = serve::serve_time_matrix_inspection(node);
        let mut reactor = event::on_data_record::<&harness::TxCount, _>(event::then((
            event::map_data_record(
                |count: &harness::TxCount, _| count.failed,
                event::then((
                    event::sample_data_record(Sum::<u64>::default())
                        .in_time_matrix::<LastSample>(
                            &client,
                            "tx_failed_sum",
                            SamplingProfile::granular(),
                            LastSample::or(0u64),
                        )
                        .assert_observes_event(Event::from_data_record(1))
                        .assert_reacts_times(1),
                    event::sample_data_record(Max::<u64>::default())
                        .in_time_matrix::<LastSample>(
                            &client,
                            "tx_failed_max",
                            SamplingProfile::granular(),
                            LastSample::or(0u64),
                        )
                        .assert_observes_event(Event::from_data_record(1))
                        .assert_reacts_times(1),
                )),
            ),
            event::map_data_record(
                |count: &harness::TxCount, _| count.retried,
                event::sample_data_record(Sum::<u64>::default())
                    .in_time_matrix::<LastSample>(
                        &client,
                        "tx_retried_sum",
                        SamplingProfile::granular(),
                        LastSample::or(0u64),
                    )
                    .assert_observes_event(Event::from_data_record(3))
                    .assert_reacts_times(1),
            ),
        )));

        executor.set_fake_time(harness::TIME_ONE_SECOND);
        reactor.react_to_data_record(&harness::TxCount { failed: 1, retried: 3 }).unwrap();
    }
817}