1use fidl::endpoints::{ClientEnd, Proxy};
6use fidl_fuchsia_bluetooth as fidl_bt;
7use fidl_fuchsia_bluetooth_bredr as bredr;
8use fuchsia_async as fasync;
9use fuchsia_sync::Mutex;
10use futures::stream::{FusedStream, Stream};
11use futures::{Future, TryFutureExt, io};
12use log::warn;
13use std::fmt;
14use std::pin::Pin;
15use std::sync::Arc;
16use std::task::{Context, Poll};
17
18use crate::error::Error;
19
20#[derive(PartialEq, Debug, Clone)]
22pub enum ChannelMode {
23 Basic,
24 EnhancedRetransmissionMode,
25 LeCreditBasedFlowControl,
26 EnhancedCreditBasedFlowControl,
27}
28
29impl fmt::Display for ChannelMode {
30 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
31 match self {
32 ChannelMode::Basic => write!(f, "Basic"),
33 ChannelMode::EnhancedRetransmissionMode => write!(f, "ERTM"),
34 ChannelMode::LeCreditBasedFlowControl => write!(f, "LE_Credit"),
35 ChannelMode::EnhancedCreditBasedFlowControl => write!(f, "Credit"),
36 }
37 }
38}
39
40pub enum A2dpDirection {
41 Normal,
42 Source,
43 Sink,
44}
45
46impl From<A2dpDirection> for bredr::A2dpDirectionPriority {
47 fn from(pri: A2dpDirection) -> Self {
48 match pri {
49 A2dpDirection::Normal => bredr::A2dpDirectionPriority::Normal,
50 A2dpDirection::Source => bredr::A2dpDirectionPriority::Source,
51 A2dpDirection::Sink => bredr::A2dpDirectionPriority::Sink,
52 }
53 }
54}
55
56impl TryFrom<fidl_bt::ChannelMode> for ChannelMode {
57 type Error = Error;
58 fn try_from(fidl: fidl_bt::ChannelMode) -> Result<Self, Error> {
59 match fidl {
60 fidl_bt::ChannelMode::Basic => Ok(ChannelMode::Basic),
61 fidl_bt::ChannelMode::EnhancedRetransmission => {
62 Ok(ChannelMode::EnhancedRetransmissionMode)
63 }
64 fidl_bt::ChannelMode::LeCreditBasedFlowControl => {
65 Ok(ChannelMode::LeCreditBasedFlowControl)
66 }
67 fidl_bt::ChannelMode::EnhancedCreditBasedFlowControl => {
68 Ok(ChannelMode::EnhancedCreditBasedFlowControl)
69 }
70 x => Err(Error::FailedConversion(format!("Unsupported channel mode type: {x:?}"))),
71 }
72 }
73}
74
75impl From<ChannelMode> for fidl_bt::ChannelMode {
76 fn from(x: ChannelMode) -> Self {
77 match x {
78 ChannelMode::Basic => fidl_bt::ChannelMode::Basic,
79 ChannelMode::EnhancedRetransmissionMode => fidl_bt::ChannelMode::EnhancedRetransmission,
80 ChannelMode::LeCreditBasedFlowControl => fidl_bt::ChannelMode::LeCreditBasedFlowControl,
81 ChannelMode::EnhancedCreditBasedFlowControl => {
82 fidl_bt::ChannelMode::EnhancedCreditBasedFlowControl
83 }
84 }
85 }
86}
87
88#[derive(Debug)]
93pub struct Channel {
94 socket: fasync::Socket,
95 mode: ChannelMode,
96 max_tx_size: usize,
97 flush_timeout: Arc<Mutex<Option<zx::MonotonicDuration>>>,
98 audio_direction_ext: Option<bredr::AudioDirectionExtProxy>,
99 l2cap_parameters_ext: Option<bredr::L2capParametersExtProxy>,
100 audio_offload_ext: Option<bredr::AudioOffloadExtProxy>,
101 terminated: bool,
102}
103
104impl Channel {
105 pub fn from_socket(socket: zx::Socket, max_tx_size: usize) -> Result<Self, zx::Status> {
108 Ok(Self::from_socket_infallible(socket, max_tx_size))
109 }
110
111 pub fn from_socket_infallible(socket: zx::Socket, max_tx_size: usize) -> Self {
113 Channel {
114 socket: fasync::Socket::from_socket(socket),
115 mode: ChannelMode::Basic,
116 max_tx_size,
117 flush_timeout: Arc::new(Mutex::new(None)),
118 audio_direction_ext: None,
119 l2cap_parameters_ext: None,
120 audio_offload_ext: None,
121 terminated: false,
122 }
123 }
124
125 pub const DEFAULT_MAX_TX: usize = 672;
128
129 pub fn create() -> (Self, Self) {
132 Self::create_with_max_tx(Self::DEFAULT_MAX_TX)
133 }
134
135 pub fn create_with_max_tx(max_tx_size: usize) -> (Self, Self) {
138 let (remote, local) = zx::Socket::create_datagram();
139 (
140 Channel::from_socket(remote, max_tx_size).unwrap(),
141 Channel::from_socket(local, max_tx_size).unwrap(),
142 )
143 }
144
145 pub fn max_tx_size(&self) -> usize {
148 self.max_tx_size
149 }
150
151 pub fn channel_mode(&self) -> &ChannelMode {
152 &self.mode
153 }
154
155 pub fn flush_timeout(&self) -> Option<zx::MonotonicDuration> {
156 self.flush_timeout.lock().clone()
157 }
158
159 pub fn set_audio_priority(
162 &self,
163 dir: A2dpDirection,
164 ) -> impl Future<Output = Result<(), Error>> + use<> {
165 let proxy = self.audio_direction_ext.clone();
166 async move {
167 match proxy {
168 None => return Err(Error::profile("audio priority not supported")),
169 Some(proxy) => proxy
170 .set_priority(dir.into())
171 .await?
172 .map_err(|e| Error::profile(format!("setting priority failed: {e:?}"))),
173 }
174 }
175 }
176
177 pub fn set_flush_timeout(
184 &self,
185 duration: Option<zx::MonotonicDuration>,
186 ) -> impl Future<Output = Result<Option<zx::MonotonicDuration>, Error>> + use<> {
187 let flush_timeout = self.flush_timeout.clone();
188 let current = self.flush_timeout.lock().clone();
189 let proxy = self.l2cap_parameters_ext.clone();
190 async move {
191 match (current, duration) {
192 (None, None) => return Ok(None),
193 (Some(old), Some(new)) if (old - new).into_millis().abs() < 2 => {
194 return Ok(current);
195 }
196 _ => {}
197 };
198 let proxy =
199 proxy.ok_or_else(|| Error::profile("l2cap parameter changing not supported"))?;
200 let parameters = fidl_bt::ChannelParameters {
201 flush_timeout: duration.clone().map(zx::MonotonicDuration::into_nanos),
202 ..Default::default()
203 };
204 let new_params = proxy.request_parameters(¶meters).await?;
205 let new_timeout = new_params.flush_timeout.map(zx::MonotonicDuration::from_nanos);
206 *(flush_timeout.lock()) = new_timeout.clone();
207 Ok(new_timeout)
208 }
209 }
210
211 pub fn audio_offload(&self) -> Option<bredr::AudioOffloadExtProxy> {
213 self.audio_offload_ext.clone()
214 }
215
216 pub fn closed<'a>(&'a self) -> impl Future<Output = Result<(), zx::Status>> + 'a {
217 let close_signals = zx::Signals::SOCKET_PEER_CLOSED;
218 let close_wait = fasync::OnSignals::new(&self.socket, close_signals);
219 close_wait.map_ok(|_o| ())
220 }
221
222 pub fn is_closed<'a>(&'a self) -> bool {
223 self.socket.is_closed()
224 }
225
226 pub fn write(&self, bytes: &[u8]) -> Result<usize, zx::Status> {
229 self.socket.as_ref().write(bytes)
230 }
231}
232
233impl TryFrom<fidl_fuchsia_bluetooth_bredr::Channel> for Channel {
234 type Error = zx::Status;
235
236 fn try_from(fidl: bredr::Channel) -> Result<Self, Self::Error> {
237 let channel = match fidl.channel_mode.unwrap_or(fidl_bt::ChannelMode::Basic).try_into() {
238 Err(e) => {
239 warn!("Unsupported channel mode type: {e:?}");
240 return Err(zx::Status::INTERNAL);
241 }
242 Ok(c) => c,
243 };
244
245 Ok(Self {
246 socket: fasync::Socket::from_socket(fidl.socket.ok_or(zx::Status::INVALID_ARGS)?),
247 mode: channel,
248 max_tx_size: fidl.max_tx_sdu_size.ok_or(zx::Status::INVALID_ARGS)? as usize,
249 flush_timeout: Arc::new(Mutex::new(
250 fidl.flush_timeout.map(zx::MonotonicDuration::from_nanos),
251 )),
252 audio_direction_ext: fidl.ext_direction.map(|e| e.into_proxy()),
253 l2cap_parameters_ext: fidl.ext_l2cap.map(|e| e.into_proxy()),
254 audio_offload_ext: fidl.ext_audio_offload.map(|c| c.into_proxy()),
255 terminated: false,
256 })
257 }
258}
259
260impl TryFrom<Channel> for bredr::Channel {
261 type Error = Error;
262
263 fn try_from(channel: Channel) -> Result<Self, Self::Error> {
264 let socket = channel.socket.into_zx_socket();
265 let ext_direction = channel
266 .audio_direction_ext
267 .map(|proxy| {
268 let chan = proxy.into_channel()?;
269 Ok(ClientEnd::new(chan.into()))
270 })
271 .transpose()
272 .map_err(|_: bredr::AudioDirectionExtProxy| {
273 Error::profile("AudioDirection proxy in use")
274 })?;
275 let ext_l2cap = channel
276 .l2cap_parameters_ext
277 .map(|proxy| {
278 let chan = proxy.into_channel()?;
279 Ok(ClientEnd::new(chan.into()))
280 })
281 .transpose()
282 .map_err(|_: bredr::L2capParametersExtProxy| {
283 Error::profile("l2cap parameters proxy in use")
284 })?;
285 let ext_audio_offload = channel
286 .audio_offload_ext
287 .map(|proxy| {
288 let chan = proxy.into_channel()?;
289 Ok(ClientEnd::new(chan.into()))
290 })
291 .transpose()
292 .map_err(|_: bredr::AudioOffloadExtProxy| {
293 Error::profile("audio offload proxy in use")
294 })?;
295 let flush_timeout = channel.flush_timeout.lock().map(zx::MonotonicDuration::into_nanos);
296 Ok(bredr::Channel {
297 socket: Some(socket),
298 channel_mode: Some(channel.mode.into()),
299 max_tx_sdu_size: Some(channel.max_tx_size as u16),
300 ext_direction,
301 flush_timeout,
302 ext_l2cap,
303 ext_audio_offload,
304 ..Default::default()
305 })
306 }
307}
308
309impl Stream for Channel {
310 type Item = Result<Vec<u8>, zx::Status>;
311
312 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
313 if self.terminated {
314 panic!("Channel polled after terminated");
315 }
316
317 let mut res = Vec::<u8>::new();
318 loop {
319 break match self.socket.poll_datagram(cx, &mut res) {
320 Poll::Ready(Ok(0)) => continue,
323 Poll::Ready(Ok(_size)) => Poll::Ready(Some(Ok(res))),
324 Poll::Ready(Err(zx::Status::PEER_CLOSED)) => {
325 self.terminated = true;
326 Poll::Ready(None)
327 }
328 Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))),
329 Poll::Pending => Poll::Pending,
330 };
331 }
332 }
333}
334
335impl FusedStream for Channel {
336 fn is_terminated(&self) -> bool {
337 self.terminated
338 }
339}
340
341impl io::AsyncRead for Channel {
342 fn poll_read(
343 mut self: Pin<&mut Self>,
344 cx: &mut Context<'_>,
345 buf: &mut [u8],
346 ) -> Poll<Result<usize, futures::io::Error>> {
347 Pin::new(&mut self.socket).as_mut().poll_read(cx, buf)
348 }
349}
350
351impl io::AsyncWrite for Channel {
352 fn poll_write(
353 mut self: Pin<&mut Self>,
354 cx: &mut Context<'_>,
355 buf: &[u8],
356 ) -> Poll<Result<usize, io::Error>> {
357 Pin::new(&mut self.socket).as_mut().poll_write(cx, buf)
358 }
359
360 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
361 Pin::new(&mut self.socket).as_mut().poll_flush(cx)
362 }
363
364 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
365 Pin::new(&mut self.socket).as_mut().poll_close(cx)
366 }
367}
368
369#[cfg(test)]
370mod tests {
371 use super::*;
372 use fidl::endpoints::create_request_stream;
373 use futures::{AsyncReadExt, FutureExt, StreamExt};
374 use std::pin::pin;
375
376 #[test]
377 fn test_channel_create_and_write() {
378 let _exec = fasync::TestExecutor::new();
379 let (mut recv, send) = Channel::create();
380
381 let mut buf: [u8; 8] = [0; 8];
382 let read_fut = AsyncReadExt::read(&mut recv, &mut buf);
383
384 let heart: &[u8] = &[0xF0, 0x9F, 0x92, 0x96];
385 assert_eq!(heart.len(), send.write(heart).expect("should write successfully"));
386
387 match read_fut.now_or_never() {
388 Some(Ok(4)) => {}
389 x => panic!("Expected Ok(4) from the read, got {x:?}"),
390 };
391 assert_eq!(heart, &buf[0..4]);
392 }
393
394 #[test]
395 fn test_channel_from_fidl() {
396 let _exec = fasync::TestExecutor::new();
397 let empty = bredr::Channel::default();
398 assert!(Channel::try_from(empty).is_err());
399
400 let (remote, _local) = zx::Socket::create_datagram();
401
402 let okay = bredr::Channel {
403 socket: Some(remote),
404 channel_mode: Some(fidl_bt::ChannelMode::Basic),
405 max_tx_sdu_size: Some(1004),
406 ..Default::default()
407 };
408
409 let chan = Channel::try_from(okay).expect("okay channel to be converted");
410
411 assert_eq!(1004, chan.max_tx_size());
412 assert_eq!(&ChannelMode::Basic, chan.channel_mode());
413 }
414
415 #[test]
416 fn test_channel_closed() {
417 let mut exec = fasync::TestExecutor::new();
418
419 let (recv, send) = Channel::create();
420
421 let closed_fut = recv.closed();
422 let mut closed_fut = pin!(closed_fut);
423
424 assert!(exec.run_until_stalled(&mut closed_fut).is_pending());
425 assert!(!recv.is_closed());
426
427 drop(send);
428
429 assert!(exec.run_until_stalled(&mut closed_fut).is_ready());
430 assert!(recv.is_closed());
431 }
432
433 #[test]
434 fn test_direction_ext() {
435 let mut exec = fasync::TestExecutor::new();
436
437 let (remote, _local) = zx::Socket::create_datagram();
438 let no_ext = bredr::Channel {
439 socket: Some(remote),
440 channel_mode: Some(fidl_bt::ChannelMode::Basic),
441 max_tx_sdu_size: Some(1004),
442 ..Default::default()
443 };
444 let channel = Channel::try_from(no_ext).unwrap();
445
446 assert!(
447 exec.run_singlethreaded(channel.set_audio_priority(A2dpDirection::Normal)).is_err()
448 );
449 assert!(exec.run_singlethreaded(channel.set_audio_priority(A2dpDirection::Sink)).is_err());
450
451 let (remote, _local) = zx::Socket::create_datagram();
452 let (client_end, mut direction_request_stream) =
453 create_request_stream::<bredr::AudioDirectionExtMarker>();
454 let ext = bredr::Channel {
455 socket: Some(remote),
456 channel_mode: Some(fidl_bt::ChannelMode::Basic),
457 max_tx_sdu_size: Some(1004),
458 ext_direction: Some(client_end),
459 ..Default::default()
460 };
461
462 let channel = Channel::try_from(ext).unwrap();
463
464 let audio_direction_fut = channel.set_audio_priority(A2dpDirection::Normal);
465 let mut audio_direction_fut = pin!(audio_direction_fut);
466
467 assert!(exec.run_until_stalled(&mut audio_direction_fut).is_pending());
468
469 match exec.run_until_stalled(&mut direction_request_stream.next()) {
470 Poll::Ready(Some(Ok(bredr::AudioDirectionExtRequest::SetPriority {
471 priority,
472 responder,
473 }))) => {
474 assert_eq!(bredr::A2dpDirectionPriority::Normal, priority);
475 responder.send(Ok(())).expect("response to send cleanly");
476 }
477 x => panic!("Expected a item to be ready on the request stream, got {:?}", x),
478 };
479
480 match exec.run_until_stalled(&mut audio_direction_fut) {
481 Poll::Ready(Ok(())) => {}
482 _x => panic!("Expected ok result from audio direction response"),
483 };
484
485 let audio_direction_fut = channel.set_audio_priority(A2dpDirection::Sink);
486 let mut audio_direction_fut = pin!(audio_direction_fut);
487
488 assert!(exec.run_until_stalled(&mut audio_direction_fut).is_pending());
489
490 match exec.run_until_stalled(&mut direction_request_stream.next()) {
491 Poll::Ready(Some(Ok(bredr::AudioDirectionExtRequest::SetPriority {
492 priority,
493 responder,
494 }))) => {
495 assert_eq!(bredr::A2dpDirectionPriority::Sink, priority);
496 responder
497 .send(Err(fidl_fuchsia_bluetooth::ErrorCode::Failed))
498 .expect("response to send cleanly");
499 }
500 x => panic!("Expected a item to be ready on the request stream, got {:?}", x),
501 };
502
503 match exec.run_until_stalled(&mut audio_direction_fut) {
504 Poll::Ready(Err(_)) => {}
505 _x => panic!("Expected error result from audio direction response"),
506 };
507 }
508
509 #[test]
510 fn test_flush_timeout() {
511 let mut exec = fasync::TestExecutor::new();
512
513 let (remote, _local) = zx::Socket::create_datagram();
514 let no_ext = bredr::Channel {
515 socket: Some(remote),
516 channel_mode: Some(fidl_bt::ChannelMode::Basic),
517 max_tx_sdu_size: Some(1004),
518 flush_timeout: Some(50_000_000), ..Default::default()
520 };
521 let channel = Channel::try_from(no_ext).unwrap();
522
523 assert_eq!(Some(zx::MonotonicDuration::from_millis(50)), channel.flush_timeout());
524
525 let res = exec.run_singlethreaded(
527 channel.set_flush_timeout(Some(zx::MonotonicDuration::from_millis(49))),
528 );
529 assert_eq!(Some(zx::MonotonicDuration::from_millis(50)), res.expect("shouldn't error"));
530 let res = exec.run_singlethreaded(
531 channel.set_flush_timeout(Some(zx::MonotonicDuration::from_millis(51))),
532 );
533 assert_eq!(Some(zx::MonotonicDuration::from_millis(50)), res.expect("shouldn't error"));
534
535 assert!(
536 exec.run_singlethreaded(
537 channel.set_flush_timeout(Some(zx::MonotonicDuration::from_millis(200)))
538 )
539 .is_err()
540 );
541 assert!(exec.run_singlethreaded(channel.set_flush_timeout(None)).is_err());
542
543 let (remote, _local) = zx::Socket::create_datagram();
544 let (client_end, mut l2cap_request_stream) =
545 create_request_stream::<bredr::L2capParametersExtMarker>();
546 let ext = bredr::Channel {
547 socket: Some(remote),
548 channel_mode: Some(fidl_bt::ChannelMode::Basic),
549 max_tx_sdu_size: Some(1004),
550 flush_timeout: None,
551 ext_l2cap: Some(client_end),
552 ..Default::default()
553 };
554
555 let channel = Channel::try_from(ext).unwrap();
556
557 {
558 let flush_timeout_fut = channel.set_flush_timeout(None);
559 let mut flush_timeout_fut = pin!(flush_timeout_fut);
560
561 match exec.run_until_stalled(&mut flush_timeout_fut) {
563 Poll::Ready(Ok(None)) => {}
564 x => panic!("Expected no flush timeout to not stall, got {:?}", x),
565 }
566 }
567
568 let req_duration = zx::MonotonicDuration::from_millis(42);
569
570 {
571 let flush_timeout_fut = channel.set_flush_timeout(Some(req_duration));
572 let mut flush_timeout_fut = pin!(flush_timeout_fut);
573
574 assert!(exec.run_until_stalled(&mut flush_timeout_fut).is_pending());
575
576 match exec.run_until_stalled(&mut l2cap_request_stream.next()) {
577 Poll::Ready(Some(Ok(bredr::L2capParametersExtRequest::RequestParameters {
578 request,
579 responder,
580 }))) => {
581 assert_eq!(Some(req_duration.into_nanos()), request.flush_timeout);
582 let params = fidl_bt::ChannelParameters {
584 flush_timeout: Some(50_000_000), ..Default::default()
586 };
587 responder.send(¶ms).expect("response to send cleanly");
588 }
589 x => panic!("Expected a item to be ready on the request stream, got {:?}", x),
590 };
591
592 match exec.run_until_stalled(&mut flush_timeout_fut) {
593 Poll::Ready(Ok(Some(duration))) => {
594 assert_eq!(zx::MonotonicDuration::from_millis(50), duration)
595 }
596 x => panic!("Expected ready result from params response, got {:?}", x),
597 };
598 }
599
600 assert_eq!(Some(zx::MonotonicDuration::from_millis(50)), channel.flush_timeout());
602 }
603
604 #[test]
605 fn test_audio_offload() {
606 let _exec = fasync::TestExecutor::new();
607
608 let (remote, _local) = zx::Socket::create_datagram();
609 let no_ext = bredr::Channel {
610 socket: Some(remote),
611 channel_mode: Some(fidl_bt::ChannelMode::Basic),
612 max_tx_sdu_size: Some(1004),
613 ..Default::default()
614 };
615 let channel = Channel::try_from(no_ext).unwrap();
616
617 assert!(channel.audio_offload().is_none());
618
619 let (remote, _local) = zx::Socket::create_datagram();
620 let (client_end, mut _audio_offload_ext_req_stream) =
621 create_request_stream::<bredr::AudioOffloadExtMarker>();
622 let ext = bredr::Channel {
623 socket: Some(remote),
624 channel_mode: Some(fidl_bt::ChannelMode::Basic),
625 max_tx_sdu_size: Some(1004),
626 ext_audio_offload: Some(client_end),
627 ..Default::default()
628 };
629
630 let channel = Channel::try_from(ext).unwrap();
631
632 let offload_ext = channel.audio_offload();
633 assert!(offload_ext.is_some());
634 assert!(channel.audio_offload().is_some());
636 drop(offload_ext);
638 assert!(channel.audio_offload().is_some());
639 }
640
641 #[test]
642 fn channel_async_read() {
643 let mut exec = fasync::TestExecutor::new();
644 let (mut recv, send) = Channel::create();
645
646 let max_tx_size = recv.max_tx_size();
648 let mut read_buf = vec![0; max_tx_size];
649 let mut read_fut = recv.read(&mut read_buf[..]);
650
651 assert!(exec.run_until_stalled(&mut read_fut).is_pending());
652
653 let data = &[0x01, 0x02, 0x03, 0x04];
654 assert_eq!(data.len(), send.write(data).expect("should write successfully"));
655
656 let read_len = match exec.run_until_stalled(&mut read_fut) {
658 Poll::Ready(Ok(read_len)) => read_len,
659 x => panic!("Expected successful read, got {x:?}"),
660 };
661 assert_eq!(read_len, data.len());
662 assert_eq!(&data[..], &read_buf[..data.len()]);
663
664 let mut read_buf = [0; 4]; let mut read_fut = recv.read(&mut read_buf);
667
668 let oversized_data = &[0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08];
669 assert_eq!(
670 oversized_data.len(),
671 send.write(oversized_data).expect("should write successfully")
672 );
673
674 let read_len = match exec.run_until_stalled(&mut read_fut) {
676 Poll::Ready(Ok(read_len)) => read_len,
677 x => panic!("Expected successful read, got {x:?}"),
678 };
679 assert_eq!(read_len, read_buf.len());
680 assert_eq!(&oversized_data[..read_buf.len()], &read_buf[..]);
681
682 let mut leftover_buf = [0; 1];
684 let mut leftover_fut = recv.read(&mut leftover_buf);
685 assert!(exec.run_until_stalled(&mut leftover_fut).is_pending());
686 }
687
688 #[test]
689 fn channel_stream() {
690 let mut exec = fasync::TestExecutor::new();
691 let (mut recv, send) = Channel::create();
692
693 let mut stream_fut = recv.next();
694
695 assert!(exec.run_until_stalled(&mut stream_fut).is_pending());
696
697 let heart: &[u8] = &[0xF0, 0x9F, 0x92, 0x96];
698 assert_eq!(heart.len(), send.write(heart).expect("should write successfully"));
699
700 match exec.run_until_stalled(&mut stream_fut) {
701 Poll::Ready(Some(Ok(bytes))) => {
702 assert_eq!(heart.to_vec(), bytes);
703 }
704 x => panic!("Expected Some(Ok(bytes)) from the stream, got {x:?}"),
705 };
706
707 drop(send);
709
710 let mut stream_fut = recv.next();
711 match exec.run_until_stalled(&mut stream_fut) {
712 Poll::Ready(None) => {}
713 x => panic!("Expected None from the stream after close, got {x:?}"),
714 }
715
716 assert!(recv.is_terminated());
718 }
719}