#include "PL_Queue.h" #include #include #include struct QBlock { uint8_t* data; size_t maxSize; size_t size; QBlock() : data(nullptr), maxSize(0), size(0) { } QBlock(uint8_t* _data, size_t _maxSize) : data(_data), maxSize(_maxSize), size(0) { } ~QBlock() { data = nullptr; maxSize = 0; size = 0; } }; struct QBlockPool { uint8_t* blocksDataPool; typedef std::set qpool_t; qpool_t staticPool; qpool_t freePool; QBlockPool() : blocksDataPool(nullptr) { } QBlockPool(size_t maxBlockCount, size_t maxBlockSize) : blocksDataPool(nullptr) { bp_init(maxBlockCount, maxBlockSize); } void bp_init(size_t maxBlockCount, size_t maxBlockSize) { const size_t totalBytes = maxBlockCount * maxBlockSize; blocksDataPool = new uint8_t[totalBytes]; printf("QBlockPool allocate byte: %u\n", totalBytes); for (size_t i = 0; i < maxBlockCount; i++) { uint8_t* qbData = new (blocksDataPool + i * maxBlockSize) uint8_t[maxBlockSize]; QBlock* qb = new QBlock(qbData, maxBlockSize); staticPool.insert(qb); freePool.insert(qb); } } ~QBlockPool() { if (freePool.size() != staticPool.size()) { printf("QBlockPool memory leakage\n"); } for(qpool_t::iterator iter = staticPool.begin(); iter != staticPool.end(); ++iter) { delete *iter; } delete[] blocksDataPool; blocksDataPool = nullptr; } QBlock* bp_alloc() { if (freePool.empty()) { printf("QBlockPool bp_alloc empty\n"); return nullptr; } qpool_t::iterator iter = freePool.begin(); QBlock* qb = *iter; freePool.erase(iter); return qb; } void bp_free(QBlock* qb) { if (qb == nullptr) return; qpool_t::iterator iter = staticPool.find(qb); if (iter == staticPool.end()) { printf("QBlockPool bp_free not in pool\n"); return; } //iter = freePool.find(qb); //if (iter != freePool.end()) //{ // printf("QBlockPool bp_free block is free"); // return; //} qb->size = 0; freePool.insert(qb); } bool bp_empty() const { return freePool.empty(); } }; struct PL_Queue_Internal { PL_Queue_Config config; QBlockPool pool; typedef std::queue queue_t; queue_t que; pthread_mutex_t* queue_pool_mutex; pthread_mutex_t* sync_full_mutex; pthread_mutex_t* sync_empty_mutex; PL_Queue_Internal() : config(), pool(), que(), queue_pool_mutex(new pthread_mutex_t), sync_full_mutex(new pthread_mutex_t), sync_empty_mutex(new pthread_mutex_t) { pthread_mutex_init(queue_pool_mutex, NULL); pthread_mutex_init(sync_full_mutex, NULL); pthread_mutex_init(sync_empty_mutex, NULL); } ~PL_Queue_Internal() { if (queue_pool_mutex != nullptr) { pthread_mutex_destroy(queue_pool_mutex); delete queue_pool_mutex; queue_pool_mutex = nullptr; } if (sync_full_mutex != nullptr) { pthread_mutex_destroy(sync_full_mutex); delete sync_full_mutex; sync_full_mutex = nullptr; } if (sync_empty_mutex != nullptr) { pthread_mutex_destroy(sync_empty_mutex); delete sync_empty_mutex; sync_empty_mutex = nullptr; } } void reset() { if (queue_pool_mutex != nullptr) { pthread_mutex_destroy(queue_pool_mutex); delete queue_pool_mutex; queue_pool_mutex = nullptr; } queue_pool_mutex = new pthread_mutex_t; pthread_mutex_init(queue_pool_mutex, NULL); if (sync_full_mutex != nullptr) { pthread_mutex_destroy(sync_full_mutex); delete sync_full_mutex; sync_full_mutex = nullptr; } sync_full_mutex = new pthread_mutex_t; pthread_mutex_init(sync_full_mutex, NULL); if (sync_empty_mutex != nullptr) { pthread_mutex_destroy(sync_empty_mutex); delete sync_empty_mutex; sync_empty_mutex = nullptr; } sync_empty_mutex = new pthread_mutex_t; pthread_mutex_init(sync_empty_mutex, NULL); //#todo free que } }; PipeLineElem* create_PL_Queue() { return new PL_Queue; } PL_Queue::PL_Queue() : internal(new PL_Queue_Internal) { } PL_Queue::~PL_Queue() { delete (PL_Queue_Internal*)internal; internal= nullptr; } bool PL_Queue::init(void* args) { PL_Queue_Internal* in = (PL_Queue_Internal*)internal; PL_Queue_Config* config = (PL_Queue_Config*)args; in->reset(); in->config = *config; in->pool.bp_init(config->maxBlockCount, config->maxBlockSize); if (config->syncQueueFull) { int ret = pthread_mutex_lock(in->sync_full_mutex); if(ret != 0) { printf("pthread_mutex_lock sync_full_mutex: %d\n", ret); return false; } } if (config->syncQueueEmpty) { int ret = pthread_mutex_lock(in->sync_empty_mutex); if(ret != 0) { printf("pthread_mutex_lock sync_empty_mutex: %d\n", ret); return false; } } return true; } void PL_Queue::finit() { PL_Queue_Internal* in = (PL_Queue_Internal*)internal;//#todo delete } bool PL_Queue::pay(const PipeMaterial& pm) { PL_Queue_Internal* in = (PL_Queue_Internal*)internal; QBlock* qb = nullptr; if (in->que.size() >= in->config.maxBlockCount || in->pool.bp_empty()) { // there is no available qb if (in->config.queueFullDropBlock) { printf("PL_Queue::pay queueFullDropBlock\n"); pthread_mutex_lock(in->queue_pool_mutex); qb = in->que.front(); in->que.pop(); in->pool.bp_free(qb); qb = nullptr; pthread_mutex_unlock(in->queue_pool_mutex); } else { if (in->config.syncQueueFull) { printf("PL_Queue::pay sync by sync_full_mutex\n"); pthread_mutex_lock(in->sync_full_mutex); } } } if (in->que.size() >= in->config.maxBlockCount || in->pool.bp_empty()) { printf("PL_Queue::pay full\n"); return false; } pthread_mutex_lock(in->queue_pool_mutex); qb = in->pool.bp_alloc(); if (qb != nullptr) in->que.push(qb); pthread_mutex_unlock(in->queue_pool_mutex); if (qb == nullptr) { printf("PL_Queue::pay bp_alloc nullptr\n"); return false; } if (in->config.copyData) { memcpy(qb->data, pm.buffer, pm.buffSize); qb->size = pm.buffSize; } if (in->config.syncQueueEmpty) pthread_mutex_unlock(in->sync_empty_mutex); return true; } void PL_Queue::pm_deleter_qb(PipeMaterial* pm) { if (pm->former == nullptr || pm->args == nullptr) return; PL_Queue* _this = (PL_Queue*)pm->former; QBlock* qb = (QBlock*)pm->args; PL_Queue_Internal* in = (PL_Queue_Internal*)_this->internal; pthread_mutex_lock(in->queue_pool_mutex); in->pool.bp_free(qb); pthread_mutex_unlock(in->queue_pool_mutex); } bool PL_Queue::gain(PipeMaterial& pm) { PL_Queue_Internal* in = (PL_Queue_Internal*)internal; if (in->que.empty()) { if (in->config.syncQueueEmpty) pthread_mutex_lock(in->sync_empty_mutex); } if (in->que.empty()) { printf("PL_Queue::gain empty\n"); pm.buffer = nullptr; pm.buffSize = 0; pm.former = this; return false; } QBlock* qb = nullptr; pthread_mutex_lock(in->queue_pool_mutex); qb = in->que.front(); in->que.pop(); pthread_mutex_unlock(in->queue_pool_mutex); if (in->config.syncQueueFull) pthread_mutex_unlock(in->sync_full_mutex); pm.type = PipeMaterial::PMT_BYTES; pm.buffer = qb->data; pm.buffSize = qb->size; pm.former = this; pm.deleter = pm_deleter_qb; pm.args = qb; return true; }