0x00 下載
http://www.confluent.io/download,打開后,顯示最新版本3.0.0,然后在右邊填寫信息后,點擊Download下載。
之后跳轉到下載頁面,選擇zip 或者 tar都行, 下載完成后上傳linux系統,解壓即完成安裝。
- zip and tar archives – 推薦OS X 和 Quickstart
- deb packages via apt – 推薦安裝服務在 Debian/Ubuntu系統
- rpm packages via yum – 推薦安裝服務在 RHEL/CentOS/Fedora系統
- deb/rpm packages with installer script
Confluent 目前還不支持Windows系統。Windows用戶可以下載和使用zip 和 tar包,但最好直接運行jar文件 ,而不是使用包裝腳本。
0x01 Requirements
唯一需要的條件是java 版本>=1.7。
0x02 Confluent Platform快速入門
你可以快速的運行Confluent platform在單台服務器上。在這篇quickstart,我們將介紹如何運行ZooKeeper,Kafka,和Schema Registry,然后如何讀和寫一些Avro數據從/到Kafka。
(如果你想跑一個數據管道用Kafka Connect和Control Center,參考The Control Center QuickStart Guide.)我們隨后也會介紹。
1.下載和安裝Confluent platform。在這篇quickstart 我們使用zip包,也有很多其他安裝方式,見上。
$ wget http://packages.confluent.io/archive/3.0/confluent-3.0.0-2.11.zip $ unzip confluent-3.0.0-2.11.zip $ cd confluent-3.0.0
下邊展示的是安裝目錄里上層層級結構:
confluent-3.0.0/bin/ # Driver scripts for starting/stopping services confluent-3.0.0/etc/ # Configuration files confluent-3.0.0/share/java/ # Jars
如果你通過deb或者rpm安裝,目錄結構如下:
/usr/bin/ # Driver scripts for starting/stopping services, prefixed with <package> names /etc/<package>/ # Configuration files /usr/share/java/<package>/ # Jars
2.啟動Zookeeper。因為這是長期運行的服務,你應該運行它在一個獨立的終端(或者在后邊運行它,重定向輸出到一個文件中)。你需要有寫權限到/var/lib在這一步以及之后的步驟里:
# The following commands assume you exactly followed the instructions above. # This means, for example, that at this point your current working directory # must be confluent-3.0.0/. $ ./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties
3.啟動Kafka,同樣在一個獨立的終端。
$ ./bin/kafka-server-start ./etc/kafka/server.properties
4.啟動Schema Registry,同樣在一個獨立的終端。
$ ./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties
5.現在所有需要的服務都已啟動,我們發送一些Avro數據到Kafka的topic中。雖然這一步一般會得到一些數據從一些應用里,這里我們使用Kafka提供的例子,不用寫代碼。我們在本地的Kafka集群里,寫數據到topic “test”里,讀取每一行Avro信息,校驗Schema Registry .
$ ./bin/kafka-avro-console-producer \ --broker-list localhost:9092 --topic test \ --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
一旦啟動,進程等待你輸入一些信息,一條一行,會發送到topic中一旦按下enter鍵。試着輸入一些信息:
{"f1": "value1"} {"f1": "value2"} {"f1": "value3"}
輸入完成后,可以使用Ctrl+C來終止進程。
Note:如果一個空行你按下Enter鍵,會被解釋為一個null值,引起錯誤。然后僅僅需要做的是啟動producer進程,接着輸入信息。
6.現在我們可以檢查,通過Kafka consumer控制台讀取數據從topic。在topic ‘test'中,Zookeeper實例,會告訴consumer解析數據使用相同的schema。最后從開始讀取數據(默認consumer只讀取它啟動之后寫入到topic中的數據)
$ ./bin/kafka-avro-console-consumer --topic test \ --zookeeper localhost:2181 \ --from-beginning
你會看到你之前在producer中輸入的數據,以同樣的格式。
consumer不會退出,它可以監聽寫入到topic中的新數據。保持consumer運行,然后重復第5步,輸入一些信息,然后按下enter鍵,你會看到consumer會立即讀取到寫入到topic中的數據。
當你完成了測試,可以用Ctrl+C終止進程。
7.現在讓我們嘗試寫一些不兼容的schema的數據到topic ’test‘中,我們重新運行producer命令,但是改變schema。
$ ./bin/kafka-avro-console-producer \ --broker-list localhost:9092 --topic test \ --property value.schema='{"type":"int"}'
現在輸入一個整數按下enter鍵,你會看到以下的異常:
org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: "int" Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with the latest schema; error code: 409 at io.confluent.kafka.schemaregistry.client.rest.utils.RestUtils.httpRequest(RestUtils.java:146) at io.confluent.kafka.schemaregistry.client.rest.utils.RestUtils.registerSchema(RestUtils.java:174) at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:51) at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:89) at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:49) at io.confluent.kafka.formatter.AvroMessageReader.readMessage(AvroMessageReader.java:155) at kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:94) at kafka.tools.ConsoleProducer.main(ConsoleProducer.scala)
當producer試圖發送一些信息,它會檢查schema用Schema Registry。當返回錯誤時說明現在的schema無效,因為它不能兼容之前設置的schema。控制台打印出錯誤信息並退出,但是你自己的應用可以更加人性化處理這類問題。但最重要的是,我們保證不讓不兼容的數據寫入到Kafka中。
8.當你完成這一系列測試,你可以使用ctrl+c來關閉服務,以啟動時相反的順序。
這一簡單的教程包含了Kafka和Schema Registry這一些核心的服務。你也可以參考以下document:
- Confluent Control Center documentation
- Kafka Streams documentation
- Kafka Connect documentation
- Schema Registry documentation
- Kafka REST Proxy documentation
- Camus documentation