Spark DataFrame ---- PySpark Assignment 6


Exercises:

1. Copy the following JSON-formatted data to your Linux system and save it as employee.json.

{ "id":1 , "name":" Ella" , "age":36 }

{ "id":2, "name":"Bob","age":29 }

{ "id":3 , "name":"Jack","age":29 }

{ "id":4 , "name":"Jim","age":28 }

{ "id":4 , "name":"Jim","age":28 }

{ "id":5 , "name":"Damon" }

{ "id":5 , "name":"Damon" }

Create a DataFrame from employee.json and write Python statements to perform the following operations:

>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.getOrCreate()
>>> df = spark.read.json('file:///usr/local/spark/mycode/dafaframe/employee.json')
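An optional check of the inferred schema (not required by the exercise); spark.read.json infers the types and lists the fields alphabetically:

>>> df.printSchema()
root
 |-- age: long (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)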

1. Query all data;

>>> df.show()

2. Query all data, removing duplicate rows;

>>> df.distinct().show()
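dropDuplicates() is an equivalent alternative; with a column subset it deduplicates on those columns only (illustrative, beyond what the exercise asks):

>>> df.dropDuplicates().show()
>>> df.dropDuplicates(['id']).show()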

3. Query all data, omitting the id column from the output;

>>> df.select(df.name, df.age).show()
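Dropping the column is an equivalent alternative to selecting the remaining ones:

>>> df.drop('id').show()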

4. Filter out the records with age > 30;

>>> df.filter(df.age > 30).show()

5. Group the data by age;

>>> df.groupBy('age').count().show()

6. Sort the data by name in ascending order;

>>> df.sort(df.name.asc()).show()

7. Retrieve the first 3 rows;

>>> df.show(3)
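If the rows are needed as Python objects rather than just printed, take() returns a list of Row (output shown for the sample data, in the inferred alphabetical column order):

>>> df.take(3)
[Row(age=36, id=1, name=' Ella'), Row(age=29, id=2, name='Bob'), Row(age=29, id=3, name='Jack')]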

8. Query the name column of all records, aliased as username;

>>> df.select(df.name.alias('username')).show()

9. Query the average of age;

>>> df.groupBy().avg('age').collect()[0].asDict()['avg(age)']
30.0

10. Query the minimum of age.

>>> df.groupBy().min('age').collect()[0].asDict()['min(age)']
28
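Both aggregates can also be computed with pyspark.sql.functions, an equivalent alternative to the empty groupBy() calls above:

>>> from pyspark.sql import functions as F
>>> df.agg(F.avg('age'), F.min('age')).show()
+--------+--------+
|avg(age)|min(age)|
+--------+--------+
|    30.0|      28|
+--------+--------+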

2. Write a program that converts an RDD to a DataFrame

The source file has the following content (fields: id, name, age):

1,Ella,36

2,Bob,29

3,Jack,29

Copy the data to your Linux system and save it as employee.txt. Convert the RDD into a DataFrame and print every row of the DataFrame in the format "id:1,name:Ella,age:36". Write out the program code.

from pyspark.sql import SparkSession, Row

spark = SparkSession.builder.appName('employee').getOrCreate()
sc = spark.sparkContext

# Read the source file and drop any empty lines
lines = sc.textFile('file:///usr/local/spark/mycode/data/employee.txt')
result1 = lines.filter(lambda line: len(line.strip()) > 0)
result2 = result1.map(lambda x: x.split(','))

# Convert the RDD into a DataFrame
item = result2.map(lambda x: Row(id=x[0], name=x[1], age=x[2]))
df = spark.createDataFrame(item)

# Print each row in the required "id:...,name:...,age:..." format
df.rdd.map(lambda p: 'id:{},name:{},age:{}'.format(p.id, p.name, p.age)).foreach(print)
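Given the three input rows, the final map/foreach step prints (row order may vary with partitioning):

id:1,name:Ella,age:36
id:2,name:Bob,age:29
id:3,name:Jack,age:29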

3. Using the chinese_year file, count the number of programs of each type in each year and print (program type, count, year). Sort by program type in ascending order first, and by year in ascending order when the types are equal. Implement this with both Spark RDD and Spark SQL. Write out the code for each and attach screenshots.

sortTypeRDD.py

from pyspark import SparkConf, SparkContext
from operator import gt


class SecondSortKey():
    """Composite key: sort by program type first, then by year."""
    def __init__(self, k):
        self.name = k[0]  # program type
        self.year = k[1]  # year

    def __gt__(self, other):
        if other.name == self.name:
            return gt(self.year, other.year)
        else:
            return gt(self.name, other.name)


def main():
    conf = SparkConf().setAppName('spark_sort').setMaster('local')
    sc = SparkContext(conf=conf)
    data = sc.textFile('file:///usr/local/spark/mycode/data/chinese_year.txt')
    # Key each record by (type, year), count occurrences, then sort with the composite key
    rdd = data.map(lambda x: (x.split('\t')[1], x.split('\t')[0]))\
              .map(lambda x: (x, 1))\
              .reduceByKey(lambda a, b: a + b)\
              .map(lambda x: (SecondSortKey(x[0]), x[0][0] + ',' + str(x[1]) + ',' + x[0][1]))\
              .sortByKey(True)\
              .map(lambda x: x[1])
    rdd.foreach(print)


if __name__ == '__main__':
    main()
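A quick local check of the composite key, using hypothetical sample values rather than data from chinese_year.txt. sorted() compares with <, and since __lt__ is undefined Python falls back to the reflected __gt__ of the other operand:

keys = [SecondSortKey(('song', '2012')),
        SecondSortKey(('dance', '2011')),
        SecondSortKey(('song', '2011'))]
for k in sorted(keys):
    print(k.name, k.year)  # dance 2011, song 2011, song 2012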


SortTypeSql.py

from pyspark.sql import SparkSession, Row

# Read the text file
spark = SparkSession.builder.appName('topNSQL').getOrCreate()
sc = spark.sparkContext
lines = sc.textFile('file:///usr/local/spark/mycode/data/chinese_year.txt')
result1 = lines.filter(lambda line: (len(line.strip()) > 0) and (len(line.split('\t')) == 4))
result2 = result1.map(lambda x: x.split('\t'))

# Convert the RDD into a DataFrame
item = result2.map(lambda x: Row(year=x[0], type=x[1], program=x[2], performers=x[3]))
df = spark.createDataFrame(item)

df.createOrReplaceTempView('items')
df1 = spark.sql('select type, count(*), year from items group by type, year order by type, year')
df1.show()
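The same query expressed with the DataFrame API instead of a SQL string (an equivalent alternative, not part of the original submission):

df.groupBy('type', 'year').count() \
  .select('type', 'count', 'year') \
  .orderBy('type', 'year') \
  .show()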
