当前位置: 首页 > news >正文

RabbitMQ再回首--往事如梦

这文章你就读吧,越读越🥸,一读一个不吱声 

可靠的🐰警官:rabbitMQ,功能全面,不丢数据,体量小,容易堆积

声明exchange
channel . exchangeDeclare ( String exchange , String type , boolean durable , boolean
autoDelete , Map < String , Object > arguments ) throws IOException ;
  
   durable=false 我勒个天才 会丢消息哒,broker重启之后 exchange就从这个世界上消失了,pro还怎么发消息 啊!
声明queue
channel . queueDeclare ( String queue , boolean durable , boolean exclusive , boolean
autoDelete , Map < String , Object > arguments );
      exclusive 排他队列:仅对 首次 申明它的 连接 可见,连接断开自动删除
          同一连接不同信道:可以访问同一连接创建的排他队列
          
  1. type=QuorumStream,默认持久化:durable=true,exclusive=false
Map<String,Object> params = new HashMap<>();
params.put("x-queue-type","stream");
params.put("x-max-length-bytes", 20_000_000_000L); //日志文件的最大字节数 maximum stream size:20 GB
params.put("x-stream-max-segment-size-bytes", 100_000_000); //每一个日志文件的最大大小 size of segment files: 100 MB
channel.queueDeclare(QUEUE_NAME, true, false, false, params);

   queue持久化不代表消息持久化,消息要看消息的:basicPublish

   单队列持久化 重启之后 消息会离你而去,单消息持久化 重启之后 消息早跑了 妥妥的

exchange与queue绑定关系

channel . queueBind ( String queue , String exchange , String routingKey ) throws
IOException ; // Producer 发送过来的消息将分到哪些 Queue
发送到queue
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body) throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
//对应页面上的Properties部分,传入一些预定的参数值。
builder.deliveryMode(MessageProperties.PERSISTENT_TEXT_PLAIN.getDeliveryMode());//持久化
builder.priority(MessageProperties.PERSISTENT_TEXT_PLAIN.getPriority());//持久化
//builder.headers(headers);对应页面上的Headers部分。传入自定义的参数值
builder.build()
AMQP.BasicProperties prop = builder.build();

RabbitMQ持久化机制-CSDN博客

public BasicProperties(String contentType,//消息类型如:text/plainString contentEncoding,//编码Map<String,Object> headers,Integer deliveryMode,//1:nonpersistent不持久化 2:persistent持久化Integer priority,//优先级String correlationId,String replyTo,//反馈队列String expiration,//expiration到期时间String messageId,Date timestamp,String type,String userId,String appId,String clusterId)

consumer消费消息

  • 被动消费:服务器推送,消费者设置缓冲区缓存,效率是高了但是缓冲区可能溢出 一口胖成🪣(这届奥运会的谐音梗 你懂不懂)
       channel . basicConsume ( String queue , boolean autoAck , Consumer callback );#不断推送
           每个channel有独立线程,一个channel一个消费比较推荐,做channel最重要的 专一
String basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback
cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws
IOException;Callback,可实现handle方法
  • 主动消费:拉取msg,需要时拉取 增加消息延迟/降低系统吞吐量;单条 注意啊 单条

       GetResponse response = channel.basicGet(QUEUE_NAME, boolean autoAck);

        autoAck=true,消息被consumer消费成功后,无法再消费,一次性的 懂?

        autoAck=false,需手动调channel.basicAck(),不凋则重复消费 资源多就是这么豪横

Rabbit MQ 消息消费模式_rabbitmq消费模式-CSDN博客

消费场景

 他又来了 真的是 大水冲了龙王庙 不是一家人不进一家门

一,pro 送一个 msg 到que,不需要exchange,con按que 消费

二,pro发送msg到que,多con消费同一个队列queue

  • con 的autoAck=false,调channel.basicAck通知服务器消息完成,否则重复消费
  • msg持久,queue不能被多次定义,一旦接受了durable的设定就守身如玉,及时改了 骨子里还是durable
  • channel.basicQos(prefetchCount) ;consumer同时处理msg个数,发送msg前检查发送但未收到basicAck有几个,如超过prefetchCount则换一个consumer;这样始乱终弃当然有问题了,all节点都超了,怎么办?宝宝就问你怎么办?及时发现 添加节点 再者你就换 下一个更乖
三,发布订阅 type=fanout
   pro消息发exchange,将消息路由到绑定的队列,队列对应一个消费者
    channel . exchangeDeclare ( EXCHANGE_NAME , "fanout" );type=fanout,向绑定的队列发送msg
   
// 创建连接和通道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();// 声明交换器
String exchangeName = "publishSubscribeExchange";
channel.exchangeDeclare(exchangeName, "fanout");// 创建随机队列并绑定到交换器
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchangeName, "");// 发送消息
String message = "Hello, RabbitMQ!";
channel.basicPublish(exchangeName, "", null, message.getBytes("UTF-8"));
System.out.println("Sent message: " + message);// 接收消息
boolean autoAck = true;
Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println("Received message: " + message);}
};
channel.basicConsume(queueName, autoAck, consumer);
四,路由 type=direct
五,topic
pro通过通配符匹配路由键,交换机将消息 路由到 它绑定的队列 匹配的路由键的队列中,这就话是不是很绕,让我来用中文翻译一下:交换机 把消息发送到 配置了同样路由键的队列中
RabbitMQ的五种常见消费模型_rabbit mq消费模式-CSDN博客

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 代码随想录算法训练营第45天|LeetCode 115.不同的子序列、583. 两个字符串的删除操作、72. 编辑距离
  • Netty技术全解析:DelimiterBasedFrameDecoder类深度解析
  • MySQL增删改查(基础)
  • Java入门基础17:集合框架2(可变参数、Collections、Map系列集合、集合的嵌套、Stream流)
  • 不知道msvcp140.dll丢失的解决方法有哪些?看这篇文章教你修复丢失的msvcp140.dll
  • 8月9日笔记
  • Leetcode 17.电话号码的字母组合
  • SpringBoot自动装配原理
  • 探索 Go 语言的 json 库
  • 1Panel应用推荐:KubePi开源Kubernetes管理面板
  • 【运维项目经历|040】高可用Web服务平台:LVS+Apache集群+NFS共享存储系统
  • C 循环
  • GNU/Linux - memtool使用
  • 【YOLOV8】YOLOV8模型训练train及参数详解
  • 12322222222
  • 《微软的软件测试之道》成书始末、出版宣告、补充致谢名单及相关信息
  • Angular 响应式表单 基础例子
  • CSS魔法堂:Absolute Positioning就这个样
  • Flex布局到底解决了什么问题
  • java8-模拟hadoop
  • JS专题之继承
  • MySQL用户中的%到底包不包括localhost?
  • XML已死 ?
  • 二维平面内的碰撞检测【一】
  • 浮动相关
  • 复杂数据处理
  • ------- 计算机网络基础
  • 解决iview多表头动态更改列元素发生的错误
  • 如何借助 NoSQL 提高 JPA 应用性能
  • 使用权重正则化较少模型过拟合
  • 算法-插入排序
  • 《码出高效》学习笔记与书中错误记录
  • 东超科技获得千万级Pre-A轮融资,投资方为中科创星 ...
  • 摩拜创始人胡玮炜也彻底离开了,共享单车行业还有未来吗? ...
  • ​香农与信息论三大定律
  • # MySQL server 层和存储引擎层是怎么交互数据的?
  • # 服务治理中间件详解:Spring Cloud与Dubbo
  • # 深度解析 Socket 与 WebSocket:原理、区别与应用
  • ###C语言程序设计-----C语言学习(6)#
  • #QT(串口助手-界面)
  • (Oracle)SQL优化基础(三):看懂执行计划顺序
  • (第9篇)大数据的的超级应用——数据挖掘-推荐系统
  • (附源码)springboot太原学院贫困生申请管理系统 毕业设计 101517
  • (附源码)ssm高校实验室 毕业设计 800008
  • (一)Docker基本介绍
  • (一)认识微服务
  • (转载)微软数据挖掘算法:Microsoft 时序算法(5)
  • .Net Core webapi RestFul 统一接口数据返回格式
  • .NET 线程 Thread 进程 Process、线程池 pool、Invoke、begininvoke、异步回调
  • .NET 项目中发送电子邮件异步处理和错误机制的解决方案
  • .NET/C# 在代码中测量代码执行耗时的建议(比较系统性能计数器和系统时间)...
  • .NET技术成长路线架构图
  • .NET开源的一个小而快并且功能强大的 Windows 动态桌面软件 - DreamScene2
  • /etc/motd and /etc/issue
  • @NestedConfigurationProperty 注解用法