1#![warn(rust_2018_idioms)]
8
9mod backing;
10mod binding;
11pub mod logging;
12pub(crate) mod to_escaped_string;
13
14use futures::prelude::*;
15use openthread::prelude::*;
16
17use backing::*;
18use binding::*;
19
20use anyhow::Error;
21use fuchsia_async as fasync;
22use futures::channel::mpsc as fmpsc;
23use lowpan_driver_common::spinel::*;
24use std::cell::RefCell;
25use std::sync::atomic::{AtomicBool, Ordering};
26use std::sync::mpsc;
27use std::task::{Context, Poll};
28
29#[allow(unused_imports)]
30use log::{debug, error, info, trace, warn};
31
/// Capacity passed to the bounded "frame ready" notification channel that
/// `Platform::init` creates between the RCP reader task and the poll loop.
const FRAME_READY_BUFFER_LEN: usize = 2;

/// Upper bound, in bytes, on the size of a UDP packet.
///
/// NOTE(review): 1280 presumably mirrors the IPv6 minimum MTU; this constant
/// is not referenced in this file, so confirm the intent against its users.
const UDP_PACKET_MAX_LENGTH: usize = 1280;
38
/// Collects the optional settings used to construct a `Platform`.
///
/// Both indices start out unset when the builder comes from
/// `Platform::build()` or from `Default::default()`.
#[derive(Debug, Default)]
pub struct PlatformBuilder {
    /// Index of the Thread network interface, if one was configured.
    pub(crate) thread_netif_index: Option<u32>,
    /// Index of the backbone network interface, if one was configured.
    pub(crate) backbone_netif_index: Option<u32>,
}
44
45impl PlatformBuilder {
46 #[must_use]
47 pub fn thread_netif_index(mut self, index: u32) -> Self {
48 self.thread_netif_index = Some(index);
49 self
50 }
51
52 #[must_use]
53 pub fn backbone_netif_index(mut self, index: u32) -> Self {
54 self.backbone_netif_index = Some(index);
55 self
56 }
57
58 pub fn init<SpinelSink, SpinelStream>(
66 self,
67 spinel_sink: SpinelSink,
68 spinel_stream: SpinelStream,
69 ) -> Platform
70 where
71 SpinelSink: SpinelDeviceClient + 'static,
72 SpinelStream: Stream<Item = Result<Vec<u8>, anyhow::Error>> + 'static + Unpin + Send,
73 {
74 Platform::init(self, spinel_sink, spinel_stream)
75 }
76}
77
78pub struct Platform {
86 timer_receiver: fmpsc::Receiver<usize>,
87 rcp_to_ot_frame_ready_receiver: fmpsc::Receiver<()>,
88 nat64_platform_instance: Nat64PlatformInstance,
89 ot_to_rcp_task: fasync::Task<()>,
90 rcp_to_ot_task: fasync::Task<()>,
91}
92
93impl Platform {
94 #[must_use]
95 pub fn build() -> PlatformBuilder {
96 PlatformBuilder { thread_netif_index: None, backbone_netif_index: None }
97 }
98
99 fn init<SpinelSink, SpinelStream>(
101 builder: PlatformBuilder,
102 mut spinel_sink: SpinelSink,
103 mut spinel_stream: SpinelStream,
104 ) -> Self
105 where
106 SpinelSink: SpinelDeviceClient + 'static,
107 SpinelStream: Stream<Item = Result<Vec<u8>, anyhow::Error>> + 'static + Unpin + Send,
108 {
109 let (alarm, timer_receiver) = backing::AlarmInstance::new();
111 let (ot_to_rcp_sender, ot_to_rcp_receiver) = mpsc::channel::<Vec<u8>>();
112 let ot_to_rcp_task = fasync::Task::spawn(async move {
113 spinel_sink.open().await.expect("Unable to open spinel stream");
114 spinel_sink.wait_for_allowance().await;
116 loop {
117 trace!(tag = "ot_to_rcp_task"; "waiting on frame from OpenThread");
118
119 let frame = match ot_to_rcp_receiver.recv() {
120 Ok(frame) => frame,
121 Err(e) => {
122 warn!(
123 tag = "ot_to_rcp_task";
124 "ot_to_rcp_receiver.recv() failed with {:?}", e
125 );
126 break;
127 }
128 };
129
130 trace!(tag = "ot_to_rcp_task"; "sending frame from OpenThread to RCP");
131 if let Err(e) = spinel_sink.send(frame.as_slice()).await {
132 warn!(tag = "ot_to_rcp_task"; "spinel_sink.send() failed with {:?}", e);
133 break;
134 }
135 }
136 });
137
138 let (mut rcp_to_ot_frame_ready_sender, rcp_to_ot_frame_ready_receiver) =
140 fmpsc::channel(FRAME_READY_BUFFER_LEN);
141 let (rcp_to_ot_sender, rcp_to_ot_receiver) = mpsc::channel::<Vec<u8>>();
142 let rcp_to_ot_task = fasync::Task::spawn(async move {
143 while let Some(frame_result) = spinel_stream.next().await {
144 match frame_result {
145 Ok(frame) => {
146 trace!(tag = "rcp_to_ot_task"; "sending frame from RCP to OpenThread");
147
148 if let Err(e) = rcp_to_ot_sender.send(frame) {
149 warn!(
150 tag = "rcp_to_ot_task";
151 "rcp_to_ot_sender.send() failed with {:?}", e
152 );
153 break;
154 }
155
156 match rcp_to_ot_frame_ready_sender.try_send(()) {
158 Ok(()) => {}
159 Err(e) if e.is_full() => {}
160 Err(e) => {
161 warn!(
162 tag = "rcp_to_ot_task";
163 "rcp_to_ot_frame_ready_sender.send() failed with {:?}", e
164 );
165 break;
166 }
167 }
168 }
169 Err(e) => {
170 warn!(tag = "rcp_to_ot_task"; "spinel_stream.next() failed with {:?}", e);
171 break;
172 }
173 }
174 }
175 trace!(tag = "rcp_to_ot_task"; "Stream ended.");
176 });
177
178 let (nat64_prefix_req_sender, nat64_prefix_req_receiver) = fmpsc::unbounded();
179
180 unsafe {
181 PlatformBacking::set_singleton(PlatformBacking {
183 ot_to_rcp_sender: RefCell::new(ot_to_rcp_sender),
184 rcp_to_ot_receiver: RefCell::new(rcp_to_ot_receiver),
185 alarm,
186 netif_index_thread: builder.thread_netif_index,
187 netif_index_backbone: builder.backbone_netif_index,
188 trel: RefCell::new(None),
189 infra_if: InfraIfInstance::new(builder.backbone_netif_index.unwrap_or(0)),
190 is_platform_reset_requested: AtomicBool::new(false),
191 nat64: Nat64Instance::new(nat64_prefix_req_sender),
192 resolver: Resolver::new(),
193 });
194
195 otSysInit(&mut otPlatformConfig { reset_rcp: false } as *mut otPlatformConfig);
197 };
198
199 Platform {
200 timer_receiver,
201 rcp_to_ot_frame_ready_receiver,
202 ot_to_rcp_task,
203 rcp_to_ot_task,
204 nat64_platform_instance: Nat64PlatformInstance::new(nat64_prefix_req_receiver),
205 }
206 }
207}
208
209impl Drop for Platform {
210 fn drop(&mut self) {
211 debug!(tag = "openthread_fuchsia"; "Dropping Platform");
212 unsafe {
213 otSysDeinit();
215 PlatformBacking::drop_singleton()
216 }
217 }
218}
219
220impl ot::Platform for Platform {
221 unsafe fn process_poll(
222 &mut self,
223 instance: &ot::Instance,
224 cx: &mut Context<'_>,
225 ) -> Result<(), Error> {
226 self.process_poll_alarm(instance, cx);
227 self.process_poll_radio(instance, cx);
228 self.process_poll_udp(instance, cx);
229 self.process_poll_trel(instance, cx);
230 self.process_poll_infra_if(instance, cx);
231 self.process_poll_nat64(instance, cx);
232 self.process_poll_tasks(cx);
233 unsafe { PlatformBacking::as_ref() }.resolver.process_poll_resolver(instance, cx);
234 if unsafe { PlatformBacking::as_ref() }.is_platform_reset_requested.load(Ordering::SeqCst) {
235 return Err(PlatformResetRequested {}.into());
236 }
237 Ok(())
238 }
239}
240
241impl Platform {
242 fn process_poll_radio(&mut self, instance: &ot::Instance, cx: &mut Context<'_>) {
243 let instance_ptr = instance.as_ot_ptr();
244
245 while let Poll::Ready(Some(())) = self.rcp_to_ot_frame_ready_receiver.poll_next_unpin(cx) {
246 trace!(tag = "rcp"; "Firing platformRadioProcess");
247
248 unsafe {
252 platformSpinelManagerProcess(instance_ptr, std::ptr::null());
253 platformRadioProcess(instance_ptr, std::ptr::null());
254 }
255 }
256 }
257
258 fn process_poll_udp(&mut self, instance: &ot::Instance, cx: &mut Context<'_>) {
259 for udp_socket in instance.udp_get_sockets() {
260 if let Poll::Ready(Err(err)) = poll_ot_udp_socket(udp_socket, instance, cx) {
262 error!(tag = "udp"; "Error in {:?}: {:?}", udp_socket, err);
263 }
264 }
265 }
266
267 fn process_poll_trel(&mut self, instance: &ot::Instance, cx: &mut Context<'_>) {
268 {
271 let mut trel = unsafe { PlatformBacking::as_ref() }.trel.borrow_mut();
273 if let Some(trel) = trel.as_mut() {
274 trel.poll(instance, cx);
275 }
276 }
277
278 {
281 let trel = unsafe { PlatformBacking::as_ref() }.trel.borrow();
283 if let Some(trel) = trel.as_ref() {
284 trel.poll_io(instance, cx);
285 }
286 }
287 }
288
289 fn process_poll_infra_if(&mut self, instance: &ot::Instance, cx: &mut Context<'_>) {
290 let infra_if = unsafe { PlatformBacking::as_ref() }.infra_if.as_ref();
292 if let Some(infra_if) = infra_if {
293 infra_if.poll(instance, cx);
294 }
295 }
296
297 fn process_poll_tasks(&mut self, cx: &mut Context<'_>) {
298 if Poll::Ready(()) == self.rcp_to_ot_task.poll_unpin(cx) {
299 panic!("Platform: rcp_to_ot_task finished unexpectedly");
300 }
301
302 if Poll::Ready(()) == self.ot_to_rcp_task.poll_unpin(cx) {
303 panic!("Platform: ot_to_rcp_task finished unexpectedly");
304 }
305 }
306}