Py学习  »  Elasticsearch

Elasticsearch基础概念之写入流程

逍遥白亦 • 3 年前 • 246 次点击  

写入流程

1.名词解释

预处理节点:数据前置处理转换的节点,支持 pipeline 设置,对数据进行过滤、转换等操作

协调节点:主要作用于请求转发,请求响应处理等轻量级操作

2.整体流程图

image

红色:协调节点
绿色:主分片节点
蓝色:副分片节点

2.1 协调节点流程

协调节点的流程代码在TransportBulkAction类里

2.1.1 参数检查

校验index、type、source、contentType不为空,参数检查遇到异常会拒绝当前请求

2.1.2 处理pipeline请求

请求中指定了pipeline参数,则先使用相应的pipeline进行处理

2.1.3 自动创建索引

具体步骤

  1. 收集请求中所有索引
  2. 创建不存在的索引
  3. 将创建索引请求发送到Master节点
  4. Master节点返回创建成功或失败的信息
  5. 对失败的响应进行标记,对成功的响应执行写流程
    相关源码
            // 在bulk操作执行之前,尝试创建所有我们需要的索引
            // Step 1: 收集请求中的所有索引
            final Set<String> indices = bulkRequest.requests.stream()
                    //删除不存在的索引
                .filter(request -> request.opType() != DocWriteRequest.OpType.DELETE 
                        || request.versionType() == VersionType.EXTERNAL 
                        || request.versionType() == VersionType.EXTERNAL_GTE)
                .map(DocWriteRequest::index)
                .collect(Collectors.toSet());
            /* Step 2: filter that to indices that don't exist and we can create. At the same time build a map of indices we can't create
             * that we'll use when we try to run the requests. */
             筛选出我们可以创建的不存在的索引。同时创建一个map来存放我们不能创建的索引
            final Map<String, IndexNotFoundException> indicesThatCannotBeCreated = new HashMap<>();
            Set<String> autoCreateIndices = new HashSet<>();
            ClusterState state = clusterService.state();
            for (String index : indices) {
                boolean shouldAutoCreate;
                try {
                    shouldAutoCreate = shouldAutoCreate(index, state);
                } catch (IndexNotFoundException e) {
                    shouldAutoCreate = false;
                    indicesThatCannotBeCreated.put(index, e);
                }
                if (shouldAutoCreate) {
                    autoCreateIndices.add(index);
                }
            }
            // Step 3: create all the indices that are missing, if there are any missing. start the bulk after all the creates come back.
            if (autoCreateIndices.isEmpty()) {
                executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
            } else {
                final AtomicInteger counter = new AtomicInteger(autoCreateIndices.size());
                //遍历所有需要创建的索引
                for (String index : autoCreateIndices) {
                    //发送创建索引请求
                    createIndex(index, bulkRequest.timeout(), new ActionListener<CreateIndexResponse>() {
                        @Override
                        //收到执行成功响应
                        public void onResponse(CreateIndexResponse result) {
                            //将计数器递减,计数器的值为需要创建的索引数量
                            if (counter.decrementAndGet() == 0) {
                                //全部创建完毕时执行
                                executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
                            }
                        }

                        @Override
                        //收到失败的响应
                        public void onFailure(Exception e) {
                            if (!(ExceptionsHelper.unwrapCause(e) instanceof ResourceAlreadyExistsException)) {
                                //将创建失败索引对应的请求置空
                                for (int i = 0; i < bulkRequest.requests.size(); i++) {
                                    DocWriteRequest request = bulkRequest.requests.get(i);
                                    if (request != null && setResponseFailureIfIndexMatches(responses, i, request, index, e)) {
                                        bulkRequest.requests.set(i, null);
                                    }
                                }
                            }
                            if (counter.decrementAndGet() == 0) {
                                executeBulk(task, bulkRequest, startTime, ActionListener.wrap(listener::onResponse, inner -> {
                                    inner.addSuppressed(e);
                                    listener.onFailure(inner);
                                }), responses, indicesThatCannotBeCreated);
                            }
                        }
                    });
                }
            }

2.1.4 对请求的预先处理

该部分实现在TransportBulkAction.BulkOperation.doRun里
检查请求参数、自动生成doc的Id(如果指定Id的话,就不用自动生成)

2.1.5 检测集群状态

协调节点在处理前会检测集群状态,若集群异常会阻塞等待Master节点直至超时(阻塞操作)

        private boolean handleBlockExceptions(ClusterState state) {
            ClusterBlockException blockException = state.blocks().globalBlockedException(ClusterBlockLevel.WRITE);
            if (blockException != null) {
                if (blockException.retryable()) {
                    logger.trace("cluster is blocked, scheduling a retry", blockException);
                    retry(blockException);
                } else {
                    onFailure(blockException);
                }
                return true;
            }
            return false;
        }

2.1.6 请求合并

对所有请求进行分析,如果这些写入操作的文档的主分片都属于同一个,那么就把这些请求合并为1个。

Map<ShardId, List<BulkItemRequest>> requestsByShard = new HashMap<>();
    for (int i = 0; i < bulkRequest.requests.size(); i++) {
        DocWriteRequest request = bulkRequest.requests.get(i);
            if (request == null) {
                    continue;
            }
                String concreteIndex = concreteIndices.getConcreteIndex(request.index()).getName();
                ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, request.id(), request.routing()).shardId();
                List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>());
                shardRequests.add(new BulkItemRequest(i, request));
        
    }

2.1.7 路由

根据路由算法,确定主分片所在节点

2.1.8 转发请求并等待响应

根据上边路由确定的主分片所在节点,将请求进行转发并等待返回结果,收到所有返回结果之后,再转发给客户端。

protected void doRun() {
            setPhase(task, "routing");
            final ClusterState state = observer.setAndGetObservedState();
            if (handleBlockExceptions(state)) {
                return;
            }

            // request does not have a shardId yet, we need to pass the concrete index to resolve shardId
            final String concreteIndex = concreteIndex(state);
            final IndexMetaData indexMetaData = state.metaData().index(concreteIndex);
            if (indexMetaData == null) {
                retry(new IndexNotFoundException(concreteIndex));
                return;
            }
            if (indexMetaData.getState() == IndexMetaData.State.CLOSE) {
                throw new IndexClosedException(indexMetaData.getIndex());
            }

            // resolve all derived request fields, so we can route and apply it
            resolveRequest(indexMetaData, request);
            assert request.shardId() != null : "request shardId must be set in resolveRequest";
            assert request.waitForActiveShards() != ActiveShardCount.DEFAULT : "request waitForActiveShards must be set in resolveRequest";
            //获取主分片所在节点
            final ShardRouting primary = primary(state);
            if (retryIfUnavailable(state, primary)) {
                return;
            }
            final DiscoveryNode node = state.nodes().get(primary.currentNodeId());
            //如果主分片在本节点,则在本地执行,否则转发出去
            if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) {
                performLocalAction(state, primary, node, indexMetaData);
            } else {
                performRemoteAction(state, primary, node);
            }
}

2.2 主分片节点流程

代码入口TransportReplicationAction.PrimaryOperationTransportHandler#messageReceived,然后进入AsyncPrimaryAction#doRun方法

    protected class PrimaryOperationTransportHandler implements TransportRequestHandler<ConcreteShardRequest<Request>> {

        public PrimaryOperationTransportHandler() {

        }

        @Override
        public void messageReceived(final ConcreteShardRequest<Request> request, final TransportChannel channel) throws Exception {
            throw new UnsupportedOperationException("the task parameter is required for this operation");
        }

        @Override
        public void messageReceived(ConcreteShardRequest<Request> request, TransportChannel channel, Task task) {
            new AsyncPrimaryAction(request.request, request.targetAllocationID, request.primaryTerm, channel, (ReplicationTask) task).run();
        }
    }

2.2.1. 检查请求

AsyncPrimaryAction#doRun方法中会调用acquirePrimaryShardReference方法。
该方法会对请求进行校验

  1. 当前是否为主分片
  2. allocationId是否是预期值
  3. PrimaryTerm是否是预期值
IndexShard indexShard = getIndexShard(shardId);
        //当前是否为主分片
        if (indexShard.routingEntry().primary() == false) {
            throw new ReplicationOperation.RetryOnPrimaryException(indexShard.shardId(),
                "actual shard is not a primary " + indexShard.routingEntry());
        }
        final String actualAllocationId = indexShard.routingEntry().allocationId().getId();
        //allocationId是否是预期值
        if (actualAllocationId.equals(allocationId) == false) {
            throw new ShardNotFoundException(shardId, "expected aID [{}] but found [{}]", allocationId, actualAllocationId);
        }
        final long actualTerm = indexShard.getPendingPrimaryTerm();
        //PrimaryTerm是否是预期值
        if (actualTerm != primaryTerm) {
            throw new ShardNotFoundException(shardId, "expected aID [{}] with term [{}] but found [{}]", allocationId,
                primaryTerm, actualTerm);
        }

2.2.2 是否延迟执行

判断请求是否需要延迟执行,如果需要延迟则放入队列,否则继续。

    private void acquire(final ActionListener<Releasable> onAcquired, final String executorOnDelay, final boolean forceExecution,
                        final Object debugInfo, final StackTraceElement[] stackTrace) {
        //节点关闭直接返回                
        if (closed) {
            onAcquired.onFailure(new IndexShardClosedException(shardId));
            return;
        }
        final Releasable releasable;
        try {
            synchronized (this) {
                if (queuedBlockOperations > 0) {
                    final Supplier<StoredContext> contextSupplier = threadPool.getThreadContext().newRestorableContext(false);
                    final ActionListener<Releasable> wrappedListener;
                    if (executorOnDelay != null) {
                        wrappedListener =
                            new PermitAwareThreadedActionListener(threadPool, executorOnDelay,
                                        new ContextPreservingActionListener<>(contextSupplier, onAcquired), forceExecution);
                    } else {
                        wrappedListener = new ContextPreservingActionListener<>(contextSupplier, onAcquired);
                    }
                    //放入延迟队列
                    delayedOperations.add(new DelayedOperation(wrappedListener, debugInfo, stackTrace));
                    return;
                } else {
                    releasable = acquire(debugInfo, stackTrace);
                }
            }
        } catch (final InterruptedException e) {
            onAcquired.onFailure(e);
            return;
        }
        //调用AsyncPrimaryAction的onResponse方法
        onAcquired.onResponse(releasable);
    }

2.2.3 判断主分片是否已经发生迁移

如果已经迁移:

  1. 将phase状态设为“primary_delegation”
  2. 关闭当前分片的primaryShardReference,及时释放资源
  3. 获取已经迁移到的目标节点,将请求转发到该节点,并等待执行结果
  4. 拿到结果后,将task状态更新为“finish”。
                //已经迁移
                if (primaryShardReference.isRelocated()) {
                    //关闭当前分片的primaryShardReference,及时释放资源
                    primaryShardReference.close(); 
                    //将phase状态设为“primary_delegation”
                    setPhase(replicationTask, "primary_delegation");
                    // delegate primary phase to relocation target
                    // it is safe to execute primary phase on relocation target as there are no more in-flight operations where primary
                    // phase is executed on local shard and all subsequent operations are executed on relocation target as primary phase.
                    final ShardRouting primary = primaryShardReference.routingEntry();
                    assert primary.relocating() : "indexShard is marked as relocated but routing isn't" + primary;
                    //获取已经迁移到的目标节点
                    DiscoveryNode relocatingNode = clusterService.state().nodes().get(primary.relocatingNodeId());
                    //将请求转发到该节点,并等待执行结果
                    transportService.sendRequest(relocatingNode, transportPrimaryAction,
                        new ConcreteShardRequest<>(request, primary.allocationId().getRelocationId(), primaryTerm),
                        transportOptions,
                        new TransportChannelResponseHandler<Response>(logger, channel, "rerouting indexing to target primary " + primary,
                            TransportReplicationAction.this::newResponseInstance) {

                            @Override
                            public void handleResponse(Response response) {
                                setPhase(replicationTask, "finished");
                                super.handleResponse(response);
                            }

                            @Override
                            //拿到结果后,将task状态更新为“finish”。
                            public void handleException(TransportException exp) {
                                setPhase(replicationTask, "finished");
                                super.handleException(exp);
                            }
                        });
                } 
                else {  //没有迁移
                    // 将task状态更新为“primary”
                    setPhase(replicationTask, "primary");
                    final ActionListener<Response> listener = createResponseListener(primaryShardReference);
                    //转发请求给副本分片
                    createReplicatedOperation(request,
                            ActionListener.wrap(result -> result.respond(listener), listener::onFailure),
                            primaryShardReference)
                            .execute(); //ReplicationOperation类的execute方法
                }

如果没有迁移:

  1. 将task状态更新为“primary”
  2. 主分片准备操作(主要部分)
  3. 转发请求给副本分片

2.2.4 开始写入主分片(关键步骤)

primary所在的node收到协调节点发过来的写入请求后,开始正式执行写入的逻辑,写入执行的入口是在ReplicationOperation类的execute方法

    public void execute() throws Exception {
        .......
        //关键,这里开始执行写主分片
        primaryResult = primary.perform(request);
        .......
    }

perform方法:

        @Override
        public PrimaryResult perform(Request request) throws Exception {
            PrimaryResult result = shardOperationOnPrimary(request, indexShard);
            assert result.replicaRequest() == null || result.finalFailure == null : "a replica request [" + result.replicaRequest()
                + "] with a primary failure [" + result.finalFailure + "]";
            return result;
        }

最终会调用InternalEngine#index方法,写数据,先写Lucene(前文提到过的内存缓冲区)再写translog,这么做的目的是写入Lucene时,Lucene会再对数据进行一些检查,有可能出现写入Lucene失败的情况。如果先写translog,那么就要处理写入translog成功但是写入Lucene一直失败的问题,所以ES采用了先写Lucene的方式。




    
    public IndexResult index(Index index) throws IOException {
                .......
                final IndexResult indexResult;
                if (plan.earlyResultOnPreFlightError.isPresent()) {
                    indexResult = plan.earlyResultOnPreFlightError.get();
                    assert indexResult.getResultType() == Result.Type.FAILURE : indexResult.getResultType();
                } else if (plan.indexIntoLucene || plan.addStaleOpToLucene) {
                    // 将数据写入lucene,最终会调用lucene的文档写入接口
                    indexResult = indexIntoLucene(index, plan);
                } else {
                    indexResult = new IndexResult(
                        plan.versionForIndexing, getPrimaryTerm(), plan.seqNoForIndexing, plan.currentNotFoundOrDeleted);
                }
                if (index.origin().isFromTranslog() == false) {
                    final Translog.Location location;
                    if (indexResult.getResultType() == Result.Type.SUCCESS) {
                        // 写入translog
                        location = translog.add(new Translog.Index(index, indexResult)); //写translog
                    ......
                    indexResult.setTranslogLocation(location);
                }
              .......
        }

2.2.5 开始写副分片

    public void execute() throws Exception {
        .......
        //关键,这里开始执行写主分片
        primaryResult = primary.perform(request);
        .......
        final ReplicaRequest replicaRequest = primaryResult.replicaRequest();
        if (replicaRequest != null) {
            ........
            markUnavailableShardsAsStale(replicaRequest, replicationGroup);
            // 关键步骤,写完primary后这里转发请求到replicas
            performOnReplicas(replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, replicationGroup);
        }
        successfulShards.incrementAndGet();  // mark primary as successful
        decPendingAndFinishIfNeeded();
    }

在写完primary后,会继续写replicas,接下来需要将请求转发到从节点上,如果replica shard未分配,则直接忽略;如果replica shard正在搬迁数据到其他节点,则将请求转发到搬迁的目标shard上,否则,转发到replica shard。replicaRequest是在写入主分片后,从primaryResult中获取,并非原始Request。

private void performOnReplicas(final ReplicaRequest replicaRequest, final long globalCheckpoint,
                                   final ReplicationGroup replicationGroup) {
        // for total stats, add number of unassigned shards and
        // number of initializing shards that are not ready yet to receive operations (recovery has not opened engine yet on the target)
        totalShards.addAndGet(replicationGroup.getSkippedShards().size());

        final ShardRouting primaryRouting = primary.routingEntry();

        for (final ShardRouting shard : replicationGroup.getReplicationTargets()) {
            if (shard.isSameAllocation(primaryRouting) == false) {
                performOnReplica(shard, replicaRequest, globalCheckpoint);
            }
        }
    }

performOnReplica方法会将请求转发到目标节点,如果出现异常,如端节点挂掉、shard写入失败等,对于这些异常,primary认为该replica shard发生故障不可用,将会向master汇报并移除该replica

private void performOnReplica(final ShardRouting shard, final ReplicaRequest replicaRequest, final long globalCheckpoint) {
        if (logger.isTraceEnabled()) {
            logger.trace("[{}] sending op [{}] to replica {} for request [{}]", shard.shardId(), opType, shard, replicaRequest);
        }

        totalShards.incrementAndGet();
        pendingActions.incrementAndGet();
        replicasProxy.performOn(shard, replicaRequest, globalCheckpoint, new ActionListener<ReplicaResponse>() {
            @Override
            public void onResponse(ReplicaResponse response) {
                successfulShards.incrementAndGet();
                try {
                    primary.updateLocalCheckpointForShard(shard.allocationId().getId(), response.localCheckpoint());
                    primary.updateGlobalCheckpointForShard(shard.allocationId().getId(), response.globalCheckpoint());
                } catch (final AlreadyClosedException e) {
                    // okay, the index was deleted or this shard was never activated after a relocation; fall through and finish normally
                } catch (final Exception e) {
                    // fail the primary but fall through and let the rest of operation processing complete
                    final String message = String.format(Locale.ROOT, "primary failed updating local checkpoint for replica %s", shard);
                    primary.failShard(message, e);
                }
                decPendingAndFinishIfNeeded();
            }

            @Override
            public void onFailure(Exception replicaException) {
                logger.trace(() -> new ParameterizedMessage(
                    "[{}] failure while performing [{}] on replica {}, request [{}]",
                    shard.shardId(), opType, shard, replicaRequest), replicaException);
                // Only report "critical" exceptions - TODO: Reach out to the master node to get the latest shard state then report.
                if (TransportActions.isShardNotAvailableException(replicaException) == false) {
                    RestStatus restStatus = ExceptionsHelper.status(replicaException);
                    shardReplicaFailures.add(new ReplicationResponse.ShardInfo.Failure(
                        shard.shardId(), shard.currentNodeId(), replicaException, restStatus, false));
                }
                String message = String.format(Locale.ROOT, "failed to perform %s on replica %s", opType, shard);
                //处理副分片写入失败
                replicasProxy.failShardIfNeeded(shard, message,
                    replicaException, ReplicationOperation.this::decPendingAndFinishIfNeeded,
                    ReplicationOperation.this::onPrimaryDemoted, throwable -> decPendingAndFinishIfNeeded());
            }
        });
    }

2.3 副分片流程

执行与主分片基本相同的流程

2.4 检查点

上述写入完成之后,再返回客户端结果之前,会执行GlobalCheckpointSyncAction全局检查点的操作,该部分模块里边有一段很重要的代码,会影响ES的写入

    @Override
    protected PrimaryResult<Request, ReplicationResponse> shardOperationOnPrimary(
            final Request request, final IndexShard indexShard) throws Exception {
        maybeSyncTranslog(indexShard);
        return new PrimaryResult<>(request, new ReplicationResponse());
    }

上面代码中有个maybeSyncTranslog方法,该方法会判断要不要同步对Translog执行flush操作,也就是把translog中的日志文件纪录的索引数据,写到磁盘中,而且是同步写入,该方法源码如下:

    private void maybeSyncTranslog(final IndexShard indexShard) throws IOException {
        if (indexShard.getTranslogDurability() == Translog.Durability.REQUEST &&
            indexShard.getLastSyncedGlobalCheckpoint() < indexShard.getGlobalCheckpoint()) {
            indexShard.sync();
        }
    }

在ES的默认配置中,为了保证数据的可靠性,每一次对translog的写入,都同步刷到磁盘中,保证数据不丢失。默认配置如下:
index.translog.durability:request

也就是上边代码中的Translog.Durability.REQUEST,但是这种配置写入性能非常的差,所以ES提供了额外的定时异步写入的方式,下边是参考的配置项

index.translog.durability:async

index.translog.sync_interval:120s

至此写入流程分析完毕

3 调优建议

  1. 副分片写入过程需要重新建立索引,所以每个主分片的副分片数量不宜太多
  2. ES的默认配置有许多影响性能的坑在里边,比如refresh刷新频率、translog随请求同步刷磁盘,建议如果降低refresh的刷新频率,并将translog改成异步写入磁盘
  3. 由于ES利用了系统缓存区来临时存放segment数据,所以该部分区域的内存占比,不宜太小
  4. 尽量用批量写入代替单条数据写入
  5. 由于ES对磁盘的利用率很大,有条件的话建议使用固态硬盘
Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/103934
 
246 次点击