--》org.apache.spark幫助文檔
--》幫助文檔的使用
在左側搜索框中輸入包名
在右側中查看相應函數及其用法
例rdd中的RDD類具有函數repartition
則輸入rdd則會顯示相應類RDD
包-》類-》函數 的搜索
--》sparknet
用於連接caffe需要下載
--》spark-bin-hadoop
含有hadoop的spark
-->
下載程序包,解壓到目錄(設解壓后的為www)
gedit /etc/profile之后
XX_HOME=解壓的目錄/www
PATH=$XX_HOME/bin:$PATH
source /etc/profile
--》spark-submit用於提交任務
運行成功的一個在.submission.sh文件中有如下,運行./submission.sh即可
/home/jswang/hadoop/spark-1.5.2-bin-hadoop2.6/bin/spark-submit --class com.wang.FlumeEventCount --master spark://DELL-DL:7077 --jars /home/jswang/someconf/spark-streaming-flume_2.10-1.6.0.jar,/home/jswang/someconf/flume-ng-sdk-1.6.0.jar /home/jswang/someconf/savelocal.jar 166.111.135.99 33333
--》submit運行自帶例子
./bin/spark-submit --class org.apache.spark.examples.SparkPi --master local[2] lib/spark-examples-1.5.2-hadoop2.6.0.jar 100
--》自己的hello,world及其spark-submit運行
1.新建工程:創建hello文件,下面創建src,lib文件
2.編輯文件:在src中創建hello.scala內容為hw.scala內容為
object Hi{def main(args:Array[String])=println("Hi!")}
3.編譯代碼:在工程目錄下執行sbt package,則在target目錄下的scala-2.10目錄生成了.jar文件
4.運行程序:在工程目錄下執行spark-submit --class Hi target/scala-2.10/hello_2.10-0.1-SNAPSHOT.jar(hello為工程文件夾的名稱)
則會顯示Hi!
--》自己的wordCount及其spark-submit運行
1.新建工程:創建wordCount文件夾,在其下創建,src,lib文件夾
在src下創建 main/scala文件夾,並在其下創建wordCount.scala文件
將spark-1.5.2-bin-hadoop2.6/lib下的spark-assembly-1.5.2-hadoop2.6.0.jar復制到該工程的lib中
2.在wordCount.scala文件中輸入下面的代碼:
import org.apache.spark.{SparkContext,SparkConf}
import org.apache.spark.SparkContext._
object wordCount{
def main(args:Array[String]){
if (args.length == 0) {
System.err.println("Usage bin/spark-submit [options] --class wordCount wordCount.jar <file1:URI>")
System.err.println("Usage bin/spark-submit [options] --class wordCount wordCount.jar hdfs://172.16.1.141:9000/test.txt")
System.exit(1);
}
val conf = new SparkConf().setAppName("WordCount")
val sc = new SparkContext(conf)
val doc = sc.textFile(args(0)) //用textFile來加載一個文件創建RDD
println(doc.first)
println(doc.count)
}
}
3.編譯:回到工程的目錄(必須回到有src的目錄),開始使用下面代碼編譯
sbt package
則會在wordCount下創建target/scala-2.10/wordcount_2.10-0.1-SNAPSHOT.jar
4.運行:
spark-submit --class wordCount target/scala-2.10/wordcount_2.10-0.1-SNAPSHOT.jar
file:///home/zhaodz/sbt/wordCount/count.txt
則會看到 count.txt的第一行
--》啟動spark-shell
直接輸入spark-shell即可,其他參數默認
--》使用spark-shell運行計數
val textFile=sc.textFile("
file:///usr/local/spark-1.5.2-bin-hadoop2.6/README.md")
textFile.first()
textFile.count()
--》在spark-shell中創建DataFrame
1.輸入 spark-shell啟動spark shell
2.分別輸入如下內容
scala》》scala
res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@1c3cfced
scala》val sqlContext = new org.apache.spark.sql.SQLContext(sc)
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@684f9842
scala》val df = sqlContext.read.json("
file:///usr/local/spark-1.5.2-bin-hadoop2.6/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala》》df.show()
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
scala> df.printSchema
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
scala> df.select("name").show()
+-------+
| name|
+-------+
|Michael|
| Andy|
| Justin|
+-------+
就像sql(或者)表格的查詢語句一樣
scala> val df_filter = df.filter("age>20")
scala> df_filter.show
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
就像sql語句的過濾語句
scala>var df2 = df.repartition(2)
scala>df2.foreachPartition(a=>println(a.size))
2
1
3.注意
若是用$SPARK_HOME則軟件中必須設置
--》集合(數組)創建RDD
啟動spark-shell,分別輸入
val data = Array(1,2,3,4,5)
data: Array[Int] = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
distData: org.apache.spark.rdd.RDD[Int]
distData.reduce((a,b)=>a+b)
--》spark常用包
org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext
import
import org.apache.spark.SparkConf
import org.apache.spark.sql.DataFrame
--》saveAsTextFile的理解
>val a = Array(1,2,3,4,5)
>b = sc.parallelize(a)
>b.saveAsTextFile("
file:///home/zhaodz/keshan/keshanrdd")
將會在文件夾中看到很多part-n的文件,其中一些是沒有內容的,其余的分別為1-5個數值的存儲
>b.repartition(1).saveAsTextFile("
file:///home/zhaodz/keshan/keshantwo")
則只會看到一個part-00000文件里面分別有1 2 3 4 5 個數
>b.repartition(3).saveAsTextFile("
file:///home/zhaodz/keshan/keshanthree")
則會看到數據保存到三個文件中
--》對repartition的理解
將一堆平行平等的數據,分成幾個部分,分成多少部分是用戶自定義,至於怎么存儲,是平均存還是一個個的存則交給電腦自行處理(交給worker進行處理,一般為worker的數量,請自行腦補)
--》對foreachrepartition的理解
-經典語錄
spark的一切都是基於RDD的
--》似乎dataFrame不能進行增刪改查,只能進行打碎這些形式上的操作
--》使用mapPartitions進行操作
scala> var rdd1 = sc.makeRDD(1 to 5,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:21
scala> var rdd3 = rdd1.mapPartitions{x=>{
| var result = List[Int]()
| var i= 0
| while(x.hasNext){
| i+=x.next()
| }
| result.::(i).iterator //相當於result.::(i)變成一個新的列表之后,再變成包含列表所有元素的迭代器List.iterator
| }}
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at mapPartitions at <console>:23
res0: Array[Int] = Array(3, 12) 其中3為1和2的和,12為3,4,5的和
顯示數據塊的數量
scala> rdd3.partitions.size
res1: Int = 2
顯示RDD
scala>println(rdd3.collect.mkString(" "))
3 12
--》顯示RDD內容
scala> var rdd1 = sc.makeRDD(1 to 5,2)
顯示全部
scala>rdd1.collect().foreach(println)
1
2
3
4
5
顯示部分行
scala>rdd1.take(2).foreach(println)
1
2
--》RDD的操作
--》將RDD轉化為一個scala數組並返回
scala> val a = sc.makeRDD(1 to 5 ,2)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at makeRDD at <console>:21
scala> a.collect
//將分散的收集起來
res3: Array[Int] = Array(1, 2, 3, 4, 5)
scala> a.count //返回RDD的元素個數
res4: Long = 5
--》org.apache.rdd.RDD函數foreach的使用
scala> var rdd = sc.makeRDD(1 to 10,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at <console>:21
scala> rdd.foreach(println)
6
1
7
2
8
9
3
10
4
5
scala> rdd.collect.foreach(println)
1
2
3
4
5
6
7
8
9
10
--》StructType類的使用
import org.apache.spark.sql.types._
val schema = StructType(StructField("data", ArrayType(FloatType), false) :: StructField("label", IntegerType, false) :: Nil) //::兩側必需有空格,必需有Nil,必須有_(先聲明再使用,_表示代替了所有的名詞)
scala> struct("data").dataType
res1: org.apache.spark.sql.types.DataType = ArrayType(FloatType)
/home/zhaodz/program/spark_source/spark/sql/catalyst/src/main/scala/org/apache/sql/types/StructType.scala
--》SQLContext類的使用
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
或者
import org.apache.spark.sql._
val sqlContext = new SQLContext(sc)
-->RDD類的使用
scala> val a = sc.parallelize(1 to 9, 3) //從普通數組創建RDD,里面包含了9個數字,分別在3個分區
scala>a.saveAsTextFile("
file:///home/zhaodz/keshan/keshankdks") //可看到並行存儲到了三個文件中
scala> val b = a.map(x => x*2) //map函數的使用,map即使用函數進行映射
scala> a.collect
//收集並行的返回scala的數組
res10: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
scala> b.collect
//可看到b是a的映射
res11: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18)
或者通過mkStrig函數進行查看
scala> println(b.collect().mkString(" "))
--》區域org.apache.spark.sql中類的使用
使用方法
1.import法,import org.apache.spark.sql._即可以使用下面所有的類
2.全名稱法 如val sqlContext = new org.apache.spark.sql.SQLContext(sc)
ROW類使用
scala> val row = Row("str1","str2")
row: org.apache.spark.sql.Row = [str1,str2]
scala> row(0)
res10: Any = str1
SQLContext類使用
def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame
DataFrame類使用
def repartition(numPartitions: Int): DataFrame
def cache()
def foreach(f: (Row) ? Unit): Unit
def foreachPartition(f: (Iterator[Row]) ? Unit): Unit
def mapPartitions[R](f: (Iterator[Row]) ? Iterator[R])(implicit arg0: ClassTag[R]): RDD[R]
--》區域org.apache.spark中類的使用
SparkConf類的使用
SparkContext類的使用
def broadcast[T](value: T)(implicit arg0: ClassTag[T]): Broadcast[T]
一般使用方法
import org.apache.spark.{SparkContext,SparkConf}
val conf = new SparkConf().setAppName("WordCount")
val sc = new SparkContext(conf)
sc.parallelize()
-->初始化spark
import org.apache.spark.{SparkContext,SparkConf}
val conf = new SparkConf().setAppName("name")
val sc = new SparkContext(conf)
--》從一般數據創建DataFrame實例
scala> var name = Array("zhao") //都為Array形式
scala> var score = Array(98)
scala> var rdd = sc.parallelize(name.zip(score))
scala> import org.apache.spark.sql.types._
//(下面)表格個列每個單元格的數據類型
scala> var schema = StructType(StructField("name",StringType,false)::StructField("score",IntegerType,false)::Nil)
scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc) //固定
scala> import org.apache.spark.sql.{DataFrame, Row}
//固定
scala> var dataFrame = sqlContext.createDataFrame(rdd.map{case(a,b)=>Row(a,b)},schema)
scala> dataFrame.show
-->從一般數據創建DataFrame實例2
spark-shell中直接復制如下
import org.apache.spark.sql._
import org.apache.spark.sql.types._
val row1 = Row("Bruce Zhang", "developer", 38 )
val row2 = Row("Zhang Yi", "engineer", 39)
val table = List(row1, row2)
val rows = sc.parallelize(table)
val schema = StructType(Array(StructField("name", StringType, true),StructField("role", StringType, true), StructField("age", IntegerType, true)))
val dd = sqlContext.createDataFrame(rows, schema)
dd.show
+-----------+---------+---+
| name| role|age|
+-----------+---------+---+
|Bruce Zhang|developer| 38|
| Zhang Yi| engineer| 39|
+-----------+---------+---+
//固定
--》從list中分割子list
slice(from: Int, until: Int): List[A] 提取列表中從位置from到位置until(不含該位置)的元素列表
scala> val nums = List(1,2,3,4,5)
nums: List[Int] = List(1, 2, 3, 4, 5)
scala> nums.slice(2,4)
res20: List[Int] = List(3, 4)