Skip to main content

fidl_next_protocol/fuchsia/
channel.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! A transport implementation which uses Zircon channels.
6
7use core::marker::PhantomData;
8use core::mem::replace;
9use core::pin::Pin;
10use core::slice;
11use core::task::{Context, Poll};
12
13use fidl_next_codec::decoder::InternalHandleDecoder;
14use fidl_next_codec::encoder::InternalHandleEncoder;
15use fidl_next_codec::fuchsia::{HandleDecoder, HandleEncoder};
16use fidl_next_codec::{AsDecoder, CHUNK_SIZE, Chunk, DecodeError, Decoder, EncodeError, Encoder};
17use fuchsia_async::{RWHandle, ReadableHandle as _};
18use zx::sys::{
19    ZX_ERR_BUFFER_TOO_SMALL, ZX_ERR_PEER_CLOSED, ZX_ERR_SHOULD_WAIT, ZX_OK, zx_channel_read,
20    zx_channel_write, zx_handle_t,
21};
22use zx::{Channel, NullableHandle, Status};
23
24use crate::{NonBlockingTransport, Transport};
25
26/// The shared part of a channel.
27pub struct Shared {
28    channel: RWHandle<Channel>,
29    // TODO: recycle send/recv buffers to reduce allocations
30}
31
32impl Shared {
33    fn new(channel: Channel) -> Self {
34        Self { channel: RWHandle::new(channel) }
35    }
36}
37
38/// A channel buffer that contains handles and chunks.
39#[derive(Default)]
40pub struct Buffer {
41    /// The chunks of the buffer.
42    pub chunks: Vec<Chunk>,
43    /// The handles of the buffer.
44    pub handles: Vec<NullableHandle>,
45}
46
47impl Buffer {
48    /// New buffer.
49    pub fn new() -> Self {
50        Self::default()
51    }
52}
53
54impl InternalHandleEncoder for Buffer {
55    #[inline]
56    fn __internal_handle_count(&self) -> usize {
57        self.handles.len()
58    }
59}
60
61impl Encoder for Buffer {
62    #[inline]
63    fn bytes_written(&self) -> usize {
64        Encoder::bytes_written(&self.chunks)
65    }
66
67    #[inline]
68    fn write_zeroes(&mut self, len: usize) {
69        Encoder::write_zeroes(&mut self.chunks, len)
70    }
71
72    #[inline]
73    fn write(&mut self, bytes: &[u8]) {
74        Encoder::write(&mut self.chunks, bytes)
75    }
76
77    #[inline]
78    fn rewrite(&mut self, pos: usize, bytes: &[u8]) {
79        Encoder::rewrite(&mut self.chunks, pos, bytes)
80    }
81}
82
83impl HandleEncoder for Buffer {
84    fn push_handle(&mut self, handle: NullableHandle) -> Result<(), EncodeError> {
85        self.handles.push(handle);
86        Ok(())
87    }
88
89    fn handles_pushed(&self) -> usize {
90        self.handles.len()
91    }
92}
93
94// SAFETY: Moving a `Vec` does not invalidate any references to its elements.
95// The chunks returned from `take_chunks` are located on the heap.
96unsafe impl<'de> AsDecoder<'de> for Buffer {
97    type Decoder = BufferDecoder<'de>;
98
99    fn as_decoder(&'de mut self) -> Self::Decoder {
100        BufferDecoder { buffer: self, chunks_taken: 0, handles_taken: 0 }
101    }
102}
103
104/// The state for a channel send future.
105pub struct SendFutureState {
106    buffer: Buffer,
107}
108
109/// The exclusive part of a channel.
110pub struct Exclusive {
111    _phantom: PhantomData<()>,
112}
113
114/// The state for a channel receive future.
115pub struct RecvFutureState {
116    buffer: Option<Buffer>,
117}
118
119/// A decoder for a [`Buffer`].
120pub struct BufferDecoder<'de> {
121    buffer: &'de mut Buffer,
122    chunks_taken: usize,
123    handles_taken: usize,
124}
125
126impl InternalHandleDecoder for BufferDecoder<'_> {
127    fn __internal_take_handles(&mut self, count: usize) -> Result<(), DecodeError> {
128        if count > self.buffer.handles.len() - self.handles_taken {
129            return Err(DecodeError::InsufficientHandles);
130        }
131
132        for i in self.handles_taken..self.handles_taken + count {
133            let handle = replace(&mut self.buffer.handles[i], NullableHandle::invalid());
134            drop(handle);
135        }
136        self.handles_taken += count;
137
138        Ok(())
139    }
140
141    fn __internal_handles_remaining(&self) -> usize {
142        self.buffer.handles.len() - self.handles_taken
143    }
144}
145
146impl<'de> Decoder<'de> for BufferDecoder<'de> {
147    fn take_chunks(&mut self, count: usize) -> Result<&'de mut [Chunk], DecodeError> {
148        if count > self.buffer.chunks.len() - self.chunks_taken {
149            return Err(DecodeError::InsufficientData);
150        }
151
152        let chunks = unsafe { self.buffer.chunks.as_mut_ptr().add(self.chunks_taken) };
153        self.chunks_taken += count;
154
155        unsafe { Ok(slice::from_raw_parts_mut(chunks, count)) }
156    }
157
158    fn commit(&mut self) {
159        for handle in &mut self.buffer.handles[0..self.handles_taken] {
160            // This handle was taken. To commit the current changes, we need to forget it.
161            let _ = replace(handle, NullableHandle::invalid()).into_raw();
162        }
163    }
164
165    fn finish(&self) -> Result<(), DecodeError> {
166        if self.chunks_taken != self.buffer.chunks.len() {
167            return Err(DecodeError::ExtraBytes {
168                num_extra: (self.buffer.chunks.len() - self.chunks_taken) * CHUNK_SIZE,
169            });
170        }
171
172        if self.handles_taken != self.buffer.handles.len() {
173            return Err(DecodeError::ExtraHandles {
174                num_extra: self.buffer.handles.len() - self.handles_taken,
175            });
176        }
177
178        Ok(())
179    }
180}
181
182impl HandleDecoder for BufferDecoder<'_> {
183    fn take_raw_handle(&mut self) -> Result<zx_handle_t, DecodeError> {
184        if self.handles_taken >= self.buffer.handles.len() {
185            return Err(DecodeError::InsufficientHandles);
186        }
187
188        let handle = self.buffer.handles[self.handles_taken].raw_handle();
189        self.handles_taken += 1;
190
191        Ok(handle)
192    }
193
194    fn handles_remaining(&mut self) -> usize {
195        self.buffer.handles.len() - self.handles_taken
196    }
197}
198
199impl Transport for Channel {
200    type Error = Status;
201
202    fn split(self) -> (Self::Shared, Self::Exclusive) {
203        (Shared::new(self), Exclusive { _phantom: PhantomData })
204    }
205
206    type Shared = Shared;
207    type SendBuffer = Buffer;
208    type SendFutureState = SendFutureState;
209
210    fn acquire(_: &Self::Shared) -> Self::SendBuffer {
211        Buffer::new()
212    }
213
214    fn begin_send(_: &Self::Shared, buffer: Self::SendBuffer) -> Self::SendFutureState {
215        SendFutureState { buffer }
216    }
217
218    fn poll_send(
219        future_state: Pin<&mut Self::SendFutureState>,
220        _: &mut Context<'_>,
221        shared: &Self::Shared,
222    ) -> Poll<Result<(), Option<Self::Error>>> {
223        Poll::Ready(Self::send_immediately(future_state.get_mut(), shared))
224    }
225
226    type Exclusive = Exclusive;
227    type RecvFutureState = RecvFutureState;
228    type RecvBuffer = Buffer;
229
230    fn begin_recv(_: &Self::Shared, _: &mut Self::Exclusive) -> Self::RecvFutureState {
231        RecvFutureState { buffer: Some(Buffer::new()) }
232    }
233
234    fn poll_recv(
235        mut future_state: Pin<&mut Self::RecvFutureState>,
236        cx: &mut Context<'_>,
237        shared: &Self::Shared,
238        _: &mut Self::Exclusive,
239    ) -> Poll<Result<Self::RecvBuffer, Option<Self::Error>>> {
240        let buffer = future_state.buffer.as_mut().unwrap();
241
242        let mut actual_bytes = 0;
243        let mut actual_handles = 0;
244
245        loop {
246            let result = unsafe {
247                zx_channel_read(
248                    shared.channel.get_ref().raw_handle(),
249                    0,
250                    buffer.chunks.as_mut_ptr().cast(),
251                    buffer.handles.as_mut_ptr().cast(),
252                    (buffer.chunks.capacity() * CHUNK_SIZE) as u32,
253                    buffer.handles.capacity() as u32,
254                    &mut actual_bytes,
255                    &mut actual_handles,
256                )
257            };
258
259            match result {
260                ZX_OK => {
261                    unsafe {
262                        buffer.chunks.set_len(actual_bytes as usize / CHUNK_SIZE);
263                        buffer.handles.set_len(actual_handles as usize);
264                    }
265                    return Poll::Ready(Ok(future_state.buffer.take().unwrap()));
266                }
267                ZX_ERR_PEER_CLOSED => return Poll::Ready(Err(None)),
268                ZX_ERR_BUFFER_TOO_SMALL => {
269                    let min_chunks = (actual_bytes as usize).div_ceil(CHUNK_SIZE);
270                    buffer.chunks.reserve(min_chunks - buffer.chunks.capacity());
271                    buffer.handles.reserve(actual_handles as usize - buffer.handles.capacity());
272                }
273                ZX_ERR_SHOULD_WAIT => {
274                    if matches!(shared.channel.need_readable(cx)?, Poll::Pending) {
275                        return Poll::Pending;
276                    }
277                }
278                raw => return Poll::Ready(Err(Some(Status::from_raw(raw)))),
279            }
280        }
281    }
282}
283
284impl NonBlockingTransport for Channel {
285    fn send_immediately(
286        future_state: &mut Self::SendFutureState,
287        shared: &Self::Shared,
288    ) -> Result<(), Option<Self::Error>> {
289        let result = unsafe {
290            zx_channel_write(
291                shared.channel.get_ref().raw_handle(),
292                0,
293                future_state.buffer.chunks.as_ptr().cast::<u8>(),
294                (future_state.buffer.chunks.len() * CHUNK_SIZE) as u32,
295                future_state.buffer.handles.as_ptr().cast(),
296                future_state.buffer.handles.len() as u32,
297            )
298        };
299
300        match result {
301            ZX_OK => {
302                // Handles were written to the channel, so we must not drop them.
303                unsafe {
304                    future_state.buffer.handles.set_len(0);
305                }
306                Ok(())
307            }
308            ZX_ERR_PEER_CLOSED => Err(None),
309            _ => Err(Some(Status::from_raw(result))),
310        }
311    }
312}
313
314#[cfg(test)]
315mod tests {
316    use fidl_next_codec::{AsDecoder as _, DecoderExt as _, EncoderExt as _};
317    use fuchsia_async as fasync;
318    use zx::{Channel, HandleBased as _, Instant, NullableHandle, Signals, WaitResult};
319
320    use crate::fuchsia::channel::Buffer;
321    use crate::testing::*;
322
323    #[fasync::run_singlethreaded(test)]
324    async fn close_on_drop() {
325        test_close_on_drop(Channel::create).await;
326    }
327
328    #[fasync::run_singlethreaded(test)]
329    async fn one_way() {
330        test_one_way(Channel::create).await;
331    }
332
333    #[fasync::run_singlethreaded(test)]
334    async fn one_way_nonblocking() {
335        test_one_way_nonblocking(Channel::create).await;
336    }
337
338    #[fasync::run_singlethreaded(test)]
339    async fn two_way() {
340        test_two_way(Channel::create).await;
341    }
342
343    #[fasync::run_singlethreaded(test)]
344    async fn multiple_two_way() {
345        test_multiple_two_way(Channel::create).await;
346    }
347
348    #[fasync::run_singlethreaded(test)]
349    async fn event() {
350        test_event(Channel::create).await;
351    }
352
353    struct HandleAndBoolean {
354        handle: NullableHandle,
355        boolean: bool,
356    }
357
358    mod wire {
359        use core::mem::MaybeUninit;
360
361        use fidl_next_codec::fuchsia::{HandleDecoder, HandleEncoder};
362        use fidl_next_codec::{
363            Constrained, Decode, DecodeError, Encode, EncodeError, FromWire, Slot, ValidationError,
364            Wire, munge, wire,
365        };
366
367        #[derive(Debug)]
368        #[repr(C)]
369        pub struct HandleAndBoolean {
370            handle: wire::fuchsia::Handle,
371            boolean: bool,
372        }
373
374        impl Constrained for HandleAndBoolean {
375            type Constraint = ();
376
377            fn validate(_: Slot<'_, Self>, _: Self::Constraint) -> Result<(), ValidationError> {
378                Ok(())
379            }
380        }
381
382        unsafe impl Wire for HandleAndBoolean {
383            type Narrowed<'de> = Self;
384
385            fn zero_padding(out: &mut MaybeUninit<Self>) {
386                unsafe {
387                    out.as_mut_ptr().write_bytes(0, 1);
388                }
389            }
390        }
391
392        unsafe impl<E: HandleEncoder + ?Sized> Encode<HandleAndBoolean, E> for super::HandleAndBoolean {
393            fn encode(
394                self,
395                encoder: &mut E,
396                out: &mut MaybeUninit<HandleAndBoolean>,
397                _: (),
398            ) -> Result<(), EncodeError> {
399                munge!(let HandleAndBoolean { handle, boolean } = out);
400                self.handle.encode(encoder, handle, ())?;
401                self.boolean.encode(encoder, boolean, ())?;
402                Ok(())
403            }
404        }
405
406        unsafe impl<D: HandleDecoder + ?Sized> Decode<D> for HandleAndBoolean {
407            fn decode(slot: Slot<'_, Self>, decoder: &mut D, _: ()) -> Result<(), DecodeError> {
408                munge!(let Self { handle, boolean } = slot);
409                Decode::decode(handle, decoder, ())?;
410                Decode::decode(boolean, decoder, ())?;
411                Ok(())
412            }
413        }
414
415        impl FromWire<HandleAndBoolean> for super::HandleAndBoolean {
416            fn from_wire(wire: HandleAndBoolean) -> Self {
417                Self { handle: zx::NullableHandle::from_wire(wire.handle), boolean: wire.boolean }
418            }
419        }
420    }
421
422    #[test]
423    fn partial_decode_drops_handles() {
424        let (encode_end, check_end) = Channel::create();
425
426        let mut buffer =
427            Buffer::encode(HandleAndBoolean { handle: encode_end.into_handle(), boolean: false })
428                .expect("encoding should succeed");
429        // Modify the buffer so that the boolean value is invalid
430        *buffer.chunks[0] |= 0x00000002_00000000;
431
432        let mut decoder = buffer.as_decoder();
433        decoder
434            .decode::<wire::HandleAndBoolean>()
435            .expect_err("decoding an invalid boolean should fail");
436
437        // Decoding failed, so the handle should still be in the buffer.
438        assert_eq!(
439            check_end.wait_one(Signals::CHANNEL_PEER_CLOSED, Instant::INFINITE_PAST),
440            WaitResult::TimedOut(Signals::CHANNEL_WRITABLE),
441        );
442
443        drop(buffer);
444
445        // The handle should have been dropped with the buffer.
446        assert_eq!(
447            check_end.wait_one(Signals::CHANNEL_PEER_CLOSED, Instant::INFINITE_PAST),
448            WaitResult::Ok(Signals::CHANNEL_PEER_CLOSED),
449        );
450    }
451
452    #[test]
453    fn complete_decode_moves_handles() {
454        let (encode_end, check_end) = Channel::create();
455
456        let mut buffer =
457            Buffer::encode(HandleAndBoolean { handle: encode_end.into_handle(), boolean: false })
458                .expect("encoding should succeed");
459
460        let mut decoder = buffer.as_decoder();
461        let decoded = decoder.decode::<wire::HandleAndBoolean>().expect("decoding should succeed");
462
463        // The handle should remain un-signaled after successful decoding.
464        assert_eq!(
465            check_end.wait_one(Signals::CHANNEL_PEER_CLOSED, Instant::INFINITE_PAST),
466            WaitResult::TimedOut(Signals::CHANNEL_WRITABLE),
467        );
468
469        drop(decoded);
470
471        // Now the handle should be signaled.
472        assert_eq!(
473            check_end.wait_one(Signals::CHANNEL_PEER_CLOSED, Instant::INFINITE_PAST),
474            WaitResult::Ok(Signals::CHANNEL_PEER_CLOSED),
475        );
476    }
477}