Solr 3.4 dataimport configuration notes


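I. Download and try it out

Download apache-solr-3.4.0.zip from http://lucene.apache.org/solr/

Unzip it to a local disk and change to the example directory. Here we run Solr in the container that ships with it (Jetty), so execute

java -jar start.jar

Then open http://localhost:8983/solr/admin/. If the page loads normally, Solr has started successfully.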

II. Integrating MySQL


1. Create the table

SET FOREIGN_KEY_CHECKS=0;

-- ----------------------------
-- Table structure for `documents`
-- ----------------------------
DROP TABLE IF EXISTS `documents`;
CREATE TABLE `documents` (
  `id` int(11) NOT NULL auto_increment,
  `date_added` datetime NOT NULL,
  `title` varchar(255) NOT NULL,
  `content` text NOT NULL,
  PRIMARY KEY  (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8;

-- ----------------------------
-- Records of documents
-- ----------------------------
INSERT INTO `documents` VALUES ('1', '2012-01-11 23:15:59', '你好', '測試');
INSERT INTO `documents` VALUES ('2', '2012-01-11 23:16:30', 'hello', 'test');

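2. Add the DataImportHandler jar (or its source files) to the project, and add a data-config.xml file under solr-conf/conf. The MySQL JDBC driver jar, which provides com.mysql.jdbc.Driver, must be on the classpath as well.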

data-config.xml

<dataConfig>
  <dataSource type="JdbcDataSource"
   driver="com.mysql.jdbc.Driver"
   url="jdbc:mysql://localhost/test"
   user="test"
   password="test"
   />
 <document name="documents1" >
        <entity name="documents"

          query="select id,title,content,date_added from documents"
          deltaImportQuery="select  id,title,content,date_added  from documents where ID='${dataimporter.delta.id}'"
          deltaQuery="select id  from documents where date_added &gt; '${dataimporter.last_index_time}'"
          deletedPkQuery="select id  from documents where id=0"
        >
            <field column="id" name="id" />
            <field column="title" name="title" />
            <field column="content" name="content" />
            <field column="date_added" name="date_added" />
        </entity>
  </document>
</dataConfig>

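The dataSource element above specifies the database connection.
query: the SQL statement used for the initial (full) import into the index.
deltaImportQuery: fetches, by ID, a single row that needs to be indexed.
deltaQuery: the SQL statement used for incremental indexing; it returns the IDs of the rows that need to be re-indexed.
deletedPkQuery: returns the IDs of the documents that should be deleted from the index.

field: maps a database column to a field of the indexed document.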
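
To make the interplay of deltaQuery and deltaImportQuery concrete, here is a minimal Java sketch of the placeholder substitution that a delta import relies on. It is an illustration only and not DataImportHandler's own code: the class DeltaQueryDemo and its methods are hypothetical, and the timestamp and ID are made-up sample values.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of ${...} placeholder substitution; not Solr code.
class DeltaQueryDemo {

    // Replace every ${name} in the template with its value from vars.
    static String resolve(String template, Map<String, String> vars) {
        String result = template;
        for (Map.Entry<String, String> e : vars.entrySet()) {
            result = result.replace("${" + e.getKey() + "}", e.getValue());
        }
        return result;
    }

    // Build one deltaImportQuery per ID that the deltaQuery reported as changed.
    static List<String> perRowQueries(String deltaImportQuery, List<String> changedIds) {
        List<String> queries = new ArrayList<String>();
        for (String id : changedIds) {
            Map<String, String> vars = new HashMap<String, String>();
            vars.put("dataimporter.delta.id", id);
            queries.add(resolve(deltaImportQuery, vars));
        }
        return queries;
    }

    public static void main(String[] args) {
        Map<String, String> vars = new HashMap<String, String>();
        vars.put("dataimporter.last_index_time", "2012-01-11 23:16:00"); // sample value
        // Step 1: the deltaQuery finds the IDs changed since the last import.
        System.out.println(resolve(
            "select id from documents where date_added > '${dataimporter.last_index_time}'", vars));
        // Step 2: each changed ID is fetched with the deltaImportQuery.
        for (String q : perRowQueries(
                "select id,title,content,date_added from documents where id='${dataimporter.delta.id}'",
                Arrays.asList("2"))) {
            System.out.println(q);
        }
    }
}
```

The two steps mirror the configuration: the first statement only returns primary keys, and the second is executed once per key, which is why deltaImportQuery filters on a single ID.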

3. In solrconfig.xml, add a requestHandler and point it at the data-config.xml file.

 <requestHandler name="/dataimport" class="org.apache.solr.handler.dataimport.DataImportHandler">
  <lst name="defaults">
    <str name="config">data-config.xml</str>
  </lst>
</requestHandler>

4. In schema.xml, specify the index type of each field.

<field name="id" type="string" indexed="true" stored="true" required="true" />
<field name="title" type="text_general" indexed="true" stored="true" termVectors="true" termPositions="true" termOffsets="true"/>
<field name="content" type="text_general" indexed="true" stored="true" termVectors="true" termPositions="true" termOffsets="true"/>
<field name="date_added" type="date" indexed="false" stored="true"/>

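type: the field type, usually string, text_general, int, long or date.
indexed: whether the field is indexed; only an indexed field can be searched on.
stored: whether the field value is stored; anything the front end displays must be stored.
termVectors, termPositions, termOffsets: set these attributes to true if the front end needs highlighting with term positions.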

 

III. Chinese support

1. Supporting Chinese queries

Add a filter to web.xml:

<filter>
    <filter-name>Set Character Encoding</filter-name>
    <filter-class>filters.SetCharacterEncodingFilter</filter-class>
    <init-param>
        <param-name>encoding</param-name>
        <param-value>UTF-8</param-value>
    </init-param>
    <init-param>
        <param-name>ignore</param-name>
        <param-value>true</param-value>
    </init-param>
</filter>

<filter-mapping>
    <filter-name>Set Character Encoding</filter-name>
    <url-pattern>/*</url-pattern>
</filter-mapping>

The SetCharacterEncodingFilter class:

package filters;

import java.io.IOException;
import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.UnavailableException;
public class SetCharacterEncodingFilter implements Filter {

 
 protected String encoding = null;

 
 protected FilterConfig filterConfig = null;

 
 protected boolean ignore = true;

 public void destroy() {

  this.encoding = null;
  this.filterConfig = null;

 }

 
 public void doFilter(ServletRequest request, ServletResponse response,
   FilterChain chain) throws IOException, ServletException {

  // Conditionally select and set the character encoding to be used
  if (ignore || (request.getCharacterEncoding() == null)) {
   String encoding = selectEncoding(request);
   if (encoding != null)
    request.setCharacterEncoding(encoding);
  }

  // Pass control on to the next filter
  chain.doFilter(request, response);

 }

 
 public void init(FilterConfig filterConfig) throws ServletException {

  this.filterConfig = filterConfig;
  this.encoding = filterConfig.getInitParameter("encoding");
  String value = filterConfig.getInitParameter("ignore");
  if (value == null)
   this.ignore = true;
  else if (value.equalsIgnoreCase("true"))
   this.ignore = true;
  else if (value.equalsIgnoreCase("yes"))
   this.ignore = true;
  else
   this.ignore = false;

 }

 protected String selectEncoding(ServletRequest request) {

  return (this.encoding);

 }

}

In Tomcat, change the Connector in conf/server.xml to:

<Connector port="8080" protocol="HTTP/1.1"
               connectionTimeout="20000"
               redirectPort="8443"  URIEncoding="UTF-8"/>

The change is the added URIEncoding="UTF-8" attribute.
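
Why the attribute matters can be seen in a small Java sketch. It percent-encodes the sample title 你好 as UTF-8, the way a browser would send it in a query string, and then decodes the result once as UTF-8 and once as ISO-8859-1, which is the charset assumed here for a connector without URIEncoding. The class UriEncodingDemo is hypothetical and only illustrates the mismatch; it does not involve Tomcat itself.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;

// Hypothetical demo class: shows why the URI encoding must match on both sides.
class UriEncodingDemo {

    // Percent-encode text as UTF-8 bytes.
    static String encodeUtf8(String text) {
        try {
            return URLEncoder.encode(text, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
    }

    // Decode a percent-encoded string, interpreting the bytes in the given charset.
    static String decode(String encoded, String charset) {
        try {
            return URLDecoder.decode(encoded, charset);
        } catch (UnsupportedEncodingException e) {
            throw new IllegalArgumentException("unknown charset: " + charset, e);
        }
    }

    public static void main(String[] args) {
        String title = "\u4f60\u597d"; // the sample title from the documents table
        String encoded = encodeUtf8(title);
        System.out.println(encoded);
        // Same charset on both sides: the original text comes back.
        System.out.println(title.equals(decode(encoded, "UTF-8")));
        // Different charset on the server side: the text is garbled.
        System.out.println(title.equals(decode(encoded, "ISO-8859-1")));
    }
}
```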

2. Supporting Chinese word segmentation

Download IKAnalyzer3.2.8.jar and add it to lib; IKAnalyzer already ships with support for the Solr interface.

In schema.xml in Solr's conf directory, change

<tokenizer class="solr.StandardTokenizerFactory" />

to

<tokenizer class="org.wltea.analyzer.solr.IKTokenizerFactory" isMaxWordLength="true" />

 

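IV. Test run

Once the configuration is in place, deploy the solr webapp (here on port 8080) and open http://localhost:8080/solr/dataimport. If no error is reported, the configuration is working.

http://localhost:8080/solr/dataimport?command=full-import&commit=true&clean=true runs the initial full import, which executes the query statement.

http://localhost:8080/solr/dataimport?command=delta-import&commit=true runs an incremental import, which executes the deltaQuery and deltaImportQuery statements and also deletes documents from the index according to deletedPkQuery.

http://localhost:8080/solr/dataimport?command=show-config shows the configuration file.

http://localhost:8080/solr/dataimport?command=reload-config reloads the configuration.

http://localhost:8080/solr/dataimport?command=abort aborts a running import.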

 

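The relevant parameters are:
entity
entity is a tag directly under document in data-config.xml. Use this parameter to run one or more entities selectively; passing several entity parameters runs several entities at once. If the parameter is omitted, every entity is run.

clean
Whether to delete the existing index before indexing starts. Default: true.

commit
Whether to commit after indexing finishes. Default: true.

optimize
Whether to optimize the index after indexing finishes. Default: true.

debug
Whether to run in debug mode, which is intended for interactive development. Note that in debug mode nothing is committed automatically; add commit=true if the documents should be committed.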
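
The commands and parameters above combine into a single request URL. The following Java sketch assembles such a URL; DataImportUrl is a hypothetical helper written for this note, the base URL is the one used in this section, and parameter values are assumed to be URL-safe, so no encoding is applied.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for assembling DataImportHandler request URLs.
class DataImportUrl {

    // baseUrl is the Solr webapp root, e.g. http://localhost:8080/solr
    static String build(String baseUrl, String command, Map<String, String> params) {
        StringBuilder url = new StringBuilder(baseUrl)
            .append("/dataimport?command=").append(command);
        for (Map.Entry<String, String> p : params.entrySet()) {
            url.append('&').append(p.getKey()).append('=').append(p.getValue());
        }
        return url.toString();
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the parameters in insertion order.
        Map<String, String> params = new LinkedHashMap<String, String>();
        params.put("entity", "documents"); // entity name from data-config.xml
        params.put("clean", "false");
        params.put("commit", "true");
        System.out.println(build("http://localhost:8080/solr", "delta-import", params));
    }
}
```

Passing clean=false explicitly is a deliberate choice in this sketch: it keeps an incremental run from wiping the existing index regardless of the default.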

 

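V. Search queries

Open http://localhost:8080/solr/admin/index.jsp and enter the query in the Query String: box.

Query all data
*:*

Single-field query
title:hello

Multi-field query (AND, OR); the boolean operators must be written in upper case
text:福田南路 AND roomNum:1
text:福田南路 OR roomNum:1

Return specific fields
fl=name,id returns name and id
fl=name,id,score returns name, id and the relevance score
fl=*,score returns all fields plus the relevance score

Return JSON
&wt=json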

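Highlighting: hl turns highlighting on, hl.fl lists the fields to highlight, hl.simple.pre and hl.simple.post are the markup placed before and after each highlighted term, hl.snippets is the maximum number of snippets per field, and hl.fragsize is the snippet size in characters.
&hl=true&hl.fl=proName,detailText&hl.simple.pre=%3Cb%3E&hl.simple.post=%3C/b%3E&hl.snippets=3&hl.fragsize=50

Sorting
sort=id desc

Range query
createtime:[1327979772 TO 1327979773]
createtime:[1327979773 TO *]

Faceting: facet turns faceting on, facet.field names the field to facet on
facet=true&facet.field=name

Date faceting (facet.date.end must be later than facet.date.start, and the + of the gap is written as %2B inside a URL)
facet=true&facet.date=insertDate&facet.date.start=2012-01-01T00:00:00Z&facet.date.end=2013-01-01T00:00:00Z&facet.date.gap=%2B1YEAR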
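
When these parameters are sent from code rather than typed into the admin page, each value has to be URL-encoded. Below is a minimal Java sketch of that step. SolrQueryString is a hypothetical helper, the /select path is assumed to be the default search handler, and title and content are the fields from the schema.xml shown earlier.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: turns Solr request parameters into an encoded query string.
class SolrQueryString {

    // URL-encode one key or value as UTF-8.
    static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
    }

    // Join the parameters as key=value pairs separated by '&'.
    static String build(Map<String, String> params) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> p : params.entrySet()) {
            if (sb.length() > 0) {
                sb.append('&');
            }
            sb.append(encode(p.getKey())).append('=').append(encode(p.getValue()));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, String> params = new LinkedHashMap<String, String>();
        params.put("q", "title:hello");
        params.put("fl", "*,score");
        params.put("sort", "id desc");
        params.put("wt", "json");
        params.put("hl", "true");
        params.put("hl.fl", "title,content");
        params.put("hl.simple.pre", "<b>");
        params.put("hl.simple.post", "</b>");
        System.out.println("http://localhost:8080/solr/select?" + build(params));
    }
}
```

Encoding every value this way also takes care of the characters that are easy to miss by hand, such as the space in a sort clause or the + in a date gap.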

VI. Scheduler support (scheduled incremental imports), used to run the delta SQL periodically.

1. Add three classes to the org.apache.solr.handler.dataimport.scheduler package.

ApplicationListener.java

package org.apache.solr.handler.dataimport.scheduler;

import java.util.Calendar;
import java.util.Date;
import java.util.Timer;

import javax.servlet.ServletContext;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ApplicationListener implements ServletContextListener {

        private static final Logger logger = LoggerFactory.getLogger(ApplicationListener.class);

        @Override
        public void contextDestroyed(ServletContextEvent servletContextEvent) {
                ServletContext servletContext = servletContextEvent.getServletContext();

                // get our timer from the context
                Timer timer = (Timer)servletContext.getAttribute("timer");

                // cancel all active tasks in the timers queue
                if (timer != null)
                        timer.cancel();

                // remove the timer from the context
                servletContext.removeAttribute("timer");

        }

        @Override
        public void contextInitialized(ServletContextEvent servletContextEvent) {
                ServletContext servletContext = servletContextEvent.getServletContext();
                try{
                        // create the timer and timer task objects
                        Timer timer = new Timer();
                        HTTPPostScheduler task = new HTTPPostScheduler(servletContext.getServletContextName(), timer);

                        // get our interval from HTTPPostScheduler
                        int interval = task.getIntervalInt();

                        // get a calendar to set the start time (first run)
                        Calendar calendar = Calendar.getInstance();

                        // set the first run to now + interval (to avoid firing while the app/server is starting)
                        calendar.add(Calendar.MINUTE, interval);
                        Date startTime = calendar.getTime();

                        // schedule the task
                        timer.scheduleAtFixedRate(task, startTime, 1000 * 60 * interval);

                        // save the timer in context
                        servletContext.setAttribute("timer", timer);

                } catch (Exception e) {
                        // getMessage() may be null, so guard before calling endsWith
                        if(e.getMessage() != null && e.getMessage().endsWith("disabled")){
                                logger.info("Schedule disabled");
                        }else{
                                logger.error("Problem initializing the scheduled task: ", e);
                        }
                }
        }

}

HTTPPostScheduler.java

package org.apache.solr.handler.dataimport.scheduler;

import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HTTPPostScheduler extends TimerTask {
        private String syncEnabled;
        private String[] syncCores;
        private String server;
        private String port;
        private String webapp;
        private String params;
        private String interval;
        private String cores;
        private SolrDataImportProperties p;
        private boolean singleCore;

        private static final Logger logger = LoggerFactory.getLogger(HTTPPostScheduler.class);

        public HTTPPostScheduler(String webAppName, Timer t) throws Exception{
                //load properties from global dataimport.properties
                p = new SolrDataImportProperties();
                reloadParams();
                fixParams(webAppName);

                if(!syncEnabled.equals("1")) throw new Exception("Schedule disabled");

                if(syncCores == null || (syncCores.length == 1 && syncCores[0].isEmpty())){
                        singleCore = true;
                        logger.info("<index update process> Single core identified in dataimport.properties");
                }else{
                        singleCore = false;
                        logger.info("<index update process> Multiple cores identified in dataimport.properties. Sync active for: " + cores);
                }
        }

        private void reloadParams(){
                p.loadProperties(true);
                syncEnabled = p.getProperty(SolrDataImportProperties.SYNC_ENABLED);
                cores           = p.getProperty(SolrDataImportProperties.SYNC_CORES);
                server          = p.getProperty(SolrDataImportProperties.SERVER);
                port            = p.getProperty(SolrDataImportProperties.PORT);
                webapp          = p.getProperty(SolrDataImportProperties.WEBAPP);
                params          = p.getProperty(SolrDataImportProperties.PARAMS);
                interval        = p.getProperty(SolrDataImportProperties.INTERVAL);
                syncCores       = cores != null ? cores.split(",") : null;
        }

        private void fixParams(String webAppName){
                if(server == null || server.isEmpty())  server = "localhost";
                if(port == null || port.isEmpty())              port = "8080";
                if(webapp == null || webapp.isEmpty())  webapp = webAppName;
                if(interval == null || interval.isEmpty() || getIntervalInt() <= 0) interval = "30";
        }

        public void run() {
                try{
                        // check mandatory params
                        if(server.isEmpty() || webapp.isEmpty() || params == null || params.isEmpty()){
                                logger.warn("<index update process> Insufficient info provided for data import");
                                logger.info("<index update process> Reloading global dataimport.properties");
                                reloadParams();

                        // single-core
                        }else if(singleCore){
                                prepUrlSendHttpPost();

                        // multi-core
                        }else if(syncCores.length == 0 || (syncCores.length == 1 && syncCores[0].isEmpty())){
                                logger.warn("<index update process> No cores scheduled for data import");
                                logger.info("<index update process> Reloading global dataimport.properties");
                                reloadParams();

                        }else{
                                for(String core : syncCores){
                                        prepUrlSendHttpPost(core);
                                }
                        }
                }catch(Exception e){
                        logger.error("Failed to prepare for sendHttpPost", e);
                        reloadParams();
                }
        }
        private void prepUrlSendHttpPost(){
                String coreUrl = "http://" + server + ":" + port + "/" + webapp + params;
                sendHttpPost(coreUrl, null);
        }

        private void prepUrlSendHttpPost(String coreName){
                String coreUrl = "http://" + server + ":" + port + "/" + webapp + "/" + coreName + params;
                sendHttpPost(coreUrl, coreName);
        }
        private void sendHttpPost(String completeUrl, String coreName){
                DateFormat df = new SimpleDateFormat("dd.MM.yyyy HH:mm:ss SSS");
                Date startTime = new Date();

                // prepare the core var
                String core = coreName == null ? "" : "[" + coreName + "] ";

                logger.info(core + "<index update process> Process started at .............. " + df.format(startTime));

                try{

                    URL url = new URL(completeUrl);
                    HttpURLConnection conn = (HttpURLConnection)url.openConnection();

                    conn.setRequestMethod("POST");
                    conn.setRequestProperty("type", "submit");
                    conn.setDoOutput(true);

                        // Send HTTP POST
                    conn.connect();

                    logger.info(core + "<index update process> Request method\t\t\t" + conn.getRequestMethod());
                    logger.info(core + "<index update process> Successfully connected to server\t" + server);
                    logger.info(core + "<index update process> Using port\t\t\t" + port);
                    logger.info(core + "<index update process> Application name\t\t\t" + webapp);
                    logger.info(core + "<index update process> URL params\t\t\t" + params);
                    logger.info(core + "<index update process> Full URL\t\t\t\t" + conn.getURL());
                    logger.info(core + "<index update process> Response message\t\t\t" + conn.getResponseMessage());
                    logger.info(core + "<index update process> Response code\t\t\t" + conn.getResponseCode());

                    //listen for change in properties file if an error occurs
                    if(conn.getResponseCode() != 200){
                        reloadParams();
                    }

                    conn.disconnect();
                    logger.info(core + "<index update process> Disconnected from server\t\t" + server);
                    Date endTime = new Date();
                    logger.info(core + "<index update process> Process ended at ................ " + df.format(endTime));
                }catch(MalformedURLException mue){
                        logger.error("Failed to assemble URL for HTTP POST", mue);
                }catch(IOException ioe){
                        logger.error("Failed to connect to the specified URL while trying to send HTTP POST", ioe);
                }catch(Exception e){
                        logger.error("Failed to send HTTP POST", e);
                }
        }

        public int getIntervalInt() {
                try{
                        return Integer.parseInt(interval);
                }catch(NumberFormatException e){
                        logger.warn("Unable to convert 'interval' to number. Using default value (30) instead", e);
                        return 30; //return default in case of error
                }
        }
}

SolrDataImportProperties.java

package org.apache.solr.handler.dataimport.scheduler;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Properties;

import org.apache.solr.core.SolrResourceLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SolrDataImportProperties {
        private Properties properties;

        public static final String SYNC_ENABLED         = "syncEnabled";
        public static final String SYNC_CORES           = "syncCores";
        public static final String SERVER               = "server";
        public static final String PORT                 = "port";
        public static final String WEBAPP               = "webapp";
        public static final String PARAMS               = "params";
        public static final String INTERVAL             = "interval";

        private static final Logger logger = LoggerFactory.getLogger(SolrDataImportProperties.class);

        public SolrDataImportProperties(){
//              loadProperties(true);
        }

        public void loadProperties(boolean force){
                try{
                        SolrResourceLoader loader = new SolrResourceLoader(null);
                        logger.info("Instance dir = " + loader.getInstanceDir());

                        String configDir = loader.getConfigDir();
                        configDir = SolrResourceLoader.normalizeDir(configDir);
                        if(force || properties == null){
                                properties = new Properties();

                                String dataImportPropertiesPath = configDir + "dataimport.properties";

                                FileInputStream fis = new FileInputStream(dataImportPropertiesPath);
                                try{
                                        properties.load(fis);
                                }finally{
                                        // release the file handle even if load() fails
                                        fis.close();
                                }
                        }
                }catch(FileNotFoundException fnfe){
                        logger.error("Error locating DataImportScheduler dataimport.properties file", fnfe);
                }catch(IOException ioe){
                        logger.error("Error reading DataImportScheduler dataimport.properties file", ioe);
                }catch(Exception e){
                        logger.error("Error loading DataImportScheduler properties", e);
                }
        }

        public String getProperty(String key){
                return properties.getProperty(key);
        }
}

2. Register the listener in web.xml

<listener>
   <listener-class>org.apache.solr.handler.dataimport.scheduler.ApplicationListener</listener-class>
 </listener>

3. Add dataimport.properties under solr_conf/conf

#Tue Jul 21 12:10:50 CEST 2010
metadataObject.last_index_time=2010-09-20 11\:12\:47
last_index_time=2010-09-20 11\:12\:47
#################################################
#                                               #
#       dataimport scheduler properties         #
#                                               #
#################################################

#  to sync or not to sync
#  1 - active; anything else - inactive
syncEnabled=1

#  which cores to schedule
#  in a multi-core environment you can decide which cores you want synchronized
#  leave empty or comment it out if using single-core deployment
syncCores=

#  solr server name or IP address
#  [defaults to localhost if empty]
server=localhost

#  solr server port
#  [defaults to 8080 if empty]
port=8080

#  application name/context
#  [defaults to current ServletContextListener's context (app) name]
webapp=solr

#  URL params [mandatory]
#  remainder of URL
params=/select?qt=/dataimport&command=delta-import&clean=false&commit=true

#  schedule interval
#  number of minutes between two runs
#  [defaults to 30 if empty]
interval=10

For details, see the notes at http://wiki.apache.org/solr/DataImportHandler#Scheduling.
Once this is configured, the data is synchronized every 10 minutes, as set by interval.
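
How the properties turn into a scheduled request can be sketched in a few lines of Java. This is a simplified, stand-alone illustration of the URL assembly done in HTTPPostScheduler and not a replacement for it: the class SchedulerUrlDemo is hypothetical, it reads the properties from a string instead of a file, its fallbacks apply only when a key is missing (the real class also treats empty values as missing and takes the webapp name from the servlet context), and core0 is a made-up core name.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical stand-alone sketch of the URL assembly done by HTTPPostScheduler.
class SchedulerUrlDemo {

    // Parse properties from an in-memory string instead of dataimport.properties.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException("cannot happen for an in-memory string", e);
        }
        return props;
    }

    // coreName may be null or empty for a single-core deployment.
    static String importUrl(Properties p, String coreName) {
        String base = "http://" + p.getProperty("server", "localhost")
            + ":" + p.getProperty("port", "8080")
            + "/" + p.getProperty("webapp", "solr");
        String core = (coreName == null || coreName.isEmpty()) ? "" : "/" + coreName;
        return base + core + p.getProperty("params", "");
    }

    // The interval is given in minutes; Timer.scheduleAtFixedRate expects milliseconds.
    static long periodMillis(Properties p) {
        return 1000L * 60 * Integer.parseInt(p.getProperty("interval", "30"));
    }

    public static void main(String[] args) {
        Properties p = parse(
            "server=localhost\nport=8080\nwebapp=solr\ninterval=10\n"
            + "params=/select?qt=/dataimport&command=delta-import&clean=false&commit=true\n");
        System.out.println(importUrl(p, null));
        System.out.println(importUrl(p, "core0")); // core0 is a made-up core name
        System.out.println(periodMillis(p));
    }
}
```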

VII. References

http://www.sonrun.com/?p=16
http://www.cnblogs.com/ibook360/archive/2011/11/21/2257200.html
http://wiki.apache.org/solr/DataImportHandler
http://blog.csdn.net/xzknet/article/details/6710753

