starnix_core/power/manager.rs

// Copyright 2023 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use crate::power::{SuspendState, SuspendStats};
use crate::task::CurrentTask;
use crate::vfs::EpollKey;

use std::cmp::min;
use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::sync::{Arc, Weak};

use anyhow::{Context, anyhow};
use fidl::endpoints::Proxy;
use fuchsia_component::client::connect_to_protocol_sync;
use fuchsia_inspect::ArrayProperty;
use futures::stream::{FusedStream, Next};
use futures::{FutureExt, StreamExt};
use itertools::Itertools;
use starnix_logging::{log_info, log_warn};
use starnix_sync::{
    EbpfSuspendLock, FileOpsCore, LockBefore, Locked, Mutex, MutexGuard, OrderedRwLock,
    RwLockReadGuard,
};
use starnix_task_command::TaskCommand;
use starnix_uapi::arc_key::WeakKey;
use starnix_uapi::errors::Errno;
use starnix_uapi::{errno, error};
use std::collections::VecDeque;
use std::fmt;
use zx::{HandleBased, Peered};
use {
    fidl_fuchsia_power_observability as fobs, fidl_fuchsia_session_power as fpower,
    fidl_fuchsia_starnix_runner as frunner, fuchsia_inspect as inspect,
};

/// Manager for suspend and resume.
pub struct SuspendResumeManager {
    /// The mutable state of [SuspendResumeManager].
    inner: Arc<Mutex<SuspendResumeManagerInner>>,

    /// The currently registered message counters in the system whose values are exposed to inspect
    /// via a lazy node.
    message_counters: Arc<Mutex<HashSet<WeakKey<OwnedMessageCounter>>>>,

    /// The lock used to avoid suspension while holding eBPF locks.
    ebpf_suspend_lock: OrderedRwLock<(), EbpfSuspendLock>,
}

/// The mutable state of [SuspendResumeManager].
pub struct SuspendResumeManagerInner {
    /// The suspend counters and gauges.
    suspend_stats: SuspendStats,
    sync_on_suspend_enabled: bool,

    suspend_events: VecDeque<SuspendEvent>,

    /// The currently active wake locks in the system. If non-empty, this prevents
    /// the container from suspending.
    active_locks: HashMap<String, LockSource>,
    inactive_locks: HashSet<String>,

    /// The currently active EPOLLWAKEUPs in the system. If non-empty, this prevents
    /// the container from suspending.
    active_epolls: HashMap<EpollKey, TaskCommand>,
    inactive_epolls: HashSet<TaskCommand>,

    /// The event pair that is passed to the Starnix runner so it can observe whether
    /// or not any wake locks are active before completing a suspend operation.
    active_lock_reader: zx::EventPair,

    /// The event pair that is used by the Starnix kernel to signal when there are
    /// active wake locks in the container. Note that the peer of the writer is the
    /// object that is signaled.
    active_lock_writer: zx::EventPair,
}

pub type EbpfSuspendGuard<'a> = RwLockReadGuard<'a, ()>;

/// The source of a wake lock.
pub enum LockSource {
    WakeLockFile,
    ContainerPowerController,
}

#[derive(Clone, Debug)]
pub enum SuspendEvent {
    Attempt {
        time: zx::BootInstant,
        state: String,
    },
    Resume {
        time: zx::BootInstant,
        reason: String,
    },
    Fail {
        time: zx::BootInstant,
        wake_locks: Option<Vec<String>>,
        epolls: Option<Vec<TaskCommand>>,
    },
}

/// The inspect node ring buffer will keep at most this many entries.
const INSPECT_RING_BUFFER_CAPACITY: usize = 128;

/// The inspect wake lock and epoll nodes will keep at most this many entries.
const INSPECT_MAX_WAKE_LOCK_NAMES: usize = 64;
const INSPECT_MAX_EPOLLS: usize = 64;

impl Default for SuspendResumeManagerInner {
    fn default() -> Self {
        let (active_lock_reader, active_lock_writer) = zx::EventPair::create();
        Self {
            suspend_events: VecDeque::with_capacity(INSPECT_RING_BUFFER_CAPACITY),
            suspend_stats: Default::default(),
            sync_on_suspend_enabled: Default::default(),
            active_locks: Default::default(),
            inactive_locks: Default::default(),
            active_epolls: Default::default(),
            inactive_epolls: Default::default(),
            active_lock_reader,
            active_lock_writer,
        }
    }
}

impl SuspendResumeManagerInner {
    pub fn active_wake_locks(&self) -> Vec<String> {
        Vec::from_iter(self.active_locks.keys().cloned())
    }

    pub fn inactive_wake_locks(&self) -> Vec<String> {
        Vec::from_iter(self.inactive_locks.clone())
    }

    fn active_epolls(&self) -> Vec<TaskCommand> {
        Vec::from_iter(self.active_epolls.values().cloned())
    }

    fn update_suspend_stats<UpdateFn>(&mut self, update: UpdateFn)
    where
        UpdateFn: FnOnce(&mut SuspendStats),
    {
        update(&mut self.suspend_stats);
    }

    /// Signals whether or not there are currently any active wake locks in the kernel.
    fn signal_wake_events(&mut self) {
        let (clear_mask, set_mask) =
            if self.active_locks.is_empty() && self.active_epolls.is_empty() {
                (zx::Signals::EVENT_SIGNALED, zx::Signals::empty())
            } else {
                (zx::Signals::empty(), zx::Signals::EVENT_SIGNALED)
            };
        self.active_lock_writer.signal_peer(clear_mask, set_mask).expect("Failed to signal peer");
    }

    fn record_active_locks(&self, node: &inspect::Node) {
        let active_locks = &self.active_locks;
        let len = min(active_locks.len(), INSPECT_MAX_WAKE_LOCK_NAMES);
        let active_wake_locks = node.create_string_array(fobs::ACTIVE_WAKE_LOCK_NAMES, len);
        for (i, name) in active_locks.keys().sorted().take(len).enumerate() {
            if let Some(src) = active_locks.get(name) {
                active_wake_locks.set(i, format!("{} (source {})", name, src));
            }
        }
        node.record(active_wake_locks);
    }

    fn record_inactive_locks(&self, node: &inspect::Node) {
        let inactive_locks = &self.inactive_locks;
        let len = min(inactive_locks.len(), INSPECT_MAX_WAKE_LOCK_NAMES);
        let inactive_wake_locks = node.create_string_array(fobs::INACTIVE_WAKE_LOCK_NAMES, len);
        for (i, name) in inactive_locks.iter().sorted().take(len).enumerate() {
            inactive_wake_locks.set(i, name);
        }
        node.record(inactive_wake_locks);
    }

    fn record_active_epolls(&self, node: &inspect::Node) {
        let active_epolls = &self.active_epolls;
        let len = min(active_epolls.len(), INSPECT_MAX_EPOLLS);
        let active_epolls_node = node.create_string_array(fobs::ACTIVE_EPOLLS, len);
        for (i, key) in active_epolls.keys().sorted().rev().take(len).enumerate() {
            if let Some(name) = active_epolls.get(key) {
                active_epolls_node.set(i, name.to_string());
            }
        }
        node.record(active_epolls_node);
    }

    fn record_inactive_epolls(&self, node: &inspect::Node) {
        let inactive_epolls = &self.inactive_epolls;
        let len = min(inactive_epolls.len(), INSPECT_MAX_WAKE_LOCK_NAMES);
        let inactive_epolls_node = node.create_string_array(fobs::INACTIVE_EPOLLS, len);
        for (i, name) in inactive_epolls.iter().sorted().take(len).enumerate() {
            inactive_epolls_node.set(i, name.to_string());
        }
        node.record(inactive_epolls_node);
    }

    fn add_suspend_event(&mut self, event: SuspendEvent) {
        if self.suspend_events.len() >= INSPECT_RING_BUFFER_CAPACITY {
            self.suspend_events.pop_front();
        }
        self.suspend_events.push_back(event);
    }

    fn record_suspend_events(&self, node: &inspect::Node) {
        let events_node = node.create_child("suspend_events");
        for (i, event) in self.suspend_events.iter().enumerate() {
            let child = events_node.create_child(i.to_string());
            match event {
                SuspendEvent::Attempt { time, state } => {
                    child.record_int(fobs::SUSPEND_ATTEMPTED_AT, time.into_nanos());
                    child.record_string(fobs::SUSPEND_REQUESTED_STATE, state);
                }
                SuspendEvent::Resume { time, reason } => {
                    child.record_int(fobs::SUSPEND_RESUMED_AT, time.into_nanos());
                    child.record_string(fobs::SUSPEND_RESUME_REASON, reason);
                }
                SuspendEvent::Fail { time, wake_locks, epolls } => {
                    child.record_int(fobs::SUSPEND_FAILED_AT, time.into_nanos());
                    if let Some(names) = wake_locks {
                        let names_array =
                            child.create_string_array(fobs::ACTIVE_WAKE_LOCK_NAMES, names.len());
                        for (i, name) in names.iter().enumerate() {
                            names_array.set(i, name);
                        }
                        child.record(names_array);
                    }
                    if let Some(epolls) = epolls {
                        let epolls_array =
                            child.create_string_array(fobs::ACTIVE_EPOLLS, epolls.len());
                        for (i, command) in epolls.iter().enumerate() {
                            epolls_array.set(i, command.to_string());
                        }
                        child.record(epolls_array);
                    }
                }
            }
            events_node.record(child);
        }
        node.record(events_node);
    }
}

pub type SuspendResumeManagerHandle = Arc<SuspendResumeManager>;

impl Default for SuspendResumeManager {
    fn default() -> Self {
        let message_counters: Arc<Mutex<HashSet<WeakKey<OwnedMessageCounter>>>> =
            Default::default();
        let message_counters_clone = message_counters.clone();
        let root = inspect::component::inspector().root();
        root.record_lazy_values("message_counters", move || {
            let message_counters_clone = message_counters_clone.clone();
            async move {
                let inspector = fuchsia_inspect::Inspector::default();
                let root = inspector.root();
                let mut message_counters = message_counters_clone.lock();
                message_counters.retain(|c| c.0.upgrade().is_some());
                let message_counters_inspect =
                    root.create_string_array("message_counters", message_counters.len());
                for (i, c) in message_counters.iter().enumerate() {
                    let counter = c.0.upgrade().expect("lost counter should be retained");
                    message_counters_inspect.set(i, counter.to_string());
                }
                root.record(message_counters_inspect);
                Ok(inspector)
            }
            .boxed()
        });
        let inner = Arc::new(Mutex::new(SuspendResumeManagerInner::default()));
        let inner_clone = inner.clone();
        root.record_lazy_child("wake_locks", move || {
            let inner = inner_clone.clone();
            async move {
                let inspector = fuchsia_inspect::Inspector::default();
                let root = inspector.root();
                let state = inner.lock();

                state.record_active_locks(root);
                state.record_inactive_locks(root);
                state.record_active_epolls(root);
                state.record_inactive_epolls(root);
                state.record_suspend_events(root);

                Ok(inspector)
            }
            .boxed()
        });
        Self { message_counters, inner, ebpf_suspend_lock: Default::default() }
    }
}

impl SuspendResumeManager {
    /// Locks and returns the inner state of the manager.
    pub fn lock(&self) -> MutexGuard<'_, SuspendResumeManagerInner> {
        self.inner.lock()
    }

    /// Takes (and drops) the power lease handed off from the parent session, if the Handoff protocol is available.
    pub fn init(
        self: &SuspendResumeManagerHandle,
        system_task: &CurrentTask,
    ) -> Result<(), anyhow::Error> {
        let handoff = system_task
            .kernel()
            .connect_to_protocol_at_container_svc::<fpower::HandoffMarker>()?
            .into_sync_proxy();
        match handoff.take(zx::MonotonicInstant::INFINITE) {
            Ok(parent_lease) => {
                let parent_lease = parent_lease
                    .map_err(|e| anyhow!("Failed to take lessor and lease from parent: {e:?}"))?;
                drop(parent_lease)
            }
            Err(e) => {
                if e.is_closed() {
                    log_warn!(
                        "Failed to send the fuchsia.session.power/Handoff.Take request. Assuming no Handoff protocol exists and moving on..."
                    );
                } else {
                    return Err(e).context("Handoff::Take");
                }
            }
        }
        Ok(())
    }

    /// Adds a wake lock `name` to the active wake locks.
    pub fn add_lock(&self, name: &str, src: LockSource) -> bool {
        let mut state = self.lock();
        let res = state.active_locks.insert(String::from(name), src);
        state.signal_wake_events();
        res.is_none()
    }

    /// Removes a wake lock `name` from the active wake locks.
    pub fn remove_lock(&self, name: &str) -> bool {
        let mut state = self.lock();
        let res = state.active_locks.remove(name);
        if res.is_none() {
            return false;
        }

        state.inactive_locks.insert(String::from(name));
        state.signal_wake_events();
        true
    }

    /// Adds a wake lock `key` to the active epoll wake locks.
    pub fn add_epoll(&self, current_task: &CurrentTask, key: EpollKey) {
        let mut state = self.lock();
        state.active_epolls.insert(key, current_task.command());
        state.signal_wake_events();
    }

    /// Removes a wake lock `key` from the active epoll wake locks.
    pub fn remove_epoll(&self, key: EpollKey) {
        let mut state = self.lock();
        let epoll = state.active_epolls.remove(&key);
        if let Some(epoll) = epoll {
            state.inactive_epolls.insert(epoll);
        }
        state.signal_wake_events();
    }

    pub fn add_message_counter(
        &self,
        name: &str,
        counter: Option<zx::Counter>,
    ) -> OwnedMessageCounterHandle {
        let container_counter = OwnedMessageCounter::new(name, counter);
        let mut message_counters = self.message_counters.lock();
        message_counters.insert(WeakKey::from(&container_counter));
        message_counters.retain(|c| c.0.upgrade().is_some());
        container_counter
    }

    pub fn has_nonzero_message_counter(&self) -> bool {
        self.message_counters.lock().iter().any(|c| {
            let Some(c) = c.0.upgrade() else {
                return false;
            };
            c.counter.as_ref().and_then(|counter| counter.read().ok()).map_or(false, |v| v != 0)
        })
    }

    /// Returns a duplicate handle to the `EventPair` that is signaled when wake
    /// locks are active.
    pub fn duplicate_lock_event(&self) -> zx::EventPair {
        let state = self.lock();
        state
            .active_lock_reader
            .duplicate_handle(zx::Rights::SAME_RIGHTS)
            .expect("Failed to duplicate handle")
    }

    /// Gets the suspend statistics.
    pub fn suspend_stats(&self) -> SuspendStats {
        self.lock().suspend_stats.clone()
    }

    /// Gets the contents of the power "sync_on_suspend" file in the power
    /// filesystem. True will cause `1` to be reported, and false will cause
    /// `0` to be reported.
    pub fn sync_on_suspend_enabled(&self) -> bool {
        self.lock().sync_on_suspend_enabled.clone()
    }

    /// Sets the contents of the power "sync_on_suspend" file in the power
    /// filesystem. See also [sync_on_suspend_enabled].
    pub fn set_sync_on_suspend(&self, enable: bool) {
        self.lock().sync_on_suspend_enabled = enable;
    }

    /// Returns the supported suspend states.
    pub fn suspend_states(&self) -> HashSet<SuspendState> {
        // TODO(b/326470421): Remove the hardcoded supported state.
        HashSet::from([SuspendState::Idle])
    }

    pub fn suspend(
        &self,
        locked: &mut Locked<FileOpsCore>,
        state: SuspendState,
    ) -> Result<(), Errno> {
        let suspend_start_time = zx::BootInstant::get();

        self.lock().add_suspend_event(SuspendEvent::Attempt {
            time: suspend_start_time,
            state: state.to_string(),
        });

        let ebpf_lock = self.ebpf_suspend_lock.write(locked);

        let manager = connect_to_protocol_sync::<frunner::ManagerMarker>()
            .expect("Failed to connect to manager");
        fuchsia_trace::duration!("power", "suspend_container:fidl");
        log_info!("Asking runner to suspend container.");
        match manager.suspend_container(
            frunner::ManagerSuspendContainerRequest {
                container_job: Some(
                    fuchsia_runtime::job_default()
                        .duplicate(zx::Rights::SAME_RIGHTS)
                        .expect("Failed to dup handle"),
                ),
                wake_locks: Some(self.duplicate_lock_event()),
                ..Default::default()
            },
            zx::Instant::INFINITE,
        ) {
            Ok(Ok(res)) => {
                log_info!("Resuming from container suspension.");
                let wake_time = zx::BootInstant::get();
                let resume_reason = res.resume_reason;
                let mut state = self.lock();
                state.update_suspend_stats(|suspend_stats| {
                    suspend_stats.success_count += 1;
                    suspend_stats.last_time_in_suspend_operations =
                        (wake_time - suspend_start_time).into();
                    suspend_stats.last_time_in_sleep =
                        zx::BootDuration::from_nanos(res.suspend_time.unwrap_or(0));
                    // The "0" here mimics the expected power management success string,
                    // since we don't have IRQ numbers to report.
                    suspend_stats.last_resume_reason =
                        resume_reason.clone().map(|s| format!("0 {}", s));
                });
                state.add_suspend_event(SuspendEvent::Resume {
                    time: wake_time,
                    reason: resume_reason.unwrap_or_default(),
                });
                fuchsia_trace::instant!(
                    "power",
                    "suspend_container:done",
                    fuchsia_trace::Scope::Process
                );
            }
            e => {
                let wake_time = zx::BootInstant::get();
                let mut state = self.lock();
                state.update_suspend_stats(|suspend_stats| {
                    suspend_stats.fail_count += 1;
                    suspend_stats.last_failed_errno = Some(errno!(EINVAL));
                    suspend_stats.last_resume_reason = None;
                });

                let (wake_lock_names, epoll_names) = match e {
                    Ok(Err(frunner::SuspendError::WakeLocksExist)) => {
                        let wake_lock_names = state.active_wake_locks();
                        let epoll_names = state.active_epolls();
                        let last_resume_reason = format!(
                            "Abort: {}",
                            wake_lock_names.join(" ") + &epoll_names.iter().join(" ")
                        );
                        state.update_suspend_stats(|suspend_stats| {
                            // Power analysis tools require `Abort: ` in the case of failed suspends.
                            suspend_stats.last_resume_reason = Some(last_resume_reason);
                        });
                        (Some(wake_lock_names), Some(epoll_names))
                    }
                    _ => (None, None),
                };

                log_warn!(e:?; "Container suspension failed. wake locks: {:?}, epolls: {:?}", wake_lock_names, epoll_names);
                state.add_suspend_event(SuspendEvent::Fail {
                    time: wake_time,
                    wake_locks: wake_lock_names,
                    epolls: epoll_names,
                });
                fuchsia_trace::instant!(
                    "power",
                    "suspend_container:error",
                    fuchsia_trace::Scope::Process
                );
                return error!(EINVAL);
            }
        }

        std::mem::drop(ebpf_lock);

        Ok(())
    }

    pub fn acquire_ebpf_suspend_lock<'a, L>(
        &'a self,
        locked: &'a mut Locked<L>,
    ) -> EbpfSuspendGuard<'a>
    where
        L: LockBefore<EbpfSuspendLock>,
    {
        self.ebpf_suspend_lock.read(locked)
    }
}

impl fmt::Display for LockSource {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            LockSource::WakeLockFile => write!(f, "wake lock file"),
            LockSource::ContainerPowerController => write!(f, "container power controller"),
        }
    }
}

pub trait OnWakeOps: Send + Sync {
    fn on_wake(&self, current_task: &CurrentTask, baton_lease: &zx::NullableHandle);
}

/// Creates a proxy between `remote_channel` and the returned `zx::Channel`.
///
/// The message counter's initial value will be set to 0.
///
/// The returned counter will be incremented each time there is an incoming message on the proxied
/// channel. The starnix_kernel is expected to decrement the counter when that incoming message is
/// handled.
///
/// Note that "message" in this context means channel message. This can be either a FIDL event, or
/// a response to a FIDL message from the platform.
///
/// For example, the starnix_kernel may issue a hanging get to retrieve input events. When that
/// hanging get returns, the counter will be incremented by 1. When the next hanging get has been
/// scheduled, the input subsystem decrements the counter by 1.
///
/// The proxying is done by the Starnix runner, and allows messages on the channel to wake
/// the container.
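///
/// A minimal usage sketch, not taken from a real caller: `remote_end` and the message handling
/// shown here are illustrative assumptions. The key point is that each incoming message is
/// eventually matched by one `mark_proxy_message_handled` call once it has been processed.
///
/// ```ignore
/// // Hypothetical remote endpoint whose messages should be able to wake the container.
/// let (remote_end, _peer) = zx::Channel::create();
/// let (proxied, counter) =
///     create_proxy_for_wake_events_counter_zero(remote_end, "example-proxy".to_string());
///
/// // Read one message from the proxied channel; the runner increments `counter` when it arrives.
/// let mut buf = zx::MessageBuf::new();
/// proxied.read(&mut buf).expect("read proxied message");
///
/// // ... handle the message ...
///
/// // Tell the runner the message was handled so the container may suspend again.
/// mark_proxy_message_handled(&counter);
/// ```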
pub fn create_proxy_for_wake_events_counter_zero(
    remote_channel: zx::Channel,
    name: String,
) -> (zx::Channel, zx::Counter) {
    let (local_proxy, kernel_channel) = zx::Channel::create();
    let counter = zx::Counter::create();

    let local_counter =
        counter.duplicate_handle(zx::Rights::SAME_RIGHTS).expect("Failed to duplicate counter");

    let manager = fuchsia_component::client::connect_to_protocol_sync::<frunner::ManagerMarker>()
        .expect("failed");
    manager
        .proxy_wake_channel(frunner::ManagerProxyWakeChannelRequest {
            container_job: Some(
                fuchsia_runtime::job_default()
                    .duplicate(zx::Rights::SAME_RIGHTS)
                    .expect("Failed to dup handle"),
            ),
            container_channel: Some(kernel_channel),
            remote_channel: Some(remote_channel),
            counter: Some(counter),
            name: Some(name),
            ..Default::default()
        })
        .expect("Failed to create proxy");

    (local_proxy, local_counter)
}

/// Creates a proxy between `remote_channel` and the returned `zx::Channel`.
///
/// The message counter's initial value will be set to 1, which will prevent the container from
/// suspending until the caller decrements the counter.
///
/// The returned counter will be incremented each time there is an incoming message on the proxied
/// channel. The starnix_kernel is expected to decrement the counter when that incoming message is
/// handled.
///
/// Note that "message" in this context means channel message. This can be either a FIDL event, or
/// a response to a FIDL message from the platform.
///
/// For example, the starnix_kernel may issue a hanging get to retrieve input events. When that
/// hanging get returns, the counter will be incremented by 1. When the next hanging get has been
/// scheduled, the input subsystem decrements the counter by 1.
///
/// The proxying is done by the Starnix runner, and allows messages on the channel to wake
/// the container.
pub fn create_proxy_for_wake_events_counter(
    remote_channel: zx::Channel,
    name: String,
) -> (zx::Channel, zx::Counter) {
    let (proxy, counter) = create_proxy_for_wake_events_counter_zero(remote_channel, name);

    // Increment the counter by one so that the initial incoming message to the container will
    // set the count to 0, instead of -1.
    counter.add(1).expect("Failed to add to counter");

    (proxy, counter)
}

/// Marks a message handled by decrementing `counter`.
///
/// This should be called when a proxied channel message has been handled, and the caller would
/// be ok letting the container suspend.
pub fn mark_proxy_message_handled(counter: &zx::Counter) {
    counter.add(-1).expect("Failed to decrement counter");
}

/// Marks all messages tracked by `counter` as handled.
pub fn mark_all_proxy_messages_handled(counter: &zx::Counter) {
    counter.write(0).expect("Failed to reset counter");
}

/// Creates a watcher between clients and the Starnix runner.
///
/// Changes in the power state of the container are relayed by the event pair.
pub fn create_watcher_for_wake_events(watcher: zx::EventPair) {
    let manager = fuchsia_component::client::connect_to_protocol_sync::<frunner::ManagerMarker>()
        .expect("failed");
    manager
        .register_wake_watcher(
            frunner::ManagerRegisterWakeWatcherRequest {
                watcher: Some(watcher),
                ..Default::default()
            },
            zx::Instant::INFINITE,
        )
        .expect("Failed to register wake watcher");
}

/// Wrapper around a Weak `OwnedMessageCounter` that can be passed around to keep the container
/// awake.
///
/// Each live `SharedMessageCounter` is responsible for a pending message while it is in scope,
/// and removes it from the counter when it goes out of scope. Processes that need to cooperate
/// can pass a `SharedMessageCounter` to each other to ensure that once the work is done, the lock
/// goes out of scope as well. This allows for precise accounting of remaining work, and should
/// give us control over container suspension that is guarded by the compiler, not by convention.
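///
/// A minimal sketch of the intended hand-off pattern; the counter name and the work being done
/// are illustrative assumptions, not taken from a real call site:
///
/// ```ignore
/// let owned = OwnedMessageCounter::new("example", Some(zx::Counter::create()));
///
/// {
///     // Account for one new pending message and hand the token to the code doing the work.
///     let shared = owned.share(true);
///     // ... do the work that must finish before the container may suspend ...
///     // `shared` is dropped here, which marks the message handled (decrements the counter).
/// }
/// ```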
#[derive(Debug)]
pub struct SharedMessageCounter(Weak<OwnedMessageCounter>);

impl Drop for SharedMessageCounter {
    fn drop(&mut self) {
        if let Some(message_counter) = self.0.upgrade() {
            message_counter.mark_handled();
        }
    }
}

/// Owns a `zx::Counter` to track pending messages that prevent the container from suspending.
///
/// This struct ensures that the counter is reset to 0 when the last strong reference is dropped,
/// effectively releasing any wake lock held by this counter.
pub struct OwnedMessageCounter {
    name: String,
    counter: Option<zx::Counter>,
}
pub type OwnedMessageCounterHandle = Arc<OwnedMessageCounter>;

impl Drop for OwnedMessageCounter {
    /// Resets the underlying `zx::Counter` to 0 when the `OwnedMessageCounter` is dropped.
    ///
    /// This ensures that all pending messages are marked as handled, allowing the system to suspend
    /// if no other wake locks are held.
    fn drop(&mut self) {
        self.counter.as_ref().map(mark_all_proxy_messages_handled);
    }
}

impl OwnedMessageCounter {
    pub fn new(name: &str, counter: Option<zx::Counter>) -> OwnedMessageCounterHandle {
        Arc::new(Self { name: name.to_string(), counter })
    }

    /// Decrements the counter, signaling that a pending message or operation has been handled.
    ///
    /// This should be called when the work associated with a previous `mark_pending` call is
    /// complete.
    pub fn mark_handled(&self) {
        self.counter.as_ref().map(mark_proxy_message_handled);
    }

    /// Increments the counter, signaling that a new message or operation is pending.
    ///
    /// This prevents the system from suspending until a corresponding `mark_handled` call is made.
    pub fn mark_pending(&self) {
        self.counter.as_ref().map(|c| c.add(1).expect("Failed to increment counter"));
    }

    /// Creates a `SharedMessageCounter` from this `OwnedMessageCounter`.
    ///
    /// `new_pending_message` - whether a new pending message should be added.
    pub fn share(
        self: &OwnedMessageCounterHandle,
        new_pending_message: bool,
    ) -> SharedMessageCounter {
        if new_pending_message {
            self.mark_pending();
        }
        SharedMessageCounter(Arc::downgrade(self))
    }
}

impl fmt::Display for OwnedMessageCounter {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "Counter({}): {:?}", self.name, self.counter.as_ref().map(|c| c.read()))
    }
}

/// A proxy wrapper that manages a `zx::Counter` to allow the container to suspend
/// after events have been processed.
///
/// When the proxy is dropped, the counter is reset to 0 to release the wake-lock.
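///
/// A minimal sketch of the intended call pattern; `message_counter`, `input_proxy`, and `watch`
/// are hypothetical placeholders for a real counter handle and a FIDL proxy with a
/// hanging-get style method:
///
/// ```ignore
/// let waking_proxy = ContainerWakingProxy::new(message_counter, input_proxy);
///
/// // `call` creates the next hanging-get future and then decrements the counter, so the
/// // container may suspend again while we wait for the next event.
/// let next_event = waking_proxy.call(|proxy| proxy.watch()).await;
/// ```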
pub struct ContainerWakingProxy<P: Proxy> {
    counter: OwnedMessageCounterHandle,
    proxy: P,
}

impl<P: Proxy> ContainerWakingProxy<P> {
    pub fn new(counter: OwnedMessageCounterHandle, proxy: P) -> Self {
        Self { counter, proxy }
    }

    /// Creates a `Future` by invoking `future` on the wrapped proxy.
    ///
    /// The counter is decremented (marking the message handled) after the future is created.
    pub fn call<T, F, R>(&self, future: F) -> R
    where
        F: FnOnce(&P) -> R,
        R: Future<Output = T>,
    {
        // The sequence for handling events MUST be:
        //
        // 1. create future
        // 2. decrease counter
        // 3. await future
        //
        // to allow the suspend/wake cycle to work correctly.
        let f = future(&self.proxy);
        self.counter.mark_handled();
        f
    }
}

/// A stream wrapper that manages a `zx::Counter` to allow the container to suspend
/// after events have been processed.
///
/// When the stream is dropped, the counter is reset to 0 to release the wake-lock.
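///
/// A minimal sketch of the intended usage; `message_counter` and `event_stream` are hypothetical
/// placeholders for a real counter handle and a FIDL request or event stream:
///
/// ```ignore
/// let mut waking_stream = ContainerWakingStream::new(message_counter, event_stream);
///
/// // `next` creates the future and then decrements the counter, so the container may
/// // suspend while we wait for the next message.
/// while let Some(message) = waking_stream.next().await {
///     // ... handle `message` ...
/// }
/// ```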
pub struct ContainerWakingStream<S: FusedStream + Unpin> {
    counter: OwnedMessageCounterHandle,
    stream: S,
}

impl<S: FusedStream + Unpin> ContainerWakingStream<S> {
    pub fn new(counter: OwnedMessageCounterHandle, stream: S) -> Self {
        Self { counter, stream }
    }

    /// Creates a `Next` future for the wrapped stream.
    ///
    /// The counter is decremented (marking the message handled) after the future is created,
    /// unless the stream has already terminated.
    pub fn next(&mut self) -> Next<'_, S> {
        // See `ContainerWakingProxy::call` for the required sequence of handling events.
        let is_terminated = self.stream.is_terminated();
        let next = self.stream.next();
        if !is_terminated {
            self.counter.mark_handled();
        }
        next
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use diagnostics_assertions::assert_data_tree;
    use fidl::endpoints::create_proxy_and_stream;
    use fidl_test_placeholders::{EchoMarker, EchoRequest};
    use futures::StreamExt;
    use zx::{self, HandleBased};
    use {fuchsia_async as fasync, fuchsia_inspect as inspect};

    #[::fuchsia::test]
    fn test_counter_zero_initialization() {
        let (_endpoint, endpoint) = zx::Channel::create();
        let (_channel, counter) =
            super::create_proxy_for_wake_events_counter_zero(endpoint, "test".into());
        assert_eq!(counter.read(), Ok(0));
    }

    #[::fuchsia::test]
    fn test_counter_initialization() {
        let (_endpoint, endpoint) = zx::Channel::create();
        let (_channel, counter) =
            super::create_proxy_for_wake_events_counter(endpoint, "test".into());
        assert_eq!(counter.read(), Ok(1));
    }

    #[::fuchsia::test]
    async fn test_container_waking_proxy() {
        let (proxy, mut stream) = create_proxy_and_stream::<EchoMarker>();
        let server_task = fasync::Task::spawn(async move {
            let request = stream.next().await.unwrap().unwrap();
            match request {
                EchoRequest::EchoString { value, responder } => {
                    responder.send(value.as_deref()).unwrap();
                }
            }
        });

        let counter = zx::Counter::create();
        counter.add(5).unwrap();
        assert_eq!(counter.read(), Ok(5));

        let waking_proxy = ContainerWakingProxy {
            counter: OwnedMessageCounter::new(
                "test_proxy",
                Some(counter.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap()),
            ),
            proxy,
        };

        let response_future = waking_proxy.call(|p| p.echo_string(Some("hello")));

        // The `call` method decrements the counter.
        assert_eq!(counter.read(), Ok(4));

        let response = response_future.await.unwrap();
        assert_eq!(response.as_deref(), Some("hello"));

        server_task.await;

        assert_eq!(counter.read(), Ok(4));
        drop(waking_proxy);
        assert_eq!(counter.read(), Ok(0));
    }

    #[::fuchsia::test]
    async fn test_container_waking_stream() {
        let (proxy, stream) = create_proxy_and_stream::<EchoMarker>();
        let client_task = fasync::Task::spawn(async move {
            let response = proxy.echo_string(Some("hello")).await.unwrap();
            assert_eq!(response.as_deref(), Some("hello"));
        });

        let counter = zx::Counter::create();
        counter.add(5).unwrap();
        assert_eq!(counter.read(), Ok(5));

        let mut waking_stream = ContainerWakingStream {
            counter: OwnedMessageCounter::new(
                "test_stream",
                Some(counter.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap()),
            ),
            stream,
        };

        let request_future = waking_stream.next();

        // The `next` method decrements the counter.
        assert_eq!(counter.read(), Ok(4));

        let request = request_future.await.unwrap().unwrap();
        match request {
            EchoRequest::EchoString { value, responder } => {
                assert_eq!(value.as_deref(), Some("hello"));
                responder.send(value.as_deref()).unwrap();
            }
        }

        client_task.await;

        assert_eq!(counter.read(), Ok(4));
        drop(waking_stream);
        assert_eq!(counter.read(), Ok(0));
    }

    #[::fuchsia::test]
    async fn test_message_counters_inspect() {
        let power_manager = SuspendResumeManager::default();
        let inspector = inspect::component::inspector();

        let zx_counter = zx::Counter::create();
        let counter_handle = power_manager.add_message_counter(
            "test_counter",
            Some(zx_counter.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap()),
        );

        zx_counter.add(1).unwrap();

        assert_data_tree!(inspector, root: contains {
            message_counters: vec!["Counter(test_counter): Some(Ok(1))"],
        });

        zx_counter.add(1).unwrap();
        assert_data_tree!(inspector, root: contains {
            message_counters: vec!["Counter(test_counter): Some(Ok(2))"],
        });

        drop(counter_handle);
        assert_data_tree!(inspector, root: contains {
            message_counters: Vec::<String>::new(),
        });
    }

    #[::fuchsia::test]
    fn test_shared_message_counter() {
        // Create an owned counter and set its value.
        let zx_counter = zx::Counter::create();
        let owned_counter = OwnedMessageCounter::new(
            "test_shared_counter",
            Some(zx_counter.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap()),
        );
        zx_counter.add(5).unwrap();
        assert_eq!(zx_counter.read(), Ok(5));

        // Create a shared counter with no new message. The value should be unchanged.
        let shared_counter = owned_counter.share(false);
        assert_eq!(zx_counter.read(), Ok(5));

        // Drop the shared counter. The value should be decremented.
        drop(shared_counter);
        assert_eq!(zx_counter.read(), Ok(4));

        // Create a shared counter with a new message. The value should be incremented.
        let shared_counter_2 = owned_counter.share(true);
        assert_eq!(zx_counter.read(), Ok(5));

        // Drop the shared counter. The value should be decremented.
        drop(shared_counter_2);
        assert_eq!(zx_counter.read(), Ok(4));

        // Create another shared counter.
        let shared_counter_3 = owned_counter.share(false);
        assert_eq!(zx_counter.read(), Ok(4));

        // Drop the owned counter. The value should be reset to 0.
        drop(owned_counter);
        assert_eq!(zx_counter.read(), Ok(0));

        // Drop the shared counter. The value should remain 0, and it shouldn't panic.
        drop(shared_counter_3);
        assert_eq!(zx_counter.read(), Ok(0));
    }

    #[::fuchsia::test]
    async fn test_container_waking_event_termination() {
        let stream = futures::stream::iter(vec![0]).fuse();
        let counter = zx::Counter::create();
        counter.add(2).unwrap();
        assert_eq!(counter.read(), Ok(2));
        let mut waking_stream = ContainerWakingStream {
            counter: OwnedMessageCounter::new(
                "test_stream",
                Some(counter.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap()),
            ),
            stream,
        };

        assert_eq!(waking_stream.next().await, Some(0));
        assert_eq!(counter.read(), Ok(1));

        assert_eq!(waking_stream.next().await, None);
        assert_eq!(waking_stream.next().await, None);
        // The stream is already terminated, so the counter should remain 0.
        assert_eq!(counter.read(), Ok(0));
    }
}