#ifndef MYTHREAD_H_XZL_201808081711
|
#define MYTHREAD_H_XZL_201808081711
|
|
#include <thread>
|
#include <vector>
|
#include <list>
|
#include <memory>
|
#include <thread>
|
#include <chrono>
|
#include <functional>
|
#include "mySem.hpp"
|
|
struct threadInfo {
|
bool isRunning;
|
std::shared_ptr<std::thread> thd;
|
};
|
enum {
|
PROCESSOR_FAIL = -1,
|
PROCESSOR_SUCCESS = 0,
|
PROCESSOR_TIME_OUT = 1,
|
PROCESSOR_QUEUE_FULL = 2,
|
PROCESSOR_QUEUE_EMPTY = 3
|
};
|
|
template<typename PACKET>
|
class mythread {
|
using TASK_FUNCTION = std::function<void(std::shared_ptr<PACKET> &)>;
|
public:
|
mythread(TASK_FUNCTION task_f = nullptr) : m_task_function(task_f) {}
|
|
virtual ~mythread() {}
|
|
int beginThreads(const int count) {
|
for (int i = 0; i < count; i++) {
|
threadInfo thInfo;
|
thInfo.isRunning = true;
|
m_thds.push_back(thInfo);
|
}
|
for (auto &ele:m_thds) {
|
ele.thd = std::make_shared<std::thread>([&]() -> void {
|
std::shared_ptr<PACKET> sp_task(nullptr);
|
int status = 0;
|
while (ele.isRunning) {
|
status = get_task(sp_task, 200);
|
if (status == PROCESSOR_SUCCESS) {
|
doFunc(sp_task);
|
if (m_task_function != nullptr) {
|
m_task_function(sp_task);
|
}
|
}
|
}
|
});
|
}
|
}
|
|
int endThreads() {
|
for (auto &ele:m_thds) {
|
ele.isRunning = false;
|
ele.thd->join();
|
}
|
}
|
|
int put_task(std::shared_ptr<PACKET> &sp_task, int *p_new_size = nullptr) {
|
std::lock_guard<std::mutex> auto_lock(m_task_lock);
|
|
int cur_size = static_cast<int>(m_taskList.size());
|
if (cur_size >= m_task_max_count) {
|
if (p_new_size != nullptr) {
|
*p_new_size = cur_size;
|
}
|
return PROCESSOR_QUEUE_FULL;
|
}
|
|
m_taskList.emplace_back(sp_task);
|
m_task_semphore.signal();
|
|
if (p_new_size != nullptr) {
|
*p_new_size = cur_size + 1;
|
}
|
|
return PROCESSOR_SUCCESS;
|
}
|
|
int get_task(std::shared_ptr<PACKET> &sp_task, const int ms = (-1), int *p_new_size = nullptr) {
|
if (m_task_semphore.wait(ms) != SEMPHORE_SUCCESS) {
|
return PROCESSOR_TIME_OUT;
|
}
|
|
std::lock_guard<std::mutex> auto_lock(m_task_lock);
|
|
/*
|
* 元素入队时信号与元素是1对1关系,由于可以一次取全部队列元素,
|
* 所以可能存在等待信号成功但队列中已无元素的情况,故在此判断
|
*/
|
if (m_taskList.empty()) {
|
if (p_new_size != nullptr) {
|
*p_new_size = 0;
|
}
|
return PROCESSOR_QUEUE_EMPTY;
|
}
|
|
sp_task = std::move(m_taskList.front());
|
m_taskList.pop_front();
|
|
if (p_new_size != nullptr) {
|
*p_new_size = static_cast<int>(m_taskList.size());
|
}
|
|
return PROCESSOR_SUCCESS;
|
}
|
|
int get_taskListSize() {
|
std::lock_guard<std::mutex> auto_lock(m_task_lock);
|
return m_taskList.size();
|
}
|
|
virtual void doFunc(std::shared_ptr<PACKET> spPacket) {}
|
|
private:
|
std::vector<threadInfo> m_thds;
|
std::mutex m_task_lock;
|
Semphore m_task_semphore;
|
volatile int m_task_max_count = 1024;
|
std::list<std::shared_ptr<PACKET>> m_taskList;
|
TASK_FUNCTION m_task_function = nullptr;
|
};
|
|
#endif
|