当前位置: 首页 > news >正文

ElasticSearch-ELK

  • Logstash
    • Logstash 配置文件结构
    • Logstash 导入数据到 ES
    • 同步数据库数据到 ES
  • FileBeat
  • ELK(采集 Tomcat 服务器日志)
    • 使用FileBeats将日志发送到Logstash
    • Logstash输出数据到Elasticsearch(logstash开头的索引)
      • 利用Logstash过滤器解析日志
        • 使用Grok插件通过模式匹配的方式来识别日志中的数据
        • 使用mutate插件过滤掉不需要的字段
        • 使用Date插件将日期格式进行转换
    • 输出到Elasticsearch指定索引
    • 完整的Logstash配置文件

Logstash

  • Logstash 是免费且开放的服务器端数据处理管道,能够从多个来源采集数据,转换数据,然后将数据发送到存储库

  • Pipeline

    • 包含了 input-filter-output 三个阶段的处理流程
    • 插件生命周期管理
    • 队列管理
  • Logstash Event

    • 数据在内部流转时的具体表现形式:数据在 input 阶段被转换为 Event,在 output 被转化成目标格式数据
    • Event 其实是一个 Java Object,在配置文件中,对 Event 的属性进行增删改查
  • Codec (Code / Decode):将原始数据decode成Event;将Event encode成目标数据

    • 在这里插入图片描述
  • Logstash数据传输原理

    • 数据采集与输入:Logstash支持各种输入选择,能够以连续的流式传输方式,轻松地从日志、指标、Web应用以及数据存储中采集数据
    • 实时解析和数据转换:通过Logstash过滤器解析各个事件,识别已命名的字段来构建结构,并将它们转换成通用格式,最终将数据从源端传输到存储库中
    • 存储与数据导出:Logstash提供多种输出选择,可以将数据发送到指定的地方
  • Logstash通过管道完成数据的采集与处理,管道配置中包含input、output和filter(可选)插件

    • input和output用来配置输入和输出数据源、filter用来对数据进行过滤或预处理
    • 在这里插入图片描述
  • Logstash 配置文件结构

    • Logstash的管道配置文件对每种类型的插件都提供了一个单独的配置部分,用于处理管道事件
    • 每个配置部分可以包含一个或多个插件
      • 指定多个filter插件,Logstash会按照它们在配置文件中出现的顺序进行处理
  • bin/logstash ‐f logstash‐demo.conf 运行

input {stdin { } 
}filter {grok {match => { "message" => "%{COMBINEDAPACHELOG}" }}date {match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ] }
}output {elasticsearch { hosts => ["localhost:9200"]} stdout { codec => rubydebug }
}
  • Logstash Queue

    • 在这里插入图片描述

      • In Memory Queue: 进程Crash,机器宕机,都会引起数据的丢失
      • Persistent Queue: 机器宕机,数据也不会丢失; 数据保证会被消费; 可以替代 Kafka等消息队列缓冲区的作用
    • queue.type: persisted 默认是 memory

    • queue.max_bytes: 4gb

  • Codec Plugin - Multiline

    • pattern: 设置行匹配的正则表达式
    • what : 如果匹配成功,那么匹配行属于上一个事件还是下一个事件
      • previous / next
    • negate : 是否对pattern结果取反
      • true / false
input {stdin {codec => multiline {pattern => "^\s"what => "previous"}}
}filter { }output {stdout { codec => rubydebug } 
}
  • Input Plugin - File
    • 支持从文件中读取数据,如日志文件
    • 文件读取需要解决的问题:只被读取一次。重启后需要从上次读取的位置继续 (通过 sincedb 实现)
    • 读取到文件新内容,发现新文件
    • 文件发生归档操作 (文档位置发生变化,日志 rotation),不能影响当前的内容读取
  • Filter Plugin:Filter Plugin可以对Logstash Event进行各种处理,例如解析,删除字段,类型转换
    • Date: 日期解析
    • Dissect: 分割符解析
    • Grok: 正则匹配解析
    • Mutate: 处理字段。重命名,删除,替换
    • Ruby: 利用Ruby 代码来动态修改Event
  • Filter Plugin - Mutate
    • Convert : 类型转换
    • Gsub : 字符串替换
    • Split / Join /Merge: 字符串切割,数组合并字符串,数组合并数组
    • Rename: 字段重命名
    • Update / Replace: 字段内容更新替换
    • Remove_field: 字段删除
  • Logstash 导入数据到 ES
input { file { path => "/home/es/logstash‐7.17.3/dataset/movies.csv"start_position => "beginning"sincedb_path => "/dev/null"}
}filter {csv {separator => ","columns => ["id","content","genre"] }mutate {split => { "genre" => "|" }remove_field => ["path", "host","@timestamp","message"] }mutate {split => ["content", "("]add_field => { "title" => "%{[content][0]}"} add_field => { "year" => "%{[content][1]}"} }mutate {convert => {"year" => "integer"}strip => ["title"]remove_field => ["path", "host","@timestamp","message","content"] }
}output {elasticsearch {hosts => "http://localhost:9200" index => "movies"document_id => "%{id}"user => "elastic"password => "123456"}stdout { }
}
  • 同步数据库数据到 ES: 借助 JDBC Input Plugin 将数据从数据库读到 Logstash
    • 需要自己提供所需的 JDBC Driver
    • JDBC Input Plugin 支持定时任务 Scheduling,其语法来自 Rufus-scheduler
      • 其扩展了 Cron,使用 Cron 的语法可以完成任务的触发
    • JDBC Input Plugin 支持通过 Tracking_column / sql_last_value 的方式记录 State,最终实现增量的更新
    • 需要拷贝 jdbc 依赖到 logstash/drivers 目录下
    • ES 创建 alias,只显示没有被标记 deleted 的用户
      • 需要数据库表中有 deleted 字段
input { jdbc { jdbc_driver_library => "/home/es/logstash‐7.17.3/drivers/mysql‐connector-java‐5.1.49.jar"jdbc_driver_class => "com.mysql.jdbc.Driver"jdbc_connection_string => "jdbc:mysql://localhost:3306/test?useSSL=false"jdbc_user => "root"jdbc_password => "123456"# 启用追踪,如果为true,则需要指定tracking_columnuse_column_value => true# 指定追踪的字段,tracking_column => "last_updated"# 追踪字段的类型,目前只有数字(numeric)和时间类型(timestamp),默认是数字类型 tracking_column_type => "numeric"# 记录最后一次运行的结果record_last_run => true# 上面运行结果的保存位置last_run_metadata_path => "jdbc‐position.txt"statement => "SELECT * FROM user where last_updated >:sql_last_value;" schedule => " * * * * * *"}
}output {elasticsearch {document_id => "%{id}"document_type => "_doc"index => "users"hosts => ["http://localhost:9200"] user => "elastic"password => "123456"}stdout{codec => rubydebug}
}
# 创建 alias,只显示没有被标记 deleted 的用户
POST /_aliases
{"actions":[{"add":{"index":"users","alias":"view_users","filter":{"term":{"is_deleted":0}}}}]}

FileBeat

  • Beats 轻量型数据采集器是一个免费且开放的平台,集合了多种单一用途的数据采集器
  • FileBeat 专门用于转发和收集日志数据的轻量级采集工具
    • 它可以作为代理安装在服务器上,FileBeat监视指定路径的日志文件,收集日志数据
    • 并将收集到的日志转发到Elasticsearch或者Logstash
  • FileBeat 的工作原理
    • ![[../../assets/Attachment/Pasted image 20240527042649.png | 400]]

    • 启动FileBeat时,会启动一个或者多个输入(Input),这些Input监控指定的日志数据位置

    • FileBeat会针对每一个文件启动一个Harvester(收割机)

    • Harvester读取每一个文件的日志,将新的日志发送到libbeat

    • libbeat将数据收集到一起,并将数据发送给输出(Output)

  • logstash vs FileBeat
    • Logstash是在jvm上运行的,资源消耗比较大。而FileBeat是基于golang编写的,功能较少但资源消耗也比较小,更轻量级
    • Logstash 和Filebeat都具有日志收集功能,Filebeat更轻量,占用资源更少
    • Logstash 具有Filter功能,能过滤分析日志
    • 一般结构都是Filebeat采集日志,然后发送到消息队列、Redis、MQ中,然后Logstash去获取,利用Filter功能过滤分析,然后存储到Elasticsearch中
    • FileBeat和Logstash配合,实现背压机制
      • 当将数据发送到Logstash或 Elasticsearch时,Filebeat使用背压敏感协议,以应对更多的数据量
      • 如果Logstash正在忙于处理数据,则会告诉Filebeat 减慢读取速度
      • 一旦拥堵得到解决,Filebeat就会恢复到原来的步伐并继续传输数据
  • filebeat.yml
 output.elasticsearch:hosts: ["192.168.65.174:9200","192.168.65.192:9200","192.168.65.204:9200"]username: "elastic"password: "123456"setup.kibana:host: "192.168.65.174:5601"
  • 启用和配置数据收集模块
# 查看可以模块列表
./filebeat modules list
# 启用 nginx 模块
./filebeat modules enable nginx
# 启用 Logstash 模块
./filebeat modules enable logstash
# setup 命令加载 Kibana 仪表板。 如果仪表板已经设置,则忽略此命令。 
./filebeat setup
# 启动 Filebeat
./filebeat ‐e
# 如果需要更改 nginx 日志路径,修改 modules.d/nginx.yml 
‐ module: nginxaccess:var.paths: ["/var/log/nginx/access.log*"]
# 在 modules.d/logstash.yml 文件中修改设置
‐ module: logstashlog:enabled: truevar.paths: ["/home/es/logstash‐7.17.3/logs/*.log"]

ELK

  • 集中化日志管理思路:日志收集 -> 格式化分析 -> 检索和可视化 -> 风险告警
  • ELK架构分为两种,一种是经典的ELK,另外一种是加上消息队列(Redis或Kafka或RabbitMQ)和Nginx结构
    • 经典的ELK主要是由Filebeat + Logstash + Elasticsearch + Kibana组成

      • 适用于数据量小的开发环境,存在数据丢失的危险
      • 在这里插入图片描述
    • 整合消息队列+Nginx架构:主要加上了Redis或Kafka或RabbitMQ做消息队列,保证了消息的不丢失

      • 主要用在生产环境,可以处理大数据量,并且不会丢失数据
      • 在这里插入图片描述

采集 Tomcat 服务器日志

  • 使用FileBeats将日志发送到Logstash
    • 因为Tomcat的web log日志都是以IP地址开头的,所以我们需要把不以ip地址开头的行追加到上一行
    • multiline 多行日志
      • pattern:正则表达式
      • negate:true 或 false
        • 默认是false,匹配pattern的行合并到上一行
        • true,不匹配pattern的行合并到上一行
      • match:after 或 before,合并到上一行的末尾或开头
vim filebeat‐logstash.yml
chmod 644 filebeat‐logstash.yml
./filebeat ‐e ‐c filebeat‐logstash.yml
filebeat.inputs:‐ type: logenabled: truepaths:‐ /home/es/apache‐tomcat‐8.5.33/logs/*access*.*multiline.pattern: '^\\d+\\.\\d+\\.\\d+\\.\\d+ 'multiline.negate: truemultiline.match: afteroutput.logstash:enabled: truehosts: ["192.168.65.204:5044"]
  • Logstash输出数据到Elasticsearch
    • filebeat‐elasticSearch.conf
    • bin/logstash ‐f config/filebeat‐elasticSearch.conf ‐‐config.reload.automatic
  • ES中会生成一个以logstash开头的索引
input {beats {port => 5044}
}output {elasticsearch {hosts => ["http://localhost:9200"] user => "elastic"password => "123456"}stdout{codec => rubydebug}
}
  • 利用Logstash过滤器解析日志
    • bin/logstash‐plugin list
  • Grok插件:Grok是一种将非结构化日志解析为结构化的插件
    • 适合用来解析系统日志、Web服务器日志、MySQL或者是任意其他的日志格式
    • Grok是通过模式匹配的方式来识别日志中的数据,可以把Grok插件简单理解为升级版本的正则表达式
    • 它拥有更多的模式,默认Logstash拥有120个模式。如果这些模式不满足我们解析日志的需求,我们可以直接使用正则表达式来进行匹配
    • 可以使用Kibana来进行Grok开发
  • Grok语法 %{SYNTAX:SEMANTIC}
    • SYNTAX(语法)指的是Grok模式名称
    • SEMANTIC(语义)是给模式匹配到的文本字段名
    • %{NUMBER:duration} %{IP:client}
      • duration表示:匹配一个数字,client表示匹配一个IP地址
    • 默认在Grok中,所有匹配到的的数据类型都是字符串
      • 转换成int类型(目前只支持int和float): %{NUMBER:duration:int} %{IP:client}
filter {grok {
# 192.168.65.103 ‐ ‐ [23/Jun/2022:22:37:23 +0800] "GET /docs/images/docs‐stylesheet.css HTTP/1.1" 200 5780match => { "message" => "%{IP:ip} ‐ ‐ \[%{HTTPDATE:date}\] \"%{WORD:method} %{PATH:uri} %{DATA:protocol}\" %{INT:status} %{INT:length}" }}
}
  • 使用mutate插件过滤掉不需要的字段
mutate {enable_metric => "false"remove_field => ["message", "log", "tags", "input", "agent", "host", "ecs", "@version"]
}
  • 要将日期格式进行转换,可以使用Date插件来实现
filter {date {match => ["date","dd/MMM/yyyy:HH:mm:ss Z","yyyy‐MM‐dd HH:mm:ss"]target => "date"}
}
  • 输出到Elasticsearch指定索引
    • index来指定索引名称,默认输出的index名称为:logstash-%{+yyyy.MM.dd}
    • 要在index中使用时间格式化,filter的输出必须包含 @timestamp 字段,否则将无法解析日期
  • 注意:index名称中,不能出现大写字符
output {elasticsearch {index => "tomcat_web_log_%{+YYYY‐MM}" hosts => ["http://localhost:9200"]user => "elastic"password => "123456"}stdout{codec => rubydebug }
}
  • 完整的Logstash配置文件
    • bin/logstash ‐f config/filebeat‐filter‐es.conf ‐‐config.reload.automatic
input {beats {port => 5044 }
}filter {grok {match => {"message" => "%{IP:ip} ‐ ‐ \[%{HTTPDATE:date}\] \"%{WORD:method} %{PATH:uri} %{DATA:protocol}\" %{INT:status:int} %{INT:length:int}"}}mutate {enable_metric => "false"remove_field => ["message", "log", "tags", "input", "agent", "host", "ecs", "@version"]}date {match => ["date","dd/MMM/yyyy:HH:mm:ss Z","yyyy‐MM‐dd HH:mm:ss"]target => "date"}
}output {stdout {codec => rubydebug}elasticsearch {index => "tomcat_web_log_%{+YYYY‐MM}" hosts => ["http://localhost:9200"]user => "elastic"password => "123456"} 
}

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • modelsim 关闭 warning 的方法
  • Linux系统下载并配置vscode(无废话)写C++
  • Spring事务和事务传播机制(下)
  • RK3588 系列之4—入门级完整demo项目
  • 银行创新技术应用系统概览(一)
  • linux基础IO——动静态库——实现与应用学习、原理深入详解
  • 【C语言可变参数函数的使用与原理分析】
  • 搭建VUE+VScode+elementUI环境遇到的问题
  • 【每日一题】【平衡树】【__gnu_pbds :: tree】小红的中位数 牛客周赛 Round 29 D题 C++
  • Rust: Web框架Axum和Rest Client协同测试
  • 常见概念 -- 非线性效应
  • FPGA随记——8B/10B编码
  • 倍福——ADS协议解析及C语言读写库
  • 2024年6月第2套英语四级真题PDF
  • 第二章 深信服超融合测试历程第二天
  • oschina
  • PHP那些事儿
  • React-Native - 收藏集 - 掘金
  • RedisSerializer之JdkSerializationRedisSerializer分析
  • storm drpc实例
  • 从零开始的webpack生活-0x009:FilesLoader装载文件
  • 关于List、List?、ListObject的区别
  • 湖南卫视:中国白领因网络偷菜成当代最寂寞的人?
  • 基于 Ueditor 的现代化编辑器 Neditor 1.5.4 发布
  • 聊聊sentinel的DegradeSlot
  • 免费小说阅读小程序
  • 批量截取pdf文件
  • 三分钟教你同步 Visual Studio Code 设置
  • 体验javascript之美-第五课 匿名函数自执行和闭包是一回事儿吗?
  • 译自由幺半群
  • 源码之下无秘密 ── 做最好的 Netty 源码分析教程
  • 400多位云计算专家和开发者,加入了同一个组织 ...
  • NLPIR智能语义技术让大数据挖掘更简单
  • 第二十章:异步和文件I/O.(二十三)
  • ​secrets --- 生成管理密码的安全随机数​
  • ​什么是bug?bug的源头在哪里?
  • # 执行时间 统计mysql_一文说尽 MySQL 优化原理
  • ## 临床数据 两两比较 加显著性boxplot加显著性
  • (1)Jupyter Notebook 下载及安装
  • (C语言)二分查找 超详细
  • (二)什么是Vite——Vite 和 Webpack 区别(冷启动)
  • (二刷)代码随想录第16天|104.二叉树的最大深度 559.n叉树的最大深度● 111.二叉树的最小深度● 222.完全二叉树的节点个数
  • (附源码)ssm考试题库管理系统 毕业设计 069043
  • (附源码)计算机毕业设计SSM智能化管理的仓库管理
  • (原)记一次CentOS7 磁盘空间大小异常的解决过程
  • (转)Linq学习笔记
  • .Net - 类的介绍
  • .NET CF命令行调试器MDbg入门(二) 设备模拟器
  • .NET Project Open Day(2011.11.13)
  • .NET Standard 支持的 .NET Framework 和 .NET Core
  • .net 获取某一天 在当月是 第几周 函数
  • .NetCore 如何动态路由
  • .NET开发不可不知、不可不用的辅助类(一)
  • .net企业级架构实战之7——Spring.net整合Asp.net mvc
  • .NET企业级应用架构设计系列之结尾篇