1.系统环境
Windows 10,elasticsearch-7.2.0 ,logstash-7.2.0,kibana-7.2.0-windows-x86_64,jdk 11, nodejs 12..4.0
2. 安装配置Elasticsearch
2.1 Elasticsearch下载
历史版本下载地址:https://www.elastic.co/cn/downloads/past-releases#elasticsearch
2.2 配置启动
1)config目录修改配置文件elasticsearch.yml
network.host: 192.168.0.1 #修改可以访问的ip
http.port: 9200 #设置访问端口
2)bin目录,运行elasticsearch.bat启动
2.3 成功验证
启动完成后浏览器访问,http://localhost:9200或自己配置的ip:端口,返回如下:则配置启动成功。
3. 安装配置kibana
3.1 kibana简介
Kibana 是一个设计出来用于和 Elasticsearch 一起使用的开源的分析与可视化平台,可以用 kibana 搜索、查看、交互存放在Elasticsearch 索引里的数据,使用各种不同的图表、表格、地图等展示高级数据分析与可视化,基于浏览器的接口使你能快速创建和分享实时展现Elasticsearch查询变化的动态仪表盘,让大量数据变得简单,容易理解。
3.2 kibana下载
历史版本下载地址:https://www.elastic.co/cn/downloads/past-releases#kibana
3.3 安装条件
- 保证安装了JDK
- 保证安装node
- 保证安装了Elasticsearch,Elasticsearch版本要与kibana相同
3.4 配置启动
1)config目录修改配置文件kibana.yml
server.port: 5601 #kibana访问端口
server.host: "10.140.128.220" #kibana访问地址
elasticsearch.hosts: ["http://localhost:9200"] #配置elasticsearch访问地址
elasticsearch.requestTimeout: 90000 #如果有超时报错可以将这个配置改大
i18n.locale: "zh-CN" #界面国际化配置
2)bin目录,运行kibana.bat启动
3.5 成功验证
启动完成后浏览器访问,http://localhost:5601或自己配置的ip:端口,返回如下:
3.6 使用NSSM将Kibana安装为Windows服务
1.下载NSSM:http://www.nssm.cc/download,解压后目录结构如下:
2.命令行执行安装操作: nssm install kibana
会弹出下面的窗口
3.关联Kibana的批处理的启动文件
4.启动刚刚安装好的Kibana服务: nssm start kibana
4. 安装配置Logstash
4.1 下载Logstash
历史版本下载地址:https://www.elastic.co/cn/downloads/past-releases#logstash
4.2 Logstash简介
Logstash 是开源的服务器端数据处理管道, 能够动态地采集、转换和传输数据,不受格式或复杂度的影响。利用 Grok 从非结构化数据中派生出结构。
4.3 Logstash原理
Logstah处理事件有三个阶段:input、filter、output。
1. input
产生事件,常见的input有file、beats。
2. filter
结合条件语句对符合标准的事件进行处理。
- grok:解析和结构化任何文本。Grok目前是logstash最好的方式对非结构化日志数据解析
成结构化和可查询化。 - mutate:在事件字段执行一般的转换。可以重命名、删除、替换和修改事件字段。
- drop:完全丢弃事件,如debug事件
- clone:复制事件,可能添加或者删除字段。
- geoip:添加有关IP地理位置信息。
3. output
output是logstash管道的最后一个阶段,一个事件可以经过多个output。但是一旦所有输出
处理完,该事件已经执行完。常用的output有elasticsearch、file。
4.4 配置启动
一定要在logstash的/bin目录下,新建一个配置文件logstash_default.conf。配置文件可以参考logstash/config目录下的logstash-sample.conf。
配置文件格式:
input {
beats {
port => 5044
}
}
filter {
if "java-logs" in [tags]{
grok{
#筛选过滤
match => {
"message" => "(?<date>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3})\]\[(?<level>[A-Z]{4,5})\]\[(?<thread>[A-Za-z0-9/-]{4,40})\]\[(?<class>[A-Za-z0-9/.]{4,40})\]\[(?<msg>.*)"
}
remove_field => ["message"]
}
#不匹配正则则删除,匹配正则则用=~
if [level] !~ "(ERROR|WARN|INFO)" {
# 删除日志
drop {}
}
}
}
output {
elasticsearch {
hosts => "localhost:9200"
}
}
logstash配置文件详解
1. Input
input {
path => "D:\input\a\*.log"
exclude => "*.gz"
start_position => "beginning"
ignore_older => 0
sincedb_path => "D:\input\record"
add_field => {"test"="test"}
type => "apache-log"
}
path:数组类型,读取文件的路径。
exclude:数组类型,排除不想监听的文件规则。
sincedb_path:字符串类型,记录sincedb文件路径,logstash会通过sincedb文件来跟踪记录每个文件中的当前位置,停止和重新启动logstash,logstash会从记录位置继续读取文件。
start_position: 字符串类型,可以配置为beginning/end,是否从头读取文件
stat_interval: 数值类型,定时检查文件是否有更新,默认是1秒
discover_interval: 数值类型,定时检查是否有新文件待读取,默认是15秒
ignore_older: 数值类型,扫描文件列表时,如果该文件上次更改时间超过设定的时长,则不做处理,但依然会监控是否有新内容,默认关闭
close_older: 数值类型,如果监听的文件在超过该设定时间内没有新内容,会被关闭文件句柄,释放资源,但依然会监控是否有新内容,默认3600秒,即1小时
add_field: 增加一个字段
type: 表明导入的日志类型
2. Codec
Codec作用于Input和Output,负责将数据在原始与Logstash Event之间转换,常见的codec有:
- plain: 读取原始内容
- dots: 将内容简化为点进行输出
- rubydebug: 将Logstash Events按照ruby格式输出,方便调试
- line: 处理带有换行符的内容
- json: 处理json格式的内容
- multiline: 处理多行数据的内容,当一个事件的message由多行组成时,需要使用multiline,常见的情况是处理日志信息。
input {
file {
path => "D:\input\a\*.log"
start_position => "beginning"
codec => multiline {
pattern => "^\s"
what => "previous"
}
}
}
pattern: 设置行匹配的正则表达式,可以使用grok
what: 可设置为previous或next。表示当与pattern匹配成功的时候,匹配行是归属上一个事件还是下一个事件
negate: 布尔值,表示是否对pattern的结果取反
3. Filter
Filter可以对Logstash事件进行丰富的处理,比如解析数据、删除字段、类型转换等等
启动logstash
logstash -f logstash.conf
4.5 成功验证
启动成功如图所示。默认端口为9600。在浏览器中输入localhost:9600即可查看效果。
注意:本文归作者所有,未经作者允许,不得转载