java向linux的kafka發送消息 並接收消息實例


1.首先要關閉linux系統上的防火牆

service iptables status可以查看到iptables服務的當前狀態。

在此說一下關於啟動和關閉防火牆的命令:
1) 重啟后生效
開啟: chkconfig iptables on
關閉: chkconfig iptables off
2) 即時生效,重啟后失效
開啟: service iptables start
關閉: service iptables stop

2.開啟zookeeper服務和kafka服務,在之前的隨筆中有

3.開啟eclipse,添加好jar包,弄好環境,kafka開發需要的jar包列表自己百度,也可以用maven管理

4.寫producer類

package kafka;

import java.util.Properties;
import java.util.concurrent.TimeUnit;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.StringEncoder;

public class kafkaProducer extends Thread{
    private String topic;
    public kafkaProducer(String topic){
        super();
        this.topic = topic;
    }
    public static void main(String[] args) {
        new kafkaProducer("mytopic").start();
    }
    @Override
    public void run() {
        Producer producer =createProducer();
        int i = 0;
        while(true){
            producer.send(new KeyedMessage<Integer, String>(topic, "message:"+i++));;
            System.out.println("發送成功!");
            try{
                TimeUnit.SECONDS.sleep(1);
            }catch(InterruptedException e){
                e.printStackTrace();
            }
        }
    }
    private Producer createProducer(){
        Properties properties = new Properties();
        properties.put("zk.connect", "xx.xx.xx.xx:2181");
        properties.put("serializer.class",StringEncoder.class.getName());
        properties.put("metadata.broker.list","xx.xx.xx.xx:9092");
        return new Producer<Integer, String>(new ProducerConfig(properties));
    }
}

寫完這個之后運行,然后在linux系統上運行消費者就可以看到發送出去的消息了

[root@bogon kafka_2.9.2-0.8.1.1]# bin/kafka-console-consumer.sh --zookeeper bogon:2181 --topic mytopic --from-beginning

生產者OK了

5.寫consumer類

package kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class kafkaConsumer extends Thread{
    private String topic;
    public kafkaConsumer(String topic){
        super();
        this.topic =topic;
    }
    public static void main(String[] args) {
        new kafkaConsumer("mytopic").start();
    }
    @Override
    public void run() {
        ConsumerConnector consumer = createConsumer();
        Map<String,Integer> topicCountMap = new HashMap<String,Integer>();
        topicCountMap.put(topic, 1);
        Map<String,List<KafkaStream<byte[],byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
        while(iterator.hasNext()){
            String message = new String(iterator.next().message());
            System.out.println("get:"+message);
        }
    }
    private ConsumerConnector createConsumer(){
        Properties properties = new Properties();
        properties.put("zookeeper.connect", "xx.xx.xx.xx:2181");
        properties.put("group.id", "0");
        return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
    }
    
}

上面的group.id隨便寫都可以,親測。

運行起生產者,然后再運行消費者就可以在控制台看到輸出的消息和接收到的消息了。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM