Introduction
While running the Flink SQL example RetractPvUvSQL on Flink 1.7.2, I hit the following exception:
Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 19 to line 1, column 51: Cannot apply 'DATE_FORMAT' to arguments of type 'DATE_FORMAT(<VARCHAR(65536)>, <CHAR(2)>)'. Supported form(s): '(TIMESTAMP, FORMAT)'
The message indicates that DATE_FORMAT does not accept a string as its first argument, so below we define a UDF to handle this case.
Note that the official documentation also discourages using the DATE_FORMAT(timestamp, string) form.

RetractPvUvSQL code
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class RetractPvUvSQL {

    public static void main(String[] args) throws Exception {
        ParameterTool params = ParameterTool.fromArgs(args);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        DataStreamSource<PageVisit> input = env.fromElements(
                new PageVisit("2017-09-16 09:00:00", 1001, "/page1"),
                new PageVisit("2017-09-16 09:00:00", 1001, "/page2"),
                new PageVisit("2017-09-16 10:30:00", 1005, "/page1"),
                new PageVisit("2017-09-16 10:30:00", 1005, "/page1"),
                new PageVisit("2017-09-16 10:30:00", 1005, "/page2"));

        // register the DataStream as table "visit_table"
        tEnv.registerDataStream("visit_table", input, "visitTime, userId, visitPage");

        Table table = tEnv.sqlQuery(
                "SELECT " +
                "visitTime, " +
                "DATE_FORMAT(max(visitTime), 'HH') as ts, " +
                "count(userId) as pv, " +
                "count(distinct userId) as uv " +
                "FROM visit_table " +
                "GROUP BY visitTime");

        DataStream<Tuple2<Boolean, Row>> dataStream = tEnv.toRetractStream(table, Row.class);

        if (params.has("output")) {
            String outPath = params.get("output");
            System.out.println("Output path: " + outPath);
            dataStream.writeAsCsv(outPath);
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            dataStream.print();
        }
        env.execute();
    }

    /**
     * Simple POJO containing a website page visit.
     */
    public static class PageVisit {
        public String visitTime;
        public long userId;
        public String visitPage;

        // public default constructor to make it a Flink POJO
        public PageVisit() {
        }

        public PageVisit(String visitTime, long userId, String visitPage) {
            this.visitTime = visitTime;
            this.userId = userId;
            this.visitPage = visitPage;
        }

        @Override
        public String toString() {
            return "PageVisit " + visitTime + " " + userId + " " + visitPage;
        }
    }
}
UDF implementation
A date-formatting UDF that accepts a string argument:
import java.sql.Timestamp;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.flink.table.functions.ScalarFunction;

public class DateFormat extends ScalarFunction {

    public String eval(Timestamp t, String format) {
        return new SimpleDateFormat(format).format(t);
    }

    /**
     * Parses a string timestamp in the default format (yyyy-MM-dd HH:mm:ss)
     * and re-formats it with the given pattern.
     *
     * @param t      the timestamp string to parse
     * @param format the target date format
     * @return the re-formatted date string
     */
    public static String eval(String t, String format) {
        try {
            Date originDate = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(t);
            return new SimpleDateFormat(format).format(originDate);
        } catch (ParseException e) {
            throw new RuntimeException(
                    "Failed to parse date '" + t + "' with format '" + format + "'");
        }
    }
}
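The string-argument eval above boils down to a parse-then-reformat round trip through SimpleDateFormat. Here is a minimal, Flink-free sketch of that logic (class and method names are made up for illustration):

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class DateFormatSketch {
    // Parse the input with the default pattern, then re-format it
    // with the requested pattern, mirroring the UDF's string eval.
    static String format(String t, String pattern) {
        try {
            Date d = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(t);
            return new SimpleDateFormat(pattern).format(d);
        } catch (ParseException e) {
            throw new RuntimeException("Cannot parse '" + t + "'", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(format("2017-09-16 09:00:00", "HH")); // prints 09
    }
}
```

With the sample data above, extracting "HH" from "2017-09-16 09:00:00" yields the "09" seen in the job output.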
Since Flink already ships a built-in DATE_FORMAT function, we register ours under a different name, DATEFORMAT:
// register the function
tEnv.registerFunction("DATEFORMAT", new DateFormat());

Table table = tEnv.sqlQuery(
        "SELECT " +
        "visitTime, " +
        "DATEFORMAT(max(visitTime), 'HH') as ts, " +
        "count(userId) as pv, " +
        "count(distinct userId) as uv " +
        "FROM visit_table " +
        "GROUP BY visitTime");
As the registration source code shows, a registered scalar function becomes available in both the Table API and SQL:
/**
  * Registers a [[ScalarFunction]] under a unique name. Replaces already existing
  * user-defined functions under this name.
  */
def registerFunction(name: String, function: ScalarFunction): Unit = {
  // check if class could be instantiated
  checkForInstantiation(function.getClass)

  // register in Table API
  functionCatalog.registerFunction(name, function.getClass)

  // register in SQL API
  functionCatalog.registerSqlFunction(
    createScalarSqlFunction(name, name, function, typeFactory)
  )
}
Execution output:
Printing result to stdout. Use --output to specify output path.
6> (true,2017-09-16 10:30:00,10,1,1)
4> (true,2017-09-16 09:00:00,09,1,1)
4> (false,2017-09-16 09:00:00,09,1,1)
6> (false,2017-09-16 10:30:00,10,1,1)
4> (true,2017-09-16 09:00:00,09,2,1)
6> (true,2017-09-16 10:30:00,10,2,1)
6> (false,2017-09-16 10:30:00,10,2,1)
6> (true,2017-09-16 10:30:00,10,3,1)
Process finished with exit code 0
Let's interpret this output:
A Flink RetractStream marks each row with a boolean flag: true means the row is an insert, false means it is a retraction of a previously emitted row. A diagram I found online illustrates nicely why RetractStream exists.


Looking at the source data: when the first record for 09:00 (or 10:30) arrives, its pv and uv are new results. When the second record with the same key arrives, pv changes, so the first result must be retracted and replaced with the new one, much like updating a row by deleting it and re-inserting it. Records arriving later follow the same pattern.
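The delete-then-insert bookkeeping described above can be sketched as a toy consumer that applies (flag, key, value) messages to keyed state. This is a simplified illustration of how a downstream system might interpret retract messages, not Flink's actual implementation; all names are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

public class RetractSketch {
    // Applies a sequence of (insertFlag, key, pv) retract messages:
    // an insert overwrites the keyed state, a retraction removes
    // the previously emitted result for that key.
    static Map<String, Long> apply(Object[][] messages) {
        Map<String, Long> state = new HashMap<>();
        for (Object[] m : messages) {
            boolean insert = (Boolean) m[0];
            String key = (String) m[1];
            Long pv = (Long) m[2];
            if (insert) {
                state.put(key, pv);           // new result replaces the old one
            } else if (pv.equals(state.get(key))) {
                state.remove(key);            // undo the retracted result
            }
        }
        return state;
    }

    public static void main(String[] args) {
        Object[][] messages = {
            {true,  "09:00", 1L},   // first visit at 09:00 -> pv = 1
            {false, "09:00", 1L},   // retract pv = 1
            {true,  "09:00", 2L},   // second visit -> pv = 2
        };
        System.out.println(apply(messages).get("09:00")); // prints 2
    }
}
```

After consuming the three messages, only the latest result (pv = 2) survives, which matches how the true/false pairs in the job output above resolve to the final counts.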
Summary
1. When converting a table to a stream, Flink offers toAppendStream and toRetractStream: the former suits append-only results, the latter suits results that are updated or deleted.
2. Custom functions can be used in Flink SQL. To implement a Flink scalar UDF, the evaluation method must be declared public and named eval; its parameter and return types determine the function's signature. eval can be overloaded to provide multiple variants, and it also supports varargs, e.g. eval(String... strs).
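Point 2 leans on ordinary Java overload resolution: an exact-arity eval is preferred, and a varargs overload catches everything else. A small stand-alone sketch of that rule (no Flink involved; class and method names are made up):

```java
public class EvalOverload {
    // Exact-arity overload: chosen when exactly one String is passed.
    static String eval(String s) {
        return "one:" + s;
    }

    // Varargs overload: chosen only when no exact match applies.
    static String eval(String... ss) {
        return "varargs:" + ss.length;
    }

    public static void main(String[] args) {
        System.out.println(eval("a"));        // prints one:a
        System.out.println(eval("a", "b"));   // prints varargs:2
    }
}
```

The same resolution happens when Flink picks among overloaded eval methods on a ScalarFunction, which is why the VARCHAR column in our query dispatched to the String-argument eval.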
