Updated object updater to keep track of successes so it doesn't retry updating nodes that it has already updated
This commit is contained in:
@@ -154,16 +154,20 @@ class ObjectUpdater(Daemon):
|
|||||||
renamer(update_path, os.path.join(device,
|
renamer(update_path, os.path.join(device,
|
||||||
'quarantined', 'objects', os.path.basename(update_path)))
|
'quarantined', 'objects', os.path.basename(update_path)))
|
||||||
return
|
return
|
||||||
|
successes = update.get('successes', [])
|
||||||
part, nodes = self.get_container_ring().get_nodes(
|
part, nodes = self.get_container_ring().get_nodes(
|
||||||
update['account'], update['container'])
|
update['account'], update['container'])
|
||||||
obj = '/%s/%s/%s' % \
|
obj = '/%s/%s/%s' % \
|
||||||
(update['account'], update['container'], update['obj'])
|
(update['account'], update['container'], update['obj'])
|
||||||
success = True
|
success = True
|
||||||
for node in nodes:
|
for node in nodes:
|
||||||
status = self.object_update(node, part, update['op'], obj,
|
if node['id'] not in successes:
|
||||||
|
status = self.object_update(node, part, update['op'], obj,
|
||||||
update['headers'])
|
update['headers'])
|
||||||
if not (200 <= status < 300) and status != 404:
|
if not (200 <= status < 300) and status != 404:
|
||||||
success = False
|
success = False
|
||||||
|
else:
|
||||||
|
successes.append(node['id'])
|
||||||
if success:
|
if success:
|
||||||
self.successes += 1
|
self.successes += 1
|
||||||
self.logger.debug('Update sent for %s %s' % (obj, update_path))
|
self.logger.debug('Update sent for %s %s' % (obj, update_path))
|
||||||
@@ -171,6 +175,8 @@ class ObjectUpdater(Daemon):
|
|||||||
else:
|
else:
|
||||||
self.failures += 1
|
self.failures += 1
|
||||||
self.logger.debug('Update failed for %s %s' % (obj, update_path))
|
self.logger.debug('Update failed for %s %s' % (obj, update_path))
|
||||||
|
update['successes'] = successes
|
||||||
|
pickle.dump(update, open(update_path,'w'))
|
||||||
|
|
||||||
def object_update(self, node, part, op, obj, headers):
|
def object_update(self, node, part, op, obj, headers):
|
||||||
"""
|
"""
|
||||||
|
@@ -122,13 +122,15 @@ class TestObjectUpdater(unittest.TestCase):
|
|||||||
except BaseException, err:
|
except BaseException, err:
|
||||||
return err
|
return err
|
||||||
return None
|
return None
|
||||||
def accept(return_code):
|
def accept(return_codes):
|
||||||
|
codes = iter(return_codes)
|
||||||
try:
|
try:
|
||||||
events = []
|
events = []
|
||||||
for x in xrange(2):
|
for x in xrange(len(return_codes)):
|
||||||
with Timeout(3):
|
with Timeout(3):
|
||||||
sock, addr = bindsock.accept()
|
sock, addr = bindsock.accept()
|
||||||
events.append(spawn(accepter, sock, return_code))
|
events.append(
|
||||||
|
spawn(accepter, sock, codes.next()))
|
||||||
for event in events:
|
for event in events:
|
||||||
err = event.wait()
|
err = event.wait()
|
||||||
if err:
|
if err:
|
||||||
@@ -136,16 +138,21 @@ class TestObjectUpdater(unittest.TestCase):
|
|||||||
except BaseException, err:
|
except BaseException, err:
|
||||||
return err
|
return err
|
||||||
return None
|
return None
|
||||||
event = spawn(accept, 201)
|
event = spawn(accept, [201,500])
|
||||||
for dev in cu.get_container_ring().devs:
|
for dev in cu.get_container_ring().devs:
|
||||||
if dev is not None:
|
if dev is not None:
|
||||||
dev['port'] = bindsock.getsockname()[1]
|
dev['port'] = bindsock.getsockname()[1]
|
||||||
cu.run_once()
|
cu.run_once()
|
||||||
err = event.wait()
|
err = event.wait()
|
||||||
|
if err:
|
||||||
|
raise err
|
||||||
|
self.assert_(os.path.exists(op_path))
|
||||||
|
event = spawn(accept, [201])
|
||||||
|
cu.run_once()
|
||||||
|
err = event.wait()
|
||||||
if err:
|
if err:
|
||||||
raise err
|
raise err
|
||||||
self.assert_(not os.path.exists(op_path))
|
self.assert_(not os.path.exists(op_path))
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
Reference in New Issue
Block a user