_fuchsia_async_staticlib_rustc_static/
ffi.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fuchsia_sync::Mutex;
6use futures::channel::oneshot;
7use std::time::Duration;
8use zx_types::{
9    zx_handle_t, zx_packet_signal_t, zx_signals_t, zx_status_t, zx_time_t, ZX_ERR_NOT_SUPPORTED,
10    ZX_OK,
11};
12use {fuchsia_async as fasync, zx_status};
13
14struct EPtr(*mut Executor);
15unsafe impl Send for EPtr {}
16unsafe impl Sync for EPtr {}
17
18impl EPtr {
19    unsafe fn as_ref(&self) -> &Executor {
20        self.0.as_ref().unwrap()
21    }
22
23    unsafe fn as_mut(&mut self) -> &mut Executor {
24        self.0.as_mut().unwrap()
25    }
26}
27
28#[cfg(target_os = "fuchsia")]
29mod fuchsia_details {
30
31    use super::*;
32
33    // The callback holder... gets registered with the executor and receives a packet when
34    // trigger signals are observed. Needs to hold its own registration as dropping that would cause
35    // deregistration.
36    pub(crate) struct WaitEnder {
37        wait: *mut async_wait_t,
38        dispatcher: *mut std::ffi::c_void,
39        registration: Mutex<Registration>,
40    }
41    // Registration state machine:
42    // Unregistered -+-> Registered -+-> Finished
43    //               |               |
44    //               +---------------+
45    // i.e. we start at Unregistered, move to Finished, and for a time we may be Registered, or not.
46    // The or not case happens when we get the receive_packet callback *before* we record the registration on
47    // the WaitEnder instance.
48    enum Registration {
49        // Wait registration not yet recorded.
50        Unregistered,
51        // Wait registered. We expect to receive_packet sometime in the future.
52        // Hold onto the [`ReceiverRegistration`], as dropping it would cause
53        // deregistration.
54        Registered { _receiver: fasync::ReceiverRegistration<WaitEnder> },
55        // We have gotten a receive_packet callback.
56        Finished,
57    }
58    unsafe impl Send for WaitEnder {}
59    unsafe impl Sync for WaitEnder {}
60    impl fasync::PacketReceiver for WaitEnder {
61        fn receive_packet(&self, packet: zx::Packet) {
62            // Record that the receive_packet call has been received... this finishes the
63            // registration state machine.
64            // Lock only needs to be held as long as it takes to note this fact.
65            *self.registration.lock() = Registration::Finished;
66            if let zx::PacketContents::SignalOne(sig) = packet.contents() {
67                unsafe {
68                    ((*self.wait).handler)(
69                        self.dispatcher,
70                        self.wait,
71                        packet.status(),
72                        sig.raw_packet(),
73                    )
74                }
75            } else {
76                panic!("Expected a signal packet");
77            }
78        }
79    }
80    impl WaitEnder {
81        pub(crate) fn new(dispatcher: *mut std::ffi::c_void, wait: *mut async_wait_t) -> WaitEnder {
82            WaitEnder { wait, dispatcher, registration: Mutex::new(Registration::Unregistered) }
83        }
84
85        pub(crate) fn record_registration(
86            &self,
87            registration: fasync::ReceiverRegistration<WaitEnder>,
88        ) {
89            let mut lock = self.registration.lock();
90            if let Registration::Finished =
91                std::mem::replace(&mut *lock, Registration::Registered { _receiver: registration })
92            {
93                // Need to be able to cope with the callback coming before we reach this code and finish
94                // the cycle holding our registration.
95                *lock = Registration::Finished;
96            }
97        }
98    }
99}
100
101pub struct Executor {
102    executor: Mutex<fasync::LocalExecutor>,
103    quit_tx: Mutex<Option<oneshot::Sender<()>>>,
104    start: fasync::MonotonicInstant,
105    cb_executor: *mut std::ffi::c_void,
106}
107
108impl Executor {
109    fn new(cb_executor: *mut std::ffi::c_void) -> Box<Executor> {
110        Box::new(Executor {
111            executor: Mutex::new(fasync::LocalExecutor::new()),
112            quit_tx: Mutex::new(None),
113            start: fasync::MonotonicInstant::now(),
114            cb_executor,
115        })
116    }
117
118    fn run_singlethreaded(&self) {
119        let (tx, rx) = oneshot::channel();
120
121        // We make sure the quit message channel is `None` before replacing with
122        // the new channel. This ensures that even if it panics, the executor
123        // can still be successfully quit afterwards.
124        let mut quit_tx = self.quit_tx.lock();
125        if quit_tx.is_none() {
126            *quit_tx = Some(tx);
127            drop(quit_tx);
128        } else {
129            // `fuchsia_sync::Mutex` doesn't poison on panic, but dropping the
130            // guard before panicking is good practice in case we migrate away
131            // from it in the future.
132            drop(quit_tx);
133            panic!("run_singlethreaded called but the executor is already running");
134        }
135
136        self.executor.lock().run_singlethreaded(async move { rx.await }).unwrap();
137    }
138
139    fn quit(&self) {
140        // These operations are separate to avoid poisoning the quit message
141        // channel if the executor is not running.
142        let quit_tx = self.quit_tx.lock().take();
143        quit_tx.unwrap().send(()).unwrap();
144    }
145
146    #[cfg(target_os = "fuchsia")]
147    unsafe fn begin_wait(&mut self, wait: *mut async_wait_t) -> Result<(), zx_status::Status> {
148        // TODO: figure out how to change fasync::Executor such that this can be done without allocation.
149
150        use fuchsia_details::*;
151
152        use std::sync::Arc;
153        use zx::AsHandleRef;
154
155        let handle = zx::HandleRef::from_raw_handle((*wait).object);
156        let dispatcher: *mut Executor = &mut *self;
157        let wait_ender = Arc::new(WaitEnder::new(dispatcher as *mut std::ffi::c_void, wait));
158        let registration = fasync::EHandle::local().register_receiver(wait_ender.clone());
159
160        let signals =
161            zx::Signals::from_bits((*wait).trigger).ok_or(zx_status::Status::INVALID_ARGS)?;
162        let options =
163            zx::WaitAsyncOpts::from_bits((*wait).options).ok_or(zx_status::Status::INVALID_ARGS)?;
164        let result =
165            handle.wait_async_handle(registration.port(), registration.key(), signals, options);
166
167        wait_ender.record_registration(registration);
168
169        result
170    }
171
172    #[cfg(not(target_os = "fuchsia"))]
173    unsafe fn begin_wait(&mut self, wait: *mut async_wait_t) -> Result<(), zx_status::Status> {
174        use fasync::emulated_handle::{Handle, Signals};
175        use fasync::OnSignalsRef;
176
177        let wait_ptr = wait;
178        let dispatcher: *mut Executor = &mut *self;
179        let wait = &mut *wait;
180        // TODO(sadmac): Make sure the handle is guaranteed to remain open for the duration of the
181        // wait, or document what effect that has on correctness.
182        let object = std::mem::ManuallyDrop::new(Handle::from_raw(wait.object));
183        let trigger = Signals::from_bits_retain(wait.trigger);
184        let handler = wait.handler;
185
186        fasync::Task::local(async move {
187            match OnSignalsRef::new(&*object, trigger).await {
188                Ok(sigs) => {
189                    let packet = zx_packet_signal_t {
190                        trigger: trigger.bits(),
191                        observed: sigs.bits(),
192                        count: 1,
193                        timestamp: 1234,
194                    };
195
196                    handler(dispatcher as *mut std::ffi::c_void, wait_ptr, ZX_OK, &packet);
197                }
198                Err(x) => {
199                    handler(
200                        dispatcher as *mut std::ffi::c_void,
201                        wait_ptr,
202                        x.into_raw(),
203                        std::ptr::null_mut(),
204                    );
205                }
206            }
207        })
208        .detach();
209
210        Ok(())
211    }
212
213    fn now(&self) -> zx_time_t {
214        let dur = fasync::MonotonicInstant::now() - self.start;
215        #[cfg(target_os = "fuchsia")]
216        let r = dur.into_nanos();
217        #[cfg(not(target_os = "fuchsia"))]
218        let r = dur.as_nanos() as zx_time_t;
219        r
220    }
221}
222
223/// Duplicated from //sdk/lib/async/include/lib/async/dispatcher.h
224#[repr(C)]
225struct async_state_t {
226    _reserved: [usize; 2],
227}
228
229/// Duplicated from //sdk/lib/async/include/lib/async/wait.h
230#[repr(C)]
231pub(crate) struct async_wait_t {
232    _state: async_state_t,
233    handler: extern "C" fn(
234        *mut std::ffi::c_void,
235        *mut async_wait_t,
236        zx_status_t,
237        *const zx_packet_signal_t,
238    ),
239    object: zx_handle_t,
240    trigger: zx_signals_t,
241    options: u32,
242}
243
244/// Duplicated from //sdk/lib/async/include/lib/async/task.h
245#[repr(C)]
246struct async_task_t {
247    _state: async_state_t,
248    handler: extern "C" fn(*mut std::ffi::c_void, *mut async_task_t, zx_status_t),
249    deadline: zx_time_t,
250}
251
252struct TaskPtr(*mut async_task_t);
253unsafe impl Send for TaskPtr {}
254unsafe impl Sync for TaskPtr {}
255impl TaskPtr {
256    unsafe fn as_ref(&self) -> &async_task_t {
257        self.0.as_ref().unwrap()
258    }
259}
260
261#[no_mangle]
262pub extern "C" fn fasync_executor_create(cb_executor: *mut std::ffi::c_void) -> *mut Executor {
263    Box::into_raw(Executor::new(cb_executor))
264}
265
266/// Runs the given executor in a single-threaded fashion.
267///
268/// This will block the calling thread until the executor is quit with
269/// [`fasync_executor_quit`] or destroyed with [`fasync_executor_destroy`]. Note
270/// that after calling this function, `executor` may be a dangling pointer.
271///
272/// # Panics
273///
274/// Panics if the given executor is already running, for example if another
275/// thread is already calling this function.
276///
277/// # Safety
278///
279/// `executor` must be non-null, properly aligned, and point to an initialized
280/// [`Executor`]. To guarantee these properties, `executor` should be a pointer
281/// returned from [`fasync_executor_create`] that has not yet been passed to
282/// [`fasync_executor_destroy`].
283#[no_mangle]
284pub unsafe extern "C" fn fasync_executor_run_singlethreaded(executor: *mut Executor) {
285    EPtr(executor).as_ref().run_singlethreaded()
286}
287
288/// Signals the given executor to shut down.
289///
290/// This function does not wait for the executor to finish shutting down. After
291/// calling this function, running tasks may continue to be processed until the
292/// executor processes the shutdown signal. Once the executor shuts down,
293/// [`fasync_executor_run_singlethreaded`] will return.
294///
295/// # Panics
296///
297/// Panics if the given executor is not currently running.
298///
299/// # Safety
300///
301/// `executor` must be non-null, properly aligned, and point to an initialized
302/// [`Executor`]. To guarantee these properties, `executor` should be a pointer
303/// returned from [`fasync_executor_create`] that has not yet been passed to
304/// [`fasync_executor_destroy`].
305#[no_mangle]
306pub unsafe extern "C" fn fasync_executor_quit(executor: *mut Executor) {
307    EPtr(executor).as_ref().quit()
308}
309
310/// Drops the given executor.
311///
312/// This will block the current thread until all tasks that are currently
313/// spawned onto the executor have been dropped. After calling this function,
314/// `executor` no longer points to an initialized [`Executor`].
315///
316/// # Safety
317///
318/// `executor` must be non-null, properly aligned, and point to an initialized
319/// [`Executor`] that is not currently running. To guarantee these properties,
320/// `executor` should be a pointer returned from [`fasync_executor_create`] that
321/// has not yet been passed to [`fasync_executor_destroy`] and is not currently
322/// running in a call to [`fasync_executor_run_singlethreaded`].
323#[no_mangle]
324pub unsafe extern "C" fn fasync_executor_destroy(executor: *mut Executor) {
325    drop(Box::from_raw(executor))
326}
327
328#[no_mangle]
329unsafe extern "C" fn fasync_executor_now(executor: *mut Executor) -> zx_time_t {
330    EPtr(executor).as_ref().now()
331}
332
333#[no_mangle]
334unsafe extern "C" fn fasync_executor_begin_wait(
335    executor: *mut Executor,
336    wait: *mut async_wait_t,
337) -> zx_status_t {
338    zx_status::Status::from_result(EPtr(executor).as_mut().begin_wait(wait)).into_raw()
339}
340
341#[no_mangle]
342unsafe extern "C" fn fasync_executor_cancel_wait(
343    _executor: *mut Executor,
344    _wait: *mut async_wait_t,
345) -> zx_status_t {
346    ZX_ERR_NOT_SUPPORTED
347}
348
349#[no_mangle]
350unsafe extern "C" fn fasync_executor_post_task(
351    executor: *mut Executor,
352    task: *mut async_task_t,
353) -> zx_status_t {
354    let executor = EPtr(executor);
355    let task = TaskPtr(task);
356    fasync::Task::spawn(async move {
357        let deadline = Duration::from_nanos(task.as_ref().deadline as u64);
358        let start = executor.as_ref().start;
359        fasync::Timer::new(start + deadline.into()).await;
360        (task.as_ref().handler)(executor.as_ref().cb_executor, task.0, ZX_OK)
361    })
362    .detach();
363    ZX_OK
364}
365
366#[no_mangle]
367unsafe extern "C" fn fasync_executor_cancel_task(
368    _executor: *mut Executor,
369    _task: *mut async_task_t,
370) -> zx_status_t {
371    ZX_ERR_NOT_SUPPORTED
372}