kernel_manager/
proxy.rs

1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::suspend::WakeSources;
6use anyhow::{Error, anyhow};
7use fuchsia_async as fasync;
8use fuchsia_sync::Mutex;
9use futures::FutureExt;
10use log::warn;
11use std::cell::RefCell;
12use std::mem::MaybeUninit;
13use std::rc::Rc;
14use std::sync::Arc;
15
16/// `ChannelProxy` is used to proxy messages on a `zx::Channel` between the Starnix
17/// container and the outside world. This allows the Starnix runner to wake the container
18/// on incoming messages.
19///
20/// [platform component] <-- remote_channel --> [Starnix runner] <-- container_channel --> [Starnix container]
21pub struct ChannelProxy {
22    /// The channel that is connected to the container component.
23    pub container_channel: zx::Channel,
24
25    /// The channel that is connected to a peer outside of the container component.
26    pub remote_channel: zx::Channel,
27
28    /// The number of unhandled messages on this proxy. If non-zero, the container is still
29    /// processing one of the incoming messages and the container should not be suspended.
30    pub message_counter: zx::Counter,
31
32    /// Human readable name for the thing that is being proxied.
33    pub name: String,
34}
35
36// `WaitReturn` is used to indicate which proxy endpoint caused the wait to complete.
37#[derive(Debug)]
38enum WaitReturn {
39    Container,
40    Remote,
41}
42
43/// The Zircon role name that is applied to proxy threads.
44const PROXY_ROLE_NAME: &str = "fuchsia.starnix.runner.proxy";
45
46/// Starts a thread that listens for new proxies and runs `start_proxy` on each.
47pub fn run_proxy_thread(
48    new_proxies: async_channel::Receiver<(ChannelProxy, Arc<Mutex<WakeSources>>)>,
49) {
50    let _ = std::thread::Builder::new().name("proxy_thread".to_string()).spawn(move || {
51        if let Err(e) = fuchsia_scheduler::set_role_for_this_thread(PROXY_ROLE_NAME) {
52            warn!(e:%; "failed to set thread role");
53        }
54        let mut executor = fasync::LocalExecutor::default();
55        executor.run_singlethreaded(async move {
56            let mut tasks = fasync::TaskGroup::new();
57            let bounce_bytes = Rc::new(RefCell::new(
58                [MaybeUninit::uninit(); zx::sys::ZX_CHANNEL_MAX_MSG_BYTES as usize],
59            ));
60            let bounce_handles = Rc::new(RefCell::new(
61                [const { MaybeUninit::uninit() }; zx::sys::ZX_CHANNEL_MAX_MSG_HANDLES as usize],
62            ));
63            while let Ok((proxy, events)) = new_proxies.recv().await {
64                let bytes_clone = bounce_bytes.clone();
65                let handles_clone = bounce_handles.clone();
66                tasks.local(start_proxy(proxy, events, bytes_clone, handles_clone));
67            }
68        });
69    });
70}
71
72/// Starts a task that proxies messages between `proxy.container_channel` and
73/// `proxy.remote_channel`. The task will exit when either of the channels' peer is closed, or
74/// if `proxy.resume_event`'s peer is closed.
75///
76/// When the task exits, `proxy.resume_event` will be removed from `wake_sources`.
77async fn start_proxy(
78    proxy: ChannelProxy,
79    wake_sources: Arc<Mutex<WakeSources>>,
80    bounce_bytes: Rc<RefCell<[MaybeUninit<u8>; zx::sys::ZX_CHANNEL_MAX_MSG_BYTES as usize]>>,
81    bounce_handles: Rc<
82        RefCell<[MaybeUninit<zx::NullableHandle>; zx::sys::ZX_CHANNEL_MAX_MSG_HANDLES as usize]>,
83    >,
84) {
85    let proxy_name = proxy.name.as_str();
86    trace_instant("starnix_runner:start_proxy:loop:enter", proxy_name);
87
88    'outer: loop {
89        // Wait on messages from both the container and remote channel endpoints.
90        let mut container_wait = fasync::OnSignals::new(
91            proxy.container_channel.as_handle_ref(),
92            zx::Signals::CHANNEL_READABLE | zx::Signals::CHANNEL_PEER_CLOSED,
93        )
94        .fuse();
95        let mut remote_wait = fasync::OnSignals::new(
96            proxy.remote_channel.as_handle_ref(),
97            zx::Signals::CHANNEL_READABLE | zx::Signals::CHANNEL_PEER_CLOSED,
98        )
99        .fuse();
100
101        let (signals, finished_wait) = {
102            trace_duration("starnix_runner:start_proxy:wait_for_messages", proxy_name);
103            let result = futures::select! {
104                res = container_wait => {
105                    trace_instant("starnix_runner:start_proxy:container_readable", proxy_name);
106                    res.map(|s| (s, WaitReturn::Container))
107                },
108                res = remote_wait => {
109                    trace_instant("starnix_runner:start_proxy:remote_readable", proxy_name);
110                    res.map(|s| (s, WaitReturn::Remote))
111                },
112            };
113
114            match result {
115                Ok(result) => result,
116                Err(e) => {
117                    trace_instant("starnix_runner:start_proxy:result:error", proxy_name);
118                    log::warn!("Failed to wait on proxied channels in runner: {:?}", e);
119                    break 'outer;
120                }
121            }
122        };
123
124        // Forward messages in both directions. Only messages that are entering the container
125        // should signal `proxy.resume_event`, since those are the only messages that should
126        // wake the container if it's suspended.
127        let name = proxy.name.as_str();
128        let result = match finished_wait {
129            WaitReturn::Container => forward_message(
130                &signals,
131                &proxy.container_channel,
132                &proxy.remote_channel,
133                None,
134                &mut bounce_bytes.borrow_mut(),
135                &mut bounce_handles.borrow_mut(),
136                name,
137            ),
138            WaitReturn::Remote => forward_message(
139                &signals,
140                &proxy.remote_channel,
141                &proxy.container_channel,
142                Some(&proxy.message_counter),
143                &mut bounce_bytes.borrow_mut(),
144                &mut bounce_handles.borrow_mut(),
145                name,
146            ),
147        };
148
149        if result.is_err() {
150            log::warn!(
151                "Proxy failed to forward message {} kernel: {}; {:?}",
152                match finished_wait {
153                    WaitReturn::Container => "from",
154                    WaitReturn::Remote => "to",
155                },
156                name,
157                result,
158            );
159            break 'outer;
160        }
161    }
162
163    trace_instant("starnix_runner:start_proxy:loop:exit", proxy_name);
164    if let Ok(koid) = proxy.message_counter.koid() {
165        wake_sources.lock().remove(&koid);
166    }
167}
168
169/// Forwards any pending messages on `read_channel` to `write_channel`, if the `wait_item.pending`
170/// contains `CHANNEL_READABLE`.
171///
172/// If `message_counter` is `Some`, it will be incremented by one when writing the message to the
173/// write_channel.
174
175fn forward_message(
176    signals: &zx::Signals,
177    read_channel: &zx::Channel,
178    write_channel: &zx::Channel,
179    message_counter: Option<&zx::Counter>,
180    bytes: &mut [MaybeUninit<u8>; zx::sys::ZX_CHANNEL_MAX_MSG_BYTES as usize],
181    handles: &mut [MaybeUninit<zx::NullableHandle>; zx::sys::ZX_CHANNEL_MAX_MSG_HANDLES as usize],
182    name: &str,
183) -> Result<(), Error> {
184    trace_duration("starnix_runner:forward_message", name);
185
186    if signals.contains(zx::Signals::CHANNEL_READABLE) {
187        let (actual_bytes, actual_handles) = {
188            match read_channel.read_uninit(bytes, handles) {
189                zx::ChannelReadResult::Ok(r) => r,
190                _ => return Err(anyhow!("Failed to read from channel")),
191            }
192        };
193
194        if let Some(counter) = message_counter {
195            counter.add(1).expect("Failed to add to the proxy's message counter");
196            trace_instant("starnix_runner:forward_message:counter_incremented", name);
197        }
198
199        write_channel.write(actual_bytes, actual_handles)?;
200    }
201
202    // It is important to check for peer closed after readable, in order to flush any
203    // remaining messages in the proxied channel.
204    if signals.contains(zx::Signals::CHANNEL_PEER_CLOSED) {
205        Err(anyhow!("Proxy peer was closed"))
206    } else {
207        Ok(())
208    }
209}
210
211fn trace_duration(event: &'static str, name: &str) {
212    fuchsia_trace::duration!("power", event, "name" => name);
213}
214
215fn trace_instant(event: &'static str, name: &str) {
216    fuchsia_trace::instant!(
217        "power",
218        event,
219        fuchsia_trace::Scope::Process,
220        "name" => name
221    );
222}
223
224#[cfg(test)]
225mod test {
226    use super::{ChannelProxy, fasync, start_proxy};
227    use fidl::HandleBased;
228    use std::cell::RefCell;
229    use std::mem::MaybeUninit;
230    use std::rc::Rc;
231
232    fn run_proxy_for_test(proxy: ChannelProxy) -> fasync::Task<()> {
233        let bounce_bytes = Rc::new(RefCell::new(
234            [MaybeUninit::uninit(); zx::sys::ZX_CHANNEL_MAX_MSG_BYTES as usize],
235        ));
236        let bounce_handles = Rc::new(RefCell::new(
237            [const { MaybeUninit::uninit() }; zx::sys::ZX_CHANNEL_MAX_MSG_HANDLES as usize],
238        ));
239        fasync::Task::local(start_proxy(proxy, Default::default(), bounce_bytes, bounce_handles))
240    }
241
242    #[fuchsia::test]
243    async fn test_peer_closed_kernel() {
244        let (local_client, local_server) = zx::Channel::create();
245        let (remote_client, remote_server) = zx::Channel::create();
246        let message_counter = zx::Counter::create();
247
248        let channel_proxy = ChannelProxy {
249            container_channel: local_server,
250            remote_channel: remote_client,
251            message_counter,
252            name: "test".to_string(),
253        };
254        let _task = run_proxy_for_test(channel_proxy);
255
256        std::mem::drop(local_client);
257
258        fasync::OnSignals::new(remote_server, zx::Signals::CHANNEL_PEER_CLOSED).await.unwrap();
259    }
260
261    #[fuchsia::test]
262    async fn test_peer_closed_remote() {
263        let (local_client, local_server) = zx::Channel::create();
264        let (remote_client, remote_server) = zx::Channel::create();
265        let message_counter = zx::Counter::create();
266
267        let channel_proxy = ChannelProxy {
268            container_channel: local_server,
269            remote_channel: remote_client,
270            message_counter,
271            name: "test".to_string(),
272        };
273        let _task = run_proxy_for_test(channel_proxy);
274
275        std::mem::drop(remote_server);
276
277        fasync::OnSignals::new(local_client, zx::Signals::CHANNEL_PEER_CLOSED).await.unwrap();
278    }
279
280    #[fuchsia::test]
281    async fn test_counter_sequential() {
282        let (_local_client, local_server) = zx::Channel::create();
283        let (remote_client, remote_server) = zx::Channel::create();
284        let message_counter = zx::Counter::create();
285        let local_message_counter = message_counter
286            .duplicate_handle(zx::Rights::SAME_RIGHTS)
287            .expect("Failed to duplicate counter");
288
289        let channel_proxy = ChannelProxy {
290            container_channel: local_server,
291            remote_channel: remote_client,
292            message_counter,
293            name: "test".to_string(),
294        };
295        let _task = run_proxy_for_test(channel_proxy);
296
297        // Send a message and make sure counter is incremented.
298        fasync::OnSignals::new(&local_message_counter, zx::Signals::COUNTER_NON_POSITIVE)
299            .await
300            .unwrap();
301        assert!(remote_server.write(&[0x0, 0x1, 0x2], &mut []).is_ok());
302        fasync::OnSignals::new(&local_message_counter, zx::Signals::COUNTER_POSITIVE)
303            .await
304            .unwrap();
305
306        // Decrement the counter, simulating a read, and make sure it goes back down to zero.
307        local_message_counter.add(-1).expect("Failed add");
308        fasync::OnSignals::new(&local_message_counter, zx::Signals::COUNTER_NON_POSITIVE)
309            .await
310            .unwrap();
311        assert!(remote_server.write(&[0x0, 0x1, 0x2], &mut []).is_ok());
312        fasync::OnSignals::new(&local_message_counter, zx::Signals::COUNTER_POSITIVE)
313            .await
314            .unwrap();
315    }
316
317    #[fuchsia::test]
318    async fn test_counter_multiple() {
319        let (_local_client, local_server) = zx::Channel::create();
320        let (remote_client, remote_server) = zx::Channel::create();
321        let message_counter = zx::Counter::create();
322        let local_message_counter = message_counter
323            .duplicate_handle(zx::Rights::SAME_RIGHTS)
324            .expect("Failed to duplicate counter");
325
326        let channel_proxy = ChannelProxy {
327            container_channel: local_server,
328            remote_channel: remote_client,
329            message_counter,
330            name: "test".to_string(),
331        };
332        let _task = run_proxy_for_test(channel_proxy);
333
334        assert!(remote_server.write(&[0x0, 0x1, 0x2], &mut []).is_ok());
335        assert!(remote_server.write(&[0x0, 0x1, 0x2], &mut []).is_ok());
336        assert!(remote_server.write(&[0x0, 0x1, 0x2], &mut []).is_ok());
337        fasync::OnSignals::new(&local_message_counter, zx::Signals::COUNTER_POSITIVE)
338            .await
339            .unwrap();
340        assert_eq!(local_message_counter.read().expect("Failed to read counter"), 3);
341    }
342}