diff --git a/backend/hdfs/fs.go b/backend/hdfs/fs.go
index 7d48ffed6..8ba9d2366 100644
--- a/backend/hdfs/fs.go
+++ b/backend/hdfs/fs.go
@@ -21,6 +21,7 @@ import (
 	"github.com/rclone/rclone/fs/config/configmap"
 	"github.com/rclone/rclone/fs/config/configstruct"
 	"github.com/rclone/rclone/fs/hash"
+	"github.com/rclone/rclone/lib/pacer"
 )
 
 // Fs represents a HDFS server
@@ -31,8 +32,15 @@ type Fs struct {
 	opt      Options        // options for this backend
 	ci       *fs.ConfigInfo // global config
 	client   *hdfs.Client
+	pacer    *fs.Pacer // pacer for API calls
 }
 
+const (
+	minSleep      = 20 * time.Millisecond
+	maxSleep      = 10 * time.Second
+	decayConstant = 2 // bigger for slower decay, exponential
+)
+
 // copy-paste from https://github.com/colinmarc/hdfs/blob/master/cmd/hdfs/kerberos.go
 func getKerberosClient() (*krb.Client, error) {
 	configPath := os.Getenv("KRB5_CONFIG")
@@ -114,6 +122,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
 		opt:    *opt,
 		ci:     fs.GetConfig(ctx),
 		client: client,
+		pacer:  fs.NewPacer(ctx, pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))),
 	}
 
 	f.features = (&fs.Features{
diff --git a/backend/hdfs/object.go b/backend/hdfs/object.go
index 31c8282f8..a2c2c1087 100644
--- a/backend/hdfs/object.go
+++ b/backend/hdfs/object.go
@@ -5,10 +5,12 @@ package hdfs
 
 import (
 	"context"
+	"errors"
 	"io"
 	"path"
 	"time"
 
+	"github.com/colinmarc/hdfs/v2"
 	"github.com/rclone/rclone/fs"
 	"github.com/rclone/rclone/fs/hash"
 	"github.com/rclone/rclone/lib/readers"
@@ -141,7 +143,23 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
 		return err
 	}
 
-	err = out.Close()
+	// If the datanodes have acknowledged all writes but not yet
+	// to the namenode, FileWriter.Close can return ErrReplicating
+	// (wrapped in an os.PathError). This indicates that all data
+	// has been written, but the lease is still open for the file.
+	//
+	// It is safe in this case to either ignore the error (and let
+	// the lease expire on its own) or to call Close multiple
+	// times until it completes without an error. The Java client,
+	// for context, always chooses to retry, with exponential
+	// backoff.
+	err = o.fs.pacer.Call(func() (bool, error) {
+		err := out.Close()
+		if err == nil {
+			return false, nil
+		}
+		return errors.Is(err, hdfs.ErrReplicating), err
+	})
 	if err != nil {
 		cleanup()
 		return err
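
For context, below is a minimal, self-contained sketch of the retry contract the new Close call relies on. It is not rclone's pacer implementation: the callback returns (retry, err), and on (true, err) the caller sleeps with exponential backoff and calls it again. The names errReplicating, callWithBackoff, and the sample constants are illustrative stand-ins, not part of this patch.

package main

import (
	"errors"
	"fmt"
	"time"
)

// errReplicating is an illustrative stand-in for hdfs.ErrReplicating.
var errReplicating = errors.New("replication in progress")

// callWithBackoff mirrors the pacer.Call contract used above: the callback
// reports whether its error should be retried; retries sleep with the delay
// doubling from minSleep up to maxSleep.
func callWithBackoff(fn func() (bool, error), minSleep, maxSleep time.Duration) error {
	sleep := minSleep
	for {
		retry, err := fn()
		if err == nil || !retry {
			return err
		}
		time.Sleep(sleep)
		sleep *= 2
		if sleep > maxSleep {
			sleep = maxSleep
		}
	}
}

func main() {
	attempts := 0
	err := callWithBackoff(func() (bool, error) {
		attempts++
		if attempts < 3 {
			// Pretend Close failed because the namenode has not caught up yet;
			// the sentinel is wrapped, so errors.Is is used to detect it.
			err := fmt.Errorf("close: %w", errReplicating)
			return errors.Is(err, errReplicating), err
		}
		return false, nil // Close succeeded, stop retrying
	}, 20*time.Millisecond, 10*time.Second)
	fmt.Println("attempts:", attempts, "err:", err)
}

Any other error from Close is returned on the first attempt, because the callback reports it as non-retryable; only the replication case loops, matching the behaviour described in the comment added to Update.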