当前位置: 首页 > news >正文

Apache Kafka(九)- Kafka Consumer 消费行为

1. Poll Messages

Kafka Consumer 中消费messages时,使用的是poll模型,也就是主动去Kafka端取数据。其他消息管道也有的是push模型,也就是服务端向consumer推送数据,consumer仅需等待即可。

Kafka Consumerpoll模型使得consumer可以控制从log的指定offset去消费数据、消费数据的速度、以及replay events的能力。

Kafka Consumer poll模型工作如下图:

 

 

  • ·       Consumer 调用.poll(Duration timeout) 方法,向broker请求数据
  • ·       若是broker端有数据则立即返回;否则在timeout时间后返回empty

 

我们可以通过参数控制 Kafka Consumer 行为,主要有:

  • ·       Fetch.min.bytes(默认值是1

o   控制在每个请求中,至少拉取多少数据

o   增加此参数可以提高吞吐并降低请求的数目,但是代价是增加延时

 

  • ·       Max.poll.records(默认是500

o   控制在每个请求中,接收多少条records

o   如果消息普遍都比较小而consumer端又有较大的内存,则可以考虑增大此参数

o   最好是监控在每个请求中poll了多少条消息

 

  • ·       Max.partitions.fetch.bytes(默认为1MB

o   Broker中每个partition可返回的最多字节

o   如果目标端有100多个partitions,则需要较多内存

 

  • ·       Fetch.max.bytes(默认50MB

o   对每个fetch 请求,可以返回的最大数据量(一个fetch请求可以覆盖多个partitions

o   Consumer并行执行多个fetch操作

 

默认情况下,一般不建议手动调整以上参数,除非我们的consumer已经达到了默认配置下的最高的吞吐,且需要达到更高的吞吐。

 

2. Consumer Offset Commit 策略

在一个consumer 应用中,有两种常见的committing offsets的策略,分别为:

  • ·       (较为简单)enable.auto.commit = true:自动commit offsets,但必须使用同步的方式处理数据
  • ·       (进阶)enable.auto.commit = false:手动commit offsets

 

在设置enable.auto.commit = true时,考虑以下代码:

while(true) {
     List<Records> batch = consumer.poll(Duration.ofMillis(100));
     doSomethingSynchronous(batch);
 }

 

一个Consumer 每隔100ms poll一次消息,然后以同步地方式处理这个batch的数据。此时offsets 会定期自动被commit,此定期时间由 auto.commit.interval.ms 决定,默认为 5000,也就是在每次调用 .poll() 方法 5 秒后,会自动commit offsets

但是如果在处理数据时用的是异步的方式,则会导致“at-most-once”的行为。因为offsets可能会在数据被处理前就被commit

所以对于新手来说,使用 enable.auto.commit = true 可能是有风险的,所以不建议一开始就使用这种方式

 

若设置 enable.auto.commit = false,考虑以下代码:

while(true) {
     List<Records> batch = consumer.poll(Duration.ofMillis(100));
     if isReady(batch){
         doSomethingSynchronous(batch);
         consumer.commitSync();
     }
 }

  

此例子明确指示了在同步地处理了数据后,再主动commit offsets。这样我们可以控制在什么条件下,去commit offsets。一个比较典型的场景为:将接收的数据读入缓存,然后flush 缓存到一个数据库中,最后再commit offsets

 

3. 手动Commit Offset 示例

首先我们关闭自动commit offsets

// disable auto commit of offsets
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

指定每个请求最多接收10records,便于测试:
properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10"); 

添加以下代码逻辑:

public static void main(String[] args) throws IOException {
     Logger logger = LoggerFactory.getLogger(ElasticSearchConsumer.class.getName());
     RestHighLevelClient client = createClient();
 
     // create Kafka consumer
     KafkaConsumer<String, String> consumer = createConsumer("kafka_demo");
 
     // poll for new data
     while(true){
         ConsumerRecords<String, String> records =
                 consumer.poll(Duration.ofMinutes(100));
 
         logger.info("received " + records.count() + "records");
         for(ConsumerRecord record : records) {
 
             // construct a kafka generic ID
             String kafka_generic_id = record.topic() + "_" + record.partition() + "_" + record.offset();
 
             // where we insert data into ElasticSearch
             IndexRequest indexRequest = new IndexRequest(
                     "kafkademo"
             ).id(kafka_generic_id).source(record.value(), XContentType.JSON);
 
             IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
             String id = indexResponse.getId();
 
             logger.info(id);
 
             try {
                 Thread.sleep(10); // introduce a small delay
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
         }
 
         logger.info("Committing offsets...");
         consumer.commitSync();                      // commit offsets manually
         logger.info("Offsets have been committed");
 
         }
     }

这里我们在处理每次获取的10records后(也就是for 循环完整执行一次),手动执行一次offsets commit。打印日志记录为:

 

手动停止consumer 程序后,可以看到最后的committed offsets165

  

使用consumer-group cli 也可以验证当前committed offsets165

  

4. Performance Improvement using Batching

在这个例子中,consumer 限制每次poll 10条数据,然后每条依次处理(插入elastic search)。此方法效率较低,我们可以通过使用 batching 的方式增加吞吐。这里实现的方式是使用 elastic search API 提供的BulkRequest,基于之前的代码,修改如下:

public static void main(String[] args) throws IOException {
     Logger logger = LoggerFactory.getLogger(ElasticSearchConsumer.class.getName());
     RestHighLevelClient client = createClient();
 
     // create Kafka consumer
     KafkaConsumer<String, String> consumer = createConsumer("kafka_demo");
 
     // poll for new data
     while(true){
         ConsumerRecords<String, String> records =
                 consumer.poll(Duration.ofMinutes(100));
 
         // bulk request
         BulkRequest bulkRequest = new BulkRequest();
 
         logger.info("received " + records.count() + "records");
         for(ConsumerRecord record : records) {
 
             // construct a kafka generic ID
             String kafka_generic_id = record.topic() + "_" + record.partition() + "_" + record.offset();
 
             // where we insert data into ElasticSearch
             IndexRequest indexRequest = new IndexRequest(
                     "kafkademo"
             ).id(kafka_generic_id).source(record.value(), XContentType.JSON);
 
             IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
 
             // add to our bulk request (takes no time)
             bulkRequest.add(indexRequest);
 
 
             //String id = indexResponse.getId();
             //logger.info(id);
 
             try {
                 Thread.sleep(10); // introduce a small delay
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
         }
 
         // bulk response
         BulkResponse bulkItemResponses = client.bulk(bulkRequest, RequestOptions.DEFAULT);
 
         logger.info("Committing offsets...");
         consumer.commitSync();                      // commit offsets manually
         logger.info("Offsets have been committed");
 
         }
     }

   

可以看到,consumerpoll到记录后,并不会一条条的向elastic search 发送,而是将它们放入一个BulkRequest,并在for循环结束后发送。在发送完毕后,再手动commit offsets

 

执行结果为:

 

转载于:https://www.cnblogs.com/zackstang/p/11515203.html

相关文章:

  • xray写POC踩坑
  • 对 Watchbog Botnet 渗透过程和 Payload 的分析
  • c++ 初学者 慢慢成长中
  • max pool实现
  • Kafka Stream 以及其他流处理框架对比
  • cpp 面向对象初步探索
  • cpp 实现简易String类
  • Apache Kafka(十)Partitions与Replication Factor 调整准则
  • 蒜头君的购物袋1、蒜头君的购物袋2-(01背包)
  • vue页面传参
  • SSM框架-Spring
  • 端口扫描
  • js 条件方法、数组方法
  • 利用Python原始库完成一个端口扫描的功能
  • 服务器spring boot版本,平滑升级
  • -------------------- 第二讲-------- 第一节------在此给出链表的基本操作
  • 【每日笔记】【Go学习笔记】2019-01-10 codis proxy处理流程
  • golang 发送GET和POST示例
  • Java-详解HashMap
  • js作用域和this的理解
  • KMP算法及优化
  • MYSQL 的 IF 函数
  • MYSQL如何对数据进行自动化升级--以如果某数据表存在并且某字段不存在时则执行更新操作为例...
  • Sass 快速入门教程
  • thinkphp5.1 easywechat4 微信第三方开放平台
  • 浮动相关
  • 经典排序算法及其 Java 实现
  • 每个JavaScript开发人员应阅读的书【1】 - JavaScript: The Good Parts
  • 面试遇到的一些题
  • 浅谈web中前端模板引擎的使用
  • 全栈开发——Linux
  • 如何选择开源的机器学习框架?
  • 项目实战-Api的解决方案
  • ​2021半年盘点,不想你错过的重磅新书
  • $.ajax中的eval及dataType
  • (a /b)*c的值
  • (Mac上)使用Python进行matplotlib 画图时,中文显示不出来
  • (二)Linux——Linux常用指令
  • (接口自动化)Python3操作MySQL数据库
  • (亲测成功)在centos7.5上安装kvm,通过VNC远程连接并创建多台ubuntu虚拟机(ubuntu server版本)...
  • (三)c52学习之旅-点亮LED灯
  • (十八)三元表达式和列表解析
  • (转) ns2/nam与nam实现相关的文件
  • (转)chrome浏览器收藏夹(书签)的导出与导入
  • (转)JAVA中的堆栈
  • (转贴)用VML开发工作流设计器 UCML.NET工作流管理系统
  • .NET 4.0网络开发入门之旅-- 我在“网” 中央(下)
  • .Net mvc总结
  • .net反编译工具
  • .net访问oracle数据库性能问题
  • .NET序列化 serializable,反序列化
  • /etc/shadow字段详解
  • ?
  • ?php echo $logosrc[0];?,如何在一行中显示logo和标题?
  • [ C++ ] STL_list 使用及其模拟实现