# Copyright (c) 2010-2013 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import unicode_literals
import logging

import os

from concurrent.futures import as_completed, CancelledError, TimeoutError
from copy import deepcopy
from errno import EEXIST, ENOENT
from hashlib import md5
from os import environ, makedirs, stat, utime
from os.path import (
    basename, dirname, getmtime, getsize, isdir, join, sep as os_path_sep
)
from posixpath import join as urljoin
from random import shuffle
from time import time
from threading import Thread
from six import Iterator, StringIO, string_types, text_type
from six.moves.queue import Queue
from six.moves.queue import Empty as QueueEmpty
from six.moves.urllib.parse import quote

import json


from swiftclient import Connection
from swiftclient.command_helpers import (
    stat_account, stat_container, stat_object
)
from swiftclient.utils import (
    config_true_value, ReadableToIterable, LengthWrapper, EMPTY_ETAG,
    parse_api_response, report_traceback, n_groups, split_request_headers,
    n_at_a_time
)
from swiftclient.exceptions import ClientException
from swiftclient.multithreading import MultiThreadingManager


DISK_BUFFER = 2 ** 16
logger = logging.getLogger("swiftclient.service")


class ResultsIterator(Iterator):
    def __init__(self, futures):
        self.futures = interruptable_as_completed(futures)

    def __iter__(self):
        return self

    def __next__(self):
        next_completed_future = next(self.futures)
        return next_completed_future.result()


class SwiftError(Exception):
    def __init__(self, value, container=None, obj=None,
                 segment=None, exc=None):
        self.value = value
        self.container = container
        self.obj = obj
        self.segment = segment
        self.exception = exc

    def __str__(self):
        value = repr(self.value)
        if self.container is not None:
            value += " container:%s" % self.container
        if self.obj is not None:
            value += " object:%s" % self.obj
        if self.segment is not None:
            value += " segment:%s" % self.segment
        return value


def process_options(options):
    # tolerate sloppy auth_version
    if options.get('auth_version') == '3.0':
        options['auth_version'] = '3'
    elif options.get('auth_version') == '2':
        options['auth_version'] = '2.0'

    if options.get('auth_version') not in ('2.0', '3') and not all(
            options.get(key) for key in ('auth', 'user', 'key')):
        # Use keystone auth if any of the new-style args are present
        if any(options.get(k) for k in (
                'os_user_domain_id',
                'os_user_domain_name',
                'os_project_domain_id',
                'os_project_domain_name')):
            # Use v3 if there's any reference to domains
            options['auth_version'] = '3'
        else:
            options['auth_version'] = '2.0'

    # Use new-style args if old ones not present
    if not options['auth'] and options['os_auth_url']:
        options['auth'] = options['os_auth_url']
    if not options['user'] and options['os_username']:
        options['user'] = options['os_username']
    if not options['key'] and options['os_password']:
        options['key'] = options['os_password']

    # Specific OpenStack options
    options['os_options'] = {
        'user_id': options['os_user_id'],
        'user_domain_id': options['os_user_domain_id'],
        'user_domain_name': options['os_user_domain_name'],
        'tenant_id': options['os_tenant_id'],
        'tenant_name': options['os_tenant_name'],
        'project_id': options['os_project_id'],
        'project_name': options['os_project_name'],
        'project_domain_id': options['os_project_domain_id'],
        'project_domain_name': options['os_project_domain_name'],
        'service_type': options['os_service_type'],
        'endpoint_type': options['os_endpoint_type'],
        'auth_token': options['os_auth_token'],
        'object_storage_url': options['os_storage_url'],
        'region_name': options['os_region_name'],
    }


def _build_default_global_options():
    return {
        "snet": False,
        "verbose": 1,
        "debug": False,
        "info": False,
        "auth": environ.get('ST_AUTH'),
        "auth_version": environ.get('ST_AUTH_VERSION', '1.0'),
        "user": environ.get('ST_USER'),
        "key": environ.get('ST_KEY'),
        "retries": 5,
        "force_auth_retry": False,
        "os_username": environ.get('OS_USERNAME'),
        "os_user_id": environ.get('OS_USER_ID'),
        "os_user_domain_name": environ.get('OS_USER_DOMAIN_NAME'),
        "os_user_domain_id": environ.get('OS_USER_DOMAIN_ID'),
        "os_password": environ.get('OS_PASSWORD'),
        "os_tenant_id": environ.get('OS_TENANT_ID'),
        "os_tenant_name": environ.get('OS_TENANT_NAME'),
        "os_project_name": environ.get('OS_PROJECT_NAME'),
        "os_project_id": environ.get('OS_PROJECT_ID'),
        "os_project_domain_name": environ.get('OS_PROJECT_DOMAIN_NAME'),
        "os_project_domain_id": environ.get('OS_PROJECT_DOMAIN_ID'),
        "os_auth_url": environ.get('OS_AUTH_URL'),
        "os_auth_token": environ.get('OS_AUTH_TOKEN'),
        "os_storage_url": environ.get('OS_STORAGE_URL'),
        "os_region_name": environ.get('OS_REGION_NAME'),
        "os_service_type": environ.get('OS_SERVICE_TYPE'),
        "os_endpoint_type": environ.get('OS_ENDPOINT_TYPE'),
        "os_cacert": environ.get('OS_CACERT'),
        "os_cert": environ.get('OS_CERT'),
        "os_key": environ.get('OS_KEY'),
        "insecure": config_true_value(environ.get('SWIFTCLIENT_INSECURE')),
        "ssl_compression": False,
        'segment_threads': 10,
        'object_dd_threads': 10,
        'object_uu_threads': 10,
        'container_threads': 10
    }

_default_global_options = _build_default_global_options()

_default_local_options = {
    'sync_to': None,
    'sync_key': None,
    'use_slo': False,
    'segment_size': None,
    'segment_container': None,
    'leave_segments': False,
    'changed': None,
    'skip_identical': False,
    'yes_all': False,
    'read_acl': None,
    'write_acl': None,
    'out_file': None,
    'out_directory': None,
    'remove_prefix': False,
    'no_download': False,
    'long': False,
    'totals': False,
    'marker': '',
    'header': [],
    'meta': [],
    'prefix': None,
    'delimiter': None,
    'fail_fast': False,
    'human': False,
    'dir_marker': False,
    'checksum': True,
    'shuffle': False,
    'destination': None,
    'fresh_metadata': False,
    'ignore_mtime': False,
}

POLICY = 'X-Storage-Policy'
KNOWN_DIR_MARKERS = (
    'application/directory',  # Preferred
    'text/directory',  # Historically relevant
)


def get_from_queue(q, timeout=864000):
    while True:
        try:
            item = q.get(timeout=timeout)
            return item
        except QueueEmpty:
            # Do nothing here, we only have a timeout to allow interruption
            pass


def get_future_result(f, timeout=86400):
    while True:
        try:
            res = f.result(timeout=timeout)
            return res
        except TimeoutError:
            # Do nothing here, we only have a timeout to allow interruption
            pass


def interruptable_as_completed(fs, timeout=86400):
    while True:
        try:
            for f in as_completed(fs, timeout=timeout):
                fs.remove(f)
                yield f
            return
        except TimeoutError:
            # Do nothing here, we only have a timeout to allow interruption
            pass


def get_conn(options):
    """
    Return a connection building it from the options.
    """
    return Connection(options['auth'],
                      options['user'],
                      options['key'],
                      options['retries'],
                      auth_version=options['auth_version'],
                      os_options=options['os_options'],
                      snet=options['snet'],
                      cacert=options['os_cacert'],
                      insecure=options['insecure'],
                      cert=options['os_cert'],
                      cert_key=options['os_key'],
                      ssl_compression=options['ssl_compression'],
                      force_auth_retry=options['force_auth_retry'])


def mkdirs(path):
    try:
        makedirs(path)
    except OSError as err:
        if err.errno != EEXIST:
            raise


def split_headers(options, prefix=''):
    """
    Splits 'Key: Value' strings and returns them as a dictionary.

    :param options: Must be one of:
        * an iterable of 'Key: Value' strings
        * an iterable of ('Key', 'Value') pairs
        * a dict of {'Key': 'Value'} pairs
    :param prefix: String to prepend to all of the keys in the dictionary.
        reporting.
    """
    headers = {}
    try:
        headers = split_request_headers(options, prefix)
    except ValueError as e:
        raise SwiftError(e)

    return headers


class SwiftUploadObject(object):
    """
    Class for specifying an object upload, allowing the object source, name and
    options to be specified separately for each individual object.
    """
    def __init__(self, source, object_name=None, options=None):
        if isinstance(source, string_types):
            self.object_name = object_name or source
        elif source is None or hasattr(source, 'read'):
            if not object_name or not isinstance(object_name, string_types):
                raise SwiftError('Object names must be specified as '
                                 'strings for uploads from None or file '
                                 'like objects.')
            self.object_name = object_name
        else:
            raise SwiftError('Unexpected source type for '
                             'SwiftUploadObject: {0}'.format(type(source)))

        if not self.object_name:
            raise SwiftError('Object names must not be empty strings')

        self.object_name = self.object_name.lstrip('/')
        self.options = options
        self.source = source


class SwiftPostObject(object):
    """
    Class for specifying an object post, allowing the headers/metadata to be
    specified separately for each individual object.
    """
    def __init__(self, object_name, options=None):
        if not (isinstance(object_name, string_types) and object_name):
            raise SwiftError(
                "Object names must be specified as non-empty strings"
            )
        self.object_name = object_name
        self.options = options


class SwiftCopyObject(object):
    """
    Class for specifying an object copy,
    allowing the destination/headers/metadata/fresh_metadata to be specified
    separately for each individual object.
    destination and fresh_metadata should be set in options
    """
    def __init__(self, object_name, options=None):
        if not (isinstance(object_name, string_types) and object_name):
            raise SwiftError(
                "Object names must be specified as non-empty strings"
            )

        self.object_name = object_name
        self.options = options

        if self.options is None:
            self.destination = None
            self.fresh_metadata = False
        else:
            self.destination = self.options.get('destination')
            self.fresh_metadata = self.options.get('fresh_metadata', False)

        if self.destination is not None:
            destination_components = self.destination.split('/')
            if destination_components[0] or len(destination_components) < 2:
                raise SwiftError("destination must be in format /cont[/obj]")
            if not destination_components[-1]:
                raise SwiftError("destination must not end in a slash")
            if len(destination_components) == 2:
                # only container set in destination
                self.destination = "{0}/{1}".format(
                    self.destination, object_name
                )


class _SwiftReader(object):
    """
    Class for downloading objects from swift and raising appropriate
    errors on failures caused by either invalid md5sum or size of the
    data read.
    """
    def __init__(self, path, body, headers, checksum=True):
        self._path = path
        self._body = body
        self._actual_read = 0
        self._content_length = None
        self._actual_md5 = None
        self._expected_md5 = headers.get('etag', '')

        if len(self._expected_md5) > 1 and self._expected_md5[0] == '"' \
                and self._expected_md5[-1] == '"':
            self._expected_md5 = self._expected_md5[1:-1]

        # Some headers indicate the MD5 of the response
        # definitely *won't* match the ETag
        bad_md5_headers = set([
            'content-range',
            'x-object-manifest',
            'x-static-large-object',
        ])
        if bad_md5_headers.intersection(headers):
            # This isn't a useful checksum
            self._expected_md5 = ''

        if self._expected_md5 and checksum:
            self._actual_md5 = md5()

        if 'content-length' in headers:
            try:
                self._content_length = int(headers.get('content-length'))
            except ValueError:
                raise SwiftError('content-length header must be an integer')

    def __iter__(self):
        for chunk in self._body:
            if self._actual_md5:
                self._actual_md5.update(chunk)
            self._actual_read += len(chunk)
            yield chunk
        self._check_contents()

    def _check_contents(self):
        if self._actual_md5 and self._expected_md5:
            etag = self._actual_md5.hexdigest()
            if etag != self._expected_md5:
                raise SwiftError('Error downloading {0}: md5sum != etag, '
                                 '{1} != {2}'.format(
                                     self._path, etag, self._expected_md5))

        if (self._content_length is not None and
                self._actual_read != self._content_length):
            raise SwiftError('Error downloading {0}: read_length != '
                             'content_length, {1:d} != {2:d}'.format(
                                 self._path, self._actual_read,
                                 self._content_length))

    def bytes_read(self):
        return self._actual_read


class SwiftService(object):
    """
    Service for performing swift operations
    """
    def __init__(self, options=None):
        if options is not None:
            self._options = dict(
                _default_global_options,
                **dict(_default_local_options, **options)
            )
        else:
            self._options = dict(
                _default_global_options,
                **_default_local_options
            )
        process_options(self._options)
        create_connection = lambda: get_conn(self._options)
        self.thread_manager = MultiThreadingManager(
            create_connection,
            segment_threads=self._options['segment_threads'],
            object_dd_threads=self._options['object_dd_threads'],
            object_uu_threads=self._options['object_uu_threads'],
            container_threads=self._options['container_threads']
        )
        self.capabilities_cache = {}  # Each instance should have its own cache

    def __enter__(self):
        self.thread_manager.__enter__()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.thread_manager.__exit__(exc_type, exc_val, exc_tb)

    # Stat related methods
    #
    def stat(self, container=None, objects=None, options=None):
        """
        Get account stats, container stats or information about a list of
        objects in a container.

        :param container: The container to query.
        :param objects: A list of object paths about which to return
                        information (a list of strings).
        :param options: A dictionary containing options to override the global
                        options specified during the service object creation.
                        These options are applied to all stat operations
                        performed by this call::

                            {
                                'human': False,
                                'header': []
                            }

        :returns: Either a single dictionary containing stats about an account
                  or container, or an iterator for returning the results of the
                  stat operations on a list of objects.

        :raises SwiftError:
        """
        if options is not None:
            options = dict(self._options, **options)
        else:
            options = self._options

        if not container:
            if objects:
                raise SwiftError('Objects specified without container')
            else:
                res = {
                    'action': 'stat_account',
                    'success': True,
                    'container': container,
                    'object': None,
                }
                try:
                    stats_future = self.thread_manager.container_pool.submit(
                        stat_account, options
                    )
                    items, headers = get_future_result(stats_future)
                    res.update({
                        'items': items,
                        'headers': headers
                    })
                    return res
                except ClientException as err:
                    if err.http_status != 404:
                        traceback, err_time = report_traceback()
                        logger.exception(err)
                        res.update({
                            'success': False,
                            'error': err,
                            'traceback': traceback,
                            'error_timestamp': err_time
                        })
                        return res
                    raise SwiftError('Account not found', exc=err)
                except Exception as err:
                    traceback, err_time = report_traceback()
                    logger.exception(err)
                    res.update({
                        'success': False,
                        'error': err,
                        'traceback': traceback,
                        'error_timestamp': err_time
                    })
                    return res
        else:
            if not objects:
                res = {
                    'action': 'stat_container',
                    'container': container,
                    'object': None,
                    'success': True,
                }
                try:
                    stats_future = self.thread_manager.container_pool.submit(
                        stat_container, options, container
                    )
                    items, headers = get_future_result(stats_future)
                    res.update({
                        'items': items,
                        'headers': headers
                    })
                    return res
                except ClientException as err:
                    if err.http_status != 404:
                        traceback, err_time = report_traceback()
                        logger.exception(err)
                        res.update({
                            'success': False,
                            'error': err,
                            'traceback': traceback,
                            'error_timestamp': err_time
                        })
                        return res
                    raise SwiftError('Container %r not found' % container,
                                     container=container, exc=err)
                except Exception as err:
                    traceback, err_time = report_traceback()
                    logger.exception(err)
                    res.update({
                        'success': False,
                        'error': err,
                        'traceback': traceback,
                        'error_timestamp': err_time
                    })
                    return res
            else:
                stat_futures = []
                for stat_o in objects:
                    stat_future = self.thread_manager.object_dd_pool.submit(
                        self._stat_object, container, stat_o, options
                    )
                    stat_futures.append(stat_future)

                return ResultsIterator(stat_futures)

    @staticmethod
    def _stat_object(conn, container, obj, options):
        res = {
            'action': 'stat_object',
            'object': obj,
            'container': container,
            'success': True,
        }
        try:
            items, headers = stat_object(conn, options, container, obj)
            res.update({
                'items': items,
                'headers': headers
            })
            return res
        except Exception as err:
            traceback, err_time = report_traceback()
            logger.exception(err)
            res.update({
                'success': False,
                'error': err,
                'traceback': traceback,
                'error_timestamp': err_time
            })
            return res

    # Post related methods
    #
    def post(self, container=None, objects=None, options=None):
        """
        Post operations on an account, container or list of objects

        :param container: The container to make the post operation against.
        :param objects: A list of object names (strings) or SwiftPostObject
                        instances containing an object name, and an
                        options dict (can be None) to override the options for
                        that individual post operation::

                            [
                                'object_name',
                                SwiftPostObject('object_name', options={...}),
                                ...
                            ]

                        The options dict is described below.
        :param options: A dictionary containing options to override the global
                        options specified during the service object creation.
                        These options are applied to all post operations
                        performed by this call, unless overridden on a per
                        object basis. Possible options are given below::

                            {
                                'meta': [],
                                'header': [],
                                'read_acl': None,   # For containers only
                                'write_acl': None,  # For containers only
                                'sync_to': None,    # For containers only
                                'sync_key': None    # For containers only
                            }

        :returns: Either a single result dictionary in the case of a post to a
                  container/account, or an iterator for returning the results
                  of posts to a list of objects.

        :raises SwiftError:
        """
        if options is not None:
            options = dict(self._options, **options)
        else:
            options = self._options

        res = {
            'success': True,
            'container': container,
            'object': None,
            'headers': {},
        }
        if not container:
            res["action"] = "post_account"
            if objects:
                raise SwiftError('Objects specified without container')
            else:
                response_dict = {}
                headers = split_headers(
                    options['meta'], 'X-Account-Meta-')
                headers.update(
                    split_headers(options['header'], ''))
                res['headers'] = headers
                try:
                    post = self.thread_manager.container_pool.submit(
                        self._post_account_job, headers, response_dict
                    )
                    get_future_result(post)
                except ClientException as err:
                    if err.http_status != 404:
                        traceback, err_time = report_traceback()
                        logger.exception(err)
                        res.update({
                            'success': False,
                            'error': err,
                            'traceback': traceback,
                            'error_timestamp': err_time,
                            'response_dict': response_dict
                        })
                        return res
                    raise SwiftError('Account not found', exc=err)
                except Exception as err:
                    traceback, err_time = report_traceback()
                    logger.exception(err)
                    res.update({
                        'success': False,
                        'error': err,
                        'response_dict': response_dict,
                        'traceback': traceback,
                        'error_timestamp': err_time
                    })
            return res
        if not objects:
            res["action"] = "post_container"
            response_dict = {}
            headers = split_headers(
                options['meta'], 'X-Container-Meta-')
            headers.update(
                split_headers(options['header'], ''))
            if options['read_acl'] is not None:
                headers['X-Container-Read'] = options['read_acl']
            if options['write_acl'] is not None:
                headers['X-Container-Write'] = options['write_acl']
            if options['sync_to'] is not None:
                headers['X-Container-Sync-To'] = options['sync_to']
            if options['sync_key'] is not None:
                headers['X-Container-Sync-Key'] = options['sync_key']
            res['headers'] = headers
            try:
                post = self.thread_manager.container_pool.submit(
                    self._post_container_job, container,
                    headers, response_dict
                )
                get_future_result(post)
            except ClientException as err:
                if err.http_status != 404:
                    traceback, err_time = report_traceback()
                    logger.exception(err)
                    res.update({
                        'action': 'post_container',
                        'success': False,
                        'error': err,
                        'traceback': traceback,
                        'error_timestamp': err_time,
                        'response_dict': response_dict
                    })
                    return res
                raise SwiftError(
                    "Container '%s' not found" % container,
                    container=container, exc=err
                )
            except Exception as err:
                traceback, err_time = report_traceback()
                logger.exception(err)
                res.update({
                    'action': 'post_container',
                    'success': False,
                    'error': err,
                    'response_dict': response_dict,
                    'traceback': traceback,
                    'error_timestamp': err_time
                })
            return res
        else:
            post_futures = []
            post_objects = self._make_post_objects(objects)
            for post_object in post_objects:
                obj = post_object.object_name
                obj_options = post_object.options
                response_dict = {}
                headers = split_headers(
                    options['meta'], 'X-Object-Meta-')
                # add header options to the headers object for the request.
                headers.update(
                    split_headers(options['header'], ''))
                if obj_options is not None:
                    if 'meta' in obj_options:
                        headers.update(
                            split_headers(
                                obj_options['meta'], 'X-Object-Meta-'
                            )
                        )
                    if 'header' in obj_options:
                        headers.update(
                            split_headers(obj_options['header'], '')
                        )

                post = self.thread_manager.object_uu_pool.submit(
                    self._post_object_job, container, obj,
                    headers, response_dict
                )
                post_futures.append(post)

            return ResultsIterator(post_futures)

    @staticmethod
    def _make_post_objects(objects):
        post_objects = []

        for o in objects:
            if isinstance(o, string_types):
                obj = SwiftPostObject(o)
                post_objects.append(obj)
            elif isinstance(o, SwiftPostObject):
                post_objects.append(o)
            else:
                raise SwiftError(
                    "The post operation takes only strings or "
                    "SwiftPostObjects as input",
                    obj=o)

        return post_objects

    @staticmethod
    def _post_account_job(conn, headers, result):
        return conn.post_account(headers=headers, response_dict=result)

    @staticmethod
    def _post_container_job(conn, container, headers, result):
        try:
            res = conn.post_container(
                container, headers=headers, response_dict=result)
        except ClientException as err:
            if err.http_status != 404:
                raise
            _response_dict = {}
            res = conn.put_container(
                container, headers=headers, response_dict=_response_dict)
            result['post_put'] = _response_dict
        return res

    @staticmethod
    def _post_object_job(conn, container, obj, headers, result):
        res = {
            'success': True,
            'action': 'post_object',
            'container': container,
            'object': obj,
            'headers': headers,
            'response_dict': result
        }
        try:
            conn.post_object(
                container, obj, headers=headers, response_dict=result)
        except Exception as err:
            traceback, err_time = report_traceback()
            logger.exception(err)
            res.update({
                'success': False,
                'error': err,
                'traceback': traceback,
                'error_timestamp': err_time
            })

        return res

    # List related methods
    #
    def list(self, container=None, options=None):
        """
        List operations on an account, container.

        :param container: The container to make the list operation against.
        :param options: A dictionary containing options to override the global
                        options specified during the service object creation::

                            {
                                'long': False,
                                'prefix': None,
                                'delimiter': None,
                                'header': []
                            }

        :returns: A generator for returning the results of the list operation
                  on an account or container. Each result yielded from the
                  generator is either a 'list_account_part' or
                  'list_container_part', containing part of the listing.
        """
        if options is not None:
            options = dict(self._options, **options)
        else:
            options = self._options

        rq = Queue(maxsize=10)  # Just stop list running away consuming memory

        if container is None:
            listing_future = self.thread_manager.container_pool.submit(
                self._list_account_job, options, rq
            )
        else:
            listing_future = self.thread_manager.container_pool.submit(
                self._list_container_job, container, options, rq
            )

        res = get_from_queue(rq)
        while res is not None:
            yield res
            res = get_from_queue(rq)

        # Make sure the future has completed
        get_future_result(listing_future)

    @staticmethod
    def _list_account_job(conn, options, result_queue):
        marker = ''
        error = None
        req_headers = split_headers(options.get('header', []))
        try:
            while True:
                _, items = conn.get_account(
                    marker=marker, prefix=options['prefix'],
                    headers=req_headers
                )

                if not items:
                    result_queue.put(None)
                    return

                if options['long']:
                    for i in items:
                        name = i['name']
                        i['meta'] = conn.head_container(name)

                res = {
                    'action': 'list_account_part',
                    'container': None,
                    'prefix': options['prefix'],
                    'success': True,
                    'listing': items,
                    'marker': marker,
                }
                result_queue.put(res)

                marker = items[-1].get('name', items[-1].get('subdir'))
        except ClientException as err:
            traceback, err_time = report_traceback()
            logger.exception(err)
            if err.http_status != 404:
                error = (err, traceback, err_time)
            else:
                error = (
                    SwiftError('Account not found', exc=err),
                    traceback, err_time
                )

        except Exception as err:
            traceback, err_time = report_traceback()
            logger.exception(err)
            error = (err, traceback, err_time)

        res = {
            'action': 'list_account_part',
            'container': None,
            'prefix': options['prefix'],
            'success': False,
            'marker': marker,
            'error': error[0],
            'traceback': error[1],
            'error_timestamp': error[2]
        }
        result_queue.put(res)
        result_queue.put(None)

    @staticmethod
    def _list_container_job(conn, container, options, result_queue):
        marker = options.get('marker', '')
        error = None
        req_headers = split_headers(options.get('header', []))
        try:
            while True:
                _, items = conn.get_container(
                    container, marker=marker, prefix=options['prefix'],
                    delimiter=options['delimiter'], headers=req_headers
                )

                if not items:
                    result_queue.put(None)
                    return

                res = {
                    'action': 'list_container_part',
                    'container': container,
                    'prefix': options['prefix'],
                    'success': True,
                    'marker': marker,
                    'listing': items,
                }
                result_queue.put(res)

                marker = items[-1].get('name', items[-1].get('subdir'))
        except ClientException as err:
            traceback, err_time = report_traceback()
            logger.exception(err)
            if err.http_status != 404:
                error = (err, traceback, err_time)
            else:
                error = (
                    SwiftError(
                        'Container %r not found' % container,
                        container=container, exc=err
                    ),
                    traceback,
                    err_time
                )
        except Exception as err:
            traceback, err_time = report_traceback()
            logger.exception(err)
            error = (err, traceback, err_time)

        res = {
            'action': 'list_container_part',
            'container': container,
            'prefix': options['prefix'],
            'success': False,
            'marker': marker,
            'error': error[0],
            'traceback': error[1],
            'error_timestamp': error[2]
        }
        result_queue.put(res)
        result_queue.put(None)

    # Download related methods
    #
    def download(self, container=None, objects=None, options=None):
        """
        Download operations on an account, optional container and optional list
        of objects.

        :param container: The container to download from.
        :param objects: A list of object names to download (a list of strings).
        :param options: A dictionary containing options to override the global
                        options specified during the service object creation::

                            {
                                'yes_all': False,
                                'marker': '',
                                'prefix': None,
                                'no_download': False,
                                'header': [],
                                'skip_identical': False,
                                'out_directory': None,
                                'checksum': True,
                                'out_file': None,
                                'remove_prefix': False,
                                'shuffle' : False
                            }

        :returns: A generator for returning the results of the download
                  operations. Each result yielded from the generator is a
                  'download_object' dictionary containing the results of an
                  individual file download.

        :raises ClientException:
        :raises SwiftError:
        """
        if options is not None:
            options = dict(self._options, **options)
        else:
            options = self._options

        if not container:
            # Download everything if options['yes_all'] is set
            if options['yes_all']:
                try:
                    options_copy = deepcopy(options)
                    options_copy["long"] = False

                    for part in self.list(options=options_copy):
                        if part["success"]:
                            containers = [i['name'] for i in part["listing"]]

                            if options['shuffle']:
                                shuffle(containers)

                            for con in containers:
                                for res in self._download_container(
                                        con, options_copy):
                                    yield res
                        else:
                            raise part["error"]

                # If we see a 404 here, the listing of the account failed
                except ClientException as err:
                    if err.http_status != 404:
                        raise
                    raise SwiftError('Account not found', exc=err)

        elif objects is None:
            if '/' in container:
                raise SwiftError('\'/\' in container name',
                                 container=container)
            for res in self._download_container(container, options):
                yield res

        else:
            if '/' in container:
                raise SwiftError('\'/\' in container name',
                                 container=container)
            if options['out_file'] and len(objects) > 1:
                options['out_file'] = None

            o_downs = [
                self.thread_manager.object_dd_pool.submit(
                    self._download_object_job, container, obj, options
                ) for obj in objects
            ]

            for o_down in interruptable_as_completed(o_downs):
                yield o_down.result()

    def _download_object_job(self, conn, container, obj, options):
        out_file = options['out_file']
        results_dict = {}

        req_headers = split_headers(options['header'], '')

        pseudodir = False
        path = join(container, obj) if options['yes_all'] else obj
        path = path.lstrip(os_path_sep)
        options['skip_identical'] = (options['skip_identical'] and
                                     out_file != '-')

        if options['prefix'] and options['remove_prefix']:
            path = path[len(options['prefix']):].lstrip('/')

        if options['out_directory']:
            path = os.path.join(options['out_directory'], path)

        if options['skip_identical']:
            filename = out_file if out_file else path
            try:
                fp = open(filename, 'rb', DISK_BUFFER)
            except IOError:
                pass
            else:
                with fp:
                    md5sum = md5()
                    while True:
                        data = fp.read(DISK_BUFFER)
                        if not data:
                            break
                        md5sum.update(data)
                    req_headers['If-None-Match'] = md5sum.hexdigest()

        try:
            start_time = time()
            get_args = {'resp_chunk_size': DISK_BUFFER,
                        'headers': req_headers,
                        'response_dict': results_dict}
            if options['skip_identical']:
                # Assume the file is a large object; if we're wrong, the query
                # string is ignored and the If-None-Match header will trigger
                # the behavior we want
                get_args['query_string'] = 'multipart-manifest=get'

            try:
                headers, body = conn.get_object(container, obj, **get_args)
            except ClientException as e:
                if not options['skip_identical']:
                    raise
                if e.http_status != 304:  # Only handling Not Modified
                    raise

                headers = results_dict['headers']
                if 'x-object-manifest' in headers:
                    # DLO: most likely it has more than one page worth of
                    #      segments and we have an empty file locally
                    body = []
                elif config_true_value(headers.get('x-static-large-object')):
                    # SLO: apparently we have a copy of the manifest locally?
                    #      provide no chunking data to force a fresh download
                    body = [b'[]']
                else:
                    # Normal object: let it bubble up
                    raise

            if options['skip_identical']:
                if config_true_value(headers.get('x-static-large-object')) or \
                        'x-object-manifest' in headers:
                    # The request was chunked, so stitch it back together
                    chunk_data = self._get_chunk_data(conn, container, obj,
                                                      headers, b''.join(body))
                else:
                    chunk_data = None

                if chunk_data is not None:
                    if self._is_identical(chunk_data, filename):
                        raise ClientException('Large object is identical',
                                              http_status=304)

                    # Large objects are different; start the real download
                    del get_args['query_string']
                    get_args['response_dict'].clear()
                    headers, body = conn.get_object(container, obj, **get_args)

            headers_receipt = time()

            obj_body = _SwiftReader(path, body, headers,
                                    options.get('checksum', True))

            no_file = options['no_download']
            if out_file == "-" and not no_file:
                res = {
                    'action': 'download_object',
                    'container': container,
                    'object': obj,
                    'path': path,
                    'pseudodir': pseudodir,
                    'contents': obj_body
                }
                return res

            fp = None
            try:
                content_type = headers.get('content-type', '').split(';', 1)[0]
                if content_type in KNOWN_DIR_MARKERS:
                    make_dir = not no_file and out_file != "-"
                    if make_dir and not isdir(path):
                        mkdirs(path)

                else:
                    make_dir = not (no_file or out_file)
                    if make_dir:
                        dirpath = dirname(path)
                        if dirpath and not isdir(dirpath):
                            mkdirs(dirpath)

                    if not no_file:
                        if out_file:
                            fp = open(out_file, 'wb', DISK_BUFFER)
                        else:
                            if basename(path):
                                fp = open(path, 'wb', DISK_BUFFER)
                            else:
                                pseudodir = True

                for chunk in obj_body:
                    if fp is not None:
                        fp.write(chunk)

                finish_time = time()

            finally:
                bytes_read = obj_body.bytes_read()
                if fp is not None:
                    fp.close()
                    if ('x-object-meta-mtime' in headers and not no_file and
                            not options['ignore_mtime']):
                        try:
                            mtime = float(headers['x-object-meta-mtime'])
                        except ValueError:
                            pass  # no real harm; couldn't trust it anyway
                        else:
                            if options['out_file']:
                                utime(options['out_file'], (mtime, mtime))
                            else:
                                utime(path, (mtime, mtime))

            res = {
                'action': 'download_object',
                'success': True,
                'container': container,
                'object': obj,
                'path': path,
                'pseudodir': pseudodir,
                'start_time': start_time,
                'finish_time': finish_time,
                'headers_receipt': headers_receipt,
                'auth_end_time': conn.auth_end_time,
                'read_length': bytes_read,
                'attempts': conn.attempts,
                'response_dict': results_dict
            }
            return res

        except Exception as err:
            traceback, err_time = report_traceback()
            logger.exception(err)
            res = {
                'action': 'download_object',
                'container': container,
                'object': obj,
                'success': False,
                'error': err,
                'traceback': traceback,
                'error_timestamp': err_time,
                'response_dict': results_dict,
                'path': path,
                'pseudodir': pseudodir,
                'attempts': conn.attempts
            }
            return res

    def _submit_page_downloads(self, container, page_generator, options):
        try:
            list_page = next(page_generator)
        except StopIteration:
            return None

        if list_page["success"]:
            objects = [o["name"] for o in list_page["listing"]]

            if options["shuffle"]:
                shuffle(objects)

            o_downs = [
                self.thread_manager.object_dd_pool.submit(
                    self._download_object_job, container, obj, options
                ) for obj in objects
            ]

            return o_downs
        else:
            raise list_page["error"]

    def _download_container(self, container, options):
        _page_generator = self.list(container=container, options=options)
        try:
            next_page_downs = self._submit_page_downloads(
                container, _page_generator, options
            )
        except ClientException as err:
            if err.http_status != 404:
                raise
            raise SwiftError(
                'Container %r not found' % container,
                container=container, exc=err
            )

        error = None
        while next_page_downs:
            page_downs = next_page_downs
            next_page_downs = None

            # Start downloading the next page of list results when
            # we have completed 80% of the previous page
            next_page_triggered = False
            next_page_trigger_point = 0.8 * len(page_downs)

            page_results_yielded = 0
            for o_down in interruptable_as_completed(page_downs):
                yield o_down.result()

                # Do we need to start the next set of downloads yet?
                if not next_page_triggered:
                    page_results_yielded += 1
                    if page_results_yielded >= next_page_trigger_point:
                        try:
                            next_page_downs = self._submit_page_downloads(
                                container, _page_generator, options
                            )
                        except ClientException as err:
                            # Allow the current page to finish downloading
                            logger.exception(err)
                            error = err
                        except Exception:
                            # Something unexpected went wrong - cancel
                            # remaining downloads
                            for _d in page_downs:
                                _d.cancel()
                            raise
                        finally:
                            # Stop counting and testing
                            next_page_triggered = True

        if error:
            raise error

    # Upload related methods
    #
    def upload(self, container, objects, options=None):
        """
        Upload a list of objects to a given container.

        :param container: The container (or pseudo-folder path) to put the
                          uploads into.
        :param objects: A list of file/directory names (strings) or
                        SwiftUploadObject instances containing a source for the
                        created object, an object name, and an options dict
                        (can be None) to override the options for that
                        individual upload operation::

                            [
                                '/path/to/file',
                                SwiftUploadObject('/path', object_name='obj1'),
                                ...
                            ]

                        The options dict is as described below.

                        The SwiftUploadObject source may be one of:

                            * A file-like object (with a read method)
                            * A string containing the path to a local
                              file or directory
                            * None, to indicate that we want an empty object

        :param options: A dictionary containing options to override the global
                        options specified during the service object creation.
                        These options are applied to all upload operations
                        performed by this call, unless overridden on a per
                        object basis. Possible options are given below::

                            {
                                'meta': [],
                                'header': [],
                                'segment_size': None,
                                'use_slo': False,
                                'segment_container': None,
                                'leave_segments': False,
                                'changed': None,
                                'skip_identical': False,
                                'fail_fast': False,
                                'dir_marker': False  # Only for None sources
                            }

        :returns: A generator for returning the results of the uploads.

        :raises SwiftError:
        :raises ClientException:
        """
        if options is not None:
            options = dict(self._options, **options)
        else:
            options = self._options

        try:
            segment_size = int(0 if options['segment_size'] is None else
                               options['segment_size'])
        except ValueError:
            raise SwiftError('Segment size should be an integer value')

        # Incase we have a psudeo-folder path for <container> arg, derive
        # the container name from the top path and prepend the rest to
        # the object name. (same as passing --object-name).
        container, _sep, pseudo_folder = container.partition('/')

        # Try to create the container, just in case it doesn't exist. If this
        # fails, it might just be because the user doesn't have container PUT
        # permissions, so we'll ignore any error. If there's really a problem,
        # it'll surface on the first object PUT.
        policy_header = {}
        _header = split_headers(options["header"])
        if POLICY in _header:
            policy_header[POLICY] = \
                _header[POLICY]
        create_containers = [
            self.thread_manager.container_pool.submit(
                self._create_container_job, container, headers=policy_header)
        ]

        # wait for first container job to complete before possibly attempting
        # segment container job because segment container job may attempt
        # to HEAD the first container
        for r in interruptable_as_completed(create_containers):
            res = r.result()
            yield res

        if segment_size:
            seg_container = container + '_segments'
            if options['segment_container']:
                seg_container = options['segment_container']
            if seg_container != container:
                if not policy_header:
                    # Since no storage policy was specified on the command
                    # line, rather than just letting swift pick the default
                    # storage policy, we'll try to create the segments
                    # container with the same policy as the upload container
                    create_containers = [
                        self.thread_manager.container_pool.submit(
                            self._create_container_job, seg_container,
                            policy_source=container
                        )
                    ]
                else:
                    create_containers = [
                        self.thread_manager.container_pool.submit(
                            self._create_container_job, seg_container,
                            headers=policy_header
                        )
                    ]

                for r in interruptable_as_completed(create_containers):
                    res = r.result()
                    yield res

        # We maintain a results queue here and a separate thread to monitor
        # the futures because we want to get results back from potential
        # segment uploads too
        rq = Queue()
        file_jobs = {}

        upload_objects = self._make_upload_objects(objects, pseudo_folder)
        for upload_object in upload_objects:
            s = upload_object.source
            o = upload_object.object_name
            o_opts = upload_object.options
            details = {'action': 'upload', 'container': container}
            if o_opts is not None:
                object_options = deepcopy(options)
                object_options.update(o_opts)
            else:
                object_options = options
            if hasattr(s, 'read'):
                # We've got a file like object to upload to o
                file_future = self.thread_manager.object_uu_pool.submit(
                    self._upload_object_job, container, s, o, object_options,
                    results_queue=rq
                )
                details['file'] = s
                details['object'] = o
                file_jobs[file_future] = details
            elif s is not None:
                # We've got a path to upload to o
                details['path'] = s
                details['object'] = o
                if isdir(s):
                    dir_future = self.thread_manager.object_uu_pool.submit(
                        self._create_dir_marker_job, container, o,
                        object_options, path=s
                    )
                    file_jobs[dir_future] = details
                else:
                    try:
                        stat(s)
                        file_future = \
                            self.thread_manager.object_uu_pool.submit(
                                self._upload_object_job, container, s, o,
                                object_options, results_queue=rq
                            )
                        file_jobs[file_future] = details
                    except OSError as err:
                        # Avoid tying up threads with jobs that will fail
                        traceback, err_time = report_traceback()
                        logger.exception(err)
                        res = {
                            'action': 'upload_object',
                            'container': container,
                            'object': o,
                            'success': False,
                            'error': err,
                            'traceback': traceback,
                            'error_timestamp': err_time,
                            'path': s
                        }
                        rq.put(res)
            else:
                # Create an empty object (as a dir marker if is_dir)
                details['file'] = None
                details['object'] = o
                if object_options['dir_marker']:
                    dir_future = self.thread_manager.object_uu_pool.submit(
                        self._create_dir_marker_job, container, o,
                        object_options
                    )
                    file_jobs[dir_future] = details
                else:
                    file_future = self.thread_manager.object_uu_pool.submit(
                        self._upload_object_job, container, StringIO(),
                        o, object_options
                    )
                    file_jobs[file_future] = details

        # Start a thread to watch for upload results
        Thread(
            target=self._watch_futures, args=(file_jobs, rq)
        ).start()

        # yield results as they become available, including those from
        # segment uploads.
        res = get_from_queue(rq)
        cancelled = False
        while res is not None:
            yield res

            if not res['success']:
                if not cancelled and options['fail_fast']:
                    cancelled = True
                    for f in file_jobs:
                        f.cancel()

            res = get_from_queue(rq)

    @staticmethod
    def _make_upload_objects(objects, pseudo_folder=''):
        upload_objects = []

        for o in objects:
            if isinstance(o, string_types):
                obj = SwiftUploadObject(o, urljoin(pseudo_folder,
                                                   o.lstrip('/')))
                upload_objects.append(obj)
            elif isinstance(o, SwiftUploadObject):
                o.object_name = urljoin(pseudo_folder, o.object_name)
                upload_objects.append(o)
            else:
                raise SwiftError(
                    "The upload operation takes only strings or "
                    "SwiftUploadObjects as input",
                    obj=o)

        return upload_objects

    @staticmethod
    def _create_container_job(
            conn, container, headers=None, policy_source=None):
        """
        Create a container using the given connection

        :param conn: The swift connection used for requests.
        :param container: The container name to create.
        :param headers: An optional dict of headers for the
                        put_container request.
        :param policy_source: An optional name of a container whose policy we
                              should duplicate.
        :return: A dict containing the results of the operation.
        """
        res = {
            'action': 'create_container',
            'container': container,
            'headers': headers
        }
        create_response = {}
        try:
            if policy_source is not None:
                _meta = conn.head_container(policy_source)
                if 'x-storage-policy' in _meta:
                    policy_header = {
                        POLICY: _meta.get('x-storage-policy')
                    }
                    if headers is None:
                        headers = policy_header
                    else:
                        headers.update(policy_header)

            conn.put_container(
                container, headers, response_dict=create_response
            )
            res.update({
                'success': True,
                'response_dict': create_response
            })
        except Exception as err:
            traceback, err_time = report_traceback()
            logger.exception(err)
            res.update({
                'success': False,
                'error': err,
                'traceback': traceback,
                'error_timestamp': err_time,
                'response_dict': create_response
            })
        return res

    @staticmethod
    def _create_dir_marker_job(conn, container, obj, options, path=None):
        res = {
            'action': 'create_dir_marker',
            'container': container,
            'object': obj,
            'path': path
        }
        results_dict = {}
        if obj.startswith('./') or obj.startswith('.\\'):
            obj = obj[2:]
        if obj.startswith('/'):
            obj = obj[1:]
        if path is not None:
            put_headers = {'x-object-meta-mtime': "%f" % getmtime(path)}
        else:
            put_headers = {'x-object-meta-mtime': "%f" % round(time())}
        res['headers'] = put_headers
        if options['changed']:
            try:
                headers = conn.head_object(container, obj)
                ct = headers.get('content-type', '').split(';', 1)[0]
                cl = int(headers.get('content-length'))
                et = headers.get('etag')
                mt = headers.get('x-object-meta-mtime')

                if (ct in KNOWN_DIR_MARKERS and
                        cl == 0 and
                        et == EMPTY_ETAG and
                        mt == put_headers['x-object-meta-mtime']):
                    res['success'] = True
                    return res
            except ClientException as err:
                if err.http_status != 404:
                    traceback, err_time = report_traceback()
                    logger.exception(err)
                    res.update({
                        'success': False,
                        'error': err,
                        'traceback': traceback,
                        'error_timestamp': err_time
                    })
                    return res
        try:
            conn.put_object(container, obj, '', content_length=0,
                            content_type=KNOWN_DIR_MARKERS[0],
                            headers=put_headers,
                            response_dict=results_dict)
            res.update({
                'success': True,
                'response_dict': results_dict})
            return res
        except Exception as err:
            traceback, err_time = report_traceback()
            logger.exception(err)
            res.update({
                'success': False,
                'error': err,
                'traceback': traceback,
                'error_timestamp': err_time,
                'response_dict': results_dict})
            return res

    @staticmethod
    def _upload_segment_job(conn, path, container, segment_name, segment_start,
                            segment_size, segment_index, obj_name, options,
                            results_queue=None):
        results_dict = {}
        if options['segment_container']:
            segment_container = options['segment_container']
        else:
            segment_container = container + '_segments'

        res = {
            'action': 'upload_segment',
            'for_container': container,
            'for_object': obj_name,
            'segment_index': segment_index,
            'segment_size': segment_size,
            'segment_location': '/%s/%s' % (segment_container,
                                            segment_name),
            'log_line': '%s segment %s' % (obj_name, segment_index),
        }
        fp = None
        try:
            fp = open(path, 'rb', DISK_BUFFER)
            fp.seek(segment_start)

            contents = LengthWrapper(fp, segment_size, md5=options['checksum'])
            etag = conn.put_object(
                segment_container,
                segment_name,
                contents,
                content_length=segment_size,
                content_type='application/swiftclient-segment',
                response_dict=results_dict)

            if options['checksum'] and etag and etag != contents.get_md5sum():
                raise SwiftError('Segment {0}: upload verification failed: '
                                 'md5 mismatch, local {1} != remote {2} '
                                 '(remote segment has not been removed)'
                                 .format(segment_index,
                                         contents.get_md5sum(),
                                         etag))

            res.update({
                'success': True,
                'response_dict': results_dict,
                'segment_etag': etag,
                'attempts': conn.attempts
            })

            if results_queue is not None:
                results_queue.put(res)
            return res

        except Exception as err:
            traceback, err_time = report_traceback()
            logger.exception(err)
            res.update({
                'success': False,
                'error': err,
                'traceback': traceback,
                'error_timestamp': err_time,
                'response_dict': results_dict,
                'attempts': conn.attempts
            })

            if results_queue is not None:
                results_queue.put(res)
            return res
        finally:
            if fp is not None:
                fp.close()

    @staticmethod
    def _put_object(conn, container, name, content, headers=None, md5=None):
        """
        Upload object into a given container and verify the resulting ETag, if
        the md5 optional parameter is passed.

        :param conn: The Swift connection to use for uploads.
        :param container: The container to put the object into.
        :param name: The name of the object.
        :param content: Object content.
        :param headers: Headers (optional) to associate with the object.
        :param md5: MD5 sum of the content. If passed in, will be used to
                    verify the returned ETag.

        :returns: A dictionary as the response from calling put_object.
                  The keys are:
                    - status
                    - reason
                    - headers
                  On error, the dictionary contains the following keys:
                    - success (with value False)
                    - error - the encountered exception (object)
                    - error_timestamp
                    - response_dict - results from the put_object call, as
                      documented above
                    - attempts - number of attempts made
        """
        if headers is None:
            headers = {}
        else:
            headers = dict(headers)
        if md5 is not None:
            headers['etag'] = md5
        results = {}
        try:
            etag = conn.put_object(
                container, name, content, content_length=len(content),
                headers=headers, response_dict=results)
            if md5 is not None and etag != md5:
                raise SwiftError('Upload verification failed for {0}: md5 '
                                 'mismatch {1} != {2}'.format(name, md5, etag))
            results['success'] = True
        except Exception as err:
            traceback, err_time = report_traceback()
            logger.exception(err)
            return {
                'success': False,
                'error': err,
                'error_timestamp': err_time,
                'response_dict': results,
                'attempts': conn.attempts,
                'traceback': traceback
            }
        return results

    @staticmethod
    def _upload_stream_segment(conn, container, object_name,
                               segment_container, segment_name,
                               segment_size, segment_index,
                               headers, fd):
        """
        Upload a segment from a stream, buffering it in memory first. The
        resulting object is placed either as a segment in the segment
        container, or if it is smaller than a single segment, as the given
        object name.

        :param conn: Swift Connection to use.
        :param container: Container in which the object would be placed.
        :param object_name: Name of the final object (used in case the stream
                            is smaller than the segment_size)
        :param segment_container: Container to hold the object segments.
        :param segment_name: The name of the segment.
        :param segment_size: Minimum segment size.
        :param segment_index: The segment index.
        :param headers: Headers to attach to the segment/object.
        :param fd: File-like handle for the content. Must implement read().

        :returns: Dictionary, containing the following keys:
                    - complete -- whether the stream is exhausted
                    - segment_size - the actual size of the segment (may be
                                     smaller than the passed in segment_size)
                    - segment_location - path to the segment
                    - segment_index - index of the segment
                    - segment_etag - the ETag for the segment
        """
        buf = []
        dgst = md5()
        bytes_read = 0
        while bytes_read < segment_size:
            data = fd.read(segment_size - bytes_read)
            if not data:
                break
            bytes_read += len(data)
            dgst.update(data)
            buf.append(data)
        buf = b''.join(buf)
        segment_hash = dgst.hexdigest()

        if not buf and segment_index > 0:
            # Happens if the segment size aligns with the object size
            return {'complete': True,
                    'segment_size': 0,
                    'segment_index': None,
                    'segment_etag': None,
                    'segment_location': None,
                    'success': True}

        if segment_index == 0 and len(buf) < segment_size:
            ret = SwiftService._put_object(
                conn, container, object_name, buf, headers, segment_hash)
            ret['segment_location'] = '/%s/%s' % (container, object_name)
        else:
            ret = SwiftService._put_object(
                conn, segment_container, segment_name, buf, headers,
                segment_hash)
            ret['segment_location'] = '/%s/%s' % (
                segment_container, segment_name)

        ret.update(
            dict(complete=len(buf) < segment_size,
                 segment_size=len(buf),
                 segment_index=segment_index,
                 segment_etag=segment_hash,
                 for_object=object_name))
        return ret

    def _get_chunk_data(self, conn, container, obj, headers, manifest=None):
        chunks = []
        if 'x-object-manifest' in headers:
            scontainer, sprefix = headers['x-object-manifest'].split('/', 1)
            for part in self.list(scontainer, {'prefix': sprefix}):
                if part["success"]:
                    chunks.extend(part["listing"])
                else:
                    raise part["error"]
        elif config_true_value(headers.get('x-static-large-object')):
            if manifest is None:
                headers, manifest = conn.get_object(
                    container, obj, query_string='multipart-manifest=get')
            manifest = parse_api_response(headers, manifest)
            for chunk in manifest:
                if chunk.get('sub_slo'):
                    scont, sobj = chunk['name'].lstrip('/').split('/', 1)
                    chunks.extend(self._get_chunk_data(
                        conn, scont, sobj, {'x-static-large-object': True}))
                else:
                    chunks.append(chunk)
        else:
            chunks.append({'hash': headers.get('etag').strip('"'),
                           'bytes': int(headers.get('content-length'))})
        return chunks

    def _is_identical(self, chunk_data, path):
        if path is None:
            return False
        try:
            fp = open(path, 'rb', DISK_BUFFER)
        except IOError:
            return False

        with fp:
            for chunk in chunk_data:
                to_read = chunk['bytes']
                md5sum = md5()
                while to_read:
                    data = fp.read(min(DISK_BUFFER, to_read))
                    if not data:
                        return False
                    md5sum.update(data)
                    to_read -= len(data)
                if md5sum.hexdigest() != chunk['hash']:
                    return False
            # Each chunk is verified; check that we're at the end of the file
            return not fp.read(1)

    @staticmethod
    def _upload_slo_manifest(conn, segment_results, container, obj, headers):
        """
        Upload an SLO manifest, given the results of uploading each segment, to
        the specified container.

        :param segment_results: List of response_dict structures, as populated
                                by _upload_segment_job. Specifically, each
                                entry must container the following keys:
                                - segment_location
                                - segment_etag
                                - segment_size
                                - segment_index
        :param container: The container to put the manifest into.
        :param obj: The name of the manifest object to use.
        :param headers: Optional set of headers to attach to the manifest.
        """
        if headers is None:
            headers = {}
        segment_results.sort(key=lambda di: di['segment_index'])
        for seg in segment_results:
            seg_loc = seg['segment_location'].lstrip('/')
            if isinstance(seg_loc, text_type):
                seg_loc = seg_loc.encode('utf-8')

        manifest_data = json.dumps([
            {
                'path': d['segment_location'],
                'etag': d['segment_etag'],
                'size_bytes': d['segment_size']
            } for d in segment_results
        ])

        response = {}
        conn.put_object(
            container, obj, manifest_data,
            headers=headers,
            query_string='multipart-manifest=put',
            response_dict=response)
        return response

    def _upload_object_job(self, conn, container, source, obj, options,
                           results_queue=None):
        if obj.startswith('./') or obj.startswith('.\\'):
            obj = obj[2:]
        if obj.startswith('/'):
            obj = obj[1:]
        res = {
            'action': 'upload_object',
            'container': container,
            'object': obj
        }
        if hasattr(source, 'read'):
            stream = source
            path = None
        else:
            path = source
        res['path'] = path
        try:
            if path is not None:
                put_headers = {'x-object-meta-mtime': "%f" % getmtime(path)}
            else:
                put_headers = {'x-object-meta-mtime': "%f" % round(time())}

            res['headers'] = put_headers

            # We need to HEAD all objects now in case we're overwriting a
            # manifest object and need to delete the old segments
            # ourselves.
            old_manifest = None
            old_slo_manifest_paths = []
            new_slo_manifest_paths = set()
            segment_size = int(0 if options['segment_size'] is None
                               else options['segment_size'])
            if (options['changed'] or options['skip_identical'] or
                    not options['leave_segments']):
                try:
                    headers = conn.head_object(container, obj)
                    is_slo = config_true_value(
                        headers.get('x-static-large-object'))

                    if options['skip_identical'] or (
                            is_slo and not options['leave_segments']):
                        chunk_data = self._get_chunk_data(
                            conn, container, obj, headers)

                    if options['skip_identical'] and self._is_identical(
                            chunk_data, path):
                        res.update({
                            'success': True,
                            'status': 'skipped-identical'
                        })
                        return res

                    cl = int(headers.get('content-length'))
                    mt = headers.get('x-object-meta-mtime')
                    if (path is not None and options['changed'] and
                            cl == getsize(path) and
                            mt == put_headers['x-object-meta-mtime']):
                        res.update({
                            'success': True,
                            'status': 'skipped-changed'
                        })
                        return res
                    if not options['leave_segments']:
                        old_manifest = headers.get('x-object-manifest')
                        if is_slo:
                            for old_seg in chunk_data:
                                seg_path = old_seg['name'].lstrip('/')
                                if isinstance(seg_path, text_type):
                                    seg_path = seg_path.encode('utf-8')
                                old_slo_manifest_paths.append(seg_path)
                except ClientException as err:
                    if err.http_status != 404:
                        traceback, err_time = report_traceback()
                        logger.exception(err)
                        res.update({
                            'success': False,
                            'error': err,
                            'traceback': traceback,
                            'error_timestamp': err_time
                        })
                        return res

            # Merge the command line header options to the put_headers
            put_headers.update(split_headers(
                options['meta'], 'X-Object-Meta-'))
            put_headers.update(split_headers(options['header'], ''))

            # Don't do segment job if object is not big enough, and never do
            # a segment job if we're reading from a stream - we may fail if we
            # go over the single object limit, but this gives us a nice way
            # to create objects from memory
            if (path is not None and segment_size and
                    (getsize(path) > segment_size)):
                res['large_object'] = True
                seg_container = container + '_segments'
                if options['segment_container']:
                    seg_container = options['segment_container']
                full_size = getsize(path)

                segment_futures = []
                segment_pool = self.thread_manager.segment_pool
                segment = 0
                segment_start = 0

                while segment_start < full_size:
                    if segment_start + segment_size > full_size:
                        segment_size = full_size - segment_start
                    if options['use_slo']:
                        segment_name = '%s/slo/%s/%s/%s/%08d' % (
                            obj, put_headers['x-object-meta-mtime'],
                            full_size, options['segment_size'], segment
                        )
                    else:
                        segment_name = '%s/%s/%s/%s/%08d' % (
                            obj, put_headers['x-object-meta-mtime'],
                            full_size, options['segment_size'], segment
                        )
                    seg = segment_pool.submit(
                        self._upload_segment_job, path, container,
                        segment_name, segment_start, segment_size, segment,
                        obj, options, results_queue=results_queue
                    )
                    segment_futures.append(seg)
                    segment += 1
                    segment_start += segment_size

                segment_results = []
                errors = False
                exceptions = []
                for f in interruptable_as_completed(segment_futures):
                    try:
                        r = f.result()
                        if not r['success']:
                            errors = True
                        segment_results.append(r)
                    except Exception as err:
                        traceback, err_time = report_traceback()
                        logger.exception(err)
                        errors = True
                        exceptions.append((err, traceback, err_time))
                if errors:
                    err = ClientException(
                        'Aborting manifest creation '
                        'because not all segments could be uploaded. %s/%s'
                        % (container, obj))
                    res.update({
                        'success': False,
                        'error': err,
                        'exceptions': exceptions,
                        'segment_results': segment_results
                    })
                    return res

                res['segment_results'] = segment_results

                if options['use_slo']:
                    response = self._upload_slo_manifest(
                        conn, segment_results, container, obj, put_headers)
                    res['manifest_response_dict'] = response
                    new_slo_manifest_paths = {
                        seg['segment_location'] for seg in segment_results}
                else:
                    new_object_manifest = '%s/%s/%s/%s/%s/' % (
                        quote(seg_container.encode('utf8')),
                        quote(obj.encode('utf8')),
                        put_headers['x-object-meta-mtime'], full_size,
                        options['segment_size'])
                    if old_manifest and old_manifest.rstrip('/') == \
                            new_object_manifest.rstrip('/'):
                        old_manifest = None
                    put_headers['x-object-manifest'] = new_object_manifest
                    mr = {}
                    conn.put_object(
                        container, obj, '', content_length=0,
                        headers=put_headers,
                        response_dict=mr
                    )
                    res['manifest_response_dict'] = mr
            elif options['use_slo'] and segment_size and not path:
                segment = 0
                results = []
                while True:
                    segment_name = '%s/slo/%s/%s/%08d' % (
                        obj, put_headers['x-object-meta-mtime'],
                        segment_size, segment
                    )
                    seg_container = container + '_segments'
                    if options['segment_container']:
                        seg_container = options['segment_container']
                    ret = self._upload_stream_segment(
                        conn, container, obj,
                        seg_container,
                        segment_name,
                        segment_size,
                        segment,
                        put_headers,
                        stream
                    )
                    if not ret['success']:
                        return ret
                    if (ret['complete'] and segment == 0) or\
                            ret['segment_size'] > 0:
                        results.append(ret)
                    if results_queue is not None:
                        # Don't insert the 0-sized segments or objects
                        # themselves
                        if ret['segment_location'] != '/%s/%s' % (
                                container, obj) and ret['segment_size'] > 0:
                            results_queue.put(ret)
                    if ret['complete']:
                        break
                    segment += 1
                if results[0]['segment_location'] != '/%s/%s' % (
                        container, obj):
                    response = self._upload_slo_manifest(
                        conn, results, container, obj, put_headers)
                    res['manifest_response_dict'] = response
                    new_slo_manifest_paths = {
                        r['segment_location'] for r in results}
                    res['large_object'] = True
                else:
                    res['response_dict'] = ret
                    res['large_object'] = False
            else:
                res['large_object'] = False
                obr = {}
                fp = None
                try:
                    if path is not None:
                        content_length = getsize(path)
                        fp = open(path, 'rb', DISK_BUFFER)
                        contents = LengthWrapper(fp,
                                                 content_length,
                                                 md5=options['checksum'])
                    else:
                        content_length = None
                        contents = ReadableToIterable(stream,
                                                      md5=options['checksum'])

                    etag = conn.put_object(
                        container, obj, contents,
                        content_length=content_length, headers=put_headers,
                        response_dict=obr
                    )
                    res['response_dict'] = obr

                    if (options['checksum'] and
                            etag and etag != contents.get_md5sum()):
                        raise SwiftError(
                            'Object upload verification failed: '
                            'md5 mismatch, local {0} != remote {1} '
                            '(remote object has not been removed)'
                            .format(contents.get_md5sum(), etag))
                finally:
                    if fp is not None:
                        fp.close()
            if old_manifest or old_slo_manifest_paths:
                drs = []
                delobjsmap = {}
                if old_manifest:
                    scontainer, sprefix = old_manifest.split('/', 1)
                    sprefix = sprefix.rstrip('/') + '/'
                    delobjsmap[scontainer] = []
                    for part in self.list(scontainer, {'prefix': sprefix}):
                        if not part["success"]:
                            raise part["error"]
                        delobjsmap[scontainer].extend(
                            seg['name'] for seg in part['listing'])

                if old_slo_manifest_paths:
                    for seg_to_delete in old_slo_manifest_paths:
                        if seg_to_delete in new_slo_manifest_paths:
                            continue
                        scont, sobj = \
                            seg_to_delete.split(b'/', 1)
                        delobjs_cont = delobjsmap.get(scont, [])
                        delobjs_cont.append(sobj)
                        delobjsmap[scont] = delobjs_cont

                del_segs = []
                for dscont, dsobjs in delobjsmap.items():
                    for dsobj in dsobjs:
                        del_seg = self.thread_manager.segment_pool.submit(
                            self._delete_segment, dscont, dsobj,
                            results_queue=results_queue
                        )
                        del_segs.append(del_seg)

                for del_seg in interruptable_as_completed(del_segs):
                    drs.append(del_seg.result())
                res['segment_delete_results'] = drs

            # return dict for printing
            res.update({
                'success': True,
                'status': 'uploaded',
                'attempts': conn.attempts})
            return res

        except OSError as err:
            traceback, err_time = report_traceback()
            logger.exception(err)
            if err.errno == ENOENT:
                error = SwiftError('Local file %r not found' % path, exc=err)
            else:
                error = err
            res.update({
                'success': False,
                'error': error,
                'traceback': traceback,
                'error_timestamp': err_time
            })
        except Exception as err:
            traceback, err_time = report_traceback()
            logger.exception(err)
            res.update({
                'success': False,
                'error': err,
                'traceback': traceback,
                'error_timestamp': err_time
            })
        return res

    # Delete related methods
    #
    def delete(self, container=None, objects=None, options=None):
        """
        Delete operations on an account, optional container and optional list
        of objects.

        :param container: The container to delete or delete from.
        :param objects: The list of objects to delete.
        :param options: A dictionary containing options to override the global
                        options specified during the service object creation::

                            {
                                'yes_all': False,
                                'leave_segments': False,
                                'prefix': None,
                                'header': [],
                            }

        :returns: A generator for returning the results of the delete
                  operations. Each result yielded from the generator is either
                  a 'delete_container', 'delete_object', 'delete_segment', or
                  'bulk_delete' dictionary containing the results of an
                  individual delete operation.

        :raises ClientException:
        :raises SwiftError:
        """
        if options is not None:
            options = dict(self._options, **options)
        else:
            options = self._options

        if container is not None:
            if objects is not None:
                if options['prefix']:
                    objects = [obj for obj in objects
                               if obj.startswith(options['prefix'])]
                rq = Queue()
                obj_dels = {}

                bulk_page_size = self._bulk_delete_page_size(objects)
                if bulk_page_size > 1:
                    page_at_a_time = n_at_a_time(objects, bulk_page_size)
                    for page_slice in page_at_a_time:
                        for obj_slice in n_groups(
                                page_slice,
                                self._options['object_dd_threads']):
                            self._bulk_delete(container, obj_slice, options,
                                              obj_dels)
                else:
                    self._per_item_delete(container, objects, options,
                                          obj_dels, rq)

                # Start a thread to watch for delete results
                Thread(
                    target=self._watch_futures, args=(obj_dels, rq)
                ).start()

                # yield results as they become available, raising the first
                # encountered exception
                res = get_from_queue(rq)
                while res is not None:
                    yield res

                    # Cancel the remaining jobs if necessary
                    if options['fail_fast'] and not res['success']:
                        for d in obj_dels.keys():
                            d.cancel()

                    res = get_from_queue(rq)
            else:
                for res in self._delete_container(container, options):
                    yield res
        else:
            if objects:
                raise SwiftError('Objects specified without container')
            if options['prefix']:
                raise SwiftError('Prefix specified without container')
            if options['yes_all']:
                cancelled = False
                containers = []
                for part in self.list():
                    if part["success"]:
                        containers.extend(c['name'] for c in part['listing'])
                    else:
                        raise part["error"]

                for con in containers:
                    if cancelled:
                        break
                    else:
                        for res in self._delete_container(
                                con, options=options):
                            yield res

                            # Cancel the remaining container deletes, but yield
                            # any pending results
                            if (not cancelled and options['fail_fast'] and
                                    not res['success']):
                                cancelled = True

    def _bulk_delete_page_size(self, objects):
        '''
        Given the iterable 'objects', will return how many items should be
        deleted at a time.

        :param objects: An iterable that supports 'len()'
        :returns: The bulk delete page size (i.e. the max number of
                  objects that can be bulk deleted at once, as reported by
                  the cluster). If bulk delete is disabled, return 1
        '''
        if len(objects) <= 2 * self._options['object_dd_threads']:
            # Not many objects; may as well delete one-by-one
            return 1

        try:
            cap_result = self.capabilities()
            if not cap_result['success']:
                # This shouldn't actually happen, but just in case we start
                # being more nuanced about our capabilities result...
                return 1
        except ClientException:
            # Old swift, presumably; assume no bulk middleware
            return 1

        swift_info = cap_result['capabilities']
        if 'bulk_delete' in swift_info:
            return swift_info['bulk_delete'].get(
                'max_deletes_per_request', 10000)
        else:
            return 1

    def _per_item_delete(self, container, objects, options, rdict, rq):
        for obj in objects:
            obj_del = self.thread_manager.object_dd_pool.submit(
                self._delete_object, container, obj, options,
                results_queue=rq
            )
            obj_details = {'container': container, 'object': obj}
            rdict[obj_del] = obj_details

    @staticmethod
    def _delete_segment(conn, container, obj, results_queue=None):
        results_dict = {}
        try:
            res = {'success': True}
            conn.delete_object(container, obj, response_dict=results_dict)
        except Exception as err:
            if not isinstance(err, ClientException) or err.http_status != 404:
                traceback, err_time = report_traceback()
                logger.exception(err)
                res = {
                    'success': False,
                    'error': err,
                    'traceback': traceback,
                    'error_timestamp': err_time
                }

        res.update({
            'action': 'delete_segment',
            'container': container,
            'object': obj,
            'attempts': conn.attempts,
            'response_dict': results_dict
        })

        if results_queue is not None:
            results_queue.put(res)
        return res

    def _delete_object(self, conn, container, obj, options,
                       results_queue=None):
        _headers = {}
        _headers = split_headers(options.get('header', []))
        res = {
            'action': 'delete_object',
            'container': container,
            'object': obj
        }
        try:
            old_manifest = None
            query_string = None

            if not options['leave_segments']:
                try:
                    headers = conn.head_object(container, obj,
                                               headers=_headers)
                    old_manifest = headers.get('x-object-manifest')
                    if config_true_value(headers.get('x-static-large-object')):
                        query_string = 'multipart-manifest=delete'
                except ClientException as err:
                    if err.http_status != 404:
                        raise

            results_dict = {}
            conn.delete_object(container, obj,
                               headers=_headers,
                               query_string=query_string,
                               response_dict=results_dict)

            if old_manifest:

                dlo_segments_deleted = True
                segment_pool = self.thread_manager.segment_pool
                s_container, s_prefix = old_manifest.split('/', 1)
                s_prefix = s_prefix.rstrip('/') + '/'

                del_segs = []
                for part in self.list(
                        container=s_container, options={'prefix': s_prefix}):
                    if part["success"]:
                        seg_list = [o["name"] for o in part["listing"]]
                    else:
                        raise part["error"]

                    for seg in seg_list:
                        del_seg = segment_pool.submit(
                            self._delete_segment, s_container,
                            seg, results_queue=results_queue
                        )
                        del_segs.append(del_seg)

                for del_seg in interruptable_as_completed(del_segs):
                    del_res = del_seg.result()
                    if not del_res["success"]:
                        dlo_segments_deleted = False

                res['dlo_segments_deleted'] = dlo_segments_deleted

            res.update({
                'success': True,
                'response_dict': results_dict,
                'attempts': conn.attempts,
            })

        except Exception as err:
            traceback, err_time = report_traceback()
            logger.exception(err)
            res.update({
                'success': False,
                'error': err,
                'traceback': traceback,
                'error_timestamp': err_time
            })
            return res

        return res

    @staticmethod
    def _delete_empty_container(conn, container, options):
        results_dict = {}
        _headers = {}
        _headers = split_headers(options.get('header', []))
        try:
            conn.delete_container(container, headers=_headers,
                                  response_dict=results_dict)
            res = {'success': True}
        except Exception as err:
            traceback, err_time = report_traceback()
            logger.exception(err)
            res = {
                'success': False,
                'error': err,
                'traceback': traceback,
                'error_timestamp': err_time
            }

        res.update({
            'action': 'delete_container',
            'container': container,
            'object': None,
            'attempts': conn.attempts,
            'response_dict': results_dict
        })
        return res

    def _delete_container(self, container, options):
        try:
            for part in self.list(container=container, options=options):
                if not part["success"]:

                    raise part["error"]

                for res in self.delete(
                        container=container,
                        objects=[o['name'] for o in part['listing']],
                        options=options):
                    yield res
            if options['prefix']:
                # We're only deleting a subset of objects within the container
                return

            con_del = self.thread_manager.container_pool.submit(
                self._delete_empty_container, container, options
            )
            con_del_res = get_future_result(con_del)

        except Exception as err:
            traceback, err_time = report_traceback()
            logger.exception(err)
            con_del_res = {
                'action': 'delete_container',
                'container': container,
                'object': None,
                'success': False,
                'error': err,
                'traceback': traceback,
                'error_timestamp': err_time
            }

        yield con_del_res

    # Bulk methods
    #
    def _bulk_delete(self, container, objects, options, rdict):
        if objects:
            bulk_del = self.thread_manager.object_dd_pool.submit(
                self._bulkdelete, container, objects, options
            )
            bulk_details = {'container': container, 'objects': objects}
            rdict[bulk_del] = bulk_details

    @staticmethod
    def _bulkdelete(conn, container, objects, options):
        results_dict = {}
        try:
            headers = {
                'Accept': 'application/json',
                'Content-Type': 'text/plain',
            }
            res = {'container': container, 'objects': objects}
            objects = [quote(('/%s/%s' % (container, obj)).encode('utf-8'))
                       for obj in objects]
            headers, body = conn.post_account(
                headers=headers,
                query_string='bulk-delete',
                data=b''.join(obj.encode('utf-8') + b'\n' for obj in objects),
                response_dict=results_dict)
            if body:
                res.update({'success': True,
                            'result': parse_api_response(headers, body)})
            else:
                res.update({
                    'success': False,
                    'error': SwiftError(
                        'No content received on account POST. '
                        'Is the bulk operations middleware enabled?')})
        except Exception as e:
            res.update({'success': False, 'error': e})

        res.update({
            'action': 'bulk_delete',
            'attempts': conn.attempts,
            'response_dict': results_dict
        })

        return res

    # Copy related methods
    #
    def copy(self, container, objects, options=None):
        """
        Copy operations on a list of objects in a container. Destination
        containers will be created.

        :param container: The container from which to copy the objects.
        :param objects: A list of object names (strings) or SwiftCopyObject
                        instances containing an object name and an
                        options dict (can be None) to override the options for
                        that individual copy operation::

                            [
                                'object_name',
                                SwiftCopyObject(
                                    'object_name',
                                     options={
                                        'destination': '/container/object',
                                        'fresh_metadata': False,
                                        ...
                                        }),
                                ...
                            ]

                        The options dict is described below.
        :param options: A dictionary containing options to override the global
                        options specified during the service object creation.
                        These options are applied to all copy operations
                        performed by this call, unless overridden on a per
                        object basis.
                        The options "destination" and "fresh_metadata" do
                        not need to be set, in this case objects will be
                        copied onto themselves and metadata will not be
                        refreshed.
                        The option "destination" can also be specified in the
                        format '/container', in which case objects without an
                        explicit destination will be copied to the destination
                        /container/original_object_name. Combinations of
                        multiple objects and a destination in the format
                        '/container/object' is invalid. Possible options are
                        given below::

                            {
                                'meta': [],
                                'header': [],
                                'destination': '/container/object',
                                'fresh_metadata': False,
                            }

        :returns: A generator returning the results of copying the given list
                  of objects.

        :raises SwiftError:
        """
        if options is not None:
            options = dict(self._options, **options)
        else:
            options = self._options

        # Try to create the container, just in case it doesn't exist. If this
        # fails, it might just be because the user doesn't have container PUT
        # permissions, so we'll ignore any error. If there's really a problem,
        # it'll surface on the first object COPY.
        containers = set(
            next(p for p in obj.destination.split("/") if p)
            for obj in objects
            if isinstance(obj, SwiftCopyObject) and obj.destination
        )
        if options.get('destination'):
            destination_split = options['destination'].split('/')
            if destination_split[0]:
                raise SwiftError("destination must be in format /cont[/obj]")
            _str_objs = [
                o for o in objects if not isinstance(o, SwiftCopyObject)
            ]
            if len(destination_split) > 2 and len(_str_objs) > 1:
                # TODO (clayg): could be useful to copy multiple objects into
                # a destination like "/container/common/prefix/for/objects/"
                # where the trailing "/" indicates the destination option is a
                # prefix!
                raise SwiftError("Combination of multiple objects and "
                                 "destination including object is invalid")
            if destination_split[-1] == '':
                # N.B. this protects the above case
                raise SwiftError("destination can not end in a slash")
            containers.add(destination_split[1])

        policy_header = {}
        _header = split_headers(options["header"])
        if POLICY in _header:
            policy_header[POLICY] = _header[POLICY]
        create_containers = [
            self.thread_manager.container_pool.submit(
                self._create_container_job, cont, headers=policy_header)
            for cont in containers
        ]

        # wait for container creation jobs to complete before any COPY
        for r in interruptable_as_completed(create_containers):
            res = r.result()
            yield res

        copy_futures = []
        copy_objects = self._make_copy_objects(objects, options)
        for copy_object in copy_objects:
            obj = copy_object.object_name
            obj_options = copy_object.options
            destination = copy_object.destination
            fresh_metadata = copy_object.fresh_metadata
            headers = split_headers(
                options['meta'], 'X-Object-Meta-')
            # add header options to the headers object for the request.
            headers.update(
                split_headers(options['header'], ''))
            if obj_options is not None:
                if 'meta' in obj_options:
                    headers.update(
                        split_headers(
                            obj_options['meta'], 'X-Object-Meta-'
                        )
                    )
                if 'header' in obj_options:
                    headers.update(
                        split_headers(obj_options['header'], '')
                    )

            copy = self.thread_manager.object_uu_pool.submit(
                self._copy_object_job, container, obj, destination,
                headers, fresh_metadata
            )
            copy_futures.append(copy)

        for r in interruptable_as_completed(copy_futures):
            res = r.result()
            yield res

    @staticmethod
    def _make_copy_objects(objects, options):
        copy_objects = []

        for o in objects:
            if isinstance(o, string_types):
                obj = SwiftCopyObject(o, options)
                copy_objects.append(obj)
            elif isinstance(o, SwiftCopyObject):
                copy_objects.append(o)
            else:
                raise SwiftError(
                    "The copy operation takes only strings or "
                    "SwiftCopyObjects as input",
                    obj=o)

        return copy_objects

    @staticmethod
    def _copy_object_job(conn, container, obj, destination, headers,
                         fresh_metadata):
        response_dict = {}
        res = {
            'success': True,
            'action': 'copy_object',
            'container': container,
            'object': obj,
            'destination': destination,
            'headers': headers,
            'fresh_metadata': fresh_metadata,
            'response_dict': response_dict
        }
        try:
            conn.copy_object(
                container, obj, destination=destination, headers=headers,
                fresh_metadata=fresh_metadata, response_dict=response_dict)
        except Exception as err:
            traceback, err_time = report_traceback()
            logger.exception(err)
            res.update({
                'success': False,
                'error': err,
                'traceback': traceback,
                'error_timestamp': err_time
            })

        return res

    # Capabilities related methods
    #
    def capabilities(self, url=None, refresh_cache=False):
        """
        List the cluster capabilities.

        :param url: Proxy URL of the cluster to retrieve capabilities.

        :returns: A dictionary containing the capabilities of the cluster.

        :raises ClientException:
        """
        if not refresh_cache and url in self.capabilities_cache:
            return self.capabilities_cache[url]

        res = {
            'action': 'capabilities',
            'timestamp': time(),
        }

        cap = self.thread_manager.container_pool.submit(
            self._get_capabilities, url
        )
        capabilities = get_future_result(cap)
        res.update({
            'success': True,
            'capabilities': capabilities
        })
        if url is not None:
            res.update({
                'url': url
            })

        self.capabilities_cache[url] = res
        return res

    @staticmethod
    def _get_capabilities(conn, url):
        return conn.get_capabilities(url)

    # Helper methods
    #
    @staticmethod
    def _watch_futures(futures, result_queue):
        """
        Watches a dict of futures and pushes their results onto the given
        queue. We use this to wait for a set of futures which may create
        futures of their own to wait for, whilst also allowing us to
        immediately return the results of those sub-jobs.

        When all futures have completed, None is pushed to the queue

        If the future is cancelled, we use the dict to return details about
        the cancellation.
        """
        futures_only = list(futures.keys())
        for f in interruptable_as_completed(futures_only):
            try:
                r = f.result()
                if r is not None:
                    result_queue.put(r)
            except CancelledError:
                details = futures[f]
                res = details
                res['status'] = 'cancelled'
                result_queue.put(res)
            except Exception as err:
                traceback, err_time = report_traceback()
                logger.exception(err)
                details = futures[f]
                res = details
                res.update({
                    'success': False,
                    'error': err,
                    'traceback': traceback,
                    'error_timestamp': err_time
                })
                result_queue.put(res)

        result_queue.put(None)