当前位置: 首页 > news >正文

kafka生产者2

1.数据可靠

• 0:生产者发送过来的数据,不需要等数据落盘应答。 

风险:leader挂了之后,follower还没有收到消息。。。。

• 1:生产者发送过来的数据,Leader收到数据后应答。

风险:leader应答完成之后,还没有开始同步副本。。。。

 

 生产者发送过来的数据,Leader和ISR队列里面 的所有节点收齐数据后应答。

可靠性总结: acks=0,生产者发送过来数据就不管了,可靠性差,效率高; acks=1,生产者发送过来数据Leader应答,可靠性中等,效率中等; acks=-1,生产者发送过来数据Leader和ISR队列里面所有Follwer应答,可靠性高,效率低; 在生产环境中,acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;acks=-1,一般用于传输和钱相关的数据, 对可靠性要求比较高的场景。

// 设置 acksproperties.put(ProducerConfig.ACKS_CONFIG, "all");// 重试次数 retries,默认是 int 最大值,2147483647properties.put(ProducerConfig.RETRIES_CONFIG, 3);

2.数据去重 

 

3.生产者事务

 说明:开启事务,必须开启幂等性。

package com.atguigu.kafka.producer;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class CustomProducerTransaction {public static void main(String[] args) {// 1. 创建 kafka 生产者的配置对象Properties properties = new Properties();// 2. 给 kafka 配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092");// key,value 序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"tranactional_id_01");// 3. 创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = newKafkaProducer<String, String>(properties);kafkaProducer.initTransactions();kafkaProducer.beginTransaction();// 4. 调用 send 方法,发送消息try{for (int i = 0; i < 5; i++) {kafkaProducer.send(newProducerRecord<>("first","atguigu " + i));int i1 = 1 / 0;}kafkaProducer.commitTransaction();}catch (Exception e){kafkaProducer.abortTransaction();}finally {// 5. 关闭资源kafkaProducer.close();}}
}

4.数据乱序

 

相关文章:

  • [SUCTF 2019]EasySQL1 题目分析与详解
  • 算法打卡day1|数组篇|Leetcode 704.二分查找、27.移除元素
  • OpenAI文生视频大模型Sora概述
  • C 标准库 - <float.h>
  • 【Ubuntu】通过网线连接两台电脑以实现局域网连接的方法
  • 【docker入门】1-
  • 【Java面试】MongoDB
  • (3)llvm ir转换过程
  • GIT中对子仓库的使用方法介绍
  • 软件测试入门(全面认识软件测试)
  • LeetCode24.两两交换链表中的节点
  • 【LNMP】云导航项目部署及环境搭建(复杂)
  • [HTML]Web前端开发技术30(HTML5、CSS3、JavaScript )JavaScript基础——喵喵画网页
  • 【Django开发】0到1开发美多shop项目:用户登录模块开发。全md文档笔记(附代码 文档)
  • Python 高级语法:一切皆对象
  • ----------
  • 9月CHINA-PUB-OPENDAY技术沙龙——IPHONE
  • CSS 专业技巧
  • Go 语言编译器的 //go: 详解
  • Intervention/image 图片处理扩展包的安装和使用
  • iOS小技巧之UIImagePickerController实现头像选择
  • Js基础知识(四) - js运行原理与机制
  • Kibana配置logstash,报表一体化
  • MyEclipse 8.0 GA 搭建 Struts2 + Spring2 + Hibernate3 (测试)
  • PHP 小技巧
  • RxJS: 简单入门
  • ViewService——一种保证客户端与服务端同步的方法
  • 诡异!React stopPropagation失灵
  • 基于Javascript, Springboot的管理系统报表查询页面代码设计
  • 前言-如何学习区块链
  • 少走弯路,给Java 1~5 年程序员的建议
  • 最简单的无缝轮播
  • 深度学习之轻量级神经网络在TWS蓝牙音频处理器上的部署
  • 《码出高效》学习笔记与书中错误记录
  • 数据库巡检项
  • 曾刷新两项世界纪录,腾讯优图人脸检测算法 DSFD 正式开源 ...
  • ​LeetCode解法汇总2670. 找出不同元素数目差数组
  • #在 README.md 中生成项目目录结构
  • (C语言)二分查找 超详细
  • (附源码)计算机毕业设计SSM疫情下的学生出入管理系统
  • (考研湖科大教书匠计算机网络)第一章概述-第五节1:计算机网络体系结构之分层思想和举例
  • (七)Java对象在Hibernate持久化层的状态
  • (全注解开发)学习Spring-MVC的第三天
  • (一) storm的集群安装与配置
  • (原創) 人會胖會瘦,都是自我要求的結果 (日記)
  • (轉貼) 寄發紅帖基本原則(教育部禮儀司頒布) (雜項)
  • .aanva
  • .NET CLR Hosting 简介
  • .NET Core SkiaSharp 替代 System.Drawing.Common 的一些用法
  • .NET HttpWebRequest、WebClient、HttpClient
  • .NET 中使用 Mutex 进行跨越进程边界的同步
  • .NET(C#) Internals: as a developer, .net framework in my eyes
  • .net对接阿里云CSB服务
  • .sh
  • :=