From 0a9bb149419c4df0be820f2cb72c8cc7e3332c68 Mon Sep 17 00:00:00 2001 From: Stefan Majewsky Date: Thu, 8 Mar 2018 22:36:04 +0100 Subject: add Account.BulkUpload(), BulkUploadError, BulkObjectError --- bulk.go | 155 +++++++++++++++++++++++++++++++++++++++++++++++++++++ errors.go | 47 ++++++++++++++++ tests/bulk_test.go | 135 ++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 337 insertions(+) create mode 100644 bulk.go create mode 100644 tests/bulk_test.go diff --git a/bulk.go b/bulk.go new file mode 100644 index 0000000..448c10c --- /dev/null +++ b/bulk.go @@ -0,0 +1,155 @@ +/****************************************************************************** +* +* Copyright 2018 Stefan Majewsky +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +******************************************************************************/ + +package schwift + +import ( + "encoding/json" + "io" + "strconv" + "strings" +) + +//BulkUploadFormat enumerates possible archive formats for Account.BulkUpload(). +type BulkUploadFormat string + +const ( + //BulkUploadTar is a plain tar archive. + BulkUploadTar BulkUploadFormat = "tar" + //BulkUploadTarGzip is a GZip-compressed tar archive. + BulkUploadTarGzip BulkUploadFormat = "tar.gz" + //BulkUploadTarBzip2 is a BZip2-compressed tar archive. + BulkUploadTarBzip2 BulkUploadFormat = "tar.bz2" +) + +//BulkUpload extracts an archive (which may contain multiple files) into a +//Swift account. 
The path of each file in the archive is appended to the +//uploadPath to form the FullName() of the resulting Object. +// +//For example, when uploading an archive that contains the file "a/b/c": +// +// //This uploads the file into the container "a" as object "b/c". +// account.BulkUpload("", format, contents, nil, nil) +// //This uploads the file into the container "foo" as object "a/b/c". +// account.BulkUpload("foo", format, contents, nil, nil) +// //This uploads the file into the container "foo" as object "bar/baz/a/b/c". +// account.BulkUpload("foo/bar/baz", format, contents, nil, nil) +// +//The first return value indicates the number of files that have been created +//on the server side. This may be lower than the number of files in the archive +//if some files could not be saved individually (e.g. because a quota was +//exceeded in the middle of the archive extraction). +// +//If not nil, the error return value is *usually* an instance of +//schwift.BulkUploadError. +// +//This operation returns (0, ErrNotSupported) if the server does not support +//bulk-uploading. +func (a *Account) BulkUpload(uploadPath string, format BulkUploadFormat, contents io.Reader, headers AccountHeaders, opts *RequestOptions) (int, error) { + caps, err := a.Capabilities() + if err != nil { + return 0, err + } + if caps.BulkUpload == nil { + return 0, ErrNotSupported + } + + req := Request{ + Method: "PUT", + Body: contents, + Headers: headersToHTTP(headers), + Options: cloneRequestOptions(opts), + ExpectStatusCodes: []int{200}, + } + req.Headers.Set("Accept", "application/json") + req.Options.Values.Set("extract-archive", string(format)) + + fields := strings.SplitN(strings.Trim(uploadPath, "/"), "/", 2) + req.ContainerName = fields[0] + if len(fields) == 2 { + req.ObjectName = fields[1] + } + + resp, err := req.Do(a.backend) + if err != nil { + return 0, err + } + + var result struct { + //ResponseStatus indicates the overall result as a HTTP status string, e.g. 
+ //"201 Created" or "500 Internal Error". + ResponseStatus string `json:"Response Status"` + //ResponseBody contains an overall error message for errors that are not + //related to a single file in the archive (e.g. "invalid tar file" or "Max + //delete failures exceeded"). + ResponseBody string `json:"Response Body"` + //Errors contains error messages for individual files. Each entry is a + //[]string with 2 elements, the object's fullName and the HTTP status for + //this file's upload (e.g. "412 Precondition Failed"). + Errors [][]string `json:"Errors"` + //NumberFilesCreated is self-explanatory. + NumberFilesCreated int `json:"Number Files Created"` + } + err = json.NewDecoder(resp.Body).Decode(&result) + closeErr := resp.Body.Close() + if err == nil { + err = closeErr + } + if err != nil { + return 0, err + } + + //parse `result` into type BulkUploadError + bulkErr := BulkUploadError{ + ArchiveError: result.ResponseBody, + } + bulkErr.StatusCode, err = parseResponseStatus(result.ResponseStatus) + if err != nil { + return 0, err + } + for _, suberr := range result.Errors { + if len(suberr) != 2 { + continue //wtf + } + nameFields := strings.SplitN(suberr[0], "/", 2) + for len(nameFields) < 2 { + nameFields = append(nameFields, "") + } + statusCode, err := parseResponseStatus(suberr[1]) + if err != nil { + return 0, err + } + bulkErr.ObjectErrors = append(bulkErr.ObjectErrors, BulkObjectError{ + ContainerName: nameFields[0], + ObjectName: nameFields[1], + StatusCode: statusCode, + }) + } + + //is BulkUploadError really an error? 
+ if len(bulkErr.ObjectErrors) == 0 && bulkErr.ArchiveError == "" && bulkErr.StatusCode >= 200 && bulkErr.StatusCode < 300 { + return result.NumberFilesCreated, nil + } + return result.NumberFilesCreated, bulkErr +} + +func parseResponseStatus(status string) (int, error) { + //`status` looks like "201 Created" + fields := strings.SplitN(status, " ", 2) + return strconv.Atoi(fields[0]) +} diff --git a/errors.go b/errors.go index 0aa7acc..3ce7cd0 100644 --- a/errors.go +++ b/errors.go @@ -36,6 +36,9 @@ var ( //ErrMalformedContainerName is returned by Request.Do() if ContainerName //contains slashes. ErrMalformedContainerName = errors.New("container name may not contain slashes") + //ErrNotSupported is returned by bulk operations, large object operations, + //etc. if the server does not support the requested operation. + ErrNotSupported = errors.New("operation not supported by this Swift server") ) //UnexpectedStatusCodeError is generated when a request to Swift does not yield @@ -62,6 +65,50 @@ func (e UnexpectedStatusCodeError) Error() string { return msg } +//BulkObjectError is the error message for a single object in a bulk operation. +//It is not generated individually, only as part of BulkUploadError and BulkDeleteError. +type BulkObjectError struct { + ContainerName string + ObjectName string + StatusCode int +} + +//Error implements the builtin/error interface. +func (e BulkObjectError) Error() string { + return fmt.Sprintf("%s/%s: %d %s", + e.ContainerName, e.ObjectName, + e.StatusCode, http.StatusText(e.StatusCode), + ) +} + +//BulkUploadError is returned by Account.BulkUpload() when the archive was +//uploaded, but could not be unpacked as a whole, or when some (or all) of +//its files could not be saved in Swift. +type BulkUploadError struct { + //StatusCode contains the overall HTTP status code of the operation. + StatusCode int + //ArchiveError contains the error that occurred while unpacking the archive, + //or if the archive as a whole was not acceptable. 
It may be empty if no + //error occurred at this point. + ArchiveError string + //ObjectErrors contains errors that occurred while trying to save an + //individual file from the archive. It may be empty. + ObjectErrors []BulkObjectError +} + +//Error implements the builtin/error interface. To fit into one line, it +//condenses the ObjectErrors into a count. +func (e BulkUploadError) Error() string { + result := fmt.Sprintf("%d %s", e.StatusCode, http.StatusText(e.StatusCode)) + if e.ArchiveError != "" { + result += ": " + e.ArchiveError + } + if len(e.ObjectErrors) > 0 { + result += fmt.Sprintf(" (+%d object errors)", len(e.ObjectErrors)) + } + return result +} + //Is checks if the given error is an UnexpectedStatusCodeError for that status //code. For example: // diff --git a/tests/bulk_test.go b/tests/bulk_test.go new file mode 100644 index 0000000..b7f1e77 --- /dev/null +++ b/tests/bulk_test.go @@ -0,0 +1,135 @@ +/****************************************************************************** +* +* Copyright 2018 Stefan Majewsky +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+* +******************************************************************************/ + +package tests + +import ( + "archive/tar" + "bytes" + "strings" + "testing" + + "github.com/majewsky/schwift" +) + +func TestBulkUploadSuccess(t *testing.T) { + testWithContainer(t, func(c *schwift.Container) { + obj1 := c.Object("file1") + obj2 := c.Object("file2") + + archive := buildTarArchive(map[string][]byte{ + obj1.FullName(): []byte("hello"), + obj2.FullName(): []byte("world"), + }) + n, err := c.Account().BulkUpload( + "", //upload path + schwift.BulkUploadTar, + bytes.NewReader(archive), + nil, nil, + ) + expectInt(t, n, 2) + expectSuccess(t, err) + + expectObjectExistence(t, obj1, true) + expectObjectExistence(t, obj2, true) + expectObjectContent(t, obj1, []byte("hello")) + expectObjectContent(t, obj2, []byte("world")) + }) +} + +func TestBulkUploadArchiveError(t *testing.T) { + testWithContainer(t, func(c *schwift.Container) { + n, err := c.Account().BulkUpload( + c.Name(), //upload path + schwift.BulkUploadTar, + strings.NewReader("This is not the TAR archive you're looking for."), + nil, nil, + ) + expectInt(t, n, 0) + expectError(t, err, "400 Bad Request: Invalid Tar File: truncated header") + bulkErr := err.(schwift.BulkUploadError) + expectInt(t, bulkErr.StatusCode, 400) + expectString(t, bulkErr.ArchiveError, "Invalid Tar File: truncated header") + expectInt(t, len(bulkErr.ObjectErrors), 0) + }) +} + +func TestBulkUploadObjectError(t *testing.T) { + testWithContainer(t, func(c *schwift.Container) { + obj1 := c.Object(buildInvalidObjectName()) + obj2 := c.Object("file2") + expectObjectExistence(t, obj2, false) + + archive := buildTarArchive(map[string][]byte{ + obj1.Name(): []byte("hello"), + obj2.Name(): []byte("world"), + }) + n, err := c.Account().BulkUpload( + c.Name(), //upload path + schwift.BulkUploadTar, + bytes.NewReader(archive), + nil, nil, + ) + expectInt(t, n, 1) + expectError(t, err, "400 Bad Request (+1 object errors)") + bulkErr := 
err.(schwift.BulkUploadError) + expectInt(t, len(bulkErr.ObjectErrors), 1) + expectString(t, bulkErr.ObjectErrors[0].ContainerName, c.Name()) + expectInt(t, bulkErr.ObjectErrors[0].StatusCode, 400) + //^ We cannot match the ObjectName (or use expectError, for that matter) + //here because Swift truncates the object name to its max length. + + //even if some files cannot be processed, the other files shall be stored correctly + expectObjectExistence(t, obj2, true) + expectObjectContent(t, obj2, []byte("world")) + }) +} + +func buildTarArchive(files map[string][]byte) []byte { + var buf bytes.Buffer + w := tar.NewWriter(&buf) + for fileName, contents := range files { + err := w.WriteHeader(&tar.Header{ + Typeflag: tar.TypeReg, + Name: fileName, + Size: int64(len(contents)), + Mode: 0100644, + }) + if err != nil { + panic(err.Error()) + } + _, err = w.Write(contents) + if err != nil { + panic(err.Error()) + } + } + err := w.Close() + if err != nil { + panic(err.Error()) + } + return buf.Bytes() +} + +func buildInvalidObjectName() string { + //5000 is more than the usual max_object_name_length of 1024 + buf := make([]byte, 5000) + for idx := range buf { + buf[idx] = 'a' + } + return string(buf) +} -- cgit v1.2.3