mirror of
https://github.com/rclone/rclone.git
synced 2024-11-25 09:41:44 +08:00
serve nfs: implement on disk cache for file handles
Some checks are pending
Docker beta build / Build image job (push) Waiting to run
Some checks are pending
Docker beta build / Build image job (push) Waiting to run
This commit is contained in:
parent
55b9b3e33a
commit
70e8ad456f
|
@ -9,6 +9,7 @@ import (
|
|||
|
||||
"github.com/rclone/rclone/vfs/vfscommon"
|
||||
"github.com/rclone/rclone/vfs/vfstest"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// Return true if the command ran without error
|
||||
|
@ -28,5 +29,7 @@ func TestMount(t *testing.T) {
|
|||
}
|
||||
sudo = true
|
||||
}
|
||||
nfsServerOpt.HandleCacheDir = t.TempDir()
|
||||
require.NoError(t, nfsServerOpt.HandleCache.Set("disk"))
|
||||
vfstest.RunTests(t, false, vfscommon.CacheModeWrites, false, mount)
|
||||
}
|
||||
|
|
|
@ -3,7 +3,24 @@
|
|||
package nfs
|
||||
|
||||
import (
|
||||
"crypto/md5"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
billy "github.com/go-git/go-billy/v5"
|
||||
"github.com/rclone/rclone/fs"
|
||||
"github.com/rclone/rclone/fs/config"
|
||||
"github.com/rclone/rclone/lib/encoder"
|
||||
"github.com/rclone/rclone/lib/file"
|
||||
"github.com/willscott/go-nfs"
|
||||
nfshelper "github.com/willscott/go-nfs/helpers"
|
||||
)
|
||||
|
||||
|
@ -25,8 +42,121 @@ type Cache interface {
|
|||
}
|
||||
|
||||
// Set the cache of the handler to the type required by the user
|
||||
func (h *Handler) setCache() (err error) {
|
||||
// The default caching handler
|
||||
h.Cache = nfshelper.NewCachingHandler(h, h.opt.HandleLimit)
|
||||
func (h *Handler) getCache() (c Cache, err error) {
|
||||
switch h.opt.HandleCache {
|
||||
case cacheMemory:
|
||||
return nfshelper.NewCachingHandler(h, h.opt.HandleLimit), nil
|
||||
case cacheDisk:
|
||||
return newDiskHandler(h)
|
||||
case cacheSymlink:
|
||||
if runtime.GOOS != "linux" {
|
||||
return nil, errors.New("can only use symlink cache on Linux")
|
||||
}
|
||||
return nil, errors.New("FIXME not implemented yet")
|
||||
}
|
||||
return nil, errors.New("unknown handle cache type")
|
||||
}
|
||||
|
||||
// diskHandler implements an on disk NFS file handle cache
|
||||
type diskHandler struct {
|
||||
mu sync.RWMutex
|
||||
cacheDir string
|
||||
billyFS billy.Filesystem
|
||||
}
|
||||
|
||||
// Create a new disk handler
|
||||
func newDiskHandler(h *Handler) (dh *diskHandler, err error) {
|
||||
cacheDir := h.opt.HandleCacheDir
|
||||
// If cacheDir isn't set then make one from the config
|
||||
if cacheDir == "" {
|
||||
// How the VFS was configured
|
||||
configString := fs.ConfigString(h.vfs.Fs())
|
||||
// Turn it into a valid OS directory name
|
||||
dirName := encoder.OS.ToStandardName(configString)
|
||||
cacheDir = filepath.Join(config.GetCacheDir(), "serve-nfs-handle-cache-"+h.opt.HandleCache.String(), dirName)
|
||||
}
|
||||
// Create the cache dir
|
||||
err = file.MkdirAll(cacheDir, 0700)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("disk handler mkdir failed: %v", err)
|
||||
}
|
||||
dh = &diskHandler{
|
||||
cacheDir: cacheDir,
|
||||
billyFS: h.billyFS,
|
||||
}
|
||||
fs.Infof("nfs", "Storing handle cache in %q", dh.cacheDir)
|
||||
return dh, nil
|
||||
}
|
||||
|
||||
// Convert a path to a hash
|
||||
func hashPath(fullPath string) []byte {
|
||||
hash := md5.Sum([]byte(fullPath))
|
||||
return hash[:]
|
||||
}
|
||||
|
||||
// Convert a handle to a path on disk for the handle
|
||||
func (dh *diskHandler) handleToPath(fh []byte) (cachePath string) {
|
||||
fhString := hex.EncodeToString(fh)
|
||||
if len(fhString) <= 4 {
|
||||
cachePath = filepath.Join(dh.cacheDir, fhString)
|
||||
} else {
|
||||
cachePath = filepath.Join(dh.cacheDir, fhString[0:2], fhString[2:4], fhString)
|
||||
}
|
||||
return cachePath
|
||||
}
|
||||
|
||||
// ToHandle takes a file and represents it with an opaque handle to reference it.
|
||||
// In stateless nfs (when it's serving a unix fs) this can be the device + inode
|
||||
// but we can generalize with a stateful local cache of handed out IDs.
|
||||
func (dh *diskHandler) ToHandle(f billy.Filesystem, splitPath []string) (fh []byte) {
|
||||
dh.mu.Lock()
|
||||
defer dh.mu.Unlock()
|
||||
fullPath := path.Join(splitPath...)
|
||||
fh = hashPath(fullPath)
|
||||
cachePath := dh.handleToPath(fh)
|
||||
cacheDir := filepath.Dir(cachePath)
|
||||
err := os.MkdirAll(cacheDir, 0700)
|
||||
if err != nil {
|
||||
fs.Errorf("nfs", "Couldn't create cache file handle directory: %v", err)
|
||||
return fh
|
||||
}
|
||||
err = os.WriteFile(cachePath, []byte(fullPath), 0600)
|
||||
if err != nil {
|
||||
fs.Errorf("nfs", "Couldn't create cache file handle: %v", err)
|
||||
return fh
|
||||
}
|
||||
return fh
|
||||
}
|
||||
|
||||
var errStaleHandle = &nfs.NFSStatusError{NFSStatus: nfs.NFSStatusStale}
|
||||
|
||||
// FromHandle converts from an opaque handle to the file it represents
|
||||
func (dh *diskHandler) FromHandle(fh []byte) (f billy.Filesystem, splitPath []string, err error) {
|
||||
dh.mu.RLock()
|
||||
defer dh.mu.RUnlock()
|
||||
cachePath := dh.handleToPath(fh)
|
||||
fullPathBytes, err := os.ReadFile(cachePath)
|
||||
if err != nil {
|
||||
fs.Errorf("nfs", "Stale handle %q: %v", cachePath, err)
|
||||
return nil, nil, errStaleHandle
|
||||
}
|
||||
splitPath = strings.Split(string(fullPathBytes), "/")
|
||||
return dh.billyFS, splitPath, nil
|
||||
}
|
||||
|
||||
// Invalidate the handle passed - used on rename and delete
|
||||
func (dh *diskHandler) InvalidateHandle(f billy.Filesystem, fh []byte) error {
|
||||
dh.mu.Lock()
|
||||
defer dh.mu.Unlock()
|
||||
cachePath := dh.handleToPath(fh)
|
||||
err := os.Remove(cachePath)
|
||||
if err != nil {
|
||||
fs.Errorf("nfs", "Failed to remove handle %q: %v", cachePath, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// HandleLimit exports how many file handles can be safely stored by this cache.
|
||||
func (dh *diskHandler) HandleLimit() int {
|
||||
return math.MaxInt
|
||||
}
|
||||
|
|
134
cmd/serve/nfs/cache_test.go
Normal file
134
cmd/serve/nfs/cache_test.go
Normal file
|
@ -0,0 +1,134 @@
|
|||
//go:build unix
|
||||
|
||||
package nfs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/rclone/rclone/fs"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// Check basic CRUD operations
|
||||
func testCacheCRUD(t *testing.T, h *Handler, c Cache, fileName string) {
|
||||
// Check reading a non existent handle returns an error
|
||||
_, _, err := c.FromHandle([]byte{10})
|
||||
assert.Error(t, err)
|
||||
|
||||
// Write a handle
|
||||
splitPath := []string{"dir", fileName}
|
||||
fh := c.ToHandle(h.billyFS, splitPath)
|
||||
assert.True(t, len(fh) > 0)
|
||||
|
||||
// Read the handle back
|
||||
newFs, newSplitPath, err := c.FromHandle(fh)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, h.billyFS, newFs)
|
||||
assert.Equal(t, splitPath, newSplitPath)
|
||||
|
||||
// Invalidate the handle
|
||||
err = c.InvalidateHandle(h.billyFS, fh)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Invalidate the handle twice
|
||||
err = c.InvalidateHandle(h.billyFS, fh)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Check the handle is gone and returning stale handle error
|
||||
_, _, err = c.FromHandle(fh)
|
||||
require.Error(t, err)
|
||||
assert.Equal(t, errStaleHandle, err)
|
||||
}
|
||||
|
||||
// Thrash the cache operations in parallel on different files
|
||||
func testCacheThrashDifferent(t *testing.T, h *Handler, c Cache) {
|
||||
var wg sync.WaitGroup
|
||||
for i := 0; i < 100; i++ {
|
||||
i := i
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
testCacheCRUD(t, h, c, fmt.Sprintf("file-%d", i))
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// Thrash the cache operations in parallel on the same file
|
||||
func testCacheThrashSame(t *testing.T, h *Handler, c Cache) {
|
||||
var wg sync.WaitGroup
|
||||
for i := 0; i < 100; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
||||
// Write a handle
|
||||
splitPath := []string{"file"}
|
||||
fh := c.ToHandle(h.billyFS, splitPath)
|
||||
assert.True(t, len(fh) > 0)
|
||||
|
||||
// Read the handle back
|
||||
newFs, newSplitPath, err := c.FromHandle(fh)
|
||||
if err != nil {
|
||||
assert.Equal(t, errStaleHandle, err)
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, h.billyFS, newFs)
|
||||
assert.Equal(t, splitPath, newSplitPath)
|
||||
}
|
||||
|
||||
// Invalidate the handle
|
||||
err = c.InvalidateHandle(h.billyFS, fh)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Check the handle is gone and returning stale handle error
|
||||
_, _, err = c.FromHandle(fh)
|
||||
if err != nil {
|
||||
require.Error(t, err)
|
||||
assert.Equal(t, errStaleHandle, err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func TestCache(t *testing.T) {
|
||||
// Quieten the flood of ERROR messages!
|
||||
ci := fs.GetConfig(context.Background())
|
||||
oldLogLevel := ci.LogLevel
|
||||
ci.LogLevel = fs.LogLevelEmergency
|
||||
defer func() {
|
||||
ci.LogLevel = oldLogLevel
|
||||
}()
|
||||
billyFS := &FS{nil} // place holder billyFS
|
||||
for _, cacheType := range []handleCache{cacheMemory, cacheDisk} {
|
||||
cacheType := cacheType
|
||||
t.Run(cacheType.String(), func(t *testing.T) {
|
||||
h := &Handler{
|
||||
billyFS: billyFS,
|
||||
}
|
||||
h.opt.HandleLimit = 1000
|
||||
h.opt.HandleCache = cacheType
|
||||
h.opt.HandleCacheDir = t.TempDir()
|
||||
c, err := h.getCache()
|
||||
require.NoError(t, err)
|
||||
|
||||
t.Run("CRUD", func(t *testing.T) {
|
||||
testCacheCRUD(t, h, c, "file")
|
||||
})
|
||||
// NB the default caching handler is not thread safe!
|
||||
if cacheType != cacheMemory {
|
||||
t.Run("ThrashDifferent", func(t *testing.T) {
|
||||
testCacheThrashDifferent(t, h, c)
|
||||
})
|
||||
t.Run("ThrashSame", func(t *testing.T) {
|
||||
testCacheThrashSame(t, h, c)
|
||||
})
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
|
@ -13,32 +13,30 @@ import (
|
|||
"github.com/rclone/rclone/fs/log"
|
||||
"github.com/rclone/rclone/vfs"
|
||||
"github.com/willscott/go-nfs"
|
||||
nfshelper "github.com/willscott/go-nfs/helpers"
|
||||
)
|
||||
|
||||
// Handler returns a NFS backing that exposes a given file system in response to all mount requests.
|
||||
type Handler struct {
|
||||
vfs *vfs.VFS
|
||||
opt *Options
|
||||
opt Options
|
||||
billyFS *FS
|
||||
Cache
|
||||
}
|
||||
|
||||
// NewHandler creates a handler for the provided filesystem
|
||||
func NewHandler(vfs *vfs.VFS, opt *Options) (nfs.Handler, error) {
|
||||
handler := &Handler{
|
||||
func NewHandler(vfs *vfs.VFS, opt *Options) (handler nfs.Handler, err error) {
|
||||
h := &Handler{
|
||||
vfs: vfs,
|
||||
opt: opt,
|
||||
opt: *opt,
|
||||
billyFS: &FS{vfs: vfs},
|
||||
}
|
||||
handler.opt.HandleLimit = handler.opt.Limit()
|
||||
err := handler.setCache()
|
||||
h.opt.HandleLimit = h.opt.Limit()
|
||||
h.Cache, err = h.getCache()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to make cache: %w", err)
|
||||
}
|
||||
handler.Cache = nfshelper.NewCachingHandler(handler, handler.opt.HandleLimit)
|
||||
nfs.SetLogger(&logIntercepter{Level: nfs.DebugLevel})
|
||||
return handler, nil
|
||||
return h, nil
|
||||
}
|
||||
|
||||
// Mount backs Mount RPC Requests, allowing for access control policies.
|
||||
|
@ -83,7 +81,7 @@ func (h *Handler) HandleLimit() int {
|
|||
return h.Cache.HandleLimit()
|
||||
}
|
||||
|
||||
// Invalidate the handle passed - used on rename and delete
|
||||
// InvalidateHandle invalidates the handle passed - used on rename and delete
|
||||
func (h *Handler) InvalidateHandle(f billy.Filesystem, b []byte) (err error) {
|
||||
defer log.Trace("nfs", "handle=%X", b)("err=%v", &err)
|
||||
return h.Cache.InvalidateHandle(f, b)
|
||||
|
|
|
@ -1,16 +1,17 @@
|
|||
//go:build unix
|
||||
|
||||
// Package nfs implements a server to serve a VFS remote over NFSv3 protocol
|
||||
// Package nfs implements a server to serve a VFS remote over the NFSv3 protocol
|
||||
//
|
||||
// There is no authentication available on this server
|
||||
// and it is served on loopback interface by default.
|
||||
// There is no authentication available on this server and it is
|
||||
// served on the loopback interface by default.
|
||||
//
|
||||
// This is primarily used for mounting a VFS remote
|
||||
// in macOS, where FUSE-mounting mechanisms are usually not available.
|
||||
// This is primarily used for mounting a VFS remote in macOS, where
|
||||
// FUSE-mounting mechanisms are usually not available.
|
||||
package nfs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
|
||||
"github.com/rclone/rclone/cmd"
|
||||
"github.com/rclone/rclone/fs"
|
||||
|
@ -31,16 +32,44 @@ var OptionsInfo = fs.Options{{
|
|||
Name: "nfs_cache_handle_limit",
|
||||
Default: 1000000,
|
||||
Help: "max file handles cached simultaneously (min 5)",
|
||||
}, {
|
||||
Name: "nfs_cache_type",
|
||||
Default: cacheMemory,
|
||||
Help: "Type of NFS handle cache to use",
|
||||
}, {
|
||||
Name: "nfs_cache_dir",
|
||||
Default: "",
|
||||
Help: "The directory the NFS handle cache will use if set",
|
||||
}}
|
||||
|
||||
func init() {
|
||||
fs.RegisterGlobalOptions(fs.OptionsInfo{Name: "nfs", Opt: &opt, Options: OptionsInfo})
|
||||
}
|
||||
|
||||
type handleCache = fs.Enum[handleCacheChoices]
|
||||
|
||||
const (
|
||||
cacheMemory handleCache = iota
|
||||
cacheDisk
|
||||
cacheSymlink
|
||||
)
|
||||
|
||||
type handleCacheChoices struct{}
|
||||
|
||||
func (handleCacheChoices) Choices() []string {
|
||||
return []string{
|
||||
cacheMemory: "memory",
|
||||
cacheDisk: "disk",
|
||||
cacheSymlink: "symlink",
|
||||
}
|
||||
}
|
||||
|
||||
// Options contains options for the NFS Server
|
||||
type Options struct {
|
||||
ListenAddr string `config:"addr"` // Port to listen on
|
||||
HandleLimit int `config:"nfs_cache_handle_limit"` // max file handles cached by go-nfs CachingHandler
|
||||
ListenAddr string `config:"addr"` // Port to listen on
|
||||
HandleLimit int `config:"nfs_cache_handle_limit"` // max file handles cached by go-nfs CachingHandler
|
||||
HandleCache handleCache `config:"nfs_cache_type"` // what kind of handle cache to use
|
||||
HandleCacheDir string `config:"nfs_cache_dir"` // where the handle cache should be stored
|
||||
}
|
||||
|
||||
var opt Options
|
||||
|
@ -73,38 +102,73 @@ func Run(command *cobra.Command, args []string) {
|
|||
var Command = &cobra.Command{
|
||||
Use: "nfs remote:path",
|
||||
Short: `Serve the remote as an NFS mount`,
|
||||
Long: `Create an NFS server that serves the given remote over the network.
|
||||
Long: strings.ReplaceAll(`Create an NFS server that serves the given remote over the network.
|
||||
|
||||
The primary purpose for this command is to enable [mount command](/commands/rclone_mount/) on recent macOS versions where
|
||||
installing FUSE is very cumbersome.
|
||||
This implements an NFSv3 server to serve any rclone remote via NFS.
|
||||
|
||||
Since this is running on NFSv3, no authentication method is available. Any client
|
||||
will be able to access the data. To limit access, you can use serve NFS on loopback address
|
||||
and rely on secure tunnels (such as SSH). For this reason, by default, a random TCP port is chosen and loopback interface is used for the listening address;
|
||||
meaning that it is only available to the local machine. If you want other machines to access the
|
||||
NFS mount over local network, you need to specify the listening address and port using ` + "`--addr`" + ` flag.
|
||||
The primary purpose for this command is to enable the [mount
|
||||
command](/commands/rclone_mount/) on recent macOS versions where
|
||||
installing FUSE is very cumbersome.
|
||||
|
||||
Modifying files through NFS protocol requires VFS caching. Usually you will need to specify ` + "`--vfs-cache-mode`" + `
|
||||
in order to be able to write to the mountpoint (full is recommended). If you don't specify VFS cache mode,
|
||||
the mount will be read-only. Note also that ` + "`--nfs-cache-handle-limit`" + ` controls the maximum number of cached file handles stored by the caching handler.
|
||||
This should not be set too low or you may experience errors when trying to access files. The default is ` + "`1000000`" + `, but consider lowering this limit if
|
||||
the server's system resource usage causes problems.
|
||||
This server does not implement any authentication so any client will be
|
||||
able to access the data. To limit access, you can use |serve nfs| on
|
||||
the loopback address or rely on secure tunnels (such as SSH) or use
|
||||
firewalling.
|
||||
|
||||
For this reason, by default, a random TCP port is chosen and the
|
||||
loopback interface is used for the listening address by default;
|
||||
meaning that it is only available to the local machine. If you want
|
||||
other machines to access the NFS mount over local network, you need to
|
||||
specify the listening address and port using the |--addr| flag.
|
||||
|
||||
Modifying files through the NFS protocol requires VFS caching. Usually
|
||||
you will need to specify |--vfs-cache-mode| in order to be able to
|
||||
write to the mountpoint (|full| is recommended). If you don't specify
|
||||
VFS cache mode, the mount will be read-only.
|
||||
|
||||
|--nfs-cache-type| controls the type of the NFS handle cache. By
|
||||
default this is |memory| where new handles will be randomly allocated
|
||||
when needed. These are stored in memory. If the server is restarted
|
||||
the handle cache will be lost and connected NFS clients will get stale
|
||||
handle errors.
|
||||
|
||||
|--nfs-cache-type disk| uses an on disk NFS handle cache. Rclone
|
||||
hashes the path of the object and stores it in a file named after the
|
||||
hash. These hashes are stored on disk the directory controlled by
|
||||
|--cache-dir| or the exact directory may be specified with
|
||||
|--nfs-cache-dir|. Using this means that the NFS server can be
|
||||
restarted at will without affecting the connected clients.
|
||||
|
||||
|--nfs-cache-type symlink| is similar to |--nfs-cache-type disk| in
|
||||
that it uses an on disk cache, but the cache entries are held as
|
||||
symlinks. Rclone will use the handle of the underlying file as the NFS
|
||||
handle which improves performance. This sort of cache can't be backed
|
||||
up and restored as the underlying handles will change. This is Linux
|
||||
only.
|
||||
|
||||
|--nfs-cache-handle-limit| controls the maximum number of cached NFS
|
||||
handles stored by the caching handler. This should not be set too low
|
||||
or you may experience errors when trying to access files. The default
|
||||
is |1000000|, but consider lowering this limit if the server's system
|
||||
resource usage causes problems. This is only used by the |memory| type
|
||||
cache.
|
||||
|
||||
To serve NFS over the network use following command:
|
||||
|
||||
rclone serve nfs remote: --addr 0.0.0.0:$PORT --vfs-cache-mode=full
|
||||
|
||||
We specify a specific port that we can use in the mount command:
|
||||
|
||||
To mount the server under Linux/macOS, use the following command:
|
||||
This specifies a port that can be used in the mount command. To mount
|
||||
the server under Linux/macOS, use the following command:
|
||||
|
||||
mount -t nfs -o port=$PORT,mountport=$PORT,tcp $HOSTNAME:/ path/to/mountpoint
|
||||
|
||||
Where ` + "`$PORT`" + ` is the same port number we used in the serve nfs command.
|
||||
Where |$PORT| is the same port number used in the |serve nfs| command
|
||||
and |$HOSTNAME| is the network address of the machine that |serve nfs|
|
||||
was run on.
|
||||
|
||||
This feature is only available on Unix platforms.
|
||||
This command is only available on Unix platforms.
|
||||
|
||||
` + vfs.Help(),
|
||||
`, "|", "`") + vfs.Help(),
|
||||
Annotations: map[string]string{
|
||||
"versionIntroduced": "v1.65",
|
||||
"groups": "Filter",
|
||||
|
|
Loading…
Reference in New Issue
Block a user