starnix_core/vfs/buffers/
message_queue.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use std::collections::VecDeque;
6
7use super::message_types::{AncillaryData, Message, MessageData};
8use crate::vfs::buffers::{InputBuffer, OutputBuffer};
9use crate::vfs::socket::SocketAddress;
10use starnix_uapi::error;
11use starnix_uapi::errors::Errno;
12use starnix_uapi::vfs::FdEvents;
13
14#[derive(Debug, Default, Clone)]
15pub struct MessageReadInfo {
16    pub bytes_read: usize,
17    pub message_length: usize,
18    pub address: Option<SocketAddress>,
19    pub ancillary_data: Vec<AncillaryData>,
20}
21
22impl MessageReadInfo {
23    /// Appends `info` to self.
24    pub fn append(&mut self, info: &mut MessageReadInfo) {
25        self.bytes_read += info.bytes_read;
26        self.message_length += info.message_length;
27        self.ancillary_data.append(&mut info.ancillary_data);
28    }
29}
30
31/// A `MessageQueue` stores a FIFO sequence of messages.
32#[derive(Debug)]
33pub struct MessageQueue<D: MessageData = Vec<u8>> {
34    /// The messages stored in the message queue.
35    ///
36    /// Writes are added at the end of the queue. Reads consume from the front of the queue.
37    messages: VecDeque<Message<D>>,
38
39    /// The total number of bytes currently in the message queue.
40    length: usize,
41
42    /// The maximum number of bytes that can be stored inside this pipe.
43    capacity: usize,
44}
45
46impl<D: MessageData> MessageQueue<D> {
47    pub fn new(capacity: usize) -> Self {
48        MessageQueue { messages: VecDeque::default(), length: 0, capacity }
49    }
50
51    /// Returns the number of bytes that can be written to the message queue before the buffer is
52    /// full.
53    pub fn available_capacity(&self) -> usize {
54        self.capacity - self.length
55    }
56
57    /// Returns the total number of bytes this message queue can store, regardless of the current
58    /// amount of data in the buffer.
59    pub fn capacity(&self) -> usize {
60        self.capacity
61    }
62
63    pub fn messages(&self) -> impl Iterator<Item = &Message<D>> {
64        self.messages.iter()
65    }
66
67    /// Sets the capacity of the message queue to the provided number of bytes.
68    ///
69    /// Reurns an error if the requested capacity could not be set (e.g., if the requested capacity
70    /// was less than the current number of bytes stored).
71    pub fn set_capacity(&mut self, requested_capacity: usize) -> Result<(), Errno> {
72        if requested_capacity < self.length {
73            return error!(EBUSY);
74        }
75        self.capacity = requested_capacity;
76        Ok(())
77    }
78
79    /// Returns true if the message queue is empty, or it only contains empty messages.
80    pub fn is_empty(&self) -> bool {
81        self.len() == 0
82    }
83
84    /// Returns the total length of all the messages in the message queue.
85    pub fn len(&self) -> usize {
86        self.length
87    }
88
89    fn update_address(message: &Message<D>, address: &mut Option<SocketAddress>) -> bool {
90        if message.address.is_some() && *address != message.address {
91            if address.is_some() {
92                return false;
93            }
94            *address = message.address.clone();
95        }
96        true
97    }
98
99    /// Reads messages until there are no more messages, a message with ancillary data is
100    /// encountered, or `data` are full.
101    ///
102    /// To read data from the queue without consuming the messages, see `peek_stream`.
103    ///
104    /// # Parameters
105    /// - `data`: The `OutputBuffer` to write the data to.
106    ///
107    /// Returns the number of bytes that were read into the buffer, and any ancillary data that was
108    /// read.
109    pub fn read_stream(&mut self, data: &mut dyn OutputBuffer) -> Result<MessageReadInfo, Errno> {
110        let mut total_bytes_read = 0;
111        let mut address = None;
112        let mut ancillary_data = vec![];
113
114        while let Some(mut message) = self.read_message() {
115            if !Self::update_address(&message, &mut address) {
116                // We've already locked onto an address for this batch of messages, but we
117                // have found a message that doesn't match. We put it back for now and
118                // return the messages we have so far.
119                self.write_front(message);
120                break;
121            }
122
123            let bytes_read = message.data.copy_to_user(data)?;
124            total_bytes_read += bytes_read;
125
126            if let Some(remaining_data) = message.data.split_off(bytes_read) {
127                // If not all the message data could fit move the ancillary data to the split off
128                // message, so that the ancillary data is returned with the "last" message.
129                self.write_front(Message::new(
130                    remaining_data,
131                    message.address.clone(),
132                    message.ancillary_data,
133                ));
134                break;
135            }
136
137            if !message.ancillary_data.is_empty() {
138                ancillary_data = message.ancillary_data;
139                break;
140            }
141        }
142
143        Ok(MessageReadInfo {
144            bytes_read: total_bytes_read,
145            message_length: total_bytes_read,
146            address,
147            ancillary_data,
148        })
149    }
150
151    /// Peeks messages until there are no more messages, a message with ancillary data is
152    /// encountered, or `data` are full.
153    ///
154    /// Unlike `read_stream`, this function does not remove the messages from the queue.
155    ///
156    /// Used to implement MSG_PEEK.
157    ///
158    /// # Parameters
159    /// - `data`: The `OutputBuffer` to write the data to.
160    ///
161    /// Returns the number of bytes that were read into the buffer, and any ancillary data that was
162    /// read.
163    pub fn peek_stream(&self, data: &mut dyn OutputBuffer) -> Result<MessageReadInfo, Errno> {
164        let mut total_bytes_read = 0;
165        let mut address = None;
166        let mut ancillary_data = vec![];
167
168        for message in self.messages.iter() {
169            if !Self::update_address(message, &mut address) {
170                break;
171            }
172
173            let bytes_read = message.data.copy_to_user(data)?;
174            total_bytes_read += bytes_read;
175
176            if bytes_read < message.len() {
177                break;
178            }
179
180            if !message.ancillary_data.is_empty() {
181                ancillary_data = message.ancillary_data.clone();
182                break;
183            }
184        }
185
186        Ok(MessageReadInfo {
187            bytes_read: total_bytes_read,
188            message_length: total_bytes_read,
189            address,
190            ancillary_data,
191        })
192    }
193
194    pub fn read_datagram(&mut self, data: &mut dyn OutputBuffer) -> Result<MessageReadInfo, Errno> {
195        if let Some(message) = self.read_message() {
196            Ok(MessageReadInfo {
197                bytes_read: message.data.copy_to_user(data)?,
198                message_length: message.len(),
199                address: message.address,
200                ancillary_data: message.ancillary_data,
201            })
202        } else {
203            Ok(MessageReadInfo::default())
204        }
205    }
206
207    pub fn peek_datagram(&mut self, data: &mut dyn OutputBuffer) -> Result<MessageReadInfo, Errno> {
208        if let Some(message) = self.peek_message() {
209            Ok(MessageReadInfo {
210                bytes_read: message.data.copy_to_user(data)?,
211                message_length: message.len(),
212                address: message.address.clone(),
213                ancillary_data: message.ancillary_data.clone(),
214            })
215        } else {
216            Ok(MessageReadInfo::default())
217        }
218    }
219
220    /// Reads the next message in the buffer, if such a message exists.
221    pub fn read_message(&mut self) -> Option<Message<D>> {
222        self.messages.pop_front().map(|message| {
223            self.length -= message.len();
224            message
225        })
226    }
227
228    pub fn peek_queue(&self) -> &VecDeque<Message<D>> {
229        &self.messages
230    }
231
232    /// Peeks the next message in the buffer, if such a message exists.
233    fn peek_message(&self) -> Option<&Message<D>> {
234        self.messages.front()
235    }
236
237    /// Writes the the contents of `InputBuffer` into this socket.
238    /// Will return EAGAIN if not enough capacity is available.
239    ///
240    /// # Parameters
241    /// - `task`: The task to read memory from.
242    /// - `data`: The `InputBuffer` to read the data from.
243    ///
244    /// Returns the number of bytes that were written to the socket.
245    pub fn write_stream(
246        &mut self,
247        data: &mut dyn InputBuffer,
248        address: Option<SocketAddress>,
249        ancillary_data: &mut Vec<AncillaryData>,
250    ) -> Result<usize, Errno> {
251        self.write_stream_with_filter(data, address, ancillary_data, Some)
252    }
253
254    /// Writes the the contents of `InputBuffer` into this socket.
255    /// Will return EAGAIN if not enough capacity is available.
256    ///
257    /// # Parameters
258    /// - `task`: The task to read memory from.
259    /// - `data`: The `InputBuffer` to read the data from.
260    /// - `filter`: A filter to run on the message before inserting it into the queue. If it
261    ///             returns None, no message is enqueued.
262    ///
263    /// Returns the number of bytes that were written to the socket.
264    pub fn write_stream_with_filter(
265        &mut self,
266        data: &mut dyn InputBuffer,
267        address: Option<SocketAddress>,
268        ancillary_data: &mut Vec<AncillaryData>,
269        filter: impl FnOnce(Message<D>) -> Option<Message<D>>,
270    ) -> Result<usize, Errno> {
271        let actual = std::cmp::min(self.available_capacity(), data.available());
272        if actual == 0 && data.available() > 0 {
273            return error!(EAGAIN);
274        }
275        let data = MessageData::copy_from_user(data, actual)?;
276        let message = Message::new(data, address, std::mem::take(ancillary_data));
277        if let Some(message) = filter(message) {
278            self.write_message(message);
279        }
280        Ok(actual)
281    }
282
283    /// Writes the the contents of `InputBuffer` into this socket as
284    /// single message. Will return EAGAIN if not enough capacity is available.
285    ///
286    /// # Parameters
287    /// - `task`: The task to read memory from.
288    /// - `data`: The `InputBuffer` to read the data from.
289    ///
290    /// Returns the number of bytes that were written to the socket.
291    pub fn write_datagram(
292        &mut self,
293        data: &mut dyn InputBuffer,
294        address: Option<SocketAddress>,
295        ancillary_data: &mut Vec<AncillaryData>,
296    ) -> Result<usize, Errno> {
297        self.write_datagram_with_filter(data, address, ancillary_data, Some)
298    }
299
300    /// Writes the the contents of `InputBuffer` into this socket as
301    /// single message. Will return EAGAIN if not enough capacity is available.
302    ///
303    /// # Parameters
304    /// - `task`: The task to read memory from.
305    /// - `data`: The `InputBuffer` to read the data from.
306    /// - `filter`: A filter to run on the message before inserting it into the queue. If it
307    ///             returns None, no message is enqueued.
308    ///
309    /// Returns the number of bytes that were written to the socket.
310    pub fn write_datagram_with_filter(
311        &mut self,
312        data: &mut dyn InputBuffer,
313        address: Option<SocketAddress>,
314        ancillary_data: &mut Vec<AncillaryData>,
315        filter: impl FnOnce(Message<D>) -> Option<Message<D>>,
316    ) -> Result<usize, Errno> {
317        let actual = data.available();
318        if actual > self.capacity() {
319            return error!(EMSGSIZE);
320        }
321        if actual > self.available_capacity() {
322            return error!(EAGAIN);
323        }
324        let data = MessageData::copy_from_user(data, actual)?;
325        let message = Message::new(data, address, std::mem::take(ancillary_data));
326        if let Some(message) = filter(message) {
327            self.write_message(message);
328        }
329        Ok(actual)
330    }
331
332    /// Writes a message to the front of the message queue.
333    pub fn write_front(&mut self, message: Message<D>) {
334        self.length += message.len();
335        debug_assert!(self.length <= self.capacity);
336        self.messages.push_front(message);
337    }
338
339    /// Writes a message to the back of the message queue.
340    pub fn write_message(&mut self, message: Message<D>) {
341        self.length += message.len();
342        debug_assert!(self.length <= self.capacity);
343        self.messages.push_back(message);
344    }
345
346    pub fn query_events(&self) -> FdEvents {
347        let mut events = FdEvents::empty();
348        if self.available_capacity() > 0 {
349            events |= FdEvents::POLLOUT;
350        }
351        if !self.is_empty() {
352            events |= FdEvents::POLLIN;
353        }
354        events
355    }
356}
357
#[cfg(test)]
mod tests {
    use super::*;
    use crate::vfs::UnixControlData;

    /// A single message written to the queue is handed back unchanged by the next read.
    #[::fuchsia::test]
    fn test_read_write() {
        let payload: Vec<u8> = vec![1, 2, 3];
        let expected: Message = payload.into();
        let mut queue = MessageQueue::new(usize::MAX);

        queue.write_message(expected.clone());
        assert_eq!(queue.len(), 3);

        assert_eq!(queue.read_message(), Some(expected));
        assert!(queue.is_empty());
    }

    /// Only message data counts towards the queue length; ancillary data does not.
    #[::fuchsia::test]
    fn test_control_len() {
        let mut queue = MessageQueue::new(usize::MAX);
        let payload: Vec<u8> = vec![1, 2, 3];
        let control = vec![AncillaryData::Unix(UnixControlData::Security(payload.clone().into()))];

        // A message with ancillary data but no payload leaves the length at zero.
        queue.write_message(Message::new(vec![].into(), None, control));
        assert_eq!(queue.len(), 0);

        // A message with a payload grows the length by exactly the payload size.
        queue.write_message(payload.clone().into());
        assert_eq!(queue.len(), payload.len());
    }

    /// Messages are read back in the order in which they were written.
    #[::fuchsia::test]
    fn test_read_write_multiple() {
        let mut queue = MessageQueue::new(usize::MAX);
        let first: Vec<u8> = vec![1, 2, 3];
        let second: Vec<u8> = vec![3, 4, 5];

        for message in [first.clone().into(), second.clone().into()] {
            queue.write_message(message);
        }
        assert_eq!(queue.len(), first.len() + second.len());

        assert_eq!(queue.read_message(), Some(first.into()));
        assert_eq!(queue.len(), second.len());

        assert_eq!(queue.read_message(), Some(second.into()));
        assert_eq!(queue.read_message(), None);
    }
}
406}