// fuchsia_async/net/fuchsia/udp.rs

#![deny(missing_docs)]
6
7use crate::net::EventedFd;
8use futures::future::Future;
9use futures::ready;
10use futures::task::{Context, Poll};
11use std::io;
12use std::net::{self, SocketAddr};
13use std::ops::Deref;
14use std::os::fd::{AsRawFd, RawFd};
15use std::pin::Pin;
16
/// Builds the error reported when a `socket2::SockAddr` cannot be converted
/// into a `std::net::SocketAddr`, i.e. when it is neither IPv4 nor IPv6.
///
/// The returned error has kind [`io::ErrorKind::Other`].
fn new_socket_address_conversion_error() -> std::io::Error {
    io::Error::other("socket address is not IPv4 or IPv6")
}
20
21#[derive(Debug)]
26pub struct UdpSocket(DatagramSocket);
27
28impl Deref for UdpSocket {
29 type Target = DatagramSocket;
30
31 fn deref(&self) -> &Self::Target {
32 &self.0
33 }
34}
35
36impl UdpSocket {
37 pub fn bind(addr: &SocketAddr) -> io::Result<UdpSocket> {
41 let socket = net::UdpSocket::bind(addr)?;
42 UdpSocket::from_socket(socket)
43 }
44
45 pub fn from_socket(socket: net::UdpSocket) -> io::Result<UdpSocket> {
47 let socket: socket2::Socket = socket.into();
48 socket.set_nonblocking(true)?;
49 let evented_fd = unsafe { EventedFd::new(socket)? };
50 Ok(UdpSocket(DatagramSocket(evented_fd)))
51 }
52
53 pub fn from_datagram(socket: DatagramSocket) -> io::Result<Self> {
55 let sock: &socket2::Socket = socket.as_ref();
56 if sock.r#type()? != socket2::Type::DGRAM {
57 return Err(io::Error::new(io::ErrorKind::InvalidInput, "socket type is not datagram"));
58 }
59 if sock.protocol()? != Some(socket2::Protocol::UDP) {
60 return Err(io::Error::new(io::ErrorKind::InvalidInput, "socket protocol is not UDP"));
61 }
62 let _: socket2::SockAddr = socket.local_addr()?;
64 Ok(Self(socket))
65 }
66
67 pub fn local_addr(&self) -> io::Result<SocketAddr> {
69 self.0
70 .local_addr()
71 .and_then(|sa| sa.as_socket().ok_or_else(new_socket_address_conversion_error))
72 }
73
74 pub fn connect(&self, addr: &SocketAddr) -> io::Result<()> {
78 let addr: socket2::SockAddr = (*addr).into();
79 self.0.as_ref().connect(&addr)
80 }
81
82 pub fn recv_from<'a>(&'a self, buf: &'a mut [u8]) -> UdpRecvFrom<'a> {
86 UdpRecvFrom { socket: self, buf }
87 }
88
89 pub fn send<'a>(&'a self, buf: &'a [u8]) -> SendFuture<'a> {
93 SendFuture { socket: self, buf }
94 }
95
96 pub fn send_to<'a>(&'a self, buf: &'a [u8], addr: SocketAddr) -> SendTo<'a> {
100 SendTo { socket: self, buf, addr: addr.into() }
101 }
102
103 pub fn send_to_vectored<'a>(
105 &'a self,
106 bufs: &'a [io::IoSlice<'a>],
107 addr: SocketAddr,
108 ) -> SendToVectored<'a> {
109 SendToVectored { socket: self, bufs, addr: addr.into() }
110 }
111}
112
113impl AsRawFd for UdpSocket {
114 fn as_raw_fd(&self) -> RawFd {
115 self.0.as_raw_fd()
116 }
117}
118
119#[derive(Debug)]
121pub struct DatagramSocket(EventedFd<socket2::Socket>);
122
123impl Deref for DatagramSocket {
124 type Target = EventedFd<socket2::Socket>;
125
126 fn deref(&self) -> &Self::Target {
127 &self.0
128 }
129}
130
131impl DatagramSocket {
132 pub fn new(domain: socket2::Domain, protocol: Option<socket2::Protocol>) -> io::Result<Self> {
134 let socket = socket2::Socket::new(domain, socket2::Type::DGRAM.nonblocking(), protocol)?;
135 let evented_fd = unsafe { EventedFd::new(socket)? };
136 Ok(Self(evented_fd))
137 }
138
139 pub fn new_from_socket(socket: socket2::Socket) -> io::Result<Self> {
141 match socket.r#type()? {
142 socket2::Type::DGRAM
143 | socket2::Type::RAW => {
147 socket.set_nonblocking(true)?;
148 let evented_fd = unsafe { EventedFd::new(socket)? };
149 Ok(Self(evented_fd))
150 }
151 _ => Err(io::Error::new(io::ErrorKind::InvalidInput, "invalid socket type.")),
152 }
153 }
154
155 pub fn local_addr(&self) -> io::Result<socket2::SockAddr> {
157 self.0.as_ref().local_addr()
158 }
159
160 pub fn recv_from<'a>(&'a self, buf: &'a mut [u8]) -> RecvFrom<'a> {
165 RecvFrom { socket: self, buf }
166 }
167
168 pub fn async_recv_from(
170 &self,
171 buf: &mut [u8],
172 cx: &mut Context<'_>,
173 ) -> Poll<io::Result<(usize, socket2::SockAddr)>> {
174 ready!(EventedFd::poll_readable(&self.0, cx))?;
175 let buf = unsafe {
179 std::slice::from_raw_parts_mut(
180 buf.as_mut_ptr() as *mut core::mem::MaybeUninit<u8>,
181 buf.len(),
182 )
183 };
184 match self.0.as_ref().recv_from(buf) {
185 Err(e) => {
186 if e.kind() == io::ErrorKind::WouldBlock {
187 self.0.need_read(cx);
188 Poll::Pending
189 } else {
190 Poll::Ready(Err(e))
191 }
192 }
193 Ok((size, addr)) => Poll::Ready(Ok((size, addr))),
194 }
195 }
196
197 pub fn send_to<'a>(&'a self, buf: &'a [u8], addr: socket2::SockAddr) -> SendTo<'a> {
201 SendTo { socket: self, buf, addr }
202 }
203
204 fn send_result_to_poll_result(
205 &self,
206 r: io::Result<usize>,
207 cx: &mut Context<'_>,
208 ) -> Poll<io::Result<usize>> {
209 match r {
210 Err(e) => {
211 if e.kind() == io::ErrorKind::WouldBlock {
212 self.0.need_write(cx);
213 Poll::Pending
214 } else {
215 Poll::Ready(Err(e))
216 }
217 }
218 Ok(size) => Poll::Ready(Ok(size)),
219 }
220 }
221
222 pub fn async_send(&self, buf: &[u8], cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
224 ready!(EventedFd::poll_writable(&self.0, cx))?;
225 self.send_result_to_poll_result(self.0.as_ref().send(buf), cx)
226 }
227
228 pub fn async_send_to(
231 &self,
232 buf: &[u8],
233 addr: &socket2::SockAddr,
234 cx: &mut Context<'_>,
235 ) -> Poll<io::Result<usize>> {
236 ready!(EventedFd::poll_writable(&self.0, cx))?;
237 self.send_result_to_poll_result(self.0.as_ref().send_to(buf, addr), cx)
238 }
239
240 pub fn send_to_vectored<'a>(
242 &'a self,
243 bufs: &'a [io::IoSlice<'a>],
244 addr: socket2::SockAddr,
245 ) -> SendToVectored<'a> {
246 SendToVectored { socket: self, bufs, addr }
247 }
248
249 pub fn async_send_to_vectored<'a>(
252 &self,
253 bufs: &'a [io::IoSlice<'a>],
254 addr: &socket2::SockAddr,
255 cx: &mut Context<'_>,
256 ) -> Poll<io::Result<usize>> {
257 ready!(EventedFd::poll_writable(&self.0, cx))?;
258 self.send_result_to_poll_result(self.0.as_ref().send_to_vectored(bufs, addr), cx)
259 }
260
261 pub fn set_broadcast(&self, broadcast: bool) -> io::Result<()> {
265 self.0.as_ref().set_broadcast(broadcast)
266 }
267
268 pub fn broadcast(&self) -> io::Result<bool> {
270 self.0.as_ref().broadcast()
271 }
272
273 pub fn bind_device(&self, interface: Option<&[u8]>) -> io::Result<()> {
281 self.0.as_ref().bind_device(interface)
282 }
283
284 pub fn device(&self) -> io::Result<Option<Vec<u8>>> {
288 self.0.as_ref().device()
289 }
290}
291
292#[must_use = "futures do nothing unless you `.await` or poll them"]
294pub struct UdpRecvFrom<'a> {
295 socket: &'a UdpSocket,
296 buf: &'a mut [u8],
297}
298
299impl<'a> Future for UdpRecvFrom<'a> {
300 type Output = io::Result<(usize, SocketAddr)>;
301
302 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
303 let this = &mut *self;
304 let (received, addr) = ready!(this.socket.0.async_recv_from(this.buf, cx))?;
305 Poll::Ready(
306 addr.as_socket()
307 .ok_or_else(new_socket_address_conversion_error)
308 .map(|addr| (received, addr)),
309 )
310 }
311}
312
313#[must_use = "futures do nothing unless you `.await` or poll them"]
315pub struct RecvFrom<'a> {
316 socket: &'a DatagramSocket,
317 buf: &'a mut [u8],
318}
319
320impl<'a> Future for RecvFrom<'a> {
321 type Output = io::Result<(usize, socket2::SockAddr)>;
322
323 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
324 let this = &mut *self;
325 let (received, addr) = ready!(this.socket.async_recv_from(this.buf, cx))?;
326 Poll::Ready(Ok((received, addr)))
327 }
328}
329
330#[must_use = "futures do nothing unless you `.await` or poll them"]
332pub struct SendFuture<'a> {
333 socket: &'a DatagramSocket,
334 buf: &'a [u8],
335}
336
337impl<'a> Future for SendFuture<'a> {
338 type Output = io::Result<usize>;
339
340 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
341 self.socket.async_send(self.buf, cx)
342 }
343}
344
345#[must_use = "futures do nothing unless you `.await` or poll them"]
347pub struct SendTo<'a> {
348 socket: &'a DatagramSocket,
349 buf: &'a [u8],
350 addr: socket2::SockAddr,
351}
352
353impl<'a> Future for SendTo<'a> {
354 type Output = io::Result<usize>;
355
356 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
357 self.socket.async_send_to(self.buf, &self.addr, cx)
358 }
359}
360
361#[must_use = "futures do nothing unless you `.await` or poll them"]
363pub struct SendToVectored<'a> {
364 socket: &'a DatagramSocket,
365 bufs: &'a [io::IoSlice<'a>],
366 addr: socket2::SockAddr,
367}
368
369impl<'a> Future for SendToVectored<'a> {
370 type Output = io::Result<usize>;
371
372 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
373 self.socket.async_send_to_vectored(self.bufs, &self.addr, cx)
374 }
375}
376
#[cfg(test)]
mod test {
    /// A stream socket must be rejected by `DatagramSocket::new_from_socket`
    /// with an `InvalidInput` error.
    #[test]
    fn datagram_socket_new_from_socket() {
        let sock = socket2::Socket::new(socket2::Domain::IPV4, socket2::Type::STREAM, None)
            .expect("failed to create stream socket");
        let err = super::DatagramSocket::new_from_socket(sock)
            .expect_err("DatagramSocket created from stream socket succeeded unexpectedly");
        assert_eq!(
            err.kind(),
            std::io::ErrorKind::InvalidInput,
            "got: {err:?}; want error of kind InvalidInput"
        );
    }
}