123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681 |
- package minio
- import (
- "context"
- "errors"
- "fmt"
- "io"
- "net/http"
- "net/url"
- "sync"
- "github.com/minio/minio-go/v7/pkg/s3utils"
- )
- func (c Client) GetObject(ctx context.Context, bucketName, objectName string, opts GetObjectOptions) (*Object, error) {
-
- if err := s3utils.CheckValidBucketName(bucketName); err != nil {
- return nil, err
- }
- if err := s3utils.CheckValidObjectName(objectName); err != nil {
- return nil, err
- }
-
- var snowball bool
- if location, ok := c.bucketLocCache.Get(bucketName); ok {
- if location == "snowball" {
- snowball = true
- }
- }
- var (
- err error
- httpReader io.ReadCloser
- objectInfo ObjectInfo
- totalRead int
- )
-
- reqCh := make(chan getRequest)
-
- resCh := make(chan getResponse)
-
- doneCh := make(chan struct{})
-
- go func() {
- defer close(reqCh)
- defer close(resCh)
-
- var etag string
-
- for {
- select {
-
- case <-doneCh:
-
-
- if httpReader != nil {
- httpReader.Close()
- }
- return
-
- case req := <-reqCh:
-
- if req.isFirstReq {
-
- if req.isReadOp {
-
- if req.isReadAt {
-
-
-
-
- opts.SetRange(req.Offset, req.Offset+int64(len(req.Buffer))-1)
- } else if req.Offset > 0 {
- opts.SetRange(req.Offset, 0)
- }
- httpReader, objectInfo, _, err = c.getObject(ctx, bucketName, objectName, opts)
- if err != nil {
- resCh <- getResponse{Error: err}
- return
- }
- etag = objectInfo.ETag
-
-
- size, err := readFull(httpReader, req.Buffer)
- totalRead += size
- if size > 0 && err == io.ErrUnexpectedEOF {
- if int64(size) < objectInfo.Size {
-
-
-
-
- err = io.ErrUnexpectedEOF
- } else {
-
-
- err = io.EOF
- }
- } else if size == 0 && err == io.EOF && objectInfo.Size > 0 {
-
-
-
-
- err = io.ErrUnexpectedEOF
- }
-
- resCh <- getResponse{
- objectInfo: objectInfo,
- Size: size,
- Error: err,
- didRead: true,
- }
- } else {
-
-
-
- delete(opts.headers, "Range")
- objectInfo, err = c.statObject(ctx, bucketName, objectName, StatObjectOptions(opts))
- if err != nil {
- resCh <- getResponse{
- Error: err,
- }
-
- return
- }
- etag = objectInfo.ETag
-
- resCh <- getResponse{
- objectInfo: objectInfo,
- }
- }
- } else if req.settingObjectInfo {
-
- delete(opts.headers, "Range")
-
-
-
- if etag != "" && !snowball {
- opts.SetMatchETag(etag)
- }
- objectInfo, err := c.statObject(ctx, bucketName, objectName, StatObjectOptions(opts))
- if err != nil {
- resCh <- getResponse{
- Error: err,
- }
-
- return
- }
-
- resCh <- getResponse{
- objectInfo: objectInfo,
- }
- } else {
-
-
-
-
-
-
- if req.DidOffsetChange || !req.beenRead {
-
-
-
- if etag != "" && !snowball {
- opts.SetMatchETag(etag)
- }
- if httpReader != nil {
-
- httpReader.Close()
- }
-
- if req.isReadAt {
-
- opts.SetRange(req.Offset, req.Offset+int64(len(req.Buffer))-1)
- } else if req.Offset > 0 {
- opts.SetRange(req.Offset, 0)
- }
- httpReader, objectInfo, _, err = c.getObject(ctx, bucketName, objectName, opts)
- if err != nil {
- resCh <- getResponse{
- Error: err,
- }
- return
- }
- totalRead = 0
- }
-
-
- size, err := readFull(httpReader, req.Buffer)
- totalRead += size
- if size > 0 && err == io.ErrUnexpectedEOF {
- if int64(totalRead) < objectInfo.Size {
-
-
-
-
- err = io.ErrUnexpectedEOF
- } else {
-
-
- err = io.EOF
- }
- } else if size == 0 && err == io.EOF && objectInfo.Size > 0 {
-
-
-
-
- err = io.ErrUnexpectedEOF
- }
-
- resCh <- getResponse{
- Size: size,
- Error: err,
- didRead: true,
- objectInfo: objectInfo,
- }
- }
- }
- }
- }()
-
- return newObject(reqCh, resCh, doneCh), nil
- }
- type getRequest struct {
- Buffer []byte
- Offset int64
- DidOffsetChange bool
- beenRead bool
- isReadAt bool
- isReadOp bool
- isFirstReq bool
- settingObjectInfo bool
- }
- type getResponse struct {
- Size int
- Error error
- didRead bool
- objectInfo ObjectInfo
- }
- type Object struct {
-
- mutex *sync.Mutex
-
- reqCh chan<- getRequest
- resCh <-chan getResponse
- doneCh chan<- struct{}
- currOffset int64
- objectInfo ObjectInfo
-
- seekData bool
-
- isClosed bool
-
- isStarted bool
-
- prevErr error
-
- beenRead bool
-
- objectInfoSet bool
- }
- func (o *Object) doGetRequest(request getRequest) (getResponse, error) {
- o.reqCh <- request
- response := <-o.resCh
-
- if response.Error != nil {
- return response, response.Error
- }
-
- if !o.isStarted {
-
- o.isStarted = true
- }
-
-
- if !o.objectInfoSet && !request.isReadAt {
- o.objectInfo = response.objectInfo
- o.objectInfoSet = true
- }
-
- if !o.beenRead {
- o.beenRead = response.didRead
- }
-
- o.seekData = false
- return response, nil
- }
- func (o *Object) setOffset(bytesRead int64) error {
-
- o.currOffset += bytesRead
- if o.objectInfo.Size > -1 && o.currOffset >= o.objectInfo.Size {
- return io.EOF
- }
- return nil
- }
- func (o *Object) Read(b []byte) (n int, err error) {
- if o == nil {
- return 0, errInvalidArgument("Object is nil")
- }
-
- o.mutex.Lock()
- defer o.mutex.Unlock()
-
- if o.prevErr != nil || o.isClosed {
- return 0, o.prevErr
- }
-
- readReq := getRequest{
- isReadOp: true,
- beenRead: o.beenRead,
- Buffer: b,
- }
-
- if !o.isStarted {
- readReq.isFirstReq = true
- }
-
- readReq.DidOffsetChange = o.seekData
- readReq.Offset = o.currOffset
-
- response, err := o.doGetRequest(readReq)
- if err != nil && err != io.EOF {
-
- o.prevErr = err
- return response.Size, err
- }
-
- bytesRead := int64(response.Size)
-
- oerr := o.setOffset(bytesRead)
- if oerr != nil {
-
- o.prevErr = oerr
- return response.Size, oerr
- }
-
- return response.Size, err
- }
- func (o *Object) Stat() (ObjectInfo, error) {
- if o == nil {
- return ObjectInfo{}, errInvalidArgument("Object is nil")
- }
-
- o.mutex.Lock()
- defer o.mutex.Unlock()
- if o.prevErr != nil && o.prevErr != io.EOF || o.isClosed {
- return ObjectInfo{}, o.prevErr
- }
-
- if !o.isStarted || !o.objectInfoSet {
-
- _, err := o.doGetRequest(getRequest{
- isFirstReq: !o.isStarted,
- settingObjectInfo: !o.objectInfoSet,
- })
- if err != nil {
- o.prevErr = err
- return ObjectInfo{}, err
- }
- }
- return o.objectInfo, nil
- }
- func (o *Object) ReadAt(b []byte, offset int64) (n int, err error) {
- if o == nil {
- return 0, errInvalidArgument("Object is nil")
- }
-
- o.mutex.Lock()
- defer o.mutex.Unlock()
-
- if o.prevErr != nil && o.prevErr != io.EOF || o.isClosed {
- return 0, o.prevErr
- }
-
- o.currOffset = offset
-
- if o.objectInfoSet {
-
-
- if (o.objectInfo.Size > -1 && offset >= o.objectInfo.Size) || offset < 0 {
- return 0, io.EOF
- }
- }
-
- readAtReq := getRequest{
- isReadOp: true,
- isReadAt: true,
- DidOffsetChange: true,
- beenRead: o.beenRead,
- Offset: offset,
- Buffer: b,
- }
-
- if !o.isStarted {
- readAtReq.isFirstReq = true
- }
-
- response, err := o.doGetRequest(readAtReq)
- if err != nil && err != io.EOF {
-
- o.prevErr = err
- return response.Size, err
- }
-
- bytesRead := int64(response.Size)
-
-
- if !o.objectInfoSet {
-
- o.currOffset += bytesRead
- } else {
-
-
-
- oerr := o.setOffset(bytesRead)
- if oerr != nil {
- o.prevErr = oerr
- return response.Size, oerr
- }
- }
- return response.Size, err
- }
- func (o *Object) Seek(offset int64, whence int) (n int64, err error) {
- if o == nil {
- return 0, errInvalidArgument("Object is nil")
- }
-
- o.mutex.Lock()
- defer o.mutex.Unlock()
-
- if o.prevErr != nil && o.prevErr != io.EOF {
- return 0, o.prevErr
- }
-
- if offset < 0 && whence != 2 {
- return 0, errInvalidArgument(fmt.Sprintf("Negative position not allowed for %d", whence))
- }
-
-
- if !o.isStarted || !o.objectInfoSet {
-
- seekReq := getRequest{
- isReadOp: false,
- Offset: offset,
- isFirstReq: true,
- }
-
- _, err := o.doGetRequest(seekReq)
- if err != nil {
-
- o.prevErr = err
- return 0, err
- }
- }
-
- switch whence {
- default:
- return 0, errInvalidArgument(fmt.Sprintf("Invalid whence %d", whence))
- case 0:
- if o.objectInfo.Size > -1 && offset > o.objectInfo.Size {
- return 0, io.EOF
- }
- o.currOffset = offset
- case 1:
- if o.objectInfo.Size > -1 && o.currOffset+offset > o.objectInfo.Size {
- return 0, io.EOF
- }
- o.currOffset += offset
- case 2:
-
- if o.objectInfo.Size < 0 {
- return 0, errInvalidArgument("Whence END is not supported when the object size is unknown")
- }
-
-
-
- if offset > 0 {
- return 0, io.EOF
- }
-
- if o.objectInfo.Size+offset < 0 {
- return 0, errInvalidArgument(fmt.Sprintf("Seeking at negative offset not allowed for %d", whence))
- }
- o.currOffset = o.objectInfo.Size + offset
- }
-
-
- if o.prevErr == io.EOF {
- o.prevErr = nil
- }
-
- o.seekData = true
-
- return o.currOffset, nil
- }
- func (o *Object) Close() (err error) {
- if o == nil {
- return errInvalidArgument("Object is nil")
- }
-
- o.mutex.Lock()
- defer o.mutex.Unlock()
-
- if o.isClosed {
- return o.prevErr
- }
-
- close(o.doneCh)
-
- errMsg := "Object is already closed. Bad file descriptor."
- o.prevErr = errors.New(errMsg)
-
- o.isClosed = true
- return nil
- }
- func newObject(reqCh chan<- getRequest, resCh <-chan getResponse, doneCh chan<- struct{}) *Object {
- return &Object{
- mutex: &sync.Mutex{},
- reqCh: reqCh,
- resCh: resCh,
- doneCh: doneCh,
- }
- }
- func (c Client) getObject(ctx context.Context, bucketName, objectName string, opts GetObjectOptions) (io.ReadCloser, ObjectInfo, http.Header, error) {
-
- if err := s3utils.CheckValidBucketName(bucketName); err != nil {
- return nil, ObjectInfo{}, nil, err
- }
- if err := s3utils.CheckValidObjectName(objectName); err != nil {
- return nil, ObjectInfo{}, nil, err
- }
- urlValues := make(url.Values)
- if opts.VersionID != "" {
- urlValues.Set("versionId", opts.VersionID)
- }
-
- resp, err := c.executeMethod(ctx, http.MethodGet, requestMetadata{
- bucketName: bucketName,
- objectName: objectName,
- queryValues: urlValues,
- customHeader: opts.Header(),
- contentSHA256Hex: emptySHA256Hex,
- })
- if err != nil {
- return nil, ObjectInfo{}, nil, err
- }
- if resp != nil {
- if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent {
- return nil, ObjectInfo{}, nil, httpRespToErrorResponse(resp, bucketName, objectName)
- }
- }
- objectStat, err := ToObjectInfo(bucketName, objectName, resp.Header)
- if err != nil {
- closeResponse(resp)
- return nil, ObjectInfo{}, nil, err
- }
-
- return resp.Body, objectStat, resp.Header, nil
- }
|