前言

上一篇了解了ES的初步使用,本篇主要介绍如何将其它数据源的数据同步到ES中,会重点讨论通过Logstash将Mysql导入到ES中。

先来了解下Logstash吧

Logstash是Elastic Stack的一部分,一个开源的数据收集引擎,具有强大的数据收集、处理、传输能力。支持丰富的输入输出插件,可以使用Jdbc插件完成将Mysql数据迁移到ES中。

Logstash支持的输入插件可参考>


安装

1、进入下载地址进行下载,选择对应平台,点击下载

下载地址传送门>
如果想用Docker进行安装可以参考>

2、下载的文件解压缩到对应目录path下,进入logstash目录下,该目录便是 $LOGSTASH_HOME。

本篇示例版本:logstash-8.6.2

3、进入 $LOGSTASH_HOME 目录,可以看到几个文件夹,包括bin、log、config、data等

具体的目录文件说明可参考>

4、启动时需要指定配置文件,配置文件进行输入输出插件的配置,启动后进行相应的数据传输。

导入数据

流程如下图所示:
image-1684421658475

初始化表数据

1、定义表结构

CREATE TABLE `user` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键ID',
  `name` varchar(30) DEFAULT NULL COMMENT '姓名',
  `age` int DEFAULT NULL COMMENT '年龄'
  PRIMARY KEY (`id`),
  KEY `idx_name` (`name`)
) ENGINE=InnoDB  DEFAULT CHARSET=utf8mb4  COMMENT='用户表';

2、插入初始化数据

insert into user(id, `name`, age) 
values(1, 'jack', 18),
(2, 'rock', 20), 
(3, 'hock', 21);

配置文件

首先需要配置输入输出插件,配置文件可放于config目录下

input {
  jdbc {
    jdbc_driver_library => "~/Documents/dev/es/logstash-8.6.2/bin/mysql-connector-java-8.0.20.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/test?characterEncoding=UTF-8&useSSL=false"
    jdbc_user => root
    jdbc_password => 
    jdbc_paging_enabled => "true" #是否进行分页
    jdbc_page_size => "2"
    tracking_column => "id"
    use_column_value => true
    clean_run => true
    # statement_filepath => "sql文件路径,与下面的执行语句二选1"
    statement => "SELECT id,name,age FROM user id WHERE id > :sql_last_value"
    # 设置监听间隔  各字段含义(由左至右)秒、分、时、天、月、年,全部为*默认含义为每分钟都更新
    schedule => " 10 * * * * *"
  }
}
output {
  elasticsearch {
    document_id => "%{id}"
    # document_type => ""
    index => "userindex2"
    hosts => ["http://localhost:9200"]
  }
  stdout{
    codec => json_lines
  }
}

主要包括input和output插件配置。

1、input jdbc相关属性说明

  • jdbc_driver_library:指定jdbc驱动包路径
  • jdbc_driver_class:定义数据库驱动类
  • jdbc_connection_string:配置数据库的连接
  • jdbc_user:配置数据库的用户名
  • jdbc_password:配置数据库的密码
  • jdbc_paging_enabled:配置是否进行分页,如果配置分页,则会先进行count查询,然后按分页大小进行获取数据,默认false
  • jdbc_page_size:分页大小,配合jdbc_paging_enabled使用,默认值100000
  • tracking_column:用于配置追踪的字段,设置后配合:sql_last_value,每次查询可以从该位置查询
  • tracking_column_type:追踪字段的类型,目前仅支持类型支持两种:numeric(默认)、timestamp
  • use_column_value:如果设置成true,则使用tracking_column的值作为:sql_last_value;如果设置成false,:sql_last_value则使用最近一次查询的时间,默认false
  • clean_run:上一次的执行状态是否被保留,如果设置成true则不会被保留,当use_column_value是true,那么初始时 :sql_last_value则是1970-01-01或者0;如果是false,则上次执行状态保留在属性last_run_metadata_path设置的文件中,默认false
  • statement:配置查询sql
  • schedule:设置监听间隔,多久进行一次同步,符合cron表达式

更多属性说明参考>

2、output相关属性说明
elasticsearch相关属性:

  • document_id:es中存储的文档id
  • index:es中存储的索引名称(注意es中索引名称字母的话必须小写)
  • hosts:配置es的域名和端口

stdout相关属性:
用于配置标准输出,方便进行Debug,数据进行处理和过滤时可以打印日志,codec包括两种,rubydebug和json

执行

配置完成执行,进入 $LOGSTASH_HOME 目录
执行:./bin/logstash -f config/my-demo-logstash.conf

-- 部分日志如下:

[2023-05-20T09:51:10,717][INFO ][logstash.inputs.jdbc     ][main][a5d1e2414385afacea929789902493118eab9c05a565554e75462e93425330a0] (0.060362s) SELECT version()
[2023-05-20T09:51:10,738][INFO ][logstash.inputs.jdbc     ][main][a5d1e2414385afacea929789902493118eab9c05a565554e75462e93425330a0] (0.000747s) SELECT version()
[2023-05-20T09:51:10,763][INFO ][logstash.inputs.jdbc     ][main][a5d1e2414385afacea929789902493118eab9c05a565554e75462e93425330a0] (0.010282s) SELECT count(*) AS `count` FROM (SELECT id,name,age FROM user id WHERE id > 0) AS `t1` LIMIT 1
[2023-05-20T09:51:10,770][INFO ][logstash.inputs.jdbc     ][main][a5d1e2414385afacea929789902493118eab9c05a565554e75462e93425330a0] (0.000835s) SELECT * FROM (SELECT id,name,age FROM user id WHERE id > 0) AS `t1` LIMIT 2 OFFSET 0
[2023-05-20T09:51:10,782][INFO ][logstash.inputs.jdbc     ][main][a5d1e2414385afacea929789902493118eab9c05a565554e75462e93425330a0] (0.000938s) SELECT * FROM (SELECT id,name,age FROM user id WHERE id > 0) AS `t1` LIMIT 2 OFFSET 2
/Users/xudj/Documents/dev/es/logstash-8.6.2/vendor/bundle/jruby/2.6.0/gems/manticore-0.9.1-java/lib/manticore/client.rb:284: warning: already initialized constant Manticore::Client::HttpPost
/Users/xudj/Documents/dev/es/logstash-8.6.2/vendor/bundle/jruby/2.6.0/gems/manticore-0.9.1-java/lib/manticore/client.rb:284: warning: already initialized constant Manticore::Client::HttpPost
{"@timestamp":"2023-05-20T01:51:10.780010Z","age":20,"@version":"1","name":"rock","id":2}
{"@timestamp":"2023-05-20T01:51:10.784010Z","age":21,"@version":"1","name":"hock","id":3}
{"@timestamp":"2023-05-20T01:51:10.778956Z","age":18,"@version":"1","name":"jack","id":1}

配置了调度任务schedule

schedule => " 10 * * * * *" 表示每分钟的第10秒进行任务调度
如果没有配置schedule,初始化处理完成则结束。

[2023-05-20T09:52:10,620][INFO ][logstash.inputs.jdbc     ][main][a5d1e2414385afacea929789902493118eab9c05a565554e75462e93425330a0] (0.007535s) SELECT count(*) AS `count` FROM (SELECT id,name,age FROM user id WHERE id > 3) AS `t1` LIMIT 1

执行完成后,通过es查看输入的索引数据:
GET:http://127.0.0.1:9200/userindex2/_search
能查到数据则表明数据传输成功。


问题

问题1 报错未知命令

执行Logstash 命令时没有使用 -f 参数指定配置文件,Logstash 会认为您指定的配置文件路径是要进行的操作或输入,从而报错未知命令。

解决:
执行命令加上 -f 指定配置文件即可。

问题2 报错ClassCastException

[ERROR][logstash.inputs.jdbc     ][main] 
java.sql.SQLException: java.lang.ClassCastException: class java.math.BigInteger cannot be cast to class java.lang.Long (java.math.BigInteger and java.lang.Long are in module java.base of loader 'bootstrap')

低版本不兼容导致。

解决:
jbdc驱动包升级到8 版本(mysql-connector-java-8.0.20.jar)

问题3 使用分页报错

配置了
jdbc_paging_enabled => “true” #是否进行分页
同时执行了分页大小。

在测试tracking_column为时间时,报错:

Exception when executing JDBC query {:exception=>Sequel::DatabaseError, :message=>"Java::JavaSql::SQLException: [161ca41c0b002000]
[10.40.26.240:3015][pbc_price_pre]syntax error, error in :'where create_time > '1970-01-01 00:0', expect ), actual ;, pos 415, line 3, column 86, token ;", 
:cause=>"#<Java::JavaSql::SQLException: [161ca41c0b002000][10.40.26.240:3015][pbc_price_pre]syntax error, error in :'where create_time > '1970-01-01 00:0', expect ), actual ;, pos 415, line 3, column 86, token ;>"}

根据:SELECT count(*) AS count FROM (SELECT * FROM table id WHERE id > 0;) AS t1 LIMIT 1;得知,statement配置sql语句时多了个;符号,所以查询数量时报错。

解决:
1、去掉sql最后的英文分号
2、不使用分页配置。

问题4 tracking_column配置失败

tracking_column not found in dataset. {:tracking_column=>"create_time"}

如果sql语句中重新定义了column,tracking_column配置的字段不认识原sql的字段名。

解决:
tracking_column需要填写定义的列名称

另外,logstash默认会把列名转小写,需要配置lowercase_column_names => “false” 方可支持保留大写。

问题5 索引名称错误

"error"=>{"type"=>"invalid_index_name_exception", "reason"=>"Invalid index name [userIndex2], must be lowercase"
output.elasticsearch.index 

解决:
索引名称不支持大写字母,需要改成小写。


总结

Logstash支持了非常多的输入输出插件,进行配置即可。

Jdbc输入插件将数据库数据导入ES,目前是通过主动调度任务进行配置,以一种主动拉取的方式进行的,最快支持每秒拉取(调度配置:* * … 或 */1 *…)。
如果不配置调度任务,则初始化完成变会结束,不进行增量的同步。

如果想通过数据库binlog做到实时同步,可以参考中间件Canal、一些云产品DTS等。