1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
use anyhow::{Context, Error};
use fidl_fuchsia_overnet::{MeshControllerProxy, ServiceConsumerProxy, ServicePublisherProxy};
use once_cell::sync::OnceCell;
// Platform-specific implementations of the hoist.
// NOTE(review): both modules are declared without a `cfg` gate, so both are compiled on every
// target; only the re-exports below are conditional. Confirm this is intended.
mod fuchsia;
mod not_fuchsia;
// On Fuchsia targets, re-export every public item of the `fuchsia` module from the crate root.
#[cfg(target_os = "fuchsia")]
pub use crate::fuchsia::*;
// On every other target, re-export the public items of `not_fuchsia` instead.
#[cfg(not(target_os = "fuchsia"))]
pub use not_fuchsia::*;
/// Logging setup. This module is only compiled for Fuchsia targets.
#[cfg(target_os = "fuchsia")]
pub mod logger {
/// Initializes logging by invoking `diagnostics_log::init!` with `&["overnet_hoist"]`
/// (presumably the list of log tags; confirm against the macro's documentation).
///
/// After the macro invocation the function body returns `Ok(())`; the `Result` return type
/// leaves room for callers to use `?`.
pub fn init() -> Result<(), anyhow::Error> {
diagnostics_log::init!(&["overnet_hoist"]);
Ok(())
}
}
/// Two-state switch describing whether "CSO" is turned on.
///
/// NOTE(review): this file neither uses the type nor expands the acronym; see the
/// platform-specific modules for what the setting controls.
///
/// The type is `Copy` and comparable, and derives `Debug` so that it can appear in log output,
/// assertion failures and `#[derive(Debug)]` containers.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum Cso {
    /// The feature is switched on.
    Enabled,
    /// The feature is switched off.
    Disabled,
}
/// Something that can hand out client connections to the `fuchsia.overnet` protocols.
///
/// Implementors must be `Debug` and safe to share between threads (`Sync + Send`).
pub trait OvernetInstance: std::fmt::Debug + Sync + Send {
    /// Opens a new `ServiceConsumer` connection and returns its proxy.
    fn connect_as_service_consumer(&self) -> Result<ServiceConsumerProxy, Error>;
    /// Opens a new `ServicePublisher` connection and returns its proxy.
    fn connect_as_service_publisher(&self) -> Result<ServicePublisherProxy, Error>;
    /// Opens a new `MeshController` connection and returns its proxy.
    fn connect_as_mesh_controller(&self) -> Result<MeshControllerProxy, Error>;
    /// Convenience wrapper: connects as a service publisher and publishes `provider` under
    /// `service_name` over that connection.
    ///
    /// # Errors
    ///
    /// Returns an error if the publisher connection cannot be established or if the
    /// `publish_service` call on the resulting proxy fails.
    fn publish_service(
        &self,
        service_name: &str,
        provider: fidl::endpoints::ClientEnd<fidl_fuchsia_overnet::ServiceProviderMarker>,
    ) -> Result<(), anyhow::Error> {
        let publisher = self.connect_as_service_publisher()?;
        publisher.publish_service(service_name, provider)?;
        Ok(())
    }
}
/// Process-wide storage for the global `Hoist`. It is filled at most once, either by
/// `init_hoist_with` or lazily by `hoist()` on Fuchsia targets.
static HOIST: OnceCell<Hoist> = OnceCell::new();
/// Returns the process-wide `Hoist`.
///
/// On Fuchsia targets the hoist is created on first use. On every other target it must have
/// been installed beforehand via `init_hoist` or `init_hoist_with`.
///
/// # Panics
///
/// * On Fuchsia: if `Hoist::new()` fails while lazily creating the instance.
/// * Elsewhere: if called before the global hoist has been initialized.
pub fn hoist() -> &'static Hoist {
    // Off-Fuchsia there is no implicit construction; the cell must already be populated.
    if !cfg!(target_os = "fuchsia") {
        return HOIST.get().expect("Tried to get overnet hoist before it was initialized");
    }
    HOIST.get_or_init(|| Hoist::new().unwrap())
}
/// Builds a fresh `Hoist` with `Hoist::new()` and installs it as the global instance.
///
/// # Errors
///
/// Fails if `Hoist::new()` fails, or if `init_hoist_with` rejects the new hoist because a
/// global hoist has already been set.
pub fn init_hoist() -> Result<&'static Hoist, Error> {
    init_hoist_with(Hoist::new()?)
}
/// Installs `hoist` as the process-wide instance and returns a `'static` reference to it.
///
/// # Errors
///
/// Returns an error if a global hoist has already been set; the cell can only be written
/// once. Also returns an error if the value cannot be read back from the cell after a
/// successful `set`.
pub fn init_hoist_with(hoist: Hoist) -> Result<&'static Hoist, Error> {
    // `hoist` is owned and never used again, so move it into the cell rather than cloning it.
    // On failure `set` hands the rejected value back; it is dropped by the `map_err` closure.
    HOIST
        .set(hoist)
        .map_err(|_| anyhow::anyhow!("Tried to set global hoist more than once"))?;
    HOIST.get().context("Failed to retrieve the hoist we created back from the cell we put it in")
}
// Tests that exercise a freshly created `Hoist` through its service-consumer and
// mesh-controller connections.
#[cfg(test)]
mod test {
use super::*;
// The leading `::` names the external `fuchsia` crate rather than this crate's own
// `fuchsia` module declared at the crate root.
use ::fuchsia as fuchsia_lib;
use anyhow::Error;
use fuchsia_async::{Task, TimeoutExt};
use futures::channel::oneshot;
use futures::future::{select, try_join, Either};
use futures::prelude::*;
use std::time::Duration;
/// Calls `list_peers` on `service_consumer` in an endless loop.
///
/// The loop has no success exit: the function only returns once a `list_peers` call fails,
/// and then it returns that error.
async fn loop_on_list_peers_until_it_fails(
service_consumer: &impl fidl_fuchsia_overnet::ServiceConsumerProxyInterface,
) -> Result<(), Error> {
loop {
service_consumer.list_peers().await?;
}
}
/// Misusing one service-consumer channel must not break a second, well-behaved one.
///
/// A background task keeps calling `list_peers` on its own connection and expects every call
/// to succeed, while the main body issues two concurrent `list_peers` loops on a different
/// connection and expects that to fail.
#[fuchsia_lib::test]
async fn one_bad_channel_doesnt_take_everything_down() {
let hoist = Hoist::new().unwrap();
// `tx_complete` tells the background task to stop; `tx_complete_ack` lets it confirm that
// it stopped without having hit a failing `list_peers`.
let (tx_complete, mut rx_complete) = oneshot::channel();
let (tx_complete_ack, rx_complete_ack) = oneshot::channel();
let service_consumer1 = hoist.connect_as_service_consumer().unwrap();
// The task handle is kept in `_bg` for the remainder of the test.
let _bg = Task::spawn(async move {
loop {
// Race the next `list_peers` call against the stop signal.
match select(service_consumer1.list_peers().boxed(), &mut rx_complete).await {
Either::Left((r, _)) => {
r.expect("list_peers on channel fulfilling contract should not fail");
}
Either::Right(_) => {
// Stop requested (or the sender was dropped): acknowledge and exit the task.
tx_complete_ack.send(()).unwrap();
return;
}
}
}
});
let service_consumer2 = hoist.connect_as_service_consumer().unwrap();
// Two loops share the same proxy; `try_join` resolves as soon as either loop errors,
// which is the outcome this test requires.
try_join(
loop_on_list_peers_until_it_fails(&service_consumer2),
loop_on_list_peers_until_it_fails(&service_consumer2),
)
.await
.expect_err("Multiple list_peers against the same channel should fail");
// Shut the background task down and wait for its acknowledgement.
tx_complete.send(()).unwrap();
rx_complete_ack.await.unwrap();
}
/// Closing the far end of one attached socket link must not disturb another link.
#[fuchsia_lib::test]
async fn one_bad_link_doesnt_take_the_rest_down() {
let hoist = Hoist::new().unwrap();
let mesh_controller = &hoist.connect_as_mesh_controller().unwrap();
// Two stream socket pairs: the `a` ends are attached as links, the `b` ends stay local.
let (s1a, s1b) = fidl::Socket::create_stream();
let (s2a, s2b) = fidl::Socket::create_stream();
mesh_controller.attach_socket_link(s1a).unwrap();
mesh_controller.attach_socket_link(s2a).unwrap();
let mut s1b = fidl::AsyncSocket::from_socket(s1b).unwrap();
// Break the second link by dropping its local end.
drop(s2b);
let mut buf = [0u8; 10];
// Keep reading from the first link: a zero-length read or an error fails the test.
async move {
loop {
match s1b.read(&mut buf).await {
Ok(0) => panic!("Should not see s1b closed"),
Ok(_) => (),
Err(e) => panic!("Should not see an error on s1b: {:?}", e),
}
}
}
// The read loop never finishes on its own; reaching the 2 second timeout without a panic
// is what lets the test pass.
.on_timeout(Duration::from_secs(2), || ())
.await
}
}