fidl_next_protocol/fuchsia/
channel.rs1use core::marker::PhantomData;
8use core::mem::replace;
9use core::pin::Pin;
10use core::ptr::NonNull;
11use core::task::{Context, Poll};
12
13use fidl_next_codec::decoder::InternalHandleDecoder;
14use fidl_next_codec::encoder::InternalHandleEncoder;
15use fidl_next_codec::fuchsia::{HandleDecoder, HandleEncoder};
16use fidl_next_codec::{CHUNK_SIZE, Chunk, DecodeError, Decoder, EncodeError, Encoder};
17use fuchsia_async::{RWHandle, ReadableHandle as _};
18use zx::sys::{
19 ZX_ERR_BUFFER_TOO_SMALL, ZX_ERR_PEER_CLOSED, ZX_ERR_SHOULD_WAIT, ZX_OK, zx_channel_read,
20 zx_channel_write, zx_handle_t,
21};
22use zx::{AsHandleRef as _, Channel, HandleBased, NullableHandle, Status};
23
24use crate::{NonBlockingTransport, Transport};
25
26pub struct Shared {
28 channel: RWHandle<Channel>,
29 }
31
32impl Shared {
33 fn new(channel: Channel) -> Self {
34 Self { channel: RWHandle::new(channel) }
35 }
36}
37
38#[derive(Default)]
40pub struct Buffer {
41 handles: Vec<NullableHandle>,
42 chunks: Vec<Chunk>,
43}
44
45impl Buffer {
46 pub fn new() -> Self {
48 Self::default()
49 }
50
51 pub fn handles(&self) -> &[NullableHandle] {
53 &self.handles
54 }
55
56 pub fn bytes(&self) -> Vec<u8> {
58 self.chunks.iter().flat_map(|chunk| chunk.to_le_bytes()).collect()
59 }
60
61 pub fn from_raw(handles: Vec<NullableHandle>, chunks: Vec<Chunk>) -> Self {
63 Self { handles, chunks }
64 }
65
66 pub fn from_raw_bytes(handles: Vec<NullableHandle>, bytes: impl AsRef<[u8]>) -> Self {
68 let bytes = bytes.as_ref();
69 assert!(bytes.len() % CHUNK_SIZE == 0);
70 let chunks = bytes
71 .chunks_exact(CHUNK_SIZE)
72 .map(|c| fidl_next_codec::WireU64(u64::from_le_bytes(c.try_into().unwrap())))
73 .collect();
74 Self::from_raw(handles, chunks)
75 }
76}
77
78impl InternalHandleEncoder for Buffer {
79 #[inline]
80 fn __internal_handle_count(&self) -> usize {
81 self.handles.len()
82 }
83}
84
85impl Encoder for Buffer {
86 #[inline]
87 fn bytes_written(&self) -> usize {
88 Encoder::bytes_written(&self.chunks)
89 }
90
91 #[inline]
92 fn write_zeroes(&mut self, len: usize) {
93 Encoder::write_zeroes(&mut self.chunks, len)
94 }
95
96 #[inline]
97 fn write(&mut self, bytes: &[u8]) {
98 Encoder::write(&mut self.chunks, bytes)
99 }
100
101 #[inline]
102 fn rewrite(&mut self, pos: usize, bytes: &[u8]) {
103 Encoder::rewrite(&mut self.chunks, pos, bytes)
104 }
105}
106
107impl HandleEncoder for Buffer {
108 fn push_handle(&mut self, handle: NullableHandle) -> Result<(), EncodeError> {
109 self.handles.push(handle);
110 Ok(())
111 }
112
113 fn handles_pushed(&self) -> usize {
114 self.handles.len()
115 }
116}
117
118pub struct SendFutureState {
120 buffer: Buffer,
121}
122
/// The receive-side exclusive part of a channel transport.
///
/// It stores no data; the only field is a zero-sized marker.
pub struct Exclusive {
    /// Zero-sized placeholder field.
    _phantom: PhantomData<()>,
}
127
128pub struct RecvFutureState {
130 buffer: Option<Buffer>,
131}
132
133pub struct RecvBuffer {
135 buffer: Buffer,
136 chunks_taken: usize,
137 handles_taken: usize,
138}
139
140impl RecvBuffer {
141 pub fn new(buffer: Buffer) -> Self {
143 Self { buffer, chunks_taken: 0, handles_taken: 0 }
144 }
145}
146
147unsafe impl Decoder for RecvBuffer {
148 fn take_chunks_raw(&mut self, count: usize) -> Result<NonNull<Chunk>, DecodeError> {
149 if count > self.buffer.chunks.len() - self.chunks_taken {
150 return Err(DecodeError::InsufficientData);
151 }
152
153 let chunks = unsafe { self.buffer.chunks.as_mut_ptr().add(self.chunks_taken) };
154 self.chunks_taken += count;
155
156 unsafe { Ok(NonNull::new_unchecked(chunks)) }
157 }
158
159 fn commit(&mut self) {
160 for handle in &mut self.buffer.handles[0..self.handles_taken] {
161 let _ = replace(handle, NullableHandle::invalid()).into_raw();
163 }
164 }
165
166 fn finish(&self) -> Result<(), DecodeError> {
167 if self.chunks_taken != self.buffer.chunks.len() {
168 return Err(DecodeError::ExtraBytes {
169 num_extra: (self.buffer.chunks.len() - self.chunks_taken) * CHUNK_SIZE,
170 });
171 }
172
173 if self.handles_taken != self.buffer.handles.len() {
174 return Err(DecodeError::ExtraHandles {
175 num_extra: self.buffer.handles.len() - self.handles_taken,
176 });
177 }
178
179 Ok(())
180 }
181}
182
183impl InternalHandleDecoder for RecvBuffer {
184 fn __internal_take_handles(&mut self, count: usize) -> Result<(), DecodeError> {
185 if count > self.buffer.handles.len() - self.handles_taken {
186 return Err(DecodeError::InsufficientHandles);
187 }
188
189 for i in self.handles_taken..self.handles_taken + count {
190 let handle = replace(&mut self.buffer.handles[i], NullableHandle::invalid());
191 drop(handle);
192 }
193 self.handles_taken += count;
194
195 Ok(())
196 }
197
198 fn __internal_handles_remaining(&self) -> usize {
199 self.buffer.handles.len() - self.handles_taken
200 }
201}
202
203impl HandleDecoder for RecvBuffer {
204 fn take_raw_handle(&mut self) -> Result<zx_handle_t, DecodeError> {
205 if self.handles_taken >= self.buffer.handles.len() {
206 return Err(DecodeError::InsufficientHandles);
207 }
208
209 let handle = self.buffer.handles[self.handles_taken].raw_handle();
210 self.handles_taken += 1;
211
212 Ok(handle)
213 }
214
215 fn handles_remaining(&mut self) -> usize {
216 self.buffer.handles.len() - self.handles_taken
217 }
218}
219
220impl Transport for Channel {
221 type Error = Status;
222
223 fn split(self) -> (Self::Shared, Self::Exclusive) {
224 (Shared::new(self), Exclusive { _phantom: PhantomData })
225 }
226
227 type Shared = Shared;
228 type SendBuffer = Buffer;
229 type SendFutureState = SendFutureState;
230
231 fn acquire(_: &Self::Shared) -> Self::SendBuffer {
232 Buffer::new()
233 }
234
235 fn begin_send(_: &Self::Shared, buffer: Self::SendBuffer) -> Self::SendFutureState {
236 SendFutureState { buffer }
237 }
238
239 fn poll_send(
240 future_state: Pin<&mut Self::SendFutureState>,
241 _: &mut Context<'_>,
242 shared: &Self::Shared,
243 ) -> Poll<Result<(), Option<Self::Error>>> {
244 Poll::Ready(Self::send_immediately(future_state.get_mut(), shared))
245 }
246
247 type Exclusive = Exclusive;
248 type RecvFutureState = RecvFutureState;
249 type RecvBuffer = RecvBuffer;
250
251 fn begin_recv(_: &Self::Shared, _: &mut Self::Exclusive) -> Self::RecvFutureState {
252 RecvFutureState { buffer: Some(Buffer::new()) }
253 }
254
255 fn poll_recv(
256 mut future_state: Pin<&mut Self::RecvFutureState>,
257 cx: &mut Context<'_>,
258 shared: &Self::Shared,
259 _: &mut Self::Exclusive,
260 ) -> Poll<Result<Self::RecvBuffer, Option<Self::Error>>> {
261 let buffer = future_state.buffer.as_mut().unwrap();
262
263 let mut actual_bytes = 0;
264 let mut actual_handles = 0;
265
266 loop {
267 let result = unsafe {
268 zx_channel_read(
269 shared.channel.get_ref().raw_handle(),
270 0,
271 buffer.chunks.as_mut_ptr().cast(),
272 buffer.handles.as_mut_ptr().cast(),
273 (buffer.chunks.capacity() * CHUNK_SIZE) as u32,
274 buffer.handles.capacity() as u32,
275 &mut actual_bytes,
276 &mut actual_handles,
277 )
278 };
279
280 match result {
281 ZX_OK => {
282 unsafe {
283 buffer.chunks.set_len(actual_bytes as usize / CHUNK_SIZE);
284 buffer.handles.set_len(actual_handles as usize);
285 }
286 return Poll::Ready(Ok(RecvBuffer {
287 buffer: future_state.buffer.take().unwrap(),
288 chunks_taken: 0,
289 handles_taken: 0,
290 }));
291 }
292 ZX_ERR_PEER_CLOSED => return Poll::Ready(Err(None)),
293 ZX_ERR_BUFFER_TOO_SMALL => {
294 let min_chunks = (actual_bytes as usize).div_ceil(CHUNK_SIZE);
295 buffer.chunks.reserve(min_chunks - buffer.chunks.capacity());
296 buffer.handles.reserve(actual_handles as usize - buffer.handles.capacity());
297 }
298 ZX_ERR_SHOULD_WAIT => {
299 if matches!(shared.channel.need_readable(cx)?, Poll::Pending) {
300 return Poll::Pending;
301 }
302 }
303 raw => return Poll::Ready(Err(Some(Status::from_raw(raw)))),
304 }
305 }
306 }
307}
308
309impl NonBlockingTransport for Channel {
310 fn send_immediately(
311 future_state: &mut Self::SendFutureState,
312 shared: &Self::Shared,
313 ) -> Result<(), Option<Self::Error>> {
314 let result = unsafe {
315 zx_channel_write(
316 shared.channel.get_ref().raw_handle(),
317 0,
318 future_state.buffer.chunks.as_ptr().cast::<u8>(),
319 (future_state.buffer.chunks.len() * CHUNK_SIZE) as u32,
320 future_state.buffer.handles.as_ptr().cast(),
321 future_state.buffer.handles.len() as u32,
322 )
323 };
324
325 match result {
326 ZX_OK => {
327 unsafe {
329 future_state.buffer.handles.set_len(0);
330 }
331 Ok(())
332 }
333 ZX_ERR_PEER_CLOSED => Err(None),
334 _ => Err(Some(Status::from_raw(result))),
335 }
336 }
337}
338
// Unit tests for the channel transport: the shared transport test suite run
// over `Channel::create`, plus two tests checking what happens to handles held
// by a `RecvBuffer` when decoding fails or succeeds.
#[cfg(test)]
mod tests {
    use core::mem::MaybeUninit;

    use fidl_next_codec::fuchsia::{HandleDecoder, HandleEncoder, WireHandle};
    use fidl_next_codec::{
        Decode, DecodeError, DecoderExt as _, Encode, EncodeError, EncoderExt as _, FromWire, Slot,
        Unconstrained, Wire, munge,
    };
    use fuchsia_async as fasync;
    use zx::{
        AsHandleRef, Channel, HandleBased as _, Instant, NullableHandle, Signals, WaitResult,
    };

    use crate::fuchsia::channel::{Buffer, RecvBuffer};
    use crate::testing::*;

    // The tests below run the generic transport tests from `crate::testing`
    // using `Channel::create` to make each endpoint pair.

    #[fasync::run_singlethreaded(test)]
    async fn close_on_drop() {
        test_close_on_drop(Channel::create).await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn one_way() {
        test_one_way(Channel::create).await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn one_way_nonblocking() {
        test_one_way_nonblocking(Channel::create).await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn two_way() {
        test_two_way(Channel::create).await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn multiple_two_way() {
        test_multiple_two_way(Channel::create).await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn event() {
        test_event(Channel::create).await;
    }

    /// Natural (owned) form of the test message: a handle followed by a bool.
    struct HandleAndBoolean {
        handle: NullableHandle,
        boolean: bool,
    }

    /// Wire form of `HandleAndBoolean`.
    #[derive(Debug)]
    #[repr(C)]
    struct WireHandleAndBoolean {
        handle: WireHandle,
        boolean: bool,
    }

    impl Unconstrained for WireHandleAndBoolean {}

    unsafe impl Wire for WireHandleAndBoolean {
        type Owned<'de> = Self;

        // Zeroes the entire value, which covers any padding bytes.
        fn zero_padding(out: &mut MaybeUninit<Self>) {
            unsafe {
                out.as_mut_ptr().write_bytes(0, 1);
            }
        }
    }

    unsafe impl<E: HandleEncoder + ?Sized> Encode<WireHandleAndBoolean, E> for HandleAndBoolean {
        // Encodes each field into its slot, handle first and then boolean.
        fn encode(
            self,
            encoder: &mut E,
            out: &mut MaybeUninit<WireHandleAndBoolean>,
            _: (),
        ) -> Result<(), EncodeError> {
            munge!(let WireHandleAndBoolean { handle, boolean } = out);
            self.handle.encode(encoder, handle, ())?;
            self.boolean.encode(encoder, boolean, ())?;
            Ok(())
        }
    }

    unsafe impl<D: HandleDecoder + ?Sized> Decode<D> for WireHandleAndBoolean {
        // Decodes the handle before the boolean, so a bad boolean is only
        // detected after the handle has already been decoded.
        fn decode(slot: Slot<'_, Self>, decoder: &mut D, _: ()) -> Result<(), DecodeError> {
            munge!(let Self { handle, boolean } = slot);
            Decode::decode(handle, decoder, ())?;
            Decode::decode(boolean, decoder, ())?;
            Ok(())
        }
    }

    impl FromWire<WireHandleAndBoolean> for HandleAndBoolean {
        fn from_wire(wire: WireHandleAndBoolean) -> Self {
            Self { handle: NullableHandle::from_wire(wire.handle), boolean: wire.boolean }
        }
    }

    /// When decoding fails partway through, the handle must stay owned by the
    /// `RecvBuffer` and only be closed when the buffer itself is dropped.
    #[test]
    fn partial_decode_drops_handles() {
        let (encode_end, check_end) = Channel::create();

        let mut buffer = Buffer::new();
        buffer
            .encode_next(HandleAndBoolean { handle: encode_end.into_handle(), boolean: false }, ())
            .expect("encoding should succeed");
        // Corrupt the encoded message by setting a bit in the upper half of
        // the first chunk, so that decoding the boolean fails (see the
        // `expect_err` below).
        *buffer.chunks[0] |= 0x00000002_00000000;

        let mut recv_buffer = RecvBuffer { buffer, chunks_taken: 0, handles_taken: 0 };
        (&mut recv_buffer)
            .decode_owned::<WireHandleAndBoolean>()
            .expect_err("decoding an invalid boolean should fail");

        // The failed decode must not have closed the handle: the peer-closed
        // signal is not asserted on the other end yet.
        assert_eq!(
            check_end.wait_handle(Signals::CHANNEL_PEER_CLOSED, Instant::INFINITE_PAST),
            WaitResult::TimedOut(Signals::CHANNEL_WRITABLE),
        );

        drop(recv_buffer);

        // Dropping the buffer closes the handle it still held.
        assert_eq!(
            check_end.wait_handle(Signals::CHANNEL_PEER_CLOSED, Instant::INFINITE_PAST),
            WaitResult::Ok(Signals::CHANNEL_PEER_CLOSED),
        );
    }

    /// When decoding succeeds, the handle must live on in the decoded value
    /// and only be closed when that value is dropped.
    #[test]
    fn complete_decode_moves_handles() {
        let (encode_end, check_end) = Channel::create();

        let mut buffer = Buffer::new();
        buffer
            .encode_next(HandleAndBoolean { handle: encode_end.into_handle(), boolean: false }, ())
            .expect("encoding should succeed");

        let recv_buffer = RecvBuffer { buffer, chunks_taken: 0, handles_taken: 0 };
        let decoded =
            recv_buffer.decode::<WireHandleAndBoolean>().expect("decoding should succeed");

        // The handle is still open while the decoded value is alive.
        assert_eq!(
            check_end.wait_handle(Signals::CHANNEL_PEER_CLOSED, Instant::INFINITE_PAST),
            WaitResult::TimedOut(Signals::CHANNEL_WRITABLE),
        );

        drop(decoded);

        // Dropping the decoded value closes the handle.
        assert_eq!(
            check_end.wait_handle(Signals::CHANNEL_PEER_CLOSED, Instant::INFINITE_PAST),
            WaitResult::Ok(Signals::CHANNEL_PEER_CLOSED),
        );
    }
}