object-updater now records successful updates in the pickle, so that in failure scenarios it doesn't have to retry the ones the were successful before.
This commit is contained in:
@@ -32,6 +32,9 @@ import ctypes.util
|
|||||||
import fcntl
|
import fcntl
|
||||||
import struct
|
import struct
|
||||||
from ConfigParser import ConfigParser
|
from ConfigParser import ConfigParser
|
||||||
|
from tempfile import mkstemp
|
||||||
|
import cPickle as pickle
|
||||||
|
|
||||||
|
|
||||||
import eventlet
|
import eventlet
|
||||||
from eventlet import greenio, GreenPool, sleep, Timeout, listen
|
from eventlet import greenio, GreenPool, sleep, Timeout, listen
|
||||||
@@ -510,8 +513,8 @@ def unlink_older_than(path, mtime):
|
|||||||
"""
|
"""
|
||||||
Remove any file in a given path that that was last modified before mtime.
|
Remove any file in a given path that that was last modified before mtime.
|
||||||
|
|
||||||
:param path: Path to remove file from
|
:param path: path to remove file from
|
||||||
:mtime: Timestamp of oldest file to keep
|
:mtime: timestamp of oldest file to keep
|
||||||
"""
|
"""
|
||||||
if os.path.exists(path):
|
if os.path.exists(path):
|
||||||
for fname in os.listdir(path):
|
for fname in os.listdir(path):
|
||||||
@@ -524,6 +527,14 @@ def unlink_older_than(path, mtime):
|
|||||||
|
|
||||||
|
|
||||||
def item_from_env(env, item_name):
|
def item_from_env(env, item_name):
|
||||||
|
"""
|
||||||
|
Get a value from the wsgi environment
|
||||||
|
|
||||||
|
:param env: wsgi environment dict
|
||||||
|
:param item_name: name of item to get
|
||||||
|
|
||||||
|
:returns: the value from the environment
|
||||||
|
"""
|
||||||
item = env.get(item_name, None)
|
item = env.get(item_name, None)
|
||||||
if item is None:
|
if item is None:
|
||||||
logging.error("ERROR: %s could not be found in env!" % item_name)
|
logging.error("ERROR: %s could not be found in env!" % item_name)
|
||||||
@@ -531,10 +542,27 @@ def item_from_env(env, item_name):
|
|||||||
|
|
||||||
|
|
||||||
def cache_from_env(env):
|
def cache_from_env(env):
|
||||||
|
"""
|
||||||
|
Get memcache connection pool from the environment (which had been
|
||||||
|
previously set by the memcache middleware
|
||||||
|
|
||||||
|
:param env: wsgi environment dict
|
||||||
|
|
||||||
|
:returns: swift.common.memcached.MemcacheRing from environment
|
||||||
|
"""
|
||||||
return item_from_env(env, 'swift.cache')
|
return item_from_env(env, 'swift.cache')
|
||||||
|
|
||||||
|
|
||||||
def readconf(conf, section_name, log_name=None):
|
def readconf(conf, section_name, log_name=None):
|
||||||
|
"""
|
||||||
|
Read config file and return config items as a dict
|
||||||
|
|
||||||
|
:param conf: path to config file
|
||||||
|
:param section_name: config section to read
|
||||||
|
:param log_name: name to be used with logging (will use section_name if
|
||||||
|
not defined)
|
||||||
|
:returns: dict of config items
|
||||||
|
"""
|
||||||
c = ConfigParser()
|
c = ConfigParser()
|
||||||
if not c.read(conf):
|
if not c.read(conf):
|
||||||
print "Unable to read config file %s" % conf
|
print "Unable to read config file %s" % conf
|
||||||
@@ -550,3 +578,21 @@ def readconf(conf, section_name, log_name=None):
|
|||||||
else:
|
else:
|
||||||
conf['log_name'] = section_name
|
conf['log_name'] = section_name
|
||||||
return conf
|
return conf
|
||||||
|
|
||||||
|
|
||||||
|
def write_pickle(obj, dest, tmp):
|
||||||
|
"""
|
||||||
|
Ensure that a pickle file gets written to disk. The file
|
||||||
|
is first written to a tmp location, ensure it is synced to disk, then
|
||||||
|
perform a move to its final location
|
||||||
|
|
||||||
|
:param obj: python object to be pickled
|
||||||
|
:param dest: path of final destination file
|
||||||
|
:param tmp: path to tmp to use
|
||||||
|
"""
|
||||||
|
fd, tmppath = mkstemp(dir=tmp)
|
||||||
|
with os.fdopen(fd, 'wb') as fo:
|
||||||
|
pickle.dump(obj, fo)
|
||||||
|
fo.flush()
|
||||||
|
os.fsync(fd)
|
||||||
|
renamer(tmppath, dest)
|
||||||
|
|||||||
@@ -37,7 +37,7 @@ from eventlet import sleep, Timeout
|
|||||||
|
|
||||||
from swift.common.utils import mkdirs, normalize_timestamp, \
|
from swift.common.utils import mkdirs, normalize_timestamp, \
|
||||||
storage_directory, hash_path, renamer, fallocate, \
|
storage_directory, hash_path, renamer, fallocate, \
|
||||||
split_path, drop_buffer_cache, get_logger
|
split_path, drop_buffer_cache, get_logger, write_pickle
|
||||||
from swift.common.bufferedhttp import http_connect
|
from swift.common.bufferedhttp import http_connect
|
||||||
from swift.common.constraints import check_object_creation, check_mount, \
|
from swift.common.constraints import check_object_creation, check_mount, \
|
||||||
check_float, check_utf8
|
check_float, check_utf8
|
||||||
@@ -300,15 +300,13 @@ class ObjectController(object):
|
|||||||
'%s:%s/%s transaction %s (saving for async update later)' %
|
'%s:%s/%s transaction %s (saving for async update later)' %
|
||||||
(ip, port, contdevice, headers_in.get('x-cf-trans-id', '-')))
|
(ip, port, contdevice, headers_in.get('x-cf-trans-id', '-')))
|
||||||
async_dir = os.path.join(self.devices, objdevice, ASYNCDIR)
|
async_dir = os.path.join(self.devices, objdevice, ASYNCDIR)
|
||||||
fd, tmppath = mkstemp(dir=os.path.join(self.devices, objdevice, 'tmp'))
|
ohash = hash_path(account, container, obj)
|
||||||
with os.fdopen(fd, 'wb') as fo:
|
write_pickle(
|
||||||
pickle.dump({'op': op, 'account': account, 'container': container,
|
{'op': op, 'account': account, 'container': container,
|
||||||
'obj': obj, 'headers': headers_out}, fo)
|
'obj': obj, 'headers': headers_out},
|
||||||
fo.flush()
|
os.path.join(async_dir, ohash[-3:], ohash + '-' +
|
||||||
os.fsync(fd)
|
normalize_timestamp(headers_out['x-timestamp'])),
|
||||||
ohash = hash_path(account, container, obj)
|
os.path.join(self.devices, objdevice, 'tmp'))
|
||||||
renamer(tmppath, os.path.join(async_dir, ohash[-3:], ohash + '-' +
|
|
||||||
normalize_timestamp(headers_out['x-timestamp'])))
|
|
||||||
|
|
||||||
def POST(self, request):
|
def POST(self, request):
|
||||||
"""Handle HTTP POST requests for the Swift Object Server."""
|
"""Handle HTTP POST requests for the Swift Object Server."""
|
||||||
|
|||||||
@@ -25,7 +25,7 @@ from eventlet import patcher, Timeout
|
|||||||
from swift.common.bufferedhttp import http_connect
|
from swift.common.bufferedhttp import http_connect
|
||||||
from swift.common.exceptions import ConnectionTimeout
|
from swift.common.exceptions import ConnectionTimeout
|
||||||
from swift.common.ring import Ring
|
from swift.common.ring import Ring
|
||||||
from swift.common.utils import get_logger, renamer
|
from swift.common.utils import get_logger, renamer, write_pickle
|
||||||
from swift.common.daemon import Daemon
|
from swift.common.daemon import Daemon
|
||||||
from swift.obj.server import ASYNCDIR
|
from swift.obj.server import ASYNCDIR
|
||||||
|
|
||||||
@@ -154,16 +154,20 @@ class ObjectUpdater(Daemon):
|
|||||||
renamer(update_path, os.path.join(device,
|
renamer(update_path, os.path.join(device,
|
||||||
'quarantined', 'objects', os.path.basename(update_path)))
|
'quarantined', 'objects', os.path.basename(update_path)))
|
||||||
return
|
return
|
||||||
|
successes = update.get('successes', [])
|
||||||
part, nodes = self.get_container_ring().get_nodes(
|
part, nodes = self.get_container_ring().get_nodes(
|
||||||
update['account'], update['container'])
|
update['account'], update['container'])
|
||||||
obj = '/%s/%s/%s' % \
|
obj = '/%s/%s/%s' % \
|
||||||
(update['account'], update['container'], update['obj'])
|
(update['account'], update['container'], update['obj'])
|
||||||
success = True
|
success = True
|
||||||
for node in nodes:
|
for node in nodes:
|
||||||
status = self.object_update(node, part, update['op'], obj,
|
if node['id'] not in successes:
|
||||||
|
status = self.object_update(node, part, update['op'], obj,
|
||||||
update['headers'])
|
update['headers'])
|
||||||
if not (200 <= status < 300) and status != 404:
|
if not (200 <= status < 300) and status != 404:
|
||||||
success = False
|
success = False
|
||||||
|
else:
|
||||||
|
successes.append(node['id'])
|
||||||
if success:
|
if success:
|
||||||
self.successes += 1
|
self.successes += 1
|
||||||
self.logger.debug('Update sent for %s %s' % (obj, update_path))
|
self.logger.debug('Update sent for %s %s' % (obj, update_path))
|
||||||
@@ -171,6 +175,8 @@ class ObjectUpdater(Daemon):
|
|||||||
else:
|
else:
|
||||||
self.failures += 1
|
self.failures += 1
|
||||||
self.logger.debug('Update failed for %s %s' % (obj, update_path))
|
self.logger.debug('Update failed for %s %s' % (obj, update_path))
|
||||||
|
update['successes'] = successes
|
||||||
|
write_pickle(update, update_path, os.path.join(device, 'tmp'))
|
||||||
|
|
||||||
def object_update(self, node, part, op, obj, headers):
|
def object_update(self, node, part, op, obj, headers):
|
||||||
"""
|
"""
|
||||||
|
|||||||
@@ -48,6 +48,7 @@ class TestObjectUpdater(unittest.TestCase):
|
|||||||
os.mkdir(self.devices_dir)
|
os.mkdir(self.devices_dir)
|
||||||
self.sda1 = os.path.join(self.devices_dir, 'sda1')
|
self.sda1 = os.path.join(self.devices_dir, 'sda1')
|
||||||
os.mkdir(self.sda1)
|
os.mkdir(self.sda1)
|
||||||
|
os.mkdir(os.path.join(self.sda1,'tmp'))
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
rmtree(self.testdir, ignore_errors=1)
|
rmtree(self.testdir, ignore_errors=1)
|
||||||
@@ -122,13 +123,15 @@ class TestObjectUpdater(unittest.TestCase):
|
|||||||
except BaseException, err:
|
except BaseException, err:
|
||||||
return err
|
return err
|
||||||
return None
|
return None
|
||||||
def accept(return_code):
|
def accept(return_codes):
|
||||||
|
codes = iter(return_codes)
|
||||||
try:
|
try:
|
||||||
events = []
|
events = []
|
||||||
for x in xrange(2):
|
for x in xrange(len(return_codes)):
|
||||||
with Timeout(3):
|
with Timeout(3):
|
||||||
sock, addr = bindsock.accept()
|
sock, addr = bindsock.accept()
|
||||||
events.append(spawn(accepter, sock, return_code))
|
events.append(
|
||||||
|
spawn(accepter, sock, codes.next()))
|
||||||
for event in events:
|
for event in events:
|
||||||
err = event.wait()
|
err = event.wait()
|
||||||
if err:
|
if err:
|
||||||
@@ -136,16 +139,21 @@ class TestObjectUpdater(unittest.TestCase):
|
|||||||
except BaseException, err:
|
except BaseException, err:
|
||||||
return err
|
return err
|
||||||
return None
|
return None
|
||||||
event = spawn(accept, 201)
|
event = spawn(accept, [201,500])
|
||||||
for dev in cu.get_container_ring().devs:
|
for dev in cu.get_container_ring().devs:
|
||||||
if dev is not None:
|
if dev is not None:
|
||||||
dev['port'] = bindsock.getsockname()[1]
|
dev['port'] = bindsock.getsockname()[1]
|
||||||
cu.run_once()
|
cu.run_once()
|
||||||
err = event.wait()
|
err = event.wait()
|
||||||
|
if err:
|
||||||
|
raise err
|
||||||
|
self.assert_(os.path.exists(op_path))
|
||||||
|
event = spawn(accept, [201])
|
||||||
|
cu.run_once()
|
||||||
|
err = event.wait()
|
||||||
if err:
|
if err:
|
||||||
raise err
|
raise err
|
||||||
self.assert_(not os.path.exists(op_path))
|
self.assert_(not os.path.exists(op_path))
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
|||||||
Reference in New Issue
Block a user