aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorStefan Majewsky <majewsky@gmx.net>2018-03-09 18:12:30 +0100
committerStefan Majewsky <majewsky@gmx.net>2018-03-09 18:12:30 +0100
commitaaf61ac55e18a04fd68b9b6ee4fd4fce49659eeb (patch)
tree31933f305106293d3c8119855bc382e7e014d45d
parent9e8ed9ef479ca2084d9e34edfda0a99be34dbdb5 (diff)
downloadgo-schwift-aaf61ac55e18a04fd68b9b6ee4fd4fce49659eeb.tar.gz
add Account.BulkDelete()
-rw-r--r--bulk.go272
-rw-r--r--errors.go4
-rw-r--r--tests/bulk_delete_test.go87
-rw-r--r--tests/container_test.go8
4 files changed, 330 insertions, 41 deletions
diff --git a/bulk.go b/bulk.go
index c012c42..02dbd99 100644
--- a/bulk.go
+++ b/bulk.go
@@ -20,7 +20,10 @@ package schwift
import (
"encoding/json"
+ "fmt"
"io"
+ "net/http"
+ "net/url"
"strconv"
"strings"
)
@@ -90,66 +93,253 @@ func (a *Account) BulkUpload(uploadPath string, format BulkUploadFormat, content
return 0, err
}
- var result struct {
- //ResponseStatus indicates the overall result as a HTTP status string, e.g.
- //"201 Created" or "500 Internal Error".
- ResponseStatus string `json:"Response Status"`
- //ResponseBody contains an overall error message for errors that are not
- //related to a single file in the archive (e.g. "invalid tar file" or "Max
- //delete failures exceeded").
- ResponseBody string `json:"Response Body"`
- //Errors contains error messages for individual files. Each entry is a
- //[]string with 2 elements, the object's fullName and the HTTP status for
- //this file's upload (e.g. "412 Precondition Failed").
- Errors [][]string `json:"Errors"`
- //NumberFilesCreated is self-explanatory.
- NumberFilesCreated int `json:"Number Files Created"`
- }
- err = json.NewDecoder(resp.Body).Decode(&result)
- closeErr := resp.Body.Close()
+ result, err := parseBulkResponse(resp.Body)
+ return result.NumberFilesCreated, err
+}
+
+func parseResponseStatus(status string) (int, error) {
+ //`status` looks like "201 Created"
+ fields := strings.SplitN(status, " ", 2)
+ return strconv.Atoi(fields[0])
+}
+
+func makeBulkObjectError(fullName string, statusCode int) BulkObjectError {
+ nameFields := strings.SplitN(fullName, "/", 2)
+ for len(nameFields) < 2 {
+ nameFields = append(nameFields, "")
+ }
+ return BulkObjectError{
+ ContainerName: nameFields[0],
+ ObjectName: nameFields[1],
+ StatusCode: statusCode,
+ }
+}
+
+//BulkDelete deletes a large number of objects (and containers) at once.
+//Containers are queued at the end of the deletion, so a container can be
+//deleted in the same call in which all objects in it are deleted.
+//
+//For example, to delete all objects in a container:
+//
+// var container *schwift.Container
+//
+// objects, err := container.Objects().Collect()
+// numDeleted, numNotFound, err := container.Account().BulkDelete(objects, nil, nil, nil)
+//
+//To also delete the container:
+//
+// var container *schwift.Container
+//
+// objects, err := container.Objects().Collect()
+// numDeleted, numNotFound, err := container.Account().BulkDelete(
+// objects, []*schwift.Container{container}, nil, nil)
+//
+//If the server does not support bulk-deletion, this function will just call
+//Object.Delete() for each given object and Container.Delete() for each given
+//container, and aggregate the result.
+//
+//If not nil, the error return value is *usually* an instance of
+//BulkError.
+//
+//The objects may be located in multiple containers, but they and the
+//containers must all be located in the given account. (Otherwise,
+//ErrAccountMismatch is returned.)
+func (a *Account) BulkDelete(objects []*Object, containers []*Container, headers AccountHeaders, opts *RequestOptions) (numDeleted int, numNotFound int, deleteError error) {
+ //validate that all given objects are in this account
+ for _, obj := range objects {
+ other := obj.Container().Account()
+ if other.baseURL != a.baseURL || other.name != a.name {
+ return 0, 0, ErrAccountMismatch
+ }
+ }
+ for _, container := range containers {
+ other := container.Account()
+ if other.baseURL != a.baseURL || other.name != a.name {
+ return 0, 0, ErrAccountMismatch
+ }
+ }
+
+ //check capabilities to choose deletion method
+ caps, err := a.Capabilities()
+ if err != nil {
+ return 0, 0, err
+ }
+ if caps.BulkDelete == nil {
+ return a.bulkDeleteSingle(objects, containers, headers, opts)
+ }
+ chunkSize := int(caps.BulkDelete.MaximumDeletesPerRequest)
+
+ //collect names of things to delete into one big list
+ var names []string
+ for _, object := range objects {
+ object.Invalidate() //deletion must invalidate objects!
+ names = append(names, fmt.Sprintf("/%s/%s",
+ url.PathEscape(object.Container().Name()),
+ url.PathEscape(object.Name()),
+ ))
+ }
+ for _, container := range containers {
+ container.Invalidate() //deletion must invalidate objects!
+ names = append(names, "/"+url.PathEscape(container.Name()))
+ }
+
+ //split list into chunks according to maximum allowed
+ //chunk size; aggregate results
+ for len(names) > 0 {
+ //this condition holds only in the final iteration
+ if chunkSize > len(names) {
+ chunkSize = len(names)
+ }
+ chunk := names[0:chunkSize]
+ names = names[chunkSize:]
+
+ numDeletedNow, numNotFoundNow, err := a.bulkDelete(chunk, headers, opts)
+ numDeleted += numDeletedNow
+ numNotFound += numNotFoundNow
+ if err != nil {
+ return numDeleted, numNotFound, err
+ }
+ }
+
+ return numDeleted, numNotFound, nil
+}
+
+//Implementation of BulkDelete() for servers that *do not* support bulk
+//deletion.
+func (a *Account) bulkDeleteSingle(objects []*Object, containers []*Container, headers AccountHeaders, opts *RequestOptions) (int, int, error) {
+ var (
+ numDeleted = 0
+ numNotFound = 0
+ errs []BulkObjectError
+ )
+
+ handleSingleError := func(containerName, objectName string, err error) error {
+ if err == nil {
+ numDeleted++
+ return nil
+ }
+ if Is(err, http.StatusNotFound) {
+ numNotFound++
+ return nil
+ }
+ if statusErr, ok := err.(UnexpectedStatusCodeError); ok {
+ errs = append(errs, BulkObjectError{
+ ContainerName: containerName,
+ ObjectName: objectName,
+ StatusCode: statusErr.ActualResponse.StatusCode,
+ })
+ return nil
+ }
+ //unexpected error type -> stop early
+ return err
+ }
+
+ for _, obj := range objects {
+ err := obj.Delete(ObjectHeaders(headers), opts) //this implies Invalidate()
+ err = handleSingleError(obj.Container().Name(), obj.Name(), err)
+ if err != nil {
+ return numDeleted, numNotFound, err
+ }
+ }
+
+ for _, container := range containers {
+ err := container.Delete(ContainerHeaders(headers), opts) //this implies Invalidate()
+ err = handleSingleError(container.Name(), "", err)
+ if err != nil {
+ return numDeleted, numNotFound, err
+ }
+ }
+
+ if len(errs) == 0 {
+ return numDeleted, numNotFound, nil
+ }
+ return numDeleted, numNotFound, BulkError{
+ StatusCode: errs[0].StatusCode,
+ OverallError: http.StatusText(errs[0].StatusCode),
+ ObjectErrors: errs,
+ }
+}
+
+//Implementation of BulkDelete() for servers that *do* support bulk deletion.
+//This function is called *after* chunking, so `len(names) <=
+//account.Capabilities.BulkDelete.MaximumDeletesPerRequest`.
+func (a *Account) bulkDelete(names []string, headers AccountHeaders, opts *RequestOptions) (int, int, error) {
+ req := Request{
+ Method: "DELETE",
+ Body: strings.NewReader(strings.Join(names, "\n") + "\n"),
+ Headers: headersToHTTP(headers),
+ Options: cloneRequestOptions(opts),
+ ExpectStatusCodes: []int{200},
+ }
+ req.Headers.Set("Accept", "application/json")
+ req.Headers.Set("Content-Type", "text/plain")
+ req.Options.Values.Set("bulk-delete", "true")
+ resp, err := req.Do(a.backend)
+ if err != nil {
+ return 0, 0, err
+ }
+
+ result, err := parseBulkResponse(resp.Body)
+ return result.NumberDeleted, result.NumberNotFound, err
+}
+
+type bulkResponse struct {
+ //ResponseStatus indicates the overall result as a HTTP status string, e.g.
+ //"201 Created" or "500 Internal Error".
+ ResponseStatus string `json:"Response Status"`
+ //ResponseBody contains an overall error message for errors that are not
+ //related to a single file in the archive (e.g. "invalid tar file" or "Max
+ //delete failures exceeded").
+ ResponseBody string `json:"Response Body"`
+ //Errors contains error messages for individual files. Each entry is a
+ //[]string with 2 elements, the object's fullName and the HTTP status for
+ //this file's upload (e.g. "412 Precondition Failed").
+ Errors [][]string `json:"Errors"`
+ //NumberFilesCreated is included in the BulkUpload result only.
+ NumberFilesCreated int `json:"Number Files Created"`
+ //NumberDeleted is included in the BulkDelete result only.
+ NumberDeleted int `json:"Number Deleted"`
+ //NumberNotFound is included in the BulkDelete result only.
+ NumberNotFound int `json:"Number Not Found"`
+}
+
+func parseBulkResponse(body io.ReadCloser) (bulkResponse, error) {
+ var resp bulkResponse
+ err := json.NewDecoder(body).Decode(&resp)
+ closeErr := body.Close()
if err == nil {
err = closeErr
}
if err != nil {
- return 0, err
+ return resp, err
}
- //parse `result` into type BulkError
+ //parse `resp` into type BulkError
bulkErr := BulkError{
- OverallError: result.ResponseBody,
+ OverallError: resp.ResponseBody,
}
- bulkErr.StatusCode, err = parseResponseStatus(result.ResponseStatus)
+ bulkErr.StatusCode, err = parseResponseStatus(resp.ResponseStatus)
if err != nil {
- return 0, err
+ return resp, err
}
- for _, suberr := range result.Errors {
+ for _, suberr := range resp.Errors {
if len(suberr) != 2 {
continue //wtf
}
- nameFields := strings.SplitN(suberr[0], "/", 2)
- for len(nameFields) < 2 {
- nameFields = append(nameFields, "")
- }
statusCode, err := parseResponseStatus(suberr[1])
if err != nil {
- return 0, err
+ return resp, err
}
- bulkErr.ObjectErrors = append(bulkErr.ObjectErrors, BulkObjectError{
- ContainerName: nameFields[0],
- ObjectName: nameFields[1],
- StatusCode: statusCode,
- })
+ bulkErr.ObjectErrors = append(bulkErr.ObjectErrors,
+ makeBulkObjectError(suberr[0], statusCode),
+ )
}
//is BulkError really an error?
if len(bulkErr.ObjectErrors) == 0 && bulkErr.OverallError == "" && bulkErr.StatusCode >= 200 && bulkErr.StatusCode < 300 {
- return result.NumberFilesCreated, nil
+ return resp, nil
}
- return result.NumberFilesCreated, bulkErr
-}
-
-func parseResponseStatus(status string) (int, error) {
- //`status` looks like "201 Created"
- fields := strings.SplitN(status, " ", 2)
- return strconv.Atoi(fields[0])
+ return resp, bulkErr
+ //NOTE: `resp` is passed back to the caller to read the counters
+ //(resp.NumberFilesCreated etc.)
}
diff --git a/errors.go b/errors.go
index 1eab131..d9487c8 100644
--- a/errors.go
+++ b/errors.go
@@ -39,6 +39,10 @@ var (
//ErrNotSupported is returned by bulk operations, large object operations,
//etc. if the server does not support the requested operation.
ErrNotSupported = errors.New("operation not supported by this Swift server")
+ //ErrAccountMismatch is returned by operations on an account that accept
+ //objects as arguments, if (some of) the provided objects are located in a
+ //different account.
+ ErrAccountMismatch = errors.New("some of the given objects are not in this account")
)
//UnexpectedStatusCodeError is generated when a request to Swift does not yield
diff --git a/tests/bulk_delete_test.go b/tests/bulk_delete_test.go
new file mode 100644
index 0000000..52294ba
--- /dev/null
+++ b/tests/bulk_delete_test.go
@@ -0,0 +1,87 @@
+/******************************************************************************
+*
+* Copyright 2018 Stefan Majewsky <majewsky@gmx.net>
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*
+******************************************************************************/
+
+package tests
+
+import (
+ "fmt"
+ "strings"
+ "testing"
+
+ "github.com/majewsky/schwift"
+)
+
+func TestBulkDeleteSuccess(t *testing.T) {
+ testWithAccount(t, func(a *schwift.Account) {
+ c, err := a.Container("schwift-test-bulkdelete").EnsureExists()
+ expectSuccess(t, err)
+ objs, err := createTestObjects(c)
+ expectSuccess(t, err)
+
+ numDeleted, numNotFound, err := c.Account().BulkDelete(objs, nil, nil, nil)
+ expectSuccess(t, err)
+ expectInt(t, numDeleted, len(objs))
+ expectInt(t, numNotFound, 0)
+ expectContainerExistence(t, c, true)
+
+ numDeleted, numNotFound, err = c.Account().BulkDelete(objs, nil, nil, nil)
+ expectSuccess(t, err)
+ expectInt(t, numDeleted, 0)
+ expectInt(t, numNotFound, len(objs))
+ expectContainerExistence(t, c, true)
+
+ objs, err = createTestObjects(c)
+ expectSuccess(t, err)
+ cs := []*schwift.Container{c}
+
+ numDeleted, numNotFound, err = c.Account().BulkDelete(objs, cs, nil, nil)
+ expectSuccess(t, err)
+ expectInt(t, numDeleted, len(objs)+1)
+ expectInt(t, numNotFound, 0)
+ expectContainerExistence(t, c, false)
+ })
+}
+
+func TestBulkDeleteError(t *testing.T) {
+ testWithContainer(t, func(c *schwift.Container) {
+ objs, err := createTestObjects(c)
+ expectSuccess(t, err)
+ objs = objs[1:]
+ cs := []*schwift.Container{c}
+
+ //not deleting all objects should lead to 409 Conflict when deleting the Container
+ numDeleted, numNotFound, err := c.Account().BulkDelete(objs, cs, nil, nil)
+ expectInt(t, numDeleted, len(objs))
+ expectInt(t, numNotFound, 0)
+ expectError(t, err, "400 Bad Request (+1 object errors)")
+ expectContainerExistence(t, c, true)
+ })
+}
+
+func createTestObjects(c *schwift.Container) ([]*schwift.Object, error) {
+ var objs []*schwift.Object
+ for idx := 1; idx <= 5; idx++ {
+ obj := c.Object(fmt.Sprintf("object%d", idx))
+ err := obj.Upload(strings.NewReader("example"), nil, nil)
+ if err != nil {
+ return nil, err
+ }
+ objs = append(objs, obj)
+ }
+ return objs, nil
+}
diff --git a/tests/container_test.go b/tests/container_test.go
index 5a0046b..c90a339 100644
--- a/tests/container_test.go
+++ b/tests/container_test.go
@@ -83,3 +83,11 @@ func TestContainerUpdate(t *testing.T) {
})
}
+
+func expectContainerExistence(t *testing.T, c *schwift.Container, expectedExists bool) {
+ t.Helper()
+ c.Invalidate()
+ actualExists, err := c.Exists()
+ expectSuccess(t, err)
+ expectBool(t, actualExists, expectedExists)
+}