1use crate::suspend::WakeSources;
6use anyhow::{Error, anyhow};
7use fuchsia_async as fasync;
8use futures::FutureExt;
9use log::warn;
10use starnix_sync::{LockDepMutex, TerminalLock};
11use std::cell::RefCell;
12use std::mem::MaybeUninit;
13use std::pin::pin;
14use std::rc::Rc;
15use std::sync::Arc;
16
/// A pair of channels bridged by the proxy, together with the bookkeeping that travels with them.
///
/// `start_proxy` forwards messages read from `container_channel` to `remote_channel` and
/// messages read from `remote_channel` to `container_channel`.
pub struct ChannelProxy {
    /// One end of the proxied connection. Messages read from this channel are written to
    /// `remote_channel`; `message_counter` is not touched for this direction.
    pub container_channel: zx::Channel,

    /// The other end of the proxied connection. Messages read from this channel are written to
    /// `container_channel`, after `message_counter` has been incremented.
    pub remote_channel: zx::Channel,

    /// Counter incremented by one for each message read from `remote_channel`, before that
    /// message is written to `container_channel`. Its koid is the key removed from the wake
    /// sources when the proxy loop exits.
    pub message_counter: zx::Counter,

    /// Name attached to this proxy's trace events and warning logs.
    pub name: String,
}
36
/// Identifies which of the two proxied channels completed its signal wait first.
#[derive(Debug)]
enum WaitReturn {
    /// The wait on `ChannelProxy::container_channel` completed.
    Container,
    /// The wait on `ChannelProxy::remote_channel` completed.
    Remote,
}
43
/// Role name passed to `fuchsia_scheduler::set_role_for_this_thread` by the proxy thread.
const PROXY_ROLE_NAME: &str = "fuchsia.starnix.runner.proxy";
46
47pub fn run_proxy_thread(
49 new_proxies: async_channel::Receiver<(
50 ChannelProxy,
51 Arc<LockDepMutex<WakeSources, TerminalLock>>,
52 )>,
53) {
54 let _ = std::thread::Builder::new().name("proxy_thread".to_string()).spawn(move || {
55 if let Err(e) = fuchsia_scheduler::set_role_for_this_thread(PROXY_ROLE_NAME) {
56 warn!(e:%; "failed to set thread role");
57 }
58 let mut executor = fasync::LocalExecutor::default();
59 executor.run_singlethreaded(async move {
60 let tasks = fasync::Scope::new();
61 let bounce_bytes = Rc::new(RefCell::new(
62 [MaybeUninit::uninit(); zx::sys::ZX_CHANNEL_MAX_MSG_BYTES as usize],
63 ));
64 let bounce_handles = Rc::new(RefCell::new(
65 [const { MaybeUninit::uninit() }; zx::sys::ZX_CHANNEL_MAX_MSG_HANDLES as usize],
66 ));
67 while let Ok((proxy, events)) = new_proxies.recv().await {
68 let bytes_clone = bounce_bytes.clone();
69 let handles_clone = bounce_handles.clone();
70 tasks.spawn_local(start_proxy(proxy, events, bytes_clone, handles_clone));
71 }
72 });
73 });
74}
75
76async fn start_proxy(
82 proxy: ChannelProxy,
83 wake_sources: Arc<LockDepMutex<WakeSources, TerminalLock>>,
84 bounce_bytes: Rc<RefCell<[MaybeUninit<u8>; zx::sys::ZX_CHANNEL_MAX_MSG_BYTES as usize]>>,
85 bounce_handles: Rc<
86 RefCell<[MaybeUninit<zx::NullableHandle>; zx::sys::ZX_CHANNEL_MAX_MSG_HANDLES as usize]>,
87 >,
88) {
89 let proxy_name = proxy.name.as_str();
90 trace_instant("starnix_runner:start_proxy:loop:enter", proxy_name);
91
92 'outer: loop {
93 let mut container_wait = pin!(
95 fasync::OnSignals::new(
96 proxy.container_channel.as_handle_ref(),
97 zx::Signals::CHANNEL_READABLE | zx::Signals::CHANNEL_PEER_CLOSED,
98 )
99 .fuse()
100 );
101 let mut remote_wait = pin!(
102 fasync::OnSignals::new(
103 proxy.remote_channel.as_handle_ref(),
104 zx::Signals::CHANNEL_READABLE | zx::Signals::CHANNEL_PEER_CLOSED,
105 )
106 .fuse()
107 );
108
109 let (signals, finished_wait) = {
110 trace_duration("starnix_runner:start_proxy:wait_for_messages", proxy_name);
111 let result = futures::select! {
112 res = container_wait => {
113 trace_instant("starnix_runner:start_proxy:container_readable", proxy_name);
114 res.map(|s| (s, WaitReturn::Container))
115 },
116 res = remote_wait => {
117 trace_instant("starnix_runner:start_proxy:remote_readable", proxy_name);
118 res.map(|s| (s, WaitReturn::Remote))
119 },
120 };
121
122 match result {
123 Ok(result) => result,
124 Err(e) => {
125 trace_instant("starnix_runner:start_proxy:result:error", proxy_name);
126 log::warn!("Failed to wait on proxied channels in runner: {:?}", e);
127 break 'outer;
128 }
129 }
130 };
131
132 let name = proxy.name.as_str();
136 let result = match finished_wait {
137 WaitReturn::Container => forward_message(
138 &signals,
139 &proxy.container_channel,
140 &proxy.remote_channel,
141 None,
142 &mut bounce_bytes.borrow_mut(),
143 &mut bounce_handles.borrow_mut(),
144 name,
145 ),
146 WaitReturn::Remote => forward_message(
147 &signals,
148 &proxy.remote_channel,
149 &proxy.container_channel,
150 Some(&proxy.message_counter),
151 &mut bounce_bytes.borrow_mut(),
152 &mut bounce_handles.borrow_mut(),
153 name,
154 ),
155 };
156
157 if result.is_err() {
158 log::warn!(
159 "Proxy failed to forward message {} kernel: {}; {:?}",
160 match finished_wait {
161 WaitReturn::Container => "from",
162 WaitReturn::Remote => "to",
163 },
164 name,
165 result,
166 );
167 break 'outer;
168 }
169 }
170
171 trace_instant("starnix_runner:start_proxy:loop:exit", proxy_name);
172 if let Ok(koid) = proxy.message_counter.koid() {
173 wake_sources.lock().remove(&koid);
174 }
175}
176
177fn forward_message(
184 signals: &zx::Signals,
185 read_channel: &zx::Channel,
186 write_channel: &zx::Channel,
187 message_counter: Option<&zx::Counter>,
188 bytes: &mut [MaybeUninit<u8>; zx::sys::ZX_CHANNEL_MAX_MSG_BYTES as usize],
189 handles: &mut [MaybeUninit<zx::NullableHandle>; zx::sys::ZX_CHANNEL_MAX_MSG_HANDLES as usize],
190 name: &str,
191) -> Result<(), Error> {
192 trace_duration("starnix_runner:forward_message", name);
193
194 if signals.contains(zx::Signals::CHANNEL_READABLE) {
195 let (actual_bytes, actual_handles) = {
196 match read_channel.read_uninit(bytes, handles) {
197 zx::ChannelReadResult::Ok(r) => r,
198 _ => return Err(anyhow!("Failed to read from channel")),
199 }
200 };
201
202 if let Some(counter) = message_counter {
203 counter.add(1).expect("Failed to add to the proxy's message counter");
204 trace_instant("starnix_runner:forward_message:counter_incremented", name);
205 }
206
207 write_channel.write(actual_bytes, actual_handles)?;
208 }
209
210 if signals.contains(zx::Signals::CHANNEL_PEER_CLOSED) {
213 Err(anyhow!("Proxy peer was closed"))
214 } else {
215 Ok(())
216 }
217}
218
/// Records a duration trace event named `event` in the "power" category, with the argument
/// "name" set to `name`.
///
/// NOTE(review): `fuchsia_trace::duration!` presumably ends the duration when the scope it is
/// expanded in exits. If so, the event recorded here ends when this helper returns and does not
/// cover the caller's scope — confirm against the `fuchsia_trace` documentation.
fn trace_duration(event: &'static str, name: &str) {
    fuchsia_trace::duration!("power", event, "name" => name);
}
222
/// Emits an instant trace event named `event` in the "power" category, using
/// `fuchsia_trace::Scope::Process` and the argument "name" set to `name`.
fn trace_instant(event: &'static str, name: &str) {
    fuchsia_trace::instant!(
        "power",
        event,
        fuchsia_trace::Scope::Process,
        "name" => name
    );
}
231
#[cfg(test)]
mod test {
    use super::{ChannelProxy, fasync, start_proxy};
    use std::cell::RefCell;
    use std::mem::MaybeUninit;
    use std::rc::Rc;

    /// Runs `start_proxy` for `proxy` as a local task, with freshly allocated bounce buffers
    /// and default wake sources. The proxy keeps running for as long as the returned task is
    /// alive and the proxy loop has not exited on its own.
    fn run_proxy_for_test(proxy: ChannelProxy) -> fasync::Task<()> {
        let bounce_bytes = Rc::new(RefCell::new(
            [MaybeUninit::uninit(); zx::sys::ZX_CHANNEL_MAX_MSG_BYTES as usize],
        ));
        let bounce_handles = Rc::new(RefCell::new(
            [const { MaybeUninit::uninit() }; zx::sys::ZX_CHANNEL_MAX_MSG_HANDLES as usize],
        ));
        fasync::Task::local(start_proxy(proxy, Default::default(), bounce_bytes, bounce_handles))
    }

    /// Closing the peer of the container channel must end the proxy, which in turn closes the
    /// proxy's remote channel; the test observes that as `CHANNEL_PEER_CLOSED` on
    /// `remote_server`.
    #[fuchsia::test]
    async fn test_peer_closed_kernel() {
        let (local_client, local_server) = zx::Channel::create();
        let (remote_client, remote_server) = zx::Channel::create();
        let message_counter = zx::Counter::create();

        let channel_proxy = ChannelProxy {
            container_channel: local_server,
            remote_channel: remote_client,
            message_counter,
            name: "test".to_string(),
        };
        let _task = run_proxy_for_test(channel_proxy);

        // Close the far end of the container channel.
        std::mem::drop(local_client);

        fasync::OnSignals::new(remote_server, zx::Signals::CHANNEL_PEER_CLOSED).await.unwrap();
    }

    /// Mirror of `test_peer_closed_kernel`: closing the peer of the remote channel must end the
    /// proxy, which is observed as `CHANNEL_PEER_CLOSED` on `local_client`.
    #[fuchsia::test]
    async fn test_peer_closed_remote() {
        let (local_client, local_server) = zx::Channel::create();
        let (remote_client, remote_server) = zx::Channel::create();
        let message_counter = zx::Counter::create();

        let channel_proxy = ChannelProxy {
            container_channel: local_server,
            remote_channel: remote_client,
            message_counter,
            name: "test".to_string(),
        };
        let _task = run_proxy_for_test(channel_proxy);

        // Close the far end of the remote channel.
        std::mem::drop(remote_server);

        fasync::OnSignals::new(local_client, zx::Signals::CHANNEL_PEER_CLOSED).await.unwrap();
    }

    /// Sends two messages towards the container one at a time, checking that the counter
    /// becomes positive after each one and can be brought back down in between.
    #[fuchsia::test]
    async fn test_counter_sequential() {
        // `_local_client` is kept bound so the container channel's peer stays open for the
        // whole test.
        let (_local_client, local_server) = zx::Channel::create();
        let (remote_client, remote_server) = zx::Channel::create();
        let message_counter = zx::Counter::create();
        // Duplicate handle used by the test to observe and adjust the proxy's counter.
        let local_message_counter = message_counter
            .duplicate_handle(zx::Rights::SAME_RIGHTS)
            .expect("Failed to duplicate counter");

        let channel_proxy = ChannelProxy {
            container_channel: local_server,
            remote_channel: remote_client,
            message_counter,
            name: "test".to_string(),
        };
        let _task = run_proxy_for_test(channel_proxy);

        // The counter starts out non-positive; the first forwarded message makes it positive.
        fasync::OnSignals::new(&local_message_counter, zx::Signals::COUNTER_NON_POSITIVE)
            .await
            .unwrap();
        assert!(remote_server.write(&[0x0, 0x1, 0x2], &mut []).is_ok());
        fasync::OnSignals::new(&local_message_counter, zx::Signals::COUNTER_POSITIVE)
            .await
            .unwrap();

        // Undo the increment, then check that a second message raises the counter again.
        local_message_counter.add(-1).expect("Failed add");
        fasync::OnSignals::new(&local_message_counter, zx::Signals::COUNTER_NON_POSITIVE)
            .await
            .unwrap();
        assert!(remote_server.write(&[0x0, 0x1, 0x2], &mut []).is_ok());
        fasync::OnSignals::new(&local_message_counter, zx::Signals::COUNTER_POSITIVE)
            .await
            .unwrap();
    }

    /// Queues three messages towards the container before waiting, then expects the counter to
    /// read exactly 3 once it has been observed positive.
    #[fuchsia::test]
    async fn test_counter_multiple() {
        // `_local_client` is kept bound so the container channel's peer stays open for the
        // whole test.
        let (_local_client, local_server) = zx::Channel::create();
        let (remote_client, remote_server) = zx::Channel::create();
        let message_counter = zx::Counter::create();
        // Duplicate handle used by the test to observe the proxy's counter.
        let local_message_counter = message_counter
            .duplicate_handle(zx::Rights::SAME_RIGHTS)
            .expect("Failed to duplicate counter");

        let channel_proxy = ChannelProxy {
            container_channel: local_server,
            remote_channel: remote_client,
            message_counter,
            name: "test".to_string(),
        };
        let _task = run_proxy_for_test(channel_proxy);

        assert!(remote_server.write(&[0x0, 0x1, 0x2], &mut []).is_ok());
        assert!(remote_server.write(&[0x0, 0x1, 0x2], &mut []).is_ok());
        assert!(remote_server.write(&[0x0, 0x1, 0x2], &mut []).is_ok());
        fasync::OnSignals::new(&local_message_counter, zx::Signals::COUNTER_POSITIVE)
            .await
            .unwrap();
        assert_eq!(local_message_counter.read().expect("Failed to read counter"), 3);
    }
}