/** * @file kc_queue.c * @brief キューモジュール * @copyright 2003 - 2023 Nomura Kei */ #include <stdio.h> #include <stdlib.h> #include <stdint.h> #include <string.h> #include <errno.h> #include <stdatomic.h> #include <limits.h> #include <kc_lock_guard.h> #include <kc_memory.h> #include <kc_threads.h> #include <kc_queue.h> #include <kc_list.h> /** * KcQueue 管理情報 */ typedef struct { mtx_t mutex; //!< ロック用j cnd_t not_empty; //!< 条件変数(Empty) cnd_t not_full; //!< 条件変数(Full) int queue_size; //!< キューサイズ KcList *list; //!< キュー内部で利用するリスト } KcQueueInfo; // ============================================================================= // プロトタイプ宣言 // ============================================================================= static int KcQueue_size(struct KcQueue_ *queue); static bool KcQueue_is_empty(struct KcQueue_ *queue); static bool KcQueue_contains(struct KcQueue_ *queue, const void *element, size_t size); static bool KcQueue_offer(struct KcQueue_ *queue, const void *element, size_t size); static bool KcQueue_poll(struct KcQueue_ *queue, void *element, size_t *size); static void *KcQueue_peek(struct KcQueue_ *queue, size_t *size); static void KcQueue_put(struct KcQueue_ *queue, const void *element, size_t size); static void KcQueue_take(struct KcQueue_ *queue, void *element, size_t *size); static void KcQueue_clear(struct KcQueue_ *queue); static void KcQueue_cleanup_info(struct KcQueue_ *queue); // ============================================================================= // new // ============================================================================= /** * Queue を構築します。 * * @param size キューのサイズ * @return Queue */ KcQueue *KcQueue_new(int size) { // KcQueue の管理構造 // +--------------+ // | KcQueue | // | ... | // | _info -----------+ // +--------------+ | // | <_info> | <---+ // | list | // +--------------+ KcQueue *queue = (KcQueue *)malloc(sizeof(KcQueue) + sizeof(KcQueueInfo)); KcList *list = KcList_new_LinkedList(); if ((queue != NULL) && (list != NULL)) { queue->size = KcQueue_size; queue->is_empty = KcQueue_is_empty; queue->contains = KcQueue_contains; queue->offer = KcQueue_offer; queue->poll = KcQueue_poll; queue->peek = KcQueue_peek; queue->put = KcQueue_put; queue->take = KcQueue_take; queue->clear = KcQueue_clear; queue->cleanup_info = KcQueue_cleanup_info; queue->_info = (queue + 1); KcQueueInfo *info = (KcQueueInfo *)queue->_info; info->list = list; info->queue_size = (size == 0) ? INT_MAX : size; printf("queue info->mutex: %p\n", &(info->mutex)); mtx_init(&(info->mutex), mtx_plain); cnd_init(&(info->not_empty)); cnd_init(&(info->not_full)); } else { // 何れかのメモリ確保に失敗したら、メモリを解放する。 free(queue); queue = NULL; free(list); list = NULL; } return queue; } // ============================================================================= // delete // ============================================================================= /** * Queue をします。 * * @param queue 破棄するキュー */ void KcQueue_delete(KcQueue *queue) { queue->cleanup_info(queue); free(queue); } // ============================================================================= // size // ============================================================================= /** * キューに格納されている要素の数を返します。 * * @param queue 対象キュー * @return 対象キュー内の要素数 */ static int KcQueue_size(struct KcQueue_ *queue) { KcQueueInfo *info = (KcQueueInfo *)queue->_info; size_t size = 0; mtx_lock(&(info->mutex)); // ===== Lock Start =============== size = info->list->size(info->list); mtx_unlock(&(info->mutex)); // ===== Lock End =============== return (int)size; } // ============================================================================= // is_empty // ============================================================================= /** * キューに要素がない場合に true を返します。 * * @param queue 対象キュー * @return 対象キューに要素が含まれていない場合は true */ static bool KcQueue_is_empty(struct KcQueue_ *queue) { KcQueueInfo *info = (KcQueueInfo *)queue->_info; bool is_empty = true; mtx_lock(&(info->mutex)); // ===== Lock Start =============== is_empty = info->list->is_empty(info->list); mtx_unlock(&(info->mutex)); // ===== Lock End =============== return is_empty; } // ============================================================================= // contains // ============================================================================= /** * 指定の要素が対象キューに含まれている場合に true を返します。 * * @param queue 対象キュー * @param element 対象キュー内にあるかどうか判定される要素 * @param size 要素のサイズ * @return 指定された要素が対象キュー内にある場合は true */ static bool KcQueue_contains(struct KcQueue_ *queue, const void *element, size_t size) { KcQueueInfo *info = (KcQueueInfo *)queue->_info; bool is_contains = false; mtx_lock(&(info->mutex)); // ===== Lock Start =============== is_contains = info->list->contains(info->list, element, size); mtx_unlock(&(info->mutex)); // ===== Lock End =============== return is_contains; } // ============================================================================= // offer // ============================================================================= /** * キューに要素を追加します。 * * @param queue 対象キュー * @param element 追加する要素 * @param size 要素のサイズ * @return true/false (追加成功/失敗) */ static bool KcQueue_offer(struct KcQueue_ *queue, const void *element, size_t size) { KcQueueInfo *info = (KcQueueInfo *)queue->_info; bool is_success = false; mtx_lock(&(info->mutex)); // ===== Lock Start =============== int now_size = (int)info->list->size(info->list); if (now_size < info->queue_size) { is_success = info->list->add(info->list, -1, element, size); if (is_success) { cnd_signal(&(info->not_empty)); } } mtx_unlock(&(info->mutex)); // ===== Lock End =============== return is_success; } // ============================================================================= // poll // ============================================================================= /** * キューより要素を取り出します。 * * @param queue 対象キュー * @param element 取り出された要素が格納されます。 * @param size element のバッファサイズを指定します。また、取り出された要素のサイズが格納されます。 * @return true/false (要素の取り出し成功/失敗[要素がない]) */ static bool KcQueue_poll(struct KcQueue_ *queue, void *element, size_t *size) { KcQueueInfo *info = (KcQueueInfo *)queue->_info; bool is_success = false; mtx_lock(&(info->mutex)); // ===== Lock Start =============== bool is_empty = info->list->is_empty(info->list); if (!is_empty) { is_success = info->list->remove(info->list, 0, element, size); // is_success は常に true cnd_signal(&(info->not_full)); } mtx_unlock(&(info->mutex)); // ===== Lock End =============== return is_success; } // ============================================================================= // peek // ============================================================================= /** * キューより要素を取得しますが、削除しません。 * * @param queue 対象キュー * @param size 取り出された要素のサイズが格納されます。 * @return 要素 */ static void *KcQueue_peek(struct KcQueue_ *queue, size_t *size) { KcQueueInfo *info = (KcQueueInfo *)queue->_info; void *value = NULL; mtx_lock(&(info->mutex)); // ===== Lock Start =============== value = info->list->get(info->list, 0, size); mtx_unlock(&(info->mutex)); // ===== Lock End =============== return value; } // ============================================================================= // put // ============================================================================= /** * キューに要素を追加します。 * キューが一杯の状態で追加できない場合、追加できるまでブロックされます。 * * @param queue 対象キュー * @param element 追加する要素 * @param size 要素のサイズ */ static void KcQueue_put(struct KcQueue_ *queue, const void *element, size_t size) { KcQueueInfo *info = (KcQueueInfo *)queue->_info; mtx_lock(&(info->mutex)); // ===== Lock Start =============== while ((int)info->list->size(info->list) == info->queue_size) { cnd_wait(&(info->not_full), &(info->mutex)); } info->list->add(info->list, -1, element, size); cnd_signal(&(info->not_empty)); mtx_unlock(&(info->mutex)); // ===== Lock End =============== } // ============================================================================= // take // ============================================================================= /** * キューより要素を取り出します。 * 必要に応じて、要素が利用可能になるまでブロックされます。 * * @param queue 対象キュー * @param element 取り出された要素が格納されます。 * @param size element のバッファサイズを指定します。また、取り出された要素のサイズが格納されます。 */ void KcQueue_take(struct KcQueue_ *queue, void *element, size_t *size) { KcQueueInfo *info = (KcQueueInfo *)queue->_info; mtx_lock(&(info->mutex)); // ===== Lock Start =============== while (info->list->is_empty(info->list)) { cnd_wait(&(info->not_empty), &(info->mutex)); } info->list->remove(info->list, 0, element, size); cnd_signal(&(info->not_full)); mtx_unlock(&(info->mutex)); // ===== Lock End =============== } // ============================================================================= // clear // ============================================================================= /** * すべての要素をキューより削除します。 * * @param queue 対象キュー */ void KcQueue_clear(struct KcQueue_ *queue) { KcQueueInfo *info = (KcQueueInfo *)queue->_info; mtx_lock(&(info->mutex)); // ===== Lock Start =============== info->list->clear(info->list); mtx_unlock(&(info->mutex)); // ===== Lock End =============== } // ============================================================================= // clearnup_info // ============================================================================= /** * クリア * * @param queue 対象キュー */ static void KcQueue_cleanup_info(struct KcQueue_ *queue) { printf("### cleanup info\n"); KcQueueInfo *info = (KcQueueInfo *)queue->_info; KcList_delete(info->list); printf("info->mutex : %p\n", &info->mutex); mtx_destroy(&(info->mutex)); cnd_destroy(&(info->not_empty)); cnd_destroy(&(info->not_full)); }