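1 Basic Concepts of CCR
What is CCR?
CCR (cross-cluster replication) is a feature introduced in ES 6.5 that replicates data from one cluster to another, remote cluster.
Cross-cluster replication works like a data subscription: the data of one cluster can be subscribed to by several clusters, i.e. it can be replicated to multiple clusters.
CCR has two roles: the Leader, which is the source of the data, and the Follower, which is the subscriber and holds a copy of the data.
CCR works at the index level and uses a pull model: the follower index actively pulls data from the leader index.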
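Why do we need CCR?
CCR addresses the following scenarios:
High availability and disaster recovery: deploy several ES clusters in different regions and keep them in sync in near real time with CCR.
Data locality: serve data from a geographically nearby cluster to speed up access.
Centralized reporting: several clusters replicate their data into one central cluster, where it can be analyzed in one place.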
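2 Using CCR
How is CCR used?
1. Connect the local cluster to the remote cluster
Register the remote cluster by pointing at one of its nodes through the transport port. Here `leader` is the alias given to the remote cluster: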
PUT /_cluster/settings
{
  "persistent": {
    "cluster": {
      "remote": {
        "leader": {
          "seeds": [
            "127.0.0.1:9300"
          ]
        }
      }
    }
  }
}
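The same request can also be sent from application code. Below is a minimal sketch that uses only the JDK's `java.net.http` API (Java 11+). It assumes the local cluster's REST endpoint is `http://localhost:9200` with security disabled, and the class and method names are hypothetical. The code only builds the request; the commented line shows how it could be sent.

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class RemoteClusterSetup {
    // Builds the JSON body of PUT /_cluster/settings that registers a remote
    // cluster under `alias` with a single seed node (host:transport_port).
    public static String remoteClusterSettingsBody(String alias, String seed) {
        return "{\"persistent\":{\"cluster\":{\"remote\":{\""
                + alias + "\":{\"seeds\":[\"" + seed + "\"]}}}}}";
    }

    // Builds (but does not send) the request against the local cluster's REST endpoint.
    public static HttpRequest buildRequest(String localBaseUrl, String alias, String seed) {
        return HttpRequest.newBuilder()
                .uri(URI.create(localBaseUrl + "/_cluster/settings"))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(remoteClusterSettingsBody(alias, seed)))
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = buildRequest("http://localhost:9200", "leader", "127.0.0.1:9300");
        System.out.println(request.method() + " " + request.uri());
        // To send it (requires a running cluster):
        // HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    }
}
```

This setup uses two different ports. The request goes to the local cluster's HTTP port, while the seed points at the remote cluster's transport port.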
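2. Create the leader index on the remote cluster
The leader index is created on the remote cluster like any ordinary index, but it must have soft_deletes enabled. Since 7.x this is enabled by default.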
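Whether soft deletes are on therefore depends on the version the leader index was created with. The sketch below simplifies that rule. It assumes the index setting `index.soft_deletes.enabled` and a default of `true` only for indices created on 7.0 or later; the class and method names are hypothetical.

```java
import java.util.Map;

public class SoftDeletesCheck {
    static final String SOFT_DELETES = "index.soft_deletes.enabled";

    // Simplified model: an explicit index setting wins; otherwise the default is
    // assumed to be true for indices created on 7.0+ and false for 6.x indices.
    public static boolean softDeletesEnabled(Map<String, String> indexSettings, int createdMajorVersion) {
        String explicit = indexSettings.get(SOFT_DELETES);
        if (explicit != null) {
            return Boolean.parseBoolean(explicit);
        }
        return createdMajorVersion >= 7;
    }

    public static void main(String[] args) {
        System.out.println(softDeletesEnabled(Map.of(), 7));
        System.out.println(softDeletesEnabled(Map.of(SOFT_DELETES, "true"), 6));
    }
}
```

On 6.x, this means the leader index has to be created with `"index.soft_deletes.enabled": true` in its settings.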
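3. Create the follower index on the local cluster
A follower index can be created in two ways: manually, or automatically through auto-follow.
Manual creation: specify the remote cluster and the remote (leader) index to replicate: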
PUT /<follower_index>/_ccr/follow?wait_for_active_shards=1
{
  "remote_cluster": "<remote_cluster>",
  "leader_index": "<leader_index>"
}
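The sketch below assembles the same follow request with the JDK HTTP client. The class name, base URL and index names are placeholders, and the request is only built, not sent. Setting `wait_for_active_shards=1` asks the call to wait until the follower's primary shard is active before it returns.

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class FollowRequest {
    // Path of the follow API for a given follower index.
    public static String followPath(String followerIndex, int waitForActiveShards) {
        return "/" + followerIndex + "/_ccr/follow?wait_for_active_shards=" + waitForActiveShards;
    }

    // Body naming the remote cluster alias and the leader index to replicate.
    public static String followBody(String remoteCluster, String leaderIndex) {
        return "{\"remote_cluster\":\"" + remoteCluster + "\",\"leader_index\":\"" + leaderIndex + "\"}";
    }

    // Builds (but does not send) the PUT request against the local cluster.
    public static HttpRequest build(String baseUrl, String followerIndex,
                                    String remoteCluster, String leaderIndex) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + followPath(followerIndex, 1)))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(followBody(remoteCluster, leaderIndex)))
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = build("http://localhost:9200", "logs-follower", "leader", "logs");
        System.out.println(request.method() + " " + request.uri());
    }
}
```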
Auto-follow: define an auto-follow pattern through the auto_follow API:
PUT /_ccr/auto_follow/<auto_follow_pattern_name>
{
  "remote_cluster": "<remote_cluster>",
  "leader_index_patterns": [
    "<leader_index_pattern>"
  ],
  "follow_index_pattern": "<follow_index_pattern>"
}
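Conceptually, an auto-follow pattern is a rule applied to new indices on the remote cluster. Each leader index whose name matches one of `leader_index_patterns` gets a follower named after `follow_index_pattern`, in which the `{{leader_index}}` placeholder stands for the leader index name. The simplified sketch below illustrates only that naming rule. The wildcard handling (only `*`) and the class and method names are assumptions, not Elasticsearch's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

public class AutoFollowSketch {
    // Matches a simple wildcard pattern such as "logs-*"; only '*' is special.
    public static boolean matches(String wildcardPattern, String indexName) {
        String[] parts = wildcardPattern.split("\\*", -1);
        StringBuilder regex = new StringBuilder();
        for (int i = 0; i < parts.length; i++) {
            if (i > 0) {
                regex.append(".*");
            }
            regex.append(Pattern.quote(parts[i]));
        }
        return indexName.matches(regex.toString());
    }

    // Substitutes the leader index name into the follow_index_pattern.
    public static String followerName(String followIndexPattern, String leaderIndex) {
        return followIndexPattern.replace("{{leader_index}}", leaderIndex);
    }

    // Returns the follower index names that would be created for the given remote indices.
    public static List<String> followersFor(List<String> leaderPatterns, String followIndexPattern,
                                            List<String> remoteIndices) {
        List<String> followers = new ArrayList<>();
        for (String index : remoteIndices) {
            for (String pattern : leaderPatterns) {
                if (matches(pattern, index)) {
                    followers.add(followerName(followIndexPattern, index));
                    break;
                }
            }
        }
        return followers;
    }

    public static void main(String[] args) {
        System.out.println(followersFor(List.of("logs-*"), "{{leader_index}}-copy",
                List.of("logs-2024.01", "metrics-1", "logs-2024.02")));
    }
}
```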
CCR usage is not covered in detail here; for more APIs, see the official documentation: Cross-cluster replication APIs.
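3 CCR Source Code Analysis
How is CCR implemented under the hood, and how does it keep the local cluster in sync with the remote cluster in real time?
Two parts are involved: full replication and incremental replication.
CCR is implemented as a plugin whose entry class is Ccr. It implements ActionPlugin (registers actions), PersistentTaskPlugin (registers persistent tasks), EnginePlugin (provides a custom engine) and RepositoryPlugin (provides custom snapshot repositories).
It mainly registers the following kinds of actions:
Internal actions: the core implementation that performs the data replication;
Status actions: retrieve follower and CCR stats;
Follow actions: start, pause and resume following;
Auto-follow actions: handle auto-following.
It also implements FollowingEngine, which is used by follower shards, and CcrRepository, which restores data from snapshots of the remote cluster, as shown below: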
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
    return Arrays.asList(
        new ActionHandler<>(BulkShardOperationsAction.INSTANCE, TransportBulkShardOperationsAction.class),
        new ActionHandler<>(ShardChangesAction.INSTANCE, ShardChangesAction.TransportAction.class),
        new ActionHandler<>(PutInternalCcrRepositoryAction.INSTANCE,
            PutInternalCcrRepositoryAction.TransportPutInternalRepositoryAction.class),
        new ActionHandler<>(DeleteInternalCcrRepositoryAction.INSTANCE,
            DeleteInternalCcrRepositoryAction.TransportDeleteInternalRepositoryAction.class),
        new ActionHandler<>(PutCcrRestoreSessionAction.INSTANCE,
            PutCcrRestoreSessionAction.TransportPutCcrRestoreSessionAction.class),
        new ActionHandler<>(ClearCcrRestoreSessionAction.INSTANCE,
            ClearCcrRestoreSessionAction.TransportDeleteCcrRestoreSessionAction.class),
        new ActionHandler<>(GetCcrRestoreFileChunkAction.INSTANCE,
            GetCcrRestoreFileChunkAction.TransportGetCcrRestoreFileChunkAction.class),
        new ActionHandler<>(FollowStatsAction.INSTANCE, TransportFollowStatsAction.class),
        new ActionHandler<>(CcrStatsAction.INSTANCE, TransportCcrStatsAction.class),
        new ActionHandler<>(FollowInfoAction.INSTANCE, TransportFollowInfoAction.class),
        new ActionHandler<>(PutFollowAction.INSTANCE, TransportPutFollowAction.class),
        new ActionHandler<>(ResumeFollowAction.INSTANCE, TransportResumeFollowAction.class),
        new ActionHandler<>(PauseFollowAction.INSTANCE, TransportPauseFollowAction.class),
        new ActionHandler<>(UnfollowAction.INSTANCE, TransportUnfollowAction.class),
        new ActionHandler<>(DeleteAutoFollowPatternAction.INSTANCE, TransportDeleteAutoFollowPatternAction.class),
        new ActionHandler<>(PutAutoFollowPatternAction.INSTANCE, TransportPutAutoFollowPatternAction.class),
        new ActionHandler<>(GetAutoFollowPatternAction.INSTANCE, TransportGetAutoFollowPatternAction.class),
        new ActionHandler<>(ForgetFollowerAction.INSTANCE, TransportForgetFollowerAction.class));
}

public Optional<EngineFactory> getEngineFactory(final IndexSettings indexSettings) {
    ...
    return Optional.of(new FollowingEngineFactory());
    ...
}

public Map<String, Repository.Factory> getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) {
    Repository.Factory repositoryFactory =
        (metadata) -> new CcrRepository(metadata, client, ccrLicenseChecker, settings, ccrSettings.get(), threadPool.get());
    return Collections.singletonMap(CcrRepository.TYPE, repositoryFactory);
}
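getInternalRepositories follows a simple registry pattern. The plugin contributes a map from repository type to a factory, and the snapshot module later looks up the factory by type and builds the repository instance from its metadata. The sketch below models only that pattern. It uses hypothetical types: a `Repo` interface, a name-only metadata string, and the type key `"ccr-sketch"`. It is not the real `Repository.Factory` interface.

```java
import java.util.Collections;
import java.util.Map;
import java.util.function.Function;

public class RepositoryFactorySketch {
    // Hypothetical, simplified stand-in for a snapshot repository.
    public interface Repo {
        String name();
    }

    // Hypothetical repository type key; not the real CcrRepository.TYPE value.
    public static final String SKETCH_TYPE = "ccr-sketch";

    // Same shape as getInternalRepositories: one type key mapped to a factory
    // that builds a repository from its (here: name-only) metadata.
    public static Map<String, Function<String, Repo>> internalRepositories() {
        Function<String, Repo> factory = repoName -> () -> repoName;
        return Collections.singletonMap(SKETCH_TYPE, factory);
    }

    // Simplified registry lookup: find the factory by type, then create the repository.
    public static Repo create(Map<String, Function<String, Repo>> factories, String type, String repoName) {
        Function<String, Repo> factory = factories.get(type);
        if (factory == null) {
            throw new IllegalArgumentException("unknown repository type [" + type + "]");
        }
        return factory.apply(repoName);
    }

    public static void main(String[] args) {
        Repo repo = create(internalRepositories(), SKETCH_TYPE, "leader-repo");
        System.out.println(repo.name());
    }
}
```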
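Although replication is managed at the index level, the actual copying is done per shard. After the follower index is created, it is automatically given the same mappings and settings as the leader index. Each shard of the follower index then issues requests to pull data from the leader index.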
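Because the follower index copies the leader's settings, it has the same number of primary shards. Replication can therefore be pictured as independent pairs, in which follower shard i pulls from leader shard i. The tiny sketch below shows that pairing; the names are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;

public class ShardPairing {
    // Lists the (leader shard -> follower shard) pairs: one replication unit per shard id.
    public static List<String> pairs(String leaderIndex, String followerIndex, int numberOfShards) {
        List<String> result = new ArrayList<>();
        for (int shardId = 0; shardId < numberOfShards; shardId++) {
            result.add(leaderIndex + "[" + shardId + "] -> " + followerIndex + "[" + shardId + "]");
        }
        return result;
    }

    public static void main(String[] args) {
        pairs("logs", "logs-follower", 3).forEach(System.out::println);
    }
}
```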
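3.1 Full Replication
When a follower index is created, CCR first sets up the repository and snapshot, and then performs a snapshot restore.
Entry point: TransportPutFollowAction#masterOperation
Because TransportPutFollowAction extends TransportMasterNodeAction, the call chain is TransportMasterNodeAction#doExecute -> AsyncSingleAction#start -> AsyncSingleAction#doStart -> TransportPutFollowAction#masterOperation.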
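This call chain is a template-method pattern. The base class owns the fixed entry point and calls the subclass hook once it is running on the master. The sketch below is heavily simplified and every class in it is hypothetical. The real masterOperation is asynchronous and reports through a listener, while the sketch returns a value directly.

```java
public class MasterActionSketch {
    // Hypothetical base class in the spirit of TransportMasterNodeAction:
    // a fixed entry point does the shared plumbing, then calls the subclass hook.
    public abstract static class MasterNodeActionSketch<Req, Resp> {
        public final Resp execute(Req request) {
            // The real AsyncSingleAction also locates the elected master, forwards the
            // request if this node is not the master and retries on cluster changes;
            // none of that is modeled here.
            return masterOperation(request);
        }

        protected abstract Resp masterOperation(Req request);
    }

    // Stand-in for TransportPutFollowAction: it only overrides the master-side hook.
    public static final class PutFollowSketch extends MasterNodeActionSketch<String, String> {
        @Override
        protected String masterOperation(String followerIndex) {
            return "create follower index " + followerIndex;
        }
    }

    public static void main(String[] args) {
        System.out.println(new PutFollowSketch().execute("logs-follower"));
    }
}
```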
It first connects to the remote cluster and fetches the leader index metadata, then creates the follower index:
protected void masterOperation(...) {
    ...
    String remoteCluster = request.getRemoteCluster();
    // validates that the remote cluster has been configured (throws otherwise)
    client.getRemoteClusterClient(remoteCluster);
    String leaderIndex = request.getLeaderIndex();
    ccrLicenseChecker.checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
        client,
        remoteCluster,
        leaderIndex,
        listener::onFailure,
        (historyUUID, leaderIndexMetaData) -> createFollowerIndex(leaderIndexMetaData, request, listener));
}
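The follower index is created as follows:
1. Check that leaderIndexMetaData is not null, then check that soft deletes are enabled on the leader index.
2. Use the snapshot repository created through the internal PutInternalCcrRepositoryRequest. The repository is created along PutInternalCcrRepositoryAction#doExecute -> RepositoriesService#registerInternalRepository -> RepositoriesService#createRepository, which is the snapshot module's own implementation.
3. Build restoreRequest, the snapshot restore request.
4. Submit a task to the SNAPSHOT thread pool that calls restoreSnapshot to restore the snapshot, again using the snapshot module's own implementation.
5. afterRestoreStarted monitors the whole restore and returns success once all shards have been restored.
How the snapshot module creates repositories and restores snapshots is not analyzed here; that will be covered in a later post.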
private void createFollowerIndex(...) {
    ...
    // one internal CCR repository per remote cluster
    final String leaderClusterRepoName = CcrRepository.NAME_PREFIX + request.getRemoteCluster();
    // restore the latest "snapshot" of the leader index, renaming it to the follower index
    final RestoreSnapshotRequest restoreRequest = new RestoreSnapshotRequest(leaderClusterRepoName, CcrRepository.LATEST)
        .indices(request.getLeaderIndex())
        .indicesOptions(request.indicesOptions())
        .renamePattern("^(.*)$")
        .renameReplacement(request.getFollowerIndex())
        .masterNodeTimeout(request.masterNodeTimeout())
        .indexSettings(settingsBuilder);
    final Client clientWithHeaders = CcrLicenseChecker.wrapClient(this.client, threadPool.getThreadContext().getHeaders());
    // run the restore on the SNAPSHOT thread pool
    threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new AbstractRunnable() {
        @Override
        public void onFailure(Exception e) {
            listener.onFailure(e);
        }

        @Override
        protected void doRun() throws Exception {
            restoreService.restoreSnapshot(restoreRequest, new ActionListener<RestoreService.RestoreCompletionResponse>() {
                @Override
                public void onResponse(RestoreService.RestoreCompletionResponse response) {
                    // restore has started; wait for all shards to finish
                    afterRestoreStarted(clientWithHeaders, request, listener, response);
                }

                @Override
                public void onFailure(Exception e) {
                    listener.onFailure(e);
                }
            });
        }
    });
}
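Two details of createFollowerIndex can be tried in isolation: the leader checks from step 1, and the effect of `renamePattern("^(.*)$")`. The pattern matches the whole leader index name once, so the restored index simply takes the follower index name. The sketch below is a simplified stand-in. It assumes the leader's effective settings are available as a map (null meaning "not found"), and its class and method names are hypothetical.

```java
import java.util.Map;
import java.util.regex.Matcher;

public class FollowerRestoreSketch {
    // Simplified versions of the two checks: metadata present, soft deletes enabled.
    // Assumes `leaderSettings` holds the effective settings (null = index not found).
    public static void validateLeader(Map<String, String> leaderSettings) {
        if (leaderSettings == null) {
            throw new IllegalArgumentException("leader index metadata not found");
        }
        if (!Boolean.parseBoolean(leaderSettings.get("index.soft_deletes.enabled"))) {
            throw new IllegalArgumentException("leader index does not have soft deletes enabled");
        }
    }

    // "^(.*)$" matches the entire name once, so the result is just the follower name.
    // quoteReplacement keeps a '$' in the follower name from being read as a group reference.
    public static String restoredIndexName(String leaderIndex, String followerIndex) {
        return leaderIndex.replaceAll("^(.*)$", Matcher.quoteReplacement(followerIndex));
    }

    public static void main(String[] args) {
        validateLeader(Map.of("index.soft_deletes.enabled", "true"));
        System.out.println(restoredIndexName("logs", "logs-follower"));
    }
}
```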
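3.2 Incremental Replication
As mentioned above, replication happens at the shard level, and each shard is replicated by its own ShardFollowTask, run by the ShardFollowTasksExecutor that the CCR plugin registers. When the follower receives the responses to its read requests, it first places the returned operations in a write buffer. The ShardFollowTask manages this buffer and submits the buffered writes in batches to the follower shard.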
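The buffer-then-batch idea can be sketched in a few lines. Operations are represented only by their seq_no and kept in seq_no order, because responses to concurrent reads may arrive out of order; that ordering is an assumption of the sketch. The batch size plays the role of the `max_write_request_operation_count` follow parameter. The real task also tracks byte sizes, outstanding writes and the actual operations, none of which is modeled here. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

public class WriteBufferSketch {
    // Buffered operations, represented only by their seq_no, kept in seq_no order.
    private final PriorityQueue<Long> buffer = new PriorityQueue<>();
    private final int maxWriteBatchSize;
    private final List<List<Long>> submittedBatches = new ArrayList<>();

    public WriteBufferSketch(int maxWriteBatchSize) {
        this.maxWriteBatchSize = maxWriteBatchSize;
    }

    // A read response adds its operations to the write buffer.
    public void onReadResponse(List<Long> seqNos) {
        buffer.addAll(seqNos);
    }

    // Drains the buffer in batches of at most maxWriteBatchSize; each batch stands in
    // for one bulk write to the follower shard (only recorded here).
    public void flush() {
        while (!buffer.isEmpty()) {
            List<Long> batch = new ArrayList<>();
            while (!buffer.isEmpty() && batch.size() < maxWriteBatchSize) {
                batch.add(buffer.poll());
            }
            submittedBatches.add(batch);
        }
    }

    public List<List<Long>> submittedBatches() {
        return submittedBatches;
    }

    public static void main(String[] args) {
        WriteBufferSketch writer = new WriteBufferSketch(2);
        writer.onReadResponse(List.of(3L, 4L));
        writer.onReadResponse(List.of(0L, 1L, 2L));
        writer.flush();
        System.out.println(writer.submittedBatches());
    }
}
```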
Entry point: ShardFollowTasksExecutor#nodeOperation
On each node that runs a task, execution enters ShardFollowNodeTask#start. This method first updates the follower index's mapping and settings so that their versions match those of the leader index:
updateMapping(0L, leaderMappingVersion -> {
    ...
    updateSettings(leaderSettingsVersion -> {
        synchronized (ShardFollowNodeTask.this) {
            currentSettingsVersion = leaderSettingsVersion;
        }
        ...
        coordinateReads();
    });
});
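The nested callbacks enforce a strict order: mapping first, then settings, and only then reads. The sketch below reproduces only that ordering. Its updateMapping and updateSettings are hypothetical stand-ins that call back immediately with made-up version numbers, and recording the mapping version in the first callback is an assumption, since that part is elided in the excerpt.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

public class StartSequenceSketch {
    public final List<String> log = new ArrayList<>();
    private long currentMappingVersion;
    private long currentSettingsVersion;

    // Hypothetical stand-in for syncing the leader mapping; calls back at once with version 3.
    void updateMapping(long minRequiredMappingVersion, LongConsumer handler) {
        log.add("updateMapping");
        handler.accept(3L);
    }

    // Hypothetical stand-in for syncing the leader settings; calls back at once with version 5.
    void updateSettings(LongConsumer handler) {
        log.add("updateSettings");
        handler.accept(5L);
    }

    void coordinateReads() {
        log.add("coordinateReads");
    }

    // Same nesting as ShardFollowNodeTask#start: mapping, then settings, then reads.
    public void start() {
        updateMapping(0L, leaderMappingVersion -> {
            synchronized (this) {
                currentMappingVersion = leaderMappingVersion; // assumed; elided in the excerpt
            }
            updateSettings(leaderSettingsVersion -> {
                synchronized (this) {
                    currentSettingsVersion = leaderSettingsVersion;
                }
                coordinateReads();
            });
        });
    }

    public synchronized long currentMappingVersion() {
        return currentMappingVersion;
    }

    public synchronized long currentSettingsVersion() {
        return currentSettingsVersion;
    }

    public static void main(String[] args) {
        StartSequenceSketch task = new StartSequenceSketch();
        task.start();
        System.out.println(task.log);
    }
}
```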
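It then calls coordinateReads. This method determines the seq_no range of each read request from the last requested seq_no and the leader's global checkpoint, and sends the request through sendShardChangesRequest.
hasReadBudget checks whether there is still capacity for reads: 1. whether the maximum number of concurrent outstanding reads has been reached; 2. whether the buffer size limit has been exceeded; 3. whether the buffer count limit has been exceeded.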
synchronized void coordinateReads() {
    final int maxReadRequestOperationCount = params.getMaxReadRequestOperationCount();
    while (hasReadBudget() && lastRequestedSeqNo < leaderGlobalCheckpoint) {
        final long from = lastRequestedSeqNo + 1;
        final long maxRequiredSeqNo = Math.min(leaderGlobalCheckpoint, from + maxReadRequestOperationCount - 1);
        final int requestOpCount;
        if (numOutstandingReads == 0) {
            requestOpCount = maxReadRequestOperationCount;
        } else {
            requestOpCount = Math.toIntExact(maxRequiredSeqNo - from + 1);
        }
        numOutstandingReads++;
        sendShardChangesRequest(from, requestOpCount, maxRequiredSeqNo);
        lastRequestedSeqNo = maxRequiredSeqNo;
    }
    if (numOutstandingReads == 0 && hasReadBudget()) {
        numOutstandingReads++;
        long from = lastRequestedSeqNo + 1;
        sendShardChangesRequest(from, maxReadRequestOperationCount, lastRequestedSeqNo);
    }
}
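To see how coordinateReads splits a range, the loop can be run standalone. The sketch below keeps the method's control flow but records requests instead of sending them. Its hasReadBudget models only the concurrency limit (the buffer size and count limits are omitted), and the class and constructor parameters are hypothetical. When the follower has caught up, the final branch still sends one request that starts past the global checkpoint. This presumably lets the leader hold it until new operations arrive (compare the `read_poll_timeout` follow parameter).

```java
import java.util.ArrayList;
import java.util.List;

public class CoordinateReadsSketch {
    // A recorded read request: up to `count` operations starting at `from`,
    // which must cover at least up to `maxRequiredSeqNo`.
    public static final class ReadRequest {
        public final long from;
        public final int count;
        public final long maxRequiredSeqNo;

        ReadRequest(long from, int count, long maxRequiredSeqNo) {
            this.from = from;
            this.count = count;
            this.maxRequiredSeqNo = maxRequiredSeqNo;
        }

        @Override
        public String toString() {
            return "from=" + from + " count=" + count + " maxRequiredSeqNo=" + maxRequiredSeqNo;
        }
    }

    private final int maxReadRequestOperationCount;
    private final int maxOutstandingReadRequests;
    private final long leaderGlobalCheckpoint;
    private long lastRequestedSeqNo;
    private int numOutstandingReads = 0;
    public final List<ReadRequest> sent = new ArrayList<>();

    public CoordinateReadsSketch(int maxReadRequestOperationCount, int maxOutstandingReadRequests,
                                 long lastRequestedSeqNo, long leaderGlobalCheckpoint) {
        this.maxReadRequestOperationCount = maxReadRequestOperationCount;
        this.maxOutstandingReadRequests = maxOutstandingReadRequests;
        this.lastRequestedSeqNo = lastRequestedSeqNo;
        this.leaderGlobalCheckpoint = leaderGlobalCheckpoint;
    }

    // Simplified budget: only the concurrency limit; buffer size/count limits are omitted.
    private boolean hasReadBudget() {
        return numOutstandingReads < maxOutstandingReadRequests;
    }

    // Records the request instead of sending it to the leader shard.
    private void sendShardChangesRequest(long from, int count, long maxRequiredSeqNo) {
        sent.add(new ReadRequest(from, count, maxRequiredSeqNo));
    }

    // Same control flow as the coordinateReads method shown above.
    public synchronized void coordinateReads() {
        final int maxCount = maxReadRequestOperationCount;
        while (hasReadBudget() && lastRequestedSeqNo < leaderGlobalCheckpoint) {
            final long from = lastRequestedSeqNo + 1;
            final long maxRequiredSeqNo = Math.min(leaderGlobalCheckpoint, from + maxCount - 1);
            final int requestOpCount = numOutstandingReads == 0
                    ? maxCount                                       // no read in flight: full batch
                    : Math.toIntExact(maxRequiredSeqNo - from + 1);  // otherwise only the known range
            numOutstandingReads++;
            sendShardChangesRequest(from, requestOpCount, maxRequiredSeqNo);
            lastRequestedSeqNo = maxRequiredSeqNo;
        }
        // Caught up with nothing in flight: keep one read outstanding for new operations.
        if (numOutstandingReads == 0 && hasReadBudget()) {
            numOutstandingReads++;
            sendShardChangesRequest(lastRequestedSeqNo + 1, maxCount, lastRequestedSeqNo);
        }
    }

    public static void main(String[] args) {
        CoordinateReadsSketch task = new CoordinateReadsSketch(4, 10, -1, 10);
        task.coordinateReads();
        task.sent.forEach(System.out::println);
    }
}
```

With the parameters in `main` (4 operations per request, up to 10 concurrent reads, nothing requested yet, and a leader global checkpoint of 10), the sketch issues three requests covering seq_no 0-3, 4-7 and 8-10. The last request asks for only 3 operations because it is not the first one in flight.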
Continued in the next post:
[Elasticsearch Source Code] CCR Source Code Analysis (Part 2)