Add the bottom half mechanism of workqueue

Chuandong Li 2023-10-08 16:02:24 +00:00 committed by Tate, Hongliang Tian
parent e2b4302620
commit 7419f6b56b
16 changed files with 766 additions and 15 deletions

Cargo.lock

@ -681,6 +681,7 @@ dependencies = [
"align_ext",
"aml",
"bitflags 1.3.2",
"bitvec",
"buddy_system_allocator",
"cfg-if",
"inherit-methods-macro",


@ -19,6 +19,7 @@ lazy_static = { version = "1.0", features = ["spin_no_std"] }
trapframe = { git = "https://github.com/sdww0/trapframe-rs", rev = "e886763" }
inherit-methods-macro = { git = "https://github.com/jinzhao-dev/inherit-methods-macro", rev = "98f7e3e" }
tdx-guest = { path = "../libs/tdx-guest", optional = true }
bitvec = { version = "1.0", default-features = false, features = ["alloc"] }
[target.x86_64-custom.dependencies]
x86_64 = "0.14.2"


@ -3,10 +3,13 @@
use core::arch::x86_64::{_fxrstor, _fxsave};
use core::fmt::Debug;
use alloc::vec::Vec;
use bitvec::{prelude::Lsb0, slice::IterOnes};
use trapframe::{GeneralRegs, UserContext as RawUserContext};
#[cfg(feature = "intel_tdx")]
use crate::arch::tdx_guest::{handle_virtual_exception, TdxTrapFrame};
use bitvec::prelude::BitVec;
use log::debug;
#[cfg(feature = "intel_tdx")]
use tdx_guest::tdcall;
@ -23,7 +26,65 @@ pub fn num_cpus() -> u32 {
/// Returns the ID of this CPU.
pub fn this_cpu() -> u32 {
todo!()
// FIXME: we only start one cpu now.
0
}
#[derive(Default)]
pub struct CpuSet {
bitset: BitVec,
}
impl CpuSet {
pub fn new_full() -> Self {
let num_cpus = num_cpus();
let mut bitset = BitVec::with_capacity(num_cpus as usize);
bitset.resize(num_cpus as usize, true);
Self { bitset }
}
pub fn new_empty() -> Self {
let num_cpus = num_cpus();
let mut bitset = BitVec::with_capacity(num_cpus as usize);
bitset.resize(num_cpus as usize, false);
Self { bitset }
}
pub fn add(&mut self, cpu_id: u32) {
self.bitset.set(cpu_id as usize, true);
}
pub fn add_from_vec(&mut self, cpu_ids: Vec<u32>) {
for cpu_id in cpu_ids {
self.add(cpu_id)
}
}
pub fn add_all(&mut self) {
self.bitset.fill(true);
}
pub fn remove(&mut self, cpu_id: u32) {
self.bitset.set(cpu_id as usize, false);
}
pub fn remove_from_vec(&mut self, cpu_ids: Vec<u32>) {
for cpu_id in cpu_ids {
self.remove(cpu_id);
}
}
pub fn clear(&mut self) {
self.bitset.fill(false);
}
pub fn contains(&self, cpu_id: u32) -> bool {
self.bitset.get(cpu_id as usize).as_deref() == Some(&true)
}
pub fn iter(&self) -> IterOnes<'_, usize, Lsb0> {
self.bitset.iter_ones()
}
}
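The new `CpuSet` is a thin wrapper around a `BitVec` whose length is `num_cpus()`. Below is a minimal usage sketch of how callers elsewhere in the kernel use this API; it is illustrative only and sticks to CPU 0, since higher indices may be out of range on the current single-CPU bring-up.

```rust
use jinux_frame::cpu::CpuSet;

// Start from an empty set and allow only CPU 0.
let mut set = CpuSet::new_empty();
set.add(0);
assert!(set.contains(0));

// iter() yields the indices of the set bits, i.e. the allowed CPUs.
for cpu_id in set.iter() {
    // cpu_id is the usize index of an allowed CPU
}

// Remove CPU 0 again, or clear the whole set at once.
set.remove(0);
set.clear();

// new_full() allows every CPU, e.g. for threads with no particular affinity.
let _all = CpuSet::new_full();
```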
/// Cpu context, including both general-purpose registers and floating-point registers.


@ -18,7 +18,7 @@ pub struct RcuMonitor {
}
impl RcuMonitor {
pub fn new(num_cpus: u32) -> Self {
pub fn new(num_cpus: usize) -> Self {
Self {
is_monitoring: AtomicBool::new(false),
state: SpinLock::new(State::new(num_cpus)),
@ -89,7 +89,7 @@ struct State {
}
impl State {
pub fn new(num_cpus: u32) -> Self {
pub fn new(num_cpus: usize) -> Self {
Self {
current_gp: GracePeriod::new(num_cpus),
next_callbacks: VecDeque::new(),
@ -106,10 +106,10 @@ struct GracePeriod {
}
impl GracePeriod {
pub fn new(num_cpus: u32) -> Self {
pub fn new(num_cpus: usize) -> Self {
Self {
callbacks: Default::default(),
cpu_mask: AtomicBits::new_zeroes(num_cpus as usize),
cpu_mask: AtomicBits::new_zeroes(num_cpus),
is_complete: false,
}
}


@ -69,6 +69,10 @@ impl WaitQueue {
}
}
pub fn is_empty(&self) -> bool {
self.waiters.lock_irq_disabled().is_empty()
}
// Enqueue a waiter into the current wait queue. If the waiter is exclusive, add it to the back
// of the wait queue; otherwise, add it to the front.
fn enqueue(&self, waiter: &Arc<Waiter>) {


@ -6,6 +6,7 @@ mod scheduler;
#[allow(clippy::module_inception)]
mod task;
pub use self::priority::Priority;
pub use self::processor::{current_task, disable_preempt, preempt, schedule, DisablePreemptGuard};
pub use self::scheduler::{add_task, set_scheduler, Scheduler};
pub use self::task::{Task, TaskAdapter, TaskOptions, TaskStatus};


@ -87,7 +87,7 @@ pub fn preempt() {
/// If the current task's status is exited, it will not be added to the scheduler.
///
/// Before the context switch, the current task will switch to the next task.
pub fn switch_to_task(next_task: Arc<Task>) {
fn switch_to_task(next_task: Arc<Task>) {
if !PREEMPT_COUNT.is_preemptive() {
panic!(
"Calling schedule() while holding {} locks",


@ -1,6 +1,6 @@
use crate::config::{KERNEL_STACK_SIZE, PAGE_SIZE};
use crate::cpu::CpuSet;
use crate::prelude::*;
use crate::task::processor::switch_to_task;
use crate::user::UserSpace;
use crate::vm::{VmAllocOptions, VmFrameVec};
use spin::{Mutex, MutexGuard};
@ -8,6 +8,7 @@ use spin::{Mutex, MutexGuard};
use intrusive_collections::intrusive_adapter;
use intrusive_collections::LinkedListAtomicLink;
use super::add_task;
use super::priority::Priority;
use super::processor::{current_task, schedule};
@ -65,6 +66,8 @@ pub struct Task {
kstack: KernelStack,
link: LinkedListAtomicLink,
priority: Priority,
// TODO: add multiprocessor support
cpu_affinity: CpuSet,
}
// TaskAdapter struct is implemented for building relationships between doubly linked list and Task struct
@ -100,7 +103,8 @@ impl Task {
}
pub fn run(self: &Arc<Self>) {
switch_to_task(self.clone());
add_task(self.clone());
schedule();
}
/// Returns the task status.
@ -150,6 +154,7 @@ pub struct TaskOptions {
data: Option<Box<dyn Any + Send + Sync>>,
user_space: Option<Arc<UserSpace>>,
priority: Priority,
cpu_affinity: CpuSet,
}
impl TaskOptions {
@ -158,11 +163,13 @@ impl TaskOptions {
where
F: Fn() + Send + Sync + 'static,
{
let cpu_affinity = CpuSet::new_full();
Self {
func: Some(Box::new(func)),
data: None,
user_space: None,
priority: Priority::normal(),
cpu_affinity,
}
}
@ -194,6 +201,11 @@ impl TaskOptions {
self
}
pub fn cpu_affinity(mut self, cpu_affinity: CpuSet) -> Self {
self.cpu_affinity = cpu_affinity;
self
}
/// Builds a new task but not run it immediately.
pub fn build(self) -> Result<Arc<Task>> {
/// All tasks will enter this function.
@ -216,7 +228,7 @@ impl TaskOptions {
kstack: KernelStack::new()?,
link: LinkedListAtomicLink::new(),
priority: self.priority,
//cpu_affinity: task_attrs.cpu_affinity,
cpu_affinity: self.cpu_affinity,
};
result.task_inner.lock().task_status = TaskStatus::Runnable;
@ -253,6 +265,7 @@ impl TaskOptions {
kstack: KernelStack::new()?,
link: LinkedListAtomicLink::new(),
priority: self.priority,
cpu_affinity: self.cpu_affinity,
};
result.task_inner.lock().task_status = TaskStatus::Runnable;
@ -261,7 +274,7 @@ impl TaskOptions {
(crate::vm::paddr_to_vaddr(result.kstack.end_paddr())) as u64;
let arc_self = Arc::new(result);
switch_to_task(arc_self.clone());
arc_self.run();
Ok(arc_self)
}
}


@ -76,6 +76,7 @@ fn init_thread() {
"[jinux-std/lib.rs] spawn kernel thread, tid = {}",
thread.tid()
);
thread::work_queue::init();
print_banner();


@ -1,4 +1,5 @@
use jinux_frame::task::TaskOptions;
use jinux_frame::cpu::CpuSet;
use jinux_frame::task::{Priority, TaskOptions};
use crate::prelude::*;
@ -66,7 +67,8 @@ impl KernelThreadExt for Thread {
/// Options to create or spawn a new thread.
pub struct ThreadOptions {
func: Option<Box<dyn Fn() + Send + Sync>>,
priority: u16,
priority: Priority,
cpu_affinity: CpuSet,
}
impl ThreadOptions {
@ -74,9 +76,11 @@ impl ThreadOptions {
where
F: Fn() + Send + Sync + 'static,
{
let cpu_affinity = CpuSet::new_full();
Self {
func: Some(Box::new(func)),
priority: 100,
priority: Priority::normal(),
cpu_affinity,
}
}
@ -92,8 +96,13 @@ impl ThreadOptions {
self.func.take().unwrap()
}
pub fn priority(mut self, priority: u16) -> Self {
pub fn priority(mut self, priority: Priority) -> Self {
self.priority = priority;
self
}
pub fn cpu_affinity(mut self, cpu_affinity: CpuSet) -> Self {
self.cpu_affinity = cpu_affinity;
self
}
}
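With the new builder methods, a kernel thread can be pinned to a CPU and given a priority. Below is a minimal sketch of the intended usage, mirroring how `Worker::new` builds its bound thread later in this commit; the imports and the CPU index are illustrative assumptions.

```rust
use jinux_frame::cpu::CpuSet;
use jinux_frame::task::Priority;

// Pin the thread to CPU 0 and raise its priority.
let mut cpu_affinity = CpuSet::new_empty();
cpu_affinity.add(0);

let thread = Thread::new_kernel_thread(
    ThreadOptions::new(|| {
        // kernel thread body
    })
    .cpu_affinity(cpu_affinity)
    .priority(Priority::high()),
);

// new_kernel_thread() only creates the thread; run() starts it.
thread.run();
```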


@ -16,6 +16,7 @@ pub mod kernel_thread;
pub mod status;
pub mod task;
pub mod thread_table;
pub mod work_queue;
pub type Tid = u32;
@ -62,7 +63,7 @@ impl Thread {
.expect("[Internal Error] current thread cannot be None")
}
/// Add inner task to the run queue of scheduler. Note this does not means the thread will run at once.
/// Run this thread at once.
pub fn run(&self) {
self.status.lock().set_running();
self.task.run();


@ -0,0 +1,194 @@
use crate::prelude::*;
use jinux_frame::cpu::CpuSet;
use spin::Once;
use work_item::WorkItem;
use worker_pool::WorkerPool;
mod simple_scheduler;
pub mod work_item;
pub mod worker;
pub mod worker_pool;
static WORKERPOOL_NORMAL: Once<Arc<WorkerPool>> = Once::new();
static WORKERPOOL_HIGH_PRI: Once<Arc<WorkerPool>> = Once::new();
static WORKQUEUE_GLOBAL_NORMAL: Once<Arc<WorkQueue>> = Once::new();
static WORKQUEUE_GLOBAL_HIGH_PRI: Once<Arc<WorkQueue>> = Once::new();
/// Work queue mechanism.
///
/// # Overview
///
/// A `workqueue` is a kernel-level mechanism used to schedule and execute deferred work.
/// Deferred work refers to tasks that need to be executed at some point in the future,
/// but not necessarily immediately.
///
/// The workqueue mechanism is implemented using a combination of kernel threads and data
/// structures such as `WorkItem`, `WorkQueue`, `Worker` and `WorkerPool`. The `WorkItem`
/// represents a task to be processed, while the `WorkQueue` maintains the queue of submitted
/// `WorkItems`. The `Worker` is responsible for processing these submitted tasks,
/// and the `WorkerPool` manages and schedules these workers.
///
/// # Examples
///
/// The system provides a default work queue and worker pool,
/// along with high-level APIs for using them.
/// Here is a basic example of how to use these APIs.
///
/// ```rust
/// use crate::thread::work_queue::{submit_work_func, submit_work_item, WorkItem, WorkPriority};
///
/// // Submit a closure to the high priority queue.
/// submit_work_func(|| { /* ... */ }, WorkPriority::High);
///
/// // Submit a closure to the normal priority queue.
/// submit_work_func(|| { /* ... */ }, WorkPriority::Normal);
///
/// fn deferred_task() {
///     // ...
/// }
///
/// // Create a work item.
/// let work_item = Arc::new(WorkItem::new(Box::new(deferred_task)));
///
/// // Submit the work item to the high priority queue.
/// submit_work_item(work_item.clone(), WorkPriority::High);
///
/// // Or submit it to the normal priority queue.
/// submit_work_item(work_item, WorkPriority::Normal);
/// ```
///
/// Users can also create a dedicated `WorkQueue` and `WorkerPool`.
///
/// ```rust
/// use jinux_frame::cpu::CpuSet;
/// use crate::thread::work_queue::{WorkQueue, WorkPriority, WorkItem};
/// use crate::thread::work_queue::worker_pool::WorkerPool;
///
/// fn deferred_task() {
///     // ...
/// }
///
/// let cpu_set = CpuSet::new_full();
/// let high_pri_pool = WorkerPool::new(WorkPriority::High, cpu_set);
/// high_pri_pool.run();
/// let my_queue = WorkQueue::new(Arc::downgrade(&high_pri_pool));
///
/// let work_item = Arc::new(WorkItem::new(Box::new(deferred_task)));
/// my_queue.enqueue(work_item);
/// ```
/// Submit a function to a global work queue.
pub fn submit_work_func<F>(work_func: F, work_priority: WorkPriority)
where
F: Fn() + Send + Sync + 'static,
{
let work_item = Arc::new(WorkItem::new(Box::new(work_func)));
submit_work_item(work_item, work_priority);
}
/// Submit a work item to a global work queue.
pub fn submit_work_item(work_item: Arc<WorkItem>, work_priority: WorkPriority) -> bool {
match work_priority {
WorkPriority::High => WORKQUEUE_GLOBAL_HIGH_PRI
.get()
.unwrap()
.enqueue(work_item.clone()),
WorkPriority::Normal => WORKQUEUE_GLOBAL_NORMAL
.get()
.unwrap()
.enqueue(work_item.clone()),
}
}
/// A work queue maintains a series of work items to be handled
/// asynchronously in a process context.
pub struct WorkQueue {
worker_pool: Weak<WorkerPool>,
inner: SpinLock<WorkQueueInner>,
}
struct WorkQueueInner {
pending_work_items: Vec<Arc<WorkItem>>,
}
impl WorkQueue {
/// Create a `WorkQueue` and specify a `WorkerPool` to
/// process the submitted `WorkItems`.
pub fn new(worker_pool: Weak<WorkerPool>) -> Arc<Self> {
let queue = Arc::new(WorkQueue {
worker_pool: worker_pool.clone(),
inner: SpinLock::new(WorkQueueInner {
pending_work_items: Vec::new(),
}),
});
worker_pool
.upgrade()
.unwrap()
.assign_work_queue(queue.clone());
queue
}
/// Submit a work item. Return `false` if the work item is currently pending.
pub fn enqueue(&self, work_item: Arc<WorkItem>) -> bool {
if !work_item.try_pending() {
return false;
}
self.inner
.lock_irq_disabled()
.pending_work_items
.push(work_item);
true
}
/// Request a pending work item. The `request_cpu` indicates the CPU where
/// the calling worker is located.
fn dequeue(&self, request_cpu: u32) -> Option<Arc<WorkItem>> {
let mut inner = self.inner.lock_irq_disabled();
let Some(index) = inner
.pending_work_items
.iter()
.position(|item| item.is_valid_cpu(request_cpu))
else {
return None;
};
let item = inner.pending_work_items.remove(index);
Some(item)
}
fn has_pending_work_items(&self, request_cpu: u32) -> bool {
self.inner
.lock_irq_disabled()
.pending_work_items
.iter()
.any(|item| item.is_valid_cpu(request_cpu))
}
}
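The `was_pending` flag on `WorkItem` (see `try_pending` in work_item.rs below) makes `enqueue` reject re-submissions of an item that has not been picked up yet. The following sketch shows the resulting semantics; the variable `queue` is a hypothetical `Arc<WorkQueue>` already bound to a running `WorkerPool`.

```rust
// Hypothetical `queue`: an Arc<WorkQueue> attached to a running WorkerPool.
let item = Arc::new(WorkItem::new(Box::new(|| {
    // deferred work
})));

// First submission: was_pending flips from false to true, so enqueue() returns true.
let _accepted = queue.enqueue(item.clone());

// Re-submitting while the item is still pending returns false; the item is queued only once.
let _rejected_while_pending = !queue.enqueue(item.clone());

// Once a worker dequeues the item, set_processing() clears the flag,
// so the same item may be submitted again later.
```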
/// Initialize global worker pools and work queues.
pub fn init() {
WORKERPOOL_NORMAL.call_once(|| {
let cpu_set = CpuSet::new_full();
WorkerPool::new(WorkPriority::Normal, cpu_set)
});
WORKERPOOL_NORMAL.get().unwrap().run();
WORKERPOOL_HIGH_PRI.call_once(|| {
let cpu_set = CpuSet::new_full();
WorkerPool::new(WorkPriority::High, cpu_set)
});
WORKERPOOL_HIGH_PRI.get().unwrap().run();
WORKQUEUE_GLOBAL_NORMAL
.call_once(|| WorkQueue::new(Arc::downgrade(WORKERPOOL_NORMAL.get().unwrap())));
WORKQUEUE_GLOBAL_HIGH_PRI
.call_once(|| WorkQueue::new(Arc::downgrade(WORKERPOOL_HIGH_PRI.get().unwrap())));
}
impl Drop for WorkQueue {
fn drop(&mut self) {
//TODO: Handling non-empty queues.
}
}
#[derive(PartialEq)]
pub enum WorkPriority {
High,
Normal,
}


@ -0,0 +1,34 @@
use alloc::sync::Weak;
use super::worker_pool::{WorkerPool, WorkerScheduler};
/// `SimpleScheduler` is the simplest scheduling implementation.
/// It only adds workers when the worker pool has a liveness problem,
/// enforces an upper limit on the number of workers, and never actively
/// removes workers. Each round of scheduling adds at most one worker.
pub struct SimpleScheduler {
worker_pool: Weak<WorkerPool>,
}
impl SimpleScheduler {
pub fn new(worker_pool: Weak<WorkerPool>) -> Self {
Self { worker_pool }
}
}
const WORKER_LIMIT: u16 = 16;
impl WorkerScheduler for SimpleScheduler {
fn schedule(&self) {
let worker_pool = self.worker_pool.upgrade().unwrap();
for cpu_id in worker_pool.cpu_set().iter() {
if !worker_pool.heartbeat(cpu_id as u32)
&& worker_pool.has_pending_work_items(cpu_id as u32)
&& !worker_pool.wake_worker(cpu_id as u32)
&& worker_pool.num_workers(cpu_id as u32) < WORKER_LIMIT
{
worker_pool.add_worker(cpu_id as u32);
}
}
}
}


@ -0,0 +1,56 @@
use crate::prelude::*;
use core::sync::atomic::AtomicBool;
use core::sync::atomic::Ordering;
use jinux_frame::cpu::CpuSet;
/// A task to be executed by a worker thread.
pub struct WorkItem {
work_func: Box<dyn Fn() + Send + Sync>,
cpu_affinity: CpuSet,
was_pending: AtomicBool,
}
impl WorkItem {
pub fn new(work_func: Box<dyn Fn() + Send + Sync>) -> WorkItem {
let cpu_affinity = CpuSet::new_full();
WorkItem {
work_func,
cpu_affinity,
was_pending: AtomicBool::new(false),
}
}
pub fn cpu_affinity(&self) -> &CpuSet {
&self.cpu_affinity
}
pub fn cpu_affinity_mut(&mut self) -> &mut CpuSet {
&mut self.cpu_affinity
}
pub(super) fn is_valid_cpu(&self, cpu_id: u32) -> bool {
self.cpu_affinity.contains(cpu_id)
}
pub(super) fn set_processing(&self) {
self.was_pending.store(false, Ordering::Release);
}
pub(super) fn set_pending(&self) {
self.was_pending.store(true, Ordering::Release);
}
pub(super) fn is_pending(&self) -> bool {
self.was_pending.load(Ordering::Acquire)
}
pub(super) fn try_pending(&self) -> bool {
self.was_pending
.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
.is_ok()
}
pub(super) fn call_work_func(&self) {
self.work_func.call(())
}
}
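Because `cpu_affinity_mut` needs exclusive access, an item's affinity has to be adjusted before the item is wrapped in an `Arc` and submitted. A short sketch, reusing the global `submit_work_item` from mod.rs above (the CPU index is an illustrative assumption):

```rust
// A work item that may only run on CPU 0; WorkQueue::dequeue() skips it for
// workers on other CPUs via is_valid_cpu().
let mut item = WorkItem::new(Box::new(|| {
    // deferred work
}));
item.cpu_affinity_mut().clear();
item.cpu_affinity_mut().add(0);

submit_work_item(Arc::new(item), WorkPriority::Normal);
```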


@ -0,0 +1,113 @@
use super::worker_pool::WorkerPool;
use crate::prelude::*;
use crate::thread::kernel_thread::{KernelThreadExt, ThreadOptions};
use crate::Thread;
use jinux_frame::cpu::CpuSet;
use jinux_frame::task::Priority;
/// A worker thread. A `Worker` will attempt to retrieve unfinished
/// work items from its corresponding `WorkerPool`. If there are none,
/// it will go to sleep and be rescheduled when a new work item is
/// added to the `WorkerPool`.
pub(super) struct Worker {
worker_pool: Weak<WorkerPool>,
bound_thread: Arc<Thread>,
bound_cpu: u32,
inner: SpinLock<WorkerInner>,
}
struct WorkerInner {
worker_status: WorkerStatus,
}
#[derive(PartialEq)]
enum WorkerStatus {
Idle,
Running,
Exited,
/// This state only occurs while the `WorkerPool` is being destroyed;
/// workers will exit after processing the remaining work items.
Destroying,
}
impl Worker {
/// Creates a new `Worker` bound to the given `worker_pool`.
pub(super) fn new(worker_pool: Weak<WorkerPool>, bound_cpu: u32) -> Arc<Self> {
Arc::new_cyclic(|worker_ref| {
let weak_worker = worker_ref.clone();
let task_fn = Box::new(move || {
let current_worker: Arc<Worker> = weak_worker.upgrade().unwrap();
current_worker.run_worker_loop();
});
let mut cpu_affinity = CpuSet::new_empty();
cpu_affinity.add(bound_cpu);
let mut priority = Priority::normal();
if worker_pool.upgrade().unwrap().is_high_priority() {
priority = Priority::high();
}
let bound_thread = Thread::new_kernel_thread(
ThreadOptions::new(task_fn)
.cpu_affinity(cpu_affinity)
.priority(priority),
);
Self {
worker_pool,
bound_thread,
bound_cpu,
inner: SpinLock::new(WorkerInner {
worker_status: WorkerStatus::Running,
}),
}
})
}
pub(super) fn run(&self) {
self.bound_thread.run();
}
/// The thread function bound to normal workers.
/// It pulls work items from the work queue and sleeps if there are no more pending items.
fn run_worker_loop(self: &Arc<Self>) {
loop {
let worker_pool = self.worker_pool.upgrade();
let Some(worker_pool) = worker_pool else {
break;
};
if let Some(work_item) = worker_pool.fetch_pending_work_item(self.bound_cpu) {
work_item.set_processing();
work_item.call_work_func();
worker_pool.set_heartbeat(self.bound_cpu, true);
} else {
if self.is_destroying() {
break;
}
self.inner.lock_irq_disabled().worker_status = WorkerStatus::Idle;
worker_pool.idle_current_worker(self.bound_cpu, self.clone());
if !self.is_destroying() {
self.inner.lock_irq_disabled().worker_status = WorkerStatus::Running;
}
}
}
self.exit();
}
pub(super) fn is_idle(&self) -> bool {
self.inner.lock_irq_disabled().worker_status == WorkerStatus::Idle
}
pub(super) fn is_destroying(&self) -> bool {
self.inner.lock_irq_disabled().worker_status == WorkerStatus::Destroying
}
pub(super) fn destroy(&self) {
self.inner.lock_irq_disabled().worker_status = WorkerStatus::Destroying;
}
fn exit(&self) {
self.inner.lock_irq_disabled().worker_status = WorkerStatus::Exited;
}
pub(super) fn is_exit(&self) -> bool {
self.inner.lock_irq_disabled().worker_status == WorkerStatus::Exited
}
}


@ -0,0 +1,262 @@
use core::sync::atomic::{AtomicBool, Ordering};
use super::{simple_scheduler::SimpleScheduler, worker::Worker, WorkItem, WorkPriority, WorkQueue};
use crate::prelude::*;
use crate::thread::kernel_thread::{KernelThreadExt, ThreadOptions};
use crate::Thread;
use jinux_frame::cpu::CpuSet;
use jinux_frame::sync::WaitQueue;
use jinux_frame::task::Priority;
/// A pool of workers.
///
/// The `WorkerPool` maintains workers created from different CPUs, while clustering workers
/// from the same CPU into a `LocalWorkerPool` for better management.
pub struct WorkerPool {
local_pools: Vec<Arc<LocalWorkerPool>>,
/// Monitor invokes `schedule()` in WorkerScheduler to determine whether there is a need for
/// adding or removing workers.
monitor: Arc<Monitor>,
priority: WorkPriority,
cpu_set: CpuSet,
scheduler: Arc<dyn WorkerScheduler>,
work_queues: SpinLock<Vec<Arc<WorkQueue>>>,
}
/// A set of workers for a specific CPU.
pub struct LocalWorkerPool {
cpu_id: u32,
idle_wait_queue: WaitQueue,
parent: Weak<WorkerPool>,
/// A liveness check for the `LocalWorkerPool`. The monitor periodically clears the heartbeat,
/// and a worker sets it after completing a work item, indicating that there is still
/// an active worker. If there is no heartbeat but there are still pending work items,
/// it suggests that more workers are needed.
heartbeat: AtomicBool,
workers: SpinLock<VecDeque<Arc<Worker>>>,
}
/// Schedule `Workers` for a `WorkerPool`.
///
/// Having an excessive number of workers in a `WorkerPool` wastes system resources,
/// while a shortage of workers leads to longer response times for work items.
/// A well-designed `WorkerScheduler` must strike a balance between resource utilization and response time.
pub trait WorkerScheduler: Sync + Send {
/// Schedule workers in a worker pool. This needs to solve two problems: when to increase or decrease
/// workers, and how to add or remove workers to keep the number of workers in a reasonable range.
fn schedule(&self);
}
/// The `Monitor` is responsible for monitoring the `WorkerPool` for scheduling needs.
/// Currently, it only performs a liveness check, and attempts to schedule when no workers
/// are found processing in the pool.
pub struct Monitor {
worker_pool: Weak<WorkerPool>,
bound_thread: Arc<Thread>,
}
impl LocalWorkerPool {
fn new(worker_pool: Weak<WorkerPool>, cpu_id: u32) -> Self {
LocalWorkerPool {
cpu_id,
idle_wait_queue: WaitQueue::new(),
parent: worker_pool,
heartbeat: AtomicBool::new(false),
workers: SpinLock::new(VecDeque::new()),
}
}
fn add_worker(&self) {
let worker = Worker::new(self.parent.clone(), self.cpu_id);
self.workers.lock_irq_disabled().push_back(worker.clone());
worker.run();
}
fn remove_worker(&self) {
let mut workers = self.workers.lock_irq_disabled();
for (index, worker) in workers.iter().enumerate() {
if worker.is_idle() {
worker.destroy();
workers.remove(index);
break;
}
}
}
fn wake_worker(&self) -> bool {
if !self.idle_wait_queue.is_empty() {
self.idle_wait_queue.wake_one();
return true;
}
false
}
fn has_pending_work_items(&self) -> bool {
self.parent
.upgrade()
.unwrap()
.has_pending_work_items(self.cpu_id)
}
fn heartbeat(&self) -> bool {
self.heartbeat.load(Ordering::Acquire)
}
fn set_heartbeat(&self, heartbeat: bool) {
self.heartbeat.store(heartbeat, Ordering::Release);
}
fn idle_current_worker(&self, worker: Arc<Worker>) {
self.idle_wait_queue
.wait_until(|| (worker.is_destroying() || self.has_pending_work_items()).then_some(0));
}
fn destroy_all_workers(&self) {
for worker in self.workers.lock_irq_disabled().iter() {
worker.destroy();
}
self.idle_wait_queue.wake_all();
}
}
impl WorkerPool {
pub fn new(priority: WorkPriority, cpu_set: CpuSet) -> Arc<Self> {
Arc::new_cyclic(|pool_ref| {
let mut local_pools = Vec::new();
for cpu_id in cpu_set.iter() {
local_pools.push(Arc::new(LocalWorkerPool::new(
pool_ref.clone(),
cpu_id as u32,
)));
}
WorkerPool {
local_pools,
monitor: Monitor::new(pool_ref.clone()),
priority,
cpu_set,
scheduler: Arc::new(SimpleScheduler::new(pool_ref.clone())),
work_queues: SpinLock::new(Vec::new()),
}
})
}
pub fn run(&self) {
self.monitor.run();
}
pub fn assign_work_queue(&self, work_queue: Arc<WorkQueue>) {
self.work_queues.lock_irq_disabled().push(work_queue);
}
pub fn has_pending_work_items(&self, request_cpu: u32) -> bool {
self.work_queues
.lock_irq_disabled()
.iter()
.any(|work_queue| work_queue.has_pending_work_items(request_cpu))
}
pub fn schedule(&self) {
self.scheduler.schedule();
}
pub fn num_workers(&self, cpu_id: u32) -> u16 {
self.local_pool(cpu_id).workers.lock_irq_disabled().len() as u16
}
pub fn cpu_set(&self) -> &CpuSet {
&self.cpu_set
}
pub(super) fn fetch_pending_work_item(&self, request_cpu: u32) -> Option<Arc<WorkItem>> {
for work_queue in self.work_queues.lock_irq_disabled().iter() {
let item = work_queue.dequeue(request_cpu);
if item.is_some() {
return item;
}
}
None
}
fn local_pool(&self, cpu_id: u32) -> &Arc<LocalWorkerPool> {
self.local_pools
.iter()
.find(|local_pool: &&Arc<LocalWorkerPool>| local_pool.cpu_id == cpu_id)
.unwrap()
}
pub(super) fn wake_worker(&self, cpu_id: u32) -> bool {
self.local_pool(cpu_id).wake_worker()
}
pub(super) fn add_worker(&self, cpu_id: u32) {
self.local_pool(cpu_id).add_worker();
}
pub(super) fn remove_worker(&self, cpu_id: u32) {
self.local_pool(cpu_id).remove_worker();
}
pub(super) fn is_high_priority(&self) -> bool {
self.priority == WorkPriority::High
}
pub(super) fn heartbeat(&self, cpu_id: u32) -> bool {
self.local_pool(cpu_id).heartbeat()
}
pub(super) fn set_heartbeat(&self, cpu_id: u32, heartbeat: bool) {
self.local_pool(cpu_id).set_heartbeat(heartbeat)
}
pub(super) fn idle_current_worker(&self, cpu_id: u32, worker: Arc<Worker>) {
self.local_pool(cpu_id).idle_current_worker(worker);
}
}
impl Drop for WorkerPool {
fn drop(&mut self) {
for local_pool in self.local_pools.iter() {
local_pool.destroy_all_workers();
}
}
}
impl Monitor {
pub fn new(worker_pool: Weak<WorkerPool>) -> Arc<Self> {
Arc::new_cyclic(|monitor_ref| {
let weak_monitor = monitor_ref.clone();
let task_fn = Box::new(move || {
let current_monitor: Arc<Monitor> = weak_monitor.upgrade().unwrap();
current_monitor.run_monitor_loop();
});
let cpu_affinity = CpuSet::new_full();
let bound_thread = Thread::new_kernel_thread(
ThreadOptions::new(task_fn)
.cpu_affinity(cpu_affinity)
.priority(Priority::normal()),
);
Self {
worker_pool,
bound_thread,
}
})
}
pub fn run(&self) {
self.bound_thread.run();
}
fn run_monitor_loop(self: &Arc<Self>) {
loop {
let worker_pool = self.worker_pool.upgrade();
let Some(worker_pool) = worker_pool else {
break;
};
worker_pool.schedule();
for local_pool in worker_pool.local_pools.iter() {
local_pool.set_heartbeat(false);
}
Thread::yield_now();
}
}
}