SparkConf和SparkContext


任何 Spark程序都是SparkContext開始的,SparkContext的初始化需要一個SparkConf對象,SparkConf包含了Spark集群配置的各種參數。
初始化后,就可以使用SparkContext對象所包含的各種方法來創建和操作RDD和共享變量。
Scala:
val conf = new SparkConf().setMaster("master").setAppName("appName")
val sc = new SparkContext(conf)
或者
val sc = new SparkContext("master","appName")
 

通過創建SparkConf對象來配置應用,然后基於這個SparkConf創建一個SparkContext對象。驅動器程序通過SparkContext對象來訪問Spark。

這個對象代表對計算集群的一個連接。一旦有了SparkContext, 就可以用它來創建RDD。

Java:

SparkConf conf = new SparkConf().setMaster("local").setAppName("My App");
JavaSparkContext sc = new JavaSparkContext(conf);

只需傳遞兩個參數:
集群URL:告訴Spark如何連接到集群上。例子中使用local可以讓Spark運行在單機單線程上而無需連接到集群。
應用名:例子中使用My App。當連接到一個集群時,這個值可以幫助我們在集群管理器的用戶界面中找到應用。

 

在Spark2.0之前, SparkContext 是所有 Spark 功能的結構, 驅動器(driver) 通過SparkContext 連接到集群 (通過resource manager), 因為在2.0之前, RDD就是Spark的基礎。

如果需要建立SparkContext,則需要SparkConf,通過Conf來配置SparkContext的內容。

在Spark2.0之后,Spark Session也是Spark 的一個入口, 為了引入dataframe和dataset的API, 同時保留了原來SparkContext的functionality,

如果想要使用 HIVE,SQL,Streaming的API, 就需要Spark Session作為入口。

SparkSession spark = SparkSession.builder().appName("demo_spark").enableHiveSupport().getOrCreate();

 

1)SparkSession.builder() 創建此方法用於構造SparkSession。

2)master(“local”) 設置要連接的master URL,例如:

“local”在本地運行
“local[4]”以4核在本地運行
“spark://master:7077”在spark獨立集群上運行

3)appName( ) 設置將在spark Web UI中顯示的應用程序的名稱。如果未設置應用程序名稱,則將使用隨機生成的名稱。

4)Config 設置使用此方法設置的配置選項會自動傳遞到'SparkConf'和'SparkSession'自己的配置,它的參數由鍵值對組成。

5)enableHiveSupport啟用Hive支持,類似於HiveContext創建了sparkSession,我們可以用它來讀取數據。

6)getOrCreate()方法表示有就拿過來,沒有就創建,類似於單例模式。

 

使用SparkSession讀取數據SparkSession是讀取數據的入口點,類似於舊的SQLContext.read。以下代碼使用SparkSession從CSV讀取數據:

 val df = spark.read.format("com.databricks.spark.csv")                 
.schema(customSchema)                   
.load("data.csv")

  

從Spark 2.0.0開始,最好使用SparkSession,因為它提供了對sparkContext所具有的所有spark功能的訪問。 此外,它還提供了用於處理DataFrame和DataSet的API

運行SQL查詢SparkSession可用於對數據執行SQL查詢,將結果作為Data-Frame(即數據集[ROW])返回。

 

眾所周知,在以前的版本中,sparkcontext 是spark的入口點,因為RDD是主要的API,它是使用上下文API創建和操作的。 對於每個其他API,我們需要使用不同的context。

對於流式傳輸,我們需要streamingContext。 對於SQL sqlContext和hive hiveContext.,因為dataSet和DataFrame API正在成為新的獨立API,我們需要為它們構建入口點。 因此在spark 2.0中,我們為DataSet和DataFrame API創建了一個新的入口點構建,稱為Spark-Session。

 它是SQLContext,HiveContext和未來的streamingContext的組合。 在這些context中可用的所有API都可以在SparkSession上獲得,SparkSession也有實際計算的spark context 。

 SparkSession: SparkSession實質上是SQLContext和HiveContext的組合(未來可能還會加上StreamingContext),

                               所以在SQLContext和HiveContext上可用的API在SparkSession上同樣是可以使用的。SparkSession內部封裝了sparkContext,所以計算實際上是由sparkContext完成的。

Scala:

val sparkSession = SparkSession.builder
                    .master("master")
                    .appName("appName")
                    .getOrCreate()
或者
SparkSession.builder.config(conf=SparkConf())

  

 

I、 SparkSubmit 的 shell腳本

/data/spark/spark-2.2.0-bin-hadoop2.7/bin/spark-submit

--master spark://elcndc2sc39t:7077

--class com.enc.analysis.core.AlgorithmExecute

/data/upload/analysis/analysisFrame-1.1.0.jar $1 $2

-----------------------------------------------------------------

--master表示master路徑,

--class表示入口的類的全路徑

/data/upload/analysis/analysisFrame-1.1.0.jar 表示計算框架jar包的全路徑

$1,$2..是自定義的shell命令進行傳參,傳遞的參數會在入口類的main方法的String[] args中

 

II、 利用Spark讀取jdbc

Properties connectionProperties = new Properties();

String url = "jdbc:mysql://" + "mysql服務器地址" + ":" + "mysql端口" + "/" + "數據庫名?useUnicode=true&characterEncoding=utf-8";

String driver = "com.mysql.jdbc.Driver";

connectionProperties.setProperty("user", "用戶名");// 設置用戶名

connectionProperties.setProperty("password", "密碼");// 設置密碼

connectionProperties.setProperty("driver", driver);

connectionProperties.setProperty("url",url);

SparkSession spark = SparkSessionUtils.getLocalSession();

Dataset<Row> dataset = spark.read().jdbc(connectionProperties.getProperty("url"),"表名",connectionProperties).persist();

dataset.show();

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM