spring事務與消息隊列


在開發過程中,遇到一個bug,產生bug的原因是spring事務提交晚於消息隊列的生產消息,導致消息隊列消費消息時獲取到的數據不正確。這篇文章介紹問題的產生和一步步的解決過程。

一.問題的產生:

場景還原:接口中的一個方法,首先修改訂單狀態,然后向消息隊列中生產消息,消息隊列的消費者獲取到消息檢測訂單狀態,發現訂單狀態未更改。

代碼:

@Service(orderApi)
public class OrderApiImpl implements OrderApi {
	@Resource MqService mqService;
	@OrderDao orderDao;
	
	public void push(String orderId) {
		// 更新訂單狀態,之前的狀態是1
		updateStatus(orderId, 3);
		// 產生消息
		mqService.produce(orderId);
	}
	public viod updateStatus(String orderId, Integer status) {
		orderDao.updateStatus(orderId, status);
	}
}

問題產生原因:orderApi中的所有方法都有事務,事務類型PROPAGATION_REQUIRED,所以push方法對數據的操作會在push代碼全部執行之后提交,而在事務提交之前消息隊列的消息已經產生所以消息隊列中消費到的訂單從數據庫查詢出的狀態可能還為1。為了讓bug現象更明顯,可以在push方法最后添加:

try {
	Thread.sleep(10000);
} catch (InterruptedException e) {
	// TODO Auto-generated catch block
	e.printStackTrace();
}

這樣就會發現消費消息時,訂單狀態一定是未修改的。

 

二.問題的解決:

解決方案:在更新數據時,新建一個事物,保證更新代碼執行完成后,更新數據庫的事務已被提交。(確保消息產生前數據庫操作已提交)

按照上述方案,我首先想到的是直接修改updateStatus方法的事務類型;我將此方法的事務類型改為PROPAGATION_REQUIRES_NEW(新建事務,如果當前存在事務,把當前事務掛起)。

但是這么做有兩點不合適:

  1.強制修改了updateStaus的事務類型,可能影響其他流程。

  2.未起到作用,updateStaus方法中沒有新建事務。

關於第二點的解釋:spring添加事務是通過BeanNameAutoProxyCreator實現的動態代理,只是給bean對象添加了事務,現在在類內部調用方法,是不會觸發新事物的創建的。

所以在經過以上嘗試后,我創建了一個新的類:

@Service("orderExtApi")
public class OrderExtApiImpl {
	@Resource OrderApi orderApi;
	
	public void updateStatusNewPropagation(String orderId) {
		orderApi.updateStatus(orderId);
	}
}

並為updateStatusNewPropagation方法添加事務PROPAGATION_REQUIRES_NEW

這個類就只是為了給orderApi中的updateStaus方法新起一個事務。

ok,到此為止bug已經解決了。

但是代碼中還是存在問題:對數據庫的操作已經提交,如果生產消息出現異常對業務邏輯來說還是錯誤的。所以需要檢測消息的產生是否完成。

最終orderApi中的代碼如下:

@Service(orderApi)
public class OrderApiImpl implements OrderApi {
	@Resource MqService mqService;
	@Resource OrderDao orderDao;
	@Resource OrderExtApiImpl orderExtApi;
	
	public void push(String orderId) {
		// 更新訂單狀態,之前的狀態是1
		orderExtApi.updateStatusNewPropagation(orderId, 3);
		// 產生消息--produce會檢測是否出現異常 當返回1時表示生產消息成功
		Response response = mqService.produce(orderId);
		if (response.getCode() != 1) {
			log.info("消息隊列生產消息異常:" + response.getErrorMsg())
			// 生產消息異常,重置狀態  等待下次重新執行
			orderExtApi.updateStatusNewPropagation(orderId, 1);
		}
		
	}
	public viod updateStatus(String orderId, Integer status) {
		orderDao.updateStatus(orderId, status);
	}
}

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM