spark DataFrame 讀寫和保存數據


一。讀寫Parquet(DataFrame) 

  Spark SQL可以支持Parquet、JSON、Hive等數據源,並且可以通過JDBC連接外部數據源。前面的介紹中,我們已經涉及到了JSON、文本格式的加載,這里不再贅述。這里介紹Parquet,下一節會介紹JDBC數據庫連接。

  Parquet是一種流行的列式存儲格式,可以高效地存儲具有嵌套字段的記錄。Parquet是語言無關的,而且不與任何一種數據處理框架綁定在一起,適配多種語言和組件,能夠與Parquet配合的組件有:
    * 查詢引擎: Hive, Impala, Pig, Presto, Drill, Tajo, HAWQ, IBM Big SQL
    * 計算框架: MapReduce, Spark, Cascading, Crunch, Scalding, Kite
    * 數據模型: Avro, Thrift, Protocol Buffers, POJOs
  Spark已經為我們提供了parquet樣例數據,就保存在“/usr/local/spark/examples/src/main/resources/”這個目錄下,有個users.parquet文件,這個文件格式比較特殊,如果你用vim編輯器打開,或者用cat命令查看文件內容,肉眼是一堆亂七八糟的東西,是無法理解的。只有被加載到程序中以后,Spark會對這種格式進行解析,然后我們才能理解其中的數據。
  下面代碼演示了如何從parquet文件中加載數據生成DataFrame。

>>> parquetFileDF = spark.read.parquet("file:///usr/local/spark/examples/src/main/resources/users.parquet"
>>> parquetFileDF.createOrReplaceTempView("parquetFile")
 
>>> namesDF = spark.sql("SELECT * FROM parquetFile")
 
>>> namesDF.rdd.foreach(lambda person: print(person.name))
 
Alyssa
Ben
 

 

>>> peopleDF = spark.read.json("file:///usr/local/spark/examples/src/main/resources/people.json")
 
>>> peopleDF.write.parquet("file:///usr/local/spark/mycode/newpeople.parquet")
 

 

二。jdbc

  

>>> jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost:3306/spark").option("driver","com.mysql.jdbc.Driver").option("dbtable", "student").option("user", "root").option("password", "hadoop").load()
>>> jdbcDF.show()

 

>>> from pyspark.sql.types import Row
>>> from pyspark.sql.types import StructType
>>> from pyspark.sql.types import StructField
>>> from pyspark.sql.types import StringType
>>> from pyspark.sql.types import IntegerType
>>> studentRDD = spark.sparkContext.parallelize(["3 Rongcheng M 26","4 Guanhua M 27"]).map(lambda line : line.split(" "))
//下面要設置模式信息
>>> schema = StructType([StructField("name", StringType(), True),StructField("gender", StringType(), True),StructField("age",IntegerType(), True)])
>>> rowRDD = studentRDD.map(lambda p : Row(p[1].strip(), p[2].strip(),int(p[3])))
//建立起Row對象和模式之間的對應關系,也就是把數據和模式對應起來
>>> studentDF = spark.createDataFrame(rowRDD, schema)
>>> prop = {}
>>> prop['user'] = 'root'
>>> prop['password'] = 'hadoop'
>>> prop['driver'] = "com.mysql.jdbc.Driver"
>>> studentDF.write.jdbc("jdbc:mysql://localhost:3306/spark",'student','append', prop)

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM