Refactor `add_watch` and `remove_watch`

This commit is contained in:
Ruihan Li 2026-01-03 11:35:32 +08:00 committed by Tate, Hongliang Tian
parent 1ab7550c83
commit 4a93b34e3d
2 changed files with 68 additions and 106 deletions

View File

@ -6,17 +6,13 @@ use alloc::{
sync::{Arc, Weak},
};
use core::{
any::Any,
fmt::Display,
sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering},
};
use bitflags::bitflags;
use hashbrown::HashMap;
use ostd::{
mm::VmWriter,
sync::{Mutex, SpinLock},
};
use ostd::{mm::VmWriter, sync::SpinLock};
use crate::{
events::IoEvents,
@ -37,7 +33,7 @@ use crate::{
#[derive(Clone)]
struct SubscriberEntry {
inode: Weak<dyn Inode>,
subscriber: Weak<dyn FsEventSubscriber>,
subscriber: Weak<InotifySubscriber>,
}
/// A file-like object that provides inotify functionality.
@ -45,12 +41,10 @@ struct SubscriberEntry {
/// `InotifyFile` accepts events from multiple inotify subscribers (watches) on different inodes.
/// Users should read events from this file to receive notifications about filesystem changes.
pub struct InotifyFile {
// Lock to serialize watch updates and removals.
watch_lock: Mutex<()>,
// Next watch descriptor to allocate.
next_wd: AtomicU32,
// A map from watch descriptor to subscriber entry.
watch_map: RwLock<HashMap<u32, SubscriberEntry>>,
// A map from watch descriptors to subscriber entries.
watch_map: SpinLock<HashMap<u32, SubscriberEntry>>,
// Whether the file is opened in non-blocking mode.
is_nonblocking: AtomicBool,
// Bounded queue of inotify events.
@ -68,8 +62,9 @@ impl Drop for InotifyFile {
///
/// This will remove all subscribers from their inodes.
fn drop(&mut self) {
let mut watch_map = self.watch_map.write();
for (_, entry) in watch_map.iter() {
let watch_map = self.watch_map.get_mut();
for (_, entry) in watch_map.drain() {
let (Some(inode), Some(subscriber)) =
(entry.inode.upgrade(), entry.subscriber.upgrade())
else {
@ -79,12 +74,11 @@ impl Drop for InotifyFile {
if inode
.fs_event_publisher()
.unwrap()
.remove_subscriber(&subscriber)
.remove_subscriber(&(subscriber as _))
{
inode.fs().fs_event_subscriber_stats().remove_subscriber();
}
}
watch_map.clear();
}
}
@ -97,11 +91,10 @@ impl InotifyFile {
/// Creates a new inotify file.
pub fn new(is_nonblocking: bool) -> Result<Arc<Self>> {
Ok(Arc::new_cyclic(|weak_self| Self {
watch_lock: Mutex::new(()),
// Allocate watch descriptors from 1.
// Reference: <https://elixir.bootlin.com/linux/v6.17/source/fs/notify/inotify/inotify_user.c#L402>
next_wd: AtomicU32::new(1),
watch_map: RwLock::new(HashMap::new()),
watch_map: SpinLock::new(HashMap::new()),
is_nonblocking: AtomicBool::new(is_nonblocking),
event_queue: SpinLock::new(VecDeque::new()),
queue_capacity: DEFAULT_MAX_QUEUED_EVENTS,
@ -133,114 +126,79 @@ impl InotifyFile {
interesting: InotifyEvents,
options: InotifyControls,
) -> Result<u32> {
// Serialize updates so concurrent callers do not create duplicate watches.
let _guard = self.watch_lock.lock();
let mut watch_map = self.watch_map.lock();
// Try to update existing subscriber first
match self.update_existing_subscriber(path, interesting, options) {
Ok(wd) => Ok(wd),
Err(e) if e.error() == Errno::ENOENT => {
// Subscriber not found, create a new one
self.create_new_subscriber(path, interesting, options)
// Try to find and update the existing subscriber first.
let inode_weak = Arc::downgrade(path.inode());
for (wd, entry) in watch_map.iter() {
if !Weak::ptr_eq(&entry.inode, &inode_weak) {
continue;
}
Err(e) => Err(e),
// The inode has been unlinked and the subscriber is dead. We shouldn't need to update
// since no new events can occur.
let Some(subscriber) = entry.subscriber.upgrade() else {
return Ok(*wd);
};
subscriber.update(interesting, options)?;
path.inode()
.fs_event_publisher()
.unwrap()
.update_subscriber_events();
return Ok(*wd);
}
// Create a new subscriber and register it.
let inotify_subscriber = InotifySubscriber::new(self.this(), interesting, options)?;
let subscriber = inotify_subscriber.clone() as Arc<dyn FsEventSubscriber>;
let inode = path.inode();
if inode
.fs_event_publisher_or_init()
.add_subscriber(subscriber)
{
inode.fs().fs_event_subscriber_stats().add_subscriber();
}
let wd = inotify_subscriber.wd();
let entry = SubscriberEntry {
inode: inode_weak,
subscriber: Arc::downgrade(&inotify_subscriber),
};
watch_map.insert(wd, entry);
Ok(wd)
}
/// Removes a watch by watch descriptor.
pub fn remove_watch(&self, wd: u32) -> Result<()> {
let _guard = self.watch_lock.lock();
let mut watch_map = self.watch_map.lock();
let mut watch_map = self.watch_map.write();
let Some(entry) = watch_map.remove(&wd) else {
return_errno_with_message!(Errno::EINVAL, "watch not found");
return_errno_with_message!(Errno::EINVAL, "the inotify watch does not exist");
};
// When concurrent removal happens, the weak refs may have already been dropped.
// Try to upgrade the weak refs; if either side is gone, treat as already removed.
let (inode, subscriber) = match (entry.inode.upgrade(), entry.subscriber.upgrade()) {
(Some(i), Some(s)) => (i, s),
_ => return_errno_with_message!(Errno::EINVAL, "watch not found"),
(Some(inode), Some(subscriber)) => (inode, subscriber),
// The inode has been unlinked and the subscriber is dead. The watch is considered
// removed, so we return an error.
_ => return_errno_with_message!(Errno::EINVAL, "the inotify watch does not exist"),
};
if inode
.fs_event_publisher()
.unwrap()
.remove_subscriber(&subscriber)
.remove_subscriber(&(subscriber as _))
{
inode.fs().fs_event_subscriber_stats().remove_subscriber();
}
Ok(())
}
/// Updates an existing inotify subscriber.
fn update_existing_subscriber(
&self,
path: &Path,
interesting: InotifyEvents,
options: InotifyControls,
) -> Result<u32> {
let Some(publisher) = path.inode().fs_event_publisher() else {
return_errno_with_message!(Errno::ENOENT, "watch not found");
};
let inotify_file = self.this();
let result = publisher.find_subscriber_and_process(|subscriber| {
// Try to downcast to InotifySubscriber and check if it belongs to this InotifyFile.
let inotify_subscriber =
(subscriber.as_ref() as &dyn Any).downcast_ref::<InotifySubscriber>()?;
if Arc::ptr_eq(&inotify_subscriber.inotify_file(), &inotify_file) {
// Found the matching subscriber, perform the update in place.
Some(inotify_subscriber.update(interesting, options))
} else {
None
}
});
if let Some(result) = result {
// Notify publisher to recalculate aggregated events after subscriber update.
publisher.update_subscriber_events();
return result;
}
// If the subscriber is not found, return ENOENT.
return_errno_with_message!(Errno::ENOENT, "watch not found");
}
/// Creates a new FS event subscriber and activates it.
fn create_new_subscriber(
&self,
path: &Path,
interesting: InotifyEvents,
options: InotifyControls,
) -> Result<u32> {
let inotify_subscriber = InotifySubscriber::new(self.this(), interesting, options)?;
let subscriber = inotify_subscriber.clone() as Arc<dyn FsEventSubscriber>;
if path
.inode()
.fs_event_publisher_or_init()
.add_subscriber(subscriber.clone())
{
path.inode()
.fs()
.fs_event_subscriber_stats()
.add_subscriber();
}
let wd = inotify_subscriber.wd();
self.watch_map.write().insert(
wd,
SubscriberEntry {
inode: Arc::downgrade(path.inode()),
subscriber: Arc::downgrade(&subscriber),
},
);
Ok(wd)
}
/// Sends an inotify event to the inotify file.
///
/// The event will be queued and can be read by users.
@ -411,7 +369,7 @@ impl FileLike for InotifyFile {
writeln!(f, "mnt_id:\t{}", RESERVED_MOUNT_ID)?;
writeln!(f, "ino:\t{}", self.inner.inode().ino())?;
for (wd, entry) in self.inner.watch_map.read().iter() {
for (wd, entry) in self.inner.watch_map.lock().iter() {
let Some(inode) = entry.inode.upgrade() else {
continue;
};
@ -510,9 +468,9 @@ impl InotifySubscriber {
}
/// Updates the interesting events and options atomically.
fn update(&self, interesting: InotifyEvents, options: InotifyControls) -> Result<u32> {
fn update(&self, interesting: InotifyEvents, options: InotifyControls) -> Result<()> {
if options.contains(InotifyControls::MASK_CREATE) {
return_errno_with_message!(Errno::EEXIST, "watch already exists");
return_errno_with_message!(Errno::EEXIST, "the inotify watch already exists");
}
let mut merged_interesting = interesting;
@ -525,7 +483,8 @@ impl InotifySubscriber {
merged_options.remove(InotifyControls::MASK_ADD);
self.update_interesting_and_controls(merged_interesting.bits(), merged_options.bits());
Ok(self.wd())
Ok(())
}
/// Updates the interesting events and options atomically with raw bits.

View File

@ -133,7 +133,9 @@ impl FsEventPublisher {
/// Updates the aggregated events when a subscriber's interesting events change.
pub fn update_subscriber_events(&self) {
let subscribers = self.subscribers.read();
// Take a write lock to avoid race conditions that may change `all_interesting_events` to
// an outdated value.
let subscribers = self.subscribers.write();
self.recalc_interesting_events(&subscribers);
}
@ -151,6 +153,7 @@ impl FsEventPublisher {
///
/// The matcher should return `Some(T)` if the subscriber matches and processing
/// should stop, or `None` to continue searching.
#[expect(dead_code)]
pub fn find_subscriber_and_process<F, T>(&self, mut matcher: F) -> Option<T>
where
F: FnMut(&Arc<dyn FsEventSubscriber>) -> Option<T>,