Transfer.php 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432
  1. <?php
  2. namespace Aws\S3;
  3. use Aws;
  4. use Aws\CommandInterface;
  5. use Aws\Exception\AwsException;
  6. use GuzzleHttp\Promise;
  7. use GuzzleHttp\Promise\PromisorInterface;
  8. use Iterator;
  9. /**
  10. * Transfers files from the local filesystem to S3 or from S3 to the local
  11. * filesystem.
  12. *
  13. * This class does not support copying from the local filesystem to somewhere
  14. * else on the local filesystem or from one S3 bucket to another.
  15. */
  16. class Transfer implements PromisorInterface
  17. {
  18. private $client;
  19. private $promise;
  20. private $source;
  21. private $sourceMetadata;
  22. private $destination;
  23. private $concurrency;
  24. private $mupThreshold;
  25. private $before;
  26. private $s3Args = [];
  27. /**
  28. * When providing the $source argument, you may provide a string referencing
  29. * the path to a directory on disk to upload, an s3 scheme URI that contains
  30. * the bucket and key (e.g., "s3://bucket/key"), or an \Iterator object
  31. * that yields strings containing filenames that are the path to a file on
  32. * disk or an s3 scheme URI. The bucket portion of the s3 URI may be an S3
  33. * access point ARN. The "/key" portion of an s3 URI is optional.
  34. *
  35. * When providing an iterator for the $source argument, you must also
  36. * provide a 'base_dir' key value pair in the $options argument.
  37. *
  38. * The $dest argument can be the path to a directory on disk or an s3
  39. * scheme URI (e.g., "s3://bucket/key").
  40. *
  41. * The options array can contain the following key value pairs:
  42. *
  43. * - base_dir: (string) Base directory of the source, if $source is an
  44. * iterator. If the $source option is not an array, then this option is
  45. * ignored.
  46. * - before: (callable) A callback to invoke before each transfer. The
  47. * callback accepts a single argument: Aws\CommandInterface $command.
  48. * The provided command will be either a GetObject, PutObject,
  49. * InitiateMultipartUpload, or UploadPart command.
  50. * - mup_threshold: (int) Size in bytes in which a multipart upload should
  51. * be used instead of PutObject. Defaults to 20971520 (20 MB).
  52. * - concurrency: (int, default=5) Number of files to upload concurrently.
  53. * The ideal concurrency value will vary based on the number of files
  54. * being uploaded and the average size of each file. Generally speaking,
  55. * smaller files benefit from a higher concurrency while larger files
  56. * will not.
  57. * - debug: (bool) Set to true to print out debug information for
  58. * transfers. Set to an fopen() resource to write to a specific stream
  59. * rather than writing to STDOUT.
  60. *
  61. * @param S3ClientInterface $client Client used for transfers.
  62. * @param string|Iterator $source Where the files are transferred from.
  63. * @param string $dest Where the files are transferred to.
  64. * @param array $options Hash of options.
  65. */
  66. public function __construct(
  67. S3ClientInterface $client,
  68. $source,
  69. $dest,
  70. array $options = []
  71. ) {
  72. $this->client = $client;
  73. // Prepare the destination.
  74. $this->destination = $this->prepareTarget($dest);
  75. if ($this->destination['scheme'] === 's3') {
  76. $this->s3Args = $this->getS3Args($this->destination['path']);
  77. }
  78. // Prepare the source.
  79. if (is_string($source)) {
  80. $this->sourceMetadata = $this->prepareTarget($source);
  81. $this->source = $source;
  82. } elseif ($source instanceof Iterator) {
  83. if (empty($options['base_dir'])) {
  84. throw new \InvalidArgumentException('You must provide the source'
  85. . ' argument as a string or provide the "base_dir" option.');
  86. }
  87. $this->sourceMetadata = $this->prepareTarget($options['base_dir']);
  88. $this->source = $source;
  89. } else {
  90. throw new \InvalidArgumentException('source must be the path to a '
  91. . 'directory or an iterator that yields file names.');
  92. }
  93. // Validate schemes.
  94. if ($this->sourceMetadata['scheme'] === $this->destination['scheme']) {
  95. throw new \InvalidArgumentException("You cannot copy from"
  96. . " {$this->sourceMetadata['scheme']} to"
  97. . " {$this->destination['scheme']}."
  98. );
  99. }
  100. // Handle multipart-related options.
  101. $this->concurrency = isset($options['concurrency'])
  102. ? $options['concurrency']
  103. : MultipartUploader::DEFAULT_CONCURRENCY;
  104. $this->mupThreshold = isset($options['mup_threshold'])
  105. ? $options['mup_threshold']
  106. : 16777216;
  107. if ($this->mupThreshold < MultipartUploader::PART_MIN_SIZE) {
  108. throw new \InvalidArgumentException('mup_threshold must be >= 5MB');
  109. }
  110. // Handle "before" callback option.
  111. if (isset($options['before'])) {
  112. $this->before = $options['before'];
  113. if (!is_callable($this->before)) {
  114. throw new \InvalidArgumentException('before must be a callable.');
  115. }
  116. }
  117. // Handle "debug" option.
  118. if (isset($options['debug'])) {
  119. if ($options['debug'] === true) {
  120. $options['debug'] = fopen('php://output', 'w');
  121. }
  122. if (is_resource($options['debug'])) {
  123. $this->addDebugToBefore($options['debug']);
  124. }
  125. }
  126. }
  127. /**
  128. * Transfers the files.
  129. */
  130. public function promise()
  131. {
  132. // If the promise has been created, just return it.
  133. if (!$this->promise) {
  134. // Create an upload/download promise for the transfer.
  135. $this->promise = $this->sourceMetadata['scheme'] === 'file'
  136. ? $this->createUploadPromise()
  137. : $this->createDownloadPromise();
  138. }
  139. return $this->promise;
  140. }
  141. /**
  142. * Transfers the files synchronously.
  143. */
  144. public function transfer()
  145. {
  146. $this->promise()->wait();
  147. }
  148. private function prepareTarget($targetPath)
  149. {
  150. $target = [
  151. 'path' => $this->normalizePath($targetPath),
  152. 'scheme' => $this->determineScheme($targetPath),
  153. ];
  154. if ($target['scheme'] !== 's3' && $target['scheme'] !== 'file') {
  155. throw new \InvalidArgumentException('Scheme must be "s3" or "file".');
  156. }
  157. return $target;
  158. }
  159. /**
  160. * Creates an array that contains Bucket and Key by parsing the filename.
  161. *
  162. * @param string $path Path to parse.
  163. *
  164. * @return array
  165. */
  166. private function getS3Args($path)
  167. {
  168. $parts = explode('/', str_replace('s3://', '', $path), 2);
  169. $args = ['Bucket' => $parts[0]];
  170. if (isset($parts[1])) {
  171. $args['Key'] = $parts[1];
  172. }
  173. return $args;
  174. }
  175. /**
  176. * Parses the scheme from a filename.
  177. *
  178. * @param string $path Path to parse.
  179. *
  180. * @return string
  181. */
  182. private function determineScheme($path)
  183. {
  184. return !strpos($path, '://') ? 'file' : explode('://', $path)[0];
  185. }
  186. /**
  187. * Normalize a path so that it has UNIX-style directory separators and no trailing /
  188. *
  189. * @param string $path
  190. *
  191. * @return string
  192. */
  193. private function normalizePath($path)
  194. {
  195. return rtrim(str_replace('\\', '/', $path), '/');
  196. }
  197. private function resolveUri($uri)
  198. {
  199. $resolved = [];
  200. $sections = explode('/', $uri);
  201. foreach ($sections as $section) {
  202. if ($section === '.' || $section === '') {
  203. continue;
  204. }
  205. if ($section === '..') {
  206. array_pop($resolved);
  207. } else {
  208. $resolved []= $section;
  209. }
  210. }
  211. return ($uri[0] === '/' ? '/' : '')
  212. . implode('/', $resolved);
  213. }
  214. private function createDownloadPromise()
  215. {
  216. $parts = $this->getS3Args($this->sourceMetadata['path']);
  217. $prefix = "s3://{$parts['Bucket']}/"
  218. . (isset($parts['Key']) ? $parts['Key'] . '/' : '');
  219. $commands = [];
  220. foreach ($this->getDownloadsIterator() as $object) {
  221. // Prepare the sink.
  222. $objectKey = preg_replace('/^' . preg_quote($prefix, '/') . '/', '', $object);
  223. $resolveSink = $this->destination['path'] . '/';
  224. if (isset($parts['Key']) && strpos($objectKey, $parts['Key']) !== 0) {
  225. $resolveSink .= $parts['Key'] . '/';
  226. }
  227. $resolveSink .= $objectKey;
  228. $sink = $this->destination['path'] . '/' . $objectKey;
  229. $command = $this->client->getCommand(
  230. 'GetObject',
  231. $this->getS3Args($object) + ['@http' => ['sink' => $sink]]
  232. );
  233. if (strpos(
  234. $this->resolveUri($resolveSink),
  235. $this->destination['path']
  236. ) !== 0
  237. ) {
  238. throw new AwsException(
  239. 'Cannot download key ' . $objectKey
  240. . ', its relative path resolves outside the'
  241. . ' parent directory', $command);
  242. }
  243. // Create the directory if needed.
  244. $dir = dirname($sink);
  245. if (!is_dir($dir) && !mkdir($dir, 0777, true)) {
  246. throw new \RuntimeException("Could not create dir: {$dir}");
  247. }
  248. // Create the command.
  249. $commands []= $command;
  250. }
  251. // Create a GetObject command pool and return the promise.
  252. return (new Aws\CommandPool($this->client, $commands, [
  253. 'concurrency' => $this->concurrency,
  254. 'before' => $this->before,
  255. 'rejected' => function ($reason, $idx, Promise\PromiseInterface $p) {
  256. $p->reject($reason);
  257. }
  258. ]))->promise();
  259. }
  260. private function createUploadPromise()
  261. {
  262. // Map each file into a promise that performs the actual transfer.
  263. $files = \Aws\map($this->getUploadsIterator(), function ($file) {
  264. return (filesize($file) >= $this->mupThreshold)
  265. ? $this->uploadMultipart($file)
  266. : $this->upload($file);
  267. });
  268. // Create an EachPromise, that will concurrently handle the upload
  269. // operations' yielded promises from the iterator.
  270. return Promise\each_limit_all($files, $this->concurrency);
  271. }
  272. /** @return Iterator */
  273. private function getUploadsIterator()
  274. {
  275. if (is_string($this->source)) {
  276. return Aws\filter(
  277. Aws\recursive_dir_iterator($this->sourceMetadata['path']),
  278. function ($file) { return !is_dir($file); }
  279. );
  280. }
  281. return $this->source;
  282. }
  283. /** @return Iterator */
  284. private function getDownloadsIterator()
  285. {
  286. if (is_string($this->source)) {
  287. $listArgs = $this->getS3Args($this->sourceMetadata['path']);
  288. if (isset($listArgs['Key'])) {
  289. $listArgs['Prefix'] = $listArgs['Key'] . '/';
  290. unset($listArgs['Key']);
  291. }
  292. $files = $this->client
  293. ->getPaginator('ListObjects', $listArgs)
  294. ->search('Contents[].Key');
  295. $files = Aws\map($files, function ($key) use ($listArgs) {
  296. return "s3://{$listArgs['Bucket']}/$key";
  297. });
  298. return Aws\filter($files, function ($key) {
  299. return substr($key, -1, 1) !== '/';
  300. });
  301. }
  302. return $this->source;
  303. }
  304. private function upload($filename)
  305. {
  306. $args = $this->s3Args;
  307. $args['SourceFile'] = $filename;
  308. $args['Key'] = $this->createS3Key($filename);
  309. $command = $this->client->getCommand('PutObject', $args);
  310. $this->before and call_user_func($this->before, $command);
  311. return $this->client->executeAsync($command);
  312. }
  313. private function uploadMultipart($filename)
  314. {
  315. $args = $this->s3Args;
  316. $args['Key'] = $this->createS3Key($filename);
  317. $filename = $filename instanceof \SplFileInfo ? $filename->getPathname() : $filename;
  318. return (new MultipartUploader($this->client, $filename, [
  319. 'bucket' => $args['Bucket'],
  320. 'key' => $args['Key'],
  321. 'before_initiate' => $this->before,
  322. 'before_upload' => $this->before,
  323. 'before_complete' => $this->before,
  324. 'concurrency' => $this->concurrency,
  325. ]))->promise();
  326. }
  327. private function createS3Key($filename)
  328. {
  329. $filename = $this->normalizePath($filename);
  330. $relative_file_path = ltrim(
  331. preg_replace('#^' . preg_quote($this->sourceMetadata['path']) . '#', '', $filename),
  332. '/\\'
  333. );
  334. if (isset($this->s3Args['Key'])) {
  335. return rtrim($this->s3Args['Key'], '/').'/'.$relative_file_path;
  336. }
  337. return $relative_file_path;
  338. }
  339. private function addDebugToBefore($debug)
  340. {
  341. $before = $this->before;
  342. $sourcePath = $this->sourceMetadata['path'];
  343. $s3Args = $this->s3Args;
  344. $this->before = static function (
  345. CommandInterface $command
  346. ) use ($before, $debug, $sourcePath, $s3Args) {
  347. // Call the composed before function.
  348. $before and $before($command);
  349. // Determine the source and dest values based on operation.
  350. switch ($operation = $command->getName()) {
  351. case 'GetObject':
  352. $source = "s3://{$command['Bucket']}/{$command['Key']}";
  353. $dest = $command['@http']['sink'];
  354. break;
  355. case 'PutObject':
  356. $source = $command['SourceFile'];
  357. $dest = "s3://{$command['Bucket']}/{$command['Key']}";
  358. break;
  359. case 'UploadPart':
  360. $part = $command['PartNumber'];
  361. case 'CreateMultipartUpload':
  362. case 'CompleteMultipartUpload':
  363. $sourceKey = $command['Key'];
  364. if (isset($s3Args['Key']) && strpos($sourceKey, $s3Args['Key']) === 0) {
  365. $sourceKey = substr($sourceKey, strlen($s3Args['Key']) + 1);
  366. }
  367. $source = "{$sourcePath}/{$sourceKey}";
  368. $dest = "s3://{$command['Bucket']}/{$command['Key']}";
  369. break;
  370. default:
  371. throw new \UnexpectedValueException(
  372. "Transfer encountered an unexpected operation: {$operation}."
  373. );
  374. }
  375. // Print the debugging message.
  376. $context = sprintf('%s -> %s (%s)', $source, $dest, $operation);
  377. if (isset($part)) {
  378. $context .= " : Part={$part}";
  379. }
  380. fwrite($debug, "Transferring {$context}\n");
  381. };
  382. }
  383. }