Skip to main content

vfs/
object_request.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::ProtocolsExt;
6use crate::execution_scope::ExecutionScope;
7use crate::node::{self, Node};
8use fidl::endpoints::{ControlHandle, ProtocolMarker, RequestStream, ServerEnd};
9use fidl::epitaph::ChannelEpitaphExt;
10use fidl_fuchsia_io as fio;
11use fuchsia_async as fasync;
12use futures::FutureExt;
13use std::future::Future;
14use std::sync::Arc;
15use zx_status::Status;
16
17/// Wraps the channel provided in the open methods and provide convenience methods for sending
18/// appropriate responses.  It also records actions that should be taken upon successful connection
19/// such as truncating file objects.
20#[derive(Debug)]
21pub struct ObjectRequest {
22    // The channel.
23    object_request: fidl::Channel,
24
25    // What should be sent first.
26    what_to_send: ObjectRequestSend,
27
28    // Attributes required in the open method.
29    attributes: fio::NodeAttributesQuery,
30
31    // Creation attributes.
32    create_attributes: Option<Box<fio::MutableNodeAttributes>>,
33
34    /// Truncate the object before use.
35    pub truncate: bool,
36}
37
38impl ObjectRequest {
39    pub(crate) fn new_deprecated(
40        object_request: fidl::Channel,
41        what_to_send: ObjectRequestSend,
42        attributes: fio::NodeAttributesQuery,
43        create_attributes: Option<&fio::MutableNodeAttributes>,
44        truncate: bool,
45    ) -> Self {
46        assert!(!object_request.as_handle_ref().is_invalid());
47        let create_attributes = create_attributes.map(|a| Box::new(a.clone()));
48        Self { object_request, what_to_send, attributes, create_attributes, truncate }
49    }
50
51    /// Create a new [`ObjectRequest`] from a set of [`fio::Flags`] and [`fio::Options`]`.
52    pub fn new(flags: fio::Flags, options: &fio::Options, object_request: fidl::Channel) -> Self {
53        Self::new_deprecated(
54            object_request,
55            if flags.contains(fio::Flags::FLAG_SEND_REPRESENTATION) {
56                ObjectRequestSend::OnRepresentation
57            } else {
58                ObjectRequestSend::Nothing
59            },
60            options.attributes.unwrap_or(fio::NodeAttributesQuery::empty()),
61            options.create_attributes.as_ref(),
62            flags.is_truncate(),
63        )
64    }
65
66    pub(crate) fn what_to_send(&self) -> ObjectRequestSend {
67        self.what_to_send
68    }
69
70    pub fn attributes(&self) -> fio::NodeAttributesQuery {
71        self.attributes
72    }
73
74    pub fn create_attributes(&self) -> Option<&fio::MutableNodeAttributes> {
75        self.create_attributes.as_deref()
76    }
77
78    pub fn options(&self) -> fio::Options {
79        fio::Options {
80            attributes: (!self.attributes.is_empty()).then_some(self.attributes),
81            create_attributes: self
82                .create_attributes
83                .as_ref()
84                .map(|a| fio::MutableNodeAttributes::clone(&a)),
85            ..Default::default()
86        }
87    }
88
89    /// Returns the request stream after sending requested information.
90    pub async fn into_request_stream<T: Representation>(
91        self,
92        connection: &T,
93    ) -> Result<<T::Protocol as ProtocolMarker>::RequestStream, Status> {
94        let stream = fio::NodeRequestStream::from_channel(fasync::Channel::from_channel(
95            self.object_request,
96        ));
97        match self.what_to_send {
98            #[cfg(any(
99                fuchsia_api_level_at_least = "PLATFORM",
100                not(fuchsia_api_level_at_least = "NEXT")
101            ))]
102            ObjectRequestSend::OnOpen => {
103                let control_handle = stream.control_handle();
104                let node_info = connection.node_info().await.map_err(|s| {
105                    control_handle.shutdown_with_epitaph(s);
106                    s
107                })?;
108                send_on_open(&stream.control_handle(), node_info)?;
109            }
110            ObjectRequestSend::OnRepresentation => {
111                let control_handle = stream.control_handle();
112                let representation =
113                    connection.get_representation(self.attributes).await.map_err(|s| {
114                        control_handle.shutdown_with_epitaph(s);
115                        s
116                    })?;
117                control_handle
118                    .send_on_representation(representation)
119                    .map_err(|_| Status::PEER_CLOSED)?;
120            }
121            ObjectRequestSend::Nothing => {}
122        }
123        Ok(stream.cast_stream())
124    }
125
126    /// Converts to ServerEnd<T>.
127    pub fn into_server_end<T>(self) -> ServerEnd<T> {
128        ServerEnd::new(self.object_request)
129    }
130
131    /// Extracts the channel (without sending on_open).
132    pub fn into_channel(self) -> fidl::Channel {
133        self.object_request
134    }
135
136    /// Extracts the channel after sending on_open.
137    #[cfg(any(fuchsia_api_level_at_least = "PLATFORM", not(fuchsia_api_level_at_least = "NEXT")))]
138    pub fn into_channel_after_sending_on_open(
139        self,
140        node_info: fio::NodeInfoDeprecated,
141    ) -> Result<fidl::Channel, Status> {
142        let stream = fio::NodeRequestStream::from_channel(fasync::Channel::from_channel(
143            self.object_request,
144        ));
145        send_on_open(&stream.control_handle(), node_info)?;
146        let (inner, _is_terminated) = stream.into_inner();
147        // It's safe to unwrap here because inner is clearly the only Arc reference left.
148        Ok(Arc::try_unwrap(inner).unwrap().into_channel().into())
149    }
150
151    /// Terminates the object request with the given status.
152    pub fn shutdown(self, status: Status) {
153        if self.object_request.as_handle_ref().is_invalid() {
154            return;
155        }
156        #[cfg(any(
157            fuchsia_api_level_at_least = "PLATFORM",
158            not(fuchsia_api_level_at_least = "NEXT")
159        ))]
160        if let ObjectRequestSend::OnOpen = self.what_to_send {
161            let (_, control_handle) = ServerEnd::<fio::NodeMarker>::new(self.object_request)
162                .into_stream_and_control_handle();
163            let _ = control_handle.send_on_open_(status.into_raw(), None);
164            control_handle.shutdown_with_epitaph(status);
165        } else {
166            let _ = self.object_request.close_with_epitaph(status);
167        }
168        #[cfg(not(any(
169            fuchsia_api_level_at_least = "PLATFORM",
170            not(fuchsia_api_level_at_least = "NEXT")
171        )))]
172        {
173            let _ = self.object_request.close_with_epitaph(status);
174        }
175    }
176
177    /// Calls `f` and sends an error on the object request channel upon failure.
178    pub fn handle<T>(
179        mut self,
180        f: impl FnOnce(ObjectRequestRef<'_>) -> Result<T, Status>,
181    ) -> Option<T> {
182        match f(&mut self) {
183            Ok(o) => Some(o),
184            Err(s) => {
185                self.shutdown(s);
186                None
187            }
188        }
189    }
190
191    /// Calls `f` and sends an error on the object request channel upon failure.
192    pub async fn handle_async(
193        mut self,
194        f: impl AsyncFnOnce(&mut ObjectRequest) -> Result<(), Status>,
195    ) {
196        if let Err(s) = f(&mut self).await {
197            self.shutdown(s);
198        }
199    }
200
201    /// Waits until the request has a request waiting in its channel.  Returns immediately if this
202    /// request requires sending an initial event such as OnOpen or OnRepresentation.  Returns
203    /// `true` if the channel is readable (rather than just closed).
204    pub async fn wait_till_ready(&self) -> bool {
205        if !matches!(self.what_to_send, ObjectRequestSend::Nothing) {
206            return true;
207        }
208        let signals = fasync::OnSignalsRef::new(
209            self.object_request.as_handle_ref(),
210            fidl::Signals::OBJECT_READABLE | fidl::Signals::CHANNEL_PEER_CLOSED,
211        )
212        .await
213        .unwrap();
214        signals.contains(fidl::Signals::OBJECT_READABLE)
215    }
216
217    /// Take the ObjectRequest.  The caller is responsible for sending errors.
218    pub fn take(&mut self) -> ObjectRequest {
219        assert!(!self.object_request.as_handle_ref().is_invalid());
220        Self {
221            object_request: std::mem::replace(
222                &mut self.object_request,
223                fidl::NullableHandle::invalid().into(),
224            ),
225            what_to_send: self.what_to_send,
226            attributes: self.attributes,
227            create_attributes: self.create_attributes.take(),
228            truncate: self.truncate,
229        }
230    }
231
232    /// Constructs a new connection to `node` and spawns an async `Task` that will handle requests
233    /// on the connection. `f` is a callback that constructs the connection but it will not be
234    /// called if the connection is supposed to be a node connection. This should be called from
235    /// within a [`ObjectRequest::handle_async`] callback.
236    pub async fn create_connection<C, N>(
237        &mut self,
238        scope: ExecutionScope,
239        node: Arc<N>,
240        protocols: impl ProtocolsExt,
241    ) -> Result<(), Status>
242    where
243        C: ConnectionCreator<N>,
244        N: Node,
245    {
246        assert!(!self.object_request.as_handle_ref().is_invalid());
247        if protocols.is_node() {
248            node::Connection::create(scope, node, protocols, self).await
249        } else {
250            C::create(scope, node, protocols, self).await
251        }
252    }
253
254    /// Constructs a new connection to `node` and spawns an async `Task` that will handle requests
255    /// on the connection. `f` is a callback that constructs the connection but it will not be
256    /// called if the connection is supposed to be a node connection. This should be called from
257    /// within a [`ObjectRequest::handle`] callback.
258    ///
259    /// This method synchronously calls async code and may require spawning an extra Task if the
260    /// async code does something asynchronous. `create_connection` should be preferred if the
261    /// caller is already in an async context.
262    pub fn create_connection_sync<C, N>(
263        self,
264        scope: ExecutionScope,
265        node: Arc<N>,
266        protocols: impl ProtocolsExt,
267    ) where
268        C: ConnectionCreator<N>,
269        N: Node,
270    {
271        assert!(!self.object_request.as_handle_ref().is_invalid());
272        if protocols.is_node() {
273            self.create_connection_sync_or_spawn::<node::Connection<N>, N>(scope, node, protocols);
274        } else {
275            self.create_connection_sync_or_spawn::<C, N>(scope, node, protocols);
276        }
277    }
278
279    fn create_connection_sync_or_spawn<C, N>(
280        self,
281        scope: ExecutionScope,
282        node: Arc<N>,
283        protocols: impl ProtocolsExt,
284    ) where
285        C: ConnectionCreator<N>,
286        N: Node,
287    {
288        let scope2 = scope.clone();
289        let fut = self.handle_async(async |object_request| {
290            C::create(scope2, node, protocols, object_request).await
291        });
292        run_synchronous_future_or_spawn(scope, fut);
293    }
294}
295
296pub type ObjectRequestRef<'a> = &'a mut ObjectRequest;
297
/// The initial event, if any, that has to be sent on a request's channel.
#[derive(Clone, Copy, Debug, PartialEq)]
// NOTE(review): presumably needed because not every variant is constructed at every API
// level — confirm.
#[allow(dead_code)]
pub(crate) enum ObjectRequestSend {
    /// Send an `OnOpen` event.  Only available at API levels that satisfy the `cfg` below.
    #[cfg(any(fuchsia_api_level_at_least = "PLATFORM", not(fuchsia_api_level_at_least = "NEXT")))]
    OnOpen,
    /// Send an `OnRepresentation` event.
    OnRepresentation,
    /// Send no initial event.
    Nothing,
}
306
307/// Trait to get either fio::Representation or fio::NodeInfoDeprecated.  Connection types
308/// should implement this.
309pub trait Representation {
310    /// The protocol used for the connection.
311    type Protocol: ProtocolMarker;
312
313    /// Returns io2's Representation for the object.
314    fn get_representation(
315        &self,
316        requested_attributes: fio::NodeAttributesQuery,
317    ) -> impl Future<Output = Result<fio::Representation, Status>> + Send;
318
319    /// Returns io1's NodeInfoDeprecated.
320    #[cfg(any(fuchsia_api_level_at_least = "PLATFORM", not(fuchsia_api_level_at_least = "NEXT")))]
321    fn node_info(&self) -> impl Future<Output = Result<fio::NodeInfoDeprecated, Status>> + Send;
322}
323
324/// Convenience trait for converting [`fio::Flags`] and [`fio::OpenFlags`] into ObjectRequest.
325///
326/// If [`fio::Options`] need to be specified, use [`ObjectRequest::new`].
327pub trait ToObjectRequest: ProtocolsExt {
328    fn to_object_request(&self, object_request: impl Into<fidl::NullableHandle>) -> ObjectRequest;
329}
330
331impl ToObjectRequest for fio::OpenFlags {
332    fn to_object_request(&self, object_request: impl Into<fidl::NullableHandle>) -> ObjectRequest {
333        ObjectRequest::new_deprecated(
334            object_request.into().into(),
335            #[cfg(any(
336                fuchsia_api_level_at_least = "PLATFORM",
337                not(fuchsia_api_level_at_least = "NEXT")
338            ))]
339            if self.contains(fio::OpenFlags::DESCRIBE) {
340                ObjectRequestSend::OnOpen
341            } else {
342                ObjectRequestSend::Nothing
343            },
344            #[cfg(not(any(
345                fuchsia_api_level_at_least = "PLATFORM",
346                not(fuchsia_api_level_at_least = "NEXT")
347            )))]
348            ObjectRequestSend::Nothing,
349            fio::NodeAttributesQuery::empty(),
350            None,
351            self.is_truncate(),
352        )
353    }
354}
355
356impl ToObjectRequest for fio::Flags {
357    fn to_object_request(&self, object_request: impl Into<fidl::NullableHandle>) -> ObjectRequest {
358        ObjectRequest::new(*self, &Default::default(), object_request.into().into())
359    }
360}
361
362#[cfg(any(fuchsia_api_level_at_least = "PLATFORM", not(fuchsia_api_level_at_least = "NEXT")))]
363fn send_on_open(
364    control_handle: &fio::NodeControlHandle,
365    node_info: fio::NodeInfoDeprecated,
366) -> Result<(), Status> {
367    control_handle
368        .send_on_open_(Status::OK.into_raw(), Some(node_info))
369        .map_err(|_| Status::PEER_CLOSED)
370}
371
372/// Trait for constructing connections to nodes.
373pub trait ConnectionCreator<T: Node> {
374    /// Creates a new connection to `node` and spawns a new `Task` to run the connection.
375    fn create<'a>(
376        scope: ExecutionScope,
377        node: Arc<T>,
378        protocols: impl ProtocolsExt,
379        object_request: ObjectRequestRef<'a>,
380    ) -> impl Future<Output = Result<(), Status>> + Send + 'a;
381}
382
383/// Synchronously polls `future` with the expectation that it won't return Pending. If the future
384/// does return Pending then this function will spawn a Task to run the future.
385pub(crate) fn run_synchronous_future_or_spawn(
386    scope: ExecutionScope,
387    future: impl Future<Output = ()> + Send + 'static,
388) {
389    let mut task = scope.new_task(future);
390    let mut cx = std::task::Context::from_waker(std::task::Waker::noop());
391
392    match task.poll_unpin(&mut cx) {
393        std::task::Poll::Pending => task.spawn(),
394        std::task::Poll::Ready(()) => {}
395    }
396}
397
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution_scope::yield_to_executor;
    use std::future::ready;

    /// Runs a future that is ready on its first poll, then waits on the scope.
    #[fuchsia::test]
    async fn test_run_synchronous_future_or_spawn_with_sync_future() {
        let scope = ExecutionScope::new();
        let already_done = ready(());
        run_synchronous_future_or_spawn(scope.clone(), already_done);
        scope.wait().await;
    }

    /// Runs the `yield_to_executor` future, then waits on the scope.
    #[fuchsia::test]
    async fn test_run_synchronous_future_or_spawn_with_async_future() {
        let scope = ExecutionScope::new();
        let yields_once = yield_to_executor();
        run_synchronous_future_or_spawn(scope.clone(), yields_once);
        scope.wait().await;
    }
}
419}