overnet_core/proxy/
mod.rs
1mod handle;
6mod run;
7mod stream;
8
9use self::handle::{ProxyableHandle, ReadValue};
10use self::stream::StreamWriter;
11use crate::labels::{NodeId, TransferKey};
12use crate::peer::FramedStreamWriter;
13use crate::router::Router;
14use anyhow::{format_err, Error};
15use fidl_fuchsia_overnet_protocol::{
16 SignalUpdate, StreamId, StreamRef, TransferInitiator, TransferWaiter,
17};
18use futures::prelude::*;
19use std::pin::Pin;
20use std::sync::{Arc, Weak};
21use std::task::{Context, Poll};
22use zx_status;
23
24pub(crate) use self::handle::{IntoProxied, Proxyable, ProxyableRW};
25pub(crate) use self::run::spawn::{recv as spawn_recv, send as spawn_send};
26
27pub use self::run::set_proxy_drop_event_handler;
28
29#[cfg(not(target_os = "fuchsia"))]
30use fuchsia_async::emulated_handle::ChannelProxyProtocol;
31
32#[derive(Debug)]
33pub(crate) enum RemoveFromProxyTable {
34 InitiateTransfer {
35 paired_handle: fidl::Handle,
36 drain_stream: FramedStreamWriter,
37 stream_ref_sender: StreamRefSender,
38 },
39 Dropped,
40}
41
42impl RemoveFromProxyTable {
43 pub(crate) fn is_dropped(&self) -> bool {
44 match self {
45 RemoveFromProxyTable::Dropped => true,
46 _ => false,
47 }
48 }
49}
50
51pub(crate) struct ProxyTransferInitiationReceiver(
52 Pin<Box<dyn Send + Future<Output = Result<RemoveFromProxyTable, Error>>>>,
53);
54
55impl Future for ProxyTransferInitiationReceiver {
56 type Output = Result<RemoveFromProxyTable, Error>;
57 fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
58 self.0.as_mut().poll(ctx)
59 }
60}
61
62impl std::fmt::Debug for ProxyTransferInitiationReceiver {
63 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
64 "_".fmt(f)
65 }
66}
67
68impl ProxyTransferInitiationReceiver {
69 pub(crate) fn new(
70 f: impl 'static + Send + Future<Output = Result<RemoveFromProxyTable, Error>>,
71 ) -> Self {
72 Self(Box::pin(f))
73 }
74}
75
76pub(crate) struct StreamRefSender {
77 chan: futures::channel::oneshot::Sender<StreamRef>,
78}
79
80impl std::fmt::Debug for StreamRefSender {
81 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
82 "StreamRefSender".fmt(f)
83 }
84}
85
86impl StreamRefSender {
87 pub(crate) fn new() -> (Self, futures::channel::oneshot::Receiver<StreamRef>) {
88 let (tx, rx) = futures::channel::oneshot::channel();
89 (Self { chan: tx }, rx)
90 }
91
92 fn send(self, stream_ref: StreamRef) -> Result<(), Error> {
93 Ok(self.chan.send(stream_ref).map_err(|_| format_err!("Failed sending StreamRef"))?)
94 }
95
96 pub(crate) fn draining_initiate(
97 self,
98 stream_id: u64,
99 new_destination_node: NodeId,
100 transfer_key: TransferKey,
101 ) -> Result<(), Error> {
102 Ok(self.send(StreamRef::TransferInitiator(TransferInitiator {
103 stream_id: StreamId { id: stream_id },
104 new_destination_node: new_destination_node.into(),
105 transfer_key,
106 }))?)
107 }
108
109 pub(crate) fn draining_await(
110 self,
111 stream_id: u64,
112 transfer_key: TransferKey,
113 ) -> Result<(), Error> {
114 Ok(self.send(StreamRef::TransferWaiter(TransferWaiter {
115 stream_id: StreamId { id: stream_id },
116 transfer_key,
117 }))?)
118 }
119}
120
121mod proxy_count {
122 use std::sync::Mutex;
123
124 pub struct ProxyCount {
125 count: usize,
126 high_water: usize,
127 increment: usize,
128 }
129
130 pub static PROXY_COUNT: Mutex<ProxyCount> =
131 Mutex::new(ProxyCount { count: 0, high_water: 0, increment: 100 });
132
133 impl ProxyCount {
134 pub fn increment(&mut self) {
135 self.count += 1;
136
137 if self.count == self.high_water + self.increment {
138 self.high_water += self.increment;
139 if self.count == self.increment * 10 {
140 self.increment *= 10;
141 }
142 log::info!("{} proxies extant or never reaped", self.count)
143 }
144 }
145
146 pub fn decrement(&mut self) {
147 if self.count == 0 {
148 log::warn!("proxy counter went below zero");
149 } else {
150 self.count -= 1;
151 }
152 }
153 }
154}
155
156#[derive(Debug)]
157pub(crate) struct Proxy<Hdl: Proxyable + 'static> {
158 hdl: Option<ProxyableHandle<Hdl>>,
159}
160
161impl<Hdl: 'static + Proxyable> Drop for Proxy<Hdl> {
162 fn drop(&mut self) {
163 proxy_count::PROXY_COUNT.lock().unwrap().decrement();
164 }
165}
166
167impl<Hdl: 'static + Proxyable> Proxy<Hdl> {
168 fn new(hdl: Hdl, router: Weak<Router>) -> Arc<Self> {
169 proxy_count::PROXY_COUNT.lock().unwrap().increment();
170 Arc::new(Self { hdl: Some(ProxyableHandle::new(hdl, router)) })
171 }
172
173 fn close_with_reason(mut self, msg: String) {
174 if let Some(hdl) = self.hdl.take() {
175 hdl.close_with_reason(msg);
176 }
177 }
178
179 fn hdl(&self) -> &ProxyableHandle<Hdl> {
180 self.hdl.as_ref().unwrap()
181 }
182
183 #[cfg(not(target_os = "fuchsia"))]
184 fn set_channel_proxy_protocol(&self, proto: ChannelProxyProtocol) {
185 if let Some(hdl) = self.hdl.as_ref() {
186 hdl.set_channel_proxy_protocol(proto)
187 }
188 }
189
190 async fn write_to_handle(&self, msg: &mut Hdl::Message) -> Result<(), zx_status::Status>
191 where
192 Hdl: for<'a> ProxyableRW<'a>,
193 {
194 self.hdl().write(msg).await
195 }
196
197 fn apply_signal_update(&self, signal_update: SignalUpdate) -> Result<(), Error> {
198 self.hdl().apply_signal_update(signal_update)
199 }
200
201 fn read_from_handle<'a>(
202 &'a self,
203 msg: &'a mut Hdl::Message,
204 ) -> impl 'a + Future<Output = Result<ReadValue, zx_status::Status>> + Unpin
205 where
206 Hdl: ProxyableRW<'a>,
207 {
208 self.hdl().read(msg)
209 }
210
211 async fn drain_handle_to_stream(
212 &self,
213 stream_writer: &mut StreamWriter<Hdl::Message>,
214 ) -> Result<(), Error>
215 where
216 Hdl: for<'a> ProxyableRW<'a>,
217 {
218 self.hdl().drain_to_stream(stream_writer).await
219 }
220}