1use anyhow::Context;
6use chrono::{Datelike, TimeZone, Timelike};
7use fidl::endpoints::ServerEnd;
8use fidl_fuchsia_hardware_rtc::{DeviceRequest, DeviceRequestStream};
9use fidl_fuchsia_metrics::MetricEvent;
10use fidl_fuchsia_metrics_test::{LogMethod, MetricEventLoggerQuerierProxy};
11use fidl_fuchsia_testing::{FakeClockControlProxy, FakeClockProxy};
12use fidl_fuchsia_time::{MaintenanceRequest, MaintenanceRequestStream};
13use fidl_fuchsia_time_external::{PushSourceMarker, Status, TimeSample};
14use fidl_test_time::{TimeSourceControlRequest, TimeSourceControlRequestStream};
15use fuchsia_component::server::ServiceFs;
16use fuchsia_component_test::{
17 Capability, ChildOptions, ChildRef, LocalComponentHandles, RealmBuilder, RealmInstance, Ref,
18 Route,
19};
20use fuchsia_sync::Mutex;
21use futures::channel::mpsc::Sender;
22use futures::stream::{Stream, StreamExt, TryStreamExt};
23use futures::{Future, FutureExt, SinkExt};
24use push_source::{PushSource, TestUpdateAlgorithm, Update};
25use std::ops::Deref;
26use std::sync::{Arc, LazyLock};
27use time_metrics_registry::PROJECT_ID;
28use vfs::pseudo_directory;
29use zx::{self as zx, HandleBased, Rights};
30use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
31
32const TIMEKEEPER_URL: &str = "#meta/timekeeper_for_integration.cm";
34const TIMEKEEPER_FAKE_TIME_URL: &str = "#meta/timekeeper_with_fake_time.cm";
36const COBALT_URL: &str = "#meta/fake_cobalt.cm";
38const FAKE_CLOCK_URL: &str = "#meta/fake_clock.cm";
40
41pub struct NestedTimekeeper {
44 _realm_instance: RealmInstance,
45}
46
47impl Into<RealmInstance> for NestedTimekeeper {
48 fn into(self) -> RealmInstance {
50 self._realm_instance
51 }
52}
53
54impl NestedTimekeeper {
55 pub async fn new(
68 clock: Arc<zx::Clock>,
69 rtc_options: RtcOptions,
70 use_fake_clock: bool,
71 ) -> (
72 Self,
73 Arc<PushSourcePuppet>,
74 RtcUpdates,
75 MetricEventLoggerQuerierProxy,
76 Option<FakeClockController>,
77 ) {
78 let push_source_puppet = Arc::new(PushSourcePuppet::new());
79
80 let builder = RealmBuilder::new().await.unwrap();
81 let fake_cobalt =
82 builder.add_child("fake_cobalt", COBALT_URL, ChildOptions::new()).await.unwrap();
83
84 let timekeeper_url = if use_fake_clock { TIMEKEEPER_FAKE_TIME_URL } else { TIMEKEEPER_URL };
85 log::trace!("using timekeeper_url: {}", timekeeper_url);
86 let timekeeper = builder
87 .add_child("timekeeper_test", timekeeper_url, ChildOptions::new().eager())
88 .await
89 .with_context(|| format!("while starting up timekeeper_test from: {timekeeper_url}"))
90 .unwrap();
91
92 let timesource_server = builder
93 .add_local_child(
94 "timesource_mock",
95 {
96 let push_source_puppet = Arc::clone(&push_source_puppet);
97 move |handles: LocalComponentHandles| {
98 Box::pin(timesource_mock_server(handles, Arc::clone(&push_source_puppet)))
99 }
100 },
101 ChildOptions::new(),
102 )
103 .await
104 .context("while starting up timesource_mock")
105 .unwrap();
106
107 let maintenance_server = builder
108 .add_local_child(
109 "maintenance_mock",
110 move |handles: LocalComponentHandles| {
111 Box::pin(maintenance_mock_server(handles, Arc::clone(&clock)))
112 },
113 ChildOptions::new(),
114 )
115 .await
116 .context("while starting up maintenance_mock")
117 .unwrap();
118
119 if use_fake_clock {
121 let fake_clock =
122 builder.add_child("fake_clock", FAKE_CLOCK_URL, ChildOptions::new()).await.unwrap();
123
124 builder
125 .add_route(
126 Route::new()
127 .capability(Capability::protocol_by_name(
128 "fuchsia.testing.FakeClockControl",
129 ))
130 .from(&fake_clock)
131 .to(Ref::parent()),
132 )
133 .await
134 .context("while setting up FakeClockControl")
135 .unwrap();
136
137 builder
138 .add_route(
139 Route::new()
140 .capability(Capability::protocol_by_name("fuchsia.testing.FakeClock"))
141 .from(&fake_clock)
142 .to(Ref::parent())
143 .to(&timekeeper),
144 )
145 .await
146 .context("while setting up FakeClock")
147 .unwrap();
148
149 builder
150 .add_route(
151 Route::new()
152 .capability(Capability::protocol_by_name("fuchsia.logger.LogSink"))
153 .from(Ref::parent())
154 .to(&fake_clock),
155 )
156 .await
157 .context("while setting up LogSink")
158 .unwrap();
159 };
160
161 builder
162 .add_route(
163 Route::new()
164 .capability(Capability::protocol_by_name("fuchsia.time.Maintenance"))
165 .from(&maintenance_server)
166 .to(&timekeeper),
167 )
168 .await
169 .context("while setting up Maintenance")
170 .unwrap();
171
172 builder
173 .add_route(
174 Route::new()
175 .capability(Capability::protocol_by_name("test.time.TimeSourceControl"))
176 .from(×ource_server)
177 .to(&timekeeper),
178 )
179 .await
180 .unwrap();
181
182 builder
183 .add_route(
184 Route::new()
185 .capability(Capability::protocol_by_name(
186 "fuchsia.metrics.test.MetricEventLoggerQuerier",
187 ))
188 .from(&fake_cobalt)
189 .to(Ref::parent()),
190 )
191 .await
192 .unwrap();
193
194 builder
195 .add_route(
196 Route::new()
197 .capability(Capability::protocol_by_name(
198 "fuchsia.metrics.MetricEventLoggerFactory",
199 ))
200 .from(&fake_cobalt)
201 .to(&timekeeper),
202 )
203 .await
204 .unwrap();
205
206 builder
207 .add_route(
208 Route::new()
209 .capability(Capability::protocol_by_name("fuchsia.logger.LogSink"))
210 .from(Ref::parent())
211 .to(&fake_cobalt)
212 .to(&timekeeper)
213 .to(×ource_server)
214 .to(&maintenance_server),
215 )
216 .await
217 .unwrap();
218
219 builder
220 .add_route(
221 Route::new()
222 .capability(Capability::configuration("fuchsia.time.config.WritableUTCTime"))
223 .from(Ref::parent())
224 .to(&timekeeper),
225 )
226 .await
227 .unwrap();
228
229 let rtc_updates = setup_rtc(rtc_options, &builder, &timekeeper).await;
230 let realm_instance = builder.build().await.unwrap();
231
232 let fake_clock_control = if use_fake_clock {
233 let control_proxy = realm_instance.root.connect_to_protocol_at_exposed_dir().unwrap();
234 let clock_proxy = realm_instance.root.connect_to_protocol_at_exposed_dir().unwrap();
235 Some(FakeClockController { control_proxy, clock_proxy })
236 } else {
237 None
238 };
239
240 let cobalt_querier = realm_instance
241 .root
242 .connect_to_protocol_at_exposed_dir()
243 .expect("the connection succeeds");
244
245 let nested_timekeeper = Self { _realm_instance: realm_instance };
246
247 (nested_timekeeper, push_source_puppet, rtc_updates, cobalt_querier, fake_clock_control)
248 }
249}
250
251pub struct RemotePushSourcePuppet {
252 proxy: fidl_test_time_realm::PushSourcePuppetProxy,
253}
254
255impl RemotePushSourcePuppet {
256 pub fn new(proxy: fidl_test_time_realm::PushSourcePuppetProxy) -> Arc<Self> {
258 Arc::new(Self { proxy })
259 }
260
261 pub async fn set_sample(&self, sample: TimeSample) {
263 self.proxy.set_sample(&sample).await.expect("original API was infallible");
264 }
265
266 pub async fn set_status(&self, status: Status) {
268 self.proxy.set_status(status).await.expect("original API was infallible");
269 }
270
271 pub async fn simulate_crash(&self) {
273 self.proxy.crash().await.expect("original local API was infallible");
274 }
275
276 pub async fn lifetime_served_connections(&self) -> u32 {
279 self.proxy.get_lifetime_served_connections().await.expect("original API was infallible")
280 }
281}
282
283pub struct PushSourcePuppet {
285 inner: Mutex<PushSourcePuppetInner>,
288 cumulative_clients: Mutex<u32>,
290}
291
292impl PushSourcePuppet {
293 fn new() -> Self {
295 Self { inner: Mutex::new(PushSourcePuppetInner::new()), cumulative_clients: Mutex::new(0) }
296 }
297
298 fn serve_client(&self, server_end: ServerEnd<PushSourceMarker>) {
300 log::debug!("serve_client entry");
301 let mut inner = self.inner.lock();
302 if inner.served_client() {
306 *inner = PushSourcePuppetInner::new();
307 }
308 inner.serve_client(server_end);
309 *self.cumulative_clients.lock() += 1;
310 }
311
312 pub async fn set_sample(&self, sample: TimeSample) {
314 let mut sink = self.inner.lock().get_sink();
315 sink.send(sample.into()).await.unwrap();
316 }
317
318 pub async fn set_status(&self, status: Status) {
320 let mut sink = self.inner.lock().get_sink();
321 sink.send(status.into()).await.unwrap();
322 }
323
324 pub fn simulate_crash(&self) {
326 *self.inner.lock() = PushSourcePuppetInner::new();
327 }
329
330 pub fn lifetime_served_connections(&self) -> u32 {
333 *self.cumulative_clients.lock()
334 }
335}
336
337struct PushSourcePuppetInner {
340 push_source: Arc<PushSource<TestUpdateAlgorithm>>,
341 tasks: Vec<fasync::Task<()>>,
343 update_sink: Sender<Update>,
345}
346
347impl PushSourcePuppetInner {
348 fn new() -> Self {
349 let (update_algorithm, update_sink) = TestUpdateAlgorithm::new();
350 let push_source = Arc::new(PushSource::new(update_algorithm, Status::Ok).unwrap());
351 let push_source_clone = Arc::clone(&push_source);
352 let tasks = vec![fasync::Task::spawn(async move {
353 push_source_clone.poll_updates().await.unwrap();
354 })];
355 Self { push_source, tasks, update_sink }
356 }
357
358 fn served_client(&self) -> bool {
360 self.tasks.len() > 1
361 }
362
363 fn serve_client(&mut self, server_end: ServerEnd<PushSourceMarker>) {
365 let push_source_clone = Arc::clone(&self.push_source);
366 self.tasks.push(fasync::Task::spawn(async move {
367 push_source_clone.handle_requests_for_stream(server_end.into_stream()).await.unwrap();
368 }));
369 }
370
371 fn get_sink(&self) -> Sender<Update> {
376 self.update_sink.clone()
377 }
378}
379
380#[derive(Clone, Debug)]
382pub struct RtcUpdates(Arc<Mutex<Vec<fidl_fuchsia_hardware_rtc::Time>>>);
383
384impl RtcUpdates {
385 pub fn to_vec(&self) -> Vec<fidl_fuchsia_hardware_rtc::Time> {
387 self.0.lock().clone()
388 }
389}
390
391pub struct RemoteRtcUpdates {
394 proxy: fidl_test_time_realm::RtcUpdatesProxy,
395}
396
397impl RemoteRtcUpdates {
398 pub async fn to_vec(&self) -> Vec<fidl_fuchsia_hardware_rtc::Time> {
399 self.proxy
400 .get(fidl_test_time_realm::GetRequest::default())
401 .await
402 .expect("no errors or overflows") .unwrap()
404 .0
405 }
406 pub fn new(proxy: fidl_test_time_realm::RtcUpdatesProxy) -> Self {
407 RemoteRtcUpdates { proxy }
408 }
409}
410
411pub struct FakeClockController {
414 control_proxy: FakeClockControlProxy,
415 clock_proxy: FakeClockProxy,
416}
417
418impl Deref for FakeClockController {
419 type Target = FakeClockControlProxy;
420
421 fn deref(&self) -> &Self::Target {
422 &self.control_proxy
423 }
424}
425
426impl FakeClockController {
427 pub fn new(control_proxy: FakeClockControlProxy, clock_proxy: FakeClockProxy) -> Self {
429 FakeClockController { control_proxy, clock_proxy }
430 }
431
432 pub fn into_components(self) -> (FakeClockControlProxy, FakeClockProxy) {
434 (self.control_proxy, self.clock_proxy)
435 }
436
437 pub async fn get_monotonic(&self) -> Result<i64, fidl::Error> {
438 self.clock_proxy.get().await.map(|(_boot, mono)| mono.into_nanos())
439 }
440
441 pub async fn get_reference(&self) -> Result<zx::BootInstant, fidl::Error> {
443 self.get_monotonic().await.map(|v| zx::BootInstant::from_nanos(v))
444 }
445}
446
447pub enum RtcOptions {
449 None,
452 InitialRtcTime(zx::SyntheticInstant),
454 InjectedRtc(fio::DirectoryProxy),
470}
471
472impl From<fidl_test_time_realm::RtcOptions> for RtcOptions {
473 fn from(value: fidl_test_time_realm::RtcOptions) -> Self {
474 match value {
475 fidl_test_time_realm::RtcOptions::DevClassRtc(h) => {
476 RtcOptions::InjectedRtc(h.into_proxy())
477 }
478 fidl_test_time_realm::RtcOptions::InitialRtcTime(t) => {
479 RtcOptions::InitialRtcTime(zx::SyntheticInstant::from_nanos(t))
480 }
481 _ => unimplemented!(),
482 }
483 }
484}
485
486impl From<zx::SyntheticInstant> for RtcOptions {
487 fn from(value: zx::SyntheticInstant) -> Self {
488 RtcOptions::InitialRtcTime(value)
489 }
490}
491
492impl From<Option<zx::SyntheticInstant>> for RtcOptions {
493 fn from(value: Option<zx::SyntheticInstant>) -> Self {
494 value.map(|t| t.into()).unwrap_or(Self::None)
495 }
496}
497
498async fn setup_rtc(
509 rtc_options: RtcOptions,
510 builder: &RealmBuilder,
511 timekeeper: &ChildRef,
512) -> RtcUpdates {
513 let rtc_updates = RtcUpdates(Arc::new(Mutex::new(vec![])));
514
515 let rtc_dir = match rtc_options {
516 RtcOptions::InitialRtcTime(initial_time) => {
517 log::debug!("using fake /dev/class/rtc/000");
518 pseudo_directory! {
519 "class" => pseudo_directory! {
520 "rtc" => pseudo_directory! {
521 "000" => vfs::service::host({
522 let rtc_updates = rtc_updates.clone();
523 move |stream| {
524 serve_fake_rtc(initial_time, rtc_updates.clone(), stream)
525 }
526 })
527 }
528 }
529 }
530 }
531 RtcOptions::None => {
532 log::debug!("using an empty /dev/class/rtc directory");
533 pseudo_directory! {
534 "class" => pseudo_directory! {
535 "rtc" => pseudo_directory! {
536 }
537 }
538 }
539 }
540 RtcOptions::InjectedRtc(h) => {
541 log::debug!("using /dev/class/rtc provided by client");
542 pseudo_directory! {
543 "class" => pseudo_directory! {
544 "rtc" => vfs::remote::remote_dir(h)
545 }
546 }
547 }
548 };
549
550 let fake_rtc_server = builder
551 .add_local_child(
552 "fake_rtc",
553 {
554 move |handles| {
555 let rtc_dir = rtc_dir.clone();
556 async move {
557 let _ = &handles;
558 let mut fs = ServiceFs::new();
559 fs.add_remote("dev", vfs::directory::serve_read_only(rtc_dir));
560 fs.serve_connection(handles.outgoing_dir)
561 .expect("failed to serve fake RTC ServiceFs");
562 fs.collect::<()>().await;
563 Ok(())
564 }
565 .boxed()
566 }
567 },
568 ChildOptions::new().eager(),
569 )
570 .await
571 .unwrap();
572
573 builder
574 .add_route(
575 Route::new()
576 .capability(
577 Capability::directory("dev-rtc").path("/dev/class/rtc").rights(fio::R_STAR_DIR),
578 )
579 .from(&fake_rtc_server)
580 .to(&*timekeeper),
581 )
582 .await
583 .unwrap();
584
585 rtc_updates
586}
587
588async fn serve_fake_rtc(
589 initial_time: zx::SyntheticInstant,
590 rtc_updates: RtcUpdates,
591 mut stream: DeviceRequestStream,
592) {
593 while let Some(req) = stream.try_next().await.unwrap() {
594 match req {
595 DeviceRequest::Get { responder } => {
596 log::debug!("serve_fake_rtc: DeviceRequest::Get");
597 responder.send(Ok(&zx_time_to_rtc_time(initial_time))).unwrap();
600 }
601 DeviceRequest::Set { rtc, responder } => {
602 log::debug!("serve_fake_rtc: DeviceRequest::Set");
603 rtc_updates.0.lock().push(rtc);
604 responder.send(zx::Status::OK.into_raw()).unwrap();
605 }
606 DeviceRequest::Set2 { rtc, responder } => {
607 log::debug!("serve_fake_rtc: DeviceRequest::Set2");
608 rtc_updates.0.lock().push(rtc);
609 responder.send(Ok(())).unwrap();
610 }
611 DeviceRequest::_UnknownMethod { .. } => {}
612 }
613 }
614}
615
616async fn serve_test_control(puppet: &PushSourcePuppet, stream: TimeSourceControlRequestStream) {
617 stream
618 .try_for_each_concurrent(None, |req| async {
619 let _ = &req;
620 let TimeSourceControlRequest::ConnectPushSource { push_source, .. } = req;
621 puppet.serve_client(push_source);
622 Ok(())
623 })
624 .await
625 .unwrap();
626}
627
628async fn serve_maintenance(clock_handle: Arc<zx::Clock>, mut stream: MaintenanceRequestStream) {
629 while let Some(req) = stream.try_next().await.unwrap() {
630 let MaintenanceRequest::GetWritableUtcClock { responder } = req;
631 responder.send(clock_handle.duplicate_handle(Rights::SAME_RIGHTS).unwrap()).unwrap();
632 }
633}
634
635async fn timesource_mock_server(
636 handles: LocalComponentHandles,
637 push_source_puppet: Arc<PushSourcePuppet>,
638) -> Result<(), anyhow::Error> {
639 let mut fs = ServiceFs::new();
640 let mut tasks = vec![];
641
642 fs.dir("svc").add_fidl_service(move |stream: TimeSourceControlRequestStream| {
643 let puppet_clone = Arc::clone(&push_source_puppet);
644
645 tasks.push(fasync::Task::local(async move {
646 serve_test_control(&*puppet_clone, stream).await;
647 }));
648 });
649
650 fs.serve_connection(handles.outgoing_dir)?;
651 fs.collect::<()>().await;
652
653 Ok(())
654}
655
656async fn maintenance_mock_server(
657 handles: LocalComponentHandles,
658 clock: Arc<zx::Clock>,
659) -> Result<(), anyhow::Error> {
660 let mut fs = ServiceFs::new();
661 let mut tasks = vec![];
662
663 fs.dir("svc").add_fidl_service(move |stream: MaintenanceRequestStream| {
664 let clock_clone = Arc::clone(&clock);
665
666 tasks.push(fasync::Task::local(async move {
667 serve_maintenance(clock_clone, stream).await;
668 }));
669 });
670
671 fs.serve_connection(handles.outgoing_dir)?;
672 fs.collect::<()>().await;
673
674 Ok(())
675}
676
677fn from_rfc2822(date: &str) -> zx::SyntheticInstant {
678 zx::SyntheticInstant::from_nanos(
679 chrono::DateTime::parse_from_rfc2822(date).unwrap().timestamp_nanos_opt().unwrap(),
680 )
681}
682
683pub static BACKSTOP_TIME: LazyLock<zx::SyntheticInstant> =
684 LazyLock::new(|| from_rfc2822("Sun, 20 Sep 2020 01:01:01 GMT"));
685pub static VALID_RTC_TIME: LazyLock<zx::SyntheticInstant> =
686 LazyLock::new(|| from_rfc2822("Sun, 20 Sep 2020 02:02:02 GMT"));
687pub static BEFORE_BACKSTOP_TIME: LazyLock<zx::SyntheticInstant> =
688 LazyLock::new(|| from_rfc2822("Fri, 06 Mar 2020 04:04:04 GMT"));
689pub static VALID_TIME: LazyLock<zx::SyntheticInstant> =
690 LazyLock::new(|| from_rfc2822("Tue, 29 Sep 2020 02:19:01 GMT"));
691pub static VALID_TIME_2: LazyLock<zx::SyntheticInstant> =
692 LazyLock::new(|| from_rfc2822("Wed, 30 Sep 2020 14:59:59 GMT"));
693
694pub const BETWEEN_SAMPLES: zx::BootDuration = zx::BootDuration::from_seconds(5);
696
697pub const STD_DEV: zx::BootDuration = zx::BootDuration::from_millis(50);
699
700pub fn new_clock() -> Arc<zx::SyntheticClock> {
703 Arc::new(new_nonshareable_clock())
704}
705
706pub fn new_nonshareable_clock() -> zx::SyntheticClock {
708 zx::SyntheticClock::create(zx::ClockOpts::MAPPABLE, Some(*BACKSTOP_TIME)).unwrap()
709}
710
711fn zx_time_to_rtc_time(zx_time: zx::SyntheticInstant) -> fidl_fuchsia_hardware_rtc::Time {
712 let date = chrono::Utc.timestamp_nanos(zx_time.into_nanos());
713 fidl_fuchsia_hardware_rtc::Time {
714 seconds: date.second() as u8,
715 minutes: date.minute() as u8,
716 hours: date.hour() as u8,
717 day: date.day() as u8,
718 month: date.month() as u8,
719 year: date.year() as u16,
720 }
721}
722
723pub fn rtc_time_to_zx_time(rtc_time: fidl_fuchsia_hardware_rtc::Time) -> zx::SyntheticInstant {
724 let date = chrono::Utc
725 .with_ymd_and_hms(
726 rtc_time.year as i32,
727 rtc_time.month as u32,
728 rtc_time.day as u32,
729 rtc_time.hours as u32,
730 rtc_time.minutes as u32,
731 rtc_time.seconds as u32,
732 )
733 .unwrap();
734 zx::SyntheticInstant::from_nanos(date.timestamp_nanos_opt().unwrap())
735}
736
737pub fn create_cobalt_event_stream(
739 proxy: Arc<MetricEventLoggerQuerierProxy>,
740 log_method: LogMethod,
741) -> std::pin::Pin<Box<dyn Stream<Item = MetricEvent>>> {
742 async_utils::hanging_get::client::HangingGetStream::new(proxy, move |p| {
743 p.watch_logs(PROJECT_ID, log_method)
744 })
745 .map(|res| futures::stream::iter(res.expect("there should be a valid result here").0))
746 .flatten()
747 .boxed()
748}
749
750#[macro_export]
752macro_rules! poll_until_some {
753 ($condition:expr) => {
754 $crate::poll_until_some_impl(
755 $condition,
756 &$crate::SourceLocation::new(file!(), line!(), column!()),
757 )
758 };
759}
760
761#[macro_export]
764macro_rules! poll_until_some_async {
765 ($condition:expr) => {{
766 let loc = $crate::SourceLocation::new(file!(), line!(), column!());
767 log::info!("=> poll_until_some_async() for {}", &loc);
768 let mut result = None;
769 loop {
770 result = $condition.await;
771 if result.is_some() {
772 break;
773 }
774 fasync::Timer::new(fasync::MonotonicInstant::after($crate::RETRY_WAIT_DURATION)).await;
775 }
776 log::info!("=> poll_until_some_async() done for {}", &loc);
777 result.expect("we loop around while result is None")
778 }};
779}
780
781#[macro_export]
784macro_rules! poll_until_async {
785 ($condition:expr) => {
786 $crate::poll_until_async_impl(
787 $condition,
788 &$crate::SourceLocation::new(file!(), line!(), column!()),
789 )
790 };
791}
792
793#[macro_export]
795macro_rules! poll_until_async_2 {
796 ($condition:expr) => {{
797 let loc = $crate::SourceLocation::new(file!(), line!(), column!());
798 log::info!("=> poll_until_async() for {}", &loc);
799 let mut result = true;
800 loop {
801 result = $condition.await;
802 if result {
803 break;
804 }
805 fasync::Timer::new(fasync::MonotonicInstant::after($crate::RETRY_WAIT_DURATION)).await;
806 }
807 log::info!("=> poll_until_async_2() done for {}", &loc);
808 result
809 }};
810}
811
812#[macro_export]
814macro_rules! poll_until {
815 ($condition:expr) => {
816 $crate::poll_until_impl(
817 $condition,
818 &$crate::SourceLocation::new(file!(), line!(), column!()),
819 )
820 };
821}
822
823pub const RETRY_WAIT_DURATION: zx::MonotonicDuration = zx::MonotonicDuration::from_millis(10);
825
826pub struct SourceLocation {
827 file: &'static str,
828 line: u32,
829 column: u32,
830}
831
832impl std::fmt::Display for SourceLocation {
833 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
834 write!(f, "(file: {}, line: {}, column: {})", self.file, self.line, self.column)
835 }
836}
837
838impl SourceLocation {
839 pub fn new(file: &'static str, line: u32, column: u32) -> Self {
840 Self { file, line, column }
841 }
842}
843
844pub async fn poll_until_some_impl<T, F>(poll_fn: F, loc: &SourceLocation) -> T
846where
847 F: Fn() -> Option<T>,
848{
849 log::info!("=> poll_until_some() for {}", loc);
850 loop {
851 match poll_fn() {
852 Some(value) => {
853 log::info!("<= poll_until_some() for {}", loc);
854 return value;
855 }
856 None => fasync::Timer::new(fasync::MonotonicInstant::after(RETRY_WAIT_DURATION)).await,
857 }
858 }
859}
860
861pub async fn poll_until_async_impl<F, Fut>(poll_fn: F, loc: &SourceLocation)
863where
864 F: Fn() -> Fut,
865 Fut: Future<Output = bool>,
866{
867 log::info!("=> poll_until_async() for {}", loc);
868 while !poll_fn().await {
869 fasync::Timer::new(fasync::MonotonicInstant::after(RETRY_WAIT_DURATION)).await
870 }
871 log::info!("<= poll_until_async() for {}", loc);
872}
873
874pub async fn poll_until_impl<F: Fn() -> bool>(poll_fn: F, loc: &SourceLocation) {
876 log::info!("=> poll_until() for {}", loc);
877 while !poll_fn() {
878 fasync::Timer::new(fasync::MonotonicInstant::after(RETRY_WAIT_DURATION)).await
879 }
880 log::info!("<= poll_until() for {}", loc);
881}