python之路 之一pyspark


pip包下載安裝pyspark

pip install pyspark  這里可能會遇到安裝超時的情況   加參數  --timeout=100

pip   -default   -timeout=100     install -U pyspark 

下面是我寫的一些代碼,在運行時,沒什么問題,但是目前不知道怎么拿到rdd與dataframe中的值 

from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext,Row,DataFrame
from pyspark.sql.types import StructType,StructField,StringType,IntegerType



appname = "myappname"
master = "local"
myconf = SparkConf().setAppName(appname).setMaster(master)
sc = SparkContext(conf=myconf)
hc = HiveContext(sc)


# 構建一個表格 Parallelize a list and convert each line to a Row 將列表並行化並將每行轉換為一行
# 構建表可以用applySchema 或者 inferSchema inferSchema已經在1.5之后棄用,由createDataFrame代替
datas = ["1 b 28", "3 c 30", "2 d 29"]
source = sc.parallelize(datas)

splits = source.map(lambda line: line.split(" ")) # 后面是注釋
rows = splits.map(lambda words : Row(id=int(words[0]),name=words[1],age=int(words[2])))

myrows = Row(id="a",name="zhangkun",age="28")
#print(myrows.__getitem__(0))
#print(myrows.__getitem__(1))
#print(myrows.__getitem__(2))

# Infer the schema,and register the schema as a table 推斷架構,並將架構注冊為表
fields=[]
fields.append(StructField("id", IntegerType(), True))
fields.append(StructField("name", StringType(), True))
fields.append(StructField("age", IntegerType(), True))
schema = StructType(fields)
people=hc.createDataFrame(myrows,schema); # 1.5之前使用的是inferSchema
# people.printSchema()
people.registerTempTable("people")
# SQL can be run over SchemaRDD that been registered as a table sql 可以在注冊過的表上正常運行了
results=hc.sql("select * from people")

#print(results.show)
for i in results :
print(i)
sc.stop()


突然來個新任務,CDH部署大數據分布式平台 ,含以下組建安裝:hadoop、hbase、hive、kafka、spark 暫時上面的線擱置,等回頭用到在看,主要還是本人基礎比較差,需要多學習一些基礎。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM