/*
 * mango/sched/workqueue.c — per-CPU work queues serviced by a global
 * pool of kernel worker threads.
 */

#include <kernel/sched.h>
#include <kernel/vm.h>
#include <kernel/util.h>
#include <kernel/cpu.h>
/* Pool of worker threads backing every work queue; set once by
 * global_wq_init() and read-only afterwards.
 * NOTE(review): identifiers starting with two underscores are reserved
 * to the implementation in standard C — consider renaming. */
static struct worker_pool *__global_worker_pool = NULL;
/* Return the global worker pool, or NULL before global_wq_init() has run. */
struct worker_pool *global_worker_pool(void)
{
return __global_worker_pool;
}
kern_status_t global_wq_init(void)
{
__global_worker_pool = worker_pool_create(1);
if (!__global_worker_pool) {
return KERN_NO_MEMORY;
}
return KERN_OK;
}
/*
 * Initialize a caller-provided work item.
 *
 * @func: callback invoked by a worker, receiving the item itself.
 * @data: opaque payload stored on the item for the callback's use.
 * @out:  item to initialize; no allocation happens here, the caller
 *        owns the storage.
 */
void work_item_init(work_func_t func, void *data, struct work_item *out)
{
	out->w_head = QUEUE_ENTRY_INIT;
	out->w_data = data;
	out->w_func = func;
}
/*
 * Initialize a work queue: empty item list, unlocked spinlock.
 */
void workqueue_init(struct workqueue *wq)
{
	wq->wq_queue = QUEUE_INIT;
	wq->wq_lock = SPIN_LOCK_INIT;
}
/*
 * Main loop of a pool worker thread.
 *
 * Drains the current CPU's work queue, running each item's callback
 * with the queue lock and CPU reference dropped, and only sleeps once
 * the queue is empty.  The previous ordering slept *before* checking
 * the queue, so a worker woken for N pending items ran exactly one and
 * dozed off again, starving the rest until the next schedule_work().
 *
 * NOTE(review): a wakeup can still be lost between the empty-queue
 * check (lock released) and sleep_forever(); closing that window needs
 * a sleep-if-condition primitive from the scheduler — TODO confirm.
 */
static void worker_func(void)
{
	unsigned long flags;

	while (1) {
		/* Re-fetch the CPU every pass: the thread may have migrated
		 * while sleeping or while a callback blocked. */
		struct cpu_data *this_cpu = get_this_cpu();

		spin_lock_irqsave(&this_cpu->c_wq.wq_lock, &flags);
		if (queue_empty(&this_cpu->c_wq.wq_queue)) {
			spin_unlock_irqrestore(&this_cpu->c_wq.wq_lock, flags);
			put_cpu(this_cpu);
			sleep_forever();
			continue;
		}

		struct queue_entry *work_item_qe = queue_pop_front(&this_cpu->c_wq.wq_queue);
		struct work_item *work_item = QUEUE_CONTAINER(struct work_item, w_head, work_item_qe);

		spin_unlock_irqrestore(&this_cpu->c_wq.wq_lock, flags);
		put_cpu(this_cpu);

		/* Run the callback without the lock or CPU reference held:
		 * it may sleep or schedule further work. */
		work_item->w_func(work_item);
	}
}
/*
 * Allocate a pool of @nworkers kernel worker threads.
 *
 * Returns the new pool, or NULL if @nworkers is zero/overflows the
 * array size, or on allocation or thread-creation failure.  The caller
 * owns the returned pool.
 */
struct worker_pool *worker_pool_create(size_t nworkers)
{
	/* An empty pool can never run work; also guard the multiply below. */
	if (nworkers == 0 || nworkers > (size_t)-1 / sizeof(struct thread *)) {
		return NULL;
	}

	struct worker_pool *out = kzalloc(sizeof *out, VM_NORMAL);
	if (!out) {
		return NULL;
	}

	out->wp_nworkers = nworkers;
	/* sizeof matches what is stored: thread pointers (the old code
	 * said sizeof(struct task *), which only worked by coincidence). */
	out->wp_workers = kmalloc(nworkers * sizeof(struct thread *), VM_NORMAL);
	if (!out->wp_workers) {
		kfree(out);
		return NULL;
	}

	for (size_t i = 0; i < nworkers; i++) {
		struct thread *t = create_kernel_thread(worker_func);
		if (!t) {
			/* Previously a NULL thread was stored and later
			 * dereferenced when waking workers.
			 * NOTE(review): threads created on earlier iterations
			 * are leaked here; no thread-destroy API is visible in
			 * this file — TODO. */
			kfree(out->wp_workers);
			kfree(out);
			return NULL;
		}
		out->wp_workers[i] = t;
	}
	return out;
}
/*
 * Wake enough sleeping workers from @pool to cover the items currently
 * queued on @wq: at least one, at most all of them.  Caller must hold
 * wq->wq_lock (see wake_workers()).
 *
 * NOTE(review): tr_state is read and flipped here under the workqueue
 * lock only — presumably the scheduler tolerates that; verify.
 */
static void do_wake_workers(struct workqueue *wq, struct worker_pool *pool)
{
	size_t pending = queue_length(&wq->wq_queue);

	if (pending == 0) {
		return;
	}

	size_t to_wake = CLAMP(pending, 1, pool->wp_nworkers);

	for (size_t i = 0; i < pool->wp_nworkers && to_wake > 0; i++) {
		struct thread *t = pool->wp_workers[i];

		if (t->tr_state == THREAD_SLEEPING) {
			t->tr_state = THREAD_READY;
			schedule_thread_on_cpu(t);
			to_wake--;
		}
	}
}
/*
 * Lock @wq and wake workers from @pool to service its pending items.
 *
 * Tolerates a NULL @pool — e.g. schedule_work() running before
 * global_wq_init() — instead of dereferencing it inside
 * do_wake_workers(); queued items simply wait for the next wake.
 */
void wake_workers(struct workqueue *wq, struct worker_pool *pool)
{
	unsigned long flags;

	if (!pool) {
		return;
	}

	spin_lock_irqsave(&wq->wq_lock, &flags);
	do_wake_workers(wq, pool);
	spin_unlock_irqrestore(&wq->wq_lock, flags);
}
/*
 * Append @work to @wq and kick the global worker pool.
 *
 * Always reports success: the queue entry is embedded in the work item,
 * so enqueueing itself cannot fail.
 */
bool schedule_work_on(struct workqueue *wq, struct work_item *work)
{
	unsigned long flags;
	struct worker_pool *pool = global_worker_pool();

	spin_lock_irqsave(&wq->wq_lock, &flags);
	queue_push_back(&wq->wq_queue, &work->w_head);
	spin_unlock_irqrestore(&wq->wq_lock, flags);

	/* Wake with the queue lock dropped; wake_workers re-takes it. */
	wake_workers(wq, pool);
	return true;
}
/*
 * Queue @work on the current CPU's work queue.
 */
bool schedule_work(struct work_item *work)
{
	struct cpu_data *cpu = get_this_cpu();
	bool queued = schedule_work_on(&cpu->c_wq, work);

	put_cpu(cpu);
	return queued;
}