Elasticsearch进阶教程
集群搭建
名词解释
- Cluster
代表一个集群,集群中有多个节点,其中有一个为主节点,这个主节点是可以通过选举产生的,主从节点是对于集群内部来说的。es的一个概念就是去中心化,字面上理解就是无中心节点,这是对于集群外部来说的,因为从外部来看es集群,在逻辑上是个整体,你与任何一个节点的通信和与整个es集群通信是等价的
- Shards
代表索引分片,es可以把一个完整的索引分成多个分片,这样的好处是可以把一个大的索引拆分成多个,分布到不同的节点上。构成分布式搜索
分片的数量只能在索引创建前指定,并且索引创建后不能更改
- Replicas
代表索引副本,es可以设置多个索引的副本,副本的作用一是提高系统的容错性,当某个节点某个分片损坏或丢失时可以从副本中恢复。二是提高es的查询效率,es会自动对搜索请求进行负载均衡
- Recovery
代表数据恢复或叫数据重新分布,es在有节点加入或退出时会根据机器的负载对索引分片进行重新分配,挂掉的节点重新启动时也会进行数据恢复
遵循原则
- 至少三个节点
保证可用性
- 每个索引至少有一个副本(Replicas)在所有的分片(Shard)
保证数据容错
- 至少有三个节点允许被选举(Voting)为主节点
保证节点容错,单个主节点且没有可用的备选主节点,ElasticSearch就不能提供服务了
- 客户端可以向所有节点提交请求,或者提供负载均衡(Load Banance)
客户端负载均衡,或者提供反向代理。平衡节点之前的请求负载。
在RestClient中提供轮询机制, 与阻塞与非阻塞形两种形式的请求,具体可以参考
RestClient
源码下面节选阻塞(同步)请求的部分代码
/** * Sends a request to the Elasticsearch cluster that the client points to. * 向客户端配置的集群发送请求 * * Blocks until the request is completed and returns its response or fails * by throwing an exception. * 阻塞式的,直到请求响应或者失败并抛出异常 * * Selects a host out of the provided ones in a * round-robin fashion. Failing hosts are marked dead and retried after a * certain amount of time (minimum 1 minute, maximum 30 minutes), depending * on how many times they previously failed (the more failures, the later * they will be retried). * 通过轮询向配置的集群地址发送请求,如果失败将会标记,然后在一定时间内重试(1-30分钟不等) * 时间的长短具体取决于失败的次数(失败越多时间越长) * * In case of failures all of the alive nodes (or * dead nodes that deserve a retry) are retried until one responds or none * of them does, in which case an {@link IOException} will be thrown. * 如果发生故障,将重试所有节点(包括标记失败的节点),当所有节点都失败时抛出 IOException * * This method works by performing an asynchronous call and waiting * for the result. If the asynchronous call throws an exception we wrap * it and rethrow it so that the stack trace attached to the exception * contains the call site. While we attempt to preserve the original * exception this isn't always possible and likely haven't covered all of * the cases. You can get the original exception from {@link Exception#getCause()}. * 方法通过异步调用并等待结果,如果发生异常则会对异常进行包装,以便追踪 * 原始异常信息可以公国getCause()获取 */ private Response performRequest(final NodeTuple<Iterator<Node>> nodeTuple, final InternalRequest request, Exception previousException) throws IOException { // next()获取下一个节点,并构建上下文 RequestContext context = request.createContextForNextAttempt(nodeTuple.nodes.next(), nodeTuple.authCache); HttpResponse httpResponse; try { // 发送请求 httpResponse = client.execute(context.requestProducer, context.asyncResponseConsumer, context.context, null).get(); } catch(Exception e) { // 如果发生异常记录日志 RequestLogger.logFailedRequest(logger, request.httpRequest, context.node, e); // 标记失败的请求的节点 onFailure(context.node); Exception cause = extractAndWrapCause(e); addSuppressedException(previousException, cause); // 如果有可用节点,尝试重试 if (nodeTuple.nodes.hasNext()) { return performRequest(nodeTuple, request, cause); } if (cause instanceof IOException) { throw (IOException) cause; } if (cause instanceof RuntimeException) { throw (RuntimeException) cause; } throw new IllegalStateException("unexpected exception type: must be either RuntimeException or IOException", cause); } ResponseOrResponseException responseOrResponseException = convertResponse(request, context.node, httpResponse); if (responseOrResponseException.responseException == null) { return responseOrResponseException.response; } addSuppressedException(previousException, responseOrResponseException.responseException); if (nodeTuple.nodes.hasNext()) { return performRequest(nodeTuple, request, responseOrResponseException.responseException); } throw responseOrResponseException.responseException; } 复制代码
相关配置
配置文件路径位于 {es安装路径}/config/elasticsearch.yml
注意点
- discovery.zen.ping.unicast.hosts 需要保证所有节点一致
- cluster.name 保证所有节点一致
- node.name 每个节点的名称不能一样
cluster.name: elasticsearch
# 配置es的集群名称,默认是elasticsearch,es会自动发现在同一网段下的es,如果在同一网段下有多个集群,就可以用这个属性来区分不同的集群。
node.name: "es1"
# 节点名
node.master: true
# 指定该节点是否有资格被选举成为node,默认是true,es是默认集群中的第一台机器为master,如果这台机挂了就会重新选举master。
network.host: 192.168.0.1
# 真实IP,保证绑定的IP其他节点可访问
transport.tcp.port: 9300
# 设置节点间交互的tcp端口,默认是9300。节点通过tcp协议交互
transport.tcp.compress: true
# 设置是否压缩tcp传输时的数据,默认为false,不压缩。
http.port: 9200
# 设置对外服务的http端口,默认为9200。
http.enabled: false
# 是否使用http协议对外提供服务,默认为true,开启。不使用RestApi可以关闭
discovery.zen.ping.unicast.hosts: ["host1", "host2:port", "host3[portX-portY]"]
# 设置集群中master节点的初始列表,可以通过这些节点来自动发现新加入集群的节点。
复制代码
配置文件详解
cluster.name: elasticsearch
# 配置es的集群名称,默认是elasticsearch,es会自动发现在同一网段下的es,如果在同一网段下有多个集群,就可以用这个属性来区分不同的集群。
node.name: "Franz Kafka"
# 节点名
node.master: true
# 指定该节点是否有资格被选举成为node,默认是true,es是默认集群中的第一台机器为master,如果这台机挂了就会重新选举master。
node.data: true
# 指定该节点是否存储索引数据,默认为true。
index.number_of_shards: 5
# 设置默认索引分片个数,默认为5片。
index.number_of_replicas: 1
# 设置默认索引副本个数,默认为1个副本。
path.conf: /path/to/conf
# 设置配置文件的存储路径,默认是es根目录下的config文件夹。
path.data: /path/to/data
# 设置索引数据的存储路径,默认是es根目录下的data文件夹,可以设置多个存储路径,用逗号隔开,例:
# path.data: /path/to/data1,/path/to/data2
path.work: /path/to/work
# 设置临时文件的存储路径,默认是es根目录下的work文件夹。
path.logs: /path/to/logs
# 设置日志文件的存储路径,默认是es根目录下的logs文件夹
path.plugins: /path/to/plugins
# 设置插件的存放路径,默认是es根目录下的plugins文件夹
bootstrap.mlockall: true
# 设置为true来锁住内存。因为当jvm开始swapping时es的效率 会降低,所以要保证它不swap,可以把ES_MIN_MEM和ES_MAX_MEM两个环境变量设置成同一个值,并且保证机器有足够的内存分配给es。 同时也要允许elasticsearch的进程可以锁住内存,linux下可以通过`ulimit -l unlimited`命令。
network.bind_host: 0.0.0.0
# 设置绑定的ip地址,可以是ipv4或ipv6的,默认为0.0.0.0。
network.publish_host: 192.168.0.1
# 设置其它节点和该节点交互的ip地址,如果不设置它会自动判断,值必须是个真实的ip地址。
network.host: 192.168.0.1
# 这个参数是用来同时设置bind_host和publish_host上面两个参数。
transport.tcp.port: 9300
# 设置节点间交互的tcp端口,默认是9300。
transport.tcp.compress: true
# 设置是否压缩tcp传输时的数据,默认为false,不压缩。
http.port: 9200
# 设置对外服务的http端口,默认为9200。
http.max_content_length: 100mb
# 设置内容的最大容量,默认100mb
http.enabled: false
# 是否使用http协议对外提供服务,默认为true,开启。
gateway.type: local
# gateway的类型,默认为local即为本地文件系统,可以设置为本地文件系统,分布式文件系统,hadoop的HDFS,和amazon的s3服务器等等
gateway.recover_after_nodes: 1
# 设置集群中N个节点启动时进行数据恢复,默认为1。
gateway.recover_after_time: 5m
# 设置初始化数据恢复进程的超时时间,默认是5分钟。
gateway.expected_nodes: 2
# 设置这个集群中节点的数量,默认为2,一旦这N个节点启动,就会立即进行数据恢复。
cluster.routing.allocation.node_initial_primaries_recoveries: 4
# 初始化数据恢复时,并发恢复线程的个数,默认为4。
cluster.routing.allocation.node_concurrent_recoveries: 2
# 添加删除节点或负载均衡时并发恢复线程的个数,默认为4。
indices.recovery.max_size_per_sec: 0
# 设置数据恢复时限制的带宽,如入100mb,默认为0,即无限制。
indices.recovery.concurrent_streams: 5
# 设置这个参数来限制从其它分片恢复数据时最大同时打开并发流的个数,默认为5。
discovery.zen.minimum_master_nodes: 1
# 设置这个参数来保证集群中的节点可以知道其它N个有master资格的节点。默认为1,对于大的集群来说,可以设置大一点的值(2-4)
discovery.zen.ping.timeout: 3s
# 设置集群中自动发现其它节点时ping连接超时时间,默认为3秒,对于比较差的网络环境可以高点的值来防止自动发现时出错。
discovery.zen.ping.multicast.enabled: true
# 设置是否打开多播发现节点,默认是true。
discovery.zen.ping.unicast.hosts: ["host1", "host2:port", "host3[portX-portY]"]
# 设置集群中master节点的初始列表,可以通过这些节点来自动发现新加入集群的节点。
http.cors.enabled: true
http.cors.allow-origin: "*"
http.cors.allow-headers: Authorization
# 以上三个为跨域支持
xpack.security.enabled: true
xpack.security.transport.ssl.enabled: true
# 开启安全认证
复制代码
安全设置
在6.8.0之前的版本安全功能做为收费功能提供,之后的版本做为默认功能免费开放
ElasticSearch支持多种认证方式,并且支持安全审计日志,IP限制等
下面介绍最常用的用户密码认证,具体步骤为linux环境,windows环境可参考
具体详细可以参考官方文档的 集群安全 章节
- 进入ElasticSearch安装目录
$ cd {安装目录}
复制代码
- 编辑配置文件
$ vi config/elasticsearch.yml
复制代码
- 增加配置安全配置
xpack.security.enabled: true
xpack.security.transport.ssl.enabled: true
复制代码
- 重启服务
$ ps -ef|grep elastic
$ kill xxxx
复制代码
如果启动参数配置了pid记录文件,可直接使用
$ bin/elasticsearch -d -p pid
$ kill `cat pid`
复制代码
- 设置密码 根据提示依次输入即可
$ bin/elasticsearch-setup-passwords interactive
复制代码
- 修改RestClient配置 为
HttpAsyncClients
提供CredentialsProvider
/**
* ElasticSearch配置类
* 引入对应依赖,且上下文没有配置RestClientBuilder时触发
*
* @author shadow
* @version 0.0.1
* @date 2020-01-13
* @since 0.0.1
*/
@Configuration
@ConditionalOnClass(RestClientBuilder.class)
@ConditionalOnMissingBean(RestClientBuilder.class)
public class ElasticSearchAutoConfiguration {
@Autowired
private
RestClientProperties properties;
/**
* RestClientBuilder
*
* @return {@link RestClientBuilder}
*/
@Bean
public RestClientBuilder restClientBuilder() {
List<String> urls = properties.getUris();
List<HttpHost> httpHosts = new ArrayList<>(urls.size());
for (String uri : urls) {
httpHosts.add(HttpHost.create(uri));
}
RestClientBuilder builder = RestClient.builder(httpHosts.toArray(new HttpHost[urls.size()]));
if (StringUtils.isNotEmpty(properties.getUsername()) && StringUtils.isNotEmpty(properties.getPassword())) {
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(properties.getUsername(), properties.getPassword()));
builder.setHttpClientConfigCallback(clientBuilder -> clientBuilder.setDefaultCredentialsProvider(credentialsProvider));
}
return builder;
}
/**
* RestClient
*
* @return {@link RestClient}
*/
@Bean
public RestClient restClient() {
return restClientBuilder().build();
}
/**
* RestHighLevelClient
*
* @return {@link RestHighLevelClient}
*/
@Bean
public RestHighLevelClient restHighLevelClient() {
return new RestHighLevelClient(restClientBuilder());
}
}
复制代码
数据同步
数据同步分为两块,分别是 同步方案 和 中间件选型
一般都是优先写入数据库,保证数据的一致性。然后考虑通过技术方案提升性能与降低耦合
因为ElasticSearch没有事务控制,所以不会优先写入ElasticSearch
同步方案
数据同步有多种方案,基本是在性能、耦合度、时效性中做权衡考量,基本的方案有以下四种
同步双写
同字面意思,同时写入数据库与ES
- 优点
- 编码简单
- 缺点
- 性能较差
- 代码耦合度较高
- 存在双失败风险
异步双写(MQ)
对第一种性能较差的问题,增加MQ做为中间缓冲,提升性能
- 优点
- 性能提升
- 基本不会丢失数据
- 缺点
- 代码耦合度较高
- MQ带来的非时效性
异步双写(WORKER)
直接存入MQ虽然性能有所提升,但是带来的强耦合MQ代码的问题依然存在。在时效性要求不高的情况下,可以通过定时任务的方式做抽取
- 优点
- 代码松耦合
- 缺点
- 时效性较差,无法设置到秒级
- 增加数据库的压力
- 需要有明确的时间戳字段表示标识更新时间(timestamp)
- 删除的逻辑不易处理
数据库日志同步
就像微服务的CAP理论一样,CP/AP二选一,不可兼备。所以如果对性能与耦合度要求较高,时效性要求不高的情况下可以考虑通过数据库更新日志的形式来做
简单概述就是构建中间服务MONITOR-SERVER,高频率的读取数据库日志,例如Mysql的Binlog,Oracle的归档日志。或者伪装成数据库的子节点(slave-node)。这样可以监听到数据的CRUD将数据转换交由MQ,由下游SYNC-SERVER消费再写入ES
- 优点
- 无侵入,无感知
- 性能高
- 低耦合
- 缺点
- 数据库日志抽取构建繁琐
中间件选型
虽说是中间件选型,但其实没有太多的选择性。
canal
阿里系产品支持Mysql,需要测试ES版本,非Mysql抽取到es不支持。
增量更新,不支持全量。优点是采用binlog实现,支持删除,并且时效性比较好
go-mysql-elasticsearch
个人开发者开发,语言栈相对小众,且只支持Mysql。ES版本更新较快,且维护修复具有不确定性。不推荐
elasticsearch-jdbc
更新基本停止,不推荐
logstash-input-jdbc
ES官方提供,更新稳定。采用jdbc的方式抽取,没有数据库要求。同时支持全量与增量更新。安装部署方便,上手难度低。
部署简单描述
- 下载
- 编写配置文件
基础准备
# 进入目录
$ cd /{logstash目录}
# 复制配置文件
$ cp config/logstash-sample.conf config/data-async.conf
# 创建sql脚本目录
$ mkdir sql
$ touch sql/xxxx.sql
# 创建元数据目录
$ mkdir metadata
# 复制jdbc依赖
$ cp xxxx/xxx-xxx-jdbc.jar sql/xxx-xxx-jdbc.jar
# 搜索可用插件,离线安装需要下载
$ bin/logstash-plugin list
# 安装es插件
$ bin/logstash-plugin install logstash-output-elasticsearch
复制代码
将以下配置写入配置文件
input {
jdbc {
# mysql相关jdbc配置
jdbc_connection_string => "jdbc:mysql://xxx.xxx.xxx:3306/xxxxx?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false"
jdbc_user => "xxxxx"
jdbc_password => "xxxxx"
# jdbc连接mysql驱动的文件目录,可去官网下载:https://dev.mysql.com/downloads/connector/j/
jdbc_driver_library => "./sql/mysql-connector-java-8.0.20.jar"
# the name of the driver class for mysql
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
# 开启分页
jdbc_paging_enabled => true
jdbc_page_size => "50000"
# mysql文件, 也可以直接写SQL语句在此处,如下:
statement => "SELECT * FROM SYS_LOG WHERE TIMESTAMP >= :sql_last_value"
# statement_filepath => "./sql/data-async.sql"
# cron表达式,默认每分钟
schedule => "* * * * *"
#type => "jdbc"
jdbc_validate_connection => true
# 是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中
#record_last_run => true
# 是否需要记录某个column 的值,如果record_last_run为真,可以自定义我们需要 track 的 column 名称,此时该参数就要为 true. 否则默认 track 的是 timestamp 的值.
use_column_value => true
# 如果 use_column_value 为真,需配置此参数. track 的数据库 column 名,该 column 必须是递增的. 一般是mysql主键
tracking_column => "timestamp"
# 字段类型
tracking_column_type => "timestamp"
# 记录位置
last_run_metadata_path => "./metadata/log-last-logdate"
# 是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
clean_run => false
#是否将 字段(column) 名称转小写
lowercase_column_names => true
}
}
output {
# es配置
elasticsearch {
hosts => ["http://xxx.xxx.xxx.xx:9200"]
index => "log"
document_id => "%{id}"
user => "xxxx"
password => "xxxxx"
}
# 这里输出调试,正式运行时可以注释掉
#stdout {
#codec => json_lines
#}
}
复制代码
- 启动
logstash -f confg/xxxx.yml
复制代码
- 支持的同步方式
- 增量更新
- 全量同步
- 注意点
- 不能直接删除ES数据,Logstash不支持
- 可以使用标记位做逻辑删除