当前位置: 首页 > news >正文

SpringBoot集成kafka开发-消息消费的分区策略(消费者如何判断从哪个分区中消费消息的?)

这里写目录标题

  • 1、kafak消息者消费消息的4种分区策略
  • 2、kafka默认的消费分区策略1-RangeAssignor(均匀分配、默认分配策略)
    • 2.1、代码验证RangeAssignor的消息分区策略
    • 2.1.1、消费者
    • 2.1.2、生产者
    • 2.1.3、kafak配置类
    • 2.1.4、对象实体类
    • 2.1.5、项目配置文件application.yml
    • 2.1.6、测试类
    • 2.1.7、测试
  • 3、kafka消费分区策略2-RoundRobinAssignor(轮询分配)
    • 3.1、消费者
    • 3.2、kafka配置类
    • 3.3、项目配置文件application.yml
    • 3.4、其余文件与上一章节一致
    • 3.5、测试
  • 4、StickyAssignor(粘性分配)和CooperativeStickyAssignor(协作分区)消息者消费分区策略
    • 4.1、修改kafka配置类
    • 4.2、kafka配置类

1、kafak消息者消费消息的4种分区策略

推荐采用StickyAssignor(粘性分配)或者CooperativeStickyAssignor(协作分区)
在这里插入图片描述在这里插入图片描述

2、kafka默认的消费分区策略1-RangeAssignor(均匀分配、默认分配策略)

在这里插入图片描述

2.1、代码验证RangeAssignor的消息分区策略

2.1.1、消费者

  • topics = {“myTopic”},消费主题为myTopic下分区这 的消息
  • concurrency = “3”,指定3个消费者
package com.power.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;@Component
public class EventConsumer {/*** topics 用于指定从哪个主题中消费消息* concurrency 用于指定有多少个消费者* @param record*/@KafkaListener(topics = {"myTopic"}, groupId = "myGroup",concurrency = "3")public void onEventA(ConsumerRecord<String, String> record) {System.out.println(Thread.currentThread().getId()+"---> 消费消息 record = " + record);}
}

2.1.2、生产者

向myTopic主题中发送100个消息

package com.power.producer;import com.power.model.User;
import com.power.util.JSONUtils;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.Date;@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String,Object> kafkaTemplate;public void sendEvent(){for (int i = 0; i < 100; i++) {User user = User.builder().id(i).phone("1567676767"+i).birthday(new Date()).build();String userJson = JSONUtils.toJSON(user);kafkaTemplate.send("myTopic","k"+i, userJson);}}}

2.1.3、kafak配置类

项目启动,自动创建myTopic主题,分配10个分区

package com.power.config;import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class KafkaConfig {@Beanpublic NewTopic newTopic(){return new NewTopic("myTopic",10, (short)1);}
}

2.1.4、对象实体类

package com.power.model;import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;import java.util.Date;@Builder
@AllArgsConstructor
@NoArgsConstructor
@Data
public class User {private Integer id;private String phone;private Date birthday;}

2.1.5、项目配置文件application.yml

spring:application:#应用名称name: spring-boot-06-kafka-ConsumerPartitonStrategy#kafka连接地址(ip+port)kafka:bootstrap-servers: <你的kafka服务器IP>:9092#配置消费者的反序列化consumer:key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerauto-offset-reset: earliest

2.1.6、测试类

package com.power;import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;@SpringBootTest
public class SpringBoot06KafkaBaseApplication {@Resourceprivate EventProducer eventProducer;@Testvoid sendInterceptor(){eventProducer.sendEvent();}}

2.1.7、测试

  • 先启动生产者,向myTopic主题中9个分区中发送100条消息

  • 再启动消费者监听

  • 通过打印发现,因为我们指定了三个消费者,所以共有3个Thread.currentThread().getId()线程ID启动,进行消息消费

在这里插入图片描述

  • 其中线程ID为27线程只读取0、1、2、3分区消息

在这里插入图片描述在这里插入图片描述

  • 其中线程ID为29线程只读取4、5、6分区消息

在这里插入图片描述
在这里插入图片描述

  • 其中线程ID为31线程只读取7、8、9分区消息

在这里插入图片描述
在这里插入图片描述

3、kafka消费分区策略2-RoundRobinAssignor(轮询分配)

3.1、消费者

  • containerFactory = "ourKafkaListenerContainerFactory"指定使用自己创建的消费者消费工厂
package com.power.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;@Component
public class EventConsumer {/*** topics 用于指定从哪个主题中消费消息* concurrency 用于指定有多少个消费者* @param record*/@KafkaListener(topics = {"myTopic"}, groupId = "myGroup",concurrency = "3",containerFactory = "ourKafkaListenerContainerFactory")public void onEventA(ConsumerRecord<String, String> record) {System.out.println(Thread.currentThread().getId()+"---> 消费消息 record = " + record);}
}

3.2、kafka配置类

package com.power.config;import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;import java.util.HashMap;
import java.util.Map;@Configuration
public class KafkaConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.consumer.key-deserializer}")private String keyDeserializer;@Value("${spring.kafka.consumer.value-deserializer}")private String valueDeserializer;@Value("${spring.kafka.consumer.auto-offset-reset}")private String autoOffsetReset;/*** 消费者相关配置* @return*/public Map<String,Object> consumerConfigs(){Map<String,Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,keyDeserializer);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,valueDeserializer);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,autoOffsetReset);//指定使用轮询的消息消费分区器props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());return props;}/*** 消费者创建工厂*/@Beanpublic ConsumerFactory<String,String> consumerFactory(){return new DefaultKafkaConsumerFactory<>(consumerConfigs());}/*** 创建监听器容器工厂* @param ourConsumerFactory* @return*/@Beanpublic KafkaListenerContainerFactory<?> ourKafkaListenerContainerFactory(ConsumerFactory<String,String> ourConsumerFactory){ConcurrentKafkaListenerContainerFactory<String,String> listenerContainerFactory = new ConcurrentKafkaListenerContainerFactory<>();listenerContainerFactory.setConsumerFactory(ourConsumerFactory);return listenerContainerFactory;}@Beanpublic NewTopic newTopic(){return new NewTopic("myTopic",10, (short)1);}
}

3.3、项目配置文件application.yml

spring:application:#应用名称name: spring-boot-06-kafka-ConsumerPartitonStrategy#kafka连接地址(ip+port)kafka:bootstrap-servers: <你的kafka服务器IP>:9092#配置消费者的反序列化consumer:key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerauto-offset-reset: earliest

3.4、其余文件与上一章节一致

3.5、测试

  • 先启动消费者,总共有ID为33、35、37的三个消费者
  • 再启动生产者

测试发现,三个消费者轮询消费9个分区中的消息:

  • ID为33的消费者消费的时0、3、6、9分区的消息
  • ID为35的消费者消费的时1、4、7分区的消息
  • ID为37的消费者消费的时2、5、8分区的消息
33---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1724559582065, serialized key size = 2, serialized value size = 55, headers = RecordHeaders(headers = [], isReadOnly = false), key = k4, value = {"id":4,"phone":"15676767674","birthday":1724559582065})
37---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 2, leaderEpoch = 0, offset = 0, CreateTime = 1724559582069, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k13, value = {"id":13,"phone":"156767676713","birthday":1724559582069})
35---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 4, leaderEpoch = 0, offset = 0, CreateTime = 1724559582066, serialized key size = 2, serialized value size = 55, headers = RecordHeaders(headers = [], isReadOnly = false), key = k5, value = {"id":5,"phone":"15676767675","birthday":1724559582066})
37---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 2, leaderEpoch = 0, offset = 1, CreateTime = 1724559582072, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k19, value = {"id":19,"phone":"156767676719","birthday":1724559582072})
35---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 4, leaderEpoch = 0, offset = 1, CreateTime = 1724559582068, serialized key size = 2, serialized value size = 55, headers = RecordHeaders(headers = [], isReadOnly = false), key = k9, value = {"id":9,"phone":"15676767679","birthday":1724559582068})
35---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 4, leaderEpoch = 0, offset = 2, CreateTime = 1724559582073, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k29, value = {"id":29,"phone":"156767676729","birthday":1724559582073})
35---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 4, leaderEpoch = 0, offset = 3, CreateTime = 1724559582073, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k30, value = {"id":30,"phone":"156767676730","birthday":1724559582073})
35---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 4, leaderEpoch = 0, offset = 4, CreateTime = 1724559582074, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k43, value = {"id":43,"phone":"156767676743","birthday":1724559582074})
35---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 4, leaderEpoch = 0, offset = 5, CreateTime = 1724559582076, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k56, value = {"id":56,"phone":"156767676756","birthday":1724559582075})
35---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 4, leaderEpoch = 0, offset = 6, CreateTime = 1724559582076, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k57, value = {"id":57,"phone":"156767676757","birthday":1724559582076})
35---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 4, leaderEpoch = 0, offset = 7, CreateTime = 1724559582077, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k63, value = {"id":63,"phone":"156767676763","birthday":1724559582076})
35---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 4, leaderEpoch = 0, offset = 8, CreateTime = 1724559582077, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k68, value = {"id":68,"phone":"156767676768","birthday":1724559582077})
35---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 4, leaderEpoch = 0, offset = 9, CreateTime = 1724559582079, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k82, value = {"id":82,"phone":"156767676782","birthday":1724559582079})
35---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 4, leaderEpoch = 0, offset = 10, CreateTime = 1724559582079, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k88, value = {"id":88,"phone":"156767676788","birthday":1724559582079})
35---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 4, leaderEpoch = 0, offset = 11, CreateTime = 1724559582080, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k89, value = {"id":89,"phone":"156767676789","birthday":1724559582080})
33---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1724559582067, serialized key size = 2, serialized value size = 55, headers = RecordHeaders(headers = [], isReadOnly = false), key = k8, value = {"id":8,"phone":"15676767678","birthday":1724559582067})
37---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 2, leaderEpoch = 0, offset = 2, CreateTime = 1724559582072, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k21, value = {"id":21,"phone":"156767676721","birthday":1724559582072})
35---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 4, leaderEpoch = 0, offset = 12, CreateTime = 1724559582080, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k92, value = {"id":92,"phone":"156767676792","birthday":1724559582080})
35---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 1, leaderEpoch = 0, offset = 0, CreateTime = 1724559582071, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k16, value = {"id":16,"phone":"156767676716","birthday":1724559582071})
35---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 1, leaderEpoch = 0, offset = 1, CreateTime = 1724559582071, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k17, value = {"id":17,"phone":"156767676717","birthday":1724559582071})
35---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 1, leaderEpoch = 0, offset = 2, CreateTime = 1724559582072, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k27, value = {"id":27,"phone":"156767676727","birthday":1724559582072})
35---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 1, leaderEpoch = 0, offset = 3, CreateTime = 1724559582073, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k34, value = {"id":34,"phone":"156767676734","birthday":1724559582073})
35---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 1, leaderEpoch = 0, offset = 4, CreateTime = 1724559582073, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k37, value = {"id":37,"phone":"156767676737","birthday":1724559582073})
35---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 1, leaderEpoch = 0, offset = 5, CreateTime = 1724559582077, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k64, value = {"id":64,"phone":"156767676764","birthday":1724559582077})
35---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 7, leaderEpoch = 0, offset = 0, CreateTime = 1724559582063, serialized key size = 2, serialized value size = 55, headers = RecordHeaders(headers = [], isReadOnly = false), key = k1, value = {"id":1,"phone":"15676767671","birthday":1724559582062})
35---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 7, leaderEpoch = 0, offset = 1, CreateTime = 1724559582070, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k14, value = {"id":14,"phone":"156767676714","birthday":1724559582069})
35---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 7, leaderEpoch = 0, offset = 2, CreateTime = 1724559582072, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k22, value = {"id":22,"phone":"156767676722","birthday":1724559582072})
35---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 7, leaderEpoch = 0, offset = 3, CreateTime = 1724559582073, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k38, value = {"id":38,"phone":"156767676738","birthday":1724559582073})
33---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 0, leaderEpoch = 0, offset = 2, CreateTime = 1724559582068, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k11, value = {"id":11,"phone":"156767676711","birthday":1724559582068})
33---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 0, leaderEpoch = 0, offset = 3, CreateTime = 1724559582072, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k24, value = {"id":24,"phone":"156767676724","birthday":1724559582072})
35---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 7, leaderEpoch = 0, offset = 4, CreateTime = 1724559582074, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k39, value = {"id":39,"phone":"156767676739","birthday":1724559582074})
37---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 2, leaderEpoch = 0, offset = 3, CreateTime = 1724559582073, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k36, value = {"id":36,"phone":"156767676736","birthday":1724559582073})
35---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 7, leaderEpoch = 0, offset = 5, CreateTime = 1724559582074, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k49, value = {"id":49,"phone":"156767676749","birthday":1724559582074})
33---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 0, leaderEpoch = 0, offset = 4, CreateTime = 1724559582073, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k31, value = {"id":31,"phone":"156767676731","birthday":1724559582073})
35---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 7, leaderEpoch = 0, offset = 6, CreateTime = 1724559582076, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k58, value = {"id":58,"phone":"156767676758","birthday":1724559582076})
37---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 2, leaderEpoch = 0, offset = 4, CreateTime = 1724559582074, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k41, value = {"id":41,"phone":"156767676741","birthday":1724559582074})
35---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 7, leaderEpoch = 0, offset = 7, CreateTime = 1724559582079, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k80, value = {"id":80,"phone":"156767676780","birthday":1724559582079})
33---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 0, leaderEpoch = 0, offset = 5, CreateTime = 1724559582074, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k42, value = {"id":42,"phone":"156767676742","birthday":1724559582074})
35---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 7, leaderEpoch = 0, offset = 8, CreateTime = 1724559582079, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k81, value = {"id":81,"phone":"156767676781","birthday":1724559582079})
37---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 2, leaderEpoch = 0, offset = 5, CreateTime = 1724559582074, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k47, value = {"id":47,"phone":"156767676747","birthday":1724559582074})
35---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 7, leaderEpoch = 0, offset = 9, CreateTime = 1724559582080, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k93, value = {"id":93,"phone":"156767676793","birthday":1724559582080})
33---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 0, leaderEpoch = 0, offset = 6, CreateTime = 1724559582074, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k46, value = {"id":46,"phone":"156767676746","birthday":1724559582074})
33---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 0, leaderEpoch = 0, offset = 7, CreateTime = 1724559582075, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k53, value = {"id":53,"phone":"156767676753","birthday":1724559582075})
33---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 0, leaderEpoch = 0, offset = 8, CreateTime = 1724559582075, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k55, value = {"id":55,"phone":"156767676755","birthday":1724559582075})
33---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 0, leaderEpoch = 0, offset = 9, CreateTime = 1724559582077, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k70, value = {"id":70,"phone":"156767676770","birthday":1724559582077})
33---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 0, leaderEpoch = 0, offset = 10, CreateTime = 1724559582077, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k71, value = {"id":71,"phone":"156767676771","birthday":1724559582077})
33---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 0, leaderEpoch = 0, offset = 11, CreateTime = 1724559582078, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k72, value = {"id":72,"phone":"156767676772","birthday":1724559582078})
33---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 0, leaderEpoch = 0, offset = 12, CreateTime = 1724559582078, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k74, value = {"id":74,"phone":"156767676774","birthday":1724559582078})
33---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 0, leaderEpoch = 0, offset = 13, CreateTime = 1724559582079, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k84, value = {"id":84,"phone":"156767676784","birthday":1724559582079})
33---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 0, leaderEpoch = 0, offset = 14, CreateTime = 1724559582079, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k85, value = {"id":85,"phone":"156767676785","birthday":1724559582079})
33---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 0, leaderEpoch = 0, offset = 15, CreateTime = 1724559582080, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k94, value = {"id":94,"phone":"156767676794","birthday":1724559582080})
33---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 0, leaderEpoch = 0, offset = 16, CreateTime = 1724559582081, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k98, value = {"id":98,"phone":"156767676798","birthday":1724559582081})
33---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 3, leaderEpoch = 0, offset = 0, CreateTime = 1724559582063, serialized key size = 2, serialized value size = 55, headers = RecordHeaders(headers = [], isReadOnly = false), key = k2, value = {"id":2,"phone":"15676767672","birthday":1724559582063})
33---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 3, leaderEpoch = 0, offset = 1, CreateTime = 1724559582072, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k18, value = {"id":18,"phone":"156767676718","birthday":1724559582071})
33---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 3, leaderEpoch = 0, offset = 2, CreateTime = 1724559582072, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k23, value = {"id":23,"phone":"156767676723","birthday":1724559582072})
33---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 3, leaderEpoch = 0, offset = 3, CreateTime = 1724559582073, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k33, value = {"id":33,"phone":"156767676733","birthday":1724559582073})
37---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 2, leaderEpoch = 0, offset = 6, CreateTime = 1724559582076, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k59, value = {"id":59,"phone":"156767676759","birthday":1724559582076})
37---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 2, leaderEpoch = 0, offset = 7, CreateTime = 1724559582077, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k67, value = {"id":67,"phone":"156767676767","birthday":1724559582077})
37---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 2, leaderEpoch = 0, offset = 8, CreateTime = 1724559582078, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k77, value = {"id":77,"phone":"156767676777","birthday":1724559582078})
37---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 2, leaderEpoch = 0, offset = 9, CreateTime = 1724559582078, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k78, value = {"id":78,"phone":"156767676778","birthday":1724559582078})
37---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 2, leaderEpoch = 0, offset = 10, CreateTime = 1724559582079, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k86, value = {"id":86,"phone":"156767676786","birthday":1724559582079})
37---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 8, leaderEpoch = 0, offset = 0, CreateTime = 1724559582067, serialized key size = 2, serialized value size = 55, headers = RecordHeaders(headers = [], isReadOnly = false), key = k7, value = {"id":7,"phone":"15676767677","birthday":1724559582067})
37---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 8, leaderEpoch = 0, offset = 1, CreateTime = 1724559582068, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k10, value = {"id":10,"phone":"156767676710","birthday":1724559582068})
37---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 8, leaderEpoch = 0, offset = 2, CreateTime = 1724559582070, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k15, value = {"id":15,"phone":"156767676715","birthday":1724559582070})
37---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 8, leaderEpoch = 0, offset = 3, CreateTime = 1724559582072, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k25, value = {"id":25,"phone":"156767676725","birthday":1724559582072})
37---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 8, leaderEpoch = 0, offset = 4, CreateTime = 1724559582073, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k28, value = {"id":28,"phone":"156767676728","birthday":1724559582073})
37---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 8, leaderEpoch = 0, offset = 5, CreateTime = 1724559582074, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k48, value = {"id":48,"phone":"156767676748","birthday":1724559582074})
37---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 8, leaderEpoch = 0, offset = 6, CreateTime = 1724559582075, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k50, value = {"id":50,"phone":"156767676750","birthday":1724559582075})
37---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 8, leaderEpoch = 0, offset = 7, CreateTime = 1724559582078, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k75, value = {"id":75,"phone":"156767676775","birthday":1724559582078})
37---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 8, leaderEpoch = 0, offset = 8, CreateTime = 1724559582078, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k79, value = {"id":79,"phone":"156767676779","birthday":1724559582078})
37---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 8, leaderEpoch = 0, offset = 9, CreateTime = 1724559582081, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k97, value = {"id":97,"phone":"156767676797","birthday":1724559582081})
37---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 5, leaderEpoch = 0, offset = 0, CreateTime = 1724559582030, serialized key size = 2, serialized value size = 55, headers = RecordHeaders(headers = [], isReadOnly = false), key = k0, value = {"id":0,"phone":"15676767670","birthday":1724559581812})
37---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 5, leaderEpoch = 0, offset = 1, CreateTime = 1724559582067, serialized key size = 2, serialized value size = 55, headers = RecordHeaders(headers = [], isReadOnly = false), key = k6, value = {"id":6,"phone":"15676767676","birthday":1724559582066})
37---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 5, leaderEpoch = 0, offset = 2, CreateTime = 1724559582076, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k62, value = {"id":62,"phone":"156767676762","birthday":1724559582076})
37---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 5, leaderEpoch = 0, offset = 3, CreateTime = 1724559582077, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k65, value = {"id":65,"phone":"156767676765","birthday":1724559582077})
37---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 5, leaderEpoch = 0, offset = 4, CreateTime = 1724559582077, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k69, value = {"id":69,"phone":"156767676769","birthday":1724559582077})
37---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 5, leaderEpoch = 0, offset = 5, CreateTime = 1724559582078, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k73, value = {"id":73,"phone":"156767676773","birthday":1724559582078})
37---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 5, leaderEpoch = 0, offset = 6, CreateTime = 1724559582080, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k90, value = {"id":90,"phone":"156767676790","birthday":1724559582080})
37---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 5, leaderEpoch = 0, offset = 7, CreateTime = 1724559582080, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k91, value = {"id":91,"phone":"156767676791","birthday":1724559582080})
37---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 5, leaderEpoch = 0, offset = 8, CreateTime = 1724559582081, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k99, value = {"id":99,"phone":"156767676799","birthday":1724559582081})
33---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 3, leaderEpoch = 0, offset = 4, CreateTime = 1724559582074, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k44, value = {"id":44,"phone":"156767676744","birthday":1724559582074})
33---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 3, leaderEpoch = 0, offset = 5, CreateTime = 1724559582078, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k76, value = {"id":76,"phone":"156767676776","birthday":1724559582078})
33---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 3, leaderEpoch = 0, offset = 6, CreateTime = 1724559582081, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k96, value = {"id":96,"phone":"156767676796","birthday":1724559582080})
33---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 6, leaderEpoch = 0, offset = 0, CreateTime = 1724559582065, serialized key size = 2, serialized value size = 55, headers = RecordHeaders(headers = [], isReadOnly = false), key = k3, value = {"id":3,"phone":"15676767673","birthday":1724559582064})
33---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 6, leaderEpoch = 0, offset = 1, CreateTime = 1724559582072, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k20, value = {"id":20,"phone":"156767676720","birthday":1724559582072})
33---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 6, leaderEpoch = 0, offset = 2, CreateTime = 1724559582073, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k32, value = {"id":32,"phone":"156767676732","birthday":1724559582073})
33---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 6, leaderEpoch = 0, offset = 3, CreateTime = 1724559582074, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k40, value = {"id":40,"phone":"156767676740","birthday":1724559582074})
33---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 6, leaderEpoch = 0, offset = 4, CreateTime = 1724559582074, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k45, value = {"id":45,"phone":"156767676745","birthday":1724559582074})
33---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 6, leaderEpoch = 0, offset = 5, CreateTime = 1724559582075, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k51, value = {"id":51,"phone":"156767676751","birthday":1724559582075})
33---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 6, leaderEpoch = 0, offset = 6, CreateTime = 1724559582076, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k60, value = {"id":60,"phone":"156767676760","birthday":1724559582076})
33---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 6, leaderEpoch = 0, offset = 7, CreateTime = 1724559582077, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k66, value = {"id":66,"phone":"156767676766","birthday":1724559582077})
33---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 6, leaderEpoch = 0, offset = 8, CreateTime = 1724559582079, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k83, value = {"id":83,"phone":"156767676783","birthday":1724559582079})
33---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 6, leaderEpoch = 0, offset = 9, CreateTime = 1724559582079, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k87, value = {"id":87,"phone":"156767676787","birthday":1724559582079})
33---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 6, leaderEpoch = 0, offset = 10, CreateTime = 1724559582080, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k95, value = {"id":95,"phone":"156767676795","birthday":1724559582080})
33---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 9, leaderEpoch = 0, offset = 0, CreateTime = 1724559582068, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k12, value = {"id":12,"phone":"156767676712","birthday":1724559582068})
33---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 9, leaderEpoch = 0, offset = 1, CreateTime = 1724559582072, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k26, value = {"id":26,"phone":"156767676726","birthday":1724559582072})
33---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 9, leaderEpoch = 0, offset = 2, CreateTime = 1724559582073, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k35, value = {"id":35,"phone":"156767676735","birthday":1724559582073})
33---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 9, leaderEpoch = 0, offset = 3, CreateTime = 1724559582075, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k52, value = {"id":52,"phone":"156767676752","birthday":1724559582075})
33---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 9, leaderEpoch = 0, offset = 4, CreateTime = 1724559582075, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k54, value = {"id":54,"phone":"156767676754","birthday":1724559582075})
33---> 消费消息 record = ConsumerRecord(topic = myTopic, partition = 9, leaderEpoch = 0, offset = 5, CreateTime = 1724559582076, serialized key size = 3, serialized value size = 57, headers = RecordHeaders(headers = [], isReadOnly = false), key = k61, value = {"id":61,"phone":"156767676761","birthday":1724559582076})

4、StickyAssignor(粘性分配)和CooperativeStickyAssignor(协作分区)消息者消费分区策略

在这里插入图片描述

4.1、修改kafka配置类

只需要修改kafka配置文件中指定的消息消费分区器即可,其余与章节3代码一致
在这里插入图片描述

4.2、kafka配置类

package com.power.config;import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;import java.util.HashMap;
import java.util.Map;@Configuration
public class KafkaConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.consumer.key-deserializer}")private String keyDeserializer;@Value("${spring.kafka.consumer.value-deserializer}")private String valueDeserializer;@Value("${spring.kafka.consumer.auto-offset-reset}")private String autoOffsetReset;/*** 消费者相关配置* @return*/public Map<String,Object> consumerConfigs(){Map<String,Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,keyDeserializer);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,valueDeserializer);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,autoOffsetReset);//指定使用轮询的消息消费分区器props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());return props;}/*** 消费者创建工厂*/@Beanpublic ConsumerFactory<String,String> consumerFactory(){return new DefaultKafkaConsumerFactory<>(consumerConfigs());}/*** 创建监听器容器工厂* @param ourConsumerFactory* @return*/@Beanpublic KafkaListenerContainerFactory<?> ourKafkaListenerContainerFactory(ConsumerFactory<String,String> ourConsumerFactory){ConcurrentKafkaListenerContainerFactory<String,String> listenerContainerFactory = new ConcurrentKafkaListenerContainerFactory<>();listenerContainerFactory.setConsumerFactory(ourConsumerFactory);return listenerContainerFactory;}@Beanpublic NewTopic newTopic(){return new NewTopic("myTopic",10, (short)1);}
}

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 【C#】【EXCEL】Bumblebee/Components/Analysis/GH_Ex_Ana_CondAverage.cs
  • huggingface下载model
  • Linux-远程访问及控制
  • Java数据结构篇
  • arm-Pwn环境搭建+简单题目
  • hostnamectrl到底是什么命令?
  • 还在烦恼Cosplay论坛开发?探索PHP+Vue的完美解决方案!
  • JVM类加载机制—类加载器和双亲委派机制详解
  • Mybatis框架常见问题总结
  • 计算机毕业设计Spark+Tensorflow股票推荐系统 股票预测系统 股票可视化 股票数据分析 量化交易系统 股票爬虫 股票K线图 大数据毕业设计 AI
  • 基于YOLOv8的无人机高空红外(HIT-UAV)检测算法,新的混合型扩张型残差注意力(HDRAB)助力涨点(二)
  • 小琳AI课堂:AIGC
  • 香蕉梨:自然的甜蜜宝藏
  • C\C++ Sqlite3使用详解
  • 云计算实训34——docker环境配置、镜像基本操作、容器基本操作、设置远程连接管理
  • 2017-09-12 前端日报
  • axios请求、和返回数据拦截,统一请求报错提示_012
  • electron原来这么简单----打包你的react、VUE桌面应用程序
  • js操作时间(持续更新)
  • mysql外键的使用
  • Quartz实现数据同步 | 从0开始构建SpringCloud微服务(3)
  • SQLServer之索引简介
  • Travix是如何部署应用程序到Kubernetes上的
  • Vue全家桶实现一个Web App
  • Yeoman_Bower_Grunt
  • 汉诺塔算法
  • 技术胖1-4季视频复习— (看视频笔记)
  • 小程序 setData 学问多
  • 译自由幺半群
  • 微龛半导体获数千万Pre-A轮融资,投资方为国中创投 ...
  • ​Java基础复习笔记 第16章:网络编程
  • ​LeetCode解法汇总2670. 找出不同元素数目差数组
  • ​Redis 实现计数器和限速器的
  • # 移动硬盘误操作制作为启动盘数据恢复问题
  • ${factoryList }后面有空格不影响
  • %check_box% in rails :coditions={:has_many , :through}
  • (09)Hive——CTE 公共表达式
  • (2)STL算法之元素计数
  • (2)从源码角度聊聊Jetpack Navigator的工作流程
  • (纯JS)图片裁剪
  • (附源码)php投票系统 毕业设计 121500
  • (附源码)springboot 基于HTML5的个人网页的网站设计与实现 毕业设计 031623
  • (九十四)函数和二维数组
  • (力扣题库)跳跃游戏II(c++)
  • (算法)前K大的和
  • (一)utf8mb4_general_ci 和 utf8mb4_unicode_ci 适用排序和比较规则场景
  • (一)使用IDEA创建Maven项目和Maven使用入门(配图详解)
  • (原創) 是否该学PetShop将Model和BLL分开? (.NET) (N-Tier) (PetShop) (OO)
  • (转)负载均衡,回话保持,cookie
  • (自用)gtest单元测试
  • ***测试-HTTP方法
  • .bat批处理出现中文乱码的情况
  • .NET / MSBuild 扩展编译时什么时候用 BeforeTargets / AfterTargets 什么时候用 DependsOnTargets?
  • .NET 6 Mysql Canal (CDC 增量同步,捕获变更数据) 案例版
  • .Net Core webapi RestFul 统一接口数据返回格式