Refactor server and extract lib entry-point

Change-Id: Ib42aa2f80dcd2906403fb1fa9b29b44d09cf8bf0
This commit is contained in:
Ilya Shakhat 2015-03-23 16:40:51 +03:00 committed by Ilya Shakhat
parent 44f7a4dbc6
commit 82e23b123d
7 changed files with 153 additions and 68 deletions

View File

@ -67,7 +67,7 @@
#logging_exception_prefix = %(asctime)s.%(msecs)03d %(process)d TRACE %(name)s %(instance)s #logging_exception_prefix = %(asctime)s.%(msecs)03d %(process)d TRACE %(name)s %(instance)s
# List of logger=LEVEL pairs. (list value) # List of logger=LEVEL pairs. (list value)
#default_log_levels = amqp=WARN,amqplib=WARN,boto=WARN,qpid=WARN,sqlalchemy=WARN,suds=INFO,oslo.messaging=INFO,iso8601=WARN,requests.packages.urllib3.connectionpool=WARN,urllib3.connectionpool=WARN,websocket=WARN,requests.packages.urllib3.util.retry=WARN,urllib3.util.retry=WARN,keystonemiddleware=WARN,routes.middleware=WARN,stevedore=WARN #default_log_levels = amqp=WARN,amqplib=WARN,boto=WARN,qpid=WARN,sqlalchemy=WARN,suds=INFO,oslo.messaging=INFO,iso8601=WARN,requests.packages.urllib3.connectionpool=WARN,urllib3.connectionpool=WARN,websocket=WARN,keystonemiddleware=WARN,routes.middleware=WARN,stevedore=WARN
# Enables or disables publication of error events. (boolean value) # Enables or disables publication of error events. (boolean value)
#publish_errors = false #publish_errors = false

View File

@ -18,7 +18,7 @@ execution:
tests: tests:
- -
class: netperf class: netperf
method: TCP_STREAM program: TCP_STREAM
- -
class: shell class: shell
method: ls -al program: ls -al

View File

@ -34,27 +34,22 @@ OPENSTACK_OPTS = [
cfg.StrOpt('os-auth-url', metavar='<auth-url>', cfg.StrOpt('os-auth-url', metavar='<auth-url>',
default=utils.env('OS_AUTH_URL'), default=utils.env('OS_AUTH_URL'),
sample_default='', sample_default='',
required=True,
help='Authentication URL, defaults to env[OS_AUTH_URL].'), help='Authentication URL, defaults to env[OS_AUTH_URL].'),
cfg.StrOpt('os-tenant-name', metavar='<auth-tenant-name>', cfg.StrOpt('os-tenant-name', metavar='<auth-tenant-name>',
default=utils.env('OS_TENANT_NAME'), default=utils.env('OS_TENANT_NAME'),
sample_default='', sample_default='',
required=True,
help='Authentication tenant name, defaults to ' help='Authentication tenant name, defaults to '
'env[OS_TENANT_NAME].'), 'env[OS_TENANT_NAME].'),
cfg.StrOpt('os-username', metavar='<auth-username>', cfg.StrOpt('os-username', metavar='<auth-username>',
default=utils.env('OS_USERNAME'), default=utils.env('OS_USERNAME'),
sample_default='', sample_default='',
required=True,
help='Authentication username, defaults to env[OS_USERNAME].'), help='Authentication username, defaults to env[OS_USERNAME].'),
cfg.StrOpt('os-password', metavar='<auth-password>', cfg.StrOpt('os-password', metavar='<auth-password>',
default=utils.env('OS_PASSWORD'), default=utils.env('OS_PASSWORD'),
sample_default='', sample_default='',
required=True,
help='Authentication password, defaults to env[OS_PASSWORD].'), help='Authentication password, defaults to env[OS_PASSWORD].'),
cfg.StrOpt('os-region-name', metavar='<auth-region-name>', cfg.StrOpt('os-region-name', metavar='<auth-region-name>',
default=utils.env('OS_REGION_NAME') or 'RegionOne', default=utils.env('OS_REGION_NAME') or 'RegionOne',
required=True,
help='Authentication region name, defaults to ' help='Authentication region name, defaults to '
'env[OS_REGION_NAME].'), 'env[OS_REGION_NAME].'),

View File

@ -114,23 +114,27 @@ def filter_agents(agents, stack_outputs):
class Deployment(object): class Deployment(object):
def __init__(self, os_username, os_password, os_tenant_name, os_auth_url, def __init__(self, server_endpoint):
os_region_name, server_endpoint, external_net, flavor_name, self.server_endpoint = server_endpoint
image_name): self.openstack_client = None
self.stack_deployed = False
def connect_to_openstack(self, os_username, os_password, os_tenant_name,
os_auth_url, os_region_name, external_net,
flavor_name, image_name):
LOG.debug('Connecting to OpenStack')
self.openstack_client = openstack.OpenStackClient( self.openstack_client = openstack.OpenStackClient(
username=os_username, password=os_password, username=os_username, password=os_password,
tenant_name=os_tenant_name, auth_url=os_auth_url, tenant_name=os_tenant_name, auth_url=os_auth_url,
region_name=os_region_name) region_name=os_region_name)
self.server_endpoint = server_endpoint self.flavor_name = flavor_name
self.image_name = image_name
self.external_net = (external_net or self.external_net = (external_net or
neutron.choose_external_net( neutron.choose_external_net(
self.openstack_client.neutron)) self.openstack_client.neutron))
self.flavor_name = flavor_name
self.image_name = image_name
self.stack_name = 'shaker_%s' % utils.random_string() self.stack_name = 'shaker_%s' % utils.random_string()
self.stack_deployed = False
def _deploy_from_hot(self, specification, base_dir=None): def _deploy_from_hot(self, specification, base_dir=None):
agents = generate_agents( agents = generate_agents(
@ -182,8 +186,13 @@ class Deployment(object):
agents = {} agents = {}
if deployment.get('template'): if deployment.get('template'):
# deploy topology specified by HOT if not self.openstack_client:
agents.update(self._deploy_from_hot(deployment, base_dir=base_dir)) LOG.error('OpenStack client is not initialized. Template '
'deployment is ignored.')
else:
# deploy topology specified by HOT
agents.update(self._deploy_from_hot(
deployment, base_dir=base_dir))
if deployment.get('agents'): if deployment.get('agents'):
# agents are specified statically # agents are specified statically

View File

@ -0,0 +1,52 @@
# Copyright (c) 2015 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_log import log as logging
import zmq
from shaker.engine import utils
LOG = logging.getLogger(__name__)
class MessageQueue(object):
def __init__(self, endpoint):
_, port = utils.split_address(endpoint)
context = zmq.Context()
self.socket = context.socket(zmq.REP)
self.socket.bind("tcp://*:%s" % port)
LOG.info('Listening on *:%s', port)
def __iter__(self):
try:
while True:
# Wait for next request from client
message = self.socket.recv_json()
LOG.debug('Received request: %s', message)
def reply_handler(reply_message):
self.socket.send_json(reply_message)
try:
yield message, reply_handler
except GeneratorExit:
break
except BaseException as e:
if not isinstance(e, KeyboardInterrupt):
LOG.exception(e)
raise

View File

@ -22,11 +22,11 @@ import uuid
from oslo_config import cfg from oslo_config import cfg
from oslo_log import log as logging from oslo_log import log as logging
import yaml import yaml
import zmq
from shaker.engine import config from shaker.engine import config
from shaker.engine import deploy from shaker.engine import deploy
from shaker.engine import executors as executors_classes from shaker.engine import executors as executors_classes
from shaker.engine import messaging
from shaker.engine import report from shaker.engine import report
from shaker.engine import utils from shaker.engine import utils
@ -35,17 +35,20 @@ LOG = logging.getLogger(__name__)
class Quorum(object): class Quorum(object):
def __init__(self, message_queue): def __init__(self, message_queue, polling_interval):
self.message_queue = message_queue self.message_queue = message_queue
self.polling_interval = polling_interval
def wait_join(self, agent_ids): def wait_join(self, agent_ids):
agent_ids = set(agent_ids)
LOG.debug('Waiting for quorum of agents: %s', agent_ids) LOG.debug('Waiting for quorum of agents: %s', agent_ids)
alive_agents = set() alive_agents = set()
for message, reply_handler in self.message_queue: for message, reply_handler in self.message_queue:
agent_id = message.get('agent_id') agent_id = message.get('agent_id')
alive_agents.add(agent_id) alive_agents.add(agent_id)
reply_handler(dict(operation='none')) reply_handler(dict(operation='configure',
polling_interval=self.polling_interval))
LOG.debug('Alive agents: %s', alive_agents) LOG.debug('Alive agents: %s', alive_agents)
@ -58,7 +61,7 @@ class Quorum(object):
replied_agents = set() replied_agents = set()
result = {} result = {}
start_at = int(time.time()) + 30 # schedule tasks in a 30 sec from now start_at = int(time.time()) + self.polling_interval * 2
for message, reply_handler in self.message_queue: for message, reply_handler in self.message_queue:
agent_id = message.get('agent_id') agent_id = message.get('agent_id')
@ -93,36 +96,6 @@ class Quorum(object):
return result return result
class MessageQueue(object):
def __init__(self, endpoint):
_, port = utils.split_address(endpoint)
context = zmq.Context()
self.socket = context.socket(zmq.REP)
self.socket.bind("tcp://*:%s" % port)
LOG.info('Listening on *:%s', port)
def __iter__(self):
try:
while True:
# Wait for next request from client
message = self.socket.recv_json()
LOG.debug('Received request: %s', message)
def reply_handler(reply_message):
self.socket.send_json(reply_message)
try:
yield message, reply_handler
except GeneratorExit:
break
except BaseException as e:
if not isinstance(e, KeyboardInterrupt):
LOG.exception(e)
raise
def read_scenario(): def read_scenario():
scenario_raw = utils.read_file(cfg.CONF.scenario) scenario_raw = utils.read_file(cfg.CONF.scenario)
scenario = yaml.safe_load(scenario_raw) scenario = yaml.safe_load(scenario_raw)
@ -159,14 +132,9 @@ def _pick_agents(agents, size):
yield agents[:i] yield agents[:i]
def execute(execution, agents): def execute(quorum, execution, agents):
_extend_agents(agents) _extend_agents(agents)
message_queue = MessageQueue(cfg.CONF.server_endpoint)
quorum = Quorum(message_queue)
quorum.wait_join(set(agents.keys()))
result = [] result = []
for test in execution['tests']: for test in execution['tests']:
@ -209,15 +177,15 @@ def main():
result = [] result = []
try: try:
deployment = deploy.Deployment(cfg.CONF.os_username, deployment = deploy.Deployment(cfg.CONF.server_endpoint)
cfg.CONF.os_password,
cfg.CONF.os_tenant_name, if (cfg.CONF.os_username and cfg.CONF.os_password and
cfg.CONF.os_auth_url, cfg.CONF.os_tenant_name and cfg.CONF.os_auth_url):
cfg.CONF.os_region_name, deployment.connect_to_openstack(
cfg.CONF.server_endpoint, cfg.CONF.os_username, cfg.CONF.os_password,
cfg.CONF.external_net, cfg.CONF.os_tenant_name, cfg.CONF.os_auth_url,
cfg.CONF.flavor_name, cfg.CONF.os_region_name, cfg.CONF.external_net,
cfg.CONF.image_name) cfg.CONF.flavor_name, cfg.CONF.image_name)
agents = deployment.deploy(scenario['deployment'], agents = deployment.deploy(scenario['deployment'],
base_dir=os.path.dirname(cfg.CONF.scenario)) base_dir=os.path.dirname(cfg.CONF.scenario))
@ -226,7 +194,12 @@ def main():
if not agents: if not agents:
LOG.warning('No agents deployed.') LOG.warning('No agents deployed.')
else: else:
result = execute(scenario['execution'], agents) message_queue = messaging.MessageQueue(cfg.CONF.server_endpoint)
quorum = Quorum(message_queue, cfg.CONF.polling_interval)
quorum.wait_join(set(agents.keys()))
result = execute(quorum, scenario['execution'], agents)
LOG.debug('Result: %s', result) LOG.debug('Result: %s', result)
except Exception as e: except Exception as e:
LOG.error('Error while executing scenario: %s', e) LOG.error('Error while executing scenario: %s', e)

56
shaker/lib.py Normal file
View File

@ -0,0 +1,56 @@
# Copyright (c) 2015 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_log import log as logging
from shaker.engine import messaging
from shaker.engine import server
LOG = logging.getLogger(__name__)
class Shaker(object):
"""How to use Shaker as library
shaker = Shaker('127.0.0.1:5999', ['the-agent'])
res = shaker.run_program('the-agent', 'ls -al')
"""
def __init__(self, server_endpoint, agent_ids, polling_interval=1):
self.server_endpoint = server_endpoint
self.polling_interval = polling_interval
message_queue = messaging.MessageQueue(self.server_endpoint)
self.quorum = server.Quorum(message_queue, self.polling_interval)
self.quorum.wait_join(agent_ids)
def _run(self, agent_id, item):
agents = dict([(agent_id, dict(id=agent_id, mode='alone'))])
test = {'class': 'shell'}
test.update(item)
execution = {'tests': [test]}
execution_result = server.execute(self.quorum, execution, agents)
results_per_iteration = execution_result[0]['results_per_iteration']
results_per_agent = results_per_iteration[0]['results_per_agent']
return dict((s['agent']['id'], s) for s in results_per_agent)
def run_program(self, agent_id, program):
return self._run(agent_id, {'program': program})
def run_script(self, agent_id, script):
return self._run(agent_id, {'script': script})