[Kafka] - Kafka Java Producer Implementation


Depending on business needs, you can use Kafka's Java Producer API to generate data and send it to the appropriate partition of the corresponding Kafka topic. The entry class is: Producer

The Kafka Producer API mainly provides the following three methods (a minimal usage sketch follows the list):

  public void send(KeyedMessage<K,V> message): sends a single message to the Kafka cluster

  public void send(List<KeyedMessage<K,V>> messages): sends multiple messages (a batch) to the Kafka cluster

  public void close(): closes the connections to Kafka and releases resources
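
A minimal sketch of these three calls against the 0.8.x javaapi producer is shown below; the broker address ("localhost:9092"), topic name ("test"), keys, and message contents are placeholders, not values from this article.

import java.util.Arrays;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class ProducerApiSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");              // placeholder broker list
        props.put("serializer.class", "kafka.serializer.StringEncoder");  // keys/values are plain strings

        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));

        // 1) send a single message
        producer.send(new KeyedMessage<String, String>("test", "key_1", "hello"));

        // 2) send a batch of messages in one call
        producer.send(Arrays.asList(
                new KeyedMessage<String, String>("test", "key_2", "foo"),
                new KeyedMessage<String, String>("test", "key_3", "bar")));

        // 3) release connections and internal resources
        producer.close();
    }
}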

For implementing a Kafka consumer in Java, see the blog posts: Java implementation of the High Level Consumer API, and Java implementation of the Low Level Consumer API.

======================================================================

1. JavaKafkaProducerPartitioner: a custom partitioner. Its job is to decide which partition of the topic an incoming key/value message is sent to, returning a partition id in the range [0, number of partitions). The implementation here is simple: the partition is determined by the number embedded in the key. The code is as follows:

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

/**
 * Created by gerry on 12/21.
 */
public class JavaKafkaProducerPartitioner implements Partitioner {

    /**
     * No-arg constructor.
     */
    public JavaKafkaProducerPartitioner() {
        this(new VerifiableProperties());
    }

    /**
     * Constructor taking a VerifiableProperties instance. Kafka instantiates the
     * partitioner reflectively with this signature, so it must be present.
     *
     * @param properties producer configuration context
     */
    public JavaKafkaProducerPartitioner(VerifiableProperties properties) {
        // nothing to configure
    }

    @Override
    public int partition(Object key, int numPartitions) {
        // Strip the "key_" prefix, parse the remaining number, and map it onto a partition id
        int num = Integer.parseInt(((String) key).replaceAll("key_", "").trim());
        return num % numPartitions;
    }
}
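
As a quick sanity check of the mapping, here is a small hypothetical driver (the keys and the partition count of 4 are assumptions, not values from the article):

public class PartitionerCheck {
    public static void main(String[] args) {
        JavaKafkaProducerPartitioner partitioner = new JavaKafkaProducerPartitioner();
        int numPartitions = 4;  // assumed partition count of the topic

        // "key_7"  -> 7  % 4 = 3
        // "key_12" -> 12 % 4 = 0
        System.out.println(partitioner.partition("key_7", numPartitions));   // prints 3
        System.out.println(partitioner.partition("key_12", numPartitions));  // prints 0
    }
}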

 

2. JavaKafkaProducer: a test class that produces data through the Kafka API. The code is as follows:

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.apache.log4j.Logger;

import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.ThreadLocalRandom;

/**
 * Created by gerry on 12/21.
 */
public class JavaKafkaProducer {
    private Logger logger = Logger.getLogger(JavaKafkaProducer.class);
    public static final String TOPIC_NAME = "test";
    public static final char[] charts = "qazwsxedcrfvtgbyhnujmikolp1234567890".toCharArray();
    public static final int chartsLength = charts.length;


    public static void main(String[] args) {
        // Broker list; a single broker here, but a comma-separated list of brokers also works,
        // e.g. "192.168.187.149:9092,192.168.187.149:9093,192.168.187.149:9094,192.168.187.149:9095"
        String brokerList = "192.168.187.146:9092";

        // 1. Build the producer configuration properties
        Properties props = new Properties();
        props.put("metadata.broker.list", brokerList);
        /**
         * request.required.acks:
         * 0  = do not wait for any acknowledgement from the broker
         * 1  = wait until the leader has acknowledged the write
         * -1 = wait until all in-sync replicas have acknowledged the write (synchronous write)
         */
        props.put("request.required.acks", "0");
        /**
         * producer.type: whether the producer sends internally in synchronous or asynchronous mode
         * sync:  synchronous (default)
         * async: asynchronous
         */
        props.put("producer.type", "async");
        /**
         * serializer.class: the message serializer
         * optional: kafka.serializer.StringEncoder
         * default:  kafka.serializer.DefaultEncoder
         */
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        /**
         * partitioner.class: the partitioner, which routes messages to partitions by key
         * default:  kafka.producer.DefaultPartitioner ==> partitions by the hash of the key
         * optional: kafka.producer.ByteArrayPartitioner ==> hashes the key as a byte array
         */
        props.put("partitioner.class", "JavaKafkaProducerPartitioner");

        // Number of send retries on failure
        props.put("message.send.max.retries", "3");

        // In async mode, the number of messages batched into one produce request
        props.put("batch.num.messages", "200");

        // Socket send buffer size in bytes
        props.put("send.buffer.bytes", "102400");

        // 2. Build the Kafka ProducerConfig from the properties
        ProducerConfig config = new ProducerConfig(props);

        // 3. Build the Producer instance
        final Producer<String, String> producer = new Producer<String, String>(config);

        // 4. Send data to the brokers from multiple concurrent threads
        final AtomicBoolean flag = new AtomicBoolean(true);
        int numThreads = 5;
        ExecutorService pool = Executors.newFixedThreadPool(numThreads);
        for (int i = 0; i < numThreads; i++) {
            pool.submit(new Runnable() {
                @Override
                public void run() {
                    while (flag.get()) {
                        // Generate and send a message
                        KeyedMessage<String, String> message = generateKeyedMessage();
                        producer.send(message);
                        System.out.println("Sent message: " + message);

                        // Sleep for a short random interval between sends
                        try {
                            int least = 10;
                            int bound = 100;
                            Thread.sleep(ThreadLocalRandom.current().nextInt(least, bound));
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }

                    System.out.println(Thread.currentThread().getName() + " shutdown....");
                }
            });
        }

        // 5. Let the producer threads run for a while, then signal them to stop
        long sleepMillis = 600000;
        try {
            Thread.sleep(sleepMillis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        flag.set(false);

        // 6. Release resources: shut down the thread pool, then close the producer
        pool.shutdown();
        try {
            pool.awaitTermination(6, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            producer.close(); // close the producer last, after all sending threads have stopped
        }
    }

    /**
     * Generates a single message with a random key and random content.
     *
     * @return a KeyedMessage destined for TOPIC_NAME
     */
    private static KeyedMessage<String, String> generateKeyedMessage() {
        String key = "key_" + ThreadLocalRandom.current().nextInt(10, 99);
        StringBuilder sb = new StringBuilder();
        int num = ThreadLocalRandom.current().nextInt(1, 5);
        for (int i = 0; i < num; i++) {
            sb.append(generateStringMessage(ThreadLocalRandom.current().nextInt(3, 20))).append(" ");
        }
        String message = sb.toString().trim();
        return new KeyedMessage<String, String>(TOPIC_NAME, key, message);
    }

    /**
     * Generates a random string of the given length.
     *
     * @param numItems the number of characters to generate
     * @return the generated string
     */
    private static String generateStringMessage(int numItems) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < numItems; i++) {
            sb.append(charts[ThreadLocalRandom.current().nextInt(chartsLength)]);
        }
        return sb.toString();
    }
}
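
The test class above sends one message per call. If you prefer to batch at the application level, the send(List<KeyedMessage>) overload from the API summary can be used instead. A minimal sketch follows; the BatchSendSketch class, the keys, and the payloads are hypothetical, and only TOPIC_NAME is reused from JavaKafkaProducer above.

import java.util.ArrayList;
import java.util.List;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;

public class BatchSendSketch {

    /**
     * Collects batchSize messages and sends them with a single send(List) call,
     * batching explicitly at the application level.
     */
    public static void sendBatch(Producer<String, String> producer, int batchSize) {
        List<KeyedMessage<String, String>> batch =
                new ArrayList<KeyedMessage<String, String>>(batchSize);
        for (int i = 0; i < batchSize; i++) {
            String key = "key_" + i;        // hypothetical keys
            String value = "message-" + i;  // hypothetical payloads
            batch.add(new KeyedMessage<String, String>(JavaKafkaProducer.TOPIC_NAME, key, value));
        }
        producer.send(batch);
    }
}

Note that with producer.type=async the client already groups messages internally (up to batch.num.messages), so explicit batching mainly helps when the producer runs in sync mode.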

 

3. The pom.xml dependency configuration is as follows

<properties>
    <kafka.version>0.8.2.1</kafka.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>${kafka.version}</version>
    </dependency>
</dependencies>

 

