hdfs: fix retry "replication in progress" errors when uploading

Before this change uploaded files could return the error "replication
in progress".

This error is harmless though and means the Close should be retried
which is what this patch does.
This commit is contained in:
Nick Craig-Wood 2023-09-08 15:30:04 +01:00
parent c9350149d8
commit 7453b7d5f3
2 changed files with 28 additions and 1 deletions

View File

@ -21,6 +21,7 @@ import (
"github.com/rclone/rclone/fs/config/configmap" "github.com/rclone/rclone/fs/config/configmap"
"github.com/rclone/rclone/fs/config/configstruct" "github.com/rclone/rclone/fs/config/configstruct"
"github.com/rclone/rclone/fs/hash" "github.com/rclone/rclone/fs/hash"
"github.com/rclone/rclone/lib/pacer"
) )
// Fs represents a HDFS server // Fs represents a HDFS server
@ -31,8 +32,15 @@ type Fs struct {
opt Options // options for this backend opt Options // options for this backend
ci *fs.ConfigInfo // global config ci *fs.ConfigInfo // global config
client *hdfs.Client client *hdfs.Client
pacer *fs.Pacer // pacer for API calls
} }
const (
minSleep = 20 * time.Millisecond
maxSleep = 10 * time.Second
decayConstant = 2 // bigger for slower decay, exponential
)
// copy-paste from https://github.com/colinmarc/hdfs/blob/master/cmd/hdfs/kerberos.go // copy-paste from https://github.com/colinmarc/hdfs/blob/master/cmd/hdfs/kerberos.go
func getKerberosClient() (*krb.Client, error) { func getKerberosClient() (*krb.Client, error) {
configPath := os.Getenv("KRB5_CONFIG") configPath := os.Getenv("KRB5_CONFIG")
@ -114,6 +122,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
opt: *opt, opt: *opt,
ci: fs.GetConfig(ctx), ci: fs.GetConfig(ctx),
client: client, client: client,
pacer: fs.NewPacer(ctx, pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))),
} }
f.features = (&fs.Features{ f.features = (&fs.Features{

View File

@ -5,10 +5,12 @@ package hdfs
import ( import (
"context" "context"
"errors"
"io" "io"
"path" "path"
"time" "time"
"github.com/colinmarc/hdfs/v2"
"github.com/rclone/rclone/fs" "github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/hash" "github.com/rclone/rclone/fs/hash"
"github.com/rclone/rclone/lib/readers" "github.com/rclone/rclone/lib/readers"
@ -141,7 +143,23 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
return err return err
} }
err = out.Close() // If the datanodes have acknowledged all writes but not yet
// to the namenode, FileWriter.Close can return ErrReplicating
// (wrapped in an os.PathError). This indicates that all data
// has been written, but the lease is still open for the file.
//
// It is safe in this case to either ignore the error (and let
// the lease expire on its own) or to call Close multiple
// times until it completes without an error. The Java client,
// for context, always chooses to retry, with exponential
// backoff.
err = o.fs.pacer.Call(func() (bool, error) {
err := out.Close()
if err == nil {
return false, nil
}
return errors.Is(err, hdfs.ErrReplicating), err
})
if err != nil { if err != nil {
cleanup() cleanup()
return err return err