2956 lines
		
	
	
		
			112 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			2956 lines
		
	
	
		
			112 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # Copyright (c) 2010-2013 OpenStack, LLC.
 | |
| #
 | |
| # Licensed under the Apache License, Version 2.0 (the "License");
 | |
| # you may not use this file except in compliance with the License.
 | |
| # You may obtain a copy of the License at
 | |
| #
 | |
| #    http://www.apache.org/licenses/LICENSE-2.0
 | |
| #
 | |
| # Unless required by applicable law or agreed to in writing, software
 | |
| # distributed under the License is distributed on an "AS IS" BASIS,
 | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 | |
| # implied.
 | |
| # See the License for the specific language governing permissions and
 | |
| # limitations under the License.
 | |
| from __future__ import unicode_literals
 | |
| import logging
 | |
| 
 | |
| import os
 | |
| 
 | |
| from concurrent.futures import as_completed, CancelledError, TimeoutError
 | |
| from copy import deepcopy
 | |
| from errno import EEXIST, ENOENT
 | |
| from hashlib import md5
 | |
| from os import environ, makedirs, stat, utime
 | |
| from os.path import (
 | |
|     basename, dirname, getmtime, getsize, isdir, join, sep as os_path_sep
 | |
| )
 | |
| from posixpath import join as urljoin
 | |
| from random import shuffle
 | |
| from time import time
 | |
| from threading import Thread
 | |
| from six import Iterator, StringIO, string_types, text_type
 | |
| from six.moves.queue import Queue
 | |
| from six.moves.queue import Empty as QueueEmpty
 | |
| from six.moves.urllib.parse import quote
 | |
| 
 | |
| import json
 | |
| 
 | |
| 
 | |
| from swiftclient import Connection
 | |
| from swiftclient.command_helpers import (
 | |
|     stat_account, stat_container, stat_object
 | |
| )
 | |
| from swiftclient.utils import (
 | |
|     config_true_value, ReadableToIterable, LengthWrapper, EMPTY_ETAG,
 | |
|     parse_api_response, report_traceback, n_groups, split_request_headers,
 | |
|     n_at_a_time
 | |
| )
 | |
| from swiftclient.exceptions import ClientException
 | |
| from swiftclient.multithreading import MultiThreadingManager
 | |
| 
 | |
| 
 | |
| DISK_BUFFER = 2 ** 16
 | |
| logger = logging.getLogger("swiftclient.service")
 | |
| 
 | |
| 
 | |
| class ResultsIterator(Iterator):
 | |
|     def __init__(self, futures):
 | |
|         self.futures = interruptable_as_completed(futures)
 | |
| 
 | |
|     def __iter__(self):
 | |
|         return self
 | |
| 
 | |
|     def __next__(self):
 | |
|         next_completed_future = next(self.futures)
 | |
|         return next_completed_future.result()
 | |
| 
 | |
| 
 | |
| class SwiftError(Exception):
 | |
|     def __init__(self, value, container=None, obj=None,
 | |
|                  segment=None, exc=None):
 | |
|         self.value = value
 | |
|         self.container = container
 | |
|         self.obj = obj
 | |
|         self.segment = segment
 | |
|         self.exception = exc
 | |
| 
 | |
|     def __str__(self):
 | |
|         value = repr(self.value)
 | |
|         if self.container is not None:
 | |
|             value += " container:%s" % self.container
 | |
|         if self.obj is not None:
 | |
|             value += " object:%s" % self.obj
 | |
|         if self.segment is not None:
 | |
|             value += " segment:%s" % self.segment
 | |
|         return value
 | |
| 
 | |
| 
 | |
| def process_options(options):
 | |
|     # tolerate sloppy auth_version
 | |
|     if options.get('auth_version') == '3.0':
 | |
|         options['auth_version'] = '3'
 | |
|     elif options.get('auth_version') == '2':
 | |
|         options['auth_version'] = '2.0'
 | |
| 
 | |
|     if options.get('auth_version') not in ('2.0', '3') and not all(
 | |
|             options.get(key) for key in ('auth', 'user', 'key')):
 | |
|         # Use keystone auth if any of the new-style args are present
 | |
|         if any(options.get(k) for k in (
 | |
|                 'os_user_domain_id',
 | |
|                 'os_user_domain_name',
 | |
|                 'os_project_domain_id',
 | |
|                 'os_project_domain_name')):
 | |
|             # Use v3 if there's any reference to domains
 | |
|             options['auth_version'] = '3'
 | |
|         else:
 | |
|             options['auth_version'] = '2.0'
 | |
| 
 | |
|     # Use new-style args if old ones not present
 | |
|     if not options['auth'] and options['os_auth_url']:
 | |
|         options['auth'] = options['os_auth_url']
 | |
|     if not options['user'] and options['os_username']:
 | |
|         options['user'] = options['os_username']
 | |
|     if not options['key'] and options['os_password']:
 | |
|         options['key'] = options['os_password']
 | |
| 
 | |
|     # Specific OpenStack options
 | |
|     options['os_options'] = {
 | |
|         'user_id': options['os_user_id'],
 | |
|         'user_domain_id': options['os_user_domain_id'],
 | |
|         'user_domain_name': options['os_user_domain_name'],
 | |
|         'tenant_id': options['os_tenant_id'],
 | |
|         'tenant_name': options['os_tenant_name'],
 | |
|         'project_id': options['os_project_id'],
 | |
|         'project_name': options['os_project_name'],
 | |
|         'project_domain_id': options['os_project_domain_id'],
 | |
|         'project_domain_name': options['os_project_domain_name'],
 | |
|         'service_type': options['os_service_type'],
 | |
|         'endpoint_type': options['os_endpoint_type'],
 | |
|         'auth_token': options['os_auth_token'],
 | |
|         'object_storage_url': options['os_storage_url'],
 | |
|         'region_name': options['os_region_name'],
 | |
|     }
 | |
| 
 | |
| 
 | |
| def _build_default_global_options():
 | |
|     return {
 | |
|         "snet": False,
 | |
|         "verbose": 1,
 | |
|         "debug": False,
 | |
|         "info": False,
 | |
|         "auth": environ.get('ST_AUTH'),
 | |
|         "auth_version": environ.get('ST_AUTH_VERSION', '1.0'),
 | |
|         "user": environ.get('ST_USER'),
 | |
|         "key": environ.get('ST_KEY'),
 | |
|         "retries": 5,
 | |
|         "force_auth_retry": False,
 | |
|         "os_username": environ.get('OS_USERNAME'),
 | |
|         "os_user_id": environ.get('OS_USER_ID'),
 | |
|         "os_user_domain_name": environ.get('OS_USER_DOMAIN_NAME'),
 | |
|         "os_user_domain_id": environ.get('OS_USER_DOMAIN_ID'),
 | |
|         "os_password": environ.get('OS_PASSWORD'),
 | |
|         "os_tenant_id": environ.get('OS_TENANT_ID'),
 | |
|         "os_tenant_name": environ.get('OS_TENANT_NAME'),
 | |
|         "os_project_name": environ.get('OS_PROJECT_NAME'),
 | |
|         "os_project_id": environ.get('OS_PROJECT_ID'),
 | |
|         "os_project_domain_name": environ.get('OS_PROJECT_DOMAIN_NAME'),
 | |
|         "os_project_domain_id": environ.get('OS_PROJECT_DOMAIN_ID'),
 | |
|         "os_auth_url": environ.get('OS_AUTH_URL'),
 | |
|         "os_auth_token": environ.get('OS_AUTH_TOKEN'),
 | |
|         "os_storage_url": environ.get('OS_STORAGE_URL'),
 | |
|         "os_region_name": environ.get('OS_REGION_NAME'),
 | |
|         "os_service_type": environ.get('OS_SERVICE_TYPE'),
 | |
|         "os_endpoint_type": environ.get('OS_ENDPOINT_TYPE'),
 | |
|         "os_cacert": environ.get('OS_CACERT'),
 | |
|         "os_cert": environ.get('OS_CERT'),
 | |
|         "os_key": environ.get('OS_KEY'),
 | |
|         "insecure": config_true_value(environ.get('SWIFTCLIENT_INSECURE')),
 | |
|         "ssl_compression": False,
 | |
|         'segment_threads': 10,
 | |
|         'object_dd_threads': 10,
 | |
|         'object_uu_threads': 10,
 | |
|         'container_threads': 10
 | |
|     }
 | |
| 
 | |
| _default_global_options = _build_default_global_options()
 | |
| 
 | |
| _default_local_options = {
 | |
|     'sync_to': None,
 | |
|     'sync_key': None,
 | |
|     'use_slo': False,
 | |
|     'segment_size': None,
 | |
|     'segment_container': None,
 | |
|     'leave_segments': False,
 | |
|     'changed': None,
 | |
|     'skip_identical': False,
 | |
|     'yes_all': False,
 | |
|     'read_acl': None,
 | |
|     'write_acl': None,
 | |
|     'out_file': None,
 | |
|     'out_directory': None,
 | |
|     'remove_prefix': False,
 | |
|     'no_download': False,
 | |
|     'long': False,
 | |
|     'totals': False,
 | |
|     'marker': '',
 | |
|     'header': [],
 | |
|     'meta': [],
 | |
|     'prefix': None,
 | |
|     'delimiter': None,
 | |
|     'fail_fast': False,
 | |
|     'human': False,
 | |
|     'dir_marker': False,
 | |
|     'checksum': True,
 | |
|     'shuffle': False,
 | |
|     'destination': None,
 | |
|     'fresh_metadata': False,
 | |
|     'ignore_mtime': False,
 | |
| }
 | |
| 
 | |
| POLICY = 'X-Storage-Policy'
 | |
| KNOWN_DIR_MARKERS = (
 | |
|     'application/directory',  # Preferred
 | |
|     'text/directory',  # Historically relevant
 | |
| )
 | |
| 
 | |
| 
 | |
| def get_from_queue(q, timeout=864000):
 | |
|     while True:
 | |
|         try:
 | |
|             item = q.get(timeout=timeout)
 | |
|             return item
 | |
|         except QueueEmpty:
 | |
|             # Do nothing here, we only have a timeout to allow interruption
 | |
|             pass
 | |
| 
 | |
| 
 | |
| def get_future_result(f, timeout=86400):
 | |
|     while True:
 | |
|         try:
 | |
|             res = f.result(timeout=timeout)
 | |
|             return res
 | |
|         except TimeoutError:
 | |
|             # Do nothing here, we only have a timeout to allow interruption
 | |
|             pass
 | |
| 
 | |
| 
 | |
| def interruptable_as_completed(fs, timeout=86400):
 | |
|     while True:
 | |
|         try:
 | |
|             for f in as_completed(fs, timeout=timeout):
 | |
|                 fs.remove(f)
 | |
|                 yield f
 | |
|             return
 | |
|         except TimeoutError:
 | |
|             # Do nothing here, we only have a timeout to allow interruption
 | |
|             pass
 | |
| 
 | |
| 
 | |
| def get_conn(options):
 | |
|     """
 | |
|     Return a connection building it from the options.
 | |
|     """
 | |
|     return Connection(options['auth'],
 | |
|                       options['user'],
 | |
|                       options['key'],
 | |
|                       options['retries'],
 | |
|                       auth_version=options['auth_version'],
 | |
|                       os_options=options['os_options'],
 | |
|                       snet=options['snet'],
 | |
|                       cacert=options['os_cacert'],
 | |
|                       insecure=options['insecure'],
 | |
|                       cert=options['os_cert'],
 | |
|                       cert_key=options['os_key'],
 | |
|                       ssl_compression=options['ssl_compression'],
 | |
|                       force_auth_retry=options['force_auth_retry'])
 | |
| 
 | |
| 
 | |
| def mkdirs(path):
 | |
|     try:
 | |
|         makedirs(path)
 | |
|     except OSError as err:
 | |
|         if err.errno != EEXIST:
 | |
|             raise
 | |
| 
 | |
| 
 | |
| def split_headers(options, prefix=''):
 | |
|     """
 | |
|     Splits 'Key: Value' strings and returns them as a dictionary.
 | |
| 
 | |
|     :param options: Must be one of:
 | |
|         * an iterable of 'Key: Value' strings
 | |
|         * an iterable of ('Key', 'Value') pairs
 | |
|         * a dict of {'Key': 'Value'} pairs
 | |
|     :param prefix: String to prepend to all of the keys in the dictionary.
 | |
|         reporting.
 | |
|     """
 | |
|     headers = {}
 | |
|     try:
 | |
|         headers = split_request_headers(options, prefix)
 | |
|     except ValueError as e:
 | |
|         raise SwiftError(e)
 | |
| 
 | |
|     return headers
 | |
| 
 | |
| 
 | |
| class SwiftUploadObject(object):
 | |
|     """
 | |
|     Class for specifying an object upload, allowing the object source, name and
 | |
|     options to be specified separately for each individual object.
 | |
|     """
 | |
|     def __init__(self, source, object_name=None, options=None):
 | |
|         if isinstance(source, string_types):
 | |
|             self.object_name = object_name or source
 | |
|         elif source is None or hasattr(source, 'read'):
 | |
|             if not object_name or not isinstance(object_name, string_types):
 | |
|                 raise SwiftError('Object names must be specified as '
 | |
|                                  'strings for uploads from None or file '
 | |
|                                  'like objects.')
 | |
|             self.object_name = object_name
 | |
|         else:
 | |
|             raise SwiftError('Unexpected source type for '
 | |
|                              'SwiftUploadObject: {0}'.format(type(source)))
 | |
| 
 | |
|         if not self.object_name:
 | |
|             raise SwiftError('Object names must not be empty strings')
 | |
| 
 | |
|         self.object_name = self.object_name.lstrip('/')
 | |
|         self.options = options
 | |
|         self.source = source
 | |
| 
 | |
| 
 | |
| class SwiftPostObject(object):
 | |
|     """
 | |
|     Class for specifying an object post, allowing the headers/metadata to be
 | |
|     specified separately for each individual object.
 | |
|     """
 | |
|     def __init__(self, object_name, options=None):
 | |
|         if not (isinstance(object_name, string_types) and object_name):
 | |
|             raise SwiftError(
 | |
|                 "Object names must be specified as non-empty strings"
 | |
|             )
 | |
|         self.object_name = object_name
 | |
|         self.options = options
 | |
| 
 | |
| 
 | |
| class SwiftCopyObject(object):
 | |
|     """
 | |
|     Class for specifying an object copy,
 | |
|     allowing the destination/headers/metadata/fresh_metadata to be specified
 | |
|     separately for each individual object.
 | |
|     destination and fresh_metadata should be set in options
 | |
|     """
 | |
|     def __init__(self, object_name, options=None):
 | |
|         if not (isinstance(object_name, string_types) and object_name):
 | |
|             raise SwiftError(
 | |
|                 "Object names must be specified as non-empty strings"
 | |
|             )
 | |
| 
 | |
|         self.object_name = object_name
 | |
|         self.options = options
 | |
| 
 | |
|         if self.options is None:
 | |
|             self.destination = None
 | |
|             self.fresh_metadata = False
 | |
|         else:
 | |
|             self.destination = self.options.get('destination')
 | |
|             self.fresh_metadata = self.options.get('fresh_metadata', False)
 | |
| 
 | |
|         if self.destination is not None:
 | |
|             destination_components = self.destination.split('/')
 | |
|             if destination_components[0] or len(destination_components) < 2:
 | |
|                 raise SwiftError("destination must be in format /cont[/obj]")
 | |
|             if not destination_components[-1]:
 | |
|                 raise SwiftError("destination must not end in a slash")
 | |
|             if len(destination_components) == 2:
 | |
|                 # only container set in destination
 | |
|                 self.destination = "{0}/{1}".format(
 | |
|                     self.destination, object_name
 | |
|                 )
 | |
| 
 | |
| 
 | |
| class _SwiftReader(object):
 | |
|     """
 | |
|     Class for downloading objects from swift and raising appropriate
 | |
|     errors on failures caused by either invalid md5sum or size of the
 | |
|     data read.
 | |
|     """
 | |
|     def __init__(self, path, body, headers, checksum=True):
 | |
|         self._path = path
 | |
|         self._body = body
 | |
|         self._actual_read = 0
 | |
|         self._content_length = None
 | |
|         self._actual_md5 = None
 | |
|         self._expected_md5 = headers.get('etag', '')
 | |
| 
 | |
|         if len(self._expected_md5) > 1 and self._expected_md5[0] == '"' \
 | |
|                 and self._expected_md5[-1] == '"':
 | |
|             self._expected_md5 = self._expected_md5[1:-1]
 | |
| 
 | |
|         # Some headers indicate the MD5 of the response
 | |
|         # definitely *won't* match the ETag
 | |
|         bad_md5_headers = set([
 | |
|             'content-range',
 | |
|             'x-object-manifest',
 | |
|             'x-static-large-object',
 | |
|         ])
 | |
|         if bad_md5_headers.intersection(headers):
 | |
|             # This isn't a useful checksum
 | |
|             self._expected_md5 = ''
 | |
| 
 | |
|         if self._expected_md5 and checksum:
 | |
|             self._actual_md5 = md5()
 | |
| 
 | |
|         if 'content-length' in headers:
 | |
|             try:
 | |
|                 self._content_length = int(headers.get('content-length'))
 | |
|             except ValueError:
 | |
|                 raise SwiftError('content-length header must be an integer')
 | |
| 
 | |
|     def __iter__(self):
 | |
|         for chunk in self._body:
 | |
|             if self._actual_md5:
 | |
|                 self._actual_md5.update(chunk)
 | |
|             self._actual_read += len(chunk)
 | |
|             yield chunk
 | |
|         self._check_contents()
 | |
| 
 | |
|     def _check_contents(self):
 | |
|         if self._actual_md5 and self._expected_md5:
 | |
|             etag = self._actual_md5.hexdigest()
 | |
|             if etag != self._expected_md5:
 | |
|                 raise SwiftError('Error downloading {0}: md5sum != etag, '
 | |
|                                  '{1} != {2}'.format(
 | |
|                                      self._path, etag, self._expected_md5))
 | |
| 
 | |
|         if (self._content_length is not None and
 | |
|                 self._actual_read != self._content_length):
 | |
|             raise SwiftError('Error downloading {0}: read_length != '
 | |
|                              'content_length, {1:d} != {2:d}'.format(
 | |
|                                  self._path, self._actual_read,
 | |
|                                  self._content_length))
 | |
| 
 | |
|     def bytes_read(self):
 | |
|         return self._actual_read
 | |
| 
 | |
| 
 | |
| class SwiftService(object):
 | |
|     """
 | |
|     Service for performing swift operations
 | |
|     """
 | |
|     def __init__(self, options=None):
 | |
|         if options is not None:
 | |
|             self._options = dict(
 | |
|                 _default_global_options,
 | |
|                 **dict(_default_local_options, **options)
 | |
|             )
 | |
|         else:
 | |
|             self._options = dict(
 | |
|                 _default_global_options,
 | |
|                 **_default_local_options
 | |
|             )
 | |
|         process_options(self._options)
 | |
| 
 | |
|         def create_connection():
 | |
|             return get_conn(self._options)
 | |
|         self.thread_manager = MultiThreadingManager(
 | |
|             create_connection,
 | |
|             segment_threads=self._options['segment_threads'],
 | |
|             object_dd_threads=self._options['object_dd_threads'],
 | |
|             object_uu_threads=self._options['object_uu_threads'],
 | |
|             container_threads=self._options['container_threads']
 | |
|         )
 | |
|         self.capabilities_cache = {}  # Each instance should have its own cache
 | |
| 
 | |
|     def __enter__(self):
 | |
|         self.thread_manager.__enter__()
 | |
|         return self
 | |
| 
 | |
|     def __exit__(self, exc_type, exc_val, exc_tb):
 | |
|         self.thread_manager.__exit__(exc_type, exc_val, exc_tb)
 | |
| 
 | |
|     # Stat related methods
 | |
|     #
 | |
|     def stat(self, container=None, objects=None, options=None):
 | |
|         """
 | |
|         Get account stats, container stats or information about a list of
 | |
|         objects in a container.
 | |
| 
 | |
|         :param container: The container to query.
 | |
|         :param objects: A list of object paths about which to return
 | |
|                         information (a list of strings).
 | |
|         :param options: A dictionary containing options to override the global
 | |
|                         options specified during the service object creation.
 | |
|                         These options are applied to all stat operations
 | |
|                         performed by this call::
 | |
| 
 | |
|                             {
 | |
|                                 'human': False,
 | |
|                                 'header': []
 | |
|                             }
 | |
| 
 | |
|         :returns: Either a single dictionary containing stats about an account
 | |
|                   or container, or an iterator for returning the results of the
 | |
|                   stat operations on a list of objects.
 | |
| 
 | |
|         :raises SwiftError:
 | |
|         """
 | |
|         if options is not None:
 | |
|             options = dict(self._options, **options)
 | |
|         else:
 | |
|             options = self._options
 | |
| 
 | |
|         if not container:
 | |
|             if objects:
 | |
|                 raise SwiftError('Objects specified without container')
 | |
|             else:
 | |
|                 res = {
 | |
|                     'action': 'stat_account',
 | |
|                     'success': True,
 | |
|                     'container': container,
 | |
|                     'object': None,
 | |
|                 }
 | |
|                 try:
 | |
|                     stats_future = self.thread_manager.container_pool.submit(
 | |
|                         stat_account, options
 | |
|                     )
 | |
|                     items, headers = get_future_result(stats_future)
 | |
|                     res.update({
 | |
|                         'items': items,
 | |
|                         'headers': headers
 | |
|                     })
 | |
|                     return res
 | |
|                 except ClientException as err:
 | |
|                     if err.http_status != 404:
 | |
|                         traceback, err_time = report_traceback()
 | |
|                         logger.exception(err)
 | |
|                         res.update({
 | |
|                             'success': False,
 | |
|                             'error': err,
 | |
|                             'traceback': traceback,
 | |
|                             'error_timestamp': err_time
 | |
|                         })
 | |
|                         return res
 | |
|                     raise SwiftError('Account not found', exc=err)
 | |
|                 except Exception as err:
 | |
|                     traceback, err_time = report_traceback()
 | |
|                     logger.exception(err)
 | |
|                     res.update({
 | |
|                         'success': False,
 | |
|                         'error': err,
 | |
|                         'traceback': traceback,
 | |
|                         'error_timestamp': err_time
 | |
|                     })
 | |
|                     return res
 | |
|         else:
 | |
|             if not objects:
 | |
|                 res = {
 | |
|                     'action': 'stat_container',
 | |
|                     'container': container,
 | |
|                     'object': None,
 | |
|                     'success': True,
 | |
|                 }
 | |
|                 try:
 | |
|                     stats_future = self.thread_manager.container_pool.submit(
 | |
|                         stat_container, options, container
 | |
|                     )
 | |
|                     items, headers = get_future_result(stats_future)
 | |
|                     res.update({
 | |
|                         'items': items,
 | |
|                         'headers': headers
 | |
|                     })
 | |
|                     return res
 | |
|                 except ClientException as err:
 | |
|                     if err.http_status != 404:
 | |
|                         traceback, err_time = report_traceback()
 | |
|                         logger.exception(err)
 | |
|                         res.update({
 | |
|                             'success': False,
 | |
|                             'error': err,
 | |
|                             'traceback': traceback,
 | |
|                             'error_timestamp': err_time
 | |
|                         })
 | |
|                         return res
 | |
|                     raise SwiftError('Container %r not found' % container,
 | |
|                                      container=container, exc=err)
 | |
|                 except Exception as err:
 | |
|                     traceback, err_time = report_traceback()
 | |
|                     logger.exception(err)
 | |
|                     res.update({
 | |
|                         'success': False,
 | |
|                         'error': err,
 | |
|                         'traceback': traceback,
 | |
|                         'error_timestamp': err_time
 | |
|                     })
 | |
|                     return res
 | |
|             else:
 | |
|                 stat_futures = []
 | |
|                 for stat_o in objects:
 | |
|                     stat_future = self.thread_manager.object_dd_pool.submit(
 | |
|                         self._stat_object, container, stat_o, options
 | |
|                     )
 | |
|                     stat_futures.append(stat_future)
 | |
| 
 | |
|                 return ResultsIterator(stat_futures)
 | |
| 
 | |
|     @staticmethod
 | |
|     def _stat_object(conn, container, obj, options):
 | |
|         res = {
 | |
|             'action': 'stat_object',
 | |
|             'object': obj,
 | |
|             'container': container,
 | |
|             'success': True,
 | |
|         }
 | |
|         try:
 | |
|             items, headers = stat_object(conn, options, container, obj)
 | |
|             res.update({
 | |
|                 'items': items,
 | |
|                 'headers': headers
 | |
|             })
 | |
|             return res
 | |
|         except Exception as err:
 | |
|             traceback, err_time = report_traceback()
 | |
|             logger.exception(err)
 | |
|             res.update({
 | |
|                 'success': False,
 | |
|                 'error': err,
 | |
|                 'traceback': traceback,
 | |
|                 'error_timestamp': err_time
 | |
|             })
 | |
|             return res
 | |
| 
 | |
|     # Post related methods
 | |
|     #
 | |
|     def post(self, container=None, objects=None, options=None):
 | |
|         """
 | |
|         Post operations on an account, container or list of objects
 | |
| 
 | |
|         :param container: The container to make the post operation against.
 | |
|         :param objects: A list of object names (strings) or SwiftPostObject
 | |
|                         instances containing an object name, and an
 | |
|                         options dict (can be None) to override the options for
 | |
|                         that individual post operation::
 | |
| 
 | |
|                             [
 | |
|                                 'object_name',
 | |
|                                 SwiftPostObject('object_name', options={...}),
 | |
|                                 ...
 | |
|                             ]
 | |
| 
 | |
|                         The options dict is described below.
 | |
|         :param options: A dictionary containing options to override the global
 | |
|                         options specified during the service object creation.
 | |
|                         These options are applied to all post operations
 | |
|                         performed by this call, unless overridden on a per
 | |
|                         object basis. Possible options are given below::
 | |
| 
 | |
|                             {
 | |
|                                 'meta': [],
 | |
|                                 'header': [],
 | |
|                                 'read_acl': None,   # For containers only
 | |
|                                 'write_acl': None,  # For containers only
 | |
|                                 'sync_to': None,    # For containers only
 | |
|                                 'sync_key': None    # For containers only
 | |
|                             }
 | |
| 
 | |
|         :returns: Either a single result dictionary in the case of a post to a
 | |
|                   container/account, or an iterator for returning the results
 | |
|                   of posts to a list of objects.
 | |
| 
 | |
|         :raises SwiftError:
 | |
|         """
 | |
|         if options is not None:
 | |
|             options = dict(self._options, **options)
 | |
|         else:
 | |
|             options = self._options
 | |
| 
 | |
|         res = {
 | |
|             'success': True,
 | |
|             'container': container,
 | |
|             'object': None,
 | |
|             'headers': {},
 | |
|         }
 | |
|         if not container:
 | |
|             res["action"] = "post_account"
 | |
|             if objects:
 | |
|                 raise SwiftError('Objects specified without container')
 | |
|             else:
 | |
|                 response_dict = {}
 | |
|                 headers = split_headers(
 | |
|                     options['meta'], 'X-Account-Meta-')
 | |
|                 headers.update(
 | |
|                     split_headers(options['header'], ''))
 | |
|                 res['headers'] = headers
 | |
|                 try:
 | |
|                     post = self.thread_manager.container_pool.submit(
 | |
|                         self._post_account_job, headers, response_dict
 | |
|                     )
 | |
|                     get_future_result(post)
 | |
|                 except ClientException as err:
 | |
|                     if err.http_status != 404:
 | |
|                         traceback, err_time = report_traceback()
 | |
|                         logger.exception(err)
 | |
|                         res.update({
 | |
|                             'success': False,
 | |
|                             'error': err,
 | |
|                             'traceback': traceback,
 | |
|                             'error_timestamp': err_time,
 | |
|                             'response_dict': response_dict
 | |
|                         })
 | |
|                         return res
 | |
|                     raise SwiftError('Account not found', exc=err)
 | |
|                 except Exception as err:
 | |
|                     traceback, err_time = report_traceback()
 | |
|                     logger.exception(err)
 | |
|                     res.update({
 | |
|                         'success': False,
 | |
|                         'error': err,
 | |
|                         'response_dict': response_dict,
 | |
|                         'traceback': traceback,
 | |
|                         'error_timestamp': err_time
 | |
|                     })
 | |
|             return res
 | |
|         if not objects:
 | |
|             res["action"] = "post_container"
 | |
|             response_dict = {}
 | |
|             headers = split_headers(
 | |
|                 options['meta'], 'X-Container-Meta-')
 | |
|             headers.update(
 | |
|                 split_headers(options['header'], ''))
 | |
|             if options['read_acl'] is not None:
 | |
|                 headers['X-Container-Read'] = options['read_acl']
 | |
|             if options['write_acl'] is not None:
 | |
|                 headers['X-Container-Write'] = options['write_acl']
 | |
|             if options['sync_to'] is not None:
 | |
|                 headers['X-Container-Sync-To'] = options['sync_to']
 | |
|             if options['sync_key'] is not None:
 | |
|                 headers['X-Container-Sync-Key'] = options['sync_key']
 | |
|             res['headers'] = headers
 | |
|             try:
 | |
|                 post = self.thread_manager.container_pool.submit(
 | |
|                     self._post_container_job, container,
 | |
|                     headers, response_dict
 | |
|                 )
 | |
|                 get_future_result(post)
 | |
|             except ClientException as err:
 | |
|                 if err.http_status != 404:
 | |
|                     traceback, err_time = report_traceback()
 | |
|                     logger.exception(err)
 | |
|                     res.update({
 | |
|                         'action': 'post_container',
 | |
|                         'success': False,
 | |
|                         'error': err,
 | |
|                         'traceback': traceback,
 | |
|                         'error_timestamp': err_time,
 | |
|                         'response_dict': response_dict
 | |
|                     })
 | |
|                     return res
 | |
|                 raise SwiftError(
 | |
|                     "Container '%s' not found" % container,
 | |
|                     container=container, exc=err
 | |
|                 )
 | |
|             except Exception as err:
 | |
|                 traceback, err_time = report_traceback()
 | |
|                 logger.exception(err)
 | |
|                 res.update({
 | |
|                     'action': 'post_container',
 | |
|                     'success': False,
 | |
|                     'error': err,
 | |
|                     'response_dict': response_dict,
 | |
|                     'traceback': traceback,
 | |
|                     'error_timestamp': err_time
 | |
|                 })
 | |
|             return res
 | |
|         else:
 | |
|             post_futures = []
 | |
|             post_objects = self._make_post_objects(objects)
 | |
|             for post_object in post_objects:
 | |
|                 obj = post_object.object_name
 | |
|                 obj_options = post_object.options
 | |
|                 response_dict = {}
 | |
|                 headers = split_headers(
 | |
|                     options['meta'], 'X-Object-Meta-')
 | |
|                 # add header options to the headers object for the request.
 | |
|                 headers.update(
 | |
|                     split_headers(options['header'], ''))
 | |
|                 if obj_options is not None:
 | |
|                     if 'meta' in obj_options:
 | |
|                         headers.update(
 | |
|                             split_headers(
 | |
|                                 obj_options['meta'], 'X-Object-Meta-'
 | |
|                             )
 | |
|                         )
 | |
|                     if 'header' in obj_options:
 | |
|                         headers.update(
 | |
|                             split_headers(obj_options['header'], '')
 | |
|                         )
 | |
| 
 | |
|                 post = self.thread_manager.object_uu_pool.submit(
 | |
|                     self._post_object_job, container, obj,
 | |
|                     headers, response_dict
 | |
|                 )
 | |
|                 post_futures.append(post)
 | |
| 
 | |
|             return ResultsIterator(post_futures)
 | |
| 
 | |
|     @staticmethod
 | |
|     def _make_post_objects(objects):
 | |
|         post_objects = []
 | |
| 
 | |
|         for o in objects:
 | |
|             if isinstance(o, string_types):
 | |
|                 obj = SwiftPostObject(o)
 | |
|                 post_objects.append(obj)
 | |
|             elif isinstance(o, SwiftPostObject):
 | |
|                 post_objects.append(o)
 | |
|             else:
 | |
|                 raise SwiftError(
 | |
|                     "The post operation takes only strings or "
 | |
|                     "SwiftPostObjects as input",
 | |
|                     obj=o)
 | |
| 
 | |
|         return post_objects
 | |
| 
 | |
|     @staticmethod
 | |
|     def _post_account_job(conn, headers, result):
 | |
|         return conn.post_account(headers=headers, response_dict=result)
 | |
| 
 | |
|     @staticmethod
 | |
|     def _post_container_job(conn, container, headers, result):
 | |
|         try:
 | |
|             res = conn.post_container(
 | |
|                 container, headers=headers, response_dict=result)
 | |
|         except ClientException as err:
 | |
|             if err.http_status != 404:
 | |
|                 raise
 | |
|             _response_dict = {}
 | |
|             res = conn.put_container(
 | |
|                 container, headers=headers, response_dict=_response_dict)
 | |
|             result['post_put'] = _response_dict
 | |
|         return res
 | |
| 
 | |
|     @staticmethod
 | |
|     def _post_object_job(conn, container, obj, headers, result):
 | |
|         res = {
 | |
|             'success': True,
 | |
|             'action': 'post_object',
 | |
|             'container': container,
 | |
|             'object': obj,
 | |
|             'headers': headers,
 | |
|             'response_dict': result
 | |
|         }
 | |
|         try:
 | |
|             conn.post_object(
 | |
|                 container, obj, headers=headers, response_dict=result)
 | |
|         except Exception as err:
 | |
|             traceback, err_time = report_traceback()
 | |
|             logger.exception(err)
 | |
|             res.update({
 | |
|                 'success': False,
 | |
|                 'error': err,
 | |
|                 'traceback': traceback,
 | |
|                 'error_timestamp': err_time
 | |
|             })
 | |
| 
 | |
|         return res
 | |
| 
 | |
|     # List related methods
 | |
|     #
 | |
|     def list(self, container=None, options=None):
 | |
|         """
 | |
|         List operations on an account, container.
 | |
| 
 | |
|         :param container: The container to make the list operation against.
 | |
|         :param options: A dictionary containing options to override the global
 | |
|                         options specified during the service object creation::
 | |
| 
 | |
|                             {
 | |
|                                 'long': False,
 | |
|                                 'prefix': None,
 | |
|                                 'delimiter': None,
 | |
|                                 'header': []
 | |
|                             }
 | |
| 
 | |
|         :returns: A generator for returning the results of the list operation
 | |
|                   on an account or container. Each result yielded from the
 | |
|                   generator is either a 'list_account_part' or
 | |
|                   'list_container_part', containing part of the listing.
 | |
|         """
 | |
|         if options is not None:
 | |
|             options = dict(self._options, **options)
 | |
|         else:
 | |
|             options = self._options
 | |
| 
 | |
|         rq = Queue(maxsize=10)  # Just stop list running away consuming memory
 | |
| 
 | |
|         if container is None:
 | |
|             listing_future = self.thread_manager.container_pool.submit(
 | |
|                 self._list_account_job, options, rq
 | |
|             )
 | |
|         else:
 | |
|             listing_future = self.thread_manager.container_pool.submit(
 | |
|                 self._list_container_job, container, options, rq
 | |
|             )
 | |
| 
 | |
|         res = get_from_queue(rq)
 | |
|         while res is not None:
 | |
|             yield res
 | |
|             res = get_from_queue(rq)
 | |
| 
 | |
|         # Make sure the future has completed
 | |
|         get_future_result(listing_future)
 | |
| 
 | |
|     @staticmethod
 | |
|     def _list_account_job(conn, options, result_queue):
 | |
|         marker = ''
 | |
|         error = None
 | |
|         req_headers = split_headers(options.get('header', []))
 | |
|         try:
 | |
|             while True:
 | |
|                 _, items = conn.get_account(
 | |
|                     marker=marker, prefix=options['prefix'],
 | |
|                     headers=req_headers
 | |
|                 )
 | |
| 
 | |
|                 if not items:
 | |
|                     result_queue.put(None)
 | |
|                     return
 | |
| 
 | |
|                 if options['long']:
 | |
|                     for i in items:
 | |
|                         name = i['name']
 | |
|                         i['meta'] = conn.head_container(name)
 | |
| 
 | |
|                 res = {
 | |
|                     'action': 'list_account_part',
 | |
|                     'container': None,
 | |
|                     'prefix': options['prefix'],
 | |
|                     'success': True,
 | |
|                     'listing': items,
 | |
|                     'marker': marker,
 | |
|                 }
 | |
|                 result_queue.put(res)
 | |
| 
 | |
|                 marker = items[-1].get('name', items[-1].get('subdir'))
 | |
|         except ClientException as err:
 | |
|             traceback, err_time = report_traceback()
 | |
|             logger.exception(err)
 | |
|             if err.http_status != 404:
 | |
|                 error = (err, traceback, err_time)
 | |
|             else:
 | |
|                 error = (
 | |
|                     SwiftError('Account not found', exc=err),
 | |
|                     traceback, err_time
 | |
|                 )
 | |
| 
 | |
|         except Exception as err:
 | |
|             traceback, err_time = report_traceback()
 | |
|             logger.exception(err)
 | |
|             error = (err, traceback, err_time)
 | |
| 
 | |
|         res = {
 | |
|             'action': 'list_account_part',
 | |
|             'container': None,
 | |
|             'prefix': options['prefix'],
 | |
|             'success': False,
 | |
|             'marker': marker,
 | |
|             'error': error[0],
 | |
|             'traceback': error[1],
 | |
|             'error_timestamp': error[2]
 | |
|         }
 | |
|         result_queue.put(res)
 | |
|         result_queue.put(None)
 | |
| 
 | |
|     @staticmethod
 | |
|     def _list_container_job(conn, container, options, result_queue):
 | |
|         marker = options.get('marker', '')
 | |
|         error = None
 | |
|         req_headers = split_headers(options.get('header', []))
 | |
|         try:
 | |
|             while True:
 | |
|                 _, items = conn.get_container(
 | |
|                     container, marker=marker, prefix=options['prefix'],
 | |
|                     delimiter=options['delimiter'], headers=req_headers
 | |
|                 )
 | |
| 
 | |
|                 if not items:
 | |
|                     result_queue.put(None)
 | |
|                     return
 | |
| 
 | |
|                 res = {
 | |
|                     'action': 'list_container_part',
 | |
|                     'container': container,
 | |
|                     'prefix': options['prefix'],
 | |
|                     'success': True,
 | |
|                     'marker': marker,
 | |
|                     'listing': items,
 | |
|                 }
 | |
|                 result_queue.put(res)
 | |
| 
 | |
|                 marker = items[-1].get('name', items[-1].get('subdir'))
 | |
|         except ClientException as err:
 | |
|             traceback, err_time = report_traceback()
 | |
|             logger.exception(err)
 | |
|             if err.http_status != 404:
 | |
|                 error = (err, traceback, err_time)
 | |
|             else:
 | |
|                 error = (
 | |
|                     SwiftError(
 | |
|                         'Container %r not found' % container,
 | |
|                         container=container, exc=err
 | |
|                     ),
 | |
|                     traceback,
 | |
|                     err_time
 | |
|                 )
 | |
|         except Exception as err:
 | |
|             traceback, err_time = report_traceback()
 | |
|             logger.exception(err)
 | |
|             error = (err, traceback, err_time)
 | |
| 
 | |
|         res = {
 | |
|             'action': 'list_container_part',
 | |
|             'container': container,
 | |
|             'prefix': options['prefix'],
 | |
|             'success': False,
 | |
|             'marker': marker,
 | |
|             'error': error[0],
 | |
|             'traceback': error[1],
 | |
|             'error_timestamp': error[2]
 | |
|         }
 | |
|         result_queue.put(res)
 | |
|         result_queue.put(None)
 | |
| 
 | |
|     # Download related methods
 | |
|     #
 | |
|     def download(self, container=None, objects=None, options=None):
 | |
|         """
 | |
|         Download operations on an account, optional container and optional list
 | |
|         of objects.
 | |
| 
 | |
|         :param container: The container to download from.
 | |
|         :param objects: A list of object names to download (a list of strings).
 | |
|         :param options: A dictionary containing options to override the global
 | |
|                         options specified during the service object creation::
 | |
| 
 | |
|                             {
 | |
|                                 'yes_all': False,
 | |
|                                 'marker': '',
 | |
|                                 'prefix': None,
 | |
|                                 'no_download': False,
 | |
|                                 'header': [],
 | |
|                                 'skip_identical': False,
 | |
|                                 'out_directory': None,
 | |
|                                 'checksum': True,
 | |
|                                 'out_file': None,
 | |
|                                 'remove_prefix': False,
 | |
|                                 'shuffle' : False
 | |
|                             }
 | |
| 
 | |
|         :returns: A generator for returning the results of the download
 | |
|                   operations. Each result yielded from the generator is a
 | |
|                   'download_object' dictionary containing the results of an
 | |
|                   individual file download.
 | |
| 
 | |
|         :raises ClientException:
 | |
|         :raises SwiftError:
 | |
|         """
 | |
|         if options is not None:
 | |
|             options = dict(self._options, **options)
 | |
|         else:
 | |
|             options = self._options
 | |
| 
 | |
|         if not container:
 | |
|             # Download everything if options['yes_all'] is set
 | |
|             if options['yes_all']:
 | |
|                 try:
 | |
|                     options_copy = deepcopy(options)
 | |
|                     options_copy["long"] = False
 | |
| 
 | |
|                     for part in self.list(options=options_copy):
 | |
|                         if part["success"]:
 | |
|                             containers = [i['name'] for i in part["listing"]]
 | |
| 
 | |
|                             if options['shuffle']:
 | |
|                                 shuffle(containers)
 | |
| 
 | |
|                             for con in containers:
 | |
|                                 for res in self._download_container(
 | |
|                                         con, options_copy):
 | |
|                                     yield res
 | |
|                         else:
 | |
|                             raise part["error"]
 | |
| 
 | |
|                 # If we see a 404 here, the listing of the account failed
 | |
|                 except ClientException as err:
 | |
|                     if err.http_status != 404:
 | |
|                         raise
 | |
|                     raise SwiftError('Account not found', exc=err)
 | |
| 
 | |
|         elif objects is None:
 | |
|             if '/' in container:
 | |
|                 raise SwiftError('\'/\' in container name',
 | |
|                                  container=container)
 | |
|             for res in self._download_container(container, options):
 | |
|                 yield res
 | |
| 
 | |
|         else:
 | |
|             if '/' in container:
 | |
|                 raise SwiftError('\'/\' in container name',
 | |
|                                  container=container)
 | |
|             if options['out_file'] and len(objects) > 1:
 | |
|                 options['out_file'] = None
 | |
| 
 | |
|             o_downs = [
 | |
|                 self.thread_manager.object_dd_pool.submit(
 | |
|                     self._download_object_job, container, obj, options
 | |
|                 ) for obj in objects
 | |
|             ]
 | |
| 
 | |
|             for o_down in interruptable_as_completed(o_downs):
 | |
|                 yield o_down.result()
 | |
| 
 | |
|     def _download_object_job(self, conn, container, obj, options):
 | |
|         out_file = options['out_file']
 | |
|         results_dict = {}
 | |
| 
 | |
|         req_headers = split_headers(options['header'], '')
 | |
| 
 | |
|         pseudodir = False
 | |
|         path = join(container, obj) if options['yes_all'] else obj
 | |
|         path = path.lstrip(os_path_sep)
 | |
|         options['skip_identical'] = (options['skip_identical'] and
 | |
|                                      out_file != '-')
 | |
| 
 | |
|         if options['prefix'] and options['remove_prefix']:
 | |
|             path = path[len(options['prefix']):].lstrip('/')
 | |
| 
 | |
|         if options['out_directory']:
 | |
|             path = os.path.join(options['out_directory'], path)
 | |
| 
 | |
|         if options['skip_identical']:
 | |
|             filename = out_file if out_file else path
 | |
|             try:
 | |
|                 fp = open(filename, 'rb', DISK_BUFFER)
 | |
|             except IOError:
 | |
|                 pass
 | |
|             else:
 | |
|                 with fp:
 | |
|                     md5sum = md5()
 | |
|                     while True:
 | |
|                         data = fp.read(DISK_BUFFER)
 | |
|                         if not data:
 | |
|                             break
 | |
|                         md5sum.update(data)
 | |
|                     req_headers['If-None-Match'] = md5sum.hexdigest()
 | |
| 
 | |
|         try:
 | |
|             start_time = time()
 | |
|             get_args = {'resp_chunk_size': DISK_BUFFER,
 | |
|                         'headers': req_headers,
 | |
|                         'response_dict': results_dict}
 | |
|             if options['skip_identical']:
 | |
|                 # Assume the file is a large object; if we're wrong, the query
 | |
|                 # string is ignored and the If-None-Match header will trigger
 | |
|                 # the behavior we want
 | |
|                 get_args['query_string'] = 'multipart-manifest=get'
 | |
| 
 | |
|             try:
 | |
|                 headers, body = conn.get_object(container, obj, **get_args)
 | |
|             except ClientException as e:
 | |
|                 if not options['skip_identical']:
 | |
|                     raise
 | |
|                 if e.http_status != 304:  # Only handling Not Modified
 | |
|                     raise
 | |
| 
 | |
|                 headers = results_dict['headers']
 | |
|                 if 'x-object-manifest' in headers:
 | |
|                     # DLO: most likely it has more than one page worth of
 | |
|                     #      segments and we have an empty file locally
 | |
|                     body = []
 | |
|                 elif config_true_value(headers.get('x-static-large-object')):
 | |
|                     # SLO: apparently we have a copy of the manifest locally?
 | |
|                     #      provide no chunking data to force a fresh download
 | |
|                     body = [b'[]']
 | |
|                 else:
 | |
|                     # Normal object: let it bubble up
 | |
|                     raise
 | |
| 
 | |
|             if options['skip_identical']:
 | |
|                 if config_true_value(headers.get('x-static-large-object')) or \
 | |
|                         'x-object-manifest' in headers:
 | |
|                     # The request was chunked, so stitch it back together
 | |
|                     chunk_data = self._get_chunk_data(conn, container, obj,
 | |
|                                                       headers, b''.join(body))
 | |
|                 else:
 | |
|                     chunk_data = None
 | |
| 
 | |
|                 if chunk_data is not None:
 | |
|                     if self._is_identical(chunk_data, filename):
 | |
|                         raise ClientException('Large object is identical',
 | |
|                                               http_status=304)
 | |
| 
 | |
|                     # Large objects are different; start the real download
 | |
|                     del get_args['query_string']
 | |
|                     get_args['response_dict'].clear()
 | |
|                     headers, body = conn.get_object(container, obj, **get_args)
 | |
| 
 | |
|             headers_receipt = time()
 | |
| 
 | |
|             obj_body = _SwiftReader(path, body, headers,
 | |
|                                     options.get('checksum', True))
 | |
| 
 | |
|             no_file = options['no_download']
 | |
|             if out_file == "-" and not no_file:
 | |
|                 res = {
 | |
|                     'action': 'download_object',
 | |
|                     'container': container,
 | |
|                     'object': obj,
 | |
|                     'path': path,
 | |
|                     'pseudodir': pseudodir,
 | |
|                     'contents': obj_body
 | |
|                 }
 | |
|                 return res
 | |
| 
 | |
|             fp = None
 | |
|             try:
 | |
|                 content_type = headers.get('content-type', '').split(';', 1)[0]
 | |
|                 if content_type in KNOWN_DIR_MARKERS:
 | |
|                     make_dir = not no_file and out_file != "-"
 | |
|                     if make_dir and not isdir(path):
 | |
|                         mkdirs(path)
 | |
| 
 | |
|                 else:
 | |
|                     make_dir = not (no_file or out_file)
 | |
|                     if make_dir:
 | |
|                         dirpath = dirname(path)
 | |
|                         if dirpath and not isdir(dirpath):
 | |
|                             mkdirs(dirpath)
 | |
| 
 | |
|                     if not no_file:
 | |
|                         if out_file:
 | |
|                             fp = open(out_file, 'wb', DISK_BUFFER)
 | |
|                         else:
 | |
|                             if basename(path):
 | |
|                                 fp = open(path, 'wb', DISK_BUFFER)
 | |
|                             else:
 | |
|                                 pseudodir = True
 | |
| 
 | |
|                 for chunk in obj_body:
 | |
|                     if fp is not None:
 | |
|                         fp.write(chunk)
 | |
| 
 | |
|                 finish_time = time()
 | |
| 
 | |
|             finally:
 | |
|                 bytes_read = obj_body.bytes_read()
 | |
|                 if fp is not None:
 | |
|                     fp.close()
 | |
|                     if ('x-object-meta-mtime' in headers and not no_file and
 | |
|                             not options['ignore_mtime']):
 | |
|                         try:
 | |
|                             mtime = float(headers['x-object-meta-mtime'])
 | |
|                         except ValueError:
 | |
|                             pass  # no real harm; couldn't trust it anyway
 | |
|                         else:
 | |
|                             if options['out_file']:
 | |
|                                 utime(options['out_file'], (mtime, mtime))
 | |
|                             else:
 | |
|                                 utime(path, (mtime, mtime))
 | |
| 
 | |
|             res = {
 | |
|                 'action': 'download_object',
 | |
|                 'success': True,
 | |
|                 'container': container,
 | |
|                 'object': obj,
 | |
|                 'path': path,
 | |
|                 'pseudodir': pseudodir,
 | |
|                 'start_time': start_time,
 | |
|                 'finish_time': finish_time,
 | |
|                 'headers_receipt': headers_receipt,
 | |
|                 'auth_end_time': conn.auth_end_time,
 | |
|                 'read_length': bytes_read,
 | |
|                 'attempts': conn.attempts,
 | |
|                 'response_dict': results_dict
 | |
|             }
 | |
|             return res
 | |
| 
 | |
|         except Exception as err:
 | |
|             traceback, err_time = report_traceback()
 | |
|             logger.exception(err)
 | |
|             res = {
 | |
|                 'action': 'download_object',
 | |
|                 'container': container,
 | |
|                 'object': obj,
 | |
|                 'success': False,
 | |
|                 'error': err,
 | |
|                 'traceback': traceback,
 | |
|                 'error_timestamp': err_time,
 | |
|                 'response_dict': results_dict,
 | |
|                 'path': path,
 | |
|                 'pseudodir': pseudodir,
 | |
|                 'attempts': conn.attempts
 | |
|             }
 | |
|             return res
 | |
| 
 | |
|     def _submit_page_downloads(self, container, page_generator, options):
 | |
|         try:
 | |
|             list_page = next(page_generator)
 | |
|         except StopIteration:
 | |
|             return None
 | |
| 
 | |
|         if list_page["success"]:
 | |
|             objects = [o["name"] for o in list_page["listing"]]
 | |
| 
 | |
|             if options["shuffle"]:
 | |
|                 shuffle(objects)
 | |
| 
 | |
|             o_downs = [
 | |
|                 self.thread_manager.object_dd_pool.submit(
 | |
|                     self._download_object_job, container, obj, options
 | |
|                 ) for obj in objects
 | |
|             ]
 | |
| 
 | |
|             return o_downs
 | |
|         else:
 | |
|             raise list_page["error"]
 | |
| 
 | |
|     def _download_container(self, container, options):
 | |
|         _page_generator = self.list(container=container, options=options)
 | |
|         try:
 | |
|             next_page_downs = self._submit_page_downloads(
 | |
|                 container, _page_generator, options
 | |
|             )
 | |
|         except ClientException as err:
 | |
|             if err.http_status != 404:
 | |
|                 raise
 | |
|             raise SwiftError(
 | |
|                 'Container %r not found' % container,
 | |
|                 container=container, exc=err
 | |
|             )
 | |
| 
 | |
|         error = None
 | |
|         while next_page_downs:
 | |
|             page_downs = next_page_downs
 | |
|             next_page_downs = None
 | |
| 
 | |
|             # Start downloading the next page of list results when
 | |
|             # we have completed 80% of the previous page
 | |
|             next_page_triggered = False
 | |
|             next_page_trigger_point = 0.8 * len(page_downs)
 | |
| 
 | |
|             page_results_yielded = 0
 | |
|             for o_down in interruptable_as_completed(page_downs):
 | |
|                 yield o_down.result()
 | |
| 
 | |
|                 # Do we need to start the next set of downloads yet?
 | |
|                 if not next_page_triggered:
 | |
|                     page_results_yielded += 1
 | |
|                     if page_results_yielded >= next_page_trigger_point:
 | |
|                         try:
 | |
|                             next_page_downs = self._submit_page_downloads(
 | |
|                                 container, _page_generator, options
 | |
|                             )
 | |
|                         except ClientException as err:
 | |
|                             # Allow the current page to finish downloading
 | |
|                             logger.exception(err)
 | |
|                             error = err
 | |
|                         except Exception:
 | |
|                             # Something unexpected went wrong - cancel
 | |
|                             # remaining downloads
 | |
|                             for _d in page_downs:
 | |
|                                 _d.cancel()
 | |
|                             raise
 | |
|                         finally:
 | |
|                             # Stop counting and testing
 | |
|                             next_page_triggered = True
 | |
| 
 | |
|         if error:
 | |
|             raise error
 | |
| 
 | |
|     # Upload related methods
 | |
|     #
 | |
|     def upload(self, container, objects, options=None):
 | |
|         """
 | |
|         Upload a list of objects to a given container.
 | |
| 
 | |
|         :param container: The container (or pseudo-folder path) to put the
 | |
|                           uploads into.
 | |
|         :param objects: A list of file/directory names (strings) or
 | |
|                         SwiftUploadObject instances containing a source for the
 | |
|                         created object, an object name, and an options dict
 | |
|                         (can be None) to override the options for that
 | |
|                         individual upload operation::
 | |
| 
 | |
|                             [
 | |
|                                 '/path/to/file',
 | |
|                                 SwiftUploadObject('/path', object_name='obj1'),
 | |
|                                 ...
 | |
|                             ]
 | |
| 
 | |
|                         The options dict is as described below.
 | |
| 
 | |
|                         The SwiftUploadObject source may be one of:
 | |
| 
 | |
|                             * A file-like object (with a read method)
 | |
|                             * A string containing the path to a local
 | |
|                               file or directory
 | |
|                             * None, to indicate that we want an empty object
 | |
| 
 | |
|         :param options: A dictionary containing options to override the global
 | |
|                         options specified during the service object creation.
 | |
|                         These options are applied to all upload operations
 | |
|                         performed by this call, unless overridden on a per
 | |
|                         object basis. Possible options are given below::
 | |
| 
 | |
|                             {
 | |
|                                 'meta': [],
 | |
|                                 'header': [],
 | |
|                                 'segment_size': None,
 | |
|                                 'use_slo': False,
 | |
|                                 'segment_container': None,
 | |
|                                 'leave_segments': False,
 | |
|                                 'changed': None,
 | |
|                                 'skip_identical': False,
 | |
|                                 'fail_fast': False,
 | |
|                                 'dir_marker': False  # Only for None sources
 | |
|                             }
 | |
| 
 | |
|         :returns: A generator for returning the results of the uploads.
 | |
| 
 | |
|         :raises SwiftError:
 | |
|         :raises ClientException:
 | |
|         """
 | |
|         if options is not None:
 | |
|             options = dict(self._options, **options)
 | |
|         else:
 | |
|             options = self._options
 | |
| 
 | |
|         try:
 | |
|             segment_size = int(0 if options['segment_size'] is None else
 | |
|                                options['segment_size'])
 | |
|         except ValueError:
 | |
|             raise SwiftError('Segment size should be an integer value')
 | |
| 
 | |
|         # Incase we have a psudeo-folder path for <container> arg, derive
 | |
|         # the container name from the top path and prepend the rest to
 | |
|         # the object name. (same as passing --object-name).
 | |
|         container, _sep, pseudo_folder = container.partition('/')
 | |
| 
 | |
|         # Try to create the container, just in case it doesn't exist. If this
 | |
|         # fails, it might just be because the user doesn't have container PUT
 | |
|         # permissions, so we'll ignore any error. If there's really a problem,
 | |
|         # it'll surface on the first object PUT.
 | |
|         policy_header = {}
 | |
|         _header = split_headers(options["header"])
 | |
|         if POLICY in _header:
 | |
|             policy_header[POLICY] = \
 | |
|                 _header[POLICY]
 | |
|         create_containers = [
 | |
|             self.thread_manager.container_pool.submit(
 | |
|                 self._create_container_job, container, headers=policy_header)
 | |
|         ]
 | |
| 
 | |
|         # wait for first container job to complete before possibly attempting
 | |
|         # segment container job because segment container job may attempt
 | |
|         # to HEAD the first container
 | |
|         for r in interruptable_as_completed(create_containers):
 | |
|             res = r.result()
 | |
|             yield res
 | |
| 
 | |
|         if segment_size:
 | |
|             seg_container = container + '_segments'
 | |
|             if options['segment_container']:
 | |
|                 seg_container = options['segment_container']
 | |
|             if seg_container != container:
 | |
|                 if not policy_header:
 | |
|                     # Since no storage policy was specified on the command
 | |
|                     # line, rather than just letting swift pick the default
 | |
|                     # storage policy, we'll try to create the segments
 | |
|                     # container with the same policy as the upload container
 | |
|                     create_containers = [
 | |
|                         self.thread_manager.container_pool.submit(
 | |
|                             self._create_container_job, seg_container,
 | |
|                             policy_source=container
 | |
|                         )
 | |
|                     ]
 | |
|                 else:
 | |
|                     create_containers = [
 | |
|                         self.thread_manager.container_pool.submit(
 | |
|                             self._create_container_job, seg_container,
 | |
|                             headers=policy_header
 | |
|                         )
 | |
|                     ]
 | |
| 
 | |
|                 for r in interruptable_as_completed(create_containers):
 | |
|                     res = r.result()
 | |
|                     yield res
 | |
| 
 | |
|         # We maintain a results queue here and a separate thread to monitor
 | |
|         # the futures because we want to get results back from potential
 | |
|         # segment uploads too
 | |
|         rq = Queue()
 | |
|         file_jobs = {}
 | |
| 
 | |
|         upload_objects = self._make_upload_objects(objects, pseudo_folder)
 | |
|         for upload_object in upload_objects:
 | |
|             s = upload_object.source
 | |
|             o = upload_object.object_name
 | |
|             o_opts = upload_object.options
 | |
|             details = {'action': 'upload', 'container': container}
 | |
|             if o_opts is not None:
 | |
|                 object_options = deepcopy(options)
 | |
|                 object_options.update(o_opts)
 | |
|             else:
 | |
|                 object_options = options
 | |
|             if hasattr(s, 'read'):
 | |
|                 # We've got a file like object to upload to o
 | |
|                 file_future = self.thread_manager.object_uu_pool.submit(
 | |
|                     self._upload_object_job, container, s, o, object_options,
 | |
|                     results_queue=rq
 | |
|                 )
 | |
|                 details['file'] = s
 | |
|                 details['object'] = o
 | |
|                 file_jobs[file_future] = details
 | |
|             elif s is not None:
 | |
|                 # We've got a path to upload to o
 | |
|                 details['path'] = s
 | |
|                 details['object'] = o
 | |
|                 if isdir(s):
 | |
|                     dir_future = self.thread_manager.object_uu_pool.submit(
 | |
|                         self._create_dir_marker_job, container, o,
 | |
|                         object_options, path=s
 | |
|                     )
 | |
|                     file_jobs[dir_future] = details
 | |
|                 else:
 | |
|                     try:
 | |
|                         stat(s)
 | |
|                         file_future = \
 | |
|                             self.thread_manager.object_uu_pool.submit(
 | |
|                                 self._upload_object_job, container, s, o,
 | |
|                                 object_options, results_queue=rq
 | |
|                             )
 | |
|                         file_jobs[file_future] = details
 | |
|                     except OSError as err:
 | |
|                         # Avoid tying up threads with jobs that will fail
 | |
|                         traceback, err_time = report_traceback()
 | |
|                         logger.exception(err)
 | |
|                         res = {
 | |
|                             'action': 'upload_object',
 | |
|                             'container': container,
 | |
|                             'object': o,
 | |
|                             'success': False,
 | |
|                             'error': err,
 | |
|                             'traceback': traceback,
 | |
|                             'error_timestamp': err_time,
 | |
|                             'path': s
 | |
|                         }
 | |
|                         rq.put(res)
 | |
|             else:
 | |
|                 # Create an empty object (as a dir marker if is_dir)
 | |
|                 details['file'] = None
 | |
|                 details['object'] = o
 | |
|                 if object_options['dir_marker']:
 | |
|                     dir_future = self.thread_manager.object_uu_pool.submit(
 | |
|                         self._create_dir_marker_job, container, o,
 | |
|                         object_options
 | |
|                     )
 | |
|                     file_jobs[dir_future] = details
 | |
|                 else:
 | |
|                     file_future = self.thread_manager.object_uu_pool.submit(
 | |
|                         self._upload_object_job, container, StringIO(),
 | |
|                         o, object_options
 | |
|                     )
 | |
|                     file_jobs[file_future] = details
 | |
| 
 | |
|         # Start a thread to watch for upload results
 | |
|         Thread(
 | |
|             target=self._watch_futures, args=(file_jobs, rq)
 | |
|         ).start()
 | |
| 
 | |
|         # yield results as they become available, including those from
 | |
|         # segment uploads.
 | |
|         res = get_from_queue(rq)
 | |
|         cancelled = False
 | |
|         while res is not None:
 | |
|             yield res
 | |
| 
 | |
|             if not res['success']:
 | |
|                 if not cancelled and options['fail_fast']:
 | |
|                     cancelled = True
 | |
|                     for f in file_jobs:
 | |
|                         f.cancel()
 | |
| 
 | |
|             res = get_from_queue(rq)
 | |
| 
 | |
|     @staticmethod
 | |
|     def _make_upload_objects(objects, pseudo_folder=''):
 | |
|         upload_objects = []
 | |
| 
 | |
|         for o in objects:
 | |
|             if isinstance(o, string_types):
 | |
|                 obj = SwiftUploadObject(o, urljoin(pseudo_folder,
 | |
|                                                    o.lstrip('/')))
 | |
|                 upload_objects.append(obj)
 | |
|             elif isinstance(o, SwiftUploadObject):
 | |
|                 o.object_name = urljoin(pseudo_folder, o.object_name)
 | |
|                 upload_objects.append(o)
 | |
|             else:
 | |
|                 raise SwiftError(
 | |
|                     "The upload operation takes only strings or "
 | |
|                     "SwiftUploadObjects as input",
 | |
|                     obj=o)
 | |
| 
 | |
|         return upload_objects
 | |
| 
 | |
|     @staticmethod
 | |
|     def _create_container_job(
 | |
|             conn, container, headers=None, policy_source=None):
 | |
|         """
 | |
|         Create a container using the given connection
 | |
| 
 | |
|         :param conn: The swift connection used for requests.
 | |
|         :param container: The container name to create.
 | |
|         :param headers: An optional dict of headers for the
 | |
|                         put_container request.
 | |
|         :param policy_source: An optional name of a container whose policy we
 | |
|                               should duplicate.
 | |
|         :return: A dict containing the results of the operation.
 | |
|         """
 | |
|         res = {
 | |
|             'action': 'create_container',
 | |
|             'container': container,
 | |
|             'headers': headers
 | |
|         }
 | |
|         create_response = {}
 | |
|         try:
 | |
|             if policy_source is not None:
 | |
|                 _meta = conn.head_container(policy_source)
 | |
|                 if 'x-storage-policy' in _meta:
 | |
|                     policy_header = {
 | |
|                         POLICY: _meta.get('x-storage-policy')
 | |
|                     }
 | |
|                     if headers is None:
 | |
|                         headers = policy_header
 | |
|                     else:
 | |
|                         headers.update(policy_header)
 | |
| 
 | |
|             conn.put_container(
 | |
|                 container, headers, response_dict=create_response
 | |
|             )
 | |
|             res.update({
 | |
|                 'success': True,
 | |
|                 'response_dict': create_response
 | |
|             })
 | |
|         except Exception as err:
 | |
|             traceback, err_time = report_traceback()
 | |
|             logger.exception(err)
 | |
|             res.update({
 | |
|                 'success': False,
 | |
|                 'error': err,
 | |
|                 'traceback': traceback,
 | |
|                 'error_timestamp': err_time,
 | |
|                 'response_dict': create_response
 | |
|             })
 | |
|         return res
 | |
| 
 | |
|     @staticmethod
 | |
|     def _create_dir_marker_job(conn, container, obj, options, path=None):
 | |
|         res = {
 | |
|             'action': 'create_dir_marker',
 | |
|             'container': container,
 | |
|             'object': obj,
 | |
|             'path': path
 | |
|         }
 | |
|         results_dict = {}
 | |
|         if obj.startswith('./') or obj.startswith('.\\'):
 | |
|             obj = obj[2:]
 | |
|         if obj.startswith('/'):
 | |
|             obj = obj[1:]
 | |
|         if path is not None:
 | |
|             put_headers = {'x-object-meta-mtime': "%f" % getmtime(path)}
 | |
|         else:
 | |
|             put_headers = {'x-object-meta-mtime': "%f" % round(time())}
 | |
|         res['headers'] = put_headers
 | |
|         if options['changed']:
 | |
|             try:
 | |
|                 headers = conn.head_object(container, obj)
 | |
|                 ct = headers.get('content-type', '').split(';', 1)[0]
 | |
|                 cl = int(headers.get('content-length'))
 | |
|                 et = headers.get('etag')
 | |
|                 mt = headers.get('x-object-meta-mtime')
 | |
| 
 | |
|                 if (ct in KNOWN_DIR_MARKERS and
 | |
|                         cl == 0 and
 | |
|                         et == EMPTY_ETAG and
 | |
|                         mt == put_headers['x-object-meta-mtime']):
 | |
|                     res['success'] = True
 | |
|                     return res
 | |
|             except ClientException as err:
 | |
|                 if err.http_status != 404:
 | |
|                     traceback, err_time = report_traceback()
 | |
|                     logger.exception(err)
 | |
|                     res.update({
 | |
|                         'success': False,
 | |
|                         'error': err,
 | |
|                         'traceback': traceback,
 | |
|                         'error_timestamp': err_time
 | |
|                     })
 | |
|                     return res
 | |
|         try:
 | |
|             conn.put_object(container, obj, '', content_length=0,
 | |
|                             content_type=KNOWN_DIR_MARKERS[0],
 | |
|                             headers=put_headers,
 | |
|                             response_dict=results_dict)
 | |
|             res.update({
 | |
|                 'success': True,
 | |
|                 'response_dict': results_dict})
 | |
|             return res
 | |
|         except Exception as err:
 | |
|             traceback, err_time = report_traceback()
 | |
|             logger.exception(err)
 | |
|             res.update({
 | |
|                 'success': False,
 | |
|                 'error': err,
 | |
|                 'traceback': traceback,
 | |
|                 'error_timestamp': err_time,
 | |
|                 'response_dict': results_dict})
 | |
|             return res
 | |
| 
 | |
|     @staticmethod
 | |
|     def _upload_segment_job(conn, path, container, segment_name, segment_start,
 | |
|                             segment_size, segment_index, obj_name, options,
 | |
|                             results_queue=None):
 | |
|         results_dict = {}
 | |
|         if options['segment_container']:
 | |
|             segment_container = options['segment_container']
 | |
|         else:
 | |
|             segment_container = container + '_segments'
 | |
| 
 | |
|         res = {
 | |
|             'action': 'upload_segment',
 | |
|             'for_container': container,
 | |
|             'for_object': obj_name,
 | |
|             'segment_index': segment_index,
 | |
|             'segment_size': segment_size,
 | |
|             'segment_location': '/%s/%s' % (segment_container,
 | |
|                                             segment_name),
 | |
|             'log_line': '%s segment %s' % (obj_name, segment_index),
 | |
|         }
 | |
|         fp = None
 | |
|         try:
 | |
|             fp = open(path, 'rb', DISK_BUFFER)
 | |
|             fp.seek(segment_start)
 | |
| 
 | |
|             contents = LengthWrapper(fp, segment_size, md5=options['checksum'])
 | |
|             etag = conn.put_object(
 | |
|                 segment_container,
 | |
|                 segment_name,
 | |
|                 contents,
 | |
|                 content_length=segment_size,
 | |
|                 content_type='application/swiftclient-segment',
 | |
|                 response_dict=results_dict)
 | |
| 
 | |
|             if options['checksum'] and etag and etag != contents.get_md5sum():
 | |
|                 raise SwiftError('Segment {0}: upload verification failed: '
 | |
|                                  'md5 mismatch, local {1} != remote {2} '
 | |
|                                  '(remote segment has not been removed)'
 | |
|                                  .format(segment_index,
 | |
|                                          contents.get_md5sum(),
 | |
|                                          etag))
 | |
| 
 | |
|             res.update({
 | |
|                 'success': True,
 | |
|                 'response_dict': results_dict,
 | |
|                 'segment_etag': etag,
 | |
|                 'attempts': conn.attempts
 | |
|             })
 | |
| 
 | |
|             if results_queue is not None:
 | |
|                 results_queue.put(res)
 | |
|             return res
 | |
| 
 | |
|         except Exception as err:
 | |
|             traceback, err_time = report_traceback()
 | |
|             logger.exception(err)
 | |
|             res.update({
 | |
|                 'success': False,
 | |
|                 'error': err,
 | |
|                 'traceback': traceback,
 | |
|                 'error_timestamp': err_time,
 | |
|                 'response_dict': results_dict,
 | |
|                 'attempts': conn.attempts
 | |
|             })
 | |
| 
 | |
|             if results_queue is not None:
 | |
|                 results_queue.put(res)
 | |
|             return res
 | |
|         finally:
 | |
|             if fp is not None:
 | |
|                 fp.close()
 | |
| 
 | |
|     @staticmethod
 | |
|     def _put_object(conn, container, name, content, headers=None, md5=None):
 | |
|         """
 | |
|         Upload object into a given container and verify the resulting ETag, if
 | |
|         the md5 optional parameter is passed.
 | |
| 
 | |
|         :param conn: The Swift connection to use for uploads.
 | |
|         :param container: The container to put the object into.
 | |
|         :param name: The name of the object.
 | |
|         :param content: Object content.
 | |
|         :param headers: Headers (optional) to associate with the object.
 | |
|         :param md5: MD5 sum of the content. If passed in, will be used to
 | |
|                     verify the returned ETag.
 | |
| 
 | |
|         :returns: A dictionary as the response from calling put_object.
 | |
|                   The keys are:
 | |
|                     - status
 | |
|                     - reason
 | |
|                     - headers
 | |
|                   On error, the dictionary contains the following keys:
 | |
|                     - success (with value False)
 | |
|                     - error - the encountered exception (object)
 | |
|                     - error_timestamp
 | |
|                     - response_dict - results from the put_object call, as
 | |
|                       documented above
 | |
|                     - attempts - number of attempts made
 | |
|         """
 | |
|         if headers is None:
 | |
|             headers = {}
 | |
|         else:
 | |
|             headers = dict(headers)
 | |
|         if md5 is not None:
 | |
|             headers['etag'] = md5
 | |
|         results = {}
 | |
|         try:
 | |
|             etag = conn.put_object(
 | |
|                 container, name, content, content_length=len(content),
 | |
|                 headers=headers, response_dict=results)
 | |
|             if md5 is not None and etag != md5:
 | |
|                 raise SwiftError('Upload verification failed for {0}: md5 '
 | |
|                                  'mismatch {1} != {2}'.format(name, md5, etag))
 | |
|             results['success'] = True
 | |
|         except Exception as err:
 | |
|             traceback, err_time = report_traceback()
 | |
|             logger.exception(err)
 | |
|             return {
 | |
|                 'success': False,
 | |
|                 'error': err,
 | |
|                 'error_timestamp': err_time,
 | |
|                 'response_dict': results,
 | |
|                 'attempts': conn.attempts,
 | |
|                 'traceback': traceback
 | |
|             }
 | |
|         return results
 | |
| 
 | |
|     @staticmethod
 | |
|     def _upload_stream_segment(conn, container, object_name,
 | |
|                                segment_container, segment_name,
 | |
|                                segment_size, segment_index,
 | |
|                                headers, fd):
 | |
|         """
 | |
|         Upload a segment from a stream, buffering it in memory first. The
 | |
|         resulting object is placed either as a segment in the segment
 | |
|         container, or if it is smaller than a single segment, as the given
 | |
|         object name.
 | |
| 
 | |
|         :param conn: Swift Connection to use.
 | |
|         :param container: Container in which the object would be placed.
 | |
|         :param object_name: Name of the final object (used in case the stream
 | |
|                             is smaller than the segment_size)
 | |
|         :param segment_container: Container to hold the object segments.
 | |
|         :param segment_name: The name of the segment.
 | |
|         :param segment_size: Minimum segment size.
 | |
|         :param segment_index: The segment index.
 | |
|         :param headers: Headers to attach to the segment/object.
 | |
|         :param fd: File-like handle for the content. Must implement read().
 | |
| 
 | |
|         :returns: Dictionary, containing the following keys:
 | |
|                     - complete -- whether the stream is exhausted
 | |
|                     - segment_size - the actual size of the segment (may be
 | |
|                                      smaller than the passed in segment_size)
 | |
|                     - segment_location - path to the segment
 | |
|                     - segment_index - index of the segment
 | |
|                     - segment_etag - the ETag for the segment
 | |
|         """
 | |
|         buf = []
 | |
|         dgst = md5()
 | |
|         bytes_read = 0
 | |
|         while bytes_read < segment_size:
 | |
|             data = fd.read(segment_size - bytes_read)
 | |
|             if not data:
 | |
|                 break
 | |
|             bytes_read += len(data)
 | |
|             dgst.update(data)
 | |
|             buf.append(data)
 | |
|         buf = b''.join(buf)
 | |
|         segment_hash = dgst.hexdigest()
 | |
| 
 | |
|         if not buf and segment_index > 0:
 | |
|             # Happens if the segment size aligns with the object size
 | |
|             return {'complete': True,
 | |
|                     'segment_size': 0,
 | |
|                     'segment_index': None,
 | |
|                     'segment_etag': None,
 | |
|                     'segment_location': None,
 | |
|                     'success': True}
 | |
| 
 | |
|         if segment_index == 0 and len(buf) < segment_size:
 | |
|             ret = SwiftService._put_object(
 | |
|                 conn, container, object_name, buf, headers, segment_hash)
 | |
|             ret['segment_location'] = '/%s/%s' % (container, object_name)
 | |
|         else:
 | |
|             ret = SwiftService._put_object(
 | |
|                 conn, segment_container, segment_name, buf, headers,
 | |
|                 segment_hash)
 | |
|             ret['segment_location'] = '/%s/%s' % (
 | |
|                 segment_container, segment_name)
 | |
| 
 | |
|         ret.update(
 | |
|             dict(complete=len(buf) < segment_size,
 | |
|                  segment_size=len(buf),
 | |
|                  segment_index=segment_index,
 | |
|                  segment_etag=segment_hash,
 | |
|                  for_object=object_name))
 | |
|         return ret
 | |
| 
 | |
|     def _get_chunk_data(self, conn, container, obj, headers, manifest=None):
 | |
|         chunks = []
 | |
|         if 'x-object-manifest' in headers:
 | |
|             scontainer, sprefix = headers['x-object-manifest'].split('/', 1)
 | |
|             for part in self.list(scontainer, {'prefix': sprefix}):
 | |
|                 if part["success"]:
 | |
|                     chunks.extend(part["listing"])
 | |
|                 else:
 | |
|                     raise part["error"]
 | |
|         elif config_true_value(headers.get('x-static-large-object')):
 | |
|             if manifest is None:
 | |
|                 headers, manifest = conn.get_object(
 | |
|                     container, obj, query_string='multipart-manifest=get')
 | |
|             manifest = parse_api_response(headers, manifest)
 | |
|             for chunk in manifest:
 | |
|                 if chunk.get('sub_slo'):
 | |
|                     scont, sobj = chunk['name'].lstrip('/').split('/', 1)
 | |
|                     chunks.extend(self._get_chunk_data(
 | |
|                         conn, scont, sobj, {'x-static-large-object': True}))
 | |
|                 else:
 | |
|                     chunks.append(chunk)
 | |
|         else:
 | |
|             chunks.append({'hash': headers.get('etag').strip('"'),
 | |
|                            'bytes': int(headers.get('content-length'))})
 | |
|         return chunks
 | |
| 
 | |
|     def _is_identical(self, chunk_data, path):
 | |
|         if path is None:
 | |
|             return False
 | |
|         try:
 | |
|             fp = open(path, 'rb', DISK_BUFFER)
 | |
|         except IOError:
 | |
|             return False
 | |
| 
 | |
|         with fp:
 | |
|             for chunk in chunk_data:
 | |
|                 to_read = chunk['bytes']
 | |
|                 md5sum = md5()
 | |
|                 while to_read:
 | |
|                     data = fp.read(min(DISK_BUFFER, to_read))
 | |
|                     if not data:
 | |
|                         return False
 | |
|                     md5sum.update(data)
 | |
|                     to_read -= len(data)
 | |
|                 if md5sum.hexdigest() != chunk['hash']:
 | |
|                     return False
 | |
|             # Each chunk is verified; check that we're at the end of the file
 | |
|             return not fp.read(1)
 | |
| 
 | |
|     @staticmethod
 | |
|     def _upload_slo_manifest(conn, segment_results, container, obj, headers):
 | |
|         """
 | |
|         Upload an SLO manifest, given the results of uploading each segment, to
 | |
|         the specified container.
 | |
| 
 | |
|         :param segment_results: List of response_dict structures, as populated
 | |
|                                 by _upload_segment_job. Specifically, each
 | |
|                                 entry must container the following keys:
 | |
|                                 - segment_location
 | |
|                                 - segment_etag
 | |
|                                 - segment_size
 | |
|                                 - segment_index
 | |
|         :param container: The container to put the manifest into.
 | |
|         :param obj: The name of the manifest object to use.
 | |
|         :param headers: Optional set of headers to attach to the manifest.
 | |
|         """
 | |
|         if headers is None:
 | |
|             headers = {}
 | |
|         segment_results.sort(key=lambda di: di['segment_index'])
 | |
|         for seg in segment_results:
 | |
|             seg_loc = seg['segment_location'].lstrip('/')
 | |
|             if isinstance(seg_loc, text_type):
 | |
|                 seg_loc = seg_loc.encode('utf-8')
 | |
| 
 | |
|         manifest_data = json.dumps([
 | |
|             {
 | |
|                 'path': d['segment_location'],
 | |
|                 'etag': d['segment_etag'],
 | |
|                 'size_bytes': d['segment_size']
 | |
|             } for d in segment_results
 | |
|         ])
 | |
| 
 | |
|         response = {}
 | |
|         conn.put_object(
 | |
|             container, obj, manifest_data,
 | |
|             headers=headers,
 | |
|             query_string='multipart-manifest=put',
 | |
|             response_dict=response)
 | |
|         return response
 | |
| 
 | |
|     def _upload_object_job(self, conn, container, source, obj, options,
 | |
|                            results_queue=None):
 | |
|         if obj.startswith('./') or obj.startswith('.\\'):
 | |
|             obj = obj[2:]
 | |
|         if obj.startswith('/'):
 | |
|             obj = obj[1:]
 | |
|         res = {
 | |
|             'action': 'upload_object',
 | |
|             'container': container,
 | |
|             'object': obj
 | |
|         }
 | |
|         if hasattr(source, 'read'):
 | |
|             stream = source
 | |
|             path = None
 | |
|         else:
 | |
|             path = source
 | |
|         res['path'] = path
 | |
|         try:
 | |
|             if path is not None:
 | |
|                 put_headers = {'x-object-meta-mtime': "%f" % getmtime(path)}
 | |
|             else:
 | |
|                 put_headers = {'x-object-meta-mtime': "%f" % round(time())}
 | |
| 
 | |
|             res['headers'] = put_headers
 | |
| 
 | |
|             # We need to HEAD all objects now in case we're overwriting a
 | |
|             # manifest object and need to delete the old segments
 | |
|             # ourselves.
 | |
|             old_manifest = None
 | |
|             old_slo_manifest_paths = []
 | |
|             new_slo_manifest_paths = set()
 | |
|             segment_size = int(0 if options['segment_size'] is None
 | |
|                                else options['segment_size'])
 | |
|             if (options['changed'] or options['skip_identical'] or
 | |
|                     not options['leave_segments']):
 | |
|                 try:
 | |
|                     headers = conn.head_object(container, obj)
 | |
|                     is_slo = config_true_value(
 | |
|                         headers.get('x-static-large-object'))
 | |
| 
 | |
|                     if options['skip_identical'] or (
 | |
|                             is_slo and not options['leave_segments']):
 | |
|                         chunk_data = self._get_chunk_data(
 | |
|                             conn, container, obj, headers)
 | |
| 
 | |
|                     if options['skip_identical'] and self._is_identical(
 | |
|                             chunk_data, path):
 | |
|                         res.update({
 | |
|                             'success': True,
 | |
|                             'status': 'skipped-identical'
 | |
|                         })
 | |
|                         return res
 | |
| 
 | |
|                     cl = int(headers.get('content-length'))
 | |
|                     mt = headers.get('x-object-meta-mtime')
 | |
|                     if (path is not None and options['changed'] and
 | |
|                             cl == getsize(path) and
 | |
|                             mt == put_headers['x-object-meta-mtime']):
 | |
|                         res.update({
 | |
|                             'success': True,
 | |
|                             'status': 'skipped-changed'
 | |
|                         })
 | |
|                         return res
 | |
|                     if not options['leave_segments']:
 | |
|                         old_manifest = headers.get('x-object-manifest')
 | |
|                         if is_slo:
 | |
|                             for old_seg in chunk_data:
 | |
|                                 seg_path = old_seg['name'].lstrip('/')
 | |
|                                 if isinstance(seg_path, text_type):
 | |
|                                     seg_path = seg_path.encode('utf-8')
 | |
|                                 old_slo_manifest_paths.append(seg_path)
 | |
|                 except ClientException as err:
 | |
|                     if err.http_status != 404:
 | |
|                         traceback, err_time = report_traceback()
 | |
|                         logger.exception(err)
 | |
|                         res.update({
 | |
|                             'success': False,
 | |
|                             'error': err,
 | |
|                             'traceback': traceback,
 | |
|                             'error_timestamp': err_time
 | |
|                         })
 | |
|                         return res
 | |
| 
 | |
|             # Merge the command line header options to the put_headers
 | |
|             put_headers.update(split_headers(
 | |
|                 options['meta'], 'X-Object-Meta-'))
 | |
|             put_headers.update(split_headers(options['header'], ''))
 | |
| 
 | |
|             # Don't do segment job if object is not big enough, and never do
 | |
|             # a segment job if we're reading from a stream - we may fail if we
 | |
|             # go over the single object limit, but this gives us a nice way
 | |
|             # to create objects from memory
 | |
|             if (path is not None and segment_size and
 | |
|                     (getsize(path) > segment_size)):
 | |
|                 res['large_object'] = True
 | |
|                 seg_container = container + '_segments'
 | |
|                 if options['segment_container']:
 | |
|                     seg_container = options['segment_container']
 | |
|                 full_size = getsize(path)
 | |
| 
 | |
|                 segment_futures = []
 | |
|                 segment_pool = self.thread_manager.segment_pool
 | |
|                 segment = 0
 | |
|                 segment_start = 0
 | |
| 
 | |
|                 while segment_start < full_size:
 | |
|                     if segment_start + segment_size > full_size:
 | |
|                         segment_size = full_size - segment_start
 | |
|                     if options['use_slo']:
 | |
|                         segment_name = '%s/slo/%s/%s/%s/%08d' % (
 | |
|                             obj, put_headers['x-object-meta-mtime'],
 | |
|                             full_size, options['segment_size'], segment
 | |
|                         )
 | |
|                     else:
 | |
|                         segment_name = '%s/%s/%s/%s/%08d' % (
 | |
|                             obj, put_headers['x-object-meta-mtime'],
 | |
|                             full_size, options['segment_size'], segment
 | |
|                         )
 | |
|                     seg = segment_pool.submit(
 | |
|                         self._upload_segment_job, path, container,
 | |
|                         segment_name, segment_start, segment_size, segment,
 | |
|                         obj, options, results_queue=results_queue
 | |
|                     )
 | |
|                     segment_futures.append(seg)
 | |
|                     segment += 1
 | |
|                     segment_start += segment_size
 | |
| 
 | |
|                 segment_results = []
 | |
|                 errors = False
 | |
|                 exceptions = []
 | |
|                 for f in interruptable_as_completed(segment_futures):
 | |
|                     try:
 | |
|                         r = f.result()
 | |
|                         if not r['success']:
 | |
|                             errors = True
 | |
|                         segment_results.append(r)
 | |
|                     except Exception as err:
 | |
|                         traceback, err_time = report_traceback()
 | |
|                         logger.exception(err)
 | |
|                         errors = True
 | |
|                         exceptions.append((err, traceback, err_time))
 | |
|                 if errors:
 | |
|                     err = ClientException(
 | |
|                         'Aborting manifest creation '
 | |
|                         'because not all segments could be uploaded. %s/%s'
 | |
|                         % (container, obj))
 | |
|                     res.update({
 | |
|                         'success': False,
 | |
|                         'error': err,
 | |
|                         'exceptions': exceptions,
 | |
|                         'segment_results': segment_results
 | |
|                     })
 | |
|                     return res
 | |
| 
 | |
|                 res['segment_results'] = segment_results
 | |
| 
 | |
|                 if options['use_slo']:
 | |
|                     response = self._upload_slo_manifest(
 | |
|                         conn, segment_results, container, obj, put_headers)
 | |
|                     res['manifest_response_dict'] = response
 | |
|                     new_slo_manifest_paths = {
 | |
|                         seg['segment_location'] for seg in segment_results}
 | |
|                 else:
 | |
|                     new_object_manifest = '%s/%s/%s/%s/%s/' % (
 | |
|                         quote(seg_container.encode('utf8')),
 | |
|                         quote(obj.encode('utf8')),
 | |
|                         put_headers['x-object-meta-mtime'], full_size,
 | |
|                         options['segment_size'])
 | |
|                     if old_manifest and old_manifest.rstrip('/') == \
 | |
|                             new_object_manifest.rstrip('/'):
 | |
|                         old_manifest = None
 | |
|                     put_headers['x-object-manifest'] = new_object_manifest
 | |
|                     mr = {}
 | |
|                     conn.put_object(
 | |
|                         container, obj, '', content_length=0,
 | |
|                         headers=put_headers,
 | |
|                         response_dict=mr
 | |
|                     )
 | |
|                     res['manifest_response_dict'] = mr
 | |
|             elif options['use_slo'] and segment_size and not path:
 | |
|                 segment = 0
 | |
|                 results = []
 | |
|                 while True:
 | |
|                     segment_name = '%s/slo/%s/%s/%08d' % (
 | |
|                         obj, put_headers['x-object-meta-mtime'],
 | |
|                         segment_size, segment
 | |
|                     )
 | |
|                     seg_container = container + '_segments'
 | |
|                     if options['segment_container']:
 | |
|                         seg_container = options['segment_container']
 | |
|                     ret = self._upload_stream_segment(
 | |
|                         conn, container, obj,
 | |
|                         seg_container,
 | |
|                         segment_name,
 | |
|                         segment_size,
 | |
|                         segment,
 | |
|                         put_headers,
 | |
|                         stream
 | |
|                     )
 | |
|                     if not ret['success']:
 | |
|                         return ret
 | |
|                     if (ret['complete'] and segment == 0) or\
 | |
|                             ret['segment_size'] > 0:
 | |
|                         results.append(ret)
 | |
|                     if results_queue is not None:
 | |
|                         # Don't insert the 0-sized segments or objects
 | |
|                         # themselves
 | |
|                         if ret['segment_location'] != '/%s/%s' % (
 | |
|                                 container, obj) and ret['segment_size'] > 0:
 | |
|                             results_queue.put(ret)
 | |
|                     if ret['complete']:
 | |
|                         break
 | |
|                     segment += 1
 | |
|                 if results[0]['segment_location'] != '/%s/%s' % (
 | |
|                         container, obj):
 | |
|                     response = self._upload_slo_manifest(
 | |
|                         conn, results, container, obj, put_headers)
 | |
|                     res['manifest_response_dict'] = response
 | |
|                     new_slo_manifest_paths = {
 | |
|                         r['segment_location'] for r in results}
 | |
|                     res['large_object'] = True
 | |
|                 else:
 | |
|                     res['response_dict'] = ret
 | |
|                     res['large_object'] = False
 | |
|             else:
 | |
|                 res['large_object'] = False
 | |
|                 obr = {}
 | |
|                 fp = None
 | |
|                 try:
 | |
|                     if path is not None:
 | |
|                         content_length = getsize(path)
 | |
|                         fp = open(path, 'rb', DISK_BUFFER)
 | |
|                         contents = LengthWrapper(fp,
 | |
|                                                  content_length,
 | |
|                                                  md5=options['checksum'])
 | |
|                     else:
 | |
|                         content_length = None
 | |
|                         contents = ReadableToIterable(stream,
 | |
|                                                       md5=options['checksum'])
 | |
| 
 | |
|                     etag = conn.put_object(
 | |
|                         container, obj, contents,
 | |
|                         content_length=content_length, headers=put_headers,
 | |
|                         response_dict=obr
 | |
|                     )
 | |
|                     res['response_dict'] = obr
 | |
| 
 | |
|                     if (options['checksum'] and
 | |
|                             etag and etag != contents.get_md5sum()):
 | |
|                         raise SwiftError(
 | |
|                             'Object upload verification failed: '
 | |
|                             'md5 mismatch, local {0} != remote {1} '
 | |
|                             '(remote object has not been removed)'
 | |
|                             .format(contents.get_md5sum(), etag))
 | |
|                 finally:
 | |
|                     if fp is not None:
 | |
|                         fp.close()
 | |
|             if old_manifest or old_slo_manifest_paths:
 | |
|                 drs = []
 | |
|                 delobjsmap = {}
 | |
|                 if old_manifest:
 | |
|                     scontainer, sprefix = old_manifest.split('/', 1)
 | |
|                     sprefix = sprefix.rstrip('/') + '/'
 | |
|                     delobjsmap[scontainer] = []
 | |
|                     for part in self.list(scontainer, {'prefix': sprefix}):
 | |
|                         if not part["success"]:
 | |
|                             raise part["error"]
 | |
|                         delobjsmap[scontainer].extend(
 | |
|                             seg['name'] for seg in part['listing'])
 | |
| 
 | |
|                 if old_slo_manifest_paths:
 | |
|                     for seg_to_delete in old_slo_manifest_paths:
 | |
|                         if seg_to_delete in new_slo_manifest_paths:
 | |
|                             continue
 | |
|                         scont, sobj = \
 | |
|                             seg_to_delete.split(b'/', 1)
 | |
|                         delobjs_cont = delobjsmap.get(scont, [])
 | |
|                         delobjs_cont.append(sobj)
 | |
|                         delobjsmap[scont] = delobjs_cont
 | |
| 
 | |
|                 del_segs = []
 | |
|                 for dscont, dsobjs in delobjsmap.items():
 | |
|                     for dsobj in dsobjs:
 | |
|                         del_seg = self.thread_manager.segment_pool.submit(
 | |
|                             self._delete_segment, dscont, dsobj,
 | |
|                             results_queue=results_queue
 | |
|                         )
 | |
|                         del_segs.append(del_seg)
 | |
| 
 | |
|                 for del_seg in interruptable_as_completed(del_segs):
 | |
|                     drs.append(del_seg.result())
 | |
|                 res['segment_delete_results'] = drs
 | |
| 
 | |
|             # return dict for printing
 | |
|             res.update({
 | |
|                 'success': True,
 | |
|                 'status': 'uploaded',
 | |
|                 'attempts': conn.attempts})
 | |
|             return res
 | |
| 
 | |
|         except OSError as err:
 | |
|             traceback, err_time = report_traceback()
 | |
|             logger.exception(err)
 | |
|             if err.errno == ENOENT:
 | |
|                 error = SwiftError('Local file %r not found' % path, exc=err)
 | |
|             else:
 | |
|                 error = err
 | |
|             res.update({
 | |
|                 'success': False,
 | |
|                 'error': error,
 | |
|                 'traceback': traceback,
 | |
|                 'error_timestamp': err_time
 | |
|             })
 | |
|         except Exception as err:
 | |
|             traceback, err_time = report_traceback()
 | |
|             logger.exception(err)
 | |
|             res.update({
 | |
|                 'success': False,
 | |
|                 'error': err,
 | |
|                 'traceback': traceback,
 | |
|                 'error_timestamp': err_time
 | |
|             })
 | |
|         return res
 | |
| 
 | |
|     # Delete related methods
 | |
|     #
 | |
|     def delete(self, container=None, objects=None, options=None):
 | |
|         """
 | |
|         Delete operations on an account, optional container and optional list
 | |
|         of objects.
 | |
| 
 | |
|         :param container: The container to delete or delete from.
 | |
|         :param objects: The list of objects to delete.
 | |
|         :param options: A dictionary containing options to override the global
 | |
|                         options specified during the service object creation::
 | |
| 
 | |
|                             {
 | |
|                                 'yes_all': False,
 | |
|                                 'leave_segments': False,
 | |
|                                 'prefix': None,
 | |
|                                 'header': [],
 | |
|                             }
 | |
| 
 | |
|         :returns: A generator for returning the results of the delete
 | |
|                   operations. Each result yielded from the generator is either
 | |
|                   a 'delete_container', 'delete_object', 'delete_segment', or
 | |
|                   'bulk_delete' dictionary containing the results of an
 | |
|                   individual delete operation.
 | |
| 
 | |
|         :raises ClientException:
 | |
|         :raises SwiftError:
 | |
|         """
 | |
|         if options is not None:
 | |
|             options = dict(self._options, **options)
 | |
|         else:
 | |
|             options = self._options
 | |
| 
 | |
|         if container is not None:
 | |
|             if objects is not None:
 | |
|                 if options['prefix']:
 | |
|                     objects = [obj for obj in objects
 | |
|                                if obj.startswith(options['prefix'])]
 | |
|                 rq = Queue()
 | |
|                 obj_dels = {}
 | |
| 
 | |
|                 bulk_page_size = self._bulk_delete_page_size(objects)
 | |
|                 if bulk_page_size > 1:
 | |
|                     page_at_a_time = n_at_a_time(objects, bulk_page_size)
 | |
|                     for page_slice in page_at_a_time:
 | |
|                         for obj_slice in n_groups(
 | |
|                                 page_slice,
 | |
|                                 self._options['object_dd_threads']):
 | |
|                             self._bulk_delete(container, obj_slice, options,
 | |
|                                               obj_dels)
 | |
|                 else:
 | |
|                     self._per_item_delete(container, objects, options,
 | |
|                                           obj_dels, rq)
 | |
| 
 | |
|                 # Start a thread to watch for delete results
 | |
|                 Thread(
 | |
|                     target=self._watch_futures, args=(obj_dels, rq)
 | |
|                 ).start()
 | |
| 
 | |
|                 # yield results as they become available, raising the first
 | |
|                 # encountered exception
 | |
|                 res = get_from_queue(rq)
 | |
|                 while res is not None:
 | |
|                     yield res
 | |
| 
 | |
|                     # Cancel the remaining jobs if necessary
 | |
|                     if options['fail_fast'] and not res['success']:
 | |
|                         for d in obj_dels.keys():
 | |
|                             d.cancel()
 | |
| 
 | |
|                     res = get_from_queue(rq)
 | |
|             else:
 | |
|                 for res in self._delete_container(container, options):
 | |
|                     yield res
 | |
|         else:
 | |
|             if objects:
 | |
|                 raise SwiftError('Objects specified without container')
 | |
|             if options['prefix']:
 | |
|                 raise SwiftError('Prefix specified without container')
 | |
|             if options['yes_all']:
 | |
|                 cancelled = False
 | |
|                 containers = []
 | |
|                 for part in self.list():
 | |
|                     if part["success"]:
 | |
|                         containers.extend(c['name'] for c in part['listing'])
 | |
|                     else:
 | |
|                         raise part["error"]
 | |
| 
 | |
|                 for con in containers:
 | |
|                     if cancelled:
 | |
|                         break
 | |
|                     else:
 | |
|                         for res in self._delete_container(
 | |
|                                 con, options=options):
 | |
|                             yield res
 | |
| 
 | |
|                             # Cancel the remaining container deletes, but yield
 | |
|                             # any pending results
 | |
|                             if (not cancelled and options['fail_fast'] and
 | |
|                                     not res['success']):
 | |
|                                 cancelled = True
 | |
| 
 | |
|     def _bulk_delete_page_size(self, objects):
 | |
|         '''
 | |
|         Given the iterable 'objects', will return how many items should be
 | |
|         deleted at a time.
 | |
| 
 | |
|         :param objects: An iterable that supports 'len()'
 | |
|         :returns: The bulk delete page size (i.e. the max number of
 | |
|                   objects that can be bulk deleted at once, as reported by
 | |
|                   the cluster). If bulk delete is disabled, return 1
 | |
|         '''
 | |
|         if len(objects) <= 2 * self._options['object_dd_threads']:
 | |
|             # Not many objects; may as well delete one-by-one
 | |
|             return 1
 | |
| 
 | |
|         try:
 | |
|             cap_result = self.capabilities()
 | |
|             if not cap_result['success']:
 | |
|                 # This shouldn't actually happen, but just in case we start
 | |
|                 # being more nuanced about our capabilities result...
 | |
|                 return 1
 | |
|         except ClientException:
 | |
|             # Old swift, presumably; assume no bulk middleware
 | |
|             return 1
 | |
| 
 | |
|         swift_info = cap_result['capabilities']
 | |
|         if 'bulk_delete' in swift_info:
 | |
|             return swift_info['bulk_delete'].get(
 | |
|                 'max_deletes_per_request', 10000)
 | |
|         else:
 | |
|             return 1
 | |
| 
 | |
|     def _per_item_delete(self, container, objects, options, rdict, rq):
 | |
|         for obj in objects:
 | |
|             obj_del = self.thread_manager.object_dd_pool.submit(
 | |
|                 self._delete_object, container, obj, options,
 | |
|                 results_queue=rq
 | |
|             )
 | |
|             obj_details = {'container': container, 'object': obj}
 | |
|             rdict[obj_del] = obj_details
 | |
| 
 | |
|     @staticmethod
 | |
|     def _delete_segment(conn, container, obj, results_queue=None):
 | |
|         results_dict = {}
 | |
|         try:
 | |
|             res = {'success': True}
 | |
|             conn.delete_object(container, obj, response_dict=results_dict)
 | |
|         except Exception as err:
 | |
|             if not isinstance(err, ClientException) or err.http_status != 404:
 | |
|                 traceback, err_time = report_traceback()
 | |
|                 logger.exception(err)
 | |
|                 res = {
 | |
|                     'success': False,
 | |
|                     'error': err,
 | |
|                     'traceback': traceback,
 | |
|                     'error_timestamp': err_time
 | |
|                 }
 | |
| 
 | |
|         res.update({
 | |
|             'action': 'delete_segment',
 | |
|             'container': container,
 | |
|             'object': obj,
 | |
|             'attempts': conn.attempts,
 | |
|             'response_dict': results_dict
 | |
|         })
 | |
| 
 | |
|         if results_queue is not None:
 | |
|             results_queue.put(res)
 | |
|         return res
 | |
| 
 | |
|     def _delete_object(self, conn, container, obj, options,
 | |
|                        results_queue=None):
 | |
|         _headers = {}
 | |
|         _headers = split_headers(options.get('header', []))
 | |
|         res = {
 | |
|             'action': 'delete_object',
 | |
|             'container': container,
 | |
|             'object': obj
 | |
|         }
 | |
|         try:
 | |
|             old_manifest = None
 | |
|             query_string = None
 | |
| 
 | |
|             if not options['leave_segments']:
 | |
|                 try:
 | |
|                     headers = conn.head_object(container, obj,
 | |
|                                                headers=_headers)
 | |
|                     old_manifest = headers.get('x-object-manifest')
 | |
|                     if config_true_value(headers.get('x-static-large-object')):
 | |
|                         query_string = 'multipart-manifest=delete'
 | |
|                 except ClientException as err:
 | |
|                     if err.http_status != 404:
 | |
|                         raise
 | |
| 
 | |
|             results_dict = {}
 | |
|             conn.delete_object(container, obj,
 | |
|                                headers=_headers,
 | |
|                                query_string=query_string,
 | |
|                                response_dict=results_dict)
 | |
| 
 | |
|             if old_manifest:
 | |
| 
 | |
|                 dlo_segments_deleted = True
 | |
|                 segment_pool = self.thread_manager.segment_pool
 | |
|                 s_container, s_prefix = old_manifest.split('/', 1)
 | |
|                 s_prefix = s_prefix.rstrip('/') + '/'
 | |
| 
 | |
|                 del_segs = []
 | |
|                 for part in self.list(
 | |
|                         container=s_container, options={'prefix': s_prefix}):
 | |
|                     if part["success"]:
 | |
|                         seg_list = [o["name"] for o in part["listing"]]
 | |
|                     else:
 | |
|                         raise part["error"]
 | |
| 
 | |
|                     for seg in seg_list:
 | |
|                         del_seg = segment_pool.submit(
 | |
|                             self._delete_segment, s_container,
 | |
|                             seg, results_queue=results_queue
 | |
|                         )
 | |
|                         del_segs.append(del_seg)
 | |
| 
 | |
|                 for del_seg in interruptable_as_completed(del_segs):
 | |
|                     del_res = del_seg.result()
 | |
|                     if not del_res["success"]:
 | |
|                         dlo_segments_deleted = False
 | |
| 
 | |
|                 res['dlo_segments_deleted'] = dlo_segments_deleted
 | |
| 
 | |
|             res.update({
 | |
|                 'success': True,
 | |
|                 'response_dict': results_dict,
 | |
|                 'attempts': conn.attempts,
 | |
|             })
 | |
| 
 | |
|         except Exception as err:
 | |
|             traceback, err_time = report_traceback()
 | |
|             logger.exception(err)
 | |
|             res.update({
 | |
|                 'success': False,
 | |
|                 'error': err,
 | |
|                 'traceback': traceback,
 | |
|                 'error_timestamp': err_time
 | |
|             })
 | |
|             return res
 | |
| 
 | |
|         return res
 | |
| 
 | |
|     @staticmethod
 | |
|     def _delete_empty_container(conn, container, options):
 | |
|         results_dict = {}
 | |
|         _headers = {}
 | |
|         _headers = split_headers(options.get('header', []))
 | |
|         try:
 | |
|             conn.delete_container(container, headers=_headers,
 | |
|                                   response_dict=results_dict)
 | |
|             res = {'success': True}
 | |
|         except Exception as err:
 | |
|             traceback, err_time = report_traceback()
 | |
|             logger.exception(err)
 | |
|             res = {
 | |
|                 'success': False,
 | |
|                 'error': err,
 | |
|                 'traceback': traceback,
 | |
|                 'error_timestamp': err_time
 | |
|             }
 | |
| 
 | |
|         res.update({
 | |
|             'action': 'delete_container',
 | |
|             'container': container,
 | |
|             'object': None,
 | |
|             'attempts': conn.attempts,
 | |
|             'response_dict': results_dict
 | |
|         })
 | |
|         return res
 | |
| 
 | |
|     def _delete_container(self, container, options):
 | |
|         try:
 | |
|             for part in self.list(container=container, options=options):
 | |
|                 if not part["success"]:
 | |
| 
 | |
|                     raise part["error"]
 | |
| 
 | |
|                 for res in self.delete(
 | |
|                         container=container,
 | |
|                         objects=[o['name'] for o in part['listing']],
 | |
|                         options=options):
 | |
|                     yield res
 | |
|             if options['prefix']:
 | |
|                 # We're only deleting a subset of objects within the container
 | |
|                 return
 | |
| 
 | |
|             con_del = self.thread_manager.container_pool.submit(
 | |
|                 self._delete_empty_container, container, options
 | |
|             )
 | |
|             con_del_res = get_future_result(con_del)
 | |
| 
 | |
|         except Exception as err:
 | |
|             traceback, err_time = report_traceback()
 | |
|             logger.exception(err)
 | |
|             con_del_res = {
 | |
|                 'action': 'delete_container',
 | |
|                 'container': container,
 | |
|                 'object': None,
 | |
|                 'success': False,
 | |
|                 'error': err,
 | |
|                 'traceback': traceback,
 | |
|                 'error_timestamp': err_time
 | |
|             }
 | |
| 
 | |
|         yield con_del_res
 | |
| 
 | |
|     # Bulk methods
 | |
|     #
 | |
|     def _bulk_delete(self, container, objects, options, rdict):
 | |
|         if objects:
 | |
|             bulk_del = self.thread_manager.object_dd_pool.submit(
 | |
|                 self._bulkdelete, container, objects, options
 | |
|             )
 | |
|             bulk_details = {'container': container, 'objects': objects}
 | |
|             rdict[bulk_del] = bulk_details
 | |
| 
 | |
|     @staticmethod
 | |
|     def _bulkdelete(conn, container, objects, options):
 | |
|         results_dict = {}
 | |
|         try:
 | |
|             headers = {
 | |
|                 'Accept': 'application/json',
 | |
|                 'Content-Type': 'text/plain',
 | |
|             }
 | |
|             res = {'container': container, 'objects': objects}
 | |
|             objects = [quote(('/%s/%s' % (container, obj)).encode('utf-8'))
 | |
|                        for obj in objects]
 | |
|             headers, body = conn.post_account(
 | |
|                 headers=headers,
 | |
|                 query_string='bulk-delete',
 | |
|                 data=b''.join(obj.encode('utf-8') + b'\n' for obj in objects),
 | |
|                 response_dict=results_dict)
 | |
|             if body:
 | |
|                 res.update({'success': True,
 | |
|                             'result': parse_api_response(headers, body)})
 | |
|             else:
 | |
|                 res.update({
 | |
|                     'success': False,
 | |
|                     'error': SwiftError(
 | |
|                         'No content received on account POST. '
 | |
|                         'Is the bulk operations middleware enabled?')})
 | |
|         except Exception as e:
 | |
|             res.update({'success': False, 'error': e})
 | |
| 
 | |
|         res.update({
 | |
|             'action': 'bulk_delete',
 | |
|             'attempts': conn.attempts,
 | |
|             'response_dict': results_dict
 | |
|         })
 | |
| 
 | |
|         return res
 | |
| 
 | |
|     # Copy related methods
 | |
|     #
 | |
|     def copy(self, container, objects, options=None):
 | |
|         """
 | |
|         Copy operations on a list of objects in a container. Destination
 | |
|         containers will be created.
 | |
| 
 | |
|         :param container: The container from which to copy the objects.
 | |
|         :param objects: A list of object names (strings) or SwiftCopyObject
 | |
|                         instances containing an object name and an
 | |
|                         options dict (can be None) to override the options for
 | |
|                         that individual copy operation::
 | |
| 
 | |
|                             [
 | |
|                                 'object_name',
 | |
|                                 SwiftCopyObject(
 | |
|                                     'object_name',
 | |
|                                      options={
 | |
|                                         'destination': '/container/object',
 | |
|                                         'fresh_metadata': False,
 | |
|                                         ...
 | |
|                                         }),
 | |
|                                 ...
 | |
|                             ]
 | |
| 
 | |
|                         The options dict is described below.
 | |
|         :param options: A dictionary containing options to override the global
 | |
|                         options specified during the service object creation.
 | |
|                         These options are applied to all copy operations
 | |
|                         performed by this call, unless overridden on a per
 | |
|                         object basis.
 | |
|                         The options "destination" and "fresh_metadata" do
 | |
|                         not need to be set, in this case objects will be
 | |
|                         copied onto themselves and metadata will not be
 | |
|                         refreshed.
 | |
|                         The option "destination" can also be specified in the
 | |
|                         format '/container', in which case objects without an
 | |
|                         explicit destination will be copied to the destination
 | |
|                         /container/original_object_name. Combinations of
 | |
|                         multiple objects and a destination in the format
 | |
|                         '/container/object' is invalid. Possible options are
 | |
|                         given below::
 | |
| 
 | |
|                             {
 | |
|                                 'meta': [],
 | |
|                                 'header': [],
 | |
|                                 'destination': '/container/object',
 | |
|                                 'fresh_metadata': False,
 | |
|                             }
 | |
| 
 | |
|         :returns: A generator returning the results of copying the given list
 | |
|                   of objects.
 | |
| 
 | |
|         :raises SwiftError:
 | |
|         """
 | |
|         if options is not None:
 | |
|             options = dict(self._options, **options)
 | |
|         else:
 | |
|             options = self._options
 | |
| 
 | |
|         # Try to create the container, just in case it doesn't exist. If this
 | |
|         # fails, it might just be because the user doesn't have container PUT
 | |
|         # permissions, so we'll ignore any error. If there's really a problem,
 | |
|         # it'll surface on the first object COPY.
 | |
|         containers = set(
 | |
|             next(p for p in obj.destination.split("/") if p)
 | |
|             for obj in objects
 | |
|             if isinstance(obj, SwiftCopyObject) and obj.destination
 | |
|         )
 | |
|         if options.get('destination'):
 | |
|             destination_split = options['destination'].split('/')
 | |
|             if destination_split[0]:
 | |
|                 raise SwiftError("destination must be in format /cont[/obj]")
 | |
|             _str_objs = [
 | |
|                 o for o in objects if not isinstance(o, SwiftCopyObject)
 | |
|             ]
 | |
|             if len(destination_split) > 2 and len(_str_objs) > 1:
 | |
|                 # TODO (clayg): could be useful to copy multiple objects into
 | |
|                 # a destination like "/container/common/prefix/for/objects/"
 | |
|                 # where the trailing "/" indicates the destination option is a
 | |
|                 # prefix!
 | |
|                 raise SwiftError("Combination of multiple objects and "
 | |
|                                  "destination including object is invalid")
 | |
|             if destination_split[-1] == '':
 | |
|                 # N.B. this protects the above case
 | |
|                 raise SwiftError("destination can not end in a slash")
 | |
|             containers.add(destination_split[1])
 | |
| 
 | |
|         policy_header = {}
 | |
|         _header = split_headers(options["header"])
 | |
|         if POLICY in _header:
 | |
|             policy_header[POLICY] = _header[POLICY]
 | |
|         create_containers = [
 | |
|             self.thread_manager.container_pool.submit(
 | |
|                 self._create_container_job, cont, headers=policy_header)
 | |
|             for cont in containers
 | |
|         ]
 | |
| 
 | |
|         # wait for container creation jobs to complete before any COPY
 | |
|         for r in interruptable_as_completed(create_containers):
 | |
|             res = r.result()
 | |
|             yield res
 | |
| 
 | |
|         copy_futures = []
 | |
|         copy_objects = self._make_copy_objects(objects, options)
 | |
|         for copy_object in copy_objects:
 | |
|             obj = copy_object.object_name
 | |
|             obj_options = copy_object.options
 | |
|             destination = copy_object.destination
 | |
|             fresh_metadata = copy_object.fresh_metadata
 | |
|             headers = split_headers(
 | |
|                 options['meta'], 'X-Object-Meta-')
 | |
|             # add header options to the headers object for the request.
 | |
|             headers.update(
 | |
|                 split_headers(options['header'], ''))
 | |
|             if obj_options is not None:
 | |
|                 if 'meta' in obj_options:
 | |
|                     headers.update(
 | |
|                         split_headers(
 | |
|                             obj_options['meta'], 'X-Object-Meta-'
 | |
|                         )
 | |
|                     )
 | |
|                 if 'header' in obj_options:
 | |
|                     headers.update(
 | |
|                         split_headers(obj_options['header'], '')
 | |
|                     )
 | |
| 
 | |
|             copy = self.thread_manager.object_uu_pool.submit(
 | |
|                 self._copy_object_job, container, obj, destination,
 | |
|                 headers, fresh_metadata
 | |
|             )
 | |
|             copy_futures.append(copy)
 | |
| 
 | |
|         for r in interruptable_as_completed(copy_futures):
 | |
|             res = r.result()
 | |
|             yield res
 | |
| 
 | |
|     @staticmethod
 | |
|     def _make_copy_objects(objects, options):
 | |
|         copy_objects = []
 | |
| 
 | |
|         for o in objects:
 | |
|             if isinstance(o, string_types):
 | |
|                 obj = SwiftCopyObject(o, options)
 | |
|                 copy_objects.append(obj)
 | |
|             elif isinstance(o, SwiftCopyObject):
 | |
|                 copy_objects.append(o)
 | |
|             else:
 | |
|                 raise SwiftError(
 | |
|                     "The copy operation takes only strings or "
 | |
|                     "SwiftCopyObjects as input",
 | |
|                     obj=o)
 | |
| 
 | |
|         return copy_objects
 | |
| 
 | |
|     @staticmethod
 | |
|     def _copy_object_job(conn, container, obj, destination, headers,
 | |
|                          fresh_metadata):
 | |
|         response_dict = {}
 | |
|         res = {
 | |
|             'success': True,
 | |
|             'action': 'copy_object',
 | |
|             'container': container,
 | |
|             'object': obj,
 | |
|             'destination': destination,
 | |
|             'headers': headers,
 | |
|             'fresh_metadata': fresh_metadata,
 | |
|             'response_dict': response_dict
 | |
|         }
 | |
|         try:
 | |
|             conn.copy_object(
 | |
|                 container, obj, destination=destination, headers=headers,
 | |
|                 fresh_metadata=fresh_metadata, response_dict=response_dict)
 | |
|         except Exception as err:
 | |
|             traceback, err_time = report_traceback()
 | |
|             logger.exception(err)
 | |
|             res.update({
 | |
|                 'success': False,
 | |
|                 'error': err,
 | |
|                 'traceback': traceback,
 | |
|                 'error_timestamp': err_time
 | |
|             })
 | |
| 
 | |
|         return res
 | |
| 
 | |
|     # Capabilities related methods
 | |
|     #
 | |
|     def capabilities(self, url=None, refresh_cache=False):
 | |
|         """
 | |
|         List the cluster capabilities.
 | |
| 
 | |
|         :param url: Proxy URL of the cluster to retrieve capabilities.
 | |
| 
 | |
|         :returns: A dictionary containing the capabilities of the cluster.
 | |
| 
 | |
|         :raises ClientException:
 | |
|         """
 | |
|         if not refresh_cache and url in self.capabilities_cache:
 | |
|             return self.capabilities_cache[url]
 | |
| 
 | |
|         res = {
 | |
|             'action': 'capabilities',
 | |
|             'timestamp': time(),
 | |
|         }
 | |
| 
 | |
|         cap = self.thread_manager.container_pool.submit(
 | |
|             self._get_capabilities, url
 | |
|         )
 | |
|         capabilities = get_future_result(cap)
 | |
|         res.update({
 | |
|             'success': True,
 | |
|             'capabilities': capabilities
 | |
|         })
 | |
|         if url is not None:
 | |
|             res.update({
 | |
|                 'url': url
 | |
|             })
 | |
| 
 | |
|         self.capabilities_cache[url] = res
 | |
|         return res
 | |
| 
 | |
|     @staticmethod
 | |
|     def _get_capabilities(conn, url):
 | |
|         return conn.get_capabilities(url)
 | |
| 
 | |
|     # Helper methods
 | |
|     #
 | |
|     @staticmethod
 | |
|     def _watch_futures(futures, result_queue):
 | |
|         """
 | |
|         Watches a dict of futures and pushes their results onto the given
 | |
|         queue. We use this to wait for a set of futures which may create
 | |
|         futures of their own to wait for, whilst also allowing us to
 | |
|         immediately return the results of those sub-jobs.
 | |
| 
 | |
|         When all futures have completed, None is pushed to the queue
 | |
| 
 | |
|         If the future is cancelled, we use the dict to return details about
 | |
|         the cancellation.
 | |
|         """
 | |
|         futures_only = list(futures.keys())
 | |
|         for f in interruptable_as_completed(futures_only):
 | |
|             try:
 | |
|                 r = f.result()
 | |
|                 if r is not None:
 | |
|                     result_queue.put(r)
 | |
|             except CancelledError:
 | |
|                 details = futures[f]
 | |
|                 res = details
 | |
|                 res['status'] = 'cancelled'
 | |
|                 result_queue.put(res)
 | |
|             except Exception as err:
 | |
|                 traceback, err_time = report_traceback()
 | |
|                 logger.exception(err)
 | |
|                 details = futures[f]
 | |
|                 res = details
 | |
|                 res.update({
 | |
|                     'success': False,
 | |
|                     'error': err,
 | |
|                     'traceback': traceback,
 | |
|                     'error_timestamp': err_time
 | |
|                 })
 | |
|                 result_queue.put(res)
 | |
| 
 | |
|         result_queue.put(None)
 | 
