Merge "Replace all "with Chunk*Timeout" by a watchdog"
This commit is contained in:
commit
7158adfb22
@ -62,6 +62,7 @@ import eventlet.patcher
|
|||||||
import eventlet.semaphore
|
import eventlet.semaphore
|
||||||
import pkg_resources
|
import pkg_resources
|
||||||
from eventlet import GreenPool, sleep, Timeout
|
from eventlet import GreenPool, sleep, Timeout
|
||||||
|
from eventlet.event import Event
|
||||||
from eventlet.green import socket, threading
|
from eventlet.green import socket, threading
|
||||||
import eventlet.hubs
|
import eventlet.hubs
|
||||||
import eventlet.queue
|
import eventlet.queue
|
||||||
@ -5847,3 +5848,132 @@ def systemd_notify(logger=None):
|
|||||||
except EnvironmentError:
|
except EnvironmentError:
|
||||||
if logger:
|
if logger:
|
||||||
logger.debug("Systemd notification failed", exc_info=True)
|
logger.debug("Systemd notification failed", exc_info=True)
|
||||||
|
|
||||||
|
|
||||||
|
class Watchdog(object):
|
||||||
|
"""
|
||||||
|
Implements a watchdog to efficiently manage concurrent timeouts.
|
||||||
|
|
||||||
|
Compared to eventlet.timeouts.Timeout, it reduces the number of context
|
||||||
|
switching in eventlet by avoiding to schedule actions (throw an Exception),
|
||||||
|
then unschedule them if the timeouts are cancelled.
|
||||||
|
|
||||||
|
1. at T+0, request timeout(10)
|
||||||
|
=> wathdog greenlet sleeps 10 seconds
|
||||||
|
2. at T+1, request timeout(15)
|
||||||
|
=> the timeout will expire after the current, no need to wake up the
|
||||||
|
watchdog greenlet
|
||||||
|
3. at T+2, request timeout(5)
|
||||||
|
=> the timeout will expire before the first timeout, wake up the
|
||||||
|
watchdog greenlet to calculate a new sleep period
|
||||||
|
4. at T+7, the 3rd timeout expires
|
||||||
|
=> the exception is raised, then the greenlet watchdog sleep(3) to
|
||||||
|
wake up for the 1st timeout expiration
|
||||||
|
"""
|
||||||
|
def __init__(self):
|
||||||
|
# key => (timeout, timeout_at, caller_greenthread, exception)
|
||||||
|
self._timeouts = dict()
|
||||||
|
self._evt = Event()
|
||||||
|
self._next_expiration = None
|
||||||
|
self._run_gth = None
|
||||||
|
|
||||||
|
def start(self, timeout, exc, timeout_at=None):
|
||||||
|
"""
|
||||||
|
Schedule a timeout action
|
||||||
|
|
||||||
|
:param timeout: duration before the timeout expires
|
||||||
|
:param exc: exception to throw when the timeout expire, must inherit
|
||||||
|
from eventlet.timeouts.Timeout
|
||||||
|
:param timeout_at: allow to force the expiration timestamp
|
||||||
|
:return: id of the scheduled timeout, needed to cancel it
|
||||||
|
"""
|
||||||
|
if not timeout_at:
|
||||||
|
timeout_at = time.time() + timeout
|
||||||
|
gth = eventlet.greenthread.getcurrent()
|
||||||
|
timeout_definition = (timeout, timeout_at, gth, exc)
|
||||||
|
key = id(timeout_definition)
|
||||||
|
self._timeouts[key] = timeout_definition
|
||||||
|
|
||||||
|
# Wake up the watchdog loop only when there is a new shorter timeout
|
||||||
|
if (self._next_expiration is None
|
||||||
|
or self._next_expiration > timeout_at):
|
||||||
|
# There could be concurrency on .send(), so wrap it in a try
|
||||||
|
try:
|
||||||
|
if not self._evt.ready():
|
||||||
|
self._evt.send()
|
||||||
|
except AssertionError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
return key
|
||||||
|
|
||||||
|
def stop(self, key):
|
||||||
|
"""
|
||||||
|
Cancel a scheduled timeout
|
||||||
|
|
||||||
|
:param key: timeout id, as returned by start()
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
if key in self._timeouts:
|
||||||
|
del(self._timeouts[key])
|
||||||
|
except KeyError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def spawn(self):
|
||||||
|
"""
|
||||||
|
Start the watchdog greenthread.
|
||||||
|
"""
|
||||||
|
if self._run_gth is None:
|
||||||
|
self._run_gth = eventlet.spawn(self.run)
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
while True:
|
||||||
|
self._run()
|
||||||
|
|
||||||
|
def _run(self):
|
||||||
|
now = time.time()
|
||||||
|
self._next_expiration = None
|
||||||
|
if self._evt.ready():
|
||||||
|
self._evt.reset()
|
||||||
|
for k, (timeout, timeout_at, gth, exc) in list(self._timeouts.items()):
|
||||||
|
if timeout_at <= now:
|
||||||
|
try:
|
||||||
|
if k in self._timeouts:
|
||||||
|
del(self._timeouts[k])
|
||||||
|
except KeyError:
|
||||||
|
pass
|
||||||
|
e = exc()
|
||||||
|
e.seconds = timeout
|
||||||
|
eventlet.hubs.get_hub().schedule_call_global(0, gth.throw, e)
|
||||||
|
else:
|
||||||
|
if (self._next_expiration is None
|
||||||
|
or self._next_expiration > timeout_at):
|
||||||
|
self._next_expiration = timeout_at
|
||||||
|
if self._next_expiration is None:
|
||||||
|
sleep_duration = self._next_expiration
|
||||||
|
else:
|
||||||
|
sleep_duration = self._next_expiration - now
|
||||||
|
self._evt.wait(sleep_duration)
|
||||||
|
|
||||||
|
|
||||||
|
class WatchdogTimeout(object):
|
||||||
|
"""
|
||||||
|
Context manager to schedule a timeout in a Watchdog instance
|
||||||
|
"""
|
||||||
|
def __init__(self, watchdog, timeout, exc, timeout_at=None):
|
||||||
|
"""
|
||||||
|
Schedule a timeout in a Watchdog instance
|
||||||
|
|
||||||
|
:param watchdog: Watchdog instance
|
||||||
|
:param timeout: duration before the timeout expires
|
||||||
|
:param exc: exception to throw when the timeout expire, must inherit
|
||||||
|
from eventlet.timeouts.Timeout
|
||||||
|
:param timeout_at: allow to force the expiration timestamp
|
||||||
|
"""
|
||||||
|
self.watchdog = watchdog
|
||||||
|
self.key = watchdog.start(timeout, exc, timeout_at=timeout_at)
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def __exit__(self, type, value, traceback):
|
||||||
|
self.watchdog.stop(self.key)
|
||||||
|
@ -42,7 +42,7 @@ from eventlet.timeout import Timeout
|
|||||||
import six
|
import six
|
||||||
|
|
||||||
from swift.common.wsgi import make_pre_authed_env, make_pre_authed_request
|
from swift.common.wsgi import make_pre_authed_env, make_pre_authed_request
|
||||||
from swift.common.utils import Timestamp, config_true_value, \
|
from swift.common.utils import Timestamp, WatchdogTimeout, config_true_value, \
|
||||||
public, split_path, list_from_csv, GreenthreadSafeIterator, \
|
public, split_path, list_from_csv, GreenthreadSafeIterator, \
|
||||||
GreenAsyncPile, quorum_size, parse_content_type, drain_and_close, \
|
GreenAsyncPile, quorum_size, parse_content_type, drain_and_close, \
|
||||||
document_iters_to_http_response_body, ShardRange, find_shard_range
|
document_iters_to_http_response_body, ShardRange, find_shard_range
|
||||||
@ -1047,7 +1047,8 @@ class ResumingGetter(object):
|
|||||||
# but just a 200 or a single-range 206, then this
|
# but just a 200 or a single-range 206, then this
|
||||||
# performs no IO, and either just returns source or
|
# performs no IO, and either just returns source or
|
||||||
# raises StopIteration.
|
# raises StopIteration.
|
||||||
with ChunkReadTimeout(node_timeout):
|
with WatchdogTimeout(self.app.watchdog, node_timeout,
|
||||||
|
ChunkReadTimeout):
|
||||||
# if StopIteration is raised, it escapes and is
|
# if StopIteration is raised, it escapes and is
|
||||||
# handled elsewhere
|
# handled elsewhere
|
||||||
start_byte, end_byte, length, headers, part = next(
|
start_byte, end_byte, length, headers, part = next(
|
||||||
@ -1078,7 +1079,8 @@ class ResumingGetter(object):
|
|||||||
part_file = ByteCountEnforcer(part_file, nbytes)
|
part_file = ByteCountEnforcer(part_file, nbytes)
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
with ChunkReadTimeout(node_timeout):
|
with WatchdogTimeout(self.app.watchdog, node_timeout,
|
||||||
|
ChunkReadTimeout):
|
||||||
chunk = part_file.read(self.app.object_chunk_size)
|
chunk = part_file.read(self.app.object_chunk_size)
|
||||||
nchunks += 1
|
nchunks += 1
|
||||||
# NB: this append must be *inside* the context
|
# NB: this append must be *inside* the context
|
||||||
@ -1138,8 +1140,9 @@ class ResumingGetter(object):
|
|||||||
|
|
||||||
if not chunk:
|
if not chunk:
|
||||||
if buf:
|
if buf:
|
||||||
with ChunkWriteTimeout(
|
with WatchdogTimeout(self.app.watchdog,
|
||||||
self.app.client_timeout):
|
self.app.client_timeout,
|
||||||
|
ChunkWriteTimeout):
|
||||||
self.bytes_used_from_backend += len(buf)
|
self.bytes_used_from_backend += len(buf)
|
||||||
yield buf
|
yield buf
|
||||||
buf = b''
|
buf = b''
|
||||||
@ -1149,13 +1152,16 @@ class ResumingGetter(object):
|
|||||||
while len(buf) >= client_chunk_size:
|
while len(buf) >= client_chunk_size:
|
||||||
client_chunk = buf[:client_chunk_size]
|
client_chunk = buf[:client_chunk_size]
|
||||||
buf = buf[client_chunk_size:]
|
buf = buf[client_chunk_size:]
|
||||||
with ChunkWriteTimeout(
|
with WatchdogTimeout(self.app.watchdog,
|
||||||
self.app.client_timeout):
|
self.app.client_timeout,
|
||||||
|
ChunkWriteTimeout):
|
||||||
self.bytes_used_from_backend += \
|
self.bytes_used_from_backend += \
|
||||||
len(client_chunk)
|
len(client_chunk)
|
||||||
yield client_chunk
|
yield client_chunk
|
||||||
else:
|
else:
|
||||||
with ChunkWriteTimeout(self.app.client_timeout):
|
with WatchdogTimeout(self.app.watchdog,
|
||||||
|
self.app.client_timeout,
|
||||||
|
ChunkWriteTimeout):
|
||||||
self.bytes_used_from_backend += len(buf)
|
self.bytes_used_from_backend += len(buf)
|
||||||
yield buf
|
yield buf
|
||||||
buf = b''
|
buf = b''
|
||||||
|
@ -44,7 +44,7 @@ from eventlet.timeout import Timeout
|
|||||||
|
|
||||||
from swift.common.utils import (
|
from swift.common.utils import (
|
||||||
clean_content_type, config_true_value, ContextPool, csv_append,
|
clean_content_type, config_true_value, ContextPool, csv_append,
|
||||||
GreenAsyncPile, GreenthreadSafeIterator, Timestamp,
|
GreenAsyncPile, GreenthreadSafeIterator, Timestamp, WatchdogTimeout,
|
||||||
normalize_delete_at_timestamp, public, get_expirer_container,
|
normalize_delete_at_timestamp, public, get_expirer_container,
|
||||||
document_iters_to_http_response_body, parse_content_range,
|
document_iters_to_http_response_body, parse_content_range,
|
||||||
quorum_size, reiterate, close_if_possible, safe_json_loads)
|
quorum_size, reiterate, close_if_possible, safe_json_loads)
|
||||||
@ -869,7 +869,7 @@ class ReplicatedObjectController(BaseObjectController):
|
|||||||
def _make_putter(self, node, part, req, headers):
|
def _make_putter(self, node, part, req, headers):
|
||||||
if req.environ.get('swift.callback.update_footers'):
|
if req.environ.get('swift.callback.update_footers'):
|
||||||
putter = MIMEPutter.connect(
|
putter = MIMEPutter.connect(
|
||||||
node, part, req.swift_entity_path, headers,
|
node, part, req.swift_entity_path, headers, self.app.watchdog,
|
||||||
conn_timeout=self.app.conn_timeout,
|
conn_timeout=self.app.conn_timeout,
|
||||||
node_timeout=self.app.node_timeout,
|
node_timeout=self.app.node_timeout,
|
||||||
write_timeout=self.app.node_timeout,
|
write_timeout=self.app.node_timeout,
|
||||||
@ -879,7 +879,7 @@ class ReplicatedObjectController(BaseObjectController):
|
|||||||
else:
|
else:
|
||||||
te = ',' + headers.get('Transfer-Encoding', '')
|
te = ',' + headers.get('Transfer-Encoding', '')
|
||||||
putter = Putter.connect(
|
putter = Putter.connect(
|
||||||
node, part, req.swift_entity_path, headers,
|
node, part, req.swift_entity_path, headers, self.app.watchdog,
|
||||||
conn_timeout=self.app.conn_timeout,
|
conn_timeout=self.app.conn_timeout,
|
||||||
node_timeout=self.app.node_timeout,
|
node_timeout=self.app.node_timeout,
|
||||||
write_timeout=self.app.node_timeout,
|
write_timeout=self.app.node_timeout,
|
||||||
@ -897,9 +897,10 @@ class ReplicatedObjectController(BaseObjectController):
|
|||||||
bytes_transferred = 0
|
bytes_transferred = 0
|
||||||
|
|
||||||
def send_chunk(chunk):
|
def send_chunk(chunk):
|
||||||
|
timeout_at = time.time() + self.app.node_timeout
|
||||||
for putter in list(putters):
|
for putter in list(putters):
|
||||||
if not putter.failed:
|
if not putter.failed:
|
||||||
putter.send_chunk(chunk)
|
putter.send_chunk(chunk, timeout_at=timeout_at)
|
||||||
else:
|
else:
|
||||||
putter.close()
|
putter.close()
|
||||||
putters.remove(putter)
|
putters.remove(putter)
|
||||||
@ -911,7 +912,9 @@ class ReplicatedObjectController(BaseObjectController):
|
|||||||
min_conns = quorum_size(len(nodes))
|
min_conns = quorum_size(len(nodes))
|
||||||
try:
|
try:
|
||||||
while True:
|
while True:
|
||||||
with ChunkReadTimeout(self.app.client_timeout):
|
with WatchdogTimeout(self.app.watchdog,
|
||||||
|
self.app.client_timeout,
|
||||||
|
ChunkReadTimeout):
|
||||||
try:
|
try:
|
||||||
chunk = next(data_source)
|
chunk = next(data_source)
|
||||||
except StopIteration:
|
except StopIteration:
|
||||||
@ -1569,13 +1572,14 @@ class Putter(object):
|
|||||||
:param resp: an HTTPResponse instance if connect() received final response
|
:param resp: an HTTPResponse instance if connect() received final response
|
||||||
:param path: the object path to send to the storage node
|
:param path: the object path to send to the storage node
|
||||||
:param connect_duration: time taken to initiate the HTTPConnection
|
:param connect_duration: time taken to initiate the HTTPConnection
|
||||||
|
:param watchdog: a spawned Watchdog instance that will enforce timeouts
|
||||||
:param write_timeout: time limit to write a chunk to the connection socket
|
:param write_timeout: time limit to write a chunk to the connection socket
|
||||||
:param send_exception_handler: callback called when an exception occured
|
:param send_exception_handler: callback called when an exception occured
|
||||||
writing to the connection socket
|
writing to the connection socket
|
||||||
:param logger: a Logger instance
|
:param logger: a Logger instance
|
||||||
:param chunked: boolean indicating if the request encoding is chunked
|
:param chunked: boolean indicating if the request encoding is chunked
|
||||||
"""
|
"""
|
||||||
def __init__(self, conn, node, resp, path, connect_duration,
|
def __init__(self, conn, node, resp, path, connect_duration, watchdog,
|
||||||
write_timeout, send_exception_handler, logger,
|
write_timeout, send_exception_handler, logger,
|
||||||
chunked=False):
|
chunked=False):
|
||||||
# Note: you probably want to call Putter.connect() instead of
|
# Note: you probably want to call Putter.connect() instead of
|
||||||
@ -1585,6 +1589,7 @@ class Putter(object):
|
|||||||
self.resp = self.final_resp = resp
|
self.resp = self.final_resp = resp
|
||||||
self.path = path
|
self.path = path
|
||||||
self.connect_duration = connect_duration
|
self.connect_duration = connect_duration
|
||||||
|
self.watchdog = watchdog
|
||||||
self.write_timeout = write_timeout
|
self.write_timeout = write_timeout
|
||||||
self.send_exception_handler = send_exception_handler
|
self.send_exception_handler = send_exception_handler
|
||||||
# for handoff nodes node_index is None
|
# for handoff nodes node_index is None
|
||||||
@ -1627,7 +1632,7 @@ class Putter(object):
|
|||||||
# Subclasses may implement custom behaviour
|
# Subclasses may implement custom behaviour
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def send_chunk(self, chunk):
|
def send_chunk(self, chunk, timeout_at=None):
|
||||||
if not chunk:
|
if not chunk:
|
||||||
# If we're not using chunked transfer-encoding, sending a 0-byte
|
# If we're not using chunked transfer-encoding, sending a 0-byte
|
||||||
# chunk is just wasteful. If we *are* using chunked
|
# chunk is just wasteful. If we *are* using chunked
|
||||||
@ -1641,7 +1646,7 @@ class Putter(object):
|
|||||||
self._start_object_data()
|
self._start_object_data()
|
||||||
self.state = SENDING_DATA
|
self.state = SENDING_DATA
|
||||||
|
|
||||||
self._send_chunk(chunk)
|
self._send_chunk(chunk, timeout_at=timeout_at)
|
||||||
|
|
||||||
def end_of_object_data(self, **kwargs):
|
def end_of_object_data(self, **kwargs):
|
||||||
"""
|
"""
|
||||||
@ -1653,14 +1658,15 @@ class Putter(object):
|
|||||||
self._send_chunk(b'')
|
self._send_chunk(b'')
|
||||||
self.state = DATA_SENT
|
self.state = DATA_SENT
|
||||||
|
|
||||||
def _send_chunk(self, chunk):
|
def _send_chunk(self, chunk, timeout_at=None):
|
||||||
if not self.failed:
|
if not self.failed:
|
||||||
if self.chunked:
|
if self.chunked:
|
||||||
to_send = b"%x\r\n%s\r\n" % (len(chunk), chunk)
|
to_send = b"%x\r\n%s\r\n" % (len(chunk), chunk)
|
||||||
else:
|
else:
|
||||||
to_send = chunk
|
to_send = chunk
|
||||||
try:
|
try:
|
||||||
with ChunkWriteTimeout(self.write_timeout):
|
with WatchdogTimeout(self.watchdog, self.write_timeout,
|
||||||
|
ChunkWriteTimeout, timeout_at=timeout_at):
|
||||||
self.conn.send(to_send)
|
self.conn.send(to_send)
|
||||||
except (Exception, ChunkWriteTimeout):
|
except (Exception, ChunkWriteTimeout):
|
||||||
self.failed = True
|
self.failed = True
|
||||||
@ -1702,9 +1708,9 @@ class Putter(object):
|
|||||||
return conn, resp, final_resp, connect_duration
|
return conn, resp, final_resp, connect_duration
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def connect(cls, node, part, path, headers, conn_timeout, node_timeout,
|
def connect(cls, node, part, path, headers, watchdog, conn_timeout,
|
||||||
write_timeout, send_exception_handler, logger=None,
|
node_timeout, write_timeout, send_exception_handler,
|
||||||
chunked=False, **kwargs):
|
logger=None, chunked=False, **kwargs):
|
||||||
"""
|
"""
|
||||||
Connect to a backend node and send the headers.
|
Connect to a backend node and send the headers.
|
||||||
|
|
||||||
@ -1717,7 +1723,7 @@ class Putter(object):
|
|||||||
"""
|
"""
|
||||||
conn, expect_resp, final_resp, connect_duration = cls._make_connection(
|
conn, expect_resp, final_resp, connect_duration = cls._make_connection(
|
||||||
node, part, path, headers, conn_timeout, node_timeout)
|
node, part, path, headers, conn_timeout, node_timeout)
|
||||||
return cls(conn, node, final_resp, path, connect_duration,
|
return cls(conn, node, final_resp, path, connect_duration, watchdog,
|
||||||
write_timeout, send_exception_handler, logger,
|
write_timeout, send_exception_handler, logger,
|
||||||
chunked=chunked)
|
chunked=chunked)
|
||||||
|
|
||||||
@ -1732,12 +1738,13 @@ class MIMEPutter(Putter):
|
|||||||
|
|
||||||
An HTTP PUT request that supports streaming.
|
An HTTP PUT request that supports streaming.
|
||||||
"""
|
"""
|
||||||
def __init__(self, conn, node, resp, req, connect_duration,
|
def __init__(self, conn, node, resp, req, connect_duration, watchdog,
|
||||||
write_timeout, send_exception_handler, logger, mime_boundary,
|
write_timeout, send_exception_handler, logger, mime_boundary,
|
||||||
multiphase=False):
|
multiphase=False):
|
||||||
super(MIMEPutter, self).__init__(conn, node, resp, req,
|
super(MIMEPutter, self).__init__(conn, node, resp, req,
|
||||||
connect_duration, write_timeout,
|
connect_duration, watchdog,
|
||||||
send_exception_handler, logger)
|
write_timeout, send_exception_handler,
|
||||||
|
logger)
|
||||||
# Note: you probably want to call MimePutter.connect() instead of
|
# Note: you probably want to call MimePutter.connect() instead of
|
||||||
# instantiating one of these directly.
|
# instantiating one of these directly.
|
||||||
self.chunked = True # MIME requests always send chunked body
|
self.chunked = True # MIME requests always send chunked body
|
||||||
@ -1815,9 +1822,9 @@ class MIMEPutter(Putter):
|
|||||||
self.state = COMMIT_SENT
|
self.state = COMMIT_SENT
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def connect(cls, node, part, req, headers, conn_timeout, node_timeout,
|
def connect(cls, node, part, req, headers, watchdog, conn_timeout,
|
||||||
write_timeout, send_exception_handler, logger=None,
|
node_timeout, write_timeout, send_exception_handler,
|
||||||
need_multiphase=True, **kwargs):
|
logger=None, need_multiphase=True, **kwargs):
|
||||||
"""
|
"""
|
||||||
Connect to a backend node and send the headers.
|
Connect to a backend node and send the headers.
|
||||||
|
|
||||||
@ -1869,7 +1876,7 @@ class MIMEPutter(Putter):
|
|||||||
if need_multiphase and not can_handle_multiphase_put:
|
if need_multiphase and not can_handle_multiphase_put:
|
||||||
raise MultiphasePUTNotSupported()
|
raise MultiphasePUTNotSupported()
|
||||||
|
|
||||||
return cls(conn, node, final_resp, req, connect_duration,
|
return cls(conn, node, final_resp, req, connect_duration, watchdog,
|
||||||
write_timeout, send_exception_handler, logger,
|
write_timeout, send_exception_handler, logger,
|
||||||
mime_boundary, multiphase=need_multiphase)
|
mime_boundary, multiphase=need_multiphase)
|
||||||
|
|
||||||
@ -2502,7 +2509,7 @@ class ECObjectController(BaseObjectController):
|
|||||||
|
|
||||||
def _make_putter(self, node, part, req, headers):
|
def _make_putter(self, node, part, req, headers):
|
||||||
return MIMEPutter.connect(
|
return MIMEPutter.connect(
|
||||||
node, part, req.swift_entity_path, headers,
|
node, part, req.swift_entity_path, headers, self.app.watchdog,
|
||||||
conn_timeout=self.app.conn_timeout,
|
conn_timeout=self.app.conn_timeout,
|
||||||
node_timeout=self.app.node_timeout,
|
node_timeout=self.app.node_timeout,
|
||||||
write_timeout=self.app.node_timeout,
|
write_timeout=self.app.node_timeout,
|
||||||
@ -2603,6 +2610,7 @@ class ECObjectController(BaseObjectController):
|
|||||||
return
|
return
|
||||||
|
|
||||||
updated_frag_indexes = set()
|
updated_frag_indexes = set()
|
||||||
|
timeout_at = time.time() + self.app.node_timeout
|
||||||
for putter in list(putters):
|
for putter in list(putters):
|
||||||
frag_index = putter_to_frag_index[putter]
|
frag_index = putter_to_frag_index[putter]
|
||||||
backend_chunk = backend_chunks[frag_index]
|
backend_chunk = backend_chunks[frag_index]
|
||||||
@ -2613,7 +2621,7 @@ class ECObjectController(BaseObjectController):
|
|||||||
if frag_index not in updated_frag_indexes:
|
if frag_index not in updated_frag_indexes:
|
||||||
frag_hashers[frag_index].update(backend_chunk)
|
frag_hashers[frag_index].update(backend_chunk)
|
||||||
updated_frag_indexes.add(frag_index)
|
updated_frag_indexes.add(frag_index)
|
||||||
putter.send_chunk(backend_chunk)
|
putter.send_chunk(backend_chunk, timeout_at=timeout_at)
|
||||||
else:
|
else:
|
||||||
putter.close()
|
putter.close()
|
||||||
putters.remove(putter)
|
putters.remove(putter)
|
||||||
@ -2629,7 +2637,9 @@ class ECObjectController(BaseObjectController):
|
|||||||
putters, policy)
|
putters, policy)
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
with ChunkReadTimeout(self.app.client_timeout):
|
with WatchdogTimeout(self.app.watchdog,
|
||||||
|
self.app.client_timeout,
|
||||||
|
ChunkReadTimeout):
|
||||||
try:
|
try:
|
||||||
chunk = next(data_source)
|
chunk = next(data_source)
|
||||||
except StopIteration:
|
except StopIteration:
|
||||||
|
@ -32,7 +32,7 @@ from swift.common import constraints
|
|||||||
from swift.common.http import is_server_error
|
from swift.common.http import is_server_error
|
||||||
from swift.common.storage_policy import POLICIES
|
from swift.common.storage_policy import POLICIES
|
||||||
from swift.common.ring import Ring
|
from swift.common.ring import Ring
|
||||||
from swift.common.utils import cache_from_env, get_logger, \
|
from swift.common.utils import Watchdog, cache_from_env, get_logger, \
|
||||||
get_remote_client, split_path, config_true_value, generate_trans_id, \
|
get_remote_client, split_path, config_true_value, generate_trans_id, \
|
||||||
affinity_key_function, affinity_locality_predicate, list_from_csv, \
|
affinity_key_function, affinity_locality_predicate, list_from_csv, \
|
||||||
register_swift_info, readconf, config_auto_int_value
|
register_swift_info, readconf, config_auto_int_value
|
||||||
@ -317,6 +317,8 @@ class Application(object):
|
|||||||
allow_account_management=self.allow_account_management,
|
allow_account_management=self.allow_account_management,
|
||||||
account_autocreate=self.account_autocreate,
|
account_autocreate=self.account_autocreate,
|
||||||
**constraints.EFFECTIVE_CONSTRAINTS)
|
**constraints.EFFECTIVE_CONSTRAINTS)
|
||||||
|
self.watchdog = Watchdog()
|
||||||
|
self.watchdog.spawn()
|
||||||
|
|
||||||
def _make_policy_override(self, policy, conf, override_conf):
|
def _make_policy_override(self, policy, conf, override_conf):
|
||||||
label_for_policy = _label_for_policy(policy)
|
label_for_policy = _label_for_policy(policy)
|
||||||
|
@ -8381,3 +8381,86 @@ class Test_LibcWrapper(unittest.TestCase):
|
|||||||
# 0 is SEEK_SET
|
# 0 is SEEK_SET
|
||||||
0)
|
0)
|
||||||
self.assertEqual(tf.read(100), b"defgh")
|
self.assertEqual(tf.read(100), b"defgh")
|
||||||
|
|
||||||
|
|
||||||
|
class TestWatchdog(unittest.TestCase):
|
||||||
|
def test_start_stop(self):
|
||||||
|
w = utils.Watchdog()
|
||||||
|
w._evt.send = mock.Mock(side_effect=w._evt.send)
|
||||||
|
gth = object()
|
||||||
|
|
||||||
|
with patch('eventlet.greenthread.getcurrent', return_value=gth),\
|
||||||
|
patch('time.time', return_value=10.0):
|
||||||
|
# On first call, _next_expiration is None, it should unblock
|
||||||
|
# greenthread that is blocked for ever
|
||||||
|
key = w.start(1.0, Timeout)
|
||||||
|
self.assertIn(key, w._timeouts)
|
||||||
|
self.assertEqual(w._timeouts[key], (1.0, 11.0, gth, Timeout))
|
||||||
|
w._evt.send.assert_called_once()
|
||||||
|
|
||||||
|
w.stop(key)
|
||||||
|
self.assertNotIn(key, w._timeouts)
|
||||||
|
|
||||||
|
def test_timeout_concurrency(self):
|
||||||
|
w = utils.Watchdog()
|
||||||
|
w._evt.send = mock.Mock(side_effect=w._evt.send)
|
||||||
|
w._evt.wait = mock.Mock()
|
||||||
|
gth = object()
|
||||||
|
|
||||||
|
w._run()
|
||||||
|
w._evt.wait.assert_called_once_with(None)
|
||||||
|
|
||||||
|
with patch('eventlet.greenthread.getcurrent', return_value=gth):
|
||||||
|
w._evt.send.reset_mock()
|
||||||
|
w._evt.wait.reset_mock()
|
||||||
|
with patch('time.time', return_value=10.00):
|
||||||
|
# On first call, _next_expiration is None, it should unblock
|
||||||
|
# greenthread that is blocked for ever
|
||||||
|
w.start(5.0, Timeout) # Will end at 15.0
|
||||||
|
w._evt.send.assert_called_once()
|
||||||
|
|
||||||
|
with patch('time.time', return_value=10.01):
|
||||||
|
w._run()
|
||||||
|
self.assertEqual(15.0, w._next_expiration)
|
||||||
|
w._evt.wait.assert_called_once_with(15.0 - 10.01)
|
||||||
|
|
||||||
|
w._evt.send.reset_mock()
|
||||||
|
w._evt.wait.reset_mock()
|
||||||
|
with patch('time.time', return_value=12.00):
|
||||||
|
# Now _next_expiration is 15.0, it won't unblock greenthread
|
||||||
|
# because this expiration is later
|
||||||
|
w.start(5.0, Timeout) # Will end at 17.0
|
||||||
|
w._evt.send.assert_not_called()
|
||||||
|
|
||||||
|
w._evt.send.reset_mock()
|
||||||
|
w._evt.wait.reset_mock()
|
||||||
|
with patch('time.time', return_value=14.00):
|
||||||
|
# Now _next_expiration is still 15.0, it will unblock
|
||||||
|
# greenthread because this new expiration is 14.5
|
||||||
|
w.start(0.5, Timeout) # Will end at 14.5
|
||||||
|
w._evt.send.assert_called_once()
|
||||||
|
|
||||||
|
with patch('time.time', return_value=14.01):
|
||||||
|
w._run()
|
||||||
|
w._evt.wait.assert_called_once_with(14.5 - 14.01)
|
||||||
|
self.assertEqual(14.5, w._next_expiration)
|
||||||
|
# Should wakeup at 14.5
|
||||||
|
|
||||||
|
def test_timeout_expire(self):
|
||||||
|
w = utils.Watchdog()
|
||||||
|
w._evt.send = mock.Mock() # To avoid it to call get_hub()
|
||||||
|
w._evt.wait = mock.Mock() # To avoid it to call get_hub()
|
||||||
|
|
||||||
|
with patch('eventlet.hubs.get_hub') as m_gh:
|
||||||
|
with patch('time.time', return_value=10.0):
|
||||||
|
w.start(5.0, Timeout) # Will end at 15.0
|
||||||
|
|
||||||
|
with patch('time.time', return_value=16.0):
|
||||||
|
w._run()
|
||||||
|
m_gh.assert_called_once()
|
||||||
|
m_gh.return_value.schedule_call_global.assert_called_once()
|
||||||
|
exc = m_gh.return_value.schedule_call_global.call_args[0][2]
|
||||||
|
self.assertIsInstance(exc, Timeout)
|
||||||
|
self.assertEqual(exc.seconds, 5.0)
|
||||||
|
self.assertEqual(None, w._next_expiration)
|
||||||
|
w._evt.wait.assert_called_once_with(None)
|
||||||
|
@ -67,7 +67,7 @@ from swift.common.middleware import proxy_logging, versioned_writes, \
|
|||||||
copy, listing_formats
|
copy, listing_formats
|
||||||
from swift.common.middleware.acl import parse_acl, format_acl
|
from swift.common.middleware.acl import parse_acl, format_acl
|
||||||
from swift.common.exceptions import ChunkReadTimeout, DiskFileNotExist, \
|
from swift.common.exceptions import ChunkReadTimeout, DiskFileNotExist, \
|
||||||
APIVersionError, ChunkWriteTimeout, ChunkReadError
|
APIVersionError, ChunkReadError
|
||||||
from swift.common import utils, constraints
|
from swift.common import utils, constraints
|
||||||
from swift.common.utils import hash_path, storage_directory, \
|
from swift.common.utils import hash_path, storage_directory, \
|
||||||
parse_content_type, parse_mime_headers, \
|
parse_content_type, parse_mime_headers, \
|
||||||
@ -7302,7 +7302,7 @@ class BaseTestECObjectController(BaseTestObjectController):
|
|||||||
exp = b'HTTP/1.1 201'
|
exp = b'HTTP/1.1 201'
|
||||||
self.assertEqual(headers[:len(exp)], exp)
|
self.assertEqual(headers[:len(exp)], exp)
|
||||||
|
|
||||||
class WrappedTimeout(ChunkWriteTimeout):
|
class WrappedTimeout(utils.WatchdogTimeout):
|
||||||
def __enter__(self):
|
def __enter__(self):
|
||||||
timeouts[self] = traceback.extract_stack()
|
timeouts[self] = traceback.extract_stack()
|
||||||
return super(WrappedTimeout, self).__enter__()
|
return super(WrappedTimeout, self).__enter__()
|
||||||
@ -7312,7 +7312,7 @@ class BaseTestECObjectController(BaseTestObjectController):
|
|||||||
return super(WrappedTimeout, self).__exit__(typ, value, tb)
|
return super(WrappedTimeout, self).__exit__(typ, value, tb)
|
||||||
|
|
||||||
timeouts = {}
|
timeouts = {}
|
||||||
with mock.patch('swift.proxy.controllers.base.ChunkWriteTimeout',
|
with mock.patch('swift.proxy.controllers.base.WatchdogTimeout',
|
||||||
WrappedTimeout):
|
WrappedTimeout):
|
||||||
with mock.patch.object(_test_servers[0], 'client_timeout', new=5):
|
with mock.patch.object(_test_servers[0], 'client_timeout', new=5):
|
||||||
# get object
|
# get object
|
||||||
|
Loading…
x
Reference in New Issue
Block a user