1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
45use fuchsia_bluetooth::types::Channel;
6use fuchsia_sync::Mutex;
78use futures::ready;
9use futures::stream::{FusedStream, Stream};
10use futures::task::{Context, Poll, Waker};
11use log::{info, trace, warn};
12use packet_encoding::{Decodable, Encodable};
13use slab::Slab;
14use std::collections::VecDeque;
15use std::mem;
16use std::pin::Pin;
17use std::sync::Arc;
1819#[cfg(test)]
20mod tests;
2122mod types;
2324use crate::{Error, Result};
2526use self::types::AV_REMOTE_PROFILE;
2728pub use self::types::{Header, MessageType, PacketType, TxLabel};
2930/// Peer connection to a remote device uses AVCTP protocol over an L2CAP socket. Used by the AVC
31/// peer that encapsulates this peer connection and directly in AVRCP for non AV\C connections like
32/// the browse channel.
33#[derive(Debug)]
34pub struct Peer {
35 inner: Arc<PeerInner>,
36}
3738#[derive(Debug)]
39struct PeerInner {
40/// Channel to the remote device owned by this peer object.
41channel: Channel,
4243/// A map of transaction ids that have been sent but the response has not
44 /// been received and/or processed yet.
45 ///
46 /// Waiters are added with `add_response_waiter` and get removed when they are
47 /// polled or they are removed with `remove_waiter`
48response_waiters: Mutex<Slab<ResponseWaiter>>,
4950/// A queue of requests that have been received and are waiting to
51 /// be responded to, along with the waker for the task that has
52 /// taken the request receiver (if it exists)
53incoming_requests: Mutex<CommandQueue>,
54}
5556impl Peer {
57/// Create a new peer object from a established L2CAP socket with the peer.
58pub fn new(channel: Channel) -> Self {
59Self { inner: Arc::new(PeerInner::new(channel)) }
60 }
6162/// Returns a stream of incoming commands from a remote peer.
63 /// Stream returns Command objects on success that can be used to send back responses.
64pub fn take_command_stream(&self) -> CommandStream {
65 {
66let mut lock = self.inner.incoming_requests.lock();
67if let CommandListener::None = lock.listener {
68 lock.listener = CommandListener::New;
69 } else {
70panic!("Command stream has already been taken");
71 }
72 }
7374 CommandStream { inner: self.inner.clone() }
75 }
7677/// Send an outgoing command to the remote peer. Returns a CommandResponseStream to
78 /// handle incoming response packets.
79pub fn send_command(&self, payload: &[u8]) -> Result<CommandResponseStream> {
80let id = self.inner.add_response_waiter()?;
81let avctp_header = Header::new(id, AV_REMOTE_PROFILE.clone(), MessageType::Command, false);
82 {
83self.inner.send_packet(&avctp_header, payload)?;
84 }
8586Ok(CommandResponseStream::new(avctp_header.label().clone(), self.inner.clone()))
87 }
88}
8990impl PeerInner {
91fn new(channel: Channel) -> Self {
92Self {
93 channel,
94 response_waiters: Mutex::new(Slab::<ResponseWaiter>::new()),
95 incoming_requests: Mutex::<CommandQueue>::default(),
96 }
97 }
9899/// Add a response waiter, and return a id that can be used to send the
100 /// transaction. Responses then can be received using poll_recv_response
101fn add_response_waiter(&self) -> Result<TxLabel> {
102let key = self.response_waiters.lock().insert(ResponseWaiter::default());
103let id = TxLabel::try_from(key as u8);
104if id.is_err() {
105warn!("Transaction IDs are exhausted");
106let _ = self.response_waiters.lock().remove(key);
107 }
108 id
109 }
110111/// When a waiter isn't interested in the response anymore, we need to just
112 /// throw it out. This is called when the response future is dropped.
113fn remove_response_interest(&self, id: &TxLabel) {
114let mut lock = self.response_waiters.lock();
115let idx = usize::from(id);
116let _ = lock.remove(idx);
117 }
118119/// Attempts to receive a new request by processing all packets on the socket.
120 /// Resolves to an unprocessed request (header, body) if one was received.
121 /// Resolves to an error if there was an error reading from the socket or if the peer
122 /// disconnected.
123fn poll_recv_request(&self, cx: &mut Context<'_>) -> Poll<Result<Packet>> {
124let is_closed = self.recv_all(cx)?;
125126let mut lock = self.incoming_requests.lock();
127128match lock.queue.pop_front() {
129Some(request) => Poll::Ready(Ok(request)),
130_ => {
131if is_closed {
132 Poll::Ready(Err(Error::PeerDisconnected))
133 } else {
134// Set the waker to be notified when a command shows up.
135lock.listener = CommandListener::Some(cx.waker().clone());
136 Poll::Pending
137 }
138 }
139 }
140 }
141142/// Attempts to receive a response to a request by processing all packets on the socket.
143 /// Resolves to the bytes in the response body if one was received.
144 /// Resolves to an error if there was an error reading from the socket, if the peer
145 /// disconnected, or if the |label| is not being waited on.
146fn poll_recv_response(&self, label: &TxLabel, cx: &mut Context<'_>) -> Poll<Result<Packet>> {
147let is_closed = self.recv_all(cx)?;
148149let mut waiters = self.response_waiters.lock();
150let idx = usize::from(label);
151// We expect() below because the label above came from an internally-created object,
152 // so the waiters should always exist in the map.
153let waiter = waiters.get_mut(idx).expect("Polled unregistered waiter");
154if waiter.has_response() {
155// We got our response.
156let packet = waiter.pop_received();
157 Poll::Ready(Ok(packet))
158 } else {
159if is_closed {
160 Poll::Ready(Err(Error::PeerDisconnected))
161 } else {
162// Set the waker to be notified when a response shows up.
163waiter.listener = ResponseListener::Some(cx.waker().clone());
164 Poll::Pending
165 }
166 }
167 }
168169/// Poll for any packets on the socket
170 /// Returns whether the channel was closed, or an Error::PeerRead or Error::PeerWrite
171 /// if there was a problem communicating on the socket.
172fn recv_all(&self, cx: &mut Context<'_>) -> Result<bool> {
173let mut buf = Vec::<u8>::new();
174loop {
175let packet_size = match self.channel.poll_datagram(cx, &mut buf) {
176 Poll::Ready(Err(zx::Status::PEER_CLOSED)) => {
177trace!("Peer closed");
178return Ok(true);
179 }
180 Poll::Ready(Err(e)) => return Err(Error::PeerRead(e)),
181 Poll::Pending => return Ok(false),
182 Poll::Ready(Ok(size)) => size,
183 };
184if packet_size == 0 {
185continue;
186 }
187trace!("received packet {:?}", buf);
188// Detects General Reject condition and sends the response back.
189 // On other headers with errors, sends BAD_HEADER to the peer
190 // and attempts to continue.
191let avctp_header = match Header::decode(buf.as_slice()) {
192Err(_) => {
193// Only possible error is OutOfRange
194 // Returned only when the packet is too small, can't make a meaningful reject.
195info!("received unrejectable message");
196 buf = buf.split_off(packet_size);
197continue;
198 }
199Ok(x) => x,
200 };
201202// We only support AV remote targeted AVCTP messages on this socket.
203 // Send a rejection AVCTP messages with invalid profile id bit set to true.
204if avctp_header.profile_id() != AV_REMOTE_PROFILE {
205info!("received packet not targeted at remote profile service class");
206let resp_avct = avctp_header.create_invalid_profile_id_response();
207self.send_packet(&resp_avct, &[])?;
208 buf = buf.split_off(packet_size);
209continue;
210 }
211212if packet_size == avctp_header.encoded_len() {
213// Only the avctp header was sent with no payload.
214info!("received incomplete packet");
215 buf = buf.split_off(packet_size);
216continue;
217 }
218219let rest = buf.split_off(packet_size);
220let body = buf.split_off(avctp_header.encoded_len());
221// Commands from the remote get translated into requests.
222match avctp_header.message_type() {
223 MessageType::Command => {
224let mut lock = self.incoming_requests.lock();
225 lock.queue.push_back(Packet { header: avctp_header, body: body.to_vec() });
226if let CommandListener::Some(ref waker) = lock.listener {
227 waker.wake_by_ref();
228 }
229 buf = rest;
230 }
231 MessageType::Response => {
232// Should be a response to a command we sent.
233let mut waiters = self.response_waiters.lock();
234let idx = usize::from(avctp_header.label());
235236if let Some(waiter) = waiters.get_mut(idx) {
237 waiter
238 .queue
239 .push_back(Packet { header: avctp_header, body: body.to_vec() });
240let old_entry = mem::replace(&mut waiter.listener, ResponseListener::New);
241if let ResponseListener::Some(waker) = old_entry {
242 waker.wake();
243 }
244 } else {
245trace!("response for {:?} we did not send, dropping", avctp_header.label());
246 };
247 buf = rest;
248// Note: we drop any TxLabel response we are not waiting for
249}
250 }
251 }
252 }
253254// Wakes up an arbitrary task that has begun polling on the channel so that
255 // it will call recv_all and be registered as the new channel reader.
256fn wake_any(&self) {
257// Try to wake up response waiters first, rather than the event listener.
258 // The event listener is a stream, and so could be between poll_nexts,
259 // Response waiters should always be actively polled once
260 // they've begun being polled on a task.
261{
262let lock = self.response_waiters.lock();
263for (_, response_waiter) in lock.iter() {
264if let ResponseListener::Some(ref waker) = response_waiter.listener {
265 waker.wake_by_ref();
266return;
267 }
268 }
269 }
270 {
271let lock = self.incoming_requests.lock();
272if let CommandListener::Some(ref waker) = lock.listener {
273 waker.wake_by_ref();
274return;
275 }
276 }
277 }
278279pub fn send_packet(&self, resp_header: &Header, body: &[u8]) -> Result<()> {
280let mut rbuf = vec![0 as u8; resp_header.encoded_len()];
281 resp_header.encode(&mut rbuf)?;
282if body.len() > 0 {
283 rbuf.extend_from_slice(body);
284 }
285let _ = self.channel.write(rbuf.as_slice()).map_err(|x| Error::PeerWrite(x))?;
286Ok(())
287 }
288}
289290/// A stream of requests from the remote peer.
291#[derive(Debug)]
292pub struct CommandStream {
293 inner: Arc<PeerInner>,
294}
295296impl Unpin for CommandStream {}
297298impl Stream for CommandStream {
299type Item = Result<Command>;
300301fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
302 Poll::Ready(match ready!(self.inner.poll_recv_request(cx)) {
303Ok(Packet { header, body, .. }) => {
304Some(Ok(Command { peer: self.inner.clone(), avctp_header: header, body }))
305 }
306Err(Error::PeerDisconnected) => None,
307Err(e) => Some(Err(e)),
308 })
309 }
310}
311312impl Drop for CommandStream {
313fn drop(&mut self) {
314self.inner.incoming_requests.lock().listener = CommandListener::None;
315self.inner.wake_any();
316 }
317}
318319#[derive(Debug)]
320pub struct Command {
321 peer: Arc<PeerInner>,
322 avctp_header: Header,
323 body: Vec<u8>,
324}
325326impl Command {
327pub fn header(&self) -> &Header {
328&self.avctp_header
329 }
330331pub fn body(&self) -> &[u8] {
332&self.body[..]
333 }
334335pub fn send_response(&self, body: &[u8]) -> Result<()> {
336let response_header = self.avctp_header.create_response(PacketType::Single);
337self.peer.send_packet(&response_header, body)
338 }
339}
340341#[derive(Debug)]
342pub struct Packet {
343 header: Header,
344 body: Vec<u8>,
345}
346347impl Packet {
348pub fn header(&self) -> &Header {
349&self.header
350 }
351352pub fn body(&self) -> &[u8] {
353&self.body[..]
354 }
355}
356357#[derive(Debug, Default)]
358struct CommandQueue {
359 listener: CommandListener,
360 queue: VecDeque<Packet>,
361}
362363#[derive(Debug)]
364enum CommandListener {
365/// No one is listening.
366None,
367/// Someone wants to listen but hasn't polled.
368New,
369/// Someone is listening, and can be woken with the waker.
370Some(Waker),
371}
372373impl Default for CommandListener {
374fn default() -> Self {
375 CommandListener::None
376 }
377}
378379#[derive(Debug, Default)]
380struct ResponseWaiter {
381 listener: ResponseListener,
382 queue: VecDeque<Packet>,
383}
384385/// An enum representing an interest in the response to a command.
386#[derive(Debug)]
387enum ResponseListener {
388/// A new waiter which hasn't been polled yet.
389New,
390/// A task waiting for a response, which can be woken with the waker.
391Some(Waker),
392}
393394impl Default for ResponseListener {
395fn default() -> Self {
396 ResponseListener::New
397 }
398}
399400impl ResponseWaiter {
401/// Check if a message has been received.
402fn has_response(&self) -> bool {
403 !self.queue.is_empty()
404 }
405406fn pop_received(&mut self) -> Packet {
407if !self.has_response() {
408panic!("expected received buf");
409 }
410self.queue.pop_front().expect("response listener packet queue is unexpectedly empty")
411 }
412}
413414/// A stream wrapper that polls for the responses to a command we sent.
415/// Removes the associated response waiter when dropped or explicitly
416/// completed.
417#[derive(Debug)]
418pub struct CommandResponseStream {
419 id: Option<TxLabel>,
420 inner: Arc<PeerInner>,
421 done: bool,
422}
423424impl CommandResponseStream {
425fn new(id: TxLabel, inner: Arc<PeerInner>) -> CommandResponseStream {
426 CommandResponseStream { id: Some(id), inner, done: false }
427 }
428429pub fn complete(&mut self) {
430if let Some(id) = &self.id {
431self.inner.remove_response_interest(id);
432self.id = None;
433self.done = true;
434self.inner.wake_any();
435 }
436 }
437}
438439impl Unpin for CommandResponseStream {}
440441impl FusedStream for CommandResponseStream {
442fn is_terminated(&self) -> bool {
443self.done == true
444}
445}
446447impl Stream for CommandResponseStream {
448type Item = Result<Packet>;
449fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
450let this = &mut *self;
451if let Some(id) = &this.id {
452 Poll::Ready(match ready!(this.inner.poll_recv_response(id, cx)) {
453Ok(packet) => {
454trace!("received response packet {:?}", packet);
455if packet.header().is_invalid_profile_id() {
456Some(Err(Error::InvalidProfileId))
457 } else {
458Some(Ok(packet))
459 }
460 }
461Err(Error::PeerDisconnected) => {
462 this.done = true;
463None
464}
465Err(e) => Some(Err(e)),
466 })
467 } else {
468 this.done = true;
469return Poll::Ready(None);
470 }
471 }
472}
473474impl Drop for CommandResponseStream {
475fn drop(&mut self) {
476self.complete();
477 }
478}