flink入門基本使用


依據flink官網-信用卡欺詐檢測例子進行測試flink
flink官網:
通過 Flink DataStream API 來實現一個有狀態流處理程序
1,執行環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
2,創建數據源 TransactionSource為flink的一個示例數據源,可以模擬一個無限循環生成信用卡模擬交易數據的數據源,模擬一個無界數據流,
DataStream<Transaction> transactions = env .addSource(new TransactionSource()) .name("transactions");
這個數據源需要添加如下依賴:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-walkthrough-common_2.12</artifactId>
<version>1.12.0</version>
</dependency>
3,對事件分區 & 業務邏輯處理
DataStream#keyBy 對流進行分區
process() 函數對流數據進行業務處理
DataStream<Alert> alerts = transactions .keyBy(Transaction::getAccountId) .process(new FraudDetector()) .name("fraud-detector");
4,輸出結果 
sink 會將 DataStream 寫出到外部系統,例如 Apache Kafka、Cassandra 或者 AWS Kinesis 等。 AlertSink 使用 INFO 的日志級別打印每一個 Alert 的數據記錄,而不是將其寫入持久存儲,以便你可以方便地查看結果。
alerts.addSink(new AlertSink());
5,運行作業
 調用 StreamExecutionEnvironment#execute 時給任務傳遞一個任務名參數,就可以開始運行任務
env.execute("Fraud Detection");
6,業務處理器
 FraudDetector 是 KeyedProcessFunction 接口的一個實現。 他的方法 KeyedProcessFunction#processElement 將會在每個事件上被調用
public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> { private static final double SMALL_AMOUNT = 1.00; private static final double LARGE_AMOUNT = 500.00; private static final long ONE_MINUTE = 60 * 1000; @Override public void processElement( Transaction transaction, Context context, Collector<Alert> collector) throws Exception { Alert alert = new Alert(); alert.setId(transaction.getAccountId()); collector.collect(alert); } }
 
flink狀態類: ValueState
這是一種能夠為被其封裝的變量添加容錯能力的類型。 ValueState 是一種 keyed state,也就是說它只能被用於 keyed context 提供的 operator 中,即所有能夠緊隨 DataStream#keyBy 之后被調用的operator。 一個 operator 中的 keyed state 的作用域默認是屬於它所屬的 key 的。 這個例子中,key 就是當前正在處理的交易行為所屬的信用卡賬戶(key 傳入 keyBy() 函數調用),而 FraudDetector 維護了每個帳戶的標記狀態。 ValueState 需要使用 ValueStateDescriptor 來創建,ValueStateDescriptor 包含了 Flink 如何管理變量的一些元數據信息。狀態在使用之前需要先被注冊。 狀態需要使用 open() 函數來注冊狀態
public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> { private static final long serialVersionUID = 1L; private transient ValueState<Boolean> flagState; @Override public void open(Configuration parameters) { ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>( "flag", Types.BOOLEAN); flagState = getRuntimeContext().getState(flagDescriptor); }
ValueState 是一個包裝類,類似於 Java 標准庫里邊的 AtomicReference 和 AtomicLong。 它提供了三個用於交互的方法。update 用於更新狀態,value 用於獲取狀態值,還有 clear 用於清空狀態。 如果一個 key 還沒有狀態,例如當程序剛啟動或者調用過 ValueState#clear 方法時,ValueState#value 將會返回 null。 如果需要更新狀態,需要調用 ValueState#update 方法,直接更改 ValueState#value 的返回值可能不會被系統識別。 容錯處理將在 Flink 后台自動管理,你可以像與常規變量那樣與狀態變量進行交互。
下邊的示例,說明了如何使用標記狀態來追蹤可能的欺詐交易行為。
Java
@Override public void processElement( Transaction transaction, Context context, Collector<Alert> collector) throws Exception { // Get the current state for the current key Boolean lastTransactionWasSmall = flagState.value(); // Check if the flag is set if (lastTransactionWasSmall != null) { if (transaction.getAmount() > LARGE_AMOUNT) { // Output an alert downstream Alert alert = new Alert(); alert.setId(transaction.getAccountId()); collector.collect(alert); } // Clean up our state flagState.clear(); } if (transaction.getAmount() < SMALL_AMOUNT) { // Set the flag to true flagState.update(true); } }
Scala
對於每筆交易,欺詐檢測器都會檢查該帳戶的標記狀態。 請記住,ValueState 的作用域始終限於當前的 key,即信用卡帳戶。 如果標記狀態不為空,則該帳戶的上一筆交易是小額的,因此,如果當前這筆交易的金額很大,那么檢測程序將輸出報警信息。
在檢查之后,不論是什么狀態,都需要被清空。 不管是當前交易觸發了欺詐報警而造成模式的結束,還是當前交易沒有觸發報警而造成模式的中斷,都需要重新開始新的模式檢測。
最后,檢查當前交易的金額是否屬於小額交易。 如果是,那么需要設置標記狀態,以便可以在下一個事件中對其進行檢查。 注意,ValueState<Boolean> 實際上有 3 種狀態:unset (null),true,和 false,ValueState 是允許空值的。 我們的程序只使用了 unset (null) 和 true 兩種來判斷標記狀態被設置了與否。
 
maven打包並上傳至flink服務器
提交作業命令:
bin/flink run -c com.test.flinktest4.FraudDetectionJob /data/lts/flinktest4.jar 
如果打出的jar包不含依賴項的話,可以把依賴的jar包放到flink的lib文件夾,或放在其他位置,然后再flink安裝目錄的bin/config.sh腳本文件中設置該lib路徑:
依賴的jar包都可以放在這個目錄內
INTERNAL_HADOOP_CLASSPATHS="${HADOOP_CLASSPATH}:${HADOOP_CONF_DIR}:${YARN_CONF_DIR}:/opt/flink-1.12.0/lib/"
 
常用命令

flink提交job作業命令:
bin/flink run -c <入口類> -p <並行度> <jar包路徑> <啟動參數>
查看已提交的所有job
bin/flink list
取消job命令
bin/flink cancel <Job的ID>

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM