Installing and Deploying Kafka 2.12 on Linux, with a Java Client


1. Download kafka_2.12-2.4.0.tgz and extract it to /home/kafka_2.12-2.4.0

2. Configure Kafka

2.1 Create the Kafka log directory: /home/kafka_2.12-2.4.0/logs

2.2 Create the ZooKeeper data directory: /tmp/zookeeper

2.3 Edit /home/kafka_2.12-2.4.0/config/server.properties as follows (the SSL certificates are generated in section 3):

ssl.keystore.location=/home/ca/server/server.keystore.jks
ssl.keystore.password=mima123
ssl.key.password=mima123
ssl.truststore.location=/home/ca/trust/server.truststore.jks
ssl.truststore.password=mima123
ssl.client.auth=required
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.keystore.type=JKS
ssl.truststore.type=JKS
ssl.endpoint.identification.algorithm=
#security.inter.broker.protocol=SSL
inter.broker.listener.name=SSL

############################# Server Basics #############################
broker.id=0
listeners=SSL://<Aliyun internal IP>:9093
advertised.listeners=SSL://<Aliyun public IP>:9093
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

############################# Log Basics #############################

log.dirs=/home/kafka_2.12-2.4.0/logs
num.partitions=1
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

#log.flush.interval.messages=10000
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################
log.retention.hours=168
#log.retention.bytes=1073741824
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000

############################# Zookeeper #############################
zookeeper.connect=<Aliyun internal IP>:2181
zookeeper.connection.timeout.ms=6000

############################# Group Coordinator Settings #############################
group.initial.rebalance.delay.ms=0
delete.topic.enable=true
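
For reference only: judging from the /etc/hosts entry in section 2.5 and the client config dump in section 5, the placeholders above would be filled in roughly like this (the concrete IPs are taken from elsewhere in this post and are only an illustration):

listeners=SSL://172.18.54.18:9093
advertised.listeners=SSL://39.108.124.173:9093
zookeeper.connect=172.18.54.18:2181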

2.4 Edit /home/kafka_2.12-2.4.0/config/zookeeper.properties

dataDir=/tmp/zookeeper
clientPort=2181
maxClientCnxns=0
admin.enableServer=false

2.5 Edit /etc/hosts and add the kafka-single mapping (the last line below); the IP is the Aliyun internal IP

127.0.0.1    localhost    localhost.localdomain    localhost4    localhost4.localdomain4
::1    localhost    localhost.localdomain    localhost6    localhost6.localdomain6
172.18.54.18    iZwz9gq8vhwxtgpg21yonsZ    iZwz9gq8vhwxtgpg21yonsZ
172.18.54.18 kafka-single
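
Optionally, verify that the kafka-single hostname resolves on the broker host (output will vary by system):

getent hosts kafka-single
ping -c 1 kafka-single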

3. Generate the SSL certificate files

3.1 Create four directories: /home/ca/root, /home/ca/trust, /home/ca/server, /home/ca/client

3.2 Issue the certificates

Step 1: generate server.keystore.jks (the broker-side keystore)
keytool -keystore /home/ca/server/server.keystore.jks -alias ds-kafka-single -validity 10000 -genkey -keypass mima123 -keyalg RSA -dname "CN=kafka-single,OU=qmx,O=qmx,L=beijing,S=beijing,C=cn" -storepass mima123 -ext SAN=DNS:kafka-single

Step 2: generate the CA certificate (all certificates are signed by this CA to secure the whole chain)
openssl req -new -x509 -keyout /home/ca/root/ca-key -out /home/ca/root/ca-cert -days 10000 -passout pass:mima123 -subj "/C=cn/ST=beijing/L=beijing/O=qmx/OU=qmx/CN=kafka-single"

Step 3: create a client truststore from the CA certificate
keytool -keystore /home/ca/trust/client.truststore.jks -alias CARoot -import -file /home/ca/root/ca-cert -storepass mima123

Step 4: create a server truststore from the CA certificate
keytool -keystore /home/ca/trust/server.truststore.jks -alias CARoot -import -file /home/ca/root/ca-cert -storepass mima123

Step 5: sign the server certificate
Step 5.1: export the server certificate signing request server.cert-file
keytool -keystore /home/ca/server/server.keystore.jks -alias ds-kafka-single -certreq -file /home/ca/server/server.cert-file -storepass mima123
Step 5.2: sign the server certificate with the CA
openssl x509 -req -CA /home/ca/root/ca-cert -CAkey /home/ca/root/ca-key -in /home/ca/server/server.cert-file -out /home/ca/server/server.cert-signed -days 10000 -CAcreateserial -passin pass:mima123
Step 5.3: import the CA certificate into the server keystore
keytool -keystore /home/ca/server/server.keystore.jks -alias CARoot -import -file /home/ca/root/ca-cert -storepass mima123
Step 5.4: import the signed server certificate into the server keystore
keytool -keystore /home/ca/server/server.keystore.jks -alias ds-kafka-single -import -file /home/ca/server/server.cert-signed -storepass mima123
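
As an optional sanity check, list the server keystore; after the steps above it should contain both the CARoot entry and the signed ds-kafka-single entry:

keytool -list -v -keystore /home/ca/server/server.keystore.jks -storepass mima123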

Client SSL certificate issuance
Step 1: generate client.keystore.jks
keytool -keystore /home/ca/client/client.keystore.jks -alias ds-kafka-single -validity 10000 -genkey -keyalg RSA -keypass mima123 -dname "CN=kafka-single,OU=qmx,O=qmx,L=beijing,S=beijing,C=cn" -ext SAN=DNS:kafka-single -storepass mima123
Step 2: export the client certificate signing request client.cert-file
keytool -keystore /home/ca/client/client.keystore.jks -alias ds-kafka-single -certreq -file /home/ca/client/client.cert-file -storepass mima123
Step 3: sign the client certificate with the CA
openssl x509 -req -CA /home/ca/root/ca-cert -CAkey /home/ca/root/ca-key -in /home/ca/client/client.cert-file -out /home/ca/client/client.cert-signed -days 10000 -CAcreateserial -passin pass:mima123
Step 4: import the CA certificate into the client keystore
keytool -keystore /home/ca/client/client.keystore.jks -alias CARoot -import -file /home/ca/root/ca-cert -storepass mima123
Step 5: import the signed client certificate into the client keystore
keytool -keystore /home/ca/client/client.keystore.jks -alias ds-kafka-single -import -file /home/ca/client/client.cert-signed -storepass mima123
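
Optionally, the signed client certificate can be verified against the CA before use (the same check works for the server certificate):

openssl verify -CAfile /home/ca/root/ca-cert /home/ca/client/client.cert-signed

The client.keystore.jks and client.truststore.jks files then need to be copied to the machine that runs the Java client (D:\ca in section 5).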

4. Starting and stopping Kafka and ZooKeeper

cd /home/kafka_2.12-2.4.0/bin

Start ZooKeeper:

./zookeeper-server-start.sh /home/kafka_2.12-2.4.0/config/zookeeper.properties &

Start Kafka:

./kafka-server-start.sh /home/kafka_2.12-2.4.0/config/server.properties &

List topics:

./kafka-topics.sh --list --zookeeper localhost:2181

Stop Kafka:

./kafka-server-stop.sh

Stop ZooKeeper:

./zookeeper-server-stop.sh

List topics (using the internal IP):

./kafka-topics.sh --list --zookeeper 172.18.54.18:2181

Describe a topic:

./kafka-topics.sh --describe --zookeeper 172.18.54.18:2181 --topic topic1
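
The commands above assume topic1 already exists; this post never shows its creation. With Kafka 2.4 a topic can still be created through ZooKeeper, for example:

./kafka-topics.sh --create --zookeeper 172.18.54.18:2181 --replication-factor 1 --partitions 1 --topic topic1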

Console producer:

./kafka-console-producer.sh --broker-list 172.18.54.18:9092 --topic topic1

Console consumer:

./kafka-console-consumer.sh --bootstrap-server 172.18.54.18:9092 --topic topic1 --from-beginning
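
Note that the broker configured in section 2.3 only exposes an SSL listener on port 9093, so the plain-text commands on port 9092 above only work if a PLAINTEXT listener is also added. To reach the SSL listener from the console tools, pass a client properties file, roughly like this (the file name and path are arbitrary):

# /home/ca/client-ssl.properties (illustrative path)
security.protocol=SSL
ssl.truststore.location=/home/ca/trust/client.truststore.jks
ssl.truststore.password=mima123
ssl.keystore.location=/home/ca/client/client.keystore.jks
ssl.keystore.password=mima123
ssl.key.password=mima123
ssl.endpoint.identification.algorithm=

./kafka-console-producer.sh --broker-list kafka-single:9093 --topic topic1 --producer.config /home/ca/client-ssl.properties
./kafka-console-consumer.sh --bootstrap-server kafka-single:9093 --topic topic1 --from-beginning --consumer.config /home/ca/client-ssl.properties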

5. Java client integration

5.1 Producer

package com.xrh.extend.kafka;

import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class Producer {

    public static String topic = "topic2"; // topic name

    public static void main(String[] args) throws InterruptedException {
        
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<Aliyun public IP>:9093"); // Kafka address; separate multiple brokers with commas
//        acks: the acknowledgement setting (the default for this client is 1).
//        acks=0: the producer does not wait for any response from Kafka.
//        acks=1: the leader writes the message to its local log but does not wait for the other replicas to respond.
//        acks=all: the leader waits for all in-sync replicas to acknowledge; the message is not lost unless the whole cluster goes down. This is the strongest guarantee.
        props.put("acks", "all");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put("security.protocol", "SSL");
        props.put("ssl.truststore.location", "D:\\ca\\client.truststore.jks");
        props.put("ssl.truststore.password", "mima123");
        props.put("ssl.keystore.location", "D:\\ca\\client.keystore.jks");
        props.put("ssl.keystore.password", "mima123");
        props.setProperty("ssl.endpoint.identification.algorithm", "");
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);

        try {
            int i = 1;
            while (i < 20) {
                String msg = "Test Hello," + new Random().nextInt(100);
 
                ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic , "key1", msg);
                kafkaProducer.send(record, new MyProducerCallBack());
                System.out.println("消息發送成功:" + msg);
                ++ i;
                Thread.sleep(500);
            }
        } finally {
            kafkaProducer.close();
        }

    }
    
    private static class MyProducerCallBack implements Callback {
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
           if(null != e){
             e.printStackTrace();
             return;
          }
          System.out.println("時間戳,主題,分區,位移: " + recordMetadata.timestamp() 
              + ", " + recordMetadata.topic() + "," + recordMetadata.partition() 
              + " " + recordMetadata.offset());
       }
    };
    
//    acks = 1
//    batch.size = 16384 // when several messages go to the same partition, the producer batches them into one request, which improves the efficiency of the client and the producer.
//    bootstrap.servers = [39.108.124.173:9092]
//    buffer.memory = 33554432
//    client.dns.lookup = default
//    client.id = 
//    compression.type = none
//    connections.max.idle.ms = 540000
//    delivery.timeout.ms = 120000
//    enable.idempotence = false
//    interceptor.classes = []
//    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
//    linger.ms = 0
//    max.block.ms = 60000
//    max.in.flight.requests.per.connection = 5
//    max.request.size = 1048576
//    metadata.max.age.ms = 300000
//    metric.reporters = []
//    metrics.num.samples = 2
//    metrics.recording.level = INFO
//    metrics.sample.window.ms = 30000
//    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
//    receive.buffer.bytes = 32768
//    reconnect.backoff.max.ms = 1000
//    reconnect.backoff.ms = 50
//    request.timeout.ms = 30000
//    retries = 2147483647   // with a value greater than 0, the client resends messages whose delivery failed.
//    retry.backoff.ms = 100
//    sasl.client.callback.handler.class = null
//    sasl.jaas.config = null
//    sasl.kerberos.kinit.cmd = /usr/bin/kinit
//    sasl.kerberos.min.time.before.relogin = 60000
//    sasl.kerberos.service.name = null
//    sasl.kerberos.ticket.renew.jitter = 0.05
//    sasl.kerberos.ticket.renew.window.factor = 0.8
//    sasl.login.callback.handler.class = null
//    sasl.login.class = null
//    sasl.login.refresh.buffer.seconds = 300
//    sasl.login.refresh.min.period.seconds = 60
//    sasl.login.refresh.window.factor = 0.8
//    sasl.login.refresh.window.jitter = 0.05
//    sasl.mechanism = GSSAPI
//    security.protocol = PLAINTEXT
//    security.providers = null
//    send.buffer.bytes = 131072
//    ssl.cipher.suites = null
//    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
//    ssl.endpoint.identification.algorithm = https
//    ssl.key.password = null
//    ssl.keymanager.algorithm = SunX509
//    ssl.keystore.location = null
//    ssl.keystore.password = null
//    ssl.keystore.type = JKS
//    ssl.protocol = TLS
//    ssl.provider = null
//    ssl.secure.random.implementation = null
//    ssl.trustmanager.algorithm = PKIX
//    ssl.truststore.location = null
//    ssl.truststore.password = null
//    ssl.truststore.type = JKS
//    transaction.timeout.ms = 60000
//    transactional.id = null
//    value.serializer = class org.apache.kafka.common.serialization.StringSerializer
    
}
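
The producer above sends asynchronously and reports results through the callback. If you would rather block until the broker acknowledges each record, send() returns a Future that can simply be waited on. A minimal sketch (this helper is hypothetical and reuses the imports and the KafkaProducer built in main above):

    // Hypothetical helper: send one record and block until the broker acknowledges it.
    static void sendSync(KafkaProducer<String, String> producer, String topic, String key, String value)
            throws Exception {
        RecordMetadata meta = producer.send(new ProducerRecord<>(topic, key, value)).get();
        System.out.println("acked: partition=" + meta.partition() + ", offset=" + meta.offset());
    }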

5.2 Consumer

package com.xrh.extend.kafka;

import java.util.Collections;
import java.util.Iterator;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;

public class Consumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<Aliyun public IP>:9093");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");
        props.put("security.protocol", "SSL");
        props.put("ssl.truststore.location", "D:\\ca\\client.truststore.jks");
        props.put("ssl.truststore.password", "mima123");
        props.put("ssl.keystore.location", "D:\\ca\\client.keystore.jks");
        props.put("ssl.keystore.password", "mima123");
        props.setProperty("ssl.endpoint.identification.algorithm", "");
//        p.put("auto.offset.reset", "latest");
//        bootstrap.servers: the Kafka address.
//        group.id: the consumer group name. Different groups consume independently; e.g. if group A has already consumed 1000 messages and you want to consume the same 1000 messages again without reproducing them, just switch to a new group name.
//        enable.auto.commit: whether offsets are committed automatically; the default is true.
//        auto.commit.interval.ms: how often offsets are auto-committed.
//        session.timeout.ms: the consumer session timeout.
//        max.poll.records: the maximum number of records returned by a single poll.
//        auto.offset.reset: what to do when there is no committed offset; the default is latest.
//        earliest: if a partition has a committed offset, consume from it; otherwise start from the beginning.
//        latest: if a partition has a committed offset, consume from it; otherwise consume only newly produced records.
//        none: if every partition has a committed offset, consume from there; if any partition lacks one, throw an exception.
//        key.deserializer: the key deserializer, here org.apache.kafka.common.serialization.StringDeserializer.
//        value.deserializer: the value deserializer, here org.apache.kafka.common.serialization.StringDeserializer.

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Collections.singletonList(Producer.topic)); // subscribe to the topic
        while(true){
            
            ConsumerRecords<String, String> consumerDatas = consumer.poll(100);
            if( consumerDatas.count() > 0 ){
                Iterator<ConsumerRecord<String, String>> consumerIter = consumerDatas.iterator();
                while(consumerIter.hasNext()){
                    ConsumerRecord<String, String>  consumerData = consumerIter.next();
                    System.out.printf("offset = %d, key = %s, value = %s%n", 
                            consumerData.offset(), consumerData.key(), consumerData.value());
                }
            }else{
                System.out.println("KafkaConsumer1 is waiting message...");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

       }
    }
    
}
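
The consumer above relies on auto-commit (enable.auto.commit defaults to true). If offsets should only be committed after the records have actually been processed, a common variant is to disable auto-commit and call commitSync() manually. A minimal sketch under that assumption (same Properties as above, only the changed parts shown):

    // Sketch of manual offset commit; assumes auto-commit has been switched off.
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList(Producer.topic));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> r : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n", r.offset(), r.key(), r.value());
        }
        if (!records.isEmpty()) {
            consumer.commitSync(); // commit only after the whole batch has been processed
        }
    }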

 

