Newer
Older
bashrc / .bash.d / tmp / util / src / util_blockingqueue.c
Nomura.Kei on 24 Jan 2020 6 KB UPDATE
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>

#include "util_queue.h"
#include "util_blockingqueue.h"
 
/**
 * ブロッキングキューのインスタンス構造体
 */
struct blockingqueue {
	struct queue*   queue;							/**< キュー							*/
	struct mutex*   data_lock;						/**< データロック					*/
	struct cond*    empty_block;					/**< pop のブロック用				*/
	struct cond*    full_block;						/**< push のブロック用				*/
};


/**
 * ブロッキングキューを生成します。
 * インスタンスを生成できない場合、NULL を返します。
 *
 * @param cap  キューの容量
 * @param size キューに格納するデータサイズ
 * @return イベントキューのインスタンス
 */
struct blockingqueue* blockingqueue_new(size_t cap, size_t size)
{
	struct blockingqueue* blockingqueue = (struct blockingqueue*) malloc(sizeof(struct blockingqueue));
	if (blockingqueue != NULL)
	{
		blockingqueue->queue = queue_new(cap,4096);
		if (blockingqueue->queue != NULL)
		{	// pthread_mutex_lock ですでに保持している mutex をロックしようとしたとき、
			// デフォルトのスレッドを停止させる動作とするため、NULL (デフォルト設定)を指定する。
			pthread_mutex_init(&blockingqueue->data_lock, NULL);

			// データがない場合 pop をブロックするための cond オブジェクトを初期化する。
			pthread_cond_init(&blockingqueue->pop_block, NULL);
		}
		else
		{
			free(blockingqueue);
			blockingqueue = NULL;
		}

	}
	return blockingqueue;
}




/**
 * 指定されたイベントキューを破棄します。
 *
 * @param blockingqueue イベントキューのインスタンス
 */
void blockingqueue_destroy(struct blockingqueue* blockingqueue)
{
	if (blockingqueue != NULL)
	{
		// T.B.D.
		queue_destroy(blockingqueue->queue);
		blockingqueue->queue = NULL;
	}
	free(blockingqueue);
}


/**
 * 指定された mutex をロックします。
 *
 * @param mutex mutexオブジェクト
 */
static
void blockingqueue_mutex_lock(pthread_mutex_t* mutex)
{
	int errcode = pthread_mutex_lock(mutex);
	if (errcode != 0)
	{	// エラーが発生する条件は、下記の何れかであり、本プログラムの設定では発生しない。
		// => 発生するのであれば、プログラム誤り。
		// EINVAL  : mutex が適切に初期化されていない。
		// EDEADLK : mutex は既に呼び出しスレッドによりロックされている (「エラー検査を行う」mutexes のみ)
		printf("###!!! [ERROR] pthred_mutex_lock !!!###\n");
	}
}


/**
 * 指定された mutex をアンロックします。
 *
 * @param mutex mutexオブジェクト
 */
static
void blockingqueue_mutex_unlock(pthread_mutex_t* mutex)
{
	int errcode = pthread_mutex_unlock(mutex);
	if (errcode != 0)
	{	// エラーが発生する条件は、下記の何れかであり、本プログラムの設定では発生しない。
		// => 発生するのであれば、プログラム誤り。
		// EINVAL  : mutex が適切に初期化されていない。
		// EDEADLK : mutex は既に呼び出しスレッドによりロックされている (「エラー検査を行う」mutexes のみ)
		printf("###!!! [ERROR] pthred_mutex_lock !!!###\n");
	}
}


/**
 * イベントキューにデータを追加します。
 *
 * @param blockingqueue キューのインスタンス
 * @param data  キューに追加するエントリ(データ)
 * @param size  キューに追加するエントリ(データ)のサイズ
 * @return true/false (追加成功/追加失敗)
 */
bool blockingqueue_push(struct blockingqueue* blockingqueue, void* data, size_t size)
{
	blockingqueue_mutex_lock(&blockingqueue->data_lock);

	// キューにデータがない間ブロックする。
	size_t data_count = queue_size(blockingqueue->queue);
	if (data_count == 0)
	{
		pthread_cond_signal(&blockingqueue->pop_block);
	}

	bool result = queue_push(blockingqueue->queue, data, size);

	blockingqueue_mutex_unlock(&blockingqueue->data_lock);

	return result;
}


/**
 * キューの先頭よりデータを取り出します。
 * キューにデータがない場合、ブロックされます。
 *
 * @param blockingqueue  キューのインスタンス
 * @param buf    取り出したデータ格納用バッファ
 * @param buflen バッファのサイズ
 * @return 取り出したデータのサイズ
 */
size_t blockingqueue_pop( struct blockingqueue* blockingqueue, void* buf, size_t buflen)
{
	blockingqueue_mutex_lock(&blockingqueue->data_lock);

	// キューにデータがない間ブロックする。
	size_t data_count = queue_size(blockingqueue->queue);
	while (data_count == 0)
	{
		pthread_cond_wait(&blockingqueue->pop_block, &blockingqueue->data_lock);
		data_count = queue_size(blockingqueue->queue);
	}

	size_t data_size = queue_pop(blockingqueue->queue, buf, buflen);
	blockingqueue_mutex_unlock(&blockingqueue->data_lock);
	return data_size;
}


/**
 * キューの先頭よりデータを取り出します。
 * データの取り出しに失敗した場合、0を返します。
 * blockingqueue_pop と異なり、キューからデータは削除されません。
 *
 * @param blockingqueue  キューのインスタンス
 * @param buf    取り出したデータ格納用バッファ
 * @param buflen バッファのサイズ
 * @return 取り出したデータのサイズ
 */
size_t blockingqueue_peek( struct blockingqueue* blockingqueue, void* buf, size_t buflen)
{
	blockingqueue_mutex_lock(&blockingqueue->data_lock);
	size_t size = queue_peek(blockingqueue->queue, buf, buflen);
	blockingqueue_mutex_unlock(&blockingqueue->data_lock);
	return size;
}



/**
 * キューのサイズ(キューに入っているデータの個数)を取得します。
 *
 * @param blockingqueue キューのインスタンス
 * @return キューのサイズ
 */
size_t blockingqueue_size(struct blockingqueue* blockingqueue)
{
	size_t size;
	blockingqueue_mutex_lock(&blockingqueue->data_lock);
	size = queue_size(blockingqueue->queue);
	blockingqueue_mutex_unlock(&blockingqueue->data_lock);
	return size;
}


#endif