public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain, boolean retry) throws IOException, ServletException { if (!(request instanceof HttpServletRequest)) return; try { if (cores == null || cores.isShutDown()) { log.error("Error processing the request. CoreContainer is either not initialized or shutting down."); throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Error processing the request. CoreContainer is either not initialized or shutting down."); } AtomicReference wrappedRequest = new AtomicReference<>(); if (!authenticateRequest(request, response, wrappedRequest)) { // the response and status code have already been // sent return; } if (wrappedRequest.get() != null) { request = wrappedRequest.get(); } request = closeShield(request, retry); response = closeShield(response, retry); if (cores.getAuthenticationPlugin() != null) { log.debug("User principal: {}", ((HttpServletRequest) request).getUserPrincipal()); } // No need to even create the HttpSolrCall object if this path is excluded. 
if (excludePatterns != null) { String requestPath = ((HttpServletRequest) request).getServletPath(); String extraPath = ((HttpServletRequest) request).getPathInfo(); if (extraPath != null) { // In embedded mode, servlet path is empty - include all post-context path here for // testing requestPath += extraPath; } for (Pattern p : excludePatterns) { Matcher matcher = p.matcher(requestPath); if (matcher.lookingAt()) { chain.doFilter(request, response); return; } } }// 这块没有太多的逻辑 HttpSolrCall call = getHttpSolrCall((HttpServletRequest) request, (HttpServletResponse) response, retry); // HttpSolrCall总要的关注点。 ExecutorUtil.setServerThreadFlag(Boolean.TRUE); try { Action result = call.call(); switch (result) { case PASSTHROUGH: chain.doFilter(request, response); break; case RETRY: doFilter(request, response, chain, true); break; case FORWARD: request.getRequestDispatcher(call.getPath()).forward(request, response); break; } } finally { call.destroy(); ExecutorUtil.setServerThreadFlag(null); } } finally { consumeInputFully((HttpServletRequest) request); } }
/**
 * Runs the configured search components for a request (excerpt; parts of the body are elided
 * by the walkthrough and marked with "......").
 *
 * <p>Non-distributed requests: optionally arms a query timeout from {@code timeAllowed}, then
 * calls {@code process} on every component (with per-component timers when debugging). If an
 * {@code ExitingReaderException} is raised, a warning is logged, the response document list
 * gets {@code numFound} 0 and the partial-results header is set.
 *
 * <p>Distributed requests: repeats a stage loop until the next stage equals
 * {@code Integer.MAX_VALUE}. Each stage calls {@code distributedProcess} on every component,
 * submits the queued shard requests, waits for shard responses and passes them to
 * {@code handleResponses}, and finally calls {@code finishStage} on every component.
 *
 * @param req the request being handled
 * @param rsp the response being populated
 * @throws Exception declared by the signature; in the visible code a {@code SolrException} is
 *     thrown when a shard response carries an exception and the request is not shard-tolerant
 */
@Override
public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
  // Elision marker kept verbatim from the walkthrough (it reads "jumping straight to the
  // point"); rb, components, timer and shardHandler1 are presumably set up in the omitted part.
  ......直接跳入主题......
  if (!rb.isDistrib) { // check whether this is a distributed request
    // a normal non-distributed request
    long timeAllowed = req.getParams().getLong(CommonParams.TIME_ALLOWED, -1L);
    if (timeAllowed > 0L) {
      SolrQueryTimeoutImpl.set(timeAllowed);
    }
    try {
      // The semantics of debugging vs not debugging are different enough that
      // it makes sense to have two control loops
      if(!rb.isDebug()) {
        // Process
        for( SearchComponent c : components ) {
          c.process(rb); // (walkthrough note) the one to focus on is QueryComponent
        }
      } else {
        // Process
        RTimerTree subt = timer.sub( "process" );
        for( SearchComponent c : components ) {
          rb.setTimer( subt.sub( c.getName() ) );
          c.process(rb);
          rb.getTimer().stop();
        }
        subt.stop();

        // add the timing info
        if (rb.isDebugTimings()) {
          rb.addDebugInfo("timing", timer.asNamedList() );
        }
      }
    } catch (ExitableDirectoryReader.ExitingReaderException ex) {
      log.warn( "Query: " + req.getParamString() + "; " + ex.getMessage());
      SolrDocumentList r = (SolrDocumentList) rb.rsp.getResponse();
      if(r == null) r = new SolrDocumentList();
      r.setNumFound(0);
      rb.rsp.addResponse(r);
      if(rb.isDebug()) {
        NamedList debug = new NamedList();
        debug.add("explain", new NamedList());
        rb.rsp.add("debug", debug);
      }
      rb.rsp.getResponseHeader().add(SolrQueryResponse.RESPONSE_HEADER_PARTIAL_RESULTS_KEY, Boolean.TRUE);
    } finally {
      // Clear the timeout whether or not one was set above.
      SolrQueryTimeoutImpl.reset();
    }
  } else {
    // a distributed request
    if (rb.outgoing == null) {
      rb.outgoing = new LinkedList<>();
    }
    rb.finished = new ArrayList<>();

    int nextStage = 0;
    do {
      rb.stage = nextStage;
      nextStage = ResponseBuilder.STAGE_DONE; // (walkthrough note) i.e. the different stages mentioned earlier

      // call all components
      for( SearchComponent c : components ) {
        // the next stage is the minimum of what all components report
        nextStage = Math.min(nextStage, c.distributedProcess(rb));
      }

      // check the outgoing queue and send requests
      while (rb.outgoing.size() > 0) {

        // submit all current request tasks at once
        while (rb.outgoing.size() > 0) {
          ShardRequest sreq = rb.outgoing.remove(0);
          sreq.actualShards = sreq.shards;
          if (sreq.actualShards==ShardRequest.ALL_SHARDS) {
            sreq.actualShards = rb.shards;
          }
          sreq.responses = new ArrayList<>(sreq.actualShards.length); // presume we'll get a response from each shard we send to

          // TODO: map from shard to address[]
          for (String shard : sreq.actualShards) { // build the per-shard query request
            ModifiableSolrParams params = new ModifiableSolrParams(sreq.params);
            params.remove(ShardParams.SHARDS); // not a top-level request
            params.set(CommonParams.DISTRIB, "false"); // not a top-level request
            params.remove("indent");
            params.remove(CommonParams.HEADER_ECHO_PARAMS);
            params.set(ShardParams.IS_SHARD, true); // a sub (shard) request
            params.set(ShardParams.SHARDS_PURPOSE, sreq.purpose); // (walkthrough note) the purpose differs between the first and the second round of shard requests
            params.set(ShardParams.SHARD_URL, shard); // so the shard knows what was asked
            if (rb.requestInfo != null) {
              // we could try and detect when this is needed, but it could be tricky
              params.set("NOW", Long.toString(rb.requestInfo.getNOW().getTime()));
            }
            String shardQt = params.get(ShardParams.SHARDS_QT);
            if (shardQt != null) {
              params.set(CommonParams.QT, shardQt);
            } else {
              // for distributed queries that don't include shards.qt, use the original path
              // as the default but operators need to update their luceneMatchVersion to enable
              // this behavior since it did not work this way prior to 5.1
              if (req.getCore().getSolrConfig().luceneMatchVersion.onOrAfter(Version.LUCENE_5_1_0)) {
                String reqPath = (String) req.getContext().get(PATH);
                if (!"/select".equals(reqPath)) {
                  params.set(CommonParams.QT, reqPath);
                } // else if path is /select, then the qt gets passed thru if set
              } else {
                // this is the pre-5.1 behavior, which translates to sending the shard request to /select
                params.remove(CommonParams.QT);
              }
            }
            // (walkthrough note) this is where the request is actually sent; per the author it is
            // load-balanced across live nodes without distinguishing leader from replica.
            shardHandler1.submit(sreq, shard, params, rb.preferredHostAddress);
          }
        }

        // now wait for replies, but if anyone puts more requests on
        // the outgoing queue, send them out immediately (by exiting
        // this loop)
        boolean tolerant = rb.req.getParams().getBool(ShardParams.SHARDS_TOLERANT, false);
        // (walkthrough note) block while waiting for shard responses
        while (rb.outgoing.size() == 0) {
          ShardResponse srsp = tolerant ? shardHandler1.takeCompletedIncludingErrors(): shardHandler1.takeCompletedOrError();
          if (srsp == null) break; // no more requests to wait for

          // Was there an exception?
          if (srsp.getException() != null) {
            // If things are not tolerant, abort everything and rethrow
            if(!tolerant) {
              shardHandler1.cancelAll();
              if (srsp.getException() instanceof SolrException) {
                throw (SolrException)srsp.getException();
              } else {
                throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, srsp.getException());
              }
            } else {
              // Tolerant mode: flag the response as partial, once.
              if(rsp.getResponseHeader().get(SolrQueryResponse.RESPONSE_HEADER_PARTIAL_RESULTS_KEY) == null) {
                rsp.getResponseHeader().add(SolrQueryResponse.RESPONSE_HEADER_PARTIAL_RESULTS_KEY, Boolean.TRUE);
              }
            }
          }

          rb.finished.add(srsp.getShardRequest());

          // let the components see the responses to the request
          // (walkthrough note) again the focus is QueryComponent.handleResponses, which per the
          // author is where the mergeIds and returnFields methods come in.
          for(SearchComponent c : components) {
            c.handleResponses(rb, srsp.getShardRequest());
          }
        }
      }

      // (walkthrough note) likewise, focus on QueryComponent
      for(SearchComponent c : components) {
        c.finishStage(rb);
      }

      // we are done when the next stage is MAX_VALUE
    } while (nextStage != Integer.MAX_VALUE);
  }
  // Elision marker kept verbatim from the walkthrough: the rest of the method is omitted.
  ...... ...... ......
}
QueryComponent:处理逻辑主要还是在这个类里。
process 方法:分布式查询和单节点查询的处理有所区别。
单节点的话就只做一次查询,结果排序也在这里面完成;多节点的排序在 mergeIds 中进行。
/**
 * Executes this component's part of a query (excerpt; part of the body is elided by the
 * walkthrough and marked with "......").
 *
 * <p>Visible flow: returns immediately when the component is disabled via the
 * {@code COMPONENT_NAME} parameter; answers term-stats requests from the stats cache; when the
 * {@code ids} parameter is present, builds a {@code DocListAndSet} from those ids and returns;
 * otherwise runs the search, adds the result to the response, logs the hit count, adds the
 * next cursor mark for non-shard requests, and finishes with sort-value handling and prefetch.
 *
 * @param rb the response builder carrying the request, the response and intermediate results
 * @throws IOException declared by the signature
 */
@Override
public void process(ResponseBuilder rb) throws IOException {
  LOG.debug("process: {}", rb.req.getParams());

  SolrQueryRequest req = rb.req;
  SolrParams params = req.getParams();
  if (!params.getBool(COMPONENT_NAME, true)) {
    return;
  }
  SolrIndexSearcher searcher = req.getSearcher();
  StatsCache statsCache = req.getCore().getStatsCache();

  // Defaults to PURPOSE_GET_TOP_IDS when the request carries no purpose parameter.
  int purpose = params.getInt(ShardParams.SHARDS_PURPOSE, ShardRequest.PURPOSE_GET_TOP_IDS);
  if ((purpose & ShardRequest.PURPOSE_GET_TERM_STATS) != 0) {
    statsCache.returnLocalStats(rb, searcher);
    return;
  }
  // check if we need to update the local copy of global dfs
  if ((purpose & ShardRequest.PURPOSE_SET_TERM_STATS) != 0) {
    // retrieve from request and update local cache
    statsCache.receiveGlobalStats(req);
  }

  SolrQueryResponse rsp = rb.rsp;
  IndexSchema schema = searcher.getSchema();

  String ids = params.get(ShardParams.IDS); // (walkthrough note) only the second-round shard query carries the ids parameter; single-node queries and the first round do not
  if (ids != null) { // (walkthrough note) so only the second-round query runs this branch
    SchemaField idField = schema.getUniqueKeyField();
    List idArr = StrUtils.splitSmart(ids, ",", true);
    int[] luceneIds = new int[idArr.size()];
    int docs = 0;
    // NOTE(review): the loop below is visibly garbled in this excerpt (its condition, opening
    // brace and the statement that defines `id` are missing, presumably lost when the text
    // around '<' was stripped). Restore it from the upstream source before compiling.
    for (int i=0; i = 0) luceneIds[docs++] = id;
    }

    DocListAndSet res = new DocListAndSet();
    res.docList = new DocSlice(0, docs, luceneIds, null, docs, 0);
    if (rb.isNeedDocSet()) {
      // TODO: create a cache for this
      List queries = new ArrayList<>();// (walkthrough note) key point: per the author a query really is executed here, but what is returned is still only a DocListAndSet (ids and scores); the cache exists so that writeResponse can later fetch the data from it
      queries.add(rb.getQuery());
      List filters = rb.getFilters();
      if (filters != null) queries.addAll(filters);
      res.docSet = searcher.getDocSet(queries);
    }
    rb.setResults(res);

    ResultContext ctx = new BasicResultContext(rb);
    rsp.addResponse(ctx);
    return; // (walkthrough note) the second-round query of a distributed search returns here
  }

  // Elision marker kept verbatim from the walkthrough; `result` and `cmd` used below are
  // presumably declared in the omitted part.
  ...... ...... ......

  // normal search result
  searcher.search(result, cmd); // (walkthrough note) the query runs here; the Lucene search logic is not expanded further
  rb.setResult(result);

  ResultContext ctx = new BasicResultContext(rb);
  rsp.addResponse(ctx);
  rsp.getToLog().add("hits", rb.getResults().docList.matches());

  // The next cursor mark is only added when this is not a shard sub-request.
  if ( ! rb.req.getParams().getBool(ShardParams.IS_SHARD,false) ) {
    if (null != rb.getNextCursorMark()) {
      rb.rsp.add(CursorMarkParams.CURSOR_MARK_NEXT, rb.getNextCursorMark().getSerializedTotem());
    }
  }

  if(rb.mergeFieldHandler != null) {
    rb.mergeFieldHandler.handleMergeFields(rb, searcher);
  } else {
    doFieldSortValues(rb, searcher); // (walkthrough note) the sorting path for single-node queries
  }

  doPrefetch(rb);
}
protected void mergeIds(ResponseBuilder rb, ShardRequest sreq) { ...... SortSpec ss = rb.getSortSpec(); Sort sort = ss.getSort(); // 获取排序字段 SortField[] sortFields = null; if(sort != null) sortFields = sort.getSort(); else { sortFields = new SortField[]{SortField.FIELD_SCORE}; // 默认要对score进行排序优先级最高 } IndexSchema schema = rb.req.getSchema(); SchemaField uniqueKeyField = schema.getUniqueKeyField(); // id to shard mapping, to eliminate any accidental dups HashMap
解析所有的fields
这个没什么不好理解的,就是构造最终的response。
protected void returnFields(ResponseBuilder rb, ShardRequest sreq) { if ((sreq.purpose & ShardRequest.PURPOSE_GET_FIELDS) != 0) { boolean returnScores = (rb.getFieldFlags() & SolrIndexSearcher.GET_SCORES) != 0; String keyFieldName = rb.req.getSchema().getUniqueKeyField().getName(); boolean removeKeyField = !rb.rsp.getReturnFields().wantsField(keyFieldName); for (ShardResponse srsp : sreq.responses) { if (srsp.getException() != null) { // Don't try to get the documents if there was an exception in the shard if(rb.req.getParams().getBool(ShardParams.SHARDS_INFO, false)) { @SuppressWarnings("unchecked") NamedList shardInfo = (NamedList) rb.rsp.getValues().get(ShardParams.SHARDS_INFO); @SuppressWarnings("unchecked") SimpleOrderedMap nl = (SimpleOrderedMap) shardInfo.get(srsp.getShard()); if (nl.get("error") == null) { // Add the error to the shards info section if it wasn't added before Throwable t = srsp.getException(); if(t instanceof SolrServerException) { t = ((SolrServerException)t).getCause(); } nl.add("error", t.toString() ); StringWriter trace = new StringWriter(); t.printStackTrace(new PrintWriter(trace)); nl.add("trace", trace.toString() ); } } continue; } SolrDocumentList docs = (SolrDocumentList) srsp.getSolrResponse().getResponse().get("response"); for (SolrDocument doc : docs) { Object id = doc.getFieldValue(keyFieldName); ShardDoc sdoc = rb.resultIds.get(id.toString()); if (sdoc != null) { if (returnScores) { doc.setField("score", sdoc.score); } else { // Score might have been added (in createMainQuery) to shard-requests (and therefore in shard-response-docs) // Remove score if the outer request did not ask for it returned doc.remove("score"); } if (removeKeyField) { doc.removeFields(keyFieldName); } rb.getResponseDocs().set(sdoc.positionInResponse, doc); } } } } }