1use fidl::endpoints::{ClientEnd, Proxy};
6use fuchsia_sync::Mutex;
7use futures::stream::{FusedStream, Stream};
8use futures::{Future, TryFutureExt, io};
9use log::warn;
10use std::fmt;
11use std::pin::Pin;
12use std::sync::Arc;
13use std::task::{Context, Poll};
14use {
15 fidl_fuchsia_bluetooth as fidl_bt, fidl_fuchsia_bluetooth_bredr as bredr,
16 fuchsia_async as fasync,
17};
18
19use crate::error::Error;
20
21#[derive(PartialEq, Debug, Clone)]
23pub enum ChannelMode {
24 Basic,
25 EnhancedRetransmissionMode,
26 LeCreditBasedFlowControl,
27 EnhancedCreditBasedFlowControl,
28}
29
30impl fmt::Display for ChannelMode {
31 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
32 match self {
33 ChannelMode::Basic => write!(f, "Basic"),
34 ChannelMode::EnhancedRetransmissionMode => write!(f, "ERTM"),
35 ChannelMode::LeCreditBasedFlowControl => write!(f, "LE_Credit"),
36 ChannelMode::EnhancedCreditBasedFlowControl => write!(f, "Credit"),
37 }
38 }
39}
40
41pub enum A2dpDirection {
42 Normal,
43 Source,
44 Sink,
45}
46
47impl From<A2dpDirection> for bredr::A2dpDirectionPriority {
48 fn from(pri: A2dpDirection) -> Self {
49 match pri {
50 A2dpDirection::Normal => bredr::A2dpDirectionPriority::Normal,
51 A2dpDirection::Source => bredr::A2dpDirectionPriority::Source,
52 A2dpDirection::Sink => bredr::A2dpDirectionPriority::Sink,
53 }
54 }
55}
56
57impl TryFrom<fidl_bt::ChannelMode> for ChannelMode {
58 type Error = Error;
59 fn try_from(fidl: fidl_bt::ChannelMode) -> Result<Self, Error> {
60 match fidl {
61 fidl_bt::ChannelMode::Basic => Ok(ChannelMode::Basic),
62 fidl_bt::ChannelMode::EnhancedRetransmission => {
63 Ok(ChannelMode::EnhancedRetransmissionMode)
64 }
65 fidl_bt::ChannelMode::LeCreditBasedFlowControl => {
66 Ok(ChannelMode::LeCreditBasedFlowControl)
67 }
68 fidl_bt::ChannelMode::EnhancedCreditBasedFlowControl => {
69 Ok(ChannelMode::EnhancedCreditBasedFlowControl)
70 }
71 x => Err(Error::FailedConversion(format!("Unsupported channel mode type: {x:?}"))),
72 }
73 }
74}
75
76impl From<ChannelMode> for fidl_bt::ChannelMode {
77 fn from(x: ChannelMode) -> Self {
78 match x {
79 ChannelMode::Basic => fidl_bt::ChannelMode::Basic,
80 ChannelMode::EnhancedRetransmissionMode => fidl_bt::ChannelMode::EnhancedRetransmission,
81 ChannelMode::LeCreditBasedFlowControl => fidl_bt::ChannelMode::LeCreditBasedFlowControl,
82 ChannelMode::EnhancedCreditBasedFlowControl => {
83 fidl_bt::ChannelMode::EnhancedCreditBasedFlowControl
84 }
85 }
86 }
87}
88
89#[derive(Debug)]
94pub struct Channel {
95 socket: fasync::Socket,
96 mode: ChannelMode,
97 max_tx_size: usize,
98 flush_timeout: Arc<Mutex<Option<zx::MonotonicDuration>>>,
99 audio_direction_ext: Option<bredr::AudioDirectionExtProxy>,
100 l2cap_parameters_ext: Option<bredr::L2capParametersExtProxy>,
101 audio_offload_ext: Option<bredr::AudioOffloadExtProxy>,
102 terminated: bool,
103}
104
105impl Channel {
106 pub fn from_socket(socket: zx::Socket, max_tx_size: usize) -> Result<Self, zx::Status> {
109 Ok(Self::from_socket_infallible(socket, max_tx_size))
110 }
111
112 pub fn from_socket_infallible(socket: zx::Socket, max_tx_size: usize) -> Self {
114 Channel {
115 socket: fasync::Socket::from_socket(socket),
116 mode: ChannelMode::Basic,
117 max_tx_size,
118 flush_timeout: Arc::new(Mutex::new(None)),
119 audio_direction_ext: None,
120 l2cap_parameters_ext: None,
121 audio_offload_ext: None,
122 terminated: false,
123 }
124 }
125
126 pub const DEFAULT_MAX_TX: usize = 672;
129
130 pub fn create() -> (Self, Self) {
133 Self::create_with_max_tx(Self::DEFAULT_MAX_TX)
134 }
135
136 pub fn create_with_max_tx(max_tx_size: usize) -> (Self, Self) {
139 let (remote, local) = zx::Socket::create_datagram();
140 (
141 Channel::from_socket(remote, max_tx_size).unwrap(),
142 Channel::from_socket(local, max_tx_size).unwrap(),
143 )
144 }
145
146 pub fn max_tx_size(&self) -> usize {
149 self.max_tx_size
150 }
151
152 pub fn channel_mode(&self) -> &ChannelMode {
153 &self.mode
154 }
155
156 pub fn flush_timeout(&self) -> Option<zx::MonotonicDuration> {
157 self.flush_timeout.lock().clone()
158 }
159
160 pub fn set_audio_priority(
163 &self,
164 dir: A2dpDirection,
165 ) -> impl Future<Output = Result<(), Error>> + use<> {
166 let proxy = self.audio_direction_ext.clone();
167 async move {
168 match proxy {
169 None => return Err(Error::profile("audio priority not supported")),
170 Some(proxy) => proxy
171 .set_priority(dir.into())
172 .await?
173 .map_err(|e| Error::profile(format!("setting priority failed: {e:?}"))),
174 }
175 }
176 }
177
178 pub fn set_flush_timeout(
185 &self,
186 duration: Option<zx::MonotonicDuration>,
187 ) -> impl Future<Output = Result<Option<zx::MonotonicDuration>, Error>> + use<> {
188 let flush_timeout = self.flush_timeout.clone();
189 let current = self.flush_timeout.lock().clone();
190 let proxy = self.l2cap_parameters_ext.clone();
191 async move {
192 match (current, duration) {
193 (None, None) => return Ok(None),
194 (Some(old), Some(new)) if (old - new).into_millis().abs() < 2 => {
195 return Ok(current);
196 }
197 _ => {}
198 };
199 let proxy =
200 proxy.ok_or_else(|| Error::profile("l2cap parameter changing not supported"))?;
201 let parameters = fidl_bt::ChannelParameters {
202 flush_timeout: duration.clone().map(zx::MonotonicDuration::into_nanos),
203 ..Default::default()
204 };
205 let new_params = proxy.request_parameters(¶meters).await?;
206 let new_timeout = new_params.flush_timeout.map(zx::MonotonicDuration::from_nanos);
207 *(flush_timeout.lock()) = new_timeout.clone();
208 Ok(new_timeout)
209 }
210 }
211
212 pub fn audio_offload(&self) -> Option<bredr::AudioOffloadExtProxy> {
214 self.audio_offload_ext.clone()
215 }
216
217 pub fn closed<'a>(&'a self) -> impl Future<Output = Result<(), zx::Status>> + 'a {
218 let close_signals = zx::Signals::SOCKET_PEER_CLOSED;
219 let close_wait = fasync::OnSignals::new(&self.socket, close_signals);
220 close_wait.map_ok(|_o| ())
221 }
222
223 pub fn is_closed<'a>(&'a self) -> bool {
224 self.socket.is_closed()
225 }
226
227 pub fn poll_datagram(
228 &self,
229 cx: &mut Context<'_>,
230 out: &mut Vec<u8>,
231 ) -> Poll<Result<usize, zx::Status>> {
232 self.socket.poll_datagram(cx, out)
233 }
234
235 pub fn write(&self, bytes: &[u8]) -> Result<usize, zx::Status> {
238 self.socket.as_ref().write(bytes)
239 }
240}
241
242impl TryFrom<fidl_fuchsia_bluetooth_bredr::Channel> for Channel {
243 type Error = zx::Status;
244
245 fn try_from(fidl: bredr::Channel) -> Result<Self, Self::Error> {
246 let channel = match fidl.channel_mode.unwrap_or(fidl_bt::ChannelMode::Basic).try_into() {
247 Err(e) => {
248 warn!("Unsupported channel mode type: {e:?}");
249 return Err(zx::Status::INTERNAL);
250 }
251 Ok(c) => c,
252 };
253
254 Ok(Self {
255 socket: fasync::Socket::from_socket(fidl.socket.ok_or(zx::Status::INVALID_ARGS)?),
256 mode: channel,
257 max_tx_size: fidl.max_tx_sdu_size.ok_or(zx::Status::INVALID_ARGS)? as usize,
258 flush_timeout: Arc::new(Mutex::new(
259 fidl.flush_timeout.map(zx::MonotonicDuration::from_nanos),
260 )),
261 audio_direction_ext: fidl.ext_direction.map(|e| e.into_proxy()),
262 l2cap_parameters_ext: fidl.ext_l2cap.map(|e| e.into_proxy()),
263 audio_offload_ext: fidl.ext_audio_offload.map(|c| c.into_proxy()),
264 terminated: false,
265 })
266 }
267}
268
269impl TryFrom<Channel> for bredr::Channel {
270 type Error = Error;
271
272 fn try_from(channel: Channel) -> Result<Self, Self::Error> {
273 let socket = channel.socket.into_zx_socket();
274 let ext_direction = channel
275 .audio_direction_ext
276 .map(|proxy| {
277 let chan = proxy.into_channel()?;
278 Ok(ClientEnd::new(chan.into()))
279 })
280 .transpose()
281 .map_err(|_: bredr::AudioDirectionExtProxy| {
282 Error::profile("AudioDirection proxy in use")
283 })?;
284 let ext_l2cap = channel
285 .l2cap_parameters_ext
286 .map(|proxy| {
287 let chan = proxy.into_channel()?;
288 Ok(ClientEnd::new(chan.into()))
289 })
290 .transpose()
291 .map_err(|_: bredr::L2capParametersExtProxy| {
292 Error::profile("l2cap parameters proxy in use")
293 })?;
294 let ext_audio_offload = channel
295 .audio_offload_ext
296 .map(|proxy| {
297 let chan = proxy.into_channel()?;
298 Ok(ClientEnd::new(chan.into()))
299 })
300 .transpose()
301 .map_err(|_: bredr::AudioOffloadExtProxy| {
302 Error::profile("audio offload proxy in use")
303 })?;
304 let flush_timeout = channel.flush_timeout.lock().map(zx::MonotonicDuration::into_nanos);
305 Ok(bredr::Channel {
306 socket: Some(socket),
307 channel_mode: Some(channel.mode.into()),
308 max_tx_sdu_size: Some(channel.max_tx_size as u16),
309 ext_direction,
310 flush_timeout,
311 ext_l2cap,
312 ext_audio_offload,
313 ..Default::default()
314 })
315 }
316}
317
318impl Stream for Channel {
319 type Item = Result<Vec<u8>, zx::Status>;
320
321 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
322 if self.terminated {
323 panic!("Channel polled after terminated");
324 }
325
326 let mut res = Vec::<u8>::new();
327 loop {
328 break match self.socket.poll_datagram(cx, &mut res) {
329 Poll::Ready(Ok(0)) => continue,
332 Poll::Ready(Ok(_size)) => Poll::Ready(Some(Ok(res))),
333 Poll::Ready(Err(zx::Status::PEER_CLOSED)) => {
334 self.terminated = true;
335 Poll::Ready(None)
336 }
337 Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))),
338 Poll::Pending => Poll::Pending,
339 };
340 }
341 }
342}
343
344impl FusedStream for Channel {
345 fn is_terminated(&self) -> bool {
346 self.terminated
347 }
348}
349
350impl io::AsyncRead for Channel {
351 fn poll_read(
352 mut self: Pin<&mut Self>,
353 cx: &mut Context<'_>,
354 buf: &mut [u8],
355 ) -> Poll<Result<usize, futures::io::Error>> {
356 Pin::new(&mut self.socket).as_mut().poll_read(cx, buf)
357 }
358}
359
360impl io::AsyncWrite for Channel {
361 fn poll_write(
362 mut self: Pin<&mut Self>,
363 cx: &mut Context<'_>,
364 buf: &[u8],
365 ) -> Poll<Result<usize, io::Error>> {
366 Pin::new(&mut self.socket).as_mut().poll_write(cx, buf)
367 }
368
369 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
370 Pin::new(&mut self.socket).as_mut().poll_flush(cx)
371 }
372
373 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
374 Pin::new(&mut self.socket).as_mut().poll_close(cx)
375 }
376}
377
378#[cfg(test)]
379mod tests {
380 use super::*;
381 use fidl::endpoints::create_request_stream;
382 use futures::{AsyncReadExt, FutureExt, StreamExt};
383 use std::pin::pin;
384
385 #[test]
386 fn test_channel_create_and_write() {
387 let _exec = fasync::TestExecutor::new();
388 let (mut recv, send) = Channel::create();
389
390 let mut buf: [u8; 8] = [0; 8];
391 let read_fut = AsyncReadExt::read(&mut recv, &mut buf);
392
393 let heart: &[u8] = &[0xF0, 0x9F, 0x92, 0x96];
394 assert_eq!(heart.len(), send.write(heart).expect("should write successfully"));
395
396 match read_fut.now_or_never() {
397 Some(Ok(4)) => {}
398 x => panic!("Expected Ok(4) from the read, got {x:?}"),
399 };
400 assert_eq!(heart, &buf[0..4]);
401 }
402
403 #[test]
404 fn test_channel_from_fidl() {
405 let _exec = fasync::TestExecutor::new();
406 let empty = bredr::Channel::default();
407 assert!(Channel::try_from(empty).is_err());
408
409 let (remote, _local) = zx::Socket::create_datagram();
410
411 let okay = bredr::Channel {
412 socket: Some(remote),
413 channel_mode: Some(fidl_bt::ChannelMode::Basic),
414 max_tx_sdu_size: Some(1004),
415 ..Default::default()
416 };
417
418 let chan = Channel::try_from(okay).expect("okay channel to be converted");
419
420 assert_eq!(1004, chan.max_tx_size());
421 assert_eq!(&ChannelMode::Basic, chan.channel_mode());
422 }
423
424 #[test]
425 fn test_channel_closed() {
426 let mut exec = fasync::TestExecutor::new();
427
428 let (recv, send) = Channel::create();
429
430 let closed_fut = recv.closed();
431 let mut closed_fut = pin!(closed_fut);
432
433 assert!(exec.run_until_stalled(&mut closed_fut).is_pending());
434 assert!(!recv.is_closed());
435
436 drop(send);
437
438 assert!(exec.run_until_stalled(&mut closed_fut).is_ready());
439 assert!(recv.is_closed());
440 }
441
442 #[test]
443 fn test_direction_ext() {
444 let mut exec = fasync::TestExecutor::new();
445
446 let (remote, _local) = zx::Socket::create_datagram();
447 let no_ext = bredr::Channel {
448 socket: Some(remote),
449 channel_mode: Some(fidl_bt::ChannelMode::Basic),
450 max_tx_sdu_size: Some(1004),
451 ..Default::default()
452 };
453 let channel = Channel::try_from(no_ext).unwrap();
454
455 assert!(
456 exec.run_singlethreaded(channel.set_audio_priority(A2dpDirection::Normal)).is_err()
457 );
458 assert!(exec.run_singlethreaded(channel.set_audio_priority(A2dpDirection::Sink)).is_err());
459
460 let (remote, _local) = zx::Socket::create_datagram();
461 let (client_end, mut direction_request_stream) =
462 create_request_stream::<bredr::AudioDirectionExtMarker>();
463 let ext = bredr::Channel {
464 socket: Some(remote),
465 channel_mode: Some(fidl_bt::ChannelMode::Basic),
466 max_tx_sdu_size: Some(1004),
467 ext_direction: Some(client_end),
468 ..Default::default()
469 };
470
471 let channel = Channel::try_from(ext).unwrap();
472
473 let audio_direction_fut = channel.set_audio_priority(A2dpDirection::Normal);
474 let mut audio_direction_fut = pin!(audio_direction_fut);
475
476 assert!(exec.run_until_stalled(&mut audio_direction_fut).is_pending());
477
478 match exec.run_until_stalled(&mut direction_request_stream.next()) {
479 Poll::Ready(Some(Ok(bredr::AudioDirectionExtRequest::SetPriority {
480 priority,
481 responder,
482 }))) => {
483 assert_eq!(bredr::A2dpDirectionPriority::Normal, priority);
484 responder.send(Ok(())).expect("response to send cleanly");
485 }
486 x => panic!("Expected a item to be ready on the request stream, got {:?}", x),
487 };
488
489 match exec.run_until_stalled(&mut audio_direction_fut) {
490 Poll::Ready(Ok(())) => {}
491 _x => panic!("Expected ok result from audio direction response"),
492 };
493
494 let audio_direction_fut = channel.set_audio_priority(A2dpDirection::Sink);
495 let mut audio_direction_fut = pin!(audio_direction_fut);
496
497 assert!(exec.run_until_stalled(&mut audio_direction_fut).is_pending());
498
499 match exec.run_until_stalled(&mut direction_request_stream.next()) {
500 Poll::Ready(Some(Ok(bredr::AudioDirectionExtRequest::SetPriority {
501 priority,
502 responder,
503 }))) => {
504 assert_eq!(bredr::A2dpDirectionPriority::Sink, priority);
505 responder
506 .send(Err(fidl_fuchsia_bluetooth::ErrorCode::Failed))
507 .expect("response to send cleanly");
508 }
509 x => panic!("Expected a item to be ready on the request stream, got {:?}", x),
510 };
511
512 match exec.run_until_stalled(&mut audio_direction_fut) {
513 Poll::Ready(Err(_)) => {}
514 _x => panic!("Expected error result from audio direction response"),
515 };
516 }
517
518 #[test]
519 fn test_flush_timeout() {
520 let mut exec = fasync::TestExecutor::new();
521
522 let (remote, _local) = zx::Socket::create_datagram();
523 let no_ext = bredr::Channel {
524 socket: Some(remote),
525 channel_mode: Some(fidl_bt::ChannelMode::Basic),
526 max_tx_sdu_size: Some(1004),
527 flush_timeout: Some(50_000_000), ..Default::default()
529 };
530 let channel = Channel::try_from(no_ext).unwrap();
531
532 assert_eq!(Some(zx::MonotonicDuration::from_millis(50)), channel.flush_timeout());
533
534 let res = exec.run_singlethreaded(
536 channel.set_flush_timeout(Some(zx::MonotonicDuration::from_millis(49))),
537 );
538 assert_eq!(Some(zx::MonotonicDuration::from_millis(50)), res.expect("shouldn't error"));
539 let res = exec.run_singlethreaded(
540 channel.set_flush_timeout(Some(zx::MonotonicDuration::from_millis(51))),
541 );
542 assert_eq!(Some(zx::MonotonicDuration::from_millis(50)), res.expect("shouldn't error"));
543
544 assert!(
545 exec.run_singlethreaded(
546 channel.set_flush_timeout(Some(zx::MonotonicDuration::from_millis(200)))
547 )
548 .is_err()
549 );
550 assert!(exec.run_singlethreaded(channel.set_flush_timeout(None)).is_err());
551
552 let (remote, _local) = zx::Socket::create_datagram();
553 let (client_end, mut l2cap_request_stream) =
554 create_request_stream::<bredr::L2capParametersExtMarker>();
555 let ext = bredr::Channel {
556 socket: Some(remote),
557 channel_mode: Some(fidl_bt::ChannelMode::Basic),
558 max_tx_sdu_size: Some(1004),
559 flush_timeout: None,
560 ext_l2cap: Some(client_end),
561 ..Default::default()
562 };
563
564 let channel = Channel::try_from(ext).unwrap();
565
566 {
567 let flush_timeout_fut = channel.set_flush_timeout(None);
568 let mut flush_timeout_fut = pin!(flush_timeout_fut);
569
570 match exec.run_until_stalled(&mut flush_timeout_fut) {
572 Poll::Ready(Ok(None)) => {}
573 x => panic!("Expected no flush timeout to not stall, got {:?}", x),
574 }
575 }
576
577 let req_duration = zx::MonotonicDuration::from_millis(42);
578
579 {
580 let flush_timeout_fut = channel.set_flush_timeout(Some(req_duration));
581 let mut flush_timeout_fut = pin!(flush_timeout_fut);
582
583 assert!(exec.run_until_stalled(&mut flush_timeout_fut).is_pending());
584
585 match exec.run_until_stalled(&mut l2cap_request_stream.next()) {
586 Poll::Ready(Some(Ok(bredr::L2capParametersExtRequest::RequestParameters {
587 request,
588 responder,
589 }))) => {
590 assert_eq!(Some(req_duration.into_nanos()), request.flush_timeout);
591 let params = fidl_bt::ChannelParameters {
593 flush_timeout: Some(50_000_000), ..Default::default()
595 };
596 responder.send(¶ms).expect("response to send cleanly");
597 }
598 x => panic!("Expected a item to be ready on the request stream, got {:?}", x),
599 };
600
601 match exec.run_until_stalled(&mut flush_timeout_fut) {
602 Poll::Ready(Ok(Some(duration))) => {
603 assert_eq!(zx::MonotonicDuration::from_millis(50), duration)
604 }
605 x => panic!("Expected ready result from params response, got {:?}", x),
606 };
607 }
608
609 assert_eq!(Some(zx::MonotonicDuration::from_millis(50)), channel.flush_timeout());
611 }
612
613 #[test]
614 fn test_audio_offload() {
615 let _exec = fasync::TestExecutor::new();
616
617 let (remote, _local) = zx::Socket::create_datagram();
618 let no_ext = bredr::Channel {
619 socket: Some(remote),
620 channel_mode: Some(fidl_bt::ChannelMode::Basic),
621 max_tx_sdu_size: Some(1004),
622 ..Default::default()
623 };
624 let channel = Channel::try_from(no_ext).unwrap();
625
626 assert!(channel.audio_offload().is_none());
627
628 let (remote, _local) = zx::Socket::create_datagram();
629 let (client_end, mut _audio_offload_ext_req_stream) =
630 create_request_stream::<bredr::AudioOffloadExtMarker>();
631 let ext = bredr::Channel {
632 socket: Some(remote),
633 channel_mode: Some(fidl_bt::ChannelMode::Basic),
634 max_tx_sdu_size: Some(1004),
635 ext_audio_offload: Some(client_end),
636 ..Default::default()
637 };
638
639 let channel = Channel::try_from(ext).unwrap();
640
641 let offload_ext = channel.audio_offload();
642 assert!(offload_ext.is_some());
643 assert!(channel.audio_offload().is_some());
645 drop(offload_ext);
647 assert!(channel.audio_offload().is_some());
648 }
649
650 #[test]
651 fn channel_async_read() {
652 let mut exec = fasync::TestExecutor::new();
653 let (mut recv, send) = Channel::create();
654
655 let max_tx_size = recv.max_tx_size();
657 let mut read_buf = vec![0; max_tx_size];
658 let mut read_fut = recv.read(&mut read_buf[..]);
659
660 assert!(exec.run_until_stalled(&mut read_fut).is_pending());
661
662 let data = &[0x01, 0x02, 0x03, 0x04];
663 assert_eq!(data.len(), send.write(data).expect("should write successfully"));
664
665 let read_len = match exec.run_until_stalled(&mut read_fut) {
667 Poll::Ready(Ok(read_len)) => read_len,
668 x => panic!("Expected successful read, got {x:?}"),
669 };
670 assert_eq!(read_len, data.len());
671 assert_eq!(&data[..], &read_buf[..data.len()]);
672
673 let mut read_buf = [0; 4]; let mut read_fut = recv.read(&mut read_buf);
676
677 let oversized_data = &[0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08];
678 assert_eq!(
679 oversized_data.len(),
680 send.write(oversized_data).expect("should write successfully")
681 );
682
683 let read_len = match exec.run_until_stalled(&mut read_fut) {
685 Poll::Ready(Ok(read_len)) => read_len,
686 x => panic!("Expected successful read, got {x:?}"),
687 };
688 assert_eq!(read_len, read_buf.len());
689 assert_eq!(&oversized_data[..read_buf.len()], &read_buf[..]);
690
691 let mut leftover_buf = [0; 1];
693 let mut leftover_fut = recv.read(&mut leftover_buf);
694 assert!(exec.run_until_stalled(&mut leftover_fut).is_pending());
695 }
696
697 #[test]
698 fn channel_stream() {
699 let mut exec = fasync::TestExecutor::new();
700 let (mut recv, send) = Channel::create();
701
702 let mut stream_fut = recv.next();
703
704 assert!(exec.run_until_stalled(&mut stream_fut).is_pending());
705
706 let heart: &[u8] = &[0xF0, 0x9F, 0x92, 0x96];
707 assert_eq!(heart.len(), send.write(heart).expect("should write successfully"));
708
709 match exec.run_until_stalled(&mut stream_fut) {
710 Poll::Ready(Some(Ok(bytes))) => {
711 assert_eq!(heart.to_vec(), bytes);
712 }
713 x => panic!("Expected Some(Ok(bytes)) from the stream, got {x:?}"),
714 };
715
716 drop(send);
718
719 let mut stream_fut = recv.next();
720 match exec.run_until_stalled(&mut stream_fut) {
721 Poll::Ready(None) => {}
722 x => panic!("Expected None from the stream after close, got {x:?}"),
723 }
724
725 assert!(recv.is_terminated());
727 }
728}