blobfs/
mock.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Mock implementation of blobfs for blobfs::Client.
6
7use fidl::endpoints::RequestStream as _;
8use fuchsia_hash::Hash;
9use futures::{Future, StreamExt as _, TryStreamExt as _};
10use std::cmp::min;
11use std::collections::HashSet;
12use std::convert::TryInto as _;
13use vfs::attributes;
14use zx::{self as zx, AsHandleRef as _, HandleBased as _, Status};
15use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
16
17/// A testing server implementation of /blob.
18///
19/// Mock does not handle requests until instructed to do so.
20pub struct Mock {
21    pub(super) stream: fio::DirectoryRequestStream,
22}
23
24impl Mock {
25    /// Consume the next directory request, verifying it is intended to read the blob identified
26    /// by `merkle`.  Returns a `Blob` representing the open blob file.
27    ///
28    /// # Panics
29    ///
30    /// Panics on error or assertion violation (unexpected requests or a mismatched open call)
31    pub async fn expect_open_blob(&mut self, merkle: Hash) -> Blob {
32        match self.stream.next().await {
33            Some(Ok(fio::DirectoryRequest::Open {
34                path,
35                flags,
36                options: _,
37                object,
38                control_handle: _,
39            })) => {
40                assert_eq!(path, merkle.to_string());
41                assert!(flags.contains(fio::PERM_READABLE));
42                assert!(
43                    !flags.intersects(fio::Flags::PERM_WRITE_BYTES | fio::Flags::FLAG_MAYBE_CREATE)
44                );
45
46                let stream =
47                    fio::NodeRequestStream::from_channel(fasync::Channel::from_channel(object))
48                        .cast_stream();
49                Blob { stream }
50            }
51            other => panic!("unexpected request: {other:?}"),
52        }
53    }
54
55    /// Consume the next directory request, verifying it is intended to create the blob identified
56    /// by `merkle`.  Returns a `Blob` representing the open blob file.
57    ///
58    /// # Panics
59    ///
60    /// Panics on error or assertion violation (unexpected requests or a mismatched open call)
61    pub async fn expect_create_blob(&mut self, merkle: Hash) -> Blob {
62        match self.stream.next().await {
63            Some(Ok(fio::DirectoryRequest::Open {
64                path,
65                flags,
66                options: _,
67                object,
68                control_handle: _,
69            })) => {
70                assert!(flags.contains(fio::PERM_WRITABLE | fio::Flags::FLAG_MAYBE_CREATE));
71                assert_eq!(path, delivery_blob::delivery_blob_path(merkle));
72                let stream =
73                    fio::NodeRequestStream::from_channel(fasync::Channel::from_channel(object))
74                        .cast_stream();
75                Blob { stream }
76            }
77            other => panic!("unexpected request: {other:?}"),
78        }
79    }
80
81    async fn handle_rewind(&mut self) {
82        match self.stream.next().await {
83            Some(Ok(fio::DirectoryRequest::Rewind { responder })) => {
84                responder.send(Status::OK.into_raw()).unwrap();
85            }
86            other => panic!("unexpected request: {other:?}"),
87        }
88    }
89
90    /// Consume directory requests, verifying they are requests to read directory entries.  Respond
91    /// with dirents constructed from the given entries.
92    ///
93    /// # Panics
94    ///
95    /// Panics on error or assertion violation (unexpected requests or not all entries are read)
96    pub async fn expect_readdir(&mut self, entries: impl Iterator<Item = Hash>) {
97        // fuchsia_fs::directory starts by resetting the directory channel's readdir position.
98        self.handle_rewind().await;
99
100        const NAME_LEN: usize = 64;
101        #[repr(C, packed)]
102        struct Dirent {
103            ino: u64,
104            size: u8,
105            kind: u8,
106            name: [u8; NAME_LEN],
107        }
108
109        impl Dirent {
110            fn as_bytes(&self) -> &'_ [u8] {
111                let start = self as *const Self as *const u8;
112                // Safe because the FIDL wire format for directory entries is
113                // defined to be the C packed struct representation used here.
114                unsafe { std::slice::from_raw_parts(start, std::mem::size_of::<Self>()) }
115            }
116        }
117
118        let mut entries_iter = entries.map(|hash| Dirent {
119            ino: fio::INO_UNKNOWN,
120            size: NAME_LEN as u8,
121            kind: fio::DirentType::File.into_primitive(),
122            name: hash.to_string().as_bytes().try_into().unwrap(),
123        });
124
125        loop {
126            match self.stream.try_next().await.unwrap() {
127                Some(fio::DirectoryRequest::ReadDirents { max_bytes, responder }) => {
128                    let max_bytes = max_bytes as usize;
129                    assert!(max_bytes >= std::mem::size_of::<Dirent>());
130
131                    let mut buf = vec![];
132                    while buf.len() + std::mem::size_of::<Dirent>() <= max_bytes {
133                        match entries_iter.next() {
134                            Some(need) => {
135                                buf.extend(need.as_bytes());
136                            }
137                            None => break,
138                        }
139                    }
140
141                    responder.send(Status::OK.into_raw(), &buf).unwrap();
142
143                    // Finish after providing an empty chunk.
144                    if buf.is_empty() {
145                        break;
146                    }
147                }
148                Some(other) => panic!("unexpected request: {other:?}"),
149                None => panic!("unexpected stream termination"),
150            }
151        }
152    }
153
154    /// Consume N directory requests, verifying they are intended to determine whether the blobs
155    /// specified `readable` and `missing` are readable or not, responding to the check based on
156    /// which collection the hash is in.
157    ///
158    /// # Panics
159    ///
160    /// Panics on error or assertion violation (unexpected requests, request for unspecified blob)
161    pub async fn expect_readable_missing_checks(&mut self, readable: &[Hash], missing: &[Hash]) {
162        let mut readable = readable.iter().copied().collect::<HashSet<_>>();
163        let mut missing = missing.iter().copied().collect::<HashSet<_>>();
164
165        while !(readable.is_empty() && missing.is_empty()) {
166            match self.stream.next().await {
167                Some(Ok(fio::DirectoryRequest::Open {
168                    path,
169                    flags,
170                    options: _,
171                    object,
172                    control_handle: _,
173                })) => {
174                    assert!(flags.contains(fio::PERM_READABLE));
175                    assert!(
176                        !flags.intersects(
177                            fio::Flags::PERM_WRITE_BYTES | fio::Flags::FLAG_MAYBE_CREATE
178                        )
179                    );
180                    let path: Hash = path.parse().unwrap();
181
182                    let stream =
183                        fio::NodeRequestStream::from_channel(fasync::Channel::from_channel(object))
184                            .cast_stream();
185                    let blob = Blob { stream };
186                    if readable.remove(&path) {
187                        blob.succeed_open_with_blob_readable().await;
188                    } else if missing.remove(&path) {
189                        blob.fail_open_with_not_found();
190                    } else {
191                        panic!("Unexpected blob existance check for {path}");
192                    }
193                }
194                other => panic!("unexpected request: {other:?}"),
195            }
196        }
197    }
198
199    /// Expects and handles a call to [`Client::filter_to_missing_blobs`].
200    /// Verifies the call intends to determine whether the blobs specified in `readable` and
201    /// `missing` are readable or not, responding to the check based on which collection the hash is
202    /// in.
203    ///
204    /// # Panics
205    ///
206    /// Panics on error or assertion violation (unexpected requests, request for unspecified blob)
207    pub async fn expect_filter_to_missing_blobs_with_readable_missing_ids(
208        &mut self,
209        readable: &[Hash],
210        missing: &[Hash],
211    ) {
212        self.expect_readable_missing_checks(readable, missing).await;
213    }
214
215    /// Asserts that the request stream closes without any further requests.
216    ///
217    /// # Panics
218    ///
219    /// Panics on error
220    pub async fn expect_done(mut self) {
221        match self.stream.next().await {
222            None => {}
223            Some(request) => panic!("unexpected request: {request:?}"),
224        }
225    }
226}
227
228/// A testing server implementation of an open /blob/<merkle> file.
229///
230/// Blob does not send the OnOpen event or handle requests until instructed to do so.
231pub struct Blob {
232    stream: fio::FileRequestStream,
233}
234
235impl Blob {
236    fn send_on_open_with_file_signals(&mut self, status: Status, signals: zx::Signals) {
237        let event = fidl::Event::create();
238        event.signal_handle(zx::Signals::NONE, signals).unwrap();
239
240        let info =
241            fio::NodeInfoDeprecated::File(fio::FileObject { event: Some(event), stream: None });
242        let () = self.stream.control_handle().send_on_open_(status.into_raw(), Some(info)).unwrap();
243    }
244
245    fn send_on_open(&mut self, status: Status) {
246        self.send_on_open_with_file_signals(status, zx::Signals::NONE);
247    }
248
249    fn send_on_open_with_readable(&mut self, status: Status) {
250        // Send USER_0 signal to indicate that the blob is available.
251        self.send_on_open_with_file_signals(status, zx::Signals::USER_0);
252    }
253
254    fn fail_open_with_error(mut self, status: Status) {
255        assert_ne!(status, Status::OK);
256        self.send_on_open(status);
257    }
258
259    /// Fail the open request with an error indicating the blob already exists.
260    ///
261    /// # Panics
262    ///
263    /// Panics on error
264    pub fn fail_open_with_already_exists(self) {
265        self.fail_open_with_error(Status::ACCESS_DENIED);
266    }
267
268    /// Fail the open request with an error indicating the blob does not exist.
269    ///
270    /// # Panics
271    ///
272    /// Panics on error
273    pub fn fail_open_with_not_found(self) {
274        self.fail_open_with_error(Status::NOT_FOUND);
275    }
276
277    /// Fail the open request with a generic IO error.
278    ///
279    /// # Panics
280    ///
281    /// Panics on error
282    pub fn fail_open_with_io_error(self) {
283        self.fail_open_with_error(Status::IO);
284    }
285
286    /// Succeeds the open request, but indicate the blob is not yet readable by not asserting the
287    /// USER_0 signal on the file event handle, then asserts that the connection to the blob is
288    /// closed.
289    ///
290    /// # Panics
291    ///
292    /// Panics on error
293    pub async fn fail_open_with_not_readable(mut self) {
294        self.send_on_open(Status::OK);
295        self.expect_done().await;
296    }
297
298    /// Succeeds the open request, indicating that the blob is readable, then asserts that the
299    /// connection to the blob is closed.
300    ///
301    /// # Panics
302    ///
303    /// Panics on error
304    pub async fn succeed_open_with_blob_readable(mut self) {
305        self.send_on_open_with_readable(Status::OK);
306        self.expect_done().await;
307    }
308
309    /// Succeeds the open request, then verifies the blob is immediately closed (possibly after
310    /// handling a single Close request).
311    ///
312    /// # Panics
313    ///
314    /// Panics on error
315    pub async fn expect_close(mut self) {
316        self.send_on_open_with_readable(Status::OK);
317
318        match self.stream.next().await {
319            None => {}
320            Some(Ok(fio::FileRequest::Close { responder })) => {
321                let _ = responder.send(Ok(()));
322                self.expect_done().await;
323            }
324            Some(other) => panic!("unexpected request: {other:?}"),
325        }
326    }
327
328    /// Asserts that the request stream closes without any further requests.
329    ///
330    /// # Panics
331    ///
332    /// Panics on error
333    pub async fn expect_done(mut self) {
334        match self.stream.next().await {
335            None => {}
336            Some(request) => panic!("unexpected request: {request:?}"),
337        }
338    }
339
340    async fn handle_read(&mut self, data: &[u8]) -> usize {
341        match self.stream.next().await {
342            Some(Ok(fio::FileRequest::Read { count, responder })) => {
343                let count = min(count.try_into().unwrap(), data.len());
344                responder.send(Ok(&data[..count])).unwrap();
345                count
346            }
347            other => panic!("unexpected request: {other:?}"),
348        }
349    }
350
351    /// Succeeds the open request, then handle read request with the given blob data.
352    ///
353    /// # Panics
354    ///
355    /// Panics on error
356    pub async fn expect_read(mut self, blob: &[u8]) {
357        self.send_on_open_with_readable(Status::OK);
358
359        let mut rest = blob;
360        while !rest.is_empty() {
361            let count = self.handle_read(rest).await;
362            rest = &rest[count..];
363        }
364
365        // Handle one extra request with empty buffer to signal EOF.
366        self.handle_read(rest).await;
367
368        match self.stream.next().await {
369            None => {}
370            Some(Ok(fio::FileRequest::Close { responder })) => {
371                let _ = responder.send(Ok(()));
372            }
373            Some(other) => panic!("unexpected request: {other:?}"),
374        }
375    }
376
377    /// Succeeds the open request. Then handles get_attr, read, read_at, and possibly a final close
378    /// requests with the given blob data.
379    ///
380    /// # Panics
381    ///
382    /// Panics on error
383    pub async fn serve_contents(mut self, data: &[u8]) {
384        self.send_on_open_with_readable(Status::OK);
385
386        let mut pos: usize = 0;
387
388        loop {
389            match self.stream.next().await {
390                Some(Ok(fio::FileRequest::Read { count, responder })) => {
391                    let avail = data.len() - pos;
392                    let count = min(count.try_into().unwrap(), avail);
393                    responder.send(Ok(&data[pos..pos + count])).unwrap();
394                    pos += count;
395                }
396                Some(Ok(fio::FileRequest::ReadAt { count, offset, responder })) => {
397                    let pos: usize = offset.try_into().unwrap();
398                    let avail = data.len() - pos;
399                    let count = min(count.try_into().unwrap(), avail);
400                    responder.send(Ok(&data[pos..pos + count])).unwrap();
401                }
402                Some(Ok(fio::FileRequest::GetAttributes { query, responder })) => {
403                    let attrs = attributes!(
404                        query,
405                        Mutable { creation_time: 0, modification_time: 0, mode: 0 },
406                        Immutable {
407                            protocols: fio::NodeProtocolKinds::FILE,
408                            content_size: data.len() as u64,
409                            storage_size: 0,
410                            link_count: 0,
411                            id: 0,
412                        }
413                    );
414                    responder
415                        .send(Ok((&attrs.mutable_attributes, &attrs.immutable_attributes)))
416                        .unwrap();
417                }
418                Some(Ok(fio::FileRequest::Close { responder })) => {
419                    let _ = responder.send(Ok(()));
420                    return;
421                }
422                Some(Ok(fio::FileRequest::GetBackingMemory { flags, responder })) => {
423                    assert!(flags.contains(fio::VmoFlags::READ));
424                    assert!(!flags.contains(fio::VmoFlags::WRITE));
425                    assert!(!flags.contains(fio::VmoFlags::EXECUTE));
426                    let vmo = zx::Vmo::create(data.len() as u64).unwrap();
427                    vmo.write(data, 0).unwrap();
428                    let vmo = vmo
429                        .replace_handle(
430                            zx::Rights::READ
431                                | zx::Rights::BASIC
432                                | zx::Rights::MAP
433                                | zx::Rights::GET_PROPERTY,
434                        )
435                        .unwrap();
436                    responder.send(Ok(vmo)).unwrap();
437                }
438                None => {
439                    return;
440                }
441                other => panic!("unexpected request: {other:?}"),
442            }
443        }
444    }
445
446    async fn handle_truncate(&mut self, status: Status) -> u64 {
447        match self.stream.next().await {
448            Some(Ok(fio::FileRequest::Resize { length, responder })) => {
449                responder
450                    .send(if status == Status::OK { Ok(()) } else { Err(status.into_raw()) })
451                    .unwrap();
452
453                length
454            }
455            other => panic!("unexpected request: {other:?}"),
456        }
457    }
458
459    async fn expect_truncate(&mut self) -> u64 {
460        self.handle_truncate(Status::OK).await
461    }
462
463    async fn handle_write(&mut self, status: Status) -> Vec<u8> {
464        match self.stream.next().await {
465            Some(Ok(fio::FileRequest::Write { data, responder })) => {
466                responder
467                    .send(if status == Status::OK {
468                        Ok(data.len() as u64)
469                    } else {
470                        Err(status.into_raw())
471                    })
472                    .unwrap();
473
474                data
475            }
476            other => panic!("unexpected request: {other:?}"),
477        }
478    }
479
480    async fn fail_write_with_status(mut self, status: Status) {
481        self.send_on_open(Status::OK);
482
483        let length = self.expect_truncate().await;
484        // divide rounding up
485        let expected_write_calls = length.div_ceil(fio::MAX_BUF);
486        for _ in 0..(expected_write_calls - 1) {
487            self.handle_write(Status::OK).await;
488        }
489        self.handle_write(status).await;
490    }
491
492    /// Succeeds the open request, consumes the truncate request, the initial write calls, then
493    /// fails the final write indicating the written data was corrupt.
494    ///
495    /// # Panics
496    ///
497    /// Panics on error
498    pub async fn fail_write_with_corrupt(self) {
499        self.fail_write_with_status(Status::IO_DATA_INTEGRITY).await
500    }
501
502    /// Succeeds the open request, then returns a future that, when awaited, verifies the blob is
503    /// truncated, written, and closed with the given `expected` payload.
504    ///
505    /// # Panics
506    ///
507    /// Panics on error
508    pub fn expect_payload(mut self, expected: &[u8]) -> impl Future<Output = ()> + '_ {
509        self.send_on_open(Status::OK);
510
511        async move {
512            assert_eq!(self.expect_truncate().await, expected.len() as u64);
513
514            let mut rest = expected;
515            while !rest.is_empty() {
516                let expected_chunk = if rest.len() > fio::MAX_BUF as usize {
517                    &rest[..fio::MAX_BUF as usize]
518                } else {
519                    rest
520                };
521                assert_eq!(self.handle_write(Status::OK).await, expected_chunk);
522                rest = &rest[expected_chunk.len()..];
523            }
524
525            match self.stream.next().await {
526                Some(Ok(fio::FileRequest::Close { responder })) => {
527                    responder.send(Ok(())).unwrap();
528                }
529                other => panic!("unexpected request: {other:?}"),
530            }
531
532            self.expect_done().await;
533        }
534    }
535}