Swoole实时推送数据(webSwoole和httpSwoole结合使用)简单实例

Swoole 329 浏览

    最近闲来无事看了下swoole文档,表示phper看起来比较痛苦 ~ .~

    因为文档有实例,所以结合网上各种参考例子还是弄了个简单的数据推送。嗯,我这个是客户端和webSocket建立长链接通信,并绑定了定时器,5秒查询一次数据库是否有变动,有变动就推送数据到客户端(个人测试,为了方便和直观的查看数据用了mysql,用redis等也行)。这时候我还没看到onRequest在webSoket下可以接收http请求,先不表,下文一起说。

先放上实例的链接:http://www.swoole.itseekers.cn

    客户端数据测试就是ajax调用的http请求,模拟其他客户端发起http请求实时推送。当前是用户第一次建立连接后swoole推送过来的数据。当点击开始测试的时候就会发起http请求,swoole接收到请求后,就会广播给所有fd。大致就是这么个简单的例子,在实际项目中应该也能用到,比如选座:

    当我停留在选座页面时,位置被占了,就能及时推送到我这边的客户端上,我就不用在我选好提交的时候触发提示说位子被选了这种坑爹的情况了。

    这里话不多说了,上代码先。服务器端代码:

<?php

define('HOST', '0.0.0.0');
define('PORT', '9501');
define('WORKER_NUM', 4);
define('DISPATCH_MODE', 2);
define('DAEMONIZE', 0);
define('LOG_PATH', '/data/logs/swoole.log');

class Swoole
{
    private $pdo;

    private $server;

    private $table = 'swoole_test';

    public function __construct()
    {
        $this->server = new swoole_websocket_server(HOST, PORT);
        $this->server->set(
            [
                //开启woker进程数
                'worker_num'    => WORKER_NUM,
                //请求分发策略,
                'dispatch_mode' => DISPATCH_MODE,
                //是否守护进程
                'daemonize'     => DAEMONIZE,
                //启用守护进程后,标准输入和输出会被重定向到 log_file。
                'log_file'      => LOG_PATH,
                //心跳检测
                'heartbeat_check_interval' => 60,
                //长链接断开时间
                'heartbeat_idle_time'      => 120,
            ]
        );

        $this->server->on('workerStart', [$this, 'onWorkerStart']);
        $this->server->on('open', [$this, 'onOpen']);
        $this->server->on('message', [$this, 'onMessage']);
        $this->server->on('request', [$this, 'onRequest']);
        $this->server->on('close', [$this, 'onClose']);
        $this->server->start();
    }

    // step1、开启woker进程
    public function onWorkerStart(swoole_websocket_server $server, $worker_id)
    {
        // 在Worker进程开启时绑定定时器
        if ($worker_id == 0) {
            // 只有当worker_id为0时才添加定时器,避免重复添加
            $this->server->tick(5000, [$this, 'onTick']);
        }
        // 必须每个worker进程维持一个pdo连接
        $this->pdo = new PDO("mysql:host=mysql;dbname=swoole", "root", "123456");
    }

    // step2、客户端服务端建立连接并完成握手后的回调
    public function onOpen(swoole_websocket_server $server, swoole_http_request $request)
    {
        echo "server: handshake success with fd{$request->fd}\n";
    }

    // step3、接收客户端数据
    public function onMessage(swoole_websocket_server $server, swoole_websocket_frame $frame)
    {
        if ( !$frame->finish ) {
            echo "接收客户端数据失败\n";
            $this->server->close($frame->fd);
        }
        //向所有客户端发送数据
        $this->sendClient();
    }

    // http请求
    public function onRequest(swoole_http_request $request, swoole_http_response $response)
    {
        $response->header('Access-Control-Allow-Origin', '*');
        $response->header('Access-Control-Allow-Methods', 'OPTIONS');
        $response->header('Access-Control-Allow-Headers', 'x-requested-with,session_id,Content-Type,token,Origin');
        $response->header('Access-Control-Max-Age', '86400');
        $response->header('Access-Control-Allow-Credentials', 'true');
        $response->header('Content-Type:application/json;charset=utf-8');

        if ($request->server['request_method'] == 'OPTIONS') {
            $response->status(200);
            $response->end();
            return;
        };

        // 接收http请求从get获取message参数的值,给用户推送
        $msg = $this->processData($request);
        if (!$msg) {
            $response->status(500);
            $response->end(json_encode(['code'=>500,'msg'=>'error','data'=>[]]));
        }

        // $this->server->connections 遍历所有websocket连接用户的fd,给所有用户推送
        foreach ($this->server->connections as $fd) {
            // 当前http请求的fd不广播
            if ( $request->fd == $fd ) {
                continue;
            }
            $this->server->push($fd, json_encode($msg));
        }

        $response->status(200);
        $response->end(json_encode(['code'=>0,'msg'=>'success','data'=>[]]));
    }

    protected function processData($request){
        if (empty($request->post['message']) || empty($request->post['ori'])) {
            return false;
        }
        $msg = json_decode($request->post['message'],true);
        $ori = json_decode($request->post['ori'],true);

        $time = date('Y-m-d H:i:s');
        foreach ($msg as &$v) {
            foreach ($ori as $v1) {
                if ($v['id'] != $v1['id']) {
                    continue;
                }
                if ($v['max'] != $v1['max'] || $v['min'] != $v1['min']) {
                    $v['update_time'] = $time;
                } else {
                    $v['update_time'] = $v1['update_time'];
                }
            }
        }
        return $msg;
    }

    //关闭连接
    public function onClose(swoole_websocket_server $server, $fd)
    {
        echo "client {$fd} closed\n";
    }

    //向所有客户端发送数据
    private function sendClient()
    {
        //第一次连接去获取最新数据
        $data = $this->getLatestData();

        //向所有客户端发送数据
        foreach ($this->server->connections as $fd) {
            $this->server->push($fd, json_encode($data));
        }
    }

    //获取表数据
    private function getLatestData()
    {
        $sql = 'select * from ' . $this->table;
        try {
            $stmt = $this->pdo->prepare($sql);
            $stmt->execute();
            $res = $stmt->fetchAll(PDO::FETCH_ASSOC);

            return $res ?: [];
        } catch (Exception $e) {
            return [];
        }
    }

    //定时器回调
    public function onTick()
    {
        try {
            $sql  = 'select 1 from swoole_test where (now() - `update_time`) <= 5 limit 1';
            $stmt = $this->pdo->prepare($sql);
            $stmt->execute();
            $res = $stmt->fetch(PDO::FETCH_ASSOC);
            if ( $res ) {
                //向所有客户端发送数据
                $this->sendClient();
            }
        } catch (Exception $e) {
            $this->pdo = new PDO("mysql:host=mysql;dbname=swoole", "root", "123456");
        }
    }
}

new Swoole();

    这就是后端的代码了,数据库连接什么的请自行修改。

    daemonize 参数为1的情况下,开启守护进程,记得开启了守护进程要加上log_file参数,不然所有输入输出就丢失啦。

    heartbeat_check_interval 参数是心跳检测,避免某些客户端长时间占着茅坑不拉屎的情况,浪费我们的资源,我们就发起心跳检测,如果 heartbeat_idle_time 时长内不给我反馈就close掉了。这时候前端一般也会设置心跳,定时告诉服务器端,我还活着。

    定时器呢,一般分配一个worker进程去监听,当worker_id = 0 的时候才添加定时器。定时器用的地方也多啊,比如我要实时监听某个数据,在某个端,不想或不能给我们发送http请求推送数据的时候,我们只能自己定时去redis/mysql看是否有新数据啊,有我就广播/单播给fd。

    request请求要记得跨域问题,这里我们用header解决跨域就可以了。响应给客户端要用response自带的方法了,直接echo只能输出到日志里面。

    然后这边就没有什么需要注意的了。基本知道怎么启用swoole应该就能弄出这个简单例子了。下面看看客户端的代码:

<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta charset="utf-8">
    <meta http-equiv="X-UA-Compatible" content="IE=edge">
    <meta name="viewport" content="width=device-width, initial-scale=1">
    <title>Swoole</title>
    <link rel="stylesheet" href="https://cdn.bootcss.com/bootstrap/3.3.7/css/bootstrap.min.css"
 integrity="sha384-BVYiiSIFeK1dGmJRAkycuHAHRg32OmUcww7on3RYdg4Va+PmSTsz/K68vbdEjh4u" crossorigin="anonymous">
 <!--[if lt IE 9]>
    <script src="https://cdn.bootcss.com/html5shiv/3.7.3/html5shiv.min.js"></script>
    <script src="https://cdn.bootcss.com/respond.js/1.4.2/respond.min.js"></script>
    <![endif]-->
 <script src="https://cdn.bootcss.com/jquery/1.12.4/jquery.min.js"></script>
    <script src="https://cdn.bootcss.com/bootstrap/3.3.7/js/bootstrap.min.js"
 integrity="sha384-Tc5IQib027qvyjSMfHjOMaLkfuWVxZxUPnCJA7l2mCWNIpG9mGCD8wGNIcPD7Txa"
 crossorigin="anonymous"></script>
    <style>
        ul { padding: 0; display: inline-block; margin-right: 20px;display: inline-block}
        li { list-style: none; margin-top: 10px}
        li span { width: 50px; display: inline-block;}
    </style>
</head>
<body>
<div class="container">
    <div class="panel panel-default">
        <div class="panel-heading">
            <h3 class="panel-title">Swoole实时数据</h3>
        </div>
        <div class="panel-body">
            <table class="table table-bordered  table-hover">
                <thead id="thead">
                </thead>
                <tbody id="tbody">
                </tbody>
            </table>
        </div>
    </div>

    <div class="panel panel-default">
        <div class="panel-heading">
            <h3 class="panel-title">客户端数据测试</h3>
        </div>
        <div class="panel-body">
            <table class="table table-bordered  table-hover">
                <ul>
                    <li>原油</li>
                    <li><span>max:</span><input type="text" value="" id="oil_max"></li>
                    <li><span>min:</span><input type="text" value="" id="oil_min"></li>
                </ul>
                <ul>
                    <li>黄金</li>
                    <li><span>max:</span><input type="text" value="" id="gold_max"></li>
                    <li><span>min:</span><input type="text" value="" id="gold_min"></li>
                </ul>
                <ul>
                    <li>白银</li>
                    <li><span>max:</span><input type="text" value="" id="sliver_max"></li>
                    <li><span>min:</span><input type="text" value="" id="sliver_min"></li>
                </ul>
                <input type="button" value="开始测试" style="margin-top: 5px" onclick="http_request()">
            </table>
        </div>
    </div>
</div>
<script>
 var ori = [];
 var websocket = new WebSocket('ws://127.0.0.1:9501');

 websocket.onopen = function (event) {
     console.log('websocket connect');
     websocket.send('{"id":1}');
 };

 websocket.onclose = function (event) {
     console.log('websocket close');
 };

 websocket.onerror = function (event, e) {
     console.log('error occured:' + event.data);
 };

 websocket.onmessage = function (event) {
     ori  = event.data;
     data = JSON.parse(event.data);
     var th = '<tr>';
     var width = 1 / json_length(data[0]) * 100;
     for (var key in data[0]) {
         th += "<th style='width:" + width + "%'>" + key + "</th>";
     }
     th += '</tr>';
     $("#thead").html(th);

     var tbody = '';
     for (var line in data) {
         tbody += '<tr>';
         var td = '';
         for (var column in data[line]) {
             td += "<td>" + data[line][column] + "</td>";
         }
         tbody += td + '</tr>';
     }
     $("#tbody").html(tbody);
     $('#oil_max').val(data[0]['max']);
     $('#oil_min').val(data[0]['min']);
     $('#gold_max').val(data[1]['max']);
     $('#gold_min').val(data[1]['min']);
     $('#sliver_max').val(data[2]['max']);
     $('#sliver_min').val(data[2]['min']);
 };

 function json_length(json) {
     var length = 0;
     for (var item in json) {
         length++;
     }
     return length;
 }

 // 发起一个http请求
 function http_request() {
     var data = [];
     var oil_max    = $('#oil_max').val();
     var oil_min    = $('#oil_min').val();
     var gold_max   = $('#gold_max').val();
     var gold_min   = $('#gold_min').val();
     var sliver_max = $('#sliver_max').val();
     var sliver_min = $('#sliver_min').val();

     data.push({id:1,product:'原油',max:oil_max,min:oil_min});
     data.push({id:2,product:'黄金',max:gold_max,min:gold_min});
     data.push({id:3,product:'白银',max:sliver_max,min:sliver_min});
     var str = JSON.stringify(data);

     $.ajax({
        url : 'http://127.0.0.1:9501',
        type : 'post',
        data : {message:str,ori:ori},
        success : function (res) {
            console.log(res);
        }
     });
 }
</script>
</body>
</html>

    客户端没什么好说的,官方就是这么调用的。我把http的请求一起放到这里来展示了。

    数据库我是新建了个swoole库,swoole_test表,里面插了几条数据:

/*
 Navicat Premium Data Transfer

 Source Server         : localhost
 Source Server Type    : MySQL
 Source Server Version : 80013
 Source Host           : localhost:3306
 Source Schema         : swoole

 Target Server Type    : MySQL
 Target Server Version : 80013
 File Encoding         : 65001

 Date: 09/04/2019 12:05:25
*/

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for swoole_test
-- ----------------------------
DROP TABLE IF EXISTS `swoole_test`;
CREATE TABLE `swoole_test` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `product` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL DEFAULT '',
  `max` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL DEFAULT '',
  `min` varchar(255) NOT NULL DEFAULT '',
  `update_time` datetime NOT NULL ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

-- ----------------------------
-- Records of swoole_test
-- ----------------------------
BEGIN;
INSERT INTO `swoole_test` VALUES (1, '原油', '1045.98', '1001.23', '2019-04-08 16:29:08');
INSERT INTO `swoole_test` VALUES (2, '黄金', '1789.12', '1354.65', '2019-04-08 16:00:41');
INSERT INTO `swoole_test` VALUES (3, '白银', '433.66', '398.61', '2019-04-08 16:43:57');
COMMIT;

SET FOREIGN_KEY_CHECKS = 1;

    这样基本接完结了,撒花。

|  版权声明:本文为博主原创文章,转载请注明出处。