DecodingEventStreamIterator.php 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335
  1. <?php
  2. namespace Aws\Api\Parser;
  3. use \Iterator;
  4. use Aws\Api\DateTimeResult;
  5. use GuzzleHttp\Psr7;
  6. use Psr\Http\Message\StreamInterface;
  7. use Aws\Api\Parser\Exception\ParserException;
  8. /**
  9. * @internal Implements a decoder for a binary encoded event stream that will
  10. * decode, validate, and provide individual events from the stream.
  11. */
  12. class DecodingEventStreamIterator implements Iterator
  13. {
  14. const HEADERS = 'headers';
  15. const PAYLOAD = 'payload';
  16. const LENGTH_TOTAL = 'total_length';
  17. const LENGTH_HEADERS = 'headers_length';
  18. const CRC_PRELUDE = 'prelude_crc';
  19. const BYTES_PRELUDE = 12;
  20. const BYTES_TRAILING = 4;
  21. private static $preludeFormat = [
  22. self::LENGTH_TOTAL => 'decodeUint32',
  23. self::LENGTH_HEADERS => 'decodeUint32',
  24. self::CRC_PRELUDE => 'decodeUint32',
  25. ];
  26. private static $lengthFormatMap = [
  27. 1 => 'decodeUint8',
  28. 2 => 'decodeUint16',
  29. 4 => 'decodeUint32',
  30. 8 => 'decodeUint64',
  31. ];
  32. private static $headerTypeMap = [
  33. 0 => 'decodeBooleanTrue',
  34. 1 => 'decodeBooleanFalse',
  35. 2 => 'decodeInt8',
  36. 3 => 'decodeInt16',
  37. 4 => 'decodeInt32',
  38. 5 => 'decodeInt64',
  39. 6 => 'decodeBytes',
  40. 7 => 'decodeString',
  41. 8 => 'decodeTimestamp',
  42. 9 => 'decodeUuid',
  43. ];
  44. /** @var StreamInterface Stream of eventstream shape to parse. */
  45. private $stream;
  46. /** @var array Currently parsed event. */
  47. private $currentEvent;
  48. /** @var int Current in-order event key. */
  49. private $key;
  50. /** @var resource|\HashContext CRC32 hash context for event validation */
  51. private $hashContext;
  52. /** @var int $currentPosition */
  53. private $currentPosition;
  54. /**
  55. * DecodingEventStreamIterator constructor.
  56. *
  57. * @param StreamInterface $stream
  58. */
  59. public function __construct(StreamInterface $stream)
  60. {
  61. $this->stream = $stream;
  62. $this->rewind();
  63. }
  64. private function parseHeaders($headerBytes)
  65. {
  66. $headers = [];
  67. $bytesRead = 0;
  68. while ($bytesRead < $headerBytes) {
  69. list($key, $numBytes) = $this->decodeString(1);
  70. $bytesRead += $numBytes;
  71. list($type, $numBytes) = $this->decodeUint8();
  72. $bytesRead += $numBytes;
  73. $f = self::$headerTypeMap[$type];
  74. list($value, $numBytes) = $this->{$f}();
  75. $bytesRead += $numBytes;
  76. if (isset($headers[$key])) {
  77. throw new ParserException('Duplicate key in event headers.');
  78. }
  79. $headers[$key] = $value;
  80. }
  81. return [$headers, $bytesRead];
  82. }
  83. private function parsePrelude()
  84. {
  85. $prelude = [];
  86. $bytesRead = 0;
  87. $calculatedCrc = null;
  88. foreach (self::$preludeFormat as $key => $decodeFunction) {
  89. if ($key === self::CRC_PRELUDE) {
  90. $hashCopy = hash_copy($this->hashContext);
  91. $calculatedCrc = hash_final($this->hashContext, true);
  92. $this->hashContext = $hashCopy;
  93. }
  94. list($value, $numBytes) = $this->{$decodeFunction}();
  95. $bytesRead += $numBytes;
  96. $prelude[$key] = $value;
  97. }
  98. if (unpack('N', $calculatedCrc)[1] !== $prelude[self::CRC_PRELUDE]) {
  99. throw new ParserException('Prelude checksum mismatch.');
  100. }
  101. return [$prelude, $bytesRead];
  102. }
  103. private function parseEvent()
  104. {
  105. $event = [];
  106. if ($this->stream->tell() < $this->stream->getSize()) {
  107. $this->hashContext = hash_init('crc32b');
  108. $bytesLeft = $this->stream->getSize() - $this->stream->tell();
  109. list($prelude, $numBytes) = $this->parsePrelude();
  110. if ($prelude[self::LENGTH_TOTAL] > $bytesLeft) {
  111. throw new ParserException('Message length too long.');
  112. }
  113. $bytesLeft -= $numBytes;
  114. if ($prelude[self::LENGTH_HEADERS] > $bytesLeft) {
  115. throw new ParserException('Headers length too long.');
  116. }
  117. list(
  118. $event[self::HEADERS],
  119. $numBytes
  120. ) = $this->parseHeaders($prelude[self::LENGTH_HEADERS]);
  121. $event[self::PAYLOAD] = Psr7\stream_for(
  122. $this->readAndHashBytes(
  123. $prelude[self::LENGTH_TOTAL] - self::BYTES_PRELUDE
  124. - $numBytes - self::BYTES_TRAILING
  125. )
  126. );
  127. $calculatedCrc = hash_final($this->hashContext, true);
  128. $messageCrc = $this->stream->read(4);
  129. if ($calculatedCrc !== $messageCrc) {
  130. throw new ParserException('Message checksum mismatch.');
  131. }
  132. }
  133. return $event;
  134. }
  135. // Iterator Functionality
  136. /**
  137. * @return array
  138. */
  139. public function current()
  140. {
  141. return $this->currentEvent;
  142. }
  143. /**
  144. * @return int
  145. */
  146. public function key()
  147. {
  148. return $this->key;
  149. }
  150. public function next()
  151. {
  152. $this->currentPosition = $this->stream->tell();
  153. if ($this->valid()) {
  154. $this->key++;
  155. $this->currentEvent = $this->parseEvent();
  156. }
  157. }
  158. public function rewind()
  159. {
  160. $this->stream->rewind();
  161. $this->key = 0;
  162. $this->currentPosition = 0;
  163. $this->currentEvent = $this->parseEvent();
  164. }
  165. /**
  166. * @return bool
  167. */
  168. public function valid()
  169. {
  170. return $this->currentPosition < $this->stream->getSize();
  171. }
  172. // Decoding Utilities
  173. private function readAndHashBytes($num)
  174. {
  175. $bytes = $this->stream->read($num);
  176. hash_update($this->hashContext, $bytes);
  177. return $bytes;
  178. }
  179. private function decodeBooleanTrue()
  180. {
  181. return [true, 0];
  182. }
  183. private function decodeBooleanFalse()
  184. {
  185. return [false, 0];
  186. }
  187. private function uintToInt($val, $size)
  188. {
  189. $signedCap = pow(2, $size - 1);
  190. if ($val > $signedCap) {
  191. $val -= (2 * $signedCap);
  192. }
  193. return $val;
  194. }
  195. private function decodeInt8()
  196. {
  197. $val = (int)unpack('C', $this->readAndHashBytes(1))[1];
  198. return [$this->uintToInt($val, 8), 1];
  199. }
  200. private function decodeUint8()
  201. {
  202. return [unpack('C', $this->readAndHashBytes(1))[1], 1];
  203. }
  204. private function decodeInt16()
  205. {
  206. $val = (int)unpack('n', $this->readAndHashBytes(2))[1];
  207. return [$this->uintToInt($val, 16), 2];
  208. }
  209. private function decodeUint16()
  210. {
  211. return [unpack('n', $this->readAndHashBytes(2))[1], 2];
  212. }
  213. private function decodeInt32()
  214. {
  215. $val = (int)unpack('N', $this->readAndHashBytes(4))[1];
  216. return [$this->uintToInt($val, 32), 4];
  217. }
  218. private function decodeUint32()
  219. {
  220. return [unpack('N', $this->readAndHashBytes(4))[1], 4];
  221. }
  222. private function decodeInt64()
  223. {
  224. $val = $this->unpackInt64($this->readAndHashBytes(8))[1];
  225. return [$this->uintToInt($val, 64), 8];
  226. }
  227. private function decodeUint64()
  228. {
  229. return [$this->unpackInt64($this->readAndHashBytes(8))[1], 8];
  230. }
  231. private function unpackInt64($bytes)
  232. {
  233. if (version_compare(PHP_VERSION, '5.6.3', '<')) {
  234. $d = unpack('N2', $bytes);
  235. return [1 => $d[1] << 32 | $d[2]];
  236. }
  237. return unpack('J', $bytes);
  238. }
  239. private function decodeBytes($lengthBytes=2)
  240. {
  241. if (!isset(self::$lengthFormatMap[$lengthBytes])) {
  242. throw new ParserException('Undefined variable length format.');
  243. }
  244. $f = self::$lengthFormatMap[$lengthBytes];
  245. list($len, $bytes) = $this->{$f}();
  246. return [$this->readAndHashBytes($len), $len + $bytes];
  247. }
  248. private function decodeString($lengthBytes=2)
  249. {
  250. if (!isset(self::$lengthFormatMap[$lengthBytes])) {
  251. throw new ParserException('Undefined variable length format.');
  252. }
  253. $f = self::$lengthFormatMap[$lengthBytes];
  254. list($len, $bytes) = $this->{$f}();
  255. return [$this->readAndHashBytes($len), $len + $bytes];
  256. }
  257. private function decodeTimestamp()
  258. {
  259. list($val, $bytes) = $this->decodeInt64();
  260. return [
  261. DateTimeResult::createFromFormat('U.u', $val / 1000),
  262. $bytes
  263. ];
  264. }
  265. private function decodeUuid()
  266. {
  267. $val = unpack('H32', $this->readAndHashBytes(16))[1];
  268. return [
  269. substr($val, 0, 8) . '-'
  270. . substr($val, 8, 4) . '-'
  271. . substr($val, 12, 4) . '-'
  272. . substr($val, 16, 4) . '-'
  273. . substr($val, 20, 12),
  274. 16
  275. ];
  276. }
  277. }