swift/test/unit/__init__.py

270 lines
6.2 KiB
Python

""" Swift tests """
import sys
import os
import copy
import logging
from sys import exc_info
from contextlib import contextmanager
from collections import defaultdict
from tempfile import NamedTemporaryFile
from eventlet.green import socket
from tempfile import mkdtemp
from shutil import rmtree
from test import get_config
from ConfigParser import MissingSectionHeaderError
from StringIO import StringIO
from swift.common.utils import readconf, config_true_value
from logging import Handler
import logging.handlers
def readuntil2crlfs(fd):
    """
    Read from ``fd`` one character at a time until two consecutive CRLF
    pairs (CR LF CR LF, the blank line ending a header block) are consumed.

    :param fd: file-like object whose ``read(1)`` returns a one-character
               string, or an empty string once the stream is exhausted
    :returns: everything read so far, including the terminating CRLF pairs
    :raises ValueError: if the stream ends before two CRLFs were seen
    """
    rv = ''
    lc = ''
    crlfs = 0
    while crlfs < 2:
        c = fd.read(1)
        if not c:
            # End of stream: read(1) keeps returning '' and the counter can
            # never advance, so fail loudly instead of spinning forever.
            raise ValueError("didn't get two CRLFs; just got %r" % rv)
        rv = rv + c
        # A CR that does not directly follow an LF starts a new line that is
        # not blank, so any CRLF counted so far no longer qualifies.
        if c == '\r' and lc != '\n':
            crlfs = 0
        if lc == '\r' and c == '\n':
            crlfs += 1
        lc = c
    return rv
def connect_tcp(hostport):
    """
    Open a socket and connect it to ``hostport``.

    :param hostport: address passed straight to ``socket.connect()``
    :returns: the connected socket object
    :raises: whatever ``connect()`` raises; the socket is closed first so a
             failed attempt does not leak a file descriptor
    """
    rv = socket.socket()
    try:
        rv.connect(hostport)
    except BaseException:
        # Release the descriptor before propagating the failure.
        rv.close()
        raise
    return rv
@contextmanager
def tmpfile(content):
    """
    Context manager yielding the name of a temporary file that holds
    ``str(content)``; the file is unlinked when the context exits.

    :param content: object whose ``str()`` form is written to the file
    """
    handle = NamedTemporaryFile('w', delete=False)
    with handle:
        # Capture the name while the file is open, then fill it.
        file_name = handle.name
        handle.write(str(content))
    try:
        yield file_name
    finally:
        os.unlink(file_name)
# In-memory store backing the fake xattr functions below: maps an inode
# number to a dict of {attribute name: value}.
xattr_data = {}
def _get_inode(fd):
    """
    Return the inode number for ``fd``.

    :param fd: an integer file descriptor, an object with a ``fileno()``
               method, or anything else ``os.stat()`` accepts (e.g. a path)
    :returns: the ``st_ino`` value of the referenced file
    """
    if isinstance(fd, int):
        return os.fstat(fd).st_ino
    try:
        descriptor = fd.fileno()
    except AttributeError:
        # No fileno(): hand the value to os.stat() as-is.
        return os.stat(fd).st_ino
    return os.fstat(descriptor).st_ino
def _setxattr(fd, k, v):
    """
    Fake ``xattr.setxattr``: record value ``v`` under key ``k`` for the
    inode that ``fd`` refers to, in the module-level ``xattr_data`` dict.
    """
    # setdefault creates the per-inode dict on first use.
    xattr_data.setdefault(_get_inode(fd), {})[k] = v
def _getxattr(fd, k):
    """
    Fake ``xattr.getxattr``: return the value stored under key ``k`` for
    the inode that ``fd`` refers to.

    :raises IOError: if no value was ever stored for that inode and key
    """
    inode = _get_inode(fd)
    attrs = xattr_data.get(inode, {})
    # Test for presence rather than truthiness so that an attribute stored
    # with an empty value is returned instead of being reported as missing.
    if k not in attrs:
        raise IOError
    return attrs[k]
# Replace the xattr module's setxattr/getxattr with the in-memory fakes
# defined above, so callers of xattr.setxattr / xattr.getxattr hit
# xattr_data instead.
import xattr
xattr.setxattr = _setxattr
xattr.getxattr = _getxattr
@contextmanager
def temptree(files, contents=''):
    """
    Context manager that builds a temporary directory tree and yields its
    root; the whole tree is removed when the context exits.

    :param files: sequence of file paths to create; absolute paths are
                  re-rooted under the temporary directory
    :param contents: iterable of per-file contents, matched to ``files`` by
                     position; missing entries default to an empty string
                     and each entry is written as ``str(content)``
    """
    # generate enough contents to fill the files
    c = len(files)
    contents = (list(contents) + [''] * c)[:c]
    tempdir = mkdtemp()
    try:
        # Populate inside the try block so that a failure while creating
        # any file still removes the directory made by mkdtemp().
        for path, content in zip(files, contents):
            if os.path.isabs(path):
                # '/a/b' -> './a/b' so os.path.join keeps it under tempdir
                path = '.' + path
            new_path = os.path.join(tempdir, path)
            subdir = os.path.dirname(new_path)
            if not os.path.exists(subdir):
                os.makedirs(subdir)
            with open(new_path, 'w') as f:
                f.write(str(content))
        yield tempdir
    finally:
        rmtree(tempdir)
class NullLoggingHandler(logging.Handler):
    """Logging handler that discards every record it is given."""

    def emit(self, record):
        """Ignore ``record``."""
        return None
class FakeLogger(object):
    """
    Stand-in logger/handler that records calls instead of emitting them.

    Each recorded call is appended to ``log_dict`` (a ``defaultdict`` of
    lists) under the name of the method invoked, as an ``(args, kwargs)``
    tuple; ``exception`` entries carry the string form of the active
    exception as a third element. The handler-style and locking methods
    are no-ops.
    """

    def __init__(self, *args, **kwargs):
        self._clear()
        self.level = logging.NOTSET
        # 'facility' is only set as an attribute when explicitly supplied.
        try:
            self.facility = kwargs['facility']
        except KeyError:
            pass

    def _clear(self):
        """Drop everything recorded so far."""
        self.log_dict = defaultdict(list)

    def _store_in(store_name):
        """Build a method that records its call under ``store_name``."""
        def stub_fn(self, *args, **kwargs):
            self.log_dict[store_name].append((args, kwargs))
        return stub_fn

    # standard logging methods
    error = _store_in('error')
    info = _store_in('info')
    warning = _store_in('warning')
    debug = _store_in('debug')

    def exception(self, *args, **kwargs):
        """Record the call together with str() of the current exception."""
        current = exc_info()[1]
        self.log_dict['exception'].append((args, kwargs, str(current)))

    # mock out the StatsD logging methods:
    increment = _store_in('increment')
    decrement = _store_in('decrement')
    timing = _store_in('timing')
    timing_since = _store_in('timing_since')
    update_stats = _store_in('update_stats')
    set_statsd_prefix = _store_in('set_statsd_prefix')

    def get_increments(self):
        """Return the first positional argument of each increment() call."""
        return [args[0] for args, _kwargs in self.log_dict['increment']]

    def get_increment_counts(self):
        """Return a dict mapping each incremented metric to its call count."""
        counts = {}
        for metric in self.get_increments():
            counts[metric] = counts.get(metric, 0) + 1
        return counts

    def setFormatter(self, obj):
        self.formatter = obj

    def close(self):
        self._clear()

    def set_name(self, name):
        # don't touch _handlers
        self._name = name

    # The remaining handler/locking entry points intentionally do nothing.

    def acquire(self):
        pass

    def release(self):
        pass

    def createLock(self):
        pass

    def emit(self, record):
        pass

    def handle(self, record):
        pass

    def flush(self):
        pass

    def handleError(self, record):
        pass
# Keep a reference to the real handler class so its constants remain
# reachable after logging.handlers.SysLogHandler has been replaced.
original_syslog_handler = logging.handlers.SysLogHandler


def fake_syslog_handler():
    """
    Replace ``logging.handlers.SysLogHandler`` with ``FakeLogger``.

    Every ``LOG*`` attribute and the ``priority_map`` of the real handler
    are first copied onto ``FakeLogger`` so code that reads those constants
    from the handler class keeps working.
    """
    for attr in dir(original_syslog_handler):
        if attr.startswith('LOG'):
            # Read from the saved original (the same object that was
            # enumerated), not from whatever logging.handlers currently
            # exposes under that name.
            setattr(FakeLogger, attr,
                    copy.copy(getattr(original_syslog_handler, attr)))
    FakeLogger.priority_map = copy.deepcopy(
        original_syslog_handler.priority_map)
    logging.handlers.SysLogHandler = FakeLogger
# Install the fake syslog handler at import time only when the 'unit_test'
# config section sets 'fake_syslog' to a true value (default: 'False').
if config_true_value(get_config('unit_test').get('fake_syslog', 'False')):
    fake_syslog_handler()
class MockTrue(object):
    """
    Instances of MockTrue evaluate like True.

    Any attr accessed on an instance of MockTrue will return a MockTrue
    instance. Any method called on an instance of MockTrue will return
    a MockTrue instance.

    >>> thing = MockTrue()
    >>> thing
    True
    >>> thing == True # True == True
    True
    >>> thing == False # True == False
    False
    >>> thing != True # True != True
    False
    >>> thing != False # True != False
    True
    >>> thing.attribute
    True
    >>> thing.method()
    True
    >>> thing.attribute.method()
    True
    >>> thing.method().attribute
    True
    """

    def __getattribute__(self, *args, **kwargs):
        # Every attribute lookup resolves to the instance itself.
        return self

    def __call__(self, *args, **kwargs):
        # Calling the instance also hands back the instance.
        return self

    def __repr__(*args, **kwargs):
        return 'True'

    def __eq__(self, other):
        # Equal only to the True singleton itself.
        return other is True

    def __ne__(self, other):
        return not (other is True)
@contextmanager
def mock(update):
    """
    Context manager that temporarily sets attributes named by dotted paths.

    :param update: dict mapping a dotted name such as ``'pkg.mod.attr'`` to
                   the value that attribute should have inside the context

    Yields ``True``. On exit, including when the body raises, attributes
    that existed beforehand get their previous values back and attributes
    that were newly created are deleted.
    """
    returns = []
    deletes = []
    try:
        for key, value in update.items():
            imports = key.split('.')
            attr = imports.pop(-1)
            module = __import__(imports[0], fromlist=imports[1:])
            for modname in imports[1:]:
                module = getattr(module, modname)
            existed = hasattr(module, attr)
            previous = getattr(module, attr, None)
            setattr(module, attr, value)
            # Record the undo step only once the attribute is really set,
            # so cleanup never tries to revert a patch that did not happen.
            if existed:
                returns.append((module, attr, previous))
            else:
                deletes.append((module, attr))
        yield True
    finally:
        # Undo in reverse so that, if two keys name the same attribute,
        # the value that was there first is the one left in place.
        for module, attr, value in reversed(returns):
            setattr(module, attr, value)
        for module, attr in deletes:
            delattr(module, attr)