rclone/backend/compress/compress.go
buengese bc4282e49e compress: added experimental compression remote - implements #2098, #1356, #675
This remote implements transparent compression using gzip. Uses JSON for storing metadata.

Co-authored-by: id01 <gaviniboom@gmail.com>
2020-11-13 02:31:59 +01:00

1348 lines
38 KiB
Go

// Package compress provides wrappers for Fs and Object which implement compression.
package compress
import (
"bufio"
"bytes"
"context"
"crypto/md5"
"encoding/base64"
"encoding/binary"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"regexp"
"strings"
"time"
"github.com/buengese/sgzip"
"github.com/gabriel-vasile/mimetype"
"github.com/pkg/errors"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/accounting"
"github.com/rclone/rclone/fs/chunkedreader"
"github.com/rclone/rclone/fs/config/configmap"
"github.com/rclone/rclone/fs/config/configstruct"
"github.com/rclone/rclone/fs/fspath"
"github.com/rclone/rclone/fs/hash"
"github.com/rclone/rclone/fs/operations"
)
// Globals
//
// All sizes below are expressed in bytes.
const (
initialChunkSize = 262144 // 256 KiB: per the original note, the initial chunk size when reading parts of a file
maxChunkSize = 8388608 // 8 MiB: per the original note, the maximum chunk size when reading parts of a file
bufferSize = 8388608 // 8 MiB: size of the bufio reader placed on the compressed stream in putCompress
heuristicBytes = 1048576 // 1 MiB: how much of the input checkCompressAndType samples
minCompressionRatio = 1.1 // isCompressible reports true only above this ratio (input bytes / compressed bytes)
gzFileExt = ".gz" // extension of compressed data files
metaFileExt = ".json" // extension of metadata files
uncompressedFileExt = ".bin" // extension of data files stored without compression
)
// Compression modes
//
// These ids are stored in ObjectMetadata.Mode and in Fs.mode.
const (
Uncompressed = 0 // data is stored as-is in a ".bin" file
Gzip = 2 // data is stored gzip-compressed in a ".gz" file
)
// nameRegexp splits "<original name>.<encoded size>" into the original name
// and the 11 character size suffix. The size is written by int64ToBase64
// using base64.RawURLEncoding, whose alphabet is A-Z, a-z, 0-9, '-' and '_',
// so the character class must accept '-' (and has no use for '+').
var nameRegexp = regexp.MustCompile(`^(.+?)\.([A-Za-z0-9_-]{11})$`)
// Register with Fs
//
// init registers the "compress" backend with rclone together with its three
// options: the remote to wrap, the compression mode and the gzip level.
func init() {
// Build compression mode options.
compressionModeOptions := []fs.OptionExample{
{ // Default compression mode option
Value: "gzip",
Help: "Standard gzip compression with fastest parameters.",
},
}
// Register our remote
fs.Register(&fs.RegInfo{
Name: "compress",
Description: "Compress a remote",
NewFs: NewFs,
Options: []fs.Option{{
// The remote that is wrapped by this backend.
Name: "remote",
Help: "Remote to compress.",
Required: true,
}, {
// Compression mode; only "gzip" is offered as an example.
Name: "mode",
Help: "Compression mode.",
Default: "gzip",
Examples: compressionModeOptions,
}, {
// Gzip level handed to sgzip.NewWriterLevel in putCompress.
Name: "level",
Help: `GZIP compression level (-2 to 9).
Generally -1 (default, equivalent to 5) is recommended.
Levels 1 to 9 increase compressiong at the cost of speed.. Going past 6
generally offers very little return.
Level -2 uses Huffmann encoding only. Only use if you now what you
are doing
Level 0 turns off compression.`,
Default: sgzip.DefaultCompression,
Advanced: true,
}},
})
}
// Options defines the configuration for this backend
//
// The fields are filled from the config map by configstruct.Set in NewFs.
type Options struct {
Remote string `config:"remote"` // remote to wrap
CompressionMode string `config:"mode"` // mode name, translated by compressionModeFromName
CompressionLevel int `config:"level"` // gzip level passed to sgzip.NewWriterLevel
}
/*** FILESYSTEM FUNCTIONS ***/
// Fs represents a wrapped fs.Fs
//
// The embedded fs.Fs is the underlying remote; methods not overridden here
// are promoted from it.
type Fs struct {
fs.Fs
wrapper fs.Fs // Fs wrapping this one, see WrapFs/SetWrapper
name string // name passed to NewFs
root string // root path passed to NewFs with trailing slashes stripped
opt Options // parsed configuration
mode int // compression mode id
features *fs.Features // optional features
}
// NewFs constructs an Fs from the path, container:path
//
// It parses the options, refuses a remote that points back at itself, and
// then builds the wrapped Fs. The wrapped Fs is first created on the metadata
// name of rpath; only if that does not report fs.ErrorIsFile is it re-created
// on rpath itself. The final error (nil or fs.ErrorIsFile) is returned
// alongside the new Fs.
func NewFs(ctx context.Context, name, rpath string, m configmap.Mapper) (fs.Fs, error) {
// Parse config into Options struct
opt := new(Options)
err := configstruct.Set(m, opt)
if err != nil {
return nil, err
}
remote := opt.Remote
// Guard against a remote that wraps itself.
if strings.HasPrefix(remote, name+":") {
return nil, errors.New("can't point press remote at itself - check the value of the remote setting")
}
wInfo, wName, wPath, wConfig, err := fs.ConfigFs(remote)
if err != nil {
return nil, errors.Wrapf(err, "failed to parse remote %q to wrap", remote)
}
// Strip trailing slashes if they exist in rpath
rpath = strings.TrimRight(rpath, "\\/")
// First, check for a file: probe the metadata file name of rpath.
// If the wrapped remote reports fs.ErrorIsFile, keep that Fs and error.
// Otherwise, retry treating rpath as a directory.
remotePath := fspath.JoinRootPath(wPath, makeMetadataName(rpath))
wrappedFs, err := wInfo.NewFs(ctx, wName, remotePath, wConfig)
if err != fs.ErrorIsFile {
remotePath = fspath.JoinRootPath(wPath, rpath)
wrappedFs, err = wInfo.NewFs(ctx, wName, remotePath, wConfig)
}
// fs.ErrorIsFile is not fatal here; it is passed on to the caller below.
if err != nil && err != fs.ErrorIsFile {
return nil, errors.Wrapf(err, "failed to make remote %s:%q to wrap", wName, remotePath)
}
// Create the wrapping fs
f := &Fs{
Fs: wrappedFs,
name: name,
root: rpath,
opt: *opt,
mode: compressionModeFromName(opt.CompressionMode),
}
// the features here are ones we could support, and they are
// ANDed with the ones from wrappedFs
f.features = (&fs.Features{
CaseInsensitive: true,
DuplicateFiles: false,
ReadMimeType: false,
WriteMimeType: false,
GetTier: true,
SetTier: true,
BucketBased: true,
CanHaveEmptyDirectories: true,
}).Fill(ctx, f).Mask(ctx, wrappedFs).WrapsFs(f, wrappedFs)
// We support reading MIME types no matter the wrapped fs
f.features.ReadMimeType = true
// We can only support putstream if we have serverside copy or move
if !operations.CanServerSideMove(wrappedFs) {
f.features.Disable("PutStream")
}
return f, err
}
// compressionModeFromName maps a configured mode name to its mode id.
// Any name other than "gzip" yields Uncompressed.
func compressionModeFromName(name string) int {
	if name == "gzip" {
		return Gzip
	}
	return Uncompressed
}
// int64ToBase64 encodes number as 8 little endian bytes and returns their
// unpadded URL-safe base64 representation (always 11 characters).
func int64ToBase64(number int64) string {
	var raw [8]byte
	binary.LittleEndian.PutUint64(raw[:], uint64(number))
	return base64.RawURLEncoding.EncodeToString(raw[:])
}
// base64ToInt64 decodes a string produced by int64ToBase64 back into the
// int64 it encodes.
//
// The input must be unpadded URL-safe base64 that decodes to exactly 8
// bytes. An error is returned for malformed base64 or for any other decoded
// length, instead of panicking inside binary.LittleEndian.Uint64.
func base64ToInt64(str string) (int64, error) {
	intBytes, err := base64.RawURLEncoding.DecodeString(str)
	if err != nil {
		return 0, err
	}
	if len(intBytes) != 8 {
		return 0, fmt.Errorf("invalid encoded size: got %d bytes, want 8", len(intBytes))
	}
	return int64(binary.LittleEndian.Uint64(intBytes)), nil
}
// processFileName takes the name of a data file apart.
//
// It returns the original file name, the extension and the original size
// decoded from the name. For uncompressed files (".bin") the size is not
// stored in the name and -2 is returned instead. For every other extension
// the name must end in an encoded size, and gzFileExt is reported as the
// extension.
func processFileName(compressedFileName string) (origFileName string, extension string, origSize int64, err error) {
	// Everything from the last dot onwards is the extension.
	dot := strings.LastIndex(compressedFileName, ".")
	if dot < 0 {
		return "", "", 0, errors.New("File name has no extension")
	}
	stem, ext := compressedFileName[:dot], compressedFileName[dot:]
	if ext == uncompressedFileExt {
		return stem, ext, -2, nil
	}

	// The remaining stem is "<original name>.<encoded size>".
	parts := nameRegexp.FindStringSubmatch(stem)
	if len(parts) != 3 {
		return "", "", 0, errors.New("Invalid filename")
	}
	decoded, decodeErr := base64ToInt64(parts[2])
	if decodeErr != nil {
		return "", "", 0, errors.New("Could not decode size")
	}
	return parts[1], gzFileExt, decoded, nil
}
// makeMetadataName returns the name of the metadata file belonging to remote,
// which is remote with the metadata extension appended.
func makeMetadataName(remote string) (newRemote string) {
	newRemote = remote + metaFileExt
	return newRemote
}
// isMetadataFile reports whether filename carries the metadata extension.
func isMetadataFile(filename string) bool {
	hasMetaExt := strings.HasSuffix(filename, metaFileExt)
	return hasMetaExt
}
// makeDataName builds the data file name for remote.
//
// Uncompressed files get the ".bin" extension. For every other mode the
// encoded size is inserted before the ".gz" extension.
func makeDataName(remote string, size int64, mode int) (newRemote string) {
	if mode == Uncompressed {
		return remote + uncompressedFileExt
	}
	return remote + "." + int64ToBase64(size) + gzFileExt
}
// dataName builds the data file name for remote, using the Fs compression
// mode when compressed is true and the Uncompressed mode otherwise.
func (f *Fs) dataName(remote string, size int64, compressed bool) (name string) {
	mode := Uncompressed
	if compressed {
		mode = f.mode
	}
	return makeDataName(remote, size, mode)
}
// addData wraps the data object o and appends it to entries.
//
// The original name and size are taken from the data file name. Objects
// whose name cannot be parsed are logged and left out. Metadata is not read
// here; only the metadata file name is recorded.
func (f *Fs) addData(entries *fs.DirEntries, o fs.Object) {
	name, _, origSize, parseErr := processFileName(o.Remote())
	if parseErr != nil {
		fs.Errorf(o, "Error on parsing file name: %v", parseErr)
		return
	}
	// -2 marks an uncompressed file, whose size is that of the data object.
	if origSize == -2 {
		origSize = o.Size()
	}
	wrapped := f.newObjectSizeAndNameOnly(o, makeMetadataName(name), origSize)
	*entries = append(*entries, wrapped)
}
// addDir appends dir, passed through newDir, to entries.
func (f *Fs) addDir(entries *fs.DirEntries, dir fs.Directory) {
	wrapped := f.newDir(dir)
	*entries = append(*entries, wrapped)
}
// newDir returns the directory entry to expose for dir. Directories need no
// translation, so dir itself is handed back.
func (f *Fs) newDir(dir fs.Directory) fs.Directory {
	return dir
}
// processEntries converts a listing of the wrapped remote into a listing of
// this remote.
//
// Data files become wrapped objects, metadata files are dropped and
// directories are passed through. The input slice is filtered in place. An
// entry that is neither an object nor a directory aborts with an error.
func (f *Fs) processEntries(entries fs.DirEntries) (newEntries fs.DirEntries, err error) {
	newEntries = entries[:0] // reuse the backing array
	for i := range entries {
		switch item := entries[i].(type) {
		case fs.Object:
			// Metadata files are redundant here; only data files are listed.
			if isMetadataFile(item.Remote()) {
				continue
			}
			f.addData(&newEntries, item)
		case fs.Directory:
			f.addDir(&newEntries, item)
		default:
			return nil, errors.Errorf("Unknown object type %T", entries[i])
		}
	}
	return newEntries, nil
}
// List the objects and directories in dir into entries. The
// entries can be returned in any order but should be for a
// complete directory.
//
// dir should be "" to list the root, and should not have
// trailing slashes.
//
// This should return ErrDirNotFound if the directory isn't
// found.
//
// The listing of the wrapped remote is translated by processEntries.
func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) {
	raw, err := f.Fs.List(ctx, dir)
	if err != nil {
		return nil, err
	}
	return f.processEntries(raw)
}
// ListR lists the objects and directories of the Fs starting
// from dir recursively into out.
//
// dir should be "" to start from the root, and should not
// have trailing slashes.
//
// This should return ErrDirNotFound if the directory isn't
// found.
//
// It should call callback for each tranche of entries read.
// These need not be returned in any particular order. If
// callback returns an error then the listing will stop
// immediately.
//
// Don't implement this unless you have a more efficient way
// of listing recursively than doing a directory traversal.
func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (err error) {
	listR := f.Fs.Features().ListR
	// Translate every tranche before handing it to the caller.
	translate := func(batch fs.DirEntries) error {
		filtered, procErr := f.processEntries(batch)
		if procErr != nil {
			return procErr
		}
		return callback(filtered)
	}
	return listR(ctx, dir, translate)
}
// NewObject finds the Object at remote.
//
// The metadata object is looked up and decoded first, because the name of
// the data object depends on the size and mode stored in the metadata. Any
// failure - missing metadata, undecodable metadata or missing data object -
// yields a nil Object together with the error.
func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
	// Read metadata from metadata object
	mo, err := f.Fs.NewObject(ctx, makeMetadataName(remote))
	if err != nil {
		return nil, err
	}
	meta := readMetadata(ctx, mo)
	if meta == nil {
		return nil, errors.New("error decoding metadata")
	}
	// Find the data object; do not hand out a half-built Object on failure.
	o, err := f.Fs.NewObject(ctx, makeDataName(remote, meta.CompressionMetadata.Size, meta.Mode))
	if err != nil {
		return nil, err
	}
	return f.newObject(o, mo, meta), nil
}
// checkCompressAndType checks if an object is compressible and determines its mime type.
//
// Up to heuristicBytes are read from in to feed both checks. io.ReadFull is
// used so that a reader returning short reads still fills the sample buffer;
// inputs shorter than the buffer are fine. The returned reader replays the
// sampled bytes followed by the rest of in, with the accounting re-applied.
func checkCompressAndType(in io.Reader) (newReader io.Reader, compressible bool, mimeType string, err error) {
	in, wrap := accounting.UnWrap(in)
	buf := make([]byte, heuristicBytes)
	n, err := io.ReadFull(in, buf)
	buf = buf[:n]
	// EOF (no data) and ErrUnexpectedEOF (less than a full buffer) just mean
	// the input is short.
	if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
		return nil, false, "", err
	}
	mime := mimetype.Detect(buf)
	compressible, err = isCompressible(bytes.NewReader(buf))
	if err != nil {
		return nil, false, "", err
	}
	in = io.MultiReader(bytes.NewReader(buf), in)
	return wrap(in), compressible, mime.String(), nil
}
// isCompressible checks the compression ratio of the provided data and returns true if the ratio exceeds
// the configured threshold.
//
// The data is compressed into memory with the default level. The writer is
// closed before the output is measured so that all buffered output is
// flushed and counted.
func isCompressible(r io.Reader) (bool, error) {
	var b bytes.Buffer
	w, err := sgzip.NewWriterLevel(&b, sgzip.DefaultCompression)
	if err != nil {
		return false, err
	}
	n, err := io.Copy(w, r)
	if err != nil {
		return false, err
	}
	// Flush everything into b; without this b.Len() may miss pending output.
	if err = w.Close(); err != nil {
		return false, err
	}
	ratio := float64(n) / float64(b.Len())
	return ratio > minCompressionRatio, nil
}
// verifyObjectHash compares the hash computed while uploading with the hash
// the destination reports for o.
//
// When both hashes are known and differ, o is removed and an error is
// returned. An empty hash on either side skips the comparison.
func (f *Fs) verifyObjectHash(ctx context.Context, o fs.Object, hasher *hash.MultiHasher, ht hash.Type) error {
	want := hasher.Sums()[ht]
	got, err := o.Hash(ctx, ht)
	if err != nil {
		return errors.Wrap(err, "failed to read destination hash")
	}
	if want == "" || got == "" || want == got {
		return nil
	}
	// Mismatch: get rid of the corrupted object before reporting.
	if removeErr := o.Remove(ctx); removeErr != nil {
		fs.Errorf(o, "Failed to remove corrupted object: %v", removeErr)
	}
	return errors.Errorf("corrupted on transfer: %v compressed hashes differ %q vs %q", ht, want, got)
}
// putFn is the signature of the functions used to store data and metadata
// objects on the wrapped remote.
type putFn func(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error)
// compressionResult is sent by the compression goroutine in putCompress once
// it has finished.
type compressionResult struct {
err error // first error hit while compressing or closing, nil on success
meta sgzip.GzipMetadata // metadata reported by the gzip writer
}
// Put a compressed version of a file. Returns a wrappable object and metadata.
//
// The input is hashed with MD5 (stored in the metadata), compressed in a
// goroutine and streamed through a pipe to operations.Rcat. If the wrapped
// remote supports a hash, the compressed stream is hashed as well and
// checked against the uploaded object.
//
// The options and put parameters are not used by this function.
//
// The result channel is buffered and the pipe is closed with an error on
// every failure path, so neither the goroutine nor the upload can block
// forever.
func (f *Fs) putCompress(ctx context.Context, in io.Reader, src fs.ObjectInfo, options []fs.OpenOption, put putFn, mimeType string) (fs.Object, *ObjectMetadata, error) {
	// Unwrap reader accounting
	in, wrap := accounting.UnWrap(in)

	// Add the metadata hasher
	metaHasher := md5.New()
	in = io.TeeReader(in, metaHasher)

	// Compress the file
	pipeReader, pipeWriter := io.Pipe()
	// Buffered so the goroutine can always deliver its result and exit,
	// even if this function returned early without receiving it.
	results := make(chan compressionResult, 1)
	go func() {
		gz, err := sgzip.NewWriterLevel(pipeWriter, f.opt.CompressionLevel)
		if err != nil {
			// Wake up the reader side, which would otherwise wait forever.
			_ = pipeWriter.CloseWithError(err)
			results <- compressionResult{err: err, meta: sgzip.GzipMetadata{}}
			return
		}
		_, err = io.Copy(gz, in)
		gzErr := gz.Close()
		if gzErr != nil {
			fs.Errorf(nil, "Failed to close compress: %v", gzErr)
			if err == nil {
				err = gzErr
			}
		}
		closeErr := pipeWriter.Close()
		if closeErr != nil {
			fs.Errorf(nil, "Failed to close pipe: %v", closeErr)
			if err == nil {
				err = closeErr
			}
		}
		results <- compressionResult{err: err, meta: gz.MetaData()}
	}()
	wrappedIn := wrap(bufio.NewReaderSize(pipeReader, bufferSize)) // Probably no longer needed as sgzip has its own buffering

	// Find a hash the destination supports to compute a hash of
	// the compressed data.
	ht := f.Fs.Hashes().GetOne()
	var hasher *hash.MultiHasher
	var err error
	if ht != hash.None {
		// unwrap the accounting again
		wrappedIn, wrap = accounting.UnWrap(wrappedIn)
		hasher, err = hash.NewMultiHasherTypes(hash.NewHashSet(ht))
		if err != nil {
			// Stop the compression goroutine, nothing will read the pipe.
			_ = pipeReader.CloseWithError(err)
			return nil, nil, err
		}
		// add the hasher and re-wrap the accounting
		wrappedIn = io.TeeReader(wrappedIn, hasher)
		wrappedIn = wrap(wrappedIn)
	}

	// Transfer the data
	o, err := operations.Rcat(ctx, f.Fs, makeDataName(src.Remote(), src.Size(), f.mode), ioutil.NopCloser(wrappedIn), src.ModTime(ctx))
	if err != nil {
		// Unblock the compression goroutine if it is still writing.
		_ = pipeReader.CloseWithError(err)
		if o != nil {
			removeErr := o.Remove(ctx)
			if removeErr != nil {
				fs.Errorf(o, "Failed to remove partially transferred object: %v", removeErr)
			}
		}
		return nil, nil, err
	}

	// Check whether we got an error during compression
	result := <-results
	err = result.err
	if err != nil {
		if o != nil {
			removeErr := o.Remove(ctx)
			if removeErr != nil {
				fs.Errorf(o, "Failed to remove partially compressed object: %v", removeErr)
			}
		}
		return nil, nil, err
	}

	// Generate metadata
	meta := newMetadata(result.meta.Size, f.mode, result.meta, hex.EncodeToString(metaHasher.Sum(nil)), mimeType)

	// Check the hashes of the compressed data if we were comparing them
	if ht != hash.None && hasher != nil {
		err = f.verifyObjectHash(ctx, o, hasher, ht)
		if err != nil {
			return nil, nil, err
		}
	}
	return o, meta, nil
}
// Put an uncompressed version of a file. Returns a wrappable object and metadata.
//
// The input is hashed with MD5 (stored in the metadata) and, if the wrapped
// remote supports a different hash, with that hash too so the upload can be
// verified. The data is stored under the ".bin" name using put.
func (f *Fs) putUncompress(ctx context.Context, in io.Reader, src fs.ObjectInfo, options []fs.OpenOption, put putFn, mimeType string) (fs.Object, *ObjectMetadata, error) {
	// Unwrap the accounting, add our metadata hasher, then wrap it back on
	in, wrap := accounting.UnWrap(in)

	hs := hash.NewHashSet(hash.MD5)
	ht := f.Fs.Hashes().GetOne()
	if !hs.Contains(ht) {
		hs.Add(ht)
	}
	metaHasher, err := hash.NewMultiHasherTypes(hs)
	if err != nil {
		return nil, nil, err
	}
	in = io.TeeReader(in, metaHasher)
	wrappedIn := wrap(in)

	// Put the object
	o, err := put(ctx, wrappedIn, f.wrapInfo(src, makeDataName(src.Remote(), src.Size(), Uncompressed), src.Size()), options...)
	if err != nil {
		if o != nil {
			removeErr := o.Remove(ctx)
			if removeErr != nil {
				// Report the removal failure, not the upload error again.
				fs.Errorf(o, "Failed to remove partially transferred object: %v", removeErr)
			}
		}
		return nil, nil, err
	}

	// Check the hashes of the uploaded data if we were comparing them
	if ht != hash.None {
		err := f.verifyObjectHash(ctx, o, metaHasher, ht)
		if err != nil {
			return nil, nil, err
		}
	}

	// Return our object and metadata
	sum, err := metaHasher.Sum(hash.MD5)
	if err != nil {
		return nil, nil, err
	}
	return o, newMetadata(o.Size(), Uncompressed, sgzip.GzipMetadata{}, hex.EncodeToString(sum), mimeType), nil
}
// putMetadata writes meta, encoded as JSON, to the metadata object for src
// using put. It returns the wrappable metadata object.
//
// If put fails, the partially written object is removed when put returned
// one; a nil object is left alone instead of being dereferenced.
func (f *Fs) putMetadata(ctx context.Context, meta *ObjectMetadata, src fs.ObjectInfo, options []fs.OpenOption, put putFn) (mo fs.Object, err error) {
	// Generate the metadata contents
	data, err := json.Marshal(meta)
	if err != nil {
		return nil, err
	}
	metaReader := bytes.NewReader(data)

	// Put the data
	mo, err = put(ctx, metaReader, f.wrapInfo(src, makeMetadataName(src.Remote()), int64(len(data))), options...)
	if err != nil {
		if mo != nil {
			if removeErr := mo.Remove(ctx); removeErr != nil {
				fs.Errorf(mo, "Failed to remove partially transferred object: %v", removeErr)
			}
		}
		return nil, err
	}
	return mo, nil
}
// putWithCustomFunctions stores the data and then the metadata of an object.
//
// putData is the function used for the data, putMeta the one used for the
// metadata. compressible selects between the compressed and the
// uncompressed data path. If the metadata cannot be written, the data object
// is removed again.
func (f *Fs) putWithCustomFunctions(ctx context.Context, in io.Reader, src fs.ObjectInfo, options []fs.OpenOption,
	putData putFn, putMeta putFn, compressible bool, mimeType string) (*Object, error) {
	// Data first; the metadata describes what was actually stored.
	store := f.putUncompress
	if compressible {
		store = f.putCompress
	}
	dataObject, meta, err := store(ctx, in, src, options, putData, mimeType)
	if err != nil {
		return nil, err
	}

	mo, err := f.putMetadata(ctx, meta, src, options, putMeta)
	if err == nil {
		return f.newObject(dataObject, mo, meta), err
	}

	// The metadata upload failed, so try to remove the data object again.
	if removeError := dataObject.Remove(ctx); removeError != nil {
		return nil, removeError
	}
	return nil, err
}
// Put in to the remote path with the modTime given of the given size
//
// May create the object even if it returns an error - if so
// will return the object and the error, otherwise will return
// nil and the error
//
// An already existing object is updated explicitly so that no orphaned data
// is left behind. Deleting it first would be simpler, but would destroy all
// server-side versioning.
func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
	existing, err := f.NewObject(ctx, src.Remote())
	switch {
	case err == fs.ErrorObjectNotFound:
		// Nothing to update: check compressibility and upload a new object.
		sampled, compressible, mimeType, checkErr := checkCompressAndType(in)
		if checkErr != nil {
			return nil, checkErr
		}
		return f.putWithCustomFunctions(ctx, sampled, src, options, f.Fs.Put, f.Fs.Put, compressible, mimeType)
	case err != nil:
		return nil, err
	}
	return existing, existing.Update(ctx, in, src, options...)
}
// PutStream uploads to the remote path with the modTime given of indeterminate size
//
// The data is uploaded with the wrapped remote's PutStream and the metadata
// with its Put. Afterwards a previously existing data object is removed if
// its name differs from the new one, and a compressed upload is renamed so
// that its name carries the final size.
func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
	oldObj, err := f.NewObject(ctx, src.Remote())
	if err != nil && err != fs.ErrorObjectNotFound {
		return nil, err
	}
	found := err == nil

	in, compressible, mimeType, err := checkCompressAndType(in)
	if err != nil {
		return nil, err
	}
	newObj, err := f.putWithCustomFunctions(ctx, in, src, options, f.Fs.Features().PutStream, f.Fs.Put, compressible, mimeType)
	if err != nil {
		return nil, err
	}

	// Our transfer is now complete. We have to make sure to remove the old object because our new object will
	// have a different name except when both the old and the new object were uncompressed.
	if found && (oldObj.(*Object).meta.Mode != Uncompressed || compressible) {
		err = oldObj.(*Object).Object.Remove(ctx)
		if err != nil {
			return nil, errors.Wrap(err, "couldn't remove original object")
		}
	}

	// If our new object is compressed we have to rename it with the correct size.
	// Uncompressed objects don't store the size in the name so they'll already have the correct name.
	if compressible {
		wrapObj, err := operations.Move(ctx, f.Fs, nil, f.dataName(src.Remote(), newObj.size, compressible), newObj.Object)
		if err != nil {
			return nil, errors.Wrap(err, "Couldn't rename streamed Object.")
		}
		newObj.Object = wrapObj
	}
	return newObj, nil
}
// PutUnchecked is temporarily disabled. There might be a way to implement it
// correctly, but with the current handling of metadata duplicate objects
// would break things.

// Hashes returns the supported hash sets.
//
// Only MD5 is offered, which Object.Hash serves from the stored metadata.
func (f *Fs) Hashes() hash.Set {
	supported := hash.Set(hash.MD5)
	return supported
}
// Mkdir makes the directory (container, bucket)
//
// Shouldn't return an error if it already exists
//
// The call is passed straight to the wrapped remote.
func (f *Fs) Mkdir(ctx context.Context, dir string) error {
	wrapped := f.Fs
	return wrapped.Mkdir(ctx, dir)
}
// Rmdir removes the directory (container, bucket) if empty
//
// Return an error if it doesn't exist or isn't empty
//
// The call is passed straight to the wrapped remote.
func (f *Fs) Rmdir(ctx context.Context, dir string) error {
	wrapped := f.Fs
	return wrapped.Rmdir(ctx, dir)
}
// Purge all files in the root and the root directory
//
// Implement this if you have a way of deleting all the files
// quicker than just running Remove() on the result of List()
//
// Return an error if it doesn't exist
//
// Returns fs.ErrorCantPurge if the wrapped remote has no Purge.
func (f *Fs) Purge(ctx context.Context, dir string) error {
	if purge := f.Fs.Features().Purge; purge != nil {
		return purge(ctx, dir)
	}
	return fs.ErrorCantPurge
}
// Copy src to this remote using server side copy operations.
//
// This is stored with the remote path given
//
// It returns the destination Object and a possible error
//
// Will only be called if src.Fs().Name() == f.Name()
//
// If it isn't possible then return fs.ErrorCantCopy
func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error) {
	copyFn := f.Fs.Features().Copy
	if copyFn == nil {
		return nil, fs.ErrorCantCopy
	}
	srcObj, isOurs := src.(*Object)
	if !isOurs {
		return nil, fs.ErrorCantCopy
	}

	// An existing destination may be stored under a different data name,
	// because the size is part of the name. Remove it first if it exists.
	existing, err := f.NewObject(ctx, remote)
	switch {
	case err == nil:
		if removeErr := existing.Remove(ctx); removeErr != nil {
			return nil, removeErr
		}
	case err != fs.ErrorObjectNotFound:
		return nil, err
	}

	// Copy over metadata
	if err = srcObj.loadMetadataIfNotLoaded(ctx); err != nil {
		return nil, err
	}
	metaCopy, err := copyFn(ctx, srcObj.mo, makeMetadataName(remote))
	if err != nil {
		return nil, err
	}

	// Copy over data
	dataCopy, err := copyFn(ctx, srcObj.Object, makeDataName(remote, src.Size(), srcObj.meta.Mode))
	if err != nil {
		return nil, err
	}
	return f.newObject(dataCopy, metaCopy, srcObj.meta), nil
}
// Move src to this remote using server side move operations.
//
// This is stored with the remote path given
//
// It returns the destination Object and a possible error
//
// Will only be called if src.Fs().Name() == f.Name()
//
// If it isn't possible then return fs.ErrorCantMove
func (f *Fs) Move(ctx context.Context, src fs.Object, remote string) (fs.Object, error) {
	moveFn := f.Fs.Features().Move
	if moveFn == nil {
		return nil, fs.ErrorCantMove
	}
	srcObj, isOurs := src.(*Object)
	if !isOurs {
		return nil, fs.ErrorCantMove
	}

	// An existing destination may be stored under a different data name,
	// because the size is part of the name. Remove it first if it exists.
	existing, err := f.NewObject(ctx, remote)
	switch {
	case err == nil:
		if removeErr := existing.Remove(ctx); removeErr != nil {
			return nil, removeErr
		}
	case err != fs.ErrorObjectNotFound:
		return nil, err
	}

	// Move metadata
	if err = srcObj.loadMetadataIfNotLoaded(ctx); err != nil {
		return nil, err
	}
	movedMeta, err := moveFn(ctx, srcObj.mo, makeMetadataName(remote))
	if err != nil {
		return nil, err
	}

	// Move data
	movedData, err := moveFn(ctx, srcObj.Object, makeDataName(remote, src.Size(), srcObj.meta.Mode))
	if err != nil {
		return nil, err
	}
	return f.newObject(movedData, movedMeta, srcObj.meta), nil
}
// DirMove moves src, srcRemote to this remote at dstRemote
// using server side move operations.
//
// Will only be called if src.Fs().Name() == f.Name()
//
// If it isn't possible then return fs.ErrorCantDirMove
//
// If destination exists then return fs.ErrorDirExists
func (f *Fs) DirMove(ctx context.Context, src fs.Fs, srcRemote, dstRemote string) error {
	do := f.Fs.Features().DirMove
	if do == nil {
		return fs.ErrorCantDirMove
	}
	srcFs, ok := src.(*Fs)
	if !ok {
		// srcFs is nil here, so log against the Fs that was actually passed in.
		fs.Debugf(src, "Can't move directory - not same remote type")
		return fs.ErrorCantDirMove
	}
	// Directory names are not translated, so move on the wrapped remotes.
	return do(ctx, srcFs.Fs, srcRemote, dstRemote)
}
// CleanUp the trash in the Fs
//
// Implement this if you have a way of emptying the trash or
// otherwise cleaning up old versions of files.
//
// Fails if the wrapped remote has no CleanUp.
func (f *Fs) CleanUp(ctx context.Context) error {
	if cleanUp := f.Fs.Features().CleanUp; cleanUp != nil {
		return cleanUp(ctx)
	}
	return errors.New("can't CleanUp")
}
// About gets quota information from the Fs
//
// Fails if the wrapped remote has no About.
func (f *Fs) About(ctx context.Context) (*fs.Usage, error) {
	if about := f.Fs.Features().About; about != nil {
		return about(ctx)
	}
	return nil, errors.New("About not supported")
}
// UnWrap returns the Fs that this Fs is wrapping
func (f *Fs) UnWrap() fs.Fs {
	wrapped := f.Fs
	return wrapped
}
// WrapFs returns the Fs that is wrapping this Fs
func (f *Fs) WrapFs() fs.Fs {
	outer := f.wrapper
	return outer
}
// SetWrapper sets the Fs that is wrapping this Fs
func (f *Fs) SetWrapper(wrapper fs.Fs) {
	// Remembered for later retrieval through WrapFs.
	f.wrapper = wrapper
}
// MergeDirs merges the contents of all the directories passed
// in into the first one and rmdirs the other directories.
//
// Fails if the wrapped remote has no MergeDirs.
func (f *Fs) MergeDirs(ctx context.Context, dirs []fs.Directory) error {
	mergeDirs := f.Fs.Features().MergeDirs
	if mergeDirs == nil {
		return errors.New("MergeDirs not supported")
	}
	// Hand copies of the directory entries to the wrapped remote.
	copies := make([]fs.Directory, 0, len(dirs))
	for _, dir := range dirs {
		copies = append(copies, fs.NewDirCopy(ctx, dir).SetRemote(dir.Remote()))
	}
	return mergeDirs(ctx, copies)
}
// DirCacheFlush resets the directory cache - used in testing
// as an optional interface
//
// Does nothing if the wrapped remote has no DirCacheFlush.
func (f *Fs) DirCacheFlush() {
	flush := f.Fs.Features().DirCacheFlush
	if flush == nil {
		return
	}
	flush()
}
// ChangeNotify calls the passed function with a path
// that has had changes. If the implementation
// uses polling, it should adhere to the given interval.
//
// Does nothing if the wrapped remote has no ChangeNotify. Directory paths
// are forwarded unchanged, object paths as their metadata file name, and
// unknown entry types are logged and dropped.
func (f *Fs) ChangeNotify(ctx context.Context, notifyFunc func(string, fs.EntryType), pollIntervalChan <-chan time.Duration) {
	changeNotify := f.Fs.Features().ChangeNotify
	if changeNotify == nil {
		return
	}
	forward := func(path string, entryType fs.EntryType) {
		fs.Logf(f, "path %q entryType %d", path, entryType)
		wrappedPath := ""
		if entryType == fs.EntryDirectory {
			wrappedPath = path
		} else if entryType == fs.EntryObject {
			// Note: All we really need to do to monitor the object is to check whether the metadata changed,
			// as the metadata contains the hash. This will work unless there's a hash collision and the sizes stay the same.
			wrappedPath = makeMetadataName(path)
		} else {
			fs.Errorf(path, "press ChangeNotify: ignoring unknown EntryType %d", entryType)
			return
		}
		notifyFunc(wrappedPath, entryType)
	}
	changeNotify(ctx, forward, pollIntervalChan)
}
// PublicLink generates a public link to the remote path (usually readable by anyone)
//
// If remote is an object of this Fs, the link points at its data file. If
// it cannot be found as an object it is assumed to be a directory and passed
// on unchanged.
func (f *Fs) PublicLink(ctx context.Context, remote string, duration fs.Duration, unlink bool) (string, error) {
	publicLink := f.Fs.Features().PublicLink
	if publicLink == nil {
		return "", errors.New("PublicLink not supported")
	}
	target := remote // assume it is a directory
	if obj, err := f.NewObject(ctx, remote); err == nil {
		target = obj.(*Object).Object.Remote()
	}
	return publicLink(ctx, target, duration, unlink)
}
/*** OBJECT FUNCTIONS ***/
// ObjectMetadata describes the metadata for an Object.
//
// It is stored as JSON in the metadata file of each object (written by
// putMetadata, read by readMetadata).
type ObjectMetadata struct {
Mode int // Compression mode of the file.
Size int64 // Size of the object.
MD5 string // MD5 hash of the file.
MimeType string // Mime type of the file
CompressionMetadata sgzip.GzipMetadata // Metadata reported by the gzip writer; zero value for uncompressed files
}
// Object with external metadata
//
// The embedded fs.Object is the data object on the wrapped remote; the
// metadata lives in a separate object that is loaded on demand.
type Object struct {
fs.Object // Wraps around data object for this object
f *Fs // Filesystem object is in
mo fs.Object // Metadata object for this object (nil if not loaded)
moName string // Metadata file name for this object
size int64 // Size of this object
meta *ObjectMetadata // Metadata struct for this object (nil if not loaded)
}
// newMetadata bundles the given values into a freshly allocated
// ObjectMetadata.
func newMetadata(size int64, mode int, cmeta sgzip.GzipMetadata, md5 string, mimeType string) *ObjectMetadata {
	return &ObjectMetadata{
		Mode:                mode,
		Size:                size,
		MD5:                 md5,
		MimeType:            mimeType,
		CompressionMetadata: cmeta,
	}
}
// readMetadata opens the metadata object mo and decodes its JSON contents.
// It returns nil if the object cannot be opened or decoded.
func readMetadata(ctx context.Context, mo fs.Object) (meta *ObjectMetadata) {
	// Open our metadata object
	rc, err := mo.Open(ctx)
	if err != nil {
		return nil
	}
	defer func() {
		if closeErr := rc.Close(); closeErr != nil {
			fs.Errorf(mo, "Error closing object: %v", closeErr)
		}
	}()

	decoded := new(ObjectMetadata)
	if err = json.NewDecoder(rc).Decode(decoded); err != nil {
		return nil
	}
	return decoded
}
// Remove removes this object
//
// Both the metadata object and the data object are removed. The data object
// is removed even if removing the metadata failed; the metadata error takes
// precedence in the result.
func (o *Object) Remove(ctx context.Context) error {
	if err := o.loadMetadataObjectIfNotLoaded(ctx); err != nil {
		return err
	}
	metaErr := o.mo.Remove(ctx)
	dataErr := o.Object.Remove(ctx)
	if metaErr != nil {
		return metaErr
	}
	return dataErr
}
// ReadCloserWrapper combines a Reader and a Closer to a ReadCloser
//
// Read is promoted from the embedded Reader and Close from the embedded
// Closer, which may be two unrelated values.
type ReadCloserWrapper struct {
io.Reader
io.Closer
}
// Update in to the object with the modTime given of the given size
//
// The metadata object is always updated in place. The data object is
// updated in place only when both the old and the new contents are
// uncompressed; otherwise a new data object is uploaded and the old one is
// removed if its name differs (the size is part of the name of compressed
// data files).
func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (err error) {
	err = o.loadMetadataIfNotLoaded(ctx) // Loads metadata object too
	if err != nil {
		return err
	}
	// The metadata may have failed to decode; don't dereference it below.
	if o.meta == nil {
		return errors.New("error decoding metadata")
	}

	// Function that updates metadata object
	updateMeta := func(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
		return o.mo, o.mo.Update(ctx, in, src, options...)
	}

	in, compressible, mimeType, err := checkCompressAndType(in)
	if err != nil {
		return err
	}

	// Since we are storing the filesize in the name the new object may have different name than the old
	// We'll make sure to delete the old object in this case
	var newObject *Object
	origName := o.Remote()
	if o.meta.Mode != Uncompressed || compressible {
		newObject, err = o.f.putWithCustomFunctions(ctx, in, o.f.wrapInfo(src, origName, src.Size()), options, o.f.Fs.Put, updateMeta, compressible, mimeType)
		// newObject is nil on failure, so check before touching it.
		if err != nil {
			return err
		}
		if newObject.Object.Remote() != o.Object.Remote() {
			if removeErr := o.Object.Remove(ctx); removeErr != nil {
				return removeErr
			}
		}
	} else {
		// Function that updates object
		update := func(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
			return o.Object, o.Object.Update(ctx, in, src, options...)
		}
		// Old and new are both uncompressed: just update the object and metadata
		newObject, err = o.f.putWithCustomFunctions(ctx, in, src, options, update, updateMeta, compressible, mimeType)
		if err != nil {
			return err
		}
	}

	// Update object metadata and return
	o.Object = newObject.Object
	o.meta = newObject.meta
	o.size = newObject.size
	return nil
}
// newObject builds a fully populated Object from the data object o, the
// metadata object mo and the decoded metadata meta. Both mo and meta must be
// non-nil.
func (f *Fs) newObject(o fs.Object, mo fs.Object, meta *ObjectMetadata) *Object {
	obj := new(Object)
	obj.Object = o
	obj.f = f
	obj.mo = mo
	obj.moName = mo.Remote()
	obj.size = meta.Size
	obj.meta = meta
	return obj
}
// newObjectSizeAndNameOnly builds an Object that knows only its size and the
// name of its metadata file. The metadata object and the metadata itself
// stay unset and are loaded later on demand.
func (f *Fs) newObjectSizeAndNameOnly(o fs.Object, moName string, size int64) *Object {
	obj := new(Object)
	obj.Object = o
	obj.f = f
	obj.moName = moName
	obj.size = size
	return obj
}
// loadMetadataIfNotLoaded makes sure both the metadata object and the
// decoded metadata of o are available.
//
// It returns an error if the metadata object cannot be found or if its
// contents cannot be decoded, so that callers can rely on o.meta being
// non-nil after a nil error.
func (o *Object) loadMetadataIfNotLoaded(ctx context.Context) (err error) {
	err = o.loadMetadataObjectIfNotLoaded(ctx)
	if err != nil {
		return err
	}
	if o.meta == nil {
		o.meta = readMetadata(ctx, o.mo)
		// readMetadata signals failure with nil.
		if o.meta == nil {
			return errors.New("error decoding metadata")
		}
	}
	return nil
}
// loadMetadataObjectIfNotLoaded looks up the metadata object of o on the
// wrapped remote unless it is already set.
func (o *Object) loadMetadataObjectIfNotLoaded(ctx context.Context) (err error) {
	if o.mo != nil {
		return err
	}
	o.mo, err = o.f.Fs.NewObject(ctx, o.moName)
	return err
}
// Fs returns read only access to the Fs that this object is part of
func (o *Object) Fs() fs.Info {
	parent := o.f
	return parent
}
// String returns the remote path of the object, or "<nil>" for a nil Object.
func (o *Object) String() string {
	if o != nil {
		return o.Remote()
	}
	return "<nil>"
}
// Remote returns the remote path
//
// The name of the wrapped object is decoded with processFileName. If that
// fails the problem is logged and the wrapped object's name is returned
// unchanged.
func (o *Object) Remote() string {
	wrappedName := o.Object.Remote()
	origFileName, _, _, err := processFileName(wrappedName)
	if err != nil {
		// Log against the Fs rather than against o: the logger renders its
		// subject through String(), and (*Object).String calls Remote, so
		// logging against o would re-enter this failing path without end.
		fs.Errorf(o.f, "Could not get remote path for: %s", wrappedName)
		return wrappedName
	}
	return origFileName
}
// Size returns the size of the file
//
// The size recorded in the metadata is used once the metadata is loaded;
// until then the size the Object was created with is reported.
func (o *Object) Size() int64 {
	if meta := o.meta; meta != nil {
		return meta.Size
	}
	return o.size
}
// MimeType returns the MIME type of the file as recorded in the metadata.
// If the metadata cannot be loaded it returns "error/error".
func (o *Object) MimeType(ctx context.Context) string {
	if err := o.loadMetadataIfNotLoaded(ctx); err != nil {
		return "error/error"
	}
	return o.meta.MimeType
}
// Hash returns the selected checksum of the file
//
// Only hash.MD5 is supported, which is read from the metadata; any other
// hash type yields hash.ErrUnsupported.
func (o *Object) Hash(ctx context.Context, ht hash.Type) (string, error) {
	switch ht {
	case hash.MD5:
		if err := o.loadMetadataIfNotLoaded(ctx); err != nil {
			return "", err
		}
		return o.meta.MD5, nil
	default:
		return "", hash.ErrUnsupported
	}
}
// SetTier changes the storage tier of the wrapped object.
// It returns an error if the wrapped object does not implement fs.SetTierer.
func (o *Object) SetTier(tier string) error {
	if tierer, ok := o.Object.(fs.SetTierer); ok {
		return tierer.SetTier(tier)
	}
	return errors.New("press: underlying remote does not support SetTier")
}
// GetTier returns the storage tier or class of the wrapped object, or "" if
// the wrapped object does not implement fs.GetTierer.
func (o *Object) GetTier() string {
	if tierer, ok := o.Object.(fs.GetTierer); ok {
		return tierer.GetTier()
	}
	return ""
}
// UnWrap returns the underlying object that this Object wraps.
func (o *Object) UnWrap() fs.Object {
	wrapped := o.Object
	return wrapped
}
// Open opens the file for read. Call Close() on the returned io.ReadCloser.
// Note that this call requires quite a bit of overhead.
//
// Objects whose metadata says they are stored uncompressed are opened
// directly on the wrapped object with all options passed through. Otherwise
// the wrapped object is read through a chunked reader wrapped in an sgzip
// reader. In that case only fs.SeekOption and fs.RangeOption are honoured;
// other options are not forwarded, as the chunked reader is created without
// any open options.
func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (rc io.ReadCloser, err error) {
	err = o.loadMetadataIfNotLoaded(ctx)
	if err != nil {
		return nil, err
	}
	// If we're uncompressed, just pass this to the underlying object
	if o.meta.Mode == Uncompressed {
		return o.Object.Open(ctx, options...)
	}
	// Work out the requested offset and limit from the OpenOptions
	var offset, limit int64 = 0, -1
	for _, option := range options {
		switch x := option.(type) {
		case *fs.SeekOption:
			offset = x.Offset
		case *fs.RangeOption:
			offset, limit = x.Decode(o.Size())
		}
	}
	// Get a chunkedreader for the wrapped object
	chunkedReader := chunkedreader.New(ctx, o.Object, initialChunkSize, maxChunkSize)
	// Get file handle, seeking via the compression metadata if needed
	var file io.Reader
	if offset != 0 {
		file, err = sgzip.NewReaderAt(chunkedReader, &o.meta.CompressionMetadata, offset)
	} else {
		file, err = sgzip.NewReader(chunkedReader)
	}
	if err != nil {
		// Setting up the sgzip reader may already have read from (and thus
		// opened) the wrapped object, so release the chunked reader rather
		// than leaking it. The setup error is the one worth reporting, so
		// the Close error is deliberately ignored.
		_ = chunkedReader.Close()
		return nil, err
	}
	// A limit of -1 means read to the end
	var fileReader io.Reader = file
	if limit != -1 {
		fileReader = io.LimitReader(file, limit)
	}
	// Return a ReadCloser which closes the chunked reader
	return ReadCloserWrapper{Reader: fileReader, Closer: chunkedReader}, nil
}
// ObjectInfo describes a wrapped fs.ObjectInfo for being the source
//
// It lets the remote name and size reported for src be overridden. String,
// Storable and ModTime are delegated to src unchanged; Hash is delegated to
// src for hash.MD5 only.
type ObjectInfo struct {
// src is the wrapped source info that calls are delegated to
src fs.ObjectInfo
// fs is returned by Fs(), which panics if it is nil
fs *Fs
// remote overrides src.Remote() when it is not empty
remote string
// size overrides src.Size() when it is not -1
size int64
}
// wrapInfo wraps src in an ObjectInfo belonging to f which reports newRemote
// as its remote and size as its size. An empty newRemote or a size of -1
// makes the corresponding accessor fall back to the value from src.
func (f *Fs) wrapInfo(src fs.ObjectInfo, newRemote string, size int64) *ObjectInfo {
	info := new(ObjectInfo)
	info.src = src
	info.fs = f
	info.remote = newRemote
	info.size = size
	return info
}
// Fs returns read only access to the Fs that this object is part of.
// It panics if the ObjectInfo was created without an Fs.
func (o *ObjectInfo) Fs() fs.Info {
	if o.fs != nil {
		return o.fs
	}
	panic("stub ObjectInfo")
}
// String returns the string representation of the wrapped source info.
func (o *ObjectInfo) String() string {
	description := o.src.String()
	return description
}
// Storable reports whether the wrapped source info is storable.
func (o *ObjectInfo) Storable() bool {
	storable := o.src.Storable()
	return storable
}
// Remote returns the overriding remote path if one was set, otherwise the
// remote path of the wrapped source info.
func (o *ObjectInfo) Remote() string {
	if o.remote == "" {
		return o.src.Remote()
	}
	return o.remote
}
// Size returns the overriding size unless it is -1, in which case the size of
// the wrapped source info is returned.
func (o *ObjectInfo) Size() int64 {
	if o.size == -1 {
		return o.src.Size()
	}
	return o.size
}
// ModTime returns the modification time of the wrapped source info.
func (o *ObjectInfo) ModTime(ctx context.Context) time.Time {
	modTime := o.src.ModTime(ctx)
	return modTime
}
// Hash returns the selected checksum of the file
//
// Only hash.MD5 is supported and it is taken from the wrapped source info.
// Any other hash type, or a source that does not support MD5, yields "" and
// hash.ErrUnsupported.
func (o *ObjectInfo) Hash(ctx context.Context, ht hash.Type) (string, error) {
	if ht != hash.MD5 {
		return "", hash.ErrUnsupported
	}
	switch sum, err := o.src.Hash(ctx, ht); err {
	case hash.ErrUnsupported:
		return "", hash.ErrUnsupported
	default:
		return sum, err
	}
}
// ID returns the ID of the wrapped object, or "" if the wrapped object does
// not implement fs.IDer.
func (o *Object) ID() string {
	if ider, ok := o.Object.(fs.IDer); ok {
		return ider.ID()
	}
	return ""
}
// Name returns the name of the remote (as passed into NewFs)
func (f *Fs) Name() string {
	name := f.name
	return name
}
// Root returns the root of the remote (as passed into NewFs)
func (f *Fs) Root() string {
	root := f.root
	return root
}
// Features returns the optional features of this Fs
func (f *Fs) Features() *fs.Features {
	features := f.features
	return features
}
// String returns a description of this Fs built from its name and root.
func (f *Fs) String() string {
	name, root := f.name, f.root
	return fmt.Sprintf("Compressed: %s:%s", name, root)
}
// Precision returns the precision of the wrapped Fs.
func (f *Fs) Precision() time.Duration {
	precision := f.Fs.Precision()
	return precision
}
// Compile time checks that the types in this package implement the
// interfaces they are expected to.
var (
	// Interfaces implemented by Fs.
	_ fs.Abouter         = (*Fs)(nil)
	_ fs.ChangeNotifier  = (*Fs)(nil)
	_ fs.CleanUpper      = (*Fs)(nil)
	_ fs.Copier          = (*Fs)(nil)
	_ fs.DirCacheFlusher = (*Fs)(nil)
	_ fs.DirMover        = (*Fs)(nil)
	_ fs.Fs              = (*Fs)(nil)
	_ fs.ListRer         = (*Fs)(nil)
	_ fs.MergeDirser     = (*Fs)(nil)
	_ fs.Mover           = (*Fs)(nil)
	_ fs.PublicLinker    = (*Fs)(nil)
	_ fs.Purger          = (*Fs)(nil)
	_ fs.PutStreamer     = (*Fs)(nil)
	_ fs.UnWrapper       = (*Fs)(nil)
	_ fs.Wrapper         = (*Fs)(nil)

	// Interfaces implemented by Object and ObjectInfo.
	_ fs.GetTierer       = (*Object)(nil)
	_ fs.IDer            = (*Object)(nil)
	_ fs.MimeTyper       = (*Object)(nil)
	_ fs.Object          = (*Object)(nil)
	_ fs.ObjectInfo      = (*ObjectInfo)(nil)
	_ fs.ObjectUnWrapper = (*Object)(nil)
	_ fs.SetTierer       = (*Object)(nil)
)