当前位置: 首页 > news >正文

Go 使用 RabbitMQ---------------之一

RabbitMQ 是一种消息代理。消息代理的主要目的是接收、存储并转发消息。在复杂的系统设计和微服务架构中,RabbitMQ 经常被用作中间件来处理和转发系统之间的消息,以确保数据的一致性和可靠性。正是因为提供了可靠的消息机制、跟踪机制和灵活的消息路由,常常被用于排队算法、秒杀活动、消息分发、异步处理、耗时任务等场景。

一、Go RabbitMQ 客户端&服务端 

1、代码结构

2、生产者发送消息

1). 初始化连接 rabbitmq 服务器

2). 创建 rabbitmq  通道

3). 声明队列,队列名为 go_hello,供我们发送消息

4)、交换机为默认交换机

5). 往队列里发送消息

创建 send.go 文件,通过 amqp091-go 库,将消息写入到 go_hello 这个队列 

package mainimport ("context"amqp "github.com/rabbitmq/amqp091-go""log""time"
)// 将消息写入到 go_hello 这个队列
func main() {// 1. 初始化连接(amqp://账号:密码@地址:端口默认为5672/")conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "无法连接到RabbitMQ")defer conn.Close()// 2. 创建通道ch, err := conn.Channel()failOnError(err, "无法创建通道")defer ch.Close()/*QueueDeclare声明一个队列,用于保存消息并传递给使用者。如果队列不存在,则声明会创建一个队列,或者确保现有队列匹配相同的参数。声明的每个队列都获得到空交换机“”的默认绑定,该交换机具有与队列名称匹配的路由,关键字的类型“direct”。有了这个默认绑定,可以发布直接路由到的消息,通过使用队列名称的路由关键字发布到“”来创建此队列。*/// 3. 声明队列q, err := ch.QueueDeclare("go_hello", // 队列名称false,      // 指定队列是否是持久的。如果设置为 true,则队列在服务器重启后仍然存在,消息也不会丢失。设置为 false 意味着队列是非持久的,服务器重启时队列将被删除,并且队列中的消息也会丢失。false,      // 指定当没有消费者连接到队列时,队列是否应该被自动删除。如果设置为 true,当最后一个消费者断开连接时,队列将被删除。false,      // 指定队列是否是排他的。如果设置为 true,则队列只能被声明它的连接使用,并且当连接关闭时,队列将被自动删除。false,      // 指定是否应该在声明队列时阻塞等待队列被成功创建。如果设置为 true,则函数将立即返回,不会等待队列被创建。nil,        // 可选的参数,可以传递给队列以控制其行为。在这个例子中,没有传递任何参数,所以它是 nil。)failOnError(err, "无法声明队列")// 4. 发送消息ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)defer cancel()body := "Hello World!"err = ch.PublishWithContext(ctx,"",     // 交换机的名称。在这个例子中,空字符串表示使用默认的交换机。交换机是 AMQP 中的一个组件,它接收消息并根据路由键将消息路由到一个或多个队列。q.Name, // 路由键,用于指定消息应该发送到哪个队列。在这个例子中,路由键是之前声明的队列的名称。false,  // 指定是否需要服务器确认消息的路由。如果设置为 true,则如果消息不能被路由到任何队列,服务器将返回一个错误。false,  // 指定是否需要服务器立即将消息路由给消费者。如果设置为 true,则如果队列上没有消费者,服务器将返回一个错误。amqp.Publishing{ContentType: "text/plain",Body:        []byte(body),})failOnError(err, "无法发布消息")log.Printf("生产者发送了消息:%s\n", body)
}func failOnError(err error, msg string) {if err != nil {log.Panicf("%s: %s", msg, err)}
}

 

3、消费者接收消息

创建 receive.go 文件,监听来自 RabbitMQ 的消息

package mainimport (amqp "github.com/rabbitmq/amqp091-go""log"
)func main() {// 1. 初始化连接conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "无法连接到RabbitMQ")defer conn.Close()// 2. 创建通道ch, err := conn.Channel()failOnError(err, "无法创建通道")defer ch.Close()// 3. 声明队列q, err := ch.QueueDeclare("go_hello", // namefalse,      // durablefalse,      // delete when unusedfalse,      // exclusivefalse,      // no-waitnil,        // arguments)failOnError(err, "无法声明队列")// 4. 构造消费者实例msgs, err := ch.Consume(q.Name, // 消费消息的队列的名称"",     // 消费者的标识符。在这个例子中,空字符串表示没有特定的消费者标识符true,   // 指定是否自动确认消息。如果设置为 true,则消息在被接收后会自动被确认,不需要显式调用确认方法。如果设置为 false,则需要手动确认每条消息。false,  // 指定消费者是否是排他的。如果设置为 true,则队列只能被这个消费者使用,并且当消费者断开连接时,队列将被删除。false,  // 指定是否将消息路由给与消息发布者在同一台服务器上的消费者。如果设置为 true,则不会将消息路由给本地消费者。false,  // 指定是否应该在声明消费者时阻塞等待消费者被成功创建。如果设置为 true,则函数将立即返回,不会等待消费者被创建。nil,    // 可选的参数,可以传递给消费者以控制其行为。在这个例子中,没有传递任何参数,所以它是 nil。)failOnError(err, "注册消费者失败")var forever chan struct{}go func() {for d := range msgs {log.Printf("收到消息: %s", d.Body)log.Printf("正在等待消息。要退出,请按CTRL+C")}}()log.Printf("正在等待消息。要退出,请按CTRL+C")<-forever
}func failOnError(err error, msg string) {if err != nil {log.Panicf("%s: %s", msg, err)}
}

 

二、工作队列 

工作队列:又名任务队列,其背后的主要思想是避免立即执行资源密集型任务并必须等待其完成。相反,我们将任务安排在以后完成。使用工作队列的好处就是它能够并行的处理队列,如果堆积了很多任务,我们只需要添加更多的工作者(workers)就可以了。 

1、代码结构

 仅供参考:

2、生产者 

package mainimport ("context"amqp "github.com/rabbitmq/amqp091-go""log""os""strings""time"
)func main() {// 1. 初始化连接conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "无法连接到RabbitMQ")defer conn.Close()// 2. 创建通道ch, err := conn.Channel()failOnError(err, "无法创建通道")defer ch.Close()// 3. 声明队列q, err := ch.QueueDeclare("task_queue", // nametrue,         // durablefalse,        // d

相关文章:

  • Python零基础一天丝滑入门教程(非常详细)
  • layui扩展件(xm-select)实现下拉框
  • 【Python-基础】函数合集
  • EureKa是什么?
  • YOLOv10最详细全面讲解1- 目标检测-准备自己的数据集(YOLOv5,YOLOv8均适用)
  • clickhouse——ck目录介绍
  • 嵌入式要卷成下一个Java了吗?
  • Java高级面试问题及答案
  • 中科驭数驭云、超低时延网络案例双双入选第七届数字中国建设峰会数字化转型典型应用案例
  • C++ (week5):Linux系统编程3:线程
  • 数组-捡石子小游戏
  • 新零售数据中台:打造智能商业运营的核心引擎_光点科技
  • Owinps静态IP代理:跨境电商的优选解决方案
  • 【头歌】计算机网络DHCP服务器配置第二关access口配置答案
  • Linux shell命令
  • 9月CHINA-PUB-OPENDAY技术沙龙——IPHONE
  • Android优雅地处理按钮重复点击
  • es6--symbol
  • ES6之路之模块详解
  • Koa2 之文件上传下载
  • LeetCode算法系列_0891_子序列宽度之和
  • niucms就是以城市为分割单位,在上面 小区/乡村/同城论坛+58+团购
  • PAT A1120
  • python3 使用 asyncio 代替线程
  • vue:响应原理
  • vuex 学习笔记 01
  • 分享一份非常强势的Android面试题
  • 开放才能进步!Angular和Wijmo一起走过的日子
  • 理解 C# 泛型接口中的协变与逆变(抗变)
  • 每个JavaScript开发人员应阅读的书【1】 - JavaScript: The Good Parts
  • 使用前端开发工具包WijmoJS - 创建自定义DropDownTree控件(包含源代码)
  • 我建了一个叫Hello World的项目
  • 追踪解析 FutureTask 源码
  • # AI产品经理的自我修养:既懂用户,更懂技术!
  • #Ubuntu(修改root信息)
  • #Z0458. 树的中心2
  • $.extend({},旧的,新的);合并对象,后面的覆盖前面的
  • (超详细)2-YOLOV5改进-添加SimAM注意力机制
  • (二)丶RabbitMQ的六大核心
  • (附源码)php新闻发布平台 毕业设计 141646
  • (接口自动化)Python3操作MySQL数据库
  • (论文阅读30/100)Convolutional Pose Machines
  • *Algs4-1.5.25随机网格的倍率测试-(未读懂题)
  • .NET Compact Framework 多线程环境下的UI异步刷新
  • .net framework 4.0中如何 输出 form 的name属性。
  • .net 怎么循环得到数组里的值_关于js数组
  • .NET 中 GetProcess 相关方法的性能
  • .NET中GET与SET的用法
  • /run/containerd/containerd.sock connect: connection refused
  • @Responsebody与@RequestBody
  • [ 2222 ]http://e.eqxiu.com/s/wJMf15Ku
  • [【JSON2WEB】 13 基于REST2SQL 和 Amis 的 SQL 查询分析器
  • [100天算法】-二叉树剪枝(day 48)
  • [12] 使用 CUDA 加速排序算法
  • [android] 天气app布局练习