EventParsingIterator.php 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. <?php
  2. namespace Aws\Api\Parser;
  3. use \Iterator;
  4. use Aws\Exception\EventStreamDataException;
  5. use Aws\Api\Parser\Exception\ParserException;
  6. use Aws\Api\StructureShape;
  7. use Psr\Http\Message\StreamInterface;
  8. /**
  9. * @internal Implements a decoder for a binary encoded event stream that will
  10. * decode, validate, and provide individual events from the stream.
  11. */
  12. class EventParsingIterator implements Iterator
  13. {
  14. /** @var StreamInterface */
  15. private $decodingIterator;
  16. /** @var StructureShape */
  17. private $shape;
  18. /** @var AbstractParser */
  19. private $parser;
  20. public function __construct(
  21. StreamInterface $stream,
  22. StructureShape $shape,
  23. AbstractParser $parser
  24. ) {
  25. $this->decodingIterator = new DecodingEventStreamIterator($stream);
  26. $this->shape = $shape;
  27. $this->parser = $parser;
  28. }
  29. public function current()
  30. {
  31. return $this->parseEvent($this->decodingIterator->current());
  32. }
  33. public function key()
  34. {
  35. return $this->decodingIterator->key();
  36. }
  37. public function next()
  38. {
  39. $this->decodingIterator->next();
  40. }
  41. public function rewind()
  42. {
  43. $this->decodingIterator->rewind();
  44. }
  45. public function valid()
  46. {
  47. return $this->decodingIterator->valid();
  48. }
  49. private function parseEvent(array $event)
  50. {
  51. if (!empty($event['headers'][':message-type'])) {
  52. if ($event['headers'][':message-type'] === 'error') {
  53. return $this->parseError($event);
  54. }
  55. if ($event['headers'][':message-type'] !== 'event') {
  56. throw new ParserException('Failed to parse unknown message type.');
  57. }
  58. }
  59. if (empty($event['headers'][':event-type'])) {
  60. throw new ParserException('Failed to parse without event type.');
  61. }
  62. $eventShape = $this->shape->getMember($event['headers'][':event-type']);
  63. $parsedEvent = [];
  64. foreach ($eventShape['members'] as $shape => $details) {
  65. if (!empty($details['eventpayload'])) {
  66. $payloadShape = $eventShape->getMember($shape);
  67. if ($payloadShape['type'] === 'blob') {
  68. $parsedEvent[$shape] = $event['payload'];
  69. } else {
  70. $parsedEvent[$shape] = $this->parser->parseMemberFromStream(
  71. $event['payload'],
  72. $payloadShape,
  73. null
  74. );
  75. }
  76. } else {
  77. $parsedEvent[$shape] = $event['headers'][$shape];
  78. }
  79. }
  80. return [
  81. $event['headers'][':event-type'] => $parsedEvent
  82. ];
  83. }
  84. private function parseError(array $event)
  85. {
  86. throw new EventStreamDataException(
  87. $event['headers'][':error-code'],
  88. $event['headers'][':error-message']
  89. );
  90. }
  91. }