mirror of
https://github.com/rclone/rclone.git
synced 2024-11-25 09:41:44 +08:00
b2: Include custom upload headers in large file info - fixes #7744
This commit is contained in:
parent
495a5759d3
commit
56caab2033
|
@ -306,6 +306,7 @@ type Object struct {
|
|||
sha1 string // SHA-1 hash if known
|
||||
size int64 // Size of the object
|
||||
mimeType string // Content-Type of the object
|
||||
meta map[string]string // The object metadata if known - may be nil - with lower case keys
|
||||
}
|
||||
|
||||
// ------------------------------------------------------------
|
||||
|
@ -1593,7 +1594,14 @@ func (o *Object) decodeMetaDataRaw(ID, SHA1 string, Size int64, UploadTimestamp
|
|||
o.size = Size
|
||||
// Use the UploadTimestamp if can't get file info
|
||||
o.modTime = time.Time(UploadTimestamp)
|
||||
return o.parseTimeString(Info[timeKey])
|
||||
err = o.parseTimeString(Info[timeKey])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// For now, just set "mtime" in metadata
|
||||
o.meta = make(map[string]string, 1)
|
||||
o.meta["mtime"] = o.modTime.Format(time.RFC3339Nano)
|
||||
return nil
|
||||
}
|
||||
|
||||
// decodeMetaData sets the metadata in the object from an api.File
|
||||
|
@ -1695,6 +1703,16 @@ func timeString(modTime time.Time) string {
|
|||
return strconv.FormatInt(modTime.UnixNano()/1e6, 10)
|
||||
}
|
||||
|
||||
// parseTimeStringHelper converts a decimal string number of milliseconds
|
||||
// elapsed since January 1, 1970 UTC into a time.Time
|
||||
func parseTimeStringHelper(timeString string) (time.Time, error) {
|
||||
unixMilliseconds, err := strconv.ParseInt(timeString, 10, 64)
|
||||
if err != nil {
|
||||
return time.Time{}, err
|
||||
}
|
||||
return time.Unix(unixMilliseconds/1e3, (unixMilliseconds%1e3)*1e6).UTC(), nil
|
||||
}
|
||||
|
||||
// parseTimeString converts a decimal string number of milliseconds
|
||||
// elapsed since January 1, 1970 UTC into a time.Time and stores it in
|
||||
// the modTime variable.
|
||||
|
@ -1702,12 +1720,12 @@ func (o *Object) parseTimeString(timeString string) (err error) {
|
|||
if timeString == "" {
|
||||
return nil
|
||||
}
|
||||
unixMilliseconds, err := strconv.ParseInt(timeString, 10, 64)
|
||||
modTime, err := parseTimeStringHelper(timeString)
|
||||
if err != nil {
|
||||
fs.Debugf(o, "Failed to parse mod time string %q: %v", timeString, err)
|
||||
return nil
|
||||
}
|
||||
o.modTime = time.Unix(unixMilliseconds/1e3, (unixMilliseconds%1e3)*1e6).UTC()
|
||||
o.modTime = modTime
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -1861,6 +1879,14 @@ func (o *Object) getOrHead(ctx context.Context, method string, options []fs.Open
|
|||
ContentType: resp.Header.Get("Content-Type"),
|
||||
Info: Info,
|
||||
}
|
||||
|
||||
// Embryonic metadata support - just mtime
|
||||
o.meta = make(map[string]string, 1)
|
||||
modTime, err := parseTimeStringHelper(info.Info[timeKey])
|
||||
if err == nil {
|
||||
o.meta["mtime"] = modTime.Format(time.RFC3339Nano)
|
||||
}
|
||||
|
||||
// When reading files from B2 via cloudflare using
|
||||
// --b2-download-url cloudflare strips the Content-Length
|
||||
// headers (presumably so it can inject stuff) so use the old
|
||||
|
@ -1958,7 +1984,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
|
|||
|
||||
if err == nil {
|
||||
fs.Debugf(o, "File is big enough for chunked streaming")
|
||||
up, err := o.fs.newLargeUpload(ctx, o, in, src, o.fs.opt.ChunkSize, false, nil)
|
||||
up, err := o.fs.newLargeUpload(ctx, o, in, src, o.fs.opt.ChunkSize, false, nil, options...)
|
||||
if err != nil {
|
||||
o.fs.putRW(rw)
|
||||
return err
|
||||
|
@ -1990,7 +2016,10 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
|
|||
return o.decodeMetaDataFileInfo(up.info)
|
||||
}
|
||||
|
||||
modTime := src.ModTime(ctx)
|
||||
modTime, err := o.getModTime(ctx, src, options)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
calculatedSha1, _ := src.Hash(ctx, hash.SHA1)
|
||||
if calculatedSha1 == "" {
|
||||
|
@ -2095,6 +2124,36 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
|
|||
return o.decodeMetaDataFileInfo(&response)
|
||||
}
|
||||
|
||||
// Get modTime from the source; if --metadata is set, fetch the src metadata and get it from there.
|
||||
// When metadata support is added to b2, this method will need a more generic name
|
||||
func (o *Object) getModTime(ctx context.Context, src fs.ObjectInfo, options []fs.OpenOption) (time.Time, error) {
|
||||
modTime := src.ModTime(ctx)
|
||||
|
||||
// Fetch metadata if --metadata is in use
|
||||
meta, err := fs.GetMetadataOptions(ctx, o.fs, src, options)
|
||||
if err != nil {
|
||||
return time.Time{}, fmt.Errorf("failed to read metadata from source object: %w", err)
|
||||
}
|
||||
// merge metadata into request and user metadata
|
||||
for k, v := range meta {
|
||||
k = strings.ToLower(k)
|
||||
// For now, the only metadata we're concerned with is "mtime"
|
||||
switch k {
|
||||
case "mtime":
|
||||
// mtime in meta overrides source ModTime
|
||||
metaModTime, err := time.Parse(time.RFC3339Nano, v)
|
||||
if err != nil {
|
||||
fs.Debugf(o, "failed to parse metadata %s: %q: %v", k, v, err)
|
||||
} else {
|
||||
modTime = metaModTime
|
||||
}
|
||||
default:
|
||||
// Do nothing for now
|
||||
}
|
||||
}
|
||||
return modTime, nil
|
||||
}
|
||||
|
||||
// OpenChunkWriter returns the chunk size and a ChunkWriter
|
||||
//
|
||||
// Pass in the remote and the src object
|
||||
|
@ -2126,7 +2185,7 @@ func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectIn
|
|||
Concurrency: o.fs.opt.UploadConcurrency,
|
||||
//LeavePartsOnError: o.fs.opt.LeavePartsOnError,
|
||||
}
|
||||
up, err := f.newLargeUpload(ctx, o, nil, src, f.opt.ChunkSize, false, nil)
|
||||
up, err := f.newLargeUpload(ctx, o, nil, src, f.opt.ChunkSize, false, nil, options...)
|
||||
return info, up, err
|
||||
}
|
||||
|
||||
|
|
|
@ -184,16 +184,64 @@ func TestParseTimeString(t *testing.T) {
|
|||
|
||||
}
|
||||
|
||||
// This is adapted from the s3 equivalent.
|
||||
func (f *Fs) InternalTestMetadata(t *testing.T) {
|
||||
// Return a map of the headers in the options with keys stripped of the "x-bz-info-" prefix
|
||||
func OpenOptionToMetaData(options []fs.OpenOption) map[string]string {
|
||||
var headers = make(map[string]string)
|
||||
for _, option := range options {
|
||||
k, v := option.Header()
|
||||
k = strings.ToLower(k)
|
||||
if strings.HasPrefix(k, headerPrefix) {
|
||||
headers[k[len(headerPrefix):]] = v
|
||||
}
|
||||
}
|
||||
|
||||
return headers
|
||||
}
|
||||
|
||||
func (f *Fs) internalTestMetadata(t *testing.T, size string, uploadCutoff string, chunkSize string) {
|
||||
what := fmt.Sprintf("Size%s/UploadCutoff%s/ChunkSize%s", size, uploadCutoff, chunkSize)
|
||||
t.Run(what, func(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
original := random.String(1000)
|
||||
|
||||
ss := fs.SizeSuffix(0)
|
||||
err := ss.Set(size)
|
||||
require.NoError(t, err)
|
||||
original := random.String(int(ss))
|
||||
|
||||
contents := fstest.Gz(t, original)
|
||||
mimeType := "text/html"
|
||||
|
||||
if chunkSize != "" {
|
||||
ss := fs.SizeSuffix(0)
|
||||
err := ss.Set(chunkSize)
|
||||
require.NoError(t, err)
|
||||
_, err = f.SetUploadChunkSize(ss)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
if uploadCutoff != "" {
|
||||
ss := fs.SizeSuffix(0)
|
||||
err := ss.Set(uploadCutoff)
|
||||
require.NoError(t, err)
|
||||
_, err = f.SetUploadCutoff(ss)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
item := fstest.NewItem("test-metadata", contents, fstest.Time("2001-05-06T04:05:06.499Z"))
|
||||
btime := time.Now()
|
||||
obj := fstests.PutTestContentsMetadata(ctx, t, f, &item, contents, true, mimeType, nil)
|
||||
metadata := fs.Metadata{
|
||||
// Just mtime for now - limit to milliseconds since x-bz-info-src_last_modified_millis can't support any
|
||||
|
||||
"mtime": "2009-05-06T04:05:06.499Z",
|
||||
}
|
||||
|
||||
// Need to specify HTTP options with the header prefix since they are passed as-is
|
||||
options := []fs.OpenOption{
|
||||
&fs.HTTPOption{Key: "X-Bz-Info-a", Value: "1"},
|
||||
&fs.HTTPOption{Key: "X-Bz-Info-b", Value: "2"},
|
||||
}
|
||||
|
||||
obj := fstests.PutTestContentsMetadata(ctx, t, f, &item, true, contents, true, mimeType, metadata, options...)
|
||||
defer func() {
|
||||
assert.NoError(t, obj.Remove(ctx))
|
||||
}()
|
||||
|
@ -201,7 +249,19 @@ func (f *Fs) InternalTestMetadata(t *testing.T) {
|
|||
gotMetadata, err := o.getMetaData(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
// We currently have a limited amount of metadata to test with B2
|
||||
// X-Bz-Info-a & X-Bz-Info-b
|
||||
optMetadata := OpenOptionToMetaData(options)
|
||||
for k, v := range optMetadata {
|
||||
got := gotMetadata.Info[k]
|
||||
assert.Equal(t, v, got, k)
|
||||
}
|
||||
|
||||
// mtime
|
||||
for k, v := range metadata {
|
||||
got := o.meta[k]
|
||||
assert.Equal(t, v, got, k)
|
||||
}
|
||||
|
||||
assert.Equal(t, mimeType, gotMetadata.ContentType, "Content-Type")
|
||||
|
||||
// Modification time from the x-bz-info-src_last_modified_millis header
|
||||
|
@ -233,6 +293,15 @@ func (f *Fs) InternalTestMetadata(t *testing.T) {
|
|||
checkDownload(contents, int64(len(contents)), sha1Sum(t, contents))
|
||||
})
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func (f *Fs) InternalTestMetadata(t *testing.T) {
|
||||
// 1 kB regular file
|
||||
f.internalTestMetadata(t, "1kiB", "", "")
|
||||
|
||||
// 10 MiB large file
|
||||
f.internalTestMetadata(t, "10MiB", "6MiB", "6MiB")
|
||||
}
|
||||
|
||||
func sha1Sum(t *testing.T, s string) string {
|
||||
|
|
|
@ -91,7 +91,7 @@ type largeUpload struct {
|
|||
// newLargeUpload starts an upload of object o from in with metadata in src
|
||||
//
|
||||
// If newInfo is set then metadata from that will be used instead of reading it from src
|
||||
func (f *Fs) newLargeUpload(ctx context.Context, o *Object, in io.Reader, src fs.ObjectInfo, defaultChunkSize fs.SizeSuffix, doCopy bool, newInfo *api.File) (up *largeUpload, err error) {
|
||||
func (f *Fs) newLargeUpload(ctx context.Context, o *Object, in io.Reader, src fs.ObjectInfo, defaultChunkSize fs.SizeSuffix, doCopy bool, newInfo *api.File, options ...fs.OpenOption) (up *largeUpload, err error) {
|
||||
size := src.Size()
|
||||
parts := 0
|
||||
chunkSize := defaultChunkSize
|
||||
|
@ -104,11 +104,6 @@ func (f *Fs) newLargeUpload(ctx context.Context, o *Object, in io.Reader, src fs
|
|||
parts++
|
||||
}
|
||||
}
|
||||
|
||||
opts := rest.Opts{
|
||||
Method: "POST",
|
||||
Path: "/b2_start_large_file",
|
||||
}
|
||||
bucket, bucketPath := o.split()
|
||||
bucketID, err := f.getBucketID(ctx, bucket)
|
||||
if err != nil {
|
||||
|
@ -118,12 +113,27 @@ func (f *Fs) newLargeUpload(ctx context.Context, o *Object, in io.Reader, src fs
|
|||
BucketID: bucketID,
|
||||
Name: f.opt.Enc.FromStandardPath(bucketPath),
|
||||
}
|
||||
optionsToSend := make([]fs.OpenOption, 0, len(options))
|
||||
if newInfo == nil {
|
||||
modTime := src.ModTime(ctx)
|
||||
modTime, err := o.getModTime(ctx, src, options)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
request.ContentType = fs.MimeType(ctx, src)
|
||||
request.Info = map[string]string{
|
||||
timeKey: timeString(modTime),
|
||||
}
|
||||
// Custom upload headers - remove header prefix since they are sent in the body
|
||||
for _, option := range options {
|
||||
k, v := option.Header()
|
||||
k = strings.ToLower(k)
|
||||
if strings.HasPrefix(k, headerPrefix) {
|
||||
request.Info[k[len(headerPrefix):]] = v
|
||||
} else {
|
||||
optionsToSend = append(optionsToSend, option)
|
||||
}
|
||||
}
|
||||
// Set the SHA1 if known
|
||||
if !o.fs.opt.DisableCheckSum || doCopy {
|
||||
if calculatedSha1, err := src.Hash(ctx, hash.SHA1); err == nil && calculatedSha1 != "" {
|
||||
|
@ -134,6 +144,11 @@ func (f *Fs) newLargeUpload(ctx context.Context, o *Object, in io.Reader, src fs
|
|||
request.ContentType = newInfo.ContentType
|
||||
request.Info = newInfo.Info
|
||||
}
|
||||
opts := rest.Opts{
|
||||
Method: "POST",
|
||||
Path: "/b2_start_large_file",
|
||||
Options: optionsToSend,
|
||||
}
|
||||
var response api.StartLargeFileResponse
|
||||
err = f.pacer.Call(func() (bool, error) {
|
||||
resp, err := f.srv.CallJSON(ctx, &opts, &request, &response)
|
||||
|
|
|
@ -59,7 +59,7 @@ func (f *Fs) InternalTestMetadata(t *testing.T) {
|
|||
//"utime" - read-only
|
||||
//"content-type" - read-only
|
||||
}
|
||||
obj := fstests.PutTestContentsMetadata(ctx, t, f, &item, contents, true, "text/html", metadata)
|
||||
obj := fstests.PutTestContentsMetadata(ctx, t, f, &item, false, contents, true, "text/html", metadata)
|
||||
defer func() {
|
||||
assert.NoError(t, obj.Remove(ctx))
|
||||
}()
|
||||
|
|
|
@ -379,7 +379,7 @@ func (f *Fs) putWithMeta(ctx context.Context, t *testing.T, file *fstest.Item, p
|
|||
}
|
||||
|
||||
expectedMeta.Set("permissions", marshalPerms(t, perms))
|
||||
obj := fstests.PutTestContentsMetadata(ctx, t, f, file, content, true, "plain/text", expectedMeta)
|
||||
obj := fstests.PutTestContentsMetadata(ctx, t, f, file, false, content, true, "plain/text", expectedMeta)
|
||||
do, ok := obj.(fs.Metadataer)
|
||||
require.True(t, ok)
|
||||
actualMeta, err := do.Metadata(ctx)
|
||||
|
|
|
@ -58,7 +58,7 @@ func (f *Fs) InternalTestMetadata(t *testing.T) {
|
|||
// "tier" - read only
|
||||
// "btime" - read only
|
||||
}
|
||||
obj := fstests.PutTestContentsMetadata(ctx, t, f, &item, contents, true, "text/html", metadata)
|
||||
obj := fstests.PutTestContentsMetadata(ctx, t, f, &item, true, contents, true, "text/html", metadata)
|
||||
defer func() {
|
||||
assert.NoError(t, obj.Remove(ctx))
|
||||
}()
|
||||
|
|
|
@ -151,7 +151,7 @@ func retry(t *testing.T, what string, f func() error) {
|
|||
// It uploads the object with the mimeType and metadata passed in if set.
|
||||
//
|
||||
// It returns the object which will have been checked if check is set
|
||||
func PutTestContentsMetadata(ctx context.Context, t *testing.T, f fs.Fs, file *fstest.Item, contents string, check bool, mimeType string, metadata fs.Metadata) fs.Object {
|
||||
func PutTestContentsMetadata(ctx context.Context, t *testing.T, f fs.Fs, file *fstest.Item, useFileHashes bool, contents string, check bool, mimeType string, metadata fs.Metadata, options ...fs.OpenOption) fs.Object {
|
||||
var (
|
||||
err error
|
||||
obj fs.Object
|
||||
|
@ -163,7 +163,13 @@ func PutTestContentsMetadata(ctx context.Context, t *testing.T, f fs.Fs, file *f
|
|||
in := io.TeeReader(buf, uploadHash)
|
||||
|
||||
file.Size = int64(buf.Len())
|
||||
obji := object.NewStaticObjectInfo(file.Path, file.ModTime, file.Size, true, nil, nil)
|
||||
// The caller explicitly indicates whether the hashes in the file parameter should be used. If hashes is nil,
|
||||
// then NewStaticObjectInfo will calculate default hashes for use in the check.
|
||||
hashes := file.Hashes
|
||||
if !useFileHashes {
|
||||
hashes = nil
|
||||
}
|
||||
obji := object.NewStaticObjectInfo(file.Path, file.ModTime, file.Size, true, hashes, nil)
|
||||
if mimeType != "" || metadata != nil {
|
||||
// force the --metadata flag on temporarily
|
||||
if metadata != nil {
|
||||
|
@ -176,7 +182,7 @@ func PutTestContentsMetadata(ctx context.Context, t *testing.T, f fs.Fs, file *f
|
|||
}
|
||||
obji.WithMetadata(metadata).WithMimeType(mimeType)
|
||||
}
|
||||
obj, err = f.Put(ctx, in, obji)
|
||||
obj, err = f.Put(ctx, in, obji, options...)
|
||||
return err
|
||||
})
|
||||
file.Hashes = uploadHash.Sums()
|
||||
|
@ -198,19 +204,22 @@ func PutTestContentsMetadata(ctx context.Context, t *testing.T, f fs.Fs, file *f
|
|||
|
||||
// PutTestContents puts file with given contents to the remote and checks it but unlike TestPutLarge doesn't remove
|
||||
func PutTestContents(ctx context.Context, t *testing.T, f fs.Fs, file *fstest.Item, contents string, check bool) fs.Object {
|
||||
return PutTestContentsMetadata(ctx, t, f, file, contents, check, "", nil)
|
||||
return PutTestContentsMetadata(ctx, t, f, file, false, contents, check, "", nil)
|
||||
}
|
||||
|
||||
// testPut puts file with random contents to the remote
|
||||
func testPut(ctx context.Context, t *testing.T, f fs.Fs, file *fstest.Item) (string, fs.Object) {
|
||||
contents := random.String(100)
|
||||
return contents, PutTestContents(ctx, t, f, file, contents, true)
|
||||
return testPutMimeType(ctx, t, f, file, "", nil)
|
||||
}
|
||||
|
||||
// testPutMimeType puts file with random contents to the remote and the mime type given
|
||||
func testPutMimeType(ctx context.Context, t *testing.T, f fs.Fs, file *fstest.Item, mimeType string, metadata fs.Metadata) (string, fs.Object) {
|
||||
contents := random.String(100)
|
||||
return contents, PutTestContentsMetadata(ctx, t, f, file, contents, true, mimeType, metadata)
|
||||
// We just generated new contents, but file may contain hashes generated by a previous operation
|
||||
if len(file.Hashes) > 0 {
|
||||
file.Hashes = make(map[hash.Type]string)
|
||||
}
|
||||
return contents, PutTestContentsMetadata(ctx, t, f, file, false, contents, true, mimeType, metadata)
|
||||
}
|
||||
|
||||
// testPutLarge puts file to the remote, checks it and removes it on success.
|
||||
|
@ -1284,15 +1293,15 @@ func Run(t *testing.T, opt *Opt) {
|
|||
const dstName = "test metadata copied.txt"
|
||||
t1 := fstest.Time("2003-02-03T04:05:06.499999999Z")
|
||||
t2 := fstest.Time("2004-03-03T04:05:06.499999999Z")
|
||||
fileSrc := fstest.NewItem(srcName, srcName, t1)
|
||||
contents := random.String(100)
|
||||
fileSrc := fstest.NewItem(srcName, contents, t1)
|
||||
var testMetadata = fs.Metadata{
|
||||
// System metadata supported by all backends
|
||||
"mtime": t1.Format(time.RFC3339Nano),
|
||||
// User metadata
|
||||
"potato": "jersey",
|
||||
}
|
||||
oSrc := PutTestContentsMetadata(ctx, t, f, &fileSrc, contents, true, "text/plain", testMetadata)
|
||||
oSrc := PutTestContentsMetadata(ctx, t, f, &fileSrc, false, contents, true, "text/plain", testMetadata)
|
||||
fstest.CheckEntryMetadata(ctx, t, f, oSrc, testMetadata)
|
||||
|
||||
// Copy it with --metadata-set
|
||||
|
@ -1401,7 +1410,7 @@ func Run(t *testing.T, opt *Opt) {
|
|||
// User metadata
|
||||
"potato": "jersey",
|
||||
}
|
||||
o := PutTestContentsMetadata(ctx, t, f, &file, contents, true, "text/plain", testMetadata)
|
||||
o := PutTestContentsMetadata(ctx, t, f, &file, false, contents, true, "text/plain", testMetadata)
|
||||
fstest.CheckEntryMetadata(ctx, t, f, o, testMetadata)
|
||||
|
||||
// Move it with --metadata-set
|
||||
|
|
Loading…
Reference in New Issue
Block a user