Introduce executors that convert test definition into commands

This commit is contained in:
Ilya Shakhat 2015-02-11 18:36:01 +03:00
parent a8f56788f0
commit 4d8f7d1958
3 changed files with 114 additions and 45 deletions

View File

@ -0,0 +1,67 @@
# Copyright (c) 2015 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from shaker.openstack.common import log as logging
LOG = logging.getLogger(__name__)
class BaseExecutor(object):
    """Base class pairing a test definition with the agent that runs it.

    Subclasses override :meth:`get_command` to produce the command for the
    agent and :meth:`process_reply` to handle what the agent sends back.
    """

    def __init__(self, test_definition, agent):
        """Remember the test definition and the agent it is bound to.

        :param test_definition: description of the test to run
        :param agent: record of the agent the test is executed on
        """
        super(BaseExecutor, self).__init__()
        self.agent = agent
        self.test_definition = test_definition

    def get_command(self):
        """Return the command for the agent; the base class has none."""
        return None

    def process_reply(self, message):
        """Handle the agent's reply; the base class ignores it.

        :param message: reply message received from the agent
        """
class ShellExecutor(BaseExecutor):
    """Executor that uses the test definition's ``method`` as the command."""

    def get_command(self):
        """Return the ``method`` entry of the test definition unchanged."""
        definition = self.test_definition
        return definition['method']

    def process_reply(self, message):
        """Log the reply at debug level together with the test and agent.

        :param message: reply message received from the agent
        """
        log_args = (self.test_definition, self.agent, message)
        LOG.debug('Test %s on agent %s finished with %s', *log_args)
class NetperfExecutor(BaseExecutor):
    """Executor that builds a ``netperf-wrapper`` command line."""

    def get_command(self):
        """Return the netperf-wrapper command for this test.

        The target host is the private IP of the slave in the agent's
        brigade; the test name comes from the definition's ``method`` entry.
        """
        slave = self.agent['brigade']['slave']
        params = {
            'ip': slave['private_ip'],
            'method': self.test_definition['method'],
        }
        return 'netperf-wrapper -H %(ip)s -f stats %(method)s' % params

    def process_reply(self, message):
        """Log the reply at debug level together with the test and agent.

        :param message: reply message received from the agent
        """
        log_args = (self.test_definition, self.agent, message)
        LOG.debug('Test %s on agent %s finished with %s', *log_args)
# Registry of executor classes keyed by the test definition's 'class' value.
# The '_default' entry is the fallback get_executor() uses for names that are
# not registered here.
EXECUTORS = {
    '_default': ShellExecutor,
    'netperf': NetperfExecutor,
    'shell': ShellExecutor,
}
def get_executor(test_definition, agent):
    """Return the executor of the specified test on the specified agent.

    The executor class is looked up in ``EXECUTORS`` by the value of the
    definition's ``class`` entry. When that entry is absent or names an
    unregistered class, the ``_default`` executor is used instead.

    :param test_definition: mapping describing the test
    :param agent: record of the agent the test is executed on
    :returns: executor instance bound to the test definition and the agent
    """
    # .get() so that a definition without 'class' reaches the same fallback
    # as an unknown class name instead of raising KeyError
    executor_name = test_definition.get('class')
    executor_cls = EXECUTORS.get(executor_name, EXECUTORS['_default'])
    return executor_cls(test_definition, agent)

View File

@ -44,7 +44,7 @@ def wait_stack_completion(heat_client, stack_id):
if status not in ['IN_PROGRESS', '']: if status not in ['IN_PROGRESS', '']:
break break
time.sleep(1) time.sleep(5)
if status != 'COMPLETE': if status != 'COMPLETE':
raise Exception(status) raise Exception(status)

View File

@ -21,6 +21,7 @@ import zmq
from shaker.engine import config from shaker.engine import config
from shaker.engine import deploy from shaker.engine import deploy
from shaker.engine import executors as executors_classes
from shaker.engine import utils from shaker.engine import utils
from shaker.openstack.common import log as logging from shaker.openstack.common import log as logging
@ -34,10 +35,6 @@ class Quorum(object):
self.agents = agents self.agents = agents
self.agent_ids = set(a['id'] for a in agents) self.agent_ids = set(a['id'] for a in agents)
self.master_agent_ids = set(a['id'] for a in agents
if a['mode'] == 'master')
self.slave_agent_ids = set(a['id'] for a in agents
if a['mode'] == 'slave')
def wait_join(self): def wait_join(self):
alive_agents = set() alive_agents = set()
@ -54,35 +51,38 @@ class Quorum(object):
break break
def run_test_case(self, test_case): def run_test_case(self, test_case):
LOG.debug('Running test case: %s', test_case)
working_agents = set() working_agents = set()
replied_agents = set() replied_agents = set()
start_at = int(time.time()) + 60 # schedule tasks in a minute from now start_at = int(time.time()) + 30 # schedule tasks in a 30 sec from now
for message, reply_handler in self.message_queue: for message, reply_handler in self.message_queue:
agent_id = message.get('agent_id') agent_id = message.get('agent_id')
operation = message.get('operation') operation = message.get('operation')
reply = {'operation': 'none'} reply = {'operation': 'none'}
if (operation == 'poll') and (agent_id in self.master_agent_ids): if agent_id in test_case:
# message from a known agent
test = test_case[agent_id]
if operation == 'poll':
reply = { reply = {
'operation': 'execute', 'operation': 'execute',
'start_at': start_at, 'start_at': start_at,
'command': test_case['command'], # todo make abstract 'command': test.get_command(),
} }
working_agents.add(agent_id) working_agents.add(agent_id)
elif operation == 'reply': elif operation == 'reply':
# store data
replied_agents.add(agent_id) replied_agents.add(agent_id)
test.process_reply(message)
reply_handler(reply) reply_handler(reply)
LOG.debug('Working agents: %s', working_agents) LOG.debug('Working agents: %s', working_agents)
LOG.debug('Replied agents: %s', replied_agents) LOG.debug('Replied agents: %s', replied_agents)
if replied_agents >= self.master_agent_ids: if replied_agents >= set(test_case.keys()):
LOG.info('Received all replies for test case: %s', test_case) LOG.info('Received all replies for test case: %s', test_case)
break break
@ -128,26 +128,6 @@ class MessageQueue(object):
raise raise
def run(message_queue, agents, execution):
# sample data, will be picked from scenario
tests = [
# {'command': 'netperf-wrapper -H 172.18.76.77 tcp_bidirectional'},
{'command': 'ls -al'},
]
LOG.debug('Creating quorum of agents: %s', agents)
quorum = Quorum(message_queue, agents)
LOG.debug('Waiting for quorum of agents')
quorum.wait_join()
for test_case in tests:
LOG.debug('Running test case: %s', test_case)
quorum.run_test_case(test_case)
LOG.info('Done')
def read_scenario(): def read_scenario():
scenario_raw = utils.read_file(cfg.CONF.scenario) scenario_raw = utils.read_file(cfg.CONF.scenario)
scenario = yaml.safe_load(scenario_raw) scenario = yaml.safe_load(scenario_raw)
@ -160,24 +140,46 @@ def convert_instance_name_to_agent_id(instance_name):
def execute(execution, brigades): def execute(execution, brigades):
# define agents agents = {}
agents = []
for brigade in brigades: for brigade in brigades:
if brigade['master'].get('instance_name'): if brigade['master'].get('instance_name'):
agent_id = convert_instance_name_to_agent_id( agent_id = convert_instance_name_to_agent_id(
brigade['master'].get('instance_name')) brigade['master'].get('instance_name'))
agents.append(dict(mode='master', id=agent_id)) agents[agent_id] = dict(
mode='master', id=agent_id, brigade=brigade)
if brigade['slave'].get('instance_name'): if brigade['slave'].get('instance_name'):
agent_id = convert_instance_name_to_agent_id( agent_id = convert_instance_name_to_agent_id(
brigade['slave'].get('instance_name')) brigade['slave'].get('instance_name'))
agents.append(dict(mode='slave', id=agent_id)) agents[agent_id] = dict(
mode='slave', id=agent_id, brigade=brigade)
if not agents: if not agents:
LOG.warning('No active agents. Is the stack deployed?') LOG.warning('No master instances found. Is the stack deployed?')
return return
message_queue = MessageQueue(cfg.CONF.server_endpoint) message_queue = MessageQueue(cfg.CONF.server_endpoint)
run(message_queue, agents, execution)
LOG.debug('Creating quorum of agents: %s', agents)
quorum = Quorum(message_queue, agents.values())
LOG.debug('Waiting for quorum of agents')
quorum.wait_join()
for test_definition in execution['tests']:
LOG.debug('Running test %s on all agents', test_definition)
executors = dict()
for agent_id, agent in agents.items():
if agent['mode'] == 'master':
# tests are executed on master agents only
executors[agent_id] = executors_classes.get_executor(
test_definition, agent)
quorum.run_test_case(executors)
LOG.info('Done')
def main(): def main():