
Elasticsearch Basics: The Query Flow


The ES Query Flow

The query flow can be broken down into the following parts (a toy sketch of the Query/Fetch idea follows the list):

  1. Request parsing
  2. The search Query phase
  3. The search Fetch phase
  4. Sorting and merging the results
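
Before walking through the source, here is a self-contained toy sketch (not Elasticsearch code; every name in it is invented) of the idea behind the Query and Fetch stages: each shard returns only document IDs and scores for its local top hits, the coordinating node merge-sorts them into a global top-N, and only the winning documents are then fetched from the shards that own them.

    import java.util.*;
    import java.util.stream.*;

    // Toy coordinator illustrating the two-phase QUERY_THEN_FETCH idea.
    public class QueryThenFetchSketch {

        static class ShardHit {
            final int shardId, docId; final float score;
            ShardHit(int shardId, int docId, float score) {
                this.shardId = shardId; this.docId = docId; this.score = score;
            }
        }

        public static void main(String[] args) {
            int size = 3; // top-N requested by the client

            // Query phase: each shard reports only (docId, score) for its local top hits.
            List<ShardHit> shardResults = Arrays.asList(
                new ShardHit(0, 11, 2.3f), new ShardHit(0, 7, 1.1f),
                new ShardHit(1, 42, 3.0f), new ShardHit(1, 5, 0.4f));

            // The coordinating node merge-sorts them and keeps the global top-N.
            List<ShardHit> globalTop = shardResults.stream()
                .sorted(Comparator.comparingDouble((ShardHit h) -> h.score).reversed())
                .limit(size)
                .collect(Collectors.toList());

            // Fetch phase: only the winning doc IDs are fetched, grouped by owning shard.
            Map<Integer, List<Integer>> docIdsToLoad = globalTop.stream()
                .collect(Collectors.groupingBy(h -> h.shardId,
                    Collectors.mapping(h -> h.docId, Collectors.toList())));

            System.out.println("doc ids to fetch per shard: " + docIdsToLoad);
        }
    }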

1. Overall Flow

2. Request Parsing

2.1 Parsing the request and building the SearchRequest object

A search request sent by a client is ultimately handled in RestSearchAction#prepareRequest:

    SearchRequest searchRequest = new SearchRequest();

    IntConsumer setSize = size -> searchRequest.source().size(size);
    // Build the SearchRequest from the parameters in the REST request
    request.withContentOrSourceParamParserOrNull(parser ->
        parseSearchRequest(searchRequest, request, parser, setSize));
    // Execute the search request
    return channel -> client.search(searchRequest, new RestStatusToXContentListener<>(channel));
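
For reference, the SearchRequest that prepareRequest assembles from the REST body could equally be built programmatically. A minimal sketch assuming the SearchSourceBuilder/QueryBuilders API of this code base; the index name, field, and value are made up:

    import org.elasticsearch.action.search.SearchRequest;
    import org.elasticsearch.index.query.QueryBuilders;
    import org.elasticsearch.search.builder.SearchSourceBuilder;

    // Roughly what the REST layer produces for
    // GET /logs/_search {"query":{"match":{"message":"error"}},"size":10}
    SearchRequest searchRequest = new SearchRequest("logs");      // hypothetical index
    SearchSourceBuilder source = new SearchSourceBuilder()
        .query(QueryBuilders.matchQuery("message", "error"))      // hypothetical field/value
        .size(10);
    searchRequest.source(source);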

2.2 Invoking search

After several layers of delegation, NodeClient#executeLocally is called:

    public <    Request extends ActionRequest,
                Response extends ActionResponse
            > Task executeLocally(GenericAction<Request, Response> action, Request request, ActionListener<Response> listener) {
        return transportAction(action).execute(request, listener);
    }

TransportAction#execute is then executed:

public final Task execute(Request request, ActionListener<Response> listener) {
        Task task = taskManager.register("transport", actionName, request);
        if (task == null) {
            execute(null, request, listener);
        } else {
            execute(task, request, new ActionListener<Response>() {
                @Override
                public void onResponse(Response response) {
                    taskManager.unregister(task);
                    listener.onResponse(response);
                }

                @Override
                public void onFailure(Exception e) {
                    taskManager.unregister(task);
                    listener.onFailure(e);
                }
            });
        }
        return task;
    }

Finally, TransportSearchAction#doExecute is called.

3. Query Phase

3.1 Gathering and merging local and remote shards

TransportSearchAction#doExecute:

protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
            ...
            if (remoteClusterIndices.isEmpty()) {   // no remote indices
                executeSearch((SearchTask)task, timeProvider, searchRequest, localIndices, remoteClusterIndices, Collections.emptyList(),
                    (clusterName, nodeId) -> {
                        return null;
                    }, clusterState, Collections.emptyMap(), listener, clusterState.getNodes()
                        .getDataNodes().size(), SearchResponse.Clusters.EMPTY);
            } else {    // remote indices are involved
                // collect all remote shards
                remoteClusterService.collectSearchShards(searchRequest.indicesOptions(), searchRequest.preference(),
                    searchRequest.routing(), remoteClusterIndices, ActionListener.wrap((searchShardsResponses) -> {
                        List<SearchShardIterator> remoteShardIterators = new ArrayList<>();
                        Map<String, AliasFilter> remoteAliasFilters = new HashMap<>();
                        BiFunction<String, String, DiscoveryNode> clusterNodeLookup = processRemoteShards(searchShardsResponses,
                            remoteClusterIndices, remoteShardIterators, remoteAliasFilters);
                        int numNodesInvolved = searchShardsResponses.values().stream().mapToInt(r -> r.getNodes().length).sum()
                            + clusterState.getNodes().getDataNodes().size();
                        SearchResponse.Clusters clusters = buildClusters(localIndices, remoteClusterIndices, searchShardsResponses);
                        executeSearch((SearchTask) task, timeProvider, searchRequest, localIndices, remoteClusterIndices,
                            remoteShardIterators, clusterNodeLookup, clusterState, remoteAliasFilters, listener, numNodesInvolved,
                            clusters);
                    }, listener::onFailure));
            }
        }, listener::onFailure);
        if (searchRequest.source() == null) {
            rewriteListener.onResponse(searchRequest.source());
        } else {
            Rewriteable.rewriteAndFetch(searchRequest.source(), searchService.getRewriteContext(timeProvider::getAbsoluteStartMillis),
                rewriteListener);
        }
    }

executeSearch is then executed:

    private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, SearchRequest searchRequest, OriginalIndices localIndices,
                               Map<String, OriginalIndices> remoteClusterIndices, List<SearchShardIterator> remoteShardIterators,
                               BiFunction<String, String, DiscoveryNode> remoteConnections, ClusterState clusterState,
                               Map<String, AliasFilter> remoteAliasMap, ActionListener<SearchResponse> listener, int nodeCount,
                               SearchResponse.Clusters clusters) {

        ...
        // get the local shards
        GroupShardsIterator<ShardIterator> localShardsIterator = clusterService.operationRouting().searchShards(clusterState,
                concreteIndices, routingMap, searchRequest.preference(), searchService.getResponseCollectorService(), nodeSearchCounts);
        // merge the local and remote shards
        GroupShardsIterator<SearchShardIterator> shardIterators = mergeShardsIterators(localShardsIterator, localIndices,
            remoteShardIterators);
        ...
        // whether to run the can-match pre-filter phase (only possible for QUERY_THEN_FETCH)
        boolean preFilterSearchShards = shouldPreFilterSearchShards(searchRequest, shardIterators);
        // execute the search asynchronously
        searchAsyncAction(task, searchRequest, shardIterators, timeProvider, connectionLookup, clusterState.version(),
            Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, routingMap, listener, preFilterSearchShards, clusters).start();
    }

3.2 Iterating over all shards and issuing the requests

The search is started via searchAsyncAction(...).start():

private AbstractSearchAsyncAction searchAsyncAction(SearchTask task, SearchRequest searchRequest,
                                                        GroupShardsIterator<SearchShardIterator> shardIterators,
                                                        SearchTimeProvider timeProvider,
                                                        BiFunction<String, String, Transport.Connection> connectionLookup,
                                                        long clusterStateVersion,
                                                        Map<String, AliasFilter> aliasFilter,
                                                        Map<String, Float> concreteIndexBoosts,
                                                        Map<String, Set<String>> indexRoutings,
                                                        ActionListener<SearchResponse> listener,
                                                        boolean preFilter,
                                                        SearchResponse.Clusters clusters) {
        // search requests run on the dedicated search thread pool
        Executor executor = threadPool.executor(ThreadPool.Names.SEARCH);
        if (preFilter) { // run the can-match pre-filter phase first
            return new CanMatchPreFilterSearchPhase(logger, searchTransportService, connectionLookup,
                aliasFilter, concreteIndexBoosts, indexRoutings, executor, searchRequest, listener, shardIterators,
                timeProvider, clusterStateVersion, task, (iter) -> {
                AbstractSearchAsyncAction action = searchAsyncAction(task, searchRequest, iter, timeProvider, connectionLookup,
                    clusterStateVersion, aliasFilter, concreteIndexBoosts, indexRoutings, listener, false, clusters);
                return new SearchPhase(action.getName()) {
                    @Override
                    public void run() {
                        action.start();
                    }
                };
            }, clusters);
        } else { // dispatch on the search type
            AbstractSearchAsyncAction searchAsyncAction;
            switch (searchRequest.searchType()) {
                case DFS_QUERY_THEN_FETCH:
                    searchAsyncAction = new SearchDfsQueryThenFetchAsyncAction(logger, searchTransportService, connectionLookup,
                        aliasFilter, concreteIndexBoosts, indexRoutings, searchPhaseController, executor, searchRequest, listener,
                        shardIterators, timeProvider, clusterStateVersion, task, clusters);
                    break;
                case QUERY_AND_FETCH:
                case QUERY_THEN_FETCH:
                    searchAsyncAction = new SearchQueryThenFetchAsyncAction(logger, searchTransportService, connectionLookup,
                        aliasFilter, concreteIndexBoosts, indexRoutings, searchPhaseController, executor, searchRequest, listener,
                        shardIterators, timeProvider, clusterStateVersion, task, clusters);
                    break;
                default:
                    throw new IllegalStateException("Unknown search type: [" + searchRequest.searchType() + "]");
            }
            return searchAsyncAction;
        }
    }

Only the QUERY_THEN_FETCH type is analyzed here. Eventually InitialSearchPhase's run method is called, which issues a request for every shard:

@Override
    public final void run() throws IOException {
        ...
        if (shardsIts.size() > 0) {
            // cap on concurrent shard requests
            int maxConcurrentShardRequests = Math.min(this.maxConcurrentShardRequests, shardsIts.size());
            ...
            for (int index = 0; index < maxConcurrentShardRequests; index++) {
                final SearchShardIterator shardRoutings = shardsIts.get(index);
                assert shardRoutings.skip() == false;
                // issue the request to this shard
                performPhaseOnShard(index, shardRoutings, shardRoutings.nextOrNull());
            }
        }
    }

performPhaseOnShard executes the request for a single shard:

private void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final ShardRouting shard) {

        final Thread thread = Thread.currentThread();
        if (shard == null) {
            fork(() -> onShardFailure(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId())));
        } else {
            try {
                executePhaseOnShard(shardIt, shard, new SearchActionListener<FirstResult>(new SearchShardTarget(shard.currentNodeId(),
                    shardIt.shardId(), shardIt.getClusterAlias(), shardIt.getOriginalIndices()), shardIndex) {
                    // shard responded successfully
                    @Override
                    public void innerOnResponse(FirstResult result) {
                        maybeFork(thread, () -> onShardResult(result, shardIt));
                    }
                    // shard request failed
                    @Override
                    public void onFailure(Exception t) {
                        maybeFork(thread, () -> onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, t));
                    }
                });
            } catch (final Exception e) {
                /*
                 * It is possible to run into connection exceptions here because we are getting the connection early and might run in to
                 * nodes that are not connected. In this case, on shard failure will move us to the next shard copy.
                 */
                fork(() -> onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, e));
            }
        }
    }

The request itself is then sent via TransportInterceptor#sendRequest.

3.3 Collecting the results

onShardResult collects each successful result:

    private void onShardResult(FirstResult result, SearchShardIterator shardIt) {
        assert result.getShardIndex() != -1 : "shard index is not set";
        assert result.getSearchShardTarget() != null : "search shard target must not be null";
        // collect the result
        onShardSuccess(result);
        // check whether responses from all shards have arrived
        successfulShardExecution(shardIt);
    }

onShardSuccess accumulates the results:

    @Override
    public final void onShardSuccess(Result result) {
        successfulOps.incrementAndGet();
        results.consumeResult(result);
        if (logger.isTraceEnabled()) {
            logger.trace("got first-phase result from {}", result != null ? result.getSearchShardTarget() : null);
        }

        AtomicArray<ShardSearchFailure> shardFailures = this.shardFailures.get();
        if (shardFailures != null) {
            shardFailures.set(result.getShardIndex(), null);
        }
    }

successfulShardExecution checks whether replies from all shards have been received; once they all have, the next phase begins:

private void successfulShardExecution(SearchShardIterator shardsIt) {
        final int remainingOpsOnIterator;
        if (shardsIt.skip()) {
            remainingOpsOnIterator = shardsIt.remaining();
        } else {
            remainingOpsOnIterator = shardsIt.remaining() + 1;
        }
        final int xTotalOps = totalOps.addAndGet(remainingOpsOnIterator);
        // all expected shard results have been received
        if (xTotalOps == expectedTotalOps) {
            onPhaseDone();
        } else if (xTotalOps > expectedTotalOps) {
            throw new AssertionError("unexpected higher total ops [" + xTotalOps + "] compared to expected ["
                + expectedTotalOps + "]");
        } else if (shardsIt.skip() == false) {
            maybeExecuteNext();
        }
    }

4. Fetch Phase

4.1 Sending the fetch requests

onPhaseDone() calls AbstractSearchAsyncAction#executeNextPhase:

@Override
    public final void executeNextPhase(SearchPhase currentPhase, SearchPhase nextPhase) {
        /* This is the main search phase transition where we move to the next phase. At this point we check if there is
         * at least one successful operation left and if so we move to the next phase. If not we immediately fail the
         * search phase as "all shards failed"*/
        if (successfulOps.get() == 0) { // no shard operation succeeded
            final ShardOperationFailedException[] shardSearchFailures = ExceptionsHelper.groupBy(buildShardFailures());
            Throwable cause = shardSearchFailures.length == 0 ? null :
                ElasticsearchException.guessRootCauses(shardSearchFailures[0].getCause())[0];
            logger.debug(() -> new ParameterizedMessage("All shards failed for phase: [{}]", getName()), cause);
            onPhaseFailure(currentPhase, "all shards failed", cause);
        } else { // if partial results are not allowed, any shard failure fails the whole search
            Boolean allowPartialResults = request.allowPartialSearchResults();
            assert allowPartialResults != null : "SearchRequest missing setting for allowPartialSearchResults";
            if (allowPartialResults == false && shardFailures.get() != null ){
                if (logger.isDebugEnabled()) {
                    final ShardOperationFailedException[] shardSearchFailures = ExceptionsHelper.groupBy(buildShardFailures());
                    Throwable cause = shardSearchFailures.length == 0 ? null :
                        ElasticsearchException.guessRootCauses(shardSearchFailures[0].getCause())[0];
                    logger.debug(() -> new ParameterizedMessage("{} shards failed for phase: [{}]",
                            shardSearchFailures.length, getName()), cause);
                }
                onPhaseFailure(currentPhase, "Partial shards failure", null);
            } else { // otherwise partial results may be returned on timeout or partial failure
                if (logger.isTraceEnabled()) {
                    final String resultsFrom = results.getSuccessfulResults()
                        .map(r -> r.getSearchShardTarget().toString()).collect(Collectors.joining(","));
                    logger.trace("[{}] Moving to next phase: [{}], based on results from: {} (cluster state version: {})",
                        currentPhase.getName(), nextPhase.getName(), resultsFrom, clusterStateVersion);
                }
                // run the next phase
                executePhase(nextPhase);
            }
        }
    }
    private void executePhase(SearchPhase phase) {
        try {
            phase.run();
        } catch (Exception e) {
            if (logger.isDebugEnabled()) {
                logger.debug(new ParameterizedMessage("Failed to execute [{}] while moving to [{}] phase", request, phase.getName()), e);
            }
            onPhaseFailure(phase, "", e);
        }
    }

FetchSearchPhase extends SearchPhase, so FetchSearchPhase#run is ultimately invoked:

    @Override
    public void run() throws IOException {
        context.execute(new ActionRunnable<SearchResponse>(context) {
            @Override
            public void doRun() throws IOException {
                // the actual work is done in innerRun()
                innerRun();
            }

            @Override
            public void onFailure(Exception e) {
                context.onPhaseFailure(FetchSearchPhase.this, "", e);
            }
        });
    }

The innerRun() method:

private void innerRun() throws IOException {
        ...
        // once every fetch has completed, move to the next phase
         final Runnable finishPhase = ()
            -> moveToNextPhase(searchPhaseController, scrollId, reducedQueryPhase, queryAndFetchOptimization ?
            queryResults : fetchResults);
        ...
        if (queryAndFetchOptimization) {
            assert phaseResults.isEmpty() || phaseResults.get(0).fetchResult() != null : "phaseResults empty [" + phaseResults.isEmpty()
                + "], single result: " +  phaseResults.get(0).fetchResult();
            // query AND fetch optimization
            finishPhase.run();
        } else {
            final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(numShards, reducedQueryPhase.scoreDocs);
            if (reducedQueryPhase.scoreDocs.length == 0) { // no docs to fetch -- sidestep everything and return
                phaseResults.stream()
                    .map(SearchPhaseResult::queryResult)
                    .forEach(this::releaseIrrelevantSearchContext); // we have to release contexts here to free up resources
                finishPhase.run();
            } else {
                final ScoreDoc[] lastEmittedDocPerShard = isScrollSearch ?
                    searchPhaseController.getLastEmittedDocPerShard(reducedQueryPhase, numShards)
                    : null;
                // collector that counts down once per shard
                final CountedCollector<FetchSearchResult> counter = new CountedCollector<>(r -> fetchResults.set(r.getShardIndex(), r),
                    docIdsToLoad.length, // we count down every shard in the result no matter if we got any results or not
                    finishPhase, context);
                for (int i = 0; i < docIdsToLoad.length; i++) {
                    IntArrayList entry = docIdsToLoad[i];
                    SearchPhaseResult queryResult = queryResults.get(i);
                    if (entry == null) { // no results for this shard ID
                        if (queryResult != null) {
                            // if we got some hits from this shard we have to release the context there
                            // we do this as we go since it will free up resources and passing on the request on the
                            // transport layer is cheap.
                            releaseIrrelevantSearchContext(queryResult.queryResult());
                        }
                        // in any case we count down this result since we don't talk to this shard anymore
                        counter.countDown();
                    } else {
                        SearchShardTarget searchShardTarget = queryResult.getSearchShardTarget();
                        Transport.Connection connection = context.getConnection(searchShardTarget.getClusterAlias(),
                            searchShardTarget.getNodeId());
                        ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult().getRequestId(), i, entry,
                            lastEmittedDocPerShard, searchShardTarget.getOriginalIndices());
                            
                        // execute the fetch
                        executeFetch(i, searchShardTarget, counter, fetchSearchRequest, queryResult.queryResult(),
                            connection);
                    }
                }
            }
        }
    }

executeFetch handles both successful and failed requests. Its querySearchResult parameter carries the pagination information. A listener is set up so that, whenever a shard's data has been fetched successfully, counter.onResult is invoked: the result callback stores the result in the array and then counts down.

    private void executeFetch(final int shardIndex, final SearchShardTarget shardTarget,
                              final CountedCollector<FetchSearchResult> counter,
                              final ShardFetchSearchRequest fetchSearchRequest, final QuerySearchResult querySearchResult,
                              final Transport.Connection connection) {
        // send the fetch request
        context.getSearchTransport().sendExecuteFetch(connection, fetchSearchRequest, context.getTask(),
            new SearchActionListener<FetchSearchResult>(shardTarget, shardIndex) {
                @Override
                // handle a successful fetch
                public void innerOnResponse(FetchSearchResult result) {
                    counter.onResult(result);
                }

                @Override
                // handle a failed fetch
                public void onFailure(Exception e) {
                    try {
                        logger.debug(() -> new ParameterizedMessage("[{}] Failed to execute fetch phase", fetchSearchRequest.id()), e);
                        counter.onFailure(shardIndex, shardTarget, e);
                    } finally {
                        // the search context might not be cleared on the node where the fetch was executed for example
                        // because the action was rejected by the thread pool. in this case we need to send a dedicated
                        // request to clear the search context.
                        releaseIrrelevantSearchContext(querySearchResult);
                    }
                }
            });
    }

The onResult method:

    void onResult(R result) {
        try {
            resultConsumer.accept(result);
        } finally {
            countDown();
        }
    }
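
The CountedCollector used by innerRun is essentially a thread-safe countdown wrapper around a result array: every shard counts down exactly once, whether it returned data or failed, and the completion callback is finishPhase. A simplified, self-contained sketch of that pattern (not the actual Elasticsearch class):

    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicReferenceArray;

    // Simplified count-down collector: store each shard's result, and once every
    // expected shard has been accounted for (success or failure), run the next phase.
    class SimpleCountedCollector<R> {
        private final AtomicReferenceArray<R> results;
        private final AtomicInteger outstanding;
        private final Runnable onCompletion;

        SimpleCountedCollector(int expectedResults, Runnable onCompletion) {
            this.results = new AtomicReferenceArray<>(expectedResults);
            this.outstanding = new AtomicInteger(expectedResults);
            this.onCompletion = onCompletion;
        }

        void onResult(int shardIndex, R result) {
            try {
                results.set(shardIndex, result);   // consume the result
            } finally {
                countDown();                       // always count down, as in onResult above
            }
        }

        void onFailure(int shardIndex) {
            countDown();                           // failures are counted as well
        }

        private void countDown() {
            if (outstanding.decrementAndGet() == 0) {
                onCompletion.run();                // all shards answered -> finishPhase
            }
        }
    }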

4.2 Collecting the results

Once the data from all shards has been collected, countDown runs finishPhase, which ultimately triggers ExpandSearchPhase:

    FetchSearchPhase(InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> resultConsumer,
                     SearchPhaseController searchPhaseController,
                     SearchPhaseContext context) {
        this(resultConsumer, searchPhaseController, context,
            (response, scrollId) -> new ExpandSearchPhase(context, response, // collapse only happens if the request has inner hits
                (finalResponse) -> sendResponsePhase(finalResponse, scrollId, context)));
    }

4.3 ExpandSearchPhase

After all the data has been fetched, ExpandSearchPhase#run executes:

@Override
    public void run() throws IOException {
        if (isCollapseRequest() && searchResponse.hits().getHits().length > 0) {
            SearchRequest searchRequest = context.getRequest();
            CollapseBuilder collapseBuilder = searchRequest.source().collapse();
            final List<InnerHitBuilder> innerHitBuilders = collapseBuilder.getInnerHits();
            MultiSearchRequest multiRequest = new MultiSearchRequest();
            if (collapseBuilder.getMaxConcurrentGroupRequests() > 0) {
                multiRequest.maxConcurrentSearchRequests(collapseBuilder.getMaxConcurrentGroupRequests());
            }
            for (SearchHit hit : searchResponse.hits().getHits()) {
                BoolQueryBuilder groupQuery = new BoolQueryBuilder();
                Object collapseValue = hit.field(collapseBuilder.getField()).getValue();
                if (collapseValue != null) {
                    groupQuery.filter(QueryBuilders.matchQuery(collapseBuilder.getField(), collapseValue));
                } else {
                    groupQuery.mustNot(QueryBuilders.existsQuery(collapseBuilder.getField()));
                }
                QueryBuilder origQuery = searchRequest.source().query();
                if (origQuery != null) {
                    groupQuery.must(origQuery);
                }
                for (InnerHitBuilder innerHitBuilder : innerHitBuilders) {
                    CollapseBuilder innerCollapseBuilder = innerHitBuilder.getInnerCollapseBuilder();
                    SearchSourceBuilder sourceBuilder = buildExpandSearchSourceBuilder(innerHitBuilder, innerCollapseBuilder)
                        .query(groupQuery)
                        .postFilter(searchRequest.source().postFilter());
                    SearchRequest groupRequest = buildExpandSearchRequest(searchRequest, sourceBuilder);
                    multiRequest.add(groupRequest);
                }
            }
            context.getSearchTransport().sendExecuteMultiSearch(multiRequest, context.getTask(),
                ActionListener.wrap(response -> {
                    Iterator<MultiSearchResponse.Item> it = response.iterator();
                    for (SearchHit hit : searchResponse.hits.getHits()) {
                        for (InnerHitBuilder innerHitBuilder : innerHitBuilders) {
                            MultiSearchResponse.Item item = it.next();
                            if (item.isFailure()) {
                                context.onPhaseFailure(this, "failed to expand hits", item.getFailure());
                                return;
                            }
                            SearchHits innerHits = item.getResponse().getHits();
                            if (hit.getInnerHits() == null) {
                                hit.setInnerHits(new HashMap<>(innerHitBuilders.size()));
                            }
                            hit.getInnerHits().put(innerHitBuilder.getName(), innerHits);
                        }
                    }
                    context.executeNextPhase(this, nextPhaseFactory.apply(searchResponse));
                }, context::onFailure)
            );
        } else {
            context.executeNextPhase(this, nextPhaseFactory.apply(searchResponse));
        }
    }
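
ExpandSearchPhase only has work to do when the request uses field collapsing with inner hits; otherwise it falls through to the else branch and moves straight to the next phase. A hedged example of a request that would exercise it, assuming the CollapseBuilder/InnerHitBuilder API of this code base (index and field names are made up):

    import org.elasticsearch.action.search.SearchRequest;
    import org.elasticsearch.index.query.InnerHitBuilder;
    import org.elasticsearch.index.query.QueryBuilders;
    import org.elasticsearch.search.builder.SearchSourceBuilder;
    import org.elasticsearch.search.collapse.CollapseBuilder;

    // Collapse hits by "user_id" and request the top 3 hits per group as inner hits.
    // Those inner hits are what ExpandSearchPhase resolves with a multi-search request.
    SearchSourceBuilder source = new SearchSourceBuilder()
        .query(QueryBuilders.matchQuery("message", "error"))                    // hypothetical query
        .collapse(new CollapseBuilder("user_id")                                // hypothetical field
            .setInnerHits(new InnerHitBuilder().setName("top_hits").setSize(3))
            .setMaxConcurrentGroupRequests(4));
    SearchRequest request = new SearchRequest("logs").source(source);           // hypothetical index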

4.4 Replying to the client

The response is returned to the client via sendResponsePhase:

    private static SearchPhase sendResponsePhase(InternalSearchResponse response, String scrollId, SearchPhaseContext context) {
        return new SearchPhase("response") {
            @Override
            public void run() throws IOException {
                context.onResponse(context.buildSearchResponse(response, scrollId));
            }
        };
    }

5. How the Data Node Executes the Search

The entry points for both the Query and Fetch phases on the data node are registered in SearchTransportService#registerRequestHandler:

transportService.registerRequestHandler(QUERY_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SAME,
            new TaskAwareTransportRequestHandler<ShardSearchTransportRequest>() {
                @Override
                // request received from the coordinating node
                public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception {
                    // execute the query phase
                    searchService.executeQueryPhase(request, (SearchTask) task, new ActionListener<SearchPhaseResult>() {
                        @Override
                        // send back the successful result
                        public void onResponse(SearchPhaseResult searchPhaseResult) {
                            try {
                                channel.sendResponse(searchPhaseResult);
                            } catch (IOException e) {
                                throw new UncheckedIOException(e);
                            }
                        }

                        @Override
                        // send back the failure
                        public void onFailure(Exception e) {
                            try {
                                channel.sendResponse(e);
                            } catch (IOException e1) {
                                throw new UncheckedIOException(e1);
                            }
                        }
                    });
                }
            });

The query entry point is searchService.executeQueryPhase. It first checks whether a cached result exists; if it does, the cache is used:

SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchTask task) throws IOException {
        final SearchContext context = createAndPutContext(request);
        final SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
        context.incRef();
        boolean queryPhaseSuccess = false;
        try {
            context.setTask(task);
            operationListener.onPreQueryPhase(context);
            long time = System.nanoTime();
            contextProcessing(context);
            // serve from the cache if possible, otherwise execute the query
            loadOrExecuteQueryPhase(request, context);

            if (context.queryResult().hasSearchContext() == false && context.scrollContext() == null) {
                freeContext(context.id());
            } else {
                contextProcessedSuccessfully(context);
            }
            final long afterQueryTime = System.nanoTime();
            queryPhaseSuccess = true;
            operationListener.onQueryPhase(context, afterQueryTime - time);
            if (request.numberOfShards() == 1) {
                return executeFetchPhase(context, operationListener, afterQueryTime);
            }
            return context.queryResult();
        } catch (Exception e) {
            // execution exception can happen while loading the cache, strip it
            if (e instanceof ExecutionException) {
                e = (e.getCause() == null || e.getCause() instanceof Exception) ?
                    (Exception) e.getCause() : new ElasticsearchException(e.getCause());
            }
            if (!queryPhaseSuccess) {
                operationListener.onFailedQueryPhase(context);
            }
            logger.trace("Query phase failed", e);
            processFailure(context, e);
            throw ExceptionsHelper.convertToRuntime(e);
        } finally {
            cleanContext(context);
        }
    }

loadOrExecuteQueryPhase loads the data from the cache when it can. Whether the cache may be used depends on index.requests.cache.enable being true (it is by default). This cache is shared by all shards on the node; when it is full, the least recently used entries are evicted. It does not cache full search results.

    /**
     * Try to load the query results from the cache or execute the query phase directly if the cache cannot be used.
     */
    private void loadOrExecuteQueryPhase(final ShardSearchRequest request, final SearchContext context) throws Exception {
        final boolean canCache = indicesService.canCache(request, context);
        context.getQueryShardContext().freezeContext();
        if (canCache) {
            // load from the request cache (executing the query on a miss)
            indicesService.loadIntoContext(request, context, queryPhase);
        } else {
            // query Lucene directly
            queryPhase.execute(context);
        }
    }
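
From the user's side, the decision made by canCache is driven by the index setting above plus a per-request flag, and only size=0 requests (hit counts, aggregations, suggestions) are cached, which matches the note that full search results are never cached. A minimal sketch assuming the Java API of this code base; the index name is made up:

    import org.elasticsearch.action.search.SearchRequest;
    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.search.builder.SearchSourceBuilder;

    // Per-index switch consulted by canCache (true by default); it would be
    // applied through an update-index-settings request.
    Settings cacheSetting = Settings.builder()
        .put("index.requests.cache.enable", true)
        .build();

    // Per-request override of the shard request cache for a size=0 request.
    SearchRequest request = new SearchRequest("logs")            // hypothetical index
        .source(new SearchSourceBuilder().size(0))
        .requestCache(true);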

The queryPhase.execute method:

    @Override
    public void execute(SearchContext searchContext) throws QueryPhaseExecutionException {
        if (searchContext.hasOnlySuggest()) {
            suggestPhase.execute(searchContext);
            // TODO: fix this once we can fetch docs for suggestions
            searchContext.queryResult().topDocs(
                    new TopDocs(0, Lucene.EMPTY_SCORE_DOCS, 0),
                    new DocValueFormat[0]);
            return;
        }
        // Pre-process aggregations as late as possible. In the case of a DFS_Q_T_F
        // request, preProcess is called on the DFS phase phase, this is why we pre-process them
        // here to make sure it happens during the QUERY phase
        aggregationPhase.preProcess(searchContext);
        final ContextIndexSearcher searcher = searchContext.searcher();
        boolean rescore = execute(searchContext, searchContext.searcher(), searcher::setCheckCancelled);

        if (rescore) { // full-text search that needs rescoring
            rescorePhase.execute(searchContext);
        }
        // suggestions (completion and spell correction)
        suggestPhase.execute(searchContext);
        // aggregations
        aggregationPhase.execute(searchContext);

        if (searchContext.getProfilers() != null) {
            ProfileShardResult shardResults = SearchProfileShardResults
                    .buildShardResults(searchContext.getProfilers());
            searchContext.queryResult().profileResults(shardResults);
        }
    }
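
rescorePhase, suggestPhase, and aggregationPhase only do real work when the request asks for them. A hedged example of a request that exercises all three, assuming the builder API of this code base (index and field names are made up):

    import org.elasticsearch.action.search.SearchRequest;
    import org.elasticsearch.index.query.QueryBuilders;
    import org.elasticsearch.search.aggregations.AggregationBuilders;
    import org.elasticsearch.search.builder.SearchSourceBuilder;
    import org.elasticsearch.search.rescore.QueryRescorerBuilder;
    import org.elasticsearch.search.suggest.SuggestBuilder;
    import org.elasticsearch.search.suggest.SuggestBuilders;

    SearchSourceBuilder source = new SearchSourceBuilder()
        .query(QueryBuilders.matchQuery("title", "elasticsearch query"))         // scored by the query phase
        .addRescorer(new QueryRescorerBuilder(                                    // handled by rescorePhase
            QueryBuilders.matchPhraseQuery("title", "elasticsearch query")).windowSize(50))
        .suggest(new SuggestBuilder().addSuggestion("spelling",                   // handled by suggestPhase
            SuggestBuilders.termSuggestion("title").text("elasticsarch")))
        .aggregation(AggregationBuilders.terms("by_tag").field("tag"));           // handled by aggregationPhase
    SearchRequest request = new SearchRequest("articles").source(source);         // hypothetical index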

The time reported in the search slow log for queries is the processing time of this phase.
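
To surface slow executions of this phase, the per-index search slow log thresholds can be lowered. A minimal sketch of the relevant setting keys (they would be applied with an update-index-settings request); the threshold values are only examples:

    import org.elasticsearch.common.settings.Settings;

    // Search slow log thresholds: the "query" keys time exactly the phase above,
    // the "fetch" keys time the fetch phase.
    Settings slowLogSettings = Settings.builder()
        .put("index.search.slowlog.threshold.query.warn", "10s")
        .put("index.search.slowlog.threshold.query.info", "5s")
        .put("index.search.slowlog.threshold.fetch.warn", "1s")
        .build();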
