starnix_core/vfs/socket/
socket_unix.rs

1// Copyright 2022 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::bpf::context::EbpfRunContextImpl;
6use crate::bpf::fs::get_bpf_object;
7use crate::mm::MemoryAccessorExt;
8use crate::security;
9use crate::task::{CurrentTask, EventHandler, WaitCanceler, WaitQueue, Waiter};
10use crate::vfs::buffers::{
11    AncillaryData, InputBuffer, MessageQueue, MessageReadInfo, OutputBuffer, UnixControlData,
12};
13use crate::vfs::socket::{
14    AcceptQueue, DEFAULT_LISTEN_BACKLOG, SockOptValue, Socket, SocketAddress, SocketDomain,
15    SocketFile, SocketHandle, SocketMessageFlags, SocketOps, SocketPeer, SocketProtocol,
16    SocketShutdownFlags, SocketType,
17};
18use crate::vfs::{
19    CheckAccessReason, FdNumber, FileHandle, FileObject, FsNodeHandle, FsStr, LookupContext,
20    Message, UcredPtr, default_ioctl,
21};
22use ebpf::{
23    BpfProgramContext, BpfValue, CbpfConfig, DataWidth, EbpfProgram, Packet, ProgramArgument, Type,
24};
25use ebpf_api::{
26    LoadBytesBase, PacketWithLoadBytes, PinnedMap, ProgramType, SOCKET_FILTER_CBPF_CONFIG,
27    SOCKET_FILTER_SK_BUF_TYPE, SocketCookieContext, SocketFilterProgramContext, SocketUidContext,
28};
29use starnix_logging::track_stub;
30use starnix_sync::{FileOpsCore, LockEqualOrBefore, Locked, Mutex, Unlocked};
31use starnix_syscalls::{SUCCESS, SyscallArg, SyscallResult};
32use starnix_uapi::errors::{EACCES, EINTR, EPERM, Errno};
33use starnix_uapi::file_mode::Access;
34use starnix_uapi::open_flags::OpenFlags;
35use starnix_uapi::user_address::{UserAddress, UserRef};
36use starnix_uapi::vfs::FdEvents;
37use starnix_uapi::{
38    __sk_buff, FIONREAD, SO_ACCEPTCONN, SO_ATTACH_BPF, SO_BROADCAST, SO_ERROR, SO_KEEPALIVE,
39    SO_LINGER, SO_NO_CHECK, SO_PASSCRED, SO_PASSSEC, SO_PEERCRED, SO_PEERSEC, SO_RCVBUF,
40    SO_REUSEADDR, SO_REUSEPORT, SO_SNDBUF, SOL_SOCKET, errno, error, gid_t, socklen_t, uapi, ucred,
41    uid_t,
42};
43use std::sync::Arc;
44use zerocopy::IntoBytes;
45
// Unix socket buffer sizes in bytes, taken from unix.go in gVisor.
// `SOCKET_DEFAULT_SIZE` is the initial receive-queue capacity given to every new `UnixSocket`.
const SOCKET_MIN_SIZE: usize = 4 * 1024;
const SOCKET_DEFAULT_SIZE: usize = 208 * 1024;
const SOCKET_MAX_SIZE: usize = 4 * 1024 * 1024;
50
/// The data of a socket is stored in the "Inner" struct. Because both ends have separate locks,
/// care must be taken to avoid taking both locks since there is no way to tell what order to
/// take them in. Stream connect is the one exception: it takes the connecting socket's lock
/// before the listener's (and then the lock of the server socket it has just created).
///
/// When writing, data is buffered in the "other" end of the socket's Inner.MessageQueue:
///
///            UnixSocket end #1          UnixSocket end #2
///            +---------------+          +---------------+
///            |               |          |   +-------+   |
///   Writes -------------------------------->| Inner |------> Reads
///            |               |          |   +-------+   |
///            |   +-------+   |          |               |
///   Reads <------| Inner |<-------------------------------- Writes
///            |   +-------+   |          |               |
///            +---------------+          +---------------+
///
/// Consequently, `SO_SNDBUF` on one end reports and changes the capacity of the peer's queue,
/// while `SO_RCVBUF` applies to this end's own queue.
pub struct UnixSocket {
    /// The state of this end of the socket.
    inner: Mutex<UnixSocketInner>,
}
70
71fn downcast_socket_to_unix(socket: &Socket) -> &UnixSocket {
72    // It is a programing error if we are downcasting
73    // a different type of socket as sockets from different families
74    // should not communicate, so unwrapping here
75    // will let us know that.
76    socket.downcast_socket::<UnixSocket>().unwrap()
77}
78
/// Connection state of one end of a Unix socket.
enum UnixSocketState {
    /// The socket has not been connected.
    Disconnected,

    /// The socket has had `listen` called and can accept incoming connections. The queue holds
    /// the server-side sockets created by stream `connect` until `accept` pops them.
    Listening(AcceptQueue),

    /// The socket is connected to a peer. For stream/seqpacket sockets this is the socket at
    /// the other end of the connection; for datagram/raw sockets it is the destination recorded
    /// by `connect`.
    Connected(SocketHandle),

    /// The socket is closed. `close` moves the socket here, dropping any peer handle held in
    /// `Connected`.
    Closed,
}
92
93struct UnixSocketInner {
94    /// The `MessageQueue` that contains messages sent to this socket.
95    messages: MessageQueue,
96
97    /// This queue will be notified on reads, writes, disconnects etc.
98    waiters: WaitQueue,
99
100    /// The address that this socket has been bound to, if it has been bound.
101    address: Option<SocketAddress>,
102
103    /// Whether this end of the socket has been shut down and can no longer receive message. It is
104    /// still possible to send messages to the peer, if it exists and hasn't also been shut down.
105    is_shutdown: bool,
106
107    /// Whether the peer had unread data when it was closed. In this case, reads should return
108    /// ECONNRESET instead of 0 (eof).
109    peer_closed_with_unread_data: bool,
110
111    /// See SO_LINGER.
112    pub linger: uapi::linger,
113
114    /// See SO_PASSCRED.
115    pub passcred: bool,
116
117    /// See SO_PASSSEC.
118    pub passsec: bool,
119
120    /// See SO_BROADCAST.
121    pub broadcast: bool,
122
123    /// See SO_NO_CHECK.
124    pub no_check: bool,
125
126    /// See SO_REUSEPORT.
127    pub reuseport: bool,
128
129    /// See SO_REUSEADDR.
130    pub reuseaddr: bool,
131
132    /// See SO_KEEPALIVE.
133    pub keepalive: bool,
134
135    /// See SO_ATTACH_BPF.
136    bpf_program: Option<UnixSocketFilter>,
137
138    /// Unix credentials of the owner of this socket, for SO_PEERCRED.
139    credentials: Option<ucred>,
140
141    /// Socket state: a queue if this is a listening socket, or a peer if this is a connected
142    /// socket.
143    state: UnixSocketState,
144}
145
impl UnixSocket {
    /// Creates the ops for a new Unix socket. The socket starts unbound and disconnected, with
    /// every socket option cleared and a receive queue of `SOCKET_DEFAULT_SIZE` bytes.
    ///
    /// `_socket_type` is currently unused.
    pub fn new(_socket_type: SocketType) -> UnixSocket {
        UnixSocket {
            inner: Mutex::new(UnixSocketInner {
                messages: MessageQueue::new(SOCKET_DEFAULT_SIZE),
                waiters: WaitQueue::default(),
                address: None,
                is_shutdown: false,
                peer_closed_with_unread_data: false,
                linger: uapi::linger::default(),
                passcred: false,
                passsec: false,
                broadcast: false,
                no_check: false,
                reuseaddr: false,
                reuseport: false,
                keepalive: false,
                bpf_program: None,
                credentials: None,
                state: UnixSocketState::Disconnected,
            }),
        }
    }

    /// Creates a pair of connected sockets.
    ///
    /// Each end is connected to the other and records the calling task's credentials, which the
    /// other end reports through `SO_PEERCRED`.
    ///
    /// # Parameters
    /// - `domain`: The domain of the socket (e.g., `AF_UNIX`).
    /// - `socket_type`: The type of the socket (e.g., `SOCK_STREAM`).
    /// - `open_flags`: The flags used to open both returned files.
    ///
    /// # Errors
    /// Fails if either socket or file cannot be created, or if the `socket_socketpair` security
    /// hook rejects the pair.
    pub fn new_pair<L>(
        locked: &mut Locked<L>,
        current_task: &CurrentTask,
        domain: SocketDomain,
        socket_type: SocketType,
        open_flags: OpenFlags,
    ) -> Result<(FileHandle, FileHandle), Errno>
    where
        L: LockEqualOrBefore<FileOpsCore>,
    {
        let credentials = current_task.current_ucred();
        let left = Socket::new(
            locked,
            current_task,
            domain,
            socket_type,
            SocketProtocol::default(),
            /* kernel_private = */ false,
        )?;
        let right = Socket::new(
            locked,
            current_task,
            domain,
            socket_type,
            SocketProtocol::default(),
            /* kernel_private = */ false,
        )?;
        // Each lock is taken on its own; the two ends are never locked at the same time.
        {
            let mut left_inner = downcast_socket_to_unix(&left).lock();
            left_inner.state = UnixSocketState::Connected(right.clone());
            left_inner.credentials = Some(credentials.clone());
        }
        {
            let mut right_inner = downcast_socket_to_unix(&right).lock();
            right_inner.state = UnixSocketState::Connected(left.clone());
            right_inner.credentials = Some(credentials);
        }
        let left = SocketFile::from_socket(
            locked,
            current_task,
            left,
            open_flags,
            /* kernel_private= */ false,
        )?;
        let right = SocketFile::from_socket(
            locked,
            current_task,
            right,
            open_flags,
            /* kernel_private= */ false,
        )?;
        let left_socket = SocketFile::get_from_file(&left)?;
        let right_socket = SocketFile::get_from_file(&right)?;

        security::socket_socketpair(current_task, left_socket, right_socket)?;
        Ok((left, right))
    }

    /// Connects the stream/seqpacket `socket` to the listening socket `peer`.
    ///
    /// A new server-side socket is created, connected to `socket`, and queued on the listener so
    /// that a later `accept` can return it.
    ///
    /// # Errors
    /// - `ECONNREFUSED` if `peer` is not listening.
    /// - `EISCONN` if `socket` is already connected, `EINVAL` if it is in any other
    ///   non-disconnected state.
    /// - `EAGAIN` if the listener's backlog is full.
    /// - Any type-mismatch error from `check_type_for_connect`, or failures creating the server
    ///   socket or from the `unix_stream_connect` security hook.
    fn connect_stream(
        &self,
        locked: &mut Locked<FileOpsCore>,
        socket: &SocketHandle,
        current_task: &CurrentTask,
        peer: &SocketHandle,
    ) -> Result<(), Errno> {
        // Only hold one lock at a time until we make sure the lock ordering
        // is right: client before listener
        match downcast_socket_to_unix(peer).lock().state {
            UnixSocketState::Listening(_) => {}
            _ => return error!(ECONNREFUSED),
        }

        let mut client = downcast_socket_to_unix(socket).lock();
        match client.state {
            UnixSocketState::Disconnected => {}
            UnixSocketState::Connected(_) => return error!(EISCONN),
            _ => return error!(EINVAL),
        };

        let mut listener = downcast_socket_to_unix(peer).lock();
        // Must check this again because we released the listener lock for a moment
        let queue = match &listener.state {
            UnixSocketState::Listening(queue) => queue,
            _ => return error!(ECONNREFUSED),
        };

        self.check_type_for_connect(socket, peer, &listener.address)?;

        if queue.sockets.len() > queue.backlog {
            return error!(EAGAIN);
        }

        let server = Socket::new(
            locked,
            current_task,
            peer.domain,
            peer.socket_type,
            SocketProtocol::default(),
            /* kernel_private = */ true,
        )?;
        security::unix_stream_connect(current_task, socket, peer, &server)?;
        client.state = UnixSocketState::Connected(server.clone());
        client.credentials = Some(current_task.current_ucred());
        {
            let mut server = downcast_socket_to_unix(&server).lock();
            server.state = UnixSocketState::Connected(socket.clone());
            server.address = listener.address.clone();
            server.messages.set_capacity(listener.messages.capacity())?;
            server.credentials = listener.credentials.clone();
            server.passcred = listener.passcred;
            server.passsec = listener.passsec;
        }

        // We already checked that the socket is in Listening state...but the borrow checker cannot
        // be convinced that it's ok to combine these checks
        let queue = match listener.state {
            UnixSocketState::Listening(ref mut queue) => queue,
            _ => panic!("something changed the server socket state while I held a lock on it"),
        };
        queue.sockets.push_back(server);
        listener.waiters.notify_fd_events(FdEvents::POLLIN);
        Ok(())
    }

    /// Connects the datagram/raw `socket` to `peer`.
    ///
    /// There is no handshake: after checking that the two sockets are compatible, `socket`
    /// records `peer` as its connected destination.
    fn connect_datagram(&self, socket: &SocketHandle, peer: &SocketHandle) -> Result<(), Errno> {
        // `check_type_for_connect` picks the error for a type mismatch based on the *peer's*
        // address (abstract peers yield ECONNREFUSED), so read the peer's address here rather
        // than our own. The peer's lock is released before ours is taken.
        let peer_address = peer
            .downcast_socket::<UnixSocket>()
            .and_then(|unix_peer| unix_peer.lock().address.clone());
        self.check_type_for_connect(socket, peer, &peer_address)?;
        downcast_socket_to_unix(socket).lock().state = UnixSocketState::Connected(peer.clone());
        Ok(())
    }

    /// Checks that `socket` and `peer` share the same domain and socket type.
    ///
    /// On a mismatch, returns `ECONNREFUSED` when `peer_address` is an abstract Unix address and
    /// `EPROTOTYPE` otherwise.
    pub fn check_type_for_connect(
        &self,
        socket: &Socket,
        peer: &Socket,
        peer_address: &Option<SocketAddress>,
    ) -> Result<(), Errno> {
        if socket.domain != peer.domain || socket.socket_type != peer.socket_type {
            // According to ConnectWithWrongType in accept_bind_test, abstract
            // UNIX domain sockets return ECONNREFUSED rather than EPROTOTYPE.
            if let Some(address) = peer_address {
                if address.is_abstract_unix() {
                    return error!(ECONNREFUSED);
                }
            }
            return error!(EPROTOTYPE);
        }
        Ok(())
    }

    /// Locks and returns the inner state of the Socket.
    fn lock(&self) -> starnix_sync::MutexGuard<'_, UnixSocketInner> {
        self.inner.lock()
    }

    /// Returns whether this socket is in the listening state.
    fn is_listening(&self, _socket: &Socket) -> bool {
        matches!(self.lock().state, UnixSocketState::Listening(_))
    }

    /// Returns the capacity of this socket's own receive queue.
    fn get_receive_capacity(&self) -> usize {
        self.lock().messages.capacity()
    }

    /// Requests a new capacity for this socket's own receive queue.
    fn set_receive_capacity(&self, requested_capacity: usize) {
        self.lock().set_capacity(requested_capacity);
    }

    /// Returns the capacity of the peer's receive queue (where this socket's writes are
    /// buffered), or 0 if there is no peer.
    fn get_send_capacity(&self) -> usize {
        let peer = {
            if let Some(peer) = self.lock().peer() {
                peer.clone()
            } else {
                return 0;
            }
        };
        let unix_socket = downcast_socket_to_unix(&peer);
        // Bound to a local so the peer's guard is dropped before `peer` goes out of scope.
        let capacity = unix_socket.lock().messages.capacity();
        capacity
    }

    /// Requests a new capacity for the peer's receive queue; a no-op if there is no peer.
    fn set_send_capacity(&self, requested_capacity: usize) {
        let peer = {
            if let Some(peer) = self.lock().peer() {
                peer.clone()
            } else {
                return;
            }
        };
        let unix_socket = downcast_socket_to_unix(&peer);
        unix_socket.lock().set_capacity(requested_capacity);
    }

    /// Returns the stored SO_LINGER value.
    fn get_linger(&self) -> uapi::linger {
        let inner = self.lock();
        inner.linger
    }

    /// Stores a new SO_LINGER value.
    fn set_linger(&self, linger: uapi::linger) {
        let mut inner = self.lock();
        inner.linger = linger;
    }

    /// Returns whether SO_PASSCRED is enabled.
    fn get_passcred(&self) -> bool {
        let inner = self.lock();
        inner.passcred
    }

    /// Enables or disables SO_PASSCRED.
    fn set_passcred(&self, passcred: bool) {
        let mut inner = self.lock();
        inner.passcred = passcred;
    }

    /// Returns whether SO_PASSSEC is enabled.
    fn get_passsec(&self) -> bool {
        let inner = self.lock();
        inner.passsec
    }

    /// Enables or disables SO_PASSSEC.
    fn set_passsec(&self, passsec: bool) {
        let mut inner = self.lock();
        inner.passsec = passsec;
    }

    /// Returns the stored SO_BROADCAST flag.
    fn get_broadcast(&self) -> bool {
        let inner = self.lock();
        inner.broadcast
    }

    /// Stores the SO_BROADCAST flag.
    fn set_broadcast(&self, broadcast: bool) {
        let mut inner = self.lock();
        inner.broadcast = broadcast;
    }

    /// Returns the stored SO_NO_CHECK flag.
    fn get_no_check(&self) -> bool {
        let inner = self.lock();
        inner.no_check
    }

    /// Stores the SO_NO_CHECK flag.
    fn set_no_check(&self, no_check: bool) {
        let mut inner = self.lock();
        inner.no_check = no_check;
    }

    /// Returns the stored SO_REUSEADDR flag.
    fn get_reuseaddr(&self) -> bool {
        let inner = self.lock();
        inner.reuseaddr
    }

    /// Stores the SO_REUSEADDR flag.
    fn set_reuseaddr(&self, reuseaddr: bool) {
        let mut inner = self.lock();
        inner.reuseaddr = reuseaddr;
    }

    /// Returns the stored SO_REUSEPORT flag.
    fn get_reuseport(&self) -> bool {
        let inner = self.lock();
        inner.reuseport
    }

    /// Stores the SO_REUSEPORT flag.
    fn set_reuseport(&self, reuseport: bool) {
        let mut inner = self.lock();
        inner.reuseport = reuseport;
    }

    /// Returns the stored SO_KEEPALIVE flag.
    fn get_keepalive(&self) -> bool {
        let inner = self.lock();
        inner.keepalive
    }

    /// Stores the SO_KEEPALIVE flag.
    fn set_keepalive(&self, keepalive: bool) {
        let mut inner = self.lock();
        inner.keepalive = keepalive;
    }

    /// Replaces (or clears, with `None`) the SO_ATTACH_BPF filter.
    fn set_bpf_program(&self, program: Option<UnixSocketFilter>) {
        let mut inner = self.lock();
        inner.bpf_program = program;
    }

    /// Returns the credentials recorded on the connected peer, if there is a peer and it has
    /// credentials. Our lock is released before the peer's is taken.
    fn peer_cred(&self) -> Option<ucred> {
        let peer = {
            let inner = self.lock();
            inner.peer().cloned()
        };
        if let Some(peer) = peer {
            let unix_socket = downcast_socket_to_unix(&peer);
            let unix_socket = unix_socket.lock();
            unix_socket.credentials.clone()
        } else {
            None
        }
    }

    /// Binds `socket` to `address` and records it as the socket bound to the filesystem `node`.
    pub fn bind_socket_to_node(
        &self,
        socket: &SocketHandle,
        address: SocketAddress,
        node: &FsNodeHandle,
    ) -> Result<(), Errno> {
        let unix_socket = downcast_socket_to_unix(socket);
        let mut inner = unix_socket.lock();
        inner.bind(address)?;
        node.set_bound_socket(socket.clone());
        Ok(())
    }
}
477
478impl SocketOps for UnixSocket {
479    fn connect(
480        &self,
481        locked: &mut Locked<FileOpsCore>,
482        socket: &SocketHandle,
483        current_task: &CurrentTask,
484        peer: SocketPeer,
485    ) -> Result<(), Errno> {
486        let peer = match peer {
487            SocketPeer::Handle(handle) => handle,
488            SocketPeer::Address(_) => return error!(EINVAL),
489        };
490        match socket.socket_type {
491            SocketType::Stream | SocketType::SeqPacket => {
492                self.connect_stream(locked, socket, current_task, &peer)
493            }
494            SocketType::Datagram | SocketType::Raw => self.connect_datagram(socket, &peer),
495            _ => error!(EINVAL),
496        }
497    }
498
499    fn listen(
500        &self,
501        _locked: &mut Locked<FileOpsCore>,
502        socket: &Socket,
503        backlog: i32,
504        credentials: ucred,
505    ) -> Result<(), Errno> {
506        match socket.socket_type {
507            SocketType::Stream | SocketType::SeqPacket => {}
508            _ => return error!(EOPNOTSUPP),
509        }
510        let mut inner = self.lock();
511        inner.credentials = Some(credentials);
512        let is_bound = inner.address.is_some();
513        let backlog = if backlog < 0 { DEFAULT_LISTEN_BACKLOG } else { backlog as usize };
514        match &mut inner.state {
515            UnixSocketState::Disconnected if is_bound => {
516                inner.state = UnixSocketState::Listening(AcceptQueue::new(backlog));
517                Ok(())
518            }
519            UnixSocketState::Listening(queue) => {
520                queue.set_backlog(backlog)?;
521                Ok(())
522            }
523            _ => error!(EINVAL),
524        }
525    }
526
527    fn accept(
528        &self,
529        _locked: &mut Locked<FileOpsCore>,
530        socket: &Socket,
531        _current_task: &CurrentTask,
532    ) -> Result<SocketHandle, Errno> {
533        match socket.socket_type {
534            SocketType::Stream | SocketType::SeqPacket => {}
535            _ => return error!(EOPNOTSUPP),
536        }
537        let mut inner = self.lock();
538        let queue = match &mut inner.state {
539            UnixSocketState::Listening(queue) => queue,
540            _ => return error!(EINVAL),
541        };
542        queue.sockets.pop_front().ok_or_else(|| errno!(EAGAIN))
543    }
544
545    fn bind(
546        &self,
547        _locked: &mut Locked<FileOpsCore>,
548        _socket: &Socket,
549        _current_task: &CurrentTask,
550        socket_address: SocketAddress,
551    ) -> Result<(), Errno> {
552        match socket_address {
553            SocketAddress::Unix(_) => {}
554            _ => return error!(EINVAL),
555        }
556        self.lock().bind(socket_address)
557    }
558
559    fn read(
560        &self,
561        _locked: &mut Locked<FileOpsCore>,
562        socket: &Socket,
563        _current_task: &CurrentTask,
564        data: &mut dyn OutputBuffer,
565        flags: SocketMessageFlags,
566    ) -> Result<MessageReadInfo, Errno> {
567        let info = self.lock().read(data, socket.socket_type, flags)?;
568        if info.bytes_read > 0 {
569            let peer = {
570                let inner = self.lock();
571                inner.peer().cloned()
572            };
573            if let Some(socket) = peer {
574                let unix_socket_peer = socket.downcast_socket::<UnixSocket>();
575                if let Some(socket) = unix_socket_peer {
576                    socket.lock().waiters.notify_fd_events(FdEvents::POLLOUT);
577                }
578            }
579        }
580        Ok(info)
581    }
582
    /// Sends the contents of `data` from this socket.
    ///
    /// The destination is the connected peer or, for an unconnected socket, the socket that
    /// `resolve_unix_socket_address` finds for the Unix address in `dest_address`. If the
    /// receiving end has `SO_PASSCRED` and/or `SO_PASSSEC` enabled, the matching control message
    /// is appended to `ancillary_data` before the message is queued on the receiver.
    ///
    /// Errors: `EISCONN` if an address is given to a connected socket; `ENOTCONN` when there is
    /// no destination, or for an unconnected seqpacket socket given an address; `EOPNOTSUPP` for
    /// an unconnected stream socket given an address; `EINVAL` for a non-Unix address. Errors
    /// from address resolution, the security hooks, and the receiver's write also propagate.
    fn write(
        &self,
        locked: &mut Locked<FileOpsCore>,
        socket: &Socket,
        current_task: &CurrentTask,
        data: &mut dyn InputBuffer,
        dest_address: &mut Option<SocketAddress>,
        ancillary_data: &mut Vec<AncillaryData>,
    ) -> Result<usize, Errno> {
        // Snapshot what we need from our own end, releasing our lock before the peer's is taken.
        let (connected_peer, local_address, creds) = {
            let inner = self.lock();
            (inner.peer().map(|p| p.clone()), inner.address.clone(), inner.credentials.clone())
        };

        // Arm order matters: the type-specific errors for unconnected stream/seqpacket sockets
        // must be checked before the generic handling of an explicit Unix address.
        let peer = match (connected_peer, dest_address, socket.socket_type) {
            (Some(peer), None, _) => peer,
            (None, Some(_), SocketType::Stream) => return error!(EOPNOTSUPP),
            (None, Some(_), SocketType::SeqPacket) => return error!(ENOTCONN),
            (Some(_), Some(_), _) => return error!(EISCONN),
            (_, Some(SocketAddress::Unix(name)), _) => {
                resolve_unix_socket_address(locked, current_task, name.as_ref())?
            }
            (_, Some(_), _) => return error!(EINVAL),
            (None, None, _) => return error!(ENOTCONN),
        };

        // Datagram sends go through an additional per-destination security check.
        if socket.socket_type == SocketType::Datagram {
            security::unix_may_send(current_task, socket, &peer)?;
        }

        let unix_socket = downcast_socket_to_unix(&peer);
        let mut peer = unix_socket.lock();
        if peer.passcred {
            // Prefer the credentials recorded on this socket; fall back to the current task's.
            let creds = creds.unwrap_or_else(|| current_task.current_ucred());
            ancillary_data.push(AncillaryData::Unix(UnixControlData::Credentials(creds)));
        }
        if peer.passsec {
            // TODO: https://fxbug.dev/364568855 - Store the opaque LSM property value, and expand
            // it to a string upon readmsg.
            let context = security::socket_getpeersec_dgram(current_task, socket);
            ancillary_data.push(AncillaryData::Unix(UnixControlData::Security(context.into())));
        }
        peer.write(locked, current_task, data, local_address, ancillary_data, socket.socket_type)
    }
627
628    fn wait_async(
629        &self,
630        _locked: &mut Locked<FileOpsCore>,
631        _socket: &Socket,
632        _current_task: &CurrentTask,
633        waiter: &Waiter,
634        events: FdEvents,
635        handler: EventHandler,
636    ) -> WaitCanceler {
637        self.lock().waiters.wait_async_fd_events(waiter, events, handler)
638    }
639
640    fn query_events(
641        &self,
642        _locked: &mut Locked<FileOpsCore>,
643        _socket: &Socket,
644        _current_task: &CurrentTask,
645    ) -> Result<FdEvents, Errno> {
646        // Note that self.lock() must be dropped before acquiring peer.inner.lock() to avoid
647        // potential deadlocks.
648        let (mut events, peer) = {
649            let inner = self.lock();
650
651            let mut events = FdEvents::empty();
652            let local_events = inner.messages.query_events();
653            // From our end's message queue we only care about POLLIN (whether we have data stored
654            // that's readable). POLLOUT is based on whether the peer end has room in its buffer.
655            if local_events.contains(FdEvents::POLLIN) {
656                events = FdEvents::POLLIN;
657            }
658
659            if inner.is_shutdown {
660                events |= FdEvents::POLLIN | FdEvents::POLLOUT | FdEvents::POLLHUP;
661            }
662
663            match &inner.state {
664                UnixSocketState::Listening(queue) => {
665                    if !queue.sockets.is_empty() {
666                        events |= FdEvents::POLLIN;
667                    }
668                }
669                UnixSocketState::Closed => {
670                    events |= FdEvents::POLLHUP;
671                }
672                _ => {}
673            }
674
675            (events, inner.peer().cloned())
676        };
677
678        // Check the peer (outside of our lock) to see if it can accept data written from our end.
679        if let Some(peer) = peer {
680            let unix_socket = downcast_socket_to_unix(&peer);
681            let peer_inner = unix_socket.lock();
682            let peer_events = peer_inner.messages.query_events();
683            if peer_events.contains(FdEvents::POLLOUT) {
684                events |= FdEvents::POLLOUT;
685            }
686        }
687
688        Ok(events)
689    }
690
691    /// Shuts down this socket according to how, preventing any future reads and/or writes.
692    ///
693    /// Used by the shutdown syscalls.
694    fn shutdown(
695        &self,
696        _locked: &mut Locked<FileOpsCore>,
697        _socket: &Socket,
698        how: SocketShutdownFlags,
699    ) -> Result<(), Errno> {
700        let peer = {
701            let mut inner = self.lock();
702            let peer = inner.peer().ok_or_else(|| errno!(ENOTCONN))?.clone();
703            if how.contains(SocketShutdownFlags::READ) {
704                inner.shutdown_one_end();
705            }
706            peer
707        };
708        if how.contains(SocketShutdownFlags::WRITE) {
709            let unix_socket = downcast_socket_to_unix(&peer);
710            unix_socket.lock().shutdown_one_end();
711        }
712        Ok(())
713    }
714
715    /// Close this socket.
716    ///
717    /// Called by SocketFile when the file descriptor that is holding this
718    /// socket is closed.
719    ///
720    /// Close differs from shutdown in two ways. First, close will call
721    /// mark_peer_closed_with_unread_data if this socket has unread data,
722    /// which changes how read() behaves on that socket. Second, close
723    /// transitions the internal state of this socket to Closed, which breaks
724    /// the reference cycle that exists in the connected state.
725    fn close(
726        &self,
727        _locked: &mut Locked<FileOpsCore>,
728        _current_task: &CurrentTask,
729        socket: &Socket,
730    ) {
731        let (maybe_peer, has_unread) = {
732            let mut inner = self.lock();
733            let maybe_peer = inner.peer().map(Arc::clone);
734            inner.shutdown_one_end();
735            (maybe_peer, !inner.messages.is_empty())
736        };
737        // If this is a connected socket type, also shut down the connected peer.
738        if socket.socket_type == SocketType::Stream || socket.socket_type == SocketType::SeqPacket {
739            if let Some(peer) = maybe_peer {
740                let unix_socket = downcast_socket_to_unix(&peer);
741
742                let mut peer_inner = unix_socket.lock();
743                if has_unread {
744                    peer_inner.peer_closed_with_unread_data = true;
745                }
746                peer_inner.shutdown_one_end();
747            }
748        }
749        self.lock().state = UnixSocketState::Closed;
750    }
751
752    /// Returns the name of this socket.
753    ///
754    /// The name is derived from the address and domain. A socket
755    /// will always have a name, even if it is not bound to an address.
756    fn getsockname(
757        &self,
758        _locked: &mut Locked<FileOpsCore>,
759        socket: &Socket,
760    ) -> Result<SocketAddress, Errno> {
761        let inner = self.lock();
762        if let Some(address) = &inner.address {
763            Ok(address.clone())
764        } else {
765            Ok(SocketAddress::default_for_domain(socket.domain))
766        }
767    }
768
769    /// Returns the name of the peer of this socket, if such a peer exists.
770    ///
771    /// Returns an error if the socket is not connected.
772    fn getpeername(
773        &self,
774        locked: &mut Locked<FileOpsCore>,
775        _socket: &Socket,
776    ) -> Result<SocketAddress, Errno> {
777        let peer = self.lock().peer().ok_or_else(|| errno!(ENOTCONN))?.clone();
778        peer.getsockname(locked)
779    }
780
781    fn setsockopt(
782        &self,
783        _locked: &mut Locked<FileOpsCore>,
784        _socket: &Socket,
785        current_task: &CurrentTask,
786        level: u32,
787        optname: u32,
788        optval: SockOptValue,
789    ) -> Result<(), Errno> {
790        match level {
791            SOL_SOCKET => match optname {
792                SO_SNDBUF => {
793                    let requested_capacity: socklen_t = optval.read(current_task)?;
794                    // See StreamUnixSocketPairTest.SetSocketSendBuf for why we multiply by 2 here.
795                    self.set_send_capacity(requested_capacity as usize * 2);
796                }
797                SO_RCVBUF => {
798                    let requested_capacity: socklen_t = optval.read(current_task)?;
799                    self.set_receive_capacity(requested_capacity as usize);
800                }
801                SO_LINGER => {
802                    let mut linger: uapi::linger = optval.read(current_task)?;
803                    if linger.l_onoff != 0 {
804                        linger.l_onoff = 1;
805                    }
806                    self.set_linger(linger);
807                }
808                SO_PASSCRED => {
809                    let passcred: u32 = optval.read(current_task)?;
810                    self.set_passcred(passcred != 0);
811                }
812                SO_PASSSEC => {
813                    let passsec: u32 = optval.read(current_task)?;
814                    self.set_passsec(passsec != 0);
815                }
816                SO_BROADCAST => {
817                    let broadcast: u32 = optval.read(current_task)?;
818                    self.set_broadcast(broadcast != 0);
819                }
820                SO_NO_CHECK => {
821                    let no_check: u32 = optval.read(current_task)?;
822                    self.set_no_check(no_check != 0);
823                }
824                SO_REUSEADDR => {
825                    let reuseaddr: u32 = optval.read(current_task)?;
826                    self.set_reuseaddr(reuseaddr != 0);
827                }
828                SO_REUSEPORT => {
829                    let reuseport: u32 = optval.read(current_task)?;
830                    self.set_reuseport(reuseport != 0);
831                }
832                SO_KEEPALIVE => {
833                    let keepalive: u32 = optval.read(current_task)?;
834                    self.set_keepalive(keepalive != 0);
835                }
836                SO_ATTACH_BPF => {
837                    let fd: FdNumber = optval.read(current_task)?;
838                    let object = get_bpf_object(current_task, fd)?;
839                    let program = object.as_program()?;
840
841                    let linked_program = program.link(ProgramType::SocketFilter)?;
842
843                    self.set_bpf_program(Some(linked_program));
844                }
845                _ => return error!(ENOPROTOOPT),
846            },
847            _ => return error!(ENOPROTOOPT),
848        }
849        Ok(())
850    }
851
852    fn getsockopt(
853        &self,
854        _locked: &mut Locked<FileOpsCore>,
855        socket: &Socket,
856        current_task: &CurrentTask,
857        level: u32,
858        optname: u32,
859        _optlen: u32,
860    ) -> Result<Vec<u8>, Errno> {
861        match level {
862            SOL_SOCKET => match optname {
863                SO_PEERCRED => Ok(UcredPtr::into_bytes(
864                    current_task,
865                    self.peer_cred().unwrap_or(ucred { pid: 0, uid: uid_t::MAX, gid: gid_t::MAX }),
866                )
867                .map_err(|_| errno!(EINVAL))?),
868                SO_PEERSEC => match socket.socket_type {
869                    SocketType::Stream => security::socket_getpeersec_stream(current_task, socket),
870                    _ => error!(ENOPROTOOPT),
871                },
872                SO_ACCEPTCONN =>
873                {
874                    #[allow(clippy::bool_to_int_with_if)]
875                    Ok(if self.is_listening(socket) { 1u32 } else { 0u32 }.to_ne_bytes().to_vec())
876                }
877                SO_SNDBUF => Ok((self.get_send_capacity() as socklen_t).to_ne_bytes().to_vec()),
878                SO_RCVBUF => Ok((self.get_receive_capacity() as socklen_t).to_ne_bytes().to_vec()),
879                SO_LINGER => Ok(self.get_linger().as_bytes().to_vec()),
880                SO_PASSCRED => Ok((self.get_passcred() as u32).as_bytes().to_vec()),
881                SO_PASSSEC => Ok((self.get_passsec() as u32).as_bytes().to_vec()),
882                SO_BROADCAST => Ok((self.get_broadcast() as u32).as_bytes().to_vec()),
883                SO_NO_CHECK => Ok((self.get_no_check() as u32).as_bytes().to_vec()),
884                SO_REUSEADDR => Ok((self.get_reuseaddr() as u32).as_bytes().to_vec()),
885                SO_REUSEPORT => Ok((self.get_reuseport() as u32).as_bytes().to_vec()),
886                SO_KEEPALIVE => Ok((self.get_keepalive() as u32).as_bytes().to_vec()),
887                SO_ERROR => Ok((0u32).as_bytes().to_vec()),
888                _ => error!(ENOPROTOOPT),
889            },
890            _ => error!(ENOPROTOOPT),
891        }
892    }
893
894    fn ioctl(
895        &self,
896        locked: &mut Locked<Unlocked>,
897        socket: &Socket,
898        file: &FileObject,
899        current_task: &CurrentTask,
900        request: u32,
901        arg: SyscallArg,
902    ) -> Result<SyscallResult, Errno> {
903        let user_addr = UserAddress::from(arg);
904        match request {
905            FIONREAD if socket.socket_type == SocketType::Stream => {
906                let length: i32 =
907                    self.lock().messages.len().try_into().map_err(|_| errno!(EINVAL))?;
908                current_task.write_object(UserRef::<i32>::new(user_addr), &length)?;
909                Ok(SUCCESS)
910            }
911            _ => default_ioctl(file, locked, current_task, request, arg),
912        }
913    }
914}
915
916impl UnixSocketInner {
917    pub fn bind(&mut self, socket_address: SocketAddress) -> Result<(), Errno> {
918        if self.address.is_some() {
919            return error!(EINVAL);
920        }
921        self.address = Some(socket_address);
922        Ok(())
923    }
924
925    fn set_capacity(&mut self, requested_capacity: usize) {
926        let capacity = requested_capacity.clamp(SOCKET_MIN_SIZE, SOCKET_MAX_SIZE);
927        let capacity = std::cmp::max(capacity, self.messages.len());
928        // We have validated capacity sufficiently that set_capacity should always succeed.
929        self.messages.set_capacity(capacity).unwrap();
930    }
931
932    /// Returns the socket that is connected to this socket, if such a peer exists. Returns
933    /// ENOTCONN otherwise.
934    fn peer(&self) -> Option<&SocketHandle> {
935        match &self.state {
936            UnixSocketState::Connected(peer) => Some(peer),
937            _ => None,
938        }
939    }
940
941    /// Reads the the contents of this socket into `InputBuffer`.
942    ///
943    /// Will stop reading if a message with ancillary data is encountered (after the message with
944    /// ancillary data has been read).
945    ///
946    /// # Parameters
947    /// - `data`: The `OutputBuffer` to write the data to.
948    ///
949    /// Returns the number of bytes that were read into the buffer, and any ancillary data that was
950    /// read from the socket.
951    fn read(
952        &mut self,
953        data: &mut dyn OutputBuffer,
954        socket_type: SocketType,
955        flags: SocketMessageFlags,
956    ) -> Result<MessageReadInfo, Errno> {
957        let mut info = if socket_type == SocketType::Stream {
958            if data.available() == 0 {
959                return Ok(MessageReadInfo::default());
960            }
961
962            if flags.contains(SocketMessageFlags::PEEK) {
963                self.messages.peek_stream(data)?
964            } else {
965                self.messages.read_stream(data)?
966            }
967        } else if flags.contains(SocketMessageFlags::PEEK) {
968            self.messages.peek_datagram(data)?
969        } else {
970            self.messages.read_datagram(data)?
971        };
972        if info.message_length == 0 {
973            if self.peer_closed_with_unread_data {
974                // Reset the flag
975                self.peer_closed_with_unread_data = false;
976                return error!(ECONNRESET);
977            }
978            if !self.is_shutdown {
979                return error!(EAGAIN);
980            }
981        }
982
983        // Remove any credentials message, so that it can be moved to the front if passcred is
984        // enabled, or simply be removed if passcred is not enabled.
985        let creds_message;
986        if let Some(index) = info
987            .ancillary_data
988            .iter()
989            .position(|m| matches!(m, AncillaryData::Unix(UnixControlData::Credentials { .. })))
990        {
991            creds_message = info.ancillary_data.remove(index)
992        } else {
993            // If passcred is enabled credentials are returned even if they were not sent.
994            creds_message = AncillaryData::Unix(UnixControlData::unknown_creds());
995        }
996        if self.passcred {
997            // Allow credentials to take priority if they are enabled, so insert at 0.
998            info.ancillary_data.insert(0, creds_message);
999        }
1000
1001        Ok(info)
1002    }
1003
1004    /// Writes the the contents of `InputBuffer` into this socket.
1005    ///
1006    /// # Parameters
1007    /// - `data`: The `InputBuffer` to read the data from.
1008    /// - `ancillary_data`: Any ancillary data to write to the socket. Note that the ancillary data
1009    ///                     will only be written if the entirety of the requested write completes.
1010    ///
1011    /// Returns the number of bytes that were written to the socket.
1012    fn write(
1013        &mut self,
1014        locked: &mut Locked<FileOpsCore>,
1015        current_task: &CurrentTask,
1016        data: &mut dyn InputBuffer,
1017        address: Option<SocketAddress>,
1018        ancillary_data: &mut Vec<AncillaryData>,
1019        socket_type: SocketType,
1020    ) -> Result<usize, Errno> {
1021        if self.is_shutdown {
1022            return error!(EPIPE);
1023        }
1024        let filter = |mut message: Message| {
1025            let Some(bpf_program) = self.bpf_program.as_ref() else {
1026                return Some(message);
1027            };
1028
1029            // TODO(https://fxbug.dev/385015056): Fill in SkBuf.
1030            let mut sk_buf = SkBuf::default();
1031
1032            let mut context = EbpfRunContextImpl::<'_>::new(locked.cast_locked(), current_task);
1033            let s = bpf_program.run(&mut context, &mut sk_buf);
1034            if s == 0 {
1035                None
1036            } else {
1037                message.truncate(s as usize);
1038                Some(message)
1039            }
1040        };
1041        let bytes_written = if socket_type == SocketType::Stream {
1042            self.messages.write_stream_with_filter(data, address, ancillary_data, filter)?
1043        } else {
1044            self.messages.write_datagram_with_filter(data, address, ancillary_data, filter)?
1045        };
1046        if bytes_written > 0 {
1047            self.waiters.notify_fd_events(FdEvents::POLLIN);
1048        }
1049        Ok(bytes_written)
1050    }
1051
1052    fn shutdown_one_end(&mut self) {
1053        self.is_shutdown = true;
1054        self.waiters.notify_fd_events(FdEvents::POLLIN | FdEvents::POLLOUT | FdEvents::POLLHUP);
1055    }
1056}
1057
/// Resolves the Unix socket address `name` to the socket bound at that address.
///
/// Names that begin with a NUL byte are looked up in the task's abstract socket namespace.
/// Any other name is treated as a path:
/// - Its parent is resolved relative to the current working directory.
/// - The final component is looked up.
/// - The caller must have write access to the resulting node.
/// - The node must have a socket bound to it.
///
/// # Errors
/// - A failed lookup of the final component is reported as `ECONNREFUSED`, except `EACCES`,
///   `EPERM` and `EINTR`, which are propagated unchanged.
/// - `ECONNREFUSED` if the node has no bound socket.
/// - Errors from the abstract-namespace lookup, parent resolution or the access check are
///   propagated.
pub fn resolve_unix_socket_address<L>(
    locked: &mut Locked<L>,
    current_task: &CurrentTask,
    name: &FsStr,
) -> Result<SocketHandle, Errno>
where
    L: LockEqualOrBefore<FileOpsCore>,
{
    // Use `first()` rather than `name[0]` so that an empty name goes to the path lookup (and
    // fails there) instead of panicking on an out-of-bounds index.
    if name.first() == Some(&b'\0') {
        return current_task.abstract_socket_namespace.lookup(name);
    }

    let mut context = LookupContext::default();
    let (parent, basename) =
        current_task.lookup_parent_at(locked, &mut context, FdNumber::AT_FDCWD, name)?;
    let node =
        parent.lookup_child(locked, current_task, &mut context, basename).map_err(|errno| {
            if matches!(errno.code, EACCES | EPERM | EINTR) {
                errno
            } else {
                errno!(ECONNREFUSED)
            }
        })?;
    node.check_access(
        locked,
        current_task,
        Access::WRITE,
        CheckAccessReason::InternalPermissionChecks,
    )?;
    node.entry.node.bound_socket().cloned().ok_or_else(|| errno!(ECONNREFUSED))
}
1089
1090// Packet buffer representation used for eBPF filters.
1091#[repr(C)]
1092#[derive(Default)]
1093struct SkBuf {
1094    sk_buff: __sk_buff,
1095}
1096
1097impl Packet for &mut SkBuf {
1098    fn load(&self, _offset: i32, _width: DataWidth) -> Option<BpfValue> {
1099        // TODO(https://fxbug.dev/385015056): Implement packet access.
1100        None
1101    }
1102}
1103
1104impl<'a, 'b> SocketCookieContext<&'a mut SkBuf> for EbpfRunContextImpl<'b> {
1105    fn get_socket_cookie(&self, _sk_buf: &'a mut SkBuf) -> u64 {
1106        track_stub!(TODO("https://fxbug.dev/385015056"), "bpf_get_socket_cookie");
1107        0
1108    }
1109}
1110
1111impl<'a, 'b> SocketUidContext<&'a mut SkBuf> for EbpfRunContextImpl<'b> {
1112    fn get_socket_uid(&self, _sk_buf: &'a mut SkBuf) -> Option<uid_t> {
1113        track_stub!(TODO("https://fxbug.dev/385015056"), "bpf_get_socket_uid");
1114        None
1115    }
1116}
1117
1118impl<'a> PacketWithLoadBytes for &'a mut SkBuf {
1119    fn load_bytes_relative(&self, _base: LoadBytesBase, _offset: usize, _buf: &mut [u8]) -> i64 {
1120        track_stub!(TODO("https://fxbug.dev/385015056"), "bpf_load_bytes_relative");
1121        -1
1122    }
1123}
1124
1125impl ProgramArgument for &'_ mut SkBuf {
1126    fn get_type() -> &'static Type {
1127        &*SOCKET_FILTER_SK_BUF_TYPE
1128    }
1129}
1130
1131struct UnixSocketEbpfContext {}
1132impl BpfProgramContext for UnixSocketEbpfContext {
1133    type RunContext<'a> = EbpfRunContextImpl<'a>;
1134    type Packet<'a> = &'a mut SkBuf;
1135    type Map = PinnedMap;
1136    const CBPF_CONFIG: &'static CbpfConfig = &SOCKET_FILTER_CBPF_CONFIG;
1137}
1138
1139ebpf_api::ebpf_program_context_type!(UnixSocketEbpfContext, SocketFilterProgramContext);
1140
1141type UnixSocketFilter = EbpfProgram<UnixSocketEbpfContext>;
1142
#[cfg(test)]
mod tests {
    use super::*;
    use crate::mm::MemoryAccessor;
    use crate::testing::{map_memory, spawn_kernel_and_run};
    use starnix_types::user_buffer::UserBuffer;

    /// Setting `SO_SNDBUF` on an accepted Unix stream socket is reflected by `getsockopt`,
    /// which reports double the requested size.
    #[::fuchsia::test]
    async fn test_socket_send_capacity() {
        spawn_kernel_and_run(async |locked, current_task| {
            let socket = Socket::new(
                locked,
                &current_task,
                SocketDomain::Unix,
                SocketType::Stream,
                SocketProtocol::default(),
                /* kernel_private = */ false,
            )
            .expect("Failed to create socket.");
            socket
                .bind(locked, &current_task, SocketAddress::Unix(b"\0".into()))
                .expect("Failed to bind socket.");
            socket.listen(locked, &current_task, 10).expect("Failed to listen.");
            let connecting_socket = Socket::new(
                locked,
                &current_task,
                SocketDomain::Unix,
                SocketType::Stream,
                SocketProtocol::default(),
                /* kernel_private = */ false,
            )
            .expect("Failed to create connecting socket.");
            connecting_socket
                .ops
                .connect(
                    locked.cast_locked(),
                    &connecting_socket,
                    &current_task,
                    SocketPeer::Handle(socket.clone()),
                )
                .expect("Failed to connect socket.");
            assert_eq!(Ok(FdEvents::POLLIN), socket.query_events(locked, &current_task));
            let server_socket = socket.accept(locked, &current_task).unwrap();

            let opt_size = std::mem::size_of::<socklen_t>();
            let user_address =
                map_memory(locked, &current_task, UserAddress::default(), opt_size as u64);
            let send_capacity: socklen_t = 4 * 4096;
            current_task.write_memory(user_address, &send_capacity.to_ne_bytes()).unwrap();
            let user_buffer = UserBuffer { address: user_address, length: opt_size };
            server_socket
                .setsockopt(locked, &current_task, SOL_SOCKET, SO_SNDBUF, user_buffer.into())
                .unwrap();

            let opt_bytes =
                server_socket.getsockopt(locked, &current_task, SOL_SOCKET, SO_SNDBUF, 0).unwrap();
            let retrieved_capacity = socklen_t::from_ne_bytes(opt_bytes.try_into().unwrap());
            // Setting SO_SNDBUF actually sets it to double the size
            assert_eq!(2 * send_capacity, retrieved_capacity);
        })
        .await;
    }
}