关于微服务的搭建,大家可以参考前面几篇。
今天,搭建一个简单的消息中间件Kafka.
一, zookeeper:
1 下载
http://zookeeper.apache.org/releases.html#download
2 修改配置
将“zoo_sample.cfg”重命名为“zoo.cfg”
设置dataDir为自己的路径
添加admin.serverPort=xxx端口(ZooKeeper 3.5 及以上版本内置的 AdminServer 默认占用 8080 端口,容易与其他服务冲突,因此改为一个空闲端口)
添加系统变量:ZOOKEEPER_HOME=F:\apache-zookeeper-3.6.1-bin;
path系统变量:添加路径:%ZOOKEEPER_HOME%\bin;
3 运行
在命令行中运行 zkServer 启动 ZooKeeper(Windows 下即 %ZOOKEEPER_HOME%\bin\zkServer.cmd;由于 bin 目录已加入 path,可在任意目录执行)
二,kafka
1 下载
http://kafka.apache.org/downloads
2 修改配置
找到config文件夹下的:server.properties
添加:zookeeper.connect=localhost:2181
log.dirs=tmp/kafka-logs
kafka 默认 9092 端口
3 运行
在 Kafka 安装目录下,运行:> bin\windows\kafka-server-start.bat config\server.properties
PS:
在Windows上,启动Zookeeper和Kafka
> bin\windows\zookeeper-server-start.bat config\zookeeper.properties (使用 Kafka 自带的 ZooKeeper,作用等同于上面的 zkServer;两者默认都监听 2181 端口,只需启动其中一个)
> bin\windows\kafka-server-start.bat config\server.properties
三,新建kafka服务
1 新建
如何新建微服务,参考之前的文章。
2 bootstrap.yml(参考 配置中心)
# bootstrap.yml - loaded before application.yml so the config client can
# fetch remote configuration during startup.
spring:
  http:
    # Force UTF-8 for HTTP request/response encoding.
    encoding:
      charset: UTF-8
      enabled: true
      force: true
  cloud:
    config:
      # Config server address; the host can be overridden with the `host` property.
      uri: http://${host:localhost}:9020
      name: config
      # Active config profile; defaults to `dev` unless `active` is set.
      profile: ${active:dev}
3 application.yml (参考 配置中心)
# application.yml - service port, Eureka registration and Kafka stream bindings.
server:
  port: 9900

eureka:
  client:
    service-url:
      # Registry address is supplied externally through `registry.url`.
      defaultZone: ${registry.url}
  instance:
    lease-expiration-duration-in-seconds: 60
    lease-renewal-interval-in-seconds: 30
    preferIpAddress: true
    instanceId: ${spring.cloud.client.ip-address}:${server.port}

spring:
  application:
    name: testkafka
  cloud:
    stream:
      kafka:
        binder:
          # Kafka broker started in the previous section (default port 9092).
          brokers: localhost:9092
      bindings:
        # Matches TestStream.INPUT
        test-in:
          destination: testkafka
          contentType: application/json
        # Matches TestStream.OUTPUT
        test-out:
          destination: testkafka
          contentType: application/json
4 KafkaApplication
/**
 * Entry point of the Kafka demo service.
 *
 * <p>Declared as a Spring Boot application with the Eureka client enabled.
 */
@SpringBootApplication
@EnableEurekaClient
public class KafkaApplication {

    /**
     * Launches the Spring Boot application.
     *
     * @param args command-line arguments, handed to Spring Boot unchanged
     */
    public static void main(String[] args) {
        SpringApplication.run(KafkaApplication.class, args);
    }
}
5 TestStream
package com.test.kafka.controller;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

/**
 * Spring Cloud Stream binding interface declaring one inbound and one
 * outbound channel.
 *
 * <p>The channel names below correspond to the {@code test-in} and
 * {@code test-out} entries under {@code bindings} in application.yml.
 *
 * @author Tyler
 * @date 2020/7/28
 */
public interface TestStream {

    /** Name of the inbound channel. */
    String INPUT = "test-in";

    /** Name of the outbound channel. */
    String OUTPUT = "test-out";

    /**
     * Channel from which incoming messages are consumed.
     *
     * @return the subscribable input channel bound to {@link #INPUT}
     */
    @Input(INPUT)
    SubscribableChannel testIn();

    /**
     * Channel to which outgoing messages are sent.
     *
     * @return the output channel bound to {@link #OUTPUT}
     */
    @Output(OUTPUT)
    MessageChannel testOut();
}
6 StreamReceiver
package com.test.kafka.controller;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Component;

/**
 * Consumer side of the demo: listens on the {@link TestStream#INPUT} channel
 * and prints every received message to standard output.
 *
 * @author Tyler
 * @date 2020/7/28
 */
@Component
@EnableBinding(TestStream.class)
public class StreamReceiver {

    /**
     * Handles one message arriving on the input channel.
     *
     * @param message the message payload as a string
     */
    @StreamListener(TestStream.INPUT)
    public void receive(String message) {
        String line = "StreamReceiver: " + message;
        System.out.println(line);
    }
}
7 SendController
package com.test.kafka.controller; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.integration.support.MessageBuilder; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * @author Tyler * @date 2020/7/29 */ @RestController @RequestMapping("api/") public class SendController { @Autowired private TestStream testStream; @GetMapping("send") public void send() { // System.out.println("Hello World..."); testStream.testOut().send(MessageBuilder.withPayload("Hello World...").build()); } }
8 POM文件
主要:
spring-boot-starter-web spring-cloud-starter-netflix-eureka-client spring-cloud-config-client spring-cloud-starter-stream-kafka
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <groupId>com.test</groupId>
    <artifactId>kafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Hoxton.SR6</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <!-- Config server client -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-config-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- No explicit version: the spring-cloud-dependencies BOM imported in
             dependencyManagement selects the release that matches
             ${spring-cloud.version}. -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>
        <!-- No explicit version: spring-boot-starter-parent manages it, so every
             Spring Framework module stays on the same version. -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-web</artifactId>
            <scope>compile</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
四,运行
1 启动 zookeeper
2 启动 kafka
3 启动注册中心,配置中心
4 启动kafka服务
5 浏览:http://localhost:9900/api/send
6 查看 kafka 服务的控制台,应能看到输出:StreamReceiver: Hello World...