// Package memory provides an interface to an in memory object storage system
package memory

import (
	"bytes"
	"context"
	"crypto/md5"
	"encoding/hex"
	"fmt"
	"io"
	"io/ioutil"
	"path"
	"strings"
	"sync"
	"time"

	"github.com/pkg/errors"
	"github.com/rclone/rclone/fs"
	"github.com/rclone/rclone/fs/config/configmap"
	"github.com/rclone/rclone/fs/config/configstruct"
	"github.com/rclone/rclone/fs/hash"
	"github.com/rclone/rclone/fs/walk"
	"github.com/rclone/rclone/lib/bucket"
)

var (
	// hashType is the single hash type this backend supports (see
	// Fs.Hashes and Object.Hash).
	hashType = hash.MD5
	// the object storage is persistent
	//
	// buckets is a package-level variable, so every Fs made by NewFs
	// in this process reads and writes the same set of buckets.
	buckets = newBucketsInfo()
)

// init registers the "memory" backend with the fs package so that
// NewFs is used to construct it.  The backend has no config options.
func init() {
	info := &fs.RegInfo{
		Name:        "memory",
		Description: "In memory object storage system.",
		NewFs:       NewFs,
		Options:     []fs.Option{},
	}
	fs.Register(info)
}

// Options defines the configuration for this backend
//
// It is currently empty: the backend is registered with no options.
// NewFs still fills it in with configstruct.Set.
type Options struct {
}

// Fs represents a remote memory server
//
// An Fs holds no object data itself; all data lives in the package
// level buckets store.  root is kept without leading or trailing
// slashes and rootBucket/rootDirectory are always derived from it by
// setRoot.
type Fs struct {
	name          string       // name of this remote
	root          string       // the path we are working on if any
	opt           Options      // parsed config options
	rootBucket    string       // bucket part of root (if any)
	rootDirectory string       // directory part of root (if any)
	features      *fs.Features // optional features
}

// bucketsInfo holds info about all the buckets
type bucketsInfo struct {
	// mu protects the buckets map (but not the contents of each
	// bucketInfo, which has its own lock)
	mu sync.RWMutex
	// buckets maps bucket name to bucket
	buckets map[string]*bucketInfo
}

// newBucketsInfo returns an empty bucketsInfo with its bucket map
// allocated and ready for use
func newBucketsInfo() *bucketsInfo {
	bi := new(bucketsInfo)
	bi.buckets = make(map[string]*bucketInfo, 16)
	return bi
}

// getBucket looks up the bucket called name, returning nil if there
// is no such bucket
func (bi *bucketsInfo) getBucket(name string) (b *bucketInfo) {
	bi.mu.RLock()
	defer bi.mu.RUnlock()
	return bi.buckets[name]
}

// makeBucket returns the bucket called name, creating and storing an
// empty one first if it doesn't exist yet
func (bi *bucketsInfo) makeBucket(name string) (b *bucketInfo) {
	bi.mu.Lock()
	defer bi.mu.Unlock()
	if existing := bi.buckets[name]; existing != nil {
		return existing
	}
	created := newBucketInfo()
	bi.buckets[name] = created
	return created
}

// deleteBucket deletes the bucket called name
//
// It returns fs.ErrorDirNotFound if the bucket doesn't exist and
// fs.ErrorDirectoryNotEmpty if it still holds objects.
func (bi *bucketsInfo) deleteBucket(name string) error {
	bi.mu.Lock()
	defer bi.mu.Unlock()
	switch b := bi.buckets[name]; {
	case b == nil:
		return fs.ErrorDirNotFound
	case !b.isEmpty():
		return fs.ErrorDirectoryNotEmpty
	}
	delete(bi.buckets, name)
	return nil
}

// getObjectData returns the object stored at (bucketName, bucketPath)
// or nil if either the bucket or the object doesn't exist
func (bi *bucketsInfo) getObjectData(bucketName, bucketPath string) (od *objectData) {
	if b := bi.getBucket(bucketName); b != nil {
		od = b.getObjectData(bucketPath)
	}
	return od
}

// updateObjectData stores od at (bucketName, bucketPath), replacing
// any object already there.  The bucket is created if necessary.
func (bi *bucketsInfo) updateObjectData(bucketName, bucketPath string, od *objectData) {
	target := bi.makeBucket(bucketName)
	target.mu.Lock()
	defer target.mu.Unlock()
	target.objects[bucketPath] = od
}

// removeObjectData deletes the object at (bucketName, bucketPath)
//
// It returns true if an object was deleted and false if the bucket or
// the object wasn't there.
func (bi *bucketsInfo) removeObjectData(bucketName, bucketPath string) (removed bool) {
	b := bi.getBucket(bucketName)
	if b == nil {
		return false
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.objects[bucketPath] == nil {
		return false
	}
	delete(b.objects, bucketPath)
	return true
}

// bucketInfo holds info about a single bucket
type bucketInfo struct {
	// mu protects the objects map
	mu sync.RWMutex
	// objects maps the path of an object within the bucket to its data
	objects map[string]*objectData
}

// newBucketInfo returns an empty bucket with its object map allocated
// and ready for use
func newBucketInfo() *bucketInfo {
	b := new(bucketInfo)
	b.objects = make(map[string]*objectData, 16)
	return b
}

// getObjectData returns the object stored under name in this bucket
// or nil if there isn't one
func (bi *bucketInfo) getObjectData(name string) (od *objectData) {
	bi.mu.RLock()
	defer bi.mu.RUnlock()
	return bi.objects[name]
}

// isEmpty reports whether the bucket holds no objects
func (bi *bucketInfo) isEmpty() (empty bool) {
	bi.mu.RLock()
	defer bi.mu.RUnlock()
	return len(bi.objects) == 0
}

// the object data and metadata
type objectData struct {
	// modTime is the modification time, set on Update and SetModTime
	modTime time.Time
	// hash is the lowercase hex MD5 of data.  It is "" until
	// Object.Hash is first called, which computes and caches it.
	hash string
	// mimeType is the MIME type recorded when the object was written
	mimeType string
	// data is the complete content of the object
	data []byte
}

// Object describes a memory object
//
// It is a handle pairing a root relative path with the objectData
// that was found (or written) there when the handle was made.
type Object struct {
	fs     *Fs         // what this object is part of
	remote string      // The remote path
	od     *objectData // the object data
}

// ------------------------------------------------------------

// Name of the remote (as passed into NewFs)
func (f *Fs) Name() string {
	return f.name
}

// Root of the remote
//
// This is the root passed into NewFs with leading and trailing
// slashes removed.  If NewFs found that the root pointed at an object
// it is the parent of that path instead.
func (f *Fs) Root() string {
	return f.root
}

// String converts this Fs to a string, which is a description
// including the root for use in log messages
func (f *Fs) String() string {
	return fmt.Sprintf("Memory root '%s'", f.root)
}

// Features returns the optional features of this Fs, as filled in by
// NewFs
func (f *Fs) Features() *fs.Features {
	return f.features
}

// parsePath normalises a remote 'url' into a root by stripping all
// leading and trailing slashes from it
func parsePath(path string) (root string) {
	return strings.Trim(path, "/")
}

// split joins rootRelativePath onto f.root and returns the result
// divided into its bucket name and the path within that bucket
func (f *Fs) split(rootRelativePath string) (bucketName, bucketPath string) {
	absolute := path.Join(f.root, rootRelativePath)
	return bucket.Split(absolute)
}

// split returns the bucket and the path within the bucket that the
// object's remote resolves to in its parent Fs
func (o *Object) split() (bucket, bucketPath string) {
	bucket, bucketPath = o.fs.split(o.remote)
	return bucket, bucketPath
}

// setRoot changes the root of the Fs
//
// The root is stored with surrounding slashes removed and rootBucket
// and rootDirectory are recalculated from it so the three stay in
// step.
func (f *Fs) setRoot(root string) {
	trimmed := parsePath(root)
	f.root = trimmed
	f.rootBucket, f.rootDirectory = bucket.Split(trimmed)
}

// NewFs constructs an Fs from the path, bucket:path
//
// If root points at an existing object then the Fs returned is rooted
// at the parent of that object and the error is fs.ErrorIsFile.
func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
	// Parse config into Options struct
	opt := new(Options)
	if err := configstruct.Set(m, opt); err != nil {
		return nil, err
	}

	root = strings.Trim(root, "/")
	f := &Fs{name: name, root: root, opt: *opt}
	f.setRoot(root)
	features := &fs.Features{
		ReadMimeType:      true,
		WriteMimeType:     true,
		BucketBased:       true,
		BucketBasedRootOK: true,
	}
	f.features = features.Fill(f)

	// Only a path inside a bucket can be an object
	if f.rootBucket == "" || f.rootDirectory == "" {
		return f, nil
	}
	if buckets.getObjectData(f.rootBucket, f.rootDirectory) == nil {
		return f, nil
	}

	// The root is an object so return an error with an fs which
	// points to the parent
	parent := path.Dir(f.root)
	if parent == "." {
		parent = ""
	}
	f.setRoot(parent)
	return f, fs.ErrorIsFile
}

// newObject wraps od in an Object belonging to f with the path remote
func (f *Fs) newObject(remote string, od *objectData) *Object {
	o := new(Object)
	o.fs, o.remote, o.od = f, remote, od
	return o
}

// NewObject finds the Object at remote.  If it can't be found
// it returns the error fs.ErrorObjectNotFound.
func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
	bucketName, bucketPath := f.split(remote)
	if od := buckets.getObjectData(bucketName, bucketPath); od != nil {
		return f.newObject(remote, od), nil
	}
	return nil, fs.ErrorObjectNotFound
}

// listFn is called from list once for each entry it finds.
//
// remote is the path of the entry, entry is the object or directory
// itself and isDirectory is true if entry is a directory.  Returning
// a non-nil error stops the listing and list returns that error.
type listFn func(remote string, entry fs.DirEntry, isDirectory bool) error

// list calls fn for the entries found under directory in bucket
//
// bucket is the name of the bucket to list and directory is the path
// within it to list (without trailing slash, "" for the whole
// bucket).  prefix is the part of the bucket path to remove from the
// front of each entry to make it relative to the root of the Fs
// (without trailing slash).  directory is expected to be at or below
// prefix.  If addBucket is set then the bucket name is added to the
// front of each entry.
//
// If recurse is set then fn is called with every object below
// directory and never with a directory.  Otherwise fn is called with
// the objects directly in directory and once for each directory
// directly in it.  Directories aren't stored - they are deduced from
// the paths of the objects.
//
// It returns fs.ErrorDirNotFound if the bucket doesn't exist, or the
// first error returned by fn.  fn is called with the bucket's read
// lock held so it must not write to the bucket.
func (f *Fs) list(ctx context.Context, bucket, directory, prefix string, addBucket bool, recurse bool, fn listFn) (err error) {
	if prefix != "" {
		prefix += "/"
	}
	if directory != "" {
		directory += "/"
	}
	b := buckets.getBucket(bucket)
	if b == nil {
		return fs.ErrorDirNotFound
	}
	b.mu.RLock()
	defer b.mu.RUnlock()
	// directories already sent to fn so each is sent only once
	dirs := make(map[string]struct{})
	for absPath, od := range b.objects {
		if !strings.HasPrefix(absPath, directory) {
			continue
		}
		if !recurse {
			localPath := absPath[len(directory):]
			if slash := strings.IndexRune(localPath, '/'); slash >= 0 {
				// send a directory if have a slash
				//
				// The directory name must have prefix removed just
				// like the objects do below, otherwise an Fs rooted
				// in a subdirectory of a bucket returns directories
				// which aren't relative to its root.
				dir := strings.TrimPrefix(directory, prefix) + localPath[:slash]
				if addBucket {
					dir = path.Join(bucket, dir)
				}
				if _, found := dirs[dir]; !found {
					err = fn(dir, fs.NewDir(dir, time.Time{}), true)
					if err != nil {
						return err
					}
					dirs[dir] = struct{}{}
				}
				continue // don't send this file if not recursing
			}
		}
		// send an object
		remote := absPath[len(prefix):]
		if addBucket {
			remote = path.Join(bucket, remote)
		}
		err = fn(remote, f.newObject(remote, od), false)
		if err != nil {
			return err
		}
	}
	return nil
}

// listDir does a non recursive list of directory in bucket and
// returns the objects and directories found as entries.  See list
// for the meaning of the parameters.
func (f *Fs) listDir(ctx context.Context, bucket, directory, prefix string, addBucket bool) (entries fs.DirEntries, err error) {
	collect := func(remote string, entry fs.DirEntry, isDirectory bool) error {
		entries = append(entries, entry)
		return nil
	}
	err = f.list(ctx, bucket, directory, prefix, addBucket, false, collect)
	return entries, err
}

// listBuckets returns one directory entry for each bucket which
// exists.  The error returned is always nil.
func (f *Fs) listBuckets(ctx context.Context) (entries fs.DirEntries, err error) {
	buckets.mu.RLock()
	defer buckets.mu.RUnlock()
	for bucketName := range buckets.buckets {
		dir := fs.NewDir(bucketName, time.Time{})
		entries = append(entries, dir)
	}
	return entries, nil
}

// List the objects and directories in dir into entries.  The
// entries can be returned in any order but should be for a
// complete directory.
//
// dir should be "" to list the root, and should not have
// trailing slashes.
//
// Listing the root of an Fs which has no bucket in its root lists
// the buckets.
//
// This should return ErrDirNotFound if the directory isn't
// found.
func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) {
	// defer fslog.Trace(dir, "")("entries = %q, err = %v", &entries, &err)
	bucketName, directory := f.split(dir)
	switch {
	case bucketName != "":
		return f.listDir(ctx, bucketName, directory, f.rootDirectory, f.rootBucket == "")
	case directory != "":
		return nil, fs.ErrorListBucketRequired
	default:
		return f.listBuckets(ctx)
	}
}

// ListR lists the objects and directories of the Fs starting
// from dir recursively into out.
//
// dir should be "" to start from the root, and should not
// have trailing slashes.
//
// This should return ErrDirNotFound if the directory isn't
// found.
//
// It should call callback for each tranche of entries read.
// These need not be returned in any particular order.  If
// callback returns an error then the listing will stop
// immediately.
//
// If dir resolves to no bucket then every bucket is listed in turn,
// each preceded by a directory entry for the bucket itself.
func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (err error) {
	bucketName, directory := f.split(dir)
	helper := walk.NewListRHelper(callback)
	// add passes each entry found by list on to the helper
	add := func(remote string, entry fs.DirEntry, isDirectory bool) error {
		return helper.Add(entry)
	}

	if bucketName != "" {
		// List just the one bucket
		err = f.list(ctx, bucketName, directory, f.rootDirectory, f.rootBucket == "", true, add)
		if err != nil {
			return err
		}
		return helper.Flush()
	}

	// List all the buckets
	entries, err := f.listBuckets(ctx)
	if err != nil {
		return err
	}
	for _, entry := range entries {
		if err = helper.Add(entry); err != nil {
			return err
		}
		if err = f.list(ctx, entry.Remote(), "", f.rootDirectory, true, true, add); err != nil {
			return err
		}
	}
	return helper.Flush()
}

// Put the object into the bucket
//
// Copy the reader in to the new object which is returned
//
// The object is returned even if an error is returned, in which case
// it has not been stored.
func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
	// Temporary Object under construction - Update fills in the
	// data and stores it
	o := &Object{
		fs:     f,
		remote: src.Remote(),
		od: &objectData{
			modTime: src.ModTime(ctx),
		},
	}
	err := o.Update(ctx, in, src, options...)
	return o, err
}

// PutStream uploads to the remote path with the modTime given of indeterminate size
//
// Put reads the whole of in regardless of the size in src so this
// just calls Put.
func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
	return f.Put(ctx, in, src, options...)
}

// Mkdir creates the bucket if it doesn't exist
//
// Only the bucket part of dir is used: directories within a bucket
// aren't stored so there is nothing to create for them.  It always
// returns nil.
func (f *Fs) Mkdir(ctx context.Context, dir string) error {
	bucketName, _ := f.split(dir)
	_ = buckets.makeBucket(bucketName)
	return nil
}

// Rmdir deletes the bucket if dir refers to the root of a bucket
//
// Returns an error if the bucket isn't empty or doesn't exist.  If
// dir refers to a directory within a bucket, or to no bucket at all,
// it does nothing and returns nil.
func (f *Fs) Rmdir(ctx context.Context, dir string) error {
	bucketName, directory := f.split(dir)
	if bucketName != "" && directory == "" {
		return buckets.deleteBucket(bucketName)
	}
	return nil
}

// Precision of the remote
//
// Modification times are stored as time.Time values so this reports
// nanosecond precision.
func (f *Fs) Precision() time.Duration {
	return time.Nanosecond
}

// Copy src to this remote using server side copy operations.
//
// This is stored with the remote path given
//
// It returns the destination Object and a possible error
//
// Will only be called if src.Fs().Name() == f.Name()
//
// If it isn't possible then return fs.ErrorCantCopy
//
// If the source object no longer exists it returns
// fs.ErrorObjectNotFound.  The destination bucket is created if
// necessary, but only once the source has been found.
func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error) {
	srcObj, ok := src.(*Object)
	if !ok {
		fs.Debugf(src, "Can't copy - not same remote type")
		return nil, fs.ErrorCantCopy
	}
	srcBucket, srcPath := srcObj.split()
	od := buckets.getObjectData(srcBucket, srcPath)
	if od == nil {
		return nil, fs.ErrorObjectNotFound
	}
	// Store a copy of the objectData rather than the same pointer,
	// otherwise the source and destination would share one struct and
	// SetModTime on either would change both.  The data slice is
	// shared by the copy which is fine as Update replaces the whole
	// objectData rather than writing into data.
	odCopy := *od
	dstBucket, dstPath := f.split(remote)
	buckets.updateObjectData(dstBucket, dstPath, &odCopy)
	return f.NewObject(ctx, remote)
}

// Hashes returns the supported hash sets, which is just hashType (MD5).
func (f *Fs) Hashes() hash.Set {
	return hash.Set(hashType)
}

// ------------------------------------------------------------

// Fs returns the parent Fs, the one this object was made by
func (o *Object) Fs() fs.Info {
	return o.fs
}

// String returns the remote path of the object, or "<nil>" if called
// on a nil Object
func (o *Object) String() string {
	if o != nil {
		return o.Remote()
	}
	return "<nil>"
}

// Remote returns the remote path, relative to the root of the parent Fs
func (o *Object) Remote() string {
	return o.remote
}

// Hash returns the hash of an object returning a lowercase hex string
//
// Only MD5 is supported, any other type returns hash.ErrUnsupported.
// The hash is worked out from the data the first time it is asked
// for and kept in the object data for subsequent calls.
func (o *Object) Hash(ctx context.Context, t hash.Type) (string, error) {
	if t != hashType {
		return "", hash.ErrUnsupported
	}
	if o.od.hash != "" {
		return o.od.hash, nil
	}
	digest := md5.Sum(o.od.data)
	o.od.hash = hex.EncodeToString(digest[:])
	return o.od.hash, nil
}

// Size returns the size of an object in bytes, which is the length of
// the data held in memory
func (o *Object) Size() int64 {
	return int64(len(o.od.data))
}

// ModTime returns the modification time of the object
//
// This is the time stored with the object data, as set by Update or
// SetModTime.
func (o *Object) ModTime(ctx context.Context) (result time.Time) {
	return o.od.modTime
}

// SetModTime sets the modification time of the memory object
//
// The time is written into the object data this Object points to.
// It always returns nil.
func (o *Object) SetModTime(ctx context.Context, modTime time.Time) error {
	o.od.modTime = modTime
	return nil
}

// Storable returns if this object is storable, which is always true
// for memory objects
func (o *Object) Storable() bool {
	return true
}

// Open an object for read
//
// The reader returned reads straight from the object's data in
// memory.  *fs.RangeOption and *fs.SeekOption are supported; any
// other mandatory option is logged and ignored.
//
// The offset is clamped to lie within the data and the limit to the
// data remaining after the offset, so reading past the end (or from
// before the start) gives a short or empty read rather than a panic.
// The error returned is always nil.
func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.ReadCloser, err error) {
	// Read the data once so the size used for the range calculation
	// and the data which gets sliced can't differ
	data := o.od.data
	size := int64(len(data))
	var offset, limit int64 = 0, -1
	for _, option := range options {
		switch x := option.(type) {
		case *fs.RangeOption:
			offset, limit = x.Decode(size)
		case *fs.SeekOption:
			offset = x.Offset
		default:
			if option.Mandatory() {
				fs.Logf(o, "Unsupported mandatory option: %v", option)
			}
		}
	}
	// A negative offset would make the slice expression below panic.
	// One can come from a negative SeekOption and presumably also
	// from a RangeOption asking for more trailing bytes than the
	// object has - TODO confirm against fs.RangeOption.Decode.
	if offset < 0 {
		offset = 0
	}
	if offset > size {
		offset = size
	}
	data = data[offset:]
	if limit >= 0 && limit < int64(len(data)) {
		data = data[:limit]
	}
	return ioutil.NopCloser(bytes.NewReader(data)), nil
}

// Update the object with the contents of the io.Reader, modTime and size
//
// The whole of in is read into memory, then a new objectData holding
// it is stored at the object's path, replacing anything already
// there.  The modification time and MIME type are taken from src.
//
// If reading in fails the error is returned wrapped and neither the
// object nor the store is changed.
func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (err error) {
	bucketName, bucketPath := o.split()
	data, err := ioutil.ReadAll(in)
	if err != nil {
		return errors.Wrap(err, "failed to update memory object")
	}
	o.od = &objectData{
		data: data,
		// the hash is worked out when Hash is first called
		hash:    "",
		modTime: src.ModTime(ctx),
		// Ask for the MIME type of src, not o: o.MimeType returns
		// the type stored with the data being replaced so the
		// type of the incoming object would never be used, even
		// though the backend says it supports WriteMimeType.
		mimeType: fs.MimeType(ctx, src),
	}
	buckets.updateObjectData(bucketName, bucketPath, o.od)
	return nil
}

// Remove an object
//
// It returns fs.ErrorObjectNotFound if there was no object stored at
// the object's path.
func (o *Object) Remove(ctx context.Context) error {
	bucketName, bucketPath := o.split()
	if buckets.removeObjectData(bucketName, bucketPath) {
		return nil
	}
	return fs.ErrorObjectNotFound
}

// MimeType of an Object if known, "" otherwise
//
// This is the MIME type stored with the object data by Update.
func (o *Object) MimeType(ctx context.Context) string {
	return o.od.mimeType
}

// Check the interfaces are satisfied
//
// These assignments fail to compile if Fs or Object stops
// implementing one of the interfaces listed.
var (
	_ fs.Fs          = &Fs{}
	_ fs.Copier      = &Fs{}
	_ fs.PutStreamer = &Fs{}
	_ fs.ListRer     = &Fs{}
	_ fs.Object      = &Object{}
	_ fs.MimeTyper   = &Object{}
)