Refactor named pipe to correct its opening and blocking behaviors
This commit is contained in:
parent
5eddf21596
commit
d42b006e1a
|
|
@ -5,6 +5,7 @@ use crate::{
|
|||
fs::{
|
||||
device::{Device, DeviceId, DeviceType},
|
||||
inode_handle::FileIo,
|
||||
utils::StatusFlags,
|
||||
},
|
||||
prelude::*,
|
||||
process::signal::{PollHandle, Pollable},
|
||||
|
|
@ -35,13 +36,13 @@ impl Pollable for Full {
|
|||
}
|
||||
|
||||
impl FileIo for Full {
|
||||
fn read(&self, writer: &mut VmWriter) -> Result<usize> {
|
||||
fn read(&self, writer: &mut VmWriter, _status_flags: StatusFlags) -> Result<usize> {
|
||||
let len = writer.avail();
|
||||
writer.fill_zeros(len)?;
|
||||
Ok(len)
|
||||
}
|
||||
|
||||
fn write(&self, _reader: &mut VmReader) -> Result<usize> {
|
||||
fn write(&self, _reader: &mut VmReader, _status_flags: StatusFlags) -> Result<usize> {
|
||||
return_errno_with_message!(Errno::ENOSPC, "no space left on /dev/full")
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ use crate::{
|
|||
fs::{
|
||||
device::{Device, DeviceId, DeviceType},
|
||||
inode_handle::FileIo,
|
||||
utils::StatusFlags,
|
||||
},
|
||||
prelude::*,
|
||||
process::signal::{PollHandle, Pollable},
|
||||
|
|
@ -35,11 +36,11 @@ impl Pollable for Null {
|
|||
}
|
||||
|
||||
impl FileIo for Null {
|
||||
fn read(&self, _writer: &mut VmWriter) -> Result<usize> {
|
||||
fn read(&self, _writer: &mut VmWriter, _status_flags: StatusFlags) -> Result<usize> {
|
||||
Ok(0)
|
||||
}
|
||||
|
||||
fn write(&self, reader: &mut VmReader) -> Result<usize> {
|
||||
fn write(&self, reader: &mut VmReader, _status_flags: StatusFlags) -> Result<usize> {
|
||||
let len = reader.remain();
|
||||
reader.skip(len);
|
||||
Ok(len)
|
||||
|
|
|
|||
|
|
@ -13,7 +13,7 @@ use crate::{
|
|||
file_table::FdFlags,
|
||||
fs_resolver::FsPath,
|
||||
inode_handle::FileIo,
|
||||
utils::{mkmod, AccessMode, Inode, IoctlCmd, OpenArgs},
|
||||
utils::{mkmod, AccessMode, Inode, IoctlCmd, OpenArgs, StatusFlags},
|
||||
},
|
||||
prelude::*,
|
||||
process::{
|
||||
|
|
@ -75,7 +75,7 @@ impl Pollable for PtyMaster {
|
|||
}
|
||||
|
||||
impl FileIo for PtyMaster {
|
||||
fn read(&self, writer: &mut VmWriter) -> Result<usize> {
|
||||
fn read(&self, writer: &mut VmWriter, _status_flags: StatusFlags) -> Result<usize> {
|
||||
// TODO: Add support for non-blocking mode and timeout
|
||||
let mut buf = vec![0u8; writer.avail().min(IO_CAPACITY)];
|
||||
let read_len = self.wait_events(IoEvents::IN, None, || {
|
||||
|
|
@ -89,7 +89,7 @@ impl FileIo for PtyMaster {
|
|||
Ok(read_len)
|
||||
}
|
||||
|
||||
fn write(&self, reader: &mut VmReader) -> Result<usize> {
|
||||
fn write(&self, reader: &mut VmReader, _status_flags: StatusFlags) -> Result<usize> {
|
||||
let mut buf = vec![0u8; reader.remain().min(IO_CAPACITY)];
|
||||
let write_len = reader.read_fallible(&mut buf.as_mut_slice().into())?;
|
||||
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ use crate::{
|
|||
fs::{
|
||||
device::{Device, DeviceId, DeviceType},
|
||||
inode_handle::FileIo,
|
||||
utils::StatusFlags,
|
||||
},
|
||||
prelude::*,
|
||||
process::signal::{PollHandle, Pollable},
|
||||
|
|
@ -43,11 +44,11 @@ impl Pollable for Random {
|
|||
}
|
||||
|
||||
impl FileIo for Random {
|
||||
fn read(&self, writer: &mut VmWriter) -> Result<usize> {
|
||||
fn read(&self, writer: &mut VmWriter, _status_flags: StatusFlags) -> Result<usize> {
|
||||
Self::getrandom(writer)
|
||||
}
|
||||
|
||||
fn write(&self, reader: &mut VmReader) -> Result<usize> {
|
||||
fn write(&self, reader: &mut VmReader, _status_flags: StatusFlags) -> Result<usize> {
|
||||
let len = reader.remain();
|
||||
reader.skip(len);
|
||||
Ok(len)
|
||||
|
|
|
|||
|
|
@ -19,7 +19,7 @@ use crate::{
|
|||
fs::{
|
||||
device::{Device, DeviceId, DeviceType},
|
||||
inode_handle::FileIo,
|
||||
utils::IoctlCmd,
|
||||
utils::{IoctlCmd, StatusFlags},
|
||||
},
|
||||
prelude::*,
|
||||
process::signal::{PollHandle, Pollable},
|
||||
|
|
@ -99,11 +99,11 @@ impl Pollable for TdxGuest {
|
|||
}
|
||||
|
||||
impl FileIo for TdxGuest {
|
||||
fn read(&self, _writer: &mut VmWriter) -> Result<usize> {
|
||||
fn read(&self, _writer: &mut VmWriter, _status_flags: StatusFlags) -> Result<usize> {
|
||||
return_errno_with_message!(Errno::EPERM, "Read operation not supported")
|
||||
}
|
||||
|
||||
fn write(&self, _reader: &mut VmReader) -> Result<usize> {
|
||||
fn write(&self, _reader: &mut VmReader, _status_flags: StatusFlags) -> Result<usize> {
|
||||
return_errno_with_message!(Errno::EPERM, "Write operation not supported")
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ use crate::{
|
|||
fs::{
|
||||
device::{Device, DeviceId, DeviceType},
|
||||
inode_handle::FileIo,
|
||||
utils::StatusFlags,
|
||||
},
|
||||
prelude::*,
|
||||
process::signal::{PollHandle, Pollable},
|
||||
|
|
@ -42,11 +43,11 @@ impl Pollable for TtyDevice {
|
|||
}
|
||||
|
||||
impl FileIo for TtyDevice {
|
||||
fn read(&self, _writer: &mut VmWriter) -> Result<usize> {
|
||||
fn read(&self, _writer: &mut VmWriter, _status_flags: StatusFlags) -> Result<usize> {
|
||||
return_errno_with_message!(Errno::EINVAL, "cannot read tty device");
|
||||
}
|
||||
|
||||
fn write(&self, _reader: &mut VmReader) -> Result<usize> {
|
||||
fn write(&self, _reader: &mut VmReader, _status_flags: StatusFlags) -> Result<usize> {
|
||||
return_errno_with_message!(Errno::EINVAL, "cannot write tty device");
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -14,7 +14,7 @@ use crate::{
|
|||
fs::{
|
||||
device::{Device, DeviceId, DeviceType},
|
||||
inode_handle::FileIo,
|
||||
utils::IoctlCmd,
|
||||
utils::{IoctlCmd, StatusFlags},
|
||||
},
|
||||
prelude::*,
|
||||
process::{
|
||||
|
|
@ -232,7 +232,7 @@ impl<D: TtyDriver> Pollable for Tty<D> {
|
|||
}
|
||||
|
||||
impl<D: TtyDriver> FileIo for Tty<D> {
|
||||
fn read(&self, writer: &mut VmWriter) -> Result<usize> {
|
||||
fn read(&self, writer: &mut VmWriter, _status_flags: StatusFlags) -> Result<usize> {
|
||||
self.job_control.wait_until_in_foreground()?;
|
||||
|
||||
// TODO: Add support for non-blocking mode and timeout
|
||||
|
|
@ -247,7 +247,7 @@ impl<D: TtyDriver> FileIo for Tty<D> {
|
|||
Ok(read_len)
|
||||
}
|
||||
|
||||
fn write(&self, reader: &mut VmReader) -> Result<usize> {
|
||||
fn write(&self, reader: &mut VmReader, _status_flags: StatusFlags) -> Result<usize> {
|
||||
let mut buf = vec![0u8; reader.remain().min(IO_CAPACITY)];
|
||||
let write_len = reader.read_fallible(&mut buf.as_mut_slice().into())?;
|
||||
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ use crate::{
|
|||
fs::{
|
||||
device::{Device, DeviceId, DeviceType},
|
||||
inode_handle::FileIo,
|
||||
utils::StatusFlags,
|
||||
},
|
||||
prelude::*,
|
||||
process::signal::{PollHandle, Pollable},
|
||||
|
|
@ -60,11 +61,11 @@ impl Pollable for Urandom {
|
|||
}
|
||||
|
||||
impl FileIo for Urandom {
|
||||
fn read(&self, writer: &mut VmWriter) -> Result<usize> {
|
||||
fn read(&self, writer: &mut VmWriter, _status_flags: StatusFlags) -> Result<usize> {
|
||||
Self::getrandom(writer)
|
||||
}
|
||||
|
||||
fn write(&self, reader: &mut VmReader) -> Result<usize> {
|
||||
fn write(&self, reader: &mut VmReader, _status_flags: StatusFlags) -> Result<usize> {
|
||||
let len = reader.remain();
|
||||
reader.skip(len);
|
||||
Ok(len)
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ use crate::{
|
|||
fs::{
|
||||
device::{Device, DeviceId, DeviceType},
|
||||
inode_handle::FileIo,
|
||||
utils::StatusFlags,
|
||||
},
|
||||
prelude::*,
|
||||
process::signal::{PollHandle, Pollable},
|
||||
|
|
@ -35,12 +36,12 @@ impl Pollable for Zero {
|
|||
}
|
||||
|
||||
impl FileIo for Zero {
|
||||
fn read(&self, writer: &mut VmWriter) -> Result<usize> {
|
||||
fn read(&self, writer: &mut VmWriter, _status_flags: StatusFlags) -> Result<usize> {
|
||||
let read_len = writer.fill_zeros(writer.avail())?;
|
||||
Ok(read_len)
|
||||
}
|
||||
|
||||
fn write(&self, reader: &mut VmReader) -> Result<usize> {
|
||||
fn write(&self, reader: &mut VmReader, _status_flags: StatusFlags) -> Result<usize> {
|
||||
Ok(reader.remain())
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -6,7 +6,10 @@
|
|||
use super::*;
|
||||
use crate::{
|
||||
events::IoEvents,
|
||||
fs::inode_handle::FileIo,
|
||||
fs::{
|
||||
inode_handle::FileIo,
|
||||
utils::{AccessMode, StatusFlags},
|
||||
},
|
||||
process::signal::{PollHandle, Pollable},
|
||||
};
|
||||
|
||||
|
|
@ -199,11 +202,11 @@ impl Pollable for Inner {
|
|||
}
|
||||
|
||||
impl FileIo for Inner {
|
||||
fn read(&self, writer: &mut VmWriter) -> Result<usize> {
|
||||
fn read(&self, writer: &mut VmWriter, _status_flags: StatusFlags) -> Result<usize> {
|
||||
return_errno_with_message!(Errno::EINVAL, "cannot read ptmx");
|
||||
}
|
||||
|
||||
fn write(&self, reader: &mut VmReader) -> Result<usize> {
|
||||
fn write(&self, reader: &mut VmReader, _status_flags: StatusFlags) -> Result<usize> {
|
||||
return_errno_with_message!(Errno::EINVAL, "cannot write ptmx");
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -7,7 +7,10 @@ use super::*;
|
|||
use crate::{
|
||||
device::PtySlave,
|
||||
events::IoEvents,
|
||||
fs::inode_handle::FileIo,
|
||||
fs::{
|
||||
inode_handle::FileIo,
|
||||
utils::{AccessMode, StatusFlags},
|
||||
},
|
||||
process::signal::{PollHandle, Pollable},
|
||||
};
|
||||
|
||||
|
|
@ -117,19 +120,19 @@ impl Inode for PtySlaveInode {
|
|||
}
|
||||
|
||||
fn read_at(&self, offset: usize, writer: &mut VmWriter) -> Result<usize> {
|
||||
self.device.read(writer)
|
||||
self.device.read(writer, StatusFlags::empty())
|
||||
}
|
||||
|
||||
fn read_direct_at(&self, offset: usize, writer: &mut VmWriter) -> Result<usize> {
|
||||
self.device.read(writer)
|
||||
self.device.read(writer, StatusFlags::empty())
|
||||
}
|
||||
|
||||
fn write_at(&self, offset: usize, reader: &mut VmReader) -> Result<usize> {
|
||||
self.device.write(reader)
|
||||
self.device.write(reader, StatusFlags::empty())
|
||||
}
|
||||
|
||||
fn write_direct_at(&self, offset: usize, reader: &mut VmReader) -> Result<usize> {
|
||||
self.device.write(reader)
|
||||
self.device.write(reader, StatusFlags::empty())
|
||||
}
|
||||
|
||||
fn ioctl(&self, cmd: IoctlCmd, arg: usize) -> Result<i32> {
|
||||
|
|
|
|||
|
|
@ -47,7 +47,7 @@ struct InodeHandle_ {
|
|||
impl InodeHandle_ {
|
||||
pub fn read(&self, writer: &mut VmWriter) -> Result<usize> {
|
||||
if let Some(ref file_io) = self.file_io {
|
||||
return file_io.read(writer);
|
||||
return file_io.read(writer, self.status_flags());
|
||||
}
|
||||
|
||||
if !self.path.inode().is_seekable() {
|
||||
|
|
@ -64,7 +64,7 @@ impl InodeHandle_ {
|
|||
|
||||
pub fn write(&self, reader: &mut VmReader) -> Result<usize> {
|
||||
if let Some(ref file_io) = self.file_io {
|
||||
return file_io.write(reader);
|
||||
return file_io.write(reader, self.status_flags());
|
||||
}
|
||||
|
||||
if !self.path.inode().is_seekable() {
|
||||
|
|
@ -345,10 +345,20 @@ impl<R> Drop for InodeHandle<R> {
|
|||
}
|
||||
}
|
||||
|
||||
/// A trait for file-like objects that provide custom I/O operations.
|
||||
///
|
||||
/// This trait is typically implemented for special files like devices or
|
||||
/// named pipes (FIFOs), which have behaviors different from regular on-disk files.
|
||||
//
|
||||
// TODO: The `status_flags` parameter in `read` and `write` may need to be stored directly
|
||||
// in the `FileIo`. We need further refactoring to find an appropriate way to enable `FileIo`
|
||||
// to utilize the information in the `InodeHandle_`.
|
||||
pub trait FileIo: Pollable + Send + Sync + 'static {
|
||||
fn read(&self, writer: &mut VmWriter) -> Result<usize>;
|
||||
/// Reads data from the file into the given `VmWriter`.
|
||||
fn read(&self, writer: &mut VmWriter, status_flags: StatusFlags) -> Result<usize>;
|
||||
|
||||
fn write(&self, reader: &mut VmReader) -> Result<usize>;
|
||||
/// Writes data from the given `VmReader` into the file.
|
||||
fn write(&self, reader: &mut VmReader, status_flags: StatusFlags) -> Result<usize>;
|
||||
|
||||
/// See [`FileLike::mappable`].
|
||||
fn mappable(&self) -> Result<Mappable> {
|
||||
|
|
|
|||
|
|
@ -11,7 +11,6 @@ pub mod file_handle;
|
|||
pub mod file_table;
|
||||
pub mod fs_resolver;
|
||||
pub mod inode_handle;
|
||||
pub mod named_pipe;
|
||||
pub mod overlayfs;
|
||||
pub mod path;
|
||||
pub mod pipe;
|
||||
|
|
|
|||
|
|
@ -1,50 +0,0 @@
|
|||
// SPDX-License-Identifier: MPL-2.0
|
||||
|
||||
use super::{
|
||||
file_handle::FileLike,
|
||||
pipe::{self, PipeReader, PipeWriter},
|
||||
utils::{AccessMode, Metadata},
|
||||
};
|
||||
use crate::{
|
||||
events::IoEvents,
|
||||
prelude::*,
|
||||
process::signal::{PollHandle, Pollable},
|
||||
};
|
||||
|
||||
pub struct NamedPipe {
|
||||
reader: Arc<PipeReader>,
|
||||
writer: Arc<PipeWriter>,
|
||||
}
|
||||
|
||||
impl NamedPipe {
|
||||
pub fn new() -> Result<Self> {
|
||||
let (reader, writer) = pipe::new_pair()?;
|
||||
|
||||
Ok(Self { reader, writer })
|
||||
}
|
||||
}
|
||||
|
||||
impl Pollable for NamedPipe {
|
||||
fn poll(&self, _mask: IoEvents, _poller: Option<&mut PollHandle>) -> IoEvents {
|
||||
warn!("Named pipe doesn't support poll now, return IoEvents::empty for now.");
|
||||
IoEvents::empty()
|
||||
}
|
||||
}
|
||||
|
||||
impl FileLike for NamedPipe {
|
||||
fn read(&self, writer: &mut VmWriter) -> Result<usize> {
|
||||
self.reader.read(writer)
|
||||
}
|
||||
|
||||
fn write(&self, reader: &mut VmReader) -> Result<usize> {
|
||||
self.writer.write(reader)
|
||||
}
|
||||
|
||||
fn access_mode(&self) -> AccessMode {
|
||||
AccessMode::O_RDWR
|
||||
}
|
||||
|
||||
fn metadata(&self) -> Metadata {
|
||||
self.reader.metadata()
|
||||
}
|
||||
}
|
||||
|
|
@ -423,19 +423,11 @@ impl Path {
|
|||
let inode_type = inode.type_();
|
||||
let creation_flags = &open_args.creation_flags;
|
||||
|
||||
match inode_type {
|
||||
InodeType::NamedPipe => {
|
||||
warn!("named pipes don't support additional operation when opening");
|
||||
debug!("the named pipe is opened with {:?}", open_args);
|
||||
}
|
||||
InodeType::SymLink => {
|
||||
if creation_flags.contains(CreationFlags::O_NOFOLLOW)
|
||||
&& !open_args.status_flags.contains(StatusFlags::O_PATH)
|
||||
{
|
||||
return_errno_with_message!(Errno::ELOOP, "the file is a symlink");
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
if inode_type == InodeType::SymLink
|
||||
&& creation_flags.contains(CreationFlags::O_NOFOLLOW)
|
||||
&& !open_args.status_flags.contains(StatusFlags::O_PATH)
|
||||
{
|
||||
return_errno_with_message!(Errno::ELOOP, "the file is a symlink");
|
||||
}
|
||||
|
||||
if creation_flags.contains(CreationFlags::O_CREAT)
|
||||
|
|
|
|||
|
|
@ -71,6 +71,10 @@ impl PipeReader {
|
|||
self.state.peer_shutdown();
|
||||
}
|
||||
|
||||
pub(super) fn peer_activate(&self) {
|
||||
self.state.peer_activate();
|
||||
}
|
||||
|
||||
fn check_io_events(&self) -> IoEvents {
|
||||
let mut events = IoEvents::empty();
|
||||
if self.state.is_peer_shutdown() {
|
||||
|
|
@ -133,6 +137,10 @@ impl PipeWriter {
|
|||
self.state.shutdown();
|
||||
}
|
||||
|
||||
pub(super) fn activate(&self) {
|
||||
self.state.activate();
|
||||
}
|
||||
|
||||
fn check_io_events(&self) -> IoEvents {
|
||||
if self.state.is_shutdown() {
|
||||
IoEvents::ERR | IoEvents::OUT
|
||||
|
|
|
|||
|
|
@ -0,0 +1,251 @@
|
|||
// SPDX-License-Identifier: MPL-2.0
|
||||
|
||||
use core::{
|
||||
num::Wrapping,
|
||||
sync::atomic::{AtomicUsize, Ordering},
|
||||
};
|
||||
|
||||
use ostd::sync::WaitQueue;
|
||||
|
||||
use super::common::{PipeReader, PipeWriter};
|
||||
use crate::{
|
||||
events::IoEvents,
|
||||
fs::{
|
||||
inode_handle::FileIo,
|
||||
utils::{AccessMode, StatusFlags},
|
||||
},
|
||||
prelude::*,
|
||||
process::signal::{PollHandle, Pollable},
|
||||
};
|
||||
|
||||
/// A handle for a named pipe that implements `FileIo`.
|
||||
///
|
||||
/// Once a handle for a `NamedPipe` exists, the corresponding pipe object will
|
||||
/// not be dropped.
|
||||
struct NamedPipeHandle {
|
||||
inner: Arc<PipeObj>,
|
||||
access_mode: AccessMode,
|
||||
}
|
||||
|
||||
impl NamedPipeHandle {
|
||||
fn new(inner: Arc<PipeObj>, access_mode: AccessMode) -> Arc<Self> {
|
||||
Arc::new(Self { inner, access_mode })
|
||||
}
|
||||
|
||||
fn try_read(&self, writer: &mut VmWriter) -> Result<usize> {
|
||||
// `InodeHandle` checks the access mode before calling methods in `FileIo`.
|
||||
debug_assert!(self.access_mode.is_readable());
|
||||
|
||||
self.inner.reader.try_read(writer)
|
||||
}
|
||||
|
||||
fn try_write(&self, reader: &mut VmReader) -> Result<usize> {
|
||||
// `InodeHandle` checks the access mode before calling methods in `FileIo`.
|
||||
debug_assert!(self.access_mode.is_writable());
|
||||
|
||||
self.inner.writer.try_write(reader)
|
||||
}
|
||||
}
|
||||
|
||||
impl Pollable for NamedPipeHandle {
|
||||
fn poll(&self, mask: IoEvents, mut poller: Option<&mut PollHandle>) -> IoEvents {
|
||||
let mut events = IoEvents::empty();
|
||||
|
||||
if self.access_mode.is_readable() {
|
||||
events |= self.inner.reader.poll(mask, poller.as_deref_mut());
|
||||
}
|
||||
|
||||
if self.access_mode.is_writable() {
|
||||
events |= self.inner.writer.poll(mask, poller);
|
||||
}
|
||||
|
||||
events
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for NamedPipeHandle {
|
||||
fn drop(&mut self) {
|
||||
if self.access_mode.is_readable() {
|
||||
let old_value = self.inner.num_reader.fetch_sub(1, Ordering::Relaxed);
|
||||
if old_value == 1 {
|
||||
self.inner.reader.peer_shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
if self.access_mode.is_writable() {
|
||||
let old_value = self.inner.num_writer.fetch_sub(1, Ordering::Relaxed);
|
||||
if old_value == 1 {
|
||||
self.inner.writer.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl FileIo for NamedPipeHandle {
|
||||
fn read(&self, writer: &mut VmWriter, status_flags: StatusFlags) -> Result<usize> {
|
||||
if status_flags.contains(StatusFlags::O_NONBLOCK) {
|
||||
self.try_read(writer)
|
||||
} else {
|
||||
self.wait_events(IoEvents::IN, None, || self.try_read(writer))
|
||||
}
|
||||
}
|
||||
|
||||
fn write(&self, reader: &mut VmReader, status_flags: StatusFlags) -> Result<usize> {
|
||||
if status_flags.contains(StatusFlags::O_NONBLOCK) {
|
||||
self.try_write(reader)
|
||||
} else {
|
||||
self.wait_events(IoEvents::OUT, None, || self.try_write(reader))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A named pipe (FIFO) that provides inter-process communication.
|
||||
///
|
||||
/// Named pipes are special files that appear in the filesystem and provide
|
||||
/// a communication channel between processes. It can be opened multiple times
|
||||
/// for reading, writing, or both.
|
||||
///
|
||||
/// A `NamedPipe` will maintain exactly one **pipe object** that provides actual pipe
|
||||
/// functionalities when there is at least one handle opened on it.
|
||||
pub struct NamedPipe {
|
||||
pipe: Mutex<NamedPipeInner>,
|
||||
wait_queue: WaitQueue,
|
||||
}
|
||||
|
||||
impl NamedPipe {
|
||||
/// Creates a new named pipe.
|
||||
pub fn new() -> Result<Self> {
|
||||
Ok(Self {
|
||||
pipe: Mutex::new(NamedPipeInner::default()),
|
||||
wait_queue: WaitQueue::new(),
|
||||
})
|
||||
}
|
||||
|
||||
/// Opens the named pipe with the specified access mode and status flags.
|
||||
///
|
||||
/// Returns a handle that implements `FileIo` for performing I/O operations.
|
||||
///
|
||||
/// The open behavior follows POSIX semantics:
|
||||
/// - Opening for read-only blocks until a writer opens the pipe.
|
||||
/// - Opening for write-only blocks until a reader opens the pipe.
|
||||
/// - Opening for read-write never blocks.
|
||||
///
|
||||
/// If no handle of this named pipe has existed, the method will create a new pipe object.
|
||||
/// Otherwise, it will return a handle that works on the existing pipe object.
|
||||
pub fn open(
|
||||
&self,
|
||||
access_mode: AccessMode,
|
||||
status_flags: StatusFlags,
|
||||
) -> Result<Arc<dyn FileIo>> {
|
||||
let mut pipe = self.pipe.lock();
|
||||
let pipe_obj = pipe.get_or_create_pipe_obj();
|
||||
|
||||
let handle: Arc<dyn FileIo> = match access_mode {
|
||||
AccessMode::O_RDONLY => {
|
||||
pipe.read_count += 1;
|
||||
|
||||
let old_value = pipe_obj.num_reader.fetch_add(1, Ordering::Relaxed);
|
||||
if old_value == 0 {
|
||||
pipe_obj.reader.peer_activate();
|
||||
self.wait_queue.wake_all();
|
||||
}
|
||||
|
||||
let has_writer = pipe_obj.num_writer.load(Ordering::Relaxed) > 0;
|
||||
let handle = NamedPipeHandle::new(pipe_obj, access_mode);
|
||||
|
||||
if !status_flags.contains(StatusFlags::O_NONBLOCK) && !has_writer {
|
||||
let old_write_count = pipe.write_count;
|
||||
drop(pipe);
|
||||
self.wait_queue.pause_until(|| {
|
||||
(old_write_count != self.pipe.lock().write_count).then_some(())
|
||||
})?;
|
||||
}
|
||||
|
||||
handle
|
||||
}
|
||||
AccessMode::O_WRONLY => {
|
||||
pipe.write_count += 1;
|
||||
|
||||
let old_num_writer = pipe_obj.num_writer.fetch_add(1, Ordering::Relaxed);
|
||||
if old_num_writer == 0 {
|
||||
pipe_obj.writer.activate();
|
||||
self.wait_queue.wake_all();
|
||||
}
|
||||
|
||||
let has_reader = pipe_obj.num_reader.load(Ordering::Relaxed) > 0;
|
||||
let handle = NamedPipeHandle::new(pipe_obj, access_mode);
|
||||
|
||||
if !has_reader {
|
||||
if status_flags.contains(StatusFlags::O_NONBLOCK) {
|
||||
return_errno_with_message!(Errno::ENXIO, "no reader is present");
|
||||
}
|
||||
|
||||
let old_read_count = pipe.read_count;
|
||||
drop(pipe);
|
||||
self.wait_queue.pause_until(|| {
|
||||
(old_read_count != self.pipe.lock().read_count).then_some(())
|
||||
})?;
|
||||
}
|
||||
|
||||
handle
|
||||
}
|
||||
AccessMode::O_RDWR => {
|
||||
pipe.read_count += 1;
|
||||
pipe.write_count += 1;
|
||||
|
||||
let old_num_reader = pipe_obj.num_reader.fetch_add(1, Ordering::Relaxed);
|
||||
let old_num_writer = pipe_obj.num_writer.fetch_add(1, Ordering::Relaxed);
|
||||
if old_num_reader == 0 || old_num_writer == 0 {
|
||||
self.wait_queue.wake_all();
|
||||
pipe_obj.writer.activate();
|
||||
}
|
||||
|
||||
NamedPipeHandle::new(pipe_obj, access_mode)
|
||||
}
|
||||
};
|
||||
|
||||
Ok(handle)
|
||||
}
|
||||
}
|
||||
|
||||
struct PipeObj {
|
||||
reader: PipeReader,
|
||||
writer: PipeWriter,
|
||||
num_reader: AtomicUsize,
|
||||
num_writer: AtomicUsize,
|
||||
}
|
||||
|
||||
impl PipeObj {
|
||||
fn new() -> Arc<Self> {
|
||||
let (reader, writer) = super::common::new_pair();
|
||||
Arc::new(Self {
|
||||
reader,
|
||||
writer,
|
||||
num_reader: AtomicUsize::new(0),
|
||||
num_writer: AtomicUsize::new(0),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct NamedPipeInner {
|
||||
// Holding a weak reference here ensures that the pipe object (including
|
||||
// the buffer data) will be dropped when no handle is open on the named
|
||||
// pipe. This is consistent with Linux behavior.
|
||||
pipe_obj: Weak<PipeObj>,
|
||||
read_count: Wrapping<usize>,
|
||||
write_count: Wrapping<usize>,
|
||||
}
|
||||
|
||||
impl NamedPipeInner {
|
||||
fn get_or_create_pipe_obj(&mut self) -> Arc<PipeObj> {
|
||||
if let Some(pipe_obj) = self.pipe_obj.upgrade() {
|
||||
return pipe_obj;
|
||||
}
|
||||
|
||||
let pipe_obj = PipeObj::new();
|
||||
self.pipe_obj = Arc::downgrade(&pipe_obj);
|
||||
|
||||
pipe_obj
|
||||
}
|
||||
}
|
||||
|
|
@ -19,14 +19,15 @@ use crate::{
|
|||
events::IoEvents,
|
||||
fs::{
|
||||
device::Device,
|
||||
file_handle::FileLike,
|
||||
named_pipe::NamedPipe,
|
||||
inode_handle::FileIo,
|
||||
path::{is_dot, is_dot_or_dotdot, is_dotdot},
|
||||
pipe::NamedPipe,
|
||||
registry::{FsProperties, FsType},
|
||||
utils::{
|
||||
mkmod, CStr256, CachePage, DirentVisitor, Extension, FallocMode, FileSystem, FsFlags,
|
||||
Inode, InodeMode, InodeType, IoctlCmd, Metadata, MknodType, PageCache,
|
||||
PageCacheBackend, Permission, SuperBlock, XattrName, XattrNamespace, XattrSetFlags,
|
||||
mkmod, AccessMode, CStr256, CachePage, DirentVisitor, Extension, FallocMode,
|
||||
FileSystem, FsFlags, Inode, InodeMode, InodeType, IoctlCmd, Metadata, MknodType,
|
||||
PageCache, PageCacheBackend, Permission, StatusFlags, SuperBlock, XattrName,
|
||||
XattrNamespace, XattrSetFlags,
|
||||
},
|
||||
},
|
||||
prelude::*,
|
||||
|
|
@ -549,11 +550,10 @@ impl Inode for RamInode {
|
|||
read_len
|
||||
}
|
||||
Inner::Device(device) => {
|
||||
device.read(writer)?
|
||||
device.read(writer, StatusFlags::empty())?
|
||||
// Typically, devices like "/dev/zero" or "/dev/null" do not require modifying
|
||||
// timestamps here. Please adjust this behavior accordingly if there are special devices.
|
||||
}
|
||||
Inner::NamedPipe(named_pipe) => named_pipe.read(writer)?,
|
||||
_ => return_errno_with_message!(Errno::EISDIR, "read is not supported"),
|
||||
}
|
||||
};
|
||||
|
|
@ -595,14 +595,10 @@ impl Inode for RamInode {
|
|||
}
|
||||
InodeType::CharDevice | InodeType::BlockDevice => {
|
||||
let device = self.inner.as_device().unwrap();
|
||||
device.write(reader)?
|
||||
device.write(reader, StatusFlags::empty())?
|
||||
// Typically, devices like "/dev/zero" or "/dev/null" do not require modifying
|
||||
// timestamps here. Please adjust this behavior accordingly if there are special devices.
|
||||
}
|
||||
InodeType::NamedPipe => {
|
||||
let named_pipe = self.inner.as_named_pipe().unwrap();
|
||||
named_pipe.write(reader)?
|
||||
}
|
||||
_ => return_errno_with_message!(Errno::EISDIR, "write is not supported"),
|
||||
};
|
||||
Ok(written_len)
|
||||
|
|
|
|||
|
|
@ -194,7 +194,8 @@ impl<T: AsRef<EndpointState>> Endpoint<T> {
|
|||
|
||||
/// Shuts down the local endpoint.
|
||||
///
|
||||
/// After this method, data cannot be written to from the local endpoint.
|
||||
/// After this method, data cannot be written to from the local endpoint
|
||||
/// until it is activated again.
|
||||
pub fn shutdown(&self) {
|
||||
let this_end = self.this_end().as_ref();
|
||||
let peer_end = self.peer_end().as_ref();
|
||||
|
|
@ -204,7 +205,8 @@ impl<T: AsRef<EndpointState>> Endpoint<T> {
|
|||
|
||||
/// Shuts down the remote endpoint.
|
||||
///
|
||||
/// After this method, data cannot be written to from the remote endpoint.
|
||||
/// After this method, data cannot be written to from the remote endpoint
|
||||
/// until it is activated again.
|
||||
pub fn peer_shutdown(&self) {
|
||||
let this_end = self.this_end().as_ref();
|
||||
let peer_end = self.peer_end().as_ref();
|
||||
|
|
@ -222,6 +224,24 @@ impl<T: AsRef<EndpointState>> Endpoint<T> {
|
|||
.notify(IoEvents::HUP | IoEvents::ERR | IoEvents::OUT);
|
||||
}
|
||||
|
||||
/// Activates the local endpoint.
|
||||
pub(in crate::fs) fn activate(&self) {
|
||||
self.this_end()
|
||||
.as_ref()
|
||||
.is_shutdown
|
||||
.store(false, Ordering::Relaxed);
|
||||
self.this_end().as_ref().pollee.invalidate();
|
||||
}
|
||||
|
||||
/// Activates the remote endpoint.
|
||||
pub(in crate::fs) fn peer_activate(&self) {
|
||||
self.peer_end()
|
||||
.as_ref()
|
||||
.is_shutdown
|
||||
.store(false, Ordering::Relaxed);
|
||||
self.peer_end().as_ref().pollee.invalidate();
|
||||
}
|
||||
|
||||
/// Returns whether the local endpoint has shut down.
|
||||
pub fn is_shutdown(&self) -> bool {
|
||||
self.this_end().as_ref().is_shutdown.load(Ordering::Relaxed)
|
||||
|
|
|
|||
|
|
@ -13,7 +13,7 @@ use crate::{
|
|||
pub fn sys_pipe2(fds: Vaddr, flags: u32, ctx: &Context) -> Result<SyscallReturn> {
|
||||
debug!("flags: {:?}", flags);
|
||||
|
||||
let (pipe_reader, pipe_writer) = pipe::new_pair_file()?;
|
||||
let (pipe_reader, pipe_writer) = pipe::new_file_pair()?;
|
||||
|
||||
let fd_flags = if CreationFlags::from_bits_truncate(flags).contains(CreationFlags::O_CLOEXEC) {
|
||||
FdFlags::CLOEXEC
|
||||
|
|
|
|||
Loading…
Reference in New Issue