diff --git a/doc/source/development_saio.rst b/doc/source/development_saio.rst
index 38c0475975..f7963b1b24 100644
--- a/doc/source/development_saio.rst
+++ b/doc/source/development_saio.rst
@@ -215,7 +215,7 @@ Configuring each node
   Sample configuration files are provided with all defaults in line-by-line
   comments.
 
-  #. If your going to use the DevAuth (the default swift-auth-server), create
+  #. If you're going to use the DevAuth (the default swift-auth-server), create
      `/etc/swift/auth-server.conf` (you can skip this if you're going to use
      Swauth)::
 
diff --git a/swift/common/memcached.py b/swift/common/memcached.py
index 17378e1aae..41c75aaa65 100644
--- a/swift/common/memcached.py
+++ b/swift/common/memcached.py
@@ -38,13 +38,17 @@ TRY_COUNT = 3
 # will be considered failed for ERROR_LIMIT_DURATION seconds.
 ERROR_LIMIT_COUNT = 10
 ERROR_LIMIT_TIME = 60
-ERROR_LIMIT_DURATION = 300
+ERROR_LIMIT_DURATION = 60
 
 
 def md5hash(key):
     return md5(key).hexdigest()
 
 
+class MemcacheConnectionError(Exception):
+    pass
+
+
 class MemcacheRing(object):
     """
     Simple, consistent-hashed memcache client.
@@ -180,6 +184,7 @@ class MemcacheRing(object):
         :param delta: amount to add to the value of key (or set as the value
                       if the key is not found) will be cast to an int
         :param timeout: ttl in memcache
+        :raises MemcacheConnectionError: if no memcached connection succeeds
         """
         key = md5hash(key)
         command = 'incr'
@@ -209,6 +214,7 @@ class MemcacheRing(object):
                 return ret
             except Exception, e:
                 self._exception_occurred(server, e)
+        raise MemcacheConnectionError("No Memcached connections succeeded.")
 
     def decr(self, key, delta=1, timeout=0):
         """
@@ -220,6 +226,7 @@ class MemcacheRing(object):
                  value to 0 if the key is not found) will be cast to an int
         :param timeout: ttl in memcache
+        :raises MemcacheConnectionError: if no memcached connection succeeds
         """
         self.incr(key, delta=-delta, timeout=timeout)
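Since incr() and decr() can now raise instead of failing silently, callers that can tolerate a memcache outage need an explicit fallback. A minimal caller-side sketch (the server address, key, and helper name are illustrative, not part of the patch):

    from swift.common.memcached import MemcacheRing, MemcacheConnectionError

    memcache_client = MemcacheRing(['127.0.0.1:11211'])

    def bump_counter(key):
        # Treat an unreachable memcache as "no information" rather than
        # an error, the same fallback the ratelimit middleware below uses.
        try:
            return memcache_client.incr(key, delta=1)
        except MemcacheConnectionError:
            return None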
diff --git a/swift/common/middleware/ratelimit.py b/swift/common/middleware/ratelimit.py
index 485b1db26e..836cb51bb2 100644
--- a/swift/common/middleware/ratelimit.py
+++ b/swift/common/middleware/ratelimit.py
@@ -18,6 +18,7 @@ from webob.exc import HTTPNotFound
 
 from swift.common.utils import split_path, cache_from_env, get_logger
 from swift.proxy.server import get_container_memcache_key
+from swift.common.memcached import MemcacheConnectionError
 
 
 class MaxSleepTimeHitError(Exception):
@@ -136,28 +137,31 @@ class RateLimitMiddleware(object):
         :param max_rate: maximum rate allowed in requests per second
         :raises: MaxSleepTimeHitError if max sleep time is exceeded.
         '''
-        now_m = int(round(time.time() * self.clock_accuracy))
-        time_per_request_m = int(round(self.clock_accuracy / max_rate))
-        running_time_m = self.memcache_client.incr(key,
-                                                   delta=time_per_request_m)
-        need_to_sleep_m = 0
-        if (now_m - running_time_m >
-                self.rate_buffer_seconds * self.clock_accuracy):
-            next_avail_time = int(now_m + time_per_request_m)
-            self.memcache_client.set(key, str(next_avail_time),
-                                     serialize=False)
-        else:
-            need_to_sleep_m = \
-                max(running_time_m - now_m - time_per_request_m, 0)
+        try:
+            now_m = int(round(time.time() * self.clock_accuracy))
+            time_per_request_m = int(round(self.clock_accuracy / max_rate))
+            running_time_m = self.memcache_client.incr(key,
+                                                  delta=time_per_request_m)
+            need_to_sleep_m = 0
+            if (now_m - running_time_m >
+                    self.rate_buffer_seconds * self.clock_accuracy):
+                next_avail_time = int(now_m + time_per_request_m)
+                self.memcache_client.set(key, str(next_avail_time),
+                                         serialize=False)
+            else:
+                need_to_sleep_m = \
+                    max(running_time_m - now_m - time_per_request_m, 0)
 
-        max_sleep_m = self.max_sleep_time_seconds * self.clock_accuracy
-        if max_sleep_m - need_to_sleep_m <= self.clock_accuracy * 0.01:
-            # treat as no-op decrement time
-            self.memcache_client.decr(key, delta=time_per_request_m)
-            raise MaxSleepTimeHitError("Max Sleep Time Exceeded: %s" %
-                                       need_to_sleep_m)
+            max_sleep_m = self.max_sleep_time_seconds * self.clock_accuracy
+            if max_sleep_m - need_to_sleep_m <= self.clock_accuracy * 0.01:
+                # treat as no-op decrement time
+                self.memcache_client.decr(key, delta=time_per_request_m)
+                raise MaxSleepTimeHitError("Max Sleep Time Exceeded: %s" %
+                                           need_to_sleep_m)
 
-        return float(need_to_sleep_m) / self.clock_accuracy
+            return float(need_to_sleep_m) / self.clock_accuracy
+        except MemcacheConnectionError:
+            return 0
 
     def handle_ratelimit(self, req, account_name, container_name, obj_name):
         '''
diff --git a/swift/stats/log_processor.py b/swift/stats/log_processor.py
index 727e687f38..99a427e7ec 100644
--- a/swift/stats/log_processor.py
+++ b/swift/stats/log_processor.py
@@ -159,11 +159,10 @@ class LogProcessor(object):
     def get_object_data(self, swift_account, container_name, object_name,
                         compressed=False):
         '''reads an object and yields its lines'''
-        code, o = self.internal_proxy.get_object(swift_account,
-                                                 container_name,
-                                                 object_name)
+        code, o = self.internal_proxy.get_object(swift_account, container_name,
+                                                 object_name)
         if code < 200 or code >= 300:
-            return
+            raise BadFileDownload()
         last_part = ''
         last_compressed_part = ''
         # magic in the following zlib.decompressobj argument is courtesy of
@@ -273,7 +272,7 @@ class LogProcessorDaemon(Daemon):
                 already_processed_files = cPickle.loads(buf)
             else:
                 already_processed_files = set()
-        except Exception:
+        except BadFileDownload:
             already_processed_files = set()
         self.logger.debug(_('found %d processed files') % \
                           len(already_processed_files))
@@ -362,7 +361,11 @@ class LogProcessorDaemon(Daemon):
 
 
 def multiprocess_collate(processor_args, logs_to_process, worker_count):
-    '''yield hourly data from logs_to_process'''
+    '''
+    Yield hourly data from logs_to_process.
+    Every item that this function yields will be added to the processed
+    files list.
+    '''
     results = []
     in_queue = multiprocessing.Queue()
     out_queue = multiprocessing.Queue()
@@ -376,33 +379,30 @@ def multiprocess_collate(processor_args, logs_to_process, worker_count):
     for x in logs_to_process:
         in_queue.put(x)
     for _junk in range(worker_count):
-        in_queue.put(None)
-    count = 0
+        in_queue.put(None)  # tell each worker to end
     while True:
         try:
             item, data = out_queue.get_nowait()
-            count += 1
-            if data:
-                yield item, data
-            if count >= len(logs_to_process):
-                # this implies that one result will come from every request
-                break
         except Queue.Empty:
-            time.sleep(.1)
-    for r in results:
-        r.join()
+            time.sleep(.01)
+        else:
+            if not isinstance(data, BadFileDownload):
+                yield item, data
+        if not any(r.is_alive() for r in results) and out_queue.empty():
+            # all the workers are done and nothing is in the queue
+            break
 
 
 def collate_worker(processor_args, in_queue, out_queue):
     '''worker process for multiprocess_collate'''
     p = LogProcessor(*processor_args)
     while True:
+        item = in_queue.get()
+        if item is None:
+            # no more work to process
+            break
         try:
-            item = in_queue.get_nowait()
-            if item is None:
-                break
-        except Queue.Empty:
-            time.sleep(.1)
-        else:
             ret = p.process_one_file(*item)
-        out_queue.put((item, ret))
+        except BadFileDownload, err:
+            ret = err
+        out_queue.put((item, ret))
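The rewritten collate loop swaps result counting for a sentinel-and-liveness protocol: the parent enqueues one None per worker, each worker exits when it dequeues the sentinel, and the parent stops once no worker is alive and the output queue is drained. A stripped-down sketch of the same pattern (names are illustrative):

    import multiprocessing
    import Queue
    import time

    def worker(in_q, out_q):
        while True:
            item = in_q.get()
            if item is None:
                break  # sentinel: no more work
            out_q.put((item, item * 2))

    def collate(items, worker_count=2):
        in_q = multiprocessing.Queue()
        out_q = multiprocessing.Queue()
        workers = [multiprocessing.Process(target=worker, args=(in_q, out_q))
                   for _junk in range(worker_count)]
        for w in workers:
            w.start()
        for item in items:
            in_q.put(item)
        for _junk in range(worker_count):
            in_q.put(None)  # one sentinel per worker
        while True:
            try:
                yield out_q.get_nowait()
            except Queue.Empty:
                time.sleep(.01)
            if not any(w.is_alive() for w in workers) and out_q.empty():
                break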
diff --git a/test/unit/common/middleware/test_ratelimit.py b/test/unit/common/middleware/test_ratelimit.py
index ef1abca91e..4afefb0351 100644
--- a/test/unit/common/middleware/test_ratelimit.py
+++ b/test/unit/common/middleware/test_ratelimit.py
@@ -21,12 +21,14 @@ from webob import Request
 
 from swift.common.middleware import ratelimit
 from swift.proxy.server import get_container_memcache_key
+from swift.common.memcached import MemcacheConnectionError
 
 
 class FakeMemcache(object):
 
     def __init__(self):
         self.store = {}
+        self.error_on_incr = False
 
     def get(self, key):
         return self.store.get(key)
@@ -36,6 +38,8 @@ class FakeMemcache(object):
         return True
 
     def incr(self, key, delta=1, timeout=0):
+        if self.error_on_incr:
+            raise MemcacheConnectionError('Memcache restarting')
         self.store[key] = int(self.store.setdefault(key, 0)) + int(delta)
         if self.store[key] < 0:
             self.store[key] = 0
@@ -403,6 +407,21 @@ class TestRateLimit(unittest.TestCase):
                                                  start_response)
         self._run(make_app_call, num_calls, current_rate)
 
+    def test_restarting_memcache(self):
+        current_rate = 2
+        num_calls = 5
+        conf_dict = {'account_ratelimit': current_rate}
+        self.test_ratelimit = ratelimit.filter_factory(conf_dict)(FakeApp())
+        ratelimit.http_connect = mock_http_connect(204)
+        req = Request.blank('/v/a')
+        req.environ['swift.cache'] = FakeMemcache()
+        req.environ['swift.cache'].error_on_incr = True
+        make_app_call = lambda: self.test_ratelimit(req.environ,
+                                                    start_response)
+        begin = time.time()
+        self._run(make_app_call, num_calls, current_rate, check_time=False)
+        time_took = time.time() - begin
+        self.assert_(round(time_took, 1) == 0)  # no memcache, no limiting
 
 
 if __name__ == '__main__':
     unittest.main()
diff --git a/test/unit/common/test_memcached.py b/test/unit/common/test_memcached.py
index 43f11650cf..b54f915dca 100644
--- a/test/unit/common/test_memcached.py
+++ b/test/unit/common/test_memcached.py
@@ -50,6 +50,7 @@ class MockMemcached(object):
         self.cache = {}
         self.down = False
         self.exc_on_delete = False
+        self.read_return_none = False
 
     def sendall(self, string):
         if self.down:
@@ -110,6 +111,8 @@ class MockMemcached(object):
             else:
                 self.outbuf += 'NOT_FOUND\r\n'
 
     def readline(self):
+        if self.read_return_none:
+            return None
         if self.down:
             raise Exception('mock is down')
         if '\n' in self.outbuf:
@@ -166,6 +169,9 @@ class TestMemcached(unittest.TestCase):
         self.assertEquals(memcache_client.get('some_key'), '6')
         memcache_client.incr('some_key', delta=-15)
         self.assertEquals(memcache_client.get('some_key'), '0')
+        mock.read_return_none = True
+        self.assertRaises(memcached.MemcacheConnectionError,
+                          memcache_client.incr, 'some_key', delta=-15)
 
     def test_decr(self):
         memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'])
@@ -179,6 +185,10 @@ class TestMemcached(unittest.TestCase):
         self.assertEquals(memcache_client.get('some_key'), '11')
         memcache_client.decr('some_key', delta=15)
         self.assertEquals(memcache_client.get('some_key'), '0')
+        mock.read_return_none = True
+        self.assertRaises(memcached.MemcacheConnectionError,
+                          memcache_client.decr, 'some_key', delta=15)
+
 
     def test_retry(self):
         logging.getLogger().addHandler(NullLoggingHandler())
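The hex strings that DumbInternalProxy yields for '.gz' objects below are just 'obj\ndata' compressed ahead of time; the original fixture was made with `gzip -9` of a file named 'test', which is why 'te' and 'st\x00' appear in its header. If the fixture ever needs regenerating, something along these lines (a hypothetical helper, not part of the patch) produces bytes that decompress to the same content:

    import gzip
    import StringIO

    buf = StringIO.StringIO()
    # No filename is baked in here, so the header differs slightly from
    # the checked-in fixture; the tests only care about the payload.
    gz = gzip.GzipFile(fileobj=buf, mode='wb', compresslevel=9)
    gz.write('obj\ndata')
    gz.close()
    fixture = buf.getvalue()  # decompresses back to 'obj\ndata'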
diff --git a/test/unit/stats/test_log_processor.py b/test/unit/stats/test_log_processor.py
index 75acc02123..c0625e9699 100644
--- a/test/unit/stats/test_log_processor.py
+++ b/test/unit/stats/test_log_processor.py
@@ -15,9 +15,11 @@
 
 import unittest
 from test.unit import tmpfile
+import Queue
 
 from swift.common import internal_proxy
 from swift.stats import log_processor
+from swift.common.exceptions import ChunkReadTimeout
 
 
 class FakeUploadApp(object):
@@ -33,6 +35,11 @@ class DumbLogger(object):
         pass
 
 class DumbInternalProxy(object):
+    def __init__(self, code=200, timeout=False, bad_compressed=False):
+        self.code = code
+        self.timeout = timeout
+        self.bad_compressed = bad_compressed
+
     def get_container_list(self, account, container, marker=None,
                            end_marker=None):
         n = '2010/03/14/13/obj1'
@@ -46,22 +53,28 @@ class DumbInternalProxy(object):
         return []
 
     def get_object(self, account, container, object_name):
-        code = 200
         if object_name.endswith('.gz'):
-            # same data as below, compressed with gzip -9
-            def data():
-                yield '\x1f\x8b\x08'
-                yield '\x08"\xd79L'
-                yield '\x02\x03te'
-                yield 'st\x00\xcbO'
-                yield '\xca\xe2JI,I'
-                yield '\xe4\x02\x00O\xff'
-                yield '\xa3Y\t\x00\x00\x00'
+            if self.bad_compressed:
+                # invalid compressed data
+                def data():
+                    yield '\xff\xff\xff\xff\xff\xff\xff'
+            else:
+                # 'obj\ndata', compressed with gzip -9
+                def data():
+                    yield '\x1f\x8b\x08'
+                    yield '\x08"\xd79L'
+                    yield '\x02\x03te'
+                    yield 'st\x00\xcbO'
+                    yield '\xca\xe2JI,I'
+                    yield '\xe4\x02\x00O\xff'
+                    yield '\xa3Y\t\x00\x00\x00'
         else:
            def data():
                 yield 'obj\n'
+                if self.timeout:
+                    raise ChunkReadTimeout
                 yield 'data'
-        return code, data()
+        return self.code, data()
 
 
 class TestLogProcessor(unittest.TestCase):
@@ -159,6 +172,19 @@ use = egg:swift#proxy
                      'prefix_query': 0}}
         self.assertEquals(result, expected)
 
+    def test_process_one_access_file_error(self):
+        access_proxy_config = self.proxy_config.copy()
+        access_proxy_config.update({
+            'log-processor-access': {
+                'source_filename_format': '%Y%m%d%H*',
+                'class_path':
+                    'swift.stats.access_processor.AccessLogProcessor'
+            }})
+        p = log_processor.LogProcessor(access_proxy_config, DumbLogger())
+        p._internal_proxy = DumbInternalProxy(code=500)
+        self.assertRaises(log_processor.BadFileDownload, p.process_one_file,
+                          'access', 'a', 'c', 'o')
+
     def test_get_container_listing(self):
         p = log_processor.LogProcessor(self.proxy_config, DumbLogger())
         p._internal_proxy = DumbInternalProxy()
@@ -193,6 +219,18 @@ use = egg:swift#proxy
         result = list(p.get_object_data('a', 'c', 'o.gz', True))
         self.assertEquals(result, expected)
 
+    def test_get_object_data_errors(self):
+        p = log_processor.LogProcessor(self.proxy_config, DumbLogger())
+        p._internal_proxy = DumbInternalProxy(code=500)
+        result = p.get_object_data('a', 'c', 'o')
+        self.assertRaises(log_processor.BadFileDownload, list, result)
+        p._internal_proxy = DumbInternalProxy(bad_compressed=True)
+        result = p.get_object_data('a', 'c', 'o.gz', True)
+        self.assertRaises(log_processor.BadFileDownload, list, result)
+        p._internal_proxy = DumbInternalProxy(timeout=True)
+        result = p.get_object_data('a', 'c', 'o')
+        self.assertRaises(log_processor.BadFileDownload, list, result)
+
     def test_get_stat_totals(self):
         stats_proxy_config = self.proxy_config.copy()
         stats_proxy_config.update({
@@ -262,3 +300,130 @@ use = egg:swift#proxy
             # these only work for Py2.7+
             #self.assertIsInstance(k, str)
             self.assertTrue(isinstance(k, str), type(k))
+
+    def test_collate_worker(self):
+        orig_get_object_data = log_processor.LogProcessor.get_object_data
+        try:
+            log_processor.LogProcessor._internal_proxy = DumbInternalProxy()
+            def get_object_data(*a, **kw):
+                return [self.access_test_line]
+            log_processor.LogProcessor.get_object_data = get_object_data
+            proxy_config = self.proxy_config.copy()
+            proxy_config.update({
+                'log-processor-access': {
+                    'source_filename_format': '%Y%m%d%H*',
+                    'class_path':
+                        'swift.stats.access_processor.AccessLogProcessor'
+                }})
+            processor_args = (proxy_config, DumbLogger())
+            q_in = Queue.Queue()
+            q_out = Queue.Queue()
+            work_request = ('access', 'a', 'c', 'o')
+            q_in.put(work_request)
+            q_in.put(None)
+            log_processor.collate_worker(processor_args, q_in, q_out)
+            item, ret = q_out.get()
+            self.assertEquals(item, work_request)
+            expected = {('acct', '2010', '07', '09', '04'):
+                        {('public', 'object', 'GET', '2xx'): 1,
+                         ('public', 'bytes_out'): 95,
+                         'marker_query': 0,
+                         'format_query': 1,
+                         'delimiter_query': 0,
+                         'path_query': 0,
+                         ('public', 'bytes_in'): 6,
+                         'prefix_query': 0}}
+            self.assertEquals(ret, expected)
+        finally:
+            log_processor.LogProcessor._internal_proxy = None
+            log_processor.LogProcessor.get_object_data = orig_get_object_data
+
+    def test_collate_worker_error(self):
+        def get_object_data(*a, **kw):
+            raise log_processor.BadFileDownload()
+        orig_get_object_data = log_processor.LogProcessor.get_object_data
+        try:
+            log_processor.LogProcessor.get_object_data = get_object_data
+            proxy_config = self.proxy_config.copy()
+            proxy_config.update({
+                'log-processor-access': {
+                    'source_filename_format': '%Y%m%d%H*',
+                    'class_path':
+                        'swift.stats.access_processor.AccessLogProcessor'
+                }})
+            processor_args = (proxy_config, DumbLogger())
+            q_in = Queue.Queue()
+            q_out = Queue.Queue()
+            work_request = ('access', 'a', 'c', 'o')
+            q_in.put(work_request)
+            q_in.put(None)
+            log_processor.collate_worker(processor_args, q_in, q_out)
+            item, ret = q_out.get()
+            self.assertEquals(item, work_request)
+            # these only work for Py2.7+
+            #self.assertIsInstance(ret, log_processor.BadFileDownload)
+            self.assertTrue(isinstance(ret, log_processor.BadFileDownload),
+                            type(ret))
+        finally:
+            log_processor.LogProcessor.get_object_data = orig_get_object_data
+
+    def test_multiprocess_collate(self):
+        orig_get_object_data = log_processor.LogProcessor.get_object_data
+        try:
+            log_processor.LogProcessor._internal_proxy = DumbInternalProxy()
+            def get_object_data(*a, **kw):
+                return [self.access_test_line]
+            log_processor.LogProcessor.get_object_data = get_object_data
+            proxy_config = self.proxy_config.copy()
+            proxy_config.update({
+                'log-processor-access': {
+                    'source_filename_format': '%Y%m%d%H*',
+                    'class_path':
+                        'swift.stats.access_processor.AccessLogProcessor'
+                }})
+            processor_args = (proxy_config, DumbLogger())
+            item = ('access', 'a', 'c', 'o')
+            logs_to_process = [item]
+            results = log_processor.multiprocess_collate(processor_args,
+                                                         logs_to_process,
+                                                         1)
+            results = list(results)
+            expected = [(item, {('acct', '2010', '07', '09', '04'):
+                        {('public', 'object', 'GET', '2xx'): 1,
+                         ('public', 'bytes_out'): 95,
+                         'marker_query': 0,
+                         'format_query': 1,
+                         'delimiter_query': 0,
+                         'path_query': 0,
+                         ('public', 'bytes_in'): 6,
+                         'prefix_query': 0}})]
+            self.assertEquals(results, expected)
+        finally:
+            log_processor.LogProcessor._internal_proxy = None
+            log_processor.LogProcessor.get_object_data = orig_get_object_data
+
+    def test_multiprocess_collate_errors(self):
+        def get_object_data(*a, **kw):
+            raise log_processor.BadFileDownload()
+        orig_get_object_data = log_processor.LogProcessor.get_object_data
+        try:
+            log_processor.LogProcessor.get_object_data = get_object_data
+            proxy_config = self.proxy_config.copy()
+            proxy_config.update({
+                'log-processor-access': {
+                    'source_filename_format': '%Y%m%d%H*',
+                    'class_path':
+                        'swift.stats.access_processor.AccessLogProcessor'
+                }})
+            processor_args = (proxy_config, DumbLogger())
+            item = ('access', 'a', 'c', 'o')
+            logs_to_process = [item]
+            results = log_processor.multiprocess_collate(processor_args,
+                                                         logs_to_process,
+                                                         1)
+            results = list(results)
+            expected = []
+            self.assertEquals(results, expected)
+        finally:
+            log_processor.LogProcessor._internal_proxy = None
+            log_processor.LogProcessor.get_object_data = orig_get_object_data