1use super::*;
6use crate::to_escaped_string::*;
7use anyhow::Context as _;
8use fidl::endpoints::create_endpoints;
9use fidl_fuchsia_net_mdns::*;
10use fuchsia_async::Task;
11use futures::stream::FusedStream;
12use openthread_sys::*;
13use ot::{PlatTrel as _, TrelCounters};
14use std::collections::HashMap;
15use std::net::{Ipv6Addr, SocketAddr, SocketAddrV6};
16use std::task::{Context, Poll};
17
18pub(crate) struct TrelInstance {
19 socket: fasync::net::UdpSocket,
20 publication_responder: Option<Task<Result<(), anyhow::Error>>>,
21 instance_name: String,
22 peer_instance_sockaddr_map: HashMap<String, ot::SockAddr>,
23
24 #[allow(dead_code)] subscriber: ServiceSubscriber2Proxy,
26
27 subscriber_request_stream: ServiceSubscriptionListenerRequestStream,
28
29 counters: RefCell<TrelCounters>,
30}
31
/// Serializes an optional list of TXT strings into DNS TXT wire format.
///
/// Each entry is emitted as a one-byte length prefix followed by the entry's
/// bytes. Because the prefix is a single byte, an entry longer than 255 bytes
/// is truncated to its first 255 bytes. `None` and an empty list both produce
/// an empty vector.
fn flatten_txt(txt: Option<Vec<Vec<u8>>>) -> Vec<u8> {
    let mut ret = vec![];

    for entry in txt.iter().flatten() {
        // Clamp the length to what fits in the one-byte prefix. The previous
        // code sliced to 256 bytes and then panicked converting 256 to `u8`.
        let len = u8::try_from(entry.len()).unwrap_or(u8::MAX);
        ret.push(len);
        ret.extend_from_slice(&entry[..usize::from(len)]);
    }

    ret
}
47
48fn process_addresses_from_socket_addresses<
51 T: IntoIterator<Item = fidl_fuchsia_net::SocketAddress>,
52>(
53 addresses: T,
54) -> (Vec<ot::Ip6Address>, Option<u16>) {
55 let mut ret_port: Option<u16> = None;
56 let mut addresses =
57 addresses
58 .into_iter()
59 .flat_map(|x| {
60 if let fidl_fuchsia_net::SocketAddress::Ipv6(
61 fidl_fuchsia_net::Ipv6SocketAddress { address, port, .. },
62 ) = x
63 {
64 let addr = ot::Ip6Address::from(address.addr);
65 if ret_port.is_none() {
66 ret_port = Some(port);
67 } else if ret_port != Some(port) {
68 warn!(
69 tag = "trel";
70 "mDNS service has multiple ports for the same service, {:?} != {:?}",
71 ret_port.unwrap(),
72 port
73 );
74 }
75 if ipv6addr_is_unicast_link_local(&addr) {
77 return Some(addr);
78 }
79 }
80 None
81 })
82 .collect::<Vec<_>>();
83 addresses.sort();
84 (addresses, ret_port)
85}
86
/// Returns `true` when `addr` falls inside `fe80::/10`, i.e. when the top ten
/// bits of the address are `1111 1110 10`.
fn ipv6addr_is_unicast_link_local(addr: &std::net::Ipv6Addr) -> bool {
    /// Mask selecting the ten most significant bits of the first segment.
    const PREFIX_MASK: u16 = 0xffc0;
    /// Expected value of those bits for a link-local unicast address.
    const LINK_LOCAL_PREFIX: u16 = 0xfe80;

    let [first_segment, ..] = addr.segments();
    first_segment & PREFIX_MASK == LINK_LOCAL_PREFIX
}
94
95fn split_txt(txt: &[u8]) -> Vec<Vec<u8>> {
97 info!(tag = "trel"; "trel:split_txt: Splitting TXT record: {:?}", hex::encode(txt));
98 let txt =
99 ot::DnsTxtEntryIterator::try_new(txt).expect("can't parse TXT records from OpenThread");
100 txt.map(|x| x.expect("can't parse TXT records from OpenThread").to_vec()).collect::<Vec<_>>()
101}
102
103impl TrelInstance {
104 fn new(instance_name: String) -> Result<TrelInstance, anyhow::Error> {
105 let (client, server) = create_endpoints::<ServiceSubscriptionListenerMarker>();
106
107 let subscriber =
108 fuchsia_component::client::connect_to_protocol::<ServiceSubscriber2Marker>().unwrap();
109
110 subscriber
111 .subscribe_to_service(
112 ot::TREL_DNSSD_SERVICE_NAME_WITH_DOT,
113 &ServiceSubscriptionOptions { exclude_local: Some(true), ..Default::default() },
114 client,
115 )
116 .context("Unable to subscribe to TREL services")?;
117
118 Ok(TrelInstance {
119 socket: fasync::net::UdpSocket::bind(&SocketAddr::V6(SocketAddrV6::new(
120 Ipv6Addr::UNSPECIFIED,
121 0,
122 0,
123 0,
124 )))
125 .context("Unable to open TREL UDP socket")?,
126 publication_responder: None,
127 instance_name,
128 peer_instance_sockaddr_map: HashMap::default(),
129 subscriber,
130 subscriber_request_stream: server.into_stream(),
131 counters: RefCell::new(TrelCounters::default()),
132 })
133 }
134
135 fn port(&self) -> u16 {
136 self.socket.local_addr().unwrap().port()
137 }
138
139 fn register_service(&mut self, port: u16, txt: &[u8]) {
140 let txt = split_txt(txt);
141
142 let (client, server) = create_endpoints::<ServiceInstancePublicationResponder_Marker>();
143
144 let publisher =
145 fuchsia_component::client::connect_to_protocol::<ServiceInstancePublisherMarker>()
146 .unwrap();
147
148 let publish_init_future = publisher
149 .publish_service_instance(
150 ot::TREL_DNSSD_SERVICE_NAME_WITH_DOT,
151 self.instance_name.as_str(),
152 &ServiceInstancePublicationOptions::default(),
153 client,
154 )
155 .map(|x| -> Result<(), anyhow::Error> {
156 match x {
157 Ok(Ok(x)) => Ok(x),
158 Ok(Err(err)) => Err(anyhow::format_err!("{:?}", err)),
159 Err(zx_err) => Err(zx_err.into()),
160 }
161 });
162
163 let publish_responder_future = server.into_stream().map_err(Into::into).try_for_each(
164 move |ServiceInstancePublicationResponder_Request::OnPublication {
165 responder, ..
166 }| {
167 let txt = txt.clone();
168 let _publisher = publisher.clone();
169 async move {
170 responder
171 .send(Ok(&ServiceInstancePublication {
172 port: Some(port),
173 text: Some(txt),
174 ..Default::default()
175 }))
176 .map_err(Into::into)
177 }
178 },
179 );
180
181 let future =
182 futures::future::try_join(publish_init_future, publish_responder_future).map_ok(|_| ());
183
184 self.publication_responder = Some(fuchsia_async::Task::spawn(future));
185 }
186
187 pub fn handle_service_subscriber_request(
188 &mut self,
189 ot_instance: &ot::Instance,
190 service_subscriber_request: ServiceSubscriptionListenerRequest,
191 ) -> Result<(), anyhow::Error> {
192 match service_subscriber_request {
193 ServiceSubscriptionListenerRequest::OnInstanceDiscovered {
195 instance:
196 ServiceInstance {
197 instance: Some(instance_name),
198 addresses: Some(addresses),
199 text_strings,
200 ..
201 },
202 responder,
203 } => {
204 let txt = flatten_txt(text_strings);
205
206 let (addresses, port) = process_addresses_from_socket_addresses(addresses);
207
208 info!(
209 tag = "trel";
210 "ServiceSubscriptionListenerRequest::OnInstanceDiscovered: [PII]({instance_name:?}) port:{port:?} addresses:{addresses:?}"
211 );
212
213 if let Some(address) = addresses.first() {
215 let sockaddr = ot::SockAddr::new(*address, port.unwrap());
216
217 self.peer_instance_sockaddr_map.insert(instance_name, sockaddr);
218
219 let info = ot::PlatTrelPeerInfo::new(false, &txt, sockaddr);
220 info!(tag = "trel"; "otPlatTrelHandleDiscoveredPeerInfo: Adding {:?}", info);
221 ot_instance.plat_trel_handle_discovered_peer_info(&info);
222 } else {
223 warn!(
224 tag = "trel";
225 "Peer {instance_name:?} does not have any IPv6 link-local address, ignored"
226 );
227 }
228
229 responder.send().context("Unable to respond to OnInstanceDiscovered")?;
230 }
231
232 ServiceSubscriptionListenerRequest::OnInstanceChanged {
234 instance:
235 ServiceInstance {
236 instance: Some(instance_name),
237 addresses: Some(addresses),
238 text_strings,
239 ..
240 },
241 responder,
242 } => {
243 let txt = flatten_txt(text_strings);
244 let (addresses, port) = process_addresses_from_socket_addresses(addresses);
245
246 info!(
247 tag = "trel";
248 "ServiceSubscriptionListenerRequest::OnInstanceChanged: [PII]({instance_name:?}) port:{port:?} addresses:{addresses:?}"
249 );
250
251 if let Some(address) = addresses.first() {
253 let sockaddr = ot::SockAddr::new(*address, port.unwrap());
254
255 if let Some(old_sockaddr) =
256 self.peer_instance_sockaddr_map.insert(instance_name, sockaddr)
257 {
258 if old_sockaddr != sockaddr {
259 let info_old = ot::PlatTrelPeerInfo::new(true, &[], old_sockaddr);
261 info!(
262 tag = "trel";
263 "otPlatTrelHandleDiscoveredPeerInfo: Removing {:?}", info_old
264 );
265 ot_instance.plat_trel_handle_discovered_peer_info(&info_old);
266 }
267
268 let info = ot::PlatTrelPeerInfo::new(false, &txt, sockaddr);
269 info!(
270 tag = "trel";
271 "otPlatTrelHandleDiscoveredPeerInfo: Updating {:?}", info
272 );
273 ot_instance.plat_trel_handle_discovered_peer_info(&info);
274 }
275 } else {
276 warn!(
277 tag = "trel";
278 "Peer {instance_name:?} does not have any IPv6 link-local address, ignored"
279 );
280 }
281
282 responder.send().context("Unable to respond to OnInstanceChanged")?;
283 }
284
285 ServiceSubscriptionListenerRequest::OnInstanceLost { instance, responder, .. } => {
287 info!(
288 tag = "trel";
289 "ServiceSubscriptionListenerRequest::OnInstanceLost [PII]({instance:?})"
290 );
291 if let Some(sockaddr) = self.peer_instance_sockaddr_map.remove(&instance) {
292 let info = ot::PlatTrelPeerInfo::new(true, &[], sockaddr);
293 info!(tag = "trel"; "otPlatTrelHandleDiscoveredPeerInfo: Removing {:?}", info);
294 ot_instance.plat_trel_handle_discovered_peer_info(&info);
295 }
296
297 responder.send().context("Unable to respond to OnInstanceLost")?;
298 }
299
300 ServiceSubscriptionListenerRequest::OnInstanceChanged { instance, responder } => {
301 warn!(
302 tag = "trel";
303 "ServiceSubscriptionListenerRequest::OnInstanceChanged: [PII]({instance:?})"
304 );
305 responder.send().context("Unable to respond to OnInstanceChanged")?;
307 }
308
309 ServiceSubscriptionListenerRequest::OnInstanceDiscovered {
310 instance,
311 responder,
312 ..
313 } => {
314 warn!(
315 tag = "trel";
316 "ServiceSubscriptionListenerRequest::OnInstanceDiscovered: [PII]({instance:?})"
317 );
318 responder.send().context("Unable to respond to OnInstanceDiscovered")?;
320 }
321
322 ServiceSubscriptionListenerRequest::OnQuery { resource_type, responder, .. } => {
323 info!(
324 tag = "trel";
325 "ServiceSubscriptionListenerRequest::OnQuery: {resource_type:?}"
326 );
327
328 responder.send().context("Unable to respond to OnQuery")?;
330 }
331 }
332 Ok(())
333 }
334
335 pub fn get_trel_counters(&self) -> *const otPlatTrelCounters {
336 self.counters.borrow().as_ot_ptr()
337 }
338
339 pub fn reset_trel_counters(&self) {
340 self.counters.borrow_mut().reset_counters()
341 }
342
343 pub fn poll_io(&self, instance: &ot::Instance, cx: &mut Context<'_>) {
347 let mut buffer = [0u8; crate::UDP_PACKET_MAX_LENGTH];
348 loop {
349 match self.socket.async_recv_from(&mut buffer, cx) {
350 Poll::Ready(Ok((len, sockaddr))) => {
351 let sockaddr: ot::SockAddr = sockaddr.as_socket_ipv6().unwrap().into();
352 debug!(tag = "trel"; "Incoming {} byte TREL packet from {:?}", len, sockaddr);
353 {
354 let mut counters = self.counters.borrow_mut();
355 counters.update_rx_bytes(len.try_into().unwrap());
356 counters.update_rx_packets(1);
357 }
358 instance.plat_trel_handle_received(&buffer[..len], &sockaddr)
359 }
360 Poll::Ready(Err(err)) => {
361 warn!(tag = "trel"; "Error receiving packet: {:?}", err);
362 break;
363 }
364 _ => {
365 break;
366 }
367 }
368 }
369 }
370
371 pub fn poll(&mut self, instance: &ot::Instance, cx: &mut Context<'_>) {
373 if let Some(task) = &mut self.publication_responder {
374 if let Poll::Ready(x) = task.poll_unpin(cx) {
375 warn!(
376 tag = "trel";
377 "TrelInstance: publication_responder finished unexpectedly: {:?}", x
378 );
379 self.publication_responder = None;
380 }
381 }
382
383 if !self.subscriber_request_stream.is_terminated() {
384 while let Poll::Ready(Some(event)) = self.subscriber_request_stream.poll_next_unpin(cx)
385 {
386 match event {
387 Ok(event) => {
388 if let Err(err) = self.handle_service_subscriber_request(instance, event) {
389 error!(
390 tag = "trel";
391 "Error handling service subscriber request: {err:?}"
392 );
393 }
394 }
395 Err(err) => {
396 error!(tag = "trel"; "subscriber_request_stream FIDL error: {:?}", err);
397 }
398 }
399 }
400 }
401 }
402}
403
404impl PlatformBacking {
405 fn on_trel_enable(&self, instance: &ot::Instance) -> Result<u16, anyhow::Error> {
406 let mut trel = self.trel.borrow_mut();
407 if let Some(trel) = trel.as_ref() {
408 Ok(trel.port())
409 } else {
410 let instance_name = hex::encode(instance.get_extended_address().as_slice());
411 let trel_instance = TrelInstance::new(instance_name)?;
412 let port = trel_instance.port();
413 trel.replace(trel_instance);
414 Ok(port)
415 }
416 }
417
418 fn on_trel_disable(&self, _instance: &ot::Instance) {
419 self.trel.replace(None);
420 }
421
422 fn on_trel_register_service(&self, _instance: &ot::Instance, port: u16, txt: &[u8]) {
423 let mut trel = self.trel.borrow_mut();
424 if let Some(trel) = trel.as_mut() {
425 info!(
426 tag = "trel";
427 "otPlatTrelRegisterService: port:{} txt:{:?}",
428 port,
429 txt.to_escaped_string()
430 );
431 trel.register_service(port, txt);
432 } else {
433 debug!(tag = "trel"; "otPlatTrelRegisterService: TREL is disabled, cannot register.");
434 }
435 }
436
437 fn on_trel_send(&self, _instance: &ot::Instance, payload: &[u8], sockaddr: &ot::SockAddr) {
438 let trel = self.trel.borrow();
439 if let Some(trel) = trel.as_ref() {
440 let mut counters = trel.counters.borrow_mut();
441 debug!(tag = "trel"; "otPlatTrelSend: {:?} -> {}", sockaddr, hex::encode(payload));
442 let mut addr: SocketAddr = (*sockaddr).into();
443 if let SocketAddr::V6(ref mut addr_v6) = addr {
444 match self.netif_index_backbone {
445 Some(scope_id) => {
446 if addr_v6.scope_id() == 0 {
447 addr_v6.set_scope_id(scope_id);
448 } else if addr_v6.scope_id() != scope_id {
449 warn!(
450 tag = "trel";
451 "otPlatTrelSend: dest addr's scope_id {} does not match infra \
452 interface {}",
453 addr_v6.scope_id(),
454 scope_id
455 );
456 }
457 }
458 None => {
459 if addr_v6.scope_id() == 0 {
460 warn!(
461 tag = "trel";
462 "otPlatTrelSend: No scope_id present for infra interface, dropping"
463 );
464 return;
465 }
466 }
467 }
468 }
469 match trel.socket.send_to(payload, addr).now_or_never() {
470 Some(Ok(_)) => {
471 counters.update_tx_bytes(payload.len().try_into().unwrap());
472 counters.update_tx_packets(1);
473 }
474 Some(Err(err)) => {
475 counters.update_tx_failure(1);
476 warn!(tag = "trel"; "otPlatTrelSend: send_to failed: {:?}", err);
477 }
478 None => {
479 warn!(tag = "trel"; "otPlatTrelSend: send_to didn't finish immediately");
480 }
481 }
482 } else {
483 debug!(tag = "trel"; "otPlatTrelSend: TREL is disabled, cannot send.");
484 }
485 }
486}
487
488#[unsafe(no_mangle)]
489unsafe extern "C" fn otPlatTrelEnable(instance: *mut otInstance, port_ptr: *mut u16) {
490 match PlatformBacking::on_trel_enable(
491 unsafe { PlatformBacking::as_ref() },
493 unsafe { ot::Instance::ref_from_ot_ptr(instance) }.unwrap(),
496 ) {
497 Ok(port) => {
498 info!(tag = "trel"; "otPlatTrelEnable: Ready on port {}", port);
499 unsafe { *port_ptr = port };
500 }
501 Err(err) => {
502 warn!(tag = "trel"; "otPlatTrelEnable: Unable to start TREL: {:?}", err);
503 }
504 }
505}
506
507#[unsafe(no_mangle)]
508unsafe extern "C" fn otPlatTrelDisable(instance: *mut otInstance) {
509 PlatformBacking::on_trel_disable(
510 unsafe { PlatformBacking::as_ref() },
512 unsafe { ot::Instance::ref_from_ot_ptr(instance) }.unwrap(),
515 );
516 info!(tag = "trel"; "otPlatTrelDisable: Closed.");
517}
518
519#[unsafe(no_mangle)]
520unsafe extern "C" fn otPlatTrelRegisterService(
521 instance: *mut otInstance,
522 port: u16,
523 txt_data: *const u8,
524 txt_len: u8,
525) {
526 PlatformBacking::on_trel_register_service(
527 unsafe { PlatformBacking::as_ref() },
529 unsafe { ot::Instance::ref_from_ot_ptr(instance) }.unwrap(),
532 port,
533 unsafe { std::slice::from_raw_parts(txt_data, txt_len.into()) },
535 );
536}
537
538#[unsafe(no_mangle)]
539unsafe extern "C" fn otPlatTrelSend(
540 instance: *mut otInstance,
541 payload_data: *const u8,
542 payload_len: u16,
543 dest: *const otSockAddr,
544) {
545 PlatformBacking::on_trel_send(
546 unsafe { PlatformBacking::as_ref() },
548 unsafe { ot::Instance::ref_from_ot_ptr(instance) }.unwrap(),
551 unsafe { std::slice::from_raw_parts(payload_data, payload_len.into()) },
553 unsafe { ot::SockAddr::ref_from_ot_ptr(dest) }.unwrap(),
555 );
556}
557
558#[unsafe(no_mangle)]
559unsafe extern "C" fn otPlatTrelGetCounters(
560 _instance: *mut otInstance,
561) -> *const otPlatTrelCounters {
562 if let Some(trel) = unsafe { PlatformBacking::as_ref() }.trel.borrow().as_ref() {
563 trel.get_trel_counters()
564 } else {
565 std::ptr::null()
566 }
567}
568
569#[unsafe(no_mangle)]
570unsafe extern "C" fn otPlatTrelNotifyPeerSocketAddressDifference(
571 _instance: *mut otsys::otInstance,
572 peer_sock_addr: &ot::SockAddr,
573 rx_sock_addr: &ot::SockAddr,
574) {
575 info!(tag = "trel"; "otPlatTrelNotifyPeerSocketAddressDifference: Not Implemented. peer_sock_addr {}, rx_sock_addr {}", peer_sock_addr, rx_sock_addr);
576}
577
578#[unsafe(no_mangle)]
579unsafe extern "C" fn otPlatTrelResetCounters(_instance: *mut otInstance) {
580 if let Some(trel) = unsafe { PlatformBacking::as_ref() }.trel.borrow().as_ref() {
581 trel.reset_trel_counters()
582 }
583}
584
#[cfg(test)]
mod test {
    use super::*;

    /// A flat TXT record with two entries splits into those two entries.
    #[test]
    fn test_split_txt() {
        let wire = b"\x13xa=a7bfc4981f4e4d22\x13xp=029c6f4dbae059cb";
        let expected: Vec<Vec<u8>> =
            vec![b"xa=a7bfc4981f4e4d22".to_vec(), b"xp=029c6f4dbae059cb".to_vec()];

        assert_eq!(split_txt(wire), expected);
    }

    /// Absent, empty, single-entry and two-entry inputs all serialize to the
    /// expected length-prefixed form.
    #[test]
    fn test_flatten_txt() {
        let xa = b"xa=a7bfc4981f4e4d22".to_vec();
        let xp = b"xp=029c6f4dbae059cb".to_vec();

        assert_eq!(flatten_txt(None), Vec::<u8>::new());
        assert_eq!(flatten_txt(Some(Vec::new())), Vec::<u8>::new());

        assert_eq!(flatten_txt(Some(vec![xa.clone()])), b"\x13xa=a7bfc4981f4e4d22".to_vec());

        assert_eq!(
            flatten_txt(Some(vec![xa, xp])),
            b"\x13xa=a7bfc4981f4e4d22\x13xp=029c6f4dbae059cb".to_vec()
        );
    }
}