|
|
|
@@ -876,6 +876,8 @@ class ReplicatedObjectController(BaseObjectController):
|
|
|
|
|
node, part, req.swift_entity_path, headers,
|
|
|
|
|
conn_timeout=self.app.conn_timeout,
|
|
|
|
|
node_timeout=self.app.node_timeout,
|
|
|
|
|
write_timeout=self.app.node_timeout,
|
|
|
|
|
send_exception_handler=self.app.exception_occurred,
|
|
|
|
|
logger=self.app.logger,
|
|
|
|
|
need_multiphase=False)
|
|
|
|
|
else:
|
|
|
|
@@ -884,6 +886,8 @@ class ReplicatedObjectController(BaseObjectController):
|
|
|
|
|
node, part, req.swift_entity_path, headers,
|
|
|
|
|
conn_timeout=self.app.conn_timeout,
|
|
|
|
|
node_timeout=self.app.node_timeout,
|
|
|
|
|
write_timeout=self.app.node_timeout,
|
|
|
|
|
send_exception_handler=self.app.exception_occurred,
|
|
|
|
|
logger=self.app.logger,
|
|
|
|
|
chunked=te.endswith(',chunked'))
|
|
|
|
|
return putter
|
|
|
|
@@ -910,42 +914,35 @@ class ReplicatedObjectController(BaseObjectController):
|
|
|
|
|
|
|
|
|
|
min_conns = quorum_size(len(nodes))
|
|
|
|
|
try:
|
|
|
|
|
with ContextPool(len(nodes)) as pool:
|
|
|
|
|
for putter in putters:
|
|
|
|
|
putter.spawn_sender_greenthread(
|
|
|
|
|
pool, self.app.put_queue_depth, self.app.node_timeout,
|
|
|
|
|
self.app.exception_occurred)
|
|
|
|
|
while True:
|
|
|
|
|
with ChunkReadTimeout(self.app.client_timeout):
|
|
|
|
|
try:
|
|
|
|
|
chunk = next(data_source)
|
|
|
|
|
except StopIteration:
|
|
|
|
|
break
|
|
|
|
|
bytes_transferred += len(chunk)
|
|
|
|
|
if bytes_transferred > constraints.MAX_FILE_SIZE:
|
|
|
|
|
raise HTTPRequestEntityTooLarge(request=req)
|
|
|
|
|
while True:
|
|
|
|
|
with ChunkReadTimeout(self.app.client_timeout):
|
|
|
|
|
try:
|
|
|
|
|
chunk = next(data_source)
|
|
|
|
|
except StopIteration:
|
|
|
|
|
break
|
|
|
|
|
bytes_transferred += len(chunk)
|
|
|
|
|
if bytes_transferred > constraints.MAX_FILE_SIZE:
|
|
|
|
|
raise HTTPRequestEntityTooLarge(request=req)
|
|
|
|
|
|
|
|
|
|
send_chunk(chunk)
|
|
|
|
|
send_chunk(chunk)
|
|
|
|
|
|
|
|
|
|
ml = req.message_length()
|
|
|
|
|
if ml and bytes_transferred < ml:
|
|
|
|
|
req.client_disconnect = True
|
|
|
|
|
self.app.logger.warning(
|
|
|
|
|
_('Client disconnected without sending enough data'))
|
|
|
|
|
self.app.logger.increment('client_disconnects')
|
|
|
|
|
raise HTTPClientDisconnect(request=req)
|
|
|
|
|
ml = req.message_length()
|
|
|
|
|
if ml and bytes_transferred < ml:
|
|
|
|
|
req.client_disconnect = True
|
|
|
|
|
self.app.logger.warning(
|
|
|
|
|
_('Client disconnected without sending enough data'))
|
|
|
|
|
self.app.logger.increment('client_disconnects')
|
|
|
|
|
raise HTTPClientDisconnect(request=req)
|
|
|
|
|
|
|
|
|
|
trail_md = self._get_footers(req)
|
|
|
|
|
for putter in putters:
|
|
|
|
|
# send any footers set by middleware
|
|
|
|
|
putter.end_of_object_data(footer_metadata=trail_md)
|
|
|
|
|
trail_md = self._get_footers(req)
|
|
|
|
|
for putter in putters:
|
|
|
|
|
# send any footers set by middleware
|
|
|
|
|
putter.end_of_object_data(footer_metadata=trail_md)
|
|
|
|
|
|
|
|
|
|
for putter in putters:
|
|
|
|
|
putter.wait()
|
|
|
|
|
self._check_min_conn(
|
|
|
|
|
req, [p for p in putters if not p.failed], min_conns,
|
|
|
|
|
msg=_('Object PUT exceptions after last send, '
|
|
|
|
|
'%(conns)s/%(nodes)s required connections'))
|
|
|
|
|
self._check_min_conn(
|
|
|
|
|
req, [p for p in putters if not p.failed], min_conns,
|
|
|
|
|
msg=_('Object PUT exceptions after last send, '
|
|
|
|
|
'%(conns)s/%(nodes)s required connections'))
|
|
|
|
|
except ChunkReadTimeout as err:
|
|
|
|
|
self.app.logger.warning(
|
|
|
|
|
_('ERROR Client read timeout (%ss)'), err.seconds)
|
|
|
|
@@ -1576,10 +1573,14 @@ class Putter(object):
|
|
|
|
|
:param resp: an HTTPResponse instance if connect() received final response
|
|
|
|
|
:param path: the object path to send to the storage node
|
|
|
|
|
:param connect_duration: time taken to initiate the HTTPConnection
|
|
|
|
|
:param write_timeout: time limit to write a chunk to the connection socket
|
|
|
|
|
:param send_exception_handler: callback called when an exception occured
|
|
|
|
|
writing to the connection socket
|
|
|
|
|
:param logger: a Logger instance
|
|
|
|
|
:param chunked: boolean indicating if the request encoding is chunked
|
|
|
|
|
"""
|
|
|
|
|
def __init__(self, conn, node, resp, path, connect_duration, logger,
|
|
|
|
|
def __init__(self, conn, node, resp, path, connect_duration,
|
|
|
|
|
write_timeout, send_exception_handler, logger,
|
|
|
|
|
chunked=False):
|
|
|
|
|
# Note: you probably want to call Putter.connect() instead of
|
|
|
|
|
# instantiating one of these directly.
|
|
|
|
@@ -1588,11 +1589,12 @@ class Putter(object):
|
|
|
|
|
self.resp = self.final_resp = resp
|
|
|
|
|
self.path = path
|
|
|
|
|
self.connect_duration = connect_duration
|
|
|
|
|
self.write_timeout = write_timeout
|
|
|
|
|
self.send_exception_handler = send_exception_handler
|
|
|
|
|
# for handoff nodes node_index is None
|
|
|
|
|
self.node_index = node.get('index')
|
|
|
|
|
|
|
|
|
|
self.failed = False
|
|
|
|
|
self.queue = None
|
|
|
|
|
self.state = NO_DATA_SENT
|
|
|
|
|
self.chunked = chunked
|
|
|
|
|
self.logger = logger
|
|
|
|
@@ -1624,16 +1626,6 @@ class Putter(object):
|
|
|
|
|
self.resp = self.conn.getresponse()
|
|
|
|
|
return self.resp
|
|
|
|
|
|
|
|
|
|
def spawn_sender_greenthread(self, pool, queue_depth, write_timeout,
|
|
|
|
|
exception_handler):
|
|
|
|
|
"""Call before sending the first chunk of request body"""
|
|
|
|
|
self.queue = Queue(queue_depth)
|
|
|
|
|
pool.spawn(self._send_file, write_timeout, exception_handler)
|
|
|
|
|
|
|
|
|
|
def wait(self):
|
|
|
|
|
if self.queue.unfinished_tasks:
|
|
|
|
|
self.queue.join()
|
|
|
|
|
|
|
|
|
|
def _start_object_data(self):
|
|
|
|
|
# Called immediately before the first chunk of object data is sent.
|
|
|
|
|
# Subclasses may implement custom behaviour
|
|
|
|
@@ -1653,7 +1645,7 @@ class Putter(object):
|
|
|
|
|
self._start_object_data()
|
|
|
|
|
self.state = SENDING_DATA
|
|
|
|
|
|
|
|
|
|
self.queue.put(chunk)
|
|
|
|
|
self._send_chunk(chunk)
|
|
|
|
|
|
|
|
|
|
def end_of_object_data(self, **kwargs):
|
|
|
|
|
"""
|
|
|
|
@@ -1662,33 +1654,23 @@ class Putter(object):
|
|
|
|
|
if self.state == DATA_SENT:
|
|
|
|
|
raise ValueError("called end_of_object_data twice")
|
|
|
|
|
|
|
|
|
|
self.queue.put(b'')
|
|
|
|
|
self._send_chunk(b'')
|
|
|
|
|
self.state = DATA_SENT
|
|
|
|
|
|
|
|
|
|
def _send_file(self, write_timeout, exception_handler):
|
|
|
|
|
"""
|
|
|
|
|
Method for a file PUT coroutine. Takes chunks from a queue and sends
|
|
|
|
|
them down a socket.
|
|
|
|
|
|
|
|
|
|
If something goes wrong, the "failed" attribute will be set to true
|
|
|
|
|
and the exception handler will be called.
|
|
|
|
|
"""
|
|
|
|
|
while True:
|
|
|
|
|
chunk = self.queue.get()
|
|
|
|
|
if not self.failed:
|
|
|
|
|
if self.chunked:
|
|
|
|
|
to_send = b"%x\r\n%s\r\n" % (len(chunk), chunk)
|
|
|
|
|
else:
|
|
|
|
|
to_send = chunk
|
|
|
|
|
try:
|
|
|
|
|
with ChunkWriteTimeout(write_timeout):
|
|
|
|
|
self.conn.send(to_send)
|
|
|
|
|
except (Exception, ChunkWriteTimeout):
|
|
|
|
|
self.failed = True
|
|
|
|
|
exception_handler(self.node, _('Object'),
|
|
|
|
|
_('Trying to write to %s') % self.path)
|
|
|
|
|
|
|
|
|
|
self.queue.task_done()
|
|
|
|
|
def _send_chunk(self, chunk):
|
|
|
|
|
if not self.failed:
|
|
|
|
|
if self.chunked:
|
|
|
|
|
to_send = b"%x\r\n%s\r\n" % (len(chunk), chunk)
|
|
|
|
|
else:
|
|
|
|
|
to_send = chunk
|
|
|
|
|
try:
|
|
|
|
|
with ChunkWriteTimeout(self.write_timeout):
|
|
|
|
|
self.conn.send(to_send)
|
|
|
|
|
except (Exception, ChunkWriteTimeout):
|
|
|
|
|
self.failed = True
|
|
|
|
|
self.send_exception_handler(self.node, _('Object'),
|
|
|
|
|
_('Trying to write to %s')
|
|
|
|
|
% self.path)
|
|
|
|
|
|
|
|
|
|
def close(self):
|
|
|
|
|
# release reference to response to ensure connection really does close,
|
|
|
|
@@ -1725,7 +1707,8 @@ class Putter(object):
|
|
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
|
def connect(cls, node, part, path, headers, conn_timeout, node_timeout,
|
|
|
|
|
logger=None, chunked=False, **kwargs):
|
|
|
|
|
write_timeout, send_exception_handler, logger=None,
|
|
|
|
|
chunked=False, **kwargs):
|
|
|
|
|
"""
|
|
|
|
|
Connect to a backend node and send the headers.
|
|
|
|
|
|
|
|
|
@@ -1738,7 +1721,8 @@ class Putter(object):
|
|
|
|
|
"""
|
|
|
|
|
conn, expect_resp, final_resp, connect_duration = cls._make_connection(
|
|
|
|
|
node, part, path, headers, conn_timeout, node_timeout)
|
|
|
|
|
return cls(conn, node, final_resp, path, connect_duration, logger,
|
|
|
|
|
return cls(conn, node, final_resp, path, connect_duration,
|
|
|
|
|
write_timeout, send_exception_handler, logger,
|
|
|
|
|
chunked=chunked)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@@ -1753,9 +1737,11 @@ class MIMEPutter(Putter):
|
|
|
|
|
An HTTP PUT request that supports streaming.
|
|
|
|
|
"""
|
|
|
|
|
def __init__(self, conn, node, resp, req, connect_duration,
|
|
|
|
|
logger, mime_boundary, multiphase=False):
|
|
|
|
|
write_timeout, send_exception_handler, logger, mime_boundary,
|
|
|
|
|
multiphase=False):
|
|
|
|
|
super(MIMEPutter, self).__init__(conn, node, resp, req,
|
|
|
|
|
connect_duration, logger)
|
|
|
|
|
connect_duration, write_timeout,
|
|
|
|
|
send_exception_handler, logger)
|
|
|
|
|
# Note: you probably want to call MimePutter.connect() instead of
|
|
|
|
|
# instantiating one of these directly.
|
|
|
|
|
self.chunked = True # MIME requests always send chunked body
|
|
|
|
@@ -1766,8 +1752,8 @@ class MIMEPutter(Putter):
|
|
|
|
|
# We're sending the object plus other stuff in the same request
|
|
|
|
|
# body, all wrapped up in multipart MIME, so we'd better start
|
|
|
|
|
# off the MIME document before sending any object data.
|
|
|
|
|
self.queue.put(b"--%s\r\nX-Document: object body\r\n\r\n" %
|
|
|
|
|
(self.mime_boundary,))
|
|
|
|
|
self._send_chunk(b"--%s\r\nX-Document: object body\r\n\r\n" %
|
|
|
|
|
(self.mime_boundary,))
|
|
|
|
|
|
|
|
|
|
def end_of_object_data(self, footer_metadata=None):
|
|
|
|
|
"""
|
|
|
|
@@ -1800,9 +1786,9 @@ class MIMEPutter(Putter):
|
|
|
|
|
footer_body, b"\r\n",
|
|
|
|
|
tail_boundary, b"\r\n",
|
|
|
|
|
]
|
|
|
|
|
self.queue.put(b"".join(message_parts))
|
|
|
|
|
self._send_chunk(b"".join(message_parts))
|
|
|
|
|
|
|
|
|
|
self.queue.put(b'')
|
|
|
|
|
self._send_chunk(b'')
|
|
|
|
|
self.state = DATA_SENT
|
|
|
|
|
|
|
|
|
|
def send_commit_confirmation(self):
|
|
|
|
@@ -1827,14 +1813,15 @@ class MIMEPutter(Putter):
|
|
|
|
|
body, b"\r\n",
|
|
|
|
|
tail_boundary,
|
|
|
|
|
]
|
|
|
|
|
self.queue.put(b"".join(message_parts))
|
|
|
|
|
self._send_chunk(b"".join(message_parts))
|
|
|
|
|
|
|
|
|
|
self.queue.put(b'')
|
|
|
|
|
self._send_chunk(b'')
|
|
|
|
|
self.state = COMMIT_SENT
|
|
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
|
def connect(cls, node, part, req, headers, conn_timeout, node_timeout,
|
|
|
|
|
logger=None, need_multiphase=True, **kwargs):
|
|
|
|
|
write_timeout, send_exception_handler, logger=None,
|
|
|
|
|
need_multiphase=True, **kwargs):
|
|
|
|
|
"""
|
|
|
|
|
Connect to a backend node and send the headers.
|
|
|
|
|
|
|
|
|
@@ -1886,7 +1873,8 @@ class MIMEPutter(Putter):
|
|
|
|
|
if need_multiphase and not can_handle_multiphase_put:
|
|
|
|
|
raise MultiphasePUTNotSupported()
|
|
|
|
|
|
|
|
|
|
return cls(conn, node, final_resp, req, connect_duration, logger,
|
|
|
|
|
return cls(conn, node, final_resp, req, connect_duration,
|
|
|
|
|
write_timeout, send_exception_handler, logger,
|
|
|
|
|
mime_boundary, multiphase=need_multiphase)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@@ -2499,6 +2487,8 @@ class ECObjectController(BaseObjectController):
|
|
|
|
|
node, part, req.swift_entity_path, headers,
|
|
|
|
|
conn_timeout=self.app.conn_timeout,
|
|
|
|
|
node_timeout=self.app.node_timeout,
|
|
|
|
|
write_timeout=self.app.node_timeout,
|
|
|
|
|
send_exception_handler=self.app.exception_occurred,
|
|
|
|
|
logger=self.app.logger,
|
|
|
|
|
need_multiphase=True)
|
|
|
|
|
|
|
|
|
@@ -2615,106 +2605,95 @@ class ECObjectController(BaseObjectController):
|
|
|
|
|
'%(conns)s/%(nodes)s required connections'))
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
with ContextPool(len(putters)) as pool:
|
|
|
|
|
# build our putter_to_frag_index dict to place handoffs in the
|
|
|
|
|
# same part nodes index as the primaries they are covering
|
|
|
|
|
putter_to_frag_index = self._determine_chunk_destinations(
|
|
|
|
|
putters, policy)
|
|
|
|
|
|
|
|
|
|
# build our putter_to_frag_index dict to place handoffs in the
|
|
|
|
|
# same part nodes index as the primaries they are covering
|
|
|
|
|
putter_to_frag_index = self._determine_chunk_destinations(
|
|
|
|
|
putters, policy)
|
|
|
|
|
while True:
|
|
|
|
|
with ChunkReadTimeout(self.app.client_timeout):
|
|
|
|
|
try:
|
|
|
|
|
chunk = next(data_source)
|
|
|
|
|
except StopIteration:
|
|
|
|
|
break
|
|
|
|
|
bytes_transferred += len(chunk)
|
|
|
|
|
if bytes_transferred > constraints.MAX_FILE_SIZE:
|
|
|
|
|
raise HTTPRequestEntityTooLarge(request=req)
|
|
|
|
|
|
|
|
|
|
for putter in putters:
|
|
|
|
|
putter.spawn_sender_greenthread(
|
|
|
|
|
pool, self.app.put_queue_depth, self.app.node_timeout,
|
|
|
|
|
self.app.exception_occurred)
|
|
|
|
|
while True:
|
|
|
|
|
with ChunkReadTimeout(self.app.client_timeout):
|
|
|
|
|
try:
|
|
|
|
|
chunk = next(data_source)
|
|
|
|
|
except StopIteration:
|
|
|
|
|
break
|
|
|
|
|
bytes_transferred += len(chunk)
|
|
|
|
|
if bytes_transferred > constraints.MAX_FILE_SIZE:
|
|
|
|
|
raise HTTPRequestEntityTooLarge(request=req)
|
|
|
|
|
send_chunk(chunk)
|
|
|
|
|
|
|
|
|
|
send_chunk(chunk)
|
|
|
|
|
ml = req.message_length()
|
|
|
|
|
if ml and bytes_transferred < ml:
|
|
|
|
|
req.client_disconnect = True
|
|
|
|
|
self.app.logger.warning(
|
|
|
|
|
_('Client disconnected without sending enough data'))
|
|
|
|
|
self.app.logger.increment('client_disconnects')
|
|
|
|
|
raise HTTPClientDisconnect(request=req)
|
|
|
|
|
|
|
|
|
|
ml = req.message_length()
|
|
|
|
|
if ml and bytes_transferred < ml:
|
|
|
|
|
req.client_disconnect = True
|
|
|
|
|
self.app.logger.warning(
|
|
|
|
|
_('Client disconnected without sending enough data'))
|
|
|
|
|
self.app.logger.increment('client_disconnects')
|
|
|
|
|
raise HTTPClientDisconnect(request=req)
|
|
|
|
|
send_chunk(b'') # flush out any buffered data
|
|
|
|
|
|
|
|
|
|
send_chunk(b'') # flush out any buffered data
|
|
|
|
|
computed_etag = (etag_hasher.hexdigest()
|
|
|
|
|
if etag_hasher else None)
|
|
|
|
|
footers = self._get_footers(req)
|
|
|
|
|
received_etag = footers.get('etag', req.headers.get(
|
|
|
|
|
'etag', '')).strip('"')
|
|
|
|
|
if (computed_etag and received_etag and
|
|
|
|
|
computed_etag != received_etag):
|
|
|
|
|
raise HTTPUnprocessableEntity(request=req)
|
|
|
|
|
|
|
|
|
|
computed_etag = (etag_hasher.hexdigest()
|
|
|
|
|
if etag_hasher else None)
|
|
|
|
|
footers = self._get_footers(req)
|
|
|
|
|
received_etag = footers.get('etag', req.headers.get(
|
|
|
|
|
'etag', '')).strip('"')
|
|
|
|
|
if (computed_etag and received_etag and
|
|
|
|
|
computed_etag != received_etag):
|
|
|
|
|
raise HTTPUnprocessableEntity(request=req)
|
|
|
|
|
# Remove any EC reserved metadata names from footers
|
|
|
|
|
footers = {(k, v) for k, v in footers.items()
|
|
|
|
|
if not k.lower().startswith('x-object-sysmeta-ec-')}
|
|
|
|
|
for putter in putters:
|
|
|
|
|
frag_index = putter_to_frag_index[putter]
|
|
|
|
|
# Update any footers set by middleware with EC footers
|
|
|
|
|
trail_md = trailing_metadata(
|
|
|
|
|
policy, etag_hasher,
|
|
|
|
|
bytes_transferred, frag_index)
|
|
|
|
|
trail_md.update(footers)
|
|
|
|
|
# Etag footer must always be hash of what we sent
|
|
|
|
|
trail_md['Etag'] = frag_hashers[frag_index].hexdigest()
|
|
|
|
|
putter.end_of_object_data(footer_metadata=trail_md)
|
|
|
|
|
|
|
|
|
|
# Remove any EC reserved metadata names from footers
|
|
|
|
|
footers = {(k, v) for k, v in footers.items()
|
|
|
|
|
if not k.lower().startswith('x-object-sysmeta-ec-')}
|
|
|
|
|
for putter in putters:
|
|
|
|
|
frag_index = putter_to_frag_index[putter]
|
|
|
|
|
# Update any footers set by middleware with EC footers
|
|
|
|
|
trail_md = trailing_metadata(
|
|
|
|
|
policy, etag_hasher,
|
|
|
|
|
bytes_transferred, frag_index)
|
|
|
|
|
trail_md.update(footers)
|
|
|
|
|
# Etag footer must always be hash of what we sent
|
|
|
|
|
trail_md['Etag'] = frag_hashers[frag_index].hexdigest()
|
|
|
|
|
putter.end_of_object_data(footer_metadata=trail_md)
|
|
|
|
|
# for storage policies requiring 2-phase commit (e.g.
|
|
|
|
|
# erasure coding), enforce >= 'quorum' number of
|
|
|
|
|
# 100-continue responses - this indicates successful
|
|
|
|
|
# object data and metadata commit and is a necessary
|
|
|
|
|
# condition to be met before starting 2nd PUT phase
|
|
|
|
|
final_phase = False
|
|
|
|
|
statuses, reasons, bodies, _junk = \
|
|
|
|
|
self._get_put_responses(
|
|
|
|
|
req, putters, len(nodes), final_phase=final_phase,
|
|
|
|
|
min_responses=min_conns)
|
|
|
|
|
if not self.have_quorum(
|
|
|
|
|
statuses, len(nodes), quorum=min_conns):
|
|
|
|
|
self.app.logger.error(
|
|
|
|
|
_('Not enough object servers ack\'ed (got %d)'),
|
|
|
|
|
statuses.count(HTTP_CONTINUE))
|
|
|
|
|
raise HTTPServiceUnavailable(request=req)
|
|
|
|
|
|
|
|
|
|
for putter in putters:
|
|
|
|
|
putter.wait()
|
|
|
|
|
|
|
|
|
|
# for storage policies requiring 2-phase commit (e.g.
|
|
|
|
|
# erasure coding), enforce >= 'quorum' number of
|
|
|
|
|
# 100-continue responses - this indicates successful
|
|
|
|
|
# object data and metadata commit and is a necessary
|
|
|
|
|
# condition to be met before starting 2nd PUT phase
|
|
|
|
|
final_phase = False
|
|
|
|
|
statuses, reasons, bodies, _junk = \
|
|
|
|
|
self._get_put_responses(
|
|
|
|
|
req, putters, len(nodes), final_phase=final_phase,
|
|
|
|
|
min_responses=min_conns)
|
|
|
|
|
if not self.have_quorum(
|
|
|
|
|
statuses, len(nodes), quorum=min_conns):
|
|
|
|
|
self.app.logger.error(
|
|
|
|
|
_('Not enough object servers ack\'ed (got %d)'),
|
|
|
|
|
statuses.count(HTTP_CONTINUE))
|
|
|
|
|
elif not self._have_adequate_informational(
|
|
|
|
|
statuses, min_conns):
|
|
|
|
|
resp = self.best_response(req, statuses, reasons, bodies,
|
|
|
|
|
_('Object PUT'),
|
|
|
|
|
quorum_size=min_conns)
|
|
|
|
|
if is_client_error(resp.status_int):
|
|
|
|
|
# if 4xx occurred in this state it is absolutely
|
|
|
|
|
# a bad conversation between proxy-server and
|
|
|
|
|
# object-server (even if it's
|
|
|
|
|
# HTTP_UNPROCESSABLE_ENTITY) so we should regard this
|
|
|
|
|
# as HTTPServiceUnavailable.
|
|
|
|
|
raise HTTPServiceUnavailable(request=req)
|
|
|
|
|
else:
|
|
|
|
|
# Other errors should use raw best_response
|
|
|
|
|
raise resp
|
|
|
|
|
|
|
|
|
|
elif not self._have_adequate_informational(
|
|
|
|
|
statuses, min_conns):
|
|
|
|
|
resp = self.best_response(req, statuses, reasons, bodies,
|
|
|
|
|
_('Object PUT'),
|
|
|
|
|
quorum_size=min_conns)
|
|
|
|
|
if is_client_error(resp.status_int):
|
|
|
|
|
# if 4xx occurred in this state it is absolutely
|
|
|
|
|
# a bad conversation between proxy-server and
|
|
|
|
|
# object-server (even if it's
|
|
|
|
|
# HTTP_UNPROCESSABLE_ENTITY) so we should regard this
|
|
|
|
|
# as HTTPServiceUnavailable.
|
|
|
|
|
raise HTTPServiceUnavailable(request=req)
|
|
|
|
|
else:
|
|
|
|
|
# Other errors should use raw best_response
|
|
|
|
|
raise resp
|
|
|
|
|
|
|
|
|
|
# quorum achieved, start 2nd phase - send commit
|
|
|
|
|
# confirmation to participating object servers
|
|
|
|
|
# so they write a .durable state file indicating
|
|
|
|
|
# a successful PUT
|
|
|
|
|
for putter in putters:
|
|
|
|
|
putter.send_commit_confirmation()
|
|
|
|
|
for putter in putters:
|
|
|
|
|
putter.wait()
|
|
|
|
|
# quorum achieved, start 2nd phase - send commit
|
|
|
|
|
# confirmation to participating object servers
|
|
|
|
|
# so they write a .durable state file indicating
|
|
|
|
|
# a successful PUT
|
|
|
|
|
for putter in putters:
|
|
|
|
|
putter.send_commit_confirmation()
|
|
|
|
|
except ChunkReadTimeout as err:
|
|
|
|
|
self.app.logger.warning(
|
|
|
|
|
_('ERROR Client read timeout (%ss)'), err.seconds)
|
|
|
|
|