1use crate::task::{
6 CurrentTask, EventHandler, SignalHandler, SignalHandlerInner, WaitCanceler, Waiter,
7};
8use crate::vfs::buffers::{AncillaryData, InputBuffer, MessageReadInfo, OutputBuffer};
9use crate::vfs::socket::{
10 SockOptValue, Socket, SocketAddress, SocketHandle, SocketMessageFlags, SocketOps, SocketPeer,
11 SocketShutdownFlags, SocketType,
12};
13use anyhow::Context;
14use fidl::endpoints::{SynchronousProxy, create_sync_proxy};
15use fidl_fuchsia_hardware_qualcomm_router as fqrtr;
16use starnix_logging::{log_warn, track_stub};
17use starnix_sync::{FileOpsCore, Locked, MappedMutexGuard, Mutex, MutexGuard};
18use starnix_uapi::errors::{Errno, from_status_like_fdio};
19use starnix_uapi::vfs::FdEvents;
20use starnix_uapi::{
21 AF_QIPCRTR, SO_RCVBUF, SO_SNDBUF, SOL_SOCKET, errno, error, sockaddr_qrtr, socklen_t, ucred,
22};
23use zerocopy::{FromBytes, IntoBytes};
24
25const QRTR_CLIENT_SERVICE_DIRECTORY: &str = "/svc/fuchsia.hardware.qualcomm.router.ClientService";
26fn connect_to_connector() -> Result<fqrtr::QrtrConnectorSynchronousProxy, anyhow::Error> {
27 let mut dir = std::fs::read_dir(QRTR_CLIENT_SERVICE_DIRECTORY)
28 .context("Failed to read ClientService directory")?;
29 let entry = dir
30 .next()
31 .ok_or_else(|| anyhow::format_err!("Missing ClientService instance"))?
32 .context("Unable to read ClientService instance")?;
33 let path = entry
34 .path()
35 .join("qrtr_connector")
36 .into_os_string()
37 .into_string()
38 .map_err(|_| anyhow::format_err!("Failed to get qrtr_connector path"))?;
39
40 let (client_end, server_end) = zx::Channel::create();
41 fdio::service_connect(&path, server_end)?;
42 Ok(fqrtr::QrtrConnectorSynchronousProxy::from_channel(client_end))
43}
44
45pub const SEND_BUF_MIN_SIZE: usize = 2048;
47pub const SEND_BUF_MAX_SIZE: usize = 1 << 31;
48pub const SEND_BUF_DEFAULT_SIZE: usize = 2048;
49
50pub const RECV_BUF_MIN_SIZE: usize = 256;
52pub const RECV_BUF_MAX_SIZE: usize = 1 << 31;
53pub const RECV_BUF_DEFAULT_SIZE: usize = 256;
54
55pub struct QipcrtrSocket {
56 inner: Mutex<Option<QipcrtrSocketInner>>,
57}
58
59struct QipcrtrSocketInner {
60 proxy: fqrtr::QrtrClientConnectionSynchronousProxy,
62
63 events: zx::EventPair,
65
66 peer: Option<sockaddr_qrtr>,
69
70 send_buf_size: usize,
76
77 recv_buf_size: usize,
83}
84
85impl QipcrtrSocket {
86 pub fn new(_socket_type: SocketType) -> Self {
87 Self { inner: Mutex::new(None) }
88 }
89
90 fn connecting_lock(&self) -> Result<MappedMutexGuard<'_, QipcrtrSocketInner>, Errno> {
93 let mut inner = self.inner.lock();
94 if inner.is_none() {
95 *inner = Some(QipcrtrSocketInner::new(fqrtr::ConnectionOptions {
96 blocking: Some(false),
97 ..Default::default()
98 })?);
99 }
100 Ok(MutexGuard::map(inner, |inner| inner.as_mut().unwrap()))
101 }
102
103 fn close(&self) {
104 *self.inner.lock() = None;
105 }
106}
107
108impl QipcrtrSocketInner {
109 fn new(options: fqrtr::ConnectionOptions) -> Result<Self, Errno> {
110 let connector = connect_to_connector().map_err(|e| errno!(ENETUNREACH, e))?;
111
112 let (client_end, server_end) = create_sync_proxy::<fqrtr::QrtrClientConnectionMarker>();
113 connector
114 .get_connection(&options, server_end, zx::MonotonicInstant::INFINITE)
115 .map_err(|e| errno!(ENETUNREACH, e))?
116 .map_err(qrtr_error_to_errno)?;
117
118 let proxy = fqrtr::QrtrClientConnectionSynchronousProxy::new(client_end.into_channel());
119 let events = proxy
120 .get_signals(zx::MonotonicInstant::INFINITE)
121 .map_err(|e| errno!(ENETUNREACH, e))?;
122
123 Ok(Self {
124 proxy,
125 events,
126 peer: None,
127 send_buf_size: SEND_BUF_DEFAULT_SIZE,
128 recv_buf_size: RECV_BUF_DEFAULT_SIZE,
129 })
130 }
131
132 fn bound_addr(&self) -> Result<sockaddr_qrtr, Errno> {
134 let addr = sockaddr_qrtr {
135 sq_family: AF_QIPCRTR,
136 sq_node: self
137 .proxy
138 .get_node_id(zx::MonotonicInstant::INFINITE)
139 .map_err(|e| errno!(EINVAL, e))?,
140 sq_port: self
141 .proxy
142 .get_port_id(zx::MonotonicInstant::INFINITE)
143 .map_err(|e| errno!(EINVAL, e))?,
144 ..Default::default()
145 };
146 Ok(addr)
147 }
148}
149
150impl Drop for QipcrtrSocketInner {
151 fn drop(&mut self) {
152 if let Err(e) = self.proxy.close_connection(zx::MonotonicInstant::INFINITE) {
153 log_warn!("Failed to close QRTR connection: {e:?}");
154 }
155 }
156}
157
158impl SocketOps for QipcrtrSocket {
159 fn connect(
160 &self,
161 _locked: &mut Locked<FileOpsCore>,
162 _socket: &SocketHandle,
163 _current_task: &CurrentTask,
164 peer: SocketPeer,
165 ) -> Result<(), Errno> {
166 let peer = match peer {
167 SocketPeer::Address(addr) => extract_qrtr_sockaddr(&addr)?,
168 _ => {
169 return error!(EINVAL);
170 }
171 };
172
173 let mut inner = self.inner.lock();
174 if inner.is_some() {
175 return error!(EISCONN);
176 }
177
178 let mut new_inner = QipcrtrSocketInner::new(fqrtr::ConnectionOptions {
181 blocking: Some(false),
182 ..Default::default()
183 })?;
184 new_inner.peer = Some(peer);
185
186 *inner = Some(new_inner);
187 Ok(())
188 }
189
190 fn listen(
191 &self,
192 _locked: &mut Locked<FileOpsCore>,
193 _socket: &Socket,
194 _backlog: i32,
195 _credentials: ucred,
196 ) -> Result<(), Errno> {
197 error!(ENOTSUP)
198 }
199
200 fn accept(
201 &self,
202 _locked: &mut Locked<FileOpsCore>,
203 _socket: &Socket,
204 _current_task: &CurrentTask,
205 ) -> Result<SocketHandle, Errno> {
206 error!(ENOTSUP)
207 }
208
209 fn bind(
210 &self,
211 _locked: &mut Locked<FileOpsCore>,
212 _socket: &Socket,
213 _current_task: &CurrentTask,
214 socket_address: SocketAddress,
215 ) -> Result<(), Errno> {
216 let addr = extract_qrtr_sockaddr(&socket_address)?;
217
218 let mut inner = self.inner.lock();
219 if inner.is_some() {
220 return error!(EINVAL);
221 }
222
223 *inner = Some(QipcrtrSocketInner::new(fqrtr::ConnectionOptions {
225 blocking: Some(false),
226 port: Some(addr.sq_port),
227 ..Default::default()
228 })?);
229
230 Ok(())
231 }
232
233 fn read(
234 &self,
235 _locked: &mut Locked<FileOpsCore>,
236 _socket: &Socket,
237 _current_task: &CurrentTask,
238 data: &mut dyn OutputBuffer,
239 flags: SocketMessageFlags,
240 ) -> Result<MessageReadInfo, Errno> {
241 if flags.contains(SocketMessageFlags::PEEK) {
242 track_stub!(
243 TODO("https://fxbug.dev/388082019"),
244 "SocketMessageFlags::PEEK is unsupported"
245 );
246 return error!(EINVAL);
247 }
248
249 let inner = self.connecting_lock()?;
250
251 if flags.contains(SocketMessageFlags::DONTWAIT) {
252 match inner.events.wait_one(
253 zx::Signals::from_bits_truncate(fqrtr::SIGNAL_READABLE)
254 | zx::Signals::EVENTPAIR_PEER_CLOSED,
255 zx::MonotonicInstant::INFINITE_PAST,
256 ) {
257 zx::WaitResult::Ok(_) => {}
258 zx::WaitResult::TimedOut(_) | zx::WaitResult::Canceled(_) => return error!(EAGAIN),
259 zx::WaitResult::Err(status) => return Err(from_status_like_fdio!(status)),
260 }
261 }
262
263 let (src_node, src_port, src_data) = inner
264 .proxy
265 .read(zx::MonotonicInstant::INFINITE)
266 .map_err(|e| errno!(ECONNRESET, e))?
267 .map_err(qrtr_error_to_errno)?;
268
269 let bytes_read = data.write(src_data.as_bytes())?;
270 Ok(MessageReadInfo {
271 bytes_read,
272 message_length: src_data.len(),
273 address: Some(pack_qrtr_sockaddr(src_node, src_port)),
274 ..Default::default()
275 })
276 }
277
278 fn write(
279 &self,
280 _locked: &mut Locked<FileOpsCore>,
281 _socket: &Socket,
282 _current_task: &CurrentTask,
283 data: &mut dyn InputBuffer,
284 dest_address: &mut Option<SocketAddress>,
285 _ancillary_data: &mut Vec<AncillaryData>,
286 ) -> Result<usize, Errno> {
287 let inner = self.connecting_lock()?;
288
289 let dest = match dest_address {
292 Some(addr) => extract_qrtr_sockaddr(addr)?,
293 None => inner.peer.ok_or_else(|| errno!(EDESTADDRREQ))?,
294 };
295
296 match inner.events.wait_one(
297 zx::Signals::from_bits_truncate(fqrtr::SIGNAL_WRITABLE)
298 | zx::Signals::EVENTPAIR_PEER_CLOSED,
299 zx::MonotonicInstant::INFINITE_PAST,
300 ) {
301 zx::WaitResult::Ok(_) => {}
302 zx::WaitResult::TimedOut(_) | zx::WaitResult::Canceled(_) => return error!(EAGAIN),
303 zx::WaitResult::Err(status) => return Err(from_status_like_fdio!(status)),
304 }
305
306 let data_written = data.read_all()?;
307 let _ = inner
308 .proxy
309 .write(
310 dest.sq_node,
311 dest.sq_port,
312 data_written.as_ref(),
313 zx::MonotonicInstant::INFINITE,
314 )
315 .map_err(|e| errno!(ECONNRESET, e))?
316 .map_err(qrtr_error_to_errno)?;
317 Ok(data_written.len())
318 }
319
320 fn wait_async(
321 &self,
322 _locked: &mut Locked<FileOpsCore>,
323 _socket: &Socket,
324 _current_task: &CurrentTask,
325 waiter: &Waiter,
326 events: FdEvents,
327 handler: EventHandler,
328 ) -> WaitCanceler {
329 let Ok(inner) = self.connecting_lock() else {
330 return WaitCanceler::new_noop();
331 };
332 let signal_handler = SignalHandler {
333 inner: SignalHandlerInner::ZxHandle(qrtr_signals_to_fd_events),
334 event_handler: handler,
335 err_code: None,
336 };
337 let canceler = waiter
338 .wake_on_zircon_signals(
339 &inner.events,
340 fd_events_to_qrtr_signals(events),
341 signal_handler,
342 )
343 .unwrap();
344 WaitCanceler::new_port(canceler)
345 }
346
347 fn query_events(
348 &self,
349 _locked: &mut Locked<FileOpsCore>,
350 _socket: &Socket,
351 _current_task: &CurrentTask,
352 ) -> Result<FdEvents, Errno> {
353 let inner = self.connecting_lock()?;
354 let signals = inner
355 .events
356 .as_handle_ref()
357 .wait_one(zx::Signals::all(), zx::MonotonicInstant::INFINITE_PAST)
358 .map_err(|e| from_status_like_fdio!(e))?;
359 Ok(qrtr_signals_to_fd_events(signals))
360 }
361
362 fn shutdown(
363 &self,
364 _locked: &mut Locked<FileOpsCore>,
365 _socket: &Socket,
366 _how: SocketShutdownFlags,
367 ) -> Result<(), Errno> {
368 self.close();
369 Ok(())
370 }
371
372 fn close(
373 &self,
374 _locked: &mut Locked<FileOpsCore>,
375 _current_task: &CurrentTask,
376 _socket: &Socket,
377 ) {
378 self.close();
379 }
380
381 fn getsockname(
382 &self,
383 _locked: &mut Locked<FileOpsCore>,
384 _socket: &Socket,
385 ) -> Result<SocketAddress, Errno> {
386 let name = self.connecting_lock()?.bound_addr()?;
387 Ok(SocketAddress::Qipcrtr(name.as_bytes().to_vec()))
388 }
389
390 fn getpeername(
391 &self,
392 _locked: &mut Locked<FileOpsCore>,
393 _socket: &Socket,
394 ) -> Result<SocketAddress, Errno> {
395 let peer = self.connecting_lock()?.peer.ok_or_else(|| errno!(ENOTCONN))?;
396 Ok(SocketAddress::Qipcrtr(peer.as_bytes().to_vec()))
397 }
398
399 fn setsockopt(
400 &self,
401 _locked: &mut Locked<FileOpsCore>,
402 _socket: &Socket,
403 current_task: &CurrentTask,
404 level: u32,
405 optname: u32,
406 optval: SockOptValue,
407 ) -> Result<(), Errno> {
408 let mut inner = self.connecting_lock()?;
409 match level {
410 SOL_SOCKET => match optname {
411 SO_SNDBUF => {
412 let requested_capacity: socklen_t = optval.read(current_task)?;
413 let capacity = usize::try_from(requested_capacity * 2).unwrap_or(usize::MAX);
416 let capacity = capacity.clamp(SEND_BUF_MIN_SIZE, SEND_BUF_MAX_SIZE);
418 inner.send_buf_size = capacity;
419 }
420 SO_RCVBUF => {
421 let requested_capacity: socklen_t = optval.read(current_task)?;
422 let capacity = usize::try_from(requested_capacity * 2).unwrap_or(usize::MAX);
425 let capacity = capacity.clamp(RECV_BUF_MIN_SIZE, RECV_BUF_MAX_SIZE);
427 inner.recv_buf_size = capacity;
428 }
429 _ => return error!(ENOSYS),
430 },
431 _ => return error!(ENOSYS),
432 }
433
434 Ok(())
435 }
436
437 fn getsockopt(
438 &self,
439 _locked: &mut Locked<FileOpsCore>,
440 _socket: &Socket,
441 _current_task: &CurrentTask,
442 level: u32,
443 optname: u32,
444 _optlen: u32,
445 ) -> Result<Vec<u8>, Errno> {
446 let inner = self.connecting_lock()?;
447 Ok(match level {
448 SOL_SOCKET => match optname {
449 SO_SNDBUF => (inner.send_buf_size as socklen_t).to_ne_bytes().to_vec(),
450 SO_RCVBUF => (inner.recv_buf_size as socklen_t).to_ne_bytes().to_vec(),
451 _ => return error!(ENOSYS),
452 },
453 _ => vec![],
454 })
455 }
456}
457
458fn extract_qrtr_sockaddr(addr: &SocketAddress) -> Result<sockaddr_qrtr, Errno> {
461 match addr {
462 SocketAddress::Qipcrtr(bytes) => sockaddr_qrtr::read_from_prefix(bytes.as_bytes())
463 .map(|(addr, _)| addr)
464 .map_err(|e| errno!(EINVAL, e)),
465 _ => error!(EINVAL),
466 }
467}
468
469fn pack_qrtr_sockaddr(node: u32, port: u32) -> SocketAddress {
471 let addr =
472 sockaddr_qrtr { sq_family: AF_QIPCRTR, sq_node: node, sq_port: port, ..Default::default() };
473 SocketAddress::Qipcrtr(addr.as_bytes().into())
474}
475
476fn qrtr_error_to_errno(e: fqrtr::Error) -> Errno {
478 match e {
479 fqrtr::Error::InternalError => errno!(EINVAL),
480 fqrtr::Error::AlreadyPending => errno!(EBUSY),
481 fqrtr::Error::RemoteNodeUnavailable => errno!(ECONNRESET),
482 fqrtr::Error::AlreadyBound => errno!(EADDRINUSE),
483 fqrtr::Error::NotSupported => errno!(ENOTSUP),
484 fqrtr::Error::WouldBlock => errno!(EAGAIN),
485 fqrtr::Error::NoResources => errno!(ENOMEM),
486 fqrtr::Error::InvalidArgs => errno!(EINVAL),
487 _ => errno!(EINVAL),
488 }
489}
490
491fn fd_events_to_qrtr_signals(events: FdEvents) -> zx::Signals {
493 let mut signals = zx::Signals::empty();
494 if events.contains(FdEvents::POLLIN) {
495 signals |= zx::Signals::from_bits_truncate(fqrtr::SIGNAL_READABLE);
496 }
497 if events.contains(FdEvents::POLLOUT) {
498 signals |= zx::Signals::from_bits_truncate(fqrtr::SIGNAL_WRITABLE);
499 }
500
501 signals |= zx::Signals::EVENTPAIR_PEER_CLOSED;
503 signals
504}
505
506fn qrtr_signals_to_fd_events(signals: zx::Signals) -> FdEvents {
508 let mut events = FdEvents::empty();
509 if signals.contains(zx::Signals::from_bits_truncate(fqrtr::SIGNAL_READABLE)) {
510 events |= FdEvents::POLLIN;
511 }
512 if signals.contains(zx::Signals::from_bits_truncate(fqrtr::SIGNAL_WRITABLE)) {
513 events |= FdEvents::POLLOUT;
514 }
515 if signals.contains(zx::Signals::EVENTPAIR_PEER_CLOSED) {
516 events |= FdEvents::POLLHUP;
517 }
518 events
519}
520
521#[cfg(test)]
522mod tests {
523 use super::*;
524 use crate::testing::spawn_kernel_and_run;
525 use crate::vfs::buffers::{VecInputBuffer, VecOutputBuffer};
526 use crate::vfs::socket::{SocketDomain, SocketProtocol, SocketType};
527 use fidl::endpoints::create_sync_proxy;
528 use futures::StreamExt;
529
530 fn mock_qipcrtr_socket()
535 -> (QipcrtrSocket, fidl::endpoints::ServerEnd<fqrtr::QrtrClientConnectionMarker>) {
536 let (proxy, server_end) = create_sync_proxy::<fqrtr::QrtrClientConnectionMarker>();
537
538 let (events, _) = zx::EventPair::create();
540
541 let inner = QipcrtrSocketInner {
542 proxy,
543 events,
544 peer: None,
545 send_buf_size: SEND_BUF_DEFAULT_SIZE,
546 recv_buf_size: RECV_BUF_DEFAULT_SIZE,
547 };
548
549 (QipcrtrSocket { inner: Mutex::new(Some(inner)) }, server_end)
550 }
551
552 #[::fuchsia::test]
553 async fn test_qipcrtr_socket_new() {
554 spawn_kernel_and_run(async |locked, current_task| {
555 let _kernel = current_task.kernel();
556 let _socket = Socket::new(
563 locked,
564 ¤t_task,
565 SocketDomain::Qipcrtr,
566 SocketType::Datagram,
567 SocketProtocol::default(),
568 false,
569 )
570 .expect("Failed to create socket.");
571 })
572 .await;
573 }
574
575 #[::fuchsia::test]
576 async fn test_qipcrtr_sockopt() {
577 spawn_kernel_and_run(async |locked, current_task| {
578 let socket = mock_qipcrtr_socket();
579 let socket_obj = Socket::new_with_ops_and_info(
580 Box::new(socket.0),
581 SocketDomain::Qipcrtr,
582 SocketType::Datagram,
583 SocketProtocol::default(),
584 );
585 let _server_end = socket.1;
586
587 let sndbuf =
589 socket_obj.getsockopt(locked, ¤t_task, SOL_SOCKET, SO_SNDBUF, 4).unwrap();
590 let sndbuf_val = u32::from_ne_bytes(sndbuf.as_slice().try_into().unwrap());
591 assert_eq!(sndbuf_val, SEND_BUF_DEFAULT_SIZE as u32);
592
593 let new_sndbuf: u32 = 4096;
594 socket_obj
595 .setsockopt(
596 locked,
597 ¤t_task,
598 SOL_SOCKET,
599 SO_SNDBUF,
600 SockOptValue::from(new_sndbuf.as_bytes().to_vec()),
601 )
602 .unwrap();
603
604 let sndbuf =
605 socket_obj.getsockopt(locked, ¤t_task, SOL_SOCKET, SO_SNDBUF, 4).unwrap();
606 let sndbuf_val = u32::from_ne_bytes(sndbuf.as_slice().try_into().unwrap());
607 assert_eq!(sndbuf_val, new_sndbuf * 2);
609
610 let rcvbuf =
612 socket_obj.getsockopt(locked, ¤t_task, SOL_SOCKET, SO_RCVBUF, 4).unwrap();
613 let rcvbuf_val = u32::from_ne_bytes(rcvbuf.as_slice().try_into().unwrap());
614 assert_eq!(rcvbuf_val, RECV_BUF_DEFAULT_SIZE as u32);
615
616 let new_rcvbuf: u32 = 1024;
617 socket_obj
618 .setsockopt(
619 locked,
620 ¤t_task,
621 SOL_SOCKET,
622 SO_RCVBUF,
623 SockOptValue::from(new_rcvbuf.as_bytes().to_vec()),
624 )
625 .unwrap();
626
627 let rcvbuf =
628 socket_obj.getsockopt(locked, ¤t_task, SOL_SOCKET, SO_RCVBUF, 4).unwrap();
629 let rcvbuf_val = u32::from_ne_bytes(rcvbuf.as_slice().try_into().unwrap());
630 assert_eq!(rcvbuf_val, new_rcvbuf * 2);
632 })
633 .await;
634 }
635
636 #[::fuchsia::test]
637 async fn test_qipcrtr_sockname() {
638 let (socket_inner, server_end) = mock_qipcrtr_socket();
639 std::thread::spawn(move || {
641 let mut executor = fuchsia_async::LocalExecutor::default();
642 executor.run_singlethreaded(async move {
643 let mut stream = server_end.into_stream();
644 while let Some(Ok(request)) = stream.next().await {
645 match request {
646 fqrtr::QrtrClientConnectionRequest::GetNodeId { responder, .. } => {
647 let _ = responder.send(123).unwrap();
648 }
649 fqrtr::QrtrClientConnectionRequest::GetPortId { responder, .. } => {
650 let _ = responder.send(456).unwrap();
651 }
652 fqrtr::QrtrClientConnectionRequest::CloseConnection {
653 responder, ..
654 } => {
655 let _ = responder.send();
656 }
657 _ => panic!("Unexpected request: {:?}", request),
658 }
659 }
660 });
661 });
662
663 spawn_kernel_and_run(async |locked, _current_task| {
664 let socket_obj = Socket::new_with_ops_and_info(
665 Box::new(socket_inner),
666 SocketDomain::Qipcrtr,
667 SocketType::Datagram,
668 SocketProtocol::default(),
669 );
670
671 let addr = socket_obj.getsockname(locked).unwrap();
672 let qrtr_addr = extract_qrtr_sockaddr(&addr).unwrap();
673 assert_eq!(qrtr_addr.sq_node, 123);
674 assert_eq!(qrtr_addr.sq_port, 456);
675
676 let peer_addr = sockaddr_qrtr {
678 sq_family: AF_QIPCRTR,
679 sq_node: 10,
680 sq_port: 20,
681 ..Default::default()
682 };
683 socket_obj
684 .downcast_socket::<QipcrtrSocket>()
685 .unwrap()
686 .inner
687 .lock()
688 .as_mut()
689 .unwrap()
690 .peer = Some(peer_addr);
691
692 let peer = socket_obj.getpeername(locked).unwrap();
693 let peer_qrtr = extract_qrtr_sockaddr(&peer).unwrap();
694 assert_eq!(peer_qrtr.sq_node, 10);
695 assert_eq!(peer_qrtr.sq_port, 20);
696 })
697 .await;
698 }
699
700 #[::fuchsia::test]
701 async fn test_qipcrtr_read_write() {
702 let (socket_inner, server_end) = mock_qipcrtr_socket();
703 std::thread::spawn(move || {
704 let mut executor = fuchsia_async::LocalExecutor::default();
705 executor.run_singlethreaded(async move {
706 let mut stream = server_end.into_stream();
707 while let Some(Ok(request)) = stream.next().await {
708 match request {
709 fqrtr::QrtrClientConnectionRequest::Write {
710 dst_node_id,
711 dst_port,
712 data,
713 responder,
714 ..
715 } => {
716 assert_eq!(dst_node_id, 10);
717 assert_eq!(dst_port, 20);
718 assert_eq!(data, b"hello");
719 let _ = responder.send(Ok(())).unwrap();
720 }
721 fqrtr::QrtrClientConnectionRequest::Read { responder, .. } => {
722 let _ = responder.send(Ok((5, 15, b"world"))).unwrap();
723 }
724 fqrtr::QrtrClientConnectionRequest::CloseConnection {
725 responder, ..
726 } => {
727 let _ = responder.send();
728 }
729 _ => panic!("Unexpected request: {:?}", request),
730 }
731 }
732 });
733 });
734
735 spawn_kernel_and_run(async |locked, current_task| {
736 let socket_obj = Socket::new_with_ops_and_info(
737 Box::new(socket_inner),
738 SocketDomain::Qipcrtr,
739 SocketType::Datagram,
740 SocketProtocol::default(),
741 );
742 let peer_addr = sockaddr_qrtr {
744 sq_family: AF_QIPCRTR,
745 sq_node: 10,
746 sq_port: 20,
747 ..Default::default()
748 };
749 socket_obj
750 .downcast_socket::<QipcrtrSocket>()
751 .unwrap()
752 .inner
753 .lock()
754 .as_mut()
755 .unwrap()
756 .peer = Some(peer_addr);
757
758 let mut input = VecInputBuffer::new(b"hello");
760 let written = socket_obj
761 .write(locked, ¤t_task, &mut input, &mut None, &mut vec![])
762 .unwrap();
763 assert_eq!(written, 5);
764
765 let mut output = VecOutputBuffer::new(100);
767 let info = socket_obj
768 .read(locked, ¤t_task, &mut output, SocketMessageFlags::empty())
769 .unwrap();
770 assert_eq!(info.bytes_read, 5);
771 assert_eq!(output.data(), b"world");
772
773 let addr = extract_qrtr_sockaddr(&info.address.unwrap()).unwrap();
774 assert_eq!(addr.sq_node, 5);
775 assert_eq!(addr.sq_port, 15);
776 })
777 .await;
778 }
779}