1use crate::suspend::WakeSources;
6use anyhow::{Error, anyhow};
7use fuchsia_async as fasync;
8use fuchsia_sync::Mutex;
9use futures::FutureExt;
10use log::warn;
11use std::cell::RefCell;
12use std::mem::MaybeUninit;
13use std::rc::Rc;
14use std::sync::Arc;
15
16pub struct ChannelProxy {
22 pub container_channel: zx::Channel,
24
25 pub remote_channel: zx::Channel,
27
28 pub message_counter: zx::Counter,
31
32 pub name: String,
34}
35
36#[derive(Debug)]
38enum WaitReturn {
39 Container,
40 Remote,
41}
42
43const PROXY_ROLE_NAME: &str = "fuchsia.starnix.runner.proxy";
45
46pub fn run_proxy_thread(
48 new_proxies: async_channel::Receiver<(ChannelProxy, Arc<Mutex<WakeSources>>)>,
49) {
50 let _ = std::thread::Builder::new().name("proxy_thread".to_string()).spawn(move || {
51 if let Err(e) = fuchsia_scheduler::set_role_for_this_thread(PROXY_ROLE_NAME) {
52 warn!(e:%; "failed to set thread role");
53 }
54 let mut executor = fasync::LocalExecutor::default();
55 executor.run_singlethreaded(async move {
56 let mut tasks = fasync::TaskGroup::new();
57 let bounce_bytes = Rc::new(RefCell::new(
58 [MaybeUninit::uninit(); zx::sys::ZX_CHANNEL_MAX_MSG_BYTES as usize],
59 ));
60 let bounce_handles = Rc::new(RefCell::new(
61 [const { MaybeUninit::uninit() }; zx::sys::ZX_CHANNEL_MAX_MSG_HANDLES as usize],
62 ));
63 while let Ok((proxy, events)) = new_proxies.recv().await {
64 let bytes_clone = bounce_bytes.clone();
65 let handles_clone = bounce_handles.clone();
66 tasks.local(start_proxy(proxy, events, bytes_clone, handles_clone));
67 }
68 });
69 });
70}
71
72async fn start_proxy(
78 proxy: ChannelProxy,
79 wake_sources: Arc<Mutex<WakeSources>>,
80 bounce_bytes: Rc<RefCell<[MaybeUninit<u8>; zx::sys::ZX_CHANNEL_MAX_MSG_BYTES as usize]>>,
81 bounce_handles: Rc<
82 RefCell<[MaybeUninit<zx::NullableHandle>; zx::sys::ZX_CHANNEL_MAX_MSG_HANDLES as usize]>,
83 >,
84) {
85 let proxy_name = proxy.name.as_str();
86 trace_instant("starnix_runner:start_proxy:loop:enter", proxy_name);
87
88 'outer: loop {
89 let mut container_wait = fasync::OnSignals::new(
91 proxy.container_channel.as_handle_ref(),
92 zx::Signals::CHANNEL_READABLE | zx::Signals::CHANNEL_PEER_CLOSED,
93 )
94 .fuse();
95 let mut remote_wait = fasync::OnSignals::new(
96 proxy.remote_channel.as_handle_ref(),
97 zx::Signals::CHANNEL_READABLE | zx::Signals::CHANNEL_PEER_CLOSED,
98 )
99 .fuse();
100
101 let (signals, finished_wait) = {
102 trace_duration("starnix_runner:start_proxy:wait_for_messages", proxy_name);
103 let result = futures::select! {
104 res = container_wait => {
105 trace_instant("starnix_runner:start_proxy:container_readable", proxy_name);
106 res.map(|s| (s, WaitReturn::Container))
107 },
108 res = remote_wait => {
109 trace_instant("starnix_runner:start_proxy:remote_readable", proxy_name);
110 res.map(|s| (s, WaitReturn::Remote))
111 },
112 };
113
114 match result {
115 Ok(result) => result,
116 Err(e) => {
117 trace_instant("starnix_runner:start_proxy:result:error", proxy_name);
118 log::warn!("Failed to wait on proxied channels in runner: {:?}", e);
119 break 'outer;
120 }
121 }
122 };
123
124 let name = proxy.name.as_str();
128 let result = match finished_wait {
129 WaitReturn::Container => forward_message(
130 &signals,
131 &proxy.container_channel,
132 &proxy.remote_channel,
133 None,
134 &mut bounce_bytes.borrow_mut(),
135 &mut bounce_handles.borrow_mut(),
136 name,
137 ),
138 WaitReturn::Remote => forward_message(
139 &signals,
140 &proxy.remote_channel,
141 &proxy.container_channel,
142 Some(&proxy.message_counter),
143 &mut bounce_bytes.borrow_mut(),
144 &mut bounce_handles.borrow_mut(),
145 name,
146 ),
147 };
148
149 if result.is_err() {
150 log::warn!(
151 "Proxy failed to forward message {} kernel: {}; {:?}",
152 match finished_wait {
153 WaitReturn::Container => "from",
154 WaitReturn::Remote => "to",
155 },
156 name,
157 result,
158 );
159 break 'outer;
160 }
161 }
162
163 trace_instant("starnix_runner:start_proxy:loop:exit", proxy_name);
164 if let Ok(koid) = proxy.message_counter.koid() {
165 wake_sources.lock().remove(&koid);
166 }
167}
168
169fn forward_message(
176 signals: &zx::Signals,
177 read_channel: &zx::Channel,
178 write_channel: &zx::Channel,
179 message_counter: Option<&zx::Counter>,
180 bytes: &mut [MaybeUninit<u8>; zx::sys::ZX_CHANNEL_MAX_MSG_BYTES as usize],
181 handles: &mut [MaybeUninit<zx::NullableHandle>; zx::sys::ZX_CHANNEL_MAX_MSG_HANDLES as usize],
182 name: &str,
183) -> Result<(), Error> {
184 trace_duration("starnix_runner:forward_message", name);
185
186 if signals.contains(zx::Signals::CHANNEL_READABLE) {
187 let (actual_bytes, actual_handles) = {
188 match read_channel.read_uninit(bytes, handles) {
189 zx::ChannelReadResult::Ok(r) => r,
190 _ => return Err(anyhow!("Failed to read from channel")),
191 }
192 };
193
194 if let Some(counter) = message_counter {
195 counter.add(1).expect("Failed to add to the proxy's message counter");
196 trace_instant("starnix_runner:forward_message:counter_incremented", name);
197 }
198
199 write_channel.write(actual_bytes, actual_handles)?;
200 }
201
202 if signals.contains(zx::Signals::CHANNEL_PEER_CLOSED) {
205 Err(anyhow!("Proxy peer was closed"))
206 } else {
207 Ok(())
208 }
209}
210
211fn trace_duration(event: &'static str, name: &str) {
212 fuchsia_trace::duration!("power", event, "name" => name);
213}
214
215fn trace_instant(event: &'static str, name: &str) {
216 fuchsia_trace::instant!(
217 "power",
218 event,
219 fuchsia_trace::Scope::Process,
220 "name" => name
221 );
222}
223
224#[cfg(test)]
225mod test {
226 use super::{ChannelProxy, fasync, start_proxy};
227 use fidl::HandleBased;
228 use std::cell::RefCell;
229 use std::mem::MaybeUninit;
230 use std::rc::Rc;
231
232 fn run_proxy_for_test(proxy: ChannelProxy) -> fasync::Task<()> {
233 let bounce_bytes = Rc::new(RefCell::new(
234 [MaybeUninit::uninit(); zx::sys::ZX_CHANNEL_MAX_MSG_BYTES as usize],
235 ));
236 let bounce_handles = Rc::new(RefCell::new(
237 [const { MaybeUninit::uninit() }; zx::sys::ZX_CHANNEL_MAX_MSG_HANDLES as usize],
238 ));
239 fasync::Task::local(start_proxy(proxy, Default::default(), bounce_bytes, bounce_handles))
240 }
241
242 #[fuchsia::test]
243 async fn test_peer_closed_kernel() {
244 let (local_client, local_server) = zx::Channel::create();
245 let (remote_client, remote_server) = zx::Channel::create();
246 let message_counter = zx::Counter::create();
247
248 let channel_proxy = ChannelProxy {
249 container_channel: local_server,
250 remote_channel: remote_client,
251 message_counter,
252 name: "test".to_string(),
253 };
254 let _task = run_proxy_for_test(channel_proxy);
255
256 std::mem::drop(local_client);
257
258 fasync::OnSignals::new(remote_server, zx::Signals::CHANNEL_PEER_CLOSED).await.unwrap();
259 }
260
261 #[fuchsia::test]
262 async fn test_peer_closed_remote() {
263 let (local_client, local_server) = zx::Channel::create();
264 let (remote_client, remote_server) = zx::Channel::create();
265 let message_counter = zx::Counter::create();
266
267 let channel_proxy = ChannelProxy {
268 container_channel: local_server,
269 remote_channel: remote_client,
270 message_counter,
271 name: "test".to_string(),
272 };
273 let _task = run_proxy_for_test(channel_proxy);
274
275 std::mem::drop(remote_server);
276
277 fasync::OnSignals::new(local_client, zx::Signals::CHANNEL_PEER_CLOSED).await.unwrap();
278 }
279
280 #[fuchsia::test]
281 async fn test_counter_sequential() {
282 let (_local_client, local_server) = zx::Channel::create();
283 let (remote_client, remote_server) = zx::Channel::create();
284 let message_counter = zx::Counter::create();
285 let local_message_counter = message_counter
286 .duplicate_handle(zx::Rights::SAME_RIGHTS)
287 .expect("Failed to duplicate counter");
288
289 let channel_proxy = ChannelProxy {
290 container_channel: local_server,
291 remote_channel: remote_client,
292 message_counter,
293 name: "test".to_string(),
294 };
295 let _task = run_proxy_for_test(channel_proxy);
296
297 fasync::OnSignals::new(&local_message_counter, zx::Signals::COUNTER_NON_POSITIVE)
299 .await
300 .unwrap();
301 assert!(remote_server.write(&[0x0, 0x1, 0x2], &mut []).is_ok());
302 fasync::OnSignals::new(&local_message_counter, zx::Signals::COUNTER_POSITIVE)
303 .await
304 .unwrap();
305
306 local_message_counter.add(-1).expect("Failed add");
308 fasync::OnSignals::new(&local_message_counter, zx::Signals::COUNTER_NON_POSITIVE)
309 .await
310 .unwrap();
311 assert!(remote_server.write(&[0x0, 0x1, 0x2], &mut []).is_ok());
312 fasync::OnSignals::new(&local_message_counter, zx::Signals::COUNTER_POSITIVE)
313 .await
314 .unwrap();
315 }
316
317 #[fuchsia::test]
318 async fn test_counter_multiple() {
319 let (_local_client, local_server) = zx::Channel::create();
320 let (remote_client, remote_server) = zx::Channel::create();
321 let message_counter = zx::Counter::create();
322 let local_message_counter = message_counter
323 .duplicate_handle(zx::Rights::SAME_RIGHTS)
324 .expect("Failed to duplicate counter");
325
326 let channel_proxy = ChannelProxy {
327 container_channel: local_server,
328 remote_channel: remote_client,
329 message_counter,
330 name: "test".to_string(),
331 };
332 let _task = run_proxy_for_test(channel_proxy);
333
334 assert!(remote_server.write(&[0x0, 0x1, 0x2], &mut []).is_ok());
335 assert!(remote_server.write(&[0x0, 0x1, 0x2], &mut []).is_ok());
336 assert!(remote_server.write(&[0x0, 0x1, 0x2], &mut []).is_ok());
337 fasync::OnSignals::new(&local_message_counter, zx::Signals::COUNTER_POSITIVE)
338 .await
339 .unwrap();
340 assert_eq!(local_message_counter.read().expect("Failed to read counter"), 3);
341 }
342}