api-put-object-streaming.go

/*
 * MinIO Go Library for Amazon S3 Compatible Cloud Storage
 * Copyright 2017 MinIO, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package minio

import (
	"bytes"
	"context"
	"encoding/base64"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"sort"
	"strings"

	"github.com/google/uuid"
	"github.com/minio/minio-go/v7/pkg/s3utils"
)
// putObjectMultipartStream - upload a large object using
// multipart upload and streaming signature for signing payload.
// Comprehensive put object operation involving multipart uploads.
//
// The following code handles these types of readers:
//
//   - *minio.Object
//   - Any reader which has a method 'ReadAt()'
func (c Client) putObjectMultipartStream(ctx context.Context, bucketName, objectName string,
	reader io.Reader, size int64, opts PutObjectOptions) (info UploadInfo, err error) {
	if !isObject(reader) && isReadAt(reader) && !opts.SendContentMd5 {
		// If the reader implements ReadAt and is not a *minio.Object, use the parallel uploader.
		info, err = c.putObjectMultipartStreamFromReadAt(ctx, bucketName, objectName, reader.(io.ReaderAt), size, opts)
	} else {
		info, err = c.putObjectMultipartStreamOptionalChecksum(ctx, bucketName, objectName, reader, size, opts)
	}
	if err != nil {
		errResp := ToErrorResponse(err)
		// If multipart functionality is not available, fall back to a
		// single PutObject operation.
		if errResp.Code == "AccessDenied" && strings.Contains(errResp.Message, "Access Denied") {
			// Verify if the size of the reader is greater than '5GiB'.
			if size > maxSinglePutObjectSize {
				return UploadInfo{}, errEntityTooLarge(size, maxSinglePutObjectSize, bucketName, objectName)
			}
			// Fall back to uploading as a single PutObject operation.
			return c.putObject(ctx, bucketName, objectName, reader, size, opts)
		}
	}
	return info, err
}
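
// A minimal usage sketch (editorial; names and sizes are hypothetical, and
// callers reach this path indirectly): large uploads are routed here by
// Client.PutObject, e.g.
//
//	info, err := client.PutObject(ctx, "example-bucket", "example-object",
//		file, fileSize, minio.PutObjectOptions{NumThreads: 4})
//	if err != nil {
//		log.Fatalln(err)
//	}
//	fmt.Println("uploaded", info.Key, "with etag", info.ETag)
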
// uploadedPartRes - the response received from a part upload.
type uploadedPartRes struct {
	Error   error // Any error encountered while uploading the part.
	PartNum int   // Number of the part uploaded.
	Size    int64 // Size of the part uploaded.
	Part    ObjectPart
}

type uploadPartReq struct {
	PartNum int        // Number of the part to be uploaded.
	Part    ObjectPart // Part metadata, filled in after a successful upload.
}
// putObjectMultipartStreamFromReadAt - Uploads files bigger than 128MiB.
// Supports all readers which implement the io.ReaderAt interface
// (ReadAt method).
//
// NOTE: This function is meant to be used for all readers which
// implement io.ReaderAt, which allows us to resume multipart uploads
// by reading at an offset, avoiding a re-read of data that was already
// uploaded. Internally this function uses temporary files for staging
// all the data; these temporary files are cleaned up automatically when
// the caller, i.e. the http client, closes the stream after uploading
// all the contents successfully.
func (c Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketName, objectName string,
	reader io.ReaderAt, size int64, opts PutObjectOptions) (info UploadInfo, err error) {
	// Input validation.
	if err = s3utils.CheckValidBucketName(bucketName); err != nil {
		return UploadInfo{}, err
	}
	if err = s3utils.CheckValidObjectName(objectName); err != nil {
		return UploadInfo{}, err
	}

	// Calculate the optimal parts info for a given size.
	totalPartsCount, partSize, lastPartSize, err := OptimalPartInfo(size, opts.PartSize)
	if err != nil {
		return UploadInfo{}, err
	}
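
	// Illustrative arithmetic (assumed sizes, not taken from this file):
	// for a 1 GiB object with a 128 MiB part size, OptimalPartInfo yields
	// totalPartsCount = 8 with partSize = lastPartSize = 128 MiB; for
	// 1 GiB + 1 byte it yields 9 parts with lastPartSize = 1 byte.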
	// Initiate a new multipart upload.
	uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts)
	if err != nil {
		return UploadInfo{}, err
	}

	// Abort the multipart upload in progress if the function returns
	// any error; since we do not resume, we should purge the parts
	// which have been uploaded to relinquish storage space.
	defer func() {
		if err != nil {
			c.abortMultipartUpload(ctx, bucketName, objectName, uploadID)
		}
	}()

	// Total data read and written to server. Should be equal to 'size' at the end of the call.
	var totalUploadedSize int64

	// Complete multipart upload.
	var complMultipartUpload completeMultipartUpload

	// Declare a channel that sends the next part number to be uploaded.
	// Buffered to 10000 because that's the maximum number of parts allowed
	// by S3.
	uploadPartsCh := make(chan uploadPartReq, 10000)

	// Declare a channel that sends back the response of a part upload.
	// Buffered to 10000 because that's the maximum number of parts allowed
	// by S3.
	uploadedPartsCh := make(chan uploadedPartRes, 10000)

	// Used for readability, lastPartNumber is always totalPartsCount.
	lastPartNumber := totalPartsCount

	// Send each part number to the channel to be processed.
	for p := 1; p <= totalPartsCount; p++ {
		uploadPartsCh <- uploadPartReq{PartNum: p}
	}
	close(uploadPartsCh)

	var partsBuf = make([][]byte, opts.getNumThreads())
	for i := range partsBuf {
		partsBuf[i] = make([]byte, 0, partSize)
	}
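
	// Note on memory (derived from the loop above): each worker owns one
	// reusable staging buffer of capacity partSize, so peak buffer memory
	// is roughly opts.getNumThreads() * partSize, independent of the
	// total object size.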
	// Receive each part number from the channel, allowing up to
	// opts.getNumThreads() parallel uploads.
	for w := 1; w <= opts.getNumThreads(); w++ {
		go func(w int, partSize int64) {
			// Each worker will draw from the part channel and upload in parallel.
			for uploadReq := range uploadPartsCh {
				// If partNumber was not uploaded we calculate the missing
				// part offset and size. For all other part numbers we
				// calculate offset based on multiples of partSize.
				readOffset := int64(uploadReq.PartNum-1) * partSize

				// As a special case, if partNumber is lastPartNumber, we
				// calculate the offset based on the last part size.
				if uploadReq.PartNum == lastPartNumber {
					readOffset = size - lastPartSize
					partSize = lastPartSize
				}
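
				// Worked example (assumed sizes): with size = 26 MiB and
				// partSize = 16 MiB, part 1 reads [0, 16 MiB) while part 2,
				// the last part, gets lastPartSize = 10 MiB and
				// readOffset = size - lastPartSize = 16 MiB.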
				n, rerr := readFull(io.NewSectionReader(reader, readOffset, partSize), partsBuf[w-1][:partSize])
				if rerr != nil && rerr != io.ErrUnexpectedEOF && rerr != io.EOF {
					uploadedPartsCh <- uploadedPartRes{
						Error: rerr,
					}
					// Exit the goroutine.
					return
				}

				// Wrap the filled part buffer with a progress hook reader.
				hookReader := newHook(bytes.NewReader(partsBuf[w-1][:n]), opts.Progress)

				// Proceed to upload the part.
				objPart, err := c.uploadPart(ctx, bucketName, objectName,
					uploadID, hookReader, uploadReq.PartNum,
					"", "", partSize, opts.ServerSideEncryption)
				if err != nil {
					uploadedPartsCh <- uploadedPartRes{
						Error: err,
					}
					// Exit the goroutine.
					return
				}

				// Save successfully uploaded part metadata.
				uploadReq.Part = objPart

				// Send successful part info through the channel.
				uploadedPartsCh <- uploadedPartRes{
					Size:    objPart.Size,
					PartNum: uploadReq.PartNum,
					Part:    uploadReq.Part,
				}
			}
		}(w, partSize)
	}
	// Gather the responses as they occur and update any
	// progress bar.
	for u := 1; u <= totalPartsCount; u++ {
		uploadRes := <-uploadedPartsCh
		if uploadRes.Error != nil {
			return UploadInfo{}, uploadRes.Error
		}

		// Update the totalUploadedSize.
		totalUploadedSize += uploadRes.Size

		// Store the parts to be completed in order.
		complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{
			ETag:       uploadRes.Part.ETag,
			PartNumber: uploadRes.Part.PartNumber,
		})
	}

	// Verify if we uploaded all the data.
	if totalUploadedSize != size {
		return UploadInfo{}, errUnexpectedEOF(totalUploadedSize, size, bucketName, objectName)
	}

	// Sort all completed parts.
	sort.Sort(completedParts(complMultipartUpload.Parts))

	uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, PutObjectOptions{})
	if err != nil {
		return UploadInfo{}, err
	}

	uploadInfo.Size = totalUploadedSize
	return uploadInfo, nil
}
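
// Design note (editorial): the ReadAt path above may upload parts in any
// order because io.ReaderAt allows concurrent reads at independent offsets;
// the sequential path below exists for plain io.Reader sources, which can
// only be consumed front to back.
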
func (c Client) putObjectMultipartStreamOptionalChecksum(ctx context.Context, bucketName, objectName string,
	reader io.Reader, size int64, opts PutObjectOptions) (info UploadInfo, err error) {
	// Input validation.
	if err = s3utils.CheckValidBucketName(bucketName); err != nil {
		return UploadInfo{}, err
	}
	if err = s3utils.CheckValidObjectName(objectName); err != nil {
		return UploadInfo{}, err
	}

	// Calculate the optimal parts info for a given size.
	totalPartsCount, partSize, lastPartSize, err := OptimalPartInfo(size, opts.PartSize)
	if err != nil {
		return UploadInfo{}, err
	}

	// Initiate a new multipart request.
	uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts)
	if err != nil {
		return UploadInfo{}, err
	}

	// Abort the multipart upload if the function returns any error;
	// since we do not resume, we should purge the parts which have
	// been uploaded to relinquish storage space.
	defer func() {
		if err != nil {
			c.abortMultipartUpload(ctx, bucketName, objectName, uploadID)
		}
	}()

	// Total data read and written to server. Should be equal to 'size' at the end of the call.
	var totalUploadedSize int64

	// Initialize parts uploaded map.
	partsInfo := make(map[int]ObjectPart)

	// Create a buffer.
	buf := make([]byte, partSize)
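
	// Unlike the ReadAt path, this sequential path stages at most a single
	// partSize buffer at a time: a plain io.Reader can only be consumed in
	// order, so parts are read and uploaded strictly one after another.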
	// Avoid declaring variables in the for loop.
	var md5Base64 string
	var hookReader io.Reader

	// Part number always starts with '1'.
	var partNumber int
	for partNumber = 1; partNumber <= totalPartsCount; partNumber++ {
		// Proceed to upload the part.
		if partNumber == totalPartsCount {
			partSize = lastPartSize
		}

		if opts.SendContentMd5 {
			length, rerr := readFull(reader, buf)
			if rerr == io.EOF && partNumber > 1 {
				break
			}
			if rerr != nil && rerr != io.ErrUnexpectedEOF && rerr != io.EOF {
				return UploadInfo{}, rerr
			}

			// Calculate md5sum.
			hash := c.md5Hasher()
			hash.Write(buf[:length])
			md5Base64 = base64.StdEncoding.EncodeToString(hash.Sum(nil))
			hash.Close()

			// Update progress reader appropriately to the latest offset
			// as we read from the source.
			hookReader = newHook(bytes.NewReader(buf[:length]), opts.Progress)
		} else {
			// Update progress reader appropriately to the latest offset
			// as we read from the source.
			hookReader = newHook(reader, opts.Progress)
		}

		objPart, uerr := c.uploadPart(ctx, bucketName, objectName, uploadID,
			io.LimitReader(hookReader, partSize),
			partNumber, md5Base64, "", partSize, opts.ServerSideEncryption)
		if uerr != nil {
			return UploadInfo{}, uerr
		}

		// Save successfully uploaded part metadata.
		partsInfo[partNumber] = objPart

		// Save successfully uploaded size.
		totalUploadedSize += partSize
	}
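
	// Note: the loop adds partSize to totalUploadedSize on each iteration;
	// this stays accurate because partSize is reassigned to lastPartSize
	// before the final part is uploaded.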
	// Verify if we uploaded all the data.
	if size > 0 {
		if totalUploadedSize != size {
			return UploadInfo{}, errUnexpectedEOF(totalUploadedSize, size, bucketName, objectName)
		}
	}

	// Complete multipart upload.
	var complMultipartUpload completeMultipartUpload

	// Loop over total uploaded parts to save them in
	// Parts array before completing the multipart request.
	for i := 1; i < partNumber; i++ {
		part, ok := partsInfo[i]
		if !ok {
			return UploadInfo{}, errInvalidArgument(fmt.Sprintf("Missing part number %d", i))
		}
		complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{
			ETag:       part.ETag,
			PartNumber: part.PartNumber,
		})
	}

	// Sort all completed parts.
	sort.Sort(completedParts(complMultipartUpload.Parts))

	uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, PutObjectOptions{})
	if err != nil {
		return UploadInfo{}, err
	}

	uploadInfo.Size = totalUploadedSize
	return uploadInfo, nil
}
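
// Editorial sketch (hypothetical call, not part of this file): the
// checksummed sequential path above can be selected explicitly by enabling
// SendContentMd5, which deployments using S3 object locking typically
// require:
//
//	info, err := client.PutObject(ctx, "example-bucket", "example-object",
//		reader, size, minio.PutObjectOptions{SendContentMd5: true})
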
// putObject - a special function used for Google Cloud Storage, since
// Google's multipart API is not S3 compatible.
func (c Client) putObject(ctx context.Context, bucketName, objectName string, reader io.Reader, size int64, opts PutObjectOptions) (info UploadInfo, err error) {
	// Input validation.
	if err := s3utils.CheckValidBucketName(bucketName); err != nil {
		return UploadInfo{}, err
	}
	if err := s3utils.CheckValidObjectName(objectName); err != nil {
		return UploadInfo{}, err
	}

	// Size -1 is only supported on Google Cloud Storage; we error
	// out in all other situations.
	if size < 0 && !s3utils.IsGoogleEndpoint(*c.endpointURL) {
		return UploadInfo{}, errEntityTooSmall(size, bucketName, objectName)
	}

	if opts.SendContentMd5 && s3utils.IsGoogleEndpoint(*c.endpointURL) && size < 0 {
		return UploadInfo{}, errInvalidArgument("MD5Sum cannot be calculated with size '-1'")
	}

	if size > 0 {
		if isReadAt(reader) && !isObject(reader) {
			seeker, ok := reader.(io.Seeker)
			if ok {
				offset, err := seeker.Seek(0, io.SeekCurrent)
				if err != nil {
					return UploadInfo{}, errInvalidArgument(err.Error())
				}
				reader = io.NewSectionReader(reader.(io.ReaderAt), offset, size)
			}
		}
	}
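
	// The io.SectionReader above (when the source is seekable) bounds reads
	// to 'size' bytes starting at the reader's current offset and is itself
	// seekable, so the request body can be replayed on retry without
	// disturbing the caller's reader.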
	var md5Base64 string
	if opts.SendContentMd5 {
		// Create a buffer.
		buf := make([]byte, size)

		length, rErr := readFull(reader, buf)
		if rErr != nil && rErr != io.ErrUnexpectedEOF && rErr != io.EOF {
			return UploadInfo{}, rErr
		}

		// Calculate md5sum.
		hash := c.md5Hasher()
		hash.Write(buf[:length])
		md5Base64 = base64.StdEncoding.EncodeToString(hash.Sum(nil))
		reader = bytes.NewReader(buf[:length])
		hash.Close()
	}
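
	// Caveat (derived from the code above): computing Content-MD5 here
	// buffers the whole object in memory ('buf' is 'size' bytes), which is
	// tolerable only because callers cap single-PUT uploads at
	// maxSinglePutObjectSize.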
	// Update progress reader appropriately to the latest offset as we
	// read from the source.
	readSeeker := newHook(reader, opts.Progress)

	// This function does not calculate sha256 and md5sum for payload.
	// Execute put object.
	return c.putObjectDo(ctx, bucketName, objectName, readSeeker, md5Base64, "", size, opts)
}
// putObjectDo - executes the put object http operation.
// NOTE: You must have WRITE permissions on a bucket to add an object to it.
func (c Client) putObjectDo(ctx context.Context, bucketName, objectName string, reader io.Reader, md5Base64, sha256Hex string, size int64, opts PutObjectOptions) (UploadInfo, error) {
	// Input validation.
	if err := s3utils.CheckValidBucketName(bucketName); err != nil {
		return UploadInfo{}, err
	}
	if err := s3utils.CheckValidObjectName(objectName); err != nil {
		return UploadInfo{}, err
	}
	// Set headers.
	customHeader := opts.Header()

	// Populate request metadata.
	reqMetadata := requestMetadata{
		bucketName:       bucketName,
		objectName:       objectName,
		customHeader:     customHeader,
		contentBody:      reader,
		contentLength:    size,
		contentMD5Base64: md5Base64,
		contentSHA256Hex: sha256Hex,
	}
	if opts.Internal.SourceVersionID != "" {
		if _, err := uuid.Parse(opts.Internal.SourceVersionID); err != nil {
			return UploadInfo{}, errInvalidArgument(err.Error())
		}
		urlValues := make(url.Values)
		urlValues.Set("versionId", opts.Internal.SourceVersionID)
		reqMetadata.queryValues = urlValues
	}
	// Execute PUT on objectName.
	resp, err := c.executeMethod(ctx, http.MethodPut, reqMetadata)
	defer closeResponse(resp)
	if err != nil {
		return UploadInfo{}, err
	}
	if resp != nil {
		if resp.StatusCode != http.StatusOK {
			return UploadInfo{}, httpRespToErrorResponse(resp, bucketName, objectName)
		}
	}

	// Extract the lifecycle expiry date and rule ID.
	expTime, ruleID := amzExpirationToExpiryDateRuleID(resp.Header.Get(amzExpiration))

	return UploadInfo{
		Bucket:           bucketName,
		Key:              objectName,
		ETag:             trimEtag(resp.Header.Get("ETag")),
		VersionID:        resp.Header.Get(amzVersionID),
		Size:             size,
		Expiration:       expTime,
		ExpirationRuleID: ruleID,
	}, nil
}