Detect lost agents

If during test execution some agent is not replied in expected time
it is marked as lost.

Change-Id: Idce887480518deb80f7722dac721991648e850f0
This commit is contained in:
Ilya Shakhat 2015-03-30 19:35:09 +03:00
parent aa9716f3e0
commit ff11cd9e04
12 changed files with 303 additions and 43 deletions

View File

@ -1,4 +1,5 @@
usage: shaker [-h] [--config-dir DIR] [--config-file PATH] [--debug]
usage: shaker [-h] [--agent-loss-timeout AGENT_LOSS_TIMEOUT]
[--config-dir DIR] [--config-file PATH] [--debug]
[--external-net EXTERNAL_NET] [--flavor-name FLAVOR_NAME]
[--image-name IMAGE_NAME] [--log-config-append PATH]
[--log-date-format DATE_FORMAT] [--log-dir LOG_DIR]
@ -16,6 +17,8 @@ usage: shaker [-h] [--config-dir DIR] [--config-file PATH] [--debug]
optional arguments:
-h, --help show this help message and exit
--agent-loss-timeout AGENT_LOSS_TIMEOUT
Timeout to treat agent as lost
--config-dir DIR Path to a config directory to pull *.conf files from.
This file set is sorted, so as to provide a
predictable parse order if individual options are

View File

@ -67,7 +67,7 @@
#logging_exception_prefix = %(asctime)s.%(msecs)03d %(process)d TRACE %(name)s %(instance)s
# List of logger=LEVEL pairs. (list value)
#default_log_levels = amqp=WARN,amqplib=WARN,boto=WARN,qpid=WARN,sqlalchemy=WARN,suds=INFO,oslo.messaging=INFO,iso8601=WARN,requests.packages.urllib3.connectionpool=WARN,urllib3.connectionpool=WARN,websocket=WARN,keystonemiddleware=WARN,routes.middleware=WARN,stevedore=WARN
#default_log_levels = amqp=WARN,amqplib=WARN,boto=WARN,qpid=WARN,sqlalchemy=WARN,suds=INFO,oslo.messaging=INFO,iso8601=WARN,requests.packages.urllib3.connectionpool=WARN,urllib3.connectionpool=WARN,websocket=WARN,requests.packages.urllib3.util.retry=WARN,urllib3.util.retry=WARN,keystonemiddleware=WARN,routes.middleware=WARN,stevedore=WARN
# Enables or disables publication of error events. (boolean value)
#publish_errors = false
@ -129,6 +129,9 @@
# value)
#output = <None>
# Timeout to treat agent as lost (integer value)
#agent_loss_timeout = 60
# Report template in Jinja format (string value)
#report_template = shaker/resources/report_template.jinja2

View File

@ -100,9 +100,9 @@ class TrafficAggregator(base.BaseAggregator):
def agent_summary(self, agent_data):
# convert bps to Mbps
for idx, item_meta in enumerate(agent_data['meta']):
for idx, item_meta in enumerate(agent_data.get('meta', [])):
if item_meta[1] == 'bps':
for row in agent_data['samples']:
for row in agent_data.get('samples'):
if row[idx]:
row[idx] = float(row[idx]) / 1024 / 1024
item_meta[1] = 'Mbps'
@ -111,8 +111,8 @@ class TrafficAggregator(base.BaseAggregator):
agent_data['stats'] = dict()
agent_data['chart'] = []
for idx, item_meta in enumerate(agent_data['meta']):
column = [row[idx] for row in agent_data['samples']]
for idx, item_meta in enumerate(agent_data.get('meta', [])):
column = [row[idx] for row in agent_data.get('samples')]
item_title = item_meta[0]
if item_title != 'time':
@ -125,4 +125,5 @@ class TrafficAggregator(base.BaseAggregator):
agent_data['chart'].append([item_title] + column)
# drop stdout
del agent_data['stdout']
if 'stdout' in agent_data:
del agent_data['stdout']

View File

@ -79,6 +79,9 @@ SERVER_OPTS = [
default=utils.env('SHAKER_OUTPUT'),
help='File for output in JSON format, '
'defaults to env[SHAKER_OUTPUT].'),
cfg.IntOpt('agent-loss-timeout',
default=60,
help='Timeout to treat agent as lost in seconds')
]
REPORT_OPTS = [

View File

@ -46,6 +46,13 @@ class BaseExecutor(object):
self.test_definition = test_definition
self.agent = agent
def get_expected_duration(self):
"""Get the expected duration of executor's run
The value is used by Quorum to calculate expected time of reply.
:return: time in seconds
"""
return 0
def get_command(self):
return None
@ -56,3 +63,7 @@ class BaseExecutor(object):
stderr=message.get('stderr'),
command=self.get_command(),
agent=self.agent)
def process_failure(self):
return dict(command=self.get_command(),
agent=self.agent)

View File

@ -19,6 +19,9 @@ from shaker.engine.executors import base
class IperfExecutor(base.BaseExecutor):
def get_expected_duration(self):
return self.test_definition.get('time') or 60
def get_command(self):
cmd = base.CommandLine('sudo nice -n -20 iperf')
cmd.add('--client', self.agent['slave']['ip'])
@ -31,7 +34,7 @@ class IperfExecutor(base.BaseExecutor):
cmd.add('--udp')
if self.test_definition.get('bandwidth'):
cmd.add('--bandwidth', self.test_definition.get('bandwidth'))
cmd.add('--time', self.test_definition.get('time') or 60)
cmd.add('--time', self.get_expected_duration())
cmd.add('--parallel', self.test_definition.get('threads') or 1)
if self.test_definition.get('csv'):
cmd.add('--reportstyle', 'C')

View File

@ -19,19 +19,25 @@ from shaker.engine.executors import base
class NetperfExecutor(base.BaseExecutor):
def get_expected_duration(self):
return self.test_definition.get('time') or 60
def get_command(self):
cmd = base.CommandLine('netperf')
cmd.add('-H', self.agent['slave']['ip'])
cmd.add('-l', self.test_definition.get('time') or 60)
cmd.add('-l', self.get_expected_duration())
cmd.add('-t', self.test_definition.get('method') or 'TCP_STREAM')
return cmd.make()
class NetperfWrapperExecutor(base.BaseExecutor):
def get_expected_duration(self):
return self.test_definition.get('time') or 60
def get_command(self):
cmd = base.CommandLine('netperf-wrapper')
cmd.add('-H', self.agent['slave']['ip'])
cmd.add('-l', self.test_definition.get('time') or 60)
cmd.add('-l', self.get_expected_duration())
cmd.add('-s', self.test_definition.get('interval') or 1)
cmd.add('-f', 'csv')
cmd.add(self.test_definition.get('method') or 'tcp_download')

View File

@ -35,9 +35,10 @@ LOG = logging.getLogger(__name__)
class Quorum(object):
def __init__(self, message_queue, polling_interval):
def __init__(self, message_queue, polling_interval, agent_loss_timeout):
self.message_queue = message_queue
self.polling_interval = polling_interval
self.agent_loss_timeout = agent_loss_timeout
def wait_join(self, agent_ids):
agent_ids = set(agent_ids)
@ -62,43 +63,61 @@ class Quorum(object):
break
def run_test_case(self, test_case):
working_agents = set()
replied_agents = set()
current = set(test_case.keys())
LOG.debug('Running test case on agents: %s', current)
working = set()
replied = set()
result = {}
start_at = int(time.time()) + self.polling_interval * 2
start_at = time.time() + self.polling_interval * 2
lives = dict((agent_id, start_at) for agent_id in current)
for message, reply_handler in self.message_queue:
agent_id = message.get('agent_id')
operation = message.get('operation')
now = time.time()
lives[agent_id] = (now + self.polling_interval * 2 +
self.agent_loss_timeout)
reply = {'operation': 'none'}
if agent_id not in test_case:
reply_handler(reply)
continue
if agent_id in current:
# message from a known agent
test = test_case[agent_id]
# message from a known agent
test = test_case[agent_id]
if operation == 'poll':
reply = {
'operation': 'execute',
'start_at': start_at,
'command': test.get_command(),
}
working_agents.add(agent_id)
elif operation == 'reply':
replied_agents.add(agent_id)
result[agent_id] = test.process_reply(message)
if operation == 'poll':
reply = {
'operation': 'execute',
'start_at': start_at,
'command': test.get_command(),
}
working.add(agent_id)
lives[agent_id] += test.get_expected_duration()
LOG.debug('Working agents: %s', working)
elif operation == 'reply':
replied.add(agent_id)
result[agent_id] = test.process_reply(message)
result[agent_id].update(dict(status='ok', time=now))
LOG.debug('Replied agents: %s', replied)
reply_handler(reply)
LOG.debug('Working agents: %s', working_agents)
LOG.debug('Replied agents: %s', replied_agents)
lost = set(a for a, t in lives.items() if t < now)
if lost:
LOG.debug('Lost agents: %s', lost)
if replied_agents >= set(test_case.keys()):
LOG.info('Received all replies for test case: %s', test_case)
if replied | lost >= current:
# update result with info about lost agents
for agent_id in lost:
if agent_id not in replied and agent_id in current:
result[agent_id] = (
test_case[agent_id].process_failure())
result[agent_id].update(dict(status='lost', time=now))
LOG.info('Received replies from all alive agents for '
'test case: %s', test_case)
break
return result
@ -208,7 +227,8 @@ def main():
else:
message_queue = messaging.MessageQueue(cfg.CONF.server_endpoint)
quorum = Quorum(message_queue, cfg.CONF.polling_interval)
quorum = Quorum(message_queue, cfg.CONF.polling_interval,
cfg.CONF.agent_loss_timeout)
quorum.wait_join(set(agents.keys()))
result = execute(quorum, scenario['execution'], agents)

View File

@ -28,12 +28,11 @@ class Shaker(object):
shaker = Shaker('127.0.0.1:5999', ['the-agent'])
res = shaker.run_program('the-agent', 'ls -al')
"""
def __init__(self, server_endpoint, agent_ids, polling_interval=1):
self.server_endpoint = server_endpoint
self.polling_interval = polling_interval
message_queue = messaging.MessageQueue(self.server_endpoint)
self.quorum = server.Quorum(message_queue, self.polling_interval)
def __init__(self, server_endpoint, agent_ids, polling_interval=1,
agent_loss_timeout=60):
message_queue = messaging.MessageQueue(server_endpoint)
self.quorum = server.Quorum(message_queue, polling_interval,
agent_loss_timeout)
self.quorum.wait_join(agent_ids)
def _run(self, agent_id, item):

View File

@ -514,6 +514,8 @@
<h4>Agent {{ result_per_agent.agent.id }}
({{ result_per_agent.agent.ip }}, {{ result_per_agent.agent.node }})</h4>
<h5>Status: {{ result_per_agent.status }}</h5>
{% if result_per_agent.samples %}
{% if result_per_agent.stats %}
<div class="row">

207
tests/test_quorum.py Normal file
View File

@ -0,0 +1,207 @@
# Copyright (c) 2015 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import functools
import mock
import testtools
from shaker.engine.executors import base as base_executor
from shaker.engine import server
STEP = 10 # polling interval
LOSS_TIMEOUT = 60
make_quorum = functools.partial(server.Quorum, polling_interval=STEP,
agent_loss_timeout=LOSS_TIMEOUT)
class DummyExecutor(base_executor.BaseExecutor):
def __init__(self, duration=STEP):
super(DummyExecutor, self).__init__({}, None)
self.duration = duration
def get_expected_duration(self):
return self.duration
def process_reply(self, message):
return super(DummyExecutor, self).process_reply(message)
def get_command(self):
return 'RUN'
class TestQuorum(testtools.TestCase):
def setUp(self):
self.mock_time = mock.Mock()
self._mock_patch = mock.patch('time.time', self.mock_time)
self._mock_patch.start()
return super(TestQuorum, self).setUp()
def tearDown(self):
self._mock_patch.stop()
return super(TestQuorum, self).tearDown()
def _reply(self, expected):
def reply_handler(reply_message):
self.assertEqual(expected, reply_message)
return reply_handler
def _message_queue_gen(self, event_stream):
for event in event_stream:
self.mock_time.return_value = event['time']
yield (event['msg'], self._reply(event['reply']))
self.fail('Unreachable state signalling that event loop hangs')
def test_poll_reply(self):
self.mock_time.return_value = 0
event_stream = [
dict(msg=dict(operation='poll', agent_id='alpha'),
reply=dict(operation='execute', command='RUN',
start_at=STEP * 2),
time=1),
dict(msg=dict(operation='reply', agent_id='alpha'),
reply=dict(operation='none'),
time=20),
]
quorum = make_quorum(self._message_queue_gen(event_stream))
test_case = {
'alpha': DummyExecutor()
}
result = quorum.run_test_case(test_case)
self.assertEqual(result.keys(), test_case.keys())
def test_poll_reply_unknown_agent_ignored(self):
self.mock_time.return_value = 0
event_stream = [
dict(msg=dict(operation='poll', agent_id='alpha'),
reply=dict(operation='execute', command='RUN',
start_at=STEP * 2),
time=1),
dict(msg=dict(operation='reply', agent_id='beta'),
reply=dict(operation='none'),
time=20),
dict(msg=dict(operation='reply', agent_id='alpha'),
reply=dict(operation='none'),
time=20),
]
quorum = make_quorum(self._message_queue_gen(event_stream))
test_case = {
'alpha': DummyExecutor()
}
result = quorum.run_test_case(test_case)
self.assertEqual(result.keys(), test_case.keys())
def test_lost_agent(self):
self.mock_time.return_value = 0
event_stream = [
dict(msg=dict(operation='poll', agent_id='alpha'),
reply=dict(operation='execute', command='RUN',
start_at=STEP * 2),
time=1),
dict(msg=dict(operation='reply', agent_id='heartbeat'),
reply=dict(operation='none'),
time=STEP * 10),
]
quorum = make_quorum(self._message_queue_gen(event_stream))
test_case = {
'alpha': DummyExecutor()
}
result = quorum.run_test_case(test_case)
self.assertEqual(result.keys(), test_case.keys())
self.assertEqual('lost', result['alpha']['status'])
def test_agent_loss_timeout(self):
"""Tests that agent is not marked as lost during loss-timeout."""
self.mock_time.return_value = 0
event_stream = [
dict(msg=dict(operation='poll', agent_id='alpha'),
reply=dict(operation='execute', command='RUN',
start_at=STEP * 2),
time=1),
dict(msg=dict(operation='reply', agent_id='heartbeat'),
reply=dict(operation='none'),
time=LOSS_TIMEOUT),
dict(msg=dict(operation='reply', agent_id='alpha'),
reply=dict(operation='none'),
time=LOSS_TIMEOUT),
]
quorum = make_quorum(self._message_queue_gen(event_stream))
test_case = {
'alpha': DummyExecutor()
}
result = quorum.run_test_case(test_case)
self.assertEqual(result.keys(), test_case.keys())
self.assertEqual('ok', result['alpha']['status'])
def test_good_and_lost(self):
self.mock_time.return_value = 0
event_stream = [
dict(msg=dict(operation='poll', agent_id='alpha'),
reply=dict(operation='execute', command='RUN',
start_at=STEP * 2),
time=1),
dict(msg=dict(operation='poll', agent_id='beta'),
reply=dict(operation='execute', command='RUN',
start_at=STEP * 2),
time=2),
dict(msg=dict(operation='reply', agent_id='beta'),
reply=dict(operation='none'),
time=20),
dict(msg=dict(operation='reply', agent_id='heartbeat'),
reply=dict(operation='none'),
time=STEP * 10),
]
quorum = make_quorum(self._message_queue_gen(event_stream))
test_case = {
'alpha': DummyExecutor(),
'beta': DummyExecutor(),
}
result = quorum.run_test_case(test_case)
self.assertEqual(result.keys(), test_case.keys())
self.assertEqual('lost', result['alpha']['status'])
self.assertEqual('ok', result['beta']['status'])
def test_wait_agent_running_long_test(self):
self.mock_time.return_value = 0
event_stream = [
dict(msg=dict(operation='poll', agent_id='alpha'),
reply=dict(operation='execute', command='RUN',
start_at=STEP * 2),
time=1),
dict(msg=dict(operation='reply', agent_id='heartbeat'),
reply=dict(operation='none'),
time=STEP * 4),
dict(msg=dict(operation='reply', agent_id='alpha'),
reply=dict(operation='none'),
time=STEP * 9),
]
quorum = make_quorum(self._message_queue_gen(event_stream))
test_case = {
'alpha': DummyExecutor(duration=STEP * 9)
}
result = quorum.run_test_case(test_case)
self.assertEqual(result.keys(), test_case.keys())
self.assertEqual('ok', result['alpha']['status'])

View File

@ -13,11 +13,9 @@ setenv = VIRTUAL_ENV={envdir}
deps = -r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements.txt
commands = python setup.py testr --testr-args='{posargs}'
distribute = false
[testenv:pep8]
commands = flake8
distribute = false
[testenv:venv]
commands = {posargs}
@ -31,7 +29,11 @@ commands = bash -c "find {toxinidir} -type f -not -wholename \*.tox/\* -and \( -
commands = python setup.py testr --coverage --testr-args='{posargs}'
[testenv:genconfig]
# When shaker is setup in develop mode it results in 2 packages: shaker and pyshaker
# The workaround is to setup it in production mode
usedevelop = False
commands =
python setup.py install
oslo-config-generator --config-file=config-generator.conf
python tools/cli_auto_doc.py doc/source/tools