Skip to main content

starnix_core/fs/fuchsia/
remote_unix_domain_socket.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::fs::fuchsia::{OpenFlags, new_remote_file};
6use crate::task::{
7    CurrentTask, EventHandler, SignalHandler, SignalHandlerInner, WaitCanceler, Waiter,
8};
9use crate::vfs::buffers::{InputBuffer, OutputBuffer};
10use crate::vfs::socket::{
11    SockOptValue, Socket, SocketAddress, SocketDomain, SocketHandle, SocketMessageFlags, SocketOps,
12    SocketPeer, SocketProtocol, SocketShutdownFlags, SocketType,
13};
14use crate::vfs::{AncillaryData, FileHandle, MessageReadInfo, UnixControlData};
15use fidl::endpoints::SynchronousProxy;
16use fidl_fuchsia_io as fio;
17use fidl_fuchsia_starnix_binder as fbinder;
18use linux_uapi::{SO_LINGER, SOL_SOCKET};
19use starnix_sync::{FileOpsCore, Locked};
20use starnix_uapi::auth::Credentials;
21use starnix_uapi::errors::Errno;
22use starnix_uapi::vfs::FdEvents;
23use starnix_uapi::{errno, error, from_status_like_fdio, uapi, ucred};
24use std::sync::Arc;
25use zerocopy::IntoBytes;
26static READABLE_SIGNAL: zx::Signals =
27    zx::Signals::from_bits_retain(fio::FileSignal::READABLE.bits());
28static WRITABLE_SIGNAL: zx::Signals =
29    zx::Signals::from_bits_retain(fio::FileSignal::WRITABLE.bits());
30
31pub struct RemoteUnixDomainSocket {
32    client: fbinder::UnixDomainSocketSynchronousProxy,
33    event: zx::EventPair,
34    remote_creds: Arc<Credentials>,
35}
36
37impl RemoteUnixDomainSocket {
38    pub fn new(channel: zx::Channel, remote_creds: Arc<Credentials>) -> Result<Self, Errno> {
39        let client = fbinder::UnixDomainSocketSynchronousProxy::from_channel(channel);
40        let response = client
41            .get_event(
42                &fbinder::UnixDomainSocketGetEventRequest::default(),
43                zx::MonotonicInstant::INFINITE,
44            )
45            .map_err(|_| errno!(ECONNREFUSED))?
46            .map_err(|e: i32| from_status_like_fdio!(zx::Status::from_raw(e)))?;
47        let event = response.event.ok_or_else(|| errno!(ECONNREFUSED))?;
48        Ok(Self { client, event, remote_creds })
49    }
50
51    fn get_signals_from_events(events: FdEvents) -> zx::Signals {
52        let mut signals = zx::Signals::NONE;
53        if events.contains(FdEvents::POLLIN) {
54            signals |= READABLE_SIGNAL;
55        }
56        if events.contains(FdEvents::POLLOUT) {
57            signals |= WRITABLE_SIGNAL;
58        }
59        signals
60    }
61
62    fn get_events_from_signals(signals: zx::Signals) -> FdEvents {
63        let mut events = FdEvents::empty();
64        if signals.contains(READABLE_SIGNAL) {
65            events |= FdEvents::POLLIN;
66        }
67        if signals.contains(WRITABLE_SIGNAL) {
68            events |= FdEvents::POLLOUT;
69        }
70        events
71    }
72
73    /// Perform an action using the credentials of the remote task.
74    fn with_remote_creds<F, R>(&self, current_task: &CurrentTask, f: F) -> Result<R, Errno>
75    where
76        F: FnOnce() -> Result<R, Errno>,
77    {
78        current_task.override_creds(self.remote_creds.clone(), f)
79    }
80}
81
82impl SocketOps for RemoteUnixDomainSocket {
83    fn get_socket_info(&self) -> Result<(SocketDomain, SocketType, SocketProtocol), Errno> {
84        Ok((SocketDomain::Unix, SocketType::Datagram, SocketProtocol::from_raw(0)))
85    }
86
87    fn connect(
88        &self,
89        _locked: &mut Locked<FileOpsCore>,
90        _socket: &SocketHandle,
91        _current_task: &CurrentTask,
92        _peer: SocketPeer,
93    ) -> Result<(), Errno> {
94        error!(EISCONN)
95    }
96
97    fn listen(
98        &self,
99        _locked: &mut Locked<FileOpsCore>,
100        _socket: &Socket,
101        _backlog: i32,
102        _credentials: ucred,
103    ) -> Result<(), Errno> {
104        error!(EOPNOTSUPP)
105    }
106
107    fn accept(
108        &self,
109        _locked: &mut Locked<FileOpsCore>,
110        _socket: &Socket,
111        _current_task: &CurrentTask,
112    ) -> Result<SocketHandle, Errno> {
113        error!(EOPNOTSUPP)
114    }
115
116    fn bind(
117        &self,
118        _locked: &mut Locked<FileOpsCore>,
119        _socket: &Socket,
120        _current_task: &CurrentTask,
121        _socket_address: SocketAddress,
122    ) -> Result<(), Errno> {
123        error!(EOPNOTSUPP)
124    }
125
126    fn read(
127        &self,
128        locked: &mut Locked<FileOpsCore>,
129        _socket: &Socket,
130        current_task: &CurrentTask,
131        data: &mut dyn OutputBuffer,
132        flags: SocketMessageFlags,
133    ) -> Result<MessageReadInfo, Errno> {
134        if self.client.is_closed().map_err(|_| errno!(ECONNREFUSED))? {
135            return error!(ECONNREFUSED);
136        }
137        let mut read_flags = fbinder::ReadFlags::empty();
138        if flags.contains(SocketMessageFlags::PEEK) {
139            read_flags |= fbinder::ReadFlags::PEEK;
140        }
141
142        let response = self
143            .client
144            .read(
145                &fbinder::UnixDomainSocketReadRequest {
146                    count: Some(data.available() as u64),
147                    flags: Some(read_flags),
148                    ..Default::default()
149                },
150                zx::MonotonicInstant::INFINITE,
151            )
152            .map_err(|_| errno!(ECONNREFUSED))?
153            .map_err(|e: i32| {
154                let status = zx::Status::from_raw(e);
155                if status == zx::Status::PEER_CLOSED {
156                    errno!(ECONNRESET)
157                } else {
158                    from_status_like_fdio!(status)
159                }
160            })?;
161
162        let written =
163            if let Some(received_data) = response.data { data.write(&received_data)? } else { 0 };
164
165        let mut file_handles: Vec<FileHandle> = vec![];
166        if let Some(handles) = response.handles {
167            // Use the remote task's credentials to create the remote_file object. This ensures
168            // that the SID associated to the fd is set to the correct value.
169            self.with_remote_creds(current_task, || {
170                for handle in handles {
171                    file_handles.push(new_remote_file(
172                        locked,
173                        current_task,
174                        handle,
175                        OpenFlags::RDWR,
176                    )?);
177                }
178                Ok(())
179            })?;
180        }
181        let ancillary_data = vec![AncillaryData::Unix(UnixControlData::Rights(file_handles))];
182
183        let message_length = response.data_original_length.unwrap_or(written as u64) as usize;
184
185        Ok(MessageReadInfo { bytes_read: written, message_length, address: None, ancillary_data })
186    }
187
188    fn write(
189        &self,
190        _locked: &mut Locked<FileOpsCore>,
191        _socket: &Socket,
192        current_task: &CurrentTask,
193        data: &mut dyn InputBuffer,
194        _dest_address: &mut Option<SocketAddress>,
195        ancillary_data: &mut Vec<AncillaryData>,
196    ) -> Result<usize, Errno> {
197        if self.client.is_closed().map_err(|_| errno!(ECONNREFUSED))? {
198            return error!(ECONNREFUSED);
199        }
200
201        let mut handles: Vec<zx::NullableHandle> = vec![];
202        for data in ancillary_data {
203            match data {
204                AncillaryData::Unix(UnixControlData::Rights(file_handles)) => {
205                    // Access the served files with the credentials of the remote end.
206                    self.with_remote_creds(current_task, || {
207                        for file_handle in file_handles {
208                            let Some(handle) = file_handle.to_handle(current_task)? else {
209                                return error!(EINVAL);
210                            };
211                            handles.push(handle);
212                        }
213                        Ok(())
214                    })?;
215                }
216                _ => return error!(EINVAL),
217            }
218        }
219
220        let bytes = data.read_all()?;
221
222        let response = self
223            .client
224            .write(
225                fbinder::UnixDomainSocketWriteRequest {
226                    data: Some(bytes),
227                    handles: Some(handles),
228                    ..Default::default()
229                },
230                zx::MonotonicInstant::INFINITE,
231            )
232            .map_err(|_| errno!(ECONNREFUSED))?
233            .map_err(|e: i32| from_status_like_fdio!(zx::Status::from_raw(e)))?;
234
235        let written = response.actual_count.unwrap_or(0);
236        Ok(written as usize)
237    }
238
239    fn wait_async(
240        &self,
241        _locked: &mut Locked<FileOpsCore>,
242        _socket: &Socket,
243        _current_task: &CurrentTask,
244        waiter: &Waiter,
245        events: FdEvents,
246        handler: EventHandler,
247    ) -> WaitCanceler {
248        let signal_handler = SignalHandler {
249            inner: SignalHandlerInner::ZxHandle(Self::get_events_from_signals),
250            event_handler: handler,
251            err_code: None,
252        };
253        let canceler = waiter
254            .wake_on_zircon_signals(
255                &self.event,
256                Self::get_signals_from_events(events),
257                signal_handler,
258            )
259            .unwrap();
260        WaitCanceler::new_port(canceler)
261    }
262
263    fn query_events(
264        &self,
265        _locked: &mut Locked<FileOpsCore>,
266        _socket: &Socket,
267        _current_task: &CurrentTask,
268    ) -> Result<FdEvents, Errno> {
269        let signals = self
270            .event
271            .as_handle_ref()
272            .wait_one(zx::Signals::NONE, zx::MonotonicInstant::INFINITE_PAST)
273            .map_err(|e| from_status_like_fdio!(e))?;
274        Ok(Self::get_events_from_signals(signals))
275    }
276
277    fn shutdown(
278        &self,
279        _locked: &mut Locked<FileOpsCore>,
280        _socket: &Socket,
281        _how: SocketShutdownFlags,
282    ) -> Result<(), Errno> {
283        Ok(())
284    }
285
286    fn close(
287        &self,
288        _locked: &mut Locked<FileOpsCore>,
289        _current_task: &CurrentTask,
290        _socket: &Socket,
291    ) {
292        let _ = self.client.close(zx::MonotonicInstant::INFINITE);
293    }
294
295    fn getsockname(
296        &self,
297        _locked: &mut Locked<FileOpsCore>,
298        _socket: &Socket,
299    ) -> Result<SocketAddress, Errno> {
300        Ok(SocketAddress::default_for_domain(SocketDomain::Unix))
301    }
302
303    fn getpeername(
304        &self,
305        locked: &mut Locked<FileOpsCore>,
306        socket: &Socket,
307    ) -> Result<SocketAddress, Errno> {
308        self.getsockname(locked, socket)
309    }
310
311    fn setsockopt(
312        &self,
313        _locked: &mut Locked<FileOpsCore>,
314        _socket: &Socket,
315        _current_task: &CurrentTask,
316        _level: u32,
317        _optname: u32,
318        _optval: SockOptValue,
319    ) -> Result<(), Errno> {
320        error!(EOPNOTSUPP)
321    }
322
323    fn getsockopt(
324        &self,
325        _locked: &mut Locked<FileOpsCore>,
326        _socket: &Socket,
327        _current_task: &CurrentTask,
328        level: u32,
329        optname: u32,
330        _optlen: u32,
331    ) -> Result<Vec<u8>, Errno> {
332        if level != SOL_SOCKET {
333            return error!(EINVAL);
334        }
335        let data = match optname {
336            SO_LINGER => uapi::linger::default().as_bytes().to_vec(),
337            _ => return error!(EINVAL),
338        };
339
340        Ok(data)
341    }
342
343    fn to_handle(
344        &self,
345        _socket: &Socket,
346        _current_task: &CurrentTask,
347    ) -> Result<Option<zx::NullableHandle>, Errno> {
348        let (proxy, server) = zx::Channel::create();
349        self.client.clone(server.into()).map_err(|_| errno!(ECONNREFUSED))?;
350        Ok(Some(zx::NullableHandle::from(proxy).into()))
351    }
352}
353
354#[cfg(test)]
355mod tests {
356    use super::*;
357    use crate::testing::spawn_kernel_and_run;
358    use crate::vfs::socket::SocketFile;
359    use crate::vfs::{VecInputBuffer, VecOutputBuffer};
360    use fidl::endpoints::{DiscoverableProtocolMarker as _, RequestStream};
361    use fidl_fuchsia_unknown as funknown;
362    use fuchsia_async as fasync;
363    use futures::StreamExt;
364    use starnix_sync::Mutex;
365    use std::sync::Arc;
366
367    #[derive(Debug)]
368    struct Data {
369        bytes: Vec<u8>,
370        handles: Vec<zx::NullableHandle>,
371    }
372
373    impl Data {
374        fn try_clone(&mut self) -> Result<Self, zx::Status> {
375            let mut new_handles = vec![];
376            for handle in std::mem::take(&mut self.handles) {
377                let (new_handle, old_handle) = {
378                    let handle_type = handle.basic_info()?.object_type;
379                    match handle_type {
380                        zx::ObjectType::CHANNEL => {
381                            let channel = zx::Channel::from(handle);
382                            let client = funknown::CloneableSynchronousProxy::new(channel);
383                            let (proxy, server) = zx::Channel::create();
384                            let new_handle = client
385                                .clone(server.into())
386                                .map(|_| proxy.into())
387                                .map_err(|_| zx::Status::NOT_SUPPORTED);
388                            (new_handle, client.into_channel().into_handle())
389                        }
390                        _ => {
391                            let new_handle = handle.duplicate_handle(zx::Rights::SAME_RIGHTS);
392                            (new_handle, handle)
393                        }
394                    }
395                };
396                self.handles.push(old_handle);
397                new_handles.push(new_handle);
398            }
399            let new_handles =
400                new_handles.into_iter().collect::<Result<Vec<zx::NullableHandle>, zx::Status>>()?;
401            Ok(Self { bytes: self.bytes.clone(), handles: new_handles })
402        }
403    }
404
405    #[derive(Debug)]
406    struct UnixDomainSocketImplState {
407        _local_event: zx::EventPair,
408        remote_event: zx::EventPair,
409        buffer: Vec<Data>,
410    }
411
412    impl Default for UnixDomainSocketImplState {
413        fn default() -> Self {
414            let (_local_event, remote_event) = zx::EventPair::create();
415            Self { _local_event, remote_event, buffer: vec![] }
416        }
417    }
418
419    #[derive(Debug, Default)]
420    struct UnixDomainSocketImpl {
421        state: Mutex<UnixDomainSocketImplState>,
422        close_on_read: bool,
423    }
424
425    impl UnixDomainSocketImpl {
426        fn read(
427            &self,
428            payload: fbinder::UnixDomainSocketReadRequest,
429        ) -> Result<fbinder::UnixDomainSocketReadResponse, zx::Status> {
430            if self.close_on_read {
431                return Err(zx::Status::PEER_CLOSED);
432            }
433            let Some(count) = payload.count else {
434                return Err(zx::Status::INVALID_ARGS);
435            };
436            let Some(flags) = payload.flags else {
437                return Err(zx::Status::INVALID_ARGS);
438            };
439            let mut state = self.state.lock();
440            if state.buffer.is_empty() {
441                return Err(zx::Status::SHOULD_WAIT);
442            }
443            let mut data = if flags.contains(fbinder::ReadFlags::PEEK) {
444                state.buffer[0].try_clone()?
445            } else {
446                state.buffer.remove(0)
447            };
448
449            if state.buffer.is_empty() {
450                state.remote_event.as_handle_ref().signal(READABLE_SIGNAL, WRITABLE_SIGNAL)?;
451            }
452
453            let actual_count = data.bytes.len() as u64;
454            data.bytes.truncate(count as usize);
455
456            Ok(fbinder::UnixDomainSocketReadResponse {
457                data: Some(data.bytes),
458                data_original_length: Some(actual_count),
459                handles: Some(data.handles),
460                ..Default::default()
461            })
462        }
463
464        fn write(
465            &self,
466            payload: fbinder::UnixDomainSocketWriteRequest,
467        ) -> Result<fbinder::UnixDomainSocketWriteResponse, zx::Status> {
468            let Some(bytes) = payload.data else {
469                return Err(zx::Status::INVALID_ARGS);
470            };
471            let actual_count = bytes.len() as u64;
472            let Some(handles) = payload.handles else {
473                return Err(zx::Status::INVALID_ARGS);
474            };
475            let mut state = self.state.lock();
476            state.buffer.push(Data { bytes, handles });
477            state
478                .remote_event
479                .as_handle_ref()
480                .signal(zx::Signals::NONE, READABLE_SIGNAL | WRITABLE_SIGNAL)?;
481            Ok(fbinder::UnixDomainSocketWriteResponse {
482                actual_count: Some(actual_count),
483                ..Default::default()
484            })
485        }
486
487        async fn serve(self: &Arc<Self>, channel: zx::Channel) {
488            let stream = fbinder::UnixDomainSocketRequestStream::from_channel(
489                fasync::Channel::from_channel(channel),
490            );
491            stream
492                .for_each_concurrent(None, |message| async {
493                    match message {
494                        Ok(fbinder::UnixDomainSocketRequest::GetEvent { responder, .. }) => {
495                            let state = self.state.lock();
496                            let event = state
497                                .remote_event
498                                .duplicate_handle(zx::Rights::SAME_RIGHTS)
499                                .expect("duplicate event");
500                            responder
501                                .send(Ok(fbinder::UnixDomainSocketGetEventResponse {
502                                    event: Some(event),
503                                    ..Default::default()
504                                }))
505                                .expect("respond");
506                        }
507                        Ok(fbinder::UnixDomainSocketRequest::Read {
508                            payload, responder, ..
509                        }) => {
510                            assert!(
511                                responder
512                                    .send(self.read(payload).map_err(|e| e.into_raw()))
513                                    .is_ok()
514                            );
515                        }
516                        Ok(fbinder::UnixDomainSocketRequest::Write {
517                            payload, responder, ..
518                        }) => {
519                            assert!(
520                                responder
521                                    .send(self.write(payload).as_ref().map_err(|e| e.into_raw()))
522                                    .is_ok()
523                            );
524                        }
525                        Ok(fbinder::UnixDomainSocketRequest::Query { responder }) => {
526                            assert!(
527                                responder
528                                    .send(fbinder::UnixDomainSocketMarker::PROTOCOL_NAME.as_bytes())
529                                    .is_ok()
530                            );
531                        }
532                        Ok(fbinder::UnixDomainSocketRequest::Clone { request, .. }) => {
533                            self.serve(request.into()).await;
534                        }
535                        Ok(fbinder::UnixDomainSocketRequest::Close { responder }) => {
536                            assert!(responder.send(Ok(())).is_ok());
537                        }
538                        _ => {
539                            return;
540                        }
541                    }
542                })
543                .await;
544        }
545    }
546
547    #[::fuchsia::test]
548    async fn test_remote_uds() {
549        let (client, server) = zx::Channel::create();
550        let handle = std::thread::spawn(|| {
551            let mut executor = fasync::LocalExecutor::default();
552            executor.run_singlethreaded(async move {
553                let uds_impl = UnixDomainSocketImpl::default();
554                Arc::new(uds_impl).serve(server).await;
555            });
556        });
557        spawn_kernel_and_run(async move |locked, current_task| {
558            let original_file =
559                new_remote_file(locked, current_task, client.into(), OpenFlags::RDWR)
560                    .expect("new_remote_file");
561            assert!(original_file.node().is_sock());
562            let file = new_remote_file(
563                locked,
564                current_task,
565                original_file.to_handle(current_task).expect("to_handle").expect("has_handle"),
566                OpenFlags::RDWR,
567            )
568            .expect("new_remote_file");
569            let ancillary_data =
570                vec![AncillaryData::Unix(UnixControlData::Rights(vec![original_file]))];
571            let socket_ops = file.downcast_file::<SocketFile>().unwrap();
572            let data = "HelloWorld";
573            let mut input_buffer = VecInputBuffer::new(data.as_bytes());
574            assert_eq!(
575                socket_ops.sendmsg(
576                    locked,
577                    current_task,
578                    &file,
579                    &mut input_buffer,
580                    None,
581                    ancillary_data,
582                    SocketMessageFlags::empty()
583                ),
584                Ok(data.len())
585            );
586
587            let flags = SocketMessageFlags::CTRUNC
588                | SocketMessageFlags::TRUNC
589                | SocketMessageFlags::NOSIGNAL
590                | SocketMessageFlags::CMSG_CLOEXEC;
591
592            let mut buffer = VecOutputBuffer::new(1024);
593            let info = socket_ops
594                .recvmsg(
595                    locked,
596                    &current_task,
597                    &file,
598                    &mut buffer,
599                    flags | SocketMessageFlags::PEEK,
600                    None,
601                )
602                .expect("recvmsg");
603
604            assert_eq!(info.ancillary_data.len(), 1);
605            assert_eq!(info.message_length, data.len());
606
607            let mut buffer = VecOutputBuffer::new(1024);
608            let info = socket_ops
609                .recvmsg(locked, &current_task, &file, &mut buffer, flags, None)
610                .expect("recvmsg");
611
612            assert_eq!(info.ancillary_data.len(), 1);
613            assert_eq!(info.message_length, data.len());
614
615            let mut buffer = VecOutputBuffer::new(1024);
616            let err = socket_ops
617                .recvmsg(
618                    locked,
619                    &current_task,
620                    &file,
621                    &mut buffer,
622                    flags | SocketMessageFlags::DONTWAIT,
623                    None,
624                )
625                .unwrap_err();
626            assert_eq!(err, errno!(EAGAIN));
627        })
628        .await;
629        handle.join().expect("join");
630    }
631
632    #[::fuchsia::test]
633    async fn test_remote_uds_peer_closed() {
634        let (client, server) = zx::Channel::create();
635        let handle = std::thread::spawn(move || {
636            let mut executor = fasync::LocalExecutor::default();
637            executor.run_singlethreaded(async move {
638                let uds_impl = UnixDomainSocketImpl { close_on_read: true, ..Default::default() };
639                Arc::new(uds_impl).serve(server).await;
640            });
641        });
642        spawn_kernel_and_run(async move |locked, current_task| {
643            let file = new_remote_file(locked, current_task, client.into(), OpenFlags::RDWR)
644                .expect("new_remote_file");
645            let socket_ops = file.downcast_file::<SocketFile>().unwrap();
646
647            let mut buffer = VecOutputBuffer::new(1024);
648            let err = socket_ops
649                .recvmsg(
650                    locked,
651                    &current_task,
652                    &file,
653                    &mut buffer,
654                    SocketMessageFlags::empty(),
655                    None,
656                )
657                .unwrap_err();
658            assert_eq!(err, errno!(ECONNRESET));
659        })
660        .await;
661        handle.join().expect("join");
662    }
663}