diff options
Diffstat (limited to 'largeobject.go')
| -rw-r--r-- | largeobject.go | 689 |
1 files changed, 689 insertions, 0 deletions
diff --git a/largeobject.go b/largeobject.go new file mode 100644 index 0000000..493671d --- /dev/null +++ b/largeobject.go @@ -0,0 +1,689 @@ +/****************************************************************************** +* +* Copyright 2018 Stefan Majewsky <majewsky@gmx.net> +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +******************************************************************************/ + +package schwift + +import ( + "bufio" + "bytes" + "crypto/md5" + "encoding/base64" + "encoding/hex" + "encoding/json" + "errors" + "fmt" + "io" + "math" + "net/http" + "net/url" + "path" + "regexp" + "strconv" + "strings" + + "github.com/jpillora/longestcommon" +) + +//SegmentInfo describes a segment of a large object. +// +//For .RangeLength == 0, the segment consists of all the bytes in the backing +//object, after skipping the first .RangeOffset bytes. The default +//(.RangeOffset == 0) includes the entire contents of the backing object. +// +//For .RangeLength > 0, the segment consists of that many bytes from the +//backing object, again after skipping the first .RangeOffset bytes. +// +//However, for .RangeOffset < 0, the segment consists of .RangeLength many bytes +//from the *end* of the backing object. (The concrete value for .RangeOffset is +//disregarded.) .RangeLength must be non-zero in this case. +// +//Sorry that specifying a range is that involved. I was just following orders ^W +//RFC 7233, section 3.1 here. +type SegmentInfo struct { + Object *Object + SizeBytes uint64 + Etag string + RangeLength uint64 + RangeOffset int64 + //Static Large Objects support data segments that are not backed by actual + //objects. For those kinds of segments, only the Data attribute is set and + //all other attributes are set to their default values (esp. .Object == nil). + // + //Data segments can only be used for small chunks of data because the SLO + //manifest (the list of all SegmentInfo encoded as JSON) is severely limited + //in size (usually to 8 MiB). + Data []byte +} + +type sloSegmentInfo struct { + Path string `json:"path,omitempty"` + SizeBytes uint64 `json:"size_bytes,omitempty"` + Etag string `json:"etag,omitempty"` + Range string `json:"range,omitempty"` + DataBase64 string `json:"data,omitempty"` +} + +//LargeObjectOpenMode is a set of flags that can be given to +//LargeObject.Open(). +type LargeObjectOpenMode int + +const ( + //OpenTruncate indicates that all existing segments in this object shall be + //deleted by Open(). + OpenTruncate LargeObjectOpenMode = 0 + //OpenAppend indicates that Open() shall set up the writer to append new + //content to the existing segments. + OpenAppend LargeObjectOpenMode = 1 << 0 + //OpenKeepSegments indicates that, when truncating an existing object, the + //segments shall not be deleted even though they are no longer referenced by + //this object. This flag has no effect when combined with OpenAppend. + OpenKeepSegments LargeObjectOpenMode = 1 << 1 +) + +//LargeObjectStrategy is an enum of segmenting strategies supported by Swift. +type LargeObjectStrategy int + +const ( + //StaticLargeObject is the default LargeObjectStrategy used by Schwift. + StaticLargeObject LargeObjectStrategy = iota + //DynamicLargeObject is an older LargeObjectStrategy that is not recommended + //for new applications because of eventual consistency problems and missing + //support for several newer features (e.g. data segments, range specifications). + DynamicLargeObject +) + +//////////////////////////////////////////////////////////////////////////////// + +//LargeObject is a wrapper for type Object that performs operations specific to +//large objects. +// +//This type should only be constructed through the Object.AsLargeObject() +//method. If the object does not exist yet, the SegmentContainerName and +//SegmentPrefix must be specified before this object can be written to, and the +//Strategy can be adjusted in the unlikely case that an SLO is not desired. +type LargeObject struct { + Object *Object + SegmentContainer *Container + SegmentPrefix string + Strategy LargeObjectStrategy + //This is private so that we can later optimize this to load the segments + //only on demand. + segments []SegmentInfo +} + +//AsLargeObject prepares a LargeObject instance. If the given object exists, +//but is not a large object, ErrNotLarge will be returned. If the given object +//does not yet exist, the SegmentContainer and SegmentPrefix attributes need to +//be filled in before the LargeObject can be used. +func (o *Object) AsLargeObject() (*LargeObject, error) { + exists, err := o.Exists() + if err != nil { + return nil, err + } + if !exists { + return &LargeObject{Object: o, Strategy: StaticLargeObject}, nil + } + + h := o.headers + if h.IsDynamicLargeObject() { + return o.asDLO(h.Get("X-Object-Manifest")) + } + if h.IsStaticLargeObject() { + return o.asSLO() + } + return nil, ErrNotLarge +} + +func (o *Object) asDLO(manifestStr string) (*LargeObject, error) { + manifest := strings.SplitN(manifestStr, "/", 2) + if len(manifest) < 2 { + return nil, ErrNotLarge + } + + lo := &LargeObject{ + Object: o, + SegmentContainer: o.c.a.Container(manifest[0]), + SegmentPrefix: manifest[1], + Strategy: DynamicLargeObject, + } + + iter := lo.SegmentContainer.Objects() + iter.Prefix = lo.SegmentPrefix + segmentInfos, err := iter.CollectDetailed() + if err != nil { + return nil, err + } + lo.segments = make([]SegmentInfo, 0, len(segmentInfos)) + for _, info := range segmentInfos { + lo.segments = append(lo.segments, SegmentInfo{ + Object: info.Object, + SizeBytes: info.SizeBytes, + Etag: info.Etag, + }) + } + + return lo, nil +} + +func (o *Object) asSLO() (*LargeObject, error) { + opts := RequestOptions{ + Values: make(url.Values), + } + opts.Values.Set("multipart-manifest", "get") + opts.Values.Set("format", "raw") + buf, err := o.Download(&opts).AsByteSlice() + if err != nil { + return nil, err + } + + var data []sloSegmentInfo + err = json.Unmarshal(buf, &data) + if err != nil { + return nil, errors.New("invalid SLO manifest: " + err.Error()) + } + + lo := &LargeObject{ + Object: o, + Strategy: StaticLargeObject, + } + if len(data) == 0 { + return lo, nil + } + + //read the segments first, then deduce the SegmentContainer/SegmentPrefix from these + lo.segments = make([]SegmentInfo, 0, len(data)) + for _, info := range data { + //option 1: data segment + if info.DataBase64 != "" { + data, err := base64.StdEncoding.DecodeString(info.DataBase64) + if err != nil { + return nil, errors.New("invalid SLO data segment: " + err.Error()) + } + lo.segments = append(lo.segments, SegmentInfo{Data: data}) + continue + } + + //option 2: segment backed by object + pathElements := strings.SplitN(strings.TrimPrefix(info.Path, "/"), "/", 2) + if len(pathElements) != 2 { + return nil, errors.New("invalid SLO segment: malformed path: " + info.Path) + } + s := SegmentInfo{ + Object: o.c.a.Container(pathElements[0]).Object(pathElements[1]), + SizeBytes: info.SizeBytes, + Etag: info.Etag, + } + if info.Range != "" { + var ok bool + s.RangeOffset, s.RangeLength, ok = parseHTTPRange(info.Range) + if !ok { + return nil, errors.New("invalid SLO segment: malformed range: " + info.Range) + } + } + lo.segments = append(lo.segments, s) + } + + //choose the SegmentContainer by majority vote (in the spirit of "be liberal + //in what you accept") + containerNames := make(map[string]uint) + for _, s := range lo.segments { + if s.Object == nil { //can happen for data segments + continue + } + containerNames[s.Object.c.Name()]++ + } + maxName := "" + maxVotes := uint(0) + for name, votes := range containerNames { + if votes > maxVotes { + maxName = name + maxVotes = votes + } + } + lo.SegmentContainer = lo.Object.c.a.Container(maxName) + + //choose the SegmentPrefix as the longest common prefix of all segments in + //the chosen SegmentContainer... + names := make([]string, 0, len(lo.segments)) + for _, s := range lo.segments { + if s.Object == nil { //can happen for data segments + continue + } + name := s.Object.c.Name() + if name == maxName { + names = append(names, s.Object.Name()) + } + } + lo.SegmentPrefix = longestcommon.Prefix(names) + + //..BUT if the prefix is a path with slashes, do not consider the part after + //the last slash; e.g. if we have segments "foo/bar/0001" and "foo/bar/0002", + //the longest common prefix is "foo/bar/000", but we actually want "foo/bar/" + if strings.Contains(lo.SegmentPrefix, "/") { + lo.SegmentPrefix = path.Dir(lo.SegmentPrefix) + "/" + } + + return lo, nil +} + +func parseHTTPRange(str string) (offsetVal int64, lengthVal uint64, ok bool) { + fields := strings.SplitN(str, "-", 2) + if len(fields) != 2 { + return 0, 0, false + } + + if fields[0] == "" { + //case 1: "-" + if fields[1] == "" { + return 0, 0, true + } + + //case 2: "-N" + numBytes, err := strconv.ParseUint(fields[1], 10, 64) + if err != nil { + return 0, 0, false + } + return -1, numBytes, true + } + + firstByte, err := strconv.ParseUint(fields[0], 10, 63) //not 64; needs to be unsigned, but also fit into int64 + if err != nil { + return 0, 0, false + } + if fields[1] == "" { + //case 3: "N-" + return int64(firstByte), 0, true + } + //case 4: "M-N" + lastByte, err := strconv.ParseUint(fields[1], 10, 64) + if err != nil || lastByte < firstByte { + return 0, 0, false + } + return int64(firstByte), lastByte - firstByte + 1, true +} + +//Open returns an io.WriteCloser that can be used to replace or extend the +//contents of this large object. +// +//This call returns ErrNoContainerName if o.SegmentContainer is not set, or +//ErrAccountMismatch if it is not in the same account as the large object. +//For existing objects, SegmentContainer and SegmentPrefix will be filled by +//Object.AsLargeObject(). For new objects, they need to be filled by the +//caller. +// +//WARNING: Every call to Write() on the returned writer will create a new +//segment. To ensure a uniform segment size, wrap the writer returned from this +//call in a bufio.Writer, for example by using the schwift.SetSegmentSize() +//convenience function: +// +// dlo, err := account.Container("public").Object("archive27.zip").AsLargeObject() +// dlo.SegmentContainer = account.Container("segments") +// dlo.SegmentPrefix = "archive27/" +// w, err := dlo.Open(schwift.OpenTruncate) +// w, err = schwift.SetSegmentSize(w, 1<<30) //segment size 1<<30 byte = 1 GiB +// _, err = bw.Write(archiveContents) +// err = w.Close() +// +func (lo *LargeObject) Open(mode LargeObjectOpenMode) (io.WriteCloser, error) { + if lo.SegmentContainer == nil { + return nil, ErrNoContainerName + } + if !lo.SegmentContainer.a.isEqualTo(lo.Object.c.a) { + return nil, ErrAccountMismatch + } + + if mode&OpenAppend == 0 { + if mode&OpenKeepSegments == 0 { + segmentObjects := make([]*Object, len(lo.segments)) + for idx, segment := range lo.segments { + segmentObjects[idx] = segment.Object + } + _, _, err := lo.Object.c.a.BulkDelete(segmentObjects, nil, nil) + if err != nil { + return nil, err + } + } + lo.segments = nil + } + + return largeObjectWriter{lo}, nil +} + +//Segments returns a list of all segments for this object, in order. +func (lo *LargeObject) Segments() ([]SegmentInfo, error) { + //NOTE: This method has an error return value because we might later switch + //to loading segments lazily inside this method. + return lo.segments, nil +} + +//NextSegmentObject suggests where to upload the next segment. +// +//WARNING: This is a low-level function. Most callers will want to use the +//io.WriteCloser provided by Open(). You will only need to upload segments +//manually when you want to control the segments' metadata. +// +//If the name of the current final segment ends with a counter, that counter is +//incremented, otherwise a counter is appended to its name. When looking for a +//counter in an existing segment name, the regex /[0-9]+$/ is used. For example, +//given: +// +// segments := lo.Segments() +// lastSegmentName := segments[len(segments)-1].Name() +// nextSegmentName := lo.NextSegmentObject().Name() +// +//If lastSegmentName is "segments/archive/segment0001", then nextSegmentName is +//"segments/archive/segment0002". If lastSegmentName is +//"segments/archive/first", then nextSegmentName is +//"segments/archive/first0000000000000001". +// +//However, the last segment's name will only be considered if it lies within +//lo.SegmentContainer below lo.SegmentPrefix. If that is not the case, the name +//of the last segment that does will be used instead. +// +//If there are no segments yet, or if all segments are located outside the +//lo.SegmentContainer and lo.SegmentPrefix, the first segment name is chosen as +//lo.SegmentPrefix + "0000000000000001". +func (lo *LargeObject) NextSegmentObject() *Object { + //find the name of the last-most segment that is within the designated + //segment container and prefix + var prevSegmentName string + for _, s := range lo.segments { + o := s.Object + if o == nil { //can happen for data segments + continue + } + if lo.SegmentContainer.isEqualTo(o.c) && strings.HasPrefix(o.Name(), lo.SegmentPrefix) { + prevSegmentName = s.Object.Name() + //keep going, we want to find the last such segment + } + } + + //choose the next segment name based on the previous one + var segmentName string + if prevSegmentName == "" { + segmentName = lo.SegmentPrefix + initialIndex + } else { + segmentName = nextSegmentName(prevSegmentName) + } + + return lo.SegmentContainer.Object(segmentName) +} + +var splitSegmentIndexRx = regexp.MustCompile(`^(.*?)([0-9]+$)`) +var initialIndex = "0000000000000001" + +//Given the object name of a previous large object segment, compute a suitable +//name for the next segment. See doc for LargeObject.NextSegmentObject() +//for how this works. +func nextSegmentName(segmentName string) string { + match := splitSegmentIndexRx.FindStringSubmatch(segmentName) + if match == nil { + return segmentName + initialIndex + } + base, idxStr := match[1], match[2] + + idx, err := strconv.ParseUint(idxStr, 10, 64) + if err != nil || idx == math.MaxUint64 { //overflow + //start from one again, but separate with a dash to ensure that the new + //index can be parsed properly in the next call to this function + return segmentName + "-" + initialIndex + } + + //print next index with same number of digits as previous index, + //e.g. "00001" -> "00002" (except if overflow, e.g. "9999" -> "10000") + formatStr := fmt.Sprintf("%%0%dd", len(idxStr)) + return base + fmt.Sprintf(formatStr, idx+1) +} + +//AddSegment appends a segment to this object. The segment must already have +//been uploaded. +// +//WARNING: This is a low-level function. Most callers will want to use the +//io.WriteCloser provided by Open(). You will only need to add segments +//manually when you want to control the segments' metadata, or when using +//advanced features such as range-limited segments or data segments. +// +//This method returns ErrAccountMismatch if the segment is not located in a +//container in the same account. +// +//For dynamic large objects, this method returns ErrContainerMismatch if the +//segment is not located in the correct container below the correct prefix. +// +//This method returns ErrSegmentInvalid if: +// +//- a range is specified in the SegmentInfo, but it is invalid or the +//LargeObject is a dynamic large object (DLOs do not support ranges), or +// +//- the SegmentInfo's Data attribute is set and any other attribute is also +//set (segments cannot be backed by objects and be data segments at the same +//time), or +// +//- the SegmentInfo's Data attribute is set, but the LargeObject is a dynamic +//large objects (DLOs do not support data segments). +func (lo *LargeObject) AddSegment(segment SegmentInfo) error { + if len(segment.Data) == 0 { + //validate segments backed by objects + o := segment.Object + if o == nil { + //required attributes + return ErrSegmentInvalid + } + if !o.c.a.isEqualTo(lo.SegmentContainer.a) { + return ErrAccountMismatch + } + + switch lo.Strategy { + case DynamicLargeObject: + if segment.RangeLength != 0 || segment.RangeOffset != 0 { + //not supported for DLO + return ErrSegmentInvalid + } + + if !o.c.isEqualTo(lo.SegmentContainer) { + return ErrContainerMismatch + } + if !strings.HasPrefix(o.name, lo.SegmentPrefix) { + return ErrContainerMismatch + } + + case StaticLargeObject: + if segment.RangeLength == 0 && segment.RangeOffset < 0 { + //malformed range + return ErrSegmentInvalid + } + } + } else { + //validate plain-data segments + if lo.Strategy != StaticLargeObject { + //not supported for DLO + return ErrSegmentInvalid + } + if segment.Object != nil || segment.SizeBytes != 0 || segment.Etag != "" || segment.RangeLength != 0 || segment.RangeOffset != 0 { + //all other attributes must be unset + return ErrSegmentInvalid + } + } + + lo.segments = append(lo.segments, segment) + return nil +} + +//WriteManifest creates this large object by writing a manifest to its +//location using a PUT request. +// +//For dynamic large objects, this method does not generate a PUT request +//if the object already exists and has the correct manifest (i.e. +//SegmentContainer and SegmentPrefix have not been changed). +func (lo *LargeObject) WriteManifest(opts *RequestOptions) error { + switch lo.Strategy { + case StaticLargeObject: + return lo.writeSLOManifest(opts) + case DynamicLargeObject: + return lo.writeDLOManifest(opts) + default: + panic("no such strategy") + } +} + +func (lo *LargeObject) writeDLOManifest(opts *RequestOptions) error { + manifest := lo.SegmentContainer.Name() + "/" + lo.SegmentPrefix + + //check if the manifest is already set correctly + headers, err := lo.Object.Headers() + if err != nil && !Is(err, http.StatusNotFound) { + return err + } + if headers.Get("X-Object-Manifest") == manifest { + return nil + } + + //write manifest; make sure that this is a DLO + opts = cloneRequestOptions(opts, nil) + opts.Headers.Set("X-Object-Manifest", manifest) + return lo.Object.Upload(nil, opts) +} + +func (lo *LargeObject) writeSLOManifest(opts *RequestOptions) error { + sloSegments := make([]sloSegmentInfo, len(lo.segments)) + for idx, s := range lo.segments { + if len(s.Data) > 0 { + sloSegments[idx] = sloSegmentInfo{ + DataBase64: base64.StdEncoding.EncodeToString(s.Data), + } + } else { + si := sloSegmentInfo{ + Path: "/" + s.Object.FullName(), + SizeBytes: s.SizeBytes, + Etag: s.Etag, + } + + if s.RangeOffset < 0 { + si.Range = "-" + strconv.FormatUint(s.RangeLength, 10) + } else { + firstByteStr := strconv.FormatUint(uint64(s.RangeOffset), 10) + lastByteStr := strconv.FormatUint(uint64(s.RangeOffset)+s.RangeLength-1, 10) + si.Range = firstByteStr + "-" + lastByteStr + } + + sloSegments[idx] = si + } + } + + manifest, err := json.Marshal(sloSegments) + if err != nil { + //failing json.Marshal() on such a trivial data structure is alarming + panic(err.Error()) + } + + opts = cloneRequestOptions(opts, nil) + opts.Headers.Del("X-Object-Manifest") //ensure sanity :) + opts.Values.Set("multipart-manifest", "put") + return lo.Object.Upload(bytes.NewReader(manifest), opts) +} + +//////////////////////////////////////////////////////////////////////////////// + +type largeObjectWriter struct { + lo *LargeObject +} + +//Write implements the io.WriteCloser interface. +func (w largeObjectWriter) Write(buf []byte) (int, error) { + segment := w.lo.NextSegmentObject() + //TODO: split write into multiple segments if len(buf) > max object size + err := segment.Upload(bytes.NewReader(buf), nil) + if err != nil { + return 0, err + } + + sum := md5.Sum(buf) + return len(buf), w.lo.AddSegment(SegmentInfo{ + Object: segment, + SizeBytes: uint64(len(buf)), + Etag: hex.EncodeToString(sum[:]), + }) +} + +//Close implements the io.WriteCloser interface. +func (w largeObjectWriter) Close() error { + return w.lo.WriteManifest(nil) +} + +//////////////////////////////////////////////////////////////////////////////// + +type largeObjectBufferedWriter struct { + bw *bufio.Writer + w io.WriteCloser +} + +//SetSegmentSize creates a bufio.Writer around an io.WriteCloser and returns +//an interface to it that works like the original io.WriteCloser. +// +//This is intended to be used when writing segments into a large object. +//The writer returned by LargeObject.Open() does not ensure a uniform segment +//size by default, so one would have to wrap it in a bufio.Writer like so: +// +// dlo, err := account.Container("public").Object("archive27.zip").AsLargeObject() +// dlo.SegmentContainer = account.Container("segments") +// dlo.SegmentPrefix = "archive27/" +// +// w, err := largeObject.Open(schwift.OpenTruncate) +// bw, err := bufio.NewWriterSize(w, 1<<30) //segment size 1<<30 byte = 1 GiB +// _, err = bw.Write(archiveContents) +// err = bw.Flush() +// err = w.Close() +// +//This function reduces the boilerplate to: +// +// w, err := largeObject.Open(schwift.OpenTruncate) +// w, err = schwift.SetSegmentSize(w, 1<<30) //segment size 1<<30 byte = 1 GiB +// _, err = w.Write(archiveContents) +// err = w.Close() +// +//Another advantage of this function is that the returned writer implements +//io.WriteCloser, which bufio.Writer does not. So you can pass it into +//consuming functions that use io.WriteCloser to close the object once they're +//done writing to it, and it will be ensured that the buffer is flushed before +//closing the underlying writer. +func SetSegmentSize(w io.WriteCloser, segmentSizeBytes int) io.WriteCloser { + switch w := w.(type) { + case *largeObjectBufferedWriter: + //never chain multiple largeObjectBufferedWriter together + w.bw.Flush() //ensure that previous calls to `w.Write()` are durable + return SetSegmentSize(w.w, segmentSizeBytes) + default: + return &largeObjectBufferedWriter{ + bw: bufio.NewWriterSize(w, segmentSizeBytes), + w: w, + } + } +} + +//Write implements the io.WriteCloser interface. +func (bw *largeObjectBufferedWriter) Write(buf []byte) (int, error) { + return bw.bw.Write(buf) +} + +//Close implements the io.WriteCloser interface. +func (bw *largeObjectBufferedWriter) Close() error { + err := bw.bw.Flush() + if err != nil { + return err + } + return bw.w.Close() +} |
