Skip to main content

bt_avctp/avctp/
mod.rs

1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fuchsia_bluetooth::types::Channel;
6use fuchsia_sync::Mutex;
7
8use futures::ready;
9use futures::stream::{FusedStream, Stream, StreamExt};
10use futures::task::{Context, Poll, Waker};
11use log::{info, trace, warn};
12use packet_encoding::{Decodable, Encodable};
13use slab::Slab;
14use std::collections::VecDeque;
15use std::mem;
16use std::pin::Pin;
17use std::sync::Arc;
18use std::sync::atomic::{AtomicBool, Ordering};
19
20#[cfg(test)]
21mod tests;
22mod types;
23
24use crate::{Error, Result};
25
26use self::types::AV_REMOTE_PROFILE;
27
28pub use self::types::{Header, MessageType, PacketType, TxLabel};
29
30/// Peer connection to a remote device uses AVCTP protocol over an L2CAP socket. Used by the AVC
31/// peer that encapsulates this peer connection and directly in AVRCP for non AV\C connections like
32/// the browse channel.
33#[derive(Debug)]
34pub struct Peer {
35    inner: Arc<PeerInner>,
36}
37
38#[derive(Debug)]
39struct PeerInner {
40    /// Channel to the remote device owned by this peer object.
41    channel: Mutex<Channel>,
42
43    channel_closed: AtomicBool,
44
45    /// A map of transaction ids that have been sent but the response has not
46    /// been received and/or processed yet.
47    ///
48    /// Waiters are added with `add_response_waiter` and get removed when they are
49    /// polled or they are removed with `remove_waiter`
50    response_waiters: Mutex<Slab<ResponseWaiter>>,
51
52    /// A queue of requests that have been received and are waiting to
53    /// be responded to, along with the waker for the task that has
54    /// taken the request receiver (if it exists)
55    incoming_requests: Mutex<CommandQueue>,
56}
57
58impl Peer {
59    /// Create a new peer object from a established L2CAP socket with the peer.
60    pub fn new(channel: Channel) -> Self {
61        Self { inner: Arc::new(PeerInner::new(channel)) }
62    }
63
64    /// Returns a stream of incoming commands from a remote peer.
65    /// Stream returns Command objects on success that can be used to send back responses.
66    pub fn take_command_stream(&self) -> CommandStream {
67        {
68            let mut lock = self.inner.incoming_requests.lock();
69            if let CommandListener::None = lock.listener {
70                lock.listener = CommandListener::New;
71            } else {
72                panic!("Command stream has already been taken");
73            }
74        }
75
76        CommandStream { inner: self.inner.clone(), terminated: false }
77    }
78
79    /// Send an outgoing command to the remote peer. Returns a CommandResponseStream to
80    /// handle incoming response packets.
81    pub fn send_command(&self, payload: &[u8]) -> Result<CommandResponseStream> {
82        let id = self.inner.add_response_waiter()?;
83        let avctp_header = Header::new(id, AV_REMOTE_PROFILE.clone(), MessageType::Command, false);
84        {
85            self.inner.send_packet(&avctp_header, payload)?;
86        }
87
88        Ok(CommandResponseStream::new(avctp_header.label().clone(), self.inner.clone()))
89    }
90}
91
92impl PeerInner {
93    fn new(channel: Channel) -> Self {
94        Self {
95            channel: Mutex::new(channel),
96            channel_closed: AtomicBool::new(false),
97            response_waiters: Mutex::new(Slab::<ResponseWaiter>::new()),
98            incoming_requests: Mutex::<CommandQueue>::default(),
99        }
100    }
101
102    /// Add a response waiter, and return a id that can be used to send the
103    /// transaction.  Responses then can be received using poll_recv_response
104    fn add_response_waiter(&self) -> Result<TxLabel> {
105        let key = self.response_waiters.lock().insert(ResponseWaiter::default());
106        let id = TxLabel::try_from(key as u8);
107        if id.is_err() {
108            warn!("Transaction IDs are exhausted");
109            let _ = self.response_waiters.lock().remove(key);
110        }
111        id
112    }
113
114    /// When a waiter isn't interested in the response anymore, we need to just
115    /// throw it out.  This is called when the response future is dropped.
116    fn remove_response_interest(&self, id: &TxLabel) {
117        let mut lock = self.response_waiters.lock();
118        let idx = usize::from(id);
119        let _ = lock.remove(idx);
120    }
121
122    /// Attempts to receive a new request by processing all packets on the socket.
123    /// Resolves to an unprocessed request (header, body) if one was received.
124    /// Resolves to an error if there was an error reading from the socket or if the peer
125    /// disconnected.
126    fn poll_recv_request(&self, cx: &mut Context<'_>) -> Poll<Result<Packet>> {
127        let is_closed = self.recv_all(cx)?;
128
129        let mut lock = self.incoming_requests.lock();
130
131        match lock.queue.pop_front() {
132            Some(request) => Poll::Ready(Ok(request)),
133            _ => {
134                if is_closed {
135                    Poll::Ready(Err(Error::PeerDisconnected))
136                } else {
137                    // Set the waker to be notified when a command shows up.
138                    lock.listener = CommandListener::Some(cx.waker().clone());
139                    Poll::Pending
140                }
141            }
142        }
143    }
144
145    /// Attempts to receive a response to a request by processing all packets on the socket.
146    /// Resolves to the bytes in the response body if one was received.
147    /// Resolves to an error if there was an error reading from the socket, if the peer
148    /// disconnected, or if the |label| is not being waited on.
149    fn poll_recv_response(&self, label: &TxLabel, cx: &mut Context<'_>) -> Poll<Result<Packet>> {
150        let is_closed = self.recv_all(cx)?;
151
152        let mut waiters = self.response_waiters.lock();
153        let idx = usize::from(label);
154        // We expect() below because the label above came from an internally-created object,
155        // so the waiters should always exist in the map.
156        let waiter = waiters.get_mut(idx).expect("Polled unregistered waiter");
157        if waiter.has_response() {
158            // We got our response.
159            let packet = waiter.pop_received();
160            Poll::Ready(Ok(packet))
161        } else {
162            if is_closed {
163                Poll::Ready(Err(Error::PeerDisconnected))
164            } else {
165                // Set the waker to be notified when a response shows up.
166                waiter.listener = ResponseListener::Some(cx.waker().clone());
167                Poll::Pending
168            }
169        }
170    }
171
172    /// Poll for any packets on the socket
173    /// Returns whether the channel was closed, or an Error::PeerRead or Error::PeerWrite
174    /// if there was a problem communicating on the socket.
175    fn recv_all(&self, cx: &mut Context<'_>) -> Result<bool> {
176        if self.channel_closed.load(Ordering::SeqCst) {
177            return Ok(true);
178        }
179        loop {
180            let mut buf = {
181                let mut channel = self.channel.lock();
182                match channel.poll_next_unpin(cx) {
183                    Poll::Ready(Some(Ok(packet))) => packet,
184                    Poll::Ready(Some(Err(zx::Status::PEER_CLOSED))) | Poll::Ready(None) => {
185                        trace!("Peer closed");
186                        self.channel_closed.store(true, Ordering::SeqCst);
187                        return Ok(true);
188                    }
189                    Poll::Ready(Some(Err(e))) => return Err(Error::PeerRead(e)),
190                    Poll::Pending => return Ok(false),
191                }
192            };
193
194            trace!("received packet {:?}", buf);
195            let packet_size = buf.len();
196            // Detects General Reject condition and sends the response back.
197            // On other headers with errors, sends BAD_HEADER to the peer
198            // and attempts to continue.
199            let avctp_header = match Header::decode(buf.as_slice()) {
200                Err(_) => {
201                    // Only possible error is OutOfRange
202                    // Returned only when the packet is too small, can't make a meaningful reject.
203                    info!("received unrejectable message");
204                    continue;
205                }
206                Ok(x) => x,
207            };
208
209            // We only support AV remote targeted AVCTP messages on this socket.
210            // Send a rejection AVCTP messages with invalid profile id bit set to true.
211            if avctp_header.profile_id() != AV_REMOTE_PROFILE {
212                info!("received packet not targeted at remote profile service class");
213                let resp_avct = avctp_header.create_invalid_profile_id_response();
214                self.send_packet(&resp_avct, &[])?;
215                continue;
216            }
217
218            if packet_size == avctp_header.encoded_len() {
219                // Only the avctp header was sent with no payload.
220                info!("received incomplete packet");
221                continue;
222            }
223
224            let body = buf.split_off(avctp_header.encoded_len());
225            // Commands from the remote get translated into requests.
226            match avctp_header.message_type() {
227                MessageType::Command => {
228                    let mut lock = self.incoming_requests.lock();
229                    lock.queue.push_back(Packet { header: avctp_header, body });
230                    if let CommandListener::Some(ref waker) = lock.listener {
231                        waker.wake_by_ref();
232                    }
233                }
234                MessageType::Response => {
235                    // Should be a response to a command we sent.
236                    let mut waiters = self.response_waiters.lock();
237                    let idx = usize::from(avctp_header.label());
238
239                    if let Some(waiter) = waiters.get_mut(idx) {
240                        waiter.queue.push_back(Packet { header: avctp_header, body });
241                        let old_entry = mem::replace(&mut waiter.listener, ResponseListener::New);
242                        if let ResponseListener::Some(waker) = old_entry {
243                            waker.wake();
244                        }
245                    } else {
246                        trace!("response for {:?} we did not send, dropping", avctp_header.label());
247                    };
248                    // Note: we drop any TxLabel response we are not waiting for
249                }
250            }
251        }
252    }
253
254    // Wakes up an arbitrary task that has begun polling on the channel so that
255    // it will call recv_all and be registered as the new channel reader.
256    fn wake_any(&self) {
257        // Try to wake up response waiters first, rather than the event listener.
258        // The event listener is a stream, and so could be between poll_nexts,
259        // Response waiters should always be actively polled once
260        // they've begun being polled on a task.
261        {
262            let lock = self.response_waiters.lock();
263            for (_, response_waiter) in lock.iter() {
264                if let ResponseListener::Some(ref waker) = response_waiter.listener {
265                    waker.wake_by_ref();
266                    return;
267                }
268            }
269        }
270        {
271            let lock = self.incoming_requests.lock();
272            if let CommandListener::Some(ref waker) = lock.listener {
273                waker.wake_by_ref();
274                return;
275            }
276        }
277    }
278
279    pub fn send_packet(&self, resp_header: &Header, body: &[u8]) -> Result<()> {
280        let mut rbuf = vec![0 as u8; resp_header.encoded_len()];
281        resp_header.encode(&mut rbuf)?;
282        if body.len() > 0 {
283            rbuf.extend_from_slice(body);
284        }
285        let _ = self.channel.lock().write(rbuf.as_slice()).map_err(|x| Error::PeerWrite(x))?;
286        Ok(())
287    }
288}
289
290/// A stream of requests from the remote peer.
291#[derive(Debug)]
292pub struct CommandStream {
293    inner: Arc<PeerInner>,
294    terminated: bool,
295}
296
297impl Unpin for CommandStream {}
298
299impl FusedStream for CommandStream {
300    fn is_terminated(&self) -> bool {
301        self.terminated
302    }
303}
304
305impl Stream for CommandStream {
306    type Item = Result<Command>;
307
308    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
309        if self.terminated {
310            return Poll::Ready(None);
311        }
312
313        match ready!(self.inner.poll_recv_request(cx)) {
314            Ok(Packet { header, body, .. }) => Poll::Ready(Some(Ok(Command {
315                peer: self.inner.clone(),
316                avctp_header: header,
317                body,
318            }))),
319            Err(e) if e == Error::PeerDisconnected => {
320                self.terminated = true;
321                Poll::Ready(None)
322            }
323            Err(e) => Poll::Ready(Some(Err(e))),
324        }
325    }
326}
327
328impl Drop for CommandStream {
329    fn drop(&mut self) {
330        self.inner.incoming_requests.lock().listener = CommandListener::None;
331        self.inner.wake_any();
332    }
333}
334
335pub struct Command {
336    peer: Arc<PeerInner>,
337    avctp_header: Header,
338    body: Vec<u8>,
339}
340
341impl std::fmt::Debug for Command {
342    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
343        f.debug_struct("Command")
344            .field("avctp_header", &self.avctp_header)
345            .field("body_len", &self.body.len())
346            .finish()
347    }
348}
349
350impl Command {
351    pub fn header(&self) -> &Header {
352        &self.avctp_header
353    }
354
355    pub fn body(&self) -> &[u8] {
356        &self.body[..]
357    }
358
359    pub fn send_response(&self, body: &[u8]) -> Result<()> {
360        let response_header = self.avctp_header.create_response(PacketType::Single);
361        self.peer.send_packet(&response_header, body)
362    }
363}
364
365#[derive(Debug)]
366pub struct Packet {
367    header: Header,
368    body: Vec<u8>,
369}
370
371impl Packet {
372    pub fn header(&self) -> &Header {
373        &self.header
374    }
375
376    pub fn body(&self) -> &[u8] {
377        &self.body[..]
378    }
379}
380
381#[derive(Debug, Default)]
382struct CommandQueue {
383    listener: CommandListener,
384    queue: VecDeque<Packet>,
385}
386
387#[derive(Debug)]
388enum CommandListener {
389    /// No one is listening.
390    None,
391    /// Someone wants to listen but hasn't polled.
392    New,
393    /// Someone is listening, and can be woken with the waker.
394    Some(Waker),
395}
396
397impl Default for CommandListener {
398    fn default() -> Self {
399        CommandListener::None
400    }
401}
402
403#[derive(Debug, Default)]
404struct ResponseWaiter {
405    listener: ResponseListener,
406    queue: VecDeque<Packet>,
407}
408
409/// An enum representing an interest in the response to a command.
410#[derive(Debug)]
411enum ResponseListener {
412    /// A new waiter which hasn't been polled yet.
413    New,
414    /// A task waiting for a response, which can be woken with the waker.
415    Some(Waker),
416}
417
418impl Default for ResponseListener {
419    fn default() -> Self {
420        ResponseListener::New
421    }
422}
423
424impl ResponseWaiter {
425    /// Check if a message has been received.
426    fn has_response(&self) -> bool {
427        !self.queue.is_empty()
428    }
429
430    fn pop_received(&mut self) -> Packet {
431        if !self.has_response() {
432            panic!("expected received buf");
433        }
434        self.queue.pop_front().expect("response listener packet queue is unexpectedly empty")
435    }
436}
437
438/// A stream wrapper that polls for the responses to a command we sent.
439/// Removes the associated response waiter when dropped or explicitly
440/// completed.
441#[derive(Debug)]
442pub struct CommandResponseStream {
443    id: Option<TxLabel>,
444    inner: Arc<PeerInner>,
445    done: bool,
446}
447
448impl CommandResponseStream {
449    fn new(id: TxLabel, inner: Arc<PeerInner>) -> CommandResponseStream {
450        CommandResponseStream { id: Some(id), inner, done: false }
451    }
452
453    pub fn complete(&mut self) {
454        if let Some(id) = &self.id {
455            self.inner.remove_response_interest(id);
456            self.id = None;
457            self.done = true;
458            self.inner.wake_any();
459        }
460    }
461}
462
463impl Unpin for CommandResponseStream {}
464
465impl FusedStream for CommandResponseStream {
466    fn is_terminated(&self) -> bool {
467        self.done == true
468    }
469}
470
471impl Stream for CommandResponseStream {
472    type Item = Result<Packet>;
473    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
474        let this = &mut *self;
475        if this.is_terminated() {
476            return Poll::Ready(None);
477        }
478        let Some(id) = &this.id else {
479            this.done = true;
480            return Poll::Ready(None);
481        };
482
483        match ready!(this.inner.poll_recv_response(id, cx)) {
484            Ok(packet) => {
485                trace!("received response packet {:?}", packet);
486                if packet.header().is_invalid_profile_id() {
487                    Poll::Ready(Some(Err(Error::InvalidProfileId)))
488                } else {
489                    Poll::Ready(Some(Ok(packet)))
490                }
491            }
492            Err(e) if e == Error::PeerDisconnected => {
493                this.done = true;
494                Poll::Ready(None)
495            }
496            Err(e) => Poll::Ready(Some(Err(e))),
497        }
498    }
499}
500
501impl Drop for CommandResponseStream {
502    fn drop(&mut self) {
503        self.complete();
504    }
505}