diff options
| author | Stefan Majewsky <majewsky@gmx.net> | 2018-04-30 14:14:56 +0200 |
|---|---|---|
| committer | Stefan Majewsky <majewsky@gmx.net> | 2018-05-02 19:33:46 +0200 |
| commit | f9749638e3393f471d7e28362795689bf37cc023 (patch) | |
| tree | 57d56e88387b6ceef39ba23e29d009d683e44be4 /object.go | |
| parent | a5ad3ae67e9c42aa738adae7e7fd535109bc9005 (diff) | |
| download | go-schwift-f9749638e3393f471d7e28362795689bf37cc023.tar.gz | |
revamp the LargeObject API
I thought about this some more, and I believe the Writer-based approach
in the previous version of the LargeObject API does not scale: It makes
it very hard to write code that uploads segments without resorting to a
buffer the same size as the segments. I don't want gigabyte-scale
buffers filling up my RAM, so this commit switches to a different API
based on Readers. LargeObject.Append() now behaves very similar to
Object.Upload(), which I find quite nice.
Diffstat (limited to 'object.go')
| -rw-r--r-- | object.go | 75 |
1 files changed, 59 insertions, 16 deletions
@@ -133,6 +133,13 @@ func (o *Object) Update(headers ObjectHeaders, opts *RequestOptions) error { return err } +//UploadOptions invokes advanced behavior in the Object.Upload() method. +type UploadOptions struct { + //When overwriting a large object, delete its segments. This will cause + //Upload() to call into BulkDelete(), so a BulkError may be returned. + DeleteSegments bool +} + //Upload creates the object using a PUT request. // //If you do not have an io.Reader, but you have a []byte or string instance @@ -164,17 +171,27 @@ func (o *Object) Update(headers ObjectHeaders, opts *RequestOptions) error { //This function can be used regardless of whether the object exists or not. // //A successful PUT request implies Invalidate() since it may change metadata. -func (o *Object) Upload(content io.Reader, opts *RequestOptions) error { - opts = cloneRequestOptions(opts, nil) - hdr := ObjectHeaders{opts.Headers} +func (o *Object) Upload(content io.Reader, opts *UploadOptions, ropts *RequestOptions) error { + if opts == nil { + opts = &UploadOptions{} + } + + ropts = cloneRequestOptions(ropts, nil) + hdr := ObjectHeaders{ropts.Headers} + + if !hdr.SizeBytes().Exists() { + value := tryComputeContentLength(content) + if value != nil { + hdr.SizeBytes().Set(*value) + } + } //do not attempt to add the Etag header when we're writing a large object //manifest; the header refers to the content, but we would be computing the //manifest's hash instead - isManifestUpload := opts.Values.Get("multipart-manifest") == "put" || hdr.IsDynamicLargeObject() + isManifestUpload := ropts.Values.Get("multipart-manifest") == "put" || hdr.IsDynamicLargeObject() var hasher hash.Hash - tryComputeContentLength(content, hdr) if !isManifestUpload { tryComputeEtag(content, hdr) @@ -187,11 +204,30 @@ func (o *Object) Upload(content io.Reader, opts *RequestOptions) error { } } + var lo *LargeObject + if opts.DeleteSegments { + //enumerate segments in large object before overwriting it, but only delete + //the segments after successfully uploading the new object to decrease the + //chance of an inconsistent state following an upload error + var err error + lo, err = o.AsLargeObject() + switch err { + case nil: + //okay, delete segments at the end + case ErrNotLarge: + //okay, do not try to delete segments + lo = nil + default: + //unexpected error + return err + } + } + resp, err := Request{ Method: "PUT", ContainerName: o.c.name, ObjectName: o.name, - Options: opts, + Options: ropts, Body: content, ExpectStatusCodes: []int{201}, DrainResponseBody: true, @@ -208,6 +244,13 @@ func (o *Object) Upload(content io.Reader, opts *RequestOptions) error { } } + if opts.DeleteSegments && lo != nil { + _, _, err := lo.object.c.a.BulkDelete(lo.segmentObjects(), nil, nil) + if err != nil { + return err + } + } + return nil } @@ -217,17 +260,15 @@ type readerWithLen interface { Len() int } -func tryComputeContentLength(content io.Reader, headers ObjectHeaders) { - h := headers.SizeBytes() - if h.Exists() { - return - } - +func tryComputeContentLength(content io.Reader) *uint64 { if content == nil { - h.Set(0) + val := uint64(0) + return &val } else if r, ok := content.(readerWithLen); ok { - h.Set(uint64(r.Len())) + val := uint64(r.Len()) + return &val } + return nil } //This covers both bytes.Reader and strings.Reader in a way that is compatible @@ -280,11 +321,13 @@ func tryComputeEtag(content io.Reader, headers ObjectHeaders) { // }) // //If you do not need an io.Writer, always use Upload instead. -func (o *Object) UploadWithWriter(opts *RequestOptions, callback func(io.Writer) error) error { +// +//TODO rename to UploadViaWriter +func (o *Object) UploadWithWriter(opts *UploadOptions, ropts *RequestOptions, callback func(io.Writer) error) error { reader, writer := io.Pipe() errChan := make(chan error) go func() { - err := o.Upload(reader, opts) + err := o.Upload(reader, opts, ropts) reader.CloseWithError(err) //stop the writer if it is still writing errChan <- err }() |
