這兩天主要是做了中國移動的實時數據分析一個小項目(可以說是demo了),這里記錄下來整個過程里面遇到的坑,首先安裝好flume,kafka,spark(基於代碼本地運行可以不安裝),redis,zookeeper 主要是為了熟悉一下整個的一個spark-streaming的一個整個流程,還有就是了解調優的地方。
上述假設已經安裝好了相應的組件,然后就開始正式的踩坑之路:
1.編寫一個java程序去讀取原始數據文件,模擬1s進行文件的插入一行,原始的數據文件格式如下:
坑a
.整個的數據格式是json,但是是一整行的。。。。
解決a1:於是就想這去把這樣的數據轉化為json格式的,就去搗鼓了一下notepad++轉json格式的方法:notepad++上面的菜單欄中,插件-> plugins Admin..->search中直接查找就好了,然后找找有個install的按鈕點擊一下就ok了,然后各種確定,之后notepad++會自動重啟,重啟之后上面的菜單欄中,插件->就會多出一個JSON Viewer,然后就可以了。但是我操作的時候遇到了notepad++重啟之后沒有出現JSON Viewer(但是后來又出現了),
解決a2:於是又去找了idea實現json格式的方法:setting->keymap->main enum->code->reformat code 這個功能是將文本格式化,該功能的快捷鍵默認是ctrl+shift+l,但是這個快捷鍵組合是有沖突的,所以將其轉化為ctrl+shift+s,修改后進行保存,然后創建一個xxx.json的文件,復制一行json數據到該文件中,然后全選,按下ctrl+shift+s即可轉化為標准的json文件格式
相應的java實現代碼如下:
import java.io.*;
import java.util.ArrayList;
import java.util.List;
public class WriteCMCC {
public static void main(String[] args) {
List<String> allLines = getCmcc(args[0]);
System.out.println(allLines.size());
writeCmcc(allLines, args[1]);
}
/**
* 一次性讀取cmcc中的數據
* @return 存放在list中
*/
private static List<String> getCmcc(String path) {
BufferedReader br = null;
List<String> allLines = new ArrayList<String>();
try {
br = new BufferedReader(new FileReader(new File(path)));
String line = "";
while ((line = br.readLine()) != null) {
allLines.add(line);
}
} catch (Exception e) {
e.printStackTrace();
}finally {
try {
if (br != null) br.close();
} catch (IOException e) {
e.printStackTrace();
}
}
return allLines;
}
/**
* 寫入cmcc中的數據,一次寫入一個list的數據集
*/
private static void writeCmcc(List<String> cmcc, String path) {
BufferedWriter bw = null;
try {
bw = new BufferedWriter(new FileWriter(new File(path)));
for(String line : cmcc) {
bw.write(line);
bw.flush();
Thread.sleep(1000);
bw.newLine();
}
} catch (Exception e) {
e.printStackTrace();
}finally {
try {
if (bw != null) bw.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
代碼寫好,然后測試完,然后打成jar包,丟到Linux准備運行。
java -jar /home/soft/jar/write_cmcc_5_seconds.jar /home/soft/cmcc.log /home/soft/cmcc/cmcc_write.log
2.flume編寫相應的conf去把數據抽取到kafka中(cmcc.conf)
先啟動zookeeper,啟動kafka並創建topic(cmcc):
zookeeper啟動命令:
/home/soft/zookeeper-3.4.6/bin/zkServer.sh start(每個節點都需要啟動)
kafka啟動命令:
/home/soft/kafka_2.11-0.10.1.0/bin/kafka-server-start.sh /home/soft/kafka_2.11-0.10.1.0/config/server.properties &
kafka創建topic:
bin/kafka-topics.sh --zookeeper os1:2181,os2:2181,os3:2181 --create --topic cmcc --partitions 6 --replication-factor
kafka查看所有的topic:
bin/kafka-topics.sh --zookeeper os1:2181,os2:2181,os3:2181 --list
然后編寫conf測試(cmcc.conf):
a1.sources = s1
a1.channels = c1
#這里先不使用該種方式去讀取文件,因為該方式flume會出如下的錯誤
#java.lang.IllegalStateException: File has been modified since being read: /home/soft/cmcc/cmcc_write.log
#原因:出現這個問題的原因是,當我們拷貝一個文件的時候,一些對文件進行了修改
#解決:最好的方法就是,確保大文件完全拷貝后,再讓flume來讀取,思路是將拷貝中的文件加上一個多余的后綴,flume一開始不會讀取文件,當文件拷貝完成后去掉多余的后綴,這個時候flume就會針對新文件進行讀取。
#a1.sources.s1.type =spooldir
#a1.sources.s1.spoolDir =/home/soft/cmcc
#a1.sources.s1.fileHeader= true
a1.sources.s1.type=exec
a1.sources.r1.command = tail -F /root/app_weichat_login.log
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = os1:9092,os2:9092,os3:9092
#創建好相應的topic
a1.channels.c1.kafka.topic = cmcc
#這個是自己定義的沒啥事情
a1.channels.c1.kafka.consumer.group.id = flume-consumer
#這個一定要設置,否則就是個坑,寫入到kafka中的數據會被追加進一些數據,而且還是亂碼的
a1.channels.c1.parseAsFlumeEvent = false
#拼接source和channel
a1.sources.s1.channels=c1
flume啟動命令:下面的a1就對應着上面的a1(控制台打印信息)
bin/flume-ng agent -n a1 -c conf -f conf/cmcc.conf -Dflume.root.logger=INFO,console
3.spark程序去讀取kafka的中的數據並將結果存放至redis中
啟動redis:/usr/local/redis/bin/redis-server /usr/local/redis/etc/redis.conf
程序相應的配置:resources -> application.conf
#kafka的相關參數 kafka.topic = "cmcc" kafka.broker.list="os1:9092,os2:9092,os3:9092" kafka.group.id="cmcc" redis.host="xxx.xxx.xxx.xxx" redis.db.index="0"
主程序代碼:scala -> BootStarpApp
package app import java.text.SimpleDateFormat import com.alibaba.fastjson.JSON import org.apache.spark.SparkConf import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies} import org.apache.spark.streaming.{Seconds, StreamingContext} import utils.{AppParams, Jpools} object BootStarpApp { def main(args: Array[String]): Unit = { /** * 錯誤集: * 1.Caused by: org.apache.kafka.common.KafkaException: org.codehaus.jackson.map.deser.std.StringDeserializer is not an instance of org.apache.kafka.common.serialization.Deserializer * 錯誤解釋,kafka在進行序列化實例對象的時候出錯 * 查找原因: * org.codehaus.jackson.map.deser.std.StringDeserializer是我們AppParas中導入的類型,可能是導錯了, * 查看后發現應該導入:import org.apache.kafka.common.serialization.StringDeserializer * 2. 程序出現INFO:Marking the coordinator os3:9092 (id: 2147483645 rack: null) dead for group cmcc_test2,且程序不再執行下去 * 原因:因為kafka-clent程序默認讀取到kafka上的信息之后將host:os3返回作為主機節點去獲取數據,但是在本機中沒有配置相應的host與ip的映射,所有這里就無法直接進行訪問os3 * 解決辦法;在windows中配置相應的ip與hostname的映射(kafka中的broker節點) * 3.json解析出錯:error parse false * 原因json格式錯誤 * * 4.flume的坑:a0.channels.c1.parseAsFlumeEvent = false 1.7以后默認為true * 如果設置此項為 true,Kafka Sink 則會把數據按照標准的 Flume Event 格式(即Headers域和body域結合的數據結構)發送。Flume Event 中的 Headers 域通常是一些附加字段,可以是時間戳(比如時間戳攔截器指定的時間戳)、文件名(比如 spooldir Source 開啟的 fileHeader = true)等信息。但是 1.7.0 版本的 Flume 一旦開啟此配置,會導致 Headers 域里面的信息亂碼 * * 5.flume異常崩潰 File has been modified since being read * 原因:出現這個問題的原因是,當我們拷貝一個文件的時候,一些對文件進行了修改,就會出現這個錯誤 * 解決:最好的方法就是,確保大文件完全拷貝后,再讓flume來讀取,思路是將拷貝中的文件加上一個多余的后綴,flume一開始不會讀取文件,當文件拷貝完成后去掉多余的后綴,這個時候flume就會針對新文件進行讀取。 * 另外針對大文件,flume的解決方案可以設置一個文件完成后綴: */ val sparkConf = new SparkConf() sparkConf.setAppName("中國移動運營實時監控平台") sparkConf.setMaster("local[*]") /** *將rdd以kryo的序列化保存,以減少內存的使用 */ sparkConf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer") /** * 對rdd進行壓縮,使用內存空間換去處理時間的方式,減少內存的使用 */ sparkConf.set("spark.rdd.compress", "true") /** * */ sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "100") /** * 進行優雅的停止程序 */ sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true") /** * 每兩秒執行一個批次 */ val ssc = new StreamingContext(sparkConf, Seconds(2)) /** * 獲取kafka的數據 * LocationStrategies:位置策略,如果kafka的broker節點與Excutor在同一台機器上給一種策略,不再一台機器上給另一種策略 * 設定策略之后會以最有的策略進行獲取數據 * 一般在企業中kafka節點與Excutor不會放到一台機器的,原因是kafka是消息存儲的,Executor是用來做消息計算的 * 因此計算與存儲需要分開,存儲對磁盤要求高,計算對內存和cpu的要求更高 * 如果Executor節點跟Broker的節點在一起的話就使用PreferBrokers策略,不再一起的話就使用preferConsisent策略 * 使用preferConsisent策略的話,將來在kafka中拉去數據以后盡量將數據分散到所有的Executor上 */ val stream = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent , ConsumerStrategies.Subscribe[String, String](AppParams.topic, AppParams.kafkaParams)) stream.foreachRDD(rdd => { /** * { * "bussinessRst": "0000", * "channelCode": "0705", * "chargefee": "10000", * "clientIp": "125.82.117.133", * "endReqTime": "20170412080609613", * "idType": "01", * "interFacRst": "0000", * "logOutTime": "20170412080609613", * "orderId": "384681890175026754", * "prodCnt": "1", * "provinceCode": "280", * "requestId": "20170412080450886738519397327610", * "retMsg": "成功", * "serverIp": "172.16.59.241", * "serverPort": "8088", * "serviceName": "sendRechargeReq", * "shouldfee": "9950", * "startReqTime": "20170412080609503", * "sysId": "15" * } */ /** * 業務邏輯: * serviceName:reChargeNotifyReq,則為充值通知的記錄 * requestId:包含充值的日期(訂單開始時間) * bussinessRst:是否成功 0000 為成功,其他為不成功 * chargefee:充值的金額 * receiveNotifyTime:訂單結束時間 * */ /** * 我們可以通過serviceName字段來確定,如果該字段是reChargeNotifyReq則代表該條數據是充值通知部分的數據。 * 獲取所有的充值通知日志 */ val baseData = rdd.map(cr => { print(cr.value()) JSON.parseObject(cr.value()) }).filter(obj => obj.getString("serviceName").equalsIgnoreCase("reChargeNotifyReq")).cache() /** * 獲取每天充值成功的訂單筆數 * 回憶: * wordcount flatMap-》map-》reduceByKey */ val totalSucc = baseData.map(obj=> { //獲取日期 val reqId = obj.getString("requestId") //獲取日期 val day = reqId.substring(0, 8) //取出該條充值是否成功的標志 val result = obj.getString("bussinessRst") val flag = if(result.equals("0000")) 1 else 0 (day, flag) }).reduceByKey(_+_) /** * 獲取充值成功的訂單金額 */ val totalMoney = baseData.map(obj=> { val reqId = obj.getString("requestId") //獲取日期 val day = reqId.substring(0, 8) //去除該條充值是否成功的標記 val result = obj.getString("bussinessRst") val fee = if(result.equals("0000")) obj.getString("chargefee").toDouble else 0 (day, fee) }).reduceByKey(_+_) //總訂單數 val total = baseData.count() /** * 獲取充值成功的充值時長 */ val totalTime = baseData.map(obj=> { var reqId = obj.getString("requestId") //獲取日期 val day = reqId.substring(0, 8) //取出該條充值是否成功的標示 val result = obj.getString("bussinessRst") //時間格式為:yyyyMMddHHmissSSS val endTime = obj.getString("receiveNotifyTime") val startTime = reqId.substring(0, 17) val format = new SimpleDateFormat("yyyyMMddHHmissSSS") val cost = if(result.equals("0000")) format.parse(endTime).getTime - format.parse(startTime).getTime else 0 (day, cost) }).reduceByKey(_+_) /** * 將數據存儲到redis中: * (CMCC-20170412,35) */ totalSucc.foreachPartition(itr=> { val jedis = Jpools.getJedis itr.foreach(tp => { // print("CMCC-"+tp._1, tp._2) jedis.incrBy("CMCC-"+tp._1, tp._2) }) }) }) ssc.start() ssc.awaitTermination() } }
兩個工具類:
package utils import com.typesafe.config.ConfigFactory import org.apache.kafka.common.serialization.StringDeserializer object AppParams { /**Scala中使用關鍵字lazy來定義惰性變量,實現延遲加載(懶加載)。 惰性變量只能是不可變變量,並且只有在調用惰性變量時,才會去實例化這個變量。 load中可以指定相應的配置文件,但是不指定的情況下默認去讀取resources下的application.conf文件 默認規則:application.conf->application.json->application.properties **/ private lazy val config = ConfigFactory.load() val redisHost = config.getString("redis.host") val selectDBIndex = config.getInt("redis.db.index") /** * 返回訂閱的主題 */ val topic = config.getString("kafka.topic").split(",") /** * kafka集群所在的主機和端口 */ val brokers:String = config.getString("kafka.broker.list") /** * 消費者的id */ val groupId = config.getString("kafka.group.id") /** * 將kafka的相關參數進行分裝到map中 */ val kafkaParams = Map[String, Object]( "bootstrap.servers" -> brokers, "key.deserializer" -> classOf[StringDeserializer], "value.deserializer"-> classOf[StringDeserializer], "group.id"-> groupId, "auto.offset.reset" -> "earliest", "enable.auto.commit" -> "false" ) }
package utils import org.apache.commons.pool2.impl.GenericObjectPoolConfig import redis.clients.jedis.JedisPool /** * 創建一個redis的線程池 */ object Jpools { private val poolConfig = new GenericObjectPoolConfig poolConfig.setMaxIdle(5) //最大的空閑連接數為5,連接池中最大的空閑連接數,默認是8 poolConfig.setMaxTotal(2000) //最大支持的連接數量,默認也是8 //連接池是私有的,不能對外進行公開訪問 private lazy val jedisPool = new JedisPool(poolConfig, AppParams.redisHost) def getJedis = { val jedis = jedisPool.getResource jedis.select(AppParams.selectDBIndex) jedis } }
pom文件
<dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.2.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.2.0</version> </dependency> <dependency> <groupId>com.typesafe</groupId> <artifactId>config</artifactId> <version>1.3.3</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.11</artifactId> <version>2.2.0</version> </dependency> <!-- 導入kafka的依賴--> <!-- <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>0.10.1.0</version> </dependency>--> <!-- 指定kafka-client API的版本--> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.1.0</version> </dependency> <!-- 導入spark streaming 與kafka的依賴包--> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> <version>2.2.0</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.46</version> </dependency> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>2.9.0</version> </dependency> </dependencies>
問題總結:
1.json格式的轉換 (已解決)
2.flume讀取數據到kafka后數據亂碼增多問題(已解決)
3.flume spooldir 讀取文件的同時對文件更改造成的java.lang.IllegalStateException:File has been modified since being read:問題 (待解決)
4.上述spark主程序代碼優化問題 (待解決)