Replace all "with Chunk*Timeout" by a watchdog

The contextmanager eventlet.timeout.Timeout is scheduling a call to
throw an exception every time is is entered. The swift-proxy uses
Chunk(Read|Write)Timeout for every chunk read/written from the client or
object-server. For a single upload/download of a big object, it means
tens of thousands of scheduling in eventlet, which is very costly.

This patch replace the usage of these context managers by a watchdog
greenthread that will schedule itself by sleeping until the next timeout
expiration. Then, only if a timeout expired, it will schedule a call to
throw the appropriate exception.

The gain on bandwidth and CPU usage is significant. On a benchmark
environment, it gave this result for an upload of 6 Gbpson a replica
policy (average of 3 runs):
    master: 5.66 Gbps / 849 jiffies consumed by the proxy-server
    this patch: 7.56 Gbps / 618 jiffies consumed by the proxy-server

Change-Id: I19fd42908be5a6ac5905ba193967cd860cb27a0b
This commit is contained in:
Romain LE DISEZ 2019-11-04 14:47:35 +08:00
parent b4b0ebd4aa
commit 8378a11d11
6 changed files with 267 additions and 36 deletions

View File

@ -62,6 +62,7 @@ import eventlet.patcher
import eventlet.semaphore
import pkg_resources
from eventlet import GreenPool, sleep, Timeout
from eventlet.event import Event
from eventlet.green import socket, threading
import eventlet.hubs
import eventlet.queue
@ -5847,3 +5848,132 @@ def systemd_notify(logger=None):
except EnvironmentError:
if logger:
logger.debug("Systemd notification failed", exc_info=True)
class Watchdog(object):
"""
Implements a watchdog to efficiently manage concurrent timeouts.
Compared to eventlet.timeouts.Timeout, it reduces the number of context
switching in eventlet by avoiding to schedule actions (throw an Exception),
then unschedule them if the timeouts are cancelled.
1. at T+0, request timeout(10)
=> wathdog greenlet sleeps 10 seconds
2. at T+1, request timeout(15)
=> the timeout will expire after the current, no need to wake up the
watchdog greenlet
3. at T+2, request timeout(5)
=> the timeout will expire before the first timeout, wake up the
watchdog greenlet to calculate a new sleep period
4. at T+7, the 3rd timeout expires
=> the exception is raised, then the greenlet watchdog sleep(3) to
wake up for the 1st timeout expiration
"""
def __init__(self):
# key => (timeout, timeout_at, caller_greenthread, exception)
self._timeouts = dict()
self._evt = Event()
self._next_expiration = None
self._run_gth = None
def start(self, timeout, exc, timeout_at=None):
"""
Schedule a timeout action
:param timeout: duration before the timeout expires
:param exc: exception to throw when the timeout expire, must inherit
from eventlet.timeouts.Timeout
:param timeout_at: allow to force the expiration timestamp
:return: id of the scheduled timeout, needed to cancel it
"""
if not timeout_at:
timeout_at = time.time() + timeout
gth = eventlet.greenthread.getcurrent()
timeout_definition = (timeout, timeout_at, gth, exc)
key = id(timeout_definition)
self._timeouts[key] = timeout_definition
# Wake up the watchdog loop only when there is a new shorter timeout
if (self._next_expiration is None
or self._next_expiration > timeout_at):
# There could be concurrency on .send(), so wrap it in a try
try:
if not self._evt.ready():
self._evt.send()
except AssertionError:
pass
return key
def stop(self, key):
"""
Cancel a scheduled timeout
:param key: timeout id, as returned by start()
"""
try:
if key in self._timeouts:
del(self._timeouts[key])
except KeyError:
pass
def spawn(self):
"""
Start the watchdog greenthread.
"""
if self._run_gth is None:
self._run_gth = eventlet.spawn(self.run)
def run(self):
while True:
self._run()
def _run(self):
now = time.time()
self._next_expiration = None
if self._evt.ready():
self._evt.reset()
for k, (timeout, timeout_at, gth, exc) in list(self._timeouts.items()):
if timeout_at <= now:
try:
if k in self._timeouts:
del(self._timeouts[k])
except KeyError:
pass
e = exc()
e.seconds = timeout
eventlet.hubs.get_hub().schedule_call_global(0, gth.throw, e)
else:
if (self._next_expiration is None
or self._next_expiration > timeout_at):
self._next_expiration = timeout_at
if self._next_expiration is None:
sleep_duration = self._next_expiration
else:
sleep_duration = self._next_expiration - now
self._evt.wait(sleep_duration)
class WatchdogTimeout(object):
"""
Context manager to schedule a timeout in a Watchdog instance
"""
def __init__(self, watchdog, timeout, exc, timeout_at=None):
"""
Schedule a timeout in a Watchdog instance
:param watchdog: Watchdog instance
:param timeout: duration before the timeout expires
:param exc: exception to throw when the timeout expire, must inherit
from eventlet.timeouts.Timeout
:param timeout_at: allow to force the expiration timestamp
"""
self.watchdog = watchdog
self.key = watchdog.start(timeout, exc, timeout_at=timeout_at)
def __enter__(self):
pass
def __exit__(self, type, value, traceback):
self.watchdog.stop(self.key)

View File

@ -42,7 +42,7 @@ from eventlet.timeout import Timeout
import six
from swift.common.wsgi import make_pre_authed_env, make_pre_authed_request
from swift.common.utils import Timestamp, config_true_value, \
from swift.common.utils import Timestamp, WatchdogTimeout, config_true_value, \
public, split_path, list_from_csv, GreenthreadSafeIterator, \
GreenAsyncPile, quorum_size, parse_content_type, drain_and_close, \
document_iters_to_http_response_body, ShardRange, find_shard_range
@ -1047,7 +1047,8 @@ class ResumingGetter(object):
# but just a 200 or a single-range 206, then this
# performs no IO, and either just returns source or
# raises StopIteration.
with ChunkReadTimeout(node_timeout):
with WatchdogTimeout(self.app.watchdog, node_timeout,
ChunkReadTimeout):
# if StopIteration is raised, it escapes and is
# handled elsewhere
start_byte, end_byte, length, headers, part = next(
@ -1078,7 +1079,8 @@ class ResumingGetter(object):
part_file = ByteCountEnforcer(part_file, nbytes)
while True:
try:
with ChunkReadTimeout(node_timeout):
with WatchdogTimeout(self.app.watchdog, node_timeout,
ChunkReadTimeout):
chunk = part_file.read(self.app.object_chunk_size)
nchunks += 1
# NB: this append must be *inside* the context
@ -1138,8 +1140,9 @@ class ResumingGetter(object):
if not chunk:
if buf:
with ChunkWriteTimeout(
self.app.client_timeout):
with WatchdogTimeout(self.app.watchdog,
self.app.client_timeout,
ChunkWriteTimeout):
self.bytes_used_from_backend += len(buf)
yield buf
buf = b''
@ -1149,13 +1152,16 @@ class ResumingGetter(object):
while len(buf) >= client_chunk_size:
client_chunk = buf[:client_chunk_size]
buf = buf[client_chunk_size:]
with ChunkWriteTimeout(
self.app.client_timeout):
with WatchdogTimeout(self.app.watchdog,
self.app.client_timeout,
ChunkWriteTimeout):
self.bytes_used_from_backend += \
len(client_chunk)
yield client_chunk
else:
with ChunkWriteTimeout(self.app.client_timeout):
with WatchdogTimeout(self.app.watchdog,
self.app.client_timeout,
ChunkWriteTimeout):
self.bytes_used_from_backend += len(buf)
yield buf
buf = b''

View File

@ -44,7 +44,7 @@ from eventlet.timeout import Timeout
from swift.common.utils import (
clean_content_type, config_true_value, ContextPool, csv_append,
GreenAsyncPile, GreenthreadSafeIterator, Timestamp,
GreenAsyncPile, GreenthreadSafeIterator, Timestamp, WatchdogTimeout,
normalize_delete_at_timestamp, public, get_expirer_container,
document_iters_to_http_response_body, parse_content_range,
quorum_size, reiterate, close_if_possible, safe_json_loads)
@ -869,7 +869,7 @@ class ReplicatedObjectController(BaseObjectController):
def _make_putter(self, node, part, req, headers):
if req.environ.get('swift.callback.update_footers'):
putter = MIMEPutter.connect(
node, part, req.swift_entity_path, headers,
node, part, req.swift_entity_path, headers, self.app.watchdog,
conn_timeout=self.app.conn_timeout,
node_timeout=self.app.node_timeout,
write_timeout=self.app.node_timeout,
@ -879,7 +879,7 @@ class ReplicatedObjectController(BaseObjectController):
else:
te = ',' + headers.get('Transfer-Encoding', '')
putter = Putter.connect(
node, part, req.swift_entity_path, headers,
node, part, req.swift_entity_path, headers, self.app.watchdog,
conn_timeout=self.app.conn_timeout,
node_timeout=self.app.node_timeout,
write_timeout=self.app.node_timeout,
@ -897,9 +897,10 @@ class ReplicatedObjectController(BaseObjectController):
bytes_transferred = 0
def send_chunk(chunk):
timeout_at = time.time() + self.app.node_timeout
for putter in list(putters):
if not putter.failed:
putter.send_chunk(chunk)
putter.send_chunk(chunk, timeout_at=timeout_at)
else:
putter.close()
putters.remove(putter)
@ -911,7 +912,9 @@ class ReplicatedObjectController(BaseObjectController):
min_conns = quorum_size(len(nodes))
try:
while True:
with ChunkReadTimeout(self.app.client_timeout):
with WatchdogTimeout(self.app.watchdog,
self.app.client_timeout,
ChunkReadTimeout):
try:
chunk = next(data_source)
except StopIteration:
@ -1569,13 +1572,14 @@ class Putter(object):
:param resp: an HTTPResponse instance if connect() received final response
:param path: the object path to send to the storage node
:param connect_duration: time taken to initiate the HTTPConnection
:param watchdog: a spawned Watchdog instance that will enforce timeouts
:param write_timeout: time limit to write a chunk to the connection socket
:param send_exception_handler: callback called when an exception occured
writing to the connection socket
:param logger: a Logger instance
:param chunked: boolean indicating if the request encoding is chunked
"""
def __init__(self, conn, node, resp, path, connect_duration,
def __init__(self, conn, node, resp, path, connect_duration, watchdog,
write_timeout, send_exception_handler, logger,
chunked=False):
# Note: you probably want to call Putter.connect() instead of
@ -1585,6 +1589,7 @@ class Putter(object):
self.resp = self.final_resp = resp
self.path = path
self.connect_duration = connect_duration
self.watchdog = watchdog
self.write_timeout = write_timeout
self.send_exception_handler = send_exception_handler
# for handoff nodes node_index is None
@ -1627,7 +1632,7 @@ class Putter(object):
# Subclasses may implement custom behaviour
pass
def send_chunk(self, chunk):
def send_chunk(self, chunk, timeout_at=None):
if not chunk:
# If we're not using chunked transfer-encoding, sending a 0-byte
# chunk is just wasteful. If we *are* using chunked
@ -1641,7 +1646,7 @@ class Putter(object):
self._start_object_data()
self.state = SENDING_DATA
self._send_chunk(chunk)
self._send_chunk(chunk, timeout_at=timeout_at)
def end_of_object_data(self, **kwargs):
"""
@ -1653,14 +1658,15 @@ class Putter(object):
self._send_chunk(b'')
self.state = DATA_SENT
def _send_chunk(self, chunk):
def _send_chunk(self, chunk, timeout_at=None):
if not self.failed:
if self.chunked:
to_send = b"%x\r\n%s\r\n" % (len(chunk), chunk)
else:
to_send = chunk
try:
with ChunkWriteTimeout(self.write_timeout):
with WatchdogTimeout(self.watchdog, self.write_timeout,
ChunkWriteTimeout, timeout_at=timeout_at):
self.conn.send(to_send)
except (Exception, ChunkWriteTimeout):
self.failed = True
@ -1702,9 +1708,9 @@ class Putter(object):
return conn, resp, final_resp, connect_duration
@classmethod
def connect(cls, node, part, path, headers, conn_timeout, node_timeout,
write_timeout, send_exception_handler, logger=None,
chunked=False, **kwargs):
def connect(cls, node, part, path, headers, watchdog, conn_timeout,
node_timeout, write_timeout, send_exception_handler,
logger=None, chunked=False, **kwargs):
"""
Connect to a backend node and send the headers.
@ -1717,7 +1723,7 @@ class Putter(object):
"""
conn, expect_resp, final_resp, connect_duration = cls._make_connection(
node, part, path, headers, conn_timeout, node_timeout)
return cls(conn, node, final_resp, path, connect_duration,
return cls(conn, node, final_resp, path, connect_duration, watchdog,
write_timeout, send_exception_handler, logger,
chunked=chunked)
@ -1732,12 +1738,13 @@ class MIMEPutter(Putter):
An HTTP PUT request that supports streaming.
"""
def __init__(self, conn, node, resp, req, connect_duration,
def __init__(self, conn, node, resp, req, connect_duration, watchdog,
write_timeout, send_exception_handler, logger, mime_boundary,
multiphase=False):
super(MIMEPutter, self).__init__(conn, node, resp, req,
connect_duration, write_timeout,
send_exception_handler, logger)
connect_duration, watchdog,
write_timeout, send_exception_handler,
logger)
# Note: you probably want to call MimePutter.connect() instead of
# instantiating one of these directly.
self.chunked = True # MIME requests always send chunked body
@ -1815,9 +1822,9 @@ class MIMEPutter(Putter):
self.state = COMMIT_SENT
@classmethod
def connect(cls, node, part, req, headers, conn_timeout, node_timeout,
write_timeout, send_exception_handler, logger=None,
need_multiphase=True, **kwargs):
def connect(cls, node, part, req, headers, watchdog, conn_timeout,
node_timeout, write_timeout, send_exception_handler,
logger=None, need_multiphase=True, **kwargs):
"""
Connect to a backend node and send the headers.
@ -1869,7 +1876,7 @@ class MIMEPutter(Putter):
if need_multiphase and not can_handle_multiphase_put:
raise MultiphasePUTNotSupported()
return cls(conn, node, final_resp, req, connect_duration,
return cls(conn, node, final_resp, req, connect_duration, watchdog,
write_timeout, send_exception_handler, logger,
mime_boundary, multiphase=need_multiphase)
@ -2502,7 +2509,7 @@ class ECObjectController(BaseObjectController):
def _make_putter(self, node, part, req, headers):
return MIMEPutter.connect(
node, part, req.swift_entity_path, headers,
node, part, req.swift_entity_path, headers, self.app.watchdog,
conn_timeout=self.app.conn_timeout,
node_timeout=self.app.node_timeout,
write_timeout=self.app.node_timeout,
@ -2603,6 +2610,7 @@ class ECObjectController(BaseObjectController):
return
updated_frag_indexes = set()
timeout_at = time.time() + self.app.node_timeout
for putter in list(putters):
frag_index = putter_to_frag_index[putter]
backend_chunk = backend_chunks[frag_index]
@ -2613,7 +2621,7 @@ class ECObjectController(BaseObjectController):
if frag_index not in updated_frag_indexes:
frag_hashers[frag_index].update(backend_chunk)
updated_frag_indexes.add(frag_index)
putter.send_chunk(backend_chunk)
putter.send_chunk(backend_chunk, timeout_at=timeout_at)
else:
putter.close()
putters.remove(putter)
@ -2629,7 +2637,9 @@ class ECObjectController(BaseObjectController):
putters, policy)
while True:
with ChunkReadTimeout(self.app.client_timeout):
with WatchdogTimeout(self.app.watchdog,
self.app.client_timeout,
ChunkReadTimeout):
try:
chunk = next(data_source)
except StopIteration:

View File

@ -32,7 +32,7 @@ from swift.common import constraints
from swift.common.http import is_server_error
from swift.common.storage_policy import POLICIES
from swift.common.ring import Ring
from swift.common.utils import cache_from_env, get_logger, \
from swift.common.utils import Watchdog, cache_from_env, get_logger, \
get_remote_client, split_path, config_true_value, generate_trans_id, \
affinity_key_function, affinity_locality_predicate, list_from_csv, \
register_swift_info, readconf, config_auto_int_value
@ -317,6 +317,8 @@ class Application(object):
allow_account_management=self.allow_account_management,
account_autocreate=self.account_autocreate,
**constraints.EFFECTIVE_CONSTRAINTS)
self.watchdog = Watchdog()
self.watchdog.spawn()
def _make_policy_override(self, policy, conf, override_conf):
label_for_policy = _label_for_policy(policy)

View File

@ -8381,3 +8381,86 @@ class Test_LibcWrapper(unittest.TestCase):
# 0 is SEEK_SET
0)
self.assertEqual(tf.read(100), b"defgh")
class TestWatchdog(unittest.TestCase):
def test_start_stop(self):
w = utils.Watchdog()
w._evt.send = mock.Mock(side_effect=w._evt.send)
gth = object()
with patch('eventlet.greenthread.getcurrent', return_value=gth),\
patch('time.time', return_value=10.0):
# On first call, _next_expiration is None, it should unblock
# greenthread that is blocked for ever
key = w.start(1.0, Timeout)
self.assertIn(key, w._timeouts)
self.assertEqual(w._timeouts[key], (1.0, 11.0, gth, Timeout))
w._evt.send.assert_called_once()
w.stop(key)
self.assertNotIn(key, w._timeouts)
def test_timeout_concurrency(self):
w = utils.Watchdog()
w._evt.send = mock.Mock(side_effect=w._evt.send)
w._evt.wait = mock.Mock()
gth = object()
w._run()
w._evt.wait.assert_called_once_with(None)
with patch('eventlet.greenthread.getcurrent', return_value=gth):
w._evt.send.reset_mock()
w._evt.wait.reset_mock()
with patch('time.time', return_value=10.00):
# On first call, _next_expiration is None, it should unblock
# greenthread that is blocked for ever
w.start(5.0, Timeout) # Will end at 15.0
w._evt.send.assert_called_once()
with patch('time.time', return_value=10.01):
w._run()
self.assertEqual(15.0, w._next_expiration)
w._evt.wait.assert_called_once_with(15.0 - 10.01)
w._evt.send.reset_mock()
w._evt.wait.reset_mock()
with patch('time.time', return_value=12.00):
# Now _next_expiration is 15.0, it won't unblock greenthread
# because this expiration is later
w.start(5.0, Timeout) # Will end at 17.0
w._evt.send.assert_not_called()
w._evt.send.reset_mock()
w._evt.wait.reset_mock()
with patch('time.time', return_value=14.00):
# Now _next_expiration is still 15.0, it will unblock
# greenthread because this new expiration is 14.5
w.start(0.5, Timeout) # Will end at 14.5
w._evt.send.assert_called_once()
with patch('time.time', return_value=14.01):
w._run()
w._evt.wait.assert_called_once_with(14.5 - 14.01)
self.assertEqual(14.5, w._next_expiration)
# Should wakeup at 14.5
def test_timeout_expire(self):
w = utils.Watchdog()
w._evt.send = mock.Mock() # To avoid it to call get_hub()
w._evt.wait = mock.Mock() # To avoid it to call get_hub()
with patch('eventlet.hubs.get_hub') as m_gh:
with patch('time.time', return_value=10.0):
w.start(5.0, Timeout) # Will end at 15.0
with patch('time.time', return_value=16.0):
w._run()
m_gh.assert_called_once()
m_gh.return_value.schedule_call_global.assert_called_once()
exc = m_gh.return_value.schedule_call_global.call_args[0][2]
self.assertIsInstance(exc, Timeout)
self.assertEqual(exc.seconds, 5.0)
self.assertEqual(None, w._next_expiration)
w._evt.wait.assert_called_once_with(None)

View File

@ -67,7 +67,7 @@ from swift.common.middleware import proxy_logging, versioned_writes, \
copy, listing_formats
from swift.common.middleware.acl import parse_acl, format_acl
from swift.common.exceptions import ChunkReadTimeout, DiskFileNotExist, \
APIVersionError, ChunkWriteTimeout, ChunkReadError
APIVersionError, ChunkReadError
from swift.common import utils, constraints
from swift.common.utils import hash_path, storage_directory, \
parse_content_type, parse_mime_headers, \
@ -7302,7 +7302,7 @@ class BaseTestECObjectController(BaseTestObjectController):
exp = b'HTTP/1.1 201'
self.assertEqual(headers[:len(exp)], exp)
class WrappedTimeout(ChunkWriteTimeout):
class WrappedTimeout(utils.WatchdogTimeout):
def __enter__(self):
timeouts[self] = traceback.extract_stack()
return super(WrappedTimeout, self).__enter__()
@ -7312,7 +7312,7 @@ class BaseTestECObjectController(BaseTestObjectController):
return super(WrappedTimeout, self).__exit__(typ, value, tb)
timeouts = {}
with mock.patch('swift.proxy.controllers.base.ChunkWriteTimeout',
with mock.patch('swift.proxy.controllers.base.WatchdogTimeout',
WrappedTimeout):
with mock.patch.object(_test_servers[0], 'client_timeout', new=5):
# get object