1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use crate::LoggingFixture;
use async_trait::async_trait;
use futures::channel::oneshot;
use futures::future::join;
use futures::lock::Mutex;
use futures::prelude::*;

/// Swaps the two elements of a pair, returning `(second, first)`.
fn reverse<T>(value: (T, T)) -> (T, T) {
    let (first, second) = value;
    (second, first)
}

/// Test fixture that supplies the socket pairs exercised by [`run`].
///
/// Implementors must also implement [`LoggingFixture`], which this module uses to emit
/// progress messages while the scenarios execute.
#[async_trait]
pub trait Fixture: LoggingFixture {
    /// Creates a pair of sockets with the given options and returns both ends.
    ///
    /// The scenarios in this module write bytes into one end and expect to read the same
    /// bytes back from the other end.
    async fn create_handles(&self, opt: fidl::SocketOpts) -> (fidl::Socket, fidl::Socket);
}

/// What the sending side of `send_bytes` does once its payload has been written.
#[derive(Clone, Copy, PartialEq)]
enum AfterSend {
    /// Keep the sending socket alive until the receiving side signals that it is done.
    RemainOpen,
    /// Drop the sending socket right after the write completes.
    CloseSender,
}

/// Writes `out` into `sockets.0` and verifies that exactly those bytes arrive on `sockets.1`.
///
/// The sender and the receiver run concurrently via `join`:
///
/// * The sender writes the whole payload. With [`AfterSend::RemainOpen`] it then waits for
///   the receiver's completion signal before finishing (keeping its socket alive in the
///   meantime); with [`AfterSend::CloseSender`] it drops its socket immediately.
/// * The receiver reads one byte at a time until `out.len()` bytes have been collected,
///   asserts they equal `out`, and then signals completion.
///
/// Progress messages are emitted through the fixture's `log` method.
///
/// # Panics
///
/// Panics if the write or any read fails, if the received bytes differ from `out`, or if
/// (in the `RemainOpen` case) the completion channel is dropped without a signal.
async fn send_bytes(
    fixture: &Mutex<impl Fixture + 'static>,
    sockets: (fidl::Socket, fidl::Socket),
    out: &'static [u8],
    after_send: AfterSend,
) {
    let mut tx = fidl::AsyncSocket::from_socket(sockets.0);
    let mut rx = fidl::AsyncSocket::from_socket(sockets.1);
    fixture.lock().await.log(&format!("#    send bytes from {:?} to {:?}: {:?}", tx, rx, out));
    let (tx_done, rx_done) = oneshot::channel();
    join(
        async move {
            tx.write_all(out).await.unwrap();
            match after_send {
                AfterSend::RemainOpen => {
                    // A plain literal is enough here; no formatting is required.
                    fixture.lock().await.log("#    waiting for done");
                    rx_done.await.unwrap()
                }
                AfterSend::CloseSender => drop(tx),
            }
        },
        async move {
            let mut in_bytes = Vec::with_capacity(out.len());
            let mut buf = [0u8; 1];
            // Read a single byte per call until the full payload length has been received.
            while in_bytes.len() != out.len() {
                rx.read_exact(&mut buf).await.unwrap();
                in_bytes.push(buf[0]);
            }
            // `out` is `'static`, so it can be compared directly without a separate copy.
            assert_eq!(in_bytes, out);
            // In the `CloseSender` case the sender has already dropped its end of the
            // channel, so a failed send is expected and deliberately ignored.
            let _ = tx_done.send(());
        },
    )
    .await;
}

pub async fn run(fixture: impl Fixture + 'static) {
    let fixture = &Mutex::new(fixture);
    fixture.lock().await.log("# send bytes a->b remaining open");
    let sockets = fixture.lock().await.create_handles(fidl::SocketOpts::STREAM).await;
    send_bytes(&fixture, sockets, &[1, 2, 3], AfterSend::RemainOpen).await;
    fixture.lock().await.log("# send bytes b->a remaining open");
    let sockets = reverse(fixture.lock().await.create_handles(fidl::SocketOpts::STREAM).await);
    send_bytes(&fixture, sockets, &[7, 8, 9], AfterSend::RemainOpen).await;
    fixture.lock().await.log("# send bytes a->b then close");
    let sockets = fixture.lock().await.create_handles(fidl::SocketOpts::STREAM).await;
    send_bytes(&fixture, sockets, &[1, 2, 3], AfterSend::CloseSender).await;
    fixture.lock().await.log("# send bytes b->a then close");
    let sockets = reverse(fixture.lock().await.create_handles(fidl::SocketOpts::STREAM).await);
    send_bytes(&fixture, sockets, &[7, 8, 9], AfterSend::CloseSender).await;
}

#[cfg(test)]
mod test {
    use super::*;

    /// Fixture that creates sockets directly via `fidl::Socket` and logs to stdout.
    struct FidlFixture;

    #[async_trait]
    impl Fixture for FidlFixture {
        /// Creates a stream or datagram socket pair according to `opts`.
        async fn create_handles(&self, opts: fidl::SocketOpts) -> (fidl::Socket, fidl::Socket) {
            match opts {
                fidl::SocketOpts::STREAM => fidl::Socket::create_stream(),
                fidl::SocketOpts::DATAGRAM => fidl::Socket::create_datagram(),

                // This catch-all arm is only compiled when targeting Fuchsia.
                // NOTE(review): presumably `SocketOpts` admits values other than the two
                // above on that target and is exhaustive elsewhere — confirm.
                #[cfg(target_os = "fuchsia")]
                _ => panic!("unsupported socket options"),
            }
        }
    }

    impl LoggingFixture for FidlFixture {
        /// Prints each log message on its own line to stdout.
        fn log(&mut self, msg: &str) {
            println!("{}", msg);
        }
    }

    /// Runs the full scenario list from `run` against the plain `FidlFixture`.
    #[fuchsia_async::run_singlethreaded(test)]
    async fn tests() {
        run(FidlFixture).await
    }
}