当前位置: 首页 > news >正文

ELK日志平台(elasticsearch +logstash+kibana)原理和实操(史上最全)

文章很长,建议收藏起来慢慢读!疯狂创客圈总目录 语雀版 | 总目录 码云版| 总目录 博客园版 为您奉上珍贵的学习资源 :

  • 免费赠送 :《尼恩Java面试宝典》持续更新+ 史上最全 + 面试必备 2000页+ 面试必备 + 大厂必备 +涨薪必备

  • 免费赠送 经典图书:《Java高并发核心编程(卷1)》 面试必备 + 大厂必备 +涨薪必备 加尼恩免费领

  • 免费赠送 经典图书:《Java高并发核心编程(卷2)》 面试必备 + 大厂必备 +涨薪必备 加尼恩免费领

  • 免费赠送 经典图书:《Netty Zookeeper Redis 高并发实战》 面试必备 + 大厂必备 +涨薪必备 加尼恩免费领

  • 免费赠送 经典图书:《SpringCloud Nginx高并发核心编程》 面试必备 + 大厂必备 +涨薪必备 加尼恩免费领

  • 免费赠送 资源宝库: Java 必备 百度网盘资源大合集 价值>10000元 加尼恩领取

推荐:尼恩Java面试宝典(持续更新 + 史上最全 + 面试必备)具体详情,请点击此链接

尼恩Java面试宝典,32个最新pdf,含2000多页不断更新、持续迭代 具体详情,请点击此链接

在这里插入图片描述


ELK的高并发场景的问题

elk能支撑50W到100W级qps场景的 大流量日志监控吗?
具体的架构如下:

在这里插入图片描述

**答案,当然没法撑住。**解决方案,稍后介绍。

但是,咱们先得把ELK 的原理搞清楚,知己才能知彼

Spring Boot整合ELK+Filebeat构建日志系统

ELK指的是Elastic公司下面Elasticsearch、Logstash、Kibana三大开源框架首字母大写简称。
Elasticsearch、Logstash、Kibana三大开源框架首字母大写简称。

ELK的关系

  1. Logstash可以将数据收集起来统一刷入Elasticsearch中
  2. 数据在Elasticsearch中进行分析然后通过kibana进行展示

在这里插入图片描述

Logstash:从各种数据源搜集数据,并对数据进行过滤、分析、丰富、统一格式等操作,然后存储到 ES。

Elasticsearch:对大容量的数据进行接近实时的存储、搜索和分析操作。

Kibana:数据分析和可视化平台。与 Elasticsearch 配合使用,对数据进行搜索、分析和以统计图表的方式展示。

ELK的应用

  • 异常分析

通过将应用的日志内容通过Logstash输入到Elasticsearch中来实现对程序异常的分析排查

  • 业务分析

将消息的通讯结果通过Logstash输入到Elasticsearch中来实现对业务效果的整理

  • 系统分析

将处理内容的延迟作为数据输入到Elasticsearch 中来实现对应用性能的调优

Elasticsearch概述

Elasticsearch 是一个分布式的开源搜索和分析引擎,在 Apache Lucene 的基础上开发而成。

Lucene 是开源的搜索引擎工具包,Elasticsearch 充分利用Lucene,并对其进行了扩展,使存储、索引、搜索都变得更快、更容易, 而最重要的是, 正如名字中的“ elastic ”所示, 一切都是灵活、有弹性的。而且,应用代码也不是必须用Java 书写才可以和Elasticsearc兼容,完全可以通过JSON 格式的HTTP 请求来进行索引、搜索和管理Elasticsearch 集群。

如果你已经听说过Lucene ,那么可能你也听说了Solr,

Solr也是开源的基于Lucene 的分布式搜索引擎,跟Elasticsearch有很多相似之处。

img

但是Solr 诞生于2004 年,而Elasticsearch诞生于2010,Elasticsearch凭借后发优势和更活跃的社区、更完备的生态系统,迅速反超Solr,成为搜索市场的第二代霸主。

Elasticsearch具有以下优势:

  • Elasticsearch 很快。 由于 Elasticsearch 是在 Lucene 基础上构建而成的,所以在全文本搜索方面表现十分出色。Elasticsearch 同时还是一个近实时的搜索平台,这意味着从文档索引操作到文档变为可搜索状态之间的延时很短,一般只有一秒。因此,Elasticsearch 非常适用于对时间有严苛要求的用例,例如安全分析和基础设施监测。
  • Elasticsearch 具有分布式的本质特征。 Elasticsearch 中存储的文档分布在不同的容器中,这些容器称为分片,可以进行复制以提供数据冗余副本,以防发生硬件故障。Elasticsearch 的分布式特性使得它可以扩展至数百台(甚至数千台)服务器,并处理 PB 量级的数据。
  • Elasticsearch 包含一系列广泛的功能。 除了速度、可扩展性和弹性等优势以外,Elasticsearch 还有大量强大的内置功能(例如数据汇总和索引生命周期管理),可以方便用户更加高效地存储和搜索数据。
  • Elastic Stack 简化了数据采集、可视化和报告过程。 人们通常将 Elastic Stack 称为 ELK Stack(代指ElasticsearchLogstashKibana),目前 Elastic Stack 包括一系列丰富的轻量型数据采集代理,这些代理统称为 Beats,可用来向 Elasticsearch 发送数据。通过与 Beats 和 Logstash 进行集成,用户能够在向 Elasticsearch 中索引数据之前轻松地处理数据。同时,Kibana 不仅可针对 Elasticsearch 数据提供实时可视化,同时还提供 UI 以便用户快速访问应用程序性能监测 (APM)、日志和基础设施指标等数据。

logstash概述

简单来说logstash就是一根具备实时数据传输能力的管道,负责将数据信息从管道的输入端传输到管道的输出端;与此同时这根管道还可以让你根据自己的需求在中间加上滤网,Logstash提供里很多功能强大的滤网以满足你的各种应用场景。


logstash常用于日志系统中做日志采集设备,最常用于ELK中作为日志收集器使用

在这里插入图片描述

logstash作用:

集中、转换和存储你的数据,是一个开源的服务器端数据处理管道,可以同时从多个数据源获取数据,并对其进行转换,然后将其发送到你最喜欢的“存储

logstash的架构:

logstash的基本流程架构:input  |  filter  |  output 如需对数据进行额外处理,filter可省略。

在这里插入图片描述

Input(输入):

采集各种样式,大小和相关来源数据,从各个服务器中收集数据。

数据往往以各种各样的形式,或分散或集中地存在于很多系统中。
Logstash 支持各种输入选择 ,可以在同一时间从众多常用来源捕捉事件。
能够以连续的流式传输方式,轻松地从您的日志、指标、Web 应用、数据存储以及各种 AWS 服务采集数据。

在这里插入图片描述

inpust:必须,负责产生事件(Inputs generate events),

常用:File、syslog、redis、beats(如:Filebeats)

Filter(过滤器)

用于在将event通过output发出之前,对其实现某些处理功能。

filters:可选,负责数据处理与转换(filters modify them),

常用:grok、mutate、drop、clone、geoip


grok:用于分析结构化文本数据。


Output(输出):

将我们过滤出的数据保存到那些数据库和相关存储中。
在这里插入图片描述

outputs:必须,负责数据输出(outputs ship them elsewhere),

常用:elasticsearch、file、graphite、statsd

在这里插入图片描述
在这里插入图片描述

Logstash的角色与不足

早期的ELK架构中使用Logstash收集、解析日志,

但是:Logstash对内存、cpu、io等资源消耗比较高。

相比Logstash,Beats所占系统的CPU和内存几乎可以忽略不计。

所以,在收集这块,一般使用filebeat 代替 Logstash

filebeat介绍

Filebeat是一个轻量级日志传输Agent,可以将指定日志转发到Logstash、Elasticsearch、Kafka、Redis等中。

Filebeat占用资源少,而且安装配置也比较简单,支持目前各类主流OS及Docker平台。

Filebeat是用于转发和集中日志数据的轻量级传送程序。

作为服务器上的代理安装,Filebeat监视您指定的日志文件或位置,收集日志事件,并将它们转发到Elasticsearch或Logstash进行索引。

Filebeat的工作方式如下:启动Filebeat时,它将启动一个或多个输入,这些输入将在为日志数据指定的位置中查找。

对于Filebeat所找到的每个日志,Filebeat都会启动收割机。

每个收割机都读取一个日志以获取新内容,并将新日志数据发送到libbeat,libbeat会汇总事件并将汇总的数据发送到您为Filebeat配置的输出。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-9xi1idFV-1601197715419)(https://www.elastic.co/guide/en/beats/filebeat/6.5/images/filebeat.png#pic_center)]

Filebeat下载页面

https://www.elastic.co/cn/downloads/past-releases#filebeat

Filebeat文件夹结构

描述
filebeat用于启动filebeat的二进制文件
data持久化数据文件的位置
logsFilebeat创建的日志的位置
modules.d简化filebeat配置的模板文件夹,如nginx/kafka等日志收集模板
filebeat.ymlfilebeat配置文件
Filebeat启动命令
./filebeat -e -c filebeat配置文件

filebeat和beats的关系

filebeat是Beats中的一员。

Beats在是一个轻量级日志采集器,其实Beats家族有6个成员,目前Beats包含六种工具:

  • Packetbeat:网络数据(收集网络流量数据)
  • Metricbeat:指标(收集系统、进程和文件系统级别的CPU和内存使用情况等数据)
  • Filebeat:日志文件(收集文件数据)
  • Winlogbeat:windows事件日志(收集Windows事件日志数据)
  • Auditbeat:审计数据(收集审计日志)
  • Heartbeat:运行时间监控(收集系统运行时的数据)

一键安装 es+logstash+ kibana

对应的镜像版本

elasticsearch:7.14.0
kibana:7.14.0
logstash:7.14.0
filebeat:7.14.0

docker编码文件

version: "3.5"
services:
  elasticsearch:
     image: andylsr/elasticsearch-with-ik-icu:7.14.0
     container_name: elasticsearch
     hostname: elasticsearch
     restart: always
     ports:
       - 9200:9200
     volumes:
       - ./elasticsearch7/logs:/usr/share/elasticsearch/logs
       - ./elasticsearch7/data:/usr/share/elasticsearch/data
       - ./elasticsearch7/config/single-node.yml:/usr/share/elasticsearch/config/elasticsearch.yml
       - ./elasticsearch7/config/jvm.options:/usr/share/elasticsearch/config/jvm.options
       - ./elasticsearch7/config/log4j2.properties:/usr/share/elasticsearch/config/log4j2.properties
     environment:
       - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
       - "TZ=Asia/Shanghai"
       - "TAKE_FILE_OWNERSHIP=true"   #volumes 挂载权限 如果不想要挂载es文件改配置可以删除
     ulimits:
       memlock:
         soft: -1
         hard: -1
     networks:
       base-env-network:
         aliases:
          - elasticsearch
  kibana:
    image: docker.elastic.co/kibana/kibana:7.14.0
    container_name: kibana
    volumes:
      - ./elasticsearch7/config/kibana.yml:/usr/share/kibana/config/kibana.yml
    ports:
      - 15601:5601
    ulimits:
      nproc: 65535
      memlock: -1
    depends_on:
       - elasticsearch
    networks:                    
       base-env-network:
         aliases:
          - kibana
  logstash:
    image:  logstash:7.14.0
    container_name: logstash
    hostname: logstash
    restart: always
    ports:
      - 19600:9600
      - 15044:5044
    volumes:
      - ./logstash/logstash.conf:/usr/share/logstash/pipeline/logstash.conf:rw
      - ./logstash/logstash.yml:/usr/share/logstash/config/logstash.yml
      - ./logstash/data:/home/logstash/data
    networks:
       base-env-network:
         aliases:
          - logstash
# docker network create base-env-network          
networks:
  base-env-network:
    external:
      name: "base-env-network"
    

访问kibana

http://cdh1:15601

SkyWalking

http://cdh2:13800/

kibana

在这里插入图片描述

读取filebeat-输出到es集群

在分布式系统中,一台主机可能有多个应用,应用将日志输出到主机的指定目录,这时由logstash来搬运日志并解析日志,然后输出到elasticsearch上。

由于于logstash是java应用,解析日志是非的消耗cpu和内存,logstash安装在应用部署的机器上显得非常的笨重。

最常见的做法是用filebeat部署在应用的机器上,logstash单独部署,然后由filebeat将日志输出给logstash解析,解析完由logstash再传给elasticsearch。

在上面的配置中,输入数据源为filebeat,输出源为elasticsearch。

修改logstash的安装目录的config目录下的logstash.conf文件,配置如下:

input {
  beats {
    port => "5044"
  }
}


filter {
    
    if "message-dispatcher" in [tags]{
        grok {
            match => ["message", "%{TIMESTAMP_ISO8601:time}\s* \s*%{NOTSPACE:thread-id}\s* \s*%{LOGLEVEL:level}\s* \s*%{JAVACLASS:class}\s* \- \s*%{JAVALOGMESSAGE:logmessage}\s*"]
        }
        
    }

    if "ExampleApplication" in [tags]{
        grok {
            match => ["message", "%{TIMESTAMP_ISO8601:time}\s* \s*%{NOTSPACE:thread-id}\s* \s*%{LOGLEVEL:level}\s* \s*%{JAVACLASS:class}\s* \- \s*%{JAVALOGMESSAGE:logmessage}\s*"]
        }
        
    }
    mutate {
        remove_field => "log"
        remove_field => "beat"
        remove_field => "meta"
        remove_field => "prospector"
        remove_field => "[host][os]"
    }
}


output {
    stdout { codec => rubydebug }
    if "message-dispatcher" in [tags]{
        elasticsearch {
           hosts => [ "elasticsearch:9200" ]
           index => "message-dispatcher-%{+yyyy.MM.dd}"      
        }
    }
    if "ExampleApplication" in [tags]{
        elasticsearch {
           hosts => [ "elasticsearch:9200" ]
           index => "ExampleApplication-%{+yyyy.MM.dd}"      
        }
    }    
}

更多的输入和输出源的配置见官网

https://www.elastic.co/guide/en/logstash/current/advanced-pipeline.html

在kibana显示的效果

在这里插入图片描述

在kibana组件上查看,可以看到创建了一个filebeat开头的数据索引,如下图:

img

在日志搜索界面,可以看到service-hi应用输出的日志,如图所示:

img

使用filebeat发送日志

制作filebeat镜像

官方文档

https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-getting-started.html

下载filebeat,下载命令如下:

https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.14.0-linux-x86_64.tar.gz

 wget  https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.2.0-linux-x86_64.tar.gz
 
 
tar -zxvf filebeat-7.2.0-linux-x86_64.tar.gz
mv filebeat-7.2.0-linux-x86_64 /usr/share/
cd /usr/share/filebeat-7.2.0-linux-x86_64/

制作基础的unbantu镜像

why unbantu? not alpine? not centos?

Alpine 只有仅仅 5 MB 大小,并且拥有很友好的包管理机制。

Docker 官方推荐使用 Alpine 替代 Ubuntu 做为容器的基础镜像。

曾经尝试使用alpine:3.7作为底层镜像, 按照zookeeper,但是一直启动不来,换成了centos的镜像,排查过程反复实验,耗时很久。

网上小伙伴构建filebeat镜像,基于alpine:3.7, 构建后的镜像运行时报“standard_init_linux.go:190: exec user process caused “no such file or directory””,故最后还是选择ubuntu。

这里选择ubuntu的原因,是其作为底层打包出来的镜像比centos要小很多。

# 基础镜像 生成的镜像作为基础镜像
FROM ubuntu:18.04


# 指定维护者的信息
MAINTAINER 尼恩@疯狂创客圈

# RUN apt-get update  && apt-get -y install openjdk-8-jdk



#install wget,sudo,python,vim,ping and ssh command

RUN sed -i s@/archive.ubuntu.com/@/mirrors.aliyun.com/@g /etc/apt/sources.list   && apt-get  clean && \
    apt-get update && apt-get -y install wget && apt-get -y install sudo && \
    apt-get -y install iputils-ping && \
    apt-get -y install net-tools && \
    apt install -y tzdata && \
    rm -rf /etc/localtime  && ln -s /usr/share/zoneinfo/Asia/Shanghai /etc/localtime &&  dpkg-reconfigure -f noninteractive tzdata && \
    apt-get  clean

 #    echo "Asia/Shanghai" > /etc/timezone &&  dpkg-reconfigure -f noninteractive tzdata && \
 


# RUN dpkg-reconfigure -f noninteractive tzdata
    
# RUN apt-get clean


 #apt-get -y install python && \
 # apt-get -y install vim && \
 #  apt-get -y install openssh-server && \
 # apt-get -y install python-pip  && \


# 复制并解压
ADD jdk-8u121-linux-x64.tar.gz /usr/local/


ENV work_path /usr/local
WORKDIR $work_path

# java
ENV JAVA_HOME /usr/local/jdk1.8.0_121
ENV JRE_HOME  /usr/local/jdk1.8.0_121/jre
ENV CLASSPATH .:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
ENV PATH ${PATH}:${JAVA_HOME}/bin

dockfile add命令:

ADD指令的功能是将主机构建环境(上下文)目录中的文件和目录、以及一个URL标记的文件 拷贝到镜像中。

其格式是: ADD 源路径 目标路径

注意事项:

1、如果源路径是个文件,且目标路径是以 / 结尾, 则docker会把目标路径当作一个目录,会把源文件拷贝到该目录下。

如果目标路径不存在,则会自动创建目标路径。

2、如果源路径是个文件,且目标路径是不是以 / 结尾,则docker会把目标路径当作一个文件。

如果目标路径不存在,会以目标路径为名创建一个文件,内容同源文件;

如果目标文件是个存在的文件,会用源文件覆盖它,当然只是内容覆盖,文件名还是目标文件名。

如果目标文件实际是个存在的目录,则会源文件拷贝到该目录下。 注意,这种情况下,最好显示的以 / 结尾,以避免混淆。

3、如果源路径是个目录,且目标路径不存在,则docker会自动以目标路径创建一个目录,把源路径目录下的文件拷贝进来。

如果目标路径是个已经存在的目录,则docker会把源路径目录下的文件拷贝到该目录下。

4、如果源文件是个归档文件(压缩文件,比如 .tar文件),则docker会自动帮解压。

推送镜像到dockerhub

这个镜像解决了jdk问题,时区问题

推送到了dockerhub,大家可以直接作为基础镜像使用

docker login

docker tag 8d0abdffe76f nien/ubuntu:18.04

docker push nien/ubuntu:18.04

制作filebeat镜像

官方文档

https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-getting-started.html

下载filebeat,下载命令如下:

https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.14.0-linux-x86_64.tar.gz

 wget  https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.2.0-linux-x86_64.tar.gz
 
 
tar -zxvf filebeat-7.2.0-linux-x86_64.tar.gz
mv filebeat-7.2.0-linux-x86_64 /usr/share/
cd /usr/share/filebeat-7.2.0-linux-x86_64/

dockerfile

# 基础镜像 生成的镜像作为基础镜像
FROM nien/ubuntu:18.04


# 指定维护者的信息
MAINTAINER 尼恩@疯狂创客圈


# 复制并解压
ADD filebeat-7.14.0-linux-x86_64.tar.gz /usr/local/

构建镜像

docker build -t filebeat:7.14.0  .

构建之后,进入容器,可以看到 /usr/local 目录下的filebeat-7.14.0-linux-x86_64

[root@cdh2 filebeat]# docker run -it filebeat:7.14.0 /bin/bash
root@7ba04f21f26e:/usr/local# ll
total 48
drwxr-xr-x 1 root root 4096 Apr  2 09:26 ./
drwxr-xr-x 1 root root 4096 Mar 16 03:27 ../
drwxr-xr-x 2 root root 4096 Mar 16 03:27 bin/
drwxr-xr-x 2 root root 4096 Mar 16 03:27 etc/
drwxr-xr-x 5 root root 4096 Apr  2 09:26 filebeat-7.14.0-linux-x86_64/
drwxr-xr-x 2 root root 4096 Mar 16 03:27 games/
drwxr-xr-x 2 root root 4096 Mar 16 03:27 include/
drwxr-xr-x 8 uucp  143 4096 Dec 13  2016 jdk1.8.0_121/
drwxr-xr-x 2 root root 4096 Mar 16 03:27 lib/
lrwxrwxrwx 1 root root    9 Mar 16 03:27 man -> share/man/
drwxr-xr-x 2 root root 4096 Mar 16 03:27 sbin/
drwxr-xr-x 1 root root 4096 Apr  2 00:44 share/
drwxr-xr-x 2 root root 4096 Mar 16 03:27 src/

推送镜像到dockerhub

这个镜像解决了jdk问题,时区问题

推送到了dockerhub,大家可以直接作为基础镜像使用

[root@cdh2 filebeat]# docker tag fb44037ab5f9 nien/filebeat:7.14.0

[root@cdh2 filebeat]# docker push nien/filebeat:7.14.0
The push refers to repository [docker.io/nien/filebeat]
069c957c7a4e: Pushing [=======>                                           ]  19.99MB/140MB
b17e3cbc28a1: Mounted from nien/ubuntu
5695cc8dd56c: Mounted from nien/ubuntu
9d6787a516e7: Mounted from nien/ubuntu


如果要收集日志,就可以用这个基础镜像加点配置就ok啦

message-dispatcher微服务的filebeat.yml配置:

filebeat.inputs:
- type: message-dispatcher
  enabled: true
  paths:
    - /work/logs/output.log
output.logstash:
  hosts: ["logstash:5044"]

filebeat.yml的参考配置:

#=========================== Filebeat inputs =============================

filebeat.inputs:

- type: log
  enabled: true     #默认为false,修改为true则启用该配置
  paths:
    - /home/logs/*.log
  fields:
    filetype: test1    #自定义字段,用来区分多个类型日志
  fields_under_root: true    #如果该选项设置为true,则新增fields成为顶级目录,而不是将其放在fields目录下

#============================= Filebeat modules ===============================

filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false

#==================== Elasticsearch template setting ==========================

setup.template.settings:
  index.number_of_shards: 1
  #index.codec: best_compression
  #_source.enabled: false

#================================ Outputs =====================================
#直接将log数据传输到Elasticsearch
#-------------------------- Elasticsearch output ------------------------------
output.elasticsearch:
  # Array of hosts to connect to.
  hosts: ["localhost:9200"]
  username: "elastic"
  password: "elastic"

#----------------------------- Logstash output --------------------------------#将log数据传输到logstash#先启动logstash,不然的话filebeat会找不到logstash的5044端口
output.logstash:
  # The Logstash hosts
  hosts: ["localhost:5044"]

message-dispatcher微服务的filebeat.yml配置:

主要配置的是日志的搜集目录为/work/logs/output.log,这个目录是应用message-dispatcher输出日志的文件。

由于其他的微服务也是固定在这个 文件,

在这里插入图片描述

所以这个路径,基本可以固定。

message-dispatcher微服务的filebeat.yml配置:

输出到logstsh的地址为logstash,这里用的是容器的名称, logstash和 这个微服务,需要在同一个网络。

如果不是,可以使用虚拟机的名称,然后把 5044,映射到15044

filebeat.inputs:
- type: message-dispatcher
  enabled: true
  paths:
    - /work/logs/output.log
output.logstash:
  hosts: ["cdh2:15044"]

启动filebeat,执行一下命令:

nohup  /user/local/filebeat-7.14.0-linux-x86_64/filebeat  -c /work/filebeat/filebeat.yaml   >> /work/filebeat/out.log 2>&1  &

修改message-dispatcher的dockerfile

FROM  nien/filebeat:7.14.0

# 指定维护者的信息
MAINTAINER 尼恩@疯狂创客圈


ADD dispatcher-provider-1.0-SNAPSHOT.jar  /app/message-dispatcher.jar
ADD deploy-sit.sh  /app/run.sh
RUN chmod +x /app/run.sh

# WORKDIR /app/

ENTRYPOINT /bin/bash -c  "/app/run.sh start"
# ENTRYPOINT /bin/bash 

一键发布message-dispatcher

mkdir -p  /home/docker-compose/sit-ware/message-dispatcher

cd /home/docker-compose/sit-ware/message-dispatcher

docker-compose down
 
docker image  rm message-dispatcher:1.0-SNAPSHOT

rm -rf  /home/docker-compose/sit-ware/message-dispatcher

cp -rf /vagrant/sit-ware/message-dispatcher  /home/docker-compose/sit-ware


cd /home/docker-compose/sit-ware

chmod 777 -R  message-dispatcher

cd message-dispatcher

docker-compose --compatibility  up  -d

docker-compose  logs -f


docker-compose down

mysql/5.7.31

docker save mysql:5.7.31  -o /vagrant/3G-middleware/mysql.5.7.31.tar

docker save registry.cn-hangzhou.aliyuncs.com/zhengqing/canal-server:v1.1.5        -o /vagrant/3G-middleware/canal-server.v1.1.5.tar

docker save registry.cn-hangzhou.aliyuncs.com/zhengqing/canal-admin:v1.1.5        -o /vagrant/3G-middleware/canal-admin.v1.1.5.tar


docker exec -it message-dispatcher /bin/bash

tail -f /work/filebeat/out.log 


java -server -Xms64m -Xmx256m   -Dserver.port=7790 -Dspring.profiles.active=sit  -jar  /vagrant/sit-ware/springcloud-gateway/springcloud-gateway-1.0-SNAPSHOT.jar com.crazymaker.cloud.nacos.demo.gateway.starter.GatewayProviderApplication


java -server -Xms64m -Xmx256m -javaagent: -Dskywalking.agent.service_name=springcloud-gateway -Dskywalking.collector.backend_service=127.0.0.1  -Dserver.port=7790 -Dspring.profiles.active=sit  -jar /app/springcloud-gateway.jar com.crazymaker.cloud.nacos.demo.gateway.starter.GatewayProviderApplication

启动之后

spatcher    | ----------------------------------------------------------
message-dispatcher    |         UAA 推送中台 push-provider is running! Access URLs:
message-dispatcher    |         Local:          http://127.0.0.1:7703/message-dispatcher-provider/
message-dispatcher    |         swagger-ui:     http://127.0.0.1:7703/message-dispatcher-provider/swagger-ui.html
message-dispatcher    |         actuator:       http://127.0.0.1:7703/message-dispatcher-provider/actuator/info
message-dispatcher    |         ----------------------------------------------------------
message-di

http://cdh2:7703/message-dispatcher-provider/swagger-ui.html

message-dispatcher微服务的日志

在SpringBoot应用message-dispatcher微服务的日志,输出日志如下:

[root@cdh2 filebeat]# cd  /home/docker-compose/sit-ware/message-dispatcher/work/logs/
[root@cdh2 logs]# cat output.log
2022-04-02 09:03:30.103 [background-preinit] DEBUG o.h.v.m.ResourceBundleMessageInterpolator:89 - Loaded expression factory via original TCCL
2022-04-02 09:03:59.633 [main] INFO  o.s.c.s.PostProcessorRegistrationDelegate$BeanPostProcessorChecker:330 - Bean 'org.springframework.cloud.autoconfigure.ConfigurationPropertiesRebinderAutoConfiguration' of type [org.springframework.cloud.autoconfigure.ConfigurationPropertiesRebinderAutoConfiguration$$EnhancerBySpringCGLIB$$e81692de] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2022-04-02 09:04:05.331 [main] INFO  c.a.n.client.config.impl.LocalConfigInfoProcessor:195 - LOCAL_SNAPSHOT_PATH:/root/nacos/config
2022-04-02 09:04:06.034 [main] INFO  com.alibaba.nacos.client.config.impl.Limiter:53 - limitTime:5.0
2022-04-02 09:04:06.899 [main] INFO  com.alibaba.nacos.client.config.utils.JVMUtil:47 - isMultiInstance:false
2022-04-02 09:04:07.068 [main] WARN  c.a.cloud.nacos.client.NacosPropertySourceBuilder:87 - Ignore the empty nacos configuration and get it based on dataId[message-dispatcher-provider] & group[DEFAULT_GROUP]
2022-04-02 09:04:07.100 [main] WARN  c.a.cloud.nacos.client.NacosPropertySourceBuilder:87 - Ignore the empty nacos configuration and get it based on dataId[message-dispatcher-provider.yml] & group[DEFAULT_GROUP]
2022-04-02 09:04:07.191 [main] INFO  o.s.c.b.c.PropertySourceBootstrapConfiguration:101 - Located property source: CompositePropertySource {name='NACOS', propertySources=[NacosPropertySource {name='message-dispatcher-provider-sit.yml,DEFAULT_GROUP'}, NacosPropertySource {name='message-dispatcher-provider.yml,DEFAULT_GROUP'}, NacosPropertySource {name='message-dispatcher-provider,DEFAULT_GROUP'}, NacosPropertySource {name='sharding-db-dev.yml,DEFAULT_GROUP'}]}
2022-04-02 09:04:07.304 [main] INFO  c.c.s.message.start.MessageDispatchApplication:652 - The following profiles are active: sit
2022-04-02 09:04:28.417 [main] INFO  o.s.d.r.config.RepositoryConfigurationDelegate:247 - Multiple Spring Data modules found, entering strict repository configuration mode!
2022-04-02 09:04:28.418 [main] INFO  o.s.d.r.config.RepositoryConfigurationDelegate:127 - Bootstrapping Spring Data JPA repositories in DEFAULT mode.
2022-04-02 09:04:34.251 [main] INFO  o.s.d.r.config.RepositoryConfigurationDelegate:185 - Finished Spring Data repository scanning in 5673ms. Found 3 JPA repository interfaces.
2022-04-02 09:04:37.630 [main] WARN  o.springframework.boot.actuate.endpoint.EndpointId:131 - Endpoint ID 'nacos-config' contains invalid characters, please migrate to a valid format.
2022-04-02 09:07:17.969 [main] ERROR org.springframework.boot.SpringApplication:823 - Application run failed
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'messageController': Injection of resource dependencies failed; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'messagePushServiceImpl': Injection of resource dependencies failed; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'rocketmqMessageService' defined in URL [jar:file:/app/message-dispatcher.jar!/BOOT-INF/classes!/com/crazymaker/springcloud/message/service/impl/RocketmqMessageService.class]: Initialization of bean failed; nested exception is java.lang.IllegalStateException: org.apache.rocketmq.remoting.exception.RemotingTimeoutException: wait response on the channel <dh2/192.168.56.122:9876> timeout, 3000(ms)
        at org.springframework.context.annotation.CommonAnnotationBeanPostProcessor.postProcessProperties(CommonAnnotationBeanPostProcessor.java:325)
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.populateBean(AbstractAutowireCapableBeanFactory.java:1404)
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:592)
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:515)

然后在部署了filebeat的机器上部署该应用,应用的输出文件为/var/log/service-hi.log,应用启动命令如下:

1 nohup java -jar  elk-test-0.0.1-SNAPSHOT.jar > /var/log/service-hi.log 2>&1  &

应用启动成功后日志输出如下:

1 2019-07-02 17:13:13.530  INFO 31579 --- [pool-1-thread-1] com.example.elktest.ElkTestApplication   : seed:562779
2 2019-07-02 17:13:13.630  INFO 31579 --- [pool-1-thread-1] com.example.elktest.ElkTestApplication   : seed:963836
3 2019-07-02 17:13:13.730  INFO 31579 --- [pool-1-thread-1] com.example.elktest.ElkTestApplication   : seed:825694
4 2019-07-02 17:13:13.830  INFO 31579 --- [pool-1-thread-1] com.example.elktest.ElkTestApplication   : seed:33228
5 2019-07-02 17:13:13.930  INFO 31579 --- [pool-1-thread-1] com.example.elktest.ElkTestApplication   : seed:685589

这时的日志数据的传输路径如下图:

img

查看日志索引

docker run --name filebeat -d  \
-v /home/qw/elk/filebeat/filebeat.yml:/usr/share/filebeat/filebeat.yml \
-v /home/qw/elk/testlog/:/home/ \
 elastic/filebeat:7.2.0

效果

在这里插入图片描述

可以看到 在kibana中多了两个索引
需要配置

在这里插入图片描述
创建一个
在这里插入图片描述
选择
在这里插入图片描述
最终展示

在这里插入图片描述
到这里简单收集日志就完成了,需要更多复杂业务配置,需要大家根据需求自己配置详细信息.

logstash 详解

Logstash 是一款强大的数据处理工具,它可以实现数据传输,格式处理,格式化输出,

logstash 还有强大的插件功能,常用于日志处理.
logstash我们只让它进行日志处理,处理完之后将其输出到elasticsearch。

官方文档

https://www.elastic.co/guide/en/logstash/7.17/index.html

stash第一个事件

Logstash管道有两个必需元素,输入和输出,以及一个可选元素filter。

输入插件使用来自源的数据,过滤器插件在您指定时修改数据,输出插件将数据写入目标。
如下图

img

根据官方文档Logstash对数据的处理主要流程是

  1. 首先数据传入logstash,在其内部对数据进行过滤和处理
  2. logstash将处理过的数据传递给Elasticsearch
  3. Elasticsearch对数据进行存储、创建索引等内容
  4. kibana对数据提供可视化的支持

Logstash的核心流程的三个环节

Logstash核心分三个环节:

  • 数据输入
  • 数据处理
  • 数据输出

其数据输入、处理、输出主要在配置中间中下面部分进行配置

input {}
filter {}
output {}

logstash数值类型

  • 数组

match =>[“datetime”, “UNIX”, “ISO8601”]

  • 布尔

必须是一个true或false

ssl_enable => true

  • 字节

一个字段是字节字符串字段表示有效字节的单元。它是一种方便的方式在特定尺寸的插件选项。

支持SI (k M G T P E Z Y)和Binary (TiKimigipiziyiei)单位。

二进制单元在基座单元和Si-1024在基底1000。

这个字段是大小写敏感的。如果未指定单位,则整数表示的字符串的字节数。

my_bytes => "1113" # 1113 bytes 

my_bytes => "10MiB" # 10485760 bytes

 my_bytes => "100kib" # 102400bytes 

my_bytes => "180 mb"# 180000000 bytes
  • 编解码器

codec => “json”

  • 哈希

哈希是一个键值对的集合中指定的格式,多个键值对的条目以空格分隔而不是逗号。

match => { “field1” => “value1” “field2” =>“value2” … }

  • 数字

数字必须有效的数字值(浮点或整数)。

port => 33

  • 密码

密码是一个字符串的单个值,则不对其进行记录或打印。

my_password => “password”

  • uri

my_uri =>“http://foo:bar@example.net”

  • 路径

一个路径是一个字符串,表示系统运行的有效路径。

my_path =>“/tmp/logstash”

  • 转义序列

默认地,转义字符没有被启用。如果你希望使用转义字符串序列,您需要在你的logstash.yml中设置config.support_escapes: true

TextResult
\rcarriage return (ASCII 13)
\nnew line (ASCII 10)
\ttab (ASCII 9)
\backslash (ASCII 92)
"double quote (ASCII 34)
single quote (ASCII 39)

logstash 条件判断

有时您只想在特定条件下过滤或输出事件。为此,您可以使用条件。

Logstash中的条件查看和行为与编程语言中的条件相同。条件语句支持if,else if以及else报表和可以被嵌套。

条件语法

if EXPRESSION{ … } else if EXPRESSION { … } else { … }

logstash 比较运算符

等于: ==, !=, <, >, <=, >=
  正则: =~, !~ (checks a pattern on the right against a string value on the left)
  包含关系: in, not in

支持的布尔运算符:and, or, nand, xor

支持的一元运算符: !

作用符号
等于==
不等于!=
小于<
大于>
小于等于<=
大于等于>=
匹配正则=~
不匹配正则!~
包含in
不包含not in
and
or
非与nand
非或xor
复合表达式()
取反符合!()

数据输入环节

input配置定义了数据的来源。其主要支持下面方式

事件源可以是从stdin屏幕输入读取,可以从file指定的文件,也可以从es,filebeat,kafka,redis等读取

stdin

监控控制台输入。

要测试Logstash安装成功,运行最基本的Logstash管道。 执行以下的命令

bin/logstash -e 'input { stdin { } } output { stdout {} }'

-e 标志使您可以直接从命令行指定配置。

通过在命令行指定配置,可以快速测试配置,而无需在迭代之间编辑文件。

示例中的管道从标准输入stdin获取输入,并以结构化格式将输入移动到标准输出stdout。

启动Logstash后,等到看到“Pipeline main started”,然后在命令提示符下输入hello world,显示的如下:

hello world
{
     "host" => "VM_0_13_centos",
     "message" => "hello world",
     "@version" => "1",
    "@timestamp" => 2019-07-02T06:26:28.684Z
}

file

监控文件内容

file{
    path => ['/var/log/nginx/access.log']  #要输入的文件路径
    type => 'nginx_access_log'
    start_position => "beginning"
}

  • path 可以用/var/log/.log,/var/log/**/.log,

  • type 通用选项. 用于激活过滤器

  • start_position 选择logstash开始读取文件的位置,begining或者end。

还有一些常用的例如:discover_interval,exclude,sincedb_path,sincedb_write_interval等可以参考官网

syslogs

从syslogs读取数据

syslog{
    port =>"514" 
    type => "syslog"
}

# port 指定监听端口(同时建立TCP/UDP的514端口的监听)

#从syslogs读取需要实现配置rsyslog:
# cat /etc/rsyslog.conf   加入一行
*.* @172.17.128.200:514   #指定日志输入到这个端口,然后logstash监听这个端口,如果有新日志输入则读取
# service rsyslog restart   #重启日志服务

beats

从Elastic beats接收数据

beats {
    port => 5044   #要监听的端口
}
# 还有host等选项

# 从beat读取需要先配置beat端,从beat输出到logstash。
# vim /etc/filebeat/filebeat.yml 
..........
output.logstash:
hosts: ["localhost:5044"]

kafka

从kafka topic中读取数据

kafka{
    bootstrap_servers=> "kafka01:9092,kafka02:9092,kafka03:9092"
    topics => ["access_log"]
    group_id => "logstash-file"
    codec => "json"
}
kafka{
    bootstrap_servers=> "kafka01:9092,kafka02:9092,kafka03:9092"
    topics => ["weixin_log","user_log"]  
    codec => "json"
}


# bootstrap_servers 用于建立群集初始连接的Kafka实例的URL列表。
# topics  要订阅的主题列表,kafka topics
# group_id 消费者所属组的标识符,默认为logstash。kafka中一个主题的消息将通过相同的方式分发到Logstash的group_id
# codec 通用选项,用于输入数据的编解码器。

数据处理环节

filter plugin 过滤器插件,主要是对数据进行处理。

grok解析文本并构造

Grok 是一个十分强大的 Logstash Filter 插件,它可以通过正则解析任意文本,将非结构化日志数据格式转换为结构化的、方便查询的结构。

它是目前 Logstash 中解析非结构化日志数据最好的方式。

Grok 的语法规则是:
这里的 “语法” 指的是匹配模式,例如,使用 NUMBER 模式可以匹配出数字,IP 模式则会匹配出 127.0.0.1 这样的 IP 地址。比如按以下格式输入内容:

172.16.213.132 [16/Jun/2020:16:24:19 +0800] "GET / HTTP/1.1" 403 5039

那么,
• %{IP:clientip} 匹配模式将获得的结果为:clientip: 172.16.213.132
• %{HTTPDATE:timestamp} 匹配模式将获得的结果为:timestamp: 16/Jun/2020:16:24:19 +0800
• %{QS:referrer} 匹配模式将获得的结果为:referrer: “GET / HTTP/1.1”
到这里为止,我们已经获取了上面输入中前三个部分的内容,分别是 clientip、timestamp 和 referrer 三个字段。

如果要获取剩余部分的信息,方法类似。

要在线调试 Grok,可以点击在线调试,可点击这里进行在线调试,非常方便。

下面是一个组合匹配模式,它可以获取上面输入的所有内容:

%{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}

正则匹配是非常严格的匹配,在这个组合匹配模式中,使用了转义字符 \,这是因为输入的内容中有空格和中括号。

通过上面这个组合匹配模式,我们将输入的内容分成了 5 个部分,即 5 个字段。

将输入内容分割为不同的数据字段,这对于日后解析和查询日志数据非常有用,这正是我们使用 grok 的目的。

Logstash 默认提供了近 200 个匹配模式(其实就是定义好的正则表达式)让我们来使用,可以在 Logstash 安装目录下找到。

例如,我这里的路径为:

/usr/local/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-patterns-core-4.1.2/patterns

此目录下有定义好的各种匹配模式,基本匹配定义在 grok-patterns 文件中。

从这些定义好的匹配模式中,可以查到上面使用的四个匹配模式对应的定义规则。

除此之外,还有很多默认定义好的匹配模式文件,比如 httpd、java、linux-syslog、redis、mongodb、nagios 等,这些已经定义好的匹配模式,可以直接在 Grok 过滤器中进行引用。

当然也可以定义自己需要的匹配模式。

在了解完 Grok 的匹配规则之后,下面通过一个配置实例深入介绍下 Logstash 是如何将非结构化日志数据转换成结构化数据的。

首先看下面的一个事件配置文件:

input{
  stdin{}
}
filter{
   grok{
     match => ["message", "%{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %         {NUMBER:response}\ %{NUMBER:bytes}"]
   }
}
output{
   stdout{
     codec => "rubydebug"
   }
}

在这个配置文件中,输入配置成了 stdin,在 filter 中添加了 grok 过滤插件,并通过 match 来执行正则表达式解析,

grok 中括号中的正则表达式就是上面提到的组合匹配模式,然后通过 rubydebug 编码格式输出信息。

这样的组合有助于调试和分析输出结果。

通过此配置启动 Logstash 进程后,我们仍然输入之前给出的那段内容:

172.16.213.132 [16/Jun/2020:16:24:19 +0800] "GET / HTTP/1.1" 403 5039

然后,查看 rubydebug 格式的日志输出,内容如下:

{
     "timestamp" => "16/Jun/2020:16:24:19 +0800",
      "response" => "403",
         "bytes" => "5039",
      "@version" => "1",
      "clientip" => "172.16.213.132",
          "host" => "nnmaster.cloud",
      "referrer" => "\"GET / HTTP/1.1\"",
       "message" => "172.16.213.132 [16/Jun/2020:16:24:19 +0800] \"GET / HTTP/1.1\" 403 5039",
    "@timestamp" => 2020-06-16T07:46:53.120Z
}

从这个输出可知,通过 Grok 定义好的 5 个字段都获取到了内容,并正常输出了。

date日期解析

解析字段中的日期,然后转存到@timestamp

[2018-07-04 17:43:35,503]

grok{
      match => {"message"=>"%{DATA:raw_datetime}"}
}
date{
      match => ["raw_datetime","YYYY-MM-dd HH:mm:ss,SSS"]
      remove_field =>["raw_datetime"]
}
#将raw_datetime存到@timestamp 然后删除raw_datetime



#24/Jul/2018:18:15:05 +0800
date {
      match => ["timestamp","dd/MMM/YYYY:HH:mm:ss Z]
}



mutate字段转换

mutate字段转换, 对字段做处理 重命名、删除、替换和修改字段。

Mutate过滤器的配置选项

选项类型是否必须简述
converthashNo转化命令,是对字段类型做转化,例如:String转为integer
copyhashNo将一个已经存在的字段复制给另一个字段。
gsubarrayNo通过正则表达式匹配字段的值,然后替换为指定的字符串。
joinhashNo使用分隔符连接数组。
lowercasearrayNo将string类型的字段值转化为小写的形式。
mergehashNo合并两个数组或者Hash类型的字段。string类型的字段会自动的合并为一个数组。
coercehashNo为存在但是不为空的字段设置默认值
renamehashNo字段重命名
replacehashNo将一个字段的值替换为一个新的值。
splithashNo将一个字段按照指定符号切割为数组。
striparrayNo去除字段中的空格。
updatehashNo更新字段为一个新值。
uppercasearrayNo将字符串字段转化为大写形式。
capitalizearrayNo将字符串字段转化为首字母大写的形式。
tag_on_failurestringNo错误发生时的配置

covert类型转换

covert:类型转换。类型包括:integer,float,integer_eu,float_eu,string和boolean

  • 字段类型为 hash
  • 没有默认值

将字段转化为不同的类型,例如:string 转 integer。

如果被转化的字段类型是数组,数组的所有成员都将被转化。如果对象是hash 就不会进行转化。

实例:

filter {
  mutate {
    convert => {
      "fieldname" => "integer"
      "booleanfield" => "boolean"
    }
  }
}

split

split:使用分隔符把字符串分割成数组

eg:

mutate{
    split => {"message"=>","}
}

aaa,bbb


{
    "@timestamp" => 2018-06-26T02:40:19.678Z,
      "@version" => "1",
          "host" => "localhost",
       "message" => [
        [0] "aaa",
        [1] "bbb"
    ]}

192,128,1,100

{
        "host" => "localhost",
     "message" => [
      [0] "192",
      [1] "128",
      [2] "1",
      [3] "100"
 ],
  "@timestamp" => 2018-06-26T02:45:17.877Z,
    "@version" => "1"
}


mutate{
    split => {"message"=>","}
}

merge

merge:合并字段 。数组和字符串 ,字符串和字符串

eg:

filter{
    mutate{
        add_field => {"field1"=>"value1"}
    }
    mutate{ 
          split => {"message"=>"."}   #把message字段按照.分割
    }
    mutate{
        merge => {"message"=>"field1"}   #将filed1字段加入到message字段
    }
}

输入:abc



{
       "message" => [
        [0] "abc,"
        [1] "value1"
    ],
    "@timestamp" => 2018-06-26T03:38:57.114Z,
        "field1" => "value1",
      "@version" => "1",
          "host" => "localhost"
}

输入:abc,.123



{
       "message" => [
        [0] "abc,",
        [1] "123",
        [2] "value1"
    ],
    "@timestamp" => 2018-06-26T03:38:57.114Z,
        "field1" => "value1",
      "@version" => "1",
          "host" => "localhost"
}

rename

rename:对字段重命名

filter{
    mutate{
        rename => {"message"=>"info"}
    }
}

123


{
    "@timestamp" => 2018-06-26T02:56:00.189Z,
          "info" => "123",
      "@version" => "1",
          "host" => "localhost"
}



remove_field:移除字段

mutate {
    remove_field => ["message","datetime"]
}

join

join:用分隔符连接数组,如果不是数组则不做处理

mutate{
        split => {"message"=>":"}
}
mutate{
        join => {"message"=>","}
}

abc:123
{
    "@timestamp" => 2018-06-26T03:55:41.426Z,
       "message" => "abc,123",
          "host" => "localhost",
      "@version" => "1"
}
aa:cc
{
    "@timestamp" => 2018-06-26T03:55:47.501Z,
       "message" => "aa,cc",
          "host" => "localhost",
      "@version" => "1"
}


gsub:用正则或者字符串替换字段值。仅对字符串有效

mutate{
        gsub => ["message","/","_"]   #用_替换/
    }

------>
a/b/c/
{
      "@version" => "1",
       "message" => "a_b_c_",
          "host" => "localhost",
    "@timestamp" => 2018-06-26T06:20:10.811Z
}

update:更新字段。如果字段不存在,则不做处理

mutate{
        add_field => {"field1"=>"value1"}
    }
    mutate{
        update => {"field1"=>"v1"}
        update => {"field2"=>"v2"}    #field2不存在 不做处理
    }
---------------->
{
    "@timestamp" => 2018-06-26T06:26:28.870Z,
        "field1" => "v1",
          "host" => "localhost",
      "@version" => "1",
       "message" => "a"
}


replace:更新字段。如果字段不存在,则创建

mutate{
        add_field => {"field1"=>"value1"}
    }
    mutate{
        replace => {"field1"=>"v1"}
        replace => {"field2"=>"v2"}
    }
---------------------->
{
       "message" => "1",
          "host" => "localhost",
    "@timestamp" => 2018-06-26T06:28:09.915Z,
        "field2" => "v2",        #field2不存在,则新建
      "@version" => "1",
        "field1" => "v1"
}


geoip

根据来自Maxmind GeoLite2数据库的数据添加有关IP地址的地理位置的信息

 geoip {
            source => "clientip"
            database =>"/tmp/GeoLiteCity.dat"
        }

ruby

ruby插件可以执行任意Ruby代码

filter{
    urldecode{
        field => "message"
    }
    ruby {
        init => "@kname = ['url_path','url_arg']"
        code => " 
            new_event = LogStash::Event.new(Hash[@kname.zip(event.get('message').split('?'))]) 
            event.append(new_event)"
    }
    if [url_arg]{
        kv{
            source => "url_arg"
            field_split => "&"
            target => "url_args"
            remove_field => ["url_arg","message"]
        }
    }
}
# ruby插件
# 以?为分隔符,将request字段分成url_path和url_arg
-------------------->
www.test.com?test
{
       "url_arg" => "test",
          "host" => "localhost",
      "url_path" => "www.test.com",
       "message" => "www.test.com?test",  
      "@version" => "1",
    "@timestamp" =>  2018-06-26T07:31:04.887Z
}
www.test.com?title=elk&content=学习elk
{
      "url_args" => {
          "title" => "elk",
        "content" => "学习elk"
    },
          "host" => "localhost",
      "url_path" => "www.test.com",
      "@version" => "1",
    "@timestamp" =>  2018-06-26T07:33:54.507Z
}

urldecode

用于解码被编码的字段,可以解决URL中 中文乱码的问题

urldecode{
        field => "message"
    }

# field :指定urldecode过滤器要转码的字段,默认值是"message"
# charset(缺省): 指定过滤器使用的编码.默认UTF-8

kv

通过指定分隔符将字符串分割成key/value

kv{
        prefix => "url_"   #给分割后的key加前缀
        target => "url_ags"    #将分割后的key-value放入指定字段
        source => "message"   #要分割的字段
        field_split => "&"    #指定分隔符
        remove_field => "message"
    }
-------------------------->
a=1&b=2&c=3
{
            "host" => "localhost",
       "url_ags" => {
          "url_c" => "3",
          "url_a" => "1",
          "url_b" => "2"
    },
      "@version" => "1",
    "@timestamp" => 2018-06-26T07:07:24.557Z

useragent

添加有关用户代理(如系列,操作系统,版本和设备)的信息

if [agent] != "-" {
  useragent {
    source => "agent"
    target => "ua"
    remove_field => "agent"
  }
}
# if语句,只有在agent字段不为空时才会使用该插件
#source 为必填设置,目标字段
#target 将useragent信息配置到ua字段中。如果不指定将存储在根目录中


数据输出

output配置定义了数据输出目标

stdout

将数据输出到屏幕上

input{
      file{
        path=>"/home/order.log"
 	    discover_interval => 10 
 	    start_position => "beginning"
      }
}
output{
    stdout { codec => rubydebug }
}

在这里插入图片描述

file

将数据写入文件

读取指定文件-输出到文件

input{
      file{
        path=>"/home/order.log"
 	    discover_interval => 10 
 	    start_position => "beginning"
      }
}
output{
     file{
         path=>"/home/aaa.log"
      }
}

ps: 需要注意的是 这里的输出文件必须要求 w的权限 看看是否报错

如果报错需要进入容器赋权

kafka

数据发送到kafka

 kafka{
        bootstrap_servers => "localhost:9092"
        topic_id => "test_topic"  #必需的设置。生成消息的主题
    }

elasticseach

数据存储到elasticseach中

读取指定文件-输出到es

input{
      file{
            path=>"/home/order.log"
 	    discover_interval => 10 
 	    start_position => "beginning"
      }
}
output{
      elasticsearch{
            hosts=>["172.30.66.86:9200"]
            index => "test-%{+YYYY.MM.dd}"
      }
}

es的安全认证

通常搭建的elk默认是不需要身份认证,这样就会把数据暴露在外网,因此会显得非常危险。

下面我们介绍如何为es加入身份认证
es身份认证参考链接

切记,这里 修改es 配置文件和 启动es的二进制文件的时候 一定要用es系统用户不要用ubuntu或root用户操作。不然会报错。

配置了 安全认证后 logstash + filebeat +es +kibfana 都需要在配置文件中 加入 访问的账号密码来认证。
logstash 配置文件

elasticsearch {
      hosts => ["ip:9200"]
      user => elastic  --加入es用户
      password => xxxx   --加入es密码
      index => "test-%{+YYYY-MM-dd}"
      timeout => 300
  }

kibfana 配置文件

配置 Kibana 以使用内置 kibana 用户和您创建的密码

server.port: 5601
server.host: "0.0.0.0"
elasticsearch.hosts: ["http://localhost:9200"]
kibana.index: ".kibana"
i18n.locale: "zh-CN"  --配置 kibana 显示中文
elasticsearch.username: "kibana"   --加入kibana 账户
elasticsearch.password: "123456"   --加入kibana 账户的密码 

配置 elk的ElastAlert 预警插件

我们都知道 elk架构 是收集与分析 集群的日志 方便开发排错

但是 预警功能是缺一不可的,如果开发人员不能及时查看线上错误日式,这个时候 就需要我们的预警插件来实现实时推送告警。
ElastAlert : 是python开发一个插件因此需要配合python运行环境和python-pip 包管理工具,以及相关依赖包

1.安装相关依赖包

yum -y install openssl openssl-devel gcc gcc-c++  --centos系统安装方式
--ubuntu 安装方式
sudo apt-get install openssl  --openssl依赖包
sudo apt-get install libssl-dev  --openssl-devel 依赖包 
sudo apt-get  install  build-essential   --gcc 依赖包 注意:gcc和g++版本必须一致
sudo apt-get install g++ 7.4  --g++ 依赖包 
g++ --version --查看版本
gcc --version
wget https://www.python.org/ftp/python/3.6.9/Python-3.6.9.tgz --下载二进制python源码

2.安装python运行环境

tar xf Python-3.6.9.tgz
cd Python-3.6.9./configure --prefix=/usr/local/python --with-openssl
make && make install  --编译源码

配置

mv /usr/bin/python /usr/bin/python_old  //把ubuntu自带的python2.7环境移出到另外一个文件夹
ln -s /usr/local/python/bin/python3 /usr/bin/python  //建立python软链接
ln -s /usr/local/python/bin/pip3 /usr/bin/pip  //建立pip软链接
pip install --upgrade pip //此处没有安装pip的需要去安装pip
sudo apt install python3-pip //安装pip3.0版本 对应了python 3.6.9版本
//此处我没有动ubuntu自带的python2.7版本的 因此我们使用新的python使用3.6.9时,按以下方式使用:
python3.6 --version
python2.7 --version
pip3 --version
//使用python和pip命令时 都改为 python3.6与pip3

到此python环境配置完成

3.安装elastalert

下载源码

git clone https://github.com/Yelp/elastalert.git //下载 源码
cd elastalert
pip3 install "elasticsearch<8,>7"    
//因为我们的es是7.4.0,所以这里选用的版本是这个
pip3 install -r requirements.txt 用pip安装依赖

安装成功时候 /usr/local/python/bin/目录下会有四个文件

ls /usr/local/python/bin/elastalert* 或者这个目录下
ls /usr/local/bin/elastalert*

在这里插入图片描述

ln -s /usr/local/python/bin/elastalert* /usr/bin  //建立软链接把这四个命令链接到bin目录下

4. 配置ElastAlert
配置config.yaml 文件 (创建)

cp config.yaml.example  config.yaml 
sudo vi config.yaml

在这里插入图片描述

rules_folder:ElastAlert从中加载规则配置文件的位置。它将尝试加载文件夹中的每个.yaml文件。

没有任何有效规则,ElastAlert将无法启动。

run_every: ElastAlert多久查询一次Elasticsearch的时间。

buffer_time:查询窗口的大小,从运行每个查询的时间开始向后延伸。对于其中use_count_query或use_terms_query设置为true的规则,将忽略此值。

es_host:是Elasticsearch群集的地址,ElastAlert将在其中存储有关其状态,查询运行,警报和错误的数据。
es_port:es对应的端口。

es_username: 可选的; 用于连接的basic-auth用户名es_host。

es_password: 可选的; 用于连接的basic-auth密码es_host。

es_send_get_body_as: 可选的; 方法查询Elasticsearch - GET,POST或source。

默认是GET writeback_index:ElastAlert将在其中存储数据的索引的名称。我们稍后将创建此索引。

alert_time_limit: 失败警报的重试窗口。

创建elastalert-create-index索引 告警索引

$ elastalert-create-index
New index name (Default elastalert_status)
Name of existing index to copy (Default None)
New index elastalert_status created
Done!

5.配置Rule 告警规则配置
所有的告警规则,通过在example_rules目下创建配置文件进行定义,这里简单创建一个来作为演示

name: Nginx_err  //规则名称
use_strftine_index: true 
index: 10.0.0.153-system_cro-2020.11.18  //监听查询es的索引
type: any    //告警规则类型 有很多种 这种是 只要匹配到就触发告警
aggregation:
 seconds: 1    //告警频率
filter:
- query:
    query_string:
         query: "status:500 or status:404" //触发报警的匹配条件 这里可以用kibana的语法去匹配
num_events: 1  //事件触发次数 的贬值
timeframe:
  minutes: 1   //一分钟内超过 num_envents触发的次数 就触发告警
alert:
 - "email"   //告警类型 此处是email 例如钉钉 企业微信
email_format: html  //email 正文格式
alert_subject: "正式环境Error告警"  //告警正文标题
alert_text_type: alert_text_only   //正文类型
alert_text: "<br><br><h3>告警详情</h3><table><tr><td style='padding:5px;text-align: right;font-weight: bold;border-radius: 5px;background-color: #eef;'>@timestamp:</td><td style='padding:5px;border-radius: 5px;background-color: #eef;'>{}</td></tr><tr><td style='padding:5px;text-align: right;font-weight: bold;border-radius: 5px;background-color: #eef;'>@version:</td><td style='padding:5px;border-radius: 5px;background-color: #eef;'>{}</td></tr><tr><td style='padding:5px;text-align: right;font-weight: bold;border-radius: 5px;background-color: #eef;'>_id:</td><td style='padding:5px;border-radius: 5px;background-color: #eef;'>{}</td></tr><tr><td style='padding:5px;text-align: right;font-weight: bold;border-radius: 5px;background-color: #eef;'>_index:</td><td style='padding:5px;border-radius: 5px;background-color: #eef;'>{}</td></tr><tr><td style='padding:5px;text-align: right;font-weight: bold;border-radius: 5px;background-color: #eef;'>ip:</td><td style='padding:5px;border-radius: 5px;background-color: #eef;'>{}</td></tr><tr><td style='padding:5px;text-align: right;font-weight: bold;border-radius: 5px;background-color: #eef;'>request:</td><td style='padding:5px;border-radius: 5px;background-color: #eef;'>{}</td></tr><tr><td  style='padding:5px;text-align: right;font-weight: bold;border-radius: 5px;background-color: #eef;'>status:</td><td style='padding:5px;border-radius: 5px;background-color: #eef;'>{}</td></tr><tr><td style='padding:5px;text-align: right;font-weight: bold;border-radius: 5px;background-color: #eef;'>method:</td><td style='padding:5px;border-radius: 5px;background-color: #eef;'>{}</td></tr><tr><td style='padding:5px;text-align: right;font-weight: bold;border-radius: 5px;background-color: #eef;'>bytes:</td><td style='padding:5px;border-radius: 5px;background-color: #eef;'>{}</td></tr><tr><td style='padding:5px;text-align: right;font-weight: bold;border-radius: 5px;background-color: #eef;'>source:</td><td style='padding:5px;border-radius: 5px;background-color: #eef;'>{}</td></tr><tr><td style='padding:5px;text-align: right;font-weight: bold;border-radius: 5px;background-color: #eef;'>client_ip:</td><td style='padding:5px;border-radius: 5px;background-color: #eef;'>{}</td></tr><tr><td style='padding:5px;text-align: right;font-weight: bold;border-radius: 5px;background-color: #eef;'>httpversion:</td><td style='padding:5px;border-radius: 5px;background-color: #eef;'>{}</td></tr></table>"  //正文内容
alert_text_args:
 - "@timestamp"   //使用的是python的format格式动态填充数据
 - "@version"     //这些是属性值 按顺序对饮正文内容里面的 {} 
 - _id
 - _index
 - host.name
 - request
 - status
 - method
 - bytes
 - message
 - remote_ip
 - httpversion
email:
 - "xxx@xx.com"  //收件人 多个请依次往下填写
 - "xxxx@qq.com"
 - "xxxx@xx.com"
smtp_host: smtp.mxhichina.com  //邮件服务器
smtp_port: 25   //邮件端口
smtp_auth_file: /home/ubuntu/elk/alert/elastalert/smtp_auth_file.yaml //此处新建了一个文件是 发件人的认证文件 存放发件人账户和密码或授权码
from_addr: haoyacong@gimmake.com  //发件人
email_reply_to: haoyacong@gimmake.com  //收件人标头

运行ElastAlert

cd ElastAlert  //ElastAlert 的安装目录
python3.6 -m elastalert.elastalert --verbose --config config.yaml --rule ./example_rules/nginx_404.yaml  //指定告警规则文件  
nohup python3.6 -m elastalert.elastalert --verbose --config config.yaml --rule ./example_rules/nginx_404.yaml & //在后台运行
//如果运行多个告警规则执行多个上面的命令  如果执行example_rules下的全部规则文件 使用以下命令:
nohup python3.6 -m elastalert.elastalert --verbose --config config.yaml &

参考链接

https://gitee.com/bison-fork/loki/blob/v2.2.1/production/docker-compose.yaml

SkyWalking官网 http://skywalking.apache.org/zh/
SkyWalking的docker github地址 https://github.com/apache/sky…
elasticsearch https://www.elastic.co/guide/…
skywalking中文文档 https://skyapm.github.io/docu…
agent config https://github.com/apache/sky…

skywalking和其它agent一起使用的处理

https://zhuanlan.zhihu.com/p/163809795

https://www.cnblogs.com/you-men/p/14900249.html

https://cloud.tencent.com/developer/article/1684909

https://www.cnblogs.com/javaadu/p/11742605.html

https://www.jianshu.com/p/2fa99bd1997e

https://blog.csdn.net/weixin_42073629/article/details/106775584

https://www.cnblogs.com/kebibuluan/p/14466285.html

https://blog.csdn.net/weixin_42073629/article/details/106775584

https://blog.csdn.net/Jerry_wo/article/details/107937902

https://www.cnblogs.com/wzxmt/p/11031110.html

https://blog.csdn.net/zhangshng/article/details/104558016

https://blog.csdn.net/yurun_house/article/details/109025588

相关文章:

  • 纯CSS锚点过渡效果,CSS3的属性scroll-behavior: smooth;
  • 手撕前端面试题【javascript~文件扩展名、分隔符、单向绑定、判断版本、深浅拷贝、内存泄露等】
  • 谷粒学苑项目后台管理系统
  • 数据分析之Numpy学习
  • es6的学习
  • CDGA|商业银行要强化数据风险管理
  • 怎么进行你的代码优化 编译器怎么优化你的代码
  • vue实战-Search模块开发(大体步骤)
  • C#基础--特殊的集合
  • 吴恩达2022机器学习_第二部分高级学习算法笔记
  • DGL教程
  • FAST-LIO2代码解析(五)
  • 苦卷28天,阿里P8给我的Alibaba面试手册,终于成功踹开字节大门
  • Vue:v-on、v-bind、v-model、@click、:model用法以及区别(附代码实例)
  • 手写Sping IOC(基于Setter方法注入)
  • 【跃迁之路】【585天】程序员高效学习方法论探索系列(实验阶段342-2018.09.13)...
  • AWS实战 - 利用IAM对S3做访问控制
  • C++回声服务器_9-epoll边缘触发模式版本服务器
  • golang 发送GET和POST示例
  • iOS小技巧之UIImagePickerController实现头像选择
  • MaxCompute访问TableStore(OTS) 数据
  • PHP的类修饰符与访问修饰符
  • python docx文档转html页面
  • ReactNative开发常用的三方模块
  • React中的“虫洞”——Context
  • SpringCloud(第 039 篇)链接Mysql数据库,通过JpaRepository编写数据库访问
  • swift基础之_对象 实例方法 对象方法。
  • Swoft 源码剖析 - 代码自动更新机制
  • vue2.0开发聊天程序(四) 完整体验一次Vue开发(下)
  • 浮动相关
  • 基于游标的分页接口实现
  • 如何用Ubuntu和Xen来设置Kubernetes?
  • 提升用户体验的利器——使用Vue-Occupy实现占位效果
  • 提醒我喝水chrome插件开发指南
  • 通信类
  • 我这样减少了26.5M Java内存!
  • Salesforce和SAP Netweaver里数据库表的元数据设计
  • ​LeetCode解法汇总2670. 找出不同元素数目差数组
  • # .NET Framework中使用命名管道进行进程间通信
  • #大学#套接字
  • (1)(1.13) SiK无线电高级配置(六)
  • (10)Linux冯诺依曼结构操作系统的再次理解
  • (C语言)字符分类函数
  • (Java岗)秋招打卡!一本学历拿下美团、阿里、快手、米哈游offer
  • (Redis使用系列) Springboot 实现Redis消息的订阅与分布 四
  • (附源码)spring boot基于小程序酒店疫情系统 毕业设计 091931
  • (附源码)springboot工单管理系统 毕业设计 964158
  • (紀錄)[ASP.NET MVC][jQuery]-2 純手工打造屬於自己的 jQuery GridView (含完整程式碼下載)...
  • (每日持续更新)信息系统项目管理(第四版)(高级项目管理)考试重点整理 第13章 项目资源管理(七)
  • (三)elasticsearch 源码之启动流程分析
  • (生成器)yield与(迭代器)generator
  • (一)C语言之入门:使用Visual Studio Community 2022运行hello world
  • (一)Mocha源码阅读: 项目结构及命令行启动
  • (一一四)第九章编程练习
  • *(长期更新)软考网络工程师学习笔记——Section 22 无线局域网