Added an internal client.

Refactored object expirer to use this client.

Change-Id: Ibeca6dba873f8b4a558ecf3ba6e8d23d36f545b0
This commit is contained in:
Greg Lange 2012-05-04 18:07:22 +00:00
parent 5dcc9083a7
commit 8d2fe89a7d
5 changed files with 1598 additions and 460 deletions

View File

@ -80,6 +80,16 @@ Direct Client
:undoc-members:
:show-inheritance:
.. _internal_client:
Internal Client
===============
.. automodule:: swift.common.internal_client
:members:
:undoc-members:
:show-inheritance:
.. _buffered_http:
Buffered HTTP

View File

@ -0,0 +1,624 @@
# Copyright (c) 2010-2012 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from eventlet import sleep, Timeout
import json
from paste.deploy import loadapp
import struct
from sys import exc_info
from urllib import quote
from webob import Request
import zlib
from zlib import compressobj
from swift.common.http import HTTP_NOT_FOUND
class UnexpectedResponse(Exception):
"""
Exception raised on invalid responses to InternalClient.make_request().
:param message: Exception message.
:param resp: The unexpected response.
"""
def __init__(self, message, resp):
super(UnexpectedResponse, self).__init__(self, message)
self.resp = resp
class CompressingFileReader(object):
"""
Wrapper for file object to compress object while reading.
Can be used to wrap file objects passed to InternalClient.upload_object().
Used in testing of InternalClient.
:param file_obj: File object to wrap.
:param compresslevel: Compression level, defaults to 9.
"""
def __init__(self, file_obj, compresslevel=9):
self._f = file_obj
self._compressor = compressobj(compresslevel, zlib.DEFLATED,
-zlib.MAX_WBITS, zlib.DEF_MEM_LEVEL, 0)
self.done = False
self.first = True
self.crc32 = 0
self.total_size = 0
def read(self, *a, **kw):
"""
Reads a chunk from the file object.
Params are passed directly to the underlying file object's read().
:returns: Compressed chunk from file object.
"""
if self.done:
return ''
x = self._f.read(*a, **kw)
if x:
self.crc32 = zlib.crc32(x, self.crc32) & 0xffffffffL
self.total_size += len(x)
compressed = self._compressor.compress(x)
if not compressed:
compressed = self._compressor.flush(zlib.Z_SYNC_FLUSH)
else:
compressed = self._compressor.flush(zlib.Z_FINISH)
crc32 = struct.pack("<L", self.crc32 & 0xffffffffL)
size = struct.pack("<L", self.total_size & 0xffffffffL)
footer = crc32 + size
compressed += footer
self.done = True
if self.first:
self.first = False
header = '\037\213\010\000\000\000\000\000\002\377'
compressed = header + compressed
return compressed
def __iter__(self):
return self
def next(self):
chunk = self.read()
if chunk:
return chunk
raise StopIteration
class InternalClient(object):
"""
An internal client that uses a swift proxy app to make requests to Swift.
This client will exponentially slow down for retries.
:param conf_path: Full path to proxy config.
:param user_agent: User agent to be sent to requests to Swift.
:param request_tries: Number of tries before InternalClient.make_request()
gives up.
"""
def __init__(self, conf_path, user_agent, request_tries):
self.app = loadapp('config:' + conf_path)
self.user_agent = user_agent
self.request_tries = request_tries
def make_request(self, method, path, headers, acceptable_statuses,
body_file=None):
"""
Makes a request to Swift with retries.
:param method: HTTP method of request.
:param path: Path of request.
:param headers: Headers to be sent with request.
:param acceptable_statuses: List of acceptable statuses for request.
:param body_file: Body file to be passed along with request,
defaults to None.
:returns : Response object on success.
:raises UnexpectedResponse: Exception raised when make_request() fails
to get a response with an acceptable status
:raises Exception: Exception is raised when code fails in an
unexpected way.
"""
headers = dict(headers)
headers['user-agent'] = self.user_agent
resp = exc_type = exc_value = exc_traceback = None
for attempt in xrange(self.request_tries):
req = Request.blank(path, environ={'REQUEST_METHOD': method},
headers=headers)
if body_file is not None:
if hasattr(body_file, 'seek'):
body_file.seek(0)
req.body_file = body_file
try:
resp = req.get_response(self.app)
if resp.status_int in acceptable_statuses or \
resp.status_int // 100 in acceptable_statuses:
return resp
except (Exception, Timeout):
exc_type, exc_value, exc_traceback = exc_info()
sleep(2 ** (attempt + 1))
if resp:
raise UnexpectedResponse(_('Unexpected response: %s' %
(resp.status,)), resp)
if exc_type:
# To make pep8 tool happy, in place of raise t, v, tb:
raise exc_type(*exc_value.args), None, exc_traceback
def _get_metadata(self, path, metadata_prefix='',
acceptable_statuses=(2,)):
"""
Gets metadata by doing a HEAD on a path and using the metadata_prefix
to get values from the headers returned.
:param path: Path to do HEAD on.
:param metadata_prefix: Used to filter values from the headers
returned. Will strip that prefix from the
keys in the dict returned. Defaults to ''.
:param acceptable_statuses: List of status for valid responses,
defaults to (2,).
:returns : A dict of metadata with metadata_prefix stripped from keys.
:raises UnexpectedResponse: Exception raised when requests fail
to get a response with an acceptable status
:raises Exception: Exception is raised when code fails in an
unexpected way.
"""
resp = self.make_request('HEAD', path, {}, acceptable_statuses)
metadata_prefix = metadata_prefix.lower()
metadata = {}
for k, v in resp.headers.iteritems():
if k.lower().startswith(metadata_prefix):
metadata[k[len(metadata_prefix):]] = v
return metadata
def _iter_items(self, path, marker='', end_marker='',
acceptable_statuses=(2, HTTP_NOT_FOUND)):
"""
Returns an iterator of items from a json listing. Assumes listing has
'name' key defined and uses markers.
:param path: Path to do GET on.
:param marker: Prefix of first desired item, defaults to ''.
:param end_marker: Last item returned will be 'less' than this,
defaults to ''.
:param acceptable_statuses: List of status for valid responses,
defaults to (2, HTTP_NOT_FOUND).
:raises UnexpectedResponse: Exception raised when requests fail
to get a response with an acceptable status
:raises Exception: Exception is raised when code fails in an
unexpected way.
"""
while True:
resp = self.make_request('GET',
'%s?format=json&marker=%s&end_marker=%s' %
(path, quote(marker), quote(end_marker)),
{}, acceptable_statuses)
if resp.status_int != 200:
break
data = json.loads(resp.body)
if not data:
break
for item in data:
yield item
marker = data[-1]['name']
def _set_metadata(self, path, metadata, metadata_prefix='',
acceptable_statuses=(2,)):
"""
Sets metadata on path using metadata_prefix to set values in headers of
POST request.
:param path: Path to do POST on.
:param metadata: Dict of metadata to set.
:param metadata_prefix: Prefix used to set metdata values in headers of
requets, used to prefix keys in metadata when
setting metdata, defaults to ''.
:param acceptable_statuses: List of status for valid responses,
defaults to (2,).
:raises UnexpectedResponse: Exception raised when requests fail
to get a response with an acceptable status
:raises Exception: Exception is raised when code fails in an
unexpected way.
"""
headers = {}
for k, v in metadata.iteritems():
if k.lower().startswith(metadata_prefix):
headers[k] = v
else:
headers['%s%s' % (metadata_prefix, k)] = v
self.make_request('POST', path, headers, acceptable_statuses)
# account methods
def iter_containers(self, account, marker='', end_marker='',
acceptable_statuses=(2, HTTP_NOT_FOUND)):
"""
Returns an iterator of containers dicts from an account.
:param account: Account on which to do the container listing.
:param marker: Prefix of first desired item, defaults to ''.
:param end_marker: Last item returned will be 'less' than this,
defaults to ''.
:param acceptable_statuses: List of status for valid responses,
defaults to (2, HTTP_NOT_FOUND).
:raises UnexpectedResponse: Exception raised when requests fail
to get a response with an acceptable status
:raises Exception: Exception is raised when code fails in an
unexpected way.
"""
path = '/v1/%s' % (quote(account),)
return self._iter_items(path, marker, end_marker, acceptable_statuses)
def get_account_info(self, account,
acceptable_statuses=(2, HTTP_NOT_FOUND)):
"""
Returns (container_count, object_count) for an account.
:param account: Account on which to get the information.
:param acceptable_statuses: List of status for valid responses,
defaults to (2, HTTP_NOT_FOUND).
:raises UnexpectedResponse: Exception raised when requests fail
to get a response with an acceptable status
:raises Exception: Exception is raised when code fails in an
unexpected way.
"""
path = '/v1/%s' % (quote(account),)
resp = self.make_request('HEAD', path, {}, acceptable_statuses)
return (int(resp.headers.get('x-account-container-count', 0)),
int(resp.headers.get('x-account-object-count', 0)))
def get_account_metadata(self, account, metadata_prefix='',
acceptable_statuses=(2,)):
"""
Gets account metadata.
:param account: Account on which to get the metadata.
:param metadata_prefix: Used to filter values from the headers
returned. Will strip that prefix from the
keys in the dict returned. Defaults to ''.
:param acceptable_statuses: List of status for valid responses,
defaults to (2,).
:returns : Returns dict of account metadata.
:raises UnexpectedResponse: Exception raised when requests fail
to get a response with an acceptable status
:raises Exception: Exception is raised when code fails in an
unexpected way.
"""
path = '/v1/%s' % (quote(account))
return self._get_metadata(path, metadata_prefix, acceptable_statuses)
def set_account_metadata(self, account, metadata, metadata_prefix='',
acceptable_statuses=(2,)):
"""
Sets account metadata. A call to this will add to the account
metadata and not overwrite all of it with values in the metadata dict.
To clear an account metadata value, pass an empty string as
the value for the key in the metadata dict.
:param account: Account on which to get the metadata.
:param metadata: Dict of metadata to set.
:param metadata_prefix: Prefix used to set metdata values in headers of
requets, used to prefix keys in metadata when
setting metdata, defaults to ''.
:param acceptable_statuses: List of status for valid responses,
defaults to (2,).
:raises UnexpectedResponse: Exception raised when requests fail
to get a response with an acceptable status
:raises Exception: Exception is raised when code fails in an
unexpected way.
"""
path = '/v1/%s' % (quote(account))
self._set_metadata(path, metadata, metadata_prefix,
acceptable_statuses)
# container methods
def container_exists(self, account, container):
"""
Checks to see if a container exists.
:param account: The container's account.
:param container: Container to check.
:returns : True if container exists, false otherwise.
:raises UnexpectedResponse: Exception raised when requests fail
to get a response with an acceptable status
:raises Exception: Exception is raised when code fails in an
unexpected way.
"""
path = '/v1/%s/%s' % (quote(account), quote(container))
resp = self.make_request('HEAD', path, {}, (2, HTTP_NOT_FOUND))
return resp.status_int != HTTP_NOT_FOUND
def create_container(self, account, container, headers=None,
acceptable_statuses=(2,)):
"""
Creates container.
:param account: The container's account.
:param container: Container to create.
:param headers: Defaults to empty dict.
:param acceptable_statuses: List of status for valid responses,
defaults to (2,).
:raises UnexpectedResponse: Exception raised when requests fail
to get a response with an acceptable status
:raises Exception: Exception is raised when code fails in an
unexpected way.
"""
headers = headers or {}
path = '/v1/%s/%s' % (quote(account), quote(container))
self.make_request('PUT', path, headers, acceptable_statuses)
def delete_container(self, account, container,
acceptable_statuses=(2, HTTP_NOT_FOUND)):
"""
Deletes a container.
:param account: The container's account.
:param container: Container to delete.
:param acceptable_statuses: List of status for valid responses,
defaults to (2, HTTP_NOT_FOUND).
:raises UnexpectedResponse: Exception raised when requests fail
to get a response with an acceptable status
:raises Exception: Exception is raised when code fails in an
unexpected way.
"""
path = '/v1/%s/%s' % (quote(account), quote(container))
self.make_request('DELETE', path, {}, acceptable_statuses)
def get_container_metadata(self, account, container, metadata_prefix='',
acceptable_statuses=(2,)):
"""
Gets container metadata.
:param account: The container's account.
:param container: Container to get metadata on.
:param metadata_prefix: Used to filter values from the headers
returned. Will strip that prefix from the
keys in the dict returned. Defaults to ''.
:param acceptable_statuses: List of status for valid responses,
defaults to (2,).
:returns : Returns dict of container metadata.
:raises UnexpectedResponse: Exception raised when requests fail
to get a response with an acceptable status
:raises Exception: Exception is raised when code fails in an
unexpected way.
"""
path = '/v1/%s/%s' % (quote(account), quote(container))
return self._get_metadata(path, metadata_prefix, acceptable_statuses)
def iter_objects(self, account, container, marker='', end_marker='',
acceptable_statuses=(2, HTTP_NOT_FOUND)):
"""
Returns an iterator of object dicts from a container.
:param account: The container's account.
:param container: Container to iterate objects on.
:param marker: Prefix of first desired item, defaults to ''.
:param end_marker: Last item returned will be 'less' than this,
defaults to ''.
:param acceptable_statuses: List of status for valid responses,
defaults to (2, HTTP_NOT_FOUND).
:raises UnexpectedResponse: Exception raised when requests fail
to get a response with an acceptable status
:raises Exception: Exception is raised when code fails in an
unexpected way.
"""
path = '/v1/%s/%s' % (quote(account), quote(container))
return self._iter_items(path, marker, end_marker, acceptable_statuses)
def set_container_metadata(self, account, container, metadata,
metadata_prefix='', acceptable_statuses=(2,)):
"""
Sets container metadata. A call to this will add to the container
metadata and not overwrite all of it with values in the metadata dict.
To clear a container metadata value, pass an empty string as the value
for the key in the metadata dict.
:param account: The container's account.
:param container: Container to set metadata on.
:param metadata: Dict of metadata to set.
:param metadata_prefix: Prefix used to set metdata values in headers of
requets, used to prefix keys in metadata when
setting metdata, defaults to ''.
:param acceptable_statuses: List of status for valid responses,
defaults to (2,).
:raises UnexpectedResponse: Exception raised when requests fail
to get a response with an acceptable status
:raises Exception: Exception is raised when code fails in an
unexpected way.
"""
path = '/v1/%s/%s' % (quote(account), quote(container))
self._set_metadata(path, metadata, metadata_prefix,
acceptable_statuses)
# object methods
def delete_object(self, account, container, object_name,
acceptable_statuses=(2, HTTP_NOT_FOUND)):
"""
Deletes an object.
:param account: The object's account.
:param container: The object's container.
:param object_name: The object.
:param acceptable_statuses: List of status for valid responses,
defaults to (2, HTTP_NOT_FOUND).
:raises UnexpectedResponse: Exception raised when requests fail
to get a response with an acceptable status
:raises Exception: Exception is raised when code fails in an
unexpected way.
"""
path = '/v1/%s/%s/%s' % (quote(account), quote(container),
quote(object_name))
self.make_request('DELETE', path, {}, acceptable_statuses)
def get_object_metadata(self, account, container, object_name,
metadata_prefix='', acceptable_statuses=(2,)):
"""
Gets object metadata.
:param account: The object's account.
:param container: The object's container.
:param object_name: The object.
:param metadata_prefix: Used to filter values from the headers
returned. Will strip that prefix from the
keys in the dict returned. Defaults to ''.
:param acceptable_statuses: List of status for valid responses,
defaults to (2,).
:returns : Dict of object metadata.
:raises UnexpectedResponse: Exception raised when requests fail
to get a response with an acceptable status
:raises Exception: Exception is raised when code fails in an
unexpected way.
"""
path = '/v1/%s/%s/%s' % (quote(account), quote(container),
quote(object_name))
return self._get_metadata(path, metadata_prefix, acceptable_statuses)
def iter_object_lines(self, account, container, object_name, headers=None,
acceptable_statuses=(2,)):
"""
Returns an iterator of object lines from an uncompressed or compressed
text object.
Uncompress object as it is read if the object's name ends with '.gz'.
:param account: The object's account.
:param container: The object's container.
:param objec_namet: The object.
:param acceptable_statuses: List of status for valid responses,
defaults to (2,).
:raises UnexpectedResponse: Exception raised when requests fail
to get a response with an acceptable status
:raises Exception: Exception is raised when code fails in an
unexpected way.
"""
headers = headers or {}
path = '/v1/%s/%s/%s' % (quote(account), quote(container),
quote(object_name))
resp = self.make_request('GET', path, headers, acceptable_statuses)
last_part = ''
compressed = object_name.endswith('.gz')
# magic in the following zlib.decompressobj argument is courtesy of
# Python decompressing gzip chunk-by-chunk
# http://stackoverflow.com/questions/2423866
d = zlib.decompressobj(16 + zlib.MAX_WBITS)
for chunk in resp.app_iter:
if compressed:
chunk = d.decompress(chunk)
parts = chunk.split('\n')
if len(parts) == 1:
last_part = last_part + parts[0]
else:
parts[0] = last_part + parts[0]
for part in parts[:-1]:
yield part
last_part = parts[-1]
if last_part:
yield last_part
def set_object_metadata(self, account, container, object_name, metadata,
metadata_prefix='', acceptable_statuses=(2,)):
"""
Sets an object's metadata. The object's metadata will be overwritten
by the values in the metadata dict.
:param account: The object's account.
:param container: The object's container.
:param object_name: The object.
:param metadata: Dict of metadata to set.
:param metadata_prefix: Prefix used to set metdata values in headers of
requets, used to prefix keys in metadata when
setting metdata, defaults to ''.
:param acceptable_statuses: List of status for valid responses,
defaults to (2,).
:raises UnexpectedResponse: Exception raised when requests fail
to get a response with an acceptable status
:raises Exception: Exception is raised when code fails in an
unexpected way.
"""
path = '/v1/%s/%s/%s' % (quote(account), quote(container),
quote(object_name))
self._set_metadata(path, metadata, metadata_prefix,
acceptable_statuses)
def upload_object(self, fobj, account, container, object_name,
headers=None):
"""
:param fobj: File object to read object's content from.
:param account: The object's account.
:param container: The object's container.
:param object_name: The object.
:param headers: Headers to send with request, defaults ot empty dict.
:raises UnexpectedResponse: Exception raised when requests fail
to get a response with an acceptable status
:raises Exception: Exception is raised when code fails in an
unexpected way.
"""
headers = dict(headers or {})
headers['Transfer-Encoding'] = 'chunked'
path = '/v1/%s/%s/%s' % (quote(account), quote(container),
quote(object_name))
self.make_request('PUT', path, headers, (2,), fobj)

View File

@ -19,12 +19,12 @@ from time import time
from urllib import quote
from eventlet import sleep, Timeout
from paste.deploy import loadapp
from webob import Request
from swift.common.daemon import Daemon
from swift.common.internal_client import InternalClient
from swift.common.utils import get_logger
from swift.common.http import HTTP_NO_CONTENT, HTTP_NOT_FOUND, HTTP_CONFLICT, \
from swift.common.http import HTTP_NOT_FOUND, HTTP_CONFLICT, \
HTTP_PRECONDITION_FAILED
try:
@ -48,9 +48,10 @@ class ObjectExpirer(Daemon):
self.expiring_objects_account = \
(conf.get('auto_create_account_prefix') or '.') + \
'expiring_objects'
self.retries = int(conf.get('retries') or 3)
self.app = loadapp('config:' + (conf.get('__file__') or
'/etc/swift/object-expirer.conf'))
conf_path = conf.get('__file__') or '/etc/swift/object-expirer.conf'
request_tries = int(conf.get('request_tries') or 3)
self.swift = InternalClient(conf_path, 'Swift Object Expirer',
request_tries)
self.report_interval = int(conf.get('report_interval') or 300)
self.report_first_time = self.report_last_time = time()
self.report_objects = 0
@ -86,20 +87,26 @@ class ObjectExpirer(Daemon):
self.report_objects = 0
try:
self.logger.debug(_('Run begin'))
containers, objects = \
self.swift.get_account_info(self.expiring_objects_account)
self.logger.info(_('Pass beginning; %s possible containers; %s '
'possible objects') % self.get_account_info())
for container in self.iter_containers():
'possible objects') % (containers, objects))
for c in self.swift.iter_containers(self.expiring_objects_account):
container = c['name']
timestamp = int(container)
if timestamp > int(time()):
break
for obj in self.iter_objects(container):
for o in self.swift.iter_objects(self.expiring_objects_account,
container):
obj = o['name']
timestamp, actual_obj = obj.split('-', 1)
timestamp = int(timestamp)
if timestamp > int(time()):
break
try:
self.delete_actual_object(actual_obj, timestamp)
self.delete_object(container, obj)
self.swift.delete_object(self.expiring_objects_account,
container, obj)
self.report_objects += 1
except (Exception, Timeout), err:
self.logger.exception(
@ -107,7 +114,10 @@ class ObjectExpirer(Daemon):
(container, obj, str(err)))
self.report()
try:
self.delete_container(container)
self.swift.delete_container(
self.expiring_objects_account, container,
acceptable_statuses=(2, HTTP_NOT_FOUND,
HTTP_CONFLICT))
except (Exception, Timeout), err:
self.logger.exception(
_('Exception while deleting container %s %s') %
@ -137,80 +147,6 @@ class ObjectExpirer(Daemon):
if elapsed < self.interval:
sleep(random() * (self.interval - elapsed))
def get_response(self, method, path, headers, acceptable_statuses):
headers['user-agent'] = 'Swift Object Expirer'
resp = exc_type = exc_value = exc_traceback = None
for attempt in xrange(self.retries):
req = Request.blank(path, environ={'REQUEST_METHOD': method},
headers=headers)
try:
resp = req.get_response(self.app)
if resp.status_int in acceptable_statuses or \
resp.status_int // 100 in acceptable_statuses:
return resp
except (Exception, Timeout):
exc_type, exc_value, exc_traceback = exc_info()
sleep(2 ** (attempt + 1))
if resp:
raise Exception(_('Unexpected response %s') % (resp.status,))
if exc_type:
# To make pep8 tool happy, in place of raise t, v, tb:
raise exc_type(*exc_value.args), None, exc_traceback
def get_account_info(self):
"""
Returns (container_count, object_count) tuple indicating the values for
the hidden expiration account.
"""
resp = self.get_response('HEAD',
'/v1/' + quote(self.expiring_objects_account), {},
(2, HTTP_NOT_FOUND))
if resp.status_int == HTTP_NOT_FOUND:
return (0, 0)
return (int(resp.headers['x-account-container-count']),
int(resp.headers['x-account-object-count']))
def iter_containers(self):
"""
Returns an iterator of container names of the hidden expiration account
listing.
"""
path = '/v1/%s?format=json' % (quote(self.expiring_objects_account),)
marker = ''
while True:
resp = self.get_response('GET', path + '&marker=' + quote(marker),
{}, (2, HTTP_NOT_FOUND))
if resp.status_int in (HTTP_NO_CONTENT, HTTP_NOT_FOUND):
break
data = json.loads(resp.body)
if not data:
break
for item in data:
yield item['name']
marker = data[-1]['name']
def iter_objects(self, container):
"""
Returns an iterator of object names of the hidden expiration account's
container listing for the container name given.
:param container: The name of the container to list.
"""
path = '/v1/%s/%s?format=json' % \
(quote(self.expiring_objects_account), quote(container))
marker = ''
while True:
resp = self.get_response('GET', path + '&marker=' + quote(marker),
{}, (2, HTTP_NOT_FOUND))
if resp.status_int in (HTTP_NO_CONTENT, HTTP_NOT_FOUND):
break
data = json.loads(resp.body)
if not data:
break
for item in data:
yield item['name']
marker = data[-1]['name']
def delete_actual_object(self, actual_obj, timestamp):
"""
Deletes the end-user object indicated by the actual object name given
@ -222,29 +158,6 @@ class ObjectExpirer(Daemon):
:param timestamp: The timestamp the X-Delete-At value must match to
perform the actual delete.
"""
self.get_response('DELETE', '/v1/%s' % (quote(actual_obj),),
{'X-If-Delete-At': str(timestamp)},
(2, HTTP_NOT_FOUND, HTTP_PRECONDITION_FAILED))
def delete_object(self, container, obj):
"""
Deletes an object from the hidden expiring object account.
:param container: The name of the container for the object.
:param obj: The name of the object to delete.
"""
self.get_response('DELETE',
'/v1/%s/%s/%s' % (quote(self.expiring_objects_account),
quote(container), quote(obj)),
{}, (2, HTTP_NOT_FOUND))
def delete_container(self, container):
"""
Deletes a container from the hidden expiring object account.
:param container: The name of the container to delete.
"""
self.get_response('DELETE',
'/v1/%s/%s' % (quote(self.expiring_objects_account),
quote(container)),
{}, (2, HTTP_NOT_FOUND, HTTP_CONFLICT))
self.swift.make_request('DELETE', '/v1/%s' % (quote(actual_obj),),
{'X-If-Delete-At': str(timestamp)}, (2, HTTP_NOT_FOUND,
HTTP_PRECONDITION_FAILED))

View File

@ -18,6 +18,7 @@ from sys import exc_info
from time import time
from unittest import main, TestCase
from swift.common import internal_client
from swift.obj import expirer
from swift.proxy.server import Application
@ -51,34 +52,20 @@ class MockLogger(object):
self.exceptions.append('%s: %s' % (msg, exc_info()[1]))
class FakeRing(object):
def __init__(self):
self.devs = {1: {'ip': '10.0.0.1', 'port': 1000, 'device': 'sda'},
2: {'ip': '10.0.0.2', 'port': 1000, 'device': 'sda'},
3: {'ip': '10.0.0.3', 'port': 1000, 'device': 'sda'},
4: {'ip': '10.0.0.4', 'port': 1000, 'device': 'sda'}}
self.replica_count = 3
def get_nodes(self, account, container=None, obj=None):
return 1, [self.devs[i] for i in xrange(1, self.replica_count + 1)]
def get_part_nodes(self, part):
return self.get_nodes('')[1]
def get_more_nodes(self, nodes):
yield self.devs[self.replica_count + 1]
class TestObjectExpirer(TestCase):
def setUp(self):
self.orig_loadapp = expirer.loadapp
expirer.loadapp = lambda x: Application({}, account_ring=FakeRing(),
container_ring=FakeRing(), object_ring=FakeRing())
global not_sleep
def tearDown(self):
expirer.loadapp = self.orig_loadapp
self.old_loadapp = internal_client.loadapp
self.old_sleep = internal_client.sleep
internal_client.loadapp = lambda x: None
internal_client.sleep = not_sleep
def teardown(self):
internal_client.sleep = self.old_sleep
internal_client.loadapp = self.loadapp
def test_report(self):
x = expirer.ObjectExpirer({})
@ -102,16 +89,23 @@ class TestObjectExpirer(TestCase):
def test_run_once_nothing_to_do(self):
x = expirer.ObjectExpirer({})
x.logger = MockLogger()
x.get_account_info = 'throw error because a string is not callable'
x.swift = 'throw error because a string does not have needed methods'
x.run_once()
self.assertEquals(x.logger.exceptions,
["Unhandled exception: 'str' object is not callable"])
["Unhandled exception: 'str' object has no attribute "
"'get_account_info'"])
def test_run_once_calls_report(self):
class InternalClient(object):
def get_account_info(*a, **kw):
return 1, 2
def iter_containers(*a, **kw):
return []
x = expirer.ObjectExpirer({})
x.logger = MockLogger()
x.get_account_info = lambda: (1, 2)
x.iter_containers = lambda: []
x.swift = InternalClient()
x.run_once()
self.assertEquals(x.logger.exceptions, [])
self.assertEquals(x.logger.infos,
@ -119,15 +113,22 @@ class TestObjectExpirer(TestCase):
'Pass completed in 0s; 0 objects expired'])
def test_container_timestamp_break(self):
class InternalClient(object):
def __init__(self, containers):
self.containers = containers
def should_not_get_called(container):
raise Exception('This should not have been called')
def get_account_info(*a, **kw):
return 1, 2
def iter_containers(self, *a, **kw):
return self.containers
def iter_objects(*a, **kw):
raise Exception('This should not have been called')
x = expirer.ObjectExpirer({})
x.logger = MockLogger()
x.get_account_info = lambda: (1, 2)
x.iter_containers = lambda: [str(int(time() + 86400))]
x.iter_objects = should_not_get_called
x.swift = InternalClient([{'name': str(int(time() + 86400))}])
x.run_once()
self.assertEquals(x.logger.exceptions, [])
self.assertEquals(x.logger.infos,
@ -137,26 +138,37 @@ class TestObjectExpirer(TestCase):
# Reverse test to be sure it still would blow up the way expected.
x = expirer.ObjectExpirer({})
x.logger = MockLogger()
x.get_account_info = lambda: (1, 2)
x.iter_containers = lambda: [str(int(time() - 86400))]
x.iter_objects = should_not_get_called
x.swift = InternalClient([{'name': str(int(time() - 86400))}])
x.run_once()
self.assertEquals(x.logger.exceptions,
['Unhandled exception: This should not have been called'])
def test_object_timestamp_break(self):
class InternalClient(object):
def __init__(self, containers, objects):
self.containers = containers
self.objects = objects
def should_not_get_called(actual_obj, timestamp):
def get_account_info(*a, **kw):
return 1, 2
def iter_containers(self, *a, **kw):
return self.containers
def delete_container(*a, **kw):
pass
def iter_objects(self, *a, **kw):
return self.objects
def should_not_be_called(*a, **kw):
raise Exception('This should not have been called')
x = expirer.ObjectExpirer({})
x.logger = MockLogger()
x.get_account_info = lambda: (1, 2)
x.iter_containers = lambda: [str(int(time() - 86400))]
x.iter_objects = lambda c: ['%d-actual-obj' % int(time() + 86400)]
x.delete_actual_object = should_not_get_called
x.delete_container = lambda c: None
x.swift = InternalClient([{'name': str(int(time() - 86400))}],
[{'name': '%d-actual-obj' % int(time() + 86400)}])
x.delete_actual_object = should_not_be_called
x.run_once()
self.assertEquals(x.logger.exceptions, [])
self.assertEquals(x.logger.infos,
@ -166,18 +178,35 @@ class TestObjectExpirer(TestCase):
# Reverse test to be sure it still would blow up the way expected.
x = expirer.ObjectExpirer({})
x.logger = MockLogger()
x.get_account_info = lambda: (1, 2)
x.iter_containers = lambda: [str(int(time() - 86400))]
ts = int(time() - 86400)
x.iter_objects = lambda c: ['%d-actual-obj' % ts]
x.delete_actual_object = should_not_get_called
x.delete_container = lambda c: None
x.swift = InternalClient([{'name': str(int(time() - 86400))}],
[{'name': '%d-actual-obj' % ts}])
x.delete_actual_object = should_not_be_called
x.run_once()
self.assertEquals(x.logger.exceptions, ['Exception while deleting '
'object %d %d-actual-obj This should not have been called: This '
'should not have been called' % (ts, ts)])
def test_failed_delete_keeps_entry(self):
class InternalClient(object):
def __init__(self, containers, objects):
self.containers = containers
self.objects = objects
def get_account_info(*a, **kw):
return 1, 2
def iter_containers(self, *a, **kw):
return self.containers
def delete_container(*a, **kw):
pass
def delete_object(*a, **kw):
raise Exception('This should not have been called')
def iter_objects(self, *a, **kw):
return self.objects
def deliberately_blow_up(actual_obj, timestamp):
raise Exception('failed to delete actual object')
@ -187,13 +216,11 @@ class TestObjectExpirer(TestCase):
x = expirer.ObjectExpirer({})
x.logger = MockLogger()
x.get_account_info = lambda: (1, 2)
x.iter_containers = lambda: [str(int(time() - 86400))]
ts = int(time() - 86400)
x.iter_objects = lambda c: ['%d-actual-obj' % ts]
x.delete_actual_object = deliberately_blow_up
x.delete_object = should_not_get_called
x.delete_container = lambda c: None
x.swift = InternalClient([{'name': str(int(time() - 86400))}],
[{'name': '%d-actual-obj' % ts}])
x.run_once()
self.assertEquals(x.logger.exceptions, ['Exception while deleting '
'object %d %d-actual-obj failed to delete actual object: failed '
@ -205,28 +232,42 @@ class TestObjectExpirer(TestCase):
# Reverse test to be sure it still would blow up the way expected.
x = expirer.ObjectExpirer({})
x.logger = MockLogger()
x.get_account_info = lambda: (1, 2)
x.iter_containers = lambda: [str(int(time() - 86400))]
ts = int(time() - 86400)
x.iter_objects = lambda c: ['%d-actual-obj' % ts]
x.delete_actual_object = lambda o, t: None
x.delete_object = should_not_get_called
x.delete_container = lambda c: None
x.swift = InternalClient([{'name': str(int(time() - 86400))}],
[{'name': '%d-actual-obj' % ts}])
x.run_once()
self.assertEquals(x.logger.exceptions, ['Exception while deleting '
'object %d %d-actual-obj This should not have been called: This '
'should not have been called' % (ts, ts)])
def test_success_gets_counted(self):
class InternalClient(object):
def __init__(self, containers, objects):
self.containers = containers
self.objects = objects
def get_account_info(*a, **kw):
return 1, 2
def iter_containers(self, *a, **kw):
return self.containers
def delete_container(*a, **kw):
pass
def delete_object(*a, **kw):
pass
def iter_objects(self, *a, **kw):
return self.objects
x = expirer.ObjectExpirer({})
x.logger = MockLogger()
x.get_account_info = lambda: (1, 2)
x.iter_containers = lambda: [str(int(time() - 86400))]
x.iter_objects = lambda c: ['%d-actual-obj' % int(time() - 86400)]
x.delete_actual_object = lambda o, t: None
x.delete_object = lambda c, o: None
x.delete_container = lambda c: None
self.assertEquals(x.report_objects, 0)
x.swift = InternalClient([{'name': str(int(time() - 86400))}],
[{'name': '%d-actual-obj' % int(time() - 86400)}])
x.run_once()
self.assertEquals(x.report_objects, 1)
self.assertEquals(x.logger.exceptions, [])
@ -235,23 +276,47 @@ class TestObjectExpirer(TestCase):
'Pass completed in 0s; 1 objects expired'])
def test_failed_delete_continues_on(self):
class InternalClient(object):
def __init__(self, containers, objects):
self.containers = containers
self.objects = objects
def get_account_info(*a, **kw):
return 1, 2
def iter_containers(self, *a, **kw):
return self.containers
def delete_container(*a, **kw):
raise Exception('failed to delete container')
def delete_object(*a, **kw):
pass
def iter_objects(self, *a, **kw):
return self.objects
def fail_delete_actual_object(actual_obj, timestamp):
raise Exception('failed to delete actual object')
def fail_delete_container(container):
raise Exception('failed to delete container')
x = expirer.ObjectExpirer({})
x.logger = MockLogger()
x.get_account_info = lambda: (1, 2)
cts = int(time() - 86400)
x.iter_containers = lambda: [str(cts), str(cts + 1)]
ots = int(time() - 86400)
x.iter_objects = lambda c: ['%d-actual-obj' % ots, '%d-next-obj' % ots]
containers = [
{'name': str(cts)},
{'name': str(cts + 1)},
]
objects = [
{'name': '%d-actual-obj' % ots},
{'name': '%d-next-obj' % ots}
]
x.swift = InternalClient(containers, objects)
x.delete_actual_object = fail_delete_actual_object
x.delete_object = lambda c, o: None
x.delete_container = fail_delete_container
x.run_once()
self.assertEquals(x.logger.exceptions, [
'Exception while deleting object %d %d-actual-obj failed to '
@ -320,179 +385,7 @@ class TestObjectExpirer(TestCase):
expirer.sleep = orig_sleep
self.assertEquals(str(err), 'exiting exception 2')
self.assertEquals(x.logger.exceptions,
['Unhandled exception: exception 1'])
def test_get_response_sets_user_agent(self):
env_given = [None]
def fake_app(env, start_response):
env_given[0] = env
start_response('200 Ok', [('Content-Length', '0')])
return []
x = expirer.ObjectExpirer({})
x.app = fake_app
resp = x.get_response('GET', '/', {}, (200,))
self.assertEquals(env_given[0]['HTTP_USER_AGENT'],
'Swift Object Expirer')
def test_get_response_retries(self):
global last_not_sleep
tries = [0]
def fake_app(env, start_response):
tries[0] += 1
if tries[0] < 3:
start_response('500 Internal Server Error',
[('Content-Length', '0')])
else:
start_response('200 Ok', [('Content-Length', '0')])
return []
x = expirer.ObjectExpirer({})
x.app = fake_app
orig_sleep = expirer.sleep
try:
expirer.sleep = not_sleep
resp = x.get_response('GET', '/', {}, (200,))
finally:
expirer.sleep = orig_sleep
self.assertEquals(tries[0], 3)
self.assertEquals(last_not_sleep, 4)
def test_get_response_method_path_headers(self):
env_given = [None]
def fake_app(env, start_response):
env_given[0] = env
start_response('200 Ok', [('Content-Length', '0')])
return []
x = expirer.ObjectExpirer({})
x.app = fake_app
for method in ('GET', 'SOMETHINGELSE'):
resp = x.get_response(method, '/', {}, (200,))
self.assertEquals(env_given[0]['REQUEST_METHOD'], method)
for path in ('/one', '/two/three'):
resp = x.get_response('GET', path, {'X-Test': path}, (200,))
self.assertEquals(env_given[0]['PATH_INFO'], path)
self.assertEquals(env_given[0]['HTTP_X_TEST'], path)
def test_get_response_codes(self):
def fake_app(env, start_response):
start_response('200 Ok', [('Content-Length', '0')])
return []
x = expirer.ObjectExpirer({})
x.app = fake_app
orig_sleep = expirer.sleep
try:
expirer.sleep = not_sleep
resp = x.get_response('GET', '/', {}, (200,))
resp = x.get_response('GET', '/', {}, (2,))
resp = x.get_response('GET', '/', {}, (400, 200))
resp = x.get_response('GET', '/', {}, (400, 2))
try:
resp = x.get_response('GET', '/', {}, (400,))
except Exception, err:
exc = err
self.assertEquals(str(err), 'Unexpected response 200 Ok')
try:
resp = x.get_response('GET', '/', {}, (201,))
except Exception, err:
exc = err
self.assertEquals(str(err), 'Unexpected response 200 Ok')
finally:
expirer.sleep = orig_sleep
def test_get_account_info(self):
def fake_app(env, start_response):
start_response('200 Ok', [('Content-Length', '0'),
('X-Account-Container-Count', '80'),
('X-Account-Object-Count', '90')])
return []
x = expirer.ObjectExpirer({})
x.app = fake_app
self.assertEquals(x.get_account_info(), (80, 90))
def test_get_account_info_handles_404(self):
def fake_app(env, start_response):
start_response('404 Not Found', [('Content-Length', '0')])
return []
x = expirer.ObjectExpirer({})
x.app = fake_app
self.assertEquals(x.get_account_info(), (0, 0))
def test_iter_containers(self):
calls = [0]
def fake_app(env, start_response):
calls[0] += 1
if calls[0] == 1:
body = json.dumps([{'name': 'one'}, {'name': 'two'}])
start_response('200 Ok', [('Content-Length', str(len(body)))])
return [body]
elif calls[0] == 2:
body = json.dumps([{'name': 'three'}, {'name': 'four'}])
start_response('200 Ok', [('Content-Length', str(len(body)))])
return [body]
elif calls[0] == 3:
start_response('204 Ok', [('Content-Length', '0')])
return []
raise Exception('Should not get here')
x = expirer.ObjectExpirer({})
x.app = fake_app
self.assertEquals(list(x.iter_containers()),
['one', 'two', 'three', 'four'])
def test_iter_containers_handles_404(self):
def fake_app(env, start_response):
start_response('404 Not Found', [('Content-Length', '0')])
return []
x = expirer.ObjectExpirer({})
x.app = fake_app
self.assertEquals(list(x.iter_containers()), [])
def test_iter_objects(self):
calls = [0]
def fake_app(env, start_response):
calls[0] += 1
if calls[0] == 1:
body = json.dumps([{'name': 'one'}, {'name': 'two'}])
start_response('200 Ok', [('Content-Length', str(len(body)))])
return [body]
elif calls[0] == 2:
body = json.dumps([{'name': 'three'}, {'name': 'four'}])
start_response('200 Ok', [('Content-Length', str(len(body)))])
return [body]
elif calls[0] == 3:
start_response('204 Ok', [('Content-Length', '0')])
return []
raise Exception('Should not get here')
x = expirer.ObjectExpirer({})
x.app = fake_app
self.assertEquals(list(x.iter_objects('container')),
['one', 'two', 'three', 'four'])
def test_iter_objects_handles_404(self):
def fake_app(env, start_response):
start_response('404 Not Found', [('Content-Length', '0')])
return []
x = expirer.ObjectExpirer({})
x.app = fake_app
self.assertEquals(list(x.iter_objects('container')), [])
['Unhandled exception: exception 1'])
def test_delete_actual_object(self):
got_env = [None]
@ -502,8 +395,9 @@ class TestObjectExpirer(TestCase):
start_response('204 No Content', [('Content-Length', '0')])
return []
internal_client.loadapp = lambda x: fake_app
x = expirer.ObjectExpirer({})
x.app = fake_app
ts = '1234'
x.delete_actual_object('/path/to/object', ts)
self.assertEquals(got_env[0]['HTTP_X_IF_DELETE_AT'], ts)
@ -514,8 +408,9 @@ class TestObjectExpirer(TestCase):
start_response('404 Not Found', [('Content-Length', '0')])
return []
internal_client.loadapp = lambda x: fake_app
x = expirer.ObjectExpirer({})
x.app = fake_app
x.delete_actual_object('/path/to/object', '1234')
def test_delete_actual_object_handles_412(self):
@ -525,127 +420,29 @@ class TestObjectExpirer(TestCase):
[('Content-Length', '0')])
return []
internal_client.loadapp = lambda x: fake_app
x = expirer.ObjectExpirer({})
x.app = fake_app
x.delete_actual_object('/path/to/object', '1234')
def test_delete_actual_object_does_not_handle_odd_stuff(self):
def fake_app(env, start_response):
start_response('503 Internal Server Error',
[('Content-Length', '0')])
[('Content-Length', '0')])
return []
internal_client.loadapp = lambda x: fake_app
x = expirer.ObjectExpirer({})
x.app = fake_app
orig_sleep = expirer.sleep
exc = None
try:
expirer.sleep = not_sleep
x.delete_actual_object('/path/to/object', '1234')
except Exception, err:
exc = err
finally:
expirer.sleep = orig_sleep
self.assertEquals(str(err),
'Unexpected response 503 Internal Server Error')
def test_delete_object(self):
got_env = [None]
def fake_app(env, start_response):
got_env[0] = env
start_response('204 No Content', [('Content-Length', '0')])
return []
x = expirer.ObjectExpirer({})
x.app = fake_app
x.delete_object('container', 'object')
def test_delete_object_handles_404(self):
def fake_app(env, start_response):
start_response('404 Not Found', [('Content-Length', '0')])
return []
x = expirer.ObjectExpirer({})
x.app = fake_app
x.delete_object('container', 'object')
def test_delete_object_does_not_handle_odd_stuff(self):
def fake_app(env, start_response):
start_response('503 Internal Server Error',
[('Content-Length', '0')])
return []
x = expirer.ObjectExpirer({})
x.app = fake_app
orig_sleep = expirer.sleep
exc = None
try:
expirer.sleep = not_sleep
x.delete_object('container', 'object')
except Exception, err:
exc = err
finally:
expirer.sleep = orig_sleep
self.assertEquals(str(err),
'Unexpected response 503 Internal Server Error')
def test_delete_container(self):
got_env = [None]
def fake_app(env, start_response):
got_env[0] = env
start_response('204 No Content', [('Content-Length', '0')])
return []
x = expirer.ObjectExpirer({})
x.app = fake_app
x.delete_container('container')
def test_delete_container_handles_404(self):
def fake_app(env, start_response):
start_response('404 Not Found', [('Content-Length', '0')])
return []
x = expirer.ObjectExpirer({})
x.app = fake_app
x.delete_container('container')
def test_delete_container_handles_409(self):
def fake_app(env, start_response):
start_response('409 Conflict', [('Content-Length', '0')])
return []
x = expirer.ObjectExpirer({})
x.app = fake_app
x.delete_container('container')
def test_delete_container_does_not_handle_odd_stuff(self):
def fake_app(env, start_response):
start_response('503 Internal Server Error',
[('Content-Length', '0')])
return []
x = expirer.ObjectExpirer({})
x.app = fake_app
orig_sleep = expirer.sleep
exc = None
try:
expirer.sleep = not_sleep
x.delete_container('container')
except Exception, err:
exc = err
finally:
expirer.sleep = orig_sleep
self.assertEquals(str(err),
'Unexpected response 503 Internal Server Error')
pass
self.assertEquals(503, exc.resp.status_int)
if __name__ == '__main__':
main()

View File

@ -0,0 +1,794 @@
# Copyright (c) 2010-2012 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from StringIO import StringIO
import unittest
import zlib
from swift.common import internal_client
def not_sleep(seconds):
pass
class GetMetadataInternalClient(internal_client.InternalClient):
def __init__(self, test, path, metadata_prefix, acceptable_statuses):
self.test = test
self.path = path
self.metadata_prefix = metadata_prefix
self.acceptable_statuses = acceptable_statuses
self.get_metadata_called = 0
self.metadata = 'some_metadata'
def _get_metadata(self, path, metadata_prefix, acceptable_statuses=None):
self.get_metadata_called += 1
self.test.assertEquals(self.path, path)
self.test.assertEquals(self.metadata_prefix, metadata_prefix)
self.test.assertEquals(self.acceptable_statuses, acceptable_statuses)
return self.metadata
class SetMetadataInternalClient(internal_client.InternalClient):
def __init__(self, test, path, metadata, metadata_prefix,
acceptable_statuses):
self.test = test
self.path = path
self.metadata = metadata
self.metadata_prefix = metadata_prefix
self.acceptable_statuses = acceptable_statuses
self.set_metadata_called = 0
self.metadata = 'some_metadata'
def _set_metadata(self, path, metadata, metadata_prefix='',
acceptable_statuses=None):
self.set_metadata_called += 1
self.test.assertEquals(self.path, path)
self.test.assertEquals(self.metadata_prefix, metadata_prefix)
self.test.assertEquals(self.metadata, metadata)
self.test.assertEquals(self.acceptable_statuses, acceptable_statuses)
class IterInternalClient(internal_client.InternalClient):
def __init__(self, test, path, marker, end_marker, acceptable_statuses,
items):
self.test = test
self.path = path
self.marker = marker
self.end_marker = end_marker
self.acceptable_statuses = acceptable_statuses
self.items = items
def _iter_items(self, path, marker='', end_marker='',
acceptable_statuses=None):
self.test.assertEquals(self.path, path)
self.test.assertEquals(self.marker, marker)
self.test.assertEquals(self.end_marker, end_marker)
self.test.assertEquals(self.acceptable_statuses, acceptable_statuses)
for item in self.items:
yield item
class TestCompressingfileReader(unittest.TestCase):
def test_init(self):
class CompressObj(object):
def __init__(self, test, *args):
self.test = test
self.args = args
def method(self, *args):
self.test.assertEquals(self.args, args)
return self
try:
compressobj = CompressObj(self, 9, zlib.DEFLATED, -zlib.MAX_WBITS,
zlib.DEF_MEM_LEVEL, 0)
old_compressobj = internal_client.compressobj
internal_client.compressobj = compressobj.method
f = StringIO('')
fobj = internal_client.CompressingFileReader(f)
self.assertEquals(f, fobj._f)
self.assertEquals(compressobj, fobj._compressor)
self.assertEquals(False, fobj.done)
self.assertEquals(True, fobj.first)
self.assertEquals(0, fobj.crc32)
self.assertEquals(0, fobj.total_size)
finally:
internal_client.compressobj = old_compressobj
def test_read(self):
exp_data = 'abcdefghijklmnopqrstuvwxyz'
fobj = internal_client.CompressingFileReader(StringIO(exp_data))
data = ''
d = zlib.decompressobj(16 + zlib.MAX_WBITS)
for chunk in fobj.read():
data += d.decompress(chunk)
self.assertEquals(exp_data, data)
class TestInternalClient(unittest.TestCase):
def test_init(self):
class App(object):
def __init__(self, test, conf_path):
self.test = test
self.conf_path = conf_path
self.load_called = 0
def load(self, uri):
self.load_called += 1
self.test.assertEquals('config:' + conf_path, uri)
return self
conf_path = 'some_path'
app = App(self, conf_path)
old_loadapp = internal_client.loadapp
internal_client.loadapp = app.load
user_agent = 'some_user_agent'
request_tries = 'some_request_tries'
try:
client = internal_client.InternalClient(conf_path, user_agent,
request_tries)
finally:
internal_client.loadapp = old_loadapp
self.assertEquals(1, app.load_called)
self.assertEquals(app, client.app)
self.assertEquals(user_agent, client.user_agent)
self.assertEquals(request_tries, client.request_tries)
def test_make_request_sets_user_agent(self):
class InternalClient(internal_client.InternalClient):
def __init__(self, test):
self.test = test
self.app = self.fake_app
self.user_agent = 'some_agent'
self.request_tries = 1
def fake_app(self, env, start_response):
self.test.assertEquals(self.user_agent, env['HTTP_USER_AGENT'])
start_response('200 Ok', [{'Content-Length': '0'}])
return []
client = InternalClient(self)
client.make_request('GET', '/', {}, (200,))
def test_make_request_retries(self):
class InternalClient(internal_client.InternalClient):
def __init__(self, test):
self.test = test
self.app = self.fake_app
self.user_agent = 'some_agent'
self.request_tries = 4
self.tries = 0
self.sleep_called = 0
def fake_app(self, env, start_response):
self.tries += 1
if self.tries < self.request_tries:
start_response('500 Internal Server Error',
[('Content-Length', '0')])
else:
start_response('200 Ok', [('Content-Length', '0')])
return []
def sleep(self, seconds):
self.sleep_called += 1
self.test.assertEquals(2 ** (self.sleep_called), seconds)
client = InternalClient(self)
old_sleep = internal_client.sleep
internal_client.sleep = client.sleep
try:
client.make_request('GET', '/', {}, (200,))
finally:
internal_client.sleep = old_sleep
self.assertEquals(3, client.sleep_called)
self.assertEquals(4, client.tries)
def test_make_request_method_path_headers(self):
class InternalClient(internal_client.InternalClient):
def __init__(self):
self.app = self.fake_app
self.user_agent = 'some_agent'
self.request_tries = 3
self.env = None
def fake_app(self, env, start_response):
self.env = env
start_response('200 Ok', [('Content-Length', '0')])
return []
client = InternalClient()
for method in 'GET PUT HEAD'.split():
client.make_request(method, '/', {}, (200,))
self.assertEquals(client.env['REQUEST_METHOD'], method)
for path in '/one /two/three'.split():
client.make_request('GET', path, {'X-Test': path}, (200,))
self.assertEquals(client.env['PATH_INFO'], path)
self.assertEquals(client.env['HTTP_X_TEST'], path)
def test_make_request_codes(self):
class InternalClient(internal_client.InternalClient):
def __init__(self):
self.app = self.fake_app
self.user_agent = 'some_agent'
self.request_tries = 3
def fake_app(self, env, start_response):
start_response('200 Ok', [('Content-Length', '0')])
return []
client = InternalClient()
try:
old_sleep = internal_client.sleep
internal_client.sleep = not_sleep
client.make_request('GET', '/', {}, (200,))
client.make_request('GET', '/', {}, (2,))
client.make_request('GET', '/', {}, (400, 200))
client.make_request('GET', '/', {}, (400, 2))
try:
client.make_request('GET', '/', {}, (400,))
except Exception, err:
exc = err
self.assertEquals(200, err.resp.status_int)
try:
client.make_request('GET', '/', {}, (201,))
except Exception, err:
exc = err
self.assertEquals(200, err.resp.status_int)
finally:
internal_client.sleep = old_sleep
def test_make_request_calls_fobj_seek_each_try(self):
class FileObject(object):
def __init__(self, test):
self.test = test
self.seek_called = 0
def seek(self, offset, whence=0):
self.seek_called += 1
self.test.assertEquals(0, offset)
self.test.assertEquals(0, whence)
class InternalClient(internal_client.InternalClient):
def __init__(self):
self.app = self.fake_app
self.user_agent = 'some_agent'
self.request_tries = 3
def fake_app(self, env, start_response):
start_response('404 Not Found', [('Content-Length', '0')])
return []
fobj = FileObject(self)
client = InternalClient()
try:
old_sleep = internal_client.sleep
internal_client.sleep = not_sleep
try:
client.make_request('PUT', '/', {}, (2,), fobj)
except Exception, err:
exc = err
self.assertEquals(404, err.resp.status_int)
finally:
internal_client.sleep = old_sleep
self.assertEquals(client.request_tries, fobj.seek_called)
def test_make_request_request_exception(self):
class InternalClient(internal_client.InternalClient):
def __init__(self):
self.app = self.fake_app
self.user_agent = 'some_agent'
self.request_tries = 3
def fake_app(self, env, start_response):
raise Exception()
client = InternalClient()
try:
old_sleep = internal_client.sleep
internal_client.sleep = not_sleep
self.assertRaises(Exception,
client.make_request, 'GET', '/', {}, (2,))
finally:
internal_client.sleep = old_sleep
def test_get_metadata(self):
class Response(object):
def __init__(self, headers):
self.headers = headers
class InternalClient(internal_client.InternalClient):
def __init__(self, test, path, resp_headers):
self.test = test
self.path = path
self.resp_headers = resp_headers
self.make_request_called = 0
def make_request(self, method, path, headers, acceptable_statuses,
body_file=None):
self.make_request_called += 1
self.test.assertEquals('HEAD', method)
self.test.assertEquals(self.path, path)
self.test.assertEquals((2,), acceptable_statuses)
self.test.assertEquals(None, body_file)
return Response(self.resp_headers)
path = 'some_path'
metadata_prefix = 'some_key-'
resp_headers = {
'%sone' % (metadata_prefix): '1',
'%stwo' % (metadata_prefix): '2',
'%sthree' % (metadata_prefix): '3',
'some_header-four': '4',
'some_header-five': '5',
}
exp_metadata = {
'one': '1',
'two': '2',
'three': '3',
}
client = InternalClient(self, path, resp_headers)
metadata = client._get_metadata(path, metadata_prefix)
self.assertEquals(exp_metadata, metadata)
self.assertEquals(1, client.make_request_called)
def test_iter_items(self):
class Response(object):
def __init__(self, status_int, body):
self.status_int = status_int
self.body = body
class InternalClient(internal_client.InternalClient):
def __init__(self, test, responses):
self.test = test
self.responses = responses
self.make_request_called = 0
def make_request(self, method, path, headers, acceptable_statuses,
body_file=None):
self.make_request_called += 1
return self.responses.pop(0)
exp_items = []
responses = [Response(200, json.dumps([])), ]
items = []
client = InternalClient(self, responses)
for item in client._iter_items('/'):
items.append(item)
self.assertEquals(exp_items, items)
exp_items = []
responses = []
for i in xrange(3):
data = [{'name': 'item%02d' % (2 * i)},
{'name': 'item%02d' % (2 * i + 1)}]
responses.append(Response(200, json.dumps(data)))
exp_items.extend(data)
responses.append(Response(204, ''))
items = []
client = InternalClient(self, responses)
for item in client._iter_items('/'):
items.append(item)
self.assertEquals(exp_items, items)
def test_iter_items_with_markers(self):
class Response(object):
def __init__(self, status_int, body):
self.status_int = status_int
self.body = body
class InternalClient(internal_client.InternalClient):
def __init__(self, test, paths, responses):
self.test = test
self.paths = paths
self.responses = responses
def make_request(self, method, path, headers, acceptable_statuses,
body_file=None):
exp_path = self.paths.pop(0)
self.test.assertEquals(exp_path, path)
return self.responses.pop(0)
paths = [
'/?format=json&marker=start&end_marker=end',
'/?format=json&marker=one&end_marker=end',
'/?format=json&marker=two&end_marker=end',
]
responses = [
Response(200, json.dumps([{'name': 'one'}, ])),
Response(200, json.dumps([{'name': 'two'}, ])),
Response(204, ''),
]
items = []
client = InternalClient(self, paths, responses)
for item in client._iter_items('/', marker='start', end_marker='end'):
items.append(item['name'])
self.assertEquals('one two'.split(), items)
def test_set_metadata(self):
class InternalClient(internal_client.InternalClient):
def __init__(self, test, path, exp_headers):
self.test = test
self.path = path
self.exp_headers = exp_headers
self.make_request_called = 0
def make_request(self, method, path, headers, acceptable_statuses,
body_file=None):
self.make_request_called += 1
self.test.assertEquals('POST', method)
self.test.assertEquals(self.path, path)
self.test.assertEquals(self.exp_headers, headers)
self.test.assertEquals((2,), acceptable_statuses)
self.test.assertEquals(None, body_file)
path = 'some_path'
metadata_prefix = 'some_key-'
metadata = {
'%sone' % (metadata_prefix): '1',
'%stwo' % (metadata_prefix): '2',
'three': '3',
}
exp_headers = {
'%sone' % (metadata_prefix): '1',
'%stwo' % (metadata_prefix): '2',
'%sthree' % (metadata_prefix): '3',
}
client = InternalClient(self, path, exp_headers)
client._set_metadata(path, metadata, metadata_prefix)
self.assertEquals(1, client.make_request_called)
def test_iter_containers(self):
account = 'some_account'
path = '/v1/%s' % (account)
items = '0 1 2'.split()
marker = 'some_marker'
end_marker = 'some_end_marker'
acceptable_statuses = 'some_status_list'
client = IterInternalClient(self, path, marker, end_marker,
acceptable_statuses, items)
ret_items = []
for container in client.iter_containers(account, marker, end_marker,
acceptable_statuses=acceptable_statuses):
ret_items.append(container)
self.assertEquals(items, ret_items)
def test_get_account_info(self):
class Response(object):
def __init__(self, containers, objects):
self.headers = {
'x-account-container-count': containers,
'x-account-object-count': objects,
}
class InternalClient(internal_client.InternalClient):
def __init__(self, test, path, resp):
self.test = test
self.path = path
self.resp = resp
def make_request(self, method, path, headers, acceptable_statuses,
body_file=None):
self.test.assertEquals('HEAD', method)
self.test.assertEquals(self.path, path)
self.test.assertEquals({}, headers)
self.test.assertEquals((2, 404), acceptable_statuses)
self.test.assertEquals(None, body_file)
return self.resp
account = 'some_account'
path = '/v1/%s' % (account)
containers, objects = 10, 100
client = InternalClient(self, path, Response(containers, objects))
info = client.get_account_info(account)
self.assertEquals((containers, objects), info)
def test_get_account_metadata(self):
account = 'some_account'
path = '/v1/%s' % (account)
acceptable_statuses = 'some_status_list'
metadata_prefix = 'some_metadata_prefix'
client = GetMetadataInternalClient(self, path, metadata_prefix,
acceptable_statuses)
metadata = client.get_account_metadata(account, metadata_prefix,
acceptable_statuses)
self.assertEquals(client.metadata, metadata)
self.assertEquals(1, client.get_metadata_called)
def test_set_account_metadata(self):
account = 'some_account'
path = '/v1/%s' % (account)
metadata = 'some_metadata'
metadata_prefix = 'some_metadata_prefix'
acceptable_statuses = 'some_status_list'
client = SetMetadataInternalClient(self, path, metadata,
metadata_prefix, acceptable_statuses)
client.set_account_metadata(account, metadata, metadata_prefix,
acceptable_statuses)
self.assertEquals(1, client.set_metadata_called)
def test_container_exists(self):
class Response(object):
def __init__(self, status_int):
self.status_int = status_int
class InternalClient(internal_client.InternalClient):
def __init__(self, test, path, resp):
self.test = test
self.path = path
self.make_request_called = 0
self.resp = resp
def make_request(self, method, path, headers, acceptable_statuses,
body_file=None):
self.make_request_called += 1
self.test.assertEquals('HEAD', method)
self.test.assertEquals(self.path, path)
self.test.assertEquals({}, headers)
self.test.assertEquals((2, 404), acceptable_statuses)
self.test.assertEquals(None, body_file)
return self.resp
account = 'some_account'
container = 'some_container'
path = '/v1/%s/%s' % (account, container)
client = InternalClient(self, path, Response(200))
self.assertEquals(True, client.container_exists(account, container))
self.assertEquals(1, client.make_request_called)
client = InternalClient(self, path, Response(404))
self.assertEquals(False, client.container_exists(account, container))
self.assertEquals(1, client.make_request_called)
def test_create_container(self):
class InternalClient(internal_client.InternalClient):
def __init__(self, test, path, headers):
self.test = test
self.path = path
self.headers = headers
self.make_request_called = 0
def make_request(self, method, path, headers, acceptable_statuses,
body_file=None):
self.make_request_called += 1
self.test.assertEquals('PUT', method)
self.test.assertEquals(self.path, path)
self.test.assertEquals(self.headers, headers)
self.test.assertEquals((2,), acceptable_statuses)
self.test.assertEquals(None, body_file)
account = 'some_account'
container = 'some_container'
path = '/v1/%s/%s' % (account, container)
headers = 'some_headers'
client = InternalClient(self, path, headers)
client.create_container(account, container, headers)
self.assertEquals(1, client.make_request_called)
def test_delete_container(self):
class InternalClient(internal_client.InternalClient):
def __init__(self, test, path):
self.test = test
self.path = path
self.make_request_called = 0
def make_request(self, method, path, headers, acceptable_statuses,
body_file=None):
self.make_request_called += 1
self.test.assertEquals('DELETE', method)
self.test.assertEquals(self.path, path)
self.test.assertEquals({}, headers)
self.test.assertEquals((2, 404), acceptable_statuses)
self.test.assertEquals(None, body_file)
account = 'some_account'
container = 'some_container'
path = '/v1/%s/%s' % (account, container)
client = InternalClient(self, path)
client.delete_container(account, container)
self.assertEquals(1, client.make_request_called)
def test_get_container_metadata(self):
account = 'some_account'
container = 'some_container'
path = '/v1/%s/%s' % (account, container)
metadata_prefix = 'some_metadata_prefix'
acceptable_statuses = 'some_status_list'
client = GetMetadataInternalClient(self, path, metadata_prefix,
acceptable_statuses)
metadata = client.get_container_metadata(account, container,
metadata_prefix, acceptable_statuses)
self.assertEquals(client.metadata, metadata)
self.assertEquals(1, client.get_metadata_called)
def test_iter_objects(self):
account = 'some_account'
container = 'some_container'
path = '/v1/%s/%s' % (account, container)
marker = 'some_maker'
end_marker = 'some_end_marker'
acceptable_statuses = 'some_status_list'
items = '0 1 2'.split()
client = IterInternalClient(self, path, marker, end_marker,
acceptable_statuses, items)
ret_items = []
for obj in client.iter_objects(account, container, marker, end_marker,
acceptable_statuses):
ret_items.append(obj)
self.assertEquals(items, ret_items)
def test_set_container_metadata(self):
account = 'some_account'
container = 'some_container'
path = '/v1/%s/%s' % (account, container)
metadata = 'some_metadata'
metadata_prefix = 'some_metadata_prefix'
acceptable_statuses = 'some_status_list'
client = SetMetadataInternalClient(self, path, metadata,
metadata_prefix, acceptable_statuses)
client.set_container_metadata(account, container, metadata,
metadata_prefix, acceptable_statuses)
self.assertEquals(1, client.set_metadata_called)
def test_delete_object(self):
class InternalClient(internal_client.InternalClient):
def __init__(self, test, path):
self.test = test
self.path = path
self.make_request_called = 0
def make_request(self, method, path, headers, acceptable_statuses,
body_file=None):
self.make_request_called += 1
self.test.assertEquals('DELETE', method)
self.test.assertEquals(self.path, path)
self.test.assertEquals({}, headers)
self.test.assertEquals((2, 404), acceptable_statuses)
self.test.assertEquals(None, body_file)
account = 'some_account'
container = 'some_container'
object_name = 'some_object'
path = '/v1/%s/%s/%s' % (account, container, object_name)
client = InternalClient(self, path)
client.delete_object(account, container, object_name)
self.assertEquals(1, client.make_request_called)
def test_get_object_metadata(self):
account = 'some_account'
container = 'some_container'
object_name = 'some_object'
path = '/v1/%s/%s/%s' % (account, container, object_name)
metadata_prefix = 'some_metadata_prefix'
acceptable_statuses = 'some_status_list'
client = GetMetadataInternalClient(self, path, metadata_prefix,
acceptable_statuses)
metadata = client.get_object_metadata(account, container, object_name,
metadata_prefix, acceptable_statuses)
self.assertEquals(client.metadata, metadata)
self.assertEquals(1, client.get_metadata_called)
def test_iter_object_lines(self):
class InternalClient(internal_client.InternalClient):
def __init__(self, lines):
self.lines = lines
self.app = self.fake_app
self.user_agent = 'some_agent'
self.request_tries = 3
def fake_app(self, env, start_response):
start_response('200 Ok', [('Content-Length', '0')])
return ['%s\n' % x for x in self.lines]
lines = 'line1 line2 line3'.split()
client = InternalClient(lines)
ret_lines = []
for line in client.iter_object_lines('account', 'container', 'object'):
ret_lines.append(line)
self.assertEquals(lines, ret_lines)
def test_iter_object_lines_compressed_object(self):
class InternalClient(internal_client.InternalClient):
def __init__(self, lines):
self.lines = lines
self.app = self.fake_app
self.user_agent = 'some_agent'
self.request_tries = 3
def fake_app(self, env, start_response):
start_response('200 Ok', [('Content-Length', '0')])
return internal_client.CompressingFileReader(
StringIO('\n'.join(self.lines)))
lines = 'line1 line2 line3'.split()
client = InternalClient(lines)
ret_lines = []
for line in client.iter_object_lines('account', 'container',
'object.gz'):
ret_lines.append(line)
self.assertEquals(lines, ret_lines)
def test_set_object_metadata(self):
account = 'some_account'
container = 'some_container'
object_name = 'some_object'
path = '/v1/%s/%s/%s' % (account, container, object_name)
metadata = 'some_metadata'
metadata_prefix = 'some_metadata_prefix'
acceptable_statuses = 'some_status_list'
client = SetMetadataInternalClient(self, path, metadata,
metadata_prefix, acceptable_statuses)
client.set_object_metadata(account, container, object_name, metadata,
metadata_prefix, acceptable_statuses)
self.assertEquals(1, client.set_metadata_called)
def test_upload_object(self):
class InternalClient(internal_client.InternalClient):
def __init__(self, test, path, headers, fobj):
self.test = test
self.path = path
self.headers = headers
self.fobj = fobj
self.make_request_called = 0
def make_request(self, method, path, headers, acceptable_statuses,
body_file=None):
self.make_request_called += 1
self.test.assertEquals(self.path, path)
exp_headers = dict(self.headers)
exp_headers['Transfer-Encoding'] = 'chunked'
self.test.assertEquals(exp_headers, headers)
self.test.assertEquals(self.fobj, fobj)
fobj = 'some_fobj'
account = 'some_account'
container = 'some_container'
object_name = 'some_object'
path = '/v1/%s/%s/%s' % (account, container, object_name)
headers = {'key': 'value'}
client = InternalClient(self, path, headers, fobj)
client.upload_object(fobj, account, container, object_name, headers)
self.assertEquals(1, client.make_request_called)
if __name__ == '__main__':
unittest.main()