Skip to main content

openthread_fuchsia/
lib.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! This crate contains the rust OpenThread platform implementation for Fuchsia.
6
7#![warn(rust_2018_idioms)]
8
9mod backing;
10mod binding;
11pub mod logging;
12pub(crate) mod to_escaped_string;
13
14use futures::prelude::*;
15use openthread::prelude::*;
16
17use backing::*;
18use binding::*;
19
20use anyhow::Error;
21use fuchsia_async as fasync;
22use futures::channel::mpsc as fmpsc;
23use lowpan_driver_common::spinel::*;
24use std::cell::RefCell;
25use std::sync::atomic::{AtomicBool, Ordering};
26use std::sync::mpsc;
27use std::task::{Context, Poll};
28
29#[allow(unused_imports)]
30use log::{debug, error, info, trace, warn};
31
// Capacity of the frame-ready channel.
// A small value is sufficient here because the channel
// only serves as a wake-up flag, not as a data queue.
const FRAME_READY_BUFFER_LEN: usize = 2;

// Maximum UDP packet length.
// NOTE(review): not referenced in this file; presumably consumed by a submodule — confirm.
const UDP_PACKET_MAX_LENGTH: usize = 1280;
38
/// Builder type for the OpenThread platform. Create using [`Platform::build()`].
///
/// Both settings are optional. A setting that is never assigned stays `None`,
/// which is also the state produced by `PlatformBuilder::default()`.
#[derive(Debug, Default)]
pub struct PlatformBuilder {
    /// Index of the Thread network interface, if one has been provided.
    pub(crate) thread_netif_index: Option<u32>,
    /// Index of the backbone network interface, if one has been provided.
    pub(crate) backbone_netif_index: Option<u32>,
}
44
45impl PlatformBuilder {
46    #[must_use]
47    pub fn thread_netif_index(mut self, index: u32) -> Self {
48        self.thread_netif_index = Some(index);
49        self
50    }
51
52    #[must_use]
53    pub fn backbone_netif_index(mut self, index: u32) -> Self {
54        self.backbone_netif_index = Some(index);
55        self
56    }
57
58    /// Initializes the OpenThread platform.
59    ///
60    /// The instance returned by this method must be passed to
61    /// [`ot::Instance::new()`](openthread_rust::ot::Instance::new).
62    ///
63    /// The returned object is a singleton. Attempting to have more than one instance
64    /// around at a time will cause a panic.
65    pub fn init<SpinelSink, SpinelStream>(
66        self,
67        spinel_sink: SpinelSink,
68        spinel_stream: SpinelStream,
69    ) -> Platform
70    where
71        SpinelSink: SpinelDeviceClient + 'static,
72        SpinelStream: Stream<Item = Result<Vec<u8>, anyhow::Error>> + 'static + Unpin + Send,
73    {
74        Platform::init(self, spinel_sink, spinel_stream)
75    }
76}
77
/// OpenThread Singleton Platform Implementation.
///
/// An instance of this type must be passed to
/// [`ot::Instance::new()`](openthread_rust::ot::Instance::new).
///
/// This type is a singleton. Attempting to init more than a
/// single instance of `Platform` at a time will cause a panic.
pub struct Platform {
    /// Receiving half of the channel returned by `backing::AlarmInstance::new()`.
    timer_receiver: fmpsc::Receiver<usize>,
    /// Wake-up notifications sent by `rcp_to_ot_task` each time it has queued a
    /// frame; drained by `process_poll_radio`.
    rcp_to_ot_frame_ready_receiver: fmpsc::Receiver<()>,
    /// Built from the receiving half of the NAT64 prefix request channel whose
    /// sending half is handed to the `PlatformBacking` singleton.
    nat64_platform_instance: Nat64PlatformInstance,
    /// Task that forwards frames produced by OpenThread to the spinel sink.
    /// `process_poll_tasks` panics if this task ever completes.
    ot_to_rcp_task: fasync::Task<()>,
    /// Task that forwards frames read from the spinel stream to OpenThread.
    /// `process_poll_tasks` panics if this task ever completes.
    rcp_to_ot_task: fasync::Task<()>,
}
92
impl Platform {
    /// Returns a [`PlatformBuilder`] with neither network interface index set.
    #[must_use]
    pub fn build() -> PlatformBuilder {
        PlatformBuilder { thread_netif_index: None, backbone_netif_index: None }
    }

    /// This method is called by `PlatformBuilder::init`.
    ///
    /// It performs the following steps, in order:
    ///
    /// 1. Creates the alarm instance and its timer receiver.
    /// 2. Spawns `ot_to_rcp_task`, which opens `spinel_sink`, waits for the
    ///    initial allowance, and then forwards every frame received from
    ///    OpenThread to the sink.
    /// 3. Spawns `rcp_to_ot_task`, which forwards every frame yielded by
    ///    `spinel_stream` to OpenThread and signals the frame-ready channel.
    /// 4. Installs the `PlatformBacking` singleton using the settings from
    ///    `builder`.
    /// 5. Calls `otSysInit` with `reset_rcp` set to `false`.
    ///
    /// Both tasks stop (after logging a warning) when their channel or the
    /// spinel endpoint reports an error. `ot_to_rcp_task` panics if
    /// `spinel_sink.open()` fails.
    fn init<SpinelSink, SpinelStream>(
        builder: PlatformBuilder,
        mut spinel_sink: SpinelSink,
        mut spinel_stream: SpinelStream,
    ) -> Self
    where
        SpinelSink: SpinelDeviceClient + 'static,
        SpinelStream: Stream<Item = Result<Vec<u8>, anyhow::Error>> + 'static + Unpin + Send,
    {
        // OpenThread to RCP data-pump and related machinery.
        let (alarm, timer_receiver) = backing::AlarmInstance::new();
        let (ot_to_rcp_sender, ot_to_rcp_receiver) = mpsc::channel::<Vec<u8>>();
        let ot_to_rcp_task = fasync::Task::spawn(async move {
            spinel_sink.open().await.expect("Unable to open spinel stream");
            // Wait for initial allowance before sending any frames
            spinel_sink.wait_for_allowance().await;
            loop {
                trace!(tag = "ot_to_rcp_task"; "waiting on frame from OpenThread");

                // NOTE(review): `mpsc` here is `std::sync::mpsc`, whose `recv()` blocks
                // the calling thread instead of yielding to the executor. Confirm that
                // blocking inside this async task is intended.
                let frame = match ot_to_rcp_receiver.recv() {
                    Ok(frame) => frame,
                    Err(e) => {
                        warn!(
                            tag = "ot_to_rcp_task";
                            "ot_to_rcp_receiver.recv() failed with {:?}", e
                        );
                        break;
                    }
                };

                trace!(tag = "ot_to_rcp_task"; "sending frame from OpenThread to RCP");
                if let Err(e) = spinel_sink.send(frame.as_slice()).await {
                    warn!(tag = "ot_to_rcp_task"; "spinel_sink.send() failed with {:?}", e);
                    break;
                }
            }
        });

        // RCP to OpenThread data-pump and related machinery.
        let (mut rcp_to_ot_frame_ready_sender, rcp_to_ot_frame_ready_receiver) =
            fmpsc::channel(FRAME_READY_BUFFER_LEN);
        let (rcp_to_ot_sender, rcp_to_ot_receiver) = mpsc::channel::<Vec<u8>>();
        let rcp_to_ot_task = fasync::Task::spawn(async move {
            while let Some(frame_result) = spinel_stream.next().await {
                match frame_result {
                    Ok(frame) => {
                        trace!(tag = "rcp_to_ot_task"; "sending frame from RCP to OpenThread");

                        if let Err(e) = rcp_to_ot_sender.send(frame) {
                            warn!(
                                tag = "rcp_to_ot_task";
                                "rcp_to_ot_sender.send() failed with {:?}", e
                            );
                            break;
                        }

                        // Notify our `process_poll` that it needs to call `platformRadioProcess`.
                        // A full channel is not an error: the channel is only a wake-up flag,
                        // so a notification is already pending in that case.
                        match rcp_to_ot_frame_ready_sender.try_send(()) {
                            Ok(()) => {}
                            Err(e) if e.is_full() => {}
                            Err(e) => {
                                warn!(
                                    tag = "rcp_to_ot_task";
                                    "rcp_to_ot_frame_ready_sender.send() failed with {:?}", e
                                );
                                break;
                            }
                        }
                    }
                    Err(e) => {
                        warn!(tag = "rcp_to_ot_task"; "spinel_stream.next() failed with {:?}", e);
                        break;
                    }
                }
            }
            trace!(tag = "rcp_to_ot_task"; "Stream ended.");
        });

        // Channel for NAT64 prefix requests: the sender goes to the backing
        // singleton, the receiver to the returned `Platform`.
        let (nat64_prefix_req_sender, nat64_prefix_req_receiver) = fmpsc::unbounded();

        // NOTE(review): no safety justification is recorded for this block. The type
        // docs state that `Platform` is a singleton, which is presumably the
        // precondition of `set_singleton` — confirm.
        unsafe {
            // Initialize our singleton
            PlatformBacking::set_singleton(PlatformBacking {
                ot_to_rcp_sender: RefCell::new(ot_to_rcp_sender),
                rcp_to_ot_receiver: RefCell::new(rcp_to_ot_receiver),
                alarm,
                netif_index_thread: builder.thread_netif_index,
                netif_index_backbone: builder.backbone_netif_index,
                trel: RefCell::new(None),
                // An unset backbone index is passed on as 0.
                infra_if: InfraIfInstance::new(builder.backbone_netif_index.unwrap_or(0)),
                is_platform_reset_requested: AtomicBool::new(false),
                nat64: Nat64Instance::new(nat64_prefix_req_sender),
                resolver: Resolver::new(),
            });

            // Initialize the lower-level platform implementation
            otSysInit(&mut otPlatformConfig { reset_rcp: false } as *mut otPlatformConfig);
        };

        Platform {
            timer_receiver,
            rcp_to_ot_frame_ready_receiver,
            ot_to_rcp_task,
            rcp_to_ot_task,
            nat64_platform_instance: Nat64PlatformInstance::new(nat64_prefix_req_receiver),
        }
    }
}
208
209impl Drop for Platform {
210    fn drop(&mut self) {
211        debug!(tag = "openthread_fuchsia"; "Dropping Platform");
212        unsafe {
213            // SAFETY: Both calls below must only be called from Drop.
214            otSysDeinit();
215            PlatformBacking::drop_singleton()
216        }
217    }
218}
219
220impl ot::Platform for Platform {
221    unsafe fn process_poll(
222        &mut self,
223        instance: &ot::Instance,
224        cx: &mut Context<'_>,
225    ) -> Result<(), Error> {
226        self.process_poll_alarm(instance, cx);
227        self.process_poll_radio(instance, cx);
228        self.process_poll_udp(instance, cx);
229        self.process_poll_trel(instance, cx);
230        self.process_poll_infra_if(instance, cx);
231        self.process_poll_nat64(instance, cx);
232        self.process_poll_tasks(cx);
233        unsafe { PlatformBacking::as_ref() }.resolver.process_poll_resolver(instance, cx);
234        if unsafe { PlatformBacking::as_ref() }.is_platform_reset_requested.load(Ordering::SeqCst) {
235            return Err(PlatformResetRequested {}.into());
236        }
237        Ok(())
238    }
239}
240
241impl Platform {
242    fn process_poll_radio(&mut self, instance: &ot::Instance, cx: &mut Context<'_>) {
243        let instance_ptr = instance.as_ot_ptr();
244
245        while let Poll::Ready(Some(())) = self.rcp_to_ot_frame_ready_receiver.poll_next_unpin(cx) {
246            trace!(tag = "rcp"; "Firing platformRadioProcess");
247
248            // SAFETY: Must be called with a valid pointer to otInstance,
249            //         must also only be called from the main OpenThread thread,
250            //         which is a guarantee of this method.
251            unsafe {
252                platformSpinelManagerProcess(instance_ptr, std::ptr::null());
253                platformRadioProcess(instance_ptr, std::ptr::null());
254            }
255        }
256    }
257
258    fn process_poll_udp(&mut self, instance: &ot::Instance, cx: &mut Context<'_>) {
259        for udp_socket in instance.udp_get_sockets() {
260            // This `poll` call comes from the trait `UdpSocketHelpers` in `backing/udp.rs`
261            if let Poll::Ready(Err(err)) = poll_ot_udp_socket(udp_socket, instance, cx) {
262                error!(tag = "udp"; "Error in {:?}: {:?}", udp_socket, err);
263            }
264        }
265    }
266
267    fn process_poll_trel(&mut self, instance: &ot::Instance, cx: &mut Context<'_>) {
268        // Poll for discovery. This must be `mut` because it calls poll methods internally,
269        // and is not reentrant.
270        {
271            // SAFETY: Guaranteed to only be called from the OpenThread thread.
272            let mut trel = unsafe { PlatformBacking::as_ref() }.trel.borrow_mut();
273            if let Some(trel) = trel.as_mut() {
274                trel.poll(instance, cx);
275            }
276        }
277
278        // Poll for I/O. This is separate from the above poll because it must not be
279        // `mut` so that it can be reentrant.
280        {
281            // SAFETY: Guaranteed to only be called from the OpenThread thread.
282            let trel = unsafe { PlatformBacking::as_ref() }.trel.borrow();
283            if let Some(trel) = trel.as_ref() {
284                trel.poll_io(instance, cx);
285            }
286        }
287    }
288
289    fn process_poll_infra_if(&mut self, instance: &ot::Instance, cx: &mut Context<'_>) {
290        // SAFETY: Guaranteed to only be called from the OpenThread thread.
291        let infra_if = unsafe { PlatformBacking::as_ref() }.infra_if.as_ref();
292        if let Some(infra_if) = infra_if {
293            infra_if.poll(instance, cx);
294        }
295    }
296
297    fn process_poll_tasks(&mut self, cx: &mut Context<'_>) {
298        if Poll::Ready(()) == self.rcp_to_ot_task.poll_unpin(cx) {
299            panic!("Platform: rcp_to_ot_task finished unexpectedly");
300        }
301
302        if Poll::Ready(()) == self.ot_to_rcp_task.poll_unpin(cx) {
303            panic!("Platform: ot_to_rcp_task finished unexpectedly");
304        }
305    }
306}