Skip to main content

vfs/
object_request.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::ProtocolsExt;
6use crate::execution_scope::ExecutionScope;
7use crate::node::{self, Node};
8use fidl::epitaph::ChannelEpitaphExt;
9#[cfg(feature = "fdomain")]
10use flex_client::AsHandleRef;
11use flex_client::fidl::{ControlHandle, ProtocolMarker, RequestStream, ServerEnd};
12use flex_fuchsia_io as fio;
13#[cfg(not(feature = "fdomain"))]
14use fuchsia_async as fasync;
15use futures::FutureExt;
16use std::future::Future;
17use std::sync::Arc;
18use zx_status::Status;
19
20/// Wraps the channel provided in the open methods and provide convenience methods for sending
21pub trait IntoAsyncChannel {
22    fn into_async_channel(self) -> flex_client::AsyncChannel;
23}
24
25#[cfg(not(feature = "fdomain"))]
26impl IntoAsyncChannel for flex_client::Channel {
27    fn into_async_channel(self) -> flex_client::AsyncChannel {
28        fasync::Channel::from_channel(self)
29    }
30}
31
// With the "fdomain" feature the channel is returned as-is, so no conversion is performed.
#[cfg(feature = "fdomain")]
impl IntoAsyncChannel for flex_client::Channel {
    fn into_async_channel(self) -> flex_client::AsyncChannel {
        let channel = self;
        channel
    }
}
38
39/// Wraps the channel provided in the open methods and provide convenience methods for sending
40/// appropriate responses.  It also records actions that should be taken upon successful connection
41/// such as truncating file objects.
42#[derive(Debug)]
43pub struct ObjectRequest {
44    // The channel.
45    object_request: flex_client::Channel,
46
47    // What should be sent first.
48    what_to_send: ObjectRequestSend,
49
50    // Attributes required in the open method.
51    attributes: fio::NodeAttributesQuery,
52
53    // Creation attributes.
54    create_attributes: Option<Box<fio::MutableNodeAttributes>>,
55
56    /// Truncate the object before use.
57    pub truncate: bool,
58}
59
60impl ObjectRequest {
61    pub(crate) fn new_deprecated(
62        object_request: flex_client::Channel,
63        what_to_send: ObjectRequestSend,
64        attributes: fio::NodeAttributesQuery,
65        create_attributes: Option<&fio::MutableNodeAttributes>,
66        truncate: bool,
67    ) -> Self {
68        let create_attributes = create_attributes.map(|a| Box::new(a.clone()));
69        Self {
70            object_request: object_request,
71            what_to_send,
72            attributes,
73            create_attributes,
74            truncate,
75        }
76    }
77
78    /// Create a new [`ObjectRequest`] from a set of [`fio::Flags`] and [`fio::Options`]`.
79    pub fn new(
80        flags: fio::Flags,
81        options: &fio::Options,
82        object_request: flex_client::Channel,
83    ) -> Self {
84        Self::new_deprecated(
85            object_request,
86            if flags.contains(fio::Flags::FLAG_SEND_REPRESENTATION) {
87                ObjectRequestSend::OnRepresentation
88            } else {
89                ObjectRequestSend::Nothing
90            },
91            options.attributes.unwrap_or(fio::NodeAttributesQuery::empty()),
92            options.create_attributes.as_ref(),
93            flags.is_truncate(),
94        )
95    }
96
97    pub(crate) fn what_to_send(&self) -> ObjectRequestSend {
98        self.what_to_send
99    }
100
101    pub fn attributes(&self) -> fio::NodeAttributesQuery {
102        self.attributes
103    }
104
105    pub fn create_attributes(&self) -> Option<&fio::MutableNodeAttributes> {
106        self.create_attributes.as_deref()
107    }
108
109    pub fn options(&self) -> fio::Options {
110        fio::Options {
111            attributes: (!self.attributes.is_empty()).then_some(self.attributes),
112            create_attributes: self
113                .create_attributes
114                .as_ref()
115                .map(|a| fio::MutableNodeAttributes::clone(&a)),
116            ..Default::default()
117        }
118    }
119
120    /// Returns the request stream after sending requested information.
121    pub async fn into_request_stream<T: Representation>(
122        self,
123        connection: &T,
124    ) -> Result<<T::Protocol as ProtocolMarker>::RequestStream, Status> {
125        let stream = fio::NodeRequestStream::from_channel(self.object_request.into_async_channel());
126        match self.what_to_send {
127            #[cfg(any(
128                fuchsia_api_level_at_least = "PLATFORM",
129                not(fuchsia_api_level_at_least = "NEXT")
130            ))]
131            ObjectRequestSend::OnOpen => {
132                let control_handle = stream.control_handle();
133                let node_info = connection.node_info().await.map_err(|s| {
134                    control_handle.shutdown_with_epitaph(s);
135                    s
136                })?;
137                send_on_open(&stream.control_handle(), node_info)?;
138            }
139            ObjectRequestSend::OnRepresentation => {
140                let control_handle = stream.control_handle();
141                let representation =
142                    connection.get_representation(self.attributes).await.map_err(|s| {
143                        control_handle.shutdown_with_epitaph(s);
144                        s
145                    })?;
146                control_handle
147                    .send_on_representation(representation)
148                    .map_err(|_| Status::PEER_CLOSED)?;
149            }
150            ObjectRequestSend::Nothing => {}
151        }
152        let (inner, is_terminated) = stream.into_inner();
153        Ok(<<T as Representation>::Protocol as flex_client::fidl::ProtocolMarker>::RequestStream::from_inner(inner, is_terminated))
154    }
155
156    /// Converts to ServerEnd<T>.
157    pub fn into_server_end<T: ProtocolMarker>(self) -> ServerEnd<T> {
158        ServerEnd::new(self.object_request)
159    }
160
161    /// Extracts the channel (without sending on_open).
162    pub fn into_channel(self) -> flex_client::Channel {
163        self.object_request
164    }
165
166    /// Extracts the channel after sending on_open.
167    #[cfg(any(fuchsia_api_level_at_least = "PLATFORM", not(fuchsia_api_level_at_least = "NEXT")))]
168    pub fn into_channel_after_sending_on_open(
169        self,
170        node_info: fio::NodeInfoDeprecated,
171    ) -> Result<flex_client::Channel, Status> {
172        let stream = fio::NodeRequestStream::from_channel(self.object_request.into_async_channel());
173        send_on_open(&stream.control_handle(), node_info)?;
174        let (inner, _is_terminated) = stream.into_inner();
175        // It's safe to unwrap here because inner is clearly the only Arc reference left.
176        Ok(Arc::try_unwrap(inner).unwrap().into_channel().into())
177    }
178
179    /// Terminates the object request with the given status.
180    pub fn shutdown(self, status: Status) {
181        if self.object_request.as_handle_ref().is_invalid() {
182            return;
183        }
184        #[cfg(any(
185            fuchsia_api_level_at_least = "PLATFORM",
186            not(fuchsia_api_level_at_least = "NEXT")
187        ))]
188        if let ObjectRequestSend::OnOpen = self.what_to_send {
189            let (_, control_handle) = ServerEnd::<fio::NodeMarker>::new(self.object_request)
190                .into_stream_and_control_handle();
191            let _ = control_handle.send_on_open_(status.into_raw(), None);
192            control_handle.shutdown_with_epitaph(status);
193        } else {
194            let _ = self.object_request.close_with_epitaph(status);
195        }
196        #[cfg(not(any(
197            fuchsia_api_level_at_least = "PLATFORM",
198            not(fuchsia_api_level_at_least = "NEXT")
199        )))]
200        {
201            let _ = self.object_request.close_with_epitaph(status);
202        }
203    }
204
205    /// Calls `f` and sends an error on the object request channel upon failure.
206    pub fn handle<T>(
207        mut self,
208        f: impl FnOnce(ObjectRequestRef<'_>) -> Result<T, Status>,
209    ) -> Option<T> {
210        match f(&mut self) {
211            Ok(o) => Some(o),
212            Err(s) => {
213                self.shutdown(s);
214                None
215            }
216        }
217    }
218
219    /// Calls `f` and sends an error on the object request channel upon failure.
220    pub async fn handle_async(
221        mut self,
222        f: impl AsyncFnOnce(&mut ObjectRequest) -> Result<(), Status>,
223    ) {
224        if let Err(s) = f(&mut self).await {
225            self.shutdown(s);
226        }
227    }
228
229    /// Waits until the request has a request waiting in its channel.  Returns immediately if this
230    /// request requires sending an initial event such as OnOpen or OnRepresentation.  Returns
231    /// `true` if the channel is readable (rather than just closed).
232    pub async fn wait_till_ready(&self) -> bool {
233        if !matches!(self.what_to_send, ObjectRequestSend::Nothing) {
234            return true;
235        }
236        flex_client::wait_for_signals(
237            &self.object_request.as_handle_ref(),
238            fidl::Signals::OBJECT_READABLE | fidl::Signals::CHANNEL_PEER_CLOSED,
239        )
240        .await
241        .map(|x| x.contains(fidl::Signals::OBJECT_READABLE))
242        .unwrap_or(false)
243    }
244
245    /// Take the ObjectRequest.  The caller is responsible for sending errors.
246    pub fn take(&mut self) -> ObjectRequest {
247        // assert!(!self.object_request.as_handle_ref().is_invalid());
248        Self {
249            object_request: std::mem::replace(
250                &mut self.object_request,
251                flex_client::NullableHandle::invalid().into(),
252            ),
253            what_to_send: self.what_to_send,
254            attributes: self.attributes,
255            create_attributes: self.create_attributes.take(),
256            truncate: self.truncate,
257        }
258    }
259
260    /// Constructs a new connection to `node` and spawns an async `Task` that will handle requests
261    /// on the connection. `f` is a callback that constructs the connection but it will not be
262    /// called if the connection is supposed to be a node connection. This should be called from
263    /// within a [`ObjectRequest::handle_async`] callback.
264    pub async fn create_connection<C, N>(
265        &mut self,
266        scope: ExecutionScope,
267        node: Arc<N>,
268        protocols: impl ProtocolsExt,
269    ) -> Result<(), Status>
270    where
271        C: ConnectionCreator<N>,
272        N: Node,
273    {
274        // assert!(!self.object_request.as_handle_ref().is_invalid());
275        if protocols.is_node() {
276            node::Connection::create(scope, node, protocols, self).await
277        } else {
278            C::create(scope, node, protocols, self).await
279        }
280    }
281
282    /// Constructs a new connection to `node` and spawns an async `Task` that will handle requests
283    /// on the connection. `f` is a callback that constructs the connection but it will not be
284    /// called if the connection is supposed to be a node connection. This should be called from
285    /// within a [`ObjectRequest::handle`] callback.
286    ///
287    /// This method synchronously calls async code and may require spawning an extra Task if the
288    /// async code does something asynchronous. `create_connection` should be preferred if the
289    /// caller is already in an async context.
290    pub fn create_connection_sync<C, N>(
291        self,
292        scope: ExecutionScope,
293        node: Arc<N>,
294        protocols: impl ProtocolsExt,
295    ) where
296        C: ConnectionCreator<N>,
297        N: Node,
298    {
299        // assert!(!self.object_request.as_handle_ref().is_invalid());
300        if protocols.is_node() {
301            self.create_connection_sync_or_spawn::<node::Connection<N>, N>(scope, node, protocols);
302        } else {
303            self.create_connection_sync_or_spawn::<C, N>(scope, node, protocols);
304        }
305    }
306
307    fn create_connection_sync_or_spawn<C, N>(
308        self,
309        scope: ExecutionScope,
310        node: Arc<N>,
311        protocols: impl ProtocolsExt,
312    ) where
313        C: ConnectionCreator<N>,
314        N: Node,
315    {
316        let scope2 = scope.clone();
317        let fut = self.handle_async(async |object_request| {
318            C::create(scope2, node, protocols, object_request).await
319        });
320        run_synchronous_future_or_spawn(scope, fut);
321    }
322}
323
324pub type ObjectRequestRef<'a> = &'a mut ObjectRequest;
325
/// The initial event, if any, that an [`ObjectRequest`] sends on its channel.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
// NOTE(review): presumably some variants are unused under certain build configurations, hence
// the lint suppression — confirm.
#[allow(dead_code)]
pub(crate) enum ObjectRequestSend {
    /// Send an `OnOpen` event (only available on API levels that still support it).
    #[cfg(any(fuchsia_api_level_at_least = "PLATFORM", not(fuchsia_api_level_at_least = "NEXT")))]
    OnOpen,
    /// Send an `OnRepresentation` event.
    OnRepresentation,
    /// Send no initial event.
    Nothing,
}
334
335/// Trait to get either fio::Representation or fio::NodeInfoDeprecated.  Connection types
336/// should implement this.
337pub trait Representation {
338    /// The protocol used for the connection.
339    type Protocol: ProtocolMarker;
340
341    /// Returns io2's Representation for the object.
342    fn get_representation(
343        &self,
344        requested_attributes: fio::NodeAttributesQuery,
345    ) -> impl Future<Output = Result<fio::Representation, Status>> + Send;
346
347    /// Returns io1's NodeInfoDeprecated.
348    #[cfg(any(fuchsia_api_level_at_least = "PLATFORM", not(fuchsia_api_level_at_least = "NEXT")))]
349    fn node_info(&self) -> impl Future<Output = Result<fio::NodeInfoDeprecated, Status>> + Send;
350}
351
352/// Convenience trait for converting [`fio::Flags`] and [`fio::OpenFlags`] into ObjectRequest.
353///
354/// If [`fio::Options`] need to be specified, use [`ObjectRequest::new`].
355pub trait ToObjectRequest: ProtocolsExt {
356    fn to_object_request(
357        &self,
358        object_request: impl Into<flex_client::NullableHandle>,
359    ) -> ObjectRequest;
360}
361
362impl ToObjectRequest for fio::OpenFlags {
363    fn to_object_request(
364        &self,
365        object_request: impl Into<flex_client::NullableHandle>,
366    ) -> ObjectRequest {
367        ObjectRequest::new_deprecated(
368            object_request.into().into(),
369            #[cfg(any(
370                fuchsia_api_level_at_least = "PLATFORM",
371                not(fuchsia_api_level_at_least = "NEXT")
372            ))]
373            if self.contains(fio::OpenFlags::DESCRIBE) {
374                ObjectRequestSend::OnOpen
375            } else {
376                ObjectRequestSend::Nothing
377            },
378            #[cfg(not(any(
379                fuchsia_api_level_at_least = "PLATFORM",
380                not(fuchsia_api_level_at_least = "NEXT")
381            )))]
382            ObjectRequestSend::Nothing,
383            fio::NodeAttributesQuery::empty(),
384            None,
385            self.is_truncate(),
386        )
387    }
388}
389
390impl ToObjectRequest for fio::Flags {
391    fn to_object_request(
392        &self,
393        object_request: impl Into<flex_client::NullableHandle>,
394    ) -> ObjectRequest {
395        ObjectRequest::new(*self, &Default::default(), object_request.into().into())
396    }
397}
398
399#[cfg(any(fuchsia_api_level_at_least = "PLATFORM", not(fuchsia_api_level_at_least = "NEXT")))]
400fn send_on_open(
401    control_handle: &fio::NodeControlHandle,
402    node_info: fio::NodeInfoDeprecated,
403) -> Result<(), Status> {
404    control_handle
405        .send_on_open_(Status::OK.into_raw(), Some(node_info))
406        .map_err(|_| Status::PEER_CLOSED)
407}
408
409/// Trait for constructing connections to nodes.
410pub trait ConnectionCreator<T: Node> {
411    /// Creates a new connection to `node` and spawns a new `Task` to run the connection.
412    fn create<'a>(
413        scope: ExecutionScope,
414        node: Arc<T>,
415        protocols: impl ProtocolsExt,
416        object_request: ObjectRequestRef<'a>,
417    ) -> impl Future<Output = Result<(), Status>> + Send + 'a;
418}
419
420/// Synchronously polls `future` with the expectation that it won't return Pending. If the future
421/// does return Pending then this function will spawn a Task to run the future.
422pub(crate) fn run_synchronous_future_or_spawn(
423    scope: ExecutionScope,
424    future: impl Future<Output = ()> + Send + 'static,
425) {
426    let mut task = scope.new_task(future);
427    let mut cx = std::task::Context::from_waker(std::task::Waker::noop());
428
429    match task.poll_unpin(&mut cx) {
430        std::task::Poll::Pending => task.spawn(),
431        std::task::Poll::Ready(()) => {}
432    }
433}
434
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution_scope::yield_to_executor;
    use std::future::ready;

    /// Builds an execution scope appropriate for the enabled feature set.
    fn new_scope() -> crate::execution_scope::ExecutionScope {
        #[cfg(feature = "fdomain")]
        let scope = crate::execution_scope::ExecutionScope::new(flex_local::local_client_empty());
        #[cfg(not(feature = "fdomain"))]
        let scope = crate::execution_scope::ExecutionScope::new();
        scope
    }

    // A future that is immediately ready must complete without leaving work on the scope.
    #[fuchsia::test]
    async fn test_run_synchronous_future_or_spawn_with_sync_future() {
        let scope = new_scope();
        run_synchronous_future_or_spawn(scope.clone(), ready(()));
        scope.wait().await;
    }

    // A future that yields on first poll must still run to completion on the scope.
    #[fuchsia::test]
    async fn test_run_synchronous_future_or_spawn_with_async_future() {
        let scope = new_scope();
        run_synchronous_future_or_spawn(scope.clone(), yield_to_executor());
        scope.wait().await;
    }
}