1use crate::power::{SuspendState, SuspendStats};
6use crate::task::CurrentTask;
7
8use std::collections::{HashMap, HashSet};
9use std::future::Future;
10use std::sync::{Arc, Weak};
11
12use anyhow::{Context, anyhow};
13use fidl::endpoints::Proxy;
14use fidl_fuchsia_power_observability as fobs;
15use fidl_fuchsia_session_power as fpower;
16use fidl_fuchsia_starnix_runner as frunner;
17use fuchsia_component::client::connect_to_protocol_sync;
18use fuchsia_inspect as inspect;
19use fuchsia_inspect::ArrayProperty;
20use futures::stream::{FusedStream, Next};
21use futures::{FutureExt, StreamExt};
22use starnix_logging::{log_info, log_warn};
23use starnix_sync::{
24 EbpfSuspendLock, FileOpsCore, LockBefore, Locked, Mutex, MutexGuard, OrderedRwLock,
25 RwLockReadGuard,
26};
27use starnix_uapi::arc_key::WeakKey;
28use starnix_uapi::errors::Errno;
29use starnix_uapi::{errno, error};
30use std::collections::VecDeque;
31use std::fmt;
32use zx::{HandleBased, Peered};
33
34#[derive(Debug, Default)]
36pub struct WakeupSource {
37 active_count: u64,
39
40 event_count: u64,
43
44 wakeup_count: u64,
50
51 expire_count: u64,
53
54 active_since: zx::MonotonicInstant,
57
58 total_time: zx::MonotonicDuration,
60
61 max_time: zx::MonotonicDuration,
63
64 last_change: zx::MonotonicInstant,
66}
67
68impl WakeupSource {
69 pub fn active_duration(&self) -> zx::MonotonicDuration {
73 if self.active_since == zx::MonotonicInstant::ZERO {
74 zx::MonotonicDuration::default()
75 } else {
76 let now = zx::MonotonicInstant::get();
77 now - self.active_since
78 }
79 }
80}
81
82#[derive(Debug, Clone, Eq, PartialEq, Hash)]
83pub enum WakeupSourceOrigin {
84 WakeLock(String),
85 Epoll(String),
86 HAL(String),
87}
88
89impl std::string::ToString for WakeupSourceOrigin {
90 fn to_string(&self) -> String {
91 match self {
92 WakeupSourceOrigin::WakeLock(lock) => lock.clone(),
93 WakeupSourceOrigin::Epoll(lock) => format!("[epoll] {}", lock),
94 WakeupSourceOrigin::HAL(lock) => format!("[HAL] {}", lock),
95 }
96 }
97}
98
99pub struct SuspendResumeManager {
101 inner: Arc<Mutex<SuspendResumeManagerInner>>,
103
104 message_counters: Arc<Mutex<HashSet<WeakKey<OwnedMessageCounter>>>>,
107
108 ebpf_suspend_lock: OrderedRwLock<(), EbpfSuspendLock>,
110}
111
112pub struct SuspendResumeManagerInner {
115 suspend_stats: SuspendStats,
117 sync_on_suspend_enabled: bool,
118
119 suspend_events: VecDeque<SuspendEvent>,
120
121 wakeup_sources: HashMap<WakeupSourceOrigin, WakeupSource>,
123
124 active_lock_reader: zx::EventPair,
127
128 active_lock_writer: zx::EventPair,
132
133 active_wakeup_source_count: u64,
135
136 total_wakeup_source_event_count: u64,
139
140 external_wake_sources: HashMap<zx::Koid, ExternalWakeSource>,
142}
143
144#[derive(Debug)]
145struct ExternalWakeSource {
146 handle: zx::NullableHandle,
148 signals: zx::Signals,
150 name: String,
152}
153
154impl SuspendResumeManager {
155 pub fn add_external_wake_source(
156 &self,
157 handle: zx::NullableHandle,
158 signals: zx::Signals,
159 name: String,
160 ) -> Result<(), Errno> {
161 let manager = connect_to_protocol_sync::<frunner::ManagerMarker>()
162 .map_err(|e| errno!(EINVAL, format!("Failed to connect to manager: {e:?}")))?;
163 manager
164 .add_wake_source(frunner::ManagerAddWakeSourceRequest {
165 container_job: Some(
166 fuchsia_runtime::job_default()
167 .duplicate(zx::Rights::SAME_RIGHTS)
168 .expect("Failed to dup handle"),
169 ),
170 name: Some(name.clone()),
171 handle: Some(
172 handle.duplicate(zx::Rights::SAME_RIGHTS).map_err(|e| errno!(EIO, e))?,
173 ),
174 signals: Some(signals.bits()),
175 ..Default::default()
176 })
177 .map_err(|e| errno!(EIO, e))?;
178
179 let koid = handle.koid().map_err(|e| errno!(EINVAL, e))?;
180 self.lock().external_wake_sources.insert(
181 koid,
182 ExternalWakeSource {
183 handle: handle.duplicate(zx::Rights::SAME_RIGHTS).map_err(|e| errno!(EIO, e))?,
184 signals,
185 name,
186 },
187 );
188 Ok(())
189 }
190
191 pub fn remove_external_wake_source(&self, handle: zx::NullableHandle) -> Result<(), Errno> {
192 let manager = connect_to_protocol_sync::<frunner::ManagerMarker>()
193 .map_err(|e| errno!(EINVAL, format!("Failed to connect to manager: {e:?}")))?;
194
195 let koid = handle.koid().map_err(|e| errno!(EINVAL, e))?;
196 self.lock().external_wake_sources.remove(&koid);
197
198 manager
199 .remove_wake_source(frunner::ManagerRemoveWakeSourceRequest {
200 container_job: Some(
201 fuchsia_runtime::job_default()
202 .duplicate(zx::Rights::SAME_RIGHTS)
203 .expect("Failed to dup handle"),
204 ),
205 handle: Some(handle),
206 ..Default::default()
207 })
208 .map_err(|e| errno!(EIO, e))?;
209
210 Ok(())
211 }
212}
213
214pub type EbpfSuspendGuard<'a> = RwLockReadGuard<'a, ()>;
215
216#[derive(Clone, Debug)]
217pub enum SuspendEvent {
218 Attempt { time: zx::BootInstant, state: String },
219 Resume { time: zx::BootInstant, reason: String },
220 Fail { time: zx::BootInstant, wakeup_sources: Option<Vec<String>> },
221}
222
223const INSPECT_RING_BUFFER_CAPACITY: usize = 128;
225
226impl Default for SuspendResumeManagerInner {
227 fn default() -> Self {
228 let (active_lock_reader, active_lock_writer) = zx::EventPair::create();
229 active_lock_writer
230 .signal_peer(zx::Signals::empty(), zx::Signals::USER_0)
231 .expect("Failed to signal peer");
232 Self {
233 suspend_stats: Default::default(),
234 sync_on_suspend_enabled: false,
235 suspend_events: VecDeque::with_capacity(INSPECT_RING_BUFFER_CAPACITY),
236 wakeup_sources: Default::default(),
237 active_lock_reader,
238 active_lock_writer,
239 active_wakeup_source_count: 0,
240 total_wakeup_source_event_count: 0,
241 external_wake_sources: Default::default(),
242 }
243 }
244}
245
246impl SuspendResumeManagerInner {
247 pub fn can_suspend(&self) -> bool {
249 self.active_wakeup_source_count == 0
250 }
251
252 pub fn active_wake_locks(&self) -> Vec<WakeupSourceOrigin> {
253 self.wakeup_sources
254 .iter()
255 .filter_map(|(name, source)| match name {
256 WakeupSourceOrigin::WakeLock(_) => {
257 if source.active_since > zx::MonotonicInstant::ZERO {
258 Some(name.clone())
259 } else {
260 None
261 }
262 }
263 _ => None,
264 })
265 .collect()
266 }
267
268 pub fn inactive_wake_locks(&self) -> Vec<WakeupSourceOrigin> {
269 self.wakeup_sources
270 .iter()
271 .filter_map(|(name, source)| match name {
272 WakeupSourceOrigin::WakeLock(_) => {
273 if source.active_since == zx::MonotonicInstant::ZERO {
274 Some(name.clone())
275 } else {
276 None
277 }
278 }
279 _ => None,
280 })
281 .collect()
282 }
283
284 fn signal_wake_events(&mut self) {
286 let (clear_mask, set_mask) = if self.active_wakeup_source_count == 0 {
287 (zx::Signals::EVENT_SIGNALED, zx::Signals::USER_0)
288 } else {
289 (zx::Signals::USER_0, zx::Signals::EVENT_SIGNALED)
290 };
291 self.active_lock_writer.signal_peer(clear_mask, set_mask).expect("Failed to signal peer");
292 }
293
294 fn update_suspend_stats<UpdateFn>(&mut self, update: UpdateFn)
295 where
296 UpdateFn: FnOnce(&mut SuspendStats),
297 {
298 update(&mut self.suspend_stats);
299 }
300
301 fn add_suspend_event(&mut self, event: SuspendEvent) {
302 if self.suspend_events.len() >= INSPECT_RING_BUFFER_CAPACITY {
303 self.suspend_events.pop_front();
304 }
305 self.suspend_events.push_back(event);
306 }
307
308 fn record_suspend_events(&self, node: &inspect::Node) {
309 let events_node = node.create_child("suspend_events");
310 for (i, event) in self.suspend_events.iter().enumerate() {
311 let child = events_node.create_child(i.to_string());
312 match event {
313 SuspendEvent::Attempt { time, state } => {
314 child.record_int(fobs::SUSPEND_ATTEMPTED_AT, time.into_nanos());
315 child.record_string(fobs::SUSPEND_REQUESTED_STATE, state);
316 }
317 SuspendEvent::Resume { time, reason } => {
318 child.record_int(fobs::SUSPEND_RESUMED_AT, time.into_nanos());
319 child.record_string(fobs::SUSPEND_RESUME_REASON, reason);
320 }
321 SuspendEvent::Fail { time, wakeup_sources } => {
322 child.record_int(fobs::SUSPEND_FAILED_AT, time.into_nanos());
323 if let Some(names) = wakeup_sources {
324 let names_array =
325 child.create_string_array(fobs::WAKEUP_SOURCES_NAME, names.len());
326 for (i, name) in names.iter().enumerate() {
327 names_array.set(i, name);
328 }
329 child.record(names_array);
330 }
331 }
332 }
333 events_node.record(child);
334 }
335 node.record(events_node);
336 }
337
338 fn record_wakeup_sources(&self, node: &inspect::Node) {
339 let wakeup_node = node.create_child("wakeup_sources");
340 for (name, source) in self.wakeup_sources.iter() {
341 let child = wakeup_node.create_child(name.to_string());
342 child.record_uint("active_count", source.active_count);
343 child.record_uint("event_count", source.event_count);
344 child.record_uint("wakeup_count", source.wakeup_count);
345 child.record_uint("expire_count", source.expire_count);
346 child.record_int("active_since (ns)", source.active_since.into_nanos());
347 child.record_int("active_duration_mono (ns)", source.active_duration().into_nanos());
350 child.record_int("total_time (ms)", source.total_time.into_millis());
351 child.record_int("max_time (ms)", source.max_time.into_millis());
352 child.record_int("last_change (ns)", source.last_change.into_nanos());
353 wakeup_node.record(child);
354 }
355 node.record(wakeup_node);
356 }
357}
358
359pub type SuspendResumeManagerHandle = Arc<SuspendResumeManager>;
360
361impl Default for SuspendResumeManager {
362 fn default() -> Self {
363 let message_counters: Arc<Mutex<HashSet<WeakKey<OwnedMessageCounter>>>> =
364 Default::default();
365 let message_counters_clone = message_counters.clone();
366 let root = inspect::component::inspector().root();
367 root.record_lazy_values("message_counters", move || {
368 let message_counters_clone = message_counters_clone.clone();
369 async move {
370 let inspector = fuchsia_inspect::Inspector::default();
371 let root = inspector.root();
372 let mut message_counters = message_counters_clone.lock();
373 message_counters.retain(|c| c.0.upgrade().is_some());
374 let message_counters_inspect =
375 root.create_string_array("message_counters", message_counters.len());
376 for (i, c) in message_counters.iter().enumerate() {
377 let counter = c.0.upgrade().expect("lost counter should be retained");
378 message_counters_inspect.set(i, counter.to_string());
379 }
380 root.record(message_counters_inspect);
381 Ok(inspector)
382 }
383 .boxed()
384 });
385 let inner = Arc::new(Mutex::new(SuspendResumeManagerInner::default()));
386 let inner_clone = inner.clone();
387 root.record_lazy_child("wakeup_sources", move || {
388 let inner = inner_clone.clone();
389 async move {
390 let inspector = fuchsia_inspect::Inspector::default();
391 let root = inspector.root();
392 let state = inner.lock();
393
394 state.record_suspend_events(root);
395 state.record_wakeup_sources(root);
396
397 Ok(inspector)
398 }
399 .boxed()
400 });
401 Self { message_counters, inner, ebpf_suspend_lock: Default::default() }
402 }
403}
404
405impl SuspendResumeManager {
406 pub fn lock(&self) -> MutexGuard<'_, SuspendResumeManagerInner> {
408 self.inner.lock()
409 }
410
411 pub fn init(
413 self: &SuspendResumeManagerHandle,
414 system_task: &CurrentTask,
415 ) -> Result<(), anyhow::Error> {
416 let handoff = system_task
417 .kernel()
418 .connect_to_protocol_at_container_svc::<fpower::HandoffMarker>()?
419 .into_sync_proxy();
420 match handoff.take(zx::MonotonicInstant::INFINITE) {
421 Ok(parent_lease) => {
422 let parent_lease = parent_lease
423 .map_err(|e| anyhow!("Failed to take lessor and lease from parent: {e:?}"))?;
424 drop(parent_lease)
425 }
426 Err(e) => {
427 if e.is_closed() {
428 log_warn!(
429 "Failed to send the fuchsia.session.power/Handoff.Take request. Assuming no Handoff protocol exists and moving on..."
430 );
431 } else {
432 return Err(e).context("Handoff::Take");
433 }
434 }
435 }
436 Ok(())
437 }
438
439 pub fn activate_wakeup_source(&self, origin: WakeupSourceOrigin) -> bool {
440 let mut state = self.lock();
441 let did_activate = {
442 let entry = state.wakeup_sources.entry(origin).or_default();
443 let now = zx::MonotonicInstant::get();
444 entry.active_count += 1;
445 entry.event_count += 1;
446 entry.last_change = now;
447 if entry.active_since == zx::MonotonicInstant::ZERO {
448 entry.active_since = now;
449 true
450 } else {
451 false
452 }
453 };
454 if did_activate {
455 state.active_wakeup_source_count += 1;
456 state.signal_wake_events();
457 }
458 did_activate
459 }
460
461 pub fn deactivate_wakeup_source(&self, origin: &WakeupSourceOrigin) -> bool {
462 self.remove_wakeup_source(origin, false)
463 }
464
465 pub fn timeout_wakeup_source(&self, origin: &WakeupSourceOrigin) -> bool {
466 self.remove_wakeup_source(origin, true)
467 }
468
469 fn remove_wakeup_source(&self, origin: &WakeupSourceOrigin, timed_out: bool) -> bool {
470 let mut state = self.lock();
471 let removed = match state.wakeup_sources.get_mut(origin) {
472 Some(entry) if entry.active_since != zx::MonotonicInstant::ZERO => {
473 if timed_out {
474 entry.expire_count += 1;
475 }
476
477 let now = zx::MonotonicInstant::get();
478 let duration = now - entry.active_since;
479 entry.total_time += duration;
480 entry.max_time = std::cmp::max(duration, entry.max_time);
481 entry.last_change = now;
482 entry.active_since = zx::MonotonicInstant::ZERO;
483
484 true
485 }
486 _ => false,
487 };
488 if removed {
489 state.active_wakeup_source_count -= 1;
490 state.total_wakeup_source_event_count += 1;
491 state.signal_wake_events();
492 }
493 removed
494 }
495
496 pub fn add_message_counter(
497 &self,
498 name: &str,
499 counter: Option<zx::Counter>,
500 ) -> OwnedMessageCounterHandle {
501 let container_counter = OwnedMessageCounter::new(name, counter);
502 let mut message_counters = self.message_counters.lock();
503 message_counters.insert(WeakKey::from(&container_counter));
504 message_counters.retain(|c| c.0.upgrade().is_some());
505 container_counter
506 }
507
508 pub fn has_nonzero_message_counter(&self) -> bool {
509 self.message_counters.lock().iter().any(|c| {
510 let Some(c) = c.0.upgrade() else {
511 return false;
512 };
513 c.counter.as_ref().and_then(|counter| counter.read().ok()).map_or(false, |v| v != 0)
514 })
515 }
516
517 pub fn duplicate_lock_event(&self) -> zx::EventPair {
520 let state = self.lock();
521 state
522 .active_lock_reader
523 .duplicate_handle(zx::Rights::SAME_RIGHTS)
524 .expect("Failed to duplicate handle")
525 }
526
527 pub fn suspend_stats(&self) -> SuspendStats {
529 self.lock().suspend_stats.clone()
530 }
531
532 pub fn total_wakeup_events(&self) -> u64 {
533 let state = self.lock();
534 state.total_wakeup_source_event_count + state.suspend_stats.success_count
535 }
536
537 pub fn sync_on_suspend_enabled(&self) -> bool {
541 self.lock().sync_on_suspend_enabled.clone()
542 }
543
544 pub fn set_sync_on_suspend(&self, enable: bool) {
547 self.lock().sync_on_suspend_enabled = enable;
548 }
549
550 pub fn suspend_states(&self) -> HashSet<SuspendState> {
552 HashSet::from([SuspendState::Idle])
554 }
555
556 pub fn suspend(
557 &self,
558 locked: &mut Locked<FileOpsCore>,
559 suspend_state: SuspendState,
560 ) -> Result<(), Errno> {
561 let suspend_start_time = zx::BootInstant::get();
562 let mut state = self.lock();
563 state.add_suspend_event(SuspendEvent::Attempt {
564 time: suspend_start_time,
565 state: suspend_state.to_string(),
566 });
567
568 if !state.can_suspend() {
570 self.report_failed_suspension(state, "kernel wake lock");
571 return error!(EINVAL);
572 }
573
574 let external_wake_source_abort = state.external_wake_sources.values().find_map(|source| {
576 if source.handle.wait_one(source.signals, zx::MonotonicInstant::INFINITE_PAST).is_ok() {
577 Some(source.name.clone())
578 } else {
579 None
580 }
581 });
582
583 if let Some(name) = external_wake_source_abort {
584 self.report_failed_suspension(state, &format!("external wake source: {}", name));
585 return error!(EINVAL);
586 }
587
588 std::mem::drop(state);
592
593 let _ebpf_lock = self.ebpf_suspend_lock.write(locked);
596
597 let manager = connect_to_protocol_sync::<frunner::ManagerMarker>()
598 .expect("Failed to connect to manager");
599 fuchsia_trace::duration!("power", "suspend_container:fidl");
600
601 let container_job = Some(
602 fuchsia_runtime::job_default()
603 .duplicate(zx::Rights::SAME_RIGHTS)
604 .expect("Failed to dup handle"),
605 );
606 let wake_lock_event = Some(self.duplicate_lock_event());
607
608 log_info!("Requesting container suspension.");
609 match manager.suspend_container(
610 frunner::ManagerSuspendContainerRequest {
611 container_job,
612 wake_locks: wake_lock_event,
613 ..Default::default()
614 },
615 zx::Instant::INFINITE,
616 ) {
617 Ok(Ok(res)) => {
618 self.report_container_resumed(suspend_start_time, res);
619 }
620 e => {
621 let state = self.lock();
622 self.report_failed_suspension(state, &format!("runner error {:?}", e));
623 return error!(EINVAL);
624 }
625 }
626 Ok(())
627 }
628
629 fn report_container_resumed(
630 &self,
631 suspend_start_time: zx::BootInstant,
632 res: frunner::ManagerSuspendContainerResponse,
633 ) {
634 let wake_time = zx::BootInstant::get();
635 let resume_reason = res.resume_reason.clone().map(|s| format!("0 {}", s));
638 log_info!("Resuming from container suspension: {:?}", resume_reason);
639 let mut state = self.lock();
640 state.update_suspend_stats(|suspend_stats| {
641 suspend_stats.success_count += 1;
642 suspend_stats.last_time_in_suspend_operations = (wake_time - suspend_start_time).into();
643 suspend_stats.last_time_in_sleep =
644 zx::BootDuration::from_nanos(res.suspend_time.unwrap_or(0));
645 suspend_stats.last_resume_reason = resume_reason.clone();
646 });
647 state.add_suspend_event(SuspendEvent::Resume {
648 time: wake_time,
649 reason: resume_reason.unwrap_or_default(),
650 });
651 fuchsia_trace::instant!("power", "suspend_container:done", fuchsia_trace::Scope::Process);
652 }
653
654 fn report_failed_suspension(
655 &self,
656 mut state: MutexGuard<'_, SuspendResumeManagerInner>,
657 failure_reason: &str,
658 ) {
659 let wake_time = zx::BootInstant::get();
660 state.update_suspend_stats(|suspend_stats| {
661 suspend_stats.fail_count += 1;
662 suspend_stats.last_failed_errno = Some(errno!(EINVAL));
663 suspend_stats.last_resume_reason = None;
664 });
665
666 let mut wakeup_sources: Vec<String> = state
667 .wakeup_sources
668 .iter_mut()
669 .filter_map(|(origin, source)| {
670 if source.active_since > zx::MonotonicInstant::ZERO {
671 source.wakeup_count += 1;
672 Some(origin.to_string())
673 } else {
674 None
675 }
676 })
677 .collect();
678
679 for source in state.external_wake_sources.values() {
680 if source.handle.wait_one(source.signals, zx::MonotonicInstant::INFINITE_PAST).is_ok() {
681 wakeup_sources.push(source.name.clone());
682 }
683 }
684
685 let last_resume_reason = format!("Abort: {}", wakeup_sources.join(" "));
686 state.update_suspend_stats(|suspend_stats| {
687 suspend_stats.last_resume_reason = Some(last_resume_reason);
689 });
690
691 log_warn!(
693 "Suspend failed due to {:?}. Here are the active wakeup sources: {:?}",
694 failure_reason,
695 wakeup_sources,
696 );
697 state.add_suspend_event(SuspendEvent::Fail {
699 time: wake_time,
700 wakeup_sources: Some(wakeup_sources),
701 });
702 fuchsia_trace::instant!("power", "suspend_container:error", fuchsia_trace::Scope::Process);
703 }
704
705 pub fn acquire_ebpf_suspend_lock<'a, L>(
706 &'a self,
707 locked: &'a mut Locked<L>,
708 ) -> EbpfSuspendGuard<'a>
709 where
710 L: LockBefore<EbpfSuspendLock>,
711 {
712 self.ebpf_suspend_lock.read(locked)
713 }
714}
715
716pub trait OnWakeOps: Send + Sync {
718 fn on_wake(&self, current_task: &CurrentTask, baton_lease: &zx::NullableHandle);
727}
728
729pub fn create_proxy_for_wake_events_counter_zero(
747 remote_channel: zx::Channel,
748 name: String,
749) -> (zx::Channel, zx::Counter) {
750 let (local_proxy, kernel_channel) = zx::Channel::create();
751 let counter = zx::Counter::create();
752
753 let local_counter =
754 counter.duplicate_handle(zx::Rights::SAME_RIGHTS).expect("Failed to duplicate counter");
755
756 let manager = fuchsia_component::client::connect_to_protocol_sync::<frunner::ManagerMarker>()
757 .expect("failed");
758 manager
759 .proxy_wake_channel(frunner::ManagerProxyWakeChannelRequest {
760 container_job: Some(
761 fuchsia_runtime::job_default()
762 .duplicate(zx::Rights::SAME_RIGHTS)
763 .expect("Failed to dup handle"),
764 ),
765 container_channel: Some(kernel_channel),
766 remote_channel: Some(remote_channel),
767 counter: Some(counter),
768 name: Some(name),
769 ..Default::default()
770 })
771 .expect("Failed to create proxy");
772
773 (local_proxy, local_counter)
774}
775
776pub fn create_proxy_for_wake_events_counter(
795 remote_channel: zx::Channel,
796 name: String,
797) -> (zx::Channel, zx::Counter) {
798 let (proxy, counter) = create_proxy_for_wake_events_counter_zero(remote_channel, name);
799
800 counter.add(1).expect("Failed to add to counter");
803
804 (proxy, counter)
805}
806
807pub fn mark_proxy_message_handled(counter: &zx::Counter) {
812 counter.add(-1).expect("Failed to decrement counter");
813}
814
815pub fn mark_all_proxy_messages_handled(counter: &zx::Counter) {
817 counter.write(0).expect("Failed to decrement counter");
818}
819
820pub fn create_watcher_for_wake_events(watcher: zx::EventPair) {
824 let manager = fuchsia_component::client::connect_to_protocol_sync::<frunner::ManagerMarker>()
825 .expect("failed");
826 manager
827 .register_wake_watcher(
828 frunner::ManagerRegisterWakeWatcherRequest {
829 watcher: Some(watcher),
830 ..Default::default()
831 },
832 zx::Instant::INFINITE,
833 )
834 .expect("Failed to register wake watcher");
835}
836
837#[derive(Debug)]
846pub struct SharedMessageCounter(Weak<OwnedMessageCounter>);
847
848impl Drop for SharedMessageCounter {
849 fn drop(&mut self) {
850 if let Some(message_counter) = self.0.upgrade() {
851 message_counter.mark_handled();
852 }
853 }
854}
855
856pub struct OwnedMessageCounter {
861 name: String,
862 counter: Option<zx::Counter>,
863}
864pub type OwnedMessageCounterHandle = Arc<OwnedMessageCounter>;
865
866impl Drop for OwnedMessageCounter {
867 fn drop(&mut self) {
872 self.counter.as_ref().map(mark_all_proxy_messages_handled);
873 }
874}
875
876impl OwnedMessageCounter {
877 pub fn new(name: &str, counter: Option<zx::Counter>) -> OwnedMessageCounterHandle {
878 Arc::new(Self { name: name.to_string(), counter })
879 }
880
881 pub fn mark_handled(&self) {
886 self.counter.as_ref().map(mark_proxy_message_handled);
887 }
888
889 pub fn mark_pending(&self) {
893 self.counter.as_ref().map(|c| c.add(1).expect("Failed to increment counter"));
894 }
895
896 pub fn share(
900 self: &OwnedMessageCounterHandle,
901 new_pending_message: bool,
902 ) -> SharedMessageCounter {
903 if new_pending_message {
904 self.mark_pending();
905 }
906 SharedMessageCounter(Arc::downgrade(self))
907 }
908}
909
910impl fmt::Display for OwnedMessageCounter {
911 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
912 write!(f, "Counter({}): {:?}", self.name, self.counter.as_ref().map(|c| c.read()))
913 }
914}
915
916pub struct ContainerWakingProxy<P: Proxy> {
921 counter: OwnedMessageCounterHandle,
922 proxy: P,
923}
924
925impl<P: Proxy> ContainerWakingProxy<P> {
926 pub fn new(counter: OwnedMessageCounterHandle, proxy: P) -> Self {
927 Self { counter, proxy }
928 }
929
930 pub fn call<T, F, R>(&self, future: F) -> R
934 where
935 F: FnOnce(&P) -> R,
936 R: Future<Output = T>,
937 {
938 let f = future(&self.proxy);
946 self.counter.mark_handled();
947 f
948 }
949}
950
951pub struct ContainerWakingStream<S: FusedStream + Unpin> {
956 counter: OwnedMessageCounterHandle,
957 stream: S,
958}
959
960impl<S: FusedStream + Unpin> ContainerWakingStream<S> {
961 pub fn new(counter: OwnedMessageCounterHandle, stream: S) -> Self {
962 Self { counter, stream }
963 }
964
965 pub fn next(&mut self) -> Next<'_, S> {
969 let is_terminated = self.stream.is_terminated();
971 let next = self.stream.next();
972 if !is_terminated {
973 self.counter.mark_handled();
974 }
975 next
976 }
977}
978
979#[cfg(test)]
980mod test {
981 use super::*;
982 use diagnostics_assertions::assert_data_tree;
983 use fidl::endpoints::create_proxy_and_stream;
984 use fidl_test_placeholders::{EchoMarker, EchoRequest};
985 use fuchsia_async as fasync;
986 use fuchsia_inspect as inspect;
987 use futures::StreamExt;
988 use zx::{self, HandleBased};
989
990 #[::fuchsia::test]
991 fn test_counter_zero_initialization() {
992 let (_endpoint, endpoint) = zx::Channel::create();
993 let (_channel, counter) =
994 super::create_proxy_for_wake_events_counter_zero(endpoint, "test".into());
995 assert_eq!(counter.read(), Ok(0));
996 }
997
998 #[::fuchsia::test]
999 fn test_counter_initialization() {
1000 let (_endpoint, endpoint) = zx::Channel::create();
1001 let (_channel, counter) =
1002 super::create_proxy_for_wake_events_counter(endpoint, "test".into());
1003 assert_eq!(counter.read(), Ok(1));
1004 }
1005
1006 #[::fuchsia::test]
1007 async fn test_container_waking_proxy() {
1008 let (proxy, mut stream) = create_proxy_and_stream::<EchoMarker>();
1009 let server_task = fasync::Task::spawn(async move {
1010 let request = stream.next().await.unwrap().unwrap();
1011 match request {
1012 EchoRequest::EchoString { value, responder } => {
1013 responder.send(value.as_deref()).unwrap();
1014 }
1015 }
1016 });
1017
1018 let counter = zx::Counter::create();
1019 counter.add(5).unwrap();
1020 assert_eq!(counter.read(), Ok(5));
1021
1022 let waking_proxy = ContainerWakingProxy {
1023 counter: OwnedMessageCounter::new(
1024 "test_proxy",
1025 Some(counter.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap()),
1026 ),
1027 proxy,
1028 };
1029
1030 let response_future = waking_proxy.call(|p| p.echo_string(Some("hello")));
1031
1032 assert_eq!(counter.read(), Ok(4));
1034
1035 let response = response_future.await.unwrap();
1036 assert_eq!(response.as_deref(), Some("hello"));
1037
1038 server_task.await;
1039
1040 assert_eq!(counter.read(), Ok(4));
1041 drop(waking_proxy);
1042 assert_eq!(counter.read(), Ok(0));
1043 }
1044
1045 #[::fuchsia::test]
1046 async fn test_container_waking_stream() {
1047 let (proxy, stream) = create_proxy_and_stream::<EchoMarker>();
1048 let client_task = fasync::Task::spawn(async move {
1049 let response = proxy.echo_string(Some("hello")).await.unwrap();
1050 assert_eq!(response.as_deref(), Some("hello"));
1051 });
1052
1053 let counter = zx::Counter::create();
1054 counter.add(5).unwrap();
1055 assert_eq!(counter.read(), Ok(5));
1056
1057 let mut waking_stream = ContainerWakingStream {
1058 counter: OwnedMessageCounter::new(
1059 "test_stream",
1060 Some(counter.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap()),
1061 ),
1062 stream,
1063 };
1064
1065 let request_future = waking_stream.next();
1066
1067 assert_eq!(counter.read(), Ok(4));
1069
1070 let request = request_future.await.unwrap().unwrap();
1071 match request {
1072 EchoRequest::EchoString { value, responder } => {
1073 assert_eq!(value.as_deref(), Some("hello"));
1074 responder.send(value.as_deref()).unwrap();
1075 }
1076 }
1077
1078 client_task.await;
1079
1080 assert_eq!(counter.read(), Ok(4));
1081 drop(waking_stream);
1082 assert_eq!(counter.read(), Ok(0));
1083 }
1084
1085 #[::fuchsia::test]
1086 async fn test_message_counters_inspect() {
1087 let power_manager = SuspendResumeManager::default();
1088 let inspector = inspect::component::inspector();
1089
1090 let zx_counter = zx::Counter::create();
1091 let counter_handle = power_manager.add_message_counter(
1092 "test_counter",
1093 Some(zx_counter.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap()),
1094 );
1095
1096 zx_counter.add(1).unwrap();
1097
1098 assert_data_tree!(inspector, root: contains {
1099 message_counters: vec!["Counter(test_counter): Some(Ok(1))"],
1100 });
1101
1102 zx_counter.add(1).unwrap();
1103 assert_data_tree!(inspector, root: contains {
1104 message_counters: vec!["Counter(test_counter): Some(Ok(2))"],
1105 });
1106
1107 drop(counter_handle);
1108 assert_data_tree!(inspector, root: contains {
1109 message_counters: Vec::<String>::new(),
1110 });
1111 }
1112
1113 #[::fuchsia::test]
1114 fn test_shared_message_counter() {
1115 let zx_counter = zx::Counter::create();
1117 let owned_counter = OwnedMessageCounter::new(
1118 "test_shared_counter",
1119 Some(zx_counter.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap()),
1120 );
1121 zx_counter.add(5).unwrap();
1122 assert_eq!(zx_counter.read(), Ok(5));
1123
1124 let shared_counter = owned_counter.share(false);
1126 assert_eq!(zx_counter.read(), Ok(5));
1127
1128 drop(shared_counter);
1130 assert_eq!(zx_counter.read(), Ok(4));
1131
1132 let shared_counter_2 = owned_counter.share(true);
1134 assert_eq!(zx_counter.read(), Ok(5));
1135
1136 drop(shared_counter_2);
1138 assert_eq!(zx_counter.read(), Ok(4));
1139
1140 let shared_counter_3 = owned_counter.share(false);
1142 assert_eq!(zx_counter.read(), Ok(4));
1143
1144 drop(owned_counter);
1146 assert_eq!(zx_counter.read(), Ok(0));
1147
1148 drop(shared_counter_3);
1150 assert_eq!(zx_counter.read(), Ok(0));
1151 }
1152
1153 #[::fuchsia::test]
1154 async fn test_container_waking_event_termination() {
1155 let stream = futures::stream::iter(vec![0]).fuse();
1156 let counter = zx::Counter::create();
1157 counter.add(2).unwrap();
1158 assert_eq!(counter.read(), Ok(2));
1159 let mut waking_stream = ContainerWakingStream {
1160 counter: OwnedMessageCounter::new(
1161 "test_stream",
1162 Some(counter.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap()),
1163 ),
1164 stream,
1165 };
1166
1167 assert_eq!(waking_stream.next().await, Some(0));
1168 assert_eq!(counter.read(), Ok(1));
1169
1170 assert_eq!(waking_stream.next().await, None);
1171 assert_eq!(waking_stream.next().await, None);
1172 assert_eq!(counter.read(), Ok(0));
1174 }
1175
1176 #[::fuchsia::test]
1177 fn test_external_wake_source_aborts_suspend() {
1178 let manager = SuspendResumeManager::default();
1179 let event = zx::Event::create();
1180 let signals = zx::Signals::USER_0;
1181
1182 let res = manager.add_external_wake_source(
1187 event.duplicate(zx::Rights::SAME_RIGHTS).unwrap().into_handle(),
1188 signals,
1189 "test_external".to_string(),
1190 );
1191
1192 if res.is_err() {
1193 println!(
1194 "Skipping test_external_wake_source_aborts_suspend because runner connection failed: {:?}",
1195 res
1196 );
1197 return;
1198 }
1199
1200 event.signal(zx::Signals::empty(), signals).unwrap();
1202
1203 let state = manager.lock();
1204 assert!(state.external_wake_sources.contains_key(&event.koid().unwrap()));
1205 }
1206}