diff --git a/include/socks/cpu.h b/include/socks/cpu.h index 1e3c47b..dd36683 100644 --- a/include/socks/cpu.h +++ b/include/socks/cpu.h @@ -20,6 +20,7 @@ struct cpu_data { int c_preempt_count; struct runqueue c_rq; + struct workqueue c_wq; struct queue c_timers; }; diff --git a/include/socks/sched.h b/include/socks/sched.h index 2e48612..0704b8f 100644 --- a/include/socks/sched.h +++ b/include/socks/sched.h @@ -32,6 +32,7 @@ extern "C" { #endif struct runqueue; +struct work_item; enum task_state { TASK_RUNNING, @@ -135,6 +136,24 @@ struct waitqueue { spin_lock_t wq_lock; }; +typedef void(*work_func_t)(struct work_item *); + +struct work_item { + void *w_data; + work_func_t w_func; + struct queue_entry w_head; +}; + +struct worker_pool { + struct thread **wp_workers; + size_t wp_nworkers; +}; + +struct workqueue { + spin_lock_t wq_lock; + struct queue wq_queue; /* list of struct work_item */ +}; + extern kern_status_t sched_init(void); extern void schedule(enum sched_mode mode); extern void preempt_disable(void); @@ -151,6 +170,7 @@ static inline void rq_unlock(struct runqueue *rq, unsigned long flags) { spin_unlock_irqrestore(&rq->rq_lock, flags); } +extern void rq_remove_thread(struct runqueue *rq, struct thread *thr); extern struct runqueue *cpu_rq(unsigned int cpu); extern struct task *task_alloc(void); @@ -192,6 +212,7 @@ extern void add_timer(struct timer *timer); extern void remove_timer(struct timer *timer); extern unsigned long schedule_timeout(unsigned long clock_ticks); extern unsigned long milli_sleep(unsigned long ms); +extern void sleep_forever(void); extern void wait_item_init(struct wait_item *item, struct thread *thr); extern void thread_wait_begin(struct wait_item *waiter, struct waitqueue *q); @@ -200,6 +221,15 @@ extern void wait_on_queue(struct waitqueue *q); extern void wakeup_queue(struct waitqueue *q); extern void wakeup_one(struct waitqueue *q); +extern void work_item_init(work_func_t func, void *data, struct work_item *out); +extern void workqueue_init(struct workqueue *wq); +extern struct worker_pool *worker_pool_create(size_t nworkers); +extern struct worker_pool *global_worker_pool(void); +extern bool schedule_work_on(struct workqueue *wq, struct work_item *work); +extern bool schedule_work(struct work_item *work); + +extern void wake_workers(struct workqueue *wq, struct worker_pool *pool); + #ifdef __cplusplus } #endif diff --git a/include/socks/util.h b/include/socks/util.h index c94303e..8d4ae20 100644 --- a/include/socks/util.h +++ b/include/socks/util.h @@ -9,6 +9,10 @@ extern "C" { #endif +#define MIN(x, y) ((x) < (y) ? (x) : (y)) +#define MAX(x, y) ((x) > (y) ? (x) : (y)) +#define CLAMP(x, lo, hi) (MIN(MAX(x, lo), hi)) + extern uint64_t hash_string(const char *s); extern void data_size_to_string(size_t value, char *out, size_t outsz); static inline bool power_of_2(size_t x) { return (x > 0 && (x & (x - 1)) == 0); } diff --git a/sched/core.c b/sched/core.c index 1515c91..1f3f929 100644 --- a/sched/core.c +++ b/sched/core.c @@ -9,6 +9,7 @@ extern kern_status_t setup_kernel_task(void); extern kern_status_t setup_idle_task(void); extern kern_status_t task_object_type_init(void); extern kern_status_t thread_object_type_init(void); +extern kern_status_t global_wq_init(void); static cycles_t __default_quantum = 0; @@ -45,6 +46,8 @@ kern_status_t sched_init(void) this_cpu->c_rq.rq_idle = idle_thread; put_cpu(this_cpu); + global_wq_init(); + start_charge_period(); return status; @@ -165,7 +168,11 @@ struct runqueue *select_rq_for_thread(struct thread *thr) void schedule_thread_on_cpu(struct thread *thr) { - struct runqueue *rq = select_rq_for_thread(thr); + struct runqueue *rq = thr->tr_rq; + if (!rq) { + rq = select_rq_for_thread(thr); + } + if (rq) { unsigned long flags; rq_lock(rq, &flags); diff --git a/sched/runqueue.c b/sched/runqueue.c index 9be1015..8d70569 100644 --- a/sched/runqueue.c +++ b/sched/runqueue.c @@ -61,3 +61,22 @@ struct runqueue *cpu_rq(unsigned int cpu) return rq; } + +void rq_remove_thread(struct runqueue *rq, struct thread *thr) +{ + int prio = thread_priority(thr); + if (prio < 0 || prio > PRIO_MAX) { + return; + } + + struct queue *q = &rq->rq_queues[prio]; + queue_delete(q, &thr->tr_rqentry); + + if (rq->rq_nthreads > 0) { + rq->rq_nthreads--; + } + + if (queue_empty(q)) { + rq->rq_readybits &= ~PRIO_MASK(prio); + } +} diff --git a/sched/wait.c b/sched/wait.c index d3d8204..497e42c 100644 --- a/sched/wait.c +++ b/sched/wait.c @@ -71,3 +71,21 @@ void wakeup_one(struct waitqueue *q) spin_unlock_irqrestore(&q->wq_lock, flags); } + +void sleep_forever(void) +{ + struct thread *thr = current_thread(); + struct runqueue *rq = thr->tr_rq; + + unsigned long flags; + rq_lock(rq, &flags); + + rq_remove_thread(rq, thr); + thr->tr_state = THREAD_SLEEPING; + + rq_unlock(rq, flags); + + while (thr->tr_state == THREAD_SLEEPING) { + schedule(SCHED_NORMAL); + } +} diff --git a/sched/workqueue.c b/sched/workqueue.c new file mode 100644 index 0000000..bad0db9 --- /dev/null +++ b/sched/workqueue.c @@ -0,0 +1,137 @@ +#include +#include +#include +#include + +static struct worker_pool *__global_worker_pool = NULL; + +struct worker_pool *global_worker_pool(void) +{ + return __global_worker_pool; +} + +kern_status_t global_wq_init(void) +{ + __global_worker_pool = worker_pool_create(1); + if (!__global_worker_pool) { + return KERN_NO_MEMORY; + } + + return KERN_OK; +} + +void work_item_init(work_func_t func, void *data, struct work_item *out) +{ + out->w_func = func; + out->w_data = data; + out->w_head = QUEUE_ENTRY_INIT; +} + +void workqueue_init(struct workqueue *wq) +{ + wq->wq_lock = SPIN_LOCK_INIT; + wq->wq_queue = QUEUE_INIT; +} + +static void worker_func() +{ + unsigned long flags; + while (1) { + sleep_forever(); + struct cpu_data *this_cpu = get_this_cpu(); + + spin_lock_irqsave(&this_cpu->c_wq.wq_lock, &flags); + + if (queue_empty(&this_cpu->c_wq.wq_queue)) { + spin_unlock_irqrestore(&this_cpu->c_wq.wq_lock, flags); + put_cpu(this_cpu); + continue; + } + + struct queue_entry *work_item_qe = queue_pop_front(&this_cpu->c_wq.wq_queue); + struct work_item *work_item = QUEUE_CONTAINER(struct work_item, w_head, work_item_qe); + spin_unlock_irqrestore(&this_cpu->c_wq.wq_lock, flags); + put_cpu(this_cpu); + + work_item->w_func(work_item); + } +} + +struct worker_pool *worker_pool_create(size_t nworkers) +{ + struct worker_pool *out = kzalloc(sizeof *out, VM_NORMAL); + if (!out) { + return NULL; + } + + out->wp_nworkers = nworkers; + out->wp_workers = kmalloc(nworkers * sizeof(struct task *), VM_NORMAL); + if (!out->wp_workers) { + kfree(out); + return NULL; + } + + for (size_t i = 0; i < nworkers; i++) { + struct thread *t = create_kernel_thread(worker_func); + out->wp_workers[i] = t; + } + + return out; +} + +static void do_wake_workers(struct workqueue *wq, struct worker_pool *pool) +{ + size_t nr_items = queue_length(&wq->wq_queue); + if (!nr_items) { + return; + } + + size_t nr_workers = nr_items; + + nr_workers = CLAMP(nr_workers, 1, pool->wp_nworkers); + + for (size_t i = 0; i < pool->wp_nworkers; i++) { + struct thread *t = pool->wp_workers[i]; + + if (t->tr_state != THREAD_SLEEPING) { + continue; + } + + t->tr_state = THREAD_READY; + schedule_thread_on_cpu(t); + + nr_workers--; + + if (nr_workers == 0) { + break; + } + } +} + +void wake_workers(struct workqueue *wq, struct worker_pool *pool) +{ + unsigned long flags; + spin_lock_irqsave(&wq->wq_lock, &flags); + do_wake_workers(wq, pool); + spin_unlock_irqrestore(&wq->wq_lock, flags); +} + +bool schedule_work_on(struct workqueue *wq, struct work_item *work) +{ + unsigned long flags; + spin_lock_irqsave(&wq->wq_lock, &flags); + queue_push_back(&wq->wq_queue, &work->w_head); + spin_unlock_irqrestore(&wq->wq_lock, flags); + + wake_workers(wq, global_worker_pool()); + + return true; +} + +bool schedule_work(struct work_item *work) +{ + struct cpu_data *self = get_this_cpu(); + bool ret = schedule_work_on(&self->c_wq, work); + put_cpu(self); + return ret; +}