派生自 development/c++

xuxiuxi
2019-03-04 17675f1c6447b6e014b520608ce6d5f1f2e9707a
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
#ifndef MYTHREAD_H_XZL_201808081711
#define MYTHREAD_H_XZL_201808081711
 
#include <thread>
#include <vector>
#include <list>
#include <memory>
#include <thread>
#include <chrono>
#include <functional>
#include "mySem.hpp"
 
struct threadInfo {
    bool isRunning;
    std::shared_ptr<std::thread> thd;
};
enum {
    PROCESSOR_FAIL = -1,
    PROCESSOR_SUCCESS = 0,
    PROCESSOR_TIME_OUT = 1,
    PROCESSOR_QUEUE_FULL = 2,
    PROCESSOR_QUEUE_EMPTY = 3
};
 
template<typename PACKET>
class mythread {
    using TASK_FUNCTION = std::function<void(std::shared_ptr<PACKET> &)>;
public:
    mythread(TASK_FUNCTION task_f = nullptr) : m_task_function(task_f) {}
 
    virtual ~mythread() {}
 
    int beginThreads(const int count) {
        for (int i = 0; i < count; i++) {
            threadInfo thInfo;
            thInfo.isRunning = true;
            m_thds.push_back(thInfo);
        }
        for (auto &ele:m_thds) {
            ele.thd = std::make_shared<std::thread>([&]() -> void {
                std::shared_ptr<PACKET> sp_task(nullptr);
                int status = 0;
                while (ele.isRunning) {
                    status = get_task(sp_task, 200);
                    if (status == PROCESSOR_SUCCESS) {
                        doFunc(sp_task);
                        if (m_task_function != nullptr) {
                            m_task_function(sp_task);
                        }
                    }
                }
            });
        }
    }
 
    int endThreads() {
        for (auto &ele:m_thds) {
            ele.isRunning = false;
            ele.thd->join();
        }
    }
 
    int put_task(std::shared_ptr<PACKET> &sp_task, int *p_new_size = nullptr) {
        std::lock_guard<std::mutex> auto_lock(m_task_lock);
 
        int cur_size = static_cast<int>(m_taskList.size());
        if (cur_size >= m_task_max_count) {
            if (p_new_size != nullptr) {
                *p_new_size = cur_size;
            }
            return PROCESSOR_QUEUE_FULL;
        }
 
        m_taskList.emplace_back(sp_task);
        m_task_semphore.signal();
 
        if (p_new_size != nullptr) {
            *p_new_size = cur_size + 1;
        }
 
        return PROCESSOR_SUCCESS;
    }
 
    int get_task(std::shared_ptr<PACKET> &sp_task, const int ms = (-1), int *p_new_size = nullptr) {
        if (m_task_semphore.wait(ms) != SEMPHORE_SUCCESS) {
            return PROCESSOR_TIME_OUT;
        }
 
        std::lock_guard<std::mutex> auto_lock(m_task_lock);
 
        /*
        * 元素入队时信号与元素是1对1关系,由于可以一次取全部队列元素,
        * 所以可能存在等待信号成功但队列中已无元素的情况,故在此判断
        */
        if (m_taskList.empty()) {
            if (p_new_size != nullptr) {
                *p_new_size = 0;
            }
            return PROCESSOR_QUEUE_EMPTY;
        }
 
        sp_task = std::move(m_taskList.front());
        m_taskList.pop_front();
 
        if (p_new_size != nullptr) {
            *p_new_size = static_cast<int>(m_taskList.size());
        }
 
        return PROCESSOR_SUCCESS;
    }
 
    int get_taskListSize() {
        std::lock_guard<std::mutex> auto_lock(m_task_lock);
        return m_taskList.size();
    }
 
    virtual void doFunc(std::shared_ptr<PACKET> spPacket) {}
 
private:
    std::vector<threadInfo> m_thds;
    std::mutex m_task_lock;
    Semphore m_task_semphore;
    volatile int m_task_max_count = 1024;
    std::list<std::shared_ptr<PACKET>> m_taskList;
    TASK_FUNCTION m_task_function = nullptr;
};
 
#endif