use anyhow::{format_err, Context as _, Error};
use fidl::endpoints::create_request_stream;
use fidl_fuchsia_bluetooth_le::{
AdvertisedPeripheralMarker, AdvertisedPeripheralRequest, AdvertisedPeripheralRequestStream,
AdvertisingParameters, ConnectionProxy, PeripheralMarker, PeripheralProxy,
};
use fuchsia_bluetooth::types::PeerId;
use fuchsia_sync::RwLock;
use futures::{pin_mut, select, FutureExt, StreamExt};
use serde_json::Value;
use std::collections::HashMap;
use std::sync::Arc;
use tracing::{debug, error, info, warn};
use {fuchsia_async as fasync, fuchsia_component as app};
use crate::bluetooth::types::FacadeArg;
use crate::common_utils::common::macros::with_line;
#[derive(Debug)]
struct Connection {
_connection: ConnectionProxy,
_task: fasync::Task<()>,
}
#[derive(Debug)]
struct InnerBleAdvertiseFacade {
advertise_task: Option<fasync::Task<()>>,
connections: HashMap<PeerId, Connection>,
peripheral: Option<PeripheralProxy>,
}
#[derive(Debug)]
pub struct BleAdvertiseFacade {
inner: Arc<RwLock<InnerBleAdvertiseFacade>>,
}
impl BleAdvertiseFacade {
pub fn new() -> BleAdvertiseFacade {
BleAdvertiseFacade {
inner: Arc::new(RwLock::new(InnerBleAdvertiseFacade {
advertise_task: None,
connections: HashMap::new(),
peripheral: None,
})),
}
}
fn set_advertise_task(
inner: &Arc<RwLock<InnerBleAdvertiseFacade>>,
task: Option<fasync::Task<()>>,
) {
let tag = "BleAdvertiseFacade::set_advertise_task";
if task.is_some() {
info!(tag = &with_line!(tag), "Assigned new advertise task");
} else if inner.read().advertise_task.is_some() {
info!(tag = &with_line!(tag), "Cleared advertise task");
}
inner.write().advertise_task = task;
}
pub fn print(&self) {
let adv_status = match &self.inner.read().advertise_task {
Some(_) => "Valid",
None => "None",
};
info!(tag = &with_line!("BleAdvertiseFacade::print"),
%adv_status,
peripheral = ?self.get_peripheral_proxy(),
"BleAdvertiseFacade",
);
}
pub fn set_peripheral_proxy(&self) {
let tag = "BleAdvertiseFacade::set_peripheral_proxy";
let new_peripheral = match self.inner.read().peripheral.clone() {
Some(p) => {
warn!(tag = &with_line!(tag), current_peripheral = ?p);
Some(p)
}
None => {
let peripheral_svc: PeripheralProxy =
app::client::connect_to_protocol::<PeripheralMarker>()
.context("Failed to connect to BLE Peripheral service.")
.unwrap();
Some(peripheral_svc)
}
};
self.inner.write().peripheral = new_peripheral
}
pub async fn start_adv(&self, args: Value) -> Result<(), Error> {
self.set_peripheral_proxy();
let parameters: AdvertisingParameters = FacadeArg::new(args).try_into()?;
let periph = &self.inner.read().peripheral.clone();
match &periph {
Some(p) => {
BleAdvertiseFacade::set_advertise_task(&self.inner, None);
let advertise_task = fasync::Task::spawn(BleAdvertiseFacade::advertise(
self.inner.clone(),
p.clone(),
parameters,
));
info!(tag = "start_adv", "Started advertising");
BleAdvertiseFacade::set_advertise_task(&self.inner, Some(advertise_task));
Ok(())
}
None => {
error!(tag = "start_adv", "No peripheral created.");
return Err(format_err!("No peripheral proxy created."));
}
}
}
fn process_new_connection(
inner: Arc<RwLock<InnerBleAdvertiseFacade>>,
proxy: ConnectionProxy,
peer_id: PeerId,
) {
let tag = "BleAdvertiseFacade::process_new_connection";
let mut stream = proxy.take_event_stream();
let inner_clone = inner.clone();
let stream_fut = async move {
while let Some(event) = stream.next().await {
match event {
Ok(_) => debug!(tag = &with_line!(tag), "ignoring event for Connection"),
Err(err) => {
info!(tag = &with_line!(tag), "Connection ({}) error: {:?}", peer_id, err)
}
}
}
info!(tag = &with_line!(tag), "peer {} disconnected", peer_id);
inner_clone.write().connections.remove(&peer_id);
};
let event_task = fasync::Task::spawn(stream_fut);
inner
.write()
.connections
.insert(peer_id, Connection { _connection: proxy, _task: event_task });
}
async fn process_advertised_peripheral_stream(
inner: Arc<RwLock<InnerBleAdvertiseFacade>>,
mut stream: AdvertisedPeripheralRequestStream,
) {
let tag = "BleAdvertiseFacade::process_advertised_peripheral_stream";
while let Some(request) = stream.next().await {
match request {
Ok(AdvertisedPeripheralRequest::OnConnected { peer, connection, responder }) => {
if let Err(err) = responder.send() {
warn!(
tag = &with_line!(tag),
"error sending response to AdvertisedPeripheral::OnConnected: {}", err
);
}
let proxy = connection.into_proxy();
let peer_id: PeerId = peer.id.unwrap().into();
BleAdvertiseFacade::process_new_connection(inner.clone(), proxy, peer_id);
}
Err(err) => {
info!(tag = &with_line!(tag), "AdvertisedPeripheral error: {:?}", err);
}
}
}
info!(tag = &with_line!(tag), "AdvertisedPeripheral closed, stopping advertising");
BleAdvertiseFacade::set_advertise_task(&inner, None);
}
async fn advertise(
inner: Arc<RwLock<InnerBleAdvertiseFacade>>,
peripheral: PeripheralProxy,
parameters: AdvertisingParameters,
) {
let tag = "BleAdvertiseFacade::advertise";
let (client_end, server_request_stream) =
create_request_stream::<AdvertisedPeripheralMarker>();
let advertise_fut = peripheral.advertise(¶meters, client_end);
let server_fut = BleAdvertiseFacade::process_advertised_peripheral_stream(
inner.clone(),
server_request_stream,
);
let advertise_fut_fused = advertise_fut.fuse();
let server_fut_fused = server_fut.fuse();
pin_mut!(advertise_fut_fused, server_fut_fused);
select! {
result = advertise_fut_fused => {
info!(tag = &with_line!(tag), "advertise() returned with result {:?}", result);
}
_ = server_fut_fused => {
info!(tag = &with_line!(tag), "AdvertisedPeripheral closed");
}
};
inner.write().advertise_task.take();
}
pub fn stop_adv(&self) {
info!(tag = &with_line!("BleAdvertiseFacade::stop_adv"), "Stop advertising");
BleAdvertiseFacade::set_advertise_task(&self.inner, None);
}
pub fn get_peripheral_proxy(&self) -> Option<PeripheralProxy> {
self.inner.read().peripheral.clone()
}
pub fn cleanup_peripheral_proxy(&self) {
self.inner.write().peripheral = None;
}
pub fn cleanup(&self) {
self.inner.write().connections.clear();
self.stop_adv();
self.cleanup_peripheral_proxy();
}
}