当前位置: 首页 > news >正文

RabbitMQ详解(二)------消息通信的概念

RabbitMQ详解(二)------消息通信的概念

消息通信,有很多种,邮箱 qq 微信 短信等,这些通信方式都有发送者,接受者,还有一个中间存储离线消息的容器.但是这些通信方式和RabbitMQ的通信模型是不一样的,比如邮件,邮件服务器基于POP3/SMTP协议,通信双方需要明确指定,并且发送的邮件内容有固定的结构.而RabbitMQ服务器基于AMQP协议,这个协议是不需要明确指定发送方和接受方的,而且发送的消息也没有固定的结构,甚至可直接存储二进制数据,并且和邮件服务器一样,也能存储离线消息,最关键的是RabbitMQ既能够以一对一的方式进行路由,还能够以一对多的方式进行广播.


生产者和消费者

在RabbitMQ的通信过程中,有两个主要的角色:生产者和消费者.类比于邮件通信的发送方和接收方.

首先我们需要明确RabbitMQ服务器是不能够产生数据的,它是——消息中间件,是一个用来传递消息的中间商.生产者产生创建消息,然后发布到代理服务器(RabbitMQ),而消费者则从代理服务器获取消息(不是直接找生产者要消息),而且在实际应用中,生产者和消费者也是可以角色互相转换的,所以当我们应用程序连接到RabbitMQ服务器时,必须要明确我是生产者还是消费者.


消息

生产者创建消息,然后发布到RabbitMQ服务器中.

这里的消息分为两部分:有效内容内容标签.

  1. 有效内容:可以是任何内容,一个数组,一个集合,甚至二进制数据都可以.RabbitMQ不会在意你发什么数据,尽管发就可以了.
  2. 内容标签:描述有效内容,是RabbitMQ用来决定谁将获得消息.前面说的邮件通信,必须明确指定发送方地址和收件方地址,而基于AMQP协议的RabbitMQ则是通过生产者发送消息附带的内容标签将消息发送给感兴趣的消费者.

一般来说生产者创建消息会设置标签,但是传输到消费者那里就没有标签了,除非在有效内容中说明谁是生产者,一般消费者是不知道谁产生的消息的.


信道

生产者产生了消息,然后发布到RabbitMQ服务器,发布之前肯定要先连接上服务器,也就是要在应用程序和RabbitMQ服务器之间建立一条TCP连接,一旦连接建立,应用程序就可以创建一条AMQP信道.

信道是建立在"真实的TCP"连接内的虚拟连接,AMQP命令都是通过信道发送出去的,每条信道都会被指派一个唯一的ID(AMQP库会帮你记住ID),不论是发布消息,订阅队列或者接受消息,这些动作都是通过信道来完成的.

为什么不直接通过TCP连接来发送AMQP命令呢?

这是因为效率问题,对于操作系统来说,每次建立和销毁TCP会话是非常昂贵的开销,而实际系统中,比如电商双十一,每秒钟高峰期成千上万条连接,一般来说操作系统建立TCP连接是有数量限制的,那这就会遇到瓶颈.

引入信道的概念,我们可以在一条TCP连接上创建N多个信道,这样既能发送命令,也能够保证每条信道的私密性,我们可以将其想象为光纤电缆.


交换器和队列

交换器和队列都是RabbitMQ服务器的一部分,我们知道生产者会将消息发送到RabbitMQ服务器,而进入该服务器后,首先进入交换器部分,然后交换器根据消息附带的内容标签,将消息绑定到相应的队列.什么是队列?

  1. 容纳消息的场所,生产者发送到RabbitMQ服务器的消息会在队列中等待消费者消费.
  2. 队列是RabbitMQ服务器的终点(除非消息进入了黑洞).
  3. 队列可以实现负载均衡,我们可以增加一堆消费者,然后让RabbitMQ以循环的方式来均匀的分配消息.

消息进入RabbitMQ服务器时,会首先将消息发送到交换器,然后交换器会根据特定的路由算法以及消息的内容标签将消息绑定到相应的队列.在AMQP协议中有四种交换器:direct、fanout、topic、headers,每种交换器都实现了不同的路由算法,这也对应RabbitMQ工作的几种不同工作方式.


虚拟主机

首先我们抛出一个问题,一个RabbitMQ肯定不是只服务一个应用程序,那么多个应用程序同时使用RabbitMQ服务器,如何保证彼此之间不会冲突?

答案就是虚拟主机,虚拟主机其实就是一个迷你版的RabbitMQ服务器,它拥有自己的交换器和队列,更重要的是虚拟主机拥有自己的权限机制,一个服务器能够创建多个虚拟主机.那么我们在使用RabbitMQ服务器的时候,只需要讲一个应用程序对应一个虚拟主机,这种各个实例间逻辑上的分离就能够保证不同的应用程序安全的传递信息.

默认的虚拟主机是"/".


简单实例

开启RabbitMQ服务,这里使用的是docker,具体安装可看上篇博文.

pom.xml

<!--RabbitMQ-client-->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.6.2</version>
        </dependency>

ConnectionUtil.java

package org.alva.Utils;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * <一句话描述>,RabbitMQ的连接工具类
 * <详细介绍>,
 *
 */
public class ConnectionUtil {
    public static Connection getConnection(String host, int port, String vhost, String username, String password) throws IOException, TimeoutException {
        //1.定义连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //2.设置服务器地址
        connectionFactory.setHost(host);
        //3.设置端口
        connectionFactory.setPort(port);
        //4.设置虚拟主机,用户名,密码
        connectionFactory.setVirtualHost(vhost);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);

        //5.通过连接工厂获取连接
        Connection connection = connectionFactory.newConnection();
        return connection;
    }
}

Producer.java

package org.alva.RabbitMQ;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.alva.Utils.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * <一句话描述>,生产者
 * <详细介绍>,
 *
 */
public class Producer {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        //1.获取连接
        Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");
        //2.声明通道
        Channel channel = connection.createChannel();
        //3.声明(创建)队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //4.定义消息内容
        String message = "hello rabbitmq";
        //5.发布消息
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
        System.out.println("[x] send'"+message+"'");
        //6.关闭通道和连接
        channel.close();
        connection.close();

    }
}

Consumer.java

package org.alva.RabbitMQ;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import org.alva.Utils.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * <一句话描述>,消费者
 * <详细介绍>,
 *
 */
public class Consumer {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //1.获取连接
        Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");
        //2.声明通道
        Channel channel = connection.createChannel();
        //3.声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //4.定义队列的消费者
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        //5.监听队列
        channel.basicConsume(QUEUE_NAME,true,queueingConsumer);
        //6.获取消息
        while (true){
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("[x] Received '" + message + "'");
        }
    }
}

转载于:https://www.cnblogs.com/Alva-mu/p/9493141.html

相关文章:

  • rinted端口转发工具
  • 数据结构(五)图---最短路径(弗洛伊德算法)
  • bind,apply,call,caller,callee还傻傻分不清楚?
  • linux常用命令三
  • 作业4
  • 文件比较命令(comp)
  • lodash的一些实用的方法 TODO
  • UVA11853-Paintball(对偶图)
  • vue版 文字滚动
  • 面试题:合并两个排序的链表
  • .NET CORE 第一节 创建基本的 asp.net core
  • 3ds Max学习日记(九)
  • 【Linux】time+dd测试硬盘读写速度
  • [洛谷P2801]教主的魔法
  • 共享服务-FTP基础(一)
  • 【每日笔记】【Go学习笔记】2019-01-10 codis proxy处理流程
  • 【刷算法】从上往下打印二叉树
  • 345-反转字符串中的元音字母
  • Django 博客开发教程 8 - 博客文章详情页
  • docker-consul
  • JavaSE小实践1:Java爬取斗图网站的所有表情包
  • leetcode388. Longest Absolute File Path
  • Spark RDD学习: aggregate函数
  • Web Storage相关
  • 道格拉斯-普克 抽稀算法 附javascript实现
  • 看域名解析域名安全对SEO的影响
  • 漫谈开发设计中的一些“原则”及“设计哲学”
  • 深度学习中的信息论知识详解
  • 译自由幺半群
  • 源码之下无秘密 ── 做最好的 Netty 源码分析教程
  • 【运维趟坑回忆录 开篇】初入初创, 一脸懵
  • Spring第一个helloWorld
  • #LLM入门|Prompt#2.3_对查询任务进行分类|意图分析_Classification
  • (2)关于RabbitMq 的 Topic Exchange 主题交换机
  • (附源码)springboot“微印象”在线打印预约系统 毕业设计 061642
  • (附源码)ssm旅游企业财务管理系统 毕业设计 102100
  • (蓝桥杯每日一题)平方末尾及补充(常用的字符串函数功能)
  • (六) ES6 新特性 —— 迭代器(iterator)
  • (六)c52学习之旅-独立按键
  • (十)DDRC架构组成、效率Efficiency及功能实现
  • (已更新)关于Visual Studio 2019安装时VS installer无法下载文件,进度条为0,显示网络有问题的解决办法
  • (转)项目管理杂谈-我所期望的新人
  • (轉貼) 蒼井そら挑戰筋肉擂台 (Misc)
  • **python多态
  • .NET CF命令行调试器MDbg入门(四) Attaching to Processes
  • .net core 实现redis分片_基于 Redis 的分布式任务调度框架 earth-frost
  • .NET 动态调用WebService + WSE + UsernameToken
  • .NET6实现破解Modbus poll点表配置文件
  • .NET面试题(二)
  • /dev/sda2 is mounted; will not make a filesystem here!
  • @kafkalistener消费不到消息_消息队列对战之RabbitMq 大战 kafka
  • [1]-基于图搜索的路径规划基础
  • [20171113]修改表结构删除列相关问题4.txt
  • [ai笔记9] openAI Sora技术文档引用文献汇总
  • [Angularjs]asp.net mvc+angularjs+web api单页应用之CRUD操作