Skip to main content

timekeeper_integration_lib/
lib.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use anyhow::Context;
6use chrono::{Datelike, TimeZone, Timelike};
7use fidl::endpoints::ServerEnd;
8use fidl_fuchsia_hardware_rtc::{DeviceRequest, DeviceRequestStream};
9use fidl_fuchsia_io as fio;
10use fidl_fuchsia_metrics::MetricEvent;
11use fidl_fuchsia_metrics_test::{LogMethod, MetricEventLoggerQuerierProxy};
12use fidl_fuchsia_testing::{FakeClockControlProxy, FakeClockProxy};
13use fidl_fuchsia_time::{MaintenanceRequest, MaintenanceRequestStream};
14use fidl_fuchsia_time_external::{PushSourceMarker, Status, TimeSample};
15use fidl_test_time::{TimeSourceControlRequest, TimeSourceControlRequestStream};
16use fuchsia_async as fasync;
17use fuchsia_component::server::ServiceFs;
18use fuchsia_component_test::{
19    Capability, ChildOptions, ChildRef, LocalComponentHandles, RealmBuilder, RealmInstance, Ref,
20    Route,
21};
22use fuchsia_sync::Mutex;
23use futures::channel::mpsc::Sender;
24use futures::stream::{Stream, StreamExt, TryStreamExt};
25use futures::{Future, FutureExt, SinkExt};
26use push_source::{PushSource, TestUpdateAlgorithm, Update};
27use std::ops::Deref;
28use std::sync::{Arc, LazyLock};
29use time_metrics_registry::PROJECT_ID;
30use vfs::pseudo_directory;
31use zx::{self as zx, Rights};
32
/// Component URL of the Timekeeper build launched when no fake clock is requested.
const TIMEKEEPER_URL: &str = "#meta/timekeeper_for_integration.cm";
/// Component URL of the Timekeeper build launched when a fake clock is requested.
const TIMEKEEPER_FAKE_TIME_URL: &str = "#meta/timekeeper_with_fake_time.cm";
/// Component URL of the fake Cobalt component.
const COBALT_URL: &str = "#meta/fake_cobalt.cm";
/// Component URL of the fake clock component.
const FAKE_CLOCK_URL: &str = "#meta/fake_clock.cm";
41
42/// A reference to a timekeeper running inside a nested environment which runs fake versions of
43/// the services timekeeper requires.
44pub struct NestedTimekeeper {
45    _realm_instance: RealmInstance,
46}
47
48impl Into<RealmInstance> for NestedTimekeeper {
49    // Deconstructs [Self] into an underlying [RealmInstance].
50    fn into(self) -> RealmInstance {
51        self._realm_instance
52    }
53}
54
55impl NestedTimekeeper {
56    /// Creates a new [NestedTimekeeper].
57    ///
58    /// Launches an instance of timekeeper maintaining the provided |clock| in a nested
59    /// environment.
60    ///
61    /// If |initial_rtc_time| is provided, then the environment contains a fake RTC
62    /// device that reports the time as |initial_rtc_time|.
63    ///
64    /// If use_fake_clock is true, also launches a fake monotonic clock service.
65    ///
66    /// Returns a `NestedTimekeeper`, handles to the PushSource and RTC it obtains updates from,
67    /// Cobalt debug querier, and a fake clock control handle if use_fake_clock is true.
68    pub async fn new(
69        clock: Arc<zx::Clock>,
70        rtc_options: RtcOptions,
71        use_fake_clock: bool,
72    ) -> (
73        Self,
74        Arc<PushSourcePuppet>,
75        RtcUpdates,
76        MetricEventLoggerQuerierProxy,
77        Option<FakeClockController>,
78    ) {
79        let push_source_puppet = Arc::new(PushSourcePuppet::new());
80
81        let builder = RealmBuilder::new().await.unwrap();
82        let fake_cobalt =
83            builder.add_child("fake_cobalt", COBALT_URL, ChildOptions::new()).await.unwrap();
84
85        let timekeeper_url = if use_fake_clock { TIMEKEEPER_FAKE_TIME_URL } else { TIMEKEEPER_URL };
86        log::trace!("using timekeeper_url: {}", timekeeper_url);
87        let timekeeper = builder
88            .add_child("timekeeper_test", timekeeper_url, ChildOptions::new().eager())
89            .await
90            .with_context(|| format!("while starting up timekeeper_test from: {timekeeper_url}"))
91            .unwrap();
92
93        let timesource_server = builder
94            .add_local_child(
95                "timesource_mock",
96                {
97                    let push_source_puppet = Arc::clone(&push_source_puppet);
98                    move |handles: LocalComponentHandles| {
99                        Box::pin(timesource_mock_server(handles, Arc::clone(&push_source_puppet)))
100                    }
101                },
102                ChildOptions::new(),
103            )
104            .await
105            .context("while starting up timesource_mock")
106            .unwrap();
107
108        let maintenance_server = builder
109            .add_local_child(
110                "maintenance_mock",
111                move |handles: LocalComponentHandles| {
112                    Box::pin(maintenance_mock_server(handles, Arc::clone(&clock)))
113                },
114                ChildOptions::new(),
115            )
116            .await
117            .context("while starting up maintenance_mock")
118            .unwrap();
119
120        // Launch fake clock if needed.
121        if use_fake_clock {
122            let fake_clock =
123                builder.add_child("fake_clock", FAKE_CLOCK_URL, ChildOptions::new()).await.unwrap();
124
125            builder
126                .add_route(
127                    Route::new()
128                        .capability(Capability::protocol_by_name(
129                            "fuchsia.testing.FakeClockControl",
130                        ))
131                        .from(&fake_clock)
132                        .to(Ref::parent()),
133                )
134                .await
135                .context("while setting up FakeClockControl")
136                .unwrap();
137
138            builder
139                .add_route(
140                    Route::new()
141                        .capability(Capability::protocol_by_name("fuchsia.testing.FakeClock"))
142                        .from(&fake_clock)
143                        .to(Ref::parent())
144                        .to(&timekeeper),
145                )
146                .await
147                .context("while setting up FakeClock")
148                .unwrap();
149
150            builder
151                .add_route(
152                    Route::new()
153                        .capability(Capability::protocol_by_name("fuchsia.logger.LogSink"))
154                        .from(Ref::parent())
155                        .to(&fake_clock),
156                )
157                .await
158                .context("while setting up LogSink")
159                .unwrap();
160        };
161
162        builder
163            .add_route(
164                Route::new()
165                    .capability(Capability::protocol_by_name("fuchsia.time.Maintenance"))
166                    .from(&maintenance_server)
167                    .to(&timekeeper),
168            )
169            .await
170            .context("while setting up Maintenance")
171            .unwrap();
172
173        builder
174            .add_route(
175                Route::new()
176                    .capability(Capability::protocol_by_name("test.time.TimeSourceControl"))
177                    .from(&timesource_server)
178                    .to(&timekeeper),
179            )
180            .await
181            .unwrap();
182
183        builder
184            .add_route(
185                Route::new()
186                    .capability(Capability::protocol_by_name(
187                        "fuchsia.metrics.test.MetricEventLoggerQuerier",
188                    ))
189                    .from(&fake_cobalt)
190                    .to(Ref::parent()),
191            )
192            .await
193            .unwrap();
194
195        builder
196            .add_route(
197                Route::new()
198                    .capability(Capability::protocol_by_name(
199                        "fuchsia.metrics.MetricEventLoggerFactory",
200                    ))
201                    .from(&fake_cobalt)
202                    .to(&timekeeper),
203            )
204            .await
205            .unwrap();
206
207        builder
208            .add_route(
209                Route::new()
210                    .capability(Capability::protocol_by_name("fuchsia.logger.LogSink"))
211                    .from(Ref::parent())
212                    .to(&fake_cobalt)
213                    .to(&timekeeper)
214                    .to(&timesource_server)
215                    .to(&maintenance_server),
216            )
217            .await
218            .unwrap();
219
220        builder
221            .add_route(
222                Route::new()
223                    .capability(Capability::configuration("fuchsia.time.config.WritableUTCTime"))
224                    .from(Ref::parent())
225                    .to(&timekeeper),
226            )
227            .await
228            .unwrap();
229
230        let rtc_updates = setup_rtc(rtc_options, &builder, &timekeeper).await;
231        let realm_instance = builder.build().await.unwrap();
232
233        let fake_clock_control = if use_fake_clock {
234            let control_proxy = realm_instance.root.connect_to_protocol_at_exposed_dir().unwrap();
235            let clock_proxy = realm_instance.root.connect_to_protocol_at_exposed_dir().unwrap();
236            Some(FakeClockController { control_proxy, clock_proxy })
237        } else {
238            None
239        };
240
241        let cobalt_querier = realm_instance
242            .root
243            .connect_to_protocol_at_exposed_dir()
244            .expect("the connection succeeds");
245
246        let nested_timekeeper = Self { _realm_instance: realm_instance };
247
248        (nested_timekeeper, push_source_puppet, rtc_updates, cobalt_querier, fake_clock_control)
249    }
250}
251
252pub struct RemotePushSourcePuppet {
253    proxy: fidl_test_time_realm::PushSourcePuppetProxy,
254}
255
256impl RemotePushSourcePuppet {
257    /// Creates a new [RemotePushSourcePuppet].
258    pub fn new(proxy: fidl_test_time_realm::PushSourcePuppetProxy) -> Arc<Self> {
259        Arc::new(Self { proxy })
260    }
261
262    /// Set the next sample reported by the time source.
263    pub async fn set_sample(&self, sample: TimeSample) {
264        self.proxy.set_sample(&sample).await.expect("original API was infallible");
265    }
266
267    /// Set the next status reported by the time source.
268    pub async fn set_status(&self, status: Status) {
269        self.proxy.set_status(status).await.expect("original API was infallible");
270    }
271
272    /// Simulate a crash by closing client channels and wiping state.
273    pub async fn simulate_crash(&self) {
274        self.proxy.crash().await.expect("original local API was infallible");
275    }
276
277    /// Returns the number of cumulative connections served. This allows asserting
278    /// behavior such as whether Timekeeper has restarted a connection.
279    pub async fn lifetime_served_connections(&self) -> u32 {
280        self.proxy.get_lifetime_served_connections().await.expect("original API was infallible")
281    }
282}
283
284/// A `PushSource` that allows a single client and can be controlled by a test.
285pub struct PushSourcePuppet {
286    /// Internal state for the current PushSource. May be dropped and replaced
287    /// to clear all state.
288    inner: Mutex<PushSourcePuppetInner>,
289    /// The number of client connections received over the lifetime of the puppet.
290    cumulative_clients: Mutex<u32>,
291}
292
293impl PushSourcePuppet {
294    /// Create a new `PushSourcePuppet`.
295    fn new() -> Self {
296        Self { inner: Mutex::new(PushSourcePuppetInner::new()), cumulative_clients: Mutex::new(0) }
297    }
298
299    /// Serve the `PushSource` service to a client.
300    fn serve_client(&self, server_end: ServerEnd<PushSourceMarker>) {
301        log::debug!("serve_client entry");
302        let mut inner = self.inner.lock();
303        // Timekeeper should only need to connect to a push source once, except when it is
304        // restarting a time source. This case appears to the test as a second connection to the
305        // puppet. Since the puppet is restarted, all its state should be cleared as well.
306        if inner.served_client() {
307            *inner = PushSourcePuppetInner::new();
308        }
309        inner.serve_client(server_end);
310        *self.cumulative_clients.lock() += 1;
311    }
312
313    /// Set the next sample reported by the time source.
314    pub async fn set_sample(&self, sample: TimeSample) {
315        let mut sink = self.inner.lock().get_sink();
316        sink.send(sample.into()).await.unwrap();
317    }
318
319    /// Set the next status reported by the time source.
320    pub async fn set_status(&self, status: Status) {
321        let mut sink = self.inner.lock().get_sink();
322        sink.send(status.into()).await.unwrap();
323    }
324
325    /// Simulate a crash by closing client channels and wiping state.
326    pub fn simulate_crash(&self) {
327        *self.inner.lock() = PushSourcePuppetInner::new();
328        // This drops the old inner and cleans up any tasks it owns.
329    }
330
331    /// Returns the number of cumulative connections served. This allows asserting
332    /// behavior such as whether Timekeeper has restarted a connection.
333    pub fn lifetime_served_connections(&self) -> u32 {
334        *self.cumulative_clients.lock()
335    }
336}
337
338/// Internal state for a PushSourcePuppet. This struct contains a PushSource and
339/// all Tasks needed for it to serve requests,
340struct PushSourcePuppetInner {
341    push_source: Arc<PushSource<TestUpdateAlgorithm>>,
342    /// Tasks serving PushSource clients.
343    tasks: Vec<fasync::Task<()>>,
344    /// Sink through which updates are passed to the PushSource.
345    update_sink: Sender<Update>,
346}
347
348impl PushSourcePuppetInner {
349    fn new() -> Self {
350        let (update_algorithm, update_sink) = TestUpdateAlgorithm::new();
351        let push_source = Arc::new(PushSource::new(update_algorithm, Status::Ok).unwrap());
352        let push_source_clone = Arc::clone(&push_source);
353        let tasks = vec![fasync::Task::spawn(async move {
354            push_source_clone.poll_updates().await.unwrap();
355        })];
356        Self { push_source, tasks, update_sink }
357    }
358
359    /// Returns true if this puppet has or is currently serving a client.
360    fn served_client(&self) -> bool {
361        self.tasks.len() > 1
362    }
363
364    /// Serve the `PushSource` service to a client.
365    fn serve_client(&mut self, server_end: ServerEnd<PushSourceMarker>) {
366        let push_source_clone = Arc::clone(&self.push_source);
367        self.tasks.push(fasync::Task::spawn(async move {
368            push_source_clone.handle_requests_for_stream(server_end.into_stream()).await.unwrap();
369        }));
370    }
371
372    /// Obtains the sink used to send commands to the push source puppet.
373    ///
374    /// The sink is detached from the puppet, so can be used whenever needed
375    /// without locking.
376    fn get_sink(&self) -> Sender<Update> {
377        self.update_sink.clone()
378    }
379}
380
381/// The list of RTC update requests received by a `NestedTimekeeper`.
382#[derive(Clone, Debug)]
383pub struct RtcUpdates(Arc<Mutex<Vec<fidl_fuchsia_hardware_rtc::Time>>>);
384
385impl RtcUpdates {
386    /// Get all received RTC times as a vec.
387    pub fn to_vec(&self) -> Vec<fidl_fuchsia_hardware_rtc::Time> {
388        self.0.lock().clone()
389    }
390}
391
392/// Remote RTC updates - peek into the life of the RTC on the other side of a
393/// RTC connection.
394pub struct RemoteRtcUpdates {
395    proxy: fidl_test_time_realm::RtcUpdatesProxy,
396}
397
398impl RemoteRtcUpdates {
399    pub async fn to_vec(&self) -> Vec<fidl_fuchsia_hardware_rtc::Time> {
400        self.proxy
401            .get(fidl_test_time_realm::GetRequest::default())
402            .await
403            .expect("no errors or overflows") // Original API was infallible.
404            .unwrap()
405            .0
406    }
407    pub fn new(proxy: fidl_test_time_realm::RtcUpdatesProxy) -> Self {
408        RemoteRtcUpdates { proxy }
409    }
410}
411
412/// A wrapper around a `FakeClockControlProxy` that also allows a client to read
413/// the current fake time.
414pub struct FakeClockController {
415    control_proxy: FakeClockControlProxy,
416    clock_proxy: FakeClockProxy,
417}
418
419impl Deref for FakeClockController {
420    type Target = FakeClockControlProxy;
421
422    fn deref(&self) -> &Self::Target {
423        &self.control_proxy
424    }
425}
426
427impl FakeClockController {
428    /// Re-constructs FakeClockController from the constituents.
429    pub fn new(control_proxy: FakeClockControlProxy, clock_proxy: FakeClockProxy) -> Self {
430        FakeClockController { control_proxy, clock_proxy }
431    }
432
433    /// Deconstructs [Self] into fake clock proxies.
434    pub fn into_components(self) -> (FakeClockControlProxy, FakeClockProxy) {
435        (self.control_proxy, self.clock_proxy)
436    }
437
438    pub async fn get_monotonic(&self) -> Result<i64, fidl::Error> {
439        self.clock_proxy.get().await.map(|(_boot, mono)| mono.into_nanos())
440    }
441
442    /// Returns the current fake instant on the reference timeline.
443    pub async fn get_reference(&self) -> Result<zx::BootInstant, fidl::Error> {
444        self.get_monotonic().await.map(|v| zx::BootInstant::from_nanos(v))
445    }
446}
447
448/// The RTC configuration options.
449pub enum RtcOptions {
450    /// No real-time clock available. This configuration simulates a system that
451    /// does not have a RTC circuit available.
452    None,
453    /// Fake real-time clock. Supplied initial RTC time to report.
454    InitialRtcTime(zx::SyntheticInstant),
455    /// Injected real-time clock.
456    ///
457    /// This is the handle that will appear as the directory
458    /// `/dev/class/rtc` in the Timekeeper's namespace.
459    ///
460    /// The caller must set this directory up so that it serves
461    /// a RTC device (e.g. named `/dev/class/rtc/000`, and serving
462    /// the FIDL `fuchsia.hardware.rtc/Device`) from this directory.
463    ///
464    /// It is also possible to serve more RTCs from the directory, or
465    /// other files and file types at the caller's option.
466    ///
467    /// Use this option if you need to implement corner cases, or
468    /// very specific RTC behavior, such as abnormal configuration
469    /// or anomalous behavior.
470    InjectedRtc(fio::DirectoryProxy),
471}
472
473impl From<fidl_test_time_realm::RtcOptions> for RtcOptions {
474    fn from(value: fidl_test_time_realm::RtcOptions) -> Self {
475        match value {
476            fidl_test_time_realm::RtcOptions::DevClassRtc(h) => {
477                RtcOptions::InjectedRtc(h.into_proxy())
478            }
479            fidl_test_time_realm::RtcOptions::InitialRtcTime(t) => {
480                RtcOptions::InitialRtcTime(zx::SyntheticInstant::from_nanos(t))
481            }
482            _ => unimplemented!(),
483        }
484    }
485}
486
487impl From<zx::SyntheticInstant> for RtcOptions {
488    fn from(value: zx::SyntheticInstant) -> Self {
489        RtcOptions::InitialRtcTime(value)
490    }
491}
492
493impl From<Option<zx::SyntheticInstant>> for RtcOptions {
494    fn from(value: Option<zx::SyntheticInstant>) -> Self {
495        value.map(|t| t.into()).unwrap_or(Self::None)
496    }
497}
498
499/// Sets up the RTC serving.
500///
501/// Args:
502/// - `rtc_options`: options for RTC setup.
503/// - `build`: the `RealmBuilder` that will construct the realm.
504/// - `timekeeper`: the Timekeeper component instance.
505///
506/// Returns:
507/// - `RtcUpdates`: A vector of RTC updates received from a fake RTC. If the
508///   client serves the RTC directory, then the return value is useless.
509async fn setup_rtc(
510    rtc_options: RtcOptions,
511    builder: &RealmBuilder,
512    timekeeper: &ChildRef,
513) -> RtcUpdates {
514    let rtc_updates = RtcUpdates(Arc::new(Mutex::new(vec![])));
515
516    let rtc_dir = match rtc_options {
517        RtcOptions::InitialRtcTime(initial_time) => {
518            log::debug!("using fake /dev/class/rtc/000");
519            pseudo_directory! {
520                "class" => pseudo_directory! {
521                    "rtc" => pseudo_directory! {
522                        "000" => vfs::service::host({
523                            let rtc_updates = rtc_updates.clone();
524                            move |stream| {
525                                serve_fake_rtc(initial_time, rtc_updates.clone(), stream)
526                            }
527                        })
528                    }
529                }
530            }
531        }
532        RtcOptions::None => {
533            log::debug!("using an empty /dev/class/rtc directory");
534            pseudo_directory! {
535                "class" => pseudo_directory! {
536                    "rtc" => pseudo_directory! {
537                    }
538                }
539            }
540        }
541        RtcOptions::InjectedRtc(h) => {
542            log::debug!("using /dev/class/rtc provided by client");
543            pseudo_directory! {
544                "class" => pseudo_directory! {
545                    "rtc" => vfs::remote::remote_dir(h)
546                }
547            }
548        }
549    };
550
551    let fake_rtc_server = builder
552        .add_local_child(
553            "fake_rtc",
554            {
555                move |handles| {
556                    let rtc_dir = rtc_dir.clone();
557                    async move {
558                        let _ = &handles;
559                        let mut fs = ServiceFs::new();
560                        fs.add_remote(
561                            "dev",
562                            vfs::directory::serve_read_only(
563                                rtc_dir,
564                                vfs::execution_scope::ExecutionScope::new(),
565                            ),
566                        );
567                        fs.serve_connection(handles.outgoing_dir)
568                            .expect("failed to serve fake RTC ServiceFs");
569                        fs.collect::<()>().await;
570                        Ok(())
571                    }
572                    .boxed()
573                }
574            },
575            ChildOptions::new().eager(),
576        )
577        .await
578        .unwrap();
579
580    builder
581        .add_route(
582            Route::new()
583                .capability(
584                    Capability::directory("dev-rtc").path("/dev/class/rtc").rights(fio::R_STAR_DIR),
585                )
586                .from(&fake_rtc_server)
587                .to(&*timekeeper),
588        )
589        .await
590        .unwrap();
591
592    rtc_updates
593}
594
595// Emulates the low-level RTC device driver with a fake which gets given the initial
596// reading, and can be set externally.
597async fn serve_fake_rtc(
598    initial_time: zx::SyntheticInstant,
599    rtc_updates: RtcUpdates,
600    mut stream: DeviceRequestStream,
601) {
602    let mut current = zx_time_to_rtc_time(initial_time);
603    while let Some(req) = stream.try_next().await.unwrap() {
604        match req {
605            DeviceRequest::Get { responder } => {
606                log::debug!("serve_fake_rtc: DeviceRequest::Get: {current:?}");
607                responder.send(Ok(&current)).unwrap();
608            }
609            DeviceRequest::Set2 { rtc, responder } => {
610                log::debug!("serve_fake_rtc: DeviceRequest::Set2: {rtc:?}");
611                rtc_updates.0.lock().push(rtc);
612                current = rtc;
613                responder.send(Ok(())).unwrap();
614            }
615            DeviceRequest::_UnknownMethod { .. } => {}
616        }
617    }
618}
619
620async fn serve_test_control(puppet: &PushSourcePuppet, stream: TimeSourceControlRequestStream) {
621    stream
622        .try_for_each_concurrent(None, |req| async {
623            let _ = &req;
624            let TimeSourceControlRequest::ConnectPushSource { push_source, .. } = req;
625            puppet.serve_client(push_source);
626            Ok(())
627        })
628        .await
629        .unwrap();
630}
631
632async fn serve_maintenance(clock_handle: Arc<zx::Clock>, mut stream: MaintenanceRequestStream) {
633    while let Some(req) = stream.try_next().await.unwrap() {
634        let MaintenanceRequest::GetWritableUtcClock { responder } = req;
635        responder.send(clock_handle.duplicate_handle(Rights::SAME_RIGHTS).unwrap()).unwrap();
636    }
637}
638
639async fn timesource_mock_server(
640    handles: LocalComponentHandles,
641    push_source_puppet: Arc<PushSourcePuppet>,
642) -> Result<(), anyhow::Error> {
643    let mut fs = ServiceFs::new();
644    let mut tasks = vec![];
645
646    fs.dir("svc").add_fidl_service(move |stream: TimeSourceControlRequestStream| {
647        let puppet_clone = Arc::clone(&push_source_puppet);
648
649        tasks.push(fasync::Task::local(async move {
650            serve_test_control(&*puppet_clone, stream).await;
651        }));
652    });
653
654    fs.serve_connection(handles.outgoing_dir)?;
655    fs.collect::<()>().await;
656
657    Ok(())
658}
659
660async fn maintenance_mock_server(
661    handles: LocalComponentHandles,
662    clock: Arc<zx::Clock>,
663) -> Result<(), anyhow::Error> {
664    let mut fs = ServiceFs::new();
665    let mut tasks = vec![];
666
667    fs.dir("svc").add_fidl_service(move |stream: MaintenanceRequestStream| {
668        let clock_clone = Arc::clone(&clock);
669
670        tasks.push(fasync::Task::local(async move {
671            serve_maintenance(clock_clone, stream).await;
672        }));
673    });
674
675    fs.serve_connection(handles.outgoing_dir)?;
676    fs.collect::<()>().await;
677
678    Ok(())
679}
680
681fn from_rfc2822(date: &str) -> zx::SyntheticInstant {
682    zx::SyntheticInstant::from_nanos(
683        chrono::DateTime::parse_from_rfc2822(date).unwrap().timestamp_nanos_opt().unwrap(),
684    )
685}
686
/// The backstop time given to clocks created by `new_nonshareable_clock()`.
pub static BACKSTOP_TIME: LazyLock<zx::SyntheticInstant> =
    LazyLock::new(|| from_rfc2822("Sun, 20 Sep 2020 01:01:01 GMT"));
/// An RTC reading that is later than `BACKSTOP_TIME`.
pub static VALID_RTC_TIME: LazyLock<zx::SyntheticInstant> =
    LazyLock::new(|| from_rfc2822("Sun, 20 Sep 2020 02:02:02 GMT"));
/// A time that is earlier than `BACKSTOP_TIME`.
pub static BEFORE_BACKSTOP_TIME: LazyLock<zx::SyntheticInstant> =
    LazyLock::new(|| from_rfc2822("Fri, 06 Mar 2020 04:04:04 GMT"));
/// A time that is later than `BACKSTOP_TIME`.
pub static VALID_TIME: LazyLock<zx::SyntheticInstant> =
    LazyLock::new(|| from_rfc2822("Tue, 29 Sep 2020 02:19:01 GMT"));
/// A second time later than `BACKSTOP_TIME`; also later than `VALID_TIME`.
pub static VALID_TIME_2: LazyLock<zx::SyntheticInstant> =
    LazyLock::new(|| from_rfc2822("Wed, 30 Sep 2020 14:59:59 GMT"));

/// Time between each reported sample.
pub const BETWEEN_SAMPLES: zx::BootDuration = zx::BootDuration::from_seconds(5);

/// The standard deviation to report on valid time samples.
pub const STD_DEV: zx::BootDuration = zx::BootDuration::from_millis(50);
703
704/// Create a new clock with backstop time set to `BACKSTOP_TIME`.
705// TODO: b/306024715 - To be removed once all tests are migrated to TTRF.
706pub fn new_clock() -> Arc<zx::SyntheticClock> {
707    Arc::new(new_nonshareable_clock())
708}
709
710/// Create a new clock with backstop time set to `BACKSTOP_TIME`.
711pub fn new_nonshareable_clock() -> zx::SyntheticClock {
712    zx::SyntheticClock::create(zx::ClockOpts::MAPPABLE, Some(*BACKSTOP_TIME)).unwrap()
713}
714
715fn zx_time_to_rtc_time(zx_time: zx::SyntheticInstant) -> fidl_fuchsia_hardware_rtc::Time {
716    let date = chrono::Utc.timestamp_nanos(zx_time.into_nanos());
717    fidl_fuchsia_hardware_rtc::Time {
718        seconds: date.second() as u8,
719        minutes: date.minute() as u8,
720        hours: date.hour() as u8,
721        day: date.day() as u8,
722        month: date.month() as u8,
723        year: date.year() as u16,
724    }
725}
726
727pub fn rtc_time_to_zx_time(rtc_time: fidl_fuchsia_hardware_rtc::Time) -> zx::SyntheticInstant {
728    let date = chrono::Utc
729        .with_ymd_and_hms(
730            rtc_time.year as i32,
731            rtc_time.month as u32,
732            rtc_time.day as u32,
733            rtc_time.hours as u32,
734            rtc_time.minutes as u32,
735            rtc_time.seconds as u32,
736        )
737        .unwrap();
738    zx::SyntheticInstant::from_nanos(date.timestamp_nanos_opt().unwrap())
739}
740
741/// Create a stream of MetricEvents from a proxy.
742pub fn create_cobalt_event_stream(
743    proxy: Arc<MetricEventLoggerQuerierProxy>,
744    log_method: LogMethod,
745) -> std::pin::Pin<Box<dyn Stream<Item = MetricEvent>>> {
746    async_utils::hanging_get::client::HangingGetStream::new(proxy, move |p| {
747        p.watch_logs(PROJECT_ID, log_method)
748    })
749    .map(|res| futures::stream::iter(res.expect("there should be a valid result here").0))
750    .flatten()
751    .boxed()
752}
753
/// Repeatedly evaluates `condition` until it returns `Some(v)`. Returns `v`.
///
/// Expands to a call to `poll_until_some_impl`, passing the call site's file, line and
/// column for logging. The expansion is a future and must be awaited.
#[macro_export]
macro_rules! poll_until_some {
    ($condition:expr) => {
        $crate::poll_until_some_impl(
            $condition,
            &$crate::SourceLocation::new(file!(), line!(), column!()),
        )
    };
}
764
/// Repeatedly awaits the async `condition` until it yields `Some(v)`. Evaluates to `v`.
/// Use if your condition is an async fn.
///
/// The expansion contains `.await`, so the macro must be used inside an async context.
/// It names `log` and `fasync` by bare path, so both must be in scope at the call site.
#[macro_export]
macro_rules! poll_until_some_async {
    ($condition:expr) => {{
        let loc = $crate::SourceLocation::new(file!(), line!(), column!());
        log::info!("=> poll_until_some_async() for {}", &loc);
        let value = loop {
            let polled = $condition.await;
            if let Some(value) = polled {
                break value;
            }
            fasync::Timer::new(fasync::MonotonicInstant::after($crate::RETRY_WAIT_DURATION)).await;
        };
        log::info!("=> poll_until_some_async() done for {}", &loc);
        value
    }};
}
784
/// Repeatedly evaluates `condition` to create a `Future`, and then awaits the `Future`.
/// Returns `()` when the (most recently created) `Future` resolves to `true`.
///
/// Expands to a call to `poll_until_async_impl`, passing the call site's file, line and
/// column for logging. The expansion is a future and must be awaited.
#[macro_export]
macro_rules! poll_until_async {
    ($condition:expr) => {
        $crate::poll_until_async_impl(
            $condition,
            &$crate::SourceLocation::new(file!(), line!(), column!()),
        )
    };
}
796
/// A reimplementation of `poll_until_async!` which deals better with borrows.
///
/// Repeatedly awaits the async `condition` until it yields `true`, waiting
/// `RETRY_WAIT_DURATION` between attempts. Evaluates to `true`.
///
/// Unlike `poll_until_async!`, `condition` is a future-producing expression rather than
/// a closure, and it is awaited inline at the call site. The expansion contains
/// `.await`, so the macro must be used inside an async context. It names `log` and
/// `fasync` by bare path, so both must be in scope at the call site.
#[macro_export]
macro_rules! poll_until_async_2 {
    ($condition:expr) => {{
        let loc = $crate::SourceLocation::new(file!(), line!(), column!());
        // Both log lines name this macro, so that its output can be told apart from
        // that of `poll_until_async!`.
        log::info!("=> poll_until_async_2() for {}", &loc);
        loop {
            let done: bool = $condition.await;
            if done {
                break;
            }
            fasync::Timer::new(fasync::MonotonicInstant::after($crate::RETRY_WAIT_DURATION)).await;
        }
        log::info!("=> poll_until_async_2() done for {}", &loc);
        // The loop is only left once the condition has held.
        true
    }};
}
815
/// Repeatedly evaluates `condition` until it returns `true`. Returns `()`.
///
/// Expands to a call to `poll_until_impl`, passing the call site's file, line and
/// column for logging. The expansion is a future and must be awaited.
#[macro_export]
macro_rules! poll_until {
    ($condition:expr) => {
        $crate::poll_until_impl(
            $condition,
            &$crate::SourceLocation::new(file!(), line!(), column!()),
        )
    };
}
826
/// How long the polling helpers and macros in this file wait between two attempts.
pub const RETRY_WAIT_DURATION: zx::MonotonicDuration = zx::MonotonicDuration::from_millis(10);
829
/// A position in source code, used to label the log messages of the polling helpers.
pub struct SourceLocation {
    /// Source file name, as produced by `file!()`.
    file: &'static str,
    /// Line number, as produced by `line!()`.
    line: u32,
    /// Column number, as produced by `column!()`.
    column: u32,
}

impl SourceLocation {
    /// Creates a location from its file, line and column.
    pub fn new(file: &'static str, line: u32, column: u32) -> Self {
        SourceLocation { file, line, column }
    }
}

/// Formats the location as `(file: F, line: L, column: C)`.
impl std::fmt::Display for SourceLocation {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { file, line, column } = self;
        write!(f, "(file: {file}, line: {line}, column: {column})")
    }
}
847
848/// Use `poll_until_some!()` instead.
849pub async fn poll_until_some_impl<T, F>(poll_fn: F, loc: &SourceLocation) -> T
850where
851    F: Fn() -> Option<T>,
852{
853    log::info!("=> poll_until_some() for {}", loc);
854    loop {
855        match poll_fn() {
856            Some(value) => {
857                log::info!("<= poll_until_some() for {}", loc);
858                return value;
859            }
860            None => fasync::Timer::new(fasync::MonotonicInstant::after(RETRY_WAIT_DURATION)).await,
861        }
862    }
863}
864
865/// Use `poll_until_async!()` instead.
866pub async fn poll_until_async_impl<F, Fut>(poll_fn: F, loc: &SourceLocation)
867where
868    F: Fn() -> Fut,
869    Fut: Future<Output = bool>,
870{
871    log::info!("=> poll_until_async() for {}", loc);
872    while !poll_fn().await {
873        fasync::Timer::new(fasync::MonotonicInstant::after(RETRY_WAIT_DURATION)).await
874    }
875    log::info!("<= poll_until_async() for {}", loc);
876}
877
878/// Use `poll_until!()` instead.
879pub async fn poll_until_impl<F: Fn() -> bool>(poll_fn: F, loc: &SourceLocation) {
880    log::info!("=> poll_until() for {}", loc);
881    while !poll_fn() {
882        fasync::Timer::new(fasync::MonotonicInstant::after(RETRY_WAIT_DURATION)).await
883    }
884    log::info!("<= poll_until() for {}", loc);
885}