diff --git a/Cargo.lock b/Cargo.lock index 147d45f25..fb5e3d999 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -214,6 +214,7 @@ dependencies = [ "ostd", "paste", "rand", + "ring-buffer", "riscv", "spin", "takeable", @@ -1546,6 +1547,14 @@ dependencies = [ "bitflags 1.3.2", ] +[[package]] +name = "ring-buffer" +version = "0.1.0" +dependencies = [ + "inherit-methods-macro", + "ostd", +] + [[package]] name = "riscv" version = "0.15.0" diff --git a/Cargo.toml b/Cargo.toml index e6e275665..04dbd6827 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,6 +48,7 @@ members = [ "kernel/libs/typeflags", "kernel/libs/typeflags-util", "kernel/libs/atomic-integer-wrapper", + "kernel/libs/ring-buffer", "kernel/libs/xarray", ] exclude = [ diff --git a/kernel/Cargo.toml b/kernel/Cargo.toml index f8fb85818..e4436f5c4 100644 --- a/kernel/Cargo.toml +++ b/kernel/Cargo.toml @@ -54,6 +54,7 @@ rand = { workspace = true, features = [ "small_rng", "std_rng", ] } +ring-buffer = { path = "libs/ring-buffer" } spin.workspace = true takeable.workspace = true time.workspace = true diff --git a/kernel/libs/ring-buffer/Cargo.toml b/kernel/libs/ring-buffer/Cargo.toml new file mode 100644 index 000000000..0bfb911aa --- /dev/null +++ b/kernel/libs/ring-buffer/Cargo.toml @@ -0,0 +1,11 @@ + [package] + name = "ring-buffer" + version = "0.1.0" + edition = "2024" + + [dependencies] + inherit-methods-macro = { git = "https://github.com/asterinas/inherit-methods-macro", rev = "98f7e3e" } + ostd = { path = "../../../ostd" } + +[lints] +workspace = true diff --git a/kernel/libs/ring-buffer/src/lib.rs b/kernel/libs/ring-buffer/src/lib.rs new file mode 100644 index 000000000..a0bcefff5 --- /dev/null +++ b/kernel/libs/ring-buffer/src/lib.rs @@ -0,0 +1,406 @@ +#![no_std] + +extern crate alloc; + +use alloc::sync::Arc; +use core::{ + marker::PhantomData, + mem::size_of, + num::Wrapping, + ops::Deref, + sync::atomic::{AtomicUsize, Ordering}, +}; + +use inherit_methods_macro::inherit_methods; +use ostd::{ + Pod, + mm::{FrameAllocOptions, Segment, VmIo, PAGE_SIZE, io_util::HasVmReaderWriter}, +}; + +/// A lock-free SPSC FIFO ring buffer backed by a [`Segment<()>`]. +/// +/// The ring buffer supports `push`/`pop` any `T: Pod` items, also +/// supports `write`/`read` any bytes data based on [`VmReader`]/[`VmWriter`]. +/// +/// The ring buffer returns immediately after processing without any blocking. +/// The ring buffer can be shared between threads. +/// +/// # Example +/// +/// ``` +/// use ostd::Pod; +/// use ring_buffer::RingBuffer; +/// +/// #[derive(Pod)] +/// struct Item { +/// a: u32, +/// b: u32, +/// } +/// +/// let rb = RingBuffer::::new(10); +/// let (producer, consumer) = rb.split(); +/// +/// for i in 0..10 { +/// producer.push(Item { a: i, b: i }).unwrap(); +/// } +/// +/// for _ in 0..10 { +/// let item = consumer.pop().unwrap(); +/// assert_eq!(item.a, item.b); +/// } +/// ``` +pub struct RingBuffer { + segment: Segment<()>, + capacity: usize, + tail: AtomicUsize, + head: AtomicUsize, + phantom: PhantomData, +} + +/// A producer of a [`RingBuffer`]. +pub struct Producer>> { + rb: R, + phantom: PhantomData, +} +/// A consumer of a [`RingBuffer`]. +pub struct Consumer>> { + rb: R, + phantom: PhantomData, +} + +pub type RbProducer = Producer>>; +pub type RbConsumer = Consumer>>; + +impl RingBuffer { + const T_SIZE: usize = size_of::(); + + /// Creates a new [`RingBuffer`] with the given capacity. + pub fn new(capacity: usize) -> Self { + assert!( + capacity.is_power_of_two(), + "capacity must be a power of two" + ); + + let nframes = capacity + .checked_mul(Self::T_SIZE) + .unwrap() + .div_ceil(PAGE_SIZE); + let segment = FrameAllocOptions::new() + .zeroed(false) + .alloc_segment(nframes) + .unwrap(); + + Self { + segment, + capacity, + tail: AtomicUsize::new(0), + head: AtomicUsize::new(0), + phantom: PhantomData, + } + } + + /// Splits the [`RingBuffer`] into a producer and a consumer. + pub fn split(self) -> (RbProducer, RbConsumer) { + let producer = Producer { + rb: Arc::new(self), + phantom: PhantomData, + }; + let consumer = Consumer { + rb: Arc::clone(&producer.rb), + phantom: PhantomData, + }; + (producer, consumer) + } + + /// Gets the capacity of the `RingBuffer`. + pub fn capacity(&self) -> usize { + self.capacity + } + + /// Returns the underlying segment backing this ring buffer. + pub fn segment(&self) -> &Segment<()> { + &self.segment + } + + /// Checks if the `RingBuffer` is empty. + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Checks if the `RingBuffer` is full. + pub fn is_full(&self) -> bool { + self.free_len() == 0 + } + + /// Gets the number of items in the `RingBuffer`. + pub fn len(&self) -> usize { + // Implementation notes: This subtraction only makes sense if either the head or the tail + // is considered frozen; if both are volatile, the number of the items may become negative + // due to race conditions. This is always true with a `RingBuffer` or a pair of + // `RbProducer` and `RbConsumer`. + (self.tail() - self.head()).0 + } + + /// Gets the number of free items in the `RingBuffer`. + pub fn free_len(&self) -> usize { + self.capacity - self.len() + } + + /// Gets the head number of the `RingBuffer`. + /// + /// This is the number of items read from the ring buffer. The number wraps when crossing + /// [`usize`] boundaries. + pub fn head(&self) -> Wrapping { + Wrapping(self.head.load(Ordering::Acquire)) + } + + /// Gets the tail number of the `RingBuffer`. + /// + /// This is the number of items written into the ring buffer. The number wraps when crossing + /// [`usize`] boundaries. + pub fn tail(&self) -> Wrapping { + Wrapping(self.tail.load(Ordering::Acquire)) + } + + pub fn advance_tail(&self, mut tail: Wrapping, len: usize) { + tail += len; + self.tail.store(tail.0, Ordering::Release); + } + + pub fn advance_head(&self, mut head: Wrapping, len: usize) { + head += len; + self.head.store(head.0, Ordering::Release); + } + + pub fn reset_head(&self) { + let new_head = self.tail(); + self.head.store(new_head.0, Ordering::Release); + } + + /// Clears the `RingBuffer`. + pub fn clear(&mut self) { + self.tail.store(0, Ordering::Release); + self.head.store(0, Ordering::Release); + } +} + +impl RingBuffer { + /// Pushes an item to the `RingBuffer`. + /// + /// Returns `Some` on success. Returns `None` if + /// the ring buffer is full. + pub fn push(&mut self, item: T) -> Option<()> { + let mut producer = Producer { + rb: self, + phantom: PhantomData, + }; + producer.push(item) + } + + /// Pushes a slice of items to the `RingBuffer`. + /// + /// Returns `Some` on success, all items are pushed to the ring buffer. + /// Returns `None` if the ring buffer is full or cannot fit all items. + pub fn push_slice(&mut self, items: &[T]) -> Option<()> { + let mut producer = Producer { + rb: self, + phantom: PhantomData, + }; + producer.push_slice(items) + } + + /// Pops an item from the `RingBuffer`. + /// + /// Returns `Some` with the popped item on success. + /// Returns `None` if the ring buffer is empty. + pub fn pop(&mut self) -> Option { + let mut consumer = Consumer { + rb: self, + phantom: PhantomData, + }; + consumer.pop() + } + + /// Pops a slice of items from the `RingBuffer`. + /// + /// Returns `Some` on success, all items are popped from the ring buffer. + /// Returns `None` if the ring buffer is empty or cannot fill all items. + pub fn pop_slice(&mut self, items: &mut [T]) -> Option<()> { + let mut consumer = Consumer { + rb: self, + phantom: PhantomData, + }; + consumer.pop_slice(items) + } +} + +impl>> Producer { + const T_SIZE: usize = size_of::(); + + /// Pushes an item to the `RingBuffer`. + /// + /// Returns `Some` on success. Returns `None` if + /// the ring buffer is full. + pub fn push(&mut self, item: T) -> Option<()> { + let rb = &self.rb; + if rb.is_full() { + return None; + } + + let tail = rb.tail(); + let offset = tail.0 & (rb.capacity - 1); + let byte_offset = offset * Self::T_SIZE; + + let mut writer = rb.segment.writer(); + writer.skip(byte_offset); + writer.write_val(&item).unwrap(); + + rb.advance_tail(tail, 1); + Some(()) + } + + /// Pushes a slice of items to the `RingBuffer`. + /// + /// Returns `Some` on success, all items are pushed to the ring buffer. + /// Returns `None` if the ring buffer is full or cannot fit all items. + pub fn push_slice(&mut self, items: &[T]) -> Option<()> { + let rb = &self.rb; + let nitems = items.len(); + if rb.free_len() < nitems { + return None; + } + + let tail = rb.tail(); + let offset = tail.0 & (rb.capacity - 1); + let byte_offset = offset * Self::T_SIZE; + + if offset + nitems > rb.capacity { + // Write into two separate parts + rb.segment + .write_slice(byte_offset, &items[..rb.capacity - offset]) + .unwrap(); + rb.segment + .write_slice(0, &items[rb.capacity - offset..]) + .unwrap(); + } else { + rb.segment.write_slice(byte_offset, items).unwrap(); + } + + rb.advance_tail(tail, nitems); + Some(()) + } + + // There is no counterpart to `Consumer::skip` and `Consumer::clear`. They do not make sense + // for the producer. +} + +#[inherit_methods(from = "self.rb")] +impl>> Producer { + pub fn capacity(&self) -> usize; + pub fn is_empty(&self) -> bool; + pub fn is_full(&self) -> bool; + pub fn len(&self) -> usize; + pub fn free_len(&self) -> usize; + pub fn head(&self) -> Wrapping; + pub fn tail(&self) -> Wrapping; + pub fn segment(&self) -> &Segment<()>; + pub fn advance_tail(&self, tail: Wrapping, len: usize); + pub fn advance_head(&self, head: Wrapping, len: usize); + pub fn reset_head(&self); +} + +impl>> Consumer { + const T_SIZE: usize = size_of::(); + + /// Pops an item from the `RingBuffer`. + /// + /// Returns `Some` with the popped item on success. + /// Returns `None` if the ring buffer is empty. + pub fn pop(&mut self) -> Option { + let rb = &self.rb; + if rb.is_empty() { + return None; + } + + let head = rb.head(); + let offset = head.0 & (rb.capacity - 1); + let byte_offset = offset * Self::T_SIZE; + + let mut reader = rb.segment.reader(); + reader.skip(byte_offset); + let item = reader.read_val::().unwrap(); + + rb.advance_head(head, 1); + Some(item) + } + + /// Pops a slice of items from the `RingBuffer`. + /// + /// Returns `Some` on success, all items are popped from the ring buffer. + /// Returns `None` if the ring buffer is empty or cannot fill all items. + pub fn pop_slice(&mut self, items: &mut [T]) -> Option<()> { + let rb = &self.rb; + let nitems = items.len(); + if rb.len() < nitems { + return None; + } + + let head = rb.head(); + let offset = head.0 & (rb.capacity - 1); + let byte_offset = offset * Self::T_SIZE; + + if offset + nitems > rb.capacity { + // Read from two separate parts + rb.segment + .read_slice(byte_offset, &mut items[..rb.capacity - offset]) + .unwrap(); + rb.segment + .read_slice(0, &mut items[rb.capacity - offset..]) + .unwrap(); + } else { + rb.segment.read_slice(byte_offset, items).unwrap(); + } + + rb.advance_head(head, nitems); + Some(()) + } + + /// Skips `count` items in the `RingBuffer`. + /// + /// In other words, `count` items are popped from the `RingBuffer` and discarded. + /// + /// # Panics + /// + /// This method will panic if the number of the available items to pop is less than `count`. + pub fn skip(&mut self, count: usize) { + let rb = &self.rb; + let len = rb.len(); + assert!(len >= count); + + let head = rb.head(); + rb.advance_head(head, count); + } + + /// Clears the `RingBuffer`. + /// + /// In other words, all items are popped from the `RingBuffer` and discarded. + pub fn clear(&mut self) { + self.rb.reset_head(); + } +} + +#[inherit_methods(from = "self.rb")] +impl>> Consumer { + pub fn capacity(&self) -> usize; + pub fn is_empty(&self) -> bool; + pub fn is_full(&self) -> bool; + pub fn len(&self) -> usize; + pub fn free_len(&self) -> usize; + pub fn head(&self) -> Wrapping; + pub fn tail(&self) -> Wrapping; + pub fn segment(&self) -> &Segment<()>; + pub fn advance_tail(&self, tail: Wrapping, len: usize); + pub fn advance_head(&self, head: Wrapping, len: usize); + pub fn reset_head(&self); +} + diff --git a/kernel/src/prelude.rs b/kernel/src/prelude.rs index 0bdc38721..387abaa6b 100644 --- a/kernel/src/prelude.rs +++ b/kernel/src/prelude.rs @@ -21,6 +21,7 @@ pub(crate) use ostd::{ mm::{FallibleVmRead, FallibleVmWrite, PAGE_SIZE, Vaddr, VmReader, VmWriter}, sync::{Mutex, MutexGuard, RwLock, RwMutex, SpinLock, SpinLockGuard}, }; +pub(crate) use crate::util::ring_buffer::{ConsumerU8Ext, ProducerU8Ext, RingBufferU8Ext}; /// return current process #[macro_export] diff --git a/kernel/src/util/ring_buffer.rs b/kernel/src/util/ring_buffer.rs index 186224453..677818225 100644 --- a/kernel/src/util/ring_buffer.rs +++ b/kernel/src/util/ring_buffer.rs @@ -1,511 +1,148 @@ // SPDX-License-Identifier: MPL-2.0 -use core::{ - marker::PhantomData, - num::Wrapping, - ops::Deref, - sync::atomic::{AtomicUsize, Ordering}, -}; - -use inherit_methods_macro::inherit_methods; -use ostd::mm::{FrameAllocOptions, Segment, VmIo, io_util::HasVmReaderWriter}; +use core::ops::Deref; use super::{MultiRead, MultiWrite}; use crate::prelude::*; +use ostd::mm::io_util::HasVmReaderWriter; -/// A lock-free SPSC FIFO ring buffer backed by a [`Segment<()>`]. -/// -/// The ring buffer supports `push`/`pop` any `T: Pod` items, also -/// supports `write`/`read` any bytes data based on [`VmReader`]/[`VmWriter`]. -/// -/// The ring buffer returns immediately after processing without any blocking. -/// The ring buffer can be shared between threads. -/// -/// # Example -/// -/// ``` -/// use ostd_pod::Pod; -/// use ring_buffer::RingBuffer; -/// -/// #[derive(Pod)] -/// struct Item { -/// a: u32, -/// b: u32, -/// } -/// -/// let rb = RingBuffer::::new(10); -/// let (producer, consumer) = rb.split(); -/// -/// for i in 0..10 { -/// producer.push(Item { a: i, b: i }).unwrap(); -/// } -/// -/// for _ in 0..10 { -/// let item = consumer.pop().unwrap(); -/// assert_eq!(item.a, item.b); -/// } -/// ``` -pub struct RingBuffer { - segment: Segment<()>, - capacity: usize, - tail: AtomicUsize, - head: AtomicUsize, - phantom: PhantomData, +pub use ring_buffer::{Consumer, Producer, RingBuffer, RbConsumer, RbProducer}; + +pub trait RingBufferU8Ext { + fn read_fallible(&mut self, writer: &mut dyn MultiWrite) -> Result; } -/// A producer of a [`RingBuffer`]. -pub struct Producer>> { - rb: R, - phantom: PhantomData, -} -/// A consumer of a [`RingBuffer`]. -pub struct Consumer>> { - rb: R, - phantom: PhantomData, -} +impl RingBufferU8Ext for RingBuffer { + fn read_fallible(&mut self, writer: &mut dyn MultiWrite) -> Result { + let len = self.len(); -pub type RbProducer = Producer>>; -pub type RbConsumer = Consumer>>; + let head = self.head(); + let offset = head.0 & (self.capacity() - 1); -impl RingBuffer { - const T_SIZE: usize = size_of::(); - - /// Creates a new [`RingBuffer`] with the given capacity. - pub fn new(capacity: usize) -> Self { - assert!( - capacity.is_power_of_two(), - "capacity must be a power of two" - ); - - let nframes = capacity - .checked_mul(Self::T_SIZE) - .unwrap() - .div_ceil(PAGE_SIZE); - let segment = FrameAllocOptions::new() - .zeroed(false) - .alloc_segment(nframes) - .unwrap(); - - Self { - segment, - capacity, - tail: AtomicUsize::new(0), - head: AtomicUsize::new(0), - phantom: PhantomData, - } - } - - /// Splits the [`RingBuffer`] into a producer and a consumer. - pub fn split(self) -> (RbProducer, RbConsumer) { - let producer = Producer { - rb: Arc::new(self), - phantom: PhantomData, - }; - let consumer = Consumer { - rb: Arc::clone(&producer.rb), - phantom: PhantomData, - }; - (producer, consumer) - } - - /// Gets the capacity of the `RingBuffer`. - pub fn capacity(&self) -> usize { - self.capacity - } - - /// Checks if the `RingBuffer` is empty. - pub fn is_empty(&self) -> bool { - self.len() == 0 - } - - /// Checks if the `RingBuffer` is full. - pub fn is_full(&self) -> bool { - self.free_len() == 0 - } - - /// Gets the number of items in the `RingBuffer`. - pub fn len(&self) -> usize { - // Implementation notes: This subtraction only makes sense if either the head or the tail - // is considered frozen; if both are volatile, the number of the items may become negative - // due to race conditions. This is always true with a `RingBuffer` or a pair of - // `RbProducer` and `RbConsumer`. - (self.tail() - self.head()).0 - } - - /// Gets the number of free items in the `RingBuffer`. - pub fn free_len(&self) -> usize { - self.capacity - self.len() - } - - /// Gets the head number of the `RingBuffer`. - /// - /// This is the number of items read from the ring buffer. The number wraps when crossing - /// [`usize`] boundaries. - pub fn head(&self) -> Wrapping { - Wrapping(self.head.load(Ordering::Acquire)) - } - - /// Gets the tail number of the `RingBuffer`. - /// - /// This is the number of items written into the ring buffer. The number wraps when crossing - /// [`usize`] boundaries. - pub fn tail(&self) -> Wrapping { - Wrapping(self.tail.load(Ordering::Acquire)) - } - - /// Clears the `RingBuffer`. - pub fn clear(&mut self) { - self.tail.store(0, Ordering::Release); - self.head.store(0, Ordering::Release); - } -} - -impl RingBuffer { - /// Pushes an item to the `RingBuffer`. - /// - /// Returns `Some` on success. Returns `None` if - /// the ring buffer is full. - pub fn push(&mut self, item: T) -> Option<()> { - let mut producer = Producer { - rb: self, - phantom: PhantomData, - }; - producer.push(item) - } - - /// Pushes a slice of items to the `RingBuffer`. - /// - /// Returns `Some` on success, all items are pushed to the ring buffer. - /// Returns `None` if the ring buffer is full or cannot fit all items. - pub fn push_slice(&mut self, items: &[T]) -> Option<()> { - let mut producer = Producer { - rb: self, - phantom: PhantomData, - }; - producer.push_slice(items) - } - - /// Pops an item from the `RingBuffer`. - /// - /// Returns `Some` with the popped item on success. - /// Returns `None` if the ring buffer is empty. - pub fn pop(&mut self) -> Option { - let mut consumer = Consumer { - rb: self, - phantom: PhantomData, - }; - consumer.pop() - } - - /// Pops a slice of items from the `RingBuffer`. - /// - /// Returns `Some` on success, all items are popped from the ring buffer. - /// Returns `None` if the ring buffer is empty or cannot fill all items. - pub fn pop_slice(&mut self, items: &mut [T]) -> Option<()> { - let mut consumer = Consumer { - rb: self, - phantom: PhantomData, - }; - consumer.pop_slice(items) - } - - pub(self) fn advance_tail(&self, mut tail: Wrapping, len: usize) { - tail += len; - self.tail.store(tail.0, Ordering::Release); - } - - pub(self) fn advance_head(&self, mut head: Wrapping, len: usize) { - head += len; - self.head.store(head.0, Ordering::Release); - } - - pub(self) fn reset_head(&self) { - let new_head = self.tail(); - self.head.store(new_head.0, Ordering::Release); - } -} - -impl RingBuffer { - /// Writes data from the `reader` to the `RingBuffer`. - /// - /// Returns the number of bytes written. - #[expect(unused)] - pub fn write_fallible(&mut self, reader: &mut dyn MultiRead) -> Result { - let mut producer = Producer { - rb: self, - phantom: PhantomData, - }; - producer.write_fallible(reader) - } - - /// Reads data from the `writer` to the `RingBuffer`. - /// - /// Returns the number of bytes read. - pub fn read_fallible(&mut self, writer: &mut dyn MultiWrite) -> Result { - let mut consumer = Consumer { - rb: self, - phantom: PhantomData, - }; - consumer.read_fallible(writer) - } -} - -impl>> Producer { - const T_SIZE: usize = size_of::(); - - /// Pushes an item to the `RingBuffer`. - /// - /// Returns `Some` on success. Returns `None` if - /// the ring buffer is full. - pub fn push(&mut self, item: T) -> Option<()> { - let rb = &self.rb; - if rb.is_full() { - return None; - } - - let tail = rb.tail(); - let offset = tail.0 & (rb.capacity - 1); - let byte_offset = offset * Self::T_SIZE; - - let mut writer = rb.segment.writer(); - writer.skip(byte_offset); - writer.write_val(&item).unwrap(); - - rb.advance_tail(tail, 1); - Some(()) - } - - /// Pushes a slice of items to the `RingBuffer`. - /// - /// Returns `Some` on success, all items are pushed to the ring buffer. - /// Returns `None` if the ring buffer is full or cannot fit all items. - pub fn push_slice(&mut self, items: &[T]) -> Option<()> { - let rb = &self.rb; - let nitems = items.len(); - if rb.free_len() < nitems { - return None; - } - - let tail = rb.tail(); - let offset = tail.0 & (rb.capacity - 1); - let byte_offset = offset * Self::T_SIZE; - - if offset + nitems > rb.capacity { - // Write into two separate parts - rb.segment - .write_slice(byte_offset, &items[..rb.capacity - offset]) - .unwrap(); - rb.segment - .write_slice(0, &items[rb.capacity - offset..]) - .unwrap(); - } else { - rb.segment.write_slice(byte_offset, items).unwrap(); - } - - rb.advance_tail(tail, nitems); - Some(()) - } - - // There is no counterpart to `Consumer::skip` and `Consumer::clear`. They do not make sense - // for the producer. -} - -impl>> Producer { - /// Writes data from the `VmReader` to the `RingBuffer`. - /// - /// Returns the number of bytes written. - pub fn write_fallible(&mut self, reader: &mut dyn MultiRead) -> Result { - self.write_fallible_with_max_len(reader, usize::MAX) - } - - /// Writes data from the `VmReader` to the `RingBuffer` with the maximum length. - /// - /// Returns the number of bytes written. - pub fn write_fallible_with_max_len( - &mut self, - reader: &mut dyn MultiRead, - max_len: usize, - ) -> Result { - let rb = &self.rb; - let free_len = rb.free_len().min(max_len); - - let tail = rb.tail(); - let offset = tail.0 & (rb.capacity - 1); - - let write_len = if offset + free_len > rb.capacity { - // Write into two separate parts - let mut write_len = 0; - - let mut writer = rb.segment.writer(); - writer.skip(offset).limit(rb.capacity - offset); - write_len += reader.read(&mut writer)?; - - let mut writer = rb.segment.writer(); - writer.limit(free_len - (rb.capacity - offset)); - write_len += reader.read(&mut writer)?; - - write_len - } else { - let mut writer = rb.segment.writer(); - writer.skip(offset).limit(free_len); - reader.read(&mut writer)? - }; - - rb.advance_tail(tail, write_len); - Ok(write_len) - } -} - -#[inherit_methods(from = "self.rb")] -impl>> Producer { - pub fn capacity(&self) -> usize; - pub fn is_empty(&self) -> bool; - pub fn is_full(&self) -> bool; - pub fn len(&self) -> usize; - pub fn free_len(&self) -> usize; - pub fn head(&self) -> Wrapping; - pub fn tail(&self) -> Wrapping; -} - -impl>> Consumer { - const T_SIZE: usize = size_of::(); - - /// Pops an item from the `RingBuffer`. - /// - /// Returns `Some` with the popped item on success. - /// Returns `None` if the ring buffer is empty. - pub fn pop(&mut self) -> Option { - let rb = &self.rb; - if rb.is_empty() { - return None; - } - - let head = rb.head(); - let offset = head.0 & (rb.capacity - 1); - let byte_offset = offset * Self::T_SIZE; - - let mut reader = rb.segment.reader(); - reader.skip(byte_offset); - let item = reader.read_val::().unwrap(); - - rb.advance_head(head, 1); - Some(item) - } - - /// Pops a slice of items from the `RingBuffer`. - /// - /// Returns `Some` on success, all items are popped from the ring buffer. - /// Returns `None` if the ring buffer is empty or cannot fill all items. - pub fn pop_slice(&mut self, items: &mut [T]) -> Option<()> { - let rb = &self.rb; - let nitems = items.len(); - if rb.len() < nitems { - return None; - } - - let head = rb.head(); - let offset = head.0 & (rb.capacity - 1); - let byte_offset = offset * Self::T_SIZE; - - if offset + nitems > rb.capacity { - // Read from two separate parts - rb.segment - .read_slice(byte_offset, &mut items[..rb.capacity - offset]) - .unwrap(); - rb.segment - .read_slice(0, &mut items[rb.capacity - offset..]) - .unwrap(); - } else { - rb.segment.read_slice(byte_offset, items).unwrap(); - } - - rb.advance_head(head, nitems); - Some(()) - } - - /// Skips `count` items in the `RingBuffer`. - /// - /// In other words, `count` items are popped from the `RingBuffer` and discarded. - /// - /// # Panics - /// - /// This method will panic if the number of the available items to pop is less than `count`. - pub fn skip(&mut self, count: usize) { - let rb = &self.rb; - let len = rb.len(); - assert!(len >= count); - - let head = rb.head(); - rb.advance_head(head, count); - } - - /// Clears the `RingBuffer`. - /// - /// In other words, all items are popped from the `RingBuffer` and discarded. - pub fn clear(&mut self) { - self.rb.reset_head(); - } -} - -impl>> Consumer { - /// Reads data from the `VmWriter` to the `RingBuffer`. - /// - /// Returns the number of bytes read. - pub fn read_fallible(&mut self, writer: &mut dyn MultiWrite) -> Result { - self.read_fallible_with_max_len(writer, usize::MAX) - } - - /// Reads data from the `VmWriter` to the `RingBuffer` with the maximum length. - /// - /// Returns the number of bytes read. - pub fn read_fallible_with_max_len( - &mut self, - writer: &mut dyn MultiWrite, - max_len: usize, - ) -> Result { - let rb = &self.rb; - let len = rb.len().min(max_len); - - let head = rb.head(); - let offset = head.0 & (rb.capacity - 1); - - let read_len = if offset + len > rb.capacity { + let read_len = if offset + len > self.capacity() { // Read from two separate parts let mut read_len = 0; - let mut reader = rb.segment.reader(); - reader.skip(offset).limit(rb.capacity - offset); + let mut reader = self.segment().reader(); + reader.skip(offset).limit(self.capacity() - offset); read_len += writer.write(&mut reader)?; - let mut reader = rb.segment.reader(); - reader.limit(len - (rb.capacity - offset)); + let mut reader = self.segment().reader(); + reader.limit(len - (self.capacity() - offset)); read_len += writer.write(&mut reader)?; read_len } else { - let mut reader = rb.segment.reader(); + let mut reader = self.segment().reader(); reader.skip(offset).limit(len); writer.write(&mut reader)? }; - rb.advance_head(head, read_len); + self.advance_head(head, read_len); Ok(read_len) } } -#[inherit_methods(from = "self.rb")] -impl>> Consumer { - pub fn capacity(&self) -> usize; - pub fn is_empty(&self) -> bool; - pub fn is_full(&self) -> bool; - pub fn len(&self) -> usize; - pub fn free_len(&self) -> usize; - pub fn head(&self) -> Wrapping; - pub fn tail(&self) -> Wrapping; +pub trait ProducerU8Ext { + fn write_fallible(&mut self, reader: &mut dyn MultiRead) -> Result; + fn write_fallible_with_max_len( + &mut self, + reader: &mut dyn MultiRead, + max_len: usize, + ) -> Result; +} + +impl>> ProducerU8Ext for Producer { + fn write_fallible(&mut self, reader: &mut dyn MultiRead) -> Result { + self.write_fallible_with_max_len(reader, usize::MAX) + } + + fn write_fallible_with_max_len( + &mut self, + reader: &mut dyn MultiRead, + max_len: usize, + ) -> Result { + let free_len = self.free_len().min(max_len); + + let tail = self.tail(); + let offset = tail.0 & (self.capacity() - 1); + + let write_len = if offset + free_len > self.capacity() { + // Write into two separate parts + let mut write_len = 0; + + let mut writer = self.segment().writer(); + writer.skip(offset).limit(self.capacity() - offset); + write_len += reader.read(&mut writer)?; + + let mut writer = self.segment().writer(); + writer.limit(free_len - (self.capacity() - offset)); + write_len += reader.read(&mut writer)?; + + write_len + } else { + let mut writer = self.segment().writer(); + writer.skip(offset).limit(free_len); + reader.read(&mut writer)? + }; + + self.advance_tail(tail, write_len); + Ok(write_len) + } +} + +pub trait ConsumerU8Ext { + fn read_fallible(&mut self, writer: &mut dyn MultiWrite) -> Result; + fn read_fallible_with_max_len( + &mut self, + writer: &mut dyn MultiWrite, + max_len: usize, + ) -> Result; +} + +impl>> ConsumerU8Ext for Consumer { + fn read_fallible(&mut self, writer: &mut dyn MultiWrite) -> Result { + self.read_fallible_with_max_len(writer, usize::MAX) + } + + fn read_fallible_with_max_len( + &mut self, + writer: &mut dyn MultiWrite, + max_len: usize, + ) -> Result { + let len = self.len().min(max_len); + + let head = self.head(); + let offset = head.0 & (self.capacity() - 1); + + let read_len = if offset + len > self.capacity() { + // Read from two separate parts + let mut read_len = 0; + + let mut reader = self.segment().reader(); + reader.skip(offset).limit(self.capacity() - offset); + read_len += writer.write(&mut reader)?; + + let mut reader = self.segment().reader(); + reader.limit(len - (self.capacity() - offset)); + read_len += writer.write(&mut reader)?; + + read_len + } else { + let mut reader = self.segment().reader(); + reader.skip(offset).limit(len); + writer.write(&mut reader)? + }; + + self.advance_head(head, read_len); + Ok(read_len) + } } #[cfg(ktest)] mod test { + use alloc::vec; + use ostd::mm::{VmReader, VmWriter, PAGE_SIZE}; use ostd::prelude::*; use super::*;