starnix_core/device/
remote_block_device.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::device::DeviceMode;
6use crate::device::kobject::DeviceMetadata;
7use crate::fs::sysfs::{BlockDeviceInfo, build_block_device_directory};
8use crate::mm::MemoryAccessorExt;
9use crate::task::dynamic_thread_spawner::SpawnRequestBuilder;
10use crate::task::{CurrentTask, KernelThreads, LockedAndTask};
11use crate::vfs::buffers::{InputBuffer, OutputBuffer};
12use crate::vfs::{
13    FileObject, FileOps, FsString, NamespaceNode, SeekTarget, default_ioctl, default_seek,
14};
15use anyhow::{Context as _, Error};
16use block_client::{BlockClient, BufferSlice, MutableBufferSlice, RemoteBlockClient};
17use fidl::endpoints::ClientEnd;
18use fidl_fuchsia_storage_block::BlockMarker;
19use futures::channel::oneshot;
20use futures::executor::block_on;
21use starnix_sync::{FileOpsCore, LockEqualOrBefore, Locked, Mutex, Unlocked};
22use starnix_syscalls::{SUCCESS, SyscallArg, SyscallResult};
23use starnix_uapi::device_type::{BLOCK_EXTENDED_MAJOR, DeviceType};
24use starnix_uapi::errors::Errno;
25use starnix_uapi::open_flags::OpenFlags;
26use starnix_uapi::user_address::UserRef;
27use starnix_uapi::{BLKGETSIZE, BLKGETSIZE64, errno, from_status_like_fdio, off_t};
28use std::collections::btree_map::BTreeMap;
29use std::sync::Arc;
30use std::sync::atomic::{AtomicU32, Ordering};
31
32/// A block device, backed by a partition hosted by Fuchsia.
33pub struct RemoteBlockDevice {
34    block_client: Arc<SyncBlockClient>,
35}
36
37impl RemoteBlockDevice {
38    pub fn read(&self, offset: u64, buf: &mut [u8]) -> Result<(), Error> {
39        self.block_client
40            .read_at(MutableBufferSlice::Memory(buf), offset as u64)
41            .context("read_at failed")
42    }
43
44    fn new<L>(
45        locked: &mut Locked<L>,
46        current_task: &CurrentTask,
47        minor: u32,
48        name: &str,
49        block: ClientEnd<BlockMarker>,
50    ) -> Result<Arc<Self>, Errno>
51    where
52        L: LockEqualOrBefore<FileOpsCore>,
53    {
54        let kernel = current_task.kernel();
55        let registry = &kernel.device_registry;
56        let device_name = FsString::from(name);
57        let virtual_block_class = registry.objects.virtual_block_class();
58        let block_client = SyncBlockClient::new(&kernel.kthreads, block)?;
59        let device = Arc::new(Self { block_client });
60        let device_weak = Arc::<RemoteBlockDevice>::downgrade(&device);
61        registry.add_device(
62            locked,
63            current_task,
64            device_name.as_ref(),
65            DeviceMetadata::new(
66                device_name.clone(),
67                DeviceType::new(BLOCK_EXTENDED_MAJOR, minor),
68                DeviceMode::Block,
69            ),
70            virtual_block_class,
71            |device, dir| build_block_device_directory(device, device_weak, dir),
72        )?;
73        Ok(device)
74    }
75
76    pub fn create_file_ops(&self) -> Box<dyn FileOps> {
77        Box::new(RemoteBlockDeviceFile { block_client: self.block_client.clone() })
78    }
79}
80
81impl BlockDeviceInfo for RemoteBlockDevice {
82    fn size(&self) -> Result<usize, Errno> {
83        (self.block_client.block_count() as usize)
84            .checked_mul(self.block_client.block_size() as usize)
85            .ok_or_else(|| errno!(EINVAL))
86    }
87}
88
89pub struct SyncBlockClient {
90    client: Arc<RemoteBlockClient>,
91    terminate_tx: Option<oneshot::Sender<()>>,
92}
93
94impl SyncBlockClient {
95    fn new(kthreads: &KernelThreads, block: ClientEnd<BlockMarker>) -> Result<Arc<Self>, Errno> {
96        let (init_tx, init_rx) = std::sync::mpsc::channel();
97        let (terminate_tx, terminate_rx) = oneshot::channel();
98
99        // Spawn a thread to run the executor.
100        let closure = move |_: LockedAndTask<'_>| async move {
101            let proxy = block.into_proxy();
102            match RemoteBlockClient::new(proxy).await {
103                Ok(client) => {
104                    let _ = init_tx.send(Ok(Arc::new(Self {
105                        client: Arc::new(client),
106                        terminate_tx: Some(terminate_tx),
107                    })));
108                }
109                Err(e) => {
110                    let _ = init_tx.send(Err(e));
111                    return;
112                }
113            }
114            // RemoteBlockClient::new() spawns a future on this closure's executor to handle the
115            // block fifo. This keeps the executor alive and handling the fifo until the client is
116            // dropped.
117            let _ = terminate_rx.await;
118        };
119
120        let req = SpawnRequestBuilder::new()
121            .with_debug_name("remote-block-client")
122            .with_async_closure(closure)
123            .build();
124        kthreads.spawner().spawn_from_request(req);
125
126        match init_rx.recv() {
127            Ok(Ok(client)) => Ok(client),
128            Ok(Err(status)) => Err(from_status_like_fdio!(status)),
129            Err(_) => Err(errno!(EINVAL)),
130        }
131    }
132
133    fn block_size(&self) -> u32 {
134        self.client.block_size()
135    }
136
137    fn block_count(&self) -> u64 {
138        self.client.block_count()
139    }
140
141    fn read_at(
142        &self,
143        buffer_slice: MutableBufferSlice<'_>,
144        device_offset: u64,
145    ) -> Result<(), zx::Status> {
146        // TODO(https://fxbug.dev/475530917): block_on is uninterruptible. Once there is an
147        // interruptible block_on, switch to that. For now this is okay because we expect the block
148        // to be brief in most cases.
149        block_on(self.client.read_at(buffer_slice, device_offset))
150    }
151
152    fn write_at(
153        &self,
154        buffer_slice: BufferSlice<'_>,
155        device_offset: u64,
156    ) -> Result<(), zx::Status> {
157        // TODO(https://fxbug.dev/475530917): block_on is uninterruptible. Once there is an
158        // interruptible block_on, switch to that. For now this is okay because we expect the block
159        // to be brief in most cases.
160        block_on(self.client.write_at(buffer_slice, device_offset))
161    }
162
163    fn flush(&self) -> Result<(), zx::Status> {
164        // TODO(https://fxbug.dev/475530917): block_on is uninterruptible. Once there is an
165        // interruptible block_on, switch to that. For now this is okay because we expect the block
166        // to be brief in most cases.
167        block_on(self.client.flush())
168    }
169}
170
171impl Drop for SyncBlockClient {
172    fn drop(&mut self) {
173        if let Some(tx) = self.terminate_tx.take() {
174            let _ = tx.send(());
175        }
176    }
177}
178
179struct RemoteBlockDeviceFile {
180    block_client: Arc<SyncBlockClient>,
181}
182
183impl RemoteBlockDeviceFile {
184    fn size(&self) -> Result<usize, Errno> {
185        (self.block_client.block_count() as usize)
186            .checked_mul(self.block_client.block_size() as usize)
187            .ok_or_else(|| errno!(EINVAL))
188    }
189}
190
191impl FileOps for RemoteBlockDeviceFile {
192    fn has_persistent_offsets(&self) -> bool {
193        true
194    }
195
196    fn is_seekable(&self) -> bool {
197        true
198    }
199
200    // Manually implement seek, because default_eof_offset uses st_size (which is not used for block
201    // devices).
202    fn seek(
203        &self,
204        _locked: &mut Locked<FileOpsCore>,
205        _file: &FileObject,
206        _current_task: &CurrentTask,
207        current_offset: off_t,
208        target: SeekTarget,
209    ) -> Result<off_t, Errno> {
210        default_seek(current_offset, target, || self.size()?.try_into().map_err(|_| errno!(EINVAL)))
211    }
212
213    fn read(
214        &self,
215        _locked: &mut Locked<FileOpsCore>,
216        _file: &FileObject,
217        _current_task: &CurrentTask,
218        mut offset: usize,
219        data: &mut dyn OutputBuffer,
220    ) -> Result<usize, Errno> {
221        let size = self.size()?;
222        let block_size = self.block_client.block_size() as usize;
223        const MAX_CHUNK_SIZE: usize = 32 * 1024; // 32KB
224
225        let mut total_read = 0;
226        while data.available() > 0 && offset < size {
227            let chunk_len = std::cmp::min(data.available(), MAX_CHUNK_SIZE);
228            let chunk_len = std::cmp::min(chunk_len, size - offset);
229            if chunk_len == 0 {
230                break;
231            }
232
233            let aligned_offset = offset - offset % block_size;
234            let end_offset = offset + chunk_len;
235            let aligned_end_offset = std::cmp::min(
236                end_offset.checked_next_multiple_of(block_size).ok_or_else(|| errno!(EINVAL))?,
237                size,
238            );
239            let aligned_data_length = aligned_end_offset - aligned_offset;
240
241            let mut read_data = vec![0u8; aligned_data_length];
242            self.block_client
243                .read_at(MutableBufferSlice::Memory(&mut read_data), aligned_offset as u64)
244                .map_err(|status| from_status_like_fdio!(status))?;
245
246            let read_offset = offset - aligned_offset;
247            let read_end = read_offset + chunk_len;
248            let bytes_written = data.write(&read_data[read_offset..read_end])?;
249
250            offset += bytes_written;
251            total_read += bytes_written;
252
253            if bytes_written < chunk_len {
254                break;
255            }
256        }
257        Ok(total_read)
258    }
259
260    fn write(
261        &self,
262        _locked: &mut Locked<FileOpsCore>,
263        _file: &FileObject,
264        _current_task: &CurrentTask,
265        mut offset: usize,
266        data: &mut dyn InputBuffer,
267    ) -> Result<usize, Errno> {
268        let size = self.size()?;
269        let block_size = self.block_client.block_size() as usize;
270        const MAX_CHUNK_SIZE: usize = 32 * 1024; // 32KB
271
272        let mut total_written = 0;
273        while data.available() > 0 && offset < size {
274            let chunk_len = std::cmp::min(data.available(), MAX_CHUNK_SIZE);
275            let chunk_len = std::cmp::min(chunk_len, size - offset);
276            if chunk_len == 0 {
277                break;
278            }
279
280            let aligned_offset = offset - offset % block_size;
281            let end_offset = offset + chunk_len;
282            let aligned_end_offset = std::cmp::min(
283                end_offset.checked_next_multiple_of(block_size).ok_or_else(|| errno!(EINVAL))?,
284                size,
285            );
286            let aligned_data_length = aligned_end_offset - aligned_offset;
287
288            let mut write_buf = vec![0u8; aligned_data_length];
289
290            // Read-Modify-Write: If the write is not block-aligned at the start or end,
291            // we need to read the existing data for the first and/or last block to preserve it.
292            let head_unaligned = offset > aligned_offset;
293            let tail_unaligned = end_offset < aligned_end_offset;
294
295            if head_unaligned {
296                self.block_client
297                    .read_at(
298                        MutableBufferSlice::Memory(&mut write_buf[..block_size]),
299                        aligned_offset as u64,
300                    )
301                    .map_err(|status| from_status_like_fdio!(status))?;
302            }
303
304            if tail_unaligned {
305                let last_block_start = aligned_data_length - block_size;
306                // If we already read the first block and it's the same as the last block, don't
307                // read again.
308                if !head_unaligned || last_block_start > 0 {
309                    self.block_client
310                        .read_at(
311                            MutableBufferSlice::Memory(&mut write_buf[last_block_start..]),
312                            (aligned_offset + last_block_start) as u64,
313                        )
314                        .map_err(|status| from_status_like_fdio!(status))?;
315                }
316            }
317
318            let write_offset = offset - aligned_offset;
319            let write_slice = &mut write_buf[write_offset..write_offset + chunk_len];
320            // SAFETY: We are writing to a buffer of u8, which is always initialized.
321            // We can safely cast &mut [u8] to &mut [MaybeUninit<u8>].
322            let write_slice_uninit = unsafe {
323                std::slice::from_raw_parts_mut(
324                    write_slice.as_mut_ptr() as *mut std::mem::MaybeUninit<u8>,
325                    write_slice.len(),
326                )
327            };
328            let bytes_read = data.read(write_slice_uninit)?;
329
330            self.block_client
331                .write_at(BufferSlice::Memory(&write_buf), aligned_offset as u64)
332                .map_err(|status| from_status_like_fdio!(status))?;
333
334            offset += bytes_read;
335            total_written += bytes_read;
336
337            if bytes_read < chunk_len {
338                break;
339            }
340        }
341        Ok(total_written)
342    }
343
344    fn sync(&self, _file: &FileObject, _current_task: &CurrentTask) -> Result<(), Errno> {
345        self.block_client.flush().map_err(|status| from_status_like_fdio!(status))
346    }
347
348    fn ioctl(
349        &self,
350        locked: &mut Locked<Unlocked>,
351        file: &FileObject,
352        current_task: &CurrentTask,
353        request: u32,
354        arg: SyscallArg,
355    ) -> Result<SyscallResult, Errno> {
356        match request {
357            BLKGETSIZE => {
358                let user_size = UserRef::<u64>::from(arg);
359                let size = self.block_client.block_count();
360                current_task.write_object(user_size, &size)?;
361                Ok(SUCCESS)
362            }
363            BLKGETSIZE64 => {
364                let user_size = UserRef::<u64>::from(arg);
365                let size = self.size()? as u64;
366                current_task.write_object(user_size, &size)?;
367                Ok(SUCCESS)
368            }
369            _ => default_ioctl(file, locked, current_task, request, arg),
370        }
371    }
372}
373
374fn open_remote_block_device(
375    _locked: &mut Locked<FileOpsCore>,
376    current_task: &CurrentTask,
377    id: DeviceType,
378    _node: &NamespaceNode,
379    _flags: OpenFlags,
380) -> Result<Box<dyn FileOps>, Errno> {
381    Ok(current_task.kernel().remote_block_device_registry.open(id.minor())?.create_file_ops())
382}
383
384pub fn remote_block_device_init(locked: &mut Locked<Unlocked>, current_task: &CurrentTask) {
385    current_task
386        .kernel()
387        .device_registry
388        .register_major(
389            locked,
390            "remote-block".into(),
391            DeviceMode::Block,
392            BLOCK_EXTENDED_MAJOR,
393            open_remote_block_device,
394        )
395        .expect("remote block device register failed.");
396}
397
398#[derive(Default)]
399pub struct RemoteBlockDeviceRegistry {
400    devices: Mutex<BTreeMap<u32, Arc<RemoteBlockDevice>>>,
401    next_minor: AtomicU32,
402}
403
404impl RemoteBlockDeviceRegistry {
405    pub fn create_remote_block_device<L>(
406        &self,
407        locked: &mut Locked<L>,
408        current_task: &CurrentTask,
409        name: &str,
410        block: ClientEnd<BlockMarker>,
411    ) -> Result<(), Error>
412    where
413        L: LockEqualOrBefore<FileOpsCore>,
414    {
415        let mut devices = self.devices.lock();
416        let minor = self.next_minor.fetch_add(1, Ordering::Relaxed);
417        let device = RemoteBlockDevice::new(locked, current_task, minor, name, block)?;
418        devices.insert(minor, device);
419        Ok(())
420    }
421
422    pub fn open(&self, minor: u32) -> Result<Arc<RemoteBlockDevice>, Errno> {
423        self.devices.lock().get(&minor).ok_or_else(|| errno!(ENODEV)).cloned()
424    }
425}
426
#[cfg(test)]
mod tests {
    use super::*;
    use crate::testing::{anon_test_file, map_object_anywhere, spawn_kernel_and_run};
    use crate::vfs::{SeekTarget, VecInputBuffer, VecOutputBuffer};
    use starnix_uapi::open_flags::OpenFlags;
    use starnix_uapi::{BLKGETSIZE, BLKGETSIZE64};
    use vmo_backed_block_server::{VmoBackedServer, VmoBackedServerTestingExt};
    use zerocopy::FromBytes as _;

    /// Serves `server` on its own thread, with a dedicated single-threaded executor, and returns
    /// the client end connected to it.
    fn spawn_block_server(server: Arc<VmoBackedServer>) -> ClientEnd<BlockMarker> {
        let (client, server_end) = fidl::endpoints::create_endpoints::<BlockMarker>();
        std::thread::spawn(move || {
            let mut executor = fuchsia_async::LocalExecutor::default();
            executor.run_singlethreaded(async move {
                use fidl::endpoints::RequestStream;
                server.serve(server_end.into_stream().cast_stream()).await.unwrap();
            });
        });
        client
    }

    #[::fuchsia::test]
    async fn test_remote_block_device_registry() {
        spawn_kernel_and_run(async |locked, current_task| {
            let kernel = current_task.kernel();
            remote_block_device_init(locked, &current_task);
            let registry = kernel.remote_block_device_registry.clone();
            let client =
                spawn_block_server(Arc::new(VmoBackedServer::new(2, 512, &[0u8; 1024])));

            registry
                .create_remote_block_device(locked, &current_task, "test", client)
                .expect("create_remote_block_device failed.");

            let device = registry.open(0).expect("open failed.");
            let file =
                anon_test_file(locked, &current_task, device.create_file_ops(), OpenFlags::RDWR);

            let arg_addr = map_object_anywhere(locked, &current_task, &0u64);
            let mut arg = [0u8; 8];

            file.ioctl(locked, &current_task, BLKGETSIZE64, arg_addr.into()).expect("ioctl failed");
            current_task.read_memory_to_slice(arg_addr, &mut arg).unwrap();
            assert_eq!(u64::read_from_bytes(&arg).unwrap(), 1024);

            file.ioctl(locked, &current_task, BLKGETSIZE, arg_addr.into()).expect("ioctl failed");
            current_task.read_memory_to_slice(arg_addr, &mut arg).unwrap();
            assert_eq!(u64::read_from_bytes(&arg).unwrap(), 2);

            // Deliberately read with a non-block-aligned buffer size. These reads come from
            // uncontrolled sources, so the device must handle the alignment itself.
            let mut buf = VecOutputBuffer::new(256);
            file.read(locked, &current_task, &mut buf).expect("read failed.");
            assert_eq!(buf.data(), &[0u8; 256]);

            let mut buf = VecInputBuffer::from(vec![1u8; 256]);
            file.seek(locked, &current_task, SeekTarget::Set(0)).expect("seek failed");
            file.write(locked, &current_task, &mut buf).expect("write failed.");

            let mut buf = VecOutputBuffer::new(256);
            file.seek(locked, &current_task, SeekTarget::Set(0)).expect("seek failed");
            file.read(locked, &current_task, &mut buf).expect("read failed.");
            assert_eq!(buf.data(), &[1u8; 256]);
        })
        .await;
    }

    #[::fuchsia::test]
    async fn test_read_write_past_eof() {
        spawn_kernel_and_run(async |locked, current_task| {
            let kernel = current_task.kernel();
            remote_block_device_init(locked, &current_task);
            let registry = kernel.remote_block_device_registry.clone();
            let client =
                spawn_block_server(Arc::new(VmoBackedServer::new(2, 512, &[0u8; 1024])));

            registry
                .create_remote_block_device(locked, &current_task, "test", client)
                .expect("create_remote_block_device failed.");

            let device = registry.open(0).expect("open failed.");
            let file =
                anon_test_file(locked, &current_task, device.create_file_ops(), OpenFlags::RDWR);

            file.seek(locked, &current_task, SeekTarget::End(0)).expect("seek failed");
            let mut buf = VecOutputBuffer::new(512);
            assert_eq!(file.read(locked, &current_task, &mut buf).expect("read failed."), 0);

            let mut buf = VecInputBuffer::from(vec![1u8; 512]);
            assert_eq!(file.write(locked, &current_task, &mut buf).expect("write failed."), 0);
        })
        .await;
    }

    #[::fuchsia::test]
    async fn test_unaligned_read_write_spanning_blocks() {
        spawn_kernel_and_run(async |locked, current_task| {
            let kernel = current_task.kernel();
            remote_block_device_init(locked, &current_task);
            let registry = kernel.remote_block_device_registry.clone();
            // 3 blocks of 512 bytes = 1536 bytes.
            let client =
                spawn_block_server(Arc::new(VmoBackedServer::new(3, 512, &[0u8; 1536])));

            registry
                .create_remote_block_device(locked, &current_task, "test", client)
                .expect("create_remote_block_device failed.");

            let device = registry.open(0).expect("open failed.");
            let file =
                anon_test_file(locked, &current_task, device.create_file_ops(), OpenFlags::RDWR);

            // Write across block boundaries, from 510 to 1026: 2 bytes at the end of the 1st
            // block, all 512 bytes of the 2nd block, and 2 bytes at the start of the 3rd block.
            let mut buf = VecInputBuffer::from(vec![0xAAu8; 516]);
            file.seek(locked, &current_task, SeekTarget::Set(510)).expect("seek failed");
            assert_eq!(file.write(locked, &current_task, &mut buf).expect("write failed."), 516);

            // Read the data back.
            let mut buf = VecOutputBuffer::new(516);
            file.seek(locked, &current_task, SeekTarget::Set(510)).expect("seek failed");
            assert_eq!(file.read(locked, &current_task, &mut buf).expect("read failed."), 516);
            assert_eq!(buf.data(), &[0xAAu8; 516]);

            // The bytes just outside the written range must still be 0.
            let mut buf = VecOutputBuffer::new(1);
            file.seek(locked, &current_task, SeekTarget::Set(509)).expect("seek failed");
            assert_eq!(file.read(locked, &current_task, &mut buf).expect("read failed."), 1);
            assert_eq!(buf.data(), &[0u8]);

            let mut buf = VecOutputBuffer::new(1);
            file.seek(locked, &current_task, SeekTarget::Set(1026)).expect("seek failed");
            assert_eq!(file.read(locked, &current_task, &mut buf).expect("read failed."), 1);
            assert_eq!(buf.data(), &[0u8]);
        })
        .await;
    }

    #[::fuchsia::test]
    async fn test_exact_eof_boundary() {
        spawn_kernel_and_run(async |locked, current_task| {
            let kernel = current_task.kernel();
            remote_block_device_init(locked, &current_task);
            let registry = kernel.remote_block_device_registry.clone();
            // 2 blocks of 512 bytes = 1024 bytes.
            let client =
                spawn_block_server(Arc::new(VmoBackedServer::new(2, 512, &[0u8; 1024])));

            registry
                .create_remote_block_device(locked, &current_task, "test", client)
                .expect("create_remote_block_device failed.");

            let device = registry.open(0).expect("open failed.");
            let file =
                anon_test_file(locked, &current_task, device.create_file_ops(), OpenFlags::RDWR);

            // Write 4 bytes starting at 1020, ending exactly at EOF (1024).
            let mut buf = VecInputBuffer::from(vec![0xBBu8; 4]);
            file.seek(locked, &current_task, SeekTarget::Set(1020)).expect("seek failed");
            assert_eq!(file.write(locked, &current_task, &mut buf).expect("write failed."), 4);

            // Read them back.
            let mut buf = VecOutputBuffer::new(4);
            file.seek(locked, &current_task, SeekTarget::Set(1020)).expect("seek failed");
            assert_eq!(file.read(locked, &current_task, &mut buf).expect("read failed."), 4);
            assert_eq!(buf.data(), &[0xBBu8; 4]);

            // Request 5 bytes from 1020; only the 4 before EOF are returned.
            let mut buf = VecOutputBuffer::new(5);
            file.seek(locked, &current_task, SeekTarget::Set(1020)).expect("seek failed");
            assert_eq!(file.read(locked, &current_task, &mut buf).expect("read failed."), 4);
            assert_eq!(buf.data()[..4], [0xBBu8; 4]);
        })
        .await;
    }

    #[::fuchsia::test]
    async fn test_rmw_preserves_data() {
        spawn_kernel_and_run(async |locked, current_task| {
            let kernel = current_task.kernel();
            remote_block_device_init(locked, &current_task);
            let registry = kernel.remote_block_device_registry.clone();
            // 3 blocks of 512 bytes = 1536 bytes, initialized with a known pattern (0xFF).
            let client =
                spawn_block_server(Arc::new(VmoBackedServer::new(3, 512, &[0xFFu8; 1536])));

            registry
                .create_remote_block_device(locked, &current_task, "test", client)
                .expect("create_remote_block_device failed.");

            let device = registry.open(0).expect("open failed.");
            let file =
                anon_test_file(locked, &current_task, device.create_file_ops(), OpenFlags::RDWR);

            // Write 100 bytes at offset 600. Block 1 spans bytes 512-1024, so this write lands in
            // the middle of that block and must go through read-modify-write.
            let mut buf = VecInputBuffer::from(vec![0xAAu8; 100]);
            file.seek(locked, &current_task, SeekTarget::Set(600)).expect("seek failed");
            assert_eq!(file.write(locked, &current_task, &mut buf).expect("write failed."), 100);

            // Read back the whole second block to verify it.
            let mut buf = VecOutputBuffer::new(512);
            file.seek(locked, &current_task, SeekTarget::Set(512)).expect("seek failed");
            assert_eq!(file.read(locked, &current_task, &mut buf).expect("read failed."), 512);
            let data = buf.data();

            // Bytes 512..600 (88 bytes) are untouched 0xFF.
            assert_eq!(&data[0..88], &[0xFFu8; 88]);
            // Bytes 600..700 (100 bytes) are the new 0xAA.
            assert_eq!(&data[88..188], &[0xAAu8; 100]);
            // Bytes 700..1024 (324 bytes) are untouched 0xFF.
            assert_eq!(&data[188..512], &[0xFFu8; 324]);
        })
        .await;
    }
}