From 77936e3511b9a2f0b1068c12966e8ee61f4970e8 Mon Sep 17 00:00:00 2001 From: Max Wash Date: Sat, 21 Feb 2026 11:32:57 +0000 Subject: [PATCH] kernel: implement sending, receiving, and replying to message via port/channel --- include/kernel/channel.h | 1 + include/kernel/msg.h | 4 +- include/kernel/port.h | 4 +- kernel/channel.c | 74 +++++++----- kernel/port.c | 43 +++++-- syscall/msg.c | 242 ++++++++++++++++++++++++++++++++++++--- 6 files changed, 317 insertions(+), 51 deletions(-) diff --git a/include/kernel/channel.h b/include/kernel/channel.h index 83314cb..0da9d60 100644 --- a/include/kernel/channel.h +++ b/include/kernel/channel.h @@ -15,6 +15,7 @@ struct channel { }; extern kern_status_t channel_type_init(void); +extern struct channel *channel_cast(struct object *obj); extern struct channel *channel_create(void); diff --git a/include/kernel/msg.h b/include/kernel/msg.h index 910a997..f1dcb37 100644 --- a/include/kernel/msg.h +++ b/include/kernel/msg.h @@ -23,8 +23,8 @@ struct kmsg { kern_status_t msg_result; struct port *msg_sender_port; struct thread *msg_sender_thread; - const struct msg *msg_req; - struct msg *msg_resp; + struct msg msg_req; + struct msg msg_resp; }; #endif diff --git a/include/kernel/port.h b/include/kernel/port.h index a167afa..c82a16d 100644 --- a/include/kernel/port.h +++ b/include/kernel/port.h @@ -29,10 +29,12 @@ extern struct port *port_cast(struct object *obj); extern struct port *port_create(void); extern kern_status_t port_connect(struct port *port, struct channel *remote); +extern kern_status_t port_disconnect(struct port *port); extern kern_status_t port_send_msg( struct port *port, const struct msg *req, - struct msg *resp); + struct msg *resp, + unsigned long *lock_flags); DEFINE_OBJECT_LOCK_FUNCTION(port, p_base) diff --git a/kernel/channel.c b/kernel/channel.c index 0582577..7fbe255 100644 --- a/kernel/channel.c +++ b/kernel/channel.c @@ -18,6 +18,11 @@ kern_status_t channel_type_init(void) return object_type_register(&channel_type); } +struct channel *channel_cast(struct object *obj) +{ + return CHANNEL_CAST(obj); +} + extern struct channel *channel_create(void) { struct object *channel_object = object_create(&channel_type); @@ -69,26 +74,31 @@ static bool try_enqueue(struct btree *tree, struct kmsg *msg) return true; } -static void kmsg_reply_error(struct kmsg *msg, kern_status_t status) +static void kmsg_reply_error( + struct kmsg *msg, + kern_status_t status, + unsigned long *lock_flags) { msg->msg_status = KMSG_REPLY_SENT; - msg->msg_status = status; + msg->msg_result = status; thread_awaken(msg->msg_sender_thread); + spin_unlock_irqrestore(&msg->msg_lock, *lock_flags); } -static struct kmsg *get_next_msg(struct channel *channel) +static struct kmsg *get_next_msg( + struct channel *channel, + unsigned long *lock_flags) { - unsigned long flags; struct btree_node *cur = btree_first(&channel->c_msg); while (cur) { struct kmsg *msg = BTREE_CONTAINER(struct kmsg, msg_node, cur); - spin_lock_irqsave(&msg->msg_lock, &flags); + spin_lock_irqsave(&msg->msg_lock, lock_flags); if (msg->msg_status == KMSG_WAIT_RECEIVE) { msg->msg_status = KMSG_WAIT_REPLY; return msg; } - spin_unlock_irqrestore(&msg->msg_lock, flags); + spin_unlock_irqrestore(&msg->msg_lock, *lock_flags); cur = btree_next(cur); } @@ -118,11 +128,12 @@ extern kern_status_t channel_recv_msg( struct wait_item waiter; struct thread *self = current_thread(); struct kmsg *msg = NULL; + unsigned long msg_lock_flags; wait_item_init(&waiter, self); for (;;) { thread_wait_begin(&waiter, &channel->c_wq); - msg = get_next_msg(channel); + msg = get_next_msg(channel, &msg_lock_flags); if (msg) { break; } @@ -145,11 +156,11 @@ extern kern_status_t channel_recv_msg( out_msg->msg_data_count, sender->t_address_space, 0, - msg->msg_req->msg_data, - msg->msg_req->msg_data_count, + msg->msg_req.msg_data, + msg->msg_req.msg_data_count, VM_REGION_COPY_ALL); if (status != KERN_OK) { - kmsg_reply_error(msg, status); + kmsg_reply_error(msg, status, &msg_lock_flags); return status; } @@ -158,14 +169,17 @@ extern kern_status_t channel_recv_msg( out_msg->msg_handles, out_msg->msg_handles_count, sender->t_handles, - msg->msg_req->msg_handles, - msg->msg_req->msg_handles_count); + msg->msg_req.msg_handles, + msg->msg_req.msg_handles_count); if (status != KERN_OK) { - kmsg_reply_error(msg, status); + kmsg_reply_error(msg, status, &msg_lock_flags); return status; } - kmsg_reply_error(msg, KERN_OK); + *out_id = msg->msg_id; + + spin_unlock_irqrestore(&msg->msg_lock, msg_lock_flags); + return KERN_OK; } @@ -175,46 +189,54 @@ extern kern_status_t channel_reply_msg( const struct msg *resp, unsigned long *irq_flags) { + unsigned long msg_lock_flags; struct kmsg *msg = get_msg_with_id(&channel->c_msg, id); - if (!msg || msg->msg_status != KMSG_WAIT_REPLY) { + if (!msg) { + return KERN_INVALID_ARGUMENT; + } + + spin_lock_irqsave(&msg->msg_lock, &msg_lock_flags); + if (msg->msg_status != KMSG_WAIT_REPLY) { + spin_unlock_irqrestore(&msg->msg_lock, msg_lock_flags); return KERN_INVALID_ARGUMENT; } struct thread *self = current_thread(); - struct task *sender = msg->msg_sender_thread->tr_parent; - struct task *receiver = self->tr_parent; + /* the task that is about to receive the response */ + struct task *receiver = msg->msg_sender_thread->tr_parent; + /* the task that is about to send the response */ + struct task *sender = self->tr_parent; kern_status_t status = vm_region_memmove_v( receiver->t_address_space, 0, - msg->msg_resp->msg_data, - msg->msg_resp->msg_data_count, + msg->msg_resp.msg_data, + msg->msg_resp.msg_data_count, sender->t_address_space, 0, resp->msg_data, resp->msg_data_count, VM_REGION_COPY_ALL); if (status != KERN_OK) { - kmsg_reply_error(msg, status); + kmsg_reply_error(msg, status, &msg_lock_flags); return status; } status = handle_list_transfer( receiver->t_handles, - msg->msg_resp->msg_handles, - msg->msg_resp->msg_handles_count, + msg->msg_resp.msg_handles, + msg->msg_resp.msg_handles_count, sender->t_handles, resp->msg_handles, resp->msg_handles_count); if (status != KERN_OK) { - kmsg_reply_error(msg, status); + kmsg_reply_error(msg, status, &msg_lock_flags); return status; } - msg->msg_status = KERN_OK; - msg->msg_status = KMSG_REPLY_SENT; + kmsg_reply_error(msg, KERN_OK, &msg_lock_flags); - return KERN_UNIMPLEMENTED; + return KERN_OK; } extern kern_status_t channel_read_msg( diff --git a/kernel/port.c b/kernel/port.c index 620bf71..871b4c6 100644 --- a/kernel/port.c +++ b/kernel/port.c @@ -20,8 +20,24 @@ struct port *port_cast(struct object *obj) return PORT_CAST(obj); } -static void wait_for_reply(struct port *port) +static void wait_for_reply(struct kmsg *msg, unsigned long *lock_flags) { + struct wait_item waiter; + struct thread *self = current_thread(); + + wait_item_init(&waiter, self); + for (;;) { + self->tr_state = THREAD_SLEEPING; + if (msg->msg_status == KMSG_REPLY_SENT) { + break; + } + + port_unlock_irqrestore(msg->msg_sender_port, *lock_flags); + schedule(SCHED_NORMAL); + port_lock_irqsave(msg->msg_sender_port, lock_flags); + } + + self->tr_state = THREAD_READY; } struct port *port_create(void) @@ -49,10 +65,22 @@ kern_status_t port_connect(struct port *port, struct channel *remote) return KERN_OK; } +kern_status_t port_disconnect(struct port *port) +{ + if (port->p_status != PORT_READY) { + return KERN_BAD_STATE; + } + + port->p_remote = NULL; + port->p_status = PORT_OFFLINE; + return KERN_OK; +} + kern_status_t port_send_msg( struct port *port, const struct msg *req, - struct msg *resp) + struct msg *resp, + unsigned long *lock_flags) { if (port->p_status != PORT_READY) { return KERN_BAD_STATE; @@ -60,19 +88,20 @@ kern_status_t port_send_msg( struct thread *self = current_thread(); struct kmsg *msg = &self->tr_msg; + memset(msg, 0x0, sizeof *msg); + msg->msg_status = KMSG_WAIT_RECEIVE; msg->msg_sender_thread = self; msg->msg_sender_port = port; - msg->msg_req = req; - msg->msg_resp = resp; + msg->msg_req = *req; + msg->msg_resp = *resp; unsigned long flags; channel_lock_irqsave(port->p_remote, &flags); + port->p_status = PORT_SEND_BLOCKED; channel_enqueue_msg(port->p_remote, msg); channel_unlock_irqrestore(port->p_remote, flags); - port->p_status = PORT_SEND_BLOCKED; - - wait_for_reply(port); + wait_for_reply(msg, lock_flags); return msg->msg_result; } diff --git a/syscall/msg.c b/syscall/msg.c index a85cbe8..f56b864 100644 --- a/syscall/msg.c +++ b/syscall/msg.c @@ -62,14 +62,13 @@ kern_status_t sys_port_create(kern_handle_t *out) kern_handle_t handle; kern_status_t status = task_open_handle(self, &port->p_base, 0, &handle); + task_unlock_irqrestore(self, irq_flags); + if (status != KERN_OK) { - task_unlock_irqrestore(self, irq_flags); object_unref(&port->p_base); return status; } - task_unlock_irqrestore(self, irq_flags); - *out = handle; return KERN_OK; } @@ -98,6 +97,7 @@ kern_status_t sys_port_connect( /* add a reference to the port object to make sure it isn't deleted * while we're using it */ object_ref(port_obj); + struct port *port = port_cast(port_obj); task_unlock_irqrestore(self, flags); struct task *remote_task = task_from_tid(task_id); @@ -116,43 +116,255 @@ kern_status_t sys_port_connect( object_ref(&remote->c_base); task_unlock_irqrestore(remote_task, flags); - status = port_connect(port_cast(port_obj), remote); + port_lock_irqsave(port, &flags); + status = port_connect(port, remote); + port_unlock_irqrestore(port, flags); object_unref(port_obj); object_unref(&remote->c_base); return KERN_OK; } -kern_status_t sys_port_disconnect(kern_handle_t port) +kern_status_t sys_port_disconnect(kern_handle_t port_handle) { - return KERN_UNIMPLEMENTED; + unsigned long flags; + + struct task *self = current_task(); + task_lock_irqsave(self, &flags); + + struct object *port_obj = NULL; + handle_flags_t port_handle_flags = 0; + kern_status_t status = task_resolve_handle( + self, + port_handle, + &port_obj, + &port_handle_flags); + if (status != KERN_OK) { + return status; + } + + /* add a reference to the port object to make sure it isn't deleted + * while we're using it */ + object_ref(port_obj); + task_unlock_irqrestore(self, flags); + + struct port *port = port_cast(port_obj); + if (!port) { + object_unref(port_obj); + return KERN_INVALID_ARGUMENT; + } + + object_unref(port_obj); + port_lock_irqsave(port, &flags); + status = port_disconnect(port); + port_unlock_irqrestore(port, flags); + + return status; +} + +static bool validate_msg(struct task *task, const struct msg *msg, bool rw) +{ + if (!validate_access_r(task, msg, sizeof *msg)) { + return false; + } + + if (msg->msg_data_count + && !validate_access_r( + task, + msg->msg_data, + sizeof(struct iovec) * msg->msg_data_count)) { + return false; + } + + if (msg->msg_handles_count + && !validate_access_r( + task, + msg->msg_handles, + sizeof(struct handle_list) * msg->msg_handles_count)) { + return false; + } + + for (size_t i = 0; i < msg->msg_data_count; i++) { + bool ok = false; + const struct iovec *iov = &msg->msg_data[i]; + if (rw) { + ok = validate_access_w(task, iov->io_base, iov->io_len); + } else { + ok = validate_access_r(task, iov->io_base, iov->io_len); + } + + if (!ok) { + return false; + } + } + + for (size_t i = 0; i < msg->msg_handles_count; i++) { + bool ok = false; + const struct handle_list *list = &msg->msg_handles[i]; + if (rw) { + ok = validate_access_w( + task, + list->l_handles, + list->l_nr_handles * sizeof(kern_handle_t)); + } else { + ok = validate_access_r( + task, + list->l_handles, + list->l_nr_handles * sizeof(kern_handle_t)); + } + + if (!ok) { + return false; + } + } + + return true; } kern_status_t sys_msg_send( - kern_handle_t port, - msg_flags_t flags, + kern_handle_t port_handle, + msg_flags_t msg_flags, const struct msg *req, struct msg *resp) { - return KERN_UNIMPLEMENTED; + struct task *self = current_task(); + + if (!validate_msg(self, req, false)) { + return KERN_MEMORY_FAULT; + } + + if (!validate_msg(self, resp, true)) { + return KERN_MEMORY_FAULT; + } + + unsigned long flags; + + task_lock_irqsave(self, &flags); + + struct object *port_obj = NULL; + handle_flags_t port_handle_flags = 0; + kern_status_t status = task_resolve_handle( + self, + port_handle, + &port_obj, + &port_handle_flags); + if (status != KERN_OK) { + return status; + } + + /* add a reference to the port object to make sure it isn't deleted + * while we're using it */ + object_ref(port_obj); + task_unlock_irqrestore(self, flags); + + struct port *port = port_cast(port_obj); + if (!port) { + object_unref(port_obj); + return KERN_INVALID_ARGUMENT; + } + + port_lock_irqsave(port, &flags); + status = port_send_msg(port, req, resp, &flags); + port_unlock_irqrestore(port, flags); + object_unref(port_obj); + + return status; } kern_status_t sys_msg_recv( - kern_handle_t channel, - msg_flags_t flags, + kern_handle_t channel_handle, + msg_flags_t msg_flags, msgid_t *out_id, struct msg *out_msg) { - return KERN_UNIMPLEMENTED; + struct task *self = current_task(); + + if (!validate_access_w(self, out_id, sizeof *out_id)) { + return KERN_MEMORY_FAULT; + } + + if (!validate_msg(self, out_msg, true)) { + return KERN_MEMORY_FAULT; + } + + unsigned long flags; + + task_lock_irqsave(self, &flags); + + struct object *channel_obj = NULL; + handle_flags_t channel_handle_flags = 0; + kern_status_t status = task_resolve_handle( + self, + channel_handle, + &channel_obj, + &channel_handle_flags); + if (status != KERN_OK) { + return status; + } + + /* add a reference to the port object to make sure it isn't deleted + * while we're using it */ + object_ref(channel_obj); + task_unlock_irqrestore(self, flags); + + struct channel *channel = channel_cast(channel_obj); + if (!channel) { + object_unref(channel_obj); + return KERN_INVALID_ARGUMENT; + } + + channel_lock_irqsave(channel, &flags); + status = channel_recv_msg(channel, out_msg, out_id, &flags); + channel_unlock_irqrestore(channel, flags); + object_unref(channel_obj); + + return status; } kern_status_t sys_msg_reply( - kern_handle_t channel, - msg_flags_t flags, + kern_handle_t channel_handle, + msg_flags_t msg_flags, msgid_t id, const struct msg *reply) { - return KERN_UNIMPLEMENTED; + struct task *self = current_task(); + + if (!validate_msg(self, reply, false)) { + return KERN_MEMORY_FAULT; + } + + unsigned long flags; + + task_lock_irqsave(self, &flags); + + struct object *channel_obj = NULL; + handle_flags_t channel_handle_flags = 0; + kern_status_t status = task_resolve_handle( + self, + channel_handle, + &channel_obj, + &channel_handle_flags); + if (status != KERN_OK) { + return status; + } + + /* add a reference to the port object to make sure it isn't deleted + * while we're using it */ + object_ref(channel_obj); + task_unlock_irqrestore(self, flags); + + struct channel *channel = channel_cast(channel_obj); + if (!channel) { + object_unref(channel_obj); + return KERN_INVALID_ARGUMENT; + } + + channel_lock_irqsave(channel, &flags); + status = channel_reply_msg(channel, id, reply, &flags); + channel_unlock_irqrestore(channel, flags); + object_unref(channel_obj); + + return status; } kern_status_t sys_msg_read(