Spark Streaming is well suited for real-time streaming projects. Besides logs, files, and network ports, a very common data-source requirement is the incremental data in MySQL, analyzed and processed in real time. We could of course poll MySQL periodically over JDBC and process whatever the queries return, but MySQL usually also serves other, often critical, business workloads; frequent JDBC polling puts considerable hidden pressure on the database and may even affect normal operations. To process the incremental data while leaving the rest of MySQL essentially undisturbed, we need Canal.
Suppose the canal_test database in MySQL contains a table policy_cred, and we need to track, in real time, the trend of mor_rate for rows whose policy_status is 1 and label each rate with a risk warning level.
1. Canal
Canal [kə'næl] is an open-source component from Alibaba, written in pure Java, for incremental subscription and consumption based on the database binlog. Canal works by speaking the MySQL slave protocol: it masquerades as a MySQL slave and sends a dump request to the MySQL master; the master then pushes the binary log to this "slave" (that is, Canal), and Canal parses the binary log objects. For details, see the official documentation [alibaba/canal wiki].
1.1 Installing Canal
In the 1.1.x line, Canal's server mode supports TCP, Kafka, and RocketMQ. The version installed here is Canal 1.1.2 (the MQ server modes require 1.1.1 or later). The server runs in MQ mode, with Kafka as the MQ. The OS is CentOS 7; the rest of the environment is JDK 8, Scala 2.11, MySQL, ZooKeeper, and Kafka.
1.1.1 Prerequisites
Before installing Canal, get the following set up first.
MySQL
a. If MySQL is not installed yet, the detailed steps are in my other post [Offline installation of mysql 5.7 / mysql 8.0 on CentOS 7].
b. Enable MySQL's binlog. Edit /etc/my.cnf, add the following under [mysqld], then restart MySQL with /etc/init.d/mysql restart:
[mysqld]
# Adding this line is enough
log-bin=mysql-bin
# Use row mode
binlog-format=ROW
# Required for MySQL replication; must not clash with Canal's slaveId
server_id=1
c. Create a MySQL user for Canal and grant it the required privileges:
mysql> CREATE USER canal IDENTIFIED BY 'canal';
mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
mysql> GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%';
mysql> FLUSH PRIVILEGES;
Zookeeper
ZooKeeper is needed to install Kafka. Suppose the ZK ensemble after installation is: cdh3:2181,cdh4:2181,cdh5:2181
Kafka
Suppose the brokers after installation are: node1:9092,node2:9092,node3:9092
After installation, create a topic, for example one named example:
kafka-topics.sh --create --zookeeper cdh3:2181,cdh4:2181,cdh5:2181 --partitions 2 --replication-factor 1 --topic example
1.1.2 Installing Canal
1. Download Canal
Go to Canal's releases page and download canal v1.1.2:
wget https://github.com/alibaba/canal/releases/download/canal-1.1.2/canal.deployer-1.1.2.tar.gz
2. Extract
Note: be sure to create a dedicated directory first; the tarball has no top-level folder, so extracting it directly will dump (and may overwrite) files in the current directory.
mkdir -p /usr/local/canal
mv canal.deployer-1.1.2.tar.gz /usr/local/canal/
cd /usr/local/canal
tar -zxvf canal.deployer-1.1.2.tar.gz
3. Edit the instance configuration file
vim $CANAL_HOME/conf/example/instance.properties, change the following entries; everything else can stay at its default:
## mysql serverId, v1.0.26+ will autoGen; must not clash with MySQL's server_id
canal.instance.mysql.slaveId=3
# position info: the MySQL address
canal.instance.master.address=node1:3306
# table meta tsdb info
canal.instance.tsdb.enable=false
# The username and password created in MySQL earlier
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset=UTF-8
# The database to watch; optional. Here we only watch the canal_test database
canal.instance.defaultDatabaseName=canal_test
# enable druid Decrypt database password
canal.instance.enableDruid=false
# Filter regex: watch all tables under canal_test
canal.instance.filter.regex=canal_test\\..*
# MQ settings
## The topic created in Kafka earlier
canal.mq.topic=example
## Partition number to write to
canal.mq.partition=1
4. Edit canal.properties

vim $CANAL_HOME/conf/canal.properties, change the following entries; everything else can stay at its default:

# Port used in TCP mode; Canal clients fetch data through it
canal.port = 11111
# One of: tcp, kafka, RocketMQ. Here we use kafka
canal.serverMode = kafka
# Comment this out, otherwise startup logs a warning
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
##################################################
#########              MQ              #############
##################################################
canal.mq.servers = node1:9092,node2:9092,node3:9092
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 1
canal.mq.bufferMemory = 33554432
# Canal's batch size, default 50K; keep it under Kafka's max message size (below 900K)
canal.mq.canalBatchSize = 50
# Timeout for Canal get, in milliseconds; empty means no timeout
canal.mq.canalGetTimeout = 100
# Whether to emit flat JSON messages
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
# Whether Kafka message delivery uses transactions
#canal.mq.transaction = false
5. Start Canal
$CANAL_HOME/bin/startup.sh
6. Verify
Check the logs
After startup, two log files are written under logs: logs/canal/canal.log and logs/example/example.log. Check both and make sure neither contains errors.
If you are running this in a virtual machine, give it at least 2 CPU cores.
Make sure the hostname of the system you are logged into can be pinged.
Run some inserts, updates, and deletes against the MySQL database, then inspect the data in the Kafka topic example:
kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --from-beginning --topic example
7. Stop Canal
Always shut Canal down with this command when you no longer need it. If you kill the process or power off the machine instead, the next startup will insist that you run the stop.sh script first before Canal will start again.
$CANAL_HOME/bin/stop.sh
1.2 Canal client code
If we do not use Kafka as the Canal consumer, we can also write our own Canal client and decide in code where the data goes. In that case, simply change canal.serverMode to tcp in canal.properties, then write the client code.
Add the dependency to the Maven project's pom:
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.2</version>
</dependency>
Write the code:
import java.net.InetSocketAddress;
import java.util.List;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.Message;

/**
 * Canal client.
 * Note: the Canal server only serves one client connection; if several clients are
 * started, the others will not receive any data, so run a single instance.
 *
 * @see <a href="https://github.com/alibaba/canal/wiki/ClientExample">Official doc: ClientSample</a>
 *
 * Created by yore on 2019/3/16 10:50
 */
public class SimpleCanalClientExample {

    public static void main(String args[]) {
        /*
         * Create the connection.
         * SocketAddress: if this runs on the host where the Canal server lives, it can be
         *                new InetSocketAddress(AddressUtils.getHostIp(), 11111)
         * destination:   must match canal.destinations = example in the server's canal.properties
         * username:
         * password:
         */
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("node1", 11111), "example", "", "");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            int totalEmptyCount = 120;
            while (emptyCount < totalEmptyCount) {
                Message message = connector.getWithoutAck(batchSize); // fetch up to batchSize entries
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                } else {
                    emptyCount = 0;
                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries());
                }
                connector.ack(batchId); // acknowledge the batch
                // connector.rollback(batchId); // on processing failure, roll back
            }
            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }

            RowChange rowChage = null;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }

            EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            /*
             * To handle only certain databases, add a check such as:
             *     if ("dbName".equals(entry.getHeader().getSchemaName())) { // TODO }
             *
             * To handle only certain tables, add a check such as:
             *     if ("tableName".equals(entry.getHeader().getTableName())) { // TODO }
             */
            for (RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
        }
    }
}
Run the code above locally, modify data in the MySQL database, and the changes show up in the console:
empty count : 20
empty count : 21
empty count : 22
================> binlog[mysql-bin.000009:1510] , name[canal_test,customer] , eventType : INSERT
id : 4 update=true
name : spark update=true
empty count : 1
empty count : 2
empty count : 3
2. Spark
With the previous step we can capture the change data of the canal_test database and push it to Kafka in real time. Each record Kafka receives is a JSON message. We only need to handle INSERT and UPDATE events, and only rows whose status is 1; for those we compute the change in mor_rate and map the new rate to a risk level: 0-75% is grade G1, 75%-80% is R1, and 80%-100% is R2. Finally the results are persisted somewhere: Redis, MySQL, MongoDB, or even a push back to Kafka would all work; here the results are saved to MySQL. A small sketch of the grading rule follows.
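To make the grading rule concrete before building the job, here is a minimal Scala sketch of the thresholds just described, with a worked example using the 0.55 to 0.88 change that appears later in the test section (the object and method names here are only illustrative; the streaming job below implements the same rule as gainRiskRank):

object RiskRankSketch {
  // 0-75% -> G1, 75%-80% -> R1, 80%-100% -> R2 (rates are fractions, e.g. 0.88 = 88%)
  def riskRank(rate: Double): String =
    if (rate >= 0.75 && rate < 0.80) "R1"
    else if (rate >= 0.80 && rate <= 1.0) "R2"
    else "G1"

  def main(args: Array[String]): Unit = {
    val before = 0.55   // mor_rate before the change
    val after  = 0.88   // mor_rate after the change
    // Prints: change=0.3300, rank=R2
    println(f"change=${after - before}%.4f, rank=${riskRank(after)}")
  }
}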
2.1 Create the following two tables in MySQL
-- Create this table in the canal_test database
CREATE TABLE `policy_cred` (
    p_num varchar(22) NOT NULL,
    policy_status varchar(2) DEFAULT NULL COMMENT 'status: 0 or 1',
    mor_rate decimal(20,4) DEFAULT NULL,
    load_time datetime DEFAULT NULL,
    PRIMARY KEY (`p_num`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- Create this table in the real_result database
CREATE TABLE `real_risk` (
    p_num varchar(22) NOT NULL,
    risk_rank varchar(8) DEFAULT NULL COMMENT 'level: G1, R1, R2',
    mor_rate decimal(20,4),
    ch_mor_rate decimal(20,4),
    load_time datetime DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
2.2 Spark code
2.2.1 Create a project configuration file my.properties under resources
## spark
# spark://cdh3:7077
spark.master=local[2]
spark.app.name=m_policy_credit_app
spark.streaming.durations.sec=10
spark.checkout.dir=src/main/resources/checkpoint

## Kafka
bootstrap.servers=node1:9092,node2:9092,node3:9092
group.id=m_policy_credit_gid
# latest, earliest, none
auto.offset.reset=latest
enable.auto.commit=false
kafka.topic.name=example

## Mysql
mysql.jdbc.driver=com.mysql.jdbc.Driver
mysql.db.url=jdbc:mysql://node1:3306/real_result
mysql.user=root
mysql.password=123456
mysql.connection.pool.size=10
2.2.2 Add the following to pom.xml
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<scala.version>2.11.8</scala.version>
<spark.version>2.4.0</spark.version>
<canal.client.version>1.1.2</canal.client.version>
</properties>
<dependencies>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>${canal.client.version}</version>
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!-- Spark -->
<!-- spark-core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- spark-streaming -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- spark-streaming-kafka -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- spark-sql -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.1</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.51</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
</dependencies>
2.2.3 Write a Properties utility class in the Scala source package

package yore.spark

import java.util.Properties

/**
  * Utility class for reading Properties.
  *
  * Created by yore on 2018-06-29 14:05
  */
object PropertiesUtil {

  private val properties: Properties = new Properties

  /**
    * Get the Properties object of the configuration file.
    *
    * @author yore
    * @return java.util.Properties
    * date 2018/6/29 14:24
    */
  def getProperties() : Properties = {
    if(properties.isEmpty){
      // Read the my.properties file under the resources folder
      val reader = getClass.getResourceAsStream("/my.properties")
      properties.load(reader)
    }
    properties
  }

  /**
    * Get the string value of a key in the configuration file.
    */
  def getPropString(key : String) : String = {
    getProperties().getProperty(key)
  }

  /**
    * Get the integer value of a key in the configuration file.
    */
  def getPropInt(key : String) : Int = {
    getProperties().getProperty(key).toInt
  }

  /**
    * Get the boolean value of a key in the configuration file.
    */
  def getPropBoolean(key : String) : Boolean = {
    getProperties().getProperty(key).toBoolean
  }
}
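As a quick sanity check that the configuration is actually picked up, here is a minimal usage sketch (the demo object name is only illustrative; it assumes the my.properties file from 2.2.1 is on the classpath):

package yore.spark

object PropertiesUtilDemo {
  def main(args: Array[String]): Unit = {
    // Each call should echo the corresponding value from my.properties
    println(PropertiesUtil.getPropString("spark.master"))               // local[2]
    println(PropertiesUtil.getPropInt("spark.streaming.durations.sec")) // 10
    println(PropertiesUtil.getPropBoolean("enable.auto.commit"))        // false
  }
}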
2.2.4 Write a database utility class in the Scala source package
package yore.spark

import java.sql.{Connection, DriverManager, PreparedStatement, SQLException}
import java.util.concurrent.LinkedBlockingDeque

import scala.collection.mutable.ListBuffer

/**
  * Created by yore on 2018/11/14 20:34
  */
object JDBCWrapper {
  private var jdbcInstance : JDBCWrapper = _

  def getInstance() : JDBCWrapper = {
    synchronized{
      if(jdbcInstance == null){
        jdbcInstance = new JDBCWrapper()
      }
    }
    jdbcInstance
  }
}

class JDBCWrapper {

  // Size of the connection pool
  val POOL_SIZE : Int = PropertiesUtil.getPropInt("mysql.connection.pool.size")

  val dbConnectionPool = new LinkedBlockingDeque[Connection](POOL_SIZE)

  try
    Class.forName(PropertiesUtil.getPropString("mysql.jdbc.driver"))
  catch {
    case e: ClassNotFoundException => e.printStackTrace()
  }

  for(i <- 0 until POOL_SIZE){
    try{
      val conn = DriverManager.getConnection(
        PropertiesUtil.getPropString("mysql.db.url"),
        PropertiesUtil.getPropString("mysql.user"),
        PropertiesUtil.getPropString("mysql.password"))
      dbConnectionPool.put(conn)
    }catch {
      case e : Exception => e.printStackTrace()
    }
  }

  def getConnection(): Connection = synchronized{
    while (0 == dbConnectionPool.size()){
      try{
        Thread.sleep(20)
      }catch {
        case e : InterruptedException => e.printStackTrace()
      }
    }
    dbConnectionPool.poll()
  }

  /**
    * Batch insert.
    *
    * @param sqlText    the SQL statement
    * @param paramsList the parameter list
    * @return Array[Int]
    */
  def doBatch(sqlText: String, paramsList: ListBuffer[ParamsList]): Array[Int] = {
    val conn: Connection = getConnection()
    var ps: PreparedStatement = null
    var result: Array[Int] = null
    try{
      conn.setAutoCommit(false)
      ps = conn.prepareStatement(sqlText)

      for (paramters <- paramsList) {
        paramters.params_Type match {
          case "real_risk" => {
            println("$$$\treal_risk\t" + paramsList)
            // p_num, risk_rank, mor_rate, ch_mor_rate, load_time
            ps.setObject(1, paramters.p_num)
            ps.setObject(2, paramters.risk_rank)
            ps.setObject(3, paramters.mor_rate)
            ps.setObject(4, paramters.ch_mor_rate)
            ps.setObject(5, paramters.load_time)
          }
        }
        ps.addBatch()
      }
      result = ps.executeBatch
      conn.commit()
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (ps != null) try {
        ps.close()
      } catch {
        case e: SQLException => e.printStackTrace()
      }
      if (conn != null) try {
        dbConnectionPool.put(conn)
      } catch {
        case e: InterruptedException => e.printStackTrace()
      }
    }
    result
  }
}
2.2.5 Write the Spark Streaming job in the Scala source package
package yore.spark

import com.alibaba.fastjson.{JSON, JSONArray, JSONObject}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable.ListBuffer

/**
  * Created by yore on 2019/3/16 15:11
  */
object M_PolicyCreditApp {

  def main(args: Array[String]): Unit = {
    // Set the log output level
    Logger.getLogger("org").setLevel(Level.ERROR)

    val conf = new SparkConf()
      .setMaster(PropertiesUtil.getPropString("spark.master"))
      .setAppName(PropertiesUtil.getPropString("spark.app.name"))
      // !! Must be set, otherwise the Kafka records trigger a serialization error
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    // If HADOOP_HOME is already configured in the environment, hadoop.home.dir need not be set
    System.setProperty("hadoop.home.dir", "/Users/yoreyuan/soft/hadoop-2.9.2")

    val ssc = new StreamingContext(conf, Seconds(PropertiesUtil.getPropInt("spark.streaming.durations.sec").toLong))
    ssc.sparkContext.setLogLevel("ERROR")
    ssc.checkpoint(PropertiesUtil.getPropString("spark.checkout.dir"))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> PropertiesUtil.getPropString("bootstrap.servers"),
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> PropertiesUtil.getPropString("group.id"),
      "auto.offset.reset" -> PropertiesUtil.getPropString("auto.offset.reset"),
      "enable.auto.commit" -> (PropertiesUtil.getPropBoolean("enable.auto.commit"): java.lang.Boolean)
    )

    val topics = Array(PropertiesUtil.getPropString("kafka.topic.name"))

    val kafkaStreaming = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    kafkaStreaming.map[JSONObject](line => {
      // Parse the record value into a JSONObject
      println("$$$\t" + line.value())
      JSON.parseObject(line.value)
    }).filter(jsonObj => {
      // Keep only INSERT and UPDATE events from the canal_test database
      if (null == jsonObj || !"canal_test".equals(jsonObj.getString("database"))) {
        false
      } else {
        val chType = jsonObj.getString("type")
        if ("INSERT".equals(chType) || "UPDATE".equals(chType)) {
          true
        } else {
          false
        }
      }
    }).flatMap[(JSONObject, JSONObject)](jsonObj => {
      // Pair the before-change ("old") and after-change ("data") rows into tuples
      var oldJsonArr: JSONArray = jsonObj.getJSONArray("old")
      val dataJsonArr: JSONArray = jsonObj.getJSONArray("data")
      if ("INSERT".equals(jsonObj.getString("type"))) {
        oldJsonArr = new JSONArray()
        val oldJsonObj2 = new JSONObject()
        oldJsonObj2.put("mor_rate", "0")
        oldJsonArr.add(oldJsonObj2)
      }
      val result = ListBuffer[(JSONObject, JSONObject)]()
      for (i <- 0 until oldJsonArr.size) {
        val jsonTuple = (oldJsonArr.getJSONObject(i), dataJsonArr.getJSONObject(i))
        result += jsonTuple
      }
      result
    }).filter(t => {
      // Drop rows whose policy_status is not 1, and rows whose mor_rate did not change
      val policyStatus = t._2.getString("policy_status")
      if (null != policyStatus && "1".equals(policyStatus) && null != t._1.getString("mor_rate")) {
        true
      } else {
        false
      }
    }).map(t => {
      val p_num = t._2.getString("p_num")
      val nowMorRate = t._2.getString("mor_rate").toDouble
      val chMorRate = nowMorRate - t._1.getDouble("mor_rate")
      val riskRank = gainRiskRank(nowMorRate)
      // p_num, risk_rank, mor_rate, ch_mor_rate, load_time
      (p_num, riskRank, nowMorRate, chMorRate, new java.util.Date)
    }).foreachRDD(rdd => {
      rdd.foreachPartition(p => {
        val paramsList = ListBuffer[ParamsList]()
        val jdbcWrapper = JDBCWrapper.getInstance()
        while (p.hasNext) {
          val record = p.next()
          val paramsListTmp = new ParamsList
          paramsListTmp.p_num = record._1
          paramsListTmp.risk_rank = record._2
          paramsListTmp.mor_rate = record._3
          paramsListTmp.ch_mor_rate = record._4
          paramsListTmp.load_time = record._5
          paramsListTmp.params_Type = "real_risk"
          paramsList += paramsListTmp
        }
        // VALUES(p_num, risk_rank, mor_rate, ch_mor_rate, load_time)
        val insertNum = jdbcWrapper.doBatch("INSERT INTO real_risk VALUES(?,?,?,?,?)", paramsList)
        println("INSERT TABLE real_risk: " + insertNum.mkString(", "))
      })
    })

    ssc.start()
    ssc.awaitTermination()
  }

  def gainRiskRank(rate: Double): String = {
    var result = ""
    if (rate >= 0.75 && rate < 0.8) {
      result = "R1"
    } else if (rate >= 0.80 && rate <= 1) {
      result = "R2"
    } else {
      result = "G1"
    }
    result
  }
}

/**
  * Parameter entity corresponding to the result table.
  */
class ParamsList extends Serializable {
  var p_num: String = _
  var risk_rank: String = _
  var mor_rate: Double = _
  var ch_mor_rate: Double = _
  var load_time: java.util.Date = _
  var params_Type : String = _

  override def toString = s"ParamsList($p_num, $risk_rank, $mor_rate, $ch_mor_rate, $load_time)"
}
3. Testing
Start ZooKeeper, Kafka, and Canal.
Insert or update some rows in the policy_cred table of the canal_test database, then check the results in the real_risk table of the real_result database.
When a row is updated, the JSON Kafka receives looks like the following (this is the format Canal delivers to Kafka; it carries the original values, the changed values, the database name, the table name, and so on):
{ "data": [ { "p_num": "1", "policy_status": "1", "mor_rate": "0.8800", "load_time": "2019-03-17 12:54:57" } ], "database": "canal_test", "es": 1552698141000, "id": 10, "isDdl": false, "mysqlType": { "p_num": "varchar(22)", "policy_status": "varchar(2)", "mor_rate": "decimal(20,4)", "load_time": "datetime" }, "old": [ { "mor_rate": "0.5500" } ], "sql": "", "sqlType": { "p_num": 12, "policy_status": 12, "mor_rate": 3, "load_time": 93 }, "table": "policy_cred", "ts": 1552698141621, "type": "UPDATE" }
Check the result table in MySQL.
4. Problems encountered
Spark projects tend to pull in a large number of dependencies, and those dependencies can conflict with each other, for example:
Exception in thread "main" java.lang.NoSuchMethodError: io.netty.buffer.PooledByteBufAllocator.<init>(ZIIIIIIIZ)V
    at org.apache.spark.network.util.NettyUtils.createPooledByteBufAllocator(NettyUtils.java:120)
    at org.apache.spark.network.client.TransportClientFactory.<init>(TransportClientFactory.java:106)
    at org.apache.spark.network.TransportContext.createClientFactory(TransportContext.java:99)
    at org.apache.spark.rpc.netty.NettyRpcEnv.<init>(NettyRpcEnv.scala:71)
    at org.apache.spark.rpc.netty.NettyRpcEnvFactory.create(NettyRpcEnv.scala:461)
    at org.apache.spark.rpc.RpcEnv$.create(RpcEnv.scala:57)
    at org.apache.spark.SparkEnv$.create(SparkEnv.scala:249)
    at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:175)
    at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:257)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:424)
    at org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:838)
    at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:85)
    at yore.spark.M_PolicyCreditApp$.main(M_PolicyCreditApp.scala:33)
    at yore.spark.M_PolicyCreditApp.main(M_PolicyCreditApp.scala)
We can run the following in a terminal at the project root: mvn dependency:tree -Dverbose > dependency.log
This generates a dependency.log file in the project root. Open it and search for io.netty to find which dependency pulls it in.
Then exclude io.netty from canal.client:
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>${canal.client.version}</version>
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</exclusion>
</exclusions>
</dependency>