// sl4f_lib/webdriver/facade.rs
use crate::webdriver::types::{EnableDevToolsResult, GetDevToolsPortsResult};
use anyhow::{format_err, Error};
use fidl::endpoints::{create_request_stream, ServerEnd};
use fidl_fuchsia_web::{
DevToolsListenerMarker, DevToolsListenerRequest, DevToolsListenerRequestStream,
DevToolsPerContextListenerMarker, DevToolsPerContextListenerRequest,
DevToolsPerContextListenerRequestStream,
};
use fuchsia_async as fasync;
use fuchsia_sync::Mutex;
use futures::channel::mpsc;
use futures::prelude::*;
use std::collections::HashSet;
use std::ops::DerefMut;
use tracing::*;
/// Facade that enables DevTools and reports the DevTools ports currently
/// known to be open.
///
/// The facade starts out disabled; `enable_dev_tools` must succeed before
/// `get_dev_tools_ports` can return ports.
#[derive(Debug)]
pub struct WebdriverFacade {
/// Lazily created state. `None` until `enable_dev_tools` has succeeded,
/// `Some` afterwards.
internal: Mutex<Option<WebdriverFacadeInternal>>,
}
impl WebdriverFacade {
    /// Creates a facade with DevTools not yet enabled.
    pub fn new() -> WebdriverFacade {
        let internal = Mutex::new(None);
        WebdriverFacade { internal }
    }

    /// Enables DevTools by building the internal state that listens for port
    /// updates.
    ///
    /// # Errors
    /// Returns an error if DevTools were already enabled through this facade,
    /// or if `WebdriverFacadeInternal::new` fails.
    pub async fn enable_dev_tools(&self) -> Result<EnableDevToolsResult, Error> {
        // NOTE(review): this guard stays alive across the `.await` below, so
        // other users of `self.internal` wait until initialization finishes.
        // Confirm that this is intended for the executor this facade runs on.
        let mut slot = self.internal.lock();
        if slot.is_some() {
            return Err(format_err!("DevTools already enabled."));
        }
        let initialized = WebdriverFacadeInternal::new().await?;
        *slot = Some(initialized);
        Ok(EnableDevToolsResult::Success)
    }

    /// Returns the DevTools ports currently recorded as open.
    ///
    /// # Errors
    /// Returns an error if `enable_dev_tools` has not succeeded yet.
    pub async fn get_dev_tools_ports(&self) -> Result<GetDevToolsPortsResult, Error> {
        let mut slot = self.internal.lock();
        slot.deref_mut()
            .as_mut()
            .map(|facade| GetDevToolsPortsResult::new(facade.get_ports()))
            .ok_or_else(|| format_err!("DevTools are not enabled."))
    }
}
/// State held by `WebdriverFacade` once DevTools have been enabled.
#[derive(Debug)]
struct WebdriverFacadeInternal {
/// Ports recorded as open; brought up to date by `update_port_set`.
dev_tools_ports: HashSet<u16>,
/// Receiving end of the channel on which listeners report port
/// open/close events.
port_update_receiver: mpsc::UnboundedReceiver<PortUpdateMessage>,
}
impl WebdriverFacadeInternal {
pub async fn new() -> Result<WebdriverFacadeInternal, Error> {
let port_update_receiver = Self::get_port_event_receiver().await?;
Ok(WebdriverFacadeInternal { dev_tools_ports: HashSet::new(), port_update_receiver })
}
pub fn get_ports(&mut self) -> Vec<u16> {
self.update_port_set();
Vec::from_iter(self.dev_tools_ports.iter().cloned())
}
fn update_port_set(&mut self) {
while let Ok(Some(update)) = self.port_update_receiver.try_next() {
match update {
PortUpdateMessage::PortOpened(port) => self.dev_tools_ports.insert(port),
PortUpdateMessage::PortClosed(port) => self.dev_tools_ports.remove(&port),
};
}
}
async fn get_port_event_receiver() -> Result<mpsc::UnboundedReceiver<PortUpdateMessage>, Error>
{
let (port_update_sender, port_update_receiver) = mpsc::unbounded();
let debug = Self::spawn_dev_tools_listener_at_path(
"/svc/fuchsia.web.Debug",
port_update_sender.clone(),
);
let debug_context_provider = Self::spawn_dev_tools_listener_at_path(
"/svc/fuchsia.web.Debug-context_provider",
port_update_sender,
);
let debug_result = debug.await;
let debug_context_provider_result = debug_context_provider.await;
debug_result.or(debug_context_provider_result)?;
Ok(port_update_receiver)
}
async fn spawn_dev_tools_listener_at_path(
protocol_path: &str,
port_update_sender: mpsc::UnboundedSender<PortUpdateMessage>,
) -> Result<(), Error> {
let debug_proxy = fuchsia_component::client::connect_to_protocol_at_path::<
fidl_fuchsia_web::DebugMarker,
>(protocol_path)?;
let (dev_tools_client, dev_tools_stream) =
create_request_stream::<DevToolsListenerMarker>();
debug_proxy.enable_dev_tools(dev_tools_client).await?;
fasync::Task::spawn(async move {
let dev_tools_listener = DevToolsListener::new(port_update_sender);
dev_tools_listener
.handle_requests_from_stream(dev_tools_stream)
.await
.unwrap_or_else(|_| print!("Error handling DevToolsListener channel!"));
})
.detach();
Ok(())
}
}
/// Message sent over the port-update channel to describe a change to the set
/// of open DevTools ports.
#[derive(Debug)]
enum PortUpdateMessage {
/// The given port was reported open; the receiver inserts it into the set.
PortOpened(u16),
/// The given port is considered closed; the receiver removes it from the set.
PortClosed(u16),
}
/// Serves a `DevToolsListener` request stream and spawns a per-context
/// listener for every context it is told about.
struct DevToolsListener {
/// Sender cloned into each per-context listener so it can report port
/// updates.
port_update_sender: mpsc::UnboundedSender<PortUpdateMessage>,
}
impl DevToolsListener {
fn new(port_update_sender: mpsc::UnboundedSender<PortUpdateMessage>) -> Self {
DevToolsListener { port_update_sender }
}
pub async fn handle_requests_from_stream(
&self,
mut stream: DevToolsListenerRequestStream,
) -> Result<(), Error> {
while let Some(request) = stream.try_next().await? {
let DevToolsListenerRequest::OnContextDevToolsAvailable { listener, .. } = request;
self.on_context_created(listener)?;
}
Ok(())
}
fn on_context_created(
&self,
listener: ServerEnd<DevToolsPerContextListenerMarker>,
) -> Result<(), Error> {
info!("Chrome context created");
let listener_request_stream = listener.into_stream();
let port_update_sender = mpsc::UnboundedSender::clone(&self.port_update_sender);
fasync::Task::spawn(async move {
let mut per_context_listener = DevToolsPerContextListener::new(port_update_sender);
per_context_listener
.handle_requests_from_stream(listener_request_stream)
.await
.unwrap_or_else(|_| warn!("Error handling DevToolsListener channel!"));
})
.detach();
Ok(())
}
}
/// Serves a `DevToolsPerContextListener` request stream for a single context
/// and forwards its port events over the update channel.
struct DevToolsPerContextListener {
/// Channel on which port open/close messages are sent.
port_update_sender: mpsc::UnboundedSender<PortUpdateMessage>,
}
impl DevToolsPerContextListener {
fn new(port_update_sender: mpsc::UnboundedSender<PortUpdateMessage>) -> Self {
DevToolsPerContextListener { port_update_sender }
}
pub async fn handle_requests_from_stream(
&mut self,
mut stream: DevToolsPerContextListenerRequestStream,
) -> Result<(), Error> {
let mut context_port = None;
while let Ok(Some(request)) = stream.try_next().await {
let DevToolsPerContextListenerRequest::OnHttpPortOpen { port, .. } = request;
context_port.replace(port);
self.on_port_open(port)?;
}
if let Some(port) = context_port {
self.on_port_closed(port)?;
}
Ok(())
}
fn on_port_open(&mut self, port: u16) -> Result<(), Error> {
info!("DevTools port {:?} opened", port);
self.port_update_sender
.unbounded_send(PortUpdateMessage::PortOpened(port))
.map_err(|_| format_err!("Error sending port open message"))
}
fn on_port_closed(&mut self, port: u16) -> Result<(), Error> {
info!("DevTools port {:?} closed", port);
self.port_update_sender
.unbounded_send(PortUpdateMessage::PortClosed(port))
.map_err(|_| format_err!("Error sending port closed message"))
}
}