当前位置: 首页 > news >正文

RabbitMQ 入门指南(Java)

RabbitMQ是一个受欢迎的消息代理,通常用于应用程序之间或者程序的不同组件之间通过消息来进行集成。本文简单介绍了如何使用 RabbitMQ,假定你已经配置好了rabbitmq服务器。

RabbitMQ是用Erlang,对于主要的编程语言都有驱动或者客户端。我们这里要用的是Java,所以先要获得Java客户端。。下面是Java客户端的maven依赖的配置。

<dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>3.0.4</version>
</dependency>

gradle

compile 'com.rabbitmq:amqp-client:4.1.0'

像RabbitMQ这样的消息代理可用来模拟不同的场景,例如点对点的消息分发或者订阅/推送。我们的程序足够简单,有两个基本的组件,一个生产者用于产生消息,还有一个消费者用来使用产生的消息。

在这个例子里,生产者会产生大量的消息,每个消息带有一个序列号,另一个线程中的消费者会使用这些消息。

抽象类EndPoint:

我们首先写一个类,将产生产者和消费者统一为 EndPoint类型的队列。不管是生产者还是消费者, 连接队列的代码都是一样的,这样可以通用一些。

package com.gl365.payment.util;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * date: 2017年4月26日 下午12:07:22 <br/>
 *
 * @author lenovo
 */
public abstract class EndPoint {

    protected Channel channel;
    protected Connection connection;
    protected String endPointName;

    public EndPoint(String endpointName) throws IOException, TimeoutException {
        this.endPointName = endpointName;

        // Create a connection factory
        ConnectionFactory factory = new ConnectionFactory();

        // hostname of your rabbitmq server
        factory.setHost("localhost");
        //factory.setHost("192.168.163.33");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        // getting a connection
        connection = factory.newConnection();

        // creating a channel
        channel = connection.createChannel();

        // declaring a queue for this channel. If queue does not exist,
        // it will be created on the server.
        channel.queueDeclare(endpointName, false, false, false, null);
    }

    /**
     * 关闭channel和connection。并非必须,因为隐含是自动调用的。
     * 
     * @throws IOException
     * @throws TimeoutException 
     */
    public void close() throws IOException, TimeoutException {
        this.channel.close();
        this.connection.close();
    }
}

 

生产者:

生产者类的任务是向队列里写一条消息。我们使用Apache Commons Lang把可序列化的Java对象转换成 byte 数组。commons lang的maven依赖如下:

<dependency>
	<groupId>commons-lang</groupId>
	<artifactId>commons-lang</artifactId>
	<version>2.6</version>
</dependency>
package com.gl365.payment.util;

import java.io.IOException;
import java.io.Serializable;
import java.util.concurrent.TimeoutException;

import org.springframework.util.SerializationUtils;

public class Producer extends EndPoint{
    
    public Producer(String endPointName) throws IOException, TimeoutException{
        super(endPointName);
    }

    public void sendMessage(Serializable object) throws IOException {
        channel.basicPublish("",endPointName, null, SerializationUtils.serialize(object));
    }  
}
/**
 * Project Name:payment
 * File Name:Test.java
 * Package Name:com.gl365.payment.util
 * Date:2017年4月26日下午5:49:16
 * Copyright (c) 2017, chenzhou1025@126.com All Rights Reserved.
 *
*/

package com.gl365.payment.util;

import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;

/**
 * ClassName:Test <br/>
 * Date:     2017年4月26日 下午5:49:16 <br/>
 * @author   lenovo
 * @see      
 */
public class Test {

    public static void main(String[] args) throws IOException, TimeoutException {

        Producer producer = new Producer("duan");
        
        for (int i = 0; i < 100000; i++) {
            HashMap message = new HashMap();
            message.put("message number", i);
            producer.sendMessage(message);
            System.out.println("Message Number "+ i +" sent.");
        }

        System.out.println("finish");
    }

}

 

消费者:

消费者可以以线程方式运行,对于不同的事件有不同的回调函数,其中最主要的是处理新消息到来的事件。

package com.gl365.payment.util;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang.SerializationUtils;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;

/**
 * 读取队列的程序端,实现了Runnable接口。
 */
public class QueueConsumer extends EndPoint implements Runnable, Consumer {

    public QueueConsumer(String endPointName) throws IOException, TimeoutException {
        super(endPointName);
    }

    public void run() {
        try {
            // start consuming messages. Auto acknowledge messages.
            channel.basicConsume(endPointName, true, this);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Called when consumer is registered.
     */
    public void handleConsumeOk(String consumerTag) {
        System.out.println("Consumer " + consumerTag + " registered");
    }

    /**
     * Called when new message is available.
     */
    public void handleDelivery(String consumerTag, Envelope env, BasicProperties props, byte[] body)
            throws IOException {
        Map map = (HashMap) SerializationUtils.deserialize(body);
        System.out.println("Message Number " + map.get("message number") + " received.");

    }

    public void handleCancel(String consumerTag) {
    }

    public void handleCancelOk(String consumerTag) {
    }

    public void handleRecoverOk(String consumerTag) {
    }

    public void handleShutdownSignal(String consumerTag, ShutdownSignalException arg1) {
    }
}

 

Putting it together:

在下面的测试类中,先运行一个消费者线程,然后开始产生大量的消息,这些消息会被消费者取走。

package com.gl365.payment.util;
 
import java.io.IOException;
import java.sql.SQLException;
import java.util.HashMap;
 
public class ConsumeTest {
    public ConsumeTest() throws Exception{
         
        QueueConsumer consumer = new QueueConsumer("duan");
        Thread consumerThread = new Thread(consumer);
        consumerThread.start();
    }
     
    /**
     * @param args
     * @throws SQLException 
     * @throws IOException 
     */
    public static void main(String[] args) throws Exception{
      new ConsumeTest();
    }
}

 

 

相关文章:

  • iOS用libcurl发起一个get请求,并保存返回数据到沙盒
  • Maven系列--安装与部署(Win7)
  • 主键自增设置
  • $L^p$ 调和函数恒为零
  • python3基础(七)函数基础
  • php的命名空间
  • java Web相关零碎整理--厚积薄发
  • Hibernate执行原生SQL返回ListMap类型结果集
  • Android编译过程详解(一)
  • 【bootstrap】modal模态框的几种打开方法+问题集锦
  • denyhost防止SSH暴力破解
  • [国家集训队2012]middle
  • Design Pattern: Prototype 模式
  • Linux环境下shell和vim中乱码原因及消除办法
  • 利用Docker轻松玩转Cassandra
  • 《深入 React 技术栈》
  • echarts的各种常用效果展示
  • Elasticsearch 参考指南(升级前重新索引)
  • express如何解决request entity too large问题
  • in typeof instanceof ===这些运算符有什么作用
  • Iterator 和 for...of 循环
  • Python 使用 Tornado 框架实现 WebHook 自动部署 Git 项目
  • React as a UI Runtime(五、列表)
  • socket.io+express实现聊天室的思考(三)
  • Vue2.x学习三:事件处理生命周期钩子
  • vue脚手架vue-cli
  • XForms - 更强大的Form
  • 分享自己折腾多时的一套 vue 组件 --we-vue
  • 简单数学运算程序(不定期更新)
  • 快速构建spring-cloud+sleuth+rabbit+ zipkin+es+kibana+grafana日志跟踪平台
  • 前端_面试
  • 十年未变!安全,谁之责?(下)
  • 一个6年java程序员的工作感悟,写给还在迷茫的你
  • HanLP分词命名实体提取详解
  • puppet连载22:define用法
  • 阿里云API、SDK和CLI应用实践方案
  • #define,static,const,三种常量的区别
  • (2)(2.10) LTM telemetry
  • (附源码)ssm考生评分系统 毕业设计 071114
  • (附源码)ssm学生管理系统 毕业设计 141543
  • (附源码)计算机毕业设计SSM基于健身房管理系统
  • (九)c52学习之旅-定时器
  • (三)Pytorch快速搭建卷积神经网络模型实现手写数字识别(代码+详细注解)
  • (一)Neo4j下载安装以及初次使用
  • (一)为什么要选择C++
  • * CIL library *(* CIL module *) : error LNK2005: _DllMain@12 already defined in mfcs120u.lib(dllmodu
  • 、写入Shellcode到注册表上线
  • .net core 3.0 linux,.NET Core 3.0 的新增功能
  • .net core 6 使用注解自动注入实例,无需构造注入 autowrite4net
  • .NET 反射的使用
  • .NET程序员迈向卓越的必由之路
  • .NET轻量级ORM组件Dapper葵花宝典
  • .NET使用存储过程实现对数据库的增删改查
  • @GlobalLock注解作用与原理解析
  • @RequestMapping处理请求异常