Refactor Quorum to dedup common code in wait_join and run

Change-Id: Iaffc4b7ef500d3e17d5ef0daed07854d29436537
This commit is contained in:
Ilya Shakhat 2015-04-07 16:09:24 +03:00
parent ff11cd9e04
commit 1a5108fb3c
7 changed files with 190 additions and 88 deletions

View File

@ -1,11 +1,12 @@
usage: shaker [-h] [--agent-loss-timeout AGENT_LOSS_TIMEOUT]
[--config-dir DIR] [--config-file PATH] [--debug]
[--external-net EXTERNAL_NET] [--flavor-name FLAVOR_NAME]
[--image-name IMAGE_NAME] [--log-config-append PATH]
[--log-date-format DATE_FORMAT] [--log-dir LOG_DIR]
[--log-file PATH] [--log-format FORMAT] [--nodebug]
[--nouse-syslog] [--nouse-syslog-rfc-format] [--noverbose]
[--os-auth-url <auth-url>] [--os-password <auth-password>]
usage: shaker [-h] [--agent-join-timeout AGENT_JOIN_TIMEOUT]
[--agent-loss-timeout AGENT_LOSS_TIMEOUT] [--config-dir DIR]
[--config-file PATH] [--debug] [--external-net EXTERNAL_NET]
[--flavor-name FLAVOR_NAME] [--image-name IMAGE_NAME]
[--log-config-append PATH] [--log-date-format DATE_FORMAT]
[--log-dir LOG_DIR] [--log-file PATH] [--log-format FORMAT]
[--nodebug] [--nouse-syslog] [--nouse-syslog-rfc-format]
[--noverbose] [--os-auth-url <auth-url>]
[--os-password <auth-password>]
[--os-region-name <auth-region-name>]
[--os-tenant-name <auth-tenant-name>]
[--os-username <auth-username>] [--output OUTPUT]
@ -17,8 +18,12 @@ usage: shaker [-h] [--agent-loss-timeout AGENT_LOSS_TIMEOUT]
optional arguments:
-h, --help show this help message and exit
--agent-join-timeout AGENT_JOIN_TIMEOUT
How long to wait for agents to join in seconds (time
between stack deployment and start of scenario
execution).
--agent-loss-timeout AGENT_LOSS_TIMEOUT
Timeout to treat agent as lost
Timeout to treat agent as lost in seconds
--config-dir DIR Path to a config directory to pull *.conf files from.
This file set is sorted, so as to provide a
predictable parse order if individual options are

View File

@ -129,9 +129,13 @@
# value)
#output = <None>
# Timeout to treat agent as lost (integer value)
# Timeout to treat agent as lost in seconds (integer value)
#agent_loss_timeout = 60
# How long to wait for agents to join in seconds (time between stack deployment
# and start of scenario execution). (integer value)
#agent_join_timeout = 600
# Report template in Jinja format (string value)
#report_template = shaker/resources/report_template.jinja2

View File

@ -110,6 +110,7 @@ def main():
elif task['operation'] == 'configure':
if 'polling_interval' in task:
polling_interval = task.get('polling_interval')
send_reply(socket, agent_id, {})
time.sleep(polling_interval)

View File

@ -81,7 +81,12 @@ SERVER_OPTS = [
'defaults to env[SHAKER_OUTPUT].'),
cfg.IntOpt('agent-loss-timeout',
default=60,
help='Timeout to treat agent as lost in seconds')
help='Timeout to treat agent as lost in seconds'),
cfg.IntOpt('agent-join-timeout',
default=600,
help='How long to wait for agents to join in seconds (time '
'between stack deployment and start of scenario '
'execution).')
]
REPORT_OPTS = [

View File

@ -34,94 +34,138 @@ from shaker.engine import utils
LOG = logging.getLogger(__name__)
class BaseOperation(object):
def get_agent_join_timeout(self):
return 0
def get_active_agent_ids(self):
pass
def get_reply(self, agent_id, start_at):
return {}
def process_reply(self, agent_id, message):
return {'status': 'ok'}
def process_failure(self, agent_id):
return {'status': 'lost'}
class JoinOperation(BaseOperation):
def __init__(self, agent_ids, polling_interval, agent_join_timeout):
super(JoinOperation, self).__init__()
self.agent_ids = agent_ids
self.polling_interval = polling_interval
self.agent_join_timeout = agent_join_timeout
def get_agent_join_timeout(self):
return self.agent_join_timeout
def get_active_agent_ids(self):
return set(self.agent_ids)
def get_reply(self, agent_id, start_at):
return dict(operation='configure',
polling_interval=self.polling_interval,
expected_duration=0)
class ExecuteOperation(BaseOperation):
def __init__(self, executors):
super(ExecuteOperation, self).__init__()
self.executors = executors
def get_active_agent_ids(self):
return set(self.executors.keys())
def get_reply(self, agent_id, start_at):
reply = dict(operation='execute',
start_at=start_at,
command=self.executors[agent_id].get_command(),
expected_duration=(self.executors[agent_id].
get_expected_duration()))
return reply
def process_reply(self, agent_id, message):
r = super(ExecuteOperation, self).process_reply(agent_id, message)
r.update(self.executors[agent_id].process_reply(message))
return r
def process_failure(self, agent_id):
r = super(ExecuteOperation, self).process_failure(agent_id)
r.update(self.executors[agent_id].process_failure())
return r
class Quorum(object):
def __init__(self, message_queue, polling_interval, agent_loss_timeout):
def __init__(self, message_queue, polling_interval, agent_loss_timeout,
agent_join_timeout):
self.message_queue = message_queue
self.polling_interval = polling_interval
self.agent_loss_timeout = agent_loss_timeout
self.agent_join_timeout = agent_join_timeout
def wait_join(self, agent_ids):
agent_ids = set(agent_ids)
LOG.debug('Waiting for quorum of agents: %s', agent_ids)
alive_agents = set()
for message, reply_handler in self.message_queue:
msg_agent_id = message.get('agent_id')
if msg_agent_id not in agent_ids:
reply_handler(dict(operation='none'))
continue
alive_agents.add(msg_agent_id)
reply_handler(dict(operation='configure',
polling_interval=self.polling_interval))
LOG.debug('Alive agents: %s', alive_agents)
if alive_agents == agent_ids:
LOG.info('All expected agents are alive')
break
def run_test_case(self, test_case):
current = set(test_case.keys())
LOG.debug('Running test case on agents: %s', current)
def _run(self, operation):
current = operation.get_active_agent_ids()
LOG.debug('Executing operation %s on agents: %s', operation, current)
working = set()
replied = set()
result = {}
start_at = time.time() + self.polling_interval * 2
lives = dict((agent_id, start_at) for agent_id in current)
lives = dict((agent_id, start_at + operation.get_agent_join_timeout())
for agent_id in current)
for message, reply_handler in self.message_queue:
agent_id = message.get('agent_id')
operation = message.get('operation')
now = time.time()
lives[agent_id] = (now + self.polling_interval * 2 +
self.agent_loss_timeout)
op = message.get('operation')
reply = {'operation': 'none'}
now = time.time()
if agent_id in current:
# message from a known agent
test = test_case[agent_id]
if agent_id in (current - replied):
# message from a known not yet worked agent
lives[agent_id] = (now + self.polling_interval * 2 +
self.agent_loss_timeout)
if operation == 'poll':
reply = {
'operation': 'execute',
'start_at': start_at,
'command': test.get_command(),
}
if op == 'poll':
reply = operation.get_reply(agent_id, start_at)
lives[agent_id] += reply.get('expected_duration')
working.add(agent_id)
lives[agent_id] += test.get_expected_duration()
LOG.debug('Working agents: %s', working)
elif operation == 'reply':
replied.add(agent_id)
result[agent_id] = test.process_reply(message)
result[agent_id].update(dict(status='ok', time=now))
LOG.debug('Replied agents: %s', replied)
elif op == 'reply':
if agent_id in working:
result[agent_id] = operation.process_reply(
agent_id, message)
replied.add(agent_id)
LOG.debug('Replied agents: %s', replied)
reply_handler(reply)
lost = set(a for a, t in lives.items() if t < now)
lost = set(a for a, t in lives.items() if t < now) - replied
if lost:
LOG.debug('Lost agents: %s', lost)
if replied | lost >= current:
# update result with info about lost agents
for agent_id in lost:
if agent_id not in replied and agent_id in current:
result[agent_id] = (
test_case[agent_id].process_failure())
result[agent_id].update(dict(status='lost', time=now))
if lost:
LOG.warning('Lost agents: %s', lost)
# update result with info about lost agents
for agent_id in lost:
result[agent_id] = operation.process_failure(agent_id)
LOG.info('Received replies from all alive agents for '
'test case: %s', test_case)
LOG.info('Finished processing operation: %s', operation)
break
return result
def join(self, agent_ids):
LOG.debug('Waiting for quorum of agents: %s', agent_ids)
return self._run(JoinOperation(agent_ids, self.polling_interval,
self.agent_join_timeout))
def execute(self, executors):
return self._run(ExecuteOperation(executors))
def read_scenario():
scenario_raw = utils.read_file(cfg.CONF.scenario)
@ -176,8 +220,9 @@ def execute(quorum, execution, agents):
executors = dict((a['id'], executors_classes.get_executor(test, a))
for a in selected_agents)
test_case_result = quorum.run_test_case(executors)
values = test_case_result.values()
execution_result = quorum.execute(executors)
values = execution_result.values()
for v in values:
v['uuid'] = str(uuid.uuid4())
results_per_iteration.append({
@ -228,8 +273,9 @@ def main():
message_queue = messaging.MessageQueue(cfg.CONF.server_endpoint)
quorum = Quorum(message_queue, cfg.CONF.polling_interval,
cfg.CONF.agent_loss_timeout)
quorum.wait_join(set(agents.keys()))
cfg.CONF.agent_loss_timeout,
cfg.CONF.agent_join_timeout)
quorum.join(set(agents.keys()))
result = execute(quorum, scenario['execution'], agents)
LOG.debug('Result: %s', result)

View File

@ -33,7 +33,7 @@ class Shaker(object):
message_queue = messaging.MessageQueue(server_endpoint)
self.quorum = server.Quorum(message_queue, polling_interval,
agent_loss_timeout)
self.quorum.wait_join(agent_ids)
self.quorum.join(agent_ids)
def _run(self, agent_id, item):
agents = dict([(agent_id, dict(id=agent_id, mode='alone'))])

View File

@ -24,9 +24,11 @@ from shaker.engine import server
STEP = 10 # polling interval
LOSS_TIMEOUT = 60
JOIN_TIMEOUT = 600
make_quorum = functools.partial(server.Quorum, polling_interval=STEP,
agent_loss_timeout=LOSS_TIMEOUT)
agent_loss_timeout=LOSS_TIMEOUT,
agent_join_timeout=JOIN_TIMEOUT)
class DummyExecutor(base_executor.BaseExecutor):
@ -74,7 +76,7 @@ class TestQuorum(testtools.TestCase):
event_stream = [
dict(msg=dict(operation='poll', agent_id='alpha'),
reply=dict(operation='execute', command='RUN',
start_at=STEP * 2),
start_at=STEP * 2, expected_duration=STEP),
time=1),
dict(msg=dict(operation='reply', agent_id='alpha'),
reply=dict(operation='none'),
@ -85,7 +87,7 @@ class TestQuorum(testtools.TestCase):
test_case = {
'alpha': DummyExecutor()
}
result = quorum.run_test_case(test_case)
result = quorum.execute(test_case)
self.assertEqual(result.keys(), test_case.keys())
def test_poll_reply_unknown_agent_ignored(self):
@ -93,7 +95,7 @@ class TestQuorum(testtools.TestCase):
event_stream = [
dict(msg=dict(operation='poll', agent_id='alpha'),
reply=dict(operation='execute', command='RUN',
start_at=STEP * 2),
start_at=STEP * 2, expected_duration=STEP),
time=1),
dict(msg=dict(operation='reply', agent_id='beta'),
reply=dict(operation='none'),
@ -107,7 +109,7 @@ class TestQuorum(testtools.TestCase):
test_case = {
'alpha': DummyExecutor()
}
result = quorum.run_test_case(test_case)
result = quorum.execute(test_case)
self.assertEqual(result.keys(), test_case.keys())
def test_lost_agent(self):
@ -115,7 +117,7 @@ class TestQuorum(testtools.TestCase):
event_stream = [
dict(msg=dict(operation='poll', agent_id='alpha'),
reply=dict(operation='execute', command='RUN',
start_at=STEP * 2),
start_at=STEP * 2, expected_duration=STEP),
time=1),
dict(msg=dict(operation='reply', agent_id='heartbeat'),
reply=dict(operation='none'),
@ -126,7 +128,7 @@ class TestQuorum(testtools.TestCase):
test_case = {
'alpha': DummyExecutor()
}
result = quorum.run_test_case(test_case)
result = quorum.execute(test_case)
self.assertEqual(result.keys(), test_case.keys())
self.assertEqual('lost', result['alpha']['status'])
@ -136,7 +138,7 @@ class TestQuorum(testtools.TestCase):
event_stream = [
dict(msg=dict(operation='poll', agent_id='alpha'),
reply=dict(operation='execute', command='RUN',
start_at=STEP * 2),
start_at=STEP * 2, expected_duration=STEP),
time=1),
dict(msg=dict(operation='reply', agent_id='heartbeat'),
reply=dict(operation='none'),
@ -150,7 +152,7 @@ class TestQuorum(testtools.TestCase):
test_case = {
'alpha': DummyExecutor()
}
result = quorum.run_test_case(test_case)
result = quorum.execute(test_case)
self.assertEqual(result.keys(), test_case.keys())
self.assertEqual('ok', result['alpha']['status'])
@ -159,11 +161,11 @@ class TestQuorum(testtools.TestCase):
event_stream = [
dict(msg=dict(operation='poll', agent_id='alpha'),
reply=dict(operation='execute', command='RUN',
start_at=STEP * 2),
start_at=STEP * 2, expected_duration=STEP),
time=1),
dict(msg=dict(operation='poll', agent_id='beta'),
reply=dict(operation='execute', command='RUN',
start_at=STEP * 2),
start_at=STEP * 2, expected_duration=STEP),
time=2),
dict(msg=dict(operation='reply', agent_id='beta'),
reply=dict(operation='none'),
@ -178,17 +180,17 @@ class TestQuorum(testtools.TestCase):
'alpha': DummyExecutor(),
'beta': DummyExecutor(),
}
result = quorum.run_test_case(test_case)
result = quorum.execute(test_case)
self.assertEqual(result.keys(), test_case.keys())
self.assertEqual('lost', result['alpha']['status'])
self.assertEqual('ok', result['beta']['status'])
def test_wait_agent_running_long_test(self):
def test_wait_agentexecutening_long_test(self):
self.mock_time.return_value = 0
event_stream = [
dict(msg=dict(operation='poll', agent_id='alpha'),
reply=dict(operation='execute', command='RUN',
start_at=STEP * 2),
start_at=STEP * 2, expected_duration=STEP * 9),
time=1),
dict(msg=dict(operation='reply', agent_id='heartbeat'),
reply=dict(operation='none'),
@ -202,6 +204,45 @@ class TestQuorum(testtools.TestCase):
test_case = {
'alpha': DummyExecutor(duration=STEP * 9)
}
result = quorum.run_test_case(test_case)
result = quorum.execute(test_case)
self.assertEqual(result.keys(), test_case.keys())
self.assertEqual('ok', result['alpha']['status'])
def test_join_succeed(self):
self.mock_time.return_value = 0
event_stream = [
dict(msg=dict(operation='poll', agent_id='alpha'),
reply=dict(operation='configure', polling_interval=STEP,
expected_duration=0),
time=STEP),
dict(msg=dict(operation='reply', agent_id='alpha'),
reply=dict(operation='none'),
time=STEP * 2),
dict(msg=dict(operation='reply', agent_id='heartbeat'),
reply=dict(operation='none'),
time=STEP * 2),
]
quorum = make_quorum(self._message_queue_gen(event_stream))
result = quorum.join(['alpha'])
lost = [agent_id for agent_id, r in result.items()
if r['status'] == 'lost']
self.assertEqual([], lost)
def test_join_failed(self):
self.mock_time.return_value = 0
event_stream = [
dict(msg=dict(operation='poll', agent_id='alpha'),
reply=dict(operation='configure', polling_interval=STEP,
expected_duration=0),
time=STEP),
dict(msg=dict(operation='reply', agent_id='heartbeat'),
reply=dict(operation='none'),
time=JOIN_TIMEOUT + STEP * 2),
]
quorum = make_quorum(self._message_queue_gen(event_stream))
result = quorum.join(['alpha'])
lost = [agent_id for agent_id, r in result.items()
if r['status'] == 'lost']
self.assertEqual(['alpha'], lost)