1use crate::ProtocolsExt;
6use crate::execution_scope::ExecutionScope;
7use crate::node::{self, Node};
8use fidl::epitaph::ChannelEpitaphExt;
9#[cfg(feature = "fdomain")]
10use flex_client::AsHandleRef;
11use flex_client::fidl::{ControlHandle, ProtocolMarker, RequestStream, ServerEnd};
12use flex_fuchsia_io as fio;
13#[cfg(not(feature = "fdomain"))]
14use fuchsia_async as fasync;
15use futures::FutureExt;
16use std::future::Future;
17use std::sync::Arc;
18use zx_status::Status;
19
20pub trait IntoAsyncChannel {
22 fn into_async_channel(self) -> flex_client::AsyncChannel;
23}
24
25#[cfg(not(feature = "fdomain"))]
26impl IntoAsyncChannel for flex_client::Channel {
27 fn into_async_channel(self) -> flex_client::AsyncChannel {
28 fasync::Channel::from_channel(self)
29 }
30}
31
32#[cfg(feature = "fdomain")]
33impl IntoAsyncChannel for flex_client::Channel {
34 fn into_async_channel(self) -> flex_client::AsyncChannel {
35 self
36 }
37}
38
39#[derive(Debug)]
43pub struct ObjectRequest {
44 object_request: flex_client::Channel,
46
47 what_to_send: ObjectRequestSend,
49
50 attributes: fio::NodeAttributesQuery,
52
53 create_attributes: Option<Box<fio::MutableNodeAttributes>>,
55
56 pub truncate: bool,
58}
59
60impl ObjectRequest {
61 pub(crate) fn new_deprecated(
62 object_request: flex_client::Channel,
63 what_to_send: ObjectRequestSend,
64 attributes: fio::NodeAttributesQuery,
65 create_attributes: Option<&fio::MutableNodeAttributes>,
66 truncate: bool,
67 ) -> Self {
68 let create_attributes = create_attributes.map(|a| Box::new(a.clone()));
69 Self {
70 object_request: object_request,
71 what_to_send,
72 attributes,
73 create_attributes,
74 truncate,
75 }
76 }
77
78 pub fn new(
80 flags: fio::Flags,
81 options: &fio::Options,
82 object_request: flex_client::Channel,
83 ) -> Self {
84 Self::new_deprecated(
85 object_request,
86 if flags.contains(fio::Flags::FLAG_SEND_REPRESENTATION) {
87 ObjectRequestSend::OnRepresentation
88 } else {
89 ObjectRequestSend::Nothing
90 },
91 options.attributes.unwrap_or(fio::NodeAttributesQuery::empty()),
92 options.create_attributes.as_ref(),
93 flags.is_truncate(),
94 )
95 }
96
97 pub(crate) fn what_to_send(&self) -> ObjectRequestSend {
98 self.what_to_send
99 }
100
101 pub fn attributes(&self) -> fio::NodeAttributesQuery {
102 self.attributes
103 }
104
105 pub fn create_attributes(&self) -> Option<&fio::MutableNodeAttributes> {
106 self.create_attributes.as_deref()
107 }
108
109 pub fn options(&self) -> fio::Options {
110 fio::Options {
111 attributes: (!self.attributes.is_empty()).then_some(self.attributes),
112 create_attributes: self
113 .create_attributes
114 .as_ref()
115 .map(|a| fio::MutableNodeAttributes::clone(&a)),
116 ..Default::default()
117 }
118 }
119
120 pub async fn into_request_stream<T: Representation>(
122 self,
123 connection: &T,
124 ) -> Result<<T::Protocol as ProtocolMarker>::RequestStream, Status> {
125 let stream = fio::NodeRequestStream::from_channel(self.object_request.into_async_channel());
126 match self.what_to_send {
127 #[cfg(any(
128 fuchsia_api_level_at_least = "PLATFORM",
129 not(fuchsia_api_level_at_least = "NEXT")
130 ))]
131 ObjectRequestSend::OnOpen => {
132 let control_handle = stream.control_handle();
133 let node_info = connection.node_info().await.map_err(|s| {
134 control_handle.shutdown_with_epitaph(s);
135 s
136 })?;
137 send_on_open(&stream.control_handle(), node_info)?;
138 }
139 ObjectRequestSend::OnRepresentation => {
140 let control_handle = stream.control_handle();
141 let representation =
142 connection.get_representation(self.attributes).await.map_err(|s| {
143 control_handle.shutdown_with_epitaph(s);
144 s
145 })?;
146 control_handle
147 .send_on_representation(representation)
148 .map_err(|_| Status::PEER_CLOSED)?;
149 }
150 ObjectRequestSend::Nothing => {}
151 }
152 let (inner, is_terminated) = stream.into_inner();
153 Ok(<<T as Representation>::Protocol as flex_client::fidl::ProtocolMarker>::RequestStream::from_inner(inner, is_terminated))
154 }
155
156 pub fn into_server_end<T: ProtocolMarker>(self) -> ServerEnd<T> {
158 ServerEnd::new(self.object_request)
159 }
160
161 pub fn into_channel(self) -> flex_client::Channel {
163 self.object_request
164 }
165
166 #[cfg(any(fuchsia_api_level_at_least = "PLATFORM", not(fuchsia_api_level_at_least = "NEXT")))]
168 pub fn into_channel_after_sending_on_open(
169 self,
170 node_info: fio::NodeInfoDeprecated,
171 ) -> Result<flex_client::Channel, Status> {
172 let stream = fio::NodeRequestStream::from_channel(self.object_request.into_async_channel());
173 send_on_open(&stream.control_handle(), node_info)?;
174 let (inner, _is_terminated) = stream.into_inner();
175 Ok(Arc::try_unwrap(inner).unwrap().into_channel().into())
177 }
178
179 pub fn shutdown(self, status: Status) {
181 if self.object_request.as_handle_ref().is_invalid() {
182 return;
183 }
184 #[cfg(any(
185 fuchsia_api_level_at_least = "PLATFORM",
186 not(fuchsia_api_level_at_least = "NEXT")
187 ))]
188 if let ObjectRequestSend::OnOpen = self.what_to_send {
189 let (_, control_handle) = ServerEnd::<fio::NodeMarker>::new(self.object_request)
190 .into_stream_and_control_handle();
191 let _ = control_handle.send_on_open_(status.into_raw(), None);
192 control_handle.shutdown_with_epitaph(status);
193 } else {
194 let _ = self.object_request.close_with_epitaph(status);
195 }
196 #[cfg(not(any(
197 fuchsia_api_level_at_least = "PLATFORM",
198 not(fuchsia_api_level_at_least = "NEXT")
199 )))]
200 {
201 let _ = self.object_request.close_with_epitaph(status);
202 }
203 }
204
205 pub fn handle<T>(
207 mut self,
208 f: impl FnOnce(ObjectRequestRef<'_>) -> Result<T, Status>,
209 ) -> Option<T> {
210 match f(&mut self) {
211 Ok(o) => Some(o),
212 Err(s) => {
213 self.shutdown(s);
214 None
215 }
216 }
217 }
218
219 pub async fn handle_async(
221 mut self,
222 f: impl AsyncFnOnce(&mut ObjectRequest) -> Result<(), Status>,
223 ) {
224 if let Err(s) = f(&mut self).await {
225 self.shutdown(s);
226 }
227 }
228
229 pub async fn wait_till_ready(&self) -> bool {
233 if !matches!(self.what_to_send, ObjectRequestSend::Nothing) {
234 return true;
235 }
236 flex_client::wait_for_signals(
237 &self.object_request.as_handle_ref(),
238 fidl::Signals::OBJECT_READABLE | fidl::Signals::CHANNEL_PEER_CLOSED,
239 )
240 .await
241 .map(|x| x.contains(fidl::Signals::OBJECT_READABLE))
242 .unwrap_or(false)
243 }
244
245 pub fn take(&mut self) -> ObjectRequest {
247 Self {
249 object_request: std::mem::replace(
250 &mut self.object_request,
251 flex_client::NullableHandle::invalid().into(),
252 ),
253 what_to_send: self.what_to_send,
254 attributes: self.attributes,
255 create_attributes: self.create_attributes.take(),
256 truncate: self.truncate,
257 }
258 }
259
260 pub async fn create_connection<C, N>(
265 &mut self,
266 scope: ExecutionScope,
267 node: Arc<N>,
268 protocols: impl ProtocolsExt,
269 ) -> Result<(), Status>
270 where
271 C: ConnectionCreator<N>,
272 N: Node,
273 {
274 if protocols.is_node() {
276 node::Connection::create(scope, node, protocols, self).await
277 } else {
278 C::create(scope, node, protocols, self).await
279 }
280 }
281
282 pub fn create_connection_sync<C, N>(
291 self,
292 scope: ExecutionScope,
293 node: Arc<N>,
294 protocols: impl ProtocolsExt,
295 ) where
296 C: ConnectionCreator<N>,
297 N: Node,
298 {
299 if protocols.is_node() {
301 self.create_connection_sync_or_spawn::<node::Connection<N>, N>(scope, node, protocols);
302 } else {
303 self.create_connection_sync_or_spawn::<C, N>(scope, node, protocols);
304 }
305 }
306
307 fn create_connection_sync_or_spawn<C, N>(
308 self,
309 scope: ExecutionScope,
310 node: Arc<N>,
311 protocols: impl ProtocolsExt,
312 ) where
313 C: ConnectionCreator<N>,
314 N: Node,
315 {
316 let scope2 = scope.clone();
317 let fut = self.handle_async(async |object_request| {
318 C::create(scope2, node, protocols, object_request).await
319 });
320 run_synchronous_future_or_spawn(scope, fut);
321 }
322}
323
324pub type ObjectRequestRef<'a> = &'a mut ObjectRequest;
325
326#[derive(Clone, Copy, Debug, PartialEq)]
327#[allow(dead_code)]
328pub(crate) enum ObjectRequestSend {
329 #[cfg(any(fuchsia_api_level_at_least = "PLATFORM", not(fuchsia_api_level_at_least = "NEXT")))]
330 OnOpen,
331 OnRepresentation,
332 Nothing,
333}
334
335pub trait Representation {
338 type Protocol: ProtocolMarker;
340
341 fn get_representation(
343 &self,
344 requested_attributes: fio::NodeAttributesQuery,
345 ) -> impl Future<Output = Result<fio::Representation, Status>> + Send;
346
347 #[cfg(any(fuchsia_api_level_at_least = "PLATFORM", not(fuchsia_api_level_at_least = "NEXT")))]
349 fn node_info(&self) -> impl Future<Output = Result<fio::NodeInfoDeprecated, Status>> + Send;
350}
351
352pub trait ToObjectRequest: ProtocolsExt {
356 fn to_object_request(
357 &self,
358 object_request: impl Into<flex_client::NullableHandle>,
359 ) -> ObjectRequest;
360}
361
362impl ToObjectRequest for fio::OpenFlags {
363 fn to_object_request(
364 &self,
365 object_request: impl Into<flex_client::NullableHandle>,
366 ) -> ObjectRequest {
367 ObjectRequest::new_deprecated(
368 object_request.into().into(),
369 #[cfg(any(
370 fuchsia_api_level_at_least = "PLATFORM",
371 not(fuchsia_api_level_at_least = "NEXT")
372 ))]
373 if self.contains(fio::OpenFlags::DESCRIBE) {
374 ObjectRequestSend::OnOpen
375 } else {
376 ObjectRequestSend::Nothing
377 },
378 #[cfg(not(any(
379 fuchsia_api_level_at_least = "PLATFORM",
380 not(fuchsia_api_level_at_least = "NEXT")
381 )))]
382 ObjectRequestSend::Nothing,
383 fio::NodeAttributesQuery::empty(),
384 None,
385 self.is_truncate(),
386 )
387 }
388}
389
390impl ToObjectRequest for fio::Flags {
391 fn to_object_request(
392 &self,
393 object_request: impl Into<flex_client::NullableHandle>,
394 ) -> ObjectRequest {
395 ObjectRequest::new(*self, &Default::default(), object_request.into().into())
396 }
397}
398
399#[cfg(any(fuchsia_api_level_at_least = "PLATFORM", not(fuchsia_api_level_at_least = "NEXT")))]
400fn send_on_open(
401 control_handle: &fio::NodeControlHandle,
402 node_info: fio::NodeInfoDeprecated,
403) -> Result<(), Status> {
404 control_handle
405 .send_on_open_(Status::OK.into_raw(), Some(node_info))
406 .map_err(|_| Status::PEER_CLOSED)
407}
408
409pub trait ConnectionCreator<T: Node> {
411 fn create<'a>(
413 scope: ExecutionScope,
414 node: Arc<T>,
415 protocols: impl ProtocolsExt,
416 object_request: ObjectRequestRef<'a>,
417 ) -> impl Future<Output = Result<(), Status>> + Send + 'a;
418}
419
420pub(crate) fn run_synchronous_future_or_spawn(
423 scope: ExecutionScope,
424 future: impl Future<Output = ()> + Send + 'static,
425) {
426 let mut task = scope.new_task(future);
427 let mut cx = std::task::Context::from_waker(std::task::Waker::noop());
428
429 match task.poll_unpin(&mut cx) {
430 std::task::Poll::Pending => task.spawn(),
431 std::task::Poll::Ready(()) => {}
432 }
433}
434
435#[cfg(test)]
436mod tests {
437 use std::future::ready;
438
439 use crate::execution_scope::yield_to_executor;
440
441 use super::*;
442
443 #[fuchsia::test]
444 async fn test_run_synchronous_future_or_spawn_with_sync_future() {
445 #[cfg(feature = "fdomain")]
446 let scope = crate::execution_scope::ExecutionScope::new(flex_local::local_client_empty());
447 #[cfg(not(feature = "fdomain"))]
448 let scope = crate::execution_scope::ExecutionScope::new();
449 run_synchronous_future_or_spawn(scope.clone(), ready(()));
450 scope.wait().await;
451 }
452
453 #[fuchsia::test]
454 async fn test_run_synchronous_future_or_spawn_with_async_future() {
455 #[cfg(feature = "fdomain")]
456 let scope = crate::execution_scope::ExecutionScope::new(flex_local::local_client_empty());
457 #[cfg(not(feature = "fdomain"))]
458 let scope = crate::execution_scope::ExecutionScope::new();
459 run_synchronous_future_or_spawn(scope.clone(), yield_to_executor());
460 scope.wait().await;
461 }
462}