轉載: http://blog.sina.com.cn/s/blog_48d4cf2d0100mx4n.html
死鎖是由於不同線程按照不同順序進行加鎖而造成的。如:
線程A:對lock a加鎖 => 對lock b加鎖 => dosth => 釋放lock b => 釋放lock a
線程B:對lock b加鎖 => 對lock a加鎖 => dosth => 釋放lock a => 釋放lock b
這樣兩條線程,就可能發生死鎖問題。要避免發生死鎖,應該使用同一個順序進行加鎖。
這點在對象單向調用的情況下是很容易達成的。對象單向調用的意思是如果對象a的函數調用了對象b的函數,則對象b中的函數不會去調用對象a的函數(注意:a和b也可能同屬於一個類)。
舉個例子吧,假設聊天室(Room)對象room,聊天者(Chatter)對象chatter,假設Chatter和Room的定義如下:
class InnerChatter
{
public:
void sendMsg(const string& msg)
{
boost::mutex::scoped_lock lock(mtx);
socket->send(msg);
}
private:
boost::mutex mtx;
TcpSocket socket;
};
typedef boost::shared_ptr< InnerChatter> Chatter;
class InnerRoom
{
public:
void sendMsg(const string& user, const string& msg)
{
boost::mutex::scoped_lock lock(mtx);
if (chatters.find(user) != chatters.end())
{
chatters[user]-> sendMsg(user);
}
}
private:
boost::mutex mtx;
map<string, Chatter> chatters;
};
目前代碼中只存在Room調用Chatter的情況,不存在Chatter調用Room,Room調用Room,Chatter調用Chatter這三種情況。所以總是先獲得room鎖,再獲得chatter鎖,不會發生死鎖。
如果為Chatter加上發送歷史和以下這個方法之后呢?
vector<string> history;
void sendMsgToChatter(Chatter dst, const string& msg)
{
boost::mutex::scoped_lock lock(mtx); // 加鎖當前對象
history.push_back(msg);
dsg>sendMsg(msg); // 注意:次函數調用會加鎖dst對象
}
乍看起來似乎沒問題,但如果線程A執行chatterA.sendMsgToChatter(chatterB, “sth”)時,線程B正好執行chatterB.sendMsgToChatter(chatterA, “sth”),就會發生本文一開頭舉例的死鎖問題。
如果在Chatter中加入函數:
void sendMsgToAll(Room room, const string& msg)
{
boost::mutex::scoped_lock lock(mtx);
history.push_back(msg);
room->sendMsgToAll(msg);
}
在Room中加入函數:
void sendMsgToAll(const string& msg)
{
boost::mutex::scoped_lock lock(mtx);
for (map<string, Chatter>::iterator it = chatters.begin(); it != chatters.end(); ++it)
{
it->second->sendMsg(msg);
}
}
顯然死鎖問題更嚴重了,也更令人抓狂了。也許有人要問,為什么要這么做,不能就保持Room單向調用Chatter嗎?大部分時候答案是肯定的,也建議大部分模塊尤其是周邊模塊如基礎設施模塊使用明確清晰的單向調用關系,這樣可以減少對死鎖的憂慮,少白一些頭發。
但有時候保證單向調用的代價太高:試想一下,如果被調用者b是一個容器類,調用者a定義了一些對元素的匯總操作如求和,為了避免回調(回調打破了單向調用約束),那就只有對b加鎖,復制所有元素,解鎖,遍歷求和。復制所有元素比較耗計算資源,有可能成為性能瓶頸。
另外還有設計方面的考慮。還舉Room和Chatter的例子,如果避免Chatter調用Room和Chatter,則Chatter很難實現啥高級功能,這樣所有代碼都將堆砌在Room,Room將成為一個超級類,帶來維護上的難度。此外還有設計上的不妥:因為幾乎全部面向對象的設計模式都可以理解成某種方式的回調,禁止回調也就禁掉了設計模式,可能帶來不便。
當對象間的相互調用無法避免時,如果只使用傳統的mutex,保證相同順序加鎖需要十分小心,萬一編程時失誤,測試時又沒發現(這是很可能的,死鎖很不容易測試出來),如果條件允許還可以手忙腳亂地火線gdb,若無法調試定位,則服務器可能要成為重啟帝了,對產品的形象十分有害。
我想出的解決方案是既然mutex要保證相同順序加鎖,就直接讓mutex和一個優先級掛鈎,使用線程專有存儲(TSS)保存當前線程優先級最低的鎖,當對新的mutex加鎖時,如果mutex的優先級< 當前優先級(為什么=不可以,參考上文說的sendMsgToChatter函數),才允許加鎖,否則記錄當前函數棧信息,拋出異常(要仔細設計以免破壞內部數據結構)。代碼如下:
boost::thread_specific_ptr<global::stack<int>> locks_hold_by_current_thread;
class xrecursive_mutex
{
public:
xrecursive_mutex(int pri_level_)
: recursion_count(0)
, pri_level(pri_level_){}
~xrecursive_mutex(){}
class scoped_lock
{
public:
scoped_lock(xrecursive_mutex& mtx_)
: mtx(mtx_)
{
mtx.lock();
}
~scoped_lock()
{
mtx.unlock();
}
private:
xrecursive_mutex& mtx;
};
private:
int recursion_count;
int pri_level;
boost::recursive_mutex mutex;
int get_recursion_count()
{
return recursion_count;
}
void lock()
{
mutex.lock();
++ recursion_count;
if (recursion_count == 1)
{
if (locks_hold_by_current_thread.get() == NULL)
{
locks_hold_by_current_thread.reset(new std::stack<int>());
}
if (!locks_hold_by_current_thread->empty() &&
locks_hold_by_current_thread->top()>= pri_level)
{ // wrong order, lock failed
-- recursion_count;
mutex.unlock();
XASSERT(false);//記錄棧信息,拋異常
}
locks_hold_by_current_thread->push(pri_level);
}
}
void unlock()
{
bool bad_usage_flag = false;
if (recursion_count == 1 &&locks_hold_by_current_thread.get() != NULL)
{
if (!locks_hold_by_current_thread->empty()
&& (locks_hold_by_current_thread->top() == pri_level))
{
locks_hold_by_current_thread->pop();
}
else
{
bad_usage_flag = true;
}
}
-- recursion_count;
mutex.unlock();
XASSERT(!bad_usage_flag);// // 記錄棧信息,拋異常
}
};
使用:
xrecursive_mutex mtx1(1);
xrecursive_mutex mtx2(2);
xrecursive_mutex mtx3(3);
xrecursive_mutex mtx3_2(3);
{
xrecursive_mutex::scoped_lock lock1(mtx1); // pass, 當前線程鎖優先級1
xrecursive_mutex::scoped_lock lock2(mtx3); // pass, 當前線程鎖優先級3
ASSERT_ANY_THROW(xrecursive_mutex::scoped_lock lock2_2(mtx3_2)); // 捕獲異常,因為優先級3 <= 當前線程鎖優先級
xrecursive_mutex::scoped_lock lock3(mtx3); // pass, 可重入鎖
xrecursive_mutex::scoped_lock lock4(mtx1); // pass, 可重入鎖
ASSERT_ANY_THROW(xrecursive_mutex::scoped_lock lock5(mtx2)); // 捕獲異常,因為優先級2<= 當前線程鎖優先級3
}