1use crate::fs::fuchsia::{OpenFlags, new_remote_file};
6use crate::task::{
7 CurrentTask, EventHandler, SignalHandler, SignalHandlerInner, WaitCanceler, Waiter,
8};
9use crate::vfs::buffers::{InputBuffer, OutputBuffer};
10use crate::vfs::socket::{
11 SockOptValue, Socket, SocketAddress, SocketDomain, SocketHandle, SocketMessageFlags, SocketOps,
12 SocketPeer, SocketProtocol, SocketShutdownFlags, SocketType,
13};
14use crate::vfs::{AncillaryData, FileHandle, MessageReadInfo, UnixControlData};
15use fidl::endpoints::SynchronousProxy;
16use fidl_fuchsia_io as fio;
17use fidl_fuchsia_starnix_binder as fbinder;
18use linux_uapi::{SO_LINGER, SOL_SOCKET};
19use starnix_sync::{FileOpsCore, Locked};
20use starnix_uapi::auth::Credentials;
21use starnix_uapi::errors::Errno;
22use starnix_uapi::vfs::FdEvents;
23use starnix_uapi::{errno, error, from_status_like_fdio, uapi, ucred};
24use std::sync::Arc;
25use zerocopy::IntoBytes;
/// Zircon signal carrying the same bits as `fio::FileSignal::READABLE`.
///
/// `RemoteUnixDomainSocket` maps this signal to and from `FdEvents::POLLIN`.
static READABLE_SIGNAL: zx::Signals =
    zx::Signals::from_bits_retain(fio::FileSignal::READABLE.bits());
/// Zircon signal carrying the same bits as `fio::FileSignal::WRITABLE`.
///
/// `RemoteUnixDomainSocket` maps this signal to and from `FdEvents::POLLOUT`.
static WRITABLE_SIGNAL: zx::Signals =
    zx::Signals::from_bits_retain(fio::FileSignal::WRITABLE.bits());
30
/// Socket implementation backed by a remote `fbinder::UnixDomainSocket`
/// connection.
///
/// All data transfer goes through the synchronous FIDL proxy; readiness is
/// observed through the event pair handed out by the remote end.
pub struct RemoteUnixDomainSocket {
    /// Synchronous proxy used for every request sent to the remote end.
    client: fbinder::UnixDomainSocketSynchronousProxy,
    /// Event pair returned by `GetEvent`; its signals are translated into
    /// `FdEvents` when polling.
    event: zx::EventPair,
    /// Credentials installed on the current task while files are converted to
    /// or from handles exchanged with the remote end.
    remote_creds: Arc<Credentials>,
}

impl RemoteUnixDomainSocket {
    /// Wraps `channel` in a synchronous proxy and fetches the readiness event
    /// from the remote end.
    ///
    /// # Errors
    ///
    /// * `ECONNREFUSED` if the `GetEvent` call fails at the transport level or
    ///   the response does not carry an event.
    /// * The errno produced by `from_status_like_fdio!` if the remote end
    ///   answers with a raw status.
    pub fn new(channel: zx::Channel, remote_creds: Arc<Credentials>) -> Result<Self, Errno> {
        let client = fbinder::UnixDomainSocketSynchronousProxy::from_channel(channel);
        let request = fbinder::UnixDomainSocketGetEventRequest::default();
        let response = client
            .get_event(&request, zx::MonotonicInstant::INFINITE)
            .map_err(|_| errno!(ECONNREFUSED))?
            .map_err(|e: i32| from_status_like_fdio!(zx::Status::from_raw(e)))?;
        let Some(event) = response.event else {
            return error!(ECONNREFUSED);
        };
        Ok(Self { client, event, remote_creds })
    }

    /// Translates the `POLLIN` / `POLLOUT` bits of `events` into the matching
    /// Zircon signals. Every other event bit is ignored.
    fn get_signals_from_events(events: FdEvents) -> zx::Signals {
        let readable =
            if events.contains(FdEvents::POLLIN) { READABLE_SIGNAL } else { zx::Signals::NONE };
        let writable =
            if events.contains(FdEvents::POLLOUT) { WRITABLE_SIGNAL } else { zx::Signals::NONE };
        readable | writable
    }

    /// Translates the readable / writable Zircon signals into `POLLIN` /
    /// `POLLOUT`. Every other signal bit is ignored.
    fn get_events_from_signals(signals: zx::Signals) -> FdEvents {
        let pollin =
            if signals.contains(READABLE_SIGNAL) { FdEvents::POLLIN } else { FdEvents::empty() };
        let pollout =
            if signals.contains(WRITABLE_SIGNAL) { FdEvents::POLLOUT } else { FdEvents::empty() };
        pollin | pollout
    }

    /// Runs `f` through `CurrentTask::override_creds` with this socket's
    /// remote credentials and returns whatever that call returns.
    fn with_remote_creds<F: FnOnce() -> Result<R, Errno>, R>(
        &self,
        current_task: &CurrentTask,
        f: F,
    ) -> Result<R, Errno> {
        current_task.override_creds(Arc::clone(&self.remote_creds), f)
    }
}
81
82impl SocketOps for RemoteUnixDomainSocket {
83 fn get_socket_info(&self) -> Result<(SocketDomain, SocketType, SocketProtocol), Errno> {
84 Ok((SocketDomain::Unix, SocketType::Datagram, SocketProtocol::from_raw(0)))
85 }
86
87 fn connect(
88 &self,
89 _locked: &mut Locked<FileOpsCore>,
90 _socket: &SocketHandle,
91 _current_task: &CurrentTask,
92 _peer: SocketPeer,
93 ) -> Result<(), Errno> {
94 error!(EISCONN)
95 }
96
97 fn listen(
98 &self,
99 _locked: &mut Locked<FileOpsCore>,
100 _socket: &Socket,
101 _backlog: i32,
102 _credentials: ucred,
103 ) -> Result<(), Errno> {
104 error!(EOPNOTSUPP)
105 }
106
107 fn accept(
108 &self,
109 _locked: &mut Locked<FileOpsCore>,
110 _socket: &Socket,
111 _current_task: &CurrentTask,
112 ) -> Result<SocketHandle, Errno> {
113 error!(EOPNOTSUPP)
114 }
115
116 fn bind(
117 &self,
118 _locked: &mut Locked<FileOpsCore>,
119 _socket: &Socket,
120 _current_task: &CurrentTask,
121 _socket_address: SocketAddress,
122 ) -> Result<(), Errno> {
123 error!(EOPNOTSUPP)
124 }
125
126 fn read(
127 &self,
128 locked: &mut Locked<FileOpsCore>,
129 _socket: &Socket,
130 current_task: &CurrentTask,
131 data: &mut dyn OutputBuffer,
132 flags: SocketMessageFlags,
133 ) -> Result<MessageReadInfo, Errno> {
134 if self.client.is_closed().map_err(|_| errno!(ECONNREFUSED))? {
135 return error!(ECONNREFUSED);
136 }
137 let mut read_flags = fbinder::ReadFlags::empty();
138 if flags.contains(SocketMessageFlags::PEEK) {
139 read_flags |= fbinder::ReadFlags::PEEK;
140 }
141
142 let response = self
143 .client
144 .read(
145 &fbinder::UnixDomainSocketReadRequest {
146 count: Some(data.available() as u64),
147 flags: Some(read_flags),
148 ..Default::default()
149 },
150 zx::MonotonicInstant::INFINITE,
151 )
152 .map_err(|_| errno!(ECONNREFUSED))?
153 .map_err(|e: i32| {
154 let status = zx::Status::from_raw(e);
155 if status == zx::Status::PEER_CLOSED {
156 errno!(ECONNRESET)
157 } else {
158 from_status_like_fdio!(status)
159 }
160 })?;
161
162 let written =
163 if let Some(received_data) = response.data { data.write(&received_data)? } else { 0 };
164
165 let mut file_handles: Vec<FileHandle> = vec![];
166 if let Some(handles) = response.handles {
167 self.with_remote_creds(current_task, || {
170 for handle in handles {
171 file_handles.push(new_remote_file(
172 locked,
173 current_task,
174 handle,
175 OpenFlags::RDWR,
176 )?);
177 }
178 Ok(())
179 })?;
180 }
181 let ancillary_data = vec![AncillaryData::Unix(UnixControlData::Rights(file_handles))];
182
183 let message_length = response.data_original_length.unwrap_or(written as u64) as usize;
184
185 Ok(MessageReadInfo { bytes_read: written, message_length, address: None, ancillary_data })
186 }
187
188 fn write(
189 &self,
190 _locked: &mut Locked<FileOpsCore>,
191 _socket: &Socket,
192 current_task: &CurrentTask,
193 data: &mut dyn InputBuffer,
194 _dest_address: &mut Option<SocketAddress>,
195 ancillary_data: &mut Vec<AncillaryData>,
196 ) -> Result<usize, Errno> {
197 if self.client.is_closed().map_err(|_| errno!(ECONNREFUSED))? {
198 return error!(ECONNREFUSED);
199 }
200
201 let mut handles: Vec<zx::NullableHandle> = vec![];
202 for data in ancillary_data {
203 match data {
204 AncillaryData::Unix(UnixControlData::Rights(file_handles)) => {
205 self.with_remote_creds(current_task, || {
207 for file_handle in file_handles {
208 let Some(handle) = file_handle.to_handle(current_task)? else {
209 return error!(EINVAL);
210 };
211 handles.push(handle);
212 }
213 Ok(())
214 })?;
215 }
216 _ => return error!(EINVAL),
217 }
218 }
219
220 let bytes = data.read_all()?;
221
222 let response = self
223 .client
224 .write(
225 fbinder::UnixDomainSocketWriteRequest {
226 data: Some(bytes),
227 handles: Some(handles),
228 ..Default::default()
229 },
230 zx::MonotonicInstant::INFINITE,
231 )
232 .map_err(|_| errno!(ECONNREFUSED))?
233 .map_err(|e: i32| from_status_like_fdio!(zx::Status::from_raw(e)))?;
234
235 let written = response.actual_count.unwrap_or(0);
236 Ok(written as usize)
237 }
238
239 fn wait_async(
240 &self,
241 _locked: &mut Locked<FileOpsCore>,
242 _socket: &Socket,
243 _current_task: &CurrentTask,
244 waiter: &Waiter,
245 events: FdEvents,
246 handler: EventHandler,
247 ) -> WaitCanceler {
248 let signal_handler = SignalHandler {
249 inner: SignalHandlerInner::ZxHandle(Self::get_events_from_signals),
250 event_handler: handler,
251 err_code: None,
252 };
253 let canceler = waiter
254 .wake_on_zircon_signals(
255 &self.event,
256 Self::get_signals_from_events(events),
257 signal_handler,
258 )
259 .unwrap();
260 WaitCanceler::new_port(canceler)
261 }
262
263 fn query_events(
264 &self,
265 _locked: &mut Locked<FileOpsCore>,
266 _socket: &Socket,
267 _current_task: &CurrentTask,
268 ) -> Result<FdEvents, Errno> {
269 let signals = self
270 .event
271 .as_handle_ref()
272 .wait_one(zx::Signals::NONE, zx::MonotonicInstant::INFINITE_PAST)
273 .map_err(|e| from_status_like_fdio!(e))?;
274 Ok(Self::get_events_from_signals(signals))
275 }
276
    /// Reports success without doing anything.
    ///
    /// NOTE(review): `_how` is ignored and nothing is sent to the remote end,
    /// so neither direction is actually shut down — confirm this is intended.
    fn shutdown(
        &self,
        _locked: &mut Locked<FileOpsCore>,
        _socket: &Socket,
        _how: SocketShutdownFlags,
    ) -> Result<(), Errno> {
        Ok(())
    }
285
286 fn close(
287 &self,
288 _locked: &mut Locked<FileOpsCore>,
289 _current_task: &CurrentTask,
290 _socket: &Socket,
291 ) {
292 let _ = self.client.close(zx::MonotonicInstant::INFINITE);
293 }
294
295 fn getsockname(
296 &self,
297 _locked: &mut Locked<FileOpsCore>,
298 _socket: &Socket,
299 ) -> Result<SocketAddress, Errno> {
300 Ok(SocketAddress::default_for_domain(SocketDomain::Unix))
301 }
302
303 fn getpeername(
304 &self,
305 locked: &mut Locked<FileOpsCore>,
306 socket: &Socket,
307 ) -> Result<SocketAddress, Errno> {
308 self.getsockname(locked, socket)
309 }
310
311 fn setsockopt(
312 &self,
313 _locked: &mut Locked<FileOpsCore>,
314 _socket: &Socket,
315 _current_task: &CurrentTask,
316 _level: u32,
317 _optname: u32,
318 _optval: SockOptValue,
319 ) -> Result<(), Errno> {
320 error!(EOPNOTSUPP)
321 }
322
323 fn getsockopt(
324 &self,
325 _locked: &mut Locked<FileOpsCore>,
326 _socket: &Socket,
327 _current_task: &CurrentTask,
328 level: u32,
329 optname: u32,
330 _optlen: u32,
331 ) -> Result<Vec<u8>, Errno> {
332 if level != SOL_SOCKET {
333 return error!(EINVAL);
334 }
335 let data = match optname {
336 SO_LINGER => uapi::linger::default().as_bytes().to_vec(),
337 _ => return error!(EINVAL),
338 };
339
340 Ok(data)
341 }
342
343 fn to_handle(
344 &self,
345 _socket: &Socket,
346 _current_task: &CurrentTask,
347 ) -> Result<Option<zx::NullableHandle>, Errno> {
348 let (proxy, server) = zx::Channel::create();
349 self.client.clone(server.into()).map_err(|_| errno!(ECONNREFUSED))?;
350 Ok(Some(zx::NullableHandle::from(proxy).into()))
351 }
352}
353
354#[cfg(test)]
355mod tests {
356 use super::*;
357 use crate::testing::spawn_kernel_and_run;
358 use crate::vfs::socket::SocketFile;
359 use crate::vfs::{VecInputBuffer, VecOutputBuffer};
360 use fidl::endpoints::{DiscoverableProtocolMarker as _, RequestStream};
361 use fidl_fuchsia_unknown as funknown;
362 use fuchsia_async as fasync;
363 use futures::StreamExt;
364 use starnix_sync::Mutex;
365 use std::sync::Arc;
366
367 #[derive(Debug)]
368 struct Data {
369 bytes: Vec<u8>,
370 handles: Vec<zx::NullableHandle>,
371 }
372
373 impl Data {
374 fn try_clone(&mut self) -> Result<Self, zx::Status> {
375 let mut new_handles = vec![];
376 for handle in std::mem::take(&mut self.handles) {
377 let (new_handle, old_handle) = {
378 let handle_type = handle.basic_info()?.object_type;
379 match handle_type {
380 zx::ObjectType::CHANNEL => {
381 let channel = zx::Channel::from(handle);
382 let client = funknown::CloneableSynchronousProxy::new(channel);
383 let (proxy, server) = zx::Channel::create();
384 let new_handle = client
385 .clone(server.into())
386 .map(|_| proxy.into())
387 .map_err(|_| zx::Status::NOT_SUPPORTED);
388 (new_handle, client.into_channel().into_handle())
389 }
390 _ => {
391 let new_handle = handle.duplicate_handle(zx::Rights::SAME_RIGHTS);
392 (new_handle, handle)
393 }
394 }
395 };
396 self.handles.push(old_handle);
397 new_handles.push(new_handle);
398 }
399 let new_handles =
400 new_handles.into_iter().collect::<Result<Vec<zx::NullableHandle>, zx::Status>>()?;
401 Ok(Self { bytes: self.bytes.clone(), handles: new_handles })
402 }
403 }
404
405 #[derive(Debug)]
406 struct UnixDomainSocketImplState {
407 _local_event: zx::EventPair,
408 remote_event: zx::EventPair,
409 buffer: Vec<Data>,
410 }
411
412 impl Default for UnixDomainSocketImplState {
413 fn default() -> Self {
414 let (_local_event, remote_event) = zx::EventPair::create();
415 Self { _local_event, remote_event, buffer: vec![] }
416 }
417 }
418
419 #[derive(Debug, Default)]
420 struct UnixDomainSocketImpl {
421 state: Mutex<UnixDomainSocketImplState>,
422 close_on_read: bool,
423 }
424
425 impl UnixDomainSocketImpl {
426 fn read(
427 &self,
428 payload: fbinder::UnixDomainSocketReadRequest,
429 ) -> Result<fbinder::UnixDomainSocketReadResponse, zx::Status> {
430 if self.close_on_read {
431 return Err(zx::Status::PEER_CLOSED);
432 }
433 let Some(count) = payload.count else {
434 return Err(zx::Status::INVALID_ARGS);
435 };
436 let Some(flags) = payload.flags else {
437 return Err(zx::Status::INVALID_ARGS);
438 };
439 let mut state = self.state.lock();
440 if state.buffer.is_empty() {
441 return Err(zx::Status::SHOULD_WAIT);
442 }
443 let mut data = if flags.contains(fbinder::ReadFlags::PEEK) {
444 state.buffer[0].try_clone()?
445 } else {
446 state.buffer.remove(0)
447 };
448
449 if state.buffer.is_empty() {
450 state.remote_event.as_handle_ref().signal(READABLE_SIGNAL, WRITABLE_SIGNAL)?;
451 }
452
453 let actual_count = data.bytes.len() as u64;
454 data.bytes.truncate(count as usize);
455
456 Ok(fbinder::UnixDomainSocketReadResponse {
457 data: Some(data.bytes),
458 data_original_length: Some(actual_count),
459 handles: Some(data.handles),
460 ..Default::default()
461 })
462 }
463
464 fn write(
465 &self,
466 payload: fbinder::UnixDomainSocketWriteRequest,
467 ) -> Result<fbinder::UnixDomainSocketWriteResponse, zx::Status> {
468 let Some(bytes) = payload.data else {
469 return Err(zx::Status::INVALID_ARGS);
470 };
471 let actual_count = bytes.len() as u64;
472 let Some(handles) = payload.handles else {
473 return Err(zx::Status::INVALID_ARGS);
474 };
475 let mut state = self.state.lock();
476 state.buffer.push(Data { bytes, handles });
477 state
478 .remote_event
479 .as_handle_ref()
480 .signal(zx::Signals::NONE, READABLE_SIGNAL | WRITABLE_SIGNAL)?;
481 Ok(fbinder::UnixDomainSocketWriteResponse {
482 actual_count: Some(actual_count),
483 ..Default::default()
484 })
485 }
486
487 async fn serve(self: &Arc<Self>, channel: zx::Channel) {
488 let stream = fbinder::UnixDomainSocketRequestStream::from_channel(
489 fasync::Channel::from_channel(channel),
490 );
491 stream
492 .for_each_concurrent(None, |message| async {
493 match message {
494 Ok(fbinder::UnixDomainSocketRequest::GetEvent { responder, .. }) => {
495 let state = self.state.lock();
496 let event = state
497 .remote_event
498 .duplicate_handle(zx::Rights::SAME_RIGHTS)
499 .expect("duplicate event");
500 responder
501 .send(Ok(fbinder::UnixDomainSocketGetEventResponse {
502 event: Some(event),
503 ..Default::default()
504 }))
505 .expect("respond");
506 }
507 Ok(fbinder::UnixDomainSocketRequest::Read {
508 payload, responder, ..
509 }) => {
510 assert!(
511 responder
512 .send(self.read(payload).map_err(|e| e.into_raw()))
513 .is_ok()
514 );
515 }
516 Ok(fbinder::UnixDomainSocketRequest::Write {
517 payload, responder, ..
518 }) => {
519 assert!(
520 responder
521 .send(self.write(payload).as_ref().map_err(|e| e.into_raw()))
522 .is_ok()
523 );
524 }
525 Ok(fbinder::UnixDomainSocketRequest::Query { responder }) => {
526 assert!(
527 responder
528 .send(fbinder::UnixDomainSocketMarker::PROTOCOL_NAME.as_bytes())
529 .is_ok()
530 );
531 }
532 Ok(fbinder::UnixDomainSocketRequest::Clone { request, .. }) => {
533 self.serve(request.into()).await;
534 }
535 Ok(fbinder::UnixDomainSocketRequest::Close { responder }) => {
536 assert!(responder.send(Ok(())).is_ok());
537 }
538 _ => {
539 return;
540 }
541 }
542 })
543 .await;
544 }
545 }
546
547 #[::fuchsia::test]
548 async fn test_remote_uds() {
549 let (client, server) = zx::Channel::create();
550 let handle = std::thread::spawn(|| {
551 let mut executor = fasync::LocalExecutor::default();
552 executor.run_singlethreaded(async move {
553 let uds_impl = UnixDomainSocketImpl::default();
554 Arc::new(uds_impl).serve(server).await;
555 });
556 });
557 spawn_kernel_and_run(async move |locked, current_task| {
558 let original_file =
559 new_remote_file(locked, current_task, client.into(), OpenFlags::RDWR)
560 .expect("new_remote_file");
561 assert!(original_file.node().is_sock());
562 let file = new_remote_file(
563 locked,
564 current_task,
565 original_file.to_handle(current_task).expect("to_handle").expect("has_handle"),
566 OpenFlags::RDWR,
567 )
568 .expect("new_remote_file");
569 let ancillary_data =
570 vec![AncillaryData::Unix(UnixControlData::Rights(vec![original_file]))];
571 let socket_ops = file.downcast_file::<SocketFile>().unwrap();
572 let data = "HelloWorld";
573 let mut input_buffer = VecInputBuffer::new(data.as_bytes());
574 assert_eq!(
575 socket_ops.sendmsg(
576 locked,
577 current_task,
578 &file,
579 &mut input_buffer,
580 None,
581 ancillary_data,
582 SocketMessageFlags::empty()
583 ),
584 Ok(data.len())
585 );
586
587 let flags = SocketMessageFlags::CTRUNC
588 | SocketMessageFlags::TRUNC
589 | SocketMessageFlags::NOSIGNAL
590 | SocketMessageFlags::CMSG_CLOEXEC;
591
592 let mut buffer = VecOutputBuffer::new(1024);
593 let info = socket_ops
594 .recvmsg(
595 locked,
596 ¤t_task,
597 &file,
598 &mut buffer,
599 flags | SocketMessageFlags::PEEK,
600 None,
601 )
602 .expect("recvmsg");
603
604 assert_eq!(info.ancillary_data.len(), 1);
605 assert_eq!(info.message_length, data.len());
606
607 let mut buffer = VecOutputBuffer::new(1024);
608 let info = socket_ops
609 .recvmsg(locked, ¤t_task, &file, &mut buffer, flags, None)
610 .expect("recvmsg");
611
612 assert_eq!(info.ancillary_data.len(), 1);
613 assert_eq!(info.message_length, data.len());
614
615 let mut buffer = VecOutputBuffer::new(1024);
616 let err = socket_ops
617 .recvmsg(
618 locked,
619 ¤t_task,
620 &file,
621 &mut buffer,
622 flags | SocketMessageFlags::DONTWAIT,
623 None,
624 )
625 .unwrap_err();
626 assert_eq!(err, errno!(EAGAIN));
627 })
628 .await;
629 handle.join().expect("join");
630 }
631
632 #[::fuchsia::test]
633 async fn test_remote_uds_peer_closed() {
634 let (client, server) = zx::Channel::create();
635 let handle = std::thread::spawn(move || {
636 let mut executor = fasync::LocalExecutor::default();
637 executor.run_singlethreaded(async move {
638 let uds_impl = UnixDomainSocketImpl { close_on_read: true, ..Default::default() };
639 Arc::new(uds_impl).serve(server).await;
640 });
641 });
642 spawn_kernel_and_run(async move |locked, current_task| {
643 let file = new_remote_file(locked, current_task, client.into(), OpenFlags::RDWR)
644 .expect("new_remote_file");
645 let socket_ops = file.downcast_file::<SocketFile>().unwrap();
646
647 let mut buffer = VecOutputBuffer::new(1024);
648 let err = socket_ops
649 .recvmsg(
650 locked,
651 ¤t_task,
652 &file,
653 &mut buffer,
654 SocketMessageFlags::empty(),
655 None,
656 )
657 .unwrap_err();
658 assert_eq!(err, errno!(ECONNRESET));
659 })
660 .await;
661 handle.join().expect("join");
662 }
663}