
Elasticsearch Basics: The Query Flow

逍遥白亦 • 4 years ago

ES Query Flow

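At a high level, the query flow breaks down into the following parts:

  1. Request parsing
  2. The search Query phase
  3. The search Fetch phase
  4. Sorting and merging the data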

1. Overall Flow

2. Request Parsing

2.1 Parse the REST request and build a SearchRequest

A search request sent by a client is ultimately handled in RestSearchAction#prepareRequest:

    SearchRequest searchRequest = new SearchRequest();

    IntConsumer setSize = size -> searchRequest.source().size(size);
    // build the SearchRequest from the parameters of the REST request
    request.withContentOrSourceParamParserOrNull(parser ->
        parseSearchRequest(searchRequest, request, parser, setSize));
    // execute the search request
    return channel -> client.search(searchRequest, new RestStatusToXContentListener<>(channel));
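The `IntConsumer setSize` callback lets the parsing code set `size` without knowing where the request keeps it. Below is a minimal sketch of that callback pattern. It assumes a hypothetical `SimpleSearchSource` class and a flat map of URL parameters. The real `parseSearchRequest` handles many more parameters, as well as the request body.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.IntConsumer;

// Hypothetical stand-in for SearchSourceBuilder: only the fields this sketch needs.
final class SimpleSearchSource {
    int from = 0;
    int size = 10;
    String query = null;
}

final class RestParamParser {
    // Copies URL parameters into the source; "size" goes through the callback,
    // mirroring how prepareRequest hands parseSearchRequest an IntConsumer.
    static void parse(SimpleSearchSource source, Map<String, String> params, IntConsumer setSize) {
        if (params.containsKey("q")) {
            source.query = params.get("q");
        }
        if (params.containsKey("from")) {
            source.from = Integer.parseInt(params.get("from"));
        }
        if (params.containsKey("size")) {
            setSize.accept(Integer.parseInt(params.get("size")));
        }
    }

    static SimpleSearchSource build(Map<String, String> params) {
        SimpleSearchSource source = new SimpleSearchSource();
        // The callback decides where "size" ends up; the parser does not need to know.
        IntConsumer setSize = size -> source.size = size;
        parse(source, params, setSize);
        return source;
    }

    public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        params.put("q", "title:elasticsearch");
        params.put("size", "5");
        SimpleSearchSource s = build(params);
        System.out.println(s.query + " from=" + s.from + " size=" + s.size);
    }
}
```

With `q=title:elasticsearch` and `size=5`, the sketch prints `title:elasticsearch from=0 size=5`.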

2.2 Invoke search

After passing through several layers of wrappers, the call reaches NodeClient#executeLocally:

    public <    Request extends ActionRequest,
                Response extends ActionResponse
            > Task executeLocally(GenericAction<Request, Response> action, Request request, ActionListener<Response> listener) {
        return transportAction(action).execute(request, listener);
    }

TransportAction#execute then runs:

    public final Task execute(Request request, ActionListener<Response> listener) {
        Task task = taskManager.register("transport", actionName, request);
        if (task == null) {
            execute(null, request, listener);
        } else {
            execute(task, request, new ActionListener<Response>() {
                @Override
                public void onResponse(Response response) {
                    taskManager.unregister(task);
                    listener.onResponse(response);
                }

                @Override
                public void onFailure(Exception e) {
                    taskManager.unregister(task);
                    listener.onFailure(e);
                }
            });
        }
        return task;
    }
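`TransportAction#execute` decorates the caller's listener. The task is unregistered on both success and failure before the result is passed on. Below is the same decorator pattern in isolation, as a sketch. `Listener` and `TaskRegistry` are hypothetical minimal stand-ins for Elasticsearch's `ActionListener` and `TaskManager`.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

// Hypothetical minimal listener interface (stands in for ActionListener).
interface Listener<R> {
    void onResponse(R response);
    void onFailure(Exception e);
}

// Hypothetical task registry (stands in for TaskManager).
final class TaskRegistry {
    private final AtomicLong ids = new AtomicLong();
    final Set<Long> running = ConcurrentHashMap.newKeySet();

    long register() {
        long id = ids.incrementAndGet();
        running.add(id);
        return id;
    }

    void unregister(long id) {
        running.remove(id);
    }

    // Runs the action and guarantees the task is unregistered before the caller is notified.
    <Q, R> long execute(Q request, BiConsumer<Q, Listener<R>> action, Listener<R> listener) {
        long taskId = register();
        action.accept(request, new Listener<R>() {
            @Override
            public void onResponse(R response) {
                unregister(taskId);
                listener.onResponse(response);
            }

            @Override
            public void onFailure(Exception e) {
                unregister(taskId);
                listener.onFailure(e);
            }
        });
        return taskId;
    }
}
```

The wrapper does the unregistering, so the action itself never has to handle task bookkeeping.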

Finally, TransportSearchAction#doExecute is called.

3. Query Phase

3.1 Collect local and remote shards and merge them

TransportSearchAction#doExecute:

    protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
            ...
            if (remoteClusterIndices.isEmpty()) {   // no remote indices: search the local cluster only
                executeSearch((SearchTask)task, timeProvider, searchRequest, localIndices, remoteClusterIndices, Collections.emptyList(),
                    (clusterName, nodeId) -> {
                        return null;
                    }, clusterState, Collections.emptyMap(), listener, clusterState.getNodes()
                        .getDataNodes().size(), SearchResponse.Clusters.EMPTY);
            } else {    // remote indices present
                // collect the shards of all remote clusters
                remoteClusterService.collectSearchShards(searchRequest.indicesOptions(), searchRequest.preference(),
                    searchRequest.routing(), remoteClusterIndices, ActionListener.wrap((searchShardsResponses) -> {
                        List<SearchShardIterator> remoteShardIterators = new ArrayList<>();
                        Map<String, AliasFilter> remoteAliasFilters = new HashMap<>();
                        BiFunction<String, String, DiscoveryNode> clusterNodeLookup = processRemoteShards(searchShardsResponses,
                            remoteClusterIndices, remoteShardIterators, remoteAliasFilters);
                        int numNodesInvolved = searchShardsResponses.values().stream().mapToInt(r -> r.getNodes().length).sum()
                            + clusterState.getNodes().getDataNodes().size();
                        SearchResponse.Clusters clusters = buildClusters(localIndices, remoteClusterIndices, searchShardsResponses);
                        executeSearch((SearchTask) task, timeProvider, searchRequest, localIndices, remoteClusterIndices,
                            remoteShardIterators, clusterNodeLookup, clusterState, remoteAliasFilters, listener, numNodesInvolved,
                            clusters);
                    }, listener::onFailure));
            }
        }, listener::onFailure);
        if (searchRequest.source() == null) {
            rewriteListener.onResponse(searchRequest.source());
        } else {
            Rewriteable.rewriteAndFetch(searchRequest.source(), searchService.getRewriteContext(timeProvider::getAbsoluteStartMillis),
                rewriteListener);
        }
    }
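doExecute branches on whether any index expression names a remote cluster. In cross-cluster search, remote indices are written as `cluster_alias:index_name`. Below is a simplified sketch of that grouping, with these assumptions:

- The empty string is the key for the local cluster.
- A prefix counts as a cluster alias only if it names a configured remote cluster.

The real grouping happens in the elided part of doExecute and does more validation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class IndexGrouper {
    // Key used for indices of the local cluster (an assumption of this sketch).
    static final String LOCAL = "";

    static Map<String, List<String>> group(List<String> indices, Set<String> remoteAliases) {
        Map<String, List<String>> grouped = new LinkedHashMap<>();
        for (String expr : indices) {
            int sep = expr.indexOf(':');
            String cluster = LOCAL;
            String index = expr;
            // Only treat the prefix as a cluster alias if that remote cluster is configured.
            if (sep > 0 && remoteAliases.contains(expr.substring(0, sep))) {
                cluster = expr.substring(0, sep);
                index = expr.substring(sep + 1);
            }
            grouped.computeIfAbsent(cluster, k -> new ArrayList<>()).add(index);
        }
        return grouped;
    }

    public static void main(String[] args) {
        Map<String, List<String>> g = group(List.of("logs", "eu:logs", "eu:metrics"), Set.of("eu"));
        // "" holds local indices; any other key is a remote cluster that needs collectSearchShards.
        System.out.println(g);
    }
}
```

In this sketch, `remoteClusterIndices` corresponds to every key other than `""`. If there are none, doExecute takes the first branch and never contacts a remote cluster.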

executeSearch then runs:

    private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, SearchRequest searchRequest, OriginalIndices localIndices,
                               Map<String, OriginalIndices> remoteClusterIndices, List<SearchShardIterator> remoteShardIterators,
                               BiFunction<String, String, DiscoveryNode> remoteConnections, ClusterState clusterState,
                               Map<String, AliasFilter> remoteAliasMap, ActionListener<SearchResponse> listener, int nodeCount,
                               SearchResponse.Clusters clusters) {

        ...
        // resolve the local shards to search
        GroupShardsIterator<ShardIterator> localShardsIterator = clusterService.operationRouting().searchShards(clusterState,
                concreteIndices, routingMap, searchRequest.preference(), searchService.getResponseCollectorService(), nodeSearchCounts);
        // merge local and remote shards
        GroupShardsIterator<SearchShardIterator> shardIterators = mergeShardsIterators(localShardsIterator, localIndices,
            remoteShardIterators);
        ...
        // decide whether to run the can_match pre-filter phase first (only possible for QUERY_THEN_FETCH)
        boolean preFilterSearchShards = shouldPreFilterSearchShards(searchRequest, shardIterators);
        // run the search asynchronously
        searchAsyncAction(task, searchRequest, shardIterators, timeProvider, connectionLookup, clusterState.version(),
            Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, routingMap, listener, preFilterSearchShards, clusters).start();
    }
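In the Elasticsearch 6.x sources, `shouldPreFilterSearchShards` appears to require more than QUERY_THEN_FETCH. The query must also be rewritable to match no documents on some shards (for example, a range query), and the request must target more shards than `pre_filter_shard_size`. Below is a simplified sketch of that decision. The boolean `canRewriteToMatchNone` stands in for the real check on the search source, and that is an assumption of this sketch.

```java
final class PreFilterDecision {
    enum SearchType { QUERY_THEN_FETCH, DFS_QUERY_THEN_FETCH }

    // Simplified pre-filter check: all three conditions must hold.
    static boolean shouldPreFilter(SearchType type, boolean canRewriteToMatchNone,
                                   int preFilterShardSize, int shardCount) {
        return type == SearchType.QUERY_THEN_FETCH    // DFS must always fan out to every shard
            && canRewriteToMatchNone                   // e.g. a range query that may exclude whole shards
            && preFilterShardSize < shardCount;        // only worth a round trip when many shards are hit
    }
}
```

With 200 target shards and a threshold of 128, a QUERY_THEN_FETCH range query would go through the can_match round first. With 10 shards, it would skip that round.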

3.2 Iterate over all shards and send the requests

The search is started by searchAsyncAction(...).start():

    private AbstractSearchAsyncAction searchAsyncAction(SearchTask task, SearchRequest searchRequest,
                                                        GroupShardsIterator<SearchShardIterator> shardIterators,
                                                        SearchTimeProvider timeProvider,
                                                        BiFunction<String, String, Transport.Connection> connectionLookup,
                                                        long clusterStateVersion,
                                                        Map<String, AliasFilter> aliasFilter,
                                                        Map<String, Float> concreteIndexBoosts,
                                                        Map<String, Set<String>> indexRoutings,
                                                        ActionListener<SearchResponse> listener,
                                                        boolean preFilter,
                                                        SearchResponse.Clusters clusters) {
        // search requests run on the SEARCH thread pool
        Executor executor = threadPool.executor(ThreadPool.Names.SEARCH);
        if (preFilter) { // run the can_match pre-filter phase first (QUERY_THEN_FETCH only)
            return new CanMatchPreFilterSearchPhase(logger, searchTransportService, connectionLookup,
                aliasFilter, concreteIndexBoosts, indexRoutings, executor, searchRequest, listener, shardIterators,
                timeProvider, clusterStateVersion, task, (iter) -> {
                AbstractSearchAsyncAction action = searchAsyncAction(task, searchRequest, iter, timeProvider, connectionLookup,
                    clusterStateVersion, aliasFilter, concreteIndexBoosts, indexRoutings, listener, false, clusters);
                return new SearchPhase(action.getName()) {
                    @Override
                    public void run() {
                        action.start();
                    }
                };
            }, clusters);
        } else { // dispatch on the search type
            AbstractSearchAsyncAction searchAsyncAction;
            switch (searchRequest.searchType()) {
                case DFS_QUERY_THEN_FETCH:
                    searchAsyncAction = new SearchDfsQueryThenFetchAsyncAction(logger, searchTransportService, connectionLookup,
                        aliasFilter, concreteIndexBoosts, indexRoutings, searchPhaseController, executor, searchRequest, listener,
                        shardIterators, timeProvider, clusterStateVersion, task, clusters);
                    break;
                case QUERY_AND_FETCH:
                case QUERY_THEN_FETCH:
                    searchAsyncAction = new SearchQueryThenFetchAsyncAction(logger, searchTransportService, connectionLookup,
                        aliasFilter, concreteIndexBoosts, indexRoutings, searchPhaseController, executor, searchRequest, listener,
                        shardIterators, timeProvider, clusterStateVersion, task, clusters);
                    break;
                default:
                    throw new IllegalStateException("Unknown search type: [" + searchRequest.searchType() + "]");
            }
            return searchAsyncAction;
        }
    }
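The branch above has two steps. An optional can_match round narrows the shard list, and then an async action is chosen by search type. Below is a sketch of both steps, with plain strings standing in for the action classes. The shard names and the `shardCanMatch` predicate are illustrative assumptions. The sketch drops non-matching shards. The real pre-filter phase instead marks them as skipped, so they still appear in the `_shards.skipped` count of the response.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

final class SearchDispatch {
    enum SearchType { DFS_QUERY_THEN_FETCH, QUERY_THEN_FETCH, QUERY_AND_FETCH }

    // Picks the async action the same way the switch above does.
    static String actionFor(SearchType type) {
        switch (type) {
            case DFS_QUERY_THEN_FETCH:
                return "SearchDfsQueryThenFetchAsyncAction";
            case QUERY_AND_FETCH:
            case QUERY_THEN_FETCH:
                return "SearchQueryThenFetchAsyncAction";
            default:
                throw new IllegalStateException("Unknown search type: [" + type + "]");
        }
    }

    // Models the pre-filter round: shards that cannot match are left out of the real phase.
    static List<String> canMatch(List<String> shards, Predicate<String> shardCanMatch) {
        return shards.stream().filter(shardCanMatch).collect(Collectors.toList());
    }
}
```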

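Only the QUERY_THEN_FETCH search type is analyzed here. It eventually calls InitialSearchPhase#run, which sends the per-shard requests. At most maxConcurrentShardRequests requests are sent up front. The remaining shards are requested as earlier ones complete, through maybeExecuteNext in successfulShardExecution below:

    @Override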
    public final void run() throws IOException {
        ...
        if (shardsIts.size() > 0) {
            // number of shard requests to start right away (capped by maxConcurrentShardRequests)
            int maxConcurrentShardRequests = Math.min(this.maxConcurrentShardRequests, shardsIts.size());
            ...
            for (int index = 0; index < maxConcurrentShardRequests; index++) {
                final SearchShardIterator shardRoutings = shardsIts.get(index);
                assert shardRoutings.skip() == false;
                // send the request for this shard
                performPhaseOnShard(index, shardRoutings, shardRoutings.nextOrNull());
            }
        }
    }
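InitialSearchPhase#run does not send all shard requests at once. Below is a single-threaded simulation of that throttling. It assumes shards are plain integers and that completions are triggered by hand. In the real code, the limit comes from the `max_concurrent_shard_requests` request parameter, and responses arrive asynchronously.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Simulates the throttling: at most maxConcurrent shard requests in flight.
final class ThrottledFanOut {
    private final Deque<Integer> pending = new ArrayDeque<>();
    final List<Integer> inFlight = new ArrayList<>();
    int maxObservedInFlight = 0;
    int completed = 0;

    ThrottledFanOut(int shardCount) {
        for (int i = 0; i < shardCount; i++) {
            pending.add(i);
        }
    }

    // Like run(): send the first min(maxConcurrent, shardCount) requests.
    void start(int maxConcurrent) {
        int initial = Math.min(maxConcurrent, pending.size());
        for (int i = 0; i < initial; i++) {
            send(pending.poll());
        }
    }

    // Like maybeExecuteNext(): every completed shard frees a slot for the next pending one.
    void complete(int shard) {
        inFlight.remove(Integer.valueOf(shard));
        completed++;
        Integer next = pending.poll();
        if (next != null) {
            send(next);
        }
    }

    private void send(int shard) {
        inFlight.add(shard);
        maxObservedInFlight = Math.max(maxObservedInFlight, inFlight.size());
    }
}
```

With 5 shards and a limit of 2, the number of in-flight requests never exceeds 2, and all 5 shards are eventually queried.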

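performPhaseOnShard sends the request for a single shard. Two cases are handled by onShardFailure: no copy of the shard is available, or sending fails. onShardFailure moves on to the next copy of the shard if one is left:

    private void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final ShardRouting shard) {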

        final Thread thread = Thread.currentThread();
        if (shard == null) {
            fork(() -> onShardFailure(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId())));
        } else {
            try {
                executePhaseOnShard(shardIt, shard, new SearchActionListener<FirstResult>(new SearchShardTarget(shard.currentNodeId(),
                    shardIt.shardId(), shardIt.getClusterAlias(), shardIt.getOriginalIndices()), shardIndex) {
                    // the shard responded successfully
                    @Override
                    public void innerOnResponse(FirstResult result) {
                        maybeFork(thread, () -> onShardResult(result, shardIt));
                    }
                    // the shard responded with a failure
                    @Override
                    public void onFailure(Exception t) {
                        maybeFork(thread, () -> onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, t));
                    }
                });
            } catch (final Exception e) {
                /*
                 * It is possible to run into connection exceptions here because we are getting the connection early and might run in to
                 * nodes that are not connected. In this case, on shard failure will move us to the next shard copy.
                 */
                fork(() -> onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, e));
            }
        }
    }
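When a shard copy fails, the search does not fail immediately. The iterator holds every copy of the shard, and the next one is tried. Below is a sketch of that failover, with node names as strings and a hypothetical set of unavailable nodes.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Set;

// Models the retry behaviour: on failure, move to the next copy of the same shard.
final class CopyFailover {
    // Returns the node that served the shard, or null if every copy failed.
    static String query(List<String> copyNodes, Set<String> downNodes) {
        Iterator<String> copies = copyNodes.iterator();
        while (copies.hasNext()) {
            String node = copies.next();   // like shardIt.nextOrNull()
            if (!downNodes.contains(node)) {
                return node;               // innerOnResponse -> onShardResult
            }
            // onFailure -> onShardFailure, which tries the next copy if one is left
        }
        return null;                       // no copy left: the shard is reported as failed
    }
}
```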

The request itself is eventually sent through TransportInterceptor#sendRequest.

3.3 Collect results

Successful responses are handled by onShardResult:

    private void onShardResult(FirstResult result, SearchShardIterator shardIt) {
        assert result.getShardIndex() != -1 : "shard index is not set";
        assert result.getSearchShardTarget() != null : "search shard target must not be null";
        // record this shard's result
        onShardSuccess(result);
        // check whether every shard has been accounted for
        successfulShardExecution(shardIt);
    }

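onShardSuccess counts the successful operation and passes the result to the result consumer. It also clears any failure previously recorded for the same shard index, for example from a copy that failed earlier: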

    @Override
    public final void onShardSuccess(Result result) {
        successfulOps.incrementAndGet();
        results.consumeResult(result);
        if (logger.isTraceEnabled()) {
            logger.trace("got first-phase result from {}", result != null ? result.getSearchShardTarget() : null);
        }

        AtomicArray<ShardSearchFailure> shardFailures = this.shardFailures.get();
        if (shardFailures != null) {
            shardFailures.set(result.getShardIndex(), null);
        }
    }

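successfulShardExecution checks whether every shard has been accounted for. A successful shard also accounts for its copies that will no longer be tried (remaining() + 1). Once totalOps reaches expectedTotalOps, it moves on to the next phase. Otherwise, it may start the next pending shard request:

    private void successfulShardExecution(SearchShardIterator shardsIt) {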
        final int remainingOpsOnIterator;
        if (shardsIt.skip()) {
            remainingOpsOnIterator = shardsIt.remaining();
        } else {
            remainingOpsOnIterator = shardsIt.remaining() + 1;
        }
        final int xTotalOps = totalOps.addAndGet(remainingOpsOnIterator);
        // every shard has been accounted for
        if (xTotalOps == expectedTotalOps) {
            onPhaseDone();
        } else if (xTotalOps > expectedTotalOps) {
            throw new AssertionError("unexpected higher total ops [" + xTotalOps + "] compared to expected ["
                + expectedTotalOps + "]");
        } else if (shardsIt.skip() == false) {
            maybeExecuteNext();
        }
    }
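The remaining() + 1 arithmetic guarantees that each shard contributes exactly as many ops as it has copies, whatever happens. Below is a sketch of the bookkeeping. It assumes every shard has at least one copy. It also assumes the failure path (onShardFailure, not shown above) adds one op for each failed copy.

```java
// Accounting sketch for successfulShardExecution: every shard copy is counted exactly once.
final class OpsAccounting {
    // expectedTotalOps: one op per shard copy (assumes every shard has at least one copy).
    static int expectedTotalOps(int[] copies) {
        int sum = 0;
        for (int c : copies) {
            sum += c;
        }
        return sum;
    }

    // failuresBeforeSuccess[i] = copies of shard i that fail before one succeeds
    // (equal to copies[i] when every copy fails).
    static int totalOps(int[] copies, int[] failuresBeforeSuccess) {
        int total = 0;
        for (int i = 0; i < copies.length; i++) {
            int failures = failuresBeforeSuccess[i];
            total += failures;                               // one op per failed copy
            if (failures < copies[i]) {
                int remaining = copies[i] - failures - 1;    // shardsIt.remaining(): copies never tried
                total += remaining + 1;                      // the success covers itself plus untried copies
            }
        }
        return total;
    }
}
```

For copies {2, 3, 1}, expectedTotalOps is 6 regardless of which copies fail, so onPhaseDone fires exactly once.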

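4. Fetch Phase

4.1 Send fetch requests

onPhaseDone() calls AbstractSearchAsyncAction#executeNextPhase. There are three outcomes:

- If no shard succeeded, the whole search fails with "all shards failed".
- If some shards failed and allow_partial_search_results is false, it fails with "Partial shards failure".
- Otherwise, it runs the next phase.

    @Override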
    public final void executeNextPhase(SearchPhase currentPhase, SearchPhase nextPhase) {
        /* This is the main search phase transition where we move to the next phase. At this point we check if there is
         * at least one successful operation left and if so we move to the next phase. If not we immediately fail the
         * search phase as "all shards failed"*/
        if (successfulOps.get() == 0) { // no shard succeeded
            final ShardOperationFailedException[] shardSearchFailures = ExceptionsHelper.groupBy(buildShardFailures());
            Throwable cause = shardSearchFailures.length == 0 ? null :
                ElasticsearchException.guessRootCauses(shardSearchFailures[0].getCause())[0];
            logger.debug(() -> new ParameterizedMessage("All shards failed for phase: [{}]", getName()), cause);
            onPhaseFailure(currentPhase, "all shards failed", cause);
        } else { // if allow_partial_search_results is false, any shard failure fails the whole search
            Boolean allowPartialResults = request.allowPartialSearchResults();
            assert allowPartialResults != null : "SearchRequest missing setting for allowPartialSearchResults";
            if (allowPartialResults == false && shardFailures.get() != null ){
                if (logger.isDebugEnabled()) {
                    final ShardOperationFailedException[] shardSearchFailures = ExceptionsHelper.groupBy(buildShardFailures());
                    Throwable cause = shardSearchFailures.length == 0 ? null :
                        ElasticsearchException.guessRootCauses(shardSearchFailures[0].getCause())[0];
                    logger.debug(() -> new ParameterizedMessage("{} shards failed for phase: [{}]",
                            shardSearchFailures.length, getName()), cause);
                }
                onPhaseFailure(currentPhase, "Partial shards failure", null);
            } else { // partial results are allowed (or nothing failed): continue with the results we have
                if (logger.isTraceEnabled()) {
                    final String resultsFrom = results.getSuccessfulResults()
                        .map(r -> r.getSearchShardTarget().toString()).collect(Collectors.joining(","));
                    logger.trace("[{}] Moving to next phase: [{}], based on results from: {} (cluster state version: {})",
                        currentPhase.getName(), nextPhase.getName(), resultsFrom, clusterStateVersion);
                }
                // run the next phase
                executePhase(nextPhase);
            }
        }
    }

    private void executePhase(SearchPhase phase) {
        try {
            phase.run();
        } catch (Exception e) {
            if (logger.isDebugEnabled()) {
                logger.debug(new ParameterizedMessage("Failed to execute [{}] while moving to [{}] phase", request, phase.getName()), e);
            }
            onPhaseFailure(phase, "", e);
        }
    }
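The decision in executeNextPhase reduces to three outcomes. Below is a sketch that uses plain counters in place of successfulOps and the shardFailures array.

```java
final class PhaseTransition {
    enum Outcome { ALL_SHARDS_FAILED, PARTIAL_SHARDS_FAILURE, NEXT_PHASE }

    static Outcome decide(int successfulOps, int shardFailures, boolean allowPartialSearchResults) {
        if (successfulOps == 0) {
            return Outcome.ALL_SHARDS_FAILED;        // onPhaseFailure(..., "all shards failed", ...)
        }
        if (!allowPartialSearchResults && shardFailures > 0) {
            return Outcome.PARTIAL_SHARDS_FAILURE;   // onPhaseFailure(..., "Partial shards failure", ...)
        }
        return Outcome.NEXT_PHASE;                   // executePhase(nextPhase)
    }
}
```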

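For QUERY_THEN_FETCH, the phase that follows the query phase is FetchSearchPhase, a subclass of SearchPhase. executePhase therefore ends up calling FetchSearchPhase#run: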

    @Override
    public void run() throws IOException {
        context.execute(new ActionRunnable<SearchResponse>(context) {
            @Override
            public void doRun() throws IOException {
                // the actual work happens in innerRun
                innerRun();
            }

            @Override
            public void onFailure(Exception e) {
                context.onPhaseFailure(FetchSearchPhase.this, "", e);
            }
        });
    }

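innerRun() does the following:

- Groups the merged top hits by shard.
- Releases the search contexts of shards that contributed no hits.
- Sends a fetch request to each remaining shard.

    private void innerRun() throws IOException {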
        ...
        // once every fetch has completed, move on to the next phase
        final Runnable finishPhase = ()
            -> moveToNextPhase(searchPhaseController, scrollId, reducedQueryPhase, queryAndFetchOptimization ?
            queryResults : fetchResults);
        ...
        if (queryAndFetchOptimization) {
            assert phaseResults.isEmpty() || phaseResults.get(0).fetchResult() != null : "phaseResults empty [" + phaseResults.isEmpty()
                + "], single result: " +  phaseResults.get(0).fetchResult();
            // query AND fetch optimization
            finishPhase.run();
        } else {
            final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(numShards, reducedQueryPhase.scoreDocs);
            if (reducedQueryPhase.scoreDocs.length == 0) { // no docs to fetch -- sidestep everything and return
                phaseResults.stream()
                    .map(SearchPhaseResult::queryResult)
                    .forEach(this::releaseIrrelevantSearchContext); // we have to release contexts here to free up resources
                finishPhase.run();
            } else {
                final ScoreDoc[] lastEmittedDocPerShard = isScrollSearch ?
                    searchPhaseController.getLastEmittedDocPerShard(reducedQueryPhase, numShards)
                    : null;
                // collector that counts down once per shard and runs finishPhase when all are done
                final CountedCollector<FetchSearchResult> counter = new CountedCollector<>(r -> fetchResults.set(r.getShardIndex(), r),
                    docIdsToLoad.length, // we count down every shard in the result no matter if we got any results or not
                    finishPhase, context);
                for (int i = 0; i < docIdsToLoad.length; i++) {
                    IntArrayList entry = docIdsToLoad[i];
                    SearchPhaseResult queryResult = queryResults.get(i);
                    if (entry == null) { // no results for this shard ID
                        if (queryResult != null) {
                            // if we got some hits from this shard we have to release the context there
                            // we do this as we go since it will free up resources and passing on the request on the
                            // transport layer is cheap.
                            releaseIrrelevantSearchContext(queryResult.queryResult());
                        }
                        // in any case we count down this result since we don't talk to this shard anymore
                        counter.countDown();
                    } else {
                        SearchShardTarget searchShardTarget = queryResult.getSearchShardTarget();
                        Transport.Connection connection = context.getConnection(searchShardTarget.getClusterAlias(),
                            searchShardTarget.getNodeId());
                        ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult().getRequestId(), i, entry,
                            lastEmittedDocPerShard, searchShardTarget.getOriginalIndices());

                        // send the fetch request
                        executeFetch(i, searchShardTarget, counter, fetchSearchRequest, queryResult.queryResult(),
                            connection);
                    }
                }
            }
        }
    }
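fillDocIdsToLoad keeps the fetch phase cheap. Only documents that made it into the merged top hits are fetched, and only from the shards that hold them. Below is a sketch of the grouping. A hypothetical `Hit` class replaces Lucene's `ScoreDoc`, and `List<Integer>` replaces `IntArrayList`.

```java
import java.util.ArrayList;
import java.util.List;

final class DocIdGrouping {
    // Minimal stand-in for ScoreDoc: which shard a hit came from and its doc id there.
    static final class Hit {
        final int shardIndex;
        final int doc;

        Hit(int shardIndex, int doc) {
            this.shardIndex = shardIndex;
            this.doc = doc;
        }
    }

    // Groups the merged top hits by shard. Shards without hits keep a null entry,
    // which is what makes innerRun skip them (and release their search context).
    static List<List<Integer>> docIdsToLoad(int numShards, Hit[] mergedTopHits) {
        List<List<Integer>> perShard = new ArrayList<>();
        for (int i = 0; i < numShards; i++) {
            perShard.add(null);
        }
        for (Hit hit : mergedTopHits) {
            if (perShard.get(hit.shardIndex) == null) {
                perShard.set(hit.shardIndex, new ArrayList<>());
            }
            perShard.get(hit.shardIndex).add(hit.doc);
        }
        return perShard;
    }
}
```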

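executeFetch sends the fetch request and handles both outcomes. Its querySearchResult parameter carries the paging information.

- On success, the listener calls counter.onResult for each shard whose data arrives. onResult invokes the result callback, which stores the result in the array, and then counts down.
- On failure, the listener reports the error to the counter and releases the shard's search context.

The method: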

    private void executeFetch(final int shardIndex, final SearchShardTarget shardTarget,
                              final CountedCollector<FetchSearchResult> counter,
                              final ShardFetchSearchRequest fetchSearchRequest, final QuerySearchResult querySearchResult,
                              final Transport.Connection connection) {
        // send the fetch request
        context.getSearchTransport().sendExecuteFetch(connection, fetchSearchRequest, context.getTask(),
            new SearchActionListener<FetchSearchResult>(shardTarget, shardIndex) {
                @Override
                // handle a successful response
                public void innerOnResponse(FetchSearchResult result) {
                    counter.onResult(result);
                }

                @Override
                // handle a failure
                public void onFailure(Exception e) {
                    try {
                        logger.debug(() -> new ParameterizedMessage("[{}] Failed to execute fetch phase", fetchSearchRequest.id()), e);
                        counter.onFailure(shardIndex, shardTarget, e);
                    } finally {
                        // the search context might not be cleared on the node where the fetch was executed for example
                        // because the action was rejected by the thread pool. in this case we need to send a dedicated
                        // request to clear the search context.
                        releaseIrrelevantSearchContext(querySearchResult);
                    }
                }
            });
    }

CountedCollector#onResult:

    void onResult(R result) {
        try {
            resultConsumer.accept(result);
        } finally {
            countDown();
        }
    }
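CountedCollector works like a countdown latch that runs a callback instead of blocking. Below is a simplified sketch, with an `AtomicInteger` counter and an `AtomicReferenceArray` for per-shard results. The class name and methods are hypothetical, not the real CountedCollector API.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;

// Simplified counted collector: stores one result per shard and runs onFinish after the last countDown.
final class SimpleCountedCollector<R> {
    private final AtomicReferenceArray<R> results;
    private final AtomicInteger remaining;
    private final Runnable onFinish;

    SimpleCountedCollector(int expected, Runnable onFinish) {
        this.results = new AtomicReferenceArray<>(expected);
        this.remaining = new AtomicInteger(expected);
        this.onFinish = onFinish;
    }

    // Like onResult: store the result, then always count down (even if storing throws).
    void onResult(int shardIndex, R result) {
        try {
            results.set(shardIndex, result);
        } finally {
            countDown();
        }
    }

    // Shards with nothing to fetch (or failed shards) only count down.
    void countDown() {
        if (remaining.decrementAndGet() == 0) {
            onFinish.run();
        }
    }

    R get(int shardIndex) {
        return results.get(shardIndex);
    }
}
```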

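4.2 Collect results

Once every shard has counted down, countDown runs finishPhase, which eventually triggers ExpandSearchPhase. The FetchSearchPhase constructor wires this up: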

    FetchSearchPhase(InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> resultConsumer,
                     SearchPhaseController searchPhaseController,
                     SearchPhaseContext context) {
        this(resultConsumer, searchPhaseController, context,
            (response, scrollId) -> new ExpandSearchPhase(context, response, // collapse only happens if the request has inner hits
                (finalResponse) -> sendResponsePhase(finalResponse, scrollId, context)));
    }

4.3 ExpandSearchPhase

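After all documents have been fetched, ExpandSearchPhase#run executes.

- If the request uses field collapsing with inner_hits and at least one hit was returned, it builds one search per hit and inner_hits definition. It sends them as a single MultiSearchRequest and attaches the responses to the hits as inner hits.
- Otherwise, it moves straight on to the next phase.

    @Override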
    public void run() throws IOException {
        if (isCollapseRequest() && searchResponse.hits().getHits().length > 0) {
            SearchRequest searchRequest = context.getRequest();
            CollapseBuilder collapseBuilder = searchRequest.source().collapse();
            final List<InnerHitBuilder> innerHitBuilders = collapseBuilder.getInnerHits();
            MultiSearchRequest multiRequest = new MultiSearchRequest();
            if (collapseBuilder.getMaxConcurrentGroupRequests() > 0) {
                multiRequest.maxConcurrentSearchRequests(collapseBuilder.getMaxConcurrentGroupRequests());
            }
            for (SearchHit hit : searchResponse.hits().getHits()) {
                BoolQueryBuilder groupQuery = new BoolQueryBuilder();
                Object collapseValue = hit.field(collapseBuilder.getField()).getValue();
                if (collapseValue != null) {
                    groupQuery.filter(QueryBuilders.matchQuery(collapseBuilder.getField(), collapseValue));
                } else {
                    groupQuery.mustNot(QueryBuilders.existsQuery(collapseBuilder.getField()));
                }
                QueryBuilder origQuery = searchRequest.source().query();
                if (origQuery != null) {
                    groupQuery.must(origQuery);
                }
                for (InnerHitBuilder innerHitBuilder : innerHitBuilders) {
                    CollapseBuilder innerCollapseBuilder = innerHitBuilder.getInnerCollapseBuilder();
                    SearchSourceBuilder sourceBuilder = buildExpandSearchSourceBuilder(innerHitBuilder, innerCollapseBuilder)
                        .query(groupQuery)
                        .postFilter(searchRequest.source().postFilter());
                    SearchRequest groupRequest = buildExpandSearchRequest(searchRequest, sourceBuilder);
                    multiRequest.add(groupRequest);
                }
            }
            context.getSearchTransport().sendExecuteMultiSearch(multiRequest, context.getTask(),
                ActionListener.wrap(response -> {
                    Iterator<MultiSearchResponse.Item> it = response.iterator();
                    for (SearchHit hit : searchResponse.hits.getHits()) {
                        for (InnerHitBuilder innerHitBuilder : innerHitBuilders) {
                            MultiSearchResponse.Item item = it.next();
                            if (item.isFailure()) {
                                context.onPhaseFailure(this, "failed to expand hits", item.getFailure());
                                return;
                            }
                            SearchHits innerHits = item.getResponse().getHits();
                            if (hit.getInnerHits() == null) {
                                hit.setInnerHits(new HashMap<>(innerHitBuilders.size()));
                            }
                            hit.getInnerHits().put(innerHitBuilder.getName(), innerHits);
                        }
                    }
                    context.executeNextPhase(this, nextPhaseFactory.apply(searchResponse));
                }, context::onFailure)
            );
        } else {
            context.executeNextPhase(this, nextPhaseFactory.apply(searchResponse));
        }
    }
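The multi-search responses come back in the same order the requests were added. ExpandSearchPhase can therefore pair them with hits by walking the same nested loops a second time. Below is a sketch of that ordering contract, with strings standing in for the queries and responses. `field` is a placeholder for the collapse field and is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ExpandSketch {
    // Builds one group query per (hit, inner_hits definition) pair, in a fixed nested order.
    static List<String> buildRequests(List<String> collapseValues, List<String> innerHitNames) {
        List<String> requests = new ArrayList<>();
        for (String value : collapseValues) {
            String groupFilter = value != null
                ? "match(field=" + value + ")"     // hit has a value: filter on that value
                : "must_not(exists(field))";       // hit has no value: documents missing the field
            for (String name : innerHitNames) {
                requests.add(name + ":" + groupFilter);
            }
        }
        return requests;
    }

    // Consumes the responses in the same nested order to attach them to each hit.
    static List<Map<String, String>> attach(int hitCount, List<String> innerHitNames, List<String> responses) {
        Iterator<String> it = responses.iterator();
        List<Map<String, String>> innerHitsPerHit = new ArrayList<>();
        for (int h = 0; h < hitCount; h++) {
            Map<String, String> innerHits = new LinkedHashMap<>();
            for (String name : innerHitNames) {
                innerHits.put(name, it.next());
            }
            innerHitsPerHit.add(innerHits);
        }
        return innerHitsPerHit;
    }
}
```

Two hits and two inner_hits definitions produce four requests, and the fourth response belongs to the second hit's second definition.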

4.4 Respond to the client

sendResponsePhase builds the final SearchResponse and returns it to the client:

    private static SearchPhase sendResponsePhase(InternalSearchResponse response, String scrollId, SearchPhaseContext context) {
        return new SearchPhase("response") {
            @Override
            public void run() throws IOException {
                context.onResponse(context.buildSearchResponse(response, scrollId));
            }
        };
    }

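5. Search Execution on the Data Node

On the data node, the entry points for both the query and the fetch phases are registered in SearchTransportService#registerRequestHandler. The query handler looks like this:

    transportService.registerRequestHandler(QUERY_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SAME,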
            new TaskAwareTransportRequestHandler<ShardSearchTransportRequest>() {
                @Override
                // a query request arrives
                public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception {
                    // run the query phase
                    searchService.executeQueryPhase(request, (SearchTask) task, new ActionListener<SearchPhaseResult>() {
                        @Override
                        // send the result back
                        public void onResponse(SearchPhaseResult searchPhaseResult) {
                            try {
                                channel.sendResponse(searchPhaseResult);
                            } catch (IOException e) {
                                throw new UncheckedIOException(e);
                            }
                        }

                        @Override
                        // send the failure back
                        public void onFailure(Exception e) {
                            try {
                                channel.sendResponse(e);
                            } catch (IOException e1) {
                                throw new UncheckedIOException(e1);
                            }
                        }
                    });
                }
            });

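The query entry point is searchService.executeQueryPhase. It first checks whether the result can be served from the cache and, if so, uses the cached result. When the request targets a single shard (request.numberOfShards() == 1), the fetch phase also runs immediately on the same node:

    SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchTask task) throws IOException {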
        final SearchContext context = createAndPutContext(request);
        final SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
        context.incRef();
        boolean queryPhaseSuccess = false;
        try {
            context.setTask(task);
            operationListener.onPreQueryPhase(context);
            long time = System.nanoTime();
            contextProcessing(context);
            // load from the request cache if possible, otherwise run the query
            loadOrExecuteQueryPhase(request, context);

            if (context.queryResult().hasSearchContext() == false && context.scrollContext() == null) {
                freeContext(context.id());
            } else {
                contextProcessedSuccessfully(context);
            }
            final long afterQueryTime = System.nanoTime();
            queryPhaseSuccess = true;
            operationListener.onQueryPhase(context, afterQueryTime - time);
            if (request.numberOfShards() == 1) {
                return executeFetchPhase(context, operationListener, afterQueryTime);
            }
            return context.queryResult();
        } catch (Exception e) {
            // execution exception can happen while loading the cache, strip it
            if (e instanceof ExecutionException) {
                e = (e.getCause() == null || e.getCause() instanceof Exception) ?
                    (Exception) e.getCause() : new ElasticsearchException(e.getCause());
            }
            if (!queryPhaseSuccess) {
                operationListener.onFailedQueryPhase(context);
            }
            logger.trace("Query phase failed", e);
            processFailure(context, e);
            throw ExceptionsHelper.convertToRuntime(e);
        } finally {
            cleanContext(context);
        }
    }

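loadOrExecuteQueryPhase serves the query from the cache when it can. Whether the cache may be used depends on several conditions, including the index.requests.cache.enable setting (true by default). This shard request cache is shared by all shards on a node and evicts the least recently used entries when it is full. It does not cache complete search results. By default, only requests with size=0 are cached, so hits.total, aggregations and suggestions can be cached but the hits themselves are not.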

    /**
     * Try to load the query results from the cache or execute the query phase directly if the cache cannot be used.
     */
    private void loadOrExecuteQueryPhase(final ShardSearchRequest request, final SearchContext context) throws Exception {
        final boolean canCache = indicesService.canCache(request, context);
        context.getQueryShardContext().freezeContext();
        if (canCache) {
            // go through the shard request cache (the query runs only on a cache miss)
            indicesService.loadIntoContext(request, context, queryPhase);
        } else {
            // run the query directly against Lucene
            queryPhase.execute(context);
        }
    }
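The shard request cache behaves like an LRU map keyed by the request and is used only for cacheable requests. Below is a toy version built on the following assumptions:

- The request key is a `String`.
- The cache is limited by entry count.
- size == 0 is the only cacheability rule.

The real cache keys on the serialized shard request, is bounded by memory (`indices.requests.cache.size`), and applies further conditions.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Toy shard request cache: LRU eviction, and only size == 0 requests are cacheable.
final class ToyRequestCache {
    private final Map<String, String> entries;
    int executions = 0;

    ToyRequestCache(int maxEntries) {
        // accessOrder = true keeps iteration order least-recently-used first.
        this.entries = new LinkedHashMap<String, String>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                return size() > maxEntries;
            }
        };
    }

    // Mirrors loadOrExecuteQueryPhase: use the cache when allowed, otherwise run the query directly.
    String loadOrExecute(String requestKey, int size, boolean cacheEnabled, Function<String, String> query) {
        boolean canCache = cacheEnabled && size == 0;
        if (!canCache) {
            executions++;
            return query.apply(requestKey);
        }
        String cached = entries.get(requestKey);
        if (cached == null) {
            executions++;
            cached = query.apply(requestKey);
            entries.put(requestKey, cached);
        }
        return cached;
    }

    boolean contains(String requestKey) {
        return entries.containsKey(requestKey);
    }
}
```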

queryPhase.execute runs the query itself:

    @Override
    public void execute(SearchContext searchContext) throws QueryPhaseExecutionException {
        if (searchContext.hasOnlySuggest()) {
            suggestPhase.execute(searchContext);
            // TODO: fix this once we can fetch docs for suggestions
            searchContext.queryResult().topDocs(
                    new TopDocs(0, Lucene.EMPTY_SCORE_DOCS, 0),
                    new DocValueFormat[0]);
            return;
        }
        // Pre-process aggregations as late as possible. In the case of a DFS_Q_T_F
        // request, preProcess is called on the DFS phase phase, this is why we pre-process them
        // here to make sure it happens during the QUERY phase
        aggregationPhase.preProcess(searchContext);
        final ContextIndexSearcher searcher = searchContext.searcher();
        boolean rescore = execute(searchContext, searchContext.searcher(), searcher::setCheckCancelled);

        if (rescore) { // the request asks for rescoring
            rescorePhase.execute(searchContext);
        }
        // suggesters (e.g. completion and spelling correction)
        suggestPhase.execute(searchContext);
        // compute aggregations
        aggregationPhase.execute(searchContext);

        if (searchContext.getProfilers() != null) {
            ProfileShardResult shardResults = SearchProfileShardResults
                    .buildShardResults(searchContext.getProfilers());
            searchContext.queryResult().profileResults(shardResults);
        }
    }

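The time reported by the query slow log is the time spent in this phase. It is the afterQueryTime - time value that executeQueryPhase passes to operationListener.onQueryPhase.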
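The query slow log compares that duration against the per-index thresholds `index.search.slowlog.threshold.query.warn`, `.info`, `.debug` and `.trace`, where `-1` disables a level. Below is a sketch of picking the log level. It assumes thresholds are given in milliseconds and that the most severe exceeded level wins.

```java
import java.util.concurrent.TimeUnit;

// Picks the slow-log level for a query phase that took tookNanos, checking the most severe threshold first.
// A negative threshold means that level is disabled (like setting it to -1).
final class SlowLogLevel {
    static String levelFor(long tookNanos, long warnMs, long infoMs, long debugMs, long traceMs) {
        if (warnMs >= 0 && tookNanos > TimeUnit.MILLISECONDS.toNanos(warnMs)) {
            return "WARN";
        }
        if (infoMs >= 0 && tookNanos > TimeUnit.MILLISECONDS.toNanos(infoMs)) {
            return "INFO";
        }
        if (debugMs >= 0 && tookNanos > TimeUnit.MILLISECONDS.toNanos(debugMs)) {
            return "DEBUG";
        }
        if (traceMs >= 0 && tookNanos > TimeUnit.MILLISECONDS.toNanos(traceMs)) {
            return "TRACE";
        }
        return null; // below every enabled threshold: nothing is logged
    }
}
```

Consider a query phase of 750 ms with thresholds warn=1000, info=500 and debug=200. It would be logged at INFO.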
