api-compose-object.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590
  1. /*
  2. * MinIO Go Library for Amazon S3 Compatible Cloud Storage
  3. * Copyright 2017, 2018 MinIO, Inc.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. */
  17. package minio
  18. import (
  19. "context"
  20. "fmt"
  21. "io"
  22. "io/ioutil"
  23. "net/http"
  24. "net/url"
  25. "strconv"
  26. "strings"
  27. "time"
  28. "github.com/google/uuid"
  29. "github.com/minio/minio-go/v7/pkg/encrypt"
  30. "github.com/minio/minio-go/v7/pkg/s3utils"
  31. )
// CopyDestOptions represents options specified by user for
// CopyObject/ComposeObject APIs. It describes the destination object of a
// server-side copy and the headers to attach to it.
type CopyDestOptions struct {
	Bucket string // points to destination bucket
	Object string // points to destination object

	// Encryption is the server-side-encryption configuration for the
	// destination. If it is nil, Marshal adds no encryption headers.
	Encryption encrypt.ServerSide

	// UserMetadata is the user-metadata key-value pairs to be set on the
	// destination. When marshalled, a leading `x-amz-meta-` on a key is
	// stripped and then re-added, except for keys recognised as amz,
	// standard or storage-class headers, which are sent under their own
	// name. It only takes effect when ReplaceMetadata is true.
	UserMetadata map[string]string
	// ReplaceMetadata controls whether UserMetadata is applied to the
	// destination. When false, UserMetadata is ignored; in the multipart
	// path of ComposeObject the metadata of the first source is used
	// instead.
	// NOTE: if you set this value to true and no metadata is present in
	// UserMetadata, your destination object will not have any metadata
	// set.
	ReplaceMetadata bool

	// UserTags is the user defined object tags to be set on destination.
	// This will be set only if the ReplaceTags field is set to true.
	// Otherwise this field is ignored.
	UserTags    map[string]string
	ReplaceTags bool

	// Specifies whether you want to apply a Legal Hold to the copied object.
	// An empty value leaves the legal-hold header unset.
	LegalHold LegalHoldStatus

	// Object Retention related fields. The lock headers are only sent when
	// Mode is non-empty and RetainUntilDate is non-zero.
	Mode            RetentionMode
	RetainUntilDate time.Time

	// Size needs to be specified if a progress bar is specified; validate
	// rejects a negative Size when Progress is set.
	Size int64
	// Progress of the entire copy operation will be sent here. After each
	// copied part ComposeObject reads, and discards, that part's byte
	// count from this reader.
	Progress io.Reader
}
  68. // Process custom-metadata to remove a `x-amz-meta-` prefix if
  69. // present and validate that keys are distinct (after this
  70. // prefix removal).
  71. func filterCustomMeta(userMeta map[string]string) map[string]string {
  72. m := make(map[string]string)
  73. for k, v := range userMeta {
  74. if strings.HasPrefix(strings.ToLower(k), "x-amz-meta-") {
  75. k = k[len("x-amz-meta-"):]
  76. }
  77. if _, ok := m[k]; ok {
  78. continue
  79. }
  80. m[k] = v
  81. }
  82. return m
  83. }
  84. // Marshal converts all the CopyDestOptions into their
  85. // equivalent HTTP header representation
  86. func (opts CopyDestOptions) Marshal(header http.Header) {
  87. const replaceDirective = "REPLACE"
  88. if opts.ReplaceTags {
  89. header.Set(amzTaggingHeaderDirective, replaceDirective)
  90. if tags := s3utils.TagEncode(opts.UserTags); tags != "" {
  91. header.Set(amzTaggingHeader, tags)
  92. }
  93. }
  94. if opts.LegalHold != LegalHoldStatus("") {
  95. header.Set(amzLegalHoldHeader, opts.LegalHold.String())
  96. }
  97. if opts.Mode != RetentionMode("") && !opts.RetainUntilDate.IsZero() {
  98. header.Set(amzLockMode, opts.Mode.String())
  99. header.Set(amzLockRetainUntil, opts.RetainUntilDate.Format(time.RFC3339))
  100. }
  101. if opts.Encryption != nil {
  102. opts.Encryption.Marshal(header)
  103. }
  104. if opts.ReplaceMetadata {
  105. header.Set("x-amz-metadata-directive", replaceDirective)
  106. for k, v := range filterCustomMeta(opts.UserMetadata) {
  107. if isAmzHeader(k) || isStandardHeader(k) || isStorageClassHeader(k) {
  108. header.Set(k, v)
  109. } else {
  110. header.Set("x-amz-meta-"+k, v)
  111. }
  112. }
  113. }
  114. }
  115. // toDestinationInfo returns a validated copyOptions object.
  116. func (opts CopyDestOptions) validate() (err error) {
  117. // Input validation.
  118. if err = s3utils.CheckValidBucketName(opts.Bucket); err != nil {
  119. return err
  120. }
  121. if err = s3utils.CheckValidObjectName(opts.Object); err != nil {
  122. return err
  123. }
  124. if opts.Progress != nil && opts.Size < 0 {
  125. return errInvalidArgument("For progress bar effective size needs to be specified")
  126. }
  127. return nil
  128. }
// CopySrcOptions represents a source object to be copied, using
// server-side copying APIs.
type CopySrcOptions struct {
	// Bucket and Object name the source; together they form the
	// x-amz-copy-source header value.
	Bucket, Object string
	// VersionID, when non-empty, is appended to the copy source as a
	// `versionId` query parameter.
	VersionID string
	// MatchETag, when non-empty, is sent as x-amz-copy-source-if-match.
	MatchETag string
	// NoMatchETag, when non-empty, is sent as
	// x-amz-copy-source-if-none-match.
	NoMatchETag string
	// MatchModifiedSince, when non-zero, is sent as
	// x-amz-copy-source-if-modified-since.
	MatchModifiedSince time.Time
	// MatchUnmodifiedSince, when non-zero, is sent as
	// x-amz-copy-source-if-unmodified-since.
	MatchUnmodifiedSince time.Time
	// MatchRange tells ComposeObject that Start and End select a segment
	// of the source; the segment is then checked against the source size.
	MatchRange bool
	// Start and End are the inclusive byte offsets of the segment to copy
	// (its length is End - Start + 1). validate requires
	// 0 <= Start <= End.
	Start, End int64
	// Encryption, when non-nil, is marshalled as the copy-source
	// encryption headers via encrypt.SSECopy.
	Encryption encrypt.ServerSide
}
  142. // Marshal converts all the CopySrcOptions into their
  143. // equivalent HTTP header representation
  144. func (opts CopySrcOptions) Marshal(header http.Header) {
  145. // Set the source header
  146. header.Set("x-amz-copy-source", s3utils.EncodePath(opts.Bucket+"/"+opts.Object))
  147. if opts.VersionID != "" {
  148. header.Set("x-amz-copy-source", s3utils.EncodePath(opts.Bucket+"/"+opts.Object)+"?versionId="+opts.VersionID)
  149. }
  150. if opts.MatchETag != "" {
  151. header.Set("x-amz-copy-source-if-match", opts.MatchETag)
  152. }
  153. if opts.NoMatchETag != "" {
  154. header.Set("x-amz-copy-source-if-none-match", opts.NoMatchETag)
  155. }
  156. if !opts.MatchModifiedSince.IsZero() {
  157. header.Set("x-amz-copy-source-if-modified-since", opts.MatchModifiedSince.Format(http.TimeFormat))
  158. }
  159. if !opts.MatchUnmodifiedSince.IsZero() {
  160. header.Set("x-amz-copy-source-if-unmodified-since", opts.MatchUnmodifiedSince.Format(http.TimeFormat))
  161. }
  162. if opts.Encryption != nil {
  163. encrypt.SSECopy(opts.Encryption).Marshal(header)
  164. }
  165. }
  166. func (opts CopySrcOptions) validate() (err error) {
  167. // Input validation.
  168. if err = s3utils.CheckValidBucketName(opts.Bucket); err != nil {
  169. return err
  170. }
  171. if err = s3utils.CheckValidObjectName(opts.Object); err != nil {
  172. return err
  173. }
  174. if opts.Start > opts.End || opts.Start < 0 {
  175. return errInvalidArgument("start must be non-negative, and start must be at most end.")
  176. }
  177. return nil
  178. }
  179. // Low level implementation of CopyObject API, supports only upto 5GiB worth of copy.
  180. func (c Client) copyObjectDo(ctx context.Context, srcBucket, srcObject, destBucket, destObject string,
  181. metadata map[string]string, srcOpts CopySrcOptions, dstOpts PutObjectOptions) (ObjectInfo, error) {
  182. // Build headers.
  183. headers := make(http.Header)
  184. // Set all the metadata headers.
  185. for k, v := range metadata {
  186. headers.Set(k, v)
  187. }
  188. if !dstOpts.Internal.ReplicationStatus.Empty() {
  189. headers.Set(amzBucketReplicationStatus, string(dstOpts.Internal.ReplicationStatus))
  190. }
  191. if !dstOpts.Internal.SourceMTime.IsZero() {
  192. headers.Set(minIOBucketSourceMTime, dstOpts.Internal.SourceMTime.Format(time.RFC3339Nano))
  193. }
  194. if dstOpts.Internal.SourceETag != "" {
  195. headers.Set(minIOBucketSourceETag, dstOpts.Internal.SourceETag)
  196. }
  197. if dstOpts.Internal.ReplicationRequest {
  198. headers.Set(minIOBucketReplicationRequest, "")
  199. }
  200. if !dstOpts.Internal.LegalholdTimestamp.IsZero() {
  201. headers.Set(minIOBucketReplicationObjectLegalHoldTimestamp, dstOpts.Internal.LegalholdTimestamp.Format(time.RFC3339Nano))
  202. }
  203. if !dstOpts.Internal.RetentionTimestamp.IsZero() {
  204. headers.Set(minIOBucketReplicationObjectRetentionTimestamp, dstOpts.Internal.RetentionTimestamp.Format(time.RFC3339Nano))
  205. }
  206. if !dstOpts.Internal.TaggingTimestamp.IsZero() {
  207. headers.Set(minIOBucketReplicationTaggingTimestamp, dstOpts.Internal.TaggingTimestamp.Format(time.RFC3339Nano))
  208. }
  209. if len(dstOpts.UserTags) != 0 {
  210. headers.Set(amzTaggingHeader, s3utils.TagEncode(dstOpts.UserTags))
  211. }
  212. reqMetadata := requestMetadata{
  213. bucketName: destBucket,
  214. objectName: destObject,
  215. customHeader: headers,
  216. }
  217. if dstOpts.Internal.SourceVersionID != "" {
  218. if _, err := uuid.Parse(dstOpts.Internal.SourceVersionID); err != nil {
  219. return ObjectInfo{}, errInvalidArgument(err.Error())
  220. }
  221. urlValues := make(url.Values)
  222. urlValues.Set("versionId", dstOpts.Internal.SourceVersionID)
  223. reqMetadata.queryValues = urlValues
  224. }
  225. // Set the source header
  226. headers.Set("x-amz-copy-source", s3utils.EncodePath(srcBucket+"/"+srcObject))
  227. if srcOpts.VersionID != "" {
  228. headers.Set("x-amz-copy-source", s3utils.EncodePath(srcBucket+"/"+srcObject)+"?versionId="+srcOpts.VersionID)
  229. }
  230. // Send upload-part-copy request
  231. resp, err := c.executeMethod(ctx, http.MethodPut, reqMetadata)
  232. defer closeResponse(resp)
  233. if err != nil {
  234. return ObjectInfo{}, err
  235. }
  236. // Check if we got an error response.
  237. if resp.StatusCode != http.StatusOK {
  238. return ObjectInfo{}, httpRespToErrorResponse(resp, srcBucket, srcObject)
  239. }
  240. cpObjRes := copyObjectResult{}
  241. err = xmlDecoder(resp.Body, &cpObjRes)
  242. if err != nil {
  243. return ObjectInfo{}, err
  244. }
  245. objInfo := ObjectInfo{
  246. Key: destObject,
  247. ETag: strings.Trim(cpObjRes.ETag, "\""),
  248. LastModified: cpObjRes.LastModified,
  249. }
  250. return objInfo, nil
  251. }
  252. func (c Client) copyObjectPartDo(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, uploadID string,
  253. partID int, startOffset int64, length int64, metadata map[string]string) (p CompletePart, err error) {
  254. headers := make(http.Header)
  255. // Set source
  256. headers.Set("x-amz-copy-source", s3utils.EncodePath(srcBucket+"/"+srcObject))
  257. if startOffset < 0 {
  258. return p, errInvalidArgument("startOffset must be non-negative")
  259. }
  260. if length >= 0 {
  261. headers.Set("x-amz-copy-source-range", fmt.Sprintf("bytes=%d-%d", startOffset, startOffset+length-1))
  262. }
  263. for k, v := range metadata {
  264. headers.Set(k, v)
  265. }
  266. queryValues := make(url.Values)
  267. queryValues.Set("partNumber", strconv.Itoa(partID))
  268. queryValues.Set("uploadId", uploadID)
  269. resp, err := c.executeMethod(ctx, http.MethodPut, requestMetadata{
  270. bucketName: destBucket,
  271. objectName: destObject,
  272. customHeader: headers,
  273. queryValues: queryValues,
  274. })
  275. defer closeResponse(resp)
  276. if err != nil {
  277. return
  278. }
  279. // Check if we got an error response.
  280. if resp.StatusCode != http.StatusOK {
  281. return p, httpRespToErrorResponse(resp, destBucket, destObject)
  282. }
  283. // Decode copy-part response on success.
  284. cpObjRes := copyObjectResult{}
  285. err = xmlDecoder(resp.Body, &cpObjRes)
  286. if err != nil {
  287. return p, err
  288. }
  289. p.PartNumber, p.ETag = partID, cpObjRes.ETag
  290. return p, nil
  291. }
  292. // uploadPartCopy - helper function to create a part in a multipart
  293. // upload via an upload-part-copy request
  294. // https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadUploadPartCopy.html
  295. func (c Client) uploadPartCopy(ctx context.Context, bucket, object, uploadID string, partNumber int,
  296. headers http.Header) (p CompletePart, err error) {
  297. // Build query parameters
  298. urlValues := make(url.Values)
  299. urlValues.Set("partNumber", strconv.Itoa(partNumber))
  300. urlValues.Set("uploadId", uploadID)
  301. // Send upload-part-copy request
  302. resp, err := c.executeMethod(ctx, http.MethodPut, requestMetadata{
  303. bucketName: bucket,
  304. objectName: object,
  305. customHeader: headers,
  306. queryValues: urlValues,
  307. })
  308. defer closeResponse(resp)
  309. if err != nil {
  310. return p, err
  311. }
  312. // Check if we got an error response.
  313. if resp.StatusCode != http.StatusOK {
  314. return p, httpRespToErrorResponse(resp, bucket, object)
  315. }
  316. // Decode copy-part response on success.
  317. cpObjRes := copyObjectResult{}
  318. err = xmlDecoder(resp.Body, &cpObjRes)
  319. if err != nil {
  320. return p, err
  321. }
  322. p.PartNumber, p.ETag = partNumber, cpObjRes.ETag
  323. return p, nil
  324. }
  325. // ComposeObject - creates an object using server-side copying
  326. // of existing objects. It takes a list of source objects (with optional offsets)
  327. // and concatenates them into a new object using only server-side copying
  328. // operations. Optionally takes progress reader hook for applications to
  329. // look at current progress.
  330. func (c Client) ComposeObject(ctx context.Context, dst CopyDestOptions, srcs ...CopySrcOptions) (UploadInfo, error) {
  331. if len(srcs) < 1 || len(srcs) > maxPartsCount {
  332. return UploadInfo{}, errInvalidArgument("There must be as least one and up to 10000 source objects.")
  333. }
  334. for _, src := range srcs {
  335. if err := src.validate(); err != nil {
  336. return UploadInfo{}, err
  337. }
  338. }
  339. if err := dst.validate(); err != nil {
  340. return UploadInfo{}, err
  341. }
  342. srcObjectInfos := make([]ObjectInfo, len(srcs))
  343. srcObjectSizes := make([]int64, len(srcs))
  344. var totalSize, totalParts int64
  345. var err error
  346. for i, src := range srcs {
  347. opts := StatObjectOptions{ServerSideEncryption: encrypt.SSE(src.Encryption), VersionID: src.VersionID}
  348. srcObjectInfos[i], err = c.statObject(context.Background(), src.Bucket, src.Object, opts)
  349. if err != nil {
  350. return UploadInfo{}, err
  351. }
  352. srcCopySize := srcObjectInfos[i].Size
  353. // Check if a segment is specified, and if so, is the
  354. // segment within object bounds?
  355. if src.MatchRange {
  356. // Since range is specified,
  357. // 0 <= src.start <= src.end
  358. // so only invalid case to check is:
  359. if src.End >= srcCopySize || src.Start < 0 {
  360. return UploadInfo{}, errInvalidArgument(
  361. fmt.Sprintf("CopySrcOptions %d has invalid segment-to-copy [%d, %d] (size is %d)",
  362. i, src.Start, src.End, srcCopySize))
  363. }
  364. srcCopySize = src.End - src.Start + 1
  365. }
  366. // Only the last source may be less than `absMinPartSize`
  367. if srcCopySize < absMinPartSize && i < len(srcs)-1 {
  368. return UploadInfo{}, errInvalidArgument(
  369. fmt.Sprintf("CopySrcOptions %d is too small (%d) and it is not the last part", i, srcCopySize))
  370. }
  371. // Is data to copy too large?
  372. totalSize += srcCopySize
  373. if totalSize > maxMultipartPutObjectSize {
  374. return UploadInfo{}, errInvalidArgument(fmt.Sprintf("Cannot compose an object of size %d (> 5TiB)", totalSize))
  375. }
  376. // record source size
  377. srcObjectSizes[i] = srcCopySize
  378. // calculate parts needed for current source
  379. totalParts += partsRequired(srcCopySize)
  380. // Do we need more parts than we are allowed?
  381. if totalParts > maxPartsCount {
  382. return UploadInfo{}, errInvalidArgument(fmt.Sprintf(
  383. "Your proposed compose object requires more than %d parts", maxPartsCount))
  384. }
  385. }
  386. // Single source object case (i.e. when only one source is
  387. // involved, it is being copied wholly and at most 5GiB in
  388. // size, emptyfiles are also supported).
  389. if (totalParts == 1 && srcs[0].Start == -1 && totalSize <= maxPartSize) || (totalSize == 0) {
  390. return c.CopyObject(ctx, dst, srcs[0])
  391. }
  392. // Now, handle multipart-copy cases.
  393. // 1. Ensure that the object has not been changed while
  394. // we are copying data.
  395. for i, src := range srcs {
  396. src.MatchETag = srcObjectInfos[i].ETag
  397. }
  398. // 2. Initiate a new multipart upload.
  399. // Set user-metadata on the destination object. If no
  400. // user-metadata is specified, and there is only one source,
  401. // (only) then metadata from source is copied.
  402. var userMeta map[string]string
  403. if dst.ReplaceMetadata {
  404. userMeta = dst.UserMetadata
  405. } else {
  406. userMeta = srcObjectInfos[0].UserMetadata
  407. }
  408. var userTags map[string]string
  409. if dst.ReplaceTags {
  410. userTags = dst.UserTags
  411. } else {
  412. userTags = srcObjectInfos[0].UserTags
  413. }
  414. uploadID, err := c.newUploadID(ctx, dst.Bucket, dst.Object, PutObjectOptions{
  415. ServerSideEncryption: dst.Encryption,
  416. UserMetadata: userMeta,
  417. UserTags: userTags,
  418. Mode: dst.Mode,
  419. RetainUntilDate: dst.RetainUntilDate,
  420. LegalHold: dst.LegalHold,
  421. })
  422. if err != nil {
  423. return UploadInfo{}, err
  424. }
  425. // 3. Perform copy part uploads
  426. objParts := []CompletePart{}
  427. partIndex := 1
  428. for i, src := range srcs {
  429. var h = make(http.Header)
  430. src.Marshal(h)
  431. if dst.Encryption != nil && dst.Encryption.Type() == encrypt.SSEC {
  432. dst.Encryption.Marshal(h)
  433. }
  434. // calculate start/end indices of parts after
  435. // splitting.
  436. startIdx, endIdx := calculateEvenSplits(srcObjectSizes[i], src)
  437. for j, start := range startIdx {
  438. end := endIdx[j]
  439. // Add (or reset) source range header for
  440. // upload part copy request.
  441. h.Set("x-amz-copy-source-range",
  442. fmt.Sprintf("bytes=%d-%d", start, end))
  443. // make upload-part-copy request
  444. complPart, err := c.uploadPartCopy(ctx, dst.Bucket,
  445. dst.Object, uploadID, partIndex, h)
  446. if err != nil {
  447. return UploadInfo{}, err
  448. }
  449. if dst.Progress != nil {
  450. io.CopyN(ioutil.Discard, dst.Progress, end-start+1)
  451. }
  452. objParts = append(objParts, complPart)
  453. partIndex++
  454. }
  455. }
  456. // 4. Make final complete-multipart request.
  457. uploadInfo, err := c.completeMultipartUpload(ctx, dst.Bucket, dst.Object, uploadID,
  458. completeMultipartUpload{Parts: objParts}, PutObjectOptions{})
  459. if err != nil {
  460. return UploadInfo{}, err
  461. }
  462. uploadInfo.Size = totalSize
  463. return uploadInfo, nil
  464. }
  465. // partsRequired is maximum parts possible with
  466. // max part size of ceiling(maxMultipartPutObjectSize / (maxPartsCount - 1))
  467. func partsRequired(size int64) int64 {
  468. maxPartSize := maxMultipartPutObjectSize / (maxPartsCount - 1)
  469. r := size / int64(maxPartSize)
  470. if size%int64(maxPartSize) > 0 {
  471. r++
  472. }
  473. return r
  474. }
  475. // calculateEvenSplits - computes splits for a source and returns
  476. // start and end index slices. Splits happen evenly to be sure that no
  477. // part is less than 5MiB, as that could fail the multipart request if
  478. // it is not the last part.
  479. func calculateEvenSplits(size int64, src CopySrcOptions) (startIndex, endIndex []int64) {
  480. if size == 0 {
  481. return
  482. }
  483. reqParts := partsRequired(size)
  484. startIndex = make([]int64, reqParts)
  485. endIndex = make([]int64, reqParts)
  486. // Compute number of required parts `k`, as:
  487. //
  488. // k = ceiling(size / copyPartSize)
  489. //
  490. // Now, distribute the `size` bytes in the source into
  491. // k parts as evenly as possible:
  492. //
  493. // r parts sized (q+1) bytes, and
  494. // (k - r) parts sized q bytes, where
  495. //
  496. // size = q * k + r (by simple division of size by k,
  497. // so that 0 <= r < k)
  498. //
  499. start := src.Start
  500. if start == -1 {
  501. start = 0
  502. }
  503. quot, rem := size/reqParts, size%reqParts
  504. nextStart := start
  505. for j := int64(0); j < reqParts; j++ {
  506. curPartSize := quot
  507. if j < rem {
  508. curPartSize++
  509. }
  510. cStart := nextStart
  511. cEnd := cStart + curPartSize - 1
  512. nextStart = cEnd + 1
  513. startIndex[j], endIndex[j] = cStart, cEnd
  514. }
  515. return
  516. }