社区所有版块导航
Python
python开源   Django   Python   DjangoApp   pycharm  
DATA
docker   Elasticsearch  
aigc
aigc   chatgpt  
WEB开发
linux   MongoDB   Redis   DATABASE   NGINX   其他Web框架   web工具   zookeeper   tornado   NoSql   Bootstrap   js   peewee   Git   bottle   IE   MQ   Jquery  
机器学习
机器学习算法  
Python88.com
反馈   公告   社区推广  
产品
短视频  
印度
印度  
Py学习  »  Elasticsearch

使用logstash-jdbc-input插件实现mongodb数据实时同步到elasticsearch

FannieCream • 3 年前 • 722 次点击  

一、实验介绍

logstash-jdbc-input 是Logstash提供的官方插件之一,该插件通过JDBC接口将任何数据库中的数据导入 Logstash。关于使用 logstash-jdbc-input 插件从数据库中导出数据到es上,大部分是关于mysql数据库的导入。本篇文章是关于如何使用 logstash-jdbc-input 插件对mongodb的数据进行实时导入。

二、版本说明

本实验使用的ELK版本是7.6.2。
(这里想要补充一下,关于mongodb数据库的数据导入,另外一种常使用的插件是 mongo-connector ,但该插件仅支持到elasticsearch5.x,因此对于更高版本的elasticsearch更推荐使用本篇文章使用的方法。)

三、具体实现

1. 下载相关的jdbc-driver文件并解压
  • 下载地址: https://dbschema.com/jdbc-drivers/MongoDbJdbcDriver.zip
  • 解压安装包: unzip MongoDbJdbcDriver.zip
    (安装包里面包括三个 jar 包文件: gson-2.8.6.jar mongo-java-driver-3.12.4.jar mongojdbc2.1.jar )
  • 将所有文件(即三个jar包)复制到 (~/logstash-7.6.2/logstash-core/lib/jars/) 目录(即你的logstash所在的安装目录)
2. 编写配置文件内容
  • 在你的logstash安装目录下新建一个 .conf 文件
  • 关于 .conf 配置文件主要由 input , filter , output 三大板块组成,我们依次介绍如何填写各部分的内容:
2.1 input
input {
  jdbc {
    jdbc_driver_class => "com.dbschema.MongoJdbcDriver"
    # jar包的目录
    jdbc_driver_library => "logstash/logstash-core/lib/jars/mongojdbc2.1.jar"
    # mongo数据库对应的uri
    jdbc_connection_string => jdbc_connection_string => "jdbc:mongodb://127.0.0.1:27017/dbtest"
    # 这个不填  
    jdbc_user => ""
    # 这个不填
    jdbc_password => ""
    # 表示每分钟中执行一次,以实现实时同步的效果
    schedule => "* * * * *"
    # mongodb的查询语句
    statement => "db.dbtest.find({}, {_id: 0})"
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 在编写mongodb查询语句时我们需要注意,由于logstash无法识别mongodb中的 ObjectId 类型,因此我们需要抛弃该字段,因此在 find 语句中我们设置 _id:0 ,即表示不需要该字段。
2.2 filter
filter {
  # 数据预处理
}
  • 1
  • 2
  • 3
  • filter部分主要是针对mongodb中的数据进行预处理,如果不需要进行预处理,这部分内容不必填写;关于filter实现预处理的部分内容比较繁多,之后会专门出一篇文章进行总结,这里不再赘述。
2.3 output
output {
 elasticsearch {
    # es所在的地址
    hosts => "localhost:9200"
    # 导入到es上对应的索引
    index => "test"
  }
  
  stdout {
    codec => json_lines
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
3. 实现数据的实时同步(全量法)
  • 全量法,即指每次将表的所有数据全部导入,这种方法可能会导致数据重复的问题,因为每次同步时都会将之前已经导入的数据再导入一遍,为避免数据重复的问题,我们需要对每条数据进行标识,这样在每次同步时es中若已出现相同标识的数据则会选择覆盖,以此实现数据实时同步的效果。
  • 实现数据标识效果,即在 output 部分指定 document_id 即可
output {
 elasticsearch {
    # es所在的地址
    hosts => "localhost:9200"
    # 导入到es上对应的索引
    index => "test"
    # 指定标识每条数据的字段
    document_id => "%{id}"
  }
  
  stdout {
    codec => json_lines
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 这里需要注意的是,我们无法使用mongodb自动生成的id作为标识符,因为id是 ObjectId 类型,在 input 阶段我们已经把该字段删去了,因此这里应该选择表中其他能标识数据且不是 ObjectId 类型的字段(string, int等皆可)
4. 实现数据的实时同步(增量法)
  • 若在你的数据中除了mongodb自动生成的 id 不再有其它具有标识性质的字段,可以考虑使用 增量法 实现数据的实时同步。增量法,即每次同步时是从上一次执行命令的时间开始,将插入时间在上一次命令之后的数据导入es中。增量法的优点是不必每次将全部数据导入,而是只导入新加入到数据库的数据,可以减小每次同步时的压力。
  • 使用增量法实现数据同步,需要修改 input 部分的代码
input {
  jdbc {
    jdbc_driver_class => "com.dbschema.MongoJdbcDriver"
    # jar包的目录
    jdbc_driver_library => "logstash/logstash-core/lib/jars/mongojdbc2.1.jar"
    # mongo数据库对应的uri
    jdbc_connection_string => jdbc_connection_string => "jdbc:mongodb://127.0.0.1:27017/dbtest"
    # 这个不填  
    jdbc_user => ""
    # 这个不填
    jdbc_password => ""
    # 表示每分钟中执行一次,以实现实时同步的效果
    schedule => "* * * * *"
    # 实现增量同步的mongodb的查询语句
    statement => "db.dbtest.find({ $gte: ISODate(:sql_last_value) }, {_id: 0})"
    # 保存上一次执行时间的文件
    last_run_metadata_path => "/logstash-7.6.2/.logstash_jdbc_last_run"
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 实现增量同步主要是两个字段:
    • statement: 执行mongodb查询的字段
      • 关于 :sql_last_value :logstash中提供的一个协助查询的时间参数,默认值是 1970-01-01 08:00:00 ,数据类型是 string ,每次执行命令之后,该值会替换成执行命令时刻的时间。
      • 在修改 find 语句时容易因为 :sql_last_value 的类型出错:如果表中关于时间的数据类型是 string ,那在 find 语句中改为 db.dbtest.find({ $gte: :sql_last_value}, {_id: 0}) 即可;若如果表中关于时间的数据类型是 date ,那在 find 语句需要进行类型转换,即改为· db.dbtest.find({ $gte: ISODate(:sql_last_value)}, {_id: 0})
    • last_run_metadata_path: 保存上一次执行时间的文件,可以放在任意目录下,我这里放在了 /logstash-7.6.2 的目录下面
5. 运行文件
/logstash-7.6.2/bin/logstash -f /logstash-7.6.2/dbtest.conf --path.data=/logstash-7.6.2/data/dbtest
  • 1

四、可能出现的报错

1. 无法识别ObjectId错误
  • 报错信息: Exception when executing JDBC query {:exception=>#<Sequel::DatabaseError: Java::OrgLogstash::MissingConverterException: Missing Converter handling for full class name=org.bson.types.ObjectId, simple name=ObjectId>}

  • 错误原因:在 input 部分编写mongodb查询语句时需要注意,由于logstash无法识别mongodb中的 ObjectId 类型,因此我们需要抛弃该字段,因此在 find 语句中我们设置 _id:0 ,即表示不需要该字段。

db.dbtest.find({}, {_id: 0})
  • 1

【tips】通过mongodb的 find 语句我们还可以选取只导出文档中的某一字段,具体操作可参考官方文档: https://docs.mongodb.com/manual/reference/method/db.collection.find/

2. 增量同步无效但是没有报错信息
  • 这一问题的原因在上面的增量法部分中也有提到过,可能是 find 语句中 :sql_last_value 或者其他字段的数据类型不正确,建议检查一下数据库中字段类型和 find 语句中的查询条件是否匹配

参考文章

  1. https://stackoverflow.com/questions/58342818/sync-mongodb-to-elasticsearch
  2. https://docs.mongodb.com/manual/reference/method/db.collection.find/
  3. https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html
Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/72404
 
722 次点击