// Upload large files for b2
//
// Docs - https://www.backblaze.com/docs/cloud-storage-large-files

package b2

import (
	"context"
	"crypto/sha1"
	"encoding/hex"
	"fmt"
	gohash "hash"
	"io"
	"strings"
	"sync"

	"github.com/rclone/rclone/backend/b2/api"
	"github.com/rclone/rclone/fs"
	"github.com/rclone/rclone/fs/accounting"
	"github.com/rclone/rclone/fs/chunksize"
	"github.com/rclone/rclone/fs/hash"
	"github.com/rclone/rclone/lib/atexit"
	"github.com/rclone/rclone/lib/pool"
	"github.com/rclone/rclone/lib/rest"
	"golang.org/x/sync/errgroup"
)

type hashAppendingReader struct {
	h         gohash.Hash
	in        io.Reader
	hexSum    string
	hexReader io.Reader
}

// Read returns bytes all bytes from the original reader, then the hex sum
// of what was read so far, then EOF.
func (har *hashAppendingReader) Read(b []byte) (int, error) {
	if har.hexReader == nil {
		n, err := har.in.Read(b)
		if err == io.EOF {
			har.in = nil // allow GC
			err = nil    // allow reading hexSum before EOF

			har.hexSum = hex.EncodeToString(har.h.Sum(nil))
			har.hexReader = strings.NewReader(har.hexSum)
		}
		return n, err
	}
	return har.hexReader.Read(b)
}

// AdditionalLength returns how many bytes the appended hex sum will take up.
func (har *hashAppendingReader) AdditionalLength() int {
	return hex.EncodedLen(har.h.Size())
}

// HexSum returns the hash sum as hex. It's only available after the original
// reader has EOF'd. It's an empty string before that.
func (har *hashAppendingReader) HexSum() string {
	return har.hexSum
}

// newHashAppendingReader takes a Reader and a Hash and will append the hex sum
// after the original reader reaches EOF. The increased size depends on the
// given hash, which may be queried through AdditionalLength()
func newHashAppendingReader(in io.Reader, h gohash.Hash) *hashAppendingReader {
	withHash := io.TeeReader(in, h)
	return &hashAppendingReader{h: h, in: withHash}
}

// largeUpload is used to control the upload of large files which need chunking
type largeUpload struct {
	f         *Fs                             // parent Fs
	o         *Object                         // object being uploaded
	doCopy    bool                            // doing copy rather than upload
	what      string                          // text name of operation for logs
	in        io.Reader                       // read the data from here
	wrap      accounting.WrapFn               // account parts being transferred
	id        string                          // ID of the file being uploaded
	size      int64                           // total size
	parts     int                             // calculated number of parts, if known
	sha1smu   sync.Mutex                      // mutex to protect sha1s
	sha1s     []string                        // slice of SHA1s for each part
	uploadMu  sync.Mutex                      // lock for upload variable
	uploads   []*api.GetUploadPartURLResponse // result of get upload URL calls
	chunkSize int64                           // chunk size to use
	src       *Object                         // if copying, object we are reading from
	info      *api.FileInfo                   // final response with info about the object
}

// newLargeUpload starts an upload of object o from in with metadata in src
//
// If newInfo is set then metadata from that will be used instead of reading it from src
func (f *Fs) newLargeUpload(ctx context.Context, o *Object, in io.Reader, src fs.ObjectInfo, defaultChunkSize fs.SizeSuffix, doCopy bool, newInfo *api.File, options ...fs.OpenOption) (up *largeUpload, err error) {
	size := src.Size()
	parts := 0
	chunkSize := defaultChunkSize
	if size == -1 {
		fs.Debugf(o, "Streaming upload with --b2-chunk-size %s allows uploads of up to %s and will fail only when that limit is reached.", f.opt.ChunkSize, maxParts*f.opt.ChunkSize)
	} else {
		chunkSize = chunksize.Calculator(o, size, maxParts, defaultChunkSize)
		parts = int(size / int64(chunkSize))
		if size%int64(chunkSize) != 0 {
			parts++
		}
	}
	bucket, bucketPath := o.split()
	bucketID, err := f.getBucketID(ctx, bucket)
	if err != nil {
		return nil, err
	}
	var request = api.StartLargeFileRequest{
		BucketID: bucketID,
		Name:     f.opt.Enc.FromStandardPath(bucketPath),
	}
	optionsToSend := make([]fs.OpenOption, 0, len(options))
	if newInfo == nil {
		modTime, err := o.getModTime(ctx, src, options)
		if err != nil {
			return nil, err
		}

		request.ContentType = fs.MimeType(ctx, src)
		request.Info = map[string]string{
			timeKey: timeString(modTime),
		}
		// Custom upload headers - remove header prefix since they are sent in the body
		for _, option := range options {
			k, v := option.Header()
			k = strings.ToLower(k)
			if strings.HasPrefix(k, headerPrefix) {
				request.Info[k[len(headerPrefix):]] = v
			} else {
				optionsToSend = append(optionsToSend, option)
			}
		}
		// Set the SHA1 if known
		if !o.fs.opt.DisableCheckSum || doCopy {
			if calculatedSha1, err := src.Hash(ctx, hash.SHA1); err == nil && calculatedSha1 != "" {
				request.Info[sha1Key] = calculatedSha1
			}
		}
	} else {
		request.ContentType = newInfo.ContentType
		request.Info = newInfo.Info
	}
	opts := rest.Opts{
		Method:  "POST",
		Path:    "/b2_start_large_file",
		Options: optionsToSend,
	}
	var response api.StartLargeFileResponse
	err = f.pacer.Call(func() (bool, error) {
		resp, err := f.srv.CallJSON(ctx, &opts, &request, &response)
		return f.shouldRetry(ctx, resp, err)
	})
	if err != nil {
		return nil, err
	}
	up = &largeUpload{
		f:         f,
		o:         o,
		doCopy:    doCopy,
		what:      "upload",
		id:        response.ID,
		size:      size,
		parts:     parts,
		sha1s:     make([]string, 0, 16),
		chunkSize: int64(chunkSize),
	}
	// unwrap the accounting from the input, we use wrap to put it
	// back on after the buffering
	if doCopy {
		up.what = "copy"
		up.src = src.(*Object)
	} else {
		up.in, up.wrap = accounting.UnWrap(in)
	}
	return up, nil
}

// getUploadURL returns the upload info with the UploadURL and the AuthorizationToken
//
// This should be returned with returnUploadURL when finished
func (up *largeUpload) getUploadURL(ctx context.Context) (upload *api.GetUploadPartURLResponse, err error) {
	up.uploadMu.Lock()
	if len(up.uploads) > 0 {
		upload, up.uploads = up.uploads[0], up.uploads[1:]
		up.uploadMu.Unlock()
		return upload, nil
	}
	up.uploadMu.Unlock()

	opts := rest.Opts{
		Method: "POST",
		Path:   "/b2_get_upload_part_url",
	}
	var request = api.GetUploadPartURLRequest{
		ID: up.id,
	}
	err = up.f.pacer.Call(func() (bool, error) {
		resp, err := up.f.srv.CallJSON(ctx, &opts, &request, &upload)
		return up.f.shouldRetry(ctx, resp, err)
	})
	if err != nil {
		return nil, fmt.Errorf("failed to get upload URL: %w", err)
	}
	return upload, nil
}

// returnUploadURL returns the UploadURL to the cache
func (up *largeUpload) returnUploadURL(upload *api.GetUploadPartURLResponse) {
	if upload == nil {
		return
	}
	up.uploadMu.Lock()
	up.uploads = append(up.uploads, upload)
	up.uploadMu.Unlock()
}

// Add an sha1 to the being built up sha1s
func (up *largeUpload) addSha1(chunkNumber int, sha1 string) {
	up.sha1smu.Lock()
	defer up.sha1smu.Unlock()
	if len(up.sha1s) < chunkNumber+1 {
		up.sha1s = append(up.sha1s, make([]string, chunkNumber+1-len(up.sha1s))...)
	}
	up.sha1s[chunkNumber] = sha1
}

// WriteChunk will write chunk number with reader bytes, where chunk number >= 0
func (up *largeUpload) WriteChunk(ctx context.Context, chunkNumber int, reader io.ReadSeeker) (size int64, err error) {
	// Only account after the checksum reads have been done
	if do, ok := reader.(pool.DelayAccountinger); ok {
		// To figure out this number, do a transfer and if the accounted size is 0 or a
		// multiple of what it should be, increase or decrease this number.
		do.DelayAccounting(1)
	}

	err = up.f.pacer.Call(func() (bool, error) {
		// Discover the size by seeking to the end
		size, err = reader.Seek(0, io.SeekEnd)
		if err != nil {
			return false, err
		}

		// rewind the reader on retry and after reading size
		_, err = reader.Seek(0, io.SeekStart)
		if err != nil {
			return false, err
		}

		fs.Debugf(up.o, "Sending chunk %d length %d", chunkNumber, size)

		// Get upload URL
		upload, err := up.getUploadURL(ctx)
		if err != nil {
			return false, err
		}

		in := newHashAppendingReader(reader, sha1.New())
		sizeWithHash := size + int64(in.AdditionalLength())

		// Authorization
		//
		// An upload authorization token, from b2_get_upload_part_url.
		//
		// X-Bz-Part-Number
		//
		// A number from 1 to 10000. The parts uploaded for one file
		// must have contiguous numbers, starting with 1.
		//
		// Content-Length
		//
		// The number of bytes in the file being uploaded. Note that
		// this header is required; you cannot leave it out and just
		// use chunked encoding. The minimum size of every part but
		// the last one is 100 MB (100,000,000 bytes)
		//
		// X-Bz-Content-Sha1
		//
		// The SHA1 checksum of the this part of the file. B2 will
		// check this when the part is uploaded, to make sure that the
		// data arrived correctly. The same SHA1 checksum must be
		// passed to b2_finish_large_file.
		opts := rest.Opts{
			Method:  "POST",
			RootURL: upload.UploadURL,
			Body:    up.wrap(in),
			ExtraHeaders: map[string]string{
				"Authorization":    upload.AuthorizationToken,
				"X-Bz-Part-Number": fmt.Sprintf("%d", chunkNumber+1),
				sha1Header:         "hex_digits_at_end",
			},
			ContentLength: &sizeWithHash,
		}

		var response api.UploadPartResponse

		resp, err := up.f.srv.CallJSON(ctx, &opts, nil, &response)
		retry, err := up.f.shouldRetry(ctx, resp, err)
		if err != nil {
			fs.Debugf(up.o, "Error sending chunk %d (retry=%v): %v: %#v", chunkNumber, retry, err, err)
		}
		// On retryable error clear PartUploadURL
		if retry {
			fs.Debugf(up.o, "Clearing part upload URL because of error: %v", err)
			upload = nil
		}
		up.returnUploadURL(upload)
		up.addSha1(chunkNumber, in.HexSum())
		return retry, err
	})
	if err != nil {
		fs.Debugf(up.o, "Error sending chunk %d: %v", chunkNumber, err)
	} else {
		fs.Debugf(up.o, "Done sending chunk %d", chunkNumber)
	}
	return size, err
}

// Copy a chunk
func (up *largeUpload) copyChunk(ctx context.Context, part int, partSize int64) error {
	err := up.f.pacer.Call(func() (bool, error) {
		fs.Debugf(up.o, "Copying chunk %d length %d", part, partSize)
		opts := rest.Opts{
			Method: "POST",
			Path:   "/b2_copy_part",
		}
		offset := int64(part) * up.chunkSize // where we are in the source file
		var request = api.CopyPartRequest{
			SourceID:    up.src.id,
			LargeFileID: up.id,
			PartNumber:  int64(part + 1),
			Range:       fmt.Sprintf("bytes=%d-%d", offset, offset+partSize-1),
		}
		var response api.UploadPartResponse
		resp, err := up.f.srv.CallJSON(ctx, &opts, &request, &response)
		retry, err := up.f.shouldRetry(ctx, resp, err)
		if err != nil {
			fs.Debugf(up.o, "Error copying chunk %d (retry=%v): %v: %#v", part, retry, err, err)
		}
		up.addSha1(part, response.SHA1)
		return retry, err
	})
	if err != nil {
		fs.Debugf(up.o, "Error copying chunk %d: %v", part, err)
	} else {
		fs.Debugf(up.o, "Done copying chunk %d", part)
	}
	return err
}

// Close closes off the large upload
func (up *largeUpload) Close(ctx context.Context) error {
	fs.Debugf(up.o, "Finishing large file %s with %d parts", up.what, up.parts)
	opts := rest.Opts{
		Method: "POST",
		Path:   "/b2_finish_large_file",
	}
	var request = api.FinishLargeFileRequest{
		ID:    up.id,
		SHA1s: up.sha1s,
	}
	var response api.FileInfo
	err := up.f.pacer.Call(func() (bool, error) {
		resp, err := up.f.srv.CallJSON(ctx, &opts, &request, &response)
		return up.f.shouldRetry(ctx, resp, err)
	})
	if err != nil {
		return err
	}
	up.info = &response
	return nil
}

// Abort aborts the large upload
func (up *largeUpload) Abort(ctx context.Context) error {
	fs.Debugf(up.o, "Cancelling large file %s", up.what)
	opts := rest.Opts{
		Method: "POST",
		Path:   "/b2_cancel_large_file",
	}
	var request = api.CancelLargeFileRequest{
		ID: up.id,
	}
	var response api.CancelLargeFileResponse
	err := up.f.pacer.Call(func() (bool, error) {
		resp, err := up.f.srv.CallJSON(ctx, &opts, &request, &response)
		return up.f.shouldRetry(ctx, resp, err)
	})
	if err != nil {
		fs.Errorf(up.o, "Failed to cancel large file %s: %v", up.what, err)
	}
	return err
}

// Stream uploads the chunks from the input, starting with a required initial
// chunk. Assumes the file size is unknown and will upload until the input
// reaches EOF.
//
// Note that initialUploadBlock must be returned to f.putBuf()
func (up *largeUpload) Stream(ctx context.Context, initialUploadBlock *pool.RW) (err error) {
	defer atexit.OnError(&err, func() { _ = up.Abort(ctx) })()
	fs.Debugf(up.o, "Starting streaming of large file (id %q)", up.id)
	var (
		g, gCtx      = errgroup.WithContext(ctx)
		hasMoreParts = true
	)
	up.size = initialUploadBlock.Size()
	up.parts = 0
	for part := 0; hasMoreParts; part++ {
		// Get a block of memory from the pool and token which limits concurrency.
		var rw *pool.RW
		if part == 0 {
			rw = initialUploadBlock
		} else {
			rw = up.f.getRW(false)
		}

		// Fail fast, in case an errgroup managed function returns an error
		// gCtx is cancelled. There is no point in uploading all the other parts.
		if gCtx.Err() != nil {
			up.f.putRW(rw)
			break
		}

		// Read the chunk
		var n int64
		if part == 0 {
			n = rw.Size()
		} else {
			n, err = io.CopyN(rw, up.in, up.chunkSize)
			if err == io.EOF {
				if n == 0 {
					fs.Debugf(up.o, "Not sending empty chunk after EOF - ending.")
					up.f.putRW(rw)
					break
				} else {
					fs.Debugf(up.o, "Read less than a full chunk %d, making this the last one.", n)
				}
				hasMoreParts = false
			} else if err != nil {
				// other kinds of errors indicate failure
				up.f.putRW(rw)
				return err
			}
		}

		// Keep stats up to date
		up.parts += 1
		up.size += n
		if part > maxParts {
			up.f.putRW(rw)
			return fmt.Errorf("%q too big (%d bytes so far) makes too many parts %d > %d - increase --b2-chunk-size", up.o, up.size, up.parts, maxParts)
		}

		part := part // for the closure
		g.Go(func() (err error) {
			defer up.f.putRW(rw)
			_, err = up.WriteChunk(gCtx, part, rw)
			return err
		})
	}
	err = g.Wait()
	if err != nil {
		return err
	}
	return up.Close(ctx)
}

// Copy the chunks from the source to the destination
func (up *largeUpload) Copy(ctx context.Context) (err error) {
	defer atexit.OnError(&err, func() { _ = up.Abort(ctx) })()
	fs.Debugf(up.o, "Starting %s of large file in %d chunks (id %q)", up.what, up.parts, up.id)
	var (
		g, gCtx   = errgroup.WithContext(ctx)
		remaining = up.size
	)
	g.SetLimit(up.f.opt.UploadConcurrency)
	for part := 0; part < up.parts; part++ {
		// Fail fast, in case an errgroup managed function returns an error
		// gCtx is cancelled. There is no point in copying all the other parts.
		if gCtx.Err() != nil {
			break
		}

		reqSize := remaining
		if reqSize >= up.chunkSize {
			reqSize = up.chunkSize
		}

		part := part // for the closure
		g.Go(func() (err error) {
			return up.copyChunk(gCtx, part, reqSize)
		})
		remaining -= reqSize
	}
	err = g.Wait()
	if err != nil {
		return err
	}
	return up.Close(ctx)
}