fidl_next_protocol/
mpsc.rs1use core::fmt;
8use core::marker::PhantomData;
9use core::mem::{ManuallyDrop, take};
10use core::pin::Pin;
11use core::ptr::NonNull;
12use core::task::{Context, Poll};
13
14use fidl_next_codec::decoder::InternalHandleDecoder;
15use fidl_next_codec::{CHUNK_SIZE, Chunk, DecodeError, Decoder};
16
17use crate::concurrency::future::AtomicWaker;
18use crate::concurrency::sync::atomic::{AtomicBool, Ordering};
19use crate::concurrency::sync::{Arc, mpsc};
20
21use crate::{NonBlockingTransport, Transport};
22
/// A transport that transfers messages over a pair of in-process MPSC
/// channels, one per direction.
///
/// Endpoints are created in connected pairs by [`Mpsc::new`]; a message sent
/// on one endpoint is received by its peer.
pub struct Mpsc {
    // State shared with the peer endpoint, plus this endpoint's sender.
    shared: Shared,
    // The receiving half, used exclusively by this endpoint.
    exclusive: Exclusive,
}
28
29impl Mpsc {
30 pub fn new() -> (Self, Self) {
32 let state = Arc::new(State {
33 send_wakers: [AtomicWaker::new(), AtomicWaker::new()],
34 is_closed: AtomicBool::new(false),
35 });
36 let (a_send, a_recv) = mpsc::channel();
37 let (b_send, b_recv) = mpsc::channel();
38 (
39 Mpsc {
40 shared: Shared { state: state.clone(), end: 0, sender: ManuallyDrop::new(a_send) },
41 exclusive: Exclusive { receiver: b_recv },
42 },
43 Mpsc {
44 shared: Shared { state, end: 1, sender: ManuallyDrop::new(b_send) },
45 exclusive: Exclusive { receiver: a_recv },
46 },
47 )
48 }
49}
50
/// The error type for the [`Mpsc`] transport.
///
/// This enum is uninhabited: the in-process MPSC transport defines no
/// transport-level errors, so values of this type can never be constructed.
#[derive(Clone, Debug)]
pub enum Error {}
54
impl fmt::Display for Error {
    fn fmt(&self, _: &mut fmt::Formatter<'_>) -> fmt::Result {
        // `Error` is uninhabited, so this match is vacuously exhaustive and
        // this method can never actually be called.
        match *self {}
    }
}
60
// Uninhabited type; all trait methods use their defaults.
impl core::error::Error for Error {}
62
/// State shared between the two endpoints of an [`Mpsc`] pair.
struct State {
    // One waker slot per endpoint, indexed by `Shared::end`. The slot at
    // index `e` is registered by the *receiver* on end `1 - e` (see
    // `poll_recv`) and woken by endpoint `e` when it sends or is dropped.
    send_wakers: [AtomicWaker; 2],
    // Set to `true` when either endpoint's `Shared` half is dropped.
    is_closed: AtomicBool,
}
67
/// The shared (sending) half of an [`Mpsc`] endpoint.
pub struct Shared {
    // State shared with the peer endpoint.
    state: Arc<State>,
    // Which endpoint this is (0 or 1); indexes into `State::send_wakers`.
    end: usize,
    // The sender for this endpoint's outgoing channel. Wrapped in
    // `ManuallyDrop` so that `Drop for Shared` controls exactly when the
    // sender is dropped relative to setting `is_closed` and waking the peer.
    sender: ManuallyDrop<mpsc::Sender<Vec<Chunk>>>,
}
74
impl Drop for Shared {
    fn drop(&mut self) {
        // SAFETY: this is the only place the `ManuallyDrop` contents are
        // dropped, and `self.sender` is never accessed afterwards.
        unsafe {
            ManuallyDrop::drop(&mut self.sender);
        }
        // Mark the transport closed, then wake the peer's receiver (which
        // registers itself in `send_wakers[self.end]`, see `poll_recv`) so
        // that any pending receive observes the closure.
        self.state.is_closed.store(true, Ordering::Relaxed);
        self.state.send_wakers[self.end].wake();
    }
}
85
/// The state of a pending send operation: the encoded chunks to transmit.
pub struct SendFutureState {
    // The chunks to send; taken (left empty) when the send completes.
    buffer: Vec<Chunk>,
}
90
/// The exclusive (receiving) half of an [`Mpsc`] endpoint.
pub struct Exclusive {
    // Receives the chunk buffers sent by the peer endpoint.
    receiver: mpsc::Receiver<Vec<Chunk>>,
}
95
/// The state of a pending receive operation.
///
/// Receives carry no per-operation state — all of the work happens inside
/// `poll_recv` — so this is an empty placeholder type.
pub struct RecvFutureState {
    _phantom: PhantomData<()>,
}
100
/// A buffer of received chunks, decodable via the [`Decoder`] impl below.
pub struct RecvBuffer {
    // The chunks received from the peer.
    chunks: Vec<Chunk>,
    // How many leading chunks have already been handed out by
    // `take_chunks_raw`.
    chunks_taken: usize,
}
106
impl InternalHandleDecoder for RecvBuffer {
    fn __internal_take_handles(&mut self, _: usize) -> Result<(), DecodeError> {
        // This transport carries no handles, so any request for handles
        // fails.
        Err(DecodeError::InsufficientHandles)
    }

    fn __internal_handles_remaining(&self) -> usize {
        // No handles are ever transported.
        0
    }
}
116
// SAFETY: `take_chunks_raw` hands out pointers into `self.chunks`, which stay
// valid for as long as the `RecvBuffer` is alive, and never hands out the
// same chunk twice because `chunks_taken` only advances.
unsafe impl Decoder for RecvBuffer {
    fn take_chunks_raw(&mut self, count: usize) -> Result<NonNull<Chunk>, DecodeError> {
        // `chunks_taken <= chunks.len()` always holds, so this subtraction
        // cannot underflow.
        if count > self.chunks.len() - self.chunks_taken {
            return Err(DecodeError::InsufficientData);
        }

        // SAFETY: `chunks_taken` is within bounds, so the offset stays inside
        // (or one past the end of) the `Vec`'s allocation.
        let chunks = unsafe { self.chunks.as_mut_ptr().add(self.chunks_taken) };
        self.chunks_taken += count;

        // SAFETY: pointers derived from a `Vec`'s buffer are non-null.
        unsafe { Ok(NonNull::new_unchecked(chunks)) }
    }

    fn commit(&mut self) {
        // Nothing to do: chunks are consumed eagerly in `take_chunks_raw`.
    }

    fn finish(&self) -> Result<(), DecodeError> {
        // Decoding must consume every received chunk; leftovers are an error.
        if self.chunks_taken != self.chunks.len() {
            return Err(DecodeError::ExtraBytes {
                num_extra: (self.chunks.len() - self.chunks_taken) * CHUNK_SIZE,
            });
        }

        Ok(())
    }
}
143
impl Transport for Mpsc {
    type Error = Error;

    fn split(self) -> (Self::Shared, Self::Exclusive) {
        (self.shared, self.exclusive)
    }

    type Shared = Shared;
    type SendBuffer = Vec<Chunk>;
    type SendFutureState = SendFutureState;

    fn acquire(_: &Self::Shared) -> Self::SendBuffer {
        // Send buffers are plain chunk vectors; start empty.
        Vec::new()
    }

    fn begin_send(_: &Self::Shared, buffer: Self::SendBuffer) -> Self::SendFutureState {
        SendFutureState { buffer }
    }

    fn poll_send(
        future_state: Pin<&mut SendFutureState>,
        _: &mut Context<'_>,
        shared: &Self::Shared,
    ) -> Poll<Result<(), Option<Error>>> {
        // Sending on an in-process channel never blocks, so sends always
        // complete immediately; delegate to the non-blocking path.
        Poll::Ready(Self::send_immediately(future_state.get_mut(), shared))
    }

    type Exclusive = Exclusive;
    type RecvFutureState = RecvFutureState;
    type RecvBuffer = RecvBuffer;

    fn begin_recv(_: &Self::Shared, _: &mut Self::Exclusive) -> Self::RecvFutureState {
        RecvFutureState { _phantom: PhantomData }
    }

    fn poll_recv(
        _: Pin<&mut Self::RecvFutureState>,
        cx: &mut Context<'_>,
        shared: &Self::Shared,
        exclusive: &mut Self::Exclusive,
    ) -> Poll<Result<Self::RecvBuffer, Option<Self::Error>>> {
        // Register the waker *before* checking for closure or polling the
        // channel. This ordering avoids a lost wakeup: if the peer sends or
        // closes after our checks but before we return `Pending`, its `wake`
        // call still finds this waker registered.
        shared.state.send_wakers[1 - shared.end].register_by_ref(cx.waker());
        if shared.state.is_closed.load(Ordering::Relaxed) {
            // An endpoint was dropped; report closure (`Err(None)`).
            return Poll::Ready(Err(None));
        }

        match exclusive.receiver.try_recv() {
            Ok(chunks) => Poll::Ready(Ok(RecvBuffer { chunks, chunks_taken: 0 })),
            Err(mpsc::TryRecvError::Empty) => Poll::Pending,
            // The sending half was dropped; report closure (`Err(None)`).
            Err(mpsc::TryRecvError::Disconnected) => Poll::Ready(Err(None)),
        }
    }
}
197
198impl NonBlockingTransport for Mpsc {
199 fn send_immediately(
200 future_state: &mut Self::SendFutureState,
201 shared: &Self::Shared,
202 ) -> Result<(), Option<Self::Error>> {
203 let chunks = take(&mut future_state.buffer);
204 match shared.sender.send(chunks) {
205 Ok(()) => {
206 shared.state.send_wakers[shared.end].wake();
207 Ok(())
208 }
209 Err(_) => Err(None),
210 }
211 }
212}
213
#[cfg(test)]
mod tests {
    use fuchsia_async as fasync;

    use super::Mpsc;
    use crate::testing::*;

    // Each test drives one of the shared transport test helpers from
    // `crate::testing` against a freshly created `Mpsc` pair.

    #[fasync::run_singlethreaded(test)]
    async fn close_on_drop() {
        test_close_on_drop(Mpsc::new).await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn one_way() {
        test_one_way(Mpsc::new).await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn one_way_nonblocking() {
        test_one_way_nonblocking(Mpsc::new).await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn two_way() {
        test_two_way(Mpsc::new).await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn multiple_two_way() {
        test_multiple_two_way(Mpsc::new).await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn event() {
        test_event(Mpsc::new).await;
    }
}