// fuchsia_async/net/fuchsia/udp.rs
#![deny(missing_docs)]
use crate::net::EventedFd;
use futures::future::Future;
use futures::ready;
use futures::task::{Context, Poll};
use std::io;
use std::net::{self, SocketAddr};
use std::ops::Deref;
use std::pin::Pin;
/// Builds the error reported when a socket address cannot be expressed as a
/// `std::net::SocketAddr` (that is, when `SockAddr::as_socket` yields `None`).
fn new_socket_address_conversion_error() -> std::io::Error {
    let message = "socket address is not IPv4 or IPv6";
    io::Error::new(io::ErrorKind::Other, message)
}
/// An asynchronous UDP socket.
///
/// This is a thin wrapper around a [`DatagramSocket`]: it dereferences to the
/// wrapped socket and additionally offers methods that work with
/// [`std::net::SocketAddr`] instead of [`socket2::SockAddr`].
#[derive(Debug)]
pub struct UdpSocket(DatagramSocket);
impl Deref for UdpSocket {
type Target = DatagramSocket;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl UdpSocket {
    /// Creates an async UDP socket bound to `addr`.
    ///
    /// The socket is bound with [`std::net::UdpSocket::bind`] and then handed
    /// to [`UdpSocket::from_socket`].
    ///
    /// # Errors
    ///
    /// Returns any error produced while binding or while converting the bound
    /// socket with [`UdpSocket::from_socket`].
    pub fn bind(addr: &SocketAddr) -> io::Result<UdpSocket> {
        let socket = net::UdpSocket::bind(addr)?;
        UdpSocket::from_socket(socket)
    }

    /// Creates an async UDP socket from a [`std::net::UdpSocket`].
    ///
    /// The socket is switched to non-blocking mode before it is wrapped in an
    /// [`EventedFd`].
    ///
    /// # Errors
    ///
    /// Returns an error if non-blocking mode cannot be enabled or if
    /// `EventedFd::new` fails.
    pub fn from_socket(socket: net::UdpSocket) -> io::Result<UdpSocket> {
        let socket: socket2::Socket = socket.into();
        socket.set_nonblocking(true)?;
        // SAFETY (assumed -- confirm against `EventedFd::new`'s documented
        // contract): the socket is moved into the `EventedFd`, so nothing else
        // can close its file descriptor while the `EventedFd` is alive.
        let evented_fd = unsafe { EventedFd::new(socket)? };
        Ok(UdpSocket(DatagramSocket(evented_fd)))
    }

    /// Creates a UDP socket from an existing [`DatagramSocket`].
    ///
    /// # Errors
    ///
    /// Returns an [`io::ErrorKind::InvalidInput`] error if the socket's type is
    /// not `DGRAM` or its protocol is not UDP. Errors from querying the socket
    /// type, protocol, or local address are propagated unchanged.
    pub fn from_datagram(socket: DatagramSocket) -> io::Result<Self> {
        let sock: &socket2::Socket = socket.as_ref();
        if sock.r#type()? != socket2::Type::DGRAM {
            return Err(io::Error::new(io::ErrorKind::InvalidInput, "socket type is not datagram"));
        }
        if sock.protocol()? != Some(socket2::Protocol::UDP) {
            return Err(io::Error::new(io::ErrorKind::InvalidInput, "socket protocol is not UDP"));
        }
        // Only the success of the query matters here; the address itself is
        // discarded and any failure is propagated to the caller.
        let _: socket2::SockAddr = socket.local_addr()?;
        Ok(Self(socket))
    }

    /// Returns the local address of this socket as a [`SocketAddr`].
    ///
    /// # Errors
    ///
    /// Returns an error if the address cannot be queried, or if it is not an
    /// IPv4 or IPv6 address.
    pub fn local_addr(&self) -> io::Result<SocketAddr> {
        let addr = self.0.local_addr()?;
        addr.as_socket().ok_or_else(new_socket_address_conversion_error)
    }

    /// Returns a future that receives a datagram into `buf`.
    ///
    /// The future resolves to the number of bytes received together with the
    /// sender's address as a [`SocketAddr`].
    pub fn recv_from<'a>(&'a self, buf: &'a mut [u8]) -> UdpRecvFrom<'a> {
        UdpRecvFrom { socket: self, buf }
    }

    /// Returns a future that sends the contents of `buf` to `addr`.
    ///
    /// `addr` is converted into a [`socket2::SockAddr`] up front; the future
    /// resolves to the number of bytes sent.
    pub fn send_to<'a>(&'a self, buf: &'a [u8], addr: SocketAddr) -> SendTo<'a> {
        SendTo { socket: self, buf, addr: addr.into() }
    }

    /// Returns a future that sends the contents of `bufs` to `addr` using a
    /// vectored write.
    ///
    /// `addr` is converted into a [`socket2::SockAddr`] up front; the future
    /// resolves to the number of bytes sent.
    pub fn send_to_vectored<'a>(
        &'a self,
        bufs: &'a [io::IoSlice<'a>],
        addr: SocketAddr,
    ) -> SendToVectored<'a> {
        SendToVectored { socket: self, bufs, addr: addr.into() }
    }
}
/// An asynchronous datagram socket.
///
/// Wraps a [`socket2::Socket`] inside an [`EventedFd`] and dereferences to that
/// `EventedFd`.
#[derive(Debug)]
pub struct DatagramSocket(EventedFd<socket2::Socket>);
impl Deref for DatagramSocket {
type Target = EventedFd<socket2::Socket>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl DatagramSocket {
    /// Creates a new non-blocking async datagram socket in `domain`, using
    /// `protocol` if one is given.
    ///
    /// # Errors
    ///
    /// Returns an error if the socket cannot be created or if `EventedFd::new`
    /// fails.
    pub fn new(domain: socket2::Domain, protocol: Option<socket2::Protocol>) -> io::Result<Self> {
        let ty = socket2::Type::DGRAM.nonblocking();
        let socket = socket2::Socket::new(domain, ty, protocol)?;
        // SAFETY (assumed -- confirm against `EventedFd::new`'s documented
        // contract): the socket is moved into the `EventedFd`, so nothing else
        // can close its file descriptor while the `EventedFd` is alive.
        let evented_fd = unsafe { EventedFd::new(socket) }?;
        Ok(Self(evented_fd))
    }

    /// Creates a new async datagram socket from an existing socket.
    ///
    /// Sockets of type `DGRAM` and `RAW` are accepted; the socket is switched
    /// to non-blocking mode before it is wrapped.
    ///
    /// # Errors
    ///
    /// Returns an [`io::ErrorKind::InvalidInput`] error for any other socket
    /// type. Errors from querying the type, enabling non-blocking mode, or
    /// `EventedFd::new` are propagated unchanged.
    pub fn new_from_socket(socket: socket2::Socket) -> io::Result<Self> {
        let kind = socket.r#type()?;
        if kind != socket2::Type::DGRAM && kind != socket2::Type::RAW {
            return Err(io::Error::new(io::ErrorKind::InvalidInput, "invalid socket type."));
        }
        socket.set_nonblocking(true)?;
        // SAFETY (assumed -- confirm against `EventedFd::new`'s documented
        // contract): the socket is moved into the `EventedFd`, so nothing else
        // can close its file descriptor while the `EventedFd` is alive.
        let evented_fd = unsafe { EventedFd::new(socket) }?;
        Ok(Self(evented_fd))
    }

    /// Returns the local address of this socket.
    pub fn local_addr(&self) -> io::Result<socket2::SockAddr> {
        let socket: &socket2::Socket = self.0.as_ref();
        socket.local_addr()
    }

    /// Returns a future that receives a datagram into `buf`.
    ///
    /// The future resolves to the number of bytes received and the sender's
    /// address.
    pub fn recv_from<'a>(&'a self, buf: &'a mut [u8]) -> RecvFrom<'a> {
        RecvFrom { socket: self, buf }
    }

    /// Attempts to receive a datagram into `buf` without blocking.
    ///
    /// Returns `Poll::Pending` while the socket is not readable, or when the
    /// receive reports `WouldBlock`; in the latter case read interest is
    /// registered again via `need_read`. Otherwise returns the result of the
    /// receive: the number of bytes read and the sender's address, or the
    /// error.
    pub fn async_recv_from(
        &self,
        buf: &mut [u8],
        cx: &mut Context<'_>,
    ) -> Poll<io::Result<(usize, socket2::SockAddr)>> {
        ready!(EventedFd::poll_readable(&self.0, cx))?;

        let len = buf.len();
        let ptr = buf.as_mut_ptr().cast::<core::mem::MaybeUninit<u8>>();
        // SAFETY: `socket2::Socket::recv_from` takes `&mut [MaybeUninit<u8>]`,
        // so the `&mut [u8]` has to be reinterpreted. `MaybeUninit<u8>` has the
        // same layout as `u8`, and `ptr`/`len` come from a live, exclusively
        // borrowed slice whose bytes are already initialized. This relies on
        // socket2's documented promise not to write uninitialized bytes into
        // the buffer.
        let uninit = unsafe { std::slice::from_raw_parts_mut(ptr, len) };

        let socket: &socket2::Socket = self.0.as_ref();
        match socket.recv_from(uninit) {
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
                self.0.need_read(cx);
                Poll::Pending
            }
            outcome => Poll::Ready(outcome),
        }
    }

    /// Returns a future that sends the contents of `buf` to `addr`.
    ///
    /// The future resolves to the number of bytes sent.
    pub fn send_to<'a>(&'a self, buf: &'a [u8], addr: socket2::SockAddr) -> SendTo<'a> {
        SendTo { socket: self, buf, addr }
    }

    /// Attempts to send the contents of `buf` to `addr` without blocking.
    ///
    /// Returns `Poll::Pending` while the socket is not writable, or when the
    /// send reports `WouldBlock`; in the latter case write interest is
    /// registered again via `need_write`. Otherwise returns the result of the
    /// send: the number of bytes written, or the error.
    pub fn async_send_to(
        &self,
        buf: &[u8],
        addr: &socket2::SockAddr,
        cx: &mut Context<'_>,
    ) -> Poll<io::Result<usize>> {
        ready!(EventedFd::poll_writable(&self.0, cx))?;

        let socket: &socket2::Socket = self.0.as_ref();
        match socket.send_to(buf, addr) {
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
                self.0.need_write(cx);
                Poll::Pending
            }
            outcome => Poll::Ready(outcome),
        }
    }

    /// Returns a future that sends the contents of `bufs` to `addr` using a
    /// vectored write.
    ///
    /// The future resolves to the number of bytes sent.
    pub fn send_to_vectored<'a>(
        &'a self,
        bufs: &'a [io::IoSlice<'a>],
        addr: socket2::SockAddr,
    ) -> SendToVectored<'a> {
        SendToVectored { socket: self, bufs, addr }
    }

    /// Attempts to send the contents of `bufs` to `addr` without blocking,
    /// using a vectored write.
    ///
    /// Returns `Poll::Pending` while the socket is not writable, or when the
    /// send reports `WouldBlock`; in the latter case write interest is
    /// registered again via `need_write`. Otherwise returns the result of the
    /// send: the number of bytes written, or the error.
    pub fn async_send_to_vectored<'a>(
        &self,
        bufs: &'a [io::IoSlice<'a>],
        addr: &socket2::SockAddr,
        cx: &mut Context<'_>,
    ) -> Poll<io::Result<usize>> {
        ready!(EventedFd::poll_writable(&self.0, cx))?;

        let socket: &socket2::Socket = self.0.as_ref();
        match socket.send_to_vectored(bufs, addr) {
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
                self.0.need_write(cx);
                Poll::Pending
            }
            outcome => Poll::Ready(outcome),
        }
    }

    /// Enables or disables broadcast on the underlying socket.
    ///
    /// Forwards to `socket2::Socket::set_broadcast`.
    pub fn set_broadcast(&self, broadcast: bool) -> io::Result<()> {
        let socket: &socket2::Socket = self.0.as_ref();
        socket.set_broadcast(broadcast)
    }

    /// Reports whether broadcast is enabled on the underlying socket.
    ///
    /// Forwards to `socket2::Socket::broadcast`.
    pub fn broadcast(&self) -> io::Result<bool> {
        let socket: &socket2::Socket = self.0.as_ref();
        socket.broadcast()
    }

    /// Sets the device binding of the underlying socket to `interface`.
    ///
    /// Forwards to `socket2::Socket::bind_device`; see its documentation for
    /// the meaning of `None`.
    pub fn bind_device(&self, interface: Option<&[u8]>) -> io::Result<()> {
        let socket: &socket2::Socket = self.0.as_ref();
        socket.bind_device(interface)
    }

    /// Returns the device binding of the underlying socket, if any.
    ///
    /// Forwards to `socket2::Socket::device`.
    pub fn device(&self) -> io::Result<Option<Vec<u8>>> {
        let socket: &socket2::Socket = self.0.as_ref();
        socket.device()
    }
}
/// Future returned by [`UdpSocket::recv_from`].
///
/// Resolves to the number of bytes received and the sender's address as a
/// [`SocketAddr`].
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct UdpRecvFrom<'a> {
// Socket the data is received from.
socket: &'a UdpSocket,
// Destination buffer for the received bytes.
buf: &'a mut [u8],
}
impl<'a> Future for UdpRecvFrom<'a> {
    type Output = io::Result<(usize, SocketAddr)>;

    /// Polls the wrapped datagram socket for data and, once it is ready,
    /// converts the sender address into a [`SocketAddr`].
    ///
    /// Resolves to an error if the receive fails or if the sender address is
    /// neither IPv4 nor IPv6.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let Self { socket, buf } = self.get_mut();
        let outcome = ready!(socket.0.async_recv_from(buf, cx));
        Poll::Ready(outcome.and_then(|(received, addr)| {
            addr.as_socket()
                .map(|addr| (received, addr))
                .ok_or_else(new_socket_address_conversion_error)
        }))
    }
}
/// Future returned by [`DatagramSocket::recv_from`].
///
/// Resolves to the number of bytes received and the sender's address as a
/// [`socket2::SockAddr`].
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct RecvFrom<'a> {
// Socket the data is received from.
socket: &'a DatagramSocket,
// Destination buffer for the received bytes.
buf: &'a mut [u8],
}
impl<'a> Future for RecvFrom<'a> {
    type Output = io::Result<(usize, socket2::SockAddr)>;

    /// Delegates to [`DatagramSocket::async_recv_from`] and yields its result
    /// unchanged.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let Self { socket, buf } = self.get_mut();
        socket.async_recv_from(buf, cx)
    }
}
/// Future returned by [`UdpSocket::send_to`] and [`DatagramSocket::send_to`].
///
/// Resolves to the number of bytes sent.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct SendTo<'a> {
// Socket the data is sent on.
socket: &'a DatagramSocket,
// Bytes to send.
buf: &'a [u8],
// Destination address.
addr: socket2::SockAddr,
}
impl<'a> Future for SendTo<'a> {
    type Output = io::Result<usize>;

    /// Delegates to [`DatagramSocket::async_send_to`] with the stored buffer
    /// and destination address.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = &*self;
        let (socket, buf) = (this.socket, this.buf);
        socket.async_send_to(buf, &this.addr, cx)
    }
}
/// Future returned by [`UdpSocket::send_to_vectored`] and
/// [`DatagramSocket::send_to_vectored`].
///
/// Resolves to the number of bytes sent.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct SendToVectored<'a> {
// Socket the data is sent on.
socket: &'a DatagramSocket,
// Buffers to send as one vectored write.
bufs: &'a [io::IoSlice<'a>],
// Destination address.
addr: socket2::SockAddr,
}
impl<'a> Future for SendToVectored<'a> {
    type Output = io::Result<usize>;

    /// Delegates to [`DatagramSocket::async_send_to_vectored`] with the stored
    /// buffers and destination address.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = &*self;
        let (socket, bufs) = (this.socket, this.bufs);
        socket.async_send_to_vectored(bufs, &this.addr, cx)
    }
}
#[cfg(test)]
mod test {
    /// `DatagramSocket::new_from_socket` must reject a stream socket with an
    /// `InvalidInput` error.
    #[test]
    fn datagram_socket_new_from_socket() {
        let stream = socket2::Socket::new(socket2::Domain::IPV4, socket2::Type::STREAM, None)
            .expect("failed to create stream socket");

        let err = match super::DatagramSocket::new_from_socket(stream) {
            Ok(_) => panic!("DatagramSocket created from stream socket succeeded unexpectedly"),
            Err(e) => e,
        };
        assert!(
            err.kind() == std::io::ErrorKind::InvalidInput,
            "got: {:?}; want error of kind InvalidInput",
            err
        );
    }
}