InteractsWithRpcClient.php 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. <?php
  2. namespace think\swoole\concerns;
  3. use Smf\ConnectionPool\ConnectionPool;
  4. use Swoole\Server;
  5. use think\App;
  6. use think\swoole\Pool;
  7. use think\swoole\pool\Client;
  8. use think\swoole\rpc\client\Connector;
  9. use think\swoole\rpc\client\Gateway;
  10. use think\swoole\rpc\client\Proxy;
  11. use think\swoole\rpc\JsonParser;
  12. use Throwable;
  13. /**
  14. * Trait InteractsWithRpcClient
  15. * @package think\swoole\concerns
  16. * @property App $app
  17. * @property App $container
  18. * @method Server getServer()
  19. * @method Pool getPools()
  20. */
  21. trait InteractsWithRpcClient
  22. {
  23. protected function prepareRpcClient()
  24. {
  25. $this->onEvent('workerStart', function () {
  26. $this->bindRpcClientPool();
  27. $this->bindRpcInterface();
  28. });
  29. }
  30. protected function bindRpcInterface()
  31. {
  32. //引入rpc接口文件
  33. if (file_exists($rpc = $this->container->getBasePath() . 'rpc.php')) {
  34. /** @noinspection PhpIncludeInspection */
  35. $rpcServices = (array) include $rpc;
  36. //绑定rpc接口
  37. try {
  38. foreach ($rpcServices as $name => $abstracts) {
  39. $parserClass = $this->getConfig("rpc.client.{$name}.parser", JsonParser::class);
  40. $parser = $this->getApplication()->make($parserClass);
  41. $gateway = new Gateway($this->createRpcConnector($name), $parser);
  42. $middleware = $this->getConfig("rpc.client.{$name}.middleware", []);
  43. foreach ($abstracts as $abstract) {
  44. $this->getApplication()
  45. ->bind($abstract, function (App $app) use ($middleware, $gateway, $name, $abstract) {
  46. return $app->invokeClass(Proxy::getClassName($name, $abstract), [$gateway, $middleware]);
  47. });
  48. }
  49. }
  50. } catch (Throwable $e) {
  51. }
  52. }
  53. }
  54. protected function bindRpcClientPool()
  55. {
  56. if (!empty($clients = $this->getConfig('rpc.client'))) {
  57. //创建client连接池
  58. foreach ($clients as $name => $config) {
  59. $pool = new ConnectionPool(
  60. Pool::pullPoolConfig($config),
  61. new Client(),
  62. $config
  63. );
  64. $this->getPools()->add("rpc.client.{$name}", $pool);
  65. }
  66. }
  67. }
  68. protected function createRpcConnector($name)
  69. {
  70. $pool = $this->getPools()->get("rpc.client.{$name}");
  71. return new class($pool) implements Connector {
  72. use InteractsWithRpcConnector;
  73. protected $pool;
  74. public function __construct(ConnectionPool $pool)
  75. {
  76. $this->pool = $pool;
  77. }
  78. protected function runWithClient($callback)
  79. {
  80. /** @var \Swoole\Coroutine\Client $client */
  81. $client = $this->pool->borrow();
  82. try {
  83. return $callback($client);
  84. } finally {
  85. $this->pool->return($client);
  86. }
  87. }
  88. };
  89. }
  90. }