基于无锁队列和c++11的高性能线程池
线程使用c++11库
和线程池之间的消息通讯使用一个简单的无锁消息队列
适用于linux平台,gcc 4.6以上
线程使用c++11库
和线程池之间的消息通讯使用一个简单的无锁消息队列
适用于linux平台,gcc 4.6以上
标签: <无>
代码片段(6)[全屏查看所有代码]
1. [代码]lckfree.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
// lckfree.h
// Implementation of lock free queue using CAS operations
// for simple multi-threading use cases like:
// 1. multiple worker to process incoming messages
// 2. async processing using a thread pool
// 3. simple tcp server deal with async requests
// Author: typhoon_1986@163.com
// Refrence: http://coolshell.cn/articles/8239.html
#ifndef __LCKFREE_H__
#define __LCKFREE_H__
#include <string>
using
namespace
std;
namespace
bfd {
struct
LinkNode {
string data;
LinkNode* next;
};
typedef
struct
LinkNode LinkNode;
class
LckFreeQueue {
public
:
LckFreeQueue();
~LckFreeQueue();
int
push(
const
string &msg);
string pop();
// non-block pop method
// string bpop(); // block pop method
bool
empty();
private
:
LinkNode * head_;
LinkNode * tail_;
bool
empty_;
unsigned
int
length_;
};
}
// namespace bfd
#endif
|
2. [代码]lckfree.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
#include <lckfree.h>
namespace
bfd {
LckFreeQueue::LckFreeQueue(): head_(NULL), tail_(NULL), empty_(
true
), length_(0) {
head_ =
new
LinkNode;
head_->next = NULL;
tail_ = head_;
}
LckFreeQueue::~LckFreeQueue() {
LinkNode *p = head_;
if
(p) {
LinkNode *q = p->next;
delete
p;
p = q;
}
}
int
LckFreeQueue::push(
const
string &msg) {
LinkNode * q =
new
LinkNode;
q->data = msg;
q->next = NULL;
LinkNode * p = tail_;
LinkNode * oldp = p;
do
{
while
(p->next != NULL)
p = p->next;
}
while
( __sync_bool_compare_and_swap(&(p->next), NULL, q) !=
true
);
//如果没有把结点链在尾上,再试
__sync_bool_compare_and_swap(&tail_, oldp, q);
//置尾结点
return
0;
}
string LckFreeQueue::pop() {
LinkNode * p;
do
{
p = head_;
if
(p->next == NULL){
return
""
;
}
}
while
( __sync_bool_compare_and_swap(&head_, p, p->next) !=
true
);
return
p->next->data;
}
bool
LckFreeQueue::empty() {
return
empty_;
}
}
|
3. [代码]workthreadpool.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
// workthreadpool.h
// 一个用于将消息分发给多个进程,并使用多个进程处理的库,工作进程并不返回数据
#ifndef __WORK_THREAD_POOL__
#define __WORK_THREAD_POOL__
#include <stdio.h>
#include <thread>
#include <queue>
#include <string>
#include <vector>
#include "lckfree.h"
using
namespace
std;
namespace
bfd {
class
WorkThreadPool {
public
:
WorkThreadPool(
int
size);
virtual
~WorkThreadPool();
// 需要子类继承并实现的函数,每个线程实际执行的内容
virtual
void
Init() {};
virtual
void
Finish() {};
virtual
void
Handle(
const
string &msg)=0;
// 将消息放入处理队列, 消息只支持string类型
int
SendMessage(
const
string &msg);
int
Start();
int
Stop();
private
:
void
Worker();
int
size_;
LckFreeQueue msg_queue_;
// 线程池的协作基于这个无锁队列
vector<
thread
> thread_pool_;
};
}
// namespace
#endif
|
4. [代码]workthreadpool.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
|
#include "workthreadpool.h"
#include <sstream>
#include <unistd.h>
namespace
bfd {
WorkThreadPool::WorkThreadPool(
int
size) {
if
(size <= 0) {
// 最小也需要有1个线程
size_ = 1;
}
else
{
size_ = size;
}
}
WorkThreadPool::~WorkThreadPool() {
}
int
WorkThreadPool::SendMessage(
const
string &msg) {
msg_queue_.push(msg);
return
0;
}
void
WorkThreadPool::Worker() {
unsigned
int
msg_count = 0;
while
(1) {
string msg = msg_queue_.pop();
if
(msg.empty()) {
printf
(
"no msg got, sleep for 0.1 sec\n"
);
usleep(100000);
// 0.1 sec
continue
;
}
if
(msg ==
"__exit__"
) {
stringstream ss;
ss <<
"exit worker: "
<< std::this_thread::get_id() <<
", processed: "
<< msg_count <<
".."
;
printf
(
"%s\n"
, ss.str().c_str());
return
;
}
Handle(msg);
msg_count++;
if
(msg_count % 1000 == 0) {
printf
(
"every 1000 msg count\n"
);
}
}
}
int
WorkThreadPool::Start() {
for
(
int
i=0; i < size_; i++) {
thread_pool_.push_back(
thread
(&WorkThreadPool::Worker,
this
) );
}
return
0;
}
int
WorkThreadPool::Stop() {
for
(
int
i=0; i < size_; i++) {
SendMessage(
"__exit__"
);
}
for
(
int
i=0; i < size_; i++) {
thread_pool_[i].join();
}
return
0;
}
}
|
5. [代码]main.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
#include "workthreadpool.h"
#include <sstream>
#include <math.h>
class
MyThreadPool :
public
bfd::WorkThreadPool {
public
:
MyThreadPool(
int
size) : bfd::WorkThreadPool(size) {
}
void
Handle(
const
string &msg) {
stringstream ss;
ss <<
"worker ("
<< std::this_thread::get_id() <<
") got msg: "
<< msg;
printf
(
"%s\n"
, ss.str().c_str());
for
(
int
i=0; i<=999999; i++) {
double
result =
sqrt
(
sqrt
(i) / 93.234);
}
}
};
int
main() {
printf
(
"start running ....\n"
);
MyThreadPool pool(5);
pool.Start();
for
(
int
i=0; i<100; i++) {
pool.SendMessage(
"msg info ----------"
);
}
pool.Stop();
return
0;
}
|
6. [代码]Makefile
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
LIB_SRC_FILES = src/workthreadpool.cpp src/lckfree.cpp
TEST_SRC_FILES = src/main.cpp
INCLUDE_DIR = src
STD_FLAG = -std=c++0x
all: main.o libs
g++ $(STD_FLAG) -o test_workthreadpool main.o libworkthreadpool.so -lpthread
main.o: $(TEST_SRC_FILES)
g++ $(STD_FLAG) -c $(TEST_SRC_FILES) -I$(INCLUDE_DIR)
libs: $(LIB_SRC_FILES)
g++ $(STD_FLAG) -o libworkthreadpool.so -fPIC -O2 -shared -Wl,--no-as-needed -Isrc $(LIB_SRC_FILES) -lpthread
.PHONY : clean
clean :
rm -f test_workthreadpool main.o libworkthreadpool.so
|