fidl_handle_tests/
socket.rsuse crate::LoggingFixture;
use async_trait::async_trait;
use futures::channel::oneshot;
use futures::future::join;
use futures::lock::Mutex;
use futures::prelude::*;
fn reverse<T>(value: (T, T)) -> (T, T) {
(value.1, value.0)
}
#[async_trait]
pub trait Fixture: LoggingFixture {
async fn create_handles(&self, opt: fidl::SocketOpts) -> (fidl::Socket, fidl::Socket);
}
#[derive(Clone, Copy, PartialEq)]
enum AfterSend {
RemainOpen,
CloseSender,
}
async fn send_bytes(
fixture: &Mutex<impl Fixture + 'static>,
sockets: (fidl::Socket, fidl::Socket),
out: &'static [u8],
after_send: AfterSend,
) {
let mut tx = fidl::AsyncSocket::from_socket(sockets.0);
let mut rx = fidl::AsyncSocket::from_socket(sockets.1);
fixture.lock().await.log(&format!("# send bytes from {:?} to {:?}: {:?}", tx, rx, out));
let expect = out.to_vec();
let (tx_done, rx_done) = oneshot::channel();
join(
async move {
tx.write_all(out).await.unwrap();
match after_send {
AfterSend::RemainOpen => {
fixture.lock().await.log(&format!("# waiting for done"));
rx_done.await.unwrap()
}
AfterSend::CloseSender => drop(tx),
}
},
async move {
let mut in_bytes = Vec::new();
let mut buf = [0u8; 1];
while in_bytes.len() != out.len() {
rx.read_exact(&mut buf).await.unwrap();
in_bytes.push(buf[0]);
}
assert_eq!(in_bytes, expect);
let _ = tx_done.send(());
},
)
.await;
}
pub async fn run(fixture: impl Fixture + 'static) {
let fixture = &Mutex::new(fixture);
fixture.lock().await.log("# send bytes a->b remaining open");
let sockets = fixture.lock().await.create_handles(fidl::SocketOpts::STREAM).await;
send_bytes(&fixture, sockets, &[1, 2, 3], AfterSend::RemainOpen).await;
fixture.lock().await.log("# send bytes b->a remaining open");
let sockets = reverse(fixture.lock().await.create_handles(fidl::SocketOpts::STREAM).await);
send_bytes(&fixture, sockets, &[7, 8, 9], AfterSend::RemainOpen).await;
fixture.lock().await.log("# send bytes a->b then close");
let sockets = fixture.lock().await.create_handles(fidl::SocketOpts::STREAM).await;
send_bytes(&fixture, sockets, &[1, 2, 3], AfterSend::CloseSender).await;
fixture.lock().await.log("# send bytes b->a then close");
let sockets = reverse(fixture.lock().await.create_handles(fidl::SocketOpts::STREAM).await);
send_bytes(&fixture, sockets, &[7, 8, 9], AfterSend::CloseSender).await;
}
#[cfg(test)]
mod test {
use super::*;
struct FidlFixture;
#[async_trait]
impl Fixture for FidlFixture {
async fn create_handles(&self, opts: fidl::SocketOpts) -> (fidl::Socket, fidl::Socket) {
match opts {
fidl::SocketOpts::STREAM => fidl::Socket::create_stream(),
fidl::SocketOpts::DATAGRAM => fidl::Socket::create_datagram(),
#[cfg(target_os = "fuchsia")]
_ => panic!("unsupported socket options"),
}
}
}
impl LoggingFixture for FidlFixture {
fn log(&mut self, msg: &str) {
println!("{}", msg);
}
}
#[fuchsia_async::run_singlethreaded(test)]
async fn tests() {
run(FidlFixture).await
}
}