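1. Introduction
SolrCloud is the distributed search solution built on Solr and ZooKeeper, available since Solr 4.0. It is one of several ways to deploy Solr; the others include single-node deployment and multi-node master-slave deployment.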
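2. Key features
SolrCloud has several distinctive features:
Centralized configuration: configuration is managed centrally in ZooKeeper (ZK). At startup you can tell Solr to upload its configuration files to ZooKeeper so that all machines share them. These files are not copied back into a local cache; Solr reads the configuration directly from ZK, so every machine notices when a configuration file changes. Some Solr tasks are also published through ZK, for fault tolerance: if a machine accepts a task but crashes while running it, the unfinished task can be executed again after the machine restarts or after the cluster elects a replacement.
Automatic fault tolerance: SolrCloud shards the index and creates multiple replicas of each shard. Every replica can serve requests, so losing one replica does not interrupt the index service. Better still, SolrCloud can automatically rebuild the replicas of a failed machine on other machines and put them into service.
Near-real-time search: replication is push-based and immediate (slow push is also supported), so newly indexed documents become searchable within about a second.
Automatic load balancing of queries: the replicas of a SolrCloud index can be spread across several machines to share the query load. If the query load is heavy, you can relieve it by adding machines and replicas.
Automatic distribution of the index and its shards: a document can be sent to any node, and it will be forwarded to the correct node.
Transaction log: the transaction log ensures that no update is lost, even if the document has not yet been written to the index on disk.
Other features worth mentioning:
Index storage on HDFS: an index is usually a few GB to a few tens of GB, and rarely hundreds of GB, so this feature may seldom be practical. If you have hundreds of millions of records to index, though, it is worth considering. Its biggest benefit is probably in combination with the next feature, batch index creation with MapReduce.
Batch index creation with MapReduce (MR): with this feature, slow index creation should no longer be a worry.
Powerful RESTful API: nearly every management function you can think of can be called through this API, which makes maintenance and administration scripts much easier to write.
Excellent admin UI: the key information is visible at a glance, the deployment layout of SolrCloud is shown graphically, and the indispensable debug functions are included as well.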
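3. Concepts
Collection: a complete index in the logical sense within a SolrCloud cluster. It is usually divided into one or more Shards, all of which use the same Config Set. If there is more than one Shard, it is a distributed index; SolrCloud lets you refer to it by the Collection name, so you do not have to deal with the Shard-related parameters that distributed search would otherwise require.
Config Set: the set of configuration files a Solr Core needs in order to serve requests. Each config set has a name. At minimum it contains solrconfig.xml (SolrConfigXml) and schema.xml (SchemaXml); depending on what those two files configure, other files may be needed as well. It is stored in ZooKeeper. Config sets can be re-uploaded or updated with the upconfig command, and the Solr startup parameter bootstrap_confdir can be used to initialize or update one.
Core: that is, a Solr Core. One Solr instance contains one or more Solr Cores, and each Solr Core can index and answer queries independently. Each Solr Core corresponds to one index or to one Shard of a Collection. Solr Cores were introduced to make management more flexible and to share resources. The difference in SolrCloud is that a core's configuration lives in ZooKeeper, whereas a traditional Solr core keeps its configuration files in a configuration directory on disk.
Leader: the Shard replica that won the election. Each Shard has several Replicas, and they hold an election to decide which one is the Leader. An election can happen at any time, but normally one is triggered only when a Solr instance fails. When documents are indexed, SolrCloud passes them to the leader of the corresponding Shard, and the leader distributes them to all replicas of that Shard.
Replica: one copy of a Shard. Each Replica lives in one Solr Core. If a collection named "test" is created with numShards=1 and replicationFactor=2, it has 2 replicas and therefore 2 Cores, each on a different machine or Solr instance. One is named test_shard1_replica1 and the other test_shard1_replica2. One of them is elected Leader.
Shard: a logical slice of a Collection. Each Shard consists of one or more replicas, and an election decides which of them is the Leader.
Zookeeper: ZooKeeper provides distributed locking and is required by SolrCloud. It handles Leader election. Solr can run with an embedded ZooKeeper, but a standalone ensemble is recommended, preferably with three or more hosts.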
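To make the Replica example above concrete, here is a minimal Java sketch. It builds a Collections API CREATE request for the "test" collection and lists the core names that follow the <collection>_shard<N>_replica<M> pattern described in the text. The class name, method names and the base URL are assumptions made for illustration; the sketch only assembles strings and does not talk to a Solr node.

```java
import java.util.ArrayList;
import java.util.List;

final class CollectionLayoutSketch {
    // Builds a Collections API CREATE request. baseUrl is a hypothetical node address.
    static String createUrl(String baseUrl, String name, int numShards, int replicationFactor) {
        return baseUrl + "/admin/collections?action=CREATE"
                + "&name=" + name
                + "&numShards=" + numShards
                + "&replicationFactor=" + replicationFactor;
    }

    // Lists core names using the <collection>_shard<N>_replica<M> pattern from the text.
    static List<String> coreNames(String name, int numShards, int replicationFactor) {
        List<String> names = new ArrayList<>();
        for (int shard = 1; shard <= numShards; shard++) {
            for (int replica = 1; replica <= replicationFactor; replica++) {
                names.add(name + "_shard" + shard + "_replica" + replica);
            }
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(createUrl("http://localhost:8983/solr", "test", 1, 2));
        System.out.println(coreNames("test", 1, 2));
    }
}
```

With numShards=1 and replicationFactor=2 the list contains exactly the two core names mentioned above; which of the two becomes Leader is decided by the election, not by the name.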
4. Architecture diagram
5. Miscellaneous
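NRT (near-real-time search): Solr writes indexed data to disk when it is committed. This is a hard commit, and it ensures that data is not lost even in a power failure. To make documents searchable sooner, Solr also offers a soft commit: the data is committed only to memory, where it is visible to searches but has not yet been written to the index files on disk.
A common setup is to trigger a hard commit automatically every 1 to 10 minutes and a soft commit automatically every second.
RealTime Get: lets you look up the latest version of any document by its unique key without reopening the searcher. It is mainly useful when Solr serves as a NoSQL data store and not only as a search engine. Realtime Get currently depends on the transaction log, which is enabled by default. In addition, even when soft commit or commitWithin is in use, get still returns the actual latest data. Note: commitWithin is a commit option that asks for the data to be committed within a given period of time instead of immediately.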
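The difference between soft commit, hard commit and Realtime Get can be illustrated with a toy in-memory model. This is not how Solr is implemented; the three maps and all names below are assumptions chosen only to show which operation makes a document visible to search, which makes it durable, and why a get by id can see a document before any commit.

```java
import java.util.HashMap;
import java.util.Map;

final class CommitModelSketch {
    private final Map<String, String> transactionLog = new HashMap<>(); // updates not yet hard-committed
    private final Map<String, String> searchable = new HashMap<>();     // what the current searcher sees
    private final Map<String, String> onDisk = new HashMap<>();         // what survives a power failure

    // Adding a document only records it in the transaction log.
    void add(String id, String doc) {
        transactionLog.put(id, doc);
    }

    // Soft commit: documents become visible to search, but nothing is flushed to disk.
    void softCommit() {
        searchable.putAll(transactionLog);
    }

    // Hard commit: documents are flushed to disk (and, in this toy model, made visible too).
    void hardCommit() {
        onDisk.putAll(transactionLog);
        searchable.putAll(transactionLog);
        transactionLog.clear();
    }

    // A normal search only sees what the searcher has opened.
    String search(String id) {
        return searchable.get(id);
    }

    // Realtime get consults the transaction log first, so it sees uncommitted updates.
    String realtimeGet(String id) {
        String pending = transactionLog.get(id);
        return pending != null ? pending : searchable.get(id);
    }

    boolean isDurable(String id) {
        return onDisk.containsKey(id);
    }
}
```

In this model a freshly added document is returned by realtimeGet immediately, by search only after a soft or hard commit, and counts as durable only after a hard commit, which mirrors the behaviour described above.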
Source code analysis
1. SolrDispatchFilter initialization
@Override
public void init(FilterConfig config) throws ServletException {
  log.info("SolrDispatchFilter.init(): {}", this.getClass().getClassLoader());
  String exclude = config.getInitParameter("excludePatterns");
  if (exclude != null) {
    String[] excludeArray = exclude.split(",");
    excludePatterns = new ArrayList<>();
    for (String element : excludeArray) {
      excludePatterns.add(Pattern.compile(element));
    }
  }
  try {
    Properties extraProperties = (Properties) config.getServletContext().getAttribute(PROPERTIES_ATTRIBUTE);
    if (extraProperties == null) extraProperties = new Properties();
    String solrHome = (String) config.getServletContext().getAttribute(SOLRHOME_ATTRIBUTE);
    if (solrHome == null) solrHome = SolrResourceLoader.locateSolrHome();
    ExecutorUtil.addThreadLocalProvider(SolrRequestInfo.getInheritableThreadLocalProvider());
    this.cores = createCoreContainer(solrHome, extraProperties);
    if (this.cores.getAuthenticationPlugin() != null) {
      HttpClientConfigurer configurer = this.cores.getAuthenticationPlugin().getDefaultConfigurer();
      if (configurer != null) {
        configurer.configure((DefaultHttpClient) httpClient, new ModifiableSolrParams());
      }
    }
    log.info("user.dir=" + System.getProperty("user.dir"));
  } catch (Throwable t) {
    // catch this so our filter still works
    log.error("Could not start Solr. Check solr/home property and the logs");
    SolrCore.log(t);
    if (t instanceof Error) {
      throw (Error) t;
    }
  }
  log.info("SolrDispatchFilter.init() done");
}
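The first part of init() turns the comma-separated excludePatterns init parameter into a list of compiled regular expressions. The standalone sketch below reproduces that parsing step and adds a hypothetical isExcluded helper to show how such a list could be applied to a request path. Matching with lookingAt (a prefix match) is an assumption about how the filter uses the patterns later, and the sample patterns are invented for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

final class ExcludePatternsSketch {
    // Same parsing as in init(): split on commas and compile each element.
    static List<Pattern> parse(String exclude) {
        List<Pattern> patterns = new ArrayList<>();
        if (exclude == null) {
            return patterns;
        }
        for (String element : exclude.split(",")) {
            patterns.add(Pattern.compile(element));
        }
        return patterns;
    }

    // Hypothetical helper: a path is excluded if any pattern matches its beginning.
    static boolean isExcluded(List<Pattern> patterns, String path) {
        for (Pattern p : patterns) {
            if (p.matcher(path).lookingAt()) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<Pattern> patterns = parse("/css/.+,/js/.+");
        System.out.println(isExcluded(patterns, "/css/main.css"));
        System.out.println(isExcluded(patterns, "/select"));
    }
}
```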
2. CoreContainer runs its load() method
//------------------------------------------------------------------- // Initialization / Cleanup //------------------------------------------------------------------- /** * Load the cores defined for this CoreContainer */ public void load() { log.info("Loading cores into CoreContainer [instanceDir={}]", loader.getInstanceDir()); // add the sharedLib to the shared resource loader before initializing cfg based plugins String libDir = cfg.getSharedLibDirectory(); if (libDir != null) { File f = FileUtils.resolvePath(new File(solrHome), libDir); log.info("loading shared library: " + f.getAbsolutePath()); loader.addToClassLoader(libDir, null, false); loader.reloadLuceneSPI(); } shardHandlerFactory = ShardHandlerFactory.newInstance(cfg.getShardHandlerFactoryPluginInfo(), loader); updateShardHandler = new UpdateShardHandler(cfg.getUpdateShardHandlerConfig()); solrCores.allocateLazyCores(cfg.getTransientCacheSize(), loader); logging = LogWatcher.newRegisteredLogWatcher(cfg.getLogWatcherConfig(), loader); hostName = cfg.getNodeName(); zkSys.initZooKeeper(this, solrHome, cfg.getCloudConfig()); initializeAuthenticationPlugin(); if (isZooKeeperAware()) { intializeAuthorizationPlugin(); } collectionsHandler = createHandler(cfg.getCollectionsHandlerClass(), CollectionsHandler.class); containerHandlers.put(COLLECTIONS_HANDLER_PATH, collectionsHandler); infoHandler = createHandler(cfg.getInfoHandlerClass(), InfoHandler.class); containerHandlers.put(INFO_HANDLER_PATH, infoHandler); coreAdminHandler = createHandler(cfg.getCoreAdminHandlerClass(), CoreAdminHandler.class); containerHandlers.put(CORES_HANDLER_PATH, coreAdminHandler); coreConfigService = ConfigSetService.createConfigSetService(cfg, loader, zkSys.zkController); containerProperties.putAll(cfg.getSolrProperties()); // setup executor to load cores in parallel // do not limit the size of the executor in zk mode since cores may try and wait for each other. 
final ExecutorService coreLoadExecutor = ExecutorUtil.newMDCAwareFixedThreadPool( ( zkSys.getZkController() == null ? cfg.getCoreLoadThreadCount() : Integer.MAX_VALUE ), new DefaultSolrThreadFactory("coreLoadExecutor") ); final List<Future<SolrCore>> futures = new ArrayList<Future<SolrCore>>(); try { List<CoreDescriptor> cds = coresLocator.discover(this); checkForDuplicateCoreNames(cds); for (final CoreDescriptor cd : cds) { if (cd.isTransient() || !cd.isLoadOnStartup()) { solrCores.putDynamicDescriptor(cd.getName(), cd); } else if (asyncSolrCoreLoad) { solrCores.markCoreAsLoading(cd); } if (cd.isLoadOnStartup()) { futures.add(coreLoadExecutor.submit(new Callable<SolrCore>() { @Override public SolrCore call() throws Exception { SolrCore core; try { if (zkSys.getZkController() != null) { zkSys.getZkController().throwErrorIfReplicaReplaced(cd); } core = create(cd, false); } finally { if (asyncSolrCoreLoad) { solrCores.markCoreAsNotLoading(cd); } } try { zkSys.registerInZk(core, true); } catch (Throwable t) { SolrException.log(log, "Error registering SolrCore", t); } return core; } })); } } // Start the background thread backgroundCloser = new CloserThread(this, solrCores, cfg); backgroundCloser.start(); } finally { if (asyncSolrCoreLoad && futures != null) { Thread shutdownThread = new Thread() { public void run() { try { for (Future<SolrCore> future : futures) { try { future.get(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (ExecutionException e) { log.error("Error waiting for SolrCore to be created", e); } } } finally { ExecutorUtil.shutdownNowAndAwaitTermination(coreLoadExecutor); } } }; coreContainerWorkExecutor.submit(shutdownThread); } else { ExecutorUtil.shutdownAndAwaitTermination(coreLoadExecutor); } } if (isZooKeeperAware()) { zkSys.getZkController().checkOverseerDesignate(); } }
3. ZkContainer reads the configuration and initializes ZooKeeper
public void initZooKeeper(final CoreContainer cc, String solrHome, CloudConfig config) { ZkController zkController = null; String zkRun = System.getProperty("zkRun"); if (zkRun != null && config == null) throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Cannot start Solr in cloud mode - no cloud config provided"); if (config == null) return; // not in zk mode String zookeeperHost = config.getZkHost(); // zookeeper in quorum mode currently causes a failure when trying to // register log4j mbeans. See SOLR-2369 // TODO: remove after updating to an slf4j based zookeeper System.setProperty("zookeeper.jmx.log4j.disable", "true"); if (zkRun != null) { String zkDataHome = System.getProperty("zkServerDataDir", solrHome + "zoo_data"); String zkConfHome = System.getProperty("zkServerConfDir", solrHome); zkServer = new SolrZkServer(stripChroot(zkRun), stripChroot(config.getZkHost()), zkDataHome, zkConfHome, config.getSolrHostPort()); zkServer.parseConfig(); zkServer.start(); // set client from server config if not already set if (zookeeperHost == null) { zookeeperHost = zkServer.getClientString(); } } int zkClientConnectTimeout = 30000; if (zookeeperHost != null) { // we are ZooKeeper enabled try { // If this is an ensemble, allow for a long connect time for other servers to come up if (zkRun != null && zkServer.getServers().size() > 1) { zkClientConnectTimeout = 24 * 60 * 60 * 1000; // 1 day for embedded ensemble log.info("Zookeeper client=" + zookeeperHost + " Waiting for a quorum."); } else { log.info("Zookeeper client=" + zookeeperHost); } String confDir = System.getProperty("bootstrap_confdir"); boolean boostrapConf = Boolean.getBoolean("bootstrap_conf"); if(!ZkController.checkChrootPath(zookeeperHost, (confDir!=null) || boostrapConf || zkRunOnly)) { throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "A chroot was specified in ZkHost but the znode doesn't exist. 
" + zookeeperHost); } zkController = new ZkController(cc, zookeeperHost, zkClientConnectTimeout, config, new CurrentCoreDescriptorProvider() { @Override public List<CoreDescriptor> getCurrentDescriptors() { List<CoreDescriptor> descriptors = new ArrayList<>( cc.getCoreNames().size()); Collection<SolrCore> cores = cc.getCores(); for (SolrCore core : cores) { descriptors.add(core.getCoreDescriptor()); } return descriptors; } }); if (zkRun != null && zkServer.getServers().size() > 1 && confDir == null && boostrapConf == false) { // we are part of an ensemble and we are not uploading the config - pause to give the config time // to get up Thread.sleep(10000); } if(confDir != null) { Path configPath = Paths.get(confDir); if (!Files.isDirectory(configPath)) throw new IllegalArgumentException("bootstrap_confdir must be a directory of configuration files"); String confName = System.getProperty(ZkController.COLLECTION_PARAM_PREFIX+ZkController.CONFIGNAME_PROP, "configuration1"); ZkConfigManager configManager = new ZkConfigManager(zkController.getZkClient()); configManager.uploadConfigDir(configPath, confName); } if(boostrapConf) { ZkController.bootstrapConf(zkController.getZkClient(), cc, solrHome); } } catch (InterruptedException e) { // Restore the interrupted status Thread.currentThread().interrupt(); log.error("", e); throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e); } catch (TimeoutException e) { log.error("Could not connect to ZooKeeper", e); throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e); } catch (IOException | KeeperException e) { log.error("", e); throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e); } } this.zkController = zkController; }
4. ZkController's init() method is called and starts the election
private void init(CurrentCoreDescriptorProvider registerOnReconnect) {
  try {
    boolean createdWatchesAndUpdated = false;
    Stat stat = zkClient.exists(ZkStateReader.LIVE_NODES_ZKNODE, null, true);
    if (stat != null && stat.getNumChildren() > 0) {
      zkStateReader.createClusterStateWatchersAndUpdate();
      createdWatchesAndUpdated = true;
      publishAndWaitForDownStates();
    }
    createClusterZkNodes(zkClient);
    createEphemeralLiveNode();
    ShardHandler shardHandler;
    UpdateShardHandler updateShardHandler;
    shardHandler = cc.getShardHandlerFactory().getShardHandler();
    updateShardHandler = cc.getUpdateShardHandler();
    if (!zkRunOnly) {
      overseerElector = new LeaderElector(zkClient);
      this.overseer = new Overseer(shardHandler, updateShardHandler, CoreContainer.CORES_HANDLER_PATH, zkStateReader, this, cloudConfig);
      ElectionContext context = new OverseerElectionContext(zkClient, overseer, getNodeName());
      overseerElector.setup(context);
      overseerElector.joinElection(context, false);
    }
    if (!createdWatchesAndUpdated) {
      zkStateReader.createClusterStateWatchersAndUpdate();
    }
  } catch (IOException e) {
    log.error("", e);
    throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Can't create ZooKeeperController", e);
  } catch (InterruptedException e) {
    // Restore the interrupted status
    Thread.currentThread().interrupt();
    log.error("", e);
    throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
  } catch (KeeperException e) {
    log.error("", e);
    throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
  }
}
5. The election itself is implemented in LeaderElector's joinElection() method
/** * Begin participating in the election process. Gets a new sequential number * and begins watching the node with the sequence number before it, unless it * is the lowest number, in which case, initiates the leader process. If the * node that is watched goes down, check if we are the new lowest node, else * watch the next lowest numbered node. * * @return sequential node number */ public int joinElection(ElectionContext context, boolean replacement,boolean joinAtHead) throws KeeperException, InterruptedException, IOException { context.joinedElectionFired(); final String shardsElectZkPath = context.electionPath + LeaderElector.ELECTION_NODE; long sessionId = zkClient.getSolrZooKeeper().getSessionId(); String id = sessionId + "-" + context.id; String leaderSeqPath = null; boolean cont = true; int tries = 0; while (cont) { try { if(joinAtHead){ log.info("Node {} trying to join election at the head", id); List<String> nodes = OverseerCollectionProcessor.getSortedElectionNodes(zkClient, shardsElectZkPath); if(nodes.size() <2){ leaderSeqPath = zkClient.create(shardsElectZkPath + "/" + id + "-n_", null, CreateMode.EPHEMERAL_SEQUENTIAL, false); } else { String firstInLine = nodes.get(1); log.info("The current head: {}", firstInLine); Matcher m = LEADER_SEQ.matcher(firstInLine); if (!m.matches()) { throw new IllegalStateException("Could not find regex match in:" + firstInLine); } leaderSeqPath = shardsElectZkPath + "/" + id + "-n_"+ m.group(1); zkClient.create(leaderSeqPath, null, CreateMode.EPHEMERAL, false); } } else { leaderSeqPath = zkClient.create(shardsElectZkPath + "/" + id + "-n_", null, CreateMode.EPHEMERAL_SEQUENTIAL, false); } log.info("Joined leadership election with path: {}", leaderSeqPath); context.leaderSeqPath = leaderSeqPath; cont = false; } catch (ConnectionLossException e) { // we don't know if we made our node or not... 
List<String> entries = zkClient.getChildren(shardsElectZkPath, null, true); boolean foundId = false; for (String entry : entries) { String nodeId = getNodeId(entry); if (id.equals(nodeId)) { // we did create our node... foundId = true; break; } } if (!foundId) { cont = true; if (tries++ > 20) { throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e); } try { Thread.sleep(50); } catch (InterruptedException e2) { Thread.currentThread().interrupt(); } } } catch (KeeperException.NoNodeException e) { // we must have failed in creating the election node - someone else must // be working on it, lets try again if (tries++ > 20) { context = null; throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e); } cont = true; try { Thread.sleep(50); } catch (InterruptedException e2) { Thread.currentThread().interrupt(); } } } checkIfIamLeader(context, replacement); return getSeq(context.leaderSeqPath); }
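The doc comment of joinElection() summarizes the scheme: every candidate gets a sequential number, the lowest number is the leader, and every other candidate watches the node just before it. The sketch below simulates that rule in memory, without ZooKeeper. The class and method names are invented for the example, and a TreeMap stands in for the ephemeral sequential znodes.

```java
import java.util.TreeMap;

final class ElectionSketch {
    // Sequence number -> node id; stands in for the ephemeral sequential znodes.
    private final TreeMap<Integer, String> candidates = new TreeMap<>();
    private int nextSeq = 0;

    // Joining the election hands out the next sequence number.
    int join(String nodeId) {
        int seq = nextSeq++;
        candidates.put(seq, nodeId);
        return seq;
    }

    // The candidate with the lowest sequence number is the leader.
    String leader() {
        return candidates.isEmpty() ? null : candidates.firstEntry().getValue();
    }

    // Each candidate watches the next lower sequence number; the leader watches nothing.
    Integer watched(int seq) {
        return candidates.lowerKey(seq);
    }

    // A node going down removes its entry, as the expiry of an ephemeral node would.
    void leave(int seq) {
        candidates.remove(seq);
    }

    public static void main(String[] args) {
        ElectionSketch election = new ElectionSketch();
        int a = election.join("A");
        int b = election.join("B");
        int c = election.join("C");
        election.leave(b);                       // C now has to watch A instead of B
        System.out.println(election.watched(c));
        election.leave(a);                       // C is the lowest remaining candidate
        System.out.println(election.leader());
    }
}
```

Watching only the immediate predecessor means that a failure wakes up a single candidate instead of every node in the cluster.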
6. OverseerCollectionProcessor
It implements the Runnable interface, so its core method is run():
@Override public void run() { log.info("Process current queue of collection creations"); LeaderStatus isLeader = amILeader(); while (isLeader == LeaderStatus.DONT_KNOW) { log.debug("am_i_leader unclear {}", isLeader); isLeader = amILeader(); // not a no, not a yes, try ask again } String oldestItemInWorkQueue = null; // hasLeftOverItems - used for avoiding re-execution of async tasks that were processed by a previous Overseer. // This variable is set in case there's any task found on the workQueue when the OCP starts up and // the id for the queue tail is used as a marker to check for the task in completed/failed map in zk. // Beyond the marker, all tasks can safely be assumed to have never been executed. boolean hasLeftOverItems = true; try { oldestItemInWorkQueue = workQueue.getTailId(); } catch (KeeperException e) { // We don't need to handle this. This is just a fail-safe which comes in handy in skipping already processed // async calls. SolrException.log(log, "", e); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } if (oldestItemInWorkQueue == null) hasLeftOverItems = false; else log.debug("Found already existing elements in the work-queue. Last element: {}", oldestItemInWorkQueue); try { prioritizeOverseerNodes(); } catch (Exception e) { log.error("Unable to prioritize overseer ", e); } // TODO: Make maxThreads configurable. this.tpe = new ExecutorUtil.MDCAwareThreadPoolExecutor(5, 100, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), new DefaultSolrThreadFactory("OverseerThreadFactory")); try { while (!this.isClosed) { try { isLeader = amILeader(); if (LeaderStatus.NO == isLeader) { break; } else if (LeaderStatus.YES != isLeader) { log.debug("am_i_leader unclear {}", isLeader); continue; // not a no, not a yes, try asking again } log.debug("Cleaning up work-queue. 
#Running tasks: {}", runningTasks.size()); cleanUpWorkQueue(); printTrackingMaps(); boolean waited = false; while (runningTasks.size() > maxParallelThreads) { synchronized (waitLock) { waitLock.wait(100);//wait for 100 ms or till a task is complete } waited = true; } if (waited) cleanUpWorkQueue(); List<QueueEvent> heads = workQueue.peekTopN(maxParallelThreads, runningZKTasks, 2000L); if (heads == null) continue; log.debug("Got {} tasks from work-queue : [{}]", heads.size(), heads.toString()); if (isClosed) break; for (QueueEvent head : heads) { final ZkNodeProps message = ZkNodeProps.load(head.getBytes()); String collectionName = message.containsKey(COLLECTION_PROP) ? message.getStr(COLLECTION_PROP) : message.getStr(NAME); final String asyncId = message.getStr(ASYNC); if (hasLeftOverItems) { if (head.getId().equals(oldestItemInWorkQueue)) hasLeftOverItems = false; if (asyncId != null && (completedMap.contains(asyncId) || failureMap.contains(asyncId))) { log.debug("Found already processed task in workQueue, cleaning up. 
AsyncId [{}]",asyncId ); workQueue.remove(head); continue; } } if (!checkExclusivity(message, head.getId())) { log.debug("Exclusivity check failed for [{}]", message.toString()); continue; } try { markTaskAsRunning(head, collectionName, asyncId, message); log.debug("Marked task [{}] as running", head.getId()); } catch (KeeperException.NodeExistsException e) { // This should never happen log.error("Tried to pick up task [{}] when it was already running!", head.getId()); } catch (InterruptedException e) { log.error("Thread interrupted while trying to pick task for execution.", head.getId()); Thread.currentThread().interrupt(); } log.info("Overseer Collection Processor: Get the message id:" + head.getId() + " message:" + message.toString()); String operation = message.getStr(Overseer.QUEUE_OPERATION); Runner runner = new Runner(message, operation, head); tpe.execute(runner); } } catch (KeeperException e) { if (e.code() == KeeperException.Code.SESSIONEXPIRED) { log.warn("Overseer cannot talk to ZK"); return; } SolrException.log(log, "", e); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return; } catch (Exception e) { SolrException.log(log, "", e); } } } finally { this.close(); } }
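One detail of run() that is easy to miss is the hasLeftOverItems logic: tasks that were already in the work queue when this Overseer started may have been completed by a previous Overseer, so async tasks up to and including the recorded marker are checked against the completed and failed maps before they are executed. The sketch below isolates that decision. The Task class, the method name and the single finishedAsyncIds set (standing in for both maps) are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

final class LeftoverTaskSketch {
    static final class Task {
        final String id;       // id of the queue entry
        final String asyncId;  // null for synchronous requests

        Task(String id, String asyncId) {
            this.id = id;
            this.asyncId = asyncId;
        }
    }

    // Returns the ids of the tasks that should be executed, in queue order.
    // marker is the id of the last entry found in the queue at startup (null if the queue was empty).
    static List<String> tasksToRun(List<Task> queue, String marker, Set<String> finishedAsyncIds) {
        boolean hasLeftOverItems = marker != null;
        List<String> toRun = new ArrayList<>();
        for (Task task : queue) {
            if (hasLeftOverItems) {
                // Beyond the marker, tasks can be assumed never to have been executed.
                if (task.id.equals(marker)) {
                    hasLeftOverItems = false;
                }
                if (task.asyncId != null && finishedAsyncIds.contains(task.asyncId)) {
                    continue; // already processed by a previous Overseer
                }
            }
            toRun.add(task.id);
        }
        return toRun;
    }
}
```

As in the original loop, the marker entry itself is still checked against the finished ids; only the entries after it skip the check.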
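This run() method in turn uses an inner class, Runner (see the call new Runner(message, operation, head) near the end of the loop). Runner also runs as a thread and implements the Runnable interface, and its core method is likewise run():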
@Override
public void run() {
  final TimerContext timerContext = stats.time("collection_" + operation);
  boolean success = false;
  final String asyncId = message.getStr(ASYNC);
  String collectionName = message.containsKey(COLLECTION_PROP) ? message.getStr(COLLECTION_PROP) : message.getStr(NAME);
  try {
    try {
      log.debug("Runner processing {}", head.getId());
      response = processMessage(message, operation);
    } finally {
      timerContext.stop();
      updateStats();
    }
    if (asyncId != null) {
      if (response != null && (response.getResponse().get("failure") != null || response.getResponse().get("exception") != null)) {
        failureMap.put(asyncId, SolrResponse.serializable(response));
        log.debug("Updated failed map for task with zkid:[{}]", head.getId());
      } else {
        completedMap.put(asyncId, SolrResponse.serializable(response));
        log.debug("Updated completed map for task with zkid:[{}]", head.getId());
      }
    } else {
      head.setBytes(SolrResponse.serializable(response));
      log.debug("Completed task:[{}]", head.getId());
    }
    markTaskComplete(head.getId(), asyncId, collectionName);
    log.debug("Marked task [{}] as completed.", head.getId());
    printTrackingMaps();
    log.info("Overseer Collection Processor: Message id:" + head.getId() + " complete, response:" + response.getResponse().toString());
    success = true;
  } catch (KeeperException e) {
    SolrException.log(log, "", e);
  } catch (InterruptedException e) {
    // Reset task from tracking data structures so that it can be retried.
    resetTaskWithException(head.getId(), asyncId, collectionName);
    log.warn("Resetting task {} as the thread was interrupted.", head.getId());
    Thread.currentThread().interrupt();
  } finally {
    if (!success) {
      // Reset task from tracking data structures so that it can be retried.
      resetTaskWithException(head.getId(), asyncId, collectionName);
    }
    synchronized (waitLock) {
      waitLock.notifyAll();
    }
  }
}
The core of the method above is the call to processMessage(), which implements the various Collection operations:
protected SolrResponse processMessage(ZkNodeProps message, String operation) { log.warn("OverseerCollectionProcessor.processMessage : "+ operation + " , "+ message.toString()); NamedList results = new NamedList(); try { // force update the cluster state zkStateReader.updateClusterState(); CollectionParams.CollectionAction action = CollectionParams.CollectionAction.get(operation); if (action == null) { throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:" + operation); } switch (action) { case CREATE: createCollection(zkStateReader.getClusterState(), message, results); break; case DELETE: deleteCollection(message, results); break; case RELOAD: ModifiableSolrParams params = new ModifiableSolrParams(); params.set(CoreAdminParams.ACTION, CoreAdminAction.RELOAD.toString()); collectionCmd(zkStateReader.getClusterState(), message, params, results, Replica.State.ACTIVE); break; case CREATEALIAS: createAlias(zkStateReader.getAliases(), message); break; case DELETEALIAS: deleteAlias(zkStateReader.getAliases(), message); break; case SPLITSHARD: splitShard(zkStateReader.getClusterState(), message, results); break; case DELETESHARD: deleteShard(zkStateReader.getClusterState(), message, results); break; case CREATESHARD: createShard(zkStateReader.getClusterState(), message, results); break; case DELETEREPLICA: deleteReplica(zkStateReader.getClusterState(), message, results); break; case MIGRATE: migrate(zkStateReader.getClusterState(), message, results); break; case ADDROLE: processRoleCommand(message, operation); break; case REMOVEROLE: processRoleCommand(message, operation); break; case ADDREPLICA: addReplica(zkStateReader.getClusterState(), message, results); break; case OVERSEERSTATUS: getOverseerStatus(message, results); break; case CLUSTERSTATUS://TODO . deprecated. 
OCP does not need to do it .remove in a later release new ClusterStatus(zkStateReader, message).getClusterStatus(results); break; case ADDREPLICAPROP: processReplicaAddPropertyCommand(message); break; case DELETEREPLICAPROP: processReplicaDeletePropertyCommand(message); break; case BALANCESHARDUNIQUE: balanceProperty(message); break; case REBALANCELEADERS: processRebalanceLeaders(message); break; case MODIFYCOLLECTION: overseer.getInQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(message)); break; default: throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:" + operation); } } catch (Exception e) { String collName = message.getStr("collection"); if (collName == null) collName = message.getStr(NAME); if (collName == null) { SolrException.log(log, "Operation " + operation + " failed", e); } else { SolrException.log(log, "Collection: " + collName + " operation: " + operation + " failed", e); } results.add("Operation " + operation + " caused exception:", e); SimpleOrderedMap nl = new SimpleOrderedMap(); nl.add("msg", e.getMessage()); nl.add("rspCode", e instanceof SolrException ? ((SolrException)e).code() : -1); results.add("exception", nl); } return new OverseerSolrResponse(results); }
We take SPLITSHARD, implemented by splitShard(), as the example.
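Much of the splitShard() method listed next is about hash ranges. When no ranges and no split.key are given, the parent range is cut into two parts; when ranges are given, they must be contiguous and cover the parent range exactly. The following sketch shows both ideas on plain int ranges. It is a simplified stand-in, not Solr's DocRouter: the class name, the int[]{min, max} representation and both method names are assumptions.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class RangeSplitSketch {
    // Cuts the inclusive range [min, max] into `parts` contiguous sub-ranges of near-equal size.
    // Assumes 1 <= parts <= number of values in the range.
    static List<int[]> partition(int parts, int min, int max) {
        long size = (long) max - min + 1;
        long step = size / parts;
        List<int[]> result = new ArrayList<>();
        long start = min;
        for (int i = 0; i < parts; i++) {
            long end = (i == parts - 1) ? max : start + step - 1; // last part absorbs the remainder
            result.add(new int[] {(int) start, (int) end});
            start = end + 1;
        }
        return result;
    }

    // True if the sub-ranges, once sorted, start at min, end at max, and leave no gap or overlap.
    static boolean covers(List<int[]> subRanges, int min, int max) {
        if (subRanges.isEmpty()) {
            return false;
        }
        List<int[]> sorted = new ArrayList<>(subRanges); // copy to preserve the caller's order
        sorted.sort(Comparator.comparingInt(r -> r[0]));
        if (sorted.get(0)[0] != min || sorted.get(sorted.size() - 1)[1] != max) {
            return false;
        }
        for (int i = 1; i < sorted.size(); i++) {
            // long arithmetic avoids overflow when the previous max is Integer.MAX_VALUE
            if ((long) sorted.get(i - 1)[1] + 1 != sorted.get(i)[0]) {
                return false;
            }
        }
        return true;
    }
}
```

In the real method each resulting sub-range becomes a sub-slice named after the parent with an index appended (slice + "_" + i), so splitting shard1 in two produces shard1_0 and shard1_1.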
private boolean splitShard(ClusterState clusterState, ZkNodeProps message, NamedList results) { String collectionName = message.getStr("collection"); String slice = message.getStr(ZkStateReader.SHARD_ID_PROP); log.info("Split shard invoked"); String splitKey = message.getStr("split.key"); ShardHandler shardHandler = shardHandlerFactory.getShardHandler(); DocCollection collection = clusterState.getCollection(collectionName); DocRouter router = collection.getRouter() != null ? collection.getRouter() : DocRouter.DEFAULT; Slice parentSlice = null; if (slice == null) { if (router instanceof CompositeIdRouter) { Collection<Slice> searchSlices = router.getSearchSlicesSingle(splitKey, new ModifiableSolrParams(), collection); if (searchSlices.isEmpty()) { throw new SolrException(ErrorCode.BAD_REQUEST, "Unable to find an active shard for split.key: " + splitKey); } if (searchSlices.size() > 1) { throw new SolrException(ErrorCode.BAD_REQUEST, "Splitting a split.key: " + splitKey + " which spans multiple shards is not supported"); } parentSlice = searchSlices.iterator().next(); slice = parentSlice.getName(); log.info("Split by route.key: {}, parent shard is: {} ", splitKey, slice); } else { throw new SolrException(ErrorCode.BAD_REQUEST, "Split by route key can only be used with CompositeIdRouter or subclass. 
Found router: " + router.getClass().getName()); } } else { parentSlice = clusterState.getSlice(collectionName, slice); } if (parentSlice == null) { if (clusterState.hasCollection(collectionName)) { throw new SolrException(ErrorCode.BAD_REQUEST, "No shard with the specified name exists: " + slice); } else { throw new SolrException(ErrorCode.BAD_REQUEST, "No collection with the specified name exists: " + collectionName); } } // find the leader for the shard Replica parentShardLeader = null; try { parentShardLeader = zkStateReader.getLeaderRetry(collectionName, slice, 10000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } DocRouter.Range range = parentSlice.getRange(); if (range == null) { range = new PlainIdRouter().fullRange(); } List<DocRouter.Range> subRanges = null; String rangesStr = message.getStr(CoreAdminParams.RANGES); if (rangesStr != null) { String[] ranges = rangesStr.split(","); if (ranges.length == 0 || ranges.length == 1) { throw new SolrException(ErrorCode.BAD_REQUEST, "There must be at least two ranges specified to split a shard"); } else { subRanges = new ArrayList<>(ranges.length); for (int i = 0; i < ranges.length; i++) { String r = ranges[i]; try { subRanges.add(DocRouter.DEFAULT.fromString(r)); } catch (Exception e) { throw new SolrException(ErrorCode.BAD_REQUEST, "Exception in parsing hexadecimal hash range: " + r, e); } if (!subRanges.get(i).isSubsetOf(range)) { throw new SolrException(ErrorCode.BAD_REQUEST, "Specified hash range: " + r + " is not a subset of parent shard's range: " + range.toString()); } } List<DocRouter.Range> temp = new ArrayList<>(subRanges); // copy to preserve original order Collections.sort(temp); if (!range.equals(new DocRouter.Range(temp.get(0).min, temp.get(temp.size() - 1).max))) { throw new SolrException(ErrorCode.BAD_REQUEST, "Specified hash ranges: " + rangesStr + " do not cover the entire range of parent shard: " + range); } for (int i = 1; i < temp.size(); i++) { if (temp.get(i - 
1).max + 1 != temp.get(i).min) { throw new SolrException(ErrorCode.BAD_REQUEST, "Specified hash ranges: " + rangesStr + " either overlap with each other or " + "do not cover the entire range of parent shard: " + range); } } } } else if (splitKey != null) { if (router instanceof CompositeIdRouter) { CompositeIdRouter compositeIdRouter = (CompositeIdRouter) router; subRanges = compositeIdRouter.partitionRangeByKey(splitKey, range); if (subRanges.size() == 1) { throw new SolrException(ErrorCode.BAD_REQUEST, "The split.key: " + splitKey + " has a hash range that is exactly equal to hash range of shard: " + slice); } for (DocRouter.Range subRange : subRanges) { if (subRange.min == subRange.max) { throw new SolrException(ErrorCode.BAD_REQUEST, "The split.key: " + splitKey + " must be a compositeId"); } } log.info("Partitioning parent shard " + slice + " range: " + parentSlice.getRange() + " yields: " + subRanges); rangesStr = ""; for (int i = 0; i < subRanges.size(); i++) { DocRouter.Range subRange = subRanges.get(i); rangesStr += subRange.toString(); if (i < subRanges.size() - 1) rangesStr += ','; } } } else { // todo: fixed to two partitions? subRanges = router.partitionRange(2, range); } try { List<String> subSlices = new ArrayList<>(subRanges.size()); List<String> subShardNames = new ArrayList<>(subRanges.size()); String nodeName = parentShardLeader.getNodeName(); for (int i = 0; i < subRanges.size(); i++) { String subSlice = slice + "_" + i; subSlices.add(subSlice); String subShardName = collectionName + "_" + subSlice + "_replica1"; subShardNames.add(subShardName); Slice oSlice = clusterState.getSlice(collectionName, subSlice); if (oSlice != null) { final Slice.State state = oSlice.getState(); if (state == Slice.State.ACTIVE) { throw new SolrException(ErrorCode.BAD_REQUEST, "Sub-shard: " + subSlice + " exists in active state. 
Aborting split shard."); } else if (state == Slice.State.CONSTRUCTION || state == Slice.State.RECOVERY) { // delete the shards for (String sub : subSlices) { log.info("Sub-shard: {} already exists therefore requesting its deletion", sub); Map<String,Object> propMap = new HashMap<>(); propMap.put(Overseer.QUEUE_OPERATION, "deleteshard"); propMap.put(COLLECTION_PROP, collectionName); propMap.put(SHARD_ID_PROP, sub); ZkNodeProps m = new ZkNodeProps(propMap); try { deleteShard(clusterState, m, new NamedList()); } catch (Exception e) { throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to delete already existing sub shard: " + sub, e); } } } } } // do not abort splitshard if the unloading fails // this can happen because the replicas created previously may be down // the only side effect of this is that the sub shard may end up having more replicas than we want collectShardResponses(results, false, null, shardHandler); final String asyncId = message.getStr(ASYNC); HashMap<String,String> requestMap = new HashMap<>(); for (int i = 0; i < subRanges.size(); i++) { String subSlice = subSlices.get(i); String subShardName = subShardNames.get(i); DocRouter.Range subRange = subRanges.get(i); log.info("Creating slice " + subSlice + " of collection " + collectionName + " on " + nodeName); Map<String,Object> propMap = new HashMap<>(); propMap.put(Overseer.QUEUE_OPERATION, CREATESHARD.toLower()); propMap.put(ZkStateReader.SHARD_ID_PROP, subSlice); propMap.put(ZkStateReader.COLLECTION_PROP, collectionName); propMap.put(ZkStateReader.SHARD_RANGE_PROP, subRange.toString()); propMap.put(ZkStateReader.SHARD_STATE_PROP, Slice.State.CONSTRUCTION.toString()); propMap.put(ZkStateReader.SHARD_PARENT_PROP, parentSlice.getName()); DistributedQueue inQueue = Overseer.getInQueue(zkStateReader.getZkClient()); inQueue.offer(Utils.toJSON(new ZkNodeProps(propMap))); // wait until we are able to see the new shard in cluster state waitForNewShard(collectionName, subSlice); // refresh cluster state 
        clusterState = zkStateReader.getClusterState();

        log.info("Adding replica " + subShardName + " as part of slice " + subSlice + " of collection " + collectionName + " on " + nodeName);
        propMap = new HashMap<>();
        propMap.put(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower());
        propMap.put(COLLECTION_PROP, collectionName);
        propMap.put(SHARD_ID_PROP, subSlice);
        propMap.put("node", nodeName);
        propMap.put(CoreAdminParams.NAME, subShardName);
        // copy over property params:
        for (String key : message.keySet()) {
          if (key.startsWith(COLL_PROP_PREFIX)) {
            propMap.put(key, message.getStr(key));
          }
        }
        // add async param
        if (asyncId != null) {
          propMap.put(ASYNC, asyncId);
        }
        addReplica(clusterState, new ZkNodeProps(propMap), results);
      }

      collectShardResponses(results, true, "SPLITSHARD failed to create subshard leaders", shardHandler);
      completeAsyncRequest(asyncId, requestMap, results);

      for (String subShardName : subShardNames) {
        // wait for parent leader to acknowledge the sub-shard core
        log.info("Asking parent leader to wait for: " + subShardName + " to be alive on: " + nodeName);
        String coreNodeName = waitForCoreNodeName(collectionName, nodeName, subShardName);
        CoreAdminRequest.WaitForState cmd = new CoreAdminRequest.WaitForState();
        cmd.setCoreName(subShardName);
        cmd.setNodeName(nodeName);
        cmd.setCoreNodeName(coreNodeName);
        cmd.setState(Replica.State.ACTIVE);
        cmd.setCheckLive(true);
        cmd.setOnlyIfLeader(true);

        ModifiableSolrParams p = new ModifiableSolrParams(cmd.getParams());
        sendShardRequest(nodeName, p, shardHandler, asyncId, requestMap);
      }

      collectShardResponses(results, true, "SPLITSHARD timed out waiting for subshard leaders to come up", shardHandler);
      completeAsyncRequest(asyncId, requestMap, results);

      log.info("Successfully created all sub-shards for collection " + collectionName + " parent shard: " + slice + " on: " + parentShardLeader);

      log.info("Splitting shard " + parentShardLeader.getName() + " as part of slice " + slice + " of collection " + collectionName + " on " +
          parentShardLeader);

      ModifiableSolrParams params = new ModifiableSolrParams();
      params.set(CoreAdminParams.ACTION, CoreAdminAction.SPLIT.toString());
      params.set(CoreAdminParams.CORE, parentShardLeader.getStr("core"));
      for (int i = 0; i < subShardNames.size(); i++) {
        String subShardName = subShardNames.get(i);
        params.add(CoreAdminParams.TARGET_CORE, subShardName);
      }
      params.set(CoreAdminParams.RANGES, rangesStr);

      sendShardRequest(parentShardLeader.getNodeName(), params, shardHandler, asyncId, requestMap);

      collectShardResponses(results, true, "SPLITSHARD failed to invoke SPLIT core admin command", shardHandler);
      completeAsyncRequest(asyncId, requestMap, results);

      log.info("Index on shard: " + nodeName + " split into two successfully");

      // apply buffered updates on sub-shards
      for (int i = 0; i < subShardNames.size(); i++) {
        String subShardName = subShardNames.get(i);

        log.info("Applying buffered updates on : " + subShardName);

        params = new ModifiableSolrParams();
        params.set(CoreAdminParams.ACTION, CoreAdminAction.REQUESTAPPLYUPDATES.toString());
        params.set(CoreAdminParams.NAME, subShardName);

        sendShardRequest(nodeName, params, shardHandler, asyncId, requestMap);
      }

      collectShardResponses(results, true, "SPLITSHARD failed while asking sub shard leaders to apply buffered updates", shardHandler);
      completeAsyncRequest(asyncId, requestMap, results);

      log.info("Successfully applied buffered updates on : " + subShardNames);

      // Replica creation for the new Slices

      // look at the replication factor and see if it matches reality
      // if it does not, find best nodes to create more cores

      // TODO: Have replication factor decided in some other way instead of numShards for the parent
      int repFactor = clusterState.getSlice(collectionName, slice).getReplicas().size();

      // we need to look at every node and see how many cores it serves
      // add our new cores to existing nodes serving the least number of cores
      // but (for now) require that each core goes on a distinct node.
      // TODO: add smarter options that look at the current number of cores per
      // node?
      // for now we just go random
      Set<String> nodes = clusterState.getLiveNodes();
      List<String> nodeList = new ArrayList<>(nodes.size());
      nodeList.addAll(nodes);

      // TODO: Have maxShardsPerNode param for this operation?

      // Remove the node that hosts the parent shard for replica creation.
      nodeList.remove(nodeName);

      // TODO: change this to handle sharding a slice into > 2 sub-shards.

      List<Map<String, Object>> replicas = new ArrayList<>((repFactor - 1) * 2);
      for (int i = 1; i <= subSlices.size(); i++) {
        Collections.shuffle(nodeList, RANDOM);
        String sliceName = subSlices.get(i - 1);
        for (int j = 2; j <= repFactor; j++) {
          String subShardNodeName = nodeList.get((repFactor * (i - 1) + (j - 2)) % nodeList.size());
          String shardName = collectionName + "_" + sliceName + "_replica" + (j);

          log.info("Creating replica shard " + shardName + " as part of slice " + sliceName + " of collection " + collectionName + " on " + subShardNodeName);

          ZkNodeProps props = new ZkNodeProps(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower(),
              ZkStateReader.COLLECTION_PROP, collectionName,
              ZkStateReader.SHARD_ID_PROP, sliceName,
              ZkStateReader.CORE_NAME_PROP, shardName,
              ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
              ZkStateReader.BASE_URL_PROP, zkStateReader.getBaseUrlForNodeName(subShardNodeName),
              ZkStateReader.NODE_NAME_PROP, subShardNodeName);
          Overseer.getInQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(props));

          HashMap<String,Object> propMap = new HashMap<>();
          propMap.put(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower());
          propMap.put(COLLECTION_PROP, collectionName);
          propMap.put(SHARD_ID_PROP, sliceName);
          propMap.put("node", subShardNodeName);
          propMap.put(CoreAdminParams.NAME, shardName);
          // copy over property params:
          for (String key : message.keySet()) {
            if (key.startsWith(COLL_PROP_PREFIX)) {
              propMap.put(key, message.getStr(key));
            }
          }
          // add async param
          if (asyncId != null) {
            propMap.put(ASYNC, asyncId);
          }
          // special flag param to instruct addReplica not to create the replica in cluster state again
          propMap.put(SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, "true");

          replicas.add(propMap);
        }
      }

      // we must set the slice state into recovery before actually creating the replica cores
      // this ensures that the logic inside Overseer to update sub-shard state to 'active'
      // always gets a chance to execute. See SOLR-7673

      if (repFactor == 1) {
        // switch sub shard states to 'active'
        log.info("Replication factor is 1 so switching shard states");
        DistributedQueue inQueue = Overseer.getInQueue(zkStateReader.getZkClient());
        Map<String,Object> propMap = new HashMap<>();
        propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
        propMap.put(slice, Slice.State.INACTIVE.toString());
        for (String subSlice : subSlices) {
          propMap.put(subSlice, Slice.State.ACTIVE.toString());
        }
        propMap.put(ZkStateReader.COLLECTION_PROP, collectionName);
        ZkNodeProps m = new ZkNodeProps(propMap);
        inQueue.offer(Utils.toJSON(m));
      } else {
        log.info("Requesting shard state be set to 'recovery'");
        DistributedQueue inQueue = Overseer.getInQueue(zkStateReader.getZkClient());
        Map<String,Object> propMap = new HashMap<>();
        propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
        for (String subSlice : subSlices) {
          propMap.put(subSlice, Slice.State.RECOVERY.toString());
        }
        propMap.put(ZkStateReader.COLLECTION_PROP, collectionName);
        ZkNodeProps m = new ZkNodeProps(propMap);
        inQueue.offer(Utils.toJSON(m));
      }

      // now actually create replica cores on sub shard nodes
      for (Map<String, Object> replica : replicas) {
        addReplica(clusterState, new ZkNodeProps(replica), results);
      }

      collectShardResponses(results, true, "SPLITSHARD failed to create subshard replicas", shardHandler);
      completeAsyncRequest(asyncId, requestMap, results);

      log.info("Successfully created all replica shards for all sub-slices " + subSlices);

      commit(results, slice, parentShardLeader);

      return true;
    } catch (SolrException e) {
      throw e;
    } catch (Exception e) {
      log.error("Error executing split operation for collection: " + collectionName + " parent shard: " + slice, e);
      throw new SolrException(ErrorCode.SERVER_ERROR, null, e);
    }
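The node-selection arithmetic in the replica loop above, `(repFactor * (i - 1) + (j - 2)) % nodeList.size()`, is easy to misread, so here is a minimal self-contained sketch of just that index calculation. The class name `SubShardPlacementSketch`, the method `pickNodes`, and the node names are hypothetical and not part of Solr. The sketch also leaves out the `Collections.shuffle(nodeList, RANDOM)` call so that its output is reproducible.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; this class does not exist in Solr.
public class SubShardPlacementSketch {

    /**
     * Returns the nodes chosen for replicas 2..repFactor of one sub-shard.
     *
     * @param nodeList      candidate nodes (in Solr: live nodes minus the parent's node)
     * @param repFactor     replica count of the parent shard
     * @param subShardIndex 1-based index of the sub-shard (the loop variable i above)
     */
    public static List<String> pickNodes(List<String> nodeList, int repFactor, int subShardIndex) {
        List<String> picked = new ArrayList<>();
        // Replica 1 is the sub-shard leader created earlier on the parent's node,
        // so placement starts at replica 2, exactly as in the loop above.
        for (int j = 2; j <= repFactor; j++) {
            int idx = (repFactor * (subShardIndex - 1) + (j - 2)) % nodeList.size();
            picked.add(nodeList.get(idx));
        }
        return picked;
    }

    public static void main(String[] args) {
        // Assumed node names, chosen only for the example.
        List<String> nodes = Arrays.asList("n1", "n2", "n3", "n4");
        int repFactor = 3;
        for (int i = 1; i <= 2; i++) {
            System.out.println("sub-shard " + i + " -> " + pickNodes(nodes, repFactor, i));
        }
        // prints:
        // sub-shard 1 -> [n1, n2]
        // sub-shard 2 -> [n4, n1]
    }
}
```

With four candidate nodes and repFactor = 3, the first sub-shard takes indexes 0 and 1 and the second takes indexes 3 and 0. Because the real code reshuffles nodeList before each sub-shard, the nodes actually chosen vary from run to run, and the modulo means a node can be reused when there are fewer candidates than replicas to place.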
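Summary:
This article traced SolrCloud step by step, from ZooKeeper through to the execution of a concrete command, covering the whole flow end to end. Because of space limits, it did not go into the details of each step; later articles will analyze them one by one.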
References:
[1] http://itindex.net/detail/48735-solrcloud