fidl_next_protocol/fuchsia/
channel.rs1use core::marker::PhantomData;
8use core::mem::replace;
9use core::pin::Pin;
10use core::ptr::NonNull;
11use core::task::{Context, Poll};
12
13use fidl_next_codec::decoder::InternalHandleDecoder;
14use fidl_next_codec::encoder::InternalHandleEncoder;
15use fidl_next_codec::fuchsia::{HandleDecoder, HandleEncoder};
16use fidl_next_codec::{CHUNK_SIZE, Chunk, DecodeError, Decoder, EncodeError, Encoder};
17use fuchsia_async::{RWHandle, ReadableHandle as _};
18use zx::sys::{
19 ZX_ERR_BUFFER_TOO_SMALL, ZX_ERR_PEER_CLOSED, ZX_ERR_SHOULD_WAIT, ZX_OK, zx_channel_read,
20 zx_channel_write, zx_handle_t,
21};
22use zx::{AsHandleRef as _, Channel, Handle, HandleBased, Status};
23
24use crate::{NonBlockingTransport, Transport};
25
26pub struct Shared {
28 channel: RWHandle<Channel>,
29 }
31
32impl Shared {
33 fn new(channel: Channel) -> Self {
34 Self { channel: RWHandle::new(channel) }
35 }
36}
37
38#[derive(Default)]
40pub struct Buffer {
41 handles: Vec<Handle>,
42 chunks: Vec<Chunk>,
43}
44
45impl Buffer {
46 pub fn new() -> Self {
48 Self::default()
49 }
50
51 pub fn handles(&self) -> &[Handle] {
53 &self.handles
54 }
55
56 pub fn bytes(&self) -> Vec<u8> {
58 self.chunks.iter().flat_map(|chunk| chunk.to_le_bytes()).collect()
59 }
60}
61
62impl InternalHandleEncoder for Buffer {
63 #[inline]
64 fn __internal_handle_count(&self) -> usize {
65 self.handles.len()
66 }
67}
68
69impl Encoder for Buffer {
70 #[inline]
71 fn bytes_written(&self) -> usize {
72 Encoder::bytes_written(&self.chunks)
73 }
74
75 #[inline]
76 fn write_zeroes(&mut self, len: usize) {
77 Encoder::write_zeroes(&mut self.chunks, len)
78 }
79
80 #[inline]
81 fn write(&mut self, bytes: &[u8]) {
82 Encoder::write(&mut self.chunks, bytes)
83 }
84
85 #[inline]
86 fn rewrite(&mut self, pos: usize, bytes: &[u8]) {
87 Encoder::rewrite(&mut self.chunks, pos, bytes)
88 }
89}
90
91impl HandleEncoder for Buffer {
92 fn push_handle(&mut self, handle: Handle) -> Result<(), EncodeError> {
93 self.handles.push(handle);
94 Ok(())
95 }
96
97 fn handles_pushed(&self) -> usize {
98 self.handles.len()
99 }
100}
101
102pub struct SendFutureState {
104 buffer: Buffer,
105}
106
/// The exclusive part of a channel transport.
///
/// It stores no data; the only field is a zero-sized marker.
pub struct Exclusive {
    _phantom: PhantomData<()>,
}
111
112pub struct RecvFutureState {
114 buffer: Option<Buffer>,
115}
116
117pub struct RecvBuffer {
119 buffer: Buffer,
120 chunks_taken: usize,
121 handles_taken: usize,
122}
123
124unsafe impl Decoder for RecvBuffer {
125 fn take_chunks_raw(&mut self, count: usize) -> Result<NonNull<Chunk>, DecodeError> {
126 if count > self.buffer.chunks.len() - self.chunks_taken {
127 return Err(DecodeError::InsufficientData);
128 }
129
130 let chunks = unsafe { self.buffer.chunks.as_mut_ptr().add(self.chunks_taken) };
131 self.chunks_taken += count;
132
133 unsafe { Ok(NonNull::new_unchecked(chunks)) }
134 }
135
136 fn commit(&mut self) {
137 for handle in &mut self.buffer.handles[0..self.handles_taken] {
138 let _ = replace(handle, Handle::invalid()).into_raw();
140 }
141 }
142
143 fn finish(&self) -> Result<(), DecodeError> {
144 if self.chunks_taken != self.buffer.chunks.len() {
145 return Err(DecodeError::ExtraBytes {
146 num_extra: (self.buffer.chunks.len() - self.chunks_taken) * CHUNK_SIZE,
147 });
148 }
149
150 if self.handles_taken != self.buffer.handles.len() {
151 return Err(DecodeError::ExtraHandles {
152 num_extra: self.buffer.handles.len() - self.handles_taken,
153 });
154 }
155
156 Ok(())
157 }
158}
159
160impl InternalHandleDecoder for RecvBuffer {
161 fn __internal_take_handles(&mut self, count: usize) -> Result<(), DecodeError> {
162 if count > self.buffer.handles.len() - self.handles_taken {
163 return Err(DecodeError::InsufficientHandles);
164 }
165
166 for i in self.handles_taken..self.handles_taken + count {
167 let handle = replace(&mut self.buffer.handles[i], Handle::invalid());
168 drop(handle);
169 }
170 self.handles_taken += count;
171
172 Ok(())
173 }
174
175 fn __internal_handles_remaining(&self) -> usize {
176 self.buffer.handles.len() - self.handles_taken
177 }
178}
179
180impl HandleDecoder for RecvBuffer {
181 fn take_raw_handle(&mut self) -> Result<zx_handle_t, DecodeError> {
182 if self.handles_taken >= self.buffer.handles.len() {
183 return Err(DecodeError::InsufficientHandles);
184 }
185
186 let handle = self.buffer.handles[self.handles_taken].raw_handle();
187 self.handles_taken += 1;
188
189 Ok(handle)
190 }
191
192 fn handles_remaining(&mut self) -> usize {
193 self.buffer.handles.len() - self.handles_taken
194 }
195}
196
197impl Transport for Channel {
198 type Error = Status;
199
200 fn split(self) -> (Self::Shared, Self::Exclusive) {
201 (Shared::new(self), Exclusive { _phantom: PhantomData })
202 }
203
204 type Shared = Shared;
205 type SendBuffer = Buffer;
206 type SendFutureState = SendFutureState;
207
208 fn acquire(_: &Self::Shared) -> Self::SendBuffer {
209 Buffer::new()
210 }
211
212 fn begin_send(_: &Self::Shared, buffer: Self::SendBuffer) -> Self::SendFutureState {
213 SendFutureState { buffer }
214 }
215
216 fn poll_send(
217 future_state: Pin<&mut Self::SendFutureState>,
218 _: &mut Context<'_>,
219 shared: &Self::Shared,
220 ) -> Poll<Result<(), Option<Self::Error>>> {
221 Poll::Ready(Self::send_immediately(future_state.get_mut(), shared))
222 }
223
224 type Exclusive = Exclusive;
225 type RecvFutureState = RecvFutureState;
226 type RecvBuffer = RecvBuffer;
227
228 fn begin_recv(_: &Self::Shared, _: &mut Self::Exclusive) -> Self::RecvFutureState {
229 RecvFutureState { buffer: Some(Buffer::new()) }
230 }
231
232 fn poll_recv(
233 mut future_state: Pin<&mut Self::RecvFutureState>,
234 cx: &mut Context<'_>,
235 shared: &Self::Shared,
236 _: &mut Self::Exclusive,
237 ) -> Poll<Result<Self::RecvBuffer, Option<Self::Error>>> {
238 let buffer = future_state.buffer.as_mut().unwrap();
239
240 let mut actual_bytes = 0;
241 let mut actual_handles = 0;
242
243 loop {
244 let result = unsafe {
245 zx_channel_read(
246 shared.channel.get_ref().raw_handle(),
247 0,
248 buffer.chunks.as_mut_ptr().cast(),
249 buffer.handles.as_mut_ptr().cast(),
250 (buffer.chunks.capacity() * CHUNK_SIZE) as u32,
251 buffer.handles.capacity() as u32,
252 &mut actual_bytes,
253 &mut actual_handles,
254 )
255 };
256
257 match result {
258 ZX_OK => {
259 unsafe {
260 buffer.chunks.set_len(actual_bytes as usize / CHUNK_SIZE);
261 buffer.handles.set_len(actual_handles as usize);
262 }
263 return Poll::Ready(Ok(RecvBuffer {
264 buffer: future_state.buffer.take().unwrap(),
265 chunks_taken: 0,
266 handles_taken: 0,
267 }));
268 }
269 ZX_ERR_PEER_CLOSED => return Poll::Ready(Err(None)),
270 ZX_ERR_BUFFER_TOO_SMALL => {
271 let min_chunks = (actual_bytes as usize).div_ceil(CHUNK_SIZE);
272 buffer.chunks.reserve(min_chunks - buffer.chunks.capacity());
273 buffer.handles.reserve(actual_handles as usize - buffer.handles.capacity());
274 }
275 ZX_ERR_SHOULD_WAIT => {
276 if matches!(shared.channel.need_readable(cx)?, Poll::Pending) {
277 return Poll::Pending;
278 }
279 }
280 raw => return Poll::Ready(Err(Some(Status::from_raw(raw)))),
281 }
282 }
283 }
284}
285
286impl NonBlockingTransport for Channel {
287 fn send_immediately(
288 future_state: &mut Self::SendFutureState,
289 shared: &Self::Shared,
290 ) -> Result<(), Option<Self::Error>> {
291 let result = unsafe {
292 zx_channel_write(
293 shared.channel.get_ref().raw_handle(),
294 0,
295 future_state.buffer.chunks.as_ptr().cast::<u8>(),
296 (future_state.buffer.chunks.len() * CHUNK_SIZE) as u32,
297 future_state.buffer.handles.as_ptr().cast(),
298 future_state.buffer.handles.len() as u32,
299 )
300 };
301
302 match result {
303 ZX_OK => {
304 unsafe {
306 future_state.buffer.handles.set_len(0);
307 }
308 Ok(())
309 }
310 ZX_ERR_PEER_CLOSED => Err(None),
311 _ => Err(Some(Status::from_raw(result))),
312 }
313 }
314}
315
#[cfg(test)]
mod tests {
    use core::mem::MaybeUninit;

    use fidl_next_codec::fuchsia::{HandleDecoder, HandleEncoder, WireHandle};
    use fidl_next_codec::{
        Decode, DecodeError, DecoderExt as _, Encodable, Encode, EncodeError, EncoderExt as _,
        FromWire, Slot, Wire, munge,
    };
    use fuchsia_async as fasync;
    use zx::{AsHandleRef, Channel, Handle, HandleBased as _, Instant, Signals, WaitResult};

    use crate::fuchsia::channel::{Buffer, RecvBuffer};
    use crate::testing::*;

    // The following tests run the shared transport test helpers from
    // `crate::testing` against pairs created by `Channel::create`.

    #[fasync::run_singlethreaded(test)]
    async fn close_on_drop() {
        test_close_on_drop(Channel::create).await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn one_way() {
        test_one_way(Channel::create).await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn one_way_nonblocking() {
        test_one_way_nonblocking(Channel::create).await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn two_way() {
        test_two_way(Channel::create).await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn multiple_two_way() {
        test_multiple_two_way(Channel::create).await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn event() {
        test_event(Channel::create).await;
    }

    /// Natural (owned) form of the test message: one handle and one boolean.
    struct HandleAndBoolean {
        handle: Handle,
        boolean: bool,
    }

    /// Wire form of [`HandleAndBoolean`].
    #[derive(Debug)]
    #[repr(C)]
    struct WireHandleAndBoolean {
        handle: WireHandle,
        boolean: bool,
    }

    unsafe impl Wire for WireHandleAndBoolean {
        type Decoded<'de> = Self;

        /// Zeroes the whole output value, which covers any padding bytes.
        fn zero_padding(out: &mut MaybeUninit<Self>) {
            // SAFETY: `out` is a valid, exclusively borrowed location for
            // exactly one `Self`.
            unsafe {
                core::ptr::write_bytes(out.as_mut_ptr(), 0, 1);
            }
        }
    }

    impl Encodable for HandleAndBoolean {
        type Encoded = WireHandleAndBoolean;
    }

    unsafe impl<E: HandleEncoder + ?Sized> Encode<E> for HandleAndBoolean {
        /// Encodes the handle first, then the boolean, into the matching wire
        /// fields.
        fn encode(
            self,
            encoder: &mut E,
            out: &mut MaybeUninit<Self::Encoded>,
        ) -> Result<(), EncodeError> {
            let Self { handle: natural_handle, boolean: natural_boolean } = self;
            munge!(let Self::Encoded { handle, boolean } = out);
            natural_handle.encode(encoder, handle)?;
            natural_boolean.encode(encoder, boolean)
        }
    }

    unsafe impl<D: HandleDecoder + ?Sized> Decode<D> for WireHandleAndBoolean {
        /// Decodes the handle first, then the boolean.
        fn decode(slot: Slot<'_, Self>, decoder: &mut D) -> Result<(), DecodeError> {
            munge!(let Self { handle, boolean } = slot);
            Decode::decode(handle, decoder)?;
            Decode::decode(boolean, decoder)
        }
    }

    impl FromWire<WireHandleAndBoolean> for HandleAndBoolean {
        fn from_wire(wire: WireHandleAndBoolean) -> Self {
            let WireHandleAndBoolean { handle, boolean } = wire;
            Self { handle: Handle::from_wire(handle), boolean }
        }
    }

    /// Encodes `HandleAndBoolean { handle, boolean: false }` into a new
    /// buffer.
    fn encode_handle_and_false(handle: Handle) -> Buffer {
        let mut buffer = Buffer::new();
        buffer
            .encode_next(HandleAndBoolean { handle, boolean: false })
            .expect("encoding should succeed");
        buffer
    }

    /// Checks `channel` for `CHANNEL_PEER_CLOSED` with a deadline in the
    /// infinite past, so the wait returns immediately.
    fn poll_peer_closed(channel: &Channel) -> WaitResult {
        channel.wait_handle(Signals::CHANNEL_PEER_CLOSED, Instant::INFINITE_PAST)
    }

    #[test]
    fn partial_decode_drops_handles() {
        let (encode_end, check_end) = Channel::create();

        let mut buffer = encode_handle_and_false(encode_end.into_handle());
        // Corrupt the encoded message by setting a bit in the upper half of
        // the first chunk, so that decoding the boolean is expected to fail.
        *buffer.chunks[0] |= 0x00000002_00000000;

        let mut recv_buffer = RecvBuffer { buffer, chunks_taken: 0, handles_taken: 0 };
        (&mut recv_buffer)
            .decode_owned::<WireHandleAndBoolean>()
            .expect_err("decoding an invalid boolean should fail");

        // The failed decode must not have closed the handle: the peer is
        // still open while the receive buffer is alive.
        assert_eq!(poll_peer_closed(&check_end), WaitResult::TimedOut(Signals::CHANNEL_WRITABLE));

        drop(recv_buffer);

        // Dropping the receive buffer closes the handle it still owns.
        assert_eq!(poll_peer_closed(&check_end), WaitResult::Ok(Signals::CHANNEL_PEER_CLOSED));
    }

    #[test]
    fn complete_decode_moves_handles() {
        let (encode_end, check_end) = Channel::create();

        let buffer = encode_handle_and_false(encode_end.into_handle());

        let recv_buffer = RecvBuffer { buffer, chunks_taken: 0, handles_taken: 0 };
        let decoded =
            recv_buffer.decode::<WireHandleAndBoolean>().expect("decoding should succeed");

        // The handle is alive as long as the decoded value is.
        assert_eq!(poll_peer_closed(&check_end), WaitResult::TimedOut(Signals::CHANNEL_WRITABLE));

        drop(decoded);

        // Dropping the decoded value closes the handle.
        assert_eq!(poll_peer_closed(&check_end), WaitResult::Ok(Signals::CHANNEL_PEER_CLOSED));
    }
}