1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
45use fuchsia_sync::Mutex;
6use futures::channel::oneshot;
7use std::time::Duration;
8use zx_types::{
9 zx_handle_t, zx_packet_signal_t, zx_signals_t, zx_status_t, zx_time_t, ZX_ERR_NOT_SUPPORTED,
10 ZX_OK,
11};
12use {fuchsia_async as fasync, zx_status};
1314struct EPtr(*mut Executor);
15unsafe impl Send for EPtr {}
16unsafe impl Sync for EPtr {}
1718impl EPtr {
19unsafe fn as_ref(&self) -> &Executor {
20self.0.as_ref().unwrap()
21 }
2223unsafe fn as_mut(&mut self) -> &mut Executor {
24self.0.as_mut().unwrap()
25 }
26}
2728#[cfg(target_os = "fuchsia")]
29mod fuchsia_details {
3031use super::*;
3233// The callback holder... gets registered with the executor and receives a packet when
34 // trigger signals are observed. Needs to hold its own registration as dropping that would cause
35 // deregistration.
36pub(crate) struct WaitEnder {
37 wait: *mut async_wait_t,
38 dispatcher: *mut std::ffi::c_void,
39 registration: Mutex<Registration>,
40 }
41// Registration state machine:
42 // Unregistered -+-> Registered -+-> Finished
43 // | |
44 // +---------------+
45 // i.e. we start at Unregistered, move to Finished, and for a time we may be Registered, or not.
46 // The or not case happens when we get the receive_packet callback *before* we record the registration on
47 // the WaitEnder instance.
48enum Registration {
49// Wait registration not yet recorded.
50Unregistered,
51// Wait registered. We expect to receive_packet sometime in the future.
52 // Hold onto the [`ReceiverRegistration`], as dropping it would cause
53 // deregistration.
54Registered { _receiver: fasync::ReceiverRegistration<WaitEnder> },
55// We have gotten a receive_packet callback.
56Finished,
57 }
58unsafe impl Send for WaitEnder {}
59unsafe impl Sync for WaitEnder {}
60impl fasync::PacketReceiver for WaitEnder {
61fn receive_packet(&self, packet: zx::Packet) {
62// Record that the receive_packet call has been received... this finishes the
63 // registration state machine.
64 // Lock only needs to be held as long as it takes to note this fact.
65*self.registration.lock() = Registration::Finished;
66if let zx::PacketContents::SignalOne(sig) = packet.contents() {
67unsafe {
68 ((*self.wait).handler)(
69self.dispatcher,
70self.wait,
71 packet.status(),
72 sig.raw_packet(),
73 )
74 }
75 } else {
76panic!("Expected a signal packet");
77 }
78 }
79 }
80impl WaitEnder {
81pub(crate) fn new(dispatcher: *mut std::ffi::c_void, wait: *mut async_wait_t) -> WaitEnder {
82 WaitEnder { wait, dispatcher, registration: Mutex::new(Registration::Unregistered) }
83 }
8485pub(crate) fn record_registration(
86&self,
87 registration: fasync::ReceiverRegistration<WaitEnder>,
88 ) {
89let mut lock = self.registration.lock();
90if let Registration::Finished =
91 std::mem::replace(&mut *lock, Registration::Registered { _receiver: registration })
92 {
93// Need to be able to cope with the callback coming before we reach this code and finish
94 // the cycle holding our registration.
95*lock = Registration::Finished;
96 }
97 }
98 }
99}
100101pub struct Executor {
102 executor: Mutex<fasync::LocalExecutor>,
103 quit_tx: Mutex<Option<oneshot::Sender<()>>>,
104 start: fasync::MonotonicInstant,
105 cb_executor: *mut std::ffi::c_void,
106}
107108impl Executor {
109fn new(cb_executor: *mut std::ffi::c_void) -> Box<Executor> {
110 Box::new(Executor {
111 executor: Mutex::new(fasync::LocalExecutor::new()),
112 quit_tx: Mutex::new(None),
113 start: fasync::MonotonicInstant::now(),
114 cb_executor,
115 })
116 }
117118fn run_singlethreaded(&self) {
119let (tx, rx) = oneshot::channel();
120121// We make sure the quit message channel is `None` before replacing with
122 // the new channel. This ensures that even if it panics, the executor
123 // can still be successfully quit afterwards.
124let mut quit_tx = self.quit_tx.lock();
125if quit_tx.is_none() {
126*quit_tx = Some(tx);
127 drop(quit_tx);
128 } else {
129// `fuchsia_sync::Mutex` doesn't poison on panic, but dropping the
130 // guard before panicking is good practice in case we migrate away
131 // from it in the future.
132drop(quit_tx);
133panic!("run_singlethreaded called but the executor is already running");
134 }
135136self.executor.lock().run_singlethreaded(async move { rx.await }).unwrap();
137 }
138139fn quit(&self) {
140// These operations are separate to avoid poisoning the quit message
141 // channel if the executor is not running.
142let quit_tx = self.quit_tx.lock().take();
143 quit_tx.unwrap().send(()).unwrap();
144 }
145146#[cfg(target_os = "fuchsia")]
147unsafe fn begin_wait(&mut self, wait: *mut async_wait_t) -> Result<(), zx_status::Status> {
148// TODO: figure out how to change fasync::Executor such that this can be done without allocation.
149150use fuchsia_details::*;
151152use std::sync::Arc;
153use zx::AsHandleRef;
154155let handle = zx::HandleRef::from_raw_handle((*wait).object);
156let dispatcher: *mut Executor = &mut *self;
157let wait_ender = Arc::new(WaitEnder::new(dispatcher as *mut std::ffi::c_void, wait));
158let registration = fasync::EHandle::local().register_receiver(wait_ender.clone());
159160let signals =
161 zx::Signals::from_bits((*wait).trigger).ok_or(zx_status::Status::INVALID_ARGS)?;
162let options =
163 zx::WaitAsyncOpts::from_bits((*wait).options).ok_or(zx_status::Status::INVALID_ARGS)?;
164let result =
165 handle.wait_async_handle(registration.port(), registration.key(), signals, options);
166167 wait_ender.record_registration(registration);
168169 result
170 }
171172#[cfg(not(target_os = "fuchsia"))]
173unsafe fn begin_wait(&mut self, wait: *mut async_wait_t) -> Result<(), zx_status::Status> {
174use fasync::emulated_handle::{Handle, Signals};
175use fasync::OnSignalsRef;
176177let wait_ptr = wait;
178let dispatcher: *mut Executor = &mut *self;
179let wait = &mut *wait;
180// TODO(sadmac): Make sure the handle is guaranteed to remain open for the duration of the
181 // wait, or document what effect that has on correctness.
182let object = std::mem::ManuallyDrop::new(Handle::from_raw(wait.object));
183let trigger = Signals::from_bits_retain(wait.trigger);
184let handler = wait.handler;
185186 fasync::Task::local(async move {
187match OnSignalsRef::new(&*object, trigger).await {
188Ok(sigs) => {
189let packet = zx_packet_signal_t {
190 trigger: trigger.bits(),
191 observed: sigs.bits(),
192 count: 1,
193 timestamp: 1234,
194 };
195196 handler(dispatcher as *mut std::ffi::c_void, wait_ptr, ZX_OK, &packet);
197 }
198Err(x) => {
199 handler(
200 dispatcher as *mut std::ffi::c_void,
201 wait_ptr,
202 x.into_raw(),
203 std::ptr::null_mut(),
204 );
205 }
206 }
207 })
208 .detach();
209210Ok(())
211 }
212213fn now(&self) -> zx_time_t {
214let dur = fasync::MonotonicInstant::now() - self.start;
215#[cfg(target_os = "fuchsia")]
216let r = dur.into_nanos();
217#[cfg(not(target_os = "fuchsia"))]
218let r = dur.as_nanos() as zx_time_t;
219 r
220 }
221}
222223/// Duplicated from //sdk/lib/async/include/lib/async/dispatcher.h
224#[repr(C)]
225struct async_state_t {
226 _reserved: [usize; 2],
227}
228229/// Duplicated from //sdk/lib/async/include/lib/async/wait.h
230#[repr(C)]
231pub(crate) struct async_wait_t {
232 _state: async_state_t,
233 handler: extern "C" fn(
234*mut std::ffi::c_void,
235*mut async_wait_t,
236 zx_status_t,
237*const zx_packet_signal_t,
238 ),
239 object: zx_handle_t,
240 trigger: zx_signals_t,
241 options: u32,
242}
243244/// Duplicated from //sdk/lib/async/include/lib/async/task.h
245#[repr(C)]
246struct async_task_t {
247 _state: async_state_t,
248 handler: extern "C" fn(*mut std::ffi::c_void, *mut async_task_t, zx_status_t),
249 deadline: zx_time_t,
250}
251252struct TaskPtr(*mut async_task_t);
253unsafe impl Send for TaskPtr {}
254unsafe impl Sync for TaskPtr {}
255impl TaskPtr {
256unsafe fn as_ref(&self) -> &async_task_t {
257self.0.as_ref().unwrap()
258 }
259}
260261#[no_mangle]
262pub extern "C" fn fasync_executor_create(cb_executor: *mut std::ffi::c_void) -> *mut Executor {
263 Box::into_raw(Executor::new(cb_executor))
264}
265266/// Runs the given executor in a single-threaded fashion.
267///
268/// This will block the calling thread until the executor is quit with
269/// [`fasync_executor_quit`] or destroyed with [`fasync_executor_destroy`]. Note
270/// that after calling this function, `executor` may be a dangling pointer.
271///
272/// # Panics
273///
274/// Panics if the given executor is already running, for example if another
275/// thread is already calling this function.
276///
277/// # Safety
278///
279/// `executor` must be non-null, properly aligned, and point to an initialized
280/// [`Executor`]. To guarantee these properties, `executor` should be a pointer
281/// returned from [`fasync_executor_create`] that has not yet been passed to
282/// [`fasync_executor_destroy`].
283#[no_mangle]
284pub unsafe extern "C" fn fasync_executor_run_singlethreaded(executor: *mut Executor) {
285 EPtr(executor).as_ref().run_singlethreaded()
286}
287288/// Signals the given executor to shut down.
289///
290/// This function does not wait for the executor to finish shutting down. After
291/// calling this function, running tasks may continue to be processed until the
292/// executor processes the shutdown signal. Once the executor shuts down,
293/// [`fasync_executor_run_singlethreaded`] will return.
294///
295/// # Panics
296///
297/// Panics if the given executor is not currently running.
298///
299/// # Safety
300///
301/// `executor` must be non-null, properly aligned, and point to an initialized
302/// [`Executor`]. To guarantee these properties, `executor` should be a pointer
303/// returned from [`fasync_executor_create`] that has not yet been passed to
304/// [`fasync_executor_destroy`].
305#[no_mangle]
306pub unsafe extern "C" fn fasync_executor_quit(executor: *mut Executor) {
307 EPtr(executor).as_ref().quit()
308}
309310/// Drops the given executor.
311///
312/// This will block the current thread until all tasks that are currently
313/// spawned onto the executor have been dropped. After calling this function,
314/// `executor` no longer points to an initialized [`Executor`].
315///
316/// # Safety
317///
318/// `executor` must be non-null, properly aligned, and point to an initialized
319/// [`Executor`] that is not currently running. To guarantee these properties,
320/// `executor` should be a pointer returned from [`fasync_executor_create`] that
321/// has not yet been passed to [`fasync_executor_destroy`] and is not currently
322/// running in a call to [`fasync_executor_run_singlethreaded`].
323#[no_mangle]
324pub unsafe extern "C" fn fasync_executor_destroy(executor: *mut Executor) {
325 drop(Box::from_raw(executor))
326}
327328#[no_mangle]
329unsafe extern "C" fn fasync_executor_now(executor: *mut Executor) -> zx_time_t {
330 EPtr(executor).as_ref().now()
331}
332333#[no_mangle]
334unsafe extern "C" fn fasync_executor_begin_wait(
335 executor: *mut Executor,
336 wait: *mut async_wait_t,
337) -> zx_status_t {
338 zx_status::Status::from_result(EPtr(executor).as_mut().begin_wait(wait)).into_raw()
339}
340341#[no_mangle]
342unsafe extern "C" fn fasync_executor_cancel_wait(
343 _executor: *mut Executor,
344 _wait: *mut async_wait_t,
345) -> zx_status_t {
346 ZX_ERR_NOT_SUPPORTED
347}
348349#[no_mangle]
350unsafe extern "C" fn fasync_executor_post_task(
351 executor: *mut Executor,
352 task: *mut async_task_t,
353) -> zx_status_t {
354let executor = EPtr(executor);
355let task = TaskPtr(task);
356 fasync::Task::spawn(async move {
357let deadline = Duration::from_nanos(task.as_ref().deadline as u64);
358let start = executor.as_ref().start;
359 fasync::Timer::new(start + deadline.into()).await;
360 (task.as_ref().handler)(executor.as_ref().cb_executor, task.0, ZX_OK)
361 })
362 .detach();
363 ZX_OK
364}
365366#[no_mangle]
367unsafe extern "C" fn fasync_executor_cancel_task(
368 _executor: *mut Executor,
369 _task: *mut async_task_t,
370) -> zx_status_t {
371 ZX_ERR_NOT_SUPPORTED
372}