1use crate::power::{SuspendState, SuspendStats};
6use crate::task::CurrentTask;
7
8use std::collections::{HashMap, HashSet};
9use std::future::Future;
10use std::sync::{Arc, Weak};
11
12use anyhow::{Context, anyhow};
13use fidl::endpoints::Proxy;
14use fidl_fuchsia_power_observability as fobs;
15use fidl_fuchsia_session_power as fpower;
16use fidl_fuchsia_starnix_runner as frunner;
17use fuchsia_component::client::connect_to_protocol_sync;
18use fuchsia_inspect as inspect;
19use fuchsia_inspect::ArrayProperty;
20use futures::stream::{FusedStream, Next};
21use futures::{FutureExt, StreamExt};
22use starnix_logging::{log_info, log_warn};
23use starnix_sync::{
24 EbpfSuspendLock, FileOpsCore, LockBefore, LockDepReadGuard, Locked, Mutex, MutexGuard,
25 OrderedRwLock,
26};
27use starnix_uapi::arc_key::WeakKey;
28use starnix_uapi::errors::Errno;
29use starnix_uapi::{errno, error};
30use std::collections::VecDeque;
31use std::fmt;
32use zx::Peered;
33
/// Per-source bookkeeping for one wakeup source tracked in
/// `SuspendResumeManagerInner::wakeup_sources`.
#[derive(Debug, Default)]
pub struct WakeupSource {
    /// Incremented on every `activate_wakeup_source` call for this source.
    active_count: u64,

    /// Incremented alongside `active_count` on every activation.
    event_count: u64,

    /// Incremented each time a suspend attempt fails while this source is active.
    wakeup_count: u64,

    /// Incremented when the source is deactivated through `timeout_wakeup_source`.
    expire_count: u64,

    /// Monotonic time at which the source last became active, or
    /// `zx::MonotonicInstant::ZERO` while it is inactive.
    active_since: zx::MonotonicInstant,

    /// Sum of the durations of all completed active periods.
    total_time: zx::MonotonicDuration,

    /// Longest single completed active period.
    max_time: zx::MonotonicDuration,

    /// Monotonic time of the most recent activation or deactivation.
    last_change: zx::MonotonicInstant,
}
67
68impl WakeupSource {
69 pub fn active_duration(&self) -> zx::MonotonicDuration {
73 if self.active_since == zx::MonotonicInstant::ZERO {
74 zx::MonotonicDuration::default()
75 } else {
76 let now = zx::MonotonicInstant::get();
77 now - self.active_since
78 }
79 }
80}
81
/// Identifies which kind of client created a wakeup source, together with the
/// source's name.
///
/// The `Display` output is the human-readable label used in logs and inspect:
/// wake locks render as the bare name, the other origins get a bracketed prefix.
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub enum WakeupSourceOrigin {
    /// Rendered as the bare name.
    WakeLock(String),
    /// Rendered as `[epoll] <name>`.
    Epoll(String),
    /// Rendered as `[HAL] <name>`.
    HAL(String),
}

// Implement `Display` rather than `ToString` directly: `to_string()` keeps working
// through the standard blanket impl, and the type becomes usable with `{}` formatting.
impl fmt::Display for WakeupSourceOrigin {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            WakeupSourceOrigin::WakeLock(lock) => f.write_str(lock),
            WakeupSourceOrigin::Epoll(lock) => write!(f, "[epoll] {}", lock),
            WakeupSourceOrigin::HAL(lock) => write!(f, "[HAL] {}", lock),
        }
    }
}
98
/// Tracks wakeup sources and message counters and drives container suspension.
pub struct SuspendResumeManager {
    /// Mutable manager state. Shared (via `Arc`) with the lazy inspect node
    /// registered in `Default::default`.
    inner: Arc<Mutex<SuspendResumeManagerInner>>,

    /// Weak references to the counters handed out by `add_message_counter`.
    /// Also shared with a lazy inspect callback.
    message_counters: Arc<Mutex<HashSet<WeakKey<OwnedMessageCounter>>>>,

    /// Taken for writing by `suspend` and for reading by
    /// `acquire_ebpf_suspend_lock`.
    ebpf_suspend_lock: OrderedRwLock<(), EbpfSuspendLock>,
}
111
/// State of the `SuspendResumeManager` that is protected by its mutex.
pub struct SuspendResumeManagerInner {
    /// Success/failure statistics updated after each suspend attempt.
    suspend_stats: SuspendStats,
    /// Flag read and written by `sync_on_suspend_enabled` / `set_sync_on_suspend`.
    sync_on_suspend_enabled: bool,

    /// Bounded history of suspend attempts, resumes and failures, exported to inspect.
    suspend_events: VecDeque<SuspendEvent>,

    /// All wakeup sources seen so far, active or not, keyed by origin.
    wakeup_sources: HashMap<WakeupSourceOrigin, WakeupSource>,

    /// Reader end of the event pair; duplicates are handed out by
    /// `duplicate_lock_event`.
    active_lock_reader: zx::EventPair,

    /// Writer end of the event pair. `signal_wake_events` uses it to set
    /// `USER_0` on the peer when no wakeup source is active and
    /// `EVENT_SIGNALED` when at least one is.
    active_lock_writer: zx::EventPair,

    /// Number of wakeup sources that are currently active.
    active_wakeup_source_count: u64,

    /// Incremented every time an active wakeup source is deactivated.
    total_wakeup_source_event_count: u64,

    /// External wake sources registered with the runner, keyed by the koid of
    /// their handle.
    external_wake_sources: HashMap<zx::Koid, ExternalWakeSource>,
}
143
/// A wake source backed by a kernel object rather than by an in-process wake lock.
#[derive(Debug)]
struct ExternalWakeSource {
    /// Handle to the object whose signals are observed.
    handle: zx::NullableHandle,
    /// Signals on `handle` that are checked before suspending.
    signals: zx::Signals,
    /// Name used when reporting this source as the reason for a failed suspend.
    name: String,
}
153
154impl SuspendResumeManager {
155 pub fn add_external_wake_source(
156 &self,
157 handle: zx::NullableHandle,
158 signals: zx::Signals,
159 name: String,
160 ) -> Result<(), Errno> {
161 let manager = connect_to_protocol_sync::<frunner::ManagerMarker>()
162 .map_err(|e| errno!(EINVAL, format!("Failed to connect to manager: {e:?}")))?;
163 manager
164 .add_wake_source(frunner::ManagerAddWakeSourceRequest {
165 container_job: Some(
166 fuchsia_runtime::job_default()
167 .duplicate_handle(zx::Rights::SAME_RIGHTS)
168 .expect("Failed to dup handle"),
169 ),
170 name: Some(name.clone()),
171 handle: Some(
172 handle.duplicate_handle(zx::Rights::SAME_RIGHTS).map_err(|e| errno!(EIO, e))?,
173 ),
174 signals: Some(signals.bits()),
175 ..Default::default()
176 })
177 .map_err(|e| errno!(EIO, e))?;
178
179 let koid = handle.koid().map_err(|e| errno!(EINVAL, e))?;
180 self.lock().external_wake_sources.insert(
181 koid,
182 ExternalWakeSource {
183 handle: handle
184 .duplicate_handle(zx::Rights::SAME_RIGHTS)
185 .map_err(|e| errno!(EIO, e))?,
186 signals,
187 name,
188 },
189 );
190 Ok(())
191 }
192
193 pub fn remove_external_wake_source(&self, handle: zx::NullableHandle) -> Result<(), Errno> {
194 let manager = connect_to_protocol_sync::<frunner::ManagerMarker>()
195 .map_err(|e| errno!(EINVAL, format!("Failed to connect to manager: {e:?}")))?;
196
197 let koid = handle.koid().map_err(|e| errno!(EINVAL, e))?;
198 self.lock().external_wake_sources.remove(&koid);
199
200 manager
201 .remove_wake_source(frunner::ManagerRemoveWakeSourceRequest {
202 container_job: Some(
203 fuchsia_runtime::job_default()
204 .duplicate_handle(zx::Rights::SAME_RIGHTS)
205 .expect("Failed to dup handle"),
206 ),
207 handle: Some(handle),
208 ..Default::default()
209 })
210 .map_err(|e| errno!(EIO, e))?;
211
212 Ok(())
213 }
214}
215
/// Read guard on the eBPF suspend lock, as returned by
/// `SuspendResumeManager::acquire_ebpf_suspend_lock`.
pub type EbpfSuspendGuard<'a> = LockDepReadGuard<'a, ()>;
217
/// One entry in the suspend history that is exported to inspect.
#[derive(Clone, Debug)]
pub enum SuspendEvent {
    /// A suspend was requested at `time` for the given target `state`.
    Attempt { time: zx::BootInstant, state: String },
    /// The container resumed at `time`; `reason` is the reported resume reason
    /// (empty when none was reported).
    Resume { time: zx::BootInstant, reason: String },
    /// A suspend attempt failed at `time`; `wakeup_sources` lists the sources
    /// that were active when it failed.
    Fail { time: zx::BootInstant, wakeup_sources: Option<Vec<String>> },
}
224
/// Maximum number of `SuspendEvent`s retained; the oldest entry is dropped once
/// this many are stored.
const INSPECT_RING_BUFFER_CAPACITY: usize = 128;
227
228impl Default for SuspendResumeManagerInner {
229 fn default() -> Self {
230 let (active_lock_reader, active_lock_writer) = zx::EventPair::create();
231 active_lock_writer
232 .signal_peer(zx::Signals::empty(), zx::Signals::USER_0)
233 .expect("Failed to signal peer");
234 Self {
235 suspend_stats: Default::default(),
236 sync_on_suspend_enabled: false,
237 suspend_events: VecDeque::with_capacity(INSPECT_RING_BUFFER_CAPACITY),
238 wakeup_sources: Default::default(),
239 active_lock_reader,
240 active_lock_writer,
241 active_wakeup_source_count: 0,
242 total_wakeup_source_event_count: 0,
243 external_wake_sources: Default::default(),
244 }
245 }
246}
247
248impl SuspendResumeManagerInner {
249 pub fn can_suspend(&self) -> bool {
251 self.active_wakeup_source_count == 0
252 }
253
254 pub fn active_wake_locks(&self) -> Vec<WakeupSourceOrigin> {
255 self.wakeup_sources
256 .iter()
257 .filter_map(|(name, source)| match name {
258 WakeupSourceOrigin::WakeLock(_) => {
259 if source.active_since > zx::MonotonicInstant::ZERO {
260 Some(name.clone())
261 } else {
262 None
263 }
264 }
265 _ => None,
266 })
267 .collect()
268 }
269
270 pub fn inactive_wake_locks(&self) -> Vec<WakeupSourceOrigin> {
271 self.wakeup_sources
272 .iter()
273 .filter_map(|(name, source)| match name {
274 WakeupSourceOrigin::WakeLock(_) => {
275 if source.active_since == zx::MonotonicInstant::ZERO {
276 Some(name.clone())
277 } else {
278 None
279 }
280 }
281 _ => None,
282 })
283 .collect()
284 }
285
286 fn signal_wake_events(&mut self) {
288 let (clear_mask, set_mask) = if self.active_wakeup_source_count == 0 {
289 (zx::Signals::EVENT_SIGNALED, zx::Signals::USER_0)
290 } else {
291 (zx::Signals::USER_0, zx::Signals::EVENT_SIGNALED)
292 };
293 self.active_lock_writer.signal_peer(clear_mask, set_mask).expect("Failed to signal peer");
294 }
295
296 fn update_suspend_stats<UpdateFn>(&mut self, update: UpdateFn)
297 where
298 UpdateFn: FnOnce(&mut SuspendStats),
299 {
300 update(&mut self.suspend_stats);
301 }
302
303 fn add_suspend_event(&mut self, event: SuspendEvent) {
304 if self.suspend_events.len() >= INSPECT_RING_BUFFER_CAPACITY {
305 self.suspend_events.pop_front();
306 }
307 self.suspend_events.push_back(event);
308 }
309
310 fn record_suspend_events(&self, node: &inspect::Node) {
311 let events_node = node.create_child("suspend_events");
312 for (i, event) in self.suspend_events.iter().enumerate() {
313 let child = events_node.create_child(i.to_string());
314 match event {
315 SuspendEvent::Attempt { time, state } => {
316 child.record_int(fobs::SUSPEND_ATTEMPTED_AT, time.into_nanos());
317 child.record_string(fobs::SUSPEND_REQUESTED_STATE, state);
318 }
319 SuspendEvent::Resume { time, reason } => {
320 child.record_int(fobs::SUSPEND_RESUMED_AT, time.into_nanos());
321 child.record_string(fobs::SUSPEND_RESUME_REASON, reason);
322 }
323 SuspendEvent::Fail { time, wakeup_sources } => {
324 child.record_int(fobs::SUSPEND_FAILED_AT, time.into_nanos());
325 if let Some(names) = wakeup_sources {
326 let names_array =
327 child.create_string_array(fobs::WAKEUP_SOURCES_NAME, names.len());
328 for (i, name) in names.iter().enumerate() {
329 names_array.set(i, name);
330 }
331 child.record(names_array);
332 }
333 }
334 }
335 events_node.record(child);
336 }
337 node.record(events_node);
338 }
339
340 fn record_wakeup_sources(&self, node: &inspect::Node) {
341 let wakeup_node = node.create_child("wakeup_sources");
342 for (name, source) in self.wakeup_sources.iter() {
343 let child = wakeup_node.create_child(name.to_string());
344 child.record_uint("active_count", source.active_count);
345 child.record_uint("event_count", source.event_count);
346 child.record_uint("wakeup_count", source.wakeup_count);
347 child.record_uint("expire_count", source.expire_count);
348 child.record_int("active_since (ns)", source.active_since.into_nanos());
349 child.record_int("active_duration_mono (ns)", source.active_duration().into_nanos());
352 child.record_int("total_time (ms)", source.total_time.into_millis());
353 child.record_int("max_time (ms)", source.max_time.into_millis());
354 child.record_int("last_change (ns)", source.last_change.into_nanos());
355 wakeup_node.record(child);
356 }
357 node.record(wakeup_node);
358 }
359}
360
/// Shared, reference-counted handle to a `SuspendResumeManager`.
pub type SuspendResumeManagerHandle = Arc<SuspendResumeManager>;
362
363impl Default for SuspendResumeManager {
364 fn default() -> Self {
365 let message_counters: Arc<Mutex<HashSet<WeakKey<OwnedMessageCounter>>>> =
366 Default::default();
367 let message_counters_clone = message_counters.clone();
368 let root = inspect::component::inspector().root();
369 root.record_lazy_values("message_counters", move || {
370 let message_counters_clone = message_counters_clone.clone();
371 async move {
372 let inspector = fuchsia_inspect::Inspector::default();
373 let root = inspector.root();
374 let message_counters = message_counters_clone.lock();
375 let active_counter_names: Vec<String> = message_counters
376 .iter()
377 .filter_map(|c| c.0.upgrade())
378 .map(|c| c.to_string())
379 .collect();
380 let message_counters_inspect =
381 root.create_string_array("message_counters", active_counter_names.len());
382 for (i, name) in active_counter_names.iter().enumerate() {
383 message_counters_inspect.set(i, name);
384 }
385 root.record(message_counters_inspect);
386 Ok(inspector)
387 }
388 .boxed()
389 });
390 let inner = Arc::new(Mutex::new(SuspendResumeManagerInner::default()));
391 let inner_clone = inner.clone();
392 root.record_lazy_child("wakeup_sources", move || {
393 let inner = inner_clone.clone();
394 async move {
395 let inspector = fuchsia_inspect::Inspector::default();
396 let root = inspector.root();
397 let state = inner.lock();
398
399 state.record_suspend_events(root);
400 state.record_wakeup_sources(root);
401
402 Ok(inspector)
403 }
404 .boxed()
405 });
406 Self { message_counters, inner, ebpf_suspend_lock: Default::default() }
407 }
408}
409
410impl SuspendResumeManager {
411 pub fn lock(&self) -> MutexGuard<'_, SuspendResumeManagerInner> {
413 self.inner.lock()
414 }
415
416 pub fn init(
418 self: &SuspendResumeManagerHandle,
419 system_task: &CurrentTask,
420 ) -> Result<(), anyhow::Error> {
421 let handoff = system_task
422 .kernel()
423 .connect_to_protocol_at_container_svc::<fpower::HandoffMarker>()?
424 .into_sync_proxy();
425 match handoff.take(zx::MonotonicInstant::INFINITE) {
426 Ok(parent_lease) => {
427 let parent_lease = parent_lease
428 .map_err(|e| anyhow!("Failed to take lessor and lease from parent: {e:?}"))?;
429 drop(parent_lease)
430 }
431 Err(e) => {
432 if e.is_closed() {
433 log_warn!(
434 "Failed to send the fuchsia.session.power/Handoff.Take request. Assuming no Handoff protocol exists and moving on..."
435 );
436 } else {
437 return Err(e).context("Handoff::Take");
438 }
439 }
440 }
441 Ok(())
442 }
443
444 pub fn activate_wakeup_source(&self, origin: WakeupSourceOrigin) -> bool {
445 let mut state = self.lock();
446 let did_activate = {
447 let entry = state.wakeup_sources.entry(origin).or_default();
448 let now = zx::MonotonicInstant::get();
449 entry.active_count += 1;
450 entry.event_count += 1;
451 entry.last_change = now;
452 if entry.active_since == zx::MonotonicInstant::ZERO {
453 entry.active_since = now;
454 true
455 } else {
456 false
457 }
458 };
459 if did_activate {
460 state.active_wakeup_source_count += 1;
461 state.signal_wake_events();
462 }
463 did_activate
464 }
465
466 pub fn deactivate_wakeup_source(&self, origin: &WakeupSourceOrigin) -> bool {
467 self.remove_wakeup_source(origin, false)
468 }
469
470 pub fn timeout_wakeup_source(&self, origin: &WakeupSourceOrigin) -> bool {
471 self.remove_wakeup_source(origin, true)
472 }
473
474 fn remove_wakeup_source(&self, origin: &WakeupSourceOrigin, timed_out: bool) -> bool {
475 let mut state = self.lock();
476 let removed = match state.wakeup_sources.get_mut(origin) {
477 Some(entry) if entry.active_since != zx::MonotonicInstant::ZERO => {
478 if timed_out {
479 entry.expire_count += 1;
480 }
481
482 let now = zx::MonotonicInstant::get();
483 let duration = now - entry.active_since;
484 entry.total_time += duration;
485 entry.max_time = std::cmp::max(duration, entry.max_time);
486 entry.last_change = now;
487 entry.active_since = zx::MonotonicInstant::ZERO;
488
489 true
490 }
491 _ => false,
492 };
493 if removed {
494 state.active_wakeup_source_count -= 1;
495 state.total_wakeup_source_event_count += 1;
496 state.signal_wake_events();
497 }
498 removed
499 }
500
501 pub fn add_message_counter(
502 &self,
503 name: &str,
504 counter: Option<zx::Counter>,
505 ) -> OwnedMessageCounterHandle {
506 let container_counter = OwnedMessageCounter::new(name, counter);
507 let mut message_counters = self.message_counters.lock();
508 message_counters.insert(WeakKey::from(&container_counter));
509 message_counters.retain(|c| c.0.upgrade().is_some());
510 container_counter
511 }
512
513 pub fn has_nonzero_message_counter(&self) -> bool {
514 self.message_counters.lock().iter().any(|c| {
515 let Some(c) = c.0.upgrade() else {
516 return false;
517 };
518 c.counter.as_ref().and_then(|counter| counter.read().ok()).map_or(false, |v| v != 0)
519 })
520 }
521
522 pub fn duplicate_lock_event(&self) -> zx::EventPair {
525 let state = self.lock();
526 state
527 .active_lock_reader
528 .duplicate_handle(zx::Rights::SAME_RIGHTS)
529 .expect("Failed to duplicate handle")
530 }
531
532 pub fn suspend_stats(&self) -> SuspendStats {
534 self.lock().suspend_stats.clone()
535 }
536
537 pub fn total_wakeup_events(&self) -> u64 {
538 let state = self.lock();
539 state.total_wakeup_source_event_count + state.suspend_stats.success_count
540 }
541
542 pub fn sync_on_suspend_enabled(&self) -> bool {
546 self.lock().sync_on_suspend_enabled.clone()
547 }
548
549 pub fn set_sync_on_suspend(&self, enable: bool) {
552 self.lock().sync_on_suspend_enabled = enable;
553 }
554
555 pub fn suspend_states(&self) -> HashSet<SuspendState> {
557 HashSet::from([SuspendState::Idle])
559 }
560
/// Attempts to suspend the container, blocking until it resumes or the attempt
/// fails.
///
/// The attempt is recorded in the suspend history first. It is then aborted
/// with `EINVAL` if any wakeup source is active or any external wake source
/// already has its signals asserted. Otherwise the runner is asked to suspend
/// the container, and the outcome is recorded in the suspend statistics.
///
/// # Errors
///
/// Returns `EINVAL` for every failure: active wake lock, signalled external
/// wake source, or runner error.
///
/// # Panics
///
/// Panics if the runner's `Manager` protocol cannot be connected to or if the
/// default job handle cannot be duplicated.
pub fn suspend(
    &self,
    locked: &mut Locked<FileOpsCore>,
    suspend_state: SuspendState,
) -> Result<(), Errno> {
    let suspend_start_time = zx::BootInstant::get();
    let mut state = self.lock();
    state.add_suspend_event(SuspendEvent::Attempt {
        time: suspend_start_time,
        state: suspend_state.to_string(),
    });

    // Refuse to suspend while any in-process wakeup source is active.
    if !state.can_suspend() {
        self.report_failed_suspension(state, "kernel wake lock");
        return error!(EINVAL);
    }

    // Poll each external wake source without blocking (deadline in the past); the
    // first one whose signals are already asserted aborts the suspend.
    let external_wake_source_abort = state.external_wake_sources.values().find_map(|source| {
        if source.handle.wait_one(source.signals, zx::MonotonicInstant::INFINITE_PAST).is_ok() {
            Some(source.name.clone())
        } else {
            None
        }
    });

    if let Some(name) = external_wake_source_abort {
        self.report_failed_suspension(state, &format!("external wake source: {}", name));
        return error!(EINVAL);
    }

    // Release the state lock before blocking: `duplicate_lock_event` and the
    // `report_*` paths below lock it again.
    std::mem::drop(state);

    // Held for writing until this function returns.
    let _ebpf_lock = self.ebpf_suspend_lock.write(locked);

    let manager = connect_to_protocol_sync::<frunner::ManagerMarker>()
        .expect("Failed to connect to manager");
    fuchsia_trace::duration!("power", "suspend_container:fidl");

    let container_job = Some(
        fuchsia_runtime::job_default()
            .duplicate_handle(zx::Rights::SAME_RIGHTS)
            .expect("Failed to dup handle"),
    );
    // The runner receives the reader end of the wake event pair.
    let wake_lock_event = Some(self.duplicate_lock_event());

    log_info!("Requesting container suspension.");
    match manager.suspend_container(
        frunner::ManagerSuspendContainerRequest {
            container_job,
            wake_locks: wake_lock_event,
            ..Default::default()
        },
        zx::Instant::INFINITE,
    ) {
        Ok(Ok(res)) => {
            self.report_container_resumed(suspend_start_time, res);
        }
        // Covers both transport errors and errors returned by the runner.
        e => {
            let state = self.lock();
            self.report_failed_suspension(state, &format!("runner error {:?}", e));
            return error!(EINVAL);
        }
    }
    Ok(())
}
633
634 fn report_container_resumed(
635 &self,
636 suspend_start_time: zx::BootInstant,
637 res: frunner::ManagerSuspendContainerResponse,
638 ) {
639 let wake_time = zx::BootInstant::get();
640 let resume_reason = res.resume_reason.clone().map(|s| format!("0 {}", s));
643 log_info!("Resuming from container suspension: {:?}", resume_reason);
644 let mut state = self.lock();
645 state.update_suspend_stats(|suspend_stats| {
646 suspend_stats.success_count += 1;
647 suspend_stats.last_time_in_suspend_operations = (wake_time - suspend_start_time).into();
648 suspend_stats.last_time_in_sleep =
649 zx::BootDuration::from_nanos(res.suspend_time.unwrap_or(0));
650 suspend_stats.last_resume_reason = resume_reason.clone();
651 });
652 state.add_suspend_event(SuspendEvent::Resume {
653 time: wake_time,
654 reason: resume_reason.unwrap_or_default(),
655 });
656 fuchsia_trace::instant!("power", "suspend_container:done", fuchsia_trace::Scope::Process);
657 }
658
659 fn report_failed_suspension(
660 &self,
661 mut state: MutexGuard<'_, SuspendResumeManagerInner>,
662 failure_reason: &str,
663 ) {
664 let wake_time = zx::BootInstant::get();
665 state.update_suspend_stats(|suspend_stats| {
666 suspend_stats.fail_count += 1;
667 suspend_stats.last_failed_errno = Some(errno!(EINVAL));
668 suspend_stats.last_resume_reason = None;
669 });
670
671 let mut wakeup_sources: Vec<String> = state
672 .wakeup_sources
673 .iter_mut()
674 .filter_map(|(origin, source)| {
675 if source.active_since > zx::MonotonicInstant::ZERO {
676 source.wakeup_count += 1;
677 Some(origin.to_string())
678 } else {
679 None
680 }
681 })
682 .collect();
683
684 for source in state.external_wake_sources.values() {
685 if source.handle.wait_one(source.signals, zx::MonotonicInstant::INFINITE_PAST).is_ok() {
686 wakeup_sources.push(source.name.clone());
687 }
688 }
689
690 let last_resume_reason = format!("Abort: {}", wakeup_sources.join(" "));
691 state.update_suspend_stats(|suspend_stats| {
692 suspend_stats.last_resume_reason = Some(last_resume_reason);
694 });
695
696 log_warn!(
698 "Suspend failed due to {:?}. Here are the active wakeup sources: {:?}",
699 failure_reason,
700 wakeup_sources,
701 );
702 state.add_suspend_event(SuspendEvent::Fail {
704 time: wake_time,
705 wakeup_sources: Some(wakeup_sources),
706 });
707 fuchsia_trace::instant!("power", "suspend_container:error", fuchsia_trace::Scope::Process);
708 }
709
710 pub fn acquire_ebpf_suspend_lock<'a, L>(
711 &'a self,
712 locked: &'a mut Locked<L>,
713 ) -> EbpfSuspendGuard<'a>
714 where
715 L: LockBefore<EbpfSuspendLock>,
716 {
717 self.ebpf_suspend_lock.read(locked)
718 }
719}
720
/// Callback interface for objects that want to be notified of a wake.
///
/// NOTE(review): call sites are not visible in this file; presumably the
/// implementor is invoked when a wake event is delivered — confirm against
/// callers.
pub trait OnWakeOps: Send + Sync {
    /// Called with the task handling the wake and a handle named `baton_lease`.
    /// NOTE(review): the lifetime/ownership expectations of `baton_lease` are
    /// not established here — confirm before relying on them.
    fn on_wake(&self, current_task: &CurrentTask, baton_lease: &zx::NullableHandle);
}
733
734pub fn create_proxy_for_wake_events_counter_zero(
752 remote_channel: zx::Channel,
753 name: String,
754) -> (zx::Channel, zx::Counter) {
755 let (local_proxy, kernel_channel) = zx::Channel::create();
756 let counter = zx::Counter::create();
757
758 let local_counter =
759 counter.duplicate_handle(zx::Rights::SAME_RIGHTS).expect("Failed to duplicate counter");
760
761 let manager = fuchsia_component::client::connect_to_protocol_sync::<frunner::ManagerMarker>()
762 .expect("failed");
763 manager
764 .proxy_wake_channel(frunner::ManagerProxyWakeChannelRequest {
765 container_job: Some(
766 fuchsia_runtime::job_default()
767 .duplicate_handle(zx::Rights::SAME_RIGHTS)
768 .expect("Failed to dup handle"),
769 ),
770 container_channel: Some(kernel_channel),
771 remote_channel: Some(remote_channel),
772 counter: Some(counter),
773 name: Some(name),
774 ..Default::default()
775 })
776 .expect("Failed to create proxy");
777
778 (local_proxy, local_counter)
779}
780
781pub fn create_proxy_for_wake_events_counter(
800 remote_channel: zx::Channel,
801 name: String,
802) -> (zx::Channel, zx::Counter) {
803 let (proxy, counter) = create_proxy_for_wake_events_counter_zero(remote_channel, name);
804
805 counter.add(1).expect("Failed to add to counter");
808
809 (proxy, counter)
810}
811
812pub fn mark_proxy_message_handled(counter: &zx::Counter) {
817 counter.add(-1).expect("Failed to decrement counter");
818}
819
820pub fn mark_all_proxy_messages_handled(counter: &zx::Counter) {
822 counter.write(0).expect("Failed to decrement counter");
823}
824
825pub fn create_watcher_for_wake_events(watcher: zx::EventPair) {
829 let manager = fuchsia_component::client::connect_to_protocol_sync::<frunner::ManagerMarker>()
830 .expect("failed");
831 manager
832 .register_wake_watcher(
833 frunner::ManagerRegisterWakeWatcherRequest {
834 watcher: Some(watcher),
835 ..Default::default()
836 },
837 zx::Instant::INFINITE,
838 )
839 .expect("Failed to register wake watcher");
840}
841
/// Weak reference to an `OwnedMessageCounter`, created by
/// `OwnedMessageCounter::share`.
///
/// Dropping it marks one message as handled on the owning counter, provided the
/// owner is still alive.
#[derive(Debug)]
pub struct SharedMessageCounter(Weak<OwnedMessageCounter>);
852
853impl Drop for SharedMessageCounter {
854 fn drop(&mut self) {
855 if let Some(message_counter) = self.0.upgrade() {
856 message_counter.mark_handled();
857 }
858 }
859}
860
/// Owner of an optional kernel counter that tracks unhandled messages.
///
/// When the owner is dropped, the wrapped counter (if any) is reset to zero.
pub struct OwnedMessageCounter {
    /// Label used in the `Display` output.
    name: String,
    /// The wrapped counter; every operation is a no-op when this is `None`.
    counter: Option<zx::Counter>,
}
/// Shared, reference-counted handle to an `OwnedMessageCounter`.
pub type OwnedMessageCounterHandle = Arc<OwnedMessageCounter>;
870
871impl Drop for OwnedMessageCounter {
872 fn drop(&mut self) {
877 self.counter.as_ref().map(mark_all_proxy_messages_handled);
878 }
879}
880
881impl OwnedMessageCounter {
882 pub fn new(name: &str, counter: Option<zx::Counter>) -> OwnedMessageCounterHandle {
883 Arc::new(Self { name: name.to_string(), counter })
884 }
885
886 pub fn mark_handled(&self) {
891 self.counter.as_ref().map(mark_proxy_message_handled);
892 }
893
894 pub fn mark_pending(&self) {
898 self.counter.as_ref().map(|c| c.add(1).expect("Failed to increment counter"));
899 }
900
901 pub fn share(
905 self: &OwnedMessageCounterHandle,
906 new_pending_message: bool,
907 ) -> SharedMessageCounter {
908 if new_pending_message {
909 self.mark_pending();
910 }
911 SharedMessageCounter(Arc::downgrade(self))
912 }
913}
914
915impl fmt::Display for OwnedMessageCounter {
916 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
917 write!(f, "Counter({}): {:?}", self.name, self.counter.as_ref().map(|c| c.read()))
918 }
919}
920
/// Pairs a FIDL proxy with the message counter that accounts for its messages.
pub struct ContainerWakingProxy<P: Proxy> {
    /// Counter decremented each time `call` is used.
    counter: OwnedMessageCounterHandle,
    /// The wrapped proxy handed to the closure given to `call`.
    proxy: P,
}
929
930impl<P: Proxy> ContainerWakingProxy<P> {
931 pub fn new(counter: OwnedMessageCounterHandle, proxy: P) -> Self {
932 Self { counter, proxy }
933 }
934
935 pub fn call<T, F, R>(&self, future: F) -> R
939 where
940 F: FnOnce(&P) -> R,
941 R: Future<Output = T>,
942 {
943 let f = future(&self.proxy);
951 self.counter.mark_handled();
952 f
953 }
954}
955
/// Pairs a fused stream with the message counter that accounts for its messages.
pub struct ContainerWakingStream<S: FusedStream + Unpin> {
    /// Counter decremented each time `next` is called on a non-terminated stream.
    counter: OwnedMessageCounterHandle,
    /// The wrapped stream.
    stream: S,
}
964
965impl<S: FusedStream + Unpin> ContainerWakingStream<S> {
966 pub fn new(counter: OwnedMessageCounterHandle, stream: S) -> Self {
967 Self { counter, stream }
968 }
969
970 pub fn next(&mut self) -> Next<'_, S> {
974 let is_terminated = self.stream.is_terminated();
976 let next = self.stream.next();
977 if !is_terminated {
978 self.counter.mark_handled();
979 }
980 next
981 }
982}
983
984#[cfg(test)]
985mod test {
986 use super::*;
987 use diagnostics_assertions::assert_data_tree;
988 use fidl::endpoints::create_proxy_and_stream;
989 use fidl_test_placeholders::{EchoMarker, EchoRequest};
990 use fuchsia_async as fasync;
991 use fuchsia_inspect as inspect;
992 use futures::StreamExt;
993
// The `_zero` constructor must hand back a counter that still reads 0.
#[::fuchsia::test]
fn test_counter_zero_initialization() {
    let (_endpoint, endpoint) = zx::Channel::create();
    let (_channel, counter) =
        super::create_proxy_for_wake_events_counter_zero(endpoint, "test".into());
    assert_eq!(counter.read(), Ok(0));
}
1001
// The non-zero constructor must hand back a counter that was already
// incremented once.
#[::fuchsia::test]
fn test_counter_initialization() {
    let (_endpoint, endpoint) = zx::Channel::create();
    let (_channel, counter) =
        super::create_proxy_for_wake_events_counter(endpoint, "test".into());
    assert_eq!(counter.read(), Ok(1));
}
1009
// `ContainerWakingProxy::call` must decrement the counter exactly once per call,
// and dropping the proxy (and with it the counter owner) must reset it to zero.
#[::fuchsia::test]
async fn test_container_waking_proxy() {
    let (proxy, mut stream) = create_proxy_and_stream::<EchoMarker>();
    // Echo server that answers a single request.
    let server_task = fasync::Task::spawn(async move {
        let request = stream.next().await.unwrap().unwrap();
        match request {
            EchoRequest::EchoString { value, responder } => {
                responder.send(value.as_deref()).unwrap();
            }
        }
    });

    let counter = zx::Counter::create();
    counter.add(5).unwrap();
    assert_eq!(counter.read(), Ok(5));

    let waking_proxy = ContainerWakingProxy {
        counter: OwnedMessageCounter::new(
            "test_proxy",
            Some(counter.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap()),
        ),
        proxy,
    };

    let response_future = waking_proxy.call(|p| p.echo_string(Some("hello")));

    // The decrement happens when `call` returns, before the future is awaited.
    assert_eq!(counter.read(), Ok(4));

    let response = response_future.await.unwrap();
    assert_eq!(response.as_deref(), Some("hello"));

    server_task.await;

    // Awaiting the response does not touch the counter again.
    assert_eq!(counter.read(), Ok(4));
    drop(waking_proxy);
    assert_eq!(counter.read(), Ok(0));
}
1048
// `ContainerWakingStream::next` must decrement the counter exactly once per call
// on a live stream, and dropping the stream wrapper must reset it to zero.
#[::fuchsia::test]
async fn test_container_waking_stream() {
    let (proxy, stream) = create_proxy_and_stream::<EchoMarker>();
    // Client that sends a single echo request and checks the reply.
    let client_task = fasync::Task::spawn(async move {
        let response = proxy.echo_string(Some("hello")).await.unwrap();
        assert_eq!(response.as_deref(), Some("hello"));
    });

    let counter = zx::Counter::create();
    counter.add(5).unwrap();
    assert_eq!(counter.read(), Ok(5));

    let mut waking_stream = ContainerWakingStream {
        counter: OwnedMessageCounter::new(
            "test_stream",
            Some(counter.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap()),
        ),
        stream,
    };

    let request_future = waking_stream.next();

    // The decrement happens when `next` returns, before the future is awaited.
    assert_eq!(counter.read(), Ok(4));

    let request = request_future.await.unwrap().unwrap();
    match request {
        EchoRequest::EchoString { value, responder } => {
            assert_eq!(value.as_deref(), Some("hello"));
            responder.send(value.as_deref()).unwrap();
        }
    }

    client_task.await;

    // Handling the request does not touch the counter again.
    assert_eq!(counter.read(), Ok(4));
    drop(waking_stream);
    assert_eq!(counter.read(), Ok(0));
}
1088
// The lazy `message_counters` inspect property must reflect the live counters
// and their current values, and must become empty once the owner is dropped.
#[::fuchsia::test]
async fn test_message_counters_inspect() {
    let power_manager = SuspendResumeManager::default();
    let inspector = inspect::component::inspector();

    let zx_counter = zx::Counter::create();
    let counter_handle = power_manager.add_message_counter(
        "test_counter",
        Some(zx_counter.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap()),
    );

    zx_counter.add(1).unwrap();

    assert_data_tree!(inspector, root: contains {
        message_counters: vec!["Counter(test_counter): Some(Ok(1))"],
    });

    // The value is read lazily, so a later increment shows up in the next snapshot.
    zx_counter.add(1).unwrap();
    assert_data_tree!(inspector, root: contains {
        message_counters: vec!["Counter(test_counter): Some(Ok(2))"],
    });

    // Once the owner is gone the weak entry can no longer be upgraded.
    drop(counter_handle);
    assert_data_tree!(inspector, root: contains {
        message_counters: Vec::<String>::new(),
    });
}
1116
// Exercises the accounting between `OwnedMessageCounter` and the
// `SharedMessageCounter`s created from it.
#[::fuchsia::test]
fn test_shared_message_counter() {
    let zx_counter = zx::Counter::create();
    let owned_counter = OwnedMessageCounter::new(
        "test_shared_counter",
        Some(zx_counter.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap()),
    );
    zx_counter.add(5).unwrap();
    assert_eq!(zx_counter.read(), Ok(5));

    // Sharing without a new pending message leaves the counter untouched.
    let shared_counter = owned_counter.share(false);
    assert_eq!(zx_counter.read(), Ok(5));

    // Dropping a shared counter marks one message as handled.
    drop(shared_counter);
    assert_eq!(zx_counter.read(), Ok(4));

    // Sharing with a new pending message increments first ...
    let shared_counter_2 = owned_counter.share(true);
    assert_eq!(zx_counter.read(), Ok(5));

    // ... so the drop brings it back to where it was.
    drop(shared_counter_2);
    assert_eq!(zx_counter.read(), Ok(4));

    let shared_counter_3 = owned_counter.share(false);
    assert_eq!(zx_counter.read(), Ok(4));

    // Dropping the owner resets the counter to zero.
    drop(owned_counter);
    assert_eq!(zx_counter.read(), Ok(0));

    // A shared counter that outlives its owner must not decrement below zero.
    drop(shared_counter_3);
    assert_eq!(zx_counter.read(), Ok(0));
}
1156
// Once the wrapped stream has terminated, further `next` calls must stop
// decrementing the counter.
#[::fuchsia::test]
async fn test_container_waking_event_termination() {
    let stream = futures::stream::iter(vec![0]).fuse();
    let counter = zx::Counter::create();
    counter.add(2).unwrap();
    assert_eq!(counter.read(), Ok(2));
    let mut waking_stream = ContainerWakingStream {
        counter: OwnedMessageCounter::new(
            "test_stream",
            Some(counter.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap()),
        ),
        stream,
    };

    // First call: stream is live, counter goes 2 -> 1.
    assert_eq!(waking_stream.next().await, Some(0));
    assert_eq!(counter.read(), Ok(1));

    // Second call: the stream is not yet marked terminated when `next` is called,
    // so the counter goes 1 -> 0. The third call sees a terminated stream and
    // leaves the counter alone.
    assert_eq!(waking_stream.next().await, None);
    assert_eq!(waking_stream.next().await, None);
    assert_eq!(counter.read(), Ok(0));
}
1179
// Registers an external wake source and checks that it is tracked by koid.
//
// NOTE(review): despite its name, this test never calls `suspend()`; it only
// verifies that the source was recorded after registration.
#[::fuchsia::test]
fn test_external_wake_source_aborts_suspend() {
    let manager = SuspendResumeManager::default();
    let event = zx::Event::create();
    let signals = zx::Signals::USER_0;

    let res = manager.add_external_wake_source(
        event.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap().into_handle(),
        signals,
        "test_external".to_string(),
    );

    // Registration needs the runner; bail out quietly when it is unavailable.
    if res.is_err() {
        println!(
            "Skipping test_external_wake_source_aborts_suspend because runner connection failed: {:?}",
            res
        );
        return;
    }

    event.signal(zx::Signals::empty(), signals).unwrap();

    let state = manager.lock();
    assert!(state.external_wake_sources.contains_key(&event.koid().unwrap()));
}
1210}