diff --git a/swift/obj/updater.py b/swift/obj/updater.py index 62ab9a39b9..b7e9d3e03b 100644 --- a/swift/obj/updater.py +++ b/swift/obj/updater.py @@ -21,7 +21,7 @@ import time from swift import gettext_ as _ from random import random -from eventlet import patcher, Timeout +from eventlet import spawn, patcher, Timeout from swift.common.bufferedhttp import http_connect from swift.common.exceptions import ConnectionTimeout @@ -216,20 +216,22 @@ class ObjectUpdater(Daemon): update['account'], update['container']) obj = '/%s/%s/%s' % \ (update['account'], update['container'], update['obj']) + headers_out = update['headers'].copy() + headers_out['user-agent'] = 'object-updater %s' % os.getpid() + headers_out.setdefault('X-Backend-Storage-Policy-Index', + str(policy_idx)) + events = [spawn(self.object_update, + node, part, update['op'], obj, headers_out) + for node in nodes if node['id'] not in successes] success = True new_successes = False - for node in nodes: - if node['id'] not in successes: - headers = update['headers'].copy() - headers.setdefault('X-Backend-Storage-Policy-Index', - str(policy_idx)) - status = self.object_update(node, part, update['op'], obj, - headers) - if not is_success(status) and status != HTTP_NOT_FOUND: - success = False - else: - successes.append(node['id']) - new_successes = True + for event in events: + event_success, node_id = event.wait() + if event_success is True: + successes.append(node_id) + new_successes = True + else: + success = False if success: self.successes += 1 self.logger.increment('successes') @@ -247,7 +249,7 @@ class ObjectUpdater(Daemon): write_pickle(update, update_path, os.path.join( device, get_tmp_dir(policy_idx))) - def object_update(self, node, part, op, obj, headers): + def object_update(self, node, part, op, obj, headers_out): """ Perform the object update to the container @@ -255,10 +257,8 @@ class ObjectUpdater(Daemon): :param part: partition that holds the container :param op: operation performed (ex: 'POST' or 'DELETE') :param obj: object name being updated - :param headers: headers to send with the update + :param headers_out: headers to send with the update """ - headers_out = headers.copy() - headers_out['user-agent'] = 'object-updater %s' % os.getpid() try: with ConnectionTimeout(self.conn_timeout): conn = http_connect(node['ip'], node['port'], node['device'], @@ -266,8 +266,10 @@ class ObjectUpdater(Daemon): with Timeout(self.node_timeout): resp = conn.getresponse() resp.read() - return resp.status + success = (is_success(resp.status) or + resp.status == HTTP_NOT_FOUND) + return (success, node['id']) except (Exception, Timeout): self.logger.exception(_('ERROR with remote server ' '%(ip)s:%(port)s/%(device)s'), node) - return HTTP_INTERNAL_SERVER_ERROR + return HTTP_INTERNAL_SERVER_ERROR, node['id']