#ifndef MYTHREAD_H_XZL_201808081711 #define MYTHREAD_H_XZL_201808081711 #include #include #include #include #include #include #include #include struct threadInfo { bool isRunning; std::shared_ptr thd; }; enum { PROCESSOR_FAIL = -1, PROCESSOR_SUCCESS = 0, PROCESSOR_TIME_OUT = 1, PROCESSOR_QUEUE_FULL = 2, PROCESSOR_QUEUE_EMPTY = 3 }; template class mythread { using TASK_FUNCTION = std::function &)>; public: mythread(TASK_FUNCTION task_f = nullptr) : m_task_function(task_f) {} virtual ~mythread() {} int beginThreads(const int count) { for (int i = 0; i < count; i++) { threadInfo thInfo; thInfo.isRunning = true; m_thds.push_back(thInfo); } for (auto &ele:m_thds) { ele.thd = std::make_shared([&]() -> void { std::shared_ptr sp_task(nullptr); int status = 0; while (ele.isRunning) { status = get_task(sp_task, 200); if (status == PROCESSOR_SUCCESS) { doFunc(sp_task); if (m_task_function != nullptr) { m_task_function(sp_task); } } } }); } } int endThreads() { for (auto &ele:m_thds) { ele.isRunning = false; ele.thd->join(); } } int put_task(std::shared_ptr &sp_task, int *p_new_size = nullptr) { std::lock_guard auto_lock(m_task_lock); int cur_size = static_cast(m_taskList.size()); if (cur_size >= m_task_max_count) { if (p_new_size != nullptr) { *p_new_size = cur_size; } return PROCESSOR_QUEUE_FULL; } m_taskList.emplace_back(sp_task); m_task_semphore.signal(); if (p_new_size != nullptr) { *p_new_size = cur_size + 1; } return PROCESSOR_SUCCESS; } int get_task(std::shared_ptr &sp_task, const int ms = (-1), int *p_new_size = nullptr) { if (m_task_semphore.wait(ms) != SEMPHORE_SUCCESS) { return PROCESSOR_TIME_OUT; } std::lock_guard auto_lock(m_task_lock); /* * 元素入队时信号与元素是1对1关系,由于可以一次取全部队列元素, * 所以可能存在等待信号成功但队列中已无元素的情况,故在此判断 */ if (m_taskList.empty()) { if (p_new_size != nullptr) { *p_new_size = 0; } return PROCESSOR_QUEUE_EMPTY; } sp_task = std::move(m_taskList.front()); m_taskList.pop_front(); if (p_new_size != nullptr) { *p_new_size = static_cast(m_taskList.size()); } return PROCESSOR_SUCCESS; } int get_taskListSize() { std::lock_guard auto_lock(m_task_lock); return m_taskList.size(); } virtual void doFunc(std::shared_ptr spPacket) {} private: std::vector m_thds; std::mutex m_task_lock; Semphore m_task_semphore; volatile int m_task_max_count = 1024; std::list> m_taskList; TASK_FUNCTION m_task_function = nullptr; }; #endif