Proxy.php 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. <?php
  2. namespace think\swoole\pool;
  3. use Closure;
  4. use RuntimeException;
  5. use Smf\ConnectionPool\ConnectionPool;
  6. use Smf\ConnectionPool\Connectors\ConnectorInterface;
  7. use Swoole\Coroutine;
  8. use Swoole\Event;
  9. use think\swoole\coroutine\Context;
  10. use think\swoole\Pool;
  /**
   * Base class for objects that forward method calls to a connection borrowed
   * from a connection pool.
   *
   * A connection is borrowed on first use via Context::rememberData() and is
   * handed back to the pool either explicitly through release() or by a
   * Coroutine::defer() callback registered at borrow time.
   */
  abstract class Proxy
  {
      /** Name of the dynamic property set on a borrowed connection to record whether it was handed back. */
      const KEY_RELEASED = '__released';

      /** @var ConnectionPool Pool the proxied connections are borrowed from. */
      protected $pool;

      /**
       * Proxy constructor.
       *
       * Builds the pool around an anonymous connector that delegates
       * connection creation to $creator, then initialises the pool.
       *
       * @param Closure $creator Factory invoked without arguments to create one connection.
       * @param array   $config  Configuration passed through Pool::pullPoolConfig()
       *                         to obtain the pool settings.
       */
      public function __construct($creator, $config)
      {
          $connector = new class($creator) implements ConnectorInterface {
              /** @var Closure Factory that creates a new connection. */
              protected $creator;

              public function __construct($creator)
              {
                  $this->creator = $creator;
              }

              /**
               * Create a connection by invoking the factory; $config is not used.
               */
              public function connect(array $config)
              {
                  return ($this->creator)();
              }

              /**
               * Nothing is closed explicitly here; a garbage-collection cycle
               * is scheduled instead.
               */
              public function disconnect($connection)
              {
                  // Force memory reclamation to complete the connection release.
                  Event::defer(function () {
                      Coroutine::create('gc_collect_cycles');
                  });
              }

              /**
               * Always reports the connection as connected.
               */
              public function isConnected($connection): bool
              {
                  return true;
              }

              /**
               * Intentionally a no-op.
               */
              public function reset($connection, array $config)
              {
              }

              /**
               * Always reports the connection as valid.
               */
              public function validate($connection): bool
              {
                  return true;
              }
          };

          $this->pool = new ConnectionPool(
              Pool::pullPoolConfig($config),
              $connector,
              []
          );

          $this->pool->init();
      }

      /**
       * Get the connection associated with this proxy, borrowing one from the
       * pool if Context::rememberData() has nothing stored under this proxy's key.
       *
       * NOTE(review): Context::rememberData() presumably caches per coroutine,
       * so each coroutine gets its own connection - confirm against Context.
       *
       * @return mixed The borrowed connection, flagged through KEY_RELEASED.
       */
      protected function getPoolConnection()
      {
          $key = 'connection.' . spl_object_id($this);

          return Context::rememberData($key, function () {
              $connection = $this->pool->borrow();

              $connection->{static::KEY_RELEASED} = false;

              // Hand the connection back automatically when the coroutine exits,
              // unless release() already did so.
              Coroutine::defer(function () use ($connection) {
                  $this->releaseConnection($connection);
              });

              return $connection;
          });
      }

      /**
       * Hand a borrowed connection back to the pool exactly once.
       *
       * The flag is checked and set here so that the manual release() path and
       * the deferred automatic path cannot both return the same connection.
       *
       * @param mixed $connection Connection previously obtained from the pool.
       */
      private function releaseConnection($connection)
      {
          if ($connection->{static::KEY_RELEASED}) {
              return;
          }

          $connection->{static::KEY_RELEASED} = true;
          $this->pool->return($connection);
      }

      /**
       * Manually hand this proxy's connection back to the pool.
       *
       * Calling it again is a no-op, and later proxied calls on the same
       * connection throw from __call().
       *
       * NOTE(review): if nothing has been borrowed yet, getPoolConnection()
       * borrows a connection first and it is then returned immediately;
       * this matches the original behaviour.
       */
      public function release()
      {
          $this->releaseConnection($this->getPoolConnection());
      }

      /**
       * Forward an undefined method call to the pooled connection.
       *
       * @param string $method    Method name to invoke on the connection.
       * @param array  $arguments Arguments to pass to that method.
       * @return mixed Whatever the connection's method returns.
       * @throws RuntimeException If the connection has already been released.
       */
      public function __call($method, $arguments)
      {
          $connection = $this->getPoolConnection();

          if ($connection->{static::KEY_RELEASED}) {
              throw new RuntimeException('Connection already has been released!');
          }

          return $connection->{$method}(...$arguments);
      }
  }