搭建環境略(偽集群即可以),但要注意Kafka的配置必須配置的,少配了也一樣可以用,但是只能單機使用,外部機器無法連接,網上也有說。
host.name=192.168.1.30
advertised.host.name=192.168.1.30
interfaceshost.name=192.168.1.30
0.10.0.0應該和0.9一樣缺少log4j的依賴,不能直接log4j TO kafka。 想用的可以依賴kafka-log4j-appender此包即可,或者flume協同
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-log4j-appender</artifactId>
<version>0.10.0.0</version>
</dependency>
客戶端命令:
消息者(全)
kafka-console-consumer.sh --zookeeper localhost:2181 --topic my-topic --from-beginning
生產者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic
JAVA生產、消費(直接上官方例子)
生產都
-
Properties props = new Properties();
-
props.put("bootstrap.servers", "localhost:9092");
-
props.put("acks", "all");
-
props.put("retries ", 1);
-
props.put("buffer.memory", 33554432);
-
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-
-
Producer<String, String> producer = new KafkaProducer<>(props);
-
for(int i = 0; i < 100; i++)
-
producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
-
producer.close();
消費者
-
-
volatile RUNNING =true;
-
-
Properties props = new Properties();
-
props.put("bootstrap.servers", "localhost:9092");
-
props.put("group.id", "test");//不同ID 可以同時訂閱消息
-
props.put("enable.auto.commit", "false");
-
props.put("auto.commit.interval.ms", "1000");
-
props.put("session.timeout.ms", "30000");
-
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
-
consumer.subscribe(Arrays.asList("foo", "bar" , " my-topic "));//訂閱TOPIC
-
try {
-
while(RUNNING) {//輪詢
-
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
-
for (TopicPartition partition : records.partitions()) {
-
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
-
for (ConsumerRecord<String, String> record : partitionRecords) {
-
//可以自定義Handler,處理對應的TOPIC消息(partitionRecords.key())
-
System.out.println(record.offset() + ": " + record.value());
-
}
-
consumer.commitSync();//同步
-
}
-
}
-
} finally {
-
consumer.close();
-
}
總結:已經服務化的東西,只能說,一:學學配置,二:學學使用方法,從中增加需要的邏輯。一般都是黑箱,要改底層,得先遇到場景。用起來比較容易