Py学习  »  Elasticsearch

【Elasticsearch源码】CCR源码分析(一)

少加点香菜 • 4 年前 • 195 次点击  

1 CCR的基本概念

什么是CCR?

CCR( cross-cluster replication):跨集群复制是ES 6.5发布的一个新的特性:可以将两个集群中的数据进行远程复制。

集群复制类似于数据订阅的方式,一个集群的数据可以被多个集群订阅,也就是可以被复制到多个集群上面去。

CCR 有两个角色,一个是 Leader,表示数据的源头,另外一个Follower,表示数据的订阅方,得到的是数据副本。
CCR 工作在索引层面,使用 Pull 的模式,Follower 索引主动的去 Pull Leader 的数据。

为什么需要CCR?

CCR可以解决下面的几个场景的问题:

  1. 集群高可用以及灾难恢复,在不同的地域部署多套ES集群,通过CCR实时同步数据。
  2. 实现数据的就近访问(地理),数据的就近访问,提升访问速度。
  3. 集中式的报告集群,通过多个备集群的处理,主集群可以进行分析处理。

2 CCR的使用

CCR是怎么使用的呢?

1. 本地集群连接远程集群

通过远程集群的节点进行连接,使用transport端口,命令如下:

PUT /_cluster/settings
{
  "persistent" : {
    "cluster" : {
      "remote" : {
        "leader" : {
          "seeds" : [
            "127.0.0.1:9300" 
          ]
        }
      }
    }
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

2. 在远程集群创建leader索引

远程集群创建leader索引,和正常创建索引操作一样,需要开启soft_deletes,不过在7.x版本之后,已经默认开启了。

3. 在本地集群创建follower索引

创建follower索引有两种方式:手动创建和自动跟随方式。

手动创建:需要指定远程集群和复制的远程索引

PUT /<follower_index>/_ccr/follow?wait_for_active_shards=1
{
  "remote_cluster" : "<remote_cluster>",
  "leader_index" : "<leader_index>"
}
  • 1
  • 2
  • 3
  • 4
  • 5

自动跟随创建:通过auto_follow API建立自动跟随的模板

PUT /_ccr/auto_follow/<auto_follow_pattern_name>
{
  "remote_cluster" : "<remote_cluster>",
  "leader_index_patterns" :
  [
    "<leader_index_pattern>"
  ],
  "follow_index_pattern" : "<follow_index_pattern>"
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

这里不详细介绍CCR的使用,更多API请参考官网链接: Cross-cluster replication APIs

3 CCR的源码分析

CCR的底层是怎么实现的呢?如何将远程集群的数据实时同步到本地集群呢?

主要涉及两部分:全量复制和增量复制

CCR是以plugins插件的形式:入口类在Ccr,继承了ActionPlugin(注册action),PersistentTaskPlugin(注册持久性任务),EnginePlugin(engine引擎接口),RepositoryPlugin(自定义快照接口)。
在这里插入图片描述
主要注册了下面几类action。
内部请求action: 内部的核心实现,用于处理数据的同步复制;状态请求action:用于获取follow和ccr的状态;follow actions:开启,暂停以及恢复follow的处理;auto-follow actions:自动跟随follow的处理action。
而且实现了FollowingEngine用于follow shard,实现了CcrRepository依靠远程集群快照用于还原数据。如下所示:

    public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
        return Arrays.asList(
                // internal actions
                new ActionHandler<>(BulkShardOperationsAction.INSTANCE, TransportBulkShardOperationsAction.class),
                new ActionHandler<>(ShardChangesAction.INSTANCE, ShardChangesAction.


    
TransportAction.class),
                new ActionHandler<>(PutInternalCcrRepositoryAction.INSTANCE,
                    PutInternalCcrRepositoryAction.TransportPutInternalRepositoryAction.class),
                new ActionHandler<>(DeleteInternalCcrRepositoryAction.INSTANCE,
                    DeleteInternalCcrRepositoryAction.TransportDeleteInternalRepositoryAction.class),
                new ActionHandler<>(PutCcrRestoreSessionAction.INSTANCE,
                    PutCcrRestoreSessionAction.TransportPutCcrRestoreSessionAction.class),
                new ActionHandler<>(ClearCcrRestoreSessionAction.INSTANCE,
                    ClearCcrRestoreSessionAction.TransportDeleteCcrRestoreSessionAction.class),
                new ActionHandler<>(GetCcrRestoreFileChunkAction.INSTANCE,
                    GetCcrRestoreFileChunkAction.TransportGetCcrRestoreFileChunkAction.class),
                // stats action
                new ActionHandler<>(FollowStatsAction.INSTANCE, TransportFollowStatsAction.class),
                new ActionHandler<>(CcrStatsAction.INSTANCE, TransportCcrStatsAction.class),
                new ActionHandler<>(FollowInfoAction.INSTANCE, TransportFollowInfoAction.class),
                // follow actions
                new ActionHandler<>(PutFollowAction.INSTANCE, TransportPutFollowAction.class),
                new ActionHandler<>(ResumeFollowAction.INSTANCE, TransportResumeFollowAction.class),
                new ActionHandler<>(PauseFollowAction.INSTANCE, TransportPauseFollowAction.class),
                new ActionHandler<>(UnfollowAction.INSTANCE, TransportUnfollowAction.class),
                // auto-follow actions
                new ActionHandler<>(DeleteAutoFollowPatternAction.INSTANCE, TransportDeleteAutoFollowPatternAction.class),
                new ActionHandler<>(PutAutoFollowPatternAction.INSTANCE, TransportPutAutoFollowPatternAction.class),
                new ActionHandler<>(GetAutoFollowPatternAction.INSTANCE, TransportGetAutoFollowPatternAction.class),
                // forget follower action
                new ActionHandler<>(ForgetFollowerAction.INSTANCE, TransportForgetFollowerAction.class));
    }
    public Optional<EngineFactory> getEngineFactory(final IndexSettings indexSettings) {
        ...


    
..
            return Optional.of(new FollowingEngineFactory());
        .....
    }
    public Map<String, Repository.Factory> getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) {
        Repository.Factory repositoryFactory =
            (metadata) -> new CcrRepository(metadata, client, ccrLicenseChecker, settings, ccrSettings.get(), threadPool.get());
        return Collections.singletonMap(CcrRepository.TYPE, repositoryFactory);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41

虽然整个复制是在索引层面进行管理的,但实际复制是在shard级别进行的。创建完follower索引之后,会自动配置和leader索引相同的mapping以及settings。然后follower索引中的shard发起request从leader索引中拉取数据。

3.1 全量复制

当创建follower索引时,会先构造快照和仓库,然后进行快照恢复。

代码入口:TransportPutFollowAction#masterOperation
因为TransportPutFollowAction继承了TransportMasterNodeAction,所以TransportMasterNodeAction#doExecute->AsyncSingleAction#start->AsyncSingleAction#doStart->TransportPutFollowAction#masterOperation

先连接远程集群,获取leader index,然后创建follower index,代码如下:

    protected void masterOperation(....) {
        .....
        // 连接远程集群
        String remoteCluster = request.getRemoteCluster();
        client.getRemoteClusterClient(remoteCluster);
		// 获取远程集群的leaderIndex 
        String leaderIndex = request.getLeaderIndex();
        // 创建follower index
        ccrLicenseChecker.checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
            client,
            remoteCluster,
            leaderIndex,
            listener::onFailure,
            (historyUUID, leaderIndexMetaData) -> createFollowerIndex(leaderIndexMetaData, request, listener)); 
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

创建follower index的过程如下:

  1. 先检查leaderIndexMetaData 是否为空,再检查是否开启了soft deletes。
  2. 根据内部PutInternalCcrRepositoryRequest创建好的快照仓库,快照仓库创建的逻辑在PutInternalCcrRepositoryAction#doExecute->RepositoriesService#registerInternalRepository->RepositoriesService#createRepository,调用的是snapshot模块自身的实现。
  3. 构建restoreRequest,快照恢复request。
  4. 在线程池中启动snapshot线程,进入restoreSnapshot方法进行快照恢复,调用的是snapshot模块自身的实现。
  5. afterRestoreStarted方法监听并处理整个快照恢复过程,当所有的shard都恢复成功之后,返回成功。

这里不深入分析snapshot模块是如何创建快照仓库和恢复快照的,这部分实现原理后续再更新。

    private void createFollowerIndex(.....) {
        .....// 参数检查
        // 创建好的快照仓库
        final String leaderClusterRepoName = CcrRepository.NAME_PREFIX + request.getRemoteCluster();
        // 构建restoreRequest 快照恢复request
        final RestoreSnapshotRequest restoreRequest = new RestoreSnapshotRequest(leaderClusterRepoName, CcrRepository.LATEST).indices(request.getLeaderIndex()).indicesOptions(request.indicesOptions()).renamePattern("^(.*)$").renameReplacement(request.getFollowerIndex()).masterNodeTimeout(request.masterNodeTimeout()).indexSettings(settingsBuilder);

        final Client clientWithHeaders = CcrLicenseChecker.wrapClient(this.client, threadPool.getThreadContext().getHeaders());
        // 在线程池中执行snapshot线程请求
        threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new AbstractRunnable() {
            @Override
            public void onFailure(Exception e) {
                listener.onFailure(e);
            }
            @Override
            protected void doRun() throws Exception {
                // 执行快照恢复
                restoreService.restoreSnapshot(restoreRequest, new ActionListener<RestoreService.RestoreCompletionResponse>() {
                    @Override
                    public void onResponse(RestoreService.RestoreCompletionResponse response) {
                        // 开始恢复之后进行监听
                        afterRestoreStarted(clientWithHeaders, request, listener, response);
                    }
                    @Override
                    public void onFailure(Exception e) {
                        listener.onFailure(e);
                    }
                });
            }
        });
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

3.2 增量复制

前面讲过复制是在shard级别进行的,每个shard的复制都有对应的shardFollowTask,CCR插件注册了ShardFollowTasksExecutor执行器,follower接收到read request之后,会先将它们放置在写缓存区,ShardFollowTask管理器管理缓存区,并将缓存区的写请求批量提交给followe shard进行数据写入。

代码入口:ShardFollowTasksExecutor#nodeOperation

在每个执行node上执行的操作,进入ShardFollowNodeTask#start,更新follower索引的mapping和settings,确保和leader索引的mapping和settings版本一致。

    updateMapping(0L, leaderMappingVersion -> {
            ......
            updateSettings(leaderSettingsVersion -> {
                synchronized (ShardFollowNodeTask.this) {
                    currentSettingsVersion = leaderSettingsVersion;
                }
                ......
                coordinateReads();
            });
        });
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

然后执行调用coordinateReads方法,根据seq_no和GlobalCheckpoint判断read request的范围,并通过sendShardChangesRequest方法发送该read request。
hasReadBudget方法主要判断read容量是否已满:1.是否已经超过了并发read的最大数量;2.是否已经超过buffer size的限制;3.是否已经超过了buffer count的限制。

    synchronized void coordinateReads() {
        final int maxReadRequestOperationCount = params.getMaxReadRequestOperationCount();
        // 当read容量未满且最新的seq_no小于leader的GlobalCheckpoint(即最新操作),则有数据需要更新
        while (hasReadBudget() && lastRequestedSeqNo < leaderGlobalCheckpoint) {
            final long from = lastRequestedSeqNo + 1;
            final long maxRequiredSeqNo = Math.min(leaderGlobalCheckpoint, from + maxReadRequestOperationCount - 1);
            final int requestOpCount;
            if (numOutstandingReads == 0) {
                requestOpCount = maxReadRequestOperationCount;
            } else {
                requestOpCount = Math.toIntExact(maxRequiredSeqNo - from + 1);
            }
            numOutstandingReads++;
            sendShardChangesRequest(from, requestOpCount, maxRequiredSeqNo);
            lastRequestedSeqNo = maxRequiredSeqNo;
        }
        if (numOutstandingReads == 0 && hasReadBudget()) {
            numOutstandingReads++;
            long from = lastRequestedSeqNo + 1;
            sendShardChangesRequest(from, maxReadRequestOperationCount, lastRequestedSeqNo);
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

接下一篇: 【Elasticsearch源码】CCR源码分析(二)

Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/53497
 
195 次点击