前言
好久沒寫東西了,9月份換了份工作,一上來就忙的要死。根本沒時間學東西,好在新公司的新項目里面遇到了之前沒遇到過的難題。那遇到難題就要想辦法解決咯,一個請求,調用兩個服務,同時操作更新兩個數據庫。這就帶來事務不一致的問題了,分布式事務管理被強行拉出來了。導致原本兩個springboot的單體項目,必須要協同管理起來。剛好微服務也接觸過,小試牛刀咯。
框架介紹
LCN分布式事務框架其本身並不創建事務,而是基於對本地事務的協調從而達到事務一致性的效果
核心步驟
創建事務組
是指在事務發起方開始執行業務代碼之前先調用TxManager創建事務組對象,然后拿到事務標示GroupId的過程。
添加事務組
添加事務組是指參與方在執行完業務方法以后,將該模塊的事務信息添加通知給TxManager的操作。
關閉事務組
是指在發起方執行完業務代碼以后,將發起方執行結果狀態通知給TxManager的動作。當執行完關閉事務組的方法以后,TxManager將根據事務組信息來通知相應的參與模塊提交或回滾事務。
框架的特點
- 支持各種基於spring的db框架
- 兼容SpringCloud、Dubbo、motan
- 使用簡單,低依賴,代碼完全開源
- 基於切面的強一致性事務框架
- 高可用,模塊可以依賴RPC模塊做集群化,TxManager也可以做集群化
- 支持本地事務和分布式事務共存
- 支持事務補償機制,增加事務補償決策提醒
- 添加插件拓展機制
源碼目錄說明
- transaction-dubbo LCN dubbo rpc框架擴展支持
- transaction-springcloud LCN springcloud rpc框架擴展支持
- tx-client 是LCN核心tx模塊端控制框架
- tx-manager 是LCN 分布式事務協調器
- tx-plugins-db 是LCN 對關系型數據庫的插件支持
- tx-plugins-nodb 是LCN 對於無數據庫模塊的插件支持
地址:https://gitee.com/fengyuduke/my_open_resources/blob/master/tx-lcn-master.7z
本章只說微服務模式下的使用。
1.配置分布式事務協調器
只要配置三個地方:redis連接信息,eureka注冊中心,開放的端口。
eureka注冊中心一定要配置自己系統的地址。
#######################################txmanager-start################################################# #服務端口 server.port=8899 #tx-manager不得修改 spring.application.name=tx-manager spring.mvc.static-path-pattern=/** spring.resources.static-locations=classpath:/static/ #######################################txmanager-end################################################# #zookeeper地址 #spring.cloud.zookeeper.connect-string=127.0.0.1:2181 #spring.cloud.zookeeper.discovery.preferIpAddress = true #eureka 地址 eureka.client.service-url.defaultZone=http://localhost:8083/eureka/ eureka.instance.prefer-ip-address=true #######################################redis-start################################################# #redis 配置文件,根據情況選擇集群或者單機模式 ##redis 集群環境配置 ##redis cluster #spring.redis.cluster.nodes=127.0.0.1:7001,127.0.0.1:7002,127.0.0.1:7003 #spring.redis.cluster.commandTimeout=5000 ##redis 單點環境配置 #redis #redis主機地址 spring.redis.host=127.0.0.1 #redis主機端口 spring.redis.port=6379 #redis鏈接密碼 spring.redis.password= spring.redis.pool.maxActive=10 spring.redis.pool.maxWait=-1 spring.redis.pool.maxIdle=5 spring.redis.pool.minIdle=0 spring.redis.timeout=0 #####################################redis-end################################################### #######################################LCN-start################################################# #業務模塊與TxManager之間通訊的最大等待時間(單位:秒) #通訊時間是指:發起方與響應方之間完成一次的通訊時間。 #該字段代表的是Tx-Client模塊與TxManager模塊之間的最大通訊時間,超過該時間未響應本次請求失敗。 tm.transaction.netty.delaytime = 5 #業務模塊與TxManager之間通訊的心跳時間(單位:秒) tm.transaction.netty.hearttime = 15 #存儲到redis下的數據最大保存時間(單位:秒) #該字段僅代表的事務模塊數據的最大保存時間,補償數據會永久保存。 tm.redis.savemaxtime=30 #socket server Socket對外服務端口 #TxManager的LCN協議的端口 tm.socket.port=9999 #最大socket連接數 #TxManager最大允許的建立連接數量 tm.socket.maxconnection=100 #事務自動補償 (true:開啟,false:關閉) # 說明: # 開啟自動補償以后,必須要配置 tm.compensate.notifyUrl 地址,僅當tm.compensate.notifyUrl 在請求補償確認時返回success或者SUCCESS時,才會執行自動補償,否則不會自動補償。 # 關閉自動補償,當出現數據時也會 tm.compensate.notifyUrl 地址。 # 當tm.compensate.notifyUrl 無效時,不影響TxManager運行,僅會影響自動補償。 tm.compensate.auto=false #事務補償記錄回調地址(rest api 地址,post json格式) #請求補償是在開啟自動補償時才會請求的地址。請求分為兩種:1.補償決策,2.補償結果通知,可通過通過action參數區分compensate為補償請求、notify為補償通知。 #*注意當請求補償決策時,需要補償服務返回"SUCCESS"字符串以后才可以執行自動補償。 #請求補償結果通知則只需要接受通知即可。 #請求補償的樣例數據格式: #{"groupId":"TtQxTwJP","action":"compensate","json":"{\"address\":\"133.133.5.100:8081\",\"className\":\"com.example.demo.service.impl.DemoServiceImpl\",\"currentTime\":1511356150413,\"data\":\"C5IBLWNvbS5leGFtcGxlLmRlbW8uc2VydmljZS5pbXBsLkRlbW9TZXJ2aWNlSW1wbAwSBHNhdmUbehBqYXZhLmxhbmcuT2JqZWN0GAAQARwjeg9qYXZhLmxhbmcuQ2xhc3MYABABJCo/cHVibGljIGludCBjb20uZXhhbXBsZS5kZW1vLnNlcnZpY2UuaW1wbC5EZW1vU2VydmljZUltcGwuc2F2ZSgp\",\"groupId\":\"TtQxTwJP\",\"methodStr\":\"public int com.example.demo.service.impl.DemoServiceImpl.save()\",\"model\":\"demo1\",\"state\":0,\"time\":36,\"txGroup\":{\"groupId\":\"TtQxTwJP\",\"hasOver\":1,\"isCompensate\":0,\"list\":[{\"address\":\"133.133.5.100:8899\",\"isCompensate\":0,\"isGroup\":0,\"kid\":\"wnlEJoSl\",\"methodStr\":\"public int com.example.demo.service.impl.DemoServiceImpl.save()\",\"model\":\"demo2\",\"modelIpAddress\":\"133.133.5.100:8082\",\"channelAddress\":\"/133.133.5.100:64153\",\"notify\":1,\"uniqueKey\":\"bc13881a5d2ab2ace89ae5d34d608447\"}],\"nowTime\":0,\"startTime\":1511356150379,\"state\":1},\"uniqueKey\":\"be6eea31e382f1f0878d07cef319e4d7\"}"} #請求補償的返回數據樣例數據格式: #SUCCESS #請求補償結果通知的樣例數據格式: #{"resState":true,"groupId":"TtQxTwJP","action":"notify"} tm.compensate.notifyUrl=http://ip:port/path #補償失敗,再次嘗試間隔(秒),最大嘗試次數3次,當超過3次即為補償失敗,失敗的數據依舊還會存在TxManager下。 tm.compensate.tryTime=30 #各事務模塊自動補償的時間上限(毫秒) #指的是模塊執行自動超時的最大時間,該最大時間若過段會導致事務機制異常,該時間必須要模塊之間通訊的最大超過時間。 #例如,若模塊A與模塊B,請求超時的最大時間是5秒,則建議改時間至少大於5秒。 tm.compensate.maxWaitTime=5000 #######################################LCN-end################################################# logging.level.com.codingapi=debug
2、服務中加入分布式事務管理(加入LCN事務管理)
- pom.xml中添加LCN框架的依賴
1 <?xml version="1.0" encoding="UTF-8"?> 2 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 3 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4 <modelVersion>4.0.0</modelVersion> 5 6 <groupId>com.prise</groupId> 7 <artifactId>springcloud-lcn-demo</artifactId> 8 <version>4.1.0</version> 9 <packaging>pom</packaging> 10 11 <name>springcloud-lcn-demo</name> 12 13 <modules> 14 <module>mybatis-demo/springcloud-mybatis-demo1</module> 15 <module>mybatis-demo/springcloud-mybatis-demo2</module> 16 <module>mybatis-demo/springcloud-mybatis-demo3</module> 17 </modules> 18 19 20 <parent> 21 <groupId>org.springframework.boot</groupId> 22 <artifactId>spring-boot-starter-parent</artifactId> 23 <version>1.5.9.RELEASE</version> 24 <relativePath/> <!-- lookup parent from repository --> 25 </parent> 26 27 <properties> 28 <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 29 <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> 30 31 <java.version>1.8</java.version> 32 <maven.compile.source>1.7</maven.compile.source> 33 <maven.compile.target>1.7</maven.compile.target> 34 <spring-cloud.version>Edgware.RELEASE</spring-cloud.version> 35 <!--lcn的版本--> 36 <lcn.last.version>4.1.0</lcn.last.version> 37 38 </properties> 39 40 <dependencies> 41 <!--LCN基於springcloud的分布式事務框架--> 42 <dependency> 43 <groupId>com.codingapi</groupId> 44 <artifactId>transaction-springcloud</artifactId> 45 <version>${lcn.last.version}</version> 46 <exclusions> 47 <exclusion> 48 <groupId>org.slf4j</groupId> 49 <artifactId>*</artifactId> 50 </exclusion> 51 </exclusions> 52 </dependency> 53 <!--LCN基於關系型數據模塊的封裝--> 54 55 <dependency> 56 <groupId>com.codingapi</groupId> 57 <artifactId>tx-plugins-db</artifactId> 58 <version>${lcn.last.version}</version> 59 <exclusions> 60 <exclusion> 61 <groupId>org.slf4j</groupId> 62 <artifactId>*</artifactId> 63 </exclusion> 64 </exclusions> 65 </dependency> 66 <!--spring-cloud注冊中心--> 67 <dependency> 68 <groupId>org.springframework.cloud</groupId> 69 <artifactId>spring-cloud-starter-eureka</artifactId> 70 </dependency> 71 <dependency> 72 <groupId>org.springframework.cloud</groupId> 73 <artifactId>spring-cloud-starter-feign</artifactId> 74 </dependency> 75 <dependency> 76 <groupId>org.springframework.boot</groupId> 77 <artifactId>spring-boot-starter-test</artifactId> 78 <scope>test</scope> 79 </dependency> 80 81 </dependencies> 82 83 <dependencyManagement> 84 <dependencies> 85 <dependency> 86 <groupId>org.springframework.cloud</groupId> 87 <artifactId>spring-cloud-dependencies</artifactId> 88 <version>${spring-cloud.version}</version> 89 <type>pom</type> 90 <scope>import</scope> 91 </dependency> 92 </dependencies> 93 </dependencyManagement> 94 95 <build> 96 <plugins> 97 <plugin> 98 <groupId>org.apache.maven.plugins</groupId> 99 <artifactId>maven-compiler-plugin</artifactId> 100 <configuration> 101 <source>${maven.compile.source}</source> 102 <target>${maven.compile.target}</target> 103 <encoding>${project.build.sourceEncoding}</encoding> 104 </configuration> 105 </plugin> 106 107 <plugin> 108 <groupId>org.springframework.boot</groupId> 109 <artifactId>spring-boot-maven-plugin</artifactId> 110 </plugin> 111 </plugins> 112 </build> 113 114 115 </project>
transaction-springcloud LCN springcloud rpc框架擴展支持,我們這里使用的是springcloud
tx-plugins-db 是LCN 對關系型數據庫的插件支持,我們這里使用的是mysql數據庫.我是將三個微服務放在一個包里一起的,但是每個微服務都獨立一個端口和數據庫。 - 配置TXmanager的訪問地址和端口號
1 #feign.hystrix.enabled=true 2 3 spring.datasource.driver-class-name = com.mysql.jdbc.Driver 4 spring.datasource.url= jdbc:mysql://localhost:3306/database0 5 spring.datasource.username= root 6 spring.datasource.password= 1234 7 spring.datasource.initialize = true 8 init-db= true 9 10 spring.application.name = demo1 11 server.port = 8085 12 #${random.int[9000,9999]} 13 eureka.client.service-url.defaultZone=http://localhost:8083/eureka/ 14 15 #txmanagerå°å 16 tm.manager.url=http://127.0.0.1:8899/tx/manager/ 17 18 logging.level.com.codingapi=debug 19 20 spring.jpa.show-sql=true
-
分布式事務發起方
需要實現TxManagerTxUrlService和TxManagerHttpRequestService這兩個接口,並作為bean注入到spring中。處理http請求和對服務器的連接。
實現 TxManagerTxUrlService
1 @Service 2 public class TxManagerTxUrlServiceImpl implements TxManagerTxUrlService{ 3 4 5 @Value("${tm.manager.url}") 6 private String url; 7 8 @Override 9 public String getTxUrl() { 10 System.out.println("load tm.manager.url "); 11 return url; 12 } 13 }
這個方法是為了獲得連接LCN事務管理器的。
實現 TxManagerHttpRequestService
1 @Service 2 public class TxManagerHttpRequestServiceImpl implements TxManagerHttpRequestService{ 3 4 @Override 5 public String httpGet(String url) { 6 System.out.println("httpGet-start"); 7 String res = HttpUtils.get(url); 8 System.out.println("httpGet-end"); 9 return res; 10 } 11 12 @Override 13 public String httpPost(String url, String params) { 14 System.out.println("httpPost-start"); 15 String res = HttpUtils.post(url,params); 16 System.out.println("httpPost-end"); 17 return res; 18 } 19 }
這個文件的作用是:作為事務的發起者,要開啟一個事務組,要主動連接上分布式事務管理框架。
加入分布式事務注解@TxTransaction(isStart = true),開啟分布式事務。isStart = true聲明為分布式事務發起方
1 @Service 2 public class DemoServiceImpl implements DemoService { 3 4 @Autowired 5 private Demo2Feign demo2Feign; 6 @Autowired 7 private Demo3Feign demo3Feign; 8 9 @Autowired 10 private UserMapper userMapper; 11 12 private static final Logger logger = LoggerFactory.getLogger(DemoServiceImpl.class); 13 14 @Override 15 public List<User> list() { 16 return userMapper.findAll(); 17 } 18 19 @Override 20 @TxTransaction(isStart = true) 21 @Transactional 22 public int save() throws Exception { 23 24 int rs1 = userMapper.save("zhangsan", "shanghai"); 25 logger.info("本地服務數據插入成功"); 26 int rs2 = demo2Feign.save(); 27 // Integer rs2 = restTemplate.getForObject("http://127.0.0.1:8087/demo/save", Integer.class); 28 // System.out.println(rs2); 29 logger.info("遠程服務2號數據插入成功"+rs2); 30 // if(0 ==rs2) { 31 // throw new RuntimeException("遠程訪問出錯,全部回滾"); 32 // } 33 int rs3 = demo3Feign.save(); 34 logger.info("遠程服務3號數據插入成功"); 35 logger.info("插入" + (rs1 + rs2 + rs3) + "條記錄"); 36 logger.info("制造異常"); 37 int v = 100 / 0; 38 // if(1==rs2 ) { 39 // throw new RuntimeException("本地出錯,全部回滾"); 40 // } 41 return rs1 + rs2; 42 } 43 }
如上代碼執行完成以后所有參與此分布式事務模塊都將回滾事務。
分布式事務被調用方
需要實現TxManagerTxUrlService個接口,並作為bean注入到spring中。處理對服務器的連接。
1 @Service 2 public class TxManagerTxUrlServiceImpl implements TxManagerTxUrlService{ 3 4 5 @Value("${tm.manager.url}") 6 private String url; 7 8 @Override 9 public String getTxUrl() { 10 System.out.println("load tm.manager.url "); 11 return url; 12 } 13 }
加入分布式事務注解@TxTransaction,或者實現ITxTransaction接口,開啟分布式事務。等待起調方發起事務
1 @Service 2 public class DemoServiceImpl implements DemoService,ITxTransaction{ 3 4 @Autowired 5 private PeopleMapper peopleMapper; 6 7 private static final Logger logger = LoggerFactory.getLogger(DemoServiceImpl.class); 8 9 10 @Override 11 public List<People> list() { 12 return peopleMapper.findAll(); 13 } 14 15 16 @Override 17 @Transactional 18 public int save() { 19 try { 20 21 int rs = peopleMapper.save("男", "22"); 22 logger.info("插入" + rs + "條記錄"); 23 int a = 1/1; 24 System.out.println(a); 25 return rs; 26 }catch (Exception e) { 27 logger.error(e.getMessage()); 28 return 0; 29 } 30 } 31 }
說明:在使用LCN分布式事務時,只需要將事務的開始方法添加@TxTransaction(isStart=true)注解即可,在參與方添加@TxTransaction或者實現ITxTransaction接口即可。
代碼下載地址:https://gitee.com/fengyuduke/my_open_resources/blob/master/demo.zip
