Skip to main content

starnix_core/vfs/socket/
socket_qipcrtr.rs

1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::task::{
6    CurrentTask, EventHandler, SignalHandler, SignalHandlerInner, WaitCanceler, Waiter,
7};
8use crate::vfs::buffers::{AncillaryData, InputBuffer, MessageReadInfo, OutputBuffer};
9use crate::vfs::socket::{
10    SockOptValue, Socket, SocketAddress, SocketHandle, SocketMessageFlags, SocketOps, SocketPeer,
11    SocketShutdownFlags, SocketType,
12};
13use anyhow::Context;
14use fidl::endpoints::{SynchronousProxy, create_sync_proxy};
15use fidl_fuchsia_hardware_qualcomm_router as fqrtr;
16use starnix_logging::{log_warn, track_stub};
17use starnix_sync::{FileOpsCore, Locked, MappedMutexGuard, Mutex, MutexGuard};
18use starnix_uapi::errors::{Errno, from_status_like_fdio};
19use starnix_uapi::vfs::FdEvents;
20use starnix_uapi::{
21    AF_QIPCRTR, SO_RCVBUF, SO_SNDBUF, SOL_SOCKET, errno, error, sockaddr_qrtr, socklen_t, ucred,
22};
23use zerocopy::{FromBytes, IntoBytes};
24
/// Directory whose entries are the available `ClientService` instances of the QRTR driver.
/// `connect_to_connector` opens `qrtr_connector` inside the first entry it finds here.
const QRTR_CLIENT_SERVICE_DIRECTORY: &str = "/svc/fuchsia.hardware.qualcomm.router.ClientService";
26fn connect_to_connector() -> Result<fqrtr::QrtrConnectorSynchronousProxy, anyhow::Error> {
27    let mut dir = std::fs::read_dir(QRTR_CLIENT_SERVICE_DIRECTORY)
28        .context("Failed to read ClientService directory")?;
29    let entry = dir
30        .next()
31        .ok_or_else(|| anyhow::format_err!("Missing ClientService instance"))?
32        .context("Unable to read ClientService instance")?;
33    let path = entry
34        .path()
35        .join("qrtr_connector")
36        .into_os_string()
37        .into_string()
38        .map_err(|_| anyhow::format_err!("Failed to get qrtr_connector path"))?;
39
40    let (client_end, server_end) = zx::Channel::create();
41    fdio::service_connect(&path, server_end)?;
42    Ok(fqrtr::QrtrConnectorSynchronousProxy::from_channel(client_end))
43}
44
// Send buffer limits, from socket(7). `setsockopt(SO_SNDBUF)` clamps the (doubled) requested
// size to [`SEND_BUF_MIN_SIZE`, `SEND_BUF_MAX_SIZE`]; new connections start at the default.
pub const SEND_BUF_MIN_SIZE: usize = 2048;
pub const SEND_BUF_MAX_SIZE: usize = 1 << 31;
pub const SEND_BUF_DEFAULT_SIZE: usize = 2048;

// Receive buffer limits, from socket(7). `setsockopt(SO_RCVBUF)` clamps the (doubled) requested
// size to [`RECV_BUF_MIN_SIZE`, `RECV_BUF_MAX_SIZE`]; new connections start at the default.
pub const RECV_BUF_MIN_SIZE: usize = 256;
pub const RECV_BUF_MAX_SIZE: usize = 1 << 31;
pub const RECV_BUF_DEFAULT_SIZE: usize = 256;
54
/// An `AF_QIPCRTR` socket backed by a connection to the QRTR driver.
///
/// The driver connection is created lazily: `inner` is `None` until the socket is bound,
/// connected, or first used, and is reset to `None` again when the socket is closed.
pub struct QipcrtrSocket {
    /// The driver connection and per-socket state, or `None` while no connection exists.
    inner: Mutex<Option<QipcrtrSocketInner>>,
}
58
/// State of a socket that has an established connection to the QRTR driver.
///
/// Dropping this value asks the driver to close the connection (see the `Drop` impl).
struct QipcrtrSocketInner {
    /// The proxy representing the socket in the QRTR driver.
    proxy: fqrtr::QrtrClientConnectionSynchronousProxy,

    /// The event pair representing the readable and writable signals.
    events: zx::EventPair,

    /// The peer for a connected socket, which is the default address to send messages to when no
    /// destination is given.
    peer: Option<sockaddr_qrtr>,

    /// The socket's send buffer size.
    ///
    /// This value is only used to serve getsockopt calls for `SO_SNDBUF`. It does not yet enforce
    /// a limit on the buffer size.
    /// TODO(https://fxbug.dev/478337980): Limit the size of the send buffer.
    send_buf_size: usize,

    /// The socket's receive buffer size.
    ///
    /// This value is only used to serve getsockopt calls for `SO_RCVBUF`. It does not yet enforce
    /// a limit on the buffer size.
    /// TODO(https://fxbug.dev/478337980): Limit the size of the receive buffer.
    recv_buf_size: usize,
}
84
85impl QipcrtrSocket {
86    pub fn new(_socket_type: SocketType) -> Self {
87        Self { inner: Mutex::new(None) }
88    }
89
90    /// Locks and returns the inner state of the socket. If the socket is not connected to the
91    /// driver, a connection will be established, binding to an ephemeral port number.
92    fn connecting_lock(&self) -> Result<MappedMutexGuard<'_, QipcrtrSocketInner>, Errno> {
93        let mut inner = self.inner.lock();
94        if inner.is_none() {
95            *inner = Some(QipcrtrSocketInner::new(fqrtr::ConnectionOptions {
96                blocking: Some(false),
97                ..Default::default()
98            })?);
99        }
100        Ok(MutexGuard::map(inner, |inner| inner.as_mut().unwrap()))
101    }
102
103    fn close(&self) {
104        *self.inner.lock() = None;
105    }
106}
107
108impl QipcrtrSocketInner {
109    fn new(options: fqrtr::ConnectionOptions) -> Result<Self, Errno> {
110        let connector = connect_to_connector().map_err(|e| errno!(ENETUNREACH, e))?;
111
112        let (client_end, server_end) = create_sync_proxy::<fqrtr::QrtrClientConnectionMarker>();
113        connector
114            .get_connection(&options, server_end, zx::MonotonicInstant::INFINITE)
115            .map_err(|e| errno!(ENETUNREACH, e))?
116            .map_err(qrtr_error_to_errno)?;
117
118        let proxy = fqrtr::QrtrClientConnectionSynchronousProxy::new(client_end.into_channel());
119        let events = proxy
120            .get_signals(zx::MonotonicInstant::INFINITE)
121            .map_err(|e| errno!(ENETUNREACH, e))?;
122
123        Ok(Self {
124            proxy,
125            events,
126            peer: None,
127            send_buf_size: SEND_BUF_DEFAULT_SIZE,
128            recv_buf_size: RECV_BUF_DEFAULT_SIZE,
129        })
130    }
131
132    /// Returns the [`sockaddr_qrtr`] of this connection.
133    fn bound_addr(&self) -> Result<sockaddr_qrtr, Errno> {
134        let addr = sockaddr_qrtr {
135            sq_family: AF_QIPCRTR,
136            sq_node: self
137                .proxy
138                .get_node_id(zx::MonotonicInstant::INFINITE)
139                .map_err(|e| errno!(EINVAL, e))?,
140            sq_port: self
141                .proxy
142                .get_port_id(zx::MonotonicInstant::INFINITE)
143                .map_err(|e| errno!(EINVAL, e))?,
144            ..Default::default()
145        };
146        Ok(addr)
147    }
148}
149
150impl Drop for QipcrtrSocketInner {
151    fn drop(&mut self) {
152        if let Err(e) = self.proxy.close_connection(zx::MonotonicInstant::INFINITE) {
153            log_warn!("Failed to close QRTR connection: {e:?}");
154        }
155    }
156}
157
158impl SocketOps for QipcrtrSocket {
159    fn connect(
160        &self,
161        _locked: &mut Locked<FileOpsCore>,
162        _socket: &SocketHandle,
163        _current_task: &CurrentTask,
164        peer: SocketPeer,
165    ) -> Result<(), Errno> {
166        let peer = match peer {
167            SocketPeer::Address(addr) => extract_qrtr_sockaddr(&addr)?,
168            _ => {
169                return error!(EINVAL);
170            }
171        };
172
173        let mut inner = self.inner.lock();
174        if inner.is_some() {
175            return error!(EISCONN);
176        }
177
178        // Establish a connection without a specific port number. The driver will automatically
179        // assign one, resulting in a bound socket.
180        let mut new_inner = QipcrtrSocketInner::new(fqrtr::ConnectionOptions {
181            blocking: Some(false),
182            ..Default::default()
183        })?;
184        new_inner.peer = Some(peer);
185
186        *inner = Some(new_inner);
187        Ok(())
188    }
189
190    fn listen(
191        &self,
192        _locked: &mut Locked<FileOpsCore>,
193        _socket: &Socket,
194        _backlog: i32,
195        _credentials: ucred,
196    ) -> Result<(), Errno> {
197        error!(ENOTSUP)
198    }
199
200    fn accept(
201        &self,
202        _locked: &mut Locked<FileOpsCore>,
203        _socket: &Socket,
204        _current_task: &CurrentTask,
205    ) -> Result<SocketHandle, Errno> {
206        error!(ENOTSUP)
207    }
208
209    fn bind(
210        &self,
211        _locked: &mut Locked<FileOpsCore>,
212        _socket: &Socket,
213        _current_task: &CurrentTask,
214        socket_address: SocketAddress,
215    ) -> Result<(), Errno> {
216        let addr = extract_qrtr_sockaddr(&socket_address)?;
217
218        let mut inner = self.inner.lock();
219        if inner.is_some() {
220            return error!(EINVAL);
221        }
222
223        // Establish a connection with the specified port number.
224        *inner = Some(QipcrtrSocketInner::new(fqrtr::ConnectionOptions {
225            blocking: Some(false),
226            port: Some(addr.sq_port),
227            ..Default::default()
228        })?);
229
230        Ok(())
231    }
232
233    fn read(
234        &self,
235        _locked: &mut Locked<FileOpsCore>,
236        _socket: &Socket,
237        _current_task: &CurrentTask,
238        data: &mut dyn OutputBuffer,
239        flags: SocketMessageFlags,
240    ) -> Result<MessageReadInfo, Errno> {
241        if flags.contains(SocketMessageFlags::PEEK) {
242            track_stub!(
243                TODO("https://fxbug.dev/388082019"),
244                "SocketMessageFlags::PEEK is unsupported"
245            );
246            return error!(EINVAL);
247        }
248
249        let inner = self.connecting_lock()?;
250
251        if flags.contains(SocketMessageFlags::DONTWAIT) {
252            match inner.events.wait_one(
253                zx::Signals::from_bits_truncate(fqrtr::SIGNAL_READABLE)
254                    | zx::Signals::EVENTPAIR_PEER_CLOSED,
255                zx::MonotonicInstant::INFINITE_PAST,
256            ) {
257                zx::WaitResult::Ok(_) => {}
258                zx::WaitResult::TimedOut(_) | zx::WaitResult::Canceled(_) => return error!(EAGAIN),
259                zx::WaitResult::Err(status) => return Err(from_status_like_fdio!(status)),
260            }
261        }
262
263        let (src_node, src_port, src_data) = inner
264            .proxy
265            .read(zx::MonotonicInstant::INFINITE)
266            .map_err(|e| errno!(ECONNRESET, e))?
267            .map_err(qrtr_error_to_errno)?;
268
269        let bytes_read = data.write(src_data.as_bytes())?;
270        Ok(MessageReadInfo {
271            bytes_read,
272            message_length: src_data.len(),
273            address: Some(pack_qrtr_sockaddr(src_node, src_port)),
274            ..Default::default()
275        })
276    }
277
278    fn write(
279        &self,
280        _locked: &mut Locked<FileOpsCore>,
281        _socket: &Socket,
282        _current_task: &CurrentTask,
283        data: &mut dyn InputBuffer,
284        dest_address: &mut Option<SocketAddress>,
285        _ancillary_data: &mut Vec<AncillaryData>,
286    ) -> Result<usize, Errno> {
287        let inner = self.connecting_lock()?;
288
289        // If no destination address is specified, send to the peer address, which is set if
290        // connect() is called.
291        let dest = match dest_address {
292            Some(addr) => extract_qrtr_sockaddr(addr)?,
293            None => inner.peer.ok_or_else(|| errno!(EDESTADDRREQ))?,
294        };
295
296        match inner.events.wait_one(
297            zx::Signals::from_bits_truncate(fqrtr::SIGNAL_WRITABLE)
298                | zx::Signals::EVENTPAIR_PEER_CLOSED,
299            zx::MonotonicInstant::INFINITE_PAST,
300        ) {
301            zx::WaitResult::Ok(_) => {}
302            zx::WaitResult::TimedOut(_) | zx::WaitResult::Canceled(_) => return error!(EAGAIN),
303            zx::WaitResult::Err(status) => return Err(from_status_like_fdio!(status)),
304        }
305
306        let data_written = data.read_all()?;
307        let _ = inner
308            .proxy
309            .write(
310                dest.sq_node,
311                dest.sq_port,
312                data_written.as_ref(),
313                zx::MonotonicInstant::INFINITE,
314            )
315            .map_err(|e| errno!(ECONNRESET, e))?
316            .map_err(qrtr_error_to_errno)?;
317        Ok(data_written.len())
318    }
319
320    fn wait_async(
321        &self,
322        _locked: &mut Locked<FileOpsCore>,
323        _socket: &Socket,
324        _current_task: &CurrentTask,
325        waiter: &Waiter,
326        events: FdEvents,
327        handler: EventHandler,
328    ) -> WaitCanceler {
329        let Ok(inner) = self.connecting_lock() else {
330            return WaitCanceler::new_noop();
331        };
332        let signal_handler = SignalHandler {
333            inner: SignalHandlerInner::ZxHandle(qrtr_signals_to_fd_events),
334            event_handler: handler,
335            err_code: None,
336        };
337        let canceler = waiter
338            .wake_on_zircon_signals(
339                &inner.events,
340                fd_events_to_qrtr_signals(events),
341                signal_handler,
342            )
343            .unwrap();
344        WaitCanceler::new_port(canceler)
345    }
346
347    fn query_events(
348        &self,
349        _locked: &mut Locked<FileOpsCore>,
350        _socket: &Socket,
351        _current_task: &CurrentTask,
352    ) -> Result<FdEvents, Errno> {
353        let inner = self.connecting_lock()?;
354        let signals = inner
355            .events
356            .as_handle_ref()
357            .wait_one(zx::Signals::all(), zx::MonotonicInstant::INFINITE_PAST)
358            .map_err(|e| from_status_like_fdio!(e))?;
359        Ok(qrtr_signals_to_fd_events(signals))
360    }
361
362    fn shutdown(
363        &self,
364        _locked: &mut Locked<FileOpsCore>,
365        _socket: &Socket,
366        _how: SocketShutdownFlags,
367    ) -> Result<(), Errno> {
368        self.close();
369        Ok(())
370    }
371
372    fn close(
373        &self,
374        _locked: &mut Locked<FileOpsCore>,
375        _current_task: &CurrentTask,
376        _socket: &Socket,
377    ) {
378        self.close();
379    }
380
381    fn getsockname(
382        &self,
383        _locked: &mut Locked<FileOpsCore>,
384        _socket: &Socket,
385    ) -> Result<SocketAddress, Errno> {
386        let name = self.connecting_lock()?.bound_addr()?;
387        Ok(SocketAddress::Qipcrtr(name.as_bytes().to_vec()))
388    }
389
390    fn getpeername(
391        &self,
392        _locked: &mut Locked<FileOpsCore>,
393        _socket: &Socket,
394    ) -> Result<SocketAddress, Errno> {
395        let peer = self.connecting_lock()?.peer.ok_or_else(|| errno!(ENOTCONN))?;
396        Ok(SocketAddress::Qipcrtr(peer.as_bytes().to_vec()))
397    }
398
399    fn setsockopt(
400        &self,
401        _locked: &mut Locked<FileOpsCore>,
402        _socket: &Socket,
403        current_task: &CurrentTask,
404        level: u32,
405        optname: u32,
406        optval: SockOptValue,
407    ) -> Result<(), Errno> {
408        let mut inner = self.connecting_lock()?;
409        match level {
410            SOL_SOCKET => match optname {
411                SO_SNDBUF => {
412                    let requested_capacity: socklen_t = optval.read(current_task)?;
413                    // SO_SNDBUF doubles the requested capacity to leave space for bookkeeping.
414                    // See https://man7.org/linux/man-pages/man7/socket.7.html
415                    let capacity = usize::try_from(requested_capacity * 2).unwrap_or(usize::MAX);
416                    // TODO(https://fxbug.dev/322907334): Clamp to `wmem_max`.
417                    let capacity = capacity.clamp(SEND_BUF_MIN_SIZE, SEND_BUF_MAX_SIZE);
418                    inner.send_buf_size = capacity;
419                }
420                SO_RCVBUF => {
421                    let requested_capacity: socklen_t = optval.read(current_task)?;
422                    // SO_RCVBUF doubles the requested capacity to leave space for bookkeeping.
423                    // See https://man7.org/linux/man-pages/man7/socket.7.html
424                    let capacity = usize::try_from(requested_capacity * 2).unwrap_or(usize::MAX);
425                    // TODO(https://fxbug.dev/322906968): Clamp to `rmem_max`.
426                    let capacity = capacity.clamp(RECV_BUF_MIN_SIZE, RECV_BUF_MAX_SIZE);
427                    inner.recv_buf_size = capacity;
428                }
429                _ => return error!(ENOSYS),
430            },
431            _ => return error!(ENOSYS),
432        }
433
434        Ok(())
435    }
436
437    fn getsockopt(
438        &self,
439        _locked: &mut Locked<FileOpsCore>,
440        _socket: &Socket,
441        _current_task: &CurrentTask,
442        level: u32,
443        optname: u32,
444        _optlen: u32,
445    ) -> Result<Vec<u8>, Errno> {
446        let inner = self.connecting_lock()?;
447        Ok(match level {
448            SOL_SOCKET => match optname {
449                SO_SNDBUF => (inner.send_buf_size as socklen_t).to_ne_bytes().to_vec(),
450                SO_RCVBUF => (inner.recv_buf_size as socklen_t).to_ne_bytes().to_vec(),
451                _ => return error!(ENOSYS),
452            },
453            _ => vec![],
454        })
455    }
456}
457
458/// Returns the [`sockaddr_qrtr`] within a [`SocketAddress`] or `EINVAL` if the address is not a
459/// QRTR address.
460fn extract_qrtr_sockaddr(addr: &SocketAddress) -> Result<sockaddr_qrtr, Errno> {
461    match addr {
462        SocketAddress::Qipcrtr(bytes) => sockaddr_qrtr::read_from_prefix(bytes.as_bytes())
463            .map(|(addr, _)| addr)
464            .map_err(|e| errno!(EINVAL, e)),
465        _ => error!(EINVAL),
466    }
467}
468
469/// Returns the [`SocketAddress`] representing a given node and port number.
470fn pack_qrtr_sockaddr(node: u32, port: u32) -> SocketAddress {
471    let addr =
472        sockaddr_qrtr { sq_family: AF_QIPCRTR, sq_node: node, sq_port: port, ..Default::default() };
473    SocketAddress::Qipcrtr(addr.as_bytes().into())
474}
475
476/// Maps a [`fqrtr::Error`] to an [`Errno`]. This mapping is not one-to-one.
477fn qrtr_error_to_errno(e: fqrtr::Error) -> Errno {
478    match e {
479        fqrtr::Error::InternalError => errno!(EINVAL),
480        fqrtr::Error::AlreadyPending => errno!(EBUSY),
481        fqrtr::Error::RemoteNodeUnavailable => errno!(ECONNRESET),
482        fqrtr::Error::AlreadyBound => errno!(EADDRINUSE),
483        fqrtr::Error::NotSupported => errno!(ENOTSUP),
484        fqrtr::Error::WouldBlock => errno!(EAGAIN),
485        fqrtr::Error::NoResources => errno!(ENOMEM),
486        fqrtr::Error::InvalidArgs => errno!(EINVAL),
487        _ => errno!(EINVAL),
488    }
489}
490
491/// Maps [`FdEvents`] to [`zx::Signals`] for a QRTR connection.
492fn fd_events_to_qrtr_signals(events: FdEvents) -> zx::Signals {
493    let mut signals = zx::Signals::empty();
494    if events.contains(FdEvents::POLLIN) {
495        signals |= zx::Signals::from_bits_truncate(fqrtr::SIGNAL_READABLE);
496    }
497    if events.contains(FdEvents::POLLOUT) {
498        signals |= zx::Signals::from_bits_truncate(fqrtr::SIGNAL_WRITABLE);
499    }
500
501    // Always wait for the peer to be closed, which can generate POLLHUP.
502    signals |= zx::Signals::EVENTPAIR_PEER_CLOSED;
503    signals
504}
505
506/// Maps [`zx::Signals`] to [`FdEvents`] for a QRTR connection.
507fn qrtr_signals_to_fd_events(signals: zx::Signals) -> FdEvents {
508    let mut events = FdEvents::empty();
509    if signals.contains(zx::Signals::from_bits_truncate(fqrtr::SIGNAL_READABLE)) {
510        events |= FdEvents::POLLIN;
511    }
512    if signals.contains(zx::Signals::from_bits_truncate(fqrtr::SIGNAL_WRITABLE)) {
513        events |= FdEvents::POLLOUT;
514    }
515    if signals.contains(zx::Signals::EVENTPAIR_PEER_CLOSED) {
516        events |= FdEvents::POLLHUP;
517    }
518    events
519}
520
// Unit tests for the QRTR socket. Tests that need a driver use a mock connection whose FIDL
// requests are served from a background thread.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::testing::spawn_kernel_and_run;
    use crate::vfs::buffers::{VecInputBuffer, VecOutputBuffer};
    use crate::vfs::socket::{SocketDomain, SocketProtocol, SocketType};
    use fidl::endpoints::create_sync_proxy;
    use futures::StreamExt;

    /// Creates a `QipcrtrSocket` with a mock inner state.
    ///
    /// The mock state is connected to a `QrtrClientConnection` proxy, and the stream for that
    /// proxy is returned to allow the test to drive the mock FIDL behavior.
    fn mock_qipcrtr_socket()
    -> (QipcrtrSocket, fidl::endpoints::ServerEnd<fqrtr::QrtrClientConnectionMarker>) {
        let (proxy, server_end) = create_sync_proxy::<fqrtr::QrtrClientConnectionMarker>();

        // We need an event pair for the socket.
        // The other end is dropped right away. NOTE(review): presumably this leaves
        // EVENTPAIR_PEER_CLOSED asserted on `events`, which is what lets `write` pass its
        // writability poll in these tests -- confirm.
        let (events, _) = zx::EventPair::create();

        let inner = QipcrtrSocketInner {
            proxy,
            events,
            peer: None,
            send_buf_size: SEND_BUF_DEFAULT_SIZE,
            recv_buf_size: RECV_BUF_DEFAULT_SIZE,
        };

        (QipcrtrSocket { inner: Mutex::new(Some(inner)) }, server_end)
    }

    /// Creating a QRTR socket through `Socket::new` must succeed without a driver.
    #[::fuchsia::test]
    async fn test_qipcrtr_socket_new() {
        spawn_kernel_and_run(async |locked, current_task| {
            let _kernel = current_task.kernel();
            // This test just checks basic creation without panic, but for QIPCRTR it tries
            // to connect to the global service, which might fail in test env if not mocked
            // correctly or if we rely on real service. The existing test `test_qipcrtr_socket_new`
            // calls `Socket::new` which calls `QipcrtrSocket::new`.
            // `QipcrtrSocket::new` creates a None inner, so it doesn't connect yet.
            // Connection happens on first use or explicit connect.
            let _socket = Socket::new(
                locked,
                &current_task,
                SocketDomain::Qipcrtr,
                SocketType::Datagram,
                SocketProtocol::default(),
                /* kernel_private = */ false,
            )
            .expect("Failed to create socket.");
        })
        .await;
    }

    /// `SO_SNDBUF` and `SO_RCVBUF` report their defaults and, after being set, the doubled
    /// value.
    #[::fuchsia::test]
    async fn test_qipcrtr_sockopt() {
        spawn_kernel_and_run(async |locked, current_task| {
            let socket = mock_qipcrtr_socket();
            let socket_obj = Socket::new_with_ops_and_info(
                Box::new(socket.0),
                SocketDomain::Qipcrtr,
                SocketType::Datagram,
                SocketProtocol::default(),
            );
            // Bound after `socket_obj`, so it is dropped before it. NOTE(review): this order
            // looks load-bearing -- dropping the socket issues `close_connection` with an
            // infinite deadline, and nothing serves this channel in this test.
            let _server_end = socket.1;

            // Test SO_SNDBUF
            let sndbuf =
                socket_obj.getsockopt(locked, &current_task, SOL_SOCKET, SO_SNDBUF, 4).unwrap();
            let sndbuf_val = u32::from_ne_bytes(sndbuf.as_slice().try_into().unwrap());
            assert_eq!(sndbuf_val, SEND_BUF_DEFAULT_SIZE as u32);

            let new_sndbuf: u32 = 4096;
            socket_obj
                .setsockopt(
                    locked,
                    &current_task,
                    SOL_SOCKET,
                    SO_SNDBUF,
                    SockOptValue::from(new_sndbuf.as_bytes().to_vec()),
                )
                .unwrap();

            let sndbuf =
                socket_obj.getsockopt(locked, &current_task, SOL_SOCKET, SO_SNDBUF, 4).unwrap();
            let sndbuf_val = u32::from_ne_bytes(sndbuf.as_slice().try_into().unwrap());
            // Setsockopt doubles the value.
            assert_eq!(sndbuf_val, new_sndbuf * 2);

            // Test SO_RCVBUF
            let rcvbuf =
                socket_obj.getsockopt(locked, &current_task, SOL_SOCKET, SO_RCVBUF, 4).unwrap();
            let rcvbuf_val = u32::from_ne_bytes(rcvbuf.as_slice().try_into().unwrap());
            assert_eq!(rcvbuf_val, RECV_BUF_DEFAULT_SIZE as u32);

            let new_rcvbuf: u32 = 1024;
            socket_obj
                .setsockopt(
                    locked,
                    &current_task,
                    SOL_SOCKET,
                    SO_RCVBUF,
                    SockOptValue::from(new_rcvbuf.as_bytes().to_vec()),
                )
                .unwrap();

            let rcvbuf =
                socket_obj.getsockopt(locked, &current_task, SOL_SOCKET, SO_RCVBUF, 4).unwrap();
            let rcvbuf_val = u32::from_ne_bytes(rcvbuf.as_slice().try_into().unwrap());
            // Setsockopt doubles the value.
            assert_eq!(rcvbuf_val, new_rcvbuf * 2);
        })
        .await;
    }

    /// `getsockname` reports the node and port served by the mock driver, and `getpeername`
    /// reports the peer stored on the socket.
    #[::fuchsia::test]
    async fn test_qipcrtr_sockname() {
        let (socket_inner, server_end) = mock_qipcrtr_socket();
        // Handle get_node_id and get_port_id requests
        // The socket uses a synchronous proxy, so the mock server runs on its own thread.
        std::thread::spawn(move || {
            let mut executor = fuchsia_async::LocalExecutor::default();
            executor.run_singlethreaded(async move {
                let mut stream = server_end.into_stream();
                while let Some(Ok(request)) = stream.next().await {
                    match request {
                        fqrtr::QrtrClientConnectionRequest::GetNodeId { responder, .. } => {
                            let _ = responder.send(123).unwrap();
                        }
                        fqrtr::QrtrClientConnectionRequest::GetPortId { responder, .. } => {
                            let _ = responder.send(456).unwrap();
                        }
                        // Sent when the socket's inner state is dropped; a failed reply is
                        // ignored.
                        fqrtr::QrtrClientConnectionRequest::CloseConnection {
                            responder, ..
                        } => {
                            let _ = responder.send();
                        }
                        _ => panic!("Unexpected request: {:?}", request),
                    }
                }
            });
        });

        spawn_kernel_and_run(async |locked, _current_task| {
            let socket_obj = Socket::new_with_ops_and_info(
                Box::new(socket_inner),
                SocketDomain::Qipcrtr,
                SocketType::Datagram,
                SocketProtocol::default(),
            );

            let addr = socket_obj.getsockname(locked).unwrap();
            let qrtr_addr = extract_qrtr_sockaddr(&addr).unwrap();
            assert_eq!(qrtr_addr.sq_node, 123);
            assert_eq!(qrtr_addr.sq_port, 456);

            // Set peer
            // The peer is written directly into the inner state instead of calling `connect`.
            let peer_addr = sockaddr_qrtr {
                sq_family: AF_QIPCRTR,
                sq_node: 10,
                sq_port: 20,
                ..Default::default()
            };
            socket_obj
                .downcast_socket::<QipcrtrSocket>()
                .unwrap()
                .inner
                .lock()
                .as_mut()
                .unwrap()
                .peer = Some(peer_addr);

            let peer = socket_obj.getpeername(locked).unwrap();
            let peer_qrtr = extract_qrtr_sockaddr(&peer).unwrap();
            assert_eq!(peer_qrtr.sq_node, 10);
            assert_eq!(peer_qrtr.sq_port, 20);
        })
        .await;
    }

    /// `write` without a destination sends to the stored peer, and `read` returns the
    /// payload and sender address supplied by the mock driver.
    #[::fuchsia::test]
    async fn test_qipcrtr_read_write() {
        let (socket_inner, server_end) = mock_qipcrtr_socket();
        // The socket uses a synchronous proxy, so the mock server runs on its own thread.
        std::thread::spawn(move || {
            let mut executor = fuchsia_async::LocalExecutor::default();
            executor.run_singlethreaded(async move {
                let mut stream = server_end.into_stream();
                while let Some(Ok(request)) = stream.next().await {
                    match request {
                        // Checks that the write is addressed to the peer set by the test body.
                        fqrtr::QrtrClientConnectionRequest::Write {
                            dst_node_id,
                            dst_port,
                            data,
                            responder,
                            ..
                        } => {
                            assert_eq!(dst_node_id, 10);
                            assert_eq!(dst_port, 20);
                            assert_eq!(data, b"hello");
                            let _ = responder.send(Ok(())).unwrap();
                        }
                        // Replies with a fixed message from node 5, port 15.
                        fqrtr::QrtrClientConnectionRequest::Read { responder, .. } => {
                            let _ = responder.send(Ok((5, 15, b"world"))).unwrap();
                        }
                        // Sent when the socket's inner state is dropped; a failed reply is
                        // ignored.
                        fqrtr::QrtrClientConnectionRequest::CloseConnection {
                            responder, ..
                        } => {
                            let _ = responder.send();
                        }
                        _ => panic!("Unexpected request: {:?}", request),
                    }
                }
            });
        });

        spawn_kernel_and_run(async |locked, current_task| {
            let socket_obj = Socket::new_with_ops_and_info(
                Box::new(socket_inner),
                SocketDomain::Qipcrtr,
                SocketType::Datagram,
                SocketProtocol::default(),
            );
            // Connect to set default peer
            // The peer is written directly into the inner state instead of calling `connect`.
            let peer_addr = sockaddr_qrtr {
                sq_family: AF_QIPCRTR,
                sq_node: 10,
                sq_port: 20,
                ..Default::default()
            };
            socket_obj
                .downcast_socket::<QipcrtrSocket>()
                .unwrap()
                .inner
                .lock()
                .as_mut()
                .unwrap()
                .peer = Some(peer_addr);

            // Test Write
            let mut input = VecInputBuffer::new(b"hello");
            let written = socket_obj
                .write(locked, &current_task, &mut input, &mut None, &mut vec![])
                .unwrap();
            assert_eq!(written, 5);

            // Test Read
            let mut output = VecOutputBuffer::new(100);
            let info = socket_obj
                .read(locked, &current_task, &mut output, SocketMessageFlags::empty())
                .unwrap();
            assert_eq!(info.bytes_read, 5);
            assert_eq!(output.data(), b"world");

            let addr = extract_qrtr_sockaddr(&info.address.unwrap()).unwrap();
            assert_eq!(addr.sq_node, 5);
            assert_eq!(addr.sq_port, 15);
        })
        .await;
    }
}