当前位置: 首页 > news >正文

Laravel+swoole 实现websocket长链接

需要使用 swoole 扩展

我使用的是 swoole 5.x

start 方法启动服务 和 定时器

调整 listenQueue 定时器可以降低消息通讯延迟

定时器会自动推送队列里面的消息

testMessage 方法测试给指定用户推送消息

使用 laravel console 启动

<?phpnamespace App\Console\Commands;use App\Services\SocketService;
use Illuminate\Console\Command;class WsServer extends Command
{/*** The name and signature of the console command.** @var string*/protected $signature = 'app:wsServer';/*** The console command description.** @var string*/protected $description = 'Command description';/*** Execute the console command.*/public function handle(){$SocketService = new SocketService();$SocketService->start();}
}

socket 服务实现代码

<?phpnamespace App\Services;use Swoole\WebSocket\Server;
use Swoole\Timer;
use Illuminate\Support\Facades\Redis;
use RedisException;
use Swoole\Http\Request;class SocketService
{public $port = 9501;public $server;public $links;public $cmds = [];public function __construct (){$this->links = collect([]);$this->server = new Server("0.0.0.0", env('APP_SOCKET_PORT', $this->port ));$this->server->on( 'open', function (Server $server, Request $request){$this->open( $server, $request );} );$this->server->on( 'message', function (Server $server, $frame){$this->message( $server, $frame );} );$this->server->on( 'close', function (Server $server, $fd){$this->close( $server, $fd );} );}public function start(){$this->linkManage();$this->listenQueue();$this->server->start();}public function print( $message, $level = 'info' ){if( is_array($message) || is_object($message) ){$message = json_encode($message, 320);}print_r( "[". date("Y-m-d H:i:s") ."] " . $level . ' ' . $message . "\n" );}public function linkManage(){Timer::tick( 100, function (){//var_dump( "listenQueue while: " . json_encode($this->cmds, 320) );$cmd = array_shift( $this->cmds );if( $cmd ){switch ( $cmd['operate'] ){case 'open':// 活跃$this->links->push( [ "fd" => $cmd['fd'], "user_id" => intval($cmd['user_id']??0), 'updated_at' => date("Y-m-d H:i:s") ] );$this->print( "添加客户端:fd = " . json_encode($cmd, 320)  );break;case 'close':$newLinks = [];foreach ( $this->links as $link ){if( $link['fd'] == $cmd['fd'] ){continue;}$newLinks[] = $link;}$this->links = collect( $newLinks );$this->print( "删除客户端:fd = " . json_encode($cmd, 320) );break;case 'heartbeat':$newLinks = [];foreach ( $this->links as $link ){if( $link['fd'] == $cmd['fd'] ){$link['updated_at'] = date("Y-m-d H:i:s");}$newLinks[] = $link;}$this->links = collect( $newLinks );break;}// $this->print( "连接数量是:" . $this->links->count() );// $this->print( "连接数量是:" . $this->links->toJson() );}$newLinks = [];foreach ( $this->links as $link ){if( strtotime( $link['updated_at'] ) < (time() - 60) ){$this->print( "长时间未心跳,删除客户端:fd = " . json_encode($link, 320) );if( $this->server->isEstablished( $link['fd'] ) ){$this->disconnect( $link['fd'], '未进行心跳' );}continue;}$newLinks[] = $link;}$this->links = collect( $newLinks );} );}public function listenQueue(){Timer::tick( 1000, function (){// Redis::rpush( "SocketService:listenQueue", serialize(["hahah"]) )try{$element = Redis::lpop('SocketService:listenQueue');if( $element ){$this->print( "listenQueue 有新的信息哦:" . $element );$data = unserialize($element);if( ! empty( $data['user_id']) ){$links = $this->links->where( "user_id", $data['user_id'] )->values()->all();if( empty($links) ){$this->print( "没有在线用户:user_id = " . json_encode($data, 320) );//var_export( $this->links );//var_export( $links );}foreach ( $links as $link ){if( ! $this->server->isEstablished( $link['fd'] ) ){array_push( $this->cmds, [ 'operate' => 'close', 'fd' => $link['fd'] ] );continue;}try{// 生成消息数据$message = $this->makeMessage( $data['data'], $data['type'], $data['message'] );// 开始推送$this->runPush( $link['fd'], $message );}catch (\Throwable $e){$this->print( "数据推送异常:" . json_encode([ $e->getMessage(),$e->getLine(), $e->getFile() ], 320) );}}}}}catch (RedisException $e){Redis::connect();}});}public function open( Server $server, Request $request ){$params = $request->get;if( empty( $params['user_id'] ) ){$this->disconnect( $request->fd, '缺少用户信息' );return true;}array_push( $this->cmds, [ 'operate' => 'open', 'fd' => $request->fd, 'user_id' => $params['user_id'] ] );// 生成消息数据$message = $this->makeMessage( [ 'fd' => $request->fd ], "connectionSuccessful", "连接成功" );// 开始推送$this->runPush( $request->fd, $message );$this->print( "server: handshake success with fd{$request->fd} " );}public function message( Server $server, $frame ){//$data = json_decode( $frame->data, true );if( is_array( $data ) ){if( $data['type'] == "ping" ){array_push( $this->cmds, [ 'operate' => 'heartbeat', 'fd' => $frame->fd ] );$this->server->push( $frame->fd, json_encode( [ "type" => "pong" ] , 320 ) );}else{$this->print( "receive from {$frame->fd}:{$frame->data},opcode:{$frame->opcode},fin:{$frame->finish} " );}}}public function close(Server $server, $fd){array_push( $this->cmds, [ 'operate' => 'close', 'fd' => $fd ] );$this->print( "client {$fd} closed " );}public function push( $fd, string $data ){$this->server->push($fd, $data);}public function disconnect(int $fd, string $reason = '', int $code = SWOOLE_WEBSOCKET_CLOSE_NORMAL){$this->server->disconnect($fd, $code, $reason);}public function makeMessage( array $data, $type = "", $message = "" ){return [ 'type' => $type, "message" => $message, "data" => $data ];}public function runPush( $fd, $message ){$this->print( "推送消息: {$fd} - " .  json_encode(  $message, 320 ) );$this->server->push( $fd, json_encode( $message , 320 ) );}/*** App\Services\SocketService::testMessage( 92 )* @param $user_id* @return void*/public static function testMessage( $user_id ){Redis::rpush( "SocketService:listenQueue", serialize(["user_id" => $user_id,"type" =>  "testMessage", "message" => "测试消息", "data" => ["hello world!"],]) );}

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • Netty Websocket
  • 【数据结构进阶】二叉搜索树
  • DC-1靶场打靶第一次!!!!冲冲冲!
  • 算法日记day 16(二叉树的广度优先遍历|反转、对称二叉树)
  • Android APP 基于RecyclerView框架工程(知识体系积累)
  • 在虚拟机 CentOS7 环境下安装 MySQL5.7 数据库
  • 深入理解Linux网络(三):TCP对象创建
  • [HTML]一文掌握
  • MySQL中EXPLAIN关键字详解
  • Python入门基础教程(非常详细)
  • C++ | Leetcode C++题解之第264题丑数II
  • 轨道相互作用和带隙
  • 为什么要从C语言开始编程
  • Python 热门面试题(七)
  • 十五、公开课
  • 「前端」从UglifyJSPlugin强制开启css压缩探究webpack插件运行机制
  • 【翻译】babel对TC39装饰器草案的实现
  • 【每日笔记】【Go学习笔记】2019-01-10 codis proxy处理流程
  • 【跃迁之路】【519天】程序员高效学习方法论探索系列(实验阶段276-2018.07.09)...
  • CentOS6 编译安装 redis-3.2.3
  • ES6之路之模块详解
  • Git初体验
  • Linux后台研发超实用命令总结
  • Spring Boot快速入门(一):Hello Spring Boot
  • Travix是如何部署应用程序到Kubernetes上的
  • webpack+react项目初体验——记录我的webpack环境配置
  • 分享几个不错的工具
  • 浅谈Kotlin实战篇之自定义View图片圆角简单应用(一)
  • 无服务器化是企业 IT 架构的未来吗?
  • 一份游戏开发学习路线
  • 源码之下无秘密 ── 做最好的 Netty 源码分析教程
  • 第二十章:异步和文件I/O.(二十三)
  • 数据库巡检项
  • ​LeetCode解法汇总307. 区域和检索 - 数组可修改
  • # Apache SeaTunnel 究竟是什么?
  • #Linux(Source Insight安装及工程建立)
  • (4)事件处理——(6)给.ready()回调函数传递一个参数(Passing an argument to the .ready() callback)...
  • (C#)Windows Shell 外壳编程系列4 - 上下文菜单(iContextMenu)(二)嵌入菜单和执行命令...
  • (html5)在移动端input输入搜索项后 输入法下面为什么不想百度那样出现前往? 而我的出现的是换行...
  • (Redis使用系列) Springboot 在redis中使用BloomFilter布隆过滤器机制 六
  • (ZT)薛涌:谈贫说富
  • (第8天)保姆级 PL/SQL Developer 安装与配置
  • (附源码)计算机毕业设计SSM智慧停车系统
  • (黑马出品_高级篇_01)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式
  • (入门自用)--C++--抽象类--多态原理--虚表--1020
  • (四)linux文件内容查看
  • (学习日记)2024.01.19
  • (转)Android中使用ormlite实现持久化(一)--HelloOrmLite
  • (转)JVM内存分配 -Xms128m -Xmx512m -XX:PermSize=128m -XX:MaxPermSize=512m
  • ..回顾17,展望18
  • .NET : 在VS2008中计算代码度量值
  • .net 怎么循环得到数组里的值_关于js数组
  • .net 桌面开发 运行一阵子就自动关闭_聊城旋转门家用价格大约是多少,全自动旋转门,期待合作...
  • .NET微信公众号开发-2.0创建自定义菜单
  • .NET中的Exception处理(C#)