1.概述
在流數據應用場景中,往往會通過Flink消費Kafka中的數據,然后將這些數據進行結構化到HDFS上,再通過Hive加載這些文件供后續業務分析。今天筆者為大家分析如何使用Flink消費Kafka的數據后,將消費后的數據結構化到Hive數據倉庫中。
2.內容
Hive能夠識別很多類型的文件,其中包含Parquet文件格式。因此,我們只需要將Flink消費Kafka后的數據以Parquet文件格式生成到HDFS上,后續Hive就可以將這些Parquet文件加載到數據倉庫中。具體流程圖如下所示:

2.1 Flink On YARN
實現整個案例,我們需要Hadoop環境、Kafka環境、Flink環境、Hive環境。這里,筆者只介紹Flink環境的部署,其他環境可自行搜索部署方案。關於Flink On YARN的安裝步驟如下:
2.1.1 准備安裝包
【官方下載地址】
2.2.2 解壓
解壓命令如下所示:
# 解壓Flink安裝包並重名名為flink tar -zxvf flink-1.7.1-bin-hadoop27-scala_2.12.tgz && mv flink-1.7.1 flink # 配置環境變量 vi ~/.bash_profile # 添加如下內容 export FLINK_HOME=/data/soft/new/flink export PATH=$PATH:$FLINK_HOME/bin # 保存並退出
Flink On YARN有兩種模式,分別是Flink Session和Flink Job On YARN。
2.2.3 Flink Session
Flink Session命令如下:
# 啟動一個Flink Session
yarn-session.sh -n 2 -jm 1024 -tm 1024 -d
各個參數含義如下:
| 參數 | 含義 |
| -n 2 | 指定2個容器 |
| -jm 1024 | JobManager內存為1024MB |
| -tm 1024 | TaskManager內存為1024MB |
| -d | 任務后台運行 |
如果你不想讓Flink YARN客戶端一直運行,也可以啟動分離的YARN Session,通過參數-d來實現。這種情況下Flink YARN客戶端只會將Flink提交給集群,然后自行關閉。需要注意的是,這種情況無法使用Flink停止YARN會話,需要使用YARN的命令來停止,命令如下:
yarn application -kill <appId>
2.2.4 Flink On YARN
命令如下:
# yarn-cluster模式提交Flink任務
flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 WordCount.jar
各個參數含義如下:
| 參數 | 含義 |
| -m yarn-cluster | 連接指定集群,如使用標識yarn-cluster |
| -yn 2 | 2個容器 |
| -yjm 1024 | JobManager內存為1024MB |
| -ytm | TaskManager內存為1024MB |
如果不知道提交隊列,任務會提交到默認隊列中,如果需要指定提交隊列,可以使用參數-yqu queue_01進行提交。
3.消費Kafka並生成Parquet文件
准備一個Topic的Schema類TopicSource,TopicSource類定義如下:
public class TopicSource { private long time; private String id; public long getTime() { return time; } public void setTime(long time) { this.time = time; } public String getId() { return id; } public void setId(String id) { this.id = id; } }
編寫一個生成Parquet的Flink類FlinkParquetUtils,具體代碼實現如下:
/** * Consumer kafka topic & convert data to parquet. * * @author smartloli. * * Created by Feb 24, 2019 */ public class FlinkParquetUtils { private final static StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); private final static Properties props = new Properties(); static { /** Set flink env info. */ env.enableCheckpointing(60 * 1000); env.setParallelism(1); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); /** Set kafka broker info. */ props.setProperty("bootstrap.servers", "dn1:9092,dn2:9092,dn3:9092"); props.setProperty("group.id", "flink_group_parquet"); props.setProperty("kafka.topic", "flink_parquet_topic_d"); /** Set hdfs info. */ props.setProperty("hdfs.path", "hdfs://cluster1/flink/parquet"); props.setProperty("hdfs.path.date.format", "yyyy-MM-dd"); props.setProperty("hdfs.path.date.zone", "Asia/Shanghai"); props.setProperty("window.time.second", "60"); } /** Consumer topic data && parse to hdfs. */ public static void getTopicToHdfsByParquet(StreamExecutionEnvironment env, Properties props) { try { String topic = props.getProperty("kafka.topic"); String path = props.getProperty("hdfs.path"); String pathFormat = props.getProperty("hdfs.path.date.format"); String zone = props.getProperty("hdfs.path.date.zone"); Long windowTime = Long.valueOf(props.getProperty("window.time.second")); FlinkKafkaConsumer010<String> flinkKafkaConsumer010 = new FlinkKafkaConsumer010<>(topic, new SimpleStringSchema(), props); KeyedStream<TopicSource, String> KeyedStream = env.addSource(flinkKafkaConsumer010).map(FlinkParquetUtils::transformData).assignTimestampsAndWatermarks(new CustomWatermarks<TopicSource>()).keyBy(TopicSource::getId); DataStream<TopicSource> output = KeyedStream.window(TumblingEventTimeWindows.of(Time.seconds(windowTime))).apply(new WindowFunction<TopicSource, TopicSource, String, TimeWindow>() { /** * */ private static final long serialVersionUID = 1L; @Override public void apply(String key, TimeWindow timeWindow, Iterable<TopicSource> iterable, Collector<TopicSource> collector) throws Exception { iterable.forEach(collector::collect); } }); // Send hdfs by parquet DateTimeBucketAssigner<TopicSource> bucketAssigner = new DateTimeBucketAssigner<>(pathFormat, ZoneId.of(zone)); StreamingFileSink<TopicSource> streamingFileSink = StreamingFileSink.forBulkFormat(new Path(path), ParquetAvroWriters.forReflectRecord(TopicSource.class)).withBucketAssigner(bucketAssigner).build(); output.addSink(streamingFileSink).name("Sink To HDFS"); env.execute("TopicData"); } catch (Exception ex) { ex.printStackTrace(); } } private static TopicSource transformData(String data) { if (data != null && !data.isEmpty()) { JSONObject value = JSON.parseObject(data); TopicSource topic = new TopicSource(); topic.setId(value.getString("id")); topic.setTime(value.getLong("time")); return topic; } else { return new TopicSource(); } } private static class CustomWatermarks<T> implements AssignerWithPunctuatedWatermarks<TopicSource> { /** * */ private static final long serialVersionUID = 1L; private Long cuurentTime = 0L; @Nullable @Override public Watermark checkAndGetNextWatermark(TopicSource topic, long l) { return new Watermark(cuurentTime); } @Override public long extractTimestamp(TopicSource topic, long l) { Long time = topic.getTime(); cuurentTime = Math.max(time, cuurentTime); return time; } } public static void main(String[] args) { getTopicToHdfsByParquet(env, props); } }
然后將編寫好的應用程序進行打包,這里我們可以利用Maven命令,可以很方便的進行打包,在pom.xml文件中添加如下插件:
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.3</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>org.smartloli.kafka.connector.flink.hdfs.FlinkParquetUtils</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
然后使用如下命令進行編譯打包:
mvn clean && mvn assembly:assembly
最后將打包的JAR上傳到Flink集群。
4.運行及預覽
將應用程序的JAR上傳到Flink集群后,執行如下命令進行提交:
flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 -yqu hadoop kafka-connector-flink-parquet.jar
查看ResourceManager的頁面,提交任務如下:

在代碼中,我們在HDFS上以日期yyyy-MM-dd的格式進行生產,結果如下:

5.總結
在編寫Flink應用程序的時候,建議使用Maven來管理項目,這樣添加依賴JAR的時候,只需將依賴的信息添加到pom.xml文件即可。打包的時候,同樣使用Maven命令,這樣應用程序所依賴的JAR包均會打包進行,防止遺漏導致提交任務時失敗。
6.結束語
這篇博客就和大家分享到這里,如果大家在研究學習的過程當中有什么問題,可以加群進行討論或發送郵件給我,我會盡我所能為您解答,與君共勉!
另外,博主出書了《Kafka並不難學》,喜歡的朋友或同學, 可以在公告欄那里點擊購買鏈接購買博主的書進行學習,在此感謝大家的支持。
