fuchsia_bluetooth/types/channel/
socket.rs1use fidl_fuchsia_bluetooth_bredr as bredr;
6use fuchsia_async as fasync;
7use futures::sink::Sink;
8use futures::stream::Stream;
9use futures::{Future, TryFutureExt, ready};
10use log::error;
11use std::collections::VecDeque;
12use std::pin::Pin;
13use std::task::{Context, Poll};
14use zx_status_ext::IoErrorKindExt;
15
16use super::{Connection, ConnectionBackendType};
17
18#[derive(Debug)]
20pub struct SocketConnection {
21 socket: fasync::Socket,
22 send_buffer: VecDeque<Vec<u8>>,
23}
24
25impl SocketConnection {
26 const MAX_QUEUED_PACKETS: usize = 32;
27
28 pub fn new(socket: zx::Socket) -> Self {
29 Self {
30 socket: fasync::Socket::from_socket(socket),
31 send_buffer: VecDeque::with_capacity(Self::MAX_QUEUED_PACKETS),
32 }
33 }
34
35 pub fn into_zx_socket(self) -> zx::Socket {
36 self.socket.into_zx_socket()
37 }
38}
39
40impl Connection for SocketConnection {
41 fn closed<'a>(&'a self) -> Pin<Box<dyn Future<Output = Result<(), zx::Status>> + 'a>> {
42 let close_signals = zx::Signals::SOCKET_PEER_CLOSED;
43 let close_wait = fasync::OnSignals::new(&self.socket, close_signals);
44 Box::pin(close_wait.map_ok(|_o| ()))
45 }
46
47 fn connection_type(&self) -> ConnectionBackendType {
48 ConnectionBackendType::Socket
49 }
50
51 fn write(&self, bytes: &[u8]) -> Result<usize, zx::Status> {
52 self.socket.as_ref().write(bytes)
53 }
54
55 fn is_closed(&self) -> bool {
56 self.socket.is_closed()
57 }
58
59 fn into_fidl_channel(self: Box<Self>) -> Result<bredr::Channel, zx::Status> {
60 let socket = self.into_zx_socket();
61 Ok(bredr::Channel { socket: Some(socket), ..Default::default() })
62 }
63}
64
65impl Stream for SocketConnection {
66 type Item = Result<Vec<u8>, zx::Status>;
67
68 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
69 let mut res = Vec::<u8>::new();
70 loop {
71 break match self.socket.poll_datagram(cx, &mut res) {
72 Poll::Ready(Ok(0)) => continue,
73 Poll::Ready(Ok(_size)) => Poll::Ready(Some(Ok(res))),
74 Poll::Ready(Err(zx::Status::PEER_CLOSED)) => Poll::Ready(None),
75 Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))),
76 Poll::Pending => Poll::Pending,
77 };
78 }
79 }
80}
81
82impl Sink<Vec<u8>> for SocketConnection {
83 type Error = zx::Status;
84
85 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
86 let _ = Sink::poll_flush(self.as_mut(), cx)?;
87
88 if self.send_buffer.len() >= SocketConnection::MAX_QUEUED_PACKETS {
89 return Poll::Pending;
90 }
91 Poll::Ready(Ok(()))
92 }
93
94 fn start_send(self: Pin<&mut Self>, item: Vec<u8>) -> Result<(), Self::Error> {
95 self.get_mut().send_buffer.push_back(item);
96 Ok(())
97 }
98
99 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
100 let this = self.get_mut();
101 use futures::io::AsyncWrite;
102 while let Some(item) = this.send_buffer.front() {
103 let res =
104 Pin::new(&mut this.socket).poll_write(cx, item).map_err(|e| e.kind().to_status());
105 match res {
106 Poll::Ready(Ok(size)) => {
107 if size == item.len() {
108 let _ = this.send_buffer.pop_front();
109 } else {
110 error!(
111 "Partial write in SocketConnection::Sink::poll_flush: wrote {} bytes of {} byte packet.",
112 size,
113 item.len()
114 );
115 let item = this.send_buffer.front_mut().unwrap();
116 *item = item.split_off(size);
117 }
118 }
119 Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
120 Poll::Pending => return Poll::Pending,
121 }
122 }
123 Pin::new(&mut this.socket).poll_flush(cx).map_err(|e| e.kind().to_status())
124 }
125
126 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
127 ready!(Sink::poll_flush(self.as_mut(), cx))?;
128 let this = self.get_mut();
129 use futures::io::AsyncWrite as _;
130 Pin::new(&mut this.socket).poll_close(cx).map_err(|e| e.kind().to_status())
131 }
132}
133
134#[cfg(test)]
135mod tests {
136 use super::*;
137 use crate::types::{A2dpDirection, Channel, ChannelMode};
138 use fidl::endpoints::create_request_stream;
139 use fidl_fuchsia_bluetooth as fidl_bt;
140 use fidl_fuchsia_bluetooth_bredr as bredr;
141 use futures::stream::FusedStream;
142 use futures::{SinkExt, StreamExt};
143 use std::pin::pin;
144
145 #[test]
146 fn channel_sync_write() {
147 let mut exec = fasync::TestExecutor::new();
148 let (mut recv, send) = Channel::create();
149
150 let heart: &[u8] = &[0xF0, 0x9F, 0x92, 0x96];
151 let size = send.write(heart).expect("write to succeed");
152 assert_eq!(size, heart.len());
153
154 let mut recv_fut = recv.next();
155 match exec.run_until_stalled(&mut recv_fut) {
156 Poll::Ready(Some(Ok(bytes))) => {
157 assert_eq!(heart, &bytes);
158 }
159 x => panic!("Expected Some(Ok(bytes)) from the stream, got {x:?}"),
160 };
161 }
162
163 #[test]
164 fn channel_from_fidl() {
165 let _exec = fasync::TestExecutor::new();
166 let empty = bredr::Channel::default();
167 assert!(Channel::try_from(empty).is_err());
168
169 let (remote, _local) = zx::Socket::create_datagram();
170
171 let valid_fidl_channel = bredr::Channel {
172 socket: Some(remote),
173 channel_mode: Some(fidl_bt::ChannelMode::Basic),
174 max_tx_sdu_size: Some(1004),
175 ..Default::default()
176 };
177
178 let chan = Channel::try_from(valid_fidl_channel).expect("okay channel to be converted");
179
180 assert_eq!(1004, chan.max_tx_size());
181 assert_eq!(&ChannelMode::Basic, chan.channel_mode());
182 }
183
184 #[test]
185 fn channel_closed() {
186 let mut exec = fasync::TestExecutor::new();
187
188 let (recv, send) = Channel::create();
189
190 let closed_fut = recv.closed();
191 let mut closed_fut = pin!(closed_fut);
192
193 assert!(exec.run_until_stalled(&mut closed_fut).is_pending());
194 assert!(!recv.is_closed());
195
196 drop(send);
197
198 assert!(exec.run_until_stalled(&mut closed_fut).is_ready());
199 assert!(recv.is_closed());
200 }
201
202 #[test]
203 fn direction_ext() {
204 let mut exec = fasync::TestExecutor::new();
205
206 let (remote, _local) = zx::Socket::create_datagram();
207 let fidl_channel_no_ext = bredr::Channel {
208 socket: Some(remote),
209 channel_mode: Some(fidl_bt::ChannelMode::Basic),
210 max_tx_sdu_size: Some(1004),
211 ..Default::default()
212 };
213 let channel = Channel::try_from(fidl_channel_no_ext).unwrap();
214
215 assert!(
216 exec.run_singlethreaded(channel.set_audio_priority(A2dpDirection::Normal)).is_err()
217 );
218 assert!(exec.run_singlethreaded(channel.set_audio_priority(A2dpDirection::Sink)).is_err());
219
220 let (remote, _local) = zx::Socket::create_datagram();
221 let (client_end, mut direction_request_stream) =
222 create_request_stream::<bredr::AudioDirectionExtMarker>();
223 let fidl_channel_with_ext = bredr::Channel {
224 socket: Some(remote),
225 channel_mode: Some(fidl_bt::ChannelMode::Basic),
226 max_tx_sdu_size: Some(1004),
227 ext_direction: Some(client_end),
228 ..Default::default()
229 };
230
231 let channel = Channel::try_from(fidl_channel_with_ext).unwrap();
232
233 let audio_direction_fut = channel.set_audio_priority(A2dpDirection::Normal);
234 let mut audio_direction_fut = pin!(audio_direction_fut);
235
236 assert!(exec.run_until_stalled(&mut audio_direction_fut).is_pending());
237
238 match exec.run_until_stalled(&mut direction_request_stream.next()) {
239 Poll::Ready(Some(Ok(bredr::AudioDirectionExtRequest::SetPriority {
240 priority,
241 responder,
242 }))) => {
243 assert_eq!(bredr::A2dpDirectionPriority::Normal, priority);
244 responder.send(Ok(())).expect("response to send cleanly");
245 }
246 x => panic!("Expected a item to be ready on the request stream, got {:?}", x),
247 };
248
249 match exec.run_until_stalled(&mut audio_direction_fut) {
250 Poll::Ready(Ok(())) => {}
251 _x => panic!("Expected ok result from audio direction response"),
252 };
253
254 let audio_direction_fut = channel.set_audio_priority(A2dpDirection::Sink);
255 let mut audio_direction_fut = pin!(audio_direction_fut);
256
257 assert!(exec.run_until_stalled(&mut audio_direction_fut).is_pending());
258
259 match exec.run_until_stalled(&mut direction_request_stream.next()) {
260 Poll::Ready(Some(Ok(bredr::AudioDirectionExtRequest::SetPriority {
261 priority,
262 responder,
263 }))) => {
264 assert_eq!(bredr::A2dpDirectionPriority::Sink, priority);
265 responder
266 .send(Err(fidl_fuchsia_bluetooth::ErrorCode::Failed))
267 .expect("response to send cleanly");
268 }
269 x => panic!("Expected a item to be ready on the request stream, got {:?}", x),
270 };
271
272 match exec.run_until_stalled(&mut audio_direction_fut) {
273 Poll::Ready(Err(_)) => {}
274 _x => panic!("Expected error result from audio direction response"),
275 };
276 }
277
278 #[test]
279 fn flush_timeout() {
280 let mut exec = fasync::TestExecutor::new();
281
282 let (remote, _local) = zx::Socket::create_datagram();
283 let fidl_channel_no_ext = bredr::Channel {
284 socket: Some(remote),
285 channel_mode: Some(fidl_bt::ChannelMode::Basic),
286 max_tx_sdu_size: Some(1004),
287 flush_timeout: Some(50_000_000), ..Default::default()
289 };
290 let channel = Channel::try_from(fidl_channel_no_ext).unwrap();
291
292 assert_eq!(Some(zx::MonotonicDuration::from_millis(50)), channel.flush_timeout());
293
294 let res = exec.run_singlethreaded(
296 channel.set_flush_timeout(Some(zx::MonotonicDuration::from_millis(49))),
297 );
298 assert_eq!(Some(zx::MonotonicDuration::from_millis(50)), res.expect("shouldn't error"));
299 let res = exec.run_singlethreaded(
300 channel.set_flush_timeout(Some(zx::MonotonicDuration::from_millis(51))),
301 );
302 assert_eq!(Some(zx::MonotonicDuration::from_millis(50)), res.expect("shouldn't error"));
303
304 assert!(
305 exec.run_singlethreaded(
306 channel.set_flush_timeout(Some(zx::MonotonicDuration::from_millis(200)))
307 )
308 .is_err()
309 );
310 assert!(exec.run_singlethreaded(channel.set_flush_timeout(None)).is_err());
311
312 let (remote, _local) = zx::Socket::create_datagram();
313 let (client_end, mut l2cap_request_stream) =
314 create_request_stream::<bredr::L2capParametersExtMarker>();
315 let fidl_channel_with_ext = bredr::Channel {
316 socket: Some(remote),
317 channel_mode: Some(fidl_bt::ChannelMode::Basic),
318 max_tx_sdu_size: Some(1004),
319 flush_timeout: None,
320 ext_l2cap: Some(client_end),
321 ..Default::default()
322 };
323
324 let channel = Channel::try_from(fidl_channel_with_ext).unwrap();
325
326 {
327 let flush_timeout_fut = channel.set_flush_timeout(None);
328 let mut flush_timeout_fut = pin!(flush_timeout_fut);
329
330 match exec.run_until_stalled(&mut flush_timeout_fut) {
332 Poll::Ready(Ok(None)) => {}
333 x => panic!("Expected no flush timeout to not stall, got {:?}", x),
334 }
335 }
336
337 let req_duration = zx::MonotonicDuration::from_millis(42);
338
339 {
340 let flush_timeout_fut = channel.set_flush_timeout(Some(req_duration));
341 let mut flush_timeout_fut = pin!(flush_timeout_fut);
342
343 assert!(exec.run_until_stalled(&mut flush_timeout_fut).is_pending());
344
345 match exec.run_until_stalled(&mut l2cap_request_stream.next()) {
346 Poll::Ready(Some(Ok(bredr::L2capParametersExtRequest::RequestParameters {
347 request,
348 responder,
349 }))) => {
350 assert_eq!(Some(req_duration.into_nanos()), request.flush_timeout);
351 let params = fidl_bt::ChannelParameters {
353 flush_timeout: Some(50_000_000), ..Default::default()
355 };
356 responder.send(¶ms).expect("response to send cleanly");
357 }
358 x => panic!("Expected a item to be ready on the request stream, got {:?}", x),
359 };
360
361 match exec.run_until_stalled(&mut flush_timeout_fut) {
362 Poll::Ready(Ok(Some(duration))) => {
363 assert_eq!(zx::MonotonicDuration::from_millis(50), duration)
364 }
365 x => panic!("Expected ready result from params response, got {:?}", x),
366 };
367 }
368
369 assert_eq!(Some(zx::MonotonicDuration::from_millis(50)), channel.flush_timeout());
371 }
372
373 #[test]
374 fn audio_offload() {
375 let _exec = fasync::TestExecutor::new();
376
377 let (remote, _local) = zx::Socket::create_datagram();
378 let fidl_channel_no_ext = bredr::Channel {
379 socket: Some(remote),
380 channel_mode: Some(fidl_bt::ChannelMode::Basic),
381 max_tx_sdu_size: Some(1004),
382 ..Default::default()
383 };
384 let channel = Channel::try_from(fidl_channel_no_ext).unwrap();
385
386 assert!(channel.audio_offload().is_none());
387
388 let (remote, _local) = zx::Socket::create_datagram();
389 let (client_end, mut _audio_offload_ext_req_stream) =
390 create_request_stream::<bredr::AudioOffloadExtMarker>();
391 let fidl_channel_with_ext = bredr::Channel {
392 socket: Some(remote),
393 channel_mode: Some(fidl_bt::ChannelMode::Basic),
394 max_tx_sdu_size: Some(1004),
395 ext_audio_offload: Some(client_end),
396 ..Default::default()
397 };
398
399 let channel = Channel::try_from(fidl_channel_with_ext).unwrap();
400
401 let offload_ext = channel.audio_offload();
402 assert!(offload_ext.is_some());
403 assert!(channel.audio_offload().is_some());
405 drop(offload_ext);
407 assert!(channel.audio_offload().is_some());
408 }
409
410 #[test]
411 fn channel_sink() {
412 let mut exec = fasync::TestExecutor::new();
413 let (mut recv, mut send) = Channel::create();
414
415 let data = vec![0x01, 0x02, 0x03, 0x04];
416 let mut send_fut = send.send(data.clone());
417
418 match exec.run_until_stalled(&mut send_fut) {
420 Poll::Ready(Ok(())) => {}
421 x => panic!("Expected Ready(Ok(())), got {:?}", x),
422 }
423
424 let mut recv_fut = recv.next();
425 match exec.run_until_stalled(&mut recv_fut) {
426 Poll::Ready(Some(Ok(bytes))) => assert_eq!(data, bytes),
427 x => panic!("Expected successful read, got {x:?}"),
428 }
429 }
430
431 #[test]
432 fn channel_stream() {
433 let mut exec = fasync::TestExecutor::new();
434 let (remote, local) = zx::Socket::create_datagram();
435 let mut recv = Channel::from_socket(remote, Channel::DEFAULT_MAX_TX).unwrap();
436 let send = local;
437
438 let mut stream_fut = recv.next();
439
440 assert!(exec.run_until_stalled(&mut stream_fut).is_pending());
441
442 let heart: &[u8] = &[0xF0, 0x9F, 0x92, 0x96];
443 let _ = send.write(heart).expect("should write successfully");
444
445 match exec.run_until_stalled(&mut stream_fut) {
446 Poll::Ready(Some(Ok(bytes))) => {
447 assert_eq!(heart.to_vec(), bytes);
448 }
449 x => panic!("Expected Some(Ok(bytes)) from the stream, got {x:?}"),
450 }
451
452 drop(send);
454
455 let mut stream_fut = recv.next();
456 match exec.run_until_stalled(&mut stream_fut) {
457 Poll::Ready(None) => {}
458 x => panic!("Expected None from the stream after close, got {x:?}"),
459 }
460
461 assert!(recv.is_terminated());
463 }
464}