一.官網提供的Elasticsearch的Python接口包
1.github地址:https://github.com/elastic/elasticsearch-dsl-py
2.安裝:pip install elasticsearch-dsl
3.有很多api,使用可參考github中的文檔
二.定義寫入es的Pipeline:
1.生成索引,type及映射:
運行時有可能會報IllegalOperation異常:此時訪問本地9200端口查看es版本,然後將Python中的elasticsearch和elasticsearch-dsl包改成與之相近的版本即可
# _*_ encoding:utf-8 _*_
__author__ = 'LYQ'
__date__ = '2018/10/29 11:02'

# Newer elasticsearch-dsl releases renamed DocType to Document.
from datetime import datetime

from elasticsearch_dsl import (
    DocType,
    Date,
    Nested,
    Boolean,
    analyzer,
    Completion,
    Keyword,
    Text,
    Integer,
)
from elasticsearch_dsl.connections import connections

# Connect to the local Elasticsearch node; `hosts` may list several servers.
connections.create_connection(hosts=["localhost"])


class ArticleType(DocType):
    """Elasticsearch mapping for an article document."""

    # Text fields are analysed with the "ik_max_word" analyzer.
    title = Text(analyzer="ik_max_word")
    create_date = Date()

    # Keyword fields are not analysed.
    url = Keyword()
    url_object_id = Keyword()
    front_image_url = Keyword()
    front_image_path = Keyword()

    praise_nums = Integer()
    fav_nums = Integer()
    comment_nums = Integer()

    tags = Text(analyzer="ik_max_word")
    content = Text(analyzer="ik_max_word")

    class Meta:
        # Index name and document type used for this mapping.
        # NOTE(review): "artitle" looks like a typo for "article"; it is left
        # unchanged because renaming it changes the type written to the index.
        index = "jobbole"
        doc_type = "artitle"


if __name__ == "__main__":
    # Calling init() creates the index and its mapping.
    ArticleType.init()
2.創建相應item:
# Import the Elasticsearch mapping defined for articles.
from models.es import ArticleType
from w3lib.html import remove_tags


class ElasticsearchPipeline(object):
    """Write scraped items to Elasticsearch.

    Remember to register this pipeline in the project settings.
    """

    def process_item(self, item, spider):
        """Copy ``item`` into an ``ArticleType`` document and save it.

        Args:
            item: Scraped item providing the article fields read below.
            spider: Spider that produced the item (not used here).

        Returns:
            The same ``item`` that was passed in.

        Raises:
            KeyError: If a required field is missing from ``item``.
        """
        # Instantiate the Elasticsearch mapping defined in models.es.
        articletype = ArticleType()
        articletype.title = item["title"]
        articletype.create_date = item["create_date"]
        articletype.url = item["url"]
        articletype.front_image_url = item["front_image_url"]
        # The local image path is optional, so copy it only when present.
        if "front_image_path" in item:
            articletype.front_image_path = item["front_image_path"]
        articletype.praise_nums = item["praise_nums"]
        articletype.fav_nums = item["fav_nums"]
        articletype.comment_nums = item["comment_nums"]
        articletype.tags = item["tags"]
        # Strip HTML tags from the content before storing it.
        articletype.content = remove_tags(item["content"])
        # Use url_object_id as the document id.
        articletype.meta.id = item["url_object_id"]

        articletype.save()
        return item
查看9100端口,數據插入成功
class JobboleArticleSpider(scrapy.Item):
    # ...... (field declarations elided in the original snippet)

    def save_to_es(self):
        """Save this item to Elasticsearch.

        The method lives on the item so that each item type can decide how
        its own fields are stored.
        """
        doc = ArticleType()

        # Copy the fields in the same order as they are declared below;
        # a missing key raises KeyError at the first absent field.
        for field in ("title", "create_date", "url", "front_image_url"):
            setattr(doc, field, self[field])

        # The local image path is optional.
        if "front_image_path" in self:
            doc.front_image_path = self["front_image_path"]

        for field in ("praise_nums", "fav_nums", "comment_nums", "tags"):
            setattr(doc, field, self[field])

        # Store the content with HTML tags stripped.
        doc.content = remove_tags(self["content"])
        # Use url_object_id as the document id.
        doc.meta.id = self["url_object_id"]
        doc.save()
class ElasticsearchPipeline(object):
    """Pipeline that writes each item to Elasticsearch."""

    def process_item(self, item, spider):
        """Ask ``item`` to save itself to Elasticsearch, then return it.

        Args:
            item: Object exposing a ``save_to_es()`` method.
            spider: Spider that produced the item (not used here).

        Returns:
            The same ``item`` that was passed in.
        """
        # The item builds and saves its own Elasticsearch document.
        item.save_to_es()
        return item
三.搜索建議:
實質調用analyze接口如下:
GET _analyze { "analyzer": "ik_max_word", "text" : "Python網絡基礎學習" }
1.es文件中:
from elasticsearch_dsl.analysis import CustomAnalyzer as _CustomAnalyzer


class Customanalyzer(_CustomAnalyzer):
    """Custom analyzer wrapper used for the suggest field."""

    def get_analysis_definition(self):
        """Return an empty analysis definition.

        Overrides the parent method so that it returns an empty dict.
        NOTE(review): presumably this keeps elasticsearch-dsl from sending
        custom analysis settings for the plugin-provided "ik_max_word"
        analyzer -- confirm against the elasticsearch-dsl version in use.
        """
        return {}


ik_analyser = Customanalyzer("ik_max_word", filter=["lowercase"])


class ArticleType(DocType):
    """Elasticsearch mapping definition."""

    suggest = Completion(analyzer=ik_analyser)
    # ...... (remaining fields elided in the original snippet)


# This must run after ArticleType is defined: it reads the connection alias
# from the class, so placing it above the class raises NameError on import.
esc = connections.create_connection(ArticleType._doc_type.using)
生成該字段的信息
2.item文件:
# ...... (other imports elided in the original snippet)
from models.es import esc


def get_suggest(index, info_tuple):
    """Build the search-suggestion entries from weighted strings.

    Args:
        index: Name of the index passed to the analyze API.
        info_tuple: Iterable of ``(text, weight)`` pairs.

    Returns:
        A list of ``{"input": [...], "weight": weight}`` dicts, one for each
        pair whose text produced at least one token longer than one
        character. Pairs with empty text contribute nothing.
    """
    # NOTE(review): used_words is never updated, so the subtraction below
    # removes nothing. Confirm whether de-duplication across pairs is wanted
    # before changing it, because it would alter the weights of shared words.
    used_words = set()
    suggests = []
    for text, weight in info_tuple:
        if not text:
            continue
        # Call the Elasticsearch analyze API to tokenize the string.
        # NOTE(review): passing `analyzer` as a query parameter depends on
        # the elasticsearch-py version; newer clients expect it in the body.
        words = esc.indices.analyze(
            index=index,
            analyzer="ik_max_word",
            params={"filter": ["lowercase"]},
            body=text,
        )
        # The response wraps the token list under "tokens"; keep only tokens
        # whose text is longer than one character.
        analyzed_words = {
            r["token"] for r in words["tokens"] if len(r["token"]) > 1
        }
        new_words = analyzed_words - used_words
        if new_words:
            suggests.append({"input": list(new_words), "weight": weight})
    return suggests


class JobboleArticleSpider(scrapy.Item):
    # ...... (field declarations elided in the original snippet)

    def save_to_es(self):
        articletype = ArticleType()
        # ....... (field assignments elided in the original snippet)
        # Build the suggest field from (text, weight) pairs.
        articletype.suggest = get_suggest(
            ArticleType._doc_type.index,
            ((articletype.title, 1), (articletype.tags, 7)),
        )
        articletype.save()