diff options
| -rw-r--r-- | largeobject.go | 13 | ||||
| -rw-r--r-- | object.go | 4 | ||||
| -rw-r--r-- | tests/largeobject_test.go | 195 |
3 files changed, 208 insertions, 4 deletions
diff --git a/largeobject.go b/largeobject.go index a39b6c0..7771366 100644 --- a/largeobject.go +++ b/largeobject.go @@ -188,7 +188,15 @@ func (lo *LargeObject) Segments() ([]SegmentInfo, error) { return lo.segments, nil } -func (lo *LargeObject) segmentObjects() []*Object { +//SegmentObjects returns a list of all segment objects referenced by this large +//object. Note that, in general, +// +// len(lo.SegmentObjects()) <= len(lo.Segments()) +// +//since one object may be backing multiple segments, and data segments are not +//backed by any object at all. No guarantee is made about the order in which +//objects appear in this list. +func (lo *LargeObject) SegmentObjects() []*Object { seen := make(map[string]bool) result := make([]*Object, 0, len(lo.segments)) for _, segment := range lo.segments { @@ -415,6 +423,7 @@ func (o *Object) AsNewLargeObject(sopts SegmentingOptions, topts *TruncateOption } case ErrNotLarge: //not an error, continue down below + err = nil default: return nil, err //unexpected error } @@ -469,7 +478,7 @@ type TruncateOptions struct { //not written by this call, so WriteManifest() usually needs to be called //afterwards. func (lo *LargeObject) Truncate(opts *TruncateOptions) error { - _, _, err := lo.object.c.a.BulkDelete(lo.segmentObjects(), nil, nil) + _, _, err := lo.object.c.a.BulkDelete(lo.SegmentObjects(), nil, nil) if err == nil { lo.segments = nil } @@ -245,7 +245,7 @@ func (o *Object) Upload(content io.Reader, opts *UploadOptions, ropts *RequestOp } if opts.DeleteSegments && lo != nil { - _, _, err := lo.object.c.a.BulkDelete(lo.segmentObjects(), nil, nil) + _, _, err := lo.object.c.a.BulkDelete(lo.SegmentObjects(), nil, nil) if err != nil { return err } @@ -362,7 +362,7 @@ func (o *Object) Delete(opts *DeleteOptions, ropts *RequestOptions) error { switch err { case nil: //is large object - delete segments and the object itself in one step - _, _, err := o.c.a.BulkDelete(append(lo.segmentObjects(), o), nil, nil) + _, _, err := o.c.a.BulkDelete(append(lo.SegmentObjects(), o), nil, nil) o.Invalidate() return err case ErrNotLarge: diff --git a/tests/largeobject_test.go b/tests/largeobject_test.go index 3e41cb3..a8c18e6 100644 --- a/tests/largeobject_test.go +++ b/tests/largeobject_test.go @@ -139,6 +139,47 @@ func TestLargeObjectsBasic(t *testing.T) { }) } +func TestTruncateDuringOverwrite(t *testing.T) { + foreachLargeObjectStrategy(func(strategy schwift.LargeObjectStrategy, strategyStr string) { + testWithContainer(t, func(c *schwift.Container) { + obj := c.Object("largeobject") + + //setup phase: create a large object + lo, err := obj.AsNewLargeObject(schwift.SegmentingOptions{ + SegmentContainer: c, + SegmentPrefix: "segments/", + Strategy: strategy, + }, nil) + expectSuccess(t, err) + + segment1 := getRandomSegmentContent(128) + segment2 := getRandomSegmentContent(128) + expectSuccess(t, lo.Append(bytes.NewReader([]byte(segment1)), 0)) + expectSuccess(t, lo.Append(bytes.NewReader([]byte(segment2)), 0)) + expectSuccess(t, lo.WriteManifest(nil)) + + expectObjectExistence(t, c.Object("segments/0000000000000001"), true) + expectObjectExistence(t, c.Object("segments/0000000000000002"), true) + + //test phase: truncate using AsNewLargeObject + lo, err = obj.AsNewLargeObject(schwift.SegmentingOptions{ + SegmentContainer: c, + Strategy: strategy, + }, &schwift.TruncateOptions{ + DeleteSegments: true, + }) + expectSuccess(t, err) + expectSuccess(t, lo.Append(bytes.NewReader([]byte(segment1)), 0)) + expectSuccess(t, lo.Append(bytes.NewReader([]byte(segment2)), 0)) + expectSuccess(t, lo.WriteManifest(nil)) + + expectObjectExistence(t, c.Object("segments/0000000000000001"), false) + expectObjectExistence(t, c.Object("segments/0000000000000002"), false) + + }) + }) +} + func TestOpenRegularObjectAsLargeObject(t *testing.T) { testWithContainer(t, func(c *schwift.Container) { o := c.Object("foo") @@ -181,6 +222,12 @@ func TestSLOWithDataSegment(t *testing.T) { Etag: etagOfString(segment2), }, }) + + //check that truncating this does not try to delete the nil segment.Object + //in the data segment + expectSuccess(t, lo.Truncate(&schwift.TruncateOptions{ + DeleteSegments: true, + })) }) } @@ -332,6 +379,154 @@ func TestDeleteLargeObjectIncludingSegments(t *testing.T) { }) } +func TestOverwriteLargeObjectAndKeepSegments(t *testing.T) { + foreachLargeObjectStrategy(func(strategy schwift.LargeObjectStrategy, strategyStr string) { + testWithContainer(t, func(c *schwift.Container) { + obj := c.Object("largeobject") + + //setup phase: create a large object + lo, err := obj.AsNewLargeObject(schwift.SegmentingOptions{ + SegmentContainer: c, + SegmentPrefix: "foo/bar/baz/", + Strategy: strategy, + }, nil) + expectSuccess(t, err) + + segment1 := getRandomSegmentContent(128) + segment2 := getRandomSegmentContent(128) + expectSuccess(t, lo.Append(bytes.NewReader([]byte(segment1)), 0)) + expectSuccess(t, lo.Append(bytes.NewReader([]byte(segment2)), 0)) + expectSuccess(t, lo.WriteManifest(nil)) + + //test overwriting that keeps segments + expectSuccess(t, obj.Upload(bytes.NewReader(objectExampleContent), nil, nil)) + + iter := c.Objects() + iter.Prefix = lo.SegmentPrefix() + names, err := iter.Collect() + expectSuccess(t, err) + expectObjectNames(t, names, + "foo/bar/baz/0000000000000001", + "foo/bar/baz/0000000000000002") + }) + }) +} + +func TestOverwriteLargeObjectIncludingSegments(t *testing.T) { + foreachLargeObjectStrategy(func(strategy schwift.LargeObjectStrategy, strategyStr string) { + testWithContainer(t, func(c *schwift.Container) { + obj := c.Object("largeobject") + + //setup phase: create a large object + lo, err := obj.AsNewLargeObject(schwift.SegmentingOptions{ + SegmentContainer: c, + SegmentPrefix: "foo/bar/baz/", + Strategy: strategy, + }, nil) + expectSuccess(t, err) + + segment1 := getRandomSegmentContent(128) + segment2 := getRandomSegmentContent(128) + expectSuccess(t, lo.Append(bytes.NewReader([]byte(segment1)), 0)) + expectSuccess(t, lo.Append(bytes.NewReader([]byte(segment2)), 0)) + expectSuccess(t, lo.WriteManifest(nil)) + + //test overwriting that deletes segments + expectSuccess(t, obj.Upload( + bytes.NewReader(objectExampleContent), + &schwift.UploadOptions{DeleteSegments: true}, + nil, + )) + + iter := c.Objects() + iter.Prefix = lo.SegmentPrefix() + names, err := iter.Collect() + expectSuccess(t, err) + expectObjectNames(t, names) + + //test overwriting that wants to delete segments, but there aren't any + expectSuccess(t, obj.Upload( + bytes.NewReader(objectExampleContent), + &schwift.UploadOptions{DeleteSegments: true}, + nil, + )) + + //while we're at it, test the same for deletion + expectSuccess(t, obj.Delete( + &schwift.DeleteOptions{DeleteSegments: true}, + nil, + )) + }) + }) +} + +func TestAddInvalidSegments(t *testing.T) { + foreachLargeObjectStrategy(func(strategy schwift.LargeObjectStrategy, strategyStr string) { + testWithContainer(t, func(c *schwift.Container) { + obj := c.Object("largeobject") + lo, err := obj.AsNewLargeObject(schwift.SegmentingOptions{ + SegmentContainer: c, + Strategy: strategy, + }, nil) + expectSuccess(t, err) + + //must have either backing object or data + expectError(t, lo.AddSegment(schwift.SegmentInfo{ + Object: nil, + Data: nil, + }), schwift.ErrSegmentInvalid.Error()) + + //if data segment, must not have any other attribute + expectError(t, lo.AddSegment(schwift.SegmentInfo{ + Data: []byte("foo"), + Object: obj, + }), schwift.ErrSegmentInvalid.Error()) + expectError(t, lo.AddSegment(schwift.SegmentInfo{ + Data: []byte("foo"), + SizeBytes: 3, + }), schwift.ErrSegmentInvalid.Error()) + expectError(t, lo.AddSegment(schwift.SegmentInfo{ + Data: []byte("foo"), + Etag: etagOfString("foo"), + }), schwift.ErrSegmentInvalid.Error()) + expectError(t, lo.AddSegment(schwift.SegmentInfo{ + Data: []byte("foo"), + RangeOffset: 2, + }), schwift.ErrSegmentInvalid.Error()) + expectError(t, lo.AddSegment(schwift.SegmentInfo{ + Data: []byte("foo"), + RangeLength: 1, + }), schwift.ErrSegmentInvalid.Error()) + + //malformed range + expectError(t, lo.AddSegment(schwift.SegmentInfo{ + Object: obj, + RangeOffset: -1, + RangeLength: 0, + }), schwift.ErrSegmentInvalid.Error()) + + //DLO is strict about the SegmentContainer and SegmentPrefix, but SLO accepts it + c2 := c.Account().Container("foo") + err = lo.AddSegment(schwift.SegmentInfo{ + Object: c2.Object("foo"), + }) + if strategy == schwift.DynamicLargeObject { + expectError(t, err, schwift.ErrContainerMismatch.Error()) + } else { + expectSuccess(t, err) + } + err = lo.AddSegment(schwift.SegmentInfo{ + Object: c.Object("definitely-not-in-the-segment-prefix"), + }) + if strategy == schwift.DynamicLargeObject { + expectError(t, err, schwift.ErrContainerMismatch.Error()) + } else { + expectSuccess(t, err) + } + }) + }) +} + //////////////////////////////////////////////////////////////////////////////// // helpers |
