kafka集群六、java操作kafka(沒有密碼驗證)


系列導航

一、kafka搭建-單機版

二、kafka搭建-集群搭建

三、kafka集群增加密碼驗證

四、kafka集群權限增加ACL

五、kafka集群__consumer_offsets副本數修改

六、java操作kafka(沒有密碼驗證)

七、java操作kafka(有密碼驗證)

    kafka環境搭建好了如何通過代碼來訪問?

先介紹不需要密碼驗證的kafka集群如何操作

1、環境

包:kafka-clients-0.11.0.1.jar

jkd:1.7

2、kafka配置類

package nopassword; 

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;

 
/*
kafka沒有用戶名驗證的配置
*/
public class KafkaUtil {
 
    //kafka集群地址
    public static final String servers="PLAINTEXT://192.168.0.104:9092,PLAINTEXT://192.168.0.104:9092,PLAINTEXT://192.168.0.104:9092";
  
    //kafka集群生產者配置
    public static KafkaProducer<String, String> getProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers",servers );
        props.put("acks", "1");
        props.put("retries", 0);
        props.put("batch.size", 0);//16384
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");    
        KafkaProducer<String, String> kp = new KafkaProducer<String, String>(props);
        return kp;
    }
 
    //kafka集群消費者配置
    public static KafkaConsumer<String, String> getConsumer(String groupId) {
 
        Properties props = new Properties();
        props.put("bootstrap.servers", servers);
        props.put("auto.offset.reset", "earliest");
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "false");
        props.put("auto.commit.interval.ms", "100");
        props.put("max.partition.fetch.bytes", "10240");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> kc = new KafkaConsumer<String, String>(props);   
        return kc;
    }
}

3、生產者類ProducerClient

package nopassword; 

import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
 
 
public class ProducerClient {

    /**
     * @param args
     */
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        sendToKafka();
    }
    
    private static Producer<String, String> producer = KafkaUtil.getProducer();
    public static void sendToKafka( ) { 
        for(int i=0;i<5000;i++){
            try {
                final ProducerRecord<String, String> record = new ProducerRecord<String, String>("testTopic",
                        "d+key-" + i, "{\"name\":\"哈哈\",\"id\":\"218CF4630C2F8795\"}");
                Future<RecordMetadata> send = producer.send(record, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception e) {
                        if (e != null) {
                            e.printStackTrace();
                        }
                    }
                });

                System.out.println("sendToKafka-發送至Kafka:" + "d+key-" + i);

            } catch (Exception e) {
                e.printStackTrace();

            }
        }
        producer.close();  
        }
}

4、消費者類ConsumerClient

package nopassword; 

import java.util.Arrays;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
 
/*
消費者
*/
public class ConsumerClient {

    public  static KafkaConsumer<String, String> consumer = null;
    
    public static void main(String[] args) {
        fecthKafka();
    }
     
    public static void fecthKafka( ) {
         consumer = KafkaUtil.getConsumer("testGroup"); //group   
         consumer.subscribe(Arrays.asList("testTopic"));//topics  
 
         int i=0;
         while (true) {
             ConsumerRecords<String, String> records ;
             try {
                 records = consumer.poll(Long.MAX_VALUE);//毫秒
             }catch (Exception e){
                 e.printStackTrace();
                 continue;
             }

             for (ConsumerRecord<String, String> record : records) {
                  System.out.println("fetched from partition " + record.partition() + ", offset: " + record.offset() + ",key: " + record.key() + ",value:" + record.value() );
                  i++;
                  System.out.println(i);
             }

             try {
                 consumer.commitSync();
             } catch (Exception e) {
                 e.printStackTrace();
                 continue;
             }
             
          
             
       }
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM