Reorder methods and fix minor issues

This commit is contained in:
Ruihan Li 2025-09-06 15:56:52 +08:00 committed by Tate, Hongliang Tian
parent 75ca7e0377
commit 286d4d4466
6 changed files with 102 additions and 102 deletions

View File

@ -29,6 +29,15 @@ mod bound;
pub(super) mod observer;
mod unbound;
pub struct DatagramSocket {
// Lock order: `inner` first, `options` second
inner: RwMutex<Inner<UnboundDatagram, BoundDatagram>>,
options: RwLock<OptionSet>,
is_nonblocking: AtomicBool,
pollee: Pollee,
}
#[derive(Debug, Clone)]
struct OptionSet {
socket: SocketOptionSet,
@ -42,15 +51,6 @@ impl OptionSet {
}
}
pub struct DatagramSocket {
// Lock order: `inner` first, `options` second
inner: RwMutex<Inner<UnboundDatagram, BoundDatagram>>,
options: RwLock<OptionSet>,
is_nonblocking: AtomicBool,
pollee: Pollee,
}
impl DatagramSocket {
pub fn new(is_nonblocking: bool) -> Arc<Self> {
let unbound_datagram = UnboundDatagram::new();

View File

@ -54,7 +54,9 @@ mod util;
pub struct StreamSocket {
// Lock order: `state` first, `options` second
state: RwLock<Takeable<State>, PreemptDisabled>,
// FIXME: We perform userspace reads/writes when holding the spin locks (e.g., this state lock
// and other locks in `aster-bigtcp`), which will break the atomic mode.
state: RwLock<Takeable<State>>,
options: RwLock<OptionSet>,
is_nonblocking: AtomicBool,

View File

@ -118,7 +118,7 @@ pub trait Socket: private::SocketPrivate + Send + Sync {
/// and the message header.
fn recvmsg(
&self,
writers: &mut dyn MultiWrite,
writer: &mut dyn MultiWrite,
flags: SendRecvFlags,
) -> Result<(usize, MessageHeader)>;
}

View File

@ -153,10 +153,10 @@ where
fn recvmsg(
&self,
writers: &mut dyn MultiWrite,
writer: &mut dyn MultiWrite,
flags: SendRecvFlags,
) -> Result<(usize, MessageHeader)> {
let (received_len, addr) = self.block_on(IoEvents::IN, || self.try_recv(writers, flags))?;
let (received_len, addr) = self.block_on(IoEvents::IN, || self.try_recv(writer, flags))?;
// TODO: Receive control message

View File

@ -101,7 +101,7 @@ impl Drop for Listener {
fn drop(&mut self) {
self.backlog.shutdown();
unregister_backlog(&self.backlog.addr().to_key())
BACKLOG_TABLE.remove_backlog(&self.backlog.addr().to_key());
}
}
@ -141,17 +141,19 @@ impl BacklogTable {
is_shutdown,
is_seqpacket,
));
backlog_sockets.insert(addr_key, new_backlog.clone());
let old_backlog = backlog_sockets.insert(addr_key, new_backlog.clone());
debug_assert!(old_backlog.is_none());
Some(new_backlog)
}
fn get_backlog(&self, addr: &UnixSocketAddrKey) -> Option<Arc<Backlog>> {
self.backlog_sockets.read().get(addr).cloned()
fn get_backlog(&self, addr_key: &UnixSocketAddrKey) -> Option<Arc<Backlog>> {
self.backlog_sockets.read().get(addr_key).cloned()
}
fn remove_backlog(&self, addr_key: &UnixSocketAddrKey) {
self.backlog_sockets.write().remove(addr_key);
let old_backlog = self.backlog_sockets.write().remove(addr_key);
debug_assert!(old_backlog.is_some());
}
}
@ -160,7 +162,7 @@ pub(super) struct Backlog {
pollee: Pollee,
backlog: AtomicUsize,
incoming_conns: SpinLock<Option<VecDeque<Connected>>>,
wait_queue: WaitQueue,
connect_wait_queue: WaitQueue,
listener_cred: SocketCred<ReadDupOp>,
is_seqpacket: bool,
}
@ -184,7 +186,7 @@ impl Backlog {
pollee,
backlog: AtomicUsize::new(backlog),
incoming_conns: SpinLock::new(incoming_sockets),
wait_queue: WaitQueue::new(),
connect_wait_queue: WaitQueue::new(),
listener_cred: SocketCred::<ReadDupOp>::new_current(),
is_seqpacket,
}
@ -206,7 +208,7 @@ impl Backlog {
if conn.is_some() {
self.pollee.invalidate();
self.wait_queue.wake_one();
self.connect_wait_queue.wake_one();
}
conn.ok_or_else(|| Error::with_message(Errno::EAGAIN, "no pending connection is available"))
@ -216,7 +218,7 @@ impl Backlog {
let old_backlog = self.backlog.swap(backlog, Ordering::Relaxed);
if old_backlog < backlog {
self.wait_queue.wake_all();
self.connect_wait_queue.wake_all();
}
}
@ -224,7 +226,7 @@ impl Backlog {
*self.incoming_conns.lock() = None;
self.pollee.notify(SHUT_READ_EVENTS);
self.wait_queue.wake_all();
self.connect_wait_queue.wake_all();
}
fn is_shutdown(&self) -> bool {
@ -301,21 +303,19 @@ impl Backlog {
Ok(client_conn)
}
pub(super) fn pause_until<F>(&self, mut cond: F) -> Result<()>
/// Blocks until the backlogs are free and the `try_connect` succeeds, or until interrupted.
pub(super) fn block_connect<F>(&self, mut try_connect: F) -> Result<()>
where
F: FnMut() -> Result<()>,
{
self.wait_queue.pause_until(|| match cond() {
Err(err) if err.error() == Errno::EAGAIN => None,
result => Some(result),
})?
self.connect_wait_queue
.pause_until(|| match try_connect() {
Err(err) if err.error() == Errno::EAGAIN => None,
result => Some(result),
})?
}
}
fn unregister_backlog(addr: &UnixSocketAddrKey) {
BACKLOG_TABLE.remove_backlog(addr);
}
pub(super) fn get_backlog(server_key: &UnixSocketAddrKey) -> Result<Arc<Backlog>> {
BACKLOG_TABLE.get_backlog(server_key).ok_or_else(|| {
Error::with_message(

View File

@ -35,7 +35,7 @@ use crate::{
pub struct UnixStreamSocket {
// Lock order: `state` first, `options` second
state: RwMutex<Takeable<State>>,
options: RwMutex<OptionSet>,
options: RwLock<OptionSet>,
pollee: Pollee,
is_nonblocking: AtomicBool,
@ -43,34 +43,6 @@ pub struct UnixStreamSocket {
is_seqpacket: bool,
}
impl UnixStreamSocket {
pub(super) fn new_init(init: Init, is_nonblocking: bool, is_seqpacket: bool) -> Arc<Self> {
Arc::new(Self {
state: RwMutex::new(Takeable::new(State::Init(init))),
options: RwMutex::new(OptionSet::new()),
pollee: Pollee::new(),
is_nonblocking: AtomicBool::new(is_nonblocking),
is_seqpacket,
})
}
pub(super) fn new_connected(
connected: Connected,
options: OptionSet,
is_nonblocking: bool,
is_seqpacket: bool,
) -> Arc<Self> {
let cloned_pollee = connected.cloned_pollee();
Arc::new(Self {
state: RwMutex::new(Takeable::new(State::Connected(connected))),
options: RwMutex::new(options),
pollee: cloned_pollee,
is_nonblocking: AtomicBool::new(is_nonblocking),
is_seqpacket,
})
}
}
enum State {
Init(Init),
Listen(Listener),
@ -164,6 +136,16 @@ impl UnixStreamSocket {
Self::new_init(Init::new(), is_nonblocking, is_seqpacket)
}
fn new_init(init: Init, is_nonblocking: bool, is_seqpacket: bool) -> Arc<Self> {
Arc::new(Self {
state: RwMutex::new(Takeable::new(State::Init(init))),
options: RwLock::new(OptionSet::new()),
pollee: Pollee::new(),
is_nonblocking: AtomicBool::new(is_nonblocking),
is_seqpacket,
})
}
pub fn new_pair(is_nonblocking: bool, is_seqpacket: bool) -> (Arc<Self>, Arc<Self>) {
let cred = SocketCred::<ReadDupOp>::new_current();
let options = OptionSet::new();
@ -183,6 +165,22 @@ impl UnixStreamSocket {
)
}
pub(super) fn new_connected(
connected: Connected,
options: OptionSet,
is_nonblocking: bool,
is_seqpacket: bool,
) -> Arc<Self> {
let cloned_pollee = connected.cloned_pollee();
Arc::new(Self {
state: RwMutex::new(Takeable::new(State::Connected(connected))),
options: RwLock::new(options),
pollee: cloned_pollee,
is_nonblocking: AtomicBool::new(is_nonblocking),
is_seqpacket,
})
}
fn try_send(
&self,
buf: &mut dyn MultiRead,
@ -303,7 +301,7 @@ impl Socket for UnixStreamSocket {
if self.is_nonblocking() {
self.try_connect(&backlog)
} else {
backlog.pause_until(|| self.try_connect(&backlog))
backlog.block_connect(|| self.try_connect(&backlog))
}
}
@ -381,6 +379,46 @@ impl Socket for UnixStreamSocket {
Ok(peer_addr.into())
}
fn get_option(&self, option: &mut dyn SocketOption) -> Result<()> {
let state = self.state.read();
let options = self.options.read();
// Deal with UNIX-socket-specific socket-level options
match do_unix_getsockopt(option, state.as_ref()) {
Err(err) if err.error() == Errno::ENOPROTOOPT => (),
res => return res,
}
// Deal with socket-level options
match options.socket.get_option(option, state.as_ref()) {
Err(err) if err.error() == Errno::ENOPROTOOPT => (),
res => return res,
}
// TODO: Deal with socket options from other levels
warn!("only socket-level options are supported");
return_errno_with_message!(Errno::ENOPROTOOPT, "the socket option to get is unknown")
}
fn set_option(&self, option: &dyn SocketOption) -> Result<()> {
let state = self.state.read();
let mut options = self.options.write();
match options.socket.set_option(option, state.as_ref()) {
Ok(_) => Ok(()),
Err(err) if err.error() == Errno::ENOPROTOOPT => {
// TODO: Deal with socket options from other levels
warn!("only socket-level options are supported");
return_errno_with_message!(
Errno::ENOPROTOOPT,
"the socket option to get is unknown"
)
}
Err(e) => Err(e),
}
}
fn sendmsg(
&self,
reader: &mut dyn MultiRead,
@ -436,46 +474,6 @@ impl Socket for UnixStreamSocket {
Ok((received_bytes, message_header))
}
fn get_option(&self, option: &mut dyn SocketOption) -> Result<()> {
let state = self.state.read();
let options = self.options.read();
// Deal with UNIX-socket-specific socket-level options
match do_unix_getsockopt(option, state.as_ref()) {
Err(err) if err.error() == Errno::ENOPROTOOPT => (),
res => return res,
}
// Deal with socket-level options
match options.socket.get_option(option, state.as_ref()) {
Err(err) if err.error() == Errno::ENOPROTOOPT => (),
res => return res,
}
// TODO: Deal with socket options from other levels
warn!("only socket-level options are supported");
return_errno_with_message!(Errno::ENOPROTOOPT, "the socket option to get is unknown")
}
fn set_option(&self, option: &dyn SocketOption) -> Result<()> {
let mut state = self.state.write();
let mut options = self.options.write();
match options.socket.set_option(option, state.as_mut()) {
Ok(_) => Ok(()),
Err(err) if err.error() == Errno::ENOPROTOOPT => {
// TODO: Deal with socket options from other levels
warn!("only socket-level options are supported");
return_errno_with_message!(
Errno::ENOPROTOOPT,
"the socket option to get is unknown"
)
}
Err(e) => Err(e),
}
}
}
fn do_unix_getsockopt(option: &mut dyn SocketOption, state: &State) -> Result<()> {