Merge "Multithread optimization for object updater"

This commit is contained in:
Jenkins
2014-08-23 15:26:36 +00:00
committed by Gerrit Code Review

View File

@@ -21,7 +21,7 @@ import time
from swift import gettext_ as _ from swift import gettext_ as _
from random import random from random import random
from eventlet import patcher, Timeout from eventlet import spawn, patcher, Timeout
from swift.common.bufferedhttp import http_connect from swift.common.bufferedhttp import http_connect
from swift.common.exceptions import ConnectionTimeout from swift.common.exceptions import ConnectionTimeout
@@ -216,20 +216,22 @@ class ObjectUpdater(Daemon):
update['account'], update['container']) update['account'], update['container'])
obj = '/%s/%s/%s' % \ obj = '/%s/%s/%s' % \
(update['account'], update['container'], update['obj']) (update['account'], update['container'], update['obj'])
headers_out = update['headers'].copy()
headers_out['user-agent'] = 'object-updater %s' % os.getpid()
headers_out.setdefault('X-Backend-Storage-Policy-Index',
str(policy_idx))
events = [spawn(self.object_update,
node, part, update['op'], obj, headers_out)
for node in nodes if node['id'] not in successes]
success = True success = True
new_successes = False new_successes = False
for node in nodes: for event in events:
if node['id'] not in successes: event_success, node_id = event.wait()
headers = update['headers'].copy() if event_success is True:
headers.setdefault('X-Backend-Storage-Policy-Index', successes.append(node_id)
str(policy_idx)) new_successes = True
status = self.object_update(node, part, update['op'], obj, else:
headers) success = False
if not is_success(status) and status != HTTP_NOT_FOUND:
success = False
else:
successes.append(node['id'])
new_successes = True
if success: if success:
self.successes += 1 self.successes += 1
self.logger.increment('successes') self.logger.increment('successes')
@@ -247,7 +249,7 @@ class ObjectUpdater(Daemon):
write_pickle(update, update_path, os.path.join( write_pickle(update, update_path, os.path.join(
device, get_tmp_dir(policy_idx))) device, get_tmp_dir(policy_idx)))
def object_update(self, node, part, op, obj, headers): def object_update(self, node, part, op, obj, headers_out):
""" """
Perform the object update to the container Perform the object update to the container
@@ -255,10 +257,8 @@ class ObjectUpdater(Daemon):
:param part: partition that holds the container :param part: partition that holds the container
:param op: operation performed (ex: 'POST' or 'DELETE') :param op: operation performed (ex: 'POST' or 'DELETE')
:param obj: object name being updated :param obj: object name being updated
:param headers: headers to send with the update :param headers_out: headers to send with the update
""" """
headers_out = headers.copy()
headers_out['user-agent'] = 'object-updater %s' % os.getpid()
try: try:
with ConnectionTimeout(self.conn_timeout): with ConnectionTimeout(self.conn_timeout):
conn = http_connect(node['ip'], node['port'], node['device'], conn = http_connect(node['ip'], node['port'], node['device'],
@@ -266,8 +266,10 @@ class ObjectUpdater(Daemon):
with Timeout(self.node_timeout): with Timeout(self.node_timeout):
resp = conn.getresponse() resp = conn.getresponse()
resp.read() resp.read()
return resp.status success = (is_success(resp.status) or
resp.status == HTTP_NOT_FOUND)
return (success, node['id'])
except (Exception, Timeout): except (Exception, Timeout):
self.logger.exception(_('ERROR with remote server ' self.logger.exception(_('ERROR with remote server '
'%(ip)s:%(port)s/%(device)s'), node) '%(ip)s:%(port)s/%(device)s'), node)
return HTTP_INTERNAL_SERVER_ERROR return HTTP_INTERNAL_SERVER_ERROR, node['id']