github: https://github.com/ZhangDi-d/SpringCloudSample
本文包含springcloud 服務注冊發現Eureka Consul,服務調用Feign Ribbon,限流熔斷Hystrix,分布式配置中心Config,服務網關Zuul,消息驅動 Stream,服務追蹤Sleuth 等
SpringCloudSample
A simple project of springcloud self-learning.
一.服務注冊與發現
Eureka server
1. 引入依賴
<!--加入的 spring-cloud-starter-eureka-server 會自動引入 spring-boot-starter-web -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
</dependency>
2. 注解 @EnableEurekaServer
3. application.yml配置問題
如果euraka是單機部署,可使用以下配置:
server:
port: 8761
eureka:
instance:
hostname: localhost
client:
registerWithEureka: false #是否將自己注冊到Eureka服務器,(因為自己是服務器:false)
fetchRegistry: false #是否到Eureka服務器中拉取注冊信息,(因為自己是服務器:false,這兩項如果不寫,啟動會報錯)
serviceUrl:
defaultZone: http://${eureka.instance.hostname}:${server.port}/eureka/
spring:
application:
name: eureka-server
如果是集群部署,需要將自己注冊到其他的eurakaserver 上,所以配置為:
(假設三台eureka server 的ip分別為 127.0.0.1 ,127.0.0.2 ,127.0.0.3,端口為 8761,其他兩台同理)
server:
port: 8761
eureka:
instance:
prefer-ip-address: true
instance-id: eureka-127.0.0.1
client:
registerWithEureka: true #是否將自己注冊到Eureka服務器
fetchRegistry: true #是否到Eureka服務器中拉取注冊信息
serviceUrl:
defaultZone: http://127.0.0.2:8761/eureka/,http://127.0.0.3:8761/eureka/
spring:
application:
name: eureka-server
服務提供者 ,以service-hello為例
1. pom.xml配置
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
2. @EnableEurekaClient 或者 @EnableDiscoveryClient
3. application.yzmdl
server:
port: 8762
spring:
application:
name: service-hello #服務與服務之間相互調用一般都是根據這個name
eureka:
client:
serviceUrl:
defaultZone: http://localhost:8761/eureka/
3.2 添加 以下注解屬性的好處是 服務以ip:port展示,
instance:
prefer-ip-address: true
instance-id: ${spring.cloud.client.ipAddress}:${server.port}
未添加:
添加 instance: 注解:
3.3 集群 配置:
server:
port: 8762
spring:
application:
name: service-hello #服務與服務之間相互調用一般都是根據這個name
eureka:
client:
serviceUrl:
defaultZone: http://127.0.0.1:8761/eureka/,http://127.0.0.2:8761/eureka/,http://127.0.0.3:8761/eureka/
instance:
prefer-ip-address: true
instance-id: {spring.cloud.client.ipAddress}:${server.port}
Consul
配合consul 注冊中心使用,consul 下載和使用: https://blog.csdn.net/ShelleyLittlehero/article/details/104391744
二.服務消費的兩種方式
1.RestTemplate+Ribbon
2.Feign去消費服務
Ribbon 客戶端負載均衡
1. pom.xml
<dependencies>
<!--作為服務被euraka發現-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<!--spring mvc-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--客戶端負載均衡組件依賴-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-ribbon</artifactId>
</dependency>
<!--在ribbon使用斷路器的依賴-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
</dependency>
</dependencies>
2. application.yml
spring:
application:
name: service-ribbon
server:
port: 8764
eureka:
client:
serviceUrl:
defaultZone: http://localhost:8761/eureka/
3.注解
@EnableDiscoveryClient -> 通過@EnableDiscoveryClient向服務中心注冊
@EnableHystrix -> 開啟Hystrix
@LoadBalanced ->開啟客戶端負載均衡功能
4. 測試
1.啟動 euraka-server -> EurekaServerApplication;
2.啟動 service-ribbon -> ServiceRibbonApplication;
3.以 8762 端口 啟動 service-hello -> ServiceHelloApplication;
4.以 8763 端口 啟動 service-hello -> ServiceHelloApplication;
5.調用service-ribbon 的 接口 http://localhost:8764/hello?name=zhangsan
, service-ribbon會使用restTemplate調用 service-hello
Spring Cloud Feign 聲明式服務調用
Feign是一個聲明式的偽Http客戶端,它使得寫Http客戶端變得更簡單。使用Feign,只需要創建一個接口並注解。它具有可插拔的注解特性,可使用Feign 注解和JAX-RS注解。Feign支持可插拔的編碼器和解碼器。Feign默認集成了Ribbon,並和Eureka結合,默認實現了負載均衡的效果。
簡而言之:
1.Feign 采用的是基於接口的注解
2.Feign 整合了ribbon,具有負載均衡的能力
3.整合了Hystrix,具有熔斷的能力
1. pom.xml
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
</dependencies>
2. application.yml
eureka:
client:
serviceUrl:
defaultZone: http://localhost:8761/eureka/
server:
port: 8765
spring:
application:
name: service-feign
#Feign是自帶斷路器的,在D版本的Spring Cloud之后,它沒有默認打開
# feign.hystrix.enabled: true 或者下面的寫法
feign:
hystrix:
enabled: true
3.注解
@EnableDiscoveryClient ->作為服務被發現
@EnableFeignClients -> 開啟Feign的功能
@FeignClient(value = “service-hello”,fallback = SchedualServiceHelloHystric.class) ->指定調用哪個服務下的接口,並加上 fallback 容錯
4. 測試
1.啟動 euraka-server -> EurekaServerApplication;
2.啟動 service-feign -> ServiceFeignApplication;
- 啟動 service-hello -> ServiceHelloApplication;
4.調用 http://localhost:8765/hello?name=zhangsan
, 查看是否可以調通service-hello
Feign 文件上傳
在Spring Cloud封裝的Feign中並不直接支持傳文件,但可以通過引入Feign的擴展包來實現
1.service-hello 作為上傳服務的提供方,只需添加上傳文件的接口即可
UploadProviderController.java
2. service-feign 作為服務的消費方,調用service-hello接口到達上傳文件的目的
注意 : pom.xml中要新增feign-form 和feign-form-spring 的依賴,並且他們的版本和Feign的版本一定要對應,不然會報錯
<dependency>
<groupId>io.github.openfeign.form</groupId>
<artifactId>feign-form</artifactId>
<version>3.8.0</version>
</dependency>
<dependency>
<groupId>io.github.openfeign.form</groupId>
<artifactId>feign-form-spring</artifactId>
<version>3.8.0</version>
</dependency>
<dependency>
<groupId>commons-fileupload</groupId>
<artifactId>commons-fileupload</artifactId>
<version>1.3.3</version>
</dependency>
版本對應問題:
The feign-form extension depend on OpenFeign and its concrete versions:
1. all feign-form releases before 3.5.0 works with OpenFeign 9.* versions;
2. starting from feign-form's version 3.5.0, the module works with OpenFeign 10.1.0 versions and greater.
IMPORTANT: there is no backward compatibility and no any gurantee that the feign-form's versions after 3.5.0work with OpenFeign before 10.*. OpenFeign was refactored in 10th release, so the best approach - use the freshest OpenFeign and feign-form versions.
3. 增加@Configuration 配置
@Configuration
public class FeignSupportConfig {
@Bean
public Encoder feignFormEncoder() {
return new SpringFormEncoder();
}
}
4. 在@FeignClient 注解中指定
@FeignClient(value = "service-hello", fallback = FeignServiceHelloHystric.class,configuration = FeignSupportConfig.class)
5. 啟動eureka-server ,service-hello ,service-feign ,使用postman測試即可
6. 使用idea 自帶的接口測試工具 測試
更多可以參考:https://blog.csdn.net/u012954706/article/details/89383076
POST http://localhost:8765/uploadFile
Accept: */*
Cache-Control: no-cache
Content-Type: multipart/form-data; boundary=WebAppBoundary
--WebAppBoundary
Content-Disposition: form-data; name="file"; filename="D:\back\1.txt"
四.Spring Cloud Config 分布式配置中心
Spring Cloud Config 分布式配置中心 由兩部分組成 config-server 和config-client.
config-server 基於Git倉庫的配置中心
1. config-server
訪問配置信息的URL與配置文件的映射關系如下:
-
/{application}/{profile}[/{label}]
-
/{application}-{profile}.yml
-
/{label}/{application}-{profile}.yml
-
/{application}-{profile}.properties
-
/{label}/{application}-{profile}.properties
2. config-server pom.xml
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-config-server</artifactId>
</dependency>
3. config-server application.yml
https://gitee.com/xuelaiLittleHero/config-repo-demo
是遠程配置文件倉庫的地址.
spring:
application:
name: config-server
cloud:
config:
server:
git:
uri: https://gitee.com/xuelaiLittleHero/config-repo-demo
searchPaths: config
# username:
# password:
server:
port: 8766
4. 配置server 的啟動類 ,並加注解 @EnableConfigServer, 開啟Spring Cloud Config的服務端功能
config-client 使用配置中心的客戶端
1.pom.xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-config</artifactId>
</dependency>
2. bootstrap.yml (使用配置服務的客戶端,配置文件應為 bootstrap.xml或者 bootstrap.properties ,他的加載早於 application.yml)
spring:
application:
name: config-client
cloud:
config:
uri: http://localhost:8766/
profile: dev
label: master
涉及到使用配置服務的配置要存放於bootstrap.xml或者 bootstrap.properties,這樣才能保證config-server中的配置信息才能被正確加載
3. 測試 ,使用接口 http://localhost:8767/getInfo
測試配置是否能被拿到 .
分布式配置中心(加密解密,以對稱加密為例)
1.下載配置JCE
地址 :http://download.oracle.com/otn-pub/java/jce/8/jce_policy-8.zip
配置到 jdk 目錄中 .
2. 配置 加密 key
config-server bootstrap.yml 配置對稱加密的key
encrypt:
key: ryze
3. 加密
啟動config-server, 訪問 http://localhost:8766/encrypt/status ,顯示狀態OK。
使用 curl 訪問 /encrypt 端點進行加密, 獲得屬性 pa2sW0rd 加密后的值 ,配置在 遠程git 倉庫的配置文件中
C:\Users\張 迪>curl http://localhost:8766/encrypt/ -d pa2sW0rd
9ae2d08f248ab77561cbea8fe88566b7665f8ad65527e7757dcf1cd3bffe1aae
git 倉庫配置文件 config-client-dev.yml
**一定注意 當配置文件是yml格式的時候 ,使用 {cipher}要加單引號,因為yml格式嚴格,不加’'無法解析 **
info:
profile: dev
from: config/dev
secretValue: '{cipher}9ae2d08f248ab77561cbea8fe88566b7665f8ad65527e7757dcf1cd3bffe1aae'
4. 測試
驗證config client 是否可以獲取到正確的加密值:
http://localhost:8767/getInfo
-> 輸出 InfoController getInfo===============>profile=dev,from=config/dev,secretValue=pa2sW0rd
,ok.
spring cloud config 高可用與動態刷新
高可用
config server 的高可用,可以使用集群部署 config server ,讓他們指向同一個 git配置文件庫, 然后使用 負載均衡 ,config client 動態的去 指定 config server.
另一種更為簡單的做法是,將集群部署的config server 也注冊為服務,供eureka 發現. 這樣config client可以 以服務的方式去訪問 config server.
1. config server pom.xml
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
2. config server application.yml
增加 eureka 地址 :
eureka:
client:
serviceUrl:
defaultZone: http://localhost:8761/eureka/
eureka 的配置最好放在最后,不要放在 git 和 spring 之間 ,不要想下面這樣: 這樣 spring會認為git 是配置在 eureka下的,啟動會報錯
spring:
application:
name: config-server
eureka:
client:
serviceUrl:
defaultZone: http://localhost:8761/eureka/
cloud:
config:
server:
git:
uri: https://gitee.com/xuelaiLittleHero/config-repo-demo
searchPaths: config
# username:
# password:
server:
port: 8766
3. config server @EnableDiscoveryClient
在啟動類上 增加注解 @EnableDiscoveryClient ->注冊為服務 ,供euraka發現
4. config client pom.xml
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
5. config client bootstrap.yml
bootstrap.yml 做以下修改 : 注釋 之前config-client 直連 config-server 的配置 ;
增加 config-client 通過 服務注冊與發現 調用 config-server 的配置.
## 注意 : 當 config client 不直接訪問 config server 時 ,這段配置就需要注釋掉了
# cloud:
# config:
# uri: http://localhost:8766/
# profile: dev
# label: master
## 注意 config client 以服務的方式 訪問 config server 時 ,要增加 eureka 的配置 和 config 的相關配置
cloud:
config:
discovery:
enabled: true # 開啟通過服務來訪問Config Server的功能
service-id: config-server # 指定Config Server注冊的服務名
profile: dev # 用於定位Git中的資源
# 指定服務注冊中心,用於服務的注冊與發現
eureka:
client:
serviceUrl:
defaultZone: http://localhost:8761/eureka/
配置的自動刷新
config-server 不用做修改,修改主要在config-client中.
config-client pom.xml
增加監控 組件,它包含/refresh 端點:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
bootstrap.yml
注意 但是SpringCloud 2.0.0 我們需要在bootstrap.yml里面加上需要暴露出來的地址 , 刷新地址不是/refresh了,默認是/actuator/refresh
base-path可以自定義路徑->/config/refresh;
management:
endpoints:
web:
exposure:
include: refresh,health
base-path: /config
@RefreshScope
在需要刷新參數的類上加@RefreshScope ,實現自動刷新
測試
使用post請求刷新端口,查看前后 http://localhost:8767/getInfo
是否有值的變化.
POST http://localhost:8767/config/refresh
Accept: */*
Cache-Control: no-cache
五 Hystrix
Hystrix具備了服務降級、服務熔斷、線程隔離、請求緩存、請求合並以及服務監控等強大功能。
Hystrix服務降級
涉及模塊: eureka-server , service-ribbon, service-hello
service-ribbon pom.xml
service-ribbon pom.xml增加hystrix 的依賴
<!--在ribbon使用斷路器的依賴-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
</dependency>
注解
使用 @SpringCloudApplication 或者 @EnableHystrix 或者 @EnableCircuitBreaker 開啟服務降級
注解
在service-ribbon 中調用 service-hello 的方法上加上 @ HystrixCommand,並指定fallbackMethod熔斷方法
@HystrixCommand(fallbackMethod = "helloError") //在ribbon中使用斷路器,該注解對該方法創建了熔斷器的功能,並指定了fallbackMethod熔斷方法
public String helloService(String name) {
return restTemplate.getForObject("http://service-hello/hello?name=" + name, String.class);
}
測試
啟動 eureka-server,service-ribbon ,不啟動 service-hello,訪問 http://localhost:8764/hello?name=zhangsan
,提示 hello,zhangsan,sorry,error!
Hystrix依賴隔離
線程池隔離和信號量隔離
Hystrix斷路器
“斷路器”本身是一種開關裝置,用於在電路上保護線路過載,當線路中有電器發生短路時,“斷路器”能夠及時的切斷故障電路,防止發生過載、發熱、甚至起火等嚴重后果。
在Hystrix服務降級一節中,我們沒有啟動service-hello 服務提供方,導致service-ribbon 觸發了降級邏輯,
但是即使這樣,受限於Hystrix超時時間的問題,我們的調用依然很有可能產生堆積。
這個時候斷路器就會發揮作用,那么斷路器是在什么情況下開始起作用呢?這里涉及到斷路器的三個重要參數:快照時間窗、請求總數下限、錯誤百分比下限。這個參數的作用分別是:
- 快照時間窗:斷路器確定是否打開需要統計一些請求和錯誤數據,而統計的時間范圍就是快照時間窗,默認為最近的10秒。
- 請求總數下限:在快照時間窗內,必須滿足請求總數下限才有資格根據熔斷。默認為20,意味着在10秒內,如果該hystrix命令的調用此時不足20次,即時所有的請求都超時或其他原因失敗,斷路器都不會打開。
- 錯誤百分比下限:當請求總數在快照時間窗內超過了下限,比如發生了30次調用,如果在這30次調用中,有16次發生了超時異常,也就是超過50%的錯誤百分比,在默認設定50%下限情況下,這時候就會將斷路器打開。
那么當斷路器打開之后會發生什么呢?我們先來說說斷路器未打開之前,對於之前那個示例的情況就是每個請求都會在當hystrix超時之后返回fallback,每個請求時間延遲就是近似hystrix的超時時間,如果設置為5秒,那么每個請求就都要延遲5秒才會返回。當熔斷器在10秒內發現請求總數超過20,並且錯誤百分比超過50%,這個時候熔斷器打開。打開之后,再有請求調用的時候,將不會調用主邏輯,而是直接調用降級邏輯,這個時候就不會等待5秒之后才返回fallback。
通過斷路器,實現了自動地發現錯誤並將降級邏輯切換為主邏輯,減少響應延遲的效果。
在斷路器打開之后,處理邏輯並沒有結束,我們的降級邏輯已經被成了主邏輯,那么原來的主邏輯要如何恢復呢?對於這一問題,hystrix也為我們實現了自動恢復功能。當斷路器打開,對主邏輯進行熔斷之后,hystrix會啟動一個休眠時間窗,在這個時間窗內,降級邏輯是臨時的成為主邏輯,當休眠時間窗到期,斷路器將進入半開狀態,釋放一次請求到原來的主邏輯上,如果此次請求正常返回,那么斷路器將繼續閉合,主邏輯恢復,如果這次請求依然有問題,斷路器繼續進入打開狀態,休眠時間窗重新計時。
Hystrix監控面板
涉及模塊 eureka-server , service-hello (服務提供方), service-ribbon (調用service-hello的消費方), hystrix-dashboard(監控模塊);
1. 新建 hystrix-dashboard 模塊
pom.xml 中引入關鍵依賴如下 :
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix-dashboard</artifactId>
</dependency>
</dependencies>
2. application.yml
spring:
application:
name: hystrix-dashboard
server:
port: 8768
3. 添加啟動類,並且添加合適的注解
@EnableHystrixDashboard //開啟監控頁面
@SpringCloudApplication //包含三個注解 , 開啟服務注冊與發現 , 開啟服務容錯
public class HystrixDashBoardApplication {
public static void main(String[] args) {
SpringApplication.run(HystrixDashBoardApplication.class, args);
}
}
4.測試
訪問http://localhost:8768/hystrix
,彈出以下界面則服務啟動成功
Hystrix Dashboard共支持三種不同的監控方式,依次為:
- 默認的集群監控:通過URLhttp://turbine-hostname:port/turbine.stream開啟,實現對默認集群的監控。
- 指定的集群監控:通過URLhttp://turbine-hostname:port/turbine.stream?cluster=[clusterName]開啟,實現對clusterName集群的監控。
- 單體應用的監控:通過URLhttps://hystrix-app:port/actuator/hystrix.stream開啟,實現對具體某個服務實例的監控。
參數 :
Delay:該參數用來控制服務器上輪詢監控信息的延遲時間,默認為2000毫秒,我們可以通過配置該屬性來降低客戶端的網絡和CPU消耗。
Title:該參數對應了上圖頭部標題Hystrix Stream之后的內容,默認會使用具體監控實例的URL,我們可以通過配置該信息來展示更合適的標題。
5. service-ribbon pom.xml
pom.xml 新增與hystrix 相關的依賴
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
</dependency>
<!--dashboard-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix-dashboard</artifactId>
</dependency>
<!--監控-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
6. service-ribbon新增配置
service-ribbon 已經 添加過注解@EnableHystrix , springboot2.x 之后還需要在service-ribbon模塊中新增以下配置 :
//springboot 2.x 之后 需要在想要監控的服務中添加 一下內容 ,
@Bean
public ServletRegistrationBean getServlet() {
HystrixMetricsStreamServlet streamServlet = new HystrixMetricsStreamServlet();
ServletRegistrationBean registrationBean = new ServletRegistrationBean(streamServlet);
registrationBean.setLoadOnStartup(1);
registrationBean.addUrlMappings("/actuator/hystrix.stream"); // 此地址是在hystrix-dashboard 中輸入的監控的地址
registrationBean.setName("HystrixMetricsStreamServlet");
return registrationBean;
}
7. 需要監控的接口上一定要有 @HystrixCommand 注解,否則無法被監控到
8. 測試
啟動 eureka-server , service-hello , service-ribbon , hystrix-dashboard;
在 hystrix-dashboard 的首頁(一個豪豬的頁面) 輸入監控模塊的url localhost:8764/actuator/hystrix.stream
點擊按鈕,進入監控頁面 ,發現 一直在loading ,此時需要調用 以下服務接口,否則數據不會出來
調用接口 http://localhost:8764/hello?name=lisi
, 彈出 hello lisi ,i am from port:8762
此時再查看hystrix-dashboard ,發現已經出現數據 :
根據各種顏色,區分請求狀態對應的的請求數.
Hystrix監控數據聚合
涉及模塊 :
- eureka-server(注冊中心),
- service-hello(服務提供方),
- service-ribbon(服務消費方,同時也是被監控者),
- hystrix-dashboard(監控面板),
- turbine (數據聚合)
新建模塊 turbine
turbine pom.xml
<!-- 提供者消費者 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<!-- actuator -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!-- hystrix -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
</dependency>
<!-- dashboard -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix-dashboard</artifactId>
</dependency>
<!-- turbine -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-netflix-turbine</artifactId>
</dependency>
turbine application.yml
server:
port: 8770
spring:
application:
name: trubine
eureka:
client:
serviceUrl:
defaultZone: http://localhost:8761/eureka/
turbine:
app-config: service-ribbon # 指定了需要收集監控信息的服務名;
combine-host-port: true
cluster-name-expression: new String('default') #參數指定了集群名稱為default, "default" 會報錯
management:
port: 8771
啟動類 注解
@SpringBootApplication
@EnableTurbine //開啟 turbine'
@EnableDiscoveryClient //開啟服務注冊與 發現
@EnableHystrixDashboard //開啟 hystrix
public class TurbineApplication {
public static void main(String[] args) {
SpringApplication.run(TurbineApplication.class, args);
}
}
測試
啟動
- eureka-server(注冊中心),
- service-hello(服務提供方),
- service-ribbon(服務消費方,同時也是被監控者),
- hystrix-dashboard(監控面板),
- turbine (數據聚合);
訪問http://localhost:8768/hystrix
,打開hystrix-dashboard 首頁 ,輸入 localhost:8764/actuator/hystrix.stream
,進入 service-ribbon 的 監控頁面
訪問http://localhost:8768/hystrix
,打開hystrix-dashboard 首頁 ,輸入 http://localhost:8770/turbine.stream
,進入 turbine 的 監控頁面;
訪問http://localhost:8764/hello?name=zhangsan
;
如果 兩個監控頁面發生了變化,證明ok
service-ribbon 監控頁面:
turbine 聚合視圖 :
Hystrix監控數據聚合(amqp)
涉及模塊 :
- eureka-server(注冊中心),
- service-hello(服務提供方),
- service-ribbon(服務消費方,同時也是被監控者),
- hystrix-dashboard(監控面板),
- turbine-amqp (數據聚合-amqp 方式)
- rabbit 服務也要正常啟動
新建模塊 turbine-amqp
turbine-amqp pom.xml
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<!-- actuator -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-turbine-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<!--這個是多余的 todo :待驗證-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-netflix-hystrix-stream</artifactId>
</dependency>
turbine-amqp application.yml
server:
port: 8773
spring:
application:
name: turbine-amqp
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
eureka:
client:
serviceUrl:
defaultZone: http://localhost:8761/eureka/
management:
port: 8774
啟動類
@Configuration
@EnableAutoConfiguration
@EnableDiscoveryClient
@EnableTurbineStream //開啟turbine流
public class TurbineAmqpApplication {
public static void main(String[] args) {
SpringApplication.run(TurbineAmqpApplication.class, args);
}
}
對 service-ribbon (service-hello 的服務消費端) 修改 pom.xml
pom.xml 增加依賴:
<!--turbine 通過 amqp方式聚合hystrix監控信息 需要添加的依賴 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-netflix-hystrix-stream</artifactId>
</dependency>
<!--springboot 2.x 在引入 spring-cloud-netflix-hystrix-stream ,還要引入spring-cloud-starter-stream-rabbit 否則會報錯 : A default binder has been requested, but there is no binder available ,原因是因為 hystrix 需要一個持續的輸出源, hystrix-stream的輸出源有rabbit和kafa之類。加上相應的依賴解決報錯問題 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
</dependencies>
service-ribbon application.yml
增加 rabbitmq 的配置
eureka:
client:
serviceUrl:
defaultZone: http://localhost:8761/eureka/
server:
port: 8764
spring:
application:
name: service-ribbon
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
測試 :
-
啟動 rabbitMq ,並且訪問
http://localhost:15672/
確認正常啟動; -
啟動 eureka-server, service-hello ,service-ribbon, hystrix-dashboard(監控面板), turbine-amqp;
-
訪問
http://localhost:8768/hystrix/
,打開豪豬頁面 , -
訪問
http://localhost:8764/hello?name=lisi
,調用 接口 -
在 豪豬頁面輸入
http://localhost:8764/actuator/hystrix.stream
,打開對service-ribbon 的單個的監控; -
在 豪豬頁面輸入
http://localhost:8773/turbine.stream
,打開對hystrix的聚合的turbine的頁面, 發現無法連接, 這里有一個需要注意的地方:
如何解決 trubine-amqp 無法顯示 的問題
-
訪問
http://localhost:8773/turbine.stream
, 發現
-
打開RabbitMq監控頁面 ,查看 交換機情況 ,兩個交換機都在
-
進入 hystrixStreamOutput 發現有輸入 ,但是沒有 輸入
-
進入hystrixStreamOutput ,將其與 turbineStreamInput 綁定起來 ,具體操作如下
-
查看是否綁定成功, 點擊交換機 turbineStreamInput ,發現如下圖則綁定成功,
-
在 豪豬頁面輸入
http://localhost:8773/turbine.stream
,打開對hystrix的聚合的turbine的頁面 ,ok
truebine-amqp 到此完成.
六 服務網關 Zuul
路由功能
新建模塊 service-zuul,
涉及模塊:
- eureka-server(注冊中心),
- service-hello(服務提供方),
- service-ribbon(服務調用方),
- service-ribbon(服務調用方),
- service-zuul(服務網關)
service-zuul pom.xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-zuul</artifactId>
</dependency>
service-zuul application.yml
在Eureka的幫助下,API網關服務本身就已經維護了系統中所有serviceId與實例地址的映射關系。當有外部請求到達API網關的時候,
根據請求的URL路徑找到最佳匹配的path規則,API網關就可以知道要將該請求路由到哪個具體的serviceId上去。
eureka:
client:
serviceUrl:
defaultZone: http://localhost:8761/eureka/
server:
port: 8769
spring:
application:
name: service-zuul
#首先指定服務注冊中心的地址為http://localhost:8761/eureka/,服務的端口為8769,服務名為service-zuul;
#以/api-a/ 開頭的請求都轉發給service-ribbon服務;以/api-b/開頭的請求都轉發給service-feign服務
zuul:
routes:
api-a:
path: /api-a/**
serviceId: service-ribbon
api-b:
path: /api-b/**
serviceId: service-feign
service-zuul 注解啟動類
@EnableZuulProxy,開啟zuul的功能
@SpringBootApplication
@EnableDiscoveryClient
@EnableZuulProxy //@EnableZuulProxy,開啟zuul的功能
public class ServiceZuulApplication {
public static void main(String[] args) {
SpringApplication.run(ServiceZuulApplication.class, args);
}
}
測試
啟動:
- eureka-server(注冊中心),
- service-hello(服務提供方),
- service-ribbon(服務調用方),
- service-ribbon(服務調用方),
- service-zuul(服務網關)
分別使用 http://localhost:8769/api-a/hello?name=zhangsan
和 http://localhost:8769/api-b/hello?name=zhangsan
訪問,觀察service-ribbon和 service-feign的控制台輸出
服務網關之過濾器
服務網關的另一個核心功能就是過濾器.
新增過濾器 MyFilter
MyFilter驗證一下 請求中是否含有token,
在Spring Cloud Zuul中實現的過濾器必須包含4個基本特征:過濾類型、執行順序、執行條件、具體操作。實際上它就是ZuulFilter接口中定義的四個抽象方法:
String filterType();
int filterOrder();
boolean shouldFilter();
Object run();
public class MyFilter extends ZuulFilter {
private static Logger log = LoggerFactory.getLogger(MyFilter.class);
/** * filterType:返回一個字符串代表過濾器的類型,在zuul中定義了四種不同生命周期的過濾器類型,具體如下: * pre:路由之前 * routing:路由之時 * post: 路由之后 * error:發送錯誤調用 * filterOrder:過濾的順序,當請求在一個階段中存在多個過濾器時,需要根據該方法返回的值來依次執行。 * shouldFilter:這里可以寫邏輯判斷,是否要過濾,本文true,因此該過濾器對所有請求都會生效。 * run:過濾器的具體邏輯。可用很復雜,包括查sql,nosql去判斷該請求到底有沒有權限訪問。 */
@Override
public String filterType() {
return "pre";
}
@Override
public int filterOrder() {
return 0;
}
@Override
public boolean shouldFilter() {
return true;
}
@Override
public Object run() throws ZuulException {
RequestContext context = RequestContext.getCurrentContext();
HttpServletRequest request = context.getRequest();
log.info(String.format("%s >>> %s", request.getMethod(), request.getRequestURL().toString()));
String token = request.getParameter("token");
if (token == null) {
log.warn("token is empty");
context.setSendZuulResponse(false);
context.setResponseStatusCode(401);
try {
context.getResponse().getWriter().write("token is empty");
} catch (Exception e) {
}
return null;
}
return null;
}
}
配置過濾器
編寫過濾器完成后需要配置過濾器,讓其生效
@Configuration
public class FilterConfig {
//配置過濾器,否則不會生效
@Bean
public MyFilter myFilter() {
return new MyFilter();
}
}
測試
重啟模塊service-zuul:
訪問 http://localhost:8769/api-b/hello?name=zhangsan
,提示 token is empty
;
訪問 http://localhost:8769/api-b/hello?name=zhangsan&token=123
,提示 hello zhangsan ,i am from port:8762
; ok.
服務網關之統一異常處理
自定義 ErrorFilter
由於在請求生命周期的pre、route、post三個階段中有異常拋出的時候都會進入error階段的處理,所以我們可以通過創建一個error類型的過濾器來捕獲這些異常信息,並根據這些異常信息在請求上下文中注入需要返回給客戶端的錯誤描述,這里我們可以直接沿用在try-catch處理異常信息時用的那些error參數,這樣就可以讓這些信息被SendErrorFilter捕獲並組織成消息響應返回給客戶端。比如,下面的代碼就實現了這里所描述的一個過濾器:
public class ErrorFilter extends ZuulFilter {
Logger log = LoggerFactory.getLogger(ErrorFilter.class);
@Override
public String filterType() {
return "error";
}
@Override
public int filterOrder() {
return 10;
}
@Override
public boolean shouldFilter() {
return true;
}
@Override
public Object run() {
RequestContext ctx = RequestContext.getCurrentContext();
Throwable throwable = ctx.getThrowable();
log.error("this is a ErrorFilter : {}", throwable.getCause().getMessage());
ctx.set("error.status_code", HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
ctx.set("error.exception", throwable.getCause());
return null;
}
}
更多 請參考 :http://blog.didispace.com/spring-cloud-zuul-exception-3/
七 Spring Cloud Stream 消息驅動
Spring Cloud Stream 在 Spring Cloud 體系內用於構建高度可擴展的基於事件驅動的微服務,其目的是為了簡化消息在 Spring Cloud 應用程序中的開發。
概念
group :
組內只有1個實例消費。如果不設置group,則stream會自動為每個實例創建匿名且獨立的group——於是每個實例都會消費。
消費者集群高可用下的保持消息被消費一次的處理.
partition:
一個或多個生產者將數據發送到多個消費者,並確保有共同特征標識的數據由同一個消費者處理。默認是對消息進行hashCode,然后根據分區個數取余,所以對於相同的消息,總會落到同一個消費者上。
destination binder:
與外部消息系統通信的組件,為構造 Binding提供了 2 個方法,分別是 bindConsumer 和 bindProducer ,它們分別用於構造生產者和消費者。Binder使Spring Cloud Stream應用程序可以靈活地連接到中間件,目前spring為kafka、rabbitmq提供binder。
destination binding
Binding 是連接應用程序跟消息中間件的橋梁,用於消息的消費和生產,由binder創建。 使用@EnableBinding即可定義destination binding
新建模塊 stream-hello,(也可以新增 兩個模塊stream-producer,stream-consumer,一個作為生產者使用,一個座位消費者使用)
涉及模塊 :stream-hello,(stream-producer,stream-consumer)
使用官方的Sink.class Source.class 簡單測試
stream-hello pom.xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<!--spring-cloud-starter-stream-rabbit 可替換為 <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-rabbit</artifactId> </dependency> -->
producer application.yml
server:
port: 8775 # stream-hello 分別以 8775(producer) ,8776(concumer) 啟動兩次
spring:
application:
name: stream-hello
cloud:
stream:
bindings: # 外部消息傳遞系統和應用程序之間的橋梁,提供消息的“生產者”和“消費者”(由目標綁定器創建)
output:
destination: stream-exchange
binder: localhost_rabbit #也可以是其他中間件 如 kafka
binders: #目標綁定器,目標指的是 kafka 還是 RabbitMQ,綁定器就是封裝了目標中間件的包。
localhost_rabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
consumer application.yml
server:
port: 8776 # stream-hello 分別以 8775(producer) ,8776(concumer) 啟動兩次
spring:
application:
name: stream-hello
cloud:
stream:
bindings: # 外部消息傳遞系統和應用程序之間的橋梁,提供消息的“生產者”和“消費者”(由目標綁定器創建)
input:
destination: stream-exchange # 指 exchange 的名稱
binder: localhost_rabbit
binders: #目標綁定器,目標指的是 kafka 還是 RabbitMQ,綁定器就是封裝了目標中間件的包。
localhost_rabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
Producer
@EnableBinding(Source.class)
public class Producer {
private static Logger logger = LoggerFactory.getLogger(Producer.class);
@Autowired
@Output(Source.OUTPUT)
private MessageChannel channel;
public void send(String message) {
logger.info("send massage begin...............................");
channel.send(MessageBuilder.withPayload("Producer send massage:" + message).build());
logger.info("send massage end...............................");
}
}
Consumer
//當我們需要為@EnableBinding指定多個接口來綁定消息通道的時候,可以這樣定義:@EnableBinding(value = {Sink.class, Source.class})
//注解用來指定一個或多個定義了@Input或@Output注解的接口,以此實現對消息通道(Channel)的綁定
@EnableBinding(Sink.class)
public class Consumer {
private static Logger logger = LoggerFactory.getLogger(Consumer.class);
@StreamListener(Sink.INPUT) //該注解主要定義在方法上,作用是將被修飾的方法注冊為消息中間件上數據流的事件監聽器,注解中的屬性值對應了監聽的消息通道名
public void receive(Object o) {
logger.info("receive message: " + o);
}
}
發送信息的類
可以用手動發送信息的接口:
@RestController
@RequestMapping
public class ProduceController {
private static Logger logger = LoggerFactory.getLogger(ProduceController.class);
@Resource
private Producer producer;
@RequestMapping("/send")
public void sendMessage(String message) {
producer.send("ProduceController send message:" + message);
}
}
也可以自動發送信息
@EnableBinding(Source.class)
public class TimerProcuer {
private static Logger logger = LoggerFactory.getLogger(TimerProcuer.class);
private final String format = "yyyy-MM-dd HH:mm:ss";
@Bean
@InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "5000", maxMessagesPerPoll = "1"))
public MessageSource<String> timerMessageSource() {
logger.info("TimerProcuer sendMessage begin ..........");
return () -> new GenericMessage<>(new SimpleDateFormat(format).format(new Date()));
}
}
測試
- 啟動 rabbitMq
- 以 8775 ,output 配置段 啟動 stream-hello 作為生產者
- 以 8776 ,input 配置段 啟動 stream-hello 作為消費者
- 手動發送消息
http://localhost:8775/send?message=zhangsan
,查看控制台 - 8775 producer 控制台
com.ryze.sample.send.Producer : send massage begin...............................
com.ryze.sample.send.Producer : send massage end...............................
- 8776 consumer 控制台 :
com.ryze.sample.receive.Consumer : receive message: Producer send massage:ProduceController send message:zhangsan
stream 核心概念之消費組
spring.cloud.stream.bindings.<通道名>.group=<消費組名>
很多情況下,消息生產者發送消息給某個具體微服務時,只希望被消費一次,按照上面我們啟動兩個應用的例子,雖然它們同屬一個應用,但是這個消息出現了被重復消費兩次的情況。為了解決這個問題,在Spring Cloud Stream中提供了消費組的概念。
如果在同一個主題上的應用需要啟動多個實例的時候,我們可以通過spring.cloud.stream.bindings.input.group屬性為應用指定一個組名,這樣這個應用的多個實例在接收到消息的時候,只會有一個成員真正的收到消息並進行處理。
消費組和分區的設置
給消費者設置消費組和主題
設置消費組: spring.cloud.stream.bindings.<通道名>.group=<消費組名>
設置主題: spring.cloud.stream.bindings.<通道名>.destination=<主題名>
給生產者指定通道的主題:spring.cloud.stream.bindings.<通道名>.destination=<主題名>
消費者開啟分區,指定實例數量與實例索引
開啟消費分區: spring.cloud.stream.bindings.<通道名>.consumer.partitioned=true
消費實例數量: spring.cloud.stream.instanceCount=1 (具體指定)
實例索引: spring.cloud.stream.instanceIndex=1 #設置當前實例的索引值
生產者指定分區鍵
分區鍵: spring.cloud.stream.bindings.<通道名>.producer.partitionKeyExpress=<分區鍵>
分區數量: spring.cloud.stream.bindings.<通道名>.producer.partitionCount=<分區數量>
測試
-
保持 RabbitMq 開啟狀態
-
啟動 stream-producer(8777)
-
分別以 8778,8779 啟動 stream-consumer ;
-
查看 8778 8779控制台 的 輸出,兩者 輸出內容是相同的.
-
修改stream-consumer application.yml
以8778 啟動 group : group-A
的 消費者, 以8779 啟動 group : group-A
的 消費者
server:
port: 8778
spring:
application:
name: stream-hello
cloud:
stream:
bindings: # 外部消息傳遞系統和應用程序之間的橋梁,提供消息的“生產者”和“消費者”(由目標綁定器創建)
input:
destination: stream-exchange # 指 exchange 的名稱
binder: localhost_rabbit
group : group-A
#以下省略
server:
port: 8779
spring:
application:
name: stream-hello
cloud:
stream:
bindings: # 外部消息傳遞系統和應用程序之間的橋梁,提供消息的“生產者”和“消費者”(由目標綁定器創建)
input:
destination: stream-exchange # 指 exchange 的名稱
binder: localhost_rabbit
group : group-A
#以下省略
- 啟動 stream-producer(8777)
- 查看 stream-consumer 的輸出 ,證明group 的配置是生效的
一個為
2020-03-03 11:03:24.288 INFO 9784 --- [hange.group-A-1] com.ryze.sample.receive.Consumer : receive message: 2020-03-03 11:03:24
2020-03-03 11:03:34.291 INFO 9784 --- [hange.group-A-1] com.ryze.sample.receive.Consumer : receive message: 2020-03-03 11:03:34
2020-03-03 11:03:44.347 INFO 9784 --- [hange.group-A-1] com.ryze.sample.receive.Consumer : receive message: 2020-03-03 11:03:44
2020-03-03 11:03:54.351 INFO 9784 --- [hange.group-A-1] com.ryze.sample.receive.Consumer : receive message: 2020-03-03 11:03:54
2020-03-03 11:04:04.354 INFO 9784 --- [hange.group-A-1] com.ryze.sample.receive.Consumer : receive message: 2020-03-03 11:04:04
2020-03-03 11:04:14.398 INFO 9784 --- [hange.group-A-1] com.ryze.sample.receive.Consumer : receive message: 2020-03-03 11:04:14
一個為
2020-03-03 11:03:29.289 INFO 11124 --- [hange.group-A-1] com.ryze.sample.receive.Consumer : receive message: 2020-03-03 11:03:29
2020-03-03 11:03:39.473 INFO 11124 --- [hange.group-A-1] com.ryze.sample.receive.Consumer : receive message: 2020-03-03 11:03:39
2020-03-03 11:03:49.347 INFO 11124 --- [hange.group-A-1] com.ryze.sample.receive.Consumer : receive message: 2020-03-03 11:03:49
2020-03-03 11:03:59.352 INFO 11124 --- [hange.group-A-1] com.ryze.sample.receive.Consumer : receive message: 2020-03-03 11:03:59
2020-03-03 11:04:09.398 INFO 11124 --- [hange.group-A-1] com.ryze.sample.receive.Consumer : receive message: 2020-03-03 11:04:09
2020-03-03 11:04:19.395 INFO 11124 --- [hange.group-A-1] com.ryze.sample.receive.Consumer : receive message: 2020-03-03 11:04:19
stream 核心概念之消息分區
這一塊在測試時,遇到很多問題 : 大多是關於 application.yml 中關於 分區的配置,引起 如 生產者啟動是失敗 ,消費者接受不到消息,尚待更多研究…
當生產者將消息數據發送給多個消費者實例時,保證同一消息數據始終是由同一個消費者實例接收和處理。
stream-producer application.yml
加上 spring.cloud.stream.bindings.<通道名>.producer.partitionKeyExpression
-> 分區表達式, 例如當表達式的值為1, 那么在訂閱者的instance-index中為1的接收方, 將會執行該消息.
和 spring.cloud.stream.bindings.<通道名>.producer.partitionCount
-> 指定參與消息分區的消費端節點數量為2個
配置 如下
server:
port: 8777 # stream-hello 分別以 8775(producer) ,8776(concumer) 啟動兩次
spring:
application:
name: stream-producer
cloud:
stream:
bindings: # 外部消息傳遞系統和應用程序之間的橋梁,提供消息的“生產者”和“消費者”(由目標綁定器創建)
output:
destination: stream-exchange
binder: localhost_rabbit
producer: # --------------為了測試 分區加入的配置 begin
partitionKeyExpression: headers['partitionKey'] #一旦計算出消息的key,分區選擇程序將把目標分區確定為介於0和partitionCount - 1之間的值
partitionCount: 2
# --------------為了測試 分區加入的配置 end
binders: #目標綁定器,目標指的是 kafka 還是 RabbitMQ,綁定器就是封裝了目標中間件的包。
localhost_rabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
stream-producer TimerProducer
@EnableBinding(Source.class)
public class TimerProcuer {
private static Logger logger = LoggerFactory.getLogger(TimerProcuer.class);
private final String format = "yyyy-MM-dd HH:mm:ss";
// @Bean
// @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "5000", maxMessagesPerPoll = "1"))
// public MessageSource<String> timerMessageSource() {
// logger.info("TimerProcuer sendMessage begin ..........");
// return () -> new GenericMessage<>(new SimpleDateFormat(format).format(new Date()));
// }
@Bean
@InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "5000", maxMessagesPerPoll = "1"))
public Message<?> generate() {
String value = data[RANDOM.nextInt(data.length)];
System.out.println("Sending: " + value);
return MessageBuilder.withPayload(value)
.setHeader("partitionKey", value)
.build();
}
private static final Random RANDOM = new Random(System.currentTimeMillis());
private static final String[] data = new String[] {
"foo1", "bar1", "qux1",
"foo2", "bar2", "qux2",
"foo3", "bar3", "qux3",
"foo4", "bar4", "qux4",
};
}
stream-consumer application.yml
主要加入
spring.cloud.stream.bindings.<通道名>.consumer.partitioned
, -> 開啟分區
spring.cloud.stream.bindings.<通道名>.consumer.instanceCount
, ->由於本例中 啟動兩個消費者(producer 也設置的2),代表 實例的個數
spring.cloud.stream.bindings.<通道名>.consumer.instanceIndex
, -> 代表實例的下標,
server:
port: 8779
spring:
application:
name: stream-consumer
cloud:
stream:
bindings: # 外部消息傳遞系統和應用程序之間的橋梁,提供消息的“生產者”和“消費者”(由目標綁定器創建)
input:
destination: stream-exchange # 指 exchange 的名稱
binder: localhost_rabbit
group : group-A
# -----------為了測試分區加入的配置 - begin
consumer:
partitioned: true # 開啟分區,默認為 false
instanceCount: 2 # 消費實例數量
instanceIndex: 1 # 設置當前實例的索引值 0,1...instanceCount-1
# -----------為了測試分區加入的配置 - end
binders: #目標綁定器,目標指的是 kafka 還是 RabbitMQ,綁定器就是封裝了目標中間件的包。
localhost_rabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
測試
- 以
server.port: 8778
和instanceIndex: 0
啟動 stream-consumer作為第一個消費者 - 以
server.port: 8779
和instanceIndex: 1
啟動 stream-consumer作為第二個消費者 - 啟動service-producer
- 查看 8778 ,8779 控制台 ;發現 8778 無輸出 ,8779 輸出一下內容 , 證明分區.
2020-03-03 15:16:59.746 INFO 10528 --- [nge.group-A-1-1] com.ryze.sample.receive.Consumer : receive message: qux1
2020-03-03 15:17:04.748 INFO 10528 --- [nge.group-A-1-1] com.ryze.sample.receive.Consumer : receive message: qux1
2020-03-03 15:17:09.750 INFO 10528 --- [nge.group-A-1-1] com.ryze.sample.receive.Consumer : receive message: qux1
2020-03-03 15:17:14.750 INFO 10528 --- [nge.group-A-1-1] com.ryze.sample.receive.Consumer : receive message: qux1
2020-03-03 15:17:19.825 INFO 10528 --- [nge.group-A-1-1] com.ryze.sample.receive.Consumer : receive message: qux1
2020-03-03 15:17:24.803 INFO 10528 --- [nge.group-A-1-1] com.ryze.sample.receive.Consumer : receive message: qux1
2020-03-03 15:17:29.805 INFO 10528 --- [nge.group-A-1-1] com.ryze.sample.receive.Consumer : receive message: qux1
八.Spring Cloud Sleuth 服務鏈路追蹤
在一個完整的微服務架構項目中,服務之間的調用是很復雜的,Spring Cloud Sleuth可以幫助我們清楚直觀的了解每一個服務請求經過了哪些服務,用時多久,誰依賴誰或者被誰依賴。
Sleuth quick Start
新建模塊 trace-1(可以直接將service-ribbon copy過來 )
trace-1 pom.xml
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--客戶端負載均衡組件依賴-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-ribbon</artifactId>
</dependency>
<!--服務追蹤-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-sleuth</artifactId>
</dependency>
trace-1 application.yml
spring:
application:
name: trace-1
server:
port: 8780
eureka:
client:
serviceUrl:
defaultZone: http://localhost:8761/eureka/
啟動類
@SpringBootApplication
@EnableDiscoveryClient
public class Trace1Application {
public static void main(String[] args) {
SpringApplication.run(Trace1Application.class, args);
}
@Bean
@LoadBalanced
RestTemplate restTemplate() {
return new RestTemplate();
}
}
調用接口
@RestController
public class TraceController {
private final Logger logger = LoggerFactory.getLogger(TraceController.class);
@Resource
private RestTemplate restTemplate;
@GetMapping(value = "/trace-1")
public String trace() {
logger.info("================trace-1 begin================");
return restTemplate.getForEntity("http://trace-2/trace-2", String.class).getBody();
}
}
新建模塊trace-2 ,pom.xml,application.yml,啟動類 同理
trace-2 被調用接口
@RestController
public class TraceController {
private final Logger logger = LoggerFactory.getLogger(TraceController.class);
private final String RETURN_STR = "trace-2";
@GetMapping(value = "/trace-2")
public String trace() {
logger.info("================trace-2 begin================");
return RETURN_STR;
}
}
測試
訪問 http://localhost:8780/trace-1
, 查看 trace-1,trace-2 的控制台輸出:
INFO [trace-1,c35be1c226c535c4,c35be1c226c535c4,false] 9628 --- [nio-8780-exec-1] c.r.sample.controller.TraceController : ================trace-1 begin================
INFO [trace-2,0f543d7a73490fe4,fe761e5df5da981c,false] 2936 --- [nio-8781-exec-4] c.r.sample.controller.TraceController : ================trace-2 begin================
從上面的控制台輸出內容中,我們可以看到多了一些形如[trace-1,c35be1c226c535c4,c35be1c226c535c4,false]的日志信息,而這些元素正是實現分布式服務跟蹤的重要組成部分,它們每個值的含義如下:
第一個值:trace-1,它記錄了應用的名稱,也就是application.properties中spring.application.name參數配置的屬性。
第二個值:c35be1c226c535c4,Spring Cloud Sleuth生成的一個ID,稱為Trace ID,它用來標識一條請求鏈路。一條請求鏈路中包含一個Trace ID,多個Span ID。
第三個值:c35be1c226c535c4,Spring Cloud Sleuth生成的另外一個ID,稱為Span ID,它表示一個基本的工作單元,比如:發送一個HTTP請求。
第四個值:false,表示是否要將該信息輸出到Zipkin等服務中來收集和展示。
上面四個值中的Trace ID和Span ID是Spring Cloud Sleuth實現分布式服務跟蹤的核心。在一次服務請求鏈路的調用過程中,會保持並傳遞同一個Trace ID,從而將整個分布於不同微服務進程中的請求跟蹤信息串聯起來,以上面輸出內容為例,trace-1和trace-2同屬於一個前端服務請求來源,所以他們的Trace ID是相同的,處於同一條請求鏈路中。
Sleuth 整合logstash
trace-1 pom.xml
新增 logstash 的依賴,此處要注意版本的問題,經過測試 springboot 2.1.2 可以使用 logstash 6.3 版本
<!--整合logstash-->
<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<version>6.3</version>
</dependency>
trace-2 配置文件修改
- 新增配置文件 bootstrap.properties,將
spring.application.name=trace-1
配置段移到 bootstrap.properties 文件中;
當然也可以將application.yml全部配置復制到bootstrap.properties,然后 刪除掉多余的application.yml.
spring.application.name=trace-1
- 新增 logback-spring.xml
本例使用將日志輸出到json文件的做法,所以指定的 appender 為RollingFileAppender
,見配置1;
也可以使用LogstashTcpSocketAppender
將日志內容直接通過Tcp Socket輸出到logstash服務端 ,見配置2
配置1:
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<include resource="org/springframework/boot/logging/logback/defaults.xml"/>
<springProperty scope="context" name="springAppName" source="spring.application.name"/>
<!-- 日志在工程中的輸出位置 -->
<property name="LOG_FILE" value="${BUILD_FOLDER:-build}/${springAppName}"/>
<!-- 控制台的日志輸出樣式 -->
<property name="CONSOLE_LOG_PATTERN" value="%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr([${springAppName:-},%X{X-B3-TraceId:-},%X{X-B3-SpanId:-},%X{X-Span-Export:-}]){yellow} %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}"/>
<!-- 控制台Appender -->
<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>INFO</level>
</filter>
<encoder>
<pattern>${CONSOLE_LOG_PATTERN}</pattern>
<charset>utf8</charset>
</encoder>
</appender>
<!-- 為logstash輸出的json格式的Appender -->
<appender name="logstash" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${LOG_FILE}.json</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${LOG_FILE}.json.%d{yyyy-MM-dd}.gz</fileNamePattern>
<maxHistory>7</maxHistory>
</rollingPolicy>
<encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
<providers>
<timestamp>
<timeZone>UTC</timeZone>
</timestamp>
<pattern>
<pattern>
{
"severity": "%level",
"service": "${springAppName:-}",
"trace": "%X{X-B3-TraceId:-}",
"span": "%X{X-B3-SpanId:-}",
"exportable": "%X{X-Span-Export:-}",
"pid": "${PID:-}",
"thread": "%thread",
"class": "%logger{40}",
"rest": "%message"
}
</pattern>
</pattern>
</providers>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="console"/>
<appender-ref ref="logstash"/>
</root>
</configuration>
配置2
<?xml version="1.0" encoding="UTF-8"?>
<!--該日志將日志級別不同的log信息保存到不同的文件中 -->
<configuration>
<include resource="org/springframework/boot/logging/logback/defaults.xml" />
<springProperty scope="context" name="springAppName"source="spring.application.name" />
<!-- 日志在工程中的輸出位置 -->
<property name="LOG_FILE" value="${BUILD_FOLDER:-build}/${springAppName}" />
<!-- 控制台的日志輸出樣式 -->
<property name="CONSOLE_LOG_PATTERN" value="%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}}" />
<!-- 控制台輸出 -->
<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>INFO</level>
</filter>
<!-- 日志輸出編碼 -->
<encoder>
<pattern>${CONSOLE_LOG_PATTERN}</pattern>
<charset>utf8</charset>
</encoder>
</appender>
<!-- 為logstash輸出的JSON格式的Appender -->
<appender name="logstash" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
<destination>127.0.0.1:5044</destination> <!--5044是默認的端口 -->
<!-- 日志輸出編碼 -->
<encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
<providers>
<timestamp>
<timeZone>UTC</timeZone>
</timestamp>
<pattern>
<pattern>
{
"severity": "%level",
"service": "${springAppName:-}",
"trace": "%X{X-B3-TraceId:-}",
"span": "%X{X-B3-SpanId:-}",
"exportable": "%X{X-Span-Export:-}",
"pid": "${PID:-}",
"thread": "%thread",
"class": "%logger{40}",
"rest": "%message"
}
</pattern>
</pattern>
</providers>
</encoder>
</appender>
<!-- 日志輸出級別 -->
<root level="INFO">
<appender-ref ref="console" />
<appender-ref ref="logstash" />
</root>
</configuration>
trace-2
對trace-2 做與trace-1同樣的改造處理 ;
測試
- 啟動 eureka-server
- 啟動 trace-1,trace-2
- 調用接口
http://localhost:8780/trace-1
, - 在 項目下生成 build 目錄 ,其中產生了 兩個json文件
- 格式如下
{"@timestamp":"2020-03-04T00:58:40.236Z","severity":"INFO","service":"trace-1","trace":"2b5bb0d8bd6f5e1b","span":"2b5bb0d8bd6f5e1b","exportable":"false","pid":"8740","thread":"http-nio-8780-exec-1","class":"c.ryze.sample.controller.TraceController","rest":"================trace-1 begin================"}
{"@timestamp":"2020-03-04T00:58:40.497Z","severity":"INFO","service":"trace-1","trace":"2b5bb0d8bd6f5e1b","span":"e2748e57496a2a19","exportable":"false","pid":"8740","thread":"http-nio-8780-exec-1","class":"c.netflix.config.ChainedDynamicProperty","rest":"Flipping property: trace-2.ribbon.ActiveConnectionsLimit to use NEXT property: niws.loadbalancer.availabilityFilteringRule.activeConnectionsLimit = 2147483647"}
{"@timestamp":"2020-03-04T00:58:40.538Z","severity":"INFO","service":"trace-1","trace":"2b5bb0d8bd6f5e1b","span":"e2748e57496a2a19","exportable":"false","pid":"8740","thread":"http-nio-8780-exec-1","class":"c.n.util.concurrent.ShutdownEnabledTimer","rest":"Shutdown hook installed for: NFLoadBalancer-PingTimer-trace-2"}
{"@timestamp":"2020-03-04T00:58:40.539Z","severity":"INFO","service":"trace-1","trace":"2b5bb0d8bd6f5e1b","span":"e2748e57496a2a19","exportable":"false","pid":"8740","thread":"http-nio-8780-exec-1","class":"c.netflix.loadbalancer.BaseLoadBalancer","rest":"Client: trace-2 instantiated a LoadBalancer: DynamicServerListLoadBalancer:{NFLoadBalancer:name=trace-2,current list of Servers=[],Load balancer stats=Zone stats: {},Server stats: []}ServerList:null"}
Sleuth 整合zipkin
Zipkin分布式跟蹤能幫助我們及時地發現系統中出現的延遲升高問題並找出系統性能瓶頸的根源.
Zipkin由4個核心組件構成:
- Collector:收集器組件,它主要用於處理從外部系統發送過來的跟蹤信息,將這些信息轉換為Zipkin內部處理的Span格式,以支持后續的存儲、分析、展示等功能。
- Storage:存儲組件,它主要對處理收集器接收到的跟蹤信息,默認會將這些信息存儲在內存中,我們也可以修改此存儲策略,通過使用其他存儲組件將跟蹤信息存儲到數據庫中。
- RESTful API:API組件,它主要用來提供外部訪問接口。比如給客戶端展示跟蹤信息,或是外接系統訪問以實現監控等。
- Web UI:UI組件,基於API組件實現的上層應用。通過UI組件用戶可以方便而有直觀地查詢和分析跟蹤信息。
zipkin quick start (HTTP方式收集)
在Spring cloud D版本以后,zipkin-server是通過引入依賴的方式構建的,到了E版本之后,官方就是開始啟用了jar的形式來運行zipkin-server。所以我們先到zipkin的官網下載最新的zipkin.jar。
涉及模塊:
- zipkin-server-2.10.1-exec.jar (java-jar啟動)
- eureka-server
- trace-1
- trace-2
zipkin-server
下載zipkin-server-2.10.1-exec.jar, 以java -jar方式啟動, 訪問 http://localhost:9411
, 出現界面,證明啟動成功
修改要跟蹤的模塊 pom.xml
在 trace-1,trace-2 pom.xml中新增 依賴:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-zipkin</artifactId>
</dependency>
修改要跟蹤的模塊 application.yml
在 trace-1,trace-2 application.yml中新增 zipkin配置段:
spring:
sleuth:
sampler:
probability: 1 #采樣頻率
web:
enabled: true
zipkin:
base-url: http://localhost:9411/ #zipkin服務地址
測試
- 啟動 eureka-server
- 以java -jar zipkin-server-2.10.1-exec.jar 啟動 zipkin-server
- 啟動trace-1 ,trace-2
- 訪問
http://localhost:8780/trace-1
, - 訪問
http://localhost:9411/zipkin
,點擊 FindTraces,界面出現反應
zipkin quick start (中間件方式收集)
zipkin的原理是服務之間的調用關系會通過HTTP方式上報到zipkin-server端,然后我們再通過zipkin-ui去調用查看追蹤服務之間的調用鏈路。但是這種方式存在一個隱患,如果微服務之間與zipkin服務端網絡不通,或調用鏈路上的網絡閃斷,http通信收集方式就無法工作。而且zipkin默認是將數據存儲在內存中的,如果服務端重啟或宕機,就會導致數據丟失。
通過結合Spring Cloud Stream,我們可以非常輕松的讓應用客戶端將跟蹤信息輸出到消息中間件上,同時Zipkin服務端從消息中間件上異步地消費這些跟蹤信息。
新建模塊trace-1-rabbitmq,trace-2-rabbitmq ;
涉及模塊:
- rabbitMq
- zipkin-server-2.10.1-exec.jar (啟動 java -jar zipkin-server-2.10.1-exec.jar --zipkin.collector.rabbitmq.addresses=127.0.0.1)
- eureka-server
- trace-1-rabbitmq
- trace-2-rabbitmq
trace-1-rabbitmq pom.xml
核心依賴:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--客戶端負載均衡組件依賴-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-ribbon</artifactId>
</dependency>
<!--服務追蹤-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-sleuth</artifactId>
</dependency>
<!--整合logstash-->
<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<version>6.3</version>
</dependency>
<!--整合zipkin-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-zipkin</artifactId>
</dependency>
<!--rabbitMq 方式-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-sleuth-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
trace-1-rabbitmq application.yml
注釋 zipkin.base-url ,添加 rabbitmq注釋段
server:
port: 8783
eureka:
client:
serviceUrl:
defaultZone: http://localhost:8761/eureka/
spring:
sleuth:
sampler:
probability: 1 #采樣頻率
web:
enabled: true
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
# rabbitmq 方式注釋http
# zipkin:
# base-url: http://localhost:9411/ #zipkin服務地址
調用接口
@RestController
public class TraceController {
private final Logger logger = LoggerFactory.getLogger(TraceController.class);
@Resource
private RestTemplate restTemplate;
@GetMapping(value = "/trace-1-rabbitmq")
public String trace() {
logger.info("================trace-1-rabbitmq begin================");
return restTemplate.getForEntity("http://trace-2-rabbitmq/trace-2-rabbitmq", String.class).getBody();
}
}
trace-2-rabbitmq 同理構造
測試
- zipkin-server-2.10.1-exec.jar (啟動 java -jar zipkin-server-2.10.1-exec.jar --zipkin.collector.rabbitmq.addresses=127.0.0.1)
- eureka-server
- trace-1-rabbitmq
- trace-2-rabbitmq
-
啟動 rabbitMq
-
啟動 zipkin-server (雙擊start-rabbitmq.bat)
-
啟動 eureka-server
-
啟動 trace-1-rabbitmq,trace-2-rabbitmq
5 .訪問http://localhost:8782/trace-1-rabbitmq
,查看 zipkin .
-
查看 rabbitmq
Sleuth+zipkin 收集原理
-
Span:它代表了一個基礎的工作單元。我們以HTTP請求為例,一次完整的請求過程在客戶端和服務端都會產生多個不同的事件狀態(比如下面所說的四個核心Annotation所標識的不同階段),
對於同一個請求來說,它們屬於一個工作單元,所以同一HTTP請求過程中的四個Annotation同屬於一個Span。每一個不同的工作單元都通過一個64位的ID來唯一標識,稱為Span ID。
另外,在工作單元中還存儲了一個用來串聯其他工作單元的ID,它也通過一個64位的ID來唯一標識,稱為Trace ID。在同一條請求鏈路中的不同工作單元都會有不同的Span ID,但是它們的Trace ID是相同的,所以通過Trace ID可以將一次請求中依賴的所有依賴請求串聯起來形成請求鏈路。
除了這兩個核心的ID之外,Span中還存儲了一些其他信息,比如:描述信息、事件時間戳、Annotation的鍵值對屬性、上一級工作單元的Span ID等。 -
Trace:它是由一系列具有相同Trace ID的Span串聯形成的一個樹狀結構。在復雜的分布式系統中,每一個外部請求通常都會產生一個復雜的樹狀結構的Trace。
-
Annotation:它用來及時地記錄一個事件的存在。我們可以把Annotation理解為一個包含有時間戳的事件標簽,對於一個HTTP請求來說,在Sleuth中定義了下面四個核心Annotation來標識一個請求的開始和結束:
– cs(Client Send):該Annotation用來記錄客戶端發起了一個請求,同時它也標識了這個HTTP請求的開始。
– sr(Server Received):該Annotation用來記錄服務端接收到了請求,並准備開始處理它。通過計算sr與cs兩個Annotation的時間戳之差,我們可以得到當前HTTP請求的網絡延遲。
– ss(Server Send):該Annotation用來記錄服務端處理完請求后准備發送請求響應信息。通過計算ss與sr兩個Annotation的時間戳之差,我們可以得到當前服務端處理請求的時間消耗。
– cr(Client Received):該Annotation用來記錄客戶端接收到服務端的回復,同時它也標識了這個HTTP請求的結束。通過計算cr與cs兩個Annotation的時間戳之差,我們可以得到該HTTP請求從客戶端發起開始到接收服務端響應的總時間消耗。
- BinaryAnnotation:它用來對跟蹤信息添加一些額外的補充說明,一般以鍵值對方式出現。比如:在記錄HTTP請求接收后執行具體業務邏輯時,此時並沒有默認的Annotation來標識該事件狀態,但是有BinaryAnnotation信息對其進行補充。
九 模塊及占用端口 :
eureka-server : 8761
service-hello : 8762 8763
service-ribbon : 8764
service-feign : 8765
config-server : 8766
config-client : 8767
hystrix-dashboard : 8768
service-zuul : 8769
turbine : 8770 8771
service-hello-consul : 8772
turbine-amqp : 8773 8774
stream-hello : 8775(producer) 8776(consumer)
stream-producer :8777
stream-consumer :8778 8779(測試消費組的概念)
trace-1: 8780
trace-2: 8781
zipkin-server : 9441 (默認)
trace-1-rabbitmq: 8782
trace-2-rabbitmq: 8783
本文 參考 :
http://www.itmuch.com/spring-cloud 作者:周立
http://blog.didispace.com/ 作者 :程序員DD