diff --git a/kernel/src/net/socket/unix/datagram/message.rs b/kernel/src/net/socket/unix/datagram/message.rs new file mode 100644 index 000000000..1c1b2db78 --- /dev/null +++ b/kernel/src/net/socket/unix/datagram/message.rs @@ -0,0 +1,282 @@ +// SPDX-License-Identifier: MPL-2.0 + +use alloc::{ + collections::{btree_map::BTreeMap, vec_deque::VecDeque}, + sync::Arc, +}; +use core::sync::atomic::{AtomicBool, Ordering}; + +use ostd::sync::{RwLock, WaitQueue}; +use spin::Once; + +use crate::{ + events::IoEvents, + net::socket::{ + unix::{ + addr::{UnixSocketAddrBound, UnixSocketAddrKey}, + ctrl_msg::AuxiliaryData, + UnixSocketAddr, + }, + util::ControlMessage, + }, + prelude::*, + process::signal::Pollee, + util::{MultiRead, MultiWrite}, +}; + +pub(super) struct MessageQueue { + addr: Once, + inner: Mutex>, + is_pass_cred: AtomicBool, + pollee: Pollee, + send_wait_queue: WaitQueue, +} + +struct Inner { + messages: VecDeque, + total_length: usize, + is_shutdown: bool, +} + +struct Message { + bytes: Vec, + aux: AuxiliaryData, + src: UnixSocketAddr, +} + +impl MessageQueue { + /// Looks up a message queue bound to the specific address in the global table. 
+ pub(super) fn lookup_bound(addr: &UnixSocketAddrKey) -> Result> { + QUEUE_TABLE.get_queue(addr).ok_or_else(|| { + Error::with_message(Errno::ECONNREFUSED, "the target socket does not exist") + }) + } + + pub(super) fn try_send( + &self, + reader: &mut dyn MultiRead, + aux_data: &mut AuxiliaryData, + source: &MessageReceiver, + ) -> Result { + let mut inner = self.inner.lock(); + let Some(inner) = inner.as_mut() else { + return_errno_with_message!(Errno::ECONNREFUSED, "the target socket is closed"); + }; + if inner.is_shutdown { + return_errno_with_message!(Errno::EPIPE, "the target socket is shut down"); + } + + let len = reader.sum_lens(); + if len > UNIX_DATAGRAM_DEFAULT_BUF_SIZE { + return_errno_with_message!(Errno::EMSGSIZE, "the message is too large"); + } + if UNIX_DATAGRAM_DEFAULT_BUF_SIZE - inner.total_length < len { + return_errno_with_message!( + Errno::EAGAIN, + "the receive buffer does not have enough space" + ); + } + + let msg = { + let mut bytes = vec![0; len]; + reader.read(&mut VmWriter::from(bytes.as_mut_slice()))?; + + let mut aux = core::mem::take(aux_data); + if self.is_pass_cred.load(Ordering::Relaxed) + || source.queue.is_pass_cred.load(Ordering::Relaxed) + { + aux.fill_cred(); + } + + let src = source.queue.addr(); + + Message { bytes, aux, src } + }; + + inner.total_length += msg.bytes.len(); + inner.messages.push_back(msg); + + self.pollee.notify(IoEvents::IN); + + Ok(len) + } + + pub(super) fn addr(&self) -> UnixSocketAddr { + self.addr.get().cloned().unwrap_or(UnixSocketAddr::Unnamed) + } + + /// Blocks until the buffer is free and the `try_send` succeeds, or until interrupted. + pub(super) fn block_send(&self, mut try_send: F) -> Result + where + F: FnMut() -> Result, + { + self.send_wait_queue.pause_until(|| match try_send() { + Err(err) if err.error() == Errno::EAGAIN => None, + result => Some(result), + })? + } +} + +// Note that a message receiver corresponds to a live socket and maintains certain invariants. 
For +// instance, `queue.inner` is always `Some(_)`, and the queue is in the global table if it is bound +// (i.e., `addr` is not `None`). +pub(super) struct MessageReceiver { + // `addr` should be dropped as soon as the socket file is closed, + // so it must not belong to `MessageQueue`. + addr: SpinLock>, + queue: Arc, +} + +impl MessageReceiver { + pub(super) fn new() -> MessageReceiver { + let inner = Inner { + messages: VecDeque::new(), + total_length: 0, + is_shutdown: false, + }; + + let queue = MessageQueue { + addr: Once::new(), + inner: Mutex::new(Some(inner)), + pollee: Pollee::new(), + send_wait_queue: WaitQueue::new(), + is_pass_cred: AtomicBool::new(false), + }; + + Self { + addr: SpinLock::new(None), + queue: Arc::new(queue), + } + } + + pub(super) fn bind(&self, addr_to_bind: UnixSocketAddr) -> Result<()> { + let mut addr = self.addr.lock(); + + if addr.is_some() { + return addr_to_bind.bind_unnamed(); + } + + let bound_addr = addr_to_bind.bind()?; + QUEUE_TABLE.add_queue(bound_addr.to_key(), self.queue.clone()); + self.queue.addr.call_once(|| bound_addr.clone().into()); + *addr = Some(bound_addr); + + Ok(()) + } + + pub(super) fn try_recv( + &self, + writer: &mut dyn MultiWrite, + ) -> Result<(usize, Vec, UnixSocketAddr)> { + let mut inner = self.queue.inner.lock(); + let inner = inner.as_mut().unwrap(); + + let Some(msg) = inner.messages.front() else { + if !inner.is_shutdown { + return_errno_with_message!(Errno::EAGAIN, "the receive buffer is empty"); + } else { + return Ok((0, Vec::new(), UnixSocketAddr::Unnamed)); + } + }; + + let len = writer.write(&mut VmReader::from(msg.bytes.as_slice()))?; + if len != msg.bytes.len() { + warn!("setting MSG_TRUNC is not supported"); + } + + let mut msg = inner.messages.pop_front().unwrap(); + inner.total_length -= msg.bytes.len(); + + let is_pass_cred = self.queue.is_pass_cred.load(Ordering::Relaxed); + let ctrl_msgs = msg.aux.generate_control(is_pass_cred); + + self.queue.pollee.invalidate(); + // A writer 
may still fail if the free space is not enough. + // So we have to wake up all the writers here. + self.queue.send_wait_queue.wake_all(); + + Ok((len, ctrl_msgs, msg.src)) + } + + pub(super) fn shutdown(&self) { + let mut inner = self.queue.inner.lock(); + let inner = inner.as_mut().unwrap(); + + inner.is_shutdown = true; + self.queue.send_wait_queue.wake_all(); + + // The caller will notify the pollee. + } + + pub(super) fn set_pass_cred(&self, is_pass_cred: bool) { + self.queue + .is_pass_cred + .store(is_pass_cred, Ordering::Relaxed); + } + + pub(super) fn addr(&self) -> UnixSocketAddr { + self.queue.addr() + } + + pub(super) fn queue(&self) -> &Arc { + &self.queue + } + + pub(super) fn pollee(&self) -> &Pollee { + &self.queue.pollee + } + + pub(super) fn check_io_events(&self) -> IoEvents { + let inner = self.queue.inner.lock(); + let inner = inner.as_ref().unwrap(); + + if inner.is_shutdown { + IoEvents::IN | IoEvents::RDHUP + } else if !inner.messages.is_empty() { + IoEvents::IN + } else { + IoEvents::empty() + } + } +} + +impl Drop for MessageReceiver { + fn drop(&mut self) { + if let Some(addr) = self.addr.get_mut().as_mut() { + QUEUE_TABLE.remove_queue(&addr.to_key()); + } + + *self.queue.inner.lock() = None; + self.queue.send_wait_queue.wake_all(); + } +} + +static QUEUE_TABLE: QueueTable = QueueTable::new(); + +struct QueueTable { + message_queues: RwLock>>, +} + +impl QueueTable { + pub(self) const fn new() -> Self { + Self { + message_queues: RwLock::new(BTreeMap::new()), + } + } + + pub(self) fn add_queue(&self, addr_key: UnixSocketAddrKey, queue: Arc) { + let old_queue = self.message_queues.write().insert(addr_key, queue); + debug_assert!(old_queue.is_none()); + } + + pub(self) fn get_queue(&self, addr_key: &UnixSocketAddrKey) -> Option> { + self.message_queues.read().get(addr_key).cloned() + } + + pub(self) fn remove_queue(&self, addr_key: &UnixSocketAddrKey) { + let old_queue = self.message_queues.write().remove(addr_key); + 
debug_assert!(old_queue.is_some()); + } +} + +pub(in crate::net) const UNIX_DATAGRAM_DEFAULT_BUF_SIZE: usize = 65536; diff --git a/kernel/src/net/socket/unix/datagram/mod.rs b/kernel/src/net/socket/unix/datagram/mod.rs new file mode 100644 index 000000000..a1d465452 --- /dev/null +++ b/kernel/src/net/socket/unix/datagram/mod.rs @@ -0,0 +1,7 @@ +// SPDX-License-Identifier: MPL-2.0 + +mod message; +mod socket; + +pub(in crate::net) use message::UNIX_DATAGRAM_DEFAULT_BUF_SIZE; +pub use socket::UnixDatagramSocket; diff --git a/kernel/src/net/socket/unix/datagram/socket.rs b/kernel/src/net/socket/unix/datagram/socket.rs new file mode 100644 index 000000000..e21a7d81e --- /dev/null +++ b/kernel/src/net/socket/unix/datagram/socket.rs @@ -0,0 +1,289 @@ +// SPDX-License-Identifier: MPL-2.0 + +use core::sync::atomic::{AtomicBool, Ordering}; + +use super::message::{MessageQueue, MessageReceiver}; +use crate::{ + events::IoEvents, + net::socket::{ + options::SocketOption, + private::SocketPrivate, + unix::{ctrl_msg::AuxiliaryData, UnixSocketAddr}, + util::{ + options::{GetSocketLevelOption, SetSocketLevelOption, SocketOptionSet}, + MessageHeader, SendRecvFlags, SockShutdownCmd, SocketAddr, + }, + Socket, + }, + prelude::*, + process::signal::{PollHandle, Pollable}, + util::{MultiRead, MultiWrite}, +}; + +pub struct UnixDatagramSocket { + local_receiver: MessageReceiver, + remote_queue: RwLock>>, + options: RwLock, + + is_nonblocking: AtomicBool, + is_write_shutdown: AtomicBool, +} + +#[derive(Clone, Debug)] +struct OptionSet { + socket: SocketOptionSet, +} + +impl OptionSet { + pub(self) fn new() -> Self { + Self { + socket: SocketOptionSet::new_unix_datagram(), + } + } +} + +impl UnixDatagramSocket { + pub fn new(is_nonblocking: bool) -> Arc { + Arc::new(Self::new_raw(is_nonblocking)) + } + + pub fn new_pair(is_nonblocking: bool) -> (Arc, Arc) { + let mut socket_a = Self::new_raw(is_nonblocking); + let mut socket_b = Self::new_raw(is_nonblocking); + + let remote_queue_a = 
socket_a.remote_queue.get_mut(); + let remote_queue_b = socket_b.remote_queue.get_mut(); + + *remote_queue_a = Some(socket_b.local_receiver.queue().clone()); + *remote_queue_b = Some(socket_a.local_receiver.queue().clone()); + + (Arc::new(socket_a), Arc::new(socket_b)) + } + + fn new_raw(is_nonblocking: bool) -> Self { + Self { + local_receiver: MessageReceiver::new(), + remote_queue: RwLock::new(None), + options: RwLock::new(OptionSet::new()), + is_nonblocking: AtomicBool::new(is_nonblocking), + is_write_shutdown: AtomicBool::new(false), + } + } + + fn do_send( + &self, + reader: &mut dyn MultiRead, + mut aux_data: AuxiliaryData, + remote: Option, + _flags: SendRecvFlags, + ) -> Result { + if self.is_write_shutdown.load(Ordering::Relaxed) { + return_errno_with_message!(Errno::EPIPE, "the socket is shut down for writing"); + } + + let queue = if let Some(remote_addr) = remote.as_ref() { + let connected_addr = remote_addr.connect()?; + MessageQueue::lookup_bound(&connected_addr)? + } else { + let remote_queue = self.remote_queue.read(); + remote_queue.clone().ok_or_else(|| { + Error::with_message(Errno::ENOTCONN, "the socket is not connected") + })? + }; + + let res = if self.is_nonblocking() { + queue.try_send(reader, &mut aux_data, &self.local_receiver) + } else { + queue.block_send(|| queue.try_send(reader, &mut aux_data, &self.local_receiver)) + }; + + // A connected socket will automatically be disconnected if the remote has been closed. + if remote.is_none() && res.is_err_and(|err| err.error() == Errno::ECONNREFUSED) { + let mut remote_queue = self.remote_queue.write(); + // Check to ensure that we are still connected to the same remote. + if remote_queue + .as_ref() + .is_some_and(|remote| Arc::ptr_eq(remote, &queue)) + { + *remote_queue = None; + } + } + + res + } + + fn check_io_events(&self) -> IoEvents { + // POLLOUT should be reported as long as there is space in the socket's send buffer. 
+ // Currently, we only limit the size of the receive buffer, not the send buffer. Therefore, + // POLLOUT is always reported. + let mut io_events = IoEvents::OUT; + + io_events |= self.local_receiver.check_io_events(); + + if self.is_write_shutdown.load(Ordering::Relaxed) && io_events.contains(IoEvents::RDHUP) { + io_events |= IoEvents::HUP; + } + + io_events + } +} + +impl Pollable for UnixDatagramSocket { + fn poll(&self, mask: IoEvents, poller: Option<&mut PollHandle>) -> IoEvents { + self.local_receiver + .pollee() + .poll_with(mask, poller, || self.check_io_events()) + } +} + +impl SocketPrivate for UnixDatagramSocket { + fn is_nonblocking(&self) -> bool { + self.is_nonblocking.load(Ordering::Relaxed) + } + + fn set_nonblocking(&self, nonblocking: bool) { + self.is_nonblocking.store(nonblocking, Ordering::Relaxed); + } +} + +impl Socket for UnixDatagramSocket { + fn bind(&self, socket_addr: SocketAddr) -> Result<()> { + let addr = UnixSocketAddr::try_from(socket_addr)?; + self.local_receiver.bind(addr) + } + + fn connect(&self, socket_addr: SocketAddr) -> Result<()> { + let remote_addr = UnixSocketAddr::try_from(socket_addr)?; + + let connected_addr = remote_addr.connect()?; + let queue = MessageQueue::lookup_bound(&connected_addr)?; + + let mut remote_queue = self.remote_queue.write(); + *remote_queue = Some(queue); + + Ok(()) + } + + fn shutdown(&self, cmd: SockShutdownCmd) -> Result<()> { + let mut io_events = IoEvents::empty(); + + if cmd.shut_read() { + self.local_receiver.shutdown(); + io_events |= IoEvents::IN | IoEvents::RDHUP | IoEvents::HUP; + } + + if cmd.shut_write() { + self.is_write_shutdown.store(true, Ordering::Relaxed); + io_events |= IoEvents::HUP; + } + + self.local_receiver.pollee().notify(io_events); + + Ok(()) + } + + fn addr(&self) -> Result { + Ok(self.local_receiver.addr().into()) + } + + fn peer_addr(&self) -> Result { + let remote_queue = self.remote_queue.read(); + match remote_queue.as_ref() { + Some(queue) => 
Ok(queue.addr().into()), + None => return_errno_with_message!(Errno::ENOTCONN, "the socket is not connected"), + } + } + + fn get_option(&self, option: &mut dyn SocketOption) -> Result<()> { + let options = self.options.read(); + + // Deal with socket-level options + match options.socket.get_option(option, &self.local_receiver) { + Err(err) if err.error() == Errno::ENOPROTOOPT => (), + res => return res, + } + + // TODO: Deal with socket options from other levels + warn!("only socket-level options are supported"); + + return_errno_with_message!(Errno::ENOPROTOOPT, "the socket option to get is unknown") + } + + fn set_option(&self, option: &dyn SocketOption) -> Result<()> { + let mut options = self.options.write(); + + match options.socket.set_option(option, &self.local_receiver) { + Ok(_) => Ok(()), + Err(err) if err.error() == Errno::ENOPROTOOPT => { + // TODO: Deal with socket options from other levels + warn!("only socket-level options are supported"); + return_errno_with_message!( + Errno::ENOPROTOOPT, + "the socket option to set is unknown" + ) + } + Err(e) => Err(e), + } + } + + fn sendmsg( + &self, + reader: &mut dyn MultiRead, + message_header: MessageHeader, + flags: SendRecvFlags, + ) -> Result { + // TODO: Deal with flags + if !flags.is_all_supported() { + warn!("unsupported flags: {:?}", flags); + } + + let MessageHeader { + addr, + control_messages, + } = message_header; + + let remote_addr = match addr { + Some(addr) => Some(addr.try_into()?), + None => None, + }; + + let auxiliary_data = AuxiliaryData::from_control(control_messages)?; + + self.do_send(reader, auxiliary_data, remote_addr, flags) + } + + fn recvmsg( + &self, + writer: &mut dyn MultiWrite, + flags: SendRecvFlags, + ) -> Result<(usize, MessageHeader)> { + // TODO: Deal with flags + if !flags.is_all_supported() { + warn!("unsupported flags: {:?}", flags); + } + + let (received_bytes, control_messages, peer_addr) = + self.block_on(IoEvents::IN, || self.local_receiver.try_recv(writer))?; 
+ let message_header = MessageHeader::new(Some(peer_addr.into()), control_messages); + + Ok((received_bytes, message_header)) + } +} + +impl GetSocketLevelOption for MessageReceiver { + fn is_listening(&self) -> bool { + false + } +} + +impl SetSocketLevelOption for MessageReceiver { + fn set_pass_cred(&self, pass_cred: bool) { + // TODO: According to the Linux man pages, "When this option is set and the socket + // is not yet connected, a unique name in the abstract namespace will be generated + // automatically." See unix(7) for + // details. + + self.set_pass_cred(pass_cred); + } +} diff --git a/kernel/src/net/socket/unix/mod.rs b/kernel/src/net/socket/unix/mod.rs index 1184d464f..475ecdbdf 100644 --- a/kernel/src/net/socket/unix/mod.rs +++ b/kernel/src/net/socket/unix/mod.rs @@ -3,11 +3,14 @@ mod addr; mod cred; mod ctrl_msg; +mod datagram; mod ns; mod stream; pub use addr::UnixSocketAddr; pub use cred::CUserCred; pub(super) use ctrl_msg::UnixControlMessage; +pub use datagram::UnixDatagramSocket; +pub(super) use datagram::UNIX_DATAGRAM_DEFAULT_BUF_SIZE; pub use stream::UnixStreamSocket; pub(super) use stream::UNIX_STREAM_DEFAULT_BUF_SIZE; diff --git a/kernel/src/net/socket/util/options.rs b/kernel/src/net/socket/util/options.rs index 4d0c9bb03..343aae460 100644 --- a/kernel/src/net/socket/util/options.rs +++ b/kernel/src/net/socket/util/options.rs @@ -14,7 +14,7 @@ use crate::{ AcceptConn, KeepAlive, Linger, PassCred, PeerCred, PeerGroups, Priority, RecvBuf, RecvBufForce, ReuseAddr, ReusePort, SendBuf, SendBufForce, SocketOption, }, - unix::{CUserCred, UNIX_STREAM_DEFAULT_BUF_SIZE}, + unix::{CUserCred, UNIX_DATAGRAM_DEFAULT_BUF_SIZE, UNIX_STREAM_DEFAULT_BUF_SIZE}, }, prelude::*, process::{credentials::capabilities::CapSet, posix_thread::AsPosixThread}, @@ -77,6 +77,15 @@ impl SocketOptionSet { } } + /// Returns the default socket level options for unix datagram socket. 
+ pub(in crate::net) fn new_unix_datagram() -> Self { + Self { + send_buf: UNIX_DATAGRAM_DEFAULT_BUF_SIZE as u32, + recv_buf: UNIX_DATAGRAM_DEFAULT_BUF_SIZE as u32, + ..Default::default() + } + } + /// Gets socket-level options. /// /// Note that the socket error has to be handled separately, because it is automatically diff --git a/kernel/src/syscall/socket.rs b/kernel/src/syscall/socket.rs index fd0a1414c..b085b161e 100644 --- a/kernel/src/syscall/socket.rs +++ b/kernel/src/syscall/socket.rs @@ -8,7 +8,7 @@ use crate::{ netlink::{ is_valid_protocol, NetlinkRouteSocket, NetlinkUeventSocket, StandardNetlinkProtocol, }, - unix::UnixStreamSocket, + unix::{UnixDatagramSocket, UnixStreamSocket}, vsock::VsockStreamSocket, }, prelude::*, @@ -32,6 +32,9 @@ pub fn sys_socket(domain: i32, type_: i32, protocol: i32, ctx: &Context) -> Resu (CSocketAddrFamily::AF_UNIX, SockType::SOCK_SEQPACKET) => { UnixStreamSocket::new(is_nonblocking, true) as Arc } + (CSocketAddrFamily::AF_UNIX, SockType::SOCK_RAW | SockType::SOCK_DGRAM) => { + UnixDatagramSocket::new(is_nonblocking) as Arc + } (CSocketAddrFamily::AF_INET, SockType::SOCK_STREAM) => { let protocol = Protocol::try_from(protocol)?; debug!("protocol = {:?}", protocol); diff --git a/kernel/src/syscall/socketpair.rs b/kernel/src/syscall/socketpair.rs index 0aad21e93..ca46f0cda 100644 --- a/kernel/src/syscall/socketpair.rs +++ b/kernel/src/syscall/socketpair.rs @@ -2,8 +2,11 @@ use super::SyscallReturn; use crate::{ - fs::file_table::{FdFlags, FileDesc}, - net::socket::unix::UnixStreamSocket, + fs::{ + file_handle::FileLike, + file_table::{FdFlags, FileDesc}, + }, + net::socket::unix::{UnixDatagramSocket, UnixStreamSocket}, prelude::*, util::net::{CSocketAddrFamily, Protocol, SockFlags, SockType, SOCK_TYPE_MASK}, }; @@ -24,18 +27,27 @@ pub fn sys_socketpair( domain, sock_type, sock_flags, protocol ); - // TODO: deal with all sock_flags and protocol + macro_rules! 
file_pair { + ($expr:expr) => {{ + let (socket_a, socket_b) = $expr; + (socket_a as Arc, socket_b as Arc) + }}; + } + let nonblocking = sock_flags.contains(SockFlags::SOCK_NONBLOCK); let (socket_a, socket_b) = match (domain, sock_type) { (CSocketAddrFamily::AF_UNIX, SockType::SOCK_STREAM) => { - UnixStreamSocket::new_pair(nonblocking, false) + file_pair!(UnixStreamSocket::new_pair(nonblocking, false)) } (CSocketAddrFamily::AF_UNIX, SockType::SOCK_SEQPACKET) => { - UnixStreamSocket::new_pair(nonblocking, true) + file_pair!(UnixStreamSocket::new_pair(nonblocking, true)) + } + (CSocketAddrFamily::AF_UNIX, SockType::SOCK_RAW | SockType::SOCK_DGRAM) => { + file_pair!(UnixDatagramSocket::new_pair(nonblocking)) } _ => return_errno_with_message!( Errno::EAFNOSUPPORT, - "cannot create socket pair for this family" + "creating a socket pair for this family is not supported" ), }; diff --git a/test/src/apps/network/unix_datagram_err.c b/test/src/apps/network/unix_datagram_err.c new file mode 100644 index 000000000..b4766da83 --- /dev/null +++ b/test/src/apps/network/unix_datagram_err.c @@ -0,0 +1,483 @@ +// SPDX-License-Identifier: MPL-2.0 + +#define _GNU_SOURCE + +#include +#include +#include +#include +#include +#include + +#include "../test.h" + +static int sk_unbound; +static int sk_bound; +static int sk_connected; + +#define UNIX_ADDR(path) \ + ((struct sockaddr_un){ .sun_family = AF_UNIX, .sun_path = path }) + +#define PATH_OFFSET offsetof(struct sockaddr_un, sun_path) + +#define UNNAMED_ADDR UNIX_ADDR("") +#define UNNAMED_ADDRLEN PATH_OFFSET + +#define BOUND_ADDR UNIX_ADDR("//tmp/B0") +#define BOUND_ADDRLEN (PATH_OFFSET + 9) + +FN_SETUP(unbound) +{ + sk_unbound = CHECK(socket(PF_UNIX, SOCK_DGRAM | SOCK_NONBLOCK, 0)); +} +END_SETUP() + +FN_SETUP(bound) +{ + sk_bound = CHECK(socket(PF_UNIX, SOCK_DGRAM | SOCK_NONBLOCK, 0)); + + CHECK(bind(sk_bound, (struct sockaddr *)&BOUND_ADDR, BOUND_ADDRLEN)); +} +END_SETUP() + +FN_SETUP(connected) +{ + sk_connected = 
CHECK(socket(PF_UNIX, SOCK_DGRAM | SOCK_NONBLOCK, 0)); + + CHECK(connect(sk_connected, (struct sockaddr *)&BOUND_ADDR, + BOUND_ADDRLEN)); +} +END_SETUP() + +FN_TEST(getsockname) +{ + struct sockaddr_un addr; + socklen_t addrlen; + + addrlen = sizeof(addr); + TEST_RES(getsockname(sk_unbound, (struct sockaddr *)&addr, &addrlen), + addrlen == UNNAMED_ADDRLEN && + memcmp(&addr, &UNNAMED_ADDR, UNNAMED_ADDRLEN) == 0); + + addrlen = sizeof(addr); + TEST_RES(getsockname(sk_bound, (struct sockaddr *)&addr, &addrlen), + addrlen == BOUND_ADDRLEN && + memcmp(&addr, &BOUND_ADDR, BOUND_ADDRLEN) == 0); + + addrlen = sizeof(addr); + TEST_RES(getsockname(sk_connected, (struct sockaddr *)&addr, &addrlen), + addrlen == UNNAMED_ADDRLEN && + memcmp(&addr, &UNNAMED_ADDR, UNNAMED_ADDRLEN) == 0); +} +END_TEST() + +FN_TEST(getpeername) +{ + struct sockaddr_un addr; + socklen_t addrlen; + + addrlen = sizeof(addr); + TEST_ERRNO(getpeername(sk_unbound, (struct sockaddr *)&addr, &addrlen), + ENOTCONN); + + addrlen = sizeof(addr); + TEST_ERRNO(getpeername(sk_bound, (struct sockaddr *)&addr, &addrlen), + ENOTCONN); + + addrlen = sizeof(addr); + TEST_RES(getpeername(sk_connected, (struct sockaddr *)&addr, &addrlen), + addrlen == BOUND_ADDRLEN && + memcmp(&addr, &BOUND_ADDR, BOUND_ADDRLEN) == 0); +} +END_TEST() + +FN_TEST(bind) +{ + TEST_ERRNO(bind(sk_bound, (struct sockaddr *)&UNIX_ADDR("\0Z"), + PATH_OFFSET + 1), + EINVAL); + + TEST_SUCC(bind(sk_bound, (struct sockaddr *)&UNNAMED_ADDR, + UNNAMED_ADDRLEN)); +} +END_TEST() + +FN_TEST(bind_connected) +{ + int fildes[2], sk; + struct sockaddr_un addr; + socklen_t addrlen; + + TEST_SUCC(socketpair(PF_UNIX, SOCK_DGRAM, 0, fildes)); + sk = TEST_SUCC(socket(PF_UNIX, SOCK_DGRAM, 0)); + + TEST_SUCC(bind(fildes[0], (struct sockaddr *)&UNIX_ADDR("\0X"), + PATH_OFFSET + 2)); + addrlen = sizeof(addr); + TEST_RES(getpeername(fildes[1], (struct sockaddr *)&addr, &addrlen), + addrlen == PATH_OFFSET + 2 && memcmp(&addr, &UNIX_ADDR("\0X"), + PATH_OFFSET + 2) == 
0); + + TEST_SUCC(bind(fildes[1], (struct sockaddr *)&UNIX_ADDR("\0Y"), + PATH_OFFSET + 2)); + addrlen = sizeof(addr); + TEST_RES(getpeername(fildes[0], (struct sockaddr *)&addr, &addrlen), + addrlen == PATH_OFFSET + 2 && memcmp(&addr, &UNIX_ADDR("\0Y"), + PATH_OFFSET + 2) == 0); + + TEST_ERRNO(bind(fildes[0], (struct sockaddr *)&UNIX_ADDR("\0Z"), + PATH_OFFSET + 2), + EINVAL); + TEST_ERRNO(bind(fildes[1], (struct sockaddr *)&UNIX_ADDR("\0Z"), + PATH_OFFSET + 2), + EINVAL); + TEST_SUCC(bind(fildes[0], (struct sockaddr *)&UNNAMED_ADDR, + UNNAMED_ADDRLEN)); + TEST_SUCC(bind(fildes[1], (struct sockaddr *)&UNNAMED_ADDR, + UNNAMED_ADDRLEN)); + + // Closing the socket will release the bound address. + // So another socket can bind to it again. + TEST_ERRNO(bind(sk, (struct sockaddr *)&UNIX_ADDR("\0X"), + PATH_OFFSET + 2), + EADDRINUSE); + TEST_SUCC(close(fildes[0])); + TEST_SUCC(bind(sk, (struct sockaddr *)&UNIX_ADDR("\0X"), + PATH_OFFSET + 2)); + + // But the released address is still "visible" from + // the previously connected socket. 
+ addrlen = sizeof(addr); + TEST_RES(getpeername(fildes[1], (struct sockaddr *)&addr, &addrlen), + addrlen == PATH_OFFSET + 2 && memcmp(&addr, &UNIX_ADDR("\0X"), + PATH_OFFSET + 2) == 0); + + TEST_SUCC(close(fildes[1])); + TEST_SUCC(close(sk)); +} +END_TEST() + +FN_TEST(connect) +{ + TEST_ERRNO(connect(sk_unbound, (struct sockaddr *)&UNIX_ADDR("\0X"), + PATH_OFFSET + 2), + ECONNREFUSED); + + TEST_ERRNO(connect(sk_bound, (struct sockaddr *)&UNIX_ADDR("\0X"), + PATH_OFFSET + 2), + ECONNREFUSED); + + TEST_SUCC(connect(sk_connected, (struct sockaddr *)&BOUND_ADDR, + BOUND_ADDRLEN)); + + TEST_ERRNO(connect(sk_connected, (struct sockaddr *)&UNIX_ADDR("\0X"), + PATH_OFFSET + 2), + ECONNREFUSED); +} +END_TEST() + +FN_TEST(listen) +{ + TEST_ERRNO(listen(sk_unbound, 10), EOPNOTSUPP); + + TEST_ERRNO(listen(sk_bound, 10), EOPNOTSUPP); + + TEST_ERRNO(listen(sk_connected, 10), EOPNOTSUPP); +} +END_TEST() + +FN_TEST(accept) +{ + TEST_ERRNO(accept(sk_unbound, NULL, NULL), EOPNOTSUPP); + + TEST_ERRNO(accept(sk_bound, NULL, NULL), EOPNOTSUPP); + + TEST_ERRNO(accept(sk_connected, NULL, NULL), EOPNOTSUPP); +} +END_TEST() + +FN_TEST(send) +{ + char buf[1] = { 'z' }; + + TEST_ERRNO(send(sk_unbound, buf, 1, 0), ENOTCONN); + TEST_ERRNO(send(sk_unbound, buf, 0, 0), ENOTCONN); + TEST_ERRNO(write(sk_unbound, buf, 1), ENOTCONN); + TEST_ERRNO(write(sk_unbound, buf, 0), ENOTCONN); + + TEST_ERRNO(send(sk_bound, buf, 1, 0), ENOTCONN); + TEST_ERRNO(send(sk_bound, buf, 0, 0), ENOTCONN); + TEST_ERRNO(write(sk_bound, buf, 1), ENOTCONN); + TEST_ERRNO(write(sk_bound, buf, 0), ENOTCONN); +} +END_TEST() + +FN_TEST(recv) +{ + char buf[1] = { 'z' }; + + TEST_ERRNO(recv(sk_unbound, buf, 1, 0), EAGAIN); + TEST_ERRNO(recv(sk_unbound, buf, 0, 0), EAGAIN); + TEST_ERRNO(read(sk_unbound, buf, 1), EAGAIN); + TEST_SUCC(read(sk_unbound, buf, 0)); + + TEST_ERRNO(recv(sk_bound, buf, 1, 0), EAGAIN); + TEST_ERRNO(recv(sk_bound, buf, 0, 0), EAGAIN); + TEST_ERRNO(read(sk_bound, buf, 1), EAGAIN); + TEST_SUCC(read(sk_bound, 
buf, 0)); +} +END_TEST() + +FN_TEST(blocking_recv) +{ + int i; + int sk1, sk2; + int pid; + char buf[20]; + + // Setup + + sk1 = TEST_SUCC(socket(PF_UNIX, SOCK_DGRAM, 0)); + TEST_SUCC(bind(sk1, (struct sockaddr *)&UNIX_ADDR("\0"), + PATH_OFFSET + 1)); + + sk2 = TEST_SUCC(socket(PF_UNIX, SOCK_DGRAM | SOCK_NONBLOCK, 0)); + TEST_SUCC(connect(sk2, (struct sockaddr *)&UNIX_ADDR("\0"), + PATH_OFFSET + 1)); + +#define MAKE_TEST(child, retval) \ + pid = TEST_SUCC(fork()); \ + if (pid == 0) { \ + usleep(300 * 1000); \ + CHECK(child); \ + exit(0); \ + } \ + \ + TEST_RES(recv(sk1, buf, sizeof(buf), 0), _ret == retval); \ + TEST_SUCC(wait(NULL)); + + // Test 1: Sending a message resumes the blocked receive + MAKE_TEST(send(sk2, "hello", 5, 0), 5); + + // Test 2: Shutting down for reading resumes the blocked receive + MAKE_TEST(shutdown(sk1, SHUT_RD), 0); + +#undef MAKE_TEST + + // Clean up + + TEST_SUCC(close(sk1)); + TEST_SUCC(close(sk2)); +} +END_TEST() + +FN_TEST(send_recv_trunc) +{ + char buf[1]; + + TEST_SUCC(send(sk_connected, "abc", 3, 0)); + TEST_SUCC(send(sk_connected, "def", 3, 0)); + TEST_SUCC(send(sk_connected, "hij", 3, 0)); + + TEST_RES(recv(sk_bound, buf, 1, 0), _ret == 1 && buf[0] == 'a'); + TEST_RES(recv(sk_bound, buf, 0, 0), _ret == 0); + TEST_RES(recv(sk_bound, buf, 1, 0), _ret == 1 && buf[0] == 'h'); +} +END_TEST() + +FN_TEST(send_recv_zero) +{ + char buf[1]; + + buf[0] = 'a'; + TEST_SUCC(send(sk_connected, buf, 1, 0)); + buf[0] = 'b'; + TEST_SUCC(send(sk_connected, buf, 0, 0)); + buf[0] = 'c'; + TEST_SUCC(send(sk_connected, buf, 0, 0)); + buf[0] = 'd'; + TEST_SUCC(send(sk_connected, buf, 1, 0)); + + TEST_RES(recv(sk_bound, buf, 1, 0), _ret == 1 && buf[0] == 'a'); + TEST_RES(recv(sk_bound, buf, 1, 0), _ret == 0 && buf[0] == 'a'); + TEST_RES(recv(sk_bound, buf, 1, 0), _ret == 0 && buf[0] == 'a'); + TEST_RES(recv(sk_bound, buf, 1, 0), _ret == 1 && buf[0] == 'd'); +} +END_TEST() + +FN_TEST(shutdown_connected) +{ + int fildes[2]; + + 
TEST_SUCC(socketpair(PF_UNIX, SOCK_DGRAM, 0, fildes)); + + TEST_SUCC(shutdown(fildes[0], SHUT_RD)); + TEST_SUCC(shutdown(fildes[0], SHUT_WR)); + TEST_SUCC(shutdown(fildes[0], SHUT_RDWR)); + + TEST_SUCC(shutdown(fildes[0], SHUT_RD)); + TEST_SUCC(shutdown(fildes[0], SHUT_WR)); + TEST_SUCC(shutdown(fildes[0], SHUT_RDWR)); + + TEST_SUCC(close(fildes[0])); + TEST_SUCC(close(fildes[1])); +} +END_TEST() + +FN_TEST(shutdown_close_send) +{ + int fildes[2]; + struct sockaddr_un addr; + socklen_t addrlen; + + TEST_SUCC(socketpair(PF_UNIX, SOCK_DGRAM, 0, fildes)); + TEST_SUCC(bind(fildes[0], (struct sockaddr *)&UNIX_ADDR("\0X"), + PATH_OFFSET + 2)); + + // Test 1: Sending a message after shutting down the receiver. + TEST_SUCC(shutdown(fildes[0], SHUT_RDWR)); + TEST_ERRNO(send(fildes[1], "", 0, 0), EPIPE); + + // The socket is still connected. + addrlen = sizeof(addr); + TEST_RES(getpeername(fildes[1], (struct sockaddr *)&addr, &addrlen), + addrlen == PATH_OFFSET + 2 && memcmp(&addr, &UNIX_ADDR("\0X"), + PATH_OFFSET + 2) == 0); + + // Test 2: Sending a message after closing the receiver. + TEST_SUCC(close(fildes[0])); + TEST_ERRNO(send(fildes[1], "", 0, 0), ECONNREFUSED); + + // The socket is no longer connected. 
+ TEST_ERRNO(send(fildes[1], "", 0, 0), ENOTCONN); + TEST_ERRNO(getpeername(fildes[1], (struct sockaddr *)&addr, &addrlen), + ENOTCONN); + + TEST_SUCC(close(fildes[1])); +} +END_TEST() + +FN_TEST(poll) +{ + int sk; + struct pollfd pfd = { .events = POLLIN | POLLOUT | POLLRDHUP }; + + sk = TEST_SUCC(socket(PF_UNIX, SOCK_DGRAM, 0)); + pfd.fd = sk; + + TEST_RES(poll(&pfd, 1, 0), pfd.revents == POLLOUT); + + TEST_SUCC(shutdown(sk, SHUT_WR)); + TEST_RES(poll(&pfd, 1, 0), pfd.revents == POLLOUT); + + TEST_SUCC(shutdown(sk, SHUT_RD)); + TEST_RES(poll(&pfd, 1, 0), + pfd.revents == (POLLIN | POLLOUT | POLLRDHUP | POLLHUP)); + + TEST_SUCC( + bind(sk, (struct sockaddr *)&UNIX_ADDR("\0"), PATH_OFFSET + 1)); + TEST_RES(poll(&pfd, 1, 0), + pfd.revents == (POLLIN | POLLOUT | POLLRDHUP | POLLHUP)); + + TEST_SUCC(connect(sk, (struct sockaddr *)&BOUND_ADDR, BOUND_ADDRLEN)); + TEST_RES(poll(&pfd, 1, 0), + pfd.revents == (POLLIN | POLLOUT | POLLRDHUP | POLLHUP)); + + TEST_SUCC(close(sk)); +} +END_TEST() + +FN_TEST(poll_connected_close) +{ + int fildes[2]; + struct pollfd pfd = { .events = POLLIN | POLLOUT | POLLRDHUP }; + + TEST_SUCC(socketpair(PF_UNIX, SOCK_DGRAM, 0, fildes)); + + pfd.fd = fildes[1]; + TEST_RES(poll(&pfd, 1, 0), pfd.revents == POLLOUT); + + TEST_SUCC(close(fildes[0])); + + pfd.fd = fildes[1]; + TEST_RES(poll(&pfd, 1, 0), pfd.revents == POLLOUT); + + TEST_SUCC(close(fildes[1])); +} +END_TEST() + +FN_TEST(poll_connected_shutdown) +{ + int fildes[2]; + struct pollfd pfd = { .events = POLLIN | POLLOUT | POLLRDHUP }; + +#define MAKE_TEST(shut, ev1) \ + TEST_SUCC(socketpair(PF_UNIX, SOCK_DGRAM, 0, fildes)); \ + \ + TEST_SUCC(shutdown(fildes[0], shut)); \ + \ + pfd.fd = fildes[0]; \ + TEST_RES(poll(&pfd, 1, 0), pfd.revents == (ev1)); \ + \ + pfd.fd = fildes[1]; \ + TEST_RES(poll(&pfd, 1, 0), pfd.revents == POLLOUT); \ + \ + TEST_SUCC(close(fildes[0])); \ + TEST_SUCC(close(fildes[1])); + + MAKE_TEST(SHUT_RD, POLLIN | POLLOUT | POLLRDHUP); + + MAKE_TEST(SHUT_WR, POLLOUT); + + 
MAKE_TEST(SHUT_RDWR, POLLIN | POLLOUT | POLLRDHUP | POLLHUP); + +#undef MAKE_TEST +} +END_TEST() + +// See also `zero_reads_always_succeed` in `pipe_err.c` +FN_TEST(zero_recvs_may_fail) +{ + int fildes[2]; + char buf[1] = { 'z' }; + + TEST_SUCC(socketpair(AF_UNIX, SOCK_DGRAM | SOCK_NONBLOCK, 0, fildes)); + + TEST_ERRNO(recv(fildes[0], buf, 0, 0), EAGAIN); + + TEST_RES(send(fildes[1], buf, 1, 0), _ret == 1); + TEST_SUCC(recv(fildes[0], buf, 0, 0)); + + TEST_SUCC(close(fildes[0])); + TEST_SUCC(close(fildes[1])); +} +END_TEST() + +// See also `zero_writes_always_succeed` in `pipe_err.c` +FN_TEST(zero_sends_may_fail) +{ + int fildes[2]; + char buf[1] = { 'z' }; + + TEST_SUCC(socketpair(AF_UNIX, SOCK_DGRAM | SOCK_NONBLOCK, 0, fildes)); + + TEST_SUCC(send(fildes[1], buf, 0, 0)); + + TEST_SUCC(close(fildes[0])); + TEST_ERRNO(send(fildes[1], buf, 0, 0), ECONNREFUSED); + + TEST_SUCC(close(fildes[1])); +} +END_TEST() + +FN_SETUP(cleanup) +{ + CHECK(close(sk_unbound)); + + CHECK(close(sk_bound)); + + CHECK(close(sk_connected)); + + CHECK(unlink(BOUND_ADDR.sun_path)); +} +END_SETUP() diff --git a/test/src/apps/scripts/network.sh b/test/src/apps/scripts/network.sh index 2fcb6e73c..471f6a3ed 100755 --- a/test/src/apps/scripts/network.sh +++ b/test/src/apps/scripts/network.sh @@ -36,6 +36,7 @@ sleep 0.2 ./udp_err ./unix_stream_err ./unix_seqpacket_err +./unix_datagram_err ./netlink_route ./rtnl_err diff --git a/test/src/syscall/gvisor/Makefile b/test/src/syscall/gvisor/Makefile index fbe30b8d1..4ae1a432f 100644 --- a/test/src/syscall/gvisor/Makefile +++ b/test/src/syscall/gvisor/Makefile @@ -51,9 +51,12 @@ TESTS ?= \ sigaltstack_test \ signalfd_test \ socket_netlink_route_test \ + socket_unix_dgram_local_test \ + socket_unix_dgram_non_blocking_test \ socket_unix_pair_test \ socket_unix_seqpacket_local_test \ socket_unix_stream_test \ + socket_unix_unbound_dgram_test \ socket_unix_unbound_seqpacket_test \ socket_unix_unbound_stream_test \ stat_test \ diff --git 
a/test/src/syscall/gvisor/blocklists/socket_unix_dgram_local_test b/test/src/syscall/gvisor/blocklists/socket_unix_dgram_local_test new file mode 100644 index 000000000..95d6bdbc7 --- /dev/null +++ b/test/src/syscall/gvisor/blocklists/socket_unix_dgram_local_test @@ -0,0 +1,13 @@ +# TODO: Support `MSG_DONTWAIT` and `MSG_PEEK` +DgramUnixSockets/NonStreamSocketPairTest.SplitRecv/* +DgramUnixSockets/NonStreamSocketPairTest.SinglePeek/* +DgramUnixSockets/NonStreamSocketPairTest.RecvmsgTruncPeekDontwaitZeroLen/* + +# TODO: Support `SO_SNDTIMEO` +DgramUnixSockets/UnixNonStreamSocketPairTest.SendTimeout/* + +# TODO: Support `MSG_TRUNC` +DgramUnixSockets/NonStreamSocketPairTest.MsgTruncTruncation/* +DgramUnixSockets/NonStreamSocketPairTest.MsgTruncTruncationRecvmsgMsghdrFlagMsgTrunc/* +DgramUnixSockets/NonStreamSocketPairTest.RecvmsgMsgTruncZeroLen/* +DgramUnixSockets/NonStreamSocketPairTest.RecvmsgMsgTruncMsgPeekZeroLen/* diff --git a/test/src/syscall/gvisor/blocklists/socket_unix_pair_test b/test/src/syscall/gvisor/blocklists/socket_unix_pair_test index fe057a117..e4f8c1b4e 100644 --- a/test/src/syscall/gvisor/blocklists/socket_unix_pair_test +++ b/test/src/syscall/gvisor/blocklists/socket_unix_pair_test @@ -1,9 +1,3 @@ -# TODO: Support `SOCK_DGRAM` sockets -AllUnixDomainSockets/*/2 -AllUnixDomainSockets/*/3 -AllUnixDomainSockets/*/8 -AllUnixDomainSockets/*/9 - # TODO: Support the `recvmmsg` system call AllUnixDomainSockets/UnixSocketPairTest.RecvmmsgTimeoutAfterRecv/*