Skip to main content

kernel_manager/
proxy.rs

1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::suspend::WakeSources;
6use anyhow::{Error, anyhow};
7use fuchsia_async as fasync;
8use futures::FutureExt;
9use log::warn;
10use starnix_sync::{LockDepMutex, TerminalLock};
11use std::cell::RefCell;
12use std::mem::MaybeUninit;
13use std::pin::pin;
14use std::rc::Rc;
15use std::sync::Arc;
16
17/// `ChannelProxy` is used to proxy messages on a `zx::Channel` between the Starnix
18/// container and the outside world. This allows the Starnix runner to wake the container
19/// on incoming messages.
20///
21/// [platform component] <-- remote_channel --> [Starnix runner] <-- container_channel --> [Starnix container]
22pub struct ChannelProxy {
23    /// The channel that is connected to the container component.
24    pub container_channel: zx::Channel,
25
26    /// The channel that is connected to a peer outside of the container component.
27    pub remote_channel: zx::Channel,
28
29    /// The number of unhandled messages on this proxy. If non-zero, the container is still
30    /// processing one of the incoming messages and the container should not be suspended.
31    pub message_counter: zx::Counter,
32
33    /// Human readable name for the thing that is being proxied.
34    pub name: String,
35}
36
37// `WaitReturn` is used to indicate which proxy endpoint caused the wait to complete.
38#[derive(Debug)]
39enum WaitReturn {
40    Container,
41    Remote,
42}
43
44/// The Zircon role name that is applied to proxy threads.
45const PROXY_ROLE_NAME: &str = "fuchsia.starnix.runner.proxy";
46
47/// Starts a thread that listens for new proxies and runs `start_proxy` on each.
48pub fn run_proxy_thread(
49    new_proxies: async_channel::Receiver<(
50        ChannelProxy,
51        Arc<LockDepMutex<WakeSources, TerminalLock>>,
52    )>,
53) {
54    let _ = std::thread::Builder::new().name("proxy_thread".to_string()).spawn(move || {
55        if let Err(e) = fuchsia_scheduler::set_role_for_this_thread(PROXY_ROLE_NAME) {
56            warn!(e:%; "failed to set thread role");
57        }
58        let mut executor = fasync::LocalExecutor::default();
59        executor.run_singlethreaded(async move {
60            let tasks = fasync::Scope::new();
61            let bounce_bytes = Rc::new(RefCell::new(
62                [MaybeUninit::uninit(); zx::sys::ZX_CHANNEL_MAX_MSG_BYTES as usize],
63            ));
64            let bounce_handles = Rc::new(RefCell::new(
65                [const { MaybeUninit::uninit() }; zx::sys::ZX_CHANNEL_MAX_MSG_HANDLES as usize],
66            ));
67            while let Ok((proxy, events)) = new_proxies.recv().await {
68                let bytes_clone = bounce_bytes.clone();
69                let handles_clone = bounce_handles.clone();
70                tasks.spawn_local(start_proxy(proxy, events, bytes_clone, handles_clone));
71            }
72        });
73    });
74}
75
76/// Starts a task that proxies messages between `proxy.container_channel` and
77/// `proxy.remote_channel`. The task will exit when either of the channels' peer is closed, or
78/// if `proxy.resume_event`'s peer is closed.
79///
80/// When the task exits, `proxy.resume_event` will be removed from `wake_sources`.
81async fn start_proxy(
82    proxy: ChannelProxy,
83    wake_sources: Arc<LockDepMutex<WakeSources, TerminalLock>>,
84    bounce_bytes: Rc<RefCell<[MaybeUninit<u8>; zx::sys::ZX_CHANNEL_MAX_MSG_BYTES as usize]>>,
85    bounce_handles: Rc<
86        RefCell<[MaybeUninit<zx::NullableHandle>; zx::sys::ZX_CHANNEL_MAX_MSG_HANDLES as usize]>,
87    >,
88) {
89    let proxy_name = proxy.name.as_str();
90    trace_instant("starnix_runner:start_proxy:loop:enter", proxy_name);
91
92    'outer: loop {
93        // Wait on messages from both the container and remote channel endpoints.
94        let mut container_wait = pin!(
95            fasync::OnSignals::new(
96                proxy.container_channel.as_handle_ref(),
97                zx::Signals::CHANNEL_READABLE | zx::Signals::CHANNEL_PEER_CLOSED,
98            )
99            .fuse()
100        );
101        let mut remote_wait = pin!(
102            fasync::OnSignals::new(
103                proxy.remote_channel.as_handle_ref(),
104                zx::Signals::CHANNEL_READABLE | zx::Signals::CHANNEL_PEER_CLOSED,
105            )
106            .fuse()
107        );
108
109        let (signals, finished_wait) = {
110            trace_duration("starnix_runner:start_proxy:wait_for_messages", proxy_name);
111            let result = futures::select! {
112                res = container_wait => {
113                    trace_instant("starnix_runner:start_proxy:container_readable", proxy_name);
114                    res.map(|s| (s, WaitReturn::Container))
115                },
116                res = remote_wait => {
117                    trace_instant("starnix_runner:start_proxy:remote_readable", proxy_name);
118                    res.map(|s| (s, WaitReturn::Remote))
119                },
120            };
121
122            match result {
123                Ok(result) => result,
124                Err(e) => {
125                    trace_instant("starnix_runner:start_proxy:result:error", proxy_name);
126                    log::warn!("Failed to wait on proxied channels in runner: {:?}", e);
127                    break 'outer;
128                }
129            }
130        };
131
132        // Forward messages in both directions. Only messages that are entering the container
133        // should signal `proxy.resume_event`, since those are the only messages that should
134        // wake the container if it's suspended.
135        let name = proxy.name.as_str();
136        let result = match finished_wait {
137            WaitReturn::Container => forward_message(
138                &signals,
139                &proxy.container_channel,
140                &proxy.remote_channel,
141                None,
142                &mut bounce_bytes.borrow_mut(),
143                &mut bounce_handles.borrow_mut(),
144                name,
145            ),
146            WaitReturn::Remote => forward_message(
147                &signals,
148                &proxy.remote_channel,
149                &proxy.container_channel,
150                Some(&proxy.message_counter),
151                &mut bounce_bytes.borrow_mut(),
152                &mut bounce_handles.borrow_mut(),
153                name,
154            ),
155        };
156
157        if result.is_err() {
158            log::warn!(
159                "Proxy failed to forward message {} kernel: {}; {:?}",
160                match finished_wait {
161                    WaitReturn::Container => "from",
162                    WaitReturn::Remote => "to",
163                },
164                name,
165                result,
166            );
167            break 'outer;
168        }
169    }
170
171    trace_instant("starnix_runner:start_proxy:loop:exit", proxy_name);
172    if let Ok(koid) = proxy.message_counter.koid() {
173        wake_sources.lock().remove(&koid);
174    }
175}
176
177/// Forwards any pending messages on `read_channel` to `write_channel`, if the `wait_item.pending`
178/// contains `CHANNEL_READABLE`.
179///
180/// If `message_counter` is `Some`, it will be incremented by one when writing the message to the
181/// write_channel.
182
183fn forward_message(
184    signals: &zx::Signals,
185    read_channel: &zx::Channel,
186    write_channel: &zx::Channel,
187    message_counter: Option<&zx::Counter>,
188    bytes: &mut [MaybeUninit<u8>; zx::sys::ZX_CHANNEL_MAX_MSG_BYTES as usize],
189    handles: &mut [MaybeUninit<zx::NullableHandle>; zx::sys::ZX_CHANNEL_MAX_MSG_HANDLES as usize],
190    name: &str,
191) -> Result<(), Error> {
192    trace_duration("starnix_runner:forward_message", name);
193
194    if signals.contains(zx::Signals::CHANNEL_READABLE) {
195        let (actual_bytes, actual_handles) = {
196            match read_channel.read_uninit(bytes, handles) {
197                zx::ChannelReadResult::Ok(r) => r,
198                _ => return Err(anyhow!("Failed to read from channel")),
199            }
200        };
201
202        if let Some(counter) = message_counter {
203            counter.add(1).expect("Failed to add to the proxy's message counter");
204            trace_instant("starnix_runner:forward_message:counter_incremented", name);
205        }
206
207        write_channel.write(actual_bytes, actual_handles)?;
208    }
209
210    // It is important to check for peer closed after readable, in order to flush any
211    // remaining messages in the proxied channel.
212    if signals.contains(zx::Signals::CHANNEL_PEER_CLOSED) {
213        Err(anyhow!("Proxy peer was closed"))
214    } else {
215        Ok(())
216    }
217}
218
219fn trace_duration(event: &'static str, name: &str) {
220    fuchsia_trace::duration!("power", event, "name" => name);
221}
222
223fn trace_instant(event: &'static str, name: &str) {
224    fuchsia_trace::instant!(
225        "power",
226        event,
227        fuchsia_trace::Scope::Process,
228        "name" => name
229    );
230}
231
232#[cfg(test)]
233mod test {
234    use super::{ChannelProxy, fasync, start_proxy};
235    use std::cell::RefCell;
236    use std::mem::MaybeUninit;
237    use std::rc::Rc;
238
239    fn run_proxy_for_test(proxy: ChannelProxy) -> fasync::Task<()> {
240        let bounce_bytes = Rc::new(RefCell::new(
241            [MaybeUninit::uninit(); zx::sys::ZX_CHANNEL_MAX_MSG_BYTES as usize],
242        ));
243        let bounce_handles = Rc::new(RefCell::new(
244            [const { MaybeUninit::uninit() }; zx::sys::ZX_CHANNEL_MAX_MSG_HANDLES as usize],
245        ));
246        fasync::Task::local(start_proxy(proxy, Default::default(), bounce_bytes, bounce_handles))
247    }
248
249    #[fuchsia::test]
250    async fn test_peer_closed_kernel() {
251        let (local_client, local_server) = zx::Channel::create();
252        let (remote_client, remote_server) = zx::Channel::create();
253        let message_counter = zx::Counter::create();
254
255        let channel_proxy = ChannelProxy {
256            container_channel: local_server,
257            remote_channel: remote_client,
258            message_counter,
259            name: "test".to_string(),
260        };
261        let _task = run_proxy_for_test(channel_proxy);
262
263        std::mem::drop(local_client);
264
265        fasync::OnSignals::new(remote_server, zx::Signals::CHANNEL_PEER_CLOSED).await.unwrap();
266    }
267
268    #[fuchsia::test]
269    async fn test_peer_closed_remote() {
270        let (local_client, local_server) = zx::Channel::create();
271        let (remote_client, remote_server) = zx::Channel::create();
272        let message_counter = zx::Counter::create();
273
274        let channel_proxy = ChannelProxy {
275            container_channel: local_server,
276            remote_channel: remote_client,
277            message_counter,
278            name: "test".to_string(),
279        };
280        let _task = run_proxy_for_test(channel_proxy);
281
282        std::mem::drop(remote_server);
283
284        fasync::OnSignals::new(local_client, zx::Signals::CHANNEL_PEER_CLOSED).await.unwrap();
285    }
286
287    #[fuchsia::test]
288    async fn test_counter_sequential() {
289        let (_local_client, local_server) = zx::Channel::create();
290        let (remote_client, remote_server) = zx::Channel::create();
291        let message_counter = zx::Counter::create();
292        let local_message_counter = message_counter
293            .duplicate_handle(zx::Rights::SAME_RIGHTS)
294            .expect("Failed to duplicate counter");
295
296        let channel_proxy = ChannelProxy {
297            container_channel: local_server,
298            remote_channel: remote_client,
299            message_counter,
300            name: "test".to_string(),
301        };
302        let _task = run_proxy_for_test(channel_proxy);
303
304        // Send a message and make sure counter is incremented.
305        fasync::OnSignals::new(&local_message_counter, zx::Signals::COUNTER_NON_POSITIVE)
306            .await
307            .unwrap();
308        assert!(remote_server.write(&[0x0, 0x1, 0x2], &mut []).is_ok());
309        fasync::OnSignals::new(&local_message_counter, zx::Signals::COUNTER_POSITIVE)
310            .await
311            .unwrap();
312
313        // Decrement the counter, simulating a read, and make sure it goes back down to zero.
314        local_message_counter.add(-1).expect("Failed add");
315        fasync::OnSignals::new(&local_message_counter, zx::Signals::COUNTER_NON_POSITIVE)
316            .await
317            .unwrap();
318        assert!(remote_server.write(&[0x0, 0x1, 0x2], &mut []).is_ok());
319        fasync::OnSignals::new(&local_message_counter, zx::Signals::COUNTER_POSITIVE)
320            .await
321            .unwrap();
322    }
323
324    #[fuchsia::test]
325    async fn test_counter_multiple() {
326        let (_local_client, local_server) = zx::Channel::create();
327        let (remote_client, remote_server) = zx::Channel::create();
328        let message_counter = zx::Counter::create();
329        let local_message_counter = message_counter
330            .duplicate_handle(zx::Rights::SAME_RIGHTS)
331            .expect("Failed to duplicate counter");
332
333        let channel_proxy = ChannelProxy {
334            container_channel: local_server,
335            remote_channel: remote_client,
336            message_counter,
337            name: "test".to_string(),
338        };
339        let _task = run_proxy_for_test(channel_proxy);
340
341        assert!(remote_server.write(&[0x0, 0x1, 0x2], &mut []).is_ok());
342        assert!(remote_server.write(&[0x0, 0x1, 0x2], &mut []).is_ok());
343        assert!(remote_server.write(&[0x0, 0x1, 0x2], &mut []).is_ok());
344        fasync::OnSignals::new(&local_message_counter, zx::Signals::COUNTER_POSITIVE)
345            .await
346            .unwrap();
347        assert_eq!(local_message_counter.read().expect("Failed to read counter"), 3);
348    }
349}