spark知識點


  來自官網DataFrames、DataSets、SQL,即sparkSQL模塊。

  spark2.0之前,主要的數據格式是RDD(彈性分布式數據集)。spark2.0之后,使用Dataset代替RDD;再,Datasets在Python中是Datasets[Row],故稱之為DataFrame,與Python保持一致。

  Dataset API只適用於Scala和Java,使用列名來組織Dataset就是DataFrame,類似於關系型數據庫中的表或者Python中的dataframe,且在后台擁有更豐富的優化機制;DataFrame API適用於Scala、Java、Python、R。

  Spark SQL初識:

  初始化一個sparkSession,這是spark中所有功能的切入點

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \  #builder這里無括號
    .master("local") \  #要連接的主機名
   .appName("Python Spark SQL basic example") \ #CappName只是顯示在spark web 中,與本程序名並無關聯 .config("spark.some.config.option", "some-value") \ #對於一個(key,value)對,忽略參數名 .getOrCreate() #如果已有SparkSession則get,否則Create

   

  spark.select用於篩選特定的行,spark.sql用於數據庫查詢。還可以groupby分組、filter過濾。

  方法調用:

  1)join,同SQL的聯合查詢,1個表來調用join,參數是另一個表、相等的字段、連接的方式(inner、outer)

  

people = sqlContext.read.parquet("...")  #兩個表,people和department
department = sqlContext.read.parquet("...")

people.filter(people.age > 30).join(department, people.deptId == department.id) \  #連接的字段,倒是比SQL簡潔不少
  .groupBy(department.name, "gender").agg({"salary": "avg", "age": "max"})      #groupby分組,分組之后要設置聚合的字段和方式,agg也可單獨使用

  2)類 Row :可直接定義一個row同時賦值,也可先聲明列名再賦值

>>> row = Row(name="Alice", age=11)
>>> row
Row(age=11, name='Alice')
>>> row['name'], row['age'] #類似字典
('Alice', 11)
>>> row.name, row.age  #類似屬性,dataframe取某一列也是這兩種方式
('Alice', 11)
>>> 'name' in row
True
>>> 'wrong_key' in row
False

>>> Person = Row("name", "age")  #先聲明,其后再賦值
>>> Person
<Row(name, age)>
>>> 'name' in Person
True
>>> 'wrong_key' in Person
False
>>> Person("Alice", 11)
Row(name='Alice', age=11)

  3)利用反射來推測數據模式(數據類型):

from pyspark.sql import Row  #sql是個模塊,而Row是個類,故不能直接 import pyspark.sql.Row

sc = spark.sparkContext   #創建上下文

# 載入文件並轉換為Row。people文件中內容是Michael, 29  Andy, 30  Justin, 19
lines = sc.textFile("examples/src/main/resources/people.txt")  #可以讀HDFS文件、本地文件或支持Hadoop文件系統的URI,返回RDD的字符串
parts = lines.map(lambda l: l.split(","))  #以逗號分隔
people = parts.map(lambda p: Row(name=p[0], age=int(p[1]))) #每行轉換為Row

# 推斷數據模式,把Dataframe注冊為數據庫表.
schemaPeople = spark.createDataFrame(people)
schemaPeople.createOrReplaceTempView("people")

# DataFrames注冊為數據庫表之后,可以使用SQL.
teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

# SQL查詢的結果是Dataframe對象.
# rdd則返回類:`pyspark.RDD` of :class:`Row`.
teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect()
for name in teenNames:
    print(name)
# Name: Justin

  4)利用程序化方式來推測數據模式(數據類型):

from pyspark.sql.types import *   #數據類型

sc = spark.sparkContext

# 載入數據並轉換為Row,此處未給列名.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
# Each line is converted to a tuple.
people = parts.map(lambda p: (p[0], p[1].strip()))

# schema編碼為string.
schemaString = "name age"

#StructField是個類,參數有字段名,數據類型,是否可以為空;那么fields就是個對象(實例);StructType也是個類,與StructField的數據類型保持一致,可迭代
#StringType也是個類,還有BinaryType、 BooleanType等。此處把name、age傳進去。 fields
= [StructField(field_name, StringType(), True) for field_name in schemaString.split()] schema = StructType(fields) # 把schema應用到RDD. schemaPeople = spark.createDataFrame(people, schema) # 創建臨時視圖 schemaPeople.createOrReplaceTempView("people") # SQL can be run over DataFrames that have been registered as a table. results = spark.sql("SELECT name FROM people") results.show() # +-------+ # | name| # +-------+ # |Michael| # | Andy| # | Justin| # +-------+

  ???為什么要推斷數據模式,因為原始的數據沒有列名和數據類型,定義了列名和數據類型之后,就是DataFrame了,然后方便處理數據?

 

 

  ???嘗試編代碼時,總是報錯,如下:

  

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:58808)
Traceback (most recent call last):
File "C:\Users\dell\Anaconda3\lib\site-packages\py4j\java_gateway.py", line 827, in _get_connection
connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
File "C:\Users\dell\Anaconda3\lib\site-packages\py4j\java_gateway.py", line 963, in start
self.socket.connect((self.address, self.port))
ConnectionRefusedError: [WinError 10061] 由於目標計算機積極拒絕,無法連接。

  且在這里記下,以后找到解決方法再說。

  奇怪的是,wordCount的程序第一次運行時正常,給出結果,在運行時就一直報上述錯誤。貌似是Java服務只能連上一會兒,之后就斷開了。

  jdk的環境變量按照網上說的,反復確認,但仍然報錯。一開始裝的jdk9,后又改成jdk1.8。

 

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM