1use fidl::client::QueryResponseFut;
32use fidl::endpoints::create_request_stream;
33use fuchsia_bluetooth::types::{Channel, PeerId};
34use futures::stream::{FusedStream, Stream, StreamExt};
35use futures::task::{Context, Poll, Waker};
36use futures::FutureExt;
37use log::trace;
38use std::pin::Pin;
39use {fidl_fuchsia_bluetooth as fidl_bt, fidl_fuchsia_bluetooth_bredr as bredr};
40
41mod error;
43
44pub use crate::error::Error;
45
46pub type Result<T> = std::result::Result<T, Error>;
47
48#[derive(Debug)]
49pub enum ProfileEvent {
50 PeerConnected { id: PeerId, protocol: Vec<bredr::ProtocolDescriptor>, channel: Channel },
52 SearchResult {
54 id: PeerId,
55 protocol: Option<Vec<bredr::ProtocolDescriptor>>,
56 attributes: Vec<bredr::Attribute>,
57 },
58}
59
60impl ProfileEvent {
61 pub fn peer_id(&self) -> PeerId {
62 match self {
63 Self::PeerConnected { id, .. } => *id,
64 Self::SearchResult { id, .. } => *id,
65 }
66 }
67}
68
69impl TryFrom<bredr::SearchResultsRequest> for ProfileEvent {
70 type Error = Error;
71 fn try_from(value: bredr::SearchResultsRequest) -> Result<Self> {
72 let bredr::SearchResultsRequest::ServiceFound { peer_id, protocol, attributes, responder } =
73 value
74 else {
75 return Err(Error::search_result(fidl::Error::Invalid));
76 };
77 let id: PeerId = peer_id.into();
78 responder.send()?;
79 trace!(id:%, protocol:?, attributes:?; "Profile Search Result");
80 Ok(ProfileEvent::SearchResult { id, protocol, attributes })
81 }
82}
83
84impl TryFrom<bredr::ConnectionReceiverRequest> for ProfileEvent {
85 type Error = Error;
86 fn try_from(value: bredr::ConnectionReceiverRequest) -> Result<Self> {
87 let bredr::ConnectionReceiverRequest::Connected { peer_id, channel, protocol, .. } = value
88 else {
89 return Err(Error::connection_receiver(fidl::Error::Invalid));
90 };
91 let id = peer_id.into();
92 let channel = channel.try_into().map_err(Error::connection_receiver)?;
93 trace!(id:%, protocol:?; "Incoming connection");
94 Ok(ProfileEvent::PeerConnected { id, channel, protocol })
95 }
96}
97
98pub struct ProfileClient {
112 proxy: bredr::ProfileProxy,
114 advertisement: Option<QueryResponseFut<bredr::ProfileAdvertiseResult>>,
116 connection_receiver: Option<bredr::ConnectionReceiverRequestStream>,
117 searches: Vec<bredr::SearchResultsRequestStream>,
119 stream_waker: Option<Waker>,
121 terminated: bool,
123}
124
125impl ProfileClient {
126 pub fn new(proxy: bredr::ProfileProxy) -> Self {
128 Self {
129 proxy,
130 advertisement: None,
131 connection_receiver: None,
132 searches: Vec::new(),
133 stream_waker: None,
134 terminated: false,
135 }
136 }
137
138 pub fn advertise(
141 proxy: bredr::ProfileProxy,
142 services: Vec<bredr::ServiceDefinition>,
143 channel_params: fidl_bt::ChannelParameters,
144 ) -> Result<Self> {
145 if services.is_empty() {
146 return Ok(Self::new(proxy));
147 }
148 let (connect_client, connection_receiver) = create_request_stream();
149 let advertisement = proxy
150 .advertise(bredr::ProfileAdvertiseRequest {
151 services: Some(services),
152 parameters: Some(channel_params),
153 receiver: Some(connect_client),
154 ..Default::default()
155 })
156 .check()?;
157 Ok(Self {
158 advertisement: Some(advertisement),
159 connection_receiver: Some(connection_receiver),
160 ..Self::new(proxy)
161 })
162 }
163
164 pub fn add_search(
165 &mut self,
166 service_uuid: bredr::ServiceClassProfileIdentifier,
167 attributes: Option<Vec<u16>>,
168 ) -> Result<()> {
169 if self.terminated {
170 return Err(Error::AlreadyTerminated);
171 }
172
173 let (results_client, results_stream) = create_request_stream();
174 self.proxy.search(bredr::ProfileSearchRequest {
175 service_uuid: Some(service_uuid),
176 attr_ids: attributes,
177 results: Some(results_client),
178 ..Default::default()
179 })?;
180 self.searches.push(results_stream);
181
182 if let Some(waker) = self.stream_waker.take() {
183 waker.wake();
184 }
185 Ok(())
186 }
187
188 }
191
192impl FusedStream for ProfileClient {
193 fn is_terminated(&self) -> bool {
194 self.terminated
195 }
196}
197
198impl Stream for ProfileClient {
199 type Item = Result<ProfileEvent>;
200
201 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
202 if self.terminated {
203 panic!("Profile polled after terminated");
204 }
205
206 if let Some(advertisement) = self.advertisement.as_mut() {
207 if let Poll::Ready(_result) = advertisement.poll_unpin(cx) {
208 self.advertisement = None;
211 };
212 }
213
214 if let Some(receiver) = self.connection_receiver.as_mut() {
215 if let Poll::Ready(item) = receiver.poll_next_unpin(cx) {
216 match item {
217 Some(Ok(request)) => return Poll::Ready(Some(request.try_into())),
218 Some(Err(e)) => return Poll::Ready(Some(Err(Error::connection_receiver(e)))),
219 None => {
220 self.terminated = true;
221 return Poll::Ready(None);
222 }
223 };
224 };
225 }
226
227 for search in &mut self.searches {
228 if let Poll::Ready(item) = search.poll_next_unpin(cx) {
229 match item {
230 Some(Ok(request)) => return Poll::Ready(Some(request.try_into())),
231 Some(Err(e)) => return Poll::Ready(Some(Err(Error::search_result(e)))),
232 None => {
233 self.terminated = true;
234 return Poll::Ready(None);
235 }
236 }
237 }
238 }
239
240 self.stream_waker = Some(cx.waker().clone());
242 Poll::Pending
243 }
244}
245
246#[cfg(test)]
247mod tests {
248 use super::*;
249 use fidl::endpoints::create_proxy_and_stream;
250 use fuchsia_async as fasync;
251 use fuchsia_bluetooth::types::Uuid;
252 use futures::Future;
253 use futures_test::task::new_count_waker;
254 use std::pin::pin;
255
256 fn make_profile_service_definition(service_uuid: Uuid) -> bredr::ServiceDefinition {
257 bredr::ServiceDefinition {
258 service_class_uuids: Some(vec![service_uuid.into()]),
259 protocol_descriptor_list: Some(vec![
260 bredr::ProtocolDescriptor {
261 protocol: Some(bredr::ProtocolIdentifier::L2Cap),
262 params: Some(vec![bredr::DataElement::Uint16(bredr::PSM_AVDTP)]),
263 ..Default::default()
264 },
265 bredr::ProtocolDescriptor {
266 protocol: Some(bredr::ProtocolIdentifier::Avdtp),
267 params: Some(vec![bredr::DataElement::Uint16(0x0103)]), ..Default::default()
269 },
270 ]),
271 profile_descriptors: Some(vec![bredr::ProfileDescriptor {
272 profile_id: Some(bredr::ServiceClassProfileIdentifier::AdvancedAudioDistribution),
273 major_version: Some(1),
274 minor_version: Some(2),
275 ..Default::default()
276 }]),
277 ..Default::default()
278 }
279 }
280
281 #[test]
282 fn service_advertisement_result_is_no_op() {
283 let mut exec = fasync::TestExecutor::new();
284 let (proxy, mut profile_stream) = create_proxy_and_stream::<bredr::ProfileMarker>();
285
286 let source_uuid =
287 Uuid::new16(bredr::ServiceClassProfileIdentifier::AudioSource.into_primitive());
288 let defs = vec![make_profile_service_definition(source_uuid)];
289 let channel_params = fidl_bt::ChannelParameters {
290 channel_mode: Some(fidl_bt::ChannelMode::Basic),
291 ..Default::default()
292 };
293
294 let mut profile = ProfileClient::advertise(proxy, defs.clone(), channel_params.clone())
295 .expect("Advertise succeeds");
296
297 let (_connect_proxy, adv_responder) = expect_advertisement_registration(
298 &mut exec,
299 &mut profile_stream,
300 defs,
301 Some(channel_params.into()),
302 );
303
304 {
305 let event_fut = profile.next();
306 let mut event_fut = pin!(event_fut);
307 assert!(exec.run_until_stalled(&mut event_fut).is_pending());
308
309 adv_responder
312 .send(Ok(&bredr::ProfileAdvertiseResponse::default()))
313 .expect("able to respond");
314
315 match exec.run_until_stalled(&mut event_fut) {
316 Poll::Pending => {}
317 x => panic!("Expected pending but got {x:?}"),
318 };
319 }
320
321 assert!(!profile.is_terminated());
322 }
323
324 #[test]
325 fn connection_request_relayed_to_stream() {
326 let mut exec = fasync::TestExecutor::new();
327 let (proxy, mut profile_stream) = create_proxy_and_stream::<bredr::ProfileMarker>();
328
329 let source_uuid =
330 Uuid::new16(bredr::ServiceClassProfileIdentifier::AudioSource.into_primitive());
331 let defs = vec![make_profile_service_definition(source_uuid)];
332 let channel_params = fidl_bt::ChannelParameters {
333 channel_mode: Some(fidl_bt::ChannelMode::Basic),
334 ..Default::default()
335 };
336
337 let mut profile = ProfileClient::advertise(proxy, defs.clone(), channel_params.clone())
338 .expect("Advertise succeeds");
339
340 let (connect_proxy, _adv_responder) = expect_advertisement_registration(
341 &mut exec,
342 &mut profile_stream,
343 defs,
344 Some(channel_params.into()),
345 );
346
347 let remote_peer = PeerId(12343);
348 {
349 let event_fut = profile.next();
350 let mut event_fut = pin!(event_fut);
351 assert!(exec.run_until_stalled(&mut event_fut).is_pending());
352
353 let (_local, remote) = Channel::create();
354 connect_proxy
355 .connected(&remote_peer.into(), bredr::Channel::try_from(remote).unwrap(), &[])
356 .expect("connection should work");
357
358 match exec.run_until_stalled(&mut event_fut) {
359 Poll::Ready(Some(Ok(ProfileEvent::PeerConnected { id, .. }))) => {
360 assert_eq!(id, remote_peer);
361 }
362 x => panic!("Expected an error from the advertisement, got {:?}", x),
363 };
364 }
365
366 drop(connect_proxy);
368
369 match exec.run_until_stalled(&mut profile.next()) {
370 Poll::Ready(None) => {}
371 x => panic!("Expected profile to end on advertisement drop, got {:?}", x),
372 };
373
374 assert!(profile.is_terminated());
375 }
376
377 #[track_caller]
378 fn expect_advertisement_registration(
379 exec: &mut fasync::TestExecutor,
380 profile_stream: &mut bredr::ProfileRequestStream,
381 expected_defs: Vec<bredr::ServiceDefinition>,
382 expected_params: Option<fidl_bt::ChannelParameters>,
383 ) -> (bredr::ConnectionReceiverProxy, bredr::ProfileAdvertiseResponder) {
384 match exec.run_until_stalled(&mut profile_stream.next()) {
385 Poll::Ready(Some(Ok(bredr::ProfileRequest::Advertise { payload, responder }))) => {
386 assert!(payload.services.is_some());
387 assert_eq!(payload.services.unwrap(), expected_defs);
388 assert_eq!(payload.parameters, expected_params);
389 assert!(payload.receiver.is_some());
390 (payload.receiver.unwrap().into_proxy(), responder)
391 }
392 x => panic!("Expected ready advertisement request, got {:?}", x),
393 }
394 }
395
396 #[track_caller]
397 fn expect_search_registration(
398 exec: &mut fasync::TestExecutor,
399 profile_stream: &mut bredr::ProfileRequestStream,
400 search_uuid: bredr::ServiceClassProfileIdentifier,
401 search_attrs: &[u16],
402 ) -> bredr::SearchResultsProxy {
403 match exec.run_until_stalled(&mut profile_stream.next()) {
404 Poll::Ready(Some(Ok(bredr::ProfileRequest::Search { payload, .. }))) => {
405 let bredr::ProfileSearchRequest {
406 service_uuid: Some(service_uuid),
407 attr_ids,
408 results: Some(results),
409 ..
410 } = payload
411 else {
412 panic!("invalid parameters");
413 };
414 let attr_ids = attr_ids.unwrap_or_default();
415 assert_eq!(&attr_ids[..], search_attrs);
416 assert_eq!(service_uuid, search_uuid);
417 results.into_proxy()
418 }
419 x => panic!("Expected ready request for a search, got: {:?}", x),
420 }
421 }
422
423 #[test]
424 fn responds_to_search_results() {
425 let mut exec = fasync::TestExecutor::new();
426 let (proxy, mut profile_stream) = create_proxy_and_stream::<bredr::ProfileMarker>();
427
428 let mut profile = ProfileClient::new(proxy);
429
430 let search_attrs = vec![bredr::ATTR_BLUETOOTH_PROFILE_DESCRIPTOR_LIST];
431
432 let source_uuid = bredr::ServiceClassProfileIdentifier::AudioSource;
433 profile
434 .add_search(source_uuid, Some(search_attrs.clone()))
435 .expect("adding search succeeds");
436
437 let sink_uuid = bredr::ServiceClassProfileIdentifier::AudioSink;
438 profile.add_search(sink_uuid, Some(search_attrs.clone())).expect("adding search succeeds");
439
440 let source_results_proxy = expect_search_registration(
442 &mut exec,
443 &mut profile_stream,
444 source_uuid,
445 &search_attrs[..],
446 );
447 let sink_results_proxy = expect_search_registration(
448 &mut exec,
449 &mut profile_stream,
450 sink_uuid,
451 &search_attrs[..],
452 );
453
454 let attributes = &[];
458 let found_peer_id = PeerId(1);
459 let results_fut =
460 source_results_proxy.service_found(&found_peer_id.into(), None, attributes);
461 let mut results_fut = pin!(results_fut);
462
463 match exec.run_until_stalled(&mut profile.next()) {
464 Poll::Ready(Some(Ok(ProfileEvent::SearchResult { id, .. }))) => {
465 assert_eq!(found_peer_id, id);
466 }
467 x => panic!("Expected search result to be ready: {:?}", x),
468 }
469
470 match exec.run_until_stalled(&mut results_fut) {
471 Poll::Ready(Ok(())) => {}
472 x => panic!("Expected a response from the source result, got {:?}", x),
473 };
474
475 let results_fut = sink_results_proxy.service_found(&found_peer_id.into(), None, attributes);
476 let mut results_fut = pin!(results_fut);
477
478 match exec.run_until_stalled(&mut profile.next()) {
479 Poll::Ready(Some(Ok(ProfileEvent::SearchResult { id, .. }))) => {
480 assert_eq!(found_peer_id, id);
481 }
482 x => panic!("Expected search result to be ready: {:?}", x),
483 }
484
485 match exec.run_until_stalled(&mut results_fut) {
486 Poll::Ready(Ok(())) => {}
487 x => panic!("Expected a response from the sink result, got {:?}", x),
488 };
489
490 drop(source_results_proxy);
492
493 match exec.run_until_stalled(&mut profile.next()) {
494 Poll::Ready(None) => {}
495 x => panic!("Expected profile to end on search result drop, got {:?}", x),
496 };
497
498 assert!(profile.is_terminated());
499
500 assert!(profile.add_search(sink_uuid, None).is_err());
502 }
503
504 #[test]
505 fn waker_gets_awoken_when_search_added() {
506 let mut exec = fasync::TestExecutor::new();
507 let (proxy, mut profile_stream) = create_proxy_and_stream::<bredr::ProfileMarker>();
508
509 let mut profile = ProfileClient::new(proxy);
510
511 let profile_fut = profile.next();
514
515 let (waker, profile_fut_wake_count) = new_count_waker();
516 let mut counting_ctx = Context::from_waker(&waker);
517
518 let profile_fut = pin!(profile_fut);
519 assert!(profile_fut.poll(&mut counting_ctx).is_pending());
520
521 let initial_count = profile_fut_wake_count.get();
524
525 let source_uuid = bredr::ServiceClassProfileIdentifier::AudioSource;
528 profile.add_search(source_uuid, None).expect("adding search succeeds");
529 let search_proxy =
530 expect_search_registration(&mut exec, &mut profile_stream, source_uuid, &[]);
531
532 let after_search_count = profile_fut_wake_count.get();
534 assert_eq!(after_search_count, initial_count + 1);
535
536 let attributes = &[];
538 let found_peer_id = PeerId(123);
539 let results_fut = search_proxy.service_found(&found_peer_id.into(), None, attributes);
540 let mut results_fut = pin!(results_fut);
541
542 match exec.run_until_stalled(&mut profile.next()) {
543 Poll::Ready(Some(Ok(ProfileEvent::SearchResult { id, .. }))) => {
544 assert_eq!(found_peer_id, id);
545 }
546 x => panic!("Expected search result to be ready: {:?}", x),
547 }
548
549 match exec.run_until_stalled(&mut results_fut) {
550 Poll::Ready(Ok(())) => {}
551 x => panic!("Expected a response from the source result, got {:?}", x),
552 };
553 }
554}