use crate::power::{SuspendState, SuspendStats};
use crate::task::CurrentTask;
use crate::vfs::EpollKey;

use std::cmp::min;
use std::collections::{HashMap, HashSet, VecDeque};
use std::fmt;
use std::future::Future;
use std::sync::{Arc, Weak};

use anyhow::{Context, anyhow};
use fidl::endpoints::Proxy;
use fuchsia_component::client::connect_to_protocol_sync;
use fuchsia_inspect::ArrayProperty;
use futures::stream::{FusedStream, Next};
use futures::{FutureExt, StreamExt};
use itertools::Itertools;
use starnix_logging::{log_info, log_warn};
use starnix_sync::{
    EbpfSuspendLock, FileOpsCore, LockBefore, Locked, Mutex, MutexGuard, OrderedRwLock,
    RwLockReadGuard,
};
use starnix_task_command::TaskCommand;
use starnix_uapi::arc_key::WeakKey;
use starnix_uapi::errors::Errno;
use starnix_uapi::{errno, error};
use zx::{HandleBased, Peered};
use {
    fidl_fuchsia_power_observability as fobs, fidl_fuchsia_session_power as fpower,
    fidl_fuchsia_starnix_runner as frunner, fuchsia_inspect as inspect,
};

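/// The maximum number of past durations retained per epoll for inspect reporting.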
const MAX_PAST_DURATIONS_COUNT: usize = 10;

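/// Bookkeeping for an epoll wait that can keep the container awake, reported through inspect.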
#[derive(Debug, Hash)]
struct EpollInfo {
    /// The command of the task that registered this epoll.
    command: TaskCommand,
    /// When this epoll became active, if it is currently active.
    added_timestamp: Option<zx::BootInstant>,
    /// Durations of previous active periods, most recent first.
    past_durations: VecDeque<zx::BootDuration>,
}

impl EpollInfo {
    fn new(
        command: TaskCommand,
        added_timestamp: zx::BootInstant,
        past_durations: Option<VecDeque<zx::BootDuration>>,
    ) -> Self {
        Self {
            command,
            added_timestamp: Some(added_timestamp),
            past_durations: past_durations
                .unwrap_or_else(|| VecDeque::with_capacity(MAX_PAST_DURATIONS_COUNT)),
        }
    }

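    /// Marks this epoll as no longer active, folding the elapsed active time into
    /// `past_durations`.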
    fn update_lifecycle(self, end_timestamp: zx::BootInstant) -> Self {
        let past_durations = if let Some(start_timestamp) = self.added_timestamp {
            let duration = end_timestamp - start_timestamp;
            let mut past_durations = self.past_durations;
            past_durations.push_front(duration);
            past_durations.into_iter().take(MAX_PAST_DURATIONS_COUNT).collect()
        } else {
            VecDeque::with_capacity(MAX_PAST_DURATIONS_COUNT)
        };
        Self { added_timestamp: None, past_durations, ..self }
    }

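    /// Records this epoll into `node`, including the in-progress duration (if active) and the
    /// retained past durations.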
    fn record_into(&self, node: &inspect::Node, stop_timestamp: zx::BootInstant) {
        let this_node = node.create_child(&self.command.to_string());
        if let Some(added_timestamp) = self.added_timestamp {
            this_node.record_int("created_timestamp_ns", added_timestamp.into_nanos());
            let duration = stop_timestamp - added_timestamp;
            this_node.record_int("this_duration_ns", duration.into_nanos());
        }
        let len = min(self.past_durations.len(), MAX_PAST_DURATIONS_COUNT);
        let past_durations_node = this_node.create_int_array("past_durations_ns", len);
        for (i, duration) in self.past_durations.iter().take(len).enumerate() {
            past_durations_node.set(i, duration.into_nanos());
        }
        this_node.record(past_durations_node);
        node.record(this_node);
    }
}

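/// Coordinates suspend/resume for the container: tracks wake locks, wake-relevant epolls, and
/// counters of unhandled proxied messages, and drives container suspension through the Starnix
/// runner.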
pub struct SuspendResumeManager {
    /// State tracking wake locks, epolls, suspend statistics, and suspend events.
    inner: Arc<Mutex<SuspendResumeManagerInner>>,

    /// Counters of not-yet-handled messages on channels proxied through the Starnix runner.
    message_counters: Arc<Mutex<HashSet<WeakKey<OwnedMessageCounter>>>>,

    /// Lock taken for writing while the container suspends; read guards handed out by
    /// `acquire_ebpf_suspend_lock` delay suspension while held.
    ebpf_suspend_lock: OrderedRwLock<(), EbpfSuspendLock>,
}

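/// State guarded by the `SuspendResumeManager` mutex.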
pub struct SuspendResumeManagerInner {
    /// Statistics about suspend attempts, successes, and failures.
    suspend_stats: SuspendStats,
    /// Whether sync-on-suspend is enabled.
    sync_on_suspend_enabled: bool,

    /// Ring buffer of recent suspend-related events, reported through inspect.
    suspend_events: VecDeque<SuspendEvent>,

    /// Wake locks that are currently held, keyed by name.
    active_locks: HashMap<String, LockSource>,
    /// Names of wake locks that were held in the past but are currently released.
    inactive_locks: HashSet<String>,

    /// Epolls that are currently keeping the container awake.
    active_epolls: HashMap<EpollKey, EpollInfo>,

    /// Epolls that previously kept the container awake, retained for inspect reporting.
    inactive_epolls: HashMap<EpollKey, EpollInfo>,

    /// The read half of the wake-lock event pair; duplicates of this handle are shared with the
    /// Starnix runner (see `duplicate_lock_event`).
    active_lock_reader: zx::EventPair,

    /// The write half of the wake-lock event pair; its peer is signaled whenever the set of
    /// active wake locks or epolls changes.
    active_lock_writer: zx::EventPair,
}

/// A guard that delays container suspension while it is held; see `acquire_ebpf_suspend_lock`.
pub type EbpfSuspendGuard<'a> = RwLockReadGuard<'a, ()>;

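/// Where a wake lock was acquired from.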
pub enum LockSource {
    WakeLockFile,
    ContainerPowerController,
}

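/// An event in the suspend/resume lifecycle, retained in a ring buffer and reported through
/// inspect.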
#[derive(Clone, Debug)]
pub enum SuspendEvent {
    Attempt {
        time: zx::BootInstant,
        state: String,
    },
    Resume {
        time: zx::BootInstant,
        reason: String,
    },
    Fail {
        time: zx::BootInstant,
        wake_locks: Option<Vec<String>>,
        epolls: Option<Vec<TaskCommand>>,
    },
}

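/// The number of recent suspend events retained for inspect reporting.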
const INSPECT_RING_BUFFER_CAPACITY: usize = 128;

/// The maximum number of wake lock names recorded in a single inspect snapshot.
const INSPECT_MAX_WAKE_LOCK_NAMES: usize = 64;
/// The maximum number of epolls recorded in a single inspect snapshot.
const INSPECT_MAX_EPOLLS: usize = 64;

impl Default for SuspendResumeManagerInner {
    fn default() -> Self {
        let (active_lock_reader, active_lock_writer) = zx::EventPair::create();
        Self {
            suspend_events: VecDeque::with_capacity(INSPECT_RING_BUFFER_CAPACITY),
            suspend_stats: Default::default(),
            sync_on_suspend_enabled: Default::default(),
            active_locks: Default::default(),
            inactive_locks: Default::default(),
            active_epolls: Default::default(),
            inactive_epolls: Default::default(),
            active_lock_reader,
            active_lock_writer,
        }
    }
}

impl SuspendResumeManagerInner {
    /// Returns true when no active wake locks or epolls are holding the container awake.
    fn can_suspend(&self) -> bool {
        self.active_locks.is_empty() && self.active_epolls.is_empty()
    }

    pub fn active_wake_locks(&self) -> Vec<String> {
        Vec::from_iter(self.active_locks.keys().cloned())
    }

    pub fn inactive_wake_locks(&self) -> Vec<String> {
        Vec::from_iter(self.inactive_locks.clone())
    }

    fn active_epolls(&self) -> Vec<TaskCommand> {
        Vec::from_iter(self.active_epolls.values().map(|e| &e.command).cloned())
    }

    fn update_suspend_stats<UpdateFn>(&mut self, update: UpdateFn)
    where
        UpdateFn: FnOnce(&mut SuspendStats),
    {
        update(&mut self.suspend_stats);
    }

    /// Signals the wake-lock event pair to reflect whether any wake locks or epolls are active.
    fn signal_wake_events(&mut self) {
        let (clear_mask, set_mask) =
            if self.active_locks.is_empty() && self.active_epolls.is_empty() {
                (zx::Signals::EVENT_SIGNALED, zx::Signals::empty())
            } else {
                (zx::Signals::empty(), zx::Signals::EVENT_SIGNALED)
            };
        self.active_lock_writer.signal_peer(clear_mask, set_mask).expect("Failed to signal peer");
    }

    fn record_active_locks(&self, node: &inspect::Node) {
        let active_locks = &self.active_locks;
        let len = min(active_locks.len(), INSPECT_MAX_WAKE_LOCK_NAMES);
        let active_wake_locks = node.create_string_array(fobs::ACTIVE_WAKE_LOCK_NAMES, len);
        for (i, name) in active_locks.keys().sorted().take(len).enumerate() {
            if let Some(src) = active_locks.get(name) {
                active_wake_locks.set(i, format!("{} (source {})", name, src));
            }
        }
        node.record(active_wake_locks);
    }

    fn record_inactive_locks(&self, node: &inspect::Node) {
        let inactive_locks = &self.inactive_locks;
        let len = min(inactive_locks.len(), INSPECT_MAX_WAKE_LOCK_NAMES);
        let inactive_wake_locks = node.create_string_array(fobs::INACTIVE_WAKE_LOCK_NAMES, len);
        for (i, name) in inactive_locks.iter().sorted().take(len).enumerate() {
            inactive_wake_locks.set(i, name);
        }
        node.record(inactive_wake_locks);
    }

    fn record_active_epolls(&self, node: &inspect::Node) {
        let active_epolls = &self.active_epolls;
        let len = min(active_epolls.len(), INSPECT_MAX_EPOLLS);
        let active_epolls_node = node.create_child(fobs::ACTIVE_EPOLLS);
        let now = zx::BootInstant::get();
        for epoll_info in active_epolls.values().take(len) {
            epoll_info.record_into(&active_epolls_node, now);
        }
        node.record(active_epolls_node);
    }

    fn record_inactive_epolls(&self, node: &inspect::Node) {
        let inactive_epolls = &self.inactive_epolls;
        let len = min(inactive_epolls.len(), INSPECT_MAX_EPOLLS);
        let inactive_epolls_node = node.create_child(fobs::INACTIVE_EPOLLS);
        let now = zx::BootInstant::get();
        for epoll in inactive_epolls.values().take(len) {
            epoll.record_into(&inactive_epolls_node, now);
        }
        node.record(inactive_epolls_node);
    }

    /// Appends a suspend event, evicting the oldest event once the ring buffer is full.
    fn add_suspend_event(&mut self, event: SuspendEvent) {
        if self.suspend_events.len() >= INSPECT_RING_BUFFER_CAPACITY {
            self.suspend_events.pop_front();
        }
        self.suspend_events.push_back(event);
    }

    fn record_suspend_events(&self, node: &inspect::Node) {
        let events_node = node.create_child("suspend_events");
        for (i, event) in self.suspend_events.iter().enumerate() {
            let child = events_node.create_child(i.to_string());
            match event {
                SuspendEvent::Attempt { time, state } => {
                    child.record_int(fobs::SUSPEND_ATTEMPTED_AT, time.into_nanos());
                    child.record_string(fobs::SUSPEND_REQUESTED_STATE, state);
                }
                SuspendEvent::Resume { time, reason } => {
                    child.record_int(fobs::SUSPEND_RESUMED_AT, time.into_nanos());
                    child.record_string(fobs::SUSPEND_RESUME_REASON, reason);
                }
                SuspendEvent::Fail { time, wake_locks, epolls } => {
                    child.record_int(fobs::SUSPEND_FAILED_AT, time.into_nanos());
                    if let Some(names) = wake_locks {
                        let names_array =
                            child.create_string_array(fobs::ACTIVE_WAKE_LOCK_NAMES, names.len());
                        for (i, name) in names.iter().enumerate() {
                            names_array.set(i, name);
                        }
                        child.record(names_array);
                    }
                    if let Some(epolls) = epolls {
                        let epolls_array =
                            child.create_string_array(fobs::ACTIVE_EPOLLS, epolls.len());
                        for (i, command) in epolls.iter().enumerate() {
                            epolls_array.set(i, command.to_string());
                        }
                        child.record(epolls_array);
                    }
                }
            }
            events_node.record(child);
        }
        node.record(events_node);
    }
}

pub type SuspendResumeManagerHandle = Arc<SuspendResumeManager>;

impl Default for SuspendResumeManager {
    fn default() -> Self {
        let message_counters: Arc<Mutex<HashSet<WeakKey<OwnedMessageCounter>>>> =
            Default::default();
        let message_counters_clone = message_counters.clone();
        let root = inspect::component::inspector().root();
        root.record_lazy_values("message_counters", move || {
            let message_counters_clone = message_counters_clone.clone();
            async move {
                let inspector = fuchsia_inspect::Inspector::default();
                let root = inspector.root();
                let mut message_counters = message_counters_clone.lock();
                message_counters.retain(|c| c.0.upgrade().is_some());
                let message_counters_inspect =
                    root.create_string_array("message_counters", message_counters.len());
                for (i, c) in message_counters.iter().enumerate() {
                    let counter = c.0.upgrade().expect("dangling counters were removed above");
                    message_counters_inspect.set(i, counter.to_string());
                }
                root.record(message_counters_inspect);
                Ok(inspector)
            }
            .boxed()
        });
        let inner = Arc::new(Mutex::new(SuspendResumeManagerInner::default()));
        let inner_clone = inner.clone();
        root.record_lazy_child("wake_locks", move || {
            let inner = inner_clone.clone();
            async move {
                let inspector = fuchsia_inspect::Inspector::default();
                let root = inspector.root();
                let state = inner.lock();

                state.record_active_locks(root);
                state.record_inactive_locks(root);
                state.record_active_epolls(root);
                state.record_inactive_epolls(root);
                state.record_suspend_events(root);

                Ok(inspector)
            }
            .boxed()
        });
        Self { message_counters, inner, ebpf_suspend_lock: Default::default() }
    }
}

impl SuspendResumeManager {
    pub fn lock(&self) -> MutexGuard<'_, SuspendResumeManagerInner> {
        self.inner.lock()
    }

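    /// Completes the `fuchsia.session.power/Handoff` exchange with the parent session, dropping
    /// any lease that was handed off. If the protocol is unavailable, a warning is logged and
    /// startup continues.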
    pub fn init(
        self: &SuspendResumeManagerHandle,
        system_task: &CurrentTask,
    ) -> Result<(), anyhow::Error> {
        let handoff = system_task
            .kernel()
            .connect_to_protocol_at_container_svc::<fpower::HandoffMarker>()?
            .into_sync_proxy();
        match handoff.take(zx::MonotonicInstant::INFINITE) {
            Ok(parent_lease) => {
                let parent_lease = parent_lease
                    .map_err(|e| anyhow!("Failed to take lessor and lease from parent: {e:?}"))?;
                drop(parent_lease)
            }
            Err(e) => {
                if e.is_closed() {
                    log_warn!(
                        "Failed to send the fuchsia.session.power/Handoff.Take request. Assuming no Handoff protocol exists and moving on..."
                    );
                } else {
                    return Err(e).context("Handoff::Take");
                }
            }
        }
        Ok(())
    }

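    /// Adds a wake lock named `name`, preventing suspension until it is removed. Returns true
    /// if the lock was not already held.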
    pub fn add_lock(&self, name: &str, src: LockSource) -> bool {
        let mut state = self.lock();
        let res = state.active_locks.insert(String::from(name), src);
        state.signal_wake_events();
        res.is_none()
    }

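    /// Removes the wake lock named `name`. Returns true if the lock was held.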
    pub fn remove_lock(&self, name: &str) -> bool {
        let mut state = self.lock();
        let res = state.active_locks.remove(name);
        if res.is_none() {
            return false;
        }

        state.inactive_locks.insert(String::from(name));
        state.signal_wake_events();
        true
    }

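    /// Registers the epoll identified by `key` as keeping the container awake until it is
    /// removed.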
    pub fn add_epoll(&self, current_task: &CurrentTask, key: EpollKey) {
        let mut state = self.lock();
        let past_durations = state.inactive_epolls.remove(&key).map(|info| info.past_durations);
        let epoll_info = state.active_epolls.remove(&key).unwrap_or_else(|| {
            EpollInfo::new(current_task.command(), zx::BootInstant::get(), past_durations)
        });
        state.active_epolls.insert(key, epoll_info);
        state.signal_wake_events();
    }

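    /// Marks the epoll identified by `key` as no longer keeping the container awake.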
    pub fn remove_epoll(&self, key: EpollKey) {
        let mut state = self.lock();
        let epoll_info = state.active_epolls.remove(&key);
        if let Some(epoll_info) = epoll_info {
            state.inactive_epolls.insert(key, epoll_info.update_lifecycle(zx::BootInstant::get()));
        }
        state.signal_wake_events();
    }

    pub fn add_message_counter(
        &self,
        name: &str,
        counter: Option<zx::Counter>,
    ) -> OwnedMessageCounterHandle {
        let container_counter = OwnedMessageCounter::new(name, counter);
        let mut message_counters = self.message_counters.lock();
        message_counters.insert(WeakKey::from(&container_counter));
        message_counters.retain(|c| c.0.upgrade().is_some());
        container_counter
    }

    pub fn has_nonzero_message_counter(&self) -> bool {
        self.message_counters.lock().iter().any(|c| {
            let Some(c) = c.0.upgrade() else {
                return false;
            };
            c.counter.as_ref().and_then(|counter| counter.read().ok()).map_or(false, |v| v != 0)
        })
    }

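    /// Returns a duplicate of the event observed by the Starnix runner to tell whether any wake
    /// locks or epolls are currently active.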
    pub fn duplicate_lock_event(&self) -> zx::EventPair {
        let state = self.lock();
        state
            .active_lock_reader
            .duplicate_handle(zx::Rights::SAME_RIGHTS)
            .expect("Failed to duplicate handle")
    }

    pub fn suspend_stats(&self) -> SuspendStats {
        self.lock().suspend_stats.clone()
    }

    pub fn sync_on_suspend_enabled(&self) -> bool {
        self.lock().sync_on_suspend_enabled
    }

    pub fn set_sync_on_suspend(&self, enable: bool) {
        self.lock().sync_on_suspend_enabled = enable;
    }

    pub fn suspend_states(&self) -> HashSet<SuspendState> {
        HashSet::from([SuspendState::Idle])
    }

    pub fn suspend(
        &self,
        locked: &mut Locked<FileOpsCore>,
        suspend_state: SuspendState,
    ) -> Result<(), Errno> {
        let suspend_start_time = zx::BootInstant::get();
        let mut state = self.lock();
        state.add_suspend_event(SuspendEvent::Attempt {
            time: suspend_start_time,
            state: suspend_state.to_string(),
        });

        if !state.can_suspend() {
            self.report_failed_suspension(state, "kernel wake lock");
            return error!(EINVAL);
        }

        // Release the state lock before issuing the potentially long-running suspend request.
        std::mem::drop(state);

        // Take the eBPF suspend lock for writing, waiting for any outstanding read guards.
        let _ebpf_lock = self.ebpf_suspend_lock.write(locked);

        let manager = connect_to_protocol_sync::<frunner::ManagerMarker>()
            .expect("Failed to connect to manager");
        fuchsia_trace::duration!("power", "suspend_container:fidl");

        let container_job = Some(
            fuchsia_runtime::job_default()
                .duplicate(zx::Rights::SAME_RIGHTS)
                .expect("Failed to dup handle"),
        );
        let wake_lock_event = Some(self.duplicate_lock_event());

        log_info!("Requesting container suspension.");
        match manager.suspend_container(
            frunner::ManagerSuspendContainerRequest {
                container_job,
                wake_locks: wake_lock_event,
                ..Default::default()
            },
            zx::Instant::INFINITE,
        ) {
            Ok(Ok(res)) => {
                self.report_container_resumed(suspend_start_time, res);
            }
            e => {
                let state = self.lock();
                self.report_failed_suspension(state, &format!("runner error {:?}", e));
                return error!(EINVAL);
            }
        }
        Ok(())
    }

    fn report_container_resumed(
        &self,
        suspend_start_time: zx::BootInstant,
        res: frunner::ManagerSuspendContainerResponse,
    ) {
        let wake_time = zx::BootInstant::get();
        let resume_reason = res.resume_reason.clone().map(|s| format!("0 {}", s));
        log_info!("Resuming from container suspension: {:?}", resume_reason);
        let mut state = self.lock();
        state.update_suspend_stats(|suspend_stats| {
            suspend_stats.success_count += 1;
            suspend_stats.last_time_in_suspend_operations = (wake_time - suspend_start_time).into();
            suspend_stats.last_time_in_sleep =
                zx::BootDuration::from_nanos(res.suspend_time.unwrap_or(0));
            suspend_stats.last_resume_reason = resume_reason.clone();
        });
        state.add_suspend_event(SuspendEvent::Resume {
            time: wake_time,
            reason: resume_reason.unwrap_or_default(),
        });
        fuchsia_trace::instant!("power", "suspend_container:done", fuchsia_trace::Scope::Process);
    }

    fn report_failed_suspension(
        &self,
        mut state: MutexGuard<'_, SuspendResumeManagerInner>,
        failure_reason: &str,
    ) {
        let wake_time = zx::BootInstant::get();
        state.update_suspend_stats(|suspend_stats| {
            suspend_stats.fail_count += 1;
            suspend_stats.last_failed_errno = Some(errno!(EINVAL));
            suspend_stats.last_resume_reason = None;
        });

        let wake_lock_names = state.active_wake_locks();
        let epoll_names = state.active_epolls();
        let last_resume_reason =
            format!("Abort: {}", wake_lock_names.join(" ") + &epoll_names.iter().join(" "));
        state.update_suspend_stats(|suspend_stats| {
            suspend_stats.last_resume_reason = Some(last_resume_reason);
        });

        log_warn!(
            "Suspend failed due to {:?}. Active wake locks: {:?}, active epolls: {:?}",
            failure_reason,
            wake_lock_names,
            epoll_names
        );
        state.add_suspend_event(SuspendEvent::Fail {
            time: wake_time,
            wake_locks: Some(wake_lock_names),
            epolls: Some(epoll_names),
        });
        fuchsia_trace::instant!("power", "suspend_container:error", fuchsia_trace::Scope::Process);
    }

    /// Acquires a read guard on the eBPF suspend lock; container suspension waits until all such
    /// guards are released.
    pub fn acquire_ebpf_suspend_lock<'a, L>(
        &'a self,
        locked: &'a mut Locked<L>,
    ) -> EbpfSuspendGuard<'a>
    where
        L: LockBefore<EbpfSuspendLock>,
    {
        self.ebpf_suspend_lock.read(locked)
    }
}

impl fmt::Display for LockSource {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            LockSource::WakeLockFile => write!(f, "wake lock file"),
            LockSource::ContainerPowerController => write!(f, "container power controller"),
        }
    }
}

pub trait OnWakeOps: Send + Sync {
    fn on_wake(&self, current_task: &CurrentTask, baton_lease: &zx::NullableHandle);
}

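/// Creates a proxy between `remote_channel` and a newly created channel whose local end is
/// returned to the caller. The proxy is run by the Starnix runner, so messages arriving on the
/// proxied channel can wake the container while it is suspended.
///
/// Also returns a duplicate of the `zx::Counter` that is registered with the runner. The counter
/// starts at zero and is intended to track messages that have not yet been handled; use
/// `mark_proxy_message_handled` and `mark_all_proxy_messages_handled` to update it.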
pub fn create_proxy_for_wake_events_counter_zero(
    remote_channel: zx::Channel,
    name: String,
) -> (zx::Channel, zx::Counter) {
    let (local_proxy, kernel_channel) = zx::Channel::create();
    let counter = zx::Counter::create();

    let local_counter =
        counter.duplicate_handle(zx::Rights::SAME_RIGHTS).expect("Failed to duplicate counter");

    let manager = fuchsia_component::client::connect_to_protocol_sync::<frunner::ManagerMarker>()
        .expect("Failed to connect to the Starnix runner");
    manager
        .proxy_wake_channel(frunner::ManagerProxyWakeChannelRequest {
            container_job: Some(
                fuchsia_runtime::job_default()
                    .duplicate(zx::Rights::SAME_RIGHTS)
                    .expect("Failed to dup handle"),
            ),
            container_channel: Some(kernel_channel),
            remote_channel: Some(remote_channel),
            counter: Some(counter),
            name: Some(name),
            ..Default::default()
        })
        .expect("Failed to create proxy");

    (local_proxy, local_counter)
}

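/// Like `create_proxy_for_wake_events_counter_zero`, but the returned counter starts at 1,
/// recording one unhandled message up front.
///
/// A sketch of the expected usage (the channel and name below are illustrative):
///
/// ```ignore
/// let (proxy, counter) = create_proxy_for_wake_events_counter(remote_channel, "input".to_string());
/// // ... read and process a message from `proxy` ...
/// mark_proxy_message_handled(&counter);
/// ```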
pub fn create_proxy_for_wake_events_counter(
    remote_channel: zx::Channel,
    name: String,
) -> (zx::Channel, zx::Counter) {
    let (proxy, counter) = create_proxy_for_wake_events_counter_zero(remote_channel, name);

    counter.add(1).expect("Failed to add to counter");

    (proxy, counter)
}

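/// Marks one message on the proxied channel as handled by decrementing its counter.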
pub fn mark_proxy_message_handled(counter: &zx::Counter) {
    counter.add(-1).expect("Failed to decrement counter");
}

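/// Marks all messages on the proxied channel as handled by resetting its counter to zero.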
pub fn mark_all_proxy_messages_handled(counter: &zx::Counter) {
    counter.write(0).expect("Failed to clear counter");
}

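/// Registers `watcher` with the Starnix runner, which signals it as the container's wake state
/// changes.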
pub fn create_watcher_for_wake_events(watcher: zx::EventPair) {
    let manager = fuchsia_component::client::connect_to_protocol_sync::<frunner::ManagerMarker>()
        .expect("Failed to connect to the Starnix runner");
    manager
        .register_wake_watcher(
            frunner::ManagerRegisterWakeWatcherRequest {
                watcher: Some(watcher),
                ..Default::default()
            },
            zx::Instant::INFINITE,
        )
        .expect("Failed to register wake watcher");
}

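/// A weak handle to an `OwnedMessageCounter` that marks one pending message as handled when it
/// is dropped, provided the owning counter is still alive.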
#[derive(Debug)]
pub struct SharedMessageCounter(Weak<OwnedMessageCounter>);

impl Drop for SharedMessageCounter {
    fn drop(&mut self) {
        if let Some(message_counter) = self.0.upgrade() {
            message_counter.mark_handled();
        }
    }
}

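/// A named counter of unhandled messages for a channel proxied through the Starnix runner. The
/// underlying `zx::Counter` is reset to zero when this is dropped.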
pub struct OwnedMessageCounter {
    name: String,
    counter: Option<zx::Counter>,
}

pub type OwnedMessageCounterHandle = Arc<OwnedMessageCounter>;

impl Drop for OwnedMessageCounter {
    fn drop(&mut self) {
        self.counter.as_ref().map(mark_all_proxy_messages_handled);
    }
}

impl OwnedMessageCounter {
    pub fn new(name: &str, counter: Option<zx::Counter>) -> OwnedMessageCounterHandle {
        Arc::new(Self { name: name.to_string(), counter })
    }

    /// Marks one pending message as handled.
    pub fn mark_handled(&self) {
        self.counter.as_ref().map(mark_proxy_message_handled);
    }

    /// Records one additional pending message.
    pub fn mark_pending(&self) {
        self.counter.as_ref().map(|c| c.add(1).expect("Failed to increment counter"));
    }

    /// Returns a `SharedMessageCounter` that marks one message as handled when dropped. If
    /// `new_pending_message` is true, a pending message is recorded first.
    pub fn share(
        self: &OwnedMessageCounterHandle,
        new_pending_message: bool,
    ) -> SharedMessageCounter {
        if new_pending_message {
            self.mark_pending();
        }
        SharedMessageCounter(Arc::downgrade(self))
    }
}

impl fmt::Display for OwnedMessageCounter {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "Counter({}): {:?}", self.name, self.counter.as_ref().map(|c| c.read()))
    }
}

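/// A FIDL proxy paired with an `OwnedMessageCounter`. Each call made through `call` marks one
/// pending message on the counter as handled.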
pub struct ContainerWakingProxy<P: Proxy> {
    counter: OwnedMessageCounterHandle,
    proxy: P,
}

impl<P: Proxy> ContainerWakingProxy<P> {
    pub fn new(counter: OwnedMessageCounterHandle, proxy: P) -> Self {
        Self { counter, proxy }
    }

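    /// Invokes `future` with the wrapped proxy and marks one pending message as handled once the
    /// request has been constructed. The returned future still has to be awaited by the caller.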
    pub fn call<T, F, R>(&self, future: F) -> R
    where
        F: FnOnce(&P) -> R,
        R: Future<Output = T>,
    {
        let f = future(&self.proxy);
        // Mark one pending message as handled now that the request has been constructed;
        // awaiting the response is left to the caller.
        self.counter.mark_handled();
        f
    }
}

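/// A request stream paired with an `OwnedMessageCounter`. Each message pulled through `next`
/// marks one pending message on the counter as handled.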
pub struct ContainerWakingStream<S: FusedStream + Unpin> {
    counter: OwnedMessageCounterHandle,
    stream: S,
}

impl<S: FusedStream + Unpin> ContainerWakingStream<S> {
    pub fn new(counter: OwnedMessageCounterHandle, stream: S) -> Self {
        Self { counter, stream }
    }

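    /// Returns the next item from the wrapped stream, marking one pending message as handled
    /// unless the stream has already terminated.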
    pub fn next(&mut self) -> Next<'_, S> {
        let is_terminated = self.stream.is_terminated();
        let next = self.stream.next();
        // A terminated stream can no longer yield messages, so there is nothing to mark handled.
        if !is_terminated {
            self.counter.mark_handled();
        }
        next
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use diagnostics_assertions::assert_data_tree;
    use fidl::endpoints::create_proxy_and_stream;
    use fidl_test_placeholders::{EchoMarker, EchoRequest};
    use futures::StreamExt;
    use zx::{self, HandleBased};
    use {fuchsia_async as fasync, fuchsia_inspect as inspect};

    #[::fuchsia::test]
    fn test_counter_zero_initialization() {
        let (_endpoint, endpoint) = zx::Channel::create();
        let (_channel, counter) =
            super::create_proxy_for_wake_events_counter_zero(endpoint, "test".into());
        assert_eq!(counter.read(), Ok(0));
    }

    #[::fuchsia::test]
    fn test_counter_initialization() {
        let (_endpoint, endpoint) = zx::Channel::create();
        let (_channel, counter) =
            super::create_proxy_for_wake_events_counter(endpoint, "test".into());
        assert_eq!(counter.read(), Ok(1));
    }

    #[::fuchsia::test]
    async fn test_container_waking_proxy() {
        let (proxy, mut stream) = create_proxy_and_stream::<EchoMarker>();
        let server_task = fasync::Task::spawn(async move {
            let request = stream.next().await.unwrap().unwrap();
            match request {
                EchoRequest::EchoString { value, responder } => {
                    responder.send(value.as_deref()).unwrap();
                }
            }
        });

        let counter = zx::Counter::create();
        counter.add(5).unwrap();
        assert_eq!(counter.read(), Ok(5));

        let waking_proxy = ContainerWakingProxy {
            counter: OwnedMessageCounter::new(
                "test_proxy",
                Some(counter.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap()),
            ),
            proxy,
        };

        let response_future = waking_proxy.call(|p| p.echo_string(Some("hello")));

        // Issuing the call marks one pending message as handled.
        assert_eq!(counter.read(), Ok(4));

        let response = response_future.await.unwrap();
        assert_eq!(response.as_deref(), Some("hello"));

        server_task.await;

        // Dropping the proxy clears the remaining pending messages.
        assert_eq!(counter.read(), Ok(4));
        drop(waking_proxy);
        assert_eq!(counter.read(), Ok(0));
    }

    #[::fuchsia::test]
    async fn test_container_waking_stream() {
        let (proxy, stream) = create_proxy_and_stream::<EchoMarker>();
        let client_task = fasync::Task::spawn(async move {
            let response = proxy.echo_string(Some("hello")).await.unwrap();
            assert_eq!(response.as_deref(), Some("hello"));
        });

        let counter = zx::Counter::create();
        counter.add(5).unwrap();
        assert_eq!(counter.read(), Ok(5));

        let mut waking_stream = ContainerWakingStream {
            counter: OwnedMessageCounter::new(
                "test_stream",
                Some(counter.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap()),
            ),
            stream,
        };

        let request_future = waking_stream.next();

        // Pulling a message from the stream marks one pending message as handled.
        assert_eq!(counter.read(), Ok(4));

        let request = request_future.await.unwrap().unwrap();
        match request {
            EchoRequest::EchoString { value, responder } => {
                assert_eq!(value.as_deref(), Some("hello"));
                responder.send(value.as_deref()).unwrap();
            }
        }

        client_task.await;

        // Dropping the stream clears the remaining pending messages.
        assert_eq!(counter.read(), Ok(4));
        drop(waking_stream);
        assert_eq!(counter.read(), Ok(0));
    }

    #[::fuchsia::test]
    async fn test_message_counters_inspect() {
        let power_manager = SuspendResumeManager::default();
        let inspector = inspect::component::inspector();

        let zx_counter = zx::Counter::create();
        let counter_handle = power_manager.add_message_counter(
            "test_counter",
            Some(zx_counter.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap()),
        );

        zx_counter.add(1).unwrap();

        assert_data_tree!(inspector, root: contains {
            message_counters: vec!["Counter(test_counter): Some(Ok(1))"],
        });

        zx_counter.add(1).unwrap();
        assert_data_tree!(inspector, root: contains {
            message_counters: vec!["Counter(test_counter): Some(Ok(2))"],
        });

        drop(counter_handle);
        assert_data_tree!(inspector, root: contains {
            message_counters: Vec::<String>::new(),
        });
    }

    #[::fuchsia::test]
    fn test_shared_message_counter() {
        let zx_counter = zx::Counter::create();
        let owned_counter = OwnedMessageCounter::new(
            "test_shared_counter",
            Some(zx_counter.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap()),
        );
        zx_counter.add(5).unwrap();
        assert_eq!(zx_counter.read(), Ok(5));

        // Sharing without a new pending message leaves the counter unchanged.
        let shared_counter = owned_counter.share(false);
        assert_eq!(zx_counter.read(), Ok(5));

        // Dropping the shared counter marks one message as handled.
        drop(shared_counter);
        assert_eq!(zx_counter.read(), Ok(4));

        // Sharing with a new pending message increments the counter.
        let shared_counter_2 = owned_counter.share(true);
        assert_eq!(zx_counter.read(), Ok(5));

        drop(shared_counter_2);
        assert_eq!(zx_counter.read(), Ok(4));

        let shared_counter_3 = owned_counter.share(false);
        assert_eq!(zx_counter.read(), Ok(4));

        // Dropping the owned counter clears all pending messages.
        drop(owned_counter);
        assert_eq!(zx_counter.read(), Ok(0));

        // A shared counter that outlives its owner has no effect when dropped.
        drop(shared_counter_3);
        assert_eq!(zx_counter.read(), Ok(0));
    }

    #[::fuchsia::test]
    async fn test_container_waking_event_termination() {
        let stream = futures::stream::iter(vec![0]).fuse();
        let counter = zx::Counter::create();
        counter.add(2).unwrap();
        assert_eq!(counter.read(), Ok(2));
        let mut waking_stream = ContainerWakingStream {
            counter: OwnedMessageCounter::new(
                "test_stream",
                Some(counter.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap()),
            ),
            stream,
        };

        assert_eq!(waking_stream.next().await, Some(0));
        assert_eq!(counter.read(), Ok(1));

        // The call that observes the end of the stream still decrements the counter; calls made
        // after the stream has terminated do not.
        assert_eq!(waking_stream.next().await, None);
        assert_eq!(waking_stream.next().await, None);
        assert_eq!(counter.read(), Ok(0));
    }
}