From 82e23b123d4d37534f2bd0e591798f64c6b9e42f Mon Sep 17 00:00:00 2001 From: Ilya Shakhat Date: Mon, 23 Mar 2015 16:40:51 +0300 Subject: [PATCH] Refactor server and extract lib entry-point Change-Id: Ib42aa2f80dcd2906403fb1fa9b29b44d09cf8bf0 --- etc/shaker.conf | 2 +- scenarios/misc/static_agents_pair.yaml | 4 +- shaker/engine/config.py | 5 -- shaker/engine/deploy.py | 29 ++++++---- shaker/engine/messaging.py | 52 ++++++++++++++++++ shaker/engine/server.py | 73 ++++++++------------------ shaker/lib.py | 56 ++++++++++++++++++++ 7 files changed, 153 insertions(+), 68 deletions(-) create mode 100644 shaker/engine/messaging.py create mode 100644 shaker/lib.py diff --git a/etc/shaker.conf b/etc/shaker.conf index d8b1916..7033648 100644 --- a/etc/shaker.conf +++ b/etc/shaker.conf @@ -67,7 +67,7 @@ #logging_exception_prefix = %(asctime)s.%(msecs)03d %(process)d TRACE %(name)s %(instance)s # List of logger=LEVEL pairs. (list value) -#default_log_levels = amqp=WARN,amqplib=WARN,boto=WARN,qpid=WARN,sqlalchemy=WARN,suds=INFO,oslo.messaging=INFO,iso8601=WARN,requests.packages.urllib3.connectionpool=WARN,urllib3.connectionpool=WARN,websocket=WARN,requests.packages.urllib3.util.retry=WARN,urllib3.util.retry=WARN,keystonemiddleware=WARN,routes.middleware=WARN,stevedore=WARN +#default_log_levels = amqp=WARN,amqplib=WARN,boto=WARN,qpid=WARN,sqlalchemy=WARN,suds=INFO,oslo.messaging=INFO,iso8601=WARN,requests.packages.urllib3.connectionpool=WARN,urllib3.connectionpool=WARN,websocket=WARN,keystonemiddleware=WARN,routes.middleware=WARN,stevedore=WARN # Enables or disables publication of error events. (boolean value) #publish_errors = false diff --git a/scenarios/misc/static_agents_pair.yaml b/scenarios/misc/static_agents_pair.yaml index 2986ffe..fa2185c 100644 --- a/scenarios/misc/static_agents_pair.yaml +++ b/scenarios/misc/static_agents_pair.yaml @@ -18,7 +18,7 @@ execution: tests: - class: netperf - method: TCP_STREAM + program: TCP_STREAM - class: shell - method: ls -al + program: ls -al diff --git a/shaker/engine/config.py b/shaker/engine/config.py index 8b7597b..b8a9e7f 100644 --- a/shaker/engine/config.py +++ b/shaker/engine/config.py @@ -34,27 +34,22 @@ OPENSTACK_OPTS = [ cfg.StrOpt('os-auth-url', metavar='', default=utils.env('OS_AUTH_URL'), sample_default='', - required=True, help='Authentication URL, defaults to env[OS_AUTH_URL].'), cfg.StrOpt('os-tenant-name', metavar='', default=utils.env('OS_TENANT_NAME'), sample_default='', - required=True, help='Authentication tenant name, defaults to ' 'env[OS_TENANT_NAME].'), cfg.StrOpt('os-username', metavar='', default=utils.env('OS_USERNAME'), sample_default='', - required=True, help='Authentication username, defaults to env[OS_USERNAME].'), cfg.StrOpt('os-password', metavar='', default=utils.env('OS_PASSWORD'), sample_default='', - required=True, help='Authentication password, defaults to env[OS_PASSWORD].'), cfg.StrOpt('os-region-name', metavar='', default=utils.env('OS_REGION_NAME') or 'RegionOne', - required=True, help='Authentication region name, defaults to ' 'env[OS_REGION_NAME].'), diff --git a/shaker/engine/deploy.py b/shaker/engine/deploy.py index 214fa4d..cd7f62d 100644 --- a/shaker/engine/deploy.py +++ b/shaker/engine/deploy.py @@ -114,23 +114,27 @@ def filter_agents(agents, stack_outputs): class Deployment(object): - def __init__(self, os_username, os_password, os_tenant_name, os_auth_url, - os_region_name, server_endpoint, external_net, flavor_name, - image_name): + def __init__(self, server_endpoint): + self.server_endpoint = server_endpoint + self.openstack_client = None + self.stack_deployed = False + + def connect_to_openstack(self, os_username, os_password, os_tenant_name, + os_auth_url, os_region_name, external_net, + flavor_name, image_name): + LOG.debug('Connecting to OpenStack') + self.openstack_client = openstack.OpenStackClient( username=os_username, password=os_password, tenant_name=os_tenant_name, auth_url=os_auth_url, region_name=os_region_name) - self.server_endpoint = server_endpoint + self.flavor_name = flavor_name + self.image_name = image_name self.external_net = (external_net or neutron.choose_external_net( self.openstack_client.neutron)) - self.flavor_name = flavor_name - self.image_name = image_name - self.stack_name = 'shaker_%s' % utils.random_string() - self.stack_deployed = False def _deploy_from_hot(self, specification, base_dir=None): agents = generate_agents( @@ -182,8 +186,13 @@ class Deployment(object): agents = {} if deployment.get('template'): - # deploy topology specified by HOT - agents.update(self._deploy_from_hot(deployment, base_dir=base_dir)) + if not self.openstack_client: + LOG.error('OpenStack client is not initialized. Template ' + 'deployment is ignored.') + else: + # deploy topology specified by HOT + agents.update(self._deploy_from_hot( + deployment, base_dir=base_dir)) if deployment.get('agents'): # agents are specified statically diff --git a/shaker/engine/messaging.py b/shaker/engine/messaging.py new file mode 100644 index 0000000..80fe14a --- /dev/null +++ b/shaker/engine/messaging.py @@ -0,0 +1,52 @@ +# Copyright (c) 2015 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_log import log as logging +import zmq + +from shaker.engine import utils + + +LOG = logging.getLogger(__name__) + + +class MessageQueue(object): + def __init__(self, endpoint): + _, port = utils.split_address(endpoint) + + context = zmq.Context() + self.socket = context.socket(zmq.REP) + self.socket.bind("tcp://*:%s" % port) + LOG.info('Listening on *:%s', port) + + def __iter__(self): + try: + while True: + # Wait for next request from client + message = self.socket.recv_json() + LOG.debug('Received request: %s', message) + + def reply_handler(reply_message): + self.socket.send_json(reply_message) + + try: + yield message, reply_handler + except GeneratorExit: + break + + except BaseException as e: + if not isinstance(e, KeyboardInterrupt): + LOG.exception(e) + raise diff --git a/shaker/engine/server.py b/shaker/engine/server.py index fe31ba8..316f84b 100644 --- a/shaker/engine/server.py +++ b/shaker/engine/server.py @@ -22,11 +22,11 @@ import uuid from oslo_config import cfg from oslo_log import log as logging import yaml -import zmq from shaker.engine import config from shaker.engine import deploy from shaker.engine import executors as executors_classes +from shaker.engine import messaging from shaker.engine import report from shaker.engine import utils @@ -35,17 +35,20 @@ LOG = logging.getLogger(__name__) class Quorum(object): - def __init__(self, message_queue): + def __init__(self, message_queue, polling_interval): self.message_queue = message_queue + self.polling_interval = polling_interval def wait_join(self, agent_ids): + agent_ids = set(agent_ids) LOG.debug('Waiting for quorum of agents: %s', agent_ids) alive_agents = set() for message, reply_handler in self.message_queue: agent_id = message.get('agent_id') alive_agents.add(agent_id) - reply_handler(dict(operation='none')) + reply_handler(dict(operation='configure', + polling_interval=self.polling_interval)) LOG.debug('Alive agents: %s', alive_agents) @@ -58,7 +61,7 @@ class Quorum(object): replied_agents = set() result = {} - start_at = int(time.time()) + 30 # schedule tasks in a 30 sec from now + start_at = int(time.time()) + self.polling_interval * 2 for message, reply_handler in self.message_queue: agent_id = message.get('agent_id') @@ -93,36 +96,6 @@ class Quorum(object): return result -class MessageQueue(object): - def __init__(self, endpoint): - _, port = utils.split_address(endpoint) - - context = zmq.Context() - self.socket = context.socket(zmq.REP) - self.socket.bind("tcp://*:%s" % port) - LOG.info('Listening on *:%s', port) - - def __iter__(self): - try: - while True: - # Wait for next request from client - message = self.socket.recv_json() - LOG.debug('Received request: %s', message) - - def reply_handler(reply_message): - self.socket.send_json(reply_message) - - try: - yield message, reply_handler - except GeneratorExit: - break - - except BaseException as e: - if not isinstance(e, KeyboardInterrupt): - LOG.exception(e) - raise - - def read_scenario(): scenario_raw = utils.read_file(cfg.CONF.scenario) scenario = yaml.safe_load(scenario_raw) @@ -159,14 +132,9 @@ def _pick_agents(agents, size): yield agents[:i] -def execute(execution, agents): +def execute(quorum, execution, agents): _extend_agents(agents) - message_queue = MessageQueue(cfg.CONF.server_endpoint) - - quorum = Quorum(message_queue) - quorum.wait_join(set(agents.keys())) - result = [] for test in execution['tests']: @@ -209,15 +177,15 @@ def main(): result = [] try: - deployment = deploy.Deployment(cfg.CONF.os_username, - cfg.CONF.os_password, - cfg.CONF.os_tenant_name, - cfg.CONF.os_auth_url, - cfg.CONF.os_region_name, - cfg.CONF.server_endpoint, - cfg.CONF.external_net, - cfg.CONF.flavor_name, - cfg.CONF.image_name) + deployment = deploy.Deployment(cfg.CONF.server_endpoint) + + if (cfg.CONF.os_username and cfg.CONF.os_password and + cfg.CONF.os_tenant_name and cfg.CONF.os_auth_url): + deployment.connect_to_openstack( + cfg.CONF.os_username, cfg.CONF.os_password, + cfg.CONF.os_tenant_name, cfg.CONF.os_auth_url, + cfg.CONF.os_region_name, cfg.CONF.external_net, + cfg.CONF.flavor_name, cfg.CONF.image_name) agents = deployment.deploy(scenario['deployment'], base_dir=os.path.dirname(cfg.CONF.scenario)) @@ -226,7 +194,12 @@ def main(): if not agents: LOG.warning('No agents deployed.') else: - result = execute(scenario['execution'], agents) + message_queue = messaging.MessageQueue(cfg.CONF.server_endpoint) + + quorum = Quorum(message_queue, cfg.CONF.polling_interval) + quorum.wait_join(set(agents.keys())) + + result = execute(quorum, scenario['execution'], agents) LOG.debug('Result: %s', result) except Exception as e: LOG.error('Error while executing scenario: %s', e) diff --git a/shaker/lib.py b/shaker/lib.py new file mode 100644 index 0000000..e52085b --- /dev/null +++ b/shaker/lib.py @@ -0,0 +1,56 @@ +# Copyright (c) 2015 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_log import log as logging + +from shaker.engine import messaging +from shaker.engine import server + + +LOG = logging.getLogger(__name__) + + +class Shaker(object): + """How to use Shaker as library + + shaker = Shaker('127.0.0.1:5999', ['the-agent']) + res = shaker.run_program('the-agent', 'ls -al') + """ + def __init__(self, server_endpoint, agent_ids, polling_interval=1): + self.server_endpoint = server_endpoint + self.polling_interval = polling_interval + + message_queue = messaging.MessageQueue(self.server_endpoint) + self.quorum = server.Quorum(message_queue, self.polling_interval) + self.quorum.wait_join(agent_ids) + + def _run(self, agent_id, item): + agents = dict([(agent_id, dict(id=agent_id, mode='alone'))]) + + test = {'class': 'shell'} + test.update(item) + + execution = {'tests': [test]} + execution_result = server.execute(self.quorum, execution, agents) + + results_per_iteration = execution_result[0]['results_per_iteration'] + results_per_agent = results_per_iteration[0]['results_per_agent'] + return dict((s['agent']['id'], s) for s in results_per_agent) + + def run_program(self, agent_id, program): + return self._run(agent_id, {'program': program}) + + def run_script(self, agent_id, script): + return self._run(agent_id, {'script': script})