ELK+Kafka集群日志分析系統


ELK+Kafka集群分析系統部署

因為是自己本地寫好的word文檔復制進來的。格式有些出入還望體諒。如有錯誤請回復。謝謝!

一、 系統介紹 2

二、 版本說明 3

三、 服務部署 3

1) JDK部署 3

2) Elasticsearch集群部署及優化 3

3) Elasticsearch健康插件安裝 13

4) Shieldelasticsearch安全插件 15

5)Zookeeper集群搭建 15

6)Kafka集群搭建 17

7)測試KafkaZookeeper集群連通性 19

8) Logstash部署 20

9) Kibana部署 20

四、 系統使用示例 22

1) Logstash 作為kafka生產者示例 22

2) Logstash index 消費kafka示例 23

 

一、系統介紹

隨着實時分析技術的發展及成本的降低,用戶已經不僅僅滿足於離線分析。下面來介紹一下架構

 

 

 

這是一個再常見不過的架構了:

(1)Kafka:接收用戶日志的消息隊列

(2)Logstash:做日志解析,統一成json輸出給Elasticsearch

(3)Elasticsearch:實時日志分析服務的核心技術,一個schemaless,實時的數據存儲服務,通過index組織數據,兼具強大的搜索和統計功能。

(4)Kibana:基於Elasticsearch的數據可視化組件,超強的數據可視化能力是眾多公司選擇ELK stack的重要原因。

(5)Zookeeper: 狀態管理,監控進程等服務

 

二、版本說明

本次系統為:Centos6.5 64

Java版本為:1.8.0_74

Elasticsearch為:2.4.0

Logstash :2.4.0

Kibana:4.6.1

Shield:2.0+

Kafka:2.10-0.10.0.1

Zookeeper:3.4.9

 

相應的版本最好下載對應的插件。

 

 

三、服務部署

1) JDK部署

下載JDK包到/data目錄下解壓,並將變量導入/etc/profile末尾。

export JAVA_HOME=/usr/local/jdk

export PATH=$JAVA_HOME/bin:$PATH

export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar

 

2) Elasticsearch集群部署及優化

下載並安裝es rpm包:

Rpm -ivh https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/rpm/elasticsearch/2.4.0/elasticsearch-2.4.0.rpm

啟動方式:

/etc/init.d/elasticsearch start|stop

Elasticsearch bin文件內存優化:

由於是rpm 方式安裝將/usr/share/Elasticsearch/bin/Elasticsearch 加入如下參數:

ES_HEAP_SIZE=16g     #ES_HEAP_SIZE表示JVM參數的-Xms and -Xmx設置

MAX_OPEN_FILES=65535

Elasticsearch 配置文件優化以及說明:

我們需要配置如下:

cluster.name: es-ajb-cluster

node.name: es-node-1

node.master: true

node.data: true

index.number_of_shards: 8

index.number_of_replicas: 1

path.data: /data/es

path.logs: /data/eslogs

bootstrap.mlockall: true

network.host: 0.0.0.0

http.port: 9200

discovery.zen.ping.unicast.hosts: ["172.16.38.133", "172.16.38.134","172.16.38.135"]

以下為更詳細yml配置文件參數解釋:

##################### Elasticsearch Configuration Example ################

# 我只是挑些重要的配置選項進行注釋,其實自帶的已經有非常細致的英文注釋了.有理解偏差的地方請以英文原版解釋為准.

################################### Cluster#############################

# 代表一個集群,集群中有多個節點,其中有一個為主節點,這個主節點是可以通過選舉產生的,主從節點是對於集群內部來說的.

# es的一個概念就是去中心化,字面上理解就是無中心節點,這是對於集群外部來說的,因為從外部來看es集群,在邏輯上是個整體,你與任何一個節點的通信和與整個es集群通信是等價的。

# cluster.name可以確定你的集群名稱,當你的elasticsearch集群在同一個網段中elasticsearch會自動的找到具有相同cluster.name的elasticsearch服務.

# 所以當同一個網段具有多個elasticsearch集群時cluster.name就成為同一個集群的標識.

#cluster.name: elasticsearch

#################################### Node ##############################

# 節點名稱同理,可自動生成也可手動配置.

#node.name: "Franz Kafka"

# 允許一個節點是否可以成為一個master節點,es是默認集群中的第一台機器為master,如果這台機器停止就會重新選舉master.

#node.master: true

# 允許該節點存儲數據(默認開啟)

#node.data: true

# 配置文件中給出了三種配置高性能集群拓撲結構的模式,如下:

# 1. 如果你想讓節點從不選舉為主節點,只用來存儲數據,可作為負載器

# node.master: false

# node.data: true

#

# 2. 如果想讓節點成為主節點,且不存儲任何數據,並保有空閑資源,可作為協調器

# node.master: true

# node.data: false

#

# 3. 如果想讓節點既不稱為主節點,又不成為數據節點,那么可將他作為搜索器,從節點中獲取數據,生成搜索結果等

# node.master: false

# node.data: false

# 監控集群狀態有一下插件和API可以使用:

# Use the Cluster Health API [http://localhost:9200/_cluster/health], the

# Node Info API [http://localhost:9200/_nodes] or GUI tools

# such as <http://www.elasticsearch.org/overview/marvel/>,

# <http://github.com/karmi/elasticsearch-paramedic>,

# <http://github.com/lukas-vlcek/bigdesk> and

# <http://mobz.github.com/elasticsearch-head> to inspect the cluster state.

# A node can have generic attributes associated with it, which can later be used

# for customized shard allocation filtering, or allocation awareness. An attribute

# is a simple key value pair, similar to node.key: value, here is an example:

#

#node.rack: rack314

# By default, multiple nodes are allowed to start from the same installation location

# to disable it, set the following:

#node.max_local_storage_nodes: 1

 

#################################### Index #############################

# 設置索引的分片數,默認為5

#index.number_of_shards: 5

# 設置索引的副本數,默認為1:

#index.number_of_replicas: 1

# 配置文件中提到的最佳實踐是,如果服務器夠多,可以將分片提高,盡量將數據平均分布到大集群中去

# 同時,如果增加副本數量可以有效的提高搜索性能

# 需要注意的是,"number_of_shards" 是索引創建后一次生成的,后續不可更改設置

# "number_of_replicas" 是可以通過API去實時修改設置的

#################################### Paths ####################################

# 配置文件存儲位置

#path.conf: /path/to/conf

# 數據存儲位置(單個目錄設置)

#path.data: /path/to/data

# 多個數據存儲位置,有利於性能提升

#path.data: /path/to/data1,/path/to/data2

# 臨時文件的路徑

#path.work: /path/to/work

# 日志文件的路徑

#path.logs: /path/to/logs

# 插件安裝路徑

#path.plugins: /path/to/plugins

#################################### Plugin ############################

# 設置插件作為啟動條件,如果一下插件沒有安裝,則該節點服務不會啟動

#plugin.mandatory: mapper-attachments,lang-groovy

################################### Memory ##############################

# 當JVM開始寫入交換空間時(swapping)ElasticSearch性能會低下,你應該保證它不會寫入交換空間

# 設置這個屬性為true來鎖定內存,同時也要允許elasticsearch的進程可以鎖住內存,linux下可以通過 `ulimit -l unlimited` 命令

#bootstrap.mlockall: true

# 確保 ES_MIN_MEM 和 ES_MAX_MEM 環境變量設置為相同的值,以及機器有足夠的內存分配給Elasticsearch

# 注意:內存也不是越大越好,一般64位機器,最大分配內存別才超過32G

############################## Network And HTTP #######################

# 設置綁定的ip地址,可以是ipv4或ipv6的,默認為0.0.0.0

#network.bind_host: 192.168.0.1

# 設置其它節點和該節點交互的ip地址,如果不設置它會自動設置,值必須是個真實的ip地址

#network.publish_host: 192.168.0.1

# 同時設置bind_host和publish_host上面兩個參數

#network.host: 192.168.0.1

# 設置節點間交互的tcp端口,默認是9300

#transport.tcp.port: 9300

 

# 設置是否壓縮tcp傳輸時的數據,默認為false,不壓縮

#transport.tcp.compress: true

# 設置對外服務的http端口,默認為9200

#http.port: 9200

# 設置請求內容的最大容量,默認100mb

#http.max_content_length: 100mb

# 使用http協議對外提供服務,默認為true,開啟

#http.enabled: false

################################### Gateway #############################

# gateway的類型,默認為local即為本地文件系統,可以設置為本地文件系統

#gateway.type: local

# 下面的配置控制怎樣以及何時啟動一整個集群重啟的初始化恢復過程

# (當使用shard gateway時,是為了盡可能的重用local data(本地數據))

# 一個集群中的N個節點啟動后,才允許進行恢復處理

#gateway.recover_after_nodes: 1

# 設置初始化恢復過程的超時時間,超時時間從上一個配置中配置的N個節點啟動后算起

#gateway.recover_after_time: 5m

# 設置這個集群中期望有多少個節點.一旦這N個節點啟動(並且recover_after_nodes也符合),

# 立即開始恢復過程(不等待recover_after_time超時)

#gateway.expected_nodes: 2

############################# Recovery Throttling #######################

# 下面這些配置允許在初始化恢復,副本分配,再平衡,或者添加和刪除節點時控制節點間的分片分配

# 設置一個節點的並行恢復數

# 1.初始化數據恢復時,並發恢復線程的個數,默認為4

#cluster.routing.allocation.node_initial_primaries_recoveries: 4

#

# 2.添加刪除節點或負載均衡時並發恢復線程的個數,默認為2

#cluster.routing.allocation.node_concurrent_recoveries: 2

# 設置恢復時的吞吐量(例如:100mb,默認為0無限制.如果機器還有其他業務在跑的話還是限制一下的好)

#indices.recovery.max_bytes_per_sec: 20mb

 

# 設置來限制從其它分片恢復數據時最大同時打開並發流的個數,默認為5

#indices.recovery.concurrent_streams: 5

# 注意: 合理的設置以上參數能有效的提高集群節點的數據恢復以及初始化速度

################################## Discovery ##########################

# 設置這個參數來保證集群中的節點可以知道其它N個有master資格的節點.默認為1,對於大的集群來說,可以設置大一點的值(2-4)

#discovery.zen.minimum_master_nodes: 1

# 探查的超時時間,默認3秒,提高一點以應對網絡不好的時候,防止腦裂

#discovery.zen.ping.timeout: 3s

# For more information, see

# <http://elasticsearch.org/guide/en/elasticsearch/reference/current/modules-discovery-zen.html>

# 設置是否打開多播發現節點.默認是true.

# 當多播不可用或者集群跨網段的時候集群通信還是用單播吧

#discovery.zen.ping.multicast.enabled: false

# 這是一個集群中的主節點的初始列表,當節點(主節點或者數據節點)啟動時使用這個列表進行探測

#discovery.zen.ping.unicast.hosts: ["host1", "host2:port"]

# Slow Log部分與GC log部分略,不過可以通過相關日志優化搜索查詢速度

############## Memory(重點需要調優的部分) ################

# Cache部分:

# es有很多種方式來緩存其內部與索引有關的數據.其中包括filter cache

# filter cache部分:

# filter cache是用來緩存filters的結果的.默認的cache type是node type.node type的機制是所有的索引內部的分片共享filter cache.node type采用的方式是LRU方式.即:當緩存達到了某個臨界值之后,es會將最近沒有使用的數據清除出filter cache.使讓新的數據進入es.

# 這個臨界值的設置方法如下:indices.cache.filter.size 值類型:eg.:512mb 20%。默認的值是10%。

# out of memory錯誤避免過於頻繁的查詢時集群假死

# 1.設置es的緩存類型為Soft Reference,它的主要特點是據有較強的引用功能.只有當內存不夠的時候,才進行回收這類內存,因此在內存足夠的時候,它們通常不被回收.另外,這些引用對象還能保證在Java拋出OutOfMemory異常之前,被設置為null.它可以用於實現一些常用圖片的緩存,實現Cache的功能,保證最大限度的使用內存而不引起OutOfMemory.在es的配置文件加上index.cache.field.type: soft即可.

# 2.設置es最大緩存數據條數和緩存失效時間,通過設置index.cache.field.max_size: 50000來把緩存field的最大值設置為50000,設置index.cache.field.expire: 10m把過期時間設置成10分鍾.

#index.cache.field.max_size: 50000

#index.cache.field.expire: 10m

#index.cache.field.type: soft

# field data部分&&circuit breaker部分:

# 用於field data 緩存的內存數量,主要用於當使用排序,faceting操作時,elasticsearch會將一些熱點數據加載到內存中來提供給客戶端訪問,但是這種緩存是比較珍貴的,所以對它進行合理的設置.

# 可以使用值:eg:50mb 或者 30%(節點 node heap內存量),默認是:unbounded

#indices.fielddata.cache.size: unbounded

# field的超時時間.默認是-1,可以設置的值類型: 5m

#indices.fielddata.cache.expire: -1

# circuit breaker部分:

# 斷路器是elasticsearch為了防止內存溢出的一種操作,每一種circuit breaker都可以指定一個內存界限觸發此操作,這種circuit breaker的設定有一個最高級別的設定:indices.breaker.total.limit 默認值是JVM heap的70%.當內存達到這個數量的時候會觸發內存回收

# 另外還有兩組子設置:

#indices.breaker.fielddata.limit:當系統發現fielddata的數量達到一定數量時會觸發內存回收.默認值是JVM heap的70%

#indices.breaker.fielddata.overhead:在系統要加載fielddata時會進行預先估計,當系統發現要加載進內存的值超過limit * overhead時會進行進行內存回收.默認是1.03

#indices.breaker.request.limit:這種斷路器是elasticsearch為了防止OOM(內存溢出),在每次請求數據時設定了一個固定的內存數量.默認值是40%

#indices.breaker.request.overhead:同上,也是elasticsearch在發送請求時設定的一個預估系數,用來防止內存溢出.默認值是1

# Translog部分:

# 每一個分片(shard)都有一個transaction log或者是與它有關的預寫日志,(write log),在es進行索引(index)或者刪除(delete)操作時會將沒有提交的數據記錄在translog之中,當進行flush 操作的時候會將tranlog中的數據發送給Lucene進行相關的操作.一次flush操作的發生基於如下的幾個配置

#index.translog.flush_threshold_ops:當發生多少次操作時進行一次flush.默認是 unlimited

#index.translog.flush_threshold_size:當translog的大小達到此值時會進行一次flush操作.默認是512mb

#index.translog.flush_threshold_period:在指定的時間間隔內如果沒有進行flush操作,會進行一次強制flush操作.默認是30m

#index.translog.interval:多少時間間隔內會檢查一次translog,來進行一次flush操作.es會隨機的在這個值到這個值的2倍大小之間進行一次操作,默認是5s

#index.gateway.local.sync:多少時間進行一次的寫磁盤操作,默認是5s

# 以上的translog配置都可以通過API進行動態的設置

 

集群部署

集群名稱不修改,節點名稱修改、將elasticsearch.yml 復制到其他節點,並替換其配置文件。並關閉自動發現,防止其他外來節點連入。

 

3) Elasticsearch健康插件安裝

elasticsearch-head是一個elasticsearch的集群管理工具,它是完全由html5編寫的獨立網頁程序,你可以通過插件把它集成到es

 

 插件安裝方法1:

1.elasticsearch/bin/plugin -install mobz/elasticsearch-head

2.運行es

3.打開http://localhost:9200/_plugin/head/

插件安裝方法2:

1.https://github.com/mobz/elasticsearch-head下載zip 解壓

2.建立elasticsearch\plugins\head\_site文件

3.將解壓后的elasticsearch-head-master文件夾下的文件copy到_site

4.運行es

5.打開http://localhost:9200/_plugin/head/

 

在地址欄輸入es服務器的ip地址和端口點connect就可以連接到集群。下面是連接后的視圖。這是主界面,在這里可以看到es集群的基本信息(如:節點情況,索引情況)。

 

通過head插件可以對索引執行api 操作。

 

4) Shieldelasticsearch安全插件

官方安裝步驟以及驗證

 

Shield 2.0+

Compatible with the latest versions of Elasticsearch and Kibana

Step 1: Install Shield into Elasticsearch 

bin/plugin install license

bin/plugin install shield

Step 2: Start Elasticsearch  bin/elasticsearch

Step 3: Add an admin user  bin/shield/esusers useradd es_admin -r admin

Step 4: Test your authenticated user  curl -u es_admin -XGET 'http://localhost:9200/'

Step 5: Install Shield Into Kibana for full session support (Shield 2.2+)

Step 6: Dive into the Getting Started Guide

目前測試尚未通過,等待官方發布新版本

 

 

 

 

 

 

 

 

 

5)Zookeeper集群搭建

Kafka依賴Zookeeper管理自身集群(BrokerOffsetProducerConsumer等),所以先要安裝 Zookeeper。自然,為了達到高可用的目的,Zookeeper自身也不能是單點,接下來就介紹如何搭建一個最小的Zookeeper集群(3個 zk節點)

    此處選用Zookeeper的版本是3.4.9,此為Kafka0.10中推薦的Zookeeper版本。

    首先解壓

    tar -xzvf zookeeper-3.4.9.tar.gz  

    進入zookeeperconf目錄,將zoo_sample.cfg復制一份,命名為zoo.cfg,此即為Zookeeper的配置文件

    cp zoo_sample.cfg zoo.cfg  

    編輯zoo.cfg

# The number of milliseconds of each tick 

tickTime=2000 

# The number of ticks that the initial 

# synchronization phase can take 

initLimit=10 

# The number of ticks that can pass between 

# sending a request and getting an acknowledgement 

syncLimit=5 

# the directory where the snapshot is stored. 

dataDir=/data/zk/zk0/data 

dataLogDir=/data/zk/zk0/logs 

# the port at which the clients will connect 

clientPort=2181 

server.176 = 172.16.38.176:2888:3888

server.177 = 172.16.38.177:2888:3888

server.1771 = 172.16.38.177:2999:3999

autopurge.purgeInterval=1    

dataDirdataLogDir的路徑需要在啟動前創建好

clientPortzookeeper的服務端口

server.0/1/2zk集群中三個node的信息,定義格式為hostname:port1:port2,其中port1node間通信使用的端口,port2node選舉使用的端口,需確保三台主機的這兩個端口都是互通的

在另外兩台主機上執行同樣的操作,安裝並配置zookeeper

    分別在三台主機的dataDir路徑下創建一個文件名為myid的文件,文件內容為該zk節點的編號。例如在第一台主機上建立的myid文件內容是0,第二台是1

    接下來,啟動三台主機上的zookeeper服務:

    bin/zkServer.sh start  

    3個節點都啟動完成后,可依次執行如下命令查看集群狀態:

    bin/zkServer.sh status  

    命令輸出如下:

    Mode: leader 或 Mode: follower

    3個節點中,應有1leader和兩個follower

    驗證zookeeper集群高可用性:

假設目前3zk節點中,server0leaderserver1server2follower

我們停掉server0上的zookeeper服務:

    bin/zkServer.sh stop  

    再到server1server2上查看集群狀態,會發現此時server1(也有可能是server2)為leader,另一個為follower

    再次啟動server0zookeeper服務,運行zkServer.sh status檢查,發現新啟動的server0也為follower

    至此,zookeeper集群的安裝和高可用性驗證完成。

    附:Zookeeper默認會將控制台信息輸出到啟動路徑下的zookeeper.out中,顯然在生產環境中我們不能允許Zookeeper這樣做,通過如下方法,可以讓Zookeeper輸出按尺寸切分的日志文件:

修改conf/log4j.properties文件,將zookeeper.root.logger=INFO, CONSOLE改為

zookeeper.root.logger=INFO, ROLLINGFILE修改bin/zkEnv.sh文件,將

    ZOO_LOG4J_PROP="INFO,CONSOLE"改為ZOO_LOG4J_PROP="INFO,ROLLINGFILE"

然后重啟zookeeper,就ok

 

 

 

6)Kafka集群搭建

Kafka 是一個高吞吐量的分布式發布訂閱日志服務,具有高可用、高性能、分布式、高擴展、持久性等特性。目前已經在各大公司中廣泛使用。和之前采用 Redis 做輕量級消息隊列不同,Kafka 利用磁盤作隊列,所以也就無所謂消息緩沖時的磁盤問題。此外,如果公司內部已有 Kafka 服務在運行,logstash 也可以快速接入,免去重復建設的麻煩。

kafka 基本概念

 

    Topic 主題,聲明一個主題,producer指定該主題發布消息,訂閱該主題的consumer對該主題進行消費

    Partition 每個主題可以分為多個分區,每個分區對應磁盤上一個目錄,分區可以分布在不同broker上,producer在發布消息時,可以通過指定partition key映射到對應分區,然后向該分區發布消息,在無partition key情況下,隨機選取分區,一段時間內觸發一次(比如10分鍾),這樣就保證了同一個producer向同一partition發布的消息是順序的。 消費者消費時,可以指定partition進行消費,也可以使用high-level-consumer api,自動進行負載均衡,並將partition分給consumer,一個partition只能被一個consumer進行消費

    Consumer 消費者,可以多實例部署,可以批量拉取,有兩類API可供選擇:一個simpleConsumer,暴露所有的操作給用戶,可以提交offset、fetch offset、指定partition fetch message;另外一個high-level-consumer(ZookeeperConsumerConnector),幫助用戶做基於partition自動分配的負載均衡,定期提交offset,建立消費隊列等。simpleConsumer相當於手動擋,high-level-consumer相當於自動擋。

    simpleConsumer:無需像high-level-consumer那樣向zk注冊brokerid、owner,甚至不需要提交offset到zk,可以將offset提交到任意地方比如(mysql,本地文件等)。

    high-level-consumer:一個進程中可以啟多個消費線程,一個消費線程即是一個consumer,假設A進程里有2個線程(consumerid分別為1,2),B進程有2個線程(consumerid分別為1,2),topic1的partition有5個,那么partition分配是這樣的:

    partition1 ---> A進程consumerid1

    partition2 ---> A進程consumerid1

    partition3 ---> A進程consumerid2

    partition4 ---> B進程consumer1

    partition5 ---> B進程consumer2

    Group High-level-consumer可以聲明group,每個group可以有多個consumer,每group各自管理各自的消費offset,各個不同group之間互不關聯影響。

由於目前版本消費的offset、owner、group都是consumer自己通過zk管理,所以group對於broker和producer並不關心,一些監控工具需要通過group來監控,simpleComsumer無需聲明group。

 

部署安裝

 

此例中,我們會安裝配置一個有兩個Broker組成的Kafka集群,並在其上創建一個兩個分區的Topic

本例中使用Kafka最新版本0.10.0

首先解壓官網下載kafkatar包

編輯config/server.properties文件,下面列出關鍵的參數

#此Broker的ID,集群中每個Broker的ID不可相同

broker.id=0

#監聽器,端口號與port一致即可

listeners=PLAINTEXT://:9092

#Broker監聽的端口

port=19092

#Broker的Hostname,填主機IP即可

host.name=172.16.38.176

#向Producer和Consumer建議連接的Hostname和port(此處有坑,具體見后)

advertised.host.name=172.16.38.176

advertised.port=9092

#進行IO的線程數,應大於主機磁盤數

num.io.threads=8

#消息文件存儲的路徑

log.dirs=/data/kafka-logs

#消息文件清理周期,即清理x小時前的消息記錄

log.retention.hours=168

#每個Topic默認的分區數,一般在創建Topic時都會指定分區數,所以這個配成1就行了

num.partitions=1

#Zookeeper連接串,此處填寫上一節中安裝的三個zk節點的ip和端口即可

zookeeper.connect=172.16.38.176:12180,172.16.38.177:12181,172.16.38.177:12182

將172.16.38.177 也按照上圖配置文件中,只是修改brokerid即可。

 

 

 

7)測試KafkaZookeeper集群連通性

(1)建立一個主題

[root@kafka1 ~]# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 1 --topic summer

#注意:factor大小不能超過broker

 

(2)查看有哪些主題已經創建

[root@kafka1 ~]# /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181   #列出集群中所有的topic

summer  #已經創建成功

 

(3)查看summer這個主題的詳情

[root@kafka1 ~]# /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic summer

Topic:summer PartitionCount:16 ReplicationFactor:2 Configs:

Topic: summer Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1

Topic: summer Partition: 1 Leader: 1 Replicas: 1,0 Isr: 0,1

Topic: summer Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0,1

Topic: summer Partition: 3 Leader: 1 Replicas: 1,0 Isr: 0,1

Topic: summer Partition: 4 Leader: 0 Replicas: 0,1 Isr: 0,1

Topic: summer Partition: 5 Leader: 1 Replicas: 1,0 Isr: 0,1

Topic: summer Partition: 6 Leader: 0 Replicas: 0,1 Isr: 0,1

Topic: summer Partition: 7 Leader: 1 Replicas: 1,0 Isr: 0,1

Topic: summer Partition: 8 Leader: 0 Replicas: 0,1 Isr: 0,1

Topic: summer Partition: 9 Leader: 1 Replicas: 1,0 Isr: 0,1

Topic: summer Partition: 10 Leader: 0 Replicas: 0,1 Isr: 0,1

Topic: summer Partition: 11 Leader: 1 Replicas: 1,0 Isr: 0,1

Topic: summer Partition: 12 Leader: 0 Replicas: 0,1 Isr: 0,1

Topic: summer Partition: 13 Leader: 1 Replicas: 1,0 Isr: 0,1

Topic: summer Partition: 14 Leader: 0 Replicas: 0,1 Isr: 0,1

Topic: summer Partition: 15 Leader: 1 Replicas: 1,0 Isr: 0,1

#主題名稱:summer

#Partition:16個,從0開始

#leader id01broker

#Replicas 副本存在於broker id0,1的上面

#Isr:活躍狀態的broker

 

(4)發送消息,這里使用的是生產者角色

[root@kafka1 data]# /bin/bash /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 172.16.38.176:9092,172.16.38.177:9092 --topic test1

 

(5)接收消息,這里使用的是消費者角色

[root@kafka2 data]# /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper  172.16.38.176:12180,172.16.38.177:12181,172.16.38.177:12182 --topic test1

 

(6)驗證的效果,生產者和消費者

 

 

(7)刪除一個消息主題

[root@kafka2data]#/usr/local/kafka/bin/kafka-topics.sh--zookeeper 172.16.38.176:12180,172.16.38.177:12181,172.16.38.177:12182 --delete --topic test1

Topic test1 is marked for deletion.

Note: This will have no impact if delete.topic.enable is not set to true.

 

這樣kafkazookeeper集群配置完畢。

 

8)Logstash部署

Logstash requires Java 7 or later. Use the official Oracle distribution or an open-source distribution such as OpenJDK.

Download and install the public signing key:

 

rpm --import https://packages.elastic.co/GPG-KEY-elasticsearch

 

Add the following in your /etc/yum.repos.d/ directory in a file with a .repo suffix, for example logstash.repo

 

[logstash-2.4]

name=Logstash repository for 2.4.x packages

baseurl=https://packages.elastic.co/logstash/2.4/centos

gpgcheck=1

gpgkey=https://packages.elastic.co/GPG-KEY-elasticsearch

enabled=1

 

And your repository is ready for use. You can install it with:

 

yum install logstash

 

 

 

 

 

9)Kibana部署

     1Download and install the public signing key:

 

    rpm --import https://packages.elastic.co/GPG-KEY-elasticsearch

 

    2Create a file named kibana.repo in the /etc/yum.repos.d/ directory with the following contents:

 

    [kibana-4.6]

    name=Kibana repository for 4.6.x packages

    baseurl=https://packages.elastic.co/kibana/4.6/centos

    gpgcheck=1

    gpgkey=https://packages.elastic.co/GPG-KEY-elasticsearch

    enabled=1

 

    3Install Kibana by running the following command:

 

    yum install kibana

 

    4Configure Kibana to automatically start during bootup. If your distribution is using the System V version of init (check with ps -p 1), run the following command:

 

    chkconfig --add kibana

 

   5If your distribution is using systemd, run the following commands instead:

 

    sudo /bin/systemctl daemon-reload

sudo /bin/systemctl enable kibana.service

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

四、系統使用示例

按照logstash-->kafka-->zookeeper-->logstash-->elasticsearch-->kibana 數據順序

1) Logstash 作為kafka生產者示例

 

 

2) Logstash index 消費kafka示例

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM