Loading...
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 | /* * Copyright (c) 2016 Wind River Systems, Inc. * * SPDX-License-Identifier: Apache-2.0 */ /** * @file * @brief Message queues. */ #include <kernel.h> #include <kernel_structs.h> #include <debug/object_tracing_common.h> #include <toolchain.h> #include <linker/sections.h> #include <string.h> #include <wait_q.h> #include <misc/dlist.h> #include <init.h> extern struct k_msgq _k_msgq_list_start[]; extern struct k_msgq _k_msgq_list_end[]; #ifdef CONFIG_OBJECT_TRACING struct k_msgq *_trace_list_k_msgq; /* * Complete initialization of statically defined message queues. */ static int init_msgq_module(struct device *dev) { ARG_UNUSED(dev); struct k_msgq *msgq; for (msgq = _k_msgq_list_start; msgq < _k_msgq_list_end; msgq++) { SYS_TRACING_OBJ_INIT(k_msgq, msgq); } return 0; } SYS_INIT(init_msgq_module, PRE_KERNEL_1, CONFIG_KERNEL_INIT_PRIORITY_OBJECTS); #endif /* CONFIG_OBJECT_TRACING */ void k_msgq_init(struct k_msgq *q, char *buffer, size_t msg_size, u32_t max_msgs) { q->msg_size = msg_size; q->max_msgs = max_msgs; q->buffer_start = buffer; q->buffer_end = buffer + (max_msgs * msg_size); q->read_ptr = buffer; q->write_ptr = buffer; q->used_msgs = 0; sys_dlist_init(&q->wait_q); SYS_TRACING_OBJ_INIT(k_msgq, q); } int k_msgq_put(struct k_msgq *q, void *data, s32_t timeout) { __ASSERT(!_is_in_isr() || timeout == K_NO_WAIT, ""); unsigned int key = irq_lock(); struct k_thread *pending_thread; int result; if (q->used_msgs < q->max_msgs) { /* message queue isn't full */ pending_thread = _unpend_first_thread(&q->wait_q); if (pending_thread) { /* give message to waiting thread */ memcpy(pending_thread->base.swap_data, data, q->msg_size); /* wake up waiting thread */ _set_thread_return_value(pending_thread, 0); _abort_thread_timeout(pending_thread); _ready_thread(pending_thread); if (!_is_in_isr() && _must_switch_threads()) { _Swap(key); return 0; } } else { /* put message in queue */ memcpy(q->write_ptr, data, q->msg_size); q->write_ptr += q->msg_size; if (q->write_ptr == q->buffer_end) { q->write_ptr = q->buffer_start; } q->used_msgs++; } result = 0; } else if (timeout == K_NO_WAIT) { /* don't wait for message space to become available */ result = -ENOMSG; } else { /* wait for put message success, failure, or timeout */ _pend_current_thread(&q->wait_q, timeout); _current->base.swap_data = data; return _Swap(key); } irq_unlock(key); return result; } int k_msgq_get(struct k_msgq *q, void *data, s32_t timeout) { __ASSERT(!_is_in_isr() || timeout == K_NO_WAIT, ""); unsigned int key = irq_lock(); struct k_thread *pending_thread; int result; if (q->used_msgs > 0) { /* take first available message from queue */ memcpy(data, q->read_ptr, q->msg_size); q->read_ptr += q->msg_size; if (q->read_ptr == q->buffer_end) { q->read_ptr = q->buffer_start; } q->used_msgs--; /* handle first thread waiting to write (if any) */ pending_thread = _unpend_first_thread(&q->wait_q); if (pending_thread) { /* add thread's message to queue */ memcpy(q->write_ptr, pending_thread->base.swap_data, q->msg_size); q->write_ptr += q->msg_size; if (q->write_ptr == q->buffer_end) { q->write_ptr = q->buffer_start; } q->used_msgs++; /* wake up waiting thread */ _set_thread_return_value(pending_thread, 0); _abort_thread_timeout(pending_thread); _ready_thread(pending_thread); if (!_is_in_isr() && _must_switch_threads()) { _Swap(key); return 0; } } result = 0; } else if (timeout == K_NO_WAIT) { /* don't wait for a message to become available */ result = -ENOMSG; } else { /* wait for get message success or timeout */ _pend_current_thread(&q->wait_q, timeout); _current->base.swap_data = data; return _Swap(key); } irq_unlock(key); return result; } void k_msgq_purge(struct k_msgq *q) { unsigned int key = irq_lock(); struct k_thread *pending_thread; /* wake up any threads that are waiting to write */ while ((pending_thread = _unpend_first_thread(&q->wait_q)) != NULL) { _set_thread_return_value(pending_thread, -ENOMSG); _abort_thread_timeout(pending_thread); _ready_thread(pending_thread); } q->used_msgs = 0; q->read_ptr = q->write_ptr; _reschedule_threads(key); } |