Multithread optimization for object updater
This change makes the object updater work the same as the container updater, which is that it now spawns one thread for each node that it needs to update. This should help optimize its performance/efficiency. Change-Id: If740b8c67c0d953e4b510913bd8594570d868da6
This commit is contained in:

committed by
John Dickinson

parent
9bc3b0ebd8
commit
196a73192f
@@ -21,7 +21,7 @@ import time
|
||||
from swift import gettext_ as _
|
||||
from random import random
|
||||
|
||||
from eventlet import patcher, Timeout
|
||||
from eventlet import spawn, patcher, Timeout
|
||||
|
||||
from swift.common.bufferedhttp import http_connect
|
||||
from swift.common.exceptions import ConnectionTimeout
|
||||
@@ -216,20 +216,22 @@ class ObjectUpdater(Daemon):
|
||||
update['account'], update['container'])
|
||||
obj = '/%s/%s/%s' % \
|
||||
(update['account'], update['container'], update['obj'])
|
||||
headers_out = update['headers'].copy()
|
||||
headers_out['user-agent'] = 'object-updater %s' % os.getpid()
|
||||
headers_out.setdefault('X-Backend-Storage-Policy-Index',
|
||||
str(policy_idx))
|
||||
events = [spawn(self.object_update,
|
||||
node, part, update['op'], obj, headers_out)
|
||||
for node in nodes if node['id'] not in successes]
|
||||
success = True
|
||||
new_successes = False
|
||||
for node in nodes:
|
||||
if node['id'] not in successes:
|
||||
headers = update['headers'].copy()
|
||||
headers.setdefault('X-Backend-Storage-Policy-Index',
|
||||
str(policy_idx))
|
||||
status = self.object_update(node, part, update['op'], obj,
|
||||
headers)
|
||||
if not is_success(status) and status != HTTP_NOT_FOUND:
|
||||
success = False
|
||||
else:
|
||||
successes.append(node['id'])
|
||||
new_successes = True
|
||||
for event in events:
|
||||
event_success, node_id = event.wait()
|
||||
if event_success is True:
|
||||
successes.append(node_id)
|
||||
new_successes = True
|
||||
else:
|
||||
success = False
|
||||
if success:
|
||||
self.successes += 1
|
||||
self.logger.increment('successes')
|
||||
@@ -247,7 +249,7 @@ class ObjectUpdater(Daemon):
|
||||
write_pickle(update, update_path, os.path.join(
|
||||
device, get_tmp_dir(policy_idx)))
|
||||
|
||||
def object_update(self, node, part, op, obj, headers):
|
||||
def object_update(self, node, part, op, obj, headers_out):
|
||||
"""
|
||||
Perform the object update to the container
|
||||
|
||||
@@ -255,10 +257,8 @@ class ObjectUpdater(Daemon):
|
||||
:param part: partition that holds the container
|
||||
:param op: operation performed (ex: 'POST' or 'DELETE')
|
||||
:param obj: object name being updated
|
||||
:param headers: headers to send with the update
|
||||
:param headers_out: headers to send with the update
|
||||
"""
|
||||
headers_out = headers.copy()
|
||||
headers_out['user-agent'] = 'object-updater %s' % os.getpid()
|
||||
try:
|
||||
with ConnectionTimeout(self.conn_timeout):
|
||||
conn = http_connect(node['ip'], node['port'], node['device'],
|
||||
@@ -266,8 +266,10 @@ class ObjectUpdater(Daemon):
|
||||
with Timeout(self.node_timeout):
|
||||
resp = conn.getresponse()
|
||||
resp.read()
|
||||
return resp.status
|
||||
success = (is_success(resp.status) or
|
||||
resp.status == HTTP_NOT_FOUND)
|
||||
return (success, node['id'])
|
||||
except (Exception, Timeout):
|
||||
self.logger.exception(_('ERROR with remote server '
|
||||
'%(ip)s:%(port)s/%(device)s'), node)
|
||||
return HTTP_INTERNAL_SERVER_ERROR
|
||||
return HTTP_INTERNAL_SERVER_ERROR, node['id']
|
||||
|
Reference in New Issue
Block a user