在Centos6或者7上安裝Kafka最新版


一、官網

  http://kafka.apache.org/downloads.html

二、Kafka簡介

  Kafka是由Apache軟件基金會開發的一個開源流處理平台,由Scala和Java編寫。Kafka是一種高吞吐量的分布式發布訂閱消息系統,它可以處理消費者規模的網站中的所有動作流數據。 這種動作(網頁瀏覽,搜索和其他用戶的行動)是在現代網絡上的許多社會功能的一個關鍵因素。 這些數據通常是由於吞吐量的要求而通過處理日志和日志聚合來解決。 對於像Hadoop的一樣的日志數據和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka的目的是通過Hadoop的並行加載機制來統一線上和離線的消息處理,也是為了通過集群來提供實時的消息。

  類似的組件還有:Azure的ServiceBus、RabbitMQ等,據網上描述,Kafka比RabbitMQ性能強。

三、安裝

  1、安裝Java

yum install java-1.8.0-openjdk.x86_64

  2、配置JAVA環境變量

export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.171-8.b10.el7_5.x86_64/jre
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH

  說明:配置時注意JAVA_HOME后面要加到/jre,這個比較特殊。另外,紅色區域可以換成您對應的安裝版本的路徑。

  3、下載Kafka:http://kafka.apache.org/downloads.html

cd /opt
wget http://mirrors.hust.edu.cn/apache/kafka/1.1.0/kafka_2.12-1.1.0.tgz

  4、解壓並進入目錄

tar -zvxf ./kafka_2.12-1.1.0.tgz
cd kafka_2.12-1.1.0

  5、啟動Zookeeper

  使用安裝包中的腳本啟動單節點Zookeeper 實例:(參數-daemon表示后台運行)

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

  6、啟動Kafka服務

bin/kafka-server-start.sh config/server.properties

  7、創建一個測試的Topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

  查看Topic:

bin/kafka-topics.sh --list --zookeeper localhost:2181

 

  8、產生消息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>hello xingzhu
>hello sindrol

 

  9、消費消息

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
xingzhu
sindrol

  好了,到此,單台Kafka已經安裝完成。

四、集群配置

  1、單機多broker 集群配置

     利用單節點部署多個broker。 不同的broker 設置不同的 id,監聽端口及日志目錄。 例如:

cp config/server.properties config/server-1.properties 

    編輯配置:

config/server-1.properties:
    broker.id=1
    port=9093
    log.dir=/tmp/kafka-logs-1

    啟動Kafka服務:

bin/kafka-server-start.sh config/server-1.properties &

    啟動多個服務后,可以參考第三節內容,產生和消費消息。

 

  2、多機多broker 集群配置

    分別在多個節點按上述方式安裝Kafka,配置啟動多個Zookeeper 實例。 例如: 在10.4.253.22,10.4.253.23,10.4.253.24三台機器部署,Zookeeper配置如下:

initLimit=5
syncLimit=2
server.1=10.4.253.22:2888:3888
server.2=10.4.253.23:2888:3888
server.3=10.4.253.24:2888:3888

    分別配置多個機器上的Kafka服務 設置不同的broke id,zookeeper.connect設置如下:

zookeeper.connect=10.4.253.22:2181,10.4.253.23:2181,10.4.253.24:2181

    啟動Zookeeper與Kafka服務,按上文方式產生和消費消息,驗證集群功能。

 

 

五、外網訪問

  安裝完成並啟動后,如果想要外網通過外網IP訪問,需要在config/service.properites中添加如下修改:

advertised.listeners=PLAINTEXT://xxx.xxx.xxx.xxx:port

  修改完成后,重新啟動Kafka服務。

 

六、C#調用

  引入庫:kafka-net(https://github.com/Jroland/kafka-net)

  1、模擬消費端

using KafkaNet;
using KafkaNet.Model;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace KafkaClientDemo.Consumer
{
    class Program
    {
        static void Main(string[] args)
        {
            var options = new KafkaOptions(new Uri("http://42.159.154.132:9092"));
            var router = new BrokerRouter(options);
            
            var consumer = new KafkaNet.Consumer(new ConsumerOptions("test", router));

            Console.WriteLine("waiting ...");
            //Consume returns a blocking IEnumerable (ie: never ending stream)
            foreach (var message in consumer.Consume())
            {
                Console.WriteLine("Response: P{0},O{1} : {2}", message.Meta.PartitionId, message.Meta.Offset, Encoding.UTF8.GetString(message.Value, 0, message.Value.Length));
            }
        }
    }
}

  2、模擬消息生產端

using KafkaNet;
using KafkaNet.Model;
using KafkaNet.Protocol;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace KafkaClientDemo
{
    class Program
    {
        static void Main(string[] args)
        {
            var options = new KafkaOptions(new Uri("http://42.159.154.132:9092"));
            var router = new BrokerRouter(options);
            var client = new Producer(router);
            //var topic = client.GetTopic("test");

            using (client)
            {
                while (true)
                {
                    Console.Write(">");
                    var text = Console.ReadLine();
                    client.SendMessageAsync("testTopic", new[] { new Message(text, "key_" + DateTime.Now.Ticks) }).Wait();
                  
                    if (text == "exit")
                        break;
                }
            }
        }
    }
}

 

  運行效果:

 

  

  我在部署和學習時,參考了https://www.mtyun.com/library/how-to-install-kafka-on-centos7文章,非常感謝原作者分享。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM