timekeeper_integration_lib/
lib.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use anyhow::Context;
6use chrono::{Datelike, TimeZone, Timelike};
7use fidl::endpoints::ServerEnd;
8use fidl_fuchsia_hardware_rtc::{DeviceRequest, DeviceRequestStream};
9use fidl_fuchsia_metrics::MetricEvent;
10use fidl_fuchsia_metrics_test::{LogMethod, MetricEventLoggerQuerierProxy};
11use fidl_fuchsia_testing::{FakeClockControlProxy, FakeClockProxy};
12use fidl_fuchsia_time::{MaintenanceRequest, MaintenanceRequestStream};
13use fidl_fuchsia_time_external::{PushSourceMarker, Status, TimeSample};
14use fidl_test_time::{TimeSourceControlRequest, TimeSourceControlRequestStream};
15use fuchsia_component::server::ServiceFs;
16use fuchsia_component_test::{
17    Capability, ChildOptions, ChildRef, LocalComponentHandles, RealmBuilder, RealmInstance, Ref,
18    Route,
19};
20use fuchsia_sync::Mutex;
21use futures::channel::mpsc::Sender;
22use futures::stream::{Stream, StreamExt, TryStreamExt};
23use futures::{Future, FutureExt, SinkExt};
24use push_source::{PushSource, TestUpdateAlgorithm, Update};
25use std::ops::Deref;
26use std::sync::{Arc, LazyLock};
27use time_metrics_registry::PROJECT_ID;
28use vfs::pseudo_directory;
29use zx::{self as zx, HandleBased, Rights};
30use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
31
/// Component URL for timekeeper; used when no fake clock is requested.
const TIMEKEEPER_URL: &str = "#meta/timekeeper_for_integration.cm";
/// Component URL for timekeeper with fake time; used when `use_fake_clock` is true.
const TIMEKEEPER_FAKE_TIME_URL: &str = "#meta/timekeeper_with_fake_time.cm";
/// Component URL for fake cobalt.
const COBALT_URL: &str = "#meta/fake_cobalt.cm";
/// Component URL for the fake clock component.
const FAKE_CLOCK_URL: &str = "#meta/fake_clock.cm";
40
/// A reference to a timekeeper running inside a nested environment which runs fake versions of
/// the services timekeeper requires.
pub struct NestedTimekeeper {
    /// The realm that hosts timekeeper and its fakes. It is never read by this type's own
    /// methods; it is stored so that the realm is owned by (and dropped with) this value,
    /// and so that it can be handed back out by the conversion into [RealmInstance].
    _realm_instance: RealmInstance,
}
46
47impl Into<RealmInstance> for NestedTimekeeper {
48    // Deconstructs [Self] into an underlying [RealmInstance].
49    fn into(self) -> RealmInstance {
50        self._realm_instance
51    }
52}
53
impl NestedTimekeeper {
    /// Creates a new [NestedTimekeeper].
    ///
    /// Launches an instance of timekeeper maintaining the provided |clock| in a nested
    /// environment.
    ///
    /// |rtc_options| selects what the environment exposes to timekeeper as its RTC
    /// directory: no RTC at all, a fake RTC device reporting a fixed initial time, or a
    /// directory injected by the caller. See [RtcOptions].
    ///
    /// If use_fake_clock is true, also launches a fake monotonic clock service.
    ///
    /// Returns a `NestedTimekeeper`, handles to the PushSource and RTC it obtains updates from,
    /// Cobalt debug querier, and a fake clock control handle if use_fake_clock is true.
    ///
    /// Panics if any step of building the realm or connecting to its exposed protocols
    /// fails (every fallible step is unwrapped).
    pub async fn new(
        clock: Arc<zx::Clock>,
        rtc_options: RtcOptions,
        use_fake_clock: bool,
    ) -> (
        Self,
        Arc<PushSourcePuppet>,
        RtcUpdates,
        MetricEventLoggerQuerierProxy,
        Option<FakeClockController>,
    ) {
        // Shared between the caller (returned below) and the time source mock component.
        let push_source_puppet = Arc::new(PushSourcePuppet::new());

        let builder = RealmBuilder::new().await.unwrap();
        let fake_cobalt =
            builder.add_child("fake_cobalt", COBALT_URL, ChildOptions::new()).await.unwrap();

        // The fake-time variant of timekeeper is a separate component manifest.
        let timekeeper_url = if use_fake_clock { TIMEKEEPER_FAKE_TIME_URL } else { TIMEKEEPER_URL };
        log::trace!("using timekeeper_url: {}", timekeeper_url);
        let timekeeper = builder
            .add_child("timekeeper_test", timekeeper_url, ChildOptions::new().eager())
            .await
            .with_context(|| format!("while starting up timekeeper_test from: {timekeeper_url}"))
            .unwrap();

        // Local component serving `test.time.TimeSourceControl`, backed by the puppet.
        let timesource_server = builder
            .add_local_child(
                "timesource_mock",
                {
                    // Clone outside the closure so the original `Arc` can still be returned.
                    let push_source_puppet = Arc::clone(&push_source_puppet);
                    move |handles: LocalComponentHandles| {
                        Box::pin(timesource_mock_server(handles, Arc::clone(&push_source_puppet)))
                    }
                },
                ChildOptions::new(),
            )
            .await
            .context("while starting up timesource_mock")
            .unwrap();

        // Local component serving `fuchsia.time.Maintenance`, handing out duplicates of |clock|.
        let maintenance_server = builder
            .add_local_child(
                "maintenance_mock",
                move |handles: LocalComponentHandles| {
                    Box::pin(maintenance_mock_server(handles, Arc::clone(&clock)))
                },
                ChildOptions::new(),
            )
            .await
            .context("while starting up maintenance_mock")
            .unwrap();

        // Launch fake clock if needed.
        if use_fake_clock {
            let fake_clock =
                builder.add_child("fake_clock", FAKE_CLOCK_URL, ChildOptions::new()).await.unwrap();

            // The control protocol is only exposed to the test (the realm's parent).
            builder
                .add_route(
                    Route::new()
                        .capability(Capability::protocol_by_name(
                            "fuchsia.testing.FakeClockControl",
                        ))
                        .from(&fake_clock)
                        .to(Ref::parent()),
                )
                .await
                .context("while setting up FakeClockControl")
                .unwrap();

            // The fake clock itself is visible to both the test and timekeeper.
            builder
                .add_route(
                    Route::new()
                        .capability(Capability::protocol_by_name("fuchsia.testing.FakeClock"))
                        .from(&fake_clock)
                        .to(Ref::parent())
                        .to(&timekeeper),
                )
                .await
                .context("while setting up FakeClock")
                .unwrap();

            builder
                .add_route(
                    Route::new()
                        .capability(Capability::protocol_by_name("fuchsia.logger.LogSink"))
                        .from(Ref::parent())
                        .to(&fake_clock),
                )
                .await
                .context("while setting up LogSink")
                .unwrap();
        };

        builder
            .add_route(
                Route::new()
                    .capability(Capability::protocol_by_name("fuchsia.time.Maintenance"))
                    .from(&maintenance_server)
                    .to(&timekeeper),
            )
            .await
            .context("while setting up Maintenance")
            .unwrap();

        builder
            .add_route(
                Route::new()
                    .capability(Capability::protocol_by_name("test.time.TimeSourceControl"))
                    .from(&timesource_server)
                    .to(&timekeeper),
            )
            .await
            .unwrap();

        // The querier lets the test inspect what timekeeper logged to fake cobalt.
        builder
            .add_route(
                Route::new()
                    .capability(Capability::protocol_by_name(
                        "fuchsia.metrics.test.MetricEventLoggerQuerier",
                    ))
                    .from(&fake_cobalt)
                    .to(Ref::parent()),
            )
            .await
            .unwrap();

        builder
            .add_route(
                Route::new()
                    .capability(Capability::protocol_by_name(
                        "fuchsia.metrics.MetricEventLoggerFactory",
                    ))
                    .from(&fake_cobalt)
                    .to(&timekeeper),
            )
            .await
            .unwrap();

        builder
            .add_route(
                Route::new()
                    .capability(Capability::protocol_by_name("fuchsia.logger.LogSink"))
                    .from(Ref::parent())
                    .to(&fake_cobalt)
                    .to(&timekeeper)
                    .to(&timesource_server)
                    .to(&maintenance_server),
            )
            .await
            .unwrap();

        builder
            .add_route(
                Route::new()
                    .capability(Capability::configuration("fuchsia.time.config.WritableUTCTime"))
                    .from(Ref::parent())
                    .to(&timekeeper),
            )
            .await
            .unwrap();

        // The RTC component and its route must be added before the realm is built.
        let rtc_updates = setup_rtc(rtc_options, &builder, &timekeeper).await;
        let realm_instance = builder.build().await.unwrap();

        let fake_clock_control = if use_fake_clock {
            // Which protocol each call connects to is inferred from the field types of
            // `FakeClockController` that the proxies are stored into below.
            let control_proxy = realm_instance.root.connect_to_protocol_at_exposed_dir().unwrap();
            let clock_proxy = realm_instance.root.connect_to_protocol_at_exposed_dir().unwrap();
            Some(FakeClockController { control_proxy, clock_proxy })
        } else {
            None
        };

        // The protocol is inferred from this function's return type.
        let cobalt_querier = realm_instance
            .root
            .connect_to_protocol_at_exposed_dir()
            .expect("the connection succeeds");

        let nested_timekeeper = Self { _realm_instance: realm_instance };

        (nested_timekeeper, push_source_puppet, rtc_updates, cobalt_querier, fake_clock_control)
    }
}
250
251pub struct RemotePushSourcePuppet {
252    proxy: fidl_test_time_realm::PushSourcePuppetProxy,
253}
254
255impl RemotePushSourcePuppet {
256    /// Creates a new [RemotePushSourcePuppet].
257    pub fn new(proxy: fidl_test_time_realm::PushSourcePuppetProxy) -> Arc<Self> {
258        Arc::new(Self { proxy })
259    }
260
261    /// Set the next sample reported by the time source.
262    pub async fn set_sample(&self, sample: TimeSample) {
263        self.proxy.set_sample(&sample).await.expect("original API was infallible");
264    }
265
266    /// Set the next status reported by the time source.
267    pub async fn set_status(&self, status: Status) {
268        self.proxy.set_status(status).await.expect("original API was infallible");
269    }
270
271    /// Simulate a crash by closing client channels and wiping state.
272    pub async fn simulate_crash(&self) {
273        self.proxy.crash().await.expect("original local API was infallible");
274    }
275
276    /// Returns the number of cumulative connections served. This allows asserting
277    /// behavior such as whether Timekeeper has restarted a connection.
278    pub async fn lifetime_served_connections(&self) -> u32 {
279        self.proxy.get_lifetime_served_connections().await.expect("original API was infallible")
280    }
281}
282
283/// A `PushSource` that allows a single client and can be controlled by a test.
284pub struct PushSourcePuppet {
285    /// Internal state for the current PushSource. May be dropped and replaced
286    /// to clear all state.
287    inner: Mutex<PushSourcePuppetInner>,
288    /// The number of client connections received over the lifetime of the puppet.
289    cumulative_clients: Mutex<u32>,
290}
291
292impl PushSourcePuppet {
293    /// Create a new `PushSourcePuppet`.
294    fn new() -> Self {
295        Self { inner: Mutex::new(PushSourcePuppetInner::new()), cumulative_clients: Mutex::new(0) }
296    }
297
298    /// Serve the `PushSource` service to a client.
299    fn serve_client(&self, server_end: ServerEnd<PushSourceMarker>) {
300        log::debug!("serve_client entry");
301        let mut inner = self.inner.lock();
302        // Timekeeper should only need to connect to a push source once, except when it is
303        // restarting a time source. This case appears to the test as a second connection to the
304        // puppet. Since the puppet is restarted, all its state should be cleared as well.
305        if inner.served_client() {
306            *inner = PushSourcePuppetInner::new();
307        }
308        inner.serve_client(server_end);
309        *self.cumulative_clients.lock() += 1;
310    }
311
312    /// Set the next sample reported by the time source.
313    pub async fn set_sample(&self, sample: TimeSample) {
314        let mut sink = self.inner.lock().get_sink();
315        sink.send(sample.into()).await.unwrap();
316    }
317
318    /// Set the next status reported by the time source.
319    pub async fn set_status(&self, status: Status) {
320        let mut sink = self.inner.lock().get_sink();
321        sink.send(status.into()).await.unwrap();
322    }
323
324    /// Simulate a crash by closing client channels and wiping state.
325    pub fn simulate_crash(&self) {
326        *self.inner.lock() = PushSourcePuppetInner::new();
327        // This drops the old inner and cleans up any tasks it owns.
328    }
329
330    /// Returns the number of cumulative connections served. This allows asserting
331    /// behavior such as whether Timekeeper has restarted a connection.
332    pub fn lifetime_served_connections(&self) -> u32 {
333        *self.cumulative_clients.lock()
334    }
335}
336
337/// Internal state for a PushSourcePuppet. This struct contains a PushSource and
338/// all Tasks needed for it to serve requests,
339struct PushSourcePuppetInner {
340    push_source: Arc<PushSource<TestUpdateAlgorithm>>,
341    /// Tasks serving PushSource clients.
342    tasks: Vec<fasync::Task<()>>,
343    /// Sink through which updates are passed to the PushSource.
344    update_sink: Sender<Update>,
345}
346
347impl PushSourcePuppetInner {
348    fn new() -> Self {
349        let (update_algorithm, update_sink) = TestUpdateAlgorithm::new();
350        let push_source = Arc::new(PushSource::new(update_algorithm, Status::Ok).unwrap());
351        let push_source_clone = Arc::clone(&push_source);
352        let tasks = vec![fasync::Task::spawn(async move {
353            push_source_clone.poll_updates().await.unwrap();
354        })];
355        Self { push_source, tasks, update_sink }
356    }
357
358    /// Returns true if this puppet has or is currently serving a client.
359    fn served_client(&self) -> bool {
360        self.tasks.len() > 1
361    }
362
363    /// Serve the `PushSource` service to a client.
364    fn serve_client(&mut self, server_end: ServerEnd<PushSourceMarker>) {
365        let push_source_clone = Arc::clone(&self.push_source);
366        self.tasks.push(fasync::Task::spawn(async move {
367            push_source_clone.handle_requests_for_stream(server_end.into_stream()).await.unwrap();
368        }));
369    }
370
371    /// Obtains the sink used to send commands to the push source puppet.
372    ///
373    /// The sink is detached from the puppet, so can be used whenever needed
374    /// without locking.
375    fn get_sink(&self) -> Sender<Update> {
376        self.update_sink.clone()
377    }
378}
379
380/// The list of RTC update requests received by a `NestedTimekeeper`.
381#[derive(Clone, Debug)]
382pub struct RtcUpdates(Arc<Mutex<Vec<fidl_fuchsia_hardware_rtc::Time>>>);
383
384impl RtcUpdates {
385    /// Get all received RTC times as a vec.
386    pub fn to_vec(&self) -> Vec<fidl_fuchsia_hardware_rtc::Time> {
387        self.0.lock().clone()
388    }
389}
390
391/// Remote RTC updates - peek into the life of the RTC on the other side of a
392/// RTC connection.
393pub struct RemoteRtcUpdates {
394    proxy: fidl_test_time_realm::RtcUpdatesProxy,
395}
396
397impl RemoteRtcUpdates {
398    pub async fn to_vec(&self) -> Vec<fidl_fuchsia_hardware_rtc::Time> {
399        self.proxy
400            .get(fidl_test_time_realm::GetRequest::default())
401            .await
402            .expect("no errors or overflows") // Original API was infallible.
403            .unwrap()
404            .0
405    }
406    pub fn new(proxy: fidl_test_time_realm::RtcUpdatesProxy) -> Self {
407        RemoteRtcUpdates { proxy }
408    }
409}
410
411/// A wrapper around a `FakeClockControlProxy` that also allows a client to read
412/// the current fake time.
413pub struct FakeClockController {
414    control_proxy: FakeClockControlProxy,
415    clock_proxy: FakeClockProxy,
416}
417
418impl Deref for FakeClockController {
419    type Target = FakeClockControlProxy;
420
421    fn deref(&self) -> &Self::Target {
422        &self.control_proxy
423    }
424}
425
426impl FakeClockController {
427    /// Re-constructs FakeClockController from the constituents.
428    pub fn new(control_proxy: FakeClockControlProxy, clock_proxy: FakeClockProxy) -> Self {
429        FakeClockController { control_proxy, clock_proxy }
430    }
431
432    /// Deconstructs [Self] into fake clock proxies.
433    pub fn into_components(self) -> (FakeClockControlProxy, FakeClockProxy) {
434        (self.control_proxy, self.clock_proxy)
435    }
436
437    pub async fn get_monotonic(&self) -> Result<i64, fidl::Error> {
438        self.clock_proxy.get().await.map(|(_boot, mono)| mono.into_nanos())
439    }
440
441    /// Returns the current fake instant on the reference timeline.
442    pub async fn get_reference(&self) -> Result<zx::BootInstant, fidl::Error> {
443        self.get_monotonic().await.map(|v| zx::BootInstant::from_nanos(v))
444    }
445}
446
/// The RTC configuration options.
///
/// Determines what is served to Timekeeper as its `/dev/class/rtc` directory.
pub enum RtcOptions {
    /// No real-time clock available. This configuration simulates a system that
    /// does not have an RTC circuit available: the RTC directory is served empty.
    None,
    /// Fake real-time clock. The supplied instant is the time that the fake RTC
    /// reports on every read.
    InitialRtcTime(zx::SyntheticInstant),
    /// Injected real-time clock.
    ///
    /// This is the handle that will appear as the directory
    /// `/dev/class/rtc` in the Timekeeper's namespace.
    ///
    /// The caller must set this directory up so that it serves
    /// an RTC device (e.g. named `/dev/class/rtc/000`, and serving
    /// the FIDL `fuchsia.hardware.rtc/Device`) from this directory.
    ///
    /// It is also possible to serve more RTCs from the directory, or
    /// other files and file types at the caller's option.
    ///
    /// Use this option if you need to implement corner cases, or
    /// very specific RTC behavior, such as abnormal configuration
    /// or anomalous behavior.
    InjectedRtc(fio::DirectoryProxy),
}
471
472impl From<fidl_test_time_realm::RtcOptions> for RtcOptions {
473    fn from(value: fidl_test_time_realm::RtcOptions) -> Self {
474        match value {
475            fidl_test_time_realm::RtcOptions::DevClassRtc(h) => {
476                RtcOptions::InjectedRtc(h.into_proxy())
477            }
478            fidl_test_time_realm::RtcOptions::InitialRtcTime(t) => {
479                RtcOptions::InitialRtcTime(zx::SyntheticInstant::from_nanos(t))
480            }
481            _ => unimplemented!(),
482        }
483    }
484}
485
486impl From<zx::SyntheticInstant> for RtcOptions {
487    fn from(value: zx::SyntheticInstant) -> Self {
488        RtcOptions::InitialRtcTime(value)
489    }
490}
491
492impl From<Option<zx::SyntheticInstant>> for RtcOptions {
493    fn from(value: Option<zx::SyntheticInstant>) -> Self {
494        value.map(|t| t.into()).unwrap_or(Self::None)
495    }
496}
497
/// Sets up the RTC serving.
///
/// Adds an eagerly started local component named `fake_rtc` to the realm, and routes
/// the `/dev/class/rtc` directory it serves to Timekeeper under the capability name
/// `dev-rtc`.
///
/// Args:
/// - `rtc_options`: options for RTC setup.
/// - `builder`: the `RealmBuilder` that will construct the realm.
/// - `timekeeper`: the Timekeeper component instance.
///
/// Returns:
/// - `RtcUpdates`: A vector of RTC updates received from a fake RTC. If the
///   client serves the RTC directory, then the return value is useless.
///
/// Panics if adding the component or the route fails.
async fn setup_rtc(
    rtc_options: RtcOptions,
    builder: &RealmBuilder,
    timekeeper: &ChildRef,
) -> RtcUpdates {
    // Only the `InitialRtcTime` arm below ever records into this list.
    let rtc_updates = RtcUpdates(Arc::new(Mutex::new(vec![])));

    // In every case the tree is rooted one level above "class": it is mounted as "dev"
    // further down, which yields the full `/dev/class/rtc` path.
    let rtc_dir = match rtc_options {
        RtcOptions::InitialRtcTime(initial_time) => {
            log::debug!("using fake /dev/class/rtc/000");
            pseudo_directory! {
                "class" => pseudo_directory! {
                    "rtc" => pseudo_directory! {
                        "000" => vfs::service::host({
                            let rtc_updates = rtc_updates.clone();
                            // Each connection gets its own clone, so all connections
                            // record into the same shared list.
                            move |stream| {
                                serve_fake_rtc(initial_time, rtc_updates.clone(), stream)
                            }
                        })
                    }
                }
            }
        }
        RtcOptions::None => {
            log::debug!("using an empty /dev/class/rtc directory");
            pseudo_directory! {
                "class" => pseudo_directory! {
                    "rtc" => pseudo_directory! {
                    }
                }
            }
        }
        RtcOptions::InjectedRtc(h) => {
            log::debug!("using /dev/class/rtc provided by client");
            pseudo_directory! {
                "class" => pseudo_directory! {
                    "rtc" => vfs::remote::remote_dir(h)
                }
            }
        }
    };

    let fake_rtc_server = builder
        .add_local_child(
            "fake_rtc",
            {
                move |handles| {
                    // The closure may be invoked more than once, so each invocation works
                    // on its own clone of the directory handle.
                    let rtc_dir = rtc_dir.clone();
                    async move {
                        // NOTE(review): presumably this forces the async block to capture
                        // all of `handles` rather than only `handles.outgoing_dir` -
                        // confirm before removing.
                        let _ = &handles;
                        let mut fs = ServiceFs::new();
                        fs.add_remote("dev", vfs::directory::serve_read_only(rtc_dir));
                        fs.serve_connection(handles.outgoing_dir)
                            .expect("failed to serve fake RTC ServiceFs");
                        fs.collect::<()>().await;
                        Ok(())
                    }
                    .boxed()
                }
            },
            ChildOptions::new().eager(),
        )
        .await
        .unwrap();

    builder
        .add_route(
            Route::new()
                .capability(
                    Capability::directory("dev-rtc").path("/dev/class/rtc").rights(fio::R_STAR_DIR),
                )
                .from(&fake_rtc_server)
                .to(&*timekeeper),
        )
        .await
        .unwrap();

    rtc_updates
}
587
588async fn serve_fake_rtc(
589    initial_time: zx::SyntheticInstant,
590    rtc_updates: RtcUpdates,
591    mut stream: DeviceRequestStream,
592) {
593    while let Some(req) = stream.try_next().await.unwrap() {
594        match req {
595            DeviceRequest::Get { responder } => {
596                log::debug!("serve_fake_rtc: DeviceRequest::Get");
597                // Since timekeeper only pulls a time off of the RTC device once on startup, we
598                // don't attempt to update the sent time.
599                responder.send(Ok(&zx_time_to_rtc_time(initial_time))).unwrap();
600            }
601            DeviceRequest::Set { rtc, responder } => {
602                log::debug!("serve_fake_rtc: DeviceRequest::Set");
603                rtc_updates.0.lock().push(rtc);
604                responder.send(zx::Status::OK.into_raw()).unwrap();
605            }
606            DeviceRequest::Set2 { rtc, responder } => {
607                log::debug!("serve_fake_rtc: DeviceRequest::Set2");
608                rtc_updates.0.lock().push(rtc);
609                responder.send(Ok(())).unwrap();
610            }
611            DeviceRequest::_UnknownMethod { .. } => {}
612        }
613    }
614}
615
/// Serves one `TimeSourceControl` connection by handing every `ConnectPushSource`
/// request's server end to `puppet`.
///
/// Requests are handled concurrently with no limit on the number in flight. Returns
/// when the stream ends; panics if the stream yields an error.
async fn serve_test_control(puppet: &PushSourcePuppet, stream: TimeSourceControlRequestStream) {
    stream
        .try_for_each_concurrent(None, |req| async {
            // NOTE(review): presumably this forces the async block to capture `req` as a
            // whole rather than only the fields destructured below - confirm before
            // removing.
            let _ = &req;
            let TimeSourceControlRequest::ConnectPushSource { push_source, .. } = req;
            puppet.serve_client(push_source);
            Ok(())
        })
        .await
        .unwrap();
}
627
628async fn serve_maintenance(clock_handle: Arc<zx::Clock>, mut stream: MaintenanceRequestStream) {
629    while let Some(req) = stream.try_next().await.unwrap() {
630        let MaintenanceRequest::GetWritableUtcClock { responder } = req;
631        responder.send(clock_handle.duplicate_handle(Rights::SAME_RIGHTS).unwrap()).unwrap();
632    }
633}
634
635async fn timesource_mock_server(
636    handles: LocalComponentHandles,
637    push_source_puppet: Arc<PushSourcePuppet>,
638) -> Result<(), anyhow::Error> {
639    let mut fs = ServiceFs::new();
640    let mut tasks = vec![];
641
642    fs.dir("svc").add_fidl_service(move |stream: TimeSourceControlRequestStream| {
643        let puppet_clone = Arc::clone(&push_source_puppet);
644
645        tasks.push(fasync::Task::local(async move {
646            serve_test_control(&*puppet_clone, stream).await;
647        }));
648    });
649
650    fs.serve_connection(handles.outgoing_dir)?;
651    fs.collect::<()>().await;
652
653    Ok(())
654}
655
656async fn maintenance_mock_server(
657    handles: LocalComponentHandles,
658    clock: Arc<zx::Clock>,
659) -> Result<(), anyhow::Error> {
660    let mut fs = ServiceFs::new();
661    let mut tasks = vec![];
662
663    fs.dir("svc").add_fidl_service(move |stream: MaintenanceRequestStream| {
664        let clock_clone = Arc::clone(&clock);
665
666        tasks.push(fasync::Task::local(async move {
667            serve_maintenance(clock_clone, stream).await;
668        }));
669    });
670
671    fs.serve_connection(handles.outgoing_dir)?;
672    fs.collect::<()>().await;
673
674    Ok(())
675}
676
677fn from_rfc2822(date: &str) -> zx::SyntheticInstant {
678    zx::SyntheticInstant::from_nanos(
679        chrono::DateTime::parse_from_rfc2822(date).unwrap().timestamp_nanos_opt().unwrap(),
680    )
681}
682
/// The backstop time used for clocks created by `new_clock` and `new_nonshareable_clock`.
pub static BACKSTOP_TIME: LazyLock<zx::SyntheticInstant> =
    LazyLock::new(|| from_rfc2822("Sun, 20 Sep 2020 01:01:01 GMT"));
/// An RTC reading that lies after `BACKSTOP_TIME` (by a little over an hour).
pub static VALID_RTC_TIME: LazyLock<zx::SyntheticInstant> =
    LazyLock::new(|| from_rfc2822("Sun, 20 Sep 2020 02:02:02 GMT"));
/// A time that lies before `BACKSTOP_TIME`.
pub static BEFORE_BACKSTOP_TIME: LazyLock<zx::SyntheticInstant> =
    LazyLock::new(|| from_rfc2822("Fri, 06 Mar 2020 04:04:04 GMT"));
/// A time that lies after both `BACKSTOP_TIME` and `VALID_RTC_TIME`.
pub static VALID_TIME: LazyLock<zx::SyntheticInstant> =
    LazyLock::new(|| from_rfc2822("Tue, 29 Sep 2020 02:19:01 GMT"));
/// A second time after the backstop, later than `VALID_TIME`.
pub static VALID_TIME_2: LazyLock<zx::SyntheticInstant> =
    LazyLock::new(|| from_rfc2822("Wed, 30 Sep 2020 14:59:59 GMT"));

/// Time between each reported sample.
pub const BETWEEN_SAMPLES: zx::BootDuration = zx::BootDuration::from_seconds(5);

/// The standard deviation to report on valid time samples.
pub const STD_DEV: zx::BootDuration = zx::BootDuration::from_millis(50);
699
700/// Create a new clock with backstop time set to `BACKSTOP_TIME`.
701// TODO: b/306024715 - To be removed once all tests are migrated to TTRF.
702pub fn new_clock() -> Arc<zx::SyntheticClock> {
703    Arc::new(new_nonshareable_clock())
704}
705
706/// Create a new clock with backstop time set to `BACKSTOP_TIME`.
707pub fn new_nonshareable_clock() -> zx::SyntheticClock {
708    zx::SyntheticClock::create(zx::ClockOpts::MAPPABLE, Some(*BACKSTOP_TIME)).unwrap()
709}
710
711fn zx_time_to_rtc_time(zx_time: zx::SyntheticInstant) -> fidl_fuchsia_hardware_rtc::Time {
712    let date = chrono::Utc.timestamp_nanos(zx_time.into_nanos());
713    fidl_fuchsia_hardware_rtc::Time {
714        seconds: date.second() as u8,
715        minutes: date.minute() as u8,
716        hours: date.hour() as u8,
717        day: date.day() as u8,
718        month: date.month() as u8,
719        year: date.year() as u16,
720    }
721}
722
723pub fn rtc_time_to_zx_time(rtc_time: fidl_fuchsia_hardware_rtc::Time) -> zx::SyntheticInstant {
724    let date = chrono::Utc
725        .with_ymd_and_hms(
726            rtc_time.year as i32,
727            rtc_time.month as u32,
728            rtc_time.day as u32,
729            rtc_time.hours as u32,
730            rtc_time.minutes as u32,
731            rtc_time.seconds as u32,
732        )
733        .unwrap();
734    zx::SyntheticInstant::from_nanos(date.timestamp_nanos_opt().unwrap())
735}
736
737/// Create a stream of MetricEvents from a proxy.
738pub fn create_cobalt_event_stream(
739    proxy: Arc<MetricEventLoggerQuerierProxy>,
740    log_method: LogMethod,
741) -> std::pin::Pin<Box<dyn Stream<Item = MetricEvent>>> {
742    async_utils::hanging_get::client::HangingGetStream::new(proxy, move |p| {
743        p.watch_logs(PROJECT_ID, log_method)
744    })
745    .map(|res| futures::stream::iter(res.expect("there should be a valid result here").0))
746    .flatten()
747    .boxed()
748}
749
/// Repeatedly evaluates `condition` until it returns `Some(v)`. Returns `v`.
///
/// `condition` must be a callable returning an `Option`. The macro expands to a call
/// to `poll_until_some_impl` (which yields a future that must be awaited), passing
/// along the source location of the macro invocation for logging.
#[macro_export]
macro_rules! poll_until_some {
    ($condition:expr) => {
        $crate::poll_until_some_impl(
            $condition,
            &$crate::SourceLocation::new(file!(), line!(), column!()),
        )
    };
}
760
/// Repeatedly awaits an async `condition` until it resolves to `Some(v)`. Evaluates to `v`.
/// Use if your condition is an async fn.
///
/// `condition` is an expression that is re-evaluated and awaited on every iteration,
/// with a pause of `RETRY_WAIT_DURATION` between attempts. The expansion refers to
/// `fasync` and `log` by bare name, so both must be in scope at the call site, and the
/// macro must be used inside an async context.
#[macro_export]
macro_rules! poll_until_some_async {
    ($condition:expr) => {{
        let loc = $crate::SourceLocation::new(file!(), line!(), column!());
        log::info!("=> poll_until_some_async() for {}", &loc);
        let value = loop {
            if let Some(value) = $condition.await {
                break value;
            }
            fasync::Timer::new(fasync::MonotonicInstant::after($crate::RETRY_WAIT_DURATION)).await;
        };
        log::info!("=> poll_until_some_async() done for {}", &loc);
        value
    }};
}
780
/// Repeatedly evaluates `condition` to create a `Future`, and then awaits the `Future`.
/// Returns `()` when the (most recently created) `Future` resolves to `true`.
///
/// `condition` must be a callable returning a future of `bool`. The macro expands to a
/// call to `poll_until_async_impl` (which yields a future that must be awaited),
/// passing along the source location of the macro invocation for logging.
#[macro_export]
macro_rules! poll_until_async {
    ($condition:expr) => {
        $crate::poll_until_async_impl(
            $condition,
            &$crate::SourceLocation::new(file!(), line!(), column!()),
        )
    };
}
792
/// A reimplementation of `poll_until_async!`, which deals better with borrows.
///
/// `condition` is an expression producing a future of `bool`; it is re-evaluated and
/// awaited inline on every iteration, with a pause of `RETRY_WAIT_DURATION` between
/// attempts, until it resolves to `true`. The macro evaluates to `true`.
///
/// The expansion refers to `fasync` and `log` by bare name, so both must be in scope
/// at the call site, and the macro must be used inside an async context.
#[macro_export]
macro_rules! poll_until_async_2 {
    ($condition:expr) => {{
        let loc = $crate::SourceLocation::new(file!(), line!(), column!());
        // Log under this macro's own name so that the start line can be paired with the
        // "done" line below, and is not mistaken for `poll_until_async!`.
        log::info!("=> poll_until_async_2() for {}", &loc);
        loop {
            if $condition.await {
                break;
            }
            fasync::Timer::new(fasync::MonotonicInstant::after($crate::RETRY_WAIT_DURATION)).await;
        }
        log::info!("=> poll_until_async_2() done for {}", &loc);
        // The loop only exits once the condition has resolved to `true`.
        true
    }};
}
811
/// Repeatedly evaluates `condition` until it returns `true`. Returns `()`.
///
/// `condition` must be a callable returning `bool`. The macro expands to a call to
/// `poll_until_impl` (which yields a future that must be awaited), passing along the
/// source location of the macro invocation for logging.
#[macro_export]
macro_rules! poll_until {
    ($condition:expr) => {
        $crate::poll_until_impl(
            $condition,
            &$crate::SourceLocation::new(file!(), line!(), column!()),
        )
    };
}
822
/// Wait duration for polling: how long the `poll_until*` helpers and macros pause
/// between two consecutive evaluations of their condition.
pub const RETRY_WAIT_DURATION: zx::MonotonicDuration = zx::MonotonicDuration::from_millis(10);
825
/// A position in source code (file, line and column), used by the polling helpers
/// to say in their log lines where a poll was started from.
pub struct SourceLocation {
    file: &'static str,
    line: u32,
    column: u32,
}

impl std::fmt::Display for SourceLocation {
    /// Formats the location as `(file: F, line: L, column: C)`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { file, line, column } = self;
        write!(f, "(file: {}, line: {}, column: {})", file, line, column)
    }
}

impl SourceLocation {
    /// Creates a location from its parts; typically fed from `file!()`, `line!()`
    /// and `column!()`.
    pub fn new(file: &'static str, line: u32, column: u32) -> Self {
        SourceLocation { file, line, column }
    }
}
843
844/// Use `poll_until_some!()` instead.
845pub async fn poll_until_some_impl<T, F>(poll_fn: F, loc: &SourceLocation) -> T
846where
847    F: Fn() -> Option<T>,
848{
849    log::info!("=> poll_until_some() for {}", loc);
850    loop {
851        match poll_fn() {
852            Some(value) => {
853                log::info!("<= poll_until_some() for {}", loc);
854                return value;
855            }
856            None => fasync::Timer::new(fasync::MonotonicInstant::after(RETRY_WAIT_DURATION)).await,
857        }
858    }
859}
860
861/// Use `poll_until_async!()` instead.
862pub async fn poll_until_async_impl<F, Fut>(poll_fn: F, loc: &SourceLocation)
863where
864    F: Fn() -> Fut,
865    Fut: Future<Output = bool>,
866{
867    log::info!("=> poll_until_async() for {}", loc);
868    while !poll_fn().await {
869        fasync::Timer::new(fasync::MonotonicInstant::after(RETRY_WAIT_DURATION)).await
870    }
871    log::info!("<= poll_until_async() for {}", loc);
872}
873
874/// Use `poll_until!()` instead.
875pub async fn poll_until_impl<F: Fn() -> bool>(poll_fn: F, loc: &SourceLocation) {
876    log::info!("=> poll_until() for {}", loc);
877    while !poll_fn() {
878        fasync::Timer::new(fasync::MonotonicInstant::after(RETRY_WAIT_DURATION)).await
879    }
880    log::info!("<= poll_until() for {}", loc);
881}