starnix_core/vfs/buffers/
message_queue.rs1use std::collections::VecDeque;
6
7use super::message_types::{AncillaryData, Message, MessageData};
8use crate::vfs::buffers::{InputBuffer, OutputBuffer};
9use crate::vfs::socket::SocketAddress;
10use starnix_uapi::error;
11use starnix_uapi::errors::Errno;
12use starnix_uapi::vfs::FdEvents;
13
14#[derive(Debug, Default, Clone)]
15pub struct MessageReadInfo {
16 pub bytes_read: usize,
17 pub message_length: usize,
18 pub address: Option<SocketAddress>,
19 pub ancillary_data: Vec<AncillaryData>,
20}
21
22impl MessageReadInfo {
23 pub fn append(&mut self, info: &mut MessageReadInfo) {
25 self.bytes_read += info.bytes_read;
26 self.message_length += info.message_length;
27 self.ancillary_data.append(&mut info.ancillary_data);
28 }
29}
30
31#[derive(Debug)]
33pub struct MessageQueue<D: MessageData = Vec<u8>> {
34 messages: VecDeque<Message<D>>,
38
39 length: usize,
41
42 capacity: usize,
44}
45
46impl<D: MessageData> MessageQueue<D> {
47 pub fn new(capacity: usize) -> Self {
48 MessageQueue { messages: VecDeque::default(), length: 0, capacity }
49 }
50
51 pub fn available_capacity(&self) -> usize {
54 self.capacity - self.length
55 }
56
57 pub fn capacity(&self) -> usize {
60 self.capacity
61 }
62
63 pub fn messages(&self) -> impl Iterator<Item = &Message<D>> {
64 self.messages.iter()
65 }
66
67 pub fn set_capacity(&mut self, requested_capacity: usize) -> Result<(), Errno> {
72 if requested_capacity < self.length {
73 return error!(EBUSY);
74 }
75 self.capacity = requested_capacity;
76 Ok(())
77 }
78
79 pub fn is_empty(&self) -> bool {
81 self.len() == 0
82 }
83
84 pub fn len(&self) -> usize {
86 self.length
87 }
88
89 fn update_address(message: &Message<D>, address: &mut Option<SocketAddress>) -> bool {
90 if message.address.is_some() && *address != message.address {
91 if address.is_some() {
92 return false;
93 }
94 *address = message.address.clone();
95 }
96 true
97 }
98
99 pub fn read_stream(&mut self, data: &mut dyn OutputBuffer) -> Result<MessageReadInfo, Errno> {
110 let mut total_bytes_read = 0;
111 let mut address = None;
112 let mut ancillary_data = vec![];
113
114 while let Some(mut message) = self.read_message() {
115 if !Self::update_address(&message, &mut address) {
116 self.write_front(message);
120 break;
121 }
122
123 let bytes_read = message.data.copy_to_user(data)?;
124 total_bytes_read += bytes_read;
125
126 if let Some(remaining_data) = message.data.split_off(bytes_read) {
127 self.write_front(Message::new(
130 remaining_data,
131 message.address.clone(),
132 message.ancillary_data,
133 ));
134 break;
135 }
136
137 if !message.ancillary_data.is_empty() {
138 ancillary_data = message.ancillary_data;
139 break;
140 }
141 }
142
143 Ok(MessageReadInfo {
144 bytes_read: total_bytes_read,
145 message_length: total_bytes_read,
146 address,
147 ancillary_data,
148 })
149 }
150
151 pub fn peek_stream(&self, data: &mut dyn OutputBuffer) -> Result<MessageReadInfo, Errno> {
164 let mut total_bytes_read = 0;
165 let mut address = None;
166 let mut ancillary_data = vec![];
167
168 for message in self.messages.iter() {
169 if !Self::update_address(message, &mut address) {
170 break;
171 }
172
173 let bytes_read = message.data.copy_to_user(data)?;
174 total_bytes_read += bytes_read;
175
176 if bytes_read < message.len() {
177 break;
178 }
179
180 if !message.ancillary_data.is_empty() {
181 ancillary_data = message.ancillary_data.clone();
182 break;
183 }
184 }
185
186 Ok(MessageReadInfo {
187 bytes_read: total_bytes_read,
188 message_length: total_bytes_read,
189 address,
190 ancillary_data,
191 })
192 }
193
194 pub fn read_datagram(&mut self, data: &mut dyn OutputBuffer) -> Result<MessageReadInfo, Errno> {
195 if let Some(message) = self.read_message() {
196 Ok(MessageReadInfo {
197 bytes_read: message.data.copy_to_user(data)?,
198 message_length: message.len(),
199 address: message.address,
200 ancillary_data: message.ancillary_data,
201 })
202 } else {
203 Ok(MessageReadInfo::default())
204 }
205 }
206
207 pub fn peek_datagram(&mut self, data: &mut dyn OutputBuffer) -> Result<MessageReadInfo, Errno> {
208 if let Some(message) = self.peek_message() {
209 Ok(MessageReadInfo {
210 bytes_read: message.data.copy_to_user(data)?,
211 message_length: message.len(),
212 address: message.address.clone(),
213 ancillary_data: message.ancillary_data.clone(),
214 })
215 } else {
216 Ok(MessageReadInfo::default())
217 }
218 }
219
220 pub fn read_message(&mut self) -> Option<Message<D>> {
222 self.messages.pop_front().map(|message| {
223 self.length -= message.len();
224 message
225 })
226 }
227
228 pub fn peek_queue(&self) -> &VecDeque<Message<D>> {
229 &self.messages
230 }
231
232 fn peek_message(&self) -> Option<&Message<D>> {
234 self.messages.front()
235 }
236
237 pub fn write_stream(
246 &mut self,
247 data: &mut dyn InputBuffer,
248 address: Option<SocketAddress>,
249 ancillary_data: &mut Vec<AncillaryData>,
250 ) -> Result<usize, Errno> {
251 self.write_stream_with_filter(data, address, ancillary_data, Some)
252 }
253
254 pub fn write_stream_with_filter(
265 &mut self,
266 data: &mut dyn InputBuffer,
267 address: Option<SocketAddress>,
268 ancillary_data: &mut Vec<AncillaryData>,
269 filter: impl FnOnce(Message<D>) -> Option<Message<D>>,
270 ) -> Result<usize, Errno> {
271 let actual = std::cmp::min(self.available_capacity(), data.available());
272 if actual == 0 && data.available() > 0 {
273 return error!(EAGAIN);
274 }
275 let data = MessageData::copy_from_user(data, actual)?;
276 let message = Message::new(data, address, std::mem::take(ancillary_data));
277 if let Some(message) = filter(message) {
278 self.write_message(message);
279 }
280 Ok(actual)
281 }
282
283 pub fn write_datagram(
292 &mut self,
293 data: &mut dyn InputBuffer,
294 address: Option<SocketAddress>,
295 ancillary_data: &mut Vec<AncillaryData>,
296 ) -> Result<usize, Errno> {
297 self.write_datagram_with_filter(data, address, ancillary_data, Some)
298 }
299
300 pub fn write_datagram_with_filter(
311 &mut self,
312 data: &mut dyn InputBuffer,
313 address: Option<SocketAddress>,
314 ancillary_data: &mut Vec<AncillaryData>,
315 filter: impl FnOnce(Message<D>) -> Option<Message<D>>,
316 ) -> Result<usize, Errno> {
317 let actual = data.available();
318 if actual > self.capacity() {
319 return error!(EMSGSIZE);
320 }
321 if actual > self.available_capacity() {
322 return error!(EAGAIN);
323 }
324 let data = MessageData::copy_from_user(data, actual)?;
325 let message = Message::new(data, address, std::mem::take(ancillary_data));
326 if let Some(message) = filter(message) {
327 self.write_message(message);
328 }
329 Ok(actual)
330 }
331
332 pub fn write_front(&mut self, message: Message<D>) {
334 self.length += message.len();
335 debug_assert!(self.length <= self.capacity);
336 self.messages.push_front(message);
337 }
338
339 pub fn write_message(&mut self, message: Message<D>) {
341 self.length += message.len();
342 debug_assert!(self.length <= self.capacity);
343 self.messages.push_back(message);
344 }
345
346 pub fn query_events(&self) -> FdEvents {
347 let mut events = FdEvents::empty();
348 if self.available_capacity() > 0 {
349 events |= FdEvents::POLLOUT;
350 }
351 if !self.is_empty() {
352 events |= FdEvents::POLLIN;
353 }
354 events
355 }
356}
357
358#[cfg(test)]
359mod tests {
360 use super::*;
361 use crate::vfs::UnixControlData;
362
363 #[::fuchsia::test]
365 fn test_read_write() {
366 let mut message_queue = MessageQueue::new(usize::MAX);
367 let bytes: Vec<u8> = vec![1, 2, 3];
368 let message: Message = bytes.into();
369 message_queue.write_message(message.clone());
370 assert_eq!(message_queue.len(), 3);
371 assert_eq!(message_queue.read_message(), Some(message));
372 assert!(message_queue.is_empty());
373 }
374
375 #[::fuchsia::test]
377 fn test_control_len() {
378 let mut message_queue = MessageQueue::new(usize::MAX);
379 let bytes: Vec<u8> = vec![1, 2, 3];
380 let ancillary_data =
381 vec![AncillaryData::Unix(UnixControlData::Security(bytes.clone().into()))];
382 let message = Message::new(vec![].into(), None, ancillary_data);
383 message_queue.write_message(message);
384 assert_eq!(message_queue.len(), 0);
385 message_queue.write_message(bytes.clone().into());
386 assert_eq!(message_queue.len(), bytes.len());
387 }
388
389 #[::fuchsia::test]
391 fn test_read_write_multiple() {
392 let mut message_queue = MessageQueue::new(usize::MAX);
393 let first_bytes: Vec<u8> = vec![1, 2, 3];
394 let second_bytes: Vec<u8> = vec![3, 4, 5];
395
396 for message in [first_bytes.clone().into(), second_bytes.clone().into()] {
397 message_queue.write_message(message);
398 }
399
400 assert_eq!(message_queue.len(), first_bytes.len() + second_bytes.len());
401 assert_eq!(message_queue.read_message(), Some(first_bytes.into()));
402 assert_eq!(message_queue.len(), second_bytes.len());
403 assert_eq!(message_queue.read_message(), Some(second_bytes.into()));
404 assert_eq!(message_queue.read_message(), None);
405 }
406}