pip包下載安裝pyspark
pip install pyspark 這里可能會遇到安裝超時的情況 加參數 --timeout=100
pip -default -timeout=100 install -U pyspark
下面是我寫的一些代碼,在運行時,沒什么問題,但是目前不知道怎么拿到rdd與dataframe中的值
from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext,Row,DataFrame
from pyspark.sql.types import StructType,StructField,StringType,IntegerType
appname = "myappname"
master = "local"
myconf = SparkConf().setAppName(appname).setMaster(master)
sc = SparkContext(conf=myconf)
hc = HiveContext(sc)
# 構建一個表格 Parallelize a list and convert each line to a Row 將列表並行化並將每行轉換為一行
# 構建表可以用applySchema 或者 inferSchema inferSchema已經在1.5之后棄用,由createDataFrame代替
datas = ["1 b 28", "3 c 30", "2 d 29"]
source = sc.parallelize(datas)
splits = source.map(lambda line: line.split(" ")) # 后面是注釋
rows = splits.map(lambda words : Row(id=int(words[0]),name=words[1],age=int(words[2])))
myrows = Row(id="a",name="zhangkun",age="28")
#print(myrows.__getitem__(0))
#print(myrows.__getitem__(1))
#print(myrows.__getitem__(2))
# Infer the schema,and register the schema as a table 推斷架構,並將架構注冊為表
fields=[]
fields.append(StructField("id", IntegerType(), True))
fields.append(StructField("name", StringType(), True))
fields.append(StructField("age", IntegerType(), True))
schema = StructType(fields)
people=hc.createDataFrame(myrows,schema); # 1.5之前使用的是inferSchema
# people.printSchema()
people.registerTempTable("people")
# SQL can be run over SchemaRDD that been registered as a table sql 可以在注冊過的表上正常運行了
results=hc.sql("select * from people")
#print(results.show)
for i in results :
print(i)
sc.stop()
突然來個新任務,CDH部署大數據分布式平台 ,含以下組建安裝:hadoop、hbase、hive、kafka、spark 暫時上面的線擱置,等回頭用到在看,主要還是本人基礎比較差,需要多學習一些基礎。