当前位置: 首页 > news >正文

Kafka 优化问题

1.如何防止消息丢失

  • Kakfa本身配置层面
  1. replication.factor 默认值1
    创建kafka的topic时,设置副本数,根据broker数量设置,通常设置成3
  2. min.insync.replicas 默认值1
    消息至少要被多少个副本落盘成功才返回ack给生成者,小于replication.factor的值,当request.required.acks = all时生效
  • Kafka生产者层面
  1. 添加发送消息,成功和失败的回调处理逻辑。
  2. request.required.acks 设置成1 或 all
  3. 设置重试次数
    retries 默认是0,不重试
  • Kafka消费者层面
  1. 修改成手动提交,默认是自动提交。
    enable-auto-commit: false
  2. 手动提交offset
    使用kafka的Consumer的方法commitSync()提交
    或者
    spring-kafka的Acknowledgment类,用方法ack.acknowledge()提交(推荐使用)

2.如何防止重复消费

Producer生产者

如果生产者发完消息后,因为网络抖动,没有收到ack,但实际上broker已经收到了,此时生产会进行重试,于是broker就会收到多条相同的消息,而造成消费者重复消费

  • 设置幂等性参数

Kafka的幂等性就是为了避免出现生产者重试的时候出现重复写入消息的情况。

开启幂等性功能配置(该配置默认为false)如下

prop.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);
  • 关闭重试
    会造成丢消息(不建议)

Consumer消费方

  • 解决非幂等性消费问题

幂等性:多次访问的结果时一样的。对于rest的请求(get(幂等)、post(非幂等)、put(幂等),delete(幂等))

  • 将消费的消息在消费方保存,进行检查时候已消费

解决方法:可以针对消息生成md5等保存在mysql(唯一索引)或者redis里面,在处理之前去mysql或者redis里面判断是否已经消费过。这个也是幂等性的思想

  • rebalance问题

默认情况下,消费完消息会提交offset给kafka,避免消费,

场景1:当consumer消费完消息,但是没有返回offset时,Consumser挂了,触发rebalance,则会出现重复消费

场景2:当一个消费组在消耗一个poll到的消息时,超过了设定的poll的间隔时间,则kafka会剔除此消费者,触发rebalance,导致Offset提交失败。Rebalance以后还是会从消费者之前的Offset处消费消息

解决方法:提高消费端的性男女,调整消息处理的超时时间,或者较少一次poll的条数。

3.如何做到消息顺序消费

生产者

保证消息按顺序发送给kafka,且消息不丢失

  • 设置幂等性和消息顺序性
//保证幂等性、消息顺序性
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, kafkaProducerProp.getMaxInFlightRequestsPerConnection());
  • 可以手动发送消息,自己控制发送到kafka的顺序

消费者

必须是相同的主题,同一个主题,必须发送到同一个分区,并且并需保证Kafka中的消息是顺序的,因为同一个主题,同一个分区,一个消费组,只会有一个消费者消费消息,就可以实现顺序消费

kafka的顺序消费使用场景不多,因为牺牲了性能,但是rocketmq有专门的功能保证顺序性

4.如果解决消息积压问题

消息的消费速度远赶不上生产者的生成消息的速度,随着没有被消费的数据堆积越来越多,消费的寻址性能会越来越差,最后导致整个kafka的对外服务性能降低,从而印象其他服务访问速度,造成雪崩。

  • 先重新启动消费任务,排查是否是宕机原因
  • 在消费者中,使用多线程,充分利用机器性能进行消费
  • 跳过滞后历史消息,直接消费最新消息,采用离线程序进行补漏
  • 创建新的topic并配置更多数量的分区,将积压消息的topic消费者逻辑修改为直接把消息打入新的topic,将消费逻辑写在新的topic的消费者中
  • 分区数量设置不合理或者消费能力不足
    如果数据量很大,消费能力不足,则添加topic的partition个数,同时添加消费组的消费者数量
  • 如果设置了消息的key,查看是否是因为key导致数据倾斜。合理的key

5.延时队列的效果 

kafka本身是没有延迟队列的功能的,RabbitMQ、RocketMQ有延迟队列的功能。

可以有一下的解决方案来实现延迟加载

1.在发送延迟消息时不直接发送到目标topic,而是发送到一个处理延迟消息的topic,例如delay-minutes-1

2.写一段代码拉取delay-minutes-1中的消息,将满足条件的的消息发送给真正的主题里

因为kafka配置“max.poll.interval.ms”则,如果不在设定的时间内处理完消息则视为消费者挂掉,会进行rebalance,KafaConsumer提供了暂停和恢复的API,调用暂停就无法拉取新的消息,同时长时间不消费也不会认为消费者挂掉

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutionException;

@SpringBootTest
public class DelayQueueTest {

    private KafkaConsumer<String, String> consumer;
    private KafkaProducer<String, String> producer;
    private volatile Boolean exit = false;
    private final Object lock = new Object();
    private final String servers = "";

    @BeforeEach
    void initConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "d");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "5000");
        consumer = new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());
    }

    @BeforeEach
    void initProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producer = new KafkaProducer<>(props);
    }

    @Test
    void testDelayQueue() throws JsonProcessingException, InterruptedException {
        String topic = "delay-minutes-1";
        List<String> topics = Collections.singletonList(topic);
        consumer.subscribe(topics);

        Timer timer = new Timer();
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                synchronized (lock) {
                    consumer.resume(consumer.paused());
                    lock.notify();
                }
            }
        }, 0, 1000);

        do {

            synchronized (lock) {
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(200));

                if (consumerRecords.isEmpty()) {
                    lock.wait();
                    continue;
                }

                boolean timed = false;
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    long timestamp = consumerRecord.timestamp();
                    TopicPartition topicPartition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
                    if (timestamp + 60 * 1000 < System.currentTimeMillis()) {

                        String value = consumerRecord.value();
                        ObjectMapper objectMapper = new ObjectMapper();
                        JsonNode jsonNode = objectMapper.readTree(value);
                        JsonNode jsonNodeTopic = jsonNode.get("topic");

                        String appTopic = null, appKey = null, appValue = null;

                        if (jsonNodeTopic != null) {
                            appTopic = jsonNodeTopic.asText();
                        }
                        if (appTopic == null) {
                            continue;
                        }
                        JsonNode jsonNodeKey = jsonNode.get("key");
                        if (jsonNodeKey != null) {
                            appKey = jsonNode.asText();
                        }

                        JsonNode jsonNodeValue = jsonNode.get("value");
                        if (jsonNodeValue != null) {
                            appValue = jsonNodeValue.asText();
                        }
                        // send to application topic
                        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(appTopic, appKey, appValue);
                        try {
                            producer.send(producerRecord).get();
                            // success. commit message
                            OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(consumerRecord.offset() + 1);
                            HashMap<TopicPartition, OffsetAndMetadata> metadataHashMap = new HashMap<>();
                            metadataHashMap.put(topicPartition, offsetAndMetadata);
                            consumer.commitSync(metadataHashMap);
                        } catch (ExecutionException e) {
                            consumer.pause(Collections.singletonList(topicPartition));
                            consumer.seek(topicPartition, consumerRecord.offset());
                            timed = true;
                            break;
                        }
                    } else {
                        consumer.pause(Collections.singletonList(topicPartition));
                        consumer.seek(topicPartition, consumerRecord.offset());
                        timed = true;
                        break;
                    }
                }

                if (timed) {
                    lock.wait();
                }
            }
        } while (!exit);

    }
}

相关文章:

  • 【opencv-c++】windows10系统VisualStudio2022配置opencv_contrib-4.6.0
  • windows安装动力学仿真软件Frost并计算cassie机器人运动学和动力学
  • 使用 SolidJS 和 TypeScript 构建任务跟踪器
  • 【C++】list的模拟实现
  • 【Kotlin基础系列】第4章 类型
  • Vm虚拟机安装Linux系统教程
  • Java设计模式-单列模式
  • 算法 | 算法是什么?深入精讲
  • C++虚函数具体实现机制以及纯虚函数和抽象类(对多态的补充)
  • Trusted Applications介绍
  • Python函数与参数
  • C++发布订阅模式
  • CentOS7 下载安装卸载 Docker——Docker启动关闭
  • iOS 集成Jenkins 完整流程 (自由风格)
  • okcc呼叫中心所选的客户服务代表应该具备什么条件?
  • 【Under-the-hood-ReactJS-Part0】React源码解读
  • 【腾讯Bugly干货分享】从0到1打造直播 App
  • CSS实用技巧干货
  • JSDuck 与 AngularJS 融合技巧
  • Laravel5.4 Queues队列学习
  • Linux快速配置 VIM 实现语法高亮 补全 缩进等功能
  • Traffic-Sign Detection and Classification in the Wild 论文笔记
  • Vue ES6 Jade Scss Webpack Gulp
  • 官方新出的 Kotlin 扩展库 KTX,到底帮你干了什么?
  • 理解 C# 泛型接口中的协变与逆变(抗变)
  • 如何优雅的使用vue+Dcloud(Hbuild)开发混合app
  • 一个JAVA程序员成长之路分享
  • 如何在 Intellij IDEA 更高效地将应用部署到容器服务 Kubernetes ...
  • ​ArcGIS Pro 如何批量删除字段
  • ​一、什么是射频识别?二、射频识别系统组成及工作原理三、射频识别系统分类四、RFID与物联网​
  • # Swust 12th acm 邀请赛# [ E ] 01 String [题解]
  • #调用传感器数据_Flink使用函数之监控传感器温度上升提醒
  • (1) caustics\
  • (6)设计一个TimeMap
  • (附源码)springboot青少年公共卫生教育平台 毕业设计 643214
  • (教学思路 C#之类三)方法参数类型(ref、out、parmas)
  • (经验分享)作为一名普通本科计算机专业学生,我大学四年到底走了多少弯路
  • (四)汇编语言——简单程序
  • (正则)提取页面里的img标签
  • (转)Android学习系列(31)--App自动化之使用Ant编译项目多渠道打包
  • (转)甲方乙方——赵民谈找工作
  • (转载)CentOS查看系统信息|CentOS查看命令
  • .NET CF命令行调试器MDbg入门(四) Attaching to Processes
  • .NET Core 中插件式开发实现
  • .NET Core实战项目之CMS 第十二章 开发篇-Dapper封装CURD及仓储代码生成器实现
  • .NET Framework 4.6.2改进了WPF和安全性
  • .Net 代码性能 - (1)
  • .NET/C# 编译期间能确定的相同字符串,在运行期间是相同的实例
  • .netcore如何运行环境安装到Linux服务器
  • .NET程序员迈向卓越的必由之路
  • .net的socket示例
  • .net反编译的九款神器
  • .NET开发人员必知的八个网站
  • .net通用权限框架B/S (三)--MODEL层(2)
  • /*在DataTable中更新、删除数据*/