(一)Apache Kafka安裝
1、安裝環境與前提條件
安裝環境:Ubuntu16.04
前提條件:
- ubuntu系統下安裝好jdk 1.8以上版本,正確配置環境變量
- ubuntu系統下安裝好scala 2.11版本
- 安裝ZooKeeper(注:kafka自帶一個Zookeeper服務,如果不單獨安裝,也可以使用自帶的ZK)
2、安裝步驟
Apache基金會開源的這些軟件基本上安裝都比較方便,只需要下載、解壓、配置環境變量三步即可完成,kafka也一樣,官網選擇對應版本下載后直接解壓到一個安裝目錄下就可以使用了,如果為了方便可以在~/.bashrc里配置一下環境變量,這樣使用的時候就不需要每次都切換到安裝目錄了。
3、測試
接下來可以通過簡單的console窗口來測試kafka是否安裝正確。
(1)首先啟動ZooKeeper服務。
如果啟動自己安裝的ZooKeeper,使用命令zkServer.sh start
即可。
如果使用kafka自帶的ZK服務,啟動命令如下(啟動之后shell不會返回,后續其他命令需要另開一個Terminal):
$ cd /opt/tools/kafka #進入安裝目錄
$ bin/zookeeper-server-start.sh config/zookeeper.properties
(2)第二步啟動kafka服務
啟動Kafka服務的命令如下所示:
$ cd /opt/tools/kafka #進入安裝目錄
$ bin/kafka-server-start.sh config/server.properties
(3)第三步創建一個topic,假設為“test”
創建topic的命令如下所示,其參數也都比較好理解,依次指定了依賴的ZooKeeper,副本數量,分區數量,topic的名字:
$ cd /opt/tools/kafka #進入安裝目錄
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test1
創建完成后,可以通過如下所示的命令查看topic列表:
$ bin/kafka-topics.sh --list --zookeeper localhost:2181
(4)開啟Producer和Consumer服務
kafka提供了生產者和消費者對應的console窗口程序,可以先通過這兩個console程序來進行驗證。
首先啟動Producer:
$ cd /opt/tools/kafka #進入安裝目錄
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
然后啟動Consumer:
$ cd /opt/tools/kafka #進入安裝目錄
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
在打開生產者服務的終端輸入一些數據,回車后,在打開消費者服務的終端能看到生產者終端輸入的數據,則說明kafka安裝成功。
(二)Apache Kafka簡單示例
安裝好Kafka之后,本節我們通過一些實例來使用kafka,了解其基本的API和使用方法。
1、驗證同一個消費組的兩個消費者不會同時消費一個partition
我們通過一個簡單的例子來驗證同一個消費組的兩個消費者不會同時消費一個partition中的數據,首先,啟動一個生產者服務,對應的topic只有一個分區,然后創建4個消費者,其中兩個屬於同一組,另外兩個屬於另一組,即可驗證則原理,驗證情況如下圖所示:

2、Kafka的Java API
通過以下程序給出一個kafka的java簡單示例:
package com.kafka.sgz;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
/**生產者示例*/
public class KafkaProducerTest {
private static Producer<String, String> producer;
public final static String TOPIC="test1";
public KafkaProducerTest(){
Properties props=new Properties();
props.put("bootstrap.servers","localhost:9092");
props.put("acks","all");
props.put("retries",0);
props.put("batch.size",16384);
props.put("linger.ms",1);
props.put("buffer.memory",33554432);
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<String,String>(props);
}
public void produce(){
int messageNum=100;
final int count=120;
while(messageNum<count){
String key=String.valueOf(messageNum);
String data="@@@@hello kafka message"+key;
producer.send(new ProducerRecord<String,String>(TOPIC,key,data));
System.out.println(data);
messageNum++;
}
producer.close(); //注意發送完數據要關閉,否則可能出錯
}
public static void main(String[] args){
new KafkaProducerTest().produce();
}
}
package com.kafka.sgz;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class KafkaConsumerTest {
private static Consumer<String,String> consumer;
public final static String TOPIC="test1";
public KafkaConsumerTest(){
Properties props=new Properties();
props.put("bootstrap.servers","localhost:9092");
props.put("group.id","test-consumer-group"); //組號
props.put("enable.auto.commit","true"); //如果value合法,將自動提交偏移量
props.put("auto.commit.interval.ms","1000"); //設置多久更新一次被消費信息的偏移量
props.put("session.timeout.ms","30000"); //設置會話響應時間,超過可以放棄消費或者直接消費下一條
props.put("auto.offset.reset","earliest"); //自動重置Offset
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
consumer=new KafkaConsumer<String,String>(props);
}
public void consume(){
consumer.subscribe(Arrays.asList(TOPIC));
while(true){
ConsumerRecords<String,String> records=consumer.poll(100);
for(ConsumerRecord<String,String> record:records){
System.out.println("get message:"+record.key()+"---"+record.value());
}
}
}
public static void main(String[] args){
new KafkaConsumerTest().consume();
}
}
3、Kafka的python API
通過以下程序給出一個kafka的python簡單示例:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.errors import KafkaError
import json
class Kafka_producer():
'''
使用kafka的生產模塊
'''
def __init__(self, kafkahost,kafkaport, kafkatopic):
self.kafkaHost = kafkahost
self.kafkaPort = kafkaport
self.kafkatopic = kafkatopic
self.producer = KafkaProducer(bootstrap_servers = '{kafka_host}:{kafka_port}'.format(
kafka_host=self.kafkaHost,
kafka_port=self.kafkaPort
))
def sendjsondata(self, params):
try:
parmas_message = json.dumps(params)
producer = self.producer
producer.send(self.kafkatopic, parmas_message.encode('utf-8'))
producer.flush()
except KafkaError as e:
print e
class Kafka_consumer():
'''
使用Kafka—python的消費模塊
'''
def __init__(self, kafkahost, kafkaport, kafkatopic, groupid):
self.kafkaHost = kafkahost
self.kafkaPort = kafkaport
self.kafkatopic = kafkatopic
self.groupid = groupid
self.consumer = KafkaConsumer(self.kafkatopic, group_id = self.groupid,
bootstrap_servers = '{kafka_host}:{kafka_port}'.format(
kafka_host=self.kafkaHost,
kafka_port=self.kafkaPort ))
def consume_data(self):
try:
for message in self.consumer:
# print json.loads(message.value)
yield message
except KeyboardInterrupt, e:
print e
def main():
'''
測試consumer和producer
:return:
'''
# 測試生產模塊
producer = Kafka_producer("127.0.0.1", 9092, "ranktest")
for id in range(10):
params = '{abetst}:{null}---'+str(i)
producer.sendjsondata(params)
##測試消費模塊
#消費模塊的返回格式為ConsumerRecord(topic=u'ranktest', partition=0, offset=202, timestamp=None,
#\timestamp_type=None, key=None, value='"{abetst}:{null}---0"', checksum=-1868164195,
#\serialized_key_size=-1, serialized_value_size=21)
consumer = Kafka_consumer('127.0.0.1', 9092, "ranktest", 'test-python-ranktest')
message = consumer.consume_data()
for i in message:
print i.value
if __name__ == '__main__':
main()
總結
本文從實踐的角度介紹了kafka的安裝和相關API的使用,結合上一篇文章的原理,可以對Kafka有一個比較基礎的理解和認識。