1use anyhow::Context;
6use chrono::{Datelike, TimeZone, Timelike};
7use fidl::endpoints::ServerEnd;
8use fidl_fuchsia_hardware_rtc::{DeviceRequest, DeviceRequestStream};
9use fidl_fuchsia_io as fio;
10use fidl_fuchsia_metrics::MetricEvent;
11use fidl_fuchsia_metrics_test::{LogMethod, MetricEventLoggerQuerierProxy};
12use fidl_fuchsia_testing::{FakeClockControlProxy, FakeClockProxy};
13use fidl_fuchsia_time::{MaintenanceRequest, MaintenanceRequestStream};
14use fidl_fuchsia_time_external::{PushSourceMarker, Status, TimeSample};
15use fidl_test_time::{TimeSourceControlRequest, TimeSourceControlRequestStream};
16use fuchsia_async as fasync;
17use fuchsia_component::server::ServiceFs;
18use fuchsia_component_test::{
19 Capability, ChildOptions, ChildRef, LocalComponentHandles, RealmBuilder, RealmInstance, Ref,
20 Route,
21};
22use fuchsia_sync::Mutex;
23use futures::channel::mpsc::Sender;
24use futures::stream::{Stream, StreamExt, TryStreamExt};
25use futures::{Future, FutureExt, SinkExt};
26use push_source::{PushSource, TestUpdateAlgorithm, Update};
27use std::ops::Deref;
28use std::sync::{Arc, LazyLock};
29use time_metrics_registry::PROJECT_ID;
30use vfs::pseudo_directory;
31use zx::{self as zx, Rights};
32
33const TIMEKEEPER_URL: &str = "#meta/timekeeper_for_integration.cm";
35const TIMEKEEPER_FAKE_TIME_URL: &str = "#meta/timekeeper_with_fake_time.cm";
37const COBALT_URL: &str = "#meta/fake_cobalt.cm";
39const FAKE_CLOCK_URL: &str = "#meta/fake_clock.cm";
41
42pub struct NestedTimekeeper {
45 _realm_instance: RealmInstance,
46}
47
48impl Into<RealmInstance> for NestedTimekeeper {
49 fn into(self) -> RealmInstance {
51 self._realm_instance
52 }
53}
54
55impl NestedTimekeeper {
56 pub async fn new(
69 clock: Arc<zx::Clock>,
70 rtc_options: RtcOptions,
71 use_fake_clock: bool,
72 ) -> (
73 Self,
74 Arc<PushSourcePuppet>,
75 RtcUpdates,
76 MetricEventLoggerQuerierProxy,
77 Option<FakeClockController>,
78 ) {
79 let push_source_puppet = Arc::new(PushSourcePuppet::new());
80
81 let builder = RealmBuilder::new().await.unwrap();
82 let fake_cobalt =
83 builder.add_child("fake_cobalt", COBALT_URL, ChildOptions::new()).await.unwrap();
84
85 let timekeeper_url = if use_fake_clock { TIMEKEEPER_FAKE_TIME_URL } else { TIMEKEEPER_URL };
86 log::trace!("using timekeeper_url: {}", timekeeper_url);
87 let timekeeper = builder
88 .add_child("timekeeper_test", timekeeper_url, ChildOptions::new().eager())
89 .await
90 .with_context(|| format!("while starting up timekeeper_test from: {timekeeper_url}"))
91 .unwrap();
92
93 let timesource_server = builder
94 .add_local_child(
95 "timesource_mock",
96 {
97 let push_source_puppet = Arc::clone(&push_source_puppet);
98 move |handles: LocalComponentHandles| {
99 Box::pin(timesource_mock_server(handles, Arc::clone(&push_source_puppet)))
100 }
101 },
102 ChildOptions::new(),
103 )
104 .await
105 .context("while starting up timesource_mock")
106 .unwrap();
107
108 let maintenance_server = builder
109 .add_local_child(
110 "maintenance_mock",
111 move |handles: LocalComponentHandles| {
112 Box::pin(maintenance_mock_server(handles, Arc::clone(&clock)))
113 },
114 ChildOptions::new(),
115 )
116 .await
117 .context("while starting up maintenance_mock")
118 .unwrap();
119
120 if use_fake_clock {
122 let fake_clock =
123 builder.add_child("fake_clock", FAKE_CLOCK_URL, ChildOptions::new()).await.unwrap();
124
125 builder
126 .add_route(
127 Route::new()
128 .capability(Capability::protocol_by_name(
129 "fuchsia.testing.FakeClockControl",
130 ))
131 .from(&fake_clock)
132 .to(Ref::parent()),
133 )
134 .await
135 .context("while setting up FakeClockControl")
136 .unwrap();
137
138 builder
139 .add_route(
140 Route::new()
141 .capability(Capability::protocol_by_name("fuchsia.testing.FakeClock"))
142 .from(&fake_clock)
143 .to(Ref::parent())
144 .to(&timekeeper),
145 )
146 .await
147 .context("while setting up FakeClock")
148 .unwrap();
149
150 builder
151 .add_route(
152 Route::new()
153 .capability(Capability::protocol_by_name("fuchsia.logger.LogSink"))
154 .from(Ref::parent())
155 .to(&fake_clock),
156 )
157 .await
158 .context("while setting up LogSink")
159 .unwrap();
160 };
161
162 builder
163 .add_route(
164 Route::new()
165 .capability(Capability::protocol_by_name("fuchsia.time.Maintenance"))
166 .from(&maintenance_server)
167 .to(&timekeeper),
168 )
169 .await
170 .context("while setting up Maintenance")
171 .unwrap();
172
173 builder
174 .add_route(
175 Route::new()
176 .capability(Capability::protocol_by_name("test.time.TimeSourceControl"))
177 .from(×ource_server)
178 .to(&timekeeper),
179 )
180 .await
181 .unwrap();
182
183 builder
184 .add_route(
185 Route::new()
186 .capability(Capability::protocol_by_name(
187 "fuchsia.metrics.test.MetricEventLoggerQuerier",
188 ))
189 .from(&fake_cobalt)
190 .to(Ref::parent()),
191 )
192 .await
193 .unwrap();
194
195 builder
196 .add_route(
197 Route::new()
198 .capability(Capability::protocol_by_name(
199 "fuchsia.metrics.MetricEventLoggerFactory",
200 ))
201 .from(&fake_cobalt)
202 .to(&timekeeper),
203 )
204 .await
205 .unwrap();
206
207 builder
208 .add_route(
209 Route::new()
210 .capability(Capability::protocol_by_name("fuchsia.logger.LogSink"))
211 .from(Ref::parent())
212 .to(&fake_cobalt)
213 .to(&timekeeper)
214 .to(×ource_server)
215 .to(&maintenance_server),
216 )
217 .await
218 .unwrap();
219
220 builder
221 .add_route(
222 Route::new()
223 .capability(Capability::configuration("fuchsia.time.config.WritableUTCTime"))
224 .from(Ref::parent())
225 .to(&timekeeper),
226 )
227 .await
228 .unwrap();
229
230 let rtc_updates = setup_rtc(rtc_options, &builder, &timekeeper).await;
231 let realm_instance = builder.build().await.unwrap();
232
233 let fake_clock_control = if use_fake_clock {
234 let control_proxy = realm_instance.root.connect_to_protocol_at_exposed_dir().unwrap();
235 let clock_proxy = realm_instance.root.connect_to_protocol_at_exposed_dir().unwrap();
236 Some(FakeClockController { control_proxy, clock_proxy })
237 } else {
238 None
239 };
240
241 let cobalt_querier = realm_instance
242 .root
243 .connect_to_protocol_at_exposed_dir()
244 .expect("the connection succeeds");
245
246 let nested_timekeeper = Self { _realm_instance: realm_instance };
247
248 (nested_timekeeper, push_source_puppet, rtc_updates, cobalt_querier, fake_clock_control)
249 }
250}
251
252pub struct RemotePushSourcePuppet {
253 proxy: fidl_test_time_realm::PushSourcePuppetProxy,
254}
255
256impl RemotePushSourcePuppet {
257 pub fn new(proxy: fidl_test_time_realm::PushSourcePuppetProxy) -> Arc<Self> {
259 Arc::new(Self { proxy })
260 }
261
262 pub async fn set_sample(&self, sample: TimeSample) {
264 self.proxy.set_sample(&sample).await.expect("original API was infallible");
265 }
266
267 pub async fn set_status(&self, status: Status) {
269 self.proxy.set_status(status).await.expect("original API was infallible");
270 }
271
272 pub async fn simulate_crash(&self) {
274 self.proxy.crash().await.expect("original local API was infallible");
275 }
276
277 pub async fn lifetime_served_connections(&self) -> u32 {
280 self.proxy.get_lifetime_served_connections().await.expect("original API was infallible")
281 }
282}
283
284pub struct PushSourcePuppet {
286 inner: Mutex<PushSourcePuppetInner>,
289 cumulative_clients: Mutex<u32>,
291}
292
293impl PushSourcePuppet {
294 fn new() -> Self {
296 Self { inner: Mutex::new(PushSourcePuppetInner::new()), cumulative_clients: Mutex::new(0) }
297 }
298
299 fn serve_client(&self, server_end: ServerEnd<PushSourceMarker>) {
301 log::debug!("serve_client entry");
302 let mut inner = self.inner.lock();
303 if inner.served_client() {
307 *inner = PushSourcePuppetInner::new();
308 }
309 inner.serve_client(server_end);
310 *self.cumulative_clients.lock() += 1;
311 }
312
313 pub async fn set_sample(&self, sample: TimeSample) {
315 let mut sink = self.inner.lock().get_sink();
316 sink.send(sample.into()).await.unwrap();
317 }
318
319 pub async fn set_status(&self, status: Status) {
321 let mut sink = self.inner.lock().get_sink();
322 sink.send(status.into()).await.unwrap();
323 }
324
325 pub fn simulate_crash(&self) {
327 *self.inner.lock() = PushSourcePuppetInner::new();
328 }
330
331 pub fn lifetime_served_connections(&self) -> u32 {
334 *self.cumulative_clients.lock()
335 }
336}
337
338struct PushSourcePuppetInner {
341 push_source: Arc<PushSource<TestUpdateAlgorithm>>,
342 tasks: Vec<fasync::Task<()>>,
344 update_sink: Sender<Update>,
346}
347
348impl PushSourcePuppetInner {
349 fn new() -> Self {
350 let (update_algorithm, update_sink) = TestUpdateAlgorithm::new();
351 let push_source = Arc::new(PushSource::new(update_algorithm, Status::Ok).unwrap());
352 let push_source_clone = Arc::clone(&push_source);
353 let tasks = vec![fasync::Task::spawn(async move {
354 push_source_clone.poll_updates().await.unwrap();
355 })];
356 Self { push_source, tasks, update_sink }
357 }
358
359 fn served_client(&self) -> bool {
361 self.tasks.len() > 1
362 }
363
364 fn serve_client(&mut self, server_end: ServerEnd<PushSourceMarker>) {
366 let push_source_clone = Arc::clone(&self.push_source);
367 self.tasks.push(fasync::Task::spawn(async move {
368 push_source_clone.handle_requests_for_stream(server_end.into_stream()).await.unwrap();
369 }));
370 }
371
372 fn get_sink(&self) -> Sender<Update> {
377 self.update_sink.clone()
378 }
379}
380
381#[derive(Clone, Debug)]
383pub struct RtcUpdates(Arc<Mutex<Vec<fidl_fuchsia_hardware_rtc::Time>>>);
384
385impl RtcUpdates {
386 pub fn to_vec(&self) -> Vec<fidl_fuchsia_hardware_rtc::Time> {
388 self.0.lock().clone()
389 }
390}
391
392pub struct RemoteRtcUpdates {
395 proxy: fidl_test_time_realm::RtcUpdatesProxy,
396}
397
398impl RemoteRtcUpdates {
399 pub async fn to_vec(&self) -> Vec<fidl_fuchsia_hardware_rtc::Time> {
400 self.proxy
401 .get(fidl_test_time_realm::GetRequest::default())
402 .await
403 .expect("no errors or overflows") .unwrap()
405 .0
406 }
407 pub fn new(proxy: fidl_test_time_realm::RtcUpdatesProxy) -> Self {
408 RemoteRtcUpdates { proxy }
409 }
410}
411
412pub struct FakeClockController {
415 control_proxy: FakeClockControlProxy,
416 clock_proxy: FakeClockProxy,
417}
418
419impl Deref for FakeClockController {
420 type Target = FakeClockControlProxy;
421
422 fn deref(&self) -> &Self::Target {
423 &self.control_proxy
424 }
425}
426
427impl FakeClockController {
428 pub fn new(control_proxy: FakeClockControlProxy, clock_proxy: FakeClockProxy) -> Self {
430 FakeClockController { control_proxy, clock_proxy }
431 }
432
433 pub fn into_components(self) -> (FakeClockControlProxy, FakeClockProxy) {
435 (self.control_proxy, self.clock_proxy)
436 }
437
438 pub async fn get_monotonic(&self) -> Result<i64, fidl::Error> {
439 self.clock_proxy.get().await.map(|(_boot, mono)| mono.into_nanos())
440 }
441
442 pub async fn get_reference(&self) -> Result<zx::BootInstant, fidl::Error> {
444 self.get_monotonic().await.map(|v| zx::BootInstant::from_nanos(v))
445 }
446}
447
448pub enum RtcOptions {
450 None,
453 InitialRtcTime(zx::SyntheticInstant),
455 InjectedRtc(fio::DirectoryProxy),
471}
472
473impl From<fidl_test_time_realm::RtcOptions> for RtcOptions {
474 fn from(value: fidl_test_time_realm::RtcOptions) -> Self {
475 match value {
476 fidl_test_time_realm::RtcOptions::DevClassRtc(h) => {
477 RtcOptions::InjectedRtc(h.into_proxy())
478 }
479 fidl_test_time_realm::RtcOptions::InitialRtcTime(t) => {
480 RtcOptions::InitialRtcTime(zx::SyntheticInstant::from_nanos(t))
481 }
482 _ => unimplemented!(),
483 }
484 }
485}
486
487impl From<zx::SyntheticInstant> for RtcOptions {
488 fn from(value: zx::SyntheticInstant) -> Self {
489 RtcOptions::InitialRtcTime(value)
490 }
491}
492
493impl From<Option<zx::SyntheticInstant>> for RtcOptions {
494 fn from(value: Option<zx::SyntheticInstant>) -> Self {
495 value.map(|t| t.into()).unwrap_or(Self::None)
496 }
497}
498
499async fn setup_rtc(
510 rtc_options: RtcOptions,
511 builder: &RealmBuilder,
512 timekeeper: &ChildRef,
513) -> RtcUpdates {
514 let rtc_updates = RtcUpdates(Arc::new(Mutex::new(vec![])));
515
516 let rtc_dir = match rtc_options {
517 RtcOptions::InitialRtcTime(initial_time) => {
518 log::debug!("using fake /dev/class/rtc/000");
519 pseudo_directory! {
520 "class" => pseudo_directory! {
521 "rtc" => pseudo_directory! {
522 "000" => vfs::service::host({
523 let rtc_updates = rtc_updates.clone();
524 move |stream| {
525 serve_fake_rtc(initial_time, rtc_updates.clone(), stream)
526 }
527 })
528 }
529 }
530 }
531 }
532 RtcOptions::None => {
533 log::debug!("using an empty /dev/class/rtc directory");
534 pseudo_directory! {
535 "class" => pseudo_directory! {
536 "rtc" => pseudo_directory! {
537 }
538 }
539 }
540 }
541 RtcOptions::InjectedRtc(h) => {
542 log::debug!("using /dev/class/rtc provided by client");
543 pseudo_directory! {
544 "class" => pseudo_directory! {
545 "rtc" => vfs::remote::remote_dir(h)
546 }
547 }
548 }
549 };
550
551 let fake_rtc_server = builder
552 .add_local_child(
553 "fake_rtc",
554 {
555 move |handles| {
556 let rtc_dir = rtc_dir.clone();
557 async move {
558 let _ = &handles;
559 let mut fs = ServiceFs::new();
560 fs.add_remote(
561 "dev",
562 vfs::directory::serve_read_only(
563 rtc_dir,
564 vfs::execution_scope::ExecutionScope::new(),
565 ),
566 );
567 fs.serve_connection(handles.outgoing_dir)
568 .expect("failed to serve fake RTC ServiceFs");
569 fs.collect::<()>().await;
570 Ok(())
571 }
572 .boxed()
573 }
574 },
575 ChildOptions::new().eager(),
576 )
577 .await
578 .unwrap();
579
580 builder
581 .add_route(
582 Route::new()
583 .capability(
584 Capability::directory("dev-rtc").path("/dev/class/rtc").rights(fio::R_STAR_DIR),
585 )
586 .from(&fake_rtc_server)
587 .to(&*timekeeper),
588 )
589 .await
590 .unwrap();
591
592 rtc_updates
593}
594
595async fn serve_fake_rtc(
598 initial_time: zx::SyntheticInstant,
599 rtc_updates: RtcUpdates,
600 mut stream: DeviceRequestStream,
601) {
602 let mut current = zx_time_to_rtc_time(initial_time);
603 while let Some(req) = stream.try_next().await.unwrap() {
604 match req {
605 DeviceRequest::Get { responder } => {
606 log::debug!("serve_fake_rtc: DeviceRequest::Get: {current:?}");
607 responder.send(Ok(¤t)).unwrap();
608 }
609 DeviceRequest::Set2 { rtc, responder } => {
610 log::debug!("serve_fake_rtc: DeviceRequest::Set2: {rtc:?}");
611 rtc_updates.0.lock().push(rtc);
612 current = rtc;
613 responder.send(Ok(())).unwrap();
614 }
615 DeviceRequest::_UnknownMethod { .. } => {}
616 }
617 }
618}
619
620async fn serve_test_control(puppet: &PushSourcePuppet, stream: TimeSourceControlRequestStream) {
621 stream
622 .try_for_each_concurrent(None, |req| async {
623 let _ = &req;
624 let TimeSourceControlRequest::ConnectPushSource { push_source, .. } = req;
625 puppet.serve_client(push_source);
626 Ok(())
627 })
628 .await
629 .unwrap();
630}
631
632async fn serve_maintenance(clock_handle: Arc<zx::Clock>, mut stream: MaintenanceRequestStream) {
633 while let Some(req) = stream.try_next().await.unwrap() {
634 let MaintenanceRequest::GetWritableUtcClock { responder } = req;
635 responder.send(clock_handle.duplicate_handle(Rights::SAME_RIGHTS).unwrap()).unwrap();
636 }
637}
638
639async fn timesource_mock_server(
640 handles: LocalComponentHandles,
641 push_source_puppet: Arc<PushSourcePuppet>,
642) -> Result<(), anyhow::Error> {
643 let mut fs = ServiceFs::new();
644 let mut tasks = vec![];
645
646 fs.dir("svc").add_fidl_service(move |stream: TimeSourceControlRequestStream| {
647 let puppet_clone = Arc::clone(&push_source_puppet);
648
649 tasks.push(fasync::Task::local(async move {
650 serve_test_control(&*puppet_clone, stream).await;
651 }));
652 });
653
654 fs.serve_connection(handles.outgoing_dir)?;
655 fs.collect::<()>().await;
656
657 Ok(())
658}
659
660async fn maintenance_mock_server(
661 handles: LocalComponentHandles,
662 clock: Arc<zx::Clock>,
663) -> Result<(), anyhow::Error> {
664 let mut fs = ServiceFs::new();
665 let mut tasks = vec![];
666
667 fs.dir("svc").add_fidl_service(move |stream: MaintenanceRequestStream| {
668 let clock_clone = Arc::clone(&clock);
669
670 tasks.push(fasync::Task::local(async move {
671 serve_maintenance(clock_clone, stream).await;
672 }));
673 });
674
675 fs.serve_connection(handles.outgoing_dir)?;
676 fs.collect::<()>().await;
677
678 Ok(())
679}
680
681fn from_rfc2822(date: &str) -> zx::SyntheticInstant {
682 zx::SyntheticInstant::from_nanos(
683 chrono::DateTime::parse_from_rfc2822(date).unwrap().timestamp_nanos_opt().unwrap(),
684 )
685}
686
687pub static BACKSTOP_TIME: LazyLock<zx::SyntheticInstant> =
688 LazyLock::new(|| from_rfc2822("Sun, 20 Sep 2020 01:01:01 GMT"));
689pub static VALID_RTC_TIME: LazyLock<zx::SyntheticInstant> =
690 LazyLock::new(|| from_rfc2822("Sun, 20 Sep 2020 02:02:02 GMT"));
691pub static BEFORE_BACKSTOP_TIME: LazyLock<zx::SyntheticInstant> =
692 LazyLock::new(|| from_rfc2822("Fri, 06 Mar 2020 04:04:04 GMT"));
693pub static VALID_TIME: LazyLock<zx::SyntheticInstant> =
694 LazyLock::new(|| from_rfc2822("Tue, 29 Sep 2020 02:19:01 GMT"));
695pub static VALID_TIME_2: LazyLock<zx::SyntheticInstant> =
696 LazyLock::new(|| from_rfc2822("Wed, 30 Sep 2020 14:59:59 GMT"));
697
698pub const BETWEEN_SAMPLES: zx::BootDuration = zx::BootDuration::from_seconds(5);
700
701pub const STD_DEV: zx::BootDuration = zx::BootDuration::from_millis(50);
703
704pub fn new_clock() -> Arc<zx::SyntheticClock> {
707 Arc::new(new_nonshareable_clock())
708}
709
710pub fn new_nonshareable_clock() -> zx::SyntheticClock {
712 zx::SyntheticClock::create(zx::ClockOpts::MAPPABLE, Some(*BACKSTOP_TIME)).unwrap()
713}
714
715fn zx_time_to_rtc_time(zx_time: zx::SyntheticInstant) -> fidl_fuchsia_hardware_rtc::Time {
716 let date = chrono::Utc.timestamp_nanos(zx_time.into_nanos());
717 fidl_fuchsia_hardware_rtc::Time {
718 seconds: date.second() as u8,
719 minutes: date.minute() as u8,
720 hours: date.hour() as u8,
721 day: date.day() as u8,
722 month: date.month() as u8,
723 year: date.year() as u16,
724 }
725}
726
727pub fn rtc_time_to_zx_time(rtc_time: fidl_fuchsia_hardware_rtc::Time) -> zx::SyntheticInstant {
728 let date = chrono::Utc
729 .with_ymd_and_hms(
730 rtc_time.year as i32,
731 rtc_time.month as u32,
732 rtc_time.day as u32,
733 rtc_time.hours as u32,
734 rtc_time.minutes as u32,
735 rtc_time.seconds as u32,
736 )
737 .unwrap();
738 zx::SyntheticInstant::from_nanos(date.timestamp_nanos_opt().unwrap())
739}
740
741pub fn create_cobalt_event_stream(
743 proxy: Arc<MetricEventLoggerQuerierProxy>,
744 log_method: LogMethod,
745) -> std::pin::Pin<Box<dyn Stream<Item = MetricEvent>>> {
746 async_utils::hanging_get::client::HangingGetStream::new(proxy, move |p| {
747 p.watch_logs(PROJECT_ID, log_method)
748 })
749 .map(|res| futures::stream::iter(res.expect("there should be a valid result here").0))
750 .flatten()
751 .boxed()
752}
753
754#[macro_export]
756macro_rules! poll_until_some {
757 ($condition:expr) => {
758 $crate::poll_until_some_impl(
759 $condition,
760 &$crate::SourceLocation::new(file!(), line!(), column!()),
761 )
762 };
763}
764
765#[macro_export]
768macro_rules! poll_until_some_async {
769 ($condition:expr) => {{
770 let loc = $crate::SourceLocation::new(file!(), line!(), column!());
771 log::info!("=> poll_until_some_async() for {}", &loc);
772 let mut result = None;
773 loop {
774 result = $condition.await;
775 if result.is_some() {
776 break;
777 }
778 fasync::Timer::new(fasync::MonotonicInstant::after($crate::RETRY_WAIT_DURATION)).await;
779 }
780 log::info!("=> poll_until_some_async() done for {}", &loc);
781 result.expect("we loop around while result is None")
782 }};
783}
784
785#[macro_export]
788macro_rules! poll_until_async {
789 ($condition:expr) => {
790 $crate::poll_until_async_impl(
791 $condition,
792 &$crate::SourceLocation::new(file!(), line!(), column!()),
793 )
794 };
795}
796
797#[macro_export]
799macro_rules! poll_until_async_2 {
800 ($condition:expr) => {{
801 let loc = $crate::SourceLocation::new(file!(), line!(), column!());
802 log::info!("=> poll_until_async() for {}", &loc);
803 let mut result = true;
804 loop {
805 result = $condition.await;
806 if result {
807 break;
808 }
809 fasync::Timer::new(fasync::MonotonicInstant::after($crate::RETRY_WAIT_DURATION)).await;
810 }
811 log::info!("=> poll_until_async_2() done for {}", &loc);
812 result
813 }};
814}
815
816#[macro_export]
818macro_rules! poll_until {
819 ($condition:expr) => {
820 $crate::poll_until_impl(
821 $condition,
822 &$crate::SourceLocation::new(file!(), line!(), column!()),
823 )
824 };
825}
826
827pub const RETRY_WAIT_DURATION: zx::MonotonicDuration = zx::MonotonicDuration::from_millis(10);
829
830pub struct SourceLocation {
831 file: &'static str,
832 line: u32,
833 column: u32,
834}
835
836impl std::fmt::Display for SourceLocation {
837 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
838 write!(f, "(file: {}, line: {}, column: {})", self.file, self.line, self.column)
839 }
840}
841
842impl SourceLocation {
843 pub fn new(file: &'static str, line: u32, column: u32) -> Self {
844 Self { file, line, column }
845 }
846}
847
848pub async fn poll_until_some_impl<T, F>(poll_fn: F, loc: &SourceLocation) -> T
850where
851 F: Fn() -> Option<T>,
852{
853 log::info!("=> poll_until_some() for {}", loc);
854 loop {
855 match poll_fn() {
856 Some(value) => {
857 log::info!("<= poll_until_some() for {}", loc);
858 return value;
859 }
860 None => fasync::Timer::new(fasync::MonotonicInstant::after(RETRY_WAIT_DURATION)).await,
861 }
862 }
863}
864
865pub async fn poll_until_async_impl<F, Fut>(poll_fn: F, loc: &SourceLocation)
867where
868 F: Fn() -> Fut,
869 Fut: Future<Output = bool>,
870{
871 log::info!("=> poll_until_async() for {}", loc);
872 while !poll_fn().await {
873 fasync::Timer::new(fasync::MonotonicInstant::after(RETRY_WAIT_DURATION)).await
874 }
875 log::info!("<= poll_until_async() for {}", loc);
876}
877
878pub async fn poll_until_impl<F: Fn() -> bool>(poll_fn: F, loc: &SourceLocation) {
880 log::info!("=> poll_until() for {}", loc);
881 while !poll_fn() {
882 fasync::Timer::new(fasync::MonotonicInstant::after(RETRY_WAIT_DURATION)).await
883 }
884 log::info!("<= poll_until() for {}", loc);
885}