From 196a73192f698f0151b85e4135bac19b47e52da2 Mon Sep 17 00:00:00 2001 From: Alex Pecoraro Date: Fri, 21 Feb 2014 10:55:08 -0800 Subject: [PATCH] Multithread optimization for object updater This change makes the object updater work the same as the container updater, which is that it now spawns one thread for each node that it needs to update. This should help optimize its performance/efficiency. Change-Id: If740b8c67c0d953e4b510913bd8594570d868da6 --- swift/obj/updater.py | 40 +++++++++++++++++++++------------------- 1 file changed, 21 insertions(+), 19 deletions(-) diff --git a/swift/obj/updater.py b/swift/obj/updater.py index 62ab9a39b9..b7e9d3e03b 100644 --- a/swift/obj/updater.py +++ b/swift/obj/updater.py @@ -21,7 +21,7 @@ import time from swift import gettext_ as _ from random import random -from eventlet import patcher, Timeout +from eventlet import spawn, patcher, Timeout from swift.common.bufferedhttp import http_connect from swift.common.exceptions import ConnectionTimeout @@ -216,20 +216,22 @@ class ObjectUpdater(Daemon): update['account'], update['container']) obj = '/%s/%s/%s' % \ (update['account'], update['container'], update['obj']) + headers_out = update['headers'].copy() + headers_out['user-agent'] = 'object-updater %s' % os.getpid() + headers_out.setdefault('X-Backend-Storage-Policy-Index', + str(policy_idx)) + events = [spawn(self.object_update, + node, part, update['op'], obj, headers_out) + for node in nodes if node['id'] not in successes] success = True new_successes = False - for node in nodes: - if node['id'] not in successes: - headers = update['headers'].copy() - headers.setdefault('X-Backend-Storage-Policy-Index', - str(policy_idx)) - status = self.object_update(node, part, update['op'], obj, - headers) - if not is_success(status) and status != HTTP_NOT_FOUND: - success = False - else: - successes.append(node['id']) - new_successes = True + for event in events: + event_success, node_id = event.wait() + if event_success is True: + successes.append(node_id) + new_successes = True + else: + success = False if success: self.successes += 1 self.logger.increment('successes') @@ -247,7 +249,7 @@ class ObjectUpdater(Daemon): write_pickle(update, update_path, os.path.join( device, get_tmp_dir(policy_idx))) - def object_update(self, node, part, op, obj, headers): + def object_update(self, node, part, op, obj, headers_out): """ Perform the object update to the container @@ -255,10 +257,8 @@ class ObjectUpdater(Daemon): :param part: partition that holds the container :param op: operation performed (ex: 'POST' or 'DELETE') :param obj: object name being updated - :param headers: headers to send with the update + :param headers_out: headers to send with the update """ - headers_out = headers.copy() - headers_out['user-agent'] = 'object-updater %s' % os.getpid() try: with ConnectionTimeout(self.conn_timeout): conn = http_connect(node['ip'], node['port'], node['device'], @@ -266,8 +266,10 @@ class ObjectUpdater(Daemon): with Timeout(self.node_timeout): resp = conn.getresponse() resp.read() - return resp.status + success = (is_success(resp.status) or + resp.status == HTTP_NOT_FOUND) + return (success, node['id']) except (Exception, Timeout): self.logger.exception(_('ERROR with remote server ' '%(ip)s:%(port)s/%(device)s'), node) - return HTTP_INTERNAL_SERVER_ERROR + return HTTP_INTERNAL_SERVER_ERROR, node['id']