1use crate::task::{
6 CurrentTask, EventHandler, SignalHandler, SignalHandlerInner, WaitCanceler, Waiter,
7};
8use crate::vfs::buffers::{AncillaryData, InputBuffer, MessageReadInfo, OutputBuffer};
9use crate::vfs::socket::{
10 SockOptValue, Socket, SocketAddress, SocketHandle, SocketMessageFlags, SocketOps, SocketPeer,
11 SocketShutdownFlags, SocketType,
12};
13use fidl::endpoints::create_sync_proxy;
14use fidl_fuchsia_hardware_qualcomm_router as fqrtr;
15use starnix_logging::{log_warn, track_stub};
16use starnix_sync::{FileOpsCore, Locked, MappedMutexGuard, Mutex, MutexGuard};
17use starnix_uapi::errors::{Errno, from_status_like_fdio};
18use starnix_uapi::vfs::FdEvents;
19use starnix_uapi::{
20 AF_QIPCRTR, SO_RCVBUF, SO_SNDBUF, SOL_SOCKET, errno, error, sockaddr_qrtr, socklen_t, ucred,
21};
22use zerocopy::{FromBytes, IntoBytes};
23
24pub const SEND_BUF_MIN_SIZE: usize = 2048;
26pub const SEND_BUF_MAX_SIZE: usize = 1 << 31;
27pub const SEND_BUF_DEFAULT_SIZE: usize = 2048;
28
29pub const RECV_BUF_MIN_SIZE: usize = 256;
31pub const RECV_BUF_MAX_SIZE: usize = 1 << 31;
32pub const RECV_BUF_DEFAULT_SIZE: usize = 256;
33
34pub struct QipcrtrSocket {
35 inner: Mutex<Option<QipcrtrSocketInner>>,
36}
37
38struct QipcrtrSocketInner {
39 proxy: fqrtr::QrtrClientConnectionSynchronousProxy,
41
42 events: zx::EventPair,
44
45 peer: Option<sockaddr_qrtr>,
48
49 send_buf_size: usize,
55
56 recv_buf_size: usize,
62}
63
64impl QipcrtrSocket {
65 pub fn new(_socket_type: SocketType) -> Self {
66 Self { inner: Mutex::new(None) }
67 }
68
69 fn connecting_lock(&self) -> Result<MappedMutexGuard<'_, QipcrtrSocketInner>, Errno> {
72 let mut inner = self.inner.lock();
73 if inner.is_none() {
74 *inner = Some(QipcrtrSocketInner::new(fqrtr::ConnectionOptions {
75 blocking: Some(false),
76 ..Default::default()
77 })?);
78 }
79 Ok(MutexGuard::map(inner, |inner| inner.as_mut().unwrap()))
80 }
81
82 fn close(&self) {
83 *self.inner.lock() = None;
84 }
85}
86
87impl QipcrtrSocketInner {
88 fn new(options: fqrtr::ConnectionOptions) -> Result<Self, Errno> {
89 let connector =
90 fuchsia_component::client::connect_to_protocol_sync::<fqrtr::QrtrConnectorMarker>()
91 .map_err(|e| errno!(ENETUNREACH, e))?;
92
93 let (client_end, server_end) = create_sync_proxy::<fqrtr::QrtrClientConnectionMarker>();
94 connector
95 .get_connection(&options, server_end, zx::MonotonicInstant::INFINITE)
96 .map_err(|e| errno!(ENETUNREACH, e))?
97 .map_err(qrtr_error_to_errno)?;
98
99 let proxy = fqrtr::QrtrClientConnectionSynchronousProxy::new(client_end.into_channel());
100 let events = proxy
101 .get_signals(zx::MonotonicInstant::INFINITE)
102 .map_err(|e| errno!(ENETUNREACH, e))?;
103
104 Ok(Self {
105 proxy,
106 events,
107 peer: None,
108 send_buf_size: SEND_BUF_DEFAULT_SIZE,
109 recv_buf_size: RECV_BUF_DEFAULT_SIZE,
110 })
111 }
112
113 fn bound_addr(&self) -> Result<sockaddr_qrtr, Errno> {
115 let addr = sockaddr_qrtr {
116 sq_family: AF_QIPCRTR,
117 sq_node: self
118 .proxy
119 .get_node_id(zx::MonotonicInstant::INFINITE)
120 .map_err(|e| errno!(EINVAL, e))?,
121 sq_port: self
122 .proxy
123 .get_port_id(zx::MonotonicInstant::INFINITE)
124 .map_err(|e| errno!(EINVAL, e))?,
125 ..Default::default()
126 };
127 Ok(addr)
128 }
129}
130
131impl Drop for QipcrtrSocketInner {
132 fn drop(&mut self) {
133 if let Err(e) = self.proxy.close_connection(zx::MonotonicInstant::INFINITE) {
134 log_warn!("Failed to close QRTR connection: {e:?}");
135 }
136 }
137}
138
139impl SocketOps for QipcrtrSocket {
140 fn connect(
141 &self,
142 _locked: &mut Locked<FileOpsCore>,
143 _socket: &SocketHandle,
144 _current_task: &CurrentTask,
145 peer: SocketPeer,
146 ) -> Result<(), Errno> {
147 let peer = match peer {
148 SocketPeer::Address(addr) => extract_qrtr_sockaddr(&addr)?,
149 _ => {
150 return error!(EINVAL);
151 }
152 };
153
154 let mut inner = self.inner.lock();
155 if inner.is_some() {
156 return error!(EISCONN);
157 }
158
159 let mut new_inner = QipcrtrSocketInner::new(fqrtr::ConnectionOptions {
162 blocking: Some(false),
163 ..Default::default()
164 })?;
165 new_inner.peer = Some(peer);
166
167 *inner = Some(new_inner);
168 Ok(())
169 }
170
171 fn listen(
172 &self,
173 _locked: &mut Locked<FileOpsCore>,
174 _socket: &Socket,
175 _backlog: i32,
176 _credentials: ucred,
177 ) -> Result<(), Errno> {
178 error!(ENOTSUP)
179 }
180
181 fn accept(
182 &self,
183 _locked: &mut Locked<FileOpsCore>,
184 _socket: &Socket,
185 _current_task: &CurrentTask,
186 ) -> Result<SocketHandle, Errno> {
187 error!(ENOTSUP)
188 }
189
190 fn bind(
191 &self,
192 _locked: &mut Locked<FileOpsCore>,
193 _socket: &Socket,
194 _current_task: &CurrentTask,
195 socket_address: SocketAddress,
196 ) -> Result<(), Errno> {
197 let addr = extract_qrtr_sockaddr(&socket_address)?;
198
199 let mut inner = self.inner.lock();
200 if inner.is_some() {
201 return error!(EINVAL);
202 }
203
204 *inner = Some(QipcrtrSocketInner::new(fqrtr::ConnectionOptions {
206 blocking: Some(false),
207 port: Some(addr.sq_port),
208 ..Default::default()
209 })?);
210
211 Ok(())
212 }
213
214 fn read(
215 &self,
216 _locked: &mut Locked<FileOpsCore>,
217 _socket: &Socket,
218 _current_task: &CurrentTask,
219 data: &mut dyn OutputBuffer,
220 flags: SocketMessageFlags,
221 ) -> Result<MessageReadInfo, Errno> {
222 if flags.contains(SocketMessageFlags::PEEK) {
223 track_stub!(
224 TODO("https://fxbug.dev/388082019"),
225 "SocketMessageFlags::PEEK is unsupported"
226 );
227 return error!(EINVAL);
228 }
229
230 let inner = self.connecting_lock()?;
231
232 if flags.contains(SocketMessageFlags::DONTWAIT) {
233 match inner.events.wait_one(
234 zx::Signals::from_bits_truncate(fqrtr::SIGNAL_READABLE)
235 | zx::Signals::EVENTPAIR_PEER_CLOSED,
236 zx::MonotonicInstant::INFINITE_PAST,
237 ) {
238 zx::WaitResult::Ok(_) => {}
239 zx::WaitResult::TimedOut(_) | zx::WaitResult::Canceled(_) => return error!(EAGAIN),
240 zx::WaitResult::Err(status) => return Err(from_status_like_fdio!(status)),
241 }
242 }
243
244 let (src_node, src_port, src_data) = inner
245 .proxy
246 .read(zx::MonotonicInstant::INFINITE)
247 .map_err(|e| errno!(ECONNRESET, e))?
248 .map_err(qrtr_error_to_errno)?;
249
250 let bytes_read = data.write(src_data.as_bytes())?;
251 Ok(MessageReadInfo {
252 bytes_read,
253 message_length: src_data.len(),
254 address: Some(pack_qrtr_sockaddr(src_node, src_port)),
255 ..Default::default()
256 })
257 }
258
259 fn write(
260 &self,
261 _locked: &mut Locked<FileOpsCore>,
262 _socket: &Socket,
263 _current_task: &CurrentTask,
264 data: &mut dyn InputBuffer,
265 dest_address: &mut Option<SocketAddress>,
266 _ancillary_data: &mut Vec<AncillaryData>,
267 ) -> Result<usize, Errno> {
268 let inner = self.connecting_lock()?;
269
270 let dest = match dest_address {
273 Some(addr) => extract_qrtr_sockaddr(addr)?,
274 None => inner.peer.ok_or_else(|| errno!(EDESTADDRREQ))?,
275 };
276
277 match inner.events.wait_one(
278 zx::Signals::from_bits_truncate(fqrtr::SIGNAL_WRITABLE)
279 | zx::Signals::EVENTPAIR_PEER_CLOSED,
280 zx::MonotonicInstant::INFINITE_PAST,
281 ) {
282 zx::WaitResult::Ok(_) => {}
283 zx::WaitResult::TimedOut(_) | zx::WaitResult::Canceled(_) => return error!(EAGAIN),
284 zx::WaitResult::Err(status) => return Err(from_status_like_fdio!(status)),
285 }
286
287 let data_written = data.read_all()?;
288 let _ = inner
289 .proxy
290 .write(
291 dest.sq_node,
292 dest.sq_port,
293 data_written.as_ref(),
294 zx::MonotonicInstant::INFINITE,
295 )
296 .map_err(|e| errno!(ECONNRESET, e))?
297 .map_err(qrtr_error_to_errno)?;
298 Ok(data_written.len())
299 }
300
301 fn wait_async(
302 &self,
303 _locked: &mut Locked<FileOpsCore>,
304 _socket: &Socket,
305 _current_task: &CurrentTask,
306 waiter: &Waiter,
307 events: FdEvents,
308 handler: EventHandler,
309 ) -> WaitCanceler {
310 let Ok(inner) = self.connecting_lock() else {
311 return WaitCanceler::new_noop();
312 };
313 let signal_handler = SignalHandler {
314 inner: SignalHandlerInner::ZxHandle(qrtr_signals_to_fd_events),
315 event_handler: handler,
316 err_code: None,
317 };
318 let canceler = waiter
319 .wake_on_zircon_signals(
320 &inner.events,
321 fd_events_to_qrtr_signals(events),
322 signal_handler,
323 )
324 .unwrap();
325 WaitCanceler::new_port(canceler)
326 }
327
328 fn query_events(
329 &self,
330 _locked: &mut Locked<FileOpsCore>,
331 _socket: &Socket,
332 _current_task: &CurrentTask,
333 ) -> Result<FdEvents, Errno> {
334 let inner = self.connecting_lock()?;
335 let signals = inner
336 .events
337 .as_handle_ref()
338 .wait_one(zx::Signals::all(), zx::MonotonicInstant::INFINITE_PAST)
339 .map_err(|e| from_status_like_fdio!(e))?;
340 Ok(qrtr_signals_to_fd_events(signals))
341 }
342
343 fn shutdown(
344 &self,
345 _locked: &mut Locked<FileOpsCore>,
346 _socket: &Socket,
347 _how: SocketShutdownFlags,
348 ) -> Result<(), Errno> {
349 self.close();
350 Ok(())
351 }
352
353 fn close(
354 &self,
355 _locked: &mut Locked<FileOpsCore>,
356 _current_task: &CurrentTask,
357 _socket: &Socket,
358 ) {
359 self.close();
360 }
361
362 fn getsockname(
363 &self,
364 _locked: &mut Locked<FileOpsCore>,
365 _socket: &Socket,
366 ) -> Result<SocketAddress, Errno> {
367 let name = self.connecting_lock()?.bound_addr()?;
368 Ok(SocketAddress::Qipcrtr(name.as_bytes().to_vec()))
369 }
370
371 fn getpeername(
372 &self,
373 _locked: &mut Locked<FileOpsCore>,
374 _socket: &Socket,
375 ) -> Result<SocketAddress, Errno> {
376 let peer = self.connecting_lock()?.peer.ok_or_else(|| errno!(ENOTCONN))?;
377 Ok(SocketAddress::Qipcrtr(peer.as_bytes().to_vec()))
378 }
379
380 fn setsockopt(
381 &self,
382 _locked: &mut Locked<FileOpsCore>,
383 _socket: &Socket,
384 current_task: &CurrentTask,
385 level: u32,
386 optname: u32,
387 optval: SockOptValue,
388 ) -> Result<(), Errno> {
389 let mut inner = self.connecting_lock()?;
390 match level {
391 SOL_SOCKET => match optname {
392 SO_SNDBUF => {
393 let requested_capacity: socklen_t = optval.read(current_task)?;
394 let capacity = usize::try_from(requested_capacity * 2).unwrap_or(usize::MAX);
397 let capacity = capacity.clamp(SEND_BUF_MIN_SIZE, SEND_BUF_MAX_SIZE);
399 inner.send_buf_size = capacity;
400 }
401 SO_RCVBUF => {
402 let requested_capacity: socklen_t = optval.read(current_task)?;
403 let capacity = usize::try_from(requested_capacity * 2).unwrap_or(usize::MAX);
406 let capacity = capacity.clamp(RECV_BUF_MIN_SIZE, RECV_BUF_MAX_SIZE);
408 inner.recv_buf_size = capacity;
409 }
410 _ => return error!(ENOSYS),
411 },
412 _ => return error!(ENOSYS),
413 }
414
415 Ok(())
416 }
417
418 fn getsockopt(
419 &self,
420 _locked: &mut Locked<FileOpsCore>,
421 _socket: &Socket,
422 _current_task: &CurrentTask,
423 level: u32,
424 optname: u32,
425 _optlen: u32,
426 ) -> Result<Vec<u8>, Errno> {
427 let inner = self.connecting_lock()?;
428 Ok(match level {
429 SOL_SOCKET => match optname {
430 SO_SNDBUF => (inner.send_buf_size as socklen_t).to_ne_bytes().to_vec(),
431 SO_RCVBUF => (inner.recv_buf_size as socklen_t).to_ne_bytes().to_vec(),
432 _ => return error!(ENOSYS),
433 },
434 _ => vec![],
435 })
436 }
437}
438
439fn extract_qrtr_sockaddr(addr: &SocketAddress) -> Result<sockaddr_qrtr, Errno> {
442 match addr {
443 SocketAddress::Qipcrtr(bytes) => sockaddr_qrtr::read_from_prefix(bytes.as_bytes())
444 .map(|(addr, _)| addr)
445 .map_err(|e| errno!(EINVAL, e)),
446 _ => error!(EINVAL),
447 }
448}
449
450fn pack_qrtr_sockaddr(node: u32, port: u32) -> SocketAddress {
452 let addr =
453 sockaddr_qrtr { sq_family: AF_QIPCRTR, sq_node: node, sq_port: port, ..Default::default() };
454 SocketAddress::Qipcrtr(addr.as_bytes().into())
455}
456
457fn qrtr_error_to_errno(e: fqrtr::Error) -> Errno {
459 match e {
460 fqrtr::Error::InternalError => errno!(EINVAL),
461 fqrtr::Error::AlreadyPending => errno!(EBUSY),
462 fqrtr::Error::RemoteNodeUnavailable => errno!(ECONNRESET),
463 fqrtr::Error::AlreadyBound => errno!(EADDRINUSE),
464 fqrtr::Error::NotSupported => errno!(ENOTSUP),
465 fqrtr::Error::WouldBlock => errno!(EAGAIN),
466 fqrtr::Error::NoResources => errno!(ENOMEM),
467 fqrtr::Error::InvalidArgs => errno!(EINVAL),
468 _ => errno!(EINVAL),
469 }
470}
471
472fn fd_events_to_qrtr_signals(events: FdEvents) -> zx::Signals {
474 let mut signals = zx::Signals::empty();
475 if events.contains(FdEvents::POLLIN) {
476 signals |= zx::Signals::from_bits_truncate(fqrtr::SIGNAL_READABLE);
477 }
478 if events.contains(FdEvents::POLLOUT) {
479 signals |= zx::Signals::from_bits_truncate(fqrtr::SIGNAL_WRITABLE);
480 }
481
482 signals |= zx::Signals::EVENTPAIR_PEER_CLOSED;
484 signals
485}
486
487fn qrtr_signals_to_fd_events(signals: zx::Signals) -> FdEvents {
489 let mut events = FdEvents::empty();
490 if signals.contains(zx::Signals::from_bits_truncate(fqrtr::SIGNAL_READABLE)) {
491 events |= FdEvents::POLLIN;
492 }
493 if signals.contains(zx::Signals::from_bits_truncate(fqrtr::SIGNAL_WRITABLE)) {
494 events |= FdEvents::POLLOUT;
495 }
496 if signals.contains(zx::Signals::EVENTPAIR_PEER_CLOSED) {
497 events |= FdEvents::POLLHUP;
498 }
499 events
500}
501
502#[cfg(test)]
503mod tests {
504 use super::*;
505 use crate::testing::spawn_kernel_and_run;
506 use crate::vfs::buffers::{VecInputBuffer, VecOutputBuffer};
507 use crate::vfs::socket::{SocketDomain, SocketProtocol, SocketType};
508 use fidl::endpoints::create_sync_proxy;
509 use futures::StreamExt;
510
511 fn mock_qipcrtr_socket()
516 -> (QipcrtrSocket, fidl::endpoints::ServerEnd<fqrtr::QrtrClientConnectionMarker>) {
517 let (proxy, server_end) = create_sync_proxy::<fqrtr::QrtrClientConnectionMarker>();
518
519 let (events, _) = zx::EventPair::create();
521
522 let inner = QipcrtrSocketInner {
523 proxy,
524 events,
525 peer: None,
526 send_buf_size: SEND_BUF_DEFAULT_SIZE,
527 recv_buf_size: RECV_BUF_DEFAULT_SIZE,
528 };
529
530 (QipcrtrSocket { inner: Mutex::new(Some(inner)) }, server_end)
531 }
532
533 #[::fuchsia::test]
534 async fn test_qipcrtr_socket_new() {
535 spawn_kernel_and_run(async |locked, current_task| {
536 let _kernel = current_task.kernel();
537 let _socket = Socket::new(
544 locked,
545 ¤t_task,
546 SocketDomain::Qipcrtr,
547 SocketType::Datagram,
548 SocketProtocol::default(),
549 false,
550 )
551 .expect("Failed to create socket.");
552 })
553 .await;
554 }
555
556 #[::fuchsia::test]
557 async fn test_qipcrtr_sockopt() {
558 spawn_kernel_and_run(async |locked, current_task| {
559 let socket = mock_qipcrtr_socket();
560 let socket_obj = Socket::new_with_ops_and_info(
561 Box::new(socket.0),
562 SocketDomain::Qipcrtr,
563 SocketType::Datagram,
564 SocketProtocol::default(),
565 );
566 let _server_end = socket.1;
567
568 let sndbuf =
570 socket_obj.getsockopt(locked, ¤t_task, SOL_SOCKET, SO_SNDBUF, 4).unwrap();
571 let sndbuf_val = u32::from_ne_bytes(sndbuf.as_slice().try_into().unwrap());
572 assert_eq!(sndbuf_val, SEND_BUF_DEFAULT_SIZE as u32);
573
574 let new_sndbuf: u32 = 4096;
575 socket_obj
576 .setsockopt(
577 locked,
578 ¤t_task,
579 SOL_SOCKET,
580 SO_SNDBUF,
581 SockOptValue::from(new_sndbuf.as_bytes().to_vec()),
582 )
583 .unwrap();
584
585 let sndbuf =
586 socket_obj.getsockopt(locked, ¤t_task, SOL_SOCKET, SO_SNDBUF, 4).unwrap();
587 let sndbuf_val = u32::from_ne_bytes(sndbuf.as_slice().try_into().unwrap());
588 assert_eq!(sndbuf_val, new_sndbuf * 2);
590
591 let rcvbuf =
593 socket_obj.getsockopt(locked, ¤t_task, SOL_SOCKET, SO_RCVBUF, 4).unwrap();
594 let rcvbuf_val = u32::from_ne_bytes(rcvbuf.as_slice().try_into().unwrap());
595 assert_eq!(rcvbuf_val, RECV_BUF_DEFAULT_SIZE as u32);
596
597 let new_rcvbuf: u32 = 1024;
598 socket_obj
599 .setsockopt(
600 locked,
601 ¤t_task,
602 SOL_SOCKET,
603 SO_RCVBUF,
604 SockOptValue::from(new_rcvbuf.as_bytes().to_vec()),
605 )
606 .unwrap();
607
608 let rcvbuf =
609 socket_obj.getsockopt(locked, ¤t_task, SOL_SOCKET, SO_RCVBUF, 4).unwrap();
610 let rcvbuf_val = u32::from_ne_bytes(rcvbuf.as_slice().try_into().unwrap());
611 assert_eq!(rcvbuf_val, new_rcvbuf * 2);
613 })
614 .await;
615 }
616
617 #[::fuchsia::test]
618 async fn test_qipcrtr_sockname() {
619 let (socket_inner, server_end) = mock_qipcrtr_socket();
620 std::thread::spawn(move || {
622 let mut executor = fuchsia_async::LocalExecutor::default();
623 executor.run_singlethreaded(async move {
624 let mut stream = server_end.into_stream();
625 while let Some(Ok(request)) = stream.next().await {
626 match request {
627 fqrtr::QrtrClientConnectionRequest::GetNodeId { responder, .. } => {
628 let _ = responder.send(123).unwrap();
629 }
630 fqrtr::QrtrClientConnectionRequest::GetPortId { responder, .. } => {
631 let _ = responder.send(456).unwrap();
632 }
633 fqrtr::QrtrClientConnectionRequest::CloseConnection {
634 responder, ..
635 } => {
636 let _ = responder.send();
637 }
638 _ => panic!("Unexpected request: {:?}", request),
639 }
640 }
641 });
642 });
643
644 spawn_kernel_and_run(async |locked, _current_task| {
645 let socket_obj = Socket::new_with_ops_and_info(
646 Box::new(socket_inner),
647 SocketDomain::Qipcrtr,
648 SocketType::Datagram,
649 SocketProtocol::default(),
650 );
651
652 let addr = socket_obj.getsockname(locked).unwrap();
653 let qrtr_addr = extract_qrtr_sockaddr(&addr).unwrap();
654 assert_eq!(qrtr_addr.sq_node, 123);
655 assert_eq!(qrtr_addr.sq_port, 456);
656
657 let peer_addr = sockaddr_qrtr {
659 sq_family: AF_QIPCRTR,
660 sq_node: 10,
661 sq_port: 20,
662 ..Default::default()
663 };
664 socket_obj
665 .downcast_socket::<QipcrtrSocket>()
666 .unwrap()
667 .inner
668 .lock()
669 .as_mut()
670 .unwrap()
671 .peer = Some(peer_addr);
672
673 let peer = socket_obj.getpeername(locked).unwrap();
674 let peer_qrtr = extract_qrtr_sockaddr(&peer).unwrap();
675 assert_eq!(peer_qrtr.sq_node, 10);
676 assert_eq!(peer_qrtr.sq_port, 20);
677 })
678 .await;
679 }
680
681 #[::fuchsia::test]
682 async fn test_qipcrtr_read_write() {
683 let (socket_inner, server_end) = mock_qipcrtr_socket();
684 std::thread::spawn(move || {
685 let mut executor = fuchsia_async::LocalExecutor::default();
686 executor.run_singlethreaded(async move {
687 let mut stream = server_end.into_stream();
688 while let Some(Ok(request)) = stream.next().await {
689 match request {
690 fqrtr::QrtrClientConnectionRequest::Write {
691 dst_node_id,
692 dst_port,
693 data,
694 responder,
695 ..
696 } => {
697 assert_eq!(dst_node_id, 10);
698 assert_eq!(dst_port, 20);
699 assert_eq!(data, b"hello");
700 let _ = responder.send(Ok(())).unwrap();
701 }
702 fqrtr::QrtrClientConnectionRequest::Read { responder, .. } => {
703 let _ = responder.send(Ok((5, 15, b"world"))).unwrap();
704 }
705 fqrtr::QrtrClientConnectionRequest::CloseConnection {
706 responder, ..
707 } => {
708 let _ = responder.send();
709 }
710 _ => panic!("Unexpected request: {:?}", request),
711 }
712 }
713 });
714 });
715
716 spawn_kernel_and_run(async |locked, current_task| {
717 let socket_obj = Socket::new_with_ops_and_info(
718 Box::new(socket_inner),
719 SocketDomain::Qipcrtr,
720 SocketType::Datagram,
721 SocketProtocol::default(),
722 );
723 let peer_addr = sockaddr_qrtr {
725 sq_family: AF_QIPCRTR,
726 sq_node: 10,
727 sq_port: 20,
728 ..Default::default()
729 };
730 socket_obj
731 .downcast_socket::<QipcrtrSocket>()
732 .unwrap()
733 .inner
734 .lock()
735 .as_mut()
736 .unwrap()
737 .peer = Some(peer_addr);
738
739 let mut input = VecInputBuffer::new(b"hello");
741 let written = socket_obj
742 .write(locked, ¤t_task, &mut input, &mut None, &mut vec![])
743 .unwrap();
744 assert_eq!(written, 5);
745
746 let mut output = VecOutputBuffer::new(100);
748 let info = socket_obj
749 .read(locked, ¤t_task, &mut output, SocketMessageFlags::empty())
750 .unwrap();
751 assert_eq!(info.bytes_read, 5);
752 assert_eq!(output.data(), b"world");
753
754 let addr = extract_qrtr_sockaddr(&info.address.unwrap()).unwrap();
755 assert_eq!(addr.sq_node, 5);
756 assert_eq!(addr.sq_port, 15);
757 })
758 .await;
759 }
760}