1. Kafka Eagle
1.1 Installing Eagle
1) Modify the Kafka startup command
Edit the kafka-server-start.sh script:
[hadoop@hadoop102 kafka]$ vim bin/kafka-server-start.sh
...
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi

# change to

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
    export JMX_PORT="9999"
    #export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi
Note: after making this change, distribute the script to the other nodes before starting Kafka.
[hadoop@hadoop102 kafka]$ cat /usr/local/bin/xsync
#!/bin/bash
#1 Get the number of arguments; exit if there are none
pcount=$#
if((pcount==0)); then
    echo no args;
    exit;
fi

#2 Get the file name
p1=$1
fname=`basename $p1`
echo fname=$fname

#3 Get the absolute path of the parent directory
pdir=`cd -P $(dirname $p1); pwd`
echo pdir=$pdir

#4 Get the current user name
user=`whoami`

#5 Loop over the target hosts
for((host=103; host<105; host++)); do
    echo ------------------- hadoop$host --------------
    rsync -rvl $pdir/$fname $user@hadoop$host:$pdir
done

[hadoop@hadoop102 kafka]$ xsync bin/kafka-server-start.sh
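Exporting JMX_PORT="9999" makes each broker publish its metrics over JMX, which is the channel a monitoring tool reads them from. As a rough sketch, the conventional JMX-over-RMI service URL for each broker can be built and syntax-checked with the JDK alone. The class name JmxUrl and its build method are hypothetical helpers for illustration; the host names and port are the ones used in this walkthrough.

```java
import javax.management.remote.JMXServiceURL;
import java.net.MalformedURLException;

public class JmxUrl {
    // Builds the conventional JMX-over-RMI service URL for a host and port.
    public static String build(String host, int port) {
        return "service:jmx:rmi:///jndi/rmi://" + host + ":" + port + "/jmxrmi";
    }

    public static void main(String[] args) throws MalformedURLException {
        // Host names assumed from the three-node cluster used in this post.
        String[] hosts = {"hadoop102", "hadoop103", "hadoop104"};
        for (String host : hosts) {
            // The constructor only validates the URL syntax; it does not connect.
            JMXServiceURL url = new JMXServiceURL(build(host, 9999));
            System.out.println(url);
        }
    }
}
```

The URL is only parsed here, not opened, so the sketch can be run before the brokers are restarted.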
2) Upload the archive kafka-eagle-bin-1.3.7.tar.gz to the cluster and extract it
[hadoop@hadoop102 software]$ ll kafka-eagle-bin-1.3.7.tar.gz
-rw-r--r-- 1 hadoop hadoop 84934270 Aug 24 2019 kafka-eagle-bin-1.3.7.tar.gz
[hadoop@hadoop102 software]$ tar xf kafka-eagle-bin-1.3.7.tar.gz
[hadoop@hadoop102 software]$ cd kafka-eagle-bin-1.3.7/
[hadoop@hadoop102 kafka-eagle-bin-1.3.7]$ ls
kafka-eagle-web-1.3.7-bin.tar.gz
[hadoop@hadoop102 kafka-eagle-bin-1.3.7]$ tar xf kafka-eagle-web-1.3.7-bin.tar.gz -C /opt/module/
[hadoop@hadoop102 kafka-eagle-bin-1.3.7]$ mv /opt/module/kafka-eagle-web-1.3.7/ /opt/module/eagle
3) Make the startup script executable
[hadoop@hadoop102 eagle]$ ls
bin conf db font kms logs
[hadoop@hadoop102 eagle]$ chmod 777 bin/ke.sh
[hadoop@hadoop102 eagle]$ ll bin/
total 12
-rw-r--r-- 1 hadoop hadoop 1848 Aug 22 2017 ke.bat
-rwxrwxrwx 1 hadoop hadoop 7190 Jul 30 2019 ke.sh
4) Set the environment variables
[hadoop@hadoop102 eagle]$ sudo vim /etc/profile
#eagle
export KE_HOME=/opt/module/eagle
export PATH=$PATH:$KE_HOME/bin

[hadoop@hadoop102 eagle]$ source /etc/profile

# Distribute /etc/profile; it must be identical on all three nodes
5) Edit the configuration file system-config.properties
######################################
# multi zookeeper&kafka cluster list
######################################
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=hadoop102:2181,hadoop103:2181,hadoop104:2181

######################################
# kafka offset storage
######################################
cluster1.kafka.eagle.offset.storage=kafka

######################################
# enable kafka metrics
######################################
kafka.eagle.metrics.charts=true
kafka.eagle.sql.fix.error=false

######################################
# kafka jdbc driver address
######################################
kafka.eagle.driver=com.mysql.jdbc.Driver
kafka.eagle.url=jdbc:mysql://hadoop102:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
kafka.eagle.username=root
kafka.eagle.password=root
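A typo in this file tends to surface only when Eagle starts, so a quick sanity check beforehand can save a restart. The sketch below loads a properties fragment with java.util.Properties and verifies that every alias listed in kafka.eagle.zk.cluster.alias has a matching <alias>.zk.list entry made of host:port items. ConfigCheck is a hypothetical helper written for this post, not part of Eagle, and it checks only these two keys.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class ConfigCheck {
    // Returns a list of problems; an empty list means the fragment looks consistent.
    public static List<String> check(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<String> problems = new ArrayList<>();
        String aliases = props.getProperty("kafka.eagle.zk.cluster.alias", "").trim();
        if (aliases.isEmpty()) {
            problems.add("kafka.eagle.zk.cluster.alias is missing or empty");
            return problems;
        }
        for (String rawAlias : aliases.split(",")) {
            String alias = rawAlias.trim();
            String zkList = props.getProperty(alias + ".zk.list");
            if (zkList == null || zkList.trim().isEmpty()) {
                problems.add(alias + ".zk.list is missing or empty");
                continue;
            }
            // Each entry must look like host:port, for example hadoop102:2181.
            for (String entry : zkList.split(",")) {
                if (!entry.trim().matches("[^:\\s]+:\\d+")) {
                    problems.add("not a host:port entry: " + entry.trim());
                }
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        String sample = "kafka.eagle.zk.cluster.alias=cluster1\n"
                + "cluster1.zk.list=hadoop102:2181,hadoop103:2181,hadoop104:2181\n";
        System.out.println("problems: " + check(sample));
    }
}
```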
6) Install MySQL with Docker
Install Docker:
curl -o /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-7.repo
wget -O /etc/yum.repos.d/docker-ce.repo https://mirrors.ustc.edu.cn/docker-ce/linux/centos/docker-ce.repo
sed -i 's#download.docker.com#mirrors.tuna.tsinghua.edu.cn/docker-ce#g' /etc/yum.repos.d/docker-ce.repo
yum install docker-ce lrzsz -y
docker version
mkdir -p /etc/docker/ && touch /etc/docker/daemon.json
cat >/etc/docker/daemon.json <<EOF
{
  "registry-mirrors": ["https://t09ww3n9.mirror.aliyuncs.com"]
}
EOF
systemctl start docker
systemctl enable docker
Install MySQL:
sudo docker run -p 3306:3306 --name mysql \
    -v /mydata/mysql/log:/var/log/mysql \
    -v /mydata/mysql/data:/var/lib/mysql \
    -v /mydata/mysql/conf:/etc/mysql \
    -e MYSQL_ROOT_PASSWORD=root \
    -d mysql:5.7
Create the database:
mysql> create database ke;
7) Start Eagle
Note: ZooKeeper and Kafka must both be running before Eagle is started.
[hadoop@hadoop102 eagle]$ bin/ke.sh start
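Because Eagle depends on ZooKeeper and Kafka already being up, it can help to confirm that their ports accept connections before running ke.sh start. The following is a minimal sketch using plain TCP connects. PortProbe is a hypothetical helper; the ports are the ones that appear in this post (2181 for ZooKeeper, 9092 for Kafka, 9999 for JMX), and it assumes a broker runs on each of the three hosts.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortProbe {
    // Returns true if a TCP connection to host:port succeeds within the timeout.
    public static boolean isOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers refused connections, timeouts, and unresolvable host names.
            return false;
        }
    }

    public static void main(String[] args) {
        String[] hosts = {"hadoop102", "hadoop103", "hadoop104"};
        int[] ports = {2181, 9092, 9999}; // ZooKeeper, Kafka, JMX
        for (String host : hosts) {
            for (int port : ports) {
                String state = isOpen(host, port, 1000) ? "open" : "closed";
                System.out.println(host + ":" + port + " " + state);
            }
        }
    }
}
```

An open port only shows that something is listening; it does not prove the service behind it is healthy.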
1.2 Basic Eagle usage
1) Start a consumer
package com.dianchou.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;
import java.util.Properties;

/**
 * @author lawrence
 * @create 2021-02-05
 */
public class MyConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // Cluster to connect to
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        // Enable auto commit
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        // Auto-commit interval
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        // Key and value deserializers
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // Consumer group
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group-test");

        // Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        // Subscribe to the topic (it has the same name as the consumer group)
        consumer.subscribe(Collections.singletonList("group-test"));

        // Fetch records
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(100);
            // Parse and print each record
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        consumerRecord.offset(), consumerRecord.key(), consumerRecord.value());
            }
        }
    }
}
2) View the topic and the consumer in Eagle
3) Start a producer
package com.dianchou.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * @author lawrence
 * @create 2021-02-04
 */
public class MyProducer {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Configuration for the Kafka producer
        Properties properties = new Properties();
        // Cluster to connect to
        //properties.put("bootstrap.servers","hadoop102:9092");
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        // Acknowledgement level
        //properties.put("acks", "all");
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        // Number of retries
        properties.put("retries", 3);
        // Batch size: 16 KB
        properties.put("batch.size", 16384);
        // Linger time: 1 ms
        properties.put("linger.ms", 1);
        // RecordAccumulator buffer size: 32 MB
        properties.put("buffer.memory", 33554432);
        // Key serializer
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Value serializer
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Create the producer from the configuration above
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        for (int i = 0; i < 10000; i++) {
            // No callback; get() blocks until the send for this record completes
            producer.send(new ProducerRecord<String, String>("group-test", Integer.toString(i), Integer.toString(i))).get();
        }
        producer.close();
    }
}
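The raw byte values passed to batch.size and buffer.memory are easier to read when derived from their units. The sketch below only redoes that arithmetic and estimates how many full-size batches fit in the buffer; ProducerSizing is a hypothetical helper, and the estimate is a rough upper bound that ignores per-record overhead and compression.

```java
public class ProducerSizing {
    // batch.size from the producer above: 16 KB
    public static final int BATCH_SIZE_BYTES = 16 * 1024;
    // buffer.memory from the producer above: 32 MB
    public static final long BUFFER_MEMORY_BYTES = 32L * 1024 * 1024;

    // Rough upper bound on how many full-size batches the buffer can hold.
    public static long maxBufferedBatches(long bufferBytes, int batchBytes) {
        return bufferBytes / batchBytes;
    }

    public static void main(String[] args) {
        System.out.println("batch.size    = " + BATCH_SIZE_BYTES);
        System.out.println("buffer.memory = " + BUFFER_MEMORY_BYTES);
        System.out.println("full batches  = "
                + maxBufferedBatches(BUFFER_MEMORY_BYTES, BATCH_SIZE_BYTES));
    }
}
```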
4) View the results
5) Broker monitoring
6) Kafka monitoring
7) ZooKeeper
8) Topic