-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathSimpleThreadPool.hpp
More file actions
296 lines (243 loc) · 6.06 KB
/
SimpleThreadPool.hpp
File metadata and controls
296 lines (243 loc) · 6.06 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
#ifndef SIMPLE_THREAD_POOL_HPP
#define SIMPLE_THREAD_POOL_HPP
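/**
* A simple thread pool, v0.3
* Built on the Boost thread library.
* Thread functions must be written against the Boost thread library.
* If a thread function takes arguments, bind them with boost::bind.
* Link against libboost_thread and libboost_system when building.
*
* There are two modes: function mode and job mode.
* Function mode creates one thread for every function passed in; the thread
* exits when the function returns.
* Job mode wraps a function in a job; jobs queue up inside the pool and are
* executed in order. When a job completes it is discarded and its thread waits.
*
* Contributions are welcome!
**/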
#include <list>
#include <boost/thread.hpp>
#include <boost/thread/condition.hpp>
#include <boost/bind.hpp>
#include <SimpleJob.hpp>
#include <SimpleThreadList.hpp>
// Namespace helper macros, defined once behind their own guard
// (NAMESPACE_SWITCH_TOOL) so that a header which already defined them wins.
#ifndef NAMESPACE_SWITCH_TOOL
#define NAMESPACE_SWITCH_TOOL
// Opens the nested namespaces Switch::Tool.
#define OPEN_NAMESPACE_SWITCHTOOL namespace Switch { \
namespace Tool {
// Closes the two namespaces opened by OPEN_NAMESPACE_SWITCHTOOL.
// NOTE(review): the ';' after each '}' is an empty declaration; pedantic
// C++03 builds may warn about it.
#define CLOSE_NAMESPACE_SWITCHTOOL }; \
};
// Pulls every name from Switch::Tool into the current scope.
#define USING_NAMESPACE_SWITCHTOOL using namespace Switch::Tool;
#endif
OPEN_NAMESPACE_SWITCHTOOL
// Tag types that select a SimpleThreadPool specialization. They are used only
// as template arguments; the underlying float/double types carry no meaning
// beyond being distinct from each other.
typedef float SIMPLE_FUNCTION_MODE;
typedef double SIMPLE_JOB_MODE;
// Primary template: declared here without a definition. This header only
// provides the explicit specializations for the two mode tags above.
template < typename T >
class SimpleThreadPool;
template <>
class SimpleThreadPool < SIMPLE_FUNCTION_MODE >
{
public:
SimpleThreadPool(void)
{
m_nMaxPoolSize = 0;
m_pJoinThread = new boost::thread(boost::bind(&SimpleThreadPool::ThreadJoin, this));
}
SimpleThreadPool(unsigned int size)
{
m_nMaxPoolSize = 0;
m_pJoinThread = new boost::thread(boost::bind(&SimpleThreadPool::ThreadJoin, this));
}
~SimpleThreadPool(void)
{
m_pJoinThread->interrupt();
m_pJoinThread->join();
delete m_pJoinThread;
StopThreadPool();
}
public:
template < typename T >
int AddThread(T func)
{
m_TMutex.lock();
if (m_lpThread.size() >= m_nMaxPoolSize) {
m_TMutex.unlock();
return -1;
}
m_lpThread.push_back(new boost::thread(func));
m_TMutex.unlock();
return 0;
}
/// 停止线程池
int StopThreadPool(void)
{
std::list< boost::thread* >::iterator iter, end;
end = m_lpThread.end();
for (iter = m_lpThread.begin(); iter != end;)
{
boost::thread* p = *iter;
p->interrupt();
p->join();
delete p;
}
return 0;
}
/// 设置线程池大小
int SetMaxPoolSize(unsigned int size)
{
m_TMutex.lock();
if (size > m_nMaxPoolSize || size >= m_lpThread.size())
{
m_nMaxPoolSize = size;
m_TMutex.unlock();
return 0;
}
m_TMutex.unlock();
return -1;
}
/// 获取线程池大小
int GetMaxPoolSize(void)
{
m_TMutex.lock();
int size = m_nMaxPoolSize;
m_TMutex.unlock();
return size;
}
/// 获取线程池当前大小
int GetPoolSize(void)
{
m_TMutex.lock();
int size = m_lpThread.size();
m_TMutex.unlock();
return size;
}
private:
void ThreadJoin(void)
{
std::list< boost::thread* >::iterator iter, end;
while(1)
{
m_TMutex.lock();
end = m_lpThread.end();
for (iter = m_lpThread.begin(); iter != end;)
{
if ((*iter)->timed_join(boost::posix_time::microseconds(1)))
{
boost::thread* p = *iter;
iter = m_lpThread.erase(iter);
delete p;
}
else
{
++iter;
}
}
m_TMutex.unlock();
boost::this_thread::sleep(boost::posix_time::milliseconds(1));
}
}
private:
boost::mutex m_TMutex;
std::list< boost::thread* > m_lpThread;
boost::thread* m_pJoinThread;
unsigned int m_nMaxPoolSize;
};
template <>
class SimpleThreadPool < SIMPLE_JOB_MODE >
{
public:
SimpleThreadPool(void)
{
m_oTrdList.set_thread_func(boost::bind(&SimpleThreadPool::dispatch_thread, this));
m_bStopFlag = false;
}
SimpleThreadPool(unsigned int max_thread_size, unsigned int max_job_size)
{
m_oTrdList.set_thread_func(boost::bind(&SimpleThreadPool::dispatch_thread, this));
m_oTrdList.set_max_thread_num(max_thread_size);
m_oJobList.set_max_job_num(max_job_size);
m_bStopFlag = false;
}
bool set_max_thread_size(unsigned int max)
{
return m_oTrdList.set_max_thread_num(max);
}
bool set_max_job_size(unsigned int max)
{
return m_oJobList.set_max_job_num(max);
}
int get_max_thread_size(void)
{
return m_oTrdList.get_max_thread_num();
}
int get_max_job_size(void)
{
return m_oJobList.get_max_job_num();
}
int get_current_job_num(void)
{
return m_oJobList.get_current_job_num();
}
bool add_job_nonblock(const SimpleJob< void >& job)
{
m_oJobList.lock_add_job();
bool ret = m_oJobList.push_back(job);
m_oJobList.unlock_add_job();
if (ret)
{
m_oJobList.notify_one_get_job();
}
return ret;
}
bool add_job_block(const SimpleJob< void >& job)
{
m_oJobList.lock_add_job();
m_oJobList.wait_add_job();
if (m_bStopFlag)
{
m_oJobList.unlock_add_job();
return false;
}
bool ret = m_oJobList.push_back(job);
m_oJobList.unlock_add_job();
if (ret)
{
m_oJobList.notify_one_get_job();
}
return ret;
}
void stop(void)
{
m_bStopFlag = true;
m_oJobList.stop();
m_oTrdList.stop();
}
private:
void dispatch_thread(void)
{
SimpleJob< void > job;
while (1)
{
m_oJobList.lock_get_job();
m_oJobList.wait_get_job();
if (m_bStopFlag)
{
m_oJobList.unlock_get_job();
return;
}
m_oJobList.pop_front(job);
m_oJobList.unlock_get_job();
m_oJobList.notify_all_add_job();
job.CallJob();
}
}
private:
boost::mutex m_oMutex;
SimpleJobList m_oJobList;
SimpleThreadList m_oTrdList;
bool m_bStopFlag;
};
// Short aliases: STFPool = function-mode pool, STJPool = job-mode pool,
// SJob = the job type accepted by STJPool's add_job_* methods.
typedef SimpleThreadPool < SIMPLE_FUNCTION_MODE > STFPool;
typedef SimpleThreadPool < SIMPLE_JOB_MODE > STJPool;
typedef SimpleJob< void > SJob;
CLOSE_NAMESPACE_SWITCHTOOL
#endif // SIMPLE_THREAD_POOL_HPP