/*
 * MinIO Go Library for Amazon S3 Compatible Cloud Storage
 * Copyright 2017, 2018 MinIO, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package minio

import (
    "context"
    "fmt"
    "io"
    "net/http"
    "net/url"
    "strconv"
    "strings"
    "time"

    "github.com/google/uuid"
    "github.com/minio/minio-go/v7/pkg/encrypt"
    "github.com/minio/minio-go/v7/pkg/s3utils"
)

// CopyDestOptions represents options specified by user for CopyObject/ComposeObject APIs
type CopyDestOptions struct {
    Bucket string // points to destination bucket
    Object string // points to destination object

    // `Encryption` is the key info for server-side-encryption with customer
    // provided key. If it is nil, no encryption is performed.
    Encryption encrypt.ServerSide

    // `UserMetadata` is the user-metadata key-value pairs to be set on the
    // destination. The keys are automatically prefixed with `x-amz-meta-`
    // if needed. If nil is passed, and if only a single source (of any
    // size) is provided in the ComposeObject call, then metadata from the
    // source is copied to the destination.
    UserMetadata map[string]string
    // UserMetadata is only set on the destination if ReplaceMetadata is
    // true; otherwise UserMetadata is ignored and src.UserMetadata is
    // preserved.
    // NOTE: if you set ReplaceMetadata to true and no metadata is present
    // in UserMetadata, your destination object will not have any metadata
    // set.
    ReplaceMetadata bool

    // `UserTags` is the user-defined object tags to be set on the
    // destination. This will be set only if the `ReplaceTags` field is set
    // to true. Otherwise this field is ignored.
    UserTags    map[string]string
    ReplaceTags bool

    // Specifies whether you want to apply a Legal Hold to the copied object.
    LegalHold LegalHoldStatus

    // Object Retention related fields
    Mode            RetentionMode
    RetainUntilDate time.Time

    Size int64 // Needs to be specified if progress bar is specified.
    // Progress of the entire copy operation will be sent here.
    Progress io.Reader
}
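
// A minimal destination sketch (illustrative bucket/object names and
// values, not fixtures from this package): replace both metadata and
// tags on the copied object.
//
//    dst := CopyDestOptions{
//        Bucket:          "dst-bucket",
//        Object:          "dst-object",
//        ReplaceMetadata: true,
//        UserMetadata:    map[string]string{"origin": "composed"},
//        ReplaceTags:     true,
//        UserTags:        map[string]string{"class": "archive"},
//    }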

// Process custom-metadata to remove a `x-amz-meta-` prefix if
// present and de-duplicate keys after this prefix removal: when two
// keys collide, the first one encountered is kept and later ones are
// dropped.
func filterCustomMeta(userMeta map[string]string) map[string]string {
    m := make(map[string]string)
    for k, v := range userMeta {
        // Strip the canonical metadata prefix, matching it
        // case-insensitively.
        if strings.HasPrefix(strings.ToLower(k), "x-amz-meta-") {
            k = k[len("x-amz-meta-"):]
        }
        // Keep only the first value seen for a given stripped key.
        if _, ok := m[k]; ok {
            continue
        }
        m[k] = v
    }
    return m
}
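
// For example (illustrative values), both "x-amz-meta-color" and "color"
// normalize to the key "color" after prefix stripping, so only one of
// the two entries survives; Go's randomized map iteration order decides
// which value that is.
//
//    filterCustomMeta(map[string]string{
//        "x-amz-meta-color": "red",
//        "color":            "blue",
//    }) // => map[string]string{"color": <"red" or "blue">}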

// Marshal converts all the CopyDestOptions into their
// equivalent HTTP header representation
func (opts CopyDestOptions) Marshal(header http.Header) {
    const replaceDirective = "REPLACE"
    if opts.ReplaceTags {
        header.Set(amzTaggingHeaderDirective, replaceDirective)
        if tags := s3utils.TagEncode(opts.UserTags); tags != "" {
            header.Set(amzTaggingHeader, tags)
        }
    }

    if opts.LegalHold != LegalHoldStatus("") {
        header.Set(amzLegalHoldHeader, opts.LegalHold.String())
    }

    if opts.Mode != RetentionMode("") && !opts.RetainUntilDate.IsZero() {
        header.Set(amzLockMode, opts.Mode.String())
        header.Set(amzLockRetainUntil, opts.RetainUntilDate.Format(time.RFC3339))
    }

    if opts.Encryption != nil {
        opts.Encryption.Marshal(header)
    }

    if opts.ReplaceMetadata {
        header.Set("x-amz-metadata-directive", replaceDirective)
        for k, v := range filterCustomMeta(opts.UserMetadata) {
            if isAmzHeader(k) || isStandardHeader(k) || isStorageClassHeader(k) {
                header.Set(k, v)
            } else {
                header.Set("x-amz-meta-"+k, v)
            }
        }
    }
}
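
// Sketch of the effect (assumed values): with ReplaceMetadata set and
// UserMetadata of {"origin": "composed"}, Marshal adds
//
//    x-amz-metadata-directive: REPLACE
//    x-amz-meta-origin: composed
//
// since "origin" is neither a standard nor an x-amz-* header name.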

// validate checks that the destination options are usable, returning
// an error describing the first problem found, if any.
func (opts CopyDestOptions) validate() (err error) {
    // Input validation.
    if err = s3utils.CheckValidBucketName(opts.Bucket); err != nil {
        return err
    }
    if err = s3utils.CheckValidObjectName(opts.Object); err != nil {
        return err
    }
    if opts.Progress != nil && opts.Size < 0 {
        return errInvalidArgument("For progress bar, the effective size needs to be specified")
    }
    return nil
}

// CopySrcOptions represents a source object to be copied, using
// server-side copying APIs.
type CopySrcOptions struct {
    Bucket, Object       string
    VersionID            string
    MatchETag            string
    NoMatchETag          string
    MatchModifiedSince   time.Time
    MatchUnmodifiedSince time.Time
    MatchRange           bool
    Start, End           int64
    Encryption           encrypt.ServerSide
}
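
// A ranged-source sketch (illustrative names): copy only the first MiB
// of the source object.
//
//    src := CopySrcOptions{
//        Bucket:     "src-bucket",
//        Object:     "src-object",
//        MatchRange: true,
//        Start:      0,
//        End:        1<<20 - 1, // ranges are inclusive
//    }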

// Marshal converts all the CopySrcOptions into their
// equivalent HTTP header representation
func (opts CopySrcOptions) Marshal(header http.Header) {
    // Set the source header
    header.Set("x-amz-copy-source", s3utils.EncodePath(opts.Bucket+"/"+opts.Object))
    if opts.VersionID != "" {
        header.Set("x-amz-copy-source", s3utils.EncodePath(opts.Bucket+"/"+opts.Object)+"?versionId="+opts.VersionID)
    }

    if opts.MatchETag != "" {
        header.Set("x-amz-copy-source-if-match", opts.MatchETag)
    }
    if opts.NoMatchETag != "" {
        header.Set("x-amz-copy-source-if-none-match", opts.NoMatchETag)
    }

    if !opts.MatchModifiedSince.IsZero() {
        header.Set("x-amz-copy-source-if-modified-since", opts.MatchModifiedSince.Format(http.TimeFormat))
    }
    if !opts.MatchUnmodifiedSince.IsZero() {
        header.Set("x-amz-copy-source-if-unmodified-since", opts.MatchUnmodifiedSince.Format(http.TimeFormat))
    }

    if opts.Encryption != nil {
        encrypt.SSECopy(opts.Encryption).Marshal(header)
    }
}
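
// Sketch of the headers produced (assumed values): a source
// {Bucket: "b", Object: "o"} with MatchETag "abc" yields
//
//    x-amz-copy-source: b/o
//    x-amz-copy-source-if-match: abc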

func (opts CopySrcOptions) validate() (err error) {
    // Input validation.
    if err = s3utils.CheckValidBucketName(opts.Bucket); err != nil {
        return err
    }
    if err = s3utils.CheckValidObjectName(opts.Object); err != nil {
        return err
    }
    if opts.Start > opts.End || opts.Start < 0 {
        return errInvalidArgument("start must be non-negative, and start must be at most end.")
    }
    return nil
}

// Low level implementation of CopyObject API, supports only up to 5GiB worth of copy.
func (c Client) copyObjectDo(ctx context.Context, srcBucket, srcObject, destBucket, destObject string,
    metadata map[string]string, srcOpts CopySrcOptions, dstOpts PutObjectOptions) (ObjectInfo, error) {
    // Build headers.
    headers := make(http.Header)

    // Set all the metadata headers.
    for k, v := range metadata {
        headers.Set(k, v)
    }
    if !dstOpts.Internal.ReplicationStatus.Empty() {
        headers.Set(amzBucketReplicationStatus, string(dstOpts.Internal.ReplicationStatus))
    }
    if !dstOpts.Internal.SourceMTime.IsZero() {
        headers.Set(minIOBucketSourceMTime, dstOpts.Internal.SourceMTime.Format(time.RFC3339Nano))
    }
    if dstOpts.Internal.SourceETag != "" {
        headers.Set(minIOBucketSourceETag, dstOpts.Internal.SourceETag)
    }
    if dstOpts.Internal.ReplicationRequest {
        headers.Set(minIOBucketReplicationRequest, "")
    }
    if !dstOpts.Internal.LegalholdTimestamp.IsZero() {
        headers.Set(minIOBucketReplicationObjectLegalHoldTimestamp, dstOpts.Internal.LegalholdTimestamp.Format(time.RFC3339Nano))
    }
    if !dstOpts.Internal.RetentionTimestamp.IsZero() {
        headers.Set(minIOBucketReplicationObjectRetentionTimestamp, dstOpts.Internal.RetentionTimestamp.Format(time.RFC3339Nano))
    }
    if !dstOpts.Internal.TaggingTimestamp.IsZero() {
        headers.Set(minIOBucketReplicationTaggingTimestamp, dstOpts.Internal.TaggingTimestamp.Format(time.RFC3339Nano))
    }

    if len(dstOpts.UserTags) != 0 {
        headers.Set(amzTaggingHeader, s3utils.TagEncode(dstOpts.UserTags))
    }

    reqMetadata := requestMetadata{
        bucketName:   destBucket,
        objectName:   destObject,
        customHeader: headers,
    }
    if dstOpts.Internal.SourceVersionID != "" {
        if _, err := uuid.Parse(dstOpts.Internal.SourceVersionID); err != nil {
            return ObjectInfo{}, errInvalidArgument(err.Error())
        }
        urlValues := make(url.Values)
        urlValues.Set("versionId", dstOpts.Internal.SourceVersionID)
        reqMetadata.queryValues = urlValues
    }

    // Set the source header
    headers.Set("x-amz-copy-source", s3utils.EncodePath(srcBucket+"/"+srcObject))
    if srcOpts.VersionID != "" {
        headers.Set("x-amz-copy-source", s3utils.EncodePath(srcBucket+"/"+srcObject)+"?versionId="+srcOpts.VersionID)
    }

    // Send the copy-object request.
    resp, err := c.executeMethod(ctx, http.MethodPut, reqMetadata)
    defer closeResponse(resp)
    if err != nil {
        return ObjectInfo{}, err
    }

    // Check if we got an error response.
    if resp.StatusCode != http.StatusOK {
        return ObjectInfo{}, httpRespToErrorResponse(resp, srcBucket, srcObject)
    }

    cpObjRes := copyObjectResult{}
    err = xmlDecoder(resp.Body, &cpObjRes)
    if err != nil {
        return ObjectInfo{}, err
    }

    objInfo := ObjectInfo{
        Key:          destObject,
        ETag:         strings.Trim(cpObjRes.ETag, "\""),
        LastModified: cpObjRes.LastModified,
    }
    return objInfo, nil
}

func (c Client) copyObjectPartDo(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, uploadID string,
    partID int, startOffset int64, length int64, metadata map[string]string) (p CompletePart, err error) {
    headers := make(http.Header)

    // Set source
    headers.Set("x-amz-copy-source", s3utils.EncodePath(srcBucket+"/"+srcObject))

    if startOffset < 0 {
        return p, errInvalidArgument("startOffset must be non-negative")
    }

    if length >= 0 {
        headers.Set("x-amz-copy-source-range", fmt.Sprintf("bytes=%d-%d", startOffset, startOffset+length-1))
    }

    for k, v := range metadata {
        headers.Set(k, v)
    }

    queryValues := make(url.Values)
    queryValues.Set("partNumber", strconv.Itoa(partID))
    queryValues.Set("uploadId", uploadID)

    resp, err := c.executeMethod(ctx, http.MethodPut, requestMetadata{
        bucketName:   destBucket,
        objectName:   destObject,
        customHeader: headers,
        queryValues:  queryValues,
    })
    defer closeResponse(resp)
    if err != nil {
        return
    }

    // Check if we got an error response.
    if resp.StatusCode != http.StatusOK {
        return p, httpRespToErrorResponse(resp, destBucket, destObject)
    }

    // Decode copy-part response on success.
    cpObjRes := copyObjectResult{}
    err = xmlDecoder(resp.Body, &cpObjRes)
    if err != nil {
        return p, err
    }
    p.PartNumber, p.ETag = partID, cpObjRes.ETag
    return p, nil
}

// uploadPartCopy - helper function to create a part in a multipart
// upload via an upload-part-copy request
// https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadUploadPartCopy.html
func (c Client) uploadPartCopy(ctx context.Context, bucket, object, uploadID string, partNumber int,
    headers http.Header) (p CompletePart, err error) {
    // Build query parameters
    urlValues := make(url.Values)
    urlValues.Set("partNumber", strconv.Itoa(partNumber))
    urlValues.Set("uploadId", uploadID)

    // Send upload-part-copy request
    resp, err := c.executeMethod(ctx, http.MethodPut, requestMetadata{
        bucketName:   bucket,
        objectName:   object,
        customHeader: headers,
        queryValues:  urlValues,
    })
    defer closeResponse(resp)
    if err != nil {
        return p, err
    }

    // Check if we got an error response.
    if resp.StatusCode != http.StatusOK {
        return p, httpRespToErrorResponse(resp, bucket, object)
    }

    // Decode copy-part response on success.
    cpObjRes := copyObjectResult{}
    err = xmlDecoder(resp.Body, &cpObjRes)
    if err != nil {
        return p, err
    }
    p.PartNumber, p.ETag = partNumber, cpObjRes.ETag
    return p, nil
}
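
// Schematically (per the AWS UploadPartCopy API linked above), the
// request produced looks like:
//
//    PUT /<object>?partNumber=<N>&uploadId=<ID> HTTP/1.1
//    x-amz-copy-source: <src-bucket>/<src-object>
//    x-amz-copy-source-range: bytes=<start>-<end>
//
// with the copy-source headers supplied by the caller via `headers`.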

// ComposeObject - creates an object using server-side copying
// of existing objects. It takes a list of source objects (with optional offsets)
// and concatenates them into a new object using only server-side copying
// operations. Optionally takes a progress reader hook for applications to
// look at current progress.
func (c Client) ComposeObject(ctx context.Context, dst CopyDestOptions, srcs ...CopySrcOptions) (UploadInfo, error) {
    if len(srcs) < 1 || len(srcs) > maxPartsCount {
        return UploadInfo{}, errInvalidArgument("There must be at least one and up to 10000 source objects.")
    }

    for _, src := range srcs {
        if err := src.validate(); err != nil {
            return UploadInfo{}, err
        }
    }

    if err := dst.validate(); err != nil {
        return UploadInfo{}, err
    }

    srcObjectInfos := make([]ObjectInfo, len(srcs))
    srcObjectSizes := make([]int64, len(srcs))
    var totalSize, totalParts int64
    var err error
    for i, src := range srcs {
        opts := StatObjectOptions{ServerSideEncryption: encrypt.SSE(src.Encryption), VersionID: src.VersionID}
        srcObjectInfos[i], err = c.statObject(ctx, src.Bucket, src.Object, opts)
        if err != nil {
            return UploadInfo{}, err
        }

        srcCopySize := srcObjectInfos[i].Size
        // Check if a segment is specified, and if so, is the
        // segment within object bounds?
        if src.MatchRange {
            // Since range is specified,
            //    0 <= src.Start <= src.End
            // so only invalid case to check is:
            if src.End >= srcCopySize || src.Start < 0 {
                return UploadInfo{}, errInvalidArgument(
                    fmt.Sprintf("CopySrcOptions %d has invalid segment-to-copy [%d, %d] (size is %d)",
                        i, src.Start, src.End, srcCopySize))
            }
            srcCopySize = src.End - src.Start + 1
        }

        // Only the last source may be less than `absMinPartSize`
        if srcCopySize < absMinPartSize && i < len(srcs)-1 {
            return UploadInfo{}, errInvalidArgument(
                fmt.Sprintf("CopySrcOptions %d is too small (%d) and it is not the last part", i, srcCopySize))
        }

        // Is data to copy too large?
        totalSize += srcCopySize
        if totalSize > maxMultipartPutObjectSize {
            return UploadInfo{}, errInvalidArgument(fmt.Sprintf("Cannot compose an object of size %d (> 5TiB)", totalSize))
        }

        // record source size
        srcObjectSizes[i] = srcCopySize

        // calculate parts needed for current source
        totalParts += partsRequired(srcCopySize)
        // Do we need more parts than we are allowed?
        if totalParts > maxPartsCount {
            return UploadInfo{}, errInvalidArgument(fmt.Sprintf(
                "Your proposed compose object requires more than %d parts", maxPartsCount))
        }
    }

    // Single source object case (i.e. when only one source is
    // involved, it is being copied wholly and is at most 5GiB in
    // size; empty files are also supported).
    if (totalParts == 1 && srcs[0].Start == -1 && totalSize <= maxPartSize) || (totalSize == 0) {
        return c.CopyObject(ctx, dst, srcs[0])
    }

    // Now, handle multipart-copy cases.

    // 1. Ensure that the object has not been changed while
    //    we are copying data.
    for i := range srcs {
        srcs[i].MatchETag = srcObjectInfos[i].ETag
    }

    // 2. Initiate a new multipart upload.

    // Set user-metadata on the destination object. If no
    // user-metadata is specified, and there is only one source,
    // (only) then metadata from source is copied.
    var userMeta map[string]string
    if dst.ReplaceMetadata {
        userMeta = dst.UserMetadata
    } else {
        userMeta = srcObjectInfos[0].UserMetadata
    }

    var userTags map[string]string
    if dst.ReplaceTags {
        userTags = dst.UserTags
    } else {
        userTags = srcObjectInfos[0].UserTags
    }

    uploadID, err := c.newUploadID(ctx, dst.Bucket, dst.Object, PutObjectOptions{
        ServerSideEncryption: dst.Encryption,
        UserMetadata:         userMeta,
        UserTags:             userTags,
        Mode:                 dst.Mode,
        RetainUntilDate:      dst.RetainUntilDate,
        LegalHold:            dst.LegalHold,
    })
    if err != nil {
        return UploadInfo{}, err
    }

    // 3. Perform copy part uploads
    objParts := []CompletePart{}
    partIndex := 1
    for i, src := range srcs {
        var h = make(http.Header)
        src.Marshal(h)
        if dst.Encryption != nil && dst.Encryption.Type() == encrypt.SSEC {
            dst.Encryption.Marshal(h)
        }

        // calculate start/end indices of parts after
        // splitting.
        startIdx, endIdx := calculateEvenSplits(srcObjectSizes[i], src)
        for j, start := range startIdx {
            end := endIdx[j]

            // Add (or reset) source range header for
            // upload part copy request.
            h.Set("x-amz-copy-source-range",
                fmt.Sprintf("bytes=%d-%d", start, end))

            // make upload-part-copy request
            complPart, err := c.uploadPartCopy(ctx, dst.Bucket,
                dst.Object, uploadID, partIndex, h)
            if err != nil {
                return UploadInfo{}, err
            }
            if dst.Progress != nil {
                io.CopyN(io.Discard, dst.Progress, end-start+1)
            }
            objParts = append(objParts, complPart)
            partIndex++
        }
    }

    // 4. Make final complete-multipart request.
    uploadInfo, err := c.completeMultipartUpload(ctx, dst.Bucket, dst.Object, uploadID,
        completeMultipartUpload{Parts: objParts}, PutObjectOptions{})
    if err != nil {
        return UploadInfo{}, err
    }

    uploadInfo.Size = totalSize
    return uploadInfo, nil
}
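
// A usage sketch (hypothetical client, bucket and object names):
// concatenate three parts into one object entirely server-side.
//
//    srcs := []CopySrcOptions{
//        {Bucket: "bkt", Object: "part-1"},
//        {Bucket: "bkt", Object: "part-2"},
//        {Bucket: "bkt", Object: "part-3"},
//    }
//    dst := CopyDestOptions{Bucket: "bkt", Object: "joined"}
//    ui, err := c.ComposeObject(context.Background(), dst, srcs...)
//    if err != nil {
//        // handle error
//    }
//    _ = ui.Size // total composed size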

// partsRequired computes the number of parts needed to copy `size`
// bytes, assuming a fixed per-part size of
// maxMultipartPutObjectSize / (maxPartsCount - 1) bytes; i.e. it
// returns ceiling(size / partSize).
func partsRequired(size int64) int64 {
    maxPartSize := maxMultipartPutObjectSize / (maxPartsCount - 1)
    r := size / int64(maxPartSize)
    if size%int64(maxPartSize) > 0 {
        r++
    }
    return r
}
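
// Worked example (assuming the package constants are 5 TiB for
// maxMultipartPutObjectSize and 10000 for maxPartsCount, per the error
// messages above): the per-part size is
//
//    5 * 1024^4 / 9999 = 549,810,794 bytes (~524 MiB)
//
// so a 1 GiB source (1,073,741,824 bytes) needs
// ceiling(1,073,741,824 / 549,810,794) = 2 parts.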

// calculateEvenSplits - computes splits for a source and returns
// start and end index slices. Splits happen evenly to be sure that no
// part is less than 5MiB, as that could fail the multipart request if
// it is not the last part.
func calculateEvenSplits(size int64, src CopySrcOptions) (startIndex, endIndex []int64) {
    if size == 0 {
        return
    }

    reqParts := partsRequired(size)
    startIndex = make([]int64, reqParts)
    endIndex = make([]int64, reqParts)
    // Compute number of required parts `k`, as:
    //
    //    k = ceiling(size / copyPartSize)
    //
    // Now, distribute the `size` bytes in the source into
    // k parts as evenly as possible:
    //
    //    r parts sized (q+1) bytes, and
    //    (k - r) parts sized q bytes, where
    //
    //    size = q * k + r (by simple division of size by k,
    //    so that 0 <= r < k)
    //
    start := src.Start
    if start == -1 {
        start = 0
    }
    quot, rem := size/reqParts, size%reqParts
    nextStart := start
    for j := int64(0); j < reqParts; j++ {
        curPartSize := quot
        if j < rem {
            curPartSize++
        }

        cStart := nextStart
        cEnd := cStart + curPartSize - 1
        nextStart = cEnd + 1

        startIndex[j], endIndex[j] = cStart, cEnd
    }
    return
}
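
// Worked example (illustrative numbers): splitting a 1 GiB source
// (size = 1,073,741,824, Start = 0) with reqParts = 2 gives
// quot = 536,870,912 and rem = 0, hence two equal inclusive ranges
//
//    [0, 536870911] and [536870912, 1073741823]
//
// each 512 MiB, comfortably above the 5 MiB minimum part size.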