1 Docker Compose
1.1 Run the following command to download the current stable release of Docker Compose:
sudo curl -L "https://github.com/docker/compose/releases/download/1.27.0/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
1.2 Apply executable permissions to the binary:
sudo chmod +x /usr/local/bin/docker-compose
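To verify the installation, check the version; it should report the 1.27.0 release downloaded above:
docker-compose --version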
1.3 Uninstall
If you installed Docker Compose with curl, uninstall it with:
sudo rm /usr/local/bin/docker-compose
2 Single-node Kafka
2.1 Prepare the images: zookeeper and kafka
Manage the containers with docker-compose.
docker-compose-testkafka.yml:
version: "3.3" services: zookeeper: image: zookeeper:3.5.5 restart: always container_name: zookeeper ports: - "2181:2181" expose: - "2181" environment: - ZOO_MY_ID=1 kafka: image: wurstmeister/kafka restart: always container_name: kafka environment: - KAFKA_BROKER_ID=1 - KAFKA_LISTENERS=PLAINTEXT://kafka:9090 - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 - KAFKA_MESSAGE_MAX_BYTES=2000000 ports: - "9090:9090" depends_on: - zookeeper kafka-manager: image: sheepkiller/kafka-manager ## 鏡像:開源的web管理kafka集群的界面 environment: ZK_HOSTS: 192.168.228.128 ## 修改:宿主機IP ports: - "9001:9000" ## 暴露端口
2.2 Add Docker's own subnet to the firewall's trusted zone; otherwise Kafka cannot connect to ZooKeeper:
firewall-cmd --zone=trusted --add-source=172.18.0.1/16 --permanent
firewall-cmd --reload
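After the reload, you can confirm the source was added:
firewall-cmd --zone=trusted --list-sources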
2.3 Start the cluster
docker-compose -f docker-compose-testkafka.yml up -d
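Check that the zookeeper, kafka, and kafka-manager containers are all up:
docker-compose -f docker-compose-testkafka.yml ps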
2.4 View the startup logs:
docker logs -f zookeeper

Stop the cluster:
docker-compose -f docker-compose-testkafka.yml stop
2.5 Manage the Kafka cluster with sheepkiller/kafka-manager. The web UI is reachable at http://192.168.228.128:9001 (port 9001 was mapped to the container's port 9000 above).

Let's look at the broker information registered in ZooKeeper:
docker exec -it zookeeper bash
bin/zkCli.sh
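Inside the zkCli shell, Kafka registers its brokers under /brokers/ids; with the compose file above there should be exactly one broker, with id 1:
ls /brokers/ids
# expected output: [1]
get /brokers/ids/1
# prints the broker's registration JSON (endpoints, host, port, timestamp)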

To exit zkCli, type quit at the prompt: [zk: localhost:2181(CONNECTED) 2] quit
2.6 Enter the Kafka container and run the CLI commands
docker exec -ti kafka bash
# cd /opt/kafka_<version>/bin
cd /opt/kafka_2.13-2.6.0/bin
# create a topic
kafka-topics.sh --create --zookeeper 192.168.228.128:2181 --replication-factor 1 --partitions 1 --topic topicTEST
# list topics
kafka-topics.sh --list --zookeeper 192.168.228.128:2181
# start a console producer
kafka-console-producer.sh --broker-list 192.168.228.128:9090 --topic topicTEST
# start a console consumer (reads from the beginning of the topic)
kafka-console-consumer.sh --bootstrap-server 192.168.228.128:9090 --topic topicTEST --from-beginning
Since Kafka 2.2, kafka-topics.sh also accepts --bootstrap-server 192.168.228.128:9090 in place of the deprecated --zookeeper flag.
2.7 Produce and consume messages
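With the console producer and consumer from 2.6 running in two separate terminals, each line typed into the producer prompt should appear in the consumer, for example:
# producer terminal
> hello kafka
# consumer terminal
hello kafka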


3 Spring Boot integration with Kafka
3.1 Configuration file (application.yml)
spring:
  kafka:
    bootstrap-servers: 192.168.228.128:9090
    listener:
      concurrency: 10
      ack-mode: MANUAL_IMMEDIATE
      poll-timeout: 1500
    consumer:
      group-id: kafka_cluster1
      enable-auto-commit: false
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties: {session.timeout.ms: 6000, auto.offset.reset: earliest}
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      batch-size: 65536
      buffer-memory: 524288
3.2 Producer: ProducerController
@RestController
public class ProducerController {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    @RequestMapping(value = "message/send", method = RequestMethod.POST, produces = { "application/json" })
    public String send(@RequestBody String message) {
        System.out.println(message);
        kafkaTemplate.send("topicTEST", message); // send the message via the Kafka template
        return "success";
    }
}
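With the application running (assuming Spring Boot's default port 8080), the endpoint can be exercised with curl; the message should then show up in the consumer below:
curl -X POST -H "Content-Type: application/json" -d 'hello from curl' http://localhost:8080/message/send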
3.3 Consumer: ConsumerDemo
@Component
public class ConsumerDemo {

    @KafkaListener(topics = "topicTEST")
    public void listen(ConsumerRecord<?, ?> record, Acknowledgment ack) {
        System.out.printf("topic is %s, offset is %d, value is %s \n",
                record.topic(), record.offset(), record.value());
        ack.acknowledge(); // required: ack-mode is MANUAL_IMMEDIATE above, so the offset is only committed here
    }
}
3.4 Three ways to send messages with KafkaTemplate
public void testTemplateSend() {
    // 1. send a message with an explicit timestamp
    kafkaTemplate.send("topicTEST", 0, System.currentTimeMillis(), String.valueOf(0), "send message with timestamp");

    // 2. send a message with a ProducerRecord
    ProducerRecord<String, Object> record = new ProducerRecord<>("topicTEST", "use ProducerRecord to send message");
    kafkaTemplate.send(record);

    // 3. send a message with Spring's Message class
    Map<String, Object> map = new HashMap<>();
    map.put(KafkaHeaders.TOPIC, "topicTEST");
    map.put(KafkaHeaders.PARTITION_ID, 0);
    map.put(KafkaHeaders.MESSAGE_KEY, String.valueOf(0));
    GenericMessage<String> message = new GenericMessage<>("use Message to send message", new MessageHeaders(map));
    kafkaTemplate.send(message);
}
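All three variants return a ListenableFuture<SendResult<K, V>> in the Spring Kafka 2.x line used here, so the outcome of a send can be handled asynchronously. A minimal sketch, reusing the kafkaTemplate field from above:

kafkaTemplate.send("topicTEST", "message with callback").addCallback(
        result -> System.out.println("sent, offset = " + result.getRecordMetadata().offset()),
        ex -> System.err.println("send failed: " + ex.getMessage()));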
