Refactor the implementation of signalfd

This commit is contained in:
jiangjianfeng 2025-10-28 03:54:50 +00:00 committed by Tate, Hongliang Tian
parent d2b88f48ff
commit 6e8dac0c36
5 changed files with 65 additions and 158 deletions

View File

@ -6,17 +6,14 @@ use aster_rights::{ReadDupOp, ReadOp, WriteOp};
use ostd::sync::{RoArc, RwMutexReadGuard, Waker};
use super::{
signal::{
sig_mask::AtomicSigMask, sig_num::SigNum, sig_queues::SigQueues, signals::Signal,
SigEvents, SigEventsFilter,
},
signal::{sig_mask::AtomicSigMask, sig_num::SigNum, sig_queues::SigQueues, signals::Signal},
Credentials, Process,
};
use crate::{
events::Observer,
events::IoEvents,
fs::{file_table::FileTable, thread_info::ThreadFsInfo},
prelude::*,
process::{namespace::nsproxy::NsProxy, Pid},
process::{namespace::nsproxy::NsProxy, signal::PollHandle, Pid},
thread::{Thread, Tid},
time::{clocks::ProfClock, Timer, TimerManager},
};
@ -179,6 +176,13 @@ impl PosixThread {
self.wake_signalled_waker();
}
pub fn register_signalfd_poller(&self, poller: &mut PollHandle, mask: IoEvents) {
self.sig_queues.register_signalfd_poller(poller, mask);
self.process()
.sig_queues()
.register_signalfd_poller(poller, mask);
}
/// Returns a reference to the profiling clock of the current thread.
pub fn prof_clock(&self) -> &Arc<ProfClock> {
&self.prof_clock
@ -206,18 +210,6 @@ impl PosixThread {
self.prof_timer_manager.process_expired_timers();
}
pub fn register_sigqueue_observer(
&self,
observer: Weak<dyn Observer<SigEvents>>,
filter: SigEventsFilter,
) {
self.sig_queues.register_observer(observer, filter);
}
pub fn unregister_sigqueue_observer(&self, observer: &Weak<dyn Observer<SigEvents>>) {
self.sig_queues.unregister_observer(observer);
}
/// Gets the read-only credentials of the thread.
pub fn credentials(&self) -> Credentials<ReadOp> {
self.credentials.dup().restrict()

View File

@ -1,37 +0,0 @@
// SPDX-License-Identifier: MPL-2.0
use super::{sig_mask::SigMask, sig_num::SigNum};
use crate::{
events::{Events, EventsFilter},
prelude::*,
};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SigEvents(SigNum);
impl SigEvents {
pub fn new(sig_num: SigNum) -> Self {
Self(sig_num)
}
pub fn sig_num(&self) -> SigNum {
self.0
}
}
impl Events for SigEvents {}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SigEventsFilter(SigMask);
impl SigEventsFilter {
pub fn new(mask: SigMask) -> Self {
Self(mask)
}
}
impl EventsFilter<SigEvents> for SigEventsFilter {
fn filter(&self, event: &SigEvents) -> bool {
!self.0.contains(event.0)
}
}

View File

@ -2,7 +2,6 @@
pub mod c_types;
pub mod constants;
mod events;
mod pause;
mod pending;
mod poll;
@ -19,7 +18,6 @@ use core::sync::atomic::Ordering;
use align_ext::AlignExt;
use c_types::{siginfo_t, ucontext_t};
use constants::SIGSEGV;
pub use events::{SigEvents, SigEventsFilter};
use ostd::{
arch::cpu::context::{FpuContext, UserContext},
user::UserContextApi,

View File

@ -7,12 +7,11 @@ use super::{
sig_mask::{SigMask, SigSet},
sig_num::SigNum,
signals::Signal,
SigEvents, SigEventsFilter,
};
use crate::{
events::{Observer, Subject},
events::IoEvents,
prelude::*,
process::signal::sig_disposition::SigDispositions,
process::signal::{sig_disposition::SigDispositions, PollHandle, Pollee},
};
pub struct SigQueues {
@ -20,7 +19,7 @@ pub struct SigQueues {
// Useful for quickly determining if any signals are pending without locking `queues`.
count: AtomicUsize,
queues: Mutex<Queues>,
subject: Subject<SigEvents, SigEventsFilter>,
signalfd_pollee: Pollee,
}
impl SigQueues {
@ -28,7 +27,7 @@ impl SigQueues {
Self {
count: AtomicUsize::new(0),
queues: Mutex::new(Queues::new()),
subject: Subject::new(),
signalfd_pollee: Pollee::new(),
}
}
@ -37,14 +36,12 @@ impl SigQueues {
}
pub fn enqueue(&self, signal: Box<dyn Signal>) {
let signum = signal.num();
let mut queues = self.queues.lock();
if queues.enqueue(signal) {
self.count.fetch_add(1, Ordering::Relaxed);
// Avoid holding lock when notifying observers
drop(queues);
self.subject.notify_observers(&SigEvents::new(signum));
self.signalfd_pollee.notify(IoEvents::IN);
}
}
@ -86,16 +83,12 @@ impl SigQueues {
self.queues.lock().has_pending_signal(signum)
}
pub fn register_observer(
pub(in crate::process) fn register_signalfd_poller(
&self,
observer: Weak<dyn Observer<SigEvents>>,
filter: SigEventsFilter,
poller: &mut PollHandle,
mask: IoEvents,
) {
self.subject.register_observer(observer, filter);
}
pub fn unregister_observer(&self, observer: &Weak<dyn Observer<SigEvents>>) {
self.subject.unregister_observer(observer);
self.signalfd_pollee.register_poller(poller, mask);
}
}

View File

@ -12,7 +12,7 @@ use bitflags::bitflags;
use super::SyscallReturn;
use crate::{
events::{IoEvents, Observer},
events::IoEvents,
fs::{
file_handle::FileLike,
file_table::{get_file_fast, FdFlags, FileDesc},
@ -20,12 +20,12 @@ use crate::{
},
prelude::*,
process::{
posix_thread::AsPosixThread,
posix_thread::{AsPosixThread, PosixThread},
signal::{
constants::{SIGKILL, SIGSTOP},
sig_mask::{AtomicSigMask, SigMask},
signals::Signal,
PollHandle, Pollable, Pollee, SigEvents, SigEventsFilter,
HandlePendingSignal, PollHandle, Pollable, Poller,
},
Gid, Uid,
},
@ -89,10 +89,10 @@ fn create_new_signalfd(
non_blocking: bool,
fd_flags: FdFlags,
) -> Result<FileDesc> {
let atomic_mask = AtomicSigMask::new(mask);
let signal_file = SignalFile::new(atomic_mask, non_blocking);
register_observer(ctx, &signal_file, mask)?;
let signal_file = {
let atomic_mask = AtomicSigMask::new(mask);
Arc::new(SignalFile::new(atomic_mask, non_blocking))
};
let file_table = ctx.thread_local.borrow_file_table();
let fd = file_table.unwrap().write().insert(signal_file, fd_flags);
@ -118,17 +118,6 @@ fn update_existing_signalfd(
Ok(fd)
}
fn register_observer(ctx: &Context, signal_file: &Arc<SignalFile>, mask: SigMask) -> Result<()> {
// The `mask` specifies the set of signals that are accepted by the signalfd,
// so we need to filter out the signals that are not in the mask.
let filter = SigEventsFilter::new(!mask);
ctx.posix_thread
.register_sigqueue_observer(signal_file.observer_ref(), filter);
Ok(())
}
bitflags! {
/// Signal file descriptor creation flags
struct SignalFileFlags: u32 {
@ -144,42 +133,24 @@ bitflags! {
struct SignalFile {
/// Atomic signal mask for filtering signals
signals_mask: AtomicSigMask,
/// I/O event notifier
pollee: Pollee,
/// Non-blocking mode flag
non_blocking: AtomicBool,
/// Weak reference to self as an observer
weak_self: Weak<dyn Observer<SigEvents>>,
}
impl SignalFile {
/// Create a new signalfd instance
fn new(mask: AtomicSigMask, non_blocking: bool) -> Arc<Self> {
Arc::new_cyclic(|weak_ref| {
let weak_self = weak_ref.clone() as Weak<dyn Observer<SigEvents>>;
Self {
signals_mask: mask,
pollee: Pollee::new(),
non_blocking: AtomicBool::new(non_blocking),
weak_self,
}
})
fn new(mask: AtomicSigMask, non_blocking: bool) -> Self {
Self {
signals_mask: mask,
non_blocking: AtomicBool::new(non_blocking),
}
}
fn mask(&self) -> &AtomicSigMask {
&self.signals_mask
}
fn observer_ref(&self) -> Weak<dyn Observer<SigEvents>> {
self.weak_self.clone()
}
fn update_signal_mask(&self, new_mask: SigMask) -> Result<()> {
if let Some(thread) = current_thread!().as_posix_thread() {
thread.unregister_sigqueue_observer(&self.weak_self);
let filter = SigEventsFilter::new(!new_mask);
thread.register_sigqueue_observer(self.weak_self.clone(), filter);
}
self.signals_mask.store(new_mask, Ordering::Relaxed);
Ok(())
}
@ -193,14 +164,9 @@ impl SignalFile {
}
/// Check current readable I/O events
fn check_io_events(&self) -> IoEvents {
let current = current_thread!();
let Some(thread) = current.as_posix_thread() else {
return IoEvents::empty();
};
fn check_io_events(&self, posix_thread: &PosixThread) -> IoEvents {
let mask = self.signals_mask.load(Ordering::Relaxed);
if thread.sig_pending().intersects(mask) {
if posix_thread.pending_signals().intersects(mask) {
IoEvents::IN
} else {
IoEvents::empty()
@ -208,12 +174,7 @@ impl SignalFile {
}
/// Attempt non-blocking read operation
fn try_read(&self, writer: &mut VmWriter) -> Result<usize> {
let current = current_thread!();
let thread = current
.as_posix_thread()
.ok_or_else(|| Error::with_message(Errno::ESRCH, "Not a POSIX thread"))?;
fn try_read(&self, writer: &mut VmWriter, thread: &PosixThread) -> Result<usize> {
// Mask is inverted to get the signals that are not blocked
let mask = !self.signals_mask.load(Ordering::Relaxed);
let max_signals = writer.avail() / size_of::<SignalfdSiginfo>();
@ -224,7 +185,6 @@ impl SignalFile {
Some(signal) => {
writer.write_val(&signal.to_signalfd_siginfo())?;
count += 1;
self.pollee.invalidate();
}
None => break,
}
@ -237,25 +197,18 @@ impl SignalFile {
}
}
impl Observer<SigEvents> for SignalFile {
// TODO: Fix signal notifications.
// Child processes do not inherit the parent's observer mechanism for signal event notifications.
// `sys_poll` with blocking mode gets stuck if the signal is received after polling.
fn on_events(&self, events: &SigEvents) {
if self
.signals_mask
.load(Ordering::Relaxed)
.contains(events.sig_num())
{
self.pollee.notify(IoEvents::IN);
}
}
}
impl Pollable for SignalFile {
fn poll(&self, mask: IoEvents, poller: Option<&mut PollHandle>) -> IoEvents {
self.pollee
.poll_with(mask, poller, || self.check_io_events())
let current = current_thread!();
let Some(posix_thread) = current.as_posix_thread() else {
return IoEvents::empty();
};
if let Some(poller) = poller {
posix_thread.register_signalfd_poller(poller, mask);
}
self.check_io_events(posix_thread) & mask
}
}
@ -265,10 +218,27 @@ impl FileLike for SignalFile {
return_errno_with_message!(Errno::EINVAL, "Buffer too small for siginfo structure");
}
if self.is_non_blocking() {
self.try_read(writer)
} else {
self.wait_events(IoEvents::IN, None, || self.try_read(writer))
let thread = current_thread!();
let posix_thread = thread
.as_posix_thread()
.ok_or_else(|| Error::with_message(Errno::ESRCH, "Not a POSIX thread"))?;
// Fast path: There are already pending signals or the signalfd is non-blocking.
// So we don't need to create and register the poller.
match self.try_read(writer, posix_thread) {
Err(e) if e.error() == Errno::EAGAIN && !self.is_non_blocking() => {}
res => return res,
}
// Slow path
let mut poller = Poller::new(None);
posix_thread.register_signalfd_poller(poller.as_handle_mut(), IoEvents::IN);
loop {
match self.try_read(writer, posix_thread) {
Err(e) if e.error() == Errno::EAGAIN => poller.wait()?,
res => return res,
}
}
}
@ -310,15 +280,6 @@ impl FileLike for SignalFile {
}
}
impl Drop for SignalFile {
// TODO: Fix signal notifications. See `on_events` method.
fn drop(&mut self) {
if let Some(thread) = current_thread!().as_posix_thread() {
thread.unregister_sigqueue_observer(&self.weak_self);
}
}
}
#[repr(C)]
#[derive(Debug, Copy, Clone, Pod)]
struct SignalfdSiginfo {