Extract core ring-buffer crate + kernel shim u8 helpers + remove rb() exposure

TankTechnology 2025-12-30 15:07:12 +08:00
parent 8ace76978e
commit 0c3a2c8b28
7 changed files with 543 additions and 477 deletions

Cargo.lock (generated, 9 lines changed)

@@ -214,6 +214,7 @@ dependencies = [
"ostd",
"paste",
"rand",
"ring-buffer",
"riscv",
"spin",
"takeable",
@@ -1546,6 +1547,14 @@ dependencies = [
"bitflags 1.3.2",
]

[[package]]
name = "ring-buffer"
version = "0.1.0"
dependencies = [
"inherit-methods-macro",
"ostd",
]

[[package]]
name = "riscv"
version = "0.15.0"

Cargo.toml

@@ -48,6 +48,7 @@ members = [
"kernel/libs/typeflags",
"kernel/libs/typeflags-util",
"kernel/libs/atomic-integer-wrapper",
"kernel/libs/ring-buffer",
"kernel/libs/xarray",
]
exclude = [

kernel/Cargo.toml

@@ -54,6 +54,7 @@ rand = { workspace = true, features = [
"small_rng",
"std_rng",
] }
ring-buffer = { path = "libs/ring-buffer" }
spin.workspace = true
takeable.workspace = true
time.workspace = true

kernel/libs/ring-buffer/Cargo.toml

@@ -0,0 +1,11 @@
[package]
name = "ring-buffer"
version = "0.1.0"
edition = "2024"

[dependencies]
inherit-methods-macro = { git = "https://github.com/asterinas/inherit-methods-macro", rev = "98f7e3e" }
ostd = { path = "../../../ostd" }

[lints]
workspace = true

kernel/libs/ring-buffer/src/lib.rs

@@ -0,0 +1,406 @@
#![no_std]
extern crate alloc;
use alloc::sync::Arc;
use core::{
marker::PhantomData,
mem::size_of,
num::Wrapping,
ops::Deref,
sync::atomic::{AtomicUsize, Ordering},
};
use inherit_methods_macro::inherit_methods;
use ostd::{
Pod,
mm::{FrameAllocOptions, Segment, VmIo, PAGE_SIZE, io_util::HasVmReaderWriter},
};
/// A lock-free SPSC FIFO ring buffer backed by a [`Segment<()>`].
///
/// The ring buffer supports `push`/`pop` of any `T: Pod` items, and also supports
/// `write`/`read` of raw bytes via [`VmReader`]/[`VmWriter`].
///
/// All operations return immediately without blocking.
/// The ring buffer can be shared between threads.
///
/// # Example
///
/// ```
/// use ostd::Pod;
/// use ring_buffer::RingBuffer;
///
/// #[derive(Clone, Copy, Pod)]
/// struct Item {
///     a: u32,
///     b: u32,
/// }
///
/// // The capacity must be a power of two.
/// let rb = RingBuffer::<Item>::new(8);
/// let (mut producer, mut consumer) = rb.split();
///
/// for i in 0..8 {
///     producer.push(Item { a: i, b: i }).unwrap();
/// }
///
/// for _ in 0..8 {
///     let item = consumer.pop().unwrap();
///     assert_eq!(item.a, item.b);
/// }
/// ```
pub struct RingBuffer<T> {
segment: Segment<()>,
capacity: usize,
tail: AtomicUsize,
head: AtomicUsize,
phantom: PhantomData<T>,
}
/// A producer of a [`RingBuffer`].
pub struct Producer<T, R: Deref<Target = RingBuffer<T>>> {
rb: R,
phantom: PhantomData<T>,
}
/// A consumer of a [`RingBuffer`].
pub struct Consumer<T, R: Deref<Target = RingBuffer<T>>> {
rb: R,
phantom: PhantomData<T>,
}
pub type RbProducer<T> = Producer<T, Arc<RingBuffer<T>>>;
pub type RbConsumer<T> = Consumer<T, Arc<RingBuffer<T>>>;
impl<T> RingBuffer<T> {
const T_SIZE: usize = size_of::<T>();
/// Creates a new [`RingBuffer`] with the given capacity.
pub fn new(capacity: usize) -> Self {
assert!(
capacity.is_power_of_two(),
"capacity must be a power of two"
);
let nframes = capacity
.checked_mul(Self::T_SIZE)
.unwrap()
.div_ceil(PAGE_SIZE);
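// For example (illustrative): `RingBuffer::<u64>::new(1024)` needs 1024 * 8 = 8192
// bytes, i.e. two backing frames when `PAGE_SIZE` is 4 KiB.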
let segment = FrameAllocOptions::new()
.zeroed(false)
.alloc_segment(nframes)
.unwrap();
Self {
segment,
capacity,
tail: AtomicUsize::new(0),
head: AtomicUsize::new(0),
phantom: PhantomData,
}
}
/// Splits the [`RingBuffer`] into a producer and a consumer.
pub fn split(self) -> (RbProducer<T>, RbConsumer<T>) {
let producer = Producer {
rb: Arc::new(self),
phantom: PhantomData,
};
let consumer = Consumer {
rb: Arc::clone(&producer.rb),
phantom: PhantomData,
};
(producer, consumer)
}
/// Gets the capacity of the `RingBuffer`.
pub fn capacity(&self) -> usize {
self.capacity
}
/// Returns the underlying segment backing this ring buffer.
///
/// This is exposed so that byte-oriented helpers outside this crate (such as the kernel's
/// `u8` extension traits) can build `VmReader`s/`VmWriter`s over the buffer memory.
pub fn segment(&self) -> &Segment<()> {
&self.segment
}
/// Checks if the `RingBuffer` is empty.
pub fn is_empty(&self) -> bool {
self.len() == 0
}
/// Checks if the `RingBuffer` is full.
pub fn is_full(&self) -> bool {
self.free_len() == 0
}
/// Gets the number of items in the `RingBuffer`.
pub fn len(&self) -> usize {
// Implementation notes: This subtraction only makes sense if either the head or the tail
// is considered frozen; if both are volatile, the number of items may appear negative due
// to race conditions. One of them is always frozen when the buffer is accessed through an
// exclusive `RingBuffer` reference or through a single `RbProducer`/`RbConsumer` pair.
// The `Wrapping` subtraction also stays correct across counter overflow: a head of
// `usize::MAX - 1` and a wrapped tail of `2` still yield a length of 4.
(self.tail() - self.head()).0
}
/// Gets the number of free items in the `RingBuffer`.
pub fn free_len(&self) -> usize {
self.capacity - self.len()
}
/// Gets the head number of the `RingBuffer`.
///
/// This is the number of items read from the ring buffer. The number wraps when crossing
/// [`usize`] boundaries.
pub fn head(&self) -> Wrapping<usize> {
Wrapping(self.head.load(Ordering::Acquire))
}
/// Gets the tail number of the `RingBuffer`.
///
/// This is the number of items written into the ring buffer. The number wraps when crossing
/// [`usize`] boundaries.
pub fn tail(&self) -> Wrapping<usize> {
Wrapping(self.tail.load(Ordering::Acquire))
}
/// Advances the tail after the producer has written `len` items.
///
/// The release store pairs with the acquire load in [`Self::tail`], making the written items
/// visible to the consumer.
pub fn advance_tail(&self, mut tail: Wrapping<usize>, len: usize) {
tail += len;
self.tail.store(tail.0, Ordering::Release);
}
/// Advances the head after the consumer has read `len` items.
///
/// The release store pairs with the acquire load in [`Self::head`], allowing the producer to
/// reuse the space freed by the consumer.
pub fn advance_head(&self, mut head: Wrapping<usize>, len: usize) {
head += len;
self.head.store(head.0, Ordering::Release);
}
/// Resets the head to the current tail, discarding all buffered items.
pub fn reset_head(&self) {
let new_head = self.tail();
self.head.store(new_head.0, Ordering::Release);
}
/// Clears the `RingBuffer`.
pub fn clear(&mut self) {
self.tail.store(0, Ordering::Release);
self.head.store(0, Ordering::Release);
}
}
impl<T: Pod> RingBuffer<T> {
/// Pushes an item to the `RingBuffer`.
///
/// Returns `Some` on success. Returns `None` if
/// the ring buffer is full.
pub fn push(&mut self, item: T) -> Option<()> {
let mut producer = Producer {
rb: self,
phantom: PhantomData,
};
producer.push(item)
}
/// Pushes a slice of items to the `RingBuffer`.
///
/// Returns `Some` if all items have been pushed to the ring buffer.
/// Returns `None` if the ring buffer cannot fit all of the items.
pub fn push_slice(&mut self, items: &[T]) -> Option<()> {
let mut producer = Producer {
rb: self,
phantom: PhantomData,
};
producer.push_slice(items)
}
/// Pops an item from the `RingBuffer`.
///
/// Returns `Some` with the popped item on success.
/// Returns `None` if the ring buffer is empty.
pub fn pop(&mut self) -> Option<T> {
let mut consumer = Consumer {
rb: self,
phantom: PhantomData,
};
consumer.pop()
}
/// Pops a slice of items from the `RingBuffer`.
///
/// Returns `Some` if `items` has been completely filled from the ring buffer.
/// Returns `None` if the ring buffer does not contain enough items.
pub fn pop_slice(&mut self, items: &mut [T]) -> Option<()> {
let mut consumer = Consumer {
rb: self,
phantom: PhantomData,
};
consumer.pop_slice(items)
}
}
impl<T: Pod, R: Deref<Target = RingBuffer<T>>> Producer<T, R> {
const T_SIZE: usize = size_of::<T>();
/// Pushes an item to the `RingBuffer`.
///
/// Returns `Some` on success. Returns `None` if
/// the ring buffer is full.
pub fn push(&mut self, item: T) -> Option<()> {
let rb = &self.rb;
if rb.is_full() {
return None;
}
let tail = rb.tail();
let offset = tail.0 & (rb.capacity - 1);
let byte_offset = offset * Self::T_SIZE;
let mut writer = rb.segment.writer();
writer.skip(byte_offset);
writer.write_val(&item).unwrap();
rb.advance_tail(tail, 1);
Some(())
}
/// Pushes a slice of items to the `RingBuffer`.
///
/// Returns `Some` if all items have been pushed to the ring buffer.
/// Returns `None` if the ring buffer cannot fit all of the items.
pub fn push_slice(&mut self, items: &[T]) -> Option<()> {
let rb = &self.rb;
let nitems = items.len();
if rb.free_len() < nitems {
return None;
}
let tail = rb.tail();
let offset = tail.0 & (rb.capacity - 1);
let byte_offset = offset * Self::T_SIZE;
if offset + nitems > rb.capacity {
// Write into two separate parts
rb.segment
.write_slice(byte_offset, &items[..rb.capacity - offset])
.unwrap();
rb.segment
.write_slice(0, &items[rb.capacity - offset..])
.unwrap();
} else {
rb.segment.write_slice(byte_offset, items).unwrap();
}
rb.advance_tail(tail, nitems);
Some(())
}
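// Illustration (not in the original source): with capacity 8 and a tail offset of 6,
// pushing a 4-item slice writes items [0..2) at byte offset `6 * T_SIZE` and items
// [2..4) at byte offset 0, then advances the tail by 4.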
// There is no counterpart to `Consumer::skip` and `Consumer::clear`. They do not make sense
// for the producer.
}
#[inherit_methods(from = "self.rb")]
impl<T, R: Deref<Target = RingBuffer<T>>> Producer<T, R> {
pub fn capacity(&self) -> usize;
pub fn is_empty(&self) -> bool;
pub fn is_full(&self) -> bool;
pub fn len(&self) -> usize;
pub fn free_len(&self) -> usize;
pub fn head(&self) -> Wrapping<usize>;
pub fn tail(&self) -> Wrapping<usize>;
pub fn segment(&self) -> &Segment<()>;
pub fn advance_tail(&self, tail: Wrapping<usize>, len: usize);
pub fn advance_head(&self, head: Wrapping<usize>, len: usize);
pub fn reset_head(&self);
}
impl<T: Pod, R: Deref<Target = RingBuffer<T>>> Consumer<T, R> {
const T_SIZE: usize = size_of::<T>();
/// Pops an item from the `RingBuffer`.
///
/// Returns `Some` with the popped item on success.
/// Returns `None` if the ring buffer is empty.
pub fn pop(&mut self) -> Option<T> {
let rb = &self.rb;
if rb.is_empty() {
return None;
}
let head = rb.head();
let offset = head.0 & (rb.capacity - 1);
let byte_offset = offset * Self::T_SIZE;
let mut reader = rb.segment.reader();
reader.skip(byte_offset);
let item = reader.read_val::<T>().unwrap();
rb.advance_head(head, 1);
Some(item)
}
/// Pops a slice of items from the `RingBuffer`.
///
/// Returns `Some` if `items` has been completely filled from the ring buffer.
/// Returns `None` if the ring buffer does not contain enough items.
pub fn pop_slice(&mut self, items: &mut [T]) -> Option<()> {
let rb = &self.rb;
let nitems = items.len();
if rb.len() < nitems {
return None;
}
let head = rb.head();
let offset = head.0 & (rb.capacity - 1);
let byte_offset = offset * Self::T_SIZE;
if offset + nitems > rb.capacity {
// Read from two separate parts
rb.segment
.read_slice(byte_offset, &mut items[..rb.capacity - offset])
.unwrap();
rb.segment
.read_slice(0, &mut items[rb.capacity - offset..])
.unwrap();
} else {
rb.segment.read_slice(byte_offset, items).unwrap();
}
rb.advance_head(head, nitems);
Some(())
}
/// Skips `count` items in the `RingBuffer`.
///
/// In other words, `count` items are popped from the `RingBuffer` and discarded.
///
/// # Panics
///
/// This method will panic if the number of the available items to pop is less than `count`.
pub fn skip(&mut self, count: usize) {
let rb = &self.rb;
let len = rb.len();
assert!(len >= count);
let head = rb.head();
rb.advance_head(head, count);
}
/// Clears the `RingBuffer`.
///
/// In other words, all items are popped from the `RingBuffer` and discarded.
pub fn clear(&mut self) {
self.rb.reset_head();
}
}
#[inherit_methods(from = "self.rb")]
impl<T, R: Deref<Target = RingBuffer<T>>> Consumer<T, R> {
pub fn capacity(&self) -> usize;
pub fn is_empty(&self) -> bool;
pub fn is_full(&self) -> bool;
pub fn len(&self) -> usize;
pub fn free_len(&self) -> usize;
pub fn head(&self) -> Wrapping<usize>;
pub fn tail(&self) -> Wrapping<usize>;
pub fn segment(&self) -> &Segment<()>;
pub fn advance_tail(&self, tail: Wrapping<usize>, len: usize);
pub fn advance_head(&self, head: Wrapping<usize>, len: usize);
pub fn reset_head(&self);
}
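// Illustrative unit tests (not part of this commit): a minimal sketch assuming the `ktest`
// cfg and the `#[ktest]` attribute re-exported via `ostd::prelude`, as used by the kernel's
// existing ring-buffer tests.
#[cfg(ktest)]
mod test {
    use ostd::prelude::*;

    use super::*;

    #[ktest]
    fn push_pop_wrap_around() {
        let mut rb = RingBuffer::<u32>::new(4);

        // Fill the buffer completely; the next push must fail.
        for i in 0..4 {
            rb.push(i).unwrap();
        }
        assert!(rb.push(4).is_none());

        // Drain two items and push two more to make the indices wrap around.
        assert_eq!(rb.pop(), Some(0));
        assert_eq!(rb.pop(), Some(1));
        rb.push(4).unwrap();
        rb.push(5).unwrap();

        // The remaining items come out in FIFO order.
        for expected in 2..6 {
            assert_eq!(rb.pop(), Some(expected));
        }
        assert!(rb.is_empty());
    }
}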

kernel/src/prelude.rs

@@ -21,6 +21,7 @@ pub(crate) use ostd::{
mm::{FallibleVmRead, FallibleVmWrite, PAGE_SIZE, Vaddr, VmReader, VmWriter},
sync::{Mutex, MutexGuard, RwLock, RwMutex, SpinLock, SpinLockGuard},
};
pub(crate) use crate::util::ring_buffer::{ConsumerU8Ext, ProducerU8Ext, RingBufferU8Ext};
/// return current process
#[macro_export]

kernel/src/util/ring_buffer.rs

@@ -1,511 +1,148 @@
// SPDX-License-Identifier: MPL-2.0

use core::ops::Deref;

use ostd::mm::io_util::HasVmReaderWriter;
pub use ring_buffer::{Consumer, Producer, RingBuffer, RbConsumer, RbProducer};

use super::{MultiRead, MultiWrite};
use crate::prelude::*;

/// Byte-oriented read extensions for a whole [`RingBuffer<u8>`].
pub trait RingBufferU8Ext {
    /// Reads data from the ring buffer into the `writer`.
    ///
    /// Returns the number of bytes read.
    fn read_fallible(&mut self, writer: &mut dyn MultiWrite) -> Result<usize>;
}

impl RingBufferU8Ext for RingBuffer<u8> {
    fn read_fallible(&mut self, writer: &mut dyn MultiWrite) -> Result<usize> {
        let len = self.len();
        let head = self.head();
        let offset = head.0 & (self.capacity() - 1);
        let read_len = if offset + len > self.capacity() {
            // Read from two separate parts
            let mut read_len = 0;
            let mut reader = self.segment().reader();
            reader.skip(offset).limit(self.capacity() - offset);
            read_len += writer.write(&mut reader)?;
            let mut reader = self.segment().reader();
            reader.limit(len - (self.capacity() - offset));
            read_len += writer.write(&mut reader)?;
            read_len
        } else {
            let mut reader = self.segment().reader();
            reader.skip(offset).limit(len);
            writer.write(&mut reader)?
        };
        self.advance_head(head, read_len);
        Ok(read_len)
    }
}

/// Byte-oriented write extensions for a [`Producer`] of a `RingBuffer<u8>`.
pub trait ProducerU8Ext {
    /// Writes data from the `reader` to the ring buffer.
    ///
    /// Returns the number of bytes written.
    fn write_fallible(&mut self, reader: &mut dyn MultiRead) -> Result<usize>;

    /// Writes data from the `reader` to the ring buffer, writing at most `max_len` bytes.
    ///
    /// Returns the number of bytes written.
    fn write_fallible_with_max_len(
        &mut self,
        reader: &mut dyn MultiRead,
        max_len: usize,
    ) -> Result<usize>;
}

impl<R: Deref<Target = RingBuffer<u8>>> ProducerU8Ext for Producer<u8, R> {
    fn write_fallible(&mut self, reader: &mut dyn MultiRead) -> Result<usize> {
        self.write_fallible_with_max_len(reader, usize::MAX)
    }

    fn write_fallible_with_max_len(
        &mut self,
        reader: &mut dyn MultiRead,
        max_len: usize,
    ) -> Result<usize> {
        let free_len = self.free_len().min(max_len);
        let tail = self.tail();
        let offset = tail.0 & (self.capacity() - 1);
        let write_len = if offset + free_len > self.capacity() {
            // Write into two separate parts
            let mut write_len = 0;
            let mut writer = self.segment().writer();
            writer.skip(offset).limit(self.capacity() - offset);
            write_len += reader.read(&mut writer)?;
            let mut writer = self.segment().writer();
            writer.limit(free_len - (self.capacity() - offset));
            write_len += reader.read(&mut writer)?;
            write_len
        } else {
            let mut writer = self.segment().writer();
            writer.skip(offset).limit(free_len);
            reader.read(&mut writer)?
        };
        self.advance_tail(tail, write_len);
        Ok(write_len)
    }
}

/// Byte-oriented read extensions for a [`Consumer`] of a `RingBuffer<u8>`.
pub trait ConsumerU8Ext {
    /// Reads data from the ring buffer into the `writer`.
    ///
    /// Returns the number of bytes read.
    fn read_fallible(&mut self, writer: &mut dyn MultiWrite) -> Result<usize>;

    /// Reads data from the ring buffer into the `writer`, reading at most `max_len` bytes.
    ///
    /// Returns the number of bytes read.
    fn read_fallible_with_max_len(
        &mut self,
        writer: &mut dyn MultiWrite,
        max_len: usize,
    ) -> Result<usize>;
}

impl<R: Deref<Target = RingBuffer<u8>>> ConsumerU8Ext for Consumer<u8, R> {
    fn read_fallible(&mut self, writer: &mut dyn MultiWrite) -> Result<usize> {
        self.read_fallible_with_max_len(writer, usize::MAX)
    }

    fn read_fallible_with_max_len(
        &mut self,
        writer: &mut dyn MultiWrite,
        max_len: usize,
    ) -> Result<usize> {
        let len = self.len().min(max_len);
        let head = self.head();
        let offset = head.0 & (self.capacity() - 1);
        let read_len = if offset + len > self.capacity() {
            // Read from two separate parts
            let mut read_len = 0;
            let mut reader = self.segment().reader();
            reader.skip(offset).limit(self.capacity() - offset);
            read_len += writer.write(&mut reader)?;
            let mut reader = self.segment().reader();
            reader.limit(len - (self.capacity() - offset));
            read_len += writer.write(&mut reader)?;
            read_len
        } else {
            let mut reader = self.segment().reader();
            reader.skip(offset).limit(len);
            writer.write(&mut reader)?
        };
        self.advance_head(head, read_len);
        Ok(read_len)
    }
}
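// Usage sketch (illustrative, not from this commit): kernel code imports the extension
// traits via the prelude and calls the byte-oriented helpers on a split `RingBuffer<u8>`.
// `user_reader` and `user_writer` below are hypothetical `MultiRead`/`MultiWrite`
// implementors, e.g. wrappers around user-space I/O vectors.
//
//     let (mut producer, mut consumer) = RingBuffer::<u8>::new(PAGE_SIZE).split();
//     let nwritten = producer.write_fallible(&mut user_reader)?;
//     let nread = consumer.read_fallible(&mut user_writer)?;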
#[cfg(ktest)]
mod test {
use alloc::vec;
use ostd::mm::{VmReader, VmWriter, PAGE_SIZE};
use ostd::prelude::*;
use super::*;