This assumes you have already installed and configured Zookeeper and Kafka.
To keep the directory tree tidy, the Zookeeper section of my Kafka configuration file adds a dedicated node for storing Kafka's information:
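The original screenshot of this setting is missing; as a sketch, the relevant line in `server.properties` might look like the following (the hosts are taken from the `bootstrap.servers` list used later in this post, and the `/kafka` chroot path is an assumption):

```properties
# Assumed server.properties snippet: the /kafka chroot keeps all of
# Kafka's znodes under one node instead of cluttering Zookeeper's root.
zookeeper.connect=192.168.127.129:2181,192.168.127.130:2181,192.168.127.131:2181/kafka
```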
Start Zookeeper, then start Kafka.
Zookeeper's node tree: there is a dedicated node under the root for Kafka's data. [I hadn't configured this before, and all of Kafka's znodes ended up directly under the root node, which was a mess.]
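The node tree was originally shown as a screenshot; you can inspect it yourself with Zookeeper's CLI (the `/kafka` path below assumes the chroot configured above):

```
# Inspect the node tree with Zookeeper's command-line client
bin/zkCli.sh -server 192.168.127.129:2181
ls /          # the dedicated kafka node appears alongside /zookeeper
ls /kafka     # brokers, topics, consumer metadata etc. live under here
```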
Now for the code.
Dependencies:
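The dependency list was originally an image; as a rough sketch, a Maven setup for this example might look like the following (the version numbers are assumptions, so pick ones matching your broker and Log4j2 release):

```xml
<!-- Hypothetical Maven dependencies; versions are assumptions -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-api</artifactId>
    <version>2.8.2</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>2.8.2</version>
</dependency>
```

Note that Log4j2's KafkaAppender itself depends on `kafka-clients`, so that artifact is needed even on the producer side where no Kafka API is called directly.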
Log4j2 configuration file:
<?xml version="1.0" encoding="UTF-8"?>
<Configuration monitorInterval="1800">
    <Filter type="ThresholdFilter" level="trace" />
    <Appenders>
        <Console name="console" target="SYSTEM_OUT">
            <!-- The console prints only messages at this level or above (onMatch); everything else is denied (onMismatch) -->
            <ThresholdFilter level="trace" onMatch="ACCEPT" onMismatch="DENY" />
            <PatternLayout pattern="%d %-5p [%t] %C{2} (%F:%L) - %m%n" />
        </Console>
        <Kafka name="Kafka" topic="my-topic" syncSend="false">
            <PatternLayout pattern="%date %message" />
            <Property name="bootstrap.servers">192.168.127.129:9092,192.168.127.130:9092,192.168.127.131:9092</Property>
        </Kafka>
    </Appenders>
    <Loggers>
        <Root level="WARN">
            <!-- TRACE < DEBUG < INFO < WARN < ERROR < FATAL -->
            <AppenderRef ref="console" />
        </Root>
        <Logger name="kafkaLog" level="trace">
            <AppenderRef ref="Kafka" />
        </Logger>
    </Loggers>
</Configuration>
Producer:
package learn.kafka.log4j;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class SimpleProducer {

    // Must match the Logger name "kafkaLog" declared in log4j2.xml
    private static final Logger log = LogManager.getLogger("kafkaLog");

    public static void main(String[] args) {
        for (int i = 10; i < 20; i++) {
            log.info("Hello---" + i);
            try {
                Thread.sleep(1000); // send one message per second
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
Consumer:
package learn.kafka.log4j;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SimpleConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.127.129:9092,192.168.127.130:9092,192.168.127.131:9092");
        props.put("group.id", "testGroup");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // Topics to subscribe to; separate multiple topics with commas
        consumer.subscribe(Arrays.asList("my-topic"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
        }
        // consumer.close(); // unreachable while the loop above runs forever
    }
}
Run the consumer first and let it sit listening.
Then run the producer to produce messages.
You will see a line of output every second; the log message appears after `value =`.