1use fidl::endpoints::RequestStream as _;
8use fuchsia_hash::Hash;
9use futures::{Future, StreamExt as _, TryStreamExt as _};
10use std::cmp::min;
11use std::collections::HashSet;
12use std::convert::TryInto as _;
13use vfs::attributes;
14use zx::{self as zx, AsHandleRef as _, HandleBased as _, Status};
15use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
16
17pub struct Mock {
21 pub(super) stream: fio::DirectoryRequestStream,
22}
23
24impl Mock {
25 pub async fn expect_open_blob(&mut self, merkle: Hash) -> Blob {
32 match self.stream.next().await {
33 Some(Ok(fio::DirectoryRequest::Open {
34 path,
35 flags,
36 options: _,
37 object,
38 control_handle: _,
39 })) => {
40 assert_eq!(path, merkle.to_string());
41 assert!(flags.contains(fio::PERM_READABLE));
42 assert!(
43 !flags.intersects(fio::Flags::PERM_WRITE_BYTES | fio::Flags::FLAG_MAYBE_CREATE)
44 );
45
46 let stream =
47 fio::NodeRequestStream::from_channel(fasync::Channel::from_channel(object))
48 .cast_stream();
49 Blob { stream }
50 }
51 other => panic!("unexpected request: {other:?}"),
52 }
53 }
54
55 pub async fn expect_create_blob(&mut self, merkle: Hash) -> Blob {
62 match self.stream.next().await {
63 Some(Ok(fio::DirectoryRequest::Open {
64 path,
65 flags,
66 options: _,
67 object,
68 control_handle: _,
69 })) => {
70 assert!(flags.contains(fio::PERM_WRITABLE | fio::Flags::FLAG_MAYBE_CREATE));
71 assert_eq!(path, delivery_blob::delivery_blob_path(merkle));
72 let stream =
73 fio::NodeRequestStream::from_channel(fasync::Channel::from_channel(object))
74 .cast_stream();
75 Blob { stream }
76 }
77 other => panic!("unexpected request: {other:?}"),
78 }
79 }
80
81 async fn handle_rewind(&mut self) {
82 match self.stream.next().await {
83 Some(Ok(fio::DirectoryRequest::Rewind { responder })) => {
84 responder.send(Status::OK.into_raw()).unwrap();
85 }
86 other => panic!("unexpected request: {other:?}"),
87 }
88 }
89
90 pub async fn expect_readdir(&mut self, entries: impl Iterator<Item = Hash>) {
97 self.handle_rewind().await;
99
100 const NAME_LEN: usize = 64;
101 #[repr(C, packed)]
102 struct Dirent {
103 ino: u64,
104 size: u8,
105 kind: u8,
106 name: [u8; NAME_LEN],
107 }
108
109 impl Dirent {
110 fn as_bytes(&self) -> &'_ [u8] {
111 let start = self as *const Self as *const u8;
112 unsafe { std::slice::from_raw_parts(start, std::mem::size_of::<Self>()) }
115 }
116 }
117
118 let mut entries_iter = entries.map(|hash| Dirent {
119 ino: fio::INO_UNKNOWN,
120 size: NAME_LEN as u8,
121 kind: fio::DirentType::File.into_primitive(),
122 name: hash.to_string().as_bytes().try_into().unwrap(),
123 });
124
125 loop {
126 match self.stream.try_next().await.unwrap() {
127 Some(fio::DirectoryRequest::ReadDirents { max_bytes, responder }) => {
128 let max_bytes = max_bytes as usize;
129 assert!(max_bytes >= std::mem::size_of::<Dirent>());
130
131 let mut buf = vec![];
132 while buf.len() + std::mem::size_of::<Dirent>() <= max_bytes {
133 match entries_iter.next() {
134 Some(need) => {
135 buf.extend(need.as_bytes());
136 }
137 None => break,
138 }
139 }
140
141 responder.send(Status::OK.into_raw(), &buf).unwrap();
142
143 if buf.is_empty() {
145 break;
146 }
147 }
148 Some(other) => panic!("unexpected request: {other:?}"),
149 None => panic!("unexpected stream termination"),
150 }
151 }
152 }
153
154 pub async fn expect_readable_missing_checks(&mut self, readable: &[Hash], missing: &[Hash]) {
162 let mut readable = readable.iter().copied().collect::<HashSet<_>>();
163 let mut missing = missing.iter().copied().collect::<HashSet<_>>();
164
165 while !(readable.is_empty() && missing.is_empty()) {
166 match self.stream.next().await {
167 Some(Ok(fio::DirectoryRequest::Open {
168 path,
169 flags,
170 options: _,
171 object,
172 control_handle: _,
173 })) => {
174 assert!(flags.contains(fio::PERM_READABLE));
175 assert!(
176 !flags.intersects(
177 fio::Flags::PERM_WRITE_BYTES | fio::Flags::FLAG_MAYBE_CREATE
178 )
179 );
180 let path: Hash = path.parse().unwrap();
181
182 let stream =
183 fio::NodeRequestStream::from_channel(fasync::Channel::from_channel(object))
184 .cast_stream();
185 let blob = Blob { stream };
186 if readable.remove(&path) {
187 blob.succeed_open_with_blob_readable().await;
188 } else if missing.remove(&path) {
189 blob.fail_open_with_not_found();
190 } else {
191 panic!("Unexpected blob existance check for {path}");
192 }
193 }
194 other => panic!("unexpected request: {other:?}"),
195 }
196 }
197 }
198
199 pub async fn expect_filter_to_missing_blobs_with_readable_missing_ids(
208 &mut self,
209 readable: &[Hash],
210 missing: &[Hash],
211 ) {
212 self.expect_readable_missing_checks(readable, missing).await;
213 }
214
215 pub async fn expect_done(mut self) {
221 match self.stream.next().await {
222 None => {}
223 Some(request) => panic!("unexpected request: {request:?}"),
224 }
225 }
226}
227
228pub struct Blob {
232 stream: fio::FileRequestStream,
233}
234
235impl Blob {
236 fn send_on_open_with_file_signals(&mut self, status: Status, signals: zx::Signals) {
237 let event = fidl::Event::create();
238 event.signal_handle(zx::Signals::NONE, signals).unwrap();
239
240 let info =
241 fio::NodeInfoDeprecated::File(fio::FileObject { event: Some(event), stream: None });
242 let () = self.stream.control_handle().send_on_open_(status.into_raw(), Some(info)).unwrap();
243 }
244
245 fn send_on_open(&mut self, status: Status) {
246 self.send_on_open_with_file_signals(status, zx::Signals::NONE);
247 }
248
249 fn send_on_open_with_readable(&mut self, status: Status) {
250 self.send_on_open_with_file_signals(status, zx::Signals::USER_0);
252 }
253
254 fn fail_open_with_error(mut self, status: Status) {
255 assert_ne!(status, Status::OK);
256 self.send_on_open(status);
257 }
258
259 pub fn fail_open_with_already_exists(self) {
265 self.fail_open_with_error(Status::ACCESS_DENIED);
266 }
267
268 pub fn fail_open_with_not_found(self) {
274 self.fail_open_with_error(Status::NOT_FOUND);
275 }
276
277 pub fn fail_open_with_io_error(self) {
283 self.fail_open_with_error(Status::IO);
284 }
285
286 pub async fn fail_open_with_not_readable(mut self) {
294 self.send_on_open(Status::OK);
295 self.expect_done().await;
296 }
297
298 pub async fn succeed_open_with_blob_readable(mut self) {
305 self.send_on_open_with_readable(Status::OK);
306 self.expect_done().await;
307 }
308
309 pub async fn expect_close(mut self) {
316 self.send_on_open_with_readable(Status::OK);
317
318 match self.stream.next().await {
319 None => {}
320 Some(Ok(fio::FileRequest::Close { responder })) => {
321 let _ = responder.send(Ok(()));
322 self.expect_done().await;
323 }
324 Some(other) => panic!("unexpected request: {other:?}"),
325 }
326 }
327
328 pub async fn expect_done(mut self) {
334 match self.stream.next().await {
335 None => {}
336 Some(request) => panic!("unexpected request: {request:?}"),
337 }
338 }
339
340 async fn handle_read(&mut self, data: &[u8]) -> usize {
341 match self.stream.next().await {
342 Some(Ok(fio::FileRequest::Read { count, responder })) => {
343 let count = min(count.try_into().unwrap(), data.len());
344 responder.send(Ok(&data[..count])).unwrap();
345 count
346 }
347 other => panic!("unexpected request: {other:?}"),
348 }
349 }
350
351 pub async fn expect_read(mut self, blob: &[u8]) {
357 self.send_on_open_with_readable(Status::OK);
358
359 let mut rest = blob;
360 while !rest.is_empty() {
361 let count = self.handle_read(rest).await;
362 rest = &rest[count..];
363 }
364
365 self.handle_read(rest).await;
367
368 match self.stream.next().await {
369 None => {}
370 Some(Ok(fio::FileRequest::Close { responder })) => {
371 let _ = responder.send(Ok(()));
372 }
373 Some(other) => panic!("unexpected request: {other:?}"),
374 }
375 }
376
377 pub async fn serve_contents(mut self, data: &[u8]) {
384 self.send_on_open_with_readable(Status::OK);
385
386 let mut pos: usize = 0;
387
388 loop {
389 match self.stream.next().await {
390 Some(Ok(fio::FileRequest::Read { count, responder })) => {
391 let avail = data.len() - pos;
392 let count = min(count.try_into().unwrap(), avail);
393 responder.send(Ok(&data[pos..pos + count])).unwrap();
394 pos += count;
395 }
396 Some(Ok(fio::FileRequest::ReadAt { count, offset, responder })) => {
397 let pos: usize = offset.try_into().unwrap();
398 let avail = data.len() - pos;
399 let count = min(count.try_into().unwrap(), avail);
400 responder.send(Ok(&data[pos..pos + count])).unwrap();
401 }
402 Some(Ok(fio::FileRequest::GetAttributes { query, responder })) => {
403 let attrs = attributes!(
404 query,
405 Mutable { creation_time: 0, modification_time: 0, mode: 0 },
406 Immutable {
407 protocols: fio::NodeProtocolKinds::FILE,
408 content_size: data.len() as u64,
409 storage_size: 0,
410 link_count: 0,
411 id: 0,
412 }
413 );
414 responder
415 .send(Ok((&attrs.mutable_attributes, &attrs.immutable_attributes)))
416 .unwrap();
417 }
418 Some(Ok(fio::FileRequest::Close { responder })) => {
419 let _ = responder.send(Ok(()));
420 return;
421 }
422 Some(Ok(fio::FileRequest::GetBackingMemory { flags, responder })) => {
423 assert!(flags.contains(fio::VmoFlags::READ));
424 assert!(!flags.contains(fio::VmoFlags::WRITE));
425 assert!(!flags.contains(fio::VmoFlags::EXECUTE));
426 let vmo = zx::Vmo::create(data.len() as u64).unwrap();
427 vmo.write(data, 0).unwrap();
428 let vmo = vmo
429 .replace_handle(
430 zx::Rights::READ
431 | zx::Rights::BASIC
432 | zx::Rights::MAP
433 | zx::Rights::GET_PROPERTY,
434 )
435 .unwrap();
436 responder.send(Ok(vmo)).unwrap();
437 }
438 None => {
439 return;
440 }
441 other => panic!("unexpected request: {other:?}"),
442 }
443 }
444 }
445
446 async fn handle_truncate(&mut self, status: Status) -> u64 {
447 match self.stream.next().await {
448 Some(Ok(fio::FileRequest::Resize { length, responder })) => {
449 responder
450 .send(if status == Status::OK { Ok(()) } else { Err(status.into_raw()) })
451 .unwrap();
452
453 length
454 }
455 other => panic!("unexpected request: {other:?}"),
456 }
457 }
458
459 async fn expect_truncate(&mut self) -> u64 {
460 self.handle_truncate(Status::OK).await
461 }
462
463 async fn handle_write(&mut self, status: Status) -> Vec<u8> {
464 match self.stream.next().await {
465 Some(Ok(fio::FileRequest::Write { data, responder })) => {
466 responder
467 .send(if status == Status::OK {
468 Ok(data.len() as u64)
469 } else {
470 Err(status.into_raw())
471 })
472 .unwrap();
473
474 data
475 }
476 other => panic!("unexpected request: {other:?}"),
477 }
478 }
479
480 async fn fail_write_with_status(mut self, status: Status) {
481 self.send_on_open(Status::OK);
482
483 let length = self.expect_truncate().await;
484 let expected_write_calls = length.div_ceil(fio::MAX_BUF);
486 for _ in 0..(expected_write_calls - 1) {
487 self.handle_write(Status::OK).await;
488 }
489 self.handle_write(status).await;
490 }
491
492 pub async fn fail_write_with_corrupt(self) {
499 self.fail_write_with_status(Status::IO_DATA_INTEGRITY).await
500 }
501
502 pub fn expect_payload(mut self, expected: &[u8]) -> impl Future<Output = ()> + '_ {
509 self.send_on_open(Status::OK);
510
511 async move {
512 assert_eq!(self.expect_truncate().await, expected.len() as u64);
513
514 let mut rest = expected;
515 while !rest.is_empty() {
516 let expected_chunk = if rest.len() > fio::MAX_BUF as usize {
517 &rest[..fio::MAX_BUF as usize]
518 } else {
519 rest
520 };
521 assert_eq!(self.handle_write(Status::OK).await, expected_chunk);
522 rest = &rest[expected_chunk.len()..];
523 }
524
525 match self.stream.next().await {
526 Some(Ok(fio::FileRequest::Close { responder })) => {
527 responder.send(Ok(())).unwrap();
528 }
529 other => panic!("unexpected request: {other:?}"),
530 }
531
532 self.expect_done().await;
533 }
534 }
535}