mirror of
https://github.com/optim-enterprises-bv/vault.git
synced 2025-11-02 03:27:54 +00:00
Bump deps
This commit is contained in:
136
vendor/github.com/ncw/swift/dlo.go
generated
vendored
Normal file
136
vendor/github.com/ncw/swift/dlo.go
generated
vendored
Normal file
@@ -0,0 +1,136 @@
|
||||
package swift
|
||||
|
||||
import (
|
||||
"os"
|
||||
)
|
||||
|
||||
// DynamicLargeObjectCreateFile represents an open dynamic large object
// (the original comment said "static", contradicting the type name).
// It embeds largeObjectCreateFile and overrides Close/Flush to write a
// DLO manifest (X-Object-Manifest) rather than an SLO one.
type DynamicLargeObjectCreateFile struct {
	largeObjectCreateFile
}
|
||||
|
||||
// DynamicLargeObjectCreateFile creates a dynamic large object
|
||||
// returning an object which satisfies io.Writer, io.Seeker, io.Closer
|
||||
// and io.ReaderFrom. The flags are as passes to the
|
||||
// largeObjectCreate method.
|
||||
func (c *Connection) DynamicLargeObjectCreateFile(opts *LargeObjectOpts) (LargeObjectFile, error) {
|
||||
lo, err := c.largeObjectCreate(opts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return withBuffer(opts, &DynamicLargeObjectCreateFile{
|
||||
largeObjectCreateFile: *lo,
|
||||
}), nil
|
||||
}
|
||||
|
||||
// DynamicLargeObjectCreate creates or truncates an existing dynamic
|
||||
// large object returning a writeable object. This sets opts.Flags to
|
||||
// an appropriate value before calling DynamicLargeObjectCreateFile
|
||||
func (c *Connection) DynamicLargeObjectCreate(opts *LargeObjectOpts) (LargeObjectFile, error) {
|
||||
opts.Flags = os.O_TRUNC | os.O_CREATE
|
||||
return c.DynamicLargeObjectCreateFile(opts)
|
||||
}
|
||||
|
||||
// DynamicLargeObjectDelete deletes a dynamic large object and all of its segments.
|
||||
func (c *Connection) DynamicLargeObjectDelete(container string, path string) error {
|
||||
return c.LargeObjectDelete(container, path)
|
||||
}
|
||||
|
||||
// DynamicLargeObjectMove moves a dynamic large object from srcContainer, srcObjectName to dstContainer, dstObjectName
|
||||
func (c *Connection) DynamicLargeObjectMove(srcContainer string, srcObjectName string, dstContainer string, dstObjectName string) error {
|
||||
info, headers, err := c.Object(dstContainer, srcObjectName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
segmentContainer, segmentPath := parseFullPath(headers["X-Object-Manifest"])
|
||||
if err := c.createDLOManifest(dstContainer, dstObjectName, segmentContainer+"/"+segmentPath, info.ContentType); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := c.ObjectDelete(srcContainer, srcObjectName); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// createDLOManifest creates a dynamic large object manifest
|
||||
func (c *Connection) createDLOManifest(container string, objectName string, prefix string, contentType string) error {
|
||||
headers := make(Headers)
|
||||
headers["X-Object-Manifest"] = prefix
|
||||
manifest, err := c.ObjectCreate(container, objectName, false, "", contentType, headers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := manifest.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close satisfies the io.Closer interface
|
||||
func (file *DynamicLargeObjectCreateFile) Close() error {
|
||||
return file.Flush()
|
||||
}
|
||||
|
||||
func (file *DynamicLargeObjectCreateFile) Flush() error {
|
||||
err := file.conn.createDLOManifest(file.container, file.objectName, file.segmentContainer+"/"+file.prefix, file.contentType)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return file.conn.waitForSegmentsToShowUp(file.container, file.objectName, file.Size())
|
||||
}
|
||||
|
||||
// getAllDLOSegments lists all segment objects of a DLO, compensating for
// Swift's eventual consistency: after the container listing it probes for
// sequentially-numbered segments that the listing may not yet include.
// It returns the segments in segment-number order.
func (c *Connection) getAllDLOSegments(segmentContainer, segmentPath string) ([]Object, error) {
	//a simple container listing works 99.9% of the time
	segments, err := c.ObjectsAll(segmentContainer, &ObjectsOpts{Prefix: segmentPath})
	if err != nil {
		return nil, err
	}

	// Set of names already present in the listing, for O(1) membership tests.
	hasObjectName := make(map[string]struct{})
	for _, segment := range segments {
		hasObjectName[segment.Name] = struct{}{}
	}

	//The container listing might be outdated (i.e. not contain all existing
	//segment objects yet) because of temporary inconsistency (Swift is only
	//eventually consistent!). Check its completeness.
	segmentNumber := 0
	for {
		segmentNumber++
		segmentName := getSegment(segmentPath, segmentNumber)
		if _, seen := hasObjectName[segmentName]; seen {
			continue
		}

		//This segment is missing in the container listing. Use a more reliable
		//request to check its existence. (HEAD requests on segments are
		//guaranteed to return the correct metadata, except for the pathological
		//case of an outage of large parts of the Swift cluster or its network,
		//since every segment is only written once.)
		segment, _, err := c.Object(segmentContainer, segmentName)
		switch err {
		case nil:
			//found new segment -> add it in the correct position and keep
			//going, more might be missing
			if segmentNumber <= len(segments) {
				// Insert at index segmentNumber-1: duplicate the element there,
				// shifting the tail right by one, then overwrite the slot.
				segments = append(segments[:segmentNumber], segments[segmentNumber-1:]...)
				segments[segmentNumber-1] = segment
			} else {
				segments = append(segments, segment)
			}
			continue
		case ObjectNotFound:
			//This segment is missing. Since we upload segments sequentially,
			//there won't be any more segments after it.
			return segments, nil
		default:
			return nil, err //unexpected error
		}
	}
}
|
||||
445
vendor/github.com/ncw/swift/largeobjects.go
generated
vendored
Normal file
445
vendor/github.com/ncw/swift/largeobjects.go
generated
vendored
Normal file
@@ -0,0 +1,445 @@
|
||||
package swift
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"crypto/rand"
|
||||
"crypto/sha1"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
gopath "path"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// NotLargeObject is returned if an operation is performed on an object which isn't large.
var NotLargeObject = errors.New("Not a large object")

// readAfterWriteTimeout defines the total time we wait for an object to
// appear (with the expected size) after having been uploaded.
var readAfterWriteTimeout = 15 * time.Second

// readAfterWriteWait defines the initial time to sleep between two retries;
// the wait is doubled after each attempt (see withLORetry).
var readAfterWriteWait = 200 * time.Millisecond
|
||||
|
||||
// largeObjectCreateFile represents an open static or dynamic large object.
// It is embedded by DynamicLargeObjectCreateFile and StaticLargeObjectCreateFile,
// which supply the manifest-writing Close/Flush.
type largeObjectCreateFile struct {
	conn             *Connection // connection used for all segment and manifest requests
	container        string      // container holding the manifest object
	objectName       string      // name of the manifest object
	currentLength    int64       // total size of all known segments, in bytes
	filePos          int64       // current write position (moved by Seek/Write)
	chunkSize        int64       // target segment size; defaulted to 10MB in largeObjectCreate
	segmentContainer string      // container holding the segment objects
	prefix           string      // path prefix under which segments are named (see getSegment)
	contentType      string      // Content-Type used when uploading segments
	checkHash        bool        // carried from opts.CheckHash — NOTE(review): writeSegment currently always passes true to ObjectPut; confirm intent
	segments         []Object    // segments known so far, in segment-number order
	headers          Headers     // additional headers to upload the object with
	minChunkSize     int64       // lower bound on chunk size (e.g. SLO min_segment_size)
}
|
||||
|
||||
// swiftSegmentPath derives a pseudo-random path of the form
// "segments/xxx/yyyy…" under which the segments of path are stored.
//
// NOTE(review): checksum.Sum is called without anything having been
// written to the hash, so the result is hex(path || 32 random bytes ||
// SHA1("")). Output is still unique thanks to the random bytes, but this
// looks like it intended checksum.Write(...) then Sum(nil) — confirm
// before changing, since existing objects depend on the current shape.
func swiftSegmentPath(path string) (string, error) {
	checksum := sha1.New()
	random := make([]byte, 32)
	if _, err := rand.Read(random); err != nil {
		return "", err
	}
	path = hex.EncodeToString(checksum.Sum(append([]byte(path), random...)))
	return strings.TrimLeft(strings.TrimRight("segments/"+path[0:3]+"/"+path[3:], "/"), "/"), nil
}
|
||||
|
||||
// getSegment returns the object name of segment number partNumber under
// segmentPath. Numbers are zero-padded to 16 digits so lexicographic
// listing order matches numeric order.
func getSegment(segmentPath string, partNumber int) string {
	return segmentPath + "/" + fmt.Sprintf("%016d", partNumber)
}
|
||||
|
||||
// parseFullPath splits a "container/object-prefix" manifest value at the
// first slash. If there is no slash the whole string is the container and
// the prefix is empty.
func parseFullPath(manifest string) (container string, prefix string) {
	if i := strings.Index(manifest, "/"); i >= 0 {
		return manifest[:i], manifest[i+1:]
	}
	return manifest, ""
}
|
||||
|
||||
func (headers Headers) IsLargeObjectDLO() bool {
|
||||
_, isDLO := headers["X-Object-Manifest"]
|
||||
return isDLO
|
||||
}
|
||||
|
||||
func (headers Headers) IsLargeObjectSLO() bool {
|
||||
_, isSLO := headers["X-Static-Large-Object"]
|
||||
return isSLO
|
||||
}
|
||||
|
||||
func (headers Headers) IsLargeObject() bool {
|
||||
return headers.IsLargeObjectSLO() || headers.IsLargeObjectDLO()
|
||||
}
|
||||
|
||||
func (c *Connection) getAllSegments(container string, path string, headers Headers) (string, []Object, error) {
|
||||
if manifest, isDLO := headers["X-Object-Manifest"]; isDLO {
|
||||
segmentContainer, segmentPath := parseFullPath(manifest)
|
||||
segments, err := c.getAllDLOSegments(segmentContainer, segmentPath)
|
||||
return segmentContainer, segments, err
|
||||
}
|
||||
if headers.IsLargeObjectSLO() {
|
||||
return c.getAllSLOSegments(container, path)
|
||||
}
|
||||
return "", nil, NotLargeObject
|
||||
}
|
||||
|
||||
// LargeObjectOpts describes how a large object should be created.
// Flags accepts os.O_TRUNC (replace existing contents) and os.O_APPEND
// (continue writing at the current end) — see largeObjectCreate.
type LargeObjectOpts struct {
	Container        string  // Name of container to place object
	ObjectName       string  // Name of object
	Flags            int     // Creation flags (os.O_TRUNC / os.O_APPEND)
	CheckHash        bool    // If set Check the hash
	Hash             string  // If set use this hash to check
	ContentType      string  // Content-Type of the object
	Headers          Headers // Additional headers to upload the object with
	ChunkSize        int64   // Size of chunks of the object, defaults to 10MB if not set
	MinChunkSize     int64   // Minimum chunk size, automatically set for SLO's based on info
	SegmentContainer string  // Name of the container to place segments
	SegmentPrefix    string  // Prefix to use for the segments
	NoBuffer         bool    // Prevents using a bufio.Writer to write segments
}
|
||||
|
||||
// LargeObjectFile is the writeable handle returned by the large-object
// create functions. In addition to Write/Seek/Close it exposes the
// current logical size and an explicit manifest Flush.
type LargeObjectFile interface {
	io.Writer
	io.Seeker
	io.Closer
	// Size returns the current total size of the object in bytes.
	Size() int64
	// Flush commits the manifest describing the uploaded segments.
	Flush() error
}
|
||||
|
||||
// largeObjectCreate creates a large object at opts.Container, opts.ObjectName.
//
// opts.Flags can have the following bits set
//
//	os.TRUNC  - remove the contents of the large object if it exists
//	os.APPEND - write at the end of the large object
//
// If the object already exists and is not truncated, its existing segments
// are loaded so writes can resume/overwrite in place; a pre-existing plain
// object is first moved to become segment #1.
func (c *Connection) largeObjectCreate(opts *LargeObjectOpts) (*largeObjectCreateFile, error) {
	var (
		segmentPath      string
		segmentContainer string
		segments         []Object
		currentLength    int64
		err              error
	)

	// Segment prefix: caller-supplied, or a random "segments/…" path.
	if opts.SegmentPrefix != "" {
		segmentPath = opts.SegmentPrefix
	} else if segmentPath, err = swiftSegmentPath(opts.ObjectName); err != nil {
		return nil, err
	}

	if info, headers, err := c.Object(opts.Container, opts.ObjectName); err == nil {
		if opts.Flags&os.O_TRUNC != 0 {
			// NOTE(review): deletion error is deliberately(?) ignored —
			// presumably best-effort truncation; confirm before changing.
			c.LargeObjectDelete(opts.Container, opts.ObjectName)
		} else {
			currentLength = info.Bytes
			if headers.IsLargeObject() {
				segmentContainer, segments, err = c.getAllSegments(opts.Container, opts.ObjectName, headers)
				if err != nil {
					return nil, err
				}
				if len(segments) > 0 {
					// Reuse the prefix the existing segments already live under.
					segmentPath = gopath.Dir(segments[0].Name)
				}
			} else {
				// Plain object: turn its current contents into segment #1.
				if err = c.ObjectMove(opts.Container, opts.ObjectName, opts.Container, getSegment(segmentPath, 1)); err != nil {
					return nil, err
				}
				segments = append(segments, info)
			}
		}
	} else if err != ObjectNotFound {
		return nil, err
	}

	// segmentContainer is not empty when the manifest already existed
	if segmentContainer == "" {
		if opts.SegmentContainer != "" {
			segmentContainer = opts.SegmentContainer
		} else {
			segmentContainer = opts.Container + "_segments"
		}
	}

	file := &largeObjectCreateFile{
		conn:             c,
		checkHash:        opts.CheckHash,
		container:        opts.Container,
		objectName:       opts.ObjectName,
		chunkSize:        opts.ChunkSize,
		minChunkSize:     opts.MinChunkSize,
		headers:          opts.Headers,
		segmentContainer: segmentContainer,
		prefix:           segmentPath,
		segments:         segments,
		currentLength:    currentLength,
	}

	// Default chunk size: 10 MiB.
	if file.chunkSize == 0 {
		file.chunkSize = 10 * 1024 * 1024
	}

	// Never chunk below the cluster's minimum segment size.
	if file.minChunkSize > file.chunkSize {
		file.chunkSize = file.minChunkSize
	}

	if opts.Flags&os.O_APPEND != 0 {
		file.filePos = currentLength
	}

	return file, nil
}
|
||||
|
||||
// LargeObjectDelete deletes the large object named by container, path,
// including all of its segments. Plain objects are deleted too (the
// segment list is then empty). Uses the bulk-delete middleware when the
// cluster advertises it, otherwise deletes one object at a time.
func (c *Connection) LargeObjectDelete(container string, objectName string) error {
	_, headers, err := c.Object(container, objectName)
	if err != nil {
		return err
	}

	// Collect {container, name} pairs: all segments first, manifest last,
	// so a partial failure never leaves dangling segments behind a live manifest.
	var objects [][]string
	if headers.IsLargeObject() {
		segmentContainer, segments, err := c.getAllSegments(container, objectName, headers)
		if err != nil {
			return err
		}
		for _, obj := range segments {
			objects = append(objects, []string{segmentContainer, obj.Name})
		}
	}
	objects = append(objects, []string{container, objectName})

	info, err := c.cachedQueryInfo()
	if err == nil && info.SupportsBulkDelete() && len(objects) > 0 {
		filenames := make([]string, len(objects))
		for i, obj := range objects {
			filenames[i] = obj[0] + "/" + obj[1]
		}
		_, err = c.doBulkDelete(filenames)
		// Don't fail on ObjectNotFound because eventual consistency
		// makes this situation normal.
		if err != nil && err != Forbidden && err != ObjectNotFound {
			return err
		}
	} else {
		for _, obj := range objects {
			if err := c.ObjectDelete(obj[0], obj[1]); err != nil {
				return err
			}
		}
	}

	return nil
}
|
||||
|
||||
// LargeObjectGetSegments returns all the segments that compose an object
|
||||
// If the object is a Dynamic Large Object (DLO), it just returns the objects
|
||||
// that have the prefix as indicated by the manifest.
|
||||
// If the object is a Static Large Object (SLO), it retrieves the JSON content
|
||||
// of the manifest and return all the segments of it.
|
||||
func (c *Connection) LargeObjectGetSegments(container string, path string) (string, []Object, error) {
|
||||
_, headers, err := c.Object(container, path)
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
}
|
||||
|
||||
return c.getAllSegments(container, path, headers)
|
||||
}
|
||||
|
||||
// Seek sets the offset for the next write operation
|
||||
func (file *largeObjectCreateFile) Seek(offset int64, whence int) (int64, error) {
|
||||
switch whence {
|
||||
case 0:
|
||||
file.filePos = offset
|
||||
case 1:
|
||||
file.filePos += offset
|
||||
case 2:
|
||||
file.filePos = file.currentLength + offset
|
||||
default:
|
||||
return -1, fmt.Errorf("invalid value for whence")
|
||||
}
|
||||
if file.filePos < 0 {
|
||||
return -1, fmt.Errorf("negative offset")
|
||||
}
|
||||
return file.filePos, nil
|
||||
}
|
||||
|
||||
func (file *largeObjectCreateFile) Size() int64 {
|
||||
return file.currentLength
|
||||
}
|
||||
|
||||
// withLORetry calls fn until it reports an object of the expected size,
// with exponential backoff (starting at readAfterWriteWait, doubling each
// attempt) up to readAfterWriteTimeout. It compensates for Swift's
// eventual consistency: a freshly written DLO manifest may briefly report
// a stale size. Non-DLO objects are accepted immediately, as is any
// error returned by fn.
func withLORetry(expectedSize int64, fn func() (Headers, int64, error)) (err error) {
	waitingTime := readAfterWriteWait
	endTimer := time.After(readAfterWriteTimeout)
	for {
		var headers Headers
		var sz int64
		if headers, sz, err = fn(); err == nil {
			// Done when: not a DLO (size is authoritative), or the size
			// matches; expectedSize==0 accepts any non-zero size.
			if !headers.IsLargeObjectDLO() || (expectedSize == 0 && sz > 0) || expectedSize == sz {
				return
			}
		} else {
			return
		}
		select {
		case <-endTimer:
			err = fmt.Errorf("Timeout expired while waiting for object to have size == %d, got: %d", expectedSize, sz)
			return
		case <-time.After(waitingTime):
			waitingTime *= 2 // exponential backoff
		}
	}
}
|
||||
|
||||
func (c *Connection) waitForSegmentsToShowUp(container, objectName string, expectedSize int64) (err error) {
|
||||
err = withLORetry(expectedSize, func() (Headers, int64, error) {
|
||||
var info Object
|
||||
var headers Headers
|
||||
info, headers, err = c.objectBase(container, objectName)
|
||||
if err != nil {
|
||||
return headers, 0, err
|
||||
}
|
||||
return headers, info.Bytes, nil
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
// Write satisfies the io.Writer interface. It locates the segment
// containing the current file position, then (re)writes whole segments
// until buf is consumed, finally recomputing the total object length.
func (file *largeObjectCreateFile) Write(buf []byte) (int, error) {
	var sz int64             // running byte offset of the segment being examined
	var relativeFilePos int  // offset of filePos inside the target segment
	writeSegmentIdx := 0     // index of the first segment to (re)write
	for i, obj := range file.segments {
		// Stop at the segment covering filePos; the last segment also
		// absorbs writes landing within minChunkSize past its start.
		if file.filePos < sz+obj.Bytes || (i == len(file.segments)-1 && file.filePos < sz+file.minChunkSize) {
			relativeFilePos = int(file.filePos - sz)
			break
		}
		writeSegmentIdx++
		sz += obj.Bytes
	}
	sizeToWrite := len(buf)
	for offset := 0; offset < sizeToWrite; {
		// writeSegment consumes up to one segment's worth of buf.
		newSegment, n, err := file.writeSegment(buf[offset:], writeSegmentIdx, relativeFilePos)
		if err != nil {
			return 0, err
		}
		if writeSegmentIdx < len(file.segments) {
			file.segments[writeSegmentIdx] = *newSegment
		} else {
			file.segments = append(file.segments, *newSegment)
		}
		offset += n
		writeSegmentIdx++
		relativeFilePos = 0 // only the first segment is written mid-way
	}
	file.filePos += int64(sizeToWrite)
	// Recompute the total length from scratch: overwrites may have changed
	// individual segment sizes.
	file.currentLength = 0
	for _, obj := range file.segments {
		file.currentLength += obj.Bytes
	}
	return sizeToWrite, nil
}
|
||||
|
||||
// writeSegment uploads one segment object, splicing buf into any existing
// segment at writeSegmentIdx: it preserves the existing bytes before
// relativeFilePos (head) and after the written range (tail) by streaming
// them from the server with Range requests. Returns the resulting segment
// metadata and the number of bytes of buf consumed.
func (file *largeObjectCreateFile) writeSegment(buf []byte, writeSegmentIdx int, relativeFilePos int) (*Object, int, error) {
	var (
		readers         []io.Reader // concatenated: [head,] new data [, tail]
		existingSegment *Object
		segmentSize     int
	)
	segmentName := getSegment(file.prefix, writeSegmentIdx+1)
	sizeToRead := int(file.chunkSize)
	if writeSegmentIdx < len(file.segments) {
		existingSegment = &file.segments[writeSegmentIdx]
		// Interior segments keep their current size; only the last segment
		// may grow up to chunkSize.
		if writeSegmentIdx != len(file.segments)-1 {
			sizeToRead = int(existingSegment.Bytes)
		}
		if relativeFilePos > 0 {
			// Preserve the existing head [0, relativeFilePos).
			headers := make(Headers)
			headers["Range"] = "bytes=0-" + strconv.FormatInt(int64(relativeFilePos-1), 10)
			existingSegmentReader, _, err := file.conn.ObjectOpen(file.segmentContainer, segmentName, true, headers)
			if err != nil {
				return nil, 0, err
			}
			defer existingSegmentReader.Close()
			sizeToRead -= relativeFilePos
			segmentSize += relativeFilePos
			readers = []io.Reader{existingSegmentReader}
		}
	}
	if sizeToRead > len(buf) {
		sizeToRead = len(buf)
	}
	segmentSize += sizeToRead
	readers = append(readers, bytes.NewReader(buf[:sizeToRead]))
	if existingSegment != nil && segmentSize < int(existingSegment.Bytes) {
		// Preserve the existing tail [segmentSize, end).
		headers := make(Headers)
		headers["Range"] = "bytes=" + strconv.FormatInt(int64(segmentSize), 10) + "-"
		tailSegmentReader, _, err := file.conn.ObjectOpen(file.segmentContainer, segmentName, true, headers)
		if err != nil {
			return nil, 0, err
		}
		defer tailSegmentReader.Close()
		segmentSize = int(existingSegment.Bytes)
		readers = append(readers, tailSegmentReader)
	}
	segmentReader := io.MultiReader(readers...)
	headers, err := file.conn.ObjectPut(file.segmentContainer, segmentName, segmentReader, true, "", file.contentType, nil)
	if err != nil {
		return nil, 0, err
	}
	return &Object{Name: segmentName, Bytes: int64(segmentSize), Hash: headers["Etag"]}, sizeToRead, nil
}
|
||||
|
||||
func withBuffer(opts *LargeObjectOpts, lo LargeObjectFile) LargeObjectFile {
|
||||
if !opts.NoBuffer {
|
||||
return &bufferedLargeObjectFile{
|
||||
LargeObjectFile: lo,
|
||||
bw: bufio.NewWriterSize(lo, int(opts.ChunkSize)),
|
||||
}
|
||||
}
|
||||
return lo
|
||||
}
|
||||
|
||||
// bufferedLargeObjectFile wraps a LargeObjectFile behind a bufio.Writer
// so that many small Writes coalesce into chunk-sized segment uploads.
// Seek, Close, Flush and Size all account for the buffered bytes.
type bufferedLargeObjectFile struct {
	LargeObjectFile
	bw *bufio.Writer // buffers writes before they reach LargeObjectFile.Write
}
|
||||
|
||||
func (blo *bufferedLargeObjectFile) Close() error {
|
||||
err := blo.bw.Flush()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return blo.LargeObjectFile.Close()
|
||||
}
|
||||
|
||||
func (blo *bufferedLargeObjectFile) Write(p []byte) (n int, err error) {
|
||||
return blo.bw.Write(p)
|
||||
}
|
||||
|
||||
func (blo *bufferedLargeObjectFile) Seek(offset int64, whence int) (int64, error) {
|
||||
err := blo.bw.Flush()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return blo.LargeObjectFile.Seek(offset, whence)
|
||||
}
|
||||
|
||||
func (blo *bufferedLargeObjectFile) Size() int64 {
|
||||
return blo.LargeObjectFile.Size() + int64(blo.bw.Buffered())
|
||||
}
|
||||
|
||||
func (blo *bufferedLargeObjectFile) Flush() error {
|
||||
err := blo.bw.Flush()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return blo.LargeObjectFile.Flush()
|
||||
}
|
||||
168
vendor/github.com/ncw/swift/slo.go
generated
vendored
Normal file
168
vendor/github.com/ncw/swift/slo.go
generated
vendored
Normal file
@@ -0,0 +1,168 @@
|
||||
package swift
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/url"
|
||||
"os"
|
||||
)
|
||||
|
||||
// StaticLargeObjectCreateFile represents an open static large object.
// It embeds largeObjectCreateFile and overrides Close/Flush to upload a
// JSON SLO manifest (multipart-manifest=put).
type StaticLargeObjectCreateFile struct {
	largeObjectCreateFile
}
|
||||
|
||||
// SLONotSupported is returned when the cluster does not advertise Static
// Large Object support in its /info document (see SwiftInfo.SupportsSLO).
var SLONotSupported = errors.New("SLO not supported")
|
||||
|
||||
// swiftSegment is one entry of an SLO manifest, both as uploaded and as
// retrieved from the server.
type swiftSegment struct {
	Path string `json:"path,omitempty"`
	Etag string `json:"etag,omitempty"`
	Size int64  `json:"size_bytes,omitempty"`
	// When uploading a manifest, the attributes must be named `path`, `etag` and `size_bytes`
	// but when querying the JSON content of a manifest with the `multipart-manifest=get`
	// parameter, Swift names those attributes `name`, `hash` and `bytes`.
	// We use all the different attributes names in this structure to be able to use
	// the same structure for both uploading and retrieving.
	Name         string `json:"name,omitempty"`
	Hash         string `json:"hash,omitempty"`
	Bytes        int64  `json:"bytes,omitempty"`
	ContentType  string `json:"content_type,omitempty"`
	LastModified string `json:"last_modified,omitempty"`
}
|
||||
|
||||
// StaticLargeObjectCreateFile creates a static large object returning
|
||||
// an object which satisfies io.Writer, io.Seeker, io.Closer and
|
||||
// io.ReaderFrom. The flags are as passed to the largeObjectCreate
|
||||
// method.
|
||||
func (c *Connection) StaticLargeObjectCreateFile(opts *LargeObjectOpts) (LargeObjectFile, error) {
|
||||
info, err := c.cachedQueryInfo()
|
||||
if err != nil || !info.SupportsSLO() {
|
||||
return nil, SLONotSupported
|
||||
}
|
||||
realMinChunkSize := info.SLOMinSegmentSize()
|
||||
if realMinChunkSize > opts.MinChunkSize {
|
||||
opts.MinChunkSize = realMinChunkSize
|
||||
}
|
||||
lo, err := c.largeObjectCreate(opts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return withBuffer(opts, &StaticLargeObjectCreateFile{
|
||||
largeObjectCreateFile: *lo,
|
||||
}), nil
|
||||
}
|
||||
|
||||
// StaticLargeObjectCreate creates or truncates an existing static
|
||||
// large object returning a writeable object. This sets opts.Flags to
|
||||
// an appropriate value before calling StaticLargeObjectCreateFile
|
||||
func (c *Connection) StaticLargeObjectCreate(opts *LargeObjectOpts) (LargeObjectFile, error) {
|
||||
opts.Flags = os.O_TRUNC | os.O_CREATE
|
||||
return c.StaticLargeObjectCreateFile(opts)
|
||||
}
|
||||
|
||||
// StaticLargeObjectDelete deletes a static large object and all of its segments.
|
||||
func (c *Connection) StaticLargeObjectDelete(container string, path string) error {
|
||||
info, err := c.cachedQueryInfo()
|
||||
if err != nil || !info.SupportsSLO() {
|
||||
return SLONotSupported
|
||||
}
|
||||
return c.LargeObjectDelete(container, path)
|
||||
}
|
||||
|
||||
// StaticLargeObjectMove moves a static large object from srcContainer, srcObjectName to dstContainer, dstObjectName
|
||||
func (c *Connection) StaticLargeObjectMove(srcContainer string, srcObjectName string, dstContainer string, dstObjectName string) error {
|
||||
swiftInfo, err := c.cachedQueryInfo()
|
||||
if err != nil || !swiftInfo.SupportsSLO() {
|
||||
return SLONotSupported
|
||||
}
|
||||
info, headers, err := c.Object(srcContainer, srcObjectName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
container, segments, err := c.getAllSegments(srcContainer, srcObjectName, headers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := c.createSLOManifest(dstContainer, dstObjectName, info.ContentType, container, segments); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := c.ObjectDelete(srcContainer, srcObjectName); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// createSLOManifest creates a static large object manifest
|
||||
func (c *Connection) createSLOManifest(container string, path string, contentType string, segmentContainer string, segments []Object) error {
|
||||
sloSegments := make([]swiftSegment, len(segments))
|
||||
for i, segment := range segments {
|
||||
sloSegments[i].Path = fmt.Sprintf("%s/%s", segmentContainer, segment.Name)
|
||||
sloSegments[i].Etag = segment.Hash
|
||||
sloSegments[i].Size = segment.Bytes
|
||||
}
|
||||
|
||||
content, err := json.Marshal(sloSegments)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
values := url.Values{}
|
||||
values.Set("multipart-manifest", "put")
|
||||
if _, err := c.objectPut(container, path, bytes.NewBuffer(content), false, "", contentType, nil, values); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (file *StaticLargeObjectCreateFile) Close() error {
|
||||
return file.Flush()
|
||||
}
|
||||
|
||||
func (file *StaticLargeObjectCreateFile) Flush() error {
|
||||
if err := file.conn.createSLOManifest(file.container, file.objectName, file.contentType, file.segmentContainer, file.segments); err != nil {
|
||||
return err
|
||||
}
|
||||
return file.conn.waitForSegmentsToShowUp(file.container, file.objectName, file.Size())
|
||||
}
|
||||
|
||||
func (c *Connection) getAllSLOSegments(container, path string) (string, []Object, error) {
|
||||
var (
|
||||
segmentList []swiftSegment
|
||||
segments []Object
|
||||
segPath string
|
||||
segmentContainer string
|
||||
)
|
||||
|
||||
values := url.Values{}
|
||||
values.Set("multipart-manifest", "get")
|
||||
|
||||
file, _, err := c.objectOpen(container, path, true, nil, values)
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
}
|
||||
|
||||
content, err := ioutil.ReadAll(file)
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
}
|
||||
|
||||
json.Unmarshal(content, &segmentList)
|
||||
for _, segment := range segmentList {
|
||||
segmentContainer, segPath = parseFullPath(segment.Name[1:])
|
||||
segments = append(segments, Object{
|
||||
Name: segPath,
|
||||
Bytes: segment.Bytes,
|
||||
Hash: segment.Hash,
|
||||
})
|
||||
}
|
||||
|
||||
return segmentContainer, segments, nil
|
||||
}
|
||||
277
vendor/github.com/ncw/swift/swift.go
generated
vendored
277
vendor/github.com/ncw/swift/swift.go
generated
vendored
@@ -33,6 +33,17 @@ const (
|
||||
allObjectsChanLimit = 1000 // ...when fetching to a channel
|
||||
)
|
||||
|
||||
// ObjectType is the type of the swift object, regular, static large,
// or dynamic large.
type ObjectType int

// Values that ObjectType can take
const (
	RegularObjectType      ObjectType = iota // plain object, not a large-object manifest
	StaticLargeObjectType                    // SLO manifest (X-Static-Large-Object header)
	DynamicLargeObjectType                   // DLO manifest (X-Object-Manifest header)
)
|
||||
|
||||
// Connection holds the details of the connection to the swift server.
|
||||
//
|
||||
// You need to provide UserName, ApiKey and AuthUrl when you create a
|
||||
@@ -108,6 +119,8 @@ type Connection struct {
|
||||
client *http.Client
|
||||
Auth Authenticator `json:"-" xml:"-"` // the current authenticator
|
||||
authLock sync.Mutex // lock when R/W StorageUrl, AuthToken, Auth
|
||||
// swiftInfo is filled after QueryInfo is called
|
||||
swiftInfo SwiftInfo
|
||||
}
|
||||
|
||||
// Error - all errors generated by this package are of this type. Other error
|
||||
@@ -406,6 +419,24 @@ func (c *Connection) authenticated() bool {
|
||||
// the enabled middlewares and their configuration
|
||||
type SwiftInfo map[string]interface{}
|
||||
|
||||
// SupportsBulkDelete reports whether the cluster advertises the
// bulk_delete middleware in its /info document.
func (i SwiftInfo) SupportsBulkDelete() bool {
	_, val := i["bulk_delete"] // presence of the key is the capability flag
	return val
}

// SupportsSLO reports whether the cluster advertises the slo (static
// large object) middleware in its /info document.
func (i SwiftInfo) SupportsSLO() bool {
	_, val := i["slo"]
	return val
}

// SLOMinSegmentSize returns the cluster's minimum SLO segment size in
// bytes, or 1 when the slo capability block is absent or malformed.
func (i SwiftInfo) SLOMinSegmentSize() int64 {
	if slo, ok := i["slo"].(map[string]interface{}); ok {
		// JSON numbers decode as float64; a missing key yields 0.
		val, _ := slo["min_segment_size"].(float64)
		return int64(val)
	}
	return 1
}
|
||||
|
||||
// Discover Swift configuration by doing a request against /info
|
||||
func (c *Connection) QueryInfo() (infos SwiftInfo, err error) {
|
||||
infoUrl, err := url.Parse(c.StorageUrl)
|
||||
@@ -413,14 +444,36 @@ func (c *Connection) QueryInfo() (infos SwiftInfo, err error) {
|
||||
return nil, err
|
||||
}
|
||||
infoUrl.Path = path.Join(infoUrl.Path, "..", "..", "info")
|
||||
resp, err := http.Get(infoUrl.String())
|
||||
resp, err := c.client.Get(infoUrl.String())
|
||||
if err == nil {
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
resp.Body.Close()
|
||||
return nil, fmt.Errorf("Invalid status code for info request: %d", resp.StatusCode)
|
||||
}
|
||||
err = readJson(resp, &infos)
|
||||
if err == nil {
|
||||
c.authLock.Lock()
|
||||
c.swiftInfo = infos
|
||||
c.authLock.Unlock()
|
||||
}
|
||||
return infos, err
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// cachedQueryInfo returns the SwiftInfo stored by a previous QueryInfo
// call, issuing (and caching, via QueryInfo itself) a fresh /info request
// on first use. Concurrent first callers may each trigger a fetch — the
// lock only guards the read of c.swiftInfo — which is benign duplication.
func (c *Connection) cachedQueryInfo() (infos SwiftInfo, err error) {
	c.authLock.Lock()
	infos = c.swiftInfo
	c.authLock.Unlock()
	if infos == nil {
		infos, err = c.QueryInfo()
		if err != nil {
			return
		}
	}
	return infos, nil
}
|
||||
|
||||
// RequestOpts contains parameters for Connection.storage.
|
||||
type RequestOpts struct {
|
||||
Container string
|
||||
@@ -796,14 +849,15 @@ func (c *Connection) ObjectNames(container string, opts *ObjectsOpts) ([]string,
|
||||
|
||||
// Object contains information about an object
|
||||
type Object struct {
|
||||
Name string `json:"name"` // object name
|
||||
ContentType string `json:"content_type"` // eg application/directory
|
||||
Bytes int64 `json:"bytes"` // size in bytes
|
||||
ServerLastModified string `json:"last_modified"` // Last modified time, eg '2011-06-30T08:20:47.736680' as a string supplied by the server
|
||||
LastModified time.Time // Last modified time converted to a time.Time
|
||||
Hash string `json:"hash"` // MD5 hash, eg "d41d8cd98f00b204e9800998ecf8427e"
|
||||
PseudoDirectory bool // Set when using delimiter to show that this directory object does not really exist
|
||||
SubDir string `json:"subdir"` // returned only when using delimiter to mark "pseudo directories"
|
||||
Name string `json:"name"` // object name
|
||||
ContentType string `json:"content_type"` // eg application/directory
|
||||
Bytes int64 `json:"bytes"` // size in bytes
|
||||
ServerLastModified string `json:"last_modified"` // Last modified time, eg '2011-06-30T08:20:47.736680' as a string supplied by the server
|
||||
LastModified time.Time // Last modified time converted to a time.Time
|
||||
Hash string `json:"hash"` // MD5 hash, eg "d41d8cd98f00b204e9800998ecf8427e"
|
||||
PseudoDirectory bool // Set when using delimiter to show that this directory object does not really exist
|
||||
SubDir string `json:"subdir"` // returned only when using delimiter to mark "pseudo directories"
|
||||
ObjectType ObjectType // type of this object
|
||||
}
|
||||
|
||||
// Objects returns a slice of Object with information about each
|
||||
@@ -1215,7 +1269,7 @@ func (c *Connection) ObjectCreate(container string, objectName string, checkHash
|
||||
}
|
||||
// Run the PUT in the background piping it data
|
||||
go func() {
|
||||
file.resp, file.headers, file.err = c.storage(RequestOpts{
|
||||
opts := RequestOpts{
|
||||
Container: container,
|
||||
ObjectName: objectName,
|
||||
Operation: "PUT",
|
||||
@@ -1223,7 +1277,8 @@ func (c *Connection) ObjectCreate(container string, objectName string, checkHash
|
||||
Body: pipeReader,
|
||||
NoResponse: true,
|
||||
ErrorMap: objectErrorMap,
|
||||
})
|
||||
}
|
||||
file.resp, file.headers, file.err = c.storage(opts)
|
||||
// Signal finished
|
||||
pipeReader.Close()
|
||||
close(file.done)
|
||||
@@ -1231,6 +1286,37 @@ func (c *Connection) ObjectCreate(container string, objectName string, checkHash
|
||||
return
|
||||
}
|
||||
|
||||
func (c *Connection) objectPut(container string, objectName string, contents io.Reader, checkHash bool, Hash string, contentType string, h Headers, parameters url.Values) (headers Headers, err error) {
|
||||
extraHeaders := objectPutHeaders(objectName, &checkHash, Hash, contentType, h)
|
||||
hash := md5.New()
|
||||
var body io.Reader = contents
|
||||
if checkHash {
|
||||
body = io.TeeReader(contents, hash)
|
||||
}
|
||||
_, headers, err = c.storage(RequestOpts{
|
||||
Container: container,
|
||||
ObjectName: objectName,
|
||||
Operation: "PUT",
|
||||
Headers: extraHeaders,
|
||||
Body: body,
|
||||
NoResponse: true,
|
||||
ErrorMap: objectErrorMap,
|
||||
Parameters: parameters,
|
||||
})
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if checkHash {
|
||||
receivedMd5 := strings.ToLower(headers["Etag"])
|
||||
calculatedMd5 := fmt.Sprintf("%x", hash.Sum(nil))
|
||||
if receivedMd5 != calculatedMd5 {
|
||||
err = ObjectCorrupted
|
||||
return
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// ObjectPut creates or updates the path in the container from
|
||||
// contents. contents should be an open io.Reader which will have all
|
||||
// its contents read.
|
||||
@@ -1253,33 +1339,7 @@ func (c *Connection) ObjectCreate(container string, objectName string, checkHash
|
||||
// If contentType is set it will be used, otherwise one will be
|
||||
// guessed from objectName using mime.TypeByExtension
|
||||
func (c *Connection) ObjectPut(container string, objectName string, contents io.Reader, checkHash bool, Hash string, contentType string, h Headers) (headers Headers, err error) {
|
||||
extraHeaders := objectPutHeaders(objectName, &checkHash, Hash, contentType, h)
|
||||
hash := md5.New()
|
||||
var body io.Reader = contents
|
||||
if checkHash {
|
||||
body = io.TeeReader(contents, hash)
|
||||
}
|
||||
_, headers, err = c.storage(RequestOpts{
|
||||
Container: container,
|
||||
ObjectName: objectName,
|
||||
Operation: "PUT",
|
||||
Headers: extraHeaders,
|
||||
Body: body,
|
||||
NoResponse: true,
|
||||
ErrorMap: objectErrorMap,
|
||||
})
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if checkHash {
|
||||
receivedMd5 := strings.ToLower(headers["Etag"])
|
||||
calculatedMd5 := fmt.Sprintf("%x", hash.Sum(nil))
|
||||
if receivedMd5 != calculatedMd5 {
|
||||
err = ObjectCorrupted
|
||||
return
|
||||
}
|
||||
}
|
||||
return
|
||||
return c.objectPut(container, objectName, contents, checkHash, Hash, contentType, h, nil)
|
||||
}
|
||||
|
||||
// ObjectPutBytes creates an object from a []byte in a container.
|
||||
@@ -1287,7 +1347,8 @@ func (c *Connection) ObjectPut(container string, objectName string, contents io.
|
||||
// This is a simplified interface which checks the MD5.
|
||||
func (c *Connection) ObjectPutBytes(container string, objectName string, contents []byte, contentType string) (err error) {
|
||||
buf := bytes.NewBuffer(contents)
|
||||
_, err = c.ObjectPut(container, objectName, buf, true, "", contentType, nil)
|
||||
h := Headers{"Content-Length": strconv.Itoa(len(contents))}
|
||||
_, err = c.ObjectPut(container, objectName, buf, true, "", contentType, h)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -1296,7 +1357,8 @@ func (c *Connection) ObjectPutBytes(container string, objectName string, content
|
||||
// This is a simplified interface which checks the MD5
|
||||
func (c *Connection) ObjectPutString(container string, objectName string, contents string, contentType string) (err error) {
|
||||
buf := strings.NewReader(contents)
|
||||
_, err = c.ObjectPut(container, objectName, buf, true, "", contentType, nil)
|
||||
h := Headers{"Content-Length": strconv.Itoa(len(contents))}
|
||||
_, err = c.ObjectPut(container, objectName, buf, true, "", contentType, h)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -1441,6 +1503,57 @@ func (file *ObjectOpenFile) Close() (err error) {
|
||||
var _ io.ReadCloser = &ObjectOpenFile{}
|
||||
var _ io.Seeker = &ObjectOpenFile{}
|
||||
|
||||
func (c *Connection) objectOpenBase(container string, objectName string, checkHash bool, h Headers, parameters url.Values) (file *ObjectOpenFile, headers Headers, err error) {
|
||||
var resp *http.Response
|
||||
opts := RequestOpts{
|
||||
Container: container,
|
||||
ObjectName: objectName,
|
||||
Operation: "GET",
|
||||
ErrorMap: objectErrorMap,
|
||||
Headers: h,
|
||||
Parameters: parameters,
|
||||
}
|
||||
resp, headers, err = c.storage(opts)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
// Can't check MD5 on an object with X-Object-Manifest or X-Static-Large-Object set
|
||||
if checkHash && headers.IsLargeObject() {
|
||||
// log.Printf("swift: turning off md5 checking on object with manifest %v", objectName)
|
||||
checkHash = false
|
||||
}
|
||||
file = &ObjectOpenFile{
|
||||
connection: c,
|
||||
container: container,
|
||||
objectName: objectName,
|
||||
headers: h,
|
||||
resp: resp,
|
||||
checkHash: checkHash,
|
||||
body: resp.Body,
|
||||
}
|
||||
if checkHash {
|
||||
file.hash = md5.New()
|
||||
file.body = io.TeeReader(resp.Body, file.hash)
|
||||
}
|
||||
// Read Content-Length
|
||||
if resp.Header.Get("Content-Length") != "" {
|
||||
file.length, err = getInt64FromHeader(resp, "Content-Length")
|
||||
file.lengthOk = (err == nil)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (c *Connection) objectOpen(container string, objectName string, checkHash bool, h Headers, parameters url.Values) (file *ObjectOpenFile, headers Headers, err error) {
|
||||
err = withLORetry(0, func() (Headers, int64, error) {
|
||||
file, headers, err = c.objectOpenBase(container, objectName, checkHash, h, parameters)
|
||||
if err != nil {
|
||||
return headers, 0, err
|
||||
}
|
||||
return headers, file.length, nil
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
// ObjectOpen returns an ObjectOpenFile for reading the contents of
|
||||
// the object. This satisfies the io.ReadCloser and the io.Seeker
|
||||
// interfaces.
|
||||
@@ -1465,41 +1578,7 @@ var _ io.Seeker = &ObjectOpenFile{}
|
||||
//
|
||||
// headers["Content-Type"] will give the content type if desired.
|
||||
func (c *Connection) ObjectOpen(container string, objectName string, checkHash bool, h Headers) (file *ObjectOpenFile, headers Headers, err error) {
|
||||
var resp *http.Response
|
||||
resp, headers, err = c.storage(RequestOpts{
|
||||
Container: container,
|
||||
ObjectName: objectName,
|
||||
Operation: "GET",
|
||||
ErrorMap: objectErrorMap,
|
||||
Headers: h,
|
||||
})
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
// Can't check MD5 on an object with X-Object-Manifest or X-Static-Large-Object set
|
||||
if checkHash && (headers["X-Object-Manifest"] != "" || headers["X-Static-Large-Object"] != "") {
|
||||
// log.Printf("swift: turning off md5 checking on object with manifest %v", objectName)
|
||||
checkHash = false
|
||||
}
|
||||
file = &ObjectOpenFile{
|
||||
connection: c,
|
||||
container: container,
|
||||
objectName: objectName,
|
||||
headers: h,
|
||||
resp: resp,
|
||||
checkHash: checkHash,
|
||||
body: resp.Body,
|
||||
}
|
||||
if checkHash {
|
||||
file.hash = md5.New()
|
||||
file.body = io.TeeReader(resp.Body, file.hash)
|
||||
}
|
||||
// Read Content-Length
|
||||
if resp.Header.Get("Content-Length") != "" {
|
||||
file.length, err = getInt64FromHeader(resp, "Content-Length")
|
||||
file.lengthOk = (err == nil)
|
||||
}
|
||||
return
|
||||
return c.objectOpen(container, objectName, checkHash, h, nil)
|
||||
}
|
||||
|
||||
// ObjectGet gets the object into the io.Writer contents.
|
||||
@@ -1602,19 +1681,10 @@ type BulkDeleteResult struct {
|
||||
Headers Headers // Response HTTP headers.
|
||||
}
|
||||
|
||||
// BulkDelete deletes multiple objectNames from container in one operation.
|
||||
//
|
||||
// Some servers may not accept bulk-delete requests since bulk-delete is
|
||||
// an optional feature of swift - these will return the Forbidden error.
|
||||
//
|
||||
// See also:
|
||||
// * http://docs.openstack.org/trunk/openstack-object-storage/admin/content/object-storage-bulk-delete.html
|
||||
// * http://docs.rackspace.com/files/api/v1/cf-devguide/content/Bulk_Delete-d1e2338.html
|
||||
func (c *Connection) BulkDelete(container string, objectNames []string) (result BulkDeleteResult, err error) {
|
||||
func (c *Connection) doBulkDelete(objects []string) (result BulkDeleteResult, err error) {
|
||||
var buffer bytes.Buffer
|
||||
for _, s := range objectNames {
|
||||
buffer.WriteString(fmt.Sprintf("/%s/%s\n", container,
|
||||
url.QueryEscape(s)))
|
||||
for _, s := range objects {
|
||||
buffer.WriteString(url.QueryEscape(s) + "\n")
|
||||
}
|
||||
resp, headers, err := c.storage(RequestOpts{
|
||||
Operation: "DELETE",
|
||||
@@ -1655,6 +1725,22 @@ func (c *Connection) BulkDelete(container string, objectNames []string) (result
|
||||
return
|
||||
}
|
||||
|
||||
// BulkDelete deletes multiple objectNames from container in one operation.
|
||||
//
|
||||
// Some servers may not accept bulk-delete requests since bulk-delete is
|
||||
// an optional feature of swift - these will return the Forbidden error.
|
||||
//
|
||||
// See also:
|
||||
// * http://docs.openstack.org/trunk/openstack-object-storage/admin/content/object-storage-bulk-delete.html
|
||||
// * http://docs.rackspace.com/files/api/v1/cf-devguide/content/Bulk_Delete-d1e2338.html
|
||||
func (c *Connection) BulkDelete(container string, objectNames []string) (result BulkDeleteResult, err error) {
|
||||
fullPaths := make([]string, len(objectNames))
|
||||
for i, name := range objectNames {
|
||||
fullPaths[i] = fmt.Sprintf("/%s/%s", container, name)
|
||||
}
|
||||
return c.doBulkDelete(fullPaths)
|
||||
}
|
||||
|
||||
// BulkUploadResult stores results of BulkUpload().
|
||||
//
|
||||
// Individual errors may (or may not) be returned by Errors.
|
||||
@@ -1738,6 +1824,17 @@ func (c *Connection) BulkUpload(uploadPath string, dataStream io.Reader, format
|
||||
//
|
||||
// Use headers.ObjectMetadata() to read the metadata in the Headers.
|
||||
func (c *Connection) Object(container string, objectName string) (info Object, headers Headers, err error) {
|
||||
err = withLORetry(0, func() (Headers, int64, error) {
|
||||
info, headers, err = c.objectBase(container, objectName)
|
||||
if err != nil {
|
||||
return headers, 0, err
|
||||
}
|
||||
return headers, info.Bytes, nil
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
func (c *Connection) objectBase(container string, objectName string) (info Object, headers Headers, err error) {
|
||||
var resp *http.Response
|
||||
resp, headers, err = c.storage(RequestOpts{
|
||||
Container: container,
|
||||
@@ -1778,6 +1875,12 @@ func (c *Connection) Object(container string, objectName string) (info Object, h
|
||||
}
|
||||
|
||||
info.Hash = resp.Header.Get("Etag")
|
||||
if resp.Header.Get("X-Object-Manifest") != "" {
|
||||
info.ObjectType = DynamicLargeObjectType
|
||||
} else if resp.Header.Get("X-Static-Large-Object") != "" {
|
||||
info.ObjectType = StaticLargeObjectType
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
22
vendor/github.com/ncw/swift/travis_realserver.sh
generated
vendored
Executable file
22
vendor/github.com/ncw/swift/travis_realserver.sh
generated
vendored
Executable file
@@ -0,0 +1,22 @@
|
||||
#!/bin/bash
|
||||
set -e
|
||||
|
||||
if [ ! "${TRAVIS_BRANCH}" = "master" ]; then
|
||||
exit 0
|
||||
fi
|
||||
|
||||
if [ "${TEST_REAL_SERVER}" = "rackspace" ] && [ ! -z "${RACKSPACE_APIKEY}" ]; then
|
||||
echo "Running tests pointing to Rackspace"
|
||||
export SWIFT_API_KEY=$RACKSPACE_APIKEY
|
||||
export SWIFT_API_USER=$RACKSPACE_USER
|
||||
export SWIFT_AUTH_URL=$RACKSPACE_AUTH
|
||||
go test ./...
|
||||
fi
|
||||
|
||||
if [ "${TEST_REAL_SERVER}" = "memset" ] && [ ! -z "${MEMSET_APIKEY}" ]; then
|
||||
echo "Running tests pointing to Memset"
|
||||
export SWIFT_API_KEY=$MEMSET_APIKEY
|
||||
export SWIFT_API_USER=$MEMSET_USER
|
||||
export SWIFT_AUTH_URL=$MEMSET_AUTH
|
||||
go test
|
||||
fi
|
||||
43
vendor/github.com/ncw/swift/watchdog_reader.go
generated
vendored
43
vendor/github.com/ncw/swift/watchdog_reader.go
generated
vendored
@@ -5,29 +5,50 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
var watchdogChunkSize = 1 << 20 // 1 MiB
|
||||
|
||||
// An io.Reader which resets a watchdog timer whenever data is read
|
||||
type watchdogReader struct {
|
||||
timeout time.Duration
|
||||
reader io.Reader
|
||||
timer *time.Timer
|
||||
timeout time.Duration
|
||||
reader io.Reader
|
||||
timer *time.Timer
|
||||
chunkSize int
|
||||
}
|
||||
|
||||
// Returns a new reader which will kick the watchdog timer whenever data is read
|
||||
func newWatchdogReader(reader io.Reader, timeout time.Duration, timer *time.Timer) *watchdogReader {
|
||||
return &watchdogReader{
|
||||
timeout: timeout,
|
||||
reader: reader,
|
||||
timer: timer,
|
||||
timeout: timeout,
|
||||
reader: reader,
|
||||
timer: timer,
|
||||
chunkSize: watchdogChunkSize,
|
||||
}
|
||||
}
|
||||
|
||||
// Read reads up to len(p) bytes into p
|
||||
func (t *watchdogReader) Read(p []byte) (n int, err error) {
|
||||
// FIXME limit the amount of data read in one chunk so as to not exceed the timeout?
|
||||
func (t *watchdogReader) Read(p []byte) (int, error) {
|
||||
//read from underlying reader in chunks not larger than t.chunkSize
|
||||
//while resetting the watchdog timer before every read; the small chunk
|
||||
//size ensures that the timer does not fire when reading a large amount of
|
||||
//data from a slow connection
|
||||
start := 0
|
||||
end := len(p)
|
||||
for start < end {
|
||||
length := end - start
|
||||
if length > t.chunkSize {
|
||||
length = t.chunkSize
|
||||
}
|
||||
|
||||
resetTimer(t.timer, t.timeout)
|
||||
n, err := t.reader.Read(p[start : start+length])
|
||||
start += n
|
||||
if n == 0 || err != nil {
|
||||
return start, err
|
||||
}
|
||||
}
|
||||
|
||||
resetTimer(t.timer, t.timeout)
|
||||
n, err = t.reader.Read(p)
|
||||
resetTimer(t.timer, t.timeout)
|
||||
return
|
||||
return start, nil
|
||||
}
|
||||
|
||||
// Check it satisfies the interface
|
||||
|
||||
Reference in New Issue
Block a user