#![warn(rust_2018_idioms)]
mod backing;
mod binding;
pub mod logging;
pub(crate) mod to_escaped_string;
use futures::prelude::*;
use openthread::prelude::*;
use backing::*;
use binding::*;
use anyhow::Error;
use fuchsia_async as fasync;
use futures::channel::mpsc as fmpsc;
use lowpan_driver_common::spinel::*;
use std::cell::RefCell;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc;
use std::task::{Context, Poll};
#[allow(unused_imports)]
use tracing::{debug, error, info, trace, warn};
const FRAME_READY_BUFFER_LEN: usize = 2;
const UDP_PACKET_MAX_LENGTH: usize = 1280usize;
pub struct PlatformBuilder {
pub(crate) thread_netif_index: Option<u32>,
pub(crate) backbone_netif_index: Option<u32>,
}
impl PlatformBuilder {
#[must_use]
pub fn thread_netif_index(mut self, index: u32) -> Self {
self.thread_netif_index = Some(index);
self
}
#[must_use]
pub fn backbone_netif_index(mut self, index: u32) -> Self {
self.backbone_netif_index = Some(index);
self
}
pub fn init<SpinelSink, SpinelStream>(
self,
spinel_sink: SpinelSink,
spinel_stream: SpinelStream,
) -> Platform
where
SpinelSink: SpinelDeviceClient + 'static,
SpinelStream: Stream<Item = Result<Vec<u8>, anyhow::Error>> + 'static + Unpin + Send,
{
Platform::init(self, spinel_sink, spinel_stream)
}
}
pub struct Platform {
timer_receiver: fmpsc::Receiver<usize>,
rcp_to_ot_frame_ready_receiver: fmpsc::Receiver<()>,
nat64_platform_instance: Nat64PlatformInstance,
ot_to_rcp_task: fasync::Task<()>,
rcp_to_ot_task: fasync::Task<()>,
}
impl Platform {
#[must_use]
pub fn build() -> PlatformBuilder {
PlatformBuilder { thread_netif_index: None, backbone_netif_index: None }
}
fn init<SpinelSink, SpinelStream>(
builder: PlatformBuilder,
mut spinel_sink: SpinelSink,
mut spinel_stream: SpinelStream,
) -> Self
where
SpinelSink: SpinelDeviceClient + 'static,
SpinelStream: Stream<Item = Result<Vec<u8>, anyhow::Error>> + 'static + Unpin + Send,
{
let (alarm, timer_receiver) = backing::AlarmInstance::new();
let (ot_to_rcp_sender, ot_to_rcp_receiver) = mpsc::channel::<Vec<u8>>();
let ot_to_rcp_task = fasync::Task::spawn(async move {
spinel_sink.open().await.expect("Unable to open spinel stream");
loop {
trace!(tag = "ot_to_rcp_task", "waiting on frame from OpenThread");
let frame = match ot_to_rcp_receiver.recv() {
Ok(frame) => frame,
Err(e) => {
warn!(
tag = "ot_to_rcp_task",
"ot_to_rcp_receiver.recv() failed with {:?}", e
);
break;
}
};
trace!(tag = "ot_to_rcp_task", "sending frame from OpenThread to RCP");
if let Err(e) = spinel_sink.send(frame.as_slice()).await {
warn!(tag = "ot_to_rcp_task", "spinel_sink.send() failed with {:?}", e);
break;
}
}
});
let (mut rcp_to_ot_frame_ready_sender, rcp_to_ot_frame_ready_receiver) =
fmpsc::channel(FRAME_READY_BUFFER_LEN);
let (rcp_to_ot_sender, rcp_to_ot_receiver) = mpsc::channel::<Vec<u8>>();
let rcp_to_ot_task = fasync::Task::spawn(async move {
while let Some(frame_result) = spinel_stream.next().await {
match frame_result {
Ok(frame) => {
trace!(tag = "rcp_to_ot_task", "sending frame from RCP to OpenThread");
if let Err(e) = rcp_to_ot_sender.send(frame) {
warn!(
tag = "rcp_to_ot_task",
"rcp_to_ot_sender.send() failed with {:?}", e
);
break;
}
match rcp_to_ot_frame_ready_sender.try_send(()) {
Ok(()) => {}
Err(e) if e.is_full() => {}
Err(e) => {
warn!(
tag = "rcp_to_ot_task",
"rcp_to_ot_frame_ready_sender.send() failed with {:?}", e
);
break;
}
}
}
Err(e) => {
warn!(tag = "rcp_to_ot_task", "spinel_stream.next() failed with {:?}", e);
break;
}
}
}
trace!(tag = "rcp_to_ot_task", "Stream ended.");
});
let (nat64_prefix_req_sender, nat64_prefix_req_receiver) = fmpsc::unbounded();
unsafe {
PlatformBacking::set_singleton(PlatformBacking {
ot_to_rcp_sender: RefCell::new(ot_to_rcp_sender),
rcp_to_ot_receiver: RefCell::new(rcp_to_ot_receiver),
alarm,
netif_index_thread: builder.thread_netif_index,
netif_index_backbone: builder.backbone_netif_index,
trel: RefCell::new(None),
infra_if: InfraIfInstance::new(builder.backbone_netif_index.unwrap_or(0)),
is_platform_reset_requested: AtomicBool::new(false),
nat64: Nat64Instance::new(nat64_prefix_req_sender),
resolver: Resolver::new(),
});
otSysInit(&mut otPlatformConfig { reset_rcp: false } as *mut otPlatformConfig);
};
Platform {
timer_receiver,
rcp_to_ot_frame_ready_receiver,
ot_to_rcp_task,
rcp_to_ot_task,
nat64_platform_instance: Nat64PlatformInstance::new(nat64_prefix_req_receiver),
}
}
}
impl Drop for Platform {
fn drop(&mut self) {
debug!(tag = "openthread_fuchsia", "Dropping Platform");
unsafe {
otSysDeinit();
PlatformBacking::drop_singleton()
}
}
}
impl ot::Platform for Platform {
unsafe fn process_poll(
&mut self,
instance: &ot::Instance,
cx: &mut Context<'_>,
) -> Result<(), Error> {
self.process_poll_alarm(instance, cx);
self.process_poll_radio(instance, cx);
self.process_poll_udp(instance, cx);
self.process_poll_trel(instance, cx);
self.process_poll_infra_if(instance, cx);
self.process_poll_nat64(instance, cx);
self.process_poll_tasks(cx);
PlatformBacking::as_ref().resolver.process_poll_resolver(instance, cx);
if PlatformBacking::as_ref().is_platform_reset_requested.load(Ordering::SeqCst) {
return Err(PlatformResetRequested {}.into());
}
Ok(())
}
}
impl Platform {
fn process_poll_radio(&mut self, instance: &ot::Instance, cx: &mut Context<'_>) {
let instance_ptr = instance.as_ot_ptr();
while let Poll::Ready(Some(())) = self.rcp_to_ot_frame_ready_receiver.poll_next_unpin(cx) {
trace!(tag = "rcp", "Firing platformRadioProcess");
unsafe {
platformRadioProcess(instance_ptr, std::ptr::null());
}
}
}
fn process_poll_udp(&mut self, instance: &ot::Instance, cx: &mut Context<'_>) {
for udp_socket in instance.udp_get_sockets() {
if let Poll::Ready(Err(err)) = poll_ot_udp_socket(udp_socket, instance, cx) {
error!(tag = "udp", "Error in {:?}: {:?}", udp_socket, err);
}
}
}
fn process_poll_trel(&mut self, instance: &ot::Instance, cx: &mut Context<'_>) {
{
let mut trel = unsafe { PlatformBacking::as_ref() }.trel.borrow_mut();
if let Some(trel) = trel.as_mut() {
trel.poll(instance, cx);
}
}
{
let trel = unsafe { PlatformBacking::as_ref() }.trel.borrow();
if let Some(trel) = trel.as_ref() {
trel.poll_io(instance, cx);
}
}
}
fn process_poll_infra_if(&mut self, instance: &ot::Instance, cx: &mut Context<'_>) {
let infra_if = unsafe { PlatformBacking::as_ref() }.infra_if.as_ref();
if let Some(infra_if) = infra_if {
infra_if.poll(instance, cx);
}
}
fn process_poll_tasks(&mut self, cx: &mut Context<'_>) {
if Poll::Ready(()) == self.rcp_to_ot_task.poll_unpin(cx) {
panic!("Platform: rcp_to_ot_task finished unexpectedly");
}
if Poll::Ready(()) == self.ot_to_rcp_task.poll_unpin(cx) {
panic!("Platform: ot_to_rcp_task finished unexpectedly");
}
}
}