当前位置: 首页 > news >正文

5、Kafka海量日志收集系统架构设计

 

5.1、 Kafka海量日志收集实战-log4j2日志输出实战

 5.1.1、新建SpringBoot工程并引入引入maven相关依赖。

pom.xml内容如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>Kafka</artifactId>
        <groupId>com.lvxiaosha</groupId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>collector</artifactId>

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>17</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <!-- 	排除spring-boot-starter-logging -->
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-logging</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!-- log4j2 -->
        <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-log4j2 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-log4j2</artifactId>
            <version>2.7.3</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>com.lmax</groupId>
            <artifactId>disruptor</artifactId>
            <version>3.4.4</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>2.0.12</version>
        </dependency>

    </dependencies>

    <build>
        <finalName>collector</finalName>
        <!-- 打包时包含properties、xml -->
        <resources>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <!-- 是否替换资源中的属性-->
                <filtering>true</filtering>
            </resource>
            <resource>
                <directory>src/main/resources</directory>
            </resource>
        </resources>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <mainClass>com.lvxiaosha.collector.CollectorApplication</mainClass>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

 5.1.2、添加工具类:

import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.nio.channels.SocketChannel;
import java.util.Enumeration;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * $NetUtil
 * @author lvxiaosha
 * @since 2022年9月9日 下午4:59:02
 */
public class NetUtil {   
	
	public static String normalizeAddress(String address){
		String[] blocks = address.split("[:]");
		if(blocks.length > 2){
			throw new IllegalArgumentException(address + " is invalid");
		}
		String host = blocks[0];
		int port = 80;
		if(blocks.length > 1){
			port = Integer.valueOf(blocks[1]);
		} else {
			address += ":"+port; //use default 80
		} 
		String serverAddr = String.format("%s:%d", host, port);
		return serverAddr;
	}
	
	public static String getLocalAddress(String address){
		String[] blocks = address.split("[:]");
		if(blocks.length != 2){
			throw new IllegalArgumentException(address + " is invalid address");
		} 
		String host = blocks[0];
		int port = Integer.valueOf(blocks[1]);
		
		if("0.0.0.0".equals(host)){
			return String.format("%s:%d",NetUtil.getLocalIp(), port);
		}
		return address;
	}
	
	private static int matchedIndex(String ip, String[] prefix){
		for(int i=0; i<prefix.length; i++){
			String p = prefix[i];
			if("*".equals(p)){ //*, assumed to be IP
				if(ip.startsWith("127.") ||
				   ip.startsWith("10.") ||	
				   ip.startsWith("172.") ||
				   ip.startsWith("192.")){
					continue;
				}
				return i;
			} else {
				if(ip.startsWith(p)){
					return i;
				}
			} 
		}
		
		return -1;
	}
	
	public static String getLocalIp(String ipPreference) {
		if(ipPreference == null){
			ipPreference = "*>10>172>192>127";
		}
		String[] prefix = ipPreference.split("[> ]+");
		try {
			Pattern pattern = Pattern.compile("[0-9]+\\.[0-9]+\\.[0-9]+\\.[0-9]+");
			Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
			String matchedIp = null;
			int matchedIdx = -1;
			while (interfaces.hasMoreElements()) {
				NetworkInterface ni = interfaces.nextElement();
				Enumeration<InetAddress> en = ni.getInetAddresses(); 
				while (en.hasMoreElements()) {
					InetAddress addr = en.nextElement();
					String ip = addr.getHostAddress();  
					Matcher matcher = pattern.matcher(ip);
					if (matcher.matches()) {  
						int idx = matchedIndex(ip, prefix);
						if(idx == -1) continue;
						if(matchedIdx == -1){
							matchedIdx = idx;
							matchedIp = ip;
						} else {
							if(matchedIdx>idx){
								matchedIdx = idx;
								matchedIp = ip;
							}
						}
					} 
				} 
			} 
			if(matchedIp != null) return matchedIp;
			return "127.0.0.1";
		} catch (Exception e) { 
			return "127.0.0.1";
		}
	}
	
	public static String getLocalIp() {
		return getLocalIp("*>10>172>192>127");
	}
	
	public static String remoteAddress(SocketChannel channel){
		SocketAddress addr = channel.socket().getRemoteSocketAddress();
		String res = String.format("%s", addr);
		return res;
	}
	
	public static String localAddress(SocketChannel channel){
		SocketAddress addr = channel.socket().getLocalSocketAddress();
		String res = String.format("%s", addr);
		return addr==null? res: res.substring(1);
	}
	
	public static String getPid(){
		RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
        String name = runtime.getName();
        int index = name.indexOf("@");
        if (index != -1) {
            return name.substring(0, index);
        }
		return null;
	}
	
	public static String getLocalHostName() {
        try {
            return (InetAddress.getLocalHost()).getHostName();
        } catch (UnknownHostException uhe) {
            String host = uhe.getMessage();
            if (host != null) {
                int colon = host.indexOf(':');
                if (colon > 0) {
                    return host.substring(0, colon);
                }
            }
            return "UnknownHost";
        }
    }
}
import java.util.ArrayList;
import java.util.List;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;

import lombok.extern.slf4j.Slf4j;

/**
 * $FastJsonConvertUtil
 * @author lvxiaosha
 * @since 2022年9月9日 下午4:53:28
 */
@Slf4j
public class FastJsonConvertUtil {

	private static final SerializerFeature[] featuresWithNullValue = { SerializerFeature.WriteMapNullValue, SerializerFeature.WriteNullBooleanAsFalse,
	        SerializerFeature.WriteNullListAsEmpty, SerializerFeature.WriteNullNumberAsZero, SerializerFeature.WriteNullStringAsEmpty };

	/**
	 * <B>方法名称:</B>将JSON字符串转换为实体对象<BR>
	 * <B>概要说明:</B>将JSON字符串转换为实体对象<BR>
	 * @author lvxiaosha
	 * @since 2022年9月9日 下午4:53:49 
	 * @param data JSON字符串
	 * @param clzss 转换对象
	 * @return T
	 */
	public static <T> T convertJSONToObject(String data, Class<T> clzss) {
		try {
			T t = JSON.parseObject(data, clzss);
			return t;
		} catch (Exception e) {
			log.error("convertJSONToObject Exception", e);
			return null;
		}
	}
	
	/**
	 * <B>方法名称:</B>将JSONObject对象转换为实体对象<BR>
	 * <B>概要说明:</B>将JSONObject对象转换为实体对象<BR>
	 * @author lvxiaosha
	 * @since 2022年9月9日 下午4:54:32
	 * @param data JSONObject对象
	 * @param clzss 转换对象
	 * @return T
	 */
	public static <T> T convertJSONToObject(JSONObject data, Class<T> clzss) {
		try {
			T t = JSONObject.toJavaObject(data, clzss);
			return t;
		} catch (Exception e) {
			log.error("convertJSONToObject Exception", e);
			return null;
		}
	}

	/**
	 * <B>方法名称:</B>将JSON字符串数组转为List集合对象<BR>
	 * <B>概要说明:</B>将JSON字符串数组转为List集合对象<BR>
	 * @author lvxiaosha
	 * @since 2022年9月9日 下午4:54:50
	 * @param data JSON字符串数组
	 * @param clzss 转换对象
	 * @return List<T>集合对象
	 */
	public static <T> List<T> convertJSONToArray(String data, Class<T> clzss) {
		try {
			List<T> t = JSON.parseArray(data, clzss);
			return t;
		} catch (Exception e) {
			log.error("convertJSONToArray Exception", e);
			return null;
		}
	}

	/**
	 * <B>方法名称:</B>将List<JSONObject>转为List集合对象<BR>
	 * <B>概要说明:</B>将List<JSONObject>转为List集合对象<BR>
	 * @author lvxiaosha
	 * @since 2022年9月9日 下午4:55:11
	 * @param data List<JSONObject>
	 * @param clzss 转换对象
	 * @return List<T>集合对象
	 */
	public static <T> List<T> convertJSONToArray(List<JSONObject> data, Class<T> clzss) {
		try {
			List<T> t = new ArrayList<T>();
			for (JSONObject jsonObject : data) {
				t.add(convertJSONToObject(jsonObject, clzss));
			}
			return t;
		} catch (Exception e) {
			log.error("convertJSONToArray Exception", e);
			return null;
		}
	}

	/**
	 * <B>方法名称:</B>将对象转为JSON字符串<BR>
	 * <B>概要说明:</B>将对象转为JSON字符串<BR>
	 * @author lvxiaosha
	 * @since 2022年9月9日 下午4:55:41
	 * @param obj 任意对象
	 * @return JSON字符串
	 */
	public static String convertObjectToJSON(Object obj) {
		try {
			String text = JSON.toJSONString(obj);
			return text;
		} catch (Exception e) {
			log.error("convertObjectToJSON Exception", e);
			return null;
		}
	}
	
	/**
	 * <B>方法名称:</B>将对象转为JSONObject对象<BR>
	 * <B>概要说明:</B>将对象转为JSONObject对象<BR>
	 * @author lvxiaosha
	 * @since 2022年9月9日 下午4:55:55
	 * @param obj 任意对象
	 * @return JSONObject对象
	 */
	public static JSONObject convertObjectToJSONObject(Object obj){
		try {
			JSONObject jsonObject = (JSONObject) JSONObject.toJSON(obj);
			return jsonObject;
		} catch (Exception e) {
			log.error("convertObjectToJSONObject Exception", e);
			return null;
		}		
	}

	public static String convertObjectToJSONWithNullValue(Object obj) {
		try {
			String text = JSON.toJSONString(obj, featuresWithNullValue);
			return text;
		} catch (Exception e) {
			log.error("convertObjectToJSONWithNullValue Exception", e);
			return null;
		}
	}
}
import org.jboss.logging.MDC;
import org.springframework.context.EnvironmentAware;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;

@Component
public class InputMDC implements EnvironmentAware {

	private static Environment environment;
	
	@Override
	public void setEnvironment(Environment environment) {
		InputMDC.environment = environment;
	}
	
	public static void putMDC() {
		MDC.put("hostName", NetUtil.getLocalHostName());
		MDC.put("ip", NetUtil.getLocalIp());
		MDC.put("applicationName", environment.getProperty("spring.application.name"));
	}

}

 5.1.3、编写IndexController

import com.lvxiaosha.collector.util.InputMDC;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
public class IndexController {

	
	/**
	 * [%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}]
	 * [%level{length=5}] 
	 * [%thread-%tid] 
	 * [%logger] 
	 * [%X{hostName}] 
	 * [%X{ip}] 
	 * [%X{applicationName}] 
	 * [%F,%L,%C,%M] 
	 * [%m] ## '%ex'%n
	 * -----------------------------------------------
	 * [2019-09-18T14:42:51.451+08:00]
	 * [INFO] 
	 * [main-1] 
	 * [org.springframework.boot.web.embedded.tomcat.TomcatWebServer]
	 * [] 
	 * [] 
	 * [] 
	 * [TomcatWebServer.java,90,org.springframework.boot.web.embedded.tomcat.TomcatWebServer,initialize] 
	 * [Tomcat initialized with port(s): 8001 (http)] ## ''
	 * 
	 * ["message", 
	 * "\[%{NOTSPACE:currentDateTime}\] 
	 *  \[%{NOTSPACE:level}\] 
	 *  \[%{NOTSPACE:thread-id}\] 
	 *  \[%{NOTSPACE:class}\] 
	 *  \[%{DATA:hostName}\] 
	 *  \[%{DATA:ip}\] 
	 *  \[%{DATA:applicationName}\]
	 *  \[%{DATA:location}\] 
	 *  \[%{DATA:messageInfo}\] 
	 *  ## (\'\'|%{QUOTEDSTRING:throwable})"]
	 * @return
	 */
	@RequestMapping(value = "/index")
	public String index() {
		InputMDC.putMDC();
		
		log.info("我是一条info日志");
		
		log.warn("我是一条warn日志");

		log.error("我是一条error日志");
		
		return "idx";
	}
	
	
	@RequestMapping(value = "/err")
	public String err() {
		InputMDC.putMDC();
		try {
			int a = 1/0;
		} catch (Exception e) {
			log.error("算术异常", e);
		}
		return "err";
	}
}

 5.1.4、添加application.properties和log4j2.xml配置文件

application.properties内容:

server.servlet.context-path=/
server.port=8001

spring.application.name=collector
server.servlet.encoding.charset=UTF-8
spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
spring.jackson.time-zone=GMT+8
spring.jackson.default-property-inclusion=NON_NULL

log4j2.xml内容:

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="INFO" schema="Log4J-V2.0.xsd" monitorInterval="600" >
    <!--日志级别以及优先级排序: OFF > FATAL > ERROR > WARN > INFO > DEBUG > TRACE > ALL -->
    <!--
    Properties里面可以定义一些变量供下面使用
    Property的LOG_HOME定义了日志输出的目录名称
    Property的FILE_NAME定义了日志输出的文件名称,这里和项目名称一致。
    Property的FILE_NAME定义了日志输出的格式。
    [%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}] [%level{length=5}] [%thread-%tid] [%logger]  [%X{hostName}] [%X{ip}] [%X{applicationName}] [%F,%L,%C,%M] [%m] ## '%ex'%n

    [%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}]  %d表示时间,后面的中括号里面定义了时间格式。
    [%level{length=5}]  %level表示日志级别。
    [%thread-%tid]  线程id
    [%logger]   日志输出的具体信息
    [%X{hostName}]  %X表示自定义hostName
    [%X{ip}]    %X表示自定义ip
    [%X{applicationName}]   %X表示自定义applicationName
    [%F,%L,%C,%M]   %F表示当前执行的类是哪个文件,%L表示行号,%C代表class,%M代表Method
    [%m] ## '%ex'%n     %m表示message,是日志输出的内容。   %ex表示异常信息   %n表示换行

    -->
    <Properties>
        <Property name="LOG_HOME">D:\1-code\0-learning\1-back-end\996-dev\all-learning\Kafka\collector\logs</Property>
        <property name="FILE_NAME">collector</property>
        <property name="patternLayout">[%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}] [%level{length=5}] [%thread-%tid] [%logger] [%X{hostName}] [%X{ip}] [%X{applicationName}] [%F,%L,%C,%M] [%m] ## '%ex'%n</property>
        <!-- 控制台显示的日志最低级别 -->
        <property name="console_print_level">DEBUG</property>

    </Properties>

    <!-- Appenders表示输出的组件有哪些 -->
    <Appenders>
        <!--定义输出到控制台的日志格式为上面定义的patternLayout格式-->
        <Console name="CONSOLE" target="SYSTEM_OUT">
            <!-- 设置控制台只输出level及以上级别的信息(onMatch),其他的直接拒绝(onMismatch)-->
            <ThresholdFilter level="${console_print_level}" onMatch="ACCEPT" onMismatch="DENY"/>
            <PatternLayout pattern="${patternLayout}"/>
        </Console>
        <!--
            fileName定义了日志路径及名称,最后输出的文件是:logs/app-collector.log
            app-collector.log里面输出我们的全量日志
            filePattern定义了日志的时间节点
        -->
        <RollingRandomAccessFile name="appAppender" fileName="${LOG_HOME}/app-${FILE_NAME}.log" filePattern="${LOG_HOME}/app-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log" >
          <PatternLayout pattern="${patternLayout}" />
          <Policies>
              <TimeBasedTriggeringPolicy interval="1"/>
              <!-- 500MB一个日志文件 -->
              <SizeBasedTriggeringPolicy size="500MB"/>
          </Policies>
          <DefaultRolloverStrategy max="20"/>         
        </RollingRandomAccessFile>

        <!--
            fileName定义了日志路径及名称,最后输出的文件是:logs/error-collector.log
            error-collector.log里面输出我们的错误日志
            filePattern定义了日志的时间节点
        -->
        <RollingRandomAccessFile name="errorAppender" fileName="${LOG_HOME}/error-${FILE_NAME}.log" filePattern="${LOG_HOME}/error-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log" >
          <PatternLayout pattern="${patternLayout}" />
          <!--
            Filters对日志进行过滤,只有warn级别及以上级别的日志才会收集。
          -->
          <Filters>
              <ThresholdFilter level="warn" onMatch="ACCEPT" onMismatch="DENY"/>
          </Filters>              
          <Policies>
              <TimeBasedTriggeringPolicy interval="1"/>
              <SizeBasedTriggeringPolicy size="500MB"/>
          </Policies>
          <DefaultRolloverStrategy max="20"/>         
        </RollingRandomAccessFile>            
    </Appenders>
    <Loggers>
        <logger name="com.cm.server.busi" level="debug" additivity="false">
            <appender-ref ref="CONSOLE"/>
            <appender-ref ref="appAppender"/>
            <appender-ref ref="errorAppender"/>
        </logger>

        <!-- 业务相关 异步logger -->
        <AsyncLogger name="com.lvxiaosha.*" level="info" includeLocation="true">
          <AppenderRef ref="appAppender"/>
        </AsyncLogger>
        <AsyncLogger name="com.lvxiaosha.*" level="info" includeLocation="true">
          <AppenderRef ref="errorAppender"/>
        </AsyncLogger>       
        <Root level="info">
            <Appender-Ref ref="CONSOLE"/>
            <Appender-Ref ref="appAppender"/>
            <AppenderRef ref="errorAppender"/>
        </Root>         
    </Loggers>
</Configuration>

5.2、Kafka海量日志搜集实战_filebeat日志收集实战

5.2.1 Filebeat8.2.3安装

参考文章:Filebeat 的学习笔记_白居不易.的博客-CSDN博客_filebeat 无法执行二进制文件

下载地址:https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-8.2.3-linux-x86_64.tar.gz

cd /home/software/11-Filebeat/

tar -zxvf filebeat-8.2.3-linux-x86_64.tar.gz -C /usr/local/

cd /usr/local

mv filebeat-8.2.3-linux-x86_64/ filebeat-8.2.3

## 配置filebeat,可以参考filebeat.full.yml中的配置。

vim /usr/local/filebeat-8.2.3/filebeat.yml

###################### Filebeat Configuration Example #########################
filebeat.prospectors:

- input_type: log

  paths:
    ## app-服务名称.log, 为什么写死,防止发生轮转抓取历史数据
    - /usr/local/logs/app-collector.log
  #定义写入 ES 时的 _type 值
  document_type: "app-log"
  multiline:
    #pattern: '^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})'   # 指定匹配的表达式(匹配以 2017-11-15 08:04:23:889 时间格式开头的字符串)
    pattern: '^\['                              # 指定匹配的表达式(匹配以 "{ 开头的字符串)
    negate: true                                # 是否匹配到
    match: after                                # 合并到上一行的末尾
    max_lines: 2000                             # 最大的行数
    timeout: 2s                                 # 如果在规定时间没有新的日志事件就不等待后面的日志
  fields:
    logbiz: collector
    logtopic: app-log-collector   ## 按服务划分用作kafka topic
    evn: dev

- input_type: log

  paths:
    - /usr/local/logs/error-collector.log
  document_type: "error-log"
  multiline:
    #pattern: '^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})'   # 指定匹配的表达式(匹配以 2017-11-15 08:04:23:889 时间格式开头的字符串)
    pattern: '^\['                              # 指定匹配的表达式(匹配以 "{ 开头的字符串)
    negate: true                                # 是否匹配到
    match: after                                # 合并到上一行的末尾
    max_lines: 2000                             # 最大的行数
    timeout: 2s                                 # 如果在规定时间没有新的日志事件就不等待后面的日志
  fields:
    logbiz: collector
    logtopic: error-log-collector   ## 按服务划分用作kafka topic
    evn: dev
    
output.kafka:
  enabled: true
  hosts: ["192.168.110.130:9092"]
  topic: '%{[fields.logtopic]}'
  partition.hash:
    reachable_only: true
  compression: gzip
  max_message_bytes: 1000000
  required_acks: 1
logging.to_files: true

filebeat启动:

## 检查配置是否正确

cd /usr/local/filebeat-8.2.3

./filebeat test config

## 启动zookeeper:

zkServer.sh start

查看zookeeper状态:

zkServer.sh status

## 启动kafka:

/usr/local/kafka_2.13/bin/kafka-server-start.sh /usr/local/kafka_2.13/config/server.properties &

## 查看topic列表:

kafka-topics.sh --bootstrap-server 192.168.110.130:9092 --list

## 创建topic

kafka-topics.sh --bootstrap-server 192.168.110.130:9092 --create --topic app-log-collector --partitions 1  --replication-factor 1

kafka-topics.sh --bootstrap-server 192.168.110.130:9092 --create --topic error-log-collector --partitions 1  --replication-factor 1  


## 查看topic情况

kafka-topics.sh --bootstrap-server 192.168.110.130:9092 --topic app-log-collector --describe

kafka-topics.sh --bootstrap-server 192.168.110.130:9092 --topic error-log-collector --describe

## 启动filebeat

cd /usr/local/filebeat-8.2.3/

filebeat &

ps -ef | grep filebeat

Kibana8.2.2安装

下载地址:https://artifacts.elastic.co/downloads/kibana/kibana-8.2.3-linux-x86_64.tar.gz

相关文章:

  • Linux命令--压缩/解压缩--使用/实例
  • Lua脚本如何调用C/C++模块,Windows以及Linux版本演示
  • springboot+jsp球队球员比赛数据管理系统java
  • upload-labs靶场通关指南(9-11关)
  • 【Arduino+ESP32专题】案例:使用INA3221监控电压电流
  • 微信小程序——语法篇
  • 【数据结构】交换排序之冒泡排序与快速排序
  • 第二十七章 使用后台任务页面
  • 【Hive】建表时的存储格式
  • 计算机网络 | 计算机网络体系结构
  • 【云原生】Docker的安装和卸载
  • 古琴【A5】良宵引
  • Causality
  • 【云原生 • Kubernetes】kubernetes 核心技术 - 持久化存储
  • 剑指offer--重建二叉树
  • [LeetCode] Wiggle Sort
  • CentOS 7 修改主机名
  • co.js - 让异步代码同步化
  • crontab执行失败的多种原因
  • Docker 笔记(1):介绍、镜像、容器及其基本操作
  • github指令
  • JS基础篇--通过JS生成由字母与数字组合的随机字符串
  • webgl (原生)基础入门指南【一】
  • 回顾 Swift 多平台移植进度 #2
  • 嵌入式文件系统
  • 全栈开发——Linux
  • 如何优雅的使用vue+Dcloud(Hbuild)开发混合app
  • 微信小程序--------语音识别(前端自己也能玩)
  • 小程序button引导用户授权
  • 小而合理的前端理论:rscss和rsjs
  • 携程小程序初体验
  • 异步
  • 运行时添加log4j2的appender
  • Spring第一个helloWorld
  • 阿里云移动端播放器高级功能介绍
  • 新海诚画集[秒速5センチメートル:樱花抄·春]
  • ​LeetCode解法汇总2304. 网格中的最小路径代价
  • ​Linux Ubuntu环境下使用docker构建spark运行环境(超级详细)
  • # MySQL server 层和存储引擎层是怎么交互数据的?
  • #LLM入门|Prompt#1.7_文本拓展_Expanding
  • #NOIP 2014#Day.2 T3 解方程
  • #我与Java虚拟机的故事#连载12:一本书带我深入Java领域
  • $.ajax中的eval及dataType
  • (11)MSP430F5529 定时器B
  • (31)对象的克隆
  • (70min)字节暑假实习二面(已挂)
  • (C语言)求出1,2,5三个数不同个数组合为100的组合个数
  • (floyd+补集) poj 3275
  • (Redis使用系列) SpirngBoot中关于Redis的值的各种方式的存储与取出 三
  • (阿里云万网)-域名注册购买实名流程
  • (附源码)ssm学生管理系统 毕业设计 141543
  • (转)ABI是什么
  • (转)VC++中ondraw在什么时候调用的
  • .babyk勒索病毒解析:恶意更新如何威胁您的数据安全
  • .gitattributes 文件