当前位置: 首页 > news >正文

SpringCloud Stream消息驱动

SpringCloud Stream消息驱动

前言

在上一篇文章中《SpringCloud集成RocketMQ》;我们介绍了如何在自己的SpringCloud项目中使用RocketMQ消息中间件来实现消息队列的生产和消费的过程;在文章中我们使用的集成方式;并不是直接使用的RocketMQ的API进行调用,都是通过SpringCloud Stream提供给我们的接口方式来实现的,今天我们这个文章给大家介绍的就是这个SpringCloud Stream消息驱动框架,以及其特点;

SpringCloud Stream

Spring Cloud Stream是SpringCloud成员中的一个框架组件;用于构建基于消息的微服务应用框架,在一般的消息中间件中都有一个 Broker Server(代理服务器)或者类似功能的部分,作为消息中转的角色,负责存储消息、转发消息。

Spring Cloud Stream 提供了消息中间件的统一抽象,通过binder和binding的抽象;对各种消息中间件产品中的publish-subscribe、consumer groups、partition 这些概念进行了统一;各种消息中间件产品实现支持SpringCloud Stream的Binder实现;在Spring Cloud Stream的消息流框架里通过inputs或者outputs来与SpringCloud Stream中的binder进行交互,SpringCloud Stream 的binder负责与中间件交互,所以我们只需要和Stream交互就可以很方便实现消息驱动。 如下图所示:

 

为什么需要SpringCloud Stream

如上图所示;通过Spring Cloud Stream的框架;应用业务这块的实现代码,都是通过inputs,outputs在springCloud Stream体系之内和Binder进行交互; 而binder在具体去和对应的消息中间件的产品进行交互;通过这样的方式,在我们的实现的应用中没有任何具体的消息中间件的接口调用的代码侵入; 所有的实现代码都是通过Spring Cloud Stream框架提供的, 要切换不同的消息中间件产品的实现,我们只需要通过spring Cloud stream提供的配置就可以轻松的实现IOC;以后的业务如果发生消息中间件的任何变化,都不会影响到我们的业务代码变化; 这样就是实现了对具体实现的依赖。业务开发者不再关注具体消息中间件,只需关注Binder对应用程序提供的抽象概念来使用消息中间件实现业务即可!

重要概念

Binder

Binder是SpringCloud Stream的一个抽象概念,是应用与消息中间件之间交互的实现,目前SpringCloud Stream实现支持Kafka,RabbitMQ,RocketMQ等消息中间件的binder; 通过binder,可以实现中间件的连接,可以动态的改变消息的destinations(对应于 Kafka的topic,RabbitMQ的exchanges),都可以通过外部配置项来实现,而不需要修改一行代码

例如:在上一文中的代码样例里,出现的

spring:
  cloud:
    # Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
    stream:
      rocketmq:
        # RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
        binder:
          name-server: 192.168.56.101:9876 # RocketMQ Namesrv 地址

Bindings

包括 Input Binding 和 Output Binding。Binding 在消息中间件与应用程序提供的 Provider 和 Consumer 之间提供了一个接口和规范,开发者只需使用SpringCloud Stream里定义的接口和配置规范;就可以在咱们的业务应用程序里实现消息通道里的生产者Producer和消费者Consumer ,发送消息或者处理消息数据,开发者不需要考虑与底层消息中间件的接口调用。

例如:在上一文中的生产者的代码和消费者的代码

生产者

@SpringBootApplication
@EnableBinding(Source.class)
public class ProducerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }
}

消费者

@EnableBinding(Sink.class)
public class ConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
}

生产消息

    @Autowired
    private Source source;

    @GetMapping("/produce")
    public boolean produce(String msg) {
        MyMessage message = new MyMessage(msg).setId(new Random().nextInt());
        Message<MyMessage > springMessage = MessageBuilder.withPayload(message)
                .build();

        return source.output().send(springMessage);
    }

处理消息

@Component
public class MyConsumer {

    private Logger logger = LoggerFactory.getLogger(MyConsumer.class);

    @StreamListener(Sink.INPUT)
    public void onMessage(@Payload MyMessage message) {
        logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
    }
}

结束语

本文主要介绍的Spring Cloud Stream;并且Spring Cloud Stream的特点以及使用的方法;在SpringCloud的项目中,如果有需要集成Kafka,RabbitMQ,RocketMQ等消息中间进行消息处理时;可以通过Spring Cloud Stream来进行实现; 大家还可以结合着上一篇文章《SpringCloud集成RocketMQ》来具体使用SpringCloud Stream来实现对RocketMQ的集成。

 

谢谢大家持续的关注

 

相关文章:

  • JVisualVM 中线程状态(运行/休眠/等待/驻留/监视)解析
  • 常识——绳结打折法
  • AVL树的特性和模拟实现
  • java剧院售票系统计算机毕业设计MyBatis+系统+LW文档+源码+调试部署
  • SpringBoot-36-分布式理论概述
  • 第一章 Linux及Linux Shell简介
  • http客户端Feign
  • SpringBoot-37-RPC概述
  • tensorflo之keras高层接口
  • OpenCV图像处理学习二十一,直方图比较方法
  • 第5章 总体设计【软件设计一般分为总体设计和详细设计,它们之间的关系是全局与局部】
  • java开放式教学评价管理系统计算机毕业设计MyBatis+系统+LW文档+源码+调试部署
  • define宏定义和const的区别
  • 电量优化 - Hook 系统服务
  • 网课搜题公众号 接口查题 查课接口 网课题库对接教程 附接口
  • JS中 map, filter, some, every, forEach, for in, for of 用法总结
  • 【347天】每日项目总结系列085(2018.01.18)
  • android高仿小视频、应用锁、3种存储库、QQ小红点动画、仿支付宝图表等源码...
  • C++类中的特殊成员函数
  • Docker下部署自己的LNMP工作环境
  • DOM的那些事
  • ES6 ...操作符
  • github指令
  • Next.js之基础概念(二)
  • node 版本过低
  • weex踩坑之旅第一弹 ~ 搭建具有入口文件的weex脚手架
  • 不上全站https的网站你们就等着被恶心死吧
  • 等保2.0 | 几维安全发布等保检测、等保加固专版 加速企业等保合规
  • 基于webpack 的 vue 多页架构
  • 讲清楚之javascript作用域
  • 前端性能优化——回流与重绘
  • 小程序开发之路(一)
  • Redis4.x新特性 -- 萌萌的MEMORY DOCTOR
  • 你学不懂C语言,是因为不懂编写C程序的7个步骤 ...
  • ​如何在iOS手机上查看应用日志
  • # C++之functional库用法整理
  • #鸿蒙生态创新中心#揭幕仪式在深圳湾科技生态园举行
  • (31)对象的克隆
  • (C语言)fgets与fputs函数详解
  • (NSDate) 时间 (time )比较
  • (二)构建dubbo分布式平台-平台功能导图
  • (二)什么是Vite——Vite 和 Webpack 区别(冷启动)
  • (强烈推荐)移动端音视频从零到上手(下)
  • (学习日记)2024.03.25:UCOSIII第二十二节:系统启动流程详解
  • ./configure,make,make install的作用(转)
  • .NET CF命令行调试器MDbg入门(四) Attaching to Processes
  • .net refrector
  • .NET Remoting Basic(10)-创建不同宿主的客户端与服务器端
  • .net websocket 获取http登录的用户_如何解密浏览器的登录密码?获取浏览器内用户信息?...
  • .NET/C# 检测电脑上安装的 .NET Framework 的版本
  • .NET/C# 中设置当发生某个特定异常时进入断点(不借助 Visual Studio 的纯代码实现)
  • .Net通用分页类(存储过程分页版,可以选择页码的显示样式,且有中英选择)
  • :中兴通讯为何成功
  • @KafkaListener注解详解(一)| 常用参数详解
  • [ C++ ] STL---string类的使用指南