1use crate::power::{SuspendState, SuspendStats};
6use crate::task::CurrentTask;
7
8use std::collections::{HashMap, HashSet};
9use std::future::Future;
10use std::sync::{Arc, Weak};
11
12use anyhow::{Context, anyhow};
13use fidl::endpoints::Proxy;
14use fidl_fuchsia_power_observability as fobs;
15use fidl_fuchsia_session_power as fpower;
16use fidl_fuchsia_starnix_runner as frunner;
17use fuchsia_component::client::connect_to_protocol_sync;
18use fuchsia_inspect as inspect;
19use fuchsia_inspect::ArrayProperty;
20use futures::stream::{FusedStream, Next};
21use futures::{FutureExt, StreamExt};
22use starnix_logging::{log_info, log_warn};
23use starnix_sync::{
24 EbpfSuspendLock, FileOpsCore, LockBefore, LockDepGuard, LockDepMutex, LockDepReadGuard, Locked,
25 OrderedRwLock, PowerMessageCountersLock, SuspendResumeManagerInnerLock,
26};
27use starnix_uapi::arc_key::WeakKey;
28use starnix_uapi::errors::Errno;
29use starnix_uapi::{errno, error};
30use std::collections::VecDeque;
31use std::fmt;
32use zx::Peered;
33
34#[derive(Debug, Default)]
36pub struct WakeupSource {
37 active_count: u64,
39
40 event_count: u64,
43
44 wakeup_count: u64,
50
51 expire_count: u64,
53
54 active_since: zx::MonotonicInstant,
57
58 total_time: zx::MonotonicDuration,
60
61 max_time: zx::MonotonicDuration,
63
64 last_change: zx::MonotonicInstant,
66}
67
68impl WakeupSource {
69 pub fn active_duration(&self) -> zx::MonotonicDuration {
73 if self.active_since == zx::MonotonicInstant::ZERO {
74 zx::MonotonicDuration::default()
75 } else {
76 let now = zx::MonotonicInstant::get();
77 now - self.active_since
78 }
79 }
80}
81
82#[derive(Debug, Clone, Eq, PartialEq, Hash)]
83pub enum WakeupSourceOrigin {
84 WakeLock(String),
85 Epoll(String),
86 HAL(String),
87}
88
89impl std::string::ToString for WakeupSourceOrigin {
90 fn to_string(&self) -> String {
91 match self {
92 WakeupSourceOrigin::WakeLock(lock) => lock.clone(),
93 WakeupSourceOrigin::Epoll(lock) => format!("[epoll] {}", lock),
94 WakeupSourceOrigin::HAL(lock) => format!("[HAL] {}", lock),
95 }
96 }
97}
98
99pub struct SuspendResumeManager {
101 inner: Arc<LockDepMutex<SuspendResumeManagerInner, SuspendResumeManagerInnerLock>>,
103
104 message_counters:
107 Arc<LockDepMutex<HashSet<WeakKey<OwnedMessageCounter>>, PowerMessageCountersLock>>,
108
109 ebpf_suspend_lock: OrderedRwLock<(), EbpfSuspendLock>,
111}
112
113pub struct SuspendResumeManagerInner {
116 suspend_stats: SuspendStats,
118 sync_on_suspend_enabled: bool,
119
120 suspend_events: VecDeque<SuspendEvent>,
121
122 wakeup_sources: HashMap<WakeupSourceOrigin, WakeupSource>,
124
125 active_lock_reader: zx::EventPair,
128
129 active_lock_writer: zx::EventPair,
133
134 active_wakeup_source_count: u64,
136
137 total_wakeup_source_event_count: u64,
140
141 external_wake_sources: HashMap<zx::Koid, ExternalWakeSource>,
143}
144
145#[derive(Debug)]
146struct ExternalWakeSource {
147 handle: zx::NullableHandle,
149 signals: zx::Signals,
151 name: String,
153}
154
155impl SuspendResumeManager {
156 pub fn add_external_wake_source(
157 &self,
158 handle: zx::NullableHandle,
159 signals: zx::Signals,
160 name: String,
161 ) -> Result<(), Errno> {
162 let manager = connect_to_protocol_sync::<frunner::ManagerMarker>()
163 .map_err(|e| errno!(EINVAL, format!("Failed to connect to manager: {e:?}")))?;
164 manager
165 .add_wake_source(frunner::ManagerAddWakeSourceRequest {
166 container_job: Some(
167 fuchsia_runtime::job_default()
168 .duplicate_handle(zx::Rights::SAME_RIGHTS)
169 .expect("Failed to dup handle"),
170 ),
171 name: Some(name.clone()),
172 handle: Some(
173 handle.duplicate_handle(zx::Rights::SAME_RIGHTS).map_err(|e| errno!(EIO, e))?,
174 ),
175 signals: Some(signals.bits()),
176 ..Default::default()
177 })
178 .map_err(|e| errno!(EIO, e))?;
179
180 let koid = handle.koid().map_err(|e| errno!(EINVAL, e))?;
181 self.lock().external_wake_sources.insert(
182 koid,
183 ExternalWakeSource {
184 handle: handle
185 .duplicate_handle(zx::Rights::SAME_RIGHTS)
186 .map_err(|e| errno!(EIO, e))?,
187 signals,
188 name,
189 },
190 );
191 Ok(())
192 }
193
194 pub fn remove_external_wake_source(&self, handle: zx::NullableHandle) -> Result<(), Errno> {
195 let manager = connect_to_protocol_sync::<frunner::ManagerMarker>()
196 .map_err(|e| errno!(EINVAL, format!("Failed to connect to manager: {e:?}")))?;
197
198 let koid = handle.koid().map_err(|e| errno!(EINVAL, e))?;
199 self.lock().external_wake_sources.remove(&koid);
200
201 manager
202 .remove_wake_source(frunner::ManagerRemoveWakeSourceRequest {
203 container_job: Some(
204 fuchsia_runtime::job_default()
205 .duplicate_handle(zx::Rights::SAME_RIGHTS)
206 .expect("Failed to dup handle"),
207 ),
208 handle: Some(handle),
209 ..Default::default()
210 })
211 .map_err(|e| errno!(EIO, e))?;
212
213 Ok(())
214 }
215}
216
217pub type EbpfSuspendGuard<'a> = LockDepReadGuard<'a, ()>;
218
219#[derive(Clone, Debug)]
220pub enum SuspendEvent {
221 Attempt { time: zx::BootInstant, state: String },
222 Resume { time: zx::BootInstant, reason: String },
223 Fail { time: zx::BootInstant, wakeup_sources: Option<Vec<String>> },
224}
225
226const INSPECT_RING_BUFFER_CAPACITY: usize = 128;
228
229impl Default for SuspendResumeManagerInner {
230 fn default() -> Self {
231 let (active_lock_reader, active_lock_writer) = zx::EventPair::create();
232 active_lock_writer
233 .signal_peer(zx::Signals::empty(), zx::Signals::USER_0)
234 .expect("Failed to signal peer");
235 Self {
236 suspend_stats: Default::default(),
237 sync_on_suspend_enabled: false,
238 suspend_events: VecDeque::with_capacity(INSPECT_RING_BUFFER_CAPACITY),
239 wakeup_sources: Default::default(),
240 active_lock_reader,
241 active_lock_writer,
242 active_wakeup_source_count: 0,
243 total_wakeup_source_event_count: 0,
244 external_wake_sources: Default::default(),
245 }
246 }
247}
248
249impl SuspendResumeManagerInner {
250 pub fn can_suspend(&self) -> bool {
252 self.active_wakeup_source_count == 0
253 }
254
255 pub fn active_wake_locks(&self) -> Vec<WakeupSourceOrigin> {
256 self.wakeup_sources
257 .iter()
258 .filter_map(|(name, source)| match name {
259 WakeupSourceOrigin::WakeLock(_) => {
260 if source.active_since > zx::MonotonicInstant::ZERO {
261 Some(name.clone())
262 } else {
263 None
264 }
265 }
266 _ => None,
267 })
268 .collect()
269 }
270
271 pub fn inactive_wake_locks(&self) -> Vec<WakeupSourceOrigin> {
272 self.wakeup_sources
273 .iter()
274 .filter_map(|(name, source)| match name {
275 WakeupSourceOrigin::WakeLock(_) => {
276 if source.active_since == zx::MonotonicInstant::ZERO {
277 Some(name.clone())
278 } else {
279 None
280 }
281 }
282 _ => None,
283 })
284 .collect()
285 }
286
287 fn signal_wake_events(&mut self) {
289 let (clear_mask, set_mask) = if self.active_wakeup_source_count == 0 {
290 (zx::Signals::EVENT_SIGNALED, zx::Signals::USER_0)
291 } else {
292 (zx::Signals::USER_0, zx::Signals::EVENT_SIGNALED)
293 };
294 self.active_lock_writer.signal_peer(clear_mask, set_mask).expect("Failed to signal peer");
295 }
296
297 fn update_suspend_stats<UpdateFn>(&mut self, update: UpdateFn)
298 where
299 UpdateFn: FnOnce(&mut SuspendStats),
300 {
301 update(&mut self.suspend_stats);
302 }
303
304 fn add_suspend_event(&mut self, event: SuspendEvent) {
305 if self.suspend_events.len() >= INSPECT_RING_BUFFER_CAPACITY {
306 self.suspend_events.pop_front();
307 }
308 self.suspend_events.push_back(event);
309 }
310
311 fn record_suspend_events(&self, node: &inspect::Node) {
312 let events_node = node.create_child("suspend_events");
313 for (i, event) in self.suspend_events.iter().enumerate() {
314 let child = events_node.create_child(i.to_string());
315 match event {
316 SuspendEvent::Attempt { time, state } => {
317 child.record_int(fobs::SUSPEND_ATTEMPTED_AT, time.into_nanos());
318 child.record_string(fobs::SUSPEND_REQUESTED_STATE, state);
319 }
320 SuspendEvent::Resume { time, reason } => {
321 child.record_int(fobs::SUSPEND_RESUMED_AT, time.into_nanos());
322 child.record_string(fobs::SUSPEND_RESUME_REASON, reason);
323 }
324 SuspendEvent::Fail { time, wakeup_sources } => {
325 child.record_int(fobs::SUSPEND_FAILED_AT, time.into_nanos());
326 if let Some(names) = wakeup_sources {
327 let names_array =
328 child.create_string_array(fobs::WAKEUP_SOURCES_NAME, names.len());
329 for (i, name) in names.iter().enumerate() {
330 names_array.set(i, name);
331 }
332 child.record(names_array);
333 }
334 }
335 }
336 events_node.record(child);
337 }
338 node.record(events_node);
339 }
340
341 fn record_wakeup_sources(&self, node: &inspect::Node) {
342 let wakeup_node = node.create_child("wakeup_sources");
343 for (name, source) in self.wakeup_sources.iter() {
344 let child = wakeup_node.create_child(name.to_string());
345 child.record_uint("active_count", source.active_count);
346 child.record_uint("event_count", source.event_count);
347 child.record_uint("wakeup_count", source.wakeup_count);
348 child.record_uint("expire_count", source.expire_count);
349 child.record_int("active_since (ns)", source.active_since.into_nanos());
350 child.record_int("active_duration_mono (ns)", source.active_duration().into_nanos());
353 child.record_int("total_time (ms)", source.total_time.into_millis());
354 child.record_int("max_time (ms)", source.max_time.into_millis());
355 child.record_int("last_change (ns)", source.last_change.into_nanos());
356 wakeup_node.record(child);
357 }
358 node.record(wakeup_node);
359 }
360}
361
362pub type SuspendResumeManagerHandle = Arc<SuspendResumeManager>;
363
364impl Default for SuspendResumeManager {
365 fn default() -> Self {
366 let message_counters: Arc<
367 LockDepMutex<HashSet<WeakKey<OwnedMessageCounter>>, PowerMessageCountersLock>,
368 > = Default::default();
369 let message_counters_clone = message_counters.clone();
370 let root = inspect::component::inspector().root();
371 root.record_lazy_values("message_counters", move || {
372 let message_counters_clone = message_counters_clone.clone();
373 async move {
374 let inspector = fuchsia_inspect::Inspector::default();
375 let root = inspector.root();
376 let message_counters = message_counters_clone.lock();
377 let active_counter_names: Vec<String> = message_counters
378 .iter()
379 .filter_map(|c| c.0.upgrade())
380 .map(|c| c.to_string())
381 .collect();
382 let message_counters_inspect =
383 root.create_string_array("message_counters", active_counter_names.len());
384 for (i, name) in active_counter_names.iter().enumerate() {
385 message_counters_inspect.set(i, name);
386 }
387 root.record(message_counters_inspect);
388 Ok(inspector)
389 }
390 .boxed()
391 });
392 let inner = Arc::new(LockDepMutex::new(SuspendResumeManagerInner::default()));
393 let inner_clone = inner.clone();
394 root.record_lazy_child("wakeup_sources", move || {
395 let inner = inner_clone.clone();
396 async move {
397 let inspector = fuchsia_inspect::Inspector::default();
398 let root = inspector.root();
399 let state = inner.lock();
400
401 state.record_suspend_events(root);
402 state.record_wakeup_sources(root);
403
404 Ok(inspector)
405 }
406 .boxed()
407 });
408 Self { message_counters, inner, ebpf_suspend_lock: Default::default() }
409 }
410}
411
412impl SuspendResumeManager {
413 pub fn lock(&self) -> LockDepGuard<'_, SuspendResumeManagerInner> {
415 self.inner.lock()
416 }
417
418 pub fn init(
420 self: &SuspendResumeManagerHandle,
421 system_task: &CurrentTask,
422 ) -> Result<(), anyhow::Error> {
423 let handoff = system_task
424 .kernel()
425 .connect_to_protocol_at_container_svc::<fpower::HandoffMarker>()?
426 .into_sync_proxy();
427 match handoff.take(zx::MonotonicInstant::INFINITE) {
428 Ok(parent_lease) => {
429 let parent_lease = parent_lease
430 .map_err(|e| anyhow!("Failed to take lessor and lease from parent: {e:?}"))?;
431 drop(parent_lease)
432 }
433 Err(e) => {
434 if e.is_closed() {
435 log_warn!(
436 "Failed to send the fuchsia.session.power/Handoff.Take request. Assuming no Handoff protocol exists and moving on..."
437 );
438 } else {
439 return Err(e).context("Handoff::Take");
440 }
441 }
442 }
443 Ok(())
444 }
445
446 pub fn activate_wakeup_source(&self, origin: WakeupSourceOrigin) -> bool {
447 let mut state = self.lock();
448 let did_activate = {
449 let entry = state.wakeup_sources.entry(origin).or_default();
450 let now = zx::MonotonicInstant::get();
451 entry.active_count += 1;
452 entry.event_count += 1;
453 entry.last_change = now;
454 if entry.active_since == zx::MonotonicInstant::ZERO {
455 entry.active_since = now;
456 true
457 } else {
458 false
459 }
460 };
461 if did_activate {
462 state.active_wakeup_source_count += 1;
463 state.signal_wake_events();
464 }
465 did_activate
466 }
467
468 pub fn deactivate_wakeup_source(&self, origin: &WakeupSourceOrigin) -> bool {
469 self.remove_wakeup_source(origin, false)
470 }
471
472 pub fn timeout_wakeup_source(&self, origin: &WakeupSourceOrigin) -> bool {
473 self.remove_wakeup_source(origin, true)
474 }
475
476 fn remove_wakeup_source(&self, origin: &WakeupSourceOrigin, timed_out: bool) -> bool {
477 let mut state = self.lock();
478 let removed = match state.wakeup_sources.get_mut(origin) {
479 Some(entry) if entry.active_since != zx::MonotonicInstant::ZERO => {
480 if timed_out {
481 entry.expire_count += 1;
482 }
483
484 let now = zx::MonotonicInstant::get();
485 let duration = now - entry.active_since;
486 entry.total_time += duration;
487 entry.max_time = std::cmp::max(duration, entry.max_time);
488 entry.last_change = now;
489 entry.active_since = zx::MonotonicInstant::ZERO;
490
491 true
492 }
493 _ => false,
494 };
495 if removed {
496 state.active_wakeup_source_count -= 1;
497 state.total_wakeup_source_event_count += 1;
498 state.signal_wake_events();
499 }
500 removed
501 }
502
503 pub fn add_message_counter(
504 &self,
505 name: &str,
506 counter: Option<zx::Counter>,
507 ) -> OwnedMessageCounterHandle {
508 let container_counter = OwnedMessageCounter::new(name, counter);
509 let mut message_counters = self.message_counters.lock();
510 message_counters.insert(WeakKey::from(&container_counter));
511 message_counters.retain(|c| c.0.upgrade().is_some());
512 container_counter
513 }
514
515 pub fn has_nonzero_message_counter(&self) -> bool {
516 self.message_counters.lock().iter().any(|c| {
517 let Some(c) = c.0.upgrade() else {
518 return false;
519 };
520 c.counter.as_ref().and_then(|counter| counter.read().ok()).map_or(false, |v| v != 0)
521 })
522 }
523
524 pub fn duplicate_lock_event(&self) -> zx::EventPair {
527 let state = self.lock();
528 state
529 .active_lock_reader
530 .duplicate_handle(zx::Rights::SAME_RIGHTS)
531 .expect("Failed to duplicate handle")
532 }
533
534 pub fn suspend_stats(&self) -> SuspendStats {
536 self.lock().suspend_stats.clone()
537 }
538
539 pub fn total_wakeup_events(&self) -> u64 {
540 let state = self.lock();
541 state.total_wakeup_source_event_count + state.suspend_stats.success_count
542 }
543
544 pub fn sync_on_suspend_enabled(&self) -> bool {
548 self.lock().sync_on_suspend_enabled.clone()
549 }
550
551 pub fn set_sync_on_suspend(&self, enable: bool) {
554 self.lock().sync_on_suspend_enabled = enable;
555 }
556
557 pub fn suspend_states(&self) -> HashSet<SuspendState> {
559 HashSet::from([SuspendState::Idle])
561 }
562
563 pub fn suspend(
564 &self,
565 locked: &mut Locked<FileOpsCore>,
566 suspend_state: SuspendState,
567 ) -> Result<(), Errno> {
568 let suspend_start_time = zx::BootInstant::get();
569 let mut state = self.lock();
570 state.add_suspend_event(SuspendEvent::Attempt {
571 time: suspend_start_time,
572 state: suspend_state.to_string(),
573 });
574
575 if !state.can_suspend() {
577 self.report_failed_suspension(state, "kernel wake lock");
578 return error!(EINVAL);
579 }
580
581 let external_wake_source_abort = state.external_wake_sources.values().find_map(|source| {
583 if source.handle.wait_one(source.signals, zx::MonotonicInstant::INFINITE_PAST).is_ok() {
584 Some(source.name.clone())
585 } else {
586 None
587 }
588 });
589
590 if let Some(name) = external_wake_source_abort {
591 self.report_failed_suspension(state, &format!("external wake source: {}", name));
592 return error!(EINVAL);
593 }
594
595 std::mem::drop(state);
599
600 let _ebpf_lock = self.ebpf_suspend_lock.write(locked);
603
604 let manager = connect_to_protocol_sync::<frunner::ManagerMarker>()
605 .expect("Failed to connect to manager");
606 fuchsia_trace::duration!("power", "suspend_container:fidl");
607
608 let container_job = Some(
609 fuchsia_runtime::job_default()
610 .duplicate_handle(zx::Rights::SAME_RIGHTS)
611 .expect("Failed to dup handle"),
612 );
613 let wake_lock_event = Some(self.duplicate_lock_event());
614
615 log_info!("Requesting container suspension.");
616 match manager.suspend_container(
617 frunner::ManagerSuspendContainerRequest {
618 container_job,
619 wake_locks: wake_lock_event,
620 ..Default::default()
621 },
622 zx::Instant::INFINITE,
623 ) {
624 Ok(Ok(res)) => {
625 self.report_container_resumed(suspend_start_time, res);
626 }
627 e => {
628 let state = self.lock();
629 self.report_failed_suspension(state, &format!("runner error {:?}", e));
630 return error!(EINVAL);
631 }
632 }
633 Ok(())
634 }
635
636 fn report_container_resumed(
637 &self,
638 suspend_start_time: zx::BootInstant,
639 res: frunner::ManagerSuspendContainerResponse,
640 ) {
641 let wake_time = zx::BootInstant::get();
642 let resume_reason = res.resume_reason.clone().map(|s| format!("0 {}", s));
645 log_info!("Resuming from container suspension: {:?}", resume_reason);
646 let mut state = self.lock();
647 state.update_suspend_stats(|suspend_stats| {
648 suspend_stats.success_count += 1;
649 suspend_stats.last_time_in_suspend_operations = (wake_time - suspend_start_time).into();
650 suspend_stats.last_time_in_sleep =
651 zx::BootDuration::from_nanos(res.suspend_time.unwrap_or(0));
652 suspend_stats.last_resume_reason = resume_reason.clone();
653 });
654 state.add_suspend_event(SuspendEvent::Resume {
655 time: wake_time,
656 reason: resume_reason.unwrap_or_default(),
657 });
658 fuchsia_trace::instant!("power", "suspend_container:done", fuchsia_trace::Scope::Process);
659 }
660
661 fn report_failed_suspension(
662 &self,
663 mut state: LockDepGuard<'_, SuspendResumeManagerInner>,
664 failure_reason: &str,
665 ) {
666 let wake_time = zx::BootInstant::get();
667 state.update_suspend_stats(|suspend_stats| {
668 suspend_stats.fail_count += 1;
669 suspend_stats.last_failed_errno = Some(errno!(EINVAL));
670 suspend_stats.last_resume_reason = None;
671 });
672
673 let mut wakeup_sources: Vec<String> = state
674 .wakeup_sources
675 .iter_mut()
676 .filter_map(|(origin, source)| {
677 if source.active_since > zx::MonotonicInstant::ZERO {
678 source.wakeup_count += 1;
679 Some(origin.to_string())
680 } else {
681 None
682 }
683 })
684 .collect();
685
686 for source in state.external_wake_sources.values() {
687 if source.handle.wait_one(source.signals, zx::MonotonicInstant::INFINITE_PAST).is_ok() {
688 wakeup_sources.push(source.name.clone());
689 }
690 }
691
692 let last_resume_reason = format!("Abort: {}", wakeup_sources.join(" "));
693 state.update_suspend_stats(|suspend_stats| {
694 suspend_stats.last_resume_reason = Some(last_resume_reason);
696 });
697
698 log_warn!(
700 "Suspend failed due to {:?}. Here are the active wakeup sources: {:?}",
701 failure_reason,
702 wakeup_sources,
703 );
704 state.add_suspend_event(SuspendEvent::Fail {
706 time: wake_time,
707 wakeup_sources: Some(wakeup_sources),
708 });
709 fuchsia_trace::instant!("power", "suspend_container:error", fuchsia_trace::Scope::Process);
710 }
711
712 pub fn acquire_ebpf_suspend_lock<'a, L>(
713 &'a self,
714 locked: &'a mut Locked<L>,
715 ) -> EbpfSuspendGuard<'a>
716 where
717 L: LockBefore<EbpfSuspendLock>,
718 {
719 self.ebpf_suspend_lock.read(locked)
720 }
721}
722
723pub trait OnWakeOps: Send + Sync {
725 fn on_wake(&self, current_task: &CurrentTask, baton_lease: &zx::NullableHandle);
734}
735
736pub fn create_proxy_for_wake_events_counter_zero(
754 remote_channel: zx::Channel,
755 name: String,
756) -> (zx::Channel, zx::Counter) {
757 let (local_proxy, kernel_channel) = zx::Channel::create();
758 let counter = zx::Counter::create();
759
760 let local_counter =
761 counter.duplicate_handle(zx::Rights::SAME_RIGHTS).expect("Failed to duplicate counter");
762
763 let manager = fuchsia_component::client::connect_to_protocol_sync::<frunner::ManagerMarker>()
764 .expect("failed");
765 manager
766 .proxy_wake_channel(frunner::ManagerProxyWakeChannelRequest {
767 container_job: Some(
768 fuchsia_runtime::job_default()
769 .duplicate_handle(zx::Rights::SAME_RIGHTS)
770 .expect("Failed to dup handle"),
771 ),
772 container_channel: Some(kernel_channel),
773 remote_channel: Some(remote_channel),
774 counter: Some(counter),
775 name: Some(name),
776 ..Default::default()
777 })
778 .expect("Failed to create proxy");
779
780 (local_proxy, local_counter)
781}
782
783pub fn create_proxy_for_wake_events_counter(
802 remote_channel: zx::Channel,
803 name: String,
804) -> (zx::Channel, zx::Counter) {
805 let (proxy, counter) = create_proxy_for_wake_events_counter_zero(remote_channel, name);
806
807 counter.add(1).expect("Failed to add to counter");
810
811 (proxy, counter)
812}
813
814pub fn mark_proxy_message_handled(counter: &zx::Counter) {
819 counter.add(-1).expect("Failed to decrement counter");
820}
821
822pub fn mark_all_proxy_messages_handled(counter: &zx::Counter) {
824 counter.write(0).expect("Failed to decrement counter");
825}
826
827pub fn create_watcher_for_wake_events(watcher: zx::EventPair) {
831 let manager = fuchsia_component::client::connect_to_protocol_sync::<frunner::ManagerMarker>()
832 .expect("failed");
833 manager
834 .register_wake_watcher(
835 frunner::ManagerRegisterWakeWatcherRequest {
836 watcher: Some(watcher),
837 ..Default::default()
838 },
839 zx::Instant::INFINITE,
840 )
841 .expect("Failed to register wake watcher");
842}
843
844#[derive(Debug)]
853pub struct SharedMessageCounter(Weak<OwnedMessageCounter>);
854
855impl Drop for SharedMessageCounter {
856 fn drop(&mut self) {
857 if let Some(message_counter) = self.0.upgrade() {
858 message_counter.mark_handled();
859 }
860 }
861}
862
863pub struct OwnedMessageCounter {
868 name: String,
869 counter: Option<zx::Counter>,
870}
871pub type OwnedMessageCounterHandle = Arc<OwnedMessageCounter>;
872
873impl Drop for OwnedMessageCounter {
874 fn drop(&mut self) {
879 self.counter.as_ref().map(mark_all_proxy_messages_handled);
880 }
881}
882
883impl OwnedMessageCounter {
884 pub fn new(name: &str, counter: Option<zx::Counter>) -> OwnedMessageCounterHandle {
885 Arc::new(Self { name: name.to_string(), counter })
886 }
887
888 pub fn mark_handled(&self) {
893 self.counter.as_ref().map(mark_proxy_message_handled);
894 }
895
896 pub fn mark_pending(&self) {
900 self.counter.as_ref().map(|c| c.add(1).expect("Failed to increment counter"));
901 }
902
903 pub fn share(
907 self: &OwnedMessageCounterHandle,
908 new_pending_message: bool,
909 ) -> SharedMessageCounter {
910 if new_pending_message {
911 self.mark_pending();
912 }
913 SharedMessageCounter(Arc::downgrade(self))
914 }
915}
916
917impl fmt::Display for OwnedMessageCounter {
918 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
919 write!(f, "Counter({}): {:?}", self.name, self.counter.as_ref().map(|c| c.read()))
920 }
921}
922
923pub struct ContainerWakingProxy<P: Proxy> {
928 counter: OwnedMessageCounterHandle,
929 proxy: P,
930}
931
932impl<P: Proxy> ContainerWakingProxy<P> {
933 pub fn new(counter: OwnedMessageCounterHandle, proxy: P) -> Self {
934 Self { counter, proxy }
935 }
936
937 pub fn call<T, F, R>(&self, future: F) -> R
941 where
942 F: FnOnce(&P) -> R,
943 R: Future<Output = T>,
944 {
945 let f = future(&self.proxy);
953 self.counter.mark_handled();
954 f
955 }
956}
957
958pub struct ContainerWakingStream<S: FusedStream + Unpin> {
963 counter: OwnedMessageCounterHandle,
964 stream: S,
965}
966
967impl<S: FusedStream + Unpin> ContainerWakingStream<S> {
968 pub fn new(counter: OwnedMessageCounterHandle, stream: S) -> Self {
969 Self { counter, stream }
970 }
971
972 pub fn next(&mut self) -> Next<'_, S> {
976 let is_terminated = self.stream.is_terminated();
978 let next = self.stream.next();
979 if !is_terminated {
980 self.counter.mark_handled();
981 }
982 next
983 }
984}
985
986#[cfg(test)]
987mod test {
988 use super::*;
989 use diagnostics_assertions::assert_data_tree;
990 use fidl::endpoints::create_proxy_and_stream;
991 use fidl_test_placeholders::{EchoMarker, EchoRequest};
992 use fuchsia_async as fasync;
993 use fuchsia_inspect as inspect;
994 use futures::StreamExt;
995
996 #[::fuchsia::test]
997 fn test_counter_zero_initialization() {
998 let (_endpoint, endpoint) = zx::Channel::create();
999 let (_channel, counter) =
1000 super::create_proxy_for_wake_events_counter_zero(endpoint, "test".into());
1001 assert_eq!(counter.read(), Ok(0));
1002 }
1003
1004 #[::fuchsia::test]
1005 fn test_counter_initialization() {
1006 let (_endpoint, endpoint) = zx::Channel::create();
1007 let (_channel, counter) =
1008 super::create_proxy_for_wake_events_counter(endpoint, "test".into());
1009 assert_eq!(counter.read(), Ok(1));
1010 }
1011
1012 #[::fuchsia::test]
1013 async fn test_container_waking_proxy() {
1014 let (proxy, mut stream) = create_proxy_and_stream::<EchoMarker>();
1015 let server_task = fasync::Task::spawn(async move {
1016 let request = stream.next().await.unwrap().unwrap();
1017 match request {
1018 EchoRequest::EchoString { value, responder } => {
1019 responder.send(value.as_deref()).unwrap();
1020 }
1021 }
1022 });
1023
1024 let counter = zx::Counter::create();
1025 counter.add(5).unwrap();
1026 assert_eq!(counter.read(), Ok(5));
1027
1028 let waking_proxy = ContainerWakingProxy {
1029 counter: OwnedMessageCounter::new(
1030 "test_proxy",
1031 Some(counter.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap()),
1032 ),
1033 proxy,
1034 };
1035
1036 let response_future = waking_proxy.call(|p| p.echo_string(Some("hello")));
1037
1038 assert_eq!(counter.read(), Ok(4));
1040
1041 let response = response_future.await.unwrap();
1042 assert_eq!(response.as_deref(), Some("hello"));
1043
1044 server_task.await;
1045
1046 assert_eq!(counter.read(), Ok(4));
1047 drop(waking_proxy);
1048 assert_eq!(counter.read(), Ok(0));
1049 }
1050
1051 #[::fuchsia::test]
1052 async fn test_container_waking_stream() {
1053 let (proxy, stream) = create_proxy_and_stream::<EchoMarker>();
1054 let client_task = fasync::Task::spawn(async move {
1055 let response = proxy.echo_string(Some("hello")).await.unwrap();
1056 assert_eq!(response.as_deref(), Some("hello"));
1057 });
1058
1059 let counter = zx::Counter::create();
1060 counter.add(5).unwrap();
1061 assert_eq!(counter.read(), Ok(5));
1062
1063 let mut waking_stream = ContainerWakingStream {
1064 counter: OwnedMessageCounter::new(
1065 "test_stream",
1066 Some(counter.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap()),
1067 ),
1068 stream,
1069 };
1070
1071 let request_future = waking_stream.next();
1072
1073 assert_eq!(counter.read(), Ok(4));
1075
1076 let request = request_future.await.unwrap().unwrap();
1077 match request {
1078 EchoRequest::EchoString { value, responder } => {
1079 assert_eq!(value.as_deref(), Some("hello"));
1080 responder.send(value.as_deref()).unwrap();
1081 }
1082 }
1083
1084 client_task.await;
1085
1086 assert_eq!(counter.read(), Ok(4));
1087 drop(waking_stream);
1088 assert_eq!(counter.read(), Ok(0));
1089 }
1090
1091 #[::fuchsia::test]
1092 async fn test_message_counters_inspect() {
1093 let power_manager = SuspendResumeManager::default();
1094 let inspector = inspect::component::inspector();
1095
1096 let zx_counter = zx::Counter::create();
1097 let counter_handle = power_manager.add_message_counter(
1098 "test_counter",
1099 Some(zx_counter.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap()),
1100 );
1101
1102 zx_counter.add(1).unwrap();
1103
1104 assert_data_tree!(inspector, root: contains {
1105 message_counters: vec!["Counter(test_counter): Some(Ok(1))"],
1106 });
1107
1108 zx_counter.add(1).unwrap();
1109 assert_data_tree!(inspector, root: contains {
1110 message_counters: vec!["Counter(test_counter): Some(Ok(2))"],
1111 });
1112
1113 drop(counter_handle);
1114 assert_data_tree!(inspector, root: contains {
1115 message_counters: Vec::<String>::new(),
1116 });
1117 }
1118
1119 #[::fuchsia::test]
1120 fn test_shared_message_counter() {
1121 let zx_counter = zx::Counter::create();
1123 let owned_counter = OwnedMessageCounter::new(
1124 "test_shared_counter",
1125 Some(zx_counter.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap()),
1126 );
1127 zx_counter.add(5).unwrap();
1128 assert_eq!(zx_counter.read(), Ok(5));
1129
1130 let shared_counter = owned_counter.share(false);
1132 assert_eq!(zx_counter.read(), Ok(5));
1133
1134 drop(shared_counter);
1136 assert_eq!(zx_counter.read(), Ok(4));
1137
1138 let shared_counter_2 = owned_counter.share(true);
1140 assert_eq!(zx_counter.read(), Ok(5));
1141
1142 drop(shared_counter_2);
1144 assert_eq!(zx_counter.read(), Ok(4));
1145
1146 let shared_counter_3 = owned_counter.share(false);
1148 assert_eq!(zx_counter.read(), Ok(4));
1149
1150 drop(owned_counter);
1152 assert_eq!(zx_counter.read(), Ok(0));
1153
1154 drop(shared_counter_3);
1156 assert_eq!(zx_counter.read(), Ok(0));
1157 }
1158
1159 #[::fuchsia::test]
1160 async fn test_container_waking_event_termination() {
1161 let stream = futures::stream::iter(vec![0]).fuse();
1162 let counter = zx::Counter::create();
1163 counter.add(2).unwrap();
1164 assert_eq!(counter.read(), Ok(2));
1165 let mut waking_stream = ContainerWakingStream {
1166 counter: OwnedMessageCounter::new(
1167 "test_stream",
1168 Some(counter.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap()),
1169 ),
1170 stream,
1171 };
1172
1173 assert_eq!(waking_stream.next().await, Some(0));
1174 assert_eq!(counter.read(), Ok(1));
1175
1176 assert_eq!(waking_stream.next().await, None);
1177 assert_eq!(waking_stream.next().await, None);
1178 assert_eq!(counter.read(), Ok(0));
1180 }
1181
1182 #[::fuchsia::test]
1183 fn test_external_wake_source_aborts_suspend() {
1184 let manager = SuspendResumeManager::default();
1185 let event = zx::Event::create();
1186 let signals = zx::Signals::USER_0;
1187
1188 let res = manager.add_external_wake_source(
1193 event.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap().into_handle(),
1194 signals,
1195 "test_external".to_string(),
1196 );
1197
1198 if res.is_err() {
1199 println!(
1200 "Skipping test_external_wake_source_aborts_suspend because runner connection failed: {:?}",
1201 res
1202 );
1203 return;
1204 }
1205
1206 event.signal(zx::Signals::empty(), signals).unwrap();
1208
1209 let state = manager.lock();
1210 assert!(state.external_wake_sources.contains_key(&event.koid().unwrap()));
1211 }
1212}