diff --git a/swift/obj/updater.py b/swift/obj/updater.py index 534053f418..fb1d0ae38c 100644 --- a/swift/obj/updater.py +++ b/swift/obj/updater.py @@ -348,7 +348,9 @@ class ObjectUpdater(Daemon): """ try: update = pickle.load(open(update_path, 'rb')) - except Exception: + except Exception as e: + if getattr(e, 'errno', None) == errno.ENOENT: + return self.logger.exception( _('ERROR Pickle problem, quarantining %s'), update_path) self.stats.quarantines += 1 @@ -356,6 +358,13 @@ class ObjectUpdater(Daemon): target_path = os.path.join(device, 'quarantined', 'objects', os.path.basename(update_path)) renamer(update_path, target_path, fsync=False) + try: + # If this was the last async_pending in the directory, + # then this will succeed. Otherwise, it'll fail, and + # that's okay. + os.rmdir(os.path.dirname(update_path)) + except OSError: + pass return def do_update(): diff --git a/test/unit/obj/test_updater.py b/test/unit/obj/test_updater.py index a1e83e7957..4a6aeb18c1 100644 --- a/test/unit/obj/test_updater.py +++ b/test/unit/obj/test_updater.py @@ -647,7 +647,6 @@ class TestObjectUpdater(unittest.TestCase): data, timestamp, policy) def test_obj_put_async_updates(self): - ts_iter = make_timestamp_iter() policies = list(POLICIES) random.shuffle(policies) @@ -664,8 +663,8 @@ class TestObjectUpdater(unittest.TestCase): def do_test(headers_out, expected, container_path=None): # write an async dfmanager = DiskFileManager(conf, daemon.logger) - self._write_async_update(dfmanager, next(ts_iter), policies[0], - headers=headers_out, + self._write_async_update(dfmanager, next(self.ts_iter), + policies[0], headers=headers_out, container_path=container_path) request_log = [] @@ -691,7 +690,7 @@ class TestObjectUpdater(unittest.TestCase): self.assertFalse(os.listdir(async_dir)) daemon.logger.clear() - ts = next(ts_iter) + ts = next(self.ts_iter) # use a dict rather than HeaderKeyDict so we can vary the case of the # pickled headers headers_out = { @@ -1153,6 +1152,69 @@ class TestObjectUpdater(unittest.TestCase): daemon.logger.get_increment_counts()) self.assertFalse(os.listdir(async_dir)) # no async file + def test_obj_update_quarantine(self): + policies = list(POLICIES) + random.shuffle(policies) + + # setup updater + conf = { + 'devices': self.devices_dir, + 'mount_check': 'false', + 'swift_dir': self.testdir, + } + daemon = object_updater.ObjectUpdater(conf, logger=self.logger) + async_dir = os.path.join(self.sda1, get_async_dir(policies[0])) + os.mkdir(async_dir) + + ohash = hash_path('a', 'c', 'o') + odir = os.path.join(async_dir, ohash[-3:]) + mkdirs(odir) + op_path = os.path.join( + odir, + '%s-%s' % (ohash, next(self.ts_iter).internal)) + with open(op_path, 'wb') as async_pending: + async_pending.write(b'\xff') # invalid pickle + + with mocked_http_conn(): + with mock.patch('swift.obj.updater.dump_recon_cache'): + daemon.run_once() + + self.assertEqual( + {'quarantines': 1}, + daemon.logger.get_increment_counts()) + self.assertFalse(os.listdir(async_dir)) # no asyncs + + def test_obj_update_gone_missing(self): + # if you've got multiple updaters running (say, both a background + # and foreground process), process_object_update may get a file + # that doesn't exist + policies = list(POLICIES) + random.shuffle(policies) + + # setup updater + conf = { + 'devices': self.devices_dir, + 'mount_check': 'false', + 'swift_dir': self.testdir, + } + daemon = object_updater.ObjectUpdater(conf, logger=self.logger) + async_dir = os.path.join(self.sda1, get_async_dir(policies[0])) + os.mkdir(async_dir) + + ohash = hash_path('a', 'c', 'o') + odir = os.path.join(async_dir, ohash[-3:]) + mkdirs(odir) + op_path = os.path.join( + odir, + '%s-%s' % (ohash, next(self.ts_iter).internal)) + + with mocked_http_conn(): + with mock.patch('swift.obj.updater.dump_recon_cache'): + daemon.process_object_update(op_path, self.sda1, policies[0]) + self.assertEqual({}, daemon.logger.get_increment_counts()) + self.assertEqual(os.listdir(async_dir), [ohash[-3:]]) + self.assertFalse(os.listdir(odir)) + if __name__ == '__main__': unittest.main()