2022-10-13 20:04:56 +08:00
|
|
|
//go:build !plan9 && !solaris && !js
|
|
|
|
// +build !plan9,!solaris,!js
|
|
|
|
|
|
|
|
package oracleobjectstorage
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"encoding/base64"
|
|
|
|
"encoding/hex"
|
|
|
|
"fmt"
|
|
|
|
"io"
|
|
|
|
"net/http"
|
|
|
|
"regexp"
|
|
|
|
"strconv"
|
|
|
|
"strings"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/ncw/swift/v2"
|
|
|
|
"github.com/oracle/oci-go-sdk/v65/common"
|
|
|
|
"github.com/oracle/oci-go-sdk/v65/objectstorage"
|
|
|
|
"github.com/oracle/oci-go-sdk/v65/objectstorage/transfer"
|
|
|
|
"github.com/rclone/rclone/fs"
|
|
|
|
"github.com/rclone/rclone/fs/hash"
|
|
|
|
"github.com/rclone/rclone/lib/atexit"
|
|
|
|
)
|
|
|
|
|
|
|
|
// ------------------------------------------------------------
|
|
|
|
// Object Interface Implementation
|
|
|
|
// ------------------------------------------------------------
|
|
|
|
|
|
|
|
const (
|
|
|
|
metaMtime = "mtime" // the meta key to store mtime in - e.g. X-Amz-Meta-Mtime
|
|
|
|
metaMD5Hash = "md5chksum" // the meta key to store md5hash in
|
|
|
|
// StandardTier object storage tier
|
|
|
|
ociMetaPrefix = "opc-meta-"
|
|
|
|
)
|
|
|
|
|
|
|
|
var archive = "archive"
|
|
|
|
var infrequentAccess = "infrequentaccess"
|
|
|
|
var standard = "standard"
|
|
|
|
|
|
|
|
var storageTierMap = map[string]*string{
|
|
|
|
archive: &archive,
|
|
|
|
infrequentAccess: &infrequentAccess,
|
|
|
|
standard: &standard,
|
|
|
|
}
|
|
|
|
|
|
|
|
var matchMd5 = regexp.MustCompile(`^[0-9a-f]{32}$`)
|
|
|
|
|
|
|
|
// Object describes a oci bucket object
|
|
|
|
type Object struct {
|
|
|
|
fs *Fs // what this object is part of
|
|
|
|
remote string // The remote path
|
|
|
|
md5 string // MD5 hash if known
|
|
|
|
bytes int64 // Size of the object
|
|
|
|
lastModified time.Time // The modified time of the object if known
|
|
|
|
meta map[string]string // The object metadata if known - may be nil
|
|
|
|
mimeType string // Content-Type of the object
|
|
|
|
|
|
|
|
// Metadata as pointers to strings as they often won't be present
|
|
|
|
storageTier *string // e.g. Standard
|
|
|
|
}
|
|
|
|
|
|
|
|
// split returns bucket and bucketPath from the object
|
|
|
|
func (o *Object) split() (bucket, bucketPath string) {
|
|
|
|
return o.fs.split(o.remote)
|
|
|
|
}
|
|
|
|
|
|
|
|
// readMetaData gets the metadata if it hasn't already been fetched
|
|
|
|
func (o *Object) readMetaData(ctx context.Context) (err error) {
|
|
|
|
fs.Debugf(o, "trying to read metadata %v", o.remote)
|
|
|
|
if o.meta != nil {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
info, err := o.headObject(ctx)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
return o.decodeMetaDataHead(info)
|
|
|
|
}
|
|
|
|
|
|
|
|
// headObject gets the metadata from the object unconditionally
|
|
|
|
func (o *Object) headObject(ctx context.Context) (info *objectstorage.HeadObjectResponse, err error) {
|
|
|
|
bucketName, objectPath := o.split()
|
|
|
|
req := objectstorage.HeadObjectRequest{
|
|
|
|
NamespaceName: common.String(o.fs.opt.Namespace),
|
|
|
|
BucketName: common.String(bucketName),
|
|
|
|
ObjectName: common.String(objectPath),
|
|
|
|
}
|
2023-02-17 04:07:34 +08:00
|
|
|
useBYOKHeadObject(o.fs, &req)
|
2022-10-13 20:04:56 +08:00
|
|
|
var response objectstorage.HeadObjectResponse
|
|
|
|
err = o.fs.pacer.Call(func() (bool, error) {
|
|
|
|
var err error
|
|
|
|
response, err = o.fs.srv.HeadObject(ctx, req)
|
|
|
|
return shouldRetry(ctx, response.HTTPResponse(), err)
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
if svcErr, ok := err.(common.ServiceError); ok {
|
|
|
|
if svcErr.GetHTTPStatusCode() == http.StatusNotFound {
|
|
|
|
return nil, fs.ErrorObjectNotFound
|
|
|
|
}
|
|
|
|
}
|
2023-02-17 04:07:34 +08:00
|
|
|
fs.Errorf(o, "Failed to head object: %v", err)
|
2022-10-13 20:04:56 +08:00
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
o.fs.cache.MarkOK(bucketName)
|
|
|
|
return &response, err
|
|
|
|
}
|
|
|
|
|
|
|
|
func (o *Object) decodeMetaDataHead(info *objectstorage.HeadObjectResponse) (err error) {
|
|
|
|
return o.setMetaData(
|
|
|
|
info.ContentLength,
|
|
|
|
info.ContentMd5,
|
|
|
|
info.ContentType,
|
|
|
|
info.LastModified,
|
|
|
|
info.StorageTier,
|
|
|
|
info.OpcMeta)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (o *Object) decodeMetaDataObject(info *objectstorage.GetObjectResponse) (err error) {
|
|
|
|
return o.setMetaData(
|
|
|
|
info.ContentLength,
|
|
|
|
info.ContentMd5,
|
|
|
|
info.ContentType,
|
|
|
|
info.LastModified,
|
|
|
|
info.StorageTier,
|
|
|
|
info.OpcMeta)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (o *Object) setMetaData(
|
|
|
|
contentLength *int64,
|
|
|
|
contentMd5 *string,
|
|
|
|
contentType *string,
|
|
|
|
lastModified *common.SDKTime,
|
|
|
|
storageTier interface{},
|
|
|
|
meta map[string]string) error {
|
|
|
|
|
|
|
|
if contentLength != nil {
|
|
|
|
o.bytes = *contentLength
|
|
|
|
}
|
|
|
|
if contentMd5 != nil {
|
|
|
|
md5, err := o.base64ToMd5(*contentMd5)
|
|
|
|
if err == nil {
|
|
|
|
o.md5 = md5
|
|
|
|
}
|
|
|
|
}
|
|
|
|
o.meta = meta
|
|
|
|
if o.meta == nil {
|
|
|
|
o.meta = map[string]string{}
|
|
|
|
}
|
|
|
|
// Read MD5 from metadata if present
|
|
|
|
if md5sumBase64, ok := o.meta[metaMD5Hash]; ok {
|
|
|
|
md5, err := o.base64ToMd5(md5sumBase64)
|
|
|
|
if err != nil {
|
|
|
|
o.md5 = md5
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if lastModified == nil {
|
|
|
|
o.lastModified = time.Now()
|
|
|
|
fs.Logf(o, "Failed to read last modified")
|
|
|
|
} else {
|
|
|
|
o.lastModified = lastModified.Time
|
|
|
|
}
|
|
|
|
if contentType != nil {
|
|
|
|
o.mimeType = *contentType
|
|
|
|
}
|
|
|
|
if storageTier == nil || storageTier == "" {
|
|
|
|
o.storageTier = storageTierMap[standard]
|
|
|
|
} else {
|
|
|
|
tier := strings.ToLower(fmt.Sprintf("%v", storageTier))
|
|
|
|
o.storageTier = storageTierMap[tier]
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (o *Object) base64ToMd5(md5sumBase64 string) (md5 string, err error) {
|
|
|
|
md5sumBytes, err := base64.StdEncoding.DecodeString(md5sumBase64)
|
|
|
|
if err != nil {
|
|
|
|
fs.Debugf(o, "Failed to read md5sum from metadata %q: %v", md5sumBase64, err)
|
|
|
|
return "", err
|
|
|
|
} else if len(md5sumBytes) != 16 {
|
|
|
|
fs.Debugf(o, "failed to read md5sum from metadata %q: wrong length", md5sumBase64)
|
|
|
|
return "", fmt.Errorf("failed to read md5sum from metadata %q: wrong length", md5sumBase64)
|
|
|
|
}
|
|
|
|
return hex.EncodeToString(md5sumBytes), nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Fs returns the parent Fs
|
|
|
|
func (o *Object) Fs() fs.Info {
|
|
|
|
return o.fs
|
|
|
|
}
|
|
|
|
|
|
|
|
// Remote returns the remote path
|
|
|
|
func (o *Object) Remote() string {
|
|
|
|
return o.remote
|
|
|
|
}
|
|
|
|
|
|
|
|
// Return a string version
|
|
|
|
func (o *Object) String() string {
|
|
|
|
if o == nil {
|
|
|
|
return "<nil>"
|
|
|
|
}
|
|
|
|
return o.remote
|
|
|
|
}
|
|
|
|
|
|
|
|
// Size returns the size of an object in bytes
|
|
|
|
func (o *Object) Size() int64 {
|
|
|
|
return o.bytes
|
|
|
|
}
|
|
|
|
|
|
|
|
// GetTier returns storage class as string
|
|
|
|
func (o *Object) GetTier() string {
|
|
|
|
if o.storageTier == nil || *o.storageTier == "" {
|
|
|
|
return standard
|
|
|
|
}
|
|
|
|
return *o.storageTier
|
|
|
|
}
|
|
|
|
|
|
|
|
// SetTier performs changing storage class
|
|
|
|
func (o *Object) SetTier(tier string) (err error) {
|
|
|
|
ctx := context.TODO()
|
|
|
|
tier = strings.ToLower(tier)
|
|
|
|
bucketName, bucketPath := o.split()
|
|
|
|
tierEnum, ok := objectstorage.GetMappingStorageTierEnum(tier)
|
|
|
|
if !ok {
|
|
|
|
return fmt.Errorf("not a valid storage tier %v ", tier)
|
|
|
|
}
|
|
|
|
|
|
|
|
req := objectstorage.UpdateObjectStorageTierRequest{
|
|
|
|
NamespaceName: common.String(o.fs.opt.Namespace),
|
|
|
|
BucketName: common.String(bucketName),
|
|
|
|
UpdateObjectStorageTierDetails: objectstorage.UpdateObjectStorageTierDetails{
|
|
|
|
ObjectName: common.String(bucketPath),
|
|
|
|
StorageTier: tierEnum,
|
|
|
|
},
|
|
|
|
}
|
|
|
|
_, err = o.fs.srv.UpdateObjectStorageTier(ctx, req)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
o.storageTier = storageTierMap[tier]
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// MimeType of an Object if known, "" otherwise
|
|
|
|
func (o *Object) MimeType(ctx context.Context) string {
|
|
|
|
err := o.readMetaData(ctx)
|
|
|
|
if err != nil {
|
|
|
|
fs.Logf(o, "Failed to read metadata: %v", err)
|
|
|
|
return ""
|
|
|
|
}
|
|
|
|
return o.mimeType
|
|
|
|
}
|
|
|
|
|
|
|
|
// Hash returns the MD5 of an object returning a lowercase hex string
|
|
|
|
func (o *Object) Hash(ctx context.Context, t hash.Type) (string, error) {
|
|
|
|
if t != hash.MD5 {
|
|
|
|
return "", hash.ErrUnsupported
|
|
|
|
}
|
|
|
|
// Convert base64 encoded md5 into lower case hex
|
|
|
|
if o.md5 == "" {
|
|
|
|
err := o.readMetaData(ctx)
|
|
|
|
if err != nil {
|
|
|
|
return "", err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return o.md5, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// ModTime returns the modification time of the object
|
|
|
|
//
|
|
|
|
// It attempts to read the objects mtime and if that isn't present the
|
|
|
|
// LastModified returned to the http headers
|
|
|
|
func (o *Object) ModTime(ctx context.Context) (result time.Time) {
|
|
|
|
if o.fs.ci.UseServerModTime {
|
|
|
|
return o.lastModified
|
|
|
|
}
|
|
|
|
err := o.readMetaData(ctx)
|
|
|
|
if err != nil {
|
|
|
|
fs.Logf(o, "Failed to read metadata: %v", err)
|
|
|
|
return time.Now()
|
|
|
|
}
|
|
|
|
// read mtime out of metadata if available
|
|
|
|
d, ok := o.meta[metaMtime]
|
|
|
|
if !ok || d == "" {
|
|
|
|
return o.lastModified
|
|
|
|
}
|
|
|
|
modTime, err := swift.FloatStringToTime(d)
|
|
|
|
if err != nil {
|
|
|
|
fs.Logf(o, "Failed to read mtime from object: %v", err)
|
|
|
|
return o.lastModified
|
|
|
|
}
|
|
|
|
return modTime
|
|
|
|
}
|
|
|
|
|
|
|
|
// SetModTime sets the modification time of the local fs object
|
|
|
|
func (o *Object) SetModTime(ctx context.Context, modTime time.Time) error {
|
|
|
|
err := o.readMetaData(ctx)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
o.meta[metaMtime] = swift.TimeToFloatString(modTime)
|
|
|
|
_, err = o.fs.Copy(ctx, o, o.remote)
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// Storable returns if this object is storable
|
|
|
|
func (o *Object) Storable() bool {
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
|
|
|
|
// Remove an object
|
|
|
|
func (o *Object) Remove(ctx context.Context) error {
|
|
|
|
bucketName, bucketPath := o.split()
|
|
|
|
req := objectstorage.DeleteObjectRequest{
|
|
|
|
NamespaceName: common.String(o.fs.opt.Namespace),
|
|
|
|
BucketName: common.String(bucketName),
|
|
|
|
ObjectName: common.String(bucketPath),
|
|
|
|
}
|
|
|
|
err := o.fs.pacer.Call(func() (bool, error) {
|
|
|
|
resp, err := o.fs.srv.DeleteObject(ctx, req)
|
|
|
|
return shouldRetry(ctx, resp.HTTPResponse(), err)
|
|
|
|
})
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// Open object file
|
|
|
|
func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (io.ReadCloser, error) {
|
|
|
|
bucketName, bucketPath := o.split()
|
|
|
|
req := objectstorage.GetObjectRequest{
|
|
|
|
NamespaceName: common.String(o.fs.opt.Namespace),
|
|
|
|
BucketName: common.String(bucketName),
|
|
|
|
ObjectName: common.String(bucketPath),
|
|
|
|
}
|
|
|
|
o.applyGetObjectOptions(&req, options...)
|
2023-02-17 04:07:34 +08:00
|
|
|
useBYOKGetObject(o.fs, &req)
|
2022-10-13 20:04:56 +08:00
|
|
|
var resp objectstorage.GetObjectResponse
|
|
|
|
err := o.fs.pacer.Call(func() (bool, error) {
|
|
|
|
var err error
|
|
|
|
resp, err = o.fs.srv.GetObject(ctx, req)
|
|
|
|
return shouldRetry(ctx, resp.HTTPResponse(), err)
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
// read size from ContentLength or ContentRange
|
|
|
|
bytes := resp.ContentLength
|
|
|
|
if resp.ContentRange != nil {
|
|
|
|
var contentRange = *resp.ContentRange
|
|
|
|
slash := strings.IndexRune(contentRange, '/')
|
|
|
|
if slash >= 0 {
|
|
|
|
i, err := strconv.ParseInt(contentRange[slash+1:], 10, 64)
|
|
|
|
if err == nil {
|
|
|
|
bytes = &i
|
|
|
|
} else {
|
|
|
|
fs.Debugf(o, "Failed to find parse integer from in %q: %v", contentRange, err)
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
fs.Debugf(o, "Failed to find length in %q", contentRange)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
err = o.decodeMetaDataObject(&resp)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
o.bytes = *bytes
|
|
|
|
return resp.HTTPResponse().Body, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Update an object if it has changed
|
|
|
|
func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (err error) {
|
|
|
|
bucketName, bucketPath := o.split()
|
|
|
|
err = o.fs.makeBucket(ctx, bucketName)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// determine if we like upload single or multipart.
|
|
|
|
size := src.Size()
|
|
|
|
multipart := size >= int64(o.fs.opt.UploadCutoff)
|
|
|
|
|
|
|
|
// Set the mtime in the metadata
|
|
|
|
modTime := src.ModTime(ctx)
|
|
|
|
metadata := map[string]string{
|
|
|
|
metaMtime: swift.TimeToFloatString(modTime),
|
|
|
|
}
|
|
|
|
|
|
|
|
// read the md5sum if available
|
|
|
|
// - for non-multipart
|
|
|
|
// - so we can add a ContentMD5
|
|
|
|
// - so we can add the md5sum in the metadata as metaMD5Hash if using SSE/SSE-C
|
|
|
|
// - for multipart provided checksums aren't disabled
|
|
|
|
// - so we can add the md5sum in the metadata as metaMD5Hash
|
|
|
|
var md5sumBase64 string
|
|
|
|
var md5sumHex string
|
|
|
|
if !multipart || !o.fs.opt.DisableChecksum {
|
|
|
|
md5sumHex, err = src.Hash(ctx, hash.MD5)
|
|
|
|
if err == nil && matchMd5.MatchString(md5sumHex) {
|
|
|
|
hashBytes, err := hex.DecodeString(md5sumHex)
|
|
|
|
if err == nil {
|
|
|
|
md5sumBase64 = base64.StdEncoding.EncodeToString(hashBytes)
|
|
|
|
if multipart && !o.fs.opt.DisableChecksum {
|
|
|
|
// Set the md5sum as metadata on the object if
|
|
|
|
// - a multipart upload
|
|
|
|
// - the ETag is not an MD5, e.g. when using SSE/SSE-C
|
|
|
|
// provided checksums aren't disabled
|
|
|
|
metadata[metaMD5Hash] = md5sumBase64
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
// Guess the content type
|
|
|
|
mimeType := fs.MimeType(ctx, src)
|
|
|
|
|
|
|
|
if multipart {
|
|
|
|
chunkSize := int64(o.fs.opt.ChunkSize)
|
|
|
|
uploadRequest := transfer.UploadRequest{
|
|
|
|
NamespaceName: common.String(o.fs.opt.Namespace),
|
|
|
|
BucketName: common.String(bucketName),
|
|
|
|
ObjectName: common.String(bucketPath),
|
|
|
|
ContentType: common.String(mimeType),
|
|
|
|
PartSize: common.Int64(chunkSize),
|
|
|
|
AllowMultipartUploads: common.Bool(true),
|
|
|
|
AllowParrallelUploads: common.Bool(true),
|
|
|
|
ObjectStorageClient: o.fs.srv,
|
|
|
|
EnableMultipartChecksumVerification: common.Bool(!o.fs.opt.DisableChecksum),
|
|
|
|
NumberOfGoroutines: common.Int(o.fs.opt.UploadConcurrency),
|
|
|
|
Metadata: metadataWithOpcPrefix(metadata),
|
|
|
|
}
|
|
|
|
if o.fs.opt.StorageTier != "" {
|
|
|
|
storageTier, ok := objectstorage.GetMappingPutObjectStorageTierEnum(o.fs.opt.StorageTier)
|
|
|
|
if !ok {
|
|
|
|
return fmt.Errorf("not a valid storage tier: %v", o.fs.opt.StorageTier)
|
|
|
|
}
|
|
|
|
uploadRequest.StorageTier = storageTier
|
|
|
|
}
|
|
|
|
o.applyMultiPutOptions(&uploadRequest, options...)
|
2023-02-17 04:07:34 +08:00
|
|
|
useBYOKUpload(o.fs, &uploadRequest)
|
2022-10-13 20:04:56 +08:00
|
|
|
uploadStreamRequest := transfer.UploadStreamRequest{
|
|
|
|
UploadRequest: uploadRequest,
|
|
|
|
StreamReader: in,
|
|
|
|
}
|
|
|
|
uploadMgr := transfer.NewUploadManager()
|
|
|
|
var uploadID = ""
|
|
|
|
|
|
|
|
defer atexit.OnError(&err, func() {
|
|
|
|
if uploadID == "" {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
if o.fs.opt.LeavePartsOnError {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
fs.Debugf(o, "Cancelling multipart upload")
|
|
|
|
errCancel := o.fs.abortMultiPartUpload(
|
|
|
|
context.Background(),
|
|
|
|
bucketName,
|
|
|
|
bucketPath,
|
|
|
|
uploadID)
|
|
|
|
if errCancel != nil {
|
|
|
|
fs.Debugf(o, "Failed to cancel multipart upload: %v", errCancel)
|
|
|
|
}
|
|
|
|
})()
|
|
|
|
|
|
|
|
err = o.fs.pacer.Call(func() (bool, error) {
|
|
|
|
uploadResponse, err := uploadMgr.UploadStream(ctx, uploadStreamRequest)
|
|
|
|
var httpResponse *http.Response
|
|
|
|
if err == nil {
|
|
|
|
if uploadResponse.Type == transfer.MultipartUpload {
|
|
|
|
if uploadResponse.MultipartUploadResponse != nil {
|
|
|
|
httpResponse = uploadResponse.MultipartUploadResponse.HTTPResponse()
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
if uploadResponse.SinglepartUploadResponse != nil {
|
|
|
|
httpResponse = uploadResponse.SinglepartUploadResponse.HTTPResponse()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if err != nil {
|
|
|
|
uploadID := ""
|
|
|
|
if uploadResponse.MultipartUploadResponse != nil && uploadResponse.MultipartUploadResponse.UploadID != nil {
|
|
|
|
uploadID = *uploadResponse.MultipartUploadResponse.UploadID
|
|
|
|
fs.Debugf(o, "multipart streaming upload failed, aborting uploadID: %v, may retry", uploadID)
|
|
|
|
_ = o.fs.abortMultiPartUpload(ctx, bucketName, bucketPath, uploadID)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return shouldRetry(ctx, httpResponse, err)
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
fs.Errorf(o, "multipart streaming upload failed %v", err)
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
req := objectstorage.PutObjectRequest{
|
|
|
|
NamespaceName: common.String(o.fs.opt.Namespace),
|
|
|
|
BucketName: common.String(bucketName),
|
|
|
|
ObjectName: common.String(bucketPath),
|
|
|
|
ContentType: common.String(mimeType),
|
|
|
|
PutObjectBody: io.NopCloser(in),
|
|
|
|
OpcMeta: metadata,
|
|
|
|
}
|
|
|
|
if size >= 0 {
|
|
|
|
req.ContentLength = common.Int64(size)
|
|
|
|
}
|
|
|
|
if o.fs.opt.StorageTier != "" {
|
|
|
|
storageTier, ok := objectstorage.GetMappingPutObjectStorageTierEnum(o.fs.opt.StorageTier)
|
|
|
|
if !ok {
|
|
|
|
return fmt.Errorf("not a valid storage tier: %v", o.fs.opt.StorageTier)
|
|
|
|
}
|
|
|
|
req.StorageTier = storageTier
|
|
|
|
}
|
|
|
|
o.applyPutOptions(&req, options...)
|
2023-02-17 04:07:34 +08:00
|
|
|
useBYOKPutObject(o.fs, &req)
|
|
|
|
var resp objectstorage.PutObjectResponse
|
2022-10-13 20:04:56 +08:00
|
|
|
err = o.fs.pacer.Call(func() (bool, error) {
|
2023-02-17 04:07:34 +08:00
|
|
|
resp, err = o.fs.srv.PutObject(ctx, req)
|
2022-10-13 20:04:56 +08:00
|
|
|
return shouldRetry(ctx, resp.HTTPResponse(), err)
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
fs.Errorf(o, "put object failed %v", err)
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
// Read the metadata from the newly created object
|
|
|
|
o.meta = nil // wipe old metadata
|
|
|
|
return o.readMetaData(ctx)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (o *Object) applyPutOptions(req *objectstorage.PutObjectRequest, options ...fs.OpenOption) {
|
|
|
|
// Apply upload options
|
|
|
|
for _, option := range options {
|
|
|
|
key, value := option.Header()
|
|
|
|
lowerKey := strings.ToLower(key)
|
|
|
|
switch lowerKey {
|
|
|
|
case "":
|
|
|
|
// ignore
|
|
|
|
case "cache-control":
|
|
|
|
req.CacheControl = common.String(value)
|
|
|
|
case "content-disposition":
|
|
|
|
req.ContentDisposition = common.String(value)
|
|
|
|
case "content-encoding":
|
|
|
|
req.ContentEncoding = common.String(value)
|
|
|
|
case "content-language":
|
|
|
|
req.ContentLanguage = common.String(value)
|
|
|
|
case "content-type":
|
|
|
|
req.ContentType = common.String(value)
|
|
|
|
default:
|
|
|
|
if strings.HasPrefix(lowerKey, ociMetaPrefix) {
|
|
|
|
req.OpcMeta[lowerKey] = value
|
|
|
|
} else {
|
|
|
|
fs.Errorf(o, "Don't know how to set key %q on upload", key)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (o *Object) applyGetObjectOptions(req *objectstorage.GetObjectRequest, options ...fs.OpenOption) {
|
|
|
|
fs.FixRangeOption(options, o.bytes)
|
|
|
|
for _, option := range options {
|
|
|
|
switch option.(type) {
|
|
|
|
case *fs.RangeOption, *fs.SeekOption:
|
|
|
|
_, value := option.Header()
|
|
|
|
req.Range = &value
|
|
|
|
default:
|
|
|
|
if option.Mandatory() {
|
|
|
|
fs.Logf(o, "Unsupported mandatory option: %v", option)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
// Apply upload options
|
|
|
|
for _, option := range options {
|
|
|
|
key, value := option.Header()
|
|
|
|
lowerKey := strings.ToLower(key)
|
|
|
|
switch lowerKey {
|
|
|
|
case "":
|
|
|
|
// ignore
|
|
|
|
case "cache-control":
|
|
|
|
req.HttpResponseCacheControl = common.String(value)
|
|
|
|
case "content-disposition":
|
|
|
|
req.HttpResponseContentDisposition = common.String(value)
|
|
|
|
case "content-encoding":
|
|
|
|
req.HttpResponseContentEncoding = common.String(value)
|
|
|
|
case "content-language":
|
|
|
|
req.HttpResponseContentLanguage = common.String(value)
|
|
|
|
case "content-type":
|
|
|
|
req.HttpResponseContentType = common.String(value)
|
|
|
|
case "range":
|
|
|
|
// do nothing
|
|
|
|
default:
|
|
|
|
fs.Errorf(o, "Don't know how to set key %q on upload", key)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (o *Object) applyMultiPutOptions(req *transfer.UploadRequest, options ...fs.OpenOption) {
|
|
|
|
// Apply upload options
|
|
|
|
for _, option := range options {
|
|
|
|
key, value := option.Header()
|
|
|
|
lowerKey := strings.ToLower(key)
|
|
|
|
switch lowerKey {
|
|
|
|
case "":
|
|
|
|
// ignore
|
|
|
|
case "content-encoding":
|
|
|
|
req.ContentEncoding = common.String(value)
|
|
|
|
case "content-language":
|
|
|
|
req.ContentLanguage = common.String(value)
|
|
|
|
case "content-type":
|
|
|
|
req.ContentType = common.String(value)
|
|
|
|
default:
|
|
|
|
if strings.HasPrefix(lowerKey, ociMetaPrefix) {
|
|
|
|
req.Metadata[lowerKey] = value
|
|
|
|
} else {
|
|
|
|
fs.Errorf(o, "Don't know how to set key %q on upload", key)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func metadataWithOpcPrefix(src map[string]string) map[string]string {
|
|
|
|
dst := make(map[string]string)
|
|
|
|
for lowerKey, value := range src {
|
|
|
|
if !strings.HasPrefix(lowerKey, ociMetaPrefix) {
|
|
|
|
dst[ociMetaPrefix+lowerKey] = value
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return dst
|
|
|
|
}
|