SqsClient.php 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256
  1. <?php
  2. namespace Aws\Sqs;
  3. use Aws\AwsClient;
  4. use Aws\CommandInterface;
  5. use Aws\Sqs\Exception\SqsException;
  6. use GuzzleHttp\Psr7\Uri;
  7. use GuzzleHttp\Psr7\UriResolver;
  8. use Psr\Http\Message\RequestInterface;
  9. /**
  10. * Client used to interact with Amazon Simple Queue Service (Amazon SQS)
  11. *
  12. * @method \Aws\Result addPermission(array $args = [])
  13. * @method \GuzzleHttp\Promise\Promise addPermissionAsync(array $args = [])
  14. * @method \Aws\Result changeMessageVisibility(array $args = [])
  15. * @method \GuzzleHttp\Promise\Promise changeMessageVisibilityAsync(array $args = [])
  16. * @method \Aws\Result changeMessageVisibilityBatch(array $args = [])
  17. * @method \GuzzleHttp\Promise\Promise changeMessageVisibilityBatchAsync(array $args = [])
  18. * @method \Aws\Result createQueue(array $args = [])
  19. * @method \GuzzleHttp\Promise\Promise createQueueAsync(array $args = [])
  20. * @method \Aws\Result deleteMessage(array $args = [])
  21. * @method \GuzzleHttp\Promise\Promise deleteMessageAsync(array $args = [])
  22. * @method \Aws\Result deleteMessageBatch(array $args = [])
  23. * @method \GuzzleHttp\Promise\Promise deleteMessageBatchAsync(array $args = [])
  24. * @method \Aws\Result deleteQueue(array $args = [])
  25. * @method \GuzzleHttp\Promise\Promise deleteQueueAsync(array $args = [])
  26. * @method \Aws\Result getQueueAttributes(array $args = [])
  27. * @method \GuzzleHttp\Promise\Promise getQueueAttributesAsync(array $args = [])
  28. * @method \Aws\Result getQueueUrl(array $args = [])
  29. * @method \GuzzleHttp\Promise\Promise getQueueUrlAsync(array $args = [])
  30. * @method \Aws\Result listDeadLetterSourceQueues(array $args = [])
  31. * @method \GuzzleHttp\Promise\Promise listDeadLetterSourceQueuesAsync(array $args = [])
  32. * @method \Aws\Result listQueueTags(array $args = [])
  33. * @method \GuzzleHttp\Promise\Promise listQueueTagsAsync(array $args = [])
  34. * @method \Aws\Result listQueues(array $args = [])
  35. * @method \GuzzleHttp\Promise\Promise listQueuesAsync(array $args = [])
  36. * @method \Aws\Result purgeQueue(array $args = [])
  37. * @method \GuzzleHttp\Promise\Promise purgeQueueAsync(array $args = [])
  38. * @method \Aws\Result receiveMessage(array $args = [])
  39. * @method \GuzzleHttp\Promise\Promise receiveMessageAsync(array $args = [])
  40. * @method \Aws\Result removePermission(array $args = [])
  41. * @method \GuzzleHttp\Promise\Promise removePermissionAsync(array $args = [])
  42. * @method \Aws\Result sendMessage(array $args = [])
  43. * @method \GuzzleHttp\Promise\Promise sendMessageAsync(array $args = [])
  44. * @method \Aws\Result sendMessageBatch(array $args = [])
  45. * @method \GuzzleHttp\Promise\Promise sendMessageBatchAsync(array $args = [])
  46. * @method \Aws\Result setQueueAttributes(array $args = [])
  47. * @method \GuzzleHttp\Promise\Promise setQueueAttributesAsync(array $args = [])
  48. * @method \Aws\Result tagQueue(array $args = [])
  49. * @method \GuzzleHttp\Promise\Promise tagQueueAsync(array $args = [])
  50. * @method \Aws\Result untagQueue(array $args = [])
  51. * @method \GuzzleHttp\Promise\Promise untagQueueAsync(array $args = [])
  52. */
  53. class SqsClient extends AwsClient
  54. {
  55. public function __construct(array $config)
  56. {
  57. parent::__construct($config);
  58. $list = $this->getHandlerList();
  59. $list->appendBuild($this->queueUrl(), 'sqs.queue_url');
  60. $list->appendSign($this->validateMd5(), 'sqs.md5');
  61. }
  62. /**
  63. * Converts a queue URL into a queue ARN.
  64. *
  65. * @param string $queueUrl The queue URL to perform the action on.
  66. * Retrieved when the queue is first created.
  67. *
  68. * @return string An ARN representation of the queue URL.
  69. */
  70. public function getQueueArn($queueUrl)
  71. {
  72. $queueArn = strtr($queueUrl, [
  73. 'http://' => 'arn:aws:',
  74. 'https://' => 'arn:aws:',
  75. '.amazonaws.com' => '',
  76. '/' => ':',
  77. '.' => ':',
  78. ]);
  79. // Cope with SQS' .fifo / :fifo arn inconsistency
  80. if (substr($queueArn, -5) === ':fifo') {
  81. $queueArn = substr_replace($queueArn, '.fifo', -5);
  82. }
  83. return $queueArn;
  84. }
  85. /**
  86. * Moves the URI of the queue to the URI in the input parameter.
  87. *
  88. * @return callable
  89. */
  90. private function queueUrl()
  91. {
  92. return static function (callable $handler) {
  93. return function (
  94. CommandInterface $c,
  95. RequestInterface $r = null
  96. ) use ($handler) {
  97. if ($c->hasParam('QueueUrl')) {
  98. $r = $r->withUri(UriResolver::resolve(
  99. $r->getUri(),
  100. new Uri($c['QueueUrl'])
  101. ));
  102. }
  103. return $handler($c, $r);
  104. };
  105. };
  106. }
  107. /**
  108. * Calculates the expected md5 hash of message attributes according to the encoding
  109. * scheme detailed in SQS documentation.
  110. *
  111. * @param array $message Message containing attributes for validation.
  112. * Retrieved when using MessageAttributeNames on
  113. * ReceiveMessage.
  114. *
  115. * @return string|null The md5 hash of the message attributes according to
  116. * the encoding scheme. Returns null when there are no
  117. * attributes.
  118. * @link http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-message-attributes.html#message-attributes-items-validation
  119. */
  120. private static function calculateMessageAttributesMd5($message)
  121. {
  122. if (empty($message['MessageAttributes'])
  123. || !is_array($message['MessageAttributes'])
  124. ) {
  125. return null;
  126. }
  127. ksort($message['MessageAttributes']);
  128. $attributeValues = "";
  129. foreach ($message['MessageAttributes'] as $name => $details) {
  130. $attributeValues .= self::getEncodedStringPiece($name);
  131. $attributeValues .= self::getEncodedStringPiece($details['DataType']);
  132. if (substr($details['DataType'], 0, 6) === 'Binary') {
  133. $attributeValues .= pack('c', 0x02);
  134. $attributeValues .= self::getEncodedBinaryPiece(
  135. $details['BinaryValue']
  136. );
  137. } else {
  138. $attributeValues .= pack('c', 0x01);
  139. $attributeValues .= self::getEncodedStringPiece(
  140. $details['StringValue']
  141. );
  142. }
  143. }
  144. return md5($attributeValues);
  145. }
  146. private static function calculateBodyMd5($message)
  147. {
  148. return md5($message['Body']);
  149. }
  150. private static function getEncodedStringPiece($piece)
  151. {
  152. $utf8Piece = iconv(
  153. mb_detect_encoding($piece, mb_detect_order(), true),
  154. "UTF-8",
  155. $piece
  156. );
  157. return self::getFourBytePieceLength($utf8Piece) . $utf8Piece;
  158. }
  159. private static function getEncodedBinaryPiece($piece)
  160. {
  161. return self::getFourBytePieceLength($piece) . $piece;
  162. }
  163. private static function getFourBytePieceLength($piece)
  164. {
  165. return pack('N', (int)strlen($piece));
  166. }
  167. /**
  168. * Validates ReceiveMessage body and message attribute MD5s.
  169. *
  170. * @return callable
  171. */
  172. private function validateMd5()
  173. {
  174. return static function (callable $handler) {
  175. return function (
  176. CommandInterface $c,
  177. RequestInterface $r = null
  178. ) use ($handler) {
  179. if ($c->getName() !== 'ReceiveMessage') {
  180. return $handler($c, $r);
  181. }
  182. return $handler($c, $r)
  183. ->then(
  184. function ($result) use ($c, $r) {
  185. foreach ((array) $result['Messages'] as $msg) {
  186. $bodyMd5 = self::calculateBodyMd5($msg);
  187. if (isset($msg['MD5OfBody'])
  188. && $bodyMd5 !== $msg['MD5OfBody']
  189. ) {
  190. throw new SqsException(
  191. sprintf(
  192. 'MD5 mismatch. Expected %s, found %s',
  193. $msg['MD5OfBody'],
  194. $bodyMd5
  195. ),
  196. $c,
  197. [
  198. 'code' => 'ClientChecksumMismatch',
  199. 'request' => $r
  200. ]
  201. );
  202. }
  203. if (isset($msg['MD5OfMessageAttributes'])) {
  204. $messageAttributesMd5 = self::calculateMessageAttributesMd5($msg);
  205. if ($messageAttributesMd5 !== $msg['MD5OfMessageAttributes']) {
  206. throw new SqsException(
  207. sprintf(
  208. 'Attribute MD5 mismatch. Expected %s, found %s',
  209. $msg['MD5OfMessageAttributes'],
  210. $messageAttributesMd5
  211. ? $messageAttributesMd5
  212. : 'No Attributes'
  213. ),
  214. $c,
  215. [
  216. 'code' => 'ClientChecksumMismatch',
  217. 'request' => $r
  218. ]
  219. );
  220. }
  221. } else if (isset($msg['MessageAttributes'])) {
  222. throw new SqsException(
  223. sprintf(
  224. 'No Attribute MD5 found. Expected %s',
  225. self::calculateMessageAttributesMd5($msg)
  226. ),
  227. $c,
  228. [
  229. 'code' => 'ClientChecksumMismatch',
  230. 'request' => $r
  231. ]
  232. );
  233. }
  234. }
  235. return $result;
  236. }
  237. );
  238. };
  239. };
  240. }
  241. }