1use crate::power::{SuspendState, SuspendStats};
6use crate::task::CurrentTask;
7
8use std::collections::{HashMap, HashSet};
9use std::future::Future;
10use std::sync::{Arc, Weak};
11
12use anyhow::{Context, anyhow};
13use fidl::endpoints::Proxy;
14use fuchsia_component::client::connect_to_protocol_sync;
15use fuchsia_inspect::ArrayProperty;
16use futures::stream::{FusedStream, Next};
17use futures::{FutureExt, StreamExt};
18use starnix_logging::{log_info, log_warn};
19use starnix_sync::{
20 EbpfSuspendLock, FileOpsCore, LockBefore, Locked, Mutex, MutexGuard, OrderedRwLock,
21 RwLockReadGuard,
22};
23use starnix_uapi::arc_key::WeakKey;
24use starnix_uapi::errors::Errno;
25use starnix_uapi::{errno, error};
26use std::collections::VecDeque;
27use std::fmt;
28use zx::{HandleBased, Peered};
29use {
30 fidl_fuchsia_power_observability as fobs, fidl_fuchsia_session_power as fpower,
31 fidl_fuchsia_starnix_runner as frunner, fuchsia_inspect as inspect,
32};
33
/// Per-source wakeup statistics, in the style of Linux's wakeup-source
/// counters (assumption based on field names — TODO confirm against the
/// readers of these stats).
#[derive(Debug, Default)]
pub struct WakeupSource {
    /// Number of times this source has been activated.
    active_count: u64,

    /// Number of events reported by this source (incremented on every
    /// activation).
    event_count: u64,

    /// Number of times this source was active while a suspend attempt was
    /// aborted (bumped in `report_failed_suspension`).
    wakeup_count: u64,

    /// Number of times this source was deactivated because its timeout
    /// elapsed (see `timeout_wakeup_source`).
    expire_count: u64,

    /// Instant at which this source most recently became active, or
    /// `MonotonicInstant::ZERO` when the source is currently inactive.
    active_since: zx::MonotonicInstant,

    /// Total time this source has spent active, accumulated on deactivation.
    total_time: zx::MonotonicDuration,

    /// Longest single activation of this source.
    max_time: zx::MonotonicDuration,

    /// Instant of the most recent activation or deactivation.
    last_change: zx::MonotonicInstant,
}
67
68impl WakeupSource {
69 pub fn active_duration(&self) -> zx::MonotonicDuration {
73 if self.active_since == zx::MonotonicInstant::ZERO {
74 zx::MonotonicDuration::default()
75 } else {
76 let now = zx::MonotonicInstant::get();
77 now - self.active_since
78 }
79 }
80}
81
/// Identifies where a wakeup source came from; the wrapped string is the
/// source's name.
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub enum WakeupSourceOrigin {
    /// A kernel wake lock.
    WakeLock(String),
    /// An epoll-based wakeup source.
    Epoll(String),
    /// A HAL-provided wakeup source.
    HAL(String),
}
88
89impl std::string::ToString for WakeupSourceOrigin {
90 fn to_string(&self) -> String {
91 match self {
92 WakeupSourceOrigin::WakeLock(lock) => lock.clone(),
93 WakeupSourceOrigin::Epoll(lock) => format!("[epoll] {}", lock),
94 WakeupSourceOrigin::HAL(lock) => format!("[HAL] {}", lock),
95 }
96 }
97}
98
/// Manager for container suspend/resume.
pub struct SuspendResumeManager {
    /// Mutable state, shared with the lazy inspect node installed by
    /// `Default::default`.
    inner: Arc<Mutex<SuspendResumeManagerInner>>,

    /// Message counters for runner-proxied channels; entries are weak so a
    /// dropped `OwnedMessageCounter` falls out on the next sweep.
    message_counters: Arc<Mutex<HashSet<WeakKey<OwnedMessageCounter>>>>,

    /// Write-locked for the duration of a suspend attempt so eBPF activity
    /// and suspension are mutually exclusive.
    ebpf_suspend_lock: OrderedRwLock<(), EbpfSuspendLock>,
}
111
/// Mutable state guarded by `SuspendResumeManager::lock`.
pub struct SuspendResumeManagerInner {
    /// Cumulative suspend/resume statistics.
    suspend_stats: SuspendStats,
    /// Whether sync-on-suspend is enabled.
    sync_on_suspend_enabled: bool,

    /// Ring buffer of recent suspend attempts/resumes/failures for inspect.
    suspend_events: VecDeque<SuspendEvent>,

    /// Per-origin wakeup-source statistics.
    wakeup_sources: HashMap<WakeupSourceOrigin, WakeupSource>,

    /// Read end of the wake-lock event pair; duplicated and handed to the
    /// runner during suspension (see `duplicate_lock_event`).
    active_lock_reader: zx::EventPair,

    /// Write end used by `signal_wake_events` to publish whether any wakeup
    /// source is currently active.
    active_lock_writer: zx::EventPair,

    /// Number of wakeup sources currently active; suspension is allowed only
    /// when this is zero (see `can_suspend`).
    active_wakeup_source_count: u64,

    /// Total number of wakeup-source deactivations observed.
    total_wakeup_source_event_count: u64,

    /// Externally registered wake sources, keyed by handle koid.
    external_wake_sources: HashMap<zx::Koid, ExternalWakeSource>,
}
143
/// A wake source registered with the runner via `add_external_wake_source`.
#[derive(Debug)]
struct ExternalWakeSource {
    /// Handle whose signals indicate pending wake activity.
    handle: zx::NullableHandle,
    /// Signals that mark the source as active (suspend-blocking).
    signals: zx::Signals,
    /// Human-readable name used in failure reports.
    name: String,
}
153
impl SuspendResumeManager {
    /// Registers `handle` with the Starnix runner as an external wake source.
    /// While any of `signals` is asserted on `handle`, suspend attempts abort
    /// and report `name` as the blocker.
    pub fn add_external_wake_source(
        &self,
        handle: zx::NullableHandle,
        signals: zx::Signals,
        name: String,
    ) -> Result<(), Errno> {
        let manager = connect_to_protocol_sync::<frunner::ManagerMarker>()
            .map_err(|e| errno!(EINVAL, format!("Failed to connect to manager: {e:?}")))?;
        manager
            .add_wake_source(frunner::ManagerAddWakeSourceRequest {
                container_job: Some(
                    fuchsia_runtime::job_default()
                        .duplicate(zx::Rights::SAME_RIGHTS)
                        .expect("Failed to dup handle"),
                ),
                name: Some(name.clone()),
                handle: Some(
                    handle.duplicate(zx::Rights::SAME_RIGHTS).map_err(|e| errno!(EIO, e))?,
                ),
                signals: Some(signals.bits()),
                ..Default::default()
            })
            .map_err(|e| errno!(EIO, e))?;

        // Track a duplicate locally (keyed by koid) so `suspend` and
        // `report_failed_suspension` can poll its signals.
        let koid = handle.koid().map_err(|e| errno!(EINVAL, e))?;
        self.lock().external_wake_sources.insert(
            koid,
            ExternalWakeSource {
                handle: handle.duplicate(zx::Rights::SAME_RIGHTS).map_err(|e| errno!(EIO, e))?,
                signals,
                name,
            },
        );
        Ok(())
    }

    /// Unregisters the wake source identified by `handle` — matched by koid
    /// locally, and by the handle itself in the runner.
    pub fn remove_external_wake_source(&self, handle: zx::NullableHandle) -> Result<(), Errno> {
        let manager = connect_to_protocol_sync::<frunner::ManagerMarker>()
            .map_err(|e| errno!(EINVAL, format!("Failed to connect to manager: {e:?}")))?;

        // Remove the local entry first; the runner-side removal below consumes
        // the handle.
        let koid = handle.koid().map_err(|e| errno!(EINVAL, e))?;
        self.lock().external_wake_sources.remove(&koid);

        manager
            .remove_wake_source(frunner::ManagerRemoveWakeSourceRequest {
                container_job: Some(
                    fuchsia_runtime::job_default()
                        .duplicate(zx::Rights::SAME_RIGHTS)
                        .expect("Failed to dup handle"),
                ),
                handle: Some(handle),
                ..Default::default()
            })
            .map_err(|e| errno!(EIO, e))?;

        Ok(())
    }
}
213
214pub type EbpfSuspendGuard<'a> = RwLockReadGuard<'a, ()>;
215
/// One entry in the suspend-event ring buffer, mirrored into inspect.
#[derive(Clone, Debug)]
pub enum SuspendEvent {
    /// A suspend attempt was started for the named state.
    Attempt { time: zx::BootInstant, state: String },
    /// The container resumed; `reason` is the runner-reported resume reason.
    Resume { time: zx::BootInstant, reason: String },
    /// A suspend attempt failed; `wakeup_sources` names the blockers, if known.
    Fail { time: zx::BootInstant, wakeup_sources: Option<Vec<String>> },
}
222
223const INSPECT_RING_BUFFER_CAPACITY: usize = 128;
225
impl Default for SuspendResumeManagerInner {
    fn default() -> Self {
        let (active_lock_reader, active_lock_writer) = zx::EventPair::create();
        // No wakeup sources exist yet, so assert USER_0 ("no active locks") on
        // the reader, matching `signal_wake_events`' zero-count state.
        active_lock_writer
            .signal_peer(zx::Signals::empty(), zx::Signals::USER_0)
            .expect("Failed to signal peer");
        Self {
            suspend_stats: Default::default(),
            sync_on_suspend_enabled: false,
            suspend_events: VecDeque::with_capacity(INSPECT_RING_BUFFER_CAPACITY),
            wakeup_sources: Default::default(),
            active_lock_reader,
            active_lock_writer,
            active_wakeup_source_count: 0,
            total_wakeup_source_event_count: 0,
            external_wake_sources: Default::default(),
        }
    }
}
245
246impl SuspendResumeManagerInner {
    /// Returns true when no wakeup source is active, i.e. suspension may
    /// proceed.
    pub fn can_suspend(&self) -> bool {
        self.active_wakeup_source_count == 0
    }
251
252 pub fn active_wake_locks(&self) -> Vec<WakeupSourceOrigin> {
253 self.wakeup_sources
254 .iter()
255 .filter_map(|(name, source)| match name {
256 WakeupSourceOrigin::WakeLock(_) => {
257 if source.active_since > zx::MonotonicInstant::ZERO {
258 Some(name.clone())
259 } else {
260 None
261 }
262 }
263 _ => None,
264 })
265 .collect()
266 }
267
268 pub fn inactive_wake_locks(&self) -> Vec<WakeupSourceOrigin> {
269 self.wakeup_sources
270 .iter()
271 .filter_map(|(name, source)| match name {
272 WakeupSourceOrigin::WakeLock(_) => {
273 if source.active_since == zx::MonotonicInstant::ZERO {
274 Some(name.clone())
275 } else {
276 None
277 }
278 }
279 _ => None,
280 })
281 .collect()
282 }
283
    /// Publishes the current wake-lock state on the event pair: USER_0 when no
    /// source is active (suspend allowed), EVENT_SIGNALED when at least one is
    /// active. The two signals are kept mutually exclusive.
    fn signal_wake_events(&mut self) {
        let (clear_mask, set_mask) = if self.active_wakeup_source_count == 0 {
            (zx::Signals::EVENT_SIGNALED, zx::Signals::USER_0)
        } else {
            (zx::Signals::USER_0, zx::Signals::EVENT_SIGNALED)
        };
        self.active_lock_writer.signal_peer(clear_mask, set_mask).expect("Failed to signal peer");
    }
293
    /// Applies `update` to the suspend statistics (the inner lock is already
    /// held by virtue of having `&mut self`).
    fn update_suspend_stats<UpdateFn>(&mut self, update: UpdateFn)
    where
        UpdateFn: FnOnce(&mut SuspendStats),
    {
        update(&mut self.suspend_stats);
    }
300
    /// Appends `event` to the ring buffer, evicting the oldest entry when at
    /// capacity.
    fn add_suspend_event(&mut self, event: SuspendEvent) {
        if self.suspend_events.len() >= INSPECT_RING_BUFFER_CAPACITY {
            self.suspend_events.pop_front();
        }
        self.suspend_events.push_back(event);
    }
307
    /// Records the suspend-event ring buffer under `node`, one child per event
    /// (keyed by buffer index), using fuchsia.power.observability key names.
    fn record_suspend_events(&self, node: &inspect::Node) {
        let events_node = node.create_child("suspend_events");
        for (i, event) in self.suspend_events.iter().enumerate() {
            let child = events_node.create_child(i.to_string());
            match event {
                SuspendEvent::Attempt { time, state } => {
                    child.record_int(fobs::SUSPEND_ATTEMPTED_AT, time.into_nanos());
                    child.record_string(fobs::SUSPEND_REQUESTED_STATE, state);
                }
                SuspendEvent::Resume { time, reason } => {
                    child.record_int(fobs::SUSPEND_RESUMED_AT, time.into_nanos());
                    child.record_string(fobs::SUSPEND_RESUME_REASON, reason);
                }
                SuspendEvent::Fail { time, wakeup_sources } => {
                    child.record_int(fobs::SUSPEND_FAILED_AT, time.into_nanos());
                    // Only failures carry the list of blocking wakeup sources.
                    if let Some(names) = wakeup_sources {
                        let names_array =
                            child.create_string_array(fobs::WAKEUP_SOURCES_NAME, names.len());
                        for (i, name) in names.iter().enumerate() {
                            names_array.set(i, name);
                        }
                        child.record(names_array);
                    }
                }
            }
            events_node.record(child);
        }
        node.record(events_node);
    }
337
    /// Records every wakeup source's counters under `node`, one child per
    /// source named after its origin.
    fn record_wakeup_sources(&self, node: &inspect::Node) {
        let wakeup_node = node.create_child("wakeup_sources");
        for (name, source) in self.wakeup_sources.iter() {
            let child = wakeup_node.create_child(name.to_string());
            child.record_uint("active_count", source.active_count);
            child.record_uint("event_count", source.event_count);
            child.record_uint("wakeup_count", source.wakeup_count);
            child.record_uint("expire_count", source.expire_count);
            child.record_int("active_since (ns)", source.active_since.into_nanos());
            // Live duration of the current activation (zero when inactive).
            child.record_int("active_duration_mono (ns)", source.active_duration().into_nanos());
            child.record_int("total_time (ms)", source.total_time.into_millis());
            child.record_int("max_time (ms)", source.max_time.into_millis());
            child.record_int("last_change (ns)", source.last_change.into_nanos());
            wakeup_node.record(child);
        }
        node.record(wakeup_node);
    }
357}
358
359pub type SuspendResumeManagerHandle = Arc<SuspendResumeManager>;
360
impl Default for SuspendResumeManager {
    /// Builds the manager and installs two lazy nodes on the component
    /// inspector: one listing live message counters, one snapshotting suspend
    /// events and wakeup sources at read time.
    fn default() -> Self {
        let message_counters: Arc<Mutex<HashSet<WeakKey<OwnedMessageCounter>>>> =
            Default::default();
        let message_counters_clone = message_counters.clone();
        let root = inspect::component::inspector().root();
        root.record_lazy_values("message_counters", move || {
            let message_counters_clone = message_counters_clone.clone();
            async move {
                let inspector = fuchsia_inspect::Inspector::default();
                let root = inspector.root();
                let mut message_counters = message_counters_clone.lock();
                // Prune counters whose owners have been dropped before sizing
                // the string array, so every remaining upgrade succeeds below.
                message_counters.retain(|c| c.0.upgrade().is_some());
                let message_counters_inspect =
                    root.create_string_array("message_counters", message_counters.len());
                for (i, c) in message_counters.iter().enumerate() {
                    let counter = c.0.upgrade().expect("lost counter should be retained");
                    message_counters_inspect.set(i, counter.to_string());
                }
                root.record(message_counters_inspect);
                Ok(inspector)
            }
            .boxed()
        });
        let inner = Arc::new(Mutex::new(SuspendResumeManagerInner::default()));
        let inner_clone = inner.clone();
        root.record_lazy_child("wakeup_sources", move || {
            let inner = inner_clone.clone();
            async move {
                let inspector = fuchsia_inspect::Inspector::default();
                let root = inspector.root();
                let state = inner.lock();

                state.record_suspend_events(root);
                state.record_wakeup_sources(root);

                Ok(inspector)
            }
            .boxed()
        });
        Self { message_counters, inner, ebpf_suspend_lock: Default::default() }
    }
}
404
405impl SuspendResumeManager {
    /// Acquires the lock on the manager's mutable state.
    pub fn lock(&self) -> MutexGuard<'_, SuspendResumeManagerInner> {
        self.inner.lock()
    }
410
    /// Initializes power handling: takes (and immediately drops) any power
    /// lease handed off by the parent session, tolerating the absence of the
    /// Handoff protocol.
    pub fn init(
        self: &SuspendResumeManagerHandle,
        system_task: &CurrentTask,
    ) -> Result<(), anyhow::Error> {
        let handoff = system_task
            .kernel()
            .connect_to_protocol_at_container_svc::<fpower::HandoffMarker>()?
            .into_sync_proxy();
        match handoff.take(zx::MonotonicInstant::INFINITE) {
            Ok(parent_lease) => {
                let parent_lease = parent_lease
                    .map_err(|e| anyhow!("Failed to take lessor and lease from parent: {e:?}"))?;
                // We only needed ownership; dropping releases the lease.
                drop(parent_lease)
            }
            Err(e) => {
                // A closed channel means no Handoff protocol is offered; that
                // is an expected configuration, not an error.
                if e.is_closed() {
                    log_warn!(
                        "Failed to send the fuchsia.session.power/Handoff.Take request. Assuming no Handoff protocol exists and moving on..."
                    );
                } else {
                    return Err(e).context("Handoff::Take");
                }
            }
        }
        Ok(())
    }
438
439 pub fn activate_wakeup_source(&self, origin: WakeupSourceOrigin) -> bool {
440 let mut state = self.lock();
441 let did_activate = {
442 let entry = state.wakeup_sources.entry(origin).or_default();
443 let now = zx::MonotonicInstant::get();
444 entry.active_count += 1;
445 entry.event_count += 1;
446 entry.last_change = now;
447 if entry.active_since == zx::MonotonicInstant::ZERO {
448 entry.active_since = now;
449 true
450 } else {
451 false
452 }
453 };
454 if did_activate {
455 state.active_wakeup_source_count += 1;
456 state.signal_wake_events();
457 }
458 did_activate
459 }
460
    /// Deactivates a wakeup source that was released normally. Returns true
    /// if the source was active.
    pub fn deactivate_wakeup_source(&self, origin: &WakeupSourceOrigin) -> bool {
        self.remove_wakeup_source(origin, false)
    }

    /// Deactivates a wakeup source whose timeout elapsed; additionally bumps
    /// its expire count. Returns true if the source was active.
    pub fn timeout_wakeup_source(&self, origin: &WakeupSourceOrigin) -> bool {
        self.remove_wakeup_source(origin, true)
    }
468
469 fn remove_wakeup_source(&self, origin: &WakeupSourceOrigin, timed_out: bool) -> bool {
470 let mut state = self.lock();
471 let removed = match state.wakeup_sources.get_mut(origin) {
472 Some(entry) if entry.active_since != zx::MonotonicInstant::ZERO => {
473 if timed_out {
474 entry.expire_count += 1;
475 }
476
477 let now = zx::MonotonicInstant::get();
478 let duration = now - entry.active_since;
479 entry.total_time += duration;
480 entry.max_time = std::cmp::max(duration, entry.max_time);
481 entry.last_change = now;
482 entry.active_since = zx::MonotonicInstant::ZERO;
483
484 true
485 }
486 _ => false,
487 };
488 if removed {
489 state.active_wakeup_source_count -= 1;
490 state.total_wakeup_source_event_count += 1;
491 state.signal_wake_events();
492 }
493 removed
494 }
495
496 pub fn add_message_counter(
497 &self,
498 name: &str,
499 counter: Option<zx::Counter>,
500 ) -> OwnedMessageCounterHandle {
501 let container_counter = OwnedMessageCounter::new(name, counter);
502 let mut message_counters = self.message_counters.lock();
503 message_counters.insert(WeakKey::from(&container_counter));
504 message_counters.retain(|c| c.0.upgrade().is_some());
505 container_counter
506 }
507
508 pub fn has_nonzero_message_counter(&self) -> bool {
509 self.message_counters.lock().iter().any(|c| {
510 let Some(c) = c.0.upgrade() else {
511 return false;
512 };
513 c.counter.as_ref().and_then(|counter| counter.read().ok()).map_or(false, |v| v != 0)
514 })
515 }
516
    /// Returns a duplicate of the read end of the wake-lock event pair, for
    /// handing to the runner so it can observe lock state during suspension.
    pub fn duplicate_lock_event(&self) -> zx::EventPair {
        let state = self.lock();
        state
            .active_lock_reader
            .duplicate_handle(zx::Rights::SAME_RIGHTS)
            .expect("Failed to duplicate handle")
    }
526
    /// Returns a snapshot of the cumulative suspend statistics.
    pub fn suspend_stats(&self) -> SuspendStats {
        self.lock().suspend_stats.clone()
    }
531
    /// Total wake events observed: wakeup-source deactivations plus
    /// successful suspensions.
    pub fn total_wakeup_events(&self) -> u64 {
        let state = self.lock();
        state.total_wakeup_source_event_count + state.suspend_stats.success_count
    }
536
537 pub fn sync_on_suspend_enabled(&self) -> bool {
541 self.lock().sync_on_suspend_enabled.clone()
542 }
543
    /// Sets the sync-on-suspend flag (read back by
    /// `sync_on_suspend_enabled`).
    pub fn set_sync_on_suspend(&self, enable: bool) {
        self.lock().sync_on_suspend_enabled = enable;
    }
549
    /// Returns the set of supported suspend states; currently only `Idle`.
    pub fn suspend_states(&self) -> HashSet<SuspendState> {
        HashSet::from([SuspendState::Idle])
    }
555
    /// Suspends the container to `suspend_state` via the Starnix runner.
    ///
    /// Fails with EINVAL when a kernel wakeup source is active, when an
    /// external wake source is already signaled, or when the runner reports
    /// an error; each failure path records stats and a Fail inspect event.
    pub fn suspend(
        &self,
        locked: &mut Locked<FileOpsCore>,
        suspend_state: SuspendState,
    ) -> Result<(), Errno> {
        let suspend_start_time = zx::BootInstant::get();
        let mut state = self.lock();
        state.add_suspend_event(SuspendEvent::Attempt {
            time: suspend_start_time,
            state: suspend_state.to_string(),
        });

        if !state.can_suspend() {
            self.report_failed_suspension(state, "kernel wake lock");
            return error!(EINVAL);
        }

        // Poll (INFINITE_PAST deadline = non-blocking) each external source;
        // an already-asserted wake signal aborts the attempt.
        let external_wake_source_abort = state.external_wake_sources.values().find_map(|source| {
            if source.handle.wait_one(source.signals, zx::MonotonicInstant::INFINITE_PAST).is_ok() {
                Some(source.name.clone())
            } else {
                None
            }
        });

        if let Some(name) = external_wake_source_abort {
            self.report_failed_suspension(state, &format!("external wake source: {}", name));
            return error!(EINVAL);
        }

        // Release the state lock before the blocking FIDL call below.
        std::mem::drop(state);

        // Blocks until in-flight eBPF work completes; held for the whole
        // suspension so eBPF and suspend are mutually exclusive.
        let _ebpf_lock = self.ebpf_suspend_lock.write(locked);

        let manager = connect_to_protocol_sync::<frunner::ManagerMarker>()
            .expect("Failed to connect to manager");
        fuchsia_trace::duration!("power", "suspend_container:fidl");

        let container_job = Some(
            fuchsia_runtime::job_default()
                .duplicate(zx::Rights::SAME_RIGHTS)
                .expect("Failed to dup handle"),
        );
        let wake_lock_event = Some(self.duplicate_lock_event());

        log_info!("Requesting container suspension.");
        // This call blocks until the container resumes (or fails to suspend).
        match manager.suspend_container(
            frunner::ManagerSuspendContainerRequest {
                container_job,
                wake_locks: wake_lock_event,
                ..Default::default()
            },
            zx::Instant::INFINITE,
        ) {
            Ok(Ok(res)) => {
                self.report_container_resumed(suspend_start_time, res);
            }
            // Covers both FIDL transport errors and runner-reported errors.
            e => {
                let state = self.lock();
                self.report_failed_suspension(state, &format!("runner error {:?}", e));
                return error!(EINVAL);
            }
        }
        Ok(())
    }
628
    /// Updates stats and the event log after a successful suspend/resume
    /// round trip reported by the runner.
    fn report_container_resumed(
        &self,
        suspend_start_time: zx::BootInstant,
        res: frunner::ManagerSuspendContainerResponse,
    ) {
        let wake_time = zx::BootInstant::get();
        // NOTE(review): the "0 " prefix on the resume reason appears to be a
        // format expected by readers of last_resume_reason — confirm before
        // changing.
        let resume_reason = res.resume_reason.clone().map(|s| format!("0 {}", s));
        log_info!("Resuming from container suspension: {:?}", resume_reason);
        let mut state = self.lock();
        state.update_suspend_stats(|suspend_stats| {
            suspend_stats.success_count += 1;
            // Total duration of the suspend operation on the boot timeline.
            suspend_stats.last_time_in_suspend_operations = (wake_time - suspend_start_time).into();
            // Time actually spent suspended, as reported by the runner.
            suspend_stats.last_time_in_sleep =
                zx::BootDuration::from_nanos(res.suspend_time.unwrap_or(0));
            suspend_stats.last_resume_reason = resume_reason.clone();
        });
        state.add_suspend_event(SuspendEvent::Resume {
            time: wake_time,
            reason: resume_reason.unwrap_or_default(),
        });
        fuchsia_trace::instant!("power", "suspend_container:done", fuchsia_trace::Scope::Process);
    }
653
    /// Records a failed suspend attempt: bumps failure stats, collects the
    /// active wakeup sources (kernel and external) for diagnostics, and adds
    /// a Fail event to the ring buffer. Consumes the state guard.
    fn report_failed_suspension(
        &self,
        mut state: MutexGuard<'_, SuspendResumeManagerInner>,
        failure_reason: &str,
    ) {
        let wake_time = zx::BootInstant::get();
        state.update_suspend_stats(|suspend_stats| {
            suspend_stats.fail_count += 1;
            suspend_stats.last_failed_errno = Some(errno!(EINVAL));
            suspend_stats.last_resume_reason = None;
        });

        // Every currently-active kernel source gets its wakeup_count bumped
        // and its name included in the abort report.
        let mut wakeup_sources: Vec<String> = state
            .wakeup_sources
            .iter_mut()
            .filter_map(|(origin, source)| {
                if source.active_since > zx::MonotonicInstant::ZERO {
                    source.wakeup_count += 1;
                    Some(origin.to_string())
                } else {
                    None
                }
            })
            .collect();

        // Poll (non-blocking) each external source for asserted wake signals.
        for source in state.external_wake_sources.values() {
            if source.handle.wait_one(source.signals, zx::MonotonicInstant::INFINITE_PAST).is_ok() {
                wakeup_sources.push(source.name.clone());
            }
        }

        let last_resume_reason = format!("Abort: {}", wakeup_sources.join(" "));
        state.update_suspend_stats(|suspend_stats| {
            suspend_stats.last_resume_reason = Some(last_resume_reason);
        });

        log_warn!(
            "Suspend failed due to {:?}. Here are the active wakeup sources: {:?}",
            failure_reason,
            wakeup_sources,
        );
        state.add_suspend_event(SuspendEvent::Fail {
            time: wake_time,
            wakeup_sources: Some(wakeup_sources),
        });
        fuchsia_trace::instant!("power", "suspend_container:error", fuchsia_trace::Scope::Process);
    }
704
    /// Takes a read lock that blocks suspension while eBPF work is in flight
    /// (`suspend` takes the corresponding write lock).
    pub fn acquire_ebpf_suspend_lock<'a, L>(
        &'a self,
        locked: &'a mut Locked<L>,
    ) -> EbpfSuspendGuard<'a>
    where
        L: LockBefore<EbpfSuspendLock>,
    {
        self.ebpf_suspend_lock.read(locked)
    }
714}
715
/// Callback invoked when the container is woken on behalf of a waiter.
pub trait OnWakeOps: Send + Sync {
    /// Called on wake; `baton_lease` keeps the system awake while handling.
    fn on_wake(&self, current_task: &CurrentTask, baton_lease: &zx::NullableHandle);
}
719
/// Creates a channel whose traffic is proxied by the Starnix runner so that
/// incoming messages can wake the container. Returns the local channel
/// endpoint and a counter (initially 0) tracking unhandled proxied messages.
pub fn create_proxy_for_wake_events_counter_zero(
    remote_channel: zx::Channel,
    name: String,
) -> (zx::Channel, zx::Counter) {
    let (local_proxy, kernel_channel) = zx::Channel::create();
    let counter = zx::Counter::create();

    // Keep a duplicate for the caller; the original is handed to the runner.
    let local_counter =
        counter.duplicate_handle(zx::Rights::SAME_RIGHTS).expect("Failed to duplicate counter");

    let manager = fuchsia_component::client::connect_to_protocol_sync::<frunner::ManagerMarker>()
        .expect("failed");
    manager
        .proxy_wake_channel(frunner::ManagerProxyWakeChannelRequest {
            container_job: Some(
                fuchsia_runtime::job_default()
                    .duplicate(zx::Rights::SAME_RIGHTS)
                    .expect("Failed to dup handle"),
            ),
            container_channel: Some(kernel_channel),
            remote_channel: Some(remote_channel),
            counter: Some(counter),
            name: Some(name),
            ..Default::default()
        })
        .expect("Failed to create proxy");

    (local_proxy, local_counter)
}
766
/// Like `create_proxy_for_wake_events_counter_zero`, but pre-increments the
/// counter to 1 so one message is considered pending until the caller marks
/// it handled.
pub fn create_proxy_for_wake_events_counter(
    remote_channel: zx::Channel,
    name: String,
) -> (zx::Channel, zx::Counter) {
    let (proxy, counter) = create_proxy_for_wake_events_counter_zero(remote_channel, name);

    counter.add(1).expect("Failed to add to counter");

    (proxy, counter)
}
797
/// Decrements `counter`, recording that one proxied message has been handled.
pub fn mark_proxy_message_handled(counter: &zx::Counter) {
    counter.add(-1).expect("Failed to decrement counter");
}
805
806pub fn mark_all_proxy_messages_handled(counter: &zx::Counter) {
808 counter.write(0).expect("Failed to decrement counter");
809}
810
/// Registers `watcher` with the Starnix runner so it is signaled on container
/// suspend/resume transitions.
pub fn create_watcher_for_wake_events(watcher: zx::EventPair) {
    let manager = fuchsia_component::client::connect_to_protocol_sync::<frunner::ManagerMarker>()
        .expect("failed");
    manager
        .register_wake_watcher(
            frunner::ManagerRegisterWakeWatcherRequest {
                watcher: Some(watcher),
                ..Default::default()
            },
            zx::Instant::INFINITE,
        )
        .expect("Failed to register wake watcher");
}
827
/// A non-owning share of an `OwnedMessageCounter`. Dropping a share marks one
/// message handled, provided the owner is still alive.
#[derive(Debug)]
pub struct SharedMessageCounter(Weak<OwnedMessageCounter>);

impl Drop for SharedMessageCounter {
    fn drop(&mut self) {
        // If the owner was already dropped, its Drop zeroed the counter and
        // there is nothing left to decrement.
        if let Some(message_counter) = self.0.upgrade() {
            message_counter.mark_handled();
        }
    }
}
846
/// Owner of a named message counter. When `counter` is `None`, all counter
/// operations are no-ops.
pub struct OwnedMessageCounter {
    /// Name used in the inspect `Display` rendering.
    name: String,
    /// The underlying kernel counter, if any.
    counter: Option<zx::Counter>,
}
/// Shared handle to an `OwnedMessageCounter`.
pub type OwnedMessageCounterHandle = Arc<OwnedMessageCounter>;
856
857impl Drop for OwnedMessageCounter {
858 fn drop(&mut self) {
863 self.counter.as_ref().map(mark_all_proxy_messages_handled);
864 }
865}
866
867impl OwnedMessageCounter {
868 pub fn new(name: &str, counter: Option<zx::Counter>) -> OwnedMessageCounterHandle {
869 Arc::new(Self { name: name.to_string(), counter })
870 }
871
872 pub fn mark_handled(&self) {
877 self.counter.as_ref().map(mark_proxy_message_handled);
878 }
879
880 pub fn mark_pending(&self) {
884 self.counter.as_ref().map(|c| c.add(1).expect("Failed to increment counter"));
885 }
886
887 pub fn share(
891 self: &OwnedMessageCounterHandle,
892 new_pending_message: bool,
893 ) -> SharedMessageCounter {
894 if new_pending_message {
895 self.mark_pending();
896 }
897 SharedMessageCounter(Arc::downgrade(self))
898 }
899}
900
impl fmt::Display for OwnedMessageCounter {
    /// Renders as `Counter(<name>): <read result>` for the inspect listing.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "Counter({}): {:?}", self.name, self.counter.as_ref().map(|c| c.read()))
    }
}
906
/// A FIDL proxy paired with a message counter: issuing a call marks the
/// triggering incoming message as handled (see `call`).
pub struct ContainerWakingProxy<P: Proxy> {
    /// Counter tracking unhandled messages for this proxy's channel.
    counter: OwnedMessageCounterHandle,
    /// The wrapped FIDL proxy.
    proxy: P,
}
915
impl<P: Proxy> ContainerWakingProxy<P> {
    /// Wraps `proxy`, tying its calls to `counter`.
    pub fn new(counter: OwnedMessageCounterHandle, proxy: P) -> Self {
        Self { counter, proxy }
    }

    /// Issues a call on the wrapped proxy via `future` and returns the
    /// resulting future. The counter is decremented as soon as the request
    /// future has been constructed — before it is awaited.
    pub fn call<T, F, R>(&self, future: F) -> R
    where
        F: FnOnce(&P) -> R,
        R: Future<Output = T>,
    {
        let f = future(&self.proxy);
        self.counter.mark_handled();
        f
    }
}
941
/// A stream paired with a message counter: each `next()` on a live stream
/// marks one message as handled (see `next`).
pub struct ContainerWakingStream<S: FusedStream + Unpin> {
    /// Counter tracking unhandled messages for this stream's channel.
    counter: OwnedMessageCounterHandle,
    /// The wrapped stream.
    stream: S,
}
950
impl<S: FusedStream + Unpin> ContainerWakingStream<S> {
    /// Wraps `stream`, tying its polls to `counter`.
    pub fn new(counter: OwnedMessageCounterHandle, stream: S) -> Self {
        Self { counter, stream }
    }

    /// Returns the next-item future, marking one message handled — unless the
    /// stream had already terminated, so repeated polls of a finished stream
    /// do not over-decrement the counter.
    pub fn next(&mut self) -> Next<'_, S> {
        // Capture termination state before `next()` borrows the stream.
        let is_terminated = self.stream.is_terminated();
        let next = self.stream.next();
        if !is_terminated {
            self.counter.mark_handled();
        }
        next
    }
}
969
970#[cfg(test)]
971mod test {
972 use super::*;
973 use diagnostics_assertions::assert_data_tree;
974 use fidl::endpoints::create_proxy_and_stream;
975 use fidl_test_placeholders::{EchoMarker, EchoRequest};
976 use futures::StreamExt;
977 use zx::{self, HandleBased};
978 use {fuchsia_async as fasync, fuchsia_inspect as inspect};
979
    // The zero-variant constructor must leave the counter at 0.
    #[::fuchsia::test]
    fn test_counter_zero_initialization() {
        let (_endpoint, endpoint) = zx::Channel::create();
        let (_channel, counter) =
            super::create_proxy_for_wake_events_counter_zero(endpoint, "test".into());
        assert_eq!(counter.read(), Ok(0));
    }

    // The non-zero variant pre-increments the counter to 1.
    #[::fuchsia::test]
    fn test_counter_initialization() {
        let (_endpoint, endpoint) = zx::Channel::create();
        let (_channel, counter) =
            super::create_proxy_for_wake_events_counter(endpoint, "test".into());
        assert_eq!(counter.read(), Ok(1));
    }
995
    // call() decrements the counter once at issue time, and dropping the
    // proxy (and so its OwnedMessageCounter) zeroes the counter.
    #[::fuchsia::test]
    async fn test_container_waking_proxy() {
        let (proxy, mut stream) = create_proxy_and_stream::<EchoMarker>();
        let server_task = fasync::Task::spawn(async move {
            let request = stream.next().await.unwrap().unwrap();
            match request {
                EchoRequest::EchoString { value, responder } => {
                    responder.send(value.as_deref()).unwrap();
                }
            }
        });

        let counter = zx::Counter::create();
        counter.add(5).unwrap();
        assert_eq!(counter.read(), Ok(5));

        let waking_proxy = ContainerWakingProxy {
            counter: OwnedMessageCounter::new(
                "test_proxy",
                Some(counter.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap()),
            ),
            proxy,
        };

        let response_future = waking_proxy.call(|p| p.echo_string(Some("hello")));

        // The decrement happens when the call is issued, not when awaited.
        assert_eq!(counter.read(), Ok(4));

        let response = response_future.await.unwrap();
        assert_eq!(response.as_deref(), Some("hello"));

        server_task.await;

        assert_eq!(counter.read(), Ok(4));
        // Dropping the proxy drops the owner, which resets the counter to 0.
        drop(waking_proxy);
        assert_eq!(counter.read(), Ok(0));
    }
1034
    // next() decrements the counter once per call on a live stream, and
    // dropping the stream wrapper zeroes the counter.
    #[::fuchsia::test]
    async fn test_container_waking_stream() {
        let (proxy, stream) = create_proxy_and_stream::<EchoMarker>();
        let client_task = fasync::Task::spawn(async move {
            let response = proxy.echo_string(Some("hello")).await.unwrap();
            assert_eq!(response.as_deref(), Some("hello"));
        });

        let counter = zx::Counter::create();
        counter.add(5).unwrap();
        assert_eq!(counter.read(), Ok(5));

        let mut waking_stream = ContainerWakingStream {
            counter: OwnedMessageCounter::new(
                "test_stream",
                Some(counter.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap()),
            ),
            stream,
        };

        let request_future = waking_stream.next();

        // The decrement happens when next() is called, not when awaited.
        assert_eq!(counter.read(), Ok(4));

        let request = request_future.await.unwrap().unwrap();
        match request {
            EchoRequest::EchoString { value, responder } => {
                assert_eq!(value.as_deref(), Some("hello"));
                responder.send(value.as_deref()).unwrap();
            }
        }

        client_task.await;

        assert_eq!(counter.read(), Ok(4));
        // Dropping the wrapper drops the owner, which resets the counter to 0.
        drop(waking_stream);
        assert_eq!(counter.read(), Ok(0));
    }
1074
    // The lazy inspect node lists live message counters with their current
    // values and prunes entries whose owners were dropped.
    #[::fuchsia::test]
    async fn test_message_counters_inspect() {
        let power_manager = SuspendResumeManager::default();
        let inspector = inspect::component::inspector();

        let zx_counter = zx::Counter::create();
        let counter_handle = power_manager.add_message_counter(
            "test_counter",
            Some(zx_counter.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap()),
        );

        zx_counter.add(1).unwrap();

        assert_data_tree!(inspector, root: contains {
            message_counters: vec!["Counter(test_counter): Some(Ok(1))"],
        });

        zx_counter.add(1).unwrap();
        assert_data_tree!(inspector, root: contains {
            message_counters: vec!["Counter(test_counter): Some(Ok(2))"],
        });

        // Dropping the handle removes the entry on the next lazy read.
        drop(counter_handle);
        assert_data_tree!(inspector, root: contains {
            message_counters: Vec::<String>::new(),
        });
    }
1102
    // Shares decrement on drop while the owner lives; the owner's drop zeroes
    // the counter; a share that outlives the owner becomes a no-op.
    #[::fuchsia::test]
    fn test_shared_message_counter() {
        let zx_counter = zx::Counter::create();
        let owned_counter = OwnedMessageCounter::new(
            "test_shared_counter",
            Some(zx_counter.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap()),
        );
        zx_counter.add(5).unwrap();
        assert_eq!(zx_counter.read(), Ok(5));

        // share(false): no new pending message is recorded.
        let shared_counter = owned_counter.share(false);
        assert_eq!(zx_counter.read(), Ok(5));

        drop(shared_counter);
        assert_eq!(zx_counter.read(), Ok(4));

        // share(true): records a pending message up front.
        let shared_counter_2 = owned_counter.share(true);
        assert_eq!(zx_counter.read(), Ok(5));

        drop(shared_counter_2);
        assert_eq!(zx_counter.read(), Ok(4));

        let shared_counter_3 = owned_counter.share(false);
        assert_eq!(zx_counter.read(), Ok(4));

        // Dropping the owner resets the counter to zero.
        drop(owned_counter);
        assert_eq!(zx_counter.read(), Ok(0));

        // A surviving share cannot decrement once the owner is gone.
        drop(shared_counter_3);
        assert_eq!(zx_counter.read(), Ok(0));
    }
1142
    // Once the wrapped stream reports termination, further next() calls must
    // not decrement the counter.
    #[::fuchsia::test]
    async fn test_container_waking_event_termination() {
        let stream = futures::stream::iter(vec![0]).fuse();
        let counter = zx::Counter::create();
        counter.add(2).unwrap();
        assert_eq!(counter.read(), Ok(2));
        let mut waking_stream = ContainerWakingStream {
            counter: OwnedMessageCounter::new(
                "test_stream",
                Some(counter.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap()),
            ),
            stream,
        };

        assert_eq!(waking_stream.next().await, Some(0));
        assert_eq!(counter.read(), Ok(1));

        // The first None still decrements (termination is only observed after
        // the stream reports it); the call after that does not.
        assert_eq!(waking_stream.next().await, None);
        assert_eq!(waking_stream.next().await, None);
        assert_eq!(counter.read(), Ok(0));
    }
1165
    // Registering an external wake source records it locally (keyed by koid);
    // signaling it would then abort suspend attempts.
    #[::fuchsia::test]
    fn test_external_wake_source_aborts_suspend() {
        let manager = SuspendResumeManager::default();
        let event = zx::Event::create();
        let signals = zx::Signals::USER_0;

        let res = manager.add_external_wake_source(
            event.duplicate(zx::Rights::SAME_RIGHTS).unwrap().into_handle(),
            signals,
            "test_external".to_string(),
        );

        // Registration requires the runner protocol; skip when unavailable.
        if res.is_err() {
            println!(
                "Skipping test_external_wake_source_aborts_suspend because runner connection failed: {:?}",
                res
            );
            return;
        }

        event.signal(zx::Signals::empty(), signals).unwrap();

        let state = manager.lock();
        assert!(state.external_wake_sources.contains_key(&event.koid().unwrap()));
    }
1196}