Flink+kafka實現Wordcount實時計算


Flink介紹:

Flink 是一個針對流數據和批數據的分布式處理引擎。它主要是由 Java 代碼實現。目前主要還是依靠開源社區的貢獻而發展。對 Flink 而言,其所要處理的主要場景就是流數據,批數據只是流數據的一個極限特例而已。再換句話說,Flink 會把所有任務當成流來處理,這也是其最大的特點。Flink 可以支持本地的快速迭代,以及一些環形的迭代任務。

Flink的特性:

Flink是個分布式流處理開源框架:
1>. 即使數據源是無序的或者晚到達的數據,也能保持結果准確性
2>. 有狀態並且容錯,可以無縫的從失敗中恢復,並可以保持exactly-once
3>. 大規模分布式
4>. 實時計算場景的廣泛應用(阿里雙十一實時交易額使用的Blink就是根據Flink改造而來)

Flink可以確保僅一次語義狀態計算;Flink有狀態意味着,程序可以保持已經處理過的數據;
Flink支持流處理和窗口事件時間語義,Flink支持靈活的基於時間窗口,計數,或會話數據驅動的窗戶;
Flink容錯是輕量級和在同一時間允許系統維持高吞吐率和提供僅一次的一致性保證,Flink從失敗中恢復,零數據丟失;
Flink能夠高吞吐量和低延遲;
Flink保存點提供版本控制機制,從而能夠更新應用程序或再加工歷史數據沒有丟失並在最小的停機時間。

2. Kafka

Kafka介紹

Kafka是由Apache軟件基金會開發的一個開源流處理平台,由Scala和Java編寫。Kafka是一種高吞吐量的分布式發布訂閱消息系統,它可以處理消費者規模的網站中的所有動作流數據。 這種動作(網頁瀏覽,搜索和其他用戶的行動)是在現代網絡上的許多社會功能的一個關鍵因素。 這些數據通常是由於吞吐量的要求而通過處理日志和日志聚合來解決。 對於像Hadoop的一樣的日志數據和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka的目的是通過Hadoop的並行加載機制來統一線上和離線的消息處理,也是為了通過集群來提供實時的消息。

Kafka特性

Kafka是一種高吞吐量的分布式發布訂閱消息系統,有如下特性:
1>. 通過磁盤數據結構提供消息的持久化,這種結構對於即使數以TB的消息存儲也能夠保持長時間的穩定性能。
2>. 高吞吐量即使是非常普通的硬件Kafka也可以支持每秒數百萬的消息。
3>. 支持通過Kafka服務器和消費機集群來分區消息。
4>. 支持Hadoop並行數據加載。

Kafka的安裝配置及基礎使用

因為此篇博客是本地Flink消費Kafka的數據實現WordCount,所以Kafka不需要做過多配置,從Apache官網下載安裝包直接解壓即可使用
這里我們創建一個名為test的topic
在producer輸入數據流:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

在consumer監控從producer輸入的數據流:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

1>. 創建maven project

<dependencies>
		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-clients_2.11</artifactId>
			<version>1.0.0</version>
		</dependency>

		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-java_2.11</artifactId>
			<version>1.0.0</version>
			<scope>provided</scope>
		</dependency>

		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-java</artifactId>
			<version>1.0.0</version>
		</dependency>

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-kafka-0.8_2.11</artifactId>
			<version>1.0.0</version>
		</dependency>
	</dependencies>
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

4>. 設置監控數據流時間間隔(官方叫狀態與檢查點)

env.enableCheckpointing(1000);

5>. 配置kafka和zookeeper的ip和端口

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "192.168.1.20:9092");
properties.setProperty("zookeeper.connect", "192.168.1.20:2181");
properties.setProperty("group.id", "test");
FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<String>("test", new SimpleStringSchema(),properties);

7>. 將Kafka的數據轉成flink的DataStream類型

DataStream<String> stream = env.addSource(myConsumer);

8>. 實施計算模型並輸出結果

DataStream<Tuple2<String, Integer>> counts = stream.flatMap(new LineSplitter()).keyBy(0).sum(1);

counts.print();

計算模型具體邏輯代碼

public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
		private static final long serialVersionUID = 1L;

		public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
			String[] tokens = value.toLowerCase().split("\\W+");
			for (String token : tokens) {
				if (token.length() > 0) {
					out.collect(new Tuple2<String, Integer>(token, 1));
				}
			}
		}
	}

4. 驗證

1>. Kafka producer輸入

2>. Flink客戶端立刻得出結果

完整代碼

package com.scn;

import java.util.Properties;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

public class FilnkCostKafka {
	public static void main(String[] args) throws Exception {
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.enableCheckpointing(1000);

		Properties properties = new Properties();
		properties.setProperty("bootstrap.servers", "192.168.1.20:9092");
		properties.setProperty("zookeeper.connect", "192.168.1.20:2181");
		properties.setProperty("group.id", "test");

		FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<String>("test", new SimpleStringSchema(),
				properties);

		DataStream<String> stream = env.addSource(myConsumer);

		DataStream<Tuple2<String, Integer>> counts = stream.flatMap(new LineSplitter()).keyBy(0).sum(1);

		counts.print();

		env.execute("WordCount from Kafka data");
	}

	public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
		private static final long serialVersionUID = 1L;

		public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
			String[] tokens = value.toLowerCase().split("\\W+");
			for (String token : tokens) {
				if (token.length() > 0) {
					out.collect(new Tuple2<String, Integer>(token, 1));
				}
			}
		}
	}

}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM