java实现Kafka生产者示例


使用java实现Kafka的生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package com.lisg.kafkatest;
 
import java.util.Properties;
 
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.Partitioner;
import kafka.producer.ProducerConfig;
import kafka.serializer.StringEncoder;
 
/**
  * Kafka生产者
  * @author lisg
  *
  */
public class KafkaProducer {
 
     public static void main(String[] args) {
         
         Properties props = new Properties();
         //根据这个配置获取metadata,不必是kafka集群上的所有broker,但最好至少有两个
         props.put( "metadata.broker.list" , "vm1:9092,vm2:9092" );
         //消息传递到broker时的序列化方式
         props.put( "serializer.class" , StringEncoder. class .getName());
         //zk集群
         props.put( "zookeeper.connect" , "vm1:2181" );
         //是否获取反馈
         //0是不获取反馈(消息有可能传输失败)
         //1是获取消息传递给leader后反馈(其他副本有可能接受消息失败)
         //-1是所有in-sync replicas接受到消息时的反馈
         props.put( "request.required.acks" , "1" );
//      props.put("partitioner.class", MyPartition.class.getName());
         
         //创建Kafka的生产者, key是消息的key的类型, value是消息的类型
         Producer<Integer, String> producer = new Producer<Integer, String>(
                 new ProducerConfig(props));
         
         int count = 0 ;
         while ( true ) {
             String message = "message-" + ++count;
             //消息主题是test
             KeyedMessage<Integer, String> keyedMessage = new KeyedMessage<Integer, String>( "test" , message);
             //message可以带key, 根据key来将消息分配到指定区, 如果没有key则随机分配到某个区
//          KeyedMessage<Integer, String> keyedMessage = new KeyedMessage<Integer, String>("test", 1, message);
             producer.send(keyedMessage);
             System.out.println( "send: " + message);
             try {
                 Thread.sleep( 1000 );
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
         }
         
//      producer.close();
     }
 
}
 
/**
  * 自定义分区类
  *
  */
class MyPartition implements Partitioner {
 
     public int partition(Object key, int numPartitions) {
         return key.hashCode()%numPartitions;
     }
     
}






附件列表

     


    免责声明!

    本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



     
    粤ICP备18138465号  © 2018-2025 CODEPRJ.COM