Implement stats counting and reporting and return errors in return code

This commit is contained in:
Nick Craig-Wood 2013-01-03 22:50:00 +00:00
parent 12015b0007
commit f7197c30d7
6 changed files with 268 additions and 37 deletions

177
accounting.go Normal file
View File

@ -0,0 +1,177 @@
// Accounting and limiting reader
package main
import (
"bytes"
"fmt"
"io"
"log"
"strings"
"sync"
"time"
)
// Globals
var (
stats = NewStats()
)
// Stringset holds some strings
type StringSet map[string]bool
// Strings returns all the strings in the StringSet
func (ss StringSet) Strings() []string {
strings := make([]string, 0, len(ss))
for k := range ss {
strings = append(strings, k)
}
return strings
}
// String returns all the strings in the StringSet joined by comma
func (ss StringSet) String() string {
return strings.Join(ss.Strings(), ", ")
}
// Stats limits and accounts all transfers
type Stats struct {
lock sync.RWMutex
bytes int64
errors int64
checks int64
checking StringSet
transfers int64
transferring StringSet
start time.Time
}
// NewStats cretates an initialised Stats
func NewStats() *Stats {
return &Stats{
checking: make(StringSet, *checkers),
transferring: make(StringSet, *transfers),
start: time.Now(),
}
}
// String convert the Stats to a string for printing
func (s *Stats) String() string {
s.lock.RLock()
defer s.lock.RUnlock()
dt := time.Now().Sub(stats.start)
dt_seconds := dt.Seconds()
speed := 0.0
if dt > 0 {
speed = float64(stats.bytes) / 1024 / dt_seconds
}
buf := &bytes.Buffer{}
fmt.Fprintf(buf, `
Transferred: %10d Bytes (%7.2f kByte/s)
Errors: %10d
Checks: %10d
Transferred: %10d
Elapsed time: %v
`,
stats.bytes, speed,
stats.errors,
stats.checks,
stats.transfers,
dt)
if len(s.checking) > 0 {
fmt.Fprintf(buf, "Checking: %s\n", s.checking)
}
if len(s.transferring) > 0 {
fmt.Fprintf(buf, "Transferring: %s\n", s.transferring)
}
return buf.String()
}
// Log outputs the Stats to the log
func (s *Stats) Log() {
log.Printf("%v\n", stats)
}
// Bytes updates the stats for bytes bytes
func (s *Stats) Bytes(bytes int64) {
s.lock.Lock()
defer s.lock.Unlock()
s.bytes += bytes
}
// Errors updates the stats for errors
func (s *Stats) Errors(errors int64) {
s.lock.Lock()
defer s.lock.Unlock()
s.errors += errors
}
// Error adds a single error into the stats
func (s *Stats) Error() {
s.lock.Lock()
defer s.lock.Unlock()
s.errors += 1
}
// Checking adds a check into the stats
func (s *Stats) Checking(fs FsObject) {
s.lock.Lock()
defer s.lock.Unlock()
s.checking[fs.Remote()] = true
}
// DoneChecking removes a check from the stats
func (s *Stats) DoneChecking(fs FsObject) {
s.lock.Lock()
defer s.lock.Unlock()
delete(s.checking, fs.Remote())
s.checks += 1
}
// Transferring adds a transfer into the stats
func (s *Stats) Transferring(fs FsObject) {
s.lock.Lock()
defer s.lock.Unlock()
s.transferring[fs.Remote()] = true
}
// DoneTransferring removes a transfer from the stats
func (s *Stats) DoneTransferring(fs FsObject) {
s.lock.Lock()
defer s.lock.Unlock()
delete(s.transferring, fs.Remote())
s.transfers += 1
}
// Account limits and accounts for one transfer
type Account struct {
in io.ReadCloser
bytes int64
}
// NewAccount makes a Account reader
func NewAccount(in io.ReadCloser) *Account {
return &Account{
in: in,
}
}
// Read bytes from the object - see io.Reader
func (file *Account) Read(p []byte) (n int, err error) {
n, err = file.in.Read(p)
file.bytes += int64(n)
stats.Bytes(int64(n))
if err == io.EOF {
// FIXME Do something?
}
return
}
// Close the object
func (file *Account) Close() error {
// FIXME do something?
return file.in.Close()
}
// Check it satisfies the interface
var _ io.ReadCloser = &Account{}

2
fs.go
View File

@ -82,11 +82,13 @@ func checkClose(c io.Closer, err *error) {
func CheckMd5sums(src, dst FsObject) (bool, error) {
srcMd5, err := src.Md5sum()
if err != nil {
stats.Error()
FsLog(src, "Failed to calculate src md5: %s", err)
return false, err
}
dstMd5, err := dst.Md5sum()
if err != nil {
stats.Error()
FsLog(dst, "Failed to calculate dst md5: %s", err)
return false, err
}

View File

@ -49,7 +49,7 @@ func (f *FsLocal) NewFsObjectWithInfo(remote string, info os.FileInfo) FsObject
} else {
err := fs.lstat()
if err != nil {
log.Printf("Failed to stat %s: %s", path, err)
FsDebug(fs, "Failed to stat %s: %s", path, err)
return nil
}
}
@ -71,10 +71,12 @@ func (f *FsLocal) List() FsObjectsChan {
go func() {
err := filepath.Walk(f.root, func(path string, fi os.FileInfo, err error) error {
if err != nil {
stats.Error()
log.Printf("Failed to open directory: %s: %s", path, err)
} else {
remote, err := filepath.Rel(f.root, path)
if err != nil {
stats.Error()
log.Printf("Failed to get relative path %s: %s", path, err)
return nil
}
@ -91,6 +93,7 @@ func (f *FsLocal) List() FsObjectsChan {
return nil
})
if err != nil {
stats.Error()
log.Printf("Failed to open directory: %s: %s", f.root, err)
}
close(out)
@ -107,19 +110,21 @@ func (f *FsLocal) List() FsObjectsChan {
func (f *FsLocal) Put(src FsObject) {
dstRemote := src.Remote()
dstPath := filepath.Join(f.root, dstRemote)
log.Printf("Download %s to %s", dstRemote, dstPath)
// Temporary FsObject under construction
fs := &FsObjectLocal{remote: dstRemote, path: dstPath}
FsDebug(fs, "Download %s to %s", dstRemote, dstPath)
dir := path.Dir(dstPath)
err := os.MkdirAll(dir, 0770)
if err != nil {
stats.Error()
FsLog(fs, "Couldn't make directory: %s", err)
return
}
out, err := os.Create(dstPath)
if err != nil {
stats.Error()
FsLog(fs, "Failed to open: %s", err)
return
}
@ -131,20 +136,24 @@ func (f *FsLocal) Put(src FsObject) {
FsDebug(fs, "Removing failed download")
removeErr := os.Remove(dstPath)
if removeErr != nil {
FsLog(fs, "Failed to remove failed download: %s", err)
stats.Error()
FsLog(fs, "Failed to remove failed download: %s", removeErr)
}
}
}()
in, err := src.Open()
in0, err := src.Open()
if err != nil {
stats.Error()
FsLog(fs, "Failed to open: %s", err)
return
}
in := NewAccount(in0) // account the transfer
defer checkClose(in, &err)
_, err = io.Copy(out, in)
if err != nil {
stats.Error()
FsLog(fs, "Failed to download: %s", err)
return
}
@ -176,6 +185,7 @@ func (fs *FsObjectLocal) Remote() string {
func (fs *FsObjectLocal) Md5sum() (string, error) {
in, err := os.Open(fs.path)
if err != nil {
stats.Error()
FsLog(fs, "Failed to open: %s", err)
return "", err
}
@ -183,6 +193,7 @@ func (fs *FsObjectLocal) Md5sum() (string, error) {
hash := md5.New()
_, err = io.Copy(hash, in)
if err != nil {
stats.Error()
FsLog(fs, "Failed to read: %s", err)
return "", err
}

View File

@ -105,10 +105,12 @@ func NewFsSwift(path string) (*FsSwift, error) {
func SwiftContainers() {
c, err := swiftConnection()
if err != nil {
stats.Error()
log.Fatalf("Couldn't connect: %s", err)
}
containers, err := c.ContainersAll(nil)
if err != nil {
stats.Error()
log.Fatalf("Couldn't list containers: %s", err)
}
for _, container := range containers {
@ -162,6 +164,7 @@ func (f *FsSwift) List() FsObjectsChan {
return objects, err
})
if err != nil {
stats.Error()
log.Printf("Couldn't read container %q: %s", f.container, err)
}
close(out)
@ -174,11 +177,13 @@ func (f *FsSwift) Put(src FsObject) {
// Temporary FsObject under construction
fs := &FsObjectSwift{swift: f, remote: src.Remote()}
// FIXME content type
in, err := src.Open()
in0, err := src.Open()
if err != nil {
stats.Error()
FsLog(fs, "Failed to open: %s", err)
return
}
in := NewAccount(in0) // account the transfer
defer in.Close()
// Set the mtime
@ -187,7 +192,14 @@ func (f *FsSwift) Put(src FsObject) {
_, err = fs.swift.c.ObjectPut(fs.swift.container, fs.remote, in, true, "", "", m.ObjectHeaders())
if err != nil {
stats.Error()
FsLog(fs, "Failed to upload: %s", err)
FsDebug(fs, "Removing failed upload")
removeErr := fs.Remove()
if removeErr != nil {
stats.Error()
FsLog(fs, "Failed to remove failed download: %s", removeErr)
}
return
}
FsDebug(fs, "Uploaded")
@ -231,7 +243,7 @@ func (fs *FsObjectSwift) readMetaData() (err error) {
}
info, h, err := fs.swift.c.Object(fs.swift.container, fs.remote)
if err != nil {
FsLog(fs, "Failed to read info: %s", err)
FsDebug(fs, "Failed to read info: %s", err)
return err
}
meta := h.ObjectMetadata()
@ -263,12 +275,14 @@ func (fs *FsObjectSwift) ModTime() time.Time {
func (fs *FsObjectSwift) SetModTime(modTime time.Time) {
err := fs.readMetaData()
if err != nil {
stats.Error()
FsLog(fs, "Failed to read metadata: %s", err)
return
}
fs.meta.SetModTime(modTime)
err = fs.swift.c.ObjectUpdate(fs.swift.container, fs.remote, fs.meta.ObjectHeaders())
if err != nil {
stats.Error()
FsLog(fs, "Failed to update remote mtime: %s", err)
}
}

View File

@ -3,14 +3,12 @@ Todo
* Ignoring the pseudo directories
* if object.PseudoDirectory {
* fmt.Printf("%9s %19s %s\n", "Directory", "-", fs.Remote())
* Check logging in various parts
* Make Account wrapper
* limit bandwidth for a pool of all individual connectinos
* do timeouts by setting a limit, seeing whether io has happened
and resetting it if it has
* make Account do progress meter
* Make logging controllable with flags (mostly done)
* progress meter would be nice! Do this by wrapping the Reader with a progress bar
* Do bandwidth limit by wrapping the Reader too
* Maybe using https://jra-go.googlecode.com/hg/linkio/ which will work for multiple
uploads or downloads.
* code.google.com/p/mxk/go1/flowcontrol - only does one flow at once
* Or maybe put into swift library.
* -timeout: Make all timeouts be settable with command line parameters
* Check the locking in swift module!
* Windows paths? Do we need to translate / and \?
@ -21,27 +19,14 @@ Ideas
* optimise remote copy container to another container using remote
copy if local is same as remote
* Allow subpaths container:/sub/path
* stats
* look at auth from env in s3 module - add to swift?
Make a wrapper in connection which
* measures bandwidth and reports it
* limits bandwidth using Reader and Writer
* for a pool of all individual connectinos
* does timeouts by setting a limit, seeing whether io has happened
and resetting it if it has
Need to make directory objects otherwise can't upload an empty directory
* Or could upload empty directories only?
* Can't purge a local filesystem because it leaves the directories behind
s3
--
Can maybe set last modified?
https://forums.aws.amazon.com/message.jspa?messageID=214062
Otherwise can set metadata
Returns etag and last modified in bucket list
* Can maybe set last modified?
* https://forums.aws.amazon.com/message.jspa?messageID=214062
* Otherwise can set metadata
* Returns etag and last modified in bucket list

View File

@ -12,6 +12,7 @@ import (
"runtime/pprof"
"strings"
"sync"
"time"
)
// Globals
@ -24,6 +25,7 @@ var (
dry_run = flag.Bool("dry-run", false, "Do a trial run with no permanent changes")
checkers = flag.Int("checkers", 8, "Number of checkers to run in parallel.")
transfers = flag.Int("transfers", 4, "Number of file transfers to run in parallel.")
statsInterval = flag.Duration("stats", time.Minute*1, "Interval to print stats")
)
// Read FsObjects~s on in send to out if they need uploading
@ -31,8 +33,11 @@ var (
// FIXME potentially doing lots of MD5SUMS at once
func Checker(in, out FsObjectsChan, fdst Fs, wg *sync.WaitGroup) {
defer wg.Done()
for src := range in {
stats.Checking(src)
dst := fdst.NewFsObject(src.Remote())
stats.DoneChecking(src)
if dst == nil {
FsDebug(src, "Couldn't find local file - download")
out <- src
@ -56,7 +61,9 @@ func Checker(in, out FsObjectsChan, fdst Fs, wg *sync.WaitGroup) {
func Copier(in FsObjectsChan, fdst Fs, wg *sync.WaitGroup) {
defer wg.Done()
for src := range in {
stats.Transferring(src)
fdst.Put(src)
stats.DoneTransferring(src)
}
}
@ -64,6 +71,7 @@ func Copier(in FsObjectsChan, fdst Fs, wg *sync.WaitGroup) {
func Copy(fdst, fsrc Fs) {
err := fdst.Mkdir()
if err != nil {
stats.Error()
log.Fatal("Failed to make destination")
}
@ -100,8 +108,11 @@ func DeleteFiles(to_be_deleted FsObjectsChan) {
if *dry_run {
FsDebug(dst, "Not deleting as -dry-run")
} else {
stats.Checking(dst)
err := dst.Remove()
stats.DoneChecking(dst)
if err != nil {
stats.Error()
FsLog(dst, "Couldn't delete: %s", err)
} else {
FsDebug(dst, "Deleted")
@ -119,9 +130,12 @@ func DeleteFiles(to_be_deleted FsObjectsChan) {
func Sync(fdst, fsrc Fs) {
err := fdst.Mkdir()
if err != nil {
stats.Error()
log.Fatal("Failed to make destination")
}
log.Printf("Building file list")
// Read the destination files first
// FIXME could do this in parallel and make it use less memory
delFiles := make(map[string]FsObject)
@ -174,6 +188,8 @@ func Sync(fdst, fsrc Fs) {
// Checks the files in fsrc and fdst according to Size and MD5SUM
func Check(fdst, fsrc Fs) {
log.Printf("Building file list")
// Read the destination files first
// FIXME could do this in parallel and make it use less memory
dstFiles := make(map[string]FsObject)
@ -220,11 +236,14 @@ func Check(fdst, fsrc Fs) {
defer checkerWg.Done()
for check := range checks {
dst, src := check[0], check[1]
stats.Checking(src)
if src.Size() != dst.Size() {
stats.DoneChecking(src)
FsLog(src, "Sizes differ")
continue
}
same, err := CheckMd5sums(src, dst)
stats.DoneChecking(src)
if err != nil {
continue
}
@ -251,7 +270,9 @@ func List(f Fs) {
go func() {
defer wg.Done()
for fs := range in {
stats.Checking(fs)
modTime := fs.ModTime()
stats.DoneChecking(fs)
fmt.Printf("%9d %19s %s\n", fs.Size(), modTime.Format("2006-01-02 15:04:05"), fs.Remote())
}
}()
@ -272,6 +293,7 @@ func list(fdst, fsrc Fs) {
func mkdir(fdst, fsrc Fs) {
err := fdst.Mkdir()
if err != nil {
stats.Error()
log.Fatalf("Mkdir failed: %s", err)
}
}
@ -283,6 +305,7 @@ func rmdir(fdst, fsrc Fs) {
} else {
err := fdst.Rmdir()
if err != nil {
stats.Error()
log.Fatalf("Rmdir failed: %s", err)
}
}
@ -441,6 +464,7 @@ func main() {
if *cpuprofile != "" {
f, err := os.Create(*cpuprofile)
if err != nil {
stats.Error()
log.Fatal(err)
}
pprof.StartCPUProfile(f)
@ -464,12 +488,14 @@ func main() {
break
} else if strings.HasPrefix(command.name, cmd) {
if found != nil {
stats.Error()
log.Fatalf("Not unique - matches multiple commands %q", cmd)
}
found = command
}
}
if found == nil {
stats.Error()
log.Fatalf("Unknown command %q", cmd)
}
found.checkArgs(args)
@ -480,20 +506,36 @@ func main() {
if len(args) >= 1 {
fdst, err = NewFs(args[0])
if err != nil {
stats.Error()
log.Fatal("Failed to create file system: ", err)
}
}
if len(args) >= 2 {
fsrc, err = NewFs(args[1])
if err != nil {
stats.Error()
log.Fatal("Failed to create destination file system: ", err)
}
fsrc, fdst = fdst, fsrc
}
// Print the stats every statsInterval
go func() {
ch := time.Tick(*statsInterval)
for {
<-ch
stats.Log()
}
}()
// Run the actual command
if found.run != nil {
found.run(fdst, fsrc)
fmt.Println(stats)
if stats.errors > 0 {
os.Exit(1)
}
os.Exit(0)
} else {
syntaxError()
}