一、准備階段
先搭建兩個項目,provider-01-8081和consumer-01-8080,其中provider-01-8081用於提供服務,並操作數據庫,consumer-01-8080用於消費服務,使用 http + ip + 端口的方式進行調用,代碼比較簡單,並且和Spring Cloud沒有關系,就直接上代碼。
provider-01-8081的相關代碼如下:
@RestController @RequestMapping("/depart") @Slf4j public class DepartController { @Autowired private DepartService departService; @PostMapping("/save") public boolean save(@RequestBody Depart depart){ return departService.save(depart); } @PostMapping("/del") public boolean del(int id){ return departService.del(id); } @PostMapping("/update") public boolean update(@RequestBody Depart depart){ return departService.update(depart); } @GetMapping("/find/{id}") public Depart query(@PathVariable int id){ return departService.find(id); } @GetMapping("/findAll") public List<Depart> findAll(){ return departService.findAll(); } }
public interface DepartService { boolean save(Depart depart); boolean del(int id); boolean update(Depart depart); Depart find(int id); List<Depart> findAll(); }
@Service public class DepartServiceImpl implements DepartService { @Autowired private DepartRepository departRepository; @Override public boolean save(Depart depart) { Depart d = departRepository.save(depart); if(d != null){ return true; } return false; } @Override public boolean del(int id) { //如果id不存在,刪除會報錯 if(departRepository.existsById(id)){ departRepository.deleteById(id); } return true; } @Override public boolean update(Depart depart) { //這里save時,如果id為空,則新增;id不為空且id存在,則更新;id不為空且id不存在,仍然為新增,且id仍自動生成。 Depart d = departRepository.save(depart); return d == null ? true:false; } @Override public Depart find(int id) { if(departRepository.existsById(id)){ return departRepository.getOne(id); } return null; } @Override public List<Depart> findAll() { return departRepository.findAll(); } }
@Data @Table @Entity //使用自動建表 @JsonIgnoreProperties({"hibernateLazyInitializer","handler","fieldHandler"}) //使用懶加載 public class Depart { @Id //設置主鍵 @GeneratedValue(strategy = GenerationType.IDENTITY) //設置主鍵自增長 private Integer id; private String name; }
public interface DepartRepository extends JpaRepository<Depart, Integer> { }
server: port: 8081 # 配置spring-data-jpa spring: jpa: # 設置重啟時是否更新表結構 hibernate: ddl-auto: none # 設置是否在容器啟動時創建表,默認為false generate-ddl: true # 設置是否顯示sql,默認為false show-sql: true datasource: type: com.alibaba.druid.pool.DruidDataSource driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://8.131.245.53:3306/lcltest?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai&zeroDateTimeBehavior=convertToNull username: root password: root application: # 當前服務名稱 name: provider-depart # 配置日志 logging: pattern: console: level-%level %msg%n level: root: info # hibernate輸出日志級別 org.hibernate: info # 在show-sql為true時,顯示sql中的動態值 org.hibernate.type.descriptor.sql.BasicBinder: trace # 在show-sql為true時,顯示查詢結果 org.hibernate.hql.internal.ast.exec.BasicExecutor: trace # 設置自己代碼的日志級別 com.lcl.springcloud: debug
consumer-01-8080代碼如下:
@Data public class Depart { private Integer id; private String name; }
@RestController @RequestMapping("/depart") public class DepertApi { @Autowired private RestTemplate restTemplate; private static final String provider_url = "http://localhost:8081"; @PostMapping("/save") public boolean save(@RequestBody Depart depart){ String url = provider_url + "/depart/save"; return restTemplate.postForObject(url, depart, Boolean.class); } @PostMapping("/del") public boolean del(int id){ String url = provider_url + "/depart/del"; return restTemplate.postForObject(url, id, Boolean.class); } @PostMapping("/update") public boolean update(@RequestBody Depart depart){ String url = provider_url + "/depart/update"; return restTemplate.postForObject(url, depart, Boolean.class); } @GetMapping("/find/{id}") public Depart query(@PathVariable int id){ String url = provider_url + "/depart/find/id"; return restTemplate.getForObject(url, Depart.class); } @GetMapping("/findAll") public List<Depart> findAll(){ String url = provider_url + "/depart/findAll"; return restTemplate.getForObject(url, List.class); } }
二、Eureka
(一)Eureka單機搭建
首先說一下SpringCloud和SpringBoot的版本對應關系,如果不對應,會產生異常。
Eureka單機搭建非常簡單,就是在啟動類上增加@EnableEurekaServer注解,其他的就是配置文件的一些配置
@SpringBootApplication @EnableEurekaServer public class Eureka018000Application { public static void main(String[] args) { SpringApplication.run(Eureka018000Application.class, args); } }
配置文件內容如下,都已經做了注釋,就不再多描述。
server: port: 8000 eureka: instance: # 指定eureka主機 hostname: localhost client: # 是否向eureka注冊自己 register-with-eureka: false # 指定客戶端是否能夠獲取eureka注冊信息 fetch-registry: false # 暴露服務中心地址 server-url: # 服務中心地址,寫法等同於:http:localhost:8080/eureka defualtZone: http://${eureka.instance.hostname}:${server.port}/eureka server: # 關閉自我保護,不建議關閉 enable-self-preservation: false # eureka server 剔除不可用服務的時間窗 eviction-interval-timer-in-ms: 4000 # 設置心跳保護開啟的閾值 #renewal-percent-threshold: 0.75 # 指定服務名稱 spring: application: name: eureka-server
<properties> <java.version>1.8</java.version> <spring-cloud.version>2020.0.2</spring-cloud.version> </properties> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>${spring-cloud.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
consumer需要開啟服務發現及Eureka客戶端注冊,然后就可以使用RestTemplate進行調用
@SpringBootApplication @EnableDiscoveryClient //開啟服務發現客戶端 @EnableEurekaClient public class Consumer018080Application { public static void main(String[] args) { SpringApplication.run(Consumer018080Application.class, args); } }
@Configuration public class DepartCodeConfig { @Bean @LoadBalanced //開啟客戶端負載均衡功能 public RestTemplate restTemplate(){ return new RestTemplate(); } }
private static final String provider_url = "http://provider-depart"; @PostMapping("/save") public boolean save(@RequestBody Depart depart){ String url = provider_url + "/depart/save"; return restTemplate.postForObject(url, depart, Boolean.class); }
(二)服務發現
使用DiscoveryClient的getInstances方法獲取實例集合,然后再獲取實例中的相關信息。
@RestController @RequestMapping("/depart") @Slf4j public class DepartController { @Autowired private DepartService departService; @Autowired //聲明服務發現客戶端 private DiscoveryClient client; ...... @GetMapping("/services") public void getServices(){ List list = new ArrayList(); List<String> services = client.getServices(); for (String serviceNanme: services) { List<ServiceInstance> instances = client.getInstances(serviceNanme); for (ServiceInstance instance : instances) { log.info("instance.getHost()==================={}", instance.getHost()); log.info("instance.getInstanceId()==================={}", instance.getInstanceId()); log.info("instance.getServiceId()==================={}", instance.getServiceId()); log.info("instance.getUri()==================={}", instance.getUri()); log.info("instance.getMetadata()==================={}", instance.getMetadata()); log.info("instance.getPort()==================={}", instance.getPort()); log.info("instance.getScheme()==================={}", instance.getScheme()); } } } }
(三)Eureka自我保護機制
Eureka控制台可以看到如下圖所示的紅色字體描述,表示Eureka進入了自我保護模式。
默認情況下,Eureka Server在90秒呢沒有檢測到服務列表中的某服務,就會將該服務從服務列表中剔除。
但是大多數情況下,Eureka Server檢測不到心跳是因為網絡抖動的問題,如果在短時間內網絡恢復正常,則可以正常對外提供服務,但是由於Eureka Server的服務列表中已不存在該服務,因此就無法對外提供服務。
基於以上的情況,如果在短時間內,Eureka Server丟失較多的微服務,即收到的心跳數量小於閾值,為了保證系統的可用性(AP),給由於網絡抖動造成服務不可用的服務以恢復機會,因此就會開啟Eureka的自我保護機制,當Eureka Server收到的心跳數恢復到閾值以上,則會關閉自我保護機制。
自我保護機制默認是開啟的,默認的閾值因子是0.85,可以通過如下配置進行調整,但是不建議調整。
eureka: server: # 關閉自我保護,不建議關閉 enable-self-preservation: false # eureka server 剔除不可用服務的時間窗 eviction-interval-timer-in-ms: 4000 # 設置心跳保護開啟的閾值 renewal-percent-threshold: 0.75
現面截圖中Renews threshold和Renews (last min)就是用來計算是否需要開啟自我保護的。
(四)服務下架及平滑上下線
為Eureka添加actuator監控模塊及添加配置文件,這一塊是SpringBoot的內容。
info: app.version: 1.0 app.auth: lcl app.name: provider-depart management: endpoint: shutdown: # 開啟shutdown功能 enabled: true endpoints: web: exposure: # 開啟所有監控終端 include: "*"
<!--actuator 依賴--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency>
可以使用http://localhost:8081/actuator來查看開放的安全檢查端點,可以使用http://localhost:8081/actuator/info查看在配置文件中配置的info信息,可以使用http://localhost:8081/actuator/shutdown手動停止服務。
可以使用POST請求調用http://localhost:8081/actuator/serviceregistry(這里不同的版本不太一樣,有的版本為serviceregistry,有的版本為service-registry),傳參為{"status":"DOWN"}或{"status":"UP"}對服務做平滑上下線操作。下線時,並非服務不可用,而是通過Eureka標記服務為DOWN狀態,使consumer不可調用。
三、Eureka集群
(一)Eureka集群搭建
對於集群的搭建,主要就是配置文件的不一樣,配置如下,三個實例除了端口號和eureka.instance.hostname不一致外,其他配置項一致。
server: port: 8100 eureka: instance: hostname: eureka8100 client: register-with-eureka: false fetch-registry: false service-url: defaultZone: http://eureka8100:8100, http://eureka8200:8200, http://eureka8300:8300 # defaultZone: http://localhost:8100/eureka, http://localhost:8200/eureka, http://localhost:8300/eureka spring: application: name: eureka-server
對於Eureka8100、Eureka8200、Eureka8300需要設置Host
127.0.0.1 eureka8100 127.0.0.1 eureka8200 127.0.0.1 eureka8300
為什么不使用真實IP呢,因為在同一台機器上模擬Eureka集群,都是使用localhost,那么IP重復,就不會出現其他實例的DS Replicas。
(二)Eureka的異地多活
Eureka中有Region和Availability Zone的概念,各個Region之間內網不通,但是在同一Region中的Zone內網是通的,只要是為了同城容災。拿阿里雲舉例,其有杭州、北京、上海、東南亞等Region。
以下面圖片為例,搭建Eureka的異地多活集群
1、首先配置Region為bj1的Eureka集群,三個實例的配置參數一致,除了端口號。
server: port: 8001 eureka: instance: metadata-map: zone: bj1 client: region: beijing register-with-eureka: false fetch-registry: false availability-zones: beijing: bj1,bj2 service-url: bj1: http://eureka8001:8001, http://eureka8002:8002, http://eureka8003:8003 bj2: http://eureka8004:8004, http://eureka8005:8005, http://eureka8006:8006
2、然后配置Region為bj2的Eureka集群,三個實例的配置參數一致,除了端口號。
server: port: 8004 eureka: instance: metadata-map: zone: bj2 client: region: beijing register-with-eureka: false fetch-registry: false availability-zones: beijing: bj1,bj2 service-url: bj1: http://eureka8001:8001, http://eureka8002:8002, http://eureka8003:8003 bj2: http://eureka8004:8004, http://eureka8005:8005, http://eureka8006:8006
3、配置各個Eureka的Host
127.0.0.1 eureka8001 127.0.0.1 eureka8002 127.0.0.1 eureka8003 127.0.0.1 eureka8004 127.0.0.1 eureka8005 127.0.0.1 eureka8006
4、服務提供者配置參數
eureka: client: # 指定區域名稱 region: beijing # 指定區域中所包含的地帶zoned availability-zones: beijing: bj1,bj2 # 指定各個地帶zone中所包含的eureka server地址 service-url: bj1: http://eureka8001:8001/eureka, http://eureka8002:8002/eureka, http://eureka8003:8003/eureka bj2: http://eureka8004:8004/eureka, http://eureka8005:8005/eureka, http://eureka8006:8006/eureka # 指定要連的注冊中心 fetch-remote-regions-registry: beijing instance: # 服務提供者id instance-id: provider-depart-01-8081 # 設置當前cleit每1秒向server發送一次心跳 lease-renewal-interval-in-seconds: 1 # 設置超過3秒即認為失效 lease-expiration-duration-in-seconds: 3 metadata-map: zone: bj1
四、OpenFeign
OpenFeign可以將提供者提供的Restful服務偽裝成接口調用,消費者只需要使用“feign接口 + 注解”的方式即可調用提供者提供的Restful服務,而無需再使用RestTemplate。
Feign和OpenFeign:在Spring Cloud D之前的版本使用的是feign,而該項目現在已更新為OpenFeign,所以后續的依賴也發生了變化。
(一)OpenFeign使用
與原來使用RestTemplate項目的差異:
1、pom依賴
<!--feign 依賴--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-openfeign</artifactId> </dependency>
2、不再配置RestTemplate
/* @Configuration public class DepartCodeConfig { @Bean @LoadBalanced //開啟客戶端負載均衡功能 public RestTemplate restTemplate(){ return new RestTemplate(); } } */
3、創建一個接口
該接口的參數與請求地址要是服務提供者保持一致,同時使用@FeignClient標記接口為Feign客戶端,參數為服務提供者的服務名稱(spring.application.name),使用@RequestMapping標記請求服務提供者的url
@FeignClient("provider-depart") @RequestMapping("/provider/depart") public interface DepartService { @PostMapping("/save") boolean save(Depart depart); @PostMapping("/del") boolean del(int id); @PostMapping("/update") boolean update(Depart depart); @GetMapping("/find/{id}") Depart query(int id); @GetMapping("/findAll") List<Depart> findAll(); }
4、啟動類上使用@EnableFeignClients開啟Feign客戶端,並指定Service所在的包
@SpringBootApplication @EnableEurekaClient @EnableFeignClients("com.lcl.springcloud.consumer01.service") public class ConsumerFeign018080Application { public static void main(String[] args) { SpringApplication.run(ConsumerFeign018080Application.class, args); } }
5、調整調用,不再使用RestTemplate,直接調用新增的Feign客戶端接口
@RestController @RequestMapping("/depart") public class DepertApi { @Autowired private DepartService departService; @PostMapping("/save") public boolean save(@RequestBody Depart depart){ return departService.save(depart); } @PostMapping("/del") public boolean del(int id){ return departService.del(id); } @PostMapping("/update") public boolean update(@RequestBody Depart depart){ return departService.update(depart); } @GetMapping("/find/{id}") public Depart query(@PathVariable int id){ return departService.query(id); } @GetMapping("/findAll") public List<Depart> findAll(){ return departService.findAll(); } }
說明:
消費者和服務者的調用url不能完全一致,否則會出現方法已存在的異常(There is already xxx bean method)。
示例:
@FeignClient("provider-depart") @RequestMapping("/depart") public interface DepartService { @PostMapping("/save") boolean save(Depart depart);
@RestController @RequestMapping("/depart") public class DepertApi { @Autowired private DepartService departService; @PostMapping("/save") public boolean save(@RequestBody Depart depart){ return departService.save(depart); }
Caused by: java.lang.IllegalStateException: Ambiguous mapping. Cannot map 'com.lcl.springcloud.consumer01.service.DepartService' method com.sun.proxy.$Proxy101#save(Depart) to {POST [/depart/update]}: There is already 'depertApi' bean method
(二)超時與壓縮配置
Feign連接提供者和對於提供者的調用均可設置超時時間限制,同時也可以配置壓縮信息。
feign: client: config: default: connectTimeout: 5000 # 指定feign連接提供者的超時時間 readTimeout: 5000 # 指定feign從請求到獲取響應的超時時間 compression: request: enabled: true # 開啟對請求的壓縮 # 指定對哪些文件類型進行壓縮 mime-types: ["text/xml","application/xml","application/json"] # 開啟壓縮的文件大小 min-request-size: 2048 response: enabled: true
五、負載均衡Ribbon
這里首先說明一下,上面的Spring Cloud版本是2020.0.2,其對應的Spring Boot版本是2.4.X,因此我們使用了2.4.4,但是由於在2020.X.X之后的版本,Spring Cloud移除了Ribbon、Hystrix、Zuul,后續的演示Ribbon、Hystrix、Zuul,就需要將Spring Boot的版本改為Hoxton.SR6,那么對應的Spring Boot版本也需要改為對應的2.3.X,因此我們這里使用了2.3.1.RELEASE。
(一)Ribbon配置
Ribbon內置的負載均衡策略有如下七種。
負載均衡策略 | 策略名稱 | 描述 |
RoundRobinRule | 輪詢策略 | Ribbon默認的策略,如果經過一輪輪詢都沒有找到可用的provider,最多輪詢10次,如果最終還是沒有可用的provider,則返回null |
RandomRule | 隨機策略 | 從所有可用的provider中隨機選擇一個 |
RetryRule | 重試策略 | 先采用RoundRobinRule進行獲取provider,如果獲取provider失敗,則在一定時間內進行重試。默認時間為500ms。 |
BestAvailableRule | 最可用策略 | 選擇並發最小的provider,即選擇連接數最小的provider |
AvailabilityFilteringRule | 可用過濾算法策略 | 1、先采用輪詢策略選擇一個provider,然后判斷其是否處於熔斷狀態或超過最大連接數,如果不超過,則返回該provider,如果超過,則再輪詢下一個,最大可輪詢10 2、若10次輪詢仍沒有找到可用的provider,則將所有的provider都做一次判斷,挑選出未熔斷且未超過連接限制的provider,然后在采用輪詢的方式選擇一個,如果沒有未熔斷且未超過鏈接限制的provider,則返回null |
ZoneAvoidancerRule | 回避策略 | 根據provider所在zone及provider的可用性,對provider進行選擇 |
WeightedResponseTimeRule | 權重響應時間 | 根據provider的平均響應時間選擇,相應時間越小,被選中的概率就越大。在服務器剛啟動時,是以哦那個輪詢策略,后面可以使用權重。 |
如果想要修改Ribbon的輪詢策略,可以使用配置文件和配置類兩種方式。
1、導包
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.1.RELEASE</version> <relativePath/> </parent> <groupId>com.lcl.springcloud</groupId> <artifactId>consumer-Hystrix-01-8080</artifactId> <version>0.0.1-SNAPSHOT</version> <name>consumer-Hystrix-01-8080</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> <spring-cloud.version>Hoxton.SR6</spring-cloud.version> </properties> <dependencies> ....... <dependency> <groupId>com.netflix.ribbon</groupId> <artifactId>ribbon</artifactId> <version>2.7.18</version> <scope>runtime</scope> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-ribbon</artifactId> </dependency> </dependencies> ......
2、配置文件方式:
# 負載均衡策略 consumer-depart: ribbon: NFLoadBalancerRuleClassName: com.netflix.loadbalancer.RandomRule
3、配置類方式
@Configuration public class DepartCodeConfig { /*@Bean @LoadBalanced //開啟客戶端負載均衡功能 public RestTemplate restTemplate(){ return new RestTemplate(); }*/ @Bean public IRule loadBalanceRule(){ return new RandomRule(); } }
(二)自定義Ribbon
實現自定義Ribbon主要分為兩步,第一步就是寫出自己對於IRule的實現類,第二步就是配置自己的IRule實現,由於編碼量的問題,先說第二步,例如我寫的IRule實現類為MyRule,因此需要配置MyRule,配置方式就是上面說的使用配置類或者配置文件。
@Configuration public class DepartCodeConfig { @Bean public IRule loadBalanceRule(){ return new MyRule(); } }
然后就是寫MyRule,其需要實現IRule接口,同時需要實現接口的獲取ILoadBalancer的get、set方法,為了簡化,直接使用了@Data注解,然后還需要實現choose方法,這里就是具體的如何選擇Server的方法。
@NoArgsConstructor @AllArgsConstructor @Data public class MyRule implements IRule { private ILoadBalancer loadBalancer; private List<Integer> excludePorts; @Override public Server choose(Object o) { //獲取所有provider List<Server> serverList = loadBalancer.getAllServers(); //排除指定provider List<Server> availableServer = getAvailableServer(serverList); //從剩余的provider中選擇 return getAvailableRandomServer(availableServer); } private Server getAvailableRandomServer(List<Server> availableServer) { int index = new Random().nextInt(availableServer.size()); return availableServer.get(index); } private List<Server> getAvailableServer(List<Server> serverList) { if(this.excludePorts == null || this.excludePorts.size() == 0){ return serverList; } List<Server> availableServer = new ArrayList(); for (Server server : serverList) { if(!this.excludePorts.contains(server.getPort())){ availableServer.add(server); } } return availableServer; } }
六、Hystrix服務熔斷與服務降級
使用Hystrix主要就是用來做服務熔斷和服務降級的。
其中服務熔斷主要是對於服務雪崩的一個有效解決方案,常見的熔斷方案有 預熔斷 和 即時熔斷 兩種。
而服務降級則是請求發生異常時,對用戶體驗的一種增強。
其是一種開關裝置,在消費端安裝一個Hystix熔斷器,當Hystrix監控到某個服務發生故障后,就會將服務訪問鏈路斷開,不過Hystix並不會將服務的消費者阻塞或拋出異常,而是向消費者返回一個符合逾期的備選響應方案。
通過Hystrix的熔斷和降級功能,避免了服務雪崩的發生,同時也考慮了用戶體驗,因此Hystrix是系統的一種防御機制。
(一)fallbackMethod服務降級
1、導包
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-hystrix</artifactId> <version>2.2.7.RELEASE</version> </dependency>
2、開啟斷路器
開啟斷路器使用@EnableCircuitBreaker注解,但是由於該服務是SpringBoot的消費者,同時需要配置SpringBoot啟動配置注解@SpringBootApplication、服務注冊與發現注解@EnableDiscoveryClient、斷路器注解@EnableCircuitBreaker,比較麻煩,因此Spring提供了一個三合一的注解@SpringCloudApplication
//@SpringBootApplication //@EnableCircuitBreaker //@EnableDiscoveryClient @EnableFeignClients("com.lcl.springcloud.consumer01.service") @SpringCloudApplication public class ConsumerHystrix018080Application { public static void main(String[] args) { SpringApplication.run(ConsumerHystrix018080Application.class, args); } }
3、修改處理器
主要就是在調用方法上添加@HystriCommand注解,並在注解中添加服務降級方法fallBackMethod
@RestController @RequestMapping("/hystrix/depart") public class DepertApi { @Autowired private DepartService departService; @HystrixCommand(fallbackMethod = "failMethod") @GetMapping("/findAll") public List<Depart> findAll(){ return departService.findAll(); } public List<Depart> failMethod(){ Depart arrays[] = {Depart.builder().id(123).name("abc").build(), Depart.builder().id(456).name("def").build()}; return Arrays.asList(arrays); } }
(二)fallBackFactory服務降級
1、創建一個FallBackFactory實現類,泛型使用feign客戶端,然后重新其中的方法
@Component @Slf4j public class DepartFallBackFactory implements FallbackFactory<DepartService> { @Override public DepartService create(Throwable throwable) { return new DepartService() { @Override public boolean save(Depart depart) { log.info("=========執行保存異常服務降級處理========"); return false; } @Override public List<Depart> findAll() { log.info("=========執行查詢異常服務降級處理========"); Depart arrays[] = {Depart.builder().id(0001).name("demo").build(), Depart.builder().id(0002).name("kkk").build()}; return Arrays.asList(arrays); } }; } }
2、修改feign客戶端
修改feign客戶端,使用fallbackFactory參數指定降級工廠
@FeignClient(value="pd", fallbackFactory= DepartFallBackFactory.class) @RequestMapping("/provider/depart") public interface DepartService { @PostMapping("/save") boolean save(Depart depart); @GetMapping("/findAll") List<Depart> findAll(); }
3、開啟feign對於Hystrix的支持
這里踩了一個大坑,一直演示不出來使用feign調用最終降級的效果,最后才發現沒有配置該參數,該參數默認為false。
feign: hystrix: enabled: true
這里說明一點,如果同時存在fallbackFactory和fallbackMethod的情況,fallbackFactory會失效。
(三)fallback服務降級
1、feign客戶端的降級實現類
@Component @Slf4j @RequestMapping("/fallback") public class DepartFallBack implements DepartService { @Override public boolean save(Depart depart) { log.info("=========執行保存異常服務降級處理========"); return false; } @Override public List<Depart> findAll() { Depart arrays[] = {Depart.builder().id(20000).name("DepartFallBack").build(), Depart.builder().id(20001).name("DepartFallBack").build()}; return Arrays.asList(arrays); } }
2、修改feign客戶端,使用fallback引用上面的降級實現類
@FeignClient(name="pd", fallback= DepartFallBack.class) @RequestMapping("/provider/depart") @Service public interface DepartService { @PostMapping("/save") boolean save(Depart depart); @RequestMapping(value = "/findAll", method = RequestMethod.GET) List<Depart> findAll(); }
3、調用
這里說一點,由於第一步中使用了@Component注解,因此Spring會主動加載類,同時,第二步中的feign客戶端也會被Spring加載,如果不加@Service注解,idea就會在@Autowired修飾的departService顯示錯誤信息:有相同的兩個類,但是這並不影響系統運行,不影響的原因,可以自己看一下Spring的原理,這里就不再多說。
為了好看,就在第二步中加了@Service注解,讓Depert3Api中不顯示錯誤信息
@RestController @RequestMapping("/v3/hystrix/depart") public class Depert3Api { @Autowired private DepartService departService; @GetMapping("/findAll") public List<Depart> findAll(){ return departService.findAll(); } }
(四)Hystrix高級屬性配置
1、執行隔離策略
對依賴的請求數量進行限制即被稱為執行隔離,執行隔離有兩大作用,即防止服務熔斷和防止服務雪崩。
隔離請求的方式有兩種,分別是線程隔離和信號量隔離。
線程隔離:線程隔離是Hystrix的默認隔離策略,系統會創建一個依賴線程池,為每個依賴請求分配一個獨立的線程,而每個依賴所擁有的線程數量是有上限的,當對該依賴的調用請求數量達到上限后,如果再有請求,則該請求會被阻塞。所以對某依賴的並發量取決於該依賴所分配的線程數量。
信號量隔離:對依賴的調用所使用的線程仍為請求線程,即不會為依賴請求再創建新的線程,但系統會為每種依賴分配一定數量的信號量,而每個依賴請求分配一個信號號,當對該依賴的調用請求數量達到上限后,如果再有請求,則該請求會被阻塞。所以對某依賴的並發量取決於為該依賴所分配的信號數量。
同樣,配置隔離策略也是有配置文件和直接代碼設置
配置文件:
hystrix: command: default: execution: isolation: # 值可以設置為:thread、semaphore strategy: thread
代碼
@HystrixCommand(fallbackMethod = "failMethod", commandProperties = {@HystrixProperty(name="execution.isolation.strategy",value="SEMAPHORE")}) @GetMapping("/findAll") public List<Depart> findAll(){ return departService.findAll(); }
2、執行隔離其他屬性
線程執行超時時限:在默認的線程執行隔離策略中,關於線程的執行時間,可以為其設置超時時間。在設置超時時間前,需要先開啟超時時間,該屬性默認是開啟的。
超時中斷:當線程執行超時時是否終端線程的執行,默認為true,即超時終端。
取消中斷:在線程執行過程中,若請求取消了,當前執行線程是否會結束,默認為false。
信號量數量:采用信號量隔離策略,則可以通過以下屬性修改信號量的數量,即對某一依賴所允許的請求的最高並發量。
相關配置內容如下代碼所示:
hystrix: command: default: execution: timeout: # 開啟超時控制 默認為true enabled: true isolation: # 設置隔離策略:值可以設置為:thread、semaphore strategy: thread thread: # 設置超時時間,默認為1000ms timeoutInMilliseconds: 1000 # 開啟超時終端,默認為true interruptOnTimeout: true # 取消中斷 默認為false interruptOnCancel: true semaphore: # 設置信號量數量 maxConcurrentRequests: 20
3、服務熔斷屬性
熔斷功能開關:設置當前應用是否開啟熔斷器功能
熔斷器開啟閾值:當在時間窗內收到的請求數量超過該設置的閾值后,將開啟熔斷器,默認值為20.如果開啟熔斷器,則拒絕所有請求。
熔斷時間窗:當熔斷器開啟開啟該屬性設置的時長后,會嘗試關閉熔斷器,以恢復被熔斷的服務,默認值為5000ms
熔斷開啟錯誤率:當請求的錯誤率高於該百分比時,開啟熔斷器,默認為50,即50%
強制開啟斷路器:設置熔斷器無條件開啟,拒絕所有請求,默認為false
強制關閉熔斷器:設置熔斷器無條件關閉,通過所有請求,默認為false
hystrix: command: default: circuitBreaker: # 設置熔斷器開關,默認為true enabled: true # 設置熔斷器開啟閾值,默認為20 requestVolumeThread: 20 # 熔斷時間窗,默認為5000ms sleepWindowInMilliseconds: 5000 # 熔斷器開啟錯誤率 errorThresholdPercentage: 50 # 強制開啟熔斷器,默認為false forceOpen: false # 強制關閉熔斷器,默認為false forceClose: false
(五)Hystrix儀表盤
1、添加監控依賴和Hystrix儀表盤依賴
<!--actuator 依賴--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <!-- hystrix-dashboard 依賴 --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-hystrix-dashboard</artifactId> <version>2.2.7.RELEASE</version> </dependency>
2、開啟actuator的所有web終端
management: endpoint: web: exposure: include: "*"
3、啟動類開啟儀表盤
@SpringCloudApplication @EnableHystrixDashboard @EnableHystrix public class ConsumerHystrixDashbord9000Application { public static void main(String[] args) { SpringApplication.run(ConsumerHystrixDashbord9000Application.class, args); } @Bean public ServletRegistrationBean getServlet(){ HystrixMetricsStreamServlet streamServlet = new HystrixMetricsStreamServlet(); ServletRegistrationBean registrationBean = new ServletRegistrationBean(streamServlet); registrationBean.setLoadOnStartup(1); registrationBean.addUrlMappings("/actuator/hystrix.stream"); registrationBean.setName("HystrixMetricsStreamServlet"); return registrationBean; } }
4、監控簡述
啟動項目,訪問http://localhost:9000/hystrix即可
然后填上http://localhost:9000/actuator/hystrix.stream即可監控,如果監控一直處於loading狀態,可以參照https://www.cnblogs.com/liconglong/p/14698763.html進行解決。關於監控信息,描述如下
5、Turbine
上面說的都是對於單體應用的監控,我們也可以對集群進行整體監控,此時就需要使用Turbine技術了。Turbine能夠匯集監控信息,並將聚合的信息提供給Hystrix Dashboard來集中展示和監控。
使用Turbine對集群進行監控的實現步驟很簡單,只有三步:
(1)導入Turbine依賴
(2)在配置文件中配置Turbine
(3)在啟動類上添加@EnableTurbine注解
(六)服務降級預警
服務降級要盡快的通知相應人員去解決問題,那么這里就應用redis去進行響應的預警操作。
1、導入redis包
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>
2、配置redis
spring: redis: host: 8.131.245.53 port: 6388 password:
3、寫一個預警實現類
@Slf4j @Component public class HystrixAlarm { private final String keySuffix = "_getDepart_fallback"; @Autowired private StringRedisTemplate template; private ForkJoinPool forkJoinPool = new ForkJoinPool(5); public Depart getHystrixHandle(HttpServletRequest request, int id){ String ip = request.getLocalAddr(); String key = ip +keySuffix; alarm(key); return Depart.builder().id(id).name("錯誤").build(); } private void alarm(String key){ BoundValueOperations<String, String> ops = template.boundValueOps(key); String value = ops.get(); if(value == null){ synchronized (this){ value = ops.get(); if(value == null){ sendMsgOrMail(key); value = "已發生服務降級"; ops.set(value, 10, TimeUnit.SECONDS); } } } } private void sendMsgOrMail(String key){ forkJoinPool.submit(()->{ log.info("發送服務異常告警短信或郵件===={}", key); }); } }
4、在降級方法中調用該方法
@HystrixCommand(fallbackMethod = "failMethod2", commandProperties = {@HystrixProperty(name="execution.isolation.strategy",value="SEMAPHORE")}) @GetMapping("/find/{id}") public Depart find(HttpServletRequest request, @PathVariable int id){ return departService.query(id); } public Depart failMethod2(HttpServletRequest request, int id){ return hystrixAlarm.getHystrixHandle(request, id); }
這里可以使用自定義注解、攔截器、切面等方式去統一處理。
七、微服務網關Zuul
Zuul主要提供了對請求的路由和過濾功能。
路由:將外部請求轉發到具體的微服務上,是外部訪問微服務的統一入口。
過濾:對請求的處理過程進行干預,對請求進行校驗、鑒權等處理
官方給出的圖片如下:
對上面的圖進行進一步抽象:
(一)項目搭建
這里需要四個項目來演示,一個Eureka Server、兩個服務提供者、一個Zuul,其中Eureka就直接使用前面的項目,而兩個服務提供者就直接復制原來的項目,就是一個簡單的單體服務,提供了一個 /hystrix/depart/findAll的路徑方法,獲取到對應的數據;對於配置,有以下幾點說明一下(跟zuul沒太大關系,主要是為了下面演示)
eureka: client: service-url: defaultZone: http://localhost:8000/eureka instance: instance-id: c-8080 server: port: 8080
eureka: client: service-url: defaultZone: http://localhost:8000/eureka instance: instance-id: c-8090 server: port: 8090
然后就是搭建Zuul:
1、pom依賴,主要依賴Eureka和Zuul即可(由於在2020.X版本的SpringCloud中,已經砍掉了Zuul,所以這里的版本仍然沿用上面Ribbon中的版本)
<dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-zuul</artifactId> </dependency> </dependencies>
2、開啟Zuul代理模式
@SpringCloudApplication @EnableZuulProxy public class Zuul011000Application { public static void main(String[] args) { SpringApplication.run(Zuul011000Application.class, args); } }
3、配置文件
eureka: client: service-url: defaultZone: http://localhost:8000/eureka spring: application: name: zuul-01-1000 server: port: 1000
到這里,基本環境已經搭建完畢,可以進行驗證,首先訪問原項目的接口,http://localhost:8080/hystrix/depart/findAll 或者 http://localhost:8090/hystrix/depart/findAll,都可以正常訪問,說明原項目沒有問題
那么,我們就可以驗證zuul了。如果沒有配置路由策略,則默認使用注冊到Eureka中的服務名進行路由,首先從上面兩個服務提供者配置的服務名分別為c-8080和c-8090,那么分別訪問http://localhost:1000/c-8090/hystrix/depart/findAll 和 http://localhost:1000/c-8080/hystrix/depart/findAll (這里服務名中大小寫有區分,要嚴格按照配置文件中配置的進行訪問),均可正常訪問,此時,基本環境驗證完畢。
(二)項目路由
1、路由策略配置
其實上面已經是做了基本的項目路由了,因為在我們不配置路由規則時,Zuul就是默認以微服務名稱進行路由的。這樣直接將服務名稱暴露,會有安全問題,所以我們要自己設定路由規則,如下代碼所示,就是將微服務名稱映射為指定的訪問路徑,那么訪問路徑就可以由原來的 http://localhost:1000/c-8090/hystrix/depart/findAll 變為 http://localhost:1000/v2/hystrix/depart/findAll
zuul: routes: c-8080: /v1/** c-8090: /v2/**
2、路由前綴
在配置路由策略時,可以為路由路徑配置一個統一的前綴,以便請求歸類。
如下代碼所示,就可以由上面的 http://localhost:1000/v2/hystrix/depart/findAll 改為 http://localhost:1000/zuulpre/v2/hystrix/depart/findAll 進行調用了。
這里特別說明一點:路由前綴不能是/zuul,如果設置為/zuul,則會導致不能路由的問題,至於為什么不能設置為/zuul,后面有時間可以寫一下其源碼分析。
zuul: routes: c-8080: /v1/** c-8090: /v2/** prefix: /zuulpre
3、服務名屏蔽
上面的配置中,雖然可以使用自己設置的路由規則進行訪問,但是其仍然可以使用默認的微服務名稱訪問,為了防止服務侵入,可以將服務名稱屏蔽。
可以使用 * 屏蔽所有微服務名稱,也可以屏蔽指定微服務名稱。
zuul: # 路由規則 routes: c-8080: /v1/** c-8090: /v2/** # 路由前綴 prefix: /zuulpre # 屏蔽微服務名稱 ignored-services: "*"
4、屏蔽路徑
可以指定屏蔽掉的路徑url,即只要用戶請求中包含指定的url路徑,那么該請求將午發訪問到指定的服務。
通過該方式可以限制用戶的權限。
以下配置,如果路徑中包含list,則不能被訪問
zuul: # 路由規則 routes: c-8080: /v1/** c-8090: /v2/** # 路由前綴 prefix: /zuulpre # 屏蔽微服務名稱 ignored-services: "*" # 屏蔽指定url ignored-patterns: /**/list/**
5、敏感請求頭屏蔽
默認情況下,像Cookie、Set-Cookie等敏感請求頭信息會被Zuul屏蔽掉,我們可以將這些默認屏蔽去掉,同時也可以添加要屏蔽的請求頭。
在Zuul要調用的項目中,增加對於請求頭中內容的輸出,這里與敏感請求頭屏蔽沒有關系,主要是為了驗證
@GetMapping("/findAll2") public List<Depart> findAll(HttpServletRequest request){ log.info("request.getHeader(\"Token\")========={}", request.getHeader("Token")); log.info("request.getHeader(\"Set-Cookie\")========={}", request.getHeader("Set-Cookie")); log.info("request.getHeader(\"aaa\")========={}", request.getHeader("aaa")); log.info("request.getHeader(\"bbb\")========={}", request.getHeader("bbb")); return departService.findAll(); }
調整Zuul的配置文件
zuul: # 路由規則 routes: c-8080: /v1/** c-8090: /v2/** # 路由前綴 prefix: /zuulpre # 屏蔽微服務名稱 ignored-services: "*" # 屏蔽指定url ignored-patterns: /**/list/** # 指定屏蔽請求頭中信息 sensitive-headers: Token,aaa
調用方法輸出結果如下:
2021-04-28 00:28:15.868 INFO 2456 --- [nio-8091-exec-3] c.l.s.consumer01.api.DepertApi : request.getHeader("Token")=========null 2021-04-28 00:28:15.868 INFO 2456 --- [nio-8091-exec-3] c.l.s.consumer01.api.DepertApi : request.getHeader("Set-Cookie")=========Set-Cookie123 2021-04-28 00:28:15.868 INFO 2456 --- [nio-8091-exec-3] c.l.s.consumer01.api.DepertApi : request.getHeader("aaa")=========null 2021-04-28 00:28:15.868 INFO 2456 --- [nio-8091-exec-3] c.l.s.consumer01.api.DepertApi : request.getHeader("bbb")=========bbb123
6、負載均衡
例如同時搭建兩個微服務,微服務名稱一樣(這里要注意,實例名稱如果設置的話,一定不能一樣,否則Eureka會使用后啟動的服務將先啟動的服務覆蓋掉)
eureka: client: service-url: defaultZone: http://localhost:8000/eureka instance: instance-id: c-8090 spring: application: name: c-8090 server: port: 8090
eureka: client: service-url: defaultZone: http://localhost:8000/eureka instance: instance-id: c-8091 spring: application: name: c-8090 server: port: 8091
Zuul的配置文件仍然和之前配置一樣
zuul: # 路由規則 routes: c-8080: /v1/** c-8090: /v2/** # 路由前綴 prefix: /zuulpre # 屏蔽微服務名稱 ignored-services: "*" # 屏蔽指定url ignored-patterns: /**/list/** # 指定屏蔽請求頭中信息 sensitive-headers: Token,aaa
然后就可以了,此處默認使用輪詢負載均衡算法,也可以設置不同的負載均衡算法,設置方式可以參考Ribbon的設置,一摸一樣。
7、服務降級
當消費者調用服務提供者時由於各種原因出現午發調用的情況時,消費者可以進行服務降級,同樣的,通過網關調用消費者無法調用時,也是可以設置降級的。
主要就是配置服務降級的實現類,需要實現FallbackProvider接口,同時需要將該實現類注冊到Spring中。其中getRoute是設置降級的服務,可以設置具體的服務名,也可以設置為 * ,如果設置為 * ,則表示所有的服務都按該降級處理類處理(除了已經有單獨配置的服務,也就是說,單獨配置和 * 同時存在,以單獨配置的為主),然后ClientHttpResponse 中主要是配置該降級方案返回的狀態碼,描述等內容
@Component public class ConsumerFallback implements FallbackProvider { @Override public String getRoute() { return "c-8090"; } @Override public ClientHttpResponse fallbackResponse(String route, Throwable cause) { return new ClientHttpResponse() { @Override public HttpStatus getStatusCode() throws IOException { // 返回狀態常量 503 return HttpStatus.SERVICE_UNAVAILABLE; } @Override public int getRawStatusCode() throws IOException { // 返回狀態常量 503 return HttpStatus.SERVICE_UNAVAILABLE.value(); } @Override public String getStatusText() throws IOException { //返回狀態碼對應短語 return HttpStatus.SERVICE_UNAVAILABLE.getReasonPhrase(); } @Override public void close() { } @Override public InputStream getBody() throws IOException { //設置降級信息 String msg = "發生降級:" + ConsumerFallback.this.getRoute(); return new ByteArrayInputStream(msg.getBytes()); } @Override public HttpHeaders getHeaders() { HttpHeaders headers = new HttpHeaders(); headers.setContentType(MediaType.APPLICATION_JSON); return headers; } }; } }
(三)請求過濾
在服務路由的前、中、后都可以對請求進行過濾,使其只能訪問他應該訪問的資源,從而增強安全性。此時需要通過ZuulFilter過濾器來實現對外服務的安全控制。
例如,對於指定服務的調用,必須有user信息,且user已登錄才可訪問,下面的代碼主要演示對於c-9090服務的調用必須要有user信息,否則不通過,其他服務不校驗。
這個寫法比較簡單,就是需要定義一個RouteFilter類,並繼承ZuulFilter類,重寫其中的方法,方法包括過濾執行位置、過濾執行順序、過濾成功需要處理的邏輯、過濾的邏輯。
@Component @Slf4j public class RouteFilter extends ZuulFilter { @Override public String filterType() { //路由執行位置:指定路由之前執行 return FilterConstants.PRE_TYPE; } @Override public int filterOrder() { // 路由執行順序:系統最小值為-3,設置-5說明在所有的系統執行前執行 return -5; } //對請求進行過濾的核心邏輯 @Override public boolean shouldFilter() { //獲取請求上下文 RequestContext currentContext = RequestContext.getCurrentContext(); //獲取請求 HttpServletRequest request = currentContext.getRequest(); //獲取請求路徑和user信息 String requestURI = request.getRequestURI(); String user = request.getParameter("user"); //校驗邏輯 if(requestURI.contains("/v2") && StringUtils.isBlank(user)){ log.warn("訪問/v2時用戶不能為空"); //指定當前請求未通過Zuul過濾,默認值為true RequestContext.getCurrentContext().setSendZuulResponse(false); //設置返回狀態碼 RequestContext.getCurrentContext().setResponseStatusCode(HttpStatus.SC_UNAUTHORIZED); return false; } return true; } //校驗通過的執行邏輯 @Override public Object run() throws ZuulException { log.info("=====校驗通過====="); return null; } }
(四)令牌桶限流
令牌桶算法就是使用一個令牌桶給請求方法令牌,只有拿到令牌時,請求才可以通過,否則請求就不能通過,那么令牌桶限流就是控制令牌的產生數量和頻率,從而進行限流,最終是為了避免系統遭受雪崩之災。
令牌桶算法中控制令牌的方式有多種,可以使用redis對每分鍾請求的次數做計算,從而進行限流,我們這里使用Guava的RateLimiter來進行限流。
代碼就直接在上面的過濾類中寫了,只增加對限流的處理即可。
//每秒生成2個令牌 private static final RateLimiter RATE_LIMITER = RateLimiter.create(2);//對請求進行過濾的核心邏輯 @Override public boolean shouldFilter() { .....if(!RATE_LIMITER.tryAcquire()){ log.warn("請求超載,每秒只可訪問{}次", RATE_LIMITER.getRate()); RequestContext.getCurrentContext().setSendZuulResponse(false); RequestContext.getCurrentContext().setResponseStatusCode(HttpStatus.SC_CONFLICT); return false; } ..... }
(五)多維請求限流
使用令牌桶的限流粒度比較大,我們可以使用spring-cloud-zuul-ratelimit提供更細粒度的限流策略。
首先,復制上面的項目,在新的項目中首先需要刪除RouteFilter。
然后添加依賴
<!-- spring-cloud-zuul-ratelimit 依賴 --> <dependency> <groupId>com.marcosbarbero.cloud</groupId> <artifactId>spring-cloud-zuul-ratelimit</artifactId> <version>2.0.5.RELEASE</version> </dependency>
修改配置文件,配置信息都是做什么的,下面代碼中的注釋已經寫的很清楚了
zuul: # 路由規則 routes: c-8080: /v1/** c-8090: /v2/** # 對限流策略進行配置 ratelimit: # 開啟限流 enabled: true # 設置限流策略 default-policy: # 限流單位時間窗口 refresh-interval: 3 # 在指定單位窗口內啟動限流的限定值 limit: 3 # 指定限流的時間窗口數量 quota: 1 # 指定限流檢查對象類型 # user:針對用戶的限流,對單位時間窗內經過該網關的用戶數量進行限制 # origin:針對客戶端IP的限流,對單位時間窗內經過該網關的IP數量進行限制 # url:針對請求URL的限流,對單位時間窗內經過該網關的URL數量進行限制 type: user,origin,url
如果想要展示錯誤頁面,可以在resources目錄下創建public.error目錄,然后在里面創建429.html文件,這些路徑和名稱都是必須如此命名的,因為zuul限流成功后默認返回的狀態碼就是429.
(六)灰度發布
介紹:
在一般情況下,升級服務器端應用,需要將應用源碼或程序包上傳到服務器,然后停止掉老版本服務,再啟動新版本。但是這種簡單的發布方式存在兩個問題,一方面,在新版本升級過程中,服務是暫時中斷的,另一方面,如果新版本有BUG,升級失敗,回滾起來也非常麻煩,容易造成更長時間的服務不可用。
為了解決這些問題,人們研究出了多種發布策略。
首先是藍綠發布,所謂藍綠部署,是指同時運行兩個版本的應用,如上圖所示,藍綠部署的時候,並不停止掉老版本,而是直接部署一套新版本,等新版本運行起來后,再將流量切換到新版本上。但是藍綠部署要求在升級過程中,同時運行兩套程序,對硬件的要求就是日常所需的二倍,比如日常運行時,需要10台服務器支撐業務,那么使用藍綠部署,你就需要購置二十台服務器。
然后是滾動發布,所謂滾動升級,就是在升級過程中,並不一下子啟動所有新版本,是先啟動一台新版本,再停止一台老版本,然后再啟動一台新版本,再停止一台老版本,直到升級完成,這樣的話,如果日常需要10台服務器,那么升級過程中也就只需要11台就行了。
滾動發布能夠解決掉藍綠部署時對硬件要求增倍的問題,但是滾動升級有一個問題,在開始滾動升級后,流量會直接流向已經啟動起來的新版本,但是這個時候,新版本是不一定可用的,比如需要進一步的測試才能確認。那么在滾動升級期間,整個系統就處於非常不穩定的狀態,如果發現了問題,也比較難以確定是新版本還是老版本造成的問題。
為了解決這個問題,就出現了灰度發布,灰度發布也叫金絲雀發布,起源是,礦井工人發現,金絲雀對瓦斯氣體很敏感,礦工會在下井之前,先放一只金絲雀到井中,如果金絲雀不叫了,就代表瓦斯濃度高。
在灰度發布開始后,先啟動一個新版本應用,但是並不直接將流量切過來,而是測試人員對新版本進行線上測試,啟動的這個新版本應用,就是我們的金絲雀。如果沒有問題,那么可以將少量的用戶流量導入到新版本上,然后再對新版本做運行狀態觀察,收集各種運行時數據,如果此時對新舊版本做各種數據對比,就是所謂的A/B測試。
當確認新版本運行良好后,再逐步將更多的流量導入到新版本上,在此期間,還可以不斷地調整新舊兩個版本的運行的服務器副本數量,以使得新版本能夠承受越來越大的流量壓力。直到將100%的流量都切換到新版本上,最后關閉剩下的老版本服務,完成灰度發布。
如果在灰度發布過程中(灰度期)發現了新版本有問題,就應該立即將流量切回老版本上,這樣,就會將負面影響控制在最小范圍內。
使用:
生產環境中,可以實現灰度發布的技術很多,我們這里要講的是 zuul 對於灰度發布的實現。而其實現是基於 Eureka 元數據的。Eureka 元數據是指,Eureka 客戶端向 Eureka Server 中注冊時的描述信息。有兩種類型的元數據,分別是標准元數據和自定義元數據。
首先,在兩個consumer上進行配置元數據
eureka: client: service-url: defaultZone: http://localhost:8000/eureka instance: instance-id: c-8090 metadata-map: gray-test: gray ...... server: port: 8090
eureka: client: service-url: defaultZone: http://localhost:8000/eureka instance: instance-id: c-8091 metadata-map: gray-test: running ...... server: port: 8091
然后在zuul中配置依賴
<dependency> <groupId>io.jmnarloch</groupId> <artifactId>ribbon-discovery-filter-spring-cloud-starter</artifactId> <version>2.1.0</version> </dependency>
修改過濾器
@Component @Slf4j public class RouteFilter extends ZuulFilter { ......//對請求進行過濾的核心邏輯 @Override public boolean shouldFilter() { //獲取請求上下文 RequestContext currentContext = RequestContext.getCurrentContext(); //獲取請求 HttpServletRequest request = currentContext.getRequest(); ...... //如果前端傳的test為Y,則認為是測試路徑,請求到gray-test為gray的服務上 String test = request.getHeader("test"); if(StringUtils.isNotBlank(test) && "Y".equals(test)){ RibbonFilterContextHolder.getCurrentContext().add("gray-test","gray"); } ...... return true; } ...... }
可以看到,只是獲取了請求頭中的test值,如果為Y,則請求元數據gray-test的值為gray的服務。
上面的這種寫法是測試時調用到指定的服務上,也可以在測試時修改負載策略,例如下面的代碼,就可以有小流量進度灰度發布的服務,而大流量仍然進入原服務
@Component @Slf4j public class RouteFilter extends ZuulFilter { ......//校驗通過的執行邏輯 @Override public Object run() throws ZuulException { String test = RequestContext.getCurrentContext().getRequest().getHeader("test"); if(StringUtils.isNotBlank(test) && "Y".equals(test)){ int send = (int) (Math.random() * 100); log.info("send==================={}", send); if (send >= 0 && send < 10) { //也就是百分之10的請求轉發到gray-test=gray的服務上去 RibbonFilterContextHolder.getCurrentContext().add("gray-test","gray"); } else { //百分之90的請求轉發到gray-test=running的服務上去 RibbonFilterContextHolder.getCurrentContext().add("gray-test","running"); } } return null; } }
八、分布式配置管理 Spring Cloud Config
Spring Cloud Config 就是對微服務的配置文件進行統一管理的。其工作原理是,我們首 先需要將各個微服務公共的配置信息推送到 GitHub 遠程版本庫。然后我們再定義一個 Spring Cloud Config Server,其會連接上這個 GitHub 遠程庫。這樣我們就可以定義 Config 版的 Eureka Server、提供者與消費者了,它們都將作為 Spring Cloud Config Client 出現,它們都會通過連 接 Spring Cloud Config Server 連接上 GitHub 上的遠程庫,以讀取到指定配置文件中的內容。
原理圖如下所示:
(一)環境搭建
1、創建Config Server
pom文件引入依賴
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-config-server</artifactId> </dependency>
配置文件:主要需要設置git倉庫地址、文件存放路徑、文件分支等信息
server: port: 9999 spring: cloud: config: server: git: # 設置git倉庫地址 uri: git@github.com:menglongdeye/lcl-springcloud.git search-paths: zzz-configfile # git連接超時時間 timeout: 5 # git分支 default-label: main
啟動類添加@EnableConfigServer注解
@SpringBootApplication @EnableConfigServer public class ConfigServer019999Application { public static void main(String[] args) { SpringApplication.run(ConfigServer019999Application.class, args); } }
2、配置host
127.0.0.1 configserver.com
3、創建Config Client
pom文件引入依賴,這里需要說明一點,springcloud2020 版本,Bootstrap被默認禁用,因此需要將org.springframework.cloud:spring-cloud-starter-bootstrap依賴引入到工程中,如果時springcloud2020之前的版本,就不需要引入spring-cloud-starter-bootstrap。
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-config</artifactId> <version>3.0.2</version> </dependency> <!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-starter-bootstrap --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-bootstrap</artifactId> <version>3.0.2</version> </dependency>
新建bootstrap.yml文件,主要配置對應configserver的地址、分支、名稱、環境等信息
spring: application: name: application-provider-config profiles: active: dev cloud: config: uri: http://configserver.com:9999 label: main name: ${spring.application.name} profile: ${spring.profiles.active}
創建API文件,用以驗證配置文件變化
@RestController @RequestMapping("/depart") public class DepartController { @Value("${test.config}") private String testconfig; @GetMapping("/findAll") public String findAll() throws Exception { return testconfig; } }
4、創建配置文件
在config server配置的git項目地址和目錄中配置相關配置文件。
在config server中配置的uri:為git@github.com:menglongdeye/lcl-springcloud.git,目錄為search-paths: zzz-configfile;在config client中配置的文件名稱是name:application-provider-config,環境是profile:dev,因此在zzz-configfile文件夾中配置application-provider-config-dev.yml配置文件,文件內容:
test:
config: provider
5、驗證
驗證config server:http://localhost:9999/application-provider-config-dev.yml,輸出結果為provider,說明config server搭建完成。
驗證config server:http://localhost:8081/depart/findAll,輸出結果為provider,說明config client搭建完成。
6、刷新配置文件
上面的config自動配置有一個問題,就是如果更新了application-provider-config-dev.yml配置文件,只有config server會更新,但是config client不會更新,例如,將test.config的值修改為provider1,http://localhost:9999/application-provider-config-dev.yml,輸出結果為provider1;http://localhost:8081/depart/findAll,輸出結果為provider。
客戶端需要在引用配置文件的類上添加@RefreshScope注解,表示可以刷新配置
@RestController @RequestMapping("/depart") @Slf4j @RefreshScope public class DepartController { @Value("${test.config}") private String testconfig; @GetMapping("/findAll") public String findAll() throws Exception { return testconfig; } }
暴露刷新端口
management: # springboot 1.x配置 security: enabled: false # springboot 2.x配置 endpoints: web: exposure: include: refresh
使用post請求config client,刷新配置:http://localhost:8081/actuator/refresh,然后客戶端配置即更新
(二)Webhooks自動更新
上面的配置有個問題,就是如果更新了配置文件,需要一個個的訪問config client進行刷新,並沒有自動更新,GitHub提供了Webhooks的方式,可以實現自動更新。具體配置內容如下圖所示:
對於配置項說明如下:
Payload URL :觸發后回調的URL,即config client項目的刷新地址
Content type :數據格式,兩種一般使用json
Secret :用作給POST的body加密的字符串。采用HMAC算法
events :觸發的事件列表。
events事件類型描述:
push 倉庫有push時觸發。默認事件
create 當有分支或標簽被創建時觸發
這樣我們就可以利用hook的機制去觸發客戶端的更新,但是當客戶端越來越多的時候hook支持的已經不夠優雅,另外每次增加客戶端都需要改動hook也是不現實的。其實Spring Cloud給了我們更好解決方案,就是使用Spring Cloud Bus來解決。
九、Spring Cloud Bus消息總線
在微服務架構系統中,我們經常會使用輕量級的消息代理來構建一個公用的消息主題讓系統中所有的微服務實例都鏈接上來,由於該主題中產生的消息會被所有實例監聽和消費,所以稱為消息總線。
在總線上的各個實例都可以方便的廣播一些需要讓其他連接在該主題上的實例都需要知道的消息,例如配置信息的變更或者一些其他的一些管理操作。
消息總線可以使用ActiveMQ、Kafka、RabbitMQ、RocketMQ等,這里使用Kafka演示消息總線用於實時更新配置。
(一)環境搭建
1、配置ConfigServer
pom文件
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-config-server</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-bus-kafka</artifactId> </dependency>
配置文件調整:主要是需要配置kafka和開放指定監控端口
server: port: 9999 spring: application: name: springcloud-config-bus-server cloud: config: server: git: # 設置git倉庫地址 uri: git@github.com:menglongdeye/lcl-springcloud.git search-paths: zzz-configfile # git連接超時時間 timeout: 5 # git分支 default-label: main bus: refresh: enabled: true enabled: true trace: enabled: true # 注冊kafka集群 kafka: bootstrap-servers: 192.168.206.131:9092,192.168.206.132:9092,192.168.206.133:9092 consumer: group-id: springcloudConfig eureka: client: service-url: defaultZone: http://localhost:8000/eureka instance: # 服務提供者id instance-id: config-bus-server # 設置當前cleit每1秒向server發送一次心跳 lease-renewal-interval-in-seconds: 1 # 設置超過3秒即認為失效 lease-expiration-duration-in-seconds: 3 management: endpoints: web: exposure: include: bus-refresh
啟動類調整:啟動類添加@EnableConfigServer注解
@SpringBootApplication @EnableConfigServer @EnableEurekaClient public class ConfigServerBus029999Application { public static void main(String[] args) { SpringApplication.run(ConfigServerBus029999Application.class, args); } }
2、配置configclient
pom文件
<!--actuator 依賴--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-config</artifactId> <version>3.0.2</version> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-bootstrap</artifactId> <version>3.0.2</version> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-bus-kafka</artifactId> </dependency>
配置文件:
spring: profiles: active: dev application: name: application-provider-config cloud: config: uri: http://configserver.com:9999 label: main name: ${spring.application.name} profile: ${spring.profiles.active} bus: refresh: enabled: true enabled: true trace: enabled: true # 注冊kafka集群 kafka: bootstrap-servers: 192.168.206.131:9092,192.168.206.132:9092,192.168.206.133:9092 management: endpoints: web: exposure: include: bus-refresh eureka: client: service-url: defaultZone: http://localhost:8000/eureka instance: # 服務提供者id instance-id: provider-bus # 設置當前cleit每1秒向server發送一次心跳 lease-renewal-interval-in-seconds: 1 # 設置超過3秒即認為失效 lease-expiration-duration-in-seconds: 3
處理器類添加刷新支持注解@RefreshScope
@RestController @RequestMapping("/depart") @Slf4j @RefreshScope public class DepartController { @Value("${test.config}") private String testconfig; @GetMapping("/findAll") public String findAll() throws Exception { return testconfig; } }
(二)驗證
配置application-provider-config-dev.yml文件
test:
config: provider
項目啟動后:訪問http://localhost:9999/application-provider-config-dev.yml,顯示結果為
test:
config: provider
訪問http://localhost:8081/depart/findAll,輸出為:provider
更新git中配置文件,將其更新為:provider2
此時訪問http://localhost:9999/application-provider-config-dev.yml,結果更新為provider2
訪問http://localhost:8081/depart/findAll,輸出結果仍為:provider,沒有更新,因為需要手動刷新才可以
調用(post)http://localhost:8081/actuator/refresh刷新后,重新訪問http://localhost:8081/depart/findAll,輸出結果為:provider2,已經更新
十、調用鏈跟蹤 Spring Cloud Sleuth+zipkin
(一)簡述
1、概念
Spring Cloud Sleuth可以實現針對Spring Cloud應用程序的分布式跟蹤,兼容Zipkin、HTrace和基於日志的(如ELK)跟蹤。對於大多數用戶來說,Sleuth是不可見的,並且當前應用與外部系統的所有交互都是自動檢測的,我們可以簡單的在日志中捕獲數據,或者將其發送到遠程收集器中。
Spring Cloud Sleuth中存在跟蹤單元的概念,而跟蹤單元中涉及三個重要概念:trace、span、annotation
trace:跟蹤單元是從客戶端發起的請求抵達被跟蹤系統的邊界開始,到被跟蹤系統向客戶端返回響應為止的過程,這個過程稱為一個trace。
span:每個trace中會調用多個服務,為了記錄調用了哪些服務,以及每次調用所消耗的時間等信息,在每次調用服務時,埋入一個調用記錄,這樣兩個調用記錄之間的區域稱為一個span。一個trace中有一或多個span組成。
annotation:其是用來及時記錄事件的實體,表示一個事件發生的時間點。
2、配置
(1)日志生成
只要在工程中添加了Spring Cloud Sleuth依賴,那么在程序啟動和運行過程中就會自動生成很多日志。Sleuth會為這些日志打上收集標記,需要收集的設置為true,不需要收集的設置為false。這個配置可以通過在代碼中添加自己的日志信息看到。
(2)日志采樣率
Sleuth支持對日志的抽樣收集,即並不是會對所有的日志都收集到服務器,日志收集標記就是起到這個作用,默認的采樣比例是0.1,即10%,在配置文件中可以修改該值,若設置為1,則表示100%收集。
日志采樣默認使用的是水塘抽樣算法。
(二)搭建
搭建三個項目,分別是sleuth-provider-01-8081、sleuth-consumer-01-8080、sleuth-client-01-8082,三個項目的關系是:頁面調用、sleuth-client-01-8082,然后sleuth-client-01-8082通過feign調用sleuth-consumer-01-8080,然后sleuth-consumer-01-8080通過geign調用sleuth-provider-01-8081
1、首先搭建sleuth-provider-01-8081
與之前的provider沒有什么區別。搭建后,添加對Spring Cloud Sleuth的依賴。
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-sleuth</artifactId> </dependency>
Controller添加日志輸出
@RestController @RequestMapping("/depart") @Slf4j public class DepartController { @Autowired private DepartService departService; @GetMapping("/findAll") public List<Depart> findAll() throws Exception { log.info("provider findAll 被調用"); return departService.findAll(); } }
同時需要注釋掉日志的相關配置,否則可能看不到后面的演示效果。
# 配置日志 #logging: # pattern: # console: level-%level %msg%n # level: # root: info # # hibernate輸出日志級別 # org.hibernate: info # # 在show-sql為true時,顯示sql中的動態值 # org.hibernate.type.descriptor.sql.BasicBinder: trace # # 在show-sql為true時,顯示查詢結果 # org.hibernate.hql.internal.ast.exec.BasicExecutor: trace # # 設置自己代碼的日志級別 # com.lcl.springcloud: debug
2、搭建sleuth-consumer-01-8080、sleuth-client-01-8082
然后相同的方式搭建sleuth-consumer-01-8080、sleuth-client-01-8082,區別就是需要調整端口號、項目實例名等內容。
(三)驗證
啟動服務,可以看到sleuth的日志
INFO中的第一個就是該項目的服務實例:spring.application.name的配置
調用http://localhost:8082/depart/findAll時,服務的調用順序是sleuth-client-01-8082、sleuth-consumer-01-8080、sleuth-provider-01-8081,各個項目的輸出結果如下:
通過上面的輸出結果可以發現,INFO中的三個參數,第一個是服務名,第二個是一次調用的唯一id(transid),第三個是當前服務的id(spanid)
十一、Zipkin及Zipkin + Sleuth
zipkin 是 Twitter 開發的一個分布式系統 APM(Application Performance Management,應用性能管理)工具,其是基於 Google Dapper 實現的,用於完成日志的聚合。其與 Sleuth 聯用,可以為用戶提供調用鏈路監控可視化 UI 界面。
(一)Zipkin系統結構
1、服務器組成
Zipkin服務器組成如下圖所示
Zipkin服務器主要由4個核心組件夠構成:
(1)Collection:收集器,它主要用於處理從外部系統發送過來的跟蹤信息,將這些信息轉換為Zipkin內部處理的Span格式,以支持后續的存儲、分析、展示等功能。
(2)Storage:存儲組件,它主要用於處理收集器收到的跟蹤信息,默認會將這些信息存儲到內哦村中,也可以修改存儲策略,例如可以將數據存儲到數據庫中。
(3)API:外部訪問接口組件,外部系統通過這里的API可以實現對系統的監控
(4)UI:用於操作界面組件,基於API組件實現的上層應用。通過UI組件用戶可以方便而又直觀的查詢和分析跟蹤信息。
2、日志的發送方式
在Spring Cloud Sleuth + Zipkin的系統中,客戶端中一旦發生服務間的調用,就會被配置在微服務中的Sleuth的監聽器的監聽,然后生成相應的Trance和Span等日志信息,並發送給zipkin服務端。
發送的方式主要有兩種:通過viaHttp報文的方式;通過消息隊列發送。
(二)Zikpin服務端搭建
三種方法
(1)wget
wget -O zipkin.jar 'https://search.maven.org/remote_content?g=io.zipkin.java&a=zipkin-server&v=LATEST&c=exec'
(2)curl
curl -sSL https://zipkin.io/quickstart.sh | bash -s
(3)源代碼
git clone https://github.com/openzipkin/zipkin cd zipkin # Build the server and also make its dependencies ./mvnw -DskipTests --also-make -pl zipkin-server clean install # Run the server java -jar ./zipkin-server/target/zipkin-server-*exec.jar
我是用的是第一種,啟動zipkin
java -jar zipkin.jar
zipkin默認端口是9411,訪問zipkin:http://ip:9411/zipkin/
(三)via Http 方式收集日志
搭建三個項目,分別是zipkin-provider-01-8081、zipkin-consumer-01-8080、zipkin-client-01-8082,三個項目的關系是:頁面調用、zipkin-client-01-8082,然后zipkin-client-01-8082通過feign調用zipkin-consumer-01-8080,然后zipkin-consumer-01-8080通過geign調用zipkin-provider-01-8081
1、首先搭建zipkin-provider-01-8081
與之前的sleuth-provider-01-8081沒有什么區別。搭建后,添加對Spring Cloud Zipkin的依賴。
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-sleuth</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-sleuth-zipkin</artifactId> </dependency>
然后添加zipkin的配置:zipkin地址和日志收集比例
spring: zipkin: base-url: http://8.131.245.53:9411 sleuth: sampler: probability: 1.0
2、然后同樣的搭建zipkin-consumer-01-8080和zipkin-client-01-8082
3、訪問http://localhost:8082/depart/findAll
4、查看zipkin控制台:http://8.131.245.53:9411/zipkin/
點進去后,可以查看調用鏈和調用時間
(四)消息隊列的方式收集日志
默認情況下,Sleuth是通過將調用日志寫入到via頭部信息中的方式實現鏈路追蹤的,但是在高並發場景下,這種方式的效率會非常低,會影響鏈路信息查看。此時,可以讓Sleuth將其生成的調用日志寫入到MQ中(目前只支持Kafka和RabbitMQ), 讓Zipkin從這些中間件中獲取日志。
直接修改上面使用via Http的三個項目
1、添加kafka依賴
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.7.1</version> </dependency>
2、修改配置文件
spring: zipkin: # base-url: http://8.131.245.53:9411 sender: type: kafka sleuth: sampler: probability: 1.0 kafka: bootstrap-servers: 192.168.206.131:9092,192.168.206.132:9092,192.168.206.133:9092
3、然后使zipkin啟動時連接上kafka
java -DKAFKA_BOOTSTRAP_SERVERS=192.168.206.131:9092,192.168.206.132:9092,192.168.206.133:9092 -jar zipkin.jar
也同樣可以收集調用日志。
(五)將日志收集的信息存儲到數據庫
1、首先創建一個zipkin數據庫,因為zipkin默認使用的數據庫名稱為zipkin,如果使用jar包部署zipkin,則數據庫必須使用該名稱。然后創建表
CREATE TABLE IF NOT EXISTS zipkin_spans ( `trace_id_high` BIGINT NOT NULL DEFAULT 0 COMMENT 'If non zero, this means the trace uses 128 bit traceIds instead of 64 bit', `trace_id` BIGINT NOT NULL, `id` BIGINT NOT NULL, `name` VARCHAR(255) NOT NULL, `parent_id` BIGINT, `debug` BIT(1), `start_ts` BIGINT COMMENT 'Span.timestamp(): epoch micros used for endTs query and to implement TTL', `duration` BIGINT COMMENT 'Span.duration(): micros used for minDuration and maxDuration query' ) ENGINE=InnoDB ROW_FORMAT=COMPRESSED CHARACTER SET=utf8 COLLATE utf8_general_ci; ALTER TABLE zipkin_spans ADD UNIQUE KEY(`trace_id_high`, `trace_id`, `id`) COMMENT 'ignore insert on duplicate'; ALTER TABLE zipkin_spans ADD INDEX(`trace_id_high`, `trace_id`, `id`) COMMENT 'for joining with zipkin_annotations'; ALTER TABLE zipkin_spans ADD INDEX(`trace_id_high`, `trace_id`) COMMENT 'for getTracesByIds'; ALTER TABLE zipkin_spans ADD INDEX(`name`) COMMENT 'for getTraces and getSpanNames'; ALTER TABLE zipkin_spans ADD INDEX(`start_ts`) COMMENT 'for getTraces ordering and range'; CREATE TABLE IF NOT EXISTS zipkin_annotations ( `trace_id_high` BIGINT NOT NULL DEFAULT 0 COMMENT 'If non zero, this means the trace uses 128 bit traceIds instead of 64 bit', `trace_id` BIGINT NOT NULL COMMENT 'coincides with zipkin_spans.trace_id', `span_id` BIGINT NOT NULL COMMENT 'coincides with zipkin_spans.id', `a_key` VARCHAR(255) NOT NULL COMMENT 'BinaryAnnotation.key or Annotation.value if type == -1', `a_value` BLOB COMMENT 'BinaryAnnotation.value(), which must be smaller than 64KB', `a_type` INT NOT NULL COMMENT 'BinaryAnnotation.type() or -1 if Annotation', `a_timestamp` BIGINT COMMENT 'Used to implement TTL; Annotation.timestamp or zipkin_spans.timestamp', `endpoint_ipv4` INT COMMENT 'Null when Binary/Annotation.endpoint is null', `endpoint_ipv6` BINARY(16) COMMENT 'Null when Binary/Annotation.endpoint is null, or no IPv6 address', `endpoint_port` SMALLINT COMMENT 'Null when Binary/Annotation.endpoint is null', `endpoint_service_name` VARCHAR(255) COMMENT 'Null when Binary/Annotation.endpoint is null' ) ENGINE=InnoDB ROW_FORMAT=COMPRESSED CHARACTER SET=utf8 COLLATE utf8_general_ci; ALTER TABLE zipkin_annotations ADD UNIQUE KEY(`trace_id_high`, `trace_id`, `span_id`, `a_key`, `a_timestamp`) COMMENT 'Ignore insert on duplicate'; ALTER TABLE zipkin_annotations ADD INDEX(`trace_id_high`, `trace_id`, `span_id`) COMMENT 'for joining with zipkin_spans'; ALTER TABLE zipkin_annotations ADD INDEX(`trace_id_high`, `trace_id`) COMMENT 'for getTraces/ByIds'; ALTER TABLE zipkin_annotations ADD INDEX(`endpoint_service_name`) COMMENT 'for getTraces and getServiceNames'; ALTER TABLE zipkin_annotations ADD INDEX(`a_type`) COMMENT 'for getTraces'; ALTER TABLE zipkin_annotations ADD INDEX(`a_key`) COMMENT 'for getTraces'; CREATE TABLE IF NOT EXISTS zipkin_dependencies ( `day` DATE NOT NULL, `parent` VARCHAR(255) NOT NULL, `child` VARCHAR(255) NOT NULL, `call_count` BIGINT ) ENGINE=InnoDB ROW_FORMAT=COMPRESSED CHARACTER SET=utf8 COLLATE utf8_general_ci; ALTER TABLE zipkin_dependencies ADD UNIQUE KEY(`day`, `parent`, `child`);
2、啟動zipkin時連接數據庫即可
STORAGE_TYPE=mysql MYSQL_USER=root MYSQL_PASS=root MYSQL_HOST=192.168.1.108 MYSQL_TCP_PORT=3306 java -DKAFKA_BOOTSTRAP_SERVERS=192.168.206.131:9092,192.168.206.132:9092,192.168.206.133:9092 -jar zipkin.jar
十一、消息系統整合框架Spring Cloud Stream
Spring Cloud Stream 是一個用來為微服務應用構建消息驅動能力的框架。通過使用Spring Cloud Stream,可以有效簡化開發人員對於消息中間件的使用復雜度。目前Spring Cloud Stream只支持RabbitMQ和Kafka。
應用程序的核心部分通過Inputs和Outputs管道,與中間件連接,而管道是通過綁定器Binder與中間件相綁定的。
(一)發送者配置
1、發送一個topic
(1)搭建一個項目,引入spring-cloud-stream-binder-kafka依賴
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-kafka</artifactId> </dependency>
(2)配置發送者
@Component @EnableBinding(Source.class) public class SomeProducer { @Autowired @Qualifier(Source.OUTPUT) private MessageChannel channel; public void sendMsg(String message){ channel.send(MessageBuilder.withPayload(message).build()); } }
(3)發送消息
@RestController @RequestMapping("/depart") @Slf4j public class DepartController { @Autowired private SomeProducer producer; @GetMapping("/send") public String send() throws Exception { producer.sendMsg("stream send message============="); return "OK"; } }
(4)配置kafka地址
spring: cloud: stream: kafka: binder: # kafka地址 brokers: 192.168.1.110:9092 # 是否自動創建topic auto-create-topics: true bindings: # 輸出的主題及類型 output: destination: stram-test content-type: text/plain
2、配置多個發送者
(1)配置topic信息
public interface ProducerSource001 { String CHANNEL_NAME = "stram-test-001"; @Output(ProducerSource001.CHANNEL_NAME) MessageChannel output(); }
public interface ProducerSource002 { String CHANNEL_NAME = "stram-test-002"; @Output(ProducerSource002.CHANNEL_NAME) MessageChannel output(); }
(2)配置發送者
@Component @EnableBinding({Source.class, ProducerSource001.class, ProducerSource002.class}) public class SelfProducer { @Autowired @Qualifier(Source.OUTPUT) private MessageChannel channel; @Autowired @Qualifier(ProducerSource001.CHANNEL_NAME) private MessageChannel channel001; @Autowired @Qualifier(ProducerSource002.CHANNEL_NAME) private MessageChannel channel002; public void sendMsg(String message){ Message<String> msg = MessageBuilder.withPayload(message).build(); channel.send(msg); channel001.send(msg); channel002.send(msg); } }
(3)發送消息
@RestController @RequestMapping("/depart") @Slf4j public class DepartController { @Autowired private SelfProducer selfProducer; @GetMapping("/sendAll") public String sendAll() throws Exception { selfProducer.sendMsg("stream send all message============="); return "OK"; } }
(二)配置消費者
Spring Cloud Stream 提供了三種創建消費者的方式,這三種方式的都是在消費者類的“消 費”方法上添加注解。只要有新的消息寫入到了管道,該“消費”方法就會執行。只不過三 種注解,其底層的實現方式不同。即當新消息到來后,觸發“消費”方法去執行的實現方式 不同。
@PostConstruct:以發布/訂閱方式實現
@ServiceActivator:新的消息激活服務方式實現
@StreamListener:以監聽方式實現
1、@PostConstruct
@Component @EnableBinding(Sink.class) @Slf4j public class SomeConsumer { @Autowired @Qualifier(Sink.INPUT) private SubscribableChannel channel; @PostConstruct public void printMsg(){ channel.subscribe(new MessageHandler() { @Override public void handleMessage(Message<?> message) throws MessagingException { log.info("output/input========={}========={}",message.getHeaders(), new String((byte[]) message.getPayload())); } }); } }
2、@ServiceActivator
(1)配置Sinkl
public interface Sink001 { String CHANNEL_NAME = "stram-test-001"; @Input(Sink001.CHANNEL_NAME) SubscribableChannel input(); }
(2)消費
@Component @EnableBinding(Sink001.class) @Slf4j public class SomeConsumer001 { @ServiceActivator(inputChannel = Sink001.CHANNEL_NAME) public void printMsg(Object object){ log.info("stram-test-001========={}",object); } }
3、@StreamListener
(1)配置Sink
public interface Sink002 { String CHANNEL_NAME = "stram-test-002"; @Output(Sink002.CHANNEL_NAME) MessageChannel input(); }
(2)消費
@Component @EnableBinding(Sink002.class) @Slf4j public class SomeConsumer002 { @StreamListener(Sink002.CHANNEL_NAME) public void printMsg(Object object){ log.info("stram-test-002========={}",object); } }