_fuchsia_async_staticlib_rustc_static/
ffi.rs1use fuchsia_sync::Mutex;
6use futures::channel::oneshot;
7use std::time::Duration;
8use zx_types::{
9 zx_handle_t, zx_packet_signal_t, zx_signals_t, zx_status_t, zx_time_t, ZX_ERR_NOT_SUPPORTED,
10 ZX_OK,
11};
12use {fuchsia_async as fasync, zx_status};
13
14struct EPtr(*mut Executor);
15unsafe impl Send for EPtr {}
16unsafe impl Sync for EPtr {}
17
18impl EPtr {
19 unsafe fn as_ref(&self) -> &Executor {
20 self.0.as_ref().unwrap()
21 }
22
23 unsafe fn as_mut(&mut self) -> &mut Executor {
24 self.0.as_mut().unwrap()
25 }
26}
27
#[cfg(target_os = "fuchsia")]
mod fuchsia_details {
    //! Fuchsia-only plumbing that turns a port packet into a call to the C
    //! wait handler.

    use super::*;

    /// Packet receiver that completes one `async_wait_t` by invoking its
    /// handler when the matching signal packet is delivered.
    pub(crate) struct WaitEnder {
        /// The C wait structure whose `handler` is invoked on completion.
        wait: *mut async_wait_t,
        /// Opaque pointer passed as the first argument to the handler.
        dispatcher: *mut std::ffi::c_void,
        /// Tracks where this wait is in its lifecycle (see `Registration`).
        registration: Mutex<Registration>,
    }

    /// Lifecycle of the receiver registration owned by a `WaitEnder`.
    enum Registration {
        /// No registration has been recorded yet.
        Unregistered,
        /// The registration is held here so it stays alive until the packet
        /// arrives.
        Registered { _receiver: fasync::ReceiverRegistration<WaitEnder> },
        /// The packet has been received; any later registration is discarded.
        Finished,
    }

    // NOTE(review): asserted manually because of the raw pointer fields; the
    // pointees are owned by the C caller.
    unsafe impl Send for WaitEnder {}
    unsafe impl Sync for WaitEnder {}

    impl fasync::PacketReceiver for WaitEnder {
        /// Marks the wait as finished and forwards the signal packet to the
        /// C handler.
        ///
        /// # Panics
        ///
        /// Panics if the packet is anything other than a single-signal packet.
        fn receive_packet(&self, packet: zx::Packet) {
            // Flip to `Finished` first: this releases any stored registration
            // and makes a late `record_registration` call drop its argument.
            *self.registration.lock() = Registration::Finished;

            match packet.contents() {
                // SAFETY (caller contract): `self.wait` must still point at a
                // live `async_wait_t` when the packet is delivered.
                zx::PacketContents::SignalOne(sig) => unsafe {
                    ((*self.wait).handler)(
                        self.dispatcher,
                        self.wait,
                        packet.status(),
                        sig.raw_packet(),
                    )
                },
                _ => panic!("Expected a signal packet"),
            }
        }
    }

    impl WaitEnder {
        /// Creates a wait ender in the `Unregistered` state.
        pub(crate) fn new(dispatcher: *mut std::ffi::c_void, wait: *mut async_wait_t) -> WaitEnder {
            let registration = Mutex::new(Registration::Unregistered);
            WaitEnder { wait, dispatcher, registration }
        }

        /// Stores `registration` so it lives until the packet arrives.
        ///
        /// If the packet was already received (state is `Finished`), the
        /// registration is dropped immediately and the state stays `Finished`.
        pub(crate) fn record_registration(
            &self,
            registration: fasync::ReceiverRegistration<WaitEnder>,
        ) {
            let mut state = self.registration.lock();
            match *state {
                // Drop while the lock is still held, as the packet has already
                // been handled and nothing needs the registration any more.
                Registration::Finished => drop(registration),
                _ => *state = Registration::Registered { _receiver: registration },
            }
        }
    }
}
100
101pub struct Executor {
102 executor: Mutex<fasync::LocalExecutor>,
103 quit_tx: Mutex<Option<oneshot::Sender<()>>>,
104 start: fasync::MonotonicInstant,
105 cb_executor: *mut std::ffi::c_void,
106}
107
108impl Executor {
109 fn new(cb_executor: *mut std::ffi::c_void) -> Box<Executor> {
110 Box::new(Executor {
111 executor: Mutex::new(fasync::LocalExecutor::new()),
112 quit_tx: Mutex::new(None),
113 start: fasync::MonotonicInstant::now(),
114 cb_executor,
115 })
116 }
117
118 fn run_singlethreaded(&self) {
119 let (tx, rx) = oneshot::channel();
120
121 let mut quit_tx = self.quit_tx.lock();
125 if quit_tx.is_none() {
126 *quit_tx = Some(tx);
127 drop(quit_tx);
128 } else {
129 drop(quit_tx);
133 panic!("run_singlethreaded called but the executor is already running");
134 }
135
136 self.executor.lock().run_singlethreaded(async move { rx.await }).unwrap();
137 }
138
139 fn quit(&self) {
140 let quit_tx = self.quit_tx.lock().take();
143 quit_tx.unwrap().send(()).unwrap();
144 }
145
146 #[cfg(target_os = "fuchsia")]
147 unsafe fn begin_wait(&mut self, wait: *mut async_wait_t) -> Result<(), zx_status::Status> {
148 use fuchsia_details::*;
151
152 use std::sync::Arc;
153 use zx::AsHandleRef;
154
155 let handle = zx::HandleRef::from_raw_handle((*wait).object);
156 let dispatcher: *mut Executor = &mut *self;
157 let wait_ender = Arc::new(WaitEnder::new(dispatcher as *mut std::ffi::c_void, wait));
158 let registration = fasync::EHandle::local().register_receiver(wait_ender.clone());
159
160 let signals =
161 zx::Signals::from_bits((*wait).trigger).ok_or(zx_status::Status::INVALID_ARGS)?;
162 let options =
163 zx::WaitAsyncOpts::from_bits((*wait).options).ok_or(zx_status::Status::INVALID_ARGS)?;
164 let result =
165 handle.wait_async_handle(registration.port(), registration.key(), signals, options);
166
167 wait_ender.record_registration(registration);
168
169 result
170 }
171
172 #[cfg(not(target_os = "fuchsia"))]
173 unsafe fn begin_wait(&mut self, wait: *mut async_wait_t) -> Result<(), zx_status::Status> {
174 use fasync::emulated_handle::{Handle, Signals};
175 use fasync::OnSignalsRef;
176
177 let wait_ptr = wait;
178 let dispatcher: *mut Executor = &mut *self;
179 let wait = &mut *wait;
180 let object = std::mem::ManuallyDrop::new(Handle::from_raw(wait.object));
183 let trigger = Signals::from_bits_retain(wait.trigger);
184 let handler = wait.handler;
185
186 fasync::Task::local(async move {
187 match OnSignalsRef::new(&*object, trigger).await {
188 Ok(sigs) => {
189 let packet = zx_packet_signal_t {
190 trigger: trigger.bits(),
191 observed: sigs.bits(),
192 count: 1,
193 timestamp: 1234,
194 };
195
196 handler(dispatcher as *mut std::ffi::c_void, wait_ptr, ZX_OK, &packet);
197 }
198 Err(x) => {
199 handler(
200 dispatcher as *mut std::ffi::c_void,
201 wait_ptr,
202 x.into_raw(),
203 std::ptr::null_mut(),
204 );
205 }
206 }
207 })
208 .detach();
209
210 Ok(())
211 }
212
213 fn now(&self) -> zx_time_t {
214 let dur = fasync::MonotonicInstant::now() - self.start;
215 #[cfg(target_os = "fuchsia")]
216 let r = dur.into_nanos();
217 #[cfg(not(target_os = "fuchsia"))]
218 let r = dur.as_nanos() as zx_time_t;
219 r
220 }
221}
222
/// Two machine words of reserved storage embedded at the front of the C
/// operation structs below.
///
/// NOTE(review): presumably mirrors the C `async_state_t` layout — confirm
/// against the C header.
#[repr(C)]
struct async_state_t {
    /// Never read from Rust; present only to occupy space.
    _reserved: [usize; 2],
}
228
229#[repr(C)]
231pub(crate) struct async_wait_t {
232 _state: async_state_t,
233 handler: extern "C" fn(
234 *mut std::ffi::c_void,
235 *mut async_wait_t,
236 zx_status_t,
237 *const zx_packet_signal_t,
238 ),
239 object: zx_handle_t,
240 trigger: zx_signals_t,
241 options: u32,
242}
243
244#[repr(C)]
246struct async_task_t {
247 _state: async_state_t,
248 handler: extern "C" fn(*mut std::ffi::c_void, *mut async_task_t, zx_status_t),
249 deadline: zx_time_t,
250}
251
252struct TaskPtr(*mut async_task_t);
253unsafe impl Send for TaskPtr {}
254unsafe impl Sync for TaskPtr {}
255impl TaskPtr {
256 unsafe fn as_ref(&self) -> &async_task_t {
257 self.0.as_ref().unwrap()
258 }
259}
260
261#[no_mangle]
262pub extern "C" fn fasync_executor_create(cb_executor: *mut std::ffi::c_void) -> *mut Executor {
263 Box::into_raw(Executor::new(cb_executor))
264}
265
266#[no_mangle]
284pub unsafe extern "C" fn fasync_executor_run_singlethreaded(executor: *mut Executor) {
285 EPtr(executor).as_ref().run_singlethreaded()
286}
287
288#[no_mangle]
306pub unsafe extern "C" fn fasync_executor_quit(executor: *mut Executor) {
307 EPtr(executor).as_ref().quit()
308}
309
310#[no_mangle]
324pub unsafe extern "C" fn fasync_executor_destroy(executor: *mut Executor) {
325 drop(Box::from_raw(executor))
326}
327
328#[no_mangle]
329unsafe extern "C" fn fasync_executor_now(executor: *mut Executor) -> zx_time_t {
330 EPtr(executor).as_ref().now()
331}
332
333#[no_mangle]
334unsafe extern "C" fn fasync_executor_begin_wait(
335 executor: *mut Executor,
336 wait: *mut async_wait_t,
337) -> zx_status_t {
338 zx_status::Status::from_result(EPtr(executor).as_mut().begin_wait(wait)).into_raw()
339}
340
341#[no_mangle]
342unsafe extern "C" fn fasync_executor_cancel_wait(
343 _executor: *mut Executor,
344 _wait: *mut async_wait_t,
345) -> zx_status_t {
346 ZX_ERR_NOT_SUPPORTED
347}
348
349#[no_mangle]
350unsafe extern "C" fn fasync_executor_post_task(
351 executor: *mut Executor,
352 task: *mut async_task_t,
353) -> zx_status_t {
354 let executor = EPtr(executor);
355 let task = TaskPtr(task);
356 fasync::Task::spawn(async move {
357 let deadline = Duration::from_nanos(task.as_ref().deadline as u64);
358 let start = executor.as_ref().start;
359 fasync::Timer::new(start + deadline.into()).await;
360 (task.as_ref().handler)(executor.as_ref().cb_executor, task.0, ZX_OK)
361 })
362 .detach();
363 ZX_OK
364}
365
366#[no_mangle]
367unsafe extern "C" fn fasync_executor_cancel_task(
368 _executor: *mut Executor,
369 _task: *mut async_task_t,
370) -> zx_status_t {
371 ZX_ERR_NOT_SUPPORTED
372}