Support UNIX datagram sockets

This commit is contained in:
Ruihan Li 2025-09-05 23:33:32 +08:00 committed by Tate, Hongliang Tian
parent 87640d4b27
commit fe1d4fe15f
12 changed files with 1113 additions and 14 deletions

View File

@ -0,0 +1,282 @@
// SPDX-License-Identifier: MPL-2.0
use alloc::{
collections::{btree_map::BTreeMap, vec_deque::VecDeque},
sync::Arc,
};
use core::sync::atomic::{AtomicBool, Ordering};
use ostd::sync::{RwLock, WaitQueue};
use spin::Once;
use crate::{
events::IoEvents,
net::socket::{
unix::{
addr::{UnixSocketAddrBound, UnixSocketAddrKey},
ctrl_msg::AuxiliaryData,
UnixSocketAddr,
},
util::ControlMessage,
},
prelude::*,
process::signal::Pollee,
util::{MultiRead, MultiWrite},
};
pub(super) struct MessageQueue {
addr: Once<UnixSocketAddr>,
inner: Mutex<Option<Inner>>,
is_pass_cred: AtomicBool,
pollee: Pollee,
send_wait_queue: WaitQueue,
}
struct Inner {
messages: VecDeque<Message>,
total_length: usize,
is_shutdown: bool,
}
struct Message {
bytes: Vec<u8>,
aux: AuxiliaryData,
src: UnixSocketAddr,
}
impl MessageQueue {
/// Looks up a message queue bound to the specific address in the global table.
pub(super) fn lookup_bound(addr: &UnixSocketAddrKey) -> Result<Arc<MessageQueue>> {
QUEUE_TABLE.get_queue(addr).ok_or_else(|| {
Error::with_message(Errno::ECONNREFUSED, "the target socket does not exist")
})
}
pub(super) fn try_send(
&self,
reader: &mut dyn MultiRead,
aux_data: &mut AuxiliaryData,
source: &MessageReceiver,
) -> Result<usize> {
let mut inner = self.inner.lock();
let Some(inner) = inner.as_mut() else {
return_errno_with_message!(Errno::ECONNREFUSED, "the target socket is closed");
};
if inner.is_shutdown {
return_errno_with_message!(Errno::EPIPE, "the target socket is shut down");
}
let len = reader.sum_lens();
if len > UNIX_DATAGRAM_DEFAULT_BUF_SIZE {
return_errno_with_message!(Errno::EMSGSIZE, "the message is too large");
}
if UNIX_DATAGRAM_DEFAULT_BUF_SIZE - inner.total_length < len {
return_errno_with_message!(
Errno::EAGAIN,
"the receive buffer does not have enough space"
);
}
let msg = {
let mut bytes = vec![0; len];
reader.read(&mut VmWriter::from(bytes.as_mut_slice()))?;
let mut aux = core::mem::take(aux_data);
if self.is_pass_cred.load(Ordering::Relaxed)
|| source.queue.is_pass_cred.load(Ordering::Relaxed)
{
aux.fill_cred();
}
let src = source.queue.addr();
Message { bytes, aux, src }
};
inner.total_length += msg.bytes.len();
inner.messages.push_back(msg);
self.pollee.notify(IoEvents::IN);
Ok(len)
}
pub(super) fn addr(&self) -> UnixSocketAddr {
self.addr.get().cloned().unwrap_or(UnixSocketAddr::Unnamed)
}
/// Blocks until the buffer is free and the `try_send` succeeds, or until interrupted.
pub(super) fn block_send<F, R>(&self, mut try_send: F) -> Result<R>
where
F: FnMut() -> Result<R>,
{
self.send_wait_queue.pause_until(|| match try_send() {
Err(err) if err.error() == Errno::EAGAIN => None,
result => Some(result),
})?
}
}
// Note that a message receiver corresponds to a live socket and maintains certain invariants. For
// instance, `queue.inner` is always `Some(_)`, and the queue is in the global table if it is bound
// (i.e., `addr` is not `None`).
pub(super) struct MessageReceiver {
// `addr` should be dropped as soon as the socket file is closed,
// so it must not belong to `MessageQueue`.
addr: SpinLock<Option<UnixSocketAddrBound>>,
queue: Arc<MessageQueue>,
}
impl MessageReceiver {
pub(super) fn new() -> MessageReceiver {
let inner = Inner {
messages: VecDeque::new(),
total_length: 0,
is_shutdown: false,
};
let queue = MessageQueue {
addr: Once::new(),
inner: Mutex::new(Some(inner)),
pollee: Pollee::new(),
send_wait_queue: WaitQueue::new(),
is_pass_cred: AtomicBool::new(false),
};
Self {
addr: SpinLock::new(None),
queue: Arc::new(queue),
}
}
pub(super) fn bind(&self, addr_to_bind: UnixSocketAddr) -> Result<()> {
let mut addr = self.addr.lock();
if addr.is_some() {
return addr_to_bind.bind_unnamed();
}
let bound_addr = addr_to_bind.bind()?;
QUEUE_TABLE.add_queue(bound_addr.to_key(), self.queue.clone());
self.queue.addr.call_once(|| bound_addr.clone().into());
*addr = Some(bound_addr);
Ok(())
}
pub(super) fn try_recv(
&self,
writer: &mut dyn MultiWrite,
) -> Result<(usize, Vec<ControlMessage>, UnixSocketAddr)> {
let mut inner = self.queue.inner.lock();
let inner = inner.as_mut().unwrap();
let Some(msg) = inner.messages.front() else {
if !inner.is_shutdown {
return_errno_with_message!(Errno::EAGAIN, "the receive buffer is empty");
} else {
return Ok((0, Vec::new(), UnixSocketAddr::Unnamed));
}
};
let len = writer.write(&mut VmReader::from(msg.bytes.as_slice()))?;
if len != msg.bytes.len() {
warn!("setting MSG_TRUNC is not supported");
}
let mut msg = inner.messages.pop_front().unwrap();
inner.total_length -= msg.bytes.len();
let is_pass_cred = self.queue.is_pass_cred.load(Ordering::Relaxed);
let ctrl_msgs = msg.aux.generate_control(is_pass_cred);
self.queue.pollee.invalidate();
// A writer may still fail if the free space is not enough.
// So we have to wake up all the writers here.
self.queue.send_wait_queue.wake_all();
Ok((len, ctrl_msgs, msg.src))
}
pub(super) fn shutdown(&self) {
let mut inner = self.queue.inner.lock();
let inner = inner.as_mut().unwrap();
inner.is_shutdown = true;
self.queue.send_wait_queue.wake_all();
// The caller will notify the pollee.
}
pub(super) fn set_pass_cred(&self, is_pass_cred: bool) {
self.queue
.is_pass_cred
.store(is_pass_cred, Ordering::Relaxed);
}
pub(super) fn addr(&self) -> UnixSocketAddr {
self.queue.addr()
}
pub(super) fn queue(&self) -> &Arc<MessageQueue> {
&self.queue
}
pub(super) fn pollee(&self) -> &Pollee {
&self.queue.pollee
}
pub(super) fn check_io_events(&self) -> IoEvents {
let inner = self.queue.inner.lock();
let inner = inner.as_ref().unwrap();
if inner.is_shutdown {
IoEvents::IN | IoEvents::RDHUP
} else if !inner.messages.is_empty() {
IoEvents::IN
} else {
IoEvents::empty()
}
}
}
impl Drop for MessageReceiver {
fn drop(&mut self) {
if let Some(addr) = self.addr.get_mut().as_mut() {
QUEUE_TABLE.remove_queue(&addr.to_key());
}
*self.queue.inner.lock() = None;
self.queue.send_wait_queue.wake_all();
}
}
static QUEUE_TABLE: QueueTable = QueueTable::new();
struct QueueTable {
message_queues: RwLock<BTreeMap<UnixSocketAddrKey, Arc<MessageQueue>>>,
}
impl QueueTable {
pub(self) const fn new() -> Self {
Self {
message_queues: RwLock::new(BTreeMap::new()),
}
}
pub(self) fn add_queue(&self, addr_key: UnixSocketAddrKey, queue: Arc<MessageQueue>) {
let old_queue = self.message_queues.write().insert(addr_key, queue);
debug_assert!(old_queue.is_none());
}
pub(self) fn get_queue(&self, addr_key: &UnixSocketAddrKey) -> Option<Arc<MessageQueue>> {
self.message_queues.read().get(addr_key).cloned()
}
pub(self) fn remove_queue(&self, addr_key: &UnixSocketAddrKey) {
let old_queue = self.message_queues.write().remove(addr_key);
debug_assert!(old_queue.is_some());
}
}
pub(in crate::net) const UNIX_DATAGRAM_DEFAULT_BUF_SIZE: usize = 65536;

View File

@ -0,0 +1,7 @@
// SPDX-License-Identifier: MPL-2.0
mod message;
mod socket;
pub(in crate::net) use message::UNIX_DATAGRAM_DEFAULT_BUF_SIZE;
pub use socket::UnixDatagramSocket;

View File

@ -0,0 +1,289 @@
// SPDX-License-Identifier: MPL-2.0
use core::sync::atomic::{AtomicBool, Ordering};
use super::message::{MessageQueue, MessageReceiver};
use crate::{
events::IoEvents,
net::socket::{
options::SocketOption,
private::SocketPrivate,
unix::{ctrl_msg::AuxiliaryData, UnixSocketAddr},
util::{
options::{GetSocketLevelOption, SetSocketLevelOption, SocketOptionSet},
MessageHeader, SendRecvFlags, SockShutdownCmd, SocketAddr,
},
Socket,
},
prelude::*,
process::signal::{PollHandle, Pollable},
util::{MultiRead, MultiWrite},
};
pub struct UnixDatagramSocket {
local_receiver: MessageReceiver,
remote_queue: RwLock<Option<Arc<MessageQueue>>>,
options: RwLock<OptionSet>,
is_nonblocking: AtomicBool,
is_write_shutdown: AtomicBool,
}
#[derive(Clone, Debug)]
struct OptionSet {
socket: SocketOptionSet,
}
impl OptionSet {
pub(self) fn new() -> Self {
Self {
socket: SocketOptionSet::new_unix_datagram(),
}
}
}
impl UnixDatagramSocket {
pub fn new(is_nonblocking: bool) -> Arc<Self> {
Arc::new(Self::new_raw(is_nonblocking))
}
pub fn new_pair(is_nonblocking: bool) -> (Arc<Self>, Arc<Self>) {
let mut socket_a = Self::new_raw(is_nonblocking);
let mut socket_b = Self::new_raw(is_nonblocking);
let remote_queue_a = socket_a.remote_queue.get_mut();
let remote_queue_b = socket_b.remote_queue.get_mut();
*remote_queue_a = Some(socket_b.local_receiver.queue().clone());
*remote_queue_b = Some(socket_a.local_receiver.queue().clone());
(Arc::new(socket_a), Arc::new(socket_b))
}
fn new_raw(is_nonblocking: bool) -> Self {
Self {
local_receiver: MessageReceiver::new(),
remote_queue: RwLock::new(None),
options: RwLock::new(OptionSet::new()),
is_nonblocking: AtomicBool::new(is_nonblocking),
is_write_shutdown: AtomicBool::new(false),
}
}
fn do_send(
&self,
reader: &mut dyn MultiRead,
mut aux_data: AuxiliaryData,
remote: Option<UnixSocketAddr>,
_flags: SendRecvFlags,
) -> Result<usize> {
if self.is_write_shutdown.load(Ordering::Relaxed) {
return_errno_with_message!(Errno::EPIPE, "the socket is shut down for writing");
}
let queue = if let Some(remote_addr) = remote.as_ref() {
let connected_addr = remote_addr.connect()?;
MessageQueue::lookup_bound(&connected_addr)?
} else {
let remote_queue = self.remote_queue.read();
remote_queue.clone().ok_or_else(|| {
Error::with_message(Errno::ENOTCONN, "the socket is not connected")
})?
};
let res = if self.is_nonblocking() {
queue.try_send(reader, &mut aux_data, &self.local_receiver)
} else {
queue.block_send(|| queue.try_send(reader, &mut aux_data, &self.local_receiver))
};
// A connected socket will automatically be disconnected if the remote has been closed.
if remote.is_none() && res.is_err_and(|err| err.error() == Errno::ECONNREFUSED) {
let mut remote_queue = self.remote_queue.write();
// Check to ensure that we are still connected to the same remote.
if remote_queue
.as_ref()
.is_some_and(|remote| Arc::ptr_eq(remote, &queue))
{
*remote_queue = None;
}
}
res
}
fn check_io_events(&self) -> IoEvents {
// POLLOUT should be reported as long as there is space in the socket's send buffer.
// Currently, we only limit the size of the receive buffer, not the send buffer. Therefore,
// POLLOUT is always reported.
let mut io_events = IoEvents::OUT;
io_events |= self.local_receiver.check_io_events();
if self.is_write_shutdown.load(Ordering::Relaxed) && io_events.contains(IoEvents::RDHUP) {
io_events |= IoEvents::HUP;
}
io_events
}
}
impl Pollable for UnixDatagramSocket {
fn poll(&self, mask: IoEvents, poller: Option<&mut PollHandle>) -> IoEvents {
self.local_receiver
.pollee()
.poll_with(mask, poller, || self.check_io_events())
}
}
impl SocketPrivate for UnixDatagramSocket {
fn is_nonblocking(&self) -> bool {
self.is_nonblocking.load(Ordering::Relaxed)
}
fn set_nonblocking(&self, nonblocking: bool) {
self.is_nonblocking.store(nonblocking, Ordering::Relaxed);
}
}
impl Socket for UnixDatagramSocket {
fn bind(&self, socket_addr: SocketAddr) -> Result<()> {
let addr = UnixSocketAddr::try_from(socket_addr)?;
self.local_receiver.bind(addr)
}
fn connect(&self, socket_addr: SocketAddr) -> Result<()> {
let remote_addr = UnixSocketAddr::try_from(socket_addr)?;
let connected_addr = remote_addr.connect()?;
let queue = MessageQueue::lookup_bound(&connected_addr)?;
let mut remote_queue = self.remote_queue.write();
*remote_queue = Some(queue);
Ok(())
}
fn shutdown(&self, cmd: SockShutdownCmd) -> Result<()> {
let mut io_events = IoEvents::empty();
if cmd.shut_read() {
self.local_receiver.shutdown();
io_events |= IoEvents::IN | IoEvents::RDHUP | IoEvents::HUP;
}
if cmd.shut_write() {
self.is_write_shutdown.store(true, Ordering::Relaxed);
io_events |= IoEvents::HUP;
}
self.local_receiver.pollee().notify(io_events);
Ok(())
}
fn addr(&self) -> Result<SocketAddr> {
Ok(self.local_receiver.addr().into())
}
fn peer_addr(&self) -> Result<SocketAddr> {
let remote_queue = self.remote_queue.read();
match remote_queue.as_ref() {
Some(queue) => Ok(queue.addr().into()),
None => return_errno_with_message!(Errno::ENOTCONN, "the socket is not connected"),
}
}
fn get_option(&self, option: &mut dyn SocketOption) -> Result<()> {
let options = self.options.read();
// Deal with socket-level options
match options.socket.get_option(option, &self.local_receiver) {
Err(err) if err.error() == Errno::ENOPROTOOPT => (),
res => return res,
}
// TODO: Deal with socket options from other levels
warn!("only socket-level options are supported");
return_errno_with_message!(Errno::ENOPROTOOPT, "the socket option to get is unknown")
}
fn set_option(&self, option: &dyn SocketOption) -> Result<()> {
let mut options = self.options.write();
match options.socket.set_option(option, &self.local_receiver) {
Ok(_) => Ok(()),
Err(err) if err.error() == Errno::ENOPROTOOPT => {
// TODO: Deal with socket options from other levels
warn!("only socket-level options are supported");
return_errno_with_message!(
Errno::ENOPROTOOPT,
"the socket option to get is unknown"
)
}
Err(e) => Err(e),
}
}
fn sendmsg(
&self,
reader: &mut dyn MultiRead,
message_header: MessageHeader,
flags: SendRecvFlags,
) -> Result<usize> {
// TODO: Deal with flags
if !flags.is_all_supported() {
warn!("unsupported flags: {:?}", flags);
}
let MessageHeader {
addr,
control_messages,
} = message_header;
let remote_addr = match addr {
Some(addr) => Some(addr.try_into()?),
None => None,
};
let auxiliary_data = AuxiliaryData::from_control(control_messages)?;
self.do_send(reader, auxiliary_data, remote_addr, flags)
}
fn recvmsg(
&self,
writer: &mut dyn MultiWrite,
flags: SendRecvFlags,
) -> Result<(usize, MessageHeader)> {
// TODO: Deal with flags
if !flags.is_all_supported() {
warn!("unsupported flags: {:?}", flags);
}
let (received_bytes, control_messages, peer_addr) =
self.block_on(IoEvents::IN, || self.local_receiver.try_recv(writer))?;
let message_header = MessageHeader::new(Some(peer_addr.into()), control_messages);
Ok((received_bytes, message_header))
}
}
impl GetSocketLevelOption for MessageReceiver {
fn is_listening(&self) -> bool {
false
}
}
impl SetSocketLevelOption for MessageReceiver {
fn set_pass_cred(&self, pass_cred: bool) {
// TODO: According to the Linux man pages, "When this option is set and the socket
// is not yet connected, a unique name in the abstract namespace will be generated
// automatically." See <https://man7.org/linux/man-pages/man7/unix.7.html> for
// details.
self.set_pass_cred(pass_cred);
}
}

View File

@ -3,11 +3,14 @@
mod addr;
mod cred;
mod ctrl_msg;
mod datagram;
mod ns;
mod stream;
pub use addr::UnixSocketAddr;
pub use cred::CUserCred;
pub(super) use ctrl_msg::UnixControlMessage;
pub use datagram::UnixDatagramSocket;
pub(super) use datagram::UNIX_DATAGRAM_DEFAULT_BUF_SIZE;
pub use stream::UnixStreamSocket;
pub(super) use stream::UNIX_STREAM_DEFAULT_BUF_SIZE;

View File

@ -14,7 +14,7 @@ use crate::{
AcceptConn, KeepAlive, Linger, PassCred, PeerCred, PeerGroups, Priority, RecvBuf,
RecvBufForce, ReuseAddr, ReusePort, SendBuf, SendBufForce, SocketOption,
},
unix::{CUserCred, UNIX_STREAM_DEFAULT_BUF_SIZE},
unix::{CUserCred, UNIX_DATAGRAM_DEFAULT_BUF_SIZE, UNIX_STREAM_DEFAULT_BUF_SIZE},
},
prelude::*,
process::{credentials::capabilities::CapSet, posix_thread::AsPosixThread},
@ -77,6 +77,15 @@ impl SocketOptionSet {
}
}
/// Returns the default socket level options for unix datagram socket.
pub(in crate::net) fn new_unix_datagram() -> Self {
Self {
send_buf: UNIX_DATAGRAM_DEFAULT_BUF_SIZE as u32,
recv_buf: UNIX_DATAGRAM_DEFAULT_BUF_SIZE as u32,
..Default::default()
}
}
/// Gets socket-level options.
///
/// Note that the socket error has to be handled separately, because it is automatically

View File

@ -8,7 +8,7 @@ use crate::{
netlink::{
is_valid_protocol, NetlinkRouteSocket, NetlinkUeventSocket, StandardNetlinkProtocol,
},
unix::UnixStreamSocket,
unix::{UnixDatagramSocket, UnixStreamSocket},
vsock::VsockStreamSocket,
},
prelude::*,
@ -32,6 +32,9 @@ pub fn sys_socket(domain: i32, type_: i32, protocol: i32, ctx: &Context) -> Resu
(CSocketAddrFamily::AF_UNIX, SockType::SOCK_SEQPACKET) => {
UnixStreamSocket::new(is_nonblocking, true) as Arc<dyn FileLike>
}
(CSocketAddrFamily::AF_UNIX, SockType::SOCK_RAW | SockType::SOCK_DGRAM) => {
UnixDatagramSocket::new(is_nonblocking) as Arc<dyn FileLike>
}
(CSocketAddrFamily::AF_INET, SockType::SOCK_STREAM) => {
let protocol = Protocol::try_from(protocol)?;
debug!("protocol = {:?}", protocol);

View File

@ -2,8 +2,11 @@
use super::SyscallReturn;
use crate::{
fs::file_table::{FdFlags, FileDesc},
net::socket::unix::UnixStreamSocket,
fs::{
file_handle::FileLike,
file_table::{FdFlags, FileDesc},
},
net::socket::unix::{UnixDatagramSocket, UnixStreamSocket},
prelude::*,
util::net::{CSocketAddrFamily, Protocol, SockFlags, SockType, SOCK_TYPE_MASK},
};
@ -24,18 +27,27 @@ pub fn sys_socketpair(
domain, sock_type, sock_flags, protocol
);
// TODO: deal with all sock_flags and protocol
macro_rules! file_pair {
($expr:expr) => {{
let (socket_a, socket_b) = $expr;
(socket_a as Arc<dyn FileLike>, socket_b as Arc<dyn FileLike>)
}};
}
let nonblocking = sock_flags.contains(SockFlags::SOCK_NONBLOCK);
let (socket_a, socket_b) = match (domain, sock_type) {
(CSocketAddrFamily::AF_UNIX, SockType::SOCK_STREAM) => {
UnixStreamSocket::new_pair(nonblocking, false)
file_pair!(UnixStreamSocket::new_pair(nonblocking, false))
}
(CSocketAddrFamily::AF_UNIX, SockType::SOCK_SEQPACKET) => {
UnixStreamSocket::new_pair(nonblocking, true)
file_pair!(UnixStreamSocket::new_pair(nonblocking, true))
}
(CSocketAddrFamily::AF_UNIX, SockType::SOCK_RAW | SockType::SOCK_DGRAM) => {
file_pair!(UnixDatagramSocket::new_pair(nonblocking))
}
_ => return_errno_with_message!(
Errno::EAFNOSUPPORT,
"cannot create socket pair for this family"
"creating a socket pair for this family is not supported"
),
};

View File

@ -0,0 +1,483 @@
// SPDX-License-Identifier: MPL-2.0
#define _GNU_SOURCE
#include <unistd.h>
#include <stddef.h>
#include <sys/poll.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/wait.h>
#include "../test.h"
static int sk_unbound;
static int sk_bound;
static int sk_connected;
#define UNIX_ADDR(path) \
((struct sockaddr_un){ .sun_family = AF_UNIX, .sun_path = path })
#define PATH_OFFSET offsetof(struct sockaddr_un, sun_path)
#define UNNAMED_ADDR UNIX_ADDR("")
#define UNNAMED_ADDRLEN PATH_OFFSET
#define BOUND_ADDR UNIX_ADDR("//tmp/B0")
#define BOUND_ADDRLEN (PATH_OFFSET + 9)
FN_SETUP(unbound)
{
sk_unbound = CHECK(socket(PF_UNIX, SOCK_DGRAM | SOCK_NONBLOCK, 0));
}
END_SETUP()
FN_SETUP(bound)
{
sk_bound = CHECK(socket(PF_UNIX, SOCK_DGRAM | SOCK_NONBLOCK, 0));
CHECK(bind(sk_bound, (struct sockaddr *)&BOUND_ADDR, BOUND_ADDRLEN));
}
END_SETUP()
FN_SETUP(connected)
{
sk_connected = CHECK(socket(PF_UNIX, SOCK_DGRAM | SOCK_NONBLOCK, 0));
CHECK(connect(sk_connected, (struct sockaddr *)&BOUND_ADDR,
BOUND_ADDRLEN));
}
END_SETUP()
FN_TEST(getsockname)
{
struct sockaddr_un addr;
socklen_t addrlen;
addrlen = sizeof(addr);
TEST_RES(getsockname(sk_unbound, (struct sockaddr *)&addr, &addrlen),
addrlen == UNNAMED_ADDRLEN &&
memcmp(&addr, &UNNAMED_ADDR, UNNAMED_ADDRLEN) == 0);
addrlen = sizeof(addr);
TEST_RES(getsockname(sk_bound, (struct sockaddr *)&addr, &addrlen),
addrlen == BOUND_ADDRLEN &&
memcmp(&addr, &BOUND_ADDR, BOUND_ADDRLEN) == 0);
addrlen = sizeof(addr);
TEST_RES(getsockname(sk_connected, (struct sockaddr *)&addr, &addrlen),
addrlen == UNNAMED_ADDRLEN &&
memcmp(&addr, &UNNAMED_ADDR, UNNAMED_ADDRLEN) == 0);
}
END_TEST()
FN_TEST(getpeername)
{
struct sockaddr_un addr;
socklen_t addrlen;
addrlen = sizeof(addr);
TEST_ERRNO(getpeername(sk_unbound, (struct sockaddr *)&addr, &addrlen),
ENOTCONN);
addrlen = sizeof(addr);
TEST_ERRNO(getpeername(sk_bound, (struct sockaddr *)&addr, &addrlen),
ENOTCONN);
addrlen = sizeof(addr);
TEST_RES(getpeername(sk_connected, (struct sockaddr *)&addr, &addrlen),
addrlen == BOUND_ADDRLEN &&
memcmp(&addr, &BOUND_ADDR, BOUND_ADDRLEN) == 0);
}
END_TEST()
FN_TEST(bind)
{
TEST_ERRNO(bind(sk_bound, (struct sockaddr *)&UNIX_ADDR("\0Z"),
PATH_OFFSET + 1),
EINVAL);
TEST_SUCC(bind(sk_bound, (struct sockaddr *)&UNNAMED_ADDR,
UNNAMED_ADDRLEN));
}
END_TEST()
FN_TEST(bind_connected)
{
int fildes[2], sk;
struct sockaddr_un addr;
socklen_t addrlen;
TEST_SUCC(socketpair(PF_UNIX, SOCK_DGRAM, 0, fildes));
sk = TEST_SUCC(socket(PF_UNIX, SOCK_DGRAM, 0));
TEST_SUCC(bind(fildes[0], (struct sockaddr *)&UNIX_ADDR("\0X"),
PATH_OFFSET + 2));
addrlen = sizeof(addr);
TEST_RES(getpeername(fildes[1], (struct sockaddr *)&addr, &addrlen),
addrlen == PATH_OFFSET + 2 && memcmp(&addr, &UNIX_ADDR("\0X"),
PATH_OFFSET + 2) == 0);
TEST_SUCC(bind(fildes[1], (struct sockaddr *)&UNIX_ADDR("\0Y"),
PATH_OFFSET + 2));
addrlen = sizeof(addr);
TEST_RES(getpeername(fildes[0], (struct sockaddr *)&addr, &addrlen),
addrlen == PATH_OFFSET + 2 && memcmp(&addr, &UNIX_ADDR("\0Y"),
PATH_OFFSET + 2) == 0);
TEST_ERRNO(bind(fildes[0], (struct sockaddr *)&UNIX_ADDR("\0Z"),
PATH_OFFSET + 2),
EINVAL);
TEST_ERRNO(bind(fildes[1], (struct sockaddr *)&UNIX_ADDR("\0Z"),
PATH_OFFSET + 2),
EINVAL);
TEST_SUCC(bind(fildes[0], (struct sockaddr *)&UNNAMED_ADDR,
UNNAMED_ADDRLEN));
TEST_SUCC(bind(fildes[1], (struct sockaddr *)&UNNAMED_ADDR,
UNNAMED_ADDRLEN));
// Closing the socket will release the bound address.
// So another socket can bind to it again.
TEST_ERRNO(bind(sk, (struct sockaddr *)&UNIX_ADDR("\0X"),
PATH_OFFSET + 2),
EADDRINUSE);
TEST_SUCC(close(fildes[0]));
TEST_SUCC(bind(sk, (struct sockaddr *)&UNIX_ADDR("\0X"),
PATH_OFFSET + 2));
// But the released address is still "visible" from
// the previously connected socket.
addrlen = sizeof(addr);
TEST_RES(getpeername(fildes[1], (struct sockaddr *)&addr, &addrlen),
addrlen == PATH_OFFSET + 2 && memcmp(&addr, &UNIX_ADDR("\0X"),
PATH_OFFSET + 2) == 0);
TEST_SUCC(close(fildes[1]));
TEST_SUCC(close(sk));
}
END_TEST()
FN_TEST(connect)
{
TEST_ERRNO(connect(sk_unbound, (struct sockaddr *)&UNIX_ADDR("\0X"),
PATH_OFFSET + 2),
ECONNREFUSED);
TEST_ERRNO(connect(sk_bound, (struct sockaddr *)&UNIX_ADDR("\0X"),
PATH_OFFSET + 2),
ECONNREFUSED);
TEST_SUCC(connect(sk_connected, (struct sockaddr *)&BOUND_ADDR,
BOUND_ADDRLEN));
TEST_ERRNO(connect(sk_connected, (struct sockaddr *)&UNIX_ADDR("\0X"),
PATH_OFFSET + 2),
ECONNREFUSED);
}
END_TEST()
FN_TEST(listen)
{
TEST_ERRNO(listen(sk_unbound, 10), EOPNOTSUPP);
TEST_ERRNO(listen(sk_bound, 10), EOPNOTSUPP);
TEST_ERRNO(listen(sk_connected, 10), EOPNOTSUPP);
}
END_TEST()
FN_TEST(accept)
{
TEST_ERRNO(accept(sk_unbound, NULL, NULL), EOPNOTSUPP);
TEST_ERRNO(accept(sk_bound, NULL, NULL), EOPNOTSUPP);
TEST_ERRNO(accept(sk_connected, NULL, NULL), EOPNOTSUPP);
}
END_TEST()
FN_TEST(send)
{
char buf[1] = { 'z' };
TEST_ERRNO(send(sk_unbound, buf, 1, 0), ENOTCONN);
TEST_ERRNO(send(sk_unbound, buf, 0, 0), ENOTCONN);
TEST_ERRNO(write(sk_unbound, buf, 1), ENOTCONN);
TEST_ERRNO(write(sk_unbound, buf, 0), ENOTCONN);
TEST_ERRNO(send(sk_bound, buf, 1, 0), ENOTCONN);
TEST_ERRNO(send(sk_bound, buf, 0, 0), ENOTCONN);
TEST_ERRNO(write(sk_bound, buf, 1), ENOTCONN);
TEST_ERRNO(write(sk_bound, buf, 0), ENOTCONN);
}
END_TEST()
FN_TEST(recv)
{
char buf[1] = { 'z' };
TEST_ERRNO(recv(sk_unbound, buf, 1, 0), EAGAIN);
TEST_ERRNO(recv(sk_unbound, buf, 0, 0), EAGAIN);
TEST_ERRNO(read(sk_unbound, buf, 1), EAGAIN);
TEST_SUCC(read(sk_unbound, buf, 0));
TEST_ERRNO(recv(sk_bound, buf, 1, 0), EAGAIN);
TEST_ERRNO(recv(sk_bound, buf, 0, 0), EAGAIN);
TEST_ERRNO(read(sk_bound, buf, 1), EAGAIN);
TEST_SUCC(read(sk_bound, buf, 0));
}
END_TEST()
FN_TEST(blocking_recv)
{
int i;
int sk1, sk2;
int pid;
char buf[20];
// Setup
sk1 = TEST_SUCC(socket(PF_UNIX, SOCK_DGRAM, 0));
TEST_SUCC(bind(sk1, (struct sockaddr *)&UNIX_ADDR("\0"),
PATH_OFFSET + 1));
sk2 = TEST_SUCC(socket(PF_UNIX, SOCK_DGRAM | SOCK_NONBLOCK, 0));
TEST_SUCC(connect(sk2, (struct sockaddr *)&UNIX_ADDR("\0"),
PATH_OFFSET + 1));
#define MAKE_TEST(child, retval) \
pid = TEST_SUCC(fork()); \
if (pid == 0) { \
usleep(300 * 1000); \
CHECK(child); \
exit(0); \
} \
\
TEST_RES(recv(sk1, buf, sizeof(buf), 0), _ret == retval); \
TEST_SUCC(wait(NULL));
// Test 1: Sends a message resumes the blocked receiving
MAKE_TEST(send(sk2, "hello", 5, 0), 5);
// Test 2: Shuts down for reading resumes the blocked receiving
MAKE_TEST(shutdown(sk1, SHUT_RD), 0);
#undef MAKE_TEST
// Clean up
TEST_SUCC(close(sk1));
TEST_SUCC(close(sk2));
}
END_TEST()
FN_TEST(send_recv_trunc)
{
char buf[1];
TEST_SUCC(send(sk_connected, "abc", 3, 0));
TEST_SUCC(send(sk_connected, "def", 3, 0));
TEST_SUCC(send(sk_connected, "hij", 3, 0));
TEST_RES(recv(sk_bound, buf, 1, 0), _ret == 1 && buf[0] == 'a');
TEST_RES(recv(sk_bound, buf, 0, 0), _ret == 0);
TEST_RES(recv(sk_bound, buf, 1, 0), _ret == 1 && buf[0] == 'h');
}
END_TEST()
FN_TEST(send_recv_zero)
{
char buf[1];
buf[0] = 'a';
TEST_SUCC(send(sk_connected, buf, 1, 0));
buf[0] = 'b';
TEST_SUCC(send(sk_connected, buf, 0, 0));
buf[0] = 'c';
TEST_SUCC(send(sk_connected, buf, 0, 0));
buf[0] = 'd';
TEST_SUCC(send(sk_connected, buf, 1, 0));
TEST_RES(recv(sk_bound, buf, 1, 0), _ret == 1 && buf[0] == 'a');
TEST_RES(recv(sk_bound, buf, 1, 0), _ret == 0 && buf[0] == 'a');
TEST_RES(recv(sk_bound, buf, 1, 0), _ret == 0 && buf[0] == 'a');
TEST_RES(recv(sk_bound, buf, 1, 0), _ret == 1 && buf[0] == 'd');
}
END_TEST()
FN_TEST(shutdown_connected)
{
int fildes[2];
TEST_SUCC(socketpair(PF_UNIX, SOCK_DGRAM, 0, fildes));
TEST_SUCC(shutdown(fildes[0], SHUT_RD));
TEST_SUCC(shutdown(fildes[0], SHUT_WR));
TEST_SUCC(shutdown(fildes[0], SHUT_RDWR));
TEST_SUCC(shutdown(fildes[0], SHUT_RD));
TEST_SUCC(shutdown(fildes[0], SHUT_WR));
TEST_SUCC(shutdown(fildes[0], SHUT_RDWR));
TEST_SUCC(close(fildes[0]));
TEST_SUCC(close(fildes[1]));
}
END_TEST()
FN_TEST(shutdown_close_send)
{
int fildes[2];
struct sockaddr_un addr;
socklen_t addrlen;
TEST_SUCC(socketpair(PF_UNIX, SOCK_DGRAM, 0, fildes));
TEST_SUCC(bind(fildes[0], (struct sockaddr *)&UNIX_ADDR("\0X"),
PATH_OFFSET + 2));
// Test 1: Sending a message after shutting down the receiver.
TEST_SUCC(shutdown(fildes[0], SHUT_RDWR));
TEST_ERRNO(send(fildes[1], "", 0, 0), EPIPE);
// The socket is still connected.
addrlen = sizeof(addr);
TEST_RES(getpeername(fildes[1], (struct sockaddr *)&addr, &addrlen),
addrlen == PATH_OFFSET + 2 && memcmp(&addr, &UNIX_ADDR("\0X"),
PATH_OFFSET + 2) == 0);
// Test 2: Sending a message after closing the receiver.
TEST_SUCC(close(fildes[0]));
TEST_ERRNO(send(fildes[1], "", 0, 0), ECONNREFUSED);
// The socket is no longer connected.
TEST_ERRNO(send(fildes[1], "", 0, 0), ENOTCONN);
TEST_ERRNO(getpeername(fildes[1], (struct sockaddr *)&addr, &addrlen),
ENOTCONN);
TEST_SUCC(close(fildes[1]));
}
END_TEST()
FN_TEST(poll)
{
int sk;
struct pollfd pfd = { .events = POLLIN | POLLOUT | POLLRDHUP };
sk = TEST_SUCC(socket(PF_UNIX, SOCK_DGRAM, 0));
pfd.fd = sk;
TEST_RES(poll(&pfd, 1, 0), pfd.revents == POLLOUT);
TEST_SUCC(shutdown(sk, SHUT_WR));
TEST_RES(poll(&pfd, 1, 0), pfd.revents == POLLOUT);
TEST_SUCC(shutdown(sk, SHUT_RD));
TEST_RES(poll(&pfd, 1, 0),
pfd.revents == (POLLIN | POLLOUT | POLLRDHUP | POLLHUP));
TEST_SUCC(
bind(sk, (struct sockaddr *)&UNIX_ADDR("\0"), PATH_OFFSET + 1));
TEST_RES(poll(&pfd, 1, 0),
pfd.revents == (POLLIN | POLLOUT | POLLRDHUP | POLLHUP));
TEST_SUCC(connect(sk, (struct sockaddr *)&BOUND_ADDR, BOUND_ADDRLEN));
TEST_RES(poll(&pfd, 1, 0),
pfd.revents == (POLLIN | POLLOUT | POLLRDHUP | POLLHUP));
TEST_SUCC(close(sk));
}
END_TEST()
FN_TEST(poll_connected_close)
{
int fildes[2];
struct pollfd pfd = { .events = POLLIN | POLLOUT | POLLRDHUP };
TEST_SUCC(socketpair(PF_UNIX, SOCK_DGRAM, 0, fildes));
pfd.fd = fildes[1];
TEST_RES(poll(&pfd, 1, 0), pfd.revents == POLLOUT);
TEST_SUCC(close(fildes[0]));
pfd.fd = fildes[1];
TEST_RES(poll(&pfd, 1, 0), pfd.revents == POLLOUT);
TEST_SUCC(close(fildes[1]));
}
END_TEST()
FN_TEST(poll_connected_shutdown)
{
int fildes[2];
struct pollfd pfd = { .events = POLLIN | POLLOUT | POLLRDHUP };
#define MAKE_TEST(shut, ev1) \
TEST_SUCC(socketpair(PF_UNIX, SOCK_DGRAM, 0, fildes)); \
\
TEST_SUCC(shutdown(fildes[0], shut)); \
\
pfd.fd = fildes[0]; \
TEST_RES(poll(&pfd, 1, 0), pfd.revents == (ev1)); \
\
pfd.fd = fildes[1]; \
TEST_RES(poll(&pfd, 1, 0), pfd.revents == POLLOUT); \
\
TEST_SUCC(close(fildes[0])); \
TEST_SUCC(close(fildes[1]));
MAKE_TEST(SHUT_RD, POLLIN | POLLOUT | POLLRDHUP);
MAKE_TEST(SHUT_WR, POLLOUT);
MAKE_TEST(SHUT_RDWR, POLLIN | POLLOUT | POLLRDHUP | POLLHUP);
#undef MAKE_TEST
}
END_TEST()
// See also `zero_reads_always_succeed` in `pipe_err.c`
FN_TEST(zero_recvs_may_fail)
{
int fildes[2];
char buf[1] = { 'z' };
TEST_SUCC(socketpair(AF_UNIX, SOCK_DGRAM | SOCK_NONBLOCK, 0, fildes));
TEST_ERRNO(recv(fildes[0], buf, 0, 0), EAGAIN);
TEST_RES(send(fildes[1], buf, 1, 0), _ret == 1);
TEST_SUCC(recv(fildes[0], buf, 0, 0));
TEST_SUCC(close(fildes[0]));
TEST_SUCC(close(fildes[1]));
}
END_TEST()
// See also `zero_writes_always_succeed` in `pipe_err.c`
FN_TEST(zero_sends_may_fail)
{
int fildes[2];
char buf[1] = { 'z' };
TEST_SUCC(socketpair(AF_UNIX, SOCK_DGRAM | SOCK_NONBLOCK, 0, fildes));
TEST_SUCC(send(fildes[1], buf, 0, 0));
TEST_SUCC(close(fildes[0]));
TEST_ERRNO(send(fildes[1], buf, 0, 0), ECONNREFUSED);
TEST_SUCC(close(fildes[1]));
}
END_TEST()
FN_SETUP(cleanup)
{
CHECK(close(sk_unbound));
CHECK(close(sk_bound));
CHECK(close(sk_connected));
CHECK(unlink(BOUND_ADDR.sun_path));
}
END_SETUP()

View File

@ -36,6 +36,7 @@ sleep 0.2
./udp_err
./unix_stream_err
./unix_seqpacket_err
./unix_datagram_err
./netlink_route
./rtnl_err

View File

@ -51,9 +51,12 @@ TESTS ?= \
sigaltstack_test \
signalfd_test \
socket_netlink_route_test \
socket_unix_dgram_local_test \
socket_unix_dgram_non_blocking_test \
socket_unix_pair_test \
socket_unix_seqpacket_local_test \
socket_unix_stream_test \
socket_unix_unbound_dgram_test \
socket_unix_unbound_seqpacket_test \
socket_unix_unbound_stream_test \
stat_test \

View File

@ -0,0 +1,13 @@
# TODO: Support `MSG_DONTWAIT` and `MSG_PEEK`
DgramUnixSockets/NonStreamSocketPairTest.SplitRecv/*
DgramUnixSockets/NonStreamSocketPairTest.SinglePeek/*
DgramUnixSockets/NonStreamSocketPairTest.RecvmsgTruncPeekDontwaitZeroLen/*
# TODO: Support `SO_SNDTIMEO`
DgramUnixSockets/UnixNonStreamSocketPairTest.SendTimeout/*
# TODO: Support `MSG_TRUNC`
DgramUnixSockets/NonStreamSocketPairTest.MsgTruncTruncation/*
DgramUnixSockets/NonStreamSocketPairTest.MsgTruncTruncationRecvmsgMsghdrFlagMsgTrunc/*
DgramUnixSockets/NonStreamSocketPairTest.RecvmsgMsgTruncZeroLen/*
DgramUnixSockets/NonStreamSocketPairTest.RecvmsgMsgTruncMsgPeekZeroLen/*

View File

@ -1,9 +1,3 @@
# TODO: Support `SOCK_DGRAM` sockets
AllUnixDomainSockets/*/2
AllUnixDomainSockets/*/3
AllUnixDomainSockets/*/8
AllUnixDomainSockets/*/9
# TODO: Support the `recvmmsg` system call
AllUnixDomainSockets/UnixSocketPairTest.RecvmmsgTimeoutAfterRecv/*