From f9749638e3393f471d7e28362795689bf37cc023 Mon Sep 17 00:00:00 2001 From: Stefan Majewsky Date: Mon, 30 Apr 2018 14:14:56 +0200 Subject: revamp the LargeObject API I thought about this some more, and I believe the Writer-based approach in the previous version of the LargeObject API does not scale: It makes it very hard to write code that uploads segments without resorting to a buffer the same size as the segments. I don't want gigabyte-scale buffers filling up my RAM, so this commit switches to a different API based on Readers. LargeObject.Append() now behaves very similar to Object.Upload(), which I find quite nice. --- largeobject.go | 551 +++++++++++++++++++++++++----------------- largeobject_test.go | 50 +++- object.go | 75 ++++-- tests/bulk_delete_test.go | 2 +- tests/field_test.go | 2 +- tests/largeobject_test.go | 174 ++++++------- tests/object_iterator_test.go | 2 +- tests/object_test.go | 22 +- 8 files changed, 524 insertions(+), 354 deletions(-) diff --git a/largeobject.go b/largeobject.go index 4175a70..a39b6c0 100644 --- a/largeobject.go +++ b/largeobject.go @@ -19,7 +19,6 @@ package schwift import ( - "bufio" "bytes" "crypto/md5" "encoding/base64" @@ -27,6 +26,7 @@ import ( "encoding/json" "errors" "fmt" + "hash" "io" "math" "net/http" @@ -35,6 +35,7 @@ import ( "regexp" "strconv" "strings" + "time" "github.com/jpillora/longestcommon" ) @@ -78,65 +79,141 @@ type sloSegmentInfo struct { DataBase64 string `json:"data,omitempty"` } -//LargeObjectOpenMode is a set of flags that can be given to -//LargeObject.Open(). -type LargeObjectOpenMode int - -const ( - //OpenTruncate indicates that all existing segments in this object shall be - //deleted by Open(). - OpenTruncate LargeObjectOpenMode = 0 - //OpenAppend indicates that Open() shall set up the writer to append new - //content to the existing segments. - OpenAppend LargeObjectOpenMode = 1 << 0 - //OpenKeepSegments indicates that, when truncating an existing object, the - //segments shall not be deleted even though they are no longer referenced by - //this object. This flag has no effect when combined with OpenAppend. - OpenKeepSegments LargeObjectOpenMode = 1 << 1 -) - //LargeObjectStrategy is an enum of segmenting strategies supported by Swift. type LargeObjectStrategy int +//NOTE: No valid value for LargeObjectStrategy has the numeric value 0. That +//way, we can change the default strategy in a later version of Schwift, also +//possibly depending on the server capabilities. A numeric value of 0 is +//recognized by all functions taking a SegmentingOptions. const ( //StaticLargeObject is the default LargeObjectStrategy used by Schwift. - StaticLargeObject LargeObjectStrategy = iota + StaticLargeObject LargeObjectStrategy = iota + 1 //DynamicLargeObject is an older LargeObjectStrategy that is not recommended //for new applications because of eventual consistency problems and missing //support for several newer features (e.g. data segments, range specifications). DynamicLargeObject ) +//SegmentingOptions describes how an object is segmented. It is passed to +//Object.AsNewLargeObject(), and also to Object.Upload() when desired. +// +//If Strategy is not set, a reasonable strategy is chosen. Right now, this is +//always StaticLargeObject, but the choice may change in future relases, or may +//start to depend on the Account.Capabilities(). +// +//SegmentContainer must not be nil. A value of nil will cause the consuming +//method to panic. If the SegmentContainer is not in the same account as the +//large object, ErrAccountMismatch is returned from the method consuming the +//SegmentingOptions. +// +//If SegmentPrefix is empty, a reasonable default will be computed by +//Object.AsNewLargeObject(), using the format +//"//", where strategy is either "slo" or +//"dlo". +type SegmentingOptions struct { + Strategy LargeObjectStrategy + SegmentContainer *Container + SegmentPrefix string +} + //////////////////////////////////////////////////////////////////////////////// //LargeObject is a wrapper for type Object that performs operations specific to -//large objects. +//large objects, i.e. those objects which are uploaded in segments rather than +//all at once. It can be constructed with the Object.AsLargeObject() and +//Object.AsNewLargeObject() methods. +// +//The following example shows how to upload a large file from the filesystem to +//Swift (error handling elided for brevity): +// +// file, err := os.Open(sourcePath) +// segmentContainer, err := account.Container("segments").EnsureExists() +// +// lo, err := o.AsNewLargeObject(schwift.SegmentingOptions { +// SegmentContainer: segmentContainer, +// //use defaults for everything else +// }, &schwift.TruncateOptions { +// //if there's already a large object here, clean it up +// DeleteSegments: true, +// }) // -//This type should only be constructed through the Object.AsLargeObject() -//method. If the object does not exist yet, the SegmentContainerName and -//SegmentPrefix must be specified before this object can be written to, and the -//Strategy can be adjusted in the unlikely case that an SLO is not desired. +// err = lo.Append(contents, 1<<30) // 1<30 bytes = 1 GiB per segment +// err = lo.WriteManifest(nil) +// +//Append() has a more low-level counterpart, AddSegment(). Both methods can be +//freely intermixed. AddSegment() is useful when you want to control the +//segments' metadata or use advanced features like range segments or data +//segments; see documentation over there. +// +//Writing to a large object must always be concluded by a call to +//WriteManifest() to link the new segments to the large object on the server +//side. type LargeObject struct { - Object *Object - SegmentContainer *Container - SegmentPrefix string - Strategy LargeObjectStrategy - //This is private so that we can later optimize this to load the segments - //only on demand. - segments []SegmentInfo + object *Object + segmentContainer *Container + segmentPrefix string + strategy LargeObjectStrategy + segments []SegmentInfo } -//AsLargeObject prepares a LargeObject instance. If the given object exists, -//but is not a large object, ErrNotLarge will be returned. If the given object -//does not yet exist, the SegmentContainer and SegmentPrefix attributes need to -//be filled in before the LargeObject can be used. +//Object returns the location of this large object (where its manifest is stored). +func (lo *LargeObject) Object() *Object { + return lo.object +} + +//SegmentContainer returns the container in which this object's segments are +//stored. For static large objects, some segments may also be located in +//different containers. +func (lo *LargeObject) SegmentContainer() *Container { + return lo.segmentContainer +} + +//SegmentPrefix returns the prefix shared by the names of all segments of this +//object. For static large objects, some segments may not be located in this +//prefix. +func (lo *LargeObject) SegmentPrefix() string { + return lo.segmentPrefix +} + +//Strategy returns the LargeObjectStrategy used by this object. +func (lo *LargeObject) Strategy() LargeObjectStrategy { + return lo.strategy +} + +//Segments returns a list of all segments for this object, in order. +func (lo *LargeObject) Segments() ([]SegmentInfo, error) { + //NOTE: This method has an error return value because we might later switch + //to loading segments lazily inside this method. + return lo.segments, nil +} + +func (lo *LargeObject) segmentObjects() []*Object { + seen := make(map[string]bool) + result := make([]*Object, 0, len(lo.segments)) + for _, segment := range lo.segments { + if segment.Object == nil { //can happen because of data segments + continue + } + fullName := segment.Object.FullName() + if !seen[fullName] { + result = append(result, segment.Object) + } + seen[fullName] = true + } + return result +} + +//AsLargeObject opens an existing large object. If the given object does not +//exist, or if it is not a large object, ErrNotLarge will be returned. In this +//case, Object.AsNewLargeObject() needs to be used instead. func (o *Object) AsLargeObject() (*LargeObject, error) { exists, err := o.Exists() if err != nil { return nil, err } if !exists { - return &LargeObject{Object: o, Strategy: StaticLargeObject}, nil + return nil, ErrNotLarge } h := o.headers @@ -156,14 +233,14 @@ func (o *Object) asDLO(manifestStr string) (*LargeObject, error) { } lo := &LargeObject{ - Object: o, - SegmentContainer: o.c.a.Container(manifest[0]), - SegmentPrefix: manifest[1], - Strategy: DynamicLargeObject, + object: o, + segmentContainer: o.c.a.Container(manifest[0]), + segmentPrefix: manifest[1], + strategy: DynamicLargeObject, } - iter := lo.SegmentContainer.Objects() - iter.Prefix = lo.SegmentPrefix + iter := lo.segmentContainer.Objects() + iter.Prefix = lo.segmentPrefix segmentInfos, err := iter.CollectDetailed() if err != nil { return nil, err @@ -198,8 +275,8 @@ func (o *Object) asSLO() (*LargeObject, error) { } lo := &LargeObject{ - Object: o, - Strategy: StaticLargeObject, + object: o, + strategy: StaticLargeObject, } if len(data) == 0 { return lo, nil @@ -255,7 +332,7 @@ func (o *Object) asSLO() (*LargeObject, error) { maxVotes = votes } } - lo.SegmentContainer = lo.Object.c.a.Container(maxName) + lo.segmentContainer = lo.object.c.a.Container(maxName) //choose the SegmentPrefix as the longest common prefix of all segments in //the chosen SegmentContainer... @@ -269,13 +346,13 @@ func (o *Object) asSLO() (*LargeObject, error) { names = append(names, s.Object.Name()) } } - lo.SegmentPrefix = longestcommon.Prefix(names) + lo.segmentPrefix = longestcommon.Prefix(names) //..BUT if the prefix is a path with slashes, do not consider the part after //the last slash; e.g. if we have segments "foo/bar/0001" and "foo/bar/0002", //the longest common prefix is "foo/bar/000", but we actually want "foo/bar/" - if strings.Contains(lo.SegmentPrefix, "/") { - lo.SegmentPrefix = path.Dir(lo.SegmentPrefix) + "/" + if strings.Contains(lo.segmentPrefix, "/") { + lo.segmentPrefix = path.Dir(lo.segmentPrefix) + "/" } return lo, nil @@ -317,85 +394,100 @@ func parseHTTPRange(str string) (offsetVal int64, lengthVal uint64, ok bool) { return int64(firstByte), lastByte - firstByte + 1, true } -//Open returns an io.WriteCloser that can be used to replace or extend the -//contents of this large object. -// -//This call returns ErrNoContainerName if o.SegmentContainer is not set, or -//ErrAccountMismatch if it is not in the same account as the large object. -//For existing objects, SegmentContainer and SegmentPrefix will be filled by -//Object.AsLargeObject(). For new objects, they need to be filled by the -//caller. -// -//WARNING: Every call to Write() on the returned writer will create a new -//segment. To ensure a uniform segment size, wrap the writer returned from this -//call in a bufio.Writer, for example by using the schwift.SetSegmentSize() -//convenience function: -// -// dlo, err := account.Container("public").Object("archive27.zip").AsLargeObject() -// dlo.SegmentContainer = account.Container("segments") -// dlo.SegmentPrefix = "archive27/" -// w, err := dlo.Open(schwift.OpenTruncate) -// w, err = schwift.SetSegmentSize(w, 1<<30) //segment size 1<<30 byte = 1 GiB -// _, err = bw.Write(archiveContents) -// err = w.Close() -// -func (lo *LargeObject) Open(mode LargeObjectOpenMode) (io.WriteCloser, error) { - if lo.SegmentContainer == nil { - return nil, ErrNoContainerName - } - if !lo.SegmentContainer.a.isEqualTo(lo.Object.c.a) { - return nil, ErrAccountMismatch - } - - if mode&OpenAppend == 0 { - if mode&OpenKeepSegments == 0 { - _, _, err := lo.Object.c.a.BulkDelete(lo.segmentObjects(), nil, nil) +//AsNewLargeObject opens an object as a large object. SegmentingOptions are +//always required, see the documentation on type SegmentingOptions for details. +// +//This function can be used regardless of whether the object exists or not. +//If the object exists and is a large object, this function behaves like +//Object.AsLargeObject() followed by Truncate(), but segmenting options are +//initialized from the method's SegmentingOptions argument rather than from the +//existing manifest. +func (o *Object) AsNewLargeObject(sopts SegmentingOptions, topts *TruncateOptions) (*LargeObject, error) { + //we only need to load the existing large object if we want to do something + //with the old segments + if topts != nil && topts.DeleteSegments { + lo, err := o.AsLargeObject() + switch err { + case nil: + err := lo.Truncate(topts) if err != nil { return nil, err } + case ErrNotLarge: + //not an error, continue down below + default: + return nil, err //unexpected error } - lo.segments = nil } - return largeObjectWriter{lo}, nil -} + lo := &LargeObject{object: o} -//Segments returns a list of all segments for this object, in order. -func (lo *LargeObject) Segments() ([]SegmentInfo, error) { - //NOTE: This method has an error return value because we might later switch - //to loading segments lazily inside this method. - return lo.segments, nil -} + //validate segment container + lo.segmentContainer = sopts.SegmentContainer + if sopts.SegmentContainer == nil { + panic("missing value for sopts.SegmentingContainer") + } + if !sopts.SegmentContainer.a.isEqualTo(o.c.a) { + return nil, ErrAccountMismatch + } -func (lo *LargeObject) segmentObjects() []*Object { - seen := make(map[string]bool) - result := make([]*Object, 0, len(lo.segments)) - for _, segment := range lo.segments { - if segment.Object == nil { //can happen because of data segments - continue - } - fullName := segment.Object.FullName() - if !seen[fullName] { - result = append(result, segment.Object) + //apply default value for strategy + if sopts.Strategy == 0 { + lo.strategy = StaticLargeObject + } else { + lo.strategy = sopts.Strategy + } + + //apply default value for segmenting prefix + lo.segmentPrefix = sopts.SegmentPrefix + if lo.segmentPrefix == "" { + now := time.Now() + strategyStr := "slo" + if lo.strategy == DynamicLargeObject { + strategyStr = "dlo" } - seen[fullName] = true + + lo.segmentPrefix = fmt.Sprintf("%s/%s/%d.%09d", + o.Name(), strategyStr, now.Unix(), now.Nanosecond(), + ) } - return result + return lo, nil +} + +//TruncateOptions contains options that can be passed to LargeObject.Truncate() +//and Object.AsNewLargeObject(). +type TruncateOptions struct { + //When truncating a large object's manifest, delete its segments. + //This will cause Truncate() to call into BulkDelete(), so a BulkError may be + //returned. If this is false, the segments will not be deleted even though + //they may not be referenced by any large object anymore. + DeleteSegments bool +} + +//Truncate removes all segments from a large object's manifest. The manifest is +//not written by this call, so WriteManifest() usually needs to be called +//afterwards. +func (lo *LargeObject) Truncate(opts *TruncateOptions) error { + _, _, err := lo.object.c.a.BulkDelete(lo.segmentObjects(), nil, nil) + if err == nil { + lo.segments = nil + } + return err } //NextSegmentObject suggests where to upload the next segment. // -//WARNING: This is a low-level function. Most callers will want to use the -//io.WriteCloser provided by Open(). You will only need to upload segments -//manually when you want to control the segments' metadata. +//WARNING: This is a low-level function. Most callers will want to use +//Append(). You will only need to upload segments manually when you want to +//control the segments' metadata. // //If the name of the current final segment ends with a counter, that counter is //incremented, otherwise a counter is appended to its name. When looking for a //counter in an existing segment name, the regex /[0-9]+$/ is used. For example, //given: // -// segments := lo.Segments() +// segments := lo.segments() // lastSegmentName := segments[len(segments)-1].Name() // nextSegmentName := lo.NextSegmentObject().Name() // @@ -405,12 +497,12 @@ func (lo *LargeObject) segmentObjects() []*Object { //"segments/archive/first0000000000000001". // //However, the last segment's name will only be considered if it lies within -//lo.SegmentContainer below lo.SegmentPrefix. If that is not the case, the name +//lo.segmentContainer below lo.segmentPrefix. If that is not the case, the name //of the last segment that does will be used instead. // //If there are no segments yet, or if all segments are located outside the -//lo.SegmentContainer and lo.SegmentPrefix, the first segment name is chosen as -//lo.SegmentPrefix + "0000000000000001". +//lo.segmentContainer and lo.segmentPrefix, the first segment name is chosen as +//lo.segmentPrefix + "0000000000000001". func (lo *LargeObject) NextSegmentObject() *Object { //find the name of the last-most segment that is within the designated //segment container and prefix @@ -420,7 +512,7 @@ func (lo *LargeObject) NextSegmentObject() *Object { if o == nil { //can happen for data segments continue } - if lo.SegmentContainer.isEqualTo(o.c) && strings.HasPrefix(o.Name(), lo.SegmentPrefix) { + if lo.segmentContainer.isEqualTo(o.c) && strings.HasPrefix(o.Name(), lo.segmentPrefix) { prevSegmentName = s.Object.Name() //keep going, we want to find the last such segment } @@ -429,12 +521,12 @@ func (lo *LargeObject) NextSegmentObject() *Object { //choose the next segment name based on the previous one var segmentName string if prevSegmentName == "" { - segmentName = lo.SegmentPrefix + initialIndex + segmentName = lo.segmentPrefix + initialIndex } else { segmentName = nextSegmentName(prevSegmentName) } - return lo.SegmentContainer.Object(segmentName) + return lo.segmentContainer.Object(segmentName) } var splitSegmentIndexRx = regexp.MustCompile(`^(.*?)([0-9]+$)`) @@ -466,10 +558,10 @@ func nextSegmentName(segmentName string) string { //AddSegment appends a segment to this object. The segment must already have //been uploaded. // -//WARNING: This is a low-level function. Most callers will want to use the -//io.WriteCloser provided by Open(). You will only need to add segments -//manually when you want to control the segments' metadata, or when using -//advanced features such as range-limited segments or data segments. +//WARNING: This is a low-level function. Most callers will want to use +//Append(). You will only need to add segments manually when you want to +//control the segments' metadata, or when using advanced features such as +//range-limited segments or data segments. // //This method returns ErrAccountMismatch if the segment is not located in a //container in the same account. @@ -496,21 +588,21 @@ func (lo *LargeObject) AddSegment(segment SegmentInfo) error { //required attributes return ErrSegmentInvalid } - if !o.c.a.isEqualTo(lo.SegmentContainer.a) { + if !o.c.a.isEqualTo(lo.segmentContainer.a) { return ErrAccountMismatch } - switch lo.Strategy { + switch lo.strategy { case DynamicLargeObject: if segment.RangeLength != 0 || segment.RangeOffset != 0 { //not supported for DLO return ErrSegmentInvalid } - if !o.c.isEqualTo(lo.SegmentContainer) { + if !o.c.isEqualTo(lo.segmentContainer) { return ErrContainerMismatch } - if !strings.HasPrefix(o.name, lo.SegmentPrefix) { + if !strings.HasPrefix(o.name, lo.segmentPrefix) { return ErrContainerMismatch } @@ -522,7 +614,7 @@ func (lo *LargeObject) AddSegment(segment SegmentInfo) error { } } else { //validate plain-data segments - if lo.Strategy != StaticLargeObject { + if lo.strategy != StaticLargeObject { //not supported for DLO return ErrSegmentInvalid } @@ -536,6 +628,121 @@ func (lo *LargeObject) AddSegment(segment SegmentInfo) error { return nil } +//Append uploads the contents of the given io.Reader as segment objects of the +//given segment size. (The last segment will be shorter than the segment size +//unless the reader yields an exact multiple of the segment size.) The reader +//is consumed until EOF, or until an error occurs. +// +//If you do not have an io.Reader, but you have a []byte or string instance +//containing the data, wrap it in a *bytes.Reader instance like so: +// +// var buffer []byte +// lo.Append(bytes.NewReader(buffer), segmentSizeBytes) +// +// //or... +// var buffer string +// lo.Append(bytes.NewReader([]byte(buffer)), segmentSizeBytes) +// +//If segmentSizeBytes is zero, Append() defaults to the maximum file size +//reported by Account.Capabilities(). +// +//Calls to Append() and its low-level counterpart, AddSegment(), can be freely +//intermixed. AddSegment() is useful when you want to control the segments' +//metadata or use advanced features like range segments or data segments; see +//documentation over there. +// +//This function uploads segment objects, so it may return any error that +//Object.Upload() returns, see documentation over there. +func (lo *LargeObject) Append(contents io.Reader, segmentSizeBytes int64) error { + if segmentSizeBytes < 0 { + panic("segmentSizeBytes may not be negative") + } + if segmentSizeBytes == 0 { + //apply default value for segmenting size + caps, err := lo.object.c.a.Capabilities() + if err != nil { + return err + } + segmentSizeBytes = int64(caps.Swift.MaximumFileSize) + if segmentSizeBytes <= 0 { + return errors.New("cannot infer SegmentSizeBytes from Swift /info") + } + } + + sr := segmentingReader{contents, segmentSizeBytes} + for { + segment := sr.NextSegment() + if segment == nil { + break + } + + tracker := lengthAndEtagTrackingReader{ + Reader: segment, + Hasher: md5.New(), + } + + obj := lo.NextSegmentObject() + err := obj.Upload(&tracker, nil, nil) + if err != nil { + return err + } + err = lo.AddSegment(SegmentInfo{ + Object: obj, + SizeBytes: tracker.BytesRead, + Etag: hex.EncodeToString(tracker.Hasher.Sum(nil)), + }) + if err != nil { + return err + } + } + + return nil +} + +type segmentingReader struct { + Reader io.Reader + SegmentSizeBytes int64 //must be >0 +} + +func (sr *segmentingReader) NextSegment() io.Reader { + //peek if there is more content in the backing reader + buf := make([]byte, 1) + var ( + n int + err error + ) + for n == 0 { + n, err = sr.Reader.Read(buf) + if err == io.EOF { + if n == 0 { + //EOF encountered + return nil + } + //that was the last byte - return only that (next NextSegment() will return nil) + return bytes.NewReader(buf) + } + } + + //looks like there is more stuff in the backing reader + return io.MultiReader( + bytes.NewReader(buf), + io.LimitReader(sr.Reader, sr.SegmentSizeBytes-1), //1 == len(buf) + ) +} + +type lengthAndEtagTrackingReader struct { + Reader io.Reader + BytesRead uint64 + Hasher hash.Hash +} + +func (r *lengthAndEtagTrackingReader) Read(buf []byte) (int, error) { + n, err := r.Reader.Read(buf) + r.BytesRead += uint64(n) + r.Hasher.Write(buf[:n]) + return n, err +} + //WriteManifest creates this large object by writing a manifest to its //location using a PUT request. // @@ -543,7 +750,7 @@ func (lo *LargeObject) AddSegment(segment SegmentInfo) error { //if the object already exists and has the correct manifest (i.e. //SegmentContainer and SegmentPrefix have not been changed). func (lo *LargeObject) WriteManifest(opts *RequestOptions) error { - switch lo.Strategy { + switch lo.strategy { case StaticLargeObject: return lo.writeSLOManifest(opts) case DynamicLargeObject: @@ -554,10 +761,10 @@ func (lo *LargeObject) WriteManifest(opts *RequestOptions) error { } func (lo *LargeObject) writeDLOManifest(opts *RequestOptions) error { - manifest := lo.SegmentContainer.Name() + "/" + lo.SegmentPrefix + manifest := lo.segmentContainer.Name() + "/" + lo.segmentPrefix //check if the manifest is already set correctly - headers, err := lo.Object.Headers() + headers, err := lo.object.Headers() if err != nil && !Is(err, http.StatusNotFound) { return err } @@ -568,7 +775,7 @@ func (lo *LargeObject) writeDLOManifest(opts *RequestOptions) error { //write manifest; make sure that this is a DLO opts = cloneRequestOptions(opts, nil) opts.Headers.Set("X-Object-Manifest", manifest) - return lo.Object.Upload(nil, opts) + return lo.object.Upload(nil, nil, opts) } func (lo *LargeObject) writeSLOManifest(opts *RequestOptions) error { @@ -606,97 +813,5 @@ func (lo *LargeObject) writeSLOManifest(opts *RequestOptions) error { opts = cloneRequestOptions(opts, nil) opts.Headers.Del("X-Object-Manifest") //ensure sanity :) opts.Values.Set("multipart-manifest", "put") - return lo.Object.Upload(bytes.NewReader(manifest), opts) -} - -//////////////////////////////////////////////////////////////////////////////// - -type largeObjectWriter struct { - lo *LargeObject -} - -//Write implements the io.WriteCloser interface. -func (w largeObjectWriter) Write(buf []byte) (int, error) { - segment := w.lo.NextSegmentObject() - //TODO: split write into multiple segments if len(buf) > max object size - err := segment.Upload(bytes.NewReader(buf), nil) - if err != nil { - return 0, err - } - - sum := md5.Sum(buf) - return len(buf), w.lo.AddSegment(SegmentInfo{ - Object: segment, - SizeBytes: uint64(len(buf)), - Etag: hex.EncodeToString(sum[:]), - }) -} - -//Close implements the io.WriteCloser interface. -func (w largeObjectWriter) Close() error { - return w.lo.WriteManifest(nil) -} - -//////////////////////////////////////////////////////////////////////////////// - -type largeObjectBufferedWriter struct { - bw *bufio.Writer - w io.WriteCloser -} - -//SetSegmentSize creates a bufio.Writer around an io.WriteCloser and returns -//an interface to it that works like the original io.WriteCloser. -// -//This is intended to be used when writing segments into a large object. -//The writer returned by LargeObject.Open() does not ensure a uniform segment -//size by default, so one would have to wrap it in a bufio.Writer like so: -// -// dlo, err := account.Container("public").Object("archive27.zip").AsLargeObject() -// dlo.SegmentContainer = account.Container("segments") -// dlo.SegmentPrefix = "archive27/" -// -// w, err := largeObject.Open(schwift.OpenTruncate) -// bw, err := bufio.NewWriterSize(w, 1<<30) //segment size 1<<30 byte = 1 GiB -// _, err = bw.Write(archiveContents) -// err = bw.Flush() -// err = w.Close() -// -//This function reduces the boilerplate to: -// -// w, err := largeObject.Open(schwift.OpenTruncate) -// w, err = schwift.SetSegmentSize(w, 1<<30) //segment size 1<<30 byte = 1 GiB -// _, err = w.Write(archiveContents) -// err = w.Close() -// -//Another advantage of this function is that the returned writer implements -//io.WriteCloser, which bufio.Writer does not. So you can pass it into -//consuming functions that use io.WriteCloser to close the object once they're -//done writing to it, and it will be ensured that the buffer is flushed before -//closing the underlying writer. -func SetSegmentSize(w io.WriteCloser, segmentSizeBytes int) io.WriteCloser { - switch w := w.(type) { - case *largeObjectBufferedWriter: - //never chain multiple largeObjectBufferedWriter together - w.bw.Flush() //ensure that previous calls to `w.Write()` are durable - return SetSegmentSize(w.w, segmentSizeBytes) - default: - return &largeObjectBufferedWriter{ - bw: bufio.NewWriterSize(w, segmentSizeBytes), - w: w, - } - } -} - -//Write implements the io.WriteCloser interface. -func (bw *largeObjectBufferedWriter) Write(buf []byte) (int, error) { - return bw.bw.Write(buf) -} - -//Close implements the io.WriteCloser interface. -func (bw *largeObjectBufferedWriter) Close() error { - err := bw.bw.Flush() - if err != nil { - return err - } - return bw.w.Close() + return lo.object.Upload(bytes.NewReader(manifest), nil, opts) } diff --git a/largeobject_test.go b/largeobject_test.go index fe781ab..724654a 100644 --- a/largeobject_test.go +++ b/largeobject_test.go @@ -18,7 +18,11 @@ package schwift -import "testing" +import ( + "bytes" + "io/ioutil" + "testing" +) func TestParseHTTPRange(t *testing.T) { testCases := []struct { @@ -62,3 +66,47 @@ func TestParseHTTPRange(t *testing.T) { } } } + +func TestSegmentingReader(t *testing.T) { + testCases := []struct { + input string + segments []string + }{ + {"abcdefghi", []string{"abc", "def", "ghi"}}, + {"abcdefgh", []string{"abc", "def", "gh"}}, + {"abcdefg", []string{"abc", "def", "g"}}, + } + + for _, tc := range testCases { + sr := segmentingReader{ + Reader: bytes.NewReader([]byte(tc.input)), + SegmentSizeBytes: 3, + } + + for _, expected := range tc.segments { + segment := sr.NextSegment() + if segment == nil { + t.Errorf("expected segment %q, but NextSegment() returned nil", expected) + break + } + actual, err := ioutil.ReadAll(segment) + if err != nil { + t.Errorf("expected segment %q, but got read error %q", expected, err.Error()) + break + } + if string(actual) != expected { + t.Errorf("expected segment %q, but got %q", expected, string(actual)) + } + } + + segment := sr.NextSegment() + if segment != nil { + actual, err := ioutil.ReadAll(segment) + if err == nil { + t.Errorf("expected no more segments, but got segment producing read error %q", err.Error()) + } else { + t.Errorf("expected no more segments, but got %q", string(actual)) + } + } + } +} diff --git a/object.go b/object.go index 69d2e95..1aa035a 100644 --- a/object.go +++ b/object.go @@ -133,6 +133,13 @@ func (o *Object) Update(headers ObjectHeaders, opts *RequestOptions) error { return err } +//UploadOptions invokes advanced behavior in the Object.Upload() method. +type UploadOptions struct { + //When overwriting a large object, delete its segments. This will cause + //Upload() to call into BulkDelete(), so a BulkError may be returned. + DeleteSegments bool +} + //Upload creates the object using a PUT request. // //If you do not have an io.Reader, but you have a []byte or string instance @@ -164,17 +171,27 @@ func (o *Object) Update(headers ObjectHeaders, opts *RequestOptions) error { //This function can be used regardless of whether the object exists or not. // //A successful PUT request implies Invalidate() since it may change metadata. -func (o *Object) Upload(content io.Reader, opts *RequestOptions) error { - opts = cloneRequestOptions(opts, nil) - hdr := ObjectHeaders{opts.Headers} +func (o *Object) Upload(content io.Reader, opts *UploadOptions, ropts *RequestOptions) error { + if opts == nil { + opts = &UploadOptions{} + } + + ropts = cloneRequestOptions(ropts, nil) + hdr := ObjectHeaders{ropts.Headers} + + if !hdr.SizeBytes().Exists() { + value := tryComputeContentLength(content) + if value != nil { + hdr.SizeBytes().Set(*value) + } + } //do not attempt to add the Etag header when we're writing a large object //manifest; the header refers to the content, but we would be computing the //manifest's hash instead - isManifestUpload := opts.Values.Get("multipart-manifest") == "put" || hdr.IsDynamicLargeObject() + isManifestUpload := ropts.Values.Get("multipart-manifest") == "put" || hdr.IsDynamicLargeObject() var hasher hash.Hash - tryComputeContentLength(content, hdr) if !isManifestUpload { tryComputeEtag(content, hdr) @@ -187,11 +204,30 @@ func (o *Object) Upload(content io.Reader, opts *RequestOptions) error { } } + var lo *LargeObject + if opts.DeleteSegments { + //enumerate segments in large object before overwriting it, but only delete + //the segments after successfully uploading the new object to decrease the + //chance of an inconsistent state following an upload error + var err error + lo, err = o.AsLargeObject() + switch err { + case nil: + //okay, delete segments at the end + case ErrNotLarge: + //okay, do not try to delete segments + lo = nil + default: + //unexpected error + return err + } + } + resp, err := Request{ Method: "PUT", ContainerName: o.c.name, ObjectName: o.name, - Options: opts, + Options: ropts, Body: content, ExpectStatusCodes: []int{201}, DrainResponseBody: true, @@ -208,6 +244,13 @@ func (o *Object) Upload(content io.Reader, opts *RequestOptions) error { } } + if opts.DeleteSegments && lo != nil { + _, _, err := lo.object.c.a.BulkDelete(lo.segmentObjects(), nil, nil) + if err != nil { + return err + } + } + return nil } @@ -217,17 +260,15 @@ type readerWithLen interface { Len() int } -func tryComputeContentLength(content io.Reader, headers ObjectHeaders) { - h := headers.SizeBytes() - if h.Exists() { - return - } - +func tryComputeContentLength(content io.Reader) *uint64 { if content == nil { - h.Set(0) + val := uint64(0) + return &val } else if r, ok := content.(readerWithLen); ok { - h.Set(uint64(r.Len())) + val := uint64(r.Len()) + return &val } + return nil } //This covers both bytes.Reader and strings.Reader in a way that is compatible @@ -280,11 +321,13 @@ func tryComputeEtag(content io.Reader, headers ObjectHeaders) { // }) // //If you do not need an io.Writer, always use Upload instead. -func (o *Object) UploadWithWriter(opts *RequestOptions, callback func(io.Writer) error) error { +// +//TODO rename to UploadViaWriter +func (o *Object) UploadWithWriter(opts *UploadOptions, ropts *RequestOptions, callback func(io.Writer) error) error { reader, writer := io.Pipe() errChan := make(chan error) go func() { - err := o.Upload(reader, opts) + err := o.Upload(reader, opts, ropts) reader.CloseWithError(err) //stop the writer if it is still writing errChan <- err }() diff --git a/tests/bulk_delete_test.go b/tests/bulk_delete_test.go index c498419..dfe8468 100644 --- a/tests/bulk_delete_test.go +++ b/tests/bulk_delete_test.go @@ -77,7 +77,7 @@ func createTestObjects(c *schwift.Container) ([]*schwift.Object, error) { var objs []*schwift.Object for idx := 1; idx <= 5; idx++ { obj := c.Object(fmt.Sprintf("object%d", idx)) - err := obj.Upload(strings.NewReader("example"), nil) + err := obj.Upload(strings.NewReader("example"), nil, nil) if err != nil { return nil, err } diff --git a/tests/field_test.go b/tests/field_test.go index 8166f2d..c8dcf7f 100644 --- a/tests/field_test.go +++ b/tests/field_test.go @@ -88,7 +88,7 @@ func TestFieldTimestamp(t *testing.T) { func TestFieldHTTPTimestamp(t *testing.T) { testWithContainer(t, func(c *schwift.Container) { obj := c.Object("test") - err := obj.Upload(nil, nil) + err := obj.Upload(nil, nil, nil) if !expectSuccess(t, err) { return } diff --git a/tests/largeobject_test.go b/tests/largeobject_test.go index 758083d..3e41cb3 100644 --- a/tests/largeobject_test.go +++ b/tests/largeobject_test.go @@ -38,15 +38,7 @@ func TestLargeObjectsBasic(t *testing.T) { obj := c.Object(strategyStr + "-largeobject") lo, err := obj.AsLargeObject() - expectSuccess(t, err) - - //Open fails when SegmentContainer is not set - _, err = lo.Open(schwift.OpenTruncate) - expectError(t, err, schwift.ErrNoContainerName.Error()) - - lo.SegmentContainer = c - lo.SegmentPrefix = strategyStr + "-segments/" - lo.Strategy = strategy + expectError(t, err, schwift.ErrNotLarge.Error()) segment1 := getRandomSegmentContent(128) segment2 := getRandomSegmentContent(128) @@ -54,16 +46,14 @@ func TestLargeObjectsBasic(t *testing.T) { segment4 := getRandomSegmentContent(128) //basic write example - w, err := lo.Open(schwift.OpenTruncate) - expectSuccess(t, err) - if w == nil { - t.FailNow() - } - _, err = w.Write([]byte(segment1)) + lo, err = obj.AsNewLargeObject(schwift.SegmentingOptions{ + SegmentContainer: c, + SegmentPrefix: strategyStr + "-segments/", + Strategy: strategy, + }, nil) expectSuccess(t, err) - _, err = w.Write([]byte(segment2)) - expectSuccess(t, err) - expectSuccess(t, w.Close()) + expectSuccess(t, lo.Append(bytes.NewReader([]byte(segment1+segment2)), 128)) + expectSuccess(t, lo.WriteManifest(nil)) expectObjectContent(t, obj, []byte(segment1+segment2)) expectLargeObject(t, obj, []schwift.SegmentInfo{ @@ -84,16 +74,8 @@ func TestLargeObjectsBasic(t *testing.T) { expectSuccess(t, err) expectLargeObjectSetup(t, lo, strategy, fmt.Sprintf("%s/%s-segments/", c.Name(), strategyStr)) - w, err = lo.Open(schwift.OpenAppend) - expectSuccess(t, err) - if w == nil { - t.FailNow() - } - _, err = w.Write([]byte(segment3)) - expectSuccess(t, err) - _, err = w.Write([]byte(segment4)) - expectSuccess(t, err) - expectSuccess(t, w.Close()) + expectSuccess(t, lo.Append(bytes.NewReader([]byte(segment3+segment4)), 128)) + expectSuccess(t, lo.WriteManifest(nil)) expectObjectContent(t, obj, []byte(segment1+segment2+segment3+segment4)) expectLargeObject(t, obj, []schwift.SegmentInfo{ @@ -122,26 +104,22 @@ func TestLargeObjectsBasic(t *testing.T) { //basic truncate example lo, err = obj.AsLargeObject() expectSuccess(t, err) + err = lo.Truncate(&schwift.TruncateOptions{ + DeleteSegments: true, + }) + expectSuccess(t, err) expectLargeObjectSetup(t, lo, strategy, fmt.Sprintf("%s/%s-segments/", c.Name(), strategyStr)) - w, err = lo.Open(schwift.OpenTruncate) - expectSuccess(t, err) - if w == nil { - t.FailNow() - } //verify that segments were deleted iter := c.Objects() - iter.Prefix = lo.SegmentPrefix + iter.Prefix = lo.SegmentPrefix() names, err := iter.Collect() expectSuccess(t, err) expectObjectNames(t, names) - _, err = w.Write([]byte(segment3)) - expectSuccess(t, err) - _, err = w.Write([]byte(segment4)) - expectSuccess(t, err) - expectSuccess(t, w.Close()) + expectSuccess(t, lo.Append(bytes.NewReader([]byte(segment3+segment4)), 128)) + expectSuccess(t, lo.WriteManifest(nil)) expectObjectContent(t, obj, []byte(segment3+segment4)) expectLargeObject(t, obj, []schwift.SegmentInfo{ @@ -164,7 +142,7 @@ func TestLargeObjectsBasic(t *testing.T) { func TestOpenRegularObjectAsLargeObject(t *testing.T) { testWithContainer(t, func(c *schwift.Container) { o := c.Object("foo") - expectSuccess(t, o.Upload(bytes.NewReader(objectExampleContent), nil)) + expectSuccess(t, o.Upload(bytes.NewReader(objectExampleContent), nil, nil)) _, err := o.AsLargeObject() expectError(t, err, schwift.ErrNotLarge.Error()) }) @@ -173,29 +151,21 @@ func TestOpenRegularObjectAsLargeObject(t *testing.T) { func TestSLOWithDataSegment(t *testing.T) { testWithContainer(t, func(c *schwift.Container) { o := c.Object("foo") - lo, err := o.AsLargeObject() + lo, err := o.AsNewLargeObject(schwift.SegmentingOptions{ + SegmentContainer: c, + SegmentPrefix: "segments/", + Strategy: schwift.StaticLargeObject, + }, nil) expectSuccess(t, err) - lo.SegmentContainer = c - lo.SegmentPrefix = "segments/" - lo.Strategy = schwift.StaticLargeObject segment1 := getRandomSegmentContent(128) - segment2 := getRandomSegmentContent(128) - w, err := lo.Open(schwift.OpenTruncate) - expectSuccess(t, err) - if w == nil { - t.FailNow() - } - _, err = w.Write([]byte(segment1)) - expectSuccess(t, err) - dataSegment := schwift.SegmentInfo{Data: []byte("---")} - err = lo.AddSegment(dataSegment) - expectSuccess(t, err) + segment2 := getRandomSegmentContent(128) - _, err = w.Write([]byte(segment2)) - expectSuccess(t, err) - expectSuccess(t, w.Close()) + expectSuccess(t, lo.Append(bytes.NewReader([]byte(segment1)), 0)) + expectSuccess(t, lo.AddSegment(dataSegment)) + expectSuccess(t, lo.Append(bytes.NewReader([]byte(segment2)), 0)) + expectSuccess(t, lo.WriteManifest(nil)) expectObjectContent(t, o, []byte(segment1+string(dataSegment.Data)+segment2)) expectLargeObject(t, o, []schwift.SegmentInfo{ @@ -218,14 +188,15 @@ func TestSLOWithRangeSegments(t *testing.T) { testWithContainer(t, func(c *schwift.Container) { segmentStr := "XX" segmentObj := c.Object("segment") - expectSuccess(t, segmentObj.Upload(bytes.NewReader([]byte(segmentStr)), nil)) + expectSuccess(t, segmentObj.Upload(bytes.NewReader([]byte(segmentStr)), nil, nil)) o := c.Object("largeobject") - lo, err := o.AsLargeObject() + lo, err := o.AsNewLargeObject(schwift.SegmentingOptions{ + SegmentContainer: c, + SegmentPrefix: "segments/", + Strategy: schwift.StaticLargeObject, + }, nil) expectSuccess(t, err) - lo.SegmentContainer = c - lo.SegmentPrefix = "segments/" - lo.Strategy = schwift.StaticLargeObject //the large object is composed out of three ranges such that the "X" are precisely cut out of segmentStr expectSuccess(t, lo.AddSegment(schwift.SegmentInfo{ @@ -277,26 +248,23 @@ func TestSLOGuessSegmentPrefix(t *testing.T) { obj := c.Object("largeobject") //setup phase: create an SLO - lo, err := obj.AsLargeObject() - expectSuccess(t, err) - lo.SegmentContainer = c - lo.SegmentPrefix = "foo/bar/baz/" - w, err := lo.Open(schwift.OpenTruncate) + lo, err := obj.AsNewLargeObject(schwift.SegmentingOptions{ + SegmentContainer: c, + SegmentPrefix: "foo/bar/baz/", + }, nil) expectSuccess(t, err) segment1 := getRandomSegmentContent(128) segment2 := getRandomSegmentContent(128) - _, err = w.Write([]byte(segment1)) - expectSuccess(t, err) - _, err = w.Write([]byte(segment2)) - expectSuccess(t, err) - expectSuccess(t, w.Close()) + expectSuccess(t, lo.Append(bytes.NewReader([]byte(segment1)), 0)) + expectSuccess(t, lo.Append(bytes.NewReader([]byte(segment2)), 0)) + expectSuccess(t, lo.WriteManifest(nil)) //now create a fresh SLO and check if it infers the correct SegmentPrefix lo, err = obj.AsLargeObject() expectSuccess(t, err) - expectString(t, lo.SegmentContainer.Name(), c.Name()) - expectString(t, lo.SegmentPrefix, "foo/bar/baz/") + expectString(t, lo.SegmentContainer().Name(), c.Name()) + expectString(t, lo.SegmentPrefix(), "foo/bar/baz/") }) } @@ -305,27 +273,25 @@ func TestDeleteLargeObjectAndKeepSegments(t *testing.T) { testWithContainer(t, func(c *schwift.Container) { obj := c.Object("largeobject") - //setup phase: create an SLO - lo, err := obj.AsLargeObject() - expectSuccess(t, err) - lo.SegmentContainer = c - lo.SegmentPrefix = "foo/bar/baz/" - w, err := lo.Open(schwift.OpenTruncate) + //setup phase: create a large object + lo, err := obj.AsNewLargeObject(schwift.SegmentingOptions{ + SegmentContainer: c, + SegmentPrefix: "foo/bar/baz/", + Strategy: strategy, + }, nil) expectSuccess(t, err) segment1 := getRandomSegmentContent(128) segment2 := getRandomSegmentContent(128) - _, err = w.Write([]byte(segment1)) - expectSuccess(t, err) - _, err = w.Write([]byte(segment2)) - expectSuccess(t, err) - expectSuccess(t, w.Close()) + expectSuccess(t, lo.Append(bytes.NewReader([]byte(segment1)), 0)) + expectSuccess(t, lo.Append(bytes.NewReader([]byte(segment2)), 0)) + expectSuccess(t, lo.WriteManifest(nil)) //test deletion that keeps segments expectSuccess(t, obj.Delete(nil, nil)) iter := c.Objects() - iter.Prefix = lo.SegmentPrefix + iter.Prefix = lo.SegmentPrefix() names, err := iter.Collect() expectSuccess(t, err) expectObjectNames(t, names, @@ -340,27 +306,25 @@ func TestDeleteLargeObjectIncludingSegments(t *testing.T) { testWithContainer(t, func(c *schwift.Container) { obj := c.Object("largeobject") - //setup phase: create an SLO - lo, err := obj.AsLargeObject() - expectSuccess(t, err) - lo.SegmentContainer = c - lo.SegmentPrefix = "foo/bar/baz/" - w, err := lo.Open(schwift.OpenTruncate) + //setup phase: create a large object + lo, err := obj.AsNewLargeObject(schwift.SegmentingOptions{ + SegmentContainer: c, + SegmentPrefix: "foo/bar/baz/", + Strategy: strategy, + }, nil) expectSuccess(t, err) segment1 := getRandomSegmentContent(128) segment2 := getRandomSegmentContent(128) - _, err = w.Write([]byte(segment1)) - expectSuccess(t, err) - _, err = w.Write([]byte(segment2)) - expectSuccess(t, err) - expectSuccess(t, w.Close()) + expectSuccess(t, lo.Append(bytes.NewReader([]byte(segment1)), 0)) + expectSuccess(t, lo.Append(bytes.NewReader([]byte(segment2)), 0)) + expectSuccess(t, lo.WriteManifest(nil)) //test deletion that keeps segments expectSuccess(t, obj.Delete(&schwift.DeleteOptions{DeleteSegments: true}, nil)) iter := c.Objects() - iter.Prefix = lo.SegmentPrefix + iter.Prefix = lo.SegmentPrefix() names, err := iter.Collect() expectSuccess(t, err) expectObjectNames(t, names) @@ -415,19 +379,19 @@ func expectLargeObject(t *testing.T, obj *schwift.Object, expected []schwift.Seg } func expectLargeObjectSetup(t *testing.T, lo *schwift.LargeObject, strategy schwift.LargeObjectStrategy, segmentFullPrefix string) { - if strategy != lo.Strategy { + if strategy != lo.Strategy() { t.Errorf("expected %s to use LargeObjectStrategy %d, got %d", - lo.Object.FullName(), strategy, lo.Strategy) + lo.Object().FullName(), strategy, lo.Strategy()) } - if lo.SegmentContainer == nil { + if lo.SegmentContainer() == nil { t.Errorf("expected %s to use segment container+prefix %q, got no container", - lo.Object.FullName(), segmentFullPrefix) + lo.Object().FullName(), segmentFullPrefix) } else { - fullPrefix := lo.SegmentContainer.Name() + "/" + lo.SegmentPrefix + fullPrefix := lo.SegmentContainer().Name() + "/" + lo.SegmentPrefix() if fullPrefix != segmentFullPrefix { t.Errorf("expected %s to use segment container+prefix %q, got %q", - lo.Object.FullName(), segmentFullPrefix, fullPrefix) + lo.Object().FullName(), segmentFullPrefix, fullPrefix) } } } diff --git a/tests/object_iterator_test.go b/tests/object_iterator_test.go index 009189d..9b822cd 100644 --- a/tests/object_iterator_test.go +++ b/tests/object_iterator_test.go @@ -39,7 +39,7 @@ func TestObjectIterator(t *testing.T) { for idx := 1; idx <= 4; idx++ { hdr := schwift.NewObjectHeaders() hdr.ContentType().Set("application/json") - err := c.Object(oname(idx)).Upload(bytes.NewReader(objectExampleContent), hdr.ToOpts()) + err := c.Object(oname(idx)).Upload(bytes.NewReader(objectExampleContent), nil, hdr.ToOpts()) expectSuccess(t, err) } diff --git a/tests/object_test.go b/tests/object_test.go index 6291527..251db9f 100644 --- a/tests/object_test.go +++ b/tests/object_test.go @@ -51,7 +51,7 @@ func TestObjectLifecycle(t *testing.T) { err = o.Delete(nil, nil) expectError(t, err, "expected 204 response, got 404 instead:

Not Found

The resource could not be found.

") - err = o.Upload(bytes.NewReader([]byte("test")), nil) + err = o.Upload(bytes.NewReader([]byte("test")), nil, nil) expectSuccess(t, err) expectObjectExistence(t, o, true) @@ -66,31 +66,31 @@ func TestObjectUpload(t *testing.T) { //test upload with bytes.Reader obj := c.Object("upload1") - err := obj.Upload(bytes.NewReader(objectExampleContent), nil) + err := obj.Upload(bytes.NewReader(objectExampleContent), nil, nil) expectSuccess(t, err) expectObjectContent(t, obj, objectExampleContent) //test upload with bytes.Buffer obj = c.Object("upload2") - err = obj.Upload(bytes.NewBuffer(objectExampleContent), nil) + err = obj.Upload(bytes.NewBuffer(objectExampleContent), nil, nil) expectSuccess(t, err) expectObjectContent(t, obj, objectExampleContent) //test upload with strings.Reader obj = c.Object("upload3") - err = obj.Upload(strings.NewReader(string(objectExampleContent)), nil) + err = obj.Upload(strings.NewReader(string(objectExampleContent)), nil, nil) expectSuccess(t, err) expectObjectContent(t, obj, objectExampleContent) //test upload with opaque io.Reader obj = c.Object("upload4") - err = obj.Upload(opaqueReader{bytes.NewReader(objectExampleContent)}, nil) + err = obj.Upload(opaqueReader{bytes.NewReader(objectExampleContent)}, nil, nil) expectSuccess(t, err) expectObjectContent(t, obj, objectExampleContent) //test upload with io.Writer obj = c.Object("upload5") - err = obj.UploadWithWriter(nil, func(w io.Writer) error { + err = obj.UploadWithWriter(nil, nil, func(w io.Writer) error { _, err := w.Write(objectExampleContent) return err }) @@ -99,13 +99,13 @@ func TestObjectUpload(t *testing.T) { //test upload with empty reader (should create zero-byte-sized object) obj = c.Object("upload6") - err = obj.Upload(eofReader{}, nil) + err = obj.Upload(eofReader{}, nil, nil) expectSuccess(t, err) expectObjectContent(t, obj, nil) //test upload without reader (should create zero-byte-sized object) obj = c.Object("upload7") - err = obj.Upload(nil, nil) + err = obj.Upload(nil, nil, nil) expectSuccess(t, err) expectObjectContent(t, obj, nil) }) @@ -129,7 +129,7 @@ func TestObjectDownload(t *testing.T) { testWithContainer(t, func(c *schwift.Container) { //upload example object obj := c.Object("example") - err := obj.Upload(bytes.NewReader(objectExampleContent), nil) + err := obj.Upload(bytes.NewReader(objectExampleContent), nil, nil) expectSuccess(t, err) //test download as string @@ -170,7 +170,7 @@ func TestObjectUpdate(t *testing.T) { expectError(t, err, "expected 202 response, got 404 instead:

Not Found

The resource could not be found.

") //create object - err = obj.Upload(nil, nil) + err = obj.Upload(nil, nil, nil) expectSuccess(t, err) hdr, err := obj.Headers() @@ -190,7 +190,7 @@ func TestObjectUpdate(t *testing.T) { func TestObjectCopyMove(t *testing.T) { testWithContainer(t, func(c *schwift.Container) { obj1 := c.Object("location1") - err := obj1.Upload(bytes.NewReader(objectExampleContent), nil) + err := obj1.Upload(bytes.NewReader(objectExampleContent), nil, nil) expectSuccess(t, err) expectObjectExistence(t, obj1, true) -- cgit v1.2.3