1use fidl_fuchsia_bluetooth_internal_a2dp as a2dp;
6use fuchsia_bluetooth::types::PeerId;
7use futures::{Future, FutureExt, StreamExt};
8use log::warn;
9
10#[derive(Clone)]
13pub struct Control {
14 proxy: Option<a2dp::ControllerProxy>,
15}
16
17pub type PauseToken = Option<a2dp::StreamSuspenderProxy>;
18
19impl Control {
20 pub fn connect() -> Self {
21 let proxy = fuchsia_component::client::connect_to_protocol::<a2dp::ControllerMarker>().ok();
22 Self { proxy }
23 }
24
25 #[cfg(all(test, feature = "test_a2dp_controller"))]
26 fn from_proxy(proxy: a2dp::ControllerProxy) -> Self {
27 Self { proxy: Some(proxy) }
28 }
29
30 pub fn pause(
31 &self,
32 peer_id: Option<PeerId>,
33 ) -> impl Future<Output = Result<PauseToken, fidl::Error>> {
34 let proxy = match self.proxy.as_ref() {
35 None => return futures::future::ok(None).left_future(),
36 Some(proxy) => proxy,
37 };
38
39 let res = (|| {
40 let (suspender_proxy, server_end) = fidl::endpoints::create_proxy();
41 let id = peer_id.map(Into::into);
42 Ok((suspender_proxy, proxy.suspend(id.as_ref(), server_end)))
43 })();
44
45 async move {
46 let (suspender_proxy, suspend_fut) = res?;
47 match suspender_proxy.take_event_stream().next().await {
48 Some(Ok(a2dp::StreamSuspenderEvent::OnSuspended {})) => Ok(Some(suspender_proxy)),
49 x => {
50 warn!("Failed to suspend A2DP: {:?}", x);
51 match suspend_fut.await {
54 Err(fidl::Error::ClientChannelClosed { status, .. })
55 if status == zx::Status::NOT_FOUND =>
56 {
57 Ok(None)
58 }
59 Err(e) => Err(e),
60 Ok(()) => Err(fidl::Error::OutOfRange),
61 }
62 }
63 }
64 }
65 .right_future()
66 }
67}
68
#[cfg(all(test, feature = "test_a2dp_controller"))]
mod tests {
    use super::*;

    use async_utils::PollExt;
    use fidl::endpoints::RequestStream;
    use fuchsia_async as fasync;
    use futures::task::Poll;
    use std::pin::pin;

    /// Creates a controller channel and returns its server-side request stream together with a
    /// `Control` wrapping the client end. Each test calls this after creating its executor.
    fn controller_fixture() -> (a2dp::ControllerRequestStream, Control) {
        let (proxy, request_stream) =
            fidl::endpoints::create_proxy_and_stream::<a2dp::ControllerMarker>();
        (request_stream, Control::from_proxy(proxy))
    }

    /// Pausing through a `Control` obtained from `Control::connect()` must resolve to `Ok`, both
    /// without a peer id and with a specific one.
    #[fuchsia::test]
    fn when_a2dp_not_accessible() {
        let mut exec = fasync::TestExecutor::new();
        let control = Control::connect();

        let mut pause_without_peer = pin!(control.pause(None));
        let _ = exec.run_singlethreaded(&mut pause_without_peer).expect("should be Ok");

        let mut pause_with_peer = pin!(control.pause(Some(PeerId(1))));
        let _ = exec.run_singlethreaded(&mut pause_with_peer).expect("should be Ok");
    }

    /// Expects a `Suspend` request for `expected_peer` to be ready on `requests`.
    ///
    /// Returns the request's responder and the request stream of the suspender channel carried
    /// in the request. Panics if no such request is ready.
    fn expect_suspend_request(
        exec: &mut fasync::TestExecutor,
        requests: &mut a2dp::ControllerRequestStream,
        expected_peer: Option<PeerId>,
    ) -> (a2dp::ControllerSuspendResponder, a2dp::StreamSuspenderRequestStream) {
        let polled = exec.run_until_stalled(&mut requests.next());
        match polled {
            Poll::Ready(Some(Ok(a2dp::ControllerRequest::Suspend {
                responder,
                token,
                peer_id,
            }))) => {
                assert_eq!(peer_id, expected_peer.map(|id| Box::new(id.into())));
                (responder, token.into_stream())
            }
            other => panic!("Expected a ready controller suspend, got {:?}", other),
        }
    }

    /// Expects the suspender request stream to have terminated; panics otherwise.
    fn expect_suspender_close(
        exec: &mut fasync::TestExecutor,
        requests: &mut a2dp::StreamSuspenderRequestStream,
    ) {
        let polled = exec.run_until_stalled(&mut requests.next());
        if !matches!(polled, Poll::Ready(None)) {
            panic!("Expected suspender to be closed, but it wasn't: {:?}", polled);
        }
    }

    /// Two pauses each yield a token once `OnSuspended` is sent, and dropping a token closes
    /// the corresponding suspender channel.
    #[fuchsia::test]
    fn suspend_and_release() {
        let mut exec = fasync::TestExecutor::new();
        let (mut control_requests, control) = controller_fixture();

        // First pause: for a single peer.
        let mut first_pause = pin!(control.pause(Some(PeerId(1))));

        let (responder_one, mut stream1) =
            expect_suspend_request(&mut exec, &mut control_requests, Some(PeerId(1)));

        // Nothing has confirmed the suspension yet, so the pause cannot be complete.
        exec.run_until_stalled(&mut first_pause).expect_pending("shouldn't be done");

        stream1.control_handle().send_on_suspended().expect("send on suspended event");

        let token = exec.run_until_stalled(&mut first_pause).expect("done now").expect("token ok");

        // Second pause: no peer id.
        let mut second_pause = pin!(control.pause(None));
        let (responder_two, mut stream2) =
            expect_suspend_request(&mut exec, &mut control_requests, None);
        stream2.control_handle().send_on_suspended().expect("should send on suspended event");
        let token2 =
            exec.run_until_stalled(&mut second_pause).expect("done now").expect("token ok");

        // Releasing the first token closes only the first suspender.
        drop(token);

        expect_suspender_close(&mut exec, &mut stream1);
        responder_one.send().unwrap();

        drop(token2);

        expect_suspender_close(&mut exec, &mut stream2);
        responder_two.send().unwrap();
    }

    /// When the responder, the controller stream, and the suspender stream are all dropped
    /// without replying, the pause resolves to an error.
    #[fuchsia::test]
    fn suspend_fails() {
        let mut exec = fasync::TestExecutor::new();
        let (mut control_requests, control) = controller_fixture();

        let mut pause = pin!(control.pause(Some(PeerId(1))));

        let (responder, stream1) =
            expect_suspend_request(&mut exec, &mut control_requests, Some(PeerId(1)));

        drop(responder);
        drop(control_requests);
        drop(stream1);

        let _ = exec.run_singlethreaded(&mut pause).expect_err("pause error");
    }

    /// When the suspender stream is dropped before `OnSuspended` is sent but the suspend call
    /// itself gets a reply, the pause still resolves to an error.
    #[fuchsia::test]
    fn proxy_is_closed_before_suspend_event() {
        let mut exec = fasync::TestExecutor::new();
        let (mut control_requests, control) = controller_fixture();

        let mut pause = pin!(control.pause(Some(PeerId(1))));

        let (responder, stream1) =
            expect_suspend_request(&mut exec, &mut control_requests, Some(PeerId(1)));

        exec.run_until_stalled(&mut pause).expect_pending("shouldn't be done");

        drop(stream1);
        responder.send().expect("should send response okay");

        let _ = exec.run_singlethreaded(&mut pause).expect_err("pause error");
    }
}