Loading...
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 | /* * Copyright (c) 2020 Synopsys, Inc. * * SPDX-License-Identifier: Apache-2.0 */ #include "pktqueue.h" #include "main.h" static struct k_thread tthread[THREADS_NUM*QUEUE_NUM]; static struct k_thread qthread[QUEUE_NUM]; /* Each queue has its own mutex */ struct k_mutex sender_queue_mtx[QUEUE_NUM]; struct k_mutex receiver_queue_mtx[QUEUE_NUM]; /* Variable which indicates the amount of processed queues */ int queues_remain = QUEUE_NUM; /* Variable to define current queue in thread */ int current_queue; /* Array of packet header descriptors */ struct phdr_desc descriptors[QUEUE_NUM][SIZE_OF_QUEUE]; /* Arrays of receiver and sender queues */ struct phdr_desc_queue sender[QUEUE_NUM], receiver[QUEUE_NUM]; /* Array of packet headers */ uint8_t headers[QUEUE_NUM][SIZE_OF_QUEUE][SIZE_OF_HEADER]; static K_THREAD_STACK_ARRAY_DEFINE(tstack, THREADS_NUM*QUEUE_NUM, STACK_SIZE); static K_THREAD_STACK_ARRAY_DEFINE(qstack, QUEUE_NUM, STACK_SIZE); K_MUTEX_DEFINE(fetch_queue_mtx); /* Function for initializing "sender" packet header queue */ void init_datagram_queue(struct phdr_desc_queue *queue, int queue_num) { queue->head = descriptors[queue_num]; for (int i = 0; i < SIZE_OF_QUEUE; i++) { queue->tail = &descriptors[queue_num][i]; descriptors[queue_num][i].ptr = (uint8_t *)&headers[queue_num][i]; /* Fill packet header with random values */ for (int j = 0; j < SIZE_OF_HEADER; j++) { /* leave crc field zeroed */ if (j < CRC_BYTE_1 || j > CRC_BYTE_2) { descriptors[queue_num][i].ptr[j] = (uint8_t)sys_rand32_get(); } else { descriptors[queue_num][i].ptr[j] = 0; } } /* Compute crc for further comparison */ uint16_t crc; crc = crc16(POLYNOMIAL, 0x0000, descriptors[queue_num][i].ptr, SIZE_OF_HEADER); /* Save crc value in header[CRC_BYTE_1-CRC_BYTE_2] field */ descriptors[queue_num][i].ptr[CRC_BYTE_1] = (uint8_t)(crc >> 8); descriptors[queue_num][i].ptr[CRC_BYTE_2] = (uint8_t)(crc); queue->count++; descriptors[queue_num][i].next = &descriptors[queue_num][i+1]; } } /* Thread takes packet from "sender" queue and puts it to "receiver" queue. * Each queue can be accessed only by one thread in a time. */ void test_thread(void *arg1, void *arg2, void *arg3) { struct phdr_desc_queue *sender_queue = (struct phdr_desc_queue *)arg1; struct phdr_desc_queue *receiver_queue = (struct phdr_desc_queue *)arg2; struct phdr_desc *qin_ptr = NULL; int queue_num = *(int *)arg3; /* Fetching one queue */ uint16_t crc, crc_orig; qin_ptr = phdr_desc_dequeue(sender_queue, &sender_queue_mtx[queue_num]); while (qin_ptr != NULL) { /* Store original crc value from header */ crc_orig = qin_ptr->ptr[CRC_BYTE_1] << 8; crc_orig |= qin_ptr->ptr[11]; /* Crc field should be zero before crc calculation */ qin_ptr->ptr[CRC_BYTE_1] = 0; qin_ptr->ptr[CRC_BYTE_2] = 0; crc = crc16(POLYNOMIAL, 0x0000, qin_ptr->ptr, SIZE_OF_HEADER); /* Compare computed crc with crc from phdr_desc->crc */ if (crc == crc_orig) { phdr_desc_enqueue(receiver_queue, qin_ptr, &receiver_queue_mtx[queue_num]); } /* Take next element from "sender queue" */ qin_ptr = phdr_desc_dequeue(sender_queue, &sender_queue_mtx[queue_num]); } } /* Thread that processes one pair of sender/receiver queue */ void queue_thread(void *arg1, void *arg2, void *arg3) { ARG_UNUSED(arg1); ARG_UNUSED(arg2); ARG_UNUSED(arg3); int queue_num; /* Fetching one queue */ k_mutex_lock(&fetch_queue_mtx, K_FOREVER); queue_num = current_queue; current_queue++; k_mutex_unlock(&fetch_queue_mtx); for (int i = 0; i < THREADS_NUM; i++) k_thread_create(&tthread[i+THREADS_NUM*queue_num], tstack[i+THREADS_NUM*queue_num], STACK_SIZE, (k_thread_entry_t)test_thread, (void *)&sender[queue_num], (void *)&receiver[queue_num], (void *)&queue_num, K_PRIO_PREEMPT(10), 0, K_NO_WAIT); /* Wait until sender queue is not empty */ while (sender[queue_num].count != 0) { k_sleep(K_MSEC(1)); } /* Decrementing queue counter */ k_mutex_lock(&fetch_queue_mtx, K_FOREVER); queues_remain--; k_mutex_unlock(&fetch_queue_mtx); } int main(void) { uint32_t start_time, stop_time, cycles_spent, nanoseconds_spent; current_queue = 0; printk("Simulating IP header validation on multiple cores.\n"); printk("Each of %d parallel queues is processed by %d threads" " on %d cores and contain %d packet headers.\n", QUEUE_NUM, THREADS_NUM, arch_num_cpus(), SIZE_OF_QUEUE); printk("Bytes in packet header: %d\n\n", SIZE_OF_HEADER); /* initializing "sender" queue */ for (int i = 0; i < QUEUE_NUM; i++) { init_datagram_queue(&sender[i], i); k_mutex_init(&sender_queue_mtx[i]); k_mutex_init(&receiver_queue_mtx[i]); } /* Capture initial time stamp */ start_time = k_cycle_get_32(); for (int i = 0; i < QUEUE_NUM; i++) k_thread_create(&qthread[i], qstack[i], STACK_SIZE, (k_thread_entry_t)queue_thread, (void *)&sender[i], (void *)&receiver[i], (void *)&i, K_PRIO_PREEMPT(11), 0, K_NO_WAIT); /* Wait until all queues are not processed */ while (queues_remain > 0) { k_sleep(K_MSEC(1)); } /* Capture final time stamp */ stop_time = k_cycle_get_32(); cycles_spent = stop_time - start_time; nanoseconds_spent = (uint32_t)k_cyc_to_ns_floor64(cycles_spent); /* Verify result of packet transmission * The counter of correct receiver queues */ int correct = 0; struct phdr_desc *tmp; /* Iterate and count amount of packages in receiver queues */ for (int i = 0; i < QUEUE_NUM; i++) { int count = 0; tmp = receiver[i].head; while (tmp != NULL) { tmp = tmp->next; count++; } if (receiver[i].count == SIZE_OF_QUEUE && count == SIZE_OF_QUEUE) { correct++; } } if (correct == QUEUE_NUM) { printk("RESULT: OK\n" "Application ran successfully.\n" "All %d headers were processed in %d msec\n", SIZE_OF_QUEUE*QUEUE_NUM, nanoseconds_spent / 1000 / 1000); } else { printk("RESULT: FAIL\n" "Application failed.\n" "The amount of packets in receiver queue " "is less than expected.\n"); } k_sleep(K_MSEC(10)); return 0; } |