Skip to main content

starnix_core/vfs/socket/
socket_unix.rs

1// Copyright 2022 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::bpf::context::EbpfRunContextImpl;
6use crate::bpf::fs::get_bpf_object;
7use crate::mm::MemoryAccessorExt;
8use crate::security;
9use crate::task::{CurrentTask, EventHandler, WaitCanceler, WaitQueue, Waiter};
10use crate::vfs::buffers::{
11    AncillaryData, InputBuffer, MessageQueue, MessageReadInfo, OutputBuffer, UnixControlData,
12};
13use crate::vfs::socket::{
14    AcceptQueue, DEFAULT_LISTEN_BACKLOG, SockOptValue, Socket, SocketAddress, SocketDomain,
15    SocketFile, SocketHandle, SocketMessageFlags, SocketOps, SocketPeer, SocketProtocol,
16    SocketShutdownFlags, SocketType,
17};
18use crate::vfs::{
19    CheckAccessReason, FdNumber, FileHandle, FileObject, FsNodeHandle, FsStr, LookupContext,
20    Message, UcredPtr, default_ioctl,
21};
22use ebpf::{
23    BpfProgramContext, BpfValue, CbpfConfig, DataWidth, EbpfProgram, Packet, ProgramArgument, Type,
24};
25use ebpf_api::{
26    LoadBytesBase, PacketWithLoadBytes, PinnedMap, ProgramType, SOCKET_FILTER_CBPF_CONFIG,
27    SOCKET_FILTER_SK_BUF_TYPE, SocketCookieContext, SocketFilterProgramContext, SocketUidContext,
28};
29use starnix_logging::track_stub;
30use starnix_sync::{FileOpsCore, LockEqualOrBefore, Locked, Mutex, Unlocked};
31use starnix_syscalls::{SUCCESS, SyscallArg, SyscallResult};
32use starnix_uapi::errors::{EACCES, EINTR, EPERM, Errno};
33use starnix_uapi::file_mode::Access;
34use starnix_uapi::open_flags::OpenFlags;
35use starnix_uapi::user_address::{UserAddress, UserRef};
36use starnix_uapi::vfs::FdEvents;
37use starnix_uapi::{
38    __sk_buff, FIONREAD, SO_ACCEPTCONN, SO_ATTACH_BPF, SO_BROADCAST, SO_ERROR, SO_KEEPALIVE,
39    SO_LINGER, SO_NO_CHECK, SO_PASSCRED, SO_PASSSEC, SO_PEERCRED, SO_PEERSEC, SO_RCVBUF,
40    SO_REUSEADDR, SO_REUSEPORT, SO_SNDBUF, SOL_SOCKET, errno, error, gid_t, socklen_t, uapi, ucred,
41    uid_t,
42};
43use std::sync::Arc;
44use zerocopy::IntoBytes;
45
// Message queue capacity limits. The values are taken from unix.go in gVisor.

/// Minimum capacity: `4 << 10` = 4096. Not referenced in the visible part of this file;
/// presumably the lower clamp applied when a capacity is set -- TODO confirm.
const SOCKET_MIN_SIZE: usize = 4 << 10;
/// Capacity given to the message queue of a newly created socket: `208 << 10` = 212992.
const SOCKET_DEFAULT_SIZE: usize = 208 << 10;
/// Maximum capacity: `4 << 20` = 4194304. Not referenced in the visible part of this file;
/// presumably the upper clamp applied when a capacity is set -- TODO confirm.
const SOCKET_MAX_SIZE: usize = 4 << 20;
50
/// One end of a UNIX domain socket.
///
/// The data of a socket is stored in the "Inner" struct. Because both ends have separate locks,
/// care must be taken to avoid taking both locks since there is no way to tell what order to
/// take them in.
///
/// When writing, data is buffered in the "other" end of the socket's Inner.MessageQueue:
///
///            UnixSocket end #1          UnixSocket end #2
///            +---------------+          +---------------+
///            |               |          |   +-------+   |
///   Writes -------------------------------->| Inner |------> Reads
///            |               |          |   +-------+   |
///            |   +-------+   |          |               |
///   Reads <------| Inner |<-------------------------------- Writes
///            |   +-------+   |          |               |
///            +---------------+          +---------------+
///
pub struct UnixSocket {
    /// The mutable state of this end, guarded by its own mutex. Use `UnixSocket::lock` to
    /// access it.
    inner: Mutex<UnixSocketInner>,
    /// Wait queue for this end. It is notified with `FdEvents` when data is written to this
    /// end, when a connection is queued on it, when its peer reads data, and on shutdown/close.
    waiters: WaitQueue,
}
71
/// Returns the `UnixSocket` that backs `socket`.
///
/// # Panics
///
/// Panics if `socket` is not backed by a `UnixSocket`. That would be a programming error,
/// because sockets from different families should not communicate; unwrapping here makes
/// such a mistake visible immediately.
fn downcast_socket_to_unix(socket: &Socket) -> &UnixSocket {
    socket.downcast_socket::<UnixSocket>().unwrap()
}
79
/// The connection state of one end of a UNIX socket.
enum UnixSocketState {
    /// The socket has not been connected.
    Disconnected,

    /// The socket has had `listen` called and can accept incoming connections. The queue holds
    /// the server-side sockets pushed by `connect_stream` until `accept` pops them.
    Listening(AcceptQueue),

    /// The socket is connected to a peer. The handle refers to the other end.
    Connected(SocketHandle),

    /// The socket is closed. `close` moves the socket into this state, which drops any peer
    /// handle or accept queue held by the previous state.
    Closed,
}
93
/// The mutex-protected state of one end of a UNIX socket (see `UnixSocket::inner`).
struct UnixSocketInner {
    /// The `MessageQueue` that contains messages sent to this socket.
    messages: MessageQueue,

    /// The address that this socket has been bound to, if it has been bound.
    address: Option<SocketAddress>,

    /// Whether this end of the socket has been shut down and can no longer receive messages. It
    /// is still possible to send messages to the peer, if it exists and hasn't also been shut
    /// down.
    is_shutdown: bool,

    /// Whether the peer had unread data when it was closed. In this case, reads should return
    /// ECONNRESET instead of 0 (eof).
    peer_closed_with_unread_data: bool,

    /// Value stored by `setsockopt(SO_LINGER)`. See SO_LINGER.
    pub linger: uapi::linger,

    /// When set, writers attach their credentials as ancillary data to messages sent to this
    /// socket. See SO_PASSCRED.
    pub passcred: bool,

    /// When set, writers attach a security context as ancillary data to messages sent to this
    /// socket. See SO_PASSSEC.
    pub passsec: bool,

    /// Value stored by `setsockopt(SO_BROADCAST)`. See SO_BROADCAST.
    pub broadcast: bool,

    /// Value stored by `setsockopt(SO_NO_CHECK)`. See SO_NO_CHECK.
    pub no_check: bool,

    /// Value stored by `setsockopt(SO_REUSEPORT)`. See SO_REUSEPORT.
    pub reuseport: bool,

    /// Value stored by `setsockopt(SO_REUSEADDR)`. See SO_REUSEADDR.
    pub reuseaddr: bool,

    /// Value stored by `setsockopt(SO_KEEPALIVE)`. See SO_KEEPALIVE.
    pub keepalive: bool,

    /// The socket filter program installed with SO_ATTACH_BPF, if any.
    bpf_program: Option<UnixSocketFilter>,

    /// Unix credentials of the owner of this socket, for SO_PEERCRED. Recorded by `new_pair`,
    /// `listen` and `connect_stream`; `None` until one of them has run.
    credentials: Option<ucred>,

    /// Socket state: a queue if this is a listening socket, or a peer if this is a connected
    /// socket.
    state: UnixSocketState,
}
143
144impl UnixSocket {
145    pub fn new(_socket_type: SocketType) -> UnixSocket {
146        UnixSocket {
147            inner: Mutex::new(UnixSocketInner {
148                messages: MessageQueue::new(SOCKET_DEFAULT_SIZE),
149                address: None,
150                is_shutdown: false,
151                peer_closed_with_unread_data: false,
152                linger: uapi::linger::default(),
153                passcred: false,
154                passsec: false,
155                broadcast: false,
156                no_check: false,
157                reuseaddr: false,
158                reuseport: false,
159                keepalive: false,
160                bpf_program: None,
161                credentials: None,
162                state: UnixSocketState::Disconnected,
163            }),
164            waiters: WaitQueue::default(),
165        }
166    }
167
168    /// Creates a pair of connected sockets.
169    ///
170    /// # Parameters
171    /// - `domain`: The domain of the socket (e.g., `AF_UNIX`).
172    /// - `socket_type`: The type of the socket (e.g., `SOCK_STREAM`).
173    pub fn new_pair<L>(
174        locked: &mut Locked<L>,
175        current_task: &CurrentTask,
176        domain: SocketDomain,
177        socket_type: SocketType,
178        open_flags: OpenFlags,
179    ) -> Result<(FileHandle, FileHandle), Errno>
180    where
181        L: LockEqualOrBefore<FileOpsCore>,
182    {
183        let credentials = current_task.current_ucred();
184        let left = Socket::new(
185            locked,
186            current_task,
187            domain,
188            socket_type,
189            SocketProtocol::default(),
190            /* kernel_private = */ false,
191        )?;
192        let right = Socket::new(
193            locked,
194            current_task,
195            domain,
196            socket_type,
197            SocketProtocol::default(),
198            /* kernel_private = */ false,
199        )?;
200        downcast_socket_to_unix(&left).lock().state = UnixSocketState::Connected(right.clone());
201        downcast_socket_to_unix(&left).lock().credentials = Some(credentials.clone());
202        downcast_socket_to_unix(&right).lock().state = UnixSocketState::Connected(left.clone());
203        downcast_socket_to_unix(&right).lock().credentials = Some(credentials);
204        let left = SocketFile::from_socket(
205            locked,
206            current_task,
207            left,
208            open_flags,
209            /* kernel_private= */ false,
210        )?;
211        let right = SocketFile::from_socket(
212            locked,
213            current_task,
214            right,
215            open_flags,
216            /* kernel_private= */ false,
217        )?;
218        let left_socket = SocketFile::get_from_file(&left)?;
219        let right_socket = SocketFile::get_from_file(&right)?;
220
221        security::socket_socketpair(current_task, left_socket, right_socket)?;
222        Ok((left, right))
223    }
224
    /// Connects the stream or seqpacket `socket` to the listening socket `peer`.
    ///
    /// On success a new kernel-private "server" socket is created, linked to `socket` in both
    /// directions, given the listener's address, queue capacity, credentials and
    /// passcred/passsec settings, and pushed onto the listener's accept queue. The listener's
    /// waiters are then notified with `POLLIN`.
    ///
    /// # Errors
    /// - `ECONNREFUSED` if `peer` is not in the listening state.
    /// - `EISCONN` if `socket` is already connected.
    /// - `EINVAL` if `socket` is in any state other than disconnected or connected.
    /// - `EAGAIN` if the accept queue already holds more sockets than its backlog.
    /// - Any error from `check_type_for_connect`, `Socket::new`,
    ///   `security::unix_stream_connect`, or setting the server queue's capacity.
    fn connect_stream(
        &self,
        locked: &mut Locked<FileOpsCore>,
        socket: &SocketHandle,
        current_task: &CurrentTask,
        peer: &SocketHandle,
    ) -> Result<(), Errno> {
        // Only hold one lock at a time until we make sure the lock ordering
        // is right: client before listener. The listener lock taken here is a temporary that is
        // released as soon as this match finishes.
        match downcast_socket_to_unix(peer).lock().state {
            UnixSocketState::Listening(_) => {}
            _ => return error!(ECONNREFUSED),
        }

        // From here on the client lock is held until the function returns.
        let mut client = downcast_socket_to_unix(socket).lock();
        match client.state {
            UnixSocketState::Disconnected => {}
            UnixSocketState::Connected(_) => return error!(EISCONN),
            _ => return error!(EINVAL),
        };

        let unix_socket_peer = downcast_socket_to_unix(peer);
        {
            let mut listener = unix_socket_peer.lock();
            // Must check this again because we released the listener lock for a moment
            let queue = match &listener.state {
                UnixSocketState::Listening(queue) => queue,
                _ => return error!(ECONNREFUSED),
            };

            self.check_type_for_connect(socket, peer, &listener.address)?;

            // The queue may hold up to `backlog + 1` pending sockets before connects are refused.
            if queue.sockets.len() > queue.backlog {
                return error!(EAGAIN);
            }

            // The socket that `accept` on the listener will later hand out.
            let server = Socket::new(
                locked,
                current_task,
                peer.domain,
                peer.socket_type,
                SocketProtocol::default(),
                /* kernel_private = */ true,
            )?;
            security::unix_stream_connect(current_task, socket, peer, &server)?;
            client.state = UnixSocketState::Connected(server.clone());
            client.credentials = Some(current_task.current_ucred());
            {
                // The server end inherits its configuration from the listener.
                // NOTE(review): if `set_capacity` fails below, the client has already been
                // switched to `Connected` -- confirm whether that is intended.
                let mut server = downcast_socket_to_unix(&server).lock();
                server.state = UnixSocketState::Connected(socket.clone());
                server.address = listener.address.clone();
                server.messages.set_capacity(listener.messages.capacity())?;
                server.credentials = listener.credentials.clone();
                server.passcred = listener.passcred;
                server.passsec = listener.passsec;
            }

            // We already checked that the socket is in Listening state...but the borrow checker cannot
            // be convinced that it's ok to combine these checks
            let queue = match listener.state {
                UnixSocketState::Listening(ref mut queue) => queue,
                _ => panic!("something changed the server socket state while I held a lock on it"),
            };
            queue.sockets.push_back(server);
        }
        // Wake anything waiting on the listener, now that its lock has been released.
        unix_socket_peer.waiters.notify_fd_events(FdEvents::POLLIN);
        Ok(())
    }
293
294    fn connect_datagram(&self, socket: &SocketHandle, peer: &SocketHandle) -> Result<(), Errno> {
295        {
296            let unix_socket = socket.downcast_socket::<UnixSocket>().unwrap();
297            let peer_inner = unix_socket.lock();
298            self.check_type_for_connect(socket, peer, &peer_inner.address)?;
299        }
300        let unix_socket = socket.downcast_socket::<UnixSocket>().unwrap();
301        unix_socket.lock().state = UnixSocketState::Connected(peer.clone());
302        Ok(())
303    }
304
305    pub fn check_type_for_connect(
306        &self,
307        socket: &Socket,
308        peer: &Socket,
309        peer_address: &Option<SocketAddress>,
310    ) -> Result<(), Errno> {
311        if socket.domain != peer.domain || socket.socket_type != peer.socket_type {
312            // According to ConnectWithWrongType in accept_bind_test, abstract
313            // UNIX domain sockets return ECONNREFUSED rather than EPROTOTYPE.
314            if let Some(address) = peer_address {
315                if address.is_abstract_unix() {
316                    return error!(ECONNREFUSED);
317                }
318            }
319            return error!(EPROTOTYPE);
320        }
321        Ok(())
322    }
323
    /// Locks and returns the inner state of the Socket.
    ///
    /// The state stays locked for as long as the returned guard is alive. See the docs on
    /// `UnixSocket` for why a guard should not be kept while locking the other end.
    fn lock(&self) -> starnix_sync::MutexGuard<'_, UnixSocketInner> {
        self.inner.lock()
    }
328
    /// Returns whether this socket is currently in the `Listening` state.
    ///
    /// The `_socket` argument is unused.
    fn is_listening(&self, _socket: &Socket) -> bool {
        matches!(self.lock().state, UnixSocketState::Listening(_))
    }
332
    /// Returns the capacity of this socket's own message queue, which holds the messages
    /// sent to it.
    fn get_receive_capacity(&self) -> usize {
        self.lock().messages.capacity()
    }
336
    /// Requests a new receive capacity for this socket.
    ///
    /// The request is forwarded to `UnixSocketInner::set_capacity`, which is defined outside
    /// this part of the file; presumably it clamps the value to the supported range -- TODO
    /// confirm.
    fn set_receive_capacity(&self, requested_capacity: usize) {
        self.lock().set_capacity(requested_capacity);
    }
340
341    fn get_send_capacity(&self) -> usize {
342        let peer = {
343            if let Some(peer) = self.lock().peer() {
344                peer.clone()
345            } else {
346                return 0;
347            }
348        };
349        let unix_socket = downcast_socket_to_unix(&peer);
350        let capacity = unix_socket.lock().messages.capacity();
351        capacity
352    }
353
354    fn set_send_capacity(&self, requested_capacity: usize) {
355        let peer = {
356            if let Some(peer) = self.lock().peer() {
357                peer.clone()
358            } else {
359                return;
360            }
361        };
362        let unix_socket = downcast_socket_to_unix(&peer);
363        unix_socket.lock().set_capacity(requested_capacity);
364    }
365
366    fn get_linger(&self) -> uapi::linger {
367        let inner = self.lock();
368        inner.linger
369    }
370
371    fn set_linger(&self, linger: uapi::linger) {
372        let mut inner = self.lock();
373        inner.linger = linger;
374    }
375
376    fn get_passcred(&self) -> bool {
377        let inner = self.lock();
378        inner.passcred
379    }
380
381    fn set_passcred(&self, passcred: bool) {
382        let mut inner = self.lock();
383        inner.passcred = passcred;
384    }
385
386    fn get_passsec(&self) -> bool {
387        let inner = self.lock();
388        inner.passsec
389    }
390
391    fn set_passsec(&self, passsec: bool) {
392        let mut inner = self.lock();
393        inner.passsec = passsec;
394    }
395
396    fn get_broadcast(&self) -> bool {
397        let inner = self.lock();
398        inner.broadcast
399    }
400
401    fn set_broadcast(&self, broadcast: bool) {
402        let mut inner = self.lock();
403        inner.broadcast = broadcast;
404    }
405
406    fn get_no_check(&self) -> bool {
407        let inner = self.lock();
408        inner.no_check
409    }
410
411    fn set_no_check(&self, no_check: bool) {
412        let mut inner = self.lock();
413        inner.no_check = no_check;
414    }
415
416    fn get_reuseaddr(&self) -> bool {
417        let inner = self.lock();
418        inner.reuseaddr
419    }
420
421    fn set_reuseaddr(&self, reuseaddr: bool) {
422        let mut inner = self.lock();
423        inner.reuseaddr = reuseaddr;
424    }
425
426    fn get_reuseport(&self) -> bool {
427        let inner = self.lock();
428        inner.reuseport
429    }
430
431    fn set_reuseport(&self, reuseport: bool) {
432        let mut inner = self.lock();
433        inner.reuseport = reuseport;
434    }
435
436    fn get_keepalive(&self) -> bool {
437        let inner = self.lock();
438        inner.keepalive
439    }
440
441    fn set_keepalive(&self, keepalive: bool) {
442        let mut inner = self.lock();
443        inner.keepalive = keepalive;
444    }
445
446    fn set_bpf_program(&self, program: Option<UnixSocketFilter>) {
447        let mut inner = self.lock();
448        inner.bpf_program = program;
449    }
450
451    fn peer_cred(&self) -> Option<ucred> {
452        let peer = {
453            let inner = self.lock();
454            inner.peer().cloned()
455        };
456        if let Some(peer) = peer {
457            let unix_socket = downcast_socket_to_unix(&peer);
458            let unix_socket = unix_socket.lock();
459            unix_socket.credentials.clone()
460        } else {
461            None
462        }
463    }
464
    /// Binds `socket` to `address` and registers it as the socket bound to `node`.
    ///
    /// Note that the state that gets bound is the one belonging to `socket`, not to `self`.
    /// The socket's lock is held while the node is updated, so the address and the node
    /// registration change together.
    ///
    /// # Errors
    /// Propagates the error from `UnixSocketInner::bind`; in that case `node` is left
    /// untouched.
    pub fn bind_socket_to_node(
        &self,
        socket: &SocketHandle,
        address: SocketAddress,
        node: &FsNodeHandle,
    ) -> Result<(), Errno> {
        let unix_socket = downcast_socket_to_unix(socket);
        let mut inner = unix_socket.lock();
        inner.bind(address)?;
        node.set_bound_socket(socket.clone());
        Ok(())
    }
477
    /// Notifies this socket's waiters with `POLLIN | POLLOUT | POLLHUP`. `close` calls this
    /// for the closing socket and for its connected peer.
    fn notify_shutdown(&self) {
        self.waiters.notify_fd_events(FdEvents::POLLIN | FdEvents::POLLOUT | FdEvents::POLLHUP);
    }
481}
482
483impl SocketOps for UnixSocket {
484    fn connect(
485        &self,
486        locked: &mut Locked<FileOpsCore>,
487        socket: &SocketHandle,
488        current_task: &CurrentTask,
489        peer: SocketPeer,
490    ) -> Result<(), Errno> {
491        let peer = match peer {
492            SocketPeer::Handle(handle) => handle,
493            SocketPeer::Address(_) => return error!(EINVAL),
494        };
495        match socket.socket_type {
496            SocketType::Stream | SocketType::SeqPacket => {
497                self.connect_stream(locked, socket, current_task, &peer)
498            }
499            SocketType::Datagram | SocketType::Raw => self.connect_datagram(socket, &peer),
500            _ => error!(EINVAL),
501        }
502    }
503
504    fn listen(
505        &self,
506        _locked: &mut Locked<FileOpsCore>,
507        socket: &Socket,
508        backlog: i32,
509        credentials: ucred,
510    ) -> Result<(), Errno> {
511        match socket.socket_type {
512            SocketType::Stream | SocketType::SeqPacket => {}
513            _ => return error!(EOPNOTSUPP),
514        }
515        let mut inner = self.lock();
516        inner.credentials = Some(credentials);
517        let is_bound = inner.address.is_some();
518        let backlog = if backlog < 0 { DEFAULT_LISTEN_BACKLOG } else { backlog as usize };
519        match &mut inner.state {
520            UnixSocketState::Disconnected if is_bound => {
521                inner.state = UnixSocketState::Listening(AcceptQueue::new(backlog));
522                Ok(())
523            }
524            UnixSocketState::Listening(queue) => {
525                queue.set_backlog(backlog)?;
526                Ok(())
527            }
528            _ => error!(EINVAL),
529        }
530    }
531
532    fn accept(
533        &self,
534        _locked: &mut Locked<FileOpsCore>,
535        socket: &Socket,
536        _current_task: &CurrentTask,
537    ) -> Result<SocketHandle, Errno> {
538        match socket.socket_type {
539            SocketType::Stream | SocketType::SeqPacket => {}
540            _ => return error!(EOPNOTSUPP),
541        }
542        let mut inner = self.lock();
543        let queue = match &mut inner.state {
544            UnixSocketState::Listening(queue) => queue,
545            _ => return error!(EINVAL),
546        };
547        queue.sockets.pop_front().ok_or_else(|| errno!(EAGAIN))
548    }
549
550    fn bind(
551        &self,
552        _locked: &mut Locked<FileOpsCore>,
553        _socket: &Socket,
554        _current_task: &CurrentTask,
555        socket_address: SocketAddress,
556    ) -> Result<(), Errno> {
557        match socket_address {
558            SocketAddress::Unix(_) => {}
559            _ => return error!(EINVAL),
560        }
561        self.lock().bind(socket_address)
562    }
563
564    fn read(
565        &self,
566        _locked: &mut Locked<FileOpsCore>,
567        socket: &Socket,
568        _current_task: &CurrentTask,
569        data: &mut dyn OutputBuffer,
570        flags: SocketMessageFlags,
571    ) -> Result<MessageReadInfo, Errno> {
572        let info = self.lock().read(data, socket.socket_type, flags)?;
573        if info.bytes_read > 0 {
574            let peer = {
575                let inner = self.lock();
576                inner.peer().cloned()
577            };
578            if let Some(socket) = peer {
579                let unix_socket_peer = socket.downcast_socket::<UnixSocket>();
580                if let Some(socket) = unix_socket_peer {
581                    socket.waiters.notify_fd_events(FdEvents::POLLOUT);
582                }
583            }
584        }
585        Ok(info)
586    }
587
    /// Writes `data` (and `ancillary_data`) into the destination socket's message queue.
    ///
    /// The destination is the connected peer when `dest_address` is `None`, or the socket that
    /// `dest_address` resolves to when this socket is not connected. If the destination has
    /// SO_PASSCRED or SO_PASSSEC enabled, the sender's credentials or security context are
    /// appended to `ancillary_data` first. When bytes were written, the destination's waiters
    /// are notified with `POLLIN`.
    ///
    /// Returns the number of bytes written.
    ///
    /// # Errors
    /// - `EOPNOTSUPP` for an unconnected stream socket given a destination address.
    /// - `ENOTCONN` for an unconnected seqpacket socket given a destination address, or for any
    ///   unconnected socket without a destination address.
    /// - `EISCONN` if the socket is connected and a destination address is given.
    /// - `EINVAL` if the destination address is not a UNIX address.
    /// - Any error from address resolution, `security::unix_may_send`, or the queue write.
    fn write(
        &self,
        locked: &mut Locked<FileOpsCore>,
        socket: &Socket,
        current_task: &CurrentTask,
        data: &mut dyn InputBuffer,
        dest_address: &mut Option<SocketAddress>,
        ancillary_data: &mut Vec<AncillaryData>,
    ) -> Result<usize, Errno> {
        // Snapshot what is needed from our own state, then release our lock before touching the
        // destination.
        let (connected_peer, local_address, creds) = {
            let inner = self.lock();
            (inner.peer().map(|p| p.clone()), inner.address.clone(), inner.credentials.clone())
        };

        // The arms are tried in order, so the more specific error cases must stay above the
        // address-resolution arm.
        let peer = match (connected_peer, dest_address, socket.socket_type) {
            (Some(peer), None, _) => peer,
            (None, Some(_), SocketType::Stream) => return error!(EOPNOTSUPP),
            (None, Some(_), SocketType::SeqPacket) => return error!(ENOTCONN),
            (Some(_), Some(_), _) => return error!(EISCONN),
            (_, Some(SocketAddress::Unix(name)), _) => {
                resolve_unix_socket_address(locked, current_task, name.as_ref())?
            }
            (_, Some(_), _) => return error!(EINVAL),
            (None, None, _) => return error!(ENOTCONN),
        };

        // The security send check is made for datagram sockets only.
        if socket.socket_type == SocketType::Datagram {
            security::unix_may_send(current_task, socket, &peer)?;
        }

        let unix_socket = downcast_socket_to_unix(&peer);
        let bytes_written = {
            let mut peer = unix_socket.lock();
            if peer.passcred {
                // Prefer the credentials recorded on this socket; fall back to the writing task.
                let creds = creds.unwrap_or_else(|| current_task.current_ucred());
                ancillary_data.push(AncillaryData::Unix(UnixControlData::Credentials(creds)));
            }
            if peer.passsec {
                // TODO: https://fxbug.dev/364568855 - Store the opaque LSM property value, and expand
                // it to a string upon readmsg.
                let context = security::socket_getpeersec_dgram(current_task, socket);
                ancillary_data.push(AncillaryData::Unix(UnixControlData::Security(context.into())));
            }
            peer.write(
                locked,
                current_task,
                data,
                local_address,
                ancillary_data,
                socket.socket_type,
            )?
        };
        // Wake readers on the destination once its lock has been released.
        if bytes_written > 0 {
            unix_socket.waiters.notify_fd_events(FdEvents::POLLIN);
        }
        Ok(bytes_written)
    }
645
    /// Registers `waiter` on this socket's wait queue for `events`, with `handler` attached.
    ///
    /// Returns the `WaitCanceler` produced by the wait queue. The lock context, socket and
    /// task arguments are unused.
    fn wait_async(
        &self,
        _locked: &mut Locked<FileOpsCore>,
        _socket: &Socket,
        _current_task: &CurrentTask,
        waiter: &Waiter,
        events: FdEvents,
        handler: EventHandler,
    ) -> WaitCanceler {
        self.waiters.wait_async_fd_events(waiter, events, handler)
    }
657
658    fn query_events(
659        &self,
660        _locked: &mut Locked<FileOpsCore>,
661        _socket: &Socket,
662        _current_task: &CurrentTask,
663    ) -> Result<FdEvents, Errno> {
664        // Note that self.lock() must be dropped before acquiring peer.inner.lock() to avoid
665        // potential deadlocks.
666        let (mut events, peer) = {
667            let inner = self.lock();
668
669            let mut events = FdEvents::empty();
670            let local_events = inner.messages.query_events();
671            // From our end's message queue we only care about POLLIN (whether we have data stored
672            // that's readable). POLLOUT is based on whether the peer end has room in its buffer.
673            if local_events.contains(FdEvents::POLLIN) {
674                events = FdEvents::POLLIN;
675            }
676
677            if inner.is_shutdown {
678                events |= FdEvents::POLLIN | FdEvents::POLLOUT | FdEvents::POLLHUP;
679            }
680
681            match &inner.state {
682                UnixSocketState::Listening(queue) => {
683                    if !queue.sockets.is_empty() {
684                        events |= FdEvents::POLLIN;
685                    }
686                }
687                UnixSocketState::Closed => {
688                    events |= FdEvents::POLLHUP;
689                }
690                _ => {}
691            }
692
693            (events, inner.peer().cloned())
694        };
695
696        // Check the peer (outside of our lock) to see if it can accept data written from our end.
697        if let Some(peer) = peer {
698            let unix_socket = downcast_socket_to_unix(&peer);
699            let peer_inner = unix_socket.lock();
700            let peer_events = peer_inner.messages.query_events();
701            if peer_events.contains(FdEvents::POLLOUT) {
702                events |= FdEvents::POLLOUT;
703            }
704        }
705
706        Ok(events)
707    }
708
709    /// Shuts down this socket according to how, preventing any future reads and/or writes.
710    ///
711    /// Used by the shutdown syscalls.
712    fn shutdown(
713        &self,
714        _locked: &mut Locked<FileOpsCore>,
715        _socket: &Socket,
716        how: SocketShutdownFlags,
717    ) -> Result<(), Errno> {
718        let mut should_notify_self = false;
719        let mut should_notify_peer = false;
720        let peer = {
721            let mut inner = self.lock();
722            let peer = inner.peer().ok_or_else(|| errno!(ENOTCONN))?.clone();
723            if how.contains(SocketShutdownFlags::READ) {
724                inner.is_shutdown = true;
725                should_notify_self = true;
726            }
727            peer
728        };
729        if how.contains(SocketShutdownFlags::WRITE) {
730            let unix_socket = downcast_socket_to_unix(&peer);
731            unix_socket.lock().is_shutdown = true;
732            should_notify_peer = true;
733        }
734        if should_notify_self {
735            self.waiters.notify_fd_events(FdEvents::POLLIN | FdEvents::POLLOUT | FdEvents::POLLHUP);
736        }
737        if should_notify_peer {
738            let unix_socket = downcast_socket_to_unix(&peer);
739            unix_socket
740                .waiters
741                .notify_fd_events(FdEvents::POLLIN | FdEvents::POLLOUT | FdEvents::POLLHUP);
742        }
743        Ok(())
744    }
745
746    /// Close this socket.
747    ///
748    /// Called by SocketFile when the file descriptor that is holding this
749    /// socket is closed.
750    ///
751    /// Close differs from shutdown in two ways. First, close will call
752    /// mark_peer_closed_with_unread_data if this socket has unread data,
753    /// which changes how read() behaves on that socket. Second, close
754    /// transitions the internal state of this socket to Closed, which breaks
755    /// the reference cycle that exists in the connected state.
756    fn close(
757        &self,
758        _locked: &mut Locked<FileOpsCore>,
759        _current_task: &CurrentTask,
760        socket: &Socket,
761    ) {
762        let (maybe_peer, has_unread) = {
763            let mut inner = self.lock();
764            let maybe_peer = inner.peer().map(Arc::clone);
765            inner.is_shutdown = true;
766            inner.state = UnixSocketState::Closed;
767            (maybe_peer, !inner.messages.is_empty())
768        };
769        self.notify_shutdown();
770        // If this is a connected socket type, also shut down the connected peer.
771        if socket.socket_type == SocketType::Stream || socket.socket_type == SocketType::SeqPacket {
772            if let Some(peer) = maybe_peer {
773                let unix_socket = downcast_socket_to_unix(&peer);
774
775                {
776                    let mut peer_inner = unix_socket.lock();
777                    if has_unread {
778                        peer_inner.peer_closed_with_unread_data = true;
779                    }
780                    peer_inner.is_shutdown = true;
781                }
782                unix_socket.notify_shutdown();
783            }
784        }
785    }
786
787    /// Returns the name of this socket.
788    ///
789    /// The name is derived from the address and domain. A socket
790    /// will always have a name, even if it is not bound to an address.
791    fn getsockname(
792        &self,
793        _locked: &mut Locked<FileOpsCore>,
794        socket: &Socket,
795    ) -> Result<SocketAddress, Errno> {
796        let inner = self.lock();
797        if let Some(address) = &inner.address {
798            Ok(address.clone())
799        } else {
800            Ok(SocketAddress::default_for_domain(socket.domain))
801        }
802    }
803
804    /// Returns the name of the peer of this socket, if such a peer exists.
805    ///
806    /// Returns an error if the socket is not connected.
807    fn getpeername(
808        &self,
809        locked: &mut Locked<FileOpsCore>,
810        _socket: &Socket,
811    ) -> Result<SocketAddress, Errno> {
812        let peer = self.lock().peer().ok_or_else(|| errno!(ENOTCONN))?.clone();
813        peer.getsockname(locked)
814    }
815
816    fn setsockopt(
817        &self,
818        _locked: &mut Locked<FileOpsCore>,
819        _socket: &Socket,
820        current_task: &CurrentTask,
821        level: u32,
822        optname: u32,
823        optval: SockOptValue,
824    ) -> Result<(), Errno> {
825        match level {
826            SOL_SOCKET => match optname {
827                SO_SNDBUF => {
828                    let requested_capacity: socklen_t = optval.read(current_task)?;
829                    // See StreamUnixSocketPairTest.SetSocketSendBuf for why we multiply by 2 here.
830                    self.set_send_capacity(requested_capacity as usize * 2);
831                }
832                SO_RCVBUF => {
833                    let requested_capacity: socklen_t = optval.read(current_task)?;
834                    self.set_receive_capacity(requested_capacity as usize);
835                }
836                SO_LINGER => {
837                    let mut linger: uapi::linger = optval.read(current_task)?;
838                    if linger.l_onoff != 0 {
839                        linger.l_onoff = 1;
840                    }
841                    self.set_linger(linger);
842                }
843                SO_PASSCRED => {
844                    let passcred: u32 = optval.read(current_task)?;
845                    self.set_passcred(passcred != 0);
846                }
847                SO_PASSSEC => {
848                    let passsec: u32 = optval.read(current_task)?;
849                    self.set_passsec(passsec != 0);
850                }
851                SO_BROADCAST => {
852                    let broadcast: u32 = optval.read(current_task)?;
853                    self.set_broadcast(broadcast != 0);
854                }
855                SO_NO_CHECK => {
856                    let no_check: u32 = optval.read(current_task)?;
857                    self.set_no_check(no_check != 0);
858                }
859                SO_REUSEADDR => {
860                    let reuseaddr: u32 = optval.read(current_task)?;
861                    self.set_reuseaddr(reuseaddr != 0);
862                }
863                SO_REUSEPORT => {
864                    let reuseport: u32 = optval.read(current_task)?;
865                    self.set_reuseport(reuseport != 0);
866                }
867                SO_KEEPALIVE => {
868                    let keepalive: u32 = optval.read(current_task)?;
869                    self.set_keepalive(keepalive != 0);
870                }
871                SO_ATTACH_BPF => {
872                    let fd: FdNumber = optval.read(current_task)?;
873                    let object = get_bpf_object(current_task, fd)?;
874                    let program = object.as_program()?;
875
876                    let linked_program = program.link(ProgramType::SocketFilter)?;
877
878                    self.set_bpf_program(Some(linked_program));
879                }
880                _ => return error!(ENOPROTOOPT),
881            },
882            _ => return error!(ENOPROTOOPT),
883        }
884        Ok(())
885    }
886
887    fn getsockopt(
888        &self,
889        _locked: &mut Locked<FileOpsCore>,
890        socket: &Socket,
891        current_task: &CurrentTask,
892        level: u32,
893        optname: u32,
894        _optlen: u32,
895    ) -> Result<Vec<u8>, Errno> {
896        match level {
897            SOL_SOCKET => match optname {
898                SO_PEERCRED => Ok(UcredPtr::into_bytes(
899                    current_task,
900                    self.peer_cred().unwrap_or(ucred { pid: 0, uid: uid_t::MAX, gid: gid_t::MAX }),
901                )
902                .map_err(|_| errno!(EINVAL))?),
903                SO_PEERSEC => match socket.socket_type {
904                    SocketType::Stream => security::socket_getpeersec_stream(current_task, socket),
905                    _ => error!(ENOPROTOOPT),
906                },
907                SO_ACCEPTCONN =>
908                {
909                    #[allow(clippy::bool_to_int_with_if)]
910                    Ok(if self.is_listening(socket) { 1u32 } else { 0u32 }.to_ne_bytes().to_vec())
911                }
912                SO_SNDBUF => Ok((self.get_send_capacity() as socklen_t).to_ne_bytes().to_vec()),
913                SO_RCVBUF => Ok((self.get_receive_capacity() as socklen_t).to_ne_bytes().to_vec()),
914                SO_LINGER => Ok(self.get_linger().as_bytes().to_vec()),
915                SO_PASSCRED => Ok((self.get_passcred() as u32).as_bytes().to_vec()),
916                SO_PASSSEC => Ok((self.get_passsec() as u32).as_bytes().to_vec()),
917                SO_BROADCAST => Ok((self.get_broadcast() as u32).as_bytes().to_vec()),
918                SO_NO_CHECK => Ok((self.get_no_check() as u32).as_bytes().to_vec()),
919                SO_REUSEADDR => Ok((self.get_reuseaddr() as u32).as_bytes().to_vec()),
920                SO_REUSEPORT => Ok((self.get_reuseport() as u32).as_bytes().to_vec()),
921                SO_KEEPALIVE => Ok((self.get_keepalive() as u32).as_bytes().to_vec()),
922                SO_ERROR => Ok((0u32).as_bytes().to_vec()),
923                _ => error!(ENOPROTOOPT),
924            },
925            _ => error!(ENOPROTOOPT),
926        }
927    }
928
929    fn ioctl(
930        &self,
931        locked: &mut Locked<Unlocked>,
932        socket: &Socket,
933        file: &FileObject,
934        current_task: &CurrentTask,
935        request: u32,
936        arg: SyscallArg,
937    ) -> Result<SyscallResult, Errno> {
938        let user_addr = UserAddress::from(arg);
939        match request {
940            FIONREAD if socket.socket_type == SocketType::Stream => {
941                let length: i32 =
942                    self.lock().messages.len().try_into().map_err(|_| errno!(EINVAL))?;
943                current_task.write_object(UserRef::<i32>::new(user_addr), &length)?;
944                Ok(SUCCESS)
945            }
946            _ => default_ioctl(file, locked, current_task, request, arg),
947        }
948    }
949}
950
951impl UnixSocketInner {
952    pub fn bind(&mut self, socket_address: SocketAddress) -> Result<(), Errno> {
953        if self.address.is_some() {
954            return error!(EINVAL);
955        }
956        self.address = Some(socket_address);
957        Ok(())
958    }
959
960    fn set_capacity(&mut self, requested_capacity: usize) {
961        let capacity = requested_capacity.clamp(SOCKET_MIN_SIZE, SOCKET_MAX_SIZE);
962        let capacity = std::cmp::max(capacity, self.messages.len());
963        // We have validated capacity sufficiently that set_capacity should always succeed.
964        self.messages.set_capacity(capacity).unwrap();
965    }
966
967    /// Returns the socket that is connected to this socket, if such a peer exists. Returns
968    /// ENOTCONN otherwise.
969    fn peer(&self) -> Option<&SocketHandle> {
970        match &self.state {
971            UnixSocketState::Connected(peer) => Some(peer),
972            _ => None,
973        }
974    }
975
976    /// Reads the the contents of this socket into `InputBuffer`.
977    ///
978    /// Will stop reading if a message with ancillary data is encountered (after the message with
979    /// ancillary data has been read).
980    ///
981    /// # Parameters
982    /// - `data`: The `OutputBuffer` to write the data to.
983    ///
984    /// Returns the number of bytes that were read into the buffer, and any ancillary data that was
985    /// read from the socket.
986    fn read(
987        &mut self,
988        data: &mut dyn OutputBuffer,
989        socket_type: SocketType,
990        flags: SocketMessageFlags,
991    ) -> Result<MessageReadInfo, Errno> {
992        let mut info = if socket_type == SocketType::Stream {
993            if data.available() == 0 {
994                return Ok(MessageReadInfo::default());
995            }
996
997            if flags.contains(SocketMessageFlags::PEEK) {
998                self.messages.peek_stream(data)?
999            } else {
1000                self.messages.read_stream(data)?
1001            }
1002        } else if flags.contains(SocketMessageFlags::PEEK) {
1003            self.messages.peek_datagram(data)?
1004        } else {
1005            self.messages.read_datagram(data)?
1006        };
1007        if info.message_length == 0 {
1008            if self.peer_closed_with_unread_data {
1009                // Reset the flag
1010                self.peer_closed_with_unread_data = false;
1011                return error!(ECONNRESET);
1012            }
1013            if !self.is_shutdown {
1014                return error!(EAGAIN);
1015            }
1016        }
1017
1018        // Remove any credentials message, so that it can be moved to the front if passcred is
1019        // enabled, or simply be removed if passcred is not enabled.
1020        let creds_message;
1021        if let Some(index) = info
1022            .ancillary_data
1023            .iter()
1024            .position(|m| matches!(m, AncillaryData::Unix(UnixControlData::Credentials { .. })))
1025        {
1026            creds_message = info.ancillary_data.remove(index)
1027        } else {
1028            // If passcred is enabled credentials are returned even if they were not sent.
1029            creds_message = AncillaryData::Unix(UnixControlData::unknown_creds());
1030        }
1031        if self.passcred {
1032            // Allow credentials to take priority if they are enabled, so insert at 0.
1033            info.ancillary_data.insert(0, creds_message);
1034        }
1035
1036        Ok(info)
1037    }
1038
1039    /// Writes the the contents of `InputBuffer` into this socket.
1040    ///
1041    /// # Parameters
1042    /// - `data`: The `InputBuffer` to read the data from.
1043    /// - `ancillary_data`: Any ancillary data to write to the socket. Note that the ancillary data
1044    ///                     will only be written if the entirety of the requested write completes.
1045    ///
1046    /// Returns the number of bytes that were written to the socket.
1047    fn write(
1048        &mut self,
1049        locked: &mut Locked<FileOpsCore>,
1050        current_task: &CurrentTask,
1051        data: &mut dyn InputBuffer,
1052        address: Option<SocketAddress>,
1053        ancillary_data: &mut Vec<AncillaryData>,
1054        socket_type: SocketType,
1055    ) -> Result<usize, Errno> {
1056        if self.is_shutdown {
1057            return error!(EPIPE);
1058        }
1059        let filter = |mut message: Message| {
1060            let Some(bpf_program) = self.bpf_program.as_ref() else {
1061                return Some(message);
1062            };
1063
1064            // TODO(https://fxbug.dev/385015056): Fill in SkBuf.
1065            let mut sk_buf = SkBuf::default();
1066
1067            let mut context = EbpfRunContextImpl::<'_>::new(locked.cast_locked(), current_task);
1068            let s = bpf_program.run(&mut context, &mut sk_buf);
1069            if s == 0 {
1070                None
1071            } else {
1072                message.truncate(s as usize);
1073                Some(message)
1074            }
1075        };
1076        let bytes_written = if socket_type == SocketType::Stream {
1077            self.messages.write_stream_with_filter(data, address, ancillary_data, filter)?
1078        } else {
1079            self.messages.write_datagram_with_filter(data, address, ancillary_data, filter)?
1080        };
1081        Ok(bytes_written)
1082    }
1083}
1084
/// Resolves the Unix socket address `name` to the socket bound to it.
///
/// A name whose first byte is NUL is looked up in the current task's abstract socket namespace.
/// Any other non-empty name is treated as a filesystem path resolved relative to the current
/// working directory; the caller must have write access to the node found there.
///
/// # Errors
/// - `EINVAL` if `name` is empty.
/// - `ECONNREFUSED` if the final path component cannot be looked up (except for `EACCES`,
///   `EPERM` and `EINTR`, which are returned unchanged) or if no socket is bound to the node.
/// - Errors from resolving the parent directory and from the access check are propagated.
pub fn resolve_unix_socket_address<L>(
    locked: &mut Locked<L>,
    current_task: &CurrentTask,
    name: &FsStr,
) -> Result<SocketHandle, Errno>
where
    L: LockEqualOrBefore<FileOpsCore>,
{
    // Inspect the first byte without indexing: `name[0]` would panic on an empty name, and an
    // empty name identifies neither an abstract nor a filesystem socket.
    let Some(&first_byte) = name.first() else {
        return error!(EINVAL);
    };
    if first_byte == b'\0' {
        return current_task.live().abstract_socket_namespace.lookup(name);
    }

    let mut context = LookupContext::default();
    let (parent, basename) =
        current_task.lookup_parent_at(locked, &mut context, FdNumber::AT_FDCWD, name)?;
    let target =
        parent.lookup_child(locked, current_task, &mut context, basename).map_err(|errno| {
            // Permission and interruption errors are passed through; every other lookup failure
            // is reported as a refused connection.
            if matches!(errno.code, EACCES | EPERM | EINTR) {
                errno
            } else {
                errno!(ECONNREFUSED)
            }
        })?;
    target.check_access(
        locked,
        current_task,
        Access::WRITE,
        CheckAccessReason::InternalPermissionChecks,
    )?;
    target.entry.node.bound_socket().map(|s| s.clone()).ok_or_else(|| errno!(ECONNREFUSED))
}
1116
/// Packet buffer representation used for eBPF filters.
///
/// Currently this only wraps a `__sk_buff`. `UnixSocketInner::write` passes a
/// default-initialized value to the filter (see https://fxbug.dev/385015056).
#[repr(C)]
#[derive(Default)]
struct SkBuf {
    // The UAPI `__sk_buff` structure. NOTE(review): presumably this is the context layout
    // that socket filter programs are verified against; confirm against
    // `SOCKET_FILTER_SK_BUF_TYPE`.
    sk_buff: __sk_buff,
}
1123
impl Packet for &mut SkBuf {
    /// Packet data access is not implemented yet: every load returns `None`, regardless of
    /// `_offset` and `_width`.
    fn load(&self, _offset: i32, _width: DataWidth) -> Option<BpfValue> {
        // TODO(https://fxbug.dev/385015056): Implement packet access.
        None
    }
}
1130
impl<'a, 'b> SocketCookieContext<&'a mut SkBuf> for EbpfRunContextImpl<'b> {
    /// Stub for the `bpf_get_socket_cookie` helper: records the stub hit and always returns 0.
    fn get_socket_cookie(&self, _sk_buf: &'a mut SkBuf) -> u64 {
        track_stub!(TODO("https://fxbug.dev/385015056"), "bpf_get_socket_cookie");
        0
    }
}
1137
impl<'a, 'b> SocketUidContext<&'a mut SkBuf> for EbpfRunContextImpl<'b> {
    /// Stub for the `bpf_get_socket_uid` helper: records the stub hit and always returns `None`.
    fn get_socket_uid(&self, _sk_buf: &'a mut SkBuf) -> Option<uid_t> {
        track_stub!(TODO("https://fxbug.dev/385015056"), "bpf_get_socket_uid");
        None
    }
}
1144
impl<'a> PacketWithLoadBytes for &'a mut SkBuf {
    /// Stub for the `bpf_load_bytes_relative` helper: records the stub hit, leaves `_buf`
    /// untouched and always returns -1.
    fn load_bytes_relative(&self, _base: LoadBytesBase, _offset: usize, _buf: &mut [u8]) -> i64 {
        track_stub!(TODO("https://fxbug.dev/385015056"), "bpf_load_bytes_relative");
        -1
    }
}
1151
impl ProgramArgument for &'_ mut SkBuf {
    /// Returns the shared `SOCKET_FILTER_SK_BUF_TYPE` descriptor as the type of this argument.
    fn get_type() -> &'static Type {
        &*SOCKET_FILTER_SK_BUF_TYPE
    }
}
1157
/// Marker type that selects the eBPF program context used for Unix socket filters.
struct UnixSocketEbpfContext {}
impl BpfProgramContext for UnixSocketEbpfContext {
    // Programs run with an `EbpfRunContextImpl` and receive a mutable `SkBuf` as their packet.
    type RunContext<'a> = EbpfRunContextImpl<'a>;
    type Packet<'a> = &'a mut SkBuf;
    type Map = PinnedMap;
    // Classic BPF programs use the socket filter configuration.
    const CBPF_CONFIG: &'static CbpfConfig = &SOCKET_FILTER_CBPF_CONFIG;
}
1165
// NOTE(review): presumably ties `UnixSocketEbpfContext` to the helper set of
// `SocketFilterProgramContext`; confirm against the macro definition in `ebpf_api`.
ebpf_api::ebpf_program_context_type!(UnixSocketEbpfContext, SocketFilterProgramContext);

/// An eBPF program that runs with the `UnixSocketEbpfContext` program context.
type UnixSocketFilter = EbpfProgram<UnixSocketEbpfContext>;
1169
#[cfg(test)]
mod tests {
    use super::*;
    use crate::mm::MemoryAccessor;
    use crate::testing::{map_memory, spawn_kernel_and_run};
    use starnix_types::user_buffer::UserBuffer;

    /// Verifies that the value read back for `SO_SNDBUF` is twice the value that was set.
    #[::fuchsia::test]
    async fn test_socket_send_capacity() {
        spawn_kernel_and_run(async |locked, current_task| {
            // Create a listening stream socket bound to an abstract address.
            let socket = Socket::new(
                locked,
                &current_task,
                SocketDomain::Unix,
                SocketType::Stream,
                SocketProtocol::default(),
                /* kernel_private = */ false,
            )
            .expect("Failed to create socket.");
            socket
                .bind(locked, &current_task, SocketAddress::Unix(b"\0".into()))
                .expect("Failed to bind socket.");
            socket.listen(locked, &current_task, 10).expect("Failed to listen.");

            // Connect a second socket to it and accept the connection.
            let connecting_socket = Socket::new(
                locked,
                &current_task,
                SocketDomain::Unix,
                SocketType::Stream,
                SocketProtocol::default(),
                /* kernel_private = */ false,
            )
            .expect("Failed to create connecting socket.");
            connecting_socket
                .ops
                .connect(
                    locked.cast_locked(),
                    &connecting_socket,
                    &current_task,
                    SocketPeer::Handle(socket.clone()),
                )
                .expect("Failed to connect socket.");
            assert_eq!(Ok(FdEvents::POLLIN), socket.query_events(locked, &current_task));
            let server_socket = socket.accept(locked, &current_task).unwrap();

            // Stage the requested capacity in task memory and apply it via setsockopt.
            let opt_size = std::mem::size_of::<socklen_t>();
            let user_address =
                map_memory(locked, &current_task, UserAddress::default(), opt_size as u64);
            let send_capacity: socklen_t = 4 * 4096;
            current_task.write_memory(user_address, &send_capacity.to_ne_bytes()).unwrap();
            let user_buffer = UserBuffer { address: user_address, length: opt_size };
            server_socket
                .setsockopt(locked, &current_task, SOL_SOCKET, SO_SNDBUF, user_buffer.into())
                .unwrap();

            let opt_bytes =
                server_socket.getsockopt(locked, &current_task, SOL_SOCKET, SO_SNDBUF, 0).unwrap();
            let retrieved_capacity = socklen_t::from_ne_bytes(opt_bytes.try_into().unwrap());
            // Setting SO_SNDBUF actually sets it to double the size
            assert_eq!(2 * send_capacity, retrieved_capacity);
        })
        .await;
    }
}