spark快速大數據分析學習筆記


hadoop環境配置:

系統變量:新建變量HADOOP_HOME,值編輯為D:\sowt\hadoop
Administrator的用戶變量:在PATH中添加bin文件夾位置D:\sowt\hadoop\bin


 報錯排除

運行:

val lines = sc.textFile("README.md")//打開spark文件夾中的README.md文件

lines.count()//計算文件中的行數

報錯:

org.apache.hadoop.mapred.InvalidInputException:Input path does not exist:file:/C:Users/Administrator/README.md

原因:

WIN+R輸入spark-shell是在C:Users/Administrator打開spark-shell,這個文件夾下當然沒有README.md

解決:

打開spark安裝目錄,shfit+右鍵,點擊在此處打開命令行,輸入spark-shell


 概念

運行:

import org.apache.spark.SparkConf

import org.apache.spark.SparkContext

import org.apache.spark.SparkContext._

val conf = new SparkConf().setMaster("local").setAppName("My App")

val sc = new.SparkContext(conf)

最最重要的內容是SparkContext

SparkContext的基本用法,傳遞兩個參數:

集群URL:.setMaster(),告訴Spark如何連接到集群上,使用local可以讓Spark在本地單機單線程運行

應用名:.setAppName(),為Spark應用起名,當連接到集群時,可以使用這個名字在集群管理器的用戶界面找到自己的應用

 初始化SparkContext后,就可以創建RDD來進行操控

關閉Spark可以調用SparkContext的stop()方法,或者System.exit(0)或者sys.exit()退出應用


 RDD是一個不可變的分布式對象集合,每個RDD被分為多個分區,這些分區運行在集群中的不同節點上。

RDD創建:

val lines = sc.textFile("README.md")

RDD轉化生成新的RDD,返回結果是一個RDD,RDD行動操作返回的是其他的數據類型。

轉化操作:

val inputRDD = sc.textFile("log.txt")

val errorsRDD = inputRDD.filter(line => line.contains("error"))


 

map()、filter()、flatMap()的區別

map()接收一個函數如(x => x*x),將這個函數應用於每個元素,生成一個新的RDD,並將函數反回結果作為新RDD中的元素,兩個RDD中包含元素數量不變

filter()接收一個函數如(x!=1),將這個函數應用於每個元素,生成一個新的RDD,並講滿足該函數的結果返回作為新RDD中的元素,兩個RDD中包含元素的數量可能不同

flatMap()接收一個函數如(x => x.split(" ")),會將這個函數應用於每個元素,生成一個新的RDD,將函數返回結果中的內容作為新RDD中的元素,新的RDD中包含元素的數量可能較多


 

數據混洗(shuffle):對網絡中的所有RDD取並集(RDD具有集合的一些特性,但不是嚴格意義上的集合),以確保每個元素只有一份。

常見的偽集合操作會導致數據混洗:distinct(),union(),intersection(),sbutract(),caresian(),


 

使用SparkSQL從ORACLE中獲取數據

更多細節可以參考官方文檔http://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.package

1、使用interllij IDEA編程,新建scala object,在新建的object中新建main函數,所有程序均放置在main函數中

2、新建一個本地spark任務

val spark = SparkSession
.builder()
.master("local[*]")//表示在本地運行spark任務
.appName("Spark SQL basic example")//定義spark任務名稱
.config("spark.some.config.option", "some-value")
.getOrCreate()//獲取一個已有的sparksession,如果沒有則新建一個
val jdbcDF = spark.read//返回一個具有format、option方法的DataFrameReader類型
.format("jdbc")
.option("url", "jdbc:oracle:thin:@***.**.**.**:1521:orcl")//遠程連接數據庫的地址與端口
.option("dbtable", "sys_dept")//需要獲取數據的表名
.option("user", "cougaerptest")//數據庫用戶賬號
.option("driver","oracle.jdbc.driver.OracleDriver")//從oracle中讀取數據,需要加載oracle驅動
.option("password", "******")//數據庫密碼
.load()//從指定的數據庫關系表中加載數據

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM