Getting Started with Kafka Connect: Importing MySQL Data into Elasticsearch


1.Kafka Connect

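Connect is part of Kafka. It provides a reliable and scalable way to move data between Kafka and external storage systems. It offers connector plugins a set of APIs and a runtime: Connect runs the plugins, and the plugins move the data. Connect runs as a cluster of worker processes. Connector plugins are installed on the workers, and connectors are then managed and configured through the REST API; the workers are long-running processes. A connector starts additional tasks, which use the resources of the worker nodes to move large volumes of data in parallel. A SourceConnector reads data from the source system and hands the data objects to the worker; a SinkConnector receives data from the worker and writes it to the target system.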

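2.Connect concepts

Connector: implements the Connect API, decides how many tasks need to run, splits the data-copying work among those tasks, and obtains the task configurations from the worker and passes them on.

Task: does the actual work of moving data into or out of Kafka.

Worker process: the container for connectors and tasks. It manages connector configuration, starts connectors and their tasks, and serves the REST API.

Converter: translates between the data objects Connect handles internally and the serialized form (JSON, for example) in which records are written to or read from Kafka.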
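To make the connector/task relationship concrete, the following is a minimal Python sketch of how a connector could divide a list of tables among a bounded number of tasks. The function name assign_tables, the "tables" key, and the round-robin policy are illustrative assumptions; this is a toy model, not Kafka Connect's own code.

```python
from typing import Dict, List


def assign_tables(tables: List[str], max_tasks: int) -> List[Dict[str, str]]:
    """Split tables round-robin into at most max_tasks task configs (toy model)."""
    if max_tasks < 1:
        raise ValueError("max_tasks must be at least 1")
    # Never create more tasks than there are tables to copy.
    num_tasks = min(max_tasks, len(tables))
    groups: List[List[str]] = [[] for _ in range(num_tasks)]
    for i, table in enumerate(tables):
        groups[i % num_tasks].append(table)
    # Each task config is a flat string-to-string map.
    return [{"tables": ",".join(group)} for group in groups]


if __name__ == "__main__":
    for task_id, config in enumerate(assign_tables(["login", "orders", "users"], 2)):
        print(task_id, config)
```

With three tables and a limit of two tasks, one task receives two tables and the other receives one; the worker would then start one task per returned configuration.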

3.Running Connect

// Distributed mode
cd kafka/bin
sh connect-distributed.sh ../config/connect-distributed.properties

connect-distributed.properties contains settings such as:

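bootstrap.servers: addresses of the Kafka cluster brokers
# Connect workers that share the same group.id belong to the same Connect cluster
group.id=connect-cluster
# Defines the format in which data is stored in Kafka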
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

Viewing and managing connectors through the REST API

List the connector plugins installed on the worker
curl -X GET http://ip:8083/connector-plugins
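GET /connectors – returns the names of all running connectors.
POST /connectors – creates a new connector. The request body must be JSON containing a name field and a config field: name is the connector's name, and config is a JSON object holding the connector's configuration.
GET /connectors/{name} – returns information about the given connector.
GET /connectors/{name}/config – returns the configuration of the given connector.
PUT /connectors/{name}/config – updates the configuration of the given connector.
GET /connectors/{name}/status – returns the state of the given connector: whether it is running, paused, or failed. If an error occurred, the error details are included.
GET /connectors/{name}/tasks – returns the tasks currently running for the given connector.
GET /connectors/{name}/tasks/{taskid}/status – returns the status of one task of the given connector.
PUT /connectors/{name}/pause – pauses the connector and its tasks; data processing stops until the connector is resumed.
PUT /connectors/{name}/resume – resumes a paused connector.
POST /connectors/{name}/restart – restarts a connector, typically after it has failed.
POST /connectors/{name}/tasks/{taskId}/restart – restarts a single task, usually because it has failed.
DELETE /connectors/{name} – deletes a connector, stopping all of its tasks and removing its configuration.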
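The endpoints above can also be called from a script instead of curl. The sketch below uses only Python's standard library; the helper names connect_request and send, and the address http://localhost:8083, are assumptions made for illustration. The request is built separately from sending it, so the method and URL can be inspected without a running worker.

```python
import json
import urllib.request
from typing import Optional


def connect_request(base_url: str, method: str, path: str,
                    body: Optional[dict] = None) -> urllib.request.Request:
    """Build (but do not send) a request for the Connect REST API."""
    data = json.dumps(body).encode("utf-8") if body is not None else None
    headers = {"Content-Type": "application/json"} if data is not None else {}
    return urllib.request.Request(base_url.rstrip("/") + path, data=data,
                                  headers=headers, method=method)


def send(request: urllib.request.Request):
    """Send the request and decode the JSON reply (needs a reachable worker)."""
    with urllib.request.urlopen(request) as response:
        text = response.read().decode("utf-8")
        return json.loads(text) if text else None


if __name__ == "__main__":
    base = "http://localhost:8083"  # assumption: a worker listens on this address
    status = connect_request(base, "GET", "/connectors/mysql-login-connector/status")
    restart = connect_request(base, "POST", "/connectors/mysql-login-connector/restart")
    print(status.get_method(), status.full_url)
    print(restart.get_method(), restart.full_url)
    # print(send(status))  # uncomment once a worker is actually running
```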

Apache Kafka ships with FileStreamSinkConnector and FileStreamSourceConnector by default. Confluent provides many open-source connectors, and you can also implement a custom connector against the Connect API.

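4. Connector example: from MySQL to Elasticsearch

4.1 Download the connectors

Download confluentinc-kafka-connect-elasticsearch-5.0.0 and confluentinc-kafka-connect-jdbc-5.0.0, and copy the JAR files from the lib directory of each package into the lib directory of the Kafka installation on every node that runs a Connect worker. The MySQL driver mysql-connector-java-5.1.22.jar must be placed there as well.

Usage documentation for the Confluent connectors: https://docs.confluent.io/2.0.0/connect/connect-jdbc/docs/index.html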

4.2 Restart Connect

Verify that the plugins were loaded:

curl -X GET http://ip:8083/connector-plugins
[{"class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector","type":"sink","version":"5.0.0"},{"class":"io.confluent.connect.jdbc.JdbcSinkConnector","type":"sink","version":"5.0.0"},{"class":"io.confluent.connect.jdbc.JdbcSourceConnector","type":"source","version":"5.0.0"},{"class":"org.apache.kafka.connect.file.FileStreamSinkConnector","type":"sink","version":"1.0"},{"class":"org.apache.kafka.connect.file.FileStreamSourceConnector","type":"source","version":"1.0"}]
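The same check can be automated. The following Python sketch parses a /connector-plugins response and reports which required connector classes are missing; the function name missing_plugins is a hypothetical helper, and the sample is a shortened copy of the response above.

```python
import json
from typing import Iterable, List


def missing_plugins(response_text: str, required: Iterable[str]) -> List[str]:
    """Return the required connector classes absent from a /connector-plugins reply."""
    loaded = {plugin["class"] for plugin in json.loads(response_text)}
    return [name for name in required if name not in loaded]


if __name__ == "__main__":
    # Shortened copy of the response shown above.
    sample = json.dumps([
        {"class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
         "type": "sink", "version": "5.0.0"},
        {"class": "io.confluent.connect.jdbc.JdbcSourceConnector",
         "type": "source", "version": "5.0.0"},
    ])
    needed = [
        "io.confluent.connect.jdbc.JdbcSourceConnector",
        "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    ]
    print(missing_plugins(sample, needed))
```

An empty list means both connectors needed for this walkthrough are available on the worker.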

4.3 Create a test table in MySQL

mysql> create table login(username varchar(50),login_time datetime);
Query OK, 0 rows affected (0.73 sec)

mysql> insert into login values('przhang',now());
Query OK, 1 row affected (0.03 sec)

mysql> insert into login values('peter',now());
Query OK, 1 row affected (0.00 sec)

4.4 Start the JDBC source connector

echo '{"name":"mysql-login-connector","config":{"connector.class":"JdbcSourceConnector","connection.url":"jdbc:mysql://localhost:3306/dwwspdb?user=dw_wspdb&password=dw_wspdb","mode":"timestamp","table.whitelist":"login","validate.non.null":false,"timestamp.column.name":"login_time","topic.prefix":"mysql."}}' | curl -X POST -d @- http://ip:8083/connectors --header "Content-Type:application/json"

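Notes on some JdbcSourceConnector settings

connection.url: the JDBC connection URL of the MySQL database

mode:timestamp together with "timestamp.column.name":"login_time": incremental data is detected through the login_time timestamp column. Whenever a row's value in this column changes to a newer timestamp, a new record is written to the Kafka topic.

mode:incrementing together with "incrementing.column.name":"id": suited to tables that have an auto-increment column. Whenever a new row is inserted into MySQL, a new record is written to the Kafka topic.

topic.prefix: with the value mysql., records are written to the Kafka topic named mysql.<table name>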
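The behaviour of timestamp mode can be pictured with a small in-memory model. The Python sketch below is a simplified illustration under stated assumptions, not the connector's implementation: the function name poll_timestamp_mode is hypothetical, the "table" is a list of dicts, and the stored offset is simply the largest timestamp seen so far.

```python
from typing import Any, Dict, List, Tuple

Row = Dict[str, Any]


def poll_timestamp_mode(rows: List[Row], column: str, last_seen: int,
                        topic_prefix: str, table: str) -> Tuple[str, List[Row], int]:
    """Return (topic, new rows, new offset) for rows whose column value exceeds last_seen."""
    fresh = sorted((r for r in rows if r[column] > last_seen), key=lambda r: r[column])
    new_offset = fresh[-1][column] if fresh else last_seen
    # The topic name is the configured prefix followed by the table name.
    return topic_prefix + table, fresh, new_offset


if __name__ == "__main__":
    table = [
        {"username": "przhang", "login_time": 1540453531000},
        {"username": "peter", "login_time": 1540453540000},
    ]
    topic, records, offset = poll_timestamp_mode(table, "login_time", 0, "mysql.", "login")
    print(topic, len(records), offset)
    # Simulates: update login set login_time=now() where username='przhang'
    table[0]["login_time"] = 1540454254000
    topic, records, offset = poll_timestamp_mode(table, "login_time", offset, "mysql.", "login")
    print(topic, records, offset)
```

The first poll picks up both rows; after the simulated update, only the changed row has a timestamp above the stored offset, so only that row is emitted again.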

View the messages in the Kafka topic

sh kafka-console-consumer.sh --bootstrap-server=kafkaip:9092 --topic mysql.login --from-beginning
{"schema":{"type":"struct","fields":[{"type":"string","optional":true,"field":"username"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"login_time"}],"optional":false,"name":"login"},"payload":{"username":"przhang","login_time":1540453531000}}
{"schema":{"type":"struct","fields":[{"type":"string","optional":true,"field":"username"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"login_time"}],"optional":false,"name":"login"},"payload":{"username":"peter","login_time":1540453540000}}

Update a row in MySQL:
update login set login_time=now() where username='przhang';
Kafka outputs the change in real time:
{"schema":{"type":"struct","fields":[{"type":"string","optional":true,"field":"username"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"login_time"}],"optional":false,"name":"login"},"payload":{"username":"przhang","login_time":1540454254000}}
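Each message is a JsonConverter envelope with a schema part and a payload part, and login_time is carried as an int64 tagged with the Connect Timestamp logical type (milliseconds since the Unix epoch). The following Python sketch shows one way a consumer could decode such a message; the function name decode_record is a hypothetical helper, and converting to UTC is a choice made here for illustration.

```python
import json
from datetime import datetime, timezone
from typing import Any, Dict

TIMESTAMP_LOGICAL_TYPE = "org.apache.kafka.connect.data.Timestamp"


def decode_record(message: str) -> Dict[str, Any]:
    """Return the payload with Timestamp fields converted to UTC datetimes."""
    envelope = json.loads(message)
    payload = dict(envelope["payload"])
    for field in envelope["schema"]["fields"]:
        name = field["field"]
        if field.get("name") == TIMESTAMP_LOGICAL_TYPE and payload.get(name) is not None:
            # The Timestamp logical type stores milliseconds since the Unix epoch.
            payload[name] = datetime.fromtimestamp(payload[name] / 1000, tz=timezone.utc)
    return payload


if __name__ == "__main__":
    message = (
        '{"schema":{"type":"struct","fields":['
        '{"type":"string","optional":true,"field":"username"},'
        '{"type":"int64","optional":true,'
        '"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"login_time"}],'
        '"optional":false,"name":"login"},'
        '"payload":{"username":"przhang","login_time":1540453531000}}'
    )
    record = decode_record(message)
    print(record["username"], record["login_time"].isoformat())
```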

4.5 Start the ElasticsearchSinkConnector

echo '{"name":"elastic-login-connector","config":{"connector.class":"ElasticsearchSinkConnector","connection.url":"http://ESIP:9200","type.name":"mysql-data","topics":"mysql.login","key.ignore":true}}' | curl -X POST -d @- http://ip:8083/connectors --header "Content-Type:application/json"

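Some ElasticsearchSinkConnector settings:

connection.url: the Elasticsearch connection URL

type.name: the type under which documents are written to the ES index

key.ignore=true: the ID of each document written to ES is built as Kafka topic name + partition ID + offset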
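The ID scheme described for key.ignore=true can be written down directly. The Python sketch below builds and parses IDs in the topic+partition+offset form; the function names document_id and parse_document_id are hypothetical, and the "+" separator is taken from the _id values in the search output of this walkthrough rather than from the connector's source.

```python
from typing import Tuple


def document_id(topic: str, partition: int, offset: int) -> str:
    """Build an ID in the topic+partition+offset form described above."""
    return f"{topic}+{partition}+{offset}"


def parse_document_id(doc_id: str) -> Tuple[str, int, int]:
    """Split an ID back into (topic, partition, offset)."""
    # Split from the right so a '+' inside the topic name is left untouched.
    topic, partition, offset = doc_id.rsplit("+", 2)
    return topic, int(partition), int(offset)


if __name__ == "__main__":
    doc_id = document_id("mysqllogin", 3, 0)
    print(doc_id)
    print(parse_document_id(doc_id))
```

Because the ID depends on the record's position in Kafka rather than on its content, an updated MySQL row arrives at a new offset and is stored as a new document instead of overwriting the earlier one.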

View the data in ES:

curl -X GET http://ESIP:9200/mysql.login/_search?pretty=true
{
  "took" : 1,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 5,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "mysqllogin",
        "_type" : "mysql-data",
        "_id" : "mysqllogin+3+0",
        "_score" : 1.0,
        "_source" : {
          "username" : "przhang",
          "login_time" : 1540453531000
        }
      },
      {
        "_index" : "mysqllogin",
        "_type" : "mysql-data",
        "_id" : "mysqllogin+3+3",
        "_score" : 1.0,
        "_source" : {
          "username" : "mayun",
          "login_time" : 1540454531000
        }
      },
      {
        "_index" : "mysqllogin",
        "_type" : "mysql-data",
        "_id" : "mysqllogin+3+2",
        "_score" : 1.0,
        "_source" : {
          "username" : "przhang",
          "login_time" : 1540454254000
        }
      },
      {
        "_index" : "mysqllogin",
        "_type" : "mysql-data",
        "_id" : "mysqllogin+3+4",
        "_score" : 1.0,
        "_source" : {
          "username" : "pony",
          "login_time" : 1540473988000
        }
      },
      {
        "_index" : "mysqllogin",
        "_type" : "mysql-data",
        "_id" : "mysqllogin+3+1",
        "_score" : 1.0,
        "_source" : {
          "username" : "peter",
          "login_time" : 1540453540000
        }
      }
    ]
  }
}

 

