Skip to main content

fidl_next_protocol/
mpsc.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! A basic [`Transport`] implementation based on MPSC channels.
6
7use core::fmt;
8use core::mem::{ManuallyDrop, take};
9use core::pin::Pin;
10use core::task::{Context, Poll};
11
12use fidl_next_codec::Chunk;
13
14use crate::concurrency::future::AtomicWaker;
15use crate::concurrency::sync::atomic::{AtomicBool, Ordering};
16use crate::concurrency::sync::{Arc, mpsc};
17
18use crate::{NonBlockingTransport, Transport};
19
20/// A paired mpsc transport.
21pub struct Mpsc {
22    shared: Shared,
23    exclusive: Exclusive,
24}
25
26impl Mpsc {
27    /// Creates two mpscs which can communicate with each other.
28    pub fn new() -> (Self, Self) {
29        let state = Arc::new(State {
30            send_wakers: [AtomicWaker::new(), AtomicWaker::new()],
31            is_closed: AtomicBool::new(false),
32        });
33        let (a_send, a_recv) = mpsc::channel();
34        let (b_send, b_recv) = mpsc::channel();
35        (
36            Mpsc {
37                shared: Shared { state: state.clone(), end: 0, sender: ManuallyDrop::new(a_send) },
38                exclusive: Exclusive { receiver: b_recv },
39            },
40            Mpsc {
41                shared: Shared { state, end: 1, sender: ManuallyDrop::new(b_send) },
42                exclusive: Exclusive { receiver: a_recv },
43            },
44        )
45    }
46}
47
48/// The error type for paired mpsc transports.
49#[derive(Clone, Debug)]
50pub enum Error {}
51
52impl fmt::Display for Error {
53    fn fmt(&self, _: &mut fmt::Formatter<'_>) -> fmt::Result {
54        match *self {}
55    }
56}
57
58impl core::error::Error for Error {}
59
60struct State {
61    send_wakers: [AtomicWaker; 2],
62    is_closed: AtomicBool,
63}
64
65/// The shared part of a paired mpsc transport.
66pub struct Shared {
67    state: Arc<State>,
68    end: usize,
69    sender: ManuallyDrop<mpsc::Sender<Vec<Chunk>>>,
70}
71
72impl Drop for Shared {
73    fn drop(&mut self) {
74        // Make sure that the mpsc is closed before waking the other end
75        unsafe {
76            ManuallyDrop::drop(&mut self.sender);
77        }
78        self.state.is_closed.store(true, Ordering::Relaxed);
79        self.state.send_wakers[self.end].wake();
80    }
81}
82
83/// The send future for a paired mpsc transport.
84pub struct SendFutureState {
85    buffer: Vec<Chunk>,
86}
87
88/// The exclusive part of a paired mpsc transport.
89pub struct Exclusive {
90    receiver: mpsc::Receiver<Vec<Chunk>>,
91}
92
93impl Transport for Mpsc {
94    type Error = Error;
95
96    fn split(self) -> (Self::Shared, Self::Exclusive) {
97        (self.shared, self.exclusive)
98    }
99
100    type Shared = Shared;
101    type SendBuffer = Vec<Chunk>;
102    type SendFutureState = SendFutureState;
103
104    fn acquire(_: &Self::Shared) -> Self::SendBuffer {
105        Vec::new()
106    }
107
108    fn begin_send(_: &Self::Shared, buffer: Self::SendBuffer) -> Self::SendFutureState {
109        SendFutureState { buffer }
110    }
111
112    fn poll_send(
113        future_state: Pin<&mut SendFutureState>,
114        _: &mut Context<'_>,
115        shared: &Self::Shared,
116    ) -> Poll<Result<(), Option<Error>>> {
117        Poll::Ready(Self::send_immediately(future_state.get_mut(), shared))
118    }
119
120    type Exclusive = Exclusive;
121    type RecvFutureState = ();
122    type RecvBuffer = Vec<Chunk>;
123
124    fn begin_recv(_: &Self::Shared, _: &mut Self::Exclusive) -> Self::RecvFutureState {}
125
126    fn poll_recv(
127        _: Pin<&mut Self::RecvFutureState>,
128        cx: &mut Context<'_>,
129        shared: &Self::Shared,
130        exclusive: &mut Self::Exclusive,
131    ) -> Poll<Result<Self::RecvBuffer, Option<Self::Error>>> {
132        shared.state.send_wakers[1 - shared.end].register_by_ref(cx.waker());
133        if shared.state.is_closed.load(Ordering::Relaxed) {
134            return Poll::Ready(Err(None));
135        }
136
137        match exclusive.receiver.try_recv() {
138            Ok(chunks) => Poll::Ready(Ok(chunks)),
139            Err(mpsc::TryRecvError::Empty) => Poll::Pending,
140            Err(mpsc::TryRecvError::Disconnected) => Poll::Ready(Err(None)),
141        }
142    }
143}
144
145impl NonBlockingTransport for Mpsc {
146    fn send_immediately(
147        future_state: &mut Self::SendFutureState,
148        shared: &Self::Shared,
149    ) -> Result<(), Option<Self::Error>> {
150        let chunks = take(&mut future_state.buffer);
151        match shared.sender.send(chunks) {
152            Ok(()) => {
153                shared.state.send_wakers[shared.end].wake();
154                Ok(())
155            }
156            Err(_) => Err(None),
157        }
158    }
159}
160
161#[cfg(test)]
162mod tests {
163    use fuchsia_async as fasync;
164
165    use super::Mpsc;
166    use crate::testing::*;
167
168    #[fasync::run_singlethreaded(test)]
169    async fn close_on_drop() {
170        test_close_on_drop(Mpsc::new).await;
171    }
172
173    #[fasync::run_singlethreaded(test)]
174    async fn one_way() {
175        test_one_way(Mpsc::new).await;
176    }
177
178    #[fasync::run_singlethreaded(test)]
179    async fn one_way_nonblocking() {
180        test_one_way_nonblocking(Mpsc::new).await;
181    }
182
183    #[fasync::run_singlethreaded(test)]
184    async fn two_way() {
185        test_two_way(Mpsc::new).await;
186    }
187
188    #[fasync::run_singlethreaded(test)]
189    async fn multiple_two_way() {
190        test_multiple_two_way(Mpsc::new).await;
191    }
192
193    #[fasync::run_singlethreaded(test)]
194    async fn event() {
195        test_event(Mpsc::new).await;
196    }
197}