use crate::power::{SuspendState, SuspendStats};
use crate::task::CurrentTask;
use crate::vfs::EpollKey;

use std::cmp::min;
use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::sync::{Arc, Weak};

use anyhow::{Context, anyhow};
use fidl::endpoints::Proxy;
use fuchsia_component::client::connect_to_protocol_sync;
use fuchsia_inspect::ArrayProperty;
use futures::stream::{FusedStream, Next};
use futures::{FutureExt, StreamExt};
use itertools::Itertools;
use starnix_logging::{log_info, log_warn};
use starnix_sync::{
    EbpfSuspendLock, FileOpsCore, LockBefore, Locked, Mutex, MutexGuard, OrderedRwLock,
    RwLockReadGuard,
};
use starnix_task_command::TaskCommand;
use starnix_uapi::arc_key::WeakKey;
use starnix_uapi::errors::Errno;
use starnix_uapi::{errno, error};
use std::collections::VecDeque;
use std::fmt;
use zx::{HandleBased, Peered};
use {
    fidl_fuchsia_power_observability as fobs, fidl_fuchsia_session_power as fpower,
    fidl_fuchsia_starnix_runner as frunner, fuchsia_inspect as inspect,
};

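/// Coordinates container suspend/resume: tracks wake locks and wake-source epolls,
/// message counters for wake-proxied channels, suspend statistics, and the inspect
/// data recorded for each suspend attempt.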
pub struct SuspendResumeManager {
    inner: Arc<Mutex<SuspendResumeManagerInner>>,

    message_counters: Arc<Mutex<HashSet<WeakKey<OwnedMessageCounter>>>>,

    ebpf_suspend_lock: OrderedRwLock<(), EbpfSuspendLock>,
}

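/// Mutable state behind `SuspendResumeManager::inner`: suspend statistics, the ring
/// buffer of suspend events, the active and inactive wake locks and epolls, and the
/// event pair used to signal the Starnix runner while wake locks are held.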
pub struct SuspendResumeManagerInner {
    suspend_stats: SuspendStats,
    sync_on_suspend_enabled: bool,

    suspend_events: VecDeque<SuspendEvent>,

    active_locks: HashMap<String, LockSource>,
    inactive_locks: HashSet<String>,

    active_epolls: HashMap<EpollKey, TaskCommand>,
    inactive_epolls: HashSet<TaskCommand>,

    active_lock_reader: zx::EventPair,

    active_lock_writer: zx::EventPair,
}

pub type EbpfSuspendGuard<'a> = RwLockReadGuard<'a, ()>;

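/// The origin of an active wake lock.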
pub enum LockSource {
    WakeLockFile,
    ContainerPowerController,
}

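/// A single entry in the suspend-event ring buffer exposed through inspect.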
#[derive(Clone, Debug)]
pub enum SuspendEvent {
    Attempt {
        time: zx::BootInstant,
        state: String,
    },
    Resume {
        time: zx::BootInstant,
        reason: String,
    },
    Fail {
        time: zx::BootInstant,
        wake_locks: Option<Vec<String>>,
        epolls: Option<Vec<TaskCommand>>,
    },
}

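/// Maximum number of suspend events retained in the inspect ring buffer.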
const INSPECT_RING_BUFFER_CAPACITY: usize = 128;

const INSPECT_MAX_WAKE_LOCK_NAMES: usize = 64;
const INSPECT_MAX_EPOLLS: usize = 64;

impl Default for SuspendResumeManagerInner {
    fn default() -> Self {
        let (active_lock_reader, active_lock_writer) = zx::EventPair::create();
        Self {
            suspend_events: VecDeque::with_capacity(INSPECT_RING_BUFFER_CAPACITY),
            suspend_stats: Default::default(),
            sync_on_suspend_enabled: Default::default(),
            active_locks: Default::default(),
            inactive_locks: Default::default(),
            active_epolls: Default::default(),
            inactive_epolls: Default::default(),
            active_lock_reader,
            active_lock_writer,
        }
    }
}

impl SuspendResumeManagerInner {
    pub fn active_wake_locks(&self) -> Vec<String> {
        Vec::from_iter(self.active_locks.keys().cloned())
    }

    pub fn inactive_wake_locks(&self) -> Vec<String> {
        Vec::from_iter(self.inactive_locks.clone())
    }

    fn active_epolls(&self) -> Vec<TaskCommand> {
        Vec::from_iter(self.active_epolls.values().cloned())
    }

    fn update_suspend_stats<UpdateFn>(&mut self, update: UpdateFn)
    where
        UpdateFn: FnOnce(&mut SuspendStats),
    {
        update(&mut self.suspend_stats);
    }

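    /// Signals the reader end of the wake-lock event pair while any wake lock or
    /// wake-source epoll is active, and clears the signal once none remain.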
    fn signal_wake_events(&mut self) {
        let (clear_mask, set_mask) =
            if self.active_locks.is_empty() && self.active_epolls.is_empty() {
                (zx::Signals::EVENT_SIGNALED, zx::Signals::empty())
            } else {
                (zx::Signals::empty(), zx::Signals::EVENT_SIGNALED)
            };
        self.active_lock_writer.signal_peer(clear_mask, set_mask).expect("Failed to signal peer");
    }

    fn record_active_locks(&self, node: &inspect::Node) {
        let active_locks = &self.active_locks;
        let len = min(active_locks.len(), INSPECT_MAX_WAKE_LOCK_NAMES);
        let active_wake_locks = node.create_string_array(fobs::ACTIVE_WAKE_LOCK_NAMES, len);
        for (i, name) in active_locks.keys().sorted().take(len).enumerate() {
            if let Some(src) = active_locks.get(name) {
                active_wake_locks.set(i, format!("{} (source {})", name, src));
            }
        }
        node.record(active_wake_locks);
    }

    fn record_inactive_locks(&self, node: &inspect::Node) {
        let inactive_locks = &self.inactive_locks;
        let len = min(inactive_locks.len(), INSPECT_MAX_WAKE_LOCK_NAMES);
        let inactive_wake_locks = node.create_string_array(fobs::INACTIVE_WAKE_LOCK_NAMES, len);
        for (i, name) in inactive_locks.iter().sorted().take(len).enumerate() {
            inactive_wake_locks.set(i, name);
        }
        node.record(inactive_wake_locks);
    }

    fn record_active_epolls(&self, node: &inspect::Node) {
        let active_epolls = &self.active_epolls;
        let len = min(active_epolls.len(), INSPECT_MAX_EPOLLS);
        let active_epolls_node = node.create_string_array(fobs::ACTIVE_EPOLLS, len);
        for (i, key) in active_epolls.keys().sorted().rev().take(len).enumerate() {
            if let Some(name) = active_epolls.get(key) {
                active_epolls_node.set(i, name.to_string());
            }
        }
        node.record(active_epolls_node);
    }

    fn record_inactive_epolls(&self, node: &inspect::Node) {
        let inactive_epolls = &self.inactive_epolls;
        let len = min(inactive_epolls.len(), INSPECT_MAX_EPOLLS);
        let inactive_epolls_node = node.create_string_array(fobs::INACTIVE_EPOLLS, len);
        for (i, name) in inactive_epolls.iter().sorted().take(len).enumerate() {
            inactive_epolls_node.set(i, name.to_string());
        }
        node.record(inactive_epolls_node);
    }

    fn add_suspend_event(&mut self, event: SuspendEvent) {
        if self.suspend_events.len() >= INSPECT_RING_BUFFER_CAPACITY {
            self.suspend_events.pop_front();
        }
        self.suspend_events.push_back(event);
    }

    fn record_suspend_events(&self, node: &inspect::Node) {
        let events_node = node.create_child("suspend_events");
        for (i, event) in self.suspend_events.iter().enumerate() {
            let child = events_node.create_child(i.to_string());
            match event {
                SuspendEvent::Attempt { time, state } => {
                    child.record_int(fobs::SUSPEND_ATTEMPTED_AT, time.into_nanos());
                    child.record_string(fobs::SUSPEND_REQUESTED_STATE, state);
                }
                SuspendEvent::Resume { time, reason } => {
                    child.record_int(fobs::SUSPEND_RESUMED_AT, time.into_nanos());
                    child.record_string(fobs::SUSPEND_RESUME_REASON, reason);
                }
                SuspendEvent::Fail { time, wake_locks, epolls } => {
                    child.record_int(fobs::SUSPEND_FAILED_AT, time.into_nanos());
                    if let Some(names) = wake_locks {
                        let names_array =
                            child.create_string_array(fobs::ACTIVE_WAKE_LOCK_NAMES, names.len());
                        for (i, name) in names.iter().enumerate() {
                            names_array.set(i, name);
                        }
                        child.record(names_array);
                    }
                    if let Some(epolls) = epolls {
                        let epolls_array =
                            child.create_string_array(fobs::ACTIVE_EPOLLS, epolls.len());
                        for (i, command) in epolls.iter().enumerate() {
                            epolls_array.set(i, command.to_string());
                        }
                        child.record(epolls_array);
                    }
                }
            }
            events_node.record(child);
        }
        node.record(events_node);
    }
}

pub type SuspendResumeManagerHandle = Arc<SuspendResumeManager>;

impl Default for SuspendResumeManager {
    fn default() -> Self {
        let message_counters: Arc<Mutex<HashSet<WeakKey<OwnedMessageCounter>>>> =
            Default::default();
        let message_counters_clone = message_counters.clone();
        let root = inspect::component::inspector().root();
        root.record_lazy_values("message_counters", move || {
            let message_counters_clone = message_counters_clone.clone();
            async move {
                let inspector = fuchsia_inspect::Inspector::default();
                let root = inspector.root();
                let mut message_counters = message_counters_clone.lock();
                message_counters.retain(|c| c.0.upgrade().is_some());
                let message_counters_inspect =
                    root.create_string_array("message_counters", message_counters.len());
                for (i, c) in message_counters.iter().enumerate() {
                    let counter = c.0.upgrade().expect("lost counter should be retained");
                    message_counters_inspect.set(i, counter.to_string());
                }
                root.record(message_counters_inspect);
                Ok(inspector)
            }
            .boxed()
        });
        let inner = Arc::new(Mutex::new(SuspendResumeManagerInner::default()));
        let inner_clone = inner.clone();
        root.record_lazy_child("wake_locks", move || {
            let inner = inner_clone.clone();
            async move {
                let inspector = fuchsia_inspect::Inspector::default();
                let root = inspector.root();
                let state = inner.lock();

                state.record_active_locks(root);
                state.record_inactive_locks(root);
                state.record_active_epolls(root);
                state.record_inactive_epolls(root);
                state.record_suspend_events(root);

                Ok(inspector)
            }
            .boxed()
        });
        Self { message_counters, inner, ebpf_suspend_lock: Default::default() }
    }
}

impl SuspendResumeManager {
    pub fn lock(&self) -> MutexGuard<'_, SuspendResumeManagerInner> {
        self.inner.lock()
    }

    pub fn init(
        self: &SuspendResumeManagerHandle,
        system_task: &CurrentTask,
    ) -> Result<(), anyhow::Error> {
        let handoff = system_task
            .kernel()
            .connect_to_protocol_at_container_svc::<fpower::HandoffMarker>()?
            .into_sync_proxy();
        match handoff.take(zx::MonotonicInstant::INFINITE) {
            Ok(parent_lease) => {
                let parent_lease = parent_lease
                    .map_err(|e| anyhow!("Failed to take lessor and lease from parent: {e:?}"))?;
                drop(parent_lease)
            }
            Err(e) => {
                if e.is_closed() {
                    log_warn!(
                        "Failed to send the fuchsia.session.power/Handoff.Take request. Assuming no Handoff protocol exists and moving on..."
                    );
                } else {
                    return Err(e).context("Handoff::Take");
                }
            }
        }
        Ok(())
    }

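    /// Registers an active wake lock under `name` and re-signals the wake-lock
    /// event pair. Returns `true` if the lock was not already held.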
    pub fn add_lock(&self, name: &str, src: LockSource) -> bool {
        let mut state = self.lock();
        let res = state.active_locks.insert(String::from(name), src);
        state.signal_wake_events();
        res.is_none()
    }

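    /// Releases the wake lock named `name`, moving it to the inactive set and
    /// re-signaling the wake-lock event pair. Returns `false` if no such active
    /// lock existed.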
    pub fn remove_lock(&self, name: &str) -> bool {
        let mut state = self.lock();
        let res = state.active_locks.remove(name);
        if res.is_none() {
            return false;
        }

        state.inactive_locks.insert(String::from(name));
        state.signal_wake_events();
        true
    }

    pub fn add_epoll(&self, current_task: &CurrentTask, key: EpollKey) {
        let mut state = self.lock();
        state.active_epolls.insert(key, current_task.command());
        state.signal_wake_events();
    }

    pub fn remove_epoll(&self, key: EpollKey) {
        let mut state = self.lock();
        let epoll = state.active_epolls.remove(&key);
        if let Some(epoll) = epoll {
            state.inactive_epolls.insert(epoll);
        }
        state.signal_wake_events();
    }

    pub fn add_message_counter(
        &self,
        name: &str,
        counter: Option<zx::Counter>,
    ) -> OwnedMessageCounterHandle {
        let container_counter = OwnedMessageCounter::new(name, counter);
        let mut message_counters = self.message_counters.lock();
        message_counters.insert(WeakKey::from(&container_counter));
        message_counters.retain(|c| c.0.upgrade().is_some());
        container_counter
    }

    pub fn has_nonzero_message_counter(&self) -> bool {
        self.message_counters.lock().iter().any(|c| {
            let Some(c) = c.0.upgrade() else {
                return false;
            };
            c.counter.as_ref().and_then(|counter| counter.read().ok()).map_or(false, |v| v != 0)
        })
    }

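    /// Returns a duplicate of the read end of the wake-lock event pair; `suspend`
    /// hands it to the Starnix runner so the runner can observe whether any wake
    /// locks are held.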
    pub fn duplicate_lock_event(&self) -> zx::EventPair {
        let state = self.lock();
        state
            .active_lock_reader
            .duplicate_handle(zx::Rights::SAME_RIGHTS)
            .expect("Failed to duplicate handle")
    }

    pub fn suspend_stats(&self) -> SuspendStats {
        self.lock().suspend_stats.clone()
    }

    pub fn sync_on_suspend_enabled(&self) -> bool {
        self.lock().sync_on_suspend_enabled
    }

    pub fn set_sync_on_suspend(&self, enable: bool) {
        self.lock().sync_on_suspend_enabled = enable;
    }

    pub fn suspend_states(&self) -> HashSet<SuspendState> {
        HashSet::from([SuspendState::Idle])
    }

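    /// Suspends the container by asking the Starnix runner to suspend the
    /// container job. Records a `SuspendEvent` for the attempt and for the
    /// resulting resume or failure, updates `SuspendStats`, and holds the eBPF
    /// suspend lock for the duration of the request.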
    pub fn suspend(
        &self,
        locked: &mut Locked<FileOpsCore>,
        state: SuspendState,
    ) -> Result<(), Errno> {
        let suspend_start_time = zx::BootInstant::get();

        self.lock().add_suspend_event(SuspendEvent::Attempt {
            time: suspend_start_time,
            state: state.to_string(),
        });

        let ebpf_lock = self.ebpf_suspend_lock.write(locked);

        let manager = connect_to_protocol_sync::<frunner::ManagerMarker>()
            .expect("Failed to connect to manager");
        fuchsia_trace::duration!("power", "suspend_container:fidl");
        log_info!("Asking runner to suspend container.");
        match manager.suspend_container(
            frunner::ManagerSuspendContainerRequest {
                container_job: Some(
                    fuchsia_runtime::job_default()
                        .duplicate(zx::Rights::SAME_RIGHTS)
                        .expect("Failed to dup handle"),
                ),
                wake_locks: Some(self.duplicate_lock_event()),
                ..Default::default()
            },
            zx::Instant::INFINITE,
        ) {
            Ok(Ok(res)) => {
                log_info!("Resuming from container suspension.");
                let wake_time = zx::BootInstant::get();
                let resume_reason = res.resume_reason;
                let mut state = self.lock();
                state.update_suspend_stats(|suspend_stats| {
                    suspend_stats.success_count += 1;
                    suspend_stats.last_time_in_suspend_operations =
                        (wake_time - suspend_start_time).into();
                    suspend_stats.last_time_in_sleep =
                        zx::BootDuration::from_nanos(res.suspend_time.unwrap_or(0));
                    suspend_stats.last_resume_reason =
                        resume_reason.clone().map(|s| format!("0 {}", s));
                });
                state.add_suspend_event(SuspendEvent::Resume {
                    time: wake_time,
                    reason: resume_reason.unwrap_or_default(),
                });
                fuchsia_trace::instant!(
                    "power",
                    "suspend_container:done",
                    fuchsia_trace::Scope::Process
                );
            }
            e => {
                let wake_time = zx::BootInstant::get();
                let mut state = self.lock();
                state.update_suspend_stats(|suspend_stats| {
                    suspend_stats.fail_count += 1;
                    suspend_stats.last_failed_errno = Some(errno!(EINVAL));
                    suspend_stats.last_resume_reason = None;
                });

                let (wake_lock_names, epoll_names) = match e {
                    Ok(Err(frunner::SuspendError::WakeLocksExist)) => {
                        let wake_lock_names = state.active_wake_locks();
                        let epoll_names = state.active_epolls();
                        let last_resume_reason = format!(
                            "Abort: {}",
                            wake_lock_names.join(" ") + &epoll_names.iter().join(" ")
                        );
                        state.update_suspend_stats(|suspend_stats| {
                            suspend_stats.last_resume_reason = Some(last_resume_reason);
                        });
                        (Some(wake_lock_names), Some(epoll_names))
                    }
                    _ => (None, None),
                };

                log_warn!(e:?; "Container suspension failed. wake locks: {:?}, epolls: {:?}", wake_lock_names, epoll_names);
                state.add_suspend_event(SuspendEvent::Fail {
                    time: wake_time,
                    wake_locks: wake_lock_names,
                    epolls: epoll_names,
                });
                fuchsia_trace::instant!(
                    "power",
                    "suspend_container:error",
                    fuchsia_trace::Scope::Process
                );
                return error!(EINVAL);
            }
        }

        std::mem::drop(ebpf_lock);

        Ok(())
    }

    pub fn acquire_ebpf_suspend_lock<'a, L>(
        &'a self,
        locked: &'a mut Locked<L>,
    ) -> EbpfSuspendGuard<'a>
    where
        L: LockBefore<EbpfSuspendLock>,
    {
        self.ebpf_suspend_lock.read(locked)
    }
}

impl fmt::Display for LockSource {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            LockSource::WakeLockFile => write!(f, "wake lock file"),
            LockSource::ContainerPowerController => write!(f, "container power controller"),
        }
    }
}

pub trait OnWakeOps: Send + Sync {
    fn on_wake(&self, current_task: &CurrentTask, baton_lease: &zx::NullableHandle);
}

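/// Registers `remote_channel` with the Starnix runner so messages on it are proxied
/// through the runner and can wake the container. Returns the local end of the
/// proxied channel along with a duplicate of the `zx::Counter` used to track
/// unhandled messages; the counter starts at zero.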
pub fn create_proxy_for_wake_events_counter_zero(
    remote_channel: zx::Channel,
    name: String,
) -> (zx::Channel, zx::Counter) {
    let (local_proxy, kernel_channel) = zx::Channel::create();
    let counter = zx::Counter::create();

    let local_counter =
        counter.duplicate_handle(zx::Rights::SAME_RIGHTS).expect("Failed to duplicate counter");

    let manager = fuchsia_component::client::connect_to_protocol_sync::<frunner::ManagerMarker>()
        .expect("Failed to connect to manager");
    manager
        .proxy_wake_channel(frunner::ManagerProxyWakeChannelRequest {
            container_job: Some(
                fuchsia_runtime::job_default()
                    .duplicate(zx::Rights::SAME_RIGHTS)
                    .expect("Failed to dup handle"),
            ),
            container_channel: Some(kernel_channel),
            remote_channel: Some(remote_channel),
            counter: Some(counter),
            name: Some(name),
            ..Default::default()
        })
        .expect("Failed to create proxy");

    (local_proxy, local_counter)
}

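/// Like `create_proxy_for_wake_events_counter_zero`, but increments the returned
/// counter to 1 before handing it back, so the channel starts with one message
/// counted as unhandled.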
pub fn create_proxy_for_wake_events_counter(
    remote_channel: zx::Channel,
    name: String,
) -> (zx::Channel, zx::Counter) {
    let (proxy, counter) = create_proxy_for_wake_events_counter_zero(remote_channel, name);

    counter.add(1).expect("Failed to add to counter");

    (proxy, counter)
}

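/// Decrements the message counter to record that one proxied message has been handled.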
pub fn mark_proxy_message_handled(counter: &zx::Counter) {
    counter.add(-1).expect("Failed to decrement counter");
}

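/// Resets the message counter to zero, marking all outstanding proxied messages as handled.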
pub fn mark_all_proxy_messages_handled(counter: &zx::Counter) {
    counter.write(0).expect("Failed to reset counter");
}

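/// Registers the `watcher` event pair with the Starnix runner via
/// `Manager.RegisterWakeWatcher`.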
pub fn create_watcher_for_wake_events(watcher: zx::EventPair) {
    let manager = fuchsia_component::client::connect_to_protocol_sync::<frunner::ManagerMarker>()
        .expect("Failed to connect to manager");
    manager
        .register_wake_watcher(
            frunner::ManagerRegisterWakeWatcherRequest {
                watcher: Some(watcher),
                ..Default::default()
            },
            zx::Instant::INFINITE,
        )
        .expect("Failed to register wake watcher");
}

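/// A weak reference to an `OwnedMessageCounter` that represents one pending message:
/// dropping it marks that message as handled if the owning counter is still alive.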
#[derive(Debug)]
pub struct SharedMessageCounter(Weak<OwnedMessageCounter>);

impl Drop for SharedMessageCounter {
    fn drop(&mut self) {
        if let Some(message_counter) = self.0.upgrade() {
            message_counter.mark_handled();
        }
    }
}

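/// A named wrapper around an optional `zx::Counter` that tracks unhandled proxied
/// messages. Dropping it resets the counter to zero.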
pub struct OwnedMessageCounter {
    name: String,
    counter: Option<zx::Counter>,
}
pub type OwnedMessageCounterHandle = Arc<OwnedMessageCounter>;

impl Drop for OwnedMessageCounter {
    fn drop(&mut self) {
        self.counter.as_ref().map(mark_all_proxy_messages_handled);
    }
}

impl OwnedMessageCounter {
    pub fn new(name: &str, counter: Option<zx::Counter>) -> OwnedMessageCounterHandle {
        Arc::new(Self { name: name.to_string(), counter })
    }

    pub fn mark_handled(&self) {
        self.counter.as_ref().map(mark_proxy_message_handled);
    }

    pub fn mark_pending(&self) {
        self.counter.as_ref().map(|c| c.add(1).expect("Failed to increment counter"));
    }

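    /// Creates a `SharedMessageCounter` pointing at this counter. When
    /// `new_pending_message` is true, the counter is incremented first so that the
    /// shared handle accounts for one additional pending message.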
    pub fn share(
        self: &OwnedMessageCounterHandle,
        new_pending_message: bool,
    ) -> SharedMessageCounter {
        if new_pending_message {
            self.mark_pending();
        }
        SharedMessageCounter(Arc::downgrade(self))
    }
}

impl fmt::Display for OwnedMessageCounter {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "Counter({}): {:?}", self.name, self.counter.as_ref().map(|c| c.read()))
    }
}

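/// Wraps a FIDL proxy together with a message counter so that making a call on the
/// proxy also marks one pending message as handled.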
pub struct ContainerWakingProxy<P: Proxy> {
    counter: OwnedMessageCounterHandle,
    proxy: P,
}

impl<P: Proxy> ContainerWakingProxy<P> {
    pub fn new(counter: OwnedMessageCounterHandle, proxy: P) -> Self {
        Self { counter, proxy }
    }

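    /// Invokes `future` on the wrapped proxy, then marks one pending message as
    /// handled before returning the resulting future.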
    pub fn call<T, F, R>(&self, future: F) -> R
    where
        F: FnOnce(&P) -> R,
        R: Future<Output = T>,
    {
        let f = future(&self.proxy);
        self.counter.mark_handled();
        f
    }
}

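/// Wraps a message stream together with a message counter so that pulling the next
/// message from the stream also marks one pending message as handled.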
pub struct ContainerWakingStream<S: FusedStream + Unpin> {
    counter: OwnedMessageCounterHandle,
    stream: S,
}

impl<S: FusedStream + Unpin> ContainerWakingStream<S> {
    pub fn new(counter: OwnedMessageCounterHandle, stream: S) -> Self {
        Self { counter, stream }
    }

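    /// Returns the stream's `next()` future, marking one pending message as handled
    /// unless the stream had already terminated.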
    pub fn next(&mut self) -> Next<'_, S> {
        let is_terminated = self.stream.is_terminated();
        let next = self.stream.next();
        if !is_terminated {
            self.counter.mark_handled();
        }
        next
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use diagnostics_assertions::assert_data_tree;
    use fidl::endpoints::create_proxy_and_stream;
    use fidl_test_placeholders::{EchoMarker, EchoRequest};
    use futures::StreamExt;
    use zx::{self, HandleBased};
    use {fuchsia_async as fasync, fuchsia_inspect as inspect};

    #[::fuchsia::test]
    fn test_counter_zero_initialization() {
        let (_endpoint, endpoint) = zx::Channel::create();
        let (_channel, counter) =
            super::create_proxy_for_wake_events_counter_zero(endpoint, "test".into());
        assert_eq!(counter.read(), Ok(0));
    }

    #[::fuchsia::test]
    fn test_counter_initialization() {
        let (_endpoint, endpoint) = zx::Channel::create();
        let (_channel, counter) =
            super::create_proxy_for_wake_events_counter(endpoint, "test".into());
        assert_eq!(counter.read(), Ok(1));
    }

    #[::fuchsia::test]
    async fn test_container_waking_proxy() {
        let (proxy, mut stream) = create_proxy_and_stream::<EchoMarker>();
        let server_task = fasync::Task::spawn(async move {
            let request = stream.next().await.unwrap().unwrap();
            match request {
                EchoRequest::EchoString { value, responder } => {
                    responder.send(value.as_deref()).unwrap();
                }
            }
        });

        let counter = zx::Counter::create();
        counter.add(5).unwrap();
        assert_eq!(counter.read(), Ok(5));

        let waking_proxy = ContainerWakingProxy {
            counter: OwnedMessageCounter::new(
                "test_proxy",
                Some(counter.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap()),
            ),
            proxy,
        };

        let response_future = waking_proxy.call(|p| p.echo_string(Some("hello")));

        assert_eq!(counter.read(), Ok(4));

        let response = response_future.await.unwrap();
        assert_eq!(response.as_deref(), Some("hello"));

        server_task.await;

        assert_eq!(counter.read(), Ok(4));
        drop(waking_proxy);
        assert_eq!(counter.read(), Ok(0));
    }

    #[::fuchsia::test]
    async fn test_container_waking_stream() {
        let (proxy, stream) = create_proxy_and_stream::<EchoMarker>();
        let client_task = fasync::Task::spawn(async move {
            let response = proxy.echo_string(Some("hello")).await.unwrap();
            assert_eq!(response.as_deref(), Some("hello"));
        });

        let counter = zx::Counter::create();
        counter.add(5).unwrap();
        assert_eq!(counter.read(), Ok(5));

        let mut waking_stream = ContainerWakingStream {
            counter: OwnedMessageCounter::new(
                "test_stream",
                Some(counter.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap()),
            ),
            stream,
        };

        let request_future = waking_stream.next();

        assert_eq!(counter.read(), Ok(4));

        let request = request_future.await.unwrap().unwrap();
        match request {
            EchoRequest::EchoString { value, responder } => {
                assert_eq!(value.as_deref(), Some("hello"));
                responder.send(value.as_deref()).unwrap();
            }
        }

        client_task.await;

        assert_eq!(counter.read(), Ok(4));
        drop(waking_stream);
        assert_eq!(counter.read(), Ok(0));
    }

    #[::fuchsia::test]
    async fn test_message_counters_inspect() {
        let power_manager = SuspendResumeManager::default();
        let inspector = inspect::component::inspector();

        let zx_counter = zx::Counter::create();
        let counter_handle = power_manager.add_message_counter(
            "test_counter",
            Some(zx_counter.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap()),
        );

        zx_counter.add(1).unwrap();

        assert_data_tree!(inspector, root: contains {
            message_counters: vec!["Counter(test_counter): Some(Ok(1))"],
        });

        zx_counter.add(1).unwrap();
        assert_data_tree!(inspector, root: contains {
            message_counters: vec!["Counter(test_counter): Some(Ok(2))"],
        });

        drop(counter_handle);
        assert_data_tree!(inspector, root: contains {
            message_counters: Vec::<String>::new(),
        });
    }

    #[::fuchsia::test]
    fn test_shared_message_counter() {
        let zx_counter = zx::Counter::create();
        let owned_counter = OwnedMessageCounter::new(
            "test_shared_counter",
            Some(zx_counter.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap()),
        );
        zx_counter.add(5).unwrap();
        assert_eq!(zx_counter.read(), Ok(5));

        let shared_counter = owned_counter.share(false);
        assert_eq!(zx_counter.read(), Ok(5));

        drop(shared_counter);
        assert_eq!(zx_counter.read(), Ok(4));

        let shared_counter_2 = owned_counter.share(true);
        assert_eq!(zx_counter.read(), Ok(5));

        drop(shared_counter_2);
        assert_eq!(zx_counter.read(), Ok(4));

        let shared_counter_3 = owned_counter.share(false);
        assert_eq!(zx_counter.read(), Ok(4));

        drop(owned_counter);
        assert_eq!(zx_counter.read(), Ok(0));

        drop(shared_counter_3);
        assert_eq!(zx_counter.read(), Ok(0));
    }

    #[::fuchsia::test]
    async fn test_container_waking_event_termination() {
        let stream = futures::stream::iter(vec![0]).fuse();
        let counter = zx::Counter::create();
        counter.add(2).unwrap();
        assert_eq!(counter.read(), Ok(2));
        let mut waking_stream = ContainerWakingStream {
            counter: OwnedMessageCounter::new(
                "test_stream",
                Some(counter.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap()),
            ),
            stream,
        };

        assert_eq!(waking_stream.next().await, Some(0));
        assert_eq!(counter.read(), Ok(1));

        assert_eq!(waking_stream.next().await, None);
        assert_eq!(waking_stream.next().await, None);
        assert_eq!(counter.read(), Ok(0));
    }
}