Skip to main content

fuchsia_bluetooth/types/channel/
socket.rs

// Copyright 2026 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4
5use fidl_fuchsia_bluetooth_bredr as bredr;
6use fuchsia_async as fasync;
7use futures::sink::Sink;
8use futures::stream::Stream;
9use futures::{Future, TryFutureExt, ready};
10use log::error;
11use std::collections::VecDeque;
12use std::pin::Pin;
13use std::task::{Context, Poll};
14use zx_status_ext::IoErrorKindExt;
15
16use super::{Connection, ConnectionBackendType};
17
/// A socket-based implementation of the Bluetooth channel transport.
///
/// Pairs an async Zircon socket with a queue of outgoing packets that have been
/// accepted through the `Sink` implementation but not yet fully written to the
/// socket.
#[derive(Debug)]
pub struct SocketConnection {
    /// Async wrapper around the underlying Zircon socket; every read, write and
    /// signal wait in this file goes through it.
    socket: fasync::Socket,
    /// Packets handed to `Sink::start_send` that are still waiting to be written.
    /// `Sink::poll_flush` writes them front-to-back.
    send_buffer: VecDeque<Vec<u8>>,
}
24
25impl SocketConnection {
26    const MAX_QUEUED_PACKETS: usize = 32;
27
28    pub fn new(socket: zx::Socket) -> Self {
29        Self {
30            socket: fasync::Socket::from_socket(socket),
31            send_buffer: VecDeque::with_capacity(Self::MAX_QUEUED_PACKETS),
32        }
33    }
34
35    pub fn into_zx_socket(self) -> zx::Socket {
36        self.socket.into_zx_socket()
37    }
38}
39
40impl Connection for SocketConnection {
41    fn closed<'a>(&'a self) -> Pin<Box<dyn Future<Output = Result<(), zx::Status>> + 'a>> {
42        let close_signals = zx::Signals::SOCKET_PEER_CLOSED;
43        let close_wait = fasync::OnSignals::new(&self.socket, close_signals);
44        Box::pin(close_wait.map_ok(|_o| ()))
45    }
46
47    fn connection_type(&self) -> ConnectionBackendType {
48        ConnectionBackendType::Socket
49    }
50
51    fn write(&self, bytes: &[u8]) -> Result<usize, zx::Status> {
52        self.socket.as_ref().write(bytes)
53    }
54
55    fn is_closed(&self) -> bool {
56        self.socket.is_closed()
57    }
58
59    fn into_fidl_channel(self: Box<Self>) -> Result<bredr::Channel, zx::Status> {
60        let socket = self.into_zx_socket();
61        Ok(bredr::Channel { socket: Some(socket), ..Default::default() })
62    }
63}
64
65impl Stream for SocketConnection {
66    type Item = Result<Vec<u8>, zx::Status>;
67
68    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
69        let mut res = Vec::<u8>::new();
70        loop {
71            break match self.socket.poll_datagram(cx, &mut res) {
72                Poll::Ready(Ok(0)) => continue,
73                Poll::Ready(Ok(_size)) => Poll::Ready(Some(Ok(res))),
74                Poll::Ready(Err(zx::Status::PEER_CLOSED)) => Poll::Ready(None),
75                Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))),
76                Poll::Pending => Poll::Pending,
77            };
78        }
79    }
80}
81
82impl Sink<Vec<u8>> for SocketConnection {
83    type Error = zx::Status;
84
85    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
86        let _ = Sink::poll_flush(self.as_mut(), cx)?;
87
88        if self.send_buffer.len() >= SocketConnection::MAX_QUEUED_PACKETS {
89            return Poll::Pending;
90        }
91        Poll::Ready(Ok(()))
92    }
93
94    fn start_send(self: Pin<&mut Self>, item: Vec<u8>) -> Result<(), Self::Error> {
95        self.get_mut().send_buffer.push_back(item);
96        Ok(())
97    }
98
99    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
100        let this = self.get_mut();
101        use futures::io::AsyncWrite;
102        while let Some(item) = this.send_buffer.front() {
103            let res =
104                Pin::new(&mut this.socket).poll_write(cx, item).map_err(|e| e.kind().to_status());
105            match res {
106                Poll::Ready(Ok(size)) => {
107                    if size == item.len() {
108                        let _ = this.send_buffer.pop_front();
109                    } else {
110                        error!(
111                            "Partial write in SocketConnection::Sink::poll_flush: wrote {} bytes of {} byte packet.",
112                            size,
113                            item.len()
114                        );
115                        let item = this.send_buffer.front_mut().unwrap();
116                        *item = item.split_off(size);
117                    }
118                }
119                Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
120                Poll::Pending => return Poll::Pending,
121            }
122        }
123        Pin::new(&mut this.socket).poll_flush(cx).map_err(|e| e.kind().to_status())
124    }
125
126    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
127        ready!(Sink::poll_flush(self.as_mut(), cx))?;
128        let this = self.get_mut();
129        use futures::io::AsyncWrite as _;
130        Pin::new(&mut this.socket).poll_close(cx).map_err(|e| e.kind().to_status())
131    }
132}
133
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{A2dpDirection, Channel, ChannelMode};
    use fidl::endpoints::create_request_stream;
    use fidl_fuchsia_bluetooth as fidl_bt;
    use fidl_fuchsia_bluetooth_bredr as bredr;
    use futures::stream::FusedStream;
    use futures::{SinkExt, StreamExt};
    use std::pin::pin;

    /// Builds the FIDL channel table shared by most tests: `socket` in Basic mode
    /// with a 1004-byte max TX SDU and every other field left at its default.
    fn basic_fidl_channel(socket: zx::Socket) -> bredr::Channel {
        bredr::Channel {
            socket: Some(socket),
            channel_mode: Some(fidl_bt::ChannelMode::Basic),
            max_tx_sdu_size: Some(1004),
            ..Default::default()
        }
    }

    /// Bytes written synchronously on one end arrive as one packet on the other.
    #[test]
    fn channel_sync_write() {
        let mut executor = fasync::TestExecutor::new();
        let (mut receiver, sender) = Channel::create();

        let payload: &[u8] = &[0xF0, 0x9F, 0x92, 0x96];
        let written = sender.write(payload).expect("write to succeed");
        assert_eq!(written, payload.len());

        let mut next_packet = receiver.next();
        match executor.run_until_stalled(&mut next_packet) {
            Poll::Ready(Some(Ok(bytes))) => assert_eq!(payload, &bytes),
            other => panic!("Expected Some(Ok(bytes)) from the stream, got {:?}", other),
        };
    }

    /// An empty FIDL table is rejected; a populated one carries its parameters over.
    #[test]
    fn channel_from_fidl() {
        let _executor = fasync::TestExecutor::new();
        assert!(Channel::try_from(bredr::Channel::default()).is_err());

        let (remote, _local) = zx::Socket::create_datagram();
        let channel =
            Channel::try_from(basic_fidl_channel(remote)).expect("okay channel to be converted");

        assert_eq!(1004, channel.max_tx_size());
        assert_eq!(&ChannelMode::Basic, channel.channel_mode());
    }

    /// `closed()` stays pending until the peer end is dropped.
    #[test]
    fn channel_closed() {
        let mut executor = fasync::TestExecutor::new();
        let (receiver, sender) = Channel::create();

        let mut closed = pin!(receiver.closed());

        assert!(executor.run_until_stalled(&mut closed).is_pending());
        assert!(!receiver.is_closed());

        drop(sender);

        assert!(executor.run_until_stalled(&mut closed).is_ready());
        assert!(receiver.is_closed());
    }

    /// Audio priority requests fail without the direction extension and are
    /// forwarded to it (success and failure) when it is present.
    #[test]
    fn direction_ext() {
        let mut executor = fasync::TestExecutor::new();

        // No extension: every request is an error.
        let (remote, _local) = zx::Socket::create_datagram();
        let channel = Channel::try_from(basic_fidl_channel(remote)).unwrap();

        assert!(
            executor.run_singlethreaded(channel.set_audio_priority(A2dpDirection::Normal)).is_err()
        );
        assert!(
            executor.run_singlethreaded(channel.set_audio_priority(A2dpDirection::Sink)).is_err()
        );

        // With the extension: requests reach the request stream.
        let (remote, _local) = zx::Socket::create_datagram();
        let (client_end, mut direction_requests) =
            create_request_stream::<bredr::AudioDirectionExtMarker>();
        let with_ext =
            bredr::Channel { ext_direction: Some(client_end), ..basic_fidl_channel(remote) };
        let channel = Channel::try_from(with_ext).unwrap();

        let mut normal_fut = pin!(channel.set_audio_priority(A2dpDirection::Normal));
        assert!(executor.run_until_stalled(&mut normal_fut).is_pending());

        match executor.run_until_stalled(&mut direction_requests.next()) {
            Poll::Ready(Some(Ok(bredr::AudioDirectionExtRequest::SetPriority {
                priority,
                responder,
            }))) => {
                assert_eq!(bredr::A2dpDirectionPriority::Normal, priority);
                responder.send(Ok(())).expect("response to send cleanly");
            }
            other => panic!("Expected a item to be ready on the request stream, got {other:?}"),
        };

        let Poll::Ready(Ok(())) = executor.run_until_stalled(&mut normal_fut) else {
            panic!("Expected ok result from audio direction response")
        };

        let mut sink_fut = pin!(channel.set_audio_priority(A2dpDirection::Sink));
        assert!(executor.run_until_stalled(&mut sink_fut).is_pending());

        match executor.run_until_stalled(&mut direction_requests.next()) {
            Poll::Ready(Some(Ok(bredr::AudioDirectionExtRequest::SetPriority {
                priority,
                responder,
            }))) => {
                assert_eq!(bredr::A2dpDirectionPriority::Sink, priority);
                responder
                    .send(Err(fidl_bt::ErrorCode::Failed))
                    .expect("response to send cleanly");
            }
            other => panic!("Expected a item to be ready on the request stream, got {other:?}"),
        };

        let Poll::Ready(Err(_)) = executor.run_until_stalled(&mut sink_fut) else {
            panic!("Expected error result from audio direction response")
        };
    }

    /// Flush timeout handling with and without the L2CAP parameters extension.
    #[test]
    fn flush_timeout() {
        let mut executor = fasync::TestExecutor::new();
        let fifty_ms = zx::MonotonicDuration::from_millis(50);

        let (remote, _local) = zx::Socket::create_datagram();
        let no_ext = bredr::Channel {
            flush_timeout: Some(50_000_000), // 50 milliseconds
            ..basic_fidl_channel(remote)
        };
        let channel = Channel::try_from(no_ext).unwrap();

        assert_eq!(Some(fifty_ms), channel.flush_timeout());

        // Within 2 milliseconds, doesn't change.
        for nearby_ms in [49, 51] {
            let requested = Some(zx::MonotonicDuration::from_millis(nearby_ms));
            let result = executor.run_singlethreaded(channel.set_flush_timeout(requested));
            assert_eq!(Some(fifty_ms), result.expect("shouldn't error"));
        }

        let far_off = Some(zx::MonotonicDuration::from_millis(200));
        assert!(executor.run_singlethreaded(channel.set_flush_timeout(far_off)).is_err());
        assert!(executor.run_singlethreaded(channel.set_flush_timeout(None)).is_err());

        let (remote, _local) = zx::Socket::create_datagram();
        let (client_end, mut l2cap_requests) =
            create_request_stream::<bredr::L2capParametersExtMarker>();
        let with_ext = bredr::Channel {
            flush_timeout: None,
            ext_l2cap: Some(client_end),
            ..basic_fidl_channel(remote)
        };
        let channel = Channel::try_from(with_ext).unwrap();

        {
            let mut unchanged_fut = pin!(channel.set_flush_timeout(None));

            // Requesting no change returns right away with no change.
            match executor.run_until_stalled(&mut unchanged_fut) {
                Poll::Ready(Ok(None)) => {}
                other => panic!("Expected no flush timeout to not stall, got {other:?}"),
            }
        }

        let requested = zx::MonotonicDuration::from_millis(42);

        {
            let mut change_fut = pin!(channel.set_flush_timeout(Some(requested)));

            assert!(executor.run_until_stalled(&mut change_fut).is_pending());

            match executor.run_until_stalled(&mut l2cap_requests.next()) {
                Poll::Ready(Some(Ok(bredr::L2capParametersExtRequest::RequestParameters {
                    request,
                    responder,
                }))) => {
                    assert_eq!(Some(requested.into_nanos()), request.flush_timeout);
                    // Send a different response
                    let granted = fidl_bt::ChannelParameters {
                        flush_timeout: Some(50_000_000), // 50ms
                        ..Default::default()
                    };
                    responder.send(&granted).expect("response to send cleanly");
                }
                other => {
                    panic!("Expected a item to be ready on the request stream, got {other:?}")
                }
            };

            match executor.run_until_stalled(&mut change_fut) {
                Poll::Ready(Ok(Some(duration))) => assert_eq!(fifty_ms, duration),
                other => panic!("Expected ready result from params response, got {other:?}"),
            };
        }

        // Channel should have recorded the new flush timeout.
        assert_eq!(Some(fifty_ms), channel.flush_timeout());
    }

    /// The audio offload extension is absent unless supplied, and can be fetched repeatedly.
    #[test]
    fn audio_offload() {
        let _executor = fasync::TestExecutor::new();

        let (remote, _local) = zx::Socket::create_datagram();
        let channel = Channel::try_from(basic_fidl_channel(remote)).unwrap();

        assert!(channel.audio_offload().is_none());

        let (remote, _local) = zx::Socket::create_datagram();
        let (client_end, mut _audio_offload_ext_req_stream) =
            create_request_stream::<bredr::AudioOffloadExtMarker>();
        let with_ext =
            bredr::Channel { ext_audio_offload: Some(client_end), ..basic_fidl_channel(remote) };
        let channel = Channel::try_from(with_ext).unwrap();

        let first_handle = channel.audio_offload();
        assert!(first_handle.is_some());
        // We can get the audio offload multiple times without dropping
        assert!(channel.audio_offload().is_some());
        // And with dropping
        drop(first_handle);
        assert!(channel.audio_offload().is_some());
    }

    /// A packet sent through the `Sink` interface is received intact by the peer.
    #[test]
    fn channel_sink() {
        let mut executor = fasync::TestExecutor::new();
        let (mut receiver, mut sender) = Channel::create();

        let packet = vec![0x01, 0x02, 0x03, 0x04];
        let mut send_fut = sender.send(packet.clone());

        // The send should complete immediately as the socket has space.
        match executor.run_until_stalled(&mut send_fut) {
            Poll::Ready(Ok(())) => {}
            other => panic!("Expected Ready(Ok(())), got {other:?}"),
        }

        let mut next_packet = receiver.next();
        match executor.run_until_stalled(&mut next_packet) {
            Poll::Ready(Some(Ok(bytes))) => assert_eq!(packet, bytes),
            other => panic!("Expected successful read, got {:?}", other),
        }
    }

    /// The stream yields written packets and terminates once the peer is dropped.
    #[test]
    fn channel_stream() {
        let mut executor = fasync::TestExecutor::new();
        let (remote, peer) = zx::Socket::create_datagram();
        let mut receiver = Channel::from_socket(remote, Channel::DEFAULT_MAX_TX).unwrap();

        let mut next_packet = receiver.next();

        assert!(executor.run_until_stalled(&mut next_packet).is_pending());

        let payload: &[u8] = &[0xF0, 0x9F, 0x92, 0x96];
        let _ = peer.write(payload).expect("should write successfully");

        match executor.run_until_stalled(&mut next_packet) {
            Poll::Ready(Some(Ok(bytes))) => assert_eq!(payload.to_vec(), bytes),
            other => panic!("Expected Some(Ok(bytes)) from the stream, got {:?}", other),
        }

        // After the sender is dropped, the stream should terminate.
        drop(peer);

        let mut after_close = receiver.next();
        match executor.run_until_stalled(&mut after_close) {
            Poll::Ready(None) => {}
            other => panic!("Expected None from the stream after close, got {:?}", other),
        }

        // It should continue to report terminated.
        assert!(receiver.is_terminated());
    }
}