fidl_next_protocol/fuchsia/channel.rs

// Copyright 2024 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

//! A transport implementation which uses Zircon channels.
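//!
//! Messages are encoded into [`Chunk`]s plus a table of [`Handle`]s, and are sent and
//! received with `zx_channel_write` and `zx_channel_read`.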

use core::marker::PhantomData;
use core::mem::replace;
use core::pin::Pin;
use core::ptr::NonNull;
use core::task::{Context, Poll};

use fidl_next_codec::decoder::InternalHandleDecoder;
use fidl_next_codec::encoder::InternalHandleEncoder;
use fidl_next_codec::fuchsia::{HandleDecoder, HandleEncoder};
use fidl_next_codec::{CHUNK_SIZE, Chunk, DecodeError, Decoder, EncodeError, Encoder};
use fuchsia_async::{RWHandle, ReadableHandle as _};
use zx::sys::{
    ZX_ERR_BUFFER_TOO_SMALL, ZX_ERR_PEER_CLOSED, ZX_ERR_SHOULD_WAIT, ZX_OK, zx_channel_read,
    zx_channel_write, zx_handle_t,
};
use zx::{AsHandleRef as _, Channel, Handle, HandleBased, Status};

use crate::{NonBlockingTransport, Transport};

/// The shared part of a channel.
pub struct Shared {
    channel: RWHandle<Channel>,
    // TODO: recycle send/recv buffers to reduce allocations
}

impl Shared {
    fn new(channel: Channel) -> Self {
        Self { channel: RWHandle::new(channel) }
    }
}

/// A channel buffer.
#[derive(Default)]
pub struct Buffer {
    handles: Vec<Handle>,
    chunks: Vec<Chunk>,
}

impl Buffer {
    /// Creates a new, empty buffer.
    pub fn new() -> Self {
        Self::default()
    }

    /// Retrieve the handles.
    pub fn handles(&self) -> &[Handle] {
        &self.handles
    }

    /// Retrieve the bytes as a contiguous `Vec<u8>`.
    pub fn bytes(&self) -> Vec<u8> {
        self.chunks.iter().flat_map(|chunk| chunk.to_le_bytes()).collect()
    }
}

impl InternalHandleEncoder for Buffer {
    #[inline]
    fn __internal_handle_count(&self) -> usize {
        self.handles.len()
    }
}

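// Encoding forwards to the `Encoder` implementation for `Vec<Chunk>`.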
impl Encoder for Buffer {
    #[inline]
    fn bytes_written(&self) -> usize {
        Encoder::bytes_written(&self.chunks)
    }

    #[inline]
    fn write_zeroes(&mut self, len: usize) {
        Encoder::write_zeroes(&mut self.chunks, len)
    }

    #[inline]
    fn write(&mut self, bytes: &[u8]) {
        Encoder::write(&mut self.chunks, bytes)
    }

    #[inline]
    fn rewrite(&mut self, pos: usize, bytes: &[u8]) {
        Encoder::rewrite(&mut self.chunks, pos, bytes)
    }
}

impl HandleEncoder for Buffer {
    fn push_handle(&mut self, handle: Handle) -> Result<(), EncodeError> {
        self.handles.push(handle);
        Ok(())
    }

    fn handles_pushed(&self) -> usize {
        self.handles.len()
    }
}

/// The state for a channel send future.
pub struct SendFutureState {
    buffer: Buffer,
}

/// The exclusive part of a channel.
pub struct Exclusive {
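    // The channel transport needs no exclusive receive state; the private field
    // only keeps `Exclusive` from being constructed outside this module.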
    _phantom: PhantomData<()>,
}

/// The state for a channel receive future.
pub struct RecvFutureState {
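    // `Some` until a message is received; `poll_recv` takes the buffer out on success.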
    buffer: Option<Buffer>,
}

/// A channel receive buffer.
pub struct RecvBuffer {
    buffer: Buffer,
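    // How many chunks and handles the decoder has consumed from `buffer` so far.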
    chunks_taken: usize,
    handles_taken: usize,
}

unsafe impl Decoder for RecvBuffer {
    fn take_chunks_raw(&mut self, count: usize) -> Result<NonNull<Chunk>, DecodeError> {
        if count > self.buffer.chunks.len() - self.chunks_taken {
            return Err(DecodeError::InsufficientData);
        }

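        // SAFETY: the bounds check above ensures `chunks_taken` is within the chunk vector.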
        let chunks = unsafe { self.buffer.chunks.as_mut_ptr().add(self.chunks_taken) };
        self.chunks_taken += count;

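        // SAFETY: the pointer is derived from a live `Vec` allocation, so it is never null.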
        unsafe { Ok(NonNull::new_unchecked(chunks)) }
    }

    fn commit(&mut self) {
        for handle in &mut self.buffer.handles[0..self.handles_taken] {
            // This handle was taken. To commit the current changes, we need to forget it.
            let _ = replace(handle, Handle::invalid()).into_raw();
        }
    }

    fn finish(&self) -> Result<(), DecodeError> {
        if self.chunks_taken != self.buffer.chunks.len() {
            return Err(DecodeError::ExtraBytes {
                num_extra: (self.buffer.chunks.len() - self.chunks_taken) * CHUNK_SIZE,
            });
        }

        if self.handles_taken != self.buffer.handles.len() {
            return Err(DecodeError::ExtraHandles {
                num_extra: self.buffer.handles.len() - self.handles_taken,
            });
        }

        Ok(())
    }
}

impl InternalHandleDecoder for RecvBuffer {
    fn __internal_take_handles(&mut self, count: usize) -> Result<(), DecodeError> {
        if count > self.buffer.handles.len() - self.handles_taken {
            return Err(DecodeError::InsufficientHandles);
        }

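        // Close each skipped handle by swapping in an invalid handle and dropping the original.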
        for i in self.handles_taken..self.handles_taken + count {
            let handle = replace(&mut self.buffer.handles[i], Handle::invalid());
            drop(handle);
        }
        self.handles_taken += count;

        Ok(())
    }

    fn __internal_handles_remaining(&self) -> usize {
        self.buffer.handles.len() - self.handles_taken
    }
}

impl HandleDecoder for RecvBuffer {
    fn take_raw_handle(&mut self) -> Result<zx_handle_t, DecodeError> {
        if self.handles_taken >= self.buffer.handles.len() {
            return Err(DecodeError::InsufficientHandles);
        }

        let handle = self.buffer.handles[self.handles_taken].raw_handle();
        self.handles_taken += 1;

        Ok(handle)
    }

    fn handles_remaining(&mut self) -> usize {
        self.buffer.handles.len() - self.handles_taken
    }
}

impl Transport for Channel {
    type Error = Status;

    fn split(self) -> (Self::Shared, Self::Exclusive) {
        (Shared::new(self), Exclusive { _phantom: PhantomData })
    }

    type Shared = Shared;
    type SendBuffer = Buffer;
    type SendFutureState = SendFutureState;

    fn acquire(_: &Self::Shared) -> Self::SendBuffer {
        Buffer::new()
    }

    fn begin_send(_: &Self::Shared, buffer: Self::SendBuffer) -> Self::SendFutureState {
        SendFutureState { buffer }
    }

    fn poll_send(
        future_state: Pin<&mut Self::SendFutureState>,
        _: &mut Context<'_>,
        shared: &Self::Shared,
    ) -> Poll<Result<(), Option<Self::Error>>> {
        Poll::Ready(Self::send_immediately(future_state.get_mut(), shared))
    }

    type Exclusive = Exclusive;
    type RecvFutureState = RecvFutureState;
    type RecvBuffer = RecvBuffer;

    fn begin_recv(_: &Self::Shared, _: &mut Self::Exclusive) -> Self::RecvFutureState {
        RecvFutureState { buffer: Some(Buffer::new()) }
    }

    fn poll_recv(
        mut future_state: Pin<&mut Self::RecvFutureState>,
        cx: &mut Context<'_>,
        shared: &Self::Shared,
        _: &mut Self::Exclusive,
    ) -> Poll<Result<Self::RecvBuffer, Option<Self::Error>>> {
        let buffer = future_state.buffer.as_mut().unwrap();

        let mut actual_bytes = 0;
        let mut actual_handles = 0;

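        // Try to read a message, growing the buffers and retrying until it fits.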
        loop {
            let result = unsafe {
                zx_channel_read(
                    shared.channel.get_ref().raw_handle(),
                    0,
                    buffer.chunks.as_mut_ptr().cast(),
                    buffer.handles.as_mut_ptr().cast(),
                    (buffer.chunks.capacity() * CHUNK_SIZE) as u32,
                    buffer.handles.capacity() as u32,
                    &mut actual_bytes,
                    &mut actual_handles,
                )
            };

            match result {
                ZX_OK => {
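                    // The kernel filled the buffers; record the lengths it reported.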
                    unsafe {
                        buffer.chunks.set_len(actual_bytes as usize / CHUNK_SIZE);
                        buffer.handles.set_len(actual_handles as usize);
                    }
                    return Poll::Ready(Ok(RecvBuffer {
                        buffer: future_state.buffer.take().unwrap(),
                        chunks_taken: 0,
                        handles_taken: 0,
                    }));
                }
                ZX_ERR_PEER_CLOSED => return Poll::Ready(Err(None)),
                ZX_ERR_BUFFER_TOO_SMALL => {
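                    // The message doesn't fit in the current buffers; grow them and retry.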
                    // The vectors are still empty here, so reserving the full message
                    // size guarantees enough capacity (and cannot underflow).
                    let min_chunks = (actual_bytes as usize).div_ceil(CHUNK_SIZE);
                    buffer.chunks.reserve(min_chunks);
                    buffer.handles.reserve(actual_handles as usize);
                }
                ZX_ERR_SHOULD_WAIT => {
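                    // Nothing to read yet; wake again when the channel becomes readable.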
                    if matches!(shared.channel.need_readable(cx)?, Poll::Pending) {
                        return Poll::Pending;
                    }
                }
                raw => return Poll::Ready(Err(Some(Status::from_raw(raw)))),
            }
        }
    }
}

impl NonBlockingTransport for Channel {
    fn send_immediately(
        future_state: &mut Self::SendFutureState,
        shared: &Self::Shared,
    ) -> Result<(), Option<Self::Error>> {
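        // Write the encoded chunks and handles to the channel in a single non-blocking call.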
        let result = unsafe {
            zx_channel_write(
                shared.channel.get_ref().raw_handle(),
                0,
                future_state.buffer.chunks.as_ptr().cast::<u8>(),
                (future_state.buffer.chunks.len() * CHUNK_SIZE) as u32,
                future_state.buffer.handles.as_ptr().cast(),
                future_state.buffer.handles.len() as u32,
            )
        };

        match result {
            ZX_OK => {
                // Handles were written to the channel, so we must not drop them.
                unsafe {
                    future_state.buffer.handles.set_len(0);
                }
                Ok(())
            }
            ZX_ERR_PEER_CLOSED => Err(None),
            _ => Err(Some(Status::from_raw(result))),
        }
    }
}

#[cfg(test)]
mod tests {
    use core::mem::MaybeUninit;

    use fidl_next_codec::fuchsia::{HandleDecoder, HandleEncoder, WireHandle};
    use fidl_next_codec::{
        Decode, DecodeError, DecoderExt as _, Encodable, Encode, EncodeError, EncoderExt as _,
        FromWire, Slot, Wire, munge,
    };
    use fuchsia_async as fasync;
    use zx::{AsHandleRef, Channel, Handle, HandleBased as _, Instant, Signals, WaitResult};

    use crate::fuchsia::channel::{Buffer, RecvBuffer};
    use crate::testing::*;

    #[fasync::run_singlethreaded(test)]
    async fn close_on_drop() {
        test_close_on_drop(Channel::create).await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn one_way() {
        test_one_way(Channel::create).await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn one_way_nonblocking() {
        test_one_way_nonblocking(Channel::create).await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn two_way() {
        test_two_way(Channel::create).await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn multiple_two_way() {
        test_multiple_two_way(Channel::create).await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn event() {
        test_event(Channel::create).await;
    }

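    // A message with a handle followed by a boolean. The boolean lets the tests
    // below force a decode failure after the handle has already been taken.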
    struct HandleAndBoolean {
        handle: Handle,
        boolean: bool,
    }

    #[derive(Debug)]
    #[repr(C)]
    struct WireHandleAndBoolean {
        handle: WireHandle,
        boolean: bool,
    }

    unsafe impl Wire for WireHandleAndBoolean {
        type Decoded<'de> = Self;

        fn zero_padding(out: &mut MaybeUninit<Self>) {
            unsafe {
                out.as_mut_ptr().write_bytes(0, 1);
            }
        }
    }

    impl Encodable for HandleAndBoolean {
        type Encoded = WireHandleAndBoolean;
    }

    unsafe impl<E: HandleEncoder + ?Sized> Encode<E> for HandleAndBoolean {
        fn encode(
            self,
            encoder: &mut E,
            out: &mut MaybeUninit<Self::Encoded>,
        ) -> Result<(), EncodeError> {
            munge!(let Self::Encoded { handle, boolean } = out);
            self.handle.encode(encoder, handle)?;
            self.boolean.encode(encoder, boolean)?;
            Ok(())
        }
    }

    unsafe impl<D: HandleDecoder + ?Sized> Decode<D> for WireHandleAndBoolean {
        fn decode(slot: Slot<'_, Self>, decoder: &mut D) -> Result<(), DecodeError> {
            munge!(let Self { handle, boolean } = slot);
            Decode::decode(handle, decoder)?;
            Decode::decode(boolean, decoder)?;
            Ok(())
        }
    }

    impl FromWire<WireHandleAndBoolean> for HandleAndBoolean {
        fn from_wire(wire: WireHandleAndBoolean) -> Self {
            Self { handle: Handle::from_wire(wire.handle), boolean: wire.boolean }
        }
    }

    #[test]
    fn partial_decode_drops_handles() {
        let (encode_end, check_end) = Channel::create();

        let mut buffer = Buffer::new();
        buffer
            .encode_next(HandleAndBoolean { handle: encode_end.into_handle(), boolean: false })
            .expect("encoding should succeed");
        // Modify the buffer so that the boolean value is invalid.
        *buffer.chunks[0] |= 0x00000002_00000000;

        let mut recv_buffer = RecvBuffer { buffer, chunks_taken: 0, handles_taken: 0 };
        (&mut recv_buffer)
            .decode_owned::<WireHandleAndBoolean>()
            .expect_err("decoding an invalid boolean should fail");

        // Decoding failed, so the handle should still be in the buffer.
        assert_eq!(
            check_end.wait_handle(Signals::CHANNEL_PEER_CLOSED, Instant::INFINITE_PAST),
            WaitResult::TimedOut(Signals::CHANNEL_WRITABLE),
        );

        drop(recv_buffer);

        // The handle should have been dropped with the buffer.
        assert_eq!(
            check_end.wait_handle(Signals::CHANNEL_PEER_CLOSED, Instant::INFINITE_PAST),
            WaitResult::Ok(Signals::CHANNEL_PEER_CLOSED),
        );
    }

    #[test]
    fn complete_decode_moves_handles() {
        let (encode_end, check_end) = Channel::create();

        let mut buffer = Buffer::new();
        buffer
            .encode_next(HandleAndBoolean { handle: encode_end.into_handle(), boolean: false })
            .expect("encoding should succeed");

        let recv_buffer = RecvBuffer { buffer, chunks_taken: 0, handles_taken: 0 };
        let decoded =
            recv_buffer.decode::<WireHandleAndBoolean>().expect("decoding should succeed");

        // The handle should remain un-signaled after successful decoding.
        assert_eq!(
            check_end.wait_handle(Signals::CHANNEL_PEER_CLOSED, Instant::INFINITE_PAST),
            WaitResult::TimedOut(Signals::CHANNEL_WRITABLE),
        );

        drop(decoded);

        // Now the handle should be signaled.
        assert_eq!(
            check_end.wait_handle(Signals::CHANNEL_PEER_CLOSED, Instant::INFINITE_PAST),
            WaitResult::Ok(Signals::CHANNEL_PEER_CLOSED),
        );
    }
473}