fidl_next_protocol/
mpsc.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! A basic [`Transport`] implementation based on MPSC channels.
6
7use core::fmt;
8use core::marker::PhantomData;
9use core::mem::{ManuallyDrop, take};
10use core::pin::Pin;
11use core::ptr::NonNull;
12use core::task::{Context, Poll};
13
14use fidl_next_codec::decoder::InternalHandleDecoder;
15use fidl_next_codec::{CHUNK_SIZE, Chunk, DecodeError, Decoder};
16
17use crate::concurrency::future::AtomicWaker;
18use crate::concurrency::sync::atomic::{AtomicBool, Ordering};
19use crate::concurrency::sync::{Arc, mpsc};
20
21use crate::{NonBlockingTransport, Transport};
22
23/// A paired mpsc transport.
24pub struct Mpsc {
25    shared: Shared,
26    exclusive: Exclusive,
27}
28
29impl Mpsc {
30    /// Creates two mpscs which can communicate with each other.
31    pub fn new() -> (Self, Self) {
32        let state = Arc::new(State {
33            send_wakers: [AtomicWaker::new(), AtomicWaker::new()],
34            is_closed: AtomicBool::new(false),
35        });
36        let (a_send, a_recv) = mpsc::channel();
37        let (b_send, b_recv) = mpsc::channel();
38        (
39            Mpsc {
40                shared: Shared { state: state.clone(), end: 0, sender: ManuallyDrop::new(a_send) },
41                exclusive: Exclusive { receiver: b_recv },
42            },
43            Mpsc {
44                shared: Shared { state, end: 1, sender: ManuallyDrop::new(b_send) },
45                exclusive: Exclusive { receiver: a_recv },
46            },
47        )
48    }
49}
50
/// The error type for paired mpsc transports.
///
/// The enum has no variants, so no value of this type can ever be constructed.
#[derive(Clone, Debug)]
pub enum Error {}

impl fmt::Display for Error {
    fn fmt(&self, _formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        // There are no variants to format; the empty match is exhaustive.
        match *self {}
    }
}

// Relies on the `Debug` and `Display` impls above; no methods are overridden.
impl core::error::Error for Error {}
62
63struct State {
64    send_wakers: [AtomicWaker; 2],
65    is_closed: AtomicBool,
66}
67
68/// The shared part of a paired mpsc transport.
69pub struct Shared {
70    state: Arc<State>,
71    end: usize,
72    sender: ManuallyDrop<mpsc::Sender<Vec<Chunk>>>,
73}
74
75impl Drop for Shared {
76    fn drop(&mut self) {
77        // Make sure that the mpsc is closed before waking the other end
78        unsafe {
79            ManuallyDrop::drop(&mut self.sender);
80        }
81        self.state.is_closed.store(true, Ordering::Relaxed);
82        self.state.send_wakers[self.end].wake();
83    }
84}
85
86/// The send future for a paired mpsc transport.
87pub struct SendFutureState {
88    buffer: Vec<Chunk>,
89}
90
91/// The exclusive part of a paired mpsc transport.
92pub struct Exclusive {
93    receiver: mpsc::Receiver<Vec<Chunk>>,
94}
95
/// The state of an in-progress receive on a paired mpsc transport.
///
/// Receiving keeps no data between polls, so this type carries only a marker field.
pub struct RecvFutureState {
    /// Zero-sized placeholder; never read.
    _phantom: PhantomData<()>,
}
100
101/// A received message buffer.
102pub struct RecvBuffer {
103    chunks: Vec<Chunk>,
104    chunks_taken: usize,
105}
106
107impl InternalHandleDecoder for RecvBuffer {
108    fn __internal_take_handles(&mut self, _: usize) -> Result<(), DecodeError> {
109        Err(DecodeError::InsufficientHandles)
110    }
111
112    fn __internal_handles_remaining(&self) -> usize {
113        0
114    }
115}
116
117unsafe impl Decoder for RecvBuffer {
118    fn take_chunks_raw(&mut self, count: usize) -> Result<NonNull<Chunk>, DecodeError> {
119        if count > self.chunks.len() - self.chunks_taken {
120            return Err(DecodeError::InsufficientData);
121        }
122
123        let chunks = unsafe { self.chunks.as_mut_ptr().add(self.chunks_taken) };
124        self.chunks_taken += count;
125
126        unsafe { Ok(NonNull::new_unchecked(chunks)) }
127    }
128
129    fn commit(&mut self) {
130        // No resources to take, so commit is a no-op
131    }
132
133    fn finish(&self) -> Result<(), DecodeError> {
134        if self.chunks_taken != self.chunks.len() {
135            return Err(DecodeError::ExtraBytes {
136                num_extra: (self.chunks.len() - self.chunks_taken) * CHUNK_SIZE,
137            });
138        }
139
140        Ok(())
141    }
142}
143
144impl Transport for Mpsc {
145    type Error = Error;
146
147    fn split(self) -> (Self::Shared, Self::Exclusive) {
148        (self.shared, self.exclusive)
149    }
150
151    type Shared = Shared;
152    type SendBuffer = Vec<Chunk>;
153    type SendFutureState = SendFutureState;
154
155    fn acquire(_: &Self::Shared) -> Self::SendBuffer {
156        Vec::new()
157    }
158
159    fn begin_send(_: &Self::Shared, buffer: Self::SendBuffer) -> Self::SendFutureState {
160        SendFutureState { buffer }
161    }
162
163    fn poll_send(
164        future_state: Pin<&mut SendFutureState>,
165        _: &mut Context<'_>,
166        shared: &Self::Shared,
167    ) -> Poll<Result<(), Option<Error>>> {
168        Poll::Ready(Self::send_immediately(future_state.get_mut(), shared))
169    }
170
171    type Exclusive = Exclusive;
172    type RecvFutureState = RecvFutureState;
173    type RecvBuffer = RecvBuffer;
174
175    fn begin_recv(_: &Self::Shared, _: &mut Self::Exclusive) -> Self::RecvFutureState {
176        RecvFutureState { _phantom: PhantomData }
177    }
178
179    fn poll_recv(
180        _: Pin<&mut Self::RecvFutureState>,
181        cx: &mut Context<'_>,
182        shared: &Self::Shared,
183        exclusive: &mut Self::Exclusive,
184    ) -> Poll<Result<Self::RecvBuffer, Option<Self::Error>>> {
185        shared.state.send_wakers[1 - shared.end].register_by_ref(cx.waker());
186        if shared.state.is_closed.load(Ordering::Relaxed) {
187            return Poll::Ready(Err(None));
188        }
189
190        match exclusive.receiver.try_recv() {
191            Ok(chunks) => Poll::Ready(Ok(RecvBuffer { chunks, chunks_taken: 0 })),
192            Err(mpsc::TryRecvError::Empty) => Poll::Pending,
193            Err(mpsc::TryRecvError::Disconnected) => Poll::Ready(Err(None)),
194        }
195    }
196}
197
198impl NonBlockingTransport for Mpsc {
199    fn send_immediately(
200        future_state: &mut Self::SendFutureState,
201        shared: &Self::Shared,
202    ) -> Result<(), Option<Self::Error>> {
203        let chunks = take(&mut future_state.buffer);
204        match shared.sender.send(chunks) {
205            Ok(()) => {
206                shared.state.send_wakers[shared.end].wake();
207                Ok(())
208            }
209            Err(_) => Err(None),
210        }
211    }
212}
213
// Each test hands `Mpsc::new` to the same-named `test_*` driver from
// `crate::testing` and awaits it on a single-threaded executor.
#[cfg(test)]
mod tests {
    use fuchsia_async as fasync;

    use crate::testing::*;

    use super::Mpsc;

    // Runs `test_close_on_drop` against this transport.
    #[fasync::run_singlethreaded(test)]
    async fn close_on_drop() {
        test_close_on_drop(Mpsc::new).await;
    }

    // Runs `test_one_way` against this transport.
    #[fasync::run_singlethreaded(test)]
    async fn one_way() {
        test_one_way(Mpsc::new).await;
    }

    // Runs `test_one_way_nonblocking` against this transport.
    #[fasync::run_singlethreaded(test)]
    async fn one_way_nonblocking() {
        test_one_way_nonblocking(Mpsc::new).await;
    }

    // Runs `test_two_way` against this transport.
    #[fasync::run_singlethreaded(test)]
    async fn two_way() {
        test_two_way(Mpsc::new).await;
    }

    // Runs `test_multiple_two_way` against this transport.
    #[fasync::run_singlethreaded(test)]
    async fn multiple_two_way() {
        test_multiple_two_way(Mpsc::new).await;
    }

    // Runs `test_event` against this transport.
    #[fasync::run_singlethreaded(test)]
    async fn event() {
        test_event(Mpsc::new).await;
    }
}