pyspark 使用時環境設置


 

在腳本中導入pyspark的流程

import os 

import sys

spark_name = os.environ.get('SPARK_HOME',None)

# SPARK_HOME即spark的安裝目錄,不用到bin級別,一般為/usr/local/spark

if not spark_home:

    raise ValueErrorError('spark 環境沒有配置好')

 

# sys.path是Python的第三方包查找的路徑列表,將需要導入的包的路徑添加進入,避免 can't find modal xxxx

# 這個方法應該同 spark-submit提交時添加參數 --py_files='/path/to/my/python/packages.zip',將依賴包打包成zip 添加進去 效果一致

sys.path.insert(0,'/root/virtualenvs/my_envs/lib/python3.6/site-packages/')

sys.path.insert(0,os.path.join(spark_name,'python')

sys.path.insert(0,os.path.join(spark_name,'python/lib/py4j-0.10.7-src.zip'))

# sys.path.insert(0,os.path.join(spark_name,'libexec/python'))

# sys.path.insert(0,os.path.join(spark_name,'libexex/python/build'))

 

from pyspark import SparkConf, SparkContext

 

設置pyspark運行時的python版本

vi ~/.bashrc

export PYSPARK_PYTHON=/usr/local/bin/python3 

export PYSPARK_DRIVER_PYTHON=ipython3

編輯完保存退出

source ~/.bashrc

 

使用pyspark處理hbase缺少jar包時需配置環境

spark加載配置的默認目錄是 SPARK_HOME/conf/spark-env.sh ,不存在此目錄此文件時可自行創建

一般來說在spark-env.sh的末尾需要添加幾行

export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath)   不添加這一行可能導致java class not found 之類的異常

export JAVA_HOME=/usr/java/jdk1.8.0_191-amd64/jre

export HADOOP_HOME=/usr/local/hadoop

export HADOOP_CONF_DIR=/usr/local/hadoop/etc/hadoop

export SPARK_MASTER_HOST=HDP-master

export SPARK_WORKER_CORES=4     設置每個worker最多使用的核數,可設置為機器的內核數

export SPARK_WORKER_MEMORY=4g    設置每個worker最多使用的內存

 

spark處理hbase時需要一些hbase的jar包,可以在SPARK_HOME/jars/下新建一個hbase目錄,然后將HBASE_HOME/lib/下面的相關包都復制過來

(也可單獨復制lib目錄下的這些包 hbase*.jar ,guava-12.0.1.jar,htrace-core-3.1.0-incubating.jar , protobuf-java-2.5.0.jar )

另外需下載把hbase的數據轉換為Python可讀取的jar包 spark-example-1.6.0.jar

(下載頁面地址為https://mvnrepository.com/artifact/org.apache.spark/spark-example_2.11/1.6.0-typesafe-001 )

這樣就需要將spark-env.sh中的SPARK_DIST_CLASSPATH的值修改為

export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath):$(/usr/local/hbase/bin/hbase classpath):/usr/local/spark/jars/hbase/*

 

使用spark讀寫hbase的相關代碼流程

host = 'master,slave1,slave2'

hbase_table = 'TEST:test1'

conf = {"hbase.zookeeper.quorum":host,"hbase.mapreduce.inputtable":hbase_table}

keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"

valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"

# 讀取habse表中的數據到rdd

hbase_rdd = sc.newAPIHadoopRDD("org.apache.hadoop.hbase.mapreduce.TableInputFormat","org.apache.hadoop.hbase.io.ImmutableBytesWritable",

"org.apache.hadoop.hbase.client.Result",keyConverter=keyConv,valueConverter=valueConv,conf=conf)

count = hbase_rdd.count()

one = hbase_rdd.first()            查看rdd的第一條數據tuple(rowkey,'\n'.join(str(json_value)))

one_value = one[1].split('\n')

one_value[1]    形式為'{"qualifier":"列名","timestamp":"1560533059864","columnFamily":"列簇名", "row":"0000632232_1550712079","type":"Put","value":"0"}'

       

寫入hbase

write_table = 'student'

write_keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"

write_valueConv= "org.apache.spark.examples.pythonconverters.StringListToPutConverter"

conf = {"hbase.zookeeper.quorum":host,"hbase.mapred.outputtable":table,"mapreduce.outputformat.class":"org.apache.hadoop.hbase.mapreduce.TableOutputFormat",

"mapreduce.job.output.key.class":"org.apache.hadoop.habse.io.ImmutableBytesWritable","mapreduce.job.output.value.class":"org.apache.hadoop.io.Writable"}

rawData = ['3,info,age,19','4,info,age,17'] # 最后將數據改成[rowkey,[rowkey,column family, column name,value]]形式寫進hbase

sc.parallelize(rawData).map(lambda x:(x[0],x.split(','))).saveAsNewAPIHadoopDataset(conf=conf,keyConverter=keyConv,valueConverter=valueConv) 

  

 

 

 

spark啟動后對應的進程是WORKER 和 MASTER

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM