使用java實現Kafka的生產者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
|
package
com.lisg.kafkatest;
import
java.util.Properties;
import
kafka.javaapi.producer.Producer;
import
kafka.producer.KeyedMessage;
import
kafka.producer.Partitioner;
import
kafka.producer.ProducerConfig;
import
kafka.serializer.StringEncoder;
/**
* Kafka生產者
* @author lisg
*
*/
public
class
KafkaProducer {
public
static
void
main(String[] args) {
Properties props =
new
Properties();
//根據這個配置獲取metadata,不必是kafka集群上的所有broker,但最好至少有兩個
props.put(
"metadata.broker.list"
,
"vm1:9092,vm2:9092"
);
//消息傳遞到broker時的序列化方式
props.put(
"serializer.class"
, StringEncoder.
class
.getName());
//zk集群
props.put(
"zookeeper.connect"
,
"vm1:2181"
);
//是否獲取反饋
//0是不獲取反饋(消息有可能傳輸失敗)
//1是獲取消息傳遞給leader后反饋(其他副本有可能接受消息失敗)
//-1是所有in-sync replicas接受到消息時的反饋
props.put(
"request.required.acks"
,
"1"
);
// props.put("partitioner.class", MyPartition.class.getName());
//創建Kafka的生產者, key是消息的key的類型, value是消息的類型
Producer<Integer, String> producer =
new
Producer<Integer, String>(
new
ProducerConfig(props));
int
count =
0
;
while
(
true
) {
String message =
"message-"
+ ++count;
//消息主題是test
KeyedMessage<Integer, String> keyedMessage =
new
KeyedMessage<Integer, String>(
"test"
, message);
//message可以帶key, 根據key來將消息分配到指定區, 如果沒有key則隨機分配到某個區
// KeyedMessage<Integer, String> keyedMessage = new KeyedMessage<Integer, String>("test", 1, message);
producer.send(keyedMessage);
System.out.println(
"send: "
+ message);
try
{
Thread.sleep(
1000
);
}
catch
(InterruptedException e) {
e.printStackTrace();
}
}
// producer.close();
}
}
/**
* 自定義分區類
*
*/
class
MyPartition
implements
Partitioner {
public
int
partition(Object key,
int
numPartitions) {
return
key.hashCode()%numPartitions;
}
}
|
附件列表