通过spark实现点击流日志分析案例
1. 访问的pv
package cn.itcast import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object PV { def main(args: Array[String]): Unit = { //todo:创建sparkconf,设置appName //todo:setMaster("local[2]")在本地模拟spark运行 这里的数字表示 使用2个线程 val sparkConf: SparkConf = new SparkConf().setAppName("PV").setMaster("local[2]") //todo:创建SparkContext val sc: SparkContext = new SparkContext(sparkConf) //todo:读取数据 val file: RDD[String] = sc.textFile("d:\\data\\access.log") //todo:将一行数据作为输入,输出("pv",1) val pvAndOne: RDD[(String, Int)] = file.map(x=>("pv",1)) //todo:聚合输出 val totalPV: RDD[(String, Int)] = pvAndOne.reduceByKey(_+_) totalPV.foreach(println) sc.stop() } }
2. 访问的uv
package cn.itcast import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object UV { def main(args: Array[String]): Unit = { //todo:构建SparkConf和 SparkContext val sparkConf: SparkConf = new SparkConf().setAppName("UV").setMaster("local[2]") val sc: SparkContext = new SparkContext(sparkConf) //todo:读取数据 val file: RDD[String] = sc.textFile("d:\\data\\access.log") //todo:对每一行分隔,获取IP地址 val ips: RDD[(String)] = file.map(_.split(" ")).map(x=>x(0)) //todo:对ip地址进行去重,最后输出格式 ("UV",1) val uvAndOne: RDD[(String, Int)] = ips.distinct().map(x=>("UV",1)) //todo:聚合输出 val totalUV: RDD[(String, Int)] = uvAndOne.reduceByKey(_+_) totalUV.foreach(println) //todo:数据结果保存 totalUV.saveAsTextFile("d:\\data\\out") sc.stop() } }
3. 访问的topN
package cn.itcast import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** * 求访问的topN */ object TopN { def main(args: Array[String]): Unit = { val sparkConf: SparkConf = new SparkConf().setAppName("TopN").setMaster("local[2]") val sc: SparkContext = new SparkContext(sparkConf) sc.setLogLevel("WARN") //读取数据 val file: RDD[String] = sc.textFile("d:\\data\\access.log") //将一行数据作为输入,输出(来源URL,1) val refUrlAndOne: RDD[(String, Int)] = file.map(_.split(" ")).filter(_.length>10).map(x=>(x(10),1)) //聚合 排序-->降序 val result: RDD[(String, Int)] = refUrlAndOne.reduceByKey(_+_).sortBy(_._2,false) //通过take取topN,这里是取前5名 val finalResult: Array[(String, Int)] = result.take(5) println(finalResult.toBuffer) sc.stop() } }
通过Spark实现ip地址查询
1. 需求分析
在互联网中,我们经常会见到城市热点图这样的报表数据,例如在百度统计中,会统计今年的热门旅游城市、热门报考学校等,会将这样的信息显示在热点图中。
因此,我们需要通过日志信息(运行商或者网站自己生成)和城市ip段信息来判断用户的ip段,统计热点经纬度。
2. 技术调研
因为我们的需求是完成一张报表信息,所以对程序的实时性没有要求,所以可以选择内存计算spark来实现上述功能。
3. 架构设计
搭建spark集群
4. 开发流程
4.1. 数据准备
4.2. ip日志信息
在ip日志信息中,我们只需要关心ip这一个维度就可以了,其他的不做介绍
4.3. 城市ip段信息
5. 代码开发
5.1. 思路
1、 加载城市ip段信息,获取ip起始数字和结束数字,经度,维度
2、 加载日志数据,获取ip信息,然后转换为数字,和ip段比较
3、 比较的时候采用二分法查找,找到对应的经度和维度
4、 然后对经度和维度做单词计数
5.2. 代码
package cn.itcast.IPlocaltion import java.sql.{Connection, DriverManager, PreparedStatement} import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** * ip地址查询 */ object IPLocaltion_Test { def main(args: Array[String]): Unit = { //todo:创建sparkconf 设置参数 val sparkConf: SparkConf = new SparkConf().setAppName("IPLocaltion_Test").setMaster("local[2]") //todo:创建SparkContext val sc = new SparkContext(sparkConf) //todo:读取基站数据 val data: RDD[String] = sc.textFile("d:\\data\\ip.txt") //todo:对基站数据进行切分 ,获取需要的字段 (ipStart,ipEnd,城市位置,经度,纬度) val jizhanRDD: RDD[(String, String, String, String, String)] = data.map(_.split("\\|")).map( x => (x(2), x(3), x(4) + "-" + x(5) + "-" + x(6) + "-" + x(7) + "-" + x(8), x(13), x(14))) //todo:获取RDD的数据 val jizhanData: Array[(String, String, String, String, String)] = jizhanRDD.collect() //todo:广播变量,一个只读的数据区,所有的task都能读到的地方 val jizhanBroadcast: Broadcast[Array[(String, String, String, String, String)]] = sc.broadcast(jizhanData) //todo:读取目标数据 val destData: RDD[String] = sc.textFile("d:\\data\\20090121000132.394251.http.format") //todo:获取数据中的ip地址字段 val ipData: RDD[String] = destData.map(_.split("\\|")).map(x=>x(1)) //todo:把IP地址转化为long类型,然后通过二分法去基站数据中查找,找到的维度做wordCount val result=ipData.mapPartitions(iter=>{ //获取广播变量中的值 val valueArr: Array[(String, String, String, String, String)] = jizhanBroadcast.value //todo:操作分区中的itertator iter.map(ip=>{ //将ip转化为数字long val ipNum:Long=ipToLong(ip) //拿这个数字long去基站数据中通过二分法查找,返回ip在valueArr中的下标 val index:Int=binarySearch(ipNum,valueArr) //根据下标获取对一个的经纬度 val tuple = valueArr(index) //返回结果 ((经度,维度),1) ((tuple._4,tuple._5),1) }) }) //todo:分组聚合 val resultFinal: RDD[((String, String), Int)] = result.reduceByKey(_+_) //todo:打印输出 resultFinal.foreach(println) //todo:将结果保存到mysql表中 resultFinal.map(x=>(x._1._1,x._1._2,x._2)).foreachPartition(data2Mysql) sc.stop() } //todo:ip转为long类型 def ipToLong(ip: String): Long = { //todo:切分ip地址。 val ipArray: Array[String] = ip.split("\\.") var ipNum=0L for(i <- ipArray){ ipNum=i.toLong | ipNum << 8L } ipNum } //todo:通过二分查找法,获取ip在广播变量中的下标 def binarySearch(ipNum: Long, valueArr: Array[(String, String, String, String, String)]): Int ={ //todo:口诀:上下循环寻上下,左移右移寻中间 //开始下标 var start=0 //结束下标 var end=valueArr.length-1 while(start<=end){ val middle=(start+end)/2 if(ipNum>=valueArr(middle)._1.toLong && ipNum<=valueArr(middle)._2.toLong){ return middle } if(ipNum > valueArr(middle)._2.toLong){ start=middle } if(ipNum<valueArr(middle)._1.toLong){ end=middle } } -1 } //todo:数据保存到mysql表中 def data2Mysql(iterator:Iterator[(String,String, Int)]):Unit = { //todo:创建数据库连接Connection var conn:Connection=null //todo:创建PreparedStatement对象 var ps:PreparedStatement=null //todo:采用拼占位符问号的方式写sql语句。 var sql="insert into iplocaltion(longitude,latitude,total_count) values(?,?,?)" //todo:获取数据连接 conn=DriverManager.getConnection("jdbc:mysql://itcast01:3306/spark","root","root123") //todo: 选中想被try/catch包围的语句 ctrl+alt+t 快捷键选中try/catch/finally try { iterator.foreach(line=> { //todo:预编译sql语句 ps = conn.prepareStatement(sql) //todo:对占位符设置值,占位符顺序从1开始,第一个参数是占位符的位置,第二个参数是占位符的值。 ps.setString(1, line._1) ps.setString(2, line._2) ps.setLong(3, line._3) //todo:执行 ps.execute() }) } catch { case e:Exception =>println(e) } finally { if(ps!=null){ ps.close() } if (conn!=null){ conn.close() } } } }