Flink kuduSink開發


1、繼承RichSinkFunction

(1)首先在構造方式傳入kudu的masterAddress地址、默認表名、TableSerializationSchema、KuduTableRowConverter、Properties配置對象

(2)重寫open方法

初始化KuduClient對象操作kudu,KuduSession對象並傳入一堆配置

(3)重寫invoke方法

核心是如果已傳入TableSerializationSchema對象,則通過其serializeTable方法從輸入的json數據里提取表名,如果未定義則直接取默認表名。拿到表名后就能使用KuduClient對象對其操作了

if (schema != null) {
String serializeTableName = schema.serializeTable(row);
if (serializeTableName == null) return;
table = client.openTable(serializeTableName);
}
else
table = client.openTable(tableName);
insert = table.newInsert();

2、定義KuduTableRowConverter接口,將每一條輸入數據轉換成TableRow對象

public interface KuduTableRowConverter<IN> extends Serializable {
TableRow convert(IN value);
}

定義TableRow類,代表一行數據,key是字串型的鍵名,value是Object型的鍵值

public class TableRow implements Serializable {
private static final long serialVersionUID = 1L;
private Map<String, Object> pairs = new HashMap<>();
public int size() {return pairs.size();}
public Map<String, Object> getPairs() {return pairs;}
public Object getElement(String key) {return pairs.get(key);}
public void putElement(String key, Object value) {pairs.put(key, value);}
}

定義JsonKuduTableRowConverter實現KuduTableRowConverter接口,對於輸入的json數據,通過一系列轉換邏輯轉換成TableRow對象

3、定義TableSerializationSchema接口,從每一條輸入數據里提取表名

public interface TableSerializationSchema<IN> extends Serializable {
String serializeTable(IN value);
}

定義JsonLogidKeyTableSerializationSchema實現TableSerializationSchema接口,對於輸入的json數據,使用指定key值提取value值,然后再從一個預先獲取的map里找到這個value對應的表名,然后加上必要的前綴與后綴組成impala的表名


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM