Add additional test for verification of rabbitmq
Next checks were added: * Check crm status for rabbit resources * Check Master is live * list channels * create connection to each member of the qurum * create queue * publish message * consume message and assert it content Change-Id: Ie28f541acb14d1afac1a8463aa35a606a7c5f329 Implements: blueprint ostf-rabbit-replication-tests
This commit is contained in:
parent
5cc59010fd
commit
8bdde45378
|
@ -1,150 +0,0 @@
|
|||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2013 Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
import time
|
||||
import traceback
|
||||
|
||||
import fuel_health.common.ssh
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class RabbitClient(object):
|
||||
def __init__(self, host, username, key, timeout,
|
||||
rabbit_username='nova', rabbit_password=None):
|
||||
self.host = host
|
||||
self.username = username
|
||||
self.key_file = key
|
||||
self.timeout = timeout
|
||||
self.rabbit_user = rabbit_username
|
||||
self.rabbit_password = rabbit_password
|
||||
|
||||
self.ssh = fuel_health.common.ssh.Client(
|
||||
host=self.host,
|
||||
username=self.username,
|
||||
key_filename=self.key_file,
|
||||
timeout=self.timeout)
|
||||
|
||||
def list_nodes(self):
|
||||
output = self.ssh.exec_command("rabbitmqctl cluster_status")
|
||||
substring_ind = output.find('{running_nodes')
|
||||
sub_end_ind = output.find('cluster_name')
|
||||
result_str = output[substring_ind: sub_end_ind]
|
||||
num_node = result_str.count("rabbit@")
|
||||
return num_node
|
||||
|
||||
def list_queues(self):
|
||||
query = self._query('queues?"columns=name&sort=name"', header=False)
|
||||
return self._execute(query)
|
||||
|
||||
def create_queue(self, queue_name):
|
||||
query = self._query(
|
||||
query='queues/%2f/{queue_name}'.format(queue_name=queue_name),
|
||||
type='-XPUT',
|
||||
arguments='-d \'{}\''
|
||||
)
|
||||
return self._execute(query)
|
||||
|
||||
def delete_queue(self, queue_name):
|
||||
query = self._query(
|
||||
query='queues/%2f/{queue_name}'.format(queue_name=queue_name),
|
||||
type='-XDELETE'
|
||||
)
|
||||
return self._execute(query)
|
||||
|
||||
def create_exchange(self, exchange_name):
|
||||
query = self._query(
|
||||
query='exchanges/%2f/{name}'.format(name=exchange_name),
|
||||
type='-XPUT',
|
||||
arguments='-d \'{"type":"direct"}\''
|
||||
)
|
||||
return self._execute(query)
|
||||
|
||||
def delete_exchange(self, exchange_name):
|
||||
query = self._query(
|
||||
query='exchanges/%2f/{name}'.format(name=exchange_name),
|
||||
type='-XDELETE'
|
||||
)
|
||||
return self._execute(query)
|
||||
|
||||
def create_binding(self, exchange_name, queue_name, binding_name):
|
||||
query = self._query(
|
||||
query='bindings/%2f/e/{ename}/q/{qname}/{name}'.format(
|
||||
ename=exchange_name,
|
||||
qname=queue_name,
|
||||
name=binding_name
|
||||
),
|
||||
type='-XPUT'
|
||||
)
|
||||
return self._execute(query)
|
||||
|
||||
def delete_binding(self, exchange_name, queue_name, binding_name):
|
||||
query = self._query(
|
||||
query='bindings/%2f/e/{ename}/q/{qname}/{name}'.format(
|
||||
ename=exchange_name,
|
||||
qname=queue_name,
|
||||
name=binding_name
|
||||
),
|
||||
type='-XDELETE'
|
||||
)
|
||||
return self._execute(query)
|
||||
|
||||
def publish_message(self, message, exchange, binding):
|
||||
query = self._query(
|
||||
query='exchanges/%2f/{ename}/publish'.format(ename=exchange),
|
||||
type='-XPOST',
|
||||
arguments='-d \'{{"properties":{{}},"routing_key":"{bname}",'
|
||||
'"payload":"{msg}","payload_encoding":"string"}}\''.
|
||||
format(
|
||||
bname=binding,
|
||||
msg=message
|
||||
)
|
||||
)
|
||||
return self._execute(query)
|
||||
|
||||
def get_message(self, queue):
|
||||
query = self._query(
|
||||
query='queues/%2f/{qname}/get'.format(qname=queue),
|
||||
type='-XPOST',
|
||||
arguments='-d \'{"count":1,"requeue":false,"encoding":"auto"}\''
|
||||
)
|
||||
return self._execute(query)
|
||||
|
||||
def _query(self, query, header=True, type='-XGET', arguments=''):
|
||||
start = (header and 'curl -i') or 'curl'
|
||||
return '{start} -u {ruser}:{rpass} -H "content-type:application/json"'\
|
||||
' {type} {args} http://localhost:55672/api/{query}'.\
|
||||
format(
|
||||
start=start,
|
||||
ruser=self.rabbit_user,
|
||||
rpass=self.rabbit_password,
|
||||
type=type, query=query,
|
||||
args=arguments
|
||||
)
|
||||
|
||||
def _execute(self, query, times=5):
|
||||
exception = None
|
||||
for i in range(times + 1):
|
||||
try:
|
||||
result = self.ssh.exec_command(query)
|
||||
if result:
|
||||
return result
|
||||
time.sleep(1)
|
||||
except Exception as exc:
|
||||
exception = exc
|
||||
LOG.debug(traceback.format_exc())
|
||||
raise exception
|
|
@ -19,6 +19,8 @@ import itertools
|
|||
import random
|
||||
import re
|
||||
import urllib
|
||||
import uuid
|
||||
|
||||
|
||||
from fuel_health import exceptions
|
||||
|
||||
|
@ -69,3 +71,7 @@ def arbitrary_string(size=4, base_text=None):
|
|||
if not base_text:
|
||||
base_text = 'ost1_test-'
|
||||
return ''.join(itertools.islice(itertools.cycle(base_text), size))
|
||||
|
||||
|
||||
def generate_uuid():
|
||||
return uuid.uuid4().hex
|
||||
|
|
|
@ -186,7 +186,6 @@ def register_compute_opts(conf):
|
|||
for opt in ComputeGroup:
|
||||
conf.register_opt(opt, group='compute')
|
||||
|
||||
|
||||
image_group = cfg.OptGroup(name='image',
|
||||
title="Image Service Options")
|
||||
|
||||
|
|
|
@ -0,0 +1,196 @@
|
|||
# Copyright 2015 Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import kombu
|
||||
from kombu import Connection
|
||||
import logging
|
||||
import time
|
||||
import traceback
|
||||
|
||||
import fuel_health
|
||||
from fuel_health.common import ssh
|
||||
from fuel_health.common.utils import data_utils
|
||||
from fuel_health.test import BaseTestCase
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class RabbitSanityClass(BaseTestCase):
|
||||
"""TestClass contains RabbitMQ sanity checks."""
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
cls.config = fuel_health.config.FuelConfig()
|
||||
cls._controllers = cls.config.compute.online_controllers
|
||||
cls._usr = cls.config.compute.controller_node_ssh_user
|
||||
cls._pwd = cls.config.compute.controller_node_ssh_password
|
||||
cls._key = cls.config.compute.path_to_private_key
|
||||
cls._ssh_timeout = cls.config.compute.ssh_timeout
|
||||
cls.connections = []
|
||||
cls.ids = []
|
||||
cls.queues = []
|
||||
cls.data = []
|
||||
|
||||
def get_ssh_connection_to_controller(self, controller):
|
||||
remote = ssh.Client(host=controller,
|
||||
username=self._usr,
|
||||
password=self._pwd,
|
||||
key_filename=self._key,
|
||||
timeout=self._ssh_timeout)
|
||||
return remote
|
||||
|
||||
def list_nodes(self):
|
||||
if not self._controllers:
|
||||
self.fail('There are no online controllers')
|
||||
remote = self.get_ssh_connection_to_controller(self._controllers[0])
|
||||
output = remote.exec_command("rabbitmqctl cluster_status")
|
||||
substring_ind = output.find('{running_nodes')
|
||||
sub_end_ind = output.find('cluster_name')
|
||||
result_str = output[substring_ind: sub_end_ind]
|
||||
num_node = result_str.count("rabbit@")
|
||||
return num_node
|
||||
|
||||
def pick_rabbit_master(self):
|
||||
if not self._controllers:
|
||||
self.fail('There are no online controllers')
|
||||
remote = self.get_ssh_connection_to_controller(self._controllers[0])
|
||||
LOG.info('ssh session to node {0} was open'.format(
|
||||
self._controllers[0]))
|
||||
LOG.info('Try to execute command <crm resource '
|
||||
'status master_p_rabbitmq-server>')
|
||||
output = remote.exec_command(
|
||||
"crm resource status master_p_rabbitmq-server")
|
||||
LOG.debug('Output is {0}'.format(output))
|
||||
substring_ind = output.find(
|
||||
'resource master_p_rabbitmq-server is running on:')
|
||||
sub_end_ind = output.find('Master')
|
||||
LOG.debug('Start index is {0} end'
|
||||
' index is {1}'.format(substring_ind, sub_end_ind))
|
||||
result_str = output[substring_ind: sub_end_ind]
|
||||
LOG.debug('Result string is {0}'.format(result_str))
|
||||
return result_str
|
||||
|
||||
def list_channels(self):
|
||||
if not self._controllers:
|
||||
self.fail('There are no online controllers')
|
||||
remote = self.get_ssh_connection_to_controller(self._controllers[0])
|
||||
output = remote.exec_command("rabbitmqctl list_channels")
|
||||
if 'done' not in output:
|
||||
self.fail('Get channels list command fail.')
|
||||
else:
|
||||
LOG.debug('Result of executing command rabbitmqctl'
|
||||
' list_channels is {0}'.format(output))
|
||||
return output
|
||||
|
||||
def get_conf_values(self, variable="rabbit_password",
|
||||
sections="DEFAULT",
|
||||
conf_path="/etc/nova/nova.conf"):
|
||||
cmd = ("python -c 'import ConfigParser; "
|
||||
"cfg=ConfigParser.ConfigParser(); "
|
||||
"cfg.readfp(open('\"'{0}'\"')); "
|
||||
"print cfg.get('\"'{1}'\"', '\"'{2}'\"')'")
|
||||
LOG.debug("Try to execute cmd {0}".format(cmd))
|
||||
remote = self.get_ssh_connection_to_controller(self._controllers[0])
|
||||
try:
|
||||
res = remote.exec_command(cmd.format(
|
||||
conf_path, sections, variable))
|
||||
LOG.debug("result is {0}".format(res))
|
||||
return res
|
||||
except Exception:
|
||||
LOG.debug(traceback.format_exc())
|
||||
self.fail("Fail to get data from config")
|
||||
|
||||
def check_rabbit_connections(self):
|
||||
if not self._controllers:
|
||||
self.fail('There are no online controllers')
|
||||
pwd = self.get_conf_values().strip()
|
||||
for host in self._controllers:
|
||||
try:
|
||||
conn = Connection(host, userid='nova',
|
||||
password=pwd,
|
||||
virtual_host='/', port=5673)
|
||||
conn.connect()
|
||||
|
||||
channel = conn.channel()
|
||||
self.connections.append((channel, host))
|
||||
LOG.debug('connections is {0}'.format(self.connections))
|
||||
except Exception:
|
||||
LOG.debug(traceback.format_exc())
|
||||
self.fail("Failed to connect to "
|
||||
"5673 port on host {0}".format(host))
|
||||
|
||||
def create_queue(self):
|
||||
for channel, host in self.connections:
|
||||
test_queue = data_utils.rand_name() + data_utils.generate_uuid()
|
||||
queue = kombu.Queue(
|
||||
'test-rabbit-{0}-{1}'.format(test_queue, host),
|
||||
channel=channel,
|
||||
durable=False,
|
||||
queue_arguments={'x-expires': 15 * 60 * 1000})
|
||||
try:
|
||||
LOG.debug("Declaring queue {0} on host {1}".format(
|
||||
queue.name, host))
|
||||
queue.declare()
|
||||
self.data.append((channel, host, queue))
|
||||
self.queues.append(queue)
|
||||
except Exception:
|
||||
LOG.debug(traceback.format_exc())
|
||||
self.fail("Failed to declare queue on host {0}".format(host))
|
||||
|
||||
def publish_message(self):
|
||||
for channel, host, queue in self.data:
|
||||
self.ids.append(data_utils.generate_uuid())
|
||||
try:
|
||||
LOG.debug('Try to publish message {0}'.format(queue.name))
|
||||
producer = kombu.Producer(
|
||||
channel=channel, routing_key=queue.name)
|
||||
for msg_id in self.ids:
|
||||
producer.publish(msg_id)
|
||||
except Exception:
|
||||
LOG.debug(traceback.format_exc())
|
||||
self.fail("failed to publish message")
|
||||
|
||||
def check_queue_message_replication(self):
|
||||
for channel, host, queue in self.data:
|
||||
rec_queue = kombu.Queue(queue.name, channel=channel)
|
||||
try:
|
||||
msg = None
|
||||
for i in range(10):
|
||||
LOG.debug('messages ids are {0}'.format(self.ids))
|
||||
msg = rec_queue.get(True)
|
||||
LOG.debug('Message is {0}'.format(msg.body))
|
||||
if msg is None:
|
||||
time.sleep(1)
|
||||
else:
|
||||
break
|
||||
if msg is None:
|
||||
self.fail("No message received")
|
||||
elif msg.body not in self.ids:
|
||||
self.fail('received incorrect message {0}'
|
||||
' in ids {1}'.format(msg.body, self.ids))
|
||||
except Exception:
|
||||
LOG.debug(traceback.format_exc())
|
||||
self.fail('Failed to check message replication')
|
||||
|
||||
def delete_queue(self):
|
||||
LOG.debug('Try to deleting queue {0}... '.format(self.queues))
|
||||
if self.queues:
|
||||
try:
|
||||
self.ids = []
|
||||
[queue.delete() and self.queues.remove(queue)
|
||||
for queue in self.queues]
|
||||
except Exception:
|
||||
LOG.debug(traceback.format_exc())
|
||||
self.fail('Failed to delete queue')
|
|
@ -13,45 +13,24 @@
|
|||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
|
||||
import fuel_health
|
||||
import fuel_health.common.amqp_client
|
||||
import fuel_health.common.utils.data_utils
|
||||
from fuel_health.test import BaseTestCase
|
||||
from fuel_health import ha_base
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class RabbitSmokeTest(BaseTestCase):
|
||||
class RabbitSanityTest(ha_base.RabbitSanityClass):
|
||||
"""TestClass contains RabbitMQ test checks."""
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
cls.config = fuel_health.config.FuelConfig()
|
||||
cls._controllers = cls.config.compute.online_controllers
|
||||
cls._usr = cls.config.compute.controller_node_ssh_user
|
||||
cls._pwd = cls.config.compute.controller_node_ssh_password
|
||||
cls._key = cls.config.compute.path_to_private_key
|
||||
cls._ssh_timeout = cls.config.compute.ssh_timeout
|
||||
cls.amqp_pwd = cls.config.compute.amqp_pwd
|
||||
cls.amqp_clients = [fuel_health.common.amqp_client.RabbitClient(
|
||||
cnt,
|
||||
cls._usr,
|
||||
cls._key,
|
||||
cls._ssh_timeout,
|
||||
rabbit_username='nova',
|
||||
rabbit_password=cls.amqp_pwd) for cnt in cls._controllers]
|
||||
|
||||
def setUp(self):
|
||||
super(RabbitSmokeTest, self).setUp()
|
||||
super(RabbitSanityTest, self).setUp()
|
||||
if 'ha' not in self.config.mode:
|
||||
self.skipTest("It is not HA configuration")
|
||||
if not self._controllers:
|
||||
self.skipTest('There are no controller nodes')
|
||||
if not self.amqp_clients:
|
||||
self.skipTest('Cannot create AMQP clients for controllers')
|
||||
|
||||
def test_001_rabbitmqctl_status(self):
|
||||
"""Check RabbitMQ is available
|
||||
|
@ -59,31 +38,85 @@ class RabbitSmokeTest(BaseTestCase):
|
|||
Scenario:
|
||||
1. Retrieve cluster status for each controller.
|
||||
2. Check that numbers of rabbit nodes is the same as controllers.
|
||||
3. Check crm status for rabbit
|
||||
4. List channels
|
||||
Duration: 100 s.
|
||||
Deployment tags: CENTOS
|
||||
Deployment tags: CENTOS, 2014.2-6.1
|
||||
"""
|
||||
self.verify(10, self.amqp_clients[0].list_nodes, 1,
|
||||
'Cannot retrieve cluster nodes'
|
||||
' list for {ctlr} controller.'.format(
|
||||
ctlr=self.amqp_clients[0].host))
|
||||
self.verify(10, self.list_nodes, 1,
|
||||
'Cannot retrieve cluster nodes')
|
||||
|
||||
if len(self._controllers) != self.amqp_clients[0].list_nodes():
|
||||
if len(self._controllers) != self.list_nodes():
|
||||
self.fail('Step 2 failed: Number of controllers is not equal to '
|
||||
'number of cluster nodes.')
|
||||
|
||||
res = self.verify(10, self.pick_rabbit_master, 3,
|
||||
'Cannot retrieve crm status')
|
||||
|
||||
LOG.debug("Current res is {0}".format(res))
|
||||
|
||||
if not res:
|
||||
LOG.debug("Current res is {0}".format(res))
|
||||
self.fail('Step 3 failed: Rabbit Master node is not running.')
|
||||
|
||||
fail_msg_4 = 'Can not get rabbit channel list in 20 second.'
|
||||
|
||||
self.verify(20, self.list_channels, 4, fail_msg_4,
|
||||
'Can not retrieve channels list')
|
||||
|
||||
def test_002_rabbitmqctl_status_ubuntu(self):
|
||||
"""RabbitMQ availability
|
||||
Scenario:
|
||||
1. Retrieve cluster status for each controller.
|
||||
2. Check that numbers of rabbit nodes is the same as controllers.
|
||||
3. Check crm status for rabbit
|
||||
4. List channels
|
||||
Duration: 100 s.
|
||||
Deployment tags: Ubuntu
|
||||
Deployment tags: Ubuntu, 2014.2-6.1
|
||||
"""
|
||||
self.verify(10, self.amqp_clients[0].list_nodes, 1,
|
||||
'Cannot retrieve cluster nodes'
|
||||
' list for {ctlr} controller.'.format(
|
||||
ctlr=self.amqp_clients[0].host))
|
||||
self.verify(10, self.list_nodes, 1, 'Cannot retrieve cluster nodes')
|
||||
|
||||
if len(self._controllers) != self.amqp_clients[0].list_nodes():
|
||||
if len(self._controllers) != self.list_nodes():
|
||||
self.fail('Step 2 failed: Number of controllers is not equal to '
|
||||
'number of cluster nodes.')
|
||||
|
||||
res = self.verify(10, self.pick_rabbit_master, 3,
|
||||
'Cannot retrieve crm status')
|
||||
|
||||
LOG.debug("Current res is {0}".format(res))
|
||||
|
||||
if not res:
|
||||
LOG.debug("Current res is {0}".format(res))
|
||||
self.fail('Step 3 failed: Rabbit Master node is not running.')
|
||||
|
||||
fail_msg_4 = 'Can not get rabbit channel list in 20 second.'
|
||||
|
||||
self.verify(20, self.list_channels, 4, fail_msg_4,
|
||||
'Can not retrieve channels list')
|
||||
|
||||
def test_003_rabbitmqctl_replication(self):
|
||||
"""RabbitMQ replication
|
||||
Scenario:
|
||||
1. Check rabbitmq connections.
|
||||
2. Create queue.
|
||||
3. Publish test message in created queue
|
||||
4. Request created queue and message
|
||||
5. Delete queue
|
||||
Duration: 100 s.
|
||||
Available since release: 2014.2-6.1
|
||||
Deployment tags: 2014.2-6.1
|
||||
"""
|
||||
self.verify(40, self.check_rabbit_connections, 1,
|
||||
'Cannot retrieve cluster nodes')
|
||||
|
||||
self.verify(60, self.create_queue, 2,
|
||||
'Failed to create queue')
|
||||
|
||||
self.verify(40, self.publish_message, 3,
|
||||
'Failed to publish message')
|
||||
|
||||
self.verify(40, self.check_queue_message_replication, 4,
|
||||
'Consume of message failed')
|
||||
|
||||
self.verify(40, self.delete_queue, 5,
|
||||
'Failed to delete queue')
|
||||
|
|
|
@ -15,8 +15,12 @@ testresources>=0.2.7
|
|||
nose>=1.3.0
|
||||
SQLAlchemy>=0.7.8,<=0.9.99
|
||||
alembic>=0.5.0
|
||||
amqp
|
||||
anyjson
|
||||
gevent==0.13.8
|
||||
importlib
|
||||
keystonemiddleware>=1.2.0
|
||||
kombu
|
||||
pecan>=0.3.0,<0.6.0
|
||||
psycopg2>=2.5.1
|
||||
stevedore>=0.10
|
||||
keystonemiddleware>=1.2.0
|
||||
|
|
Loading…
Reference in New Issue