novajoin-tempest-plugin/novajoin_tempest_plugin/tests/scenario/test_tripleo_tls.py

192 lines
7.8 KiB
Python

# Copyright (c) 2017 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import re
from novajoin_tempest_plugin.tests.scenario import novajoin_manager
from oslo_log import log as logging
from tempest import config
CONF = config.CONF
LOG = logging.getLogger(__name__)
TLS_EXCEPTIONS = []
NOVADB_USER = 'nova::db::mysql::user'
NOVADB_HOST = 'nova::db::mysql::host'
NOVADB_PASSWORD = 'nova::db::mysql::password'
class TripleOTLSTest(novajoin_manager.NovajoinScenarioTest):
"""The test suite for tripleO configuration
Novajoin is currently deployed in tripleO as part of the
undercloud as part of a tripleO deployment.
This test is to validate that all the nodes and services
for an HA deployment have been correctly created.
This means:
* Validate that all haproxy services can be connected
using openssl client (tls)
* Validate rabbitmq can be connected using TLS.
"""
@classmethod
def skip_checks(cls):
super(TripleOTLSTest, cls).skip_checks()
if not CONF.novajoin.tripleo:
raise cls.skipException('Tripleo configuration is not enabled')
def get_haproxy_cfg_1(self, hostip):
print(hostip)
return "/home/{user}/haproxy.cfg".format(
user=CONF.novajoin.tripleo_undercloud_user)
def parse_haproxy_cfg(self, haproxy_data):
content = haproxy_data.splitlines()
content = [x.strip() for x in content]
params = []
services = {}
service_tag = None
for x in content:
if x.startswith('listen'):
service_tag = re.search('listen (.*)', x).group(1)
params = []
if service_tag is not None:
if x.startswith('bind'):
params.append(x)
if x.startswith('server'):
params.append(x)
services[service_tag] = params
return services
def test_haproxy_tls_connections(self):
for controller in CONF.novajoin.tripleo_controllers:
controller_ip = self.get_overcloud_server_ip(controller)
haproxy = self.get_haproxy_cfg(self.get_ssh_user(), controller_ip)
services = self.parse_haproxy_cfg(haproxy)
for tag, params in services.items():
print("*** Testing {service}".format(service=tag))
for param in params:
print(param)
hostport = self.get_hostport(param)
host_ip = re.search('(\S*):\d*', hostport).group(1)
port = re.search('\S*:(\d*)', hostport).group(1)
if "ssl" not in param:
if (tag, port) in TLS_EXCEPTIONS:
print("Exception: {p}".format(p=param))
continue
self.assertTrue("ssl" in param)
if tag == 'haproxy.stats':
# haproxy.stats is supposed to be accessible
# only to localhost - ie. the controller that
# contains the vip
vip_node = self.get_pcs_node(
host_ip,
controller_ip,
self.get_ssh_user(),
hostport)
print("vip_node={vip_node}".format(vip_node=vip_node))
if controller != vip_node:
print("Stats VIP not on controller: {ctl}".format(
ctl=controller))
continue
self.verify_overcloud_tls_connection(
controller_ip=controller_ip,
user=self.get_ssh_user(),
hostport=hostport
)
def get_hostport(self, param):
if param.startswith("bind"):
return re.search('bind (\S*) .*', param).group(1)
if param.startswith('server'):
return re.search('server (\S*) (\S*) .*', param).group(2)
def test_rabbitmq_tls_connection(self):
for controller in CONF.novajoin.tripleo_controllers:
controller_ip = self.get_overcloud_server_ip(controller)
rabbitmq_host = self.get_rabbitmq_host(self.get_ssh_user(),
controller_ip)
rabbitmq_port = self.get_rabbitmq_port(self.get_ssh_user(),
controller_ip)
self.verify_overcloud_tls_connection(
controller_ip=controller_ip,
user=self.get_ssh_user(),
hostport="{host}:{port}".format(host=rabbitmq_host,
port=rabbitmq_port)
)
def test_libvirt_tls_connection(self):
for compute in CONF.novajoin.tripleo_computes:
compute_ip = self.get_overcloud_server_ip(compute)
libvirt_port = self.get_libvirt_port(
self.get_ssh_user(),
compute_ip)
# TODO(alee) Is the host correct?
self.verify_overcloud_tls_connection(
controller_ip=compute_ip,
user=self.get_ssh_user(),
hostport="{host}.internalapi.{domain}:{port}".format(
host=compute,
domain=self.ipa_client.domain,
port=libvirt_port
)
)
def test_mysql_nova_connection_with_ssl(self):
for controller in CONF.novajoin.tripleo_controllers:
controller_ip = self.get_overcloud_server_ip(controller)
dbuser = self.get_hiera(self.get_ssh_user(),
controller_ip,
NOVADB_USER)
dbhost = self.get_hiera(self.get_ssh_user(),
controller_ip,
NOVADB_HOST)
dbpassword = self.get_hiera(self.get_ssh_user(),
controller_ip,
NOVADB_PASSWORD)
self.verify_mysql_access_with_ssl(self.get_ssh_user(),
controller_ip,
dbuser,
dbhost,
dbpassword)
def test_mysql_nova_connection_without_ssl(self):
for controller in CONF.novajoin.tripleo_controllers:
controller_ip = self.get_overcloud_server_ip(controller)
dbuser = self.get_hiera(self.get_ssh_user(),
controller_ip,
NOVADB_USER)
dbhost = self.get_hiera(self.get_ssh_user(),
controller_ip,
NOVADB_HOST)
dbpassword = self.get_hiera(self.get_ssh_user(),
controller_ip,
NOVADB_PASSWORD)
self.verify_mysql_access_without_ssl(self.get_ssh_user(),
controller_ip,
dbuser,
dbhost,
dbpassword)