fidl_next_protocol/fuchsia/
channel.rs1use core::marker::PhantomData;
8use core::mem::replace;
9use core::pin::Pin;
10use core::slice;
11use core::task::{Context, Poll};
12
13use fidl_next_codec::decoder::InternalHandleDecoder;
14use fidl_next_codec::encoder::InternalHandleEncoder;
15use fidl_next_codec::fuchsia::{HandleDecoder, HandleEncoder};
16use fidl_next_codec::{AsDecoder, CHUNK_SIZE, Chunk, DecodeError, Decoder, EncodeError, Encoder};
17use fuchsia_async::{RWHandle, ReadableHandle as _};
18use zx::sys::{
19 ZX_ERR_BUFFER_TOO_SMALL, ZX_ERR_PEER_CLOSED, ZX_ERR_SHOULD_WAIT, ZX_OK, zx_channel_read,
20 zx_channel_write, zx_handle_t,
21};
22use zx::{Channel, NullableHandle, Status};
23
24use crate::{NonBlockingTransport, Transport};
25
26pub struct Shared {
28 channel: RWHandle<Channel>,
29 }
31
32impl Shared {
33 fn new(channel: Channel) -> Self {
34 Self { channel: RWHandle::new(channel) }
35 }
36}
37
38#[derive(Default)]
40pub struct Buffer {
41 pub chunks: Vec<Chunk>,
43 pub handles: Vec<NullableHandle>,
45}
46
47impl Buffer {
48 pub fn new() -> Self {
50 Self::default()
51 }
52}
53
54impl InternalHandleEncoder for Buffer {
55 #[inline]
56 fn __internal_handle_count(&self) -> usize {
57 self.handles.len()
58 }
59}
60
61impl Encoder for Buffer {
62 #[inline]
63 fn bytes_written(&self) -> usize {
64 Encoder::bytes_written(&self.chunks)
65 }
66
67 #[inline]
68 fn write_zeroes(&mut self, len: usize) {
69 Encoder::write_zeroes(&mut self.chunks, len)
70 }
71
72 #[inline]
73 fn write(&mut self, bytes: &[u8]) {
74 Encoder::write(&mut self.chunks, bytes)
75 }
76
77 #[inline]
78 fn rewrite(&mut self, pos: usize, bytes: &[u8]) {
79 Encoder::rewrite(&mut self.chunks, pos, bytes)
80 }
81}
82
83impl HandleEncoder for Buffer {
84 fn push_handle(&mut self, handle: NullableHandle) -> Result<(), EncodeError> {
85 self.handles.push(handle);
86 Ok(())
87 }
88
89 fn handles_pushed(&self) -> usize {
90 self.handles.len()
91 }
92}
93
94unsafe impl<'de> AsDecoder<'de> for Buffer {
97 type Decoder = BufferDecoder<'de>;
98
99 fn as_decoder(&'de mut self) -> Self::Decoder {
100 BufferDecoder { buffer: self, chunks_taken: 0, handles_taken: 0 }
101 }
102}
103
104pub struct SendFutureState {
106 buffer: Buffer,
107}
108
/// The exclusive (receiver-side) part of a channel transport.
///
/// It carries no data; the channel itself lives in `Shared`.
pub struct Exclusive {
    /// Zero-sized placeholder field.
    _phantom: PhantomData<()>,
}
113
114pub struct RecvFutureState {
116 buffer: Option<Buffer>,
117}
118
119pub struct BufferDecoder<'de> {
121 buffer: &'de mut Buffer,
122 chunks_taken: usize,
123 handles_taken: usize,
124}
125
126impl InternalHandleDecoder for BufferDecoder<'_> {
127 fn __internal_take_handles(&mut self, count: usize) -> Result<(), DecodeError> {
128 if count > self.buffer.handles.len() - self.handles_taken {
129 return Err(DecodeError::InsufficientHandles);
130 }
131
132 for i in self.handles_taken..self.handles_taken + count {
133 let handle = replace(&mut self.buffer.handles[i], NullableHandle::invalid());
134 drop(handle);
135 }
136 self.handles_taken += count;
137
138 Ok(())
139 }
140
141 fn __internal_handles_remaining(&self) -> usize {
142 self.buffer.handles.len() - self.handles_taken
143 }
144}
145
146impl<'de> Decoder<'de> for BufferDecoder<'de> {
147 fn take_chunks(&mut self, count: usize) -> Result<&'de mut [Chunk], DecodeError> {
148 if count > self.buffer.chunks.len() - self.chunks_taken {
149 return Err(DecodeError::InsufficientData);
150 }
151
152 let chunks = unsafe { self.buffer.chunks.as_mut_ptr().add(self.chunks_taken) };
153 self.chunks_taken += count;
154
155 unsafe { Ok(slice::from_raw_parts_mut(chunks, count)) }
156 }
157
158 fn commit(&mut self) {
159 for handle in &mut self.buffer.handles[0..self.handles_taken] {
160 let _ = replace(handle, NullableHandle::invalid()).into_raw();
162 }
163 }
164
165 fn finish(&self) -> Result<(), DecodeError> {
166 if self.chunks_taken != self.buffer.chunks.len() {
167 return Err(DecodeError::ExtraBytes {
168 num_extra: (self.buffer.chunks.len() - self.chunks_taken) * CHUNK_SIZE,
169 });
170 }
171
172 if self.handles_taken != self.buffer.handles.len() {
173 return Err(DecodeError::ExtraHandles {
174 num_extra: self.buffer.handles.len() - self.handles_taken,
175 });
176 }
177
178 Ok(())
179 }
180}
181
182impl HandleDecoder for BufferDecoder<'_> {
183 fn take_raw_handle(&mut self) -> Result<zx_handle_t, DecodeError> {
184 if self.handles_taken >= self.buffer.handles.len() {
185 return Err(DecodeError::InsufficientHandles);
186 }
187
188 let handle = self.buffer.handles[self.handles_taken].raw_handle();
189 self.handles_taken += 1;
190
191 Ok(handle)
192 }
193
194 fn handles_remaining(&mut self) -> usize {
195 self.buffer.handles.len() - self.handles_taken
196 }
197}
198
199impl Transport for Channel {
200 type Error = Status;
201
202 fn split(self) -> (Self::Shared, Self::Exclusive) {
203 (Shared::new(self), Exclusive { _phantom: PhantomData })
204 }
205
206 type Shared = Shared;
207 type SendBuffer = Buffer;
208 type SendFutureState = SendFutureState;
209
210 fn acquire(_: &Self::Shared) -> Self::SendBuffer {
211 Buffer::new()
212 }
213
214 fn begin_send(_: &Self::Shared, buffer: Self::SendBuffer) -> Self::SendFutureState {
215 SendFutureState { buffer }
216 }
217
218 fn poll_send(
219 future_state: Pin<&mut Self::SendFutureState>,
220 _: &mut Context<'_>,
221 shared: &Self::Shared,
222 ) -> Poll<Result<(), Option<Self::Error>>> {
223 Poll::Ready(Self::send_immediately(future_state.get_mut(), shared))
224 }
225
226 type Exclusive = Exclusive;
227 type RecvFutureState = RecvFutureState;
228 type RecvBuffer = Buffer;
229
230 fn begin_recv(_: &Self::Shared, _: &mut Self::Exclusive) -> Self::RecvFutureState {
231 RecvFutureState { buffer: Some(Buffer::new()) }
232 }
233
234 fn poll_recv(
235 mut future_state: Pin<&mut Self::RecvFutureState>,
236 cx: &mut Context<'_>,
237 shared: &Self::Shared,
238 _: &mut Self::Exclusive,
239 ) -> Poll<Result<Self::RecvBuffer, Option<Self::Error>>> {
240 let buffer = future_state.buffer.as_mut().unwrap();
241
242 let mut actual_bytes = 0;
243 let mut actual_handles = 0;
244
245 loop {
246 let result = unsafe {
247 zx_channel_read(
248 shared.channel.get_ref().raw_handle(),
249 0,
250 buffer.chunks.as_mut_ptr().cast(),
251 buffer.handles.as_mut_ptr().cast(),
252 (buffer.chunks.capacity() * CHUNK_SIZE) as u32,
253 buffer.handles.capacity() as u32,
254 &mut actual_bytes,
255 &mut actual_handles,
256 )
257 };
258
259 match result {
260 ZX_OK => {
261 unsafe {
262 buffer.chunks.set_len(actual_bytes as usize / CHUNK_SIZE);
263 buffer.handles.set_len(actual_handles as usize);
264 }
265 return Poll::Ready(Ok(future_state.buffer.take().unwrap()));
266 }
267 ZX_ERR_PEER_CLOSED => return Poll::Ready(Err(None)),
268 ZX_ERR_BUFFER_TOO_SMALL => {
269 let min_chunks = (actual_bytes as usize).div_ceil(CHUNK_SIZE);
270 buffer.chunks.reserve(min_chunks - buffer.chunks.capacity());
271 buffer.handles.reserve(actual_handles as usize - buffer.handles.capacity());
272 }
273 ZX_ERR_SHOULD_WAIT => {
274 if matches!(shared.channel.need_readable(cx)?, Poll::Pending) {
275 return Poll::Pending;
276 }
277 }
278 raw => return Poll::Ready(Err(Some(Status::from_raw(raw)))),
279 }
280 }
281 }
282}
283
284impl NonBlockingTransport for Channel {
285 fn send_immediately(
286 future_state: &mut Self::SendFutureState,
287 shared: &Self::Shared,
288 ) -> Result<(), Option<Self::Error>> {
289 let result = unsafe {
290 zx_channel_write(
291 shared.channel.get_ref().raw_handle(),
292 0,
293 future_state.buffer.chunks.as_ptr().cast::<u8>(),
294 (future_state.buffer.chunks.len() * CHUNK_SIZE) as u32,
295 future_state.buffer.handles.as_ptr().cast(),
296 future_state.buffer.handles.len() as u32,
297 )
298 };
299
300 match result {
301 ZX_OK => {
302 unsafe {
304 future_state.buffer.handles.set_len(0);
305 }
306 Ok(())
307 }
308 ZX_ERR_PEER_CLOSED => Err(None),
309 _ => Err(Some(Status::from_raw(result))),
310 }
311 }
312}
313
#[cfg(test)]
mod tests {
    use fidl_next_codec::{AsDecoder as _, DecoderExt as _, EncoderExt as _};
    use fuchsia_async as fasync;
    use zx::{Channel, HandleBased as _, Instant, NullableHandle, Signals, WaitResult};

    use crate::fuchsia::channel::Buffer;
    use crate::testing::*;

    /// Waits on `check_end` for `CHANNEL_PEER_CLOSED` with a deadline of
    /// `Instant::INFINITE_PAST` and returns the wait result.
    fn poll_peer_closed(check_end: &Channel) -> WaitResult {
        check_end.wait_one(Signals::CHANNEL_PEER_CLOSED, Instant::INFINITE_PAST)
    }

    #[fasync::run_singlethreaded(test)]
    async fn close_on_drop() {
        test_close_on_drop(Channel::create).await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn one_way() {
        test_one_way(Channel::create).await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn one_way_nonblocking() {
        test_one_way_nonblocking(Channel::create).await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn two_way() {
        test_two_way(Channel::create).await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn multiple_two_way() {
        test_multiple_two_way(Channel::create).await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn event() {
        test_event(Channel::create).await;
    }

    /// Natural (owned) form of the test type: a handle followed by a boolean.
    struct HandleAndBoolean {
        handle: NullableHandle,
        boolean: bool,
    }

    /// Hand-written wire form of `HandleAndBoolean` and its codec impls.
    mod wire {
        use core::mem::MaybeUninit;

        use fidl_next_codec::fuchsia::{HandleDecoder, HandleEncoder};
        use fidl_next_codec::{
            Constrained, Decode, DecodeError, Encode, EncodeError, FromWire, Slot, ValidationError,
            Wire, munge, wire,
        };

        #[derive(Debug)]
        #[repr(C)]
        pub struct HandleAndBoolean {
            handle: wire::fuchsia::Handle,
            boolean: bool,
        }

        impl Constrained for HandleAndBoolean {
            type Constraint = ();

            // No constraints apply to this type, so validation always succeeds.
            fn validate(_: Slot<'_, Self>, _: Self::Constraint) -> Result<(), ValidationError> {
                Ok(())
            }
        }

        unsafe impl Wire for HandleAndBoolean {
            type Narrowed<'de> = Self;

            fn zero_padding(out: &mut MaybeUninit<Self>) {
                // Zero the whole value, which also zeroes any padding bytes.
                // SAFETY: `out` is a valid, exclusive pointer to exactly one `Self`.
                unsafe {
                    out.as_mut_ptr().write_bytes(0, 1);
                }
            }
        }

        unsafe impl<E: HandleEncoder + ?Sized> Encode<HandleAndBoolean, E> for super::HandleAndBoolean {
            fn encode(
                self,
                encoder: &mut E,
                out: &mut MaybeUninit<HandleAndBoolean>,
                _: (),
            ) -> Result<(), EncodeError> {
                munge!(let HandleAndBoolean { handle, boolean } = out);
                // Encode the fields in declaration order: handle first, then boolean.
                self.handle.encode(encoder, handle, ())?;
                self.boolean.encode(encoder, boolean, ())
            }
        }

        unsafe impl<D: HandleDecoder + ?Sized> Decode<D> for HandleAndBoolean {
            fn decode(slot: Slot<'_, Self>, decoder: &mut D, _: ()) -> Result<(), DecodeError> {
                munge!(let Self { handle, boolean } = slot);
                // Decode the fields in declaration order: handle first, then boolean.
                Decode::decode(handle, decoder, ())?;
                Decode::decode(boolean, decoder, ())
            }
        }

        impl FromWire<HandleAndBoolean> for super::HandleAndBoolean {
            fn from_wire(wire: HandleAndBoolean) -> Self {
                let handle = zx::NullableHandle::from_wire(wire.handle);
                let boolean = wire.boolean;
                Self { handle, boolean }
            }
        }
    }

    /// A decode that fails after taking a handle must leave the handle owned by the buffer:
    /// the peer stays open until the buffer itself is dropped.
    #[test]
    fn partial_decode_drops_handles() {
        let (encode_end, check_end) = Channel::create();

        let value = HandleAndBoolean { handle: encode_end.into_handle(), boolean: false };
        let mut buffer = Buffer::encode(value).expect("encoding should succeed");
        // Corrupt the encoded message so that the boolean no longer decodes.
        *buffer.chunks[0] |= 0x00000002_00000000;

        let mut decoder = buffer.as_decoder();
        decoder
            .decode::<wire::HandleAndBoolean>()
            .expect_err("decoding an invalid boolean should fail");

        // The handle is still alive inside the buffer, so the peer is not closed yet.
        let before_drop = poll_peer_closed(&check_end);
        assert_eq!(before_drop, WaitResult::TimedOut(Signals::CHANNEL_WRITABLE));

        drop(buffer);

        // Dropping the buffer closes the handle, which the other end observes.
        let after_drop = poll_peer_closed(&check_end);
        assert_eq!(after_drop, WaitResult::Ok(Signals::CHANNEL_PEER_CLOSED));
    }

    /// A successful decode must move the handle into the decoded value: the peer stays open
    /// until the decoded value is dropped.
    #[test]
    fn complete_decode_moves_handles() {
        let (encode_end, check_end) = Channel::create();

        let value = HandleAndBoolean { handle: encode_end.into_handle(), boolean: false };
        let mut buffer = Buffer::encode(value).expect("encoding should succeed");

        let mut decoder = buffer.as_decoder();
        let decoded = decoder.decode::<wire::HandleAndBoolean>().expect("decoding should succeed");

        // The decoded value now holds the handle, so the peer is not closed yet.
        let before_drop = poll_peer_closed(&check_end);
        assert_eq!(before_drop, WaitResult::TimedOut(Signals::CHANNEL_WRITABLE));

        drop(decoded);

        // Dropping the decoded value closes the handle, which the other end observes.
        let after_drop = poll_peer_closed(&check_end);
        assert_eq!(after_drop, WaitResult::Ok(Signals::CHANNEL_PEER_CLOSED));
    }
}