大數據必須了解的Flink實時數據架構


  本文從上述現狀及實時數據需求出發,結合工業界案例、筆者的實時數據開發經驗, 梳理總結了實時數據體系建設的總體方案。

  作者:劉大龍@唯品會;來源:Flink 中文社區

  隨着互聯網的發展進入下半場,數據的時效性對企業的精細化運營越來越重要, 商場如戰場,在每天產生的海量數據中,如何能實時有效的挖掘出有價值的信息, 對企業的決策運營策略調整有很大幫助。此外,隨着 5G 技術的成熟、廣泛應用, 對於工業互聯網、物聯網等數據時效性要求非常高的行業,企業就更需要一套完整成熟的實時數據體系來提高自身的行業競爭力。

  本文從上述現狀及實時數據需求出發,結合工業界案例、筆者的實時數據開發經驗, 梳理總結了實時數據體系建設的總體方案,本文主要分為三個部分:

  第一部分主要介紹了當下在工業界比較火熱的實時計算引擎 Flink 在實時數據體系建設過程中主要的應用場景及對應解決方案;第二部分從實時數據體系架構、實時數據模型分層、實時數據體系建設方式、流批一體實時數據架構發展等四個方面思考了實時數據體系的建設方案;第三部分則以一個具體案例介紹如何使用 Flink SQL 完成實時數據統計類需求。

  一、Flink 實時應用場景

  目前看來,Flink 在實時計算領域內的主要應用場景主要可分為四類場景, 分別是實時數據同步、流式 ETL、實時數據分析和復雜事件處理,具體的業務場景和對應的解決方案可詳細研究下圖, 文字層面不再詳述。

  ss/202104/28/c751e39d76b71542872d04e0e6971eac.jpg" _fcksavedurl="s5.51cto/oss/202104/28/c751e39d76b71542872d04e0e6971eac.jpg" target="_blank">

  ss/202104/28/858b3962449f8965f92ee85087656206.jpg" _fcksavedurl="s4.51cto/oss/202104/28/858b3962449f8965f92ee85087656206.jpg" target="_blank">

  ss/202104/28/3acd52145f106e9f680cd411a172257b.jpg" _fcksavedurl="s3.51cto/oss/202104/28/3acd52145f106e9f680cd411a172257b.jpg" target="_blank">

  ss="dp-sql">

  ss="alt">ss="keyword">public class PageViewDeserializationSchema implements DeserializationSchema { ss="alt"> ss="keyword">public ss="keyword">static final Logger LOG=LoggerFactory.getLogger(PageViewDeserializationSchema.class); protected SimpleDateFormat dayFormatter; ss="alt"> private final RowTypeInfo rowTypeInfo; ss="alt"> ss="keyword">public PageViewDeserializationSchema(RowTypeInfo rowTypeInfo){ ss="alt"> dayFormatter=new SimpleDateFormat(ss="string">"yyyyMMdd", Locale.UK); this.rowTypeInfo=rowTypeInfo; ss="alt"> } @Override ss="alt"> ss="keyword">public Row deserialize(byte[] message) throws IOException { Row row=new Row(rowTypeInfo.getArity()); ss="alt"> MobilePage mobilePage=ss="op">null; try { ss="alt"> mobilePage=MobilePage.parseFrom(message); String mid=mobilePage.getMid(); ss="alt"> row.setField(0, mid); Long timeLocal=mobilePage.getTimeLocal(); ss="alt"> String logDate=dayFormatter.format(timeLocal); row.setField(1, logDate); ss="alt"> row.setField(2, timeLocal); }catch (Exception e){ ss="alt"> String mobilePageError=(mobilePage !=ss="op">null) ? mobilePage.toString() : ss="string">""; LOG.error(ss="string">"error parse bytes payload is {}, pageview error is {}", message.toString(), mobilePageError, e); ss="alt"> } ss="keyword">return ss="op">null; ss="alt"> }

  3.2 編寫 Flink Job 主程序

  將 PV 數據解析為 Flink 的 Row 類型后,接下來就很簡單了,編寫主函數,寫 SQL 就能統計 UV 指標了,代碼如下:

  ss="dp-sql">ss="alt">ss="keyword">public class RealtimeUV { ss="alt"> ss="keyword">public ss="keyword">static void main(String[] args) throws Exception { //step1 從properties配置文件中解析出需要的Kakfa、Hbase配置信息、ss="keyword">checkpoint參數信息 ss="alt"> Map<String, String> config = PropertiesUtil.loadConfFromFile(args[0]); String topic = config.get(ss="string">"source.kafka.topic"); ss="alt"> String groupId = config.get(ss="string">"source.id"); String sourceBootStrapServers = config.get(ss="string">"source.bootstrap.servers"); ss="alt"> String hbaseTable = config.get(ss="string">"hbase.table.name"); String hbaseZkQuorum = config.get(ss="string">"hbase.zk.quorum"); ss="alt"> String hbaseZkParent = config.get(ss="string">"hbase.zk.parent"); ss="keyword">int checkPointPeriod = ss="keyword">Integer.parseInt(config.get(ss="string">"checkpoint.period")); ss="alt"> ss="keyword">int checkPointTimeout = ss="keyword">Integer.parseInt(config.get(ss="string">"checkpoint.timeout")); ss="alt"> StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment(); //step2 設置ss="keyword">Checkpoint相關參數,用於Failover容錯 ss="alt"> sEnv.getConfig().registerTypeWithKryoSerializer(MobilePage.class, ProtobufSerializer.class); ss="alt"> sEnv.getCheckpointConfig().setFailOnCheckpointingErrors(ss="keyword">false); sEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1); ss="alt"> sEnv.enableCheckpointing(checkPointPeriod, CheckpointingMode.EXACTLY_ONCE); sEnv.getCheckpointConfig().setCheckpointTimeout(checkPointTimeout); ss="alt"> sEnv.getCheckpointConfig().enableExternalizedCheckpoints( CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); ss="alt"> //step3 使用Blink planner、創建TableEnvironment,並且設置狀態過期時間,避免Job OOM ss="alt"> EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() ss="alt"> .inStreamingMode() .build(); ss="alt"> StreamTableEnvironment tEnv = StreamTableEnvironment.ss="keyword">create(sEnv, environmentSettings); tEnv.getConfig().setIdleStateRetentionTime(ss="keyword">Time.days(1), ss="keyword">Time.days(2)); ss="alt"> Properties sourceProperties = new Properties(); ss="alt"> sourceProperties.setProperty(ss="string">"bootstrap.servers", sourceBootStrapServers); sourceProperties.setProperty(ss="string">"automit.interval.ms", ss="string">"3000"); ss="alt"> sourceProperties.setProperty(ss="string">"group.id", groupId); ss="alt"> //step4 初始化KafkaTableSource的ss="keyword">Schema信息,筆者這里使用register TableSource的方式將源表注冊到Flink中,而沒有用register DataStream方式,也是因為想熟悉一下如何注冊KafkaTableSource到Flink中 TableSchema ss="keyword">schema = TableSchemaUtil.getAppPageViewTableSchema(); ss="alt"> Optional proctimeAttribute = Optional.empty(); List rowtimeAttributeDescriptors = Collections.emptyList(); ss="alt"> Map<String, String> fieldMapping = new HashMap<>(); List columnNames = new ArrayList<>(); ss="alt"> RowTypeInfo rowTypeInfo = new RowTypeInfo(ss="keyword">schema.getFieldTypes(), ss="keyword">schema.getFieldNames()); columnNames.addAll(Arrays.asList(ss="keyword">schema.getFieldNames())); ss="alt"> columnNames.forEach(ss="keyword">name -> fieldMapping.put(ss="keyword">name, ss="keyword">name)); PageViewDeserializationSchema deserializationSchema = new PageViewDeserializationSchema( ss="alt"> rowTypeInfo); Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>(); ss="alt"> Kafka011TableSource kafkaTableSource = new Kafka011TableSource( ss="keyword">schema, ss="alt"> proctimeAttribute, rowtimeAttributeDescriptors, ss="alt"> Optional.ss="keyword">of(fieldMapping), topic, ss="alt"> sourceProperties, deserializationSchema, ss="alt"> StartupMode.EARLIEST, specificOffsets); ss="alt"> tEnv.registerTableSource(ss="string">"pageview", kafkaTableSource); ss="alt"> //step5 初始化Hbase TableSchema、寫入參數,並將其注冊到Flink中 HBaseTableSchema hBaseTableSchema = new HBaseTableSchema(); ss="alt"> hBaseTableSchema.setRowKey(ss="string">"log_date", String.class); hBaseTableSchema.addColumn(ss="string">"f", ss="string">"UV", Long.class); ss="alt"> HBaseOptions hBaseOptions = HBaseOptions.builder() .setTableName(hbaseTable) ss="alt"> .setZkQuorum(hbaseZkQuorum) .setZkNodeParent(hbaseZkParent) ss="alt"> .build(); HBaseWriteOptions hBaseWriteOptions = HBaseWriteOptions.builder() ss="alt"> .setBufferFlushMaxRows(1000) .setBufferFlushIntervalMillis(1000) ss="alt"> .build(); HBaseUpsertTableSink hBaseSink = new HBaseUpsertTableSink(hBaseTableSchema, hBaseOptions, hBaseWriteOptions); ss="alt"> tEnv.registerTableSink(ss="string">"uv_index", hBaseSink); ss="alt"> //step6 實時計算當天UV指標sql, 這里使用最簡單的ss="keyword">group ss="keyword">by agg,沒有使用minibatch或窗口,在大數據量優化時最好使用后兩種方式 String uvQuery = ss="string">"insert into uv_index " ss="alt"> + ss="string">"select log_date,

  " + ss="string">"ROW(count(distinct mid) as UV)

  " ss="alt"> + ss="string">"from pageview

  " + ss="string">"group by log_date"; ss="alt"> tEnv.sqlUpdate(uvQuery); //step7 執行Job ss="alt"> sEnv.ss="keyword">execute(ss="string">"UV Job"); } ss="alt">}

  以上就是一個簡單的使用 Flink SQL 統計 UV 的 case, 代碼非常簡單,只需要理清楚如何解析 Kafka 中數據,如何初始化 Table Schema,以及如何將表注冊到 Flink中,即可使用 Flink SQL 完成各種復雜的實時數據統計類的業務需求,學習成本比API 的方式低很多。說明一下,筆者這個 demo 是基於目前業務場景而開發的,在生產環境中可以真實運行起來,可能不能拆箱即用,你需要結合自己的業務場景自定義相應的 kafka 數據解析類。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM