LCN分布式事務管理(一)


前言

好久沒寫東西了,9月份換了份工作,一上來就忙的要死。根本沒時間學東西,好在新公司的新項目里面遇到了之前沒遇到過的難題。那遇到難題就要想辦法解決咯,一個請求,調用兩個服務,同時操作更新兩個數據庫。這就帶來事務不一致的問題了,分布式事務管理被強行拉出來了。導致原本兩個springboot的單體項目,必須要協同管理起來。剛好微服務也接觸過,小試牛刀咯。

 

框架介紹

LCN分布式事務框架其本身並不創建事務,而是基於對本地事務的協調從而達到事務一致性的效果

 

核心步驟


創建事務組
是指在事務發起方開始執行業務代碼之前先調用TxManager創建事務組對象,然后拿到事務標示GroupId的過程。

添加事務組
添加事務組是指參與方在執行完業務方法以后,將該模塊的事務信息添加通知給TxManager的操作。

關閉事務組
是指在發起方執行完業務代碼以后,將發起方執行結果狀態通知給TxManager的動作。當執行完關閉事務組的方法以后,TxManager將根據事務組信息來通知相應的參與模塊提交或回滾事務。

框架的特點

  1. 支持各種基於spring的db框架
  2. 兼容SpringCloud、Dubbo、motan
  3. 使用簡單,低依賴,代碼完全開源
  4. 基於切面的強一致性事務框架
  5. 高可用,模塊可以依賴RPC模塊做集群化,TxManager也可以做集群化
  6. 支持本地事務和分布式事務共存
  7. 支持事務補償機制,增加事務補償決策提醒
  8. 添加插件拓展機制

源碼目錄說明

  1. transaction-dubbo LCN dubbo rpc框架擴展支持
  2. transaction-springcloud LCN springcloud rpc框架擴展支持
  3. tx-client 是LCN核心tx模塊端控制框架
  4. tx-manager 是LCN 分布式事務協調器
  5. tx-plugins-db 是LCN 對關系型數據庫的插件支持
  6. tx-plugins-nodb 是LCN 對於無數據庫模塊的插件支持

地址:https://gitee.com/fengyuduke/my_open_resources/blob/master/tx-lcn-master.7z

本章只說微服務模式下的使用。

1.配置分布式事務協調器

只要配置三個地方:redis連接信息,eureka注冊中心,開放的端口。

eureka注冊中心一定要配置自己系統的地址。

#######################################txmanager-start#################################################
#服務端口
server.port=8899

#tx-manager不得修改
spring.application.name=tx-manager

spring.mvc.static-path-pattern=/**
spring.resources.static-locations=classpath:/static/
#######################################txmanager-end#################################################


#zookeeper地址
#spring.cloud.zookeeper.connect-string=127.0.0.1:2181
#spring.cloud.zookeeper.discovery.preferIpAddress = true

#eureka 地址
eureka.client.service-url.defaultZone=http://localhost:8083/eureka/
eureka.instance.prefer-ip-address=true

#######################################redis-start#################################################
#redis 配置文件,根據情況選擇集群或者單機模式

##redis 集群環境配置
##redis cluster
#spring.redis.cluster.nodes=127.0.0.1:7001,127.0.0.1:7002,127.0.0.1:7003
#spring.redis.cluster.commandTimeout=5000

##redis 單點環境配置
#redis
#redis主機地址
spring.redis.host=127.0.0.1
#redis主機端口
spring.redis.port=6379
#redis鏈接密碼
spring.redis.password=
spring.redis.pool.maxActive=10
spring.redis.pool.maxWait=-1
spring.redis.pool.maxIdle=5
spring.redis.pool.minIdle=0
spring.redis.timeout=0
#####################################redis-end###################################################




#######################################LCN-start#################################################
#業務模塊與TxManager之間通訊的最大等待時間(單位:秒)
#通訊時間是指:發起方與響應方之間完成一次的通訊時間。
#該字段代表的是Tx-Client模塊與TxManager模塊之間的最大通訊時間,超過該時間未響應本次請求失敗。
tm.transaction.netty.delaytime = 5

#業務模塊與TxManager之間通訊的心跳時間(單位:秒)
tm.transaction.netty.hearttime = 15

#存儲到redis下的數據最大保存時間(單位:秒)
#該字段僅代表的事務模塊數據的最大保存時間,補償數據會永久保存。
tm.redis.savemaxtime=30

#socket server Socket對外服務端口
#TxManager的LCN協議的端口
tm.socket.port=9999

#最大socket連接數
#TxManager最大允許的建立連接數量
tm.socket.maxconnection=100

#事務自動補償 (true:開啟,false:關閉)
# 說明:
# 開啟自動補償以后,必須要配置 tm.compensate.notifyUrl 地址,僅當tm.compensate.notifyUrl 在請求補償確認時返回success或者SUCCESS時,才會執行自動補償,否則不會自動補償。
# 關閉自動補償,當出現數據時也會 tm.compensate.notifyUrl 地址。
# 當tm.compensate.notifyUrl 無效時,不影響TxManager運行,僅會影響自動補償。
tm.compensate.auto=false

#事務補償記錄回調地址(rest api 地址,post json格式)
#請求補償是在開啟自動補償時才會請求的地址。請求分為兩種:1.補償決策,2.補償結果通知,可通過通過action參數區分compensate為補償請求、notify為補償通知。
#*注意當請求補償決策時,需要補償服務返回"SUCCESS"字符串以后才可以執行自動補償。
#請求補償結果通知則只需要接受通知即可。
#請求補償的樣例數據格式:
#{"groupId":"TtQxTwJP","action":"compensate","json":"{\"address\":\"133.133.5.100:8081\",\"className\":\"com.example.demo.service.impl.DemoServiceImpl\",\"currentTime\":1511356150413,\"data\":\"C5IBLWNvbS5leGFtcGxlLmRlbW8uc2VydmljZS5pbXBsLkRlbW9TZXJ2aWNlSW1wbAwSBHNhdmUbehBqYXZhLmxhbmcuT2JqZWN0GAAQARwjeg9qYXZhLmxhbmcuQ2xhc3MYABABJCo/cHVibGljIGludCBjb20uZXhhbXBsZS5kZW1vLnNlcnZpY2UuaW1wbC5EZW1vU2VydmljZUltcGwuc2F2ZSgp\",\"groupId\":\"TtQxTwJP\",\"methodStr\":\"public int com.example.demo.service.impl.DemoServiceImpl.save()\",\"model\":\"demo1\",\"state\":0,\"time\":36,\"txGroup\":{\"groupId\":\"TtQxTwJP\",\"hasOver\":1,\"isCompensate\":0,\"list\":[{\"address\":\"133.133.5.100:8899\",\"isCompensate\":0,\"isGroup\":0,\"kid\":\"wnlEJoSl\",\"methodStr\":\"public int com.example.demo.service.impl.DemoServiceImpl.save()\",\"model\":\"demo2\",\"modelIpAddress\":\"133.133.5.100:8082\",\"channelAddress\":\"/133.133.5.100:64153\",\"notify\":1,\"uniqueKey\":\"bc13881a5d2ab2ace89ae5d34d608447\"}],\"nowTime\":0,\"startTime\":1511356150379,\"state\":1},\"uniqueKey\":\"be6eea31e382f1f0878d07cef319e4d7\"}"}
#請求補償的返回數據樣例數據格式:
#SUCCESS
#請求補償結果通知的樣例數據格式:
#{"resState":true,"groupId":"TtQxTwJP","action":"notify"}
tm.compensate.notifyUrl=http://ip:port/path

#補償失敗,再次嘗試間隔(秒),最大嘗試次數3次,當超過3次即為補償失敗,失敗的數據依舊還會存在TxManager下。
tm.compensate.tryTime=30

#各事務模塊自動補償的時間上限(毫秒)
#指的是模塊執行自動超時的最大時間,該最大時間若過段會導致事務機制異常,該時間必須要模塊之間通訊的最大超過時間。
#例如,若模塊A與模塊B,請求超時的最大時間是5秒,則建議改時間至少大於5秒。
tm.compensate.maxWaitTime=5000
#######################################LCN-end#################################################




logging.level.com.codingapi=debug

 

2、服務中加入分布式事務管理(加入LCN事務管理)

  1. pom.xml中添加LCN框架的依賴
      1 <?xml version="1.0" encoding="UTF-8"?>
      2 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      3          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      4     <modelVersion>4.0.0</modelVersion>
      5 
      6     <groupId>com.prise</groupId>
      7     <artifactId>springcloud-lcn-demo</artifactId>
      8     <version>4.1.0</version>
      9     <packaging>pom</packaging>
     10 
     11     <name>springcloud-lcn-demo</name>
     12 
     13     <modules>
     14         <module>mybatis-demo/springcloud-mybatis-demo1</module>
     15         <module>mybatis-demo/springcloud-mybatis-demo2</module>
     16         <module>mybatis-demo/springcloud-mybatis-demo3</module>
     17     </modules>
     18 
     19 
     20     <parent>
     21         <groupId>org.springframework.boot</groupId>
     22         <artifactId>spring-boot-starter-parent</artifactId>
     23         <version>1.5.9.RELEASE</version>
     24         <relativePath/> <!-- lookup parent from repository -->
     25     </parent>
     26 
     27     <properties>
     28         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     29         <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
     30 
     31         <java.version>1.8</java.version>
     32         <maven.compile.source>1.7</maven.compile.source>
     33         <maven.compile.target>1.7</maven.compile.target>
     34         <spring-cloud.version>Edgware.RELEASE</spring-cloud.version>
     35         <!--lcn的版本-->
     36         <lcn.last.version>4.1.0</lcn.last.version>
     37 
     38     </properties>
     39 
     40     <dependencies>
     41         <!--LCN基於springcloud的分布式事務框架-->
     42         <dependency>
     43             <groupId>com.codingapi</groupId>
     44             <artifactId>transaction-springcloud</artifactId>
     45             <version>${lcn.last.version}</version>
     46             <exclusions>
     47                 <exclusion>
     48                     <groupId>org.slf4j</groupId>
     49                     <artifactId>*</artifactId>
     50                 </exclusion>
     51             </exclusions>
     52         </dependency>
     53         <!--LCN基於關系型數據模塊的封裝-->
     54         
     55         <dependency>
     56             <groupId>com.codingapi</groupId>
     57             <artifactId>tx-plugins-db</artifactId>
     58             <version>${lcn.last.version}</version>
     59             <exclusions>
     60                 <exclusion>
     61                     <groupId>org.slf4j</groupId>
     62                     <artifactId>*</artifactId>
     63                 </exclusion>
     64             </exclusions>
     65         </dependency>
     66         <!--spring-cloud注冊中心-->
     67         <dependency>
     68             <groupId>org.springframework.cloud</groupId>
     69             <artifactId>spring-cloud-starter-eureka</artifactId>
     70         </dependency>
     71         <dependency>
     72             <groupId>org.springframework.cloud</groupId>
     73             <artifactId>spring-cloud-starter-feign</artifactId>
     74         </dependency>
     75         <dependency>
     76             <groupId>org.springframework.boot</groupId>
     77             <artifactId>spring-boot-starter-test</artifactId>
     78             <scope>test</scope>
     79         </dependency>
     80 
     81     </dependencies>
     82 
     83     <dependencyManagement>
     84         <dependencies>
     85             <dependency>
     86                 <groupId>org.springframework.cloud</groupId>
     87                 <artifactId>spring-cloud-dependencies</artifactId>
     88                 <version>${spring-cloud.version}</version>
     89                 <type>pom</type>
     90                 <scope>import</scope>
     91             </dependency>
     92         </dependencies>
     93     </dependencyManagement>
     94 
     95     <build>
     96         <plugins>
     97             <plugin>
     98                 <groupId>org.apache.maven.plugins</groupId>
     99                 <artifactId>maven-compiler-plugin</artifactId>
    100                 <configuration>
    101                     <source>${maven.compile.source}</source>
    102                     <target>${maven.compile.target}</target>
    103                     <encoding>${project.build.sourceEncoding}</encoding>
    104                 </configuration>
    105             </plugin>
    106 
    107             <plugin>
    108                 <groupId>org.springframework.boot</groupId>
    109                 <artifactId>spring-boot-maven-plugin</artifactId>
    110             </plugin>
    111         </plugins>
    112     </build>
    113 
    114 
    115 </project>

    transaction-springcloud LCN springcloud rpc框架擴展支持,我們這里使用的是springcloud
    tx-plugins-db 是LCN 對關系型數據庫的插件支持,我們這里使用的是mysql數據庫.我是將三個微服務放在一個包里一起的,但是每個微服務都獨立一個端口和數據庫。

  2. 配置TXmanager的訪問地址和端口號
     1 #feign.hystrix.enabled=true
     2 
     3 spring.datasource.driver-class-name = com.mysql.jdbc.Driver
     4 spring.datasource.url= jdbc:mysql://localhost:3306/database0
     5 spring.datasource.username= root
     6 spring.datasource.password= 1234
     7 spring.datasource.initialize =  true
     8 init-db= true
     9 
    10 spring.application.name = demo1
    11 server.port = 8085
    12 #${random.int[9000,9999]}
    13 eureka.client.service-url.defaultZone=http://localhost:8083/eureka/
    14 
    15 #txmanager地址
    16 tm.manager.url=http://127.0.0.1:8899/tx/manager/
    17 
    18 logging.level.com.codingapi=debug
    19 
    20 spring.jpa.show-sql=true

     

  3. 分布式事務發起方
    需要實現TxManagerTxUrlService和TxManagerHttpRequestService這兩個接口,並作為bean注入到spring中。處理http請求和對服務器的連接。

     實現 TxManagerTxUrlService

    

 1 @Service
 2 public class TxManagerTxUrlServiceImpl implements TxManagerTxUrlService{
 3 
 4 
 5     @Value("${tm.manager.url}")
 6     private String url;
 7 
 8     @Override
 9     public String getTxUrl() {
10         System.out.println("load tm.manager.url ");
11         return url;
12     }
13 }

這個方法是為了獲得連接LCN事務管理器的。

實現 TxManagerHttpRequestService

 1 @Service
 2 public class TxManagerHttpRequestServiceImpl implements TxManagerHttpRequestService{
 3 
 4     @Override
 5     public String httpGet(String url) {
 6         System.out.println("httpGet-start");
 7         String res = HttpUtils.get(url);
 8         System.out.println("httpGet-end");
 9         return res;
10     }
11 
12     @Override
13     public String httpPost(String url, String params) {
14         System.out.println("httpPost-start");
15         String res = HttpUtils.post(url,params);
16         System.out.println("httpPost-end");
17         return res;
18     }
19 }

這個文件的作用是:作為事務的發起者,要開啟一個事務組,要主動連接上分布式事務管理框架。

加入分布式事務注解@TxTransaction(isStart = true),開啟分布式事務。isStart = true聲明為分布式事務發起方

 1 @Service
 2 public class DemoServiceImpl implements DemoService {
 3 
 4     @Autowired
 5     private Demo2Feign demo2Feign;
 6     @Autowired
 7     private Demo3Feign demo3Feign;
 8 
 9     @Autowired
10     private UserMapper userMapper;
11 
12     private static final Logger logger = LoggerFactory.getLogger(DemoServiceImpl.class);
13 
14     @Override
15     public List<User> list() {
16         return userMapper.findAll();
17     }
18 
19     @Override
20     @TxTransaction(isStart = true)
21     @Transactional
22     public int save() throws Exception {
23 
24         int rs1 = userMapper.save("zhangsan", "shanghai");
25         logger.info("本地服務數據插入成功");
26          int rs2 = demo2Feign.save();
27 //        Integer rs2 = restTemplate.getForObject("http://127.0.0.1:8087/demo/save", Integer.class);
28 //        System.out.println(rs2); 
29         logger.info("遠程服務2號數據插入成功"+rs2);
30 //        if(0 ==rs2) {
31 //            throw new RuntimeException("遠程訪問出錯,全部回滾");
32 //        }
33          int rs3 = demo3Feign.save();
34          logger.info("遠程服務3號數據插入成功");
35          logger.info("插入" + (rs1 + rs2 + rs3) + "條記錄");
36          logger.info("制造異常");
37          int v = 100 / 0;
38 //        if(1==rs2 ) {
39 //            throw new RuntimeException("本地出錯,全部回滾");
40 //        }
41         return rs1 + rs2;
42     }
43 }

如上代碼執行完成以后所有參與此分布式事務模塊都將回滾事務。

 

分布式事務被調用方

需要實現TxManagerTxUrlService個接口,並作為bean注入到spring中。處理對服務器的連接。

 1 @Service
 2 public class TxManagerTxUrlServiceImpl implements TxManagerTxUrlService{
 3 
 4 
 5     @Value("${tm.manager.url}")
 6     private String url;
 7 
 8     @Override
 9     public String getTxUrl() {
10         System.out.println("load tm.manager.url ");
11         return url;
12     }
13 }

 

加入分布式事務注解@TxTransaction,或者實現ITxTransaction接口,開啟分布式事務。等待起調方發起事務

 1 @Service
 2 public class DemoServiceImpl implements DemoService,ITxTransaction{
 3 
 4     @Autowired
 5     private PeopleMapper peopleMapper;
 6 
 7     private static final Logger logger = LoggerFactory.getLogger(DemoServiceImpl.class);
 8 
 9 
10     @Override
11     public List<People> list() {
12         return peopleMapper.findAll();
13     }
14 
15 
16     @Override
17     @Transactional
18     public int save() {
19         try {
20         
21         int rs = peopleMapper.save("男", "22");
22         logger.info("插入" + rs + "條記錄");
23         int a = 1/1;
24         System.out.println(a);
25         return rs;
26         }catch (Exception e) {
27             logger.error(e.getMessage());
28             return 0;
29         }
30     }
31 }

 

說明:在使用LCN分布式事務時,只需要將事務的開始方法添加@TxTransaction(isStart=true)注解即可,在參與方添加@TxTransaction或者實現ITxTransaction接口即可。

代碼下載地址:https://gitee.com/fengyuduke/my_open_resources/blob/master/demo.zip

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM