diff options
| -rw-r--r-- | bulk.go | 272 | ||||
| -rw-r--r-- | errors.go | 4 | ||||
| -rw-r--r-- | tests/bulk_delete_test.go | 87 | ||||
| -rw-r--r-- | tests/container_test.go | 8 |
4 files changed, 330 insertions, 41 deletions
@@ -20,7 +20,10 @@ package schwift import ( "encoding/json" + "fmt" "io" + "net/http" + "net/url" "strconv" "strings" ) @@ -90,66 +93,253 @@ func (a *Account) BulkUpload(uploadPath string, format BulkUploadFormat, content return 0, err } - var result struct { - //ResponseStatus indicates the overall result as a HTTP status string, e.g. - //"201 Created" or "500 Internal Error". - ResponseStatus string `json:"Response Status"` - //ResponseBody contains an overall error message for errors that are not - //related to a single file in the archive (e.g. "invalid tar file" or "Max - //delete failures exceeded"). - ResponseBody string `json:"Response Body"` - //Errors contains error messages for individual files. Each entry is a - //[]string with 2 elements, the object's fullName and the HTTP status for - //this file's upload (e.g. "412 Precondition Failed"). - Errors [][]string `json:"Errors"` - //NumberFilesCreated is self-explanatory. - NumberFilesCreated int `json:"Number Files Created"` - } - err = json.NewDecoder(resp.Body).Decode(&result) - closeErr := resp.Body.Close() + result, err := parseBulkResponse(resp.Body) + return result.NumberFilesCreated, err +} + +func parseResponseStatus(status string) (int, error) { + //`status` looks like "201 Created" + fields := strings.SplitN(status, " ", 2) + return strconv.Atoi(fields[0]) +} + +func makeBulkObjectError(fullName string, statusCode int) BulkObjectError { + nameFields := strings.SplitN(fullName, "/", 2) + for len(nameFields) < 2 { + nameFields = append(nameFields, "") + } + return BulkObjectError{ + ContainerName: nameFields[0], + ObjectName: nameFields[1], + StatusCode: statusCode, + } +} + +//BulkDelete deletes a large number of objects (and containers) at once. +//Containers are queued at the end of the deletion, so a container can be +//deleted in the same call in which all objects in it are deleted. +// +//For example, to delete all objects in a container: +// +// var container *schwift.Container +// +// objects, err := container.Objects().Collect() +// numDeleted, numNotFound, err := container.Account().BulkDelete(objects, nil, nil, nil) +// +//To also delete the container: +// +// var container *schwift.Container +// +// objects, err := container.Objects().Collect() +// numDeleted, numNotFound, err := container.Account().BulkDelete( +// objects, []*schwift.Container{container}, nil, nil) +// +//If the server does not support bulk-deletion, this function will just call +//Object.Delete() for each given object and Container.Delete() for each given +//container, and aggregate the result. +// +//If not nil, the error return value is *usually* an instance of +//BulkError. +// +//The objects may be located in multiple containers, but they and the +//containers must all be located in the given account. (Otherwise, +//ErrAccountMismatch is returned.) +func (a *Account) BulkDelete(objects []*Object, containers []*Container, headers AccountHeaders, opts *RequestOptions) (numDeleted int, numNotFound int, deleteError error) { + //validate that all given objects are in this account + for _, obj := range objects { + other := obj.Container().Account() + if other.baseURL != a.baseURL || other.name != a.name { + return 0, 0, ErrAccountMismatch + } + } + for _, container := range containers { + other := container.Account() + if other.baseURL != a.baseURL || other.name != a.name { + return 0, 0, ErrAccountMismatch + } + } + + //check capabilities to choose deletion method + caps, err := a.Capabilities() + if err != nil { + return 0, 0, err + } + if caps.BulkDelete == nil { + return a.bulkDeleteSingle(objects, containers, headers, opts) + } + chunkSize := int(caps.BulkDelete.MaximumDeletesPerRequest) + + //collect names of things to delete into one big list + var names []string + for _, object := range objects { + object.Invalidate() //deletion must invalidate objects! + names = append(names, fmt.Sprintf("/%s/%s", + url.PathEscape(object.Container().Name()), + url.PathEscape(object.Name()), + )) + } + for _, container := range containers { + container.Invalidate() //deletion must invalidate objects! + names = append(names, "/"+url.PathEscape(container.Name())) + } + + //split list into chunks according to maximum allowed + //chunk size; aggregate results + for len(names) > 0 { + //this condition holds only in the final iteration + if chunkSize > len(names) { + chunkSize = len(names) + } + chunk := names[0:chunkSize] + names = names[chunkSize:] + + numDeletedNow, numNotFoundNow, err := a.bulkDelete(chunk, headers, opts) + numDeleted += numDeletedNow + numNotFound += numNotFoundNow + if err != nil { + return numDeleted, numNotFound, err + } + } + + return numDeleted, numNotFound, nil +} + +//Implementation of BulkDelete() for servers that *do not* support bulk +//deletion. +func (a *Account) bulkDeleteSingle(objects []*Object, containers []*Container, headers AccountHeaders, opts *RequestOptions) (int, int, error) { + var ( + numDeleted = 0 + numNotFound = 0 + errs []BulkObjectError + ) + + handleSingleError := func(containerName, objectName string, err error) error { + if err == nil { + numDeleted++ + return nil + } + if Is(err, http.StatusNotFound) { + numNotFound++ + return nil + } + if statusErr, ok := err.(UnexpectedStatusCodeError); ok { + errs = append(errs, BulkObjectError{ + ContainerName: containerName, + ObjectName: objectName, + StatusCode: statusErr.ActualResponse.StatusCode, + }) + return nil + } + //unexpected error type -> stop early + return err + } + + for _, obj := range objects { + err := obj.Delete(ObjectHeaders(headers), opts) //this implies Invalidate() + err = handleSingleError(obj.Container().Name(), obj.Name(), err) + if err != nil { + return numDeleted, numNotFound, err + } + } + + for _, container := range containers { + err := container.Delete(ContainerHeaders(headers), opts) //this implies Invalidate() + err = handleSingleError(container.Name(), "", err) + if err != nil { + return numDeleted, numNotFound, err + } + } + + if len(errs) == 0 { + return numDeleted, numNotFound, nil + } + return numDeleted, numNotFound, BulkError{ + StatusCode: errs[0].StatusCode, + OverallError: http.StatusText(errs[0].StatusCode), + ObjectErrors: errs, + } +} + +//Implementation of BulkDelete() for servers that *do* support bulk deletion. +//This function is called *after* chunking, so `len(names) <= +//account.Capabilities.BulkDelete.MaximumDeletesPerRequest`. +func (a *Account) bulkDelete(names []string, headers AccountHeaders, opts *RequestOptions) (int, int, error) { + req := Request{ + Method: "DELETE", + Body: strings.NewReader(strings.Join(names, "\n") + "\n"), + Headers: headersToHTTP(headers), + Options: cloneRequestOptions(opts), + ExpectStatusCodes: []int{200}, + } + req.Headers.Set("Accept", "application/json") + req.Headers.Set("Content-Type", "text/plain") + req.Options.Values.Set("bulk-delete", "true") + resp, err := req.Do(a.backend) + if err != nil { + return 0, 0, err + } + + result, err := parseBulkResponse(resp.Body) + return result.NumberDeleted, result.NumberNotFound, err +} + +type bulkResponse struct { + //ResponseStatus indicates the overall result as a HTTP status string, e.g. + //"201 Created" or "500 Internal Error". + ResponseStatus string `json:"Response Status"` + //ResponseBody contains an overall error message for errors that are not + //related to a single file in the archive (e.g. "invalid tar file" or "Max + //delete failures exceeded"). + ResponseBody string `json:"Response Body"` + //Errors contains error messages for individual files. Each entry is a + //[]string with 2 elements, the object's fullName and the HTTP status for + //this file's upload (e.g. "412 Precondition Failed"). + Errors [][]string `json:"Errors"` + //NumberFilesCreated is included in the BulkUpload result only. + NumberFilesCreated int `json:"Number Files Created"` + //NumberDeleted is included in the BulkDelete result only. + NumberDeleted int `json:"Number Deleted"` + //NumberNotFound is included in the BulkDelete result only. + NumberNotFound int `json:"Number Not Found"` +} + +func parseBulkResponse(body io.ReadCloser) (bulkResponse, error) { + var resp bulkResponse + err := json.NewDecoder(body).Decode(&resp) + closeErr := body.Close() if err == nil { err = closeErr } if err != nil { - return 0, err + return resp, err } - //parse `result` into type BulkError + //parse `resp` into type BulkError bulkErr := BulkError{ - OverallError: result.ResponseBody, + OverallError: resp.ResponseBody, } - bulkErr.StatusCode, err = parseResponseStatus(result.ResponseStatus) + bulkErr.StatusCode, err = parseResponseStatus(resp.ResponseStatus) if err != nil { - return 0, err + return resp, err } - for _, suberr := range result.Errors { + for _, suberr := range resp.Errors { if len(suberr) != 2 { continue //wtf } - nameFields := strings.SplitN(suberr[0], "/", 2) - for len(nameFields) < 2 { - nameFields = append(nameFields, "") - } statusCode, err := parseResponseStatus(suberr[1]) if err != nil { - return 0, err + return resp, err } - bulkErr.ObjectErrors = append(bulkErr.ObjectErrors, BulkObjectError{ - ContainerName: nameFields[0], - ObjectName: nameFields[1], - StatusCode: statusCode, - }) + bulkErr.ObjectErrors = append(bulkErr.ObjectErrors, + makeBulkObjectError(suberr[0], statusCode), + ) } //is BulkError really an error? if len(bulkErr.ObjectErrors) == 0 && bulkErr.OverallError == "" && bulkErr.StatusCode >= 200 && bulkErr.StatusCode < 300 { - return result.NumberFilesCreated, nil + return resp, nil } - return result.NumberFilesCreated, bulkErr -} - -func parseResponseStatus(status string) (int, error) { - //`status` looks like "201 Created" - fields := strings.SplitN(status, " ", 2) - return strconv.Atoi(fields[0]) + return resp, bulkErr + //NOTE: `resp` is passed back to the caller to read the counters + //(resp.NumberFilesCreated etc.) } @@ -39,6 +39,10 @@ var ( //ErrNotSupported is returned by bulk operations, large object operations, //etc. if the server does not support the requested operation. ErrNotSupported = errors.New("operation not supported by this Swift server") + //ErrAccountMismatch is returned by operations on an account that accept + //objects as arguments, if (some of) the provided objects are located in a + //different account. + ErrAccountMismatch = errors.New("some of the given objects are not in this account") ) //UnexpectedStatusCodeError is generated when a request to Swift does not yield diff --git a/tests/bulk_delete_test.go b/tests/bulk_delete_test.go new file mode 100644 index 0000000..52294ba --- /dev/null +++ b/tests/bulk_delete_test.go @@ -0,0 +1,87 @@ +/****************************************************************************** +* +* Copyright 2018 Stefan Majewsky <majewsky@gmx.net> +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +******************************************************************************/ + +package tests + +import ( + "fmt" + "strings" + "testing" + + "github.com/majewsky/schwift" +) + +func TestBulkDeleteSuccess(t *testing.T) { + testWithAccount(t, func(a *schwift.Account) { + c, err := a.Container("schwift-test-bulkdelete").EnsureExists() + expectSuccess(t, err) + objs, err := createTestObjects(c) + expectSuccess(t, err) + + numDeleted, numNotFound, err := c.Account().BulkDelete(objs, nil, nil, nil) + expectSuccess(t, err) + expectInt(t, numDeleted, len(objs)) + expectInt(t, numNotFound, 0) + expectContainerExistence(t, c, true) + + numDeleted, numNotFound, err = c.Account().BulkDelete(objs, nil, nil, nil) + expectSuccess(t, err) + expectInt(t, numDeleted, 0) + expectInt(t, numNotFound, len(objs)) + expectContainerExistence(t, c, true) + + objs, err = createTestObjects(c) + expectSuccess(t, err) + cs := []*schwift.Container{c} + + numDeleted, numNotFound, err = c.Account().BulkDelete(objs, cs, nil, nil) + expectSuccess(t, err) + expectInt(t, numDeleted, len(objs)+1) + expectInt(t, numNotFound, 0) + expectContainerExistence(t, c, false) + }) +} + +func TestBulkDeleteError(t *testing.T) { + testWithContainer(t, func(c *schwift.Container) { + objs, err := createTestObjects(c) + expectSuccess(t, err) + objs = objs[1:] + cs := []*schwift.Container{c} + + //not deleting all objects should lead to 409 Conflict when deleting the Container + numDeleted, numNotFound, err := c.Account().BulkDelete(objs, cs, nil, nil) + expectInt(t, numDeleted, len(objs)) + expectInt(t, numNotFound, 0) + expectError(t, err, "400 Bad Request (+1 object errors)") + expectContainerExistence(t, c, true) + }) +} + +func createTestObjects(c *schwift.Container) ([]*schwift.Object, error) { + var objs []*schwift.Object + for idx := 1; idx <= 5; idx++ { + obj := c.Object(fmt.Sprintf("object%d", idx)) + err := obj.Upload(strings.NewReader("example"), nil, nil) + if err != nil { + return nil, err + } + objs = append(objs, obj) + } + return objs, nil +} diff --git a/tests/container_test.go b/tests/container_test.go index 5a0046b..c90a339 100644 --- a/tests/container_test.go +++ b/tests/container_test.go @@ -83,3 +83,11 @@ func TestContainerUpdate(t *testing.T) { }) } + +func expectContainerExistence(t *testing.T, c *schwift.Container, expectedExists bool) { + t.Helper() + c.Invalidate() + actualExists, err := c.Exists() + expectSuccess(t, err) + expectBool(t, actualExists, expectedExists) +} |
