java實現Kafka生產者示例


使用java實現Kafka的生產者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package com.lisg.kafkatest;
 
import java.util.Properties;
 
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.Partitioner;
import kafka.producer.ProducerConfig;
import kafka.serializer.StringEncoder;
 
/**
  * Kafka生產者
  * @author lisg
  *
  */
public class KafkaProducer {
 
     public static void main(String[] args) {
         
         Properties props = new Properties();
         //根據這個配置獲取metadata,不必是kafka集群上的所有broker,但最好至少有兩個
         props.put( "metadata.broker.list" , "vm1:9092,vm2:9092" );
         //消息傳遞到broker時的序列化方式
         props.put( "serializer.class" , StringEncoder. class .getName());
         //zk集群
         props.put( "zookeeper.connect" , "vm1:2181" );
         //是否獲取反饋
         //0是不獲取反饋(消息有可能傳輸失敗)
         //1是獲取消息傳遞給leader后反饋(其他副本有可能接受消息失敗)
         //-1是所有in-sync replicas接受到消息時的反饋
         props.put( "request.required.acks" , "1" );
//      props.put("partitioner.class", MyPartition.class.getName());
         
         //創建Kafka的生產者, key是消息的key的類型, value是消息的類型
         Producer<Integer, String> producer = new Producer<Integer, String>(
                 new ProducerConfig(props));
         
         int count = 0 ;
         while ( true ) {
             String message = "message-" + ++count;
             //消息主題是test
             KeyedMessage<Integer, String> keyedMessage = new KeyedMessage<Integer, String>( "test" , message);
             //message可以帶key, 根據key來將消息分配到指定區, 如果沒有key則隨機分配到某個區
//          KeyedMessage<Integer, String> keyedMessage = new KeyedMessage<Integer, String>("test", 1, message);
             producer.send(keyedMessage);
             System.out.println( "send: " + message);
             try {
                 Thread.sleep( 1000 );
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
         }
         
//      producer.close();
     }
 
}
 
/**
  * 自定義分區類
  *
  */
class MyPartition implements Partitioner {
 
     public int partition(Object key, int numPartitions) {
         return key.hashCode()%numPartitions;
     }
     
}






附件列表

     


    免責聲明!

    本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



     
    粵ICP備18138465號   © 2018-2025 CODEPRJ.COM