Structured Streaming Real-Time Analysis of Business Data

First start spark-shell, and remember to start the nc service.
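If nc is not already running, a typical way to start the listener is the command below; this assumes you run it on the same host and port (bigdata-pro01.kfk.com, 9999) that the code below connects to.

nc -lk 9999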

 

Enter the following code:

scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession

scala> import spark.implicits._
import spark.implicits._

scala> val lines = spark.readStream.format("socket").option("host", "bigdata-pro01.kfk.com").option("port", 9999).load()
18/03/21 20:55:13 WARN TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.
lines: org.apache.spark.sql.DataFrame = [value: string]

scala> val words = lines.as[String].flatMap(_.split(" "))
words: org.apache.spark.sql.Dataset[String] = [value: string]

scala> val wordCounts = words.groupBy("value").count()
wordCounts: org.apache.spark.sql.DataFrame = [value: string, count: bigint]

scala> val query = wordCounts.writeStream.outputMode("complete").format("console").start()
query: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@4e260e04
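In spark-shell the query runs in the background, and the console sink prints each micro-batch as data arrives. In a standalone application you would normally block the main thread until the query stops, for example:

// Blocks until the streaming query is stopped or fails.
query.awaitTermination()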

 

 

Type a few words into nc.

Then type a few more words.

Now change the code to use update mode.

First restart spark-shell, and remember to start nc.
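The only change from the previous word count is the output mode. A minimal sketch of the update-mode variant, assuming the same socket source and host as before:

import spark.implicits._

val lines = spark.readStream.format("socket").option("host", "bigdata-pro01.kfk.com").option("port", 9999).load()
val words = lines.as[String].flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()

// "update" prints only the rows whose counts changed in the current batch,
// instead of the full result table that "complete" prints.
val query = wordCounts.writeStream.outputMode("update").format("console").start()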


Now switch to append mode. Append mode does not allow the unwatermarked groupBy aggregation used above, so this variant simply writes each incoming word as it arrives:

scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession

scala> import spark.implicits._
import spark.implicits._

scala> val lines = spark.readStream.format("socket").option("host", "bigdata-pro01.kfk.com").option("port", 9999).load()
18/03/21 21:32:30 WARN TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.
lines: org.apache.spark.sql.DataFrame = [value: string]

scala> val words = lines.as[String].flatMap(_.split(" "))
words: org.apache.spark.sql.Dataset[String] = [value: string]

scala> val query = words.writeStream.outputMode("append").format("console").start()
query: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@19d85bbe


Because the Kafka version we used earlier is too old, download a 0.10.0 release.

Download page: http://kafka.apache.org/downloads

Copy the configuration files from the Kafka 0.9 installation and use them to overwrite the ones in Kafka 0.10. To save time, I did this directly in the VM.

 

Modify the configuration file.

Then distribute Kafka to the other two nodes.


On node 2 and node 3, also modify the corresponding configuration file:

server.properties
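The exact values depend on your installation, but the per-broker settings that usually have to differ on each node look roughly like this. The node-2/node-3 hostnames and the log directory below are placeholders, not taken from this article; only broker 1's hostname and the property names come from Kafka 0.10 itself.

# example for node 1; change broker.id and host.name on each node
broker.id=0
host.name=bigdata-pro01.kfk.com
log.dirs=/path/to/kafka-logs
zookeeper.connect=bigdata-pro01.kfk.com:2181,<node2>:2181,<node3>:2181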


Create a new Scala class in IDEA and add the following code:

 

package com.spark.test

import org.apache.spark.sql.SparkSession

object StructuredStreamingKafka {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("streaming").getOrCreate()

    // Read from the weblogs topic on the node-1 broker.
    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "bigdata-pro01.kfk.com:9092")
      .option("subscribe", "weblogs")
      .load()

    import spark.implicits._
    // Kafka values arrive as bytes; cast them to strings before splitting.
    val lines = df.selectExpr("CAST(value AS STRING)").as[String]

    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.groupBy("value").count()

    val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    query.awaitTermination()
  }
}

 

Run the program.

If it reports an error saying a 0.10 version is required, you can ignore it for now.
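If the error mentions Kafka 0.10 libraries, it usually comes from the Structured Streaming Kafka connector, which is built against the 0.10 Kafka client. One way to resolve it rather than ignore it is to declare the connector in pom.xml; the version below is an assumption and should match your Spark and Scala versions.

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
  <version>2.2.0</version>
</dependency>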

Start Kafka.

You can see that the program is now running.


Create a producer in Kafka:

bin/kafka-console-producer.sh --broker-list bigdata-pro01.kfk.com:9092 --topic weblogs

 

 

Type a few words into the producer.

You can see the results on the IDEA side.

 

We can also switch to update mode.
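Only the output mode of the query in the program above changes; a sketch of the modified lines:

    val query = wordCounts.writeStream
      .outputMode("update")   // print only the rows updated in each batch
      .format("console")
      .start()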

 

The program is running.

Type some words.

This is the result of the run.

Upload the package (do this on all three nodes).

 

 

Start spark-shell.

 

Paste the code in:

 

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "bigdata-pro01.kfk.com:9092")
  .option("subscribe", "weblogs")
  .load()

import spark.implicits._
val lines = df.selectExpr("CAST(value AS STRING)").as[String]

val words = lines.flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()

val query = wordCounts.writeStream
  .outputMode("update")
  .format("console")
  .start()

query.awaitTermination()

 

At this point, make sure Kafka and the producer are still running.

Type a few words on the producer side.

 

Go back to the spark-shell window and you can see the counts.


First, clear the contents of the webCount table in MySQL's test database.
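For example, assuming the webCount table already exists, either of these clears it:

truncate table webCount;
-- or
delete from webCount;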

 

Open IDEA; we will write two programs.

 

package com.spark.test

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.ProcessingTime

/**
  * Created by Administrator on 2017/10/16.
  */
object StructuredStreamingKafka {

  case class Weblog(datatime:String,
                    userid:String,
                    searchname:String,
                    retorder:String,
                    cliorder:String,
                    cliurl:String)

  def main(args: Array[String]): Unit = {

    val spark  = SparkSession.builder()
      .master("local[2]")
      .appName("streaming").getOrCreate()

    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "bigdata-pro01.kfk.com:9092")
      .option("subscribe", "weblogs")
      .load()

    import spark.implicits._
    val lines = df.selectExpr("CAST(value AS STRING)").as[String]
    val weblog = lines.map(_.split(","))
      .map(x => Weblog(x(0), x(1), x(2),x(3),x(4),x(5)))
    val titleCount = weblog
      .groupBy("searchname").count().toDF("titleName","count")

    val url ="jdbc:mysql://bigdata-pro01.kfk.com:3306/test"
    val username="root"
    val password="root"

    val writer = new JDBCSink(url,username,password)
    val query = titleCount.writeStream
      .foreach(writer)
      .outputMode("update")
        //.format("console")
      .trigger(ProcessingTime("5 seconds"))
      .start()
    query.awaitTermination()
  }

}

 

 

package com.spark.test

import java.sql._
import java.sql.{Connection, DriverManager}
import org.apache.spark.sql.{ForeachWriter, Row}

/**
  * Created by Administrator on 2017/10/17.
  */
class JDBCSink(url: String, username: String, password: String) extends ForeachWriter[Row] {

  var statement: Statement = _
  var resultSet: ResultSet = _
  var connection: Connection = _

  override def open(partitionId: Long, version: Long): Boolean = {
    Class.forName("com.mysql.jdbc.Driver")
    connection = DriverManager.getConnection(url, username, password)
    statement = connection.createStatement()
    true
  }

  override def process(value: Row): Unit = {
    val titleName = value.getAs[String]("titleName").replaceAll("[\\[\\]]","")
    val count = value.getAs[Long]("count");

    val querySql = "select 1 from webCount " +
      "where titleName = '"+titleName+"'"

    val updateSql = "update webCount set " +
      "count = "+count+" where titleName = '"+titleName+"'"

    val insertSql = "insert into webCount(titleName,count)" +
      "values('"+titleName+"',"+count+")"

    try {
      val resultSet = statement.executeQuery(querySql)
      if (resultSet.next()) {
        statement.executeUpdate(updateSql)
      } else {
        statement.execute(insertSql)
      }
    } catch {
      // SQLException is handled specifically; a single catch-all afterwards
      // avoids the unreachable cases of the original ordering.
      case ex: SQLException => println("SQLException: " + ex.getMessage)
      case ex: Throwable => println("Throwable: " + ex.getMessage)
    }

  }

  override def close(errorOrNull: Throwable): Unit = {
    if (statement != null) {
      statement.close()
    }
    if (connection != null) {
      connection.close()
    }
  }

}
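The SQL above is built by string concatenation, which breaks if a title contains a single quote. If you run into that, here is a hedged sketch of the same select/update/insert logic for process() using PreparedStatement; it is an alternative, not the author's code, and it reuses the connection opened in open():

  override def process(value: Row): Unit = {
    val titleName = value.getAs[String]("titleName").replaceAll("[\\[\\]]", "")
    val count = value.getAs[Long]("count")

    // Parameterized statements let the JDBC driver handle quoting and escaping.
    val query = connection.prepareStatement("select 1 from webCount where titleName = ?")
    query.setString(1, titleName)
    val rs = query.executeQuery()
    if (rs.next()) {
      val update = connection.prepareStatement("update webCount set count = ? where titleName = ?")
      update.setLong(1, count)
      update.setString(2, titleName)
      update.executeUpdate()
      update.close()
    } else {
      val insert = connection.prepareStatement("insert into webCount(titleName, count) values(?, ?)")
      insert.setString(1, titleName)
      insert.setLong(2, count)
      insert.executeUpdate()
      insert.close()
    }
    rs.close()
    query.close()
  }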

 

Add this dependency to the pom.xml file:

 

<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>5.1.27</version>
</dependency>

 

 

A note on choosing the version of this dependency: it is best to match the driver version already used in your cluster, otherwise you may hit errors; the jars under Hive's lib directory are a good reference.


Keep the cluster's HDFS, HBase, YARN, and ZooKeeper running.

 

 

 

Start Flume on node 1 and node 2. Before starting, modify the Flume configuration, because we changed the JDK and Kafka versions earlier (do this on all three nodes).
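The part that matters here is the Kafka sink section of the Flume agent configuration (plus JAVA_HOME in flume-env.sh if the JDK moved). The property names depend on your Flume version, and the agent/sink names below are assumptions; only the topic and broker address come from this article. With Flume 1.7+ a Kafka sink pointed at the new broker looks roughly like:

agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafkaSink.kafka.bootstrap.servers = bigdata-pro01.kfk.com:9092
agent.sinks.kafkaSink.kafka.topic = weblogs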

 

 

Start Flume on node 1.

 

Start Kafka on node 1:

bin/kafka-server-start.sh config/server.properties

 

 

Start Flume on node 2.

 

On node 2, start the data source so it generates data in real time.

 

Go back to IDEA and run the program.

 

 

Note that the program is not reporting any errors now; because I did not prepare well earlier and allocated too little memory to IDEA, it runs quite slowly.

 

 

 

Go back to MySQL and check the webCount table: data has started coming in.


The Chinese text in the table shows up garbled, so modify the MySQL configuration file as follows to default everything to UTF-8:

 

 

[client]
socket=/var/lib/mysql/mysql.sock
default-character-set=utf8

[mysqld]
character-set-server=utf8
datadir=/var/lib/mysql
socket=/var/lib/mysql/mysql.sock
user=mysql
# Disabling symbolic-links is recommended to prevent assorted security risks
symbolic-links=0

[mysql]
default-character-set=utf8

[mysqld_safe]
log-error=/var/log/mysqld.log
pid-file=/var/run/mysqld/mysqld.pid

 

 

Drop the table.

 

 

Recreate the table:

create table webCount(
  titleName varchar(255) CHARACTER SET utf8 DEFAULT NULL,
  count int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
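To confirm the new table picked up the UTF-8 defaults, you can check its definition:

show create table webCount;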

 

 

Run the program once more.

 

 

You can see that the Chinese characters are no longer garbled.

 

We can also connect to MySQL with a GUI tool to take a look.

 

 

