Implement sync, -dry-run and fix logging

* Implement sync command
  * Implement String() interface for Fs
  * Sort out logging of FsObject~s
  * Implement -dry-run, -verbose and -quiet
This commit is contained in:
Nick Craig-Wood 2012-12-31 16:40:34 +00:00
parent c15ae179ee
commit 335667fdcb
5 changed files with 173 additions and 80 deletions

42
fs.go
View File

@ -3,12 +3,15 @@
package main package main
import ( import (
"fmt"
"io" "io"
"log"
"time" "time"
) )
// A Filesystem, describes the local filesystem and the remote object store // A Filesystem, describes the local filesystem and the remote object store
type Fs interface { type Fs interface {
String() string
List() FsObjectsChan List() FsObjectsChan
NewFsObject(remote string) FsObject NewFsObject(remote string) FsObject
Put(src FsObject) Put(src FsObject)
@ -22,7 +25,6 @@ type Fs interface {
// local file/directory // local file/directory
type FsObject interface { type FsObject interface {
Remote() string Remote() string
Debugf(string, ...interface{})
Md5sum() (string, error) Md5sum() (string, error)
ModTime() (time.Time, error) ModTime() (time.Time, error)
SetModTime(time.Time) SetModTime(time.Time)
@ -47,6 +49,22 @@ func NewFs(path string) (Fs, error) {
return NewFsLocal(path) return NewFsLocal(path)
} }
// Write debuging output for this FsObject
func FsDebug(fs FsObject, text string, args ...interface{}) {
if *verbose {
out := fmt.Sprintf(text, args...)
log.Printf("%s: %s", fs.Remote(), out)
}
}
// Write log output for this FsObject
func FsLog(fs FsObject, text string, args ...interface{}) {
if !*quiet {
out := fmt.Sprintf(text, args...)
log.Printf("%s: %s", fs.Remote(), out)
}
}
// checkClose is a utility function used to check the return from // checkClose is a utility function used to check the return from
// Close in a defer statement. // Close in a defer statement.
func checkClose(c io.Closer, err *error) { func checkClose(c io.Closer, err *error) {
@ -74,22 +92,22 @@ func checkClose(c io.Closer, err *error) {
// were errors reading info. // were errors reading info.
func Equal(src, dst FsObject) bool { func Equal(src, dst FsObject) bool {
if src.Size() != dst.Size() { if src.Size() != dst.Size() {
src.Debugf("Sizes differ") FsDebug(src, "Sizes differ")
return false return false
} }
// Size the same so check the mtime // Size the same so check the mtime
srcModTime, err := src.ModTime() srcModTime, err := src.ModTime()
if err != nil { if err != nil {
src.Debugf("Failed to read src mtime: %s", err) FsDebug(src, "Failed to read src mtime: %s", err)
} else { } else {
dstModTime, err := dst.ModTime() dstModTime, err := dst.ModTime()
if err != nil { if err != nil {
dst.Debugf("Failed to read dst mtime: %s", err) FsDebug(dst, "Failed to read dst mtime: %s", err)
} else if !dstModTime.Equal(srcModTime) { } else if !dstModTime.Equal(srcModTime) {
src.Debugf("Modification times differ") FsDebug(src, "Modification times differ")
} else { } else {
src.Debugf("Size and modification time the same") FsDebug(src, "Size and modification time the same")
return true return true
} }
} }
@ -98,18 +116,18 @@ func Equal(src, dst FsObject) bool {
// check the MD5SUM // check the MD5SUM
srcMd5, err := src.Md5sum() srcMd5, err := src.Md5sum()
if err != nil { if err != nil {
src.Debugf("Failed to calculate src md5: %s", err) FsDebug(src, "Failed to calculate src md5: %s", err)
return false return false
} }
dstMd5, err := dst.Md5sum() dstMd5, err := dst.Md5sum()
if err != nil { if err != nil {
dst.Debugf("Failed to calculate dst md5: %s", err) FsDebug(dst, "Failed to calculate dst md5: %s", err)
return false return false
} }
// fs.Debugf("Src MD5 %s", srcMd5) // FsDebug("Src MD5 %s", srcMd5)
// fs.Debugf("Dst MD5 %s", obj.Hash) // FsDebug("Dst MD5 %s", obj.Hash)
if srcMd5 != dstMd5 { if srcMd5 != dstMd5 {
src.Debugf("Md5sums differ") FsDebug(src, "Md5sums differ")
return false return false
} }
@ -117,6 +135,6 @@ func Equal(src, dst FsObject) bool {
// mtime of the dst object here // mtime of the dst object here
dst.SetModTime(srcModTime) dst.SetModTime(srcModTime)
src.Debugf("Size and MD5SUM of src and dst objects identical") FsDebug(src, "Size and MD5SUM of src and dst objects identical")
return true return true
} }

View File

@ -33,6 +33,11 @@ func NewFsLocal(root string) (*FsLocal, error) {
return f, nil return f, nil
} }
// String converts this FsLocal to a string
func (f *FsLocal) String() string {
return fmt.Sprintf("Local file system at %s", f.root)
}
// Return an FsObject from a path // Return an FsObject from a path
// //
// May return nil if an error occurred // May return nil if an error occurred
@ -109,13 +114,13 @@ func (f *FsLocal) Put(src FsObject) {
dir := path.Dir(dstPath) dir := path.Dir(dstPath)
err := os.MkdirAll(dir, 0770) err := os.MkdirAll(dir, 0770)
if err != nil { if err != nil {
fs.Debugf("Couldn't make directory: %s", err) FsLog(fs, "Couldn't make directory: %s", err)
return return
} }
out, err := os.Create(dstPath) out, err := os.Create(dstPath)
if err != nil { if err != nil {
fs.Debugf("Failed to open: %s", err) FsLog(fs, "Failed to open: %s", err)
return return
} }
@ -123,31 +128,31 @@ func (f *FsLocal) Put(src FsObject) {
defer func() { defer func() {
checkClose(out, &err) checkClose(out, &err)
if err != nil { if err != nil {
fs.Debugf("Removing failed download") FsDebug(fs, "Removing failed download")
removeErr := os.Remove(dstPath) removeErr := os.Remove(dstPath)
if removeErr != nil { if removeErr != nil {
fs.Debugf("Failed to remove failed download: %s", err) FsLog(fs, "Failed to remove failed download: %s", err)
} }
} }
}() }()
in, err := src.Open() in, err := src.Open()
if err != nil { if err != nil {
fs.Debugf("Failed to open: %s", err) FsLog(fs, "Failed to open: %s", err)
return return
} }
defer checkClose(in, &err) defer checkClose(in, &err)
_, err = io.Copy(out, in) _, err = io.Copy(out, in)
if err != nil { if err != nil {
fs.Debugf("Failed to download: %s", err) FsLog(fs, "Failed to download: %s", err)
return return
} }
// Set the mtime // Set the mtime
modTime, err := src.ModTime() modTime, err := src.ModTime()
if err != nil { if err != nil {
fs.Debugf("Failed to read mtime from object: %s", err) FsDebug(fs, "Failed to read mtime from object: %s", err)
} else { } else {
fs.SetModTime(modTime) fs.SetModTime(modTime)
} }
@ -172,24 +177,18 @@ func (fs *FsObjectLocal) Remote() string {
return fs.remote return fs.remote
} }
// Write debuging output for this FsObject
func (fs *FsObjectLocal) Debugf(text string, args ...interface{}) {
out := fmt.Sprintf(text, args...)
log.Printf("%s: %s", fs.remote, out)
}
// Md5sum calculates the Md5sum of a file returning a lowercase hex string // Md5sum calculates the Md5sum of a file returning a lowercase hex string
func (fs *FsObjectLocal) Md5sum() (string, error) { func (fs *FsObjectLocal) Md5sum() (string, error) {
in, err := os.Open(fs.path) in, err := os.Open(fs.path)
if err != nil { if err != nil {
fs.Debugf("Failed to open: %s", err) FsLog(fs, "Failed to open: %s", err)
return "", err return "", err
} }
defer in.Close() // FIXME ignoring error defer in.Close() // FIXME ignoring error
hash := md5.New() hash := md5.New()
_, err = io.Copy(hash, in) _, err = io.Copy(hash, in)
if err != nil { if err != nil {
fs.Debugf("Failed to read: %s", err) FsLog(fs, "Failed to read: %s", err)
return "", err return "", err
} }
return fmt.Sprintf("%x", hash.Sum(nil)), nil return fmt.Sprintf("%x", hash.Sum(nil)), nil
@ -209,7 +208,7 @@ func (fs *FsObjectLocal) ModTime() (modTime time.Time, err error) {
func (fs *FsObjectLocal) SetModTime(modTime time.Time) { func (fs *FsObjectLocal) SetModTime(modTime time.Time) {
err := Chtimes(fs.path, modTime, modTime) err := Chtimes(fs.path, modTime, modTime)
if err != nil { if err != nil {
fs.Debugf("Failed to set mtime on file: %s", err) FsDebug(fs, "Failed to set mtime on file: %s", err)
} }
} }
@ -217,11 +216,10 @@ func (fs *FsObjectLocal) SetModTime(modTime time.Time) {
func (fs *FsObjectLocal) Storable() bool { func (fs *FsObjectLocal) Storable() bool {
mode := fs.info.Mode() mode := fs.info.Mode()
if mode&(os.ModeSymlink|os.ModeNamedPipe|os.ModeSocket|os.ModeDevice) != 0 { if mode&(os.ModeSymlink|os.ModeNamedPipe|os.ModeSocket|os.ModeDevice) != 0 {
fs.Debugf("Can't transfer non file/directory") FsDebug(fs, "Can't transfer non file/directory")
return false return false
} else if mode&os.ModeDir != 0 { } else if mode&os.ModeDir != 0 {
// Debug? FsDebug(fs, "FIXME Skipping directory")
fs.Debugf("FIXME Skipping directory")
return false return false
} }
return true return true

View File

@ -41,6 +41,11 @@ var (
apiKey = flag.String("key", os.Getenv("ST_KEY"), "API key (password). Defaults to environment var ST_KEY.") apiKey = flag.String("key", os.Getenv("ST_KEY"), "API key (password). Defaults to environment var ST_KEY.")
) )
// String converts this FsSwift to a string
func (f *FsSwift) String() string {
return fmt.Sprintf("Swift container %s", f.container)
}
// Pattern to match a swift url // Pattern to match a swift url
var swiftMatch = regexp.MustCompile(`^([^/:]+):(.*)$`) var swiftMatch = regexp.MustCompile(`^([^/:]+):(.*)$`)
@ -125,7 +130,7 @@ func (f *FsSwift) NewFsObjectWithInfo(remote string, info *swift.Object) FsObjec
} else { } else {
err := fs.readMetaData() // reads info and meta, returning an error err := fs.readMetaData() // reads info and meta, returning an error
if err != nil { if err != nil {
// logged already fs.Debugf("Failed to read info: %s", err) // logged already FsDebug("Failed to read info: %s", err)
return nil return nil
} }
} }
@ -171,7 +176,7 @@ func (f *FsSwift) Put(src FsObject) {
// FIXME content type // FIXME content type
in, err := src.Open() in, err := src.Open()
if err != nil { if err != nil {
fs.Debugf("Failed to open: %s", err) FsLog(fs, "Failed to open: %s", err)
return return
} }
defer in.Close() defer in.Close()
@ -180,17 +185,17 @@ func (f *FsSwift) Put(src FsObject) {
m := swift.Metadata{} m := swift.Metadata{}
modTime, err := src.ModTime() modTime, err := src.ModTime()
if err != nil { if err != nil {
fs.Debugf("Failed to read mtime from object: %s", err) FsDebug(fs, "Failed to read mtime from object: %s", err)
} else { } else {
m.SetModTime(modTime) m.SetModTime(modTime)
} }
_, err = fs.swift.c.ObjectPut(fs.swift.container, fs.remote, in, true, "", "", m.ObjectHeaders()) _, err = fs.swift.c.ObjectPut(fs.swift.container, fs.remote, in, true, "", "", m.ObjectHeaders())
if err != nil { if err != nil {
fs.Debugf("Failed to upload: %s", err) FsLog(fs, "Failed to upload: %s", err)
return return
} }
fs.Debugf("Uploaded") FsDebug(fs, "Uploaded")
} }
// Mkdir creates the container if it doesn't exist // Mkdir creates the container if it doesn't exist
@ -212,12 +217,6 @@ func (fs *FsObjectSwift) Remote() string {
return fs.remote return fs.remote
} }
// Write debuging output for this FsObject
func (fs *FsObjectSwift) Debugf(text string, args ...interface{}) {
out := fmt.Sprintf(text, args...)
log.Printf("%s: %s", fs.remote, out)
}
// Md5sum returns the Md5sum of an object returning a lowercase hex string // Md5sum returns the Md5sum of an object returning a lowercase hex string
func (fs *FsObjectSwift) Md5sum() (string, error) { func (fs *FsObjectSwift) Md5sum() (string, error) {
return strings.ToLower(fs.info.Hash), nil return strings.ToLower(fs.info.Hash), nil
@ -237,7 +236,7 @@ func (fs *FsObjectSwift) readMetaData() (err error) {
} }
info, h, err := fs.swift.c.Object(fs.swift.container, fs.remote) info, h, err := fs.swift.c.Object(fs.swift.container, fs.remote)
if err != nil { if err != nil {
fs.Debugf("Failed to read info: %s", err) FsLog(fs, "Failed to read info: %s", err)
return err return err
} }
meta := h.ObjectMetadata() meta := h.ObjectMetadata()
@ -250,12 +249,12 @@ func (fs *FsObjectSwift) readMetaData() (err error) {
func (fs *FsObjectSwift) ModTime() (modTime time.Time, err error) { func (fs *FsObjectSwift) ModTime() (modTime time.Time, err error) {
err = fs.readMetaData() err = fs.readMetaData()
if err != nil { if err != nil {
fs.Debugf("Failed to read metadata: %s", err) FsLog(fs, "Failed to read metadata: %s", err)
return return
} }
modTime, err = fs.meta.GetModTime() modTime, err = fs.meta.GetModTime()
if err != nil { if err != nil {
fs.Debugf("Failed to read mtime from object: %s", err) FsLog(fs, "Failed to read mtime from object: %s", err)
return return
} }
return return
@ -265,13 +264,13 @@ func (fs *FsObjectSwift) ModTime() (modTime time.Time, err error) {
func (fs *FsObjectSwift) SetModTime(modTime time.Time) { func (fs *FsObjectSwift) SetModTime(modTime time.Time) {
err := fs.readMetaData() err := fs.readMetaData()
if err != nil { if err != nil {
fs.Debugf("Failed to read metadata: %s", err) FsLog(fs, "Failed to read metadata: %s", err)
return return
} }
fs.meta.SetModTime(modTime) fs.meta.SetModTime(modTime)
err = fs.swift.c.ObjectUpdate(fs.swift.container, fs.remote, fs.meta.ObjectHeaders()) err = fs.swift.c.ObjectUpdate(fs.swift.container, fs.remote, fs.meta.ObjectHeaders())
if err != nil { if err != nil {
fs.Debugf("Failed to update remote mtime: %s", err) FsLog(fs, "Failed to update remote mtime: %s", err)
} }
} }

View File

@ -1,17 +1,18 @@
Todo Todo
* Add sync command (like rsync with delete)
* Check logging in various parts * Check logging in various parts
* Make logging controllable with flags * Make logging controllable with flags (mostly done)
* progress meter would be nice! Do this by wrapping the Reader with a progress bar * progress meter would be nice! Do this by wrapping the Reader with a progress bar
* Do bandwidth limit by wrapping the Reader too * Do bandwidth limit by wrapping the Reader too
* Maybe using https://jra-go.googlecode.com/hg/linkio/ which will work for multiple * Maybe using https://jra-go.googlecode.com/hg/linkio/ which will work for multiple
uploads or downloads. uploads or downloads.
* code.google.com/p/mxk/go1/flowcontrol - only does one flow at once * code.google.com/p/mxk/go1/flowcontrol - only does one flow at once
* Or maybe put into swift library. * Or maybe put into swift library.
* Make swift timeouts be settable with command line parameters * -timeout: Make all timeouts be settable with command line parameters
* Check the locking in swift module! * Check the locking in swift module!
* Windows paths? Do we need to translate / and \? * Windows paths? Do we need to translate / and \?
* Make a fs.Errorf and count errors and log them at a different level * Make a fs.Errorf and count errors and log them at a different level
* add -modify-window flag - fs should keep knowledge of resolution
* add check command to compare local MD5SUMs with remote
Ideas Ideas
* optimise remote copy container to another container using remote * optimise remote copy container to another container using remote

View File

@ -21,6 +21,7 @@ var (
snet = flag.Bool("snet", false, "Use internal service network") // FIXME not implemented snet = flag.Bool("snet", false, "Use internal service network") // FIXME not implemented
verbose = flag.Bool("verbose", false, "Print lots more stuff") verbose = flag.Bool("verbose", false, "Print lots more stuff")
quiet = flag.Bool("quiet", false, "Print as little stuff as possible") quiet = flag.Bool("quiet", false, "Print as little stuff as possible")
dry_run = flag.Bool("dry-run", false, "Do a trial run with no permanent changes")
checkers = flag.Int("checkers", 8, "Number of checkers to run in parallel.") checkers = flag.Int("checkers", 8, "Number of checkers to run in parallel.")
transfers = flag.Int("transfers", 4, "Number of file transfers to run in parallel.") transfers = flag.Int("transfers", 4, "Number of file transfers to run in parallel.")
) )
@ -33,7 +34,7 @@ func Checker(in, out FsObjectsChan, fdst Fs, wg *sync.WaitGroup) {
for src := range in { for src := range in {
dst := fdst.NewFsObject(src.Remote()) dst := fdst.NewFsObject(src.Remote())
if dst == nil { if dst == nil {
src.Debugf("Couldn't find local file - download") FsDebug(src, "Couldn't find local file - download")
out <- src out <- src
continue continue
} }
@ -44,7 +45,7 @@ func Checker(in, out FsObjectsChan, fdst Fs, wg *sync.WaitGroup) {
} }
// Check to see if changed or not // Check to see if changed or not
if Equal(src, dst) { if Equal(src, dst) {
src.Debugf("Unchanged skipping") FsDebug(src, "Unchanged skipping")
continue continue
} }
out <- src out <- src
@ -88,9 +89,87 @@ func Copy(fdst, fsrc Fs) {
copierWg.Wait() copierWg.Wait()
} }
// Copy~s from source to dest // Delete all the files passed in the channel
func copy_(fdst, fsrc Fs) { func DeleteFiles(to_be_deleted FsObjectsChan) {
Copy(fdst, fsrc) var wg sync.WaitGroup
wg.Add(*transfers)
for i := 0; i < *transfers; i++ {
go func() {
defer wg.Done()
for dst := range to_be_deleted {
if *dry_run {
FsDebug(dst, "Not deleting as -dry-run")
} else {
err := dst.Remove()
if err != nil {
FsLog(dst, "Couldn't delete: %s", err)
} else {
FsDebug(dst, "Deleted")
}
}
}
}()
}
log.Printf("Waiting for deletions to finish")
wg.Wait()
}
// Syncs fsrc into fdst
func Sync(fdst, fsrc Fs) {
err := fdst.Mkdir()
if err != nil {
log.Fatal("Failed to make destination")
}
// Read the destination files first
// FIXME could do this in parallel and make it use less memory
delFiles := make(map[string]FsObject)
for dstFile := range fdst.List() {
delFiles[dstFile.Remote()] = dstFile
}
// Read source files checking them off against dest files
to_be_checked := make(FsObjectsChan, *transfers)
go func() {
for srcFile := range fsrc.List() {
delete(delFiles, srcFile.Remote())
to_be_checked <- srcFile
}
close(to_be_checked)
}()
to_be_uploaded := make(FsObjectsChan, *transfers)
var checkerWg sync.WaitGroup
checkerWg.Add(*checkers)
for i := 0; i < *checkers; i++ {
go Checker(to_be_checked, to_be_uploaded, fdst, &checkerWg)
}
var copierWg sync.WaitGroup
copierWg.Add(*transfers)
for i := 0; i < *transfers; i++ {
go Copier(to_be_uploaded, fdst, &copierWg)
}
log.Printf("Waiting for checks to finish")
checkerWg.Wait()
close(to_be_uploaded)
log.Printf("Waiting for transfers to finish")
copierWg.Wait()
// FIXME don't delete if IO errors
// Delete the spare files
toDelete := make(FsObjectsChan, *transfers)
go func() {
for _, fs := range delFiles {
toDelete <- fs
}
close(toDelete)
}()
DeleteFiles(toDelete)
} }
// List the Fs to stdout // List the Fs to stdout
@ -129,37 +208,21 @@ func mkdir(fdst, fsrc Fs) {
// Removes a container but not if not empty // Removes a container but not if not empty
func rmdir(fdst, fsrc Fs) { func rmdir(fdst, fsrc Fs) {
if *dry_run {
log.Printf("Not deleting %s as -dry-run", fdst)
} else {
err := fdst.Rmdir() err := fdst.Rmdir()
if err != nil { if err != nil {
log.Fatalf("Rmdir failed: %s", err) log.Fatalf("Rmdir failed: %s", err)
} }
}
} }
// Removes a container and all of its contents // Removes a container and all of its contents
// //
// FIXME doesn't delete local directories // FIXME doesn't delete local directories
func purge(fdst, fsrc Fs) { func purge(fdst, fsrc Fs) {
to_be_deleted := fdst.List() DeleteFiles(fdst.List())
var wg sync.WaitGroup
wg.Add(*transfers)
for i := 0; i < *transfers; i++ {
go func() {
defer wg.Done()
for dst := range to_be_deleted {
err := dst.Remove()
if err != nil {
log.Printf("%s: Couldn't delete: %s\n", dst.Remote(), err)
} else {
log.Printf("%s: Deleted\n", dst.Remote())
}
}
}()
}
log.Printf("Waiting for deletions to finish")
wg.Wait()
log.Printf("Deleting path") log.Printf("Deleting path")
rmdir(fdst, fsrc) rmdir(fdst, fsrc)
} }
@ -194,7 +257,21 @@ var Commands = []Command{
MD5SUM. Doesn't delete files from the destination. MD5SUM. Doesn't delete files from the destination.
`, `,
copy_, Copy,
2, 2,
},
{
"sync",
`<source> <destination>
Sync the source to the destination. Doesn't transfer
unchanged files, testing first by modification time then by
MD5SUM. Deletes any files that exist in source that don't
exist in destination. Since this can cause data loss, test
first with the -dry-run flag.
`,
Sync,
2, 2, 2, 2,
}, },
{ {