use crate::power::{SuspendState, SuspendStats};
use crate::task::CurrentTask;

use std::collections::{HashMap, HashSet, VecDeque};
use std::fmt;
use std::future::Future;
use std::sync::{Arc, Weak};

use anyhow::{Context, anyhow};
use fidl::endpoints::Proxy;
use fuchsia_component::client::connect_to_protocol_sync;
use fuchsia_inspect::ArrayProperty;
use futures::stream::{FusedStream, Next};
use futures::{FutureExt, StreamExt};
use starnix_logging::{log_info, log_warn};
use starnix_sync::{
    EbpfSuspendLock, FileOpsCore, LockBefore, Locked, Mutex, MutexGuard, OrderedRwLock,
    RwLockReadGuard,
};
use starnix_uapi::arc_key::WeakKey;
use starnix_uapi::errors::Errno;
use starnix_uapi::{errno, error};
use zx::{HandleBased, Peered};
use {
    fidl_fuchsia_power_observability as fobs, fidl_fuchsia_session_power as fpower,
    fidl_fuchsia_starnix_runner as frunner, fuchsia_inspect as inspect,
};

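/// Bookkeeping for a single wakeup source. The counters mirror the
/// per-source statistics that Linux reports for wakeup sources.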
#[derive(Debug, Default)]
pub struct WakeupSource {
    /// Number of times this source has been activated.
    active_count: u64,

    /// Number of events recorded for this source.
    event_count: u64,

    /// Number of times this source aborted a suspend attempt.
    wakeup_count: u64,

    /// Number of times this source was deactivated because its timeout expired.
    expire_count: u64,

    /// When this source was last activated, or `ZERO` if it is inactive.
    active_since: zx::MonotonicInstant,

    /// Total time this source has spent active.
    total_time: zx::MonotonicDuration,

    /// Longest single period this source has spent active.
    max_time: zx::MonotonicDuration,

    /// When this source was last activated or deactivated.
    last_change: zx::MonotonicInstant,
}

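/// Identifies where a wakeup source originated: a wake lock, an epoll-based
/// wakeup, or the HAL. The wrapped string is the source's name.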
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub enum WakeupSourceOrigin {
    WakeLock(String),
    Epoll(String),
    HAL(String),
}

impl fmt::Display for WakeupSourceOrigin {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            WakeupSourceOrigin::WakeLock(lock) => write!(f, "{}", lock),
            WakeupSourceOrigin::Epoll(lock) => write!(f, "[epoll] {}", lock),
            WakeupSourceOrigin::HAL(lock) => write!(f, "[HAL] {}", lock),
        }
    }
}

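/// Coordinates suspend/resume for the container: tracks wakeup sources and
/// message counters, publishes suspend statistics and events to inspect, and
/// drives container suspension through the Starnix runner.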
pub struct SuspendResumeManager {
    /// Mutable suspend/resume state.
    inner: Arc<Mutex<SuspendResumeManagerInner>>,

    /// Weak references to the message counters of all live wake proxies.
    message_counters: Arc<Mutex<HashSet<WeakKey<OwnedMessageCounter>>>>,

    /// Taken for writing during suspension so that in-flight eBPF operations,
    /// which hold read guards, complete before the container suspends.
    ebpf_suspend_lock: OrderedRwLock<(), EbpfSuspendLock>,
}

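/// State guarded by the [`SuspendResumeManager`] mutex.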
pub struct SuspendResumeManagerInner {
    /// Counters and gauges describing past suspend attempts.
    suspend_stats: SuspendStats,
    sync_on_suspend_enabled: bool,

    /// Ring buffer of recent suspend attempts, resumes, and failures.
    suspend_events: VecDeque<SuspendEvent>,

    /// Per-origin statistics for every wakeup source ever seen.
    wakeup_sources: HashMap<WakeupSourceOrigin, WakeupSource>,

    /// The end of the event pair that is duplicated out to the runner.
    /// `USER_0` is asserted on it while no wakeup sources are active,
    /// `EVENT_SIGNALED` while at least one is.
    active_lock_reader: zx::EventPair,

    /// The local end used to signal wakeup source activation and release.
    active_lock_writer: zx::EventPair,

    /// Number of currently active wakeup sources.
    active_wakeup_source_count: u64,

    /// Number of wakeup source deactivations that have completed.
    total_wakeup_source_event_count: u64,

    /// External handles whose asserted signals should abort suspension,
    /// keyed by koid.
    external_wake_sources: HashMap<zx::Koid, ExternalWakeSource>,
}

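/// A kernel-external handle registered with the runner; if any of `signals`
/// is asserted on `handle`, suspension is aborted.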
#[derive(Debug)]
struct ExternalWakeSource {
    /// The handle to observe.
    handle: zx::NullableHandle,
    /// The signals that indicate a pending wake event.
    signals: zx::Signals,
    /// The name reported when this source aborts a suspend attempt.
    name: String,
}

impl SuspendResumeManager {
    /// Registers `handle` with the Starnix runner as an external wake source:
    /// while any of `signals` is asserted on it, the container will not
    /// suspend.
    pub fn add_external_wake_source(
        &self,
        handle: zx::NullableHandle,
        signals: zx::Signals,
        name: String,
    ) -> Result<(), Errno> {
        let manager = connect_to_protocol_sync::<frunner::ManagerMarker>()
            .map_err(|e| errno!(EINVAL, format!("Failed to connect to manager: {e:?}")))?;
        manager
            .add_wake_source(frunner::ManagerAddWakeSourceRequest {
                container_job: Some(
                    fuchsia_runtime::job_default()
                        .duplicate(zx::Rights::SAME_RIGHTS)
                        .expect("Failed to dup handle"),
                ),
                name: Some(name.clone()),
                handle: Some(
                    handle.duplicate(zx::Rights::SAME_RIGHTS).map_err(|e| errno!(EIO, e))?,
                ),
                signals: Some(signals.bits()),
                ..Default::default()
            })
            .map_err(|e| errno!(EIO, e))?;

        let koid = handle.koid().map_err(|e| errno!(EINVAL, e))?;
        self.lock().external_wake_sources.insert(
            koid,
            ExternalWakeSource {
                handle: handle.duplicate(zx::Rights::SAME_RIGHTS).map_err(|e| errno!(EIO, e))?,
                signals,
                name,
            },
        );
        Ok(())
    }

    /// Unregisters a previously added external wake source, identified by
    /// `handle`'s koid.
    pub fn remove_external_wake_source(&self, handle: zx::NullableHandle) -> Result<(), Errno> {
        let manager = connect_to_protocol_sync::<frunner::ManagerMarker>()
            .map_err(|e| errno!(EINVAL, format!("Failed to connect to manager: {e:?}")))?;

        let koid = handle.koid().map_err(|e| errno!(EINVAL, e))?;
        self.lock().external_wake_sources.remove(&koid);

        manager
            .remove_wake_source(frunner::ManagerRemoveWakeSourceRequest {
                container_job: Some(
                    fuchsia_runtime::job_default()
                        .duplicate(zx::Rights::SAME_RIGHTS)
                        .expect("Failed to dup handle"),
                ),
                handle: Some(handle),
                ..Default::default()
            })
            .map_err(|e| errno!(EIO, e))?;

        Ok(())
    }
}

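/// Guard returned by [`SuspendResumeManager::acquire_ebpf_suspend_lock`].
/// While held, `suspend` cannot take the lock for writing, so the container
/// will not suspend mid-operation.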
pub type EbpfSuspendGuard<'a> = RwLockReadGuard<'a, ()>;

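/// A suspend-related event recorded in the inspect ring buffer.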
#[derive(Clone, Debug)]
pub enum SuspendEvent {
    Attempt { time: zx::BootInstant, state: String },
    Resume { time: zx::BootInstant, reason: String },
    Fail { time: zx::BootInstant, wakeup_sources: Option<Vec<String>> },
}

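/// Maximum number of suspend events retained for inspect.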
const INSPECT_RING_BUFFER_CAPACITY: usize = 128;

impl Default for SuspendResumeManagerInner {
    fn default() -> Self {
        let (active_lock_reader, active_lock_writer) = zx::EventPair::create();
        // No wakeup sources are active initially, so assert USER_0 on the
        // reader end.
        active_lock_writer
            .signal_peer(zx::Signals::empty(), zx::Signals::USER_0)
            .expect("Failed to signal peer");
        Self {
            suspend_stats: Default::default(),
            sync_on_suspend_enabled: false,
            suspend_events: VecDeque::with_capacity(INSPECT_RING_BUFFER_CAPACITY),
            wakeup_sources: Default::default(),
            active_lock_reader,
            active_lock_writer,
            active_wakeup_source_count: 0,
            total_wakeup_source_event_count: 0,
            external_wake_sources: Default::default(),
        }
    }
}

impl SuspendResumeManagerInner {
    /// Returns true if no kernel wakeup sources are currently active.
    pub fn can_suspend(&self) -> bool {
        self.active_wakeup_source_count == 0
    }

    /// Returns the origins of all currently held wake locks.
    pub fn active_wake_locks(&self) -> Vec<WakeupSourceOrigin> {
        self.wakeup_sources
            .iter()
            .filter_map(|(name, source)| match name {
                WakeupSourceOrigin::WakeLock(_) => {
                    if source.active_since > zx::MonotonicInstant::ZERO {
                        Some(name.clone())
                    } else {
                        None
                    }
                }
                _ => None,
            })
            .collect()
    }

    /// Returns the origins of all wake locks that are not currently held.
    pub fn inactive_wake_locks(&self) -> Vec<WakeupSourceOrigin> {
        self.wakeup_sources
            .iter()
            .filter_map(|(name, source)| match name {
                WakeupSourceOrigin::WakeLock(_) => {
                    if source.active_since == zx::MonotonicInstant::ZERO {
                        Some(name.clone())
                    } else {
                        None
                    }
                }
                _ => None,
            })
            .collect()
    }

    /// Updates the wake lock event pair observed by the runner: `USER_0`
    /// means no active wakeup sources, `EVENT_SIGNALED` means at least one
    /// is held.
    fn signal_wake_events(&mut self) {
        let (clear_mask, set_mask) = if self.active_wakeup_source_count == 0 {
            (zx::Signals::EVENT_SIGNALED, zx::Signals::USER_0)
        } else {
            (zx::Signals::USER_0, zx::Signals::EVENT_SIGNALED)
        };
        self.active_lock_writer.signal_peer(clear_mask, set_mask).expect("Failed to signal peer");
    }

    fn update_suspend_stats<UpdateFn>(&mut self, update: UpdateFn)
    where
        UpdateFn: FnOnce(&mut SuspendStats),
    {
        update(&mut self.suspend_stats);
    }

    /// Appends `event` to the ring buffer, evicting the oldest event if the
    /// buffer is full.
    fn add_suspend_event(&mut self, event: SuspendEvent) {
        if self.suspend_events.len() >= INSPECT_RING_BUFFER_CAPACITY {
            self.suspend_events.pop_front();
        }
        self.suspend_events.push_back(event);
    }

    fn record_suspend_events(&self, node: &inspect::Node) {
        let events_node = node.create_child("suspend_events");
        for (i, event) in self.suspend_events.iter().enumerate() {
            let child = events_node.create_child(i.to_string());
            match event {
                SuspendEvent::Attempt { time, state } => {
                    child.record_int(fobs::SUSPEND_ATTEMPTED_AT, time.into_nanos());
                    child.record_string(fobs::SUSPEND_REQUESTED_STATE, state);
                }
                SuspendEvent::Resume { time, reason } => {
                    child.record_int(fobs::SUSPEND_RESUMED_AT, time.into_nanos());
                    child.record_string(fobs::SUSPEND_RESUME_REASON, reason);
                }
                SuspendEvent::Fail { time, wakeup_sources } => {
                    child.record_int(fobs::SUSPEND_FAILED_AT, time.into_nanos());
                    if let Some(names) = wakeup_sources {
                        let names_array =
                            child.create_string_array(fobs::WAKEUP_SOURCES_NAME, names.len());
                        for (i, name) in names.iter().enumerate() {
                            names_array.set(i, name);
                        }
                        child.record(names_array);
                    }
                }
            }
            events_node.record(child);
        }
        node.record(events_node);
    }

    fn record_wakeup_sources(&self, node: &inspect::Node) {
        let wakeup_node = node.create_child("wakeup_sources");
        for (name, source) in self.wakeup_sources.iter() {
            let child = wakeup_node.create_child(name.to_string());
            child.record_uint("active_count", source.active_count);
            child.record_uint("event_count", source.event_count);
            child.record_uint("wakeup_count", source.wakeup_count);
            child.record_uint("expire_count", source.expire_count);
            child.record_int("active_since (ns)", source.active_since.into_nanos());
            child.record_int("total_time (ms)", source.total_time.into_millis());
            child.record_int("max_time (ms)", source.max_time.into_millis());
            child.record_int("last_change (ns)", source.last_change.into_nanos());
            wakeup_node.record(child);
        }
        node.record(wakeup_node);
    }
}

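/// Shared handle to the kernel's [`SuspendResumeManager`].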
pub type SuspendResumeManagerHandle = Arc<SuspendResumeManager>;

impl Default for SuspendResumeManager {
    fn default() -> Self {
        let message_counters: Arc<Mutex<HashSet<WeakKey<OwnedMessageCounter>>>> =
            Default::default();
        let message_counters_clone = message_counters.clone();
        let root = inspect::component::inspector().root();
        // Lazily publish the live message counters to inspect, pruning any
        // counters whose owners have gone away.
        root.record_lazy_values("message_counters", move || {
            let message_counters_clone = message_counters_clone.clone();
            async move {
                let inspector = fuchsia_inspect::Inspector::default();
                let root = inspector.root();
                let mut message_counters = message_counters_clone.lock();
                message_counters.retain(|c| c.0.upgrade().is_some());
                let message_counters_inspect =
                    root.create_string_array("message_counters", message_counters.len());
                for (i, c) in message_counters.iter().enumerate() {
                    let counter = c.0.upgrade().expect("dead counters should have been pruned");
                    message_counters_inspect.set(i, counter.to_string());
                }
                root.record(message_counters_inspect);
                Ok(inspector)
            }
            .boxed()
        });
        let inner = Arc::new(Mutex::new(SuspendResumeManagerInner::default()));
        let inner_clone = inner.clone();
        // Lazily publish suspend events and wakeup source statistics.
        root.record_lazy_child("wakeup_sources", move || {
            let inner = inner_clone.clone();
            async move {
                let inspector = fuchsia_inspect::Inspector::default();
                let root = inspector.root();
                let state = inner.lock();

                state.record_suspend_events(root);
                state.record_wakeup_sources(root);

                Ok(inspector)
            }
            .boxed()
        });
        Self { message_counters, inner, ebpf_suspend_lock: Default::default() }
    }
}

impl SuspendResumeManager {
    /// Locks and returns the inner state of the manager.
    pub fn lock(&self) -> MutexGuard<'_, SuspendResumeManagerInner> {
        self.inner.lock()
    }

    /// Takes the lease handed off by the parent via
    /// `fuchsia.session.power/Handoff` and drops it. If the protocol is
    /// unavailable, logs a warning and continues.
    pub fn init(
        self: &SuspendResumeManagerHandle,
        system_task: &CurrentTask,
    ) -> Result<(), anyhow::Error> {
        let handoff = system_task
            .kernel()
            .connect_to_protocol_at_container_svc::<fpower::HandoffMarker>()?
            .into_sync_proxy();
        match handoff.take(zx::MonotonicInstant::INFINITE) {
            Ok(parent_lease) => {
                let parent_lease = parent_lease
                    .map_err(|e| anyhow!("Failed to take lessor and lease from parent: {e:?}"))?;
                drop(parent_lease)
            }
            Err(e) => {
                if e.is_closed() {
                    log_warn!(
                        "Failed to send the fuchsia.session.power/Handoff.Take request. Assuming no Handoff protocol exists and moving on..."
                    );
                } else {
                    return Err(e).context("Handoff::Take");
                }
            }
        }
        Ok(())
    }

    /// Activates the wakeup source identified by `origin`, creating it if
    /// needed. Returns true if the source went from inactive to active.
    pub fn activate_wakeup_source(&self, origin: WakeupSourceOrigin) -> bool {
        let mut state = self.lock();
        let did_activate = {
            let entry = state.wakeup_sources.entry(origin).or_default();
            let now = zx::MonotonicInstant::get();
            entry.active_count += 1;
            entry.event_count += 1;
            entry.last_change = now;
            if entry.active_since == zx::MonotonicInstant::ZERO {
                entry.active_since = now;
                true
            } else {
                false
            }
        };
        if did_activate {
            state.active_wakeup_source_count += 1;
            state.signal_wake_events();
        }
        did_activate
    }

    /// Deactivates the wakeup source identified by `origin`. Returns true if
    /// the source was active.
    pub fn deactivate_wakeup_source(&self, origin: &WakeupSourceOrigin) -> bool {
        self.remove_wakeup_source(origin, false)
    }

    /// Deactivates the wakeup source identified by `origin` because its
    /// timeout expired. Returns true if the source was active.
    pub fn timeout_wakeup_source(&self, origin: &WakeupSourceOrigin) -> bool {
        self.remove_wakeup_source(origin, true)
    }

    fn remove_wakeup_source(&self, origin: &WakeupSourceOrigin, timed_out: bool) -> bool {
        let mut state = self.lock();
        let removed = match state.wakeup_sources.get_mut(origin) {
            Some(entry) if entry.active_since != zx::MonotonicInstant::ZERO => {
                if timed_out {
                    entry.expire_count += 1;
                }

                let now = zx::MonotonicInstant::get();
                let duration = now - entry.active_since;
                entry.total_time += duration;
                entry.max_time = std::cmp::max(duration, entry.max_time);
                entry.last_change = now;
                entry.active_since = zx::MonotonicInstant::ZERO;

                true
            }
            _ => false,
        };
        if removed {
            state.active_wakeup_source_count -= 1;
            state.total_wakeup_source_event_count += 1;
            state.signal_wake_events();
        }
        removed
    }

    /// Creates and registers a new message counter named `name`, optionally
    /// backed by the Zircon `counter`. Dead counters are pruned on each call.
    pub fn add_message_counter(
        &self,
        name: &str,
        counter: Option<zx::Counter>,
    ) -> OwnedMessageCounterHandle {
        let container_counter = OwnedMessageCounter::new(name, counter);
        let mut message_counters = self.message_counters.lock();
        message_counters.insert(WeakKey::from(&container_counter));
        message_counters.retain(|c| c.0.upgrade().is_some());
        container_counter
    }

    /// Returns true if any registered message counter is non-zero, i.e. some
    /// proxied message has not yet been handled.
    pub fn has_nonzero_message_counter(&self) -> bool {
        self.message_counters.lock().iter().any(|c| {
            let Some(c) = c.0.upgrade() else {
                return false;
            };
            c.counter.as_ref().and_then(|counter| counter.read().ok()).map_or(false, |v| v != 0)
        })
    }

    /// Returns a duplicate of the reader end of the wake lock event pair,
    /// which the runner observes during suspension.
    pub fn duplicate_lock_event(&self) -> zx::EventPair {
        let state = self.lock();
        state
            .active_lock_reader
            .duplicate_handle(zx::Rights::SAME_RIGHTS)
            .expect("Failed to duplicate handle")
    }

    /// Returns a copy of the current suspend statistics.
    pub fn suspend_stats(&self) -> SuspendStats {
        self.lock().suspend_stats.clone()
    }

    /// Returns the total number of wakeup events: completed wakeup source
    /// deactivations plus successful suspends.
    pub fn total_wakeup_events(&self) -> u64 {
        let state = self.lock();
        state.total_wakeup_source_event_count + state.suspend_stats.success_count
    }

    /// Returns the current value of the `sync_on_suspend` flag.
    pub fn sync_on_suspend_enabled(&self) -> bool {
        self.lock().sync_on_suspend_enabled
    }

    /// Sets the `sync_on_suspend` flag.
    pub fn set_sync_on_suspend(&self, enable: bool) {
        self.lock().sync_on_suspend_enabled = enable;
    }

    /// Returns the set of suspend states the container supports.
    pub fn suspend_states(&self) -> HashSet<SuspendState> {
        HashSet::from([SuspendState::Idle])
    }

    /// Attempts to suspend the container via the Starnix runner, recording
    /// the attempt and its outcome in the suspend statistics and events.
    pub fn suspend(
        &self,
        locked: &mut Locked<FileOpsCore>,
        suspend_state: SuspendState,
    ) -> Result<(), Errno> {
        let suspend_start_time = zx::BootInstant::get();
        let mut state = self.lock();
        state.add_suspend_event(SuspendEvent::Attempt {
            time: suspend_start_time,
            state: suspend_state.to_string(),
        });

        if !state.can_suspend() {
            self.report_failed_suspension(state, "kernel wake lock");
            return error!(EINVAL);
        }

        // A wait with an INFINITE_PAST deadline acts as a poll: it succeeds
        // only if one of the source's signals is already asserted.
        let external_wake_source_abort = state.external_wake_sources.values().find_map(|source| {
            if source.handle.wait_one(source.signals, zx::MonotonicInstant::INFINITE_PAST).is_ok() {
                Some(source.name.clone())
            } else {
                None
            }
        });

        if let Some(name) = external_wake_source_abort {
            self.report_failed_suspension(state, &format!("external wake source: {}", name));
            return error!(EINVAL);
        }

        // Release the state lock before the blocking suspend call so that
        // wakeup sources can still be manipulated while suspended.
        std::mem::drop(state);

        // Taking the write lock waits for any in-flight eBPF operations
        // holding read guards to complete before suspending.
        let _ebpf_lock = self.ebpf_suspend_lock.write(locked);

        let manager = connect_to_protocol_sync::<frunner::ManagerMarker>()
            .expect("Failed to connect to manager");
        fuchsia_trace::duration!("power", "suspend_container:fidl");

        let container_job = Some(
            fuchsia_runtime::job_default()
                .duplicate(zx::Rights::SAME_RIGHTS)
                .expect("Failed to dup handle"),
        );
        let wake_lock_event = Some(self.duplicate_lock_event());

        log_info!("Requesting container suspension.");
        match manager.suspend_container(
            frunner::ManagerSuspendContainerRequest {
                container_job,
                wake_locks: wake_lock_event,
                ..Default::default()
            },
            zx::Instant::INFINITE,
        ) {
            Ok(Ok(res)) => {
                self.report_container_resumed(suspend_start_time, res);
            }
            e => {
                let state = self.lock();
                self.report_failed_suspension(state, &format!("runner error {:?}", e));
                return error!(EINVAL);
            }
        }
        Ok(())
    }

    /// Records a successful suspend/resume cycle in the stats, events, and
    /// trace.
    fn report_container_resumed(
        &self,
        suspend_start_time: zx::BootInstant,
        res: frunner::ManagerSuspendContainerResponse,
    ) {
        let wake_time = zx::BootInstant::get();
        // Prefix the reason with a placeholder IRQ number ("0") so it matches
        // the "<irq> <name>" format Linux uses for resume reasons.
        let resume_reason = res.resume_reason.clone().map(|s| format!("0 {}", s));
        log_info!("Resuming from container suspension: {:?}", resume_reason);
        let mut state = self.lock();
        state.update_suspend_stats(|suspend_stats| {
            suspend_stats.success_count += 1;
            suspend_stats.last_time_in_suspend_operations = (wake_time - suspend_start_time).into();
            suspend_stats.last_time_in_sleep =
                zx::BootDuration::from_nanos(res.suspend_time.unwrap_or(0));
            suspend_stats.last_resume_reason = resume_reason.clone();
        });
        state.add_suspend_event(SuspendEvent::Resume {
            time: wake_time,
            reason: resume_reason.unwrap_or_default(),
        });
        fuchsia_trace::instant!("power", "suspend_container:done", fuchsia_trace::Scope::Process);
    }

    /// Records a failed suspend attempt: bumps the failure stats, increments
    /// `wakeup_count` on every active wakeup source, and logs the sources
    /// that blocked suspension.
    fn report_failed_suspension(
        &self,
        mut state: MutexGuard<'_, SuspendResumeManagerInner>,
        failure_reason: &str,
    ) {
        let wake_time = zx::BootInstant::get();
        state.update_suspend_stats(|suspend_stats| {
            suspend_stats.fail_count += 1;
            suspend_stats.last_failed_errno = Some(errno!(EINVAL));
            suspend_stats.last_resume_reason = None;
        });

        let wakeup_sources: Vec<String> = state
            .wakeup_sources
            .iter_mut()
            .filter_map(|(origin, source)| {
                if source.active_since > zx::MonotonicInstant::ZERO {
                    source.wakeup_count += 1;
                    Some(origin.to_string())
                } else {
                    None
                }
            })
            .collect();
        let last_resume_reason = format!("Abort: {}", wakeup_sources.join(" "));
        state.update_suspend_stats(|suspend_stats| {
            suspend_stats.last_resume_reason = Some(last_resume_reason);
        });

        log_warn!(
            "Suspend failed due to {:?}. Here are the active wakeup sources: {:?}",
            failure_reason,
            wakeup_sources,
        );
        state.add_suspend_event(SuspendEvent::Fail {
            time: wake_time,
            wakeup_sources: Some(wakeup_sources),
        });
        fuchsia_trace::instant!("power", "suspend_container:error", fuchsia_trace::Scope::Process);
    }

    /// Acquires a read guard that blocks suspension until it is dropped.
    /// Used to keep the container awake for the duration of an eBPF
    /// operation.
    pub fn acquire_ebpf_suspend_lock<'a, L>(
        &'a self,
        locked: &'a mut Locked<L>,
    ) -> EbpfSuspendGuard<'a>
    where
        L: LockBefore<EbpfSuspendLock>,
    {
        self.ebpf_suspend_lock.read(locked)
    }
}

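/// Callback invoked when a wake event arrives. `baton_lease` is the handle
/// that keeps the container awake while the event is being processed.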
pub trait OnWakeOps: Send + Sync {
    fn on_wake(&self, current_task: &CurrentTask, baton_lease: &zx::NullableHandle);
}

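/// Sets up a proxy channel pair through the Starnix runner so that messages
/// arriving on `remote_channel` can wake the container from suspension.
///
/// Returns the local endpoint of the proxied channel and a counter tracking
/// unhandled messages. The counter starts at zero; it is incremented as
/// messages are proxied and should be decremented (see
/// `mark_proxy_message_handled`) once the kernel has handled each message.
/// A non-zero counter keeps the container from suspending.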
pub fn create_proxy_for_wake_events_counter_zero(
    remote_channel: zx::Channel,
    name: String,
) -> (zx::Channel, zx::Counter) {
    let (local_proxy, kernel_channel) = zx::Channel::create();
    let counter = zx::Counter::create();

    let local_counter =
        counter.duplicate_handle(zx::Rights::SAME_RIGHTS).expect("Failed to duplicate counter");

    let manager = fuchsia_component::client::connect_to_protocol_sync::<frunner::ManagerMarker>()
        .expect("Failed to connect to manager");
    manager
        .proxy_wake_channel(frunner::ManagerProxyWakeChannelRequest {
            container_job: Some(
                fuchsia_runtime::job_default()
                    .duplicate(zx::Rights::SAME_RIGHTS)
                    .expect("Failed to dup handle"),
            ),
            container_channel: Some(kernel_channel),
            remote_channel: Some(remote_channel),
            counter: Some(counter),
            name: Some(name),
            ..Default::default()
        })
        .expect("Failed to create proxy");

    (local_proxy, local_counter)
}

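/// Like `create_proxy_for_wake_events_counter_zero`, but the returned counter
/// starts at one, treating proxy creation itself as a pending message. The
/// caller must mark that message handled before the container can suspend.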
pub fn create_proxy_for_wake_events_counter(
    remote_channel: zx::Channel,
    name: String,
) -> (zx::Channel, zx::Counter) {
    let (proxy, counter) = create_proxy_for_wake_events_counter_zero(remote_channel, name);

    counter.add(1).expect("Failed to add to counter");

    (proxy, counter)
}

771
772pub fn mark_proxy_message_handled(counter: &zx::Counter) {
777 counter.add(-1).expect("Failed to decrement counter");
778}
779
780pub fn mark_all_proxy_messages_handled(counter: &zx::Counter) {
782 counter.write(0).expect("Failed to decrement counter");
783}
784
785pub fn create_watcher_for_wake_events(watcher: zx::EventPair) {
789 let manager = fuchsia_component::client::connect_to_protocol_sync::<frunner::ManagerMarker>()
790 .expect("failed");
791 manager
792 .register_wake_watcher(
793 frunner::ManagerRegisterWakeWatcherRequest {
794 watcher: Some(watcher),
795 ..Default::default()
796 },
797 zx::Instant::INFINITE,
798 )
799 .expect("Failed to register wake watcher");
800}
801
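/// A weak, shareable reference to an [`OwnedMessageCounter`].
///
/// Dropping a `SharedMessageCounter` marks one message handled on the
/// underlying counter (if the owner is still alive), so holding one keeps
/// the container awake until a piece of work completes.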
#[derive(Debug)]
pub struct SharedMessageCounter(Weak<OwnedMessageCounter>);

impl Drop for SharedMessageCounter {
    fn drop(&mut self) {
        if let Some(message_counter) = self.0.upgrade() {
            message_counter.mark_handled();
        }
    }
}

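/// A named message counter that owns the kernel's end of a wake proxy's
/// counter. When dropped, it resets the counter to zero so a discarded proxy
/// cannot keep the container awake indefinitely.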
pub struct OwnedMessageCounter {
    /// Human-readable name, included in the inspect output.
    name: String,
    /// The Zircon counter tracking unhandled messages, if any.
    counter: Option<zx::Counter>,
}
pub type OwnedMessageCounterHandle = Arc<OwnedMessageCounter>;

impl Drop for OwnedMessageCounter {
    fn drop(&mut self) {
        // The counter is going away, so consider all of its messages handled.
        if let Some(counter) = self.counter.as_ref() {
            mark_all_proxy_messages_handled(counter);
        }
    }
}

impl OwnedMessageCounter {
    pub fn new(name: &str, counter: Option<zx::Counter>) -> OwnedMessageCounterHandle {
        Arc::new(Self { name: name.to_string(), counter })
    }

    /// Marks one pending message as handled by decrementing the counter.
    pub fn mark_handled(&self) {
        if let Some(counter) = self.counter.as_ref() {
            mark_proxy_message_handled(counter);
        }
    }

    /// Records a new pending message by incrementing the counter.
    pub fn mark_pending(&self) {
        if let Some(counter) = self.counter.as_ref() {
            counter.add(1).expect("Failed to increment counter");
        }
    }

    /// Creates a [`SharedMessageCounter`] referring to this counter. If
    /// `new_pending_message` is true, a pending message is recorded up front;
    /// dropping the returned share marks it handled.
    pub fn share(
        self: &OwnedMessageCounterHandle,
        new_pending_message: bool,
    ) -> SharedMessageCounter {
        if new_pending_message {
            self.mark_pending();
        }
        SharedMessageCounter(Arc::downgrade(self))
    }
}

impl fmt::Display for OwnedMessageCounter {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "Counter({}): {:?}", self.name, self.counter.as_ref().map(|c| c.read()))
    }
}

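/// A FIDL proxy paired with the message counter of the wake proxy it speaks
/// through. Making calls via [`ContainerWakingProxy::call`] keeps the
/// message accounting consistent with the runner's view of the channel.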
pub struct ContainerWakingProxy<P: Proxy> {
    counter: OwnedMessageCounterHandle,
    proxy: P,
}

impl<P: Proxy> ContainerWakingProxy<P> {
    pub fn new(counter: OwnedMessageCounterHandle, proxy: P) -> Self {
        Self { counter, proxy }
    }

    /// Issues a call on the wrapped proxy and marks one pending message as
    /// handled.
    pub fn call<T, F, R>(&self, future: F) -> R
    where
        F: FnOnce(&P) -> R,
        R: Future<Output = T>,
    {
        let f = future(&self.proxy);
        // Mark the message handled only after the outgoing request has been
        // created; otherwise the counter could briefly reach zero and allow
        // the container to suspend before the new request is in flight.
        self.counter.mark_handled();
        f
    }
}

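/// A request stream paired with the message counter of the wake proxy that
/// feeds it. Pulling messages via [`ContainerWakingStream::next`] keeps the
/// message accounting consistent with the runner's view of the channel.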
pub struct ContainerWakingStream<S: FusedStream + Unpin> {
    counter: OwnedMessageCounterHandle,
    stream: S,
}

impl<S: FusedStream + Unpin> ContainerWakingStream<S> {
    pub fn new(counter: OwnedMessageCounterHandle, stream: S) -> Self {
        Self { counter, stream }
    }

    /// Returns a future for the next stream message, marking one pending
    /// message as handled.
    pub fn next(&mut self) -> Next<'_, S> {
        // Check for termination before requesting the next message: once the
        // stream has terminated there are no further messages to account for
        // on the counter.
        let is_terminated = self.stream.is_terminated();
        let next = self.stream.next();
        if !is_terminated {
            self.counter.mark_handled();
        }
        next
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use diagnostics_assertions::assert_data_tree;
    use fidl::endpoints::create_proxy_and_stream;
    use fidl_test_placeholders::{EchoMarker, EchoRequest};
    use futures::StreamExt;
    use zx::{self, HandleBased};
    use {fuchsia_async as fasync, fuchsia_inspect as inspect};

    #[::fuchsia::test]
    fn test_counter_zero_initialization() {
        let (_endpoint, endpoint) = zx::Channel::create();
        let (_channel, counter) =
            super::create_proxy_for_wake_events_counter_zero(endpoint, "test".into());
        assert_eq!(counter.read(), Ok(0));
    }

    #[::fuchsia::test]
    fn test_counter_initialization() {
        let (_endpoint, endpoint) = zx::Channel::create();
        let (_channel, counter) =
            super::create_proxy_for_wake_events_counter(endpoint, "test".into());
        assert_eq!(counter.read(), Ok(1));
    }

    #[::fuchsia::test]
    async fn test_container_waking_proxy() {
        let (proxy, mut stream) = create_proxy_and_stream::<EchoMarker>();
        let server_task = fasync::Task::spawn(async move {
            let request = stream.next().await.unwrap().unwrap();
            match request {
                EchoRequest::EchoString { value, responder } => {
                    responder.send(value.as_deref()).unwrap();
                }
            }
        });

        let counter = zx::Counter::create();
        counter.add(5).unwrap();
        assert_eq!(counter.read(), Ok(5));

        let waking_proxy = ContainerWakingProxy {
            counter: OwnedMessageCounter::new(
                "test_proxy",
                Some(counter.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap()),
            ),
            proxy,
        };

        let response_future = waking_proxy.call(|p| p.echo_string(Some("hello")));

        // Issuing the call marks one message handled.
        assert_eq!(counter.read(), Ok(4));

        let response = response_future.await.unwrap();
        assert_eq!(response.as_deref(), Some("hello"));

        server_task.await;

        assert_eq!(counter.read(), Ok(4));
        // Dropping the proxy resets the counter to zero.
        drop(waking_proxy);
        assert_eq!(counter.read(), Ok(0));
    }

    #[::fuchsia::test]
    async fn test_container_waking_stream() {
        let (proxy, stream) = create_proxy_and_stream::<EchoMarker>();
        let client_task = fasync::Task::spawn(async move {
            let response = proxy.echo_string(Some("hello")).await.unwrap();
            assert_eq!(response.as_deref(), Some("hello"));
        });

        let counter = zx::Counter::create();
        counter.add(5).unwrap();
        assert_eq!(counter.read(), Ok(5));

        let mut waking_stream = ContainerWakingStream {
            counter: OwnedMessageCounter::new(
                "test_stream",
                Some(counter.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap()),
            ),
            stream,
        };

        let request_future = waking_stream.next();

        // Requesting the next message marks one message handled.
        assert_eq!(counter.read(), Ok(4));

        let request = request_future.await.unwrap().unwrap();
        match request {
            EchoRequest::EchoString { value, responder } => {
                assert_eq!(value.as_deref(), Some("hello"));
                responder.send(value.as_deref()).unwrap();
            }
        }

        client_task.await;

        assert_eq!(counter.read(), Ok(4));
        // Dropping the stream resets the counter to zero.
        drop(waking_stream);
        assert_eq!(counter.read(), Ok(0));
    }

    #[::fuchsia::test]
    async fn test_message_counters_inspect() {
        let power_manager = SuspendResumeManager::default();
        let inspector = inspect::component::inspector();

        let zx_counter = zx::Counter::create();
        let counter_handle = power_manager.add_message_counter(
            "test_counter",
            Some(zx_counter.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap()),
        );

        zx_counter.add(1).unwrap();

        assert_data_tree!(inspector, root: contains {
            message_counters: vec!["Counter(test_counter): Some(Ok(1))"],
        });

        zx_counter.add(1).unwrap();
        assert_data_tree!(inspector, root: contains {
            message_counters: vec!["Counter(test_counter): Some(Ok(2))"],
        });

        // Dropping the handle removes the counter from the inspect output.
        drop(counter_handle);
        assert_data_tree!(inspector, root: contains {
            message_counters: Vec::<String>::new(),
        });
    }

    #[::fuchsia::test]
    fn test_shared_message_counter() {
        let zx_counter = zx::Counter::create();
        let owned_counter = OwnedMessageCounter::new(
            "test_shared_counter",
            Some(zx_counter.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap()),
        );
        zx_counter.add(5).unwrap();
        assert_eq!(zx_counter.read(), Ok(5));

        // Sharing without a new pending message leaves the counter unchanged.
        let shared_counter = owned_counter.share(false);
        assert_eq!(zx_counter.read(), Ok(5));

        // Dropping the share marks one message handled.
        drop(shared_counter);
        assert_eq!(zx_counter.read(), Ok(4));

        // Sharing with a new pending message increments the counter.
        let shared_counter_2 = owned_counter.share(true);
        assert_eq!(zx_counter.read(), Ok(5));

        drop(shared_counter_2);
        assert_eq!(zx_counter.read(), Ok(4));

        let shared_counter_3 = owned_counter.share(false);
        assert_eq!(zx_counter.read(), Ok(4));

        // Dropping the owner resets the counter to zero.
        drop(owned_counter);
        assert_eq!(zx_counter.read(), Ok(0));

        // A share that outlives its owner has no effect when dropped.
        drop(shared_counter_3);
        assert_eq!(zx_counter.read(), Ok(0));
    }

    #[::fuchsia::test]
    async fn test_container_waking_event_termination() {
        let stream = futures::stream::iter(vec![0]).fuse();
        let counter = zx::Counter::create();
        counter.add(2).unwrap();
        assert_eq!(counter.read(), Ok(2));
        let mut waking_stream = ContainerWakingStream {
            counter: OwnedMessageCounter::new(
                "test_stream",
                Some(counter.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap()),
            ),
            stream,
        };

        assert_eq!(waking_stream.next().await, Some(0));
        assert_eq!(counter.read(), Ok(1));

        // The call that first observes the end of the stream still decrements
        // the counter; once the stream is terminated, further calls do not.
        assert_eq!(waking_stream.next().await, None);
        assert_eq!(waking_stream.next().await, None);
        assert_eq!(counter.read(), Ok(0));
    }

    #[::fuchsia::test]
    fn test_external_wake_source_aborts_suspend() {
        let manager = SuspendResumeManager::default();
        let event = zx::Event::create();
        let signals = zx::Signals::USER_0;

        // Registration requires a connection to the Starnix runner, which is
        // not available in every test environment.
        let res = manager.add_external_wake_source(
            event.duplicate(zx::Rights::SAME_RIGHTS).unwrap().into_handle(),
            signals,
            "test_external".to_string(),
        );

        if res.is_err() {
            println!(
                "Skipping test_external_wake_source_aborts_suspend because runner connection failed: {:?}",
                res
            );
            return;
        }

        // Assert the signals that would abort a suspend attempt.
        event.signal(zx::Signals::empty(), signals).unwrap();

        let state = manager.lock();
        assert!(state.external_wake_sources.contains_key(&event.koid().unwrap()));
    }
}
1170}