profile_client/
lib.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! An interface for interacting with the `fuchsia.bluetooth.bredr.Profile` protocol.
6//! This interface provides convenience methods to register service searches and advertisements
7//! using the `Profile` protocol and includes a Stream implementation which can be polled to
8//! receive Profile API updates.
9//!
10//! ### Example Usage:
11//!
//! ```ignore
//! // Connect to the `f.b.bredr.Profile` protocol.
//! let profile_svc = fuchsia_component::client::connect_to_protocol::<ProfileMarker>()?;
//!
//! // Create a new `ProfileClient` by registering an advertisement. Register searches.
//! let svc_defs = vec![..];
//! let channel_params = ChannelParameters { .. };
//! let mut profile_client = ProfileClient::advertise(profile_svc, svc_defs, channel_params)?;
//! profile_client.add_search(..)?;
//! profile_client.add_search(..)?;
//!
//! // Listen for events from the ProfileClient stream implementation.
//! while let Some(event) = profile_client.next().await {
//!     match event? {
//!         ProfileEvent::PeerConnected { .. } => {} // Do something
//!         ProfileEvent::SearchResult { .. } => {} // Do something
//!     }
//! }
//! ```
29//!
30
31use fidl::client::QueryResponseFut;
32use fidl::endpoints::create_request_stream;
33use fuchsia_bluetooth::types::{Channel, PeerId};
34use futures::stream::{FusedStream, Stream, StreamExt};
35use futures::task::{Context, Poll, Waker};
36use futures::FutureExt;
37use log::trace;
38use std::pin::Pin;
39use {fidl_fuchsia_bluetooth as fidl_bt, fidl_fuchsia_bluetooth_bredr as bredr};
40
41/// Error type used by this library.
42mod error;
43
44pub use crate::error::Error;
45
46pub type Result<T> = std::result::Result<T, Error>;
47
48#[derive(Debug)]
49pub enum ProfileEvent {
50    /// A peer has connected.
51    PeerConnected { id: PeerId, protocol: Vec<bredr::ProtocolDescriptor>, channel: Channel },
52    /// A peer matched one of the search results that was started.
53    SearchResult {
54        id: PeerId,
55        protocol: Option<Vec<bredr::ProtocolDescriptor>>,
56        attributes: Vec<bredr::Attribute>,
57    },
58}
59
60impl ProfileEvent {
61    pub fn peer_id(&self) -> PeerId {
62        match self {
63            Self::PeerConnected { id, .. } => *id,
64            Self::SearchResult { id, .. } => *id,
65        }
66    }
67}
68
69impl TryFrom<bredr::SearchResultsRequest> for ProfileEvent {
70    type Error = Error;
71    fn try_from(value: bredr::SearchResultsRequest) -> Result<Self> {
72        let bredr::SearchResultsRequest::ServiceFound { peer_id, protocol, attributes, responder } =
73            value
74        else {
75            return Err(Error::search_result(fidl::Error::Invalid));
76        };
77        let id: PeerId = peer_id.into();
78        responder.send()?;
79        trace!(id:%, protocol:?, attributes:?; "Profile Search Result");
80        Ok(ProfileEvent::SearchResult { id, protocol, attributes })
81    }
82}
83
84impl TryFrom<bredr::ConnectionReceiverRequest> for ProfileEvent {
85    type Error = Error;
86    fn try_from(value: bredr::ConnectionReceiverRequest) -> Result<Self> {
87        let bredr::ConnectionReceiverRequest::Connected { peer_id, channel, protocol, .. } = value
88        else {
89            return Err(Error::connection_receiver(fidl::Error::Invalid));
90        };
91        let id = peer_id.into();
92        let channel = channel.try_into().map_err(Error::connection_receiver)?;
93        trace!(id:%, protocol:?; "Incoming connection");
94        Ok(ProfileEvent::PeerConnected { id, channel, protocol })
95    }
96}
97
98/// Provides an interface to interact with the `fuchsia.bluetooth.bredr.Profile` protocol.
99///
100/// Currently, this implementation supports a single advertisement and multiple searches.
101/// Search result events can be returned for any of the registered services. In the case of
102/// multiple registered searches, consider using the `profile::find_service_class`
103/// function in the `fuchsia_bluetooth` crate to identify the Service Class of the returned event.
104///
105/// The `ProfileClient` is typically used as a stream of ConnectionReceiver connection requests
106/// and SearchResults events. The stream is considered terminated if the advertisement (if set)
107/// has terminated, the ConnectionReceiver stream associated with the advertisement has terminated,
108/// or if _any_ of the registered searches have terminated.
109///
110/// For information about the Profile API, see the [FIDL Docs](//sdk/fidl/fuchsia.bluetooth.bredr/profile.fidl).
111pub struct ProfileClient {
112    /// The proxy that is used to start new searches and advertise.
113    proxy: bredr::ProfileProxy,
114    /// The result for the advertisement.
115    advertisement: Option<QueryResponseFut<bredr::ProfileAdvertiseResult>>,
116    connection_receiver: Option<bredr::ConnectionReceiverRequestStream>,
117    /// The registered results from the search streams. Polled in order.
118    searches: Vec<bredr::SearchResultsRequestStream>,
119    /// This waker will be woken if a new search is added.
120    stream_waker: Option<Waker>,
121    /// True once any of the searches, or the advertisement, have completed.
122    terminated: bool,
123}
124
125impl ProfileClient {
126    /// Create a new Profile that doesn't advertise any services.
127    pub fn new(proxy: bredr::ProfileProxy) -> Self {
128        Self {
129            proxy,
130            advertisement: None,
131            connection_receiver: None,
132            searches: Vec::new(),
133            stream_waker: None,
134            terminated: false,
135        }
136    }
137
138    /// Create a new Profile that advertises the services in `services`.
139    /// Incoming connections will request the `channel mode` provided.
140    pub fn advertise(
141        proxy: bredr::ProfileProxy,
142        services: Vec<bredr::ServiceDefinition>,
143        channel_params: fidl_bt::ChannelParameters,
144    ) -> Result<Self> {
145        if services.is_empty() {
146            return Ok(Self::new(proxy));
147        }
148        let (connect_client, connection_receiver) = create_request_stream();
149        let advertisement = proxy
150            .advertise(bredr::ProfileAdvertiseRequest {
151                services: Some(services),
152                parameters: Some(channel_params),
153                receiver: Some(connect_client),
154                ..Default::default()
155            })
156            .check()?;
157        Ok(Self {
158            advertisement: Some(advertisement),
159            connection_receiver: Some(connection_receiver),
160            ..Self::new(proxy)
161        })
162    }
163
164    pub fn add_search(
165        &mut self,
166        service_uuid: bredr::ServiceClassProfileIdentifier,
167        attributes: Option<Vec<u16>>,
168    ) -> Result<()> {
169        if self.terminated {
170            return Err(Error::AlreadyTerminated);
171        }
172
173        let (results_client, results_stream) = create_request_stream();
174        self.proxy.search(bredr::ProfileSearchRequest {
175            service_uuid: Some(service_uuid),
176            attr_ids: attributes,
177            results: Some(results_client),
178            ..Default::default()
179        })?;
180        self.searches.push(results_stream);
181
182        if let Some(waker) = self.stream_waker.take() {
183            waker.wake();
184        }
185        Ok(())
186    }
187
188    // TODO(https://fxbug.dev/333456020): Consider adding a shutdown method to revoke the active
189    // advertisement.
190}
191
192impl FusedStream for ProfileClient {
193    fn is_terminated(&self) -> bool {
194        self.terminated
195    }
196}
197
198impl Stream for ProfileClient {
199    type Item = Result<ProfileEvent>;
200
201    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
202        if self.terminated {
203            panic!("Profile polled after terminated");
204        }
205
206        if let Some(advertisement) = self.advertisement.as_mut() {
207            if let Poll::Ready(_result) = advertisement.poll_unpin(cx) {
208                // TODO(https://fxbug.dev/333456020): Consider returning to the client of the
209                // library. Not required by any profiles right now.
210                self.advertisement = None;
211            };
212        }
213
214        if let Some(receiver) = self.connection_receiver.as_mut() {
215            if let Poll::Ready(item) = receiver.poll_next_unpin(cx) {
216                match item {
217                    Some(Ok(request)) => return Poll::Ready(Some(request.try_into())),
218                    Some(Err(e)) => return Poll::Ready(Some(Err(Error::connection_receiver(e)))),
219                    None => {
220                        self.terminated = true;
221                        return Poll::Ready(None);
222                    }
223                };
224            };
225        }
226
227        for search in &mut self.searches {
228            if let Poll::Ready(item) = search.poll_next_unpin(cx) {
229                match item {
230                    Some(Ok(request)) => return Poll::Ready(Some(request.try_into())),
231                    Some(Err(e)) => return Poll::Ready(Some(Err(Error::search_result(e)))),
232                    None => {
233                        self.terminated = true;
234                        return Poll::Ready(None);
235                    }
236                }
237            }
238        }
239
240        // Return pending, store the waker to wake if a new poll target is added.
241        self.stream_waker = Some(cx.waker().clone());
242        Poll::Pending
243    }
244}
245
246#[cfg(test)]
247mod tests {
248    use super::*;
249    use fidl::endpoints::create_proxy_and_stream;
250    use fuchsia_async as fasync;
251    use fuchsia_bluetooth::types::Uuid;
252    use futures::Future;
253    use futures_test::task::new_count_waker;
254    use std::pin::pin;
255
256    fn make_profile_service_definition(service_uuid: Uuid) -> bredr::ServiceDefinition {
257        bredr::ServiceDefinition {
258            service_class_uuids: Some(vec![service_uuid.into()]),
259            protocol_descriptor_list: Some(vec![
260                bredr::ProtocolDescriptor {
261                    protocol: Some(bredr::ProtocolIdentifier::L2Cap),
262                    params: Some(vec![bredr::DataElement::Uint16(bredr::PSM_AVDTP)]),
263                    ..Default::default()
264                },
265                bredr::ProtocolDescriptor {
266                    protocol: Some(bredr::ProtocolIdentifier::Avdtp),
267                    params: Some(vec![bredr::DataElement::Uint16(0x0103)]), // Indicate v1.3
268                    ..Default::default()
269                },
270            ]),
271            profile_descriptors: Some(vec![bredr::ProfileDescriptor {
272                profile_id: Some(bredr::ServiceClassProfileIdentifier::AdvancedAudioDistribution),
273                major_version: Some(1),
274                minor_version: Some(2),
275                ..Default::default()
276            }]),
277            ..Default::default()
278        }
279    }
280
281    #[test]
282    fn service_advertisement_result_is_no_op() {
283        let mut exec = fasync::TestExecutor::new();
284        let (proxy, mut profile_stream) = create_proxy_and_stream::<bredr::ProfileMarker>();
285
286        let source_uuid =
287            Uuid::new16(bredr::ServiceClassProfileIdentifier::AudioSource.into_primitive());
288        let defs = vec![make_profile_service_definition(source_uuid)];
289        let channel_params = fidl_bt::ChannelParameters {
290            channel_mode: Some(fidl_bt::ChannelMode::Basic),
291            ..Default::default()
292        };
293
294        let mut profile = ProfileClient::advertise(proxy, defs.clone(), channel_params.clone())
295            .expect("Advertise succeeds");
296
297        let (_connect_proxy, adv_responder) = expect_advertisement_registration(
298            &mut exec,
299            &mut profile_stream,
300            defs,
301            Some(channel_params.into()),
302        );
303
304        {
305            let event_fut = profile.next();
306            let mut event_fut = pin!(event_fut);
307            assert!(exec.run_until_stalled(&mut event_fut).is_pending());
308
309            // The lifetime of the advertisement is not tied to the `Advertise` response. The
310            // `ProfileClient` stream should still be active.
311            adv_responder
312                .send(Ok(&bredr::ProfileAdvertiseResponse::default()))
313                .expect("able to respond");
314
315            match exec.run_until_stalled(&mut event_fut) {
316                Poll::Pending => {}
317                x => panic!("Expected pending but got {x:?}"),
318            };
319        }
320
321        assert!(!profile.is_terminated());
322    }
323
324    #[test]
325    fn connection_request_relayed_to_stream() {
326        let mut exec = fasync::TestExecutor::new();
327        let (proxy, mut profile_stream) = create_proxy_and_stream::<bredr::ProfileMarker>();
328
329        let source_uuid =
330            Uuid::new16(bredr::ServiceClassProfileIdentifier::AudioSource.into_primitive());
331        let defs = vec![make_profile_service_definition(source_uuid)];
332        let channel_params = fidl_bt::ChannelParameters {
333            channel_mode: Some(fidl_bt::ChannelMode::Basic),
334            ..Default::default()
335        };
336
337        let mut profile = ProfileClient::advertise(proxy, defs.clone(), channel_params.clone())
338            .expect("Advertise succeeds");
339
340        let (connect_proxy, _adv_responder) = expect_advertisement_registration(
341            &mut exec,
342            &mut profile_stream,
343            defs,
344            Some(channel_params.into()),
345        );
346
347        let remote_peer = PeerId(12343);
348        {
349            let event_fut = profile.next();
350            let mut event_fut = pin!(event_fut);
351            assert!(exec.run_until_stalled(&mut event_fut).is_pending());
352
353            let (_local, remote) = Channel::create();
354            connect_proxy
355                .connected(&remote_peer.into(), bredr::Channel::try_from(remote).unwrap(), &[])
356                .expect("connection should work");
357
358            match exec.run_until_stalled(&mut event_fut) {
359                Poll::Ready(Some(Ok(ProfileEvent::PeerConnected { id, .. }))) => {
360                    assert_eq!(id, remote_peer);
361                }
362                x => panic!("Expected an error from the advertisement, got {:?}", x),
363            };
364        }
365
366        // Stream should error and terminate when the advertisement is disconnected.
367        drop(connect_proxy);
368
369        match exec.run_until_stalled(&mut profile.next()) {
370            Poll::Ready(None) => {}
371            x => panic!("Expected profile to end on advertisement drop, got {:?}", x),
372        };
373
374        assert!(profile.is_terminated());
375    }
376
377    #[track_caller]
378    fn expect_advertisement_registration(
379        exec: &mut fasync::TestExecutor,
380        profile_stream: &mut bredr::ProfileRequestStream,
381        expected_defs: Vec<bredr::ServiceDefinition>,
382        expected_params: Option<fidl_bt::ChannelParameters>,
383    ) -> (bredr::ConnectionReceiverProxy, bredr::ProfileAdvertiseResponder) {
384        match exec.run_until_stalled(&mut profile_stream.next()) {
385            Poll::Ready(Some(Ok(bredr::ProfileRequest::Advertise { payload, responder }))) => {
386                assert!(payload.services.is_some());
387                assert_eq!(payload.services.unwrap(), expected_defs);
388                assert_eq!(payload.parameters, expected_params);
389                assert!(payload.receiver.is_some());
390                (payload.receiver.unwrap().into_proxy(), responder)
391            }
392            x => panic!("Expected ready advertisement request, got {:?}", x),
393        }
394    }
395
396    #[track_caller]
397    fn expect_search_registration(
398        exec: &mut fasync::TestExecutor,
399        profile_stream: &mut bredr::ProfileRequestStream,
400        search_uuid: bredr::ServiceClassProfileIdentifier,
401        search_attrs: &[u16],
402    ) -> bredr::SearchResultsProxy {
403        match exec.run_until_stalled(&mut profile_stream.next()) {
404            Poll::Ready(Some(Ok(bredr::ProfileRequest::Search { payload, .. }))) => {
405                let bredr::ProfileSearchRequest {
406                    service_uuid: Some(service_uuid),
407                    attr_ids,
408                    results: Some(results),
409                    ..
410                } = payload
411                else {
412                    panic!("invalid parameters");
413                };
414                let attr_ids = attr_ids.unwrap_or_default();
415                assert_eq!(&attr_ids[..], search_attrs);
416                assert_eq!(service_uuid, search_uuid);
417                results.into_proxy()
418            }
419            x => panic!("Expected ready request for a search, got: {:?}", x),
420        }
421    }
422
423    #[test]
424    fn responds_to_search_results() {
425        let mut exec = fasync::TestExecutor::new();
426        let (proxy, mut profile_stream) = create_proxy_and_stream::<bredr::ProfileMarker>();
427
428        let mut profile = ProfileClient::new(proxy);
429
430        let search_attrs = vec![bredr::ATTR_BLUETOOTH_PROFILE_DESCRIPTOR_LIST];
431
432        let source_uuid = bredr::ServiceClassProfileIdentifier::AudioSource;
433        profile
434            .add_search(source_uuid, Some(search_attrs.clone()))
435            .expect("adding search succeeds");
436
437        let sink_uuid = bredr::ServiceClassProfileIdentifier::AudioSink;
438        profile.add_search(sink_uuid, Some(search_attrs.clone())).expect("adding search succeeds");
439
440        // Get the search clients out
441        let source_results_proxy = expect_search_registration(
442            &mut exec,
443            &mut profile_stream,
444            source_uuid,
445            &search_attrs[..],
446        );
447        let sink_results_proxy = expect_search_registration(
448            &mut exec,
449            &mut profile_stream,
450            sink_uuid,
451            &search_attrs[..],
452        );
453
454        // Send a search request, process the request (by polling event stream) and confirm it responds.
455
456        // Report a search result, which should be replied to.
457        let attributes = &[];
458        let found_peer_id = PeerId(1);
459        let results_fut =
460            source_results_proxy.service_found(&found_peer_id.into(), None, attributes);
461        let mut results_fut = pin!(results_fut);
462
463        match exec.run_until_stalled(&mut profile.next()) {
464            Poll::Ready(Some(Ok(ProfileEvent::SearchResult { id, .. }))) => {
465                assert_eq!(found_peer_id, id);
466            }
467            x => panic!("Expected search result to be ready: {:?}", x),
468        }
469
470        match exec.run_until_stalled(&mut results_fut) {
471            Poll::Ready(Ok(())) => {}
472            x => panic!("Expected a response from the source result, got {:?}", x),
473        };
474
475        let results_fut = sink_results_proxy.service_found(&found_peer_id.into(), None, attributes);
476        let mut results_fut = pin!(results_fut);
477
478        match exec.run_until_stalled(&mut profile.next()) {
479            Poll::Ready(Some(Ok(ProfileEvent::SearchResult { id, .. }))) => {
480                assert_eq!(found_peer_id, id);
481            }
482            x => panic!("Expected search result to be ready: {:?}", x),
483        }
484
485        match exec.run_until_stalled(&mut results_fut) {
486            Poll::Ready(Ok(())) => {}
487            x => panic!("Expected a response from the sink result, got {:?}", x),
488        };
489
490        // Stream should error and terminate when one of the result streams is disconnected.
491        drop(source_results_proxy);
492
493        match exec.run_until_stalled(&mut profile.next()) {
494            Poll::Ready(None) => {}
495            x => panic!("Expected profile to end on search result drop, got {:?}", x),
496        };
497
498        assert!(profile.is_terminated());
499
500        // Adding a search after termination should fail.
501        assert!(profile.add_search(sink_uuid, None).is_err());
502    }
503
504    #[test]
505    fn waker_gets_awoken_when_search_added() {
506        let mut exec = fasync::TestExecutor::new();
507        let (proxy, mut profile_stream) = create_proxy_and_stream::<bredr::ProfileMarker>();
508
509        let mut profile = ProfileClient::new(proxy);
510
511        // Polling the ProfileClient stream before any poll targets have been added should save
512        // a waker to be awoken when a new search is added.
513        let profile_fut = profile.next();
514
515        let (waker, profile_fut_wake_count) = new_count_waker();
516        let mut counting_ctx = Context::from_waker(&waker);
517
518        let profile_fut = pin!(profile_fut);
519        assert!(profile_fut.poll(&mut counting_ctx).is_pending());
520
521        // Since there are no poll targets, save the initial count. We expect this count
522        // to change when a new poll target is added.
523        let initial_count = profile_fut_wake_count.get();
524
525        // Adding a search should be OK. We expect to get the search request and the
526        // waker should be awoken.
527        let source_uuid = bredr::ServiceClassProfileIdentifier::AudioSource;
528        profile.add_search(source_uuid, None).expect("adding search succeeds");
529        let search_proxy =
530            expect_search_registration(&mut exec, &mut profile_stream, source_uuid, &[]);
531
532        // Since we've added a search, we expect the wake count to increase by one.
533        let after_search_count = profile_fut_wake_count.get();
534        assert_eq!(after_search_count, initial_count + 1);
535
536        // Reporting a search result should work as intended. The stream should produce an event.
537        let attributes = &[];
538        let found_peer_id = PeerId(123);
539        let results_fut = search_proxy.service_found(&found_peer_id.into(), None, attributes);
540        let mut results_fut = pin!(results_fut);
541
542        match exec.run_until_stalled(&mut profile.next()) {
543            Poll::Ready(Some(Ok(ProfileEvent::SearchResult { id, .. }))) => {
544                assert_eq!(found_peer_id, id);
545            }
546            x => panic!("Expected search result to be ready: {:?}", x),
547        }
548
549        match exec.run_until_stalled(&mut results_fut) {
550            Poll::Ready(Ok(())) => {}
551            x => panic!("Expected a response from the source result, got {:?}", x),
552        };
553    }
554}