diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py index 609f21b5..65884d95 100644 --- a/swift/proxy/controllers/obj.py +++ b/swift/proxy/controllers/obj.py @@ -348,67 +348,6 @@ class BaseObjectController(Controller): return headers - def _send_file(self, conn, path): - """Method for a file PUT coro""" - while True: - chunk = conn.queue.get() - if not conn.failed: - try: - with ChunkWriteTimeout(self.app.node_timeout): - conn.send(chunk) - except (Exception, ChunkWriteTimeout): - conn.failed = True - self.app.exception_occurred( - conn.node, _('Object'), - _('Trying to write to %s') % path) - conn.queue.task_done() - - def _connect_put_node(self, nodes, part, path, headers, - logger_thread_locals): - """ - Make a connection for a replicated object. - - Connects to the first working node that it finds in node_iter - and sends over the request headers. Returns an HTTPConnection - object to handle the rest of the streaming. - """ - self.app.logger.thread_locals = logger_thread_locals - for node in nodes: - try: - start_time = time.time() - with ConnectionTimeout(self.app.conn_timeout): - conn = http_connect( - node['ip'], node['port'], node['device'], part, 'PUT', - path, headers) - self.app.set_node_timing(node, time.time() - start_time) - with Timeout(self.app.node_timeout): - resp = conn.getexpect() - if resp.status == HTTP_CONTINUE: - conn.resp = None - conn.node = node - return conn - elif is_success(resp.status) or resp.status == HTTP_CONFLICT: - conn.resp = resp - conn.node = node - return conn - elif headers['If-None-Match'] is not None and \ - resp.status == HTTP_PRECONDITION_FAILED: - conn.resp = resp - conn.node = node - return conn - elif resp.status == HTTP_INSUFFICIENT_STORAGE: - self.app.error_limit(node, _('ERROR Insufficient Storage')) - elif is_server_error(resp.status): - self.app.error_occurred( - node, - _('ERROR %(status)d Expect: 100-continue ' - 'From Object Server') % { - 'status': resp.status}) - except (Exception, Timeout): - self.app.exception_occurred( - node, _('Object'), - _('Expect: 100-continue on %s') % path) - def _await_response(self, conn, **kwargs): with Timeout(self.app.node_timeout): if conn.resp: @@ -732,6 +671,28 @@ class BaseObjectController(Controller): self._check_min_conn(req, conns, min_conns) + def _connect_put_node(self, nodes, part, path, headers, + logger_thread_locals): + """ + Make connection to storage nodes + + Connects to the first working node that it finds in nodes iter + and sends over the request headers. Returns an HTTPConnection + object to handle the rest of the streaming. + + This method must be implemented by each policy ObjectController. + + :param nodes: an iterator of the target storage nodes + :param partition: ring partition number + :param path: the object path to send to the storage node + :param headers: request headers + :param logger_thread_locals: The thread local values to be set on the + self.app.logger to retain transaction + logging information. + :return: HTTPConnection object + """ + raise NotImplementedError() + def _get_put_connections(self, req, nodes, partition, outgoing_headers, policy, expect): """ @@ -762,120 +723,23 @@ class BaseObjectController(Controller): {'conns': len(conns), 'nodes': min_conns}) raise HTTPServiceUnavailable(request=req) - def _transfer_data(self, req, data_source, conns, nodes): - """ - Transfer data for a replicated object. - - This method was added in the PUT method extraction change - """ - min_conns = quorum_size(len(nodes)) - bytes_transferred = 0 - try: - with ContextPool(len(nodes)) as pool: - for conn in conns: - conn.failed = False - conn.queue = Queue(self.app.put_queue_depth) - pool.spawn(self._send_file, conn, req.path) - while True: - with ChunkReadTimeout(self.app.client_timeout): - try: - chunk = next(data_source) - except StopIteration: - if req.is_chunked: - for conn in conns: - conn.queue.put('0\r\n\r\n') - break - bytes_transferred += len(chunk) - if bytes_transferred > constraints.MAX_FILE_SIZE: - raise HTTPRequestEntityTooLarge(request=req) - for conn in list(conns): - if not conn.failed: - conn.queue.put( - '%x\r\n%s\r\n' % (len(chunk), chunk) - if req.is_chunked else chunk) - else: - conn.close() - conns.remove(conn) - self._check_min_conn( - req, conns, min_conns, - msg='Object PUT exceptions during' - ' send, %(conns)s/%(nodes)s required connections') - for conn in conns: - if conn.queue.unfinished_tasks: - conn.queue.join() - conns = [conn for conn in conns if not conn.failed] - self._check_min_conn( - req, conns, min_conns, - msg='Object PUT exceptions after last send, ' - '%(conns)s/%(nodes)s required connections') - except ChunkReadTimeout as err: - self.app.logger.warn( - _('ERROR Client read timeout (%ss)'), err.seconds) - self.app.logger.increment('client_timeouts') - raise HTTPRequestTimeout(request=req) - except HTTPException: - raise - except (Exception, Timeout): - self.app.logger.exception( - _('ERROR Exception causing client disconnect')) - raise HTTPClientDisconnect(request=req) - if req.content_length and bytes_transferred < req.content_length: - req.client_disconnect = True - self.app.logger.warn( - _('Client disconnected without sending enough data')) - self.app.logger.increment('client_disconnects') - raise HTTPClientDisconnect(request=req) - def _store_object(self, req, data_source, nodes, partition, outgoing_headers): """ - Store a replicated object. - This method is responsible for establishing connection - with storage nodes and sending object to each one of those - nodes. After sending the data, the "best" response will be - returned based on statuses from all connections + with storage nodes and sending the data to each one of those + nodes. The process of transfering data is specific to each + Storage Policy, thus it is required for each policy specific + ObjectController to provide their own implementation of this method. + + :param req: the PUT Request + :param data_source: an iterator of the source of the data + :param nodes: an iterator of the target storage nodes + :param partition: ring partition number + :param outgoing_headers: system headers to storage nodes + :return: Response object """ - policy_index = req.headers.get('X-Backend-Storage-Policy-Index') - policy = POLICIES.get_by_index(policy_index) - if not nodes: - return HTTPNotFound() - - # RFC2616:8.2.3 disallows 100-continue without a body - if (req.content_length > 0) or req.is_chunked: - expect = True - else: - expect = False - conns = self._get_put_connections(req, nodes, partition, - outgoing_headers, policy, expect) - min_conns = quorum_size(len(nodes)) - try: - # check that a minimum number of connections were established and - # meet all the correct conditions set in the request - self._check_failure_put_connections(conns, req, nodes, min_conns) - - # transfer data - self._transfer_data(req, data_source, conns, nodes) - - # get responses - statuses, reasons, bodies, etags = self._get_put_responses( - req, conns, nodes) - except HTTPException as resp: - return resp - finally: - for conn in conns: - conn.close() - - if len(etags) > 1: - self.app.logger.error( - _('Object servers returned %s mismatched etags'), len(etags)) - return HTTPServerError(request=req) - etag = etags.pop() if len(etags) else None - resp = self.best_response(req, statuses, reasons, bodies, - _('Object PUT'), etag=etag) - resp.last_modified = math.ceil( - float(Timestamp(req.headers['X-Timestamp']))) - return resp + raise NotImplementedError() @public @cors_validation @@ -1133,6 +997,182 @@ class ReplicatedObjectController(BaseObjectController): req.swift_entity_path) return resp + def _connect_put_node(self, nodes, part, path, headers, + logger_thread_locals): + """ + Make a connection for a replicated object. + + Connects to the first working node that it finds in node_iter + and sends over the request headers. Returns an HTTPConnection + object to handle the rest of the streaming. + """ + self.app.logger.thread_locals = logger_thread_locals + for node in nodes: + try: + start_time = time.time() + with ConnectionTimeout(self.app.conn_timeout): + conn = http_connect( + node['ip'], node['port'], node['device'], part, 'PUT', + path, headers) + self.app.set_node_timing(node, time.time() - start_time) + with Timeout(self.app.node_timeout): + resp = conn.getexpect() + if resp.status == HTTP_CONTINUE: + conn.resp = None + conn.node = node + return conn + elif is_success(resp.status) or resp.status == HTTP_CONFLICT: + conn.resp = resp + conn.node = node + return conn + elif headers['If-None-Match'] is not None and \ + resp.status == HTTP_PRECONDITION_FAILED: + conn.resp = resp + conn.node = node + return conn + elif resp.status == HTTP_INSUFFICIENT_STORAGE: + self.app.error_limit(node, _('ERROR Insufficient Storage')) + elif is_server_error(resp.status): + self.app.error_occurred( + node, + _('ERROR %(status)d Expect: 100-continue ' + 'From Object Server') % { + 'status': resp.status}) + except (Exception, Timeout): + self.app.exception_occurred( + node, _('Object'), + _('Expect: 100-continue on %s') % path) + + def _send_file(self, conn, path): + """Method for a file PUT coro""" + while True: + chunk = conn.queue.get() + if not conn.failed: + try: + with ChunkWriteTimeout(self.app.node_timeout): + conn.send(chunk) + except (Exception, ChunkWriteTimeout): + conn.failed = True + self.app.exception_occurred( + conn.node, _('Object'), + _('Trying to write to %s') % path) + conn.queue.task_done() + + def _transfer_data(self, req, data_source, conns, nodes): + """ + Transfer data for a replicated object. + + This method was added in the PUT method extraction change + """ + min_conns = quorum_size(len(nodes)) + bytes_transferred = 0 + try: + with ContextPool(len(nodes)) as pool: + for conn in conns: + conn.failed = False + conn.queue = Queue(self.app.put_queue_depth) + pool.spawn(self._send_file, conn, req.path) + while True: + with ChunkReadTimeout(self.app.client_timeout): + try: + chunk = next(data_source) + except StopIteration: + if req.is_chunked: + for conn in conns: + conn.queue.put('0\r\n\r\n') + break + bytes_transferred += len(chunk) + if bytes_transferred > constraints.MAX_FILE_SIZE: + raise HTTPRequestEntityTooLarge(request=req) + for conn in list(conns): + if not conn.failed: + conn.queue.put( + '%x\r\n%s\r\n' % (len(chunk), chunk) + if req.is_chunked else chunk) + else: + conn.close() + conns.remove(conn) + self._check_min_conn( + req, conns, min_conns, + msg='Object PUT exceptions during' + ' send, %(conns)s/%(nodes)s required connections') + for conn in conns: + if conn.queue.unfinished_tasks: + conn.queue.join() + conns = [conn for conn in conns if not conn.failed] + self._check_min_conn( + req, conns, min_conns, + msg='Object PUT exceptions after last send, ' + '%(conns)s/%(nodes)s required connections') + except ChunkReadTimeout as err: + self.app.logger.warn( + _('ERROR Client read timeout (%ss)'), err.seconds) + self.app.logger.increment('client_timeouts') + raise HTTPRequestTimeout(request=req) + except HTTPException: + raise + except (Exception, Timeout): + self.app.logger.exception( + _('ERROR Exception causing client disconnect')) + raise HTTPClientDisconnect(request=req) + if req.content_length and bytes_transferred < req.content_length: + req.client_disconnect = True + self.app.logger.warn( + _('Client disconnected without sending enough data')) + self.app.logger.increment('client_disconnects') + raise HTTPClientDisconnect(request=req) + + def _store_object(self, req, data_source, nodes, partition, + outgoing_headers): + """ + Store a replicated object. + + This method is responsible for establishing connection + with storage nodes and sending object to each one of those + nodes. After sending the data, the "best" response will be + returned based on statuses from all connections + """ + policy_index = req.headers.get('X-Backend-Storage-Policy-Index') + policy = POLICIES.get_by_index(policy_index) + if not nodes: + return HTTPNotFound() + + # RFC2616:8.2.3 disallows 100-continue without a body + if (req.content_length > 0) or req.is_chunked: + expect = True + else: + expect = False + conns = self._get_put_connections(req, nodes, partition, + outgoing_headers, policy, expect) + min_conns = quorum_size(len(nodes)) + try: + # check that a minimum number of connections were established and + # meet all the correct conditions set in the request + self._check_failure_put_connections(conns, req, nodes, min_conns) + + # transfer data + self._transfer_data(req, data_source, conns, nodes) + + # get responses + statuses, reasons, bodies, etags = self._get_put_responses( + req, conns, nodes) + except HTTPException as resp: + return resp + finally: + for conn in conns: + conn.close() + + if len(etags) > 1: + self.app.logger.error( + _('Object servers returned %s mismatched etags'), len(etags)) + return HTTPServerError(request=req) + etag = etags.pop() if len(etags) else None + resp = self.best_response(req, statuses, reasons, bodies, + _('Object PUT'), etag=etag) + resp.last_modified = math.ceil( + float(Timestamp(req.headers['X-Timestamp']))) + return resp + class ECAppIter(object): """