starnix_core/vfs/socket/
socket_qipcrtr.rs

1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::task::{
6    CurrentTask, EventHandler, SignalHandler, SignalHandlerInner, WaitCanceler, Waiter,
7};
8use crate::vfs::buffers::{AncillaryData, InputBuffer, MessageReadInfo, OutputBuffer};
9use crate::vfs::socket::{
10    SockOptValue, Socket, SocketAddress, SocketHandle, SocketMessageFlags, SocketOps, SocketPeer,
11    SocketShutdownFlags, SocketType,
12};
13use fidl::endpoints::create_sync_proxy;
14use fidl_fuchsia_hardware_qualcomm_router as fqrtr;
15use starnix_logging::{log_warn, track_stub};
16use starnix_sync::{FileOpsCore, Locked, MappedMutexGuard, Mutex, MutexGuard};
17use starnix_uapi::errors::{Errno, from_status_like_fdio};
18use starnix_uapi::vfs::FdEvents;
19use starnix_uapi::{
20    AF_QIPCRTR, SO_RCVBUF, SO_SNDBUF, SOL_SOCKET, errno, error, sockaddr_qrtr, socklen_t, ucred,
21};
22use zerocopy::{FromBytes, IntoBytes};
23
24// From socket(7).
25pub const SEND_BUF_MIN_SIZE: usize = 2048;
26pub const SEND_BUF_MAX_SIZE: usize = 1 << 31;
27pub const SEND_BUF_DEFAULT_SIZE: usize = 2048;
28
29// From socket(7).
30pub const RECV_BUF_MIN_SIZE: usize = 256;
31pub const RECV_BUF_MAX_SIZE: usize = 1 << 31;
32pub const RECV_BUF_DEFAULT_SIZE: usize = 256;
33
34pub struct QipcrtrSocket {
35    inner: Mutex<Option<QipcrtrSocketInner>>,
36}
37
38struct QipcrtrSocketInner {
39    /// The proxy representing the socket in the QRTR driver.
40    proxy: fqrtr::QrtrClientConnectionSynchronousProxy,
41
42    /// The event pair representing the readable and writable signals.
43    events: zx::EventPair,
44
45    /// The peer for a connected socket, which is the default address to send messages to when no
46    /// destination is given.
47    peer: Option<sockaddr_qrtr>,
48
49    /// The socket's send buffer size.
50    ///
51    /// This value is only used to serve getsockopt calls for `SO_SNDBUF`. It does not yet enforce
52    /// a limit on the buffer size.
53    /// TODO(https://fxbug.dev/478337980): Limit the size of the send buffer.
54    send_buf_size: usize,
55
56    /// The socket's receive buffer size.
57    ///
58    /// This value is only used to serve getsockopt calls for `SO_RCVBUF`. It does not yet enforce
59    /// a limit on the buffer size.
60    /// TODO(https://fxbug.dev/478337980): Limit the size of the receive buffer.
61    recv_buf_size: usize,
62}
63
64impl QipcrtrSocket {
65    pub fn new(_socket_type: SocketType) -> Self {
66        Self { inner: Mutex::new(None) }
67    }
68
69    /// Locks and returns the inner state of the socket. If the socket is not connected to the
70    /// driver, a connection will be established, binding to an ephemeral port number.
71    fn connecting_lock(&self) -> Result<MappedMutexGuard<'_, QipcrtrSocketInner>, Errno> {
72        let mut inner = self.inner.lock();
73        if inner.is_none() {
74            *inner = Some(QipcrtrSocketInner::new(fqrtr::ConnectionOptions {
75                blocking: Some(false),
76                ..Default::default()
77            })?);
78        }
79        Ok(MutexGuard::map(inner, |inner| inner.as_mut().unwrap()))
80    }
81
82    fn close(&self) {
83        *self.inner.lock() = None;
84    }
85}
86
87impl QipcrtrSocketInner {
88    fn new(options: fqrtr::ConnectionOptions) -> Result<Self, Errno> {
89        let connector =
90            fuchsia_component::client::connect_to_protocol_sync::<fqrtr::QrtrConnectorMarker>()
91                .map_err(|e| errno!(ENETUNREACH, e))?;
92
93        let (client_end, server_end) = create_sync_proxy::<fqrtr::QrtrClientConnectionMarker>();
94        connector
95            .get_connection(&options, server_end, zx::MonotonicInstant::INFINITE)
96            .map_err(|e| errno!(ENETUNREACH, e))?
97            .map_err(qrtr_error_to_errno)?;
98
99        let proxy = fqrtr::QrtrClientConnectionSynchronousProxy::new(client_end.into_channel());
100        let events = proxy
101            .get_signals(zx::MonotonicInstant::INFINITE)
102            .map_err(|e| errno!(ENETUNREACH, e))?;
103
104        Ok(Self {
105            proxy,
106            events,
107            peer: None,
108            send_buf_size: SEND_BUF_DEFAULT_SIZE,
109            recv_buf_size: RECV_BUF_DEFAULT_SIZE,
110        })
111    }
112
113    /// Returns the [`sockaddr_qrtr`] of this connection.
114    fn bound_addr(&self) -> Result<sockaddr_qrtr, Errno> {
115        let addr = sockaddr_qrtr {
116            sq_family: AF_QIPCRTR,
117            sq_node: self
118                .proxy
119                .get_node_id(zx::MonotonicInstant::INFINITE)
120                .map_err(|e| errno!(EINVAL, e))?,
121            sq_port: self
122                .proxy
123                .get_port_id(zx::MonotonicInstant::INFINITE)
124                .map_err(|e| errno!(EINVAL, e))?,
125            ..Default::default()
126        };
127        Ok(addr)
128    }
129}
130
131impl Drop for QipcrtrSocketInner {
132    fn drop(&mut self) {
133        if let Err(e) = self.proxy.close_connection(zx::MonotonicInstant::INFINITE) {
134            log_warn!("Failed to close QRTR connection: {e:?}");
135        }
136    }
137}
138
139impl SocketOps for QipcrtrSocket {
140    fn connect(
141        &self,
142        _locked: &mut Locked<FileOpsCore>,
143        _socket: &SocketHandle,
144        _current_task: &CurrentTask,
145        peer: SocketPeer,
146    ) -> Result<(), Errno> {
147        let peer = match peer {
148            SocketPeer::Address(addr) => extract_qrtr_sockaddr(&addr)?,
149            _ => {
150                return error!(EINVAL);
151            }
152        };
153
154        let mut inner = self.inner.lock();
155        if inner.is_some() {
156            return error!(EISCONN);
157        }
158
159        // Establish a connection without a specific port number. The driver will automatically
160        // assign one, resulting in a bound socket.
161        let mut new_inner = QipcrtrSocketInner::new(fqrtr::ConnectionOptions {
162            blocking: Some(false),
163            ..Default::default()
164        })?;
165        new_inner.peer = Some(peer);
166
167        *inner = Some(new_inner);
168        Ok(())
169    }
170
171    fn listen(
172        &self,
173        _locked: &mut Locked<FileOpsCore>,
174        _socket: &Socket,
175        _backlog: i32,
176        _credentials: ucred,
177    ) -> Result<(), Errno> {
178        error!(ENOTSUP)
179    }
180
181    fn accept(
182        &self,
183        _locked: &mut Locked<FileOpsCore>,
184        _socket: &Socket,
185        _current_task: &CurrentTask,
186    ) -> Result<SocketHandle, Errno> {
187        error!(ENOTSUP)
188    }
189
190    fn bind(
191        &self,
192        _locked: &mut Locked<FileOpsCore>,
193        _socket: &Socket,
194        _current_task: &CurrentTask,
195        socket_address: SocketAddress,
196    ) -> Result<(), Errno> {
197        let addr = extract_qrtr_sockaddr(&socket_address)?;
198
199        let mut inner = self.inner.lock();
200        if inner.is_some() {
201            return error!(EINVAL);
202        }
203
204        // Establish a connection with the specified port number.
205        *inner = Some(QipcrtrSocketInner::new(fqrtr::ConnectionOptions {
206            blocking: Some(false),
207            port: Some(addr.sq_port),
208            ..Default::default()
209        })?);
210
211        Ok(())
212    }
213
214    fn read(
215        &self,
216        _locked: &mut Locked<FileOpsCore>,
217        _socket: &Socket,
218        _current_task: &CurrentTask,
219        data: &mut dyn OutputBuffer,
220        flags: SocketMessageFlags,
221    ) -> Result<MessageReadInfo, Errno> {
222        if flags.contains(SocketMessageFlags::PEEK) {
223            track_stub!(
224                TODO("https://fxbug.dev/388082019"),
225                "SocketMessageFlags::PEEK is unsupported"
226            );
227            return error!(EINVAL);
228        }
229
230        let inner = self.connecting_lock()?;
231
232        if flags.contains(SocketMessageFlags::DONTWAIT) {
233            match inner.events.wait_one(
234                zx::Signals::from_bits_truncate(fqrtr::SIGNAL_READABLE)
235                    | zx::Signals::EVENTPAIR_PEER_CLOSED,
236                zx::MonotonicInstant::INFINITE_PAST,
237            ) {
238                zx::WaitResult::Ok(_) => {}
239                zx::WaitResult::TimedOut(_) | zx::WaitResult::Canceled(_) => return error!(EAGAIN),
240                zx::WaitResult::Err(status) => return Err(from_status_like_fdio!(status)),
241            }
242        }
243
244        let (src_node, src_port, src_data) = inner
245            .proxy
246            .read(zx::MonotonicInstant::INFINITE)
247            .map_err(|e| errno!(ECONNRESET, e))?
248            .map_err(qrtr_error_to_errno)?;
249
250        let bytes_read = data.write(src_data.as_bytes())?;
251        Ok(MessageReadInfo {
252            bytes_read,
253            message_length: src_data.len(),
254            address: Some(pack_qrtr_sockaddr(src_node, src_port)),
255            ..Default::default()
256        })
257    }
258
259    fn write(
260        &self,
261        _locked: &mut Locked<FileOpsCore>,
262        _socket: &Socket,
263        _current_task: &CurrentTask,
264        data: &mut dyn InputBuffer,
265        dest_address: &mut Option<SocketAddress>,
266        _ancillary_data: &mut Vec<AncillaryData>,
267    ) -> Result<usize, Errno> {
268        let inner = self.connecting_lock()?;
269
270        // If no destination address is specified, send to the peer address, which is set if
271        // connect() is called.
272        let dest = match dest_address {
273            Some(addr) => extract_qrtr_sockaddr(addr)?,
274            None => inner.peer.ok_or_else(|| errno!(EDESTADDRREQ))?,
275        };
276
277        match inner.events.wait_one(
278            zx::Signals::from_bits_truncate(fqrtr::SIGNAL_WRITABLE)
279                | zx::Signals::EVENTPAIR_PEER_CLOSED,
280            zx::MonotonicInstant::INFINITE_PAST,
281        ) {
282            zx::WaitResult::Ok(_) => {}
283            zx::WaitResult::TimedOut(_) | zx::WaitResult::Canceled(_) => return error!(EAGAIN),
284            zx::WaitResult::Err(status) => return Err(from_status_like_fdio!(status)),
285        }
286
287        let data_written = data.read_all()?;
288        let _ = inner
289            .proxy
290            .write(
291                dest.sq_node,
292                dest.sq_port,
293                data_written.as_ref(),
294                zx::MonotonicInstant::INFINITE,
295            )
296            .map_err(|e| errno!(ECONNRESET, e))?
297            .map_err(qrtr_error_to_errno)?;
298        Ok(data_written.len())
299    }
300
301    fn wait_async(
302        &self,
303        _locked: &mut Locked<FileOpsCore>,
304        _socket: &Socket,
305        _current_task: &CurrentTask,
306        waiter: &Waiter,
307        events: FdEvents,
308        handler: EventHandler,
309    ) -> WaitCanceler {
310        let Ok(inner) = self.connecting_lock() else {
311            return WaitCanceler::new_noop();
312        };
313        let signal_handler = SignalHandler {
314            inner: SignalHandlerInner::ZxHandle(qrtr_signals_to_fd_events),
315            event_handler: handler,
316            err_code: None,
317        };
318        let canceler = waiter
319            .wake_on_zircon_signals(
320                &inner.events,
321                fd_events_to_qrtr_signals(events),
322                signal_handler,
323            )
324            .unwrap();
325        WaitCanceler::new_port(canceler)
326    }
327
328    fn query_events(
329        &self,
330        _locked: &mut Locked<FileOpsCore>,
331        _socket: &Socket,
332        _current_task: &CurrentTask,
333    ) -> Result<FdEvents, Errno> {
334        let inner = self.connecting_lock()?;
335        let signals = inner
336            .events
337            .as_handle_ref()
338            .wait_one(zx::Signals::all(), zx::MonotonicInstant::INFINITE_PAST)
339            .map_err(|e| from_status_like_fdio!(e))?;
340        Ok(qrtr_signals_to_fd_events(signals))
341    }
342
343    fn shutdown(
344        &self,
345        _locked: &mut Locked<FileOpsCore>,
346        _socket: &Socket,
347        _how: SocketShutdownFlags,
348    ) -> Result<(), Errno> {
349        self.close();
350        Ok(())
351    }
352
353    fn close(
354        &self,
355        _locked: &mut Locked<FileOpsCore>,
356        _current_task: &CurrentTask,
357        _socket: &Socket,
358    ) {
359        self.close();
360    }
361
362    fn getsockname(
363        &self,
364        _locked: &mut Locked<FileOpsCore>,
365        _socket: &Socket,
366    ) -> Result<SocketAddress, Errno> {
367        let name = self.connecting_lock()?.bound_addr()?;
368        Ok(SocketAddress::Qipcrtr(name.as_bytes().to_vec()))
369    }
370
371    fn getpeername(
372        &self,
373        _locked: &mut Locked<FileOpsCore>,
374        _socket: &Socket,
375    ) -> Result<SocketAddress, Errno> {
376        let peer = self.connecting_lock()?.peer.ok_or_else(|| errno!(ENOTCONN))?;
377        Ok(SocketAddress::Qipcrtr(peer.as_bytes().to_vec()))
378    }
379
380    fn setsockopt(
381        &self,
382        _locked: &mut Locked<FileOpsCore>,
383        _socket: &Socket,
384        current_task: &CurrentTask,
385        level: u32,
386        optname: u32,
387        optval: SockOptValue,
388    ) -> Result<(), Errno> {
389        let mut inner = self.connecting_lock()?;
390        match level {
391            SOL_SOCKET => match optname {
392                SO_SNDBUF => {
393                    let requested_capacity: socklen_t = optval.read(current_task)?;
394                    // SO_SNDBUF doubles the requested capacity to leave space for bookkeeping.
395                    // See https://man7.org/linux/man-pages/man7/socket.7.html
396                    let capacity = usize::try_from(requested_capacity * 2).unwrap_or(usize::MAX);
397                    // TODO(https://fxbug.dev/322907334): Clamp to `wmem_max`.
398                    let capacity = capacity.clamp(SEND_BUF_MIN_SIZE, SEND_BUF_MAX_SIZE);
399                    inner.send_buf_size = capacity;
400                }
401                SO_RCVBUF => {
402                    let requested_capacity: socklen_t = optval.read(current_task)?;
403                    // SO_RCVBUF doubles the requested capacity to leave space for bookkeeping.
404                    // See https://man7.org/linux/man-pages/man7/socket.7.html
405                    let capacity = usize::try_from(requested_capacity * 2).unwrap_or(usize::MAX);
406                    // TODO(https://fxbug.dev/322906968): Clamp to `rmem_max`.
407                    let capacity = capacity.clamp(RECV_BUF_MIN_SIZE, RECV_BUF_MAX_SIZE);
408                    inner.recv_buf_size = capacity;
409                }
410                _ => return error!(ENOSYS),
411            },
412            _ => return error!(ENOSYS),
413        }
414
415        Ok(())
416    }
417
418    fn getsockopt(
419        &self,
420        _locked: &mut Locked<FileOpsCore>,
421        _socket: &Socket,
422        _current_task: &CurrentTask,
423        level: u32,
424        optname: u32,
425        _optlen: u32,
426    ) -> Result<Vec<u8>, Errno> {
427        let inner = self.connecting_lock()?;
428        Ok(match level {
429            SOL_SOCKET => match optname {
430                SO_SNDBUF => (inner.send_buf_size as socklen_t).to_ne_bytes().to_vec(),
431                SO_RCVBUF => (inner.recv_buf_size as socklen_t).to_ne_bytes().to_vec(),
432                _ => return error!(ENOSYS),
433            },
434            _ => vec![],
435        })
436    }
437}
438
439/// Returns the [`sockaddr_qrtr`] within a [`SocketAddress`] or `EINVAL` if the address is not a
440/// QRTR address.
441fn extract_qrtr_sockaddr(addr: &SocketAddress) -> Result<sockaddr_qrtr, Errno> {
442    match addr {
443        SocketAddress::Qipcrtr(bytes) => sockaddr_qrtr::read_from_prefix(bytes.as_bytes())
444            .map(|(addr, _)| addr)
445            .map_err(|e| errno!(EINVAL, e)),
446        _ => error!(EINVAL),
447    }
448}
449
450/// Returns the [`SocketAddress`] representing a given node and port number.
451fn pack_qrtr_sockaddr(node: u32, port: u32) -> SocketAddress {
452    let addr =
453        sockaddr_qrtr { sq_family: AF_QIPCRTR, sq_node: node, sq_port: port, ..Default::default() };
454    SocketAddress::Qipcrtr(addr.as_bytes().into())
455}
456
457/// Maps a [`fqrtr::Error`] to an [`Errno`]. This mapping is not one-to-one.
458fn qrtr_error_to_errno(e: fqrtr::Error) -> Errno {
459    match e {
460        fqrtr::Error::InternalError => errno!(EINVAL),
461        fqrtr::Error::AlreadyPending => errno!(EBUSY),
462        fqrtr::Error::RemoteNodeUnavailable => errno!(ECONNRESET),
463        fqrtr::Error::AlreadyBound => errno!(EADDRINUSE),
464        fqrtr::Error::NotSupported => errno!(ENOTSUP),
465        fqrtr::Error::WouldBlock => errno!(EAGAIN),
466        fqrtr::Error::NoResources => errno!(ENOMEM),
467        fqrtr::Error::InvalidArgs => errno!(EINVAL),
468        _ => errno!(EINVAL),
469    }
470}
471
472/// Maps [`FdEvents`] to [`zx::Signals`] for a QRTR connection.
473fn fd_events_to_qrtr_signals(events: FdEvents) -> zx::Signals {
474    let mut signals = zx::Signals::empty();
475    if events.contains(FdEvents::POLLIN) {
476        signals |= zx::Signals::from_bits_truncate(fqrtr::SIGNAL_READABLE);
477    }
478    if events.contains(FdEvents::POLLOUT) {
479        signals |= zx::Signals::from_bits_truncate(fqrtr::SIGNAL_WRITABLE);
480    }
481
482    // Always wait for the peer to be closed, which can generate POLLHUP.
483    signals |= zx::Signals::EVENTPAIR_PEER_CLOSED;
484    signals
485}
486
487/// Maps [`zx::Signals`] to [`FdEvents`] for a QRTR connection.
488fn qrtr_signals_to_fd_events(signals: zx::Signals) -> FdEvents {
489    let mut events = FdEvents::empty();
490    if signals.contains(zx::Signals::from_bits_truncate(fqrtr::SIGNAL_READABLE)) {
491        events |= FdEvents::POLLIN;
492    }
493    if signals.contains(zx::Signals::from_bits_truncate(fqrtr::SIGNAL_WRITABLE)) {
494        events |= FdEvents::POLLOUT;
495    }
496    if signals.contains(zx::Signals::EVENTPAIR_PEER_CLOSED) {
497        events |= FdEvents::POLLHUP;
498    }
499    events
500}
501
502#[cfg(test)]
503mod tests {
504    use super::*;
505    use crate::testing::spawn_kernel_and_run;
506    use crate::vfs::buffers::{VecInputBuffer, VecOutputBuffer};
507    use crate::vfs::socket::{SocketDomain, SocketProtocol, SocketType};
508    use fidl::endpoints::create_sync_proxy;
509    use futures::StreamExt;
510
511    /// Creates a `QipcrtrSocket` with a mock inner state.
512    ///
513    /// The mock state is connected to a `QrtrClientConnection` proxy, and the stream for that
514    /// proxy is returned to allow the test to drive the mock FIDL behavior.
515    fn mock_qipcrtr_socket()
516    -> (QipcrtrSocket, fidl::endpoints::ServerEnd<fqrtr::QrtrClientConnectionMarker>) {
517        let (proxy, server_end) = create_sync_proxy::<fqrtr::QrtrClientConnectionMarker>();
518
519        // We need an event pair for the socket.
520        let (events, _) = zx::EventPair::create();
521
522        let inner = QipcrtrSocketInner {
523            proxy,
524            events,
525            peer: None,
526            send_buf_size: SEND_BUF_DEFAULT_SIZE,
527            recv_buf_size: RECV_BUF_DEFAULT_SIZE,
528        };
529
530        (QipcrtrSocket { inner: Mutex::new(Some(inner)) }, server_end)
531    }
532
533    #[::fuchsia::test]
534    async fn test_qipcrtr_socket_new() {
535        spawn_kernel_and_run(async |locked, current_task| {
536            let _kernel = current_task.kernel();
537            // This test just checks basic creation without panic, but for QIPCRTR it tries
538            // to connect to the global service, which might fail in test env if not mocked
539            // correctly or if we rely on real service. The existing test `test_qipcrtr_socket_new`
540            // calls `Socket::new` which calls `QipcrtrSocket::new`.
541            // `QipcrtrSocket::new` creates a None inner, so it doesn't connect yet.
542            // Connection happens on first use or explicit connect.
543            let _socket = Socket::new(
544                locked,
545                &current_task,
546                SocketDomain::Qipcrtr,
547                SocketType::Datagram,
548                SocketProtocol::default(),
549                /* kernel_private = */ false,
550            )
551            .expect("Failed to create socket.");
552        })
553        .await;
554    }
555
556    #[::fuchsia::test]
557    async fn test_qipcrtr_sockopt() {
558        spawn_kernel_and_run(async |locked, current_task| {
559            let socket = mock_qipcrtr_socket();
560            let socket_obj = Socket::new_with_ops_and_info(
561                Box::new(socket.0),
562                SocketDomain::Qipcrtr,
563                SocketType::Datagram,
564                SocketProtocol::default(),
565            );
566            let _server_end = socket.1;
567
568            // Test SO_SNDBUF
569            let sndbuf =
570                socket_obj.getsockopt(locked, &current_task, SOL_SOCKET, SO_SNDBUF, 4).unwrap();
571            let sndbuf_val = u32::from_ne_bytes(sndbuf.as_slice().try_into().unwrap());
572            assert_eq!(sndbuf_val, SEND_BUF_DEFAULT_SIZE as u32);
573
574            let new_sndbuf: u32 = 4096;
575            socket_obj
576                .setsockopt(
577                    locked,
578                    &current_task,
579                    SOL_SOCKET,
580                    SO_SNDBUF,
581                    SockOptValue::from(new_sndbuf.as_bytes().to_vec()),
582                )
583                .unwrap();
584
585            let sndbuf =
586                socket_obj.getsockopt(locked, &current_task, SOL_SOCKET, SO_SNDBUF, 4).unwrap();
587            let sndbuf_val = u32::from_ne_bytes(sndbuf.as_slice().try_into().unwrap());
588            // Setsockopt doubles the value.
589            assert_eq!(sndbuf_val, new_sndbuf * 2);
590
591            // Test SO_RCVBUF
592            let rcvbuf =
593                socket_obj.getsockopt(locked, &current_task, SOL_SOCKET, SO_RCVBUF, 4).unwrap();
594            let rcvbuf_val = u32::from_ne_bytes(rcvbuf.as_slice().try_into().unwrap());
595            assert_eq!(rcvbuf_val, RECV_BUF_DEFAULT_SIZE as u32);
596
597            let new_rcvbuf: u32 = 1024;
598            socket_obj
599                .setsockopt(
600                    locked,
601                    &current_task,
602                    SOL_SOCKET,
603                    SO_RCVBUF,
604                    SockOptValue::from(new_rcvbuf.as_bytes().to_vec()),
605                )
606                .unwrap();
607
608            let rcvbuf =
609                socket_obj.getsockopt(locked, &current_task, SOL_SOCKET, SO_RCVBUF, 4).unwrap();
610            let rcvbuf_val = u32::from_ne_bytes(rcvbuf.as_slice().try_into().unwrap());
611            // Setsockopt doubles the value.
612            assert_eq!(rcvbuf_val, new_rcvbuf * 2);
613        })
614        .await;
615    }
616
617    #[::fuchsia::test]
618    async fn test_qipcrtr_sockname() {
619        let (socket_inner, server_end) = mock_qipcrtr_socket();
620        // Handle get_node_id and get_port_id requests
621        std::thread::spawn(move || {
622            let mut executor = fuchsia_async::LocalExecutor::default();
623            executor.run_singlethreaded(async move {
624                let mut stream = server_end.into_stream();
625                while let Some(Ok(request)) = stream.next().await {
626                    match request {
627                        fqrtr::QrtrClientConnectionRequest::GetNodeId { responder, .. } => {
628                            let _ = responder.send(123).unwrap();
629                        }
630                        fqrtr::QrtrClientConnectionRequest::GetPortId { responder, .. } => {
631                            let _ = responder.send(456).unwrap();
632                        }
633                        fqrtr::QrtrClientConnectionRequest::CloseConnection {
634                            responder, ..
635                        } => {
636                            let _ = responder.send();
637                        }
638                        _ => panic!("Unexpected request: {:?}", request),
639                    }
640                }
641            });
642        });
643
644        spawn_kernel_and_run(async |locked, _current_task| {
645            let socket_obj = Socket::new_with_ops_and_info(
646                Box::new(socket_inner),
647                SocketDomain::Qipcrtr,
648                SocketType::Datagram,
649                SocketProtocol::default(),
650            );
651
652            let addr = socket_obj.getsockname(locked).unwrap();
653            let qrtr_addr = extract_qrtr_sockaddr(&addr).unwrap();
654            assert_eq!(qrtr_addr.sq_node, 123);
655            assert_eq!(qrtr_addr.sq_port, 456);
656
657            // Set peer
658            let peer_addr = sockaddr_qrtr {
659                sq_family: AF_QIPCRTR,
660                sq_node: 10,
661                sq_port: 20,
662                ..Default::default()
663            };
664            socket_obj
665                .downcast_socket::<QipcrtrSocket>()
666                .unwrap()
667                .inner
668                .lock()
669                .as_mut()
670                .unwrap()
671                .peer = Some(peer_addr);
672
673            let peer = socket_obj.getpeername(locked).unwrap();
674            let peer_qrtr = extract_qrtr_sockaddr(&peer).unwrap();
675            assert_eq!(peer_qrtr.sq_node, 10);
676            assert_eq!(peer_qrtr.sq_port, 20);
677        })
678        .await;
679    }
680
681    #[::fuchsia::test]
682    async fn test_qipcrtr_read_write() {
683        let (socket_inner, server_end) = mock_qipcrtr_socket();
684        std::thread::spawn(move || {
685            let mut executor = fuchsia_async::LocalExecutor::default();
686            executor.run_singlethreaded(async move {
687                let mut stream = server_end.into_stream();
688                while let Some(Ok(request)) = stream.next().await {
689                    match request {
690                        fqrtr::QrtrClientConnectionRequest::Write {
691                            dst_node_id,
692                            dst_port,
693                            data,
694                            responder,
695                            ..
696                        } => {
697                            assert_eq!(dst_node_id, 10);
698                            assert_eq!(dst_port, 20);
699                            assert_eq!(data, b"hello");
700                            let _ = responder.send(Ok(())).unwrap();
701                        }
702                        fqrtr::QrtrClientConnectionRequest::Read { responder, .. } => {
703                            let _ = responder.send(Ok((5, 15, b"world"))).unwrap();
704                        }
705                        fqrtr::QrtrClientConnectionRequest::CloseConnection {
706                            responder, ..
707                        } => {
708                            let _ = responder.send();
709                        }
710                        _ => panic!("Unexpected request: {:?}", request),
711                    }
712                }
713            });
714        });
715
716        spawn_kernel_and_run(async |locked, current_task| {
717            let socket_obj = Socket::new_with_ops_and_info(
718                Box::new(socket_inner),
719                SocketDomain::Qipcrtr,
720                SocketType::Datagram,
721                SocketProtocol::default(),
722            );
723            // Connect to set default peer
724            let peer_addr = sockaddr_qrtr {
725                sq_family: AF_QIPCRTR,
726                sq_node: 10,
727                sq_port: 20,
728                ..Default::default()
729            };
730            socket_obj
731                .downcast_socket::<QipcrtrSocket>()
732                .unwrap()
733                .inner
734                .lock()
735                .as_mut()
736                .unwrap()
737                .peer = Some(peer_addr);
738
739            // Test Write
740            let mut input = VecInputBuffer::new(b"hello");
741            let written = socket_obj
742                .write(locked, &current_task, &mut input, &mut None, &mut vec![])
743                .unwrap();
744            assert_eq!(written, 5);
745
746            // Test Read
747            let mut output = VecOutputBuffer::new(100);
748            let info = socket_obj
749                .read(locked, &current_task, &mut output, SocketMessageFlags::empty())
750                .unwrap();
751            assert_eq!(info.bytes_read, 5);
752            assert_eq!(output.data(), b"world");
753
754            let addr = extract_qrtr_sockaddr(&info.address.unwrap()).unwrap();
755            assert_eq!(addr.sq_node, 5);
756            assert_eq!(addr.sq_port, 15);
757        })
758        .await;
759    }
760}