背景:目的实现web网页端消息推送通知。传统做法,ajax轮询则太耗费资源,所以改用swoole的websocket服务器实现功能。

1.用户登录成功,js的websocket进行握手;

2.服务端接收到用户握手信息,保存当前用户的设备id(即fd)至redis中,并执行消息查询推送动作;

3.产生新消息时,通过curl调用请求swoole的HttpServer服务的request进行触发消息推送;

4.定时查询消息,主动广播推送,Server服务的onWorkerStart进行触发推送。


注意:代码是基于thinkphp5开发的,swoole扩展类可通过composer下载

建立server.php文件

namespace app\websocket\controller;

use think\swoole\Server as swoole_server;

class Server extends swoole_server{
    // 监听所有地址
    protected $host = '0.0.0.0';
    //监听 9501 端口
    protected $port = 9501;
    //指定运行模式为多进程
    protected $mode = SWOOLE_PROCESS;
    protected $serverType = 'socket';
    // 指定 socket 的类型为 ipv4 的 tcp socket
    protected $sockType = SWOOLE_SOCK_TCP;
    //配置项
    protected $option = [
        /**
         *  设置启动的worker进程数
         *  业务代码是全异步非阻塞的,这里设置为CPU的1-4倍最合理
         *  业务代码为同步阻塞,需要根据请求响应时间和系统负载来调整
         */
        'worker_num' => 4,
        // 守护进程化
        'daemonize'  => false,
        // 监听队列的长度
        'backlog'    => 128
    ];

    public function __construct(){
        parent::__construct();swoole_set_process_name("swoole_websocket");//设置进程名称,方便重启服务
    }

    /**
     * 握手连接
     * @param $server
     * @param $request
     */
    public function onOpen($server, $request){
        echo "server: handshake success with fd{$request->fd}\n";
        //创建新在线fd
        $redis = new \app\common\controller\RedisBase();
        $redis->setFD($request->fd);

        $result = $this->getOrder();
        if($result > 0) {
            foreach ($server->connections as $fd) {
                $server->push($fd, json_encode(['data' => ['count' => $result], 'code' => 200, 'message' => 'getOrder']));
            }
        }
    }

    /**
     * 定时执行广播推送
     * @param $server
     * @param $workerId
     */
    public function onWorkerStart($server, $workerId){

        if ($workerId == 0) {
            # 定时推送数据
            $server->tick(5000, function() use ($server) {
                $result = $this->getOrder();
                if($result > 0) {
                    foreach ($server->connections as $fd) {
                        $server->push($fd, json_encode(['data' => ['count' => $result], 'code' => 200, 'message' => 'getOrder']));
                    }
                }
            });
        }
    }

    /**
     * 发送消息(客户端触发)
     * @param $server
     * @param $frame
     */
    public function onMessage($server, $frame){
        $result = $this->getOrder();
        if($result > 0) {
            foreach ($server->connections as $fd) {
                $server->push($fd, json_encode(['data' => ['count' => $result], 'code' => 200, 'message' => 'getOrder']));
            }
        }
    }

    /**
     * 发送消息(链接触发)
     * @param $server
     * @param $frame
     */
    public function onRequest($server, $frame){

        $result = $this->getOrder();
        if($result > 0) {
            $redis = new \app\common\controller\RedisBase();
            $fds = $redis->getFD();
            if($fds) {
                //存在在线用户才进行推送
                foreach ($fds as $fd) {
                     //校验fd是否存在
                    $res_fd = $this->swoole->exist($fd);
                    if($res_fd) {
                        $this->swoole->push($fd, json_encode(['data' => ['count' => $result], 'code' => 200, 'message' => 'getOrder']));
                    }else{
                        $redis = new \app\common\controller\RedisBase();
                        $redis->delFD($fd);
                    }
                }
            }
        }
    }

    /**
     * 退出
     * @param $server
     * @param $fd
     */
    public function onClone($server, $fd){
        echo "client {$fd} closed\n";
        //删除在线fd
        $redis = new \app\common\controller\RedisBase();
        $redis->delFD($fd);
    }

}

swoole扩展类

<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2014 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: liu21st <liu21st@gmail.com>
// +----------------------------------------------------------------------

namespace think\swoole;

use swoole_http_server;
use swoole_server;
use swoole_websocket_server;

/**
 * Worker控制器扩展类
 */
abstract class Server
{
    protected $swoole;
    protected $serverType;
    protected $sockType;
    protected $mode;
    protected $host   = '0.0.0.0';
    protected $port   = 9501;
    protected $option = [];

    /**
     * 架构函数
     * @access public
     */
    public function __construct()
    {
        // 实例化 Swoole 服务
        switch ($this->serverType) {
            case 'socket':
                $this->swoole = new swoole_websocket_server($this->host, $this->port);
                $eventList    = ['Open', 'Message', 'Close', 'HandShake','WorkerStart','Request'];
                break;
            case 'http':
                $this->swoole = new swoole_http_server($this->host, $this->port);
                $eventList    = ['Request'];
                break;
            default:
                $this->swoole = new swoole_server($this->host, $this->port, $this->mode, $this->sockType);
                $eventList    = ['Start', 'ManagerStart', 'ManagerStop', 'PipeMessage', 'Task', 'Packet', 'Finish', 'Receive', 'Connect', 'Close', 'Timer', 'WorkerStart', 'WorkerStop', 'Shutdown', 'WorkerError'];

        }
        // 设置参数
        if (!empty($this->option)) {
            $this->swoole->set($this->option);
        }
        // 初始化
        $this->init();

        // 设置回调
        foreach ($eventList as $event) {
            if (method_exists($this, 'on' . $event)) {
                $this->swoole->on($event, [$this, 'on' . $event]);
            }
        }
    }

    protected function init()
    {
    }

    public function start()
    {
        // Run worker
        $this->swoole->start();
    }

    public function stop()
    {
        $this->swoole->stop();
    }

    /**
     * 魔术方法 有不存在的操作的时候执行
     * @access public
     * @param string $method 方法名
     * @param array $args 参数
     * @return mixed
     */
    public function __call($method, $args)
    {
        call_user_func_array([$this->swoole, $method], $args);
    }
}

curl请求触发

$ch = curl_init();
curl_setopt($ch, CURLOPT_URL, "http://ip:9501");
curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
curl_setopt($ch, CURLOPT_HEADER, 1);
curl_setopt($ch, CURLOPT_POST, 1);
//设置 post 数据
curl_setopt($ch, CURLOPT_POSTFIELDS, []);
curl_exec($ch);
curl_close($ch);

简单的js部分代码

var server = "ws://ip:9501";
    var ws = new WebSocket(server);
    //连接
    ws.onopen = function (evt) {
        console.log('连接成功');
    }
    //发送消息
    ws.onmessage = function (evt) {
        console.log(evt.data);
    }

    //关闭
    ws.onclose = function (evt) {
        console.log('关闭成功');
    }
    //错误
    ws.onerror = function (evt) {
        console.log('错误:'+evt.data);
    }

服务器开启swoole服务(调试阶段)

php index.php websocket/server/start

测试:

1.新建一个html文件,将js代码部分粘贴,并访问,看是否进行握手成功;

2.新建一个php文件,将curl代码部分粘贴,并浏览器访问,查看html页面是否成功接收服务器推送的消息;

3.查看Server服务定时是否正确推送,html页面是否成功接收服务器推送的消息


Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐