fidl_next_protocol/
mpsc.rs1use core::fmt;
8use core::mem::{ManuallyDrop, take};
9use core::pin::Pin;
10use core::task::{Context, Poll};
11
12use fidl_next_codec::Chunk;
13
14use crate::concurrency::future::AtomicWaker;
15use crate::concurrency::sync::atomic::{AtomicBool, Ordering};
16use crate::concurrency::sync::{Arc, mpsc};
17
18use crate::{NonBlockingTransport, Transport};
19
20pub struct Mpsc {
22 shared: Shared,
23 exclusive: Exclusive,
24}
25
26impl Mpsc {
27 pub fn new() -> (Self, Self) {
29 let state = Arc::new(State {
30 send_wakers: [AtomicWaker::new(), AtomicWaker::new()],
31 is_closed: AtomicBool::new(false),
32 });
33 let (a_send, a_recv) = mpsc::channel();
34 let (b_send, b_recv) = mpsc::channel();
35 (
36 Mpsc {
37 shared: Shared { state: state.clone(), end: 0, sender: ManuallyDrop::new(a_send) },
38 exclusive: Exclusive { receiver: b_recv },
39 },
40 Mpsc {
41 shared: Shared { state, end: 1, sender: ManuallyDrop::new(b_send) },
42 exclusive: Exclusive { receiver: a_recv },
43 },
44 )
45 }
46}
47
48#[derive(Clone, Debug)]
50pub enum Error {}
51
52impl fmt::Display for Error {
53 fn fmt(&self, _: &mut fmt::Formatter<'_>) -> fmt::Result {
54 match *self {}
55 }
56}
57
58impl core::error::Error for Error {}
59
60struct State {
61 send_wakers: [AtomicWaker; 2],
62 is_closed: AtomicBool,
63}
64
65pub struct Shared {
67 state: Arc<State>,
68 end: usize,
69 sender: ManuallyDrop<mpsc::Sender<Vec<Chunk>>>,
70}
71
72impl Drop for Shared {
73 fn drop(&mut self) {
74 unsafe {
76 ManuallyDrop::drop(&mut self.sender);
77 }
78 self.state.is_closed.store(true, Ordering::Relaxed);
79 self.state.send_wakers[self.end].wake();
80 }
81}
82
83pub struct SendFutureState {
85 buffer: Vec<Chunk>,
86}
87
88pub struct Exclusive {
90 receiver: mpsc::Receiver<Vec<Chunk>>,
91}
92
93impl Transport for Mpsc {
94 type Error = Error;
95
96 fn split(self) -> (Self::Shared, Self::Exclusive) {
97 (self.shared, self.exclusive)
98 }
99
100 type Shared = Shared;
101 type SendBuffer = Vec<Chunk>;
102 type SendFutureState = SendFutureState;
103
104 fn acquire(_: &Self::Shared) -> Self::SendBuffer {
105 Vec::new()
106 }
107
108 fn begin_send(_: &Self::Shared, buffer: Self::SendBuffer) -> Self::SendFutureState {
109 SendFutureState { buffer }
110 }
111
112 fn poll_send(
113 future_state: Pin<&mut SendFutureState>,
114 _: &mut Context<'_>,
115 shared: &Self::Shared,
116 ) -> Poll<Result<(), Option<Error>>> {
117 Poll::Ready(Self::send_immediately(future_state.get_mut(), shared))
118 }
119
120 type Exclusive = Exclusive;
121 type RecvFutureState = ();
122 type RecvBuffer = Vec<Chunk>;
123
124 fn begin_recv(_: &Self::Shared, _: &mut Self::Exclusive) -> Self::RecvFutureState {}
125
126 fn poll_recv(
127 _: Pin<&mut Self::RecvFutureState>,
128 cx: &mut Context<'_>,
129 shared: &Self::Shared,
130 exclusive: &mut Self::Exclusive,
131 ) -> Poll<Result<Self::RecvBuffer, Option<Self::Error>>> {
132 shared.state.send_wakers[1 - shared.end].register_by_ref(cx.waker());
133 if shared.state.is_closed.load(Ordering::Relaxed) {
134 return Poll::Ready(Err(None));
135 }
136
137 match exclusive.receiver.try_recv() {
138 Ok(chunks) => Poll::Ready(Ok(chunks)),
139 Err(mpsc::TryRecvError::Empty) => Poll::Pending,
140 Err(mpsc::TryRecvError::Disconnected) => Poll::Ready(Err(None)),
141 }
142 }
143}
144
145impl NonBlockingTransport for Mpsc {
146 fn send_immediately(
147 future_state: &mut Self::SendFutureState,
148 shared: &Self::Shared,
149 ) -> Result<(), Option<Self::Error>> {
150 let chunks = take(&mut future_state.buffer);
151 match shared.sender.send(chunks) {
152 Ok(()) => {
153 shared.state.send_wakers[shared.end].wake();
154 Ok(())
155 }
156 Err(_) => Err(None),
157 }
158 }
159}
160
161#[cfg(test)]
162mod tests {
163 use fuchsia_async as fasync;
164
165 use super::Mpsc;
166 use crate::testing::*;
167
168 #[fasync::run_singlethreaded(test)]
169 async fn close_on_drop() {
170 test_close_on_drop(Mpsc::new).await;
171 }
172
173 #[fasync::run_singlethreaded(test)]
174 async fn one_way() {
175 test_one_way(Mpsc::new).await;
176 }
177
178 #[fasync::run_singlethreaded(test)]
179 async fn one_way_nonblocking() {
180 test_one_way_nonblocking(Mpsc::new).await;
181 }
182
183 #[fasync::run_singlethreaded(test)]
184 async fn two_way() {
185 test_two_way(Mpsc::new).await;
186 }
187
188 #[fasync::run_singlethreaded(test)]
189 async fn multiple_two_way() {
190 test_multiple_two_way(Mpsc::new).await;
191 }
192
193 #[fasync::run_singlethreaded(test)]
194 async fn event() {
195 test_event(Mpsc::new).await;
196 }
197}