Gateway.php 37 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132
  1. <?php
  2. namespace GatewayClient;
  3. use \Exception;
  4. /**
  5. * This file is part of workerman.
  6. *
  7. * Licensed under The MIT License
  8. * For full copyright and license information, please see the MIT-LICENSE.txt
  9. * Redistributions of files must retain the above copyright notice.
  10. *
  11. * @author walkor<walkor@workerman.net>
  12. * @copyright walkor<walkor@workerman.net>
  13. * @link http://www.workerman.net/
  14. * @license http://www.opensource.org/licenses/mit-license.php MIT License
  15. */
  16. /**
  17. * 数据发送相关
  18. * @version 3.0.0
  19. */
  20. class Gateway
  21. {
  22. /**
  23. * gateway 实例
  24. *
  25. * @var object
  26. */
  27. protected static $businessWorker = null;
  28. /**
  29. * 注册中心地址
  30. *
  31. * @var string
  32. */
  33. public static $registerAddress = '127.0.0.1:1236';
  34. /**
  35. * 秘钥
  36. * @var string
  37. */
  38. public static $secretKey = '';
  39. /**
  40. * 链接超时时间
  41. * @var int
  42. */
  43. public static $connectTimeout = 3;
  44. /**
  45. * 与Gateway是否是长链接
  46. * @var bool
  47. */
  48. public static $persistentConnection = false;
  49. /**
  50. * 向所有客户端连接(或者 client_id_array 指定的客户端连接)广播消息
  51. *
  52. * @param string $message 向客户端发送的消息
  53. * @param array $client_id_array 客户端 id 数组
  54. * @param array $exclude_client_id 不给这些client_id发
  55. * @param bool $raw 是否发送原始数据(即不调用gateway的协议的encode方法)
  56. * @return void
  57. */
  58. public static function sendToAll($message, $client_id_array = null, $exclude_client_id = null, $raw = false)
  59. {
  60. $gateway_data = GatewayProtocol::$empty;
  61. $gateway_data['cmd'] = GatewayProtocol::CMD_SEND_TO_ALL;
  62. $gateway_data['body'] = $message;
  63. if ($raw) {
  64. $gateway_data['flag'] |= GatewayProtocol::FLAG_NOT_CALL_ENCODE;
  65. }
  66. if ($exclude_client_id) {
  67. if (!is_array($exclude_client_id)) {
  68. $exclude_client_id = array($exclude_client_id);
  69. }
  70. if ($client_id_array) {
  71. $exclude_client_id = array_flip($exclude_client_id);
  72. }
  73. }
  74. if ($client_id_array) {
  75. if (!is_array($client_id_array)) {
  76. echo new \Exception('bad $client_id_array:'.var_export($client_id_array, true));
  77. return;
  78. }
  79. $data_array = array();
  80. foreach ($client_id_array as $client_id) {
  81. if (isset($exclude_client_id[$client_id])) {
  82. continue;
  83. }
  84. $address = Context::clientIdToAddress($client_id);
  85. $key = long2ip($address['local_ip']) . ":{$address['local_port']}";
  86. $data_array[$key][$address['connection_id']] = $address['connection_id'];
  87. }
  88. foreach ($data_array as $addr => $connection_id_list) {
  89. $the_gateway_data = $gateway_data;
  90. $the_gateway_data['ext_data'] = json_encode(array('connections' => $connection_id_list));
  91. self::sendToGateway($addr, $the_gateway_data);
  92. }
  93. return;
  94. } elseif (empty($client_id_array) && is_array($client_id_array)) {
  95. return;
  96. }
  97. if (!$exclude_client_id) {
  98. return self::sendToAllGateway($gateway_data);
  99. }
  100. $address_connection_array = self::clientIdArrayToAddressArray($exclude_client_id);
  101. // 如果有businessWorker实例,说明运行在workerman环境中,通过businessWorker中的长连接发送数据
  102. if (self::$businessWorker) {
  103. foreach (self::$businessWorker->gatewayConnections as $address => $gateway_connection) {
  104. $gateway_data['ext_data'] = isset($address_connection_array[$address]) ?
  105. json_encode(array('exclude'=> $address_connection_array[$address])) : '';
  106. /** @var TcpConnection $gateway_connection */
  107. $gateway_connection->send($gateway_data);
  108. }
  109. } // 运行在其它环境中,通过注册中心得到gateway地址
  110. else {
  111. $all_addresses = self::getAllGatewayAddressesFromRegister();
  112. if (!$all_addresses) {
  113. throw new Exception('Gateway::getAllGatewayAddressesFromRegister() with registerAddress:' .
  114. self::$registerAddress . ' return ' . var_export($all_addresses, true));
  115. }
  116. foreach ($all_addresses as $address) {
  117. $gateway_data['ext_data'] = isset($address_connection_array[$address]) ?
  118. json_encode(array('exclude'=> $address_connection_array[$address])) : '';
  119. self::sendToGateway($address, $gateway_data);
  120. }
  121. }
  122. }
  123. /**
  124. * 向某个客户端连接发消息
  125. *
  126. * @param int $client_id
  127. * @param string $message
  128. * @return bool
  129. */
  130. public static function sendToClient($client_id, $message)
  131. {
  132. return self::sendCmdAndMessageToClient($client_id, GatewayProtocol::CMD_SEND_TO_ONE, $message);
  133. }
  134. /**
  135. * 向当前客户端连接发送消息
  136. *
  137. * @param string $message
  138. * @return bool
  139. */
  140. public static function sendToCurrentClient($message)
  141. {
  142. return self::sendCmdAndMessageToClient(null, GatewayProtocol::CMD_SEND_TO_ONE, $message);
  143. }
  144. /**
  145. * 判断某个uid是否在线
  146. *
  147. * @param string $uid
  148. * @return int 0|1
  149. */
  150. public static function isUidOnline($uid)
  151. {
  152. return (int)self::getClientIdByUid($uid);
  153. }
  154. /**
  155. * 判断某个客户端连接是否在线
  156. *
  157. * @param int $client_id
  158. * @return int 0|1
  159. */
  160. public static function isOnline($client_id)
  161. {
  162. $address_data = Context::clientIdToAddress($client_id);
  163. $address = long2ip($address_data['local_ip']) . ":{$address_data['local_port']}";
  164. if (isset(self::$businessWorker)) {
  165. if (!isset(self::$businessWorker->gatewayConnections[$address])) {
  166. return 0;
  167. }
  168. }
  169. $gateway_data = GatewayProtocol::$empty;
  170. $gateway_data['cmd'] = GatewayProtocol::CMD_IS_ONLINE;
  171. $gateway_data['connection_id'] = $address_data['connection_id'];
  172. return (int)self::sendAndRecv($address, $gateway_data);
  173. }
  174. /**
  175. * 获取所有在线用户的session,client_id为 key
  176. *
  177. * @param string $group
  178. * @return array
  179. */
  180. public static function getAllClientInfo($group = null)
  181. {
  182. return self::getAllClientSessions($group);
  183. }
  184. /**
  185. * 获取所有在线用户的session,client_id为 key
  186. *
  187. * @param string $group
  188. * @return array
  189. */
  190. public static function getAllClientSessions($group = null)
  191. {
  192. $gateway_data = GatewayProtocol::$empty;
  193. if (!$group) {
  194. $gateway_data['cmd'] = GatewayProtocol::CMD_GET_ALL_CLIENT_INFO;
  195. } else {
  196. $gateway_data['cmd'] = GatewayProtocol::CMD_GET_CLINET_INFO_BY_GROUP;
  197. $gateway_data['ext_data'] = $group;
  198. }
  199. $status_data = array();
  200. $all_buffer_array = self::getBufferFromAllGateway($gateway_data);
  201. foreach ($all_buffer_array as $local_ip => $buffer_array) {
  202. foreach ($buffer_array as $local_port => $data) {
  203. if ($data) {
  204. foreach ($data as $connection_id => $session_buffer) {
  205. $client_id = Context::addressToClientId($local_ip, $local_port, $connection_id);
  206. if ($client_id === Context::$client_id) {
  207. $status_data[$client_id] = (array)$_SESSION;
  208. } else {
  209. $status_data[$client_id] = $session_buffer ? Context::sessionDecode($session_buffer) : array();
  210. }
  211. }
  212. }
  213. }
  214. }
  215. return $status_data;
  216. }
  217. /**
  218. * 获取某个组的连接信息
  219. *
  220. * @param string $group
  221. * @return array
  222. */
  223. public static function getClientInfoByGroup($group)
  224. {
  225. return self::getAllClientSessions($group);
  226. }
  227. /**
  228. * 获取某个组的连接信息
  229. *
  230. * @param string $group
  231. * @return array
  232. */
  233. public static function getClientSessionsByGroup($group)
  234. {
  235. return self::getAllClientSessions($group);
  236. }
  237. /**
  238. * 获取所有连接数
  239. *
  240. * @return int
  241. */
  242. public static function getAllClientCount()
  243. {
  244. return self::getClientCountByGroup();
  245. }
  246. /**
  247. * 获取某个组的在线连接数
  248. *
  249. * @param string $group
  250. * @return int
  251. */
  252. public static function getClientCountByGroup($group = '')
  253. {
  254. $gateway_data = GatewayProtocol::$empty;
  255. $gateway_data['cmd'] = GatewayProtocol::CMD_GET_CLIENT_COUNT_BY_GROUP;
  256. $gateway_data['ext_data'] = $group;
  257. $total_count = 0;
  258. $all_buffer_array = self::getBufferFromAllGateway($gateway_data);
  259. foreach ($all_buffer_array as $local_ip => $buffer_array) {
  260. foreach ($buffer_array as $local_port => $count) {
  261. if ($count) {
  262. $total_count += $count;
  263. }
  264. }
  265. }
  266. return $total_count;
  267. }
  268. /**
  269. * 获取与 uid 绑定的 client_id 列表
  270. *
  271. * @param string $uid
  272. * @return array
  273. */
  274. public static function getClientIdByUid($uid)
  275. {
  276. $gateway_data = GatewayProtocol::$empty;
  277. $gateway_data['cmd'] = GatewayProtocol::CMD_GET_CLIENT_ID_BY_UID;
  278. $gateway_data['ext_data'] = $uid;
  279. $client_list = array();
  280. $all_buffer_array = self::getBufferFromAllGateway($gateway_data);
  281. foreach ($all_buffer_array as $local_ip => $buffer_array) {
  282. foreach ($buffer_array as $local_port => $connection_id_array) {
  283. if ($connection_id_array) {
  284. foreach ($connection_id_array as $connection_id) {
  285. $client_list[] = Context::addressToClientId($local_ip, $local_port, $connection_id);
  286. }
  287. }
  288. }
  289. }
  290. return $client_list;
  291. }
  292. /**
  293. * 生成验证包,用于验证此客户端的合法性
  294. *
  295. * @return string
  296. */
  297. protected static function generateAuthBuffer()
  298. {
  299. $gateway_data = GatewayProtocol::$empty;
  300. $gateway_data['cmd'] = GatewayProtocol::CMD_GATEWAY_CLIENT_CONNECT;
  301. $gateway_data['body'] = json_encode(array(
  302. 'secret_key' => self::$secretKey,
  303. ));
  304. return GatewayProtocol::encode($gateway_data);
  305. }
  306. /**
  307. * 批量向所有 gateway 发包,并得到返回数组
  308. *
  309. * @param string $gateway_data
  310. * @return array
  311. * @throws Exception
  312. */
  313. protected static function getBufferFromAllGateway($gateway_data)
  314. {
  315. $gateway_buffer = GatewayProtocol::encode($gateway_data);
  316. $gateway_buffer = self::$secretKey ? self::generateAuthBuffer() . $gateway_buffer : $gateway_buffer;
  317. if (isset(self::$businessWorker)) {
  318. $all_addresses = self::$businessWorker->getAllGatewayAddresses();
  319. if (empty($all_addresses)) {
  320. throw new Exception('businessWorker::getAllGatewayAddresses return empty');
  321. }
  322. } else {
  323. $all_addresses = self::getAllGatewayAddressesFromRegister();
  324. if (empty($all_addresses)) {
  325. return array();
  326. }
  327. }
  328. $client_array = $status_data = $client_address_map = $receive_buffer_array = $recv_length_array = array();
  329. // 批量向所有gateway进程发送请求数据
  330. foreach ($all_addresses as $address) {
  331. $client = stream_socket_client("tcp://$address", $errno, $errmsg, self::$connectTimeout);
  332. if ($client && strlen($gateway_buffer) === stream_socket_sendto($client, $gateway_buffer)) {
  333. $socket_id = (int)$client;
  334. $client_array[$socket_id] = $client;
  335. $client_address_map[$socket_id] = explode(':', $address);
  336. $receive_buffer_array[$socket_id] = '';
  337. }
  338. }
  339. // 超时1秒
  340. $timeout = 1;
  341. $time_start = microtime(true);
  342. // 批量接收请求
  343. while (count($client_array) > 0) {
  344. $write = $except = array();
  345. $read = $client_array;
  346. if (@stream_select($read, $write, $except, $timeout)) {
  347. foreach ($read as $client) {
  348. $socket_id = (int)$client;
  349. $buffer = stream_socket_recvfrom($client, 65535);
  350. if ($buffer !== '' && $buffer !== false) {
  351. $receive_buffer_array[$socket_id] .= $buffer;
  352. $receive_length = strlen($receive_buffer_array[$socket_id]);
  353. if (empty($recv_length_array[$socket_id]) && $receive_length >= 4) {
  354. $recv_length_array[$socket_id] = current(unpack('N', $receive_buffer_array[$socket_id]));
  355. }
  356. if (!empty($recv_length_array[$socket_id]) && $receive_length >= $recv_length_array[$socket_id] + 4) {
  357. unset($client_array[$socket_id]);
  358. }
  359. } elseif (feof($client)) {
  360. unset($client_array[$socket_id]);
  361. }
  362. }
  363. }
  364. if (microtime(true) - $time_start > $timeout) {
  365. break;
  366. }
  367. }
  368. $format_buffer_array = array();
  369. foreach ($receive_buffer_array as $socket_id => $buffer) {
  370. $local_ip = ip2long($client_address_map[$socket_id][0]);
  371. $local_port = $client_address_map[$socket_id][1];
  372. $format_buffer_array[$local_ip][$local_port] = unserialize(substr($buffer, 4));
  373. }
  374. return $format_buffer_array;
  375. }
  376. /**
  377. * 关闭某个客户端
  378. *
  379. * @param int $client_id
  380. * @return bool
  381. */
  382. public static function closeClient($client_id)
  383. {
  384. if ($client_id === Context::$client_id) {
  385. return self::closeCurrentClient();
  386. } // 不是发给当前用户则使用存储中的地址
  387. else {
  388. $address_data = Context::clientIdToAddress($client_id);
  389. $address = long2ip($address_data['local_ip']) . ":{$address_data['local_port']}";
  390. return self::kickAddress($address, $address_data['connection_id']);
  391. }
  392. }
  393. /**
  394. * 踢掉当前客户端
  395. *
  396. * @return bool
  397. * @throws Exception
  398. */
  399. public static function closeCurrentClient()
  400. {
  401. if (!Context::$connection_id) {
  402. throw new Exception('closeCurrentClient can not be called in async context');
  403. }
  404. return self::kickAddress(long2ip(Context::$local_ip) . ':' . Context::$local_port, Context::$connection_id);
  405. }
  406. /**
  407. * 将 client_id 与 uid 绑定
  408. *
  409. * @param int $client_id
  410. * @param int|string $uid
  411. * @return bool
  412. */
  413. public static function bindUid($client_id, $uid)
  414. {
  415. return self::sendCmdAndMessageToClient($client_id, GatewayProtocol::CMD_BIND_UID, '', $uid);
  416. }
  417. /**
  418. * 将 client_id 与 uid 解除绑定
  419. *
  420. * @param int $client_id
  421. * @param int|string $uid
  422. * @return bool
  423. */
  424. public static function unbindUid($client_id, $uid)
  425. {
  426. return self::sendCmdAndMessageToClient($client_id, GatewayProtocol::CMD_UNBIND_UID, '', $uid);
  427. }
  428. /**
  429. * 将 client_id 加入组
  430. *
  431. * @param int $client_id
  432. * @param int|string $group
  433. * @return bool
  434. */
  435. public static function joinGroup($client_id, $group)
  436. {
  437. return self::sendCmdAndMessageToClient($client_id, GatewayProtocol::CMD_JOIN_GROUP, '', $group);
  438. }
  439. /**
  440. * 将 client_id 离开组
  441. *
  442. * @param int $client_id
  443. * @param int|string $group
  444. * @return bool
  445. */
  446. public static function leaveGroup($client_id, $group)
  447. {
  448. return self::sendCmdAndMessageToClient($client_id, GatewayProtocol::CMD_LEAVE_GROUP, '', $group);
  449. }
  450. /**
  451. * 向所有 uid 发送
  452. *
  453. * @param int|string|array $uid
  454. * @param string $message
  455. */
  456. public static function sendToUid($uid, $message)
  457. {
  458. $gateway_data = GatewayProtocol::$empty;
  459. $gateway_data['cmd'] = GatewayProtocol::CMD_SEND_TO_UID;
  460. $gateway_data['body'] = $message;
  461. if (!is_array($uid)) {
  462. $uid = array($uid);
  463. }
  464. $gateway_data['ext_data'] = json_encode($uid);
  465. self::sendToAllGateway($gateway_data);
  466. }
  467. /**
  468. * 向 group 发送
  469. *
  470. * @param int|string|array $group 组(不允许是 0 '0' false null array()等为空的值)
  471. * @param string $message 消息
  472. * @param array $exclude_client_id 不给这些client_id发
  473. * @param bool $raw 发送原始数据(即不调用gateway的协议的encode方法)
  474. */
  475. public static function sendToGroup($group, $message, $exclude_client_id = null, $raw = false)
  476. {
  477. $gateway_data = GatewayProtocol::$empty;
  478. $gateway_data['cmd'] = GatewayProtocol::CMD_SEND_TO_GROUP;
  479. $gateway_data['body'] = $message;
  480. if ($raw) {
  481. $gateway_data['flag'] |= GatewayProtocol::FLAG_NOT_CALL_ENCODE;
  482. }
  483. if (!is_array($group)) {
  484. $group = array($group);
  485. }
  486. // 分组发送,没有排除的client_id,直接发送
  487. $default_ext_data_buffer = json_encode(array('group'=> $group, 'exclude'=> null));
  488. if (empty($exclude_client_id)) {
  489. $gateway_data['ext_data'] = $default_ext_data_buffer;
  490. return self::sendToAllGateway($gateway_data);
  491. }
  492. // 分组发送,有排除的client_id,需要将client_id转换成对应gateway进程内的connectionId
  493. if (!is_array($exclude_client_id)) {
  494. $exclude_client_id = array($exclude_client_id);
  495. }
  496. $address_connection_array = self::clientIdArrayToAddressArray($exclude_client_id);
  497. // 如果有businessWorker实例,说明运行在workerman环境中,通过businessWorker中的长连接发送数据
  498. if (self::$businessWorker) {
  499. foreach (self::$businessWorker->gatewayConnections as $address => $gateway_connection) {
  500. $gateway_data['ext_data'] = isset($address_connection_array[$address]) ?
  501. json_encode(array('group'=> $group, 'exclude'=> $address_connection_array[$address])) :
  502. $default_ext_data_buffer;
  503. /** @var TcpConnection $gateway_connection */
  504. $gateway_connection->send($gateway_data);
  505. }
  506. } // 运行在其它环境中,通过注册中心得到gateway地址
  507. else {
  508. $all_addresses = self::getAllGatewayAddressesFromRegister();
  509. if (!$all_addresses) {
  510. throw new Exception('Gateway::getAllGatewayAddressesFromRegister() with registerAddress:' .
  511. self::$registerAddress . ' return ' . var_export($all_addresses, true));
  512. }
  513. foreach ($all_addresses as $address) {
  514. $gateway_data['ext_data'] = isset($address_connection_array[$address]) ?
  515. json_encode(array('group'=> $group, 'exclude'=> $address_connection_array[$address])) :
  516. $default_ext_data_buffer;
  517. self::sendToGateway($address, $gateway_data);
  518. }
  519. }
  520. }
  521. /**
  522. * 更新 session,框架自动调用,开发者不要调用
  523. *
  524. * @param int $client_id
  525. * @param string $session_str
  526. * @return bool
  527. */
  528. public static function setSocketSession($client_id, $session_str)
  529. {
  530. return self::sendCmdAndMessageToClient($client_id, GatewayProtocol::CMD_SET_SESSION, '', $session_str);
  531. }
  532. /**
  533. * 设置 session,原session值会被覆盖
  534. *
  535. * @param int $client_id
  536. * @param array $session
  537. */
  538. public static function setSession($client_id, array $session)
  539. {
  540. if (Context::$client_id === $client_id) {
  541. $_SESSION = $session;
  542. Context::$old_session = $_SESSION;
  543. }
  544. return self::setSocketSession($client_id, Context::sessionEncode($session));
  545. }
  546. /**
  547. * 更新 session,实际上是与老的session合并
  548. *
  549. * @param int $client_id
  550. * @param array $session
  551. */
  552. public static function updateSession($client_id, array $session)
  553. {
  554. if (Context::$client_id === $client_id) {
  555. $_SESSION = $session + (array)$_SESSION;
  556. Context::$old_session = $_SESSION;
  557. }
  558. return self::sendCmdAndMessageToClient($client_id, GatewayProtocol::CMD_UPDATE_SESSION, '', Context::sessionEncode($session));
  559. }
  560. /**
  561. * 获取某个client_id的session
  562. *
  563. * @param int $client_id
  564. * @return mixed false表示出错、null表示用户不存在、array表示具体的session信息
  565. */
  566. public static function getSession($client_id)
  567. {
  568. $address_data = Context::clientIdToAddress($client_id);
  569. $address = long2ip($address_data['local_ip']) . ":{$address_data['local_port']}";
  570. if (isset(self::$businessWorker)) {
  571. if (!isset(self::$businessWorker->gatewayConnections[$address])) {
  572. return null;
  573. }
  574. }
  575. $gateway_data = GatewayProtocol::$empty;
  576. $gateway_data['cmd'] = GatewayProtocol::CMD_GET_SESSION_BY_CLIENT_ID;
  577. $gateway_data['connection_id'] = $address_data['connection_id'];
  578. return self::sendAndRecv($address, $gateway_data);
  579. }
  580. /**
  581. * 想某个用户网关发送命令和消息
  582. *
  583. * @param int $client_id
  584. * @param int $cmd
  585. * @param string $message
  586. * @param string $ext_data
  587. * @return boolean
  588. */
  589. protected static function sendCmdAndMessageToClient($client_id, $cmd, $message, $ext_data = '')
  590. {
  591. // 如果是发给当前用户则直接获取上下文中的地址
  592. if ($client_id === Context::$client_id || $client_id === null) {
  593. $address = long2ip(Context::$local_ip) . ':' . Context::$local_port;
  594. $connection_id = Context::$connection_id;
  595. } else {
  596. $address_data = Context::clientIdToAddress($client_id);
  597. $address = long2ip($address_data['local_ip']) . ":{$address_data['local_port']}";
  598. $connection_id = $address_data['connection_id'];
  599. }
  600. $gateway_data = GatewayProtocol::$empty;
  601. $gateway_data['cmd'] = $cmd;
  602. $gateway_data['connection_id'] = $connection_id;
  603. $gateway_data['body'] = $message;
  604. if (!empty($ext_data)) {
  605. $gateway_data['ext_data'] = $ext_data;
  606. }
  607. return self::sendToGateway($address, $gateway_data);
  608. }
  609. /**
  610. * 发送数据并返回
  611. *
  612. * @param int $address
  613. * @param mixed $data
  614. * @return bool
  615. * @throws Exception
  616. */
  617. protected static function sendAndRecv($address, $data)
  618. {
  619. $buffer = GatewayProtocol::encode($data);
  620. $buffer = self::$secretKey ? self::generateAuthBuffer() . $buffer : $buffer;
  621. $client = stream_socket_client("tcp://$address", $errno, $errmsg, self::$connectTimeout);
  622. if (!$client) {
  623. throw new Exception("can not connect to tcp://$address $errmsg");
  624. }
  625. if (strlen($buffer) === stream_socket_sendto($client, $buffer)) {
  626. $timeout = 1;
  627. // 阻塞读
  628. stream_set_blocking($client, 1);
  629. // 1秒超时
  630. stream_set_timeout($client, 1);
  631. $all_buffer = '';
  632. $time_start = microtime(true);
  633. $pack_len = 0;
  634. while (1) {
  635. $buf = stream_socket_recvfrom($client, 655350);
  636. if ($buf !== '' && $buf !== false) {
  637. $all_buffer .= $buf;
  638. } else {
  639. if (feof($client)) {
  640. throw new Exception("connection close tcp://$address");
  641. } elseif (microtime(true) - $time_start > $timeout) {
  642. break;
  643. }
  644. continue;
  645. }
  646. $recv_len = strlen($all_buffer);
  647. if (!$pack_len && $recv_len >= 4) {
  648. $pack_len= current(unpack('N', $all_buffer));
  649. }
  650. // 回复的数据都是以\n结尾
  651. if (($pack_len && $recv_len >= $pack_len + 4) || microtime(true) - $time_start > $timeout) {
  652. break;
  653. }
  654. }
  655. // 返回结果
  656. return unserialize(substr($all_buffer, 4));
  657. } else {
  658. throw new Exception("sendAndRecv($address, \$bufer) fail ! Can not send data!", 502);
  659. }
  660. }
  661. /**
  662. * 发送数据到网关
  663. *
  664. * @param string $address
  665. * @param array $gateway_data
  666. * @return bool
  667. */
  668. protected static function sendToGateway($address, $gateway_data)
  669. {
  670. return self::sendBufferToGateway($address, GatewayProtocol::encode($gateway_data));
  671. }
  672. /**
  673. * 发送buffer数据到网关
  674. * @param string $address
  675. * @param string $gateway_buffer
  676. * @return bool
  677. */
  678. protected static function sendBufferToGateway($address, $gateway_buffer)
  679. {
  680. // 有$businessWorker说明是workerman环境,使用$businessWorker发送数据
  681. if (self::$businessWorker) {
  682. if (!isset(self::$businessWorker->gatewayConnections[$address])) {
  683. return false;
  684. }
  685. return self::$businessWorker->gatewayConnections[$address]->send($gateway_buffer, true);
  686. }
  687. // 非workerman环境
  688. $gateway_buffer = self::$secretKey ? self::generateAuthBuffer() . $gateway_buffer : $gateway_buffer;
  689. $flag = self::$persistentConnection ? STREAM_CLIENT_PERSISTENT | STREAM_CLIENT_CONNECT : STREAM_CLIENT_CONNECT;
  690. $client = stream_socket_client("tcp://$address", $errno, $errmsg, self::$connectTimeout, $flag);
  691. return strlen($gateway_buffer) == stream_socket_sendto($client, $gateway_buffer);
  692. }
  693. /**
  694. * 向所有 gateway 发送数据
  695. *
  696. * @param string $gateway_data
  697. * @throws Exception
  698. */
  699. protected static function sendToAllGateway($gateway_data)
  700. {
  701. $buffer = GatewayProtocol::encode($gateway_data);
  702. // 如果有businessWorker实例,说明运行在workerman环境中,通过businessWorker中的长连接发送数据
  703. if (self::$businessWorker) {
  704. foreach (self::$businessWorker->gatewayConnections as $gateway_connection) {
  705. /** @var TcpConnection $gateway_connection */
  706. $gateway_connection->send($buffer, true);
  707. }
  708. } // 运行在其它环境中,通过注册中心得到gateway地址
  709. else {
  710. $all_addresses = self::getAllGatewayAddressesFromRegister();
  711. if (!$all_addresses) {
  712. throw new Exception('Gateway::getAllGatewayAddressesFromRegister() with registerAddress:' .
  713. self::$registerAddress . ' return ' . var_export($all_addresses, true));
  714. }
  715. foreach ($all_addresses as $address) {
  716. self::sendBufferToGateway($address, $buffer);
  717. }
  718. }
  719. }
  720. /**
  721. * 踢掉某个网关的 socket
  722. *
  723. * @param string $address
  724. * @param int $connection_id
  725. * @return bool
  726. */
  727. protected static function kickAddress($address, $connection_id)
  728. {
  729. $gateway_data = GatewayProtocol::$empty;
  730. $gateway_data['cmd'] = GatewayProtocol::CMD_KICK;
  731. $gateway_data['connection_id'] = $connection_id;
  732. return self::sendToGateway($address, $gateway_data);
  733. }
  734. /**
  735. * 将clientid数组转换成address数组
  736. *
  737. * @param array $client_id_array
  738. * @return array
  739. */
  740. protected static function clientIdArrayToAddressArray(array $client_id_array)
  741. {
  742. $address_connection_array = array();
  743. foreach ($client_id_array as $client_id) {
  744. $address_data = Context::clientIdToAddress($client_id);
  745. $address = long2ip($address_data['local_ip']) .
  746. ":{$address_data['local_port']}";
  747. $address_connection_array[$address][$address_data['connection_id']] = $address_data['connection_id'];
  748. }
  749. return $address_connection_array;
  750. }
  751. /**
  752. * 设置 gateway 实例
  753. *
  754. * @param \GatewayWorker\BusinessWorker $business_worker_instance
  755. */
  756. public static function setBusinessWorker($business_worker_instance)
  757. {
  758. self::$businessWorker = $business_worker_instance;
  759. }
  760. /**
  761. * 获取通过注册中心获取所有 gateway 通讯地址
  762. *
  763. * @return array
  764. * @throws Exception
  765. */
  766. protected static function getAllGatewayAddressesFromRegister()
  767. {
  768. static $addresses_cache, $last_update;
  769. $time_now = time();
  770. $expiration_time = 1;
  771. if(empty($addresses_cache) || $time_now - $last_update > $expiration_time) {
  772. $client = stream_socket_client('tcp://' . self::$registerAddress, $errno, $errmsg, self::$connectTimeout);
  773. if (!$client) {
  774. throw new Exception('Can not connect to tcp://' . self::$registerAddress . ' ' . $errmsg);
  775. }
  776. fwrite($client, '{"event":"worker_connect","secret_key":"' . self::$secretKey . '"}' . "\n");
  777. stream_set_timeout($client, 1);
  778. $ret = fgets($client, 655350);
  779. if (!$ret || !$data = json_decode(trim($ret), true)) {
  780. throw new Exception('getAllGatewayAddressesFromRegister fail. tcp://' .
  781. self::$registerAddress . ' return ' . var_export($ret, true));
  782. }
  783. $last_update = $time_now;
  784. $addresses_cache = $data['addresses'];
  785. }
  786. return $addresses_cache;
  787. }
  788. }
  789. /**
  790. * 上下文 包含当前用户uid, 内部通信local_ip local_port socket_id ,以及客户端client_ip client_port
  791. */
  792. class Context
  793. {
  794. /**
  795. * 内部通讯id
  796. * @var string
  797. */
  798. public static $local_ip;
  799. /**
  800. * 内部通讯端口
  801. * @var int
  802. */
  803. public static $local_port;
  804. /**
  805. * 客户端ip
  806. * @var string
  807. */
  808. public static $client_ip;
  809. /**
  810. * 客户端端口
  811. * @var int
  812. */
  813. public static $client_port;
  814. /**
  815. * client_id
  816. * @var string
  817. */
  818. public static $client_id;
  819. /**
  820. * 连接connection->id
  821. * @var int
  822. */
  823. public static $connection_id;
  824. /**
  825. * 旧的session
  826. *
  827. * @var string
  828. */
  829. public static $old_session;
  830. /**
  831. * 编码session
  832. * @param mixed $session_data
  833. * @return string
  834. */
  835. public static function sessionEncode($session_data = '')
  836. {
  837. if($session_data !== '')
  838. {
  839. return serialize($session_data);
  840. }
  841. return '';
  842. }
  843. /**
  844. * 解码session
  845. * @param string $session_buffer
  846. * @return mixed
  847. */
  848. public static function sessionDecode($session_buffer)
  849. {
  850. return unserialize($session_buffer);
  851. }
  852. /**
  853. * 清除上下文
  854. * @return void
  855. */
  856. public static function clear()
  857. {
  858. self::$local_ip = self::$local_port = self::$client_ip = self::$client_port =
  859. self::$client_id = self::$connection_id = self::$old_session = null;
  860. }
  861. /**
  862. * 通讯地址到client_id的转换
  863. * @return string
  864. */
  865. public static function addressToClientId($local_ip, $local_port, $connection_id)
  866. {
  867. return bin2hex(pack('NnN', $local_ip, $local_port, $connection_id));
  868. }
  869. /**
  870. * client_id到通讯地址的转换
  871. * @return array
  872. */
  873. public static function clientIdToAddress($client_id)
  874. {
  875. if(strlen($client_id) !== 20)
  876. {
  877. throw new \Exception("client_id $client_id is invalid");
  878. }
  879. return unpack('Nlocal_ip/nlocal_port/Nconnection_id' ,pack('H*', $client_id));
  880. }
  881. }
  882. /**
  883. * Gateway与Worker间通讯的二进制协议
  884. *
  885. * struct GatewayProtocol
  886. * {
  887. * unsigned int pack_len,
  888. * unsigned char cmd,//命令字
  889. * unsigned int local_ip,
  890. * unsigned short local_port,
  891. * unsigned int client_ip,
  892. * unsigned short client_port,
  893. * unsigned int connection_id,
  894. * unsigned char flag,
  895. * unsigned short gateway_port,
  896. * unsigned int ext_len,
  897. * char[ext_len] ext_data,
  898. * char[pack_length-HEAD_LEN] body//包体
  899. * }
  900. * NCNnNnNCnN
  901. */
  902. class GatewayProtocol
  903. {
  904. // 发给worker,gateway有一个新的连接
  905. const CMD_ON_CONNECTION = 1;
  906. // 发给worker的,客户端有消息
  907. const CMD_ON_MESSAGE = 3;
  908. // 发给worker上的关闭链接事件
  909. const CMD_ON_CLOSE = 4;
  910. // 发给gateway的向单个用户发送数据
  911. const CMD_SEND_TO_ONE = 5;
  912. // 发给gateway的向所有用户发送数据
  913. const CMD_SEND_TO_ALL = 6;
  914. // 发给gateway的踢出用户
  915. const CMD_KICK = 7;
  916. // 发给gateway,通知用户session更新
  917. const CMD_UPDATE_SESSION = 9;
  918. // 获取在线状态
  919. const CMD_GET_ALL_CLIENT_INFO = 10;
  920. // 判断是否在线
  921. const CMD_IS_ONLINE = 11;
  922. // client_id绑定到uid
  923. const CMD_BIND_UID = 12;
  924. // 解绑
  925. const CMD_UNBIND_UID = 13;
  926. // 向uid发送数据
  927. const CMD_SEND_TO_UID = 14;
  928. // 根据uid获取绑定的clientid
  929. const CMD_GET_CLIENT_ID_BY_UID = 15;
  930. // 加入组
  931. const CMD_JOIN_GROUP = 20;
  932. // 离开组
  933. const CMD_LEAVE_GROUP = 21;
  934. // 向组成员发消息
  935. const CMD_SEND_TO_GROUP = 22;
  936. // 获取组成员
  937. const CMD_GET_CLINET_INFO_BY_GROUP = 23;
  938. // 获取组成员数
  939. const CMD_GET_CLIENT_COUNT_BY_GROUP = 24;
  940. // worker连接gateway事件
  941. const CMD_WORKER_CONNECT = 200;
  942. // 心跳
  943. const CMD_PING = 201;
  944. // GatewayClient连接gateway事件
  945. const CMD_GATEWAY_CLIENT_CONNECT = 202;
  946. // 根据client_id获取session
  947. const CMD_GET_SESSION_BY_CLIENT_ID = 203;
  948. // 发给gateway,覆盖session
  949. const CMD_SET_SESSION = 204;
  950. // 包体是标量
  951. const FLAG_BODY_IS_SCALAR = 0x01;
  952. // 通知gateway在send时不调用协议encode方法,在广播组播时提升性能
  953. const FLAG_NOT_CALL_ENCODE = 0x02;
  954. /**
  955. * 包头长度
  956. *
  957. * @var int
  958. */
  959. const HEAD_LEN = 28;
  960. public static $empty = array(
  961. 'cmd' => 0,
  962. 'local_ip' => 0,
  963. 'local_port' => 0,
  964. 'client_ip' => 0,
  965. 'client_port' => 0,
  966. 'connection_id' => 0,
  967. 'flag' => 0,
  968. 'gateway_port' => 0,
  969. 'ext_data' => '',
  970. 'body' => '',
  971. );
  972. /**
  973. * 返回包长度
  974. *
  975. * @param string $buffer
  976. * @return int return current package length
  977. */
  978. public static function input($buffer)
  979. {
  980. if (strlen($buffer) < self::HEAD_LEN) {
  981. return 0;
  982. }
  983. $data = unpack("Npack_len", $buffer);
  984. return $data['pack_len'];
  985. }
  986. /**
  987. * 获取整个包的 buffer
  988. *
  989. * @param mixed $data
  990. * @return string
  991. */
  992. public static function encode($data)
  993. {
  994. $flag = (int)is_scalar($data['body']);
  995. if (!$flag) {
  996. $data['body'] = serialize($data['body']);
  997. }
  998. $data['flag'] |= $flag;
  999. $ext_len = strlen($data['ext_data']);
  1000. $package_len = self::HEAD_LEN + $ext_len + strlen($data['body']);
  1001. return pack("NCNnNnNCnN", $package_len,
  1002. $data['cmd'], $data['local_ip'],
  1003. $data['local_port'], $data['client_ip'],
  1004. $data['client_port'], $data['connection_id'],
  1005. $data['flag'], $data['gateway_port'],
  1006. $ext_len) . $data['ext_data'] . $data['body'];
  1007. }
  1008. /**
  1009. * 从二进制数据转换为数组
  1010. *
  1011. * @param string $buffer
  1012. * @return array
  1013. */
  1014. public static function decode($buffer)
  1015. {
  1016. $data = unpack("Npack_len/Ccmd/Nlocal_ip/nlocal_port/Nclient_ip/nclient_port/Nconnection_id/Cflag/ngateway_port/Next_len",
  1017. $buffer);
  1018. if ($data['ext_len'] > 0) {
  1019. $data['ext_data'] = substr($buffer, self::HEAD_LEN, $data['ext_len']);
  1020. if ($data['flag'] & self::FLAG_BODY_IS_SCALAR) {
  1021. $data['body'] = substr($buffer, self::HEAD_LEN + $data['ext_len']);
  1022. } else {
  1023. $data['body'] = unserialize(substr($buffer, self::HEAD_LEN + $data['ext_len']));
  1024. }
  1025. } else {
  1026. $data['ext_data'] = '';
  1027. if ($data['flag'] & self::FLAG_BODY_IS_SCALAR) {
  1028. $data['body'] = substr($buffer, self::HEAD_LEN);
  1029. } else {
  1030. $data['body'] = unserialize(substr($buffer, self::HEAD_LEN));
  1031. }
  1032. }
  1033. return $data;
  1034. }
  1035. }