当前位置: 首页 > news >正文

RabbitMQ-Java版本生产与消费

---------Maven依赖---------
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.1.0</version>
</dependency>


---------消息生产---------
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  
import com.rabbitmq.client.MessageProperties;

public class TestSend
{
    private final static String QUEUE_NAME = "testdurable";
    private final static String QUEUE_IP = " 你的服务器IP或域名";
    private final static int QUEUE_PORT = 5672;//RabbitMQ对外服务端口
    private final static String QUEUE_USER = "testuser";
    private final static String QUEUE_PWD = "123456";

    public static void main(String[] argv) throws java.io.IOException, TimeoutException
    {
        /**
         * 创建连接连接到MabbitMQ
         */
        ConnectionFactory factory = new ConnectionFactory();
        //设置MabbitMQ所在主机ip或者主机名
        factory.setHost(QUEUE_IP);
        factory.setPort(QUEUE_PORT);// MQ端口
        factory.setUsername(QUEUE_USER);// MQ用户名
        factory.setPassword(QUEUE_PWD);// MQ密码
        //创建一个连接
        Connection connection = factory.newConnection();
        //创建一个频道
        Channel channel = connection.createChannel();
        //指定一个队列
        //channel.queueDeclare(QUEUE_NAME, false, false, false, null);
         //如QUEUE_NAME是一个transient的queue,第二个参数必须是false;重启rabbit后QUEUE_NAME会被删除掉
         //如QUEUE_NAME是一个durability的queue,第二个参数必须是true;重启rabbit后QUEUE_NAME不会被删除掉
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        //发送的消息
        String message = "hello world!";
        //往队列中发出一条消息
        int j=0;
        Long start = System.currentTimeMillis();
        for(int i=j;i<j+10000;i++)
        {
         //将消息保存起来,重启rabbit后待消费的消息不会被删除
         channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, (message+i).getBytes());

         //不保存消息,重启rabbit后待消费的消息都将丢失
         //channel.basicPublish("", QUEUE_NAME, null, (message+i).getBytes());
        }
        System.out.println("发送完成:"+(System.currentTimeMillis() - start));
        //关闭频道和连接
        channel.close();
        connection.close();
     }
}

---------消息消费 ---------
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class TestRead
{
    private final static String QUEUE_NAME = "testdurable";
    private final static String QUEUE_IP = " 你的服务器IP或域名";
    private final static int QUEUE_PORT =  5672;//RabbitMQ对外服务端口
    private final static String QUEUE_USER = "testuser";
    private final static String QUEUE_PWD = "123456";

    public static void main(String[] argv) throws java.io.IOException,
            java.lang.InterruptedException, TimeoutException
    {
        //打开连接和创建频道,与发送端一样
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(QUEUE_IP);
        factory.setPort(QUEUE_PORT);// MQ端口
        factory.setUsername(QUEUE_USER);// MQ用户名
        factory.setPassword(QUEUE_PWD);// MQ密码
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        System.out.println("Waiting for messages. To exit press CTRL+C");
        
        //创建队列消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //指定消费队列
        channel.basicConsume(QUEUE_NAME, true, consumer);
        Long start = System.currentTimeMillis();
        while (true)
        {
            //nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法)
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("Received '" + message + "'    "+(System.currentTimeMillis() - start));
        }
    }
}

相关文章:

  • Ajax学习(一)
  • window对象
  • saltstack常用远程命令
  • vuex,vue问题汇集(一)
  • ERROR   OGG-01161 Bad column index (15) specified for table
  • HNUSTOJ-1520 压缩编码
  • java json与map互相转换(一)
  • 『TensotFlow』RNN中文文本_上
  • 高并发网络编程之epoll详解(转载)
  • AdTime:DMC多层面助力企业咨询
  • JSP页面出现乱码
  • Linux 破坏性修复
  • String,StringBuffer ,StringBuilder的区别
  • Android CoordinatorLayout(六) 加入下拉功能
  • 谈谈《Dotnet core结合jquery的前后端加密解密密码密文传输的实现》一文中后端解密失败的原因...
  • [LeetCode] Wiggle Sort
  • 【Redis学习笔记】2018-06-28 redis命令源码学习1
  • angular2 简述
  • extract-text-webpack-plugin用法
  • github指令
  • go append函数以及写入
  • iOS | NSProxy
  • JavaScript-Array类型
  • python_bomb----数据类型总结
  • React-flux杂记
  • SQLServer之创建显式事务
  • Vue--数据传输
  • 从0搭建SpringBoot的HelloWorld -- Java版本
  • 给github项目添加CI badge
  • 海量大数据大屏分析展示一步到位:DataWorks数据服务+MaxCompute Lightning对接DataV最佳实践...
  • 基于HAProxy的高性能缓存服务器nuster
  • 记一次用 NodeJs 实现模拟登录的思路
  • 聚类分析——Kmeans
  • 前端技术周刊 2019-01-14:客户端存储
  • 容器化应用: 在阿里云搭建多节点 Openshift 集群
  • 实现菜单下拉伸展折叠效果demo
  • 腾讯优测优分享 | Android碎片化问题小结——关于闪光灯的那些事儿
  • 项目管理碎碎念系列之一:干系人管理
  • 小李飞刀:SQL题目刷起来!
  • 在electron中实现跨域请求,无需更改服务器端设置
  • media数据库操作,可以进行增删改查,实现回收站,隐私照片功能 SharedPreferences存储地址:
  • python最赚钱的4个方向,你最心动的是哪个?
  • 交换综合实验一
  • #Linux杂记--将Python3的源码编译为.so文件方法与Linux环境下的交叉编译方法
  • (Redis使用系列) Springboot 在redis中使用BloomFilter布隆过滤器机制 六
  • (免费领源码)python#django#mysql校园校园宿舍管理系统84831-计算机毕业设计项目选题推荐
  • (十三)Flask之特殊装饰器详解
  • (五)IO流之ByteArrayInput/OutputStream
  • (一)使用Mybatis实现在student数据库中插入一个学生信息
  • (转)C#调用WebService 基础
  • /proc/vmstat 详解
  • :not(:first-child)和:not(:last-child)的用法
  • @Bean, @Component, @Configuration简析
  • [ 蓝桥杯Web真题 ]-布局切换
  • [3D游戏开发实践] Cocos Cyberpunk 源码解读-高中低端机性能适配策略