組件版本關系
Spring Cloud Alibaba Version | Sentinel Version | Nacos Version | RocketMQ Version | Dubbo Version | Seata Version |
---|---|---|---|---|---|
2.2.1.RELEASE or 2.1.2.RELEASE or 2.0.2.RELEASE |
1.7.1 |
1.2.1 |
4.4.0 |
2.7.6 |
1.2.0 |
2.2.0.RELEASE |
1.7.1 |
1.1.4 |
4.4.0 |
2.7.4.1 |
1.0.0 |
2.1.1.RELEASE or 2.0.1.RELEASE or 1.5.1.RELEASE |
1.7.0 |
1.1.4 |
4.4.0 |
2.7.3 |
0.9.0 |
2.1.0.RELEASE or 2.0.0.RELEASE or 1.5.0.RELEASE |
1.6.3 |
1.1.1 |
4.4.0 |
2.7.3 |
0.7.1 |
畢業版本依賴關系(推薦使用)
Spring Cloud Version | Spring Cloud Alibaba Version | Spring Boot Version |
---|---|---|
Spring Cloud Hoxton.SR3 |
2.2.1.RELEASE |
2.2.5.RELEASE |
Spring Cloud Hoxton.RELEASE |
2.2.0.RELEASE |
2.2.X.RELEASE |
Spring Cloud Greenwich |
2.1.2.RELEASE |
2.1.X.RELEASE |
Spring Cloud Finchley |
2.0.2.RELEASE |
2.0.X.RELEASE |
Spring Cloud Edgware |
1.5.1.RELEASE(停止維護,建議升級) |
1.5.X.RELEASE |
依賴管理
Spring Cloud Alibaba BOM 包含了它所使用的所有依賴的版本。
RELEASE 版本
Spring Cloud Hoxton
如果需要使用 Spring Cloud Hoxton 版本,請在 dependencyManagement 中添加如下內容
<dependency>
<groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-alibaba-dependencies</artifactId> <version>2.2.1.RELEASE</version> <type>pom</type> <scope>import</scope> </dependency>
Spring Cloud Greenwich
如果需要使用 Spring Cloud Greenwich 版本,請在 dependencyManagement 中添加如下內容
<dependency>
<groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-alibaba-dependencies</artifactId> <version>2.1.2.RELEASE</version> <type>pom</type> <scope>import</scope> </dependency>
Spring Cloud Finchley
如果需要使用 Spring Cloud Finchley 版本,請在 dependencyManagement 中添加如下內容
<dependency>
<groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-alibaba-dependencies</artifactId> <version>2.0.2.RELEASE</version> <type>pom</type> <scope>import</scope> </dependency>
Spring Cloud Edgware
如果需要使用 Spring Cloud Edgware 版本,請在 dependencyManagement 中添加如下內容
<dependency>
<groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-alibaba-dependencies</artifactId> <version>1.5.1.RELEASE</version> <type>pom</type> <scope>import</scope> </dependency>
服務注冊與發現nacos
何時選用哪種模式?
一般來說,如果不需要存儲服務級別的信息且服務實例是通過nacos-client注冊,並能夠保持心跳上報,那么就可以選擇AP模式。當前主流的服務如Spring Cloud 和 Dubbo服務,都適用於AP模式,AP模式為了服務的可能性而減弱了一致性,因此AP模式下只支持注冊臨時實例。
如果需要在服務級別編輯或者存儲配置信息,那么CP是必須的,K8S服務和DNS服務則適用於CP模式。CP模式下支持注冊持久化實例,此時則是以Raft協議為集群運行模式,該模式下注冊實例之前必須先注冊服務,如果服務不存在,則會返回錯誤。
curl -X PUT '$NACOS_SERVER:8848/nacos/v1/ns/operator/switches?entry=serverMode&value=CP'
#false為永久實例,true表示臨時實例開啟,注冊為臨時實例 spring.cloud.nacos.discovery.ephemeral=false
Nacos產生的背景
Nacos分布式注冊與發現功能|分布式配置中心
產生背景rpc遠程調用中,服務的url的治理
Rpc的遠程調用框架 HttpClient、gprc、dubbo、rest、openfeign等。
傳統的rpc遠程調用中存在哪些問題
1、超時的問題
2、安全的問題
3、服務與服務之間URL地址管理
在我們微服務架構通訊,服務之間依賴關系非常大,如果通過傳統的方式管理我們服務的url地址的情況下,一旦地址發生變化的情況下,還需要人工修改rpc遠程調用地址。
每個服務的url管理地址發出復雜,所以這是我們采用服務url治理技術,可以實現對我們整個實現動態服務注冊與發現、本地負載均衡、容錯等。
服務治理基本的概念
服務治理概念:
在RPC遠程調用過程中,服務與服務之間依賴關系非常大,服務Url地址管理非常復雜,所以這時候需要對我們服務的url實現治理,通過服務治理可以實現服務注冊與發現、負載均衡、容錯等。
rpc遠程調用中,地址中 域名和端口號/調用的方法名稱:
域名和端口號/調用的方法名稱。
服務注冊中心的概念
每次調用該服務如果地址直接寫死的話,一旦接口發生變化的情況下,這時候需要重新發布版本才可以接口調用地址,所以需要一個注冊中心統一管理我們的服務注冊與發現。
注冊中心:我們的服務注冊到我們注冊中心,key為服務名稱、value為該服務調用地址,該類型為集合類型。Eureka、consul、zookeeper、nacos等。
服務注冊:我們生產者項目啟動的時候,會將當前服務自己的信息地址注冊到注冊中心。
服務發現: 消費者從我們的注冊中心上獲取生產者調用的地址(集合),在使用負載均衡的策略獲取集群中某個地址實現本地rpc遠程調用。
微服務調用接口常用名詞
生產者:提供接口被其他服務調用
消費者:調用生產者接口實現消費
服務注冊:將當前服務地址自動注冊到 Nacos 服務端
服務發現:動態感知和刷新某個實例的服務列表
服務注冊原理實現:
1、生產者啟動的時候key=服務的名稱 value=ip:端口號 注冊到注冊中心上。如:member 127.0.0.1:8080
2、注冊存放服務地址列表類型:key唯一,列表是list集合。Map<Key,List(String)>
{
member:["127.0.0.1.8080","127.0.0.1.8081"]
}
3、消費者從注冊中心上根據服務名稱查詢服務地址列表(集合)
member=["127.0.0.1.8080","127.0.0.1.8081"]
4、消費者獲取到集群列表之后,采用負載均衡器選擇一個地址實現rpc遠程調用。
Nacos的基本的介紹
Nacos可以實現分布式服務注冊與發現/分布式配置中心框架。
官網的介紹: https://nacos.io/zh-cn/docs/what-is-nacos.html
Nacos的環境的准備
Nacos可以在linux/windows/Mac版本上都可以安裝
官方安裝地址:https://nacos.io/zh-cn/docs/quick-start.html
Nacos整合SpringCloud
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.0.RELEASE</version> </parent> <dependencies> <!-- springboot 整合web組件--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency>
<!-- nacos注冊組件--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId> <version>0.2.2.RELEASE</version> </dependency> </dependencies>
提供者
spring: application: name: member-service cloud: nacos: discovery: server-addr: 127.0.0.1:8848 server: port: 8081
消費者
spring: application: name: order-service cloud: nacos: discovery: server-addr: 127.0.0.1:8848 server: port: 8090
package com.lvym.order.service; import com.lvym.order.loadbalance.LoadBalace; import jdk.internal.org.objectweb.asm.tree.LdcInsnNode; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.cloud.client.ServiceInstance; import org.springframework.cloud.client.discovery.DiscoveryClient; import org.springframework.cloud.client.loadbalancer.LoadBalancerClient; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.client.RestTemplate; import java.io.IOException; import java.net.URI; import java.util.List; @RestController public class OrderService { @Autowired private DiscoveryClient discoveryClient; // @RequestMapping("/getOrderToMemberInfo") // public Object getOrderToMemberInfo(){ // List<ServiceInstance> instances = discoveryClient.getInstances("menber-service"); // System.out.println(">>>>>>>>>>>>>>>>>>"+instances); // List<String> services = discoveryClient.getServices(); // System.out.println(">>>>>>>>>>>services>>>>>>>"+services); // return instances.get(0); // } @Autowired private RestTemplate restTemplate; @Autowired private LoadBalace loadBalace; @RequestMapping("/getOrderToMemberInfo") public String getOrderToMemberInfo(){ //獲取服務列表 List<ServiceInstance> instances = discoveryClient.getInstances("menber-service"); // ServiceInstance serviceInstance = instances.get(0); //自定義輪詢 ServiceInstance singleAddres = loadBalace.getSingleAddres(instances); String result = restTemplate.getForObject(singleAddres.getUri()+"/getMemberInfo", String.class); return "訂單調用會員,"+result; } /** * ribbon + @LoadBalanced * @return */ @RequestMapping("/getOrderToRibbonMemberInfo") public String getOrderToRibbonMemberInfo(){ String result = restTemplate.getForObject("http://menber-service/getMemberInfo", String.class); return "訂單調用會員,"+result; } /*
需要手動注入
@Bean
@LoadBalanced
public RestTemplate restTemplate(){
return new RestTemplate();
}
*/ @Autowired private LoadBalancerClient loadBalancerClient; /** * LoadBalancerClient - @LoadBalanced * @return */ @RequestMapping("/getOrderToLoadBalancerClientMemberInfo") public Object getOrderToLoadBalancerClientMemberInfo(){ URI uri = loadBalancerClient.choose("menber-service").getUri(); String result = restTemplate.getForObject(uri+"/getMemberInfo", String.class); return "訂單調用會員,"+result; } }
package com.lvym.order.loadbalance; import org.springframework.cloud.client.ServiceInstance; import java.util.List; public interface LoadBalace { ServiceInstance getSingleAddres(List<ServiceInstance> serviceInstance); }
package com.lvym.order.loadbalance; import org.springframework.cloud.client.ServiceInstance; import org.springframework.stereotype.Component; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; @Component public class RotationLoadBalance implements LoadBalace { private AtomicInteger atomicInteger=new AtomicInteger(0);//記錄訪問次數 @Override public ServiceInstance getSingleAddres(List<ServiceInstance> serviceInstance) { int index = atomicInteger.incrementAndGet() % serviceInstance.size(); return serviceInstance.get(index); } }
Nacos的集群部署
1.准備三台機器,並把nacos-server-1.3.1.tar.gz上傳到機器
2.解壓,配置config
application.properties
### Default web server port: server.port=8847
#Linux的默認需要配置數據庫
spring.datasource.platform=mysql
### Count of DB: 在這個配置文件的MySQL數量,在這里也可配置其他機器的MySQL
db.num=1
### Connect URL of DB:
db.url.0=jdbc:mysql://192.168.146.110:12345/nacos_config?characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useUnicode=true&useSSL=false&serverTimezone=UTC
db.user=root
db.password=123456
cluster.conf 這個IP不能寫127.0.0.1,必須是Linux命令hostname -i能夠識別的IP
192.168.146.110:8847 192.168.146.111:8848 192.168.146.112:8849
另外兩台做出相應修改
3.啟動
4.進入nacos管理界面
CAP原則又稱CAP定理,指的是在一個分布式系統中,一致性(Consistency)、可用性(Availability)、分區容錯性(Partition tolerance)。CAP 原則指的是,這三個要素最多只能同時實現兩點,不可能三者兼顧。
Nacos與Eureka區別
1、Eureka采用ap模式形式實現注冊中心
2、Nacos默認采用ap模式。在1.0版本之后采用ap+cp模式混合實現注冊中心。
Eureka與Nacos底層實現集群協議區別
1、去中心化對等。相互注冊
2、Raft協議實現集群產生領導角色。
到底什么是分布式一致性協議的算法
分布式系統一致性算法 應用於系統軟件實現集群保持每個節點數據的同步性
保持我們的集群中每個節點的數據的一致性的問題,專業的術語分布式一致性的算法。
場景:Redis集群、nacos集群、mongdb集群等
Nacos與Zookeeper區別
1、Zookeeper采用cp模式形式實現注冊中心
2、Nacos默認采用ap模式。在1.0版本之后采用ap+cp模式混合實現注冊中心。
Zookeeper與Nacos底層實現集群協議區別
1、Zab協議集群,中心化思想集群模式。
2、Raft協議實現集群產生領導角色。
Zab協議集群原理
我們在分布式系統,存在多個系統之間的集群保持數據一致性,采用CP一致性算法保持數據的一致性問題。
Zookeeper基於ZAP協議實現保持每個節點的數據同步的問題,中心化思想集群模式;
分為領導和跟隨角色。
在程序中如何成為 某個節點能力比較強:
對每個節點配置一個myid或者serverid還有數值越大表示能力越強 或者隨機時間。
整個集群中為了保持數據的一致性的問題,必須滿足大多數情況>n/2+1 可運行的節點環境下才可以使用。
ZAP的協議實現原理事通過比較myid myid誰最大誰就是為可能是領導角色,只要滿足過半的機制就可以成為領導角色,后來啟動的節點不會參與選舉的。
Zab協議如何保持數據的一致性問題?
所有寫的請求統一交給我們的領導角色實現,領導角色寫完數據之后,領導角色再將 數據同步給每個節點。
注意:數據之間同步采用2pc兩個階段提交協議。
第一階段:攜帶zxid發送給每個Follower,詢問是否同步數據,Follower回饋。
第二階段:攜帶zxid發送給每個Follower,詢問是否提交數據,Follower回饋。
選舉過程:
先去比較zxid zxid誰最大誰就是為領導角色;
如果zxid相等的情況下,myid誰最大誰就為領導角色;
Raft協議選舉的基本概念
在Raft協議算法中分為角色|名詞:
1、狀態:分為三種 跟隨者、競選者(候選人)、領導角色 。 三台機器似乎只有領導者和候選人
2、大多數:>=n/2+1
3、任期:每次選舉一個新的領導角色 任期都會 +1
4、競選者誰的票數最多,誰就是為領導角色
多個競選者,產生的票數都一樣,這到底誰是領導角色,服務器集群是偶數的情況下。總之票數相同就重新選舉。
SpringCloud負載均衡器說明
在SpringCloud第一代中使用Ribbon、SpringCloud第二代中直接采用自研發loadbalancer即可,默認使用的Ribbon。
ribbon:
//需要手動注入
@Bean @LoadBalanced public RestTemplate restTemplate() { return new RestTemplate(); }
/** * ribbon + @LoadBalanced * @return */ @RequestMapping("/getOrderToRibbonMemberInfo") public String getOrderToRibbonMemberInfo(){ String result = restTemplate.getForObject("http://menber-service/getMemberInfo", String.class); return "訂單調用會員,"+result; }
loadbalancer:
@Autowired private LoadBalancerClient loadBalancerClient; /** * LoadBalancerClient - @LoadBalanced * @return */ @RequestMapping("/getOrderToLoadBalancerClientMemberInfo") public Object getOrderToLoadBalancerClientMemberInfo(){ URI uri = loadBalancerClient.choose("menber-service").getUri(); String result = restTemplate.getForObject(uri+"/getMemberInfo", String.class); return "訂單調用會員,"+result; }
本地負載均衡與Nginx 的區別
本地負載均衡
本地負載均衡器基本的概念:我們的消費者服務從我們的注冊中心獲取到集群地址列表,緩存到本地,然后本地采用負載均衡策略(輪訓、隨機、權重等),實現本地的rpc遠程的。
本地負載均衡器有哪些呢:自己寫、ribbon SpringleCloud第一代中loadbalancer SpringCloud自己研發。
如何選擇ribbon還是loadbalancer
SpringCloud Rest或者Openfeign都是默認支持ribbon。
區別
Nginx是客戶端所有的請求統一都交給我們的Nginx處理,讓后在由Nginx實現負載均衡轉發,屬於服務器端負載均衡器。
本地負載均衡器是從注冊中心獲取到集群地址列表,本地實現負載均衡算法,既本地負載均衡器。
應用場景:
Nginx屬於服務器負載均衡,應用於Tomcat/Jetty服務器等,而我們的本地負載均衡器,應用於在微服務架構中rpc框架中,rest、openfeign、dubbo。
OpenFeign客戶端
OpenFeign是一個Web聲明式的Http客戶端調用工具,提供接口和注解形式調用。
SpringCloud第一代采用feign第二代采用openfeign
openfeign客戶端作用:是一個Web聲明式的Http客戶端遠程調用工具,底層是封裝HttpClient技術。
Openfeign屬於SPringleCloud自己研發,而feign是netflix代碼寫法幾乎是沒有任何變化。
注意feign客戶端調用的事項:如果請求參數沒有加上注解的話,默認采用post請求發送。
Openfeign默認是支持負載均衡,ribbon。
測試:
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-openfeign</artifactId> <version>2.0.0.RELEASE</version> </dependency>
調用方:繼承被調用方的接口
@FeignClient("member-service") public interface MemberServiceFeign extends MemberService{}
分布式配置中心
分布式配置中心產生的背景?
在項目中定義配置文件,最大的缺陷?
如果在生成環境正在運行的時候突然需要修改配置文件的話,必須重啟我們的服務器。
分布式配置中心的框架有哪些:
攜程的阿波羅、Nacos(屬於輕量級)、SpringCloud Config(沒有界面)、攜程的阿波羅(屬於比較重的分布式配置)/disConfig等。
輕量級:部署、架構設計原理都比較簡單,學習成本也是比較低:
重量級:部署、架構設計、體量都是非常大,學習成本是比較高。
如何判斷配置文件是否發生變化 采用版本|MD5(nacos)
分布式配置中心實現原理:
1、本地應用讀取我們雲端分布式配置中心文件(第一次建立長連接)
2、本地應用讀取到配置文件之后,本地jvm和硬盤中都會緩存一份。
3、本地應用與分布式配置中心服務器端一致保持長連接。
4、當我們的配置文件發生變化(MD5|版本號)實現區分,將變化的結果通知給我們的本地應用實時的刷新我們的配置文件。
完全百分百實現動態化修改我們的配置文件。
注意:Nacos分布式配置中心和注冊中心都部署在同一個應用,就是一個單體的應用。
分布式配置中心的作用
分布式配置中心可以實現不需要重啟我們的服務器,動態的修改我們的配置文件內容,
常見的配置中心有攜程的阿波羅、SpringCloud Config、Nacos輕量級的配置中心等。
指定spring.profile.active和配置文件的DataID來使不同環境下讀取不同的配置
測試:
進入nacos配置
命名規則:https://github.com/alibaba/spring-cloud-alibaba/wiki/Nacos-config
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId> <version>0.2.2.RELEASE</version> </dependency>
新建bootstrap.yml
spring: application: name: nacos-service-client cloud: nacos: config: server-addr: 127.0.0.1:8848 group: DEFAULT_GROUP #namespace: ****
file-extension: yaml #要與配置中心文件后綴完全匹配 yml=yml yaml=yaml properties=properties
application.yml
spring: cloud: nacos: discovery: server-addr: 127.0.0.1:8848 profiles: active: dev #版本
測試:
package com.lvym.nacos.controller; import org.springframework.beans.factory.annotation.Value; import org.springframework.cloud.context.config.annotation.RefreshScope; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @RestController @RefreshScope //實時刷新,拉取配置 public class NacosController { @Value("${ants.name}") private String antsName; @GetMapping("/getInfo") public String getInfo(){ return antsName; } }
配置中心數據持久化,不會因為每次重啟配置會消失,默認存儲內存,
1.導入sql文件到數據庫,文件在conf目錄下nacos-mysql.sql
2.修改配置文件 application.properties
#*************** Config Module Related Configurations ***************# ### If use MySQL as datasource: spring.datasource.platform=mysql ### Count of DB: db.num=1 ### Connect URL of DB: db.url.0=jdbc:mysql://139.196.130.111:3306/nacos_config?characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useUnicode=true&useSSL=false&serverTimezone=UTC db.user=lvym db.password=tbny1312
3.重新啟動。
默認的情況下,分布式配置中心的數據存放到本地data目錄下,但是這種情況如果nacos集群的話無法保證數據的同步性。
在0.7版本之前,在單機模式時nacos使用嵌入式數據庫實現數據的存儲,不方便觀察數據存儲的基本情況。0.7版本增加了支持mysql數據源能力,具體的操作步驟:
1.安裝數據庫,版本要求:5.6.5+ 版本在1.3.1之后可以使用MySQL8
2.初始化mysql數據庫,數據庫初始化文件:nacos-mysql.sql
3.修改conf/application.properties文件,增加支持mysql數據源配置(目前只支持mysql),添加mysql數據源的url、用戶名和密碼。
基於Nacos+Nginx集群部署方案
1.nacos集群跟上面一樣,
2.配置 nginx.conf
#gzip on; upstream nacosCluster{ server 127.0.0.1:8847 server 127.0.0.1:8848;
server 127.0.0.1:8849; } server { listen 80; server_name localhost; location /nacos/ { proxy_pass http://nacosCluster; }
3.配置
spring: application: ###服務的名稱 name: nacos-client cloud: nacos: discovery: ###nacos注冊地址 server-addr: 127.0.0.1:8847,127.0.0.1:8848,127.0.0.1:8849 enabled: true config: ###配置中心連接地址 可選 server-addr: 127.0.0.1:8847,127.0.0.1:8848,127.0.0.1:8849 ###分組 group: DEFAULT_GROUP ###類型 file-extension: yaml
微服務網關
微服務網關是整個微服務API請求的入口,可以實現過濾Api接口。
作用:可以實現用戶的驗證登錄、解決跨域、日志攔截、權限控制、限流、熔斷、負載均衡、黑名單與白名單機制等。
微服務中的架構模式采用前后端分離,前端調用接口地址都能夠被抓包分析到。
在微服務中,我們所有的企業入口必須先經過Api網關,經過Api網關轉發到真實的服務器中。
如果此時需要添加驗證會話信息:
傳統的方式我們可以使用過濾器攔截用戶會話信息,這個過程所有的服務器都必須寫入該驗證會話登錄的代碼。
過濾器與網關的區別
過濾器適合於單個服務實現過濾請求;
網關攔截整個的微服務實現過濾請求,能夠解決整個微服務中冗余代碼。
過濾器是局部攔截,網關實現全局攔截。
Zuul與Gateway有哪些區別
Zuul網關屬於netfix公司開源的產品,屬於第一代微服務網關。
Gateway屬於SpringCloud自研發的網關框架,屬於第二代微服務網關。
相比來說SpringCloudGateway性能比Zuul性能要好。
注意:Zuul網關底層基於Servlet實現的,阻塞式的Api, 不支持長連接。
SpringCloudGateway基於Spring5構建,能夠實現響應式非阻塞式的Api,支持長連接,能夠更好的整合Spring體系的產品,依賴SpringBoot-WebFux。
Spring Cloud Gateway 使用的Webflux中的reactor-netty響應式編程組件,底層使用了Netty通訊框架
測試:
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-gateway</artifactId> <version>2.0.0.RELEASE</version> </dependency>
不能有,否則報錯
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
yml
server:
port: 80
spring:
application:
name: gateway-service
cloud:
gateway:
discovery:
locator:
enabled: true #開啟以服務id去注冊中心上獲取轉發地址
routes:
- id: baidu
uri: http://www.baidu.com/ #轉發http://www.baidu.com/
predicates:
- Path=/bd/** #匹配規則
# 127.0.0.1/bd 轉發到http://www.baidu.com/
##路由id
- id: member
uri: lb://member-service
filters: #過濾
- StripPrefix=1
predicates:
- Path=/member/**
#- After=2020-03-08T10:59:34.102+08:00[Asia/Shanghai]
#- Cookie=username,zhangshuai #並且Cookie是username=zhangshuai才能訪問
#- Header=X-Request-Id, \d+ #請求頭中要有X-Request-Id屬性並且值為整數的正則表達式
#- Host=**.lvym.com
#- Method=GET
#- Query=username, \d+ #要有參數名稱並且是正整數才能路由
可以通過訪問 http://localhost/member/接口 訪問。。。。
Nginx與網關的區別
微服務網關能夠做的事情,Nginx也可以實現。
相同點:都是可以實現對api接口的攔截,負載均衡、反向代理、請求過濾等,可以實現和網關一樣的效果。
不同點:
Nginx采用C語言編寫的
在微服務領域中,都是自己語言編寫的,比如我們使用java構建微服務項目,Gateway就是java語言編寫的。
Gateway屬於Java語言編寫的, 能夠更好對微服務實現擴展功能,相比Nginx如果想實現擴展功能需要結合Nginx+Lua語言等。
Nginx實現負載均衡的原理:屬於服務器端負載均衡器。
Gateway實現負載均衡原理:采用本地負載均衡器的形式。
GateWay高可用
使用Nginx或者lvs虛擬vip訪問增加系統的高可用。
動態請求參數網關
動態網關:任何配置都實現不用重啟網關服務器都可以及時刷新網關配置。
方案:
1.基於數據庫形式實現,特別建議,閱讀性高
2.基於配置中心實現,不建議使用,需要定義json格式配置,閱讀性差
注意:配置中心實現維護性比較差,建議采用數據庫形式設計。
基於數據庫表形式的設計
網關已經提供了API接口
1、直接新增
2、直接修改
思路:
默認加載時候
1、當我們的網關服務啟動的時候,從我們數據庫查詢網關的配置。
2、將數據庫的內容讀取到網關內存中
網關配置要更新的時候,需要同步調用
測試:
1.增加表
CREATE TABLE `gateway` ( `id` int(11) NOT NULL AUTO_INCREMENT, `route_id` varchar(11) DEFAULT NULL, `route_name` varchar(255) DEFAULT NULL, `route_pattern` varchar(255) DEFAULT NULL, `route_type` varchar(255) DEFAULT NULL, `route_url` varchar(255) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=latin1;
2.依賴
<!-- mysql 依賴 --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> </dependency> <!-- 阿里巴巴數據源 --> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid</artifactId> <version>1.0.14</version> </dependency>
3.實體類
package com.lvym.gateway.entity; import lombok.Data; @Data public class GateWayEntity { private Long id; private String routeId; private String routeName; private String routePattern; private String routeType; private String routeUrl; }
4.yml
server: port: 80 spring: application: name: gateway-service cloud: gateway: discovery: locator: enabled: true #開啟以服務id去注冊中心上獲取轉發地址 routes: # - id: baidu # uri: http://www.baidu.com/ #轉發http://www.baidu.com/ # predicates: # - Path=/bd/** #匹配規則 # # 127.0.0.1/bd 轉發到http://www.baidu.com/ # ##路由id # - id: member # uri: lb://member-service # filters: #過濾 # - StripPrefix=1 # predicates: # - Path=/member/** nacos: discovery: server-addr: 127.0.0.1:8848 datasource: url: jdbc:mysql://139.196.130.111:3306/gateway?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC username: lvym password: tbny1312 driver-class-name: com.mysql.cj.jdbc.Driver
5.實現類
package com.lvym.gateway.service; import com.lvym.gateway.entity.GateWayEntity; import com.lvym.gateway.mapper.AntsGatewayMapper; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.gateway.event.RefreshRoutesEvent; import org.springframework.cloud.gateway.filter.FilterDefinition; import org.springframework.cloud.gateway.handler.predicate.PredicateDefinition; import org.springframework.cloud.gateway.route.RouteDefinition; import org.springframework.cloud.gateway.route.RouteDefinitionWriter; import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.ApplicationEventPublisherAware; import org.springframework.stereotype.Service; import org.springframework.web.util.UriComponentsBuilder; import reactor.core.publisher.Mono; import java.net.URI; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @Service public class GatewayService implements ApplicationEventPublisherAware { private ApplicationEventPublisher publisher; @Autowired private RouteDefinitionWriter routeDefinitionWriter; @Autowired private AntsGatewayMapper antsGatewayMapper; @Override public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) { this.publisher = applicationEventPublisher; } public String initAllRoute() { // 從數據庫查詢配置的網關配置 List<GateWayEntity> gateWayEntities = antsGatewayMapper.gateWayAll(); for (GateWayEntity gw : gateWayEntities) { loadRoute(gw); } return "SUCCESS"; } public String loadRoute(GateWayEntity gateWayEntity) { RouteDefinition definition = new RouteDefinition(); Map<String, String> predicateParams = new HashMap<>(8); PredicateDefinition predicate = new PredicateDefinition(); FilterDefinition filterDefinition = new FilterDefinition(); Map<String, String> filterParams = new HashMap<>(8); // 如果配置路由type為0的話 則從注冊中心獲取服務 URI uri = null; if (gateWayEntity.getRouteType().equals("0")) { uri = UriComponentsBuilder.fromUriString("lb://" + gateWayEntity.getRouteUrl() + "/").build().toUri(); } else { uri = UriComponentsBuilder.fromHttpUrl(gateWayEntity.getRouteUrl()).build().toUri(); } // 定義的路由唯一的id definition.setId(gateWayEntity.getRouteId()); predicate.setName("Path"); //路由轉發地址 predicateParams.put("pattern", gateWayEntity.getRoutePattern()); predicate.setArgs(predicateParams); // 名稱是固定的, 路徑去前綴 filterDefinition.setName("StripPrefix"); filterParams.put("_genkey_0", "1"); filterDefinition.setArgs(filterParams); definition.setPredicates(Arrays.asList(predicate)); definition.setFilters(Arrays.asList(filterDefinition)); definition.setUri(uri); routeDefinitionWriter.save(Mono.just(definition)).subscribe(); this.publisher.publishEvent(new RefreshRoutesEvent(this)); return "SUCCESS"; } }
6.mapper
package com.lvym.gateway.mapper; import com.lvym.gateway.entity.GateWayEntity; import org.apache.ibatis.annotations.Param; import org.apache.ibatis.annotations.Select; import org.apache.ibatis.annotations.Update; import java.util.List; public interface AntsGatewayMapper { @Select("SELECT ID AS ID, route_id as routeid, route_name as routeName,route_pattern as routePattern\n" + ",route_type as routeType,route_url as routeUrl\n" + " FROM ants_gateway\n") List<GateWayEntity> gateWayAll(); @Update("update ants_gateway set route_url=#{routeUrl} where route_id=#{routeId};") Integer updateGateWay(@Param("routeId") String routeId, @Param("routeUrl") String routeUrl); }
7.controller
package com.lvym.gateway.controller; import com.lvym.gateway.service.GatewayService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class GatewayController { @Autowired private GatewayService gatewayService; /** * 同步網關配置 * * @return */ @RequestMapping("/synGatewayConfig") public String synGatewayConfig() { return gatewayService.initAllRoute(); } }
更多網關配置:https://cloud.spring.io/spring-cloud-gateway/reference/html/#gatewayfilter-factories
GateWay解決跨域的問題
@Component public class CrossOriginFilter implements GlobalFilter { @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { ServerHttpRequest request = exchange.getRequest(); ServerHttpResponse response = exchange.getResponse(); HttpHeaders headers = response.getHeaders(); headers.add(HttpHeaders.ACCESS_CONTROL_ALLOW_ORIGIN, "*"); headers.add(HttpHeaders.ACCESS_CONTROL_ALLOW_METHODS, "POST, GET, PUT, OPTIONS, DELETE, PATCH"); headers.add(HttpHeaders.ACCESS_CONTROL_ALLOW_CREDENTIALS, "true"); headers.add(HttpHeaders.ACCESS_CONTROL_ALLOW_HEADERS, "*"); headers.add(HttpHeaders.ACCESS_CONTROL_EXPOSE_HEADERS, "*"); return chain.filter(exchange); } }
網關GateWay源碼分析
1.客戶端向網關發送Http請求,會到達DispatcherHandler接受請求,匹配到 RoutePredicateHandlerMapping。
- 根據RoutePredicateHandlerMapping匹配到具體的路由策略。
- FilteringWebHandler獲取的路由的GatewayFilter數組,創建 GatewayFilterChain 處理過濾請求
- 執行我們的代理業務邏輯訪問。
常用配置類說明:
- GatewayClassPathWarningAutoConfiguration 檢查是否有正確的配置webflux
- GatewayAutoConfiguration 核心配置類
- GatewayLoadBalancerClientAutoConfiguration 負載均衡策略處理
- GatewayRedisAutoConfiguration Redis+lua整合限流
spring.factoies
- GatewayClassPathWarningAutoConfiguration 作用檢查是否配置我們webfux依賴。
- GatewayAutoConfiguration加載了我們Gateway需要的注入的類。
- GatewayLoadBalancerClientAutoConfiguration 網關需要使用的負載均衡 Lb//mayikt-member// 根據服務名稱查找真實地址
- GatewayRedisAutoConfiguration 網關整合Redis整合Lua實現限流
- GatewayDiscoveryClientAutoConfiguration 服務注冊與發現功能
解決跨域的問題
- HttpClient轉發
- 使用過濾器允許接口可以跨域 響應頭設置
- Jsonp 不支持我們的post 屬於前端解決
- Nginx解決跨域的問題保持我們域名和端口號一致性
- Nginx也是通過配置文件解決跨域的問題
- 基於微服務網關解決跨域問題,需要保持域名和端口一致性
- 使用網關代碼允許所有的服務可以跨域的問題
- 使用SpringBoot注解形式@CrossOrigin
SpringCloud Sentinel
Sentinel 介紹
隨着微服務的流行,服務和服務之間的穩定性變得越來越重要。 Sentinel 以流量為切入點,從流量控制、熔斷降級、系統負載保護等多個維度保護服務的穩定性。
Sentinel 具有以下特征:
-
豐富的應用場景: Sentinel 承接了阿里巴巴近 10 年的雙十一大促流量的核心場景,例如秒殺(即突發流量控制在系統容量可以承受的范圍)、消息削峰填谷、實時熔斷下游不可用應用等。
-
完備的實時監控: Sentinel 同時提供實時的監控功能。您可以在控制台中看到接入應用的單台機器秒級數據,甚至 500 台以下規模的集群的匯總運行情況。
-
廣泛的開源生態: Sentinel 提供開箱即用的與其它開源框架/庫的整合模塊,例如與 Spring Cloud、Dubbo、gRPC 的整合。您只需要引入相應的依賴並進行簡單的配置即可快速地接入 Sentinel。
-
完善的 SPI 擴展點: Sentinel 提供簡單易用、完善的 SPI 擴展點。您可以通過實現擴展點,快速的定制邏輯。例如定制規則管理、適配數據源等。
服務限流/熔斷
服務限流目的是為了更好的保護我們的服務,在高並發的情況下,如果客戶端請求的數量達到一定極限(后台可以配置閾值),請求的數量超出了設置的閾值,開啟自我的保護,直接調用我們的服務降級的方法,不會執行業務邏輯操作,直接走本地blockHandler的方法,返回一個友好的提示。
服務降級
在高並發的情況下, 防止用戶一直等待,采用限流/熔斷方法,使用服務降級的方式返回一個友好的提示給客戶端,不會執行業務邏輯請求,直接走本地的fallback的方法。返回一個友好的提示給到客戶端。
提示語:當前排隊人數過多,稍后重試~
熔斷降級設計理念
在限制的手段上,Sentinel 和 Hystrix 采取了完全不一樣的方法。
Hystrix 通過 線程池隔離 的方式,來對依賴(在 Sentinel 的概念中對應 資源)進行了隔離。這樣做的好處是資源和資源之間做到了最徹底的隔離。缺點是除了增加了線程切換的成本(過多的線程池導致線程數目過多),還需要預先給各個資源做線程池大小的分配。
Sentinel 對這個問題采取了兩種手段:
- 通過並發線程數進行限制
和資源池隔離的方法不同,Sentinel 通過限制資源並發線程的數量,來減少不穩定資源對其它資源的影響。這樣不但沒有線程切換的損耗,也不需要您預先分配線程池的大小。當某個資源出現不穩定的情況下,例如響應時間變長,對資源的直接影響就是會造成線程數的逐步堆積。當線程數在特定資源上堆積到一定的數量之后,對該資源的新請求就會被拒絕。堆積的線程完成任務后才開始繼續接收請求。
- 通過響應時間對資源進行降級
除了對並發線程數進行控制以外,Sentinel 還可以通過響應時間來快速降級不穩定的資源。當依賴的資源出現響應時間過長后,所有對該資源的訪問都會被直接拒絕,直到過了指定的時間窗口之后才重新恢復。
fallback與blockHandler的區別
fallback是服務熔斷或者業務邏輯出現異常執行的方法(1.6版本以上)
blockHandler 限流出現錯誤執行的方法
服務的雪崩效應
默認的情況下,Tomcat或者是Jetty服務器只有一個線程池去處理客戶端的請求,
這樣的話就是在高並發的情況下,如果客戶端所有的請求都堆積到同一個服務接口上,
那么就會產生tomcat服務器所有的線程都在處理該接口,可能會導致其他的接口無法訪問,短暫沒有線程處理
Sentinel 與hytrix區別
哨兵以流量為切入點,從流量控制,熔斷降級,系統負載保護等多個維度保護服務的穩定性。
Sentinel的斷路器是沒有半開狀態的,半開的狀態系統自動去檢測是否請求有異常,沒有異常就關閉斷路器恢復使用,有異常則繼續打開斷路器不可用。
Sentinel采用的懶加載說明。
Sentinel 實現對Api動態限流
限流配置有兩種方案:
1、手動使用代碼配置 純代碼/注解的形式
2、Sentinel控制台形式配置+注解 推薦
默認情況下Sentinel不對數據持久化,需要自己獨立持久化
測試:控制台+注解
1.下載sentinel-dashboard-1.7.1.jar 並運行 java -Dserver.port=8080 -Dcsp.sentinel.dashboard.server=localhost:8080 -Dproject.name=sentinel-dashboard -jar sentinel-dashboard.jar 可做相應修改
2.依賴
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-alibaba-sentinel</artifactId> <version>0.2.2.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency>
3.yml
server: port: 8090 spring: application: name: order-service cloud: nacos: discovery: server-addr: 127.0.0.1:8848 sentinel: transport: dashboard: 127.0.0.1:8080 #界面訪問接口 port: 8719 #默認 通訊接口 eager: true
4.實現類
流控--QPS
private static final String GETORDER_KEY = "getOrder";
/** * 限流
* 不寫注解 @SentinelResource 默認資源名是 /getOrderAnnotation 且有自己的回滾提示。 blockHandlerClass * @return */ @SentinelResource(value = GETORDER_KEY,blockHandler ="getOrderQpsException") @RequestMapping("/getOrderAnnotation") public String getOrderAnnotation() { return "getOrder接口"; } /** * 被限流后返回的提示 * * @param e * @return */ public String getOrderQpsException(BlockException e) { e.printStackTrace(); return "該接口已經被限流啦!"; }
啟動項目,並登錄sentinel管理界面
流控 --線程數
/** * 限流-- 並發數量處理限流 * @return */ @SentinelResource(value = "getOrderThrad",blockHandler ="getOrderQpsException") @RequestMapping("/getOrderThrad") public String getOrderThrad() { System.out.println(Thread.currentThread().getName()); try { Thread.sleep(1000); } catch (Exception e) { } return "getOrderThrad"; }
降級---RT
/** * 基於我們的平均響應時間實現降級 * fallbackClass * @return */ @SentinelResource(value = "getOrderDowngradeRtType", fallback = "getOrderDowngradeRtTypeFallback") @RequestMapping("/getOrderDowngradeRtType") public String getOrderDowngradeRtType() { try { Thread.sleep(300); } catch (Exception e) { System.out.println("降級不會走"); } System.out.println("次數"); return "正常執行我們業務邏輯"; } public String getOrderDowngradeRtTypeFallback() { return "執行我們本地的服務降級的方法"; }
平均響應時間 (DEGRADE_GRADE_RT
):當 1s 內持續進入 N 個請求,對應時刻的平均響應時間(秒級)均超過閾值(count
,以 ms 為單位),那么在接下的時間窗口(DegradeRule
中的 timeWindow
,以 s 為單位)之內,對這個方法的調用都會自動地熔斷(拋出 DegradeException
)。注意 Sentinel 默認統計的 RT 上限是 4900 ms,超出此閾值的都會算作 4900 ms,若需要變更此上限可以通過啟動配置項 -Dcsp.sentinel.statistic.max.rt=xxx
來配置
降級----異常比例
/** * 基於我們錯誤率/異常實現服務降級 * * @return */ @SentinelResource(value = "getOrderDowngradeErrorType", fallback = "getOrderDowngradeErrorTypeFallback") @RequestMapping("/getOrderDowngradeErrorType") public String getOrderDowngradeErrorType(int age) { int j = 1 / age; return "正常執行我們業務邏輯:j" + j; } public String getOrderDowngradeErrorTypeFallback(int age) { return "錯誤率/異常數太高,展示無法訪問該接口"; }
異常比例 (DEGRADE_GRADE_EXCEPTION_RATIO
):當資源的每秒請求量 >= N(可配置),並且每秒異常總數占通過量的比值超過閾值(DegradeRule
中的 count
)之后,資源進入降級狀態,即在接下的時間窗口(DegradeRule
中的 timeWindow
,以 s 為單位)之內,對這個方法的調用都會自動地返回。異常比率的閾值范圍是 [0.0, 1.0]
,代表 0% - 100%。
降級---異常數
/** * 基於我們錯誤率/異常實現服務降級 * * @return */ @SentinelResource(value = "getOrderDowngradeErrorType", fallback = "getOrderDowngradeErrorTypeFallback") @RequestMapping("/getOrderDowngradeErrorType") public String getOrderDowngradeErrorType(int age) { int j = 1 / age; return "正常執行我們業務邏輯:j" + j; } public String getOrderDowngradeErrorTypeFallback(int age) { return "錯誤率/異常數太高,展示無法訪問該接口"; }
異常數 (DEGRADE_GRADE_EXCEPTION_COUNT
):當資源近 1 分鍾的異常數目超過閾值之后會進行熔斷。注意由於統計時間窗口是分鍾級別的,若 timeWindow
小於 60s,則結束熔斷狀態后仍可能再進入熔斷狀態。
熱點規則
何為熱點?熱點即經常訪問的數據。很多時候我們希望統計某個熱點數據中訪問頻次最高的 Top K 數據,並對其訪問進行限制。比如:
- 商品 ID 為參數,統計一段時間內最常購買的商品 ID 並進行限制
- 用戶 ID 為參數,針對一段時間內頻繁訪問的用戶 ID 進行限
熱點參數限流會統計傳入參數中的熱點參數,並根據配置的限流閾值與模式,對包含熱點參數的資源調用進行限流。熱點參數限流可以看做是一種特殊的流量控制,僅對包含熱點參數的資源調用生效。
Sentinel 利用 LRU 策略統計最近最常訪問的熱點參數,結合令牌桶算法來進行參數級別的流控。熱點參數限流支持集群模式。
@SentinelResource(value = "seckill", fallback = "seckillFallback", blockHandler = "seckillBlockHandler") @RequestMapping("/seckill") public String seckill(Long userId, Long orderId) { return "秒殺成功"; } public String seckillFallback(Long userId, Long orderId) { return "不走這里"; } public String seckillBlockHandler(Long userId, Long orderId) { return "不走這里"; }
熱點回滾:
package com.lvym.service.order.config; import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowException; import org.springframework.web.bind.annotation.ExceptionHandler; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.RestControllerAdvice; @RestControllerAdvice public class InterfaceExceptionHandler { @ResponseBody @ExceptionHandler(ParamFlowException.class) public String businessInterfaceException(ParamFlowException e) { return "您當前訪問的頻率過高,請稍后重試!"; } }
Sentinel規則的持久化
默認的情況下Sentinel的規則是存放在內存中,如果Sentinel客戶端重啟后,Sentinel數據規則可能會丟失。
解決方案:
Sentinel持久化機制支持四種持久化的機制。
- 本地文件 ,拉模式
- 攜程阿波羅 ,推模式
- Nacos ,推模式
- Zookeeper ,推模式
基於Nacos持久化我們的數據規則+動態修改
進入nacos界面配置
resource:資源名,即限流規則的作用對象
limitApp:流控針對的調用來源,若為 default 則不區分調用來源
grade:限流閾值類型(QPS 或並發線程數);0代表根據並發數量來限流,1代表根據QPS來進行流量控制
count:限流閾值
strategy:調用關系限流策略
controlBehavior:流量控制效果(直接拒絕、Warm Up、勻速排隊)
clusterMode:是否為集群模式
[ { "resource": "getOrderSentinel", "limitApp": "default", "grade": 1, "count": 1, "strategy": 0, "controlBehavior": 0, "clusterMode": false } ]
實現類:
<dependency> <groupId>com.alibaba.csp</groupId> <artifactId>sentinel-datasource-nacos</artifactId> <version>1.5.2</version> </dependency>
yml
server: port: 8090 spring: application: name: order-service cloud: nacos: discovery: server-addr: 127.0.0.1:8848 sentinel: transport: dashboard: 127.0.0.1:8080 #界面訪問接口 port: 8719 #默認 通訊接口 eager: true datasource: ds: nacos: ### nacos連接地址 server-addr: localhost:8848 ## nacos連接的分組 group-id: DEFAULT_GROUP ###路由存儲規則 rule-type: flow ### 讀取配置文件的 data-id data-id: order-sentinel ### 讀取培訓文件類型為json data-type: json
/** * 持久化 * @return */ @SentinelResource(value = "getOrderSentinel", blockHandler = "getOrderQpsException") @RequestMapping("/getOrderSentinel") public String getOrderSentinel() { return "getOrderSentinel持久化"; }
SpringCloudGateWay整合sentinel實現限流
https://github.com/alibaba/Sentinel/wiki/%E7%BD%91%E5%85%B3%E9%99%90%E6%B5%81
1.
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-gateway</artifactId> <version>2.0.0.RELEASE</version> </dependency> <dependency> <groupId>com.alibaba.csp</groupId> <artifactId>sentinel-spring-cloud-gateway-adapter</artifactId> <version>1.6.0</version> </dependency>
2.yml
server: port: 80 spring: application: name: gateway-service cloud: gateway: discovery: locator: enabled: true #開啟以服務id去注冊中心上獲取轉發地址 routes: - id: baidu uri: http://www.baidu.com/ #轉發http://www.baidu.com/ predicates: - Path=/bd/** #匹配規則 #127.0.0.1/bd 轉發到http://www.baidu.com/
3.GatewayConfiguration
package com.lvym.gateway.config; import com.alibaba.csp.sentinel.adapter.gateway.sc.SentinelGatewayFilter; import com.alibaba.csp.sentinel.adapter.gateway.sc.exception.SentinelGatewayBlockExceptionHandler; import org.springframework.beans.factory.ObjectProvider; import org.springframework.cloud.gateway.filter.GlobalFilter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.Ordered; import org.springframework.core.annotation.Order; import org.springframework.http.codec.ServerCodecConfigurer; import org.springframework.web.reactive.result.view.ViewResolver; import java.util.Collections; import java.util.List; @Configuration public class GatewayConfiguration { private final List<ViewResolver> viewResolvers; private final ServerCodecConfigurer serverCodecConfigurer; public GatewayConfiguration(ObjectProvider<List<ViewResolver>> viewResolversProvider, ServerCodecConfigurer serverCodecConfigurer) { this.viewResolvers = viewResolversProvider.getIfAvailable(Collections::emptyList); this.serverCodecConfigurer = serverCodecConfigurer; } // @Bean // @Order(Ordered.HIGHEST_PRECEDENCE) // public SentinelGatewayBlockExceptionHandler sentinelGatewayBlockExceptionHandler() { // // Register the block exception handler for Spring Cloud Gateway. // return new SentinelGatewayBlockExceptionHandler(viewResolvers, serverCodecConfigurer); // } /** * 自定以回滾 * @return */ @Bean @Order(Ordered.HIGHEST_PRECEDENCE) public JsonSentinelGatewayBlockExceptionHandler jsonSentinelGatewayBlockExceptionHandler() { // 自定義 return new JsonSentinelGatewayBlockExceptionHandler(viewResolvers, serverCodecConfigurer); } @Bean @Order(Ordered.HIGHEST_PRECEDENCE) public GlobalFilter sentinelGatewayFilter() { return new SentinelGatewayFilter(); } }
自定義回滾:
package com.lvym.gateway.config; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.http.codec.ServerCodecConfigurer; import org.springframework.http.server.reactive.ServerHttpResponse; import org.springframework.web.reactive.result.view.ViewResolver; import org.springframework.web.server.ServerWebExchange; import org.springframework.web.server.WebExceptionHandler; import reactor.core.publisher.Mono; import java.nio.charset.StandardCharsets; import java.util.List; public class JsonSentinelGatewayBlockExceptionHandler implements WebExceptionHandler { public JsonSentinelGatewayBlockExceptionHandler(List<ViewResolver> viewResolvers, ServerCodecConfigurer serverCodecConfigurer) { } @Override public Mono<Void> handle(ServerWebExchange exchange, Throwable ex) { ServerHttpResponse serverHttpResponse = exchange.getResponse(); serverHttpResponse.getHeaders().add("Content-Type", "application/json;charset=UTF-8"); byte[] datas = "{\"code\":403,\"msg\":\"API接口被限流\"}".getBytes(StandardCharsets.UTF_8); DataBuffer buffer = serverHttpResponse.bufferFactory().wrap(datas); return serverHttpResponse.writeWith(Mono.just(buffer)); } }
4.規則
package com.lvym.gateway.config; import com.alibaba.csp.sentinel.adapter.gateway.common.rule.GatewayFlowRule; import com.alibaba.csp.sentinel.adapter.gateway.common.rule.GatewayRuleManager; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.stereotype.Component; import java.util.HashSet; import java.util.Set; @Slf4j @Component public class SentinelApplicationRunner implements ApplicationRunner { @Override public void run(ApplicationArguments args){ initGatewayRules(); } /** * 配置限流規則 */ private void initGatewayRules() { Set<GatewayFlowRule> rules = new HashSet<>(); rules.add(new GatewayFlowRule("baidu") // 限流閾值 .setCount(1) // 統計時間窗口,單位是秒,默認是 1 秒 .setIntervalSec(1) ); GatewayRuleManager.loadRules(rules); } }
基於nacos網關動態sentinel,不知什么原因,一走網關,sentinel就會刪除規則???????????????
Feign整合Sentinel
<!--sentinel客戶端-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>
在配置文件中開啟Feign對Sentinel的支持
feign: sentinel: enabled: true
容錯類中拿到具體的錯誤
@FeignClient(value = "service-product",
fallbackFactory = ProductServiceFallBackFactory.class)
fallback
import com.itheima.service.ProductService; import feign.hystrix.FallbackFactory; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; //這是容錯類,他要求我們要是實現一個FallbackFactory<要為哪個接口產生容錯類> @Slf4j @Service public class ProductServiceFallbackFactory implements FallbackFactory<ProductService> { //Throwable 這就是fegin在調用過程中產生異常 @Override public ProductService create(Throwable throwable) { return new ProductService() { @Override public Product findByPid(Integer pid) { log.error("{}",throwable); Product product = new Product(); product.setPid(-100); product.setPname("商品微服務調用出現異常了,已經進入到了容錯方法中"); return product; } }; } }
SpringCloud 解決分布式事務
Base與CAP理論
這個定理的內容是指的是在一個分布式系統中、Consistency(一致性)、 Availability(可用性)、Partition tolerance(分區容錯性),三者不可得兼。
一致性C:在分布式系統中,同一時刻所有的節點的數據都是相同的;
可用性A: 集群中部分節點出現了故障,集群的整體也能夠給響應;
分區容錯性P:分區容錯性是指系統能夠容忍節點之間的網絡通信的故障,意味着發生了分區的情況,必須就當前操作在C和A之間做出選擇;
BASE是Basically Available(基本可用)、Soft state(軟狀態)和 Eventually consistent(最終一致性)三個短語的縮寫。
目前主流分布式解決框架
- 單體項目多數據源 可以jta+ Atomikos
- 基於rabbitmq的形式解決 最終一致性的思想
- 基於rocketmq解決分布式事務 采用事務消息
- LCN采用lcn模式 假關閉連接 (目前已經被淘汰)
- Alibaba的Seata 未來可能是主流 背景非常強大
兩階段提交協議基本概念
兩階段提交協議可以理解為2pc,也就是分為參與者和協調者,協調者會通過兩次階段實現數據最終的一致性的。
2PC和3pc的區別就是解決參與者超時的問題和多加了一層詢問,保證數據的傳輸可靠性。
Seata 是什么?
Seata 是一款開源的分布式事務解決方案,致力於提供高性能和簡單易用的分布式事務服務。Seata 將為用戶提供了 AT、TCC、SAGA 和 XA 事務模式,為用戶打造一站式的分布式解決方案。
官方文檔:http://seata.io/zh-cn/docs/overview/what-is-seata.html
Seata的實現原理
Seata有3個基本組成部分:
事務協調器(TC):維護全局事務和分支事務的狀態,驅動全局提交或回滾。
事務管理器TM:定義全局事務的范圍:開始全局事務,提交或回滾全局事務。
資源管理器(RM):管理分支事務正在處理的資源,與TC進行對話以注冊分支事務並報告分支事務的狀態,並驅動分支事務的提交或回滾。
分布式事務的執行流程
TM開啟分布式事務(TM向TC注冊全局事務記錄)
換業務場景,編排數據庫,服務等事務內資源(RM向TC匯報資源准備狀態)
TM結束分布式事務,事務一階段結束(TM通知TC提交/回滾分布式事務)
TC匯總事務信息,決定分布式事務是提交還是回滾
TC通知所有RM提交/回滾資源,事務二階段結束。
分布式事務產生的背景
- 如果是在傳統項目中,使用同一個數據源,在數據用同用一個事務管理器的情況下,不存在分別事務事務問題,因為有事務的傳播行為幫助我們實現。每個數據源都自己獨立的事務事務管理,每個數據源中的事務管理都互不影響。
- 2. 如果是在單體項目中, 存在多個不同的數據源,每個事務源都有自己獨立的事務管理器,每個事務管理器互不影響,也會存在分布式事務的問題。Jta+atominc 將每個獨立的事務管理器統一交給我們的atominc全局事務管理。
- 3. 在分布式系統中采用rpc遠程通訊也會存在分布式事務問題。
分布式rpc通訊中為什么會存在分布式事務?
消費者(調用方)調用完接口成功之后后,調用方突然拋出異常。。
Rpc通訊中產生的分布式事務的問題原因
1.調用方(訂單服務)調用完rpc接口之后,突然程序拋出異常,調用方的事務回滾了,但是被調用方接口沒有回滾。
訂單服務回滾了,派單成功,在每個jvm中都有自己的本地事務,每個事務都互不影響。
2.被調用方(派單服務)的接口失敗的話,調用方可以根據返回的結果,手動回滾調用方本地事務。
解決分布式事務的最大核心是什么?
最終一致性 在分布式系統中, 因為rpc通訊是需要時間的,短暫的數據一致這是允許的,但是最終數據一定要保持一致性;
Base理論和CAP理論
CAP總結:三者無法兼顧,在分布式系統當中可以容忍網絡之間出現的通訊故障;
要么是CP或者AP
CP:當你網絡出現故障之后,只能保證數據一致性,但是不能保證可用性; zk
AP:當你網絡出現故障之后,不能保證數據一致性,但是能夠保證可用性 eureka
在分布式系統中,可能存在強一致性的問題。
注意:在分布式系統中無法保證強一致性,因為數據短暫不一致這是運行的,但是最終數據一定要保證一致性的問題。
AT模式如何做到對業務的無侵入?
測試:》》》》》》》》》》
<!--seata--> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-seata</artifactId> <exclusions> <exclusion> <artifactId>seata-all</artifactId> <groupId>io.seata</groupId> </exclusion> </exclusions> </dependency> <dependency> <groupId>io.seata</groupId> <artifactId>seata-all</artifactId> <version>0.9.0</version> </dependency>
yml
server: port: 2001 spring: application: name: seata-order-service cloud: alibaba: seata: tx-service-group: fsp_tx_group #自定義事務組名稱需要與seata-server中的對應 nacos: discovery: server-addr: localhost:8848 datasource: driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://192.168.146.177:3306/seata_order username: root password: Lvym777@ feign: hystrix: enabled: true logging: level: io: seata: info mybatis: mapperLocations: classpath:mapper/*.xml
file.conf
transport { # tcp udt unix-domain-socket type = "TCP" #NIO NATIVE server = "NIO" #enable heartbeat heartbeat = true #thread factory for netty thread-factory { boss-thread-prefix = "NettyBoss" worker-thread-prefix = "NettyServerNIOWorker" server-executor-thread-prefix = "NettyServerBizHandler" share-boss-worker = false client-selector-thread-prefix = "NettyClientSelector" client-selector-thread-size = 1 client-worker-thread-prefix = "NettyClientWorkerThread" # netty boss thread size,will not be used for UDT boss-thread-size = 1 #auto default pin or 8 worker-thread-size = 8 } shutdown { # when destroy server, wait seconds wait = 3 } serialization = "seata" compressor = "none" } service { vgroup_mapping.fsp_tx_group = "default" #修改自定義事務組名稱 default.grouplist = "127.0.0.1:8091" enableDegrade = false disable = false max.commit.retry.timeout = "-1" max.rollback.retry.timeout = "-1" disableGlobalTransaction = false } client { async.commit.buffer.limit = 10000 lock { retry.internal = 10 retry.times = 30 } report.retry.count = 5 tm.commit.retry.count = 1 tm.rollback.retry.count = 1 } ## transaction log store store { ## store mode: file、db mode = "db" ## file store file { dir = "sessionStore" # branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions max-branch-session-size = 16384 # globe session size , if exceeded throws exceptions max-global-session-size = 512 # file buffer size , if exceeded allocate new buffer file-write-buffer-cache-size = 16384 # when recover batch read size session.reload.read_size = 100 # async, sync flush-disk-mode = async } ## database store db { ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc. datasource = "dbcp" ## mysql/oracle/h2/oceanbase etc. db-type = "mysql" driver-class-name = "com.mysql.jdbc.Driver" url = "jdbc:mysql://192.168.146.177:3306/seata" user = "root" password = "Lvym777@" min-conn = 1 max-conn = 3 global.table = "global_table" branch.table = "branch_table" lock-table = "lock_table" query-limit = 100 } } lock { ## the lock store mode: local、remote mode = "remote" local { ## store locks in user's database } remote { ## store locks in the seata's server } } recovery { #schedule committing retry period in milliseconds committing-retry-period = 1000 #schedule asyn committing retry period in milliseconds asyn-committing-retry-period = 1000 #schedule rollbacking retry period in milliseconds rollbacking-retry-period = 1000 #schedule timeout retry period in milliseconds timeout-retry-period = 1000 } transaction { undo.data.validation = true undo.log.serialization = "jackson" undo.log.save.days = 7 #schedule delete expired undo_log in milliseconds undo.log.delete.period = 86400000 undo.log.table = "undo_log" } ## metrics settings metrics { enabled = false registry-type = "compact" # multi exporters use comma divided exporter-list = "prometheus" exporter-prometheus-port = 9898 } support { ## spring spring { # auto proxy the DataSource bean datasource.autoproxy = false } }
registry.conf
registry { # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa type = "nacos" nacos { serverAddr = "localhost:8848" namespace = "" cluster = "default" } eureka { serviceUrl = "http://localhost:8761/eureka" application = "default" weight = "1" } redis { serverAddr = "localhost:6379" db = "0" } zk { cluster = "default" serverAddr = "127.0.0.1:2181" session.timeout = 6000 connect.timeout = 2000 } consul { cluster = "default" serverAddr = "127.0.0.1:8500" } etcd3 { cluster = "default" serverAddr = "http://localhost:2379" } sofa { serverAddr = "127.0.0.1:9603" application = "default" region = "DEFAULT_ZONE" datacenter = "DefaultDataCenter" cluster = "default" group = "SEATA_GROUP" addressWaitTime = "3000" } file { name = "file.conf" } } config { # file、nacos 、apollo、zk、consul、etcd3 type = "file" nacos { serverAddr = "localhost" namespace = "" } consul { serverAddr = "127.0.0.1:8500" } apollo { app.id = "seata-server" apollo.meta = "http://192.168.1.204:8801" } zk { serverAddr = "127.0.0.1:2181" session.timeout = 6000 connect.timeout = 2000 } etcd3 { serverAddr = "http://localhost:2379" } file { name = "file.conf" } }
數據源:
package com.lvym.alibaba.config; import com.alibaba.druid.pool.DruidDataSource; import io.seata.rm.datasource.DataSourceProxy; import org.apache.ibatis.session.SqlSessionFactory; import org.mybatis.spring.SqlSessionFactoryBean; import org.mybatis.spring.transaction.SpringManagedTransactionFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.io.support.PathMatchingResourcePatternResolver; import javax.sql.DataSource; @Configuration public class DataSourceProxyConfig { @Value("${mybatis.mapperLocations}") private String mapperLocations; @Bean @ConfigurationProperties(prefix = "spring.datasource") public DataSource druidDataSource(){ return new DruidDataSource(); } @Bean public DataSourceProxy dataSourceProxy(DataSource dataSource) { return new DataSourceProxy(dataSource); } @Bean public SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception { SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean(); sqlSessionFactoryBean.setDataSource(dataSourceProxy); sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(mapperLocations)); sqlSessionFactoryBean.setTransactionFactory(new SpringManagedTransactionFactory()); return sqlSessionFactoryBean.getObject(); } }
package com.lvym.alibaba.config; import org.mybatis.spring.annotation.MapperScan; import org.springframework.context.annotation.Configuration; @Configuration @MapperScan({"com.lvym.alibaba.dao"}) public class MyBatisConfig { }
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class) @EnableDiscoveryClient @EnableFeignClients public class AlibabaSeataOrder2001 { public static void main(String[] args) { SpringApplication.run(AlibabaSeataOrder2001.class,args); } }
@Override @GlobalTransactional //如果失敗的話,可以在失敗方加入本地事務注解 public void create(Order order) { log.info("創建訂單"); orderDao.create(order); //扣減庫存 log.info("----->訂單微服務開始調用庫存,做扣減Count"); storageService.decrease(order.getProductId(),order.getCount()); log.info("----->訂單微服務開始調用庫存,做扣減end"); //扣減賬戶 log.info("----->訂單微服務開始調用賬戶,做扣減Money"); accountService.decrease(order.getUserId(),order.getMoney()); log.info("----->訂單微服務開始調用賬戶,做扣減end"); //修改訂單狀態,從零到1代表已經完成 log.info("----->修改訂單狀態開始"); orderDao.update(order.getUserId(),0); log.info("----->修改訂單狀態結束"); log.info("----->下訂單結束了"); }
SpringCloud Sleuth分布式請求鏈路追蹤
在大型系統的微服務化構建中,一個系統被拆分成了許多模塊。這些模塊負責不同的功能,組合成系統,最終可以提供豐富的功能。在這種架構中,一次請求往往需要涉及到多個服務。互聯網應用構建在不同的軟件模塊集上,這些軟件模塊,有可能是由不同的團隊開發、可能使用不同的編程語言來實現、有可能布在了幾千台服務器,橫跨多個不同的數據中心,也就意味着這種架構形式也會存在一些問題:如何快速發現問題?如何判斷故障影響范圍?如何梳理服務依賴以及依賴的合理性?如何分析鏈路性能問題以及實時容量規划
分布式鏈路追蹤(Distributed Tracing),就是將一次分布式請求還原成調用鏈路,進行日志記錄,性能監控並將一次分布式請求的調用情況集中展示。比如各個服務節點上的耗時、請求具體到達哪台機器上、每個服務節點的請求狀態等等。
常見的鏈路追蹤技術有下面這些:
cat由大眾點評開源,基於Java開發的實時應用監控平台,包括實時應用監控,業務監控。集成方案是通過代碼埋點的方式來實現監控,比如:攔截器,過濾器等。對代碼的侵入性很大,集成成本較高。風險較大。
zipkin由Twitter公司開源,開放源代碼分布式的跟蹤系統,用於收集服務的定時數據,以解決微服務架構中的延遲問題,包括:數據的收集、存儲、查找和展現。該產品結合spring-cloud-sleuth使用較為簡單,集成很方便,但是功能較簡單。
Pinpoint是韓國人開源的基於字節碼注入的調用鏈分析,以及應用監控分析工具。特點是支持多種插件,UI功能強大,接入端無代碼侵入。
SkyWalking是本土開源的基於字節碼注入的調用鏈分析,以及應用監控分析工具。特點是支持多種插件,UI功能較強,接入端無代碼侵入。目前已加入Apache孵化器。
Sleuth SleuthSpringCloud 提供的分布式系統中鏈路追蹤解決方案。
注意:SpringCloud alibaba技術棧中並沒有提供自己的鏈路追蹤技術的,我們可以采用Sleuth +Zinkin來做鏈路追蹤解決方案
Sleuth介紹
SpringCloud Sleuth主要功能就是在分布式系統中提供追蹤解決方案。它大量借用了GoogleDapper的設計,先來了解一下Sleuth中的術語和相關概念。
Trace 由一組Trace Id相同的Span串聯形成一個樹狀結構。為了實現請求跟蹤,當請求到達分布式系統的入口端點時,只需要服務跟蹤框架為該請求創建一個唯一的標識(即TraceId),同時在分布式系統內部流轉的時候,框架始終保持傳遞該唯一值,直到整個請求的返回。那么我們就可以使用該唯一標識將所有的請求串聯起來,形成一條完整的請求鏈路。
Span 代表了一組基本的工作單元。為了統計各處理單元的延遲,當請求到達各個服務組件的時候,也通過一個唯一標識(SpanId)來標記它的開始、具體過程和結束。通過SpanId的開始和結束時間戳,就能統計該span的調用時間,除此之外,我們還可以獲取如事件的名稱。請求信息等元數據。
Annotation 用它記錄一段時間內的事件,內部使用的重要注釋:
cs(Client Send)客戶端發出請求,開始一個請求的生命
sr(Server Received)服務端接受到請求開始進行處理, sr-cs = 網絡延遲(服務調用的時間)
ss(Server Send)服務端處理完畢准備發送到客戶端,ss - sr = 服務器上的請求處理時間
cr(Client Reveived)客戶端接受到服務端的響應,請求結束。 cr - sr = 請求的總時間
測試:
<!--鏈路追蹤Sleuth-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-sleuth</artifactId>
</dependency>
加入依賴啟動,之后控制台就會發生變化。
.. INFO [gateway-service,c3a459362b832a82,c3a459362b832a82,false] ..
.. INFO [gateway-service,c3a459362b832a82,c3a459362b832a82,false] ..
.. INFO [member-service,,,] ..
.. INFO [member-service,,,] ..
其中c3a459362b832a82是TraceId,c3a459362b832a82是SpanId,依次調用有一個全局的TraceId,將調用鏈路串起來。仔細分析每個微服務的日志,不難看出請求的具體過程。
查看日志文件並不是一個很好的方法,當微服務越來越多日志文件也會越來越多,通過Zipkin可以將日志聚合,並進行可視化展示和全文檢索。
所以需要zipkin+spring-cloud-sleuth更方便查看。。。
ZipKin介紹
Zipkin 是 Twitter 的一個開源項目,它基於Google Dapper實現,它致力於收集服務的定時數據,以解決微服務架構中的延遲問題,包括數據的收集、存儲、查找和展現。
我們可以使用它來收集各個服務器上請求鏈路的跟蹤數據,並通過它提供的REST API接口來輔助我們查詢跟蹤數據以實現對分布式系統的監控程序,從而及時地發現系統中出現的延遲升高問題並找出系統性能瓶頸的根源。
除了面向開發的 API 接口之外,它也提供了方便的UI組件來幫助我們直觀的搜索跟蹤信息和分析請求鏈路明細,比如:可以查詢某段時間內各用戶請求的處理時間等。
Zipkin 提供了可插拔數據存儲方式:In-Memory、MySql、Cassandra 以及 Elasticsearch。
上圖展示了 Zipkin 的基礎架構,
它主要由 4 個核心組件構成:
Collector:收集器組件,它主要用於處理從外部系統發送過來的跟蹤信息,將這些信息轉換為Zipkin內部處理的 Span 格式,以支持后續的存儲、分析、展示等功能。
Storage:存儲組件,它主要對處理收集器接收到的跟蹤信息,默認會將這些信息存儲在內存中,我們也可以修改此存儲策略,通過使用其他存儲組件將跟蹤信息存儲到數據庫中。RESTful API:API 組件,它主要用來提供外部訪問接口。比如給客戶端展示跟蹤信息,或是外接系統訪問以實現監控等。
Web UI:UI 組件,基於API組件實現的上層應用。通過UI組件用戶可以方便而有直觀地查詢和分析跟蹤信息。
Zipkin分為兩端,一個是 Zipkin服務端,一個是 Zipkin客戶端,客戶端也就是微服務的應用。客戶端會配置服務端的 URL 地址,一旦發生服務間的調用的時候,會被配置在微服務里面的 Sleuth 的監聽器監聽,並生成相應的 Trace 和 Span 信息發送給服務端。
整合Zipkin+Sleuth
下載jar並運行 運行命令: java -jar zipkin-server-2.12.9-exec.jar
客戶端:
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-sleuth</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-zipkin</artifactId> </dependency>
yml
spring: zipkin: base-url: http://127.0.0.1:9411/ #zipkin server的請求地址 discoveryClientEnabled: false #讓n acos把它當成一個URL,而不要當做服務名 sleuth: sampler: probability: 1.0 #采樣的百分比
正常訪問
默認Zipkin訪問地址:http://localhost:9411/
MySQL持久化:建庫建表
CREATE TABLE IF NOT EXISTS zipkin_spans ( `trace_id_high` BIGINT NOT NULL DEFAULT 0 COMMENT 'If non zero, this means the trace uses 128 bit traceIds instead of 64 bit', `trace_id` BIGINT NOT NULL, `id` BIGINT NOT NULL, `name` VARCHAR(255) NOT NULL, `remote_service_name` VARCHAR(255), `parent_id` BIGINT, `debug` BIT(1), `start_ts` BIGINT COMMENT 'Span.timestamp(): epoch micros used for endTs query and to implement TTL', `duration` BIGINT COMMENT 'Span.duration(): micros used for minDuration and maxDuration query', PRIMARY KEY (`trace_id_high`, `trace_id`, `id`) ) ENGINE=InnoDB ROW_FORMAT=COMPRESSED CHARACTER SET=utf8 COLLATE utf8_general_ci; ALTER TABLE zipkin_spans ADD INDEX(`trace_id_high`, `trace_id`) COMMENT 'for getTracesByIds'; ALTER TABLE zipkin_spans ADD INDEX(`name`) COMMENT 'for getTraces and getSpanNames'; ALTER TABLE zipkin_spans ADD INDEX(`remote_service_name`) COMMENT 'for getTraces and getRemoteServiceNames'; ALTER TABLE zipkin_spans ADD INDEX(`start_ts`) COMMENT 'for getTraces ordering and range'; CREATE TABLE IF NOT EXISTS zipkin_annotations ( `trace_id_high` BIGINT NOT NULL DEFAULT 0 COMMENT 'If non zero, this means the trace uses 128 bit traceIds instead of 64 bit', `trace_id` BIGINT NOT NULL COMMENT 'coincides with zipkin_spans.trace_id', `span_id` BIGINT NOT NULL COMMENT 'coincides with zipkin_spans.id', `a_key` VARCHAR(255) NOT NULL COMMENT 'BinaryAnnotation.key or Annotation.value if type == -1', `a_value` BLOB COMMENT 'BinaryAnnotation.value(), which must be smaller than 64KB', `a_type` INT NOT NULL COMMENT 'BinaryAnnotation.type() or -1 if Annotation', `a_timestamp` BIGINT COMMENT 'Used to implement TTL; Annotation.timestamp or zipkin_spans.timestamp', `endpoint_ipv4` INT COMMENT 'Null when Binary/Annotation.endpoint is null', `endpoint_ipv6` BINARY(16) COMMENT 'Null when Binary/Annotation.endpoint is null, or no IPv6 address', `endpoint_port` SMALLINT COMMENT 'Null when Binary/Annotation.endpoint is null', `endpoint_service_name` VARCHAR(255) COMMENT 'Null when Binary/Annotation.endpoint is null' ) ENGINE=InnoDB ROW_FORMAT=COMPRESSED CHARACTER SET=utf8 COLLATE utf8_general_ci; ALTER TABLE zipkin_annotations ADD UNIQUE KEY(`trace_id_high`, `trace_id`, `span_id`, `a_key`, `a_timestamp`) COMMENT 'Ignore insert on duplicate'; ALTER TABLE zipkin_annotations ADD INDEX(`trace_id_high`, `trace_id`, `span_id`) COMMENT 'for joining with zipkin_spans'; ALTER TABLE zipkin_annotations ADD INDEX(`trace_id_high`, `trace_id`) COMMENT 'for getTraces/ByIds'; ALTER TABLE zipkin_annotations ADD INDEX(`endpoint_service_name`) COMMENT 'for getTraces and getServiceNames'; ALTER TABLE zipkin_annotations ADD INDEX(`a_type`) COMMENT 'for getTraces and autocomplete values'; ALTER TABLE zipkin_annotations ADD INDEX(`a_key`) COMMENT 'for getTraces and autocomplete values'; ALTER TABLE zipkin_annotations ADD INDEX(`trace_id`, `span_id`, `a_key`) COMMENT 'for dependencies job'; CREATE TABLE IF NOT EXISTS zipkin_dependencies ( `day` DATE NOT NULL, `parent` VARCHAR(255) NOT NULL, `child` VARCHAR(255) NOT NULL, `call_count` BIGINT, `error_count` BIGINT, PRIMARY KEY (`day`, `parent`, `child`) ) ENGINE=InnoDB ROW_FORMAT=COMPRESSED CHARACTER SET=utf8 COLLATE utf8_general_ci;
啟動Zipkin服務指定MySQL
java -jar zipkin-server-2.12.9-exec.jar --STORAGE_TYPE=mysql --MYSQL_HOST=127.0.0.1 --MYSQL_TCP_PORT=3306 --MYSQL_DB=zipkin --MYSQL_USER=root --MYSQL_PASS=root
SpringCloud Stream消息驅動 屏蔽底層消息中間件的差異,降低切換版本,統一消息的編程模型
Message 生產者/消費者之間靠消息媒介傳遞信息內容
消息通道MessageChannel 消息必須走特定的通道
消息通道MessageChannel的子接口SubscribableChannel,由MessageHandler消息處理器訂閱
stream為什么可以統一底層差異
Binder
Stream中的消息通信方式遵循了發布-訂閱模式
在RabbitMQ就是Exchange
在kafka中就是Topic
Spring Cloud Stream標准流程套路
測試:》》》》》》》》》》》》》》》》》
生產者:
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
yml
server: port: 8801 spring: application: name: stream-provider cloud: stream: binders: # 在此處配置要綁定的rabbitmq的服務信息; defaultRabbit: # 表示定義的名稱,用於於binding整合 type: rabbit # 消息組件類型 environment: # 設置rabbitmq的相關的環境配置 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: # 服務的整合處理 output: # 這個名字是一個通道的名稱 destination: studyExchange # 表示要使用的Exchange名稱定義 content-type: application/json # 設置消息類型,本次為json,文本則設置“text/plain” binder: defaultRabbit # 設置要綁定的消息服務的具體設置 eureka: client: # 客戶端進行Eureka注冊的配置,也可用nacos service-url: defaultZone: http://localhost:7001/eureka instance: lease-renewal-interval-in-seconds: 2 # 設置心跳的時間間隔(默認是30秒) lease-expiration-duration-in-seconds: 5 # 如果現在超過了5秒的間隔(默認是90秒) instance-id: send-8801 # 在信息列表時顯示主機名稱 prefer-ip-address: true # 訪問的路徑變為IP地址
package com.lvym.springcloud.service; public interface IRabbitmqService { String send(); }
package com.lvym.springcloud.service.impl; import cn.hutool.core.util.IdUtil; import com.lvym.springcloud.service.IRabbitmqService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; import org.springframework.integration.support.MessageBuilder; import org.springframework.messaging.MessageChannel; @EnableBinding(Source.class) public class RabbitmqServiceImpl implements IRabbitmqService { @Autowired private MessageChannel output; @Override public String send() { String uuid = IdUtil.simpleUUID(); output.send(MessageBuilder.withPayload(uuid).build()); System.out.println("*****uuid: "+uuid); return null; } }
package com.lvym.springcloud.controller; import com.lvym.springcloud.service.IRabbitmqService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class RabbitmqController { @Autowired private IRabbitmqService iRabbitmqService; @GetMapping(value = "/sendMessage") public String sendMessage(){ return iRabbitmqService.send(); } }
消費者:
yml
server: port: 8802 spring: application: name: stream-consumer cloud: stream: binders: # 在此處配置要綁定的rabbitmq的服務信息; defaultRabbit: # 表示定義的名稱,用於於binding整合 type: rabbit # 消息組件類型 environment: # 設置rabbitmq的相關的環境配置 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: # 服務的整合處理 input: # 這個名字是一個通道的名稱 destination: studyExchange # 表示要使用的Exchange名稱定義 content-type: application/json # 設置消息類型,本次為json,文本則設置“text/plain” binder: defaultRabbit # 設置要綁定的消息服務的具體設置 group: lvymA #微服務應用放置於同一個group中,就能夠保證消息只會被其中一個應用消費一次。不同的組是可以消費的,同一個組內會發生競爭關系,只有其中一個可以消費。 與 Spring Cloud Stream 的opinionated 應用模型一致,消費者組訂閱是持久的。也就是說, binder 實現確保組訂閱是持久的,一旦一個組中創建了一個訂閱,就算這個組里邊的所有應用都掛掉了,這個組也會受到消息。 #匿名訂閱生來就是不持久的。在一些 binder 實現中(例如:RabbitMQ),存在不持久的組訂閱是有可能的。 #通常來說,當綁定一個應用到給定的 destination 時,最好是指定一個消費者組。在擴展 Spring Cloud Stream 應用的時候,你必須隊每個輸入綁定指定你一個消費者組。這將保護應用實例不會接收到重復信息(除非你的確想要這么做)。 eureka: client: # 客戶端進行Eureka注冊的配置 service-url: defaultZone: http://localhost:7001/eureka instance: lease-renewal-interval-in-seconds: 2 # 設置心跳的時間間隔(默認是30秒) lease-expiration-duration-in-seconds: 5 # 如果現在超過了5秒的間隔(默認是90秒) instance-id: receive-8802 # 在信息列表時顯示主機名稱 prefer-ip-address: true # 訪問的路徑變為IP地址
package com.lvym.springcloud.controller; import org.springframework.beans.factory.annotation.Value; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.messaging.Message; @EnableBinding(Sink.class) public class ReceiveMessageListenerController { @Value("${server.port}") private String serverPort; @StreamListener(Sink.INPUT) public void input(Message<String> message){ System.out.println("消費者1號,----->接受到的消息: "+message.getPayload()+"\t port: "+serverPort); } }
Rocketmq--消息驅動
MQ(Message Queue)是一種跨進程的通信機制,用於傳遞消息。通俗點說,就是一個先進先出的數據結構。
場景主要包含以下3個方面
-
應用解耦
系統的耦合性越高,容錯性就越低。以電商應用為例,用戶創建訂單后,如果耦合調用庫存系統、物流系統、支付系統,任何一個子系統出了故障或者因為升級等原因暫時不可用,都會造成下單操作異常,影響用戶使用體驗。
異步解耦是消息隊列 MQ 的主要特點,主要目的是減少請求響應時間和解耦。主要的使用場景就是將比較耗時而且不需要即時(同步)返回結果的操作作為消息放入消息隊列。同時,由於使用了消息隊列MQ,只要保證消息格式不變,消息的發送方和接收方並不需要彼此聯系,也不需要受對方的影響,即解耦合。
使用消息隊列解耦合,系統的耦合性就會降低。比如物流系統發生故障,需要幾分鍾才能來修復,在這段時間內,物流系統要處理的數據被緩存到消息隊列中,用戶的下單操作正常完成。當物流系統回復后,補充處理存在消息隊列中的訂單消息即可,終端系統感知不到物流系統發生過幾分鍾故障。
-
流量削峰
應用系統如果遇到系統請求流量的瞬間猛增,有可能會將系統壓垮。有了消息隊列可以將大量請求緩存起來,分散到很長一段時間處理,這樣可以大大提到系統的穩定性和用戶體驗。
一般情況,為了保證系統的穩定性,如果系統負載超過閾值,就會阻止用戶請求,這會影響用戶體驗,而如果使用消息隊列將請求緩存起來,等待系統處理完畢后通知用戶下單完畢,這樣總不能下單體驗要好。
處於經濟考量:
業務系統正常時段的QPS如果是1000,流量最高峰是10000,為了應對流量高峰配置高性能的服務器顯然不划算,這時可以使用消息隊列對峰值流量削峰
-
數據分發
通過消息隊列可以讓數據在多個系統更加之間進行流通。數據的產生方不需要關心誰來使用數據,只需要將數據發送到消息隊列,數據使用方直接在消息隊列中直接獲取數據即可
MQ的優點和缺點
優點:解耦、削峰、數據分發
缺點包含以下幾點:
-
系統可用性降低
系統引入的外部依賴越多,系統穩定性越差。一旦MQ宕機,就會對業務造成影響。
如何保證MQ的高可用?
-
系統復雜度提高
MQ的加入大大增加了系統的復雜度,以前系統間是同步的遠程調用,現在是通過MQ進行異步調用。
如何保證消息沒有被重復消費?怎么處理消息丟失情況?那么保證消息傳遞的順序性?
-
一致性問題
如何保證消息數據處理的一致性?
各種MQ產品的比較
常見的MQ產品包括Kafka、ActiveMQ、RabbitMQ、RocketMQ。
安裝啟動MQ
http://rocketmq.apache.org/release_notes/release-notes-4.4.0/
解壓,進入bin
修改runbroker.sh和runserver.sh修改默認JVM大小,修改成適當大小。。。
# JVM Configuration #=========================================================================================== JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
# JVM Configuration #=========================================================================================== JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
1.啟動NameServer
# 1.啟動NameServer nohup sh bin/mqnamesrv & # 2.查看啟動日志 tail -f ~/logs/rocketmqlogs/namesrv.log
2.啟動Broker
# 1.啟動Broker nohup sh bin/mqbroker -n localhost:9876 & # 2.查看啟動日志 tail -f ~/logs/rocketmqlogs/broker.log
關閉
# 1.關閉NameServer sh bin/mqshutdown namesrv # 2.關閉Broker sh bin/mqshutdown broker
測試》》》》》》》》》》》》》
spring boot整合
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.3</version> </dependency>
rocketmq: name-server: 192.168.146.200:9876;192.168.146.201:9876 producer: group: group
非spring boot整合
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.4.0</version> </dependency>
發送消息
消息發送步驟:
1. 創建消息生產者, 指定生產者所屬的組名
2. 指定Nameserver地址
3. 啟動生產者
4. 創建消息對象,指定主題、標簽和消息體
5. 發送消息
6. 關閉生產者
發送同步消息
這種可靠性同步地發送方式使用的比較廣泛,比如:重要的消息通知,短信通知。
API 請看:https://github.com/apache/rocketmq/blob/master/docs/cn/RocketMQ_Example.md
spring boot
@Autowired private RocketMQTemplate rocketMQTemplate; @Test public void test(){ Order order = new Order(); order.setId(1); order.setName("helloWorld"); GenericMessage<String> message = new GenericMessage(order); //topic 信息 過期時間ms 延遲級別優先 rocketMQTemplate.syncSend("springboot-mq",message,1000,3); //還有其他的api rocketMQTemplate.syncSend("springboot-mq",1,1000); }
發送異步消息
異步消息通常用在對響應時間敏感的業務場景,即發送端不能容忍長時間地等待Broker的響應。
spring boot
Order order = new Order(); order.setId(1); order.setName("helloWorld"); GenericMessage<String> message = new GenericMessage(order); //topic 信息 過期時間 延遲級別優先ms rocketMQTemplate.asyncSend("springboot-mq", message, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { log.info("接收異步返回結果的回調"+sendResult.getMsgId()); } @Override public void onException(Throwable e) { log.info("發生異常>>>>"+e); e.printStackTrace(); } }, 1000, 3); //不讓程序那么快停止 System.in.read();
單向發送消息
Order order = new Order(); order.setId(1); order.setName("helloWorld"); GenericMessage<String> message = new GenericMessage(order); rocketMQTemplate.sendOneWay("springboot-mq", message); rocketMQTemplate.sendOneWay("springboot-mq", 123);
發送方式 | 發送 TPS | 發送結果反饋 | 可靠性 |
---|---|---|---|
同步發送 | 快 | 有 | 不丟失 |
異步發送 | 快 | 有 | 不丟失 |
單向發送 | 最快 | 無 | 可能丟失 |
順序消息 默認有四個queue
消息有序指的是可以按照消息的發送順序來消費(FIFO)。RocketMQ可以嚴格的保證消息有序,可以分為分區有序或者全局有序。
下面用訂單進行分區有序的示例。一個訂單的順序流程是:創建、付款、推送、完成。訂單號相同的消息會被先后發送到同一個隊列中,消費時,同一個OrderId獲取到的肯定是同一個隊列。
同步異步單向類似
Order order = new Order(); order.setId(1); order.setName("helloWorld"); GenericMessage<String> message = new GenericMessage(order); //topic 信息 標識(同一個標識分在queue) 過期時間 rocketMQTemplate.syncSendOrderly("springboot-mq", message,"shun",1000);
延時消息
現在RocketMq並不支持任意時間的延時,需要設置幾個固定的延時等級,從1s到2h分別對應着等級1到18: "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
Order order = new Order(); order.setId(1); order.setName("helloWorld"); GenericMessage<String> message = new GenericMessage(order); //topic 信息 過期時間ms 延遲級別優先(從1開始) rocketMQTemplate.syncSend("springboot-mq",message,1000,3);
convertAndSend
Order order = new Order(); order.setId(1); order.setName("helloWorld"); GenericMessage<String> message = new GenericMessage(order); //同步消息 rocketMQTemplate.convertAndSend("springboot-mq", message); rocketMQTemplate.send("test-topic-1", MessageBuilder.withPayload("Hello, World! I'm from spring message").build());
批量消息
批量發送消息能顯著提高傳遞小消息的性能。限制是這些批量消息應該有相同的topic,相同的waitStoreMsgOK,而且不能是延時消息。此外,這一批消息的總大小不應超過4MB。
就是將消息放入List中
Order order = new Order(); order.setId(1); order.setName("helloWorld"); GenericMessage<String> message = new GenericMessage(order); List<GenericMessage> messages = new ArrayList<>(); messages.add(message); messages.add(message); //批量消息 rocketMQTemplate.convertAndSend("springboot-mq", messages);
以上的消費者差不多:
接收消息
消息接收步驟:
1. 創建消息消費者, 指定消費者所屬的組名
2. 指定Nameserver地址
3. 指定消費者訂閱的主題和標簽
4. 設置回調函數,編寫處理消息的方法
5. 啟動消息消費者
@RocketMQMessageListener(topic = "springboot-mq",consumerGroup = "group") @Component @Slf4j public class RocketmqListen implements RocketMQListener<MessageExt> { @Override public void onMessage(MessageExt message) { log.info("消費者>>>>>"+new String(message.getBody())); } }
過濾消息
在大多數情況下,TAG是一個簡單而有用的設計,其可以來選擇您想要的消息。例如:
依賴
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.1.0</version> </dependency>
生產:getProducer下生產,api有的這里也有,像設置延遲級別,額外的值等,但SQL過濾 (selectorType = SelectorType.SQL92,selectorExpression = "")
Message msg = new Message("TopicTest","tag",("Hello RocketMQ ").getBytes());
//延遲級別
// msg.setDelayTimeLevel(3);
// 設置一些屬性
msg.putUserProperty("a",String.valueOf(2));
rocketMQTemplate.getProducer().send(msg);
消費:
依賴 2.1.0之前的有bug
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.1.0</version> </dependency>
rocketmq: name-server: 192.168.146.200:9876;192.168.146.201:9876 consumer: group: group1 rocketmqs: selectorExpression: tag
@RocketMQMessageListener(topic = "TopicTest",consumerGroup = "group",selectorExpression = "${rocketmqs.selectorExpression}") @Component @Slf4j //生產什么類型的數據就用什么類型的接收 public class RocketmqListen implements RocketMQListener<MessageExt> { @Override public void onMessage(MessageExt message) { log.info("消費者>>>>>"+new String(message.getBody())); } }
事務消息
RocketMQ提供了事務消息,通過事務消息就能達到分布式事務的最終一致
流程分析
上圖說明了事務消息的大致方案,其中分為兩個流程:正常事務消息的發送及提交、事務消息的補償流程。
兩個概念:
半事務消息:暫不能投遞的消息,發送方已經成功地將消息發送到了RocketMQ服務端,但是服務端未收到生產者對該消息的二次確認,此時該消息被標記成“暫不能投遞”狀態,處於該種狀態下的消息即半事務消息。
消息回查:由於網絡閃斷、生產者應用重啟等原因,導致某條事務消息的二次確認丟失,RocketMQ服務端通過掃描發現某條消息長期處於“半事務消息”時,需要主動向消息生產者詢問該消息的最終狀態(Commit 或是 Rollback),該詢問過程即消息回查。
事務消息發送步驟:
1. 發送方將半事務消息發送至RocketMQ服務端。
2. RocketMQ服務端將消息持久化之后,向發送方返回Ack確認消息已經發送成功,此時消息為半事務消息。
3. 發送方開始執行本地事務邏輯。
4. 發送方根據本地事務執行結果向服務端提交二次確認(Commit 或是 Rollback),服務端收到Commit 狀態則將半事務消息標記為可投遞,訂閱方最終將收到該消息;服務端收到 Rollback 狀態則刪除半事務消息,訂閱方將不會接受該消息。事務消息回查步驟:
1. 在斷網或者是應用重啟的特殊情況下,上述步驟4提交的二次確認最終未到達服務端,經過固定時間后服務端將對該消息發起消息回查。
2. 發送方收到消息回查后,需要檢查對應消息的本地事務執行的最終結果。
3. 發送方根據檢查得到的本地事務的最終狀態再次提交二次確認,服務端仍按照步驟
4對半事務消息進行操作。
1)事務消息發送及提交
(1) 發送消息(half消息)。
(3) 根據發送結果執行本地事務(如果寫入失敗,此時half消息對業務不可見,本地邏輯不執行)。
(4) 根據本地事務狀態執行Commit或者Rollback(Commit操作生成消息索引,消息對消費者可見)
2)事務補償
(5) 對沒有Commit/Rollback的事務消息(pending狀態的消息),從服務端發起一次“回查”
(6) Producer收到回查消息,檢查回查消息對應的本地事務的狀態
(7) 根據本地事務狀態,重新Commit或者Rollback
其中,補償階段用於解決消息Commit或者Rollback發生超時或者失敗的情況。
3)事務消息狀態
事務消息共有三種狀態,提交狀態、回滾狀態、中間狀態:
-
TransactionStatus.CommitTransaction: 提交事務,它允許消費者消費此消息。
-
TransactionStatus.RollbackTransaction: 回滾事務,它代表該消息將被刪除,不允許被消費。
-
TransactionStatus.Unknown: 中間狀態,它代表需要檢查消息隊列來確定狀態。
4.6.1 發送事務消息
依賴還是以2.0.3,2.1.0有不同,但差別不大
Order order = new Order(); order.setId(1); order.setName("helloWorld"); GenericMessage<String> message = new GenericMessage(order); String txId = UUID.randomUUID().toString(); //1.發送半事務消息 發送唯一標識,防止冪等性,回查根據txId查 rocketMQTemplate.sendMessageInTransaction("tx_producer_group","tx_topic",MessageBuilder.withPayload(order).setHeader("txId", txId).build(),order);
import com.lvym.rocketmq.entity.TxLog; import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.Message; import org.springframework.stereotype.Service; @Service @RocketMQTransactionListener(txProducerGroup = "tx_producer_group") public class OrderServiceImpl4Listener implements RocketMQLocalTransactionListener { @Autowired private OrderServiceImpl4 orderServiceImpl4; @Autowired private TxLogDao txLogDao; //執行本地事物 @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { String txId = (String) msg.getHeaders().get("txId"); try { //3.本地事物 第二步不可見 Order order = (Order) arg; orderServiceImpl4.createOrder(txId,order); //4. return RocketMQLocalTransactionState.COMMIT; } catch (Exception e) {
//4. return RocketMQLocalTransactionState.ROLLBACK; } } //5.消息回查 @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { String txId = (String) msg.getHeaders().get("txId"); //6.
TxLog txLog = txLogDao.findById(txId).get(); if (txLog != null){ //本地事物(訂單)成功了 7. return RocketMQLocalTransactionState.COMMIT; }else {
//7. return RocketMQLocalTransactionState.ROLLBACK; } } }
//3.本地事物 @Transactional public void createOrder(String txId, Order order) { //本地事物代碼 //保存訂單 orderDao.save(order); //記錄日志到數據庫,回查使用 TxLog txLog = new TxLog(); txLog.setTxId(txId); txLog.setDate(new Date()); //記錄事物日志 txLogDao.save(txLog); }
使用限制
-
事務消息不支持延時消息和批量消息。
-
為了避免單個消息被檢查太多次而導致半隊列消息累積,我們默認將單個消息的檢查次數限制為 15 次,但是用戶可以通過 Broker 配置文件的
-
事務消息將在 Broker 配置文件中的參數 transactionMsgTimeout 這樣的特定時間長度之后被檢查。當發送事務消息時,用戶還可以通過設置用戶屬性 CHECK_IMMUNITY_TIME_IN_SECONDS 來改變這個限制,該參數優先於
transactionMsgTimeout
參數。 -
事務性消息可能不止一次被檢查或消費。
-
提交給用戶的目標主題消息可能會失敗,目前這依日志的記錄而定。它的高可用性通過 RocketMQ 本身的高可用性機制來保證,如果希望確保事務消息不丟失、並且事務完整性得到保證,建議使用同步的雙重寫入機制。
-
事務消息的生產者 ID 不能與其他類型消息的生產者 ID 共享。與其他類型的消息不同,事務消息允許反向查詢、MQ服務器能通過它們的生產者 ID 查詢到消費者。
消息存儲
分布式隊列因為有高可靠性的要求,所以數據要進行持久化存儲。
-
-
MQ收到消息,將消息進行持久化,在存儲中新增一條記錄
-
返回ACK給生產者
-
MQ push 消息給對應的消費者,然后等待消費者返回ACK
-
如果消息消費者在指定時間內成功返回ack,那么MQ認為消息消費成功,在存儲中刪除消息,即執行第6步;如果MQ在指定時間內沒有收到ACK,則認為消息消費失敗,會嘗試重新push消息,重復執行4、5、6步驟
-
-
關系型數據庫DB
Apache下開源的另外一款MQ—ActiveMQ(默認采用的KahaDB做消息存儲)可選用JDBC的方式來做消息持久化,通過簡單的xml配置信息即可實現JDBC消息存儲。由於,普通關系型數據庫(如Mysql)在單表數據量達到千萬級別的情況下,其IO讀寫性能往往會出現瓶頸。在可靠性方面,該種方案非常依賴DB,如果一旦DB出現故障,則MQ的消息就無法落盤存儲會導致線上故障。
-
文件系統
目前業界較為常用的幾款產品(RocketMQ/Kafka/RabbitMQ)均采用的是消息刷盤至所部署虛擬機/物理機的文件系統來做持久化(刷盤一般可以分為異步刷盤和同步刷盤兩種模式)。消息刷盤為消息存儲提供了一種高效率、高可靠性和高性能的數據持久化方式。除非部署MQ機器本身或是本地磁盤掛了,否則一般是不會出現無法持久化的故障問題。
性能對比 :文件系統>關系型數據庫DB
消息的存儲和發送
1)消息存儲
磁盤如果使用得當,磁盤的速度完全可以匹配上網絡 的數據傳輸速度。目前的高性能磁盤,順序寫速度可以達到600MB/s, 超過了一般網卡的傳輸速度。但是磁盤隨機寫的速度只有大概100KB/s,和順序寫的性能相差6000倍!因為有如此巨大的速度差別,好的消息隊列系統會比普通的消息隊列系統速度快多個數量級。RocketMQ的消息用順序寫,保證了消息存儲的速度。
Linux操作系統分為【用戶態】和【內核態】,文件操作、網絡操作需要涉及這兩種形態的切換,免不了進行數據復制。
一台服務器 把本機磁盤文件的內容發送到客戶端,一般分為兩個步驟:
1)read;讀取本地文件內容;
2)write;將讀取的內容通過網絡發送出去。
這兩個看似簡單的操作,實際進行了4 次數據復制,分別是:
-
從磁盤復制數據到內核態內存;
-
從內核態內存復 制到用戶態內存;
-
然后從用戶態 內存復制到網絡驅動的內核態內存;
-
最后是從網絡驅動的內核態內存復 制到網卡中進行傳輸。
RocketMQ充分利用了上述特性,也就是所謂的“零拷貝”技術,提高消息存盤和網絡發送的速度。
這里需要注意的是,采用MappedByteBuffer這種內存映射的方式有幾個限制,其中之一是一次只能映射1.5~2G 的文件至用戶態的虛擬內存,這也是為何RocketMQ默認設置單個CommitLog日志數據文件為1G的原因了
消息存儲結構
-
-
ConsumerQueue:存儲消息在CommitLog的索引
-
刷盤機制
RocketMQ的消息是存儲到磁盤上的,這樣既能保證斷電后恢復, 又可以讓存儲的消息量超出內存的限制。RocketMQ為了提高性能,會盡可能地保證磁盤的順序寫。消息在通過Producer寫入RocketMQ的時 候,有兩種寫磁盤方式,分布式同步刷盤和異步刷盤。
同步刷盤
2)異步刷盤
在返回寫成功狀態時,消息可能只是被寫入了內存的PAGECACHE,寫操作的返回快,吞吐量大;當內存里的消息量積累到一定程度時,統一觸發寫磁盤動作,快速寫入。但可能會有丟失數據的風險
配置
#刷盤方式
#- ASYNC_FLUSH 異步刷盤 #- SYNC_FLUSH 同步刷盤 flushDiskType=SYNC_FLUSH
RocketMQ分布式集群是通過Master和Slave的配合達到高可用性的。
Master和Slave的區別:在Broker的配置文件中,參數 brokerId的值為0表明這個Broker是Master,大於0表明這個Broker是 Slave,同時brokerRole參數也會說明這個Broker是Master還是Slave。
#Broker 的角色
#- ASYNC_MASTER 異步復制Master #- SYNC_MASTER 同步雙寫Master #- SLAVE brokerRole=SLAVE
#Broker 的角色
#- ASYNC_MASTER 異步復制Master #- SYNC_MASTER 同步雙寫Master #- SLAVE brokerRole=SYNC_MASTER
Master角色的Broker支持讀和寫,Slave角色的Broker僅支持讀,也就是 Producer只能和Master角色的Broker連接寫入消息;Consumer可以連接 Master角色的Broker,也可以連接Slave角色的Broker來讀取消息。
消息消費高可用
在Consumer的配置文件中,並不需要設置是從Master讀還是從Slave 讀,當Master不可用或者繁忙的時候,Consumer會被自動切換到從Slave 讀。有了自動切換Consumer這種機制,當一個Master角色的機器出現故障后,Consumer仍然可以從Slave讀取消息,不影響Consumer程序。這就達到了消費端的高可用性。
消息發送高可用
消息主從復制
如果一個Broker組有Master和Slave,消息需要從Master復制到Slave 上,有同步和異步兩種復制方式。
1)同步復制
同步復制方式是等Master和Slave均寫 成功后才反饋給客戶端寫成功狀態;
在同步復制方式下,如果Master出故障, Slave上有全部的備份數據,容易恢復,但是同步復制會增大數據寫入 延遲,降低系統吞吐量。
2)異步復制
異步復制方式是只要Master寫成功 即可反饋給客戶端寫成功狀態。
在異步復制方式下,系統擁有較低的延遲和較高的吞吐量,但是如果Master出了故障,有些數據因為沒有被寫 入Slave,有可能會丟失;
3)配置
同步復制和異步復制是通過Broker配置文件里的brokerRole參數進行設置的,這個參數可以被設置成ASYNC_MASTER、 SYNC_MASTER、SLAVE三個值中的一個。
#Broker 的角色
#- ASYNC_MASTER 異步復制Master #- SYNC_MASTER 同步雙寫Master #- SLAVE brokerRole=SYNC_MASTER
#Broker 的角色
#- ASYNC_MASTER 異步復制Master #- SYNC_MASTER 同步雙寫Master #- SLAVE brokerRole=SLAVE
實際應用中要結合業務場景,合理設置刷盤方式和主從復制方式, 尤其是SYNC_FLUSH方式,由於頻繁地觸發磁盤寫動作,會明顯降低 性能。通常情況下,應該把Master和Save配置成ASYNC_FLUSH的刷盤 方式,主從之間配置成SYNC_MASTER的復制方式,這樣即使有一台 機器出故障,仍然能保證數據不丟,是個不錯的選擇。
RocketMQ集群搭建
各角色介紹
-
Producer:消息的發送者;舉例:發信者
-
Consumer:消息接收者;舉例:收信者
-
Broker:暫存和傳輸消息;舉例:郵局
-
NameServer:管理Broker;舉例:各個郵局的管理機構
-
Topic:區分消息的種類;一個發送者可以發送消息給一個或者多個Topic;一個消息的接收者可以訂閱一個或者多個Topic消息
-
Message Queue:相當於是Topic的分區;用於並行發送和接收消息
集群特點
-
NameServer是一個幾乎無狀態節點,可集群部署,節點之間無任何信息同步。
-
Broker部署相對復雜,Broker分為Master與Slave,一個Master可以對應多個Slave,但是一個Slave只能對應一個Master,Master與Slave的對應關系通過指定相同的BrokerName,不同的BrokerId來定義,BrokerId為0表示Master,非0表示Slave。Master也可以部署多個。每個Broker與NameServer集群中的所有節點建立長連接,定時注冊Topic信息到所有NameServer。
-
Producer與NameServer集群中的其中一個節點(隨機選擇)建立長連接,定期從NameServer取Topic路由信息,並向提供Topic服務的Master建立長連接,且定時向Master發送心跳。Producer完全無狀態,可集群部署。
-
Consumer與NameServer集群中的其中一個節點(隨機選擇)建立長連接,定期從NameServer取Topic路由信息,並向提供Topic服務的Master、Slave建立長連接,且定時向Master、Slave發送心跳。Consumer既可以從Master訂閱消息,也可以從Slave訂閱消息,訂閱規則由Broker配置決定。
集群模式
1)單Master模式
這種方式風險較大,一旦Broker重啟或者宕機時,會導致整個服務不可用。不建議線上環境使用,可以用於本地測試。
2)多Master模式
一個集群無Slave,全是Master,例如2個Master或者3個Master,這種模式的優缺點如下:
-
優點:配置簡單,單個Master宕機或重啟維護對應用無影響,在磁盤配置為RAID10時,即使機器宕機不可恢復情況下,由於RAID10磁盤非常可靠,消息也不會丟(異步刷盤丟失少量消息,同步刷盤一條不丟),性能最高;
-
缺點:單台機器宕機期間,這台機器上未被消費的消息在機器恢復之前不可訂閱,消息實時性會受到影響。
3)多Master多Slave模式(異步)
每個Master配置一個Slave,有多對Master-Slave,HA采用異步復制方式,主備有短暫消息延遲(毫秒級),這種模式的優缺點如下:
-
優點:即使磁盤損壞,消息丟失的非常少,且消息實時性不會受影響,同時Master宕機后,消費者仍然可以從Slave消費,而且此過程對應用透明,不需要人工干預,性能同多Master模式幾乎一樣;
-
缺點:Master宕機,磁盤損壞情況下會丟失少量消息。
4)多Master多Slave模式(同步)
每個Master配置一個Slave,有多對Master-Slave,HA采用同步雙寫方式,即只有主備都寫成功,才向應用返回成功,這種模式的優缺點如下:
-
優點:數據與服務都無單點故障,Master宕機情況下,消息無延遲,服務可用性與數據可用性都非常高;
-
缺點:性能比異步復制模式略低(大約低10%左右),發送單個消息的RT會略高,且目前版本在主節點宕機后,備機不能自動切換為主機。
雙主雙從集群搭建---同步,兩台機器,主1和從2在同一台機器(200),主2和從1在同一台機器(201)
環境:java8起步,關閉防火牆或開放端口 (RocketMQ默認使用3個端口:9876 、10911 、11011 )* `nameserver` 默認使用 9876 端口* `master` 默認使用 10911 端口* `slave` 默認使用11011 端口
1.修改hosts文件(可選)
# nameserver
192.168.146.200 rocketmq-nameserver1 192.168.146.201 rocketmq-nameserver2 # broker 192.168.146.200 rocketmq-master1 192.168.146.200 rocketmq-slave2 192.168.146.201 rocketmq-master2 192.168.146.201 rocketmq-slave1
配置完成后, 重啟網卡 systemctl restart network
2. 環境變量配置(可選),方便在任何地方都能輸入命令
vim /etc/profile
在profile文件的末尾加入如下命令
#set rocketmq
ROCKETMQ_HOME=/usr/local/rocketmq/rocketmq-all-4.4.0-bin-release PATH=$PATH:$ROCKETMQ_HOME/bin export ROCKETMQ_HOME PATH
使得配置立刻生效:source /etc/profile
3.創建消息存儲路徑
mkdir /usr/local/rocketmq/store
mkdir /usr/local/rocketmq/store/commitlog
mkdir /usr/local/rocketmq/store/consumequeue
mkdir /usr/local/rocketmq/store/index
mkdir /usr/local/rocketmq/store2
mkdir /usr/local/rocketmq/store2/commitlog
mkdir /usr/local/rocketmq/store2/consumequeue
mkdir /usr/local/rocketmq/store2/index
4.broker配置文件
Master1(200)
vim /usr/local/rocketmq/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-a.properties
brokerIP1 | 網卡的 InetAddress | 當前 broker 監聽的 IP |
brokerIP2 | 跟 brokerIP1 一樣 | 存在主從 broker 時,如果在 broker 主節點上配置了 brokerIP2 屬性,broker 從節點會連接主節點配置的 brokerIP2 進行同步 |
#所屬集群名字
brokerClusterName=rocketmq-cluster #broker名字,注意此處不同的配置文件填寫的不一樣 brokerName=broker-a #0 表示 Master,>0 表示 Slave brokerId=0
#當前 broker 監聽的 IP
brokerIP1=192.168.146.200
brokerIP2=192.168.146.201 #nameServer地址,分號分割 namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876 #在發送消息時,自動創建服務器不存在的topic,默認創建的隊列數 defaultTopicQueueNums=4 #是否允許 Broker 自動創建Topic,建議線下開啟,線上關閉 autoCreateTopicEnable=true #是否允許 Broker 自動創建訂閱組,建議線下開啟,線上關閉 autoCreateSubscriptionGroup=true #Broker 對外服務的監聽端口 listenPort=10911 #刪除文件時間點,默認凌晨 4點 deleteWhen=04 #文件保留時間,默認 48 小時 fileReservedTime=120 #commitLog每個文件的大小默認1G mapedFileSizeCommitLog=1073741824 #ConsumeQueue每個文件默認存30W條,根據業務情況調整 mapedFileSizeConsumeQueue=300000 #destroyMapedFileIntervalForcibly=120000 #redeleteHangedFileInterval=120000 #檢測物理文件磁盤空間 diskMaxUsedSpaceRatio=88 #存儲路徑 storePathRootDir=/usr/local/rocketmq/store #commitLog 存儲路徑 storePathCommitLog=/usr/local/rocketmq/store/commitlog #消費隊列存儲路徑存儲路徑 storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue #消息索引存儲路徑 storePathIndex=/usr/local/rocketmq/store/index #checkpoint 文件存儲路徑 storeCheckpoint=/usr/local/rocketmq/store/checkpoint #abort 文件存儲路徑 abortFile=/usr/local/rocketmq/store/abort #限制的消息大小 maxMessageSize=65536 #flushCommitLogLeastPages=4 #flushConsumeQueueLeastPages=2 #flushCommitLogThoroughInterval=10000 #flushConsumeQueueThoroughInterval=60000 #Broker 的角色 #- ASYNC_MASTER 異步復制Master #- SYNC_MASTER 同步雙寫Master #- SLAVE brokerRole=SYNC_MASTER #刷盤方式 #- ASYNC_FLUSH 異步刷盤 #- SYNC_FLUSH 同步刷盤 flushDiskType=SYNC_FLUSH #checkTransactionMessageEnable=false #發消息線程池數量 #sendMessageThreadPoolNums=128 #拉消息線程池數量 #pullMessageThreadPoolNums=128
Slave2 broker-b-s.properties (200)
# limitations under the License.
#所屬集群名字
brokerClusterName=rocketmq-cluster #broker名字,注意此處不同的配置文件填寫的不一樣 brokerName=broker-b #0 表示 Master,>0 表示 Slave brokerId=1
#當前 broker 監聽的 IP
brokerIP2=192.168.146.201 #nameServer地址,分號分割 namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876 #在發送消息時,自動創建服務器不存在的topic,默認創建的隊列數 defaultTopicQueueNums=4 #是否允許 Broker 自動創建Topic,建議線下開啟,線上關閉 autoCreateTopicEnable=true #是否允許 Broker 自動創建訂閱組,建議線下開啟,線上關閉 autoCreateSubscriptionGroup=true #Broker 對外服務的監聽端口 listenPort=11011 #刪除文件時間點,默認凌晨 4點 deleteWhen=04 #文件保留時間,默認 48 小時 fileReservedTime=120 #commitLog每個文件的大小默認1G mapedFileSizeCommitLog=1073741824 #ConsumeQueue每個文件默認存30W條,根據業務情況調整 mapedFileSizeConsumeQueue=300000 #destroyMapedFileIntervalForcibly=120000 #redeleteHangedFileInterval=120000 #檢測物理文件磁盤空間 diskMaxUsedSpaceRatio=88 #存儲路徑 storePathRootDir=/usr/local/rocketmq/store2 #commitLog 存儲路徑 storePathCommitLog=/usr/local/rocketmq/store2/commitlog #消費隊列存儲路徑存儲路徑 storePathConsumeQueue=/usr/local/rocketmq/store2/consumequeue #消息索引存儲路徑 storePathIndex=/usr/local/rocketmq/store2/index #checkpoint 文件存儲路徑 storeCheckpoint=/usr/local/rocketmq/store2/checkpoint #abort 文件存儲路徑 abortFile=/usr/local/rocketmq/store2/abort #限制的消息大小 maxMessageSize=65536 #flushCommitLogLeastPages=4 #flushConsumeQueueLeastPages=2 #flushCommitLogThoroughInterval=10000 #flushConsumeQueueThoroughInterval=60000 #Broker 的角色 #- ASYNC_MASTER 異步復制Master #- SYNC_MASTER 同步雙寫Master #- SLAVE brokerRole=SLAVE #刷盤方式 #- ASYNC_FLUSH 異步刷盤 #- SYNC_FLUSH 同步刷盤 flushDiskType=ASYNC_FLUSH #checkTransactionMessageEnable=false #發消息線程池數量 #sendMessageThreadPoolNums=128 #拉消息線程池數量 #pullMessageThreadPoolNums=128
Master2(201)broker-b.properties
#所屬集群名字
brokerClusterName=rocketmq-cluster #broker名字,注意此處不同的配置文件填寫的不一樣 brokerName=broker-b #0 表示 Master,>0 表示 Slave brokerId=0
#當前 broker 監聽的 IP
brokerIP1=192.168.146.201
brokerIP2=192.168.146.200 #nameServer地址,分號分割 namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876 #在發送消息時,自動創建服務器不存在的topic,默認創建的隊列數 defaultTopicQueueNums=4 #是否允許 Broker 自動創建Topic,建議線下開啟,線上關閉 autoCreateTopicEnable=true #是否允許 Broker 自動創建訂閱組,建議線下開啟,線上關閉 autoCreateSubscriptionGroup=true #Broker 對外服務的監聽端口 listenPort=10911 #刪除文件時間點,默認凌晨 4點 deleteWhen=04 #文件保留時間,默認 48 小時 fileReservedTime=120 #commitLog每個文件的大小默認1G mapedFileSizeCommitLog=1073741824 #ConsumeQueue每個文件默認存30W條,根據業務情況調整 mapedFileSizeConsumeQueue=300000 #destroyMapedFileIntervalForcibly=120000 #redeleteHangedFileInterval=120000 #檢測物理文件磁盤空間 diskMaxUsedSpaceRatio=88 #存儲路徑 storePathRootDir=/usr/local/rocketmq/store #commitLog 存儲路徑 storePathCommitLog=/usr/local/rocketmq/store/commitlog #消費隊列存儲路徑存儲路徑 storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue #消息索引存儲路徑 storePathIndex=/usr/local/rocketmq/store/index #checkpoint 文件存儲路徑 storeCheckpoint=/usr/local/rocketmq/store/checkpoint #abort 文件存儲路徑 abortFile=/usr/local/rocketmq/store/abort #限制的消息大小 maxMessageSize=65536 #flushCommitLogLeastPages=4 #flushConsumeQueueLeastPages=2 #flushCommitLogThoroughInterval=10000 #flushConsumeQueueThoroughInterval=60000 #Broker 的角色 #- ASYNC_MASTER 異步復制Master #- SYNC_MASTER 同步雙寫Master #- SLAVE brokerRole=SYNC_MASTER #刷盤方式 #- ASYNC_FLUSH 異步刷盤 #- SYNC_FLUSH 同步刷盤 flushDiskType=SYNC_FLUSH #checkTransactionMessageEnable=false #發消息線程池數量 #sendMessageThreadPoolNums=128 #拉消息線程池數量 #pullMessageThreadPoolNums=128
Slave1(201)broker-a-s.properties
#所屬集群名字
brokerClusterName=rocketmq-cluster #broker名字,注意此處不同的配置文件填寫的不一樣 brokerName=broker-a #0 表示 Master,>0 表示 Slave brokerId=1
#當前 broker 監聽的 IP
brokerIP1=192.168.146.200 #nameServer地址,分號分割 namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876 #在發送消息時,自動創建服務器不存在的topic,默認創建的隊列數 defaultTopicQueueNums=4 #是否允許 Broker 自動創建Topic,建議線下開啟,線上關閉 autoCreateTopicEnable=true #是否允許 Broker 自動創建訂閱組,建議線下開啟,線上關閉 autoCreateSubscriptionGroup=true #Broker 對外服務的監聽端口 listenPort=11011 #刪除文件時間點,默認凌晨 4點 deleteWhen=04 #文件保留時間,默認 48 小時 fileReservedTime=120 #commitLog每個文件的大小默認1G mapedFileSizeCommitLog=1073741824 #ConsumeQueue每個文件默認存30W條,根據業務情況調整 mapedFileSizeConsumeQueue=300000 #destroyMapedFileIntervalForcibly=120000 #redeleteHangedFileInterval=120000 #檢測物理文件磁盤空間 diskMaxUsedSpaceRatio=88 #存儲路徑 storePathRootDir=/usr/local/rocketmq/store2 #commitLog 存儲路徑 storePathCommitLog=/usr/local/rocketmq/store2/commitlog #消費隊列存儲路徑存儲路徑 storePathConsumeQueue=/usr/local/rocketmq/store2/consumequeue #消息索引存儲路徑 storePathIndex=/usr/local/rocketmq/store2/index #checkpoint 文件存儲路徑 storeCheckpoint=/usr/local/rocketmq/store2/checkpoint #abort 文件存儲路徑 abortFile=/usr/local/rocketmq/store2/abort #限制的消息大小 maxMessageSize=65536 #flushCommitLogLeastPages=4 #flushConsumeQueueLeastPages=2 #flushCommitLogThoroughInterval=10000 #flushConsumeQueueThoroughInterval=60000 #Broker 的角色 #- ASYNC_MASTER 異步復制Master #- SYNC_MASTER 同步雙寫Master #- SLAVE brokerRole=SLAVE #刷盤方式 #- ASYNC_FLUSH 異步刷盤 #- SYNC_FLUSH 同步刷盤 flushDiskType=ASYNC_FLUSH #checkTransactionMessageEnable=false #發消息線程池數量 #sendMessageThreadPoolNums=128 #拉消息線程池數量 #pullMessageThreadPoolNums=128
5.修改啟動腳本文件
runbroker.sh
vim /usr/local/rocketmq/rocketmq-all-4.4.0-bin-release/bin/runbroker.sh
需要根據內存大小進行適當的對JVM參數進行調整:
# JVM Configuration
#=========================================================================================== JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
runserver.sh
vim /usr/local/rocketmq/rocketmq-all-4.4.0-bin-release/bin/runserver.sh
# JVM Configuration
#=========================================================================================== JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
6.服務啟動 啟動過程中最好不要出錯,
先啟動NameServe集群
nohup sh mqnamesrv &
啟動Broker集群
nohup sh mqbroker -c /usr/local/rocketmq/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-a.properties &
nohup sh mqbroker -c /usr/local/rocketmq/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-b-s.properties &
201
nohup sh mqbroker -c /usr/local/rocketmq/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-b.properties &
nohup sh mqbroker -c /usr/local/rocketmq/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-a-s.properties &
啟動后通過JPS查看啟動進程
[root@rocketmqM bin]# jps
2917 NamesrvStartup 2949 BrokerStartup 3046 BrokerStartup 3126 Jps
[root@rocketmq bin]# jps
2866 NamesrvStartup 3011 BrokerStartup 3348 Jps 2927 BrokerStartup
負載均衡
Producer負載均衡
Producer端,每個實例在發消息的時候,默認會輪詢所有的message queue發送,以達到讓消息平均落在不同的queue上。而由於queue可以散落在不同的broker,所以消息就發送到不同的broker下,如下圖
Consumer負載均衡
1)集群模式
在集群消費模式下,每條消息只需要投遞到訂閱這個topic的Consumer Group下的一個實例即可。RocketMQ采用主動拉取的方式拉取並消費消息,在拉取的時候需要明確指定拉取哪一條message queue。
而每當實例的數量有變更,都會觸發一次所有實例的負載均衡,這時候會按照queue的數量和實例的數量平均分配queue給每個實例。
默認的分配算法是AllocateMessageQueueAveragely
另外一種平均的算法是AllocateMessageQueueAveragelyByCircle,也是平均分攤每一條queue,只是以環狀輪流分queue的形式,
需要注意的是,集群模式下,queue都是只允許分配只一個實例,這是由於如果多個實例同時消費一個queue的消息,由於拉取哪些消息是consumer主動控制的,那樣會導致同一個消息在不同的實例下被消費多次,所以算法上都是一個queue只分給一個consumer實例,一個consumer實例可以允許同時分到不同的queue。
通過增加consumer實例去分攤queue的消費,可以起到水平擴展的消費能力的作用。而有實例下線的時候,會重新觸發負載均衡,這時候原來分配到的queue將分配到其他實例上繼續消費。
但是如果consumer實例的數量比message queue的總數量還多的話,多出來的consumer實例將無法分到queue,也就無法消費到消息,也就無法起到分攤負載的作用了。所以需要控制讓queue的總數量大於等於consumer的數量。
廣播模式
由於廣播模式下要求一條消息需要投遞到一個消費組下面所有的消費者實例,所以也就沒有消息被分攤消費的說法。
在實現上,其中一個不同就是在consumer分配queue的時候,所有consumer都分到所有的queue。
消息消費要注意的細節 @RocketMQMessageListener(consumerGroup = "shop",//消費者分組 topic = "order-topic",//要消費的主題 consumeMode = ConsumeMode.CONCURRENTLY, //消費模式:無序和有序 messageModel = Mess ageModel.CLUSTERING, //消息模式:廣播和集群,默認是集群) public class SmsService implements RocketMQListener<Order> {} RocketMQ支持兩種消息模式: 廣播消費: 每個消費者實例都會收到消息,也就是一條消息可以被每個消費者實例處理; 集群消費: 一條消息只能被一個消費者實例消息
消息重試
1.4.1 順序消息的重試
對於順序消息,當消費者消費消息失敗后,消息隊列 RocketMQ 會自動不斷進行消息重試(每次間隔時間為 1 秒),這時,應用會出現消息消費被阻塞的情況。因此,在使用順序消息時,務必保證應用能夠及時監控並處理消費失敗的情況,避免阻塞現象的發生。
1.4.2 無序消息的重試
對於無序消息(普通、定時、延時、事務消息),當消費者消費消息失敗時,您可以通過設置返回狀態達到消息重試的結果。
無序消息的重試只針對集群消費方式生效;廣播方式不提供失敗重試特性,即消費失敗后,失敗消息不再重試,繼續消費新的消息。
1)重試次數
消息隊列 RocketMQ 默認允許每條消息最多重試 16 次,每次重試的間隔時間如下
第幾次重試 | 與上次重試的間隔時間 | 第幾次重試 | 與上次重試的間隔時間 |
---|---|---|---|
1 | 10 秒 | 9 | 7 分鍾 |
2 | 30 秒 | 10 | 8 分鍾 |
3 | 1 分鍾 | 11 | 9 分鍾 |
4 | 2 分鍾 | 12 | 10 分鍾 |
5 | 3 分鍾 | 13 | 20 分鍾 |
6 | 4 分鍾 | 14 | 30 分鍾 |
7 | 5 分鍾 | 15 | 1 小時 |
8 | 6 分鍾 | 16 | 2 小時 |
如果消息重試 16 次后仍然失敗,消息將不再投遞。如果嚴格按照上述重試時間間隔計算,某條消息在一直消費失敗的前提下,將會在接下來的 4 小時 46 分鍾之內進行 16 次重試,超過這個時間范圍消息將不再重試投遞。
注意: 一條消息無論重試多少次,這些重試消息的 Message ID 不會改變。
集群消費方式下,消息消費失敗后期望消息重試,需要在消息監聽器接口的實現中明確進行配置(三種方式任選一種):
-
-
返回 Null
-
拋出異常
//方式1:返回 Action.ReconsumeLater,消息將重試 return Action.ReconsumeLater; //方式2:返回 null,消息將重試 return null; //方式3:直接拋出異常, 消息將重試 throw new RuntimeException("88888");
消費失敗后,不重試配置方式
集群消費方式下,消息失敗后期望消息不重試,需要捕獲消費邏輯中可能拋出的異常,最終返回 Action.CommitMessage,此后這條消息將不會再重試。
//捕獲消費邏輯中的所有異常,並返回 Action.CommitMessage; return Action.CommitMessage;
自定義消息最大重試次數
消息隊列 RocketMQ 允許 Consumer 啟動的時候設置最大重試次數,重試時間間隔將按照如下策略:
-
最大重試次數小於等於 16 次,則重試時間間隔同上表描述。
-
最大重試次數大於 16 次,超過 16 次的重試時間間隔均為每次 2 小時。
可以配置原生的API,spring boot好像只有自動重試,或者版本問題。。。。。
-
-
如果只對相同 Group ID 下兩個 Consumer 實例中的其中一個設置了 MaxReconsumeTimes,那么該配置對兩個 Consumer 實例均生效。
-
死信隊列
在消息隊列 RocketMQ 中,這種正常情況下無法被消費的消息稱為死信消息(Dead-Letter Message),存儲死信消息的特殊隊列稱為死信隊列(Dead-Letter Queue)。
死信特性
死信消息具有以下特性
-
不會再被消費者正常消費。
-
有效期與正常消息相同,均為 3 天,3 天后會被自動刪除。因此,請在死信消息產生后的 3 天內及時處理。
死信隊列具有以下特性:
-
一個死信隊列對應一個 Group ID, 而不是對應單個消費者實例。
-
如果一個 Group ID 未產生死信消息,消息隊列 RocketMQ 不會為其創建相應的死信隊列。
-
一個死信隊列包含了對應 Group ID 產生的所有死信消息,不論該消息屬於哪個 Topic。
一條消息進入死信隊列,意味着某些因素導致消費者無法正常消費該消息,因此,通常需要您對其進行特殊處理。排查可疑因素並解決問題后,可以在消息隊列 RocketMQ 控制台重新發送該消息,讓消費者重新消費一次。
消費冪等
消息隊列 RocketMQ 消費者在接收到消息以后,有必要根據業務上的唯一 Key 對消息做冪等處理的必要性。
消費冪等的必要性
在互聯網應用中,尤其在網絡不穩定的情況下,消息隊列 RocketMQ 的消息有可能會出現重復,這個重復簡單可以概括為以下情況:
-
發送時消息重復
當一條消息已被成功發送到服務端並完成持久化,此時出現了網絡閃斷或者客戶端宕機,導致服務端對客戶端應答失敗。 如果此時生產者意識到消息發送失敗並嘗試再次發送消息,消費者后續會收到兩條內容相同並且 Message ID 也相同的消息。
-
投遞時消息重復
消息消費的場景下,消息已投遞到消費者並完成業務處理,當客戶端給服務端反饋應答的時候網絡閃斷。 為了保證消息至少被消費一次,消息隊列 RocketMQ 的服務端將在網絡恢復后再次嘗試投遞之前已被處理過的消息,消費者后續會收到兩條內容相同並且 Message ID 也相同的消息。
-
負載均衡時消息重復(包括但不限於網絡抖動、Broker 重啟以及訂閱方應用重啟)
當消息隊列 RocketMQ 的 Broker 或客戶端重啟、擴容或縮容時,會觸發 Rebalance,此時消費者可能會收到重復消息。
處理方式
因為 Message ID 有可能出現沖突(重復)的情況,所以真正安全的冪等處理,不建議以 Message ID 作為處理依據。 最好的方式是以業務唯一標識作為冪等處理的關鍵依據,而業務的唯一標識可以通過消息 Key 進行設置:
Message message = new Message("springboot-mq","hello".getBytes()); message.setKeys("uuid1");唯一就好 rocketMQTemplate.getProducer().send(message);
消費:
String key = message.getKeys() // 根據業務唯一標識的 key 做冪等處理
消息軌跡
1. 消息軌跡數據關鍵屬性
Producer端 | Consumer端 | Broker端 |
---|---|---|
生產實例信息 | 消費實例信息 | 消息的Topic |
發送消息時間 | 投遞時間,投遞輪次 | 消息存儲位置 |
消息是否發送成功 | 消息是否消費成功 | 消息的Key值 |
發送耗時 | 消費耗時 | 消息的Tag值 |
2. 支持消息軌跡集群部署
2.1 Broker端配置文件
這里貼出Broker端開啟消息軌跡特性的properties配置文件內容:
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
storePathRootDir=/data/rocketmq/rootdir-a-m
storePathCommitLog=/data/rocketmq/commitlog-a-m
autoCreateSubscriptionGroup=true
## if msg tracing is open,the flag will be true
traceTopicEnable=true
listenPort=10911
brokerIP1=XX.XX.XX.XX1
namesrvAddr=XX.XX.XX.XX:9876
2.2 普通模式
RocketMQ集群中每一個Broker節點均用於存儲Client端收集並發送過來的消息軌跡數據。因此,對於RocketMQ集群中的Broker節點數量並無要求和限制。
2.3 物理IO隔離模式
對於消息軌跡數據量較大的場景,可以在RocketMQ集群中選擇其中一個Broker節點專用於存儲消息軌跡,使得用戶普通的消息數據與消息軌跡數據的物理IO完全隔離,互不影響。在該模式下,RockeMQ集群中至少有兩個Broker節點,其中一個Broker節點定義為存儲消息軌跡數據的服務端。
2.4 啟動開啟消息軌跡的Broker
nohup sh mqbroker -c ../conf/2m-noslave/broker-a.properties &
3. 保存消息軌跡的Topic定義
RocketMQ的消息軌跡特性支持兩種存儲軌跡數據的方式:
3.1 系統級的TraceTopic
在默認情況下,消息軌跡數據是存儲於系統級的TraceTopic中(其名稱為:RMQ_SYS_TRACE_TOPIC)。該Topic在Broker節點啟動時,會自動創建出來(如上所敘,需要在Broker端的配置文件中將traceTopicEnable的開關變量設置為true)。
3.2 用戶自定義的TraceTopic
如果用戶不准備將消息軌跡的數據存儲於系統級的默認TraceTopic,也可以自己定義並創建用戶級的Topic來保存軌跡(即為創建普通的Topic用於保存消息軌跡數據)。下面一節會介紹Client客戶端的接口如何支持用戶自定義的TraceTopic。
4. 支持消息軌跡的Client客戶端實踐
為了盡可能地減少用戶業務系統使用RocketMQ消息軌跡特性的改造工作量,作者在設計時候采用對原來接口增加一個開關參數(enableMsgTrace)來實現消息軌跡是否開啟;並新增一個自定義參(customizedTraceTopic)數來實現用戶存儲消息軌跡數據至自己創建的用戶級Topic。
4.1 發送消息時開啟消息軌跡
rocketmq: name-server: 192.168.146.200:9876;192.168.146.201:9876 producer: group: group retry-times-when-send-failed: 2 retry-next-server: true enable-msg-trace: true customized-trace-topic:
4.2 訂閱消息時開啟消息軌跡
與發送方差不多
4.3 支持自定義存儲消息軌跡Topic
在上面的發送和訂閱消息時候分別將DefaultMQProducer和DefaultMQPushConsumer實例的初始化修改為如下即可支持自定義存儲消息軌跡Topic。
rocketmq: name-server: 192.168.146.200:9876;192.168.146.201:9876 producer: group: group retry-times-when-send-failed: 2 retry-next-server: true enable-msg-trace: true customized-trace-topic: ttt
docker安裝rocketmq
新建目錄:在相應目錄
mkdir /mydata/rocketmq/broker/{conf,logs,store} mkdir /mydata/rocketmq/namesrv/logs mkdir /mydata/rocketmq/rocketmq-console-ng/logs
拉取鏡像:docker pull rocketmqinc/rocketmq:4.4.0
運行rmqnamesrv服務器:
docker run --name rmqnamesrv --restart=always -d -p 9876:9876 -v /mydata/rocketmq/namesrv/logs:/root/rocketmq/logs -v /mydata/rocketmq/namesrv/store:/root/rocketmq/store -e "MAX_POSSIBLE_HEAP=100000000" rocketmqinc/rocketmq:4.4.0 sh mqnamesrv
安裝 broker 服務器:
在brocker/conf目錄下創建 broker.conf 文件
brokerClusterName = DefaultCluster brokerName = broker-a brokerId = 0 deleteWhen = 04 fileReservedTime = 48 brokerRole = ASYNC_MASTER flushDiskType = ASYNC_FLUSH brokerIP1 = 192.168.146.200
啟動服務器:
docker run --name rmqbroker --restart=always -d -p 10911:10911 -p 10909:10909 -v /mydata/rocketmq/broker/logs:/root/rocketmq/logs -v /mydata/rocketmq/broker/store:/root/rocketmq/store -v /mydata/rocketmq/broker/conf/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf --link rmqnamesrv:namesrv -e "NAMESRV_ADDR=namesrv:9876" -e "MAX_POSSIBLE_HEAP=200000000" rocketmqinc/rocketmq:4.4.0 sh mqbroker -c /opt/rocketmq-4.4.0/conf/broker.conf
安裝 rocketmq 控制台
docker pull styletang/rocketmq-console-ng:1.0.0
啟動控制台:
docker run -d --restart=always --name rocketmq-console-ng -v /mydata/rocketmq/rocketmq-console-ng/logs:/root/logs -e "JAVA_OPTS=-Drocketmq.namesrv.addr=192.168.146.200:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 6080:8080 -t styletang/rocketmq-console-ng:1.0.0
SMS--短信服務
短信服務(Short Message Service)是阿里雲為用戶提供的一種通信服務的能力。
產品優勢:覆蓋全面、高並發處理、消息堆積處理、開發管理簡單、智能監控調度
產品功能:短信通知、短信驗證碼、推廣短信、異步通知、數據統計
應用場景:短信驗證碼、系統信息推送、推廣短信等
短信服務使用
》》》》》》》》》》》》》》》》》》》》》》》》》》》》》
<dependencyManagement> <dependencies> <!--spring boot 2.2.2--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-dependencies</artifactId> <version>2.2.2.RELEASE</version> <type>pom</type> <scope>import</scope> </dependency> <!--spring cloud Hoxton.SR1--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>Hoxton.SR1</version> <type>pom</type> <scope>import</scope> </dependency> <!--spring cloud alibaba 2.1.0.RELEASE--> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-alibaba-dependencies</artifactId> <version>2.1.0.RELEASE</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-gateway</artifactId> </dependency>
spring: application: name: gateway-service cloud: gateway: discovery: locator: enabled: true #開啟從注冊中心動態創建路由的功能,利用微服務名進行路由 routes: - id: payment_routh #路由的ID,沒有固定規則但要求唯一,建議配合服務名 #uri: http://localhost:8001 #匹配后提供服務的路由地址 uri: lb://lvym-payment-service #動態路由 predicates: - Path=/payment/get/** #斷言,路徑相匹配的進行路由 - id: payment_routh2 #uri: http://localhost:8001 uri: lb://lvym-payment-service #動態路由 predicates: - Path=/payment/lb/** #斷言,路徑相匹配的進行路由 - After=2020-04-13T17:09:09.715+08:00[Asia/Shanghai] # - Cookie=username,lvym
》》》》》》》》》》》》》》》
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.0.RELEASE</version> </parent>
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-gateway</artifactId> <version>2.0.0.RELEASE</version> </dependency>
spring: zipkin: base-url: http://127.0.0.1:9411/ #zipkin server的請求地址 discoveryClientEnabled: false #讓n acos把它當成一個URL,而不要當做服務名 sleuth: sampler: probability: 1.0 #采樣的百分比 application: name: gateway-service cloud: gateway: discovery: locator: enabled: true #開啟以服務id去注冊中心上獲取轉發地址 routes: - id: baidu uri: http://www.baidu.com/ #轉發http://www.baidu.com/ predicates: - Path=/bd/** #匹配規則 - id: member uri: lb://member-service #轉發http://www.baidu.com/ filters: - StripPrefix=1 #不加會報錯 predicates: - Path=/member/** #匹配規則 #127.0.0.1/bd 轉發到http://www.baidu.com/
》》》》》》》》》》》》》》》》 有兩種依賴,但似乎沒什么不同
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
<version>0.2.2.RELEASE</version>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
<version>2.1.0.RELEASE</version>
</dependency>