# -*- coding:utf-8 -*- import sys from pyspark import SparkContext from pyspark.sql.session import SparkSession from pyspark import SQLContext from pyspark.sql.types import * def getkey(line): fields = line.strip().split('\t') key = fields[0] return (key,)
#return [key] def getkey2(line): fields = line.strip().split('\t') key = fields[0] return key sc = SparkContext(appName="zxm:copc") spark = SparkSession(sc) inpath = "xxx/imei_hashv" outpath="xxx/test" imeiRdd = sc.textFile(inpath, use_unicode=False).map(getkey2) #imeiRdd = sc.textFile(inpath, use_unicode=False).map(getkey) schema = StructType([StructField("imei", StringType(), True)]) imeiDf = spark.createDataFrame(imeiRdd, schema) imeiDf.registerTempTable("t2") res = spark.sql("select imei from t2") res.repartition(1).write.format("csv").save(outpath)
(1)用sc.textFile()讀取 inpath的文件成為rdd,文件只有一列,前三行為
13279285433414550239
492335506325762025
12750066214056691161
(2)schema = StructType([StructField("imei", StringType(), True)]) 表示這列的列名是imei,數據類型為 StringType
(3)然后把 rdd + schema 轉換成dataframe,
(4)把dataframe注冊成臨時表t2,以方便使用sql語句。
----------------------------------------------------------------------------------------------------------------------------------------------------
但是在map函數中使用 getkey2 函數總是報錯 StructType can not accept object '13279285433414550239' in type <type 'str'>
原因是schema 這里是一個數組,雖然只有一列。而在 getkey2函數 中 return key 返回的是一個string,spark不能把string 解析成"數組"。改成 return (key, ) 或者 return [key] 即可,這時返回的就是一個只有一列的"數組"了,能與schema對應上。
平時在map函數中多是返回多個值 return a,b,c,d 這樣,這次只返回一個值的時候遇到了問題,查了許多類似的問答才發現。
https://stackoverflow.com/questions/52586199/cannot-create-dataframe-in-pyspark
https://stackoverflow.com/questions/44334326/data-not-being-populated-with-dataframe-pyspark