当前位置: 首页 > news >正文

RabbitMQ常用消息模式

目录

1、RabitMQ工作队列

2、交换机

3、RabbitMQ Fanout 发布订阅--- Fanout exchange(扇型交换机)

3.1、创建连接代码

3.1、生产者代码

3.2、消费者代码

4、Direct路由模式

4.1、生产者代码

4.2、消费者代码

5、Topic主题模式

5.1、生产者代码

5.2、消费者代码


1、RabitMQ工作队列

        默认的传统队列是为均摊消费,存在不公平性;如果每个消费者速度不一样的情况下,均摊消费是不公平的,应该是能者多劳。

采用工作队列

        在通道中只需要设置basicQos为1即可,表示MQ服务器每次只会给消费者推送1条消息必须手动ack确认之后才会继续发送。

2、交换机

Exchange:在RabbitMQ中,生产者发送消息不会直接将消息投递到队列中,而是先将消息投递到交换机中, 在由交换机转发到具体的队列, 队列再将消息以推送或者拉取方式给消费者进行消费。

 生产者将消息发送到Exchange, 由Exchange再路由到一个或多个队列中:

        /Virtual Hosts:区分不同的团队

        交换机exchabge: 路由消息存放在那个队列中 类似于nginx

        队列queue:队列 存放消息

       路由key(RoutingKey): 分发规则,生产者将消息发送给交换机的时候, 会指定RoutingKey指定路由规则。

交换机种类:

        Direct exchange(直连交换机)

        Fanout exchange(扇型交换机)

        Topic exchange(主题交换机)

        Headers exchange(头交换机)

3、RabbitMQ Fanout 发布订阅--- Fanout exchange(扇型交换机)

        生产者发送一条消息,经过交换机转发到多个不同的队列,多个不同的队列就多个不同的消费者。同一条消息,经路由器转发 c1和c2都能收到。

 

原理:

  1. 需要创建两个队列 ,每个队列对应一个消费者;
  2. 队列需要绑定我们交换机;
  3. 生产者投递消息到交换机中,交换机在将消息分配给两个队列中都存放起来;
  4. 消费者从队列中获取这个消息;

3.1、创建连接代码

导入依赖:

        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.6.5</version>
        </dependency>
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 获取连接
 */
public class RabbitMQConnection {

    /**
     * 创建连接
     *
     * @return
     * @throws IOException
     * @throws TimeoutException
     */
    public static Connection getConnection() throws IOException, TimeoutException {
        //1.创建connectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //2.配置Host
        connectionFactory.setHost("127.0.0.1");
        //3.设置Port
        connectionFactory.setPort(5672);
        //4.设置账户和密码
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        //5.设置VirtualHost
        connectionFactory.setVirtualHost("/rkVirtualHost");
        return connectionFactory.newConnection();
    }
}

3.1、生产者代码

import com.mayikt.rabbitmq.RabbitMQConnection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ProducerFanout {

    /**
     * 定义交换机的名称
     */
    private static final String EXCHANGE_NAME = "fanout_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        //  创建Connection
        Connection connection = RabbitMQConnection.getConnection();
        // 创建Channel
        Channel channel = connection.createChannel();
        // 通道关联交换机    绑定交换机 将消息发送到该交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true);
        String msg = "rk 学 rabbitmq";
        //发送消息
        channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
        //关闭连接
        channel.close();
        connection.close();
    }
}

3.2、消费者代码

邮件消费者:

import com.mayikt.rabbitmq.RabbitMQConnection;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class MailConsumer {
    /**
     * 定义邮件队列
     */
    private static final String QUEUE_NAME = "fanout_email_queue";
    /**
     * 定义交换机的名称
     */
    private static final String EXCHANGE_NAME = "fanout_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("邮件消费者...");
        // 创建我们的连接
        Connection connection = RabbitMQConnection.getConnection();
        // 创建我们通道
        final Channel channel = connection.createChannel();
        // 关联队列消费者关联队列   将交换机和队列绑定起来  
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            
            //从队列中获取消息
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("邮件消费者获取消息:" + msg);
            }
        };
        // 开始监听消息 自动签收
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    }
}

短信消费者:

        和邮件消费者代码一样,唯一不同的就是 QUEUE_NAME 变成了fanout_sms_queue

4、Direct路由模式

        当交换机类型为direct类型时,根据队列绑定的路由建转发到具体的队列中存放消息。

 

4.1、生产者代码

import com.mayikt.rabbitmq.RabbitMQConnection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ProducerDirect {

    /**
     * 定义交换机的名称
     */
    private static final String EXCHANGE_NAME = "direct_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        //  创建Connection
        Connection connection = RabbitMQConnection.getConnection();
        // 创建Channel
        Channel channel = connection.createChannel();
        // 通道关联交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
        String msg = "rk 6666";
        //投递的消息的 路由键为 email  所以只有邮件消费者才能收到
        channel.basicPublish(EXCHANGE_NAME, "email", null, msg.getBytes());
        channel.close();
        connection.close();
    }

}

4.2、消费者代码

邮件消费者:

public class MailConsumer {
    /**
     * 定义邮件队列
     */
    private static final String QUEUE_NAME = "direct_email_queue";
    /**
     * 定义交换机的名称
     */
    private static final String EXCHANGE_NAME = "direct_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("邮件消费者...");
        // 创建我们的连接
        Connection connection = RabbitMQConnection.getConnection();
        // 创建我们通道
        final Channel channel = connection.createChannel();
        // 关联队列消费者关联队列   关联交换机和队列   路由键为email
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "email");
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("邮件消费者获取消息:" + msg);
            }
        };
        // 开始监听消息 自动签收
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);

    }
}

短信消费者:

只有队列和路由键变了:QUEUE_NAME = "direct_sms_queue";    路由键为: sms

5、Topic主题模式

当交换机类型为topic类型时,根据队列绑定的路由建模糊转发到具体的队列中存放。

#号表示支持匹配多个词;

*号表示只能匹配一个词

        如图所示:生产者的路由键为mayikt.sms发送给交换机的消息会转发给邮件队列

5.1、生产者代码

import com.mayikt.rabbitmq.RabbitMQConnection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ProducerTopic {

    /**
     * 定义交换机的名称
     */
    private static final String EXCHANGE_NAME = "topic_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        //  创建Connection
        Connection connection = RabbitMQConnection.getConnection();
        // 创建Channel
        Channel channel = connection.createChannel();
        // 通道关联交换机  设置为topic主题模式
        channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
        String msg = "rk 6666";
        //发送消息   rk.sms
        channel.basicPublish(EXCHANGE_NAME, "rk.sms", null, msg.getBytes());
        //关闭连接
        channel.close();
        connection.close();
    }

}

5.2、消费者代码

邮件消费者:

import com.mayikt.rabbitmq.RabbitMQConnection;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class MailConsumer {
    /**
     * 定义邮件队列
     */
    private static final String QUEUE_NAME = "topic_email_queue";
    /**
     * 定义交换机的名称
     */
    private static final String EXCHANGE_NAME = "topic_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("邮件消费者...");
        // 创建我们的连接
        Connection connection = RabbitMQConnection.getConnection();
        // 创建我们通道
        final Channel channel = connection.createChannel();
        // 关联队列消费者关联队列  绑定路由器和队列 设置路由键为rk.*
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "rk.*");
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            //接收队列消息
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("邮件消费者获取消息:" + msg);
            }
        };
        // 开始监听消息 自动签收
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);

    }
}

短信消费者:

只有队列和路由键变了:QUEUE_NAME = "topic_sms_queue";    路由键为:luo.*

相关文章:

  • Spring中的11个扩展点
  • AcWing 蓝桥杯AB组辅导课 01、递归与递推
  • 2022软考高项十大领域知识整理(三)--项目质量管理、沟通管理
  • spring boot在线投票系统 毕业设计源码141307
  • 申请专利申请流程,专利申请详细步骤
  • echarts文档解读
  • 稀疏矩阵的压缩存储
  • 动手学习深度学习 08:机器视觉
  • CNN(卷积网络)如何处理Size(Shape)大小可变的输入图像数据
  • 小鱼的一键安装系列
  • Spring Cloud Alibaba-Ribbon的源码分析
  • MASA MAUI Plugin IOS蓝牙低功耗(三)蓝牙扫描
  • 30岁以上的程序员还死磕技术,别说拿高薪,可能连饭碗都会保不住
  • numpy快速处理数据学习笔记
  • C++多态详解
  • 2019.2.20 c++ 知识梳理
  • If…else
  • Node 版本管理
  • oldjun 检测网站的经验
  • RxJS: 简单入门
  • underscore源码剖析之整体架构
  • Vue2.0 实现互斥
  • 对JS继承的一点思考
  • 力扣(LeetCode)56
  • 浏览器缓存机制分析
  • 你不可错过的前端面试题(一)
  • 区块链将重新定义世界
  • 三栏布局总结
  • 深度学习中的信息论知识详解
  • 树莓派 - 使用须知
  • 详解NodeJs流之一
  • 学习Vue.js的五个小例子
  • 一道面试题引发的“血案”
  • 深度学习之轻量级神经网络在TWS蓝牙音频处理器上的部署
  • 《TCP IP 详解卷1:协议》阅读笔记 - 第六章
  • 阿里云服务器如何修改远程端口?
  • 移动端高清、多屏适配方案
  • ​Base64转换成图片,android studio build乱码,找不到okio.ByteString接腾讯人脸识别
  • ###项目技术发展史
  • #我与Java虚拟机的故事#连载18:JAVA成长之路
  • (16)Reactor的测试——响应式Spring的道法术器
  • (JS基础)String 类型
  • (二)正点原子I.MX6ULL u-boot移植
  • (附源码)apringboot计算机专业大学生就业指南 毕业设计061355
  • (十一)手动添加用户和文件的特殊权限
  • (完整代码)R语言中利用SVM-RFE机器学习算法筛选关键因子
  • (原)本想说脏话,奈何已放下
  • .net websocket 获取http登录的用户_如何解密浏览器的登录密码?获取浏览器内用户信息?...
  • .net 中viewstate的原理和使用
  • .NET 中小心嵌套等待的 Task,它可能会耗尽你线程池的现有资源,出现类似死锁的情况
  • /bin/bash^M: bad interpreter: No such file ordirectory
  • /deep/和 >>>以及 ::v-deep 三者的区别
  • ?php echo $logosrc[0];?,如何在一行中显示logo和标题?
  • @SentinelResource详解
  • @SuppressLint(NewApi)和@TargetApi()的区别