Preface
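Kafka is an open-source stream-processing platform developed by the Apache Software Foundation and written in Scala and Java. It is a high-throughput, distributed publish-subscribe messaging system that can handle all the activity-stream data of a consumer-scale website. This activity (page views, searches, and other user actions) is a key ingredient of many social features on the modern web. Because of the throughput involved, such data has traditionally been handled through log processing and log aggregation. For systems that, like Hadoop, work with log data and offline analysis but must also meet real-time processing requirements, Kafka is a viable solution. Its aim is to unify online and offline message processing through Hadoop's parallel loading mechanism, and to deliver messages in real time across a cluster.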
Kafka official website
http://kafka.apache.org/
Steps
Download Kafka
Go to the Kafka website and download the version you need. I chose kafka_2.12-2.2.0, that is, Kafka 2.2.0 built for Scala 2.12.
# wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz
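The archive name encodes two versions. In kafka_2.12-2.2.0, 2.12 is the Scala version the binaries were built with, and 2.2.0 is the Kafka release. The sketch below is a minimal hypothetical helper, `kafka_url`, that builds the download URL from both versions. It assumes the Apache archive layout `https://archive.apache.org/dist/kafka/<version>/`, which keeps older releases after mirrors have dropped them.

```shell
#!/bin/bash
# Hypothetical helper: build a Kafka download URL from the Scala and Kafka versions.
# Assumes the Apache archive layout .../dist/kafka/<kafka_version>/kafka_<scala>-<kafka>.tgz
kafka_url() {
    local scala_version="$1" kafka_version="$2"
    echo "https://archive.apache.org/dist/kafka/${kafka_version}/kafka_${scala_version}-${kafka_version}.tgz"
}

# Usage: print the URL for the release used in this article
kafka_url 2.12 2.2.0
```

The result can be passed straight to wget, for example `wget "$(kafka_url 2.12 2.2.0)"`.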
Create the installation directory
I usually create it under /usr/local:
# mkdir /usr/local/kafka
Extract into the installation directory
Extract the downloaded Kafka archive into the directory just created:
# tar -zxvf kafka_2.12-2.2.0.tgz -C /usr/local/kafka/
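The archive unpacks into its own top-level directory, which is why every path below goes through /usr/local/kafka/kafka_2.12-2.2.0. The sketch below is a small helper with a hypothetical name, `tarball_top_dir`. It reads that directory name from the archive listing with tar's `-t` option, without extracting anything.

```shell
#!/bin/bash
# Hypothetical helper: print the top-level directory stored in a .tgz archive.
# Lists the archive (-t) without extracting it and keeps the first path component.
tarball_top_dir() {
    tar -tzf "$1" | head -n 1 | cut -d/ -f1
}

# Usage: for the archive downloaded above this should print kafka_2.12-2.2.0
# tarball_top_dir kafka_2.12-2.2.0.tgz
```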
Edit the configuration file
Edit Kafka's configuration file, server.properties:
# vi /usr/local/kafka/kafka_2.12-2.2.0/config/server.properties
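# Replace the default log.dirs=/tmp/kafka-logs; this is where Kafka stores its message data
log.dirs=/usr/local/kafka/kafka_2.12-2.2.0/kafka-logs
# Remote connections
# Uncomment line 31: listeners=PLAINTEXT://:9092
# Uncomment line 36 and set advertised.listeners=PLAINTEXT://host.name:9092 (replace host.name with your IP address)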
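The same three edits can be made non-interactively with GNU sed instead of vi. The sketch below is minimal: the function name `configure_kafka` and its arguments are hypothetical. It assumes the stock server.properties, where the listener lines ship commented out as `#listeners=...` and `#advertised.listeners=...`, and it relies on GNU sed's `-i` option.

```shell
#!/bin/bash
# Hypothetical helper: apply the server.properties edits described above.
# Args: <properties file> <host IP> <log directory>. Requires GNU sed (-i, \? in BRE).
configure_kafka() {
    local conf="$1" host_ip="$2" log_dir="$3"
    # Uncomment listeners: accept connections on port 9092 on all interfaces
    sed -i 's|^#\?listeners=PLAINTEXT://.*|listeners=PLAINTEXT://:9092|' "$conf"
    # Uncomment advertised.listeners and point it at this host's IP
    sed -i "s|^#\?advertised.listeners=.*|advertised.listeners=PLAINTEXT://${host_ip}:9092|" "$conf"
    # Move the message data directory out of /tmp
    sed -i "s|^log.dirs=.*|log.dirs=${log_dir}|" "$conf"
}

# Usage (192.168.1.10 is a placeholder for your server's IP):
# configure_kafka /usr/local/kafka/kafka_2.12-2.2.0/config/server.properties \
#     192.168.1.10 /usr/local/kafka/kafka_2.12-2.2.0/kafka-logs
```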
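Start ZooKeeper
Kafka needs a running ZooKeeper, so start it first. The command stays in the foreground, so start Kafka from a second terminal: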
# /usr/local/kafka/kafka_2.12-2.2.0/bin/zookeeper-server-start.sh /usr/local/kafka/kafka_2.12-2.2.0/config/zookeeper.properties
Start Kafka
# /usr/local/kafka/kafka_2.12-2.2.0/bin/kafka-server-start.sh /usr/local/kafka/kafka_2.12-2.2.0/config/server.properties
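When ZooKeeper and Kafka are launched in the background (for example with nohup, as in the installation script below), the next command can run before the service accepts connections. The sketch below is a minimal hypothetical helper, `wait_for_port`, that polls a TCP port through bash's `/dev/tcp` redirection. That redirection is a bash feature, not a real device file. Port 2181 is the clientPort in the bundled zookeeper.properties, and 9092 is the listener port configured above.

```shell
#!/bin/bash
# Hypothetical helper: wait until host:port accepts TCP connections.
# Uses bash's /dev/tcp redirection; returns 0 once connected, 1 after <timeout> seconds.
wait_for_port() {
    local host="$1" port="$2" timeout="${3:-30}" i
    for ((i = 0; i < timeout; i++)); do
        # The subshell opens a TCP connection on fd 3 and closes it on exit
        if (exec 3<>"/dev/tcp/${host}/${port}") 2>/dev/null; then
            return 0
        fi
        sleep 1
    done
    return 1
}

# Usage: block until ZooKeeper, then Kafka, are reachable
# wait_for_port localhost 2181 && wait_for_port localhost 9092
```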
That completes the installation; I went through these steps without hitting any errors.
Create a topic
Running the kafka-topics.sh script without arguments prints its help:
# /usr/local/kafka/kafka_2.12-2.2.0/bin/kafka-topics.sh
Create, delete, describe, or change a topic.
Option                                   Description
------                                   -----------
--alter                                  Alter the number of partitions,
                                           replica assignment, and/or
                                           configuration for the topic.
--bootstrap-server <String: server to    REQUIRED: The Kafka server to connect
  connect to>                              to. In case of providing this, a
                                           direct Zookeeper connection won't be
                                           required.
--command-config <String: command        Property file containing configs to be
  config property file>                    passed to Admin Client. This is used
                                           only with --bootstrap-server option
                                           for describing and altering broker
                                           configs.
--config <String: name=value>            A topic configuration override for the
                                           topic being created or altered.The
                                           following is a list of valid
                                           configurations:
                                             cleanup.policy
                                             compression.type
                                             delete.retention.ms
                                             file.delete.delay.ms
                                             flush.messages
                                             flush.ms
                                             follower.replication.throttled.replicas
                                             index.interval.bytes
                                             leader.replication.throttled.replicas
                                             max.message.bytes
                                             message.downconversion.enable
                                             message.format.version
                                             message.timestamp.difference.max.ms
                                             message.timestamp.type
                                             min.cleanable.dirty.ratio
                                             min.compaction.lag.ms
                                             min.insync.replicas
                                             preallocate
                                             retention.bytes
                                             retention.ms
                                             segment.bytes
                                             segment.index.bytes
                                             segment.jitter.ms
                                             segment.ms
                                             unclean.leader.election.enable
                                           See the Kafka documentation for full
                                           details on the topic configs.It is
                                           supported only in combination with
                                           --create if --bootstrap-server option
                                           is used.
--create                                 Create a new topic.
--delete                                 Delete a topic
--delete-config <String: name>           A topic configuration override to be
                                           removed for an existing topic (see
                                           the list of configurations under the
                                           --config option). Not supported with
                                           the --bootstrap-server option.
--describe                               List details for the given topics.
--disable-rack-aware                     Disable rack aware replica assignment
--exclude-internal                       exclude internal topics when running
                                           list or describe command. The
                                           internal topics will be listed by
                                           default
--force                                  Suppress console prompts
--help                                   Print usage information.
--if-exists                              if set when altering or deleting or
                                           describing topics, the action will
                                           only execute if the topic exists.
                                           Not supported with the
                                           --bootstrap-server option.
--if-not-exists                          if set when creating topics, the
                                           action will only execute if the
                                           topic does not already exist. Not
                                           supported with the --bootstrap-server
                                           option.
--list                                   List all available topics.
--partitions <Integer: # of partitions>  The number of partitions for the topic
                                           being created or altered (WARNING:
                                           If partitions are increased for a
                                           topic that has a key, the partition
                                           logic or ordering of the messages
                                           will be affected
--replica-assignment <String:            A list of manual partition-to-broker
  broker_id_for_part1_replica1 :           assignments for the topic being
  broker_id_for_part1_replica2 ,           created or altered.
  broker_id_for_part2_replica1 :
  broker_id_for_part2_replica2 , ...>
--replication-factor <Integer:           The replication factor for each
  replication factor>                      partition in the topic being created.
--topic <String: topic>                  The topic to create, alter, describe
                                           or delete. It also accepts a regular
                                           expression, except for --create
                                           option. Put topic name in double
                                           quotes and use the '\' prefix to
                                           escape regular expression symbols;
                                           e.g. "test\.topic".
--topics-with-overrides                  if set when describing topics, only
                                           show topics that have overridden
                                           configs
--unavailable-partitions                 if set when describing topics, only
                                           show partitions whose leader is not
                                           available
--under-replicated-partitions            if set when describing topics, only
                                           show under replicated partitions
--zookeeper <String: hosts>              DEPRECATED, The connection string for
                                           the zookeeper connection in the form
                                           host:port. Multiple hosts can be
                                           given to allow fail-over.
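Create a topic named wechat with 10 partitions. The replication factor is 1 because there is only one broker. Run this from /usr/local/kafka/kafka_2.12-2.2.0, since the command uses the relative bin/ path. The help above marks --zookeeper as deprecated; this release also accepts --bootstrap-server with the broker's host:9092 in its place.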
# bin/kafka-topics.sh --create --topic wechat --partitions 10 --replication-factor 1 --zookeeper localhost:2181
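Kafka restricts topic names. A name must be 1 to 249 characters long, may use only ASCII letters, digits, '.', '_' and '-', and may not be "." or "..". The sketch below is a minimal hypothetical pre-check, `valid_topic_name`, to run before `--create`. It mirrors those rules and is not part of kafka-topics.sh.

```shell
#!/bin/bash
# Hypothetical pre-check mirroring Kafka's topic-name rules:
# 1-249 characters from [a-zA-Z0-9._-], and not "." or "..".
valid_topic_name() {
    local name="$1"
    [ -n "$name" ] && [ "${#name}" -le 249 ] || return 1
    [ "$name" != "." ] && [ "$name" != ".." ] || return 1
    [[ "$name" =~ ^[a-zA-Z0-9._-]+$ ]]
}

# Usage
valid_topic_name wechat && echo "wechat: ok"
valid_topic_name "we chat" || echo "we chat: invalid"
```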
View the topic details:
# bin/kafka-topics.sh --topic wechat --describe --zookeeper localhost:2181
Installation script
#!/bin/bash
# Install Kafka 2.2.0 (Scala 2.12) under /usr/local/kafka, then start ZooKeeper and Kafka
set -e
KAFKA_HOME=/usr/local/kafka/kafka_2.12-2.2.0
CONFIG="$KAFKA_HOME/config/server.properties"
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz
mkdir -p /usr/local/kafka
tar -zxvf kafka_2.12-2.2.0.tgz -C /usr/local/kafka/
# First global IPv4 address of this host (empty if none is found)
host_ip=$(ip -4 -o addr show scope global | awk '{print $4}' | cut -d/ -f1 | head -n 1)
if [ -n "$host_ip" ]; then
    sed -i "/^#listeners=PLAINTEXT/clisteners=PLAINTEXT://$host_ip:9092" "$CONFIG"
    sed -i "/^log.dirs/clog.dirs=$KAFKA_HOME/kafka-logs" "$CONFIG"
    sed -i "/^#advertised.listeners/cadvertised.listeners=PLAINTEXT://$host_ip:9092" "$CONFIG"
else
    echo "Could not determine the IP address; edit $CONFIG manually!" >&2
    exit 1
fi
# Start ZooKeeper in the background and wait (up to 30 s) for its client port 2181
nohup "$KAFKA_HOME/bin/zookeeper-server-start.sh" "$KAFKA_HOME/config/zookeeper.properties" >zookeeper.log 2>&1 &
for _ in $(seq 1 30); do
    (exec 3<>/dev/tcp/127.0.0.1/2181) 2>/dev/null && break
    sleep 1
done
# Start the Kafka broker in the background
nohup "$KAFKA_HOME/bin/kafka-server-start.sh" "$CONFIG" >kafka.log 2>&1 &