用logstash同步Mysql数据到ES

wylc123 1年前 ⋅ 3070 阅读

1、使用背景

我们习惯性,将数据存储到mysql,那么怎么使用ES实现全文检索呢。Elasticsearch为我们提供了一个插件logstash来同步我们的数据库。

2、安装logsash插件

注意事项:要安装跟es版本一样的版本。

logstash下载地址:https://www.elastic.co/downloads/logstash

在windows下面解压即可使用。我这里用的都是7.2.0版本。不需要再像2.x版本需要集成logstash-jdbc-input,才能实现数据同步。

3、配置logsash实现同步

1)在根目录下新建文件夹mysqletc,用来放置同步所需要的文件和mysql驱动

文件夹内容:

2)将要同步的数据库对应的mysql驱动放到该文件夹

3)在创建的新文件夹中(mysqletc文件夹)中创建sql文件

sql文件里就是你需要同步到es中的数据的查询语句,如果是全量同步,只需要要select * from [table]即可。

如果同步多个表格可以创建多个sql文件。

比如我的:

soul.sql

SELECT * FROM soul

mto_user.sql

SELECT * FROM mto_user

注意:结尾不要加分号,我加分号同步报错了

3)logstash链接数据库和Elasticsearch的conf文件

名字随便取,我这里习惯叫mysql.conf,内容如下:

input {
     	stdin {}
     	jdbc {
 		      # mysql 数据库链接,shop为数据库名
 		      jdbc_connection_string => "jdbc:mysql://114.67.169.20:3306/myblog"
 		      # 用户名和密码
 		      jdbc_user => "root"
 		      jdbc_password => "Wylc!@#$5"
 		      # 驱动
 		      jdbc_driver_library => "C:/softs/elk/logstash-7.2.0/mysqletc/mysql-connector-java-8.0.13.jar"
 		      # 驱动类名
 		      jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
 		      jdbc_paging_enabled => "true"
 		      jdbc_page_size => "50000"
 		      # 执行的sql 文件路径+名称
 		      statement_filepath => "C:/softs/elk/logstash-7.2.0/mysqletc/soul.sql"
 		      # 设置监听间隔  各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
 		      schedule => "* * * * *"
 		      # 索引类型
 		      type => "soul"
     	}
 	    jdbc {
 		      # mysql 数据库链接,shop为数据库名
 		      jdbc_connection_string => "jdbc:mysql://114.67.169.20:3306/myblog"
 		      # 用户名和密码
 		      jdbc_user => "root"
 		      jdbc_password => "Wylc!@#$5"
 		      # 驱动
 		      jdbc_driver_library => "C:/softs/elk/logstash-7.2.0/mysqletc/mysql-connector-java-8.0.13.jar"
 		      # 驱动类名
 		      jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
 		      jdbc_paging_enabled => "true"
 		      jdbc_page_size => "50000"
 		      # 执行的sql 文件路径+名称
 		      statement_filepath => "C:/softs/elk/logstash-7.2.0/mysqletc/mto_user.sql"
 		      # 设置监听间隔  各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
 		      schedule => "* * * * *"
 		      # 索引类型
 		      type => "mto_user"
 	    	}
   }
   filter {
 	    json {
 	        source => "message"
 	        remove_field => ["message"]
 	    }
    }
 	output {
 		if [type]=="soul"{
 		    elasticsearch {
 		        hosts => ["localhost:9200"]
 		        index => "soul"
 		        document_id => "%{id}"
 		    }
 		}
 		if [type]=="mto_user"{
 		    elasticsearch {
 		        hosts => ["localhost:9200"]
 		        index => "mto_user"
 		        document_id => "%{id}"
 		    }
	 	}
     	stdout {
       		codec => json_lines
        }
    }

注意:创建的所有文本文件一定要是utf-8无BOM格式编码

 

如果需要实时更新数据:

需要在相关表中设置一个时间戳字段,数据有改动时时间戳更新,只更新时间戳大于上次同步时间的记录:

ALTER TABLE mix_data CHANGE update_time update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP;
UPDATE mix_data set update_time = now();

 

input {
     	stdin {}
     	jdbc {
 		      # mysql 数据库链接,shop为数据库名
 		      jdbc_connection_string => "jdbc:mysql://10.120.64.79:13306/qingbao"
 		      # 用户名和密码
 		      jdbc_user => "root"
 		      jdbc_password => "1qaz2wsx#EDC"
 		      # 驱动
 		      jdbc_driver_library => "C:/softs/elk/logstash-7.5.1/mysqletc/mysql-connector-java-8.0.28.jar"
 		      # 驱动类名
 		      jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
 		      jdbc_paging_enabled => "true"
 		      jdbc_page_size => "50000"
 		      # 执行的sql 文件路径+名称
 		      #statement_filepath => "C:/softs/elk/logstash-7.5.1/mysqletc/mix_data.sql"
			  statement => "SELECT * FROM mix_data WHERE update_time > :sql_last_value ORDER BY id desc"
			  tracking_column => 'update_time'
 		      # 设置监听间隔  各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
 		      schedule => "* * * * *"
 		      # 索引类型
 		      type => "mix_data"
     	}
   }
   filter {
 	    json {
 	        source => "message"
 	        remove_field => ["message"]
 	    }
    }
 	output {
 		if [type]=="mix_data"{
 		    elasticsearch {
 		        hosts => ["localhost:9200"]
				user => elastic
				password => "root123456"
 		        index => "mix_data"
 		        document_id => "%{mix_data_id}"
 		    }
 		}
     	stdout {
       		codec => json_lines
        }
    }

完成了上面的步骤,logstash的配置也就完成了。

4、启动logstash开始同步数据库

第一步:运行elasticsearch.bat文件,打开elasticsearch,运行成功可以看到:

第二步:到bin目录下创建一个批处理文件run_default.bat,内容为:

logstash -f ../mysqletc/mysql.conf

第三步:双击run_default.bat,其中logstash -f 表示运行指令, ../mysqlec/mysql.conf表示我们配置的mysql.conf文件路径,成功启动后,可以在终端中看见运行的sql和同步的数据

5、测试是否同步成功

同步完成后,我们可以去head插件里,查看es中是否存在我们刚才创建的索引库:

在基本查询中,可以看见我们同步的数据

其中:timestamp和version是elastisearch自己添加的字段。

相关文章:怎么安装使用elasticsearch-head插件

input {
     	stdin {}
     	jdbc {
 		      # mysql 数据库链接,shop为数据库名
 		      jdbc_connection_string => "jdbc:mysql://114.67.169.20:3306/myblog"
 		      # 用户名和密码
 		      jdbc_user => "root"
 		      jdbc_password => "xxxxxx"
 		      # 驱动
 		      jdbc_driver_library => "C:/softs/elk/logstash-7.2.0/mysqletc/mysql-connector-java-8.0.13.jar"
 		      # 驱动类名
 		      jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
 		      jdbc_paging_enabled => "true"
 		      jdbc_page_size => "50000"
 		      # 执行的sql 文件路径+名称
 		      statement_filepath => "C:/softs/elk/logstash-7.2.0/mysqletc/soul.sql"
 		      # 设置监听间隔  各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
 		      schedule => "* * * * *"
 		      # 索引类型
 		      type => "soul"
     	}
 	    jdbc {
 		      # mysql 数据库链接,shop为数据库名
 		      jdbc_connection_string => "jdbc:mysql://114.67.169.20:3306/myblog"
 		      # 用户名和密码
 		      jdbc_user => "root"
 		      jdbc_password => "xxxxxx"
 		      # 驱动
 		      jdbc_driver_library => "C:/softs/elk/logstash-7.2.0/mysqletc/mysql-connector-java-8.0.13.jar"
 		      # 驱动类名
 		      jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
 		      jdbc_paging_enabled => "true"
 		      jdbc_page_size => "50000"
 		      # 执行的sql 文件路径+名称
 		      statement_filepath => "C:/softs/elk/logstash-7.2.0/mysqletc/mto_user.sql"
 		      # 设置监听间隔  各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
 		      schedule => "* * * * *"
 		      # 索引类型
 		      type => "mto_user"
 	    	}
   }
   filter {
 	    json {
 	        source => "message"
 	        remove_field => ["message"]
 	    }
    }
 	output {
 		if [type]=="soul"{
 		    elasticsearch {
 		        hosts => ["localhost:9200"]
 		        index => "soul"
 		        document_id => "%{id}"
 		    }
 		}
 		if [type]=="mto_user"{
 		    elasticsearch {
 		        hosts => ["localhost:9200"]
 		        index => "mto_user"
 		        document_id => "%{id}"
 		    }
	 	}
     	stdout {
       		codec => json_lines
        }
    }
更多内容请访问:IT源点

相关文章推荐

全部评论: 0

    我有话说: