python操作Spark常用命令


1. 獲取SparkSession

spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()

 

2. 獲取SparkContext  

1. 獲取sparkSession:  se = SparkSession.builder.config(conf = SparkConf()).getOrCreate()
1. 獲取sparkContext: sc = se.sparkContext
2. 獲取sqlContext: sq = SparkSession.builder.getOrCreate()
3. 獲取DataFrame: df = sqlContext.createDataFrame(userRows)

 

3. 讀取文件

line1 = sc.textFile("hdfs://192.168.88.128:9000/hello.txt")
rawData = sc.textFile("hdfs://192.168.88.128:9000/data/sanxi/sanxi/*.gz")   獲取sanxi文件夾下所有.gz的文件
rawData = sc.textFile("file:///data/sanxi2/*.gz") spark 讀取本地文件

 

4. filter 使用方法

1. 過濾包含指定字符的RDD
  line2 = line1.filter(lambda x : "a" in x)
2. 接收一個函數, 將滿足該函數的元素放入新的RDD中
   def hasHWTC1AC5C088(line):
    return "HWTC1AC5C088" in line
   lines2 = lines.filter(hasHWTC1AC5C088("HWTC1AC5C088"))  #將函數傳入filter中
3. RDD 刪除第一條數據
   header = abc.first()
   df1 = abc.filter(lambda x:x != header)

 

5. map 和 flatMap 使用方法   將 lambda 函數做用在每一條記錄上

1)line2 = line1.map(lambda x: x.split(" "))

 2)line3 = line1.map(lambda x: x+"abc")  #對原數據進行任意操作,  將結果再放回給原數據

  3)line4 = line1.map(lambda x: (x, 1))  將原始數據改為 key-value形式,  key為原數據,  value為 1

 4)line2.flatMap(lambda line: line.split(" "))  #  

 

 5)map 與 flatMap 的區別(通常用來統計單詞個數示例,  必須使用flatMap來進行拆分單詞)

   map 具有分層,   就是每一行數據作為你一層來處理  ,  結果為: 

    [[u'extends', u'Object'], [u'implements', u'scala.Serializable']]   

   flatMap 不具有分層,   

    [u'extends', u'Object', u'implements', u'scala.Serializable']

  6)map 獲取前3列數據   下例中:  [:3]  表示從開頭到第三個數據項,   如果是[3:]  就表示從第三項到最后

    Rdd.map(lambda x: x.split(" ")[:3]) 結果:[[u'a', u'1', u'3'], [u'b', u'2', u'4'], [u'd', u'3', u'4']]

  ALS 訓練數據---獲取指定列數據

    ratingsRdd = rawRatings.map(lambda x:(x[0],x[1],x[2])  結果為:

      [(u'196', u'242', u'3'), (u'186', u'302', u'3'), (u'22', u'377', u'1')]

  7) 類型轉換

    Rdd.map(lambda x: float(x[0]))    將第一個字段轉換為 float 類型

  8) 刪除所有的 "" 號  replace(替換),   下列意思是將" 替換成空

    df2 = df1.map(lambda x:x.replace("\"",""))

  9) df2 = RDD.map(lambda x: (x[0],float(x[1]),float(x[2])))   設置一個 key 對應 多個value,  

    df3 = df2.filter(lambda keyValue: keyValue[0] > 2)    操作key

     df3 = df2.filter(lambda keyValue: keyValue[1] > 2)   操作第一個value

     df3 = df2.filter(lambda keyValue: keyValue[2] > 2)    操作第二個value

 6. RDD 類型數據 的查詢方式

print(abc)    打印當前對象
type(Rdd) 獲取當前對象類型
RDD.collect() 將RDD轉換為數組, 結果格式為:([u'{"name":"Michael"}', u'{"name":"Andy", "age":30}', u'{"name":"Justin", "age":19}'])
RDD.count() 查看內容條數
Rdd.printSchema() 查看rdd 列

7. RDD轉換操作  rdd轉list

 list = RDD.collect() 2) list轉RDD RDD = sc.parallelize(list)
3) RDD 調用 map 函數
  (1) RDD1 = RDD2.map(lambda x: x+1) #使用匿名函數操作每條數據 map(lambda x: x.split(","))字符串截取,
map(lambda x: "abc"+x) 重組字符串,

  (2) RDD2 = RDD1.map(addOne) #使用具名函數來操作每條數據(具名函數就是單獨定義一個函數來處理數據) 如下:
      def addOne(x):
      return x.split(",")
      print(lines.map(addOne).collect()) #調用具名函數
4. RDD 調用 filter 函數
  1) intRdd
.filter(lambda x: x>5) #對數字類型的 RDD 進行篩選 intRdd.filter(lambda x: x>5 and x <40) and 表示 並且 的意思, or 表示 或 的意思
  2) stringRdd.filter(lambda x: "abc" in x) #篩選包含 abc 的數據
4. RDD 刪除 重復 元素
  1) intRdd.distinct() #去重
5. 隨機將一個 RDD 通過指定比例 分為 2 個RDD
  1) sRdd = stringRdd.randomSplit([0.4,0.6]) 將 stringRdd 以4:6 分為2個 RDD, 獲取其中一個 RDD 的方法為: sRdd[0]
6. RDD 中 groupBy 分組計算
  1) gRdd = intRdd.groupBy(lambda x: x<2) #將會分為2組, 訪問第一粗: print(sorted(gRdd[0][1])), 方位第二組:
print(sorted(gRdd[1][1]))
  2) 分組並且取別名: gRdd = intRdd.groupBy(lambda x: "a" if(x < 2) else "b"),
    (1)獲取第一組信息: print(gRdd[0][0], sorted(gRdd[0][1]))
    (2)
獲取第二組信息: print(gRdd[1][0], sorted(gRdd[1][1])) 其中, 前半部分 gRdd[1][0] 表示獲取別名 a

7. 使用 union 進行並集運算, intersection 進行並集運算
  1)intRdd1.union(intRdd2) 如: intRdd1 為 1, 3, 1 intRdd2 為 1, 2, 3, 4 則結果為: 1,3,1,1,2,3,4
  2)intRdd1.intersection(intRdd2) 計算 2 個RDD 的交集
  3)intRdd3.subtract(intRdd1) 計算 2 個 Rdd 的差集, 此例表示 intRdd3中有, 但在intRdd1中沒有
  4)intRdd1.cartesian(intRdd2) 計算 笛卡爾積




 

 8.  RDD 動作運算

[1] 讀取元素  
  1
) first() 查看RDD 第一條數據
  2) take(2) 獲取第二條數據
  3) takeOrdered(3) 從小到大排序取出前 3 條數據
  4) intRdd3.takeOrdered(6,key=lambda x: -x) 從大道小排序, 取出前6條數據
[2] 統計功能
  1) intRdd1.stats() 統計 intRdd1, 結果為:(count: 5, mean: 5.0, stdev: 2.82842712475, max: 9, min: 1)
     mean表示平均值,  stdev 表示標准差
  2)intRdd3.min() 最新值,
  3)intRdd3.max() 最大值
  4)intRdd3.stdev() 標准差
  5)intRdd3.count() 數據條數
  6)intRdd3.sum() 求和
  7)intRdd3.mean() 平均值

9.  RDD key-value 基本轉換運算

1)kvRdd1 = sc.parallelize([(1, 4),(2, 5),(3, 6),(4, 7)])  創建RDD key-value 源數據
  結果為: [(1, 4), (2, 5), (3, 6), (4, 7)]
2)kvRdd1.keys() 獲取全部 key 的值
3)kvRdd1.values() 獲取全部 values 的值
4)kvRdd1.filter(lambda keyValue: keyValue[0] > 2) 過濾 key > 2 的數據
5)kvRdd1.filter(lambda keyValue: keyValue[1] >5) 過濾 value > 5 的數據
6)kvRdd1.mapValues(lambda x: x*x) 對每一條 value 進行運算
7)kvRdd1.sortByKey() 按照 key 從小到大 進行排序
8)kvRdd1.sortByKey(ascending=False) 按照 key 從大到小進行排序
9)kvRdd3.reduceByKey(lambda x, y:x+y) 將 key 相同的鍵的值合並相加

 10.  多個 RDD key-value 的轉換運算

1) join
  intK1 = sc.parallelize([(1,5),(2,6),(3,7),(4,8),(5,9)])   intK2 = sc.parallelize([(3,30),(2,20),(6,60)])   intK1.join(intK2) join結果為:   [(2, (6, 20)), (3, (7, 30))]
2)leftJoin
  intK1.leftOuterJoin(intK2).collect() leftJoin結果為:
  [(2, (6, 20)), (4, (8, None)), (1, (5, None)), (3, (7, 30)), (5, (9, None))]
3)rightJoin    rightJoin 結果為:
  intK1.rithtOuterJoin(intK2).collect()
  [(2, (6, 20)), (6, (None, 60)), (3, (7, 30))]
4)subtractByKey 從 intK1 中刪除 與 intK2 相同 key-value
  intK1.subtractByKey(intK2) 結果為:
  [(4, 8), (1, 5), (5, 9)]  

11. key-value 動作 運算

1) intK1.first()    獲取第一項數據
2) intK1.collect() 獲取所有項數據
3) intK1.take(2) 獲取前二項數據
4) intK1.first()[0] 獲取第一項數據的 key
5)
intK1.first()[1] 獲取第一項數據的 value
  例如: 一條記錄結果為 [(2, (6, 20)), (4, (8, None)), (1, (5, None)), (3, (7, 30)), (5, (9, None))](leftJoin結果)
  想要獲取第一條記錄的 6 , 可以使用: intK1.leftOuterJoin(intK2).first()[1][0] [1] 表示獲取第一條記錄的value[0] 表示
  從 value 中再獲取第一項值 6
6) intK3.countByKey() 計算 RDD 中每一個 Key 值得項數, 例如
  [(1, 2), (2, 3), (2, 5), (2, 8), (5, 10)] 源數據
  defaultdict(<type 'int'>, {1: 1, 2: 1, 3: 1, 4: 1, 5: 1}) 結果值
7) KV = intK3.collectAsMap() 將 key-value 轉換為 key-value的字典
  {1: 2, 2: 8, 5: 10} 結果為
  例如, 如果要獲取 8 這個value, 就使用 KV[2] 就可以獲取得到
8) intK3.lookup(2) 查找 key 為 2 的所有value 值, 如果想要再進行統計計算, 就將結果再進行轉換為 RDD 進行統計計算
9) 廣播變量
  1> kvFrult = sc.parallelize([(1, "apple"),(2, "orange"),(3, "grape")]) 創建key-value 對照表
  2> fruitMap = kvFrult.collectAsMap() 轉換為 map 字典
  3> bcFruitMap = sc.broadcast(fruitMap) 創建廣播變量
  4> fruitIds = sc.parallelize([2,4,1,3]) 創建編號 RDD
  5> fruitNames = fruitIds.map(lambda x: bcFruitMap.value[x]) 使用
bcFruitMap.value 進行轉換 從而獲取編號對應的名稱
10) 通過累加器來計算總和
  intRdd = sc.parallelize([1,2,44,2,11,22]) 源數據
  total = sc.accumulator(0.0)  定義一個double類型的累加器, 來計算總和
  num = sc.accumulator(0)  定義一個int類型的累加器, 來計算數量
  intRdd.foreach(lambda l: [total.add(l), num.add(1)])  通過foreach 循環來統計
  total.value 獲取總和
  num.value 獲取個數
  avg = total.vaue/num.value 獲取平均值
11) RDD 持久化

  1.書221 頁面, 設置持久化等級列表
  2.intRdd1.persist()  設置持久化
  2.intRdd1.persist(StorageLevel.MEMORY_AND_DISK)  設置存儲等級
  4.intRdd1.is_cached  查看是否持久化
12) RDD.saveAsTextFile("hdfs://192.168.88.128:9000/data/result.txt") 將結果保存成文件



  

 12 數據格式

1. [[u'3', u'5'], [u'4', u'6'], [u'4', u'5'], [u'4', u'2']] 拆分或截取的原始數據, 可以通過 map 中的 x[0], x[1] 來獲取對應列的數據
  可以通過 map 來轉換為key-value 數據格式 例如: df3 = df2.map(lambda x: (x[0], x[1]))

2. key-value 數據格式
  [(u'3', u'5'), (u'4', u'6'), (u'4', u'5'), (u'4', u'2')] 中每一個() 表示一組數據, 第一個表示key 第二個表示value
3)PipelinedRDD 類型表示 key-value形式數據

 13 RDD類型轉換

userRdd = sc.textFile("D:\data\people.json")
    userRdd = userRdd.map(lambda x: x.split(" "))
    
    userRows = userRdd.map(lambda p:
                            Row(
                                userName = p[0],
                                userAge = int(p[1]),
                                userAdd = p[2],
                                userSalary = int(p[3])
                                )
                            )
    print(userRows.take(4))

結果: [Row(userAdd='shanghai', userAge=20, userName='zhangsan', userSalary=13), Row(userAdd='beijin', userAge=30, userName='lisi', userSalary=15)]

2) 創建 DataFrame
  userDF = sqlContext.createDataFrame(userRows)

14.  通過sql 語句查詢字段

from pyspark.conf import SparkConf
from pyspark.sql.session import SparkSession
from pyspark.sql.types import Row


if __name__ == '__main__':
    spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()
        
    sc = spark.sparkContext
    
    rd = sc.textFile("D:\data\people.txt")
    rd2 = rd.map(lambda x:x.split(","))
    people = rd2.map(lambda p: Row(name=p[0], age=int(p[1])))
    
    peopleDF = spark.createDataFrame(people)
    peopleDF.createOrReplaceTempView("people")
    teenagers = spark.sql("SELECT name,age FROM people where name='Andy'")
    teenagers.show(5)
#     print(teenagers.rdd.collect())
    teenNames = teenagers.rdd.map(lambda p: 100 + p.age).collect()
    
    
    for name in teenNames:
        print(name)
        
    
View Code

15 dateFrame,sql,json使用詳細示例

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
A simple example demonstrating basic Spark SQL features.
Run with:
  ./bin/spark-submit examples/src/main/python/sql/basic.py
"""
from __future__ import print_function

# $example on:init_session$
from pyspark.sql import SparkSession
# $example off:init_session$

# $example on:schema_inferring$
from pyspark.sql import Row
# $example off:schema_inferring$


# $example on:programmatic_schema$
# Import data types
from pyspark.sql.types import *
# $example off:programmatic_schema$


def basic_df_example(spark):
    # $example on:create_df$
    # spark is an existing SparkSession
    df = spark.read.json("/data/people.json")
    # Displays the content of the DataFrame to stdout
    df.show()
    # +----+-------+
    # | age|   name|
    # +----+-------+
    # |null|Michael|
    # |  30|   Andy|
    # |  19| Justin|
    # +----+-------+
    # $example off:create_df$

    # $example on:untyped_ops$
    # spark, df are from the previous example
    # Print the schema in a tree format
    df.printSchema()
    # root
    # |-- age: long (nullable = true)
    # |-- name: string (nullable = true)

    # Select only the "name" column
    df.select("name").show()
    # +-------+
    # |   name|
    # +-------+
    # |Michael|
    # |   Andy|
    # | Justin|
    # +-------+

    # Select everybody, but increment the age by 1
    df.select(df['name'], df['age'] + 1).show()
    # +-------+---------+
    # |   name|(age + 1)|
    # +-------+---------+
    # |Michael|     null|
    # |   Andy|       31|
    # | Justin|       20|
    # +-------+---------+

    # Select people older than 21
    df.filter(df['age'] > 21).show()
    # +---+----+
    # |age|name|
    # +---+----+
    # | 30|Andy|
    # +---+----+

    # Count people by age
    df.groupBy("age").count().show()
    # +----+-----+
    # | age|count|
    # +----+-----+
    # |  19|    1|
    # |null|    1|
    # |  30|    1|
    # +----+-----+
    # $example off:untyped_ops$

    # $example on:run_sql$
    # Register the DataFrame as a SQL temporary view
    df.createOrReplaceTempView("people")

    sqlDF = spark.sql("SELECT * FROM people")
    sqlDF.show()
    # +----+-------+
    # | age|   name|
    # +----+-------+
    # |null|Michael|
    # |  30|   Andy|
    # |  19| Justin|
    # +----+-------+
    # $example off:run_sql$

    # $example on:global_temp_view$
    # Register the DataFrame as a global temporary view
    df.createGlobalTempView("people")

    # Global temporary view is tied to a system preserved database `global_temp`
    spark.sql("SELECT * FROM global_temp.people").show()
    # +----+-------+
    # | age|   name|
    # +----+-------+
    # |null|Michael|
    # |  30|   Andy|
    # |  19| Justin|
    # +----+-------+

    # Global temporary view is cross-session
    spark.newSession().sql("SELECT * FROM global_temp.people").show()
    # +----+-------+
    # | age|   name|
    # +----+-------+
    # |null|Michael|
    # |  30|   Andy|
    # |  19| Justin|
    # +----+-------+
    # $example off:global_temp_view$


def schema_inference_example(spark):
    # $example on:schema_inferring$
    sc = spark.sparkContext

    # Load a text file and convert each line to a Row.
    lines = sc.textFile("examples/src/main/resources/people.txt")
    parts = lines.map(lambda l: l.split(","))
    people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))

    # Infer the schema, and register the DataFrame as a table.
    schemaPeople = spark.createDataFrame(people)
    schemaPeople.createOrReplaceTempView("people")

    # SQL can be run over DataFrames that have been registered as a table.
    teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

    # The results of SQL queries are Dataframe objects.
    # rdd returns the content as an :class:`pyspark.RDD` of :class:`Row`.
    teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect()
    for name in teenNames:
        print(name)
    # Name: Justin
    # $example off:schema_inferring$


def programmatic_schema_example(spark):
    # $example on:programmatic_schema$
    sc = spark.sparkContext

    # Load a text file and convert each line to a Row.
    lines = sc.textFile("examples/src/main/resources/people.txt")
    parts = lines.map(lambda l: l.split(","))
    # Each line is converted to a tuple.
    people = parts.map(lambda p: (p[0], p[1].strip()))

    # The schema is encoded in a string.
    schemaString = "name age"

    fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
    schema = StructType(fields)

    # Apply the schema to the RDD.
    schemaPeople = spark.createDataFrame(people, schema)

    # Creates a temporary view using the DataFrame
    schemaPeople.createOrReplaceTempView("people")

    # SQL can be run over DataFrames that have been registered as a table.
    results = spark.sql("SELECT name FROM people")

    results.show()
    # +-------+
    # |   name|
    # +-------+
    # |Michael|
    # |   Andy|
    # | Justin|
    # +-------+
    # $example off:programmatic_schema$

if __name__ == "__main__":
    # $example on:init_session$
    spark = SparkSession \
        .builder \
        .appName("Python Spark SQL basic example") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()
    # $example off:init_session$

    basic_df_example(spark)
    # schema_inference_example(spark)
    # programmatic_schema_example(spark)

    spark.stop()
View Code

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM