Newer
Older
libkc / modules / src / kc_queue.c
/**
 * @file  kc_queue.c
 * @brief キューモジュール
 * @copyright  2003 - 2023  Nomura Kei
 */
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <string.h>
#include <errno.h>
#include <stdatomic.h>
#include <limits.h>

#include <kc_lock_guard.h>
#include <kc_memory.h>
#include <kc_threads.h>
#include <kc_queue.h>
#include <kc_list.h>

/**
 * KcQueue 管理情報
 */
typedef struct
{
	mtx_t mutex;	 //!< ロック用j
	cnd_t not_empty; //!< 条件変数(Empty)
	cnd_t not_full;	 //!< 条件変数(Full)
	int queue_size;	 //!< キューサイズ
	KcList *list;	 //!< キュー内部で利用するリスト
} KcQueueInfo;

// =============================================================================
//  プロトタイプ宣言
// =============================================================================
static int KcQueue_size(struct KcQueue_ *queue);
static bool KcQueue_is_empty(struct KcQueue_ *queue);
static bool KcQueue_contains(struct KcQueue_ *queue, const void *element, size_t size);
static bool KcQueue_offer(struct KcQueue_ *queue, const void *element, size_t size);
static bool KcQueue_poll(struct KcQueue_ *queue, void *element, size_t *size);
static void *KcQueue_peek(struct KcQueue_ *queue, size_t *size);
static void KcQueue_put(struct KcQueue_ *queue, const void *element, size_t size);
static void KcQueue_take(struct KcQueue_ *queue, void *element, size_t *size);
static void KcQueue_clear(struct KcQueue_ *queue);
static void KcQueue_cleanup_info(struct KcQueue_ *queue);

// =============================================================================
//  new
// =============================================================================
/**
 * Queue を構築します。
 *
 * @param size キューのサイズ
 * @return Queue
 */
KcQueue *KcQueue_new(int size)
{
	// KcQueue の管理構造
	// +--------------+
	// | KcQueue      |
	// |  ...         |
	// |  _info  -----------+
	// +--------------+     |
	// | <_info>      | <---+
	// | list         |
	// +--------------+
	KcQueue *queue = (KcQueue *)malloc(sizeof(KcQueue) + sizeof(KcQueueInfo));
	KcList *list = KcList_new_LinkedList();
	if ((queue != NULL) && (list != NULL))
	{
		queue->size = KcQueue_size;
		queue->is_empty = KcQueue_is_empty;
		queue->contains = KcQueue_contains;
		queue->offer = KcQueue_offer;
		queue->poll = KcQueue_poll;
		queue->peek = KcQueue_peek;
		queue->put = KcQueue_put;
		queue->take = KcQueue_take;
		queue->clear = KcQueue_clear;
		queue->cleanup_info = KcQueue_cleanup_info;
		queue->_info = (queue + 1);

		KcQueueInfo *info = (KcQueueInfo *)queue->_info;
		info->list = list;
		info->queue_size = (size == 0) ? INT_MAX : size;
		printf("queue info->mutex: %p\n", &(info->mutex));
		mtx_init(&(info->mutex), mtx_plain);
		cnd_init(&(info->not_empty));
		cnd_init(&(info->not_full));
	}
	else
	{ // 何れかのメモリ確保に失敗したら、メモリを解放する。
		free(queue);
		queue = NULL;
		free(list);
		list = NULL;
	}
	return queue;
}

// =============================================================================
//  delete
// =============================================================================
/**
 * Queue をします。
 *
 * @param queue 破棄するキュー
 */
void KcQueue_delete(KcQueue *queue)
{
	queue->cleanup_info(queue);
	free(queue);
}

// =============================================================================
//  size
// =============================================================================
/**
 * キューに格納されている要素の数を返します。
 *
 * @param queue 対象キュー
 * @return 対象キュー内の要素数
 */
static int KcQueue_size(struct KcQueue_ *queue)
{
	KcQueueInfo *info = (KcQueueInfo *)queue->_info;
	size_t size = 0;
	mtx_lock(&(info->mutex)); // ===== Lock Start ===============
	size = info->list->size(info->list);
	mtx_unlock(&(info->mutex)); // ===== Lock End ===============
	return (int)size;
}

// =============================================================================
//  is_empty
// =============================================================================
/**
 * キューに要素がない場合に true を返します。
 *
 * @param queue 対象キュー
 * @return 対象キューに要素が含まれていない場合は true
 */
static bool KcQueue_is_empty(struct KcQueue_ *queue)
{
	KcQueueInfo *info = (KcQueueInfo *)queue->_info;
	bool is_empty = true;
	mtx_lock(&(info->mutex)); // ===== Lock Start ===============
	is_empty = info->list->is_empty(info->list);
	mtx_unlock(&(info->mutex)); // ===== Lock End ===============
	return is_empty;
}

// =============================================================================
//  contains
// =============================================================================
/**
 * 指定の要素が対象キューに含まれている場合に true を返します。
 *
 * @param queue   対象キュー
 * @param element 対象キュー内にあるかどうか判定される要素
 * @param size    要素のサイズ
 * @return 指定された要素が対象キュー内にある場合は true
 */
static bool KcQueue_contains(struct KcQueue_ *queue, const void *element, size_t size)
{
	KcQueueInfo *info = (KcQueueInfo *)queue->_info;
	bool is_contains = false;
	mtx_lock(&(info->mutex)); // ===== Lock Start ===============
	is_contains = info->list->contains(info->list, element, size);
	mtx_unlock(&(info->mutex)); // ===== Lock End ===============
	return is_contains;
}

// =============================================================================
//  offer
// =============================================================================
/**
 * キューに要素を追加します。
 *
 * @param queue   対象キュー
 * @param element 追加する要素
 * @param size    要素のサイズ
 * @return true/false (追加成功/失敗)
 */
static bool KcQueue_offer(struct KcQueue_ *queue, const void *element, size_t size)
{
	KcQueueInfo *info = (KcQueueInfo *)queue->_info;
	bool is_success = false;
	mtx_lock(&(info->mutex)); // ===== Lock Start ===============
	int now_size = (int)info->list->size(info->list);
	if (now_size < info->queue_size)
	{
		is_success = info->list->add(info->list, -1, element, size);
		if (is_success)
		{
			cnd_signal(&(info->not_empty));
		}
	}
	mtx_unlock(&(info->mutex)); // ===== Lock End ===============
	return is_success;
}

// =============================================================================
//  poll
// =============================================================================
/**
 * キューより要素を取り出します。
 *
 * @param queue   対象キュー
 * @param element 取り出された要素が格納されます。
 * @param size    element のバッファサイズを指定します。また、取り出された要素のサイズが格納されます。
 * @return true/false (要素の取り出し成功/失敗[要素がない])
 */
static bool KcQueue_poll(struct KcQueue_ *queue, void *element, size_t *size)
{
	KcQueueInfo *info = (KcQueueInfo *)queue->_info;
	bool is_success = false;
	mtx_lock(&(info->mutex)); // ===== Lock Start ===============
	bool is_empty = info->list->is_empty(info->list);
	if (!is_empty)
	{
		is_success = info->list->remove(info->list, 0, element, size);
		// is_success は常に true
		cnd_signal(&(info->not_full));
	}
	mtx_unlock(&(info->mutex)); // ===== Lock End ===============
	return is_success;
}

// =============================================================================
//  peek
// =============================================================================
/**
 * キューより要素を取得しますが、削除しません。
 *
 * @param queue   対象キュー
 * @param size    取り出された要素のサイズが格納されます。
 * @return 要素
 */
static void *KcQueue_peek(struct KcQueue_ *queue, size_t *size)
{
	KcQueueInfo *info = (KcQueueInfo *)queue->_info;
	void *value = NULL;
	mtx_lock(&(info->mutex)); // ===== Lock Start ===============
	value = info->list->get(info->list, 0, size);
	mtx_unlock(&(info->mutex)); // ===== Lock End ===============
	return value;
}

// =============================================================================
//  put
// =============================================================================
/**
 * キューに要素を追加します。
 * キューが一杯の状態で追加できない場合、追加できるまでブロックされます。
 *
 * @param queue   対象キュー
 * @param element 追加する要素
 * @param size    要素のサイズ
 */
static void KcQueue_put(struct KcQueue_ *queue, const void *element, size_t size)
{
	KcQueueInfo *info = (KcQueueInfo *)queue->_info;
	mtx_lock(&(info->mutex)); // ===== Lock Start ===============
	while ((int)info->list->size(info->list) == info->queue_size)
	{
		cnd_wait(&(info->not_full), &(info->mutex));
	}
	info->list->add(info->list, -1, element, size);
	cnd_signal(&(info->not_empty));
	mtx_unlock(&(info->mutex)); // ===== Lock End ===============
}

// =============================================================================
//  take
// =============================================================================
/**
 * キューより要素を取り出します。
 * 必要に応じて、要素が利用可能になるまでブロックされます。
 *
 * @param queue   対象キュー
 * @param element 取り出された要素が格納されます。
 * @param size    element のバッファサイズを指定します。また、取り出された要素のサイズが格納されます。
 */
void KcQueue_take(struct KcQueue_ *queue, void *element, size_t *size)
{
	KcQueueInfo *info = (KcQueueInfo *)queue->_info;
	mtx_lock(&(info->mutex)); // ===== Lock Start ===============
	while (info->list->is_empty(info->list))
	{
		cnd_wait(&(info->not_empty), &(info->mutex));
	}
	info->list->remove(info->list, 0, element, size);
	cnd_signal(&(info->not_full));
	mtx_unlock(&(info->mutex)); // ===== Lock End ===============
}

// =============================================================================
//  clear
// =============================================================================
/**
 * すべての要素をキューより削除します。
 *
 * @param queue 対象キュー
 */
void KcQueue_clear(struct KcQueue_ *queue)
{
	KcQueueInfo *info = (KcQueueInfo *)queue->_info;
	mtx_lock(&(info->mutex)); // ===== Lock Start ===============
	info->list->clear(info->list);
	mtx_unlock(&(info->mutex)); // ===== Lock End ===============
}

// =============================================================================
//  clearnup_info
// =============================================================================
/**
 * クリア
 *
 * @param queue 対象キュー
 */
static void KcQueue_cleanup_info(struct KcQueue_ *queue)
{
	printf("### cleanup info\n");
	KcQueueInfo *info = (KcQueueInfo *)queue->_info;
	KcList_delete(info->list);
	printf("info->mutex : %p\n", &info->mutex);
	mtx_destroy(&(info->mutex));
	cnd_destroy(&(info->not_empty));
	cnd_destroy(&(info->not_full));
}