aboutsummaryrefslogtreecommitdiff
path: root/object.go
diff options
context:
space:
mode:
Diffstat (limited to 'object.go')
-rw-r--r--object.go75
1 files changed, 59 insertions, 16 deletions
diff --git a/object.go b/object.go
index 69d2e95..1aa035a 100644
--- a/object.go
+++ b/object.go
@@ -133,6 +133,13 @@ func (o *Object) Update(headers ObjectHeaders, opts *RequestOptions) error {
return err
}
+//UploadOptions invokes advanced behavior in the Object.Upload() method.
+type UploadOptions struct {
+ //When overwriting a large object, delete its segments. This will cause
+ //Upload() to call into BulkDelete(), so a BulkError may be returned.
+ DeleteSegments bool
+}
+
//Upload creates the object using a PUT request.
//
//If you do not have an io.Reader, but you have a []byte or string instance
@@ -164,17 +171,27 @@ func (o *Object) Update(headers ObjectHeaders, opts *RequestOptions) error {
//This function can be used regardless of whether the object exists or not.
//
//A successful PUT request implies Invalidate() since it may change metadata.
-func (o *Object) Upload(content io.Reader, opts *RequestOptions) error {
- opts = cloneRequestOptions(opts, nil)
- hdr := ObjectHeaders{opts.Headers}
+func (o *Object) Upload(content io.Reader, opts *UploadOptions, ropts *RequestOptions) error {
+ if opts == nil {
+ opts = &UploadOptions{}
+ }
+
+ ropts = cloneRequestOptions(ropts, nil)
+ hdr := ObjectHeaders{ropts.Headers}
+
+ if !hdr.SizeBytes().Exists() {
+ value := tryComputeContentLength(content)
+ if value != nil {
+ hdr.SizeBytes().Set(*value)
+ }
+ }
//do not attempt to add the Etag header when we're writing a large object
//manifest; the header refers to the content, but we would be computing the
//manifest's hash instead
- isManifestUpload := opts.Values.Get("multipart-manifest") == "put" || hdr.IsDynamicLargeObject()
+ isManifestUpload := ropts.Values.Get("multipart-manifest") == "put" || hdr.IsDynamicLargeObject()
var hasher hash.Hash
- tryComputeContentLength(content, hdr)
if !isManifestUpload {
tryComputeEtag(content, hdr)
@@ -187,11 +204,30 @@ func (o *Object) Upload(content io.Reader, opts *RequestOptions) error {
}
}
+ var lo *LargeObject
+ if opts.DeleteSegments {
+ //enumerate segments in large object before overwriting it, but only delete
+ //the segments after successfully uploading the new object to decrease the
+ //chance of an inconsistent state following an upload error
+ var err error
+ lo, err = o.AsLargeObject()
+ switch err {
+ case nil:
+ //okay, delete segments at the end
+ case ErrNotLarge:
+ //okay, do not try to delete segments
+ lo = nil
+ default:
+ //unexpected error
+ return err
+ }
+ }
+
resp, err := Request{
Method: "PUT",
ContainerName: o.c.name,
ObjectName: o.name,
- Options: opts,
+ Options: ropts,
Body: content,
ExpectStatusCodes: []int{201},
DrainResponseBody: true,
@@ -208,6 +244,13 @@ func (o *Object) Upload(content io.Reader, opts *RequestOptions) error {
}
}
+ if opts.DeleteSegments && lo != nil {
+ _, _, err := lo.object.c.a.BulkDelete(lo.segmentObjects(), nil, nil)
+ if err != nil {
+ return err
+ }
+ }
+
return nil
}
@@ -217,17 +260,15 @@ type readerWithLen interface {
Len() int
}
-func tryComputeContentLength(content io.Reader, headers ObjectHeaders) {
- h := headers.SizeBytes()
- if h.Exists() {
- return
- }
-
+func tryComputeContentLength(content io.Reader) *uint64 {
if content == nil {
- h.Set(0)
+ val := uint64(0)
+ return &val
} else if r, ok := content.(readerWithLen); ok {
- h.Set(uint64(r.Len()))
+ val := uint64(r.Len())
+ return &val
}
+ return nil
}
//This covers both bytes.Reader and strings.Reader in a way that is compatible
@@ -280,11 +321,13 @@ func tryComputeEtag(content io.Reader, headers ObjectHeaders) {
// })
//
//If you do not need an io.Writer, always use Upload instead.
-func (o *Object) UploadWithWriter(opts *RequestOptions, callback func(io.Writer) error) error {
+//
+//TODO rename to UploadViaWriter
+func (o *Object) UploadWithWriter(opts *UploadOptions, ropts *RequestOptions, callback func(io.Writer) error) error {
reader, writer := io.Pipe()
errChan := make(chan error)
go func() {
- err := o.Upload(reader, opts)
+ err := o.Upload(reader, opts, ropts)
reader.CloseWithError(err) //stop the writer if it is still writing
errChan <- err
}()