Merge "move replication code to ReplicatedObjectController"
This commit is contained in:
commit
5d2103165c
@ -348,67 +348,6 @@ class BaseObjectController(Controller):
|
||||
|
||||
return headers
|
||||
|
||||
def _send_file(self, conn, path):
|
||||
"""Method for a file PUT coro"""
|
||||
while True:
|
||||
chunk = conn.queue.get()
|
||||
if not conn.failed:
|
||||
try:
|
||||
with ChunkWriteTimeout(self.app.node_timeout):
|
||||
conn.send(chunk)
|
||||
except (Exception, ChunkWriteTimeout):
|
||||
conn.failed = True
|
||||
self.app.exception_occurred(
|
||||
conn.node, _('Object'),
|
||||
_('Trying to write to %s') % path)
|
||||
conn.queue.task_done()
|
||||
|
||||
def _connect_put_node(self, nodes, part, path, headers,
|
||||
logger_thread_locals):
|
||||
"""
|
||||
Make a connection for a replicated object.
|
||||
|
||||
Connects to the first working node that it finds in node_iter
|
||||
and sends over the request headers. Returns an HTTPConnection
|
||||
object to handle the rest of the streaming.
|
||||
"""
|
||||
self.app.logger.thread_locals = logger_thread_locals
|
||||
for node in nodes:
|
||||
try:
|
||||
start_time = time.time()
|
||||
with ConnectionTimeout(self.app.conn_timeout):
|
||||
conn = http_connect(
|
||||
node['ip'], node['port'], node['device'], part, 'PUT',
|
||||
path, headers)
|
||||
self.app.set_node_timing(node, time.time() - start_time)
|
||||
with Timeout(self.app.node_timeout):
|
||||
resp = conn.getexpect()
|
||||
if resp.status == HTTP_CONTINUE:
|
||||
conn.resp = None
|
||||
conn.node = node
|
||||
return conn
|
||||
elif is_success(resp.status) or resp.status == HTTP_CONFLICT:
|
||||
conn.resp = resp
|
||||
conn.node = node
|
||||
return conn
|
||||
elif headers['If-None-Match'] is not None and \
|
||||
resp.status == HTTP_PRECONDITION_FAILED:
|
||||
conn.resp = resp
|
||||
conn.node = node
|
||||
return conn
|
||||
elif resp.status == HTTP_INSUFFICIENT_STORAGE:
|
||||
self.app.error_limit(node, _('ERROR Insufficient Storage'))
|
||||
elif is_server_error(resp.status):
|
||||
self.app.error_occurred(
|
||||
node,
|
||||
_('ERROR %(status)d Expect: 100-continue '
|
||||
'From Object Server') % {
|
||||
'status': resp.status})
|
||||
except (Exception, Timeout):
|
||||
self.app.exception_occurred(
|
||||
node, _('Object'),
|
||||
_('Expect: 100-continue on %s') % path)
|
||||
|
||||
def _await_response(self, conn, **kwargs):
|
||||
with Timeout(self.app.node_timeout):
|
||||
if conn.resp:
|
||||
@ -732,6 +671,28 @@ class BaseObjectController(Controller):
|
||||
|
||||
self._check_min_conn(req, conns, min_conns)
|
||||
|
||||
def _connect_put_node(self, nodes, part, path, headers,
|
||||
logger_thread_locals):
|
||||
"""
|
||||
Make connection to storage nodes
|
||||
|
||||
Connects to the first working node that it finds in nodes iter
|
||||
and sends over the request headers. Returns an HTTPConnection
|
||||
object to handle the rest of the streaming.
|
||||
|
||||
This method must be implemented by each policy ObjectController.
|
||||
|
||||
:param nodes: an iterator of the target storage nodes
|
||||
:param partition: ring partition number
|
||||
:param path: the object path to send to the storage node
|
||||
:param headers: request headers
|
||||
:param logger_thread_locals: The thread local values to be set on the
|
||||
self.app.logger to retain transaction
|
||||
logging information.
|
||||
:return: HTTPConnection object
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def _get_put_connections(self, req, nodes, partition, outgoing_headers,
|
||||
policy, expect):
|
||||
"""
|
||||
@ -762,120 +723,23 @@ class BaseObjectController(Controller):
|
||||
{'conns': len(conns), 'nodes': min_conns})
|
||||
raise HTTPServiceUnavailable(request=req)
|
||||
|
||||
def _transfer_data(self, req, data_source, conns, nodes):
|
||||
"""
|
||||
Transfer data for a replicated object.
|
||||
|
||||
This method was added in the PUT method extraction change
|
||||
"""
|
||||
min_conns = quorum_size(len(nodes))
|
||||
bytes_transferred = 0
|
||||
try:
|
||||
with ContextPool(len(nodes)) as pool:
|
||||
for conn in conns:
|
||||
conn.failed = False
|
||||
conn.queue = Queue(self.app.put_queue_depth)
|
||||
pool.spawn(self._send_file, conn, req.path)
|
||||
while True:
|
||||
with ChunkReadTimeout(self.app.client_timeout):
|
||||
try:
|
||||
chunk = next(data_source)
|
||||
except StopIteration:
|
||||
if req.is_chunked:
|
||||
for conn in conns:
|
||||
conn.queue.put('0\r\n\r\n')
|
||||
break
|
||||
bytes_transferred += len(chunk)
|
||||
if bytes_transferred > constraints.MAX_FILE_SIZE:
|
||||
raise HTTPRequestEntityTooLarge(request=req)
|
||||
for conn in list(conns):
|
||||
if not conn.failed:
|
||||
conn.queue.put(
|
||||
'%x\r\n%s\r\n' % (len(chunk), chunk)
|
||||
if req.is_chunked else chunk)
|
||||
else:
|
||||
conn.close()
|
||||
conns.remove(conn)
|
||||
self._check_min_conn(
|
||||
req, conns, min_conns,
|
||||
msg='Object PUT exceptions during'
|
||||
' send, %(conns)s/%(nodes)s required connections')
|
||||
for conn in conns:
|
||||
if conn.queue.unfinished_tasks:
|
||||
conn.queue.join()
|
||||
conns = [conn for conn in conns if not conn.failed]
|
||||
self._check_min_conn(
|
||||
req, conns, min_conns,
|
||||
msg='Object PUT exceptions after last send, '
|
||||
'%(conns)s/%(nodes)s required connections')
|
||||
except ChunkReadTimeout as err:
|
||||
self.app.logger.warn(
|
||||
_('ERROR Client read timeout (%ss)'), err.seconds)
|
||||
self.app.logger.increment('client_timeouts')
|
||||
raise HTTPRequestTimeout(request=req)
|
||||
except HTTPException:
|
||||
raise
|
||||
except (Exception, Timeout):
|
||||
self.app.logger.exception(
|
||||
_('ERROR Exception causing client disconnect'))
|
||||
raise HTTPClientDisconnect(request=req)
|
||||
if req.content_length and bytes_transferred < req.content_length:
|
||||
req.client_disconnect = True
|
||||
self.app.logger.warn(
|
||||
_('Client disconnected without sending enough data'))
|
||||
self.app.logger.increment('client_disconnects')
|
||||
raise HTTPClientDisconnect(request=req)
|
||||
|
||||
def _store_object(self, req, data_source, nodes, partition,
|
||||
outgoing_headers):
|
||||
"""
|
||||
Store a replicated object.
|
||||
|
||||
This method is responsible for establishing connection
|
||||
with storage nodes and sending object to each one of those
|
||||
nodes. After sending the data, the "best" response will be
|
||||
returned based on statuses from all connections
|
||||
with storage nodes and sending the data to each one of those
|
||||
nodes. The process of transfering data is specific to each
|
||||
Storage Policy, thus it is required for each policy specific
|
||||
ObjectController to provide their own implementation of this method.
|
||||
|
||||
:param req: the PUT Request
|
||||
:param data_source: an iterator of the source of the data
|
||||
:param nodes: an iterator of the target storage nodes
|
||||
:param partition: ring partition number
|
||||
:param outgoing_headers: system headers to storage nodes
|
||||
:return: Response object
|
||||
"""
|
||||
policy_index = req.headers.get('X-Backend-Storage-Policy-Index')
|
||||
policy = POLICIES.get_by_index(policy_index)
|
||||
if not nodes:
|
||||
return HTTPNotFound()
|
||||
|
||||
# RFC2616:8.2.3 disallows 100-continue without a body
|
||||
if (req.content_length > 0) or req.is_chunked:
|
||||
expect = True
|
||||
else:
|
||||
expect = False
|
||||
conns = self._get_put_connections(req, nodes, partition,
|
||||
outgoing_headers, policy, expect)
|
||||
min_conns = quorum_size(len(nodes))
|
||||
try:
|
||||
# check that a minimum number of connections were established and
|
||||
# meet all the correct conditions set in the request
|
||||
self._check_failure_put_connections(conns, req, nodes, min_conns)
|
||||
|
||||
# transfer data
|
||||
self._transfer_data(req, data_source, conns, nodes)
|
||||
|
||||
# get responses
|
||||
statuses, reasons, bodies, etags = self._get_put_responses(
|
||||
req, conns, nodes)
|
||||
except HTTPException as resp:
|
||||
return resp
|
||||
finally:
|
||||
for conn in conns:
|
||||
conn.close()
|
||||
|
||||
if len(etags) > 1:
|
||||
self.app.logger.error(
|
||||
_('Object servers returned %s mismatched etags'), len(etags))
|
||||
return HTTPServerError(request=req)
|
||||
etag = etags.pop() if len(etags) else None
|
||||
resp = self.best_response(req, statuses, reasons, bodies,
|
||||
_('Object PUT'), etag=etag)
|
||||
resp.last_modified = math.ceil(
|
||||
float(Timestamp(req.headers['X-Timestamp'])))
|
||||
return resp
|
||||
raise NotImplementedError()
|
||||
|
||||
@public
|
||||
@cors_validation
|
||||
@ -1133,6 +997,182 @@ class ReplicatedObjectController(BaseObjectController):
|
||||
req.swift_entity_path)
|
||||
return resp
|
||||
|
||||
def _connect_put_node(self, nodes, part, path, headers,
|
||||
logger_thread_locals):
|
||||
"""
|
||||
Make a connection for a replicated object.
|
||||
|
||||
Connects to the first working node that it finds in node_iter
|
||||
and sends over the request headers. Returns an HTTPConnection
|
||||
object to handle the rest of the streaming.
|
||||
"""
|
||||
self.app.logger.thread_locals = logger_thread_locals
|
||||
for node in nodes:
|
||||
try:
|
||||
start_time = time.time()
|
||||
with ConnectionTimeout(self.app.conn_timeout):
|
||||
conn = http_connect(
|
||||
node['ip'], node['port'], node['device'], part, 'PUT',
|
||||
path, headers)
|
||||
self.app.set_node_timing(node, time.time() - start_time)
|
||||
with Timeout(self.app.node_timeout):
|
||||
resp = conn.getexpect()
|
||||
if resp.status == HTTP_CONTINUE:
|
||||
conn.resp = None
|
||||
conn.node = node
|
||||
return conn
|
||||
elif is_success(resp.status) or resp.status == HTTP_CONFLICT:
|
||||
conn.resp = resp
|
||||
conn.node = node
|
||||
return conn
|
||||
elif headers['If-None-Match'] is not None and \
|
||||
resp.status == HTTP_PRECONDITION_FAILED:
|
||||
conn.resp = resp
|
||||
conn.node = node
|
||||
return conn
|
||||
elif resp.status == HTTP_INSUFFICIENT_STORAGE:
|
||||
self.app.error_limit(node, _('ERROR Insufficient Storage'))
|
||||
elif is_server_error(resp.status):
|
||||
self.app.error_occurred(
|
||||
node,
|
||||
_('ERROR %(status)d Expect: 100-continue '
|
||||
'From Object Server') % {
|
||||
'status': resp.status})
|
||||
except (Exception, Timeout):
|
||||
self.app.exception_occurred(
|
||||
node, _('Object'),
|
||||
_('Expect: 100-continue on %s') % path)
|
||||
|
||||
def _send_file(self, conn, path):
|
||||
"""Method for a file PUT coro"""
|
||||
while True:
|
||||
chunk = conn.queue.get()
|
||||
if not conn.failed:
|
||||
try:
|
||||
with ChunkWriteTimeout(self.app.node_timeout):
|
||||
conn.send(chunk)
|
||||
except (Exception, ChunkWriteTimeout):
|
||||
conn.failed = True
|
||||
self.app.exception_occurred(
|
||||
conn.node, _('Object'),
|
||||
_('Trying to write to %s') % path)
|
||||
conn.queue.task_done()
|
||||
|
||||
def _transfer_data(self, req, data_source, conns, nodes):
|
||||
"""
|
||||
Transfer data for a replicated object.
|
||||
|
||||
This method was added in the PUT method extraction change
|
||||
"""
|
||||
min_conns = quorum_size(len(nodes))
|
||||
bytes_transferred = 0
|
||||
try:
|
||||
with ContextPool(len(nodes)) as pool:
|
||||
for conn in conns:
|
||||
conn.failed = False
|
||||
conn.queue = Queue(self.app.put_queue_depth)
|
||||
pool.spawn(self._send_file, conn, req.path)
|
||||
while True:
|
||||
with ChunkReadTimeout(self.app.client_timeout):
|
||||
try:
|
||||
chunk = next(data_source)
|
||||
except StopIteration:
|
||||
if req.is_chunked:
|
||||
for conn in conns:
|
||||
conn.queue.put('0\r\n\r\n')
|
||||
break
|
||||
bytes_transferred += len(chunk)
|
||||
if bytes_transferred > constraints.MAX_FILE_SIZE:
|
||||
raise HTTPRequestEntityTooLarge(request=req)
|
||||
for conn in list(conns):
|
||||
if not conn.failed:
|
||||
conn.queue.put(
|
||||
'%x\r\n%s\r\n' % (len(chunk), chunk)
|
||||
if req.is_chunked else chunk)
|
||||
else:
|
||||
conn.close()
|
||||
conns.remove(conn)
|
||||
self._check_min_conn(
|
||||
req, conns, min_conns,
|
||||
msg='Object PUT exceptions during'
|
||||
' send, %(conns)s/%(nodes)s required connections')
|
||||
for conn in conns:
|
||||
if conn.queue.unfinished_tasks:
|
||||
conn.queue.join()
|
||||
conns = [conn for conn in conns if not conn.failed]
|
||||
self._check_min_conn(
|
||||
req, conns, min_conns,
|
||||
msg='Object PUT exceptions after last send, '
|
||||
'%(conns)s/%(nodes)s required connections')
|
||||
except ChunkReadTimeout as err:
|
||||
self.app.logger.warn(
|
||||
_('ERROR Client read timeout (%ss)'), err.seconds)
|
||||
self.app.logger.increment('client_timeouts')
|
||||
raise HTTPRequestTimeout(request=req)
|
||||
except HTTPException:
|
||||
raise
|
||||
except (Exception, Timeout):
|
||||
self.app.logger.exception(
|
||||
_('ERROR Exception causing client disconnect'))
|
||||
raise HTTPClientDisconnect(request=req)
|
||||
if req.content_length and bytes_transferred < req.content_length:
|
||||
req.client_disconnect = True
|
||||
self.app.logger.warn(
|
||||
_('Client disconnected without sending enough data'))
|
||||
self.app.logger.increment('client_disconnects')
|
||||
raise HTTPClientDisconnect(request=req)
|
||||
|
||||
def _store_object(self, req, data_source, nodes, partition,
|
||||
outgoing_headers):
|
||||
"""
|
||||
Store a replicated object.
|
||||
|
||||
This method is responsible for establishing connection
|
||||
with storage nodes and sending object to each one of those
|
||||
nodes. After sending the data, the "best" response will be
|
||||
returned based on statuses from all connections
|
||||
"""
|
||||
policy_index = req.headers.get('X-Backend-Storage-Policy-Index')
|
||||
policy = POLICIES.get_by_index(policy_index)
|
||||
if not nodes:
|
||||
return HTTPNotFound()
|
||||
|
||||
# RFC2616:8.2.3 disallows 100-continue without a body
|
||||
if (req.content_length > 0) or req.is_chunked:
|
||||
expect = True
|
||||
else:
|
||||
expect = False
|
||||
conns = self._get_put_connections(req, nodes, partition,
|
||||
outgoing_headers, policy, expect)
|
||||
min_conns = quorum_size(len(nodes))
|
||||
try:
|
||||
# check that a minimum number of connections were established and
|
||||
# meet all the correct conditions set in the request
|
||||
self._check_failure_put_connections(conns, req, nodes, min_conns)
|
||||
|
||||
# transfer data
|
||||
self._transfer_data(req, data_source, conns, nodes)
|
||||
|
||||
# get responses
|
||||
statuses, reasons, bodies, etags = self._get_put_responses(
|
||||
req, conns, nodes)
|
||||
except HTTPException as resp:
|
||||
return resp
|
||||
finally:
|
||||
for conn in conns:
|
||||
conn.close()
|
||||
|
||||
if len(etags) > 1:
|
||||
self.app.logger.error(
|
||||
_('Object servers returned %s mismatched etags'), len(etags))
|
||||
return HTTPServerError(request=req)
|
||||
etag = etags.pop() if len(etags) else None
|
||||
resp = self.best_response(req, statuses, reasons, bodies,
|
||||
_('Object PUT'), etag=etag)
|
||||
resp.last_modified = math.ceil(
|
||||
float(Timestamp(req.headers['X-Timestamp'])))
|
||||
return resp
|
||||
|
||||
|
||||
class ECAppIter(object):
|
||||
"""
|
||||
|
Loading…
Reference in New Issue
Block a user