RabbitMQ学习系列(五):routing路由模式和Topic主题模式
(一)routing路由模式
在前面一篇博客中讲到了exchange的类型,其中direct类型的exchange就是用于routing路由模式。direct类型的交换机是指:交换机和队列绑定时会设置路由键(routingkey),当消息从生产者发送给交换机时也会发送一个路由键。只有当这两个路由键相同时,交换机才会把消息发送给队列。
如上图所示,当生产者发送消息的路由键为error时,两个队列均可以收到消息;当生产者发送消息的路由键为info或warning时,只有第二个队列可收到消息。
(二)路由模式实践
2.1 工具类
public class ConnectionUtil {
public static Connection getConnection() throws IOException, TimeoutException {
//定义一个连接工厂
ConnectionFactory factory=new ConnectionFactory();
//设置服务地址
factory.setHost("127.0.0.1");
//设置AMQP端口
factory.setPort(5672);
//设置VHOSTS
factory.setVirtualHost("/vhosts_sdxb");
//设置用户名
factory.setUsername("user_sdxb");
factory.setPassword("123456");
return factory.newConnection();
}
}
2.2 生产者
与前面几种模式不同的地方已经通过注解标出,其中交换机类型选择成direct,并且按需要设置路由键
public class Sent {
private static final String EXCHANGENAME="routing_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//交换机类型选择direct
channel.exchangeDeclare(EXCHANGENAME,"direct");
String msg="hello world";
//设置路由键(routingkey)
String routingkey="error";
channel.basicPublish(EXCHANGENAME,routingkey,null,msg.getBytes());
channel.close();
connection.close();
}
}
2.3 消费者一
public class Receive1 {
private static final String QUEUENAME="routing_queue1";
private static final String EXCHANGENAME="routing_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUENAME, false, false, false, null);
//在队列绑定时设置路由键
channel.queueBind(QUEUENAME, EXCHANGENAME, "error");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("receive:" + msg);
}
};
//监听队列
channel.basicConsume(QUEUENAME, true, consumer);
}
}
2.4 消费者二
public class Receive2 {
private static final String QUEUENAME="routing_queue2";
private static final String EXCHANGENAME="routing_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUENAME, false, false, false, null);
//在队列绑定时设置多个路由键
channel.queueBind(QUEUENAME, EXCHANGENAME, "error");
channel.queueBind(QUEUENAME, EXCHANGENAME, "info");
channel.queueBind(QUEUENAME, EXCHANGENAME, "warning");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("receive:" + msg);
}
};
//监听队列
channel.basicConsume(QUEUENAME, true, consumer);
}
}
2.5运行结果
当路由键为error时,两个消费者均输出消息。当路由键为info或warning时,只有第二个消费者输出消息。
(三)Topic主题模式
Topic主题模式和routing路由模式类似,只不过这里的交换机使用的是topic类型,topic类型的交换机和direct的不同就在于topic可以匹配通配符。*代表匹配一个元素,#代表匹配一个或多个元素
以上图为例,Q1队列的路由键为*.orange.*,Q2队列的路由键为*.*.rabbit和lazy.#,当消息的路由键为quick.orange.rabbit时,两个队列均能收到,当消息的路由键为lazy.orange.male.rabbit时,只有Q2队列能收到。
(四)主题模式实践
4.1 生产者
//交换机类型选择direct
channel.exchangeDeclare(EXCHANGENAME,"topic");
//设置路由键(routingkey)
String routingkey="lazy.brown.fox";
4.2 消费者一
channel.queueBind(QUEUENAME, EXCHANGENAME, "*.orange.*");
4.3 消费者二
channel.queueBind(QUEUENAME, EXCHANGENAME, "*.*.rabbit");
channel.queueBind(QUEUENAME, EXCHANGENAME, "lazy.#");