diff --git a/swift/common/utils.py b/swift/common/utils.py index f2e186c03e..59c9ee7c3c 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -32,6 +32,9 @@ import ctypes.util import fcntl import struct from ConfigParser import ConfigParser +from tempfile import mkstemp +import cPickle as pickle + import eventlet from eventlet import greenio, GreenPool, sleep, Timeout, listen @@ -510,8 +513,8 @@ def unlink_older_than(path, mtime): """ Remove any file in a given path that that was last modified before mtime. - :param path: Path to remove file from - :mtime: Timestamp of oldest file to keep + :param path: path to remove file from + :mtime: timestamp of oldest file to keep """ if os.path.exists(path): for fname in os.listdir(path): @@ -524,6 +527,14 @@ def unlink_older_than(path, mtime): def item_from_env(env, item_name): + """ + Get a value from the wsgi environment + + :param env: wsgi environment dict + :param item_name: name of item to get + + :returns: the value from the environment + """ item = env.get(item_name, None) if item is None: logging.error("ERROR: %s could not be found in env!" % item_name) @@ -531,10 +542,27 @@ def item_from_env(env, item_name): def cache_from_env(env): + """ + Get memcache connection pool from the environment (which had been + previously set by the memcache middleware + + :param env: wsgi environment dict + + :returns: swift.common.memcached.MemcacheRing from environment + """ return item_from_env(env, 'swift.cache') def readconf(conf, section_name, log_name=None): + """ + Read config file and return config items as a dict + + :param conf: path to config file + :param section_name: config section to read + :param log_name: name to be used with logging (will use section_name if + not defined) + :returns: dict of config items + """ c = ConfigParser() if not c.read(conf): print "Unable to read config file %s" % conf @@ -550,3 +578,21 @@ def readconf(conf, section_name, log_name=None): else: conf['log_name'] = section_name return conf + + +def write_pickle(obj, dest, tmp): + """ + Ensure that a pickle file gets written to disk. The file + is first written to a tmp location, ensure it is synced to disk, then + perform a move to its final location + + :param obj: python object to be pickled + :param dest: path of final destination file + :param tmp: path to tmp to use + """ + fd, tmppath = mkstemp(dir=tmp) + with os.fdopen(fd, 'wb') as fo: + pickle.dump(obj, fo) + fo.flush() + os.fsync(fd) + renamer(tmppath, dest) diff --git a/swift/obj/server.py b/swift/obj/server.py index 0a73421923..fe26eebf20 100644 --- a/swift/obj/server.py +++ b/swift/obj/server.py @@ -37,7 +37,7 @@ from eventlet import sleep, Timeout from swift.common.utils import mkdirs, normalize_timestamp, \ storage_directory, hash_path, renamer, fallocate, \ - split_path, drop_buffer_cache, get_logger + split_path, drop_buffer_cache, get_logger, write_pickle from swift.common.bufferedhttp import http_connect from swift.common.constraints import check_object_creation, check_mount, \ check_float, check_utf8 @@ -300,15 +300,13 @@ class ObjectController(object): '%s:%s/%s transaction %s (saving for async update later)' % (ip, port, contdevice, headers_in.get('x-cf-trans-id', '-'))) async_dir = os.path.join(self.devices, objdevice, ASYNCDIR) - fd, tmppath = mkstemp(dir=os.path.join(self.devices, objdevice, 'tmp')) - with os.fdopen(fd, 'wb') as fo: - pickle.dump({'op': op, 'account': account, 'container': container, - 'obj': obj, 'headers': headers_out}, fo) - fo.flush() - os.fsync(fd) - ohash = hash_path(account, container, obj) - renamer(tmppath, os.path.join(async_dir, ohash[-3:], ohash + '-' + - normalize_timestamp(headers_out['x-timestamp']))) + ohash = hash_path(account, container, obj) + write_pickle( + {'op': op, 'account': account, 'container': container, + 'obj': obj, 'headers': headers_out}, + os.path.join(async_dir, ohash[-3:], ohash + '-' + + normalize_timestamp(headers_out['x-timestamp'])), + os.path.join(self.devices, objdevice, 'tmp')) def POST(self, request): """Handle HTTP POST requests for the Swift Object Server.""" diff --git a/swift/obj/updater.py b/swift/obj/updater.py index 40121e4b9d..3d6a15cc4f 100644 --- a/swift/obj/updater.py +++ b/swift/obj/updater.py @@ -25,7 +25,7 @@ from eventlet import patcher, Timeout from swift.common.bufferedhttp import http_connect from swift.common.exceptions import ConnectionTimeout from swift.common.ring import Ring -from swift.common.utils import get_logger, renamer +from swift.common.utils import get_logger, renamer, write_pickle from swift.common.daemon import Daemon from swift.obj.server import ASYNCDIR @@ -154,16 +154,20 @@ class ObjectUpdater(Daemon): renamer(update_path, os.path.join(device, 'quarantined', 'objects', os.path.basename(update_path))) return + successes = update.get('successes', []) part, nodes = self.get_container_ring().get_nodes( update['account'], update['container']) obj = '/%s/%s/%s' % \ (update['account'], update['container'], update['obj']) success = True for node in nodes: - status = self.object_update(node, part, update['op'], obj, + if node['id'] not in successes: + status = self.object_update(node, part, update['op'], obj, update['headers']) - if not (200 <= status < 300) and status != 404: - success = False + if not (200 <= status < 300) and status != 404: + success = False + else: + successes.append(node['id']) if success: self.successes += 1 self.logger.debug('Update sent for %s %s' % (obj, update_path)) @@ -171,6 +175,8 @@ class ObjectUpdater(Daemon): else: self.failures += 1 self.logger.debug('Update failed for %s %s' % (obj, update_path)) + update['successes'] = successes + write_pickle(update, update_path, os.path.join(device, 'tmp')) def object_update(self, node, part, op, obj, headers): """ diff --git a/test/unit/obj/test_updater.py b/test/unit/obj/test_updater.py index 0064e2cbd2..9887c6fcaf 100644 --- a/test/unit/obj/test_updater.py +++ b/test/unit/obj/test_updater.py @@ -48,6 +48,7 @@ class TestObjectUpdater(unittest.TestCase): os.mkdir(self.devices_dir) self.sda1 = os.path.join(self.devices_dir, 'sda1') os.mkdir(self.sda1) + os.mkdir(os.path.join(self.sda1,'tmp')) def tearDown(self): rmtree(self.testdir, ignore_errors=1) @@ -122,13 +123,15 @@ class TestObjectUpdater(unittest.TestCase): except BaseException, err: return err return None - def accept(return_code): + def accept(return_codes): + codes = iter(return_codes) try: events = [] - for x in xrange(2): + for x in xrange(len(return_codes)): with Timeout(3): sock, addr = bindsock.accept() - events.append(spawn(accepter, sock, return_code)) + events.append( + spawn(accepter, sock, codes.next())) for event in events: err = event.wait() if err: @@ -136,16 +139,21 @@ class TestObjectUpdater(unittest.TestCase): except BaseException, err: return err return None - event = spawn(accept, 201) + event = spawn(accept, [201,500]) for dev in cu.get_container_ring().devs: if dev is not None: dev['port'] = bindsock.getsockname()[1] cu.run_once() err = event.wait() + if err: + raise err + self.assert_(os.path.exists(op_path)) + event = spawn(accept, [201]) + cu.run_once() + err = event.wait() if err: raise err self.assert_(not os.path.exists(op_path)) - if __name__ == '__main__': unittest.main()