Skip to main content

bt_avctp/avctp/
mod.rs

1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fuchsia_bluetooth::types::Channel;
6use fuchsia_sync::Mutex;
7
8use futures::ready;
9use futures::stream::{FusedStream, Stream, StreamExt};
10use futures::task::{Context, Poll, Waker};
11use log::{info, trace, warn};
12use packet_encoding::{Decodable, Encodable};
13use slab::Slab;
14use std::collections::VecDeque;
15use std::mem;
16use std::pin::Pin;
17use std::sync::Arc;
18use std::sync::atomic::{AtomicBool, Ordering};
19
20#[cfg(test)]
21mod tests;
22mod types;
23
24use crate::{Error, Result};
25
26use self::types::AV_REMOTE_PROFILE;
27
28pub use self::types::{Header, MessageType, PacketType, TxLabel};
29
30/// Peer connection to a remote device uses AVCTP protocol over an L2CAP socket. Used by the AVC
31/// peer that encapsulates this peer connection and directly in AVRCP for non AV\C connections like
32/// the browse channel.
33#[derive(Debug)]
34pub struct Peer {
35    inner: Arc<PeerInner>,
36}
37
38#[derive(Debug)]
39struct PeerInner {
40    /// Channel to the remote device owned by this peer object.
41    channel: Mutex<Channel>,
42
43    channel_closed: AtomicBool,
44
45    /// A map of transaction ids that have been sent but the response has not
46    /// been received and/or processed yet.
47    ///
48    /// Waiters are added with `add_response_waiter` and get removed when they are
49    /// polled or they are removed with `remove_waiter`
50    response_waiters: Mutex<Slab<ResponseWaiter>>,
51
52    /// A queue of requests that have been received and are waiting to
53    /// be responded to, along with the waker for the task that has
54    /// taken the request receiver (if it exists)
55    incoming_requests: Mutex<CommandQueue>,
56}
57
58impl Peer {
59    /// Create a new peer object from a established L2CAP socket with the peer.
60    pub fn new(channel: Channel) -> Self {
61        Self { inner: Arc::new(PeerInner::new(channel)) }
62    }
63
64    /// Returns a stream of incoming commands from a remote peer.
65    /// Stream returns Command objects on success that can be used to send back responses.
66    pub fn take_command_stream(&self) -> CommandStream {
67        {
68            let mut lock = self.inner.incoming_requests.lock();
69            if let CommandListener::None = lock.listener {
70                lock.listener = CommandListener::New;
71            } else {
72                panic!("Command stream has already been taken");
73            }
74        }
75
76        CommandStream { inner: self.inner.clone(), terminated: false }
77    }
78
79    /// Send an outgoing command to the remote peer. Returns a CommandResponseStream to
80    /// handle incoming response packets.
81    pub fn send_command(&self, payload: &[u8]) -> Result<CommandResponseStream> {
82        let id = self.inner.add_response_waiter()?;
83        let avctp_header = Header::new(id, AV_REMOTE_PROFILE.clone(), MessageType::Command, false);
84        {
85            self.inner.send_packet(&avctp_header, payload)?;
86        }
87
88        Ok(CommandResponseStream::new(avctp_header.label().clone(), self.inner.clone()))
89    }
90}
91
92impl PeerInner {
93    fn new(channel: Channel) -> Self {
94        Self {
95            channel: Mutex::new(channel),
96            channel_closed: AtomicBool::new(false),
97            response_waiters: Mutex::new(Slab::<ResponseWaiter>::new()),
98            incoming_requests: Mutex::<CommandQueue>::default(),
99        }
100    }
101
102    /// Add a response waiter, and return a id that can be used to send the
103    /// transaction.  Responses then can be received using poll_recv_response
104    fn add_response_waiter(&self) -> Result<TxLabel> {
105        let key = self.response_waiters.lock().insert(ResponseWaiter::default());
106        let id = TxLabel::try_from(key as u8);
107        if id.is_err() {
108            warn!("Transaction IDs are exhausted");
109            let _ = self.response_waiters.lock().remove(key);
110        }
111        id
112    }
113
114    /// When a waiter isn't interested in the response anymore, we need to just
115    /// throw it out.  This is called when the response future is dropped.
116    fn remove_response_interest(&self, id: &TxLabel) {
117        let mut lock = self.response_waiters.lock();
118        let idx = usize::from(id);
119        let _ = lock.remove(idx);
120    }
121
122    /// Attempts to receive a new request by processing all packets on the socket.
123    /// Resolves to an unprocessed request (header, body) if one was received.
124    /// Resolves to an error if there was an error reading from the socket or if the peer
125    /// disconnected.
126    fn poll_recv_request(&self, cx: &mut Context<'_>) -> Poll<Result<Packet>> {
127        let is_closed = self.recv_all(cx)?;
128
129        let mut lock = self.incoming_requests.lock();
130
131        match lock.queue.pop_front() {
132            Some(request) => Poll::Ready(Ok(request)),
133            _ => {
134                if is_closed {
135                    Poll::Ready(Err(Error::PeerDisconnected))
136                } else {
137                    // Set the waker to be notified when a command shows up.
138                    lock.listener = CommandListener::Some(cx.waker().clone());
139                    Poll::Pending
140                }
141            }
142        }
143    }
144
145    /// Attempts to receive a response to a request by processing all packets on the socket.
146    /// Resolves to the bytes in the response body if one was received.
147    /// Resolves to an error if there was an error reading from the socket, if the peer
148    /// disconnected, or if the |label| is not being waited on.
149    fn poll_recv_response(&self, label: &TxLabel, cx: &mut Context<'_>) -> Poll<Result<Packet>> {
150        let is_closed = self.recv_all(cx)?;
151
152        let mut waiters = self.response_waiters.lock();
153        let idx = usize::from(label);
154        // We expect() below because the label above came from an internally-created object,
155        // so the waiters should always exist in the map.
156        let waiter = waiters.get_mut(idx).expect("Polled unregistered waiter");
157        if waiter.has_response() {
158            // We got our response.
159            let packet = waiter.pop_received();
160            Poll::Ready(Ok(packet))
161        } else {
162            if is_closed {
163                Poll::Ready(Err(Error::PeerDisconnected))
164            } else {
165                // Set the waker to be notified when a response shows up.
166                waiter.listener = ResponseListener::Some(cx.waker().clone());
167                Poll::Pending
168            }
169        }
170    }
171
172    /// Poll for any packets on the socket
173    /// Returns whether the channel was closed, or an Error::PeerRead or Error::PeerWrite
174    /// if there was a problem communicating on the socket.
175    fn recv_all(&self, cx: &mut Context<'_>) -> Result<bool> {
176        if self.channel_closed.load(Ordering::SeqCst) {
177            return Ok(true);
178        }
179        loop {
180            let mut buf = {
181                let mut channel = self.channel.lock();
182                match channel.poll_next_unpin(cx) {
183                    Poll::Ready(Some(Ok(packet))) => packet,
184                    Poll::Ready(Some(Err(zx::Status::PEER_CLOSED))) | Poll::Ready(None) => {
185                        trace!("Peer closed");
186                        self.channel_closed.store(true, Ordering::SeqCst);
187                        return Ok(true);
188                    }
189                    Poll::Ready(Some(Err(e))) => return Err(Error::PeerRead(e)),
190                    Poll::Pending => return Ok(false),
191                }
192            };
193
194            trace!("received packet {:?}", buf);
195            let packet_size = buf.len();
196            // Detects General Reject condition and sends the response back.
197            // On other headers with errors, sends BAD_HEADER to the peer
198            // and attempts to continue.
199            let avctp_header = match Header::decode(buf.as_slice()) {
200                Err(_) => {
201                    // Only possible error is OutOfRange
202                    // Returned only when the packet is too small, can't make a meaningful reject.
203                    info!("received unrejectable message");
204                    continue;
205                }
206                Ok(x) => x,
207            };
208
209            // We only support AV remote targeted AVCTP messages on this socket.
210            // Send a rejection AVCTP messages with invalid profile id bit set to true.
211            if avctp_header.profile_id() != AV_REMOTE_PROFILE {
212                info!("received packet not targeted at remote profile service class");
213                let resp_avct = avctp_header.create_invalid_profile_id_response();
214                self.send_packet(&resp_avct, &[])?;
215                continue;
216            }
217
218            if packet_size == avctp_header.encoded_len() {
219                // Only the avctp header was sent with no payload.
220                info!("received incomplete packet");
221                continue;
222            }
223
224            let body = buf.split_off(avctp_header.encoded_len());
225            // Commands from the remote get translated into requests.
226            match avctp_header.message_type() {
227                MessageType::Command => {
228                    let mut lock = self.incoming_requests.lock();
229                    lock.queue.push_back(Packet { header: avctp_header, body });
230                    if let CommandListener::Some(ref waker) = lock.listener {
231                        waker.wake_by_ref();
232                    }
233                }
234                MessageType::Response => {
235                    // Should be a response to a command we sent.
236                    let mut waiters = self.response_waiters.lock();
237                    let idx = usize::from(avctp_header.label());
238
239                    if let Some(waiter) = waiters.get_mut(idx) {
240                        waiter.queue.push_back(Packet { header: avctp_header, body });
241                        let old_entry = mem::replace(&mut waiter.listener, ResponseListener::New);
242                        if let ResponseListener::Some(waker) = old_entry {
243                            waker.wake();
244                        }
245                    } else {
246                        trace!("response for {:?} we did not send, dropping", avctp_header.label());
247                    };
248                    // Note: we drop any TxLabel response we are not waiting for
249                }
250            }
251        }
252    }
253
254    // Wakes up an arbitrary task that has begun polling on the channel so that
255    // it will call recv_all and be registered as the new channel reader.
256    fn wake_any(&self) {
257        // Try to wake up response waiters first, rather than the event listener.
258        // The event listener is a stream, and so could be between poll_nexts,
259        // Response waiters should always be actively polled once
260        // they've begun being polled on a task.
261        {
262            let lock = self.response_waiters.lock();
263            for (_, response_waiter) in lock.iter() {
264                if let ResponseListener::Some(ref waker) = response_waiter.listener {
265                    waker.wake_by_ref();
266                    return;
267                }
268            }
269        }
270        {
271            let lock = self.incoming_requests.lock();
272            if let CommandListener::Some(ref waker) = lock.listener {
273                waker.wake_by_ref();
274                return;
275            }
276        }
277    }
278
279    pub fn send_packet(&self, resp_header: &Header, body: &[u8]) -> Result<()> {
280        let mut rbuf = vec![0 as u8; resp_header.encoded_len()];
281        resp_header.encode(&mut rbuf)?;
282        if body.len() > 0 {
283            rbuf.extend_from_slice(body);
284        }
285        let _ = self.channel.lock().write(rbuf.as_slice()).map_err(|x| Error::PeerWrite(x))?;
286        Ok(())
287    }
288}
289
290/// A stream of requests from the remote peer.
291#[derive(Debug)]
292pub struct CommandStream {
293    inner: Arc<PeerInner>,
294    terminated: bool,
295}
296
297impl Unpin for CommandStream {}
298
299impl FusedStream for CommandStream {
300    fn is_terminated(&self) -> bool {
301        self.terminated
302    }
303}
304
305impl Stream for CommandStream {
306    type Item = Result<Command>;
307
308    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
309        if self.terminated {
310            return Poll::Ready(None);
311        }
312
313        match ready!(self.inner.poll_recv_request(cx)) {
314            Ok(Packet { header, body, .. }) => Poll::Ready(Some(Ok(Command {
315                peer: self.inner.clone(),
316                avctp_header: header,
317                body,
318            }))),
319            Err(e) if e == Error::PeerDisconnected => {
320                self.terminated = true;
321                Poll::Ready(None)
322            }
323            Err(e) => Poll::Ready(Some(Err(e))),
324        }
325    }
326}
327
328impl Drop for CommandStream {
329    fn drop(&mut self) {
330        self.inner.incoming_requests.lock().listener = CommandListener::None;
331        self.inner.wake_any();
332    }
333}
334
335#[derive(Debug)]
336pub struct Command {
337    peer: Arc<PeerInner>,
338    avctp_header: Header,
339    body: Vec<u8>,
340}
341
342impl Command {
343    pub fn header(&self) -> &Header {
344        &self.avctp_header
345    }
346
347    pub fn body(&self) -> &[u8] {
348        &self.body[..]
349    }
350
351    pub fn send_response(&self, body: &[u8]) -> Result<()> {
352        let response_header = self.avctp_header.create_response(PacketType::Single);
353        self.peer.send_packet(&response_header, body)
354    }
355}
356
357#[derive(Debug)]
358pub struct Packet {
359    header: Header,
360    body: Vec<u8>,
361}
362
363impl Packet {
364    pub fn header(&self) -> &Header {
365        &self.header
366    }
367
368    pub fn body(&self) -> &[u8] {
369        &self.body[..]
370    }
371}
372
373#[derive(Debug, Default)]
374struct CommandQueue {
375    listener: CommandListener,
376    queue: VecDeque<Packet>,
377}
378
379#[derive(Debug)]
380enum CommandListener {
381    /// No one is listening.
382    None,
383    /// Someone wants to listen but hasn't polled.
384    New,
385    /// Someone is listening, and can be woken with the waker.
386    Some(Waker),
387}
388
389impl Default for CommandListener {
390    fn default() -> Self {
391        CommandListener::None
392    }
393}
394
395#[derive(Debug, Default)]
396struct ResponseWaiter {
397    listener: ResponseListener,
398    queue: VecDeque<Packet>,
399}
400
401/// An enum representing an interest in the response to a command.
402#[derive(Debug)]
403enum ResponseListener {
404    /// A new waiter which hasn't been polled yet.
405    New,
406    /// A task waiting for a response, which can be woken with the waker.
407    Some(Waker),
408}
409
410impl Default for ResponseListener {
411    fn default() -> Self {
412        ResponseListener::New
413    }
414}
415
416impl ResponseWaiter {
417    /// Check if a message has been received.
418    fn has_response(&self) -> bool {
419        !self.queue.is_empty()
420    }
421
422    fn pop_received(&mut self) -> Packet {
423        if !self.has_response() {
424            panic!("expected received buf");
425        }
426        self.queue.pop_front().expect("response listener packet queue is unexpectedly empty")
427    }
428}
429
430/// A stream wrapper that polls for the responses to a command we sent.
431/// Removes the associated response waiter when dropped or explicitly
432/// completed.
433#[derive(Debug)]
434pub struct CommandResponseStream {
435    id: Option<TxLabel>,
436    inner: Arc<PeerInner>,
437    done: bool,
438}
439
440impl CommandResponseStream {
441    fn new(id: TxLabel, inner: Arc<PeerInner>) -> CommandResponseStream {
442        CommandResponseStream { id: Some(id), inner, done: false }
443    }
444
445    pub fn complete(&mut self) {
446        if let Some(id) = &self.id {
447            self.inner.remove_response_interest(id);
448            self.id = None;
449            self.done = true;
450            self.inner.wake_any();
451        }
452    }
453}
454
455impl Unpin for CommandResponseStream {}
456
457impl FusedStream for CommandResponseStream {
458    fn is_terminated(&self) -> bool {
459        self.done == true
460    }
461}
462
463impl Stream for CommandResponseStream {
464    type Item = Result<Packet>;
465    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
466        let this = &mut *self;
467        if this.is_terminated() {
468            return Poll::Ready(None);
469        }
470        let Some(id) = &this.id else {
471            this.done = true;
472            return Poll::Ready(None);
473        };
474
475        match ready!(this.inner.poll_recv_response(id, cx)) {
476            Ok(packet) => {
477                trace!("received response packet {:?}", packet);
478                if packet.header().is_invalid_profile_id() {
479                    Poll::Ready(Some(Err(Error::InvalidProfileId)))
480                } else {
481                    Poll::Ready(Some(Ok(packet)))
482                }
483            }
484            Err(e) if e == Error::PeerDisconnected => {
485                this.done = true;
486                Poll::Ready(None)
487            }
488            Err(e) => Poll::Ready(Some(Err(e))),
489        }
490    }
491}
492
493impl Drop for CommandResponseStream {
494    fn drop(&mut self) {
495        self.complete();
496    }
497}