// Package batcher implements a generic batcher.
//
// It uses two types:
//
//	Item - the thing to be batched
//	Result - the result from the batching
//
// And one function of type CommitBatchFn which is called to do the actual batching.
package batcher

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	"github.com/rclone/rclone/fs"
	"github.com/rclone/rclone/fs/fserrors"
	"github.com/rclone/rclone/lib/atexit"
)

// Options for configuring the batcher
type Options struct {
	Mode                  string        // mode of the batcher "sync", "async" or "off"
	Size                  int           // size of batch
	Timeout               time.Duration // timeout before committing the batch
	MaxBatchSize          int           // max size the batch can be
	DefaultTimeoutSync    time.Duration // default time to kick off the batch if nothing added for this long (sync)
	DefaultTimeoutAsync   time.Duration // default time to kick off the batch if nothing added for this long (async)
	DefaultBatchSizeAsync int           // default batch size if async
}

// CommitBatchFn is called to commit a batch of Item and return Result to the callers.
//
// It should commit the batch of items then for each result i (of
// which there should be len(items)) it should set either results[i]
// or errors[i]
type CommitBatchFn[Item, Result any] func(ctx context.Context, items []Item, results []Result, errors []error) (err error)

// Batcher holds info about the current items waiting to be acted on.
type Batcher[Item, Result any] struct {
	opt      Options                     // options for configuring the batcher
	f        any                         // logging identity for fs.Debugf(f, ...)
	commit   CommitBatchFn[Item, Result] // User defined function to commit the batch
	async    bool                        // whether we are using async batching
	in       chan request[Item, Result]  // incoming items to batch
	closed   chan struct{}               // close to indicate batcher shut down
	atexit   atexit.FnHandle             // atexit handle
	shutOnce sync.Once                   // make sure we shutdown once only
	wg       sync.WaitGroup              // wait for shutdown
}

// request holds an incoming request with a place for a reply
type request[Item, Result any] struct {
	item   Item
	name   string
	result chan<- response[Result]
	quit   bool // if set then quit
}

// response holds a response to be delivered to clients waiting
// for a batch to complete.
type response[Result any] struct {
	err   error
	entry Result
}

// New creates a Batcher for Item and Result calling commit to do the actual committing.
func New[Item, Result any](ctx context.Context, f any, commit CommitBatchFn[Item, Result], opt Options) (*Batcher[Item, Result], error) {
	// fs.Debugf(f, "Creating batcher with mode %q, size %d, timeout %v", mode, size, timeout)
	if opt.Size > opt.MaxBatchSize || opt.Size < 0 {
		return nil, fmt.Errorf("batcher: batch size must be < %d and >= 0 - it is currently %d", opt.MaxBatchSize, opt.Size)
	}

	async := false

	switch opt.Mode {
	case "sync":
		if opt.Size <= 0 {
			ci := fs.GetConfig(ctx)
			opt.Size = ci.Transfers
		}
		if opt.Timeout <= 0 {
			opt.Timeout = opt.DefaultTimeoutSync
		}
	case "async":
		if opt.Size <= 0 {
			opt.Size = opt.DefaultBatchSizeAsync
		}
		if opt.Timeout <= 0 {
			opt.Timeout = opt.DefaultTimeoutAsync
		}
		async = true
	case "off":
		opt.Size = 0
	default:
		return nil, fmt.Errorf("batcher: batch mode must be sync|async|off not %q", opt.Mode)
	}

	b := &Batcher[Item, Result]{
		opt:    opt,
		f:      f,
		commit: commit,
		async:  async,
		in:     make(chan request[Item, Result], opt.Size),
		closed: make(chan struct{}),
	}
	if b.Batching() {
		b.atexit = atexit.Register(b.Shutdown)
		b.wg.Add(1)
		go b.commitLoop(context.Background())
	}
	return b, nil

}

// Batching returns true if batching is active
func (b *Batcher[Item, Result]) Batching() bool {
	return b.opt.Size > 0
}

// commit a batch calling the user defined commit function then distributing the results.
func (b *Batcher[Item, Result]) commitBatch(ctx context.Context, requests []request[Item, Result]) (err error) {
	// If commit fails then signal clients if sync
	var signalled = b.async
	defer func() {
		if err != nil && !signalled {
			// Signal to clients that there was an error
			for _, req := range requests {
				req.result <- response[Result]{err: err}
			}
		}
	}()
	desc := fmt.Sprintf("%s batch length %d starting with: %s", b.opt.Mode, len(requests), requests[0].name)
	fs.Debugf(b.f, "Committing %s", desc)

	var (
		items   = make([]Item, len(requests))
		results = make([]Result, len(requests))
		errors  = make([]error, len(requests))
	)

	for i := range requests {
		items[i] = requests[i].item
	}

	// Commit the batch
	err = b.commit(ctx, items, results, errors)
	if err != nil {
		return err
	}

	// Report results to clients
	var (
		lastError  error
		errorCount = 0
	)
	for i, req := range requests {
		result := results[i]
		err := errors[i]
		resp := response[Result]{}
		if err == nil {
			resp.entry = result
		} else {
			errorCount++
			lastError = err
			resp.err = fmt.Errorf("batch upload failed: %w", err)
		}
		if !b.async {
			req.result <- resp
		}
	}

	// show signalled so no need to report error to clients from now on
	signalled = true

	// Report an error if any failed in the batch
	if lastError != nil {
		return fmt.Errorf("batch had %d errors: last error: %w", errorCount, lastError)
	}

	fs.Debugf(b.f, "Committed %s", desc)
	return nil
}

// commitLoop runs the commit engine in the background
func (b *Batcher[Item, Result]) commitLoop(ctx context.Context) {
	var (
		requests  []request[Item, Result] // current batch of uncommitted Items
		idleTimer = time.NewTimer(b.opt.Timeout)
		commit    = func() {
			err := b.commitBatch(ctx, requests)
			if err != nil {
				fs.Errorf(b.f, "%s batch commit: failed to commit batch length %d: %v", b.opt.Mode, len(requests), err)
			}
			requests = nil
		}
	)
	defer b.wg.Done()
	defer idleTimer.Stop()
	idleTimer.Stop()

outer:
	for {
		select {
		case req := <-b.in:
			if req.quit {
				break outer
			}
			requests = append(requests, req)
			idleTimer.Stop()
			if len(requests) >= b.opt.Size {
				commit()
			} else {
				idleTimer.Reset(b.opt.Timeout)
			}
		case <-idleTimer.C:
			if len(requests) > 0 {
				fs.Debugf(b.f, "Batch idle for %v so committing", b.opt.Timeout)
				commit()
			}
		}

	}
	// commit any remaining items
	if len(requests) > 0 {
		commit()
	}
}

// Shutdown finishes any pending batches then shuts everything down.
//
// This is registered as an atexit handler by New.
func (b *Batcher[Item, Result]) Shutdown() {
	if !b.Batching() {
		return
	}
	b.shutOnce.Do(func() {
		atexit.Unregister(b.atexit)
		fs.Infof(b.f, "Committing uploads - please wait...")
		// show that batcher is shutting down
		close(b.closed)
		// quit the commitLoop by sending a quitRequest message
		//
		// Note that we don't close b.in because that will
		// cause write to closed channel in Commit when we are
		// exiting due to a signal.
		b.in <- request[Item, Result]{quit: true}
		b.wg.Wait()
	})
}

// Commit commits the Item getting a Result or error using a batch
// call, first adding it to the batch and then waiting for the batch
// to complete in a synchronous way if async is not set.
//
// If async is set then this will return no error and a nil/empty
// Result.
//
// This should not be called if batching is off - check first with
// IsBatching.
func (b *Batcher[Item, Result]) Commit(ctx context.Context, name string, item Item) (entry Result, err error) {
	select {
	case <-b.closed:
		return entry, fserrors.FatalError(errors.New("batcher is shutting down"))
	default:
	}
	fs.Debugf(b.f, "Adding %q to batch", name)
	resp := make(chan response[Result], 1)
	b.in <- request[Item, Result]{
		item:   item,
		name:   name,
		result: resp,
	}
	// If running async then don't wait for the result
	if b.async {
		return entry, nil
	}
	result := <-resp
	return result.entry, result.err
}