php swoole websocket消息推送
背景:目的实现web网页端消息推送通知。传统做法,ajax轮询则太耗费资源,所以改用swoole的websocket服务器实现功能。1.用户登录成功,js的websocket进行握手;2.服务端接收到用户握手信息,保存当前用户的设备id(即fd)至redis中,并执行消息查询推送动作;3.产生新消息时,通过curl调用请求swoole的HttpServer服务的request进行触发消息推送;4.
背景:目的实现web网页端消息推送通知。传统做法,ajax轮询则太耗费资源,所以改用swoole的websocket服务器实现功能。
1.用户登录成功,js的websocket进行握手;
2.服务端接收到用户握手信息,保存当前用户的设备id(即fd)至redis中,并执行消息查询推送动作;
3.产生新消息时,通过curl调用请求swoole的HttpServer服务的request进行触发消息推送;
4.定时查询消息,主动广播推送,Server服务的onWorkerStart进行触发推送。
注意:代码是基于thinkphp5开发的,swoole扩展类可通过composer下载
建立server.php文件
namespace app\websocket\controller;
use think\swoole\Server as swoole_server;
class Server extends swoole_server{
// 监听所有地址
protected $host = '0.0.0.0';
//监听 9501 端口
protected $port = 9501;
//指定运行模式为多进程
protected $mode = SWOOLE_PROCESS;
protected $serverType = 'socket';
// 指定 socket 的类型为 ipv4 的 tcp socket
protected $sockType = SWOOLE_SOCK_TCP;
//配置项
protected $option = [
/**
* 设置启动的worker进程数
* 业务代码是全异步非阻塞的,这里设置为CPU的1-4倍最合理
* 业务代码为同步阻塞,需要根据请求响应时间和系统负载来调整
*/
'worker_num' => 4,
// 守护进程化
'daemonize' => false,
// 监听队列的长度
'backlog' => 128
];
public function __construct(){
parent::__construct();swoole_set_process_name("swoole_websocket");//设置进程名称,方便重启服务
}
/**
* 握手连接
* @param $server
* @param $request
*/
public function onOpen($server, $request){
echo "server: handshake success with fd{$request->fd}\n";
//创建新在线fd
$redis = new \app\common\controller\RedisBase();
$redis->setFD($request->fd);
$result = $this->getOrder();
if($result > 0) {
foreach ($server->connections as $fd) {
$server->push($fd, json_encode(['data' => ['count' => $result], 'code' => 200, 'message' => 'getOrder']));
}
}
}
/**
* 定时执行广播推送
* @param $server
* @param $workerId
*/
public function onWorkerStart($server, $workerId){
if ($workerId == 0) {
# 定时推送数据
$server->tick(5000, function() use ($server) {
$result = $this->getOrder();
if($result > 0) {
foreach ($server->connections as $fd) {
$server->push($fd, json_encode(['data' => ['count' => $result], 'code' => 200, 'message' => 'getOrder']));
}
}
});
}
}
/**
* 发送消息(客户端触发)
* @param $server
* @param $frame
*/
public function onMessage($server, $frame){
$result = $this->getOrder();
if($result > 0) {
foreach ($server->connections as $fd) {
$server->push($fd, json_encode(['data' => ['count' => $result], 'code' => 200, 'message' => 'getOrder']));
}
}
}
/**
* 发送消息(链接触发)
* @param $server
* @param $frame
*/
public function onRequest($server, $frame){
$result = $this->getOrder();
if($result > 0) {
$redis = new \app\common\controller\RedisBase();
$fds = $redis->getFD();
if($fds) {
//存在在线用户才进行推送
foreach ($fds as $fd) {
//校验fd是否存在
$res_fd = $this->swoole->exist($fd);
if($res_fd) {
$this->swoole->push($fd, json_encode(['data' => ['count' => $result], 'code' => 200, 'message' => 'getOrder']));
}else{
$redis = new \app\common\controller\RedisBase();
$redis->delFD($fd);
}
}
}
}
}
/**
* 退出
* @param $server
* @param $fd
*/
public function onClone($server, $fd){
echo "client {$fd} closed\n";
//删除在线fd
$redis = new \app\common\controller\RedisBase();
$redis->delFD($fd);
}
}
swoole扩展类
<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2014 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: liu21st <liu21st@gmail.com>
// +----------------------------------------------------------------------
namespace think\swoole;
use swoole_http_server;
use swoole_server;
use swoole_websocket_server;
/**
* Worker控制器扩展类
*/
abstract class Server
{
protected $swoole;
protected $serverType;
protected $sockType;
protected $mode;
protected $host = '0.0.0.0';
protected $port = 9501;
protected $option = [];
/**
* 架构函数
* @access public
*/
public function __construct()
{
// 实例化 Swoole 服务
switch ($this->serverType) {
case 'socket':
$this->swoole = new swoole_websocket_server($this->host, $this->port);
$eventList = ['Open', 'Message', 'Close', 'HandShake','WorkerStart','Request'];
break;
case 'http':
$this->swoole = new swoole_http_server($this->host, $this->port);
$eventList = ['Request'];
break;
default:
$this->swoole = new swoole_server($this->host, $this->port, $this->mode, $this->sockType);
$eventList = ['Start', 'ManagerStart', 'ManagerStop', 'PipeMessage', 'Task', 'Packet', 'Finish', 'Receive', 'Connect', 'Close', 'Timer', 'WorkerStart', 'WorkerStop', 'Shutdown', 'WorkerError'];
}
// 设置参数
if (!empty($this->option)) {
$this->swoole->set($this->option);
}
// 初始化
$this->init();
// 设置回调
foreach ($eventList as $event) {
if (method_exists($this, 'on' . $event)) {
$this->swoole->on($event, [$this, 'on' . $event]);
}
}
}
protected function init()
{
}
public function start()
{
// Run worker
$this->swoole->start();
}
public function stop()
{
$this->swoole->stop();
}
/**
* 魔术方法 有不存在的操作的时候执行
* @access public
* @param string $method 方法名
* @param array $args 参数
* @return mixed
*/
public function __call($method, $args)
{
call_user_func_array([$this->swoole, $method], $args);
}
}
curl请求触发
$ch = curl_init();
curl_setopt($ch, CURLOPT_URL, "http://ip:9501");
curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
curl_setopt($ch, CURLOPT_HEADER, 1);
curl_setopt($ch, CURLOPT_POST, 1);
//设置 post 数据
curl_setopt($ch, CURLOPT_POSTFIELDS, []);
curl_exec($ch);
curl_close($ch);
简单的js部分代码
var server = "ws://ip:9501";
var ws = new WebSocket(server);
//连接
ws.onopen = function (evt) {
console.log('连接成功');
}
//发送消息
ws.onmessage = function (evt) {
console.log(evt.data);
}
//关闭
ws.onclose = function (evt) {
console.log('关闭成功');
}
//错误
ws.onerror = function (evt) {
console.log('错误:'+evt.data);
}
服务器开启swoole服务(调试阶段)
php index.php websocket/server/start
测试:
1.新建一个html文件,将js代码部分粘贴,并访问,看是否进行握手成功;
2.新建一个php文件,将curl代码部分粘贴,并浏览器访问,查看html页面是否成功接收服务器推送的消息;
3.查看Server服务定时是否正确推送,html页面是否成功接收服务器推送的消息
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)