api-bucket-replication.go 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. /*
  2. * MinIO Go Library for Amazon S3 Compatible Cloud Storage
  3. * Copyright 2020 MinIO, Inc.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. */
  17. package minio
  18. import (
  19. "bytes"
  20. "context"
  21. "encoding/json"
  22. "encoding/xml"
  23. "io/ioutil"
  24. "net/http"
  25. "net/url"
  26. "time"
  27. "github.com/google/uuid"
  28. "github.com/minio/minio-go/v7/pkg/replication"
  29. "github.com/minio/minio-go/v7/pkg/s3utils"
  30. )
  31. // RemoveBucketReplication removes a replication config on an existing bucket.
  32. func (c Client) RemoveBucketReplication(ctx context.Context, bucketName string) error {
  33. return c.removeBucketReplication(ctx, bucketName)
  34. }
  35. // SetBucketReplication sets a replication config on an existing bucket.
  36. func (c Client) SetBucketReplication(ctx context.Context, bucketName string, cfg replication.Config) error {
  37. // Input validation.
  38. if err := s3utils.CheckValidBucketName(bucketName); err != nil {
  39. return err
  40. }
  41. // If replication is empty then delete it.
  42. if cfg.Empty() {
  43. return c.removeBucketReplication(ctx, bucketName)
  44. }
  45. // Save the updated replication.
  46. return c.putBucketReplication(ctx, bucketName, cfg)
  47. }
  48. // Saves a new bucket replication.
  49. func (c Client) putBucketReplication(ctx context.Context, bucketName string, cfg replication.Config) error {
  50. // Get resources properly escaped and lined up before
  51. // using them in http request.
  52. urlValues := make(url.Values)
  53. urlValues.Set("replication", "")
  54. replication, err := xml.Marshal(cfg)
  55. if err != nil {
  56. return err
  57. }
  58. reqMetadata := requestMetadata{
  59. bucketName: bucketName,
  60. queryValues: urlValues,
  61. contentBody: bytes.NewReader(replication),
  62. contentLength: int64(len(replication)),
  63. contentMD5Base64: sumMD5Base64(replication),
  64. }
  65. // Execute PUT to upload a new bucket replication config.
  66. resp, err := c.executeMethod(ctx, http.MethodPut, reqMetadata)
  67. defer closeResponse(resp)
  68. if err != nil {
  69. return err
  70. }
  71. if resp.StatusCode != http.StatusOK {
  72. return httpRespToErrorResponse(resp, bucketName, "")
  73. }
  74. return nil
  75. }
  76. // Remove replication from a bucket.
  77. func (c Client) removeBucketReplication(ctx context.Context, bucketName string) error {
  78. // Get resources properly escaped and lined up before
  79. // using them in http request.
  80. urlValues := make(url.Values)
  81. urlValues.Set("replication", "")
  82. // Execute DELETE on objectName.
  83. resp, err := c.executeMethod(ctx, http.MethodDelete, requestMetadata{
  84. bucketName: bucketName,
  85. queryValues: urlValues,
  86. contentSHA256Hex: emptySHA256Hex,
  87. })
  88. defer closeResponse(resp)
  89. if err != nil {
  90. return err
  91. }
  92. return nil
  93. }
  94. // GetBucketReplication fetches bucket replication configuration.If config is not
  95. // found, returns empty config with nil error.
  96. func (c Client) GetBucketReplication(ctx context.Context, bucketName string) (cfg replication.Config, err error) {
  97. // Input validation.
  98. if err := s3utils.CheckValidBucketName(bucketName); err != nil {
  99. return cfg, err
  100. }
  101. bucketReplicationCfg, err := c.getBucketReplication(ctx, bucketName)
  102. if err != nil {
  103. errResponse := ToErrorResponse(err)
  104. if errResponse.Code == "ReplicationConfigurationNotFoundError" {
  105. return cfg, nil
  106. }
  107. return cfg, err
  108. }
  109. return bucketReplicationCfg, nil
  110. }
  111. // Request server for current bucket replication config.
  112. func (c Client) getBucketReplication(ctx context.Context, bucketName string) (cfg replication.Config, err error) {
  113. // Get resources properly escaped and lined up before
  114. // using them in http request.
  115. urlValues := make(url.Values)
  116. urlValues.Set("replication", "")
  117. // Execute GET on bucket to get replication config.
  118. resp, err := c.executeMethod(ctx, http.MethodGet, requestMetadata{
  119. bucketName: bucketName,
  120. queryValues: urlValues,
  121. })
  122. defer closeResponse(resp)
  123. if err != nil {
  124. return cfg, err
  125. }
  126. if resp.StatusCode != http.StatusOK {
  127. return cfg, httpRespToErrorResponse(resp, bucketName, "")
  128. }
  129. if err = xmlDecoder(resp.Body, &cfg); err != nil {
  130. return cfg, err
  131. }
  132. return cfg, nil
  133. }
  134. // GetBucketReplicationMetrics fetches bucket replication status metrics
  135. func (c Client) GetBucketReplicationMetrics(ctx context.Context, bucketName string) (s replication.Metrics, err error) {
  136. // Input validation.
  137. if err := s3utils.CheckValidBucketName(bucketName); err != nil {
  138. return s, err
  139. }
  140. // Get resources properly escaped and lined up before
  141. // using them in http request.
  142. urlValues := make(url.Values)
  143. urlValues.Set("replication-metrics", "")
  144. // Execute GET on bucket to get replication config.
  145. resp, err := c.executeMethod(ctx, http.MethodGet, requestMetadata{
  146. bucketName: bucketName,
  147. queryValues: urlValues,
  148. })
  149. defer closeResponse(resp)
  150. if err != nil {
  151. return s, err
  152. }
  153. if resp.StatusCode != http.StatusOK {
  154. return s, httpRespToErrorResponse(resp, bucketName, "")
  155. }
  156. respBytes, err := ioutil.ReadAll(resp.Body)
  157. if err != nil {
  158. return s, err
  159. }
  160. if err := json.Unmarshal(respBytes, &s); err != nil {
  161. return s, err
  162. }
  163. return s, nil
  164. }
  165. // mustGetUUID - get a random UUID.
  166. func mustGetUUID() string {
  167. u, err := uuid.NewRandom()
  168. if err != nil {
  169. return ""
  170. }
  171. return u.String()
  172. }
  173. // ResetBucketReplication kicks off replication of previously replicated objects if ExistingObjectReplication
  174. // is enabled in the replication config
  175. func (c Client) ResetBucketReplication(ctx context.Context, bucketName string, olderThan time.Duration) (rID string, err error) {
  176. rID = mustGetUUID()
  177. _, err = c.resetBucketReplicationOnTarget(ctx, bucketName, olderThan, "", rID)
  178. if err != nil {
  179. return rID, err
  180. }
  181. return rID, nil
  182. }
  183. // ResetBucketReplication kicks off replication of previously replicated objects if ExistingObjectReplication
  184. // is enabled in the replication config
  185. func (c Client) ResetBucketReplicationOnTarget(ctx context.Context, bucketName string, olderThan time.Duration, tgtArn string) (rinfo replication.ResyncTargetsInfo, err error) {
  186. rID := mustGetUUID()
  187. return c.resetBucketReplicationOnTarget(ctx, bucketName, olderThan, tgtArn, rID)
  188. }
  189. // ResetBucketReplication kicks off replication of previously replicated objects if ExistingObjectReplication
  190. // is enabled in the replication config
  191. func (c Client) resetBucketReplicationOnTarget(ctx context.Context, bucketName string, olderThan time.Duration, tgtArn string, resetID string) (rinfo replication.ResyncTargetsInfo, err error) {
  192. // Input validation.
  193. if err = s3utils.CheckValidBucketName(bucketName); err != nil {
  194. return
  195. }
  196. // Get resources properly escaped and lined up before
  197. // using them in http request.
  198. urlValues := make(url.Values)
  199. urlValues.Set("replication-reset", "")
  200. if olderThan > 0 {
  201. urlValues.Set("older-than", olderThan.String())
  202. }
  203. if tgtArn != "" {
  204. urlValues.Set("arn", tgtArn)
  205. }
  206. urlValues.Set("reset-id", resetID)
  207. // Execute GET on bucket to get replication config.
  208. resp, err := c.executeMethod(ctx, http.MethodPut, requestMetadata{
  209. bucketName: bucketName,
  210. queryValues: urlValues,
  211. })
  212. defer closeResponse(resp)
  213. if err != nil {
  214. return rinfo, err
  215. }
  216. if resp.StatusCode != http.StatusOK {
  217. return rinfo, httpRespToErrorResponse(resp, bucketName, "")
  218. }
  219. respBytes, err := ioutil.ReadAll(resp.Body)
  220. if err != nil {
  221. return rinfo, err
  222. }
  223. if err := json.Unmarshal(respBytes, &rinfo); err != nil {
  224. return rinfo, err
  225. }
  226. return rinfo, nil
  227. }