ElasticSearch 使用mysql熱更新 詞庫
這個也是《ELK高級搜索》的課件,但是應該是參考這篇博客的:https://blog.csdn.net/wuzhiwei549/article/details/80451302
1、熱更新
以往每次都要在 es 的擴展詞典中手動添加新詞語,非常麻煩:
(1)每次添加完,都要重啟es才能生效,非常麻煩
(2)es是分布式的,可能有數百個節點,你不能每次都一個一個節點上面去修改
目標:es 不停機,我們直接在外部某個地方添加新詞語,es 就能立即熱加載這些新詞語
熱更新的方案
(1)基於 ik 分詞器原生支持的熱更新方案:部署一個 web 服務器,提供一個 http 接口,通過 Last-Modified 和 ETag 兩個 http 響應頭來提供詞語的熱更新
(2)修改 ik 分詞器源碼,然後讓它每隔一定時間自動從 mysql 中加載新的詞庫
採用第二種方案:第一種方案連 ik 的 GitHub 社區官方都不建議採用,認為不太穩定
2、步驟
1、下載源碼
https://github.com/medcl/elasticsearch-analysis-ik/releases
ik 分詞器是個標準的 java maven 工程,直接導入 idea 就可以看到源碼
2、修改源碼
在 pom.xml 裡面添加 mysql 的驅動依賴
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.13</version>
</dependency>
/**
* IK 中文分詞 版本 5.0
* IK Analyzer release 5.0
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* 源代碼由林良益(linliangyi2005@gmail.com)提供
* 版權聲明 2012,烏龍茶工作室
* provided by Linliangyi and copyright 2012 by Oolong studio
*
*
*/
package org.wltea.analyzer.dic;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.file.attribute.BasicFileAttributes;
import java.nio.file.Files;
import java.nio.file.FileVisitResult;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.sql.*;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.common.io.PathUtils;
import org.elasticsearch.plugin.analysis.ik.AnalysisIkPlugin;
import org.wltea.analyzer.cfg.Configuration;
import org.apache.logging.log4j.Logger;
import org.wltea.analyzer.help.ESPluginLoggerFactory;
/**
* 詞典管理類,單子模式
*/
public class Dictionary {
/*
* 詞典單子實例
*/
private static Dictionary singleton;
private DictSegment _MainDict;
private DictSegment _QuantifierDict;
private DictSegment _StopWords;
/**
* 配置對象
*/
private Configuration configuration;
private static final Logger logger = ESPluginLoggerFactory.getLogger(Monitor.class.getName());
private static ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
private static final String PATH_DIC_MAIN = "main.dic";
private static final String PATH_DIC_SURNAME = "surname.dic";
private static final String PATH_DIC_QUANTIFIER = "quantifier.dic";
private static final String PATH_DIC_SUFFIX = "suffix.dic";
private static final String PATH_DIC_PREP = "preposition.dic";
private static final String PATH_DIC_STOP = "stopword.dic";
private final static String FILE_NAME = "IKAnalyzer.cfg.xml";
private final static String EXT_DICT = "ext_dict";
private final static String REMOTE_EXT_DICT = "remote_ext_dict";
private final static String EXT_STOP = "ext_stopwords";
private final static String REMOTE_EXT_STOP = "remote_ext_stopwords";
private Path conf_dir;
private Properties props;
private Dictionary(Configuration cfg) {
this.configuration = cfg;
this.props = new Properties();
this.conf_dir = cfg.getEnvironment().configFile().resolve(AnalysisIkPlugin.PLUGIN_NAME);
Path configFile = conf_dir.resolve(FILE_NAME);
InputStream input = null;
try {
logger.info("try load config from {}", configFile);
input = new FileInputStream(configFile.toFile());
} catch (FileNotFoundException e) {
conf_dir = cfg.getConfigInPluginDir();
configFile = conf_dir.resolve(FILE_NAME);
try {
logger.info("try load config from {}", configFile);
input = new FileInputStream(configFile.toFile());
} catch (FileNotFoundException ex) {
// We should report origin exception
logger.error("ik-analyzer", e);
}
}
if (input != null) {
try {
props.loadFromXML(input);
} catch (IOException e) {
logger.error("ik-analyzer", e);
}
}
}
private String getProperty(String key){
if(props!=null){
return props.getProperty(key);
}
return null;
}
/**
* 詞典初始化 由於IK Analyzer的詞典采用Dictionary類的靜態方法進行詞典初始化
* 只有當Dictionary類被實際調用時,才會開始載入詞典, 這將延長首次分詞操作的時間 該方法提供了一個在應用加載階段就初始化字典的手段
*
* @return Dictionary
*/
public static synchronized void initial(Configuration cfg) {
if (singleton == null) {
synchronized (Dictionary.class) {
if (singleton == null) {
singleton = new Dictionary(cfg);
singleton.loadMainDict();
singleton.loadSurnameDict();
singleton.loadQuantifierDict();
singleton.loadSuffixDict();
singleton.loadPrepDict();
singleton.loadStopWordDict();
//!!!!!!!!!!!!!!mysql監控線程
new Thread(new HotDictReloadThread()).start();
if(cfg.isEnableRemoteDict()){
// 建立監控線程
for (String location : singleton.getRemoteExtDictionarys()) {
// 10 秒是初始延遲可以修改的 60是間隔時間 單位秒
pool.scheduleAtFixedRate(new Monitor(location), 10, 60, TimeUnit.SECONDS);
}
for (String location : singleton.getRemoteExtStopWordDictionarys()) {
pool.scheduleAtFixedRate(new Monitor(location), 10, 60, TimeUnit.SECONDS);
}
}
}
}
}
}
private void walkFileTree(List<String> files, Path path) {
if (Files.isRegularFile(path)) {
files.add(path.toString());
} else if (Files.isDirectory(path)) try {
Files.walkFileTree(path, new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
files.add(file.toString());
return FileVisitResult.CONTINUE;
}
@Override
public FileVisitResult visitFileFailed(Path file, IOException e) {
logger.error("[Ext Loading] listing files", e);
return FileVisitResult.CONTINUE;
}
});
} catch (IOException e) {
logger.error("[Ext Loading] listing files", e);
} else {
logger.warn("[Ext Loading] file not found: " + path);
}
}
private void loadDictFile(DictSegment dict, Path file, boolean critical, String name) {
try (InputStream is = new FileInputStream(file.toFile())) {
BufferedReader br = new BufferedReader(
new InputStreamReader(is, "UTF-8"), 512);
String word = br.readLine();
if (word != null) {
if (word.startsWith("\uFEFF"))
word = word.substring(1);
for (; word != null; word = br.readLine()) {
word = word.trim();
if (word.isEmpty()) continue;
dict.fillSegment(word.toCharArray());
}
}
} catch (FileNotFoundException e) {
logger.error("ik-analyzer: " + name + " not found", e);
if (critical) throw new RuntimeException("ik-analyzer: " + name + " not found!!!", e);
} catch (IOException e) {
logger.error("ik-analyzer: " + name + " loading failed", e);
}
}
private List<String> getExtDictionarys() {
List<String> extDictFiles = new ArrayList<String>(2);
String extDictCfg = getProperty(EXT_DICT);
if (extDictCfg != null) {
String[] filePaths = extDictCfg.split(";");
for (String filePath : filePaths) {
if (filePath != null && !"".equals(filePath.trim())) {
Path file = PathUtils.get(getDictRoot(), filePath.trim());
walkFileTree(extDictFiles, file);
}
}
}
return extDictFiles;
}
private List<String> getRemoteExtDictionarys() {
List<String> remoteExtDictFiles = new ArrayList<String>(2);
String remoteExtDictCfg = getProperty(REMOTE_EXT_DICT);
if (remoteExtDictCfg != null) {
String[] filePaths = remoteExtDictCfg.split(";");
for (String filePath : filePaths) {
if (filePath != null && !"".equals(filePath.trim())) {
remoteExtDictFiles.add(filePath);
}
}
}
return remoteExtDictFiles;
}
private List<String> getExtStopWordDictionarys() {
List<String> extStopWordDictFiles = new ArrayList<String>(2);
String extStopWordDictCfg = getProperty(EXT_STOP);
if (extStopWordDictCfg != null) {
String[] filePaths = extStopWordDictCfg.split(";");
for (String filePath : filePaths) {
if (filePath != null && !"".equals(filePath.trim())) {
Path file = PathUtils.get(getDictRoot(), filePath.trim());
walkFileTree(extStopWordDictFiles, file);
}
}
}
return extStopWordDictFiles;
}
private List<String> getRemoteExtStopWordDictionarys() {
List<String> remoteExtStopWordDictFiles = new ArrayList<String>(2);
String remoteExtStopWordDictCfg = getProperty(REMOTE_EXT_STOP);
if (remoteExtStopWordDictCfg != null) {
String[] filePaths = remoteExtStopWordDictCfg.split(";");
for (String filePath : filePaths) {
if (filePath != null && !"".equals(filePath.trim())) {
remoteExtStopWordDictFiles.add(filePath);
}
}
}
return remoteExtStopWordDictFiles;
}
private String getDictRoot() {
return conf_dir.toAbsolutePath().toString();
}
/**
* 獲取詞典單子實例
*
* @return Dictionary 單例對象
*/
public static Dictionary getSingleton() {
if (singleton == null) {
throw new IllegalStateException("詞典尚未初始化,請先調用initial方法");
}
return singleton;
}
/**
* 批量加載新詞條
*
* @param words
* Collection<String>詞條列表
*/
public void addWords(Collection<String> words) {
if (words != null) {
for (String word : words) {
if (word != null) {
// 批量加載詞條到主內存詞典中
singleton._MainDict.fillSegment(word.trim().toCharArray());
}
}
}
}
/**
* 批量移除(屏蔽)詞條
*/
public void disableWords(Collection<String> words) {
if (words != null) {
for (String word : words) {
if (word != null) {
// 批量屏蔽詞條
singleton._MainDict.disableSegment(word.trim().toCharArray());
}
}
}
}
/**
* 檢索匹配主詞典
*
* @return Hit 匹配結果描述
*/
public Hit matchInMainDict(char[] charArray) {
return singleton._MainDict.match(charArray);
}
/**
* 檢索匹配主詞典
*
* @return Hit 匹配結果描述
*/
public Hit matchInMainDict(char[] charArray, int begin, int length) {
return singleton._MainDict.match(charArray, begin, length);
}
/**
* 檢索匹配量詞詞典
*
* @return Hit 匹配結果描述
*/
public Hit matchInQuantifierDict(char[] charArray, int begin, int length) {
return singleton._QuantifierDict.match(charArray, begin, length);
}
/**
* 從已匹配的Hit中直接取出DictSegment,繼續向下匹配
*
* @return Hit
*/
public Hit matchWithHit(char[] charArray, int currentIndex, Hit matchedHit) {
DictSegment ds = matchedHit.getMatchedDictSegment();
return ds.match(charArray, currentIndex, 1, matchedHit);
}
/**
* 判斷是否是停止詞
*
* @return boolean
*/
public boolean isStopWord(char[] charArray, int begin, int length) {
return singleton._StopWords.match(charArray, begin, length).isMatch();
}
/**
* 加載主詞典及擴展詞典
*/
private void loadMainDict() {
// 建立一個主詞典實例
_MainDict = new DictSegment((char) 0);
// 讀取主詞典文件
Path file = PathUtils.get(getDictRoot(), Dictionary.PATH_DIC_MAIN);
loadDictFile(_MainDict, file, false, "Main Dict");
// 加載擴展詞典
this.loadExtDict();
// 加載遠程自定義詞庫
this.loadRemoteExtDict();
// !!!!!!1從mysql加載詞典
this.loadMySQLExtDict();
}
private static Properties prop = new Properties();
static {
try {
Class.forName("com.mysql.jdbc.Driver");
} catch (ClassNotFoundException e) {
logger.error("error", e);
}
}
/**
* 從mysql加載熱更新詞典
*/
private void loadMySQLExtDict() {
Connection conn = null;
Statement stmt = null;
ResultSet rs = null;
try {
Path file = PathUtils.get(getDictRoot(), "jdbc-reload.properties");
prop.load(new FileInputStream(file.toFile()));
logger.info("[==========]jdbc-reload.properties");
for(Object key : prop.keySet()) {
logger.info("[==========]" + key + "=" + prop.getProperty(String.valueOf(key)));
}
logger.info("[==========]query hot dict from mysql, " + prop.getProperty("jdbc.reload.sql") + "......");
conn = DriverManager.getConnection(
prop.getProperty("jdbc.url"),
prop.getProperty("jdbc.user"),
prop.getProperty("jdbc.password"));
stmt = conn.createStatement();
rs = stmt.executeQuery(prop.getProperty("jdbc.reload.sql"));
while(rs.next()) {
String theWord = rs.getString("word");
logger.info("[==========]hot word from mysql: " + theWord);
_MainDict.fillSegment(theWord.trim().toCharArray());
}
Thread.sleep(Integer.valueOf(String.valueOf(prop.get("jdbc.reload.interval"))));
} catch (Exception e) {
logger.error("erorr", e);
} finally {
if(rs != null) {
try {
rs.close();
} catch (SQLException e) {
logger.error("error", e);
}
}
if(stmt != null) {
try {
stmt.close();
} catch (SQLException e) {
logger.error("error", e);
}
}
if(conn != null) {
try {
conn.close();
} catch (SQLException e) {
logger.error("error", e);
}
}
}
}
/**
* 加載用戶配置的擴展詞典到主詞庫表
*/
private void loadExtDict() {
// 加載擴展詞典配置
List<String> extDictFiles = getExtDictionarys();
if (extDictFiles != null) {
for (String extDictName : extDictFiles) {
// 讀取擴展詞典文件
logger.info("[Dict Loading] " + extDictName);
Path file = PathUtils.get(extDictName);
loadDictFile(_MainDict, file, false, "Extra Dict");
}
}
}
/**
* 加載遠程擴展詞典到主詞庫表
*/
private void loadRemoteExtDict() {
List<String> remoteExtDictFiles = getRemoteExtDictionarys();
for (String location : remoteExtDictFiles) {
logger.info("[Dict Loading] " + location);
List<String> lists = getRemoteWords(location);
// 如果找不到擴展的字典,則忽略
if (lists == null) {
logger.error("[Dict Loading] " + location + "加載失敗");
continue;
}
for (String theWord : lists) {
if (theWord != null && !"".equals(theWord.trim())) {
// 加載擴展詞典數據到主內存詞典中
logger.info(theWord);
_MainDict.fillSegment(theWord.trim().toLowerCase().toCharArray());
}
}
}
}
private static List<String> getRemoteWords(String location) {
SpecialPermission.check();
return AccessController.doPrivileged((PrivilegedAction<List<String>>) () -> {
return getRemoteWordsUnprivileged(location);
});
}
/**
* 從遠程服務器上下載自定義詞條
*/
private static List<String> getRemoteWordsUnprivileged(String location) {
List<String> buffer = new ArrayList<String>();
RequestConfig rc = RequestConfig.custom().setConnectionRequestTimeout(10 * 1000).setConnectTimeout(10 * 1000)
.setSocketTimeout(60 * 1000).build();
CloseableHttpClient httpclient = HttpClients.createDefault();
CloseableHttpResponse response;
BufferedReader in;
HttpGet get = new HttpGet(location);
get.setConfig(rc);
try {
response = httpclient.execute(get);
if (response.getStatusLine().getStatusCode() == 200) {
String charset = "UTF-8";
// 獲取編碼,默認為utf-8
HttpEntity entity = response.getEntity();
if(entity!=null){
Header contentType = entity.getContentType();
if(contentType!=null&&contentType.getValue()!=null){
String typeValue = contentType.getValue();
if(typeValue!=null&&typeValue.contains("charset=")){
charset = typeValue.substring(typeValue.lastIndexOf("=") + 1);
}
}
if (entity.getContentLength() > 0) {
in = new BufferedReader(new InputStreamReader(entity.getContent(), charset));
String line;
while ((line = in.readLine()) != null) {
buffer.add(line);
}
in.close();
response.close();
return buffer;
}
}
}
response.close();
} catch (IllegalStateException | IOException e) {
logger.error("getRemoteWords {} error", e, location);
}
return buffer;
}
/**
* 加載用戶擴展的停止詞詞典
*/
private void loadStopWordDict() {
// 建立主詞典實例
_StopWords = new DictSegment((char) 0);
// 讀取主詞典文件
Path file = PathUtils.get(getDictRoot(), Dictionary.PATH_DIC_STOP);
loadDictFile(_StopWords, file, false, "Main Stopwords");
// 加載擴展停止詞典
List<String> extStopWordDictFiles = getExtStopWordDictionarys();
if (extStopWordDictFiles != null) {
for (String extStopWordDictName : extStopWordDictFiles) {
logger.info("[Dict Loading] " + extStopWordDictName);
// 讀取擴展詞典文件
file = PathUtils.get(extStopWordDictName);
loadDictFile(_StopWords, file, false, "Extra Stopwords");
}
}
// 加載遠程停用詞典
List<String> remoteExtStopWordDictFiles = getRemoteExtStopWordDictionarys();
for (String location : remoteExtStopWordDictFiles) {
logger.info("[Dict Loading] " + location);
List<String> lists = getRemoteWords(location);
// 如果找不到擴展的字典,則忽略
if (lists == null) {
logger.error("[Dict Loading] " + location + "加載失敗");
continue;
}
for (String theWord : lists) {
if (theWord != null && !"".equals(theWord.trim())) {
// 加載遠程詞典數據到主內存中
logger.info(theWord);
_StopWords.fillSegment(theWord.trim().toLowerCase().toCharArray());
}
}
}
//!!!!!!!!!!!1從mysql加載停用詞
this.loadMySQLStopwordDict();
}
/**
* 從mysql加載停用詞
*/
private void loadMySQLStopwordDict() {
Connection conn = null;
Statement stmt = null;
ResultSet rs = null;
try {
Path file = PathUtils.get(getDictRoot(), "jdbc-reload.properties");
prop.load(new FileInputStream(file.toFile()));
logger.info("[==========]jdbc-reload.properties");
for(Object key : prop.keySet()) {
logger.info("[==========]" + key + "=" + prop.getProperty(String.valueOf(key)));
}
logger.info("[==========]query hot stopword dict from mysql, " + prop.getProperty("jdbc.reload.stopword.sql") + "......");
conn = DriverManager.getConnection(
prop.getProperty("jdbc.url"),
prop.getProperty("jdbc.user"),
prop.getProperty("jdbc.password"));
stmt = conn.createStatement();
rs = stmt.executeQuery(prop.getProperty("jdbc.reload.stopword.sql"));
while(rs.next()) {
String theWord = rs.getString("word");
logger.info("[==========]hot stopword from mysql: " + theWord);
_StopWords.fillSegment(theWord.trim().toCharArray());
}
Thread.sleep(Integer.valueOf(String.valueOf(prop.get("jdbc.reload.interval"))));
} catch (Exception e) {
logger.error("erorr", e);
} finally {
if(rs != null) {
try {
rs.close();
} catch (SQLException e) {
logger.error("error", e);
}
}
if(stmt != null) {
try {
stmt.close();
} catch (SQLException e) {
logger.error("error", e);
}
}
if(conn != null) {
try {
conn.close();
} catch (SQLException e) {
logger.error("error", e);
}
}
}
}
/**
* 加載量詞詞典
*/
private void loadQuantifierDict() {
// 建立一個量詞典實例
_QuantifierDict = new DictSegment((char) 0);
// 讀取量詞詞典文件
Path file = PathUtils.get(getDictRoot(), Dictionary.PATH_DIC_QUANTIFIER);
loadDictFile(_QuantifierDict, file, false, "Quantifier");
}
private void loadSurnameDict() {
DictSegment _SurnameDict = new DictSegment((char) 0);
Path file = PathUtils.get(getDictRoot(), Dictionary.PATH_DIC_SURNAME);
loadDictFile(_SurnameDict, file, true, "Surname");
}
private void loadSuffixDict() {
DictSegment _SuffixDict = new DictSegment((char) 0);
Path file = PathUtils.get(getDictRoot(), Dictionary.PATH_DIC_SUFFIX);
loadDictFile(_SuffixDict, file, true, "Suffix");
}
private void loadPrepDict() {
DictSegment _PrepDict = new DictSegment((char) 0);
Path file = PathUtils.get(getDictRoot(), Dictionary.PATH_DIC_PREP);
loadDictFile(_PrepDict, file, true, "Preposition");
}
void reLoadMainDict() {
logger.info("重新加載詞典...");
// 新開一個實例加載詞典,減少加載過程對當前詞典使用的影響
Dictionary tmpDict = new Dictionary(configuration);
tmpDict.configuration = getSingleton().configuration;
tmpDict.loadMainDict();
tmpDict.loadStopWordDict();
_MainDict = tmpDict._MainDict;
_StopWords = tmpDict._StopWords;
logger.info("重新加載詞典完畢...");
}
}
/**
 * Background loop that repeatedly reloads the IK dictionaries (including the MySQL hot words)
 * through {@link Dictionary#reLoadMainDict()} until its thread is interrupted.
 *
 * <p>The loop does not rely on the reload call to block. It pauses for {@code pauseMillis} after
 * every attempt, so a reload that fails or returns quickly cannot turn it into a busy spin. A
 * failing reload is logged and retried instead of silently killing the thread.
 *
 * @author WGR
 * @create 2020/10/28 -- 14:35
 */
public class HotDictReloadThread implements Runnable {
    private static final Logger logger = ESPluginLoggerFactory.getLogger(HotDictReloadThread.class.getName());

    /** Pause between two reload attempts used by the no-arg constructor, in milliseconds. */
    private static final long DEFAULT_PAUSE_MILLIS = 1000L;

    private final long pauseMillis;

    /** Creates a reload loop that pauses {@value #DEFAULT_PAUSE_MILLIS} ms between attempts. */
    public HotDictReloadThread() {
        this(DEFAULT_PAUSE_MILLIS);
    }

    /**
     * Creates a reload loop with a custom pause.
     *
     * @param pauseMillis pause after each reload attempt, in milliseconds
     * @throws IllegalArgumentException if {@code pauseMillis} is negative
     */
    public HotDictReloadThread(long pauseMillis) {
        if (pauseMillis < 0) {
            throw new IllegalArgumentException("pauseMillis must be >= 0: " + pauseMillis);
        }
        this.pauseMillis = pauseMillis;
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            logger.info("[==========]reload hot dict from mysql......");
            try {
                Dictionary.getSingleton().reLoadMainDict();
            } catch (RuntimeException e) {
                // Keep the hot reload alive; the next iteration retries.
                logger.error("[==========]reload hot dict from mysql failed", e);
            }
            try {
                Thread.sleep(pauseMillis);
            } catch (InterruptedException e) {
                // Restore the flag and stop: interruption is the shutdown signal.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
3、mvn package打包代碼
4、解壓縮ik壓縮包
將 mysql 驅動 jar 放入 lib 目錄下
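5、修改 jdbc 相關配置:在詞典根目錄(即 IKAnalyzer.cfg.xml 所在目錄)下建立 jdbc-reload.properties,配置 jdbc.url、jdbc.user、jdbc.password、jdbc.reload.sql(熱詞查詢)、jdbc.reload.stopword.sql(停用詞查詢)以及 jdbc.reload.interval(重新加載間隔,單位毫秒);兩條 SQL 的查詢結果都必須包含名為 word 的列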
6、重啟es
觀察日誌,日誌中就會顯示我們打印的那些東西,比如加載了什麼配置、加載了什麼詞語、什麼停用詞
7、在mysql中添加詞庫與停用詞
8、分詞實驗,驗證熱更新生效
GET /_analyze
{
"analyzer": "ik_smart",
"text": "我是大臉排"
}