1use crate::ProtocolsExt;
6use crate::execution_scope::ExecutionScope;
7use crate::node::{self, Node};
8use fidl::endpoints::{ControlHandle, ProtocolMarker, RequestStream, ServerEnd};
9use fidl::epitaph::ChannelEpitaphExt;
10use futures::FutureExt;
11use std::future::Future;
12use std::sync::Arc;
13use zx_status::Status;
14use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
15
16#[derive(Debug)]
20pub struct ObjectRequest {
21 object_request: fidl::Channel,
23
24 what_to_send: ObjectRequestSend,
26
27 attributes: fio::NodeAttributesQuery,
29
30 create_attributes: Option<Box<fio::MutableNodeAttributes>>,
32
33 pub truncate: bool,
35}
36
37impl ObjectRequest {
38 pub(crate) fn new_deprecated(
39 object_request: fidl::Channel,
40 what_to_send: ObjectRequestSend,
41 attributes: fio::NodeAttributesQuery,
42 create_attributes: Option<&fio::MutableNodeAttributes>,
43 truncate: bool,
44 ) -> Self {
45 assert!(!object_request.as_handle_ref().is_invalid());
46 let create_attributes = create_attributes.map(|a| Box::new(a.clone()));
47 Self { object_request, what_to_send, attributes, create_attributes, truncate }
48 }
49
50 pub fn new(flags: fio::Flags, options: &fio::Options, object_request: fidl::Channel) -> Self {
52 Self::new_deprecated(
53 object_request,
54 if flags.contains(fio::Flags::FLAG_SEND_REPRESENTATION) {
55 ObjectRequestSend::OnRepresentation
56 } else {
57 ObjectRequestSend::Nothing
58 },
59 options.attributes.unwrap_or(fio::NodeAttributesQuery::empty()),
60 options.create_attributes.as_ref(),
61 flags.is_truncate(),
62 )
63 }
64
65 pub(crate) fn what_to_send(&self) -> ObjectRequestSend {
66 self.what_to_send
67 }
68
69 pub fn attributes(&self) -> fio::NodeAttributesQuery {
70 self.attributes
71 }
72
73 pub fn create_attributes(&self) -> Option<&fio::MutableNodeAttributes> {
74 self.create_attributes.as_deref()
75 }
76
77 pub fn options(&self) -> fio::Options {
78 fio::Options {
79 attributes: (!self.attributes.is_empty()).then_some(self.attributes),
80 create_attributes: self
81 .create_attributes
82 .as_ref()
83 .map(|a| fio::MutableNodeAttributes::clone(&a)),
84 ..Default::default()
85 }
86 }
87
88 pub async fn into_request_stream<T: Representation>(
90 self,
91 connection: &T,
92 ) -> Result<<T::Protocol as ProtocolMarker>::RequestStream, Status> {
93 let stream = fio::NodeRequestStream::from_channel(fasync::Channel::from_channel(
94 self.object_request,
95 ));
96 match self.what_to_send {
97 ObjectRequestSend::OnOpen => {
98 let control_handle = stream.control_handle();
99 let node_info = connection.node_info().await.map_err(|s| {
100 control_handle.shutdown_with_epitaph(s);
101 s
102 })?;
103 send_on_open(&stream.control_handle(), node_info)?;
104 }
105 ObjectRequestSend::OnRepresentation => {
106 let control_handle = stream.control_handle();
107 let representation =
108 connection.get_representation(self.attributes).await.map_err(|s| {
109 control_handle.shutdown_with_epitaph(s);
110 s
111 })?;
112 control_handle
113 .send_on_representation(representation)
114 .map_err(|_| Status::PEER_CLOSED)?;
115 }
116 ObjectRequestSend::Nothing => {}
117 }
118 Ok(stream.cast_stream())
119 }
120
121 pub fn into_server_end<T>(self) -> ServerEnd<T> {
123 ServerEnd::new(self.object_request)
124 }
125
126 pub fn into_channel(self) -> fidl::Channel {
128 self.object_request
129 }
130
131 pub fn into_channel_after_sending_on_open(
133 self,
134 node_info: fio::NodeInfoDeprecated,
135 ) -> Result<fidl::Channel, Status> {
136 let stream = fio::NodeRequestStream::from_channel(fasync::Channel::from_channel(
137 self.object_request,
138 ));
139 send_on_open(&stream.control_handle(), node_info)?;
140 let (inner, _is_terminated) = stream.into_inner();
141 Ok(Arc::try_unwrap(inner).unwrap().into_channel().into())
143 }
144
145 pub fn shutdown(self, status: Status) {
147 if self.object_request.as_handle_ref().is_invalid() {
148 return;
149 }
150 if let ObjectRequestSend::OnOpen = self.what_to_send {
151 let (_, control_handle) = ServerEnd::<fio::NodeMarker>::new(self.object_request)
152 .into_stream_and_control_handle();
153 let _ = control_handle.send_on_open_(status.into_raw(), None);
154 control_handle.shutdown_with_epitaph(status);
155 } else {
156 let _ = self.object_request.close_with_epitaph(status);
157 }
158 }
159
160 pub fn handle<T>(
162 mut self,
163 f: impl FnOnce(ObjectRequestRef<'_>) -> Result<T, Status>,
164 ) -> Option<T> {
165 match f(&mut self) {
166 Ok(o) => Some(o),
167 Err(s) => {
168 self.shutdown(s);
169 None
170 }
171 }
172 }
173
174 pub async fn handle_async(
176 mut self,
177 f: impl AsyncFnOnce(&mut ObjectRequest) -> Result<(), Status>,
178 ) {
179 if let Err(s) = f(&mut self).await {
180 self.shutdown(s);
181 }
182 }
183
184 pub async fn wait_till_ready(&self) -> bool {
188 if !matches!(self.what_to_send, ObjectRequestSend::Nothing) {
189 return true;
190 }
191 let signals = fasync::OnSignalsRef::new(
192 self.object_request.as_handle_ref(),
193 fidl::Signals::OBJECT_READABLE | fidl::Signals::CHANNEL_PEER_CLOSED,
194 )
195 .await
196 .unwrap();
197 signals.contains(fidl::Signals::OBJECT_READABLE)
198 }
199
200 pub fn take(&mut self) -> ObjectRequest {
202 assert!(!self.object_request.as_handle_ref().is_invalid());
203 Self {
204 object_request: std::mem::replace(
205 &mut self.object_request,
206 fidl::NullableHandle::invalid().into(),
207 ),
208 what_to_send: self.what_to_send,
209 attributes: self.attributes,
210 create_attributes: self.create_attributes.take(),
211 truncate: self.truncate,
212 }
213 }
214
215 pub async fn create_connection<C, N>(
220 &mut self,
221 scope: ExecutionScope,
222 node: Arc<N>,
223 protocols: impl ProtocolsExt,
224 ) -> Result<(), Status>
225 where
226 C: ConnectionCreator<N>,
227 N: Node,
228 {
229 assert!(!self.object_request.as_handle_ref().is_invalid());
230 if protocols.is_node() {
231 node::Connection::create(scope, node, protocols, self).await
232 } else {
233 C::create(scope, node, protocols, self).await
234 }
235 }
236
237 pub fn create_connection_sync<C, N>(
246 self,
247 scope: ExecutionScope,
248 node: Arc<N>,
249 protocols: impl ProtocolsExt,
250 ) where
251 C: ConnectionCreator<N>,
252 N: Node,
253 {
254 assert!(!self.object_request.as_handle_ref().is_invalid());
255 if protocols.is_node() {
256 self.create_connection_sync_or_spawn::<node::Connection<N>, N>(scope, node, protocols);
257 } else {
258 self.create_connection_sync_or_spawn::<C, N>(scope, node, protocols);
259 }
260 }
261
262 fn create_connection_sync_or_spawn<C, N>(
263 self,
264 scope: ExecutionScope,
265 node: Arc<N>,
266 protocols: impl ProtocolsExt,
267 ) where
268 C: ConnectionCreator<N>,
269 N: Node,
270 {
271 let scope2 = scope.clone();
272 let fut = self.handle_async(async |object_request| {
273 C::create(scope2, node, protocols, object_request).await
274 });
275 run_synchronous_future_or_spawn(scope, fut);
276 }
277}
278
279pub type ObjectRequestRef<'a> = &'a mut ObjectRequest;
280
/// The event, if any, that must be sent on a request's channel before it is served.
// NOTE(review): the reason for `allow(dead_code)` is not visible in this file -- confirm whether
// it is still needed.
#[derive(Debug, Clone, Copy, PartialEq)]
#[allow(dead_code)]
pub(crate) enum ObjectRequestSend {
    /// Send an `OnOpen` event; chosen when `fio::OpenFlags::DESCRIBE` is set.
    OnOpen,
    /// Send an `OnRepresentation` event; chosen when `fio::Flags::FLAG_SEND_REPRESENTATION` is
    /// set.
    OnRepresentation,
    /// Send no initial event.
    Nothing,
}
288
289pub trait Representation {
292 type Protocol: ProtocolMarker;
294
295 fn get_representation(
297 &self,
298 requested_attributes: fio::NodeAttributesQuery,
299 ) -> impl Future<Output = Result<fio::Representation, Status>> + Send;
300
301 fn node_info(&self) -> impl Future<Output = Result<fio::NodeInfoDeprecated, Status>> + Send;
303}
304
305pub trait ToObjectRequest: ProtocolsExt {
309 fn to_object_request(&self, object_request: impl Into<fidl::NullableHandle>) -> ObjectRequest;
310}
311
312impl ToObjectRequest for fio::OpenFlags {
313 fn to_object_request(&self, object_request: impl Into<fidl::NullableHandle>) -> ObjectRequest {
314 ObjectRequest::new_deprecated(
315 object_request.into().into(),
316 if self.contains(fio::OpenFlags::DESCRIBE) {
317 ObjectRequestSend::OnOpen
318 } else {
319 ObjectRequestSend::Nothing
320 },
321 fio::NodeAttributesQuery::empty(),
322 None,
323 self.is_truncate(),
324 )
325 }
326}
327
328impl ToObjectRequest for fio::Flags {
329 fn to_object_request(&self, object_request: impl Into<fidl::NullableHandle>) -> ObjectRequest {
330 ObjectRequest::new(*self, &Default::default(), object_request.into().into())
331 }
332}
333
334fn send_on_open(
335 control_handle: &fio::NodeControlHandle,
336 node_info: fio::NodeInfoDeprecated,
337) -> Result<(), Status> {
338 control_handle
339 .send_on_open_(Status::OK.into_raw(), Some(node_info))
340 .map_err(|_| Status::PEER_CLOSED)
341}
342
343pub trait ConnectionCreator<T: Node> {
345 fn create<'a>(
347 scope: ExecutionScope,
348 node: Arc<T>,
349 protocols: impl ProtocolsExt,
350 object_request: ObjectRequestRef<'a>,
351 ) -> impl Future<Output = Result<(), Status>> + Send + 'a;
352}
353
354pub(crate) fn run_synchronous_future_or_spawn(
357 scope: ExecutionScope,
358 future: impl Future<Output = ()> + Send + 'static,
359) {
360 let mut task = scope.new_task(future);
361 let mut cx = std::task::Context::from_waker(std::task::Waker::noop());
362
363 match task.poll_unpin(&mut cx) {
364 std::task::Poll::Pending => task.spawn(),
365 std::task::Poll::Ready(()) => {}
366 }
367}
368
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution_scope::yield_to_executor;
    use std::future::ready;

    /// A future that is ready on its first poll is accepted, and the scope can be waited on
    /// afterwards.
    #[fuchsia::test]
    async fn test_run_synchronous_future_or_spawn_with_sync_future() {
        let scope = ExecutionScope::new();
        let already_done = ready(());
        run_synchronous_future_or_spawn(scope.clone(), already_done);
        scope.wait().await;
    }

    /// A future that yields to the executor is accepted, and the scope can be waited on
    /// afterwards.
    #[fuchsia::test]
    async fn test_run_synchronous_future_or_spawn_with_async_future() {
        let scope = ExecutionScope::new();
        let yields_first = yield_to_executor();
        run_synchronous_future_or_spawn(scope.clone(), yields_first);
        scope.wait().await;
    }
}