linux下kafka安裝


轉載

原文地址:https://www.jianshu.com/p/c74e0ec577b0

kafka依賴zookeeper,zookeeper安裝參見:https://www.cnblogs.com/liyuanhong/articles/12501001.html

jdk安裝

  1. 查看centos自帶的openjdk安裝包:
    rpm -qa | grep openjdk
java-1.7.0-openjdk-headless-1.7.0.111-2.6.7.8.el7.x86_64
java-1.7.0-openjdk-1.7.0.111-2.6.7.8.el7.x86_64
  1. 卸載centos7的openjdk:
    yum -y remove java-1.7.0-openjdk-headless-1.7.0.111-2.6.7.8.el7.x86_64

  2. 下載jdk1.8
    http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html

  3. 解壓jdk安裝包

    tar -xvf jdk-8u201-linux-x64.tar.gz 
    vi /etc/profile
    

    JAVA_HOME=/home/wusong1/software/jdk1.8.0_201
    JRE_HOME=/home/wusong1/software/jdk1.8.0_201/jre
    PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
    CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
    export JAVA_HOME JRE_HOME PATH CLASSPATH

  1. 是profile文件生效
    sourece /etc/profile

  2. 測試Java是否安裝成功
    java -version

配置免密登錄

  1. 進入到.ssh文件夾中,如果沒有就自己創建該文件夾,生成公鑰私鑰
    ssh-keygen -t rsa 對於提示 一路回車

  2. 將生成的公鑰 id_rsa.pub 追加到 authorized_keys文件中,
    cat id_rsa.pub >> authorized_keys
    這里需要注意修改authorized_keys文件的權限:
    chmod 600 authorized_keys

  3. 配置完成后本地測試一下,如果沒有提示輸入密碼則配置成功
    ssh localhost

  4. 對於分布式環境,只需要把對應的公鑰放到目標機器的authorized_keys里面即可免密登錄

我們要注意,.ssh目錄的權限為700,其下文件authorized_keys和私鑰的權限為600。否則會因為權限問題導致無法免密碼登錄。我們可以看到登陸后會有known_hosts文件生成。

Zookeeper安裝

  1. 下載zookeeper: https://mirrors.cnnic.cn/apache/zookeeper/

  2. 解壓: tar xvf zookeeper-3.4.13.tar.gz

  3. 修改配置文件:

    1. dataDir=/opt/data/zookeeper/data
      存儲快照文件的目錄,默認情況下, 事務日志也會存儲在該目
      dataDir錄上。由於事務日志 的寫性能直接影響 ZooKeeper 性能,因此 建議同時配置參數 dataLogDir
      dataLogDir=/opt/data/zookeeper/logs

    2. 集群配直。首先在 3 台機器的/etc/hosts 文件中加入 3 台機器 的 IP 與機器域名映射, 域名自定義, 這里分別命名為 server-I、 server-2、 server-3, 3 台機器 IP與機器域名映射關 系如下:

    10.211.55.4 server-1
    10.211.55.5 server-2
    10.211.55.6 server-3

    1. 進入其中一台機器的Zookeeper安裝路徑conf,添加

    server.1=server-1:2888:3888
    server.2=server-2:2888:3888
    server.3=server-3:2888:3888

    端口號2888表示該服務器與集群中leader交換信息的端口,默認為2888, 3888表示選舉時服務器相互通信的端口。

    1. 接着在${dataDir}路徑下創建一個myid文件,myid存放的值就是服務器的編號,即對應上面的1、2、3。ZooKeeper在啟動時會讀取 myid文件 中的值與 zoo.cfg文件中的配置信息進行比較, 以確定是哪台服務器。

    2. 將配置好的zoo.cfg拷貝到其他兩台機器,並分別創建對應的myid,為了操作方便,我們可以將Zookeeper相關環境變量添加到/etc/profile文件中,如:
      export ZOOKEEPER_HOME=/home/wusong/software/zookeeper-3.4.13
      PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin:$ZOOKEEPER_HOME/bin

    3. 驗證。 zkServer.sh start
      zkServer.sh status
      輸出下面類似結果表示安裝成功

       ```
       ZooKeeper JMX enabled by default Using config: /home/wusong/software/zookeeper-3.4.13/bin/../conf/zoo.cfg 

Mode: follower
```

注意: centos7默認使用firewall作為防火牆,並默認開啟, 在啟動zk時需要關閉防護牆,不然無法通信,查看防火牆狀態: firewall-cmd --state 停止防火牆: systemctl stop firewalld.service 禁止防火牆開機啟動:systemctl disable firewalld.service

kafka安裝

  1. 下載 http://kafka.apache.org/downloads
    當前 Kafka 最新版本為 kafka_2.12-2.1.0.tgz,其中 2.12 代表 Scala 版本, 2.1.0 表示 Kafka 的版本

  2. 解壓安裝 tar xvf kafka_2.12-2.1.0.tgz, 為了操作方便,這里我們對kafka的環境變量進行設置, 在/etc/profile 文件中加入kafka的安裝路徑,

export KAFKA_HOME=/home/wusong/software/kafka_2.12-2.1.0 PATH=\$PATH:\$JAVA_HOME/bin:\$JRE_HOME/bin:\$ZOOKEEPER_HOME/bin:\$KAFKA_HOME/bin 
  1. 修改配置 。修改$KAFKA_HOME/config 目錄下的 server.prope叫es文件,為了便於后 續集群環境搭建的配置, 需要保證同一個集群下 broker.id要唯一,因此這里手動配置 broker.id, 直接保持與ZooKeeper的myid值一致, 同時配置日志存儲路徑。server.properties修改的配置 如下 :
    broker.id=l #指定代理的 id
    log.dirs=/opt/data/kafka- logs #指定 Log 存儲路徑
    zookeeper . connect=server-1:2181 , server - 2:2181 , server-3:2181

在三台機器上分別修改配置文件server.properties, 並修改對應的broker.id.

  1. 啟動。
    kafka-server-start.sh -daemon ../config/server.properties
    執行 jps命令查看 Java進程,此時進程信息至少包括以下幾項:
    15976 Jps
    14999 QuorumPeerMain
    15906 Kafka

  2. 通過 ZooKeeper 客戶端登錄 ZooKeeper 查看目錄結構,執行以下命令:
    zkCli.sh -server server 1:2181 #登錄 ZooKeeper
    ls / #查看 ZooKeeper 目錄結構
    ls /brokers/ ids 輸出 [1, 2, 3]

    由/brokers/ids 節點存儲的元數據可知, 3台機器的 Kafka 均已正常己啟動。至此, Kafka 分布式環境搭建過程介紹完畢。

kafka實戰

  1. 創建一個擁有3個副本的topic:

kafka-topics.sh --create --zookeeper 10.211.55.6:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

kafka-topics.sh --list --zookeeper 10.211.55.5:2181
my-replicated-topic

現在我們搭建了一個集群,怎么知道每個節點的信息呢?運行“"describe topics”命令就可以了:

kafka-topics.sh --describe --zookeeper server-1:2181 --topic my-replicated-topic Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs: Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1 

第一行是對所有分區的一個描述,然后每個分區都會對應一行,因為我們只有一個分區所以下面就只加了一行。
leader:負責處理消息的讀和寫,leader是從所有節點中隨機選擇的.
replicas:列出了所有的副本節點,不管節點是否在服務中.
isr:是正在服務中的節點.

在我們的例子中,節點2是作為leader運行。

修改topic過期時間
默認是7天 修改為3天
kafka-topics.sh --zookeeper server-1:2181 -topic xxxx --alter --config retention.ms=259200000

  1. 往topic發送消息:
    kafka-console-producer.sh --broker-list server-1:9092 --topic my-replicated-topic

  2. 消費這些消息:

kafka-console-consumer.sh --bootstrap-server server-1:9092 --from-beginning --topic my-replicated-topic

  1. 測試一下容錯能力:

broker2作為leader運行,現在kill掉:
jps
22839 QuorumPeerMain
22334 Jps
1999 Kafka
[wusong@centos-linux-2 config]$ kill -9 1999

kafka-topics.sh --describe --zookeeper server-1:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 3 Replicas: 2,3,1 Isr: 3,1

現在leader為3,

雖然最初負責續寫消息的leader down掉了,但之前的消息還是可以消費的:

kafka-console-consumer.sh --bootstrap-server server-1:9092 --from-beginning --topic my-replicated-topic
hello
world
nihao
yes
hello

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM