当前位置: 首页 > news >正文

Spring集成Apache Kafka教程

Apache Kafka是分布式、容错的流处理平台。本文介绍Spring对Apache Kafka集成访问方式,提供了对原始访问方式的封装抽象,实现基于模板和注解方式对Kafka的访问。

环境依赖

首先需要下载安装Kafka,并增加spring-kafka依赖:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.7.2</version>
</dependency>

我们的示例使用Spring Boot,并假定kafka使用默认配置,端口没有变化。

配置主题

我们通过命令行创建kafka主题:

$ bin/kafka-topics.sh --create \
  --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 1 \
  --topic mytopic

当然也可以通过AdminClient以编程方式创建主题:

@Configuration
public class KafkaTopicConfig {
    
    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new KafkaAdmin(configs);
    }
    
    @Bean
    public NewTopic topic1() {
         return new NewTopic("test001", 1, (short) 1);
    }
}

首先需要增加KafkaAdmin bean,通过它增加主题。

生产消息

要生产消息,需要配置ProductFactory,用于设置创建Kafka Producer实例的策略。然后需要KafkaTemplate,它是对Producer实例的包装,提供了便捷的方法给主题发送消息。

Producer实例是线程安全的,所以在整个Spring上下文中使用单例性能更好,因此KafkaTemplate实例也是线程安全的,建议使用单例。

生产者配置

@Configuration
public class KafkaProducerConfig {
    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>(5);
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

发送消息

现在可以通过KafkaTemplate类发送消息:

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

public void sendMessage(String msg) {
    kafkaTemplate.send(topicName, msg);
}

send方法返回ListenableFuture对象。如果希望阻塞发送线程、过的发送的结果,可以通过调用ListenableFuture对象的get方法,则线程会等待结果,单这样会拖慢生产者。

Kafka是非常快的流程处理平台,因此最好使用异步方式处理结果,这样后续消息无需等待前一个消息的结果。我们可以通过回调方式实现:

public void sendMessage(String message) {
            
    ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topicName, message);
	
    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
        @Override
        public void onSuccess(SendResult<String, String> result) {
            System.out.println("Sent message=[" + message + "] with offset=[" + result.getRecordMetadata().offset() + "]");
        }
        @Override
        public void onFailure(Throwable ex) {
            System.out.println("Unable to send message=[" + message + "] due to : " + ex.getMessage());
        }
    });
}

消费消息

消费者配置

消费消息需要配置 ConsumerFactory 和 KafkaListenerContainerFactory。只要这些bean在Spring Bean工厂中有效,基于pojo的消费者就可以使用@KafkaListener注解。

在配置类上增加@EnableKafka 注解是为了监测Spring管理bean上的@KafkaListener注解:

@EnableKafka
@Configuration
public class KafkaConsumerConfig {
    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Value(value = "${kafka.groupId}")
    private String groupId;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>(6);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String>
                                                           kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                                              new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }
}

接收消息

@KafkaListener(topics = "topicName", groupId = "foo")
public void listenGroupFoo(String message) {
    System.out.println("Received Message in group foo: " + message);
}

我们可以为单个主题实现多个监听器,每个使用不同的分组ID,而且一个消费者可以监听多个主题:

@KafkaListener(topics = "topic1, topic2", groupId = "foo")

Spring也支持获取一个或多个消息头信息,通过在监听器上是哟个@Header注解:

@KafkaListener(topics = "topicName")
public void listenWithHeaders(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
      System.out.println("Received Message: " + message" + "from partition: " + partition);
}

从指定分区消费消息

上面创建的主题test001,只有一个分区。对于有多个分区的主题,@KafkaListener注解可以显示订阅主题的特定分区和初始偏移量:

@KafkaListener(
  topicPartitions = @TopicPartition(topic = "topicName",
  partitionOffsets = {
    @PartitionOffset(partition = "0", initialOffset = "0"), 
    @PartitionOffset(partition = "3", initialOffset = "0")}),
  containerFactory = "partitionsKafkaListenerContainerFactory")
public void listenToPartition(
  @Payload String message, 
  @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
      System.out.println(
        "Received Message: " + message"
        + "from partition: " + partition);
}

这里initialOffset 设置为0,每次监听器初始化时,从分区0、3两个分区之前消费过的消息将被重新消费。

如果我们不需要设置偏移量,可以是使用@TopicPartition注解的partitions属性,仅设置分区,不需要指定偏移量:

@KafkaListener(topicPartitions 
  = @TopicPartition(topic = "topicName", partitions = { "0", "1" }))

监听器增加消息过滤器

我们可以通过增加自定义过滤器配置监听器消费特定的消息内容。可以给KafkaListenerContainerFactory增加 RecordFilterStrategy 策略:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
  filterKafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());

    factory.setRecordFilterStrategy(record -> record.value().contains("World"));

    return factory;
}

现在配置监听器使用该容器工厂:

@KafkaListener(topics = "topicName", containerFactory = "filterKafkaListenerContainerFactory")
public void listenWithFilter(String message) {
    System.out.println("Received Message in filtered listener: " + message);
}

上述监听器中,符合过滤条件的消息将被丢弃。

自定义消息转换

前面介绍了发送、接收字符串消息,我们可以发送接收自定义java对象。这选哟配置相应序列化和反序列类。
下面定义简单的bean,用于作为消息进行传递:

public class Greeting {

    private String msg;
    private String name;

    // standard getters, setters and constructor
}

生产自定义消息

这个示例使用JsonSerializer,下面代码配置ProducerFactory 和 KafkaTemplate:

@Bean
public ProducerFactory<String, Greeting> greetingProducerFactory() {
    // ...
    configProps.put(
      ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
      JsonSerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, Greeting> greetingKafkaTemplate() {
    return new KafkaTemplate<>(greetingProducerFactory());
}

现在能够是使用新的KafkaTemplate发送消息:

kafkaTemplate.send(topicName, new Greeting("Hello", "World"));

消费自定义消息

类似的,我们修改ConsumerFactory 和 KafkaListenerContainerFactory 配置反序列好Greeting消息:

@Bean
public ConsumerFactory<String, Greeting> greetingConsumerFactory() {
    // ...
    return new DefaultKafkaConsumerFactory<>(
      props,
      new StringDeserializer(), 
      new JsonDeserializer<>(Greeting.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Greeting> 
  greetingKafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, Greeting> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(greetingConsumerFactory());
    return factory;
}

spring-kafka JSON 序列化和反序列化是使用Jackson库,需要增加相应依赖:

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.9.7</version>
</dependency>

现在写个监听器消费Greeting消息:

@KafkaListener(
  topics = "topicName", 
  containerFactory = "greetingKafkaListenerContainerFactory")
public void greetingListener(Greeting greeting) {
    // process greeting message
}

多类型监听器

现在看如何配置应用发送不同类型对象给同一主题,然后消费消息。首先定义新的类型Farewell:

public class Farewell {

    private String message;
    private Integer remainingMinutes;

    // standard getters, setters and constructor
}

我们需要增加额外的配置,从而能够给同一主题发送 Greeting 和 Farewell类型的对象消息。

设置生产者类型映射

给生产者配置Json类型映射:

configProps.put(JsonSerializer.TYPE_MAPPINGS, "greeting:com.dto.Greeting, farewell:com.dto.Farewell");

这种方式该库将用相应的类名填充类型头,因此, ProducerFactory 和 KafkaTemplate看上去类似这样:

@Bean
public ProducerFactory<String, Object> multiTypeProducerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    configProps.put(JsonSerializer.TYPE_MAPPINGS, "greeting:com.dto.Greeting, farewell:com.dto.Farewell");
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, Object> multiTypeKafkaTemplate() {
    return new KafkaTemplate<>(multiTypeProducerFactory());
}

现在可以使用 KafkaTemplate 去给该主题发送 Greeting, Farewell或任何Object:

multiTypeKafkaTemplate.send(multiTypeTopicName, new Greeting("Greetings", "World!"));
multiTypeKafkaTemplate.send(multiTypeTopicName, new Farewell("Farewell", 25));
multiTypeKafkaTemplate.send(multiTypeTopicName, "Simple string message");

消费者使用自定义类型转换

为了反序列化接收的消息,需要给消费者提供自定义MessageConverter.
在后台,MessageConverter依赖于Jackson2JavaTypeMapper。默认情况下,映射器推断接收对象的类型:相反,我们需要显式地指定使用类型头来确定反序列化的目标类型:

typeMapper.setTypePrecedence(Jackson2JavaTypeMapper.TypePrecedence.TYPE_ID);

我们还需要提供反向映射信息。在消息头中指定greeting关联Greeting对象,同样farewell关联Farewell对象:

Map<String, Class<?>> mappings = new HashMap<>(); 
mappings.put("greeting", Greeting.class);
mappings.put("farewell", Farewell.class);

typeMapper.setIdClassMapping(mappings);

最后需要配置mapper信任的包,一定要确保它包含目标类的位置:

typeMapper.addTrustedPackages("com.dataz.dto");

最终完整MessageConverter转换器的定义如下:

@Bean
public RecordMessageConverter multiTypeConverter() {
    StringJsonMessageConverter converter = new StringJsonMessageConverter();
    DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper();
    typeMapper.setTypePrecedence(Jackson2JavaTypeMapper.TypePrecedence.TYPE_ID);
    typeMapper.addTrustedPackages("com.dataz.dto");

    Map<String, Class<?>> mappings = new HashMap<>();
    mappings.put("greeting", Greeting.class);
    mappings.put("farewell", Farewell.class);
    typeMapper.setIdClassMapping(mappings);
    converter.setTypeMapper(typeMapper);

    return converter;
}

现在需要告诉ConcurrentKafkaListenerContainerFactory使用MessageConverter,而不是基本的 ConsumerFactory:

@Bean
public ConsumerFactory<String, Object> multiTypeConsumerFactory() {
    HashMap<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> multiTypeKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(multiTypeConsumerFactory());
    factory.setMessageConverter(multiTypeConverter());
    return factory;
}

在监听器上使用 @KafkaHandler

最后在监听器中,创建方法处理接收到的消息,每个处理方法需要增加 @KafkaHandler注解。当然还是可以定义默认对象处理程序:

@Component
@KafkaListener(id = "multiGroup", topics = "multitype")
public class MultiTypeKafkaListener {

    @KafkaHandler
    public void handleGreeting(Greeting greeting) {
        System.out.println("Greeting received: " + greeting);
    }

    @KafkaHandler
    public void handleF(Farewell farewell) {
        System.out.println("Farewell received: " + farewell);
    }

    @KafkaHandler(isDefault = true)
    public void unknown(Object object) {
        System.out.println("Unkown type received: " + object);
    }
}

总结

本文介绍了Spring 对Apache Kafka的支持。通过示例展示了如何Spring 实现发送和接收消息。

相关文章:

  • 基于SSM实现图书馆座位预约系统
  • java部分排序算法
  • Java8-特性
  • Mybatis-Plus快速入门|比Mybatis更简单好用的ORM框架
  • Java异常的捕获和处理
  • 若依一体式改包名
  • 【机器学习kaggle赛事】泰坦尼克号生存预测
  • 【C进阶】——详解10个C语言中常见的字符串操作函数及其模拟实现
  • vue--面试题
  • HotPlot
  • 只想买把牙刷却花了100块,千万警惕!一不小心就落入商家圈套
  • 检查java死锁的三种方式
  • 什么是内网穿透
  • 没交公积金可以贷款买房吗?
  • Go 语言中的多变量同时赋值、匿名变量以及变量作用域(Let‘s Go 四)
  • ----------
  • [译] 理解数组在 PHP 内部的实现(给PHP开发者的PHP源码-第四部分)
  • 【vuex入门系列02】mutation接收单个参数和多个参数
  • Android交互
  • android图片蒙层
  • CSS3 变换
  • es的写入过程
  • gulp 教程
  • JavaScript异步流程控制的前世今生
  • JWT究竟是什么呢?
  • Node 版本管理
  • NSTimer学习笔记
  • Python爬虫--- 1.3 BS4库的解析器
  • Sass Day-01
  • 后端_ThinkPHP5
  • 精益 React 学习指南 (Lean React)- 1.5 React 与 DOM
  • 让你的分享飞起来——极光推出社会化分享组件
  • 如何抓住下一波零售风口?看RPA玩转零售自动化
  • 腾讯优测优分享 | 你是否体验过Android手机插入耳机后仍外放的尴尬?
  • 系统认识JavaScript正则表达式
  • 项目实战-Api的解决方案
  • 以太坊客户端Geth命令参数详解
  • media数据库操作,可以进行增删改查,实现回收站,隐私照片功能 SharedPreferences存储地址:
  • #【QT 5 调试软件后,发布相关:软件生成exe文件 + 文件打包】
  • #etcd#安装时出错
  • (delphi11最新学习资料) Object Pascal 学习笔记---第8章第2节(共同的基类)
  • (分享)一个图片添加水印的小demo的页面,可自定义样式
  • (附源码)ssm捐赠救助系统 毕业设计 060945
  • (十八)SpringBoot之发送QQ邮件
  • (五)MySQL的备份及恢复
  • (一)基于IDEA的JAVA基础12
  • (转)负载均衡,回话保持,cookie
  • **登录+JWT+异常处理+拦截器+ThreadLocal-开发思想与代码实现**
  • .NET 回调、接口回调、 委托
  • .NET 设计模式—适配器模式(Adapter Pattern)
  • .NET中的Exception处理(C#)
  • @entity 不限字节长度的类型_一文读懂Redis常见对象类型的底层数据结构
  • @Transient注解
  • [ C++ ] STL---stack与queue
  • [.net] 如何在mail的加入正文显示图片