基于无锁队列和c++11的高性能线程池


基于无锁队列和c++11的高性能线程池
线程基于 C++11 标准库(std::thread)实现;
调用方与线程池之间的消息通信使用一个简单的无锁消息队列;
适用于 Linux 平台,需要 GCC 4.6 及以上版本。
 
标签: <无>
 

代码片段(6)[全屏查看所有代码]

1. [代码]lckfree.h     

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// lckfree.h
// Implementation of lock free queue using CAS operations
// for simple multi-threading use cases like:
// 1. multiple worker to process incoming messages
// 2. async processing using a thread pool
// 3. simple tcp server deal with async requests
// Author: typhoon_1986@163.com
// Reference: http://coolshell.cn/articles/8239.html
 
#ifndef __LCKFREE_H__
#define __LCKFREE_H__
 
#include <string>
using namespace std;
 
namespace bfd {
 
// One node of the singly linked list backing LckFreeQueue.
// `data` carries the message payload; `next` points at the following node
// (the code that creates nodes sets it to NULL for the last node).
struct LinkNode {
  std::string data;
  LinkNode* next;
};
 
class LckFreeQueue {
  public :
   LckFreeQueue();
   ~LckFreeQueue();
 
   int push( const string &msg);
   string pop();  // non-block pop method
//  string bpop(); // block pop method
   bool empty();
  private :
   LinkNode * head_;
   LinkNode * tail_;
   bool empty_;
   unsigned int length_;
};
 
} // namespace bfd
#endif

2. [代码]lckfree.cpp     

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
#include <lckfree.h>
 
namespace bfd {
 
// Builds an empty queue: a single placeholder node is allocated and both
// head_ and tail_ refer to it. The placeholder carries no message.
LckFreeQueue::LckFreeQueue()
    : head_(new LinkNode), tail_(NULL), empty_(true), length_(0) {
  head_->next = NULL;
  tail_ = head_;
}
 
// Releases every node still linked from head_, including the placeholder
// node and any messages that were pushed but never popped.
// Must not run while other threads are still using the queue.
LckFreeQueue::~LckFreeQueue() {
  LinkNode *node = head_;
  // Walk the whole chain; a single `if` here would free only the first node
  // and leak everything behind it.
  while (node != NULL) {
    LinkNode *following = node->next;  // read the link before freeing the node
    delete node;
    node = following;
  }
  head_ = NULL;
  tail_ = NULL;
}
 
// Appends a copy of msg at the end of the list. Always returns 0.
// A new node is allocated for every call; linking it is retried until the
// compare-and-swap on the last node's `next` pointer succeeds.
int LckFreeQueue::push( const string &msg) {
   LinkNode * q = new LinkNode;
   q->data = msg;
   q->next = NULL;
 
   LinkNode * p = tail_;
   LinkNode * oldp = p;
   do {
     // The node read from tail_ may already have successors, so walk forward
     // until a node whose `next` is NULL is found.
     while (p->next != NULL)
         p = p->next;
   } while ( __sync_bool_compare_and_swap(&(p->next), NULL, q) != true ); // if the node was not linked at the end, try again
 
   // The result of this swap is ignored: tail_ is only moved to q when it
   // still holds the value read at the start of this call.
   __sync_bool_compare_and_swap(&tail_, oldp, q); // set the tail node
   return 0;
}
 
// Non-blocking pop. Returns "" when the head node has no successor, so an
// empty return value cannot be told apart from a stored empty message.
string LckFreeQueue::pop() {
   LinkNode * p;
   do {
     p = head_;
     if (p->next == NULL){
       return "" ;
     }
     // Try to move head_ from p to its successor; retry if head_ changed in
     // the meantime.
   } while ( __sync_bool_compare_and_swap(&head_, p, p->next) != true );
   // The successor of the old head holds the message being returned.
   // NOTE(review): the old head node p is never deleted here, which looks
   // like one leaked node per pop — confirm whether this is deliberate to
   // avoid freeing a node other threads may still be reading.
   return p->next->data;
}
 
bool LckFreeQueue::empty() {
   return empty_;
}
 
}

3. [代码]workthreadpool.h     

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// workthreadpool.h
// A library that dispatches messages to multiple worker threads for processing; the worker threads do not return any data
#ifndef __WORK_THREAD_POOL__
#define __WORK_THREAD_POOL__
 
#include <stdio.h>
#include <thread>
#include <queue>
#include <string>
#include <vector>
#include "lckfree.h"
 
using namespace std;
namespace bfd {
 
class WorkThreadPool {
  public :
   WorkThreadPool( int size);
   virtual ~WorkThreadPool();
 
   // 需要子类继承并实现的函数,每个线程实际执行的内容
   virtual void Init() {};
   virtual void Finish() {};
   virtual void Handle( const string &msg)=0;
 
   // 将消息放入处理队列, 消息只支持string类型
   int SendMessage( const string &msg);
 
   int Start();
   int Stop();
 
  private :
   void Worker();
 
   int size_;
   LckFreeQueue msg_queue_; // 线程池的协作基于这个无锁队列
   vector< thread > thread_pool_;
};
} // namespace
#endif

4. [代码]workthreadpool.cpp     

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
#include "workthreadpool.h"
#include <sstream>
#include <unistd.h>
 
namespace bfd {
// Stores the requested thread count; zero or negative requests are raised
// to 1 because the pool needs at least one thread.
WorkThreadPool::WorkThreadPool(int size) : size_(size > 0 ? size : 1) {
}
 
// Destructor. The body is empty: it neither posts "__exit__" messages nor
// joins the threads stored in thread_pool_.
// NOTE(review): destroying a std::thread that is still joinable terminates
// the program, so callers presumably have to call Stop() after Start() and
// before the pool is destroyed — confirm.
WorkThreadPool::~WorkThreadPool() {
 
}
 
int WorkThreadPool::SendMessage( const string &msg) {
   msg_queue_.push(msg);
   return 0;
}
 
void WorkThreadPool::Worker() {
   unsigned int msg_count = 0;
   while (1) {
     string msg = msg_queue_.pop();
     if (msg.empty()) {
       printf ( "no msg got, sleep for 0.1 sec\n" );
       usleep(100000); // 0.1 sec
       continue ;
     }
 
     if (msg == "__exit__" ) {
       stringstream ss;
       ss << "exit worker: " << std::this_thread::get_id() << ", processed: " << msg_count << ".." ;
       printf ( "%s\n" , ss.str().c_str());
       return ;
     }
     Handle(msg);
     msg_count++;
     if (msg_count % 1000 == 0) {
       printf ( "every 1000 msg count\n" );
     }
   }
}
 
int WorkThreadPool::Start() {
   for ( int i=0; i < size_; i++) {
     thread_pool_.push_back( thread (&WorkThreadPool::Worker, this ) );
   }
   return 0;
}
 
int WorkThreadPool::Stop() {
   for ( int i=0; i < size_; i++) {
     SendMessage( "__exit__" );
   }
   for ( int i=0; i < size_; i++) {
     thread_pool_[i].join();
   }
   return 0;
}
 
}

5. [代码]main.cpp     

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#include "workthreadpool.h"
#include <sstream>
#include <math.h>
 
class MyThreadPool : public bfd::WorkThreadPool {
  public :
   MyThreadPool( int size) : bfd::WorkThreadPool(size) {
   }
   void Handle( const string &msg) {
     stringstream ss;
     ss << "worker (" << std::this_thread::get_id() << ") got msg: " << msg;
     printf ( "%s\n" , ss.str().c_str());
     for ( int i=0; i<=999999; i++) {
       double result = sqrt ( sqrt (i) / 93.234);
     }
   }
};
 
int main() {
   printf ( "start running ....\n" );
   MyThreadPool pool(5);
   pool.Start();
   for ( int i=0; i<100; i++) {
     pool.SendMessage( "msg info ----------" );
   }
   pool.Stop();
 
   return 0;
}

6. [代码]Makefile     

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# Builds libworkthreadpool.so from the library sources and links the demo
# program test_workthreadpool against it.
LIB_SRC_FILES = src/workthreadpool.cpp src/lckfree.cpp
TEST_SRC_FILES = src/main.cpp
INCLUDE_DIR = src
STD_FLAG = -std=c++0x

# Recipe lines must start with a TAB character; lines indented with spaces
# make GNU make stop with "missing separator".

# Default goal: compile the demo object, build the shared library, link both.
all: main.o libs
	g++ $(STD_FLAG) -o test_workthreadpool main.o libworkthreadpool.so -lpthread

# Demo object file.
main.o: $(TEST_SRC_FILES)
	g++ $(STD_FLAG) -c $(TEST_SRC_FILES) -I$(INCLUDE_DIR)

# Shared library holding the queue and the thread pool.
libs: $(LIB_SRC_FILES)
	g++ $(STD_FLAG) -o libworkthreadpool.so -fPIC -O2 -shared -Wl,--no-as-needed -Isrc $(LIB_SRC_FILES) -lpthread

# all, libs and clean name actions, not files that get created.
.PHONY : all libs clean
clean :
	rm -f test_workthreadpool main.o libworkthreadpool.so


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM