Chapter 5 - Real-Time Computation - Sectors and K-Line + Chapter 6 - Real-Time Alerts - Flink CEP


Review

  • 1. Index business

    Similar to the individual-stock business; only minor changes are needed.

  • 2. Sector business

    Note:

    A sector is made up of individual stocks, i.e. one sector contains multiple individual stocks.

    A mapping table records which stocks belong to which sector.

    In this business the StockBean data has to be converted into SectorBean data.

  • 3. Sector second-level quotes
Individual-Stock Core Business Development (Key Content)

The individual-stock business is divided into the following sub-businesses; the package/class structure for them has already been laid out in the code:

  1. Second-level quotes
  2. Minute-level (time-sharing) quotes
  3. Minute-level quote backup
  4. Stock up/down change percentage
  5. K-line quotes

Stock Quotes - Second-Level Quotes

Requirement

Group the individual-stock data of the Shanghai and Shenzhen markets by stock code, compute over a 5s tumbling window, wrap the results as StockBean, and write them to HBase.

Why HBase? HBase is well suited to storing massive amounts of data. (Review the related HBase topics after class; a unified assignment will be given once the business logic has been covered.)

 

Code Implementation - Core Task Class

Note:

Because event time is used, window boundaries are aligned to multiples of the window size in event time: each record falls into the aligned window that contains its timestamp, as the sketch below shows.

With the first record at 2021-01-28 18:00:00 and one record every 5s, the windows are:

2021-01-28 18:00:00 ~ 2021-01-28 18:00:05,

2021-01-28 18:00:05 ~ 2021-01-28 18:00:10,

and so on.
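A minimal sketch of the alignment rule (it mirrors how Flink computes the start of a tumbling event-time window; the timestamp value is only an illustrative Beijing-time example):

public class WindowAlignDemo {
    public static void main(String[] args) {
        long windowSize = 5_000L;                           // 5s tumbling window, in milliseconds
        long timestamp  = 1_611_828_003_200L;               // event time, e.g. 2021-01-28 18:00:03.200 Beijing time
        long start = timestamp - (timestamp % windowSize);  // align down to a multiple of 5s
        long end   = start + windowSize;                    // the window is [start, end)
        System.out.println(start + " ~ " + end);            // 18:00:00.000 ~ 18:00:05.000 of the same day
    }
}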

 
 
 
 
 
 
 
// Set event time as the time characteristic (done in the entry class):
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// In older Flink versions this API is not deprecated; in newer versions it is deprecated and no longer needs to be set.
package cn.itcast.task;
import cn.itcast.bean.CleanBean;
import cn.itcast.config.QuotConfig;
import cn.itcast.function.sink.HBaseSink;
import cn.itcast.function.window.StockPutHBaseWindowFunction;
import cn.itcast.function.window.StockSecondsWindowFunction;
import cn.itcast.standard.ProcessDataInterface;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;
/**
 * Author itcast
 * Desc 個股秒級行情數據業務處理核心任務類
 * 需求: 對滬深兩市的個股數據按照個股代碼進行分組並進行窗口(5s滾動)計算,封裝為StockBean,計算結果寫入到HBase中
 */
public class StockSecondsTask implements ProcessDataInterface {
    @Override
    public void process(DataStream<CleanBean> watermarkDS) {
        //TODO 1.分組
        watermarkDS.keyBy(CleanBean::getSecCode)
        //TODO 2.窗口划分
        //.window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .timeWindow(Time.seconds(5))
        //TODO 3.窗口計算
        //.sum--只支持簡單聚合
        //.reduce()--只支持簡單聚合
        //.process() //里面有context--可以
        .apply(new StockSecondsWindowFunction())//里面有Window--可以
        //TODO 4.數據合並
        //上面計算完,有很多分組,但是每個分組只有1條5s內最新的數據,
        //而后面需要把數據sink到HBase,為了提高性能,應該要批量插入數據到HBase
        //所以這里需要對數據進行合並
        //把上面各個分組的數據合到一起,這個窗口里面就是5s內各個個股的最新數據,也就是我們需要的結果
        .timeWindowAll(Time.seconds(5))
        //TODO 5.數據封裝
        //接下來要對窗口內的所有個股數據進行封裝為List<Put> ,方便后續批量插入到HBase
        .apply(new StockPutHBaseWindowFunction())
        //TODO 6.Sink到HBase
        .addSink(new HBaseSink(QuotConfig.STOCK_HBASE_TABLE_NAME));//"quot_stock"
    }
}
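For context, every task class in this project implements ProcessDataInterface and is called from the market's entry class once the watermarked CleanBean stream has been built (the stock entry class is not listed in this section; the index entry class later in these notes shows the same pattern):

new StockSecondsTask().process(watermarkDS);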
 

 

Code Implementation - Window Function

 
 
 
 
 
 
 
package cn.itcast.function.window;
import cn.itcast.bean.CleanBean;
import cn.itcast.bean.StockBean;
import cn.itcast.constant.DateFormatConstant;
import cn.itcast.util.DateUtil;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
/**
 * Author itcast
 * Desc 把當前窗口(5s一個)中的CleanBean數據封裝為StockBean返回
 * 簡單來說就是把當前窗口中5s內的所有個股最新的CleanBean封裝為StockBean並返回
 */
public class StockSecondsWindowFunction implements WindowFunction<CleanBean, StockBean, String, TimeWindow> {
    @Override
    public void apply(String s, TimeWindow timeWindow, Iterable<CleanBean> iterable, Collector<StockBean> collector) throws Exception {
        //注意:
        // 當前5s窗口內有很多CleanBean,而要哪一個CleanBean的字段作為StockBean的字段呢?
        // 當前窗口內最新的CleanBean--就是eventTime最大的CleanBean
        //1.記錄最新的CleanBean
        CleanBean newCleanBean = null;
        for (CleanBean cleanBean : iterable) {//進來的cleanBean
            //先假定第一個為最新的newCleanBean
            if (newCleanBean == null){
                newCleanBean = cleanBean;
            }
            //后面的每個進來的數據都要和暫時認為是最新的newCleanBean進行比較
            //如果當前的EventTime> 暫時的newCleanBean的EventTime,則當前這個為新的newCleanBean
            if(cleanBean.getEventTime() > newCleanBean.getEventTime()){
                newCleanBean  = cleanBean;
            }
        }
        //循環結束newCleanBean中存儲的就是當前最新的CleanBean
        //2.設置數據
        //對數據進行格式化:
        //1612058392042--long時間戳
        //20210131095952--格式化之后的long格式
        Long tradTime = DateUtil.longTimestamp2LongFormat(newCleanBean.getEventTime(), DateFormatConstant.format_YYYYMMDDHHMMSS);
        
        StockBean stockBean = new StockBean();
        stockBean.setEventTime(newCleanBean.getEventTime());
        stockBean.setSecCode(newCleanBean.getSecCode());
        stockBean.setSecName(newCleanBean.getSecName());
        stockBean.setPreClosePrice(newCleanBean.getPreClosePx());
        stockBean.setOpenPrice(newCleanBean.getOpenPrice());
        stockBean.setHighPrice(newCleanBean.getMaxPrice());
        stockBean.setLowPrice(newCleanBean.getMinPrice());
        stockBean.setClosePrice(newCleanBean.getTradePrice());
        stockBean.setTradeVol(0l);//秒級行情不需要計算分時成交數據
        stockBean.setTradeAmt(0l);//秒級行情不需要計算分時成交數據
        stockBean.setTradeVolDay(newCleanBean.getTradeVolumn());
        stockBean.setTradeAmtDay(newCleanBean.getTradeAmt());
        stockBean.setTradeTime(tradTime);
        stockBean.setSource(newCleanBean.getSource());
        
        //3.返回數據
        collector.collect(stockBean);
    }
}
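DateUtil and DateFormatConstant are project utility classes that are not listed in these notes. A minimal sketch of what they are assumed to do, based on the comments above (format_YYYYMMDDHHMMSS is assumed to be the pattern "yyyyMMddHHmmss"):

package cn.itcast.util;

import java.text.SimpleDateFormat;
import java.util.Date;

public class DateUtil {
    // Assumed behaviour: 1612058392042L -> 20210131095952L
    public static Long longTimestamp2LongFormat(Long timestamp, String pattern) {
        return Long.parseLong(new SimpleDateFormat(pattern).format(new Date(timestamp)));
    }

    // Assumed behaviour: epoch millis -> a formatted String such as "2021-01-31"
    public static String longTimestamp2String(Long timestamp, String pattern) {
        return new SimpleDateFormat(pattern).format(new Date(timestamp));
    }
}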
 

 

Code Implementation - Data Merging

 
 
 
 
 
 
 
 //TODO 4.數據合並
        //上面計算完,有很多分組,但是每個分組只有1條5s內最新的數據,
        //而后面需要把數據sink到HBase,為了提高性能,應該要批量插入數據到HBase
        //所以這里需要對數據進行合並
        //把上面各個分組的數據合到一起,這個窗口里面就是5s內各個個股的最新數據,也就是我們需要的結果
        .timeWindowAll(Time.seconds(5))
 

 

Code Implementation - Data Wrapping (List&lt;Put&gt;)

 
 
 
 
 
 
 
 //TODO 5.數據封裝
        //接下來要對窗口內的所有個股數據進行封裝為List<Put> ,方便后續批量插入到HBase
        .apply(new StockPutHBaseWindowFunction())
 

 

 
 
 
 
 
 
 
package cn.itcast.function.window;
import cn.itcast.bean.StockBean;
import com.alibaba.fastjson.JSON;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.hadoop.hbase.client.Put;
import java.util.ArrayList;
import java.util.List;
/**
 * Author itcast
 * Desc 將當前窗口內的所有StockBean封裝為List<Put>並返回
 */
public class StockPutHBaseWindowFunction implements AllWindowFunction<StockBean, List<Put>, TimeWindow> {
    @Override
    public void apply(TimeWindow timeWindow, Iterable<StockBean> iterable, Collector<List<Put>> collector) throws Exception {
        //1.准備待返回List<Put>
        List<Put> list = new ArrayList<>();
        //2.封裝Put並放入list
        for (StockBean stockBean : iterable) {
            String rowkey = stockBean.getSecCode() + stockBean.getTradeTime(); //20210128180000
            String stockBeanJsonStr = JSON.toJSONString(stockBean);
            //Put表示放入到HBase中的每一條數據
            Put put = new Put(rowkey.getBytes());//創建Put並設置rowKey
            //指定Put的列族為info,列名為data,數據為stockBean的json字符串形式
            put.addColumn("info".getBytes(),"data".getBytes(),stockBeanJsonStr.getBytes());
            //將put放入list
            list.add(put);
        }
        //整個for循環走完,當前5s窗口內的各個個股的最新數據stockBean就已經封裝到List<Put>中了
        //3.返回List<Put>
        collector.collect(list);
    }
}
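For example, with this rowkey design the data of stock 600000 in the 18:00:00 window gets the rowkey "600000" + "20210128180000" = "60000020210128180000" (illustrative values), so all rows of one stock sort next to each other in HBase and can be read back with a prefix scan on the stock code.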
 

 

Code Implementation - Sink to HBase

 
 
 
 
 
 
 
 //TODO 6.Sink到HBase
        .addSink(new HBaseSink(QuotConfig.STOCK_HBASE_TABLE_NAME));//"quot_stock"
 

 

 
 
 
 
 
 
 
package cn.itcast.function.sink;
import cn.itcast.util.HBaseUtil;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.hadoop.hbase.client.Put;
import java.util.List;
/**
 * Author itcast
 * Desc
 */
public class HBaseSink implements SinkFunction<List<Put>> {
    private String tableName;
    public HBaseSink(String tableName) {
        this.tableName = tableName;
    }
    @Override
    public void invoke(List<Put> value, Context context) throws Exception {
         System.out.println("正在調用工具類把數據批量插入到HBase");
        //調用工具類把數據存入HBase表中
        HBaseUtil.putList(tableName,value);
    }
}
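HBaseUtil is a project utility class that is not listed here. A minimal sketch of what putList could look like with the standard HBase 1.x client API (the ZooKeeper address is an assumption):

package cn.itcast.util;

import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;

public class HBaseUtil {
    private static Connection connection;

    private static synchronized Connection getConnection() throws Exception {
        if (connection == null) {
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "node01"); // assumption: same ZooKeeper as the course cluster
            connection = ConnectionFactory.createConnection(conf);
        }
        return connection;
    }

    // Batch-write a list of Puts into the given table
    public static void putList(String tableName, List<Put> list) throws Exception {
        Table table = getConnection().getTable(TableName.valueOf(tableName));
        try {
            table.put(list); // one batched request instead of one per row
        } finally {
            table.close();
        }
    }
}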
 

 

 

Testing

1. Start HDFS, ZooKeeper and Kafka.

2. Start HBase:

cd /export/servers/hbase-1.1.1/
bin/start-hbase.sh

3. Enter the HBase shell:

bin/hbase shell

 

4. Drop the tables left over from previous runs:

list
disable "quot_index"
disable "quot_sector"
disable "quot_stock"
drop "quot_index"
drop "quot_sector"
drop "quot_stock"

5. Start the program.

6. Check HBase:

list
scan "quot_stock",{LIMIT=>10}

 

Stock Quotes - Minute-Level (Time-Sharing) Quotes

Requirement

Recall the typical Druid application flow:

Kafka topics sse and szse ---> Flink processing ---> Kafka topics stock-sse and stock-szse ---> Druid ingestion ---> real-time queries

Following this flow:

Group by stock code, process the data in 1min/60s windows, write the results to the Kafka topics stock-sse and stock-szse, and finally have Druid ingest them for querying.

 

Code Implementation - Core Task Class

package cn.itcast.task;

import cn.itcast.bean.CleanBean;
import cn.itcast.bean.StockBean;
import cn.itcast.function.window.StockMinutesWindowFunction;
import cn.itcast.standard.ProcessDataInterface;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * Author itcast
 * Desc 個股分時/分級行情數據業務處理核心任務類
 * 需求: 按照個股代碼進行分組 每隔1min/60s划分一個窗口對數據進行處理並將結果寫入到Kafka的stock-sse和stock-szse主題主題,最終由Druid進行攝取並查詢
 */
public class StockMinutesTask implements ProcessDataInterface {
    @Override
    public void process(DataStream<CleanBean> watermarkDS) {
        //TODO 1.分組
        watermarkDS.keyBy(CleanBean::getSecCode)
        //TODO 2.窗口划分
        .timeWindow(Time.minutes(1))//60s
        //TODO 3.窗口計算CleanBean-->StockBean
        .apply(new StockMinutesWindowFunction())
        //TODO 4.數據分流--因為main中對滬深兩市數據進行了合並,這里后續又要把滬深兩市數據發到不同主題
        .process(new ProcessFunction<StockBean,StockBean>(){
            @Override
            public void processElement(StockBean stockBean, Context context, Collector<StockBean> collector) throws Exception {

            }
        });
        //TODO 5.數據sink到Kafka
        
    }
}

 

Code Implementation - Window Function

package cn.itcast.function.window;

import cn.itcast.bean.CleanBean;
import cn.itcast.bean.StockBean;
import cn.itcast.constant.DateFormatConstant;
import cn.itcast.util.DateUtil;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * Author itcast
 * Desc 分時/分級行情數據處理窗口函數/類
 * 在該窗口里面需要對最近1min/60s的數據進行計算,要統計出這1分鍾,該個股的分級成交量和分級成交金額,也就這1分鍾內該股票的成交量和成交金額
 * 當前窗口這1分鍾該股票的成交量或成交金額 = 這一窗口該股票最新的日總成交量或成交金額 - 上一窗口該股票最新的日總成交量或成交金額
 * 所以應該要搞個東西保存下上一個窗口的數據,用狀態!
 *
 * interface WindowFunction<IN, OUT, KEY, W extends Window>
 * abstract class RichWindowFunction<IN, OUT, KEY, W extends Window>  這里需要用到rich里面的open等方法,所以需要使用RichWindowFunction
 */
public class StockMinutesWindowFunction extends RichWindowFunction<CleanBean, StockBean, String, TimeWindow> {
    //1.准備一個MapState用來存放上一個窗口的StockBean數據
    //MapState<個股代碼,上一個窗口的StockBean>
    private MapState<String,StockBean> stockState = null;

    //2.初始化State
    @Override
    public void open(Configuration parameters) throws Exception {
        MapStateDescriptor<String, StockBean> stateDescriptor = new MapStateDescriptor<>("stockState", String.class, StockBean.class);
        stockState = getRuntimeContext().getMapState(stateDescriptor);
    }

    //3.計算當前窗口這1分鍾該股票的成交量或成交金額 = 這一窗口該股票最新的日總成交量或成交金額 - 上一窗口該股票最新的日總成交量或成交金額
    @Override
    public void apply(String s, TimeWindow timeWindow, Iterable<CleanBean> iterable, Collector<StockBean> collector) throws Exception {
        //3.1記錄當前窗口最新的CleanBean
        CleanBean newCleanBean = null;
        for (CleanBean cleanBean : iterable) {
            if(newCleanBean == null){
                newCleanBean = cleanBean;
            }
            if(cleanBean.getEventTime() > newCleanBean.getEventTime()){
                newCleanBean = cleanBean;
            }
        }

        //3.2獲取狀態中上一窗口的StockBean
        StockBean lastStockBean = stockState.get(newCleanBean.getSecCode());//根據key狀態中的value
        Long minutesVol = 0L;//分時成交量
        Long minutesAmt = 0L;//分時成交金額
        if(lastStockBean!=null){
            //3.3獲取上一個窗口的日總成交量和成交金額
            //當前窗口這1分鍾該股票的成交量或成交金額 = 這一窗口該股票最新的日總成交量或成交金額 - 上一窗口該股票最新的日總成交量或成交金額
            Long tradeVolDay = lastStockBean.getTradeVolDay();//上一個窗口的/上分鍾的日總成交量
            Long tradeAmtDay = lastStockBean.getTradeAmtDay();//上一個窗口的/上分鍾的日總成交金額

            ///3.4獲取當前窗口最新的日總成交量和成交金額
            Long tradeVolumn = newCleanBean.getTradeVolumn();
            Long tradeAmt = newCleanBean.getTradeAmt();

            //3.5計算當前窗口這1分鍾該股票的成交量或成交金額
            //當前窗口這1分鍾該股票的成交量或成交金額 = 這一窗口該股票最新的日總成交量或成交金額 - 上一窗口該股票最新的日總成交量或成交金額
            minutesVol = tradeVolumn - tradeVolDay;
            minutesAmt = tradeAmt - tradeAmtDay;
        }else{
            minutesVol = newCleanBean.getTradeVolumn();
            minutesAmt = newCleanBean.getTradeAmt();
        }

        //3.6封裝數據
        StockBean stockBean = new StockBean();
        Long tradeTime = DateUtil.longTimestamp2LongFormat(newCleanBean.getEventTime(), DateFormatConstant.format_YYYYMMDDHHMMSS);
        stockBean.setEventTime(newCleanBean.getEventTime());
        stockBean.setSecCode(newCleanBean.getSecCode());
        stockBean.setSecName(newCleanBean.getSecName());
        stockBean.setPreClosePrice(newCleanBean.getPreClosePx());
        stockBean.setOpenPrice(newCleanBean.getOpenPrice());
        stockBean.setHighPrice(newCleanBean.getMaxPrice());
        stockBean.setLowPrice(newCleanBean.getMinPrice());
        stockBean.setClosePrice(newCleanBean.getTradePrice());

        stockBean.setTradeVol(minutesVol);//分時行情需要計算的分時成交量/當前窗口這1分鍾該股票的成交量
        stockBean.setTradeAmt(minutesAmt);//分時行情需要計算的分時成交金額/當前窗口這1分鍾該股票的成交金額

        stockBean.setTradeVolDay(newCleanBean.getTradeVolumn());
        stockBean.setTradeAmtDay(newCleanBean.getTradeAmt());
        stockBean.setTradeTime(tradeTime);
        stockBean.setSource(newCleanBean.getSource());

        //3.7返回結果
        collector.collect(stockBean);

        //3.8更新State
        stockState.put(stockBean.getSecCode(),stockBean);
    }

}
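A quick worked example of the state logic above (illustrative numbers): if the previous window's StockBean recorded a day-total volume of 12,000 and a day-total amount of 120,000, and the newest CleanBean of the current window carries day totals of 12,450 and 124,500, then this minute's volume is 12,450 - 12,000 = 450 and this minute's amount is 124,500 - 120,000 = 4,500. For the very first window there is no previous state, so the day totals themselves are used as the minute values.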

 

Code Implementation - Stream Splitting + Sink

package cn.itcast.task;

import cn.itcast.bean.CleanBean;
import cn.itcast.bean.StockBean;
import cn.itcast.config.QuotConfig;
import cn.itcast.function.window.StockMinutesWindowFunction;
import cn.itcast.standard.ProcessDataInterface;
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.util.Properties;

/**
 * Author itcast
 * Desc 個股分時/分級行情數據業務處理核心任務類
 * 需求: 按照個股代碼進行分組 每隔1min/60s划分一個窗口對數據進行處理並將結果寫入到Kafka的stock-sse和stock-szse主題主題,最終由Druid進行攝取並查詢
 */
public class StockMinutesTask implements ProcessDataInterface {
    @Override
    public void process(DataStream<CleanBean> watermarkDS) {
        //TODO 0.准備側道輸出流用來存放分流后的結果
        OutputTag<StockBean> sseOutputTag = new OutputTag<>("sse", TypeInformation.of(StockBean.class));
        OutputTag<StockBean> szseOutputTag = new OutputTag<>("szse", TypeInformation.of(StockBean.class));


        SingleOutputStreamOperator<StockBean> processDS =
                //TODO 1.分組
                watermarkDS.keyBy(CleanBean::getSecCode)
                //TODO 2.窗口划分
                .timeWindow(Time.minutes(1))//60s
                //TODO 3.窗口計算CleanBean-->StockBean
                .apply(new StockMinutesWindowFunction())
                //TODO 4.數據分流--因為main中對滬深兩市數據進行了合並,這里后續又要把滬深兩市數據發到不同主題
                .process(new ProcessFunction<StockBean, StockBean>() {
                    @Override
                    public void processElement(StockBean stockBean, Context context, Collector<StockBean> collector) throws Exception {
                        if (stockBean.getSource().equals("sse")) {//sse滬市
                            context.output(sseOutputTag, stockBean);
                        } else {//szse深市
                            context.output(szseOutputTag, stockBean);
                        }
                    }
                });
        //獲取分流結果
        DataStream<StockBean> sseDS = processDS.getSideOutput(sseOutputTag);
        DataStream<StockBean> szseDS = processDS.getSideOutput(szseOutputTag);

        //TODO 5.數據sink到Kafka
        //數據sink到Kafka后需要被Driud攝取,而Druid攝取Kafka的數據並解析,需要數據是json格式!
        //所以先把數據變為json
        SingleOutputStreamOperator<String> sseJsonDS = sseDS.map(new MapFunction<StockBean, String>() {
            @Override
            public String map(StockBean stockBean) throws Exception {
                return JSON.toJSONString(stockBean);
            }
        });
        SingleOutputStreamOperator<String> szseJsonDS = szseDS.map(new MapFunction<StockBean, String>() {
            @Override
            public String map(StockBean stockBean) throws Exception {
                return JSON.toJSONString(stockBean);
            }
        });
        //sink到kafka的不同主題
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", QuotConfig.BOOTSTRAP_SERVERS);

        FlinkKafkaProducer011<String> ssekafkaSink = new FlinkKafkaProducer011<>(QuotConfig.SSE_STOCK_TOPIC, new SimpleStringSchema(), props);//FlinkKafkaProducer011.Semantic.EXACTLY_ONCE
        FlinkKafkaProducer011<String> szsekafkaSink = new FlinkKafkaProducer011<>(QuotConfig.SZSE_STOCK_TOPIC, new SimpleStringSchema(), props);
        
        System.out.println("滬深兩市數據將要寫入Kafka的stock-sse和stock-szse主題");
        sseJsonDS.addSink(ssekafkaSink);
        szseJsonDS.addSink(szsekafkaSink);

    }
}

 

Testing

1. Start ZooKeeper, Kafka and Druid.

2. Prepare the topics:

cd /export/servers/kafka_2.11-1.0.0/
bin/kafka-topics.sh --zookeeper node01:2181 --list
bin/kafka-topics.sh --zookeeper node01:2181 --create --partitions 1 --replication-factor 1 --topic stock-sse
bin/kafka-topics.sh --zookeeper node01:2181 --create --partitions 1 --replication-factor 1 --topic stock-szse

3. Create the Druid data sources so that Druid ingests stock-sse and stock-szse in real time.

This can be configured through the web UI or by using the requests already prepared in Postman.

Check the web UI to confirm that Druid is ingesting the Kafka data:

http://node01:8090/console.html

4. Start the program.

5. Check whether the Kafka topics receive data.

If the consumer output is garbled, adjust the console/tool encoding settings.

6. Query Druid to confirm the data has arrived (you may need to wait a moment, reset the running ingestion task, or restart Druid).

 

 

Stock Quotes - Minute-Level Quote Backup

Requirement

Compute the same minute-level quotes as above, but back the results up to HDFS as plain text with |-separated fields so they can later be analyzed offline (e.g. with SparkSQL/HiveSQL).

 

Code Implementation - Core Task Class

package cn.itcast.task;

import cn.itcast.bean.CleanBean;
import cn.itcast.config.QuotConfig;
import cn.itcast.function.map.StockPutHDFSMapFunction;
import cn.itcast.function.window.StockMinutesWindowFunction;
import cn.itcast.standard.ProcessDataInterface;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;

/**
 * Author itcast
 * Desc 個股分時/分級行情數據業務處理核心任務類--分時行情數據備份到HDFS,前面的計算和分時/分級行情一樣算即可
 */
public class StockMinutesBackupTask implements ProcessDataInterface {
    @Override
    public void process(DataStream<CleanBean> watermarkDS) {
        //TODO 0.准備HDFS-Sink
        //之前學習的新版本可以用StreamingFileSink或FileSink,但需要注意支持的hadoop版本為2.7+,而我們項目中用的cdh5.14是2.6
        //所以這里不能使用之前學習的Flink新特性中的Sink,得用老的BucketingSink
        //path為: stock.sec.hdfs.path=hdfs://192.168.52.100:8020/quot_data/dev/stock/
        BucketingSink<String> bucketingSink = new BucketingSink<>(QuotConfig.STOCK_SEC_HDFS_PATH);
        //文件大小: hdfs.batch=1073741824
        bucketingSink.setBatchSize(Long.parseLong(QuotConfig.HDFS_BATCH));
        //分桶/分文件夾策略:hdfs.bucketer=yyyyMMdd
        bucketingSink.setBucketer(new DateTimeBucketer(QuotConfig.HDFS_BUCKETER));

        //前綴--不設置會有默認的
        bucketingSink.setInProgressPrefix("stock_");
        bucketingSink.setPendingPrefix("stock2_");//掛起狀態的前綴
        //后綴--不設置會有默認的
        bucketingSink.setInProgressSuffix(".txt");
        bucketingSink.setPendingSuffix(".txt");

           /*
        https://blog.csdn.net/kisimple/article/details/83998238
        文件名格式為{狀態_prefix}{part_prefix}-{parallel_task_index}-{count}{part_suffix}{狀態_suffix};
        in-progress,正在寫入。
        pending,等待Checkpoint。
        finished,Checkpoint-ok,寫入成功
        */

        //TODO 1.分組
        watermarkDS.keyBy(CleanBean::getSecCode)
                //TODO 2.窗口划分
                .timeWindow(Time.minutes(1))//60s
                //TODO 3.窗口計算CleanBean-->StockBean
                .apply(new StockMinutesWindowFunction())
                //TODO 4.數據轉換:將StockBean轉為|分割的普通文本
                .map(new StockPutHDFSMapFunction())
                //TODO 5.Sink到HDFS方便后續使用其他技術做離線分析,如SparkSQL/HiveSQL(格式使用普通文本,字段用|分割)
                .addSink(bucketingSink);

    }
}
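QuotConfig reads these values from the project's properties file, which is not listed in these notes. Collecting the keys and values quoted in the comments above gives roughly the following sketch (hdfs.seperator is an assumed key name; the comment in StockPutHDFSMapFunction only states that the separator is the | character):

# HDFS backup sink (values quoted in the comments above)
stock.sec.hdfs.path=hdfs://192.168.52.100:8020/quot_data/dev/stock/
hdfs.batch=1073741824
hdfs.bucketer=yyyyMMdd
# assumed key name; the separator used when concatenating fields is |
hdfs.seperator=|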

 

 

 

Code Implementation - Data Conversion and Concatenation

package cn.itcast.function.map;

import cn.itcast.bean.StockBean;
import cn.itcast.config.QuotConfig;
import cn.itcast.constant.DateFormatConstant;
import cn.itcast.util.DateUtil;
import org.apache.flink.api.common.functions.MapFunction;

import java.sql.Timestamp;

/**
 * Author itcast
 * Desc 將StockBean轉為|分割的普通文本
 */
public class StockPutHDFSMapFunction implements MapFunction<StockBean, String> {
    @Override
    public String map(StockBean stockBean) throws Exception {
        //獲取分隔符
        String seperator = QuotConfig.HDFS_SEPERATOR;//就是|

        //獲取交易日期
        String tradeDate = DateUtil.longTimestamp2String(stockBean.getEventTime(), DateFormatConstant.format_yyyy_mm_dd);

        //字段拼接
        //順序:
        //Timestamp|date|secCode|secName|preClosePrice|openPirce|highPrice|
        //lowPrice|closePrice|tradeVol|tradeAmt|tradeVolDay|tradeAmtDay|source
        StringBuilder sb = new StringBuilder();

        sb.append(new Timestamp(stockBean.getEventTime())).append(seperator)
                .append(tradeDate).append(seperator)
                .append(stockBean.getSecCode()).append(seperator)
                .append(stockBean.getSecName()).append(seperator)
                .append(stockBean.getPreClosePrice()).append(seperator)
                .append(stockBean.getOpenPrice()).append(seperator)
                .append(stockBean.getHighPrice()).append(seperator)
                .append(stockBean.getLowPrice()).append(seperator)
                .append(stockBean.getClosePrice()).append(seperator)
                .append(stockBean.getTradeVol()).append(seperator)
                .append(stockBean.getTradeAmt()).append(seperator)
                .append(stockBean.getTradeVolDay()).append(seperator)
                .append(stockBean.getTradeAmtDay()).append(seperator)
                .append(stockBean.getSource());

        return sb.toString();
    }
}
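With the field order above, one backed-up record is a single |-separated line; an illustrative example (values are made up):

2021-01-28 18:01:00.0|2021-01-28|600000|XXBank|10.00|10.05|10.50|9.90|10.30|450|4500|12450|124500|sse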

 

Testing

0. Start ZooKeeper and Kafka.

1. Clear the previous data from HDFS:

hadoop fs -rmr /quot_data/dev/*

2. Start the program.

3. Check the data on HDFS:

http://node01:50070/explorer.html#/quot_data/dev/stock/20210131

 

 

Stock Quotes - Up/Down, Change Percentage and Amplitude

Requirement

Similar to the minute-level quotes: group the real-time quote data by stock code, compute each stock's up/down change every 60s/1min, send the results to Kafka, and finally have Druid ingest them for real-time analysis.

 

Code Implementation - Core Task Class

package cn.itcast.task;

import cn.itcast.bean.CleanBean;
import cn.itcast.bean.StockIncreaseBean;
import cn.itcast.config.QuotConfig;
import cn.itcast.function.window.StockMinutesIncreaseWindowFunction;
import cn.itcast.standard.ProcessDataInterface;
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

import java.util.Properties;

/**
 * Author itcast
 * Desc 個股分時/分級行情數據業務處理核心任務類--個股漲跌幅
 * 需求:和之前的分時行情類似, 對實時行情數據按照code分組后,每隔60s/1min,計算個股的漲跌幅,並將數據發送的Kafka(一個主題/兩個主題都行),最后由Druid攝取,提供實時分析
 */
public class StockIncreaseTask implements ProcessDataInterface {
    @Override
    public void process(DataStream<CleanBean> watermarkDS) {
        //TODO 0.准備KafkaSink
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", QuotConfig.BOOTSTRAP_SERVERS);
        //#個股分時漲跌幅: stock.increase.topic=stock-increase
        FlinkKafkaProducer011<String> kafkaSink = new FlinkKafkaProducer011<>(QuotConfig.STOCK_INCREASE_TOPIC, new SimpleStringSchema(), props);
        
        //TODO 1.分組
        watermarkDS.keyBy(CleanBean::getSecCode)
        //TODO 2.划分窗口
        .timeWindow(Time.minutes(1))
        //TODO 3.窗口計算-CleanBean轉了StockIncreaseBean(里面封裝了漲跌/漲跌幅/振幅)
        .apply(new StockMinutesIncreaseWindowFunction())
        //TODO 4.數據轉為Json
        .map(new MapFunction<StockIncreaseBean, String>() {
            @Override
            public String map(StockIncreaseBean stockIncreaseBean) throws Exception {
                return JSON.toJSONString(stockIncreaseBean);
            }
        })
        //TODO 5.Sink到Kafka
        .addSink(kafkaSink);
    }
}


 

Code Implementation - Window Function (Up/Down Change Calculation)

package cn.itcast.function.window;

import cn.itcast.bean.CleanBean;
import cn.itcast.bean.StockIncreaseBean;
import cn.itcast.constant.DateFormatConstant;
import cn.itcast.util.DateUtil;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.math.BigDecimal;
import java.math.RoundingMode;

/**
 * Author itcast
 * Desc 需求:計算當前窗口/每1min的數據的漲跌幅並封裝為StockIncreaseBean返回
 */
public class StockMinutesIncreaseWindowFunction implements WindowFunction<CleanBean, StockIncreaseBean, String, TimeWindow> {
    @Override
    public void apply(String s, TimeWindow timeWindow, Iterable<CleanBean> iterable, Collector<StockIncreaseBean> collector) throws Exception {
        //1.記錄最新的數據
        CleanBean newCleanBean = null;
        for (CleanBean cleanBean : iterable) {
            if(newCleanBean == null){
                newCleanBean = cleanBean;
            }
            if(cleanBean.getEventTime() > newCleanBean.getEventTime()){
                newCleanBean = cleanBean;
            }
        }
        //2.計算漲跌幅/漲跌/振幅--業務(按照文檔/產品經理/領導的要求來)
        //漲跌 = 當前價(最新價) - 前收盤價 = newCleanBean.getTradePrice() - newCleanBean.getPreClosePx()
        BigDecimal  updown = newCleanBean.getTradePrice().subtract(newCleanBean.getPreClosePx());
        //漲跌幅 = (當前價-前收盤價)/ 前收盤價 * 100% = (newCleanBean.getTradePrice()- newCleanBean.getPreClosePx()) / newCleanBean.getPreClosePx();
        BigDecimal  increase = newCleanBean.getTradePrice().subtract( newCleanBean.getPreClosePx()).divide(newCleanBean.getPreClosePx(),2, RoundingMode.HALF_UP);
        //振幅 = (當日最高點的價格-當日最低點的價格)/前收盤價 ×100%
        BigDecimal amplitude = newCleanBean.getMaxPrice().subtract(newCleanBean.getMinPrice()).divide(newCleanBean.getPreClosePx(),2, RoundingMode.HALF_UP);

        //3.數據封裝
        Long tradeTime = DateUtil.longTimestamp2LongFormat(newCleanBean.getEventTime(), DateFormatConstant.format_YYYYMMDDHHMMSS);
        StockIncreaseBean stockIncreaseBean = new StockIncreaseBean(
                newCleanBean.getEventTime(),
                newCleanBean.getSecCode(),
                newCleanBean.getSecName(),
                increase,
                newCleanBean.getTradePrice(),
                updown,
                newCleanBean.getTradeVolumn(),
                amplitude,
                newCleanBean.getPreClosePx(),
                newCleanBean.getTradeAmt(),
                tradeTime,
                newCleanBean.getSource()
        );

        //4.返回結果
        collector.collect(stockIncreaseBean);
    }
}
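A worked example of the three formulas (illustrative numbers): with preClosePx = 10.00, latest tradePrice = 10.30, maxPrice = 10.50 and minPrice = 9.90, updown = 10.30 - 10.00 = 0.30; increase = 0.30 / 10.00 = 0.03 (i.e. 3%, kept as a ratio rounded to 2 decimal places by divide(..., 2, HALF_UP) rather than multiplied by 100); amplitude = (10.50 - 9.90) / 10.00 = 0.06 (6%).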


Testing

0. Start ZooKeeper, Kafka and Druid.

1. Prepare the Kafka topic stock-increase (it can also be left to auto-creation).

2. Configure Druid to ingest the Kafka topic stock-increase.

3. Check Druid:

http://node01:8090/console.html

4. Start the program.

5. Check Kafka.

6. Query Druid (you may need to wait a moment, reset the running ingestion task, or restart Druid).

 

Index Core Business Development (Key Content)

Program Entry Class

Take the previous stock program entry class and change "stock" to "index".

Students who are not yet comfortable with it should type it out again after class.

package cn.itcast.app;

import cn.itcast.avro.AvroDeserializeSchema;
import cn.itcast.avro.SseAvro;
import cn.itcast.avro.SzseAvro;
import cn.itcast.bean.CleanBean;
import cn.itcast.config.QuotConfig;
import cn.itcast.util.QuotUtil;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Properties;

/**
 * Author itcast
 * Desc 指數實時行情數據處理業務入口類
 */
public class IndexStreamApplication {
    public static void main(String[] args) throws Exception {
        //TODO 0.env-流環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //設置環境參數
        //-學習測試為了方便觀察可以設置並行度為1
        env.setParallelism(1);
        //設置使用事件時間
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//老版本該API沒過期,新版本過期的不需要設置
        //注意:Checkpoint是Flink容錯機制,上線是開啟,開發測試可以注掉!
        //-Checkpoint
        /*env.enableCheckpointing(5000);//開啟ckp
        if (SystemUtils.IS_OS_WINDOWS) {
            env.setStateBackend(new FsStateBackend("file:///D:/ckp"));
        } else {
            env.setStateBackend(new FsStateBackend("hdfs://node01:8020/flink-checkpoint/checkpoint"));
        }
        //===========類型2:建議參數===========
        //設置兩個Checkpoint 之間最少等待時間,如設置Checkpoint之間最少是要等 500ms(為了避免每隔1000ms做一次Checkpoint的時候,前一次太慢和后一次重疊到一起去了)
        //如:高速公路上,每隔1s關口放行一輛車,但是規定了兩車之前的最小車距為500m
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);//默認是0
        //設置如果在做Checkpoint過程中出現錯誤,是否讓整體任務失敗:true是  false不是
        env.getCheckpointConfig().setFailOnCheckpointingErrors(false);//默認是true
        //env.getCheckpointConfig().setTolerableCheckpointFailureNumber(10);//默認值為0,表示不容忍任何檢查點失敗
        //設置是否清理檢查點,表示 Cancel 時是否需要保留當前的 Checkpoint,默認 Checkpoint會在作業被Cancel時被刪除
        //ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:true,當作業被取消時,刪除外部的checkpoint(默認值)
        //ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:false,當作業被取消時,保留外部的checkpoint
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        //===========類型3:直接使用默認的即可===============
        //設置checkpoint的執行模式為EXACTLY_ONCE(默認)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //設置checkpoint的超時時間,如果 Checkpoint在 60s內尚未完成說明該次Checkpoint失敗,則丟棄。
        env.getCheckpointConfig().setCheckpointTimeout(60000);//默認10分鍾
        //設置同一時間有多少個checkpoint可以同時執行
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);//默認為1
        //固定延遲重啟--開發中常用
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                3, // 最多重啟3次數
                Time.of(5, TimeUnit.SECONDS) // 重啟時間間隔
        ));
        //上面的設置表示:如果job失敗,重啟3次, 每次間隔5s*/

        //TODO 1.source-kafka的主題sse滬市和szse深市
        //准備kafka參數
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", QuotConfig.BOOTSTRAP_SERVERS);//集群地址192.168.52.100:9092
        props.setProperty("group.id", QuotConfig.GROUP_ID);//消費者組名稱
        props.setProperty("flink.partition-discovery.interval-millis", "5000");//動態分區檢測,開一個后台線程每隔5s檢查Kafka的分區狀態
        //props.setProperty("enable.auto.commit", "true");//自動提交(提交到默認主題,后續學習了Checkpoint后隨着Checkpoint存儲在Checkpoint和默認主題中)
        //props.setProperty("auto.commit.interval.ms", "2000");//自動提交的時間間隔
        //props.setProperty("auto.offset.reset","latest");//latest有offset記錄從記錄位置開始消費,沒有記錄從最新的/最后的消息開始消費 /earliest有offset記錄從記錄位置開始消費,沒有記錄從最早的/最開始的消息開始消費

        //注意:從兩市消費的數據是經過Avro序列化之后的二進制數據
        //那么消費時就不能使用簡單的SimpleStringSchema反序列化,而應該進行自定義的反序列化
        //sse滬市
        FlinkKafkaConsumer011<SseAvro> sseKafkaSource = new FlinkKafkaConsumer011<>(QuotConfig.SSE_TOPIC, new AvroDeserializeSchema(QuotConfig.SSE_TOPIC), props);//注意新版本直接new FlinkKafkaConsumer,老版本需要指定kafka的版本
        //szse深市
        FlinkKafkaConsumer011<SzseAvro> szseKafkaSource = new FlinkKafkaConsumer011<>(QuotConfig.SZSE_TOPIC, new AvroDeserializeSchema(QuotConfig.SZSE_TOPIC), props);

        //sseKafkaSource.setCommitOffsetsOnCheckpoints(true);//默認就是true
        //szseKafkaSource.setCommitOffsetsOnCheckpoints(true);//默認就是true

        //問題:那么如果Flink從Kafka消費消息,記錄了偏移量,那么再次啟動就會從偏移量位置開始消費!那么每次啟動Flink測試都要去發數據給Kafka!很麻煩!
        //那怎么辦?--用props.setProperty("auto.offset.reset","earliest");不可以,因為它只有第一次生效,后面不生效
        //所以應該告訴Flink不要管auto.offset.reset的值和記錄的offset,直接每次從最早的數據開始消費,便於開發測試
        sseKafkaSource.setStartFromEarliest();
        szseKafkaSource.setStartFromEarliest();

        DataStreamSource<SseAvro> sseAvroDS = env.addSource(sseKafkaSource);
        DataStreamSource<SzseAvro> szseAvroDS = env.addSource(szseKafkaSource);

        //sseAvroDS.print("滬市>>>");
        //szseAvroDS.print("深市>>>");

        //TODO 2.transformation--預處理:過濾/轉換/合並....
        //需要檢查數據正常和時間是在正常交易時間內
        //過濾出滬深兩市的合法數據/過濾掉非法數據
        SingleOutputStreamOperator<SseAvro> sseAvroFiltedDS = sseAvroDS.filter(new FilterFunction<SseAvro>() {
            @Override
            public boolean filter(SseAvro sseAvro) throws Exception {
                //雖然是下面這樣調用的,但其實對時間的判斷是根據配置文件中的配置open.time=00:00和close.time=23:59來的,也就是說數據時間都合法,也是為了方便測試
                if (QuotUtil.checkData(sseAvro) && QuotUtil.checkTime(sseAvro)) {//合法
                    return true;
                } else {//非法
                    return false;
                }
            }
        });
        SingleOutputStreamOperator<SzseAvro> szseAvroFiltedDS = szseAvroDS.filter(new FilterFunction<SzseAvro>() {
            @Override
            public boolean filter(SzseAvro szseAvro) throws Exception {
                return QuotUtil.checkData(szseAvro) && QuotUtil.checkTime(szseAvro);
            }
        });

        //sseAvroFiltedDS.print("滬市過濾后的合法數據>>");
        //szseAvroFiltedDS.print("深市過濾后的合法數據>>");


        //代碼走到這里說明數據都是過濾后的合法的"冰清玉潔"的數據!就應該將里面的重新指數數據封裝為CleanBean
        //將滬深兩市里面的AvroBean--->CleanBean
        SingleOutputStreamOperator<CleanBean> sseCleanBeanDS = sseAvroFiltedDS.map(new MapFunction<SseAvro, CleanBean>() {
            @Override
            public CleanBean map(SseAvro sseAvro) throws Exception {
                CleanBean cleanBean = new CleanBean();
                cleanBean.setMdStreamId(sseAvro.getMdStreamID().toString());
                cleanBean.setSecCode(sseAvro.getSecurityID().toString());
                cleanBean.setSecName(sseAvro.getSymbol().toString());
                cleanBean.setTradeVolumn(sseAvro.getTradeVolume());
                cleanBean.setTradeAmt(sseAvro.getTotalValueTraded());
                cleanBean.setPreClosePx(new BigDecimal(sseAvro.getPreClosePx()).setScale(2, RoundingMode.HALF_UP));
                cleanBean.setOpenPrice(new BigDecimal(sseAvro.getOpenPrice()).setScale(2, RoundingMode.HALF_UP));
                cleanBean.setMaxPrice(new BigDecimal(sseAvro.getHighPrice()).setScale(2, RoundingMode.HALF_UP));
                cleanBean.setMinPrice(new BigDecimal(sseAvro.getLowPrice()).setScale(2, RoundingMode.HALF_UP));
                cleanBean.setTradePrice(new BigDecimal(sseAvro.getTradePrice()).setScale(2, RoundingMode.HALF_UP));
                cleanBean.setEventTime(sseAvro.getTimestamp());
                cleanBean.setSource("sse");
                return cleanBean;
            }
        });

        SingleOutputStreamOperator<CleanBean> szseCleanBeanDS = szseAvroFiltedDS.map(new MapFunction<SzseAvro, CleanBean>() {
            @Override
            public CleanBean map(SzseAvro szseAvro) throws Exception {
                CleanBean cleanBean = new CleanBean(
                        szseAvro.getMdStreamID().toString(),
                        szseAvro.getSecurityID().toString(),
                        szseAvro.getSymbol().toString(),
                        szseAvro.getTradeVolume(),
                        szseAvro.getTotalValueTraded(),
                        BigDecimal.valueOf(szseAvro.getPreClosePx()),
                        BigDecimal.valueOf(szseAvro.getOpenPrice()),
                        BigDecimal.valueOf(szseAvro.getHighPrice()),
                        BigDecimal.valueOf(szseAvro.getLowPrice()),
                        BigDecimal.valueOf(szseAvro.getTradePrice()),
                        szseAvro.getTimestamp(),
                        "szse"
                );
                return cleanBean;
            }
        });


        //現將滬深兩市的CleanBean做一個合並,然后過濾出里面的指數
        DataStream<CleanBean> unionDS = sseCleanBeanDS.union(szseCleanBeanDS);

        //現在手里拿到的就是滬深兩市過濾清理后的合法的指數數據
        SingleOutputStreamOperator<CleanBean> indexDS = unionDS.filter(new FilterFunction<CleanBean>() {
            @Override
            public boolean filter(CleanBean cleanBean) throws Exception {
                return QuotUtil.isIndex(cleanBean);
            }
        });

        //indexDS.print("合並后的滬深兩市的指數的CleanBean>>");


        //TODO 3.transformation--Watermark
        //開發測試時Watermark可以要可以不要
        //注意:老版本WatermarkAPI和新版本的不一樣
        //Watermark = 當前最大的事件時間 - 最大允許的亂序度
        //Watermark的作用: 觸發窗口計算!
        //Watermark如何觸發: Watermark >= 窗口結束時間時觸發計算 (該窗口里面得有數據)
        env.getConfig().setAutoWatermarkInterval(200);//默認就是200ms,表示每隔多久去給數據添加一次Watermaker,但在新版本中該API已過期
        SingleOutputStreamOperator<CleanBean> watermarkDS = indexDS.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<CleanBean>(Time.seconds(2)) {//設置最大亂序度為2s
                    @Override
                    public long extractTimestamp(CleanBean cleanBean) {
                        return cleanBean.getEventTime();//指定事件時間列
                    }
                });
        watermarkDS.print("指數:watermarkDS>>");

        //TODO 4.指數核心業務+Sink....
        //TODO 指數核心業務1.秒級行情/每5s計算最新的指數價格行情數據
        //new IndexSecondsTask().process(watermarkDS);
        //TODO 指數核心業務2.分級行情/每1min計算最新的指數價格行情數據
        //new IndexMinutesTask().process(watermarkDS);
        //TODO 指數核心業務3.分級行情備份/每1min備份最新的指數價格行情數據
        //new IndexMinutesBackupTask().process(watermarkDS);
        //TODO 指數核心業務4.指數漲跌幅
        //new IndexIncreaseTask().process(watermarkDS);//課后作業
        //TODO 指數核心業務5.K線行情/日K,周K,月K---后面單獨做
        //new IndexKlineTask().process(watermarkDS);

        //TODO 5.execute
        env.execute();
    }
}
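For comparison only: the BoundedOutOfOrdernessTimestampExtractor used above is deprecated in newer Flink releases. A sketch of the equivalent 2s bounded-out-of-orderness assignment with the WatermarkStrategy API introduced in Flink 1.11 (not used in this project):

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

// drop-in replacement for the deprecated extractor above
SingleOutputStreamOperator<CleanBean> watermarkDS = indexDS.assignTimestampsAndWatermarks(
        WatermarkStrategy.<CleanBean>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner((cleanBean, recordTimestamp) -> cleanBean.getEventTime()));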


Index Quotes - Second-Level Quotes

Requirement

Group the index data of the Shanghai and Shenzhen markets by index code, compute over a 5s tumbling window, wrap the results as IndexBean, and write them to HBase.

Note: this is the stock logic with "stock" replaced by "index".

Code Implementation - Core Task Class

package cn.itcast.task;

import cn.itcast.bean.CleanBean;
import cn.itcast.config.QuotConfig;
import cn.itcast.function.sink.HBaseSink;
import cn.itcast.function.window.IndexPutHBaseWindowFunction;
import cn.itcast.function.window.IndexSecondsWindowFunction;
import cn.itcast.standard.ProcessDataInterface;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Author itcast
 * Desc 指數秒級行情數據業務處理核心任務類
 * 需求: 對滬深兩市的指數數據按照指數代碼進行分組並進行窗口(5s滾動)計算,封裝為IndexBean,計算結果寫入到HBase中
 */
public class IndexSecondsTask implements ProcessDataInterface {
    @Override
    public void process(DataStream<CleanBean> watermarkDS) {
        //TODO 1.分組
        watermarkDS.keyBy(CleanBean::getSecCode)
        //TODO 2.窗口划分
        .timeWindow(Time.seconds(5))
        //TODO 3.窗口計算:CleanBean-->IndexBean
        .apply(new IndexSecondsWindowFunction())
        //TODO 4.數據合並
        .timeWindowAll(Time.seconds(5))
        //TODO 5.封裝為List<Put>
        .apply(new IndexPutHBaseWindowFunction())
        //TODO 6.批量Sink到HBase:index.hbase.table.name=quot_index
        .addSink(new HBaseSink(QuotConfig.INDEX_HBASE_TABLE_NAME));
    }
}

 

Code Implementation - Window Function

package cn.itcast.function.window;

import cn.itcast.bean.CleanBean;
import cn.itcast.bean.IndexBean;
import cn.itcast.util.DateUtil;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * Author itcast
 * Desc 把當前5s窗口內的最新的CleanBean轉為IndexBean返回
 */
public class IndexSecondsWindowFunction implements WindowFunction<CleanBean, IndexBean, String, TimeWindow> {
    @Override
    public void apply(String s, TimeWindow timeWindow, Iterable<CleanBean> iterable, Collector<IndexBean> collector) throws Exception {
        //1.記錄最新的CleanBean
        CleanBean newCleanBean = null;
        for (CleanBean cleanBean : iterable) {
            if (newCleanBean == null){
                newCleanBean = cleanBean;
            }
            if(cleanBean.getEventTime() > newCleanBean.getEventTime()){
                newCleanBean  = cleanBean;
            }
        }
        //2.數據封裝
        Long tradeTime = DateUtil.longTimestamp2LongFormat(newCleanBean.getEventTime(), "yyyyMMddHHmmss");
        IndexBean indexBean = new IndexBean(
                newCleanBean.getEventTime(),
                newCleanBean.getSecCode(),
                newCleanBean.getSecName(),
                newCleanBean.getPreClosePx(),
                newCleanBean.getOpenPrice(),
                newCleanBean.getMaxPrice(),
                newCleanBean.getMinPrice(),
                newCleanBean.getTradePrice(),
                0l,0l,
                newCleanBean.getTradeVolumn(),
                newCleanBean.getTradeAmt(),
                tradeTime,
                newCleanBean.getSource()
        );

        //3.返回結果
        collector.collect(indexBean);
    }
}

 

Code Implementation - Data Merging

 //TODO 4.數據合並
        .timeWindowAll(Time.seconds(5))

 

Code Implementation - Data Wrapping (List&lt;Put&gt;)

package cn.itcast.function.window;

import cn.itcast.bean.IndexBean;
import com.alibaba.fastjson.JSON;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.hadoop.hbase.client.Put;

import java.util.ArrayList;
import java.util.List;

/**
 * Author itcast
 * Desc 將當前窗口內的數據封裝為List<Put>方便后續使用HBase批量插入方法
 */
public class IndexPutHBaseWindowFunction implements AllWindowFunction<IndexBean, List<Put>, TimeWindow> {
    @Override
    public void apply(TimeWindow timeWindow, Iterable<IndexBean> iterable, Collector<List<Put>> collector) throws Exception {
        List<Put> list = new ArrayList<>();
        for (IndexBean indexBean : iterable) {
            String rowKey = indexBean.getIndexCode() + indexBean.getTradeTime();
            Put put = new Put(rowKey.getBytes());
            String jsonStr = JSON.toJSONString(indexBean);
            put.addColumn("info".getBytes(),"data".getBytes(),jsonStr.getBytes());
            list.add(put);
        }
        collector.collect(list);
    }
}

 

Code Implementation - Sink to HBase

 //TODO 6.批量Sink到HBase:index.hbase.table.name=quot_index
        .addSink(new HBaseSink(QuotConfig.INDEX_HBASE_TABLE_NAME));

 

Testing

1. Start ZooKeeper, Kafka and HBase.

2. Start the IndexStreamApplication program.

3. Log in to HBase and check the data:

bin/hbase shell
list
scan "quot_index",{LIMIT=>10}

 

Index Quotes - Minute-Level (Time-Sharing) Quotes

Requirement

Group by index code, process the data in 1min/60s windows, write the results to the Kafka topics index-sse and index-szse, and finally have Druid ingest them for querying.

Code Implementation - Core Task Class

package cn.itcast.task;

import cn.itcast.bean.CleanBean;
import cn.itcast.bean.IndexBean;
import cn.itcast.config.QuotConfig;
import cn.itcast.function.window.IndexMinutesWindowFunction;
import cn.itcast.standard.ProcessDataInterface;
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.util.Properties;

/**
 * Author itcast
 * Desc 指數分時/分級行情數據業務處理核心任務類
 * 需求:按照指數代碼進行分組 每隔1min/60s划分一個窗口對數據進行處理並將結果寫入到Kafka的index-sse和index-szse主題主題,
 * 最終由Druid進行攝取並查詢
 */
public class IndexMinutesTask implements ProcessDataInterface {
    @Override
    public void process(DataStream<CleanBean> watermarkDS) {
        //TODO 0.准備側道數據流用來存放分流后的數據
        OutputTag<IndexBean> sseOutputTag = new OutputTag<>("sse", TypeInformation.of(IndexBean.class));
        OutputTag<IndexBean> szseOutputTag = new OutputTag<>("szse", TypeInformation.of(IndexBean.class));

        //TODO 1.分組
        SingleOutputStreamOperator<IndexBean> processDS = watermarkDS.keyBy(CleanBean::getSecCode)
                //TODO 2.窗口划分
                .timeWindow(Time.minutes(1))
                //TODO 3.窗口計算:CleanBean-->IndexBean
                .apply(new IndexMinutesWindowFunction())
                //TODO 4.分流
                .process(new ProcessFunction<IndexBean, IndexBean>() {
                    @Override
                    public void processElement(IndexBean indexBean, Context context, Collector<IndexBean> collector) throws Exception {
                        if (indexBean.getSource().equals("sse")) {
                            context.output(sseOutputTag, indexBean);
                        } else {
                            context.output(szseOutputTag, indexBean);
                        }
                    }
                });
        DataStream<IndexBean> sseIndexDS = processDS.getSideOutput(sseOutputTag);
        DataStream<IndexBean> szseIndexDS = processDS.getSideOutput(szseOutputTag);

        SingleOutputStreamOperator<String> sseJsonDS = sseIndexDS.map(new MapFunction<IndexBean, String>() {
            @Override
            public String map(IndexBean indexBean) throws Exception {
                return JSON.toJSONString(indexBean);
            }
        });
        SingleOutputStreamOperator<String> szseJsonDS = szseIndexDS.map(new MapFunction<IndexBean, String>() {
            @Override
            public String map(IndexBean indexBean) throws Exception {
                return JSON.toJSONString(indexBean);
            }
        });

        //TODO 5.Sink到Kafka的不同主題(最終到Druid)
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", QuotConfig.BOOTSTRAP_SERVERS);
        //sse.index.topic=index-sse
        FlinkKafkaProducer011<String> ssekafkaSink = new FlinkKafkaProducer011<>(QuotConfig.SSE_INDEX_TOPIC, new SimpleStringSchema(), props);
        //szse.index.topic=index-szse
        FlinkKafkaProducer011<String> szsekafkaSink = new FlinkKafkaProducer011<>(QuotConfig.SZSE_INDEX_TOPIC, new SimpleStringSchema(), props);

        sseJsonDS.addSink(ssekafkaSink);
        szseJsonDS.addSink(szsekafkaSink);

    }
}

 

Code Implementation - Window Function

package cn.itcast.function.window;

import cn.itcast.bean.CleanBean;
import cn.itcast.bean.IndexBean;
import cn.itcast.util.DateUtil;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * Author itcast
 * Desc 將當前60s窗口最新的CleanBean轉為IndexBean
 * 注意需要封裝分時成交量和成交金額
 * 這一分鍾的分時成交量或成交金額 = 當前窗口最新的日總成交量或成交金額  - 上一個窗口的最新的日總成交量或成交金額
 * 當前窗口最新的可以通過newCleanBean獲取
 * 上一個窗口的最新可以通過狀態保存和獲取
 */
public class IndexMinutesWindowFunction extends RichWindowFunction<CleanBean, IndexBean, String, TimeWindow> {
    //1.准備一個MapState用來存放上一個窗口的IndexBean數據
    //MapState<指數代碼,上一個窗口的IndexBean>
    private MapState<String, IndexBean> indexState = null;

    //2.初始化狀態
    @Override
    public void open(Configuration parameters) throws Exception {
        MapStateDescriptor<String, IndexBean> stateDescriptor = new MapStateDescriptor<>("indexState", String.class, IndexBean.class);
        indexState = getRuntimeContext().getMapState(stateDescriptor);
    }


    @Override
    public void apply(String s, TimeWindow timeWindow, Iterable<CleanBean> iterable, Collector<IndexBean> collector) throws Exception {
        //3.獲取newCleanBean
        CleanBean newCleanBean = null;
        for (CleanBean cleanBean : iterable) {
            if (newCleanBean == null) {
                newCleanBean = cleanBean;
            }
            if (cleanBean.getEventTime() > newCleanBean.getEventTime()) {
                newCleanBean = cleanBean;
            }
        }

        //4.從狀態中獲取上一窗口的IndexBean
        IndexBean lastIndexBean = indexState.get(newCleanBean.getSecCode());
        Long minutesVol = 0L;//要計算的分時成交量
        Long minutesAmt = 0L;//要計算的分時成交金額
        //這一分鍾的分時成交量或成交金額 = 當前窗口最新的日總成交量或成交金額  - 上一個窗口的最新的日總成交量或成交金額
        if (lastIndexBean != null) {
            //5.當前窗口最新的日總成交量和成交金額
            Long currentTradeVolumn = newCleanBean.getTradeVolumn();
            Long currentTradeAmt = newCleanBean.getTradeAmt();

            //6.獲取上一個窗口的最新的日總成交量和成交金額
            Long tradeVolDay = lastIndexBean.getTradeVolDay();
            Long tradeAmtDay = lastIndexBean.getTradeAmtDay();

            //7.計算這一分鍾的分時成交量和成交金額
            minutesVol = currentTradeVolumn - tradeVolDay;
            minutesAmt = currentTradeAmt - tradeAmtDay;

        } else {
            minutesVol = newCleanBean.getTradeVolumn();
            minutesAmt = newCleanBean.getTradeAmt();
        }

        //8.數據封裝
        Long tradeTime = DateUtil.longTimestamp2LongFormat(newCleanBean.getEventTime(), "yyyyMMddHHmmss");
        IndexBean indexBean = new IndexBean(
                newCleanBean.getEventTime(),
                newCleanBean.getSecCode(),
                newCleanBean.getSecName(),
                newCleanBean.getPreClosePx(),
                newCleanBean.getOpenPrice(),
                newCleanBean.getMaxPrice(),
                newCleanBean.getMinPrice(),
                newCleanBean.getTradePrice(),
                minutesVol, minutesAmt,
                newCleanBean.getTradeVolumn(),
                newCleanBean.getTradeAmt(),
                tradeTime,
                newCleanBean.getSource()
        );
        collector.collect(indexBean);

        //9.更新狀態
        indexState.put(indexBean.getIndexCode(),indexBean);

    }
}

 

Code Implementation - Stream Splitting + Sink

//TODO 4.分流
                .process(new ProcessFunction<IndexBean, IndexBean>() {
                    @Override
                    public void processElement(IndexBean indexBean, Context context, Collector<IndexBean> collector) throws Exception {
                        if (indexBean.getSource().equals("sse")) {
                            context.output(sseOutputTag, indexBean);
                        } else {
                            context.output(szseOutputTag, indexBean);
                        }
                    }
                });
        DataStream<IndexBean> sseIndexDS = processDS.getSideOutput(sseOutputTag);
        DataStream<IndexBean> szseIndexDS = processDS.getSideOutput(szseOutputTag);

        SingleOutputStreamOperator<String> sseJsonDS = sseIndexDS.map(new MapFunction<IndexBean, String>() {
            @Override
            public String map(IndexBean indexBean) throws Exception {
                return JSON.toJSONString(indexBean);
            }
        });
        SingleOutputStreamOperator<String> szseJsonDS = szseIndexDS.map(new MapFunction<IndexBean, String>() {
            @Override
            public String map(IndexBean indexBean) throws Exception {
                return JSON.toJSONString(indexBean);
            }
        });

        //TODO 5.Sink到Kafka的不同主題(最終到Druid)
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", QuotConfig.BOOTSTRAP_SERVERS);
        //sse.index.topic=index-sse
        FlinkKafkaProducer011<String> ssekafkaSink = new FlinkKafkaProducer011<>(QuotConfig.SSE_INDEX_TOPIC, new SimpleStringSchema(), props);
        //szse.index.topic=index-szse
        FlinkKafkaProducer011<String> szsekafkaSink = new FlinkKafkaProducer011<>(QuotConfig.SZSE_INDEX_TOPIC, new SimpleStringSchema(), props);

        sseJsonDS.addSink(ssekafkaSink);
        szseJsonDS.addSink(szsekafkaSink);

 

Testing

1. Start ZooKeeper, Kafka and Druid.

broker: http://node01:8888
middleManager / historical: http://node01:8090/console.html

2. Prepare the topics:

cd /export/servers/kafka_2.11-1.0.0/
bin/kafka-topics.sh --zookeeper node01:2181 --list
bin/kafka-topics.sh --zookeeper node01:2181 --create --partitions 1 --replication-factor 1 --topic index-sse
bin/kafka-topics.sh --zookeeper node01:2181 --create --partitions 1 --replication-factor 1 --topic index-szse

3. Create the Druid data sources so that Druid ingests index-sse and index-szse in real time.

This can be configured through the web UI or by using the requests already prepared in Postman.

Check the web UI to confirm that Druid is ingesting the Kafka data:

http://node01:8090/console.html

 

4. Start the program.

5. Check whether the Kafka topics receive data.

If the consumer output is garbled, adjust the console/tool encoding settings.

6. Query Druid (you may need to wait a moment, reset the running ingestion task and restart the Flink program, or restart Druid):

http://node01:8888/unified-console.html#query

 

 

 

Index Quotes - Minute-Level Quote Backup

Requirement

Same as the stock version: back up the minute-level index quote data to HDFS as |-separated plain text for later offline analysis.

Code Implementation - Core Task Class

package cn.itcast.task;

import cn.itcast.bean.CleanBean;
import cn.itcast.bean.IndexBean;
import cn.itcast.config.QuotConfig;
import cn.itcast.function.map.IndexPutHDFSMapFunction;
import cn.itcast.function.window.IndexMinutesWindowFunction;
import cn.itcast.standard.ProcessDataInterface;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;

/**
 * Author itcast
 * Desc 指數分時/分級行情數據業務處理核心任務類--分時行情數據備份到HDFS,前面的計算和分時/分級行情一樣算即可
 */
public class IndexMinutesBackupTask implements ProcessDataInterface {
    @Override
    public void process(DataStream<CleanBean> watermarkDS) {
        //TODO 0.准備HDFS-Sink
        //之前學習的新版本可以用StreamingFileSink或FileSink,但需要注意支持的hadoop版本為2.7+,而我們項目中用的cdh5.14是2.6
        //所以這里不能使用之前學習的Flink新特性中的Sink,得用老的BucketingSink
        //path為: index.sec.hdfs.path=hdfs://192.168.52.100:8020/quot_data/dev/index/
        BucketingSink<String> bucketingSink = new BucketingSink<>(QuotConfig.INDEX_SEC_HDFS_PATH);
        //文件大小: hdfs.batch=1073741824
        bucketingSink.setBatchSize(Long.parseLong(QuotConfig.HDFS_BATCH));
        //分桶/分文件夾策略:hdfs.bucketer=yyyyMMdd
        bucketingSink.setBucketer(new DateTimeBucketer(QuotConfig.HDFS_BUCKETER));

        //前綴--不設置會有默認的
        bucketingSink.setInProgressPrefix("index_");
        bucketingSink.setPendingPrefix("index_2_");//掛起狀態的前綴
        //后綴--不設置會有默認的
        bucketingSink.setInProgressSuffix(".txt");
        bucketingSink.setPendingSuffix(".txt");


        //TODO 1.分組
        watermarkDS.keyBy(CleanBean::getSecCode)
                //TODO 2.窗口划分
                .timeWindow(Time.minutes(1))
                //TODO 3.窗口計算:CleanBean-->IndexBean
                .apply(new IndexMinutesWindowFunction())
                //TODO 4.數據轉換拼接
                .map(new IndexPutHDFSMapFunction())
                .addSink(bucketingSink);
    }
}

 

Code Implementation - Data Concatenation

package cn.itcast.function.map;

import cn.itcast.bean.IndexBean;
import cn.itcast.config.QuotConfig;
import cn.itcast.constant.DateFormatConstant;
import cn.itcast.util.DateUtil;
import org.apache.flink.api.common.functions.MapFunction;

/**
 * Author itcast
 * Desc 將IndexBean中的字段拼接為String並使用|分隔
 */
public class IndexPutHDFSMapFunction implements MapFunction<IndexBean, String> {
    @Override
    public String map(IndexBean indexBean) throws Exception {
        //獲取分隔符
        String seperator = QuotConfig.HDFS_SEPERATOR;
        //日期轉換
        String tradeDate = DateUtil.longTimestamp2String(indexBean.getEventTime(), DateFormatConstant.format_yyyy_mm_dd);
        //字段拼接
        StringBuilder sb = new StringBuilder();
        sb.append(indexBean.getTradeTime()).append(seperator)
                .append(tradeDate).append(seperator)
                .append(indexBean.getIndexCode()).append(seperator)
                .append(indexBean.getIndexName()).append(seperator)
                .append(indexBean.getPreClosePrice()).append(seperator)
                .append(indexBean.getOpenPrice()).append(seperator)
                .append(indexBean.getHighPrice()).append(seperator)
                .append(indexBean.getLowPrice()).append(seperator)
                .append(indexBean.getClosePrice()).append(seperator)
                .append(indexBean.getTradeVol()).append(seperator)
                .append(indexBean.getTradeAmt()).append(seperator)
                .append(indexBean.getTradeVolDay()).append(seperator)
                .append(indexBean.getTradeAmtDay()).append(seperator)
                .append(indexBean.getSource()).append(seperator);
        //返回拼接結果
        return sb.toString();
    }
}

 

測試

0.啟動zk/kafka

1.啟動程序IndexStreamApplication

2.觀察HDFS

http://node01:50070/explorer.html#/quot_data/dev/index/20210201

3.注意:如果關閉了Checkpoint, 數據沒有刷到HDFS, 可以把batchSize設置小一點
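
下面給出一個僅供開發測試參考的小草圖(16KB這個閾值是示例值,路徑/分桶配置沿用上面代碼中的常量):關閉Checkpoint時把batchSize調小,緩衝塊很快寫滿就會flush,更容易在HDFS上看到落地的文件。

//僅供本地/開發測試參考:沿用上面已導入的BucketingSink和DateTimeBucketer
BucketingSink<String> testSink = new BucketingSink<>(QuotConfig.INDEX_SEC_HDFS_PATH);
testSink.setBatchSize(16 * 1024L);//16KB,遠小於配置文件中默認的1G,少量數據也能觸發flush
testSink.setBucketer(new DateTimeBucketer(QuotConfig.HDFS_BUCKETER));//仍按yyyyMMdd分桶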

 

指數行情-漲跌/漲跌幅/振幅-作業

課后自己完成作業

需要自定義一個IndexIncreaseBean

然后和個股類似去做即可

把個股中的代碼換成指數就搞定--5分鍾搞定!
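
下面是IndexIncreaseBean的一個參考草圖(字段名均為假設,僅示意漲跌/漲跌幅/振幅作業大致需要哪些數據,實際以自己的實現為准):

package cn.itcast.bean;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.math.BigDecimal;

/**
 * 指數漲跌幅Bean參考草圖(字段名為假設)
 * 漲跌 = 當前價 - 前收盤價
 * 漲跌幅 = 漲跌 / 前收盤價
 * 振幅 = (最高價 - 最低價) / 前收盤價
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class IndexIncreaseBean {
    private Long eventTime;           //事件時間
    private String indexCode;         //指數代碼
    private String indexName;         //指數名稱
    private BigDecimal preClosePrice; //前收盤價
    private BigDecimal highPrice;     //最高價
    private BigDecimal lowPrice;      //最低價
    private BigDecimal closePrice;    //當前價/收盤價
    private BigDecimal updown;        //漲跌
    private BigDecimal increase;      //漲跌幅
    private BigDecimal amplitude;     //振幅
    private Long tradeTime;           //交易時間
}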

 

 

板塊核心業務開發-了解

注意:

業務較復雜---能聽懂更好!不做強制要求!面試時面試官一般沒做過這塊業務,很難問得很深!

實際開發中,該業務由3個開發人員耗時3周完成!(包括需求討論、實現和走過的彎路)

我們今天下午半天講完!所以難度很大!

 

說明

個股行情:每一支股票的行情

指數行情:每一個指數的行情(指數是多支股票的綜合行情)

板塊行情:也是人為的按照不同的分類方式對股票進行的分類

如:按照行業分為行業板塊:石油, 銀行, 文化傳媒, 旅游....

如:按照地區分為地區板塊:南方, 珠三角, 西部,長三角.....

如:按照概念分為概念板塊:區塊鏈, AI板塊, 新能源....

注意:

1.我們項目中做的是行業板塊!

2.我們項目中以滬市為例,做行業板塊(做滬深兩市也行,但是為了降低學習難度,只考慮滬市)

3.板塊是由個股組成!

1612161967109

 

程序入口類

package cn.itcast.app;

import cn.itcast.avro.AvroDeserializeSchema;
import cn.itcast.avro.SseAvro;
import cn.itcast.bean.CleanBean;
import cn.itcast.config.QuotConfig;
import cn.itcast.util.QuotUtil;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Properties;

/**
 * Author itcast
 * Desc 板塊實時行情數據處理業務入口類
 */
public class SectorStreamApplication {
    public static void main(String[] args) throws Exception {
        //TODO 0.env-流環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //設置環境參數
        //-學習測試為了方便觀察可以設置並行度為1
        env.setParallelism(1);
        //設置使用事件時間
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//老版本該API沒過期,新版本過期的不需要設置
        //注意:Checkpoint是Flink容錯機制,上線是開啟,開發測試可以注掉!
        //-Checkpoint
        /*env.enableCheckpointing(5000);//開啟ckp
        if (SystemUtils.IS_OS_WINDOWS) {
            env.setStateBackend(new FsStateBackend("file:///D:/ckp"));
        } else {
            env.setStateBackend(new FsStateBackend("hdfs://node01:8020/flink-checkpoint/checkpoint"));
        }
        //===========類型2:建議參數===========
        //設置兩個Checkpoint 之間最少等待時間,如設置Checkpoint之間最少是要等 500ms(為了避免每隔1000ms做一次Checkpoint的時候,前一次太慢和后一次重疊到一起去了)
        //如:高速公路上,每隔1s關口放行一輛車,但是規定了兩車之前的最小車距為500m
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);//默認是0
        //設置如果在做Checkpoint過程中出現錯誤,是否讓整體任務失敗:true是  false不是
        env.getCheckpointConfig().setFailOnCheckpointingErrors(false);//默認是true
        //env.getCheckpointConfig().setTolerableCheckpointFailureNumber(10);//默認值為0,表示不容忍任何檢查點失敗
        //設置是否清理檢查點,表示 Cancel 時是否需要保留當前的 Checkpoint,默認 Checkpoint會在作業被Cancel時被刪除
        //ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:true,當作業被取消時,刪除外部的checkpoint(默認值)
        //ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:false,當作業被取消時,保留外部的checkpoint
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        //===========類型3:直接使用默認的即可===============
        //設置checkpoint的執行模式為EXACTLY_ONCE(默認)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //設置checkpoint的超時時間,如果 Checkpoint在 60s內尚未完成說明該次Checkpoint失敗,則丟棄。
        env.getCheckpointConfig().setCheckpointTimeout(60000);//默認10分鍾
        //設置同一時間有多少個checkpoint可以同時執行
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);//默認為1
        //固定延遲重啟--開發中常用
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                3, // 最多重啟3次數
                Time.of(5, TimeUnit.SECONDS) // 重啟時間間隔
        ));
        //上面的設置表示:如果job失敗,重啟3次, 每次間隔5s*/

        //TODO 1.source-kafka的主題sse滬市
        //准備kafka參數
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", QuotConfig.BOOTSTRAP_SERVERS);//集群地址192.168.52.100:9092
        props.setProperty("group.id", QuotConfig.GROUP_ID);//消費者組名稱
        props.setProperty("flink.partition-discovery.interval-millis", "5000");//動態分區檢測,開一個后台線程每隔5s檢查Kafka的分區狀態
        //sse滬市
        FlinkKafkaConsumer011<SseAvro> sseKafkaSource = new FlinkKafkaConsumer011<>(QuotConfig.SSE_TOPIC, new AvroDeserializeSchema(QuotConfig.SSE_TOPIC), props);//注意新版本直接new FlinkKafkaConsumer,老版本需要指定kafka的版本
        sseKafkaSource.setStartFromEarliest();

        DataStreamSource<SseAvro> sseAvroDS = env.addSource(sseKafkaSource);
        //sseAvroDS.print("滬市>>>");

        //TODO 2.transformation--預處理:過濾/轉換/合並....
        //需要檢查數據正常和時間是在正常交易時間內
        //過濾出滬深兩市的合法數據/過濾掉非法數據
        SingleOutputStreamOperator<SseAvro> sseAvroFiltedDS = sseAvroDS.filter(new FilterFunction<SseAvro>() {
            @Override
            public boolean filter(SseAvro sseAvro) throws Exception {
                //雖然是下面這樣調用的,但其實對時間的判斷是根據配置文件中的配置open.time=00:00和close.time=23:59來的,也就是說數據時間都合法,也是為了方便測試
                if (QuotUtil.checkData(sseAvro) && QuotUtil.checkTime(sseAvro)) {//合法
                    return true;
                } else {//非法
                    return false;
                }
            }
        });

        //sseAvroFiltedDS.print("滬市過濾后的合法數據>>");


        //代碼走到這里說明數據都是過濾后的合法的"冰清玉潔"的數據!就應該將里面的板塊(個股)數據封裝為CleanBean
        //將滬市里面的AvroBean--->CleanBean
        SingleOutputStreamOperator<CleanBean> sseCleanBeanDS = sseAvroFiltedDS.map(new MapFunction<SseAvro, CleanBean>() {
            @Override
            public CleanBean map(SseAvro sseAvro) throws Exception {
                CleanBean cleanBean = new CleanBean();
                cleanBean.setMdStreamId(sseAvro.getMdStreamID().toString());
                cleanBean.setSecCode(sseAvro.getSecurityID().toString());
                cleanBean.setSecName(sseAvro.getSymbol().toString());
                cleanBean.setTradeVolumn(sseAvro.getTradeVolume());
                cleanBean.setTradeAmt(sseAvro.getTotalValueTraded());
                cleanBean.setPreClosePx(new BigDecimal(sseAvro.getPreClosePx()).setScale(2, RoundingMode.HALF_UP));
                cleanBean.setOpenPrice(new BigDecimal(sseAvro.getOpenPrice()).setScale(2, RoundingMode.HALF_UP));
                cleanBean.setMaxPrice(new BigDecimal(sseAvro.getHighPrice()).setScale(2, RoundingMode.HALF_UP));
                cleanBean.setMinPrice(new BigDecimal(sseAvro.getLowPrice()).setScale(2, RoundingMode.HALF_UP));
                cleanBean.setTradePrice(new BigDecimal(sseAvro.getTradePrice()).setScale(2, RoundingMode.HALF_UP));
                cleanBean.setEventTime(sseAvro.getTimestamp());
                cleanBean.setSource("sse");
                return cleanBean;
            }
        });

        //現在手里拿到的就是滬市過濾清理后的合法的板塊數據(由個股組成)
        SingleOutputStreamOperator<CleanBean> sectorDS = sseCleanBeanDS.filter(new FilterFunction<CleanBean>() {
            @Override
            public boolean filter(CleanBean cleanBean) throws Exception {
                return QuotUtil.isStock(cleanBean);
            }
        });

        //sectorDS.print("滬市的板塊的CleanBean>>");


        //TODO 3.transformation--Watermark
        //開發測試時Watermark可以要可以不要
        //注意:老版本WatermarkAPI和新版本的不一樣
        //Watermark = 當前最大的事件時間 - 最大允許的亂序度
        //Watermark的作用: 觸發窗口計算!
        //Watermark如何觸發: Watermark >= 窗口結束時間時觸發計算 (該窗口里面得有數據)
        env.getConfig().setAutoWatermarkInterval(200);//默認就是200ms,表示每隔多久去給數據添加一次Watermark,但在新版本中該API已過期
        SingleOutputStreamOperator<CleanBean> watermarkDS = sectorDS.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<CleanBean>(Time.seconds(2)) {//設置最大亂序度為2s
                    @Override
                    public long extractTimestamp(CleanBean cleanBean) {
                        return cleanBean.getEventTime();//指定事件時間列
                    }
                });
        watermarkDS.print("滬市個股watermarkDS>>");

        //TODO 4.板塊核心業務+Sink....
        //TODO 板塊核心業務1.秒級行情/每5s計算最新的板塊價格行情數據
        //new SectorSecondsTask().process(watermarkDS);
        //TODO 板塊核心業務2.分級行情/每1min計算最新的板塊價格行情數據
        //new SectorMinutesTask().process(watermarkDS);
        //TODO 板塊核心業務3.分級行情備份/每1min備份最新的板塊價格行情數據
        //new SectorMinutesBackupTask().process(watermarkDS);
        //TODO 板塊核心業務4.板塊漲跌幅
        //new SectorIncreaseTask().process(watermarkDS);
        //TODO 板塊核心業務5.K線行情/日K,周K,月K---后面單獨做
        //new SectorKlineTask().process(watermarkDS);

        //TODO 5.execute
        env.execute();
    }
}

1612162537587

板塊行情-秒級行情

代碼實現-核心任務類

package cn.itcast.task;

import cn.itcast.bean.CleanBean;
import cn.itcast.config.QuotConfig;
import cn.itcast.function.sink.HBaseSink;
import cn.itcast.function.window.SectorPutHBaseWindowFunction;
import cn.itcast.function.window.SectorWindowFunction;
import cn.itcast.function.window.StockSecondsWindowFunction;
import cn.itcast.standard.ProcessDataInterface;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Author itcast
 * Desc 板塊秒級行情數據業務處理核心任務類
 * 需求: 對滬市的個股數據按照個股代碼進行分組並進行窗口(5s滾動)計算,封裝為StockBean,再按照板塊進行計算封裝為SectorBean,最后將結果寫入到HBase中
 */
public class SectorSecondsTask implements ProcessDataInterface {
    @Override
    public void process(DataStream<CleanBean> watermarkDS) {
        //TODO 1.分組
        watermarkDS.keyBy(CleanBean::getSecCode)
        //TODO 2.划分窗口
        .timeWindow(Time.seconds(5))
        //TODO 3.窗口計算:CleanBean-->StockBean
        .apply(new StockSecondsWindowFunction())//板塊由個股組成,這里復用之前的個股代碼即可
        //TODO 4.匯總/合並
        .timeWindowAll(Time.seconds(5))//把所有轉換好的StockBean放一起,方便后面進行板塊計算
        //TODO 5.窗口計算:StockBean-->SectorBean
        .apply(new SectorWindowFunction())
        //TODO 6.匯總/合並
        .timeWindowAll(Time.seconds(5))//把各個板塊的數據匯總到一起,方便轉為List<Put>進行HBase批量插入
        //TODO 7.封裝為List<Put>
        .apply(new SectorPutHBaseWindowFunction())
        //TODO 8.Sink到HBase #板塊HBase表名:sector.hbase.table.name=quot_sector
        .addSink(new HBaseSink(QuotConfig.SECTOR_HBASE_TABLE_NAME));
    }
}

 

代碼實現-個股窗口

復用之前的個股代碼即可
StockSecondsWindowFunction

代碼實現-板塊窗口-這里不一樣

1612163830478

注意:

看上去好像是把左邊的數據取出來封裝到右邊即可

但是實際上:右邊的很多字段需要計算!

也就是說要找到該個股對應的板塊, 把該個股算到該板塊下去

也就是按照個股所屬的板塊進行聚合計算!

那么問題是: 該個股屬於哪個板塊? 在哪里查?---MySQL中有從Hive同步過來的(三方提供的)板塊個股對應關系表!

1612164046055

1612164082125
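
下面是一個用原生JDBC把該對應關系表加載成 Map<板塊代碼, List<Map<字段名,值>>> 的參考草圖(這是DBUtil.queryForGroup的一種可能實現思路;連接地址、賬號密碼為假設值,實際以項目配置和工具類為准):

package cn.itcast.util;

import java.sql.*;
import java.util.*;

/**
 * 參考草圖:把bdp_sector_stock按sector_code分組緩存到內存
 */
public class SectorStockLoaderSketch {
    public static Map<String, List<Map<String, Object>>> load() throws Exception {
        Map<String, List<Map<String, Object>>> sectorStockMap = new HashMap<>();
        String url = "jdbc:mysql://192.168.52.100:3306/quot?characterEncoding=utf8";//假設的連接信息
        try (Connection conn = DriverManager.getConnection(url, "root", "123456");
             PreparedStatement ps = conn.prepareStatement("SELECT * FROM bdp_sector_stock WHERE sec_abbr = 'ss'");
             ResultSet rs = ps.executeQuery()) {
            ResultSetMetaData meta = rs.getMetaData();
            while (rs.next()) {
                Map<String, Object> row = new HashMap<>();
                for (int i = 1; i <= meta.getColumnCount(); i++) {
                    row.put(meta.getColumnLabel(i), rs.getObject(i));//字段名->字段值
                }
                String sectorCode = row.get("sector_code").toString();//按板塊代碼分組
                sectorStockMap.computeIfAbsent(sectorCode, k -> new ArrayList<>()).add(row);
            }
        }
        return sectorStockMap;
    }
}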

package cn.itcast.function.window;

import cn.itcast.bean.SectorBean;
import cn.itcast.bean.StockBean;
import cn.itcast.util.DBUtil;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Author itcast
 * Desc 板塊業務數據處理窗口函數/類
 * 需求:將StockBean轉為-->SectorBean
 * 注意:
 * 1.個股和板塊的對應關系表在MySQL中
 * 2.板塊中的很多字段需要根據該板塊下的個股的數據進行重新計算得到---復雜
 * 當前板塊開盤價格 = 板塊前收盤價 * (當前板塊以開盤價計算的各個股總流通市值(即累計各個股開盤流通市值) / 當前板塊前一交易日板塊總流通市值)
 * 當前板塊當前價格 = 板塊前收盤價 * (當前板塊以收盤價計算的各個股總流通市值(即累計各個股收盤流通市值) / 當前板塊前一交易日板塊總流通市值)
 */
public class SectorWindowFunction extends RichAllWindowFunction<StockBean, SectorBean, TimeWindow> {
    //TODO 0.准備一些變量、集合、狀態方便后續使用
    //定義基准價(作為上市首日的前一交易日收盤價)
    BigDecimal basePrice = new BigDecimal(1000);

    //定義集合用來存放MySQL表中的數據
    //Map<板塊代碼, List<Map<字段名,值>>>
    Map<String, List<Map<String,Object>>> sectorStockMap = null;

    //Map<板塊代碼,Map<字段名,值>>
    Map<String,Map<String,Object>> sectorKlineMap = null;

    //定義一個狀態用來存儲上一個窗口的板塊數據
    //MapState<板塊代碼,SectorBean>
    MapState<String,SectorBean> sectorState = null;

    //TODO 0.初始化數據
    @Override
    public void open(Configuration parameters) throws Exception {
        //查MySQL-bdp_sector_stock表中的數據封裝到sectorStockMap中
        String sql = "SELECT * FROM bdp_sector_stock WHERE sec_abbr = 'ss'";
        //執行sql可以使用原始JDBC代碼,這里直接使用封裝好的工具類
        sectorStockMap = DBUtil.queryForGroup("sector_code", sql);

        //查MySQL-bdp_quot_sector_kline_day板塊日K表--沒有數據沒關系,后續會計算存進去 --為了取前收盤價
        //# 查詢前一個交易日的板塊日K數據
        String sql2 = "SELECT * FROM bdp_quot_sector_kline_day WHERE trade_date = (SELECT last_trade_date FROM tcc_date where trade_date = CURDATE())";
        sectorKlineMap = DBUtil.query("sector_code", sql2);

        MapStateDescriptor<String, SectorBean> stateDescriptor = new MapStateDescriptor<>("sectorState", String.class, SectorBean.class);
        sectorState = getRuntimeContext().getMapState(stateDescriptor);
    }

    @Override
    public void apply(TimeWindow timeWindow, Iterable<StockBean> iterable, Collector<SectorBean> collector) throws Exception {
        //TODO 1.遍歷該窗口中的個股並緩存到新建的Map中
        Map<String,StockBean> cacheStockMap = new HashMap<>();
        for (StockBean stockBean : iterable) {
            cacheStockMap.put(stockBean.getSecCode(),stockBean);
        }

        //TODO 2.遍歷板塊對應關系表中的個股並獲取板塊下的個股
        //Map<板塊代碼, List<Map<字段名,值>>>
        //Map<String, List<Map<String,Object>>> sectorStockMap = 已經在open中初始化了;
        for (String sectorCode : sectorStockMap.keySet()) {
            //獲取該板塊代碼對應的個股
            //List表示該板塊下的個股組成的集合,里面的一個map就一個個股
            List<Map<String, Object>> listStock = sectorStockMap.get(sectorCode);

            //TODO 3.初始化要返回的SectorBean中需要的數據,后面計算再賦值
            //下面的都是返回的SectorBean中需要用到的
            Long eventTime = 0L;//ok
            String sectorName = null; //ok
            BigDecimal preClosePrice = new BigDecimal(0);//ok
            BigDecimal highPrice = new BigDecimal(0);//ok
            BigDecimal openPrice = new BigDecimal(0);//ok
            BigDecimal lowPrice = new BigDecimal(0);//ok
            BigDecimal closePrice = new BigDecimal(0);//ok
            Long tradeVol = 0L; //分時成交量 //ok
            Long tradeAmt = 0L; //分時成交額 //ok
            Long tradeVolDay = 0L; //日成交總量 //ok
            Long tradeAmtDay = 0L; //日成交總額 //ok
            Long tradeTime = 0L;//ok

            //下面的都是后面計算需要用到的
            //當前板塊以開盤價計算的總流通市值 也就是 累計各個股開盤流通市值
            BigDecimal stockOpenTotalNegoCap = new BigDecimal(0);//ok
            //當前板塊以收盤價計算的總流通市值 也就是 累計各個股收盤流通市值
            BigDecimal stockCloseTotalNegoCap = new BigDecimal(0);//ok
            //前一日板塊總流通市值
            BigDecimal preSectorNegoCap = new BigDecimal(0); //ok

            //TODO 4.遍歷該板塊下的各個個股,並獲取每個個股的所屬板塊名稱,個股代碼,個股流通股本,前一交易日板塊流通總市值
            for (Map<String, Object> map : listStock) {
                sectorName = map.get("sector_name").toString();//板塊名稱
                String sec_code = map.get("sec_code").toString();//個股代碼
                BigDecimal negoCap = new BigDecimal(map.get("nego_cap").toString());//個股流通股本
                preSectorNegoCap = new BigDecimal(map.get("pre_sector_nego_cap").toString());//前一交易日板塊流通總市值

                StockBean stockBean = cacheStockMap.get(sec_code);//根據個股代碼從當前窗口數據獲取對應的個股信息
                if(stockBean != null){//當前窗口中有該個股,且屬於該板塊
                    eventTime = stockBean.getEventTime();
                    tradeTime = stockBean.getTradeTime();

                    //TODO 5.計算業務字段:當前板塊以開盤價計算的總流通市值和當前板塊以收盤價計算的總流通市值
                    //當前板塊以開盤價計算的總流通市值 也就是 累計各個股開盤流通市值 = sum(板塊下的各個個股的開盤價 * 各個個股流通股本)
                    BigDecimal stockOpenValue = stockBean.getOpenPrice().multiply(negoCap).setScale(2, RoundingMode.HALF_UP);
                    stockOpenTotalNegoCap = stockOpenTotalNegoCap.add(stockOpenValue);//注意:BigDecimal不可變,add返回新對象,必須重新賦值,否則累加無效
                    //當前板塊以收盤價計算的總流通市值 也就是 累計各個股收盤流通市值 = sum(板塊下的各個個股的收盤價 * 各個個股流通股本)
                    BigDecimal stockCloseValue = stockBean.getClosePrice().multiply(negoCap).setScale(2, RoundingMode.HALF_UP);
                    stockCloseTotalNegoCap = stockCloseTotalNegoCap.add(stockCloseValue);//同樣需要重新賦值

                    //TODO 6.板塊下各個個股的累計成交量/成交金額
                    //板塊下各個個股的累計成交量
                    tradeVolDay += stockBean.getTradeVolDay();
                    //板塊下各個個股的累計成交金額
                    tradeAmtDay += stockBean.getTradeAmtDay();
                }
            }
            //TODO 7.是否是首日上市
            if(sectorKlineMap != null && sectorKlineMap.get(sectorCode) != null){
                //非首日上市
                //板塊前收盤價 = 板塊日K中的收盤價(注意板塊日k查詢的就是前一個交易日的k線,所以板塊前收盤價就等於該日k中的收盤價)
                Map<String, Object> kmap = sectorKlineMap.get(sectorCode);
                preClosePrice = new BigDecimal(kmap.get("close_price").toString());
            }else{
                //首日上市
                //板塊前收盤價 = 基准價
                preClosePrice = basePrice;
            }

            //TODO 8.計算板塊開盤價和收盤價/當前價
            //下面的公式不需要背
            //當前板塊開盤價格 = 板塊前收盤價 * (累計各個股開盤流通市值 / 當前板塊前一交易日板塊總流通市值)
            openPrice = preClosePrice.multiply(stockOpenTotalNegoCap).divide(preSectorNegoCap,2, RoundingMode.HALF_UP).setScale(2, RoundingMode.HALF_UP);
            //當前板塊收盤價/當前價格 = 板塊前收盤價 * (累計各個股收盤流通市值 / 當前板塊前一交易日板塊總流通市值)
            closePrice = preClosePrice.multiply(stockCloseTotalNegoCap).divide(preSectorNegoCap,2, RoundingMode.HALF_UP).setScale(2, RoundingMode.HALF_UP);


            //TODO 9.計算板塊最高最低價
            //先讓高低都賦值為最新價
            highPrice = closePrice;//如果不是最后一條,closePrice其實就是最新價
            lowPrice = closePrice;//如果不是最后一條,closePrice其實就是最新價

            //從狀態中獲取之前的高低進行比較,算出最終的高低
            SectorBean lastSectorBean = sectorState.get(sectorCode);
            if(lastSectorBean != null){
                BigDecimal lastHighPrice = lastSectorBean.getHighPrice();
                BigDecimal lastLowPrice = lastSectorBean.getLowPrice();
                Long lastTradeVol = lastSectorBean.getTradeVol();
                Long lastTradeAmt = lastSectorBean.getTradeAmt();
                
                if(lastHighPrice.compareTo(highPrice) == 1) {//lastHighPrice>highPrice
                    highPrice = lastHighPrice;
                }

                if (lastLowPrice.compareTo(lowPrice) == -1) {//lastLowPrice<lowPrice
                    lowPrice = lastLowPrice;
                }

                //TODO 10.分時成交量/成交金額
                tradeVol = tradeVolDay - lastTradeVol;
                tradeAmt = tradeAmtDay - lastTradeAmt;//分時成交額要基於成交金額計算

                //TODO 11.注意:嚴謹一點: 高低還和開盤價進行比較,因為之前的高低只是和上一窗口比的
                if(openPrice.compareTo(highPrice) == 1){
                    highPrice = openPrice;
                }
                if(openPrice.compareTo(lowPrice) == -1){
                    lowPrice = openPrice;
                }
            }/*else {
                //使用默認值,第一個窗口數據默認值沒事
            }*/

            //結束后上面的字段值就都有了
            //TODO 12.封裝並返回結果
            SectorBean sectorBean = new SectorBean();
            sectorBean.setEventTime(eventTime);
            sectorBean.setSectorCode(sectorCode);
            sectorBean.setSectorName(sectorName);

            sectorBean.setPreClosePrice(preClosePrice);
            sectorBean.setHighPrice(highPrice);
            sectorBean.setOpenPrice(openPrice);
            sectorBean.setLowPrice(lowPrice);
            sectorBean.setClosePrice(closePrice);

            sectorBean.setTradeVol(tradeVol);
            sectorBean.setTradeAmt(tradeAmt);
            sectorBean.setTradeVolDay(tradeVolDay);
            sectorBean.setTradeAmtDay(tradeAmtDay);

            sectorBean.setTradeTime(tradeTime);

            collector.collect(sectorBean);

            //TODO 13.更新狀態
            sectorState.put(sectorCode,sectorBean);//最后更新的SectorBean里面就封裝了最新的高低價
        }
    }
}

 

 

代碼實現-數據封裝為List< Put >

package cn.itcast.function.window;

import cn.itcast.bean.SectorBean;
import com.alibaba.fastjson.JSON;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.hadoop.hbase.client.Put;

import java.util.ArrayList;
import java.util.List;

/**
 * Author itcast
 * Desc 將SectorBean-->List<Put>
 */
public class SectorPutHBaseWindowFunction implements AllWindowFunction<SectorBean, List<Put>, TimeWindow> {
    @Override
    public void apply(TimeWindow timeWindow, Iterable<SectorBean> iterable, Collector<List<Put>> collector) throws Exception {
        List<Put> list = new ArrayList<>();
        for (SectorBean sectorBean : iterable) {
            String rowKey = sectorBean.getSectorCode() + sectorBean.getTradeTime();
            Put put = new Put(rowKey.getBytes());
            String jsonStr = JSON.toJSONString(sectorBean);
            put.addColumn("info".getBytes(),"data".getBytes(),jsonStr.getBytes());
            list.add(put);
        }
        collector.collect(list);
    }
}


 

代碼實現-Sink到HBase

//TODO 8.Sink到HBase #板塊HBase表名:sector.hbase.table.name=quot_sector
        .addSink(new HBaseSink(QuotConfig.SECTOR_HBASE_TABLE_NAME));

 

測試

1.zk/kafka/hbase啟動

2.啟動程序SectorStreamApplication

3.登錄hbase查看數據

bin/hbase shell
list
scan "quot_sector",{LIMIT=>10}

1612173892675

 

板塊行情-分時行情

需求

按照個股代碼進行分組 每隔1min/60s划分一個窗口對數據進行處理轉為StockBean, 再轉SectorBean,並將結果寫入到Kafka的主題,最終由Druid進行攝取並查詢

1612062831573

 

代碼實現-核心任務類

只需要寫這里,剩下的都可以復用

package cn.itcast.task;

import cn.itcast.bean.CleanBean;
import cn.itcast.bean.SectorBean;
import cn.itcast.config.QuotConfig;
import cn.itcast.function.window.SectorWindowFunction;
import cn.itcast.function.window.StockMinutesWindowFunction;
import cn.itcast.standard.ProcessDataInterface;
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

import java.util.Properties;

/**
 * Author itcast
 * Desc 板塊行情核心任務類-板塊分時行情
 * 按照個股代碼進行分組 每隔1min/60s划分一個窗口對數據進行處理轉為StockBean,
 * 再轉SectorBean,並將結果寫入到Kafka的主題,最終由Druid進行攝取並查詢
 */
public class SectorMinutesTask implements ProcessDataInterface {
    @Override
    public void process(DataStream<CleanBean> watermarkDS) {
        //TODO 0.准備Kafka-Sink
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", QuotConfig.BOOTSTRAP_SERVERS);
        //sse.sector.topic=sector-sse
        FlinkKafkaProducer011<String> kafkaSink = new FlinkKafkaProducer011<>(QuotConfig.SSE_SECTOR_TOPIC, new SimpleStringSchema(), props);

        //TODO 1.分組
        watermarkDS.keyBy(CleanBean::getSecCode)
                //TODO 2.窗口划分
                .timeWindow(Time.minutes(1))
                //TODO 3.窗口計算CleanBean-->StockBean--復用之前的個股分時行情代碼
                .apply(new StockMinutesWindowFunction())
                //TODO 4.數據合並
                .timeWindowAll(Time.minutes(1))
                //TODO 5.窗口計算StockBean-->SectorBean--復用之前的板塊秒級行情代碼
                .apply(new SectorWindowFunction())
                //TODO 6.轉為json再Sink到Kafka
                .map(new MapFunction<SectorBean, String>() {
                    @Override
                    public String map(SectorBean sectorBean) throws Exception {
                        return JSON.toJSONString(sectorBean);
                    }
                }).addSink(kafkaSink);

    }
}

 

測試

0.啟動zk/kafka/druid

1.准備kafka主題

cd /export/servers/kafka_2.11-1.0.0/
bin/kafka-topics.sh --zookeeper node01:2181 --list
bin/kafka-topics.sh --zookeeper node01:2181 --create --partitions 1 --replication-factor 1 --topic sector-sse

 

2.讓druid攝取kafka

1612316953635

http://node01:8090/console.html

1612316968840

3.啟動程序SectorStreamApplication

4.觀察kafka

1612317033791

5.觀察druid

http://node01:8090/console.html

1612317139608

http://node01:8888/unified-console.html#query

1612317121625

 

 

板塊行情-分時行情-備份

需求

1612078952955

 

代碼實現-核心業務類

package cn.itcast.task;

import cn.itcast.bean.CleanBean;
import cn.itcast.config.QuotConfig;
import cn.itcast.function.map.SectorPutHDFSMapFunction;
import cn.itcast.function.window.SectorWindowFunction;
import cn.itcast.function.window.StockMinutesWindowFunction;
import cn.itcast.standard.ProcessDataInterface;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;

/**
 * Author itcast
 * Desc 板塊核心業務類-分時行情備份
 */
public class SectorMinutesBackupTask implements ProcessDataInterface {
    @Override
    public void process(DataStream<CleanBean> watermarkDS) {
        //TODO 0.HDFS-Sink
        //sector.sec.hdfs.path=hdfs://192.168.52.100:8020/quot_data/dev/sector/
        BucketingSink<String> bucketSink = new BucketingSink<>(QuotConfig.SECTOR_SEC_HDFS_PATH);//路徑
        //bucketSink.setBatchSize(Integer.parseInt(QuotConfig.HDFS_BATCH));//批大小1073741824
        bucketSink.setBatchSize(16384L);//批大小:注意:如果設置太大,那麼緩衝塊的大小也很大,數據較少時,達不到緩衝塊大小則不會flush到HDFS,所以這里把大小設置小一點,那麼緩衝塊也就變小了
        bucketSink.setBucketer(new DateTimeBucketer<>(QuotConfig.HDFS_BUCKETER));//分桶規則hdfs.bucketer=yyyyMMdd
        //前綴后綴
        bucketSink.setPendingPrefix("sector2-");
        bucketSink.setInProgressPrefix("sector-");
        bucketSink.setPendingSuffix(".txt");
        bucketSink.setInProgressSuffix(".txt");

        //TODO 1.分組
        watermarkDS.keyBy(CleanBean::getSecCode)
        //TODO 2.划分窗口
        .timeWindow(Time.minutes(1))
        //TODO 3.窗口計算CleanBean-->StockBean
        .apply(new StockMinutesWindowFunction())
        //TODO 4.數據合並
        .timeWindowAll(Time.minutes(1))
        //TODO 5.窗口計算StockBean-->SectorBean
        .apply(new SectorWindowFunction())
        //TODO 6.數據拼接為字符串
        .map(new SectorPutHDFSMapFunction())
        //TODO 7.數據Sink到HDFS
        .addSink(bucketSink);
    }
}

 

代碼實現-數據拼接

package cn.itcast.function.map;

import cn.itcast.bean.SectorBean;
import cn.itcast.config.QuotConfig;
import cn.itcast.constant.DateFormatConstant;
import cn.itcast.util.DateUtil;
import org.apache.flink.api.common.functions.MapFunction;

import java.sql.Timestamp;
import java.util.Date;

/**
 * Author itcast
 * Desc 將SectorBean-->String
 */
public class SectorPutHDFSMapFunction implements MapFunction<SectorBean, String> {
    String sp = QuotConfig.HDFS_SEPERATOR;
    @Override
    public String map(SectorBean value) throws Exception {
        String tradeDate = DateUtil.longTimestamp2String(value.getEventTime(), DateFormatConstant.format_yyyy_mm_dd);
        StringBuilder sb = new StringBuilder();
        sb.append(new Timestamp(new Date().getTime())).append(sp)
                .append(tradeDate).append(sp)
                .append(value.getSectorCode()).append(sp)
                .append(value.getSectorName()).append(sp)
                .append(value.getPreClosePrice()).append(sp)
                .append(value.getOpenPrice()).append(sp)
                .append(value.getHighPrice()).append(sp)
                .append(value.getLowPrice()).append(sp)
                .append(value.getClosePrice()).append(sp)
                .append(value.getTradeVol()).append(sp)
                .append(value.getTradeAmt()).append(sp)
                .append(value.getTradeVolDay()).append(sp)
                .append(value.getTradeAmtDay());
        System.out.println("數據已經轉換拼接為:"+sb.toString());
        return sb.toString();
    }
}


 

測試

0.啟動zk/kafka

1.啟動程序SectorStreamApplication

2.觀察HDFS

http://node01:50070/explorer.html#/quot_data/dev/sector/20210203

 

 

K線業務-掌握/重點

盡量掌握

需求

如下圖:要對個股/指數/板塊求K線,並將結果存放到MySQL

注意:

我們項目中K線分為日K/周K/月K,數據量不大,當然也有時K

日K/周K/月K一起做! 就是不再拆分代碼去做了, 因為再拆的話,要分別實現如下代碼,太麻煩

所以把日K/周K/月K一起做,但區分個股,指數,板塊,那就意味着寫3個task即可

1612320289298

1612320043879

 

 

個股K線

1612321801185

 

代碼實現-核心任務類

package cn.itcast.task;

import cn.itcast.bean.CleanBean;
import cn.itcast.bean.StockBean;
import cn.itcast.config.QuotConfig;
import cn.itcast.constant.KlineType;
import cn.itcast.function.map.StockKlineMapFunction;
import cn.itcast.function.sink.MySQLSink;
import cn.itcast.function.window.StockMinutesWindowFunction;
import cn.itcast.standard.ProcessDataInterface;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Author itcast
 * Desc 個股K線行情核心任務類
 * 要完成個股的日K,周K,月K的數據計算並Sink到MySQL
 * 注意:日K最終1天就1條,周K最終1周就1條,月K最終1月就1條
 */
public class StockKlineTask implements ProcessDataInterface {
    @Override
    public void process(DataStream<CleanBean> watermarkDS) {
        //TODO 1.分組
        DataStream<StockBean> stockBeanDS = watermarkDS.keyBy(CleanBean::getSecCode)
                //TODO 2.划分窗口(1min更新)
                .timeWindow(Time.minutes(1))
                //TODO 3.窗口計算CleanBean-->StockBean
                .apply(new StockMinutesWindowFunction());

        //TODO 4.StockBean-->K線Row(搞個Bean封裝也行,但這里用Row更方便)並Sink到MySQL
        //先准備一條日K,周K,月K的sql(sql應該是replace into 表示有則更新,沒有則插入),看sql中需要哪些字段
        String sql = "REPLACE INTO %s values(?,?,?,?,?,?,?,?,?,?,?,?,?)";
        //#個股日k
        //mysql.stock.sql.day.table=bdp_quot_stock_kline_day
        //#個股周k
        //mysql.stock.sql.week.table=bdp_quot_stock_kline_week
        //#個股月k
        //mysql.stock.sql.month.table=bdp_quot_stock_kline_month

        //日K:StockKlineMapFunction將StockBean-->K線Row
        stockBeanDS.map(new StockKlineMapFunction(KlineType.DAY_K.getType(),KlineType.DAY_K.getFirstTxDateType()))
                .keyBy(row->row.getField(2))//按照secCode分組
                .addSink(new MySQLSink(String.format(sql, QuotConfig.MYSQL_STOCK_SQL_DAY_TABLE)));//Sink到MySQL
        //周K
        stockBeanDS.map(new StockKlineMapFunction(KlineType.WEEK_K.getType(),KlineType.WEEK_K.getFirstTxDateType()))
                .keyBy(row->row.getField(2))//按照secCode分組
                .addSink(new MySQLSink(String.format(sql, QuotConfig.MYSQL_STOCK_SQL_WEEK_TABLE)));//Sink到MySQL
        //月K
        stockBeanDS.map(new StockKlineMapFunction(KlineType.MONTH_K.getType(),KlineType.MONTH_K.getFirstTxDateType()))
                .keyBy(row->row.getField(2))//按照secCode分組
                .addSink(new MySQLSink(String.format(sql, QuotConfig.MYSQL_STOCK_SQL_MONTH_TABLE)));//Sink到MySQL

    }
}
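
上面的代碼用到了KlineType枚舉:type取值1/2/3對應日K/周K/月K,firstTxDateType對應tcc_date交易日歷表中"周期內首個交易日"的字段名。下面是該枚舉的一個參考草圖,其中week_first_txdate/month_first_txdate等字段名是假設,實際以項目骨架代碼和tcc_date表結構為准:

package cn.itcast.constant;

/**
 * K線類型枚舉參考草圖(僅示意)
 * type: 1-日K 2-周K 3-月K
 * firstTxDateType: tcc_date表中周期首個交易日的字段名(此處字段名為假設)
 */
public enum KlineType {
    DAY_K("1", "trade_date"),
    WEEK_K("2", "week_first_txdate"),
    MONTH_K("3", "month_first_txdate");

    private final String type;
    private final String firstTxDateType;

    KlineType(String type, String firstTxDateType) {
        this.type = type;
        this.firstTxDateType = firstTxDateType;
    }

    public String getType() {
        return type;
    }

    public String getFirstTxDateType() {
        return firstTxDateType;
    }
}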

 

代碼實現-數據轉換類-轉為KLine

package cn.itcast.function.map;

import cn.itcast.bean.StockBean;
import cn.itcast.constant.DateFormatConstant;
import cn.itcast.util.DBUtil;
import cn.itcast.util.DateUtil;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.types.Row;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.sql.Timestamp;
import java.util.Date;
import java.util.Map;

/**
 * Author itcast
 * Desc
 */
public class StockKlineMapFunction extends RichMapFunction<StockBean, Row> {
    //TODO 0.定義變量
    private String type;//K線類型,1是日K,2是周K,3是月K
    private String firstTxDateTypeName;//周期內首個交易日期字段名

    private String firstTxDate;//周期內首個交易日期值
    private String tradeDate;//當天日期
    //Map<股票代碼, Map<字段名稱,字段值>>
    private Map<String, Map<String, Object>> aggMap;

    //TODO 1.初始化變量
    public StockKlineMapFunction(String type, String firstTxDateTypeName) {
        this.type = type;
        this.firstTxDateTypeName = firstTxDateTypeName;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        //查詢交易日歷表
        String sql = "select * from tcc_date where trade_date = CURDATE()";
        Map<String, String> tradeDateMap = DBUtil.queryKv(sql);
        firstTxDate = tradeDateMap.get(firstTxDateTypeName);//根據字段名獲取周期內首個交易日
        tradeDate = tradeDateMap.get("trade_date");//獲取當天日期
        //初始化aggMap,匯總高,低,成交量,成交金額
/*
select sec_code,
max(high_price) as high_price,
min(low_price) as low_price,
sum(trade_vol) as trade_vol,
sum(trade_amt) as trade_amt
from bdp_quot_stock_kline_day
where trade_date between firstTxDate and tradeDate
group by sec_code
 */
        String aggSQL = "select sec_code,\n" +
                "max(high_price) as high_price,\n" +
                "min(low_price) as low_price,\n" +
                "sum(trade_vol) as trade_vol,\n" +
                "sum(trade_amt) as trade_amt\n" +
                "from bdp_quot_stock_kline_day\n" +
                "where trade_date between " + firstTxDate + " and " + tradeDate + "\n" +
                "group by sec_code";

        aggMap = DBUtil.query("sec_code", aggSQL);

    }

    //TODO 3.轉換StockBean-->ROW
    @Override
    public Row map(StockBean stockBean) throws Exception {
        //獲取需要的數據
        Long eventTime = stockBean.getEventTime();
        String secCode = stockBean.getSecCode();
        String secName = stockBean.getSecName();
        BigDecimal preClosePrice = stockBean.getPreClosePrice();
        BigDecimal openPrice = stockBean.getOpenPrice();
        BigDecimal highPrice = stockBean.getHighPrice();
        BigDecimal lowPrice = stockBean.getLowPrice();
        BigDecimal closePrice = stockBean.getClosePrice();
        Long tradeVolDay = stockBean.getTradeVolDay();
        Long tradeAmtDay = stockBean.getTradeAmtDay();
        BigDecimal avgPrice = new BigDecimal(0);

        //均價 = 成交金額/成交量
        if (tradeVolDay != 0) {
            avgPrice = new BigDecimal(tradeAmtDay).divide(new BigDecimal(tradeVolDay), 2, RoundingMode.HALF_UP);
        }

        //將當前日期和周期內首個交易日期轉換為long
        Long tradeDateTime = DateUtil.stringToLong(tradeDate, DateFormatConstant.format_yyyy_mm_dd);
        Long firstTxDateTime = DateUtil.stringToLong(firstTxDate, DateFormatConstant.format_yyyy_mm_dd);
        //如果是日K,那么前收,高,開,低,收,成交量,成交金額,就直接存入日K表,因為stockBean中的數據就是當天最新的數據,直接更新MySQL日K表即可
        //如果是周K或月K,那么需要把stockBean中的數據和aggMap中的進行合並!
        if (firstTxDateTime < tradeDateTime && (type.equals("2") || type.equals("3"))) {//注意:type是String,要和字符串"2"/"3"比較,和Integer比較永遠為false
            //表示進來的是周K或月K
            Map<String, Object> map = aggMap.get(secCode);
            if (map != null && map.size() > 0) {
                BigDecimal lastHighPrice = new BigDecimal(map.get("high_price").toString());
                BigDecimal lastLowPrice = new BigDecimal(map.get("low_price").toString());
                Long lastTradeVol = Long.parseLong(map.get("trade_vol").toString());
                Long lastTradeAmt = Long.parseLong(map.get("trade_amt").toString());

                //比較高低
                if (lastHighPrice.compareTo(highPrice) == 1) {
                    highPrice = lastHighPrice;
                }
                if (lastLowPrice.compareTo(lowPrice) == -1) {
                    lowPrice = lastLowPrice;
                }

                //累加成交量成交金額
                tradeVolDay += lastTradeVol;
                tradeAmtDay += lastTradeAmt;

                //更新aggMap--這里不更新也沒事,因為后續可以使用JavaWeb計算做定時任務修正周K和月K
                /*
                map.put("high_price",highPrice);
                map.put("low_price",lowPrice);
                map.put("trade_vol",tradeVolDay);
                map.put("trade_amt",tradeAmtDay);
                */

                //計算均價
                //均價 = 成交金額/成交量
                if (tradeVolDay != 0) {
                    avgPrice = new BigDecimal(tradeAmtDay).divide(new BigDecimal(tradeVolDay), 2, RoundingMode.HALF_UP);
                }
            }
        }/*else {
            //是日K,日K直接使用stockBean中的數據
        }*/


        Row row = new Row(13);
        row.setField(0, new Timestamp(new Date().getTime()));
        row.setField(1, tradeDate);
        row.setField(2, secCode);
        row.setField(3, secName);
        row.setField(4, type);
        row.setField(5, preClosePrice);
        row.setField(6, openPrice);
        row.setField(7, highPrice);
        row.setField(8, lowPrice);
        row.setField(9, closePrice);
        row.setField(10, avgPrice);
        row.setField(11, tradeVolDay);
        row.setField(12, tradeAmtDay);

        return row;
    }
}

 

代碼實現-MySQLSink

package cn.itcast.function.sink;

import cn.itcast.util.DBUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.types.Row;

import java.sql.Connection;
import java.sql.PreparedStatement;

/**
 * Author itcast
 * Desc
 */
public class MySQLSink extends RichSinkFunction<Row> {
    //1.定義變量並初始化
    private String sqlWithName;
    private Connection conn;
    private PreparedStatement ps;

    public MySQLSink(String sqlWithName) {
        //sqlWithName = "REPLACE INTO 表名 values(?,?,?,?,?,?,?,?,?,?,?,?,?)";
        //拼接好了表名的sql,但是參數需要賦值
        this.sqlWithName = sqlWithName;
    }

    //2.開啟資源
    @Override
    public void open(Configuration parameters) throws Exception {
        conn = DBUtil.getConnByJdbc();
        ps = conn.prepareStatement(sqlWithName);
    }

    //3.設置?占位符參數並執行
    @Override
    public void invoke(Row row, Context context) throws Exception {
        int length = row.getArity();//獲取字段長度,長度就是13
        //設置參數
        for (int i = 0; i < length; i++) {//i的范圍:[0,13)===>[0,12]===>正好就是索引的范圍
            Object value = row.getField(i);
            ps.setObject(i+1,value);//注意:row的索引從0開始,jdbc的?占位符從1開始
        }
        System.out.println("K線SQL已執行,類型為" + row.getField(4));
        //執行sql
        ps.executeUpdate();

    }

    //4.關閉資源
    @Override
    public void close() throws Exception {
        if(ps!=null) ps.close();//先關閉Statement
        if(conn!=null) conn.close();//再關閉Connection
    }
}

 

測試

1.啟動程序StockStreamApplication

2.觀察控制台輸出

1612335530286

3.觀察MySQL中的K線表

1612335638996

1612335730207

1612335760970

 

 

指數K線

代碼實現

使用個股的代碼進行修改即可,按如下規則替換(替換后的任務類參考草圖見列表之后):

個股-->指數

Stock-->Index

stock-->index

STOCK-->INDEX

sec_code-->index_code

getSecCode-->getIndexCode

getSecName-->getIndexName
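
按照上面的替換規則,指數K線核心任務類的參考草圖如下(IndexKlineMapFunction、MYSQL_INDEX_SQL_DAY_TABLE等名稱是按個股版本推演出來的假設,實際以項目骨架代碼/配置文件為准):

package cn.itcast.task;

import cn.itcast.bean.CleanBean;
import cn.itcast.bean.IndexBean;
import cn.itcast.config.QuotConfig;
import cn.itcast.constant.KlineType;
import cn.itcast.function.map.IndexKlineMapFunction;
import cn.itcast.function.sink.MySQLSink;
import cn.itcast.function.window.IndexMinutesWindowFunction;
import cn.itcast.standard.ProcessDataInterface;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * 指數K線行情核心任務類參考草圖:和個股K線邏輯一致,只是Bean/窗口函數/表名換成指數相關
 */
public class IndexKlineTask implements ProcessDataInterface {
    @Override
    public void process(DataStream<CleanBean> watermarkDS) {
        DataStream<IndexBean> indexBeanDS = watermarkDS.keyBy(CleanBean::getSecCode)
                .timeWindow(Time.minutes(1))
                .apply(new IndexMinutesWindowFunction());//CleanBean-->IndexBean

        String sql = "REPLACE INTO %s values(?,?,?,?,?,?,?,?,?,?,?,?,?)";
        //日K
        indexBeanDS.map(new IndexKlineMapFunction(KlineType.DAY_K.getType(), KlineType.DAY_K.getFirstTxDateType()))
                .keyBy(row -> row.getField(2))//按照index_code分組
                .addSink(new MySQLSink(String.format(sql, QuotConfig.MYSQL_INDEX_SQL_DAY_TABLE)));
        //周K
        indexBeanDS.map(new IndexKlineMapFunction(KlineType.WEEK_K.getType(), KlineType.WEEK_K.getFirstTxDateType()))
                .keyBy(row -> row.getField(2))
                .addSink(new MySQLSink(String.format(sql, QuotConfig.MYSQL_INDEX_SQL_WEEK_TABLE)));
        //月K
        indexBeanDS.map(new IndexKlineMapFunction(KlineType.MONTH_K.getType(), KlineType.MONTH_K.getFirstTxDateType()))
                .keyBy(row -> row.getField(2))
                .addSink(new MySQLSink(String.format(sql, QuotConfig.MYSQL_INDEX_SQL_MONTH_TABLE)));
    }
}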

 

測試

運行IndexStreamApplication

觀察

1612336792532

 

板塊K線

代碼實現

修改代碼

個股-->板塊

Stock-->Sector

....

 

注意:

 //TODO 3.窗口計算CleanBean-->StockBean
                .apply(new StockMinutesWindowFunction())
                .timeWindowAll(Time.minutes(1))
                .apply(new SectorWindowFunction());

 

 

 

測試

啟動SectorStreamApplication

觀察

1612337593560

 

 

監控預警業務

需求

1612338711268

前面已經做了個股/指數/板塊3大核心業務的各個子業務:秒級行情/分時行情/分時行情備份/漲跌,漲跌幅,振幅/日K,周K,月K--K線業務...(課后把所有的個股全部搞定,其他的就簡單了,板塊秒級行情作為擴展!)

那么接下來我們要做圖中的綠色部分,也就是股票的監控預警業務(包括實時和離線,主要是實時監控預警)

而實時預警需要借助FlinkCEP去實現

 

技術准備-FlinkCEP介紹

FlinkCEP概述

Complex Event Processing(CEP)是 Flink 提供的一個非常亮眼的功能:復雜事件處理庫。使用它可以在無界的事件流中檢測事件模式,讓我們掌握數據中重要的事項;它允許我們指定要在流中檢測的模式(規則),檢測出匹配的事件序列后再對其進行處理。

 

說人話:

==FlinkCEP = 實時數據流(無界流) + 用戶指定的規則 + 匹配到之后的處理/輸出方式==

就是使用FlinkCEP技術對 實時流數據 做規則匹配(規則可以由用戶指定) , 匹配到之后可以進行相應的處理或輸出!

如下圖:

實時數據流中有很多不同形狀的數據,然后通過FlinkCEP指定匹配規則(如:正方形和圓形),那么最后匹配到的結果就如結果流所示,

有點類似於Filter! 但是比Filter更強大!

1612339250855

 

FlinkCEP的應用場景

1.實時股票曲線預測

2.網站惡意攻擊登陸行為

3.電子商務實時營銷,對用戶行為實時分析

4.滴滴打車異常行為檢測

5.物流訂單實時追蹤

6.網絡欺詐

7.故障檢測

8.風險規避

9.智能營銷等領域

==綜上,FlinkCEP主要用在實時風控業務!==

 

后面會通過案例和項目中的代碼實現來演示!

 

FlinkCEP優缺點

  • 優勢:

繼承了 Flink 高吞吐的特點

查詢是靜態的,數據是動態的,滿足實時處理和連續查詢的需求

擅長解決跨事件的匹配

API友好

  • 劣勢:

本身無法直接動態更新規則(痛點),需要借助其他技術才可以動態注入或更新規則

但是后續項目中可以通過Redis來實現動態規則更新,思路見下面的參考草圖
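
下面是動態規則的一個參考草圖(僅示意思路:把閾值放到Redis,作業運行中改Redis即可生效;Redis地址、key名quot:rule:amount均為假設,生產上可做本地緩存或改用廣播狀態優化,jedis依賴可由flink-connector-redis傳遞引入):

package cn.itcast.cep;

import cn.itcast.bean.TransactionEvent;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.configuration.Configuration;
import redis.clients.jedis.Jedis;

/**
 * 參考草圖:規則閾值存放在Redis,作業運行期間修改Redis即可"動態更新規則"
 */
public class RedisThresholdFilter extends RichFilterFunction<TransactionEvent> {
    private transient Jedis jedis;

    @Override
    public void open(Configuration parameters) throws Exception {
        jedis = new Jedis("node01", 6379);//假設的Redis地址
    }

    @Override
    public boolean filter(TransactionEvent event) throws Exception {
        String threshold = jedis.get("quot:rule:amount");//每次讀取最新的金額閾值(示意用)
        return threshold != null && event.getAmount() > Double.parseDouble(threshold);
    }

    @Override
    public void close() throws Exception {
        if (jedis != null) jedis.close();//釋放連接
    }
}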

 

FlinkCEP在流式開發中的位置

FlinkCEP 可以無縫嵌入到Flink流式程序中

1612339738109

技術准備-案例

編碼步驟

==FlinkCEP = 實時數據流(無界流) + 用戶指定的規則 + 匹配到之后的處理/輸出方式==

1612340201861
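
對應上面的三段式,一個最小的編碼骨架草圖如下(messageDS表示假設的輸入流,條件/次數/時間僅為占位,后面的案例會給出完整可運行代碼):

//1.定義規則/模式
Pattern<Message, Message> pattern = Pattern.<Message>begin("start")
        .where(new SimpleCondition<Message>() {
            @Override
            public boolean filter(Message value) throws Exception {
                return true;//這里寫匹配條件
            }
        })
        .times(5)
        .within(Time.seconds(10));

//2.將規則/模式應用到流上(messageDS為假設的DataStream<Message>)
PatternStream<Message> patternStream = CEP.pattern(messageDS, pattern);

//3.處理/獲取/輸出符合規則/模式的數據
patternStream.select(new PatternSelectFunction<Message, Object>() {
    @Override
    public Object select(Map<String, List<Message>> map) throws Exception {
        return map.get("start");
    }
}).print();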

新建module

1612340284751

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>quot_47</artifactId>
        <groupId>cn.itcast</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>test_flinkcep</artifactId>
    <repositories>
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
        <repository>
            <id>apache</id>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
        </repository>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>
    <properties>
        <jedis.version>3.0.1</jedis.version>
        <mysql.version>5.1.44</mysql.version>
        <avatica.version>1.10.0</avatica.version>
        <druid.version>1.1.9</druid.version>
        <scala.version>2.11</scala.version>
        <log4j.version>2.11.0</log4j.version>
        <cdh.version>cdh5.14.0</cdh.version>
        <hadoop.version>2.6.0</hadoop.version>
        <hbase.version>1.2.0</hbase.version>
        <kafka.version>0.11.0.2</kafka.version>
        <fastjson.version>1.2.44</fastjson.version>
        <flink.version>1.7.0</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep-scala_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-cep_2.11</artifactId>
             <version>1.10.0</version>
         </dependency>-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.2</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.1.5</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

准備bean和util

從資料中拷貝如下代碼

今日指數-課程資料\4.資料\骨架代碼\quot_bak_new\test_flinkcep\src\main\java\cn\itcast

1612340440385

 

案例1-量詞

需求:

識別惡意用戶

用戶如果在10s內,輸入了TMD 5次,就認為用戶為惡意攻擊,識別出該用戶

如果不使用FlinkCEP,那么實現起來較為麻煩,得記錄用戶輸出TMD多少次,得用狀態

使用 Flink CEP量詞模式很簡單就可以搞定

package cn.itcast.cep;

import cn.itcast.bean.Message;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.Arrays;
import java.util.List;
import java.util.Map;

/**
 * Author itcast
 * Desc 需求:
 * 識別惡意用戶
 * 用戶如果在10s內,輸入了TMD 5次,就認為用戶為惡意攻擊,識別出該用戶
 * 如果不使用FlinkCEP,那么實現起來較為麻煩,得記錄用戶輸出TMD多少次,得用狀態
 * 使用 Flink CEP量詞模式很簡單就可以搞定
 */
public class Demo01_MaliceUser {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        //2.source
        SingleOutputStreamOperator<Message> sourceDS = env.fromCollection(Arrays.asList(
                new Message("1", "TMD", 1558430842000L),//2019-05-21 17:27:22
                new Message("1", "TMD", 1558430843000L),//2019-05-21 17:27:23
                new Message("1", "TMD", 1558430845000L),//2019-05-21 17:27:25
                new Message("1", "TMD", 1558430850000L),//2019-05-21 17:27:30
                new Message("1", "TMD", 1558430851000L),//2019-05-21 17:27:30
                new Message("2", "TMD", 1558430851000L),//2019-05-21 17:27:31
                new Message("1", "TMD", 1558430852000L)//2019-05-21 17:27:32
        )).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Message>(Time.seconds(0)) {
            @Override
            public long extractTimestamp(Message element) {
                return element.getEventTime();
            }
        });

        //3.transformation
        //識別惡意用戶:10s內,輸入了TMD 5次
        //TODO 定義規則/模式
        //起了一個名字叫start
        Pattern<Message, Message> pattern = Pattern.<Message>begin("start").where(new SimpleCondition<Message>() {
            @Override
            public boolean filter(Message message) throws Exception {
                if (message.getMsg().equals("TMD")) {//輸入TMD
                    return true;
                } else {
                    return false;
                }
            }
        }).times(5)//5次
         .within(Time.seconds(10));//10s內

        //TODO 將規則/模式應用到流上
        //注意:要按照用戶id進行分組,因為要的是針對某個用戶的10s內,輸入了TMD 5次
        PatternStream<Message> patternDS = CEP.pattern(sourceDS.keyBy(Message::getId), pattern);

        //TODO 處理/獲取/輸出符合規則/模式的數據
        SingleOutputStreamOperator<Object> resultDS = patternDS.select(new PatternSelectFunction<Message, Object>() {
            @Override
            public Object select(Map<String, List<Message>> map) throws Exception {
                List<Message> list = map.get("start");//獲取符合start模式的所有消息
                return list;
            }
        });
        //4.sink
        resultDS.print("被FlinkCEP規則/模式匹配到的惡意用戶的詳細消息信息");

        //5.execute
        env.execute();

    }
}

 

 

 

案例2-3-組合

需求:

識別2秒內連續登錄失敗用戶

有一個業務系統,用戶要使用該業務系統必須要先登陸

過濾出來在2秒內連續登陸失敗的用戶

 

next:嚴格連續,必須是緊接着

followedBy:非嚴格連續,可以不是緊接着

package cn.itcast.cep;

import cn.itcast.bean.LoginUser;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.Arrays;
import java.util.List;
import java.util.Map;

/**
 * Author itcast
 * Desc 需求:
 * 識別2秒內連續登錄失敗用戶
 * 有一個業務系統,用戶要使用該業務系統必須要先登陸
 * 過濾出來在2秒內連續登陸失敗的用戶
 */
public class Demo02_03_LoginFail {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        //2.source
        SingleOutputStreamOperator<LoginUser> sourceDS = env.fromCollection(Arrays.asList(
                new LoginUser(1, "192.168.0.1", "fail", 1558430842000L),    //2019-05-21 17:27:22
                new LoginUser(1, "192.168.0.2", "success", 1558430843000L),    //2019-05-21 17:27:23
                new LoginUser(1, "192.168.0.3", "fail", 1558430843000L),    //2019-05-21 17:27:24
                new LoginUser(2, "192.168.10.10", "success", 1558430845000L)//2019-05-21 17:27:25
        )).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<LoginUser>(Time.seconds(0)) {
            @Override
            public long extractTimestamp(LoginUser element) {
                return element.getEventTime();
            }
        });

        //3.transformation
        //識別出在2秒內連續登陸失敗的用戶
        //TODO 定義規則/模式
        Pattern<LoginUser, LoginUser> pattern = Pattern.<LoginUser>begin("xx").where(new SimpleCondition<LoginUser>() {
            @Override
            public boolean filter(LoginUser loginUser) throws Exception {
                return loginUser.getStatus().equals("fail");
            }
        })
        //注意:下面這樣寫沒有連續的意思
        /*.times(2)
        .within(Time.seconds(2))*/
        .next("next")//緊接着下一次還是失敗,也就是說 fail fail fail 是匹配的, 而fail success fail 是不匹配的
        //.followedBy("next")//后面還有一次失敗即可,也就是說 fail fail fail 是匹配的, 而fail success fail 是匹配的
        .where(new SimpleCondition<LoginUser>() {
            @Override
            public boolean filter(LoginUser loginUser) throws Exception {
                return loginUser.getStatus().equals("fail");
            }
        }).within(Time.seconds(2));


        //TODO 將規則/模式應用到流上
        //注意:要按照用戶id進行分組,因為要識別的是同一個用戶在2秒內連續登錄失敗
        PatternStream<LoginUser> patternDS = CEP.pattern(sourceDS.keyBy(LoginUser::getUserId), pattern);

        //TODO 處理/獲取/輸出符合規則/模式的數據
        SingleOutputStreamOperator<Object> resultDS = patternDS.select(new PatternSelectFunction<LoginUser, Object>() {
            @Override
            public Object select(Map<String, List<LoginUser>> map) throws Exception {
                List<LoginUser> list = map.get("next");
                return list;
            }
        });

        //4.sink
        resultDS.print("被FlinkCEP規則/模式匹配到的2s內連續登陸失敗倆次的用戶的信息:");

        //5.execute
        env.execute();

    }
}

 

案例4-連續和允許組合--了解

從數據源中依次提取"c","a","b"元素

.oneOrMore() 表示一個或多個,不允許組合,但可以不連續

.oneOrMore() + .consecutive() 表示這一個或多個但必須是連續的

.oneOrMore() + .allowCombinations()表示這一個或多個但允許組合

package cn.itcast.cep;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.List;
import java.util.Map;

/**
 * Author itcast
 * Desc 需求:
 * 從數據源中依次提取"c","a","b"元素
 */
public class Demo04_Consecutive {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        //2.source
        DataStreamSource<String> sourceDS = env.fromElements("c", "d", "a", "a", "a", "d", "a", "b");


        //3.transformation
        //從數據源中依次提取"c","a","b"元素
        Pattern<String, String> pattern = Pattern.<String>begin("begin").where(new SimpleCondition<String>() {
            @Override
            public boolean filter(String s) throws Exception {
                return s.equals("c");
            }
        })
        //.next("next")//嚴格的連續模式,該案例中匹配不上
        .followedBy("middle")
        .where(new SimpleCondition<String>() {
            @Override
            public boolean filter(String s) throws Exception {
                return s.equals("a");
            }
        })
        //"c", "d", "a", "a", "a", "d", "a", "b"
        //要匹配"c","a","b"
        .oneOrMore()//允許一個或多個
        //([c],[a, a, a, a],[b])
        //([c],[a, a, a],[b])
        //([c],[a, a],[b])
        //([c],[a],[b])
        //.consecutive()//表示上面的a要求連續
        //([c],[a, a, a],[b])
        //([c],[a, a],[b])
        //([c],[a],[b])
        //.allowCombinations()//允許組合.比較寬松的條件
        //([c],[a, a, a, a],[b])
        //([c],[a, a, a],[b])
        //([c],[a, a, a],[b])
        //([c],[a, a],[b])
        //([c],[a, a, a],[b])
        //([c],[a, a],[b])
        //([c],[a, a],[b])
        //([c],[a],[b])
        .followedBy("end")
        .where(new SimpleCondition<String>() {
            @Override
            public boolean filter(String s) throws Exception {
                return s.equals("b");
            }
        });


        //TODO 將規則/模式應用到流上
        //注意:該案例的數據源是簡單的字符串流,這里直接對整個流應用模式,不需要分組
        PatternStream<String> patternDS = CEP.pattern(sourceDS, pattern);

        //TODO 處理/獲取/輸出符合規則/模式的數據
        SingleOutputStreamOperator<Object> resultDS = patternDS.select(new PatternSelectFunction<String, Object>() {
            @Override
            public Object select(Map<String, List<String>> map) throws Exception {
                List<String> begin = map.get("begin");
                List<String> middle = map.get("middle");
                List<String> end = map.get("end");
                return Tuple3.of(begin, middle, end);
            }
        });

        //4.sink
        resultDS.print("被FlinkCEP規則/模式匹配到的數據:");

        //5.execute
        env.execute();

    }
}

 

案例5-高頻交易風險用戶識別

需求

高頻交易,找出活躍賬戶/交易活躍用戶

在這個場景中,我們模擬賬戶交易信息中,那些高頻的轉賬支付信息,希望能發現其中的風險或者活躍的用戶:

需要找出那些 24 小時內至少 5 次有效交易的賬戶

package cn.itcast.cep;

import cn.itcast.bean.TransactionEvent;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.List;
import java.util.Map;

/**
 * Author itcast
 * Desc 需求:
 * 高頻交易風險用戶識別
 * 高頻交易,找出活躍賬戶/交易活躍用戶
 * 在這個場景中,我們模擬賬戶交易信息中,那些高頻的轉賬支付信息,希望能發現其中的風險或者活躍的用戶:
 * 需要找出那些 24 小時內至少 5 次有效交易的賬戶
 */
public class Demo05_HighFrequencyTrading {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        //2.source
        DataStream<TransactionEvent> sourceDS = env.fromElements(
                new TransactionEvent("100XX", 10.0D, 1597905234000L),//2020-08-20 14:33:54
                new TransactionEvent("100XX", 100.0D, 1597905235000L),//2020-08-20 14:33:55
                new TransactionEvent("100XX", 200.0D, 1597905236000L),//2020-08-20 14:33:56
                new TransactionEvent("100XX", 300.0D, 1597905237000L),//2020-08-20 14:33:57
                new TransactionEvent("100XX", 400.0D, 1597905238000L),//2020-08-20 14:33:58
                new TransactionEvent("100XX", 500.0D, 1597905239000L),//2020-08-20 14:33:59
                new TransactionEvent("101XX", 0.0D, 1597905240000L),//2020-08-20 14:34:00
                new TransactionEvent("101XX", 100.0D, 1597905241000L)//2020-08-20 14:34:01
        ).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<TransactionEvent>(Time.seconds(0)) {
            @Override
            public long extractTimestamp(TransactionEvent element) {
                return element.getTimeStamp();
            }
        });


        //3.transformation
        //accounts with at least 5 valid transactions within 24 hours
        Pattern<TransactionEvent, TransactionEvent> pattern = Pattern.<TransactionEvent>begin("begin").where(new SimpleCondition<TransactionEvent>() {
            @Override
            public boolean filter(TransactionEvent transactionEvent) throws Exception {
                return transactionEvent.getAmount() > 0;//a valid transaction has amount > 0
            }
        })
        //.times(5)//exactly 5 times
        .timesOrMore(5)//at least 5 times
        .within(Time.hours(24));


        //TODO Apply the rule/pattern to the stream
        //Note: key the stream by account id, because the rule is "at least 5 valid transactions within 24 hours for a single account"
        PatternStream<TransactionEvent> patternDS = CEP.pattern(sourceDS.keyBy(TransactionEvent::getAccout), pattern);

        //TODO Process/extract/output the data that matches the rule/pattern
        SingleOutputStreamOperator<Object> resultDS = patternDS.select(new PatternSelectFunction<TransactionEvent, Object>() {
            @Override
            public Object select(Map<String, List<TransactionEvent>> map) throws Exception {
                List<TransactionEvent> list = map.get("begin");
                return list;
            }
        });

        //4.sink
        resultDS.print("被FlinkCEP規則/模式匹配到的數據:");

        //5.execute
        env.execute();

    }
}
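
The Demo05 code above imports cn.itcast.bean.TransactionEvent, whose source is not included in these notes. The sketch below is only a guess at a minimal version of that POJO, inferred from the getters the demo calls (getAccout, getAmount, getTimeStamp, with getAccout spelled exactly as in the demo); the bean in the actual course project may look different (for example, it may use Lombok).

package cn.itcast.bean;

/**
 * Assumed minimal sketch of the TransactionEvent POJO used by Demo05_HighFrequencyTrading.
 * Field names are inferred from the getters the demo calls; they are not taken from the course project.
 */
public class TransactionEvent {
    private String accout;   //account id (spelled as in the demo's getAccout call)
    private double amount;   //transaction amount; amount > 0 is treated as a valid transaction
    private long timeStamp;  //event time in epoch milliseconds

    public TransactionEvent() {
    }

    public TransactionEvent(String accout, double amount, long timeStamp) {
        this.accout = accout;
        this.amount = amount;
        this.timeStamp = timeStamp;
    }

    public String getAccout() {
        return accout;
    }

    public double getAmount() {
        return amount;
    }

    public long getTimeStamp() {
        return timeStamp;
    }
}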

 

 

Case 6

Slightly more complex; you can preview it ahead of time. It will be covered tomorrow.

Case 7

Slightly more complex; you can preview it ahead of time. It will be covered tomorrow.

 

Real-Time Monitoring Business Implementation

To be covered tomorrow.

 

 

 

Thought Question - Solution

https://leetcode-cn.com/problems/reverse-linked-list/

1612175364663

1612175332693

//Reverse a singly linked list.
//
// Example:
//
// Input: 1->2->3->4->5->NULL
// Output: 5->4->3->2->1->NULL
//
// Follow-up:
// You can reverse the linked list either iteratively or recursively. Can you solve the problem in both ways?
// Related Topics: Linked List


//leetcode submit region begin(Prohibit modification and deletion)
/**
 * Definition for singly-linked list.
 * public class ListNode {
 *     int val;
 *     ListNode next;
 *     ListNode(int x) { val = x; }
 * }
 */
class Solution {

    //Requirement: reverse a linked list
    //Input: 1->2->3->4->5->NULL
    //Output: 5->4->3->2->1->NULL
    //Note: the ListNode parameter is the linked list itself; we do not need to implement it, its definition is given in the comment above
    public ListNode reverseList(ListNode head) {
        //Idea: to reverse a long list we first reverse a shorter sub-list, so this can be done with a loop or with recursion
        //If head is null (empty list) or head.next is null (single-node list), the reversed list is just head itself, so return head directly
        if(head == null || head.next == null) return head;
        //Reaching this point means the list is non-empty and has length >= 2
        ListNode newHead = reverseList(head.next); //reverseList(head.next) reverses 2->3->4->5->NULL
        //After that call, 2->3->4->5->NULL has become 5 4 3 2 null
        //Next it has to become 5 4 3 2 1 null
        //Node 2 must point to node 1; node 2 is the original head.next, and it must point to head (node 1), so head.next.next = head
        //In other words, the next node of head (node 2) must point back to head (node 1), which is exactly the line below:
        head.next.next = head;
        //Then make node 1 point to null
        head.next = null;
        //Finally return the new head (node 5)
        return newHead;
    }
}

//leetcode submit region end(Prohibit modification and deletion)
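
The recursive solution above answers the main question. Since the follow-up asks whether the list can also be reversed iteratively, a minimal iterative sketch using the same ListNode definition is given below (this variant is an addition to these notes, not the course's reference code):

class SolutionIterative {
    //Iterative reversal: walk the list once, re-pointing each node's next to the previous node
    public ListNode reverseList(ListNode head) {
        ListNode prev = null;          //the already-reversed part, initially empty
        ListNode curr = head;          //the node currently being processed
        while (curr != null) {
            ListNode next = curr.next; //remember the rest of the list before overwriting curr.next
            curr.next = prev;          //re-point the current node backwards
            prev = curr;               //the reversed part now ends at curr
            curr = next;               //move on to the remaining list
        }
        //when curr becomes null, prev is the original tail, i.e. the new head
        return prev;
    }
}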

 

 

Thought Question

 

https://leetcode-cn.com/problems/3sum/

1612347445062

1612347414912

Return all triples of numbers in the array whose sum == 0, as a List<List<Integer>>.
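
This one is left as a thought question and its walkthrough is not part of these notes. For self-checking only, one common approach is to sort the array and, for each fixed first element, scan the rest with two pointers; a minimal sketch follows (the class name ThreeSumSketch is made up here, only the threeSum signature matches the LeetCode problem):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ThreeSumSketch {
    //Find all unique triples with a + b + c == 0: sort first, fix the first element, then use a two-pointer scan
    public List<List<Integer>> threeSum(int[] nums) {
        List<List<Integer>> result = new ArrayList<>();
        Arrays.sort(nums);
        for (int i = 0; i < nums.length - 2; i++) {
            if (i > 0 && nums[i] == nums[i - 1]) continue; //skip duplicate first elements
            int left = i + 1, right = nums.length - 1;
            while (left < right) {
                int sum = nums[i] + nums[left] + nums[right];
                if (sum == 0) {
                    result.add(Arrays.asList(nums[i], nums[left], nums[right]));
                    while (left < right && nums[left] == nums[left + 1]) left++;    //skip duplicate second elements
                    while (left < right && nums[right] == nums[right - 1]) right--; //skip duplicate third elements
                    left++;
                    right--;
                } else if (sum < 0) {
                    left++;  //sum too small: move the left pointer right
                } else {
                    right--; //sum too large: move the right pointer left
                }
            }
        }
        return result;
    }
}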

 

