一。讀寫Parquet(DataFrame)
Spark SQL可以支持Parquet、JSON、Hive等數據源,並且可以通過JDBC連接外部數據源。前面的介紹中,我們已經涉及到了JSON、文本格式的加載,這里不再贅述。這里介紹Parquet,下一節會介紹JDBC數據庫連接。
Parquet是一種流行的列式存儲格式,可以高效地存儲具有嵌套字段的記錄。Parquet是語言無關的,而且不與任何一種數據處理框架綁定在一起,適配多種語言和組件,能夠與Parquet配合的組件有:
* 查詢引擎: Hive, Impala, Pig, Presto, Drill, Tajo, HAWQ, IBM Big SQL
* 計算框架: MapReduce, Spark, Cascading, Crunch, Scalding, Kite
* 數據模型: Avro, Thrift, Protocol Buffers, POJOs
Spark已經為我們提供了parquet樣例數據,就保存在“/usr/local/spark/examples/src/main/resources/”這個目錄下,有個users.parquet文件,這個文件格式比較特殊,如果你用vim編輯器打開,或者用cat命令查看文件內容,肉眼是一堆亂七八糟的東西,是無法理解的。只有被加載到程序中以后,Spark會對這種格式進行解析,然后我們才能理解其中的數據。
下面代碼演示了如何從parquet文件中加載數據生成DataFrame。
>>> parquetFileDF = spark.read.parquet("file:///usr/local/spark/examples/src/main/resources/users.parquet" >>> parquetFileDF.createOrReplaceTempView("parquetFile") >>> namesDF = spark.sql("SELECT * FROM parquetFile") >>> namesDF.rdd.foreach(lambda person: print(person.name)) Alyssa Ben
>>> peopleDF = spark.read.json("file:///usr/local/spark/examples/src/main/resources/people.json") >>> peopleDF.write.parquet("file:///usr/local/spark/mycode/newpeople.parquet")
二。jdbc
>>> jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost:3306/spark").option("driver","com.mysql.jdbc.Driver").option("dbtable", "student").option("user", "root").option("password", "hadoop").load() >>> jdbcDF.show()
>>> from pyspark.sql.types import Row >>> from pyspark.sql.types import StructType >>> from pyspark.sql.types import StructField >>> from pyspark.sql.types import StringType >>> from pyspark.sql.types import IntegerType >>> studentRDD = spark.sparkContext.parallelize(["3 Rongcheng M 26","4 Guanhua M 27"]).map(lambda line : line.split(" ")) //下面要設置模式信息 >>> schema = StructType([StructField("name", StringType(), True),StructField("gender", StringType(), True),StructField("age",IntegerType(), True)]) >>> rowRDD = studentRDD.map(lambda p : Row(p[1].strip(), p[2].strip(),int(p[3]))) //建立起Row對象和模式之間的對應關系,也就是把數據和模式對應起來 >>> studentDF = spark.createDataFrame(rowRDD, schema) >>> prop = {} >>> prop['user'] = 'root' >>> prop['password'] = 'hadoop' >>> prop['driver'] = "com.mysql.jdbc.Driver" >>> studentDF.write.jdbc("jdbc:mysql://localhost:3306/spark",'student','append', prop)