// openthread/ot/tasklets.rs
use crate::prelude_internal::*;
use std::pin::Pin;
use std::task::{Context, Poll, Waker};
/// Operations needed to drive a tasklet queue from an asynchronous task.
///
/// All methods take `&self`, so an implementor that keeps state (such as the
/// waker) has to do so through interior mutability.
pub trait Tasklets: Unpin {
    /// Hands the implementor the waker associated with the polling task.
    ///
    /// In this file it is called from `process_poll` with a clone of the
    /// current context's waker, before the pending check is made.
    fn set_waker(&self, waker: Waker);

    /// Requests a wake-up of the task whose waker was supplied through
    /// [`Tasklets::set_waker`].
    ///
    /// In this file it is called from the `otTaskletsSignalPending` hook.
    fn wake_waker(&self);

    /// Runs the tasklet queue.
    ///
    /// `process_poll` only calls this after [`Tasklets::has_pending`]
    /// returned `true`.
    fn process(&self);

    /// Reports whether there is tasklet work waiting to be processed.
    fn has_pending(&self) -> bool;
}
/// Lets an `ot::Box<T>` be used wherever a [`Tasklets`] implementor is
/// expected by forwarding each call to the `T` obtained through `as_ref()`.
impl<T: Tasklets + ot::Boxable> Tasklets for ot::Box<T> {
    /// Forwards `waker` to the boxed value.
    fn set_waker(&self, waker: Waker) {
        T::set_waker(self.as_ref(), waker)
    }

    /// Forwards the wake request to the boxed value.
    fn wake_waker(&self) {
        T::wake_waker(self.as_ref())
    }

    /// Forwards the processing request to the boxed value.
    fn process(&self) {
        T::process(self.as_ref())
    }

    /// Returns whatever the boxed value reports.
    fn has_pending(&self) -> bool {
        T::has_pending(self.as_ref())
    }
}
/// Poll-style entry point for driving processing from an async context.
pub trait ProcessPollAsync {
    /// Performs one poll step using the task context `cx`.
    ///
    /// The implementation for `ot::Instance` in this file returns:
    /// * `Poll::Ready(None)` when the platform poll reports an error,
    /// * `Poll::Ready(Some(()))` after processing pending tasklets,
    /// * `Poll::Pending` when nothing is pending.
    fn process_poll(&self, cx: &mut Context<'_>) -> Poll<Option<()>>;
}
impl ProcessPollAsync for ot::Instance {
fn process_poll(&self, cx: &mut Context<'_>) -> std::task::Poll<Option<()>> {
if let Err(err) = self.platform_poll(cx) {
warn!("process_poll terminating: {:?}", err);
return std::task::Poll::Ready(None);
};
self.set_waker(cx.waker().clone());
if self.has_pending() {
std::task::Poll::Ready({
self.process();
Some(())
})
} else {
std::task::Poll::Pending
}
}
}
/// C-ABI hook, exported under its unmangled name, that wakes the task
/// registered with the instance behind `instance`.
///
/// NOTE(review): the name matches the OpenThread platform callback that
/// signals pending tasklets — confirm against the OpenThread API reference.
///
/// # Safety
///
/// `instance` is passed straight to `Instance::ref_from_ot_ptr`; the caller
/// must supply a pointer that satisfies that function's requirements.
///
/// # Panics
///
/// Panics (via `unwrap`) if `Instance::ref_from_ot_ptr` does not yield an
/// instance for the given pointer.
#[no_mangle]
unsafe extern "C" fn otTaskletsSignalPending(instance: *mut otInstance) {
    trace!("otTaskletsSignalPending");
    let ot_instance = Instance::ref_from_ot_ptr(instance).unwrap();
    ot_instance.wake_waker();
}
/// Borrowing wrapper around a `T`, used in this file as the stream type
/// returned by `TaskletsStreamExt::tasklets_stream`.
///
/// `T` may be unsized; the wrapper only holds a shared reference to it.
#[derive(Debug)]
pub struct TaskletsStream<'a, T>(&'a T)
where
    T: ?Sized;
/// Each `poll_next` call is answered by the wrapped value's `tasklets_poll`.
impl<T: TaskletsStreamExt + ?Sized> Stream for TaskletsStream<'_, T> {
    type Item = ();

    /// Delegates to `tasklets_poll` on the borrowed `T` and returns its
    /// result unchanged.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let source: &T = self.0;
        T::tasklets_poll(source, cx)
    }
}
/// Extension trait that exposes tasklet polling as a `Stream`.
pub trait TaskletsStreamExt {
    /// Performs one poll step; this is the only required method.
    ///
    /// The `Stream` implementation on `TaskletsStream` returns this result
    /// unchanged from `poll_next`.
    fn tasklets_poll(&self, cx: &mut Context<'_>) -> Poll<Option<()>>;

    /// Wraps a shared borrow of `self` in a `TaskletsStream`.
    fn tasklets_stream(&self) -> TaskletsStream<'_, Self> {
        TaskletsStream(self)
    }
}
impl<T: AsRef<ot::Instance>> TaskletsStreamExt for fuchsia_sync::Mutex<T> {
fn tasklets_poll(&self, cx: &mut std::task::Context<'_>) -> std::task::Poll<Option<()>> {
use std::ops::Deref;
let guard = self.lock();
guard.deref().as_ref().process_poll(cx)
}
}
impl<T: AsRef<ot::Instance>> TaskletsStreamExt for std::sync::Mutex<T> {
fn tasklets_poll(&self, cx: &mut std::task::Context<'_>) -> std::task::Poll<Option<()>> {
use std::ops::Deref;
let guard = self.lock().expect("Lock is poisoned");
guard.deref().as_ref().process_poll(cx)
}
}