当前位置: 首页 > news >正文

消息推送只会用websocket、轮询?试试SSE,轻松高效。

SSE介绍

HTTP Server-Sent Events (SSE) 是一种基于 HTTP 的服务器推送技术,它允许服务器向客户端推送数据,而无需客户端发起请求。以下是 HTTP SSE 的主要特点:

单向通信:
SSE 是一种单向通信协议,服务器可以主动向客户端推送数据,而客户端只能被动接收数据。

持久连接:
SSE 使用 HTTP 持久连接(long-lived connection)来保持客户端与服务器之间的连接,避免频繁地重新建立连接。

事件驱动:
SSE 采用事件驱动的方式,服务器将数据封装成事件推送给客户端,客户端可以根据事件类型进行相应的处理。

简单易用:
SSE 的协议简单,基于标准的 HTTP 协议,可以在任何支持 HTTP 的环境中使用。
客户端和服务器端的实现也相对简单,开发成本较低。

可靠性:
SSE 基于 HTTP 协议,可以利用 HTTP 的重试机制来提高数据传输的可靠性。
如果连接断开,客户端可以自动重新连接并恢复数据传输。

浏览器支持:
主流浏览器(Chrome、Firefox、Safari 等)都原生支持 SSE。
对于不支持 SSE 的浏览器,可以使用 polyfill 库来实现兼容性。

应用场景:
SSE 适用于实时性要求较高的场景,如聊天应用、体育赛事直播、股票行情更新等。
与 WebSocket 相比,SSE 更加轻量级,适用于一些对实时性要求不太高但需要持续更新的场景。

总的来说,HTTP SSE 提供了一种简单、可靠、高效的服务器推送机制,可以在各种 Web 应用中得到广泛应用。它是 Web 实时通信技术的一种重要补充。

与websocket对比

HTTP Server-Sent Events (SSE) 和 WebSocket 都是实现服务器与客户端之间实时双向通信的技术,但它们在某些方面存在一些差异。以下是它们的对比:

  1. 通信模式:
    • SSE 是单向通信,服务器只能主动推送数据给客户端,客户端只能被动接收。
    • WebSocket 是双向通信,服务器和客户端可以互相发送和接收数据。
  2. 连接方式:
    • SSE 使用标准的 HTTP 连接,利用 HTTP 持久连接来保持连接。
    • WebSocket 使用独立的 WebSocket 协议,建立全双工的 TCP 连接。
  3. 传输协议:
    • SSE 使用标准的 HTTP 协议,数据以文本的形式传输。
    • WebSocket 使用自己的二进制协议,可以传输二进制数据。
  4. 浏览器支持:
    • SSE 被大多数现代浏览器原生支持。
    • WebSocket 也被大多数现代浏览器原生支持。
  5. 可靠性:
    • SSE 可以利用 HTTP 的重试机制来提高数据传输的可靠性。
    • WebSocket 建立在 TCP 协议之上,也具有较高的可靠性。
  6. 实时性:
    • SSE 的实时性略低于 WebSocket,因为它需要依赖 HTTP 的连接机制。
    • WebSocket 建立在独立的 TCP 连接之上,实时性更高。
  7. 应用场景:
    • SSE 更适合于一些实时性要求不太高但需要持续更新的场景,如聊天应用、体育赛事直播等。
    • WebSocket 更适合于需要实时双向通信的场景,如在线游戏、视频会议等。

总的来说,SSE 和 WebSocket 都是实现服务器与客户端实时通信的有效方式,它们各有优缺点,适用于不同的应用场景。在选择时需要根据具体的需求来权衡取舍。

上代码

主体工具类 SseUtil

import com.alibaba.fastjson.JSON;
import com.enums.EnumDeviceType;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import javax.annotation.Resource;
import java.io.IOException;
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;/*** SSE 通信工具类** @author Supreme_Sir* @version V1.0.0*/
@Component
@Slf4j
public class SseUtil {/*** SSE 超时时间 24小时*/private static final Long TIMEOUT_24_HOUR = 86400000L;@Resourceprivate ThreadPoolTaskExecutor threadPoolTaskExecutor;@Resourceprivate UnreadMessageCountCacheUtil unreadMessageCountCacheUtil;/*** 订阅SSE*/public SseEmitter subscribe(EnumDeviceType deviceType, Long userId) {SseEmitter sseEmitter = SingletonConcurrentHashMap.INSTANCE.get(deviceType, userId);if (sseEmitter == null) {//生成连接并存储sseEmitter = new SseEmitter(TIMEOUT_24_HOUR);SingletonConcurrentHashMap.INSTANCE.put(deviceType, userId, sseEmitter);}//设置回调函数sseEmitter.onCompletion(completionCallBack(deviceType, userId));sseEmitter.onTimeout(timeoutCallBack(deviceType, userId));sseEmitter.onError(errorCallBack(deviceType, userId));// 立即发送未读消息数量,消除前端等待Long cnt = unreadMessageCountCacheUtil.getWithCallBack(userId);sendMessage(userId, new SseMessageVo(cnt, null));log.info("用户-{}-{} SSE连接成功", userId, deviceType.getName());return sseEmitter;}/*** 退订消息** @param userId 用户ID*/public String unsubscribe(EnumDeviceType deviceType, Long userId) {SseEmitter sseEmitter = SingletonConcurrentHashMap.INSTANCE.get(deviceType, userId);if (sseEmitter != null) {log.info("用户-{}-{} 主动断开连接", userId, deviceType.getName());//注意:此方法应由应用程序调用,以完成请求处理。它不应在容器相关事件(如发送时出错)发生后使用。sseEmitter.complete();SingletonConcurrentHashMap.INSTANCE.remove(deviceType, userId);}return "退订成功";}/*** 发送SSE消息** @param userId  用户ID* @param content 消息内容*/public void sendMessage(Long userId, SseMessageVo content) {for (EnumDeviceType deviceType : EnumDeviceType.values()) {SseEmitter sseEmitter = SingletonConcurrentHashMap.INSTANCE.get(deviceType, userId);if (sseEmitter != null) {try {log.info("向用户-{} SSE发送消息-{}", userId, JSON.toJSONString(content));sseEmitter.send(content);} catch (IOException e) {log.error("用户-{}-{} SSE发送消息异常-{}", userId, deviceType.getName(), e.getMessage());SingletonConcurrentHashMap.INSTANCE.remove(deviceType, userId);log.error("用户-{}-{} SSE发送消息异常被移除", userId, deviceType.getName());}}}}/*** SSE 单向通信心跳检测(需配合定时任务)*/public void heartbeat() {SingletonConcurrentHashMap.INSTANCE.getMap().forEach((key, value) -> {Long userId = extractNumbers(key.toString());Long cnt = unreadMessageCountCacheUtil.getWithCallBack(userId);sendMessage(userId, new SseMessageVo(cnt, null));});}/*** SSE 连接成功回调** @param userId 用户ID*/private Runnable completionCallBack(EnumDeviceType deviceType, Long userId) {return threadPoolTaskExecutor.newThread(() -> log.info("用户-{}-{} SSE连接断开", userId, deviceType.getName()));}/*** 出现超时,将当前用户缓存删除** @param userId 用户ID*/private Runnable timeoutCallBack(EnumDeviceType deviceType, Long userId) {return threadPoolTaskExecutor.newThread(() -> {log.error("用户-{}-{} SSE连接超时", userId, deviceType.getName());unsubscribe(deviceType, userId);log.error("用户-{}-{} SSE连接超时被移除", userId, deviceType.getName());});}/*** 出现异常,将当前用户缓存删除** @param userId 用户ID*/private Consumer<Throwable> errorCallBack(EnumDeviceType deviceType, Long userId) {return throwable -> {log.error("用户-{}-{} SSE连接异常", userId, deviceType.getName());unsubscribe(deviceType, userId);log.error("用户-{}-{} SSE连接异常被移除", userId, deviceType.getName());};}/*** 截取字符串中的数字** @param input 待截取的字符串*/private Long extractNumbers(String input) {Pattern pattern = Pattern.compile("[a-zA-Z](\\d+)");Matcher matcher = pattern.matcher(input);if (matcher.find()) {// 返回第一个匹配的数字序列return Long.valueOf(matcher.group(1));} else {// 如果没有找到匹配项,可以返回null或抛出异常return null;}}
}

要点:

  1. 新建好的 SSE 对象需要用容器存储起来,以服务于后续消息通信。
  2. 回调使用 ThreadPool 进行管理避免线程过多。
  3. 一个 SSE 对象只能与一端保持通信,如果存在多端的话,需要创建多个对象。

SSE对象单例存储容器 SingletonConcurrentHashMap

import com.enums.EnumDeviceType;
import lombok.Getter;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import java.util.concurrent.ConcurrentHashMap;/*** 基于ConcurrentHashMap的单例版SSE存储容器*/
@Getter
public enum SingletonConcurrentHashMap {/*** 单例版存储容器*/INSTANCE;private final ConcurrentHashMap<Object, SseEmitter> map = new ConcurrentHashMap<>();/*** 存入对象*/public void put(EnumDeviceType deviceType, Object key, SseEmitter value) {map.put(deviceType.getCode() + key, value);}/*** 获取对象*/public SseEmitter get(EnumDeviceType deviceType, Object key) {return map.get(deviceType.getCode() + key);}/*** 判断缓存中是否存在当前用户的SSE实例** @param key 用户ID*/public boolean haveInstance(Object key) {// 分别查询PC、小程序的SSE实例for (EnumDeviceType deviceType : EnumDeviceType.values()) {if (map.get(deviceType.getCode() + key) != null) {return true;}}return false;}/*** 移除对象*/public void remove(EnumDeviceType deviceType, Object key) {map.remove(deviceType.getCode() + key);}/*** 判断是否存在*/public boolean containsKey(EnumDeviceType deviceType, Object key) {return map.containsKey(deviceType.getCode() + key);}/*** 获取对象数量*/public int size() {return map.size();}/*** 清空*/public void clear() {map.clear();}}

心跳数据缓存工具 UnreadMessageCountCacheUtil

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.core.redis.RedisTemplateUtils;
import com.enums.EnumYesOrNo;
import com.util.RedisKeyUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.Objects;
import java.util.concurrent.TimeUnit;/*** @author Supreme_Sir* @description 未读消息条数缓存工具**/
@Component
@Slf4j
public class UnreadMessageCountCacheUtil {@Resourceprivate IDao dao;// 过期时间30分钟private static final Long TIMEOUT = 30L;/*** 添加缓存*/private void put(Long key, Object value) {if (Objects.isNull(key) || Objects.isNull(value)) {return;}RedisTemplateUtils.setCacheObject(RedisKeyUtils.getUnreadMessageCount() + key, value, TIMEOUT, TimeUnit.MINUTES);}/*** 获取缓存(缓存中如果没有则回数据库查询)*/public Long getWithCallBack(Long key) {if (Objects.isNull(key)) {return null;}Object cnt = RedisTemplateUtils.getCacheObject(RedisKeyUtils.getUnreadMessageCount() + key);if (Objects.isNull(cnt)) {cnt = queryCount(key);put(key, cnt);}return Long.valueOf(cnt.toString());}/*** 获取最新缓存** @return {@link Long} 最新未读数据条数*/public Long getWithRefresh(Long key) {if (Objects.isNull(key)) {return null;}Long cnt = queryCount(key);put(key, cnt);return cnt;}/*** 手动刷新缓存*/public void refresh(Long key) {if (Objects.isNull(key)) {return;}put(key, queryCount(key));}/*** 回库查询未读消息条数** @param userId 用户ID* @return {@link Long} 未读消息数量*/private Long queryCount(Long userId) {QueryWrapper<> wrapper = new QueryWrapper<>();// 连接数据库查询数据return dao.selectCount(wrapper);}
}

注意:该缓存工具对象由 Spring 容器管理,以确保单例。

前端关键代码

import { fetchEventSource } from '@microsoft/fetch-event-source';
const ctrl = new AbortController();
fetchEventSource(`${env.VITE_API_URL_PREFIX}/xxx/sse/xxx`, {signal: ctrl.signal,method: 'POST',headers: {'Auth-Token': localStorage.getItem(TOKEN_NAME),},body: JSON.stringify({UserID: localStorage.getItem('userID'),}),openWhenHidden: true,onopen: async (event: any) => {console.log('sse open:', event);},onmessage: async (event: any) => {const data = JSON.parse(event.data);this.setMsgCount(data.UnreadMsgCount || 0);console.log('SSE 消息:', data);if (data.Data) {const NotifyInstance = await NotifyPlugin.info({class: 'global-notify-card-wrap',icon: false,duration: 10000,closeBtn: false,offset: [0, 53],content: (h) =>h(MessageBox, {Data: data.Data,onHide: () => {NotifyInstance.close();},}),} as any);}},
});
this.see = {close: () => ctrl.abort(),
};

-------------------------------------------风雨里做个大人,阳光下做个孩子。-------------------------------------------

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • Linux云计算 |【第二阶段】AUTOMATION-DAY5
  • moment.js的使用方法
  • mysql数据库知识总结
  • 10个append()函数在Python程序开发中的创新应用
  • 如何搭建数字人直播系统?快速上手方法来了!
  • docker安装phpMyAdmin
  • MySQL频繁超时原因调查
  • 达梦数据库归档介绍
  • MyBatis TypeHandler详解
  • 微信小程序教程007:数据绑定
  • Spring Task详解
  • 瑞芯微平台RK3568系统开发(2)Camera 开发2
  • PHP压缩打包,下载目录或者文件,解压zip文件
  • 【27】23种设计模式
  • UGUI跟随鼠标
  • CoolViewPager:即刻刷新,自定义边缘效果颜色,双向自动循环,内置垂直切换效果,想要的都在这里...
  • Javascript编码规范
  • js写一个简单的选项卡
  • PHP的类修饰符与访问修饰符
  • PHP面试之三:MySQL数据库
  • vue从创建到完整的饿了么(11)组件的使用(svg图标及watch的简单使用)
  • vue从入门到进阶:计算属性computed与侦听器watch(三)
  • Vue源码解析(二)Vue的双向绑定讲解及实现
  • windows下如何用phpstorm同步测试服务器
  • 多线程事务回滚
  • 基于Dubbo+ZooKeeper的分布式服务的实现
  • 极限编程 (Extreme Programming) - 发布计划 (Release Planning)
  • 模型微调
  • 通过几道题目学习二叉搜索树
  • 微信小程序--------语音识别(前端自己也能玩)
  • 用 Swift 编写面向协议的视图
  • scrapy中间件源码分析及常用中间件大全
  • 策略 : 一文教你成为人工智能(AI)领域专家
  • ​Python 3 新特性:类型注解
  • ​中南建设2022年半年报“韧”字当头,经营性现金流持续为正​
  • # Kafka_深入探秘者(2):kafka 生产者
  • # 利刃出鞘_Tomcat 核心原理解析(七)
  • #[Composer学习笔记]Part1:安装composer并通过composer创建一个项目
  • #14vue3生成表单并跳转到外部地址的方式
  • #我与Java虚拟机的故事#连载06:收获颇多的经典之作
  • $(document).ready(function(){}), $().ready(function(){})和$(function(){})三者区别
  • (HAL)STM32F103C6T8——软件模拟I2C驱动0.96寸OLED屏幕
  • (leetcode学习)236. 二叉树的最近公共祖先
  • (webRTC、RecordRTC):navigator.mediaDevices undefined
  • (八)c52学习之旅-中断实验
  • (附源码)ssm教材管理系统 毕业设计 011229
  • (每日持续更新)jdk api之FileReader基础、应用、实战
  • (十六)串口UART
  • (未解决)macOS matplotlib 中文是方框
  • (一)kafka实战——kafka源码编译启动
  • *setTimeout实现text输入在用户停顿时才调用事件!*
  • .NET Core SkiaSharp 替代 System.Drawing.Common 的一些用法
  • .Net Core webapi RestFul 统一接口数据返回格式
  • .net core 客户端缓存、服务器端响应缓存、服务器内存缓存
  • .net mvc部分视图