1use fuchsia_bluetooth::types::Channel;
6use fuchsia_sync::Mutex;
7
8use futures::ready;
9use futures::stream::{FusedStream, Stream, StreamExt};
10use futures::task::{Context, Poll, Waker};
11use log::{info, trace, warn};
12use packet_encoding::{Decodable, Encodable};
13use slab::Slab;
14use std::collections::VecDeque;
15use std::mem;
16use std::pin::Pin;
17use std::sync::Arc;
18use std::sync::atomic::{AtomicBool, Ordering};
19
20#[cfg(test)]
21mod tests;
22mod types;
23
24use crate::{Error, Result};
25
26use self::types::AV_REMOTE_PROFILE;
27
28pub use self::types::{Header, MessageType, PacketType, TxLabel};
29
30#[derive(Debug)]
34pub struct Peer {
35 inner: Arc<PeerInner>,
36}
37
38#[derive(Debug)]
39struct PeerInner {
40 channel: Mutex<Channel>,
42
43 channel_closed: AtomicBool,
44
45 response_waiters: Mutex<Slab<ResponseWaiter>>,
51
52 incoming_requests: Mutex<CommandQueue>,
56}
57
58impl Peer {
59 pub fn new(channel: Channel) -> Self {
61 Self { inner: Arc::new(PeerInner::new(channel)) }
62 }
63
64 pub fn take_command_stream(&self) -> CommandStream {
67 {
68 let mut lock = self.inner.incoming_requests.lock();
69 if let CommandListener::None = lock.listener {
70 lock.listener = CommandListener::New;
71 } else {
72 panic!("Command stream has already been taken");
73 }
74 }
75
76 CommandStream { inner: self.inner.clone(), terminated: false }
77 }
78
79 pub fn send_command(&self, payload: &[u8]) -> Result<CommandResponseStream> {
82 let id = self.inner.add_response_waiter()?;
83 let avctp_header = Header::new(id, AV_REMOTE_PROFILE.clone(), MessageType::Command, false);
84 {
85 self.inner.send_packet(&avctp_header, payload)?;
86 }
87
88 Ok(CommandResponseStream::new(avctp_header.label().clone(), self.inner.clone()))
89 }
90}
91
92impl PeerInner {
93 fn new(channel: Channel) -> Self {
94 Self {
95 channel: Mutex::new(channel),
96 channel_closed: AtomicBool::new(false),
97 response_waiters: Mutex::new(Slab::<ResponseWaiter>::new()),
98 incoming_requests: Mutex::<CommandQueue>::default(),
99 }
100 }
101
102 fn add_response_waiter(&self) -> Result<TxLabel> {
105 let key = self.response_waiters.lock().insert(ResponseWaiter::default());
106 let id = TxLabel::try_from(key as u8);
107 if id.is_err() {
108 warn!("Transaction IDs are exhausted");
109 let _ = self.response_waiters.lock().remove(key);
110 }
111 id
112 }
113
114 fn remove_response_interest(&self, id: &TxLabel) {
117 let mut lock = self.response_waiters.lock();
118 let idx = usize::from(id);
119 let _ = lock.remove(idx);
120 }
121
122 fn poll_recv_request(&self, cx: &mut Context<'_>) -> Poll<Result<Packet>> {
127 let is_closed = self.recv_all(cx)?;
128
129 let mut lock = self.incoming_requests.lock();
130
131 match lock.queue.pop_front() {
132 Some(request) => Poll::Ready(Ok(request)),
133 _ => {
134 if is_closed {
135 Poll::Ready(Err(Error::PeerDisconnected))
136 } else {
137 lock.listener = CommandListener::Some(cx.waker().clone());
139 Poll::Pending
140 }
141 }
142 }
143 }
144
145 fn poll_recv_response(&self, label: &TxLabel, cx: &mut Context<'_>) -> Poll<Result<Packet>> {
150 let is_closed = self.recv_all(cx)?;
151
152 let mut waiters = self.response_waiters.lock();
153 let idx = usize::from(label);
154 let waiter = waiters.get_mut(idx).expect("Polled unregistered waiter");
157 if waiter.has_response() {
158 let packet = waiter.pop_received();
160 Poll::Ready(Ok(packet))
161 } else {
162 if is_closed {
163 Poll::Ready(Err(Error::PeerDisconnected))
164 } else {
165 waiter.listener = ResponseListener::Some(cx.waker().clone());
167 Poll::Pending
168 }
169 }
170 }
171
172 fn recv_all(&self, cx: &mut Context<'_>) -> Result<bool> {
176 if self.channel_closed.load(Ordering::SeqCst) {
177 return Ok(true);
178 }
179 loop {
180 let mut buf = {
181 let mut channel = self.channel.lock();
182 match channel.poll_next_unpin(cx) {
183 Poll::Ready(Some(Ok(packet))) => packet,
184 Poll::Ready(Some(Err(zx::Status::PEER_CLOSED))) | Poll::Ready(None) => {
185 trace!("Peer closed");
186 self.channel_closed.store(true, Ordering::SeqCst);
187 return Ok(true);
188 }
189 Poll::Ready(Some(Err(e))) => return Err(Error::PeerRead(e)),
190 Poll::Pending => return Ok(false),
191 }
192 };
193
194 trace!("received packet {:?}", buf);
195 let packet_size = buf.len();
196 let avctp_header = match Header::decode(buf.as_slice()) {
200 Err(_) => {
201 info!("received unrejectable message");
204 continue;
205 }
206 Ok(x) => x,
207 };
208
209 if avctp_header.profile_id() != AV_REMOTE_PROFILE {
212 info!("received packet not targeted at remote profile service class");
213 let resp_avct = avctp_header.create_invalid_profile_id_response();
214 self.send_packet(&resp_avct, &[])?;
215 continue;
216 }
217
218 if packet_size == avctp_header.encoded_len() {
219 info!("received incomplete packet");
221 continue;
222 }
223
224 let body = buf.split_off(avctp_header.encoded_len());
225 match avctp_header.message_type() {
227 MessageType::Command => {
228 let mut lock = self.incoming_requests.lock();
229 lock.queue.push_back(Packet { header: avctp_header, body });
230 if let CommandListener::Some(ref waker) = lock.listener {
231 waker.wake_by_ref();
232 }
233 }
234 MessageType::Response => {
235 let mut waiters = self.response_waiters.lock();
237 let idx = usize::from(avctp_header.label());
238
239 if let Some(waiter) = waiters.get_mut(idx) {
240 waiter.queue.push_back(Packet { header: avctp_header, body });
241 let old_entry = mem::replace(&mut waiter.listener, ResponseListener::New);
242 if let ResponseListener::Some(waker) = old_entry {
243 waker.wake();
244 }
245 } else {
246 trace!("response for {:?} we did not send, dropping", avctp_header.label());
247 };
248 }
250 }
251 }
252 }
253
254 fn wake_any(&self) {
257 {
262 let lock = self.response_waiters.lock();
263 for (_, response_waiter) in lock.iter() {
264 if let ResponseListener::Some(ref waker) = response_waiter.listener {
265 waker.wake_by_ref();
266 return;
267 }
268 }
269 }
270 {
271 let lock = self.incoming_requests.lock();
272 if let CommandListener::Some(ref waker) = lock.listener {
273 waker.wake_by_ref();
274 return;
275 }
276 }
277 }
278
279 pub fn send_packet(&self, resp_header: &Header, body: &[u8]) -> Result<()> {
280 let mut rbuf = vec![0 as u8; resp_header.encoded_len()];
281 resp_header.encode(&mut rbuf)?;
282 if body.len() > 0 {
283 rbuf.extend_from_slice(body);
284 }
285 let _ = self.channel.lock().write(rbuf.as_slice()).map_err(|x| Error::PeerWrite(x))?;
286 Ok(())
287 }
288}
289
290#[derive(Debug)]
292pub struct CommandStream {
293 inner: Arc<PeerInner>,
294 terminated: bool,
295}
296
297impl Unpin for CommandStream {}
298
299impl FusedStream for CommandStream {
300 fn is_terminated(&self) -> bool {
301 self.terminated
302 }
303}
304
305impl Stream for CommandStream {
306 type Item = Result<Command>;
307
308 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
309 if self.terminated {
310 return Poll::Ready(None);
311 }
312
313 match ready!(self.inner.poll_recv_request(cx)) {
314 Ok(Packet { header, body, .. }) => Poll::Ready(Some(Ok(Command {
315 peer: self.inner.clone(),
316 avctp_header: header,
317 body,
318 }))),
319 Err(e) if e == Error::PeerDisconnected => {
320 self.terminated = true;
321 Poll::Ready(None)
322 }
323 Err(e) => Poll::Ready(Some(Err(e))),
324 }
325 }
326}
327
328impl Drop for CommandStream {
329 fn drop(&mut self) {
330 self.inner.incoming_requests.lock().listener = CommandListener::None;
331 self.inner.wake_any();
332 }
333}
334
335pub struct Command {
336 peer: Arc<PeerInner>,
337 avctp_header: Header,
338 body: Vec<u8>,
339}
340
341impl std::fmt::Debug for Command {
342 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
343 f.debug_struct("Command")
344 .field("avctp_header", &self.avctp_header)
345 .field("body_len", &self.body.len())
346 .finish()
347 }
348}
349
350impl Command {
351 pub fn header(&self) -> &Header {
352 &self.avctp_header
353 }
354
355 pub fn body(&self) -> &[u8] {
356 &self.body[..]
357 }
358
359 pub fn send_response(&self, body: &[u8]) -> Result<()> {
360 let response_header = self.avctp_header.create_response(PacketType::Single);
361 self.peer.send_packet(&response_header, body)
362 }
363}
364
365#[derive(Debug)]
366pub struct Packet {
367 header: Header,
368 body: Vec<u8>,
369}
370
371impl Packet {
372 pub fn header(&self) -> &Header {
373 &self.header
374 }
375
376 pub fn body(&self) -> &[u8] {
377 &self.body[..]
378 }
379}
380
381#[derive(Debug, Default)]
382struct CommandQueue {
383 listener: CommandListener,
384 queue: VecDeque<Packet>,
385}
386
/// Listener state of the command stream.
///
/// `Default` is derived (yielding [`CommandListener::None`]) rather than hand-written.
#[derive(Debug, Default)]
enum CommandListener {
    /// No command stream has been taken.
    #[default]
    None,
    /// A command stream has been taken but has no waker registered.
    New,
    /// The command stream is parked; this waker resumes it.
    Some(Waker),
}
402
403#[derive(Debug, Default)]
404struct ResponseWaiter {
405 listener: ResponseListener,
406 queue: VecDeque<Packet>,
407}
408
/// Listener state of a single response waiter.
///
/// `Default` is derived (yielding [`ResponseListener::New`]) rather than hand-written.
#[derive(Debug, Default)]
enum ResponseListener {
    /// No waker is registered for the waiter.
    #[default]
    New,
    /// The waiting task is parked; this waker resumes it.
    Some(Waker),
}
423
424impl ResponseWaiter {
425 fn has_response(&self) -> bool {
427 !self.queue.is_empty()
428 }
429
430 fn pop_received(&mut self) -> Packet {
431 if !self.has_response() {
432 panic!("expected received buf");
433 }
434 self.queue.pop_front().expect("response listener packet queue is unexpectedly empty")
435 }
436}
437
438#[derive(Debug)]
442pub struct CommandResponseStream {
443 id: Option<TxLabel>,
444 inner: Arc<PeerInner>,
445 done: bool,
446}
447
448impl CommandResponseStream {
449 fn new(id: TxLabel, inner: Arc<PeerInner>) -> CommandResponseStream {
450 CommandResponseStream { id: Some(id), inner, done: false }
451 }
452
453 pub fn complete(&mut self) {
454 if let Some(id) = &self.id {
455 self.inner.remove_response_interest(id);
456 self.id = None;
457 self.done = true;
458 self.inner.wake_any();
459 }
460 }
461}
462
463impl Unpin for CommandResponseStream {}
464
465impl FusedStream for CommandResponseStream {
466 fn is_terminated(&self) -> bool {
467 self.done == true
468 }
469}
470
471impl Stream for CommandResponseStream {
472 type Item = Result<Packet>;
473 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
474 let this = &mut *self;
475 if this.is_terminated() {
476 return Poll::Ready(None);
477 }
478 let Some(id) = &this.id else {
479 this.done = true;
480 return Poll::Ready(None);
481 };
482
483 match ready!(this.inner.poll_recv_response(id, cx)) {
484 Ok(packet) => {
485 trace!("received response packet {:?}", packet);
486 if packet.header().is_invalid_profile_id() {
487 Poll::Ready(Some(Err(Error::InvalidProfileId)))
488 } else {
489 Poll::Ready(Some(Ok(packet)))
490 }
491 }
492 Err(e) if e == Error::PeerDisconnected => {
493 this.done = true;
494 Poll::Ready(None)
495 }
496 Err(e) => Poll::Ready(Some(Err(e))),
497 }
498 }
499}
500
501impl Drop for CommandResponseStream {
502 fn drop(&mut self) {
503 self.complete();
504 }
505}