fidl_next_protocol/fuchsia/
channel.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! A transport implementation which uses Zircon channels.
6
7use core::marker::PhantomData;
8use core::mem::replace;
9use core::pin::Pin;
10use core::ptr::NonNull;
11use core::task::{Context, Poll};
12
13use fidl_next_codec::decoder::InternalHandleDecoder;
14use fidl_next_codec::encoder::InternalHandleEncoder;
15use fidl_next_codec::fuchsia::{HandleDecoder, HandleEncoder};
16use fidl_next_codec::{CHUNK_SIZE, Chunk, DecodeError, Decoder, EncodeError, Encoder};
17use fuchsia_async::{RWHandle, ReadableHandle as _};
18use zx::sys::{
19    ZX_ERR_BUFFER_TOO_SMALL, ZX_ERR_PEER_CLOSED, ZX_ERR_SHOULD_WAIT, ZX_OK, zx_channel_read,
20    zx_channel_write, zx_handle_t,
21};
22use zx::{AsHandleRef as _, Channel, Handle, HandleBased, Status};
23
24use crate::{NonBlockingTransport, Transport};
25
26/// The shared part of a channel.
27pub struct Shared {
28    channel: RWHandle<Channel>,
29    // TODO: recycle send/recv buffers to reduce allocations
30}
31
32impl Shared {
33    fn new(channel: Channel) -> Self {
34        Self { channel: RWHandle::new(channel) }
35    }
36}
37
38/// A channel buffer.
39#[derive(Default)]
40pub struct Buffer {
41    handles: Vec<Handle>,
42    chunks: Vec<Chunk>,
43}
44
45impl Buffer {
46    /// New buffer.
47    pub fn new() -> Self {
48        Self::default()
49    }
50
51    /// Retrieve the handles.
52    pub fn handles(&self) -> &[Handle] {
53        &self.handles
54    }
55
56    /// Retrieve the bytes.
57    pub fn bytes(&self) -> Vec<u8> {
58        self.chunks.iter().flat_map(|chunk| chunk.to_le_bytes()).collect()
59    }
60
61    /// Make a buffer out of handles and chunks.
62    pub fn from_raw(handles: Vec<Handle>, chunks: Vec<Chunk>) -> Self {
63        Self { handles, chunks }
64    }
65
66    /// Make a buffer out of handles and bytes. The bytes will be copied.
67    pub fn from_raw_bytes(handles: Vec<Handle>, bytes: impl AsRef<[u8]>) -> Self {
68        let bytes = bytes.as_ref();
69        assert!(bytes.len() % CHUNK_SIZE == 0);
70        let chunks = bytes
71            .chunks_exact(CHUNK_SIZE)
72            .map(|c| fidl_next_codec::WireU64(u64::from_le_bytes(c.try_into().unwrap())))
73            .collect();
74        Self::from_raw(handles, chunks)
75    }
76}
77
78impl InternalHandleEncoder for Buffer {
79    #[inline]
80    fn __internal_handle_count(&self) -> usize {
81        self.handles.len()
82    }
83}
84
85impl Encoder for Buffer {
86    #[inline]
87    fn bytes_written(&self) -> usize {
88        Encoder::bytes_written(&self.chunks)
89    }
90
91    #[inline]
92    fn write_zeroes(&mut self, len: usize) {
93        Encoder::write_zeroes(&mut self.chunks, len)
94    }
95
96    #[inline]
97    fn write(&mut self, bytes: &[u8]) {
98        Encoder::write(&mut self.chunks, bytes)
99    }
100
101    #[inline]
102    fn rewrite(&mut self, pos: usize, bytes: &[u8]) {
103        Encoder::rewrite(&mut self.chunks, pos, bytes)
104    }
105}
106
107impl HandleEncoder for Buffer {
108    fn push_handle(&mut self, handle: Handle) -> Result<(), EncodeError> {
109        self.handles.push(handle);
110        Ok(())
111    }
112
113    fn handles_pushed(&self) -> usize {
114        self.handles.len()
115    }
116}
117
118/// The state for a channel send future.
119pub struct SendFutureState {
120    buffer: Buffer,
121}
122
/// The exclusive part of a channel.
///
/// It carries no data of its own; its only field is a zero-sized marker.
pub struct Exclusive {
    /// Zero-sized placeholder field.
    _phantom: PhantomData<()>,
}
127
128/// The state for a channel receive future.
129pub struct RecvFutureState {
130    buffer: Option<Buffer>,
131}
132
133/// A channel receive buffer.
134pub struct RecvBuffer {
135    buffer: Buffer,
136    chunks_taken: usize,
137    handles_taken: usize,
138}
139
140impl RecvBuffer {
141    /// Create a new receive buffer from a buffer.
142    pub fn new(buffer: Buffer) -> Self {
143        Self { buffer, chunks_taken: 0, handles_taken: 0 }
144    }
145}
146
147unsafe impl Decoder for RecvBuffer {
148    fn take_chunks_raw(&mut self, count: usize) -> Result<NonNull<Chunk>, DecodeError> {
149        if count > self.buffer.chunks.len() - self.chunks_taken {
150            return Err(DecodeError::InsufficientData);
151        }
152
153        let chunks = unsafe { self.buffer.chunks.as_mut_ptr().add(self.chunks_taken) };
154        self.chunks_taken += count;
155
156        unsafe { Ok(NonNull::new_unchecked(chunks)) }
157    }
158
159    fn commit(&mut self) {
160        for handle in &mut self.buffer.handles[0..self.handles_taken] {
161            // This handle was taken. To commit the current changes, we need to forget it.
162            let _ = replace(handle, Handle::invalid()).into_raw();
163        }
164    }
165
166    fn finish(&self) -> Result<(), DecodeError> {
167        if self.chunks_taken != self.buffer.chunks.len() {
168            return Err(DecodeError::ExtraBytes {
169                num_extra: (self.buffer.chunks.len() - self.chunks_taken) * CHUNK_SIZE,
170            });
171        }
172
173        if self.handles_taken != self.buffer.handles.len() {
174            return Err(DecodeError::ExtraHandles {
175                num_extra: self.buffer.handles.len() - self.handles_taken,
176            });
177        }
178
179        Ok(())
180    }
181}
182
183impl InternalHandleDecoder for RecvBuffer {
184    fn __internal_take_handles(&mut self, count: usize) -> Result<(), DecodeError> {
185        if count > self.buffer.handles.len() - self.handles_taken {
186            return Err(DecodeError::InsufficientHandles);
187        }
188
189        for i in self.handles_taken..self.handles_taken + count {
190            let handle = replace(&mut self.buffer.handles[i], Handle::invalid());
191            drop(handle);
192        }
193        self.handles_taken += count;
194
195        Ok(())
196    }
197
198    fn __internal_handles_remaining(&self) -> usize {
199        self.buffer.handles.len() - self.handles_taken
200    }
201}
202
203impl HandleDecoder for RecvBuffer {
204    fn take_raw_handle(&mut self) -> Result<zx_handle_t, DecodeError> {
205        if self.handles_taken >= self.buffer.handles.len() {
206            return Err(DecodeError::InsufficientHandles);
207        }
208
209        let handle = self.buffer.handles[self.handles_taken].raw_handle();
210        self.handles_taken += 1;
211
212        Ok(handle)
213    }
214
215    fn handles_remaining(&mut self) -> usize {
216        self.buffer.handles.len() - self.handles_taken
217    }
218}
219
220impl Transport for Channel {
221    type Error = Status;
222
223    fn split(self) -> (Self::Shared, Self::Exclusive) {
224        (Shared::new(self), Exclusive { _phantom: PhantomData })
225    }
226
227    type Shared = Shared;
228    type SendBuffer = Buffer;
229    type SendFutureState = SendFutureState;
230
231    fn acquire(_: &Self::Shared) -> Self::SendBuffer {
232        Buffer::new()
233    }
234
235    fn begin_send(_: &Self::Shared, buffer: Self::SendBuffer) -> Self::SendFutureState {
236        SendFutureState { buffer }
237    }
238
239    fn poll_send(
240        future_state: Pin<&mut Self::SendFutureState>,
241        _: &mut Context<'_>,
242        shared: &Self::Shared,
243    ) -> Poll<Result<(), Option<Self::Error>>> {
244        Poll::Ready(Self::send_immediately(future_state.get_mut(), shared))
245    }
246
247    type Exclusive = Exclusive;
248    type RecvFutureState = RecvFutureState;
249    type RecvBuffer = RecvBuffer;
250
251    fn begin_recv(_: &Self::Shared, _: &mut Self::Exclusive) -> Self::RecvFutureState {
252        RecvFutureState { buffer: Some(Buffer::new()) }
253    }
254
255    fn poll_recv(
256        mut future_state: Pin<&mut Self::RecvFutureState>,
257        cx: &mut Context<'_>,
258        shared: &Self::Shared,
259        _: &mut Self::Exclusive,
260    ) -> Poll<Result<Self::RecvBuffer, Option<Self::Error>>> {
261        let buffer = future_state.buffer.as_mut().unwrap();
262
263        let mut actual_bytes = 0;
264        let mut actual_handles = 0;
265
266        loop {
267            let result = unsafe {
268                zx_channel_read(
269                    shared.channel.get_ref().raw_handle(),
270                    0,
271                    buffer.chunks.as_mut_ptr().cast(),
272                    buffer.handles.as_mut_ptr().cast(),
273                    (buffer.chunks.capacity() * CHUNK_SIZE) as u32,
274                    buffer.handles.capacity() as u32,
275                    &mut actual_bytes,
276                    &mut actual_handles,
277                )
278            };
279
280            match result {
281                ZX_OK => {
282                    unsafe {
283                        buffer.chunks.set_len(actual_bytes as usize / CHUNK_SIZE);
284                        buffer.handles.set_len(actual_handles as usize);
285                    }
286                    return Poll::Ready(Ok(RecvBuffer {
287                        buffer: future_state.buffer.take().unwrap(),
288                        chunks_taken: 0,
289                        handles_taken: 0,
290                    }));
291                }
292                ZX_ERR_PEER_CLOSED => return Poll::Ready(Err(None)),
293                ZX_ERR_BUFFER_TOO_SMALL => {
294                    let min_chunks = (actual_bytes as usize).div_ceil(CHUNK_SIZE);
295                    buffer.chunks.reserve(min_chunks - buffer.chunks.capacity());
296                    buffer.handles.reserve(actual_handles as usize - buffer.handles.capacity());
297                }
298                ZX_ERR_SHOULD_WAIT => {
299                    if matches!(shared.channel.need_readable(cx)?, Poll::Pending) {
300                        return Poll::Pending;
301                    }
302                }
303                raw => return Poll::Ready(Err(Some(Status::from_raw(raw)))),
304            }
305        }
306    }
307}
308
309impl NonBlockingTransport for Channel {
310    fn send_immediately(
311        future_state: &mut Self::SendFutureState,
312        shared: &Self::Shared,
313    ) -> Result<(), Option<Self::Error>> {
314        let result = unsafe {
315            zx_channel_write(
316                shared.channel.get_ref().raw_handle(),
317                0,
318                future_state.buffer.chunks.as_ptr().cast::<u8>(),
319                (future_state.buffer.chunks.len() * CHUNK_SIZE) as u32,
320                future_state.buffer.handles.as_ptr().cast(),
321                future_state.buffer.handles.len() as u32,
322            )
323        };
324
325        match result {
326            ZX_OK => {
327                // Handles were written to the channel, so we must not drop them.
328                unsafe {
329                    future_state.buffer.handles.set_len(0);
330                }
331                Ok(())
332            }
333            ZX_ERR_PEER_CLOSED => Err(None),
334            _ => Err(Some(Status::from_raw(result))),
335        }
336    }
337}
338
#[cfg(test)]
mod tests {
    use core::mem::MaybeUninit;

    use fidl_next_codec::fuchsia::{HandleDecoder, HandleEncoder, WireHandle};
    use fidl_next_codec::{
        Decode, DecodeError, DecoderExt as _, Encodable, Encode, EncodeError, EncoderExt as _,
        FromWire, Slot, Wire, munge,
    };
    use fuchsia_async as fasync;
    use zx::{AsHandleRef, Channel, Handle, HandleBased as _, Instant, Signals, WaitResult};

    use crate::fuchsia::channel::{Buffer, RecvBuffer};
    use crate::testing::*;

    // The tests below run the shared transport test suite from `crate::testing` against
    // Zircon channels created with `Channel::create`.

    #[fasync::run_singlethreaded(test)]
    async fn close_on_drop() {
        test_close_on_drop(Channel::create).await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn one_way() {
        test_one_way(Channel::create).await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn one_way_nonblocking() {
        test_one_way_nonblocking(Channel::create).await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn two_way() {
        test_two_way(Channel::create).await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn multiple_two_way() {
        test_multiple_two_way(Channel::create).await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn event() {
        test_event(Channel::create).await;
    }

    /// Natural form of a test message made of a handle followed by a boolean.
    struct HandleAndBoolean {
        handle: Handle,
        boolean: bool,
    }

    /// Wire form of [`HandleAndBoolean`].
    #[derive(Debug)]
    #[repr(C)]
    struct WireHandleAndBoolean {
        handle: WireHandle,
        boolean: bool,
    }

    unsafe impl Wire for WireHandleAndBoolean {
        type Decoded<'de> = Self;

        /// Zeroes the entire value, which covers its padding bytes as well.
        fn zero_padding(out: &mut MaybeUninit<Self>) {
            // SAFETY: `out` is a valid, exclusive pointer to exactly one `Self`.
            unsafe {
                out.as_mut_ptr().write_bytes(0, 1);
            }
        }
    }

    impl Encodable for HandleAndBoolean {
        type Encoded = WireHandleAndBoolean;
    }

    unsafe impl<E: HandleEncoder + ?Sized> Encode<E> for HandleAndBoolean {
        /// Encodes the handle first and then the boolean, stopping at the first error.
        fn encode(
            self,
            encoder: &mut E,
            out: &mut MaybeUninit<Self::Encoded>,
        ) -> Result<(), EncodeError> {
            munge!(let Self::Encoded { handle, boolean } = out);
            self.handle.encode(encoder, handle)?;
            self.boolean.encode(encoder, boolean)
        }
    }

    unsafe impl<D: HandleDecoder + ?Sized> Decode<D> for WireHandleAndBoolean {
        /// Decodes the handle first and then the boolean, stopping at the first error.
        fn decode(slot: Slot<'_, Self>, decoder: &mut D) -> Result<(), DecodeError> {
            munge!(let Self { handle, boolean } = slot);
            Decode::decode(handle, decoder)?;
            Decode::decode(boolean, decoder)
        }
    }

    impl FromWire<WireHandleAndBoolean> for HandleAndBoolean {
        fn from_wire(wire: WireHandleAndBoolean) -> Self {
            let WireHandleAndBoolean { handle, boolean } = wire;
            Self { handle: Handle::from_wire(handle), boolean }
        }
    }

    /// Encodes a [`HandleAndBoolean`] carrying `handle` and `false` into a fresh buffer.
    fn encoded_message(handle: Handle) -> Buffer {
        let mut buffer = Buffer::new();
        buffer
            .encode_next(HandleAndBoolean { handle, boolean: false })
            .expect("encoding should succeed");
        buffer
    }

    /// Checks, without blocking, whether `channel` currently has its peer-closed signal set.
    fn peer_closed_status(channel: &Channel) -> WaitResult {
        channel.wait_handle(Signals::CHANNEL_PEER_CLOSED, Instant::INFINITE_PAST)
    }

    #[test]
    fn partial_decode_drops_handles() {
        let (encode_end, check_end) = Channel::create();

        let mut buffer = encoded_message(encode_end.into_handle());
        // Modify the buffer so that the boolean value is invalid
        *buffer.chunks[0] |= 0x00000002_00000000;

        let mut recv_buffer = RecvBuffer::new(buffer);
        (&mut recv_buffer)
            .decode_owned::<WireHandleAndBoolean>()
            .expect_err("decoding an invalid boolean should fail");

        // Decoding failed, so the handle should still be in the buffer.
        assert_eq!(
            peer_closed_status(&check_end),
            WaitResult::TimedOut(Signals::CHANNEL_WRITABLE),
        );

        drop(recv_buffer);

        // The handle should have been dropped with the buffer.
        assert_eq!(
            peer_closed_status(&check_end),
            WaitResult::Ok(Signals::CHANNEL_PEER_CLOSED),
        );
    }

    #[test]
    fn complete_decode_moves_handles() {
        let (encode_end, check_end) = Channel::create();

        let buffer = encoded_message(encode_end.into_handle());

        let recv_buffer = RecvBuffer::new(buffer);
        let decoded =
            recv_buffer.decode::<WireHandleAndBoolean>().expect("decoding should succeed");

        // The handle should remain un-signaled after successful decoding.
        assert_eq!(
            peer_closed_status(&check_end),
            WaitResult::TimedOut(Signals::CHANNEL_WRITABLE),
        );

        drop(decoded);

        // Now the handle should be signaled.
        assert_eq!(
            peer_closed_status(&check_end),
            WaitResult::Ok(Signals::CHANNEL_PEER_CLOSED),
        );
    }
}