在這篇文章里,我們模擬了一個場景,實時分析訂單數據,統計實時收益。
場景模擬
我試圖覆蓋工程上最為常用的一個場景:
1)首先,向Kafka里實時的寫入訂單數據,JSON格式,包含訂單ID-訂單類型-訂單收益
2)然后,spark-streaming每十秒實時去消費kafka中的訂單數據,並以訂單類型分組統計收益
3)最后,spark-streaming統計結果實時的存入本地MySQL。
前提條件
安裝
1)spark:我使用的yarn-client模式下的spark,環境中集群客戶端已經搞定
2)zookeeper:我使用的是這個集群:10.93.21.21:2181,10.93.18.34:2181,10.93.18.35:2181
3)kafka:我使用的是standalone模式:10.93.21.21:9093
4)mysql:10.93.84.53:3306
語言
python:pykafka,pip install pykafka
java:spark,spark-streaming
下面開始
1、數據寫入kafka
- kafka寫入
我們使用pykafka模擬數據實時寫入,代碼如下:
kafka_producer.py
# -* coding:utf8 *- import time import json import uuid import random import threading from pykafka import KafkaClient # 創建kafka實例 hosts = '10.93.21.21:9093' client = KafkaClient(hosts=hosts) # 打印一下有哪些topic print client.topics # 創建kafka producer句柄 topic = client.topics['kafka_spark'] producer = topic.get_producer() # work def work(): while 1: msg = json.dumps({ "id": str(uuid.uuid4()).replace('-', ''), "type": random.randint(1, 5), "profit": random.randint(13, 100)}) producer.produce(msg) # 多線程執行 thread_list = [threading.Thread(target=work) for i in range(10)] for thread in thread_list: thread.setDaemon(True) thread.start() time.sleep(60) # 關閉句柄, 退出 producer.stop()
可以看到,我們寫入的形式是一個json,訂單id是一個uuid,訂單類型type從1-5隨機,訂單收益profit從13-100隨機,形如
{"id": ${uid}, "type": 1, "profit": 30}
注意:1)python對kafka的讀寫不需要借助zookeeper,2)使用多線程的形式寫入,讓數據量具有一定的規模。
執行producer,會持續寫入數據1分鍾。
python kafka_producer.py
- 驗證一下
kafka_consumer.py
# -* coding:utf8 *- from pykafka import KafkaClient hosts = '10.93.21.21:9093' client = KafkaClient(hosts=hosts) # 消費者 topic = client.topics['kafka_spark'] consumer = topic.get_simple_consumer(consumer_group='test', auto_commit_enable=True, auto_commit_interval_ms=1, consumer_id='test') for message in consumer: if message is not None: print message.offset, message.value
執行,可以消費kafka剛才寫入的數據
python kafka_consumer.py
2、spark-streaming
1)先解決依賴
其中比較核心的是spark-streaming和kafka集成包spark-streaming-kafka_2.10,還有spark引擎spark-core_2.10
json和mysql看大家愛好。
pom.xml
<dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka_2.10</artifactId> <version>1.6.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.10</artifactId> <version>1.6.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.10</artifactId> <version>1.6.0</version> <scope>provided</scope> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.19</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.38</version> </dependency> <dependency> <groupId>commons-dbcp</groupId> <artifactId>commons-dbcp</artifactId> <version>1.4</version> </dependency> </dependencies>
2)MySQL准備
- 建表
我們的結果去向是MySQL,先建立一個結果表。
id:主鍵,自增id
type:訂單類型
profit:每個spark batch聚合出的訂單收益結果
time:時間戳
CREATE TABLE `order` ( `id` int(11) NOT NULL AUTO_INCREMENT, `type` int(11) DEFAULT NULL, `profit` int(11) DEFAULT NULL, `time` mediumtext, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=56 DEFAULT CHARSET=utf8
- Java客戶端
采用了單例線程池的模式簡單寫了一下。
ConnectionPool.java
package com.xiaoju.dqa.realtime_streaming; import java.sql.Connection; import java.sql.DriverManager; import java.util.LinkedList; public class ConnectionPool { private static LinkedList<Connection> connectionQueue; static { try { Class.forName("com.mysql.jdbc.Driver"); } catch (ClassNotFoundException e) { e.printStackTrace(); } } public synchronized static Connection getConnection() { try { if (connectionQueue == null) { connectionQueue = new LinkedList<Connection>(); for (int i = 0; i < 5; i++) { Connection conn = DriverManager.getConnection( "jdbc:mysql://10.93.84.53:3306/big_data", "root", "1234"); connectionQueue.push(conn); } } } catch (Exception e) { e.printStackTrace(); } return connectionQueue.poll(); } public static void returnConnection(Connection conn){connectionQueue.push(conn);} }
3)代碼實現
我用java寫的,不會用scala很尷尬。
即時用java整個的處理過程依然比較簡單。跟常見的wordcount也沒有多大的差別。
SparkStreaming特點
spark的特點就是RDD,通過對RDD的操作,來屏蔽分布式運算的復雜度。
而spark-streaming的操作對象是RDD的時間序列DStream,這個序列的生成是跟batch的選取有關。例如我這里Batch是10s一個,那么每隔10s會產出一個RDD,對RDD的切割和序列的生成,spark-streaming對我們透明了。唯一暴露給我們的DStream和原生RDD的使用方式基本一致。
這里需要講解一下MySQL寫入注意的事項。
MySQL寫入
在處理mysql寫入時使用了foreachPartition方法,即,在foreachPartition中使用borrow mysql句柄。
這樣做的原因是:
1)你無法再Driver端創建mysql句柄,並通過序列化的形式發送到worker端
2)如果你在處理rdd中創建mysql句柄,很容易對每一條數據創建一個句柄,在處理過程中很快內存就會溢出。
OrderProfitAgg.java
package com.xiaoju.dqa.realtime_streaming; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.apache.spark.streaming.kafka.KafkaUtils; import scala.Tuple2; import java.sql.Connection; import java.sql.Statement; import java.util.*; /* * 生產者可以選用kafka自帶的producer腳本 * bin/kafka-console-producer.sh --broker-list localhost:9093 --topic test * */ public class OrderProfitAgg { public static void main(String[] args) throws InterruptedException { /* * kafka所注冊的zk集群 * */ String zkQuorum = "10.93.21.21:2181,10.93.18.34:2181,10.93.18.35:2181"; /* * spark-streaming消費kafka的topic名稱, 多個以逗號分隔 * */ String topics = "kafka_spark,kafka_spark2"; /* * 消費組 group * */ String group = "bigdata_qa"; /* * 消費進程數 * */ int numThreads = 2; /* * 選用yarn隊列模式, spark-streaming程序的app名稱是"order profit" * */ SparkConf sparkConf = new SparkConf().setMaster("yarn-client").setAppName("order profit"); /* * 創建sc, 全局唯一, 設置logLevel可以打印一些東西到控制台 * */ JavaSparkContext sc = new JavaSparkContext(sparkConf); sc.setLogLevel("WARN"); /* * 創建jssc, spark-streaming的batch是每10s划分一個 * */ JavaStreamingContext jssc = new JavaStreamingContext(sc, Durations.seconds(10)); /* * 准備topicMap * */ Map<String ,Integer> topicMap = new HashMap<String, Integer>(); for (String topic : topics.split(",")) { topicMap.put(topic, numThreads); } /* * kafka數據流 * */ List<JavaPairReceiverInputDStream<String, String>> streams = new ArrayList<JavaPairReceiverInputDStream<String, String>>(); for (int i = 0; i < numThreads; i++) { streams.add(KafkaUtils.createStream(jssc, zkQuorum, group, topicMap)); } /* * 從kafka消費數據的RDD * */ JavaPairDStream<String, String> streamsRDD = streams.get(0); for (int i = 1; i < streams.size(); i++) { streamsRDD = streamsRDD.union(streams.get(i)); } /* * kafka消息形如: {"id": ${uuid}, "type": 1, "profit": 35} * 統計結果, 以type分組的總收益 * mapToPair, 將kafka消費的數據, 轉化為type-profit key-value對 * reduceByKey, 以type分組, 聚合profit * */ JavaPairDStream<Integer, Integer> profits = streamsRDD.mapToPair(new PairFunction<Tuple2<String, String>, Integer, Integer>() { @Override public Tuple2<Integer, Integer> call(Tuple2<String, String> s_tuple2) throws Exception { JSONObject jsonObject = JSON.parseObject(s_tuple2._2); return new Tuple2<Integer, Integer>(jsonObject.getInteger("type"), jsonObject.getInteger("profit")); } }).reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer i1, Integer i2) throws Exception { return i1 + i2; } }); /* * 輸出結果到MySQL * 需要為每一個partition創建一個MySQL句柄, 使用foreachPartition * */ profits.foreachRDD(new Function<JavaPairRDD<Integer, Integer>, Void>() { @Override public Void call(JavaPairRDD<Integer, Integer> integerIntegerJavaPairRDD) throws Exception { integerIntegerJavaPairRDD.foreachPartition(new VoidFunction<Iterator<Tuple2<Integer, Integer>>>() { @Override public void call(Iterator<Tuple2<Integer, Integer>> tuple2Iterator) throws Exception { Connection connection = ConnectionPool.getConnection(); Statement stmt = connection.createStatement(); long timestamp = System.currentTimeMillis(); while(tuple2Iterator.hasNext()) { Tuple2<Integer, Integer> tuple = tuple2Iterator.next(); Integer type = tuple._1; Integer profit = tuple._2; String sql = String.format("insert into `order` (`type`, `profit`, `time`) values (%s, %s, %s)", type, profit, timestamp); stmt.executeUpdate(sql); } ConnectionPool.returnConnection(connection); } }); return null; } }); /* * 開始計算, 等待計算結束 * */ jssc.start(); try { jssc.awaitTermination(); } catch (Exception ex) { ex.printStackTrace(); } finally { jssc.close(); } } }
4)打包方法
編寫pom.xml build tag。
mvn clean package打包即可。
pom.xml
<build> <plugins> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <archive> <manifest> <!--這里要替換成jar包main方法所在類 --> <!--<mainClass>com.bigdata.qa.hotdog.driver.WordCount</mainClass>--> <mainClass>com.xiaoju.dqa.realtime_streaming.OrderProfitAgg</mainClass> </manifest> </archive> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <!-- this is used for inheritance merges --> <phase>package</phase> <!-- 指定在打包節點執行jar包合並操作 --> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.6</source> <target>1.6</target> </configuration> </plugin> </plugins> </build>
3、執行與結果
1)執行kafka_producer.py
python kafka_producer.py
2) 執行spark-streaming
這里使用的是默認參數提交yarn隊列。
spark-submit --queue=root.XXXX realtime-streaming-1.0-SNAPSHOT-jar-with-dependencies.jar
3)查看結果
到MySQL中查看結果,每隔10秒會聚合出type=1-5的5條數據。
例如第一條數據,就是type=4這種類型的業務,在10s內收益是555473元。業務量驚人啊。哈哈。
完結。