Java操作Kafka创建主题、生产者、消费者


环境

JDK 1.8
Zookeeper 3.6.1
Kafka 2.6.0

引入依赖

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>2.6.0</version>
</dependency>

<dependency>
  <groupId>org.slf4j</groupId>
  <artifactId>slf4j-nop</artifactId>
  <version>1.7.25</version>
</dependency>

<dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-databind</artifactId>
  <version>2.9.5</version>
</dependency>

创建主题

/**
 * Creates the topic "test1" with 5 partitions and a replication factor of 1,
 * then waits for the broker to confirm the outcome.
 *
 * @param args unused
 */
public static void main(String[] args) {
    Properties props = new Properties();
    // Kafka broker address. Producer-only settings (acks, retries, serializers, ...)
    // are not used by the admin client, so they are not set here.
    props.put("bootstrap.servers", "192.168.25.132:9092");

    // try-with-resources closes the admin client even when topic creation fails.
    try (AdminClient client = AdminClient.create(props)) {
        // Topic "test1" with 5 partitions and replication factor 1.
        NewTopic topic = new NewTopic("test1", 5, (short) 1);
        // createTopics() is asynchronous; all().get() blocks until the request has
        // finished and rethrows any failure instead of dropping it silently.
        client.createTopics(Arrays.asList(topic)).all().get();
    } catch (InterruptedException e) {
        // Restore the interrupt flag so the caller can still observe it.
        Thread.currentThread().interrupt();
        System.err.println("interrupted while creating topic test1");
    } catch (java.util.concurrent.ExecutionException e) {
        // The real reason (e.g. the topic already exists) is carried as the cause.
        System.err.println("failed to create topic test1: " + e.getCause());
    }
}

查看结果:在ZooKeeper路径/brokers/topics下新增了节点test1,即刚才创建的主题。
(原文此处为ZooKeeper节点的截图)

生产者

/**
 * Sends 20 keyed messages ("key1".."key20" / "message1".."message20") to the
 * topic "test1" asynchronously and reports any message that fails to be delivered.
 *
 * @param args unused
 */
public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.25.132:9092");
    props.put(ProducerConfig.ACKS_CONFIG, "all");
    props.put(ProducerConfig.RETRIES_CONFIG, 0);
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
    props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    // try-with-resources guarantees close() runs, even if send() throws.
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
        // Send 20 messages asynchronously.
        for (int i = 1; i <= 20; i++) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("test1", "key" + i, "message" + i);
            // Without a callback a failed send would go unnoticed, so report errors here.
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    System.err.println("failed to send " + record.key() + ": " + exception);
                }
            });
        }
    }
}

先启动控制台的消费者,监听topic=test1的数据

kafka-console-consumer.sh --bootstrap-server 192.168.25.132:9092 --topic test1

然后运行生产者程序,这时消费端会输出消息信息

[root@cluster01 ~]# kafka-console-consumer.sh --bootstrap-server 192.168.25.132:9092 --topic test1
message1
message2
message3
message4
message5
message6
message7
message8
message9
message10
message11
message12
message13
message14
message15
message16
message17
message18
message19
message20

带回调函数的生产者

回调函数会在生产者收到ack或发送失败时被调用,把上面的producer.send(record)加上回调参数即可

producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e == null){
            System.out.println("success:" + recordMetadata.offset());
        }
        else{
            e.printStackTrace();
        }
    }
});

同步发送:消息发送后会阻塞当前线程,直到收到ack之后才继续发送下一条

// Append get() to the send call to wait for its result before continuing.
producer.send(record).get();

消费者

/**
 * Subscribes to the topic "test1" as a member of consumer group "group-1",
 * prints every record it receives and commits offsets manually after each batch.
 * The loop runs until the process is stopped or an exception is thrown.
 *
 * @param args unused
 */
public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.25.132:9092");
    // Consumers that share the same group id belong to the same consumer group.
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-1");
    // Auto-commit is turned OFF: offsets are committed explicitly in the loop below.
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    // try-with-resources closes the consumer if the polling loop exits with an exception.
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
        // Consume the topic "test1".
        consumer.subscribe(Arrays.asList("test1"));
        while (true) {
            System.out.println("consumer is polling");
            // Wait up to 5 seconds for new records.
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(5000));
            for (ConsumerRecord<String, String> record : records) {
                // Include the partition: ordering only holds within a single partition.
                System.out.printf("partition=%d,offset=%d,key=%s,value=%s%n",
                        record.partition(), record.offset(), record.key(), record.value());
            }
            // Synchronous commit: retries on failure.
            // consumer.commitAsync() is the asynchronous alternative; it does not retry.
            consumer.commitSync();
        }
    }
}

运行消费者,看到输出“consumer is polling”之后,再运行生产者,结果如下:

D:\Java\jdk1.8.0_131\bin\java.exe "-javaagent:D:\Idea\IntelliJ IDEA 2019.1.1\lib\idea_rt.jar=52238:D:\Idea\IntelliJ IDEA 2019.1.1\bin" -Dfile.encoding=UTF-8 -classpath D:\Java\jdk1.8.0_131\jre\lib\charsets.jar;D:\Java\jdk1.8.0_131\jre\lib\deploy.jar;D:\Java\jdk1.8.0_131\jre\lib\ext\access-bridge-64.jar;D:\Java\jdk1.8.0_131\jre\lib\ext\cldrdata.jar;D:\Java\jdk1.8.0_131\jre\lib\ext\dnsns.jar;D:\Java\jdk1.8.0_131\jre\lib\ext\jaccess.jar;D:\Java\jdk1.8.0_131\jre\lib\ext\jfxrt.jar;D:\Java\jdk1.8.0_131\jre\lib\ext\localedata.jar;D:\Java\jdk1.8.0_131\jre\lib\ext\nashorn.jar;D:\Java\jdk1.8.0_131\jre\lib\ext\sunec.jar;D:\Java\jdk1.8.0_131\jre\lib\ext\sunjce_provider.jar;D:\Java\jdk1.8.0_131\jre\lib\ext\sunmscapi.jar;D:\Java\jdk1.8.0_131\jre\lib\ext\sunpkcs11.jar;D:\Java\jdk1.8.0_131\jre\lib\ext\zipfs.jar;D:\Java\jdk1.8.0_131\jre\lib\javaws.jar;D:\Java\jdk1.8.0_131\jre\lib\jce.jar;D:\Java\jdk1.8.0_131\jre\lib\jfr.jar;D:\Java\jdk1.8.0_131\jre\lib\jfxswt.jar;D:\Java\jdk1.8.0_131\jre\lib\jsse.jar;D:\Java\jdk1.8.0_131\jre\lib\management-agent.jar;D:\Java\jdk1.8.0_131\jre\lib\plugin.jar;D:\Java\jdk1.8.0_131\jre\lib\resources.jar;D:\Java\jdk1.8.0_131\jre\lib\rt.jar;F:\JavaTest\ConsoleTrain\kafka\target\classes;E:\.m2\repository\org\apache\kafka\kafka-clients\2.6.0\kafka-clients-2.6.0.jar;E:\.m2\repository\com\github\luben\zstd-jni\1.4.4-7\zstd-jni-1.4.4-7.jar;E:\.m2\repository\org\lz4\lz4-java\1.7.1\lz4-java-1.7.1.jar;E:\.m2\repository\org\xerial\snappy\snappy-java\1.1.7.3\snappy-java-1.1.7.3.jar;E:\.m2\repository\org\slf4j\slf4j-api\1.7.30\slf4j-api-1.7.30.jar;E:\.m2\repository\org\slf4j\slf4j-nop\1.7.25\slf4j-nop-1.7.25.jar;E:\.m2\repository\com\fasterxml\jackson\core\jackson-databind\2.9.5\jackson-databind-2.9.5.jar;E:\.m2\repository\com\fasterxml\jackson\core\jackson-annotations\2.9.0\jackson-annotations-2.9.0.jar;E:\.m2\repository\com\fasterxml\jackson\core\jackson-core\2.9.5\jackson-core-2.9.5.jar com.train.consumer.SimpleConsumer
consumer is polling
consumer is polling
partition=1,offset=15,key=key2,value=message2
partition=1,offset=16,key=key6,value=message6
partition=1,offset=17,key=key12,value=message12
consumer is polling
partition=0,offset=25,key=key1,value=message1
partition=0,offset=26,key=key3,value=message3
partition=0,offset=27,key=key10,value=message10
partition=0,offset=28,key=key15,value=message15
partition=0,offset=29,key=key19,value=message19
partition=3,offset=20,key=key5,value=message5
partition=3,offset=21,key=key11,value=message11
partition=3,offset=22,key=key14,value=message14
partition=3,offset=23,key=key18,value=message18
partition=2,offset=15,key=key7,value=message7
partition=2,offset=16,key=key8,value=message8
partition=2,offset=17,key=key16,value=message16
partition=4,offset=25,key=key4,value=message4
partition=4,offset=26,key=key9,value=message9
partition=4,offset=27,key=key13,value=message13
partition=4,offset=28,key=key17,value=message17
partition=4,offset=29,key=key20,value=message20
consumer is polling

注意观察上面的输出:consumer消费数据时,只保证同一个分区内的数据是有序的。当1个consumer同时消费多个分区的数据时,不同分区之间的message不能保证顺序。


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM