Kafka:ZK+Kafka+Spark Streaming集群環境搭建(十四)定義一個avro schema使用producer發送avro字符流,consumer接收avro字符流並解析


參考《在Kafka中使用Avro編碼消息:Consumer篇》、《在Kafka中使用Avro編碼消息:Producer篇》

在了解如何將avro發送到kafka,再從kafka解析avro數據之前,我們可以先看下如何操作字符串:

producer:

package com.spark;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * Created by Administrator on 2017/8/29.
 */
/**
 * Minimal Kafka producer demo: sends an increasing integer as both key and
 * value to topic "my-topic", one record per second, forever.
 *
 * Created by Administrator on 2017/8/29.
 */
public class KafkaProducer {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // "all": wait for the full in-sync replica set to acknowledge each record.
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Fully qualified because this demo class shadows the client class name.
        // BUG FIX: the original used a raw-type constructor call
        // (new KafkaProducer(props)), which compiles only with an unchecked warning;
        // the type arguments are now explicit.
        Producer<String, String> producer =
                new org.apache.kafka.clients.producer.KafkaProducer<String, String>(props);
        int i = 0;
        while (true) {
            producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
            System.out.println(i++);
            Thread.sleep(1000);
        }
        // Unreachable while the loop runs forever; close() would flush buffered
        // records and release resources if the loop ever gained an exit condition.
        // producer.close();
    }
}
View Code

consumer:

package com.spark;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

/**
 * Created by Administrator on 2017/8/29.
 */
/**
 * Minimal Kafka consumer demo: subscribes to "my-topic" and prints every
 * record's offset, key and value to stdout, polling forever.
 *
 * Created by Administrator on 2017/8/29.
 */
public class MyKafkaConsumer {

    public static void main(String[] args) {
        Properties config = new Properties();
        config.put("bootstrap.servers", "localhost:9092");
        config.put("group.id", "test");
        // Offsets are committed automatically once per second.
        config.put("enable.auto.commit", "true");
        config.put("auto.commit.interval.ms", "1000");
        config.put("session.timeout.ms", "30000");
        config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(config);
        consumer.subscribe(Arrays.asList("my-topic"));

        for (;;) {
            // Block for up to 100 ms waiting for new records.
            ConsumerRecords<String, String> batch = consumer.poll(100);
            for (ConsumerRecord<String, String> entry : batch) {
                System.out.printf("offset = %d, key = %s, value = %s \n", entry.offset(), entry.key(), entry.value());
            }
        }
    }
}
View Code

Avro操作工程pom.xml:

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.0.1</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.21</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.8.0</version>
        </dependency>

        <dependency>
            <groupId>com.twitter</groupId>
            <artifactId>bijection-avro_2.10</artifactId>
            <version>0.9.2</version>
        </dependency>
        <!-- NOTE: removed a second org.apache.avro:avro dependency pinned to 1.7.4;
             the artifact is already declared above with version 1.8.0, and declaring
             it twice with conflicting versions leaves the effective version to
             Maven's resolution order. -->

需要依賴於avro的包,同時這里是需要使用kafka api。

在使用 Avro 之前,我們需要先定義模式(schemas)。模式通常使用 JSON 來編寫,我們不需要再定義相關的類,這篇文章中,我們將使用如下的模式:

{
    "fields": [
        { "name": "str1", "type": "string" },
        { "name": "str2", "type": "string" },
        { "name": "int1", "type": "int" }
    ],
    "name": "Iteblog",
    "type": "record"
}

上面的模式中,我們定義了一種 record 類型的對象,名字為 Iteblog,這個對象包含了兩個字符串和一個 int 類型的fields。定義好模式之后,我們可以使用 avro 提供的相應方法來解析這個模式:

Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(USER_SCHEMA);

這里的 USER_SCHEMA 變量存儲的就是上面定義好的模式。

解析好模式定義的對象之后,我們需要將這個對象序列化成字節數組,或者將字節數組轉換成對象。Avro 提供的 API 不太易於使用,所以本文使用 twitter 開源的 Bijection 庫來方便地實現這些操作。我們先創建 Injection 對象來將對象轉換成字節數組:

Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);

現在我們可以根據之前定義好的模式來創建相關的 Record,並使用 recordInjection 來序列化這個 Record :

GenericData.Record avroRecord = new GenericData.Record(schema);
avroRecord.put("str1", "My first string");
avroRecord.put("str2", "My second string");
avroRecord.put("int1", 42);
 
byte[] bytes = recordInjection.apply(avroRecord);

Producter實現

有了上面的介紹之后,我們現在就可以在 Kafka 中使用 Avro 來序列化我們需要發送的消息了:

package example.avro;

import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;

/**
 * Demo producer that serializes Avro GenericRecords to byte arrays with
 * Twitter's Bijection library and publishes them to {@link #TOPIC}.
 */
public class AvroKafkaProducter {
    /** Avro schema (JSON): record "Iteblog" with two string fields and one int field. */
    public static final String USER_SCHEMA = 
             "{" 
            + "\"type\":\"record\"," 
            + "\"name\":\"Iteblog\"," 
            + "\"fields\":[" 
            + "  { \"name\":\"str1\", \"type\":\"string\" }," 
            + "  { \"name\":\"str2\", \"type\":\"string\" },"
            + "  { \"name\":\"int1\", \"type\":\"int\" }" 
            + "]}";
    /** Destination topic, shared with the consumer demo. */
    public static final String TOPIC = "t-testavro";

    public static void main(String[] args) throws InterruptedException {
        Properties config = new Properties();
        config.put("bootstrap.servers", "192.168.0.121:9092,192.168.0.122:9092");
        config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Values travel as raw Avro-encoded bytes.
        config.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        // Parse the schema once and derive a GenericRecord <-> byte[] codec from it.
        Schema schema = new Schema.Parser().parse(USER_SCHEMA);
        Injection<GenericRecord, byte[]> codec = GenericAvroCodecs.toBinary(schema);

        KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(config);

        for (int seq = 0; seq < 1000; seq++) {
            // Build one record per iteration and Avro-encode it.
            GenericData.Record payload = new GenericData.Record(schema);
            payload.put("str1", "Str 1-" + seq);
            payload.put("str2", "Str 2-" + seq);
            payload.put("int1", seq);

            byte[] encoded = codec.apply(payload);

            producer.send(new ProducerRecord<String, byte[]>(TOPIC, "" + seq, encoded));
            System.out.println(">>>>>>>>>>>>>>>>>>" + seq);
        }

        producer.close();
        System.out.println("complete...");
    }
}

因為我們使用到 Avro 和 Bijection 類庫,所以我們需要在 pom.xml 文件里面引入以下依賴:

<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro</artifactId>
  <version>1.8.0</version>
</dependency>
 
<dependency>
  <groupId>com.twitter</groupId>
  <artifactId>bijection-avro_2.10</artifactId>
  <version>0.9.2</version>
</dependency>

從 Kafka 中讀取 Avro 格式的消息

從 Kafka 中讀取 Avro 格式的消息和讀取其他類型的消息一樣,都是創建相關的流,然后迭代:

ConsumerConnector consumer = ...;
Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
for (final KafkaStream stream : streams) {
    ....
}

關鍵在於如何將讀出來的 Avro 類型字節數組轉換成我們要的數據。這里還是使用到我們之前介紹的模式解釋器:

Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(USER_SCHEMA);
Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);

上面的 USER_SCHEMA 就是上邊介紹的消息模式,我們創建了一個 recordInjection 對象,這個對象就可以利用剛剛解析好的模式將讀出來的字節數組反序列化成我們寫入的數據:

GenericRecord record = recordInjection.invert(message).get();

然后我們就可以通過下面方法獲取寫入的數據:

record.get("str1")
record.get("str2")
record.get("int1")

Kafka 0.9.x 版本Consumer實現

package example.avro;

import java.util.Collections;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;

/**
 * Demo consumer that reads Avro-encoded byte[] values from
 * {@link AvroKafkaProducter#TOPIC}, decodes them with Bijection and logs
 * topic/partition/offset/key plus the record's "str1" field.
 */
public class AvroKafkaConsumer {
    public static void main(String[] args) {
        Logger logger = LoggerFactory.getLogger("AvroKafkaConsumer");
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.0.121:9092,192.168.0.122:9092");
        props.put("group.id", "testgroup");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());

        KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<String, byte[]>(props);

        consumer.subscribe(Collections.singletonList(AvroKafkaProducter.TOPIC));
        // Reuse the producer's schema so the binary codec matches the writer.
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(AvroKafkaProducter.USER_SCHEMA);
        Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);

        try {
            while (true) {
                ConsumerRecords<String, byte[]> records = consumer.poll(1000);
                for (ConsumerRecord<String, byte[]> record : records) {
                    // invert() deserializes the Avro bytes back into a GenericRecord.
                    GenericRecord genericRecord = recordInjection.invert(record.value()).get();
                    // BUG FIX: the original wrapped this call in a second
                    // String.format(), which re-interprets the already-formatted
                    // text as a format string and throws
                    // UnknownFormatConversionException whenever a key or field
                    // value contains a '%'. One format pass is correct.
                    String info = String.format("topic = %s, partition = %s, offset = %d, customer = %s,country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), genericRecord.get("str1"));
                    logger.info(info);
                }
            }
        } finally {
            // Always release the consumer's network resources on exit.
            consumer.close();
        }
    }

}

測試:

producer:

[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.0.1
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : a7a17cdec9eaa6c5
>>>>>>>>>>>>>>>>>>0
>>>>>>>>>>>>>>>>>>1
>>>>>>>>>>>>>>>>>>2
>>>>>>>>>>>>>>>>>>3
>>>>>>>>>>>>>>>>>>4
>>>>>>>>>>>>>>>>>>5
>>>>>>>>>>>>>>>>>>6
>>>>>>>>>>>>>>>>>>7
>>>>>>>>>>>>>>>>>>8
>>>>>>>>>>>>>>>>>>9
>>>>>>>>>>>>>>>>>>10
...
>>>>>>>>>>>>>>>>>>997
>>>>>>>>>>>>>>>>>>998
>>>>>>>>>>>>>>>>>>999
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
complete...

consumer:

[main] INFO AvroKafkaConsumer - topic = t-testavro, partition = 0, offset = 4321, customer = 165,country = Str 1-165
[main] INFO AvroKafkaConsumer - topic = t-testavro, partition = 0, offset = 4322, customer = 166,country = Str 1-166
[main] INFO AvroKafkaConsumer - topic = t-testavro, partition = 0, offset = 4323, customer = 167,country = Str 1-167
[main] INFO AvroKafkaConsumer - topic = t-testavro, partition = 0, offset = 4324, customer = 168,country = Str 1-168
[main] INFO AvroKafkaConsumer - topic = t-testavro, partition = 0, offset = 4325, customer = 169,country = Str 1-169
[main] INFO AvroKafkaConsumer - topic = t-testavro, partition = 0, offset = 4326, customer = 170,country = Str 1-170
[main] INFO AvroKafkaConsumer - topic = t-testavro, partition = 0, offset = 4327, customer = 171,country = Str 1-171

GenericRecord打印:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

/**
 * Demonstrates rendering an Avro GenericRecord as a JSON string via
 * {@link GenericData#toString(Object)} — handy when debugging record contents.
 */
public class AvroToJson {
    public static void main(String[] args) {
        // Record schema: a required string plus two nullable (union) fields.
        String avroSchema = 
                 "{" 
                + "\"type\": \"record\", " 
                + "\"name\": \"LongList\"," 
                + "\"aliases\": [\"LinkedLongs\"]," 
                + "\"fields\" : ["
                + "    {\"name\": \"name\", \"type\": \"string\"}," 
                + "    {\"name\": \"favorite_number\", \"type\": [\"null\", \"long\"]},"
                + "    {\"name\": \"favorite_color\", \"type\": [\"null\", \"string\"]}"
                + "  ]" 
                + "}";

        Schema schema = new Schema.Parser().parse(avroSchema);
        GenericRecord user1 = new GenericData.Record(schema);
        user1.put("name", "Format");
        // BUG FIX: the field's union type is ["null","long"], so store a Long.
        // The original stored the int 666, which toString() tolerates but which
        // Avro validation/serialization rejects (Integer is not a member of the
        // union). The printed JSON is unchanged.
        user1.put("favorite_number", 666L);
        user1.put("favorite_color", "red");

        GenericData genericData = new GenericData();

        // Renders the record as JSON, e.g. {"name": "Format", ...}.
        String result = genericData.toString(user1);
        System.out.println(result);

    }
}

打印結果:

{"name": "Format", "favorite_number": 666, "favorite_color": "red"}

該信息在調試,想查看avro對象內容時,十分實用。

 一次有趣的測試:

avro schema:

{
    "type":"record",
    "name":"My",
    "fields":[
                     {"name":"id","type":["null", "string"]}, 
                     {"name":"start_time","type":["null", "string"]},
                     {"name":"stop_time","type":["null", "string"]},
                     {"name":"insert_time","type":["null", "string"]},                     
                     {"name":"eid","type":["null", "string"]},                     
                     {"name":"V_00","type":["null", "string"]},
                     {"name":"V_01","type":["null", "string"]},
                     {"name":"V_02","type":["null", "string"]},
                     {"name":"V_03","type":["null", "string"]},
                     {"name":"V_04","type":["null", "string"]},
                     {"name":"V_05","type":["null", "string"]},
                     {"name":"V_06","type":["null", "string"]},
                     {"name":"V_07","type":["null", "string"]},
                     {"name":"V_08","type":["null", "string"]},
                     {"name":"V_09","type":["null", "string"]}
            ]
}

測試程序:

    /**
     * Round-trip experiment: Avro-encodes a record built from my.avsc, turns
     * the binary payload into a String and back into bytes, then decodes it
     * again and prints every field.
     *
     * NOTE(review): the byte[] -> String -> byte[] detour relies on the JVM's
     * default charset round-tripping arbitrary binary data; with a multi-byte
     * default such as UTF-8, invalid sequences are replaced with U+FFFD and the
     * invert() call below can fail — confirm the intended charset.
     */
    public static void main(String[] args) throws StreamingQueryException, IOException {
        String filePathString = "E:\\work\\my.avsc";

        // Parse the Avro schema from the external .avsc file.
        Schema.Parser parser = new Schema.Parser();
        InputStream inputStream = new FileInputStream(filePathString);
        Schema schema = parser.parse(inputStream);
        inputStream.close();

        // Bijection codec: GenericRecord <-> Avro binary byte[].
        Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);

        // Populate only a subset of the fields; the rest stay null, which the
        // ["null","string"] unions in the schema allow.
        GenericData.Record avroRecord = new GenericData.Record(schema);
        avroRecord.put("id", "9238234");
        avroRecord.put("start_time", "2018-08-12T12:09:04.987");
        avroRecord.put("stop_time", "2018-08-12T12:09:04.987");
        avroRecord.put("insert_time", "2018-08-12T12:09:04.987");
        avroRecord.put("eid", "23434");
        avroRecord.put("V_00", "0");
        avroRecord.put("V_01", "1");
        avroRecord.put("V_02", "2");
        avroRecord.put("V_09", "9");

        // Serialize the record to Avro binary.
        byte[] bytes = recordInjection.apply(avroRecord);

        String byteString = byteArrayToStr(bytes);
        // bytes.toString() would yield only "[B@<hash>", not the data itself,
        // which is why the commented-out variant below fails to round-trip.
        //String byteString= bytes.toString();
        
        System.out.println(">>>>>>>>>>>>>>arvo字節流轉化為字符串。。。。");
        System.out.println(byteString);
        System.out.println(">>>>>>>>>>>>>>arvo字節流轉化為字符串。。。。");
        
        System.out.println(">>>>>>>>>>>>>>arvo字符串轉化為字節流。。。");
        byte[] data = strToByteArray(byteString);        
        GenericRecord record = recordInjection.invert(data).get();        
        // Print every schema field; null (unset) fields print as empty string.
        for (Schema.Field field : schema.getFields()) {
            String value = record.get(field.name()) == null ? "" : record.get(field.name()).toString();
            System.out.println(field.name() + "," + value);
        }
        System.out.println(">>>>>>>>>>>>>>arvo字符串轉化為字節流。。。");
    }

    /**
     * Decodes a byte array into a String using ISO-8859-1.
     *
     * BUG FIX: the original used the platform default charset. With a
     * multi-byte default such as UTF-8, arbitrary binary data (e.g. an Avro
     * payload) contains invalid sequences that are silently replaced with
     * U+FFFD, so strToByteArray(byteArrayToStr(b)) would NOT reproduce b.
     * ISO-8859-1 maps every byte 1:1 to a char, making the round trip
     * lossless and platform-independent.
     *
     * @param byteArray bytes to decode, may be null
     * @return the decoded string, or null when the input is null
     */
    public static String byteArrayToStr(byte[] byteArray) {
        if (byteArray == null) {
            return null;
        }
        return new String(byteArray, java.nio.charset.StandardCharsets.ISO_8859_1);
    }

    /**
     * Encodes a String back to bytes using ISO-8859-1, the exact inverse of
     * {@link #byteArrayToStr(byte[])} (one byte per char, no data loss).
     *
     * @param str string to encode, may be null
     * @return the encoded bytes, or null when the input is null
     */
    public static byte[] strToByteArray(String str) {
        if (str == null) {
            return null;
        }
        return str.getBytes(java.nio.charset.StandardCharsets.ISO_8859_1);
    }

經過測試,可以正常運行,輸出信息為:

 但是如果把代碼中的byte轉化為字符代碼修改為:

        //String byteString = byteArrayToStr(bytes);
        String byteString= bytes.toString();

就拋出錯誤了:

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM