#include #include #include #include static struct worker_pool *__global_worker_pool = NULL; struct worker_pool *global_worker_pool(void) { return __global_worker_pool; } kern_status_t global_wq_init(void) { __global_worker_pool = worker_pool_create(1); if (!__global_worker_pool) { return KERN_NO_MEMORY; } return KERN_OK; } void work_item_init(work_func_t func, void *data, struct work_item *out) { out->w_func = func; out->w_data = data; out->w_head = QUEUE_ENTRY_INIT; } void workqueue_init(struct workqueue *wq) { wq->wq_lock = SPIN_LOCK_INIT; wq->wq_queue = QUEUE_INIT; } static void worker_func() { unsigned long flags; while (1) { sleep_forever(); struct cpu_data *this_cpu = get_this_cpu(); spin_lock_irqsave(&this_cpu->c_wq.wq_lock, &flags); if (queue_empty(&this_cpu->c_wq.wq_queue)) { spin_unlock_irqrestore(&this_cpu->c_wq.wq_lock, flags); put_cpu(this_cpu); continue; } struct queue_entry *work_item_qe = queue_pop_front(&this_cpu->c_wq.wq_queue); struct work_item *work_item = QUEUE_CONTAINER(struct work_item, w_head, work_item_qe); spin_unlock_irqrestore(&this_cpu->c_wq.wq_lock, flags); put_cpu(this_cpu); work_item->w_func(work_item); } } struct worker_pool *worker_pool_create(size_t nworkers) { struct worker_pool *out = kzalloc(sizeof *out, VM_NORMAL); if (!out) { return NULL; } out->wp_nworkers = nworkers; out->wp_workers = kmalloc(nworkers * sizeof(struct task *), VM_NORMAL); if (!out->wp_workers) { kfree(out); return NULL; } for (size_t i = 0; i < nworkers; i++) { struct thread *t = create_kernel_thread(worker_func); out->wp_workers[i] = t; } return out; } static void do_wake_workers(struct workqueue *wq, struct worker_pool *pool) { size_t nr_items = queue_length(&wq->wq_queue); if (!nr_items) { return; } size_t nr_workers = nr_items; nr_workers = CLAMP(nr_workers, 1, pool->wp_nworkers); for (size_t i = 0; i < pool->wp_nworkers; i++) { struct thread *t = pool->wp_workers[i]; if (t->tr_state != THREAD_SLEEPING) { continue; } t->tr_state = THREAD_READY; schedule_thread_on_cpu(t); nr_workers--; if (nr_workers == 0) { break; } } } void wake_workers(struct workqueue *wq, struct worker_pool *pool) { unsigned long flags; spin_lock_irqsave(&wq->wq_lock, &flags); do_wake_workers(wq, pool); spin_unlock_irqrestore(&wq->wq_lock, flags); } bool schedule_work_on(struct workqueue *wq, struct work_item *work) { unsigned long flags; spin_lock_irqsave(&wq->wq_lock, &flags); queue_push_back(&wq->wq_queue, &work->w_head); spin_unlock_irqrestore(&wq->wq_lock, flags); wake_workers(wq, global_worker_pool()); return true; } bool schedule_work(struct work_item *work) { struct cpu_data *self = get_this_cpu(); bool ret = schedule_work_on(&self->c_wq, work); put_cpu(self); return ret; }