当前位置: 首页 > news >正文

SpringCloud Stream基本使用

介绍

Spring Cloud Stream是一个用于构建与共享消息系统连接的高度可伸缩的事件驱动微服务框架。

该框架提供了一个基于已建立且熟悉的Spring习惯用法和最佳实践的灵活编程模型,包括对持久发布/订阅语义、消费者组和有状态分区的支持。

主要应用场景

可能我们会遇到不同的系统在用不同的消息队列,比如系统A用的Kafka、系统B用的RabbitMQ,但是我们现在又没有学习过Kafka,那么怎么办呢?有没有一种方式像JDBC一样,我们只需要关心SQL和业务本身,而不用关心数据库的具体实现呢?

SpringCloud Stream能够做到,它能够屏蔽底层实现,我们使用统一的消息队列操作方式就能操作多种不同类型的消息队列。

img

它屏蔽了不同消息队列底层操作,让我们使用统一的Input和Output形式,以Binder为中间件,这样就算我们切换了不同的消息队列,也无需修改代码,而具体某种消息队列的底层实现是交给Stream在做的。

Binder
Binder是SpringCloud Stream的一个抽象概念,是应用与消息中间件之间的粘合剂,目前SpringCloud Stream实现了Kafka、RabbitMQ和RocketMQ等的binder

通过binder,可以很方便的连接中间件,可以动态的改变消息的destinations(对应于 Kafka的topic,RabbitMQ的exchange),这些都可以通过外部配置项来做到,甚至可以任意的改变中间件的类型但是不需要修改一行业务代码

实践

Linux安装RabbitMQ

接下来创建一个springboot父子项目来演示一下其基本使用(以rabbitmq为例):

image-20220928141522794

父工程依赖:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-dependencies</artifactId>
    <version>2021.0.1</version>
    <type>pom</type>
    <scope>import</scope>
</dependency>

image-20220928142106277

子模块依赖:

<dependencies>
    <!--  RabbitMQ的Stream实现  -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
</dependencies>

image-20220928142150258

生产者

生产者配置文件:

server:
  port: 8801

spring:
  application:
    name: stream-publisher
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息
        defaultRabbit: # 表示定义的名称,用于binding整合(随意)
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关环境配置
            spring:
              rabbitmq:
                host: 47.96.156.51  # RabbitMQ在本机的用localhost,在服务器的用服务器的ip地址
                port: 5672
                username: admin
                password: admin
                virtual-host: /
      bindings: # 服务的整合处理
        output: # 这个名字是一个通道channel的名称
          destination: studyExchange # 表示要使用的Exchange名称
          content-type: text/plain  # 设置消息类型,application/json -> json格式,本文要设置为 text/plain -> 文本类型

定义发送消息接口:

public interface IMessagePublisher {
    void publish(String message);
}

接口实现类:

@EnableBinding(Source.class)    // 定义消息的推送管道(Source是spring的)
@Slf4j
public class MessagePublishImpl implements IMessagePublisher {

    @Resource
    private MessageChannel output;  // 消息发送管道

    @Override
    public void publish(String message) {

        log.info("发送消息:{}", message);
        // MessageBuilder是spring的integration.support.MessageBuilder
        output.send(MessageBuilder.withPayload(message).build());

    }

}

Source接口

image-20220930212128357

新建controller.PublishController

@RestController
public class PublishController {

    @Resource
    IMessagePublisher publisher;

    @RequestMapping("/publish")
    public String publish(String message) {
        publisher.publish(message);
        return "消息发送成功!" + new Date();
    }
}

消费者

配置文件:

server:
  port: 8802

spring:
  application:
    name: stream-consumer
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息
        defaultRabbit: # 表示定义的名称,用于binding整合(随意)
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关环境配置
            spring:
              rabbitmq:
                host: 47.96.156.51  # RabbitMQ在本机的用localhost,在服务器的用服务器的ip地址
                port: 5672
                username: admin
                password: admin
                virtual-host: /
      bindings: # 服务的整合处理
        input: # 这个名字是一个通道channel的名称
          destination: studyExchange # 表示要使用的Exchange名称
          content-type: text/plain  # 设置消息类型,application/json -> json格式,本文要设置为 text/plain -> 文本类型

监听消息:

@EnableBinding(Sink.class) // (Sink也是spring的)
public class ReceiveMessageListener {

    @StreamListener(Sink.INPUT) // 监听
    public void input(Message<String> message) {
        System.out.println("消费者1号------>收到的消息:" + message.getPayload());
    }
}

Sink接口

image-20220929165116435

注意: output输入信道是stream自带的,还自带了一个输出信道input,上述两个接口。

启动两个项目进行测试,项目启动完成发现studyExchange交换机已经创建好了。

image-20220929165628415

调用发送消息接口,消息发送成功

image-20220929165806075

消费者也成功消费消息

image-20220929165902093

使用自定义信道实现消息传递

上述代码实现是通过stream默认的信道完成的,本部分实现通过自定义信道实现

类比stream默认信道,创建两个自定义信道MySource、MySink

public interface MySource {
    /**
     * Name of the output channel.
     */
    String OUTPUT1 = "output1";

    /**
     * @return output channel
     */
    @Output(OUTPUT1)
    MessageChannel output();
}
public interface MySink {
    /**
     * Input channel name.
     */
    String INPUT1 = "input1";

    /**
     * @return input channel.
     */
    @Input(INPUT1)
    SubscribableChannel input();
}

生产者新增配置项:

image-20220930213141638

同理消费者新增配置项:

image-20220930213310450

接下来改造发送消息实现类

image-20220930213855720

修改该类,EnableBinding注解的值改为绑定多个传入信道接口,然后使用我们自定义信道发送消息。

消费者消费消息

image-20220930214025905

同理,EnableBinding注解的值改为绑定多个信道接口,新建一个方法监听即可。

启动项目,进行测试:

image-20220930214151104

发现我们自定义信道消息也能正常被消费到。

image-20220930214253006

@EnableBinding注解过时

由上图可以看到 @EnableBinding 注解貌似已经过时了。

@EnableBinding源码中明确声明,该注解在从3.1版本开始被弃用,推荐我们使用函数编程的方式

接下来会演示下这种方式基本使用:

生产者案例:

yml配置:

server:
  port: 8801

spring:
  application:
    name: stream-publisher
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息
        defaultRabbit: # 表示定义的名称,用于binding整合(随意)
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关环境配置
            spring:
              rabbitmq:
                host: 47.96.156.51  # RabbitMQ在本机的用localhost,在服务器的用服务器的ip地址
                port: 5672
                username: admin
                password: admin
                virtual-host: /
      bindings: # 服务的整合处理
        myChannel-out-0:
          destination: demo #表示要使用Exchange名称定义
          contentType: text/plain

注意使用这种方式,bingdings 集合中的key由 通道名-out/in-数字组成

新版发送消息:

@RestController
public class PublishController {

    @Resource
    StreamBridge bridge;

    @RequestMapping("/publish")
    public String publish(String message) {
        bridge.send("myChannel-out-0", message);
        return "消息发送成功!" + new Date();
    }
}

@Autowire注解自动注入StreamBridge的实例,直接使用StreamBridge发送消息,StreamBridge的send方法第一个参数是binding的名字,第二个参数是想要发送的消息。

消费者案例:

yml配置:

server:
  port: 8802

spring:
  application:
    name: stream-consumer
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息
        defaultRabbit: # 表示定义的名称,用于binding整合(随意)
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关环境配置
            spring:
              rabbitmq:
                host: 47.96.156.51  # RabbitMQ在本机的用localhost,在服务器的用服务器的ip地址
                port: 5672
                username: admin
                password: admin
                virtual-host: /
      bindings: # 服务的整合处理
        myChannel-in-0:
          destination: demo
          contentType: text/plain

消费者消费消息:

@Component
public class ConsumerComponent {

    @Bean("myChannel")  
    public Consumer<String> consumer() {
        return message -> System.out.println("新版本消费消息:" + message);
    }
    
    //@Bean
    //public Consumer<String> myChannel() {
    //    return message -> System.out.println("新版本消费消息:" + message);
    //}
}

注意:@Bean里是yml配置文件中通道名称,这样生产者发送的数据才会正确到达,应用程序启动后会自动接收生产者发送的消息;

或者是方法名为yml配置文件中通道名称,两种方式都能正常消费消息。

启动项目进行测试:

交换机正常创建:

image-20220930223918155

消息发送成功:

image-20220930223539563

消息正常被消费:

image-20220930223617667

完毕!

参考博客:

SpringCloud Stream @EnableBinding注解过时

相关文章:

  • 没有CANdela,无法编辑cdd数据库文件,也能轻松完成诊断测试,立省大二十个w
  • 【Linux】基本指令 (下篇)
  • 博途PLC的模糊PID(Matlab “fuzzy“工具箱使用介绍)
  • 【Vue 开发实战】实战篇 # 45:如何构建可交互的组件文档让代码高亮的显示在页面
  • m分别通过matlab和FPGA实现基于高阶循环谱的信号载波调制识别(四阶循环累量)仿真(包括仿真录像,matlab工程,fpga工程)
  • 前端知识体系
  • html5 图像标签
  • Python02--python中的缩进,注释和模块
  • STM32CUBEIDE(14)----外部中断EXTI
  • 项目管理PMP要点
  • Element-UI+vue实现登录表单
  • vmware安装centos7并制作多副本
  • hbase加kerberos 后报错hbase master 起不来
  • iPad 使用技巧:虚拟键盘与实体键盘
  • 皮亚杰将儿童的道德发展分为四个阶段
  • 【402天】跃迁之路——程序员高效学习方法论探索系列(实验阶段159-2018.03.14)...
  • create-react-app项目添加less配置
  • git 常用命令
  • IIS 10 PHP CGI 设置 PHP_INI_SCAN_DIR
  • JavaScript服务器推送技术之 WebSocket
  • Linux后台研发超实用命令总结
  • python docx文档转html页面
  • Vue实战(四)登录/注册页的实现
  • 免费小说阅读小程序
  • mysql 慢查询分析工具:pt-query-digest 在mac 上的安装使用 ...
  • 如何用纯 CSS 创作一个菱形 loader 动画
  • ​VRRP 虚拟路由冗余协议(华为)
  • ​力扣解法汇总1802. 有界数组中指定下标处的最大值
  • ​业务双活的数据切换思路设计(下)
  • #Linux(Source Insight安装及工程建立)
  • #ubuntu# #git# repository git config --global --add safe.directory
  • $GOPATH/go.mod exists but should not goland
  • (4.10~4.16)
  • (C++17) optional的使用
  • (第8天)保姆级 PL/SQL Developer 安装与配置
  • (二十五)admin-boot项目之集成消息队列Rabbitmq
  • (附源码)ssm经济信息门户网站 毕业设计 141634
  • (一)使用IDEA创建Maven项目和Maven使用入门(配图详解)
  • (最简单,详细,直接上手)uniapp/vue中英文多语言切换
  • .NET 设计模式—简单工厂(Simple Factory Pattern)
  • .Net的DataSet直接与SQL2005交互
  • .NET连接MongoDB数据库实例教程
  • .NET面试题解析(11)-SQL语言基础及数据库基本原理
  • .net下简单快捷的数值高低位切换
  • .net中应用SQL缓存(实例使用)
  • [2021 蓝帽杯] One Pointer PHP
  • [AutoSar]工程中的cpuload陷阱(三)测试
  • [Docker]三.Docker 部署nginx,以及映射端口,挂载数据卷
  • [Docker]十一.Docker Swarm集群raft算法,Docker Swarm Web管理工具
  • [HOW TO]怎么在iPhone程序中实现可多选可搜索按字母排序的联系人选择器
  • [MySQL FAQ]系列 -- 如何利用触发器实现账户权限审计
  • [Power Query] 分组依据
  • [python] 过年燃放烟花
  • [Python]Django类视图
  • [SE]软件项目需求分析为什么困难