一、下載kafka_2.12-2.4.0.tgz並解壓至/home/kafka_2.12-2.4.0
二、配置kafka
2.1 創建kafka日志文件夾:/home/kafka_2.12-2.4.0/logs
2.2 創建zookeeper數據目錄:/tmp/zookeeper
2.3 配置/home/kafka_2.12-2.4.0/config/server.properties 內容如下(SSL證書在下面介紹):
ssl.keystore.location=/home/ca/server/server.keystore.jks ssl.keystore.password=mima123 ssl.key.password=mima123 ssl.truststore.location=/home/ca/trust/server.truststore.jks ssl.truststore.password=mima123 ssl.client.auth=required ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1 ssl.keystore.type=JKS ssl.truststore.type=JKS ssl.endpoint.identification.algorithm= #security.inter.broker.protocol=SSL inter.broker.listener.name=SSL ############################# Server Basics ############################# broker.id=0 listeners=SSL://阿里雲內網IP:9093 advertised.listeners=SSL://阿里雲外網IP:9093 listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 ############################# Log Basics ############################# log.dirs=/home/kafka_2.12-2.4.0/logs num.partitions=1 num.recovery.threads.per.data.dir=1 ############################# Internal Topic Settings ############################# offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 ############################# Log Flush Policy ############################# #log.flush.interval.messages=10000 #log.flush.interval.ms=1000 ############################# Log Retention Policy ############################# log.retention.hours=168 #log.retention.bytes=1073741824 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 ############################# Zookeeper ############################# zookeeper.connect=阿里雲內網IP:2181 zookeeper.connection.timeout.ms=6000 ############################# Group Coordinator Settings ############################# group.initial.rebalance.delay.ms=0 delete.topic.enble=true
2.4 配置 /home/kafka_2.12-2.4.0/config/zookeeper.properties
dataDir=/tmp/zookeeper clientPort=2181 maxClientCnxns=0 admin.enableServer=false
2.5 配置/etc/hosts文件,增加紅色行,IP為阿里雲內網IP
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
172.18.54.18 iZwz9gq8vhwxtgpg21yonsZ iZwz9gq8vhwxtgpg21yonsZ
172.18.54.18 kafka-single
三、生成SSL相關證書文件
3.1、創建四個文件夾 /home/ca/root、/home/ca/trust、/home/ca/server、/home/ca/client
3.2、簽發相關證書
第一步:生成server.keystore.jks文件(即:生成服務端的keystore文件)
keytool -keystore /home/ca/server/server.keystore.jks -alias ds-kafka-single -validity 10000 -genkey -keypass mima123 -keyalg RSA -dname "CN=kafka-single,OU=qmx,O=qmx,L=beijing,S=beijing,C=cn" -storepass mima123 -ext SAN=DNS:kafka-single
第二步:生成CA認證證書(為了保證整個證書的安全性,需要使用CA進行證書的簽名保證)
openssl req -new -x509 -keyout /home/ca/root/ca-key -out /home/ca/root/ca-cert -days 10000 -passout pass:mima123 -subj "/C=cn/ST=beijing/L=beijing/O=qmx/OU=qmx/CN=kafka-single"
第三步:通過CA證書創建一個客戶端信任證書
keytool -keystore /home/ca/trust/client.truststore.jks -alias CARoot -import -file /home/ca/root/ca-cert -storepass mima123
第四步:通過CA證書創建一個服務端器端信任證書
keytool -keystore /home/ca/trust/server.truststore.jks -alias CARoot -import -file /home/ca/root/ca-cert -storepass mima123
第五步:服務器證書的簽名處理
第1小步:導出服務器端證書server.cert-file
keytool -keystore /home/ca/server/server.keystore.jks -alias ds-kafka-single -certreq -file /home/ca/server/server.cert-file -storepass mima123
第2小步:用CA給服務器端證書進行簽名處理
openssl x509 -req -CA /home/ca/root/ca-cert -CAkey /home/ca/root/ca-key -in /home/ca/server/server.cert-file -out /home/ca/server/server.cert-signed -days 10000 -CAcreateserial -passin pass:mima123
第3小步:將CA證書導入到服務器端keystore
keytool -keystore /home/ca/server/server.keystore.jks -alias CARoot -import -file /home/ca/root/ca-cert -storepass mima123
第4小步:將已簽名的服務器證書導入到服務器keystore
keytool -keystore /home/ca/server/server.keystore.jks -alias ds-kafka-single -import -file /home/ca/server/server.cert-signed -storepass mima123
客戶端SSL證書簽發
第一步:生成client.keystore.jks文件
keytool -keystore /home/ca/client/client.keystore.jks -alias ds-kafka-single -validity 10000 -genkey -keypass mima123-dname "CN=kafka-single,OU=qmx,O=qmx,L=beijing,S=beijing,C=cn" -ext SAN=DNS:kafka-single -storepass mima123
第二步:導出客戶端證書client.cert-file
keytool -keystore /home/ca/client/client.keystore.jks -alias ds-kafka-single -certreq -file /home/ca/client/client.cert-file -storepass mima123
第三步:用CA給客戶端證書進行簽名處理
openssl x509 -req -CA /home/ca/root/ca-cert -CAkey /home/ca/root/ca-key -in /home/ca/client/client.cert-file -out /home/ca/client/client.cert-signed -days 10000 -CAcreateserial -passin pass:mima123
第四步:將CA證書導入到客戶端keystore
keytool -keystore /home/ca/client/client.keystore.jks -alias CARoot -import -file /home/ca/root/ca-cert -storepass mima123
第五步:將已簽名的證書導入到客戶端keystore
keytool -keystore /home/ca/client/client.keystore.jks -alias ds-kafka-single -import -file /home/ca/client/client.cert-signed -storepass mima123
四、啟動和停止kafka和zookeeper服務
cd /home/kafka_2.12-2.4.0/bin
啟動zookeeper:
./zookeeper-server-start.sh /home/kafka_2.12-2.4.0/config/zookeeper.properties &
啟動kafka:
./kafka-server-start.sh /home/kafka_2.12-2.4.0/config/server.properties &
查看topic情況:
./kafka-topics.sh --list --zookeeper localhost:2181
關閉kafka:
./kafka-server-stop.sh
關閉zookeeper:
./zookeeper-server-stop.sh
查看 kafka 的 topic 情況:
./kafka-topics.sh --list --zookeeper 172.18.54.18:2181
查看topic詳細信息:
./kafka-topics.sh --describe --zookeeper 172.18.54.18:2181 --topic topic1
生產者客戶端命令:
./kafka-console-producer.sh --broker-list 172.18.54.18:9092 --topic topic1
消費者客戶端命令:
./kafka-console-consumer.sh --bootstrap-server 172.18.54.18:9092 --topic topic1 --from-beginning
五、JAVA客戶端對接
5.1 Producer
package com.xrh.extend.kafka; import java.util.Properties; import java.util.Random; import java.util.concurrent.Future; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.StringSerializer; public class Producer { public static String topic = "topic2";//定義主題 public static void main(String[] args) throws InterruptedException { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "阿里雲外網IP:9093");//kafka地址,多個地址用逗號分割 // acks:消息的確認機制,默認值是0。 // acks=0:如果設置為0,生產者不會等待kafka的響應。 // acks=1:這個配置意味着kafka會把這條消息寫到本地日志文件中,但是不會等待集群中其他機器的成功響應。 // acks=all:這個配置意味着leader會等待所有的follower同步完成。這個確保消息不會丟失,除非kafka集群中所有機器掛掉。這是最強的可用性保證。 props.put("acks", "all"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put("security.protocol", "SSL"); props.put("ssl.truststore.location", "D:\\ca\\client.truststore.jks"); props.put("ssl.truststore.password", "mima123"); props.put("ssl.keystore.location", "D:\\ca\\client.keystore.jks"); props.put("ssl.keystore.password", "mima123"); props.setProperty("ssl.endpoint.identification.algorithm", ""); KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props); try { int i = 1; while (i < 20) { String msg = "測試 Hello," + new Random().nextInt(100); ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic , "key1", msg); kafkaProducer.send(record, new MyProducerCallBack()); System.out.println("消息發送成功:" + msg); ++ i; Thread.sleep(500); } } finally { kafkaProducer.close(); } } private static class MyProducerCallBack implements Callback { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if(null != e){ e.printStackTrace(); return; } System.out.println("時間戳,主題,分區,位移: " + recordMetadata.timestamp() + ", " + recordMetadata.topic() + "," + recordMetadata.partition() + " " + recordMetadata.offset()); } }; // acks = 1 // batch.size = 16384 //當多條消息需要發送到同一個分區時,生產者會嘗試合並網絡請求。這會提高client和生產者的效率。 // bootstrap.servers = [39.108.124.173:9092] // buffer.memory = 33554432 // client.dns.lookup = default // client.id = // compression.type = none // connections.max.idle.ms = 540000 // delivery.timeout.ms = 120000 // enable.idempotence = false // interceptor.classes = [] // key.serializer = class org.apache.kafka.common.serialization.StringSerializer // linger.ms = 0 // max.block.ms = 60000 // max.in.flight.requests.per.connection = 5 // max.request.size = 1048576 // metadata.max.age.ms = 300000 // metric.reporters = [] // metrics.num.samples = 2 // metrics.recording.level = INFO // metrics.sample.window.ms = 30000 // partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner // receive.buffer.bytes = 32768 // reconnect.backoff.max.ms = 1000 // reconnect.backoff.ms = 50 // request.timeout.ms = 30000 // retries = 2147483647 //配置為大於0的值的話,客戶端會在消息發送失敗時重新發送。 // retry.backoff.ms = 100 // sasl.client.callback.handler.class = null // sasl.jaas.config = null // sasl.kerberos.kinit.cmd = /usr/bin/kinit // sasl.kerberos.min.time.before.relogin = 60000 // sasl.kerberos.service.name = null // sasl.kerberos.ticket.renew.jitter = 0.05 // sasl.kerberos.ticket.renew.window.factor = 0.8 // sasl.login.callback.handler.class = null // sasl.login.class = null // sasl.login.refresh.buffer.seconds = 300 // sasl.login.refresh.min.period.seconds = 60 // sasl.login.refresh.window.factor = 0.8 // sasl.login.refresh.window.jitter = 0.05 // sasl.mechanism = GSSAPI // security.protocol = PLAINTEXT // security.providers = null // send.buffer.bytes = 131072 // ssl.cipher.suites = null // ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] // ssl.endpoint.identification.algorithm = https // ssl.key.password = null // ssl.keymanager.algorithm = SunX509 // ssl.keystore.location = null // ssl.keystore.password = null // ssl.keystore.type = JKS // ssl.protocol = TLS // ssl.provider = null // ssl.secure.random.implementation = null // ssl.trustmanager.algorithm = PKIX // ssl.truststore.location = null // ssl.truststore.password = null // ssl.truststore.type = JKS // transaction.timeout.ms = 60000 // transactional.id = null // value.serializer = class org.apache.kafka.common.serialization.StringSerializer }
5.2 Consumer
package com.xrh.extend.kafka; import java.util.Collections; import java.util.Iterator; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import javafx.util.Duration; public class Consumer { public static void main(String[] args) { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "阿里雲外網IP:9093"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.GROUP_ID_CONFIG, "group2"); props.put("security.protocol", "SSL"); props.put("ssl.truststore.location", "D:\\ca\\client.truststore.jks"); props.put("ssl.truststore.password", "mima123"); props.put("ssl.keystore.location", "D:\\ca\\client.keystore.jks"); props.put("ssl.keystore.password", "mima123"); props.setProperty("ssl.endpoint.identification.algorithm", ""); // p.put("auto.offset.reset", "latest"); // bootstrap.servers: kafka的地址。 // group.id:組名 不同組名可以重復消費。例如你先使用了組名A消費了kafka的1000條數據,但是你還想再次進行消費這1000條數據,並且不想重新去產生,那么這里你只需要更改組名就可以重復消費了。 // enable.auto.commit:是否自動提交,默認為true。 // auto.commit.interval.ms: 從poll(拉)的回話處理時長。 // session.timeout.ms:超時時間。 // max.poll.records:一次最大拉取的條數。 // auto.offset.reset:消費規則,默認earliest 。 // earliest: 當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費 。 // latest: 當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據 。 // none: topic各分區都存在已提交的offset時,從offset后開始消費;只要有一個分區不存在已提交的offset,則拋出異常。 // key.serializer: 鍵序列化,默認org.apache.kafka.common.serialization.StringDeserializer。 // value.deserializer:值序列化,默認org.apache.kafka.common.serialization.StringDeserializer。 KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); consumer.subscribe(Collections.singletonList(Producer.topic));// 訂閱消息 while(true){ ConsumerRecords<String, String> consumerDatas = consumer.poll(100); if( consumerDatas.count() > 0 ){ Iterator<ConsumerRecord<String, String>> consumerIter = consumerDatas.iterator(); while(consumerIter.hasNext()){ ConsumerRecord<String, String> consumerData = consumerIter.next(); System.out.printf("offset = %d, key = %s, value = %s%n", consumerData.offset(), consumerData.key(), consumerData.value()); } }else{ System.out.println("KafkaConsumer1 is waiting message..."); try { Thread.sleep(1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } }