trunk merge

Author: Michael Barton
Date:   2011-03-02 20:25:49 +00:00
7 changed files with 262 additions and 57 deletions

View File: installation documentation (RST; file identities in this view are inferred from the content shown)

@@ -215,7 +215,7 @@ Configuring each node
 Sample configuration files are provided with all defaults in line-by-line comments.
-#. If your going to use the DevAuth (the default swift-auth-server), create
+#. If you're going to use the DevAuth (the default swift-auth-server), create
    `/etc/swift/auth-server.conf` (you can skip this if you're going to use
    Swauth)::
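As a point of reference for that step, a minimal DevAuth `auth-server.conf` of this era looked roughly like the sketch below. The key names are recalled from the shipped `etc/auth-server.conf-sample` and should be treated as assumptions; verify them against the sample file in your tree before use.

    [pipeline:main]
    pipeline = auth-server

    [app:auth-server]
    use = egg:swift#auth
    super_admin_key = devauth
    default_cluster_url = http://127.0.0.1:8080/v1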

View File: swift/common/memcached.py

@@ -38,13 +38,17 @@ TRY_COUNT = 3
 # will be considered failed for ERROR_LIMIT_DURATION seconds.
 ERROR_LIMIT_COUNT = 10
 ERROR_LIMIT_TIME = 60
-ERROR_LIMIT_DURATION = 300
+ERROR_LIMIT_DURATION = 60

 def md5hash(key):
     return md5(key).hexdigest()

+class MemcacheConnectionError(Exception):
+    pass
+
 class MemcacheRing(object):
     """
     Simple, consistent-hashed memcache client.
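As context for the retuned constant: a server that errors repeatedly is temporarily taken out of rotation. The following is a minimal sketch of that bookkeeping, not Swift's actual implementation; in the same spirit as MemcacheRing, more than ERROR_LIMIT_COUNT errors within ERROR_LIMIT_TIME seconds gets a server skipped for ERROR_LIMIT_DURATION seconds.

    import time

    ERROR_LIMIT_COUNT = 10
    ERROR_LIMIT_TIME = 60
    ERROR_LIMIT_DURATION = 60

    class ErrorLimiter(object):
        '''Sketch of per-server error limiting (illustrative, not Swift's code).'''

        def __init__(self):
            self._errors = {}         # server -> recent error timestamps
            self._limited_until = {}  # server -> time the penalty expires

        def record_error(self, server):
            now = time.time()
            recent = [t for t in self._errors.get(server, [])
                      if t > now - ERROR_LIMIT_TIME]
            recent.append(now)
            self._errors[server] = recent
            if len(recent) > ERROR_LIMIT_COUNT:
                self._limited_until[server] = now + ERROR_LIMIT_DURATION

        def is_limited(self, server):
            return time.time() < self._limited_until.get(server, 0)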
@@ -180,6 +184,7 @@ class MemcacheRing(object):
         :param delta: amount to add to the value of key (or set as the value
                       if the key is not found) will be cast to an int
         :param timeout: ttl in memcache
+        :raises MemcacheConnectionError:
         """
         key = md5hash(key)
         command = 'incr'
@@ -209,6 +214,7 @@ class MemcacheRing(object):
                 return ret
             except Exception, e:
                 self._exception_occurred(server, e)
+        raise MemcacheConnectionError("No Memcached connections succeeded.")

     def decr(self, key, delta=1, timeout=0):
         """
@@ -220,6 +226,7 @@ class MemcacheRing(object):
                        value to 0 if the key is not found) will be cast to
                        an int
         :param timeout: ttl in memcache
+        :raises MemcacheConnectionError:
         """
         self.incr(key, delta=-delta, timeout=timeout)
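The practical effect of the two `:raises MemcacheConnectionError:` additions is a new caller contract: incr() and decr() no longer fail silently when every candidate server is down. A hedged usage sketch (the fallback value is illustrative, not part of this patch):

    from swift.common.memcached import MemcacheRing, MemcacheConnectionError

    cache = MemcacheRing(['127.0.0.1:11211'])
    try:
        count = cache.incr('request_count', delta=1, timeout=60)
    except MemcacheConnectionError:
        count = None  # cache unreachable; the caller decides how to degrade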

View File: swift/common/middleware/ratelimit.py

@@ -18,6 +18,7 @@ from webob.exc import HTTPNotFound
 from swift.common.utils import split_path, cache_from_env, get_logger
 from swift.proxy.server import get_container_memcache_key
+from swift.common.memcached import MemcacheConnectionError

 class MaxSleepTimeHitError(Exception):
@@ -136,6 +137,7 @@ class RateLimitMiddleware(object):
         :param max_rate: maximum rate allowed in requests per second
         :raises: MaxSleepTimeHitError if max sleep time is exceeded.
         '''
+        try:
             now_m = int(round(time.time() * self.clock_accuracy))
             time_per_request_m = int(round(self.clock_accuracy / max_rate))
             running_time_m = self.memcache_client.incr(key,
@@ -158,6 +160,8 @@ class RateLimitMiddleware(object):
                                                 need_to_sleep_m)
             return float(need_to_sleep_m) / self.clock_accuracy
+        except MemcacheConnectionError:
+            return 0

     def handle_ratelimit(self, req, account_name, container_name, obj_name):
         '''
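Together, the new try: wrapper and the except MemcacheConnectionError: return 0 arm make the middleware fail open: when the counter backend is unreachable, requests proceed with zero delay instead of erroring out. A self-contained sketch of that shape (names are illustrative, not the middleware's API):

    class MemcacheConnectionError(Exception):
        pass

    def seconds_to_sleep(get_delay):
        '''Compute a throttle delay, failing open on cache trouble.'''
        try:
            return get_delay()  # consults the shared memcache counter
        except MemcacheConnectionError:
            return 0  # no memcache, no limiting

    print seconds_to_sleep(lambda: 0.5)  # normal path -> 0.5

    def dead_cache():
        raise MemcacheConnectionError()
    print seconds_to_sleep(dead_cache)   # fail-open path -> 0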

View File: swift/stats/log_processor.py

@@ -159,11 +159,10 @@ class LogProcessor(object):
     def get_object_data(self, swift_account, container_name, object_name,
                         compressed=False):
         '''reads an object and yields its lines'''
-        code, o = self.internal_proxy.get_object(swift_account,
-                                                 container_name,
-                                                 object_name)
+        code, o = self.internal_proxy.get_object(swift_account, container_name,
+                                                 object_name)
         if code < 200 or code >= 300:
-            return
+            raise BadFileDownload()
         last_part = ''
         last_compressed_part = ''
         # magic in the following zlib.decompressobj argument is courtesy of
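BadFileDownload itself is not shown in these hunks. Given how it is raised and caught throughout this commit (no arguments, no extra state, isinstance checks only), its definition in log_processor.py is presumably the minimal:

    class BadFileDownload(Exception):
        pass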
@@ -273,7 +272,7 @@ class LogProcessorDaemon(Daemon):
                 already_processed_files = cPickle.loads(buf)
             else:
                 already_processed_files = set()
-        except Exception:
+        except BadFileDownload:
             already_processed_files = set()
         self.logger.debug(_('found %d processed files') % \
                           len(already_processed_files))
@@ -362,7 +361,11 @@ class LogProcessorDaemon(Daemon):
 def multiprocess_collate(processor_args, logs_to_process, worker_count):
-    '''yield hourly data from logs_to_process'''
+    '''
+    yield hourly data from logs_to_process
+    Every item that this function yields will be added to the processed files
+    list.
+    '''
     results = []
     in_queue = multiprocessing.Queue()
     out_queue = multiprocessing.Queue()
@@ -376,33 +379,30 @@ def multiprocess_collate(processor_args, logs_to_process, worker_count):
     for x in logs_to_process:
         in_queue.put(x)
     for _junk in range(worker_count):
-        in_queue.put(None)
-    count = 0
+        in_queue.put(None)  # tell the worker to end
     while True:
         try:
             item, data = out_queue.get_nowait()
-            count += 1
-            if data:
-                yield item, data
-            if count >= len(logs_to_process):
-                # this implies that one result will come from every request
-                break
         except Queue.Empty:
-            time.sleep(.1)
-    for r in results:
-        r.join()
+            time.sleep(.01)
+        else:
+            if not isinstance(data, BadFileDownload):
+                yield item, data
+        if not any(r.is_alive() for r in results) and out_queue.empty():
+            # all the workers are done and nothing is in the queue
+            break

 def collate_worker(processor_args, in_queue, out_queue):
     '''worker process for multiprocess_collate'''
     p = LogProcessor(*processor_args)
     while True:
-        try:
-            item = in_queue.get_nowait()
-            if item is None:
-                break
-        except Queue.Empty:
-            time.sleep(.1)
-        else:
-            ret = p.process_one_file(*item)
+        item = in_queue.get()
+        if item is None:
+            # no more work to process
+            break
+        try:
+            ret = p.process_one_file(*item)
+        except BadFileDownload, err:
+            ret = err
         out_queue.put((item, ret))
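The two rewritten functions are easier to read as a whole. Below is a self-contained distillation of the new pattern, assuming trivial work items: one None sentinel per worker ends the input, workers push either a result or the BadFileDownload they hit, and the drain loop stops only when every worker has exited and the output queue is empty. The old version instead counted results, which could hang forever if a worker died. This is a sketch of the technique, not the Swift code itself.

    import multiprocessing
    import Queue  # Py2 stdlib name, as in this codebase
    import time

    class BadFileDownload(Exception):
        pass

    def worker(in_queue, out_queue):
        while True:
            item = in_queue.get()      # blocking get; sentinel ends the loop
            if item is None:
                break
            try:
                ret = item * 2         # stand-in for process_one_file()
            except BadFileDownload, err:
                ret = err
            out_queue.put((item, ret))

    def collate(items, worker_count=2):
        in_queue = multiprocessing.Queue()
        out_queue = multiprocessing.Queue()
        workers = [multiprocessing.Process(target=worker,
                                           args=(in_queue, out_queue))
                   for _junk in range(worker_count)]
        for w in workers:
            w.start()
        for item in items:
            in_queue.put(item)
        for _junk in range(worker_count):
            in_queue.put(None)         # one sentinel per worker
        while True:
            try:
                yield out_queue.get_nowait()
            except Queue.Empty:
                time.sleep(.01)
            if not any(w.is_alive() for w in workers) and out_queue.empty():
                break                  # workers done and queue drained

    if __name__ == '__main__':
        print list(collate([1, 2, 3]))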

View File: test/unit/common/middleware/test_ratelimit.py

@@ -21,12 +21,14 @@ from webob import Request
 from swift.common.middleware import ratelimit
 from swift.proxy.server import get_container_memcache_key
+from swift.common.memcached import MemcacheConnectionError

 class FakeMemcache(object):

     def __init__(self):
         self.store = {}
+        self.error_on_incr = False

     def get(self, key):
         return self.store.get(key)
@@ -36,6 +38,8 @@ class FakeMemcache(object):
         return True

     def incr(self, key, delta=1, timeout=0):
+        if self.error_on_incr:
+            raise MemcacheConnectionError('Memcache restarting')
         self.store[key] = int(self.store.setdefault(key, 0)) + int(delta)
         if self.store[key] < 0:
             self.store[key] = 0
@@ -403,6 +407,21 @@ class TestRateLimit(unittest.TestCase):
                                              start_response)
         self._run(make_app_call, num_calls, current_rate)

+    def test_restarting_memcache(self):
+        current_rate = 2
+        num_calls = 5
+        conf_dict = {'account_ratelimit': current_rate}
+        self.test_ratelimit = ratelimit.filter_factory(conf_dict)(FakeApp())
+        ratelimit.http_connect = mock_http_connect(204)
+        req = Request.blank('/v/a')
+        req.environ['swift.cache'] = FakeMemcache()
+        req.environ['swift.cache'].error_on_incr = True
+        make_app_call = lambda: self.test_ratelimit(req.environ,
+                                                    start_response)
+        begin = time.time()
+        self._run(make_app_call, num_calls, current_rate, check_time=False)
+        time_took = time.time() - begin
+        self.assert_(round(time_took, 1) == 0)  # no memcache, no limiting
+
 if __name__ == '__main__':
     unittest.main()

View File: test/unit/common/test_memcached.py

@@ -50,6 +50,7 @@ class MockMemcached(object):
         self.cache = {}
         self.down = False
         self.exc_on_delete = False
+        self.read_return_none = False

     def sendall(self, string):
         if self.down:
@@ -110,6 +111,8 @@ class MockMemcached(object):
         else:
             self.outbuf += 'NOT_FOUND\r\n'

     def readline(self):
+        if self.read_return_none:
+            return None
         if self.down:
             raise Exception('mock is down')
         if '\n' in self.outbuf:
@@ -166,6 +169,9 @@ class TestMemcached(unittest.TestCase):
         self.assertEquals(memcache_client.get('some_key'), '6')
         memcache_client.incr('some_key', delta=-15)
         self.assertEquals(memcache_client.get('some_key'), '0')
+        mock.read_return_none = True
+        self.assertRaises(memcached.MemcacheConnectionError,
+                          memcache_client.incr, 'some_key', delta=-15)

     def test_decr(self):
         memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'])
@@ -179,6 +185,10 @@ class TestMemcached(unittest.TestCase):
         self.assertEquals(memcache_client.get('some_key'), '11')
         memcache_client.decr('some_key', delta=15)
         self.assertEquals(memcache_client.get('some_key'), '0')
+        mock.read_return_none = True
+        self.assertRaises(memcached.MemcacheConnectionError,
+                          memcache_client.decr, 'some_key', delta=15)

     def test_retry(self):
         logging.getLogger().addHandler(NullLoggingHandler())
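The new read_return_none knob simulates a connection that goes dead mid-conversation: readline() on the socket file yields nothing. The assertions above pin down the end-to-end result: once every candidate server fails, incr()/decr() raise instead of returning silently. A structural sketch of that fallback loop (illustrative, not MemcacheRing's internals):

    class MemcacheConnectionError(Exception):
        pass

    def incr_with_fallback(servers, send_incr):
        '''Try each candidate server; raise only if all of them fail.'''
        for server in servers:
            try:
                line = send_incr(server)
                if not line:                # empty read == dead connection
                    raise Exception('dead connection')
                return line
            except Exception:
                continue                    # error-limit it, try the next
        raise MemcacheConnectionError("No Memcached connections succeeded.")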

View File: test/unit/stats/test_log_processor.py

@@ -15,9 +15,11 @@
 import unittest
 from test.unit import tmpfile
+import Queue

 from swift.common import internal_proxy
 from swift.stats import log_processor
+from swift.common.exceptions import ChunkReadTimeout

 class FakeUploadApp(object):
@@ -33,6 +35,11 @@ class DumbLogger(object):
         pass

 class DumbInternalProxy(object):
+    def __init__(self, code=200, timeout=False, bad_compressed=False):
+        self.code = code
+        self.timeout = timeout
+        self.bad_compressed = bad_compressed
+
     def get_container_list(self, account, container, marker=None,
                            end_marker=None):
         n = '2010/03/14/13/obj1'
@@ -46,9 +53,13 @@ class DumbInternalProxy(object):
         return []

     def get_object(self, account, container, object_name):
-        code = 200
         if object_name.endswith('.gz'):
-            # same data as below, compressed with gzip -9
-            def data():
-                yield '\x1f\x8b\x08'
-                yield '\x08"\xd79L'
+            if self.bad_compressed:
+                # invalid compressed data
+                def data():
+                    yield '\xff\xff\xff\xff\xff\xff\xff'
+            else:
+                # 'obj\ndata', compressed with gzip -9
+                def data():
+                    yield '\x1f\x8b\x08'
+                    yield '\x08"\xd79L'
@@ -60,8 +71,10 @@ class DumbInternalProxy(object):
         else:
             def data():
                 yield 'obj\n'
+                if self.timeout:
+                    raise ChunkReadTimeout
                 yield 'data'
-        return code, data()
+        return self.code, data()

 class TestLogProcessor(unittest.TestCase):
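Note that the timeout branch raises ChunkReadTimeout while the body is being iterated, after the 2xx status check in get_object_data has already passed. For test_get_object_data_errors below to see BadFileDownload from it, get_object_data presumably converts the timeout inside its read loop; that part of the change falls outside the hunks shown above. A sketch of the assumed shape (BadFileDownload as defined earlier):

    from swift.common.exceptions import ChunkReadTimeout

    def yield_chunks(chunk_iter):
        '''Re-raise a mid-stream read timeout as a download failure.'''
        try:
            for chunk in chunk_iter:
                yield chunk
        except ChunkReadTimeout:
            raise BadFileDownload()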
@@ -159,6 +172,19 @@ use = egg:swift#proxy
                        'prefix_query': 0}}
         self.assertEquals(result, expected)

+    def test_process_one_access_file_error(self):
+        access_proxy_config = self.proxy_config.copy()
+        access_proxy_config.update({
+            'log-processor-access': {
+                'source_filename_format':'%Y%m%d%H*',
+                'class_path':
+                    'swift.stats.access_processor.AccessLogProcessor'
+            }})
+        p = log_processor.LogProcessor(access_proxy_config, DumbLogger())
+        p._internal_proxy = DumbInternalProxy(code=500)
+        self.assertRaises(log_processor.BadFileDownload, p.process_one_file,
+                          'access', 'a', 'c', 'o')
+
     def test_get_container_listing(self):
         p = log_processor.LogProcessor(self.proxy_config, DumbLogger())
         p._internal_proxy = DumbInternalProxy()
@@ -193,6 +219,18 @@ use = egg:swift#proxy
         result = list(p.get_object_data('a', 'c', 'o.gz', True))
         self.assertEquals(result, expected)

+    def test_get_object_data_errors(self):
+        p = log_processor.LogProcessor(self.proxy_config, DumbLogger())
+        p._internal_proxy = DumbInternalProxy(code=500)
+        result = p.get_object_data('a', 'c', 'o')
+        self.assertRaises(log_processor.BadFileDownload, list, result)
+        p._internal_proxy = DumbInternalProxy(bad_compressed=True)
+        result = p.get_object_data('a', 'c', 'o.gz', True)
+        self.assertRaises(log_processor.BadFileDownload, list, result)
+        p._internal_proxy = DumbInternalProxy(timeout=True)
+        result = p.get_object_data('a', 'c', 'o')
+        self.assertRaises(log_processor.BadFileDownload, list, result)
+
     def test_get_stat_totals(self):
         stats_proxy_config = self.proxy_config.copy()
         stats_proxy_config.update({
@@ -262,3 +300,130 @@ use = egg:swift#proxy
             # these only work for Py2.7+
             #self.assertIsInstance(k, str)
             self.assertTrue(isinstance(k, str), type(k))
+
+    def test_collate_worker(self):
+        try:
+            log_processor.LogProcessor._internal_proxy = DumbInternalProxy()
+            def get_object_data(*a,**kw):
+                return [self.access_test_line]
+            orig_get_object_data = log_processor.LogProcessor.get_object_data
+            log_processor.LogProcessor.get_object_data = get_object_data
+            proxy_config = self.proxy_config.copy()
+            proxy_config.update({
+                'log-processor-access': {
+                    'source_filename_format':'%Y%m%d%H*',
+                    'class_path':
+                        'swift.stats.access_processor.AccessLogProcessor'
+                }})
+            processor_args = (proxy_config, DumbLogger())
+            q_in = Queue.Queue()
+            q_out = Queue.Queue()
+            work_request = ('access', 'a','c','o')
+            q_in.put(work_request)
+            q_in.put(None)
+            log_processor.collate_worker(processor_args, q_in, q_out)
+            item, ret = q_out.get()
+            self.assertEquals(item, work_request)
+            expected = {('acct', '2010', '07', '09', '04'):
+                        {('public', 'object', 'GET', '2xx'): 1,
+                         ('public', 'bytes_out'): 95,
+                         'marker_query': 0,
+                         'format_query': 1,
+                         'delimiter_query': 0,
+                         'path_query': 0,
+                         ('public', 'bytes_in'): 6,
+                         'prefix_query': 0}}
+            self.assertEquals(ret, expected)
+        finally:
+            log_processor.LogProcessor._internal_proxy = None
+            log_processor.LogProcessor.get_object_data = orig_get_object_data
+
+    def test_collate_worker_error(self):
+        def get_object_data(*a,**kw):
+            raise log_processor.BadFileDownload()
+        orig_get_object_data = log_processor.LogProcessor.get_object_data
+        try:
+            log_processor.LogProcessor.get_object_data = get_object_data
+            proxy_config = self.proxy_config.copy()
+            proxy_config.update({
+                'log-processor-access': {
+                    'source_filename_format':'%Y%m%d%H*',
+                    'class_path':
+                        'swift.stats.access_processor.AccessLogProcessor'
+                }})
+            processor_args = (proxy_config, DumbLogger())
+            q_in = Queue.Queue()
+            q_out = Queue.Queue()
+            work_request = ('access', 'a','c','o')
+            q_in.put(work_request)
+            q_in.put(None)
+            log_processor.collate_worker(processor_args, q_in, q_out)
+            item, ret = q_out.get()
+            self.assertEquals(item, work_request)
+            # these only work for Py2.7+
+            #self.assertIsInstance(ret, log_processor.BadFileDownload)
+            self.assertTrue(isinstance(ret, log_processor.BadFileDownload),
+                            type(ret))
+        finally:
+            log_processor.LogProcessor.get_object_data = orig_get_object_data
+
+    def test_multiprocess_collate(self):
+        try:
+            log_processor.LogProcessor._internal_proxy = DumbInternalProxy()
+            def get_object_data(*a,**kw):
+                return [self.access_test_line]
+            orig_get_object_data = log_processor.LogProcessor.get_object_data
+            log_processor.LogProcessor.get_object_data = get_object_data
+            proxy_config = self.proxy_config.copy()
+            proxy_config.update({
+                'log-processor-access': {
+                    'source_filename_format':'%Y%m%d%H*',
+                    'class_path':
+                        'swift.stats.access_processor.AccessLogProcessor'
+                }})
+            processor_args = (proxy_config, DumbLogger())
+            item = ('access', 'a','c','o')
+            logs_to_process = [item]
+            results = log_processor.multiprocess_collate(processor_args,
+                                                         logs_to_process,
+                                                         1)
+            results = list(results)
+            expected = [(item, {('acct', '2010', '07', '09', '04'):
+                                {('public', 'object', 'GET', '2xx'): 1,
+                                 ('public', 'bytes_out'): 95,
+                                 'marker_query': 0,
+                                 'format_query': 1,
+                                 'delimiter_query': 0,
+                                 'path_query': 0,
+                                 ('public', 'bytes_in'): 6,
+                                 'prefix_query': 0}})]
+            self.assertEquals(results, expected)
+        finally:
+            log_processor.LogProcessor._internal_proxy = None
+            log_processor.LogProcessor.get_object_data = orig_get_object_data
+
+    def test_multiprocess_collate_errors(self):
+        def get_object_data(*a,**kw):
+            raise log_processor.BadFileDownload()
+        orig_get_object_data = log_processor.LogProcessor.get_object_data
+        try:
+            log_processor.LogProcessor.get_object_data = get_object_data
+            proxy_config = self.proxy_config.copy()
+            proxy_config.update({
+                'log-processor-access': {
+                    'source_filename_format':'%Y%m%d%H*',
+                    'class_path':
+                        'swift.stats.access_processor.AccessLogProcessor'
+                }})
+            processor_args = (proxy_config, DumbLogger())
+            item = ('access', 'a','c','o')
+            logs_to_process = [item]
+            results = log_processor.multiprocess_collate(processor_args,
+                                                         logs_to_process,
+                                                         1)
+            results = list(results)
+            expected = []
+            self.assertEquals(results, expected)
+        finally:
+            log_processor.LogProcessor._internal_proxy = None
+            log_processor.LogProcessor.get_object_data = orig_get_object_data