diff --git a/include/socks/sched.h b/include/socks/sched.h index ca7f4f5..9a6db72 100644 --- a/include/socks/sched.h +++ b/include/socks/sched.h @@ -12,10 +12,27 @@ #define PRIO_MAX 32 #define THREAD_KSTACK_ORDER VM_PAGE_4K +#define wait_event(wq, cond) \ + ({ \ + struct thread *self = current_thread(); \ + struct wait_item waiter; \ + wait_item_init(&waiter, self); \ + for (;;) { \ + thread_wait_begin(&waiter, wq); \ + if (cond) { \ + break; \ + } \ + schedule(SCHED_NORMAL); \ + } \ + thread_wait_end(&waiter, wq); \ + }) + #ifdef __cplusplus extern "C" { #endif +struct runqueue; + enum task_state { TASK_RUNNING, TASK_STOPPED, @@ -83,6 +100,8 @@ struct thread { uintptr_t tr_sp, tr_bp; + struct runqueue *tr_rq; + struct queue_entry tr_threads; struct queue_entry tr_rqentry; @@ -174,6 +193,13 @@ extern void remove_timer(struct timer *timer); extern unsigned long schedule_timeout(unsigned long clock_ticks); extern unsigned long milli_sleep(unsigned long ms); +extern void wait_item_init(struct wait_item *item, struct thread *thr); +extern void thread_wait_begin(struct wait_item *waiter, struct waitqueue *q); +extern void thread_wait_end(struct wait_item *waiter, struct waitqueue *q); +extern void wait_on_queue(struct waitqueue *q); +extern void wakeup_queue(struct waitqueue *q); +extern void wakeup_one(struct waitqueue *q); + #ifdef __cplusplus } #endif diff --git a/sched/runqueue.c b/sched/runqueue.c index b03fc7f..9be1015 100644 --- a/sched/runqueue.c +++ b/sched/runqueue.c @@ -50,6 +50,7 @@ void rq_enqueue(struct runqueue *rq, struct thread *thr) rq->rq_nthreads++; rq->rq_readybits |= PRIO_MASK(thread_priority(thr)); + thr->tr_rq = rq; } struct runqueue *cpu_rq(unsigned int cpu) diff --git a/sched/wait.c b/sched/wait.c new file mode 100644 index 0000000..d3d8204 --- /dev/null +++ b/sched/wait.c @@ -0,0 +1,73 @@ +#include +#include + +void wait_item_init(struct wait_item *item, struct thread *thr) +{ + item->w_thread = thr; + item->w_entry = QUEUE_ENTRY_INIT; +} + +void thread_wait_begin(struct wait_item *waiter, struct waitqueue *q) +{ + unsigned long flags; + spin_lock_irqsave(&q->wq_lock, &flags); + + queue_push_back(&q->wq_waiters, &waiter->w_entry); + waiter->w_thread->tr_state = THREAD_SLEEPING; + + spin_unlock_irqrestore(&q->wq_lock, flags); +} + +void thread_wait_end(struct wait_item *waiter, struct waitqueue *q) +{ + waiter->w_thread->tr_state = THREAD_READY; + + unsigned long flags; + spin_lock_irqsave(&q->wq_lock, &flags); + + queue_delete(&q->wq_waiters, &waiter->w_entry); + + spin_unlock_irqrestore(&q->wq_lock, flags); +} + +void wakeup_queue(struct waitqueue *q) +{ + unsigned long flags; + spin_lock_irqsave(&q->wq_lock, &flags); + struct queue_entry *ent = queue_pop_front(&q->wq_waiters); + while (ent) { + struct wait_item *waiter = QUEUE_CONTAINER(struct wait_item, w_entry, ent); + struct thread *thr = waiter->w_thread; + struct runqueue *rq = thr->tr_rq; + if (!rq) { + rq = cpu_rq(this_cpu()); + } + + thr->tr_state = THREAD_READY; + rq_enqueue(rq, thr); + + ent = queue_pop_front(&q->wq_waiters); + } + + spin_unlock_irqrestore(&q->wq_lock, flags); +} + +void wakeup_one(struct waitqueue *q) +{ + unsigned long flags; + spin_lock_irqsave(&q->wq_lock, &flags); + struct queue_entry *ent = queue_pop_front(&q->wq_waiters); + if(ent) { + struct wait_item *waiter = QUEUE_CONTAINER(struct wait_item, w_entry, ent); + struct thread *thr = waiter->w_thread; + struct runqueue *rq = thr->tr_rq; + if (!rq) { + rq = cpu_rq(this_cpu()); + } + + thr->tr_state = THREAD_READY; + rq_enqueue(rq, thr); + } + + spin_unlock_irqrestore(&q->wq_lock, flags); +}