1. Preface
In the field of real-time big data computing, Tmall's Double 11 real-time transaction volume is the most authoritative benchmark, and its real technical architecture is correspondingly complex; it goes far beyond the simple implementation in this post, because Tmall's Double 11 data spans many dimensions and many systems, at a much finer real-time granularity. The overall architecture, however, is similar: the main component is the real-time computing engine Flink (Alibaba actually uses Blink, a deeply customized and optimized version of Flink). The figure below shows the rough architecture and data flow of Tmall's Double 11 real-time transaction volume (based on https://baijiahao.baidu.com/s?id=1588506573420812062&wfr=spider&for=pc).
2. Technical Architecture for Simulating Tmall Double 11 Real-Time Transaction Volume
A Linux shell script automatically generates one transaction record per second. Each record contains the user id, the payment amount, the user's city, and the purchased item.
On the technical side, Filebeat monitors each transaction record as it is produced and ships it to Kafka (see earlier posts for installing and using Filebeat and Kafka). A Flink client program then consumes the Kafka data in real time and performs two computations: one computes the real-time total transaction volume, and the other computes the real-time transaction volume per city.
Architecture diagram
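In text form, the data flow in this post is simply:

double11.sh (one record per second) -> double11.log -> Filebeat -> Kafka topic "test" -> Flink (Double11Sum / Double11SumByCity) -> console output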
3. Implementation
3.1. Simulating transaction data: the double11.sh script
#!/bin/bash
for i in $(seq 1 60)
do
    customernum=$(openssl rand -base64 8 | cksum | cut -c1-8)
    pricenum=$(openssl rand -base64 8 | cksum | cut -c1-4)
    citynum=$(openssl rand -base64 8 | cksum | cut -c1-2)
    itemnum=$(openssl rand -base64 8 | cksum | cut -c1-6)
    echo "customer${customernum},${pricenum},city${citynum},item${itemnum}" >> /home/hadoop/tools/double11/double11.log
    sleep 1
done
Add double11.sh to the Linux crontab:
# run once per minute
* * * * * sh /home/hadoop/tools/double11/double11.sh
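With the cron job in place, new lines are appended to double11.log once per second. You can check the generated data with tail; the lines below are purely illustrative (the field values are random), following the echo format in the script above:

tail -f /home/hadoop/tools/double11/double11.log
# customer39958460,3995,city39,item399584
# customer27181594,2718,city27,item271815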
3.2. Monitoring double11.log in real time
Filebeat monitors double11.log and streams every transaction record it produces to a Kafka topic. This only requires a simple configuration of Filebeat's beat-kafka.yml; Kafka just needs to be started (see earlier posts for Filebeat and Kafka setup).
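As a rough sketch only (the exact keys depend on your Filebeat version, so treat this as an assumption rather than the configuration from the earlier post), a minimal beat-kafka.yml would point Filebeat at double11.log and at the Kafka broker and topic used by the Flink code below:

filebeat.prospectors:
- input_type: log
  paths:
    - /home/hadoop/tools/double11/double11.log

output.kafka:
  hosts: ["192.168.184.12:9092"]
  topic: "test"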
3.3. The core: writing the Flink client programs
The real-time total and the per-city real-time totals are computed in two separate classes (only the Flink Java API version is provided).
Required Maven dependencies:
<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
        <scope>test</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>1.0.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.0.0</version>
        <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.8_2.11</artifactId>
        <version>1.0.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.47</version>
    </dependency>
</dependencies>
Code for computing the real-time total transaction volume:
package com.fastweb;

import java.util.Properties;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

public class Double11Sum {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // env.enableCheckpointing(1000);

        // Kafka 0.8 consumer configuration
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.184.12:9092");
        properties.setProperty("zookeeper.connect", "192.168.184.12:2181");
        properties.setProperty("group.id", "test");

        // Consume the "test" topic that Filebeat writes to
        FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<String>("test", new SimpleStringSchema(),
                properties);
        DataStream<String> stream = env.addSource(myConsumer);

        // Map every record to ("price", amount), key by that constant field and keep a running sum
        DataStream<Tuple2<String, Integer>> counts = stream.flatMap(new LineSplitter()).keyBy(0).sum(1);
        counts.print();

        env.execute("Double 11 Real Time Transaction Volume");
    }

    // Computes the real-time total transaction volume
    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1L;

        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // Filebeat wraps each log line in a JSON document; the raw line is in the "message" field
            JSONObject object = JSONObject.parseObject(value);
            String message = object.getString("message");
            Integer price = Integer.parseInt(message.split(",")[1]);
            out.collect(new Tuple2<String, Integer>("price", price));
        }
    }
}
Code for computing the real-time transaction volume per city:
package com.fastweb;

import java.util.Properties;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

public class Double11SumByCity {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // env.enableCheckpointing(1000);

        // Kafka 0.8 consumer configuration
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.184.12:9092");
        properties.setProperty("zookeeper.connect", "192.168.184.12:2181");
        properties.setProperty("group.id", "test");

        // Consume the "test" topic that Filebeat writes to
        FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<String>("test", new SimpleStringSchema(),
                properties);
        DataStream<String> stream = env.addSource(myConsumer);

        // Map every record to (city, amount), key by city and keep a running sum per city
        DataStream<Tuple2<String, Integer>> cityCounts = stream.flatMap(new CitySplitter()).keyBy(0).sum(1);
        cityCounts.print();

        env.execute("Double 11 Real Time Transaction Volume");
    }

    // Aggregates the real-time transaction volume per city
    public static final class CitySplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1L;

        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // Filebeat wraps each log line in a JSON document; the raw line is in the "message" field
            JSONObject object = JSONObject.parseObject(value);
            String message = object.getString("message");
            Integer price = Integer.parseInt(message.split(",")[1]);
            String city = message.split(",")[2];
            out.collect(new Tuple2<String, Integer>(city, price));
        }
    }
}
Code notes: notice that the two classes differ only in the inner FlatMapFunction that processes the data; the structure of the two inner classes is otherwise the same. Inside each inner class, fastjson is used to unwrap one layer of JSON to get at the fields we need, because the records shipped by Filebeat are in JSON format; Filebeat does this so that, in a real production system, every record also carries metadata such as the source IP and a timestamp.
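For illustration only (the exact set of fields depends on the Filebeat version, and the hostname, timestamp, and values below are made up), a record arriving from Kafka looks roughly like the following, which is why the code first extracts the "message" field and then splits it on commas:

{
  "@timestamp": "2018-11-11T00:00:01.000Z",
  "beat": {"hostname": "hadoop01", "name": "hadoop01"},
  "source": "/home/hadoop/tools/double11/double11.log",
  "message": "customer39958460,3995,city39,item399584"
}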
3.4. Verification
Start the main method of Double11Sum to see the real-time total transaction volume; the per-city class works the same way. The result is updated in real time: every incoming record produces a new, updated total.
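For reference, counts.print() emits a stream of running totals of roughly the following shape (the task-slot prefix and the numbers are illustrative only; Double11SumByCity prints one running total per city key instead):

1> (price,3995)
1> (price,7226)
1> (price,11843)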