diff options
Diffstat (limited to 'bulk.go')
| -rw-r--r-- | bulk.go | 272 |
1 files changed, 231 insertions, 41 deletions
@@ -20,7 +20,10 @@ package schwift import ( "encoding/json" + "fmt" "io" + "net/http" + "net/url" "strconv" "strings" ) @@ -90,66 +93,253 @@ func (a *Account) BulkUpload(uploadPath string, format BulkUploadFormat, content return 0, err } - var result struct { - //ResponseStatus indicates the overall result as a HTTP status string, e.g. - //"201 Created" or "500 Internal Error". - ResponseStatus string `json:"Response Status"` - //ResponseBody contains an overall error message for errors that are not - //related to a single file in the archive (e.g. "invalid tar file" or "Max - //delete failures exceeded"). - ResponseBody string `json:"Response Body"` - //Errors contains error messages for individual files. Each entry is a - //[]string with 2 elements, the object's fullName and the HTTP status for - //this file's upload (e.g. "412 Precondition Failed"). - Errors [][]string `json:"Errors"` - //NumberFilesCreated is self-explanatory. - NumberFilesCreated int `json:"Number Files Created"` - } - err = json.NewDecoder(resp.Body).Decode(&result) - closeErr := resp.Body.Close() + result, err := parseBulkResponse(resp.Body) + return result.NumberFilesCreated, err +} + +func parseResponseStatus(status string) (int, error) { + //`status` looks like "201 Created" + fields := strings.SplitN(status, " ", 2) + return strconv.Atoi(fields[0]) +} + +func makeBulkObjectError(fullName string, statusCode int) BulkObjectError { + nameFields := strings.SplitN(fullName, "/", 2) + for len(nameFields) < 2 { + nameFields = append(nameFields, "") + } + return BulkObjectError{ + ContainerName: nameFields[0], + ObjectName: nameFields[1], + StatusCode: statusCode, + } +} + +//BulkDelete deletes a large number of objects (and containers) at once. +//Containers are queued at the end of the deletion, so a container can be +//deleted in the same call in which all objects in it are deleted. +// +//For example, to delete all objects in a container: +// +// var container *schwift.Container +// +// objects, err := container.Objects().Collect() +// numDeleted, numNotFound, err := container.Account().BulkDelete(objects, nil, nil, nil) +// +//To also delete the container: +// +// var container *schwift.Container +// +// objects, err := container.Objects().Collect() +// numDeleted, numNotFound, err := container.Account().BulkDelete( +// objects, []*schwift.Container{container}, nil, nil) +// +//If the server does not support bulk-deletion, this function will just call +//Object.Delete() for each given object and Container.Delete() for each given +//container, and aggregate the result. +// +//If not nil, the error return value is *usually* an instance of +//BulkError. +// +//The objects may be located in multiple containers, but they and the +//containers must all be located in the given account. (Otherwise, +//ErrAccountMismatch is returned.) +func (a *Account) BulkDelete(objects []*Object, containers []*Container, headers AccountHeaders, opts *RequestOptions) (numDeleted int, numNotFound int, deleteError error) { + //validate that all given objects are in this account + for _, obj := range objects { + other := obj.Container().Account() + if other.baseURL != a.baseURL || other.name != a.name { + return 0, 0, ErrAccountMismatch + } + } + for _, container := range containers { + other := container.Account() + if other.baseURL != a.baseURL || other.name != a.name { + return 0, 0, ErrAccountMismatch + } + } + + //check capabilities to choose deletion method + caps, err := a.Capabilities() + if err != nil { + return 0, 0, err + } + if caps.BulkDelete == nil { + return a.bulkDeleteSingle(objects, containers, headers, opts) + } + chunkSize := int(caps.BulkDelete.MaximumDeletesPerRequest) + + //collect names of things to delete into one big list + var names []string + for _, object := range objects { + object.Invalidate() //deletion must invalidate objects! + names = append(names, fmt.Sprintf("/%s/%s", + url.PathEscape(object.Container().Name()), + url.PathEscape(object.Name()), + )) + } + for _, container := range containers { + container.Invalidate() //deletion must invalidate objects! + names = append(names, "/"+url.PathEscape(container.Name())) + } + + //split list into chunks according to maximum allowed + //chunk size; aggregate results + for len(names) > 0 { + //this condition holds only in the final iteration + if chunkSize > len(names) { + chunkSize = len(names) + } + chunk := names[0:chunkSize] + names = names[chunkSize:] + + numDeletedNow, numNotFoundNow, err := a.bulkDelete(chunk, headers, opts) + numDeleted += numDeletedNow + numNotFound += numNotFoundNow + if err != nil { + return numDeleted, numNotFound, err + } + } + + return numDeleted, numNotFound, nil +} + +//Implementation of BulkDelete() for servers that *do not* support bulk +//deletion. +func (a *Account) bulkDeleteSingle(objects []*Object, containers []*Container, headers AccountHeaders, opts *RequestOptions) (int, int, error) { + var ( + numDeleted = 0 + numNotFound = 0 + errs []BulkObjectError + ) + + handleSingleError := func(containerName, objectName string, err error) error { + if err == nil { + numDeleted++ + return nil + } + if Is(err, http.StatusNotFound) { + numNotFound++ + return nil + } + if statusErr, ok := err.(UnexpectedStatusCodeError); ok { + errs = append(errs, BulkObjectError{ + ContainerName: containerName, + ObjectName: objectName, + StatusCode: statusErr.ActualResponse.StatusCode, + }) + return nil + } + //unexpected error type -> stop early + return err + } + + for _, obj := range objects { + err := obj.Delete(ObjectHeaders(headers), opts) //this implies Invalidate() + err = handleSingleError(obj.Container().Name(), obj.Name(), err) + if err != nil { + return numDeleted, numNotFound, err + } + } + + for _, container := range containers { + err := container.Delete(ContainerHeaders(headers), opts) //this implies Invalidate() + err = handleSingleError(container.Name(), "", err) + if err != nil { + return numDeleted, numNotFound, err + } + } + + if len(errs) == 0 { + return numDeleted, numNotFound, nil + } + return numDeleted, numNotFound, BulkError{ + StatusCode: errs[0].StatusCode, + OverallError: http.StatusText(errs[0].StatusCode), + ObjectErrors: errs, + } +} + +//Implementation of BulkDelete() for servers that *do* support bulk deletion. +//This function is called *after* chunking, so `len(names) <= +//account.Capabilities.BulkDelete.MaximumDeletesPerRequest`. +func (a *Account) bulkDelete(names []string, headers AccountHeaders, opts *RequestOptions) (int, int, error) { + req := Request{ + Method: "DELETE", + Body: strings.NewReader(strings.Join(names, "\n") + "\n"), + Headers: headersToHTTP(headers), + Options: cloneRequestOptions(opts), + ExpectStatusCodes: []int{200}, + } + req.Headers.Set("Accept", "application/json") + req.Headers.Set("Content-Type", "text/plain") + req.Options.Values.Set("bulk-delete", "true") + resp, err := req.Do(a.backend) + if err != nil { + return 0, 0, err + } + + result, err := parseBulkResponse(resp.Body) + return result.NumberDeleted, result.NumberNotFound, err +} + +type bulkResponse struct { + //ResponseStatus indicates the overall result as a HTTP status string, e.g. + //"201 Created" or "500 Internal Error". + ResponseStatus string `json:"Response Status"` + //ResponseBody contains an overall error message for errors that are not + //related to a single file in the archive (e.g. "invalid tar file" or "Max + //delete failures exceeded"). + ResponseBody string `json:"Response Body"` + //Errors contains error messages for individual files. Each entry is a + //[]string with 2 elements, the object's fullName and the HTTP status for + //this file's upload (e.g. "412 Precondition Failed"). + Errors [][]string `json:"Errors"` + //NumberFilesCreated is included in the BulkUpload result only. + NumberFilesCreated int `json:"Number Files Created"` + //NumberDeleted is included in the BulkDelete result only. + NumberDeleted int `json:"Number Deleted"` + //NumberNotFound is included in the BulkDelete result only. + NumberNotFound int `json:"Number Not Found"` +} + +func parseBulkResponse(body io.ReadCloser) (bulkResponse, error) { + var resp bulkResponse + err := json.NewDecoder(body).Decode(&resp) + closeErr := body.Close() if err == nil { err = closeErr } if err != nil { - return 0, err + return resp, err } - //parse `result` into type BulkError + //parse `resp` into type BulkError bulkErr := BulkError{ - OverallError: result.ResponseBody, + OverallError: resp.ResponseBody, } - bulkErr.StatusCode, err = parseResponseStatus(result.ResponseStatus) + bulkErr.StatusCode, err = parseResponseStatus(resp.ResponseStatus) if err != nil { - return 0, err + return resp, err } - for _, suberr := range result.Errors { + for _, suberr := range resp.Errors { if len(suberr) != 2 { continue //wtf } - nameFields := strings.SplitN(suberr[0], "/", 2) - for len(nameFields) < 2 { - nameFields = append(nameFields, "") - } statusCode, err := parseResponseStatus(suberr[1]) if err != nil { - return 0, err + return resp, err } - bulkErr.ObjectErrors = append(bulkErr.ObjectErrors, BulkObjectError{ - ContainerName: nameFields[0], - ObjectName: nameFields[1], - StatusCode: statusCode, - }) + bulkErr.ObjectErrors = append(bulkErr.ObjectErrors, + makeBulkObjectError(suberr[0], statusCode), + ) } //is BulkError really an error? if len(bulkErr.ObjectErrors) == 0 && bulkErr.OverallError == "" && bulkErr.StatusCode >= 200 && bulkErr.StatusCode < 300 { - return result.NumberFilesCreated, nil + return resp, nil } - return result.NumberFilesCreated, bulkErr -} - -func parseResponseStatus(status string) (int, error) { - //`status` looks like "201 Created" - fields := strings.SplitN(status, " ", 2) - return strconv.Atoi(fields[0]) + return resp, bulkErr + //NOTE: `resp` is passed back to the caller to read the counters + //(resp.NumberFilesCreated etc.) } |
