2017-07-25 22:18:13 +08:00
// Package azureblob provides an interface to the Microsoft Azure blob object storage system
2018-07-13 23:21:49 +08:00
// +build !freebsd,!netbsd,!openbsd,!plan9,!solaris,go1.8
2017-07-25 22:18:13 +08:00
package azureblob
import (
"bytes"
2018-07-13 23:21:49 +08:00
"context"
2018-09-07 12:43:40 +08:00
"crypto/md5"
2017-07-25 22:18:13 +08:00
"encoding/base64"
"encoding/binary"
"encoding/hex"
"fmt"
"io"
"net/http"
2018-03-24 22:01:23 +08:00
"net/url"
2017-07-25 22:18:13 +08:00
"path"
"regexp"
"strconv"
"strings"
"sync"
"time"
2018-07-13 23:21:49 +08:00
"github.com/Azure/azure-storage-blob-go/2018-03-28/azblob"
2017-07-25 22:18:13 +08:00
"github.com/ncw/rclone/fs"
2018-02-02 00:35:08 +08:00
"github.com/ncw/rclone/fs/accounting"
2018-05-15 01:06:57 +08:00
"github.com/ncw/rclone/fs/config/configmap"
"github.com/ncw/rclone/fs/config/configstruct"
2018-01-13 00:30:54 +08:00
"github.com/ncw/rclone/fs/fserrors"
"github.com/ncw/rclone/fs/hash"
"github.com/ncw/rclone/fs/walk"
2018-01-12 00:29:20 +08:00
"github.com/ncw/rclone/lib/pacer"
2017-07-25 22:18:13 +08:00
"github.com/pkg/errors"
)
// Tunables, formats and limits used by the azureblob backend.
const (
2018-07-13 23:21:49 +08:00
// minSleep, maxSleep and decayConstant are handed to the pacer in NewFs.
minSleep = 10 * time . Millisecond
maxSleep = 10 * time . Second
decayConstant = 1 // bigger for slower decay, exponential
2018-09-11 03:45:06 +08:00
// Default for the list_chunk option and the largest value NewFs accepts for it.
maxListChunkSize = 5000 // number of items to read at once
2018-07-13 23:21:49 +08:00
// Blob metadata key under which the modification time is stored.
modTimeKey = "mtime"
// Layout used when parsing the stored modification time.
timeFormatIn = time . RFC3339
// Layout used when writing the modification time into the metadata.
timeFormatOut = "2006-01-02T15:04:05.000000000Z07:00"
maxTotalParts = 50000 // in multipart upload
// Endpoint used by NewFs when the endpoint option is left blank.
storageDefaultBaseURL = "blob.core.windows.net"
2018-05-04 22:31:55 +08:00
// maxUncommittedSize = 9 << 30 // can't upload bigger than this
2018-05-15 01:06:57 +08:00
// Defaults for the chunk_size / upload_cutoff options and the upper
// bounds NewFs enforces on them (all in bytes).
defaultChunkSize = 4 * 1024 * 1024
maxChunkSize = 100 * 1024 * 1024
defaultUploadCutoff = 256 * 1024 * 1024
maxUploadCutoff = 256 * 1024 * 1024
2018-08-20 00:53:59 +08:00
// Access tier NewFs falls back to when the access_tier option is blank.
defaultAccessTier = azblob . AccessTierNone
2017-07-25 22:18:13 +08:00
)
// Register with Fs
func init ( ) {
fs . Register ( & fs . RegInfo {
Name : "azureblob" ,
Description : "Microsoft Azure Blob Storage" ,
NewFs : NewFs ,
Options : [ ] fs . Option { {
Name : "account" ,
2018-03-24 22:01:23 +08:00
Help : "Storage Account Name (leave blank to use connection string or SAS URL)" ,
2017-07-25 22:18:13 +08:00
} , {
Name : "key" ,
2018-03-24 22:01:23 +08:00
Help : "Storage Account Key (leave blank to use connection string or SAS URL)" ,
} , {
Name : "sas_url" ,
Help : "SAS URL for container level access only\n(leave blank if using account/key or connection string)" ,
2017-07-25 22:18:13 +08:00
} , {
2018-05-15 01:06:57 +08:00
Name : "endpoint" ,
Help : "Endpoint for the service\nLeave blank normally." ,
Advanced : true ,
} , {
Name : "upload_cutoff" ,
Help : "Cutoff for switching to chunked upload." ,
Default : fs . SizeSuffix ( defaultUploadCutoff ) ,
Advanced : true ,
} , {
Name : "chunk_size" ,
Help : "Upload chunk size. Must fit in memory." ,
Default : fs . SizeSuffix ( defaultChunkSize ) ,
Advanced : true ,
2018-09-11 03:45:06 +08:00
} , {
Name : "list_chunk" ,
Help : "Size of blob list." ,
Default : maxListChunkSize ,
Advanced : true ,
2018-08-20 00:53:59 +08:00
} , {
Name : "access_tier" ,
Help : "Access tier of blob, supports hot, cool and archive tiers.\nArchived blobs can be restored by setting access tier to hot or cool." +
" Leave blank if you intend to use default access tier, which is set at account level" ,
Advanced : true ,
2018-05-15 01:06:57 +08:00
} } ,
2017-07-25 22:18:13 +08:00
} )
2018-05-15 01:06:57 +08:00
}
// Options defines the configuration for this backend.
//
// The fields are filled in from the config by configstruct.Set in NewFs,
// using the names given in the config tags.
type Options struct {
2018-09-11 03:45:06 +08:00
Account string ` config:"account" ` // storage account name
Key string ` config:"key" ` // storage account key
Endpoint string ` config:"endpoint" ` // service endpoint; NewFs substitutes storageDefaultBaseURL when blank
SASURL string ` config:"sas_url" ` // SAS URL, used when account and key are not both set
UploadCutoff fs . SizeSuffix ` config:"upload_cutoff" ` // cutoff for switching to chunked upload
ChunkSize fs . SizeSuffix ` config:"chunk_size" ` // upload chunk size
ListChunkSize uint ` config:"list_chunk" ` // maximum number of results requested per listing call
AccessTier string ` config:"access_tier" ` // blob access tier; validated in NewFs
2017-07-25 22:18:13 +08:00
}
// Fs represents a remote Azure blob storage container (and an optional
// path within it).
//
// containerOK and containerDeleted are accessed under containerOKMu.
type Fs struct {
2018-07-13 23:21:49 +08:00
name string // name of this remote
root string // the path we are working on if any
2018-05-15 01:06:57 +08:00
opt Options // parsed config options
2018-07-13 23:21:49 +08:00
features * fs . Features // optional features
svcURL * azblob . ServiceURL // reference to serviceURL
cntURL * azblob . ContainerURL // reference to containerURL
2017-07-25 22:18:13 +08:00
container string // the container we are working on
containerOKMu sync . Mutex // mutex to protect container OK
containerOK bool // true if we have created the container
containerDeleted bool // true if we have deleted the container
pacer * pacer . Pacer // To pace and retry the API calls
uploadToken * pacer . TokenDispenser // control concurrency
}
// Object describes an Azure blob.
//
// A zero modTime is treated by readMetaData as "metadata not fetched yet".
type Object struct {
2018-07-13 23:21:49 +08:00
fs * Fs // what this object is part of
remote string // The remote path
modTime time . Time // The modified time of the object if known
md5 string // MD5 hash if known
size int64 // Size of the object
mimeType string // Content-Type of the object
accessTier azblob . AccessTierType // Blob Access Tier
meta map [ string ] string // blob metadata
2017-07-25 22:18:13 +08:00
}
// ------------------------------------------------------------
// Name returns the name of the remote (as passed into NewFs).
func (f *Fs) Name() string {
	return f.name
}
// Root returns the root of the remote: the container name, followed by
// "/" and the path inside the container when there is one.
func (f *Fs) Root() string {
	root := f.container
	if f.root != "" {
		root = root + "/" + f.root
	}
	return root
}
// String returns a human readable description of this Fs, naming the
// container and, when set, the path within it.
func (f *Fs) String() string {
	if f.root != "" {
		return fmt.Sprintf("Azure container %s path %s", f.container, f.root)
	}
	return fmt.Sprintf("Azure container %s", f.container)
}
// Features returns the optional features supported by this Fs, as set
// up in NewFs.
func (f *Fs) Features() *fs.Features {
	return f.features
}
// matcher is the pattern used by parsePath to split an Azure path: the
// first capture group is used as the container and the second as the
// directory.
2018-08-27 06:19:28 +08:00
var matcher = regexp . MustCompile ( ` ^/*([^/]*)(.*)$ ` )
2017-07-25 22:18:13 +08:00
// parsePath parses an Azure 'url' into its container and directory.
//
// The path is split using matcher; the directory is returned with any
// leading and trailing slashes removed. An error is returned when the
// path does not match the pattern.
func parsePath(path string) (container, directory string, err error) {
	groups := matcher.FindStringSubmatch(path)
	if groups == nil {
		return "", "", errors.Errorf("couldn't find container in azure path %q", path)
	}
	return groups[1], strings.Trim(groups[2], "/"), nil
}
// retryErrorCodes is a slice of HTTP status codes that we will retry.
//
// shouldRetry compares the status code of a storage error's response
// against each entry.
var retryErrorCodes = [ ] int {
401 , // Unauthorized (eg "Token has expired")
408 , // Request Timeout
429 , // Rate exceeded.
500 , // Get occasional 500 Internal Server Error
503 , // Service Unavailable
504 , // Gateway Time-out
}
// shouldRetry returns a boolean as to whether this resp and err
// deserve to be retried. It returns the err as a convenience
func ( f * Fs ) shouldRetry ( err error ) ( bool , error ) {
// FIXME interpret special errors - more to do here
2018-07-13 23:21:49 +08:00
if storageErr , ok := err . ( azblob . StorageError ) ; ok {
statusCode := storageErr . Response ( ) . StatusCode
2017-07-25 22:18:13 +08:00
for _ , e := range retryErrorCodes {
if statusCode == e {
return true , err
}
}
}
2018-01-13 00:30:54 +08:00
return fserrors . ShouldRetry ( err ) , err
2017-07-25 22:18:13 +08:00
}
// NewFs contstructs an Fs from the path, container:path
2018-05-15 01:06:57 +08:00
func NewFs ( name , root string , m configmap . Mapper ) ( fs . Fs , error ) {
// Parse config into Options struct
opt := new ( Options )
err := configstruct . Set ( m , opt )
if err != nil {
return nil , err
2017-07-25 22:18:13 +08:00
}
2018-05-15 01:06:57 +08:00
if opt . UploadCutoff > maxUploadCutoff {
return nil , errors . Errorf ( "azure: upload cutoff (%v) must be less than or equal to %v" , opt . UploadCutoff , maxUploadCutoff )
}
if opt . ChunkSize > maxChunkSize {
return nil , errors . Errorf ( "azure: chunk size can't be greater than %v - was %v" , maxChunkSize , opt . ChunkSize )
2017-07-25 22:18:13 +08:00
}
2018-09-11 03:45:06 +08:00
if opt . ListChunkSize > maxListChunkSize {
return nil , errors . Errorf ( "azure: blob list size can't be greater than %v - was %v" , maxListChunkSize , opt . ListChunkSize )
}
2017-07-25 22:18:13 +08:00
container , directory , err := parsePath ( root )
if err != nil {
return nil , err
}
2018-05-15 01:06:57 +08:00
if opt . Endpoint == "" {
opt . Endpoint = storageDefaultBaseURL
}
2017-07-25 22:18:13 +08:00
2018-08-20 00:53:59 +08:00
if opt . AccessTier == "" {
opt . AccessTier = string ( defaultAccessTier )
} else {
switch opt . AccessTier {
case string ( azblob . AccessTierHot ) :
case string ( azblob . AccessTierCool ) :
case string ( azblob . AccessTierArchive ) :
// valid cases
default :
return nil , errors . Errorf ( "azure: Supported access tiers are %s, %s and %s" , string ( azblob . AccessTierHot ) , string ( azblob . AccessTierCool ) , azblob . AccessTierArchive )
}
}
2018-03-24 22:01:23 +08:00
var (
2018-07-13 23:21:49 +08:00
u * url . URL
serviceURL azblob . ServiceURL
containerURL azblob . ContainerURL
2018-03-24 22:01:23 +08:00
)
switch {
2018-05-15 01:06:57 +08:00
case opt . Account != "" && opt . Key != "" :
2018-09-07 12:43:40 +08:00
credential , err := azblob . NewSharedKeyCredential ( opt . Account , opt . Key )
if err != nil {
return nil , errors . Wrapf ( err , "Failed to parse credentials" )
}
2018-05-15 01:06:57 +08:00
u , err = url . Parse ( fmt . Sprintf ( "https://%s.%s" , opt . Account , opt . Endpoint ) )
2018-03-24 22:01:23 +08:00
if err != nil {
2018-07-13 23:21:49 +08:00
return nil , errors . Wrap ( err , "failed to make azure storage url from account and endpoint" )
2018-03-24 22:01:23 +08:00
}
2018-07-13 23:21:49 +08:00
pipeline := azblob . NewPipeline ( credential , azblob . PipelineOptions { } )
serviceURL = azblob . NewServiceURL ( * u , pipeline )
containerURL = serviceURL . NewContainerURL ( container )
2018-05-15 01:06:57 +08:00
case opt . SASURL != "" :
u , err = url . Parse ( opt . SASURL )
2018-03-24 22:01:23 +08:00
if err != nil {
return nil , errors . Wrapf ( err , "failed to parse SAS URL" )
}
2018-07-13 23:21:49 +08:00
// use anonymous credentials in case of sas url
pipeline := azblob . NewPipeline ( azblob . NewAnonymousCredential ( ) , azblob . PipelineOptions { } )
// Check if we have container level SAS or account level sas
parts := azblob . NewBlobURLParts ( * u )
if parts . ContainerName != "" {
if container != "" && parts . ContainerName != container {
return nil , errors . New ( "Container name in SAS URL and container provided in command do not match" )
}
container = parts . ContainerName
containerURL = azblob . NewContainerURL ( * u , pipeline )
} else {
serviceURL = azblob . NewServiceURL ( * u , pipeline )
containerURL = serviceURL . NewContainerURL ( container )
2018-03-24 22:01:23 +08:00
}
default :
return nil , errors . New ( "Need account+key or connectionString or sasURL" )
2017-07-25 22:18:13 +08:00
}
f := & Fs {
name : name ,
2018-05-15 01:06:57 +08:00
opt : * opt ,
2017-07-25 22:18:13 +08:00
container : container ,
root : directory ,
2018-07-13 23:21:49 +08:00
svcURL : & serviceURL ,
cntURL : & containerURL ,
2017-07-25 22:18:13 +08:00
pacer : pacer . New ( ) . SetMinSleep ( minSleep ) . SetMaxSleep ( maxSleep ) . SetDecayConstant ( decayConstant ) ,
uploadToken : pacer . NewTokenDispenser ( fs . Config . Transfers ) ,
}
2017-08-09 22:27:43 +08:00
f . features = ( & fs . Features {
ReadMimeType : true ,
WriteMimeType : true ,
BucketBased : true ,
} ) . Fill ( f )
2017-07-25 22:18:13 +08:00
if f . root != "" {
f . root += "/"
// Check to see if the (container,directory) is actually an existing file
oldRoot := f . root
remote := path . Base ( directory )
f . root = path . Dir ( directory )
if f . root == "." {
f . root = ""
} else {
f . root += "/"
}
_ , err := f . NewObject ( remote )
if err != nil {
if err == fs . ErrorObjectNotFound {
// File doesn't exist so return old f
f . root = oldRoot
return f , nil
}
return nil , err
}
// return an error with an fs which points to the parent
return f , fs . ErrorIsFile
}
return f , nil
}
// Return an Object from a path
//
// If it can't be found it returns the error fs.ErrorObjectNotFound.
2018-07-13 23:21:49 +08:00
func ( f * Fs ) newObjectWithInfo ( remote string , info * azblob . BlobItem ) ( fs . Object , error ) {
2017-07-25 22:18:13 +08:00
o := & Object {
fs : f ,
remote : remote ,
}
if info != nil {
2018-07-13 23:21:49 +08:00
err := o . decodeMetaDataFromBlob ( info )
2017-07-25 22:18:13 +08:00
if err != nil {
return nil , err
}
} else {
err := o . readMetaData ( ) // reads info and headers, returning an error
if err != nil {
return nil , err
}
}
return o , nil
}
// NewObject finds the Object at remote by reading its metadata. If it
// can't be found it returns the error fs.ErrorObjectNotFound.
func (f *Fs) NewObject(remote string) (fs.Object, error) {
	// No listing info is available, so the metadata is fetched
	return f.newObjectWithInfo(remote, nil)
}
// getBlobReference creates an empty blob reference with no metadata
2018-07-13 23:21:49 +08:00
func ( f * Fs ) getBlobReference ( remote string ) azblob . BlobURL {
return f . cntURL . NewBlobURL ( f . root + remote )
2017-07-25 22:18:13 +08:00
}
2018-07-13 23:21:49 +08:00
// updateMetadataWithModTime adds the modTime passed in to o.meta.
func ( o * Object ) updateMetadataWithModTime ( modTime time . Time ) {
2017-07-25 22:18:13 +08:00
// Make sure o.meta is not nil
if o . meta == nil {
o . meta = make ( map [ string ] string , 1 )
}
// Set modTimeKey in it
o . meta [ modTimeKey ] = modTime . Format ( timeFormatOut )
}
// listFn is called from list to handle an object.
//
// remote is the entry's path relative to the Fs root. list passes a nil
// object for entries that come from blob prefixes. A non-nil error
// returned by the function stops the listing and is returned from list.
2018-07-13 23:21:49 +08:00
type listFn func ( remote string , object * azblob . BlobItem , isDirectory bool ) error
2017-07-25 22:18:13 +08:00
// list lists the objects into the function supplied from
// the container and root supplied
//
// dir is the starting directory, "" for root
func ( f * Fs ) list ( dir string , recurse bool , maxResults uint , fn listFn ) error {
f . containerOKMu . Lock ( )
deleted := f . containerDeleted
f . containerOKMu . Unlock ( )
if deleted {
return fs . ErrorDirNotFound
}
root := f . root
if dir != "" {
root += dir + "/"
}
delimiter := ""
if ! recurse {
delimiter = "/"
}
2018-07-13 23:21:49 +08:00
options := azblob . ListBlobsSegmentOptions {
Details : azblob . BlobListingDetails {
Copy : false ,
2017-07-25 22:18:13 +08:00
Metadata : true ,
2018-07-13 23:21:49 +08:00
Snapshots : false ,
2017-07-25 22:18:13 +08:00
UncommittedBlobs : false ,
2018-07-13 23:21:49 +08:00
Deleted : false ,
2017-07-25 22:18:13 +08:00
} ,
2018-07-13 23:21:49 +08:00
Prefix : root ,
MaxResults : int32 ( maxResults ) ,
2017-07-25 22:18:13 +08:00
}
2018-07-13 23:21:49 +08:00
ctx := context . Background ( )
for marker := ( azblob . Marker { } ) ; marker . NotDone ( ) ; {
var response * azblob . ListBlobsHierarchySegmentResponse
2017-07-25 22:18:13 +08:00
err := f . pacer . Call ( func ( ) ( bool , error ) {
var err error
2018-07-13 23:21:49 +08:00
response , err = f . cntURL . ListBlobsHierarchySegment ( ctx , marker , delimiter , options )
2017-07-25 22:18:13 +08:00
return f . shouldRetry ( err )
} )
2018-07-13 23:21:49 +08:00
2017-07-25 22:18:13 +08:00
if err != nil {
2018-07-13 23:21:49 +08:00
// Check http error code along with service code, current SDK doesn't populate service code correctly sometimes
2018-07-15 20:49:15 +08:00
if storageErr , ok := err . ( azblob . StorageError ) ; ok && ( storageErr . ServiceCode ( ) == azblob . ServiceCodeContainerNotFound || storageErr . Response ( ) . StatusCode == http . StatusNotFound ) {
2017-07-25 22:18:13 +08:00
return fs . ErrorDirNotFound
}
return err
}
2018-07-13 23:21:49 +08:00
// Advance marker to next
marker = response . NextMarker
for i := range response . Segment . BlobItems {
file := & response . Segment . BlobItems [ i ]
2017-07-25 22:18:13 +08:00
// Finish if file name no longer has prefix
// if prefix != "" && !strings.HasPrefix(file.Name, prefix) {
// return nil
// }
if ! strings . HasPrefix ( file . Name , f . root ) {
fs . Debugf ( f , "Odd name received %q" , file . Name )
continue
}
remote := file . Name [ len ( f . root ) : ]
// Check for directory
isDirectory := strings . HasSuffix ( remote , "/" )
if isDirectory {
remote = remote [ : len ( remote ) - 1 ]
}
// Send object
err = fn ( remote , file , isDirectory )
if err != nil {
return err
}
}
// Send the subdirectories
2018-07-13 23:21:49 +08:00
for _ , remote := range response . Segment . BlobPrefixes {
remote := strings . TrimRight ( remote . Name , "/" )
2017-07-25 22:18:13 +08:00
if ! strings . HasPrefix ( remote , f . root ) {
fs . Debugf ( f , "Odd directory name received %q" , remote )
continue
}
remote = remote [ len ( f . root ) : ]
// Send object
err = fn ( remote , nil , true )
if err != nil {
return err
}
}
}
return nil
}
// Convert a list item into a DirEntry
2018-07-13 23:21:49 +08:00
func ( f * Fs ) itemToDirEntry ( remote string , object * azblob . BlobItem , isDirectory bool ) ( fs . DirEntry , error ) {
2017-07-25 22:18:13 +08:00
if isDirectory {
d := fs . NewDir ( remote , time . Time { } )
return d , nil
}
o , err := f . newObjectWithInfo ( remote , object )
if err != nil {
return nil , err
}
return o , nil
}
2018-03-01 20:11:34 +08:00
// markContainerOK records that the container exists and has not been
// deleted. It does nothing when no container is set.
func (f *Fs) markContainerOK() {
	if f.container == "" {
		return
	}
	f.containerOKMu.Lock()
	defer f.containerOKMu.Unlock()
	f.containerOK = true
	f.containerDeleted = false
}
2017-07-25 22:18:13 +08:00
// listDir lists a single directory
func ( f * Fs ) listDir ( dir string ) ( entries fs . DirEntries , err error ) {
2018-09-11 03:45:06 +08:00
err = f . list ( dir , false , f . opt . ListChunkSize , func ( remote string , object * azblob . BlobItem , isDirectory bool ) error {
2017-07-25 22:18:13 +08:00
entry , err := f . itemToDirEntry ( remote , object , isDirectory )
if err != nil {
return err
}
if entry != nil {
entries = append ( entries , entry )
}
return nil
} )
if err != nil {
return nil , err
}
2018-03-01 20:11:34 +08:00
// container must be present if listing succeeded
f . markContainerOK ( )
2017-07-25 22:18:13 +08:00
return entries , nil
}
// listContainers returns all the containers to out
func ( f * Fs ) listContainers ( dir string ) ( entries fs . DirEntries , err error ) {
if dir != "" {
return nil , fs . ErrorListBucketRequired
}
2018-07-13 23:21:49 +08:00
err = f . listContainersToFn ( func ( container * azblob . ContainerItem ) error {
d := fs . NewDir ( container . Name , container . Properties . LastModified )
2017-07-25 22:18:13 +08:00
entries = append ( entries , d )
return nil
} )
if err != nil {
return nil , err
}
return entries , nil
}
// List the objects and directories in dir into entries. The
// entries can be returned in any order but should be for a
// complete directory.
//
// dir should be "" to list the root, and should not have
// trailing slashes.
//
// This should return ErrDirNotFound if the directory isn't
// found.
func (f *Fs) List(dir string) (entries fs.DirEntries, err error) {
	if f.container != "" {
		return f.listDir(dir)
	}
	// No container set, so list the containers instead
	return f.listContainers(dir)
}
// ListR lists the objects and directories of the Fs starting
// from dir recursively into out.
//
// dir should be "" to start from the root, and should not
// have trailing slashes.
//
// This should return ErrDirNotFound if the directory isn't
// found.
//
// It should call callback for each tranche of entries read.
// These need not be returned in any particular order. If
// callback returns an error then the listing will stop
// immediately.
//
// Don't implement this unless you have a more efficient way
// of listing recursively that doing a directory traversal.
func ( f * Fs ) ListR ( dir string , callback fs . ListRCallback ) ( err error ) {
if f . container == "" {
return fs . ErrorListBucketRequired
}
2018-01-13 00:30:54 +08:00
list := walk . NewListRHelper ( callback )
2018-09-11 03:45:06 +08:00
err = f . list ( dir , true , f . opt . ListChunkSize , func ( remote string , object * azblob . BlobItem , isDirectory bool ) error {
2017-07-25 22:18:13 +08:00
entry , err := f . itemToDirEntry ( remote , object , isDirectory )
if err != nil {
return err
}
return list . Add ( entry )
} )
if err != nil {
return err
}
2018-03-01 20:11:34 +08:00
// container must be present if listing succeeded
f . markContainerOK ( )
2017-07-25 22:18:13 +08:00
return list . Flush ( )
}
// listContainerFn is called from listContainersToFn to handle a container.
//
// A non-nil error returned by the function stops the listing and is
// returned from listContainersToFn.
2018-07-13 23:21:49 +08:00
type listContainerFn func ( * azblob . ContainerItem ) error
2017-07-25 22:18:13 +08:00
// listContainersToFn lists the containers to the function supplied
func ( f * Fs ) listContainersToFn ( fn listContainerFn ) error {
2018-07-13 23:21:49 +08:00
params := azblob . ListContainersSegmentOptions {
2018-09-11 03:45:06 +08:00
MaxResults : int32 ( f . opt . ListChunkSize ) ,
2017-07-25 22:18:13 +08:00
}
2018-07-13 23:21:49 +08:00
ctx := context . Background ( )
for marker := ( azblob . Marker { } ) ; marker . NotDone ( ) ; {
2018-09-07 12:43:40 +08:00
var response * azblob . ListContainersSegmentResponse
2018-07-13 23:21:49 +08:00
err := f . pacer . Call ( func ( ) ( bool , error ) {
var err error
response , err = f . svcURL . ListContainersSegment ( ctx , marker , params )
return f . shouldRetry ( err )
} )
2017-07-25 22:18:13 +08:00
if err != nil {
return err
}
2018-07-13 23:21:49 +08:00
for i := range response . ContainerItems {
err = fn ( & response . ContainerItems [ i ] )
if err != nil {
return err
}
}
marker = response . NextMarker
2017-07-25 22:18:13 +08:00
}
2018-07-13 23:21:49 +08:00
2017-07-25 22:18:13 +08:00
return nil
}
// Put the object into the container
//
// Copy the reader in to the new object which is returned
//
// The new object may have been created if an error is returned
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
	// Temporary Object under construction
	o := &Object{
		fs:     f,
		remote: src.Remote(),
	}
	return o, o.Update(in, src, options...)
}
// Mkdir creates the container if it doesn't exist
func ( f * Fs ) Mkdir ( dir string ) error {
f . containerOKMu . Lock ( )
defer f . containerOKMu . Unlock ( )
if f . containerOK {
return nil
}
2018-07-13 23:21:49 +08:00
2018-06-25 22:15:17 +08:00
// now try to create the container
2018-07-13 23:21:49 +08:00
err := f . pacer . Call ( func ( ) ( bool , error ) {
ctx := context . Background ( )
_ , err := f . cntURL . Create ( ctx , azblob . Metadata { } , azblob . PublicAccessNone )
2017-07-25 22:18:13 +08:00
if err != nil {
2018-07-13 23:21:49 +08:00
if storageErr , ok := err . ( azblob . StorageError ) ; ok {
switch storageErr . ServiceCode ( ) {
case azblob . ServiceCodeContainerAlreadyExists :
f . containerOK = true
return false , nil
case azblob . ServiceCodeContainerBeingDeleted :
f . containerDeleted = true
return true , err
2017-07-25 22:18:13 +08:00
}
}
}
return f . shouldRetry ( err )
} )
if err == nil {
f . containerOK = true
f . containerDeleted = false
}
return errors . Wrap ( err , "failed to make container" )
}
// isEmpty checks to see if a given directory is empty and returns an error if not
func ( f * Fs ) isEmpty ( dir string ) ( err error ) {
empty := true
2018-07-13 23:21:49 +08:00
err = f . list ( "" , true , 1 , func ( remote string , object * azblob . BlobItem , isDirectory bool ) error {
2017-07-25 22:18:13 +08:00
empty = false
return nil
} )
if err != nil {
return err
}
if ! empty {
return fs . ErrorDirectoryNotEmpty
}
return nil
}
// deleteContainer deletes the container. It can delete a full
// container so use isEmpty if you don't want that.
func ( f * Fs ) deleteContainer ( ) error {
f . containerOKMu . Lock ( )
defer f . containerOKMu . Unlock ( )
2018-07-13 23:21:49 +08:00
options := azblob . ContainerAccessConditions { }
ctx := context . Background ( )
2017-07-25 22:18:13 +08:00
err := f . pacer . Call ( func ( ) ( bool , error ) {
2018-07-13 23:21:49 +08:00
_ , err := f . cntURL . GetProperties ( ctx , azblob . LeaseAccessConditions { } )
if err == nil {
_ , err = f . cntURL . Delete ( ctx , options )
}
2017-07-25 22:18:13 +08:00
if err != nil {
2018-07-13 23:21:49 +08:00
// Check http error code along with service code, current SDK doesn't populate service code correctly sometimes
2018-07-15 20:49:15 +08:00
if storageErr , ok := err . ( azblob . StorageError ) ; ok && ( storageErr . ServiceCode ( ) == azblob . ServiceCodeContainerNotFound || storageErr . Response ( ) . StatusCode == http . StatusNotFound ) {
2018-07-13 23:21:49 +08:00
return false , fs . ErrorDirNotFound
}
2017-07-25 22:18:13 +08:00
return f . shouldRetry ( err )
}
2018-07-13 23:21:49 +08:00
2017-07-25 22:18:13 +08:00
return f . shouldRetry ( err )
} )
if err == nil {
f . containerOK = false
f . containerDeleted = true
}
return errors . Wrap ( err , "failed to delete container" )
}
// Rmdir deletes the container if the fs is at the root
//
// Returns an error if it isn't empty. When the Fs has a root path, or
// dir is not "", nothing is deleted and nil is returned.
func (f *Fs) Rmdir(dir string) error {
	if err := f.isEmpty(dir); err != nil {
		return err
	}
	if f.root == "" && dir == "" {
		return f.deleteContainer()
	}
	return nil
}
// Precision returns the precision of modification times on this remote,
// which is one nanosecond.
func (f *Fs) Precision() time.Duration {
	return time.Nanosecond
}
// Hashes returns the supported hash sets.
2018-01-13 00:30:54 +08:00
func ( f * Fs ) Hashes ( ) hash . Set {
2018-01-19 04:27:52 +08:00
return hash . Set ( hash . MD5 )
2017-07-25 22:18:13 +08:00
}
// Purge deletes all the files and directories including the old versions.
//
// This is only done when the Fs is at the root of the container, where
// the whole container is deleted; otherwise fs.ErrorCantPurge is
// returned.
func (f *Fs) Purge() error {
	dir := "" // forward compat!
	if f.root == "" && dir == "" {
		return f.deleteContainer()
	}
	// Delegate to caller if not root container
	return fs.ErrorCantPurge
}
// Copy src to this remote using server side copy operations.
//
// This is stored with the remote path given
//
// It returns the destination Object and a possible error
//
// Will only be called if src.Fs().Name() == f.Name()
//
// If it isn't possible then return fs.ErrorCantCopy
func ( f * Fs ) Copy ( src fs . Object , remote string ) ( fs . Object , error ) {
err := f . Mkdir ( "" )
if err != nil {
return nil , err
}
srcObj , ok := src . ( * Object )
if ! ok {
fs . Debugf ( src , "Can't copy - not same remote type" )
return nil , fs . ErrorCantCopy
}
2018-07-13 23:21:49 +08:00
dstBlobURL := f . getBlobReference ( remote )
srcBlobURL := srcObj . getBlobReference ( )
source , err := url . Parse ( srcBlobURL . String ( ) )
if err != nil {
return nil , err
}
options := azblob . BlobAccessConditions { }
ctx := context . Background ( )
var startCopy * azblob . BlobStartCopyFromURLResponse
2017-07-25 22:18:13 +08:00
err = f . pacer . Call ( func ( ) ( bool , error ) {
2018-09-07 12:43:40 +08:00
startCopy , err = dstBlobURL . StartCopyFromURL ( ctx , * source , nil , azblob . ModifiedAccessConditions { } , options )
2017-07-25 22:18:13 +08:00
return f . shouldRetry ( err )
} )
if err != nil {
return nil , err
}
2018-07-13 23:21:49 +08:00
copyStatus := startCopy . CopyStatus ( )
for copyStatus == azblob . CopyStatusPending {
time . Sleep ( 1 * time . Second )
getMetadata , err := dstBlobURL . GetProperties ( ctx , options )
if err != nil {
return nil , err
}
copyStatus = getMetadata . CopyStatus ( )
}
2017-07-25 22:18:13 +08:00
return f . NewObject ( remote )
}
// ------------------------------------------------------------
// Fs returns the parent Fs this object is part of.
func (o *Object) Fs() fs.Info {
	return o.fs
}
// String returns the remote path of the object, or "<nil>" for a nil
// object.
func (o *Object) String() string {
	if o != nil {
		return o.remote
	}
	return "<nil>"
}
// Remote returns the remote path of the object.
func (o *Object) Remote() string {
	return o.remote
}
// Hash returns the MD5 of an object returning a lowercase hex string
2018-01-13 00:30:54 +08:00
func ( o * Object ) Hash ( t hash . Type ) ( string , error ) {
2018-01-19 04:27:52 +08:00
if t != hash . MD5 {
return "" , hash . ErrUnsupported
2017-07-25 22:18:13 +08:00
}
// Convert base64 encoded md5 into lower case hex
if o . md5 == "" {
return "" , nil
}
data , err := base64 . StdEncoding . DecodeString ( o . md5 )
if err != nil {
return "" , errors . Wrapf ( err , "Failed to decode Content-MD5: %q" , o . md5 )
}
return hex . EncodeToString ( data ) , nil
}
// Size returns the size of an object in bytes.
func (o *Object) Size() int64 {
	return o.size
}
2018-07-13 23:21:49 +08:00
func ( o * Object ) setMetadata ( metadata azblob . Metadata ) {
if len ( metadata ) > 0 {
o . meta = metadata
if modTime , ok := metadata [ modTimeKey ] ; ok {
2017-07-25 22:18:13 +08:00
when , err := time . Parse ( timeFormatIn , modTime )
if err != nil {
fs . Debugf ( o , "Couldn't parse %v = %q: %v" , modTimeKey , modTime , err )
}
o . modTime = when
}
} else {
o . meta = nil
}
2018-07-13 23:21:49 +08:00
}
// decodeMetaDataFromPropertiesResponse sets the metadata from the data passed in
//
// Sets
// o.id
// o.modTime
// o.size
// o.md5
// o.meta
func ( o * Object ) decodeMetaDataFromPropertiesResponse ( info * azblob . BlobGetPropertiesResponse ) ( err error ) {
2018-09-07 12:43:40 +08:00
// NOTE - Client library always returns MD5 as base64 decoded string, Object needs to maintain
// this as base64 encoded string.
2018-07-13 23:21:49 +08:00
o . md5 = base64 . StdEncoding . EncodeToString ( info . ContentMD5 ( ) )
o . mimeType = info . ContentType ( )
o . size = info . ContentLength ( )
o . modTime = time . Time ( info . LastModified ( ) )
o . accessTier = azblob . AccessTierType ( info . AccessTier ( ) )
o . setMetadata ( info . NewMetadata ( ) )
return nil
}
func ( o * Object ) decodeMetaDataFromBlob ( info * azblob . BlobItem ) ( err error ) {
2018-09-07 12:43:40 +08:00
// NOTE - Client library always returns MD5 as base64 decoded string, Object needs to maintain
// this as base64 encoded string.
o . md5 = base64 . StdEncoding . EncodeToString ( info . Properties . ContentMD5 )
2018-07-13 23:21:49 +08:00
o . mimeType = * info . Properties . ContentType
o . size = * info . Properties . ContentLength
o . modTime = info . Properties . LastModified
o . accessTier = info . Properties . AccessTier
o . setMetadata ( info . Metadata )
2017-07-25 22:18:13 +08:00
return nil
}
// getBlobReference creates an empty blob reference with no metadata
2018-07-13 23:21:49 +08:00
func ( o * Object ) getBlobReference ( ) azblob . BlobURL {
2017-07-25 22:18:13 +08:00
return o . fs . getBlobReference ( o . remote )
}
// clearMetaData clears enough metadata so readMetaData will re-read it.
//
// readMetaData uses a zero modTime as the sign that nothing has been
// fetched yet, so resetting modTime is sufficient.
func (o *Object) clearMetaData() {
	var unset time.Time
	o.modTime = unset
}
// readMetaData gets the metadata if it hasn't already been fetched
//
// Sets
// o.id
// o.modTime
// o.size
// o.md5
func ( o * Object ) readMetaData ( ) ( err error ) {
if ! o . modTime . IsZero ( ) {
return nil
}
blob := o . getBlobReference ( )
// Read metadata (this includes metadata)
2018-07-13 23:21:49 +08:00
options := azblob . BlobAccessConditions { }
ctx := context . Background ( )
var blobProperties * azblob . BlobGetPropertiesResponse
2017-07-25 22:18:13 +08:00
err = o . fs . pacer . Call ( func ( ) ( bool , error ) {
2018-07-13 23:21:49 +08:00
blobProperties , err = blob . GetProperties ( ctx , options )
2017-07-25 22:18:13 +08:00
return o . fs . shouldRetry ( err )
} )
if err != nil {
2018-07-13 23:21:49 +08:00
// On directories - GetProperties does not work and current SDK does not populate service code correctly hence check regular http response as well
2018-07-15 20:49:15 +08:00
if storageErr , ok := err . ( azblob . StorageError ) ; ok && ( storageErr . ServiceCode ( ) == azblob . ServiceCodeBlobNotFound || storageErr . Response ( ) . StatusCode == http . StatusNotFound ) {
2017-07-25 22:18:13 +08:00
return fs . ErrorObjectNotFound
}
return err
}
2018-07-13 23:21:49 +08:00
return o . decodeMetaDataFromPropertiesResponse ( blobProperties )
2017-07-25 22:18:13 +08:00
}
// timeString returns modTime as the number of milliseconds
// elapsed since January 1, 1970 UTC as a decimal string.
//
// Any sub-millisecond part of modTime is dropped.
func timeString(modTime time.Time) string {
	millis := modTime.UnixNano() / 1E6
	return strconv.FormatInt(millis, 10)
}
// parseTimeString decodes timeString, a decimal count of milliseconds
// elapsed since January 1, 1970 UTC, and stores the resulting time (in
// UTC) in o.modTime.
//
// An empty string is not an error: o.modTime is left untouched and nil
// is returned. If the string is not a valid integer the failure is
// reported with fs.Debugf and the parse error is returned.
func (o *Object) parseTimeString(timeString string) (err error) {
	if timeString == "" {
		return nil
	}
	millis, err := strconv.ParseInt(timeString, 10, 64)
	if err != nil {
		fs.Debugf(o, "Failed to parse mod time string %q: %v", timeString, err)
		return err
	}
	// Split into whole seconds and the leftover expressed in nanoseconds
	seconds := millis / 1E3
	nanoseconds := (millis % 1E3) * 1E6
	o.modTime = time.Unix(seconds, nanoseconds).UTC()
	return nil
}
// ModTime returns the modification time of the object
//
// The metadata is loaded on demand with readMetaData. Any error from
// that call is deliberately discarded, in which case whatever value is
// currently held in o.modTime (possibly the zero time) is returned.
func (o *Object) ModTime() (result time.Time) {
	// Best effort - the interface gives us no way of returning an error
	_ = o.readMetaData()
	result = o.modTime
	return result
}
// SetModTime sets the modification time of the local fs object
func ( o * Object ) SetModTime ( modTime time . Time ) error {
2018-07-13 23:21:49 +08:00
// Make sure o.meta is not nil
if o . meta == nil {
o . meta = make ( map [ string ] string , 1 )
}
// Set modTimeKey in it
o . meta [ modTimeKey ] = modTime . Format ( timeFormatOut )
blob := o . getBlobReference ( )
ctx := context . Background ( )
2017-07-25 22:18:13 +08:00
err := o . fs . pacer . Call ( func ( ) ( bool , error ) {
2018-07-13 23:21:49 +08:00
_ , err := blob . SetMetadata ( ctx , o . meta , azblob . BlobAccessConditions { } )
2017-07-25 22:18:13 +08:00
return o . fs . shouldRetry ( err )
} )
if err != nil {
return err
}
o . modTime = modTime
return nil
}
// Storable returns if this object is storable
//
// It unconditionally reports true.
func ( o * Object ) Storable ( ) bool {
return true
}
// Open an object for read
func ( o * Object ) Open ( options ... fs . OpenOption ) ( in io . ReadCloser , err error ) {
2018-07-13 23:21:49 +08:00
// Offset and Count for range download
var offset int64
var count int64
2018-08-20 00:53:59 +08:00
if o . AccessTier ( ) == azblob . AccessTierArchive {
return nil , errors . Errorf ( "Blob in archive tier, you need to set tier to hot or cool first" )
}
2017-07-25 22:18:13 +08:00
for _ , option := range options {
switch x := option . ( type ) {
case * fs . RangeOption :
2018-07-13 23:21:49 +08:00
offset , count = x . Decode ( o . size )
if count < 0 {
count = o . size - offset
2017-07-25 22:18:13 +08:00
}
case * fs . SeekOption :
2018-07-13 23:21:49 +08:00
offset = x . Offset
2017-07-25 22:18:13 +08:00
default :
if option . Mandatory ( ) {
fs . Logf ( o , "Unsupported mandatory option: %v" , option )
}
}
}
blob := o . getBlobReference ( )
2018-07-13 23:21:49 +08:00
ctx := context . Background ( )
ac := azblob . BlobAccessConditions { }
var dowloadResponse * azblob . DownloadResponse
2017-07-25 22:18:13 +08:00
err = o . fs . pacer . Call ( func ( ) ( bool , error ) {
2018-07-13 23:21:49 +08:00
dowloadResponse , err = blob . Download ( ctx , offset , count , ac , false )
2017-07-25 22:18:13 +08:00
return o . fs . shouldRetry ( err )
} )
if err != nil {
return nil , errors . Wrap ( err , "failed to open for download" )
}
2018-07-13 23:21:49 +08:00
in = dowloadResponse . Body ( azblob . RetryReaderOptions { } )
2017-07-25 22:18:13 +08:00
return in , nil
}
// dontEncode is the characters that do not need percent-encoding
//
// The characters that do not need percent-encoding are a subset of
// the printable ASCII characters: upper-case letters, lower-case
// letters, digits, ".", "_", "-", "/", "~", "!", "$", "'", "(", ")",
// "*", ";", "=", ":", and "@". All other byte values in a UTF-8 string
// must be replaced with "%" and the two-digit hex value of the byte.
//
// Note that the space character is NOT in this set, so the literals
// below must not contain any padding.
const dontEncode = (`abcdefghijklmnopqrstuvwxyz` +
	`ABCDEFGHIJKLMNOPQRSTUVWXYZ` +
	`0123456789` +
	`._-/~!$'()*;=:@`)
// noNeedToEncode is a lookup table indexed by byte value - an entry is
// true when that byte doesn't need % encoding
var noNeedToEncode [256]bool

// init marks every character listed in dontEncode in noNeedToEncode
func init() {
	for i := 0; i < len(dontEncode); i++ {
		noNeedToEncode[dontEncode[i]] = true
	}
}
2018-07-13 23:21:49 +08:00
// readSeeker joins an io.Reader and an io.Seeker
//
// Both interfaces are embedded, so Read is served by one value and Seek
// by another. uploadMultipart builds it positionally (Reader first, then
// Seeker) to pair the accounting-wrapped reader with the underlying
// bytes.Reader, so the field order must not change.
type readSeeker struct {
io . Reader
io . Seeker
}
2017-07-25 22:18:13 +08:00
// uploadMultipart uploads a file using multipart upload
//
// Write a larger blob, using CreateBlockBlob, PutBlock, and PutBlockList.
2018-07-13 23:21:49 +08:00
func ( o * Object ) uploadMultipart ( in io . Reader , size int64 , blob * azblob . BlobURL , httpHeaders * azblob . BlobHTTPHeaders ) ( err error ) {
2017-07-25 22:18:13 +08:00
// Calculate correct chunkSize
2018-05-15 01:06:57 +08:00
chunkSize := int64 ( o . fs . opt . ChunkSize )
2017-07-25 22:18:13 +08:00
var totalParts int64
for {
// Calculate number of parts
var remainder int64
totalParts , remainder = size / chunkSize , size % chunkSize
if remainder != 0 {
totalParts ++
}
if totalParts < maxTotalParts {
break
}
// Double chunk size if the number of parts is too big
chunkSize *= 2
if chunkSize > int64 ( maxChunkSize ) {
return errors . Errorf ( "can't upload as it is too big %v - takes more than %d chunks of %v" , fs . SizeSuffix ( size ) , totalParts , fs . SizeSuffix ( chunkSize / 2 ) )
}
}
fs . Debugf ( o , "Multipart upload session started for %d parts of size %v" , totalParts , fs . SizeSuffix ( chunkSize ) )
2018-07-13 23:21:49 +08:00
// https://godoc.org/github.com/Azure/azure-storage-blob-go/2017-07-29/azblob#example-BlockBlobURL
// Utilities are cloned from above example
// These helper functions convert a binary block ID to a base-64 string and vice versa
// NOTE: The blockID must be <= 64 bytes and ALL blockIDs for the block must be the same length
blockIDBinaryToBase64 := func ( blockID [ ] byte ) string { return base64 . StdEncoding . EncodeToString ( blockID ) }
// These helper functions convert an int block ID to a base-64 string and vice versa
blockIDIntToBase64 := func ( blockID uint64 ) string {
binaryBlockID := ( & [ 8 ] byte { } ) [ : ] // All block IDs are 8 bytes long
binary . LittleEndian . PutUint64 ( binaryBlockID , blockID )
return blockIDBinaryToBase64 ( binaryBlockID )
}
2017-07-25 22:18:13 +08:00
// block ID variables
var (
rawID uint64
blockID = "" // id in base64 encoded form
2018-09-06 20:45:17 +08:00
blocks [ ] string
2017-07-25 22:18:13 +08:00
)
// increment the blockID
nextID := func ( ) {
rawID ++
2018-07-13 23:21:49 +08:00
blockID = blockIDIntToBase64 ( rawID )
blocks = append ( blocks , blockID )
2017-07-25 22:18:13 +08:00
}
2018-07-13 23:21:49 +08:00
// Get BlockBlobURL, we will use default pipeline here
blockBlobURL := blob . ToBlockBlobURL ( )
ctx := context . Background ( )
ac := azblob . LeaseAccessConditions { } // Use default lease access conditions
2018-02-02 00:35:08 +08:00
// unwrap the accounting from the input, we use wrap to put it
// back on after the buffering
in , wrap := accounting . UnWrap ( in )
2017-07-25 22:18:13 +08:00
// Upload the chunks
remaining := size
position := int64 ( 0 )
errs := make ( chan error , 1 )
var wg sync . WaitGroup
outer :
for part := 0 ; part < int ( totalParts ) ; part ++ {
// Check any errors
select {
case err = <- errs :
break outer
default :
}
reqSize := remaining
if reqSize >= chunkSize {
reqSize = chunkSize
}
// Make a block of memory
buf := make ( [ ] byte , reqSize )
// Read the chunk
_ , err = io . ReadFull ( in , buf )
if err != nil {
err = errors . Wrap ( err , "multipart upload failed to read source" )
break outer
}
// Transfer the chunk
nextID ( )
wg . Add ( 1 )
o . fs . uploadToken . Get ( )
go func ( part int , position int64 , blockID string ) {
defer wg . Done ( )
defer o . fs . uploadToken . Put ( )
fs . Debugf ( o , "Uploading part %d/%d offset %v/%v part size %v" , part + 1 , totalParts , fs . SizeSuffix ( position ) , fs . SizeSuffix ( size ) , fs . SizeSuffix ( chunkSize ) )
2018-09-07 12:43:40 +08:00
// Upload the block, with MD5 for check
md5sum := md5 . Sum ( buf )
transactionalMD5 := md5sum [ : ]
2017-07-25 22:18:13 +08:00
err = o . fs . pacer . Call ( func ( ) ( bool , error ) {
2018-07-13 23:21:49 +08:00
bufferReader := bytes . NewReader ( buf )
wrappedReader := wrap ( bufferReader )
rs := readSeeker { wrappedReader , bufferReader }
2018-09-07 12:43:40 +08:00
_ , err = blockBlobURL . StageBlock ( ctx , blockID , & rs , ac , transactionalMD5 )
2017-07-25 22:18:13 +08:00
return o . fs . shouldRetry ( err )
} )
if err != nil {
err = errors . Wrap ( err , "multipart upload failed to upload part" )
select {
case errs <- err :
default :
}
return
}
} ( part , position , blockID )
// ready for next block
remaining -= chunkSize
position += chunkSize
}
wg . Wait ( )
if err == nil {
select {
case err = <- errs :
default :
}
}
if err != nil {
return err
}
// Finalise the upload session
err = o . fs . pacer . Call ( func ( ) ( bool , error ) {
2018-07-13 23:21:49 +08:00
_ , err := blockBlobURL . CommitBlockList ( ctx , blocks , * httpHeaders , o . meta , azblob . BlobAccessConditions { } )
2017-07-25 22:18:13 +08:00
return o . fs . shouldRetry ( err )
} )
if err != nil {
return errors . Wrap ( err , "multipart upload failed to finalize" )
}
return nil
}
// Update the object with the contents of the io.Reader, modTime and size
//
// The new object may have been created if an error is returned
func ( o * Object ) Update ( in io . Reader , src fs . ObjectInfo , options ... fs . OpenOption ) ( err error ) {
err = o . fs . Mkdir ( "" )
if err != nil {
return err
}
size := src . Size ( )
2018-07-13 23:21:49 +08:00
// Update Mod time
o . updateMetadataWithModTime ( src . ModTime ( ) )
if err != nil {
return err
}
blob := o . getBlobReference ( )
httpHeaders := azblob . BlobHTTPHeaders { }
httpHeaders . ContentType = fs . MimeType ( o )
// Multipart upload doesn't support MD5 checksums at put block calls, hence calculate
// MD5 only for PutBlob requests
2018-05-15 01:06:57 +08:00
if size < int64 ( o . fs . opt . UploadCutoff ) {
2018-07-13 23:21:49 +08:00
if sourceMD5 , _ := src . Hash ( hash . MD5 ) ; sourceMD5 != "" {
sourceMD5bytes , err := hex . DecodeString ( sourceMD5 )
if err == nil {
httpHeaders . ContentMD5 = sourceMD5bytes
} else {
fs . Debugf ( o , "Failed to decode %q as MD5: %v" , sourceMD5 , err )
}
2017-07-25 22:18:13 +08:00
}
}
2018-07-13 23:21:49 +08:00
putBlobOptions := azblob . UploadStreamToBlockBlobOptions {
2018-05-15 01:06:57 +08:00
BufferSize : int ( o . fs . opt . ChunkSize ) ,
2018-07-13 23:21:49 +08:00
MaxBuffers : 4 ,
Metadata : o . meta ,
BlobHTTPHeaders : httpHeaders ,
}
ctx := context . Background ( )
2017-07-25 22:18:13 +08:00
// Don't retry, return a retry error instead
err = o . fs . pacer . CallNoRetry ( func ( ) ( bool , error ) {
2018-05-15 01:06:57 +08:00
if size >= int64 ( o . fs . opt . UploadCutoff ) {
2017-07-25 22:18:13 +08:00
// If a large file upload in chunks
2018-07-13 23:21:49 +08:00
err = o . uploadMultipart ( in , size , & blob , & httpHeaders )
2017-07-25 22:18:13 +08:00
} else {
// Write a small blob in one transaction
2018-07-13 23:21:49 +08:00
blockBlobURL := blob . ToBlockBlobURL ( )
_ , err = azblob . UploadStreamToBlockBlob ( ctx , in , blockBlobURL , putBlobOptions )
2017-07-25 22:18:13 +08:00
}
return o . fs . shouldRetry ( err )
} )
if err != nil {
return err
}
2018-08-20 00:53:59 +08:00
// Refresh metadata on object
2017-07-25 22:18:13 +08:00
o . clearMetaData ( )
2018-08-20 00:53:59 +08:00
err = o . readMetaData ( )
if err != nil {
return err
}
// If tier is not changed or not specified, do not attempt to invoke `SetBlobTier` operation
if o . fs . opt . AccessTier == string ( defaultAccessTier ) || o . fs . opt . AccessTier == string ( o . AccessTier ( ) ) {
return nil
}
// Now, set blob tier based on configured access tier
desiredAccessTier := azblob . AccessTierType ( o . fs . opt . AccessTier )
err = o . fs . pacer . Call ( func ( ) ( bool , error ) {
_ , err := blob . SetTier ( ctx , desiredAccessTier )
return o . fs . shouldRetry ( err )
} )
if err != nil {
return errors . Wrap ( err , "Failed to set Blob Tier" )
}
return nil
2017-07-25 22:18:13 +08:00
}
// Remove an object
func ( o * Object ) Remove ( ) error {
blob := o . getBlobReference ( )
2018-07-13 23:21:49 +08:00
snapShotOptions := azblob . DeleteSnapshotsOptionNone
ac := azblob . BlobAccessConditions { }
ctx := context . Background ( )
2017-07-25 22:18:13 +08:00
return o . fs . pacer . Call ( func ( ) ( bool , error ) {
2018-07-13 23:21:49 +08:00
_ , err := blob . Delete ( ctx , snapShotOptions , ac )
2017-07-25 22:18:13 +08:00
return o . fs . shouldRetry ( err )
} )
}
// MimeType of an Object if known, "" otherwise
//
// This returns the cached o.mimeType field as is - it does not read the
// metadata itself.
func ( o * Object ) MimeType ( ) string {
return o . mimeType
}
2018-08-20 00:53:59 +08:00
// AccessTier of an object, default is of type none
//
// This returns the cached o.accessTier field as is - it does not read
// the metadata itself.
func ( o * Object ) AccessTier ( ) azblob . AccessTierType {
return o . accessTier
}
2017-07-25 22:18:13 +08:00
// Check the interfaces are satisfied
//
// Each blank assignment makes the compiler verify that the type on the
// right implements the interface on the left, so a missing method is
// caught at build time.
var (
_ fs . Fs = & Fs { }
_ fs . Copier = & Fs { }
_ fs . Purger = & Fs { }
_ fs . ListRer = & Fs { }
_ fs . Object = & Object { }
_ fs . MimeTyper = & Object { }
)