企業介紹:
中國電信上海理想信息產業(集團)有限公司,成立於1999年,注冊資本7000萬元,是上海市投資規模較大的信息技術企業之一。母公司員工500多人,其中80%以上員工具有大學本科以上學歷,從事軟件開發人員超過50%,是一個典型的“知識密集型”企業。
通過整合公司內各事業部多年大型項目實施的整體實力,公司着力鍛造大型信息化項目咨詢規划和頂層設計能力,構建“智慧社區”、“智慧園區”及“智慧政務”、“智慧醫療”、“智慧物流”等各類智慧行業應用等整體解決方案,可提供IT外包服務和網絡監控運維管理一站式安全解決方案,逐步形成“智慧城市”專業領域產品研發積累和項目交付與平台運營經驗,鍛造了整體科研隊伍和項目實施團隊的綜合實力。
我們致力於中國IT產業發展,借助中國電信精品網絡資源,定位於電信與IT產業融合的ICT服務商形象(ICT即“Information Communication Technology”),為社會信息化、企業信息化和家庭信息化提供全方位、專業化的應用集成服務。
可以參考網址:http://www.ideal.sh.cn/public/idealout/contentPreviewLinkDetail.htm?param=gsgk&pageCode=qyjs
項目介紹:
數據總線平台是基於spark+spring+Mybatis體系而開發的一個集ETL、智能調度功能為一體的互聯網操作平台,平台以工業連接為基礎,構建在安全可信的天翼雲上,是可靈活擴展的工業互聯網和工業大數據平台。
該平台主要功能如下:
1.提供基於hdfs、tdengine、hive、mysql、oralce、ftp等十余種數據源的快速數據加載
2.對數據進行kv2table,table2kv,缺失值替換,增加序號列,過濾,類型轉換,sql,機器學習模型計算等多種數據處理
3.將處理好的數據存入hdfs、tdengine、hive、mysql、oralce、ftp等目標組件
該平台通過web界面來對組件進行配置,基本擺脫了代碼的編寫,不會寫代碼的工作人員也可進行開發
spark+TDengine使用過程
1.tdengine的安裝
請參考官方文檔
2.在tdengine中建立測試庫和測試表
taos> create database test;
taos>use test;
#這里我們創建一個和tdengine自帶庫log中log表結構一致的表,后提直接從log.log讀數據存儲到test.log_cp
taos> create table log_cp(
-> ts TIMESTAMP,
-> level TINYINT,
-> content BINARY(80),
-> ipaddr BINARY(15)
-> )
3.spark 讀取tdengine
因為tdengine並未提供供spark調用的DataSource,而且tdengine本身也支持jdbc,所以這里使用了spark-jdbc
來讀取tdengine,最新的jdbc可以到官網下載,我這里用的是如下版本:
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>1.0.3</version>
</dependency>
關於使用jdbc,官網有如下提示:
由於 TDengine 是使用 c 語言開發的,使用 taos-jdbcdriver 驅動包時需要依賴系統對應的本地函數庫。
1.libtaos.so 在 linux 系統中成功安裝 TDengine 后,依賴的本地函數庫 libtaos.so 文件會被自動拷貝至 /usr/lib/libtaos.so,該目錄包含在 Linux 自動掃描路徑上,無需單獨指定。
2.taos.dll 在 windows 系統中安裝完客戶端之后,驅動包依賴的 taos.dll 文件會自動拷貝到系統默認搜索路徑 C:/Windows/System32 下,同樣無需要單獨指定。
第一次使用時,為了保證機器上有libtaos.so 或 taos.dll,需要在本地安裝tdengine客戶端(win客戶端,linux客戶端)
spark的讀取代碼如下:
val jdbccdf = spark
.read
.format("jdbc")
.option("url", "jdbc:TAOS://192.168.1.151:6030/log")
.option("driver", "com.taosdata.jdbc.TSDBDriver")
.option("dbtable", "log")
.option("user", "root")
.option("password", "taosdata")
.option("fetchsize", "1000")
.load()
4.spark 存tdengine
因為在讀tdengine的時候,第一個字段ts會被轉換為decimal,但是存儲時直接存decimal tdengine是不認的,
所以需要將ts進行類型轉換
jdbccdf.select(($"ts" / 1000000).cast(TimestampType).as("ts"), $"level", $"content", $"ipaddr")
.write.format("jdbc")
.option("url", "jdbc:TAOS://192.168.1.151:6030/test?charset=UTF-8&locale=en_US.UTF-8")
.option("driver", "com.taosdata.jdbc.TSDBDriver")
.option("dbtable", "log2")
.option("user", "root")
.option("password", "taosdata")
.mode(SaveMode.Append)
.save()
5.spark yarn 模式運行tdengine
上面的測試都是基於maser 為local測試的,如果以yarn模式運行,則在每個節點上都安裝tdengineclient那是不現實的,
查看taos-jdbcdriver的代碼,發現,driver會執行System.load("taos"),也就是說只要java.library.path 中存在
libtaos.so,程序就可正常運行,不必安裝tdengine的客戶端,因為java.library.path是在jvm啟動時候就設置好的,要
更改它的值,可以采用動態加載,采用如下方法
解決了加載libtaos.so的問題
1.將driver端libtaos.so發送到各個executor
spark.sparkContext.addFile("/path/to/libtaos.so")
2.重寫spark 中JdbcUtils類中的createConnectionFactory方法 ,添加loadLibrary(new File(SparkFiles.get("libtaos.so")).getParent)
進行java.library.path的動態加載
def createConnectionFactory(options: JDBCOptions): () => Connection = {
val driverClass: String = options.driverClass
() => {
loadLibrary(new File(SparkFiles.get("libtaos.so")).getParent)
DriverRegistry.register(driverClass)
val driver: Driver = DriverManager.getDrivers.asScala.collectFirst {
case d: DriverWrapper if d.wrapped.getClass.getCanonicalName == driverClass => d
case d if d.getClass.getCanonicalName == driverClass => d
}.getOrElse {
throw new IllegalStateException(
s"Did not find registered driver with class $driverClass")
}
driver.connect(options.url, options.asConnectionProperties)
}
}
3.loadLibrary方法如下
def loadLibrary(libPath: String): Unit = {
var lib = System.getProperty("java.library.path")
val dirs = lib.split(":")
if (!dirs.contains(libPath)) {
lib = lib + s":${libPath}"
System.setProperty("java.library.path", lib)
val fieldSysPath = classOf[ClassLoader].getDeclaredField("sys_paths")
fieldSysPath.setAccessible(true)
fieldSysPath.set(null, null)
}
}
在yarn 模式下一定要給url設置 charset 和 locale ,如charset=UTF-8&locale=en_US.UTF-8,否則container可能會異常退出
6.libtaos.so 其他加載方式
本來還嘗試了jna加載libtaos.so的方式,此方式只需將libtaos.so 放入項目resources 中,程序變回自動搜索到so文件,奈何
不會改tdengine中c的代碼
作者介紹:
董鴻飛,大數據開發工程師,2015年加入上海理想大數據實施部,工作至今。目前主要負責公司數據總線產品設計和開發。