SpringCloud中Config
1.Config的簡介
官網
分布式系統面臨的問題
微服務意味着要將單體應用中的業務拆分成一個個子服務,每個服務的粒度相對較小,因此系統中會出現大量的服務。由於每個服務都需要必要的配置信息才能運行,所以套集中式的、動態的配置管理設施是必不可少的。
SpringCloud提供了ConfigServer來解決這個問題,我們每一個微服務自己帶着一個application.yml,. 上百個配置文件的管理…
太多了,需要一個東西把所有的都管理起來,這就有了我們的config
config是什么
SpringCloud Config為微服務架構中的微服務提供集中化的外部配置支持,配置服務器為各個不同微服務應用的所有環境提供了一個中心化的外部配置。
如何使用
SpringCloud Config分為服務端和客戶端兩部分。
-
服務端也稱為分布式配置中心,它是一個獨立的微服務應用,睞連接配置服務器並為客戶端提供獲取配置信息,加密/解密信息等訪問接口
-
客戶端則是通過指定的配置中心來管理應用資源,以吸與業務相關的配置內容,並在啟動的時候從配置中心獲取和加載配置信息配置服務器默認采用git來存儲配置信息,這樣就有助於對環境配置進行版本管理,並且可以通過git客戶端工具來方便的管理和訪問配置內容
能做什么
- 集中管理配置文件
- 不同環境不同配置,動態化的配置更新,分環境部署比如dev/test/prod/beta/release
- 運行期間動態調整配置,不再需要在每個服務部署的機器上編寫配置文件,服務會向配置中心統一拉取配置自己的信息
- 當配置發生變動時,服務不需要重啟即可感知到配置的變化並應用新的配置
- 將配置信息以REST接口的形式暴露
與git的配合使用
由於SpringCloud Config默認使用Git來存儲配置文件(也有其它方式,比如支持SVN和本地文件),但最推薦的還是Git,而且使用的是http/https訪問的形式
2.Config服務端的配置和測試
准備
先在自己的github或gitee賬號建配置倉庫。
我的config倉庫
同時要保證項目中有注冊中心7001可以在我的倉庫中復制
我的項目倉庫
在idea中新建項目
- 建項目
項目名稱cloud-config-center3344 - 寫pom
<dependencies>
<!-- 添加消息總線RabbitMQ支持 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-config-server</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>wf.springcloud</groupId>
<artifactId>cloud-api-commons</artifactId>
<version>1.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
寫yml
server:
port: 3344
spring:
application:
name: cloud-config-center
cloud:
config:
server:
git:
uri: https://gitee.com/gyhdx/SpringCloud-Config.git
search-paths:
- springcoud-config
label: master
eureka:
client:
service-url:
defaultZone: http://localhost:7001/eureka
主啟動
@SpringBootApplication
@EnableConfigServer
public class ConfigMain3344 {
public static void main(String[] args) {
SpringApplication.run(ConfigMain3344.class,args);
}
}
測試
什么的內容配置完成,啟動7001和3344對項目進行測試
訪問http://localhost:3344/master/config-dev.yml
成功
配置的讀取規則(怎么知道訪問上面的鏈接就能得到數據)
上面是config官網提供的5種訪問的方式,這只介紹常用的三種
/{label}/{application}-{profile}.yml
這是我們上面用到的訪問方式
- {label}
就是訪問的那個分支master還是其他如dev等分支 - {application}-{profile}.yml
這是一種配置文件的命名規則
{application}是前綴config
{profile}是后綴dev等等,兩個之間使用-連接
名稱config-dev是可以自定義
/{application}-{profile}.yml
和上面的服務方式類似只是省略服務的分支。該一般默認訪問master分支
/{application}/{profile}[/{label}]
就是把分支放在后面。不過不同的是使用以上方式訪問返回的是一個json串而不是一個字符串。
總結
label:分支(branch)
name :服務名
profiles:環境(dev/test/prod)
3.config的客戶端的配置和測試
不改動的直接在倉庫中復制,下面不寫
- 建項目
- 寫pom
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-config</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>wf.springcloud</groupId>
<artifactId>cloud-api-commons</artifactId>
<version>1.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
- 寫yml
此時的yml是bootstrap.yml因為該yml比application.yml先加載
server:
port: 3355
spring:
application:
name: config-client
cloud:
config:
label: master
name: config
profile: dev
uri: http://localhost:3344
eureka:
client:
service-url:
defaultZone: http://localhost:7001/eureka
- 主啟動
- controller
@RestController
@Slf4j
public class ConfigController {
@Value("${config.info}")
private String configInfo;
@GetMapping(value = "/configInfo")
public String getInfo(){
return configInfo;
}
}
測試
啟動項目
訪問http://localhost:3355/configInfo
完成
問題
上面的內容配置完成雖然可以正常訪問,但存在一個問題。就是如果在倉庫中改變配置的內容
把圈出來的改為4
訪問3344服務端時數據被實時更新了
但是客戶端的數據沒被更新
重啟項目才能刷新數據。
如何實現客戶端內容的動態刷新?
實現config客戶端動態刷新
先改3355的配置
確保pom中有actuator依賴
在bootstrap.yml中添加監控點
# 暴露監控端點
management:
endpoints:
web:
exposure:
include: "*"
在controller添加@RefreshScope注解
@RestController
@Slf4j
@RefreshScope
public class ConfigController {
@Value("${config.info}")
private String configInfo;
@GetMapping(value = "/configInfo")
public String getInfo(){
return configInfo;
}
}
啟動項目,修改倉庫中的配置,此時訪問時,客戶端依然不能實時更新數據,但是此時通過cmd發送
完成再次訪問客戶端就能發現數據更新了,雖然上面的發送可以在客戶端不重啟的模式下更新數據但是還有很大的限制。想要靈活的實現客戶端數據的實時更新,需要下面的知識。
SpringCloud的Bus
Spring Cloud Bus配合Spring Cloud Config使用可以實現配置的動態刷新。
以下是一種拓撲機制,一個客戶機接收到消息,就會給整個群中的機器發送。
Spring Cloud Bus是用來將分布式系統的節點與輕量級消息系統鏈接起來的框架,
它整合了Java的事件處理機制和消息中間件的功能。
Spring Clud Bus目前支持RabbitMQ和Kafka。
作用
Spring Cloud Bus能管理和傳播分布式系統間的消息,就像一個分布式執行器, 可用於廣播狀態更改、事件推送等,也可以當作微服務間的通信通道
這張圖和上面的那張圖類似,但不同的是,這里只需要對服務機進行通知信息,通過服務機來通知群組中的其他客戶機。
為什么被稱為信息總線(bus)
- 什么是總線
在微服務架構的系統中,通常會使用輕量級的消息代理來構建一個共用的消息主題, 並讓系統中所有微服務實例都連接上來。由於該主題中產生的消息會被所有實例監聽和消費,所以稱它為消息總線。在總線上的各個實例,都可以方便地廣播一些需要讓其他連接在該主題上的實例都知道的消息。 - 基本原理
ConfigClient實例都監聽MQ中同一個topic(默認是springCloudBus)。當一個服務刷新數據的時候,它會把這個信息放入到Topic中,這樣其它監聽同一Topic的服務就能得到通知,然后去更新自身的配置。
Bus的配置
注意:本次bus是通過RabbitMq來實現消息的發送,故如果需要完成配置。需要在機器上安裝RabbitMQ和Erlang。自行百度,這里不給出具體安裝流程。
新建項目3366
上面只有一個項目不好演示集群效果,故新建一個客戶機3366.
直接從我的倉庫中復制即可
- 建項目
- 寫pom
- 寫bootstrap.yml
- 主啟動
- controller
bus刷新全局廣播設計思想
兩種
- 1)利用消息總線觸發一個 客戶端/bus/refresh,而刷新所有客戶端的配置口
- 2)利用消息總線觸發一個服務端ConfigServer的/bus/refresh端點,而刷新所有客戶端的配置圓
以上兩種設計思想中第二種要好,因為第一種 - 打破了微服務的職責單一性, 因為微服務本身是業務模塊,它本不應該承擔配置刷新的職責。
- 破壞了微服務各節點的對等性。
- 有一定的局限性。例如,微服務在遷移時,它的網絡地址常常會發生變化,此時如果想要做到自動刷新,那就會增加更多的修改
服務的引入消息總線的支持(3344)
- 改pom
添加RabbitMQ的支持
<!--添加消息總線RabbitMQ支持 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
- 該yml
server:
port: 3344
spring:
application:
name: cloud-config-center
cloud:
config:
server:
git:
uri: https://gitee.com/gyhdx/SpringCloud-Config.git
search-paths:
- springcoud-config
label: master
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
eureka:
client:
service-url:
defaultZone: http://localhost:7001/eureka
# 暴露監控端點
management:
endpoints:
web:
exposure:
include: "bus-refresh"
添加了
rabbitmq:
host: 192.168.113.6
port: 5672
username: guest
password: guest
開啟對rabbitMQ的注冊
# 暴露監控端點
management:
endpoints:
web:
exposure:
include: "bus-refresh"
暴露監控點讓后續的命令好執行
客戶端添加消息總線的支持
- 改pom
<!-- 添加消息總線RabbitMQ支持 在第一次演示時不要添加該依賴不然不配置相關環境會出問題 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
- 改yml
server:
port: 3366
spring:
application:
name: config-client
cloud:
config:
label: master
name: config
profile: dev
uri: http://localhost:3344
rabbitmq:
host: 192.168.113.6
port: 5672
username: guest
password: guest
eureka:
client:
service-url:
defaultZone: http://localhost:7001/eureka
# 暴露監控端點
management:
endpoints:
web:
exposure:
include: "*"
同服務端
測試
完成上述配置啟動7001(注冊中心)3344、3355、3366
訪問http://localhost:3344/master/config-dev.yml、http://localhost:3355/configInfo、http://localhost:3366/configInfo可以看到三個鏈接訪問結果相同都是
此時修改遠程倉庫中的version
並提交,會發現3344端口同步數據,但是3355、3366的數據沒變化,此時(要開啟RabbitMQ),通過命令行向3344端口發信息
curl -X POST "http://localhost:3344/actuator/bus-refresh"
再次訪問3355、3366會發現數據已經同步。
bus實現定點通知
指定具體某一個實例生效而不是全部
公式: http://localhost:配置中心的端口號/actuator/bus-refresh/ {destination}
/bus/refresh請求不再發送到具體的服務實例上,而是發給config server通過destination參數類指定需要更新配置的服務或實例
測試
修改遠程倉庫中的version,3344同步3355、3366不同步通過命令行發送curl -X POST “http://localhost:3344/actuator/bus-refresh/config-client:3355”
在實現3355、3366的鏈接會發現3355同步數據,而3366沒有。
SpringCloud中的Stream
官網地址:https://spring.io/projects/spring-cloud-stream#overview
我在這使用的是rabbitmq,stream整合RabbitMQ的官網:https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-rabbit/2.2.1.RELEASE/spring-cloud-stream-binder-rabbit.html
1.為什么會出現stream
上面的bus中為了解決全局動態刷新問題引入了RabbitMQ信息中間件,然而此處有一個問題,我們現在的系統分為了三個部分前端,后端和大數據處理
但是我們可能會在后端和大數據處理部分使用不同的信息中間件。但是各個信息中間件之間是不能通信的,故為了解決不同信息中間件之間的通信問題我們引入了Stream。
stream讓我們不再關注具體MQ的細節我們只需要用一種適配綁定的方式,自動的給我們在各種MQ內切換,
總結
屏蔽底層消息中間件的差異降低切換成本,統一消息的編程模型
2.什么是SpringCloudStream
官方定義Spring Cloud Stream是一個構建消息驅動微服務的框架。
應用程序通過inputs或者outputs來與Spring Cloud Stream中binder對象交互。
通過我們配置來binding(綁定),而Spring Cloud Stream的binder對象負責與消息中間件交互。
所以,我們只需要搞清楚如何與Spring Cloud Stream交互就可以方便使用消息驅動的方式。
通過使用Spring Integration來連接消息代理中間件以實現消息事件驅動。
Spring Cloud Stream為-些供應商的消息中間件產品提供了個性化的自動化配置實現,引用了發布-訂閱、消費組、分區的三個核心概念。
目前僅支持RabbitMQ、Kafka.其他幾種MQ有其他的框架支持,我們后續在講。
3.Stream設計思想
標准MQ的通信方式
- 生產者/消費者之間靠消息媒介傳遞信息內容:Message(pub)
- 消息必須走特定的通道:消息通道MessageChannel(queue)
- 消息通道里的消息如何被消費呢,誰負責收發處理:消息通道MessageChannel的子接口SubscribableChannel,由MessageHandler消 息處理器所訂閱(sub)
為什么用Cloud Stream
比方說我們用到了RabbitMQ和Kafka,由於這兩個消息中間件的架構上的不同,
像RabbitMQ有exchange, kafka有Topic和Partitions分區,故我們需要一種東西屏蔽不同信息中間件的差異
這些中間件的差異性導致我們實際項目開發給我們造成了一定的困擾,我們如果用了兩個消息隊列的其中-種,后面的業務需求,想往另外一種消息隊列進行廷移,這時候無疑就是一個災難性的, 一大堆東西都要重新推倒重新做,因為它跟我們的系統耦合了,這時候springcloud Stream給我們提供了一種解耦合的方式。
stream憑什么可以統一底層差異?
在沒有綁定器這個概念的情況下,我們的SpringBoot應用要直接與消息中間件進行信息交互的時候,
由於各消息中間件構建的初衷不同,它們的實現細節上會有較大的差異性
通過定義綁定器作為中間層,完美地實現了應用程序與消息中間件細節之間的隔離。
通過向應用程序暴露統一的Channel通道, 使得應用程序不需要再考慮各種不同的消息中間件實現。
通過定義綁定器Binder作為中間層,實現了應用程序與消息中間件細節之間的隔離。
Binder
INPUT對應於消費者
OUTPUT對應於生產者
通過定義綁定器Binder作為中間層,實現了應用程序與消息中間件細節之間的隔離。
Stream中的消息通信方式遵循了發布-訂閱模式
Topic主題進行廣播
- 在RabbitMQ就是Exchange
- 在Kakfa中就是Topic
4.stream的標准流程
官方架構圖,圖1
對信息方式及處理流程,圖2
- Binder:很方便的連接中間件,屏蔽差異
- Channel:通道是隊列Queue的一種抽象,在消息通訊系統中就是實現存儲和轉發的媒介,通過Channel對隊列進行配置
- Source和Sink:簡單的可理解為參照對象是Spring Cloud Stream自身,從Stream發布消息就是輸出,接受消息就是輸入。
編碼常用注解
5.案例配置
消息生產者
模塊名稱:cloud-stream-rabbitmq-provider8801
沒給的代碼自己到遠程倉庫中找,我只給出重要的代碼
- 建項目
- 寫pom
- 寫yml
server:
port: 8801
spring:
application:
name: cloud-stream-provider
cloud:
stream:
binders: #需要綁定的rabbitmq的服務信息
defaultRabbit: #定義的名稱,用於binding整合
type: rabbit #消息組件類型
environment: #環境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服務的整合處理
output: # 名字是一個通道的名稱
destination: studyExchange # 表示要使用的Exchange名稱定義
content-type: application/json # 設置消息類型,本次為json
binder: defaultRabbit #設置要綁定的消息服務的具體設置
eureka:
client:
service-url:
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 #設置跳的時間間隔(默認是30秒)
lease-expiration-duration-in-seconds: 5 #如果現在超過了5秒的間隔(默認是90秒)
instance-id: send-8801.com # 在信息列表時顯示 主機名稱
prefer-ip-address: true #訪問的路徑變為IP地址
- 主啟動
- service
public interface IMerssageProvider {
String send();
}
//消息的發送者故綁定Source,圖2中有給出
@EnableBinding(Source.class)
public class MessagerProviderImpl implements IMerssageProvider {
//消息是通過通道發送的故引入MessageChannel
@Resource
private MessageChannel output;
@Override
public String send() {
String seral = UUID.randomUUID().toString();
//官網給出的使用
output.send(MessageBuilder.withPayload(seral).build());
System.out.println("-*-*-*-*-serial:" + seral);
return null;
}
}
我沒設置訪問頭
- controller
@RestController
public class ProviderController {
@Resource
private IMerssageProvider merssageProvider;
@GetMapping(value = "/sendMessage")
public String sendMessage(){
merssageProvider.send();
return "發送成功!!";
}
}
啟動7001、RabbitMQ和8801后會在RabbitMQ中看到
訪問http://localhost:8801/sendMessage
在后台顯示
表示消息生產者配置成功
消息消費者
模塊名稱:cloud-stream-rabbitmq-consumer8802
- 建項目
- 寫pom
- 寫yml
server:
port: 8802
spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders: #需要綁定的rabbitmq的服務信息
defaultRabbit: #定義的名稱,用於binding整合
type: rabbit #消息組件類型
environment: #環境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服務的整合處理
input: # 名字是一個通道的名稱
destination: studyExchange # 表示要使用的Exchange名稱定義
content-type: application/json # 設置消息類型,本次為json
binder: defaultRabbit #設置要綁定的消息服務的具體設置
# group: wf #這里和下面的8803項目都把group注釋掉,不然無法得到與下面測試相同的結果
eureka:
client:
service-url:
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2
lease-expiration-duration-in-seconds: 5
instance-id: receive-8802.com
prefer-ip-address: true
- 主啟動
- controller
@Controller
//消費者故綁定Sink
@EnableBinding(Sink.class)
public class StreamConsumerLisnterController {
@Value("${server.port}")
private String serverPort;
//消費者是輸入流故在此聲明,stream會把與生產者同topic的等待發送到有改注解的方法
@StreamListener(Sink.INPUT)
//要接受消息故在形參表明
public void input(Message<String> message){
//生產者的信息使用了MessageBuilder.withPayload(seral).build()
//故在此get就好
String payload = message.getPayload();
System.out.println("消費者1,------>接收到的信息:" + payload +", port:" + serverPort);
}
}
再新建與8802相似的項目8803與8802組成一個集群。就只需要改變yml中的端口號就好。這里不給出具體流程
測試
啟動7001、8801、8802、8803和rabbitMQ
啟動完成查看rabbitMQ
表示配置成功。發送消息
然后發送消息http://localhost:8801/sendMessage觀察后台
會發現8801發送的消息8802和8803都接收到了,這就引出了我們下一個問題
持續消費問題
目前是8802/8803同時都收到了,存在重復消費問題。因為8802和8803屬於同一個集群,故如果有一個消息進來應該是集群中的每一個訪問進行處理,而不是集群中所以訪問都去處理一個問題,如再存錢的時候應該的負責存錢的集群中一個服務來處理錢的存入,而不是所有服務都去處理,不然就會造成輸入一次,存多次錢的問題,這樣是不行的,
如果解決上述的問題,給服務分組即我在上面創建8802的yml中的注釋,因為在stream中如果屬於同一個組那么消息就只能被組中某一個服務得到。
而如果我們不再yml配置中設置組,那么stream就會給該服務設置一個默認的組
解決持續消費問題
給8802和8803添加group屬性(把注釋掉的group放開)
再啟動所有項目訪問 localhost:8801/sendMessage
生產者發送多條消息
消費者進行消費
可以看到重復消費問題被解決了。
消息的持久化問題
只要在yml配置中給服務配置了group屬性那么stream就會自動進行持久化
我們把8802和8803服務都停掉,把兩個服務中8802中的group屬性注釋掉,8803不注釋訪問localhost:8801/sendMessage
然后啟動8802、8803
會發現
8802沒有接受8801發送的消息,而8803接收到了這就是消息的持久化。