系统架构:nginx服务器+应用服务器+数据库
通过websocket监听应用服务器的8090端口;前端js发送websocket请求到ng服务器;ng服务器转发请求到应用服务器的8090端口
1、php后端设置websocket监听
$_ip取应用服务器ip
$_port为监听的端口
启动命令:
php /u01/a/b/c/socketServer.php
nohup php /u01/a/b/c/socketServer.php(这样即使退出了终端,依然在后台运行。)
如果提示没有php命令,要将php添加到环境变量
_ip = gethostbyname(gethostname());$this->initSocket();}// 创建WebSocket连接private function initSocket(){try {//创建socket套接字$this->_master = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);// 设置IP和端口重用,在重启服务器后能重新使用此端口;socket_set_option($this->_master, SOL_SOCKET, SO_REUSEADDR, 1);//绑定地址与端口socket_bind($this->_master, $this->_ip, $this->_port);//listen函数使用主动连接套接口变为被连接套接口,使得一个进程可以接受其它进程的请求,从而成为一个服务器进程。在TCP服务器编程中listen函数把进程变为一个服务器,并指定相应的套接字变为被动连接,其中的能存储的请求不明的socket数目。socket_listen($this->_master, self::LISTEN_SOCKET_NUM);} catch (Exception $e) {}//将socket保存到socket池中(将套接字放入数组)默认把当前用户放在第一个$this->_socketPool[0] = array('resource' => $this->_master);$pid = getmypid();}public function array_column($arr2, $column_key) {$data = [];foreach ($arr2 as $key => $value) {$data[] = $value[$column_key];}return $data;}// 挂起进程遍历套接字数组,对数据进行接收、处理、发送public function run(){// 死循环直到socket断开while (true) {try {$write = $except = NULL;// 从数组中取出resource列$sockets = $this->array_column($this->_socketPool, 'resource'); /* $sockets 是一个存放文件描述符的数组。$write 是监听是否客户端写数据,传入NULL是不关心是否有写变化$except 是$sockets里面要派粗话的元素,传入null是监听全部最后一个参数是超时时间,0立即结束 n>1则最多n秒后结束,如遇某一个连接有新动态,则提前返回null如遇某一个连接有新动态,则返回*/// 接收套接字数字,监听他们的状态就是有新消息到或有客户端连接/断开时,socket_select函数才会返回,继续往下执行$read_num = socket_select($sockets, $write, $except, NULL);if (false === $read_num) {return;}// 遍历套接字数组foreach ($sockets as $socket) {// 如果有新的连接进来if ($socket == $this->_master) {// 接收一个socket连接$client = socket_accept($this->_master);if ($client === false) {continue;}//连接 并放到socket池中$this->connection($client);} else {//接收已连接的socket数据,返回的是从socket中接收的字节数。// 第一个参数:socket资源,第二个参数:存储接收的数据的变量,第三个参数:接收数据的长度$bytes = @socket_recv($socket, $buffer, 2048, 0);// 如果接收的字节数为0$recv_msg = '';if ($bytes == 0) {// 断开连接$recv_msg = $this->disconnection($socket);} else {$data = json_decode($buffer);// 判断有没有握手,没有握手进行握手,已经握手则进行处理// 当从PMS后端传过来的,不用再握手if ($data->from != 'server' && $this->_socketPool[(int)$socket]['handShake'] == false) {// 握手$this->handShake($socket, $buffer);continue;} else {// 解析客户端传来的数据$recv_msg = gettype($data) == 'object' ? (array)$data : $this->parse($buffer);}}// 业务处理,组装返回客户端的数据格式$msg = $this->doEvents($socket, $recv_msg);// 把服务端返回的数据写入套接字$this->broadcast($msg);}}} catch (Exception $e) {}}}/** * 数据广播 * @param $data */private function broadcast($data){foreach ($this->_socketPool as $socket) {if ($socket['resource'] == $this->_master) {continue;}// 写入套接字socket_write($socket['resource'], $data, strlen($data));}}/** * 业务处理,在这可以对数据库进行操作,并返回客户端数据;根据不同类型,组装不同格式的数据 * @param $socket * @param $recv_msg 客户端传来的数据 * @return string */private function doEvents($socket, $recv_msg){$type = $recv_msg['type'];$response = [];switch ($type) {// case 'login':// $response['type'] = 'login';// break;// case 'logout':// $response['type'] = 'logout';// break;case 'send':$response['info'] = $recv_msg['info'];break;}return $this->frame(json_encode($response));}/** * socket握手 * @param $socket * @param $buffer客户端传来的数据接收的数据 * @return bool */public function handShake($socket, $buffer){$acceptKey = $this->encry($buffer);$upgrade = "HTTP/1.1 101 Switching Protocols\r\n" ."Upgrade: websocket\r\n" ."Connection: Upgrade\r\n" ."Sec-WebSocket-Accept: " . $acceptKey . "\r\n\r\n";// 将socket写入缓冲区socket_write($socket, $upgrade, strlen($upgrade));// 标记握手已经成功,下次接受数据采用数据帧格式$this->_socketPool[(int)$socket]['handShake'] = true;//发送消息通知客户端握手成功$msg = array('type' => 'handShake', 'msg' => 'success');$msg = $this->frame(json_encode($msg));socket_write($socket, $msg, strlen($msg));return true;}/** * 帧数据封装 * @param $msg * @return string */private function frame($msg){$frame = [];$frame[0] = '81';$len = strlen($msg);if ($len < 126) {$frame[1] = $len < 16 ? '0' . dechex($len) : dechex($len);} else if ($len < 65025) {$s = dechex($len);$frame[1] = '7e' . str_repeat('0', 4 - strlen($s)) . $s;} else {$s = dechex($len);$frame[1] = '7f' . str_repeat('0', 16 - strlen($s)) . $s;}$data = '';$l = strlen($msg);for ($i = 0; $i < $l; $i++) {$data .= dechex(ord($msg{$i}));}$frame[2] = $data;$data = implode('', $frame);return pack("H*", $data);}/** * 解析客户端的数据 * @param $buffer * @return mixed */private function parse($buffer){$decoded = '';$len = ord($buffer[1]) & 127;if ($len === 126) {$masks = substr($buffer, 4, 4);$data = substr($buffer, 8);} else if ($len === 127) {$masks = substr($buffer, 10, 4);$data = substr($buffer, 14);} else {$masks = substr($buffer, 2, 4);$data = substr($buffer, 6);}for ($index = 0; $index $client,'userInfo' => '','handShake' => false,'ip' => $address,'port' => $port,);$this->_socketPool[(int)$client] = $info;}/** * 断开连接 * @param $socket * @return array */public function disconnection($socket){$recv_msg = array('type' => 'logout','msg' => @$this->_socketPool[(int)$socket]['userInfo']['username'],);unset($this->_socketPool[(int)$socket]);return $recv_msg;}}// 类外实例化$sk = new socketServer();// 运行$sk ->run();
2、js前端代码
如果是https请求的,将【ws://】改成【wss://】
$(function(){var wsObj = new WebSocket("ws://【ng服务器ip】:【ng服务器端口】/ws");wsObj.onopen = function(ev){console.log("监听中...");};wsObj.onmessage = function(ev){// 业务代码};wsObj.onclose = function(ev){console.log("监听结束");wsObj.close();};wsObj.onerror = function(ev){console.log("监听报错", ev);};});
3、nginx配置
如果是https请求的,将【/ws】改成【/wss】
upstream pms.uat {server10.73.17.8:80;}upstream pms.uat2 {server10.73.17.8:8090;}server {listen80;server_name localhost;client_max_body_size 50M;location / {root /usr/local/httpd/htdocs;proxy_pass http://pms.uat/trunk/www/;proxy_set_header X-Real-IP $remote_addr;proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;}# 转发websocketlocation /ws{ root /usr/local/httpd/htdocs;proxy_pass http://pms.uat2;proxy_http_version 1.1;proxy_set_header Upgrade $http_upgrade;proxy_set_header Connection 'upgrade';}}