sched: add support for scheduling functions to be executed later

This commit is contained in:
2023-06-14 17:35:10 +01:00
parent cdb9fef36c
commit 4a1c6cae69
7 changed files with 217 additions and 1 deletion

View File

@@ -9,6 +9,7 @@ extern kern_status_t setup_kernel_task(void);
extern kern_status_t setup_idle_task(void);
extern kern_status_t task_object_type_init(void);
extern kern_status_t thread_object_type_init(void);
extern kern_status_t global_wq_init(void);
static cycles_t __default_quantum = 0;
@@ -45,6 +46,8 @@ kern_status_t sched_init(void)
this_cpu->c_rq.rq_idle = idle_thread;
put_cpu(this_cpu);
global_wq_init();
start_charge_period();
return status;
@@ -165,7 +168,11 @@ struct runqueue *select_rq_for_thread(struct thread *thr)
void schedule_thread_on_cpu(struct thread *thr)
{
struct runqueue *rq = select_rq_for_thread(thr);
struct runqueue *rq = thr->tr_rq;
if (!rq) {
rq = select_rq_for_thread(thr);
}
if (rq) {
unsigned long flags;
rq_lock(rq, &flags);

View File

@@ -61,3 +61,22 @@ struct runqueue *cpu_rq(unsigned int cpu)
return rq;
}
/*
 * Detach a thread from its runqueue's per-priority queue.
 *
 * Threads with an out-of-range priority are ignored.  The thread count
 * is decremented (never below zero), and the ready bit for the priority
 * level is cleared once its queue becomes empty.
 */
void rq_remove_thread(struct runqueue *rq, struct thread *thr)
{
	int level = thread_priority(thr);

	/* Defensive: priorities outside the table are not queued here. */
	if (level < 0 || level > PRIO_MAX) {
		return;
	}

	struct queue *level_q = &rq->rq_queues[level];

	queue_delete(level_q, &thr->tr_rqentry);
	if (rq->rq_nthreads > 0) {
		rq->rq_nthreads--;
	}

	/* Last thread at this level gone: mark the level not-ready. */
	if (queue_empty(level_q)) {
		rq->rq_readybits &= ~PRIO_MASK(level);
	}
}

View File

@@ -71,3 +71,21 @@ void wakeup_one(struct waitqueue *q)
spin_unlock_irqrestore(&q->wq_lock, flags);
}
/*
 * Put the calling thread to sleep until another thread marks it runnable
 * again (e.g. wake_workers() flipping tr_state to THREAD_READY).
 *
 * The thread is removed from its runqueue and marked THREAD_SLEEPING
 * under the runqueue lock; it then yields in a loop.  Re-checking
 * tr_state after every schedule() tolerates being spuriously scheduled
 * back in while still logically asleep.
 *
 * NOTE(review): assumes thr->tr_rq is non-NULL for the current thread —
 * other code treats a NULL tr_rq as possible; confirm the invariant
 * holds for every caller before relying on it here.
 */
void sleep_forever(void)
{
	struct thread *thr = current_thread();
	struct runqueue *rq = thr->tr_rq;
	unsigned long flags;
	rq_lock(rq, &flags);
	rq_remove_thread(rq, thr);
	thr->tr_state = THREAD_SLEEPING;
	rq_unlock(rq, flags);
	/* A waker sets tr_state before rescheduling us, so this exits. */
	while (thr->tr_state == THREAD_SLEEPING) {
		schedule(SCHED_NORMAL);
	}
}

137
sched/workqueue.c Normal file
View File

@@ -0,0 +1,137 @@
#include <socks/sched.h>
#include <socks/vm.h>
#include <socks/util.h>
#include <socks/cpu.h>
static struct worker_pool *__global_worker_pool = NULL;
/* Return the system-wide worker pool created by global_wq_init(). */
struct worker_pool *global_worker_pool(void)
{
	struct worker_pool *pool = __global_worker_pool;
	return pool;
}
/*
 * Create the single-worker global pool used to service work items.
 * Returns KERN_NO_MEMORY if pool creation fails, KERN_OK otherwise.
 */
kern_status_t global_wq_init(void)
{
	__global_worker_pool = worker_pool_create(1);
	return __global_worker_pool ? KERN_OK : KERN_NO_MEMORY;
}
/*
 * Prepare a caller-owned work item: reset its queue linkage and record
 * the callback plus its opaque argument.
 */
void work_item_init(work_func_t func, void *data, struct work_item *out)
{
	out->w_head = QUEUE_ENTRY_INIT;
	out->w_data = data;
	out->w_func = func;
}
/* Initialize a workqueue to the empty, unlocked state. */
void workqueue_init(struct workqueue *wq)
{
	wq->wq_queue = QUEUE_INIT;
	wq->wq_lock = SPIN_LOCK_INIT;
}
/*
 * Body of every pool worker thread.
 *
 * Sleeps until woken (wake_workers() marks it ready), then pops and runs
 * at most one work item from the current CPU's workqueue before sleeping
 * again.  A spurious wakeup with an empty queue simply goes back to
 * sleep.  The callback is invoked with the queue lock dropped, so it may
 * itself schedule further work.
 */
static void worker_func(void)  /* (void): () is an unspecified-parameter list in C */
{
	unsigned long flags;
	while (1) {
		sleep_forever();
		struct cpu_data *this_cpu = get_this_cpu();
		spin_lock_irqsave(&this_cpu->c_wq.wq_lock, &flags);
		if (queue_empty(&this_cpu->c_wq.wq_queue)) {
			spin_unlock_irqrestore(&this_cpu->c_wq.wq_lock, flags);
			put_cpu(this_cpu);
			continue;
		}
		struct queue_entry *work_item_qe = queue_pop_front(&this_cpu->c_wq.wq_queue);
		struct work_item *work_item = QUEUE_CONTAINER(struct work_item, w_head, work_item_qe);
		spin_unlock_irqrestore(&this_cpu->c_wq.wq_lock, flags);
		put_cpu(this_cpu);
		/* Run the callback outside the lock and off the CPU reference. */
		work_item->w_func(work_item);
	}
}
/*
 * Allocate a pool of nworkers kernel threads running worker_func().
 *
 * Returns the new pool, or NULL on allocation or thread-creation
 * failure.  Ownership of the returned pool stays with the caller.
 */
struct worker_pool *worker_pool_create(size_t nworkers)
{
	struct worker_pool *out = kzalloc(sizeof *out, VM_NORMAL);
	if (!out) {
		return NULL;
	}
	out->wp_nworkers = nworkers;
	/* sizeof *out->wp_workers: the array holds struct thread *, not
	 * struct task * as the original sizeof suggested. */
	out->wp_workers = kmalloc(nworkers * sizeof *out->wp_workers, VM_NORMAL);
	if (!out->wp_workers) {
		kfree(out);
		return NULL;
	}
	for (size_t i = 0; i < nworkers; i++) {
		struct thread *t = create_kernel_thread(worker_func);
		if (!t) {
			/* Don't leave NULL slots: do_wake_workers() would
			 * dereference them.  NOTE(review): workers already
			 * created are not torn down here — no destroy API is
			 * visible from this file; they never reference the
			 * freed pool. */
			kfree(out->wp_workers);
			kfree(out);
			return NULL;
		}
		out->wp_workers[i] = t;
	}
	return out;
}
/*
 * Wake enough sleeping workers from pool to service the items currently
 * queued on wq.  Caller must hold wq->wq_lock (see wake_workers()).
 *
 * Wakes min(queue_length, wp_nworkers) workers — at least one when the
 * queue is non-empty — and does nothing for an empty queue.
 *
 * NOTE(review): tr_state is read and written here without holding the
 * thread's runqueue lock — presumably a racing wakeup is benign because
 * workers re-check their queue after waking; confirm against the
 * scheduler's locking rules.
 */
static void do_wake_workers(struct workqueue *wq, struct worker_pool *pool)
{
	size_t nr_items = queue_length(&wq->wq_queue);
	if (!nr_items) {
		return;
	}
	size_t nr_workers = nr_items;
	/* One worker per item, bounded by pool size, never fewer than one. */
	nr_workers = CLAMP(nr_workers, 1, pool->wp_nworkers);
	for (size_t i = 0; i < pool->wp_nworkers; i++) {
		struct thread *t = pool->wp_workers[i];
		/* Only sleeping workers are eligible; busy ones are skipped. */
		if (t->tr_state != THREAD_SLEEPING) {
			continue;
		}
		t->tr_state = THREAD_READY;
		schedule_thread_on_cpu(t);
		nr_workers--;
		if (nr_workers == 0) {
			break;
		}
	}
}
/*
 * Public entry point: wake workers from pool to drain wq.  Takes the
 * workqueue lock around the scan (see do_wake_workers() for details).
 */
void wake_workers(struct workqueue *wq, struct worker_pool *pool)
{
	unsigned long irq_state;

	spin_lock_irqsave(&wq->wq_lock, &irq_state);
	do_wake_workers(wq, pool);
	spin_unlock_irqrestore(&wq->wq_lock, irq_state);
}
/*
 * Append a work item to wq and kick the global worker pool.  The item
 * must remain valid until its callback has run.  Always returns true.
 */
bool schedule_work_on(struct workqueue *wq, struct work_item *work)
{
	unsigned long irq_state;

	/* Enqueue under the lock; wake workers only after dropping it. */
	spin_lock_irqsave(&wq->wq_lock, &irq_state);
	queue_push_back(&wq->wq_queue, &work->w_head);
	spin_unlock_irqrestore(&wq->wq_lock, irq_state);

	wake_workers(wq, global_worker_pool());
	return true;
}
/* Queue a work item on the current CPU's workqueue. */
bool schedule_work(struct work_item *work)
{
	struct cpu_data *cpu = get_this_cpu();
	bool queued = schedule_work_on(&cpu->c_wq, work);

	put_cpu(cpu);
	return queued;
}