starnix_core/vfs/socket/
socket_netlink.rs

1// Copyright 2022 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::security::{self, AuditLogger, AuditMessage, AuditRequest};
6use crate::vfs::socket::{SockOptValue, SocketDomain};
7use futures::channel::mpsc::{
8    UnboundedReceiver, UnboundedSender, {self},
9};
10use linux_uapi::{AUDIT_GET, audit_status};
11use netlink::messaging::{
12    AccessControl, MessageWithPermission, NetlinkContext, NetlinkMessageWithCreds, Permission,
13    Sender, UnparsedNetlinkMessage,
14};
15use netlink::multicast_groups::{
16    InvalidLegacyGroupsError, InvalidModernGroupError, LegacyGroups, ModernGroup,
17    NoMappingFromModernToLegacyGroupError, SingleLegacyGroup,
18};
19use netlink::protocol_family::route::NetlinkRouteClient;
20use netlink::{NETLINK_LOG_TAG, NewClientError};
21use netlink_packet_core::{
22    ErrorMessage, NETLINK_HEADER_LEN, NLMSG_ERROR, NetlinkBuffer, NetlinkDeserializable,
23    NetlinkHeader, NetlinkMessage, NetlinkPayload, NetlinkSerializable,
24};
25use netlink_packet_route::RouteNetlinkMessage;
26use netlink_packet_utils::{DecodeError, Emitable as _};
27use starnix_sync::{FileOpsCore, LockEqualOrBefore, Locked, Mutex};
28use std::marker::PhantomData;
29use std::num::{NonZeroI32, NonZeroU32};
30use std::sync::Arc;
31use zerocopy::{FromBytes, IntoBytes};
32
33use crate::device::kobject::{Device, UEventAction, UEventContext};
34use crate::device::{DeviceListener, DeviceListenerKey};
35use crate::task::{
36    CurrentTask, EventHandler, FullCredentials, Kernel, WaitCanceler, WaitQueue, Waiter,
37};
38use crate::vfs::buffers::{
39    AncillaryData, InputBuffer, Message, MessageQueue, MessageReadInfo, OutputBuffer,
40    UnixControlData, VecInputBuffer,
41};
42use crate::vfs::socket::{
43    GenericMessage, GenericNetlinkClientHandle, Socket, SocketAddress, SocketHandle,
44    SocketMessageFlags, SocketOps, SocketPeer, SocketShutdownFlags, SocketType,
45};
46use starnix_logging::{log_debug, log_error, log_warn, track_stub};
47use starnix_uapi::auth::{CAP_AUDIT_CONTROL, CAP_AUDIT_WRITE, CAP_NET_ADMIN};
48use starnix_uapi::errors::Errno;
49use starnix_uapi::vfs::FdEvents;
50use starnix_uapi::{
51    AF_NETLINK, NETLINK_ADD_MEMBERSHIP, NETLINK_AUDIT, NETLINK_CONNECTOR, NETLINK_CRYPTO,
52    NETLINK_DNRTMSG, NETLINK_DROP_MEMBERSHIP, NETLINK_ECRYPTFS, NETLINK_FIB_LOOKUP,
53    NETLINK_FIREWALL, NETLINK_GENERIC, NETLINK_IP6_FW, NETLINK_ISCSI, NETLINK_KOBJECT_UEVENT,
54    NETLINK_NETFILTER, NETLINK_NFLOG, NETLINK_RDMA, NETLINK_ROUTE, NETLINK_SCSITRANSPORT,
55    NETLINK_SELINUX, NETLINK_SMC, NETLINK_SOCK_DIAG, NETLINK_USERSOCK, NETLINK_XFRM, NLM_F_MULTI,
56    NLMSG_DONE, SO_PASSCRED, SO_PROTOCOL, SO_RCVBUF, SO_RCVBUFFORCE, SO_SNDBUF, SO_SNDBUFFORCE,
57    SO_TIMESTAMP, SOL_SOCKET, errno, error, nlmsghdr, sockaddr_nl, socklen_t, ucred,
58};
59
// Buffer size limits, taken from netlink/socket.go in gVisor.

/// Lower bound applied to `SO_SNDBUF` / `SO_RCVBUF` requests (4 KiB).
pub const SOCKET_MIN_SIZE: usize = 4 * 1024;
/// Send and receive buffer size of a newly created socket (16 KiB).
pub const SOCKET_DEFAULT_SIZE: usize = 16 << 10;
/// Upper bound applied to `SO_SNDBUF` / `SO_RCVBUF` requests (4 MiB).
pub const SOCKET_MAX_SIZE: usize = 4 * 1024 * 1024;

// From linux/socket.go in gVisor.
const SOL_NETLINK: u32 = 270;
67
68pub fn new_netlink_socket(
69    kernel: &Arc<Kernel>,
70    socket_type: SocketType,
71    family: NetlinkFamily,
72) -> Result<Box<dyn SocketOps>, Errno> {
73    log_debug!(tag = NETLINK_LOG_TAG; "Creating {:?} Netlink Socket", family);
74    if socket_type != SocketType::Datagram && socket_type != SocketType::Raw {
75        return error!(ESOCKTNOSUPPORT);
76    }
77
78    let ops: Box<dyn SocketOps> = match family {
79        NetlinkFamily::KobjectUevent => Box::new(UEventNetlinkSocket::default()),
80        NetlinkFamily::Route => Box::new(RouteNetlinkSocket::new(kernel)?),
81        NetlinkFamily::Generic => Box::new(GenericNetlinkSocket::new(kernel)?),
82        NetlinkFamily::SockDiag => Box::new(DiagnosticNetlinkSocket::default()),
83        NetlinkFamily::Audit => Box::new(AuditNetlinkSocket::new(kernel)?),
84        NetlinkFamily::Usersock
85        | NetlinkFamily::Firewall
86        | NetlinkFamily::Nflog
87        | NetlinkFamily::Xfrm
88        | NetlinkFamily::Selinux
89        | NetlinkFamily::Iscsi
90        | NetlinkFamily::FibLookup
91        | NetlinkFamily::Connector
92        | NetlinkFamily::Netfilter
93        | NetlinkFamily::Ip6Fw
94        | NetlinkFamily::Dnrtmsg
95        | NetlinkFamily::Scsitransport
96        | NetlinkFamily::Ecryptfs
97        | NetlinkFamily::Rdma
98        | NetlinkFamily::Crypto
99        | NetlinkFamily::Smc => Box::new(StubbedNetlinkSocket::new(family)),
100        NetlinkFamily::Invalid => return error!(EINVAL),
101    };
102    Ok(ops)
103}
104
105#[derive(Default, Debug, Clone, PartialEq, Eq)]
106#[repr(C)]
107pub struct NetlinkAddress {
108    pid: u32,
109    groups: u32,
110}
111
112impl NetlinkAddress {
113    pub fn new(pid: u32, groups: u32) -> Self {
114        NetlinkAddress { pid, groups }
115    }
116
117    pub fn set_pid_if_zero(&mut self, pid: i32) {
118        if self.pid == 0 {
119            self.pid = pid as u32;
120        }
121    }
122
123    pub fn to_bytes(&self) -> Vec<u8> {
124        sockaddr_nl { nl_family: AF_NETLINK, nl_pid: self.pid, nl_pad: 0, nl_groups: self.groups }
125            .as_bytes()
126            .to_vec()
127    }
128}
129
130#[derive(Debug, Hash, Eq, PartialEq, Clone)]
131pub enum NetlinkFamily {
132    Invalid,
133    Route,
134    Usersock,
135    Firewall,
136    SockDiag,
137    Nflog,
138    Xfrm,
139    Selinux,
140    Iscsi,
141    Audit,
142    FibLookup,
143    Connector,
144    Netfilter,
145    Ip6Fw,
146    Dnrtmsg,
147    KobjectUevent,
148    Generic,
149    Scsitransport,
150    Ecryptfs,
151    Rdma,
152    Crypto,
153    Smc,
154}
155
156impl NetlinkFamily {
157    pub fn from_raw(family: u32) -> Self {
158        match family {
159            NETLINK_ROUTE => NetlinkFamily::Route,
160            NETLINK_USERSOCK => NetlinkFamily::Usersock,
161            NETLINK_FIREWALL => NetlinkFamily::Firewall,
162            NETLINK_SOCK_DIAG => NetlinkFamily::SockDiag,
163            NETLINK_NFLOG => NetlinkFamily::Nflog,
164            NETLINK_XFRM => NetlinkFamily::Xfrm,
165            NETLINK_SELINUX => NetlinkFamily::Selinux,
166            NETLINK_ISCSI => NetlinkFamily::Iscsi,
167            NETLINK_AUDIT => NetlinkFamily::Audit,
168            NETLINK_FIB_LOOKUP => NetlinkFamily::FibLookup,
169            NETLINK_CONNECTOR => NetlinkFamily::Connector,
170            NETLINK_NETFILTER => NetlinkFamily::Netfilter,
171            NETLINK_IP6_FW => NetlinkFamily::Ip6Fw,
172            NETLINK_DNRTMSG => NetlinkFamily::Dnrtmsg,
173            NETLINK_KOBJECT_UEVENT => NetlinkFamily::KobjectUevent,
174            NETLINK_GENERIC => NetlinkFamily::Generic,
175            NETLINK_SCSITRANSPORT => NetlinkFamily::Scsitransport,
176            NETLINK_ECRYPTFS => NetlinkFamily::Ecryptfs,
177            NETLINK_RDMA => NetlinkFamily::Rdma,
178            NETLINK_CRYPTO => NetlinkFamily::Crypto,
179            NETLINK_SMC => NetlinkFamily::Smc,
180            _ => NetlinkFamily::Invalid,
181        }
182    }
183
184    pub fn as_raw(&self) -> u32 {
185        match self {
186            NetlinkFamily::Route => NETLINK_ROUTE,
187            NetlinkFamily::KobjectUevent => NETLINK_KOBJECT_UEVENT,
188            NetlinkFamily::Audit => NETLINK_AUDIT,
189            _ => 0,
190        }
191    }
192}
193
194struct NetlinkSocketInner {
195    /// The specific type of netlink socket.
196    family: NetlinkFamily,
197
198    /// The [`MessageQueue`] that contains messages from netlink to the client.
199    receive_buffer: MessageQueue,
200
201    /// The socket's send buffer size. Note, This value is only used
202    /// to serve getsockopt calls for `SO_SNDBUF`. It does not yet enforce a
203    /// limit on the number of messages netlink will buffer from the client.
204    /// TODO(https://fxbug.dev/285880057): Limit the size of the send buffer.
205    send_buf_size: usize,
206
207    /// This queue will be notified on reads, writes, disconnects etc.
208    waiters: WaitQueue,
209
210    /// The address of this socket.
211    address: Option<NetlinkAddress>,
212
213    /// See SO_PASSCRED.
214    pub passcred: bool,
215
216    /// See SO_TIMESTAMP.
217    pub timestamp: bool,
218}
219
220impl NetlinkSocketInner {
221    fn new(family: NetlinkFamily) -> Self {
222        Self {
223            family,
224            receive_buffer: MessageQueue::new(SOCKET_DEFAULT_SIZE),
225            send_buf_size: SOCKET_DEFAULT_SIZE,
226            waiters: WaitQueue::default(),
227            address: None,
228            passcred: false,
229            timestamp: false,
230        }
231    }
232
233    fn bind(
234        &mut self,
235        current_task: &CurrentTask,
236        socket_address: SocketAddress,
237    ) -> Result<(), Errno> {
238        if self.address.is_some() {
239            return error!(EINVAL);
240        }
241
242        let netlink_address = match socket_address {
243            SocketAddress::Netlink(mut netlink_address) => {
244                // TODO: Support distinct IDs for processes with multiple netlink sockets.
245                netlink_address.set_pid_if_zero(current_task.get_pid());
246                netlink_address
247            }
248            _ => return error!(EINVAL),
249        };
250
251        self.address = Some(netlink_address);
252        Ok(())
253    }
254
255    fn connect(&mut self, current_task: &CurrentTask, peer: SocketPeer) -> Result<(), Errno> {
256        let address = match peer {
257            SocketPeer::Address(address) => address,
258            _ => return error!(EINVAL),
259        };
260        // Connect is equivalent to bind, but error are ignored.
261        let _ = self.bind(current_task, address);
262        Ok(())
263    }
264
265    fn read_message(&mut self) -> Option<Message> {
266        let message = self.receive_buffer.read_message();
267        if message.is_some() {
268            self.waiters.notify_fd_events(FdEvents::POLLOUT);
269        }
270        message
271    }
272
273    fn read_datagram(
274        &mut self,
275        data: &mut dyn OutputBuffer,
276        flags: SocketMessageFlags,
277    ) -> Result<MessageReadInfo, Errno> {
278        let mut info = if flags.contains(SocketMessageFlags::PEEK) {
279            self.receive_buffer.peek_datagram(data)
280        } else {
281            self.receive_buffer.read_datagram(data)
282        }?;
283        if info.message_length == 0 {
284            return error!(EAGAIN);
285        }
286
287        if self.passcred {
288            track_stub!(TODO("https://fxbug.dev/297373991"), "SCM_CREDENTIALS/SO_PASSCRED");
289            info.ancillary_data.push(AncillaryData::Unix(UnixControlData::unknown_creds()));
290        }
291
292        Ok(info)
293    }
294
295    fn write_to_queue(
296        &mut self,
297        data: &mut dyn InputBuffer,
298        address: Option<NetlinkAddress>,
299        ancillary_data: &mut Vec<AncillaryData>,
300    ) -> Result<usize, Errno> {
301        let socket_address = match address {
302            Some(addr) => Some(SocketAddress::Netlink(addr)),
303            None => self.address.as_ref().map(|addr| SocketAddress::Netlink(addr.clone())),
304        };
305        let bytes_written =
306            self.receive_buffer.write_datagram(data, socket_address, ancillary_data)?;
307        if bytes_written > 0 {
308            self.waiters.notify_fd_events(FdEvents::POLLIN);
309        }
310        Ok(bytes_written)
311    }
312
313    fn wait_async(
314        &mut self,
315        waiter: &Waiter,
316        events: FdEvents,
317        handler: EventHandler,
318    ) -> WaitCanceler {
319        self.waiters.wait_async_fd_events(waiter, events, handler)
320    }
321
322    fn query_events(&self) -> FdEvents {
323        self.receive_buffer.query_events()
324    }
325
326    fn getsockname(&self) -> Result<SocketAddress, Errno> {
327        match &self.address {
328            Some(addr) => Ok(SocketAddress::Netlink(addr.clone())),
329            _ => Ok(SocketAddress::default_for_domain(SocketDomain::Netlink)),
330        }
331    }
332
333    fn getpeername(&self) -> Result<SocketAddress, Errno> {
334        match &self.address {
335            Some(addr) => Ok(SocketAddress::Netlink(addr.clone())),
336            _ => Ok(SocketAddress::default_for_domain(SocketDomain::Netlink)),
337        }
338    }
339
340    fn getsockopt(&self, level: u32, optname: u32) -> Result<Vec<u8>, Errno> {
341        let opt_value = match level {
342            SOL_SOCKET => match optname {
343                SO_PASSCRED => (self.passcred as u32).as_bytes().to_vec(),
344                SO_TIMESTAMP => (self.timestamp as u32).as_bytes().to_vec(),
345                SO_SNDBUF => (self.send_buf_size as socklen_t).to_ne_bytes().to_vec(),
346                SO_RCVBUF => (self.receive_buffer.capacity() as socklen_t).to_ne_bytes().to_vec(),
347                SO_SNDBUFFORCE => (self.send_buf_size as socklen_t).to_ne_bytes().to_vec(),
348                SO_RCVBUFFORCE => {
349                    (self.receive_buffer.capacity() as socklen_t).to_ne_bytes().to_vec()
350                }
351                SO_PROTOCOL => self.family.as_raw().as_bytes().to_vec(),
352                _ => return error!(ENOSYS),
353            },
354            _ => vec![],
355        };
356
357        Ok(opt_value)
358    }
359
360    fn setsockopt(
361        &mut self,
362        current_task: &CurrentTask,
363        level: u32,
364        optname: u32,
365        optval: SockOptValue,
366    ) -> Result<(), Errno> {
367        match level {
368            SOL_SOCKET => match optname {
369                SO_SNDBUF => {
370                    let requested_capacity: socklen_t = optval.read(current_task)?;
371                    // SO_SNDBUF doubles the requested capacity to leave space for bookkeeping.
372                    // See https://man7.org/linux/man-pages/man7/socket.7.html
373                    let capacity = usize::try_from(requested_capacity * 2).unwrap_or(usize::MAX);
374                    // TODO(https://fxbug.dev/322907334): Clamp to `wmem_max`.
375                    let capacity = capacity.clamp(SOCKET_MIN_SIZE, SOCKET_MAX_SIZE);
376                    self.send_buf_size = capacity;
377                }
378                SO_SNDBUFFORCE => {
379                    security::check_task_capable(current_task, CAP_NET_ADMIN)?;
380                    let requested_capacity: socklen_t = optval.read(current_task)?;
381                    // SO_SNDBUFFORE doubles the requested capacity to leave space for bookkeeping.
382                    // See https://man7.org/linux/man-pages/man7/socket.7.html
383                    let capacity = usize::try_from(requested_capacity * 2).unwrap_or(usize::MAX);
384                    self.send_buf_size = capacity;
385                }
386                SO_RCVBUF => {
387                    let requested_capacity: socklen_t = optval.read(current_task)?;
388                    // SO_RCVBUF doubles the requested capacity to leave space for bookkeeping.
389                    // See https://man7.org/linux/man-pages/man7/socket.7.html
390                    let capacity = usize::try_from(requested_capacity * 2).unwrap_or(usize::MAX);
391                    // TODO(https://fxbug.dev/322906968): Clamp to `rmem_max`.
392                    let capacity = capacity.clamp(SOCKET_MIN_SIZE, SOCKET_MAX_SIZE);
393                    self.receive_buffer.set_capacity(capacity)?;
394                }
395                SO_RCVBUFFORCE => {
396                    security::check_task_capable(current_task, CAP_NET_ADMIN)?;
397                    let requested_capacity: socklen_t = optval.read(current_task)?;
398                    // SO_RCVBUFFORE doubles the requested capacity to leave space for bookkeeping.
399                    // See https://man7.org/linux/man-pages/man7/socket.7.html
400                    let capacity = usize::try_from(requested_capacity * 2).unwrap_or(usize::MAX);
401                    self.receive_buffer.set_capacity(capacity)?;
402                }
403                SO_PASSCRED => {
404                    let passcred: u32 = optval.read(current_task)?;
405                    self.passcred = passcred != 0;
406                }
407                SO_TIMESTAMP => {
408                    let timestamp: u32 = optval.read(current_task)?;
409                    self.timestamp = timestamp != 0;
410                }
411                _ => return error!(ENOSYS),
412            },
413            _ => return error!(ENOSYS),
414        }
415
416        Ok(())
417    }
418}
419
420/// A fake Netlink socket that loops messages back to the client.
421///
422/// Used as a placeholder implementation for protocol families that lack a real
423/// implementation.
424struct StubbedNetlinkSocket {
425    inner: Mutex<NetlinkSocketInner>,
426}
427
428impl StubbedNetlinkSocket {
429    pub fn new(family: NetlinkFamily) -> Self {
430        track_stub!(
431            TODO("https://fxbug.dev/278565021"),
432            format!("Creating StubbedNetlinkSocket: {:?}", family).as_str()
433        );
434        StubbedNetlinkSocket { inner: Mutex::new(NetlinkSocketInner::new(family)) }
435    }
436
437    /// Locks and returns the inner state of the Socket.
438    fn lock(&self) -> starnix_sync::MutexGuard<'_, NetlinkSocketInner> {
439        self.inner.lock()
440    }
441}
442
443impl SocketOps for StubbedNetlinkSocket {
444    fn connect(
445        &self,
446        _locked: &mut Locked<FileOpsCore>,
447        _socket: &SocketHandle,
448        current_task: &CurrentTask,
449        peer: SocketPeer,
450    ) -> Result<(), Errno> {
451        self.lock().connect(current_task, peer)
452    }
453
454    fn listen(
455        &self,
456        _locked: &mut Locked<FileOpsCore>,
457        _socket: &Socket,
458        _backlog: i32,
459        _credentials: ucred,
460    ) -> Result<(), Errno> {
461        error!(EOPNOTSUPP)
462    }
463
464    fn accept(
465        &self,
466        _locked: &mut Locked<FileOpsCore>,
467        _socket: &Socket,
468        _current_task: &CurrentTask,
469    ) -> Result<SocketHandle, Errno> {
470        error!(EOPNOTSUPP)
471    }
472
473    fn bind(
474        &self,
475        _locked: &mut Locked<FileOpsCore>,
476        _socket: &Socket,
477        current_task: &CurrentTask,
478        socket_address: SocketAddress,
479    ) -> Result<(), Errno> {
480        self.lock().bind(current_task, socket_address)
481    }
482
483    fn read(
484        &self,
485        _locked: &mut Locked<FileOpsCore>,
486        _socket: &Socket,
487        _current_task: &CurrentTask,
488        data: &mut dyn OutputBuffer,
489        _flags: SocketMessageFlags,
490    ) -> Result<MessageReadInfo, Errno> {
491        let msg = self.lock().read_message();
492        match msg {
493            Some(message) => {
494                // Mark the message as complete and return it.
495                let (mut nl_msg, _) =
496                    nlmsghdr::read_from_prefix(&message.data).map_err(|_| errno!(EINVAL))?;
497                nl_msg.nlmsg_type = NLMSG_DONE as u16;
498                nl_msg.nlmsg_flags &= NLM_F_MULTI as u16;
499                let msg_bytes = nl_msg.as_bytes();
500                let bytes_read = data.write(msg_bytes)?;
501
502                let info = MessageReadInfo {
503                    bytes_read,
504                    message_length: msg_bytes.len(),
505                    address: Some(SocketAddress::Netlink(NetlinkAddress::default())),
506                    ancillary_data: vec![],
507                };
508                Ok(info)
509            }
510            None => Ok(MessageReadInfo::default()),
511        }
512    }
513
514    fn write(
515        &self,
516        _locked: &mut Locked<FileOpsCore>,
517        _socket: &Socket,
518        _current_task: &CurrentTask,
519        data: &mut dyn InputBuffer,
520        dest_address: &mut Option<SocketAddress>,
521        ancillary_data: &mut Vec<AncillaryData>,
522    ) -> Result<usize, Errno> {
523        let mut local_address = self.lock().address.clone();
524
525        let destination = match dest_address {
526            Some(SocketAddress::Netlink(addr)) => addr,
527            _ => match &mut local_address {
528                Some(addr) => addr,
529                _ => return Ok(data.drain()),
530            },
531        };
532
533        if destination.groups != 0 {
534            track_stub!(TODO("https://fxbug.dev/322874956"), "StubbedNetlinkSockets multicasting");
535            return Ok(data.drain());
536        }
537
538        self.lock().write_to_queue(data, Some(NetlinkAddress::default()), ancillary_data)
539    }
540
541    fn wait_async(
542        &self,
543        _locked: &mut Locked<FileOpsCore>,
544        _socket: &Socket,
545        _current_task: &CurrentTask,
546        waiter: &Waiter,
547        events: FdEvents,
548        handler: EventHandler,
549    ) -> WaitCanceler {
550        self.lock().wait_async(waiter, events, handler)
551    }
552
553    fn query_events(
554        &self,
555        _locked: &mut Locked<FileOpsCore>,
556        _socket: &Socket,
557        _current_task: &CurrentTask,
558    ) -> Result<FdEvents, Errno> {
559        Ok(self.lock().query_events() & FdEvents::POLLIN)
560    }
561
562    fn shutdown(
563        &self,
564        _locked: &mut Locked<FileOpsCore>,
565        _socket: &Socket,
566        _how: SocketShutdownFlags,
567    ) -> Result<(), Errno> {
568        track_stub!(TODO("https://fxbug.dev/322875507"), "StubbedNetlinkSocket::shutdown");
569        Ok(())
570    }
571
572    fn close(
573        &self,
574        _locked: &mut Locked<FileOpsCore>,
575        _current_task: &CurrentTask,
576        _socket: &Socket,
577    ) {
578    }
579
580    fn getsockname(
581        &self,
582        _locked: &mut Locked<FileOpsCore>,
583        _socket: &Socket,
584    ) -> Result<SocketAddress, Errno> {
585        self.lock().getsockname()
586    }
587
588    fn getpeername(
589        &self,
590        _locked: &mut Locked<FileOpsCore>,
591        _socket: &Socket,
592    ) -> Result<SocketAddress, Errno> {
593        self.lock().getpeername()
594    }
595
596    fn getsockopt(
597        &self,
598        _locked: &mut Locked<FileOpsCore>,
599        _socket: &Socket,
600        _current_task: &CurrentTask,
601        level: u32,
602        optname: u32,
603        _optlen: u32,
604    ) -> Result<Vec<u8>, Errno> {
605        self.lock().getsockopt(level, optname)
606    }
607
608    fn setsockopt(
609        &self,
610        _locked: &mut Locked<FileOpsCore>,
611        _socket: &Socket,
612        current_task: &CurrentTask,
613        level: u32,
614        optname: u32,
615        optval: SockOptValue,
616    ) -> Result<(), Errno> {
617        self.lock().setsockopt(current_task, level, optname, optval)
618    }
619}
620
621/// Socket implementation for the NETLINK_KOBJECT_UEVENT family of netlink sockets.
622struct UEventNetlinkSocket {
623    inner: Arc<Mutex<NetlinkSocketInner>>,
624    device_listener_key: Mutex<Option<DeviceListenerKey>>,
625}
626
627impl Default for UEventNetlinkSocket {
628    #[allow(clippy::let_and_return)]
629    fn default() -> Self {
630        let result = Self {
631            inner: Arc::new(Mutex::new(NetlinkSocketInner::new(NetlinkFamily::KobjectUevent))),
632            device_listener_key: Default::default(),
633        };
634        #[cfg(any(test, debug_assertions))]
635        {
636            let _l1 = result.device_listener_key.lock();
637            let _l2 = result.lock();
638        }
639        result
640    }
641}
642
643impl UEventNetlinkSocket {
644    /// Locks and returns the inner state of the Socket.
645    fn lock(&self) -> starnix_sync::MutexGuard<'_, NetlinkSocketInner> {
646        self.inner.lock()
647    }
648
649    fn register_listener<L>(
650        &self,
651        locked: &mut Locked<L>,
652        current_task: &CurrentTask,
653        state: starnix_sync::MutexGuard<'_, NetlinkSocketInner>,
654    ) where
655        L: LockEqualOrBefore<FileOpsCore>,
656    {
657        if state.address.is_none() {
658            return;
659        }
660        std::mem::drop(state);
661        let mut key_state = self.device_listener_key.lock();
662        if key_state.is_none() {
663            *key_state = Some(
664                current_task.kernel().device_registry.register_listener(locked, self.inner.clone()),
665            );
666        }
667    }
668}
669
670impl SocketOps for UEventNetlinkSocket {
671    fn connect(
672        &self,
673        locked: &mut Locked<FileOpsCore>,
674        _socket: &SocketHandle,
675        current_task: &CurrentTask,
676        peer: SocketPeer,
677    ) -> Result<(), Errno> {
678        let mut state = self.lock();
679        state.connect(current_task, peer)?;
680        self.register_listener(locked, current_task, state);
681        Ok(())
682    }
683
684    fn listen(
685        &self,
686        _locked: &mut Locked<FileOpsCore>,
687        _socket: &Socket,
688        _backlog: i32,
689        _credentials: ucred,
690    ) -> Result<(), Errno> {
691        error!(EOPNOTSUPP)
692    }
693
694    fn accept(
695        &self,
696        _locked: &mut Locked<FileOpsCore>,
697        _socket: &Socket,
698        _current_task: &CurrentTask,
699    ) -> Result<SocketHandle, Errno> {
700        error!(EOPNOTSUPP)
701    }
702
703    fn bind(
704        &self,
705        locked: &mut Locked<FileOpsCore>,
706        _socket: &Socket,
707        current_task: &CurrentTask,
708        socket_address: SocketAddress,
709    ) -> Result<(), Errno> {
710        let mut state = self.lock();
711        state.bind(current_task, socket_address)?;
712        self.register_listener(locked, current_task, state);
713        Ok(())
714    }
715
716    fn read(
717        &self,
718        _locked: &mut Locked<FileOpsCore>,
719        _socket: &Socket,
720        _current_task: &CurrentTask,
721        data: &mut dyn OutputBuffer,
722        flags: SocketMessageFlags,
723    ) -> Result<MessageReadInfo, Errno> {
724        self.lock().read_datagram(data, flags)
725    }
726
727    fn write(
728        &self,
729        _locked: &mut Locked<FileOpsCore>,
730        _socket: &Socket,
731        _current_task: &CurrentTask,
732        _data: &mut dyn InputBuffer,
733        _dest_address: &mut Option<SocketAddress>,
734        _ancillary_data: &mut Vec<AncillaryData>,
735    ) -> Result<usize, Errno> {
736        error!(EOPNOTSUPP)
737    }
738
739    fn wait_async(
740        &self,
741        _locked: &mut Locked<FileOpsCore>,
742        _socket: &Socket,
743        _current_task: &CurrentTask,
744        waiter: &Waiter,
745        events: FdEvents,
746        handler: EventHandler,
747    ) -> WaitCanceler {
748        self.lock().wait_async(waiter, events, handler)
749    }
750
751    fn query_events(
752        &self,
753        _locked: &mut Locked<FileOpsCore>,
754        _socket: &Socket,
755        _current_task: &CurrentTask,
756    ) -> Result<FdEvents, Errno> {
757        Ok(self.lock().query_events() & FdEvents::POLLIN)
758    }
759
760    fn shutdown(
761        &self,
762        _locked: &mut Locked<FileOpsCore>,
763        _socket: &Socket,
764        _how: SocketShutdownFlags,
765    ) -> Result<(), Errno> {
766        track_stub!(TODO("https://fxbug.dev/322875507"), "UEventNetlinkSocket::shutdown");
767        Ok(())
768    }
769
770    fn close(
771        &self,
772        locked: &mut Locked<FileOpsCore>,
773        current_task: &CurrentTask,
774        _socket: &Socket,
775    ) {
776        let id = self.device_listener_key.lock().take();
777        if let Some(id) = id {
778            current_task.kernel().device_registry.unregister_listener(locked, &id);
779        }
780    }
781
782    fn getsockname(
783        &self,
784        _locked: &mut Locked<FileOpsCore>,
785        _socket: &Socket,
786    ) -> Result<SocketAddress, Errno> {
787        self.lock().getsockname()
788    }
789
790    fn getpeername(
791        &self,
792        _locked: &mut Locked<FileOpsCore>,
793        _socket: &Socket,
794    ) -> Result<SocketAddress, Errno> {
795        self.lock().getpeername()
796    }
797
798    fn getsockopt(
799        &self,
800        _locked: &mut Locked<FileOpsCore>,
801        _socket: &Socket,
802        _current_task: &CurrentTask,
803        level: u32,
804        optname: u32,
805        _optlen: u32,
806    ) -> Result<Vec<u8>, Errno> {
807        self.lock().getsockopt(level, optname)
808    }
809
810    fn setsockopt(
811        &self,
812        _locked: &mut Locked<FileOpsCore>,
813        _socket: &Socket,
814        current_task: &CurrentTask,
815        level: u32,
816        optname: u32,
817        optval: SockOptValue,
818    ) -> Result<(), Errno> {
819        self.lock().setsockopt(current_task, level, optname, optval)
820    }
821}
822
823impl DeviceListener for Arc<Mutex<NetlinkSocketInner>> {
824    fn on_device_event(&self, action: UEventAction, device: Device, context: UEventContext) {
825        let path = device.path_from_depth(0);
826        let message = format!(
827            "{action}@/{path}\0\
828                            ACTION={action}\0\
829                            SEQNUM={seqnum}\0\
830                            {other_props}",
831            seqnum = context.seqnum,
832            other_props = device.uevent_properties('\0'),
833        );
834        let ancillary_data = AncillaryData::Unix(UnixControlData::Credentials(Default::default()));
835        let mut ancillary_data = vec![ancillary_data];
836        // Ignore write errors
837        let _ = self.lock().write_to_queue(
838            &mut VecInputBuffer::new(message.as_bytes()),
839            Some(NetlinkAddress { pid: 0, groups: 1 }),
840            &mut ancillary_data,
841        );
842    }
843}
844
845/// Type for sending messages from [`netlink::Netlink`] to an individual socket.
846#[derive(Clone)]
847pub struct NetlinkToClientSender<M> {
848    /// The inner socket implementation, which holds a message queue.
849    inner: Arc<Mutex<NetlinkSocketInner>>,
850
851    /// `PhantomData<fn(M) -> M>` is used instead of `PhantomData<M>` in order
852    /// to ensure that the type is invariant over `M` and that it implements
853    /// `Sync` even if `M` is not `Sync`.
854    _message_type: PhantomData<fn(M) -> M>,
855}
856
857impl<M> NetlinkToClientSender<M> {
858    fn new(inner: Arc<Mutex<NetlinkSocketInner>>) -> Self {
859        NetlinkToClientSender { _message_type: Default::default(), inner }
860    }
861}
862
863impl<M: Clone + NetlinkSerializable + Send> Sender<M> for NetlinkToClientSender<M> {
864    fn send(&mut self, message: NetlinkMessage<M>, group: Option<ModernGroup>) {
865        // Serialize the message
866        let mut buf = vec![0; message.buffer_len()];
867        message.emit(&mut buf);
868        let mut buf: VecInputBuffer = buf.into();
869        // Write the message into the inner socket buffer.
870        let NetlinkToClientSender { _message_type: _, inner } = self;
871        let mut guard = inner.lock();
872
873        // To avoid dropping messages when the receive buffer is
874        // full, grow the buffer on behalf of the client.
875        // This is a stop gap measure to avoid dropping messages
876        // when netlink produces a large response to a
877        // NLM_F_DUMP request.
878        //
879        // TODO(https://fxbug.dev/459883760): The memory
880        // implications of this may be problematic. It should be
881        // replaced with a proper mechanism to handle a backlog
882        // of NLM_F_DUMP responses.
883        let available = guard.receive_buffer.available_capacity();
884        let required = buf.available();
885        if available < required {
886            let delta = required - available;
887            let current_capacity = guard.receive_buffer.capacity();
888            let new_capacity = (current_capacity + delta).min(SOCKET_MAX_SIZE);
889            match guard.receive_buffer.set_capacity(new_capacity) {
890                Ok(()) => {}
891                Err(e) => {
892                    log_error!(
893                        tag = NETLINK_LOG_TAG;
894                        "Failed to increase receive buffer size: {:?}",
895                        e
896                    );
897                }
898            }
899        }
900
901        let _bytes_written: usize = guard
902            .write_to_queue(
903                &mut buf,
904                Some(NetlinkAddress {
905                    // All messages come from the "kernel" which has PID of 0.
906                    pid: 0,
907                    // If this is a multicast message, set the group the multicast
908                    // message is from.
909                    groups: group
910                        .map(SingleLegacyGroup::try_from)
911                        .and_then(Result::<_, NoMappingFromModernToLegacyGroupError>::ok)
912                        .map_or(0, |g| g.inner()),
913                }),
914                &mut Vec::new(),
915            )
916            .unwrap_or_else(|e| {
917                log_error!(
918                    tag = NETLINK_LOG_TAG;
919                    "Failed to write message into buffer for socket. Errno: {:?}",
920                    e
921                );
922                0
923            });
924    }
925}
926
/// Access-control hook handed to the netlink implementation.
///
/// Borrows the task that is currently executing so that permission checks can
/// be run against it.
#[derive(Clone)]
pub struct NetlinkAccessControl<'a> {
    /// The task on which capability checks are performed.
    current_task: &'a CurrentTask,
}
931
932impl<'a> NetlinkAccessControl<'a> {
933    pub fn new(current_task: &'a CurrentTask) -> Self {
934        Self { current_task }
935    }
936}
937
938impl<'a> AccessControl<FullCredentials> for NetlinkAccessControl<'a> {
939    fn grant_assess(
940        &self,
941        creds: &FullCredentials,
942        permission: Permission,
943    ) -> Result<(), netlink::Errno> {
944        let need_cap_net_admin = match permission {
945            Permission::NetlinkRouteRead => false,
946            Permission::NetlinkRouteWrite => true,
947        };
948        if !need_cap_net_admin {
949            return Ok(());
950        }
951
952        self.current_task.override_creds(
953            |overridden_creds| {
954                *overridden_creds = creds.clone();
955            },
956            || {
957                security::check_task_capable(self.current_task, CAP_NET_ADMIN).map_err(|error| {
958                    netlink::Errno::new(error.code.error_code() as i32)
959                        .expect("Errno::error_code() is expected to be in range [1..max_i32]")
960                })
961            },
962        )
963    }
964}
/// Marker type that selects the concrete types plugged into the netlink
/// implementation through its [`NetlinkContext`] impl.
pub struct NetlinkContextImpl;
966
impl NetlinkContext for NetlinkContextImpl {
    /// Credentials attached to every message sent by a client.
    type Creds = FullCredentials;
    /// Messages going to a client are written through a `NetlinkToClientSender`.
    type Sender<M: Clone + NetlinkSerializable + Send> = NetlinkToClientSender<M>;
    /// Messages coming from a client arrive, still unparsed and paired with the
    /// sender's credentials, over an unbounded channel.
    type Receiver<
        M: Send + MessageWithPermission + NetlinkDeserializable<Error: Into<DecodeError>>,
    > = UnboundedReceiver<NetlinkMessageWithCreds<UnparsedNetlinkMessage<Vec<u8>, M>, Self::Creds>>;
    /// Permission checks are performed by `NetlinkAccessControl`.
    type AccessControl<'a> = NetlinkAccessControl<'a>;
}
975
/// Socket implementation for the NETLINK_ROUTE family of netlink sockets.
struct RouteNetlinkSocket {
    /// The inner Netlink socket implementation. It is shared with the
    /// `NetlinkToClientSender` handed to the netlink worker, which writes
    /// responses into it.
    inner: Arc<Mutex<NetlinkSocketInner>>,
    /// The implementation of a client (socket connection) to NETLINK_ROUTE.
    client: NetlinkRouteClient,
    /// The sender of messages from this socket to Netlink. Each message is
    /// sent unparsed, together with the credentials of the writing task.
    // TODO(https://issuetracker.google.com/285880057): Bound the capacity of
    // the "send buffer".
    message_sender: UnboundedSender<
        NetlinkMessageWithCreds<
            UnparsedNetlinkMessage<Vec<u8>, RouteNetlinkMessage>,
            FullCredentials,
        >,
    >,
}
992
993impl RouteNetlinkSocket {
994    pub fn new(kernel: &Arc<Kernel>) -> Result<Self, Errno> {
995        let inner = Arc::new(Mutex::new(NetlinkSocketInner::new(NetlinkFamily::Route)));
996        let (message_sender, message_receiver) = mpsc::unbounded();
997        let client = match kernel
998            .network_netlink()
999            .new_route_client(NetlinkToClientSender::new(inner.clone()), message_receiver)
1000        {
1001            Ok(client) => client,
1002            Err(NewClientError::Disconnected) => {
1003                log_error!(
1004                    tag = NETLINK_LOG_TAG;
1005                    "Netlink async worker is unexpectedly disconnected"
1006                );
1007                return error!(EPIPE);
1008            }
1009        };
1010        Ok(RouteNetlinkSocket { inner, client, message_sender })
1011    }
1012}
1013
1014impl SocketOps for RouteNetlinkSocket {
1015    fn connect(
1016        &self,
1017        _locked: &mut Locked<FileOpsCore>,
1018        _socket: &SocketHandle,
1019        current_task: &CurrentTask,
1020        peer: SocketPeer,
1021    ) -> Result<(), Errno> {
1022        let RouteNetlinkSocket { inner, client: _, message_sender: _ } = self;
1023        inner.lock().connect(current_task, peer)
1024    }
1025
1026    fn listen(
1027        &self,
1028        _locked: &mut Locked<FileOpsCore>,
1029        _socket: &Socket,
1030        _backlog: i32,
1031        _credentials: ucred,
1032    ) -> Result<(), Errno> {
1033        error!(EOPNOTSUPP)
1034    }
1035
1036    fn accept(
1037        &self,
1038        _locked: &mut Locked<FileOpsCore>,
1039        _socket: &Socket,
1040        _current_task: &CurrentTask,
1041    ) -> Result<SocketHandle, Errno> {
1042        error!(EOPNOTSUPP)
1043    }
1044
1045    fn bind(
1046        &self,
1047        _locked: &mut Locked<FileOpsCore>,
1048        _socket: &Socket,
1049        current_task: &CurrentTask,
1050        socket_address: SocketAddress,
1051    ) -> Result<(), Errno> {
1052        let RouteNetlinkSocket { inner, client, message_sender: _ } = self;
1053        let multicast_groups = match socket_address {
1054            SocketAddress::Netlink(NetlinkAddress { pid: _, groups }) => groups,
1055            _ => return error!(EINVAL),
1056        };
1057        let pid = {
1058            let mut inner = inner.lock();
1059            inner.bind(current_task, socket_address)?;
1060            inner
1061                .address
1062                .as_ref()
1063                .and_then(|NetlinkAddress { pid, groups: _ }| NonZeroU32::new(*pid))
1064        };
1065        if let Some(pid) = pid {
1066            client.set_pid(pid);
1067        }
1068        // This "blocks" in order to synchronize with the internal
1069        // state of the rtnetlink worker, but we're not blocking on
1070        // the completion of any i/o or any expensive computation,
1071        // so there's no need to support interrupts here.
1072        client
1073            .set_legacy_memberships(LegacyGroups(multicast_groups))
1074            .map_err(|InvalidLegacyGroupsError {}| errno!(EPERM))?
1075            .wait_until_complete();
1076        Ok(())
1077    }
1078
1079    fn read(
1080        &self,
1081        _locked: &mut Locked<FileOpsCore>,
1082        _socket: &Socket,
1083        _current_task: &CurrentTask,
1084        data: &mut dyn OutputBuffer,
1085        flags: SocketMessageFlags,
1086    ) -> Result<MessageReadInfo, Errno> {
1087        let RouteNetlinkSocket { inner, client: _, message_sender: _ } = self;
1088        inner.lock().read_datagram(data, flags)
1089    }
1090
1091    fn write(
1092        &self,
1093        _locked: &mut Locked<FileOpsCore>,
1094        socket: &Socket,
1095        current_task: &CurrentTask,
1096        data: &mut dyn InputBuffer,
1097        _dest_address: &mut Option<SocketAddress>,
1098        _ancillary_data: &mut Vec<AncillaryData>,
1099    ) -> Result<usize, Errno> {
1100        let RouteNetlinkSocket { inner: _, client: _, message_sender } = self;
1101        let bytes = data.peek_all()?;
1102        let bytes_len = bytes.len();
1103
1104        // Parse only the netlink header to send it through security check.
1105        match NetlinkBuffer::new(&bytes) {
1106            Ok(buffer) => {
1107                security::check_netlink_send_access(current_task, socket, buffer.message_type())?;
1108            }
1109            Err(e) => {
1110                // If we can't even decode the header of the netlink message,
1111                // then return early here as a stronger statement that we're not
1112                // going to accidentally operate on it and violate the security
1113                // check. The netlink crate would end up dropping this with no
1114                // response as well.
1115                log_warn!(tag = NETLINK_LOG_TAG;
1116                    "Failed to parse netlink header {e:?}"
1117                );
1118                data.drain();
1119                return Ok(bytes_len);
1120            }
1121        }
1122
1123        let msg = NetlinkMessageWithCreds::new(
1124            UnparsedNetlinkMessage::new(bytes),
1125            current_task.full_current_creds(),
1126        );
1127        message_sender.unbounded_send(msg).map_err(|e| {
1128            log_warn!(
1129                tag = NETLINK_LOG_TAG;
1130                "Netlink receiver unexpectedly disconnected for socket: {:?}",
1131                e
1132            );
1133            errno!(EPIPE)
1134        })?;
1135        data.drain();
1136        Ok(bytes_len)
1137    }
1138
1139    fn wait_async(
1140        &self,
1141        _locked: &mut Locked<FileOpsCore>,
1142        _socket: &Socket,
1143        _current_task: &CurrentTask,
1144        waiter: &Waiter,
1145        events: FdEvents,
1146        handler: EventHandler,
1147    ) -> WaitCanceler {
1148        let RouteNetlinkSocket { inner, client: _, message_sender: _ } = self;
1149        inner.lock().wait_async(waiter, events, handler)
1150    }
1151
1152    fn query_events(
1153        &self,
1154        _locked: &mut Locked<FileOpsCore>,
1155        _socket: &Socket,
1156        _current_task: &CurrentTask,
1157    ) -> Result<FdEvents, Errno> {
1158        let RouteNetlinkSocket { inner, client: _, message_sender: _ } = self;
1159        Ok(inner.lock().query_events() & FdEvents::POLLIN)
1160    }
1161
1162    fn shutdown(
1163        &self,
1164        _locked: &mut Locked<FileOpsCore>,
1165        _socket: &Socket,
1166        _how: SocketShutdownFlags,
1167    ) -> Result<(), Errno> {
1168        error!(EOPNOTSUPP)
1169    }
1170
1171    fn close(
1172        &self,
1173        _locked: &mut Locked<FileOpsCore>,
1174        _current_task: &CurrentTask,
1175        _socket: &Socket,
1176    ) {
1177        // Close the underlying channel to the Netlink worker.
1178        self.message_sender.close_channel();
1179    }
1180
1181    fn getsockname(
1182        &self,
1183        _locked: &mut Locked<FileOpsCore>,
1184        _socket: &Socket,
1185    ) -> Result<SocketAddress, Errno> {
1186        let RouteNetlinkSocket { inner, client: _, message_sender: _ } = self;
1187        inner.lock().getsockname()
1188    }
1189
1190    fn getpeername(
1191        &self,
1192        _locked: &mut Locked<FileOpsCore>,
1193        _socket: &Socket,
1194    ) -> Result<SocketAddress, Errno> {
1195        self.inner.lock().getpeername()
1196    }
1197
1198    fn getsockopt(
1199        &self,
1200        _locked: &mut Locked<FileOpsCore>,
1201        _socket: &Socket,
1202        _current_task: &CurrentTask,
1203        level: u32,
1204        optname: u32,
1205        _optlen: u32,
1206    ) -> Result<Vec<u8>, Errno> {
1207        self.inner.lock().getsockopt(level, optname)
1208    }
1209
1210    fn setsockopt(
1211        &self,
1212        _locked: &mut Locked<FileOpsCore>,
1213        _socket: &Socket,
1214        current_task: &CurrentTask,
1215        level: u32,
1216        optname: u32,
1217        optval: SockOptValue,
1218    ) -> Result<(), Errno> {
1219        match (level, optname) {
1220            (SOL_NETLINK, NETLINK_ADD_MEMBERSHIP) => {
1221                let RouteNetlinkSocket { inner: _, client, message_sender: _ } = self;
1222                let group: u32 = optval.read(current_task)?;
1223                let async_work = client
1224                    .add_membership(ModernGroup(group))
1225                    .map_err(|InvalidModernGroupError| errno!(EINVAL))?;
1226                // This "blocks" in order to synchronize with the internal
1227                // state of the rtnetlink worker, but we're not blocking on
1228                // the completion of any i/o or any expensive computation,
1229                // so there's no need to support interrupts here.
1230                async_work.wait_until_complete();
1231                Ok(())
1232            }
1233            (SOL_NETLINK, NETLINK_DROP_MEMBERSHIP) => {
1234                let RouteNetlinkSocket { inner: _, client, message_sender: _ } = self;
1235                let group: u32 = optval.read(current_task)?;
1236                client
1237                    .del_membership(ModernGroup(group))
1238                    .map_err(|InvalidModernGroupError| errno!(EINVAL))?;
1239                Ok(())
1240            }
1241            _ => self.inner.lock().setsockopt(current_task, level, optname, optval),
1242        }
1243    }
1244}
1245
/// Socket implementation for the NETLINK_SOCK_DIAG family of netlink sockets.
///
/// Every supported operation is served by the inner netlink socket state.
struct DiagnosticNetlinkSocket {
    /// The inner Netlink socket implementation
    inner: Arc<Mutex<NetlinkSocketInner>>,
}
1251
1252impl Default for DiagnosticNetlinkSocket {
1253    fn default() -> Self {
1254        Self { inner: Arc::new(Mutex::new(NetlinkSocketInner::new(NetlinkFamily::SockDiag))) }
1255    }
1256}
1257
1258impl SocketOps for DiagnosticNetlinkSocket {
1259    fn connect(
1260        &self,
1261        _locked: &mut Locked<FileOpsCore>,
1262        _socket: &SocketHandle,
1263        current_task: &CurrentTask,
1264        peer: SocketPeer,
1265    ) -> Result<(), Errno> {
1266        self.inner.lock().connect(current_task, peer)
1267    }
1268
1269    fn listen(
1270        &self,
1271        _locked: &mut Locked<FileOpsCore>,
1272        _socket: &Socket,
1273        _backlog: i32,
1274        _credentials: ucred,
1275    ) -> Result<(), Errno> {
1276        error!(EOPNOTSUPP)
1277    }
1278
1279    fn accept(
1280        &self,
1281        _locked: &mut Locked<FileOpsCore>,
1282        _socket: &Socket,
1283        _current_task: &CurrentTask,
1284    ) -> Result<SocketHandle, Errno> {
1285        error!(EOPNOTSUPP)
1286    }
1287
1288    fn bind(
1289        &self,
1290        _locked: &mut Locked<FileOpsCore>,
1291        _socket: &Socket,
1292        current_task: &CurrentTask,
1293        socket_address: SocketAddress,
1294    ) -> Result<(), Errno> {
1295        self.inner.lock().bind(current_task, socket_address)
1296    }
1297
1298    fn read(
1299        &self,
1300        _locked: &mut Locked<FileOpsCore>,
1301        _socket: &Socket,
1302        _current_task: &CurrentTask,
1303        data: &mut dyn OutputBuffer,
1304        flags: SocketMessageFlags,
1305    ) -> Result<MessageReadInfo, Errno> {
1306        self.inner.lock().read_datagram(data, flags)
1307    }
1308
1309    fn write(
1310        &self,
1311        _locked: &mut Locked<FileOpsCore>,
1312        _socket: &Socket,
1313        _current_task: &CurrentTask,
1314        _data: &mut dyn InputBuffer,
1315        _dest_address: &mut Option<SocketAddress>,
1316        _ancillary_data: &mut Vec<AncillaryData>,
1317    ) -> Result<usize, Errno> {
1318        track_stub!(TODO("https://fxbug.dev/323590076"), "NETLINK_SOCK_DIAG handle request");
1319        error!(ENOTSUP)
1320    }
1321
1322    fn wait_async(
1323        &self,
1324        _locked: &mut Locked<FileOpsCore>,
1325        _socket: &Socket,
1326        _current_task: &CurrentTask,
1327        waiter: &Waiter,
1328        events: FdEvents,
1329        handler: EventHandler,
1330    ) -> WaitCanceler {
1331        self.inner.lock().wait_async(waiter, events, handler)
1332    }
1333
1334    fn query_events(
1335        &self,
1336        _locked: &mut Locked<FileOpsCore>,
1337        _socket: &Socket,
1338        _current_task: &CurrentTask,
1339    ) -> Result<FdEvents, Errno> {
1340        Ok(self.inner.lock().query_events() & FdEvents::POLLIN)
1341    }
1342
1343    fn shutdown(
1344        &self,
1345        _locked: &mut Locked<FileOpsCore>,
1346        _socket: &Socket,
1347        _how: SocketShutdownFlags,
1348    ) -> Result<(), Errno> {
1349        error!(EOPNOTSUPP)
1350    }
1351
1352    fn close(
1353        &self,
1354        _locked: &mut Locked<FileOpsCore>,
1355        _current_task: &CurrentTask,
1356        _socket: &Socket,
1357    ) {
1358    }
1359
1360    fn getsockname(
1361        &self,
1362        _locked: &mut Locked<FileOpsCore>,
1363        _socket: &Socket,
1364    ) -> Result<SocketAddress, Errno> {
1365        self.inner.lock().getsockname()
1366    }
1367
1368    fn getpeername(
1369        &self,
1370        _locked: &mut Locked<FileOpsCore>,
1371        _socket: &Socket,
1372    ) -> Result<SocketAddress, Errno> {
1373        self.inner.lock().getpeername()
1374    }
1375
1376    fn getsockopt(
1377        &self,
1378        _locked: &mut Locked<FileOpsCore>,
1379        _socket: &Socket,
1380        _current_task: &CurrentTask,
1381        level: u32,
1382        optname: u32,
1383        _optlen: u32,
1384    ) -> Result<Vec<u8>, Errno> {
1385        self.inner.lock().getsockopt(level, optname)
1386    }
1387
1388    fn setsockopt(
1389        &self,
1390        _locked: &mut Locked<FileOpsCore>,
1391        _socket: &Socket,
1392        current_task: &CurrentTask,
1393        level: u32,
1394        optname: u32,
1395        optval: SockOptValue,
1396    ) -> Result<(), Errno> {
1397        self.inner.lock().setsockopt(current_task, level, optname, optval)
1398    }
1399}
1400
/// Socket implementation for the NETLINK_GENERIC family of netlink sockets.
struct GenericNetlinkSocket {
    /// The inner Netlink socket implementation; it is shared with the
    /// `NetlinkToClientSender` given to the generic netlink server.
    inner: Arc<Mutex<NetlinkSocketInner>>,
    /// Handle to this socket's client registration with the generic netlink
    /// server; used to manage multicast group membership.
    client: GenericNetlinkClientHandle<NetlinkToClientSender<GenericMessage>>,
    /// The sender of parsed messages from this socket to the generic netlink
    /// server.
    message_sender: mpsc::UnboundedSender<NetlinkMessage<GenericMessage>>,
}
1407
1408impl GenericNetlinkSocket {
1409    pub fn new(kernel: &Kernel) -> Result<Self, Errno> {
1410        let inner = Arc::new(Mutex::new(NetlinkSocketInner::new(NetlinkFamily::Generic)));
1411        let (message_sender, message_receiver) = mpsc::unbounded();
1412        match kernel
1413            .generic_netlink()
1414            .new_generic_client(NetlinkToClientSender::new(inner.clone()), message_receiver)
1415        {
1416            Ok(client) => Ok(Self { inner, client, message_sender }),
1417            Err(e) => {
1418                log_warn!(
1419                    tag = NETLINK_LOG_TAG;
1420                    "Failed to connect to generic netlink server. Errno: {:?}",
1421                    e
1422                );
1423                error!(EPIPE)
1424            }
1425        }
1426    }
1427
1428    /// Locks and returns the inner state of the Socket.
1429    fn lock(&self) -> starnix_sync::MutexGuard<'_, NetlinkSocketInner> {
1430        self.inner.lock()
1431    }
1432}
1433
1434impl SocketOps for GenericNetlinkSocket {
1435    fn connect(
1436        &self,
1437        _locked: &mut Locked<FileOpsCore>,
1438        _socket: &SocketHandle,
1439        current_task: &CurrentTask,
1440        peer: SocketPeer,
1441    ) -> Result<(), Errno> {
1442        let mut state = self.lock();
1443        state.connect(current_task, peer)
1444    }
1445
1446    fn listen(
1447        &self,
1448        _locked: &mut Locked<FileOpsCore>,
1449        _socket: &Socket,
1450        _backlog: i32,
1451        _credentials: ucred,
1452    ) -> Result<(), Errno> {
1453        error!(EOPNOTSUPP)
1454    }
1455
1456    fn accept(
1457        &self,
1458        _locked: &mut Locked<FileOpsCore>,
1459        _socket: &Socket,
1460        _current_task: &CurrentTask,
1461    ) -> Result<SocketHandle, Errno> {
1462        error!(EOPNOTSUPP)
1463    }
1464
1465    fn bind(
1466        &self,
1467        _locked: &mut Locked<FileOpsCore>,
1468        _socket: &Socket,
1469        current_task: &CurrentTask,
1470        socket_address: SocketAddress,
1471    ) -> Result<(), Errno> {
1472        let mut state = self.lock();
1473        state.bind(current_task, socket_address)
1474    }
1475
1476    fn read(
1477        &self,
1478        _locked: &mut Locked<FileOpsCore>,
1479        _socket: &Socket,
1480        _current_task: &CurrentTask,
1481        data: &mut dyn OutputBuffer,
1482        flags: SocketMessageFlags,
1483    ) -> Result<MessageReadInfo, Errno> {
1484        self.lock().read_datagram(data, flags)
1485    }
1486
1487    fn write(
1488        &self,
1489        _locked: &mut Locked<FileOpsCore>,
1490        _socket: &Socket,
1491        _current_task: &CurrentTask,
1492        data: &mut dyn InputBuffer,
1493        _dest_address: &mut Option<SocketAddress>,
1494        _ancillary_data: &mut Vec<AncillaryData>,
1495    ) -> Result<usize, Errno> {
1496        let bytes = data.read_all()?;
1497        match NetlinkMessage::<GenericMessage>::deserialize(&bytes) {
1498            Err(e) => {
1499                log_warn!("Failed to process write; data could not be deserialized: {:?}", e);
1500                error!(EINVAL)
1501            }
1502            Ok(msg) => match self.message_sender.unbounded_send(msg) {
1503                Ok(()) => Ok(bytes.len()),
1504                Err(e) => {
1505                    log_warn!("Netlink receiver unexpectedly disconnected for socket: {:?}", e);
1506                    error!(EPIPE)
1507                }
1508            },
1509        }
1510    }
1511
1512    fn wait_async(
1513        &self,
1514        _locked: &mut Locked<FileOpsCore>,
1515        _socket: &Socket,
1516        _current_task: &CurrentTask,
1517        waiter: &Waiter,
1518        events: FdEvents,
1519        handler: EventHandler,
1520    ) -> WaitCanceler {
1521        self.lock().wait_async(waiter, events, handler)
1522    }
1523
1524    fn query_events(
1525        &self,
1526        _locked: &mut Locked<FileOpsCore>,
1527        _socket: &Socket,
1528        _current_task: &CurrentTask,
1529    ) -> Result<FdEvents, Errno> {
1530        Ok(self.lock().query_events() & FdEvents::POLLIN)
1531    }
1532
1533    fn shutdown(
1534        &self,
1535        _locked: &mut Locked<FileOpsCore>,
1536        _socket: &Socket,
1537        _how: SocketShutdownFlags,
1538    ) -> Result<(), Errno> {
1539        track_stub!(TODO("https://fxbug.dev/322875507"), "GenericNetlinkSocket::shutdown");
1540        Ok(())
1541    }
1542
1543    fn close(
1544        &self,
1545        _locked: &mut Locked<FileOpsCore>,
1546        _current_task: &CurrentTask,
1547        _socket: &Socket,
1548    ) {
1549    }
1550
1551    fn getsockname(
1552        &self,
1553        _locked: &mut Locked<FileOpsCore>,
1554        _socket: &Socket,
1555    ) -> Result<SocketAddress, Errno> {
1556        self.lock().getsockname()
1557    }
1558
1559    fn getpeername(
1560        &self,
1561        _locked: &mut Locked<FileOpsCore>,
1562        _socket: &Socket,
1563    ) -> Result<SocketAddress, Errno> {
1564        self.lock().getpeername()
1565    }
1566
1567    fn getsockopt(
1568        &self,
1569        _locked: &mut Locked<FileOpsCore>,
1570        _socket: &Socket,
1571        _current_task: &CurrentTask,
1572        level: u32,
1573        optname: u32,
1574        _optlen: u32,
1575    ) -> Result<Vec<u8>, Errno> {
1576        self.lock().getsockopt(level, optname)
1577    }
1578
1579    fn setsockopt(
1580        &self,
1581        _locked: &mut Locked<FileOpsCore>,
1582        _socket: &Socket,
1583        current_task: &CurrentTask,
1584        level: u32,
1585        optname: u32,
1586        optval: SockOptValue,
1587    ) -> Result<(), Errno> {
1588        match (level, optname) {
1589            (SOL_NETLINK, NETLINK_ADD_MEMBERSHIP) => {
1590                let group_id: u32 = optval.read(current_task)?;
1591                self.client.add_membership(ModernGroup(group_id))
1592            }
1593            _ => self.lock().setsockopt(current_task, level, optname, optval),
1594        }
1595    }
1596}
1597
/// Audit client that can be attached to the `AuditLogger`.
///
/// It handles audit requests coming from userspace and hands out either the
/// pending response to such a request or the next audit log message.
pub struct AuditNetlinkClient {
    /// Reference to the `AuditLogger`.
    audit_logger: Arc<AuditLogger>,
    /// The waiters queue present in `AuditNetlinkSocket`; it is signalled with
    /// `POLLIN` by `notify`.
    waiters: WaitQueue,
    /// Optional response from the `AuditLogger`. It is returned by a read
    /// before any audit log message.
    audit_response: Mutex<Option<NetlinkMessage<GenericMessage>>>,
}
1607
1608impl AuditNetlinkClient {
1609    fn new(audit_logger: Arc<AuditLogger>) -> Self {
1610        Self { audit_logger, waiters: Default::default(), audit_response: Mutex::new(None) }
1611    }
1612
1613    pub fn notify(&self) {
1614        self.waiters.notify_fd_events(FdEvents::POLLIN);
1615    }
1616
1617    /// Function to check the capabilities of the current task against CAP_AUDIT_*
1618    fn check_audit_access(
1619        &self,
1620        current_task: &CurrentTask,
1621        request_type: &AuditRequest,
1622    ) -> Result<(), Errno> {
1623        match request_type {
1624            AuditRequest::AuditGet | AuditRequest::AuditSet => {
1625                security::check_task_capable(current_task, CAP_AUDIT_CONTROL)
1626            }
1627            AuditRequest::AuditUser => security::check_task_capable(current_task, CAP_AUDIT_WRITE),
1628        }
1629    }
1630
1631    /// Function to process request coming from userspace, it returns the response after processing
1632    fn process_request(
1633        self: &Arc<Self>,
1634        current_task: &CurrentTask,
1635        nl_message: NetlinkMessage<GenericMessage>,
1636    ) -> Result<NetlinkMessage<GenericMessage>, Errno> {
1637        let (nl_header, nl_payload) = nl_message.into_parts();
1638        let audit_request_type = AuditRequest::try_from(nl_header.message_type as u32)?;
1639        self.check_audit_access(current_task, &audit_request_type)?;
1640
1641        // If there is no GenericMessage, return an ErrorMessage.
1642        let NetlinkPayload::InnerMessage(GenericMessage::Other { payload, .. }) = nl_payload else {
1643            return error!(EINVAL);
1644        };
1645        match audit_request_type {
1646            AuditRequest::AuditGet => self.process_get_status(nl_header.sequence_number),
1647            AuditRequest::AuditSet => self.process_set_status(current_task, nl_header, payload),
1648            AuditRequest::AuditUser => self.process_user_audit(nl_header, payload),
1649        }
1650    }
1651
1652    fn get_nl_response(&self, flags: SocketMessageFlags) -> Option<Vec<u8>> {
1653        if flags.contains(SocketMessageFlags::PEEK) {
1654            if let Some(message) = self.audit_response.lock().as_ref() {
1655                return Some(AuditNetlinkClient::serialize_nlmsg(message.clone()));
1656            }
1657        } else if let Some(message) = self.audit_response.lock().take() {
1658            return Some(AuditNetlinkClient::serialize_nlmsg(message));
1659        }
1660        None
1661    }
1662
1663    /// Function to read an audit message from `AuditLogger`.
1664    fn read_audit_log(self: &Arc<Self>) -> Option<Vec<u8>> {
1665        if let Some(AuditMessage { audit_type, message }) = self.audit_logger.read_audit_log(self) {
1666            return Some(AuditNetlinkClient::serialize_nlmsg(
1667                AuditNetlinkClient::build_audit_nlmsg(0, audit_type, message),
1668            ));
1669        }
1670        None
1671    }
1672
1673    /// Function to read the optional response if present or an audit message.
1674    fn read_nlmsg(self: &Arc<Self>, flags: SocketMessageFlags) -> Result<Vec<u8>, Errno> {
1675        // First check if there is a response and send it if present.
1676        // Send an audit message otherwise or return EAGAIN.
1677        self.get_nl_response(flags).or_else(|| self.read_audit_log()).ok_or_else(|| errno!(EAGAIN))
1678    }
1679
1680    fn process_get_status(
1681        &self,
1682        sequence_number: u32,
1683    ) -> Result<NetlinkMessage<GenericMessage>, Errno> {
1684        Ok(AuditNetlinkClient::build_audit_nlmsg(
1685            sequence_number,
1686            AUDIT_GET as u16,
1687            self.audit_logger.get_status().as_bytes().to_vec(),
1688        ))
1689    }
1690
1691    fn process_set_status(
1692        self: &Arc<Self>,
1693        current_task: &CurrentTask,
1694        nl_hdr: NetlinkHeader,
1695        nl_payload: Vec<u8>,
1696    ) -> Result<NetlinkMessage<GenericMessage>, Errno> {
1697        let Some(status) = audit_status::read_from_bytes(nl_payload.as_bytes()).ok() else {
1698            return error!(EINVAL);
1699        };
1700        self.audit_logger.set_status(current_task, status, self)?;
1701        Ok(AuditNetlinkClient::build_audit_ack(Ok(()), nl_hdr))
1702    }
1703
1704    fn process_user_audit(
1705        &self,
1706        nl_hdr: NetlinkHeader,
1707        nl_payload: Vec<u8>,
1708    ) -> Result<NetlinkMessage<GenericMessage>, Errno> {
1709        let audit_msg = String::from_utf8_lossy(nl_payload.as_bytes());
1710        self.audit_logger.audit_log(nl_hdr.message_type as u16, move || audit_msg);
1711        Ok(AuditNetlinkClient::build_audit_ack(Ok(()), nl_hdr))
1712    }
1713
1714    fn query_events(self: &Arc<Self>) -> FdEvents {
1715        if self.audit_response.lock().is_some() || self.audit_logger.get_backlog_count(self) != 0 {
1716            return FdEvents::POLLIN;
1717        }
1718        FdEvents::empty()
1719    }
1720
1721    fn detach(self: &Arc<Self>) {
1722        self.audit_logger.detach_client(self);
1723    }
1724
1725    fn build_audit_nlmsg(
1726        seq_number: u32,
1727        msg_type: u16,
1728        payload: Vec<u8>,
1729    ) -> NetlinkMessage<GenericMessage> {
1730        // The family in GenericMessage can be used for message type, not only for the Netlink Family,
1731        // because after finalizing the message, the message type is equal to family.
1732        let nl_payload =
1733            NetlinkPayload::InnerMessage(GenericMessage::Other { family: msg_type, payload });
1734        let mut nl_header = NetlinkHeader::default();
1735        nl_header.sequence_number = seq_number;
1736        let mut message = NetlinkMessage::new(nl_header, nl_payload);
1737        message.finalize();
1738        message
1739    }
1740
1741    fn build_audit_ack(
1742        error: Result<(), Errno>,
1743        req_header: NetlinkHeader,
1744    ) -> NetlinkMessage<GenericMessage> {
1745        let error = {
1746            assert_eq!(req_header.buffer_len(), NETLINK_HEADER_LEN);
1747            let mut buffer = vec![0; NETLINK_HEADER_LEN];
1748            req_header.emit(&mut buffer);
1749
1750            let code = match error {
1751                Ok(()) => None,
1752                Err(e) => Some(
1753                    // Audit netlink errors are negative.
1754                    NonZeroI32::new(-(e.code.error_code() as i32))
1755                        .expect("Errno's code must be non-zero"),
1756                ),
1757            };
1758
1759            let mut error = ErrorMessage::default();
1760            error.code = code;
1761            error.header = buffer;
1762            error
1763        };
1764
1765        let payload = NetlinkPayload::<GenericMessage>::Error(error);
1766        let mut resp_header = NetlinkHeader::default();
1767        resp_header.message_type = NLMSG_ERROR;
1768        resp_header.sequence_number = req_header.sequence_number;
1769        let mut message = NetlinkMessage::new(resp_header, payload);
1770        message.finalize();
1771        message
1772    }
1773
1774    fn serialize_nlmsg(message: NetlinkMessage<GenericMessage>) -> Vec<u8> {
1775        let mut buf = vec![0; message.buffer_len()];
1776        message.serialize(&mut buf);
1777        buf
1778    }
1779}
1780
1781/// Audit Netlink Socket structure.
1782pub struct AuditNetlinkSocket {
1783    /// Reference to the `AuditNetlinkClient` associated with self.
1784    audit_client: Arc<AuditNetlinkClient>,
1785}
1786
1787impl AuditNetlinkSocket {
1788    pub fn new(kernel: &Kernel) -> Result<Self, Errno> {
1789        if kernel.audit_logger().is_disabled() {
1790            return error!(EPROTONOSUPPORT);
1791        }
1792        Ok(Self { audit_client: Arc::new(AuditNetlinkClient::new(kernel.audit_logger())) })
1793    }
1794}
1795
1796impl SocketOps for AuditNetlinkSocket {
1797    fn read(
1798        &self,
1799        _locked: &mut Locked<FileOpsCore>,
1800        _socket: &Socket,
1801        _current_task: &CurrentTask,
1802        data: &mut dyn OutputBuffer,
1803        flags: SocketMessageFlags,
1804    ) -> Result<MessageReadInfo, Errno> {
1805        let buf = self.audit_client.read_nlmsg(flags)?;
1806
1807        let size = data.write_all(buf.as_bytes())?;
1808        Ok(MessageReadInfo {
1809            bytes_read: size,
1810            message_length: size,
1811            address: Some(SocketAddress::Netlink(NetlinkAddress::default())),
1812            ancillary_data: vec![],
1813        })
1814    }
1815
1816    fn write(
1817        &self,
1818        _locked: &mut Locked<FileOpsCore>,
1819        socket: &Socket,
1820        current_task: &CurrentTask,
1821        data: &mut dyn InputBuffer,
1822        _dest_address: &mut Option<SocketAddress>,
1823        _ancillary_data: &mut Vec<AncillaryData>,
1824    ) -> Result<usize, Errno> {
1825        match NetlinkMessage::<GenericMessage>::deserialize(&(data.peek_all()?)) {
1826            Ok(nl_message) => {
1827                let header = nl_message.header;
1828                security::check_netlink_send_access(current_task, socket, header.message_type)?;
1829
1830                // Send request to the `AuditNetlinkClient`.
1831                let audit_ack = self
1832                    .audit_client
1833                    .process_request(current_task, nl_message)
1834                    .map_err(|e| AuditNetlinkClient::build_audit_ack(Err(e), header))
1835                    .unwrap_or_else(|nlerr| nlerr);
1836                *self.audit_client.audit_response.lock() = Some(audit_ack);
1837                data.drain();
1838                Ok(header.length as usize)
1839            }
1840            Err(e) => {
1841                log_warn!("Failed to process write; data could not be deserialized: {:?}", e);
1842                error!(EINVAL)
1843            }
1844        }
1845    }
1846
1847    fn wait_async(
1848        &self,
1849        _locked: &mut Locked<FileOpsCore>,
1850        _socket: &Socket,
1851        _current_task: &CurrentTask,
1852        waiter: &Waiter,
1853        events: FdEvents,
1854        handler: EventHandler,
1855    ) -> WaitCanceler {
1856        self.audit_client.waiters.wait_async_fd_events(waiter, events, handler)
1857    }
1858
1859    fn query_events(
1860        &self,
1861        _locked: &mut Locked<FileOpsCore>,
1862        _socket: &Socket,
1863        _current_task: &CurrentTask,
1864    ) -> Result<FdEvents, Errno> {
1865        Ok(self.audit_client.query_events() & FdEvents::POLLIN)
1866    }
1867
1868    fn close(
1869        &self,
1870        _locked: &mut Locked<FileOpsCore>,
1871        _current_task: &CurrentTask,
1872        _socket: &Socket,
1873    ) {
1874        // If the `AuditNetlinkClient` disconnects, detach it.
1875        self.audit_client.detach();
1876    }
1877
1878    fn shutdown(
1879        &self,
1880        _locked: &mut Locked<FileOpsCore>,
1881        _socket: &Socket,
1882        _how: SocketShutdownFlags,
1883    ) -> Result<(), Errno> {
1884        error!(EOPNOTSUPP)
1885    }
1886
1887    fn connect(
1888        &self,
1889        _locked: &mut Locked<FileOpsCore>,
1890        _socket: &SocketHandle,
1891        _current_task: &CurrentTask,
1892        _peer: SocketPeer,
1893    ) -> Result<(), Errno> {
1894        error!(EOPNOTSUPP)
1895    }
1896
1897    fn listen(
1898        &self,
1899        _locked: &mut Locked<FileOpsCore>,
1900        _socket: &Socket,
1901        _backlog: i32,
1902        _credentials: ucred,
1903    ) -> Result<(), Errno> {
1904        error!(EOPNOTSUPP)
1905    }
1906
1907    fn accept(
1908        &self,
1909        _locked: &mut Locked<FileOpsCore>,
1910        _socket: &Socket,
1911        _current_task: &CurrentTask,
1912    ) -> Result<SocketHandle, Errno> {
1913        error!(EOPNOTSUPP)
1914    }
1915
1916    fn bind(
1917        &self,
1918        _locked: &mut Locked<FileOpsCore>,
1919        _socket: &Socket,
1920        _current_task: &CurrentTask,
1921        _socket_address: SocketAddress,
1922    ) -> Result<(), Errno> {
1923        error!(EOPNOTSUPP)
1924    }
1925
1926    fn getsockname(
1927        &self,
1928        _locked: &mut Locked<FileOpsCore>,
1929        _socket: &Socket,
1930    ) -> Result<SocketAddress, Errno> {
1931        error!(EOPNOTSUPP)
1932    }
1933
1934    fn getpeername(
1935        &self,
1936        _locked: &mut Locked<FileOpsCore>,
1937        _socket: &Socket,
1938    ) -> Result<SocketAddress, Errno> {
1939        error!(EOPNOTSUPP)
1940    }
1941
1942    fn getsockopt(
1943        &self,
1944        _locked: &mut Locked<FileOpsCore>,
1945        _socket: &Socket,
1946        _current_task: &CurrentTask,
1947        _level: u32,
1948        _optname: u32,
1949        _optlen: u32,
1950    ) -> Result<Vec<u8>, Errno> {
1951        error!(EOPNOTSUPP)
1952    }
1953
1954    fn setsockopt(
1955        &self,
1956        _locked: &mut Locked<FileOpsCore>,
1957        _socket: &Socket,
1958        _current_task: &CurrentTask,
1959        _level: u32,
1960        _optname: u32,
1961        _optval: SockOptValue,
1962    ) -> Result<(), Errno> {
1963        error!(EOPNOTSUPP)
1964    }
1965}
1966
#[cfg(test)]
mod tests {
    use super::*;

    use netlink_packet_route::route::RouteMessage;
    use test_case::test_case;

    // With enough room, the message is stored and the queue capacity is unchanged.
    #[test_case(true; "sufficient_capacity")]
    // With a full queue, sending still succeeds because the queue grows to fit
    // the message.
    #[test_case(false; "insufficient_capacity")]
    fn test_netlink_to_client_sender(sufficient_capacity: bool) {
        const MODERN_GROUP: u32 = 5;

        let mut message: NetlinkMessage<RouteNetlinkMessage> =
            RouteNetlinkMessage::NewRoute(RouteMessage::default()).into();
        message.finalize();

        let (initial_queue_size, final_queue_size) = match sufficient_capacity {
            true => (SOCKET_DEFAULT_SIZE, SOCKET_DEFAULT_SIZE),
            false => (0, message.buffer_len()),
        };

        // Start from a default route socket and swap in a queue of the chosen size.
        let mut inner = NetlinkSocketInner::new(NetlinkFamily::Route);
        inner.receive_buffer = MessageQueue::new(initial_queue_size);
        let socket_inner = Arc::new(Mutex::new(inner));

        let mut sender = NetlinkToClientSender::<RouteNetlinkMessage>::new(socket_inner.clone());
        sender.send(message.clone(), Some(ModernGroup(MODERN_GROUP)));
        let received = socket_inner.lock().read_message().expect("should read message");

        let expected_address =
            Some(SocketAddress::Netlink(NetlinkAddress { pid: 0, groups: 1 << MODERN_GROUP }));
        assert_eq!(received.address, expected_address);

        let actual_message = NetlinkMessage::<RouteNetlinkMessage>::deserialize(&received.data)
            .expect("message should deserialize into RtnlMessage");
        assert_eq!(actual_message, message);
        assert_eq!(socket_inner.lock().receive_buffer.capacity(), final_queue_size);
    }

    /// Fetches a socket option and decodes its 4-byte value as a native-endian `u32`.
    fn getsockopt_u32(socket: &NetlinkSocketInner, level: u32, optname: u32) -> u32 {
        let raw = socket.getsockopt(level, optname).expect("getsockopt should succeed");
        let word = <[u8; 4]>::try_from(raw.as_slice()).expect("expected 4 bytes");
        u32::from_ne_bytes(word)
    }

    /// Encodes `val` as the native-endian byte value expected by `setsockopt`.
    fn sock_opt_value(val: u32) -> SockOptValue {
        let encoded = val.to_ne_bytes();
        SockOptValue::Value(encoded.to_vec())
    }

    #[::fuchsia::test]
    async fn test_set_get_snd_rcv_buf() {
        crate::testing::spawn_kernel_and_run_sync(|_locked, current_task| {
            let mut socket = NetlinkSocketInner::new(NetlinkFamily::Route);

            // A fresh socket reports the default size for both buffers.
            let expected_default = u32::try_from(SOCKET_DEFAULT_SIZE).unwrap();
            for optname in [SO_SNDBUF, SO_RCVBUF] {
                assert_eq!(getsockopt_u32(&socket, SOL_SOCKET, optname), expected_default);
            }

            // Set new values and check them: the applied value is twice the
            // requested one.
            const SNDBUF_SIZE: u32 = 12345;
            const RCVBUF_SIZE: u32 = 54321;
            let requests = [(SO_SNDBUF, SNDBUF_SIZE), (SO_RCVBUF, RCVBUF_SIZE)];
            for (optname, requested) in requests {
                socket
                    .setsockopt(current_task, SOL_SOCKET, optname, sock_opt_value(requested))
                    .expect("setsockopt should succeed");
            }
            for (optname, requested) in requests {
                assert_eq!(getsockopt_u32(&socket, SOL_SOCKET, optname), requested * 2);
            }
        })
        .await;
    }

    #[::fuchsia::test]
    async fn test_snd_rcv_buf_limits() {
        crate::testing::spawn_kernel_and_run_sync(|_locked, current_task| {
            let mut socket = NetlinkSocketInner::new(NetlinkFamily::Route);
            let too_big = u32::try_from(SOCKET_MAX_SIZE).unwrap() + 1;

            // SO_SNDBUF and SO_RCVBUF clamp the size to the limit.
            for optname in [SO_SNDBUF, SO_RCVBUF] {
                socket
                    .setsockopt(current_task, SOL_SOCKET, optname, sock_opt_value(too_big))
                    .expect("setsockopt should succeed");
            }
            let expected_max = u32::try_from(SOCKET_MAX_SIZE).unwrap();
            for optname in [SO_SNDBUF, SO_RCVBUF] {
                assert_eq!(getsockopt_u32(&socket, SOL_SOCKET, optname), expected_max);
            }

            // SO_SNDBUFFORCE and SO_RCVBUFFORCE bypass the limit; the applied
            // value is twice the requested one.
            for optname in [SO_SNDBUFFORCE, SO_RCVBUFFORCE] {
                socket
                    .setsockopt(current_task, SOL_SOCKET, optname, sock_opt_value(too_big))
                    .expect("setsockopt should succeed");
            }
            for optname in [SO_SNDBUF, SO_RCVBUF] {
                assert_eq!(getsockopt_u32(&socket, SOL_SOCKET, optname), too_big * 2);
            }
        })
        .await;
    }
}