bt_hfp/
a2dp.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fidl_fuchsia_bluetooth_internal_a2dp as a2dp;
6use fuchsia_bluetooth::types::PeerId;
7use futures::{Future, FutureExt, StreamExt};
8use log::warn;
9
/// A client for fuchsia.bluetooth.internal.a2dp.
///
/// Holds an optional proxy to the A2DP `Controller` protocol. When no proxy is held,
/// `Control::pause` resolves immediately to `Ok(None)` instead of failing.
#[derive(Clone)]
pub struct Control {
    /// Proxy to the A2DP `Controller`; `None` when `Control::connect` could not obtain one.
    proxy: Option<a2dp::ControllerProxy>,
}
16
/// Result of a successful `Control::pause`.
///
/// `Some` carries the `StreamSuspender` proxy obtained once A2DP reported `OnSuspended`; dropping
/// it closes the suspender channel (presumably letting A2DP resume streaming -- confirm against
/// the protocol documentation). `None` means there was no A2DP controller to suspend.
pub type PauseToken = Option<a2dp::StreamSuspenderProxy>;
18
19impl Control {
20    pub fn connect() -> Self {
21        let proxy = fuchsia_component::client::connect_to_protocol::<a2dp::ControllerMarker>().ok();
22        Self { proxy }
23    }
24
25    #[cfg(all(test, feature = "test_a2dp_controller"))]
26    fn from_proxy(proxy: a2dp::ControllerProxy) -> Self {
27        Self { proxy: Some(proxy) }
28    }
29
30    pub fn pause(
31        &self,
32        peer_id: Option<PeerId>,
33    ) -> impl Future<Output = Result<PauseToken, fidl::Error>> {
34        let proxy = match self.proxy.as_ref() {
35            None => return futures::future::ok(None).left_future(),
36            Some(proxy) => proxy,
37        };
38
39        let res = (|| {
40            let (suspender_proxy, server_end) = fidl::endpoints::create_proxy();
41            let id = peer_id.map(Into::into);
42            Ok((suspender_proxy, proxy.suspend(id.as_ref(), server_end)))
43        })();
44
45        async move {
46            let (suspender_proxy, suspend_fut) = res?;
47            match suspender_proxy.take_event_stream().next().await {
48                Some(Ok(a2dp::StreamSuspenderEvent::OnSuspended {})) => Ok(Some(suspender_proxy)),
49                x => {
50                    warn!("Failed to suspend A2DP: {:?}", x);
51                    // Check to see the result of the suspend future.  It should finish, and it
52                    // might have finished because we couldn't connect (delayed)
53                    match suspend_fut.await {
54                        Err(fidl::Error::ClientChannelClosed { status, .. })
55                            if status == zx::Status::NOT_FOUND =>
56                        {
57                            Ok(None)
58                        }
59                        Err(e) => Err(e),
60                        Ok(()) => Err(fidl::Error::OutOfRange),
61                    }
62                }
63            }
64        }
65        .right_future()
66    }
67}
68
#[cfg(all(test, feature = "test_a2dp_controller"))]
mod tests {
    use super::*;

    use async_utils::PollExt;
    use fidl::endpoints::RequestStream;
    use fuchsia_async as fasync;
    use futures::task::Poll;
    use std::pin::pin;

    /// Creates a `Controller` request stream together with a `Control` wired to it.
    ///
    /// Every test creates its executor before calling this, as the original setup code did.
    fn controller_stream_and_control() -> (a2dp::ControllerRequestStream, Control) {
        let (proxy, request_stream) =
            fidl::endpoints::create_proxy_and_stream::<a2dp::ControllerMarker>();
        (request_stream, Control::from_proxy(proxy))
    }

    /// Asserts that a `Suspend` request for `expected_peer` is ready on `requests`, returning
    /// its responder and the suspender request stream made from the request's token.
    fn expect_suspend_request(
        exec: &mut fasync::TestExecutor,
        requests: &mut a2dp::ControllerRequestStream,
        expected_peer: Option<PeerId>,
    ) -> (a2dp::ControllerSuspendResponder, a2dp::StreamSuspenderRequestStream) {
        let polled = exec.run_until_stalled(&mut requests.next());
        match polled {
            Poll::Ready(Some(Ok(a2dp::ControllerRequest::Suspend {
                peer_id,
                token,
                responder,
            }))) => {
                assert_eq!(peer_id, expected_peer.map(Into::into).map(Box::new));
                (responder, token.into_stream())
            }
            other => panic!("Expected a ready controller suspend, got {:?}", other),
        }
    }

    /// Asserts that the suspender request stream has terminated.
    fn expect_suspender_close(
        exec: &mut fasync::TestExecutor,
        requests: &mut a2dp::StreamSuspenderRequestStream,
    ) {
        let polled = exec.run_until_stalled(&mut requests.next());
        if !matches!(polled, Poll::Ready(None)) {
            panic!("Expected suspender to be closed, but it wasn't: {:?}", polled);
        }
    }

    /// Pausing through a `Control` obtained from `Control::connect` must resolve to `Ok`, both
    /// without a peer and for a specific peer.
    #[fuchsia::test]
    fn when_a2dp_not_accessible() {
        let mut exec = fasync::TestExecutor::new();
        let control = Control::connect();

        let mut pause_all = pin!(control.pause(None));
        let _ = exec.run_singlethreaded(&mut pause_all).expect("should be Ok");

        let mut pause_one = pin!(control.pause(Some(PeerId(1))));
        let _ = exec.run_singlethreaded(&mut pause_one).expect("should be Ok");
    }

    /// Two pauses may be outstanding at once; dropping each token closes its own suspender.
    #[fuchsia::test]
    fn suspend_and_release() {
        let mut exec = fasync::TestExecutor::new();
        let (mut control_requests, control) = controller_stream_and_control();

        let mut first_pause = pin!(control.pause(Some(PeerId(1))));

        let (first_responder, mut first_suspender) =
            expect_suspend_request(&mut exec, &mut control_requests, Some(PeerId(1)));

        // No OnSuspended event has been sent yet, so the pause cannot have completed.
        exec.run_until_stalled(&mut first_pause).expect_pending("shouldn't be done");

        first_suspender.control_handle().send_on_suspended().expect("send on suspended event");

        let first_token =
            exec.run_until_stalled(&mut first_pause).expect("done now").expect("token ok");

        // Should be able to have overlapping pauses.
        let mut second_pause = pin!(control.pause(None));
        let (second_responder, mut second_suspender) =
            expect_suspend_request(&mut exec, &mut control_requests, None);
        second_suspender
            .control_handle()
            .send_on_suspended()
            .expect("should send on suspended event");
        let second_token =
            exec.run_until_stalled(&mut second_pause).expect("done now").expect("token ok");

        drop(first_token);

        expect_suspender_close(&mut exec, &mut first_suspender);
        first_responder.send().unwrap();

        drop(second_token);

        expect_suspender_close(&mut exec, &mut second_suspender);
        second_responder.send().unwrap();
    }

    /// Dropping the responder, the controller stream and the suspender stream makes the pause
    /// fail.
    #[fuchsia::test]
    fn suspend_fails() {
        let mut exec = fasync::TestExecutor::new();
        let (mut control_requests, control) = controller_stream_and_control();

        let mut pause = pin!(control.pause(Some(PeerId(1))));

        let (responder, suspender) =
            expect_suspend_request(&mut exec, &mut control_requests, Some(PeerId(1)));

        drop(responder);
        drop(control_requests);
        drop(suspender);

        let _ = exec.run_singlethreaded(&mut pause).expect_err("pause error");
    }

    /// Closing the suspender before `OnSuspended`, while the suspend call itself is answered
    /// successfully, makes the pause fail.
    #[fuchsia::test]
    fn proxy_is_closed_before_suspend_event() {
        let mut exec = fasync::TestExecutor::new();
        let (mut control_requests, control) = controller_stream_and_control();

        let mut pause = pin!(control.pause(Some(PeerId(1))));

        let (responder, suspender) =
            expect_suspend_request(&mut exec, &mut control_requests, Some(PeerId(1)));

        exec.run_until_stalled(&mut pause).expect_pending("shouldn't be done");

        drop(suspender);
        responder.send().expect("should send response okay");

        let _ = exec.run_singlethreaded(&mut pause).expect_err("pause error");
    }
}