ELK实时日志分析

star2017 1年前 ⋅ 321 阅读

前言

线上系统有没有报错,如果你不看日志是不知道的。但是一般你线上的应用会很多,如果你每个应用看过来,那也是不太合理的,而且这个也只能开发或者运维来看,对于产品、前端开发等都是比较困难的,那些日志结构也是比较难看懂。还有就是他的延迟会比较高,一般都是用户反馈了,我们才会去看报错日志,这样对我们都很被动。这个是比较传统的做法。

那么有没有办法能实时对日志进行分析,发现错误立刻发出通知呢?
答案是肯定的,这里介绍ELK来实时分析日志,那么什么是ELK,它是Elasticsearch+Logstash+Kibana 这三个的缩写。
Elasticsearch:提供存储、快速搜索功能;
Logstash:日志收集,并传输到Elasticsearch里;
Kibana:在Elasticsearch中的数据进行可视化;

Elasticsearc安装及配置

1、安装

linux安装
1)下载地址

curl -L -O https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.5.4.tar.gz

2)解压

tar -xvf elasticsearch-6.5.4.tar.gz

windows安装
1)下载https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.5.4.msi
2)安装
根据提示一步步进行安装就可以了。

2、启动

linux

cd elasticsearch-6.5.4/bin
./elasticsearch

windows

cd elasticsearch-6.5.4/bin
./elasticsearch.bat

Logstash安装

1、安装

windows安装
https://www.elastic.co/cn/downloads/logstash
下载你需要的包

linux安装
1)创建repo
在/etc/yum.repos.d/目录下,创建logstash.repo,文件内容如下

[logstash-6.x]
name=Elastic repository for 6.x packages
baseurl=https://artifacts.elastic.co/packages/6.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md

2)安装

sudo yum install logstash

2、配置

在config文件夹下,新建my-logstash.conf文件

input {
  tcp {
    mode => "server"
    host => "127.0.0.1"
    port => 4000
	tags => ["es_tags"]
	codec => "json_lines"
  }
}

output {
  elasticsearch {
    action => "index"
    hosts  => "localhost:9200"
    index  => "applog-%{YYYYMMDD}"
	document_type => "log"
  }
}

3、运行

windows

cd bin
logstash.bat -f ../config/my-logstash.conf

linux

cd bin
logstash -f ../config/my-logstash.conf

在logstash-6.5.4\logs下查看日志输出

注意:这里启动的时候,需要看logstash是否启动正常

[ERROR][logstash.config.sourceloader] No configuration found in the configured sources.

说明启动没有成功。是因为配置文件找不到。
No config files found in path {:path=>"E:/worksoft/elk/logstash-6.5.4/bin/my-logstash.conf"}
修改启动命令,把配置文件修改到正确的地址,启动完之后的界面是
imagepng
鼠标是一直停留在界面的,不会跳出来。

页面访问
imagepng

Kibana安装

1、安装

linux安装
1)安装

wget https://artifacts.elastic.co/downloads/kibana/kibana-6.5.4-linux-x86_64.tar.gz
shasum -a 512 kibana-6.5.4-linux-x86_64.tar.gz
tar -xzf kibana-6.5.4-linux-x86_64.tar.gz
cd kibana-6.5.4-linux-x86_64/

2)运行

./bin/kibana

windows安装
1)解压
2)运行

.\bin\kibana.bat

2、配置

如果你的elasticsearch设置了用户名和密码,那么这里也需要设置
elasticsearch.username:登录用户名
elasticsearch.password:登录密码
server.port: 5601
server.host: "localhost"

3、访问地址

http://localhost:5601

整体设计

imagepng
没有一个设计是完美的,那么这个设计会存在什么问题呢?
1、logstash的吞吐量会成为瓶颈
2、与应用层耦合
3、数据丢失问题

基于kafka设计

imagepng

kafka启动

cd bin
windows\kafka-server-start.bat ..\config\server.properties

遇到问题

Error while fetching metadata with correlation id

解决
server.properties文件里修改这个配置
imagepng

logstash下新增kafka-logstash.conf文件

input {
  kafka {
	topics => ["test"]
	bootstrap_servers => "localhost:9092"
	type => "kafka-logs"
	consumer_threads =>5
	decorate_events => true
  }
}

output {
  elasticsearch {
    action => "index"
    hosts  => "localhost:9200"
    index  => "kafka-log-%{YYYYMMDD}"
  }
}

java代码

把日志输送到logstash里面,创建一个项目并配置到日志
pom引入

<dependency>
	<groupId>net.logstash.logback</groupId>
	<artifactId>logstash-logback-encoder</artifactId>
	<version>5.2</version>
</dependency>
<dependency>
	<groupId>org.slf4j</groupId>
	<artifactId>slf4j-api</artifactId>
	<version>1.7.25</version>
</dependency>
<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-clients</artifactId>
	<version>2.1.0</version>
</dependency>

logback.xml

<?xml version="1.0" encoding="UTF-8"?>
<configuration debug="false" scan="true" scanPeriod="30 second">

    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <pattern>%date [%-5level] [%thread] %logger{80} - %msg%X%n</pattern>
        </encoder>
    </appender>
	  
	  <!--<appender name="logstash"-->
              <!--class="net.logstash.logback.appender.LogstashTcpSocketAppender">-->
        <!--&lt;!&ndash; destination:这里的这个地址要跟logstash那边相同 &ndash;&gt;-->
        <!--<destination>localhost:4000</destination>-->
        <!--<encoder charset="UTF-8" class="net.logstash.logback.encoder.LogstashEncoder"/>-->
    <!--</appender>-->
	
    <appender name="kafkaAppender"
              class="com.cimu.elk.custom.KafkaAppender">
        <topic>test</topic>
        <brokerServer>127.0.0.1:9092</brokerServer>
    </appender>

    <root level="INFO">
        <appender-ref ref="STDOUT"/>
        <!-- 简单的日志传输 -->
        <!-- <appender-ref ref="logstash"/> -->
        <!-- 基于kafka的日志传输 -->
        <appender-ref ref="kafkaAppender"/>
    </root>

</configuration>

Controller类

@Controller
@Slf4j
public class TestController {
    @GetMapping("/test")
    public String test(){
        log.info("我的测试日志");
 return "success";
  }
}

KafkaAppender类:

package com.cimu.elk.custom;

import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.UnsynchronizedAppenderBase;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;

/**
 * Title: KafkaAppender * Copyright: Copyright (c) 2017 * 
  * date 2019年01月09日 12:22
 */public class KafkaAppender extends UnsynchronizedAppenderBase {
    private String topic;
 private String brokerServer;
  //kafka生产者
  private Producer, String> producer;

  @Override
  public void start() {
        super.start();
  Properties props = new Properties();
  props.put("bootstrap.servers", this.brokerServer);
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  producer = new KafkaProducer<>(props);
  }

    @Override
  public void stop() {
        super.stop();
  producer.close();
  }

    @Override
  protected void append(ILoggingEvent event) {
        String msg = event.getMessage();
  //拼接消息内容
  ProducerRecord, String> producerRecord = new ProducerRecord, String>(
                this.topic, 0, "key", msg);
  System.out.println("[推送数据]:" + producerRecord);
  //发送kafka的消息
  producer.send(producerRecord, new Callback() {
            @Override
  public void onCompletion(RecordMetadata metadata, Exception exception) {
                //监听发送结果
  if (exception != null) {
                    exception.printStackTrace();
  } else {
                    System.out.println("[推送数据到kafka成功]:" + metadata);
  }
            }
        });
  }

    public String getTopic() {
        return topic;
  }

    public void setTopic(String topic) {
        this.topic = topic;
  }

    public String getBrokerServer() {
        return brokerServer;
  }

    public void setBrokerServer(String brokerServer) {
        this.brokerServer = brokerServer;
  }

    public Producer, String> getProducer() {
        return producer;
  }

    public void setProducer(Producer, String> producer) {
        this.producer = producer;
  }
}

源码地址

项目启动之后,访问接口,模拟日志输出,在kibana里面配置index pattern,就可以查询日志了。
添加index pattern
imagepng
查询日志
imagepng

本文为博主原创文章,未经博主允许不得转载。
更多内容请访问:IT源点

相关文章推荐

全部评论: 0

    我有话说: