今天,我們將通過Apache Kafka主題構建一些彼此異步通信的微服務。我們使用Micronaut框架,它為與Kafka集成提供專門的庫。讓我們簡要介紹一下示例系統的體系結構。我們有四個微型服務:訂單服務,行程服務,司機服務和乘客服務。這些應用程序的實現非常簡單。它們都有內存存儲,並連接到同一個Kafka實例。
我們系統的主要目標是為客戶安排行程。訂單服務應用程序還充當網關。它接收來自客戶的請求,保存歷史記錄並將事件發送到orders主題。所有其他微服務都在監聽orders這個主題,並處理order-service發送的訂單。每個微服務都有自己的專用主題,其中發送包含更改信息的事件。此類事件由其他一些微服務接收。架構如下圖所示。

在閱讀本文之前,有必要熟悉一下Micronaut框架。您可以閱讀之前的一篇文章,該文章描述了通過REST API構建微服務通信的過程:使用microaut框架構建微服務的快速指南。
1 運行Kafka
要在本地機器上運行Apache Kafka,我們可以使用它的Docker映像。最新的鏡像是由https://hub.docker.com/u/wurstmeister共享的。在啟動Kafka容器之前,我們必須啟動kafka所用使用的ZooKeeper服務器。如果在Windows上運行Docker,其虛擬機的默認地址是192.168.99.100。它還必須設置為Kafka容器的環境。
Zookeeper和Kafka容器都將在同一個網絡中啟動。在docker中運行Zookeeper以zookeeper的名稱提供服務,並在暴露2181端口。Kafka容器需要在環境變量使用KAFKA_ZOOKEEPER_CONNECT的地址。
$ docker network create kafka
$ docker run -d --name zookeeper --network kafka -p 2181:2181 wurstmeister/zookeeper
$ docker run -d --name kafka -p 9092:9092 --network kafka --env KAFKA_ADVERTISED_HOST_NAME=192.168.99.100 --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 wurstmeister/kafka
2 添加micronaut-kafka依賴
使用Kafka構建的microaut應用程序可以在HTTP服務器存在的情況下啟動,也可以在不存在HTTP服務器的情況下啟動。要啟用Micronaut Kafka,需要添加micronaut-kafka庫到依賴項。如果您想暴露HTTP API,您還應該添加micronaut-http-server-netty:
<dependency>
<groupId>io.micronaut.configuration</groupId>
<artifactId>micronaut-kafka</artifactId>
</dependency>
<dependency>
<groupId>io.micronaut</groupId>
<artifactId>micronaut-http-server-netty</artifactId>
</dependency>
3 構建訂單微服務
訂單微服務是唯一一個啟動嵌入式HTTP服務器並暴露REST API的應用程序。這就是為什么我們可以為Kafka提供內置Micronaut健康檢查。要做到這一點,我們首先應該添加micronaut-management依賴:
<dependency>
<groupId>io.micronaut</groupId>
<artifactId>micronaut-management</artifactId>
</dependency>
為了方便起見,我們將通過在application.yml中定義以下配置來啟用所有管理端點並禁用它們的HTTP身份驗證。
endpoints:
all:
enabled: true
sensitive: false
現在,可以在地址欄http://localhost:8080/health下使用health check。我們的示例應用程序還將暴露添加新訂單和列出所有以前創建的訂單的簡單REST API。下面是暴露這些端點的Micronaut控制器實現:
@Controller("orders")
public class OrderController {
@Inject
OrderInMemoryRepository repository;
@Inject
OrderClient client;
@Post
public Order add(@Body Order order) {
order = repository.add(order);
client.send(order);
return order;
}
@Get
public Set<Order> findAll() {
return repository.findAll();
}
}
每個微服務都使用內存存儲庫實現。以下是訂單微服務(Order-Service)中的存儲庫實現:
@Singleton
public class OrderInMemoryRepository {
private Set<Order> orders = new HashSet<>();
public Order add(Order order) {
order.setId((long) (orders.size() + 1));
orders.add(order);
return order;
}
public void update(Order order) {
orders.remove(order);
orders.add(order);
}
public Optional<Order> findByTripIdAndType(Long tripId, OrderType type) {
return orders.stream().filter(order -> order.getTripId().equals(tripId) && order.getType() == type).findAny();
}
public Optional<Order> findNewestByUserIdAndType(Long userId, OrderType type) {
return orders.stream().filter(order -> order.getUserId().equals(userId) && order.getType() == type)
.max(Comparator.comparing(Order::getId));
}
public Set<Order> findAll() {
return orders;
}
}
內存存儲庫存儲Order對象實例。Order對象還被發送到名為orders的Kafka主題。下面是Order類的實現:
public class Order {
private Long id;
private LocalDateTime createdAt;
private OrderType type;
private Long userId;
private Long tripId;
private float currentLocationX;
private float currentLocationY;
private OrderStatus status;
// ... GETTERS AND SETTERS
}
4 使用Kafka異步通信
現在,讓我們想一個可以通過示例系統實現的用例——添加新的行程。
我們創建了OrderType.NEW_TRIP類型的新訂單。在此之后,(1)訂單服務創建一個訂單並將其發送到orders主題。訂單由三個微服務接收:司機服務、乘客服務和行程服務。
(2)所有這些應用程序都處理這個新訂單。乘客服務應用程序檢查乘客帳戶上是否有足夠的資金。如果沒有,它就取消了行程,否則它什么也做不了。司機服務正在尋找最近可用的司機,(3)行程服務創建和存儲新的行程。司機服務和行程服務都將事件發送到它們的主題(drivers, trips),其中包含相關更改的信息。
每一個事件可以被其他microservices訪問,例如,(4)行程服務偵聽來自司機服務的事件,以便為行程分配一個新的司機
下圖說明了在添加新的行程時,我們的微服務之間的通信過程。

現在,讓我們繼續討論實現細節。
4.1 發送訂單
首先,我們需要創建Kafka 客戶端,負責向主題發送消息。我們創建的一個接口,命名為OrderClient,為它添加@KafkaClient並聲明用於發送消息的一個或多個方法。每個方法都應該通過@Topic注解設置目標主題名稱。對於方法參數,我們可以使用三個注解@KafkaKey、@Body或@Header。@KafkaKey用於分區,這是我們的示例應用程序所需要的。在下面可用的客戶端實現中,我們只使用@Body注解。
@KafkaClient
public interface OrderClient {
@Topic("orders")
void send(@Body Order order);
}
4.2 接收訂單
一旦客戶端發送了一個訂單,它就會被監聽orders 主題的所有其他微服務接收。下面是司機服務中的監聽器實現。監聽器類OrderListener應該添加@KafkaListener注解。我們可以聲明groupId作為一個注解參數,以防止單個應用程序的多個實例接收相同的消息。然后,我們聲明用於處理傳入消息的方法。與客戶端方法相同,應該通過@Topic注解設置目標主題名稱,因為我們正在監聽Order對象,所以應該使用@Body注解——與對應的客戶端方法相同。
@KafkaListener(groupId = "driver")
public class OrderListener {
private static final Logger LOGGER = LoggerFactory.getLogger(OrderListener.class);
private DriverService service;
public OrderListener(DriverService service) {
this.service = service;
}
@Topic("orders")
public void receive(@Body Order order) {
LOGGER.info("Received: {}", order);
switch (order.getType()) {
case NEW_TRIP -> service.processNewTripOrder(order);
}
}
}
4.3 發送到其他主題
現在,讓我們看一下司機服務中的processNewTripOrder方法。DriverService注入兩個不同的Kafka Client
bean: OrderClient和DriverClient。當處理新訂單時,它將試圖尋找與發送訂單的乘客最近的司機。找到他之后,將該司機的狀態更改為UNAVAILABLE,並將帶有Driver對象的事件發送到drivers主題。
@Singleton
public class DriverService {
private static final Logger LOGGER = LoggerFactory.getLogger(DriverService.class);
private DriverClient client;
private OrderClient orderClient;
private DriverInMemoryRepository repository;
public DriverService(DriverClient client, OrderClient orderClient, DriverInMemoryRepository repository) {
this.client = client;
this.orderClient = orderClient;
this.repository = repository;
}
public void processNewTripOrder(Order order) {
LOGGER.info("Processing: {}", order);
Optional<Driver> driver = repository.findNearestDriver(order.getCurrentLocationX(), order.getCurrentLocationY());
driver.ifPresent(driverLocal -> {
driverLocal.setStatus(DriverStatus.UNAVAILABLE);
repository.updateDriver(driverLocal);
client.send(driverLocal, String.valueOf(order.getId()));
LOGGER.info("Message sent: {}", driverLocal);
});
}
// ...
}
這是Kafka Client在司機服務中的實現,用於向driver主題發送消息。因為我們需要將Driver與Order 關聯起來,所以我們使用@Header注解 的orderId參數。沒有必要把它包括到Driver類中,將其分配給監聽器端的正確行程。
@KafkaClient
public interface DriverClient {
@Topic("drivers")
void send(@Body Driver driver, @Header("Order-Id") String orderId);
}
4.4 服務間通信
由DriverListener收到@KafkaListener在行程服務中聲明。它監聽傳入到trip主題。接收方法的參數和客戶端發送方法的類似,如下所示:
@KafkaListener(groupId = "trip")
public class DriverListener {
private static final Logger LOGGER = LoggerFactory.getLogger(OrderListener.class);
private TripService service;
public DriverListener(TripService service) {
this.service = service;
}
@Topic("drivers")
public void receive(@Body Driver driver, @Header("Order-Id") String orderId) {
LOGGER.info("Received: driver->{}, header->{}", driver, orderId);
service.processNewDriver(driver);
}
}
最后一步,將orderId查詢到的行程Trip與driverId關聯,這樣整個流程就結束。
@Singleton
public class TripService {
private static final Logger LOGGER = LoggerFactory.getLogger(TripService.class);
private TripInMemoryRepository repository;
private TripClient client;
public TripService(TripInMemoryRepository repository, TripClient client) {
this.repository = repository;
this.client = client;
}
public void processNewDriver(Driver driver, String orderId) {
LOGGER.info("Processing: {}", driver);
Optional<Trip> trip = repository.findByOrderId(Long.valueOf(orderId));
trip.ifPresent(tripLocal -> {
tripLocal.setDriverId(driver.getId());
repository.update(tripLocal);
});
}
// ... OTHER METHODS
}
5 跟蹤
我們可以使用Micronaut Kafka輕松地啟用分布式跟蹤。首先,我們需要啟用和配置Micronaut跟蹤。要做到這一點,首先應該添加一些依賴項:
<dependency>
<groupId>io.micronaut</groupId>
<artifactId>micronaut-tracing</artifactId>
</dependency>
<dependency>
<groupId>io.zipkin.brave</groupId>
<artifactId>brave-instrumentation-http</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>io.zipkin.reporter2</groupId>
<artifactId>zipkin-reporter</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>io.opentracing.brave</groupId>
<artifactId>brave-opentracing</artifactId>
</dependency>
<dependency>
<groupId>io.opentracing.contrib</groupId>
<artifactId>opentracing-kafka-client</artifactId>
<version>0.0.16</version>
<scope>runtime</scope>
</dependency>
我們還需要在application.yml配置文件中,配置Zipkin 的追蹤的地址等
tracing:
zipkin:
enabled: true
http:
url: http://192.168.99.100:9411
sampler:
probability: 1
在啟動應用程序之前,我們必須運行Zipkin容器:
$ docker run -d --name zipkin -p 9411:9411 openzipkin/zipkin
6 總結
在本文中,您將了解通過Apache Kafka使用異步通信構建微服務架構的過程。我已經向大家展示了Microaut Kafka庫最重要的特性,它允許您輕松地聲明Kafka主題的生產者和消費者,為您的微服務啟用健康檢查和分布式跟蹤。我已經為我們的系統描述了一個簡單的場景的實現,包括根據客戶的請求添加一個新的行程。本示例系統的整體實現,請查看GitHub上的源代碼
