php + mysql 分布式事務
事務(Transaction)是訪問並可能更新數據庫中各種數據項的一個程序執行單元;
事務應該具有4個屬性:原子性、一致性、隔離性、持續性
原子性(atomicity)。一個事務是一個不可分割的工作單位,事務中包括的諸操作要么都做,要么都不做。
一致性(consistency)。事務必須是使數據庫從一個一致性狀態變到另一個一致性狀態。一致性與原子性是密切相關的。
隔離性(isolation)。一個事務的執行不能被其他事務干擾。即一個事務內部的操作及使用的數據對並發的其他事務是隔離的,並發執行的各個事務之間不能互相干擾。
持久性(durability)。持續性也稱永久性(permanence),指一個事務一旦提交,它對數據庫中數據的改變就應該是永久性的。接下來的其他操作或故障不應該對其有任何影響。
分布式事務:分布式事務的參與者、資源管理器、事務管理器等位於不用的節點上,這些不同的節點相互協作共同完成一個具有邏輯完整性的事務。
糾正自己對mysql的一個誤解,mysql從5.0開始支持XA DataSource。Connector/J 版本要使用5.0版本,5.0以下的不支持。
XA協議由Tuxedo首先提出的,並交給X/Open組織,作為資源管理器(數據庫)與事務管理器的接口標准。目前,Oracle、Informix、DB2和Sybase等各大數據庫廠家都提供對XA的支持。XA協議采用兩階段提交方式來管理分布式事務。XA接口提供資源管理器與事務管理器之間進行通信的標准接口。XA協議包括兩套函數,以xa_開頭的及以ax_開頭的。 以下的函數使事務管理器可以對資源管理器進行的操作:
1)xa_open,xa_close:建立和關閉與資源管理器的連接。
2)xa_start,xa_end:開始和結束一個本地事務。
3)xa_prepare,xa_commit,xa_rollback:預提交、提交和回滾一個本地事務。
4)xa_recover:回滾一個已進行預提交的事務。
5)ax_開頭的函數使資源管理器可以動態地在事務管理器中進行注冊,並可以對XID(TRANSACTION IDS)進行操作。
6)ax_reg,ax_unreg;允許一個資源管理器在一個TMS(TRANSACTION MANAGER SERVER)中動態注冊或撤消注冊。
MySQL XA分為兩類,內部XA與外部XA;內部XA用於同一實例下跨多個引擎的事務,由大家熟悉的Binlog作為協調者;外部XA用於跨多MySQL實例的分 布式事務,需要應用層介入作為協調者(崩潰時的懸掛事務,全局提交還是回滾,需要由應用層決定,對應用層的實現要求較高);
Binlog作為內部XA的協調者,在binlog中出現的內部xid,在crash recover時,由binlog負責提交。(這是因為,binlog不進行prepare, 只進行commit,因此在binlog中出現的內部xid,一定能夠保證其在底層各存儲引擎中已經完成prepare)。
MySQL數據庫外部XA可以用在分布式數據庫代理層,實現對MySQL數據庫的分布式事務支持,例如開源的代理工具:網易的DDB,淘寶的TDDL,B2B的Cobar等等。
示例
public function testAction(){
$goods_id=1;
$goods_name = "大西瓜";
$num = 1;
$rs_order = $this->test->createorder($goods_id,$goods_name,$num);
$rs_goods = $this->test->deduction($goods_id,$num);
if($rs_order['status'] =="success" && $rs_goods['status']=="success"){
$this->test->commitdb($rs_order['XA']);
$this->test->commitdb1($rs_goods['XA']);
}else{
$this->test->rollbackdb($rs_order['XA']);
$this->test->rollbackdb1($rs_goods['XA']);
}
print_r($rs_order);
echo "<br />";
print_r($rs_goods);
die("dddd");
}
public function createorder($goods_id,$goods_name,$num){
$XA = uniqid("");
$this->_db->query("XA START '$XA'");
$_rs = true;
try {
$data = array();
$data['order_id'] = "V".date("YmdHis");
$data['goods_name'] = $goods_name;
$data['goods_num'] = $num;
$this->_db->insert("temp_orders",$data);
$rs = $this->_db->lastInsertId();
if($rs){
$_rs = true;
}else{
$_rs = false;
}
} catch (Exception $e) {
$_rs = false;
}
$this->_db->query("XA END '$XA'");
if($_rs){
$this->_db->query("XA PREPARE '$XA'");
return array("status"=>"success","XA"=>$XA);
}else{
return array("status"=>"nosuccess","XA"=>$XA);
}
}
public function deduction($id){
$XA = uniqid("");
$this->db1->query("XA START '$XA'");
$last_rs = true;
try {
$sql = "select * from temp_goods where id = '$id' and goods_num>0";
$rs = $this->db1->fetchRow($sql);
if(!empty($rs)){
$sql = "update temp_goods set goods_num = goods_num-1 where id = '$id'";
$rd = $this->db1->query($sql);
if($rd){
$last_rs = true;
}else{
$last_rs = false;
}
}else{
$last_rs = false;;
}
} catch (Exception $e) {
$last_rs = false;;
}
$this->db1->query("XA END '$XA'");
if($last_rs){
$this->db1->query("XA PREPARE '$XA'");
return array("status"=>"success","XA"=>$XA);
}else{
return array("status"=>"nosuccess","XA"=>$XA);
}
}
//提交事務!
public function commitdb($xa){
return $this->_db->query("XA COMMIT '$xa'");
}
//回滾事務
public function rollbackdb($xa){
return $this->_db->query("XA ROLLBACK '$xa'");
}
//提交事務!
public function commitdb1($xa){
return $this->db1->query("XA COMMIT '$xa'");
}
//回滾事務
public function rollbackdb1($xa){
return $this->db1->query("XA ROLLBACK '$xa'");
}