Add importable SwiftService incorporating shell.py logic

This patch adds a SwiftService class that incorporates the high
level logic from swiftclient/shell.py. It also ports shell.py to
use the new class, and updates the code in swiftclient/multithreading.py
to allow the SwiftService to be used for multiple operations whilst
using only one thread pool.

Currently, code that imports swiftclient has to have its own logic for
things like creating large objects, parallel uploads, and parallel
downloads. This patch adds a SwiftService class that makes that
functionality available in Python code as well as through the shell.

Change-Id: I08c5796b4c01001d79fd571651c3017c16462ffd
Implements: blueprint bin-swift-logic-as-importable-library
This commit is contained in:
Joel Wright 2014-04-04 21:13:01 +02:00
parent d97ec374cb
commit 24673f8d19
9 changed files with 2971 additions and 1482 deletions

@ -10,6 +10,11 @@ swiftclient.client
.. automodule:: swiftclient.client .. automodule:: swiftclient.client
swiftclient.service
===================
.. automodule:: swiftclient.service
swiftclient.exceptions swiftclient.exceptions
====================== ======================

@ -1,3 +1,4 @@
futures>=2.1.3
requests>=1.1 requests>=1.1
simplejson>=2.0.9 simplejson>=2.0.9
six>=1.5.2 six>=1.5.2

@ -14,114 +14,167 @@
from swiftclient.utils import prt_bytes from swiftclient.utils import prt_bytes
def stat_account(conn, options, thread_manager): POLICY_HEADER_PREFIX = 'x-account-storage-policy-'
items_to_print = []
def stat_account(conn, options):
items = []
headers = conn.head_account() headers = conn.head_account()
if options.verbose > 1: if options['verbose'] > 1:
items_to_print.extend(( items.extend([
('StorageURL', conn.url), ('StorageURL', conn.url),
('Auth Token', conn.token), ('Auth Token', conn.token),
)) ])
container_count = int(headers.get('x-account-container-count', 0)) container_count = int(headers.get('x-account-container-count', 0))
object_count = prt_bytes(headers.get('x-account-object-count', 0), object_count = prt_bytes(headers.get('x-account-object-count', 0),
options.human).lstrip() options['human']).lstrip()
bytes_used = prt_bytes(headers.get('x-account-bytes-used', 0), bytes_used = prt_bytes(headers.get('x-account-bytes-used', 0),
options.human).lstrip() options['human']).lstrip()
items_to_print.extend(( items.extend([
('Account', conn.url.rsplit('/', 1)[-1]), ('Account', conn.url.rsplit('/', 1)[-1]),
('Containers', container_count), ('Containers', container_count),
('Objects', object_count), ('Objects', object_count),
('Bytes', bytes_used), ('Bytes', bytes_used),
)) ])
policies = set() policies = set()
exclude_policy_headers = []
ps_header_prefix = 'x-account-storage-policy-'
for header_key, header_value in headers.items(): for header_key, header_value in headers.items():
if header_key.lower().startswith(ps_header_prefix): if header_key.lower().startswith(POLICY_HEADER_PREFIX):
policy_name = header_key.rsplit('-', 2)[0].split('-', 4)[-1] policy_name = header_key.rsplit('-', 2)[0].split('-', 4)[-1]
policies.add(policy_name) policies.add(policy_name)
exclude_policy_headers.append(header_key)
for policy in policies: for policy in policies:
items_to_print.extend(( items.extend((
('Objects in policy "' + policy + '"', ('Objects in policy "' + policy + '"',
prt_bytes(headers.get(ps_header_prefix + policy + '-object-count', prt_bytes(
0), options.human).lstrip()), headers.get(
POLICY_HEADER_PREFIX + policy + '-object-count', 0),
options['human']
).lstrip()),
('Bytes in policy "' + policy + '"', ('Bytes in policy "' + policy + '"',
prt_bytes(headers.get(ps_header_prefix + policy + '-bytes-used', prt_bytes(
0), options.human).lstrip()), headers.get(
POLICY_HEADER_PREFIX + policy + '-bytes-used', 0),
options['human']
).lstrip()),
)) ))
items_to_print.extend(thread_manager.headers_to_items( return items, headers
def print_account_stats(items, headers, output_manager):
exclude_policy_headers = []
for header_key, header_value in headers.items():
if header_key.lower().startswith(POLICY_HEADER_PREFIX):
exclude_policy_headers.append(header_key)
items.extend(headers_to_items(
headers, meta_prefix='x-account-meta-', headers, meta_prefix='x-account-meta-',
exclude_headers=([ exclude_headers=([
'content-length', 'date', 'content-length', 'date',
'x-account-container-count', 'x-account-container-count',
'x-account-object-count', 'x-account-object-count',
'x-account-bytes-used'] + exclude_policy_headers))) 'x-account-bytes-used'] + exclude_policy_headers)))
# line up the items nicely # line up the items nicely
offset = max(len(item) for item, value in items_to_print) offset = max(len(item) for item, value in items)
thread_manager.print_items(items_to_print, offset=offset) output_manager.print_items(items, offset=offset)
def stat_container(conn, options, args, thread_manager): def stat_container(conn, options, container):
headers = conn.head_container(args[0]) headers = conn.head_container(container)
if options.verbose > 1: items = []
path = '%s/%s' % (conn.url, args[0]) if options['verbose'] > 1:
thread_manager.print_items(( path = '%s/%s' % (conn.url, container)
items.extend([
('URL', path), ('URL', path),
('Auth Token', conn.token), ('Auth Token', conn.token)
)) ])
object_count = prt_bytes( object_count = prt_bytes(
headers.get('x-container-object-count', 0), headers.get('x-container-object-count', 0),
options.human).lstrip() options['human']).lstrip()
bytes_used = prt_bytes(headers.get('x-container-bytes-used', 0), bytes_used = prt_bytes(headers.get('x-container-bytes-used', 0),
options.human).lstrip() options['human']).lstrip()
thread_manager.print_items(( items.extend([
('Account', conn.url.rsplit('/', 1)[-1]), ('Account', conn.url.rsplit('/', 1)[-1]),
('Container', args[0]), ('Container', container),
('Objects', object_count), ('Objects', object_count),
('Bytes', bytes_used), ('Bytes', bytes_used),
('Read ACL', headers.get('x-container-read', '')), ('Read ACL', headers.get('x-container-read', '')),
('Write ACL', headers.get('x-container-write', '')), ('Write ACL', headers.get('x-container-write', '')),
('Sync To', headers.get('x-container-sync-to', '')), ('Sync To', headers.get('x-container-sync-to', '')),
('Sync Key', headers.get('x-container-sync-key', '')), ('Sync Key', headers.get('x-container-sync-key', ''))
])
return items, headers
def print_container_stats(items, headers, output_manager):
items.extend(headers_to_items(
headers,
meta_prefix='x-container-meta-',
exclude_headers=(
'content-length', 'date',
'x-container-object-count',
'x-container-bytes-used',
'x-container-read',
'x-container-write',
'x-container-sync-to',
'x-container-sync-key'
)
)) ))
thread_manager.print_headers(headers, # line up the items nicely
meta_prefix='x-container-meta-', offset = max(len(item) for item, value in items)
exclude_headers=( output_manager.print_items(items, offset=offset)
'content-length', 'date',
'x-container-object-count',
'x-container-bytes-used',
'x-container-read',
'x-container-write',
'x-container-sync-to',
'x-container-sync-key'))
def stat_object(conn, options, args, thread_manager): def stat_object(conn, options, container, obj):
headers = conn.head_object(args[0], args[1]) headers = conn.head_object(container, obj)
if options.verbose > 1: items = []
path = '%s/%s/%s' % (conn.url, args[0], args[1]) if options['verbose'] > 1:
thread_manager.print_items(( path = '%s/%s/%s' % (conn.url, container, obj)
items.extend([
('URL', path), ('URL', path),
('Auth Token', conn.token), ('Auth Token', conn.token)
)) ])
content_length = prt_bytes(headers.get('content-length', 0), content_length = prt_bytes(headers.get('content-length', 0),
options.human).lstrip() options['human']).lstrip()
thread_manager.print_items(( items.extend([
('Account', conn.url.rsplit('/', 1)[-1]), ('Account', conn.url.rsplit('/', 1)[-1]),
('Container', args[0]), ('Container', container),
('Object', args[1]), ('Object', obj),
('Content Type', headers.get('content-type')), ('Content Type', headers.get('content-type')),
('Content Length', content_length), ('Content Length', content_length),
('Last Modified', headers.get('last-modified')), ('Last Modified', headers.get('last-modified')),
('ETag', headers.get('etag')), ('ETag', headers.get('etag')),
('Manifest', headers.get('x-object-manifest')), ('Manifest', headers.get('x-object-manifest'))
), skip_missing=True) ])
thread_manager.print_headers(headers, return items, headers
meta_prefix='x-object-meta-',
exclude_headers=(
'content-type', 'content-length', def print_object_stats(items, headers, output_manager):
'last-modified', 'etag', 'date', items.extend(headers_to_items(
'x-object-manifest')) headers,
meta_prefix='x-object-meta-',
exclude_headers=(
'content-type', 'content-length',
'last-modified', 'etag', 'date',
'x-object-manifest')
))
# line up the items nicely
offset = max(len(item) for item, value in items)
output_manager.print_items(items, offset=offset, skip_missing=True)
def headers_to_items(headers, meta_prefix='', exclude_headers=None):
exclude_headers = exclude_headers or []
other_items = []
meta_items = []
for key, value in headers.items():
if key not in exclude_headers:
if key.startswith(meta_prefix):
meta_key = 'Meta %s' % key[len(meta_prefix):].title()
meta_items.append((meta_key, value))
else:
other_items.append((key.title(), value))
return meta_items + other_items

@ -15,171 +15,21 @@
from __future__ import print_function from __future__ import print_function
from itertools import chain
import six import six
import sys import sys
from time import sleep
from six.moves.queue import Queue
from threading import Thread
from traceback import format_exception
from swiftclient.exceptions import ClientException from concurrent.futures import ThreadPoolExecutor
from six.moves.queue import PriorityQueue
class StopWorkerThreadSignal(object): class OutputManager(object):
pass
class QueueFunctionThread(Thread):
""" """
Calls `func`` for each item in ``queue``; ``func`` is called with a One object to manage and provide helper functions for output.
de-queued item as the first arg followed by ``*args`` and ``**kwargs``.
Any exceptions raised by ``func`` are stored in :attr:`self.exc_infos`.
If the optional kwarg ``store_results`` is specified, it must be a list and
each result of invoking ``func`` will be appended to that list.
Putting a :class:`StopWorkerThreadSignal` instance into queue will cause
this thread to exit.
"""
def __init__(self, queue, func, *args, **kwargs):
"""
:param queue: A :class:`Queue` object from which work jobs will be
pulled.
:param func: A callable which will be invoked with a dequeued item
followed by ``*args`` and ``**kwargs``.
:param \*args: Optional positional arguments for ``func``.
:param \*\*kwargs: Optional kwargs for func. If the kwarg
``store_results`` is specified, its value must be a
list, and every result from invoking ``func`` will
be appended to the supplied list. The kwarg
``store_results`` will not be passed into ``func``.
"""
Thread.__init__(self)
self.queue = queue
self.func = func
self.args = args
self.kwargs = kwargs
self.exc_infos = []
self.store_results = kwargs.pop('store_results', None)
def run(self):
while True:
item = self.queue.get()
if isinstance(item, StopWorkerThreadSignal):
break
try:
result = self.func(item, *self.args, **self.kwargs)
if self.store_results is not None:
self.store_results.append(result)
except Exception:
self.exc_infos.append(sys.exc_info())
class QueueFunctionManager(object):
"""
A context manager to handle the life-cycle of a single :class:`Queue`
and a list of associated :class:`QueueFunctionThread` instances.
This class is not usually instantiated directly. Instead, call the
:meth:`MultiThreadingManager.queue_manager` object method,
which will return an instance of this class.
When entering the context, ``thread_count`` :class:`QueueFunctionThread`
instances are created and started. The input queue is returned. Inside
the context, any work item put into the queue will get worked on by one of
the :class:`QueueFunctionThread` instances.
When the context is exited, all threads are sent a
:class:`StopWorkerThreadSignal` instance and then all threads are waited
upon. Finally, any exceptions from any of the threads are reported on via
the supplied ``thread_manager``'s :meth:`error` method. If an
``error_counter`` list was supplied on instantiation, its first element is
incremented once for every exception which occurred.
"""
def __init__(self, func, thread_count, thread_manager, thread_args=None,
thread_kwargs=None, error_counter=None,
connection_maker=None):
"""
:param func: The worker function which will be passed into each
:class:`QueueFunctionThread`'s constructor.
:param thread_count: The number of worker threads to run.
:param thread_manager: An instance of :class:`MultiThreadingManager`.
:param thread_args: Optional positional arguments to be passed into
each invocation of ``func`` after the de-queued
work item.
:param thread_kwargs: Optional keyword arguments to be passed into each
invocation of ``func``. If a list is supplied as
the ``store_results`` keyword argument, it will
be filled with every result of invoking ``func``
in all threads.
:param error_counter: Optional list containing one integer. If
supplied, the list's first element will be
incremented once for each exception in any
thread. This happens only when exiting the
context.
:param connection_maker: Optional callable. If supplied, this callable
will be invoked once per created thread, and
the result will be passed into func after the
de-queued work item but before ``thread_args``
and ``thread_kwargs``. This is used to ensure
each thread has its own connection to Swift.
"""
self.func = func
self.thread_count = thread_count
self.thread_manager = thread_manager
self.error_counter = error_counter
self.connection_maker = connection_maker
self.queue = Queue(10000)
self.thread_list = []
self.thread_args = thread_args if thread_args else ()
self.thread_kwargs = thread_kwargs if thread_kwargs else {}
def __enter__(self):
for _junk in range(self.thread_count):
if self.connection_maker:
thread_args = (self.connection_maker(),) + self.thread_args
else:
thread_args = self.thread_args
qf_thread = QueueFunctionThread(self.queue, self.func,
*thread_args, **self.thread_kwargs)
qf_thread.start()
self.thread_list.append(qf_thread)
return self.queue
def __exit__(self, exc_type, exc_value, traceback):
for thread in [t for t in self.thread_list if t.isAlive()]:
self.queue.put(StopWorkerThreadSignal())
while any(map(QueueFunctionThread.is_alive, self.thread_list)):
sleep(0.05)
for thread in self.thread_list:
for info in thread.exc_infos:
if self.error_counter:
self.error_counter[0] += 1
if isinstance(info[1], ClientException):
self.thread_manager.error(str(info[1]))
else:
self.thread_manager.error(''.join(format_exception(*info)))
class MultiThreadingManager(object):
"""
One object to manage context for multi-threading. This should make
bin/swift less error-prone and allow us to test this code.
This object is a context manager and returns itself into the context. When This object is a context manager and returns itself into the context. When
entering the context, two printing threads are created (see below) and they entering the context, two printing threads are created (see below) and they
are waited on and cleaned up when exiting the context. are waited on and cleaned up when exiting the context.
A convenience method, :meth:`queue_manager`, is provided to create a
:class:`QueueFunctionManager` context manager (a thread-pool with an
associated input queue for work items).
Also, thread-safe printing to two streams is provided. The Also, thread-safe printing to two streams is provided. The
:meth:`print_msg` method will print to the supplied ``print_stream`` :meth:`print_msg` method will print to the supplied ``print_stream``
(defaults to ``sys.stdout``) and the :meth:`error` method will print to the (defaults to ``sys.stdout``) and the :meth:`error` method will print to the
@ -198,39 +48,29 @@ class MultiThreadingManager(object):
def __init__(self, print_stream=sys.stdout, error_stream=sys.stderr): def __init__(self, print_stream=sys.stdout, error_stream=sys.stderr):
""" """
:param print_stream: The stream to which :meth:`print_msg` sends :param print_stream: The stream to which :meth:`print_msg` sends
formatted messages formatted messages.
:param error_stream: The stream to which :meth:`error` sends formatted :param error_stream: The stream to which :meth:`error` sends formatted
messages messages.
On Python 2, Unicode messages are encoded to utf8. On Python 2, Unicode messages are encoded to utf8.
""" """
self.print_stream = print_stream self.print_stream = print_stream
self.printer = QueueFunctionManager(self._print, 1, self) self.print_pool = ThreadPoolExecutor(max_workers=1)
self.error_stream = error_stream self.error_stream = error_stream
self.error_printer = QueueFunctionManager(self._print_error, 1, self) self.error_print_pool = ThreadPoolExecutor(max_workers=1)
self.error_count = 0 self.error_count = 0
def __enter__(self): def __enter__(self):
self.printer.__enter__()
self.error_printer.__enter__()
return self return self
def __exit__(self, exc_type, exc_value, traceback): def __exit__(self, exc_type, exc_value, traceback):
self.error_printer.__exit__(exc_type, exc_value, traceback) self.error_print_pool.__exit__(exc_type, exc_value, traceback)
self.printer.__exit__(exc_type, exc_value, traceback) self.print_pool.__exit__(exc_type, exc_value, traceback)
def queue_manager(self, func, thread_count, *args, **kwargs):
connection_maker = kwargs.pop('connection_maker', None)
error_counter = kwargs.pop('error_counter', None)
return QueueFunctionManager(func, thread_count, self, thread_args=args,
thread_kwargs=kwargs,
connection_maker=connection_maker,
error_counter=error_counter)
def print_msg(self, msg, *fmt_args): def print_msg(self, msg, *fmt_args):
if fmt_args: if fmt_args:
msg = msg % fmt_args msg = msg % fmt_args
self.printer.queue.put(msg) self.print_pool.submit(self._print, msg)
def print_items(self, items, offset=DEFAULT_OFFSET, skip_missing=False): def print_items(self, items, offset=DEFAULT_OFFSET, skip_missing=False):
lines = [] lines = []
@ -241,36 +81,10 @@ class MultiThreadingManager(object):
lines.append((template % (k, v)).rstrip()) lines.append((template % (k, v)).rstrip())
self.print_msg('\n'.join(lines)) self.print_msg('\n'.join(lines))
def print_headers(self, headers, meta_prefix='', exclude_headers=None,
offset=DEFAULT_OFFSET):
exclude_headers = exclude_headers or []
meta_headers = []
other_headers = []
template = '%%%ds: %%s' % offset
for key, value in headers.items():
if key.startswith(meta_prefix):
meta_key = 'Meta %s' % key[len(meta_prefix):].title()
meta_headers.append(template % (meta_key, value))
elif key not in exclude_headers:
other_headers.append(template % (key.title(), value))
self.print_msg('\n'.join(chain(meta_headers, other_headers)))
def headers_to_items(self, headers, meta_prefix='', exclude_headers=None):
exclude_headers = exclude_headers or []
meta_items = []
other_items = []
for key, value in headers.items():
if key.startswith(meta_prefix):
meta_key = 'Meta %s' % key[len(meta_prefix):].title()
meta_items.append((meta_key, value))
elif key not in exclude_headers:
other_items.append((key.title(), value))
return meta_items + other_items
def error(self, msg, *fmt_args): def error(self, msg, *fmt_args):
if fmt_args: if fmt_args:
msg = msg % fmt_args msg = msg % fmt_args
self.error_printer.queue.put(msg) self.error_print_pool.submit(self._print_error, msg)
def _print(self, item, stream=None): def _print(self, item, stream=None):
if stream is None: if stream is None:
@ -282,3 +96,79 @@ class MultiThreadingManager(object):
def _print_error(self, item): def _print_error(self, item):
self.error_count += 1 self.error_count += 1
return self._print(item, stream=self.error_stream) return self._print(item, stream=self.error_stream)
class MultiThreadingManager(object):
"""
One object to manage context for multi-threading. This should make
bin/swift less error-prone and allow us to test this code.
"""
def __init__(self, create_connection, segment_threads=10,
object_dd_threads=10, object_uu_threads=10,
container_threads=10):
"""
:param segment_threads: The number of threads allocated to segment
uploads
:param object_dd_threads: The number of threads allocated to object
download/delete jobs
:param object_uu_threads: The number of threads allocated to object
upload/update based jobs
:param container_threads: The number of threads allocated to
container/account level jobs
"""
self.segment_pool = ConnectionThreadPoolExecutor(
create_connection, max_workers=segment_threads)
self.object_dd_pool = ConnectionThreadPoolExecutor(
create_connection, max_workers=object_dd_threads)
self.object_uu_pool = ConnectionThreadPoolExecutor(
create_connection, max_workers=object_uu_threads)
self.container_pool = ConnectionThreadPoolExecutor(
create_connection, max_workers=container_threads)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
self.segment_pool.__exit__(exc_type, exc_value, traceback)
self.object_dd_pool.__exit__(exc_type, exc_value, traceback)
self.object_uu_pool.__exit__(exc_type, exc_value, traceback)
self.container_pool.__exit__(exc_type, exc_value, traceback)
class ConnectionThreadPoolExecutor(ThreadPoolExecutor):
"""
A wrapper class to maintain a pool of connections alongside the thread
pool. We start by creating a priority queue of connections, and each job
submitted takes one of those connections (initialising if necessary) and
passes it as the first arg to the executed function.
At the end of execution that connection is returned to the queue.
By using a PriorityQueue we avoid creating more connections than required.
We will only create as many connections as are required concurrently.
"""
def __init__(self, create_connection, max_workers):
self._connections = PriorityQueue()
self._create_connection = create_connection
for p in range(0, max_workers):
self._connections.put((p, None))
super(ConnectionThreadPoolExecutor, self).__init__(max_workers)
def submit(self, fn, *args, **kwargs):
def conn_fn():
priority = None
conn = None
try:
# If we get a connection we must put it back later
(priority, conn) = self._connections.get()
if conn is None:
conn = self._create_connection()
conn_args = (conn,) + args
return fn(*conn_args, **kwargs)
finally:
if priority is not None:
self._connections.put((priority, conn))
return super(ConnectionThreadPoolExecutor, self).submit(conn_fn)

2062
swiftclient/service.py Normal file

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

@ -22,7 +22,7 @@ from six import StringIO
import testtools import testtools
from swiftclient import command_helpers as h from swiftclient import command_helpers as h
from swiftclient.multithreading import MultiThreadingManager from swiftclient.multithreading import OutputManager
class TestStatHelpers(testtools.TestCase): class TestStatHelpers(testtools.TestCase):
@ -34,10 +34,10 @@ class TestStatHelpers(testtools.TestCase):
'token': 'tk12345', 'token': 'tk12345',
} }
self.conn = mock.MagicMock(**conn_attrs) self.conn = mock.MagicMock(**conn_attrs)
self.options = mock.MagicMock(human=False, verbose=1) self.options = {'human': False, 'verbose': 1}
self.stdout = StringIO() self.stdout = StringIO()
self.stderr = StringIO() self.stderr = StringIO()
self.thread_manager = MultiThreadingManager(self.stdout, self.stderr) self.output_manager = OutputManager(self.stdout, self.stderr)
def assertOut(self, expected): def assertOut(self, expected):
real = self.stdout.getvalue() real = self.stdout.getvalue()
@ -57,7 +57,7 @@ class TestStatHelpers(testtools.TestCase):
raise raise
def test_stat_account_human(self): def test_stat_account_human(self):
self.options.human = True self.options['human'] = True
# stub head_account # stub head_account
stub_headers = { stub_headers = {
'x-account-container-count': 42, 'x-account-container-count': 42,
@ -66,8 +66,9 @@ class TestStatHelpers(testtools.TestCase):
} }
self.conn.head_account.return_value = stub_headers self.conn.head_account.return_value = stub_headers
with self.thread_manager as thread_manager: with self.output_manager as output_manager:
h.stat_account(self.conn, self.options, thread_manager) items, headers = h.stat_account(self.conn, self.options)
h.print_account_stats(items, headers, output_manager)
expected = """ expected = """
Account: a Account: a
Containers: 42 Containers: 42
@ -77,7 +78,7 @@ Containers: 42
self.assertOut(expected) self.assertOut(expected)
def test_stat_account_verbose(self): def test_stat_account_verbose(self):
self.options.verbose += 1 self.options['verbose'] += 1
# stub head_account # stub head_account
stub_headers = { stub_headers = {
'x-account-container-count': 42, 'x-account-container-count': 42,
@ -86,8 +87,9 @@ Containers: 42
} }
self.conn.head_account.return_value = stub_headers self.conn.head_account.return_value = stub_headers
with self.thread_manager as thread_manager: with self.output_manager as output_manager:
h.stat_account(self.conn, self.options, thread_manager) items, headers = h.stat_account(self.conn, self.options)
h.print_account_stats(items, headers, output_manager)
expected = """ expected = """
StorageURL: http://storage/v1/a StorageURL: http://storage/v1/a
Auth Token: tk12345 Auth Token: tk12345
@ -109,8 +111,9 @@ Containers: 42
} }
self.conn.head_account.return_value = stub_headers self.conn.head_account.return_value = stub_headers
with self.thread_manager as thread_manager: with self.output_manager as output_manager:
h.stat_account(self.conn, self.options, thread_manager) items, headers = h.stat_account(self.conn, self.options)
h.print_account_stats(items, headers, output_manager)
expected = """ expected = """
Account: a Account: a
Containers: 42 Containers: 42
@ -122,7 +125,7 @@ Objects in policy "nada": 1000000
self.assertOut(expected) self.assertOut(expected)
def test_stat_container_human(self): def test_stat_container_human(self):
self.options.human = True self.options['human'] = True
# stub head container request # stub head container request
stub_headers = { stub_headers = {
'x-container-object-count': 10 ** 6, 'x-container-object-count': 10 ** 6,
@ -130,22 +133,23 @@ Objects in policy "nada": 1000000
} }
self.conn.head_container.return_value = stub_headers self.conn.head_container.return_value = stub_headers
args = ('c',) args = ('c',)
with self.thread_manager as thread_manager: with self.output_manager as output_manager:
h.stat_container(self.conn, self.options, args, thread_manager) items, headers = h.stat_container(self.conn, self.options, *args)
h.print_container_stats(items, headers, output_manager)
expected = """ expected = """
Account: a Account: a
Container: c Container: c
Objects: 976K Objects: 976K
Bytes: 1.0G Bytes: 1.0G
Read ACL: Read ACL:
Write ACL: Write ACL:
Sync To: Sync To:
Sync Key: Sync Key:
""" """
self.assertOut(expected) self.assertOut(expected)
def test_stat_container_verbose(self): def test_stat_container_verbose(self):
self.options.verbose += 1 self.options['verbose'] += 1
# stub head container request # stub head container request
stub_headers = { stub_headers = {
'x-container-object-count': 10 ** 6, 'x-container-object-count': 10 ** 6,
@ -153,24 +157,25 @@ Objects in policy "nada": 1000000
} }
self.conn.head_container.return_value = stub_headers self.conn.head_container.return_value = stub_headers
args = ('c',) args = ('c',)
with self.thread_manager as thread_manager: with self.output_manager as output_manager:
h.stat_container(self.conn, self.options, args, thread_manager) items, headers = h.stat_container(self.conn, self.options, *args)
h.print_container_stats(items, headers, output_manager)
expected = """ expected = """
URL: http://storage/v1/a/c URL: http://storage/v1/a/c
Auth Token: tk12345 Auth Token: tk12345
Account: a Account: a
Container: c Container: c
Objects: 1000000 Objects: 1000000
Bytes: 1073741824 Bytes: 1073741824
Read ACL: Read ACL:
Write ACL: Write ACL:
Sync To: Sync To:
Sync Key: Sync Key:
""" """
self.assertOut(expected) self.assertOut(expected)
def test_stat_object_human(self): def test_stat_object_human(self):
self.options.human = True self.options['human'] = True
# stub head object request # stub head object request
stub_headers = { stub_headers = {
'content-length': 2 ** 20, 'content-length': 2 ** 20,
@ -180,21 +185,22 @@ Objects in policy "nada": 1000000
} }
self.conn.head_object.return_value = stub_headers self.conn.head_object.return_value = stub_headers
args = ('c', 'o') args = ('c', 'o')
with self.thread_manager as thread_manager: with self.output_manager as output_manager:
h.stat_object(self.conn, self.options, args, thread_manager) items, headers = h.stat_object(self.conn, self.options, *args)
h.print_object_stats(items, headers, output_manager)
expected = """ expected = """
Account: a Account: a
Container: c Container: c
Object: o Object: o
Content Length: 1.0M Content Length: 1.0M
ETag: 68b329da9893e34099c7d8ad5cb9c940 ETag: 68b329da9893e34099c7d8ad5cb9c940
Meta Color: blue Meta Color: blue
Content-Encoding: gzip Content-Encoding: gzip
""" """
self.assertOut(expected) self.assertOut(expected)
def test_stat_object_verbose(self): def test_stat_object_verbose(self):
self.options.verbose += 1 self.options['verbose'] += 1
# stub head object request # stub head object request
stub_headers = { stub_headers = {
'content-length': 2 ** 20, 'content-length': 2 ** 20,
@ -204,17 +210,18 @@ Content-Encoding: gzip
} }
self.conn.head_object.return_value = stub_headers self.conn.head_object.return_value = stub_headers
args = ('c', 'o') args = ('c', 'o')
with self.thread_manager as thread_manager: with self.output_manager as output_manager:
h.stat_object(self.conn, self.options, args, thread_manager) items, headers = h.stat_object(self.conn, self.options, *args)
h.print_object_stats(items, headers, output_manager)
expected = """ expected = """
URL: http://storage/v1/a/c/o URL: http://storage/v1/a/c/o
Auth Token: tk12345 Auth Token: tk12345
Account: a Account: a
Container: c Container: c
Object: o Object: o
Content Length: 1048576 Content Length: 1048576
ETag: 68b329da9893e34099c7d8ad5cb9c940 ETag: 68b329da9893e34099c7d8ad5cb9c940
Meta Color: blue Meta Color: blue
Content-Encoding: gzip Content-Encoding: gzip
""" """
self.assertOut(expected) self.assertOut(expected)

@ -14,41 +14,40 @@
# limitations under the License. # limitations under the License.
import sys import sys
import time
try:
from unittest import mock
except ImportError:
import mock
import testtools import testtools
import threading import threading
import six import six
from concurrent.futures import as_completed
from six.moves.queue import Queue, Empty from six.moves.queue import Queue, Empty
from time import sleep
from swiftclient import multithreading as mt from swiftclient import multithreading as mt
from swiftclient.exceptions import ClientException
class ThreadTestCase(testtools.TestCase): class ThreadTestCase(testtools.TestCase):
def setUp(self): def setUp(self):
super(ThreadTestCase, self).setUp() super(ThreadTestCase, self).setUp()
self.got_items = Queue()
self.got_args_kwargs = Queue() self.got_args_kwargs = Queue()
self.starting_thread_count = threading.active_count() self.starting_thread_count = threading.active_count()
def _func(self, q_item, *args, **kwargs): def _func(self, conn, item, *args, **kwargs):
self.got_items.put(q_item) self.got_items.put((conn, item))
self.got_args_kwargs.put((args, kwargs)) self.got_args_kwargs.put((args, kwargs))
if q_item == 'go boom': if item == 'sleep':
sleep(1)
if item == 'go boom':
raise Exception('I went boom!') raise Exception('I went boom!')
if q_item == 'c boom':
raise ClientException(
'Client Boom', http_scheme='http', http_host='192.168.22.1',
http_port=80, http_path='/booze', http_status=404,
http_reason='to much', http_response_content='no sir!')
return 'best result EVAR!' return 'success'
def _create_conn(self):
return "This is a connection"
def _create_conn_fail(self):
raise Exception("This is a failed connection")
def assertQueueContains(self, queue, expected_contents): def assertQueueContains(self, queue, expected_contents):
got_contents = [] got_contents = []
@ -62,240 +61,125 @@ class ThreadTestCase(testtools.TestCase):
self.assertEqual(expected_contents, got_contents) self.assertEqual(expected_contents, got_contents)
class TestQueueFunctionThread(ThreadTestCase): class TestConnectionThreadPoolExecutor(ThreadTestCase):
def setUp(self): def setUp(self):
super(TestQueueFunctionThread, self).setUp() super(TestConnectionThreadPoolExecutor, self).setUp()
self.input_queue = Queue() self.input_queue = Queue()
self.got_items = Queue()
self.stored_results = [] self.stored_results = []
self.qft = mt.QueueFunctionThread(self.input_queue, self._func,
'one_arg', 'two_arg',
red_fish='blue_arg',
store_results=self.stored_results)
self.qft.start()
def tearDown(self): def tearDown(self):
if self.qft.is_alive(): super(TestConnectionThreadPoolExecutor, self).tearDown()
self.finish_up_thread()
super(TestQueueFunctionThread, self).tearDown() def test_submit_good_connection(self):
ctpe = mt.ConnectionThreadPoolExecutor(self._create_conn, 1)
with ctpe as pool:
# Try submitting a job that should succeed
f = pool.submit(self._func, "succeed")
f.result()
self.assertQueueContains(
self.got_items,
[("This is a connection", "succeed")]
)
def finish_up_thread(self): # Now a job that fails
self.input_queue.put(mt.StopWorkerThreadSignal()) went_boom = False
while self.qft.is_alive(): try:
time.sleep(0.05) f = pool.submit(self._func, "go boom")
f.result()
except Exception as e:
went_boom = True
self.assertEquals('I went boom!', str(e))
self.assertTrue(went_boom)
def test_plumbing_and_store_results(self): # Has the connection been returned to the pool?
self.input_queue.put('abc') f = pool.submit(self._func, "succeed")
self.input_queue.put(123) f.result()
self.finish_up_thread() self.assertQueueContains(
self.got_items,
[
("This is a connection", "go boom"),
("This is a connection", "succeed")
]
)
self.assertQueueContains(self.got_items, ['abc', 123]) def test_submit_bad_connection(self):
self.assertQueueContains(self.got_args_kwargs, [ ctpe = mt.ConnectionThreadPoolExecutor(self._create_conn_fail, 1)
(('one_arg', 'two_arg'), {'red_fish': 'blue_arg'}), with ctpe as pool:
(('one_arg', 'two_arg'), {'red_fish': 'blue_arg'})]) # Now a connection that fails
self.assertEqual(self.stored_results, connection_failed = False
['best result EVAR!', 'best result EVAR!']) try:
f = pool.submit(self._func, "succeed")
f.result()
except Exception as e:
connection_failed = True
self.assertEquals('This is a failed connection', str(e))
self.assertTrue(connection_failed)
def test_exception_handling(self): # Make sure we don't lock up on failed connections
self.input_queue.put('go boom') connection_failed = False
self.input_queue.put('ok') try:
self.input_queue.put('go boom') f = pool.submit(self._func, "go boom")
self.finish_up_thread() f.result()
except Exception as e:
connection_failed = True
self.assertEquals('This is a failed connection', str(e))
self.assertTrue(connection_failed)
self.assertQueueContains(self.got_items, def test_lazy_connections(self):
['go boom', 'ok', 'go boom']) ctpe = mt.ConnectionThreadPoolExecutor(self._create_conn, 10)
self.assertEqual(len(self.qft.exc_infos), 2) with ctpe as pool:
self.assertEqual(Exception, self.qft.exc_infos[0][0]) # Submit multiple jobs sequentially - should only use 1 conn
self.assertEqual(Exception, self.qft.exc_infos[1][0]) f = pool.submit(self._func, "succeed")
self.assertEqual(('I went boom!',), self.qft.exc_infos[0][1].args) f.result()
self.assertEqual(('I went boom!',), self.qft.exc_infos[1][1].args) f = pool.submit(self._func, "succeed")
f.result()
f = pool.submit(self._func, "succeed")
f.result()
expected_connections = [(0, "This is a connection")]
expected_connections.extend([(x, None) for x in range(1, 10)])
self.assertQueueContains(
pool._connections, expected_connections
)
ctpe = mt.ConnectionThreadPoolExecutor(self._create_conn, 10)
with ctpe as pool:
fs = []
f1 = pool.submit(self._func, "sleep")
f2 = pool.submit(self._func, "sleep")
f3 = pool.submit(self._func, "sleep")
fs.extend([f1, f2, f3])
expected_connections = [
(0, "This is a connection"),
(1, "This is a connection"),
(2, "This is a connection")
]
expected_connections.extend([(x, None) for x in range(3, 10)])
for f in as_completed(fs):
f.result()
self.assertQueueContains(
pool._connections, expected_connections
)
class TestQueueFunctionManager(ThreadTestCase): class TestOutputManager(testtools.TestCase):
def setUp(self):
super(TestQueueFunctionManager, self).setUp()
self.thread_manager = mock.create_autospec(
mt.MultiThreadingManager, spec_set=True, instance=True)
self.thread_count = 4
self.error_counter = [0]
self.got_items = Queue()
self.stored_results = []
self.qfq = mt.QueueFunctionManager(
self._func, self.thread_count, self.thread_manager,
thread_args=('1arg', '2arg'),
thread_kwargs={'a': 'b', 'store_results': self.stored_results},
error_counter=self.error_counter,
connection_maker=self.connection_maker)
def connection_maker(self): def test_instantiation(self):
return 'yup, I made a connection' output_manager = mt.OutputManager()
def test_context_manager_without_error_counter(self): self.assertEqual(sys.stdout, output_manager.print_stream)
self.qfq = mt.QueueFunctionManager( self.assertEqual(sys.stderr, output_manager.error_stream)
self._func, self.thread_count, self.thread_manager,
thread_args=('1arg', '2arg'),
thread_kwargs={'a': 'b', 'store_results': self.stored_results},
connection_maker=self.connection_maker)
with self.qfq as input_queue:
self.assertEqual(self.starting_thread_count + self.thread_count,
threading.active_count())
input_queue.put('go boom')
self.assertEqual(self.starting_thread_count, threading.active_count())
error_strs = list(map(str, self.thread_manager.error.call_args_list))
self.assertEqual(1, len(error_strs))
self.assertTrue('Exception: I went boom!' in error_strs[0])
def test_context_manager_without_conn_maker_or_error_counter(self):
self.qfq = mt.QueueFunctionManager(
self._func, self.thread_count, self.thread_manager,
thread_args=('1arg', '2arg'), thread_kwargs={'a': 'b'})
with self.qfq as input_queue:
self.assertEqual(self.starting_thread_count + self.thread_count,
threading.active_count())
for i in range(20):
input_queue.put('slap%d' % i)
self.assertEqual(self.starting_thread_count, threading.active_count())
self.assertEqual([], self.thread_manager.error.call_args_list)
self.assertEqual(0, self.error_counter[0])
self.assertQueueContains(self.got_items,
set(['slap%d' % i for i in range(20)]))
self.assertQueueContains(
self.got_args_kwargs,
[(('1arg', '2arg'), {'a': 'b'})] * 20)
self.assertEqual(self.stored_results, [])
def test_context_manager_with_exceptions(self):
with self.qfq as input_queue:
self.assertEqual(self.starting_thread_count + self.thread_count,
threading.active_count())
for i in range(20):
input_queue.put('item%d' % i if i % 2 == 0 else 'go boom')
self.assertEqual(self.starting_thread_count, threading.active_count())
error_strs = list(map(str, self.thread_manager.error.call_args_list))
self.assertEqual(10, len(error_strs))
self.assertTrue(all(['Exception: I went boom!' in s for s in
error_strs]))
self.assertEqual(10, self.error_counter[0])
expected_items = set(['go boom'] +
['item%d' % i for i in range(20)
if i % 2 == 0])
self.assertQueueContains(self.got_items, expected_items)
self.assertQueueContains(
self.got_args_kwargs,
[(('yup, I made a connection', '1arg', '2arg'), {'a': 'b'})] * 20)
self.assertEqual(self.stored_results, ['best result EVAR!'] * 10)
def test_context_manager_with_client_exceptions(self):
with self.qfq as input_queue:
self.assertEqual(self.starting_thread_count + self.thread_count,
threading.active_count())
for i in range(20):
input_queue.put('item%d' % i if i % 2 == 0 else 'c boom')
self.assertEqual(self.starting_thread_count, threading.active_count())
error_strs = list(map(str, self.thread_manager.error.call_args_list))
self.assertEqual(10, len(error_strs))
stringification = 'Client Boom: ' \
'http://192.168.22.1:80/booze 404 to much no sir!'
self.assertTrue(all([stringification in s for s in error_strs]))
self.assertEqual(10, self.error_counter[0])
expected_items = set(['c boom'] +
['item%d' % i for i in range(20)
if i % 2 == 0])
self.assertQueueContains(self.got_items, expected_items)
self.assertQueueContains(
self.got_args_kwargs,
[(('yup, I made a connection', '1arg', '2arg'), {'a': 'b'})] * 20)
self.assertEqual(self.stored_results, ['best result EVAR!'] * 10)
def test_context_manager_with_connection_maker(self):
with self.qfq as input_queue:
self.assertEqual(self.starting_thread_count + self.thread_count,
threading.active_count())
for i in range(20):
input_queue.put('item%d' % i)
self.assertEqual(self.starting_thread_count, threading.active_count())
self.assertEqual([], self.thread_manager.error.call_args_list)
self.assertEqual(0, self.error_counter[0])
self.assertQueueContains(self.got_items,
set(['item%d' % i for i in range(20)]))
self.assertQueueContains(
self.got_args_kwargs,
[(('yup, I made a connection', '1arg', '2arg'), {'a': 'b'})] * 20)
self.assertEqual(self.stored_results, ['best result EVAR!'] * 20)
class TestMultiThreadingManager(ThreadTestCase):
@mock.patch('swiftclient.multithreading.QueueFunctionManager')
def test_instantiation(self, mock_qfq):
thread_manager = mt.MultiThreadingManager()
self.assertEqual([
mock.call(thread_manager._print, 1, thread_manager),
mock.call(thread_manager._print_error, 1, thread_manager),
], mock_qfq.call_args_list)
# These contexts don't get entered into until the
# MultiThreadingManager's context is entered.
self.assertEqual([], thread_manager.printer.__enter__.call_args_list)
self.assertEqual([],
thread_manager.error_printer.__enter__.call_args_list)
# Test default values for the streams.
self.assertEqual(sys.stdout, thread_manager.print_stream)
self.assertEqual(sys.stderr, thread_manager.error_stream)
@mock.patch('swiftclient.multithreading.QueueFunctionManager')
def test_queue_manager_no_args(self, mock_qfq):
thread_manager = mt.MultiThreadingManager()
mock_qfq.reset_mock()
mock_qfq.return_value = 'slap happy!'
self.assertEqual(
'slap happy!',
thread_manager.queue_manager(self._func, 88))
self.assertEqual([
mock.call(self._func, 88, thread_manager, thread_args=(),
thread_kwargs={}, connection_maker=None,
error_counter=None)
], mock_qfq.call_args_list)
@mock.patch('swiftclient.multithreading.QueueFunctionManager')
def test_queue_manager_with_args(self, mock_qfq):
thread_manager = mt.MultiThreadingManager()
mock_qfq.reset_mock()
mock_qfq.return_value = 'do run run'
self.assertEqual(
'do run run',
thread_manager.queue_manager(self._func, 88, 'fun', times='are',
connection_maker='abc', to='be had',
error_counter='def'))
self.assertEqual([
mock.call(self._func, 88, thread_manager, thread_args=('fun',),
thread_kwargs={'times': 'are', 'to': 'be had'},
connection_maker='abc', error_counter='def')
], mock_qfq.call_args_list)
def test_printers(self): def test_printers(self):
out_stream = six.StringIO() out_stream = six.StringIO()
err_stream = six.StringIO() err_stream = six.StringIO()
starting_thread_count = threading.active_count()
with mt.MultiThreadingManager( with mt.OutputManager(
print_stream=out_stream, print_stream=out_stream,
error_stream=err_stream) as thread_manager: error_stream=err_stream) as thread_manager:
@ -304,7 +188,8 @@ class TestMultiThreadingManager(ThreadTestCase):
self.assertEqual(out_stream, thread_manager.print_stream) self.assertEqual(out_stream, thread_manager.print_stream)
self.assertEqual(err_stream, thread_manager.error_stream) self.assertEqual(err_stream, thread_manager.error_stream)
self.assertEqual(self.starting_thread_count + 2, # No printing has happened yet, so no new threads
self.assertEqual(starting_thread_count,
threading.active_count()) threading.active_count())
thread_manager.print_msg('one-argument') thread_manager.print_msg('one-argument')
@ -317,7 +202,13 @@ class TestMultiThreadingManager(ThreadTestCase):
thread_manager.error('Sometimes\n%.1f%% just\ndoes not\nwork!', thread_manager.error('Sometimes\n%.1f%% just\ndoes not\nwork!',
3.14159) 3.14159)
self.assertEqual(self.starting_thread_count, threading.active_count()) # Now we have a thread for error printing and a thread for
# normal print messages
self.assertEqual(starting_thread_count + 2,
threading.active_count())
# The threads should have been cleaned up
self.assertEqual(starting_thread_count, threading.active_count())
out_stream.seek(0) out_stream.seek(0)
if six.PY3: if six.PY3:

@ -24,6 +24,7 @@ import swiftclient
import swiftclient.shell import swiftclient.shell
import swiftclient.utils import swiftclient.utils
from os.path import basename, dirname
if six.PY2: if six.PY2:
BUILTIN_OPEN = '__builtin__.open' BUILTIN_OPEN = '__builtin__.open'
@ -50,8 +51,8 @@ class TestShell(unittest.TestCase):
except OSError: except OSError:
pass pass
@mock.patch('swiftclient.shell.MultiThreadingManager._print') @mock.patch('swiftclient.shell.OutputManager._print')
@mock.patch('swiftclient.shell.Connection') @mock.patch('swiftclient.service.Connection')
def test_stat_account(self, connection, mock_print): def test_stat_account(self, connection, mock_print):
argv = ["", "stat"] argv = ["", "stat"]
return_headers = { return_headers = {
@ -66,12 +67,11 @@ class TestShell(unittest.TestCase):
calls = [mock.call(' Account: AUTH_account\n' + calls = [mock.call(' Account: AUTH_account\n' +
'Containers: 1\n' + 'Containers: 1\n' +
' Objects: 2\n' + ' Objects: 2\n' +
' Bytes: 3'), ' Bytes: 3')]
]
mock_print.assert_has_calls(calls) mock_print.assert_has_calls(calls)
@mock.patch('swiftclient.shell.MultiThreadingManager._print') @mock.patch('swiftclient.shell.OutputManager._print')
@mock.patch('swiftclient.shell.Connection') @mock.patch('swiftclient.service.Connection')
def test_stat_container(self, connection, mock_print): def test_stat_container(self, connection, mock_print):
return_headers = { return_headers = {
'x-container-object-count': '1', 'x-container-object-count': '1',
@ -85,19 +85,18 @@ class TestShell(unittest.TestCase):
connection.return_value.head_container.return_value = return_headers connection.return_value.head_container.return_value = return_headers
connection.return_value.url = 'http://127.0.0.1/v1/AUTH_account' connection.return_value.url = 'http://127.0.0.1/v1/AUTH_account'
swiftclient.shell.main(argv) swiftclient.shell.main(argv)
calls = [mock.call(' Account: AUTH_account\n' + calls = [mock.call(' Account: AUTH_account\n' +
' Container: container\n' + 'Container: container\n' +
' Objects: 1\n' + ' Objects: 1\n' +
' Bytes: 2\n' + ' Bytes: 2\n' +
' Read ACL: test2:tester2\n' + ' Read ACL: test2:tester2\n' +
' Write ACL: test3:tester3\n' + 'Write ACL: test3:tester3\n' +
' Sync To: other\n' + ' Sync To: other\n' +
' Sync Key: secret'), ' Sync Key: secret')]
mock.call('')]
mock_print.assert_has_calls(calls) mock_print.assert_has_calls(calls)
@mock.patch('swiftclient.shell.MultiThreadingManager._print') @mock.patch('swiftclient.shell.OutputManager._print')
@mock.patch('swiftclient.shell.Connection') @mock.patch('swiftclient.service.Connection')
def test_stat_object(self, connection, mock_print): def test_stat_object(self, connection, mock_print):
return_headers = { return_headers = {
'x-object-manifest': 'manifest', 'x-object-manifest': 'manifest',
@ -117,12 +116,11 @@ class TestShell(unittest.TestCase):
'Content Length: 42\n' + 'Content Length: 42\n' +
' Last Modified: yesterday\n' + ' Last Modified: yesterday\n' +
' ETag: md5\n' + ' ETag: md5\n' +
' Manifest: manifest'), ' Manifest: manifest')]
mock.call('')]
mock_print.assert_has_calls(calls) mock_print.assert_has_calls(calls)
@mock.patch('swiftclient.shell.MultiThreadingManager._print') @mock.patch('swiftclient.shell.OutputManager._print')
@mock.patch('swiftclient.shell.Connection') @mock.patch('swiftclient.service.Connection')
def test_list_account(self, connection, mock_print): def test_list_account(self, connection, mock_print):
# Test account listing # Test account listing
connection.return_value.get_account.side_effect = [ connection.return_value.get_account.side_effect = [
@ -138,8 +136,8 @@ class TestShell(unittest.TestCase):
calls = [mock.call('container')] calls = [mock.call('container')]
mock_print.assert_has_calls(calls) mock_print.assert_has_calls(calls)
@mock.patch('swiftclient.shell.MultiThreadingManager._print') @mock.patch('swiftclient.shell.OutputManager._print')
@mock.patch('swiftclient.shell.Connection') @mock.patch('swiftclient.service.Connection')
def test_list_container(self, connection, mock_print): def test_list_container(self, connection, mock_print):
connection.return_value.get_container.side_effect = [ connection.return_value.get_container.side_effect = [
[None, [{'name': 'object_a'}]], [None, [{'name': 'object_a'}]],
@ -173,8 +171,8 @@ class TestShell(unittest.TestCase):
mock.call(' 0')] mock.call(' 0')]
mock_print.assert_has_calls(calls) mock_print.assert_has_calls(calls)
@mock.patch('swiftclient.shell.makedirs') @mock.patch('swiftclient.service.makedirs')
@mock.patch('swiftclient.shell.Connection') @mock.patch('swiftclient.service.Connection')
def test_download(self, connection, makedirs): def test_download(self, connection, makedirs):
connection.return_value.get_object.return_value = [ connection.return_value.get_object.return_value = [
{'content-type': 'text/plain', {'content-type': 'text/plain',
@ -194,9 +192,11 @@ class TestShell(unittest.TestCase):
argv = ["", "download", "container"] argv = ["", "download", "container"]
swiftclient.shell.main(argv) swiftclient.shell.main(argv)
calls = [mock.call('container', 'object', calls = [mock.call('container', 'object',
headers={}, resp_chunk_size=65536), headers={}, resp_chunk_size=65536,
response_dict={}),
mock.call('container', 'pseudo/', mock.call('container', 'pseudo/',
headers={}, resp_chunk_size=65536)] headers={}, resp_chunk_size=65536,
response_dict={})]
connection.return_value.get_object.assert_has_calls( connection.return_value.get_object.assert_has_calls(
calls, any_order=True) calls, any_order=True)
mock_open.assert_called_once_with('object', 'wb') mock_open.assert_called_once_with('object', 'wb')
@ -206,12 +206,13 @@ class TestShell(unittest.TestCase):
argv = ["", "download", "container", "object"] argv = ["", "download", "container", "object"]
swiftclient.shell.main(argv) swiftclient.shell.main(argv)
connection.return_value.get_object.assert_called_with( connection.return_value.get_object.assert_called_with(
'container', 'object', headers={}, resp_chunk_size=65536) 'container', 'object', headers={}, resp_chunk_size=65536,
response_dict={})
mock_open.assert_called_with('object', 'wb') mock_open.assert_called_with('object', 'wb')
@mock.patch('swiftclient.shell.listdir') @mock.patch('swiftclient.shell.walk')
@mock.patch('swiftclient.shell.Connection') @mock.patch('swiftclient.service.Connection')
def test_upload(self, connection, listdir): def test_upload(self, connection, walk):
connection.return_value.head_object.return_value = { connection.return_value.head_object.return_value = {
'content-length': '0'} 'content-length': '0'}
connection.return_value.attempts = 0 connection.return_value.attempts = 0
@ -220,7 +221,8 @@ class TestShell(unittest.TestCase):
swiftclient.shell.main(argv) swiftclient.shell.main(argv)
connection.return_value.put_container.assert_called_with( connection.return_value.put_container.assert_called_with(
'container', 'container',
{'X-Storage-Policy': mock.ANY}) {'X-Storage-Policy': mock.ANY},
response_dict={})
connection.return_value.put_object.assert_called_with( connection.return_value.put_object.assert_called_with(
'container', 'container',
@ -228,18 +230,23 @@ class TestShell(unittest.TestCase):
mock.ANY, mock.ANY,
content_length=0, content_length=0,
headers={'x-object-meta-mtime': mock.ANY, headers={'x-object-meta-mtime': mock.ANY,
'X-Storage-Policy': 'one'}) 'X-Storage-Policy': 'one'},
response_dict={})
# Upload whole directory # Upload whole directory
argv = ["", "upload", "container", "/tmp"] argv = ["", "upload", "container", "/tmp"]
listdir.return_value = [self.tmpfile] _tmpfile = self.tmpfile
_tmpfile_dir = dirname(_tmpfile)
_tmpfile_base = basename(_tmpfile)
walk.return_value = [(_tmpfile_dir, [], [_tmpfile_base])]
swiftclient.shell.main(argv) swiftclient.shell.main(argv)
connection.return_value.put_object.assert_called_with( connection.return_value.put_object.assert_called_with(
'container', 'container',
self.tmpfile.lstrip('/'), self.tmpfile.lstrip('/'),
mock.ANY, mock.ANY,
content_length=0, content_length=0,
headers={'x-object-meta-mtime': mock.ANY}) headers={'x-object-meta-mtime': mock.ANY},
response_dict={})
# Upload in segments # Upload in segments
connection.return_value.head_container.return_value = { connection.return_value.head_container.return_value = {
@ -250,16 +257,18 @@ class TestShell(unittest.TestCase):
swiftclient.shell.main(argv) swiftclient.shell.main(argv)
connection.return_value.put_container.assert_called_with( connection.return_value.put_container.assert_called_with(
'container_segments', 'container_segments',
{'X-Storage-Policy': mock.ANY}) {'X-Storage-Policy': mock.ANY},
response_dict={})
connection.return_value.put_object.assert_called_with( connection.return_value.put_object.assert_called_with(
'container', 'container',
self.tmpfile.lstrip('/'), self.tmpfile.lstrip('/'),
'', '',
content_length=0, content_length=0,
headers={'x-object-manifest': mock.ANY, headers={'x-object-manifest': mock.ANY,
'x-object-meta-mtime': mock.ANY}) 'x-object-meta-mtime': mock.ANY},
response_dict={})
@mock.patch('swiftclient.shell.Connection') @mock.patch('swiftclient.service.Connection')
def test_delete_account(self, connection): def test_delete_account(self, connection):
connection.return_value.get_account.side_effect = [ connection.return_value.get_account.side_effect = [
[None, [{'name': 'container'}]], [None, [{'name': 'container'}]],
@ -274,11 +283,11 @@ class TestShell(unittest.TestCase):
connection.return_value.head_object.return_value = {} connection.return_value.head_object.return_value = {}
swiftclient.shell.main(argv) swiftclient.shell.main(argv)
connection.return_value.delete_container.assert_called_with( connection.return_value.delete_container.assert_called_with(
'container') 'container', response_dict={})
connection.return_value.delete_object.assert_called_with( connection.return_value.delete_object.assert_called_with(
'container', 'object', query_string=None) 'container', 'object', query_string=None, response_dict={})
@mock.patch('swiftclient.shell.Connection') @mock.patch('swiftclient.service.Connection')
def test_delete_container(self, connection): def test_delete_container(self, connection):
connection.return_value.get_container.side_effect = [ connection.return_value.get_container.side_effect = [
[None, [{'name': 'object'}]], [None, [{'name': 'object'}]],
@ -289,34 +298,34 @@ class TestShell(unittest.TestCase):
connection.return_value.head_object.return_value = {} connection.return_value.head_object.return_value = {}
swiftclient.shell.main(argv) swiftclient.shell.main(argv)
connection.return_value.delete_container.assert_called_with( connection.return_value.delete_container.assert_called_with(
'container') 'container', response_dict={})
connection.return_value.delete_object.assert_called_with( connection.return_value.delete_object.assert_called_with(
'container', 'object', query_string=None) 'container', 'object', query_string=None, response_dict={})
@mock.patch('swiftclient.shell.Connection') @mock.patch('swiftclient.service.Connection')
def test_delete_object(self, connection): def test_delete_object(self, connection):
argv = ["", "delete", "container", "object"] argv = ["", "delete", "container", "object"]
connection.return_value.head_object.return_value = {} connection.return_value.head_object.return_value = {}
connection.return_value.attempts = 0 connection.return_value.attempts = 0
swiftclient.shell.main(argv) swiftclient.shell.main(argv)
connection.return_value.delete_object.assert_called_with( connection.return_value.delete_object.assert_called_with(
'container', 'object', query_string=None) 'container', 'object', query_string=None, response_dict={})
@mock.patch('swiftclient.shell.Connection') @mock.patch('swiftclient.service.Connection')
def test_post_account(self, connection): def test_post_account(self, connection):
argv = ["", "post"] argv = ["", "post"]
connection.return_value.head_object.return_value = {} connection.return_value.head_object.return_value = {}
swiftclient.shell.main(argv) swiftclient.shell.main(argv)
connection.return_value.post_account.assert_called_with( connection.return_value.post_account.assert_called_with(
headers={}) headers={}, response_dict={})
argv = ["", "post", "container"] argv = ["", "post", "container"]
connection.return_value.head_object.return_value = {} connection.return_value.head_object.return_value = {}
swiftclient.shell.main(argv) swiftclient.shell.main(argv)
connection.return_value.post_container.assert_called_with( connection.return_value.post_container.assert_called_with(
'container', headers={}) 'container', headers={}, response_dict={})
@mock.patch('swiftclient.shell.Connection') @mock.patch('swiftclient.service.Connection')
def test_post_container(self, connection): def test_post_container(self, connection):
argv = ["", "post", "container", argv = ["", "post", "container",
"--read-acl", "test2:tester2", "--read-acl", "test2:tester2",
@ -331,9 +340,9 @@ class TestShell(unittest.TestCase):
'X-Container-Write': 'test3:tester3 test4', 'X-Container-Write': 'test3:tester3 test4',
'X-Container-Read': 'test2:tester2', 'X-Container-Read': 'test2:tester2',
'X-Container-Sync-Key': 'secret', 'X-Container-Sync-Key': 'secret',
'X-Container-Sync-To': 'othersite'}) 'X-Container-Sync-To': 'othersite'}, response_dict={})
@mock.patch('swiftclient.shell.Connection') @mock.patch('swiftclient.service.Connection')
def test_post_object(self, connection): def test_post_object(self, connection):
argv = ["", "post", "container", "object", argv = ["", "post", "container", "object",
"--meta", "Color:Blue", "--meta", "Color:Blue",
@ -344,7 +353,7 @@ class TestShell(unittest.TestCase):
connection.return_value.post_object.assert_called_with( connection.return_value.post_object.assert_called_with(
'container', 'object', headers={ 'container', 'object', headers={
'Content-Type': 'text/plain', 'Content-Type': 'text/plain',
'X-Object-Meta-Color': 'Blue'}) 'X-Object-Meta-Color': 'Blue'}, response_dict={})
@mock.patch('swiftclient.shell.generate_temp_url') @mock.patch('swiftclient.shell.generate_temp_url')
def test_temp_url(self, temp_url): def test_temp_url(self, temp_url):
@ -356,7 +365,7 @@ class TestShell(unittest.TestCase):
temp_url.assert_called_with( temp_url.assert_called_with(
'/v1/AUTH_account/c/o', 60, 'secret_key', 'GET') '/v1/AUTH_account/c/o', 60, 'secret_key', 'GET')
@mock.patch('swiftclient.shell.Connection') @mock.patch('swiftclient.service.Connection')
def test_capabilities(self, connection): def test_capabilities(self, connection):
argv = ["", "capabilities"] argv = ["", "capabilities"]
connection.return_value.get_capabilities.return_value = {'swift': None} connection.return_value.get_capabilities.return_value = {'swift': None}