替代Flume——Kafka Connect簡介


file
我們知道過去對於Kafka的定義是分布式,分區化的,帶備份機制的日志提交服務。也就是一個分布式的消息隊列,這也是他最常見的用法。但是Kafka不止於此,打開最新的官網。

file

我們看到Kafka最新的定義是:Apache Kafka® is a distributed streaming platform

分布式流處理平台。

file

這里也清晰的描述了Kafka的特點:Kafka用於構建實時數據管道和流式應用程序。它具有水平可擴展性、容錯性、速度極快,並在數千家公司投入生產。

所以現在的Kafka已經不僅是一個分布式的消息隊列,更是一個流處理平台。這源於它於0.9.0.0和0.10.0.0引入的兩個全新的組件Kafka Connect與Kafka Streaming。

Kafka Connect簡介

我們知道消息隊列必須存在上下游的系統,對消息進行搬入搬出。比如經典的日志分析系統,通過flume讀取日志寫入kafka,下游由storm進行實時的數據處理。

file

Kafka Connect的作用就是替代Flume,讓數據傳輸這部分工作可以由Kafka Connect來完成。Kafka Connect是一個用於在Apache Kafka和其他系統之間可靠且可靠地傳輸數據的工具。它可以快速地將大量數據集合移入和移出Kafka。

Kafka Connect的導入作業可以將數據庫或從應用程序服務器收集的數據傳入到Kafka,導出作業可以將Kafka中的數據傳遞到查詢系統,也可以傳輸到批處理系統以進行離線分析。

Kafka Connect功能包括:

  • 一個通用的Kafka連接的框架 - Kafka Connect規范化了其他數據系統與Kafka的集成,簡化了連接器開發,部署和管理
  • 分布式和獨立模式 - 支持大型分布式的管理服務,也支持小型生產環境的部署
  • REST界面 - 通過易用的REST API提交和管理Kafka Connect
  • 自動偏移管理 - 只需從連接器獲取一些信息,Kafka Connect就可以自動管理偏移量提交過程,因此連接器開發人員無需擔心連接器開發中偏移量提交這部分的開發
  • 默認情況下是分布式和可擴展的 - Kafka Connect構建在現有的組管理協議之上。可以添加擴展集群
  • 流媒體/批處理集成 - 利用Kafka現有的功能,Kafka Connect是橋接流媒體和批處理數據系統的理想解決方案

file

運行Kafka Connect

Kafka Connect目前支持兩種運行模式:獨立和集群。

獨立模式

在獨立模式下,只有一個進程,這種更容易設置和使用。但是沒有容錯功能。

啟動:
> bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties ...]
獨立模式配置

第一個參數config/connect-standalone.properties是一些基本的配置:

這幾個在獨立和集群模式下都需要設置:

#bootstrap.servers   kafka集群列表
bootstrap.servers=localhost:9092
#key.converter       key的序列化轉換器  比如json的  key.converter=org.apache.kafka.connect.json.JsonConverter
#value.converter     value的序列化轉換器
value.converter=org.apache.kafka.connect.json.JsonConverter

#獨立模式特有的配置:
#offset.storage.file.filename       用於存儲偏移量的文件
offset.storage.file.filename =/home/kafka/connect.offsets
獨立模式連接器配置(配置文件)

后面的參數connector1.properties [connector2.properties ...] 可以多個,是連接器配置內容

這里我們配置一個從文件讀取數據並存入kafka的配置:

connect-file-sink.properties

  • name - 連接器的唯一名稱。嘗試再次使用相同名稱注冊將失敗。

  • connector.class - 連接器的Java類 此連接器的類的全名或別名。這里我們選擇FileStreamSink

  • tasks.max - 應為此連接器創建的最大任務數。如果連接器無法達到此級別的並行性,則可能會創建更少的任務。

  • key.converter - (可選)覆蓋worker設置的默認密鑰轉換器。

  • value.converter - (可選)覆蓋worker設置的默認值轉換器。

    下面兩個必須設置一個:

    • topics - 以逗號分隔的主題列表,用作此連接器的輸入
    • topics.regex - 用作此連接器輸入的主題的Java正則表達式
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test

可以在連接器中配置轉換器

需要指定參數:

  • transforms - 轉換的別名列表,指定將應用轉換的順序。
  • transforms.$alias.type - 轉換的完全限定類名。
  • transforms.$alias.$transformationSpecificConfig 轉換的配置屬性

例如,我們把剛才的文件轉換器的內容添加字段

首先設置connect-standalone.properties

key.converter.schemas.enable = false
value.converter.schemas.enable = false

設置connect-file-source.properties

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test
transforms=MakeMap, InsertSource
transforms.MakeMap.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.MakeMap.field=line
transforms.InsertSource.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertSource.static.field=data_source
transforms.InsertSource.static.value=test-file-source

沒有轉換前的結果:

"foo"
"bar"
"hello world"

轉換后:

{"line":"foo","data_source":"test-file-source"}
{"line":"bar","data_source":"test-file-source"}
{"line":"hello world","data_source":"test-file-source"}

常用轉換類型:

  • InsertField - 使用靜態數據或記錄元數據添加字段
  • ReplaceField - 過濾或重命名字段
  • MaskField - 用類型的有效空值替換字段(0,空字符串等)
  • ValueToKey Value轉換為Key
  • HoistField - 將整個事件作為單個字段包裝在Struct或Map中
  • ExtractField - 從Struct和Map中提取特定字段,並在結果中僅包含此字段
  • SetSchemaMetadata - 修改架構名稱或版本
  • TimestampRouter - 根據原始主題和時間戳修改記錄主題
  • RegexRouter - 根據原始主題,替換字符串和正則表達式修改記錄主題

集群模式

集群模式下,可以擴展,容錯。

啟動:
> bin/connect-distributed.sh config/connect-distributed.properties

在集群模式下,Kafka Connect在Kafka主題中存儲偏移量,配置和任務狀態。

集群模式配置

connect-distributed.properties

#也需要基本的配置
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

#還有一些配置要注意
#group.id(默認connect-cluster) - Connect的組id 請注意,這不得與使用者的組id 沖突
group.id=connect-cluster

#用於存儲偏移的主題; 此主題應具有許多分區
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1

#用於存儲連接器和任務配置的主題  只能一個分區
config.storage.topic=connect-configs
config.storage.replication.factor=1

#用於存儲狀態的主題; 此主題可以有多個分區
status.storage.topic=connect-status
status.storage.replication.factor=1

在集群模式下,配置並不會在命令行傳進去,而是需要REST API來創建,修改和銷毀連接器。

集群模式連接器配置(REST API)

可以配置REST API服務器,支持http與https

listeners=http://localhost:8080,https://localhost:8443

默認情況下,如果未listeners指定,則REST服務器使用HTTP協議在端口8083上運行。

以下是當前支持的REST API:

  • GET /connectors - 返回活動連接器列表
  • POST /connectors - 創建一個新的連接器; 請求主體應該是包含字符串name字段的JSON對象和包含config連接器配置參數的對象字段
  • GET /connectors/{name} - 獲取有關特定連接器的信息
  • GET /connectors/{name}/config - 獲取特定連接器的配置參數
  • PUT /connectors/{name}/config - 更新特定連接器的配置參數
  • GET /connectors/{name}/status - 獲取連接器的當前狀態,包括它是否正在運行,失敗,暫停等,分配給哪個工作人員,錯誤信息(如果失敗)以及所有任務的狀態
  • GET /connectors/{name}/tasks - 獲取當前為連接器運行的任務列表
  • GET /connectors/{name}/tasks/{taskid}/status - 獲取任務的當前狀態,包括它是否正在運行,失敗,暫停等,分配給哪個工作人員,以及錯誤信息是否失敗
  • PUT /connectors/{name}/pause - 暫停連接器及其任務,這將停止消息處理,直到恢復連接器
  • PUT /connectors/{name}/resume - 恢復暫停的連接器(如果連接器未暫停,則不執行任何操作)
  • POST /connectors/{name}/restart - 重新啟動連接器(通常是因為它已經失敗)
  • POST /connectors/{name}/tasks/{taskId}/restart - 重啟個別任務(通常因為失敗)
  • DELETE /connectors/{name} - 刪除連接器,暫停所有任務並刪除其配置

連接器開發指南

kakfa允許開發人員自己去開發一個連接器。

核心概念

要在Kafka和其他系統之間復制數據,用戶需要創建一個Connector

Connector有兩種形式:

SourceConnectors從另一個系統導入數據,例如,JDBCSourceConnector將關系數據庫導入Kafka

SinkConnectors導出數據,例如,HDFSSinkConnector將Kafka主題的內容導出到HDFS文件

和對應的Task:

SourceTaskSinkTask

Task形成輸入輸出流,開發Task要注意偏移量的問題。

每個流應該是一系列鍵值記錄。還需要定期提交已處理的數據的偏移量,以便在發生故障時,處理可以從上次提交的偏移量恢復。Connector還需要是動態的,實現還負責監視外部系統是否存在任何更改。

開發一個簡單的連接器

開發連接器只需要實現兩個接口,即ConnectorTask

這里我們簡單開發一個FileStreamConnector。

此連接器是為在獨立模式下使用,SourceConnectorSourceTask讀取文件的每一行,SinkConnectorSinkTask每個記錄寫入一個文件。

連接器示例:

繼承SourceConnector,添加字段(要讀取的文件名和要將數據發送到的主題)

public class FileStreamSourceConnector extends SourceConnector {
    private String filename;
    private String topic;

定義實際讀取數據的類

@Override
public Class<? extends Task> taskClass() {
    return FileStreamSourceTask.class;
}

FileStreamSourceTask下面定義該類。接下來,我們添加一些標准的生命周期方法,start()stop()

@Override
public void start(Map<String, String> props) {
    // The complete version includes error handling as well.
    filename = props.get(FILE_CONFIG);
    topic = props.get(TOPIC_CONFIG);
}
 
@Override
public void stop() {
    // Nothing to do since no background monitoring is required.
}

最后,實施的真正核心在於taskConfigs()

@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
    ArrayList<Map<String, String>> configs = new ArrayList<>();
    // Only one input stream makes sense.
    Map<String, String> config = new HashMap<>();
    if (filename != null)
        config.put(FILE_CONFIG, filename);
    config.put(TOPIC_CONFIG, topic);
    configs.add(config);
    return configs;
}

任務示例:

源任務

實現SourceTask 創建FileStreamSourceTask繼承SourceTask

public class FileStreamSourceTask extends SourceTask {
    String filename;
    InputStream stream;
    String topic;
 
    @Override
    public void start(Map<String, String> props) {
        filename = props.get(FileStreamSourceConnector.FILE_CONFIG);
        stream = openOrThrowError(filename);
        topic = props.get(FileStreamSourceConnector.TOPIC_CONFIG);
    }
 
    @Override
    public synchronized void stop() {
        stream.close();
    }

接下來,我們實現任務的主要功能,即poll()從輸入系統獲取事件並返回以下內容的方法List

@Override
public List<SourceRecord> poll() throws InterruptedException {
    try {
        ArrayList<SourceRecord> records = new ArrayList<>();
        while (streamValid(stream) && records.isEmpty()) {
            LineAndOffset line = readToNextLine(stream);
            if (line != null) {
                Map<String, Object> sourcePartition = Collections.singletonMap("filename", filename);
                Map<String, Object> sourceOffset = Collections.singletonMap("position", streamOffset);
                records.add(new SourceRecord(sourcePartition, sourceOffset, topic, Schema.STRING_SCHEMA, line));
            } else {
                Thread.sleep(1);
            }
        }
        return records;
    } catch (IOException e) {
        // Underlying stream was killed, probably as a result of calling stop. Allow to return
        // null, and driving thread will handle any shutdown if necessary.
    }
    return null;
}
接收任務

不像SourceConnectorSinkConnectorSourceTaskSinkTask有非常不同的接口,因為SourceTask采用的是拉接口,並SinkTask使用推接口。兩者共享公共生命周期方法,但SinkTask完全不同:

public abstract class SinkTask implements Task {
    public void initialize(SinkTaskContext context) {
        this.context = context;
    }
 
    public abstract void put(Collection<SinkRecord> records);
 
    public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
    }

這是一個簡單的例子,它們有簡單的結構化數據 - 每一行只是一個字符串。幾乎所有實用的連接器都需要具有更復雜數據格式的模式。要創建更復雜的數據,您需要使用Kafka Connect dataAPI。

Schema schema = SchemaBuilder.struct().name(NAME)
    .field("name", Schema.STRING_SCHEMA)
    .field("age", Schema.INT_SCHEMA)
    .field("admin", new SchemaBuilder.boolean().defaultValue(false).build())
    .build();
 
Struct struct = new Struct(schema)
    .put("name", "Barbara Liskov")
    .put("age", 75);

更多Kafka相關技術文章:

什么是Kafka?
Kafka監控工具匯總
Kafka快速入門
Kafka核心之Consumer
Kafka核心之Producer

更多實時計算,Flink,Kafka等相關技術博文,歡迎關注實時流式計算

file


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM