1use fuchsia_bluetooth::types::Channel;
6use fuchsia_sync::Mutex;
7
8use futures::ready;
9use futures::stream::{FusedStream, Stream, StreamExt};
10use futures::task::{Context, Poll, Waker};
11use log::{info, trace, warn};
12use packet_encoding::{Decodable, Encodable};
13use slab::Slab;
14use std::collections::VecDeque;
15use std::mem;
16use std::pin::Pin;
17use std::sync::Arc;
18use std::sync::atomic::{AtomicBool, Ordering};
19
20#[cfg(test)]
21mod tests;
22mod types;
23
24use crate::{Error, Result};
25
26use self::types::AV_REMOTE_PROFILE;
27
28pub use self::types::{Header, MessageType, PacketType, TxLabel};
29
30#[derive(Debug)]
34pub struct Peer {
35 inner: Arc<PeerInner>,
36}
37
38#[derive(Debug)]
39struct PeerInner {
40 channel: Mutex<Channel>,
42
43 channel_closed: AtomicBool,
44
45 response_waiters: Mutex<Slab<ResponseWaiter>>,
51
52 incoming_requests: Mutex<CommandQueue>,
56}
57
58impl Peer {
59 pub fn new(channel: Channel) -> Self {
61 Self { inner: Arc::new(PeerInner::new(channel)) }
62 }
63
64 pub fn take_command_stream(&self) -> CommandStream {
67 {
68 let mut lock = self.inner.incoming_requests.lock();
69 if let CommandListener::None = lock.listener {
70 lock.listener = CommandListener::New;
71 } else {
72 panic!("Command stream has already been taken");
73 }
74 }
75
76 CommandStream { inner: self.inner.clone(), terminated: false }
77 }
78
79 pub fn send_command(&self, payload: &[u8]) -> Result<CommandResponseStream> {
82 let id = self.inner.add_response_waiter()?;
83 let avctp_header = Header::new(id, AV_REMOTE_PROFILE.clone(), MessageType::Command, false);
84 {
85 self.inner.send_packet(&avctp_header, payload)?;
86 }
87
88 Ok(CommandResponseStream::new(avctp_header.label().clone(), self.inner.clone()))
89 }
90}
91
92impl PeerInner {
93 fn new(channel: Channel) -> Self {
94 Self {
95 channel: Mutex::new(channel),
96 channel_closed: AtomicBool::new(false),
97 response_waiters: Mutex::new(Slab::<ResponseWaiter>::new()),
98 incoming_requests: Mutex::<CommandQueue>::default(),
99 }
100 }
101
102 fn add_response_waiter(&self) -> Result<TxLabel> {
105 let key = self.response_waiters.lock().insert(ResponseWaiter::default());
106 let id = TxLabel::try_from(key as u8);
107 if id.is_err() {
108 warn!("Transaction IDs are exhausted");
109 let _ = self.response_waiters.lock().remove(key);
110 }
111 id
112 }
113
114 fn remove_response_interest(&self, id: &TxLabel) {
117 let mut lock = self.response_waiters.lock();
118 let idx = usize::from(id);
119 let _ = lock.remove(idx);
120 }
121
122 fn poll_recv_request(&self, cx: &mut Context<'_>) -> Poll<Result<Packet>> {
127 let is_closed = self.recv_all(cx)?;
128
129 let mut lock = self.incoming_requests.lock();
130
131 match lock.queue.pop_front() {
132 Some(request) => Poll::Ready(Ok(request)),
133 _ => {
134 if is_closed {
135 Poll::Ready(Err(Error::PeerDisconnected))
136 } else {
137 lock.listener = CommandListener::Some(cx.waker().clone());
139 Poll::Pending
140 }
141 }
142 }
143 }
144
145 fn poll_recv_response(&self, label: &TxLabel, cx: &mut Context<'_>) -> Poll<Result<Packet>> {
150 let is_closed = self.recv_all(cx)?;
151
152 let mut waiters = self.response_waiters.lock();
153 let idx = usize::from(label);
154 let waiter = waiters.get_mut(idx).expect("Polled unregistered waiter");
157 if waiter.has_response() {
158 let packet = waiter.pop_received();
160 Poll::Ready(Ok(packet))
161 } else {
162 if is_closed {
163 Poll::Ready(Err(Error::PeerDisconnected))
164 } else {
165 waiter.listener = ResponseListener::Some(cx.waker().clone());
167 Poll::Pending
168 }
169 }
170 }
171
172 fn recv_all(&self, cx: &mut Context<'_>) -> Result<bool> {
176 if self.channel_closed.load(Ordering::SeqCst) {
177 return Ok(true);
178 }
179 loop {
180 let mut buf = {
181 let mut channel = self.channel.lock();
182 match channel.poll_next_unpin(cx) {
183 Poll::Ready(Some(Ok(packet))) => packet,
184 Poll::Ready(Some(Err(zx::Status::PEER_CLOSED))) | Poll::Ready(None) => {
185 trace!("Peer closed");
186 self.channel_closed.store(true, Ordering::SeqCst);
187 return Ok(true);
188 }
189 Poll::Ready(Some(Err(e))) => return Err(Error::PeerRead(e)),
190 Poll::Pending => return Ok(false),
191 }
192 };
193
194 trace!("received packet {:?}", buf);
195 let packet_size = buf.len();
196 let avctp_header = match Header::decode(buf.as_slice()) {
200 Err(_) => {
201 info!("received unrejectable message");
204 continue;
205 }
206 Ok(x) => x,
207 };
208
209 if avctp_header.profile_id() != AV_REMOTE_PROFILE {
212 info!("received packet not targeted at remote profile service class");
213 let resp_avct = avctp_header.create_invalid_profile_id_response();
214 self.send_packet(&resp_avct, &[])?;
215 continue;
216 }
217
218 if packet_size == avctp_header.encoded_len() {
219 info!("received incomplete packet");
221 continue;
222 }
223
224 let body = buf.split_off(avctp_header.encoded_len());
225 match avctp_header.message_type() {
227 MessageType::Command => {
228 let mut lock = self.incoming_requests.lock();
229 lock.queue.push_back(Packet { header: avctp_header, body });
230 if let CommandListener::Some(ref waker) = lock.listener {
231 waker.wake_by_ref();
232 }
233 }
234 MessageType::Response => {
235 let mut waiters = self.response_waiters.lock();
237 let idx = usize::from(avctp_header.label());
238
239 if let Some(waiter) = waiters.get_mut(idx) {
240 waiter.queue.push_back(Packet { header: avctp_header, body });
241 let old_entry = mem::replace(&mut waiter.listener, ResponseListener::New);
242 if let ResponseListener::Some(waker) = old_entry {
243 waker.wake();
244 }
245 } else {
246 trace!("response for {:?} we did not send, dropping", avctp_header.label());
247 };
248 }
250 }
251 }
252 }
253
254 fn wake_any(&self) {
257 {
262 let lock = self.response_waiters.lock();
263 for (_, response_waiter) in lock.iter() {
264 if let ResponseListener::Some(ref waker) = response_waiter.listener {
265 waker.wake_by_ref();
266 return;
267 }
268 }
269 }
270 {
271 let lock = self.incoming_requests.lock();
272 if let CommandListener::Some(ref waker) = lock.listener {
273 waker.wake_by_ref();
274 return;
275 }
276 }
277 }
278
279 pub fn send_packet(&self, resp_header: &Header, body: &[u8]) -> Result<()> {
280 let mut rbuf = vec![0 as u8; resp_header.encoded_len()];
281 resp_header.encode(&mut rbuf)?;
282 if body.len() > 0 {
283 rbuf.extend_from_slice(body);
284 }
285 let _ = self.channel.lock().write(rbuf.as_slice()).map_err(|x| Error::PeerWrite(x))?;
286 Ok(())
287 }
288}
289
290#[derive(Debug)]
292pub struct CommandStream {
293 inner: Arc<PeerInner>,
294 terminated: bool,
295}
296
297impl Unpin for CommandStream {}
298
299impl FusedStream for CommandStream {
300 fn is_terminated(&self) -> bool {
301 self.terminated
302 }
303}
304
305impl Stream for CommandStream {
306 type Item = Result<Command>;
307
308 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
309 if self.terminated {
310 return Poll::Ready(None);
311 }
312
313 match ready!(self.inner.poll_recv_request(cx)) {
314 Ok(Packet { header, body, .. }) => Poll::Ready(Some(Ok(Command {
315 peer: self.inner.clone(),
316 avctp_header: header,
317 body,
318 }))),
319 Err(e) if e == Error::PeerDisconnected => {
320 self.terminated = true;
321 Poll::Ready(None)
322 }
323 Err(e) => Poll::Ready(Some(Err(e))),
324 }
325 }
326}
327
328impl Drop for CommandStream {
329 fn drop(&mut self) {
330 self.inner.incoming_requests.lock().listener = CommandListener::None;
331 self.inner.wake_any();
332 }
333}
334
335#[derive(Debug)]
336pub struct Command {
337 peer: Arc<PeerInner>,
338 avctp_header: Header,
339 body: Vec<u8>,
340}
341
342impl Command {
343 pub fn header(&self) -> &Header {
344 &self.avctp_header
345 }
346
347 pub fn body(&self) -> &[u8] {
348 &self.body[..]
349 }
350
351 pub fn send_response(&self, body: &[u8]) -> Result<()> {
352 let response_header = self.avctp_header.create_response(PacketType::Single);
353 self.peer.send_packet(&response_header, body)
354 }
355}
356
357#[derive(Debug)]
358pub struct Packet {
359 header: Header,
360 body: Vec<u8>,
361}
362
363impl Packet {
364 pub fn header(&self) -> &Header {
365 &self.header
366 }
367
368 pub fn body(&self) -> &[u8] {
369 &self.body[..]
370 }
371}
372
373#[derive(Debug, Default)]
374struct CommandQueue {
375 listener: CommandListener,
376 queue: VecDeque<Packet>,
377}
378
/// Listener state for the peer's single command stream.
///
/// The default is [`CommandListener::None`].
#[derive(Debug, Default)]
enum CommandListener {
    /// No command stream has been taken (or the stream was dropped).
    #[default]
    None,
    /// A command stream has been taken but has not stored a waker yet.
    New,
    /// The command stream is waiting for a command; holds the waker to notify.
    Some(Waker),
}
394
395#[derive(Debug, Default)]
396struct ResponseWaiter {
397 listener: ResponseListener,
398 queue: VecDeque<Packet>,
399}
400
/// Listener state for a single response waiter.
///
/// The default is [`ResponseListener::New`].
#[derive(Debug, Default)]
enum ResponseListener {
    /// No waker is stored: the waiter has not parked yet, or its waker was
    /// consumed by the last wake-up.
    #[default]
    New,
    /// The waiter is parked; holds the waker to notify when a response arrives.
    Some(Waker),
}
415
416impl ResponseWaiter {
417 fn has_response(&self) -> bool {
419 !self.queue.is_empty()
420 }
421
422 fn pop_received(&mut self) -> Packet {
423 if !self.has_response() {
424 panic!("expected received buf");
425 }
426 self.queue.pop_front().expect("response listener packet queue is unexpectedly empty")
427 }
428}
429
430#[derive(Debug)]
434pub struct CommandResponseStream {
435 id: Option<TxLabel>,
436 inner: Arc<PeerInner>,
437 done: bool,
438}
439
440impl CommandResponseStream {
441 fn new(id: TxLabel, inner: Arc<PeerInner>) -> CommandResponseStream {
442 CommandResponseStream { id: Some(id), inner, done: false }
443 }
444
445 pub fn complete(&mut self) {
446 if let Some(id) = &self.id {
447 self.inner.remove_response_interest(id);
448 self.id = None;
449 self.done = true;
450 self.inner.wake_any();
451 }
452 }
453}
454
455impl Unpin for CommandResponseStream {}
456
457impl FusedStream for CommandResponseStream {
458 fn is_terminated(&self) -> bool {
459 self.done == true
460 }
461}
462
463impl Stream for CommandResponseStream {
464 type Item = Result<Packet>;
465 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
466 let this = &mut *self;
467 if this.is_terminated() {
468 return Poll::Ready(None);
469 }
470 let Some(id) = &this.id else {
471 this.done = true;
472 return Poll::Ready(None);
473 };
474
475 match ready!(this.inner.poll_recv_response(id, cx)) {
476 Ok(packet) => {
477 trace!("received response packet {:?}", packet);
478 if packet.header().is_invalid_profile_id() {
479 Poll::Ready(Some(Err(Error::InvalidProfileId)))
480 } else {
481 Poll::Ready(Some(Ok(packet)))
482 }
483 }
484 Err(e) if e == Error::PeerDisconnected => {
485 this.done = true;
486 Poll::Ready(None)
487 }
488 Err(e) => Poll::Ready(Some(Err(e))),
489 }
490 }
491}
492
493impl Drop for CommandResponseStream {
494 fn drop(&mut self) {
495 self.complete();
496 }
497}