2ff36fde57
1. update hacking version to latest 2. fix pep8 failed Change-Id: Ifc3bfeff4038c93d8c8cf2c9d7814c3003e73504
2954 lines
112 KiB
Python
2954 lines
112 KiB
Python
# Copyright (c) 2010-2013 OpenStack, LLC.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
|
# implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
from __future__ import unicode_literals
|
|
import logging
|
|
|
|
import os
|
|
|
|
from concurrent.futures import as_completed, CancelledError, TimeoutError
|
|
from copy import deepcopy
|
|
from errno import EEXIST, ENOENT
|
|
from hashlib import md5
|
|
from os import environ, makedirs, stat, utime
|
|
from os.path import (
|
|
basename, dirname, getmtime, getsize, isdir, join, sep as os_path_sep
|
|
)
|
|
from posixpath import join as urljoin
|
|
from random import shuffle
|
|
from time import time
|
|
from threading import Thread
|
|
from six import Iterator, StringIO, string_types, text_type
|
|
from six.moves.queue import Queue
|
|
from six.moves.queue import Empty as QueueEmpty
|
|
from six.moves.urllib.parse import quote
|
|
|
|
import json
|
|
|
|
|
|
from swiftclient import Connection
|
|
from swiftclient.command_helpers import (
|
|
stat_account, stat_container, stat_object
|
|
)
|
|
from swiftclient.utils import (
|
|
config_true_value, ReadableToIterable, LengthWrapper, EMPTY_ETAG,
|
|
parse_api_response, report_traceback, n_groups, split_request_headers,
|
|
n_at_a_time
|
|
)
|
|
from swiftclient.exceptions import ClientException
|
|
from swiftclient.multithreading import MultiThreadingManager
|
|
|
|
|
|
DISK_BUFFER = 2 ** 16
|
|
logger = logging.getLogger("swiftclient.service")
|
|
|
|
|
|
class ResultsIterator(Iterator):
    """
    Iterator over the results of a collection of futures.

    The futures are wrapped with ``interruptable_as_completed``; every step
    returns the ``result()`` of the next future that wrapper hands back.
    """

    def __init__(self, futures):
        self.futures = interruptable_as_completed(futures)

    def __iter__(self):
        return self

    def __next__(self):
        finished = next(self.futures)
        return finished.result()
|
|
|
|
|
|
class SwiftError(Exception):
    """
    Error raised by the service layer.

    :param value: The error payload; its ``repr`` starts the string form.
    :param container: Optional container the error relates to.
    :param obj: Optional object the error relates to.
    :param segment: Optional segment the error relates to.
    :param exc: Optional underlying exception, stored as ``exception``.
    """

    def __init__(self, value, container=None, obj=None,
                 segment=None, exc=None):
        self.value = value
        self.container = container
        self.obj = obj
        self.segment = segment
        self.exception = exc

    def __str__(self):
        pieces = [repr(self.value)]
        details = (
            ('container', self.container),
            ('object', self.obj),
            ('segment', self.segment),
        )
        for label, detail in details:
            # Only details that were actually supplied are reported.
            if detail is not None:
                pieces.append("%s:%s" % (label, detail))
        return " ".join(pieces)
|
|
|
|
|
|
def process_options(options):
    """
    Normalise authentication settings in ``options`` in place.

    * ``auth_version`` values ``'3.0'`` and ``'2'`` are rewritten to ``'3'``
      and ``'2.0'``.
    * If the version is neither ``'2.0'`` nor ``'3'`` and any of ``auth``,
      ``user`` or ``key`` is unset, keystone auth is selected: v3 when any
      domain option is set, otherwise v2.0.
    * ``auth``/``user``/``key`` fall back to ``os_auth_url``/``os_username``/
      ``os_password`` when they are unset.
    * ``options['os_options']`` is (re)built from the ``os_*`` entries.

    Every key is read with ``.get()``, so a sparse dictionary is accepted;
    missing ``os_*`` entries become ``None`` in ``os_options``.

    :param options: Mutable options dictionary; modified in place.
    """
    # tolerate sloppy auth_version
    auth_version = options.get('auth_version')
    if auth_version == '3.0':
        options['auth_version'] = '3'
    elif auth_version == '2':
        options['auth_version'] = '2.0'

    if options.get('auth_version') not in ('2.0', '3') and not all(
            options.get(key) for key in ('auth', 'user', 'key')):
        # Use keystone auth if any of the new-style args are present
        if any(options.get(k) for k in (
                'os_user_domain_id',
                'os_user_domain_name',
                'os_project_domain_id',
                'os_project_domain_name')):
            # Use v3 if there's any reference to domains
            options['auth_version'] = '3'
        else:
            options['auth_version'] = '2.0'

    # Use new-style args if old ones not present
    fallbacks = (
        ('auth', 'os_auth_url'),
        ('user', 'os_username'),
        ('key', 'os_password'),
    )
    for old_style, new_style in fallbacks:
        if not options.get(old_style) and options.get(new_style):
            options[old_style] = options[new_style]

    # Specific OpenStack options
    options['os_options'] = {
        'user_id': options.get('os_user_id'),
        'user_domain_id': options.get('os_user_domain_id'),
        'user_domain_name': options.get('os_user_domain_name'),
        'tenant_id': options.get('os_tenant_id'),
        'tenant_name': options.get('os_tenant_name'),
        'project_id': options.get('os_project_id'),
        'project_name': options.get('os_project_name'),
        'project_domain_id': options.get('os_project_domain_id'),
        'project_domain_name': options.get('os_project_domain_name'),
        'service_type': options.get('os_service_type'),
        'endpoint_type': options.get('os_endpoint_type'),
        'auth_token': options.get('os_auth_token'),
        'object_storage_url': options.get('os_storage_url'),
        'region_name': options.get('os_region_name'),
    }
|
|
|
|
|
|
def _build_default_global_options():
    """
    Build the dictionary of service-wide default options.

    Authentication related defaults are read from the ``ST_*`` and ``OS_*``
    environment variables at call time; everything else is a constant.
    """
    defaults = {
        "snet": False,
        "verbose": 1,
        "debug": False,
        "info": False,
        "auth": environ.get('ST_AUTH'),
        "auth_version": environ.get('ST_AUTH_VERSION', '1.0'),
        "user": environ.get('ST_USER'),
        "key": environ.get('ST_KEY'),
        "retries": 5,
        "force_auth_retry": False,
    }

    # Each of these options "os_<name>" mirrors the variable "OS_<NAME>".
    env_backed = (
        'username', 'user_id', 'user_domain_name', 'user_domain_id',
        'password', 'tenant_id', 'tenant_name', 'project_name',
        'project_id', 'project_domain_name', 'project_domain_id',
        'auth_url', 'auth_token', 'storage_url', 'region_name',
        'service_type', 'endpoint_type', 'cacert', 'cert', 'key',
    )
    for suffix in env_backed:
        defaults['os_' + suffix] = environ.get('OS_' + suffix.upper())

    defaults.update({
        "insecure": config_true_value(environ.get('SWIFTCLIENT_INSECURE')),
        "ssl_compression": False,
        'segment_threads': 10,
        'object_dd_threads': 10,
        'object_uu_threads': 10,
        'container_threads': 10,
    })
    return defaults


# Snapshot of the environment-derived defaults, taken once at import time.
_default_global_options = _build_default_global_options()
|
|
|
|
_default_local_options = {
|
|
'sync_to': None,
|
|
'sync_key': None,
|
|
'use_slo': False,
|
|
'segment_size': None,
|
|
'segment_container': None,
|
|
'leave_segments': False,
|
|
'changed': None,
|
|
'skip_identical': False,
|
|
'yes_all': False,
|
|
'read_acl': None,
|
|
'write_acl': None,
|
|
'out_file': None,
|
|
'out_directory': None,
|
|
'remove_prefix': False,
|
|
'no_download': False,
|
|
'long': False,
|
|
'totals': False,
|
|
'marker': '',
|
|
'header': [],
|
|
'meta': [],
|
|
'prefix': None,
|
|
'delimiter': None,
|
|
'fail_fast': False,
|
|
'human': False,
|
|
'dir_marker': False,
|
|
'checksum': True,
|
|
'shuffle': False,
|
|
'destination': None,
|
|
'fresh_metadata': False,
|
|
'ignore_mtime': False,
|
|
}
|
|
|
|
POLICY = 'X-Storage-Policy'
|
|
KNOWN_DIR_MARKERS = (
|
|
'application/directory', # Preferred
|
|
'text/directory', # Historically relevant
|
|
)
|
|
|
|
|
|
def get_from_queue(q, timeout=864000):
    """
    Block until an item is available on ``q`` and return it.

    :param q: The queue to read from.
    :param timeout: Seconds to wait per attempt before trying again.
    :returns: The next item taken from the queue.
    """
    while True:
        try:
            return q.get(timeout=timeout)
        except QueueEmpty:
            # The timeout only exists to allow interruption; keep waiting.
            continue
|
|
|
|
|
|
def get_future_result(f, timeout=86400):
    """
    Wait for future ``f`` to finish and return its result.

    :param f: The future to wait on.
    :param timeout: Seconds to wait per attempt before trying again.
    :returns: Whatever ``f.result()`` returns; exceptions other than the
              wait timing out propagate to the caller.
    """
    while True:
        try:
            return f.result(timeout=timeout)
        except TimeoutError:
            # The timeout only exists to allow interruption; keep waiting.
            continue
|
|
|
|
|
|
def interruptable_as_completed(fs, timeout=86400):
    """
    Yield the futures in ``fs`` as they complete.

    Each yielded future is removed from ``fs`` first, so when a wait times
    out the next attempt only covers the futures still outstanding.

    :param fs: A mutable list of futures; it is emptied as futures finish.
    :param timeout: Seconds to wait per attempt before trying again.
    """
    exhausted = False
    while not exhausted:
        try:
            for finished in as_completed(fs, timeout=timeout):
                fs.remove(finished)
                yield finished
            exhausted = True
        except TimeoutError:
            # The timeout only exists to allow interruption; wait again.
            continue
|
|
|
|
|
|
def get_conn(options):
    """
    Build a ``Connection`` from the given options dictionary.

    :param options: Options dictionary providing the auth URL, user, key,
                    retry count and the keyword settings read below.
    :returns: A new ``Connection`` instance.
    """
    positional = (
        options['auth'],
        options['user'],
        options['key'],
        options['retries'],
    )
    keywords = {
        'auth_version': options['auth_version'],
        'os_options': options['os_options'],
        'snet': options['snet'],
        'cacert': options['os_cacert'],
        'insecure': options['insecure'],
        'cert': options['os_cert'],
        'cert_key': options['os_key'],
        'ssl_compression': options['ssl_compression'],
        'force_auth_retry': options['force_auth_retry'],
    }
    return Connection(*positional, **keywords)
|
|
|
|
|
|
def mkdirs(path):
    """
    Create ``path`` and any missing parents, tolerating ``EEXIST``.

    :param path: Directory path to create.
    :raises OSError: For any failure other than ``EEXIST``.
    """
    try:
        makedirs(path)
    except OSError as err:
        if err.errno == EEXIST:
            return
        raise
|
|
|
|
|
|
def split_headers(options, prefix=''):
    """
    Splits 'Key: Value' strings and returns them as a dictionary.

    :param options: Must be one of:
        * an iterable of 'Key: Value' strings
        * an iterable of ('Key', 'Value') pairs
        * a dict of {'Key': 'Value'} pairs
    :param prefix: String to prepend to all of the keys in the dictionary.
    :returns: The result of ``split_request_headers(options, prefix)``.
    :raises SwiftError: Wrapping any ``ValueError`` raised while splitting.
    """
    try:
        return split_request_headers(options, prefix)
    except ValueError as e:
        raise SwiftError(e)
|
|
|
|
|
|
class SwiftUploadObject(object):
    """
    Describes a single upload: where the data comes from, the name to store
    it under, and any options that apply to this object only.
    """
    def __init__(self, source, object_name=None, options=None):
        if isinstance(source, string_types):
            # A string source doubles as the object name unless one is given.
            name = object_name or source
        elif source is None or hasattr(source, 'read'):
            if not (object_name and isinstance(object_name, string_types)):
                raise SwiftError('Object names must be specified as '
                                 'strings for uploads from None or file '
                                 'like objects.')
            name = object_name
        else:
            raise SwiftError('Unexpected source type for '
                             'SwiftUploadObject: {0}'.format(type(source)))

        if not name:
            raise SwiftError('Object names must not be empty strings')

        # Leading slashes are stripped from the stored name.
        self.object_name = name.lstrip('/')
        self.options = options
        self.source = source
|
|
|
|
|
|
class SwiftPostObject(object):
    """
    Describes a single object post, so that headers/metadata can be given
    per object through ``options``.
    """
    def __init__(self, object_name, options=None):
        valid_name = isinstance(object_name, string_types) and object_name
        if not valid_name:
            raise SwiftError(
                "Object names must be specified as non-empty strings"
            )
        self.object_name = object_name
        self.options = options
|
|
|
|
|
|
class SwiftCopyObject(object):
    """
    Describes a single object copy.

    ``destination`` and ``fresh_metadata`` are read from ``options``; any
    other per-object settings stay available through ``self.options``.
    A destination naming only a container (``/cont``) is completed with the
    source object name.
    """
    def __init__(self, object_name, options=None):
        valid_name = isinstance(object_name, string_types) and object_name
        if not valid_name:
            raise SwiftError(
                "Object names must be specified as non-empty strings"
            )

        self.object_name = object_name
        self.options = options

        settings = {} if options is None else options
        self.destination = settings.get('destination')
        self.fresh_metadata = settings.get('fresh_metadata', False)

        if self.destination is None:
            return

        parts = self.destination.split('/')
        # A leading slash gives an empty first component.
        if parts[0] or len(parts) < 2:
            raise SwiftError("destination must be in format /cont[/obj]")
        if not parts[-1]:
            raise SwiftError("destination must not end in a slash")
        if len(parts) == 2:
            # only container set in destination
            self.destination = "{0}/{1}".format(
                self.destination, object_name
            )
|
|
|
|
|
|
class _SwiftReader(object):
|
|
"""
|
|
Class for downloading objects from swift and raising appropriate
|
|
errors on failures caused by either invalid md5sum or size of the
|
|
data read.
|
|
"""
|
|
def __init__(self, path, body, headers, checksum=True):
|
|
self._path = path
|
|
self._body = body
|
|
self._actual_read = 0
|
|
self._content_length = None
|
|
self._actual_md5 = None
|
|
self._expected_md5 = headers.get('etag', '')
|
|
|
|
if len(self._expected_md5) > 1 and self._expected_md5[0] == '"' \
|
|
and self._expected_md5[-1] == '"':
|
|
self._expected_md5 = self._expected_md5[1:-1]
|
|
|
|
# Some headers indicate the MD5 of the response
|
|
# definitely *won't* match the ETag
|
|
bad_md5_headers = set([
|
|
'content-range',
|
|
'x-object-manifest',
|
|
'x-static-large-object',
|
|
])
|
|
if bad_md5_headers.intersection(headers):
|
|
# This isn't a useful checksum
|
|
self._expected_md5 = ''
|
|
|
|
if self._expected_md5 and checksum:
|
|
self._actual_md5 = md5()
|
|
|
|
if 'content-length' in headers:
|
|
try:
|
|
self._content_length = int(headers.get('content-length'))
|
|
except ValueError:
|
|
raise SwiftError('content-length header must be an integer')
|
|
|
|
def __iter__(self):
|
|
for chunk in self._body:
|
|
if self._actual_md5:
|
|
self._actual_md5.update(chunk)
|
|
self._actual_read += len(chunk)
|
|
yield chunk
|
|
self._check_contents()
|
|
|
|
def _check_contents(self):
|
|
if self._actual_md5 and self._expected_md5:
|
|
etag = self._actual_md5.hexdigest()
|
|
if etag != self._expected_md5:
|
|
raise SwiftError('Error downloading {0}: md5sum != etag, '
|
|
'{1} != {2}'.format(
|
|
self._path, etag, self._expected_md5))
|
|
|
|
if (self._content_length is not None and
|
|
self._actual_read != self._content_length):
|
|
raise SwiftError('Error downloading {0}: read_length != '
|
|
'content_length, {1:d} != {2:d}'.format(
|
|
self._path, self._actual_read,
|
|
self._content_length))
|
|
|
|
def bytes_read(self):
|
|
return self._actual_read
|
|
|
|
|
|
class SwiftService(object):
|
|
"""
|
|
Service for performing swift operations
|
|
"""
|
|
def __init__(self, options=None):
    """
    Set up the service options and the thread manager.

    :param options: Optional dictionary overriding the module defaults;
                    local defaults are applied over the global ones and
                    the caller's options over both.
    """
    overrides = {} if options is None else options
    local_options = dict(_default_local_options, **overrides)
    self._options = dict(_default_global_options, **local_options)
    process_options(self._options)

    def create_connection():
        # Reads self._options each time it is called.
        return get_conn(self._options)

    self.thread_manager = MultiThreadingManager(
        create_connection,
        segment_threads=self._options['segment_threads'],
        object_dd_threads=self._options['object_dd_threads'],
        object_uu_threads=self._options['object_uu_threads'],
        container_threads=self._options['container_threads']
    )
    # Each instance should have its own cache
    self.capabilities_cache = {}
|
|
|
|
def __enter__(self):
|
|
self.thread_manager.__enter__()
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
|
self.thread_manager.__exit__(exc_type, exc_val, exc_tb)
|
|
|
|
# Stat related methods
|
|
#
|
|
def stat(self, container=None, objects=None, options=None):
    """
    Get account stats, container stats or information about a list of
    objects in a container.

    :param container: The container to query.
    :param objects: A list of object paths about which to return
                    information (a list of strings).
    :param options: A dictionary containing options to override the global
                    options specified during the service object creation.
                    These options are applied to all stat operations
                    performed by this call::

                        {
                            'human': False,
                            'header': []
                        }

    :returns: Either a single dictionary containing stats about an account
              or container, or an iterator for returning the results of the
              stat operations on a list of objects.

    :raises SwiftError: If objects are given without a container, or the
                        account/container stat answers with a 404.
    """
    options = (self._options if options is None
               else dict(self._options, **options))

    if container and objects:
        # One stat job per object; results are consumed via the iterator.
        return ResultsIterator([
            self.thread_manager.object_dd_pool.submit(
                self._stat_object, container, stat_o, options
            )
            for stat_o in objects
        ])

    if not container:
        if objects:
            raise SwiftError('Objects specified without container')
        res = {
            'action': 'stat_account',
            'success': True,
            'container': container,
            'object': None,
        }
        job_args = (stat_account, options)
    else:
        res = {
            'action': 'stat_container',
            'container': container,
            'object': None,
            'success': True,
        }
        job_args = (stat_container, options, container)

    def record_failure(err):
        # Only called from the except blocks below, so logger.exception
        # (and, presumably, report_traceback) sees the active exception.
        traceback, err_time = report_traceback()
        logger.exception(err)
        res.update({
            'success': False,
            'error': err,
            'traceback': traceback,
            'error_timestamp': err_time
        })
        return res

    try:
        stats_future = self.thread_manager.container_pool.submit(*job_args)
        items, headers = get_future_result(stats_future)
        res.update({
            'items': items,
            'headers': headers
        })
        return res
    except ClientException as err:
        if err.http_status != 404:
            return record_failure(err)
        if container:
            raise SwiftError('Container %r not found' % container,
                             container=container, exc=err)
        raise SwiftError('Account not found', exc=err)
    except Exception as err:
        return record_failure(err)
|
|
|
|
@staticmethod
def _stat_object(conn, container, obj, options):
    """
    Stat a single object and describe the outcome as a result dictionary.

    Failures are not raised; they are reported in the returned dictionary
    with ``'success': False`` plus error details.
    """
    outcome = {
        'action': 'stat_object',
        'object': obj,
        'container': container,
        'success': True,
    }
    try:
        items, headers = stat_object(conn, options, container, obj)
    except Exception as err:
        traceback, err_time = report_traceback()
        logger.exception(err)
        outcome.update({
            'success': False,
            'error': err,
            'traceback': traceback,
            'error_timestamp': err_time
        })
    else:
        outcome.update({
            'items': items,
            'headers': headers
        })
    return outcome
|
|
|
|
# Post related methods
|
|
#
|
|
def post(self, container=None, objects=None, options=None):
    """
    Post operations on an account, container or list of objects

    Which operation runs depends on the arguments: no container means an
    account post, a container without objects means a container post, and
    a container with objects means one post per object.

    :param container: The container to make the post operation against.
    :param objects: A list of object names (strings) or SwiftPostObject
                    instances containing an object name, and an
                    options dict (can be None) to override the options for
                    that individual post operation::

                        [
                            'object_name',
                            SwiftPostObject('object_name', options={...}),
                            ...
                        ]

                    The options dict is described below.
    :param options: A dictionary containing options to override the global
                    options specified during the service object creation.
                    These options are applied to all post operations
                    performed by this call, unless overridden on a per
                    object basis. Possible options are given below::

                        {
                            'meta': [],
                            'header': [],
                            'read_acl': None,   # For containers only
                            'write_acl': None,  # For containers only
                            'sync_to': None,    # For containers only
                            'sync_key': None    # For containers only
                        }

    :returns: Either a single result dictionary in the case of a post to a
              container/account, or an iterator for returning the results
              of posts to a list of objects.

    :raises SwiftError: If objects are given without a container, if the
                        headers cannot be split, or if the account or
                        container post answers with a 404.
    """
    if options is not None:
        options = dict(self._options, **options)
    else:
        options = self._options

    # Result skeleton shared by the account and container posts; 'action'
    # and 'headers' are filled in by the branch that runs.
    res = {
        'success': True,
        'container': container,
        'object': None,
        'headers': {},
    }
    if not container:
        res["action"] = "post_account"
        if objects:
            raise SwiftError('Objects specified without container')
        else:
            response_dict = {}
            # 'meta' entries get the account metadata prefix; 'header'
            # entries are applied afterwards and so win on a key clash.
            headers = split_headers(
                options['meta'], 'X-Account-Meta-')
            headers.update(
                split_headers(options['header'], ''))
            res['headers'] = headers
            try:
                post = self.thread_manager.container_pool.submit(
                    self._post_account_job, headers, response_dict
                )
                get_future_result(post)
            except ClientException as err:
                # Anything other than a 404 is reported in the result
                # dictionary; a 404 is raised as "Account not found".
                if err.http_status != 404:
                    traceback, err_time = report_traceback()
                    logger.exception(err)
                    res.update({
                        'success': False,
                        'error': err,
                        'traceback': traceback,
                        'error_timestamp': err_time,
                        'response_dict': response_dict
                    })
                    return res
                raise SwiftError('Account not found', exc=err)
            except Exception as err:
                traceback, err_time = report_traceback()
                logger.exception(err)
                res.update({
                    'success': False,
                    'error': err,
                    'response_dict': response_dict,
                    'traceback': traceback,
                    'error_timestamp': err_time
                })
            # Reached on success and after a reported generic failure.
            return res
    if not objects:
        res["action"] = "post_container"
        response_dict = {}
        headers = split_headers(
            options['meta'], 'X-Container-Meta-')
        headers.update(
            split_headers(options['header'], ''))
        # Container-only settings are translated to their headers last,
        # so they override same-named entries from 'header'.
        if options['read_acl'] is not None:
            headers['X-Container-Read'] = options['read_acl']
        if options['write_acl'] is not None:
            headers['X-Container-Write'] = options['write_acl']
        if options['sync_to'] is not None:
            headers['X-Container-Sync-To'] = options['sync_to']
        if options['sync_key'] is not None:
            headers['X-Container-Sync-Key'] = options['sync_key']
        res['headers'] = headers
        try:
            post = self.thread_manager.container_pool.submit(
                self._post_container_job, container,
                headers, response_dict
            )
            get_future_result(post)
        except ClientException as err:
            # Non-404 errors are reported in the result dictionary; a 404
            # is raised as a "not found" SwiftError.
            if err.http_status != 404:
                traceback, err_time = report_traceback()
                logger.exception(err)
                res.update({
                    'action': 'post_container',
                    'success': False,
                    'error': err,
                    'traceback': traceback,
                    'error_timestamp': err_time,
                    'response_dict': response_dict
                })
                return res
            raise SwiftError(
                "Container '%s' not found" % container,
                container=container, exc=err
            )
        except Exception as err:
            traceback, err_time = report_traceback()
            logger.exception(err)
            res.update({
                'action': 'post_container',
                'success': False,
                'error': err,
                'response_dict': response_dict,
                'traceback': traceback,
                'error_timestamp': err_time
            })
        # Reached on success and after a reported generic failure.
        return res
    else:
        post_futures = []
        post_objects = self._make_post_objects(objects)
        for post_object in post_objects:
            obj = post_object.object_name
            obj_options = post_object.options
            response_dict = {}
            # Start from the call-wide metadata and headers ...
            headers = split_headers(
                options['meta'], 'X-Object-Meta-')
            # add header options to the headers object for the request.
            headers.update(
                split_headers(options['header'], ''))
            # ... then let the per-object options override them.
            if obj_options is not None:
                if 'meta' in obj_options:
                    headers.update(
                        split_headers(
                            obj_options['meta'], 'X-Object-Meta-'
                        )
                    )
                if 'header' in obj_options:
                    headers.update(
                        split_headers(obj_options['header'], '')
                    )

            post = self.thread_manager.object_uu_pool.submit(
                self._post_object_job, container, obj,
                headers, response_dict
            )
            post_futures.append(post)

        return ResultsIterator(post_futures)
|
|
|
|
@staticmethod
def _make_post_objects(objects):
    """
    Normalise ``objects`` into a list of SwiftPostObject instances.

    :param objects: Iterable of object names and/or SwiftPostObjects.
    :returns: A list of SwiftPostObject instances, in input order.
    :raises SwiftError: If an entry is neither a string nor a
                        SwiftPostObject.
    """
    normalised = []
    for entry in objects:
        if isinstance(entry, SwiftPostObject):
            normalised.append(entry)
        elif isinstance(entry, string_types):
            normalised.append(SwiftPostObject(entry))
        else:
            raise SwiftError(
                "The post operation takes only strings or "
                "SwiftPostObjects as input",
                obj=entry)
    return normalised
|
|
|
|
@staticmethod
|
|
def _post_account_job(conn, headers, result):
|
|
return conn.post_account(headers=headers, response_dict=result)
|
|
|
|
@staticmethod
|
|
def _post_container_job(conn, container, headers, result):
|
|
try:
|
|
res = conn.post_container(
|
|
container, headers=headers, response_dict=result)
|
|
except ClientException as err:
|
|
if err.http_status != 404:
|
|
raise
|
|
_response_dict = {}
|
|
res = conn.put_container(
|
|
container, headers=headers, response_dict=_response_dict)
|
|
result['post_put'] = _response_dict
|
|
return res
|
|
|
|
@staticmethod
|
|
def _post_object_job(conn, container, obj, headers, result):
|
|
res = {
|
|
'success': True,
|
|
'action': 'post_object',
|
|
'container': container,
|
|
'object': obj,
|
|
'headers': headers,
|
|
'response_dict': result
|
|
}
|
|
try:
|
|
conn.post_object(
|
|
container, obj, headers=headers, response_dict=result)
|
|
except Exception as err:
|
|
traceback, err_time = report_traceback()
|
|
logger.exception(err)
|
|
res.update({
|
|
'success': False,
|
|
'error': err,
|
|
'traceback': traceback,
|
|
'error_timestamp': err_time
|
|
})
|
|
|
|
return res
|
|
|
|
# List related methods
|
|
#
|
|
def list(self, container=None, options=None):
    """
    List operations on an account, container.

    :param container: The container to make the list operation against.
    :param options: A dictionary containing options to override the global
                    options specified during the service object creation::

                        {
                            'long': False,
                            'prefix': None,
                            'delimiter': None,
                            'header': []
                        }

    :returns: A generator for returning the results of the list operation
              on an account or container. Each result yielded from the
              generator is either a 'list_account_part' or
              'list_container_part', containing part of the listing.
    """
    options = (self._options if options is None
               else dict(self._options, **options))

    # Just stop list running away consuming memory
    rq = Queue(maxsize=10)

    if container is None:
        job_args = (self._list_account_job, options, rq)
    else:
        job_args = (self._list_container_job, container, options, rq)
    listing_future = self.thread_manager.container_pool.submit(*job_args)

    # The job signals the end of the listing by queueing None.
    while True:
        part = get_from_queue(rq)
        if part is None:
            break
        yield part

    # Make sure the future has completed
    get_future_result(listing_future)
|
|
|
|
@staticmethod
def _list_account_job(conn, options, result_queue):
    """
    List the account page by page, queueing one result per page.

    Every page is queued as a 'list_account_part' dictionary. The job
    always finishes by queueing ``None``; on failure a part with
    ``'success': False`` and the error details is queued first.

    :param conn: Connection used for the listing requests.
    :param options: Options dictionary ('header', 'prefix' and 'long' are
                    read here).
    :param result_queue: Queue the parts and the final ``None`` go to.
    """
    marker = ''
    error = None
    try:
        # Parsed inside the try block: if the headers are malformed the
        # failure must still be reported through the queue, otherwise the
        # consumer would wait forever for the terminating None.
        req_headers = split_headers(options.get('header', []))
        while True:
            _, items = conn.get_account(
                marker=marker, prefix=options['prefix'],
                headers=req_headers
            )

            if not items:
                # An empty page marks the end of the listing.
                result_queue.put(None)
                return

            if options['long']:
                for i in items:
                    name = i['name']
                    i['meta'] = conn.head_container(name)

            res = {
                'action': 'list_account_part',
                'container': None,
                'prefix': options['prefix'],
                'success': True,
                'listing': items,
                'marker': marker,
            }
            result_queue.put(res)

            # The next page starts after the last entry of this one.
            marker = items[-1].get('name', items[-1].get('subdir'))
    except ClientException as err:
        traceback, err_time = report_traceback()
        logger.exception(err)
        if err.http_status != 404:
            error = (err, traceback, err_time)
        else:
            error = (
                SwiftError('Account not found', exc=err),
                traceback, err_time
            )

    except Exception as err:
        traceback, err_time = report_traceback()
        logger.exception(err)
        error = (err, traceback, err_time)

    # Only reached after a failure; the success path returns above.
    res = {
        'action': 'list_account_part',
        'container': None,
        'prefix': options['prefix'],
        'success': False,
        'marker': marker,
        'error': error[0],
        'traceback': error[1],
        'error_timestamp': error[2]
    }
    result_queue.put(res)
    result_queue.put(None)
|
|
|
|
@staticmethod
def _list_container_job(conn, container, options, result_queue):
    """
    List a container page by page, queueing one result per page.

    Every page is queued as a 'list_container_part' dictionary. The job
    always finishes by queueing ``None``; on failure a part with
    ``'success': False`` and the error details is queued first.

    :param conn: Connection used for the listing requests.
    :param container: Name of the container to list.
    :param options: Options dictionary ('marker', 'header', 'prefix' and
                    'delimiter' are read here).
    :param result_queue: Queue the parts and the final ``None`` go to.
    """
    marker = options.get('marker', '')
    error = None
    try:
        # Parsed inside the try block: if the headers are malformed the
        # failure must still be reported through the queue, otherwise the
        # consumer would wait forever for the terminating None.
        req_headers = split_headers(options.get('header', []))
        while True:
            _, items = conn.get_container(
                container, marker=marker, prefix=options['prefix'],
                delimiter=options['delimiter'], headers=req_headers
            )

            if not items:
                # An empty page marks the end of the listing.
                result_queue.put(None)
                return

            res = {
                'action': 'list_container_part',
                'container': container,
                'prefix': options['prefix'],
                'success': True,
                'marker': marker,
                'listing': items,
            }
            result_queue.put(res)

            # The next page starts after the last entry of this one.
            marker = items[-1].get('name', items[-1].get('subdir'))
    except ClientException as err:
        traceback, err_time = report_traceback()
        logger.exception(err)
        if err.http_status != 404:
            error = (err, traceback, err_time)
        else:
            error = (
                SwiftError(
                    'Container %r not found' % container,
                    container=container, exc=err
                ),
                traceback,
                err_time
            )
    except Exception as err:
        traceback, err_time = report_traceback()
        logger.exception(err)
        error = (err, traceback, err_time)

    # Only reached after a failure; the success path returns above.
    res = {
        'action': 'list_container_part',
        'container': container,
        'prefix': options['prefix'],
        'success': False,
        'marker': marker,
        'error': error[0],
        'traceback': error[1],
        'error_timestamp': error[2]
    }
    result_queue.put(res)
    result_queue.put(None)
|
|
|
|
# Download related methods
|
|
#
|
|
def download(self, container=None, objects=None, options=None):
    """
    Download operations on an account, optional container and optional list
    of objects.

    Without a container, nothing is downloaded unless ``yes_all`` is set,
    in which case every container in the account is downloaded.

    :param container: The container to download from.
    :param objects: A list of object names to download (a list of strings).
    :param options: A dictionary containing options to override the global
                    options specified during the service object creation::

                        {
                            'yes_all': False,
                            'marker': '',
                            'prefix': None,
                            'no_download': False,
                            'header': [],
                            'skip_identical': False,
                            'out_directory': None,
                            'checksum': True,
                            'out_file': None,
                            'remove_prefix': False,
                            'shuffle' : False
                        }

    :returns: A generator for returning the results of the download
              operations. Each result yielded from the generator is a
              'download_object' dictionary containing the results of an
              individual file download.

    :raises ClientException:
    :raises SwiftError:
    """
    # Always work on a private copy: this method (out_file) and the
    # download jobs it submits (skip_identical) assign into the options
    # dictionary, which must never alter the service-wide defaults.
    if options is not None:
        options = dict(self._options, **options)
    else:
        options = dict(self._options)

    if not container:
        # Download everything if options['yes_all'] is set
        if options['yes_all']:
            try:
                options_copy = deepcopy(options)
                options_copy["long"] = False

                for part in self.list(options=options_copy):
                    if part["success"]:
                        containers = [i['name'] for i in part["listing"]]

                        if options['shuffle']:
                            shuffle(containers)

                        for con in containers:
                            for res in self._download_container(
                                    con, options_copy):
                                yield res
                    else:
                        raise part["error"]

            # If we see a 404 here, the listing of the account failed
            except ClientException as err:
                if err.http_status != 404:
                    raise
                raise SwiftError('Account not found', exc=err)
        return

    if '/' in container:
        raise SwiftError('\'/\' in container name',
                         container=container)

    if objects is None:
        for res in self._download_container(container, options):
            yield res
        return

    # A single output file cannot receive more than one object.
    if options['out_file'] and len(objects) > 1:
        options['out_file'] = None

    o_downs = [
        self.thread_manager.object_dd_pool.submit(
            self._download_object_job, container, obj, options
        ) for obj in objects
    ]

    for o_down in interruptable_as_completed(o_downs):
        yield o_down.result()
|
|
|
|
def _download_object_job(self, conn, container, obj, options):
    """
    Download a single object and, unless told otherwise, write it to disk.

    :param conn: The swift connection used for requests.
    :param container: The name of the container holding the object.
    :param obj: The name of the object to download.
    :param options: The download options dict. It is only read here: the
                    same dict is handed to every job submitted by one
                    download call, so it must not be modified.

    :returns: A 'download_object' result dict. When ``out_file`` is "-"
              and ``no_download`` is not set, the dict carries the unread
              object body under 'contents' (and no 'success' key).
              Otherwise 'success' tells whether the download worked; any
              exception raised along the way is caught and reported
              under 'error' instead of being propagated.
    """
    out_file = options['out_file']
    results_dict = {}

    req_headers = split_headers(options['header'], '')

    pseudodir = False
    path = join(container, obj) if options['yes_all'] else obj
    path = path.lstrip(os_path_sep)
    # Comparing against a local copy makes no sense when streaming to
    # stdout. Keep the effective flag local rather than writing it back
    # into the shared options dict.
    skip_identical = options['skip_identical'] and out_file != '-'

    if options['prefix'] and options['remove_prefix']:
        path = path[len(options['prefix']):].lstrip('/')

    if options['out_directory']:
        path = os.path.join(options['out_directory'], path)

    if skip_identical:
        filename = out_file if out_file else path
        try:
            fp = open(filename, 'rb', DISK_BUFFER)
        except IOError:
            # No readable local copy: download unconditionally
            pass
        else:
            with fp:
                md5sum = md5()
                while True:
                    data = fp.read(DISK_BUFFER)
                    if not data:
                        break
                    md5sum.update(data)
            req_headers['If-None-Match'] = md5sum.hexdigest()

    try:
        start_time = time()
        get_args = {'resp_chunk_size': DISK_BUFFER,
                    'headers': req_headers,
                    'response_dict': results_dict}
        if skip_identical:
            # Assume the file is a large object; if we're wrong, the query
            # string is ignored and the If-None-Match header will trigger
            # the behavior we want
            get_args['query_string'] = 'multipart-manifest=get'

        try:
            headers, body = conn.get_object(container, obj, **get_args)
        except ClientException as e:
            if not skip_identical:
                raise
            if e.http_status != 304:  # Only handling Not Modified
                raise

            headers = results_dict['headers']
            if 'x-object-manifest' in headers:
                # DLO: most likely it has more than one page worth of
                # segments and we have an empty file locally
                body = []
            elif config_true_value(headers.get('x-static-large-object')):
                # SLO: apparently we have a copy of the manifest locally?
                # provide no chunking data to force a fresh download
                body = [b'[]']
            else:
                # Normal object: let it bubble up
                raise

        if skip_identical:
            if config_true_value(headers.get('x-static-large-object')) or \
                    'x-object-manifest' in headers:
                # The request was chunked, so stitch it back together
                chunk_data = self._get_chunk_data(conn, container, obj,
                                                  headers, b''.join(body))
            else:
                chunk_data = None

            if chunk_data is not None:
                if self._is_identical(chunk_data, filename):
                    raise ClientException('Large object is identical',
                                          http_status=304)

                # Large objects are different; start the real download
                del get_args['query_string']
                get_args['response_dict'].clear()
                headers, body = conn.get_object(container, obj, **get_args)

        headers_receipt = time()

        obj_body = _SwiftReader(path, body, headers,
                                options.get('checksum', True))

        no_file = options['no_download']
        if out_file == "-" and not no_file:
            # Streaming to stdout: hand the reader to the caller unread
            res = {
                'action': 'download_object',
                'container': container,
                'object': obj,
                'path': path,
                'pseudodir': pseudodir,
                'contents': obj_body
            }
            return res

        fp = None
        try:
            content_type = headers.get('content-type', '').split(';', 1)[0]
            if content_type in KNOWN_DIR_MARKERS:
                make_dir = not no_file and out_file != "-"
                if make_dir and not isdir(path):
                    mkdirs(path)

            else:
                make_dir = not (no_file or out_file)
                if make_dir:
                    dirpath = dirname(path)
                    if dirpath and not isdir(dirpath):
                        mkdirs(dirpath)

                if not no_file:
                    if out_file:
                        fp = open(out_file, 'wb', DISK_BUFFER)
                    else:
                        if basename(path):
                            fp = open(path, 'wb', DISK_BUFFER)
                        else:
                            pseudodir = True

            # Always drain the body, even when nothing is written
            for chunk in obj_body:
                if fp is not None:
                    fp.write(chunk)

            finish_time = time()

        finally:
            bytes_read = obj_body.bytes_read()
            if fp is not None:
                fp.close()
                if ('x-object-meta-mtime' in headers and not no_file and
                        not options['ignore_mtime']):
                    try:
                        mtime = float(headers['x-object-meta-mtime'])
                    except ValueError:
                        pass  # no real harm; couldn't trust it anyway
                    else:
                        if options['out_file']:
                            utime(options['out_file'], (mtime, mtime))
                        else:
                            utime(path, (mtime, mtime))

        res = {
            'action': 'download_object',
            'success': True,
            'container': container,
            'object': obj,
            'path': path,
            'pseudodir': pseudodir,
            'start_time': start_time,
            'finish_time': finish_time,
            'headers_receipt': headers_receipt,
            'auth_end_time': conn.auth_end_time,
            'read_length': bytes_read,
            'attempts': conn.attempts,
            'response_dict': results_dict
        }
        return res

    except Exception as err:
        traceback, err_time = report_traceback()
        logger.exception(err)
        res = {
            'action': 'download_object',
            'container': container,
            'object': obj,
            'success': False,
            'error': err,
            'traceback': traceback,
            'error_timestamp': err_time,
            'response_dict': results_dict,
            'path': path,
            'pseudodir': pseudodir,
            'attempts': conn.attempts
        }
        return res
|
|
|
|
def _submit_page_downloads(self, container, page_generator, options):
    """
    Queue one download job per object in the next page of a listing.

    :param container: The container whose objects are being downloaded.
    :param page_generator: An iterator of listing-page result dicts.
    :param options: The download options passed on to every job; its
                    'shuffle' entry requests a randomised job order.

    :returns: A list with one future per object on the page, or None
              when the iterator has no page left.

    :raises: The page's own 'error' value when the page reports failure.
    """
    exhausted = object()
    page = next(page_generator, exhausted)
    if page is exhausted:
        return None

    if not page["success"]:
        raise page["error"]

    names = [entry["name"] for entry in page["listing"]]
    if options["shuffle"]:
        shuffle(names)

    futures = []
    for name in names:
        futures.append(
            self.thread_manager.object_dd_pool.submit(
                self._download_object_job, container, name, options
            )
        )
    return futures
|
|
|
|
def _download_container(self, container, options):
    """
    Download every object of a container, one listing page at a time.

    This is a generator: it yields the result of each object download
    as it completes. The jobs for the following listing page are
    submitted once 80% of the current page's results have been yielded.

    :param container: The name of the container to download.
    :param options: The download options passed to listing and jobs.

    :raises SwiftError: If listing the first page fails with a 404.
    :raises ClientException: For any other listing failure; a failure on
                             a later page is raised only after the
                             current page has finished downloading.
    """
    pages = self.list(container=container, options=options)
    try:
        upcoming = self._submit_page_downloads(container, pages, options)
    except ClientException as err:
        if err.http_status == 404:
            raise SwiftError(
                'Container %r not found' % container,
                container=container, exc=err
            )
        raise

    listing_error = None
    while upcoming:
        current, upcoming = upcoming, None

        # Threshold (80% of the page) at which the next page is requested
        threshold = 0.8 * len(current)
        yielded = 0
        prefetch_done = False

        for finished in interruptable_as_completed(current):
            yield finished.result()

            if prefetch_done:
                continue
            yielded += 1
            if yielded < threshold:
                continue

            try:
                upcoming = self._submit_page_downloads(
                    container, pages, options
                )
            except ClientException as err:
                # Let the jobs already running on this page finish
                logger.exception(err)
                listing_error = err
            except Exception:
                # Unexpected failure: cancel what has not started yet
                for job in current:
                    job.cancel()
                raise
            finally:
                # Whatever happened, only attempt this once per page
                prefetch_done = True

    if listing_error:
        raise listing_error
|
|
|
|
# Upload related methods
|
|
#
|
|
def upload(self, container, objects, options=None):
    """
    Upload a list of objects to a given container.

    :param container: The container (or pseudo-folder path) to put the
                      uploads into.
    :param objects: A list of file/directory names (strings) or
                    SwiftUploadObject instances containing a source for the
                    created object, an object name, and an options dict
                    (can be None) to override the options for that
                    individual upload operation::

                        [
                            '/path/to/file',
                            SwiftUploadObject('/path', object_name='obj1'),
                            ...
                        ]

                    The options dict is as described below.

                    The SwiftUploadObject source may be one of:

                        * A file-like object (with a read method)
                        * A string containing the path to a local
                          file or directory
                        * None, to indicate that we want an empty object

    :param options: A dictionary containing options to override the global
                    options specified during the service object creation.
                    These options are applied to all upload operations
                    performed by this call, unless overridden on a per
                    object basis. Possible options are given below::

                        {
                            'meta': [],
                            'header': [],
                            'segment_size': None,
                            'use_slo': False,
                            'segment_container': None,
                            'leave_segments': False,
                            'changed': None,
                            'skip_identical': False,
                            'fail_fast': False,
                            'dir_marker': False  # Only for None sources
                        }

    :returns: A generator for returning the results of the uploads.

    :raises SwiftError:
    :raises ClientException:
    """
    if options is None:
        options = self._options
    else:
        options = dict(self._options, **options)

    try:
        requested_size = options['segment_size']
        segment_size = int(0 if requested_size is None else requested_size)
    except ValueError:
        raise SwiftError('Segment size should be an integer value')

    # The <container> argument may be a pseudo-folder path: everything up
    # to the first '/' is the container name, the remainder is prepended
    # to each object name (same as passing --object-name).
    container, _sep, pseudo_folder = container.partition('/')

    # Try to create the container, just in case it doesn't exist. If this
    # fails, it might just be because the user doesn't have container PUT
    # permissions, so we'll ignore any error. If there's really a problem,
    # it'll surface on the first object PUT.
    request_headers = split_headers(options["header"])
    policy_header = {}
    if POLICY in request_headers:
        policy_header[POLICY] = request_headers[POLICY]
    container_job = self.thread_manager.container_pool.submit(
        self._create_container_job, container, headers=policy_header)

    # The first container job must complete before the segment container
    # job is submitted, because the latter may HEAD the first container.
    for done in interruptable_as_completed([container_job]):
        yield done.result()

    if segment_size:
        seg_container = (options['segment_container'] or
                         container + '_segments')
        if seg_container != container:
            if policy_header:
                job_kwargs = {'headers': policy_header}
            else:
                # No storage policy was given explicitly, so rather than
                # letting swift pick its default, try to create the
                # segments container with the upload container's policy.
                job_kwargs = {'policy_source': container}
            segment_job = self.thread_manager.container_pool.submit(
                self._create_container_job, seg_container, **job_kwargs)

            for done in interruptable_as_completed([segment_job]):
                yield done.result()

    # Results are collected through a queue, fed by a watcher thread,
    # because results of potential segment uploads must come back too.
    results_q = Queue()
    file_jobs = {}

    for upload_object in self._make_upload_objects(objects, pseudo_folder):
        source = upload_object.source
        name = upload_object.object_name
        overrides = upload_object.options
        details = {'action': 'upload', 'container': container}
        if overrides is None:
            object_options = options
        else:
            object_options = deepcopy(options)
            object_options.update(overrides)

        if hasattr(source, 'read'):
            # A file-like object to upload as `name`
            future = self.thread_manager.object_uu_pool.submit(
                self._upload_object_job, container, source, name,
                object_options, results_queue=results_q
            )
            details['file'] = source
            details['object'] = name
            file_jobs[future] = details
        elif source is not None:
            # A local path to upload as `name`
            details['path'] = source
            details['object'] = name
            if isdir(source):
                future = self.thread_manager.object_uu_pool.submit(
                    self._create_dir_marker_job, container, name,
                    object_options, path=source
                )
                file_jobs[future] = details
            else:
                try:
                    stat(source)
                    future = self.thread_manager.object_uu_pool.submit(
                        self._upload_object_job, container, source, name,
                        object_options, results_queue=results_q
                    )
                    file_jobs[future] = details
                except OSError as err:
                    # Avoid tying up threads with jobs that will fail
                    traceback, err_time = report_traceback()
                    logger.exception(err)
                    results_q.put({
                        'action': 'upload_object',
                        'container': container,
                        'object': name,
                        'success': False,
                        'error': err,
                        'traceback': traceback,
                        'error_timestamp': err_time,
                        'path': source
                    })
        else:
            # No source: create an empty object, or a directory marker
            # when the 'dir_marker' option is set
            details['file'] = None
            details['object'] = name
            if object_options['dir_marker']:
                future = self.thread_manager.object_uu_pool.submit(
                    self._create_dir_marker_job, container, name,
                    object_options
                )
            else:
                future = self.thread_manager.object_uu_pool.submit(
                    self._upload_object_job, container, StringIO(),
                    name, object_options
                )
            file_jobs[future] = details

    # Start a thread to watch for upload results
    watcher = Thread(target=self._watch_futures, args=(file_jobs, results_q))
    watcher.start()

    # Yield results as they become available, including those from
    # segment uploads.
    cancelled = False
    while True:
        res = get_from_queue(results_q)
        if res is None:
            break
        yield res

        if not res['success'] and not cancelled and options['fail_fast']:
            cancelled = True
            for job in file_jobs:
                job.cancel()
|
|
|
|
@staticmethod
def _make_upload_objects(objects, pseudo_folder=''):
    """
    Normalise the upload inputs into a list of SwiftUploadObject.

    :param objects: An iterable of path strings and/or SwiftUploadObject
                    instances.
    :param pseudo_folder: A path that is joined in front of every object
                          name.

    :returns: A list of SwiftUploadObject. Strings are wrapped in new
              instances; SwiftUploadObject inputs are returned as the same
              instances, with their object_name updated in place.

    :raises SwiftError: If an input is neither a string nor a
                        SwiftUploadObject.
    """
    prepared = []

    for item in objects:
        if isinstance(item, string_types):
            target = urljoin(pseudo_folder, item.lstrip('/'))
            item = SwiftUploadObject(item, target)
        elif isinstance(item, SwiftUploadObject):
            item.object_name = urljoin(pseudo_folder, item.object_name)
        else:
            raise SwiftError(
                "The upload operation takes only strings or "
                "SwiftUploadObjects as input",
                obj=item)
        prepared.append(item)

    return prepared
|
|
|
|
@staticmethod
def _create_container_job(
        conn, container, headers=None, policy_source=None):
    """
    Create a container using the given connection

    :param conn: The swift connection used for requests.
    :param container: The container name to create.
    :param headers: An optional dict of headers for the
                    put_container request. It is never modified.
    :param policy_source: An optional name of a container whose policy we
                          should duplicate.
    :return: A dict containing the results of the operation. Failures are
             reported in the dict ('success' False, 'error', 'traceback',
             'error_timestamp') rather than raised.
    """
    res = {
        'action': 'create_container',
        'container': container,
        'headers': headers
    }
    create_response = {}
    try:
        if policy_source is not None:
            _meta = conn.head_container(policy_source)
            if 'x-storage-policy' in _meta:
                policy_header = {
                    POLICY: _meta.get('x-storage-policy')
                }
                if headers is None:
                    headers = policy_header
                else:
                    # Merge into a copy so the caller's dict is left
                    # untouched; the result still reports the headers
                    # that are actually sent.
                    headers = dict(headers)
                    headers.update(policy_header)
                    res['headers'] = headers

        conn.put_container(
            container, headers, response_dict=create_response
        )
        res.update({
            'success': True,
            'response_dict': create_response
        })
    except Exception as err:
        traceback, err_time = report_traceback()
        logger.exception(err)
        res.update({
            'success': False,
            'error': err,
            'traceback': traceback,
            'error_timestamp': err_time,
            'response_dict': create_response
        })
    return res
|
|
|
|
@staticmethod
def _create_dir_marker_job(conn, container, obj, options, path=None):
    """
    Create an empty directory-marker object.

    :param conn: The swift connection used for requests.
    :param container: The container to create the marker in.
    :param obj: The marker's object name; a leading './' or '.\\' and then
                a leading '/' are dropped before it is sent.
    :param options: The upload options; when 'changed' is set, an existing
                    marker with matching type, size, etag and mtime is
                    left alone.
    :param path: Optional local directory whose mtime is recorded on the
                 marker; the current time is used when it is None.

    :returns: A 'create_dir_marker' result dict. Request failures are
              reported in the dict rather than raised.
    """
    res = {
        'action': 'create_dir_marker',
        'container': container,
        'object': obj,
        'path': path
    }
    results_dict = {}

    if obj.startswith(('./', '.\\')):
        obj = obj[2:]
    if obj.startswith('/'):
        obj = obj[1:]

    stamp = getmtime(path) if path is not None else round(time())
    put_headers = {'x-object-meta-mtime': "%f" % stamp}
    res['headers'] = put_headers

    if options['changed']:
        try:
            existing = conn.head_object(container, obj)
            content_type = existing.get('content-type', '').split(';', 1)[0]
            length = int(existing.get('content-length'))
            etag = existing.get('etag')
            mtime = existing.get('x-object-meta-mtime')

            unchanged = (
                content_type in KNOWN_DIR_MARKERS and
                length == 0 and
                etag == EMPTY_ETAG and
                mtime == put_headers['x-object-meta-mtime']
            )
            if unchanged:
                # An identical marker already exists: nothing to upload
                res['success'] = True
                return res
        except ClientException as err:
            # A 404 only means that the marker has to be created
            if err.http_status != 404:
                traceback, err_time = report_traceback()
                logger.exception(err)
                res.update({
                    'success': False,
                    'error': err,
                    'traceback': traceback,
                    'error_timestamp': err_time
                })
                return res

    try:
        conn.put_object(container, obj, '', content_length=0,
                        content_type=KNOWN_DIR_MARKERS[0],
                        headers=put_headers,
                        response_dict=results_dict)
    except Exception as err:
        traceback, err_time = report_traceback()
        logger.exception(err)
        res.update({
            'success': False,
            'error': err,
            'traceback': traceback,
            'error_timestamp': err_time,
            'response_dict': results_dict})
        return res

    res.update({
        'success': True,
        'response_dict': results_dict})
    return res
|
|
|
|
@staticmethod
def _upload_segment_job(conn, path, container, segment_name, segment_start,
                        segment_size, segment_index, obj_name, options,
                        results_queue=None):
    """
    Upload one segment of a local file into the segment container.

    :param conn: The swift connection used for requests.
    :param path: Path of the local file the segment is read from.
    :param container: The container of the object being uploaded; the
                      segment goes to options['segment_container'] or,
                      when that is unset, to '<container>_segments'.
    :param segment_name: The object name of the segment.
    :param segment_start: Offset in the file at which the segment starts.
    :param segment_size: Number of bytes in the segment.
    :param segment_index: Index of the segment within the object.
    :param obj_name: Name of the object the segment belongs to.
    :param options: The upload options; 'checksum' enables verifying the
                    returned etag against the locally computed md5.
    :param results_queue: Optional queue that also receives the result.

    :returns: An 'upload_segment' result dict. Failures are reported in
              the dict rather than raised.
    """
    results_dict = {}
    segment_container = (options['segment_container'] or
                         container + '_segments')

    res = {
        'action': 'upload_segment',
        'for_container': container,
        'for_object': obj_name,
        'segment_index': segment_index,
        'segment_size': segment_size,
        'segment_location': '/%s/%s' % (segment_container,
                                        segment_name),
        'log_line': '%s segment %s' % (obj_name, segment_index),
    }

    def publish(outcome):
        # Hand the result to the queue (when there is one) and the caller
        if results_queue is not None:
            results_queue.put(outcome)
        return outcome

    fp = None
    try:
        fp = open(path, 'rb', DISK_BUFFER)
        fp.seek(segment_start)

        contents = LengthWrapper(fp, segment_size, md5=options['checksum'])
        etag = conn.put_object(
            segment_container,
            segment_name,
            contents,
            content_length=segment_size,
            content_type='application/swiftclient-segment',
            response_dict=results_dict)

        if options['checksum'] and etag and etag != contents.get_md5sum():
            raise SwiftError('Segment {0}: upload verification failed: '
                             'md5 mismatch, local {1} != remote {2} '
                             '(remote segment has not been removed)'
                             .format(segment_index,
                                     contents.get_md5sum(),
                                     etag))

        res.update({
            'success': True,
            'response_dict': results_dict,
            'segment_etag': etag,
            'attempts': conn.attempts
        })
        return publish(res)

    except Exception as err:
        traceback, err_time = report_traceback()
        logger.exception(err)
        res.update({
            'success': False,
            'error': err,
            'traceback': traceback,
            'error_timestamp': err_time,
            'response_dict': results_dict,
            'attempts': conn.attempts
        })
        return publish(res)

    finally:
        if fp is not None:
            fp.close()
|
|
|
|
@staticmethod
def _put_object(conn, container, name, content, headers=None, md5=None):
    """
    Upload object into a given container and verify the resulting ETag, if
    the md5 optional parameter is passed.

    :param conn: The Swift connection to use for uploads.
    :param container: The container to put the object into.
    :param name: The name of the object.
    :param content: Object content; its len() is sent as the content
                    length.
    :param headers: Headers (optional) to associate with the object. The
                    dict is copied, never modified.
    :param md5: MD5 sum of the content. If passed in, it is sent as the
                'etag' header and used to verify the returned ETag.

    :returns: A dictionary as the response from calling put_object, with
              'success' set to True. The other keys are whatever
              put_object stored in its response_dict, documented as:
                  - status
                  - reason
                  - headers
              On error, the dictionary contains the following keys:
                  - success (with value False)
                  - error - the encountered exception (object)
                  - error_timestamp
                  - response_dict - results from the put_object call, as
                    documented above
                  - attempts - number of attempts made
                  - traceback
    """
    request_headers = {} if headers is None else dict(headers)
    if md5 is not None:
        request_headers['etag'] = md5

    response = {}
    try:
        returned_etag = conn.put_object(
            container, name, content, content_length=len(content),
            headers=request_headers, response_dict=response)
        if md5 is not None and returned_etag != md5:
            raise SwiftError('Upload verification failed for {0}: md5 '
                             'mismatch {1} != {2}'.format(
                                 name, md5, returned_etag))
    except Exception as err:
        traceback, err_time = report_traceback()
        logger.exception(err)
        return {
            'success': False,
            'error': err,
            'error_timestamp': err_time,
            'response_dict': response,
            'attempts': conn.attempts,
            'traceback': traceback
        }

    response['success'] = True
    return response
|
|
|
|
@staticmethod
def _upload_stream_segment(conn, container, object_name,
                           segment_container, segment_name,
                           segment_size, segment_index,
                           headers, fd):
    """
    Upload a segment from a stream, buffering it in memory first. The
    resulting object is placed either as a segment in the segment
    container, or if it is smaller than a single segment, as the given
    object name.

    :param conn: Swift Connection to use.
    :param container: Container in which the object would be placed.
    :param object_name: Name of the final object (used in case the stream
                        is smaller than the segment_size)
    :param segment_container: Container to hold the object segments.
    :param segment_name: The name of the segment.
    :param segment_size: Maximum number of bytes read for this segment.
    :param segment_index: The segment index.
    :param headers: Headers to attach to the segment/object.
    :param fd: File-like handle for the content. Must implement read().

    :returns: Dictionary, containing the following keys:
                - complete -- whether the stream is exhausted
                - segment_size - the actual size of the segment (may be
                                 smaller than the passed in segment_size)
                - segment_location - path to the segment
                - segment_index - index of the segment
                - segment_etag - the ETag for the segment
              It also carries the keys returned by _put_object
              (including 'success') and, when something was uploaded,
              'for_object'.
    """
    pieces = []
    digest = md5()
    remaining = segment_size
    # read() may return less than requested, so keep asking until the
    # segment is full or the stream runs dry
    while remaining > 0:
        piece = fd.read(remaining)
        if not piece:
            break
        remaining -= len(piece)
        digest.update(piece)
        pieces.append(piece)
    payload = b''.join(pieces)
    payload_len = len(payload)
    segment_hash = digest.hexdigest()

    if segment_index > 0 and not payload:
        # Happens if the segment size aligns with the object size
        return {'complete': True,
                'segment_size': 0,
                'segment_index': None,
                'segment_etag': None,
                'segment_location': None,
                'success': True}

    if segment_index == 0 and payload_len < segment_size:
        # The whole stream fits into one segment: store it as the object
        target_container, target_name = container, object_name
    else:
        target_container, target_name = segment_container, segment_name

    ret = SwiftService._put_object(
        conn, target_container, target_name, payload, headers,
        segment_hash)
    ret['segment_location'] = '/%s/%s' % (target_container, target_name)

    ret.update({
        'complete': payload_len < segment_size,
        'segment_size': payload_len,
        'segment_index': segment_index,
        'segment_etag': segment_hash,
        'for_object': object_name,
    })
    return ret
|
|
|
|
def _get_chunk_data(self, conn, container, obj, headers, manifest=None):
    """
    Describe the chunks an object is made of.

    :param conn: The swift connection used for requests.
    :param container: The container holding the object.
    :param obj: The name of the object.
    :param headers: The object's response headers; they decide whether it
                    is handled as a DLO ('x-object-manifest'), an SLO
                    ('x-static-large-object') or a plain object.
    :param manifest: The raw SLO manifest body, if it was fetched already;
                     when None it is requested from the server.

    :returns: A list of chunk dicts: the segment listing entries for a
              DLO, the manifest entries (sub-SLOs expanded recursively)
              for an SLO, or a single {'hash', 'bytes'} entry built from
              the etag and content-length of a plain object.
    """
    if 'x-object-manifest' in headers:
        seg_container, seg_prefix = \
            headers['x-object-manifest'].split('/', 1)
        segments = []
        for page in self.list(seg_container, {'prefix': seg_prefix}):
            if not page["success"]:
                raise page["error"]
            segments.extend(page["listing"])
        return segments

    if not config_true_value(headers.get('x-static-large-object')):
        return [{'hash': headers.get('etag').strip('"'),
                 'bytes': int(headers.get('content-length'))}]

    if manifest is None:
        headers, manifest = conn.get_object(
            container, obj, query_string='multipart-manifest=get')

    segments = []
    for entry in parse_api_response(headers, manifest):
        if entry.get('sub_slo'):
            # Nested SLO: replace the entry with its own segments
            sub_container, sub_obj = entry['name'].lstrip('/').split('/', 1)
            segments.extend(self._get_chunk_data(
                conn, sub_container, sub_obj,
                {'x-static-large-object': True}))
        else:
            segments.append(entry)
    return segments
|
|
|
|
def _is_identical(self, chunk_data, path):
    """
    Check whether a local file consists of exactly the given chunks.

    :param chunk_data: An iterable of dicts with a 'bytes' length and the
                       'hash' (md5 hex digest) expected for that stretch
                       of the file, in file order.
    :param path: Path of the local file, or None.

    :returns: True only if every chunk matches and the file ends right
              after the last one; False if path is None, the file cannot
              be opened, it is too short, or any digest differs.
    """
    if path is None:
        return False

    try:
        local_file = open(path, 'rb', DISK_BUFFER)
    except IOError:
        return False

    with local_file:
        for expected in chunk_data:
            remaining = expected['bytes']
            digest = md5()
            while remaining:
                piece = local_file.read(min(DISK_BUFFER, remaining))
                if not piece:
                    # File ended in the middle of a chunk
                    return False
                digest.update(piece)
                remaining -= len(piece)
            if digest.hexdigest() != expected['hash']:
                return False

        # All chunks verified; any trailing byte means the file is longer
        leftover = local_file.read(1)
        return not leftover
|
|
|
|
@staticmethod
def _upload_slo_manifest(conn, segment_results, container, obj, headers):
    """
    Upload an SLO manifest, given the results of uploading each segment, to
    the specified container.

    :param conn: The swift connection used for the manifest PUT.
    :param segment_results: List of response_dict structures, as populated
                            by _upload_segment_job. Specifically, each
                            entry must contain the following keys:
                            - segment_location
                            - segment_etag
                            - segment_size
                            - segment_index
                            The list is sorted in place by segment_index.
    :param container: The container to put the manifest into.
    :param obj: The name of the manifest object to use.
    :param headers: Optional set of headers to attach to the manifest.

    :returns: The response_dict filled in by the put_object call.
    """
    if headers is None:
        headers = {}
    # The manifest must list the segments in upload order
    segment_results.sort(key=lambda di: di['segment_index'])

    manifest_data = json.dumps([
        {
            'path': d['segment_location'],
            'etag': d['segment_etag'],
            'size_bytes': d['segment_size']
        } for d in segment_results
    ])

    response = {}
    conn.put_object(
        container, obj, manifest_data,
        headers=headers,
        query_string='multipart-manifest=put',
        response_dict=response)
    return response
|
|
|
|
def _upload_object_job(self, conn, container, source, obj, options,
|
|
results_queue=None):
|
|
if obj.startswith('./') or obj.startswith('.\\'):
|
|
obj = obj[2:]
|
|
if obj.startswith('/'):
|
|
obj = obj[1:]
|
|
res = {
|
|
'action': 'upload_object',
|
|
'container': container,
|
|
'object': obj
|
|
}
|
|
if hasattr(source, 'read'):
|
|
stream = source
|
|
path = None
|
|
else:
|
|
path = source
|
|
res['path'] = path
|
|
try:
|
|
if path is not None:
|
|
put_headers = {'x-object-meta-mtime': "%f" % getmtime(path)}
|
|
else:
|
|
put_headers = {'x-object-meta-mtime': "%f" % round(time())}
|
|
|
|
res['headers'] = put_headers
|
|
|
|
# We need to HEAD all objects now in case we're overwriting a
|
|
# manifest object and need to delete the old segments
|
|
# ourselves.
|
|
old_manifest = None
|
|
old_slo_manifest_paths = []
|
|
new_slo_manifest_paths = set()
|
|
segment_size = int(0 if options['segment_size'] is None
|
|
else options['segment_size'])
|
|
if (options['changed'] or options['skip_identical'] or
|
|
not options['leave_segments']):
|
|
try:
|
|
headers = conn.head_object(container, obj)
|
|
is_slo = config_true_value(
|
|
headers.get('x-static-large-object'))
|
|
|
|
if options['skip_identical'] or (
|
|
is_slo and not options['leave_segments']):
|
|
chunk_data = self._get_chunk_data(
|
|
conn, container, obj, headers)
|
|
|
|
if options['skip_identical'] and self._is_identical(
|
|
chunk_data, path):
|
|
res.update({
|
|
'success': True,
|
|
'status': 'skipped-identical'
|
|
})
|
|
return res
|
|
|
|
cl = int(headers.get('content-length'))
|
|
mt = headers.get('x-object-meta-mtime')
|
|
if (path is not None and options['changed'] and
|
|
cl == getsize(path) and
|
|
mt == put_headers['x-object-meta-mtime']):
|
|
res.update({
|
|
'success': True,
|
|
'status': 'skipped-changed'
|
|
})
|
|
return res
|
|
if not options['leave_segments']:
|
|
old_manifest = headers.get('x-object-manifest')
|
|
if is_slo:
|
|
for old_seg in chunk_data:
|
|
seg_path = old_seg['name'].lstrip('/')
|
|
if isinstance(seg_path, text_type):
|
|
seg_path = seg_path.encode('utf-8')
|
|
old_slo_manifest_paths.append(seg_path)
|
|
except ClientException as err:
|
|
if err.http_status != 404:
|
|
traceback, err_time = report_traceback()
|
|
logger.exception(err)
|
|
res.update({
|
|
'success': False,
|
|
'error': err,
|
|
'traceback': traceback,
|
|
'error_timestamp': err_time
|
|
})
|
|
return res
|
|
|
|
# Merge the command line header options to the put_headers
|
|
put_headers.update(split_headers(
|
|
options['meta'], 'X-Object-Meta-'))
|
|
put_headers.update(split_headers(options['header'], ''))
|
|
|
|
# Don't do segment job if object is not big enough, and never do
|
|
# a segment job if we're reading from a stream - we may fail if we
|
|
# go over the single object limit, but this gives us a nice way
|
|
# to create objects from memory
|
|
if (path is not None and segment_size and
|
|
(getsize(path) > segment_size)):
|
|
res['large_object'] = True
|
|
seg_container = container + '_segments'
|
|
if options['segment_container']:
|
|
seg_container = options['segment_container']
|
|
full_size = getsize(path)
|
|
|
|
segment_futures = []
|
|
segment_pool = self.thread_manager.segment_pool
|
|
segment = 0
|
|
segment_start = 0
|
|
|
|
while segment_start < full_size:
|
|
if segment_start + segment_size > full_size:
|
|
segment_size = full_size - segment_start
|
|
if options['use_slo']:
|
|
segment_name = '%s/slo/%s/%s/%s/%08d' % (
|
|
obj, put_headers['x-object-meta-mtime'],
|
|
full_size, options['segment_size'], segment
|
|
)
|
|
else:
|
|
segment_name = '%s/%s/%s/%s/%08d' % (
|
|
obj, put_headers['x-object-meta-mtime'],
|
|
full_size, options['segment_size'], segment
|
|
)
|
|
seg = segment_pool.submit(
|
|
self._upload_segment_job, path, container,
|
|
segment_name, segment_start, segment_size, segment,
|
|
obj, options, results_queue=results_queue
|
|
)
|
|
segment_futures.append(seg)
|
|
segment += 1
|
|
segment_start += segment_size
|
|
|
|
segment_results = []
|
|
errors = False
|
|
exceptions = []
|
|
for f in interruptable_as_completed(segment_futures):
|
|
try:
|
|
r = f.result()
|
|
if not r['success']:
|
|
errors = True
|
|
segment_results.append(r)
|
|
except Exception as err:
|
|
traceback, err_time = report_traceback()
|
|
logger.exception(err)
|
|
errors = True
|
|
exceptions.append((err, traceback, err_time))
|
|
if errors:
|
|
err = ClientException(
|
|
'Aborting manifest creation '
|
|
'because not all segments could be uploaded. %s/%s'
|
|
% (container, obj))
|
|
res.update({
|
|
'success': False,
|
|
'error': err,
|
|
'exceptions': exceptions,
|
|
'segment_results': segment_results
|
|
})
|
|
return res
|
|
|
|
res['segment_results'] = segment_results
|
|
|
|
if options['use_slo']:
|
|
response = self._upload_slo_manifest(
|
|
conn, segment_results, container, obj, put_headers)
|
|
res['manifest_response_dict'] = response
|
|
new_slo_manifest_paths = {
|
|
seg['segment_location'] for seg in segment_results}
|
|
else:
|
|
new_object_manifest = '%s/%s/%s/%s/%s/' % (
|
|
quote(seg_container.encode('utf8')),
|
|
quote(obj.encode('utf8')),
|
|
put_headers['x-object-meta-mtime'], full_size,
|
|
options['segment_size'])
|
|
if old_manifest and old_manifest.rstrip('/') == \
|
|
new_object_manifest.rstrip('/'):
|
|
old_manifest = None
|
|
put_headers['x-object-manifest'] = new_object_manifest
|
|
mr = {}
|
|
conn.put_object(
|
|
container, obj, '', content_length=0,
|
|
headers=put_headers,
|
|
response_dict=mr
|
|
)
|
|
res['manifest_response_dict'] = mr
|
|
elif options['use_slo'] and segment_size and not path:
|
|
segment = 0
|
|
results = []
|
|
while True:
|
|
segment_name = '%s/slo/%s/%s/%08d' % (
|
|
obj, put_headers['x-object-meta-mtime'],
|
|
segment_size, segment
|
|
)
|
|
seg_container = container + '_segments'
|
|
if options['segment_container']:
|
|
seg_container = options['segment_container']
|
|
ret = self._upload_stream_segment(
|
|
conn, container, obj,
|
|
seg_container,
|
|
segment_name,
|
|
segment_size,
|
|
segment,
|
|
put_headers,
|
|
stream
|
|
)
|
|
if not ret['success']:
|
|
return ret
|
|
if (ret['complete'] and segment == 0) or\
|
|
ret['segment_size'] > 0:
|
|
results.append(ret)
|
|
if results_queue is not None:
|
|
# Don't insert the 0-sized segments or objects
|
|
# themselves
|
|
if ret['segment_location'] != '/%s/%s' % (
|
|
container, obj) and ret['segment_size'] > 0:
|
|
results_queue.put(ret)
|
|
if ret['complete']:
|
|
break
|
|
segment += 1
|
|
if results[0]['segment_location'] != '/%s/%s' % (
|
|
container, obj):
|
|
response = self._upload_slo_manifest(
|
|
conn, results, container, obj, put_headers)
|
|
res['manifest_response_dict'] = response
|
|
new_slo_manifest_paths = {
|
|
r['segment_location'] for r in results}
|
|
res['large_object'] = True
|
|
else:
|
|
res['response_dict'] = ret
|
|
res['large_object'] = False
|
|
else:
|
|
res['large_object'] = False
|
|
obr = {}
|
|
fp = None
|
|
try:
|
|
if path is not None:
|
|
content_length = getsize(path)
|
|
fp = open(path, 'rb', DISK_BUFFER)
|
|
contents = LengthWrapper(fp,
|
|
content_length,
|
|
md5=options['checksum'])
|
|
else:
|
|
content_length = None
|
|
contents = ReadableToIterable(stream,
|
|
md5=options['checksum'])
|
|
|
|
etag = conn.put_object(
|
|
container, obj, contents,
|
|
content_length=content_length, headers=put_headers,
|
|
response_dict=obr
|
|
)
|
|
res['response_dict'] = obr
|
|
|
|
if (options['checksum'] and
|
|
etag and etag != contents.get_md5sum()):
|
|
raise SwiftError(
|
|
'Object upload verification failed: '
|
|
'md5 mismatch, local {0} != remote {1} '
|
|
'(remote object has not been removed)'
|
|
.format(contents.get_md5sum(), etag))
|
|
finally:
|
|
if fp is not None:
|
|
fp.close()
|
|
if old_manifest or old_slo_manifest_paths:
|
|
drs = []
|
|
delobjsmap = {}
|
|
if old_manifest:
|
|
scontainer, sprefix = old_manifest.split('/', 1)
|
|
sprefix = sprefix.rstrip('/') + '/'
|
|
delobjsmap[scontainer] = []
|
|
for part in self.list(scontainer, {'prefix': sprefix}):
|
|
if not part["success"]:
|
|
raise part["error"]
|
|
delobjsmap[scontainer].extend(
|
|
seg['name'] for seg in part['listing'])
|
|
|
|
if old_slo_manifest_paths:
|
|
for seg_to_delete in old_slo_manifest_paths:
|
|
if seg_to_delete in new_slo_manifest_paths:
|
|
continue
|
|
scont, sobj = \
|
|
seg_to_delete.split(b'/', 1)
|
|
delobjs_cont = delobjsmap.get(scont, [])
|
|
delobjs_cont.append(sobj)
|
|
delobjsmap[scont] = delobjs_cont
|
|
|
|
del_segs = []
|
|
for dscont, dsobjs in delobjsmap.items():
|
|
for dsobj in dsobjs:
|
|
del_seg = self.thread_manager.segment_pool.submit(
|
|
self._delete_segment, dscont, dsobj,
|
|
results_queue=results_queue
|
|
)
|
|
del_segs.append(del_seg)
|
|
|
|
for del_seg in interruptable_as_completed(del_segs):
|
|
drs.append(del_seg.result())
|
|
res['segment_delete_results'] = drs
|
|
|
|
# return dict for printing
|
|
res.update({
|
|
'success': True,
|
|
'status': 'uploaded',
|
|
'attempts': conn.attempts})
|
|
return res
|
|
|
|
except OSError as err:
|
|
traceback, err_time = report_traceback()
|
|
logger.exception(err)
|
|
if err.errno == ENOENT:
|
|
error = SwiftError('Local file %r not found' % path, exc=err)
|
|
else:
|
|
error = err
|
|
res.update({
|
|
'success': False,
|
|
'error': error,
|
|
'traceback': traceback,
|
|
'error_timestamp': err_time
|
|
})
|
|
except Exception as err:
|
|
traceback, err_time = report_traceback()
|
|
logger.exception(err)
|
|
res.update({
|
|
'success': False,
|
|
'error': err,
|
|
'traceback': traceback,
|
|
'error_timestamp': err_time
|
|
})
|
|
return res
|
|
|
|
# Delete related methods
|
|
#
|
|
def delete(self, container=None, objects=None, options=None):
    """
    Delete operations on an account, optional container and optional list
    of objects.

    :param container: The container to delete or delete from.
    :param objects: The list of objects to delete.
    :param options: A dictionary containing options to override the global
                    options specified during the service object creation::

                        {
                            'yes_all': False,
                            'leave_segments': False,
                            'prefix': None,
                            'header': [],
                        }

                    The 'fail_fast' option is also consulted: when it is
                    true, remaining work is cancelled after the first
                    unsuccessful result.

    :returns: A generator for returning the results of the delete
              operations. Each result yielded from the generator is either
              a 'delete_container', 'delete_object', 'delete_segment', or
              'bulk_delete' dictionary containing the results of an
              individual delete operation.

    :raises ClientException:
    :raises SwiftError:
    """
    if options is not None:
        options = dict(self._options, **options)
    else:
        options = self._options

    if container is not None:
        if objects is not None:
            if options['prefix']:
                objects = [obj for obj in objects
                           if obj.startswith(options['prefix'])]
            rq = Queue()
            obj_dels = {}

            bulk_page_size = self._bulk_delete_page_size(objects)
            if bulk_page_size > 1:
                page_at_a_time = n_at_a_time(objects, bulk_page_size)
                for page_slice in page_at_a_time:
                    for obj_slice in n_groups(
                            page_slice,
                            self._options['object_dd_threads']):
                        self._bulk_delete(container, obj_slice, options,
                                          obj_dels)
            else:
                self._per_item_delete(container, objects, options,
                                      obj_dels, rq)

            # Start a thread to watch for delete results
            Thread(
                target=self._watch_futures, args=(obj_dels, rq)
            ).start()

            # yield results as they become available, raising the first
            # encountered exception
            res = get_from_queue(rq)
            while res is not None:
                yield res

                # Cancel the remaining jobs if necessary. Results reported
                # for cancelled jobs may not carry a 'success' key, so a
                # missing key is treated as "not successful" instead of
                # blowing up with a KeyError.
                if options['fail_fast'] and not res.get('success'):
                    for pending in obj_dels:
                        pending.cancel()

                res = get_from_queue(rq)
        else:
            for res in self._delete_container(container, options):
                yield res
    else:
        if objects:
            raise SwiftError('Objects specified without container')
        if options['prefix']:
            raise SwiftError('Prefix specified without container')
        if options['yes_all']:
            cancelled = False
            containers = []
            for part in self.list():
                if part["success"]:
                    containers.extend(c['name'] for c in part['listing'])
                else:
                    raise part["error"]

            for con in containers:
                if cancelled:
                    break
                else:
                    for res in self._delete_container(
                            con, options=options):
                        yield res

                        # Cancel the remaining container deletes, but yield
                        # any pending results. A result without a 'success'
                        # key (e.g. a cancelled job) counts as a failure.
                        if (not cancelled and options['fail_fast'] and
                                not res.get('success')):
                            cancelled = True
|
|
|
|
def _bulk_delete_page_size(self, objects):
    '''
    Work out how many of the given objects should be deleted per request.

    :param objects: An iterable that supports 'len()'
    :returns: The 'max_deletes_per_request' value advertised under the
              cluster's 'bulk_delete' capability (10000 when the
              capability is present but gives no limit). Returns 1,
              meaning "delete one-by-one", when there are only a few
              objects, when the capabilities lookup fails, or when the
              cluster does not advertise 'bulk_delete'.
    '''
    few_objects = len(objects) <= 2 * self._options['object_dd_threads']
    if few_objects:
        # Not many objects; may as well delete one-by-one
        return 1

    try:
        cap_result = self.capabilities()
    except ClientException:
        # Old swift, presumably; assume no bulk middleware
        return 1

    if not cap_result['success']:
        # This shouldn't actually happen, but just in case we start
        # being more nuanced about our capabilities result...
        return 1

    swift_info = cap_result['capabilities']
    if 'bulk_delete' not in swift_info:
        return 1
    return swift_info['bulk_delete'].get('max_deletes_per_request', 10000)
|
|
|
|
def _per_item_delete(self, container, objects, options, rdict, rq):
    """
    Submit one '_delete_object' job per object to the object_dd pool.

    :param container: The container holding the objects.
    :param objects: Iterable of object names to delete.
    :param options: Options dict handed through to each delete job.
    :param rdict: Dict updated in place; maps each submitted future to
                  a ``{'container': ..., 'object': ...}`` description.
    :param rq: Queue passed to each job as ``results_queue``.
    """
    for name in objects:
        future = self.thread_manager.object_dd_pool.submit(
            self._delete_object, container, name, options,
            results_queue=rq
        )
        rdict[future] = {'container': container, 'object': name}
|
|
|
|
@staticmethod
def _delete_segment(conn, container, obj, results_queue=None):
    """
    Delete a single segment object and report the outcome.

    A ClientException with HTTP status 404 is not reported as a failure;
    any other exception is logged and recorded in the result.

    :param conn: Connection used to issue the delete.
    :param container: The container holding the segment.
    :param obj: The segment object name.
    :param results_queue: Optional queue; when given, the result dict is
                          also put on it.
    :returns: A 'delete_segment' result dictionary.
    """
    results_dict = {}
    res = {'success': True}
    try:
        conn.delete_object(container, obj, response_dict=results_dict)
    except Exception as err:
        already_gone = (
            isinstance(err, ClientException) and err.http_status == 404)
        if not already_gone:
            traceback, err_time = report_traceback()
            logger.exception(err)
            res = {
                'success': False,
                'error': err,
                'traceback': traceback,
                'error_timestamp': err_time
            }

    res.update({
        'action': 'delete_segment',
        'container': container,
        'object': obj,
        'attempts': conn.attempts,
        'response_dict': results_dict
    })

    if results_queue is not None:
        results_queue.put(res)
    return res
|
|
|
|
def _delete_object(self, conn, container, obj, options,
                   results_queue=None):
    """
    Delete one object and, unless 'leave_segments' is set, its segments.

    Unless options['leave_segments'] is true the object is HEADed first:
    an 'x-object-manifest' header marks segments to remove afterwards,
    and a true 'x-static-large-object' header makes the delete request
    carry the 'multipart-manifest=delete' query string. A 404 on the HEAD
    is ignored.

    :param conn: Connection used for the HEAD and DELETE requests.
    :param container: The container holding the object.
    :param obj: The object name.
    :param options: Options dict; 'header' and 'leave_segments' are read.
    :param results_queue: Optional queue handed to the segment delete
                          jobs.
    :returns: A 'delete_object' result dictionary.
    """
    request_headers = split_headers(options.get('header', []))
    res = {
        'action': 'delete_object',
        'container': container,
        'object': obj
    }
    try:
        old_manifest = None
        query_string = None

        if not options['leave_segments']:
            try:
                obj_headers = conn.head_object(container, obj,
                                               headers=request_headers)
                old_manifest = obj_headers.get('x-object-manifest')
                is_slo = config_true_value(
                    obj_headers.get('x-static-large-object'))
                if is_slo:
                    query_string = 'multipart-manifest=delete'
            except ClientException as err:
                if err.http_status != 404:
                    raise

        results_dict = {}
        conn.delete_object(container, obj,
                           headers=request_headers,
                           query_string=query_string,
                           response_dict=results_dict)

        if old_manifest:
            segment_pool = self.thread_manager.segment_pool
            # The manifest value is "<container>/<prefix>"
            s_container, s_prefix = old_manifest.split('/', 1)
            s_prefix = s_prefix.rstrip('/') + '/'

            del_segs = []
            for part in self.list(
                    container=s_container, options={'prefix': s_prefix}):
                if not part["success"]:
                    raise part["error"]

                seg_names = [o["name"] for o in part["listing"]]
                del_segs.extend(
                    segment_pool.submit(
                        self._delete_segment, s_container,
                        seg_name, results_queue=results_queue
                    )
                    for seg_name in seg_names
                )

            seg_outcomes = [
                done.result()
                for done in interruptable_as_completed(del_segs)
            ]
            res['dlo_segments_deleted'] = all(
                outcome["success"] for outcome in seg_outcomes)

        res.update({
            'success': True,
            'response_dict': results_dict,
            'attempts': conn.attempts,
        })

    except Exception as err:
        traceback, err_time = report_traceback()
        logger.exception(err)
        res.update({
            'success': False,
            'error': err,
            'traceback': traceback,
            'error_timestamp': err_time
        })

    return res
|
|
|
|
@staticmethod
def _delete_empty_container(conn, container, options):
    """
    Issue a container DELETE and report the outcome.

    :param conn: Connection used to issue the delete.
    :param container: The container to delete.
    :param options: Options dict; 'header' supplies extra request headers.
    :returns: A 'delete_container' result dictionary.
    """
    results_dict = {}
    request_headers = split_headers(options.get('header', []))
    try:
        conn.delete_container(container, headers=request_headers,
                              response_dict=results_dict)
    except Exception as err:
        traceback, err_time = report_traceback()
        logger.exception(err)
        res = {
            'success': False,
            'error': err,
            'traceback': traceback,
            'error_timestamp': err_time
        }
    else:
        res = {'success': True}

    res.update({
        'action': 'delete_container',
        'container': container,
        'object': None,
        'attempts': conn.attempts,
        'response_dict': results_dict
    })
    return res
|
|
|
|
def _delete_container(self, container, options):
    """
    Delete the listed contents of a container, then the container itself.

    Every listing page is passed to ``self.delete`` and its results are
    yielded as they arrive. When options['prefix'] is set only that
    subset of objects is removed and the container is left in place;
    otherwise a final 'delete_container' result is yielded.

    :param container: The container to empty and delete.
    :param options: Options dict used for the listing and the deletes.
    :returns: A generator of result dictionaries.
    """
    try:
        for page in self.list(container=container, options=options):
            if not page["success"]:
                raise page["error"]

            names = [entry['name'] for entry in page['listing']]
            for item_res in self.delete(
                    container=container, objects=names, options=options):
                yield item_res

        if options['prefix']:
            # We're only deleting a subset of objects within the container
            return

        pending = self.thread_manager.container_pool.submit(
            self._delete_empty_container, container, options
        )
        con_del_res = get_future_result(pending)

    except Exception as err:
        traceback, err_time = report_traceback()
        logger.exception(err)
        con_del_res = {
            'action': 'delete_container',
            'container': container,
            'object': None,
            'success': False,
            'error': err,
            'traceback': traceback,
            'error_timestamp': err_time
        }

    yield con_del_res
|
|
|
|
# Bulk methods
|
|
#
|
|
def _bulk_delete(self, container, objects, options, rdict):
    """
    Submit a single bulk-delete job for a group of objects.

    Nothing is submitted when ``objects`` is empty.

    :param container: The container holding the objects.
    :param objects: The group of object names to delete in one request.
    :param options: Options dict handed through to the job.
    :param rdict: Dict updated in place; maps the submitted future to a
                  ``{'container': ..., 'objects': ...}`` description.
    """
    if not objects:
        return

    future = self.thread_manager.object_dd_pool.submit(
        self._bulkdelete, container, objects, options
    )
    rdict[future] = {'container': container, 'objects': objects}
|
|
|
|
@staticmethod
def _bulkdelete(conn, container, objects, options):
    """
    Delete a group of objects with one 'bulk-delete' account POST.

    The request body is one URL-quoted '/container/object' path per line.
    An empty response body is reported as a failure.

    :param conn: Connection used to issue the POST.
    :param container: The container holding the objects.
    :param objects: The object names to delete.
    :param options: Unused by this job.
    :returns: A 'bulk_delete' result dictionary.
    """
    results_dict = {}
    res = {'container': container, 'objects': objects}
    try:
        request_headers = {
            'Accept': 'application/json',
            'Content-Type': 'text/plain',
        }
        quoted_paths = [
            quote(('/%s/%s' % (container, name)).encode('utf-8'))
            for name in objects
        ]
        payload = b''.join(
            path.encode('utf-8') + b'\n' for path in quoted_paths)
        resp_headers, body = conn.post_account(
            headers=request_headers,
            query_string='bulk-delete',
            data=payload,
            response_dict=results_dict)
        if not body:
            res.update({
                'success': False,
                'error': SwiftError(
                    'No content received on account POST. '
                    'Is the bulk operations middleware enabled?')})
        else:
            res.update({'success': True,
                        'result': parse_api_response(resp_headers, body)})
    except Exception as e:
        res.update({'success': False, 'error': e})

    res.update({
        'action': 'bulk_delete',
        'attempts': conn.attempts,
        'response_dict': results_dict
    })

    return res
|
|
|
|
# Copy related methods
|
|
#
|
|
def copy(self, container, objects, options=None):
    """
    Copy operations on a list of objects in a container. Container
    creation jobs are submitted for the destination containers first.

    :param container: The container from which to copy the objects.
    :param objects: A list of object names (strings) or SwiftCopyObject
                    instances containing an object name and an
                    options dict (can be None) to override the options for
                    that individual copy operation::

                        [
                            'object_name',
                            SwiftCopyObject(
                                'object_name',
                                options={
                                    'destination': '/container/object',
                                    'fresh_metadata': False,
                                    ...
                                }),
                            ...
                        ]

                    The options dict is described below.
    :param options: A dictionary containing options to override the global
                    options specified during the service object creation.
                    These options are applied to all copy operations
                    performed by this call, unless overridden on a per
                    object basis.
                    The options "destination" and "fresh_metadata" do
                    not need to be set, in this case objects will be
                    copied onto themselves and metadata will not be
                    refreshed.
                    The option "destination" can also be specified in the
                    format '/container', in which case objects without an
                    explicit destination will be copied to the destination
                    /container/original_object_name. Combinations of
                    multiple objects and a destination in the format
                    '/container/object' is invalid. Possible options are
                    given below::

                        {
                            'meta': [],
                            'header': [],
                            'destination': '/container/object',
                            'fresh_metadata': False,
                        }

    :returns: A generator returning the results of the container creation
              jobs followed by the results of copying the given list of
              objects.

    :raises SwiftError:
    """
    if options is None:
        options = self._options
    else:
        options = dict(self._options, **options)

    # Try to create the container, just in case it doesn't exist. If this
    # fails, it might just be because the user doesn't have container PUT
    # permissions, so we'll ignore any error. If there's really a problem,
    # it'll surface on the first object COPY.
    containers = set(
        next(p for p in obj.destination.split("/") if p)
        for obj in objects
        if isinstance(obj, SwiftCopyObject) and obj.destination
    )
    shared_destination = options.get('destination')
    if shared_destination:
        destination_split = shared_destination.split('/')
        if destination_split[0]:
            raise SwiftError("destination must be in format /cont[/obj]")
        plain_names = [
            o for o in objects if not isinstance(o, SwiftCopyObject)
        ]
        if len(destination_split) > 2 and len(plain_names) > 1:
            # TODO (clayg): could be useful to copy multiple objects into
            # a destination like "/container/common/prefix/for/objects/"
            # where the trailing "/" indicates the destination option is a
            # prefix!
            raise SwiftError("Combination of multiple objects and "
                             "destination including object is invalid")
        if destination_split[-1] == '':
            # N.B. this protects the above case
            raise SwiftError("destination can not end in a slash")
        containers.add(destination_split[1])

    # Only the storage policy header is forwarded to container creation
    shared_headers = split_headers(options["header"])
    policy_header = {}
    if POLICY in shared_headers:
        policy_header[POLICY] = shared_headers[POLICY]
    create_containers = [
        self.thread_manager.container_pool.submit(
            self._create_container_job, cont, headers=policy_header)
        for cont in containers
    ]

    # wait for container creation jobs to complete before any COPY
    for created in interruptable_as_completed(create_containers):
        yield created.result()

    copy_futures = []
    for copy_object in self._make_copy_objects(objects, options):
        source_name = copy_object.object_name
        per_object_options = copy_object.options
        target = copy_object.destination
        fresh_metadata = copy_object.fresh_metadata

        # Call-wide headers first, then per-object values on top
        headers = split_headers(options['meta'], 'X-Object-Meta-')
        headers.update(split_headers(options['header'], ''))
        if per_object_options is not None:
            if 'meta' in per_object_options:
                headers.update(split_headers(
                    per_object_options['meta'], 'X-Object-Meta-'))
            if 'header' in per_object_options:
                headers.update(
                    split_headers(per_object_options['header'], ''))

        copy_futures.append(
            self.thread_manager.object_uu_pool.submit(
                self._copy_object_job, container, source_name, target,
                headers, fresh_metadata
            )
        )

    for copied in interruptable_as_completed(copy_futures):
        yield copied.result()
|
|
|
|
@staticmethod
def _make_copy_objects(objects, options):
    """
    Normalise copy inputs into a list of SwiftCopyObject instances.

    Strings are wrapped as ``SwiftCopyObject(name, options)``; existing
    SwiftCopyObject instances are passed through unchanged.

    :param objects: Iterable of object names and/or SwiftCopyObjects.
    :param options: Options given to SwiftCopyObjects built from strings.
    :returns: A list of SwiftCopyObject instances, in input order.
    :raises SwiftError: If an item is neither a string nor a
                        SwiftCopyObject.
    """
    copy_objects = []

    for item in objects:
        if isinstance(item, string_types):
            item = SwiftCopyObject(item, options)
        elif not isinstance(item, SwiftCopyObject):
            raise SwiftError(
                "The copy operation takes only strings or "
                "SwiftCopyObjects as input",
                obj=item)
        copy_objects.append(item)

    return copy_objects
|
|
|
|
@staticmethod
def _copy_object_job(conn, container, obj, destination, headers,
                     fresh_metadata):
    """
    Copy a single object and report the outcome.

    :param conn: Connection used to issue the copy.
    :param container: The source container.
    :param obj: The source object name.
    :param destination: Destination handed to ``conn.copy_object``.
    :param headers: Headers to send with the copy request.
    :param fresh_metadata: Passed through to ``conn.copy_object``.
    :returns: A 'copy_object' result dictionary; on failure it also
              carries 'error', 'traceback' and 'error_timestamp'.
    """
    response_dict = {}
    res = {
        'success': True,
        'action': 'copy_object',
        'container': container,
        'object': obj,
        'destination': destination,
        'headers': headers,
        'fresh_metadata': fresh_metadata,
        'response_dict': response_dict
    }
    try:
        conn.copy_object(
            container, obj, destination=destination, headers=headers,
            fresh_metadata=fresh_metadata, response_dict=response_dict)
    except Exception as err:
        traceback, err_time = report_traceback()
        logger.exception(err)
        res.update(
            success=False,
            error=err,
            traceback=traceback,
            error_timestamp=err_time,
        )

    return res
|
|
|
|
# Capabilities related methods
|
|
#
|
|
def capabilities(self, url=None, refresh_cache=False):
    """
    List the cluster capabilities.

    Results are cached per ``url`` in ``self.capabilities_cache``.

    :param url: Proxy URL of the cluster to retrieve capabilities.
    :param refresh_cache: When true, skip any cached result for ``url``
                          and fetch the capabilities again.

    :returns: A 'capabilities' result dictionary; the capabilities
              themselves are stored under its 'capabilities' key.

    :raises ClientException:
    """
    cache = self.capabilities_cache
    if not refresh_cache and url in cache:
        return cache[url]

    res = {
        'action': 'capabilities',
        'timestamp': time(),
    }

    pending = self.thread_manager.container_pool.submit(
        self._get_capabilities, url
    )
    res['success'] = True
    res['capabilities'] = get_future_result(pending)
    if url is not None:
        res['url'] = url

    self.capabilities_cache[url] = res
    return res
|
|
|
|
@staticmethod
def _get_capabilities(conn, url):
    """
    Fetch the capabilities for ``url`` over the given connection.

    :param conn: Connection used for the request.
    :param url: Passed unchanged to ``conn.get_capabilities``.
    :returns: Whatever ``conn.get_capabilities`` returns.
    """
    capabilities = conn.get_capabilities(url)
    return capabilities
|
|
|
|
# Helper methods
|
|
#
|
|
@staticmethod
def _watch_futures(futures, result_queue):
    """
    Watches a dict of futures and pushes their results onto the given
    queue. We use this to wait for a set of futures which may create
    futures of their own to wait for, whilst also allowing us to
    immediately return the results of those sub-jobs.

    When all futures have completed, None is pushed to the queue

    If the future is cancelled, we use the dict to return details about
    the cancellation. Such a result is marked with
    ``'status': 'cancelled'`` and ``'success': False`` so that consumers
    can test 'success' on every result taken from the queue.

    :param futures: Dict mapping each future to a dict of details about
                    the job; the details dict is updated in place and
                    reused as the result when the job is cancelled or
                    raises.
    :param result_queue: Queue receiving the results, then a final None.
    """
    futures_only = list(futures.keys())
    for f in interruptable_as_completed(futures_only):
        try:
            r = f.result()
            if r is not None:
                result_queue.put(r)
        except CancelledError:
            res = futures[f]
            # A cancelled job never produced a result of its own; report
            # it as unsuccessful rather than omitting the 'success' key.
            res.update({
                'success': False,
                'status': 'cancelled'
            })
            result_queue.put(res)
        except Exception as err:
            traceback, err_time = report_traceback()
            logger.exception(err)
            res = futures[f]
            res.update({
                'success': False,
                'error': err,
                'traceback': traceback,
                'error_timestamp': err_time
            })
            result_queue.put(res)

    result_queue.put(None)
|