overnet_core/proxy/
mod.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5mod handle;
6mod run;
7mod stream;
8
9use self::handle::{ProxyableHandle, ReadValue};
10use self::stream::StreamWriter;
11use crate::labels::{NodeId, TransferKey};
12use crate::peer::FramedStreamWriter;
13use crate::router::Router;
14use anyhow::{format_err, Error};
15use fidl_fuchsia_overnet_protocol::{
16    SignalUpdate, StreamId, StreamRef, TransferInitiator, TransferWaiter,
17};
18use futures::prelude::*;
19use std::pin::Pin;
20use std::sync::{Arc, Weak};
21use std::task::{Context, Poll};
22use zx_status;
23
24pub(crate) use self::handle::{IntoProxied, Proxyable, ProxyableRW};
25pub(crate) use self::run::spawn::{recv as spawn_recv, send as spawn_send};
26
27pub use self::run::set_proxy_drop_event_handler;
28
29#[cfg(not(target_os = "fuchsia"))]
30use fuchsia_async::emulated_handle::ChannelProxyProtocol;
31
32#[derive(Debug)]
33pub(crate) enum RemoveFromProxyTable {
34    InitiateTransfer {
35        paired_handle: fidl::Handle,
36        drain_stream: FramedStreamWriter,
37        stream_ref_sender: StreamRefSender,
38    },
39    Dropped,
40}
41
42impl RemoveFromProxyTable {
43    pub(crate) fn is_dropped(&self) -> bool {
44        match self {
45            RemoveFromProxyTable::Dropped => true,
46            _ => false,
47        }
48    }
49}
50
51pub(crate) struct ProxyTransferInitiationReceiver(
52    Pin<Box<dyn Send + Future<Output = Result<RemoveFromProxyTable, Error>>>>,
53);
54
55impl Future for ProxyTransferInitiationReceiver {
56    type Output = Result<RemoveFromProxyTable, Error>;
57    fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
58        self.0.as_mut().poll(ctx)
59    }
60}
61
62impl std::fmt::Debug for ProxyTransferInitiationReceiver {
63    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
64        "_".fmt(f)
65    }
66}
67
68impl ProxyTransferInitiationReceiver {
69    pub(crate) fn new(
70        f: impl 'static + Send + Future<Output = Result<RemoveFromProxyTable, Error>>,
71    ) -> Self {
72        Self(Box::pin(f))
73    }
74}
75
76pub(crate) struct StreamRefSender {
77    chan: futures::channel::oneshot::Sender<StreamRef>,
78}
79
80impl std::fmt::Debug for StreamRefSender {
81    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
82        "StreamRefSender".fmt(f)
83    }
84}
85
86impl StreamRefSender {
87    pub(crate) fn new() -> (Self, futures::channel::oneshot::Receiver<StreamRef>) {
88        let (tx, rx) = futures::channel::oneshot::channel();
89        (Self { chan: tx }, rx)
90    }
91
92    fn send(self, stream_ref: StreamRef) -> Result<(), Error> {
93        Ok(self.chan.send(stream_ref).map_err(|_| format_err!("Failed sending StreamRef"))?)
94    }
95
96    pub(crate) fn draining_initiate(
97        self,
98        stream_id: u64,
99        new_destination_node: NodeId,
100        transfer_key: TransferKey,
101    ) -> Result<(), Error> {
102        Ok(self.send(StreamRef::TransferInitiator(TransferInitiator {
103            stream_id: StreamId { id: stream_id },
104            new_destination_node: new_destination_node.into(),
105            transfer_key,
106        }))?)
107    }
108
109    pub(crate) fn draining_await(
110        self,
111        stream_id: u64,
112        transfer_key: TransferKey,
113    ) -> Result<(), Error> {
114        Ok(self.send(StreamRef::TransferWaiter(TransferWaiter {
115            stream_id: StreamId { id: stream_id },
116            transfer_key,
117        }))?)
118    }
119}
120
121mod proxy_count {
122    use std::sync::Mutex;
123
124    pub struct ProxyCount {
125        count: usize,
126        high_water: usize,
127        increment: usize,
128    }
129
130    pub static PROXY_COUNT: Mutex<ProxyCount> =
131        Mutex::new(ProxyCount { count: 0, high_water: 0, increment: 100 });
132
133    impl ProxyCount {
134        pub fn increment(&mut self) {
135            self.count += 1;
136
137            if self.count == self.high_water + self.increment {
138                self.high_water += self.increment;
139                if self.count == self.increment * 10 {
140                    self.increment *= 10;
141                }
142                log::info!("{} proxies extant or never reaped", self.count)
143            }
144        }
145
146        pub fn decrement(&mut self) {
147            if self.count == 0 {
148                log::warn!("proxy counter went below zero");
149            } else {
150                self.count -= 1;
151            }
152        }
153    }
154}
155
156#[derive(Debug)]
157pub(crate) struct Proxy<Hdl: Proxyable + 'static> {
158    hdl: Option<ProxyableHandle<Hdl>>,
159}
160
161impl<Hdl: 'static + Proxyable> Drop for Proxy<Hdl> {
162    fn drop(&mut self) {
163        proxy_count::PROXY_COUNT.lock().unwrap().decrement();
164    }
165}
166
167impl<Hdl: 'static + Proxyable> Proxy<Hdl> {
168    fn new(hdl: Hdl, router: Weak<Router>) -> Arc<Self> {
169        proxy_count::PROXY_COUNT.lock().unwrap().increment();
170        Arc::new(Self { hdl: Some(ProxyableHandle::new(hdl, router)) })
171    }
172
173    fn close_with_reason(mut self, msg: String) {
174        if let Some(hdl) = self.hdl.take() {
175            hdl.close_with_reason(msg);
176        }
177    }
178
179    fn hdl(&self) -> &ProxyableHandle<Hdl> {
180        self.hdl.as_ref().unwrap()
181    }
182
183    #[cfg(not(target_os = "fuchsia"))]
184    fn set_channel_proxy_protocol(&self, proto: ChannelProxyProtocol) {
185        if let Some(hdl) = self.hdl.as_ref() {
186            hdl.set_channel_proxy_protocol(proto)
187        }
188    }
189
190    async fn write_to_handle(&self, msg: &mut Hdl::Message) -> Result<(), zx_status::Status>
191    where
192        Hdl: for<'a> ProxyableRW<'a>,
193    {
194        self.hdl().write(msg).await
195    }
196
197    fn apply_signal_update(&self, signal_update: SignalUpdate) -> Result<(), Error> {
198        self.hdl().apply_signal_update(signal_update)
199    }
200
201    fn read_from_handle<'a>(
202        &'a self,
203        msg: &'a mut Hdl::Message,
204    ) -> impl 'a + Future<Output = Result<ReadValue, zx_status::Status>> + Unpin
205    where
206        Hdl: ProxyableRW<'a>,
207    {
208        self.hdl().read(msg)
209    }
210
211    async fn drain_handle_to_stream(
212        &self,
213        stream_writer: &mut StreamWriter<Hdl::Message>,
214    ) -> Result<(), Error>
215    where
216        Hdl: for<'a> ProxyableRW<'a>,
217    {
218        self.hdl().drain_to_stream(stream_writer).await
219    }
220}