当前位置: 首页 > news >正文

SSE多服务器部署导致消息推送异常问题的处理

之前讲了SSE的基本使用,后来,在项目实际部署的时候出现了新的问题。今天通过这篇文章来基于RabbitMQ解决一下当SSE服务部署到多台服务器后,存在消息推送异常的问题。

问题描述

SSE作为单向消息推送的一种方式,其背后是一种基于HTTP请求的长连接。而当这个连接建立之后,客户端是与服务器端的某一台服务器是存在关系绑定的。如果我们将同一套代码、同一份配置文件部署到多台服务器上的时候,就可能会出现连接建立在客户端与服务端A上,而当新的需要推送的消息由服务端B或其他服务端处理并发起推送的时候,其发现自己没有建立与客户端的SSE连接,就导致了消息推送失败的问题。针对这个问题,本文给出了一个解决方案。

问题分析

  1. 由于SSE连接是客户端与某一台服务器之间是强绑定的关系,所以我们需要让持有SSE连接的服务器100%能够接收到推送消息。
  2. 由于服务端存在多台部署的情况,所以我们需要通过RabbitMQ的发布订阅(fanout)模式将一条消息同时推送给所有服务端。
  3. 由于我们将同一套代码、同一份配置文件部署到多台服务器上,且RabbitMQ的发布订阅模式需要不同名的队列(Queue)绑定到同一个交换器(Exchange)上才能实现,因此多个服务端的队列名需要动态生成。
  4. @RabbitListener 注解的 queues 参数值要求必须为硬编码的字符串或 static final 修饰的变量,所以我们为其赋值的时候不可使用字符串拼接的形式,只能通过 SpEL 表达式赋值。

代码实现

RabbitMQ 广播模式配置类

import com.xxx.core.redis.RedisTemplateUtils;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** RabbitMQ 广播模式配置类*/
@Configuration
public class RabbitFanoutConfig {private static Long innerCounter;public static final String FANOUT_QUEUE_COUNTER_KEY = "notice.fanout.queue.counter";// 系统消息交换器public static final String FANOUT_EXCHANGE_NAME = "notice_center_fanout_exchange";// 消息消息 消息队列public static final String NOTICE_CENTER_FANOUT_QUEUE = "notice_center_fanout_";@BeanFanoutExchange fanoutExchange() {return new FanoutExchange(FANOUT_EXCHANGE_NAME);}@BeanQueue noticeCenterfanoutQueue() {// 创建非持久化且自动删除的队列,解决因服务重启导致的RabbitMQ中无用队列过多问题return QueueBuilder.nonDurable(NOTICE_CENTER_FANOUT_QUEUE + getQueueCounter()).autoDelete().build();}@BeanBinding noticeCenter4CrowdBinding(FanoutExchange fanoutExchange, Queue noticeCenterfanoutQueue) {return BindingBuilder.bind(noticeCenterfanoutQueue).to(fanoutExchange);}public static long getQueueCounter() {if (null == innerCounter) {innerCounter = RedisTemplateUtils.incrementCounter(FANOUT_QUEUE_COUNTER_KEY);}return innerCounter;}
}

消息队列编号获取函数

public class RedisTemplateUtils {protected static final RedisTemplate redisTemplate = SpringUtils.getBean("redisTemplate");/*** 原子操作增加** @param key Redis键** @return {@link long} 最新值*/public static Long incrementCounter(String key) {// 使用 RedisTemplate 的操作方法来实现原子性的递增return redisTemplate.opsForValue().increment(key);}
}

注意:我这里用了 Redis 的原子自增,其实可以用任意可以取到不重复值的方式。

消费者监听

/*** 系统消息群发-广播模式*/
@RabbitListener(queues = "#{T(String).format('notice_center_fanout_%d', T(com.xxx.config.RabbitFanoutConfig).getQueueCounter())}",ackMode = "MANUAL")
public void noticeCenterFanoutMessageListener(@Payload String dataMsg, Message receivedMessage, Channel channel)throws IOException {long deliveryTag = receivedMessage.getMessageProperties().getDeliveryTag();try {// 处理消息log.info("消费者消息,noticeCenterFanoutMessageListener:deliveryTag:{} dataMsg:{} ", deliveryTag, dataMsg);        System.out.println(dataMsg);// 确认消息channel.basicAck(deliveryTag, false);} catch (Exception e) {log.error("MQConsumer.noticeCenterFanoutMessageListener,deliveryTag={},dataMsg={},error={}", deliveryTag,dataMsg, e.getMessage());// deliveryTag:表示要拒绝的消息的交付标签。// requeue:布尔值,指示是否将消息重新排队。如果设置为 true,RabbitMQ会尝试将消息重新排队,以便稍后再次发送给其他消费者;// 如果设置为 false,则消息将被丢弃。channel.basicReject(deliveryTag, true);}
}

广播发送

/*** RabbitMQ*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class RabbitMQUtil {private static final RabbitTemplate rabbitTemplate = SpringUtils.getBean(RabbitTemplate.class);/*** 广播模式发送消息* @param fanoutExchangeName 广播模式交换器名称* @param message 消息内容*/public static void sendFanoutMessage(String fanoutExchangeName,String message) {rabbitTemplate.convertAndSend(fanoutExchangeName, "", message);}
}

发送广播

RabbitMQUtil.sendFanoutMessage(RabbitFanoutConfig.FANOUT_EXCHANGE_NAME, JSONUtil.toJsonStr(noticeLogList));

----------------------------------像孩子一样,真诚。像夕阳一样,温暖。像天空一样,宁静。----------------------------------

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 光猫的 Telnet 命令(sidbg或sendcmd)和常用管理操作
  • HAProxy的详解
  • CentOS 7 下载/安装
  • stm32智能颜色送餐小车(红外光管避障)
  • MyBatis首篇-入门第一文
  • js根据指定的【时区、日期时间】获取对应时区的日期时间
  • 【HeadFirst 设计模式】单例模式的C++实现
  • CANoe软件中Trace窗口的筛选栏标题不显示(空白)的解决方法
  • 【MySQL核心】MySQL 数据恢复-ibd2sql
  • 2021年上半年网络工程师考试上午真题
  • git , nvm 快速下载安装包链接
  • 5.4 视图的创建与管理
  • windows调试ios记录
  • CH582M低功耗蓝牙温湿度传感器接入HASS
  • IDS 与 IPS:网络安全的两道防线
  • python3.6+scrapy+mysql 爬虫实战
  • 【前端学习】-粗谈选择器
  • 30天自制操作系统-2
  • gulp 教程
  • HTTP传输编码增加了传输量,只为解决这一个问题 | 实用 HTTP
  • JavaScript创建对象的四种方式
  • linux安装openssl、swoole等扩展的具体步骤
  • MaxCompute访问TableStore(OTS) 数据
  • Python语法速览与机器学习开发环境搭建
  • Redis中的lru算法实现
  • tweak 支持第三方库
  • Vim Clutch | 面向脚踏板编程……
  • 高性能JavaScript阅读简记(三)
  • 工作中总结前端开发流程--vue项目
  • 极限编程 (Extreme Programming) - 发布计划 (Release Planning)
  • 一道闭包题引发的思考
  • 在Unity中实现一个简单的消息管理器
  • 湖北分布式智能数据采集方法有哪些?
  • #Linux(Source Insight安装及工程建立)
  • $(this) 和 this 关键字在 jQuery 中有何不同?
  • (C语言)编写程序将一个4×4的数组进行顺时针旋转90度后输出。
  • (几何:六边形面积)编写程序,提示用户输入六边形的边长,然后显示它的面积。
  • (转)es进行聚合操作时提示Fielddata is disabled on text fields by default
  • (转)大道至简,职场上做人做事做管理
  • (转)淘淘商城系列——使用Spring来管理Redis单机版和集群版
  • .babyk勒索病毒解析:恶意更新如何威胁您的数据安全
  • .bat文件调用java类的main方法
  • .gitignore文件_Git:.gitignore
  • .net 发送邮件
  • .NET 依赖注入和配置系统
  • .NET开源的一个小而快并且功能强大的 Windows 动态桌面软件 - DreamScene2
  • .NET连接数据库方式
  • .NET轻量级ORM组件Dapper葵花宝典
  • .NET性能优化(文摘)
  • .NET中 MVC 工厂模式浅析
  • @property括号内属性讲解
  • @requestBody写与不写的情况
  • @RestControllerAdvice异常统一处理类失效原因
  • [ IO.File ] FileSystemWatcher
  • [ Linux Audio 篇 ] 音频开发入门基础知识