Joel Wright 3c0289844f Log and report trace on service operation fails
This patch adds exception logging to the swift service API. Each
operation that results in failure of any operation will now log
the exception as well as report a timestamp and full stack trace
in the results returned by the service API calls.

Change-Id: I7336b28354e7740ea7d048bdf355e3c1a1b4436c
2015-08-31 22:03:26 +01:00

2351 lines
88 KiB
Python

# Copyright (c) 2010-2013 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
from concurrent.futures import as_completed, CancelledError, TimeoutError
from copy import deepcopy
from errno import EEXIST, ENOENT
from hashlib import md5
from os import environ, makedirs, stat, utime
from os.path import (
basename, dirname, getmtime, getsize, isdir, join, sep as os_path_sep
)
from random import shuffle
from time import time
from threading import Thread
from six import StringIO, text_type
from six.moves.queue import Queue
from six.moves.queue import Empty as QueueEmpty
from six.moves.urllib.parse import quote, unquote
from six import Iterator, string_types
import json
from swiftclient import Connection
from swiftclient.command_helpers import (
stat_account, stat_container, stat_object
)
from swiftclient.utils import (
config_true_value, ReadableToIterable, LengthWrapper, EMPTY_ETAG,
parse_api_response, report_traceback
)
from swiftclient.exceptions import ClientException
from swiftclient.multithreading import MultiThreadingManager
logger = logging.getLogger("swiftclient.service")
class ResultsIterator(Iterator):
def __init__(self, futures):
self.futures = interruptable_as_completed(futures)
def __iter__(self):
return self
def __next__(self):
next_completed_future = next(self.futures)
return next_completed_future.result()
class SwiftError(Exception):
def __init__(self, value, container=None, obj=None,
segment=None, exc=None):
self.value = value
self.container = container
self.obj = obj
self.segment = segment
self.exception = exc
def __str__(self):
value = repr(self.value)
if self.container is not None:
value += " container:%s" % self.container
if self.obj is not None:
value += " object:%s" % self.obj
if self.segment is not None:
value += " segment:%s" % self.segment
return value
def process_options(options):
if (not (options.get('auth') and options.get('user')
and options.get('key'))
and options.get('auth_version') != '3'):
# Use keystone 2.0 auth if any of the old-style args are missing
options['auth_version'] = '2.0'
# Use new-style args if old ones not present
if not options['auth'] and options['os_auth_url']:
options['auth'] = options['os_auth_url']
if not options['user']and options['os_username']:
options['user'] = options['os_username']
if not options['key'] and options['os_password']:
options['key'] = options['os_password']
# Specific OpenStack options
options['os_options'] = {
'user_id': options['os_user_id'],
'user_domain_id': options['os_user_domain_id'],
'user_domain_name': options['os_user_domain_name'],
'tenant_id': options['os_tenant_id'],
'tenant_name': options['os_tenant_name'],
'project_id': options['os_project_id'],
'project_name': options['os_project_name'],
'project_domain_id': options['os_project_domain_id'],
'project_domain_name': options['os_project_domain_name'],
'service_type': options['os_service_type'],
'endpoint_type': options['os_endpoint_type'],
'auth_token': options['os_auth_token'],
'object_storage_url': options['os_storage_url'],
'region_name': options['os_region_name'],
}
def _build_default_global_options():
return {
"snet": False,
"verbose": 1,
"debug": False,
"info": False,
"auth": environ.get('ST_AUTH'),
"auth_version": environ.get('ST_AUTH_VERSION', '1.0'),
"user": environ.get('ST_USER'),
"key": environ.get('ST_KEY'),
"retries": 5,
"os_username": environ.get('OS_USERNAME'),
"os_user_id": environ.get('OS_USER_ID'),
"os_user_domain_name": environ.get('OS_USER_DOMAIN_NAME'),
"os_user_domain_id": environ.get('OS_USER_DOMAIN_ID'),
"os_password": environ.get('OS_PASSWORD'),
"os_tenant_id": environ.get('OS_TENANT_ID'),
"os_tenant_name": environ.get('OS_TENANT_NAME'),
"os_project_name": environ.get('OS_PROJECT_NAME'),
"os_project_id": environ.get('OS_PROJECT_ID'),
"os_project_domain_name": environ.get('OS_PROJECT_DOMAIN_NAME'),
"os_project_domain_id": environ.get('OS_PROJECT_DOMAIN_ID'),
"os_auth_url": environ.get('OS_AUTH_URL'),
"os_auth_token": environ.get('OS_AUTH_TOKEN'),
"os_storage_url": environ.get('OS_STORAGE_URL'),
"os_region_name": environ.get('OS_REGION_NAME'),
"os_service_type": environ.get('OS_SERVICE_TYPE'),
"os_endpoint_type": environ.get('OS_ENDPOINT_TYPE'),
"os_cacert": environ.get('OS_CACERT'),
"insecure": config_true_value(environ.get('SWIFTCLIENT_INSECURE')),
"ssl_compression": False,
'segment_threads': 10,
'object_dd_threads': 10,
'object_uu_threads': 10,
'container_threads': 10
}
_default_global_options = _build_default_global_options()
_default_local_options = {
'sync_to': None,
'sync_key': None,
'use_slo': False,
'segment_size': None,
'segment_container': None,
'leave_segments': False,
'changed': None,
'skip_identical': False,
'yes_all': False,
'read_acl': None,
'write_acl': None,
'out_file': None,
'out_directory': None,
'remove_prefix': False,
'no_download': False,
'long': False,
'totals': False,
'marker': '',
'header': [],
'meta': [],
'prefix': None,
'delimiter': None,
'fail_fast': False,
'human': False,
'dir_marker': False,
'checksum': True,
'shuffle': False
}
POLICY = 'X-Storage-Policy'
def get_from_queue(q, timeout=864000):
while True:
try:
item = q.get(timeout=timeout)
return item
except QueueEmpty:
# Do nothing here, we only have a timeout to allow interruption
pass
def get_future_result(f, timeout=86400):
while True:
try:
res = f.result(timeout=timeout)
return res
except TimeoutError:
# Do nothing here, we only have a timeout to allow interruption
pass
def interruptable_as_completed(fs, timeout=86400):
while True:
try:
for f in as_completed(fs, timeout=timeout):
fs.remove(f)
yield f
return
except TimeoutError:
# Do nothing here, we only have a timeout to allow interruption
pass
def get_conn(options):
"""
Return a connection building it from the options.
"""
return Connection(options['auth'],
options['user'],
options['key'],
options['retries'],
auth_version=options['auth_version'],
os_options=options['os_options'],
snet=options['snet'],
cacert=options['os_cacert'],
insecure=options['insecure'],
ssl_compression=options['ssl_compression'])
def mkdirs(path):
try:
makedirs(path)
except OSError as err:
if err.errno != EEXIST:
raise
def split_headers(options, prefix=''):
"""
Splits 'Key: Value' strings and returns them as a dictionary.
:param options: An array of 'Key: Value' strings
:param prefix: String to prepend to all of the keys in the dictionary.
reporting.
"""
headers = {}
for item in options:
split_item = item.split(':', 1)
if len(split_item) == 2:
headers[(prefix + split_item[0]).title()] = split_item[1]
else:
raise SwiftError(
"Metadata parameter %s must contain a ':'.\n%s"
% (item, "Example: 'Color:Blue' or 'Size:Large'")
)
return headers
class SwiftUploadObject(object):
"""
Class for specifying an object upload, allowing the object source, name and
options to be specified separately for each individual object.
"""
def __init__(self, source, object_name=None, options=None):
if isinstance(source, string_types):
self.object_name = object_name or source
elif source is None or hasattr(source, 'read'):
if not object_name or not isinstance(object_name, string_types):
raise SwiftError('Object names must be specified as '
'strings for uploads from None or file '
'like objects.')
self.object_name = object_name
else:
raise SwiftError('Unexpected source type for '
'SwiftUploadObject: {0}'.format(type(source)))
if not self.object_name:
raise SwiftError('Object names must not be empty strings')
self.options = options
self.source = source
class SwiftPostObject(object):
"""
Class for specifying an object post, allowing the headers/metadata to be
specified separately for each individual object.
"""
def __init__(self, object_name, options=None):
if not isinstance(object_name, string_types) or not object_name:
raise SwiftError(
"Object names must be specified as non-empty strings"
)
else:
self.object_name = object_name
self.options = options
class _SwiftReader(object):
"""
Class for downloading objects from swift and raising appropriate
errors on failures caused by either invalid md5sum or size of the
data read.
"""
def __init__(self, path, body, headers):
self._path = path
self._body = body
self._actual_read = 0
self._content_length = None
self._actual_md5 = None
self._expected_etag = headers.get('etag')
if ('x-object-manifest' not in headers
and 'x-static-large-object' not in headers):
self._actual_md5 = md5()
if 'content-length' in headers:
try:
self._content_length = int(headers.get('content-length'))
except ValueError:
raise SwiftError('content-length header must be an integer')
def __iter__(self):
for chunk in self._body:
if self._actual_md5:
self._actual_md5.update(chunk)
self._actual_read += len(chunk)
yield chunk
self._check_contents()
def _check_contents(self):
if self._actual_md5 and self._expected_etag:
etag = self._actual_md5.hexdigest()
if etag != self._expected_etag:
raise SwiftError('Error downloading {0}: md5sum != etag, '
'{1} != {2}'.format(
self._path, etag, self._expected_etag))
if (self._content_length is not None
and self._actual_read != self._content_length):
raise SwiftError('Error downloading {0}: read_length != '
'content_length, {1:d} != {2:d}'.format(
self._path, self._actual_read,
self._content_length))
def bytes_read(self):
return self._actual_read
class SwiftService(object):
"""
Service for performing swift operations
"""
def __init__(self, options=None):
if options is not None:
self._options = dict(
_default_global_options,
**dict(_default_local_options, **options)
)
else:
self._options = dict(
_default_global_options,
**_default_local_options
)
process_options(self._options)
create_connection = lambda: get_conn(self._options)
self.thread_manager = MultiThreadingManager(
create_connection,
segment_threads=self._options['segment_threads'],
object_dd_threads=self._options['object_dd_threads'],
object_uu_threads=self._options['object_uu_threads'],
container_threads=self._options['container_threads']
)
def __enter__(self):
self.thread_manager.__enter__()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.thread_manager.__exit__(exc_type, exc_val, exc_tb)
# Stat related methods
#
def stat(self, container=None, objects=None, options=None):
"""
Get account stats, container stats or information about a list of
objects in a container.
:param container: The container to query.
:param objects: A list of object paths about which to return
information (a list of strings).
:param options: A dictionary containing options to override the global
options specified during the service object creation.
These options are applied to all stat operations
performed by this call::
{
'human': False
}
:returns: Either a single dictionary containing stats about an account
or container, or an iterator for returning the results of the
stat operations on a list of objects.
:raises: SwiftError
"""
if options is not None:
options = dict(self._options, **options)
else:
options = self._options
if not container:
if objects:
raise SwiftError('Objects specified without container')
else:
res = {
'action': 'stat_account',
'success': True,
'container': container,
'object': None,
}
try:
stats_future = self.thread_manager.container_pool.submit(
stat_account, options
)
items, headers = get_future_result(stats_future)
res.update({
'items': items,
'headers': headers
})
return res
except ClientException as err:
if err.http_status != 404:
traceback, err_time = report_traceback()
logger.exception(err)
res.update({
'success': False,
'error': err,
'traceback': traceback,
'error_timestamp': err_time
})
return res
raise SwiftError('Account not found', exc=err)
except Exception as err:
traceback, err_time = report_traceback()
logger.exception(err)
res.update({
'success': False,
'error': err,
'traceback': traceback,
'error_timestamp': err_time
})
return res
else:
if not objects:
res = {
'action': 'stat_container',
'container': container,
'object': None,
'success': True,
}
try:
stats_future = self.thread_manager.container_pool.submit(
stat_container, options, container
)
items, headers = get_future_result(stats_future)
res.update({
'items': items,
'headers': headers
})
return res
except ClientException as err:
if err.http_status != 404:
traceback, err_time = report_traceback()
logger.exception(err)
res.update({
'success': False,
'error': err,
'traceback': traceback,
'error_timestamp': err_time
})
return res
raise SwiftError('Container %r not found' % container,
container=container, exc=err)
except Exception as err:
traceback, err_time = report_traceback()
logger.exception(err)
res.update({
'success': False,
'error': err,
'traceback': traceback,
'error_timestamp': err_time
})
return res
else:
stat_futures = []
for stat_o in objects:
stat_future = self.thread_manager.object_dd_pool.submit(
self._stat_object, container, stat_o, options
)
stat_futures.append(stat_future)
return ResultsIterator(stat_futures)
@staticmethod
def _stat_object(conn, container, obj, options):
res = {
'action': 'stat_object',
'object': obj,
'container': container,
'success': True,
}
try:
items, headers = stat_object(conn, options, container, obj)
res.update({
'items': items,
'headers': headers
})
return res
except Exception as err:
traceback, err_time = report_traceback()
logger.exception(err)
res.update({
'success': False,
'error': err,
'traceback': traceback,
'error_timestamp': err_time
})
return res
# Post related methods
#
def post(self, container=None, objects=None, options=None):
"""
Post operations on an account, container or list of objects
:param container: The container to make the post operation against.
:param objects: A list of object names (strings) or SwiftPostObject
instances containing an object name, and an
options dict (can be None) to override the options for
that individual post operation::
[
'object_name',
SwiftPostObject('object_name', options={...}),
...
]
The options dict is described below.
:param options: A dictionary containing options to override the global
options specified during the service object creation.
These options are applied to all post operations
performed by this call, unless overridden on a per
object basis. Possible options are given below::
{
'meta': [],
'headers': [],
'read_acl': None, # For containers only
'write_acl': None, # For containers only
'sync_to': None, # For containers only
'sync_key': None # For containers only
}
:returns: Either a single result dictionary in the case of a post to a
container/account, or an iterator for returning the results
of posts to a list of objects.
:raises: SwiftError
"""
if options is not None:
options = dict(self._options, **options)
else:
options = self._options
res = {
'success': True,
'container': container,
'object': None,
'headers': {},
}
if not container:
res["action"] = "post_account"
if objects:
raise SwiftError('Objects specified without container')
else:
response_dict = {}
headers = split_headers(
options['meta'], 'X-Account-Meta-')
headers.update(
split_headers(options['header'], ''))
res['headers'] = headers
try:
post = self.thread_manager.container_pool.submit(
self._post_account_job, headers, response_dict
)
get_future_result(post)
except ClientException as err:
if err.http_status != 404:
traceback, err_time = report_traceback()
logger.exception(err)
res.update({
'success': False,
'error': err,
'traceback': traceback,
'error_timestamp': err_time,
'response_dict': response_dict
})
return res
raise SwiftError('Account not found', exc=err)
except Exception as err:
traceback, err_time = report_traceback()
logger.exception(err)
res.update({
'success': False,
'error': err,
'response_dict': response_dict,
'traceback': traceback,
'error_timestamp': err_time
})
return res
if not objects:
res["action"] = "post_container"
response_dict = {}
headers = split_headers(
options['meta'], 'X-Container-Meta-')
headers.update(
split_headers(options['header'], ''))
if options['read_acl'] is not None:
headers['X-Container-Read'] = options['read_acl']
if options['write_acl'] is not None:
headers['X-Container-Write'] = options['write_acl']
if options['sync_to'] is not None:
headers['X-Container-Sync-To'] = options['sync_to']
if options['sync_key'] is not None:
headers['X-Container-Sync-Key'] = options['sync_key']
res['headers'] = headers
try:
post = self.thread_manager.container_pool.submit(
self._post_container_job, container,
headers, response_dict
)
get_future_result(post)
except ClientException as err:
if err.http_status != 404:
traceback, err_time = report_traceback()
logger.exception(err)
res.update({
'action': 'post_container',
'success': False,
'error': err,
'traceback': traceback,
'error_timestamp': err_time,
'response_dict': response_dict
})
return res
raise SwiftError(
"Container '%s' not found" % container,
container=container, exc=err
)
except Exception as err:
traceback, err_time = report_traceback()
logger.exception(err)
res.update({
'action': 'post_container',
'success': False,
'error': err,
'response_dict': response_dict,
'traceback': traceback,
'error_timestamp': err_time
})
return res
else:
post_futures = []
post_objects = self._make_post_objects(objects)
for post_object in post_objects:
obj = post_object.object_name
obj_options = post_object.options
response_dict = {}
headers = split_headers(
options['meta'], 'X-Object-Meta-')
# add header options to the headers object for the request.
headers.update(
split_headers(options['header'], ''))
if obj_options is not None:
if 'meta' in obj_options:
headers.update(
split_headers(
obj_options['meta'], 'X-Object-Meta'
)
)
if 'headers' in obj_options:
headers.update(
split_headers(obj_options['header'], '')
)
post = self.thread_manager.object_uu_pool.submit(
self._post_object_job, container, obj,
headers, response_dict
)
post_futures.append(post)
return ResultsIterator(post_futures)
@staticmethod
def _make_post_objects(objects):
post_objects = []
for o in objects:
if isinstance(o, string_types):
obj = SwiftPostObject(o)
post_objects.append(obj)
elif isinstance(o, SwiftPostObject):
post_objects.append(o)
else:
raise SwiftError(
"The post operation takes only strings or "
"SwiftPostObjects as input",
obj=o)
return post_objects
@staticmethod
def _post_account_job(conn, headers, result):
return conn.post_account(headers=headers, response_dict=result)
@staticmethod
def _post_container_job(conn, container, headers, result):
try:
res = conn.post_container(
container, headers=headers, response_dict=result)
except ClientException as err:
if err.http_status != 404:
raise
_response_dict = {}
res = conn.put_container(
container, headers=headers, response_dict=_response_dict)
result['post_put'] = _response_dict
return res
@staticmethod
def _post_object_job(conn, container, obj, headers, result):
res = {
'success': True,
'action': 'post_object',
'container': container,
'object': obj,
'headers': headers,
'response_dict': result
}
try:
conn.post_object(
container, obj, headers=headers, response_dict=result)
except Exception as err:
traceback, err_time = report_traceback()
logger.exception(err)
res.update({
'success': False,
'error': err,
'traceback': traceback,
'error_timestamp': err_time
})
return res
# List related methods
#
def list(self, container=None, options=None):
"""
List operations on an account, container.
:param container: The container to make the list operation against.
:param options: A dictionary containing options to override the global
options specified during the service object creation::
{
'long': False,
'prefix': None,
'delimiter': None,
}
:returns: A generator for returning the results of the list operation
on an account or container. Each result yielded from the
generator is either a 'list_account_part' or
'list_container_part', containing part of the listing.
"""
if options is not None:
options = dict(self._options, **options)
else:
options = self._options
rq = Queue(maxsize=10) # Just stop list running away consuming memory
if container is None:
listing_future = self.thread_manager.container_pool.submit(
self._list_account_job, options, rq
)
else:
listing_future = self.thread_manager.container_pool.submit(
self._list_container_job, container, options, rq
)
res = get_from_queue(rq)
while res is not None:
yield res
res = get_from_queue(rq)
# Make sure the future has completed
get_future_result(listing_future)
@staticmethod
def _list_account_job(conn, options, result_queue):
marker = ''
error = None
try:
while True:
_, items = conn.get_account(
marker=marker, prefix=options['prefix']
)
if not items:
result_queue.put(None)
return
if options['long']:
for i in items:
name = i['name']
i['meta'] = conn.head_container(name)
res = {
'action': 'list_account_part',
'container': None,
'prefix': options['prefix'],
'success': True,
'listing': items,
'marker': marker,
}
result_queue.put(res)
marker = items[-1].get('name', items[-1].get('subdir'))
except ClientException as err:
traceback, err_time = report_traceback()
logger.exception(err)
if err.http_status != 404:
error = (err, traceback, err_time)
else:
error = (
SwiftError('Account not found', exc=err),
traceback, err_time
)
except Exception as err:
traceback, err_time = report_traceback()
logger.exception(err)
error = (err, traceback, err_time)
res = {
'action': 'list_account_part',
'container': None,
'prefix': options['prefix'],
'success': False,
'marker': marker,
'error': error[0],
'traceback': error[1],
'error_timestamp': error[2]
}
result_queue.put(res)
result_queue.put(None)
@staticmethod
def _list_container_job(conn, container, options, result_queue):
marker = ''
error = None
try:
while True:
_, items = conn.get_container(
container, marker=marker, prefix=options['prefix'],
delimiter=options['delimiter']
)
if not items:
result_queue.put(None)
return
res = {
'action': 'list_container_part',
'container': container,
'prefix': options['prefix'],
'success': True,
'marker': marker,
'listing': items,
}
result_queue.put(res)
marker = items[-1].get('name', items[-1].get('subdir'))
except ClientException as err:
traceback, err_time = report_traceback()
logger.exception(err)
if err.http_status != 404:
error = (err, traceback, err_time)
else:
error = (
SwiftError(
'Container %r not found' % container,
container=container, exc=err
),
traceback,
err_time
)
except Exception as err:
traceback, err_time = report_traceback()
logger.exception(err)
error = (err, traceback, err_time)
res = {
'action': 'list_container_part',
'container': container,
'prefix': options['prefix'],
'success': False,
'marker': marker,
'error': error[0],
'traceback': error[1],
'error_timestamp': error[2]
}
result_queue.put(res)
result_queue.put(None)
# Download related methods
#
def download(self, container=None, objects=None, options=None):
"""
Download operations on an account, optional container and optional list
of objects.
:param container: The container to download from.
:param objects: A list of object names to download (a list of strings).
:param options: A dictionary containing options to override the global
options specified during the service object creation::
{
'yes_all': False,
'marker': '',
'prefix': None,
'no_download': False,
'header': [],
'skip_identical': False,
'out_directory': None,
'out_file': None,
'remove_prefix': False,
'shuffle' : False
}
:returns: A generator for returning the results of the download
operations. Each result yielded from the generator is a
'download_object' dictionary containing the results of an
individual file download.
:raises: ClientException
:raises: SwiftError
"""
if options is not None:
options = dict(self._options, **options)
else:
options = self._options
if not container:
# Download everything if options['yes_all'] is set
if options['yes_all']:
try:
options_copy = deepcopy(options)
options_copy["long"] = False
for part in self.list(options=options_copy):
if part["success"]:
containers = [i['name'] for i in part["listing"]]
if options['shuffle']:
shuffle(containers)
for con in containers:
for res in self._download_container(
con, options_copy):
yield res
else:
raise part["error"]
# If we see a 404 here, the listing of the account failed
except ClientException as err:
if err.http_status != 404:
raise
raise SwiftError('Account not found', exc=err)
elif not objects:
if '/' in container:
raise SwiftError('\'/\' in container name',
container=container)
for res in self._download_container(container, options):
yield res
else:
if '/' in container:
raise SwiftError('\'/\' in container name',
container=container)
if options['out_file'] and len(objects) > 1:
options['out_file'] = None
o_downs = [
self.thread_manager.object_dd_pool.submit(
self._download_object_job, container, obj, options
) for obj in objects
]
for o_down in interruptable_as_completed(o_downs):
yield o_down.result()
def _download_object_job(self, conn, container, obj, options):
out_file = options['out_file']
results_dict = {}
req_headers = split_headers(options['header'], '')
pseudodir = False
path = join(container, obj) if options['yes_all'] else obj
path = path.lstrip(os_path_sep)
options['skip_identical'] = (options['skip_identical'] and
out_file != '-')
if options['prefix'] and options['remove_prefix']:
path = path[len(options['prefix']):].lstrip('/')
if options['out_directory']:
path = os.path.join(options['out_directory'], path)
if options['skip_identical']:
filename = out_file if out_file else path
try:
fp = open(filename, 'rb')
except IOError:
pass
else:
with fp:
md5sum = md5()
while True:
data = fp.read(65536)
if not data:
break
md5sum.update(data)
req_headers['If-None-Match'] = md5sum.hexdigest()
try:
start_time = time()
get_args = {'resp_chunk_size': 65536,
'headers': req_headers,
'response_dict': results_dict}
if options['skip_identical']:
# Assume the file is a large object; if we're wrong, the query
# string is ignored and the If-None-Match header will trigger
# the behavior we want
get_args['query_string'] = 'multipart-manifest=get'
try:
headers, body = conn.get_object(container, obj, **get_args)
except ClientException as e:
if not options['skip_identical']:
raise
if e.http_status != 304: # Only handling Not Modified
raise
headers = results_dict['headers']
if 'x-object-manifest' in headers:
# DLO: most likely it has more than one page worth of
# segments and we have an empty file locally
body = []
elif config_true_value(headers.get('x-static-large-object')):
# SLO: apparently we have a copy of the manifest locally?
# provide no chunking data to force a fresh download
body = [b'[]']
else:
# Normal object: let it bubble up
raise
if options['skip_identical']:
if config_true_value(headers.get('x-static-large-object')) or \
'x-object-manifest' in headers:
# The request was chunked, so stitch it back together
chunk_data = self._get_chunk_data(conn, container, obj,
headers, b''.join(body))
else:
chunk_data = None
if chunk_data is not None:
if self._is_identical(chunk_data, filename):
raise ClientException('Large object is identical',
http_status=304)
# Large objects are different; start the real download
del get_args['query_string']
get_args['response_dict'].clear()
headers, body = conn.get_object(container, obj, **get_args)
headers_receipt = time()
obj_body = _SwiftReader(path, body, headers)
no_file = options['no_download']
if out_file == "-" and not no_file:
res = {
'action': 'download_object',
'container': container,
'object': obj,
'path': path,
'pseudodir': pseudodir,
'contents': obj_body
}
return res
fp = None
try:
content_type = headers.get('content-type')
if (content_type and
content_type.split(';', 1)[0] == 'text/directory'):
make_dir = not no_file and out_file != "-"
if make_dir and not isdir(path):
mkdirs(path)
else:
make_dir = not (no_file or out_file)
if make_dir:
dirpath = dirname(path)
if dirpath and not isdir(dirpath):
mkdirs(dirpath)
if not no_file:
if out_file:
fp = open(out_file, 'wb')
else:
if basename(path):
fp = open(path, 'wb')
else:
pseudodir = True
for chunk in obj_body:
if fp is not None:
fp.write(chunk)
finish_time = time()
finally:
bytes_read = obj_body.bytes_read()
if fp is not None:
fp.close()
if 'x-object-meta-mtime' in headers and not no_file:
mtime = float(headers['x-object-meta-mtime'])
if options['out_file']:
utime(options['out_file'], (mtime, mtime))
else:
utime(path, (mtime, mtime))
res = {
'action': 'download_object',
'success': True,
'container': container,
'object': obj,
'path': path,
'pseudodir': pseudodir,
'start_time': start_time,
'finish_time': finish_time,
'headers_receipt': headers_receipt,
'auth_end_time': conn.auth_end_time,
'read_length': bytes_read,
'attempts': conn.attempts,
'response_dict': results_dict
}
return res
except Exception as err:
traceback, err_time = report_traceback()
logger.exception(err)
res = {
'action': 'download_object',
'container': container,
'object': obj,
'success': False,
'error': err,
'traceback': traceback,
'error_timestamp': err_time,
'response_dict': results_dict,
'path': path,
'pseudodir': pseudodir,
'attempts': conn.attempts
}
return res
def _submit_page_downloads(self, container, page_generator, options):
try:
list_page = next(page_generator)
except StopIteration:
return None
if list_page["success"]:
objects = [o["name"] for o in list_page["listing"]]
if options["shuffle"]:
shuffle(objects)
o_downs = [
self.thread_manager.object_dd_pool.submit(
self._download_object_job, container, obj, options
) for obj in objects
]
return o_downs
else:
raise list_page["error"]
def _download_container(self, container, options):
_page_generator = self.list(container=container, options=options)
try:
next_page_downs = self._submit_page_downloads(
container, _page_generator, options
)
except ClientException as err:
if err.http_status != 404:
raise
raise SwiftError(
'Container %r not found' % container,
container=container, exc=err
)
error = None
while next_page_downs:
page_downs = next_page_downs
next_page_downs = None
# Start downloading the next page of list results when
# we have completed 80% of the previous page
next_page_triggered = False
next_page_trigger_point = 0.8 * len(page_downs)
page_results_yielded = 0
for o_down in interruptable_as_completed(page_downs):
yield o_down.result()
# Do we need to start the next set of downloads yet?
if not next_page_triggered:
page_results_yielded += 1
if page_results_yielded >= next_page_trigger_point:
try:
next_page_downs = self._submit_page_downloads(
container, _page_generator, options
)
except ClientException as err:
# Allow the current page to finish downloading
logger.exception(err)
error = err
except Exception:
# Something unexpected went wrong - cancel
# remaining downloads
for _d in page_downs:
_d.cancel()
raise
finally:
# Stop counting and testing
next_page_triggered = True
if error:
raise error
# Upload related methods
#
def upload(self, container, objects, options=None):
"""
Upload a list of objects to a given container.
:param container: The container to put the uploads into.
:param objects: A list of file/directory names (strings) or
SwiftUploadObject instances containing a source for the
created object, an object name, and an options dict
(can be None) to override the options for that
individual upload operation::
[
'/path/to/file',
SwiftUploadObject('/path', object_name='obj1'),
...
]
The options dict is as described below.
The SwiftUploadObject source may be one of:
file - A file like object (with a read method)
path - A string containing the path to a local file
or directory
None - Indicates that we want an empty object
:param options: A dictionary containing options to override the global
options specified during the service object creation.
These options are applied to all upload operations
performed by this call, unless overridden on a per
object basis. Possible options are given below::
{
'meta': [],
'header': [],
'segment_size': None,
'use_slo': False,
'segment_container': None,
'leave_segments': False,
'changed': None,
'skip_identical': False,
'fail_fast': False,
'dir_marker': False # Only for None sources
}
:returns: A generator for returning the results of the uploads.
:raises: SwiftError
:raises: ClientException
"""
if options is not None:
options = dict(self._options, **options)
else:
options = self._options
try:
segment_size = int(0 if options['segment_size'] is None else
options['segment_size'])
except ValueError:
raise SwiftError('Segment size should be an integer value')
# Try to create the container, just in case it doesn't exist. If this
# fails, it might just be because the user doesn't have container PUT
# permissions, so we'll ignore any error. If there's really a problem,
# it'll surface on the first object PUT.
policy_header = {}
_header = split_headers(options["header"])
if POLICY in _header:
policy_header[POLICY] = \
_header[POLICY]
create_containers = [
self.thread_manager.container_pool.submit(
self._create_container_job, container, headers=policy_header
)
]
# wait for first container job to complete before possibly attempting
# segment container job because segment container job may attempt
# to HEAD the first container
for r in interruptable_as_completed(create_containers):
res = r.result()
yield res
if segment_size:
seg_container = container + '_segments'
if options['segment_container']:
seg_container = options['segment_container']
if seg_container != container:
if not policy_header:
# Since no storage policy was specified on the command
# line, rather than just letting swift pick the default
# storage policy, we'll try to create the segments
# container with the same policy as the upload container
create_containers = [
self.thread_manager.container_pool.submit(
self._create_container_job, seg_container,
policy_source=container
)
]
else:
create_containers = [
self.thread_manager.container_pool.submit(
self._create_container_job, seg_container,
headers=policy_header
)
]
for r in interruptable_as_completed(create_containers):
res = r.result()
yield res
# We maintain a results queue here and a separate thread to monitor
# the futures because we want to get results back from potential
# segment uploads too
rq = Queue()
file_jobs = {}
upload_objects = self._make_upload_objects(objects)
for upload_object in upload_objects:
s = upload_object.source
o = upload_object.object_name
o_opts = upload_object.options
details = {'action': 'upload', 'container': container}
if o_opts is not None:
object_options = deepcopy(options)
object_options.update(o_opts)
else:
object_options = options
if hasattr(s, 'read'):
# We've got a file like object to upload to o
file_future = self.thread_manager.object_uu_pool.submit(
self._upload_object_job, container, s, o, object_options
)
details['file'] = s
details['object'] = o
file_jobs[file_future] = details
elif s is not None:
# We've got a path to upload to o
details['path'] = s
details['object'] = o
if isdir(s):
dir_future = self.thread_manager.object_uu_pool.submit(
self._create_dir_marker_job, container, o,
object_options, path=s
)
file_jobs[dir_future] = details
else:
try:
stat(s)
file_future = \
self.thread_manager.object_uu_pool.submit(
self._upload_object_job, container, s, o,
object_options, results_queue=rq
)
file_jobs[file_future] = details
except OSError as err:
# Avoid tying up threads with jobs that will fail
traceback, err_time = report_traceback()
logger.exception(err)
res = {
'action': 'upload_object',
'container': container,
'object': o,
'success': False,
'error': err,
'traceback': traceback,
'error_timestamp': err_time,
'path': s
}
rq.put(res)
else:
# Create an empty object (as a dir marker if is_dir)
details['file'] = None
details['object'] = o
if object_options['dir_marker']:
dir_future = self.thread_manager.object_uu_pool.submit(
self._create_dir_marker_job, container, o,
object_options
)
file_jobs[dir_future] = details
else:
file_future = self.thread_manager.object_uu_pool.submit(
self._upload_object_job, container, StringIO(),
o, object_options
)
file_jobs[file_future] = details
# Start a thread to watch for upload results
Thread(
target=self._watch_futures, args=(file_jobs, rq)
).start()
# yield results as they become available, including those from
# segment uploads.
res = get_from_queue(rq)
cancelled = False
while res is not None:
yield res
if not res['success']:
if not cancelled and options['fail_fast']:
cancelled = True
for f in file_jobs:
f.cancel()
res = get_from_queue(rq)
@staticmethod
def _make_upload_objects(objects):
upload_objects = []
for o in objects:
if isinstance(o, string_types):
obj = SwiftUploadObject(o)
upload_objects.append(obj)
elif isinstance(o, SwiftUploadObject):
upload_objects.append(o)
else:
raise SwiftError(
"The upload operation takes only strings or "
"SwiftUploadObjects as input",
obj=o)
return upload_objects
@staticmethod
def _create_container_job(
conn, container, headers=None, policy_source=None):
"""
Create a container using the given connection
:param conn: The swift connection used for requests.
:param container: The container name to create.
:param headers: An optional dict of headers for the
put_container request.
:param policy_source: An optional name of a container whose policy we
should duplicate.
:return: A dict containing the results of the operation.
"""
res = {
'action': 'create_container',
'container': container,
'headers': headers
}
create_response = {}
try:
if policy_source is not None:
_meta = conn.head_container(policy_source)
if 'x-storage-policy' in _meta:
policy_header = {
POLICY: _meta.get('x-storage-policy')
}
if headers is None:
headers = policy_header
else:
headers.update(policy_header)
conn.put_container(
container, headers, response_dict=create_response
)
res.update({
'success': True,
'response_dict': create_response
})
except Exception as err:
traceback, err_time = report_traceback()
logger.exception(err)
res.update({
'success': False,
'error': err,
'traceback': traceback,
'error_timestamp': err_time,
'response_dict': create_response
})
return res
@staticmethod
def _create_dir_marker_job(conn, container, obj, options, path=None):
res = {
'action': 'create_dir_marker',
'container': container,
'object': obj,
'path': path
}
results_dict = {}
if obj.startswith('./') or obj.startswith('.\\'):
obj = obj[2:]
if obj.startswith('/'):
obj = obj[1:]
if path is not None:
put_headers = {'x-object-meta-mtime': "%f" % getmtime(path)}
else:
put_headers = {'x-object-meta-mtime': "%f" % round(time())}
res['headers'] = put_headers
if options['changed']:
try:
headers = conn.head_object(container, obj)
ct = headers.get('content-type')
cl = int(headers.get('content-length'))
et = headers.get('etag')
mt = headers.get('x-object-meta-mtime')
if (ct.split(';', 1)[0] == 'text/directory' and
cl == 0 and
et == EMPTY_ETAG and
mt == put_headers['x-object-meta-mtime']):
res['success'] = True
return res
except ClientException as err:
if err.http_status != 404:
traceback, err_time = report_traceback()
logger.exception(err)
res.update({
'success': False,
'error': err,
'traceback': traceback,
'error_timestamp': err_time
})
return res
try:
conn.put_object(container, obj, '', content_length=0,
content_type='text/directory',
headers=put_headers,
response_dict=results_dict)
res.update({
'success': True,
'response_dict': results_dict})
return res
except Exception as err:
traceback, err_time = report_traceback()
logger.exception(err)
res.update({
'success': False,
'error': err,
'traceback': traceback,
'error_timestamp': err_time,
'response_dict': results_dict})
return res
@staticmethod
def _upload_segment_job(conn, path, container, segment_name, segment_start,
segment_size, segment_index, obj_name, options,
results_queue=None):
results_dict = {}
if options['segment_container']:
segment_container = options['segment_container']
else:
segment_container = container + '_segments'
res = {
'action': 'upload_segment',
'for_object': obj_name,
'segment_index': segment_index,
'segment_size': segment_size,
'segment_location': '/%s/%s' % (segment_container,
segment_name),
'log_line': '%s segment %s' % (obj_name, segment_index),
}
try:
fp = open(path, 'rb')
fp.seek(segment_start)
contents = LengthWrapper(fp, segment_size, md5=options['checksum'])
etag = conn.put_object(segment_container,
segment_name, contents,
content_length=segment_size,
response_dict=results_dict)
if options['checksum'] and etag and etag != contents.get_md5sum():
raise SwiftError('Segment {0}: upload verification failed: '
'md5 mismatch, local {1} != remote {2} '
'(remote segment has not been removed)'
.format(segment_index,
contents.get_md5sum(),
etag))
res.update({
'success': True,
'response_dict': results_dict,
'segment_etag': etag,
'attempts': conn.attempts
})
if results_queue is not None:
results_queue.put(res)
return res
except Exception as err:
traceback, err_time = report_traceback()
logger.exception(err)
res.update({
'success': False,
'error': err,
'traceback': traceback,
'error_timestamp': err_time,
'response_dict': results_dict,
'attempts': conn.attempts
})
if results_queue is not None:
results_queue.put(res)
return res
def _get_chunk_data(self, conn, container, obj, headers, manifest=None):
chunks = []
if 'x-object-manifest' in headers:
scontainer, sprefix = headers['x-object-manifest'].split('/', 1)
for part in self.list(scontainer, {'prefix': sprefix}):
if part["success"]:
chunks.extend(part["listing"])
else:
raise part["error"]
elif config_true_value(headers.get('x-static-large-object')):
if manifest is None:
headers, manifest = conn.get_object(
container, obj, query_string='multipart-manifest=get')
manifest = parse_api_response(headers, manifest)
for chunk in manifest:
if chunk.get('sub_slo'):
scont, sobj = chunk['name'].lstrip('/').split('/', 1)
chunks.extend(self._get_chunk_data(
conn, scont, sobj, {'x-static-large-object': True}))
else:
chunks.append(chunk)
else:
chunks.append({'hash': headers.get('etag').strip('"'),
'bytes': int(headers.get('content-length'))})
return chunks
def _is_identical(self, chunk_data, path):
try:
fp = open(path, 'rb')
except IOError:
return False
with fp:
for chunk in chunk_data:
to_read = chunk['bytes']
md5sum = md5()
while to_read:
data = fp.read(min(65536, to_read))
if not data:
return False
md5sum.update(data)
to_read -= len(data)
if md5sum.hexdigest() != chunk['hash']:
return False
# Each chunk is verified; check that we're at the end of the file
return not fp.read(1)
def _upload_object_job(self, conn, container, source, obj, options,
results_queue=None):
if obj.startswith('./') or obj.startswith('.\\'):
obj = obj[2:]
if obj.startswith('/'):
obj = obj[1:]
res = {
'action': 'upload_object',
'container': container,
'object': obj
}
if hasattr(source, 'read'):
stream = source
path = None
else:
path = source
res['path'] = path
try:
if path is not None:
put_headers = {'x-object-meta-mtime': "%f" % getmtime(path)}
else:
put_headers = {'x-object-meta-mtime': "%f" % round(time())}
res['headers'] = put_headers
# We need to HEAD all objects now in case we're overwriting a
# manifest object and need to delete the old segments
# ourselves.
old_manifest = None
old_slo_manifest_paths = []
new_slo_manifest_paths = set()
segment_size = int(0 if options['segment_size'] is None
else options['segment_size'])
if (options['changed'] or options['skip_identical']
or not options['leave_segments']):
try:
headers = conn.head_object(container, obj)
is_slo = config_true_value(
headers.get('x-static-large-object'))
if options['skip_identical'] or (
is_slo and not options['leave_segments']):
chunk_data = self._get_chunk_data(
conn, container, obj, headers)
if options['skip_identical'] and self._is_identical(
chunk_data, path):
res.update({
'success': True,
'status': 'skipped-identical'
})
return res
cl = int(headers.get('content-length'))
mt = headers.get('x-object-meta-mtime')
if (path is not None and options['changed']
and cl == getsize(path)
and mt == put_headers['x-object-meta-mtime']):
res.update({
'success': True,
'status': 'skipped-changed'
})
return res
if not options['leave_segments']:
old_manifest = headers.get('x-object-manifest')
if is_slo:
for old_seg in chunk_data:
seg_path = old_seg['name'].lstrip('/')
if isinstance(seg_path, text_type):
seg_path = seg_path.encode('utf-8')
old_slo_manifest_paths.append(seg_path)
except ClientException as err:
if err.http_status != 404:
traceback, err_time = report_traceback()
logger.exception(err)
res.update({
'success': False,
'error': err,
'traceback': traceback,
'error_timestamp': err_time
})
return res
# Merge the command line header options to the put_headers
put_headers.update(split_headers(options['header'], ''))
# Don't do segment job if object is not big enough, and never do
# a segment job if we're reading from a stream - we may fail if we
# go over the single object limit, but this gives us a nice way
# to create objects from memory
if (path is not None and segment_size
and (getsize(path) > segment_size)):
res['large_object'] = True
seg_container = container + '_segments'
if options['segment_container']:
seg_container = options['segment_container']
full_size = getsize(path)
segment_futures = []
segment_pool = self.thread_manager.segment_pool
segment = 0
segment_start = 0
while segment_start < full_size:
if segment_start + segment_size > full_size:
segment_size = full_size - segment_start
if options['use_slo']:
segment_name = '%s/slo/%s/%s/%s/%08d' % (
obj, put_headers['x-object-meta-mtime'],
full_size, options['segment_size'], segment
)
else:
segment_name = '%s/%s/%s/%s/%08d' % (
obj, put_headers['x-object-meta-mtime'],
full_size, options['segment_size'], segment
)
seg = segment_pool.submit(
self._upload_segment_job, path, container,
segment_name, segment_start, segment_size, segment,
obj, options, results_queue=results_queue
)
segment_futures.append(seg)
segment += 1
segment_start += segment_size
segment_results = []
errors = False
exceptions = []
for f in interruptable_as_completed(segment_futures):
try:
r = f.result()
if not r['success']:
errors = True
segment_results.append(r)
except Exception as err:
traceback, err_time = report_traceback()
logger.exception(err)
errors = True
exceptions.append((err, traceback, err_time))
if errors:
err = ClientException(
'Aborting manifest creation '
'because not all segments could be uploaded. %s/%s'
% (container, obj))
res.update({
'success': False,
'error': err,
'exceptions': exceptions,
'segment_results': segment_results
})
return res
res['segment_results'] = segment_results
if options['use_slo']:
segment_results.sort(key=lambda di: di['segment_index'])
for seg in segment_results:
seg_loc = seg['segment_location'].lstrip('/')
if isinstance(seg_loc, text_type):
seg_loc = seg_loc.encode('utf-8')
new_slo_manifest_paths.add(seg_loc)
manifest_data = json.dumps([
{
'path': d['segment_location'],
'etag': d['segment_etag'],
'size_bytes': d['segment_size']
} for d in segment_results
])
put_headers['x-static-large-object'] = 'true'
mr = {}
conn.put_object(
container, obj, manifest_data,
headers=put_headers,
query_string='multipart-manifest=put',
response_dict=mr
)
res['manifest_response_dict'] = mr
else:
new_object_manifest = '%s/%s/%s/%s/%s/' % (
quote(seg_container), quote(obj),
put_headers['x-object-meta-mtime'], full_size,
options['segment_size'])
if old_manifest and old_manifest.rstrip('/') == \
new_object_manifest.rstrip('/'):
old_manifest = None
put_headers['x-object-manifest'] = new_object_manifest
mr = {}
conn.put_object(
container, obj, '', content_length=0,
headers=put_headers,
response_dict=mr
)
res['manifest_response_dict'] = mr
else:
res['large_object'] = False
obr = {}
if path is not None:
content_length = getsize(path)
contents = LengthWrapper(open(path, 'rb'),
content_length,
md5=options['checksum'])
else:
content_length = None
contents = ReadableToIterable(stream,
md5=options['checksum'])
etag = conn.put_object(
container, obj, contents,
content_length=content_length, headers=put_headers,
response_dict=obr
)
res['response_dict'] = obr
if (options['checksum'] and
etag and etag != contents.get_md5sum()):
raise SwiftError('Object upload verification failed: '
'md5 mismatch, local {0} != remote {1} '
'(remote object has not been removed)'
.format(contents.get_md5sum(), etag))
if old_manifest or old_slo_manifest_paths:
drs = []
delobjsmap = {}
if old_manifest:
scontainer, sprefix = old_manifest.split('/', 1)
scontainer = unquote(scontainer)
sprefix = unquote(sprefix).rstrip('/') + '/'
delobjsmap[scontainer] = []
for part in self.list(scontainer, {'prefix': sprefix}):
if not part["success"]:
raise part["error"]
delobjsmap[scontainer].extend(
seg['name'] for seg in part['listing'])
if old_slo_manifest_paths:
for seg_to_delete in old_slo_manifest_paths:
if seg_to_delete in new_slo_manifest_paths:
continue
scont, sobj = \
seg_to_delete.split(b'/', 1)
delobjs_cont = delobjsmap.get(scont, [])
delobjs_cont.append(sobj)
delobjsmap[scont] = delobjs_cont
del_segs = []
for dscont, dsobjs in delobjsmap.items():
for dsobj in dsobjs:
del_seg = self.thread_manager.segment_pool.submit(
self._delete_segment, dscont, dsobj,
results_queue=results_queue
)
del_segs.append(del_seg)
for del_seg in interruptable_as_completed(del_segs):
drs.append(del_seg.result())
res['segment_delete_results'] = drs
# return dict for printing
res.update({
'success': True,
'status': 'uploaded',
'attempts': conn.attempts})
return res
except OSError as err:
traceback, err_time = report_traceback()
logger.exception(err)
if err.errno == ENOENT:
error = SwiftError('Local file %r not found' % path, exc=err)
else:
error = err
res.update({
'success': False,
'error': error,
'traceback': traceback,
'error_timestamp': err_time
})
except Exception as err:
traceback, err_time = report_traceback()
logger.exception(err)
res.update({
'success': False,
'error': err,
'traceback': traceback,
'error_timestamp': err_time
})
return res
# Delete related methods
#
def delete(self, container=None, objects=None, options=None):
"""
Delete operations on an account, optional container and optional list
of objects.
:param container: The container to delete or delete from.
:param objects: The list of objects to delete.
:param options: A dictionary containing options to override the global
options specified during the service object creation::
{
'yes_all': False,
'leave_segments': False,
}
:returns: A generator for returning the results of the delete
operations. Each result yielded from the generator is either
a 'delete_container', 'delete_object' or 'delete_segment'
dictionary containing the results of an individual delete
operation.
:raises: ClientException
:raises: SwiftError
"""
if options is not None:
options = dict(self._options, **options)
else:
options = self._options
rq = Queue()
if container is not None:
if objects is not None:
obj_dels = {}
for obj in objects:
obj_del = self.thread_manager.object_dd_pool.submit(
self._delete_object, container, obj, options,
results_queue=rq
)
obj_details = {'container': container, 'object': obj}
obj_dels[obj_del] = obj_details
# Start a thread to watch for upload results
Thread(
target=self._watch_futures, args=(obj_dels, rq)
).start()
# yield results as they become available, raising the first
# encountered exception
res = get_from_queue(rq)
while res is not None:
yield res
# Cancel the remaining jobs if necessary
if options['fail_fast'] and not res['success']:
for d in obj_dels.keys():
d.cancel()
res = get_from_queue(rq)
else:
for res in self._delete_container(container, options):
yield res
else:
if objects:
raise SwiftError('Objects specified without container')
if options['yes_all']:
cancelled = False
containers = []
for part in self.list():
if part["success"]:
containers.extend(c['name'] for c in part['listing'])
else:
raise part["error"]
for con in containers:
if cancelled:
break
else:
for res in self._delete_container(
con, options=options):
yield res
# Cancel the remaining container deletes, but yield
# any pending results
if (not cancelled and options['fail_fast']
and not res['success']):
cancelled = True
@staticmethod
def _delete_segment(conn, container, obj, results_queue=None):
results_dict = {}
try:
conn.delete_object(container, obj, response_dict=results_dict)
res = {'success': True}
except Exception as err:
traceback, err_time = report_traceback()
logger.exception(err)
res = {
'success': False,
'error': err,
'traceback': traceback,
'error_timestamp': err_time
}
res.update({
'action': 'delete_segment',
'container': container,
'object': obj,
'attempts': conn.attempts,
'response_dict': results_dict
})
if results_queue is not None:
results_queue.put(res)
return res
def _delete_object(self, conn, container, obj, options,
results_queue=None):
res = {
'action': 'delete_object',
'container': container,
'object': obj
}
try:
old_manifest = None
query_string = None
if not options['leave_segments']:
try:
headers = conn.head_object(container, obj)
old_manifest = headers.get('x-object-manifest')
if config_true_value(headers.get('x-static-large-object')):
query_string = 'multipart-manifest=delete'
except ClientException as err:
if err.http_status != 404:
raise
results_dict = {}
conn.delete_object(container, obj, query_string=query_string,
response_dict=results_dict)
if old_manifest:
dlo_segments_deleted = True
segment_pool = self.thread_manager.segment_pool
s_container, s_prefix = old_manifest.split('/', 1)
s_container = unquote(s_container)
s_prefix = unquote(s_prefix).rstrip('/') + '/'
del_segs = []
for part in self.list(
container=s_container, options={'prefix': s_prefix}):
if part["success"]:
seg_list = [o["name"] for o in part["listing"]]
else:
raise part["error"]
for seg in seg_list:
del_seg = segment_pool.submit(
self._delete_segment, s_container,
seg, results_queue=results_queue
)
del_segs.append(del_seg)
for del_seg in interruptable_as_completed(del_segs):
del_res = del_seg.result()
if not del_res["success"]:
dlo_segments_deleted = False
res['dlo_segments_deleted'] = dlo_segments_deleted
res.update({
'success': True,
'response_dict': results_dict,
'attempts': conn.attempts,
})
except Exception as err:
traceback, err_time = report_traceback()
logger.exception(err)
res.update({
'success': False,
'error': err,
'traceback': traceback,
'error_timestamp': err_time
})
return res
return res
@staticmethod
def _delete_empty_container(conn, container):
results_dict = {}
try:
conn.delete_container(container, response_dict=results_dict)
res = {'success': True}
except Exception as err:
traceback, err_time = report_traceback()
logger.exception(err)
res = {
'success': False,
'error': err,
'traceback': traceback,
'error_timestamp': err_time
}
res.update({
'action': 'delete_container',
'container': container,
'object': None,
'attempts': conn.attempts,
'response_dict': results_dict
})
return res
def _delete_container(self, container, options):
try:
for part in self.list(container=container):
if part["success"]:
objs = [o['name'] for o in part['listing']]
o_dels = self.delete(
container=container, objects=objs, options=options
)
for res in o_dels:
yield res
else:
raise part["error"]
con_del = self.thread_manager.container_pool.submit(
self._delete_empty_container, container
)
con_del_res = get_future_result(con_del)
except Exception as err:
traceback, err_time = report_traceback()
logger.exception(err)
con_del_res = {
'action': 'delete_container',
'container': container,
'object': None,
'success': False,
'error': err,
'traceback': traceback,
'error_timestamp': err_time
}
yield con_del_res
# Capabilities related methods
#
def capabilities(self, url=None):
"""
List the cluster capabilities.
:param url: Proxy URL of the cluster to retrieve capabilities.
:returns: A dictionary containing the capabilities of the cluster.
:raises: ClientException
:raises: SwiftError
"""
res = {
'action': 'capabilities'
}
try:
cap = self.thread_manager.container_pool.submit(
self._get_capabilities, url
)
capabilities = get_future_result(cap)
res.update({
'success': True,
'capabilities': capabilities
})
if url is not None:
res.update({
'url': url
})
except ClientException as err:
if err.http_status != 404:
raise err
raise SwiftError('Account not found', exc=err)
return res
@staticmethod
def _get_capabilities(conn, url):
return conn.get_capabilities(url)
# Helper methods
#
@staticmethod
def _watch_futures(futures, result_queue):
"""
Watches a dict of futures and pushes their results onto the given
queue. We use this to wait for a set of futures which may create
futures of their own to wait for, whilst also allowing us to
immediately return the results of those sub-jobs.
When all futures have completed, None is pushed to the queue
If the future is cancelled, we use the dict to return details about
the cancellation.
"""
futures_only = list(futures.keys())
for f in interruptable_as_completed(futures_only):
try:
r = f.result()
if r is not None:
result_queue.put(r)
except CancelledError:
details = futures[f]
res = details
res['status'] = 'cancelled'
result_queue.put(res)
except Exception as err:
traceback, err_time = report_traceback()
logger.exception(err)
details = futures[f]
res = details
res.update({
'success': False,
'error': err,
'traceback': traceback,
'error_timestamp': err_time
})
result_queue.put(res)
result_queue.put(None)