api-put-object-multipart.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394
  1. /*
  2. * MinIO Go Library for Amazon S3 Compatible Cloud Storage
  3. * Copyright 2015-2017 MinIO, Inc.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. */
  17. package minio
  18. import (
  19. "bytes"
  20. "context"
  21. "encoding/base64"
  22. "encoding/hex"
  23. "encoding/xml"
  24. "fmt"
  25. "io"
  26. "io/ioutil"
  27. "net/http"
  28. "net/url"
  29. "sort"
  30. "strconv"
  31. "strings"
  32. "github.com/google/uuid"
  33. "github.com/minio/minio-go/v7/pkg/encrypt"
  34. "github.com/minio/minio-go/v7/pkg/s3utils"
  35. )
  36. func (c Client) putObjectMultipart(ctx context.Context, bucketName, objectName string, reader io.Reader, size int64,
  37. opts PutObjectOptions) (info UploadInfo, err error) {
  38. info, err = c.putObjectMultipartNoStream(ctx, bucketName, objectName, reader, opts)
  39. if err != nil {
  40. errResp := ToErrorResponse(err)
  41. // Verify if multipart functionality is not available, if not
  42. // fall back to single PutObject operation.
  43. if errResp.Code == "AccessDenied" && strings.Contains(errResp.Message, "Access Denied") {
  44. // Verify if size of reader is greater than '5GiB'.
  45. if size > maxSinglePutObjectSize {
  46. return UploadInfo{}, errEntityTooLarge(size, maxSinglePutObjectSize, bucketName, objectName)
  47. }
  48. // Fall back to uploading as single PutObject operation.
  49. return c.putObject(ctx, bucketName, objectName, reader, size, opts)
  50. }
  51. }
  52. return info, err
  53. }
  54. func (c Client) putObjectMultipartNoStream(ctx context.Context, bucketName, objectName string, reader io.Reader, opts PutObjectOptions) (info UploadInfo, err error) {
  55. // Input validation.
  56. if err = s3utils.CheckValidBucketName(bucketName); err != nil {
  57. return UploadInfo{}, err
  58. }
  59. if err = s3utils.CheckValidObjectName(objectName); err != nil {
  60. return UploadInfo{}, err
  61. }
  62. // Total data read and written to server. should be equal to
  63. // 'size' at the end of the call.
  64. var totalUploadedSize int64
  65. // Complete multipart upload.
  66. var complMultipartUpload completeMultipartUpload
  67. // Calculate the optimal parts info for a given size.
  68. totalPartsCount, partSize, _, err := OptimalPartInfo(-1, opts.PartSize)
  69. if err != nil {
  70. return UploadInfo{}, err
  71. }
  72. // Initiate a new multipart upload.
  73. uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts)
  74. if err != nil {
  75. return UploadInfo{}, err
  76. }
  77. defer func() {
  78. if err != nil {
  79. c.abortMultipartUpload(ctx, bucketName, objectName, uploadID)
  80. }
  81. }()
  82. // Part number always starts with '1'.
  83. partNumber := 1
  84. // Initialize parts uploaded map.
  85. partsInfo := make(map[int]ObjectPart)
  86. // Create a buffer.
  87. buf := make([]byte, partSize)
  88. for partNumber <= totalPartsCount {
  89. // Choose hash algorithms to be calculated by hashCopyN,
  90. // avoid sha256 with non-v4 signature request or
  91. // HTTPS connection.
  92. hashAlgos, hashSums := c.hashMaterials(opts.SendContentMd5)
  93. length, rErr := readFull(reader, buf)
  94. if rErr == io.EOF && partNumber > 1 {
  95. break
  96. }
  97. if rErr != nil && rErr != io.ErrUnexpectedEOF && rErr != io.EOF {
  98. return UploadInfo{}, rErr
  99. }
  100. // Calculates hash sums while copying partSize bytes into cw.
  101. for k, v := range hashAlgos {
  102. v.Write(buf[:length])
  103. hashSums[k] = v.Sum(nil)
  104. v.Close()
  105. }
  106. // Update progress reader appropriately to the latest offset
  107. // as we read from the source.
  108. rd := newHook(bytes.NewReader(buf[:length]), opts.Progress)
  109. // Checksums..
  110. var (
  111. md5Base64 string
  112. sha256Hex string
  113. )
  114. if hashSums["md5"] != nil {
  115. md5Base64 = base64.StdEncoding.EncodeToString(hashSums["md5"])
  116. }
  117. if hashSums["sha256"] != nil {
  118. sha256Hex = hex.EncodeToString(hashSums["sha256"])
  119. }
  120. // Proceed to upload the part.
  121. objPart, uerr := c.uploadPart(ctx, bucketName, objectName, uploadID, rd, partNumber,
  122. md5Base64, sha256Hex, int64(length), opts.ServerSideEncryption)
  123. if uerr != nil {
  124. return UploadInfo{}, uerr
  125. }
  126. // Save successfully uploaded part metadata.
  127. partsInfo[partNumber] = objPart
  128. // Save successfully uploaded size.
  129. totalUploadedSize += int64(length)
  130. // Increment part number.
  131. partNumber++
  132. // For unknown size, Read EOF we break away.
  133. // We do not have to upload till totalPartsCount.
  134. if rErr == io.EOF {
  135. break
  136. }
  137. }
  138. // Loop over total uploaded parts to save them in
  139. // Parts array before completing the multipart request.
  140. for i := 1; i < partNumber; i++ {
  141. part, ok := partsInfo[i]
  142. if !ok {
  143. return UploadInfo{}, errInvalidArgument(fmt.Sprintf("Missing part number %d", i))
  144. }
  145. complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{
  146. ETag: part.ETag,
  147. PartNumber: part.PartNumber,
  148. })
  149. }
  150. // Sort all completed parts.
  151. sort.Sort(completedParts(complMultipartUpload.Parts))
  152. uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, PutObjectOptions{})
  153. if err != nil {
  154. return UploadInfo{}, err
  155. }
  156. uploadInfo.Size = totalUploadedSize
  157. return uploadInfo, nil
  158. }
  159. // initiateMultipartUpload - Initiates a multipart upload and returns an upload ID.
  160. func (c Client) initiateMultipartUpload(ctx context.Context, bucketName, objectName string, opts PutObjectOptions) (initiateMultipartUploadResult, error) {
  161. // Input validation.
  162. if err := s3utils.CheckValidBucketName(bucketName); err != nil {
  163. return initiateMultipartUploadResult{}, err
  164. }
  165. if err := s3utils.CheckValidObjectName(objectName); err != nil {
  166. return initiateMultipartUploadResult{}, err
  167. }
  168. // Initialize url queries.
  169. urlValues := make(url.Values)
  170. urlValues.Set("uploads", "")
  171. if opts.Internal.SourceVersionID != "" {
  172. if _, err := uuid.Parse(opts.Internal.SourceVersionID); err != nil {
  173. return initiateMultipartUploadResult{}, errInvalidArgument(err.Error())
  174. }
  175. urlValues.Set("versionId", opts.Internal.SourceVersionID)
  176. }
  177. // Set ContentType header.
  178. customHeader := opts.Header()
  179. reqMetadata := requestMetadata{
  180. bucketName: bucketName,
  181. objectName: objectName,
  182. queryValues: urlValues,
  183. customHeader: customHeader,
  184. }
  185. // Execute POST on an objectName to initiate multipart upload.
  186. resp, err := c.executeMethod(ctx, http.MethodPost, reqMetadata)
  187. defer closeResponse(resp)
  188. if err != nil {
  189. return initiateMultipartUploadResult{}, err
  190. }
  191. if resp != nil {
  192. if resp.StatusCode != http.StatusOK {
  193. return initiateMultipartUploadResult{}, httpRespToErrorResponse(resp, bucketName, objectName)
  194. }
  195. }
  196. // Decode xml for new multipart upload.
  197. initiateMultipartUploadResult := initiateMultipartUploadResult{}
  198. err = xmlDecoder(resp.Body, &initiateMultipartUploadResult)
  199. if err != nil {
  200. return initiateMultipartUploadResult, err
  201. }
  202. return initiateMultipartUploadResult, nil
  203. }
  204. // uploadPart - Uploads a part in a multipart upload.
  205. func (c Client) uploadPart(ctx context.Context, bucketName, objectName, uploadID string, reader io.Reader,
  206. partNumber int, md5Base64, sha256Hex string, size int64, sse encrypt.ServerSide) (ObjectPart, error) {
  207. // Input validation.
  208. if err := s3utils.CheckValidBucketName(bucketName); err != nil {
  209. return ObjectPart{}, err
  210. }
  211. if err := s3utils.CheckValidObjectName(objectName); err != nil {
  212. return ObjectPart{}, err
  213. }
  214. if size > maxPartSize {
  215. return ObjectPart{}, errEntityTooLarge(size, maxPartSize, bucketName, objectName)
  216. }
  217. if size <= -1 {
  218. return ObjectPart{}, errEntityTooSmall(size, bucketName, objectName)
  219. }
  220. if partNumber <= 0 {
  221. return ObjectPart{}, errInvalidArgument("Part number cannot be negative or equal to zero.")
  222. }
  223. if uploadID == "" {
  224. return ObjectPart{}, errInvalidArgument("UploadID cannot be empty.")
  225. }
  226. // Get resources properly escaped and lined up before using them in http request.
  227. urlValues := make(url.Values)
  228. // Set part number.
  229. urlValues.Set("partNumber", strconv.Itoa(partNumber))
  230. // Set upload id.
  231. urlValues.Set("uploadId", uploadID)
  232. // Set encryption headers, if any.
  233. customHeader := make(http.Header)
  234. // https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadUploadPart.html
  235. // Server-side encryption is supported by the S3 Multipart Upload actions.
  236. // Unless you are using a customer-provided encryption key, you don't need
  237. // to specify the encryption parameters in each UploadPart request.
  238. if sse != nil && sse.Type() == encrypt.SSEC {
  239. sse.Marshal(customHeader)
  240. }
  241. reqMetadata := requestMetadata{
  242. bucketName: bucketName,
  243. objectName: objectName,
  244. queryValues: urlValues,
  245. customHeader: customHeader,
  246. contentBody: reader,
  247. contentLength: size,
  248. contentMD5Base64: md5Base64,
  249. contentSHA256Hex: sha256Hex,
  250. }
  251. // Execute PUT on each part.
  252. resp, err := c.executeMethod(ctx, http.MethodPut, reqMetadata)
  253. defer closeResponse(resp)
  254. if err != nil {
  255. return ObjectPart{}, err
  256. }
  257. if resp != nil {
  258. if resp.StatusCode != http.StatusOK {
  259. return ObjectPart{}, httpRespToErrorResponse(resp, bucketName, objectName)
  260. }
  261. }
  262. // Once successfully uploaded, return completed part.
  263. objPart := ObjectPart{}
  264. objPart.Size = size
  265. objPart.PartNumber = partNumber
  266. // Trim off the odd double quotes from ETag in the beginning and end.
  267. objPart.ETag = trimEtag(resp.Header.Get("ETag"))
  268. return objPart, nil
  269. }
  270. // completeMultipartUpload - Completes a multipart upload by assembling previously uploaded parts.
  271. func (c Client) completeMultipartUpload(ctx context.Context, bucketName, objectName, uploadID string,
  272. complete completeMultipartUpload, opts PutObjectOptions) (UploadInfo, error) {
  273. // Input validation.
  274. if err := s3utils.CheckValidBucketName(bucketName); err != nil {
  275. return UploadInfo{}, err
  276. }
  277. if err := s3utils.CheckValidObjectName(objectName); err != nil {
  278. return UploadInfo{}, err
  279. }
  280. // Initialize url queries.
  281. urlValues := make(url.Values)
  282. urlValues.Set("uploadId", uploadID)
  283. // Marshal complete multipart body.
  284. completeMultipartUploadBytes, err := xml.Marshal(complete)
  285. if err != nil {
  286. return UploadInfo{}, err
  287. }
  288. // Instantiate all the complete multipart buffer.
  289. completeMultipartUploadBuffer := bytes.NewReader(completeMultipartUploadBytes)
  290. reqMetadata := requestMetadata{
  291. bucketName: bucketName,
  292. objectName: objectName,
  293. queryValues: urlValues,
  294. contentBody: completeMultipartUploadBuffer,
  295. contentLength: int64(len(completeMultipartUploadBytes)),
  296. contentSHA256Hex: sum256Hex(completeMultipartUploadBytes),
  297. customHeader: opts.Header(),
  298. }
  299. // Execute POST to complete multipart upload for an objectName.
  300. resp, err := c.executeMethod(ctx, http.MethodPost, reqMetadata)
  301. defer closeResponse(resp)
  302. if err != nil {
  303. return UploadInfo{}, err
  304. }
  305. if resp != nil {
  306. if resp.StatusCode != http.StatusOK {
  307. return UploadInfo{}, httpRespToErrorResponse(resp, bucketName, objectName)
  308. }
  309. }
  310. // Read resp.Body into a []bytes to parse for Error response inside the body
  311. var b []byte
  312. b, err = ioutil.ReadAll(resp.Body)
  313. if err != nil {
  314. return UploadInfo{}, err
  315. }
  316. // Decode completed multipart upload response on success.
  317. completeMultipartUploadResult := completeMultipartUploadResult{}
  318. err = xmlDecoder(bytes.NewReader(b), &completeMultipartUploadResult)
  319. if err != nil {
  320. // xml parsing failure due to presence an ill-formed xml fragment
  321. return UploadInfo{}, err
  322. } else if completeMultipartUploadResult.Bucket == "" {
  323. // xml's Decode method ignores well-formed xml that don't apply to the type of value supplied.
  324. // In this case, it would leave completeMultipartUploadResult with the corresponding zero-values
  325. // of the members.
  326. // Decode completed multipart upload response on failure
  327. completeMultipartUploadErr := ErrorResponse{}
  328. err = xmlDecoder(bytes.NewReader(b), &completeMultipartUploadErr)
  329. if err != nil {
  330. // xml parsing failure due to presence an ill-formed xml fragment
  331. return UploadInfo{}, err
  332. }
  333. return UploadInfo{}, completeMultipartUploadErr
  334. }
  335. // extract lifecycle expiry date and rule ID
  336. expTime, ruleID := amzExpirationToExpiryDateRuleID(resp.Header.Get(amzExpiration))
  337. return UploadInfo{
  338. Bucket: completeMultipartUploadResult.Bucket,
  339. Key: completeMultipartUploadResult.Key,
  340. ETag: trimEtag(completeMultipartUploadResult.ETag),
  341. VersionID: resp.Header.Get(amzVersionID),
  342. Location: completeMultipartUploadResult.Location,
  343. Expiration: expTime,
  344. ExpirationRuleID: ruleID,
  345. }, nil
  346. }