pyspark 針對Elasticsearch的讀寫操作


1.創建spark與Elasticsearch的連接

為了對Elasticsearch進行讀寫操作,需要添加Elasticsearch的依賴包,其中,添加依賴包(org.elasticsearch_elasticsearch-spark-20_2.11-6.8.7.jar)有下面的三種方式:

1)將依賴包直接放在安裝spark目錄下面的jars目錄下,即可;

2) 在提交任務時,利用spark submit --jars 的方式

3)在創建spark對象時,添加依賴,如下圖所示

spark = SparkSession \
.builder \
.appName('es connection') \
.config('spark.jars.packages', "org.elasticsearch_elasticsearch-spark-20_2.11-6.8.7") \
.getOrCreate()

2.spark 讀取Elasticsearch的數據

df3 = spark.read \
.format("org.elasticsearch.spark.sql") \
.option("es.nodes", '節點') \
.option('es.port', '端口') \
.option("es.resource", '索引/索引類型') \
.option('es.query', '?q=*') \
.option('es.nodes.wan.only','true') \
.option("es.nodes.discovery", "false") \
.option("es.index.auto.create", "true") \
.option("es.write.ignore_exception", "true") \
.option("es.read.ignore_exception","true") \
.load()

3.spark 寫入elasticsearch

df.write.format('org.elasticsearch.spark.sql') \
.option('es.nodes', '節點') \
.option('es.port', '9200') \
.option('es.nodes.wan.only', 'true') \
.option("es.nodes.discovery", "false") \
.option('es.resource', '索引/索引類型') \
.save(mode='append')

備注:

當spark讀寫elasticsearch的過程中,elasticsearch包含Array類型的字段,就會出現下面錯誤:

 

無法將List類型數據寫入到es, 或者從es讀出list類型數據

解決方案:

在option 中添加一個es.read.field.as.array.include屬性,value為list Schema的字段名

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM