Merge "proxy: stop sending chunks to objects with a Queue"

This commit is contained in:
Zuul
2019-11-14 10:28:54 +00:00
committed by Gerrit Code Review
4 changed files with 151 additions and 178 deletions
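
For orientation, this merge removes the per-connection sender greenthreads: instead of pushing each chunk onto a bounded eventlet Queue drained by one greenthread per object-server connection, the proxy now writes each chunk to every connection inline, bounded by a per-write timeout. A rough sketch of the two patterns follows; the names are simplified stand-ins, not Swift's actual Putter API, and a socket-like conn with a send() method is assumed:

# Sketch only: simplified stand-ins for the old and new send paths.
import eventlet
from eventlet.queue import Queue

class QueuedSender(object):
    """Old pattern: a greenthread per connection drains a bounded queue."""
    def __init__(self, conn, queue_depth=10):
        self.conn = conn
        self.failed = False
        self.queue = Queue(queue_depth)   # back-pressure via bounded depth
        eventlet.spawn(self._send_file)   # one sender greenthread per node

    def _send_file(self):
        while True:
            chunk = self.queue.get()
            if not self.failed:
                try:
                    self.conn.send(chunk)
                except Exception:
                    self.failed = True    # swallow; caller checks .failed
            self.queue.task_done()

class DirectSender(object):
    """New pattern: the transfer loop writes each chunk itself."""
    def __init__(self, conn, write_timeout):
        self.conn = conn
        self.failed = False
        self.write_timeout = write_timeout

    def send_chunk(self, chunk):
        if not self.failed:
            try:
                # eventlet.Timeout is not an Exception subclass, so it
                # must be caught explicitly (as the real code does with
                # ChunkWriteTimeout).
                with eventlet.Timeout(self.write_timeout):
                    self.conn.send(chunk)
            except (Exception, eventlet.Timeout):
                self.failed = True        # later sends become no-ops

The queued version needs a drain step (queue.join()) before results can be checked, which is why the old controllers called putter.wait() inside a ContextPool; the direct version needs neither, and the diffs below delete both.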

View File: doc/manpages/proxy-server.conf.5

@@ -1061,8 +1061,6 @@ recheck_account_existence before the 403s kick in.
 This is a comma separated list of account hashes that ignore the max_containers_per_account cap.
 .IP \fBdeny_host_headers\fR
 Comma separated list of Host headers to which the proxy will deny requests. The default is empty.
-.IP \fBput_queue_depth\fR
-Depth of the proxy put queue. The default is 10.
 .IP \fBsorting_method\fR
 Storage nodes can be chosen at random (shuffle - default), by using timing
 measurements (timing), or by using an explicit match (affinity).

View File: etc/proxy-server.conf-sample

@@ -188,9 +188,6 @@ use = egg:swift#proxy
 # Prefix used when automatically creating accounts.
 # auto_create_account_prefix = .
 #
-# Depth of the proxy put queue.
-# put_queue_depth = 10
-#
 # During GET and HEAD requests, storage nodes can be chosen at random
 # (shuffle), by using timing measurements (timing), or by using an explicit
 # region/zone match (affinity). Using timing measurements may allow for lower

View File: swift/proxy/controllers/obj.py

@@ -876,6 +876,8 @@ class ReplicatedObjectController(BaseObjectController):
                 node, part, req.swift_entity_path, headers,
                 conn_timeout=self.app.conn_timeout,
                 node_timeout=self.app.node_timeout,
+                write_timeout=self.app.node_timeout,
+                send_exception_handler=self.app.exception_occurred,
                 logger=self.app.logger,
                 need_multiphase=False)
         else:
@@ -884,6 +886,8 @@ class ReplicatedObjectController(BaseObjectController):
                 node, part, req.swift_entity_path, headers,
                 conn_timeout=self.app.conn_timeout,
                 node_timeout=self.app.node_timeout,
+                write_timeout=self.app.node_timeout,
+                send_exception_handler=self.app.exception_occurred,
                 logger=self.app.logger,
                 chunked=te.endswith(',chunked'))
         return putter
@@ -910,42 +914,35 @@ class ReplicatedObjectController(BaseObjectController):
         min_conns = quorum_size(len(nodes))
         try:
-            with ContextPool(len(nodes)) as pool:
-                for putter in putters:
-                    putter.spawn_sender_greenthread(
-                        pool, self.app.put_queue_depth, self.app.node_timeout,
-                        self.app.exception_occurred)
-                while True:
-                    with ChunkReadTimeout(self.app.client_timeout):
-                        try:
-                            chunk = next(data_source)
-                        except StopIteration:
-                            break
-                    bytes_transferred += len(chunk)
-                    if bytes_transferred > constraints.MAX_FILE_SIZE:
-                        raise HTTPRequestEntityTooLarge(request=req)
-                    send_chunk(chunk)
+            while True:
+                with ChunkReadTimeout(self.app.client_timeout):
+                    try:
+                        chunk = next(data_source)
+                    except StopIteration:
+                        break
+                bytes_transferred += len(chunk)
+                if bytes_transferred > constraints.MAX_FILE_SIZE:
+                    raise HTTPRequestEntityTooLarge(request=req)
+                send_chunk(chunk)
 
-                ml = req.message_length()
-                if ml and bytes_transferred < ml:
-                    req.client_disconnect = True
-                    self.app.logger.warning(
-                        _('Client disconnected without sending enough data'))
-                    self.app.logger.increment('client_disconnects')
-                    raise HTTPClientDisconnect(request=req)
+            ml = req.message_length()
+            if ml and bytes_transferred < ml:
+                req.client_disconnect = True
+                self.app.logger.warning(
+                    _('Client disconnected without sending enough data'))
+                self.app.logger.increment('client_disconnects')
+                raise HTTPClientDisconnect(request=req)
 
-                trail_md = self._get_footers(req)
-                for putter in putters:
-                    # send any footers set by middleware
-                    putter.end_of_object_data(footer_metadata=trail_md)
+            trail_md = self._get_footers(req)
+            for putter in putters:
+                # send any footers set by middleware
+                putter.end_of_object_data(footer_metadata=trail_md)
 
-                for putter in putters:
-                    putter.wait()
-                self._check_min_conn(
-                    req, [p for p in putters if not p.failed], min_conns,
-                    msg=_('Object PUT exceptions after last send, '
-                          '%(conns)s/%(nodes)s required connections'))
+            self._check_min_conn(
+                req, [p for p in putters if not p.failed], min_conns,
+                msg=_('Object PUT exceptions after last send, '
+                      '%(conns)s/%(nodes)s required connections'))
         except ChunkReadTimeout as err:
             self.app.logger.warning(
                 _('ERROR Client read timeout (%ss)'), err.seconds)
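
With sends now inline, a backend write failure in the loop above just flips that putter's failed flag; the transfer keeps feeding the survivors and the final call counts them. A minimal sketch of that survivor check, assuming each putter carries a .failed flag as in the diff; check_min_conn and the exception are simplified stand-ins for the controller's _check_min_conn:

# Sketch: count non-failed putters against the required quorum.
class NotEnoughConnections(Exception):
    pass

def check_min_conn(putters, min_conns):
    alive = [p for p in putters if not p.failed]
    if len(alive) < min_conns:
        raise NotEnoughConnections(
            'Object PUT exceptions after last send, '
            '%d/%d required connections' % (len(alive), min_conns))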
@@ -1576,10 +1573,14 @@ class Putter(object):
     :param resp: an HTTPResponse instance if connect() received final response
     :param path: the object path to send to the storage node
     :param connect_duration: time taken to initiate the HTTPConnection
+    :param write_timeout: time limit to write a chunk to the connection socket
+    :param send_exception_handler: callback called when an exception occurred
+        writing to the connection socket
     :param logger: a Logger instance
     :param chunked: boolean indicating if the request encoding is chunked
     """
-    def __init__(self, conn, node, resp, path, connect_duration, logger,
+    def __init__(self, conn, node, resp, path, connect_duration,
+                 write_timeout, send_exception_handler, logger,
                  chunked=False):
         # Note: you probably want to call Putter.connect() instead of
         # instantiating one of these directly.
@@ -1588,11 +1589,12 @@ class Putter(object):
         self.resp = self.final_resp = resp
         self.path = path
         self.connect_duration = connect_duration
+        self.write_timeout = write_timeout
+        self.send_exception_handler = send_exception_handler
         # for handoff nodes node_index is None
         self.node_index = node.get('index')
         self.failed = False
-        self.queue = None
         self.state = NO_DATA_SENT
         self.chunked = chunked
         self.logger = logger
@@ -1624,16 +1626,6 @@ class Putter(object):
             self.resp = self.conn.getresponse()
         return self.resp
 
-    def spawn_sender_greenthread(self, pool, queue_depth, write_timeout,
-                                 exception_handler):
-        """Call before sending the first chunk of request body"""
-        self.queue = Queue(queue_depth)
-        pool.spawn(self._send_file, write_timeout, exception_handler)
-
-    def wait(self):
-        if self.queue.unfinished_tasks:
-            self.queue.join()
-
     def _start_object_data(self):
         # Called immediately before the first chunk of object data is sent.
         # Subclasses may implement custom behaviour
@@ -1653,7 +1645,7 @@ class Putter(object):
             self._start_object_data()
             self.state = SENDING_DATA
 
-        self.queue.put(chunk)
+        self._send_chunk(chunk)
 
     def end_of_object_data(self, **kwargs):
         """
@@ -1662,33 +1654,23 @@ class Putter(object):
         if self.state == DATA_SENT:
             raise ValueError("called end_of_object_data twice")
 
-        self.queue.put(b'')
+        self._send_chunk(b'')
         self.state = DATA_SENT
 
-    def _send_file(self, write_timeout, exception_handler):
-        """
-        Method for a file PUT coroutine. Takes chunks from a queue and sends
-        them down a socket.
-
-        If something goes wrong, the "failed" attribute will be set to true
-        and the exception handler will be called.
-        """
-        while True:
-            chunk = self.queue.get()
-            if not self.failed:
-                if self.chunked:
-                    to_send = b"%x\r\n%s\r\n" % (len(chunk), chunk)
-                else:
-                    to_send = chunk
-                try:
-                    with ChunkWriteTimeout(write_timeout):
-                        self.conn.send(to_send)
-                except (Exception, ChunkWriteTimeout):
-                    self.failed = True
-                    exception_handler(self.node, _('Object'),
-                                      _('Trying to write to %s') % self.path)
-            self.queue.task_done()
+    def _send_chunk(self, chunk):
+        if not self.failed:
+            if self.chunked:
+                to_send = b"%x\r\n%s\r\n" % (len(chunk), chunk)
+            else:
+                to_send = chunk
+            try:
+                with ChunkWriteTimeout(self.write_timeout):
+                    self.conn.send(to_send)
+            except (Exception, ChunkWriteTimeout):
+                self.failed = True
+                self.send_exception_handler(self.node, _('Object'),
+                                            _('Trying to write to %s')
+                                            % self.path)
 
     def close(self):
         # release reference to response to ensure connection really does close,
@@ -1725,7 +1707,8 @@ class Putter(object):
 
     @classmethod
     def connect(cls, node, part, path, headers, conn_timeout, node_timeout,
-                logger=None, chunked=False, **kwargs):
+                write_timeout, send_exception_handler, logger=None,
+                chunked=False, **kwargs):
         """
         Connect to a backend node and send the headers.
@@ -1738,7 +1721,8 @@ class Putter(object):
         """
         conn, expect_resp, final_resp, connect_duration = cls._make_connection(
             node, part, path, headers, conn_timeout, node_timeout)
-        return cls(conn, node, final_resp, path, connect_duration, logger,
+        return cls(conn, node, final_resp, path, connect_duration,
+                   write_timeout, send_exception_handler, logger,
                    chunked=chunked)
@@ -1753,9 +1737,11 @@ class MIMEPutter(Putter):
     An HTTP PUT request that supports streaming.
     """
     def __init__(self, conn, node, resp, req, connect_duration,
-                 logger, mime_boundary, multiphase=False):
+                 write_timeout, send_exception_handler, logger, mime_boundary,
+                 multiphase=False):
         super(MIMEPutter, self).__init__(conn, node, resp, req,
-                                         connect_duration, logger)
+                                         connect_duration, write_timeout,
+                                         send_exception_handler, logger)
         # Note: you probably want to call MimePutter.connect() instead of
         # instantiating one of these directly.
         self.chunked = True  # MIME requests always send chunked body
@@ -1766,8 +1752,8 @@ class MIMEPutter(Putter):
         # We're sending the object plus other stuff in the same request
         # body, all wrapped up in multipart MIME, so we'd better start
         # off the MIME document before sending any object data.
-        self.queue.put(b"--%s\r\nX-Document: object body\r\n\r\n" %
-                       (self.mime_boundary,))
+        self._send_chunk(b"--%s\r\nX-Document: object body\r\n\r\n" %
+                         (self.mime_boundary,))
 
     def end_of_object_data(self, footer_metadata=None):
         """
@@ -1800,9 +1786,9 @@ class MIMEPutter(Putter):
             footer_body, b"\r\n",
             tail_boundary, b"\r\n",
         ]
-        self.queue.put(b"".join(message_parts))
+        self._send_chunk(b"".join(message_parts))
 
-        self.queue.put(b'')
+        self._send_chunk(b'')
         self.state = DATA_SENT
 
     def send_commit_confirmation(self):
@@ -1827,14 +1813,15 @@ class MIMEPutter(Putter):
             body, b"\r\n",
             tail_boundary,
         ]
-        self.queue.put(b"".join(message_parts))
+        self._send_chunk(b"".join(message_parts))
 
-        self.queue.put(b'')
+        self._send_chunk(b'')
         self.state = COMMIT_SENT
 
     @classmethod
     def connect(cls, node, part, req, headers, conn_timeout, node_timeout,
-                logger=None, need_multiphase=True, **kwargs):
+                write_timeout, send_exception_handler, logger=None,
+                need_multiphase=True, **kwargs):
         """
         Connect to a backend node and send the headers.
@@ -1886,7 +1873,8 @@ class MIMEPutter(Putter):
         if need_multiphase and not can_handle_multiphase_put:
             raise MultiphasePUTNotSupported()
 
-        return cls(conn, node, final_resp, req, connect_duration, logger,
+        return cls(conn, node, final_resp, req, connect_duration,
+                   write_timeout, send_exception_handler, logger,
                    mime_boundary, multiphase=need_multiphase)
@@ -2499,6 +2487,8 @@ class ECObjectController(BaseObjectController):
             node, part, req.swift_entity_path, headers,
             conn_timeout=self.app.conn_timeout,
             node_timeout=self.app.node_timeout,
+            write_timeout=self.app.node_timeout,
+            send_exception_handler=self.app.exception_occurred,
             logger=self.app.logger,
             need_multiphase=True)
@@ -2615,106 +2605,95 @@ class ECObjectController(BaseObjectController):
                           '%(conns)s/%(nodes)s required connections'))
 
         try:
-            with ContextPool(len(putters)) as pool:
-
-                # build our putter_to_frag_index dict to place handoffs in the
-                # same part nodes index as the primaries they are covering
-                putter_to_frag_index = self._determine_chunk_destinations(
-                    putters, policy)
-
-                for putter in putters:
-                    putter.spawn_sender_greenthread(
-                        pool, self.app.put_queue_depth, self.app.node_timeout,
-                        self.app.exception_occurred)
-                while True:
-                    with ChunkReadTimeout(self.app.client_timeout):
-                        try:
-                            chunk = next(data_source)
-                        except StopIteration:
-                            break
-                    bytes_transferred += len(chunk)
-                    if bytes_transferred > constraints.MAX_FILE_SIZE:
-                        raise HTTPRequestEntityTooLarge(request=req)
-
-                    send_chunk(chunk)
-
-                ml = req.message_length()
-                if ml and bytes_transferred < ml:
-                    req.client_disconnect = True
-                    self.app.logger.warning(
-                        _('Client disconnected without sending enough data'))
-                    self.app.logger.increment('client_disconnects')
-                    raise HTTPClientDisconnect(request=req)
-
-                send_chunk(b'')  # flush out any buffered data
-
-                computed_etag = (etag_hasher.hexdigest()
-                                 if etag_hasher else None)
-                footers = self._get_footers(req)
-                received_etag = footers.get('etag', req.headers.get(
-                    'etag', '')).strip('"')
-                if (computed_etag and received_etag and
-                        computed_etag != received_etag):
-                    raise HTTPUnprocessableEntity(request=req)
-
-                # Remove any EC reserved metadata names from footers
-                footers = {(k, v) for k, v in footers.items()
-                           if not k.lower().startswith('x-object-sysmeta-ec-')}
-                for putter in putters:
-                    frag_index = putter_to_frag_index[putter]
-                    # Update any footers set by middleware with EC footers
-                    trail_md = trailing_metadata(
-                        policy, etag_hasher,
-                        bytes_transferred, frag_index)
-                    trail_md.update(footers)
-                    # Etag footer must always be hash of what we sent
-                    trail_md['Etag'] = frag_hashers[frag_index].hexdigest()
-                    putter.end_of_object_data(footer_metadata=trail_md)
-
-                for putter in putters:
-                    putter.wait()
-
-                # for storage policies requiring 2-phase commit (e.g.
-                # erasure coding), enforce >= 'quorum' number of
-                # 100-continue responses - this indicates successful
-                # object data and metadata commit and is a necessary
-                # condition to be met before starting 2nd PUT phase
-                final_phase = False
-                statuses, reasons, bodies, _junk = \
-                    self._get_put_responses(
-                        req, putters, len(nodes), final_phase=final_phase,
-                        min_responses=min_conns)
-                if not self.have_quorum(
-                        statuses, len(nodes), quorum=min_conns):
-                    self.app.logger.error(
-                        _('Not enough object servers ack\'ed (got %d)'),
-                        statuses.count(HTTP_CONTINUE))
-                    raise HTTPServiceUnavailable(request=req)
-
-                elif not self._have_adequate_informational(
-                        statuses, min_conns):
-                    resp = self.best_response(req, statuses, reasons, bodies,
-                                              _('Object PUT'),
-                                              quorum_size=min_conns)
-                    if is_client_error(resp.status_int):
-                        # if 4xx occurred in this state it is absolutely
-                        # a bad conversation between proxy-server and
-                        # object-server (even if it's
-                        # HTTP_UNPROCESSABLE_ENTITY) so we should regard this
-                        # as HTTPServiceUnavailable.
-                        raise HTTPServiceUnavailable(request=req)
-                    else:
-                        # Other errors should use raw best_response
-                        raise resp
-
-                # quorum achieved, start 2nd phase - send commit
-                # confirmation to participating object servers
-                # so they write a .durable state file indicating
-                # a successful PUT
-                for putter in putters:
-                    putter.send_commit_confirmation()
-                for putter in putters:
-                    putter.wait()
+            # build our putter_to_frag_index dict to place handoffs in the
+            # same part nodes index as the primaries they are covering
+            putter_to_frag_index = self._determine_chunk_destinations(
+                putters, policy)
+
+            while True:
+                with ChunkReadTimeout(self.app.client_timeout):
+                    try:
+                        chunk = next(data_source)
+                    except StopIteration:
+                        break
+                bytes_transferred += len(chunk)
+                if bytes_transferred > constraints.MAX_FILE_SIZE:
+                    raise HTTPRequestEntityTooLarge(request=req)
+
+                send_chunk(chunk)
+
+            ml = req.message_length()
+            if ml and bytes_transferred < ml:
+                req.client_disconnect = True
+                self.app.logger.warning(
+                    _('Client disconnected without sending enough data'))
+                self.app.logger.increment('client_disconnects')
+                raise HTTPClientDisconnect(request=req)
+
+            send_chunk(b'')  # flush out any buffered data
+
+            computed_etag = (etag_hasher.hexdigest()
+                             if etag_hasher else None)
+            footers = self._get_footers(req)
+            received_etag = footers.get('etag', req.headers.get(
+                'etag', '')).strip('"')
+            if (computed_etag and received_etag and
+                    computed_etag != received_etag):
+                raise HTTPUnprocessableEntity(request=req)
+
+            # Remove any EC reserved metadata names from footers
+            footers = {(k, v) for k, v in footers.items()
+                       if not k.lower().startswith('x-object-sysmeta-ec-')}
+            for putter in putters:
+                frag_index = putter_to_frag_index[putter]
+                # Update any footers set by middleware with EC footers
+                trail_md = trailing_metadata(
+                    policy, etag_hasher,
+                    bytes_transferred, frag_index)
+                trail_md.update(footers)
+                # Etag footer must always be hash of what we sent
+                trail_md['Etag'] = frag_hashers[frag_index].hexdigest()
+                putter.end_of_object_data(footer_metadata=trail_md)
+
+            # for storage policies requiring 2-phase commit (e.g.
+            # erasure coding), enforce >= 'quorum' number of
+            # 100-continue responses - this indicates successful
+            # object data and metadata commit and is a necessary
+            # condition to be met before starting 2nd PUT phase
+            final_phase = False
+            statuses, reasons, bodies, _junk = \
+                self._get_put_responses(
+                    req, putters, len(nodes), final_phase=final_phase,
+                    min_responses=min_conns)
+            if not self.have_quorum(
+                    statuses, len(nodes), quorum=min_conns):
+                self.app.logger.error(
+                    _('Not enough object servers ack\'ed (got %d)'),
+                    statuses.count(HTTP_CONTINUE))
+                raise HTTPServiceUnavailable(request=req)
+
+            elif not self._have_adequate_informational(
+                    statuses, min_conns):
+                resp = self.best_response(req, statuses, reasons, bodies,
+                                          _('Object PUT'),
+                                          quorum_size=min_conns)
+                if is_client_error(resp.status_int):
+                    # if 4xx occurred in this state it is absolutely
+                    # a bad conversation between proxy-server and
+                    # object-server (even if it's
+                    # HTTP_UNPROCESSABLE_ENTITY) so we should regard this
+                    # as HTTPServiceUnavailable.
+                    raise HTTPServiceUnavailable(request=req)
+                else:
+                    # Other errors should use raw best_response
+                    raise resp
+
+            # quorum achieved, start 2nd phase - send commit
+            # confirmation to participating object servers
+            # so they write a .durable state file indicating
+            # a successful PUT
+            for putter in putters:
+                putter.send_commit_confirmation()
         except ChunkReadTimeout as err:
             self.app.logger.warning(
                 _('ERROR Client read timeout (%ss)'), err.seconds)
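
The EC hunk above keeps the two-phase shape intact: phase one streams fragment data plus footers and requires a quorum of 100 Continue acks; phase two sends the commit confirmation that lets each server mark its fragment durable. A condensed sketch of that control flow, with simplified stand-in names (get_put_responses and have_quorum here stand in for the controller methods, and are passed in rather than looked up on self):

# Sketch of the 2-phase EC PUT described by the comments in the diff.
def two_phase_commit(putters, nodes, min_conns, get_put_responses,
                     have_quorum):
    # Phase 1: data and footers are already sent; gather interim
    # (100 Continue) responses and demand a quorum before committing.
    statuses = get_put_responses(putters, final_phase=False)
    if not have_quorum(statuses, len(nodes), quorum=min_conns):
        raise RuntimeError('not enough object servers acked')
    # Phase 2: quorum achieved, tell each server to mark its
    # fragment durable (the commit confirmation).
    for putter in putters:
        putter.send_commit_confirmation()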

View File: swift/proxy/server.py

@@ -191,7 +191,6 @@ class Application(object):
             conf.get('recoverable_node_timeout', self.node_timeout))
         self.conn_timeout = float(conf.get('conn_timeout', 0.5))
         self.client_timeout = int(conf.get('client_timeout', 60))
-        self.put_queue_depth = int(conf.get('put_queue_depth', 10))
         self.object_chunk_size = int(conf.get('object_chunk_size', 65536))
         self.client_chunk_size = int(conf.get('client_chunk_size', 65536))
         self.trans_id_suffix = conf.get('trans_id_suffix', '')