Kafka Schema Registry | 學習Avro Schema


1.目標

在這個Kafka Schema Registry教程中,我們將了解Schema Registry是什么以及為什么我們應該將它與Apache Kafka一起使用此外,我們將看到Avro架構演變的概念,並使用Kafka Avro Serializers設置和使用Schema Registry。此外,我們將學習使用Schema Registry的REST接口管理Avro Schemas。 
那么,讓我們討論一下Apache Kafka Schema Registry。

Kafka Schema Registry

Apache Kafka架構注冊表

2.什么是Kafka Schema Registry?

基本上,對於Kafka ProducersKafka ConsumerKafka的 Schema Registry都存儲Avro Schemas。

  • 它提供了一個用於管理Avro架構的RESTful界面。
  • 它允許存儲版本化模式的歷史記錄。
  • 此外,它還支持檢查Kafka的架構兼容性。
  • 使用Avro Schema,我們可以配置兼容性設置以支持Kafka模式的發展。

學習Apache Kafka用例和應用程序

基本上,Kafka Avro序列化項目提供了序列化程序。在Avro和Kafka Schema Registry的幫助下,使用Kafka Avro序列化的Kafka Producers和Kafka Consumer都處理模式管理以及記錄的序列化。

Kafka Schema Registry

Kafka Schema Registry簡介

此外,生產者在使用Kafka中的Confluent Schema Registry時不必發送模式,只需要唯一的模式ID。因此,為了從Confluent模式注冊表中查找完整模式(如果尚未緩存),使用者使用模式ID。這意味着不必使用每組記錄發送模式,這樣也可以節省時間。 

但是,Kafka制作人也創建了一條記錄/消息,即Avro記錄。該記錄包含模式ID和數據。此外,如果需要,還會注冊架構,然后使用Kafka Avro Serializer序列化數據和架構ID。 
我們來討論Apache Kafka架構及其基本概念

3.為什么在Kafka中使用Schema Registry? 

消費者的架構可能與生產者的架構不同。在定義消費者模式時,消費者期望記錄/消息符合要求。執行檢查時,如果兩個模式不匹配但兼容,則通過Avma Schema Evolution和Schema Registry進行有效負載轉換。此外,Kafka記錄可以有一個鍵和一個值,兩者都可以有一個模式。

4. Kafka Schema注冊局運營

但是,對於Kafka記錄的鍵和值,Schema Registry可以存儲模式。此外,它按主題列出模式。此外,它可以列出主題(模式)的所有版本。此外,它可以按版本或ID檢索架構。並且可以獲得最新版本的模式。
Apache Kafka架構注冊表的一些兼容級別是向后,向前,完整,無。此外,我們可以通過REST API使用Schema注冊表管理模式。
修改Apache Kafka操作和命令。

5. Kafka Schema兼容性設置

讓我們了解所有兼容性級別。基本上,向后兼容性是指使用較舊模式編寫的數據,可以使用較新的模式進行讀取。此外,向前兼容性是指使用舊模式可讀取使用較新模式編寫的數據。此外,完全兼容性是指新版本的模式是向后兼容的。並且,“無”狀態,意味着它禁用模式驗證,不建議這樣做。因此,Schema Registry只存儲模式,如果我們將級別設置為“none”,它將不會驗證兼容性。

一個。架構注冊表配置

無論是全球還是每個主題。
兼容性值為:
A。無
表示不檢查架構兼容性。
B.轉發
說,檢查以確保最后一個模式版本與新模式向前兼容。
C.向后(默認)
這意味着確保新模式向后兼容最新模式。
D. Full 
“Full”表示確保新模式從最新到最新以及從最新到最新都是向前和向后兼容的。
閱讀Storm Kafka與配置和代碼的集成

6.架構演變

雖然,如果使用該架構的舊版本,在將數據寫入存儲后會更改Avro架構,那么當我們嘗試讀取該數據時,Avro可能會進行架構演變。
但是,從Kafka的角度來看,模式演變只發生在消費者的反序列化(閱讀)中。並且,如果可能,則在反序列化期間自動修改值或鍵,以便在消費者的模式與生產者的模式不同時符合消費者的讀取模式。
簡而言之,它是消費者模式版本與生產者放入Kafka日志的模式之間的Avro模式的自動轉換。但是,當消費者模式與用於序列化Kafka記錄的生產者模式不同時,將對Kafka記錄的鍵或值執行數據轉換。雖然,如果模式匹配,則無需進行轉換。

一個。模式演變過程中允許的修改

可以將具有默認值的字段添加到架構。我們可以刪除具有默認值的字段。此外,我們可以更改字段的訂單屬性。此外,我們可以將字段的默認值更改為其他值,或者將默認值添加到沒有字段的字段中。
但是,可以刪除或添加字段別名,但這可能會導致某些依賴別名的使用者中斷。此外,我們可以將類型更改為包含原始類型的聯合。上述更改將導致我們的架構在使用舊架構讀取時可以使用Avro的架構演變。

灣 修改架構的道路規則

如果我們想讓我們的架構可以進化,我們必須遵循這些准則。首先,我們需要為模式中的字段提供默認值,因為這允許我們稍后刪除該字段。請記住,永遠不要更改字段的數據類型。此外,在向架構添加新字段時,我們必須為字段提供默認值。並且,請確保不要重命名現有字段(而是使用別名)。
我們來討論Apache Kafka Streams | 流處理拓撲
例如:

  • 員工示例Avro架構:
  1. {"namespace": "com.dataflair.phonebook",
     "type": "record",
     "name": "Employee",
     "doc" : "Represents an Employee at a company",
     "fields": [
       {"name": "firstName", "type": "string", "doc": "The persons given name"},
       {"name": "nickName", "type": ["null", "string"], "default" : null},
       {"name": "lastName", "type": "string"},
       {"name": "age",  "type": "int", "default": -1},
       {"name": "emails", "default":[], "type":{"type": "array", "items": "string"}},
       {"name": "phoneNumber",  "type":
       [ "null",
         { "type": "record",   "name": "PhoneNumber",
           "fields": [
             {"name": "areaCode", "type": "string"},
             {"name": "countryCode", "type": "string", "default" : ""},
             {"name": "prefix", "type": "string"},
             {"name": "number", "type": "string"}
           ]
         }
       ]
       },
       {"name":"status", "default" :"SALARY", "type": { "type": "enum", "name": "Status",
         "symbols" : ["RETIRED", "SALARY", "HOURLY", "PART_TIME"]}
       }
     ]
    }

     

7. Avro Schema Evolution場景

假設在模式的版本1中,我們的員工記錄沒有年齡因素。但現在我們要添加一個默認值為-1的年齡字段。所以,假設我們有一個消費者使用版本1沒有年齡,生產者使用模式的版本2與年齡
現在,通過使用Employee模式的版本2生產者,創建一個com.dataflair.Employee記錄設置年齡字段到42,然后將其發送給Kafka主題new-Employees。之后,使用版本1,使用者將使用Employee架構的新員工記錄。因此,在反序列化期間刪除age字段只是因為使用者正在使用模式的版本1。

您是否知道Kafka和RabbitMQ之間的區別
此外,同一個消費者修改了一些記錄,然后將記錄寫入NoSQL商店。因此,它寫入NoSQL存儲的記錄中缺少age字段。現在,使用模式的第2版,另一個具有年齡的客戶端從NoSQL存儲中讀取記錄。因此,由於Consumer使用版本1編寫它,因此記錄中缺少age字段,因此客戶端讀取記錄並將age設置為默認值-1。
因此,Schema Registry可以拒絕該模式,並且生產者永遠不會將其添加到Kafka日志中,如果我們添加了年齡並且它不是可選的,即age字段沒有默認值。

8.使用Schema Registry REST API

此外,通過使用以下操作,Kafka中的Schema Registry允許我們管理模式:

  1. 存儲Kafka記錄的鍵和值的模式
  2. 按主題列出模式
  3. 列出主題的所有版本(架構)
  4. 按版本檢索架構
  5. 按ID檢索架構
  6. 檢索最新版本的架構
  7. 執行兼容性檢查
  8. 全局設置兼容級別

您是否知道Apache Kafka職業范圍及其薪資趨勢
然而,所有這些都可通過REST API與Kafka中的Schema Registry一起獲得。
我們可以執行以下操作,以便發布新架構:

一個。發布新架構

  1. curl -X POST -H "Content-Type:
    application/vnd.schemaregistry.v1+json" \
       --data '{"schema": "{\"type\": …}’ \
       http://localhost:8081/subjects/Employee/versions

     

灣 列出所有模式

curl -X GET http:// localhost:8081 / subject
我們基本上可以通過Schema Registry的REST接口執行上述所有操作,只有你有一個好的HTTP客戶端。例如,Schema Registry使用Square中的OkHttp客戶端(com.squareup.okhttp3:okhttp:3.7.0+)稍微好一點,如下所示:

  • 使用REST端點嘗試所有Schema Registry選項:

  1. package com.dataflair.kafka.schema;
    import okhttp3.*;
    import java.io.IOException;
    public class SchemaMain {
       private final static MediaType SCHEMA_CONTENT =
               MediaType.parse("application/vnd.schemaregistry.v1+json");
       private final static String Employee_SCHEMA = "{\n" +
               " \"schema\": \"" +
               " {" +
               " \\\"namespace\\\": \\\"com.dataflair.phonebook\\\"," +
               " \\\"type\\\": \\\"record\\\"," +
               " \\\"name\\\": \\\"Employee\\\"," +
               " \\\"fields\\\": [" +
               " {\\\"name\\\": \\\"fName\\\", \\\"type\\\": \\\"string\\\"}," +
               " {\\\"name\\\": \\\"lName\\\", \\\"type\\\": \\\"string\\\"}," +
               " {\\\"name\\\": \\\"age\\\",  \\\"type\\\": \\\"int\\\"}," +
               " {\\\"name\\\": \\\"phoneNumber\\\",  \\\"type\\\": \\\"string\\\"}" +
               " ]" +
               " }\"" +
               "}";
       public static void main(String... args) throws IOException {
           System.out.println(Employee_SCHEMA);
           final OkHttpClient client = new OkHttpClient();
           //POST A NEW SCHEMA
           Request request = new Request.Builder()
                   .post(RequestBody.create(SCHEMA_CONTENT, Employee_SCHEMA))
                   .url("http://localhost:8081/subjects/Employee/versions")
                   .build();
           String output = client.newCall(request).execute().body().string();
           System.out.println(output);
           //LIST ALL SCHEMAS
           request = new Request.Builder()
                   .url("http://localhost:8081/subjects")
                   .build();
           output = client.newCall(request).execute().body().string();
           System.out.println(output);
           //SHOW ALL VERSIONS OF Employee
           request = new Request.Builder()
                   .url("http://localhost:8081/subjects/Employee/versions/")
                   .build();
           output = client.newCall(request).execute().body().string();
           System.out.println(output);
           //SHOW VERSION 2 OF Employee
           request = new Request.Builder()
                   .url("http://localhost:8081/subjects/Employee/versions/2")
                   .build();
           output = client.newCall(request).execute().body().string();
           System.out.println(output);
           //SHOW THE SCHEMA WITH ID 3
           request = new Request.Builder()
                   .url("http://localhost:8081/schemas/ids/3")
                   .build();
           output = client.newCall(request).execute().body().string();
           System.out.println(output);
           //SHOW THE LATEST VERSION OF Employee 2
           request = new Request.Builder()
                   .url("http://localhost:8081/subjects/Employee/versions/latest")
                   .build();
           output = client.newCall(request).execute().body().string();
           System.out.println(output);
           //CHECK IF SCHEMA IS REGISTERED
           request = new Request.Builder()
                   .post(RequestBody.create(SCHEMA_CONTENT, Employee_SCHEMA))
                   .url("http://localhost:8081/subjects/Employee")
                   .build();
           output = client.newCall(request).execute().body().string();
           System.out.println(output);
           //TEST COMPATIBILITY
           request = new Request.Builder()
                   .post(RequestBody.create(SCHEMA_CONTENT, Employee_SCHEMA))
                   .url("http://localhost:8081/compatibility/subjects/Employee/versions/latest")
                   .build();
           output = client.newCall(request).execute().body().string();
           System.out.println(output);
           // TOP LEVEL CONFIG
           request = new Request.Builder()
                   .url("http://localhost:8081/config")
                   .build();
           output = client.newCall(request).execute().body().string();
           System.out.println(output);
           // SET TOP LEVEL CONFIG
           // VALUES are none, backward, forward and full
           request = new Request.Builder()
                   .put(RequestBody.create(SCHEMA_CONTENT, "{\"compatibility\": \"none\"}"))
                   .url("http://localhost:8081/config")
                   .build();
           output = client.newCall(request).execute().body().string();
           System.out.println(output);
           // SET CONFIG FOR Employee
           // VALUES are none, backward, forward and full
           request = new Request.Builder()
                   .put(RequestBody.create(SCHEMA_CONTENT, "{\"compatibility\": \"backward\"}"))
                   .url("http://localhost:8081/config/Employee")
                  .build();
           output = client.newCall(request).execute().body().string();
          System.out.println(output);
       }
    }

     

我們建議運行該示例以嘗試強制不兼容的架構到架構注冊表,並且還要注意各種兼容性設置的行為。

讓我們修改Kafka vs Storm

C。運行Kafka架構注冊表:

$ cat~ / tools / confluent-3.2.1 / etc / schema-registry / schema-registry.properties 
listeners = http://0.0.0.0:8081 
kafkastore.connection.url = localhost:2181 
kafkastore.topic = _schemas 
debug = false 
~ / tools / confluent-3.2.1 / bin / schema-registry-start~ / tools / confluent-3.2.1 / etc / schema-registry / schema-registry.properties

9.撰寫消費者和生產者 

在這里,我們將要求啟動指向ZooKeeper集群的Schema Registry服務器。此外,我們可能需要將Kafka Avro Serializer和Avro JAR導入我們的Gradle項目。之后,我們將要求配置生產者使用Schema Registry和KafkaAvroSerializer。此外,我們將要求將其配置為使用Schema Registry並使用KafkaAvroDeserializer來編寫使用者。
因此,此構建文件顯示了我們需要的Avro JAR文件。
閱讀Apache Kafka Security | Kafka的需求和組成部分

  • Kafka Avro Serializer示例的Gradle構建文件:
  1. plugins {
       id "com.commercehub.gradle.plugin.avro" version "0.9.0"
    }
    group 'dataflair'
    version '1.0-SNAPSHOT'
    apply plugin: 'java'
    sourceCompatibility = 1.8
    dependencies {
       compile "org.apache.avro:avro:1.8.1"
       compile 'com.squareup.okhttp3:okhttp:3.7.0'
       testCompile 'junit:junit:4.11'
       compile 'org.apache.kafka:kafka-clients:0.10.2.0'
       compile 'io.confluent:kafka-avro-serializer:3.2.1'
    }
    repositories {
       jcenter()
       mavenCentral()
       maven {
           url "http://packages.confluent.io/maven/"
       }
    }
    avro {
       createSetters = false
       fieldVisibility = "PRIVATE"
    }

     

請記住包括Kafka Avro Serializer lib(io.confluent:kafka-avro-serializer:3.2.1)和Avro lib(org.apache.avro:avro:1.8.1)。

一個。寫一個制片人

讓我們按如下方式編寫制作人。

  • 使用Kafka Avro Serialization和Kafka Registry的制作人:
package com.dataflair.kafka.schema;
import com.dataflair.phonebook.Employee;
import com.dataflair.phonebook.PhoneNumber;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.util.Properties;
import java.util.stream.IntStream;

 

public class AvroProducer {
   private static Producer<Long, Employee> createProducer() {
       Properties props = new Properties();
       props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
       props.put(ProducerConfig.CLIENT_ID_CONFIG, "AvroProducer");
       props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
               LongSerializer.class.getName());
       // Configure the KafkaAvroSerializer.
      props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
               KafkaAvroSerializer.class.getName());
       // Schema Registry location.
       props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG,
               "http://localhost:8081");
       return new KafkaProducer<>(props);
   }
   private final static String TOPIC = "new-Employees";
   public static void main(String... args) {
       Producer<Long, Employee> producer = createProducer();
        Employee bob = Employee.newBuilder().setAge(35)
               .setFirstName("Bob")
               .setLastName("Jones")
               .setPhoneNumber(
                       PhoneNumber.newBuilder()
                               .setAreaCode("301")
                               .setCountryCode("1")
                               .setPrefix("555")
                               .setNumber("1234")
                              .build())
               .build();
       IntStream.range(1, 100).forEach(index->{
           producer.send(new ProducerRecord<>(TOPIC, 1L * index, bob));
       });
       producer.flush();
       producer.close();
   }
}

 

 

另外,請確保我們將Schema Registry和KafkaAvroSerializer配置為生成器設置的一部分。

我們來討論Kafka主題

  1. // Configure the KafkaAvroSerializer.
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                   KafkaAvroSerializer.class.getName());
    // Schema Registry location.        props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG,
                   "http://localhost:8081");

     

此外,我們按預期使用生產者

寫一個消費者

之后,我們將寫信給消費者。

  • 使用Kafka Avro序列化和架構注冊表的Kafka Consumer:
  1. package com.dataflair.kafka.schema;
    import com.dataflair.phonebook.Employee;
    import io.confluent.kafka.serializers.KafkaAvroDeserializer;
    import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.LongDeserializer;
    import java.util.Collections;
    import java.util.Properties;
    import java.util.stream.IntStream;
    public class AvroConsumer {
       private final static String BOOTSTRAP_SERVERS = "localhost:9092";
       private final static String TOPIC = "new-Employee";
       private static Consumer<Long, Employee> createConsumer() {
           Properties props = new Properties();
           props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
           props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleAvroConsumer");
           props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                   LongDeserializer.class.getName());
           //Use Kafka Avro Deserializer.
           props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                   KafkaAvroDeserializer.class.getName());  //<----------------------
           //Use Specific Record or else you get Avro GenericRecord.
           props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
           //Schema registry location.
           props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG,
                   "http://localhost:8081"); //<----- Run Schema Registry on 8081
           return new KafkaConsumer<>(props);
       }
       public static void main(String... args) {
           final Consumer<Long, Employee> consumer = createConsumer();
           consumer.subscribe(Collections.singletonList(TOPIC));
           IntStream.range(1, 100).forEach(index -> {
               final ConsumerRecords<Long, Employee> records =
                       consumer.poll(100);
               if (records.count() == 0) {
                   System.out.println("None found");
               } else records.forEach(record -> {
                   Employee EmployeeRecord = record.value();
                   System.out.printf("%s %d %d %s \n", record.topic(),
                           record.partition(), record.offset(), EmployeeRecord);
               });
           });
       }
    }

     

確保,我們必須告訴消費者在哪里找到注冊表,與生產者一樣,我們必須配置Kafka Avro反序列化器。

  • 為消費者配置架構注冊表:
  1. //Use Kafka Avro Deserializer.
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                   KafkaAvroDeserializer.class.getName());
    //Use Specific Record or else you get Avro GenericRecord.
    props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
    //Schema registry location.        props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG,
                   "http://localhost:8081"); //<----- Run Schema Registry on 8081

     

此外,使用生成的Employee對象版本。因為,如果我們沒有,而不是我們生成的Employee對象,那么它將使用Avro GenericRecord,這是一個SpecificRecord。
而且,我們需要啟動Kafka和ZooKeeper,運行上面的例子:

  • 運行ZooKeeper和Kafka:
  1. kafka/bin/zookeeper-server-start.sh kafka/config/zookeeper.properties &
    kafka/bin/kafka-server-start.sh kafka/config/server.properties

     

所以,這完全是關於Kafka Schema Registry的。希望你喜歡我們的解釋。
體驗最好的Apache Kafka Quiz Part-1 | 准備迎接挑戰

10.結論

因此,我們看到Kafka Schema Registry為Kafka消費者和Kafka生產商管理Avro Schemas。此外,Avro還提供模式遷移,這對於流式傳輸和大數據架構非常重要。因此,我們已經向Kafka Schema Registry學習了整個概念。在這里,我們討論了Kafka中Schema注冊表的需求。

此外,我們還學習了模式注冊表操作和兼容性設置。最后,我們看到了Kafka Avro Schema並使用了Schema Registry Rest API。最后,我們轉向使用Schema注冊表和Avro Serialization編寫Kafka使用者和生產者


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM