openthread_fuchsia/backing/
trel.rs

1// Copyright 2022 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::*;
6use crate::to_escaped_string::*;
7use anyhow::Context as _;
8use fidl::endpoints::create_endpoints;
9use fidl_fuchsia_net_mdns::*;
10use fuchsia_async::Task;
11use futures::stream::FusedStream;
12use openthread_sys::*;
13use ot::{PlatTrel as _, TrelCounters};
14use std::collections::HashMap;
15use std::net::{Ipv6Addr, SocketAddr, SocketAddrV6};
16use std::task::{Context, Poll};
17
18pub(crate) struct TrelInstance {
19    socket: fasync::net::UdpSocket,
20    publication_responder: Option<Task<Result<(), anyhow::Error>>>,
21    instance_name: String,
22    peer_instance_sockaddr_map: HashMap<String, ot::SockAddr>,
23
24    #[allow(dead_code)] // This field must be kept around for <https://fxbug.dev/42182233>
25    subscriber: ServiceSubscriber2Proxy,
26
27    subscriber_request_stream: ServiceSubscriptionListenerRequestStream,
28
29    counters: RefCell<TrelCounters>,
30}
31
32// Converts an optional vector of strings to a single DNS-compatible string.
33fn flatten_txt(txt: Option<Vec<Vec<u8>>>) -> Vec<u8> {
34    let mut ret = vec![];
35
36    for mut txt in txt.iter().flat_map(|x| x.iter()).map(Vec::as_slice) {
37        if txt.len() > u8::MAX as usize {
38            // Limit the size of the records to 255 characters.
39            txt = &txt[0..(u8::MAX as usize) + 1];
40        }
41        ret.push(u8::try_from(txt.len()).unwrap());
42        ret.extend_from_slice(txt);
43    }
44
45    ret
46}
47
48/// Converts an iterator over [`fidl_fuchsia_net::SocketAddress`]es to a vector of
49/// [`ot::Ip6Address`]es and a port.
50fn process_addresses_from_socket_addresses<
51    T: IntoIterator<Item = fidl_fuchsia_net::SocketAddress>,
52>(
53    addresses: T,
54) -> (Vec<ot::Ip6Address>, Option<u16>) {
55    let mut ret_port: Option<u16> = None;
56    let mut addresses =
57        addresses
58            .into_iter()
59            .flat_map(|x| {
60                if let fidl_fuchsia_net::SocketAddress::Ipv6(
61                    fidl_fuchsia_net::Ipv6SocketAddress { address, port, .. },
62                ) = x
63                {
64                    let addr = ot::Ip6Address::from(address.addr);
65                    if ret_port.is_none() {
66                        ret_port = Some(port);
67                    } else if ret_port != Some(port) {
68                        warn!(
69                            tag = "trel";
70                            "mDNS service has multiple ports for the same service, {:?} != {:?}",
71                            ret_port.unwrap(),
72                            port
73                        );
74                    }
75                    // Require link-local. Thread requires TREL peers to advertise link-local via mDNS.
76                    if ipv6addr_is_unicast_link_local(&addr) {
77                        return Some(addr);
78                    }
79                }
80                None
81            })
82            .collect::<Vec<_>>();
83    addresses.sort();
84    (addresses, ret_port)
85}
86
87/// Returns `true` if the address is a unicast address with link-local scope.
88///
89/// The official equivalent of this method is [`std::net::Ipv6Addr::is_unicast_link_local()`],
90/// however that method is [still experimental](https://github.com/rust-lang/rust/issues/27709).
91fn ipv6addr_is_unicast_link_local(addr: &std::net::Ipv6Addr) -> bool {
92    (addr.segments()[0] & 0xffc0) == 0xfe80
93}
94
95// Splits the TXT record into individual values.
96fn split_txt(txt: &[u8]) -> Vec<Vec<u8>> {
97    info!(tag = "trel"; "trel:split_txt: Splitting TXT record: {:?}", hex::encode(txt));
98    let txt =
99        ot::DnsTxtEntryIterator::try_new(txt).expect("can't parse TXT records from OpenThread");
100    txt.map(|x| x.expect("can't parse TXT records from OpenThread").to_vec()).collect::<Vec<_>>()
101}
102
103impl TrelInstance {
104    fn new(instance_name: String) -> Result<TrelInstance, anyhow::Error> {
105        let (client, server) = create_endpoints::<ServiceSubscriptionListenerMarker>();
106
107        let subscriber =
108            fuchsia_component::client::connect_to_protocol::<ServiceSubscriber2Marker>().unwrap();
109
110        subscriber
111            .subscribe_to_service(
112                ot::TREL_DNSSD_SERVICE_NAME_WITH_DOT,
113                &ServiceSubscriptionOptions { exclude_local: Some(true), ..Default::default() },
114                client,
115            )
116            .context("Unable to subscribe to TREL services")?;
117
118        Ok(TrelInstance {
119            socket: fasync::net::UdpSocket::bind(&SocketAddr::V6(SocketAddrV6::new(
120                Ipv6Addr::UNSPECIFIED,
121                0,
122                0,
123                0,
124            )))
125            .context("Unable to open TREL UDP socket")?,
126            publication_responder: None,
127            instance_name,
128            peer_instance_sockaddr_map: HashMap::default(),
129            subscriber,
130            subscriber_request_stream: server.into_stream(),
131            counters: RefCell::new(TrelCounters::default()),
132        })
133    }
134
135    fn port(&self) -> u16 {
136        self.socket.local_addr().unwrap().port()
137    }
138
139    fn register_service(&mut self, port: u16, txt: &[u8]) {
140        let txt = split_txt(txt);
141
142        let (client, server) = create_endpoints::<ServiceInstancePublicationResponder_Marker>();
143
144        let publisher =
145            fuchsia_component::client::connect_to_protocol::<ServiceInstancePublisherMarker>()
146                .unwrap();
147
148        let publish_init_future = publisher
149            .publish_service_instance(
150                ot::TREL_DNSSD_SERVICE_NAME_WITH_DOT,
151                self.instance_name.as_str(),
152                &ServiceInstancePublicationOptions::default(),
153                client,
154            )
155            .map(|x| -> Result<(), anyhow::Error> {
156                match x {
157                    Ok(Ok(x)) => Ok(x),
158                    Ok(Err(err)) => Err(anyhow::format_err!("{:?}", err)),
159                    Err(zx_err) => Err(zx_err.into()),
160                }
161            });
162
163        let publish_responder_future = server.into_stream().map_err(Into::into).try_for_each(
164            move |ServiceInstancePublicationResponder_Request::OnPublication {
165                      responder, ..
166                  }| {
167                let txt = txt.clone();
168                let _publisher = publisher.clone();
169                async move {
170                    responder
171                        .send(Ok(&ServiceInstancePublication {
172                            port: Some(port),
173                            text: Some(txt),
174                            ..Default::default()
175                        }))
176                        .map_err(Into::into)
177                }
178            },
179        );
180
181        let future =
182            futures::future::try_join(publish_init_future, publish_responder_future).map_ok(|_| ());
183
184        self.publication_responder = Some(fuchsia_async::Task::spawn(future));
185    }
186
187    pub fn handle_service_subscriber_request(
188        &mut self,
189        ot_instance: &ot::Instance,
190        service_subscriber_request: ServiceSubscriptionListenerRequest,
191    ) -> Result<(), anyhow::Error> {
192        match service_subscriber_request {
193            // A DNS-SD IPv6 service instance has been discovered.
194            ServiceSubscriptionListenerRequest::OnInstanceDiscovered {
195                instance:
196                    ServiceInstance {
197                        instance: Some(instance_name),
198                        addresses: Some(addresses),
199                        text_strings,
200                        ..
201                    },
202                responder,
203            } => {
204                let txt = flatten_txt(text_strings);
205
206                let (addresses, port) = process_addresses_from_socket_addresses(addresses);
207
208                info!(
209                    tag = "trel";
210                    "ServiceSubscriptionListenerRequest::OnInstanceDiscovered: [PII]({instance_name:?}) port:{port:?} addresses:{addresses:?}"
211                );
212
213                // Pick the smallest link-local to be robust to reorderings.
214                if let Some(address) = addresses.first() {
215                    let sockaddr = ot::SockAddr::new(*address, port.unwrap());
216
217                    self.peer_instance_sockaddr_map.insert(instance_name, sockaddr);
218
219                    let info = ot::PlatTrelPeerInfo::new(false, &txt, sockaddr);
220                    info!(tag = "trel"; "otPlatTrelHandleDiscoveredPeerInfo: Adding {:?}", info);
221                    ot_instance.plat_trel_handle_discovered_peer_info(&info);
222                } else {
223                    warn!(
224                        tag = "trel";
225                        "Peer {instance_name:?} does not have any IPv6 link-local address, ignored"
226                    );
227                }
228
229                responder.send().context("Unable to respond to OnInstanceDiscovered")?;
230            }
231
232            // A DNS-SD IPv6 service instance has changed.
233            ServiceSubscriptionListenerRequest::OnInstanceChanged {
234                instance:
235                    ServiceInstance {
236                        instance: Some(instance_name),
237                        addresses: Some(addresses),
238                        text_strings,
239                        ..
240                    },
241                responder,
242            } => {
243                let txt = flatten_txt(text_strings);
244                let (addresses, port) = process_addresses_from_socket_addresses(addresses);
245
246                info!(
247                    tag = "trel";
248                    "ServiceSubscriptionListenerRequest::OnInstanceChanged: [PII]({instance_name:?}) port:{port:?} addresses:{addresses:?}"
249                );
250
251                // Pick the smallest link-local to be robust to reorderings.
252                if let Some(address) = addresses.first() {
253                    let sockaddr = ot::SockAddr::new(*address, port.unwrap());
254
255                    if let Some(old_sockaddr) =
256                        self.peer_instance_sockaddr_map.insert(instance_name, sockaddr)
257                    {
258                        if old_sockaddr != sockaddr {
259                            // Remove old sockaddr with the same instance name
260                            let info_old = ot::PlatTrelPeerInfo::new(true, &[], old_sockaddr);
261                            info!(
262                                tag = "trel";
263                                "otPlatTrelHandleDiscoveredPeerInfo: Removing {:?}", info_old
264                            );
265                            ot_instance.plat_trel_handle_discovered_peer_info(&info_old);
266                        }
267
268                        let info = ot::PlatTrelPeerInfo::new(false, &txt, sockaddr);
269                        info!(
270                            tag = "trel";
271                            "otPlatTrelHandleDiscoveredPeerInfo: Updating {:?}", info
272                        );
273                        ot_instance.plat_trel_handle_discovered_peer_info(&info);
274                    }
275                } else {
276                    warn!(
277                        tag = "trel";
278                        "Peer {instance_name:?} does not have any IPv6 link-local address, ignored"
279                    );
280                }
281
282                responder.send().context("Unable to respond to OnInstanceChanged")?;
283            }
284
285            // A DNS-SD IPv6 service instance has been lost.
286            ServiceSubscriptionListenerRequest::OnInstanceLost { instance, responder, .. } => {
287                info!(
288                    tag = "trel";
289                    "ServiceSubscriptionListenerRequest::OnInstanceLost [PII]({instance:?})"
290                );
291                if let Some(sockaddr) = self.peer_instance_sockaddr_map.remove(&instance) {
292                    let info = ot::PlatTrelPeerInfo::new(true, &[], sockaddr);
293                    info!(tag = "trel"; "otPlatTrelHandleDiscoveredPeerInfo: Removing {:?}", info);
294                    ot_instance.plat_trel_handle_discovered_peer_info(&info);
295                }
296
297                responder.send().context("Unable to respond to OnInstanceLost")?;
298            }
299
300            ServiceSubscriptionListenerRequest::OnInstanceChanged { instance, responder } => {
301                warn!(
302                    tag = "trel";
303                    "ServiceSubscriptionListenerRequest::OnInstanceChanged: [PII]({instance:?})"
304                );
305                // Skip changes without an IPv6 address.
306                responder.send().context("Unable to respond to OnInstanceChanged")?;
307            }
308
309            ServiceSubscriptionListenerRequest::OnInstanceDiscovered {
310                instance,
311                responder,
312                ..
313            } => {
314                warn!(
315                    tag = "trel";
316                    "ServiceSubscriptionListenerRequest::OnInstanceDiscovered: [PII]({instance:?})"
317                );
318                // Skip discoveries without an IPv6 address.
319                responder.send().context("Unable to respond to OnInstanceDiscovered")?;
320            }
321
322            ServiceSubscriptionListenerRequest::OnQuery { resource_type, responder, .. } => {
323                info!(
324                    tag = "trel";
325                    "ServiceSubscriptionListenerRequest::OnQuery: {resource_type:?}"
326                );
327
328                // We don't care about queries.
329                responder.send().context("Unable to respond to OnQuery")?;
330            }
331        }
332        Ok(())
333    }
334
335    pub fn get_trel_counters(&self) -> *const otPlatTrelCounters {
336        self.counters.borrow().as_ot_ptr()
337    }
338
339    pub fn reset_trel_counters(&self) {
340        self.counters.borrow_mut().reset_counters()
341    }
342
343    /// Async entrypoint for I/O.
344    ///
345    /// This is explicitly not `mut` so that `on_trel_send` can be called reentrantly from here.
346    pub fn poll_io(&self, instance: &ot::Instance, cx: &mut Context<'_>) {
347        let mut buffer = [0u8; crate::UDP_PACKET_MAX_LENGTH];
348        loop {
349            match self.socket.async_recv_from(&mut buffer, cx) {
350                Poll::Ready(Ok((len, sockaddr))) => {
351                    let sockaddr: ot::SockAddr = sockaddr.as_socket_ipv6().unwrap().into();
352                    debug!(tag = "trel"; "Incoming {} byte TREL packet from {:?}", len, sockaddr);
353                    {
354                        let mut counters = self.counters.borrow_mut();
355                        counters.update_rx_bytes(len.try_into().unwrap());
356                        counters.update_rx_packets(1);
357                    }
358                    instance.plat_trel_handle_received(&buffer[..len], &sockaddr)
359                }
360                Poll::Ready(Err(err)) => {
361                    warn!(tag = "trel"; "Error receiving packet: {:?}", err);
362                    break;
363                }
364                _ => {
365                    break;
366                }
367            }
368        }
369    }
370
371    /// Async entrypoint for non-I/O
372    pub fn poll(&mut self, instance: &ot::Instance, cx: &mut Context<'_>) {
373        if let Some(task) = &mut self.publication_responder {
374            if let Poll::Ready(x) = task.poll_unpin(cx) {
375                warn!(
376                    tag = "trel";
377                    "TrelInstance: publication_responder finished unexpectedly: {:?}", x
378                );
379                self.publication_responder = None;
380            }
381        }
382
383        if !self.subscriber_request_stream.is_terminated() {
384            while let Poll::Ready(Some(event)) = self.subscriber_request_stream.poll_next_unpin(cx)
385            {
386                match event {
387                    Ok(event) => {
388                        if let Err(err) = self.handle_service_subscriber_request(instance, event) {
389                            error!(
390                                tag = "trel";
391                                "Error handling service subscriber request: {err:?}"
392                            );
393                        }
394                    }
395                    Err(err) => {
396                        error!(tag = "trel"; "subscriber_request_stream FIDL error: {:?}", err);
397                    }
398                }
399            }
400        }
401    }
402}
403
404impl PlatformBacking {
405    fn on_trel_enable(&self, instance: &ot::Instance) -> Result<u16, anyhow::Error> {
406        let mut trel = self.trel.borrow_mut();
407        if let Some(trel) = trel.as_ref() {
408            Ok(trel.port())
409        } else {
410            let instance_name = hex::encode(instance.get_extended_address().as_slice());
411            let trel_instance = TrelInstance::new(instance_name)?;
412            let port = trel_instance.port();
413            trel.replace(trel_instance);
414            Ok(port)
415        }
416    }
417
418    fn on_trel_disable(&self, _instance: &ot::Instance) {
419        self.trel.replace(None);
420    }
421
422    fn on_trel_register_service(&self, _instance: &ot::Instance, port: u16, txt: &[u8]) {
423        let mut trel = self.trel.borrow_mut();
424        if let Some(trel) = trel.as_mut() {
425            info!(
426                tag = "trel";
427                "otPlatTrelRegisterService: port:{} txt:{:?}",
428                port,
429                txt.to_escaped_string()
430            );
431            trel.register_service(port, txt);
432        } else {
433            debug!(tag = "trel"; "otPlatTrelRegisterService: TREL is disabled, cannot register.");
434        }
435    }
436
437    fn on_trel_send(&self, _instance: &ot::Instance, payload: &[u8], sockaddr: &ot::SockAddr) {
438        let trel = self.trel.borrow();
439        if let Some(trel) = trel.as_ref() {
440            let mut counters = trel.counters.borrow_mut();
441            debug!(tag = "trel"; "otPlatTrelSend: {:?} -> {}", sockaddr, hex::encode(payload));
442            let mut addr: SocketAddr = (*sockaddr).into();
443            if let SocketAddr::V6(ref mut addr_v6) = addr {
444                match self.netif_index_backbone {
445                    Some(scope_id) => {
446                        if addr_v6.scope_id() == 0 {
447                            addr_v6.set_scope_id(scope_id);
448                        } else if addr_v6.scope_id() != scope_id {
449                            warn!(
450                                tag = "trel";
451                                "otPlatTrelSend: dest addr's scope_id {} does not match infra \
452                                interface {}",
453                                addr_v6.scope_id(),
454                                scope_id
455                            );
456                        }
457                    }
458                    None => {
459                        if addr_v6.scope_id() == 0 {
460                            warn!(
461                                tag = "trel";
462                                "otPlatTrelSend: No scope_id present for infra interface, dropping"
463                            );
464                            return;
465                        }
466                    }
467                }
468            }
469            match trel.socket.send_to(payload, addr).now_or_never() {
470                Some(Ok(_)) => {
471                    counters.update_tx_bytes(payload.len().try_into().unwrap());
472                    counters.update_tx_packets(1);
473                }
474                Some(Err(err)) => {
475                    counters.update_tx_failure(1);
476                    warn!(tag = "trel"; "otPlatTrelSend: send_to failed: {:?}", err);
477                }
478                None => {
479                    warn!(tag = "trel"; "otPlatTrelSend: send_to didn't finish immediately");
480                }
481            }
482        } else {
483            debug!(tag = "trel"; "otPlatTrelSend: TREL is disabled, cannot send.");
484        }
485    }
486}
487
488#[unsafe(no_mangle)]
489unsafe extern "C" fn otPlatTrelEnable(instance: *mut otInstance, port_ptr: *mut u16) {
490    match PlatformBacking::on_trel_enable(
491        // SAFETY: Must only be called from OpenThread thread,
492        unsafe { PlatformBacking::as_ref() },
493        // SAFETY: `instance` must be a pointer to a valid `otInstance`,
494        //         which is guaranteed by the caller.
495        unsafe { ot::Instance::ref_from_ot_ptr(instance) }.unwrap(),
496    ) {
497        Ok(port) => {
498            info!(tag = "trel"; "otPlatTrelEnable: Ready on port {}", port);
499            unsafe { *port_ptr = port };
500        }
501        Err(err) => {
502            warn!(tag = "trel"; "otPlatTrelEnable: Unable to start TREL: {:?}", err);
503        }
504    }
505}
506
507#[unsafe(no_mangle)]
508unsafe extern "C" fn otPlatTrelDisable(instance: *mut otInstance) {
509    PlatformBacking::on_trel_disable(
510        // SAFETY: Must only be called from OpenThread thread,
511        unsafe { PlatformBacking::as_ref() },
512        // SAFETY: `instance` must be a pointer to a valid `otInstance`,
513        //         which is guaranteed by the caller.
514        unsafe { ot::Instance::ref_from_ot_ptr(instance) }.unwrap(),
515    );
516    info!(tag = "trel"; "otPlatTrelDisable: Closed.");
517}
518
519#[unsafe(no_mangle)]
520unsafe extern "C" fn otPlatTrelRegisterService(
521    instance: *mut otInstance,
522    port: u16,
523    txt_data: *const u8,
524    txt_len: u8,
525) {
526    PlatformBacking::on_trel_register_service(
527        // SAFETY: Must only be called from OpenThread thread,
528        unsafe { PlatformBacking::as_ref() },
529        // SAFETY: `instance` must be a pointer to a valid `otInstance`,
530        //         which is guaranteed by the caller.
531        unsafe { ot::Instance::ref_from_ot_ptr(instance) }.unwrap(),
532        port,
533        // SAFETY: Caller guarantees either txt_data is valid or txt_len is zero.
534        unsafe { std::slice::from_raw_parts(txt_data, txt_len.into()) },
535    );
536}
537
538#[unsafe(no_mangle)]
539unsafe extern "C" fn otPlatTrelSend(
540    instance: *mut otInstance,
541    payload_data: *const u8,
542    payload_len: u16,
543    dest: *const otSockAddr,
544) {
545    PlatformBacking::on_trel_send(
546        // SAFETY: Must only be called from OpenThread thread,
547        unsafe { PlatformBacking::as_ref() },
548        // SAFETY: `instance` must be a pointer to a valid `otInstance`,
549        //         which is guaranteed by the caller.
550        unsafe { ot::Instance::ref_from_ot_ptr(instance) }.unwrap(),
551        // SAFETY: Caller guarantees either payload_data is valid or payload_len is zero.
552        unsafe { std::slice::from_raw_parts(payload_data, payload_len.into()) },
553        // SAFETY: Caller guarantees dest points to a valid otSockAddr.
554        unsafe { ot::SockAddr::ref_from_ot_ptr(dest) }.unwrap(),
555    );
556}
557
558#[unsafe(no_mangle)]
559unsafe extern "C" fn otPlatTrelGetCounters(
560    _instance: *mut otInstance,
561) -> *const otPlatTrelCounters {
562    if let Some(trel) = unsafe { PlatformBacking::as_ref() }.trel.borrow().as_ref() {
563        trel.get_trel_counters()
564    } else {
565        std::ptr::null()
566    }
567}
568
569#[unsafe(no_mangle)]
570unsafe extern "C" fn otPlatTrelNotifyPeerSocketAddressDifference(
571    _instance: *mut otsys::otInstance,
572    peer_sock_addr: &ot::SockAddr,
573    rx_sock_addr: &ot::SockAddr,
574) {
575    info!(tag = "trel"; "otPlatTrelNotifyPeerSocketAddressDifference: Not Implemented. peer_sock_addr {}, rx_sock_addr {}", peer_sock_addr, rx_sock_addr);
576}
577
578#[unsafe(no_mangle)]
579unsafe extern "C" fn otPlatTrelResetCounters(_instance: *mut otInstance) {
580    if let Some(trel) = unsafe { PlatformBacking::as_ref() }.trel.borrow().as_ref() {
581        trel.reset_trel_counters()
582    }
583}
584
585#[cfg(test)]
586mod test {
587    use super::*;
588
589    #[test]
590    fn test_split_txt() {
591        assert_eq!(
592            split_txt(b"\x13xa=a7bfc4981f4e4d22\x13xp=029c6f4dbae059cb"),
593            vec![b"xa=a7bfc4981f4e4d22".to_vec(), b"xp=029c6f4dbae059cb".to_vec()]
594        );
595    }
596
597    #[test]
598    fn test_flatten_txt() {
599        assert_eq!(flatten_txt(None), vec![]);
600        assert_eq!(flatten_txt(Some(vec![])), vec![]);
601        assert_eq!(
602            flatten_txt(Some(vec![b"xa=a7bfc4981f4e4d22".to_vec()])),
603            b"\x13xa=a7bfc4981f4e4d22".to_vec()
604        );
605        assert_eq!(
606            flatten_txt(Some(vec![
607                b"xa=a7bfc4981f4e4d22".to_vec(),
608                b"xp=029c6f4dbae059cb".to_vec()
609            ])),
610            b"\x13xa=a7bfc4981f4e4d22\x13xp=029c6f4dbae059cb".to_vec()
611        );
612    }
613}