As processors move toward multiple cores, multithreading is being used in more and more software. But if multithreading is used incorrectly, it can actually make software slower.
One factor that hurts the performance of a multithreaded program is synchronization. On Windows, the fastest synchronization primitive is the critical section, which is essentially a user-mode mechanism (especially with a spin count set: only after spinning spincount times without acquiring the lock does it enter kernel mode and block the current thread). Even so, if the code inside the critical section takes a relatively long time, processor time is still wasted on spinning. If a thread could instead switch to other work whenever it fails to acquire a lock (in user mode, of course; switching kernel threads is expensive, both because of the kernel transition itself and because of the cache misses caused by the context switch), the CPU time wasted on spinning could be spent doing useful work.
Below is a multithreading scheme based on user-level threads. First, create as many OS threads as there are CPUs, and run a user-level thread scheduler on each of them.
All business logic is handed to user-level threads. Whenever a user-level thread fails to acquire a lock, it blocks itself and returns to the scheduler, which picks another user-level thread to run. When a user-level thread releases a lock, it wakes up one user-level thread blocked on that lock. Note that user-level threads have no time slicing: if one of them runs long-lived code, the other user-level threads on the same scheduler cannot run.
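Before the code, here is a rough sketch of how the pieces are meant to fit together: each worker thread owns one Scheduler, registers it in a global table so the fiber-side API can find it, hands a few runnable objects to pooled fibers, and then loops on Schedule(). This is only an outline against the API defined in the rest of the post (Scheduler, runnable, g_tlssc, TLSSIZE); MyJob and g_stop are illustrative placeholders, and the real setup, including the zThread wrapper, is in the test code at the end.

#include "fiberApi.h"   // Scheduler, runnable, g_tlssc/TLSSIZE (all defined later in this post)

volatile bool g_stop = false;   // illustrative stop flag, set elsewhere

// Illustrative job: business logic runs inside a fiber and may call
// umutex::Lock()/UnLock() or _Yield() to hand control back to the scheduler.
class MyJob : public runnable
{
public:
    void main_routine()
    {
        // ... do work, taking user-level locks as needed ...
    }
};

void worker_thread_main()
{
    Scheduler *sc = new Scheduler;
    sc->Init();                                    // ConvertThreadToFiber + create the fiber pool
    g_tlssc[GetCurrentThreadId() % TLSSIZE] = sc;  // let the fiber-side API find this scheduler

    MyJob job;
    sc->FiberStartRun(&job);                       // attach the job to a pooled fiber

    while(!g_stop)
        sc->Schedule();                            // run ready fibers, retry pending locks, expire timeouts

    sc->Destroy();
    delete sc;
}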
The code follows.
First, the lock-free queue. It is thread-safe and takes no locks (strictly speaking it is a LIFO stack, since both push and pop operate at the head; it is used here simply as a thread-safe container of caller-owned nodes).
lockfree_queue.h
#ifndef _LOCKFREE_QUEUE_H
#define _LOCKFREE_QUEUE_H
#include <Windows.h> //for InterlockedCompareExchangePointer / PVOID
template <typename T>
struct _node
{
T val;
_node<T> *_next;
};
template <typename T>
class LockFreeQueue
{
public:
LockFreeQueue():_head(0){}
//insert a node at the head of the queue
void push(_node<T> *newnode)
{
while(1)
{
_node<T> *lhead = _head;//local snapshot of the head
newnode->_next = lhead;
//exit on success, retry on failure
if(InterlockedCompareExchangePointer((volatile PVOID *)&_head,newnode,lhead) == lhead)
break;
}
}
//pop a node from the head of the queue
_node<T>* pop()
{
while(1)
{
_node<T> *lhead = _head;//local snapshot of the head
if(!lhead)
return NULL;
//use the snapshot for both the new head and the return value;
//re-reading _head here would race with concurrent pops
if(InterlockedCompareExchangePointer((volatile PVOID *)&_head,lhead->_next,lhead) == lhead)
{
lhead->_next = NULL;
return lhead;
}
}
}
private:
_node<T> *_head;
};
#endif
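For illustration, here is how the queue is meant to be used: the caller owns the nodes (each uthread below pre-allocates its own node), so push and pop never allocate. Two caveats are worth noting: push and pop both work at the head, so the container is really a LIFO stack, and like any plain CAS-based linked list it is still exposed to the ABA problem under contention, which may be one source of the lost unlock notifications described near the end of the post. A minimal, hypothetical usage sketch:

#include <Windows.h>
#include "lockfree_queue.h"

void lockfree_queue_example()
{
    LockFreeQueue<int> q;

    // the caller allocates the node; the queue only links it in
    _node<int> *n = new _node<int>;
    n->val = 42;
    q.push(n);                 // lock-free insert at the head

    _node<int> *out = q.pop(); // lock-free removal from the head, NULL when empty
    if(out)
    {
        // use out->val, then hand the node back to whoever owns it
        delete out;
    }
}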
The user-level thread (fiber)
uthread.h
#ifndef _UTHREAD_H
#define _UTHREAD_H
#include <Windows.h>
#include "lockfree_queue.h"
//#include "luaWrapper.h"
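//uChar and uLong are not defined in any header shown in this post; they are assumed to be
//project-wide typedefs for unsigned char and unsigned long (e.g. from stdafx.h)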
enum
{
NONE,
WAIT4EVENT = 1, //waiting for some event to arrive
DEAD, //the fiber has terminated
ACTIVED, //runnable
UNACTIVED, //must not be added to the scheduling queue
YIELD,
SLEEP,
};
enum
{
BS_MOV = 0,
BS_ATK,
BS_OTHER,
BS_WAIT4LOCK,
BS_END,
};
//blocking descriptor: describes what a fiber is waiting for
class BlockStruct
{
public:
BlockStruct(uChar type=BS_OTHER):bs_type(type){}
//returning true means the fiber will be woken out of its block
virtual bool WakeUp() = 0;
uChar bs_type;
};
typedef int uthread_t;
class uthread;
class runnable
{
public:
virtual void main_routine() = 0;
};
class uthread;
struct st_timeout
{
st_timeout(uthread *ut):ut(ut),_timeout(0),index(0){}
bool operator < (st_timeout &r)
{
return _timeout < r._timeout;
}
uLong _timeout;
uthread *ut;
int index;//position inside the timeout heap
private:
st_timeout & operator = (const st_timeout &other);
st_timeout(const st_timeout &other);
};
class Scheduler;
class uthread;
struct ulstruct
{
void *lock_addr;
uthread *ut;
};
//the fiber (user-level thread)
class uthread
{
public:
uthread(Scheduler *sc):m_runnable(0),m_bs(0),uthread_id(-1),m_status(NONE),_st_timeout(this),p_uthreadContext(0),m_next(0),wakeuptick(0),m_scheduler(sc)
{
m_unlockevent = (_node<ulstruct>*)_aligned_malloc(sizeof(*m_unlockevent),4);
m_locknode = (_node<uthread*>*)_aligned_malloc(sizeof(*m_locknode),4);
m_locknode->val = this;
m_unlockevent->val.ut = this;
}
static void WINAPI fiber_routine(LPVOID pvParam);
Scheduler *GetScheduler()
{
return m_scheduler;
}
//an event arrived; try to wake the blocked fiber
void Signal();
void SetStatus(unsigned char st)
{
m_status = st;
}
unsigned char GetStatus()
{
return m_status;
}
PVOID GetUContext()
{
return p_uthreadContext;
}
void SetUContext(PVOID uct)
{
p_uthreadContext = uct;
}
void SetBs(BlockStruct *bs)
{
m_bs = bs;
}
BlockStruct *GetBs()
{
return m_bs;
}
void SetRunnable(runnable *ra)
{
m_runnable = ra;
}
runnable *GetRunnable()
{
return m_runnable;
}
st_timeout &GetTimeoutSt()
{
return _st_timeout;
}
uthread_t GetUid()
{
return uthread_id;
}
void SetUid(uthread_t uid)
{
uthread_id = uid;
}
uthread *Next()
{
return m_next;
}
void SetNext(uthread *ut)
{
m_next = ut;
}
uLong wakeuptick;
_node<ulstruct>* GetUnlockEvent()
{
return m_unlockevent;
}
_node<uthread*>* GetLockNode()
{
return m_locknode;
}
private:
unsigned char m_status;
uthread_t uthread_id;
PVOID p_uthreadContext;
BlockStruct *m_bs;
runnable *m_runnable;
uthread *m_next;
st_timeout _st_timeout;
_node<ulstruct>* m_unlockevent;
_node<uthread*>* m_locknode;
Scheduler *m_scheduler;
};
#endif
uthread.cpp
#include "stdafx.h"
#include "uthread.h"
#include "fiberApi.h"
#include <assert.h>
#include <iostream>
#include "ulock.h"
void WINAPI uthread::fiber_routine(LPVOID pvParam)
{
uthread *_uthread = (uthread*)pvParam;
while(1)
{
assert(_uthread->m_runnable);
std::cout << "Ai Start,threadid :" << _uthread->uthread_id << std::endl;
_uthread->m_runnable->main_routine();
std::cout << "Ai Stop" << std::endl;
_uthread->m_runnable = 0;
//remove it from the runnable queue
//Scheduler::m_uthreads[Scheduler::m_curuid]->m_status = UNACTIVED;
//SetCurrentUthreadState(UNACTIVED);
//Scheduler::ReleaseUthread(Scheduler::m_curuid);
ReleaseCurrentUthread();
//Scheduler::_Yield(UNACTIVED);
_Yield(UNACTIVED);
}
//Scheduler::m_uthreads[Scheduler::m_curuid]->m_status = DEAD;
//SetCurrentUthreadState(DEAD);
/*We must not simply return from the fiber routine here, because that would
* terminate the thread running this fiber. The correct approach is to hand
* control back to the scheduler and let the scheduler delete the fiber.
*/
//Scheduler::_Yield(DEAD);
_Yield(DEAD);
}
//the awaited event has arrived; put the fiber back onto the runnable queue
void uthread::Signal()
{
if(m_bs->WakeUp())
{
//printf("滿足喚醒條件 %d /n",this->GetUid());
//等待的條件滿足了,把fiber置為可運行態並添加到運行隊列中
//Scheduler::Add2Active(this);
Add2Active(this);
m_bs = 0;
wakeuptick = 0;
}
}
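The BlockStruct / Signal pair is the generic waiting mechanism: Scheduler::Block (declared in scheduler.h below) parks the current fiber with a BlockStruct attached, and TryWakeup calls Signal, which puts the fiber back on the runnable queue only if WakeUp() returns true. A hypothetical condition class might look like the sketch below; FlagBlock, g_flag and Wait4Flag are illustrative names, not part of the framework, and some other code that knows the fiber's uid is expected to set the flag and call TryWakeup (or ForceWakeup) on the owning scheduler.

#include "fiberApi.h"        // GetCurrentUThread; defined later in this post

volatile long g_flag = 0;    // illustrative shared state, set by some other thread

class FlagBlock : public BlockStruct
{
public:
    FlagBlock() : BlockStruct(BS_OTHER) {}
    // called from uthread::Signal (via Scheduler::TryWakeup); returning true
    // re-activates the blocked fiber
    bool WakeUp()
    {
        return g_flag != 0;
    }
};

// called from inside a runnable::main_routine, i.e. on a fiber
void Wait4Flag()
{
    FlagBlock fb;
    // park this fiber until the flag is seen or 1000 ms have elapsed
    GetCurrentUThread()->GetScheduler()->Block(&fb, 1000);
}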
Next, the user-mode lock.
uLock.h
#ifndef _ULOCK_H
#define _ULOCK_H
#pragma pack(push)
#pragma pack(4)
#include "fiberApi.h"
#include "lockfree_queue.h"
class Scheduler;
//user-level lock used between fibers
struct umutex
{
friend class Scheduler;
public:
umutex():flag(0){}
void Lock()
{
if(InterlockedCompareExchange(&flag,1,0) == 1)
{
uthread *currentUThread = GetCurrentUThread();
_node<uthread*> *tmp = currentUThread->GetLockNode();
m_blockthread.push(tmp);
//locking failed: block the current fiber
Wait4Lock();
}
}
void UnLock()
{
if(InterlockedCompareExchange(&flag,0,1) == 0)
{
//it was not locked
return;
}
//the lock has been released; wake one fiber blocked on it
_node<uthread*> *tmp = m_blockthread.pop();
if(tmp)
{
NotifyUnLock(this,tmp->val);
}
}
private:
bool _Lock(uthread *ut)
{
bool ret = InterlockedCompareExchange(&flag,1,0) == 0;
if(!ret)
{
//uthread *currentUThread = GetCurrentUThread();
_node<uthread*> *tmp = ut->GetLockNode();
m_blockthread.push(tmp);
}
return ret;
}
private:
volatile long flag;//1 if the lock is held, 0 otherwise
LockFreeQueue<uthread*> m_blockthread;//fibers blocked on this lock
};
#pragma pack(pop)
#endif
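From inside a fiber, umutex is used like any mutex, except that a contended Lock() enqueues the fiber and switches back to the scheduler instead of spinning or entering the kernel. A minimal sketch (g_umtx and CounterJob are illustrative; the aligned allocation mirrors what the test code at the end of the post does):

#include <new>       // placement new
#include "uLock.h"   // also pulls in fiberApi.h / uthread.h

// umutex should live on 4-byte aligned storage because of the Interlocked ops
// on flag, hence _aligned_malloc + placement new, as in the test code
umutex *g_umtx = 0;
long    g_counter = 0;

class CounterJob : public runnable
{
public:
    void main_routine()
    {
        for(int i = 0; i < 1000; ++i)
        {
            g_umtx->Lock();    // on contention: enqueue this fiber and yield to the scheduler
            ++g_counter;       // keep the protected section short; fibers are never preempted
            g_umtx->UnLock();  // wakes one blocked fiber (its scheduler retries the lock for it)
        }
    }
};

void setup_example()
{
    void *buf = _aligned_malloc(sizeof(umutex), 4);
    g_umtx = new (buf) umutex;
}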
The scheduler
scheduler.h
#ifndef _SCHEDULER_H
#define _SCHEDULER_H
#include <Windows.h>
#include "uthread.h"
#include <map>
#include <list>
#include <time.h>
#include "minHeap.h"
#include "lockfree_queue.h"
#define MAX_FIBER 32
class Scheduler
{
friend class uthread;
friend void _Yield(uChar);
friend void ReleaseUthread(int);
friend void ReleaseCurrentUthread();
friend void SetCurrentUthreadState(uChar);
friend void Add2Active(uthread*);
friend uthread *GetCurrentUThread();
friend uthread_t GetCurrentUThreadId();
public:
Scheduler():m_active_head(0),m_active_tail(0),m_count(0),m_curuid(-1),pending_index(0)
{}
//initialize the fiber library
void Init();
void Destroy();
//add a fiber to the scheduling list to run the given runnable
uthread_t FiberStartRun(runnable *param);
//pick the next fiber to run
void Schedule();
void SwitchTo(uthread_t uid)
{
SwitchToFiber(m_uthreads[uid]->GetUContext());
}
void SwitchToBlock(uthread_t uid)
{
if(m_uthreads[uid]->GetBs())
SwitchTo(uid);
}
void _Yield(uChar status = YIELD)
{
m_uthreads[m_curuid]->SetStatus(status);
SwitchToFiber(m_pUthreadContext);
}
//add a fiber to the runnable queue
void Add2Active(uthread *ut);
//block the fiber until bs is satisfied (or ms milliseconds have passed)
void Block(BlockStruct *bs,uLong ms);
uthread_t GetFreeUthread()
{
if(!m_uthreadpool.empty())
{
uthread_t ret = m_uthreadpool.front();
m_uthreadpool.pop_front();
return ret;
}
return -1;
}
void ReleaseUthread(uthread_t uid)
{
if(uid < MAX_FIBER)
{
m_uthreads[uid]->SetStatus(UNACTIVED);
m_uthreadpool.push_back(uid);
}
}
//try to wake up fiber uid
void TryWakeup(uthread_t uid)
{
if(m_uthreads[uid]->GetBs())
m_uthreads[uid]->Signal();
}
//forcibly wake up a fiber
void ForceWakeup(uthread_t uid)
{
if(m_uthreads[uid]->GetStatus() != ACTIVED)
{
//printf("強制喚醒/n");
Add2Active(m_uthreads[uid]);
}
}
//forcibly wake up a fiber blocked on a condition of the given type
void ForceWakeup(uthread_t uid,uChar type)
{
if(m_uthreads[uid]->GetStatus() != ACTIVED &&
m_uthreads[uid]->GetBs()->bs_type == type)
{
//printf("強制喚醒/n");
Add2Active(m_uthreads[uid]);
}
}
//clear the active list and the pending-add array
void ClearActiveList();
void ClearTimeOut()
{
m_timeoutlist.Clear();
}
void Sleep(uLong ms);
void NotifyUnlock(_node<ulstruct> *nn)
{
m_unlockevent.push(nn);
}
void Wait4Lock();
private:
uthread *m_active_head;
uthread *m_active_tail;
//m_pendingAdd could also be dropped; decide based on test results
uthread_t m_pendingAdd[MAX_FIBER];
unsigned int pending_index;
minheap<MAX_FIBER> m_timeoutlist;
PVOID m_pUthreadContext;//context of the fiber the scheduler itself runs on
uthread *m_uthreads[MAX_FIBER];
LockFreeQueue<ulstruct> m_unlockevent;
int m_count;
int m_curuid; //uid of the fiber currently running; -1 means we are inside the scheduler
std::list<uthread_t> m_uthreadpool;//fiber pool
//std::map<void*,std::list<uthread*> > m_wait4lock;
//std::list<uthread*> m_wait4lock;
static const int reservesize = 65536;
static const int commitsize = 8192;
};
#endif
scheduler.cpp
#include "stdafx.h"
//#include "Scheduler.h"
#include <assert.h>
#include <iostream>
#include "fiberApi.h"
#include "uLock.h"
//extern umutex *g_lock;
uthread_t Scheduler::FiberStartRun(runnable *param)
{
uthread_t uid = GetFreeUthread();
if(uid != -1)
{
m_uthreads[uid]->SetRunnable(param);
Add2Active(m_uthreads[uid]);
}
return uid;
}
void Scheduler::Schedule()
{
{
//check whether any blocked fiber can now acquire its lock
_node<ulstruct> *tmp = NULL;
while(tmp = m_unlockevent.pop())
{
umutex *um = (umutex*)tmp->val.lock_addr;
uthread *ut = tmp->val.ut;
if(um->_Lock(ut))
{
//lock acquired: remove the fiber from the wait queue and put it onto the runnable queue
Add2Active(ut);
}
//std::map<void*,std::list<uthread*> >::iterator it = m_wait4lock.find(tmp->val);
//if(it != m_wait4lock.end())
//{
//try to lock
/*if(!it->second.empty())
{
umutex *um = (umutex*)it->first;
uthread *ut = it->second.front();
if(um->_Lock(ut))
{
//lock acquired: remove the fiber from the wait queue and put it onto the runnable queue
it->second.pop_front();
Add2Active(ut);
}
}*/
//}
//else
//{
//Before Wait4Lock finished, another thread's unlock may already have called NotifyUnLock,
//so push the unlock message back into the queue and try again later
// m_unlockevent.push(tmp);
//}
}
}
//move every fiber waiting in m_pendingAdd into the active list
{
for(unsigned int i = 0; i < pending_index; ++i)
{
uthread *ut = m_uthreads[m_pendingAdd[i]];
ut->SetNext(0);
if(m_active_tail)
{
m_active_tail->SetNext(ut);
m_active_tail = ut;
}
else
{
m_active_head = m_active_tail = ut;
}
}
pending_index = 0;
}
uthread *cur = m_active_head;
uthread *pre = NULL;
while(cur)
{
m_curuid = cur->GetUid();
SwitchToFiber(cur->GetUContext());
m_curuid = -1;
unsigned char status = cur->GetStatus();
//a fiber in any of the following states must be removed from the runnable queue
if(status == DEAD || status == SLEEP || status == WAIT4EVENT || status == UNACTIVED || status == YIELD)
{
//removing the head element
if(cur == m_active_head)
{
//it is also the tail
if(cur == m_active_tail)
m_active_head = m_active_tail = NULL;
else
m_active_head = cur->Next();
}
else if(cur == m_active_tail)
{
pre->SetNext(NULL);
m_active_tail = pre;
}
else
pre->SetNext(cur->Next());
uthread *tmp = cur;
cur = cur->Next();
tmp->SetNext(0);
//if it merely yielded the processor, put it back onto the runnable queue
if(status == YIELD)
Add2Active(tmp);
}
else
{
pre = cur;
cur = cur->Next();
}
}
//check for fibers whose timeout has expired
{
uLong now = GetTickCount();
while(m_timeoutlist.Min() !=0 && m_timeoutlist.Min() <= now)
{
st_timeout *timeout = m_timeoutlist.PopMin();
if(timeout->ut->GetStatus() == WAIT4EVENT || timeout->ut->GetStatus() == SLEEP)
{
timeout->ut->wakeuptick = timeout->_timeout;
Add2Active(timeout->ut);
}
}
}
}
void Scheduler::Destroy()
{
for(int i = 0; i < MAX_FIBER; ++i)
{
if(m_uthreads[i])
{
DeleteFiber(m_uthreads[i]->GetUContext());
delete m_uthreads[i];
}
}
ConvertFiberToThread();
}
void Scheduler::Block(BlockStruct *bs,uLong ms)
{
if(ms > 0)
{
st_timeout &_st_timeout = m_uthreads[m_curuid]->GetTimeoutSt();
_st_timeout._timeout = GetTickCount() + ms;//time(NULL) + timeout;
if(!_st_timeout.index)
{
m_timeoutlist.Insert(&_st_timeout);
}
else
{
m_timeoutlist.Change(&_st_timeout);
}
}
m_uthreads[m_curuid]->SetBs(bs);
m_uthreads[m_curuid]->SetStatus(WAIT4EVENT);
SwitchToFiber(m_pUthreadContext);
m_uthreads[m_curuid]->SetBs(0);
}
void Scheduler::Init()
{
m_pUthreadContext = ConvertThreadToFiber(NULL);
//create the fiber pool
for(int i = 0 ; i < MAX_FIBER; ++i)
{
uthread *nthread = new uthread(this);
PVOID uthreadcontext = CreateFiberEx(commitsize,reservesize,0,uthread::fiber_routine,nthread);
assert(uthreadcontext);
nthread->SetUContext(uthreadcontext);
m_uthreads[i] = nthread;
nthread->SetUid(i);
m_uthreadpool.push_back(i);
}
}
//add a fiber to the runnable queue
void Scheduler::Add2Active(uthread *ut)
{
//if it is already active it must not be added again
if(ut->GetStatus() != ACTIVED)
{
ut->SetStatus(ACTIVED);
m_pendingAdd[pending_index++] = ut->GetUid();
}
}
void Scheduler::ClearActiveList()
{
pending_index = 0;
uthread *cur = m_active_head;
while(cur)
{
uthread *next = cur->Next();
cur->SetNext(0);
cur = next;
}
m_active_head = m_active_tail = NULL;
}
void Scheduler::Sleep(uLong ms)
{
if(ms > 0)
{
st_timeout &_st_timeout = m_uthreads[m_curuid]->GetTimeoutSt();
_st_timeout._timeout = GetTickCount() + ms;//time(NULL) + seconds;
if(!_st_timeout.index)
{
m_timeoutlist.Insert(&_st_timeout);
}
else
{
m_timeoutlist.Change(&_st_timeout);
}
m_uthreads[m_curuid]->SetStatus(SLEEP);
}
SwitchToFiber(m_pUthreadContext);
}
//the fiber is waiting for a lock; remove it from the runnable queue and record the wait
void Scheduler::Wait4Lock()
{
/*std::map<void*,std::list<uthread*> >::iterator it = m_wait4lock.find(lock_addr);
uthread *current_uthread = m_uthreads[m_curuid];
if(it == m_wait4lock.end())
m_wait4lock.insert(std::make_pair(lock_addr,std::list<uthread*>(1,current_uthread)));
else
it->second.push_back(current_uthread);
*/
uthread *current_uthread = m_uthreads[m_curuid];
//m_wait4lock.push_back(current_uthread);
current_uthread->SetStatus(WAIT4EVENT);
//switch back to the scheduler
SwitchToFiber(m_pUthreadContext);
}
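One file this post does not include is minHeap.h. From the way scheduler.h and scheduler.cpp use it (Insert, Change, Min, PopMin, Clear, and the index field kept inside st_timeout), its contract can be reconstructed. The following is only a stand-in sketch of that assumed interface, an indexed binary min-heap keyed on _timeout, not the author's implementation:

// A stand-in for minHeap.h, reconstructed from how the scheduler uses it: an indexed
// binary min-heap keyed on st_timeout::_timeout. Each entry's 1-based position is kept
// in st_timeout::index, and index == 0 means "not queued" (which is what Block() checks).
#include "uthread.h"   // st_timeout, uLong (assumed project typedef)

template<int SIZE>
class minheap
{
public:
    minheap():m_count(0){}
    void Insert(st_timeout *st)
    {
        if(m_count >= SIZE)
            return;            // the scheduler never queues more than MAX_FIBER entries
        m_data[++m_count] = st;
        st->index = m_count;
        Up(m_count);
    }
    //st->_timeout was updated in place; restore the heap property around it
    void Change(st_timeout *st)
    {
        Up(st->index);
        Down(st->index);
    }
    uLong Min()                // earliest deadline, 0 when the heap is empty
    {
        return m_count ? m_data[1]->_timeout : 0;
    }
    st_timeout *PopMin()
    {
        if(!m_count)
            return 0;
        st_timeout *ret = m_data[1];
        Swap(1,m_count);
        --m_count;
        if(m_count)
            Down(1);
        ret->index = 0;        // mark it as no longer queued
        return ret;
    }
    void Clear()
    {
        for(int i = 1; i <= m_count; ++i)
            m_data[i]->index = 0;
        m_count = 0;
    }
private:
    void Swap(int a,int b)
    {
        st_timeout *t = m_data[a]; m_data[a] = m_data[b]; m_data[b] = t;
        m_data[a]->index = a; m_data[b]->index = b;
    }
    void Up(int i)
    {
        while(i > 1 && m_data[i]->_timeout < m_data[i/2]->_timeout)
        {
            Swap(i,i/2);
            i /= 2;
        }
    }
    void Down(int i)
    {
        while(1)
        {
            int s = i, l = 2*i, r = 2*i+1;
            if(l <= m_count && m_data[l]->_timeout < m_data[s]->_timeout) s = l;
            if(r <= m_count && m_data[r]->_timeout < m_data[s]->_timeout) s = r;
            if(s == i) break;
            Swap(i,s);
            i = s;
        }
    }
    st_timeout *m_data[SIZE+1];   // 1-based storage
    int m_count;
};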
Next, a few free-function wrappers that let fibers reach the scheduler of the thread they are running on.
fiberApi.h
#ifndef _FIBERAPI_H
#define _FIBERAPI_H
#include "Scheduler.h"
#include <map>
//the fiber scheduler associated with each OS thread, looked up by a simple hash of the thread id
//extern std::map<DWORD,Scheduler*> g_tlssc;
#define TLSSIZE 1019 //number of slots in the scheduler registry; indexing always uses thread id % TLSSIZE
extern Scheduler* g_tlssc[TLSSIZE];
void _Yield(uChar);
void ReleaseUthread(int);
void ReleaseCurrentUthread();//release the currently running fiber back to the pool
void SetCurrentUthreadState(uChar);//set the state of the current fiber
void Add2Active(uthread*);
uthread *GetCurrentUThread();
uthread_t GetCurrentUThreadId();
void Wait4Lock();
void NotifyUnLock(void*,uthread*);
#endif
fiberApi.cpp
#include "stdafx.h"
#include "fiberApi.h"
//std::map<DWORD,Scheduler*> g_tlssc;
Scheduler* g_tlssc[TLSSIZE];
void _Yield(uChar state)
{
DWORD currenttrheadid = GetCurrentThreadId();
g_tlssc[currenttrheadid%TLSSIZE]->_Yield(state);
}
void ReleaseUthread(int uthreadid)
{
DWORD currenttrheadid = GetCurrentThreadId();
g_tlssc[currenttrheadid%TLSSIZE]->ReleaseUthread(uthreadid);
}
void ReleaseCurrentUthread()
{
DWORD currenttrheadid = GetCurrentThreadId();
Scheduler *sc = g_tlssc[currenttrheadid%TLSSIZE];
sc->ReleaseUthread(sc->m_curuid);
}
void SetCurrentUthreadState(uChar state)
{
DWORD currenttrheadid = GetCurrentThreadId();
Scheduler *sc = g_tlssc[currenttrheadid%TLSSIZE];
sc->m_uthreads[sc->m_curuid]->SetStatus(state);
}
void Add2Active(uthread *ut)
{
DWORD currenttrheadid = GetCurrentThreadId();
g_tlssc[currenttrheadid%TLSSIZE]->Add2Active(ut);
}
uthread *GetCurrentUThread()
{
DWORD currenttrheadid = GetCurrentThreadId();
Scheduler *sc = g_tlssc[currenttrheadid%TLSSIZE];
return sc->m_uthreads[sc->m_curuid];
}
uthread_t GetCurrentUThreadId()
{
DWORD currenttrheadid = GetCurrentThreadId();
Scheduler *sc = g_tlssc[currenttrheadid%TLSSIZE];
return sc->m_curuid;
}
void Wait4Lock()
{
DWORD currenttrheadid = GetCurrentThreadId();
g_tlssc[currenttrheadid%TLSSIZE]->Wait4Lock();
}
void NotifyUnLock(void *lock_addr,uthread *ut)
{
_node<ulstruct> *nn = ut->GetUnlockEvent();
nn->val.lock_addr = lock_addr;
ut->GetScheduler()->NotifyUnlock(nn);
//g_tlssc[threadid]->NotifyUnlock(lock_addr);
//std::map<DWORD,Scheduler>::iterator it = g_tlssc.begin();
//std::map<DWORD,Scheduler>::iterator end = g_tlssc.end();
//for(; it != end; ++it)
// it->second.NotifyUnlock(lock_addr);
}
Further testing showed that the lock-free queue m_blockthread used in umutex::Lock and UnLock can lose unlock notifications when operated on from multiple threads.
So m_blockthread is changed to an ordinary queue, protected by a spin lock around each operation for now (a temporary measure; I hope to find a better approach).
The changes look roughly like this:
void Lock()
{
if(InterlockedCompareExchange(&flag,1,0) == 1)
{
uthread *currentUThread = GetCurrentUThread();
_node<uthread*> *tmp = currentUThread->GetLockNode();
while(InterlockedCompareExchange(&spinlock,1,0) == 1);
push(tmp);
InterlockedCompareExchange(&spinlock,0,1);
Wait4Lock();
}
}
void UnLock()
{
if(InterlockedCompareExchange(&flag,0,1) == 0)
return;
while(InterlockedCompareExchange(&spinlock,1,0) == 1);
_node<uthread*> *tmp = pop();
InterlockedCompareExchange(&spinlock,0,1);
if(tmp)
NotifyUnLock(this,tmp->val);
}
There is one more problem to solve: the fibers do not acquire the lock evenly. A concrete example: start two threads on a dual-core machine, each running one fiber, and have them write into testlist. Most of the writes end up being done by one fiber, while the other rarely gets a chance to write.
Below is the revised uLock.h, which fixes the unfairness: as long as no more scheduling threads are created than there are CPUs, every fiber gets a roughly equal chance to acquire the lock.
#ifndef _ULOCK_H
#define _ULOCK_H
#pragma pack(push)
#pragma pack(4)
#include "fiberApi.h"
//#include "lockfree_queue.h"
class Scheduler;
//user-level lock used between fibers
struct umutex
{
friend class Scheduler;
public:
umutex():flag(0),spinlock(0),m_head(0),m_tail(0)
{
}
void Lock()
{
if(InterlockedCompareExchange(&flag,1,0) == 1)
{
uthread *currentUThread = GetCurrentUThread();
_node<uthread*> *tmp = currentUThread->GetLockNode();
while(InterlockedCompareExchange(&spinlock,1,0) == 1);
//try to lock again while holding the spin lock
if(InterlockedCompareExchange(&flag,1,0) == 0)
{
InterlockedCompareExchange(&spinlock,0,1);
return;
}
push(tmp);
InterlockedCompareExchange(&spinlock,0,1);
//locking failed: block the current fiber
Wait4Lock();
}
}
void UnLock()
{
if(InterlockedCompareExchange(&flag,0,1) == 0)
{
//it was not locked
return;
}
//the lock has been released; pick one blocked fiber and wake it
while(InterlockedCompareExchange(&spinlock,1,0) == 1);
_node<uthread*> *tmp = pop();
InterlockedCompareExchange(&spinlock,0,1);
if(tmp)
{
NotifyUnLock(this,tmp->val);
}
}
private:
bool _Lock(uthread *ut)
{
bool ret = InterlockedCompareExchange(&flag,1,0) == 0;
if(!ret)
{
//uthread *currentUThread = GetCurrentUThread();
_node<uthread*> *tmp = ut->GetLockNode();
while(InterlockedCompareExchange(&spinlock,1,0) == 1);
//try to lock again while holding the spin lock
if(InterlockedCompareExchange(&flag,1,0) == 0)
{
InterlockedCompareExchange(&spinlock,0,1);
return true;
}
push(tmp);
InterlockedCompareExchange(&spinlock,0,1);
}
return ret;
}
void push(_node<uthread*> *blockut)
{
blockut->_next = NULL;
if(NULL == m_tail)
{
m_head = m_tail = blockut;
}
else
{
m_tail->_next = blockut;
m_tail = blockut;
}
}
_node<uthread*> *pop()
{
if(NULL == m_head)
return NULL;
else
{
_node<uthread*> *ret = m_head;
m_head = m_head->_next;
if(m_head == NULL)
m_tail = m_head;
return ret;
}
}
private:
volatile long flag;//1 if the lock is held, 0 otherwise
volatile long spinlock;//spin lock protecting the block queue below
//queue of fibers blocked on this lock
_node<uthread*> *m_head;
_node<uthread*> *m_tail;
};
#pragma pack(pop)
#endif
Test code:
// fiberFramework.cpp : entry point of the console application.
//
#include "stdafx.h"
#include "CThread.h"
#include "fiberApi.h"
#include "uLock.h"
#include "CLock.h"
#define TESTSIZE 1000000
int g_testlist[TESTSIZE];
int g_testlistcs[TESTSIZE];
int g_testmutex[TESTSIZE];
umutex *g_lock;
zMutex *g_lockmutex;
zLightMutex *g_lockcs;
/*std::list<int> g_testlist2;
std::list<int> g_testlistcs2;
std::list<int> g_testmutex2;
umutex *g_lock2;
zMutex *g_lockmutex2;
zLightMutex *g_lockcs2;
*/
static volatile bool finish = false;
static volatile long count = 0;
zThreadGroup g_threadgroup;
DWORD starttime = 0;
DWORD endtime = 0;
class uworker : public runnable
{
public:
void main_routine()
{
while(1)
{
g_lock->Lock();
if(count==0)
{
starttime = GetTickCount();
}
if(count == TESTSIZE)
{
endtime = GetTickCount();
finish = true;
g_lock->UnLock();
return;
}
else
{
g_testlist[count] = GetCurrentThreadId()+GetCurrentUThreadId();
//InterlockedIncrement(&count);
}
++count;
g_lock->UnLock();
_Yield(YIELD);
volatile int c = 0;
for(volatile int cc = 0; cc < 100; ++cc)
c++;
}
}
};
class CWorkerThread : public zThread,private Noncopyable
{
public:
CWorkerThread(const std::string &name = std::string("zThread"),const bool joinable = true)
:zThread(name,joinable){}
~CWorkerThread(){}
void run()
{
Scheduler *sc = new Scheduler;
sc->Init();
//g_tlssc.insert(std::make_pair(GetCurrentThreadId(),sc));
if(g_tlssc[GetCurrentThreadId()%TLSSIZE] != NULL)
{
printf("error here/n");
getchar();
exit(0);
}
g_tlssc[GetCurrentThreadId()%TLSSIZE] = sc;
{
uworker uw1;
uworker uw2;
uworker uw3;
uworker uw4;
sc->FiberStartRun(&uw1);
sc->FiberStartRun(&uw2);
sc->FiberStartRun(&uw3);
sc->FiberStartRun(&uw4);
}
/*{
uworker uw1;
uworker uw2;
uworker uw3;
uworker uw4;
sc->FiberStartRun(&uw1);
sc->FiberStartRun(&uw2);
sc->FiberStartRun(&uw3);
sc->FiberStartRun(&uw4);
}*/
while(!finish)
{
sc->Schedule();
}
}
};
class CWorkerThreadCs : public zThread,private Noncopyable
{
public:
CWorkerThreadCs(const std::string &name = std::string("zThread"),const bool joinable = true)
:zThread(name,joinable){}
~CWorkerThreadCs(){}
void run()
{
while(1)
{
g_lockcs->Lock();
if(count == 0)
{
starttime = GetTickCount();
}
if(count == TESTSIZE)
{
endtime = GetTickCount();
g_lockcs->UnLock();
return;
}
else
{
g_testlistcs[count] = GetCurrentThreadId();
//InterlockedIncrement(&count);
}
++count;
g_lockcs->UnLock();
volatile int c = 0;
for(volatile int cc = 0; cc < 100; ++cc)
c++;
}
}
};
class CWorkerThreadMutex : public zThread,private Noncopyable
{
public:
CWorkerThreadMutex(const std::string &name = std::string("zThread"),const bool joinable = true)
:zThread(name,joinable){}
~CWorkerThreadMutex(){}
void run()
{
while(1)
{
g_lockmutex->Lock();
if(count == 0)
{
starttime = GetTickCount();
}
if(count == TESTSIZE)
{
endtime = GetTickCount();
g_lockmutex->UnLock();
//printf("finish/n");
return;
}
else
{
g_testmutex[count] = GetCurrentThreadId();
//InterlockedIncrement(&count);
//printf("uthread:%d/n",GetCurrentThreadId());
}
++count;
g_lockmutex->UnLock();
volatile int c = 0;
for(volatile int cc = 0; cc < 100; ++cc)
c++;
}
}
};
struct TestCallback : public zThreadGroup::Callback
{
void exec(zThread *e)
{
e->start();
}
~TestCallback(){}
};
void testfiber(int n)
{
void *buf = _aligned_malloc(sizeof(*g_lock),4);
g_lock = new (buf)umutex;
for(int i = 0; i < n; ++i)
{
CWorkerThread *cw1 = new CWorkerThread;
g_threadgroup.add(cw1);
}
TestCallback CallBack;
g_threadgroup.execAll(CallBack);
g_threadgroup.joinAll();
printf("test fiber/n");
printf("count %d/n",count);
printf("time %d/n",endtime - starttime);
std::map<int,int> stat;
for(int i = 0; i < TESTSIZE; ++i)
{
std::map<int,int>::iterator it = stat.find(g_testlist[i]);
if(it == stat.end())
{
stat.insert(std::make_pair(g_testlist[i],1));
}
else
{
stat[g_testlist[i]]++;
}
}
printf("stat size = %d/n",stat.size());
for(std::map<int,int>::iterator it = stat.begin(); it != stat.end(); ++it)
{
printf("id=%d,count=%d/n",it->first,it->second);
}
}
void testcs(int n)
{
g_lockcs = new zLightMutex;
for(int i = 0; i < n; ++i)
{
CWorkerThreadCs *cw1 = new CWorkerThreadCs;
g_threadgroup.add(cw1);
}
TestCallback CallBack;
g_threadgroup.execAll(CallBack);
//while(!_kbhit())//wait for the server to stop
//{
// Sleep(10);
//}
g_threadgroup.joinAll();
printf("test cs/n");
printf("count %d/n",count);
printf("time %d/n",endtime - starttime);
std::map<int,int> stat;
for(int i = 0; i < TESTSIZE; ++i)
{
std::map<int,int>::iterator it = stat.find(g_testlistcs[i]);
if(it == stat.end())
{
stat.insert(std::make_pair(g_testlistcs[i],1));
}
else
{
stat[g_testlistcs[i]]++;
}
}
printf("stat size = %d/n",stat.size());
for(std::map<int,int>::iterator it = stat.begin(); it != stat.end(); ++it)
{
printf("id=%d,count=%d/n",it->first,it->second);
}
}
void testmutex(int n)
{
g_lockmutex = new zMutex;
for(int i = 0; i < n; ++i)
{
CWorkerThreadMutex *cw1 = new CWorkerThreadMutex;
g_threadgroup.add(cw1);
}
TestCallback CallBack;
g_threadgroup.execAll(CallBack);
g_threadgroup.joinAll();
printf("test mutex/n");
printf("count %d/n",count);
printf("time %d/n",endtime - starttime);
std::map<int,int> stat;
for(int i = 0; i < TESTSIZE; ++i)
{
std::map<int,int>::iterator it = stat.find(g_testmutex[i]);
if(it == stat.end())
{
stat.insert(std::make_pair(g_testmutex[i],1));
}
else
{
stat[g_testmutex[i]]++;
}
}
printf("stat size = %d/n",stat.size());
for(std::map<int,int>::iterator it = stat.begin(); it != stat.end(); ++it)
{
printf("id=%d,count=%d/n",it->first,it->second);
}
}
int _tmain(int argc, _TCHAR* argv[])
{
if(argc < 2)
{
printf("usage: fiberFramework <threadcount>\n");
return 0;
}
int n = _ttol(argv[1]);
memset(g_tlssc,0,sizeof(g_tlssc));
count = 0;
testfiber(n/4);//each worker thread runs 4 fibers, so n/4 threads keep the total worker count at n
count = 0;
testcs(n);
count = 0;
testmutex(n);
/*LockFreeQueue<int> q;
for(int i = 0; i < 5; ++i)
{
_node<int> *pNode = new _node<int>;
pNode->val = i;
q.push(pNode);
}
//q.print();
for(int i = 0; i < 5; ++i)
{
_node<int> *pNode = q.pop();
printf("%d/n",pNode->val);
}
_node<int> *pNode = q.pop();
*/
getchar();
return 0;
}