当前位置: 首页 > news >正文

基于Kafka的日志采集

目录

前言

架构图

资源列表

基础环境

关闭防护墙

关闭内核安全机制

修改主机名

添加hosts映射

一、部署elasticsearch

修改limit限制

部署elasticsearch

修改配置文件

启动

二、部署filebeat

部署filebeat

添加配置文件

启动

三、部署kibana

部署kibana

修改配置文件

启动

四、部署Kafka

安装java

安装kafka

配置环境变量

创建数据存储目录和日志存储目录

修改zk配置文件

修改Kafka配置文件

启动zk

启动Kafka

测试

五、部署logstash

部署logstash

添加配置文件

启动


前言

        当日志量变得非常大时,传统的日志收集平台可能会遇到性能瓶颈、单点故障或扩展性问题。在这种情况下,引入消息队列(如Kafka)可以显著增强日志收集系统的健壮性、可扩展性和实时性。

以下是当在日志收集平台中加入Kafka时,可以带来的优势和改进:

  1. 缓冲和异步处理
    Kafka作为一个消息队列,可以充当Filebeat(或其他日志收集器)和Logstash(或其他日志处理组件)之间的缓冲层。Filebeat可以将日志数据异步地发送到Kafka,而不需要等待Logstash的即时响应。这样,即使Logstash暂时无法处理数据,Kafka也可以暂时存储数据,直到Logstash恢复处理能力。

  2. 水平扩展
    随着日志量的增长,Kafka可以通过添加更多的节点(brokers)来实现水平扩展。这种扩展方式使得Kafka能够处理更多的并发写入和读取操作,而不会遇到单点故障或性能瓶颈。此外,Kafka的分布式架构还允许数据在多个节点之间进行复制,以提高数据的可靠性和容错性。

  3. 实时数据处理
    Kafka支持实时数据流处理,使得日志数据可以立即被消费和处理。这意味着一旦日志数据被写入Kafka,就可以立即被Logstash(或其他流处理工具)读取和处理,以满足实时分析、监控和告警的需求。

  4. 数据持久化
    Kafka将数据持久化到磁盘上,以确保即使在系统崩溃或重启的情况下,数据也不会丢失。这种持久化机制使得Kafka成为了一个可靠的数据传输和存储平台,特别适用于对日志数据进行长期存储和分析的场景。

  5. 多消费者支持
    Kafka允许多个消费者(如Logstash、其他数据分析工具或应用)从同一个主题(topic)中消费数据。这意味着您可以同时运行多个消费者来处理和分析日志数据,以满足不同的业务需求和数据使用场景。

  6. 可定制性和灵活性
    Kafka提供了丰富的API和工具,使得您可以轻松地定制和扩展日志收集系统。例如,您可以编写自定义的Kafka生产者来收集特定格式的日志数据,或者编写自定义的Kafka消费者来处理和分析日志数据。

  7. 与其他系统的集成
    Kafka是一个广泛使用的消息队列系统,它支持与其他各种系统和工具进行集成。这意味着您可以将Kafka轻松地集成到现有的日志收集、处理、存储和分析系统中,以构建一个更加健壮、可扩展和灵活的日志收集平台。

        综上所述,当日志量变得非常大时,在日志收集平台中加入Kafka可以显著提高系统的性能、可靠性和可扩展性。通过利用Kafka的缓冲、异步处理、水平扩展、实时数据处理、数据持久化、多消费者支持、可定制性和与其他系统的集成能力,您可以构建一个更加健壮、高效和灵活的日志收集系统。

        有需要本次实验软件包的评论区可以找我要,无偿提供。

架构图

资源列表

操作系统配置主机名IP
CentOS7.3.16112C4Ges01192.168.207.131
CentOS7.3.16112C4Gkibana192.168.207.165
CentOS7.3.16112C4Gfilebeat192.168.207.166
CentOS7.3.16112C4Gkafka192.168.207.167
CentOS7.3.16112C4Glogstash192.168.207.168

基础环境

关闭防护墙

systemctl stop firewalld
systemctl disable firewalld

关闭内核安全机制

sed -i "s/.*SELINUX=.*/SELINUX=disabled/g" /etc/selinux/config
reboot

修改主机名

hostnamectl set-hostname es01
hostnamectl set-hostname kibana
hostnamectl set-hostname filebeat
hostnamectl set-hostname kafka
hostnamectl set-hostname logstash

添加hosts映射

cat >> /etc/hosts << EOF
192.168.207.131 es01
192.168.207.165 kibana
192.168.207.166 filebeat
192.168.207.167 kafka
192.168.207.168 logstash
EOF

一、部署elasticsearch

修改limit限制

cat > /etc/security/limits.d/es.conf << EOF
* soft nproc 655360
* hard nproc 655360
* soft nofile 655360
* hard nofile 655360
EOF
​
cat >> /etc/sysctl.conf << EOF
vm.max_map_count=655360
EOF
sysctl -p

部署elasticsearch

mkdir -p /data/elasticsearch
tar zxvf elasticsearch-7.14.0-linux-x86_64.tar.gz -C /data/elasticsearch

修改配置文件

mkdir /data/elasticsearch/{data,logs}[root@es01 elasticsearch-7.14.0]# grep -v "^#" /data/elasticsearch/elasticsearch-7.14.0/config/elasticsearch.yml
cluster.name: my-application
node.name: es01
path.data: /data/elasticsearch/data
path.logs: /data/elasticsearch/logs
bootstrap.memory_lock: false
network.host: 0.0.0.0
http.port: 9200
cluster.initial_master_nodes: ["es01"]

启动

useradd es 
chown -R es:es /data/
su - es
/data/elasticsearch/elasticsearch-7.14.0/bin/elasticsearch -d

二、部署filebeat

部署filebeat

mkdir -p /data/filebeat
tar zxvf filebeat-7.14.0-linux-x86_64.tar.gz -C /data/filebeat/

添加配置文件

这里提供了两份filebeat配置文件的参考

[root@filebeat filebeat-7.14.0-linux-x86_64]# cat filebeat.yml
filebeat.inputs:
- type: logenabled: truepaths:- /var/log/messages         ###要监控的日志文件
setup.template.settings:index.number_of_shards: 3
output.kafka:#version:0.10.2             ### 根据不同 CKafka 实例开源版本配置hosts: ["192.168.207.167:9092"]  ###接入方式所用的IP和端口topic: 'topic_test1'       ###topic实例名partition.round_robin:reachable_only: falserequired_acks: 1compression: nonemax_message_bytes: 10000000[root@filebeat filebeat-7.14.0-linux-x86_64]# cat filebeat.yml
filebeat.inputs:
- type: logenabled: truepaths:- /var/log/httpd/access_log         ###要监控的日志文件fields:kafka_topic: httpd_access
- type: logenabled: truepaths:- /var/log/httpd/error_log         ###要监控的日志文件fields:kafka_topic: httpd_error
setup.template.settings:index.number_of_shards: 3
output.kafka:#version:0.10.2             ### 根据不同 CKafka 实例开源版本配置hosts: ["192.168.207.167:9092"]  ###接入方式所用的IP和端口topic: '%{[fields.kafka_topic]}'       ###topic实例名partition.round_robin:reachable_only: falserequired_acks: 1compression: nonemax_message_bytes: 10000000

启动

/data/filebeat/filebeat-7.14.0-linux-x86_64/filebeat -e -c filebeat.yml

三、部署kibana

部署kibana

mkdir -p /data/kibana
tar zxvf kibana-7.14.0-linux-x86_64.tar.gz -C /data/kibana/

修改配置文件

grep -v "^#" /data/kibana/kibana-7.14.0-linux-x86_64/config/kibana.yml  | grep -v "^$"
server.port: 5601
server.host: "0.0.0.0"
elasticsearch.hosts: ["http://192.168.207.131:9200"]
kibana.index: ".kibana"

启动

useradd kibana
chown -R kibana:kibana /data 
su - kibana
/data/kibana/kibana-7.14.0-linux-x86_64/bin/kibana

四、部署Kafka

安装java

# 安装java环境
yum -y install java-1.8.0-openjdk

安装kafka

tar zxvf kafka_2.12-3.0.0.tgz
mv kafka_2.12-3.0.0 /usr/local/kafka

配置环境变量

# 配置环境变量
cat > /etc/profile.d/zookeeper.sh << 'EOF'
export ZOOKEEPER_HOME=/usr/local/kafka
export PATH=$ZOOKEEPER_HOME/bin:$PATH
EOFcat > /etc/profile.d/kafka.sh << 'EOF'
export KAFKA_HOME=/usr/local/kafka
export PATH=$KAFKA_HOME/bin:$PATH
EOFsource /etc/profile

创建数据存储目录和日志存储目录

mkdir -p /usr/local/kafka/zookeeper
mkdir -p /usr/local/kafka/log/zookeeper
mkdir -p /usr/local/kafka/log/kafka# 创建zk需要的myid文件
echo 0 > /usr/local/kafka/zookeeper/myid

修改zk配置文件

# 注意Kafka安装目录下的config目录里
server.properties             #是Kafka的配置文件
zookeeper.properties          #是zookeeper的配置文件
cat >> /usr/local/kafka/config/zookeeper.properties << EOF
dataLogDir=/usr/local/kafka/log/zookeeper
tickTime=2000
initLimit=10
syncLimit=5
server.0=192.168.207.167:2888:3888
EOFsed -i "s/dataDir\=\/tmp\/zookeeper/dataDir\=\/usr\/local\/kafka\/zookeeper/g" /usr/local/kafka/config/zookeeper.properties

修改Kafka配置文件

# /usr/local/kafka/config/server.properties修改listeners=PLAINTEXT://192.168.207.167:9092
advertised.listeners=PLAINTEXT://192.168.207.167:9092
log.dirs=/usr/local/kafka/log/kafka
delete.topic.enable=true
zookeeper.connect=192.168.207.167:2181

启动zk

/usr/local/kafka/bin/zookeeper-server-start.sh -daemon /usr/local/kafka/config/zookeeper.properties

启动Kafka

/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties

测试

# 创建一个topic
[root@kafka kafka]# bin/kafka-topics.sh --create --bootstrap-server 192.168.207.167:9092 --replication-factor 1   --partitions 1 --topic Hello-Kafka
Created topic Hello-Kafka.# 往topic里面输入消息
[root@kafka kafka]# /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.207.167:9092 --topic Hello-Kafka# 从topic里面消费消息
[root@kafka ~]# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.207.167:9092 --topic Hello-Kafka --from-beginning# 查看topic列表
[root@kafka kafka]# /usr/local/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.207.167:9092 --list
Hello-Kafka# 删除topic
[root@kafka kafka]# bin/kafka-topics.sh --delete --bootstrap-server 192.168.207.167:9092 --topic Hello-Kafka

五、部署logstash

部署logstash

mkdir -p /data/logstash
tar zxvf logstash-7.14.0-linux-x86_64.tar.gz -C /data/logstash/

添加配置文件

mkdir /data/logstash/logstash-7.14.0/conf.dcat > /data/logstash/logstash-7.14.0/conf.d/system.conf << 'EOF'
input { kafka{ bootstrap_servers =>"192.168.207.167:9092" topics =>"topic_test1" type =>"topic_test1"codec =>"json" } 
}
output { if [type] == "topic_test1" {elasticsearch { hosts => ["192.168.207.131:9200"] index =>"kafka-system-%{+YYYY.MM.dd}" } }
}EOF
cat > /data/logstash/logstash-7.14.0/conf.d/httpd.conf << 'EOF'
input { kafka{ bootstrap_servers =>"192.168.207.167:9092" topics =>"httpd_access" type =>"httpd_access"codec =>"json" } kafka{ bootstrap_servers =>"192.168.207.167:9092" topics =>"httpd_error" type =>"httpd_error"codec =>"json" }
}
output { if [type] == "httpd_access" {elasticsearch { hosts => ["192.168.207.131:9200"] index =>"httpd-access-%{+YYYY.MM.dd}" } }if [type] == "httpd_error" {elasticsearch { hosts => ["192.168.207.131:9200"] index =>"httpd-error-%{+YYYY.MM.dd}" } }
}EOF

启动

/data/logstash/logstash-7.14.0/bin/logstash -f /data/logstash/logstash-7.14.0/conf.d/

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • ArcGIS提取含有计曲线的等高线
  • 设置环境变量,忽略yarn对node的版本检查
  • 行业分析---造车新势力之蔚来汽车
  • [力扣题解] 463. 岛屿的周长
  • 解释JAVA语言中关于方法的重载
  • JMeter正则表达式提取器和JSON提取器基础用法,小白必会!
  • [BT]小迪安全2023学习笔记(第29天:Web攻防-SQL注入)
  • 第一章:操作系统概述
  • CSDN 自动评论互动脚本
  • sudo apt install make;make build ;make start
  • 手撕算法|斯坦福大学教授用60页PPT搞定了八大神经网络
  • mybatis plus 配置多数据源(数据源进行切换)
  • ssm139选课排课系统的设计与开发+vue
  • Pytorch: 解决因pytorch版本不同 导致训练ckpt加载失败
  • FPGA 纯逻辑arinc818 ip core
  • [译] 理解数组在 PHP 内部的实现(给PHP开发者的PHP源码-第四部分)
  • 【402天】跃迁之路——程序员高效学习方法论探索系列(实验阶段159-2018.03.14)...
  • 【前端学习】-粗谈选择器
  • android 一些 utils
  • CentOS7简单部署NFS
  • centos安装java运行环境jdk+tomcat
  • Computed property XXX was assigned to but it has no setter
  • ECMAScript6(0):ES6简明参考手册
  • EventListener原理
  • KMP算法及优化
  • Node 版本管理
  • 多线程事务回滚
  • 函数式编程与面向对象编程[4]:Scala的类型关联Type Alias
  • 记录:CentOS7.2配置LNMP环境记录
  • 理解在java “”i=i++;”所发生的事情
  • 前端代码风格自动化系列(二)之Commitlint
  • 深入浅出Node.js
  • 使用Envoy 作Sidecar Proxy的微服务模式-4.Prometheus的指标收集
  • 要让cordova项目适配iphoneX + ios11.4,总共要几步?三步
  • 《TCP IP 详解卷1:协议》阅读笔记 - 第六章
  • 7行Python代码的人脸识别
  • 大数据全解:定义、价值及挑战
  • #if 1...#endif
  • #中的引用型是什么意识_Java中四种引用有什么区别以及应用场景
  • $(document).ready(function(){}), $().ready(function(){})和$(function(){})三者区别
  • %3cli%3e连接html页面,html+canvas实现屏幕截取
  • (1)bark-ml
  • (35)远程识别(又称无人机识别)(二)
  • (4)事件处理——(2)在页面加载的时候执行任务(Performing tasks on page load)...
  • (delphi11最新学习资料) Object Pascal 学习笔记---第13章第6节 (嵌套的Finally代码块)
  • (k8s)Kubernetes本地存储接入
  • (Redis使用系列) Springboot 实现Redis 同数据源动态切换db 八
  • (Redis使用系列) Springboot 整合Redisson 实现分布式锁 七
  • (SpringBoot)第七章:SpringBoot日志文件
  • (ZT)出版业改革:该死的死,该生的生
  • (回溯) LeetCode 78. 子集
  • (南京观海微电子)——示波器使用介绍
  • (未解决)jmeter报错之“请在微信客户端打开链接”
  • (转)项目管理杂谈-我所期望的新人
  • (转载)hibernate缓存