fidl_next_protocol/fuchsia/
channel.rs1use core::marker::PhantomData;
8use core::mem::replace;
9use core::pin::Pin;
10use core::ptr::NonNull;
11use core::task::{Context, Poll};
12
13use fidl_next_codec::decoder::InternalHandleDecoder;
14use fidl_next_codec::encoder::InternalHandleEncoder;
15use fidl_next_codec::fuchsia::{HandleDecoder, HandleEncoder};
16use fidl_next_codec::{CHUNK_SIZE, Chunk, DecodeError, Decoder, EncodeError, Encoder};
17use fuchsia_async::{RWHandle, ReadableHandle as _};
18use zx::sys::{
19 ZX_ERR_BUFFER_TOO_SMALL, ZX_ERR_PEER_CLOSED, ZX_ERR_SHOULD_WAIT, ZX_OK, zx_channel_read,
20 zx_channel_write, zx_handle_t,
21};
22use zx::{AsHandleRef as _, Channel, Handle, HandleBased, Status};
23
24use crate::{NonBlockingTransport, Transport};
25
26pub struct Shared {
28 channel: RWHandle<Channel>,
29 }
31
32impl Shared {
33 fn new(channel: Channel) -> Self {
34 Self { channel: RWHandle::new(channel) }
35 }
36}
37
38#[derive(Default)]
40pub struct Buffer {
41 handles: Vec<Handle>,
42 chunks: Vec<Chunk>,
43}
44
45impl Buffer {
46 pub fn new() -> Self {
48 Self::default()
49 }
50
51 pub fn handles(&self) -> &[Handle] {
53 &self.handles
54 }
55
56 pub fn bytes(&self) -> Vec<u8> {
58 self.chunks.iter().flat_map(|chunk| chunk.to_le_bytes()).collect()
59 }
60
61 pub fn from_raw(handles: Vec<Handle>, chunks: Vec<Chunk>) -> Self {
63 Self { handles, chunks }
64 }
65
66 pub fn from_raw_bytes(handles: Vec<Handle>, bytes: impl AsRef<[u8]>) -> Self {
68 let bytes = bytes.as_ref();
69 assert!(bytes.len() % CHUNK_SIZE == 0);
70 let chunks = bytes
71 .chunks_exact(CHUNK_SIZE)
72 .map(|c| fidl_next_codec::WireU64(u64::from_le_bytes(c.try_into().unwrap())))
73 .collect();
74 Self::from_raw(handles, chunks)
75 }
76}
77
78impl InternalHandleEncoder for Buffer {
79 #[inline]
80 fn __internal_handle_count(&self) -> usize {
81 self.handles.len()
82 }
83}
84
85impl Encoder for Buffer {
86 #[inline]
87 fn bytes_written(&self) -> usize {
88 Encoder::bytes_written(&self.chunks)
89 }
90
91 #[inline]
92 fn write_zeroes(&mut self, len: usize) {
93 Encoder::write_zeroes(&mut self.chunks, len)
94 }
95
96 #[inline]
97 fn write(&mut self, bytes: &[u8]) {
98 Encoder::write(&mut self.chunks, bytes)
99 }
100
101 #[inline]
102 fn rewrite(&mut self, pos: usize, bytes: &[u8]) {
103 Encoder::rewrite(&mut self.chunks, pos, bytes)
104 }
105}
106
107impl HandleEncoder for Buffer {
108 fn push_handle(&mut self, handle: Handle) -> Result<(), EncodeError> {
109 self.handles.push(handle);
110 Ok(())
111 }
112
113 fn handles_pushed(&self) -> usize {
114 self.handles.len()
115 }
116}
117
118pub struct SendFutureState {
120 buffer: Buffer,
121}
122
/// The exclusive (receive-side) half of a split channel transport.
///
/// It carries no data; it is only passed by `&mut` to the receive functions.
pub struct Exclusive {
    /// Zero-sized placeholder so the struct cannot be constructed outside this module.
    _phantom: PhantomData<()>,
}
127
128pub struct RecvFutureState {
130 buffer: Option<Buffer>,
131}
132
133pub struct RecvBuffer {
135 buffer: Buffer,
136 chunks_taken: usize,
137 handles_taken: usize,
138}
139
140impl RecvBuffer {
141 pub fn new(buffer: Buffer) -> Self {
143 Self { buffer, chunks_taken: 0, handles_taken: 0 }
144 }
145}
146
147unsafe impl Decoder for RecvBuffer {
148 fn take_chunks_raw(&mut self, count: usize) -> Result<NonNull<Chunk>, DecodeError> {
149 if count > self.buffer.chunks.len() - self.chunks_taken {
150 return Err(DecodeError::InsufficientData);
151 }
152
153 let chunks = unsafe { self.buffer.chunks.as_mut_ptr().add(self.chunks_taken) };
154 self.chunks_taken += count;
155
156 unsafe { Ok(NonNull::new_unchecked(chunks)) }
157 }
158
159 fn commit(&mut self) {
160 for handle in &mut self.buffer.handles[0..self.handles_taken] {
161 let _ = replace(handle, Handle::invalid()).into_raw();
163 }
164 }
165
166 fn finish(&self) -> Result<(), DecodeError> {
167 if self.chunks_taken != self.buffer.chunks.len() {
168 return Err(DecodeError::ExtraBytes {
169 num_extra: (self.buffer.chunks.len() - self.chunks_taken) * CHUNK_SIZE,
170 });
171 }
172
173 if self.handles_taken != self.buffer.handles.len() {
174 return Err(DecodeError::ExtraHandles {
175 num_extra: self.buffer.handles.len() - self.handles_taken,
176 });
177 }
178
179 Ok(())
180 }
181}
182
183impl InternalHandleDecoder for RecvBuffer {
184 fn __internal_take_handles(&mut self, count: usize) -> Result<(), DecodeError> {
185 if count > self.buffer.handles.len() - self.handles_taken {
186 return Err(DecodeError::InsufficientHandles);
187 }
188
189 for i in self.handles_taken..self.handles_taken + count {
190 let handle = replace(&mut self.buffer.handles[i], Handle::invalid());
191 drop(handle);
192 }
193 self.handles_taken += count;
194
195 Ok(())
196 }
197
198 fn __internal_handles_remaining(&self) -> usize {
199 self.buffer.handles.len() - self.handles_taken
200 }
201}
202
203impl HandleDecoder for RecvBuffer {
204 fn take_raw_handle(&mut self) -> Result<zx_handle_t, DecodeError> {
205 if self.handles_taken >= self.buffer.handles.len() {
206 return Err(DecodeError::InsufficientHandles);
207 }
208
209 let handle = self.buffer.handles[self.handles_taken].raw_handle();
210 self.handles_taken += 1;
211
212 Ok(handle)
213 }
214
215 fn handles_remaining(&mut self) -> usize {
216 self.buffer.handles.len() - self.handles_taken
217 }
218}
219
220impl Transport for Channel {
221 type Error = Status;
222
223 fn split(self) -> (Self::Shared, Self::Exclusive) {
224 (Shared::new(self), Exclusive { _phantom: PhantomData })
225 }
226
227 type Shared = Shared;
228 type SendBuffer = Buffer;
229 type SendFutureState = SendFutureState;
230
231 fn acquire(_: &Self::Shared) -> Self::SendBuffer {
232 Buffer::new()
233 }
234
235 fn begin_send(_: &Self::Shared, buffer: Self::SendBuffer) -> Self::SendFutureState {
236 SendFutureState { buffer }
237 }
238
239 fn poll_send(
240 future_state: Pin<&mut Self::SendFutureState>,
241 _: &mut Context<'_>,
242 shared: &Self::Shared,
243 ) -> Poll<Result<(), Option<Self::Error>>> {
244 Poll::Ready(Self::send_immediately(future_state.get_mut(), shared))
245 }
246
247 type Exclusive = Exclusive;
248 type RecvFutureState = RecvFutureState;
249 type RecvBuffer = RecvBuffer;
250
251 fn begin_recv(_: &Self::Shared, _: &mut Self::Exclusive) -> Self::RecvFutureState {
252 RecvFutureState { buffer: Some(Buffer::new()) }
253 }
254
255 fn poll_recv(
256 mut future_state: Pin<&mut Self::RecvFutureState>,
257 cx: &mut Context<'_>,
258 shared: &Self::Shared,
259 _: &mut Self::Exclusive,
260 ) -> Poll<Result<Self::RecvBuffer, Option<Self::Error>>> {
261 let buffer = future_state.buffer.as_mut().unwrap();
262
263 let mut actual_bytes = 0;
264 let mut actual_handles = 0;
265
266 loop {
267 let result = unsafe {
268 zx_channel_read(
269 shared.channel.get_ref().raw_handle(),
270 0,
271 buffer.chunks.as_mut_ptr().cast(),
272 buffer.handles.as_mut_ptr().cast(),
273 (buffer.chunks.capacity() * CHUNK_SIZE) as u32,
274 buffer.handles.capacity() as u32,
275 &mut actual_bytes,
276 &mut actual_handles,
277 )
278 };
279
280 match result {
281 ZX_OK => {
282 unsafe {
283 buffer.chunks.set_len(actual_bytes as usize / CHUNK_SIZE);
284 buffer.handles.set_len(actual_handles as usize);
285 }
286 return Poll::Ready(Ok(RecvBuffer {
287 buffer: future_state.buffer.take().unwrap(),
288 chunks_taken: 0,
289 handles_taken: 0,
290 }));
291 }
292 ZX_ERR_PEER_CLOSED => return Poll::Ready(Err(None)),
293 ZX_ERR_BUFFER_TOO_SMALL => {
294 let min_chunks = (actual_bytes as usize).div_ceil(CHUNK_SIZE);
295 buffer.chunks.reserve(min_chunks - buffer.chunks.capacity());
296 buffer.handles.reserve(actual_handles as usize - buffer.handles.capacity());
297 }
298 ZX_ERR_SHOULD_WAIT => {
299 if matches!(shared.channel.need_readable(cx)?, Poll::Pending) {
300 return Poll::Pending;
301 }
302 }
303 raw => return Poll::Ready(Err(Some(Status::from_raw(raw)))),
304 }
305 }
306 }
307}
308
309impl NonBlockingTransport for Channel {
310 fn send_immediately(
311 future_state: &mut Self::SendFutureState,
312 shared: &Self::Shared,
313 ) -> Result<(), Option<Self::Error>> {
314 let result = unsafe {
315 zx_channel_write(
316 shared.channel.get_ref().raw_handle(),
317 0,
318 future_state.buffer.chunks.as_ptr().cast::<u8>(),
319 (future_state.buffer.chunks.len() * CHUNK_SIZE) as u32,
320 future_state.buffer.handles.as_ptr().cast(),
321 future_state.buffer.handles.len() as u32,
322 )
323 };
324
325 match result {
326 ZX_OK => {
327 unsafe {
329 future_state.buffer.handles.set_len(0);
330 }
331 Ok(())
332 }
333 ZX_ERR_PEER_CLOSED => Err(None),
334 _ => Err(Some(Status::from_raw(result))),
335 }
336 }
337}
338
#[cfg(test)]
mod tests {
    use core::mem::MaybeUninit;

    use fidl_next_codec::fuchsia::{HandleDecoder, HandleEncoder, WireHandle};
    use fidl_next_codec::{
        Decode, DecodeError, DecoderExt as _, Encodable, Encode, EncodeError, EncoderExt as _,
        FromWire, Slot, Wire, munge,
    };
    use fuchsia_async as fasync;
    use zx::{AsHandleRef, Channel, Handle, HandleBased as _, Instant, Signals, WaitResult};

    use crate::fuchsia::channel::{Buffer, RecvBuffer};
    use crate::testing::*;

    // The shared transport test suite, instantiated for Zircon channels.

    #[fasync::run_singlethreaded(test)]
    async fn close_on_drop() {
        test_close_on_drop(Channel::create).await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn one_way() {
        test_one_way(Channel::create).await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn one_way_nonblocking() {
        test_one_way_nonblocking(Channel::create).await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn two_way() {
        test_two_way(Channel::create).await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn multiple_two_way() {
        test_multiple_two_way(Channel::create).await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn event() {
        test_event(Channel::create).await;
    }

    /// Natural form of a small test message: one handle followed by one boolean.
    struct HandleAndBoolean {
        handle: Handle,
        boolean: bool,
    }

    /// Wire form of [`HandleAndBoolean`].
    #[derive(Debug)]
    #[repr(C)]
    struct WireHandleAndBoolean {
        handle: WireHandle,
        boolean: bool,
    }

    unsafe impl Wire for WireHandleAndBoolean {
        type Decoded<'de> = Self;

        fn zero_padding(out: &mut MaybeUninit<Self>) {
            // Zero the whole value; that covers any padding bytes as well.
            let whole = out.as_mut_ptr();
            unsafe {
                whole.write_bytes(0, 1);
            }
        }
    }

    impl Encodable for HandleAndBoolean {
        type Encoded = WireHandleAndBoolean;
    }

    unsafe impl<E: HandleEncoder + ?Sized> Encode<E> for HandleAndBoolean {
        fn encode(
            self,
            encoder: &mut E,
            out: &mut MaybeUninit<Self::Encoded>,
        ) -> Result<(), EncodeError> {
            let Self { handle: source_handle, boolean: source_boolean } = self;
            munge!(let Self::Encoded { handle, boolean } = out);
            source_handle.encode(encoder, handle)?;
            source_boolean.encode(encoder, boolean)
        }
    }

    unsafe impl<D: HandleDecoder + ?Sized> Decode<D> for WireHandleAndBoolean {
        fn decode(slot: Slot<'_, Self>, decoder: &mut D) -> Result<(), DecodeError> {
            munge!(let Self { handle, boolean } = slot);
            Decode::decode(handle, decoder)?;
            Decode::decode(boolean, decoder)
        }
    }

    impl FromWire<WireHandleAndBoolean> for HandleAndBoolean {
        fn from_wire(wire: WireHandleAndBoolean) -> Self {
            let WireHandleAndBoolean { handle, boolean } = wire;
            Self { handle: Handle::from_wire(handle), boolean }
        }
    }

    /// Checks, without blocking, whether `channel` currently has `CHANNEL_PEER_CLOSED` set.
    fn probe_peer_closed(channel: &Channel) -> WaitResult {
        channel.wait_handle(Signals::CHANNEL_PEER_CLOSED, Instant::INFINITE_PAST)
    }

    #[test]
    fn partial_decode_drops_handles() {
        let (encode_end, check_end) = Channel::create();

        let mut message = Buffer::new();
        let value = HandleAndBoolean { handle: encode_end.into_handle(), boolean: false };
        message.encode_next(value).expect("encoding should succeed");
        // Corrupt the encoded message so that the boolean no longer holds a valid value.
        *message.chunks[0] |= 0x00000002_00000000;

        let mut received = RecvBuffer::new(message);
        (&mut received)
            .decode_owned::<WireHandleAndBoolean>()
            .expect_err("decoding an invalid boolean should fail");

        // The failed decode must not have closed the handle yet.
        assert_eq!(
            probe_peer_closed(&check_end),
            WaitResult::TimedOut(Signals::CHANNEL_WRITABLE),
        );

        drop(received);

        // Dropping the buffer closes the handle it still owns.
        assert_eq!(
            probe_peer_closed(&check_end),
            WaitResult::Ok(Signals::CHANNEL_PEER_CLOSED),
        );
    }

    #[test]
    fn complete_decode_moves_handles() {
        let (encode_end, check_end) = Channel::create();

        let mut message = Buffer::new();
        let value = HandleAndBoolean { handle: encode_end.into_handle(), boolean: false };
        message.encode_next(value).expect("encoding should succeed");

        let received = RecvBuffer::new(message);
        let decoded = received.decode::<WireHandleAndBoolean>().expect("decoding should succeed");

        // The handle is now owned by the decoded value and is still open.
        assert_eq!(
            probe_peer_closed(&check_end),
            WaitResult::TimedOut(Signals::CHANNEL_WRITABLE),
        );

        drop(decoded);

        // Dropping the decoded value closes the handle.
        assert_eq!(
            probe_peer_closed(&check_end),
            WaitResult::Ok(Signals::CHANNEL_PEER_CLOSED),
        );
    }
}