starnix_core/power/manager.rs

// Copyright 2023 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use crate::power::{SuspendState, SuspendStats};
use crate::task::CurrentTask;
use crate::vfs::EpollKey;

use std::cmp::min;
use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::sync::{Arc, Weak};

use anyhow::{Context, anyhow};
use fidl::endpoints::Proxy;
use fuchsia_component::client::connect_to_protocol_sync;
use fuchsia_inspect::ArrayProperty;
use futures::stream::{FusedStream, Next};
use futures::{FutureExt, StreamExt};
use itertools::Itertools;
use starnix_logging::{log_info, log_warn};
use starnix_sync::{
    EbpfSuspendLock, FileOpsCore, LockBefore, Locked, Mutex, MutexGuard, OrderedRwLock,
    RwLockReadGuard,
};
use starnix_task_command::TaskCommand;
use starnix_uapi::arc_key::WeakKey;
use starnix_uapi::errors::Errno;
use starnix_uapi::{errno, error};
use std::collections::VecDeque;
use std::fmt;
use zx::{HandleBased, Peered};
use {
    fidl_fuchsia_power_observability as fobs, fidl_fuchsia_session_power as fpower,
    fidl_fuchsia_starnix_runner as frunner, fuchsia_inspect as inspect,
};

// Remember at most this many past durations.
const MAX_PAST_DURATIONS_COUNT: usize = 10;

/// Epoll persistent info, exposed in inspect diagnostics.
#[derive(Debug, Hash)]
struct EpollInfo {
    // The name of the command that this epoll was created for.
    command: TaskCommand,
    // When this epoll was created, used for detecting long-running epolls.
    added_timestamp: Option<zx::BootInstant>,
    // Records a limited number of past durations for this epoll.
    past_durations: VecDeque<zx::BootDuration>,
}

impl EpollInfo {
    fn new(
        command: TaskCommand,
        added_timestamp: zx::BootInstant,
        past_durations: Option<VecDeque<zx::BootDuration>>,
    ) -> Self {
        Self {
            command,
            added_timestamp: Some(added_timestamp),
            past_durations: past_durations
                .unwrap_or_else(|| VecDeque::with_capacity(MAX_PAST_DURATIONS_COUNT)),
        }
    }

    /// Moves the current epoll duration into past durations, computed based on the
    /// provided epoll end timestamp.
    fn update_lifecycle(self, end_timestamp: zx::BootInstant) -> Self {
        let past_durations = if let Some(start_timestamp) = self.added_timestamp {
            let duration = end_timestamp - start_timestamp;
            let mut past_durations = self.past_durations;
            past_durations.push_front(duration);
            past_durations.into_iter().take(MAX_PAST_DURATIONS_COUNT).collect()
        } else {
            VecDeque::with_capacity(MAX_PAST_DURATIONS_COUNT)
        };
        Self { added_timestamp: None, past_durations, ..self }
    }

    /// Records the contents of this `EpollInfo` into the provided inspect `node`. The
    /// current epoll is terminated at `stop_timestamp`.
    fn record_into(&self, node: &inspect::Node, stop_timestamp: zx::BootInstant) {
        let this_node = node.create_child(&self.command.to_string());
        if let Some(added_timestamp) = self.added_timestamp {
            this_node.record_int("created_timestamp_ns", added_timestamp.into_nanos());
            let duration = stop_timestamp - added_timestamp;
            this_node.record_int("this_duration_ns", duration.into_nanos());
        }
        let len = std::cmp::min(self.past_durations.len(), MAX_PAST_DURATIONS_COUNT);
        let past_durations_node = this_node.create_int_array("past_durations_ns", len);
        for (i, d) in self.past_durations.iter().take(len).enumerate() {
            past_durations_node.set(i, d.into_nanos());
        }
        this_node.record(past_durations_node);
        node.record(this_node);
    }
}

/// Manager for suspend and resume.
pub struct SuspendResumeManager {
    // The mutable state of [SuspendResumeManager].
    inner: Arc<Mutex<SuspendResumeManagerInner>>,

    /// The currently registered message counters in the system whose values are exposed to inspect
    /// via a lazy node.
    message_counters: Arc<Mutex<HashSet<WeakKey<OwnedMessageCounter>>>>,

    // The lock used to avoid suspension while holding eBPF locks.
    ebpf_suspend_lock: OrderedRwLock<(), EbpfSuspendLock>,
}

/// The mutable state of [SuspendResumeManager].
pub struct SuspendResumeManagerInner {
    /// The suspend counters and gauges.
    suspend_stats: SuspendStats,
    sync_on_suspend_enabled: bool,

    suspend_events: VecDeque<SuspendEvent>,

    /// The currently active wake locks in the system. If non-empty, this prevents
    /// the container from suspending.
    active_locks: HashMap<String, LockSource>,
    inactive_locks: HashSet<String>,

    /// The currently active EPOLLWAKEUPs in the system. If non-empty, this prevents
    /// the container from suspending.
    active_epolls: HashMap<EpollKey, EpollInfo>,

    /// Previous, but now inactive epolls.
    inactive_epolls: HashMap<EpollKey, EpollInfo>,

    /// The event pair that is passed to the Starnix runner so it can observe whether
    /// or not any wake locks are active before completing a suspend operation.
    active_lock_reader: zx::EventPair,

    /// The event pair that is used by the Starnix kernel to signal when there are
    /// active wake locks in the container. Note that the peer of the writer is the
    /// object that is signaled.
    active_lock_writer: zx::EventPair,
}

pub type EbpfSuspendGuard<'a> = RwLockReadGuard<'a, ()>;

/// The source of a wake lock.
pub enum LockSource {
    WakeLockFile,
    ContainerPowerController,
}

#[derive(Clone, Debug)]
pub enum SuspendEvent {
    Attempt {
        time: zx::BootInstant,
        state: String,
    },
    Resume {
        time: zx::BootInstant,
        reason: String,
    },
    Fail {
        time: zx::BootInstant,
        wake_locks: Option<Vec<String>>,
        epolls: Option<Vec<TaskCommand>>,
    },
}

/// The inspect node ring buffer will keep at most this many entries.
const INSPECT_RING_BUFFER_CAPACITY: usize = 128;

/// The inspect wakelock nodes will keep at most this many entries.
const INSPECT_MAX_WAKE_LOCK_NAMES: usize = 64;
const INSPECT_MAX_EPOLLS: usize = 64;

impl Default for SuspendResumeManagerInner {
    fn default() -> Self {
        let (active_lock_reader, active_lock_writer) = zx::EventPair::create();
        Self {
            suspend_events: VecDeque::with_capacity(INSPECT_RING_BUFFER_CAPACITY),
            suspend_stats: Default::default(),
            sync_on_suspend_enabled: Default::default(),
            active_locks: Default::default(),
            inactive_locks: Default::default(),
            active_epolls: Default::default(),
            inactive_epolls: Default::default(),
            active_lock_reader,
            active_lock_writer,
        }
    }
}

impl SuspendResumeManagerInner {
    // Returns true if there are no wake locks or wake epolls preventing suspension.
    fn can_suspend(&self) -> bool {
        self.active_locks.is_empty() && self.active_epolls.is_empty()
    }

    pub fn active_wake_locks(&self) -> Vec<String> {
        Vec::from_iter(self.active_locks.keys().cloned())
    }

    pub fn inactive_wake_locks(&self) -> Vec<String> {
        Vec::from_iter(self.inactive_locks.clone())
    }

    fn active_epolls(&self) -> Vec<TaskCommand> {
        Vec::from_iter(self.active_epolls.values().map(|e| &e.command).cloned())
    }

    fn update_suspend_stats<UpdateFn>(&mut self, update: UpdateFn)
    where
        UpdateFn: FnOnce(&mut SuspendStats),
    {
        update(&mut self.suspend_stats);
    }

    /// Signals whether or not there are currently any active wake locks in the kernel.
    fn signal_wake_events(&mut self) {
        let (clear_mask, set_mask) =
            if self.active_locks.is_empty() && self.active_epolls.is_empty() {
                (zx::Signals::EVENT_SIGNALED, zx::Signals::empty())
            } else {
                (zx::Signals::empty(), zx::Signals::EVENT_SIGNALED)
            };
        self.active_lock_writer.signal_peer(clear_mask, set_mask).expect("Failed to signal peer");
    }

    fn record_active_locks(&self, node: &inspect::Node) {
        let active_locks = &self.active_locks;
        let len = min(active_locks.len(), INSPECT_MAX_WAKE_LOCK_NAMES);
        let active_wake_locks = node.create_string_array(fobs::ACTIVE_WAKE_LOCK_NAMES, len);
        for (i, name) in active_locks.keys().sorted().take(len).enumerate() {
            if let Some(src) = active_locks.get(name) {
                active_wake_locks.set(i, format!("{} (source {})", name, src));
            }
        }
        node.record(active_wake_locks);
    }

    fn record_inactive_locks(&self, node: &inspect::Node) {
        let inactive_locks = &self.inactive_locks;
        let len = min(inactive_locks.len(), INSPECT_MAX_WAKE_LOCK_NAMES);
        let inactive_wake_locks = node.create_string_array(fobs::INACTIVE_WAKE_LOCK_NAMES, len);
        for (i, name) in inactive_locks.iter().sorted().take(len).enumerate() {
            inactive_wake_locks.set(i, name);
        }
        node.record(inactive_wake_locks);
    }

    fn record_active_epolls(&self, node: &inspect::Node) {
        let active_epolls = &self.active_epolls;
        let len = min(active_epolls.len(), INSPECT_MAX_EPOLLS);
        let active_epolls_node = node.create_child(fobs::ACTIVE_EPOLLS);
        let now = zx::BootInstant::get();
        for key in active_epolls.keys().take(len) {
            if let Some(epoll_info) = active_epolls.get(key) {
                epoll_info.record_into(&active_epolls_node, now);
            }
        }
        node.record(active_epolls_node);
    }

    fn record_inactive_epolls(&self, node: &inspect::Node) {
        let inactive_epolls = &self.inactive_epolls;
        let len = min(inactive_epolls.len(), INSPECT_MAX_EPOLLS);
        let inactive_epolls_node = node.create_child(fobs::INACTIVE_EPOLLS);
        let now = zx::BootInstant::get();
        for epoll in inactive_epolls.values().take(len) {
            epoll.record_into(&inactive_epolls_node, now);
        }
        node.record(inactive_epolls_node);
    }

    fn add_suspend_event(&mut self, event: SuspendEvent) {
        if self.suspend_events.len() >= INSPECT_RING_BUFFER_CAPACITY {
            self.suspend_events.pop_front();
        }
        self.suspend_events.push_back(event);
    }

    fn record_suspend_events(&self, node: &inspect::Node) {
        let events_node = node.create_child("suspend_events");
        for (i, event) in self.suspend_events.iter().enumerate() {
            let child = events_node.create_child(i.to_string());
            match event {
                SuspendEvent::Attempt { time, state } => {
                    child.record_int(fobs::SUSPEND_ATTEMPTED_AT, time.into_nanos());
                    child.record_string(fobs::SUSPEND_REQUESTED_STATE, state);
                }
                SuspendEvent::Resume { time, reason } => {
                    child.record_int(fobs::SUSPEND_RESUMED_AT, time.into_nanos());
                    child.record_string(fobs::SUSPEND_RESUME_REASON, reason);
                }
                SuspendEvent::Fail { time, wake_locks, epolls } => {
                    child.record_int(fobs::SUSPEND_FAILED_AT, time.into_nanos());
                    if let Some(names) = wake_locks {
                        let names_array =
                            child.create_string_array(fobs::ACTIVE_WAKE_LOCK_NAMES, names.len());
                        for (i, name) in names.iter().enumerate() {
                            names_array.set(i, name);
                        }
                        child.record(names_array);
                    }
                    if let Some(epolls) = epolls {
                        let epolls_array =
                            child.create_string_array(fobs::ACTIVE_EPOLLS, epolls.len());
                        for (i, command) in epolls.iter().enumerate() {
                            epolls_array.set(i, command.to_string());
                        }
                        child.record(epolls_array);
                    }
                }
            }
            events_node.record(child);
        }
        node.record(events_node);
    }
}

pub type SuspendResumeManagerHandle = Arc<SuspendResumeManager>;

impl Default for SuspendResumeManager {
    fn default() -> Self {
        let message_counters: Arc<Mutex<HashSet<WeakKey<OwnedMessageCounter>>>> =
            Default::default();
        let message_counters_clone = message_counters.clone();
        let root = inspect::component::inspector().root();
        root.record_lazy_values("message_counters", move || {
            let message_counters_clone = message_counters_clone.clone();
            async move {
                let inspector = fuchsia_inspect::Inspector::default();
                let root = inspector.root();
                let mut message_counters = message_counters_clone.lock();
                message_counters.retain(|c| c.0.upgrade().is_some());
                let message_counters_inspect =
                    root.create_string_array("message_counters", message_counters.len());
                for (i, c) in message_counters.iter().enumerate() {
                    let counter = c.0.upgrade().expect("lost counter should be retained");
                    message_counters_inspect.set(i, counter.to_string());
                }
                root.record(message_counters_inspect);
                Ok(inspector)
            }
            .boxed()
        });
        let inner = Arc::new(Mutex::new(SuspendResumeManagerInner::default()));
        let inner_clone = inner.clone();
        root.record_lazy_child("wake_locks", move || {
            let inner = inner_clone.clone();
            async move {
                let inspector = fuchsia_inspect::Inspector::default();
                let root = inspector.root();
                let state = inner.lock();

                state.record_active_locks(root);
                state.record_inactive_locks(root);
                state.record_active_epolls(root);
                state.record_inactive_epolls(root);
                state.record_suspend_events(root);

                Ok(inspector)
            }
            .boxed()
        });
        Self { message_counters, inner, ebpf_suspend_lock: Default::default() }
    }
}

impl SuspendResumeManager {
    /// Locks and returns the inner state of the manager.
    pub fn lock(&self) -> MutexGuard<'_, SuspendResumeManagerInner> {
        self.inner.lock()
    }

    /// Power on the PowerMode element and start listening to the suspend stats updates.
    pub fn init(
        self: &SuspendResumeManagerHandle,
        system_task: &CurrentTask,
    ) -> Result<(), anyhow::Error> {
        let handoff = system_task
            .kernel()
            .connect_to_protocol_at_container_svc::<fpower::HandoffMarker>()?
            .into_sync_proxy();
        match handoff.take(zx::MonotonicInstant::INFINITE) {
            Ok(parent_lease) => {
                let parent_lease = parent_lease
                    .map_err(|e| anyhow!("Failed to take lessor and lease from parent: {e:?}"))?;
                drop(parent_lease)
            }
            Err(e) => {
                if e.is_closed() {
                    log_warn!(
                        "Failed to send the fuchsia.session.power/Handoff.Take request. Assuming no Handoff protocol exists and moving on..."
                    );
                } else {
                    return Err(e).context("Handoff::Take");
                }
            }
        }
        Ok(())
    }

    /// Adds a wake lock `name` to the active wake locks.
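    ///
    /// Returns `true` if the lock was not already active.
    ///
    /// A minimal usage sketch, assuming a `SuspendResumeManagerHandle` named `manager`
    /// (the lock name below is illustrative):
    ///
    /// ```ignore
    /// manager.add_lock("sensor_read", LockSource::WakeLockFile);
    /// // ... work that must keep the container awake ...
    /// manager.remove_lock("sensor_read");
    /// ```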
    pub fn add_lock(&self, name: &str, src: LockSource) -> bool {
        let mut state = self.lock();
        let res = state.active_locks.insert(String::from(name), src);
        state.signal_wake_events();
        res.is_none()
    }

    /// Removes a wake lock `name` from the active wake locks.
    pub fn remove_lock(&self, name: &str) -> bool {
        let mut state = self.lock();
        let res = state.active_locks.remove(name);
        if res.is_none() {
            return false;
        }

        state.inactive_locks.insert(String::from(name));
        state.signal_wake_events();
        true
    }

    /// Adds a wake lock `key` to the active epoll wake locks.
    pub fn add_epoll(&self, current_task: &CurrentTask, key: EpollKey) {
        let mut state = self.lock();
        let past_durations = state.inactive_epolls.remove(&key).map(|info| info.past_durations);
        let epoll_info = state.active_epolls.remove(&key).unwrap_or_else(|| {
            EpollInfo::new(current_task.command(), zx::BootInstant::get(), past_durations)
        });
        state.active_epolls.insert(key, epoll_info);
        state.signal_wake_events();
    }

    /// Removes a wake lock `key` from the active epoll wake locks.
    pub fn remove_epoll(&self, key: EpollKey) {
        let mut state = self.lock();
        let epoll_info = state.active_epolls.remove(&key);
        if let Some(epoll_info) = epoll_info {
            state.inactive_epolls.insert(key, epoll_info.update_lifecycle(zx::BootInstant::get()));
        }
        state.signal_wake_events();
    }

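    /// Registers a message `counter` under `name` so that its value is reported in the
    /// "message_counters" inspect node. Counters are tracked weakly; entries whose handles
    /// have been dropped are pruned the next time the set is updated or inspected.
    ///
    /// A minimal usage sketch (the name below is illustrative):
    ///
    /// ```ignore
    /// let handle = manager.add_message_counter("wake_proxy", Some(zx::Counter::create()));
    /// handle.mark_pending(); // a message is in flight; keep the container awake
    /// handle.mark_handled(); // the message was processed; the container may suspend
    /// ```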
    pub fn add_message_counter(
        &self,
        name: &str,
        counter: Option<zx::Counter>,
    ) -> OwnedMessageCounterHandle {
        let container_counter = OwnedMessageCounter::new(name, counter);
        let mut message_counters = self.message_counters.lock();
        message_counters.insert(WeakKey::from(&container_counter));
        message_counters.retain(|c| c.0.upgrade().is_some());
        container_counter
    }

    pub fn has_nonzero_message_counter(&self) -> bool {
        self.message_counters.lock().iter().any(|c| {
            let Some(c) = c.0.upgrade() else {
                return false;
            };
            c.counter.as_ref().and_then(|counter| counter.read().ok()).map_or(false, |v| v != 0)
        })
    }

    /// Returns a duplicate handle to the `EventPair` that is signaled when wake
    /// locks are active.
    pub fn duplicate_lock_event(&self) -> zx::EventPair {
        let state = self.lock();
        state
            .active_lock_reader
            .duplicate_handle(zx::Rights::SAME_RIGHTS)
            .expect("Failed to duplicate handle")
    }

    /// Gets the suspend statistics.
    pub fn suspend_stats(&self) -> SuspendStats {
        self.lock().suspend_stats.clone()
    }

    /// Get the contents of the power "sync_on_suspend" file in the power
    /// filesystem.  True will cause `1` to be reported, and false will cause
    /// `0` to be reported.
    pub fn sync_on_suspend_enabled(&self) -> bool {
        self.lock().sync_on_suspend_enabled
    }

    /// Set the contents of the power "sync_on_suspend" file in the power
    /// filesystem.  See also [sync_on_suspend_enabled].
    pub fn set_sync_on_suspend(&self, enable: bool) {
        self.lock().sync_on_suspend_enabled = enable;
    }

    /// Returns the supported suspend states.
    pub fn suspend_states(&self) -> HashSet<SuspendState> {
        // TODO(b/326470421): Remove the hardcoded supported state.
        HashSet::from([SuspendState::Idle])
    }

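    /// Attempts to suspend the container in `suspend_state`.
    ///
    /// Fails with `EINVAL` if any wake locks or wake epolls are active, or if the Starnix
    /// runner reports an error. A minimal sketch, assuming a `Locked<FileOpsCore>` context
    /// named `locked` is available:
    ///
    /// ```ignore
    /// manager.suspend(locked, SuspendState::Idle)?;
    /// ```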
    pub fn suspend(
        &self,
        locked: &mut Locked<FileOpsCore>,
        suspend_state: SuspendState,
    ) -> Result<(), Errno> {
        let suspend_start_time = zx::BootInstant::get();
        let mut state = self.lock();
        state.add_suspend_event(SuspendEvent::Attempt {
            time: suspend_start_time,
            state: suspend_state.to_string(),
        });

        // Check if any wake locks are active. If they are, short-circuit the suspend attempt.
        if !state.can_suspend() {
            self.report_failed_suspension(state, "kernel wake lock");
            return error!(EINVAL);
        }

        // Drop the state lock. This allows programs to acquire wake locks again. The runner will
        // check that no wake locks were acquired once all the container threads have been
        // suspended, and thus honor any wake locks that were acquired during suspension.
        std::mem::drop(state);

        // Take the ebpf lock to ensure that ebpf is not preventing suspension. This is necessary
        // because other components in the system might be executing ebpf programs on our behalf.
        let _ebpf_lock = self.ebpf_suspend_lock.write(locked);

        let manager = connect_to_protocol_sync::<frunner::ManagerMarker>()
            .expect("Failed to connect to manager");
        fuchsia_trace::duration!("power", "suspend_container:fidl");

        let container_job = Some(
            fuchsia_runtime::job_default()
                .duplicate(zx::Rights::SAME_RIGHTS)
                .expect("Failed to dup handle"),
        );
        let wake_lock_event = Some(self.duplicate_lock_event());

        log_info!("Requesting container suspension.");
        match manager.suspend_container(
            frunner::ManagerSuspendContainerRequest {
                container_job,
                wake_locks: wake_lock_event,
                ..Default::default()
            },
            zx::Instant::INFINITE,
        ) {
            Ok(Ok(res)) => {
                self.report_container_resumed(suspend_start_time, res);
            }
            e => {
                let state = self.lock();
                self.report_failed_suspension(state, &format!("runner error {:?}", e));
                return error!(EINVAL);
            }
        }
        Ok(())
    }

    fn report_container_resumed(
        &self,
        suspend_start_time: zx::BootInstant,
        res: frunner::ManagerSuspendContainerResponse,
    ) {
        let wake_time = zx::BootInstant::get();
        // The "0" here mimics the expected power management success string, since we
        // don't have IRQ numbers to report.
        let resume_reason = res.resume_reason.clone().map(|s| format!("0 {}", s));
        log_info!("Resuming from container suspension: {:?}", resume_reason);
        let mut state = self.lock();
        state.update_suspend_stats(|suspend_stats| {
            suspend_stats.success_count += 1;
            suspend_stats.last_time_in_suspend_operations = (wake_time - suspend_start_time).into();
            suspend_stats.last_time_in_sleep =
                zx::BootDuration::from_nanos(res.suspend_time.unwrap_or(0));
            suspend_stats.last_resume_reason = resume_reason.clone();
        });
        state.add_suspend_event(SuspendEvent::Resume {
            time: wake_time,
            reason: resume_reason.unwrap_or_default(),
        });
        fuchsia_trace::instant!("power", "suspend_container:done", fuchsia_trace::Scope::Process);
    }

    fn report_failed_suspension(
        &self,
        mut state: MutexGuard<'_, SuspendResumeManagerInner>,
        failure_reason: &str,
    ) {
        let wake_time = zx::BootInstant::get();
        state.update_suspend_stats(|suspend_stats| {
            suspend_stats.fail_count += 1;
            suspend_stats.last_failed_errno = Some(errno!(EINVAL));
            suspend_stats.last_resume_reason = None;
        });

        let wake_lock_names = state.active_wake_locks();
        let epoll_names = state.active_epolls();
        let last_resume_reason =
            format!("Abort: {}", wake_lock_names.join(" ") + &epoll_names.iter().join(" "));
        state.update_suspend_stats(|suspend_stats| {
            // Power analysis tools require `Abort: ` in the case of failed suspends
            suspend_stats.last_resume_reason = Some(last_resume_reason);
        });

        log_warn!(
            "Suspend failed due to {:?}. Here are the active wake locks: {:?}, and epolls: {:?}",
            failure_reason,
            wake_lock_names,
            epoll_names
        );
        state.add_suspend_event(SuspendEvent::Fail {
            time: wake_time,
            wake_locks: Some(wake_lock_names),
            epolls: Some(epoll_names),
        });
        fuchsia_trace::instant!("power", "suspend_container:error", fuchsia_trace::Scope::Process);
    }

    pub fn acquire_ebpf_suspend_lock<'a, L>(
        &'a self,
        locked: &'a mut Locked<L>,
    ) -> EbpfSuspendGuard<'a>
    where
        L: LockBefore<EbpfSuspendLock>,
    {
        self.ebpf_suspend_lock.read(locked)
    }
}

impl fmt::Display for LockSource {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            LockSource::WakeLockFile => write!(f, "wake lock file"),
            LockSource::ContainerPowerController => write!(f, "container power controller"),
        }
    }
}

pub trait OnWakeOps: Send + Sync {
    fn on_wake(&self, current_task: &CurrentTask, baton_lease: &zx::NullableHandle);
}

/// Creates a proxy between `remote_channel` and the returned `zx::Channel`.
///
/// The message counter's initial value will be set to 0.
///
/// The returned counter will be incremented each time there is an incoming message on the proxied
/// channel. The starnix_kernel is expected to decrement the counter when that incoming message is
/// handled.
///
/// Note that "message" in this context means channel message. This can be either a FIDL event, or
/// a response to a FIDL message from the platform.
///
/// For example, the starnix_kernel may issue a hanging get to retrieve input events. When that
/// hanging get returns, the counter will be incremented by 1. When the next hanging get has been
/// scheduled, the input subsystem decrements the counter by 1.
///
/// The proxying is done by the Starnix runner, and allows messages on the channel to wake
/// the container.
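///
/// A minimal usage sketch (the channel endpoints and name are illustrative; the `_counter`
/// variant below is identical except that the counter starts at 1):
///
/// ```ignore
/// let (client_end, remote_channel) = zx::Channel::create();
/// let (proxied_channel, counter) =
///     create_proxy_for_wake_events_counter_zero(remote_channel, "input".to_string());
/// // Read a message from `proxied_channel`, handle it, and then:
/// mark_proxy_message_handled(&counter);
/// ```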
pub fn create_proxy_for_wake_events_counter_zero(
    remote_channel: zx::Channel,
    name: String,
) -> (zx::Channel, zx::Counter) {
    let (local_proxy, kernel_channel) = zx::Channel::create();
    let counter = zx::Counter::create();

    let local_counter =
        counter.duplicate_handle(zx::Rights::SAME_RIGHTS).expect("Failed to duplicate counter");

    let manager = fuchsia_component::client::connect_to_protocol_sync::<frunner::ManagerMarker>()
        .expect("Failed to connect to the Starnix runner Manager protocol");
    manager
        .proxy_wake_channel(frunner::ManagerProxyWakeChannelRequest {
            container_job: Some(
                fuchsia_runtime::job_default()
                    .duplicate(zx::Rights::SAME_RIGHTS)
                    .expect("Failed to dup handle"),
            ),
            container_channel: Some(kernel_channel),
            remote_channel: Some(remote_channel),
            counter: Some(counter),
            name: Some(name),
            ..Default::default()
        })
        .expect("Failed to create proxy");

    (local_proxy, local_counter)
}

/// Creates a proxy between `remote_channel` and the returned `zx::Channel`.
///
/// The message counter's initial value will be set to 1, which will prevent the container from
/// suspending until the caller decrements the counter.
///
/// The returned counter will be incremented each time there is an incoming message on the proxied
/// channel. The starnix_kernel is expected to decrement the counter when that incoming message is
/// handled.
///
/// Note that "message" in this context means channel message. This can be either a FIDL event, or
/// a response to a FIDL message from the platform.
///
/// For example, the starnix_kernel may issue a hanging get to retrieve input events. When that
/// hanging get returns, the counter will be incremented by 1. When the next hanging get has been
/// scheduled, the input subsystem decrements the counter by 1.
///
/// The proxying is done by the Starnix runner, and allows messages on the channel to wake
/// the container.
pub fn create_proxy_for_wake_events_counter(
    remote_channel: zx::Channel,
    name: String,
) -> (zx::Channel, zx::Counter) {
    let (proxy, counter) = create_proxy_for_wake_events_counter_zero(remote_channel, name);

    // Increment the counter by one so that the initial incoming message to the container will
    // set the count to 0, instead of -1.
    counter.add(1).expect("Failed to add to counter");

    (proxy, counter)
}

/// Marks a message handled by decrementing `counter`.
///
/// This should be called when a proxied channel message has been handled, and the caller would
/// be ok letting the container suspend.
pub fn mark_proxy_message_handled(counter: &zx::Counter) {
    counter.add(-1).expect("Failed to decrement counter");
}

/// Marks all messages tracked by `counter` as handled.
pub fn mark_all_proxy_messages_handled(counter: &zx::Counter) {
    counter.write(0).expect("Failed to reset counter");
}

/// Registers `watcher` with the Starnix runner as a wake watcher.
///
/// Changes in the power state of the container are relayed over the event pair.
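///
/// A minimal sketch (the local endpoint name is illustrative):
///
/// ```ignore
/// let (local, remote) = zx::EventPair::create();
/// create_watcher_for_wake_events(remote);
/// // Observe container power-state signals on `local`.
/// ```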
pub fn create_watcher_for_wake_events(watcher: zx::EventPair) {
    let manager = fuchsia_component::client::connect_to_protocol_sync::<frunner::ManagerMarker>()
        .expect("Failed to connect to the Starnix runner Manager protocol");
    manager
        .register_wake_watcher(
            frunner::ManagerRegisterWakeWatcherRequest {
                watcher: Some(watcher),
                ..Default::default()
            },
            zx::Instant::INFINITE,
        )
        .expect("Failed to register wake watcher");
}

/// Wrapper around a `Weak` reference to an `OwnedMessageCounter` that can be passed around to
/// keep the container awake.
///
/// Each live `SharedMessageCounter` is responsible for one pending message while it is in scope,
/// and removes it from the counter when it goes out of scope. Processes that need to cooperate
/// can pass a `SharedMessageCounter` to each other to ensure that, once the work is done, the
/// lock goes out of scope as well. This allows for precise accounting of remaining work, and
/// makes container suspension something that is guarded by the compiler rather than by
/// convention.
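///
/// A minimal sketch (assumes an `OwnedMessageCounterHandle` named `owned`; `do_the_work` is a
/// hypothetical placeholder):
///
/// ```ignore
/// let shared = owned.share(true); // registers one pending message
/// do_the_work();                  // the work the pending message represents
/// drop(shared);                   // marks the message handled
/// ```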
#[derive(Debug)]
pub struct SharedMessageCounter(Weak<OwnedMessageCounter>);

impl Drop for SharedMessageCounter {
    fn drop(&mut self) {
        if let Some(message_counter) = self.0.upgrade() {
            message_counter.mark_handled();
        }
    }
}

/// Owns a `zx::Counter` to track pending messages that prevent the container from suspending.
///
/// This struct ensures that the counter is reset to 0 when the last strong reference is dropped,
/// effectively releasing any wake lock held by this counter.
pub struct OwnedMessageCounter {
    name: String,
    counter: Option<zx::Counter>,
}
pub type OwnedMessageCounterHandle = Arc<OwnedMessageCounter>;

impl Drop for OwnedMessageCounter {
    /// Resets the underlying `zx::Counter` to 0 when the `OwnedMessageCounter` is dropped.
    ///
    /// This ensures that all pending messages are marked as handled, allowing the system to suspend
    /// if no other wake locks are held.
    fn drop(&mut self) {
        self.counter.as_ref().map(mark_all_proxy_messages_handled);
    }
}

impl OwnedMessageCounter {
    pub fn new(name: &str, counter: Option<zx::Counter>) -> OwnedMessageCounterHandle {
        Arc::new(Self { name: name.to_string(), counter })
    }

    /// Decrements the counter, signaling that a pending message or operation has been handled.
    ///
    /// This should be called when the work associated with a previous `mark_pending` call is
    /// complete.
    pub fn mark_handled(&self) {
        self.counter.as_ref().map(mark_proxy_message_handled);
    }

    /// Increments the counter, signaling that a new message or operation is pending.
    ///
    /// This prevents the system from suspending until a corresponding `mark_handled` call is made.
    pub fn mark_pending(&self) {
        self.counter.as_ref().map(|c| c.add(1).expect("Failed to increment counter"));
    }

    /// Creates a `SharedMessageCounter` from this `OwnedMessageCounter`.
    ///
    /// `new_pending_message` - if a new pending message should be added
    pub fn share(
        self: &OwnedMessageCounterHandle,
        new_pending_message: bool,
    ) -> SharedMessageCounter {
        if new_pending_message {
            self.mark_pending();
        }
        SharedMessageCounter(Arc::downgrade(self))
    }
}

impl fmt::Display for OwnedMessageCounter {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "Counter({}): {:?}", self.name, self.counter.as_ref().map(|c| c.read()))
    }
}

/// A proxy wrapper that manages a `zx::Counter` to allow the container to suspend
/// after events have been processed.
///
/// When the proxy is dropped, the counter is reset to 0 to release the wake-lock.
pub struct ContainerWakingProxy<P: Proxy> {
    counter: OwnedMessageCounterHandle,
    proxy: P,
}

impl<P: Proxy> ContainerWakingProxy<P> {
    pub fn new(counter: OwnedMessageCounterHandle, proxy: P) -> Self {
        Self { counter, proxy }
    }

    /// Creates a `Future` by invoking `future` on the wrapped proxy.
    ///
    /// The counter is decremented (marking the message handled) after the future is created.
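    ///
    /// A minimal sketch (assumes an Echo-style FIDL proxy, as in the tests below):
    ///
    /// ```ignore
    /// let waking_proxy = ContainerWakingProxy::new(counter, proxy);
    /// let response = waking_proxy.call(|p| p.echo_string(Some("hello"))).await;
    /// ```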
    pub fn call<T, F, R>(&self, future: F) -> R
    where
        F: FnOnce(&P) -> R,
        R: Future<Output = T>,
    {
        // The sequence for handling events MUST be:
        //
        // 1. create the future
        // 2. decrement the counter
        // 3. await the future
        //
        // so that the container may suspend while awaiting and be woken by the response.
        let f = future(&self.proxy);
        self.counter.mark_handled();
        f
    }
}

/// A stream wrapper that manages a `zx::Counter` to allow the container to suspend
/// after events have been processed.
///
/// When the stream is dropped, the counter is reset to 0 to release the wake-lock.
pub struct ContainerWakingStream<S: FusedStream + Unpin> {
    counter: OwnedMessageCounterHandle,
    stream: S,
}

impl<S: FusedStream + Unpin> ContainerWakingStream<S> {
    pub fn new(counter: OwnedMessageCounterHandle, stream: S) -> Self {
        Self { counter, stream }
    }

    /// Creates a `Next` future for the wrapped stream.
    ///
    /// The counter is decremented (marking the message handled) after the future is created,
    /// unless the stream has already terminated.
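    ///
    /// A minimal sketch (assumes a FIDL request stream, as in the tests below):
    ///
    /// ```ignore
    /// let mut waking_stream = ContainerWakingStream::new(counter, request_stream);
    /// while let Some(request) = waking_stream.next().await {
    ///     // handle `request`
    /// }
    /// ```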
    pub fn next(&mut self) -> Next<'_, S> {
        // See `ContainerWakingProxy::call` for sequence of handling events.
        let is_terminated = self.stream.is_terminated();
        let next = self.stream.next();
        if !is_terminated {
            self.counter.mark_handled();
        }
        next
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use diagnostics_assertions::assert_data_tree;
    use fidl::endpoints::create_proxy_and_stream;
    use fidl_test_placeholders::{EchoMarker, EchoRequest};
    use futures::StreamExt;
    use zx::{self, HandleBased};
    use {fuchsia_async as fasync, fuchsia_inspect as inspect};

    #[::fuchsia::test]
    fn test_counter_zero_initialization() {
        let (_endpoint, endpoint) = zx::Channel::create();
        let (_channel, counter) =
            super::create_proxy_for_wake_events_counter_zero(endpoint, "test".into());
        assert_eq!(counter.read(), Ok(0));
    }

    #[::fuchsia::test]
    fn test_counter_initialization() {
        let (_endpoint, endpoint) = zx::Channel::create();
        let (_channel, counter) =
            super::create_proxy_for_wake_events_counter(endpoint, "test".into());
        assert_eq!(counter.read(), Ok(1));
    }

    #[::fuchsia::test]
    async fn test_container_waking_proxy() {
        let (proxy, mut stream) = create_proxy_and_stream::<EchoMarker>();
        let server_task = fasync::Task::spawn(async move {
            let request = stream.next().await.unwrap().unwrap();
            match request {
                EchoRequest::EchoString { value, responder } => {
                    responder.send(value.as_deref()).unwrap();
                }
            }
        });

        let counter = zx::Counter::create();
        counter.add(5).unwrap();
        assert_eq!(counter.read(), Ok(5));

        let waking_proxy = ContainerWakingProxy {
            counter: OwnedMessageCounter::new(
                "test_proxy",
                Some(counter.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap()),
            ),
            proxy,
        };

        let response_future = waking_proxy.call(|p| p.echo_string(Some("hello")));

        // The `call` method decrements the counter.
        assert_eq!(counter.read(), Ok(4));

        let response = response_future.await.unwrap();
        assert_eq!(response.as_deref(), Some("hello"));

        server_task.await;

        assert_eq!(counter.read(), Ok(4));
        drop(waking_proxy);
        assert_eq!(counter.read(), Ok(0));
    }

    #[::fuchsia::test]
    async fn test_container_waking_stream() {
        let (proxy, stream) = create_proxy_and_stream::<EchoMarker>();
        let client_task = fasync::Task::spawn(async move {
            let response = proxy.echo_string(Some("hello")).await.unwrap();
            assert_eq!(response.as_deref(), Some("hello"));
        });

        let counter = zx::Counter::create();
        counter.add(5).unwrap();
        assert_eq!(counter.read(), Ok(5));

        let mut waking_stream = ContainerWakingStream {
            counter: OwnedMessageCounter::new(
                "test_stream",
                Some(counter.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap()),
            ),
            stream,
        };

        let request_future = waking_stream.next();

        // The `next` method decrements the counter.
        assert_eq!(counter.read(), Ok(4));

        let request = request_future.await.unwrap().unwrap();
        match request {
            EchoRequest::EchoString { value, responder } => {
                assert_eq!(value.as_deref(), Some("hello"));
                responder.send(value.as_deref()).unwrap();
            }
        }

        client_task.await;

        assert_eq!(counter.read(), Ok(4));
        drop(waking_stream);
        assert_eq!(counter.read(), Ok(0));
    }

    #[::fuchsia::test]
    async fn test_message_counters_inspect() {
        let power_manager = SuspendResumeManager::default();
        let inspector = inspect::component::inspector();

        let zx_counter = zx::Counter::create();
        let counter_handle = power_manager.add_message_counter(
            "test_counter",
            Some(zx_counter.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap()),
        );

        zx_counter.add(1).unwrap();

        assert_data_tree!(inspector, root: contains {
            message_counters: vec!["Counter(test_counter): Some(Ok(1))"],
        });

        zx_counter.add(1).unwrap();
        assert_data_tree!(inspector, root: contains {
            message_counters: vec!["Counter(test_counter): Some(Ok(2))"],
        });

        drop(counter_handle);
        assert_data_tree!(inspector, root: contains {
            message_counters: Vec::<String>::new(),
        });
    }

    #[::fuchsia::test]
    fn test_shared_message_counter() {
        // Create an owned counter and set its value.
        let zx_counter = zx::Counter::create();
        let owned_counter = OwnedMessageCounter::new(
            "test_shared_counter",
            Some(zx_counter.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap()),
        );
        zx_counter.add(5).unwrap();
        assert_eq!(zx_counter.read(), Ok(5));

        // Create a shared counter with no new message. The value should be unchanged.
        let shared_counter = owned_counter.share(false);
        assert_eq!(zx_counter.read(), Ok(5));

        // Drop the shared counter. The value should be decremented.
        drop(shared_counter);
        assert_eq!(zx_counter.read(), Ok(4));

        // Create a shared counter with a new message. The value should be incremented.
        let shared_counter_2 = owned_counter.share(true);
        assert_eq!(zx_counter.read(), Ok(5));

        // Drop the shared counter. The value should be decremented.
        drop(shared_counter_2);
        assert_eq!(zx_counter.read(), Ok(4));

        // Create another shared counter.
        let shared_counter_3 = owned_counter.share(false);
        assert_eq!(zx_counter.read(), Ok(4));

        // Drop the owned counter. The value should be reset to 0.
        drop(owned_counter);
        assert_eq!(zx_counter.read(), Ok(0));

        // Drop the shared counter. The value should remain 0, and it shouldn't panic.
        drop(shared_counter_3);
        assert_eq!(zx_counter.read(), Ok(0));
    }

    #[::fuchsia::test]
    async fn test_container_waking_event_termination() {
        let stream = futures::stream::iter(vec![0]).fuse();
        let counter = zx::Counter::create();
        counter.add(2).unwrap();
        assert_eq!(counter.read(), Ok(2));
        let mut waking_stream = ContainerWakingStream {
            counter: OwnedMessageCounter::new(
                "test_stream",
                Some(counter.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap()),
            ),
            stream,
        };

        assert_eq!(waking_stream.next().await, Some(0));
        assert_eq!(counter.read(), Ok(1));

        assert_eq!(waking_stream.next().await, None);
        assert_eq!(waking_stream.next().await, None);
        // The stream is already terminated, so the counter should remain 0.
        assert_eq!(counter.read(), Ok(0));
    }
}