Revert "Detect lost agents"

There are concerns related to the timeout calculations.

This reverts commit e686426b22.

Change-Id: Ie0eba4a02847626e23903341460fcd0d49bab216
This commit is contained in:
Ilya Shakhat
2015-04-02 13:34:10 +00:00
parent e686426b22
commit b1901a42fd
5 changed files with 30 additions and 232 deletions

View File

@@ -46,20 +46,13 @@ class BaseExecutor(object):
self.test_definition = test_definition
self.agent = agent
def get_test_duration(self):
    """Return how long the test is expected to run.

    Quorum uses this duration to calculate the expected time of reply.

    :return: time in seconds
    """
    configured = self.test_definition.get('time')
    # a missing or falsy value falls back to 60
    return configured if configured else 60
def get_command(self):
    """Return the command to run; this base version has none.

    :return: always ``None``
    """
    return None
# NOTE(review): flattened diff hunk -- the lines up to `return res` and the
# trailing `return dict(...)` look like two alternative bodies (one removed,
# one added by the commit). Read literally, the second return is unreachable.
# Confirm against the repository before relying on either variant.
def process_reply(self, message):
# log the raw reply together with the test definition and the agent
LOG.debug('Test %s on agent %s finished with %s',
self.test_definition, self.agent, message)
# variant A: copy stdout/stderr/status/time from the message (missing keys
# become None), then attach the command and the agent
res = dict((k, message.get(k))
for k in ['stdout', 'stderr', 'status', 'time'])
res.update(dict(command=self.get_command(), agent=self.agent))
return res
# variant B: same shape as variant A but without the 'status' and 'time' keys
return dict(stdout=message.get('stdout'),
stderr=message.get('stderr'),
command=self.get_command(),
agent=self.agent)

View File

@@ -31,7 +31,7 @@ class IperfExecutor(base.BaseExecutor):
cmd.add('--udp')
if self.test_definition.get('bandwidth'):
cmd.add('--bandwidth', self.test_definition.get('bandwidth'))
cmd.add('--time', self.get_test_duration())
cmd.add('--time', self.test_definition.get('time') or 60)
cmd.add('--parallel', self.test_definition.get('threads') or 1)
if self.test_definition.get('csv'):
cmd.add('--reportstyle', 'C')

View File

@@ -22,7 +22,7 @@ class NetperfExecutor(base.BaseExecutor):
def get_command(self):
    """Build the ``netperf`` command line for this test.

    The target host is the IP of the agent's slave. The test length comes
    from the ``time`` key of the test definition (60 when unset or falsy)
    and the test type from ``method`` (``TCP_STREAM`` when unset or falsy).

    :return: the result of ``CommandLine.make()`` for the assembled options
    """
    cmd = base.CommandLine('netperf')
    cmd.add('-H', self.agent['slave']['ip'])
    # Pass the length option exactly once; the flag used to be added twice
    # with the same value. The inline expression keeps this method
    # independent of any duration helper on the base class.
    cmd.add('-l', self.test_definition.get('time') or 60)
    cmd.add('-t', self.test_definition.get('method') or 'TCP_STREAM')
    return cmd.make()
@@ -31,7 +31,7 @@ class NetperfWrapperExecutor(base.BaseExecutor):
def get_command(self):
cmd = base.CommandLine('netperf-wrapper')
cmd.add('-H', self.agent['slave']['ip'])
cmd.add('-l', self.get_test_duration())
cmd.add('-l', self.test_definition.get('time') or 60)
cmd.add('-s', self.test_definition.get('interval') or 1)
cmd.add('-f', 'csv')
cmd.add(self.test_definition.get('method') or 'tcp_download')

View File

@@ -62,61 +62,43 @@ class Quorum(object):
break
# NOTE(review): this span is a flattened diff hunk of run_test_case. Removed
# and added lines appear interleaved (e.g. both `working`/`replied` and
# `working_agents`/`replied_agents`, two `start_at` assignments, two exit
# conditions) and indentation is lost, so it does not read as one executable
# body. The comments below describe each variant; confirm against the
# repository before editing.
def run_test_case(self, test_case):
# test_case is indexed by agent id; each value exposes get_command() and
# process_reply() (and get_test_duration() in the lost-agent variant)
current = set(test_case.keys())
lives = {} # agent-id -> live until (timestamp)
LOG.debug('Running test case: %s on agents: %s', test_case, current)
working = set()
replied = set()
working_agents = set()
replied_agents = set()
result = {}
# agents are told to start two polling intervals from now
start_at = time.time() + self.polling_interval * 2
start_at = int(time.time()) + self.polling_interval * 2
# every queue item pairs an incoming message with a callable used to answer it
for message, reply_handler in self.message_queue:
agent_id = message.get('agent_id')
operation = message.get('operation')
now = time.time()
# lost-agent variant: any message moves the sender's deadline to two
# polling intervals after the current time
lives[agent_id] = now + self.polling_interval * 2
# default answer, sent unless the sender is polling for work
reply = {'operation': 'none'}
if agent_id in current:
# message from a known agent
test = test_case[agent_id]
# other variant: agents outside the test case get the default answer
# and are otherwise ignored
if agent_id not in test_case:
reply_handler(reply)
continue
if operation == 'poll':
reply = {
'operation': 'execute',
'start_at': start_at,
'command': test.get_command(),
}
working.add(agent_id)
# a polled agent's deadline is extended by the test duration
if test.get_test_duration():
lives[agent_id] += test.get_test_duration()
LOG.debug('Working agents: %s', working)
elif operation == 'reply':
replied.add(agent_id)
result[agent_id] = test.process_reply(message)
# mark the processed reply as successful and stamp it with the current time
result[agent_id].update(dict(status='ok', time=now))
LOG.debug('Replied agents: %s', replied)
# message from a known agent
test = test_case[agent_id]
if operation == 'poll':
reply = {
'operation': 'execute',
'start_at': start_at,
'command': test.get_command(),
}
working_agents.add(agent_id)
elif operation == 'reply':
replied_agents.add(agent_id)
result[agent_id] = test.process_reply(message)
reply_handler(reply)
# agents whose deadline is earlier than the current message's timestamp
lost = set(a for a, t in lives.items() if t < now)
if lost:
LOG.debug('Lost agents: %s', lost)
LOG.debug('Working agents: %s', working_agents)
LOG.debug('Replied agents: %s', replied_agents)
# lost-agent variant: stop once every agent of the test case has either
# replied or been declared lost
if replied | lost >= current:
# update result with info about lost agents
for agent_id in lost:
if agent_id not in replied and agent_id in current:
result[agent_id] = test_case[agent_id].process_reply(
dict(status='lost', time=lives[agent_id]))
LOG.info('Received replies from all alive agents for '
'test case: %s', test_case)
# other variant: stop only once every agent of the test case has replied
if replied_agents >= set(test_case.keys()):
LOG.info('Received all replies for test case: %s', test_case)
break
return result

View File

@@ -1,177 +0,0 @@
# Copyright (c) 2015 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
import testtools
from shaker.engine.executors import base as base_executor
from shaker.engine import server
STEP = 10 # polling interval handed to server.Quorum by the tests in this file
class TestExecutor(base_executor.BaseExecutor):
    """Executor stub with a constant command and a configurable duration."""

    def __init__(self, duration=STEP):
        super(TestExecutor, self).__init__({}, None)
        self.duration = duration

    def get_test_duration(self):
        """Return the duration supplied at construction time."""
        return self.duration

    def process_reply(self, message):
        """Hand the reply to the base executor and return its result as is."""
        base_result = super(TestExecutor, self).process_reply(message)
        return base_result

    def get_command(self):
        """Return the fixed command string the tests expect in replies."""
        return 'RUN'
class TestQuorum(testtools.TestCase):
    """Drive ``server.Quorum.run_test_case`` with scripted message streams.

    ``time.time`` is replaced by a mock so that each scripted event can set
    the clock value the quorum observes while handling that event.
    """

    def setUp(self):
        self.mock_time = mock.Mock()
        self._mock_patch = mock.patch('time.time', self.mock_time)
        self._mock_patch.start()
        # A registered cleanup also runs when a later setUp step fails,
        # which tearDown does not, so the patch can never leak.
        self.addCleanup(self._mock_patch.stop)
        return super(TestQuorum, self).setUp()

    def _reply(self, expected):
        """Return a reply handler asserting the quorum answers ``expected``."""
        def reply_handler(reply_message):
            self.assertEqual(expected, reply_message)
        return reply_handler

    def _message_queue_gen(self, event_stream):
        """Yield ``(message, reply_handler)`` pairs for the scripted events.

        The mocked clock is moved to the event's ``time`` before the event is
        handed out. Running past the last event fails the test, because the
        quorum should have stopped consuming messages by then.
        """
        for event in event_stream:
            self.mock_time.return_value = event['time']
            yield (event['msg'], self._reply(event['reply']))
        self.fail('Unreachable state signalling that event loop hangs')

    @staticmethod
    def _poll_event(agent_id, at):
        """Script a poll from ``agent_id`` that must be answered with 'execute'."""
        return dict(msg=dict(operation='poll', agent_id=agent_id),
                    reply=dict(operation='execute', command='RUN',
                               start_at=STEP * 2),
                    time=at)

    @staticmethod
    def _reply_event(agent_id, at):
        """Script a reply from ``agent_id`` that must be answered with 'none'."""
        return dict(msg=dict(operation='reply', agent_id=agent_id),
                    reply=dict(operation='none'),
                    time=at)

    def _run(self, event_stream, test_case):
        """Run ``test_case`` over ``event_stream`` and return the result.

        Also checks that the result holds exactly the agents of the test
        case. The keys are compared as sets so the check does not depend on
        dictionary ordering.
        """
        self.mock_time.return_value = 0
        quorum = server.Quorum(self._message_queue_gen(event_stream), STEP)
        result = quorum.run_test_case(test_case)
        self.assertEqual(set(result), set(test_case))
        return result

    def test_poll_reply(self):
        event_stream = [
            self._poll_event('alpha', 1),
            self._reply_event('alpha', 20),
        ]
        self._run(event_stream, {'alpha': TestExecutor()})

    def test_poll_reply_unknown_agent_ignored(self):
        event_stream = [
            self._poll_event('alpha', 1),
            # 'beta' is not part of the test case
            self._reply_event('beta', 20),
            self._reply_event('alpha', 20),
        ]
        self._run(event_stream, {'alpha': TestExecutor()})

    def test_lost_agent(self):
        event_stream = [
            self._poll_event('alpha', 1),
            # a message from another sender arrives long after the poll
            self._reply_event('heartbeat', STEP * 10),
        ]
        result = self._run(event_stream, {'alpha': TestExecutor()})
        self.assertEqual('lost', result['alpha']['status'])

    def test_good_and_lost(self):
        event_stream = [
            self._poll_event('alpha', 1),
            self._poll_event('beta', 2),
            self._reply_event('beta', 20),
            self._reply_event('heartbeat', STEP * 10),
        ]
        test_case = {
            'alpha': TestExecutor(),
            'beta': TestExecutor(),
        }
        result = self._run(event_stream, test_case)
        self.assertEqual('lost', result['alpha']['status'])
        self.assertEqual('ok', result['beta']['status'])

    def test_wait_slow_agent(self):
        event_stream = [
            self._poll_event('alpha', 1),
            # with a duration of STEP * 9 this must not mark alpha as lost
            self._reply_event('heartbeat', STEP * 4),
            self._reply_event('alpha', STEP * 9),
        ]
        test_case = {
            'alpha': TestExecutor(duration=STEP * 9)
        }
        result = self._run(event_stream, test_case)
        self.assertEqual('ok', result['alpha']['status'])