# -*- coding: utf-8 -*-
# Copyright 2013 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
try:
    from unittest.case import TestCase
except ImportError:
    # Running unit-tests in production environment
    from unittest2.case import TestCase

import copy
import mock
import os
import re
import six
from six.moves import range
from six.moves import zip as izip
import uuid
from datetime import datetime
import functools
from netaddr import IPNetwork
from random import randint
from oslo_serialization import jsonutils
import sqlalchemy as sa
import web
from webtest import app

import nailgun
from nailgun import consts
from nailgun import errors
from nailgun.settings import settings
from nailgun.db import db
from nailgun.db import flush
from nailgun.db import syncdb
from nailgun.logger import logger
from nailgun.db.sqlalchemy.fixman import load_fake_deployment_tasks
from nailgun.db.sqlalchemy.fixman import load_fixture
from nailgun.db.sqlalchemy.fixman import upload_fixture
from nailgun.db.sqlalchemy.models import ClusterPluginLink
from nailgun.db.sqlalchemy.models import IPAddr
from nailgun.db.sqlalchemy.models import NodeNICInterface
from nailgun.db.sqlalchemy.models import Notification
from nailgun.db.sqlalchemy.models import PluginLink
from nailgun.db.sqlalchemy.models import Release as ReleaseModel
from nailgun.db.sqlalchemy.models import Task

# here come objects
from nailgun.objects import Cluster
from nailgun.objects import ClusterPlugin
from nailgun.objects import MasterNodeSettings
from nailgun.objects import NetworkGroup
from nailgun.objects import Node
from nailgun.objects import NodeGroup
from nailgun.objects import OpenstackConfig
from nailgun.objects import Plugin
from nailgun.objects import Release

from nailgun.app import build_app
from nailgun.consts import NETWORK_INTERFACE_TYPES
from nailgun.extensions.network_manager.manager import NetworkManager
from nailgun.extensions.network_manager.template import NetworkTemplate
from nailgun.middleware.connection_monitor import ConnectionMonitorMiddleware
from nailgun.middleware.keystone import NailgunFakeKeystoneAuthMiddleware
from nailgun.utils import dict_merge
from nailgun.utils import is_feature_supported
from nailgun.utils import reverse


class TimeoutError(Exception):
    pass


def test_db_driver(handler):
    try:
        return handler()
    except web.HTTPError:
        if str(web.ctx.status).startswith(("4", "5")):
            db.rollback()
        raise
    except Exception:
        db.rollback()
        raise
    finally:
        db.commit()
        # we do not remove session in tests


class EnvironmentManager(object):

    _regex_type = type(re.compile("regex"))

    def __init__(self, app, session=None):
        self.db = session or db()
        self.app = app
        self.tester = TestCase
        self.tester.runTest = lambda a: None
        self.tester = self.tester()
        self.here = os.path.abspath(os.path.dirname(__file__))
        self.fixture_dir = os.path.join(self.here, "..", "fixtures")
        self.default_headers = {
            "Content-Type": "application/json"
        }
        self.releases = []
        self.clusters = []
        self.nodes = []
        self.plugins = []
        self.openstack_configs = []
        self.network_manager = NetworkManager

    def create(self, **kwargs):
        release_data = kwargs.pop('release_kwargs', {"api": False})
        cluster_data = kwargs.pop('cluster_kwargs', {})
        if 'release_id' not in cluster_data:
            cluster_data['release_id'] = \
                self.create_release(**release_data).id
        cluster = self.create_cluster(
            **cluster_data
        )
        for node_kwargs in kwargs.pop('nodes_kwargs', []):
            if "cluster_id" not in node_kwargs:
                node_kwargs["cluster_id"] = cluster.id
            node_kwargs.setdefault("api", False)
            if "pending_roles" not in node_kwargs:
                node_kwargs.setdefault("roles", ["controller"])
            self.create_node(
                **node_kwargs
            )
        return cluster
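
    # Illustrative usage of create() (values are hypothetical): build a
    # cluster with two controllers and one compute node directly in the
    # DB, bypassing the HTTP API:
    #
    #   cluster = self.env.create(
    #       cluster_kwargs={'api': False},
    #       nodes_kwargs=[
    #           {'roles': ['controller']},
    #           {'roles': ['controller']},
    #           {'roles': ['compute']},
    #       ])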

    def create_release(self, api=False, expect_errors=False, **kwargs):
        os_name = kwargs.get(
            'operating_system', consts.RELEASE_OS.centos)
        version = kwargs.get(
            'version', '{0}-6.1'.format(randint(0, 100000000)))
        # NOTE(ikalnitsky): To avoid reading openstack.yaml on every call,
        # we read it once and then look up the needed release.
        releases = self.read_fixtures(('openstack',))
        release_data = next(
            (r for r in releases
             if r['fields']['operating_system'] == os_name),
            releases[0])
        release_data = release_data['fields']
        release_data.update({
            'name': u"release_name_" + version,
            'version': version,
            'state': consts.RELEASE_STATES.available,
            'description': u"release_desc" + version,
        })
        if kwargs.get('deployment_tasks') is None:
            kwargs['deployment_tasks'] = \
                load_fake_deployment_tasks(apply_to_db=False)
        _patch_tags_legacy(release_data, version)
        release_data.update(kwargs)
        if api:
            resp = self.app.post(
                reverse('ReleaseCollectionHandler'),
                params=jsonutils.dumps(release_data),
                headers=self.default_headers,
                expect_errors=expect_errors
            )
            release = resp.json_body
            if not expect_errors:
                self.releases.append(
                    self.db.query(ReleaseModel).get(release['id'])
                )
        else:
            release = Release.create(release_data)
            db().commit()
            self.releases.append(release)
        return release

    def create_openstack_config(self, api=False, **kwargs):
        if api:
            resp = self.app.post(
                reverse('OpenstackConfigCollectionHandler'),
                params=jsonutils.dumps(kwargs),
                headers=self.default_headers
            )
            self.tester.assertEqual(resp.status_code, 201)
            config = resp.json_body
            self.openstack_configs.append(
                self.db.query(OpenstackConfig).get(config['id']))
        else:
            config = OpenstackConfig.create(kwargs)
            db().flush()
            self.openstack_configs.append(config)
        return config

    def get_all_roles(self, obj_type, obj_id, expect_errors=False):
        return self.app.get(
            reverse(
                'RoleCollectionHandler',
                kwargs={'obj_id': obj_id, 'obj_type': obj_type}
            ),
            headers=self.default_headers,
            expect_errors=expect_errors
        )

    def get_role(self, obj_type, obj_id, role_name, expect_errors=False):
        return self.app.get(
            reverse(
                'RoleHandler',
                kwargs={'obj_id': obj_id,
                        'obj_type': obj_type,
                        'role_name': role_name}
            ),
            headers=self.default_headers,
            expect_errors=expect_errors
        )

    def update_role(self, obj_type, obj_id, role_name, data,
                    expect_errors=False):
        return self.app.put(
            reverse(
                'RoleHandler',
                kwargs={'obj_id': obj_id,
                        'obj_type': obj_type,
                        'role_name': role_name}
            ),
            jsonutils.dumps(data),
            headers=self.default_headers,
            expect_errors=expect_errors
        )

    def delete_role(self, obj_type, obj_id, role_name, expect_errors=False):
        return self.app.delete(
            reverse(
                'RoleHandler',
                kwargs={'obj_id': obj_id,
                        'obj_type': obj_type,
                        'role_name': role_name}
            ),
            headers=self.default_headers,
            expect_errors=expect_errors
        )

    def create_role(self, obj_type, obj_id, data, expect_errors=False):
        return self.app.post(
            reverse(
                'RoleCollectionHandler',
                kwargs={'obj_id': obj_id, 'obj_type': obj_type}
            ),
            jsonutils.dumps(data),
            headers=self.default_headers,
            expect_errors=expect_errors
        )

    def get_all_tags(self, obj_type, obj_id, expect_errors=False):
        return self.app.get(
            reverse(
                'TagCollectionHandler',
                kwargs={'obj_id': obj_id, 'obj_type': obj_type}
            ),
            headers=self.default_headers,
            expect_errors=expect_errors
        )

    def get_tag(self, obj_type, obj_id, tag_name, expect_errors=False):
        return self.app.get(
            reverse(
                'TagHandler',
                kwargs={'obj_id': obj_id,
                        'obj_type': obj_type,
                        'tag_name': tag_name}
            ),
            headers=self.default_headers,
            expect_errors=expect_errors
        )

    def update_tag(self, obj_type, obj_id, tag_name, data,
                   expect_errors=False):
        return self.app.put(
            reverse(
                'TagHandler',
                kwargs={'obj_id': obj_id,
                        'obj_type': obj_type,
                        'tag_name': tag_name}
            ),
            jsonutils.dumps(data),
            headers=self.default_headers,
            expect_errors=expect_errors
        )

    def delete_tag(self, obj_type, obj_id, tag_name, expect_errors=False):
        return self.app.delete(
            reverse(
                'TagHandler',
                kwargs={'obj_id': obj_id,
                        'obj_type': obj_type,
                        'tag_name': tag_name}
            ),
            headers=self.default_headers,
            expect_errors=expect_errors
        )

    def create_tag(self, obj_type, obj_id, data, expect_errors=False):
        return self.app.post(
            reverse(
                'TagCollectionHandler',
                kwargs={'obj_id': obj_id, 'obj_type': obj_type}
            ),
            jsonutils.dumps(data),
            headers=self.default_headers,
            expect_errors=expect_errors
        )

    def create_cluster(self, api=True, exclude=None, **kwargs):
        cluster_data = {
            'name': 'cluster-api-' + str(randint(0, 1000000)),
        }
        editable_attributes = kwargs.pop('editable_attributes', None)
        vmware_attributes = kwargs.pop('vmware_attributes', None)
        if kwargs:
            cluster_data.update(kwargs)
        if 'release_id' not in cluster_data:
            cluster_data['release_id'] = self.create_release(api=False).id
        if exclude and isinstance(exclude, list):
            for ex in exclude:
                try:
                    del cluster_data[ex]
                except KeyError as err:
                    logger.warning(err)
        if api:
            resp = self.app.post(
                reverse('ClusterCollectionHandler'),
                jsonutils.dumps(cluster_data),
                headers=self.default_headers,
                expect_errors=True
            )
            self.tester.assertEqual(resp.status_code, 201, resp.body)
            cluster = resp.json_body
            cluster_db = Cluster.get_by_uid(cluster['id'])
        else:
            cluster = Cluster.create(cluster_data)
            cluster_db = cluster
            db().commit()
        self.clusters.append(cluster_db)
        if editable_attributes:
            Cluster.patch_attributes(cluster_db,
                                     {'editable': editable_attributes})
        if vmware_attributes:
            Cluster.update_vmware_attributes(cluster_db, vmware_attributes)
        return cluster_db

    def create_node(
            self, api=False,
            exclude=None, expect_http=201,
            expected_error=None,
            **kwargs):
        # TODO(alekseyk) Simplify 'interfaces' and 'mac' manipulation logic
        metadata = kwargs.get('meta', {})
        default_metadata = self.default_metadata()
        default_metadata.update(metadata)

        mac = kwargs.get('mac', self.generate_random_mac())
        if default_metadata['interfaces']:
            if not metadata or 'interfaces' not in metadata:
                default_metadata['interfaces'][0]['mac'] = mac
                default_metadata['interfaces'][0]['pxe'] = True
                for iface in default_metadata['interfaces'][1:]:
                    if 'mac' in iface:
                        iface['mac'] = self.generate_random_mac()
            else:
                for iface in default_metadata['interfaces']:
                    if iface.get('pxe'):
                        if not iface.get('mac'):
                            iface['mac'] = mac
                        elif 'mac' not in kwargs:
                            mac = iface['mac']
                    if iface.get('mac') == mac:
                        break
                else:
                    default_metadata['interfaces'][0]['mac'] = mac
                    default_metadata['interfaces'][0]['pxe'] = True

        node_data = {
            'mac': mac,
            'status': 'discover',
            'ip': '10.20.0.130',
            'meta': default_metadata
        }
        if kwargs:
            meta = kwargs.pop('meta', None)
            node_data.update(kwargs)
            if meta:
                kwargs['meta'] = meta

        if exclude and isinstance(exclude, list):
            for ex in exclude:
                try:
                    del node_data[ex]
                except KeyError as err:
                    logger.warning(err)
        if api:
            resp = self.app.post(
                reverse('NodeCollectionHandler'),
                jsonutils.dumps(node_data),
                headers=self.default_headers,
                expect_errors=True
            )
            self.tester.assertEqual(resp.status_code, expect_http, resp.body)
            if expected_error:
                self.tester.assertEqual(
                    resp.json_body["message"],
                    expected_error
                )
            if str(expect_http)[0] != "2":
                return None
            self.tester.assertEqual(resp.status_code, expect_http)
            node = resp.json_body
            node_db = Node.get_by_uid(node['id'])
            if 'interfaces' not in node_data['meta'] \
                    or not node_data['meta']['interfaces']:
                self._set_interfaces_if_not_set_in_meta(
                    node_db.id,
                    kwargs.get('meta', None))
            self.nodes.append(node_db)
        else:
            node = Node.create(node_data)
            db().commit()
            self.nodes.append(node)

        return node
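
    # Illustrative usage of create_node() (values are hypothetical): create
    # a compute node in the DB and attach it to an existing cluster, exactly
    # as create() above does for each entry in nodes_kwargs:
    #
    #   node = self.env.create_node(
    #       api=False, cluster_id=cluster.id, roles=['compute'])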

    def create_nodes(self, count, *args, **kwargs):
        """Helper to generate specific number of nodes."""
        return [self.create_node(*args, **kwargs) for _ in range(count)]

    def create_nodes_w_interfaces_count(self,
                                        nodes_count, if_count=2, **kwargs):
        """Create nodes_count nodes with if_count interfaces each.

        A default random MAC is generated for every interface.
        """
        nodes = []
        for _ in range(nodes_count):
            meta = self.default_metadata()
            if_list = [
                {
                    "name": "eth{0}".format(i),
                    "mac": self.generate_random_mac(),
                }
                for i in range(if_count)]
            if_list[0]['pxe'] = True
            self.set_interfaces_in_meta(meta, if_list)
            nodes.append(self.create_node(meta=meta, mac=if_list[0]['mac'],
                                          **kwargs))
        return nodes

    def create_task(self, **kwargs):
        task = Task(**kwargs)
        self.db.add(task)
        self.db.commit()
        return task

    def create_notification(self, **kwargs):
        notif_data = {
            "topic": "discover",
            "message": "Test message",
            "status": "unread",
            "datetime": datetime.now()
        }
        if kwargs:
            notif_data.update(kwargs)
        notification = Notification()
        notification.cluster_id = notif_data.get("cluster_id")
        for f, v in six.iteritems(notif_data):
            setattr(notification, f, v)
        self.db.add(notification)
        self.db.commit()
        return notification

    def create_node_group(self, api=True, expect_errors=False,
                          cluster_id=None, **kwargs):
        cluster_id = self._get_cluster_by_id(cluster_id).id
        ng_data = {
            'cluster_id': cluster_id,
            'name': 'test_ng'
        }
        if kwargs:
            ng_data.update(kwargs)
        if api:
            resp = self.app.post(
                reverse('NodeGroupCollectionHandler'),
                jsonutils.dumps(ng_data),
                headers=self.default_headers,
                expect_errors=expect_errors
            )
            ng = resp
        else:
            ng = NodeGroup.create(ng_data)
            db().flush()
        return ng

    def create_ip_addrs_by_rules(self, cluster, rules):
        """Manually create VIPs in the database based on the given rules.

        Format example for rules:
            {
                'management': {
                    'haproxy': '192.168.0.1',
                    'vrouter': '192.168.0.2',
                }
            }

        :param cluster: cluster ORM instance
        :param rules: mapping of networks and VIPs information needed
            for proper database entry creation
        """
        created_ips = []
        for net_group in cluster.network_groups:
            if net_group.name not in rules:
                continue
            vips_by_names = rules[net_group.name]
            for vip_name, ip_addr in six.iteritems(vips_by_names):
                ip = IPAddr(
                    network=net_group.id,
                    ip_addr=ip_addr,
                    vip_name=vip_name,
                )
                self.db.add(ip)
                created_ips.append(ip)
        if created_ips:
            self.db.flush()
        return created_ips

    def disable_task_deploy(self, cluster):
        cluster.attributes.editable['common']['task_deploy']['value'] = False
        cluster.attributes.editable.changed()
        self.db.flush()

    def delete_node_group(self, ng_id, status_code=200, api=True):
        if api:
            return self.app.delete(
                reverse(
                    'NodeGroupHandler',
                    kwargs={'obj_id': ng_id}
                ),
                headers=self.default_headers,
                expect_errors=(status_code != 200)
            )
        else:
            ng = db().query(NodeGroup).get(ng_id)
            db().delete(ng)
            db().flush()

    def setup_networks_for_nodegroup(
            self, cluster_id, node_group, cidr_start, floating_ranges=None):
        """Set up networks of a particular node group in multi-node-group mode.

        :param cluster_id: Cluster Id
        :param node_group: NodeGroup instance
        :param cidr_start: first two octets of CIDR which are common for all
            networks (e.g. "192.168")
        :param floating_ranges: Floating IP ranges
        :return: response from network setup API request
        """
        ng2_networks = {
            'fuelweb_admin': {'cidr': '{0}.9.0/24'.format(cidr_start),
                              'ip_ranges': [['{0}.9.2'.format(cidr_start),
                                             '{0}.9.254'.format(cidr_start)]],
                              'gateway': '{0}.9.1'.format(cidr_start)},
            'public': {'cidr': '{0}.0.0/24'.format(cidr_start),
                       'ip_ranges': [['{0}.0.2'.format(cidr_start),
                                      '{0}.0.127'.format(cidr_start)]],
                       'gateway': '{0}.0.1'.format(cidr_start)},
            'management': {'cidr': '{0}.1.0/24'.format(cidr_start),
                           'gateway': '{0}.1.1'.format(cidr_start)},
            'storage': {'cidr': '{0}.2.0/24'.format(cidr_start),
                        'gateway': '{0}.2.1'.format(cidr_start)},
            'private': {'cidr': '{0}.3.0/24'.format(cidr_start),
                        'gateway': '{0}.3.1'.format(cidr_start)},
        }
        netw_ids = set(net.id for net in node_group.networks)
        netconfig = self.neutron_networks_get(cluster_id).json_body
        for net in netconfig['networks']:
            if not net['meta']['notation']:
                continue
            if net['id'] in netw_ids and net['name'] in ng2_networks:
                for pkey, pval in six.iteritems(ng2_networks[net['name']]):
                    net[pkey] = pval
                net['meta']['use_gateway'] = True
        if floating_ranges:
            netconfig['networking_parameters']['floating_ranges'] = \
                floating_ranges

        resp = self.neutron_networks_put(cluster_id, netconfig)
        return resp
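
    # Illustrative call of setup_networks_for_nodegroup() (values are
    # hypothetical): give a secondary node group its own 10.10.x.0/24
    # networks plus custom floating ranges:
    #
    #   resp = self.env.setup_networks_for_nodegroup(
    #       cluster_id=cluster.id, node_group=node_group,
    #       cidr_start='10.10',
    #       floating_ranges=[['10.10.0.130', '10.10.0.254']])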

    @mock.patch('nailgun.plugins.loaders.files_manager.FilesManager.load')
    @mock.patch('nailgun.plugins.loaders.loader_base.os.path.isdir')
    def create_plugin(self, is_dir_m, files_manager_m, sample=None, api=False,
                      cluster=None, enabled=True, expect_errors=False,
                      directories=None, **kwargs):
        if sample:
            plugin_data = sample
            plugin_data.update(**kwargs)
        else:
            plugin_data = self.get_default_plugin_metadata(**kwargs)

        env_config = plugin_data.pop('attributes_metadata', None)
        node_roles = plugin_data.pop('roles_metadata', None)
        node_tags = plugin_data.pop('tags_metadata', None)
        volumes = plugin_data.pop('volumes_metadata', None)
        network_roles = plugin_data.pop('network_roles_metadata', None)
        deployment_tasks = plugin_data.pop('deployment_tasks', None)
        tasks = plugin_data.pop('tasks', None)
        components = plugin_data.pop('components', None)
        nic_config = plugin_data.pop('nic_attributes_metadata', None)
        bond_config = plugin_data.pop('bond_attributes_metadata', None)
        node_config = plugin_data.pop('node_attributes_metadata', None)

        mocked_metadata = {
            'metadata.*': plugin_data,
            'metadata.yaml': plugin_data,
            'environment_config.yaml': env_config,
            'node_roles.yaml': node_roles,
            'node_tags.yaml': node_tags,
            'volumes.yaml': volumes,
            'network_roles.yaml': network_roles,
            'deployment_tasks.yaml': deployment_tasks,
            'tasks.yaml': tasks,
            'components.yaml': components,
            'nic_config.yaml': nic_config,
            'bond_config.yaml': bond_config,
            'node_config.yaml': node_config
        }

        # this works only when everything is located in the root dir
        files_manager_m.side_effect = lambda key: copy.deepcopy(
            mocked_metadata.get(os.path.basename(key))
        )

        # mock is_dir for the expected plugin directories
        directories = (set(directories) if directories else set()).union({
            'deployment_scripts/',
            'repositories/ubuntu',
            'repositories/centos'
        })

        def define_dir(path):
            return any(path.endswith(d) for d in directories)
        is_dir_m.side_effect = define_dir

        if api:
            return self.app.post(
                reverse('PluginCollectionHandler'),
                jsonutils.dumps(plugin_data),
                headers=self.default_headers,
                expect_errors=expect_errors
            )
        else:
            plugin = Plugin.create(plugin_data)
            self.plugins.append(plugin)
            # Enable the plugin for the specific cluster
            if cluster:
                cluster.plugins.append(plugin)
                ClusterPlugin.set_attributes(
                    cluster.id, plugin.id, enabled=enabled,
                    attrs=plugin.attributes_metadata or {}
                )
            return plugin

    def create_cluster_plugin_link(self, **kwargs):
        dash_data = {
            "title": "title",
            "url": "url",
            "description": "description",
            "cluster_id": None,
            "hidden": False
        }
        if kwargs:
            dash_data.update(kwargs)
        cluster_plugin_link = ClusterPluginLink()
        cluster_plugin_link.update(dash_data)
        self.db.add(cluster_plugin_link)
        self.db.commit()
        return cluster_plugin_link

    def create_plugin_link(self, **kwargs):
        dash_data = {
            "title": "title",
            "url": "url",
            "description": "description",
            "plugin_id": None,
            "hidden": False
        }
        if kwargs:
            dash_data.update(kwargs)
        plugin_link = PluginLink()
        plugin_link.update(dash_data)
        self.db.add(plugin_link)
        self.db.commit()
        return plugin_link

    def default_metadata(self):
        item = self.find_item_by_pk_model(
            self.read_fixtures(("sample_environment",)),
            1, 'nailgun.node')
        return item.get('fields').get('meta', {})

    def generate_random_mac(self):
        mac = [randint(0x00, 0x7f) for _ in range(6)]
        return ':'.join(map(lambda x: "%02x" % x, mac)).lower()

    def generate_interfaces_in_meta(self, amount):
        nics = []
        for i in range(amount):
            nics.append(
                {
                    'name': 'eth{0}'.format(i),
                    'mac': self.generate_random_mac(),
                    'current_speed': 100,
                    'max_speed': 1000,
                    'offloading_modes': [
                        {
                            'name': 'enabled_offloading_mode',
                            'state': True,
                            "sub": [
                                {
                                    'name': 'disabled_offloading_sub_mode',
                                    'state': False,
                                    "sub": []
                                }
                            ]
                        },
                        {
                            'name': 'disabled_offloading_mode',
                            'state': False,
                            "sub": []
                        }
                    ]
                }
            )
        self.set_admin_ip_for_for_single_interface(nics)
        return {'interfaces': nics}

    def _set_interfaces_if_not_set_in_meta(self, node_id, meta):
        if not meta or 'interfaces' not in meta:
            self._add_interfaces_to_node(node_id)

    def _create_interfaces_from_meta(self, node):
        # Create interfaces from meta
        for interface in node.meta['interfaces']:
            interface = NodeNICInterface(
                mac=interface.get('mac'),
                name=interface.get('name'),
                ip_addr=interface.get('ip'),
                netmask=interface.get('netmask')
            )
            self.db.add(interface)
            node.nic_interfaces.append(interface)
        self.db.flush()
        # If the node is in a cluster, then assign networks
        # for all interfaces
        if node.cluster_id:
            self.network_manager.assign_networks_by_default(node)
        # At least one interface should have the same MAC as the node
        if node.nic_interfaces and not any(
                node.mac == i.mac for i in node.nic_interfaces):
            node.nic_interfaces[0].mac = node.mac
        self.db.commit()

    def _add_interfaces_to_node(self, node_id, count=1):
        interfaces = []
        node = self.db.query(Node.model).get(node_id)
        networks_to_assign = \
            list(node.cluster.network_groups) if node.cluster else []
        for i in range(count):
            interface = NodeNICInterface(
                node_id=node_id,
                name='eth{0}'.format(i),
                mac=self.generate_random_mac(),
                current_speed=100,
                max_speed=1000,
                assigned_networks=networks_to_assign
            )
            self.db.add(interface)
            self.db.commit()
            interfaces.append(interface)
            # assign all networks to the first NIC
            networks_to_assign = []
        return interfaces

    def set_admin_ip_for_for_single_interface(self, interfaces):
        """Set the admin IP on a single interface if it is not set yet."""
        ips = [interface.get('ip') for interface in interfaces]
        admin_ips = [
            ip for ip in ips
            if self.network_manager.is_ip_belongs_to_admin_subnet(ip)]
        if not admin_ips:
            admin_cidr = NetworkGroup.get_admin_network_group().cidr
            interfaces[0]['ip'] = str(IPNetwork(admin_cidr).ip)

    def set_interfaces_in_meta(self, meta, interfaces):
        """Set interfaces in metadata."""
        meta['interfaces'] = interfaces
        self.set_admin_ip_for_for_single_interface(meta['interfaces'])
        return meta['interfaces']

    def get_default_roles(self):
        return list(self.get_default_roles_metadata().keys())

    def get_default_volumes_metadata(self):
        return self.read_fixtures(
            ('openstack',))[0]['fields']['volumes_metadata']

    def get_default_roles_metadata(self):
        return self.read_fixtures(
            ('openstack',))[0]['fields']['roles_metadata']

    def get_default_networks_metadata(self):
        return self.read_fixtures(
            ('openstack',))[0]['fields']['networks_metadata']

    def get_default_attributes_metadata(self):
        return self.read_fixtures(
            ['openstack'])[0]['fields']['attributes_metadata']

    def get_default_plugin_env_config(self, **kwargs):
        return {
            'attributes': {
                '{0}_text'.format(kwargs.get('plugin_name', 'plugin_name')): {
                    'value': kwargs.get('value', 'value'),
                    'type': kwargs.get('type', 'text'),
                    'description': kwargs.get('description', 'description'),
                    'weight': kwargs.get('weight', 25),
                    'label': kwargs.get('label', 'label')}}}

    def get_default_plugin_nic_config(self, **kwargs):
        nic_attributes = {
            'metadata': {
                'label': 'Test base plugin'
            },
            'plugin_name_text': {
                'value': 'value',
                'type': 'text',
                'description': 'Some description',
                'weight': 25,
                'label': 'label'
            }
        }
        nic_attributes.update(kwargs)
        return nic_attributes

    def get_default_plugin_bond_config(self, **kwargs):
        bond_attributes = {
            'metadata': {
                'label': 'Test base plugin'
            },
            'plugin_name_text': {
                'value': 'value',
                'type': 'text',
                'description': 'Some description',
                'weight': 25,
                'label': 'label'
            }
        }
        bond_attributes.update(kwargs)
        return bond_attributes

    def get_default_plugin_node_config(self, **kwargs):
        node_attributes = {
            'plugin_a_section': {
                'metadata': {
                    'label': 'Plugin A Section'
                },
                'plugin_attr_key': {
                    'value': 'plugin_attr_val',
                    'type': 'text',
                    'description': 'Some description',
                    'weight': 25,
                    'label': 'label'
                }
            }
        }
        node_attributes.update(kwargs)
        return node_attributes

    def get_default_plugin_node_roles_config(self, **kwargs):
        node_roles = {
            'testing_plugin': {
                'name': 'Some plugin role',
                'description': 'Some description',
                'tags': ['testing_plugin']
            }
        }
        node_roles.update(kwargs)
        return node_roles

    def get_default_plugin_node_tags_config(self, **kwargs):
        node_tags = {
            'testing_plugin': {
                'has_primary': False
            }
        }
        node_tags.update(kwargs)
        return node_tags

    def get_default_plugin_volumes_config(self, **kwargs):
        volumes = {
            'volumes_roles_mapping': {
                'testing_plugin': [
                    {'allocate_size': 'min', 'id': 'os'},
                    {'allocate_size': 'all', 'id': 'test_volume'}
                ]
            },
            'volumes': [
                {'id': 'test_volume', 'type': 'vg'}
            ]
        }
        volumes.update(kwargs)
        return volumes

    def get_default_network_roles_config(self, **kwargs):
        network_roles = [
            {
                'id': 'test_network_role',
                'default_mapping': 'public',
                'properties': {
                    'subnet': 'true',
                    'gateway': 'false',
                    'vip': [
                        {'name': 'test_vip_1', 'shared': False},
                        {'name': 'test_vip_2', 'shared': False}
                    ]
                }
            }
        ]
        network_roles[0].update(kwargs)
        return network_roles

    def get_default_plugin_deployment_tasks(self, **kwargs):
        deployment_tasks = [
            {
                'id': 'role-name',
                'type': 'group',
                'role': ['role-name'],
                'requires': ['controller'],
                'required_for': ['deploy_end'],
                'parameters': {
                    'strategy': {
                        'type': 'parallel'
                    }
                }
            }
        ]
        deployment_tasks[0].update(kwargs)
        return deployment_tasks

    def get_default_plugin_tasks(self, **kwargs):
        default_tasks = [
            {
                'role': '[test_role]',
                'stage': 'post_deployment',
                'type': 'puppet',
                'parameters': {
                    'puppet_manifest': '/etc/puppet/modules/test_manigest.pp',
                    'puppet_modules': '/etc/puppet/modules',
                    'timeout': 720
                }
            }
        ]
        default_tasks[0].update(kwargs)
        return default_tasks

    def get_default_plugin_metadata(self, **kwargs):
        sample_plugin = {
            'version': '0.1.0',
            'name': 'testing_plugin',
            'title': 'Test plugin',
            'package_version': '1.0.0',
            'description': 'Enable to use plugin X for Neutron',
            'fuel_version': ['6.0'],
            'groups': [],
            'licenses': ['License 1'],
            'authors': ['Author1'],
            'homepage': 'http://some-plugin-url.com/',
            'is_hotpluggable': False,
            'releases': [
                {'repository_path': 'repositories/ubuntu',
                 'version': '2014.2-6.0', 'os': 'ubuntu',
                 'mode': ['ha', 'multinode'],
                 'deployment_scripts_path': 'deployment_scripts/'},
                {'repository_path': 'repositories/centos',
                 'version': '2014.2-6.0', 'os': 'centos',
                 'mode': ['ha', 'multinode'],
                 'deployment_scripts_path': 'deployment_scripts/'},
                {'repository_path': 'repositories/ubuntu',
                 'version': '2015.1-8.0', 'os': 'ubuntu',
                 'mode': ['ha', 'multinode'],
                 'deployment_scripts_path': 'deployment_scripts/'},
                {'repository_path': 'repositories/ubuntu',
                 'version': 'mitaka-9.0', 'os': 'ubuntu',
                 'mode': ['ha'],
                 'deployment_scripts_path': 'deployment_scripts/'},
                {'repository_path': 'repositories/ubuntu',
                 'version': 'newton-10.0', 'os': 'ubuntu',
                 'mode': ['ha'],
                 'deployment_scripts_path': 'deployment_scripts/'},
                {'repository_path': 'repositories/ubuntu',
                 'version': 'ocata-11.0', 'os': 'ubuntu',
                 'mode': ['ha'],
                 'deployment_scripts_path': 'deployment_scripts/'}
            ]
        }
        sample_plugin.update(kwargs)
        return sample_plugin

    def get_default_components(self, **kwargs):
        default_components = [
            {
                'name': 'hypervisor:test_hypervisor',
                'compatible': [
                    {'name': 'hypervisor:*'},
                    {'name': 'storage:object:block:swift'}
                ],
                'incompatible': [
                    {'name': 'network:*'},
                    {'name': 'additional_service:*'}
                ]
            }
        ]
        default_components[0].update(kwargs)
        return default_components

    def get_default_vmware_attributes_metadata(self):
        return self.read_fixtures(
            ['openstack'])[0]['fields']['vmware_attributes_metadata']

    def upload_fixtures(self, fxtr_names):
        for fxtr_path in self.fxtr_paths_by_names(fxtr_names):
            with open(fxtr_path, "r") as fxtr_file:
                upload_fixture(fxtr_file)

    def read_fixtures(self, fxtr_names):
        data = []
        for fxtr_path in self.fxtr_paths_by_names(fxtr_names):
            with open(fxtr_path, "r") as fxtr_file:
                try:
                    data.extend(load_fixture(fxtr_file))
                except Exception as exc:
                    logger.error(
                        'Error "%s" occurred while loading '
                        'fixture %s' % (exc, fxtr_path)
                    )
        return data

    def fxtr_paths_by_names(self, fxtr_names):
        for fxtr in fxtr_names:
            for ext in ['json', 'yaml']:
                fxtr_path = os.path.join(
                    self.fixture_dir,
                    "%s.%s" % (fxtr, ext)
                )
                if os.path.exists(fxtr_path):
                    logger.debug(
                        "Fixture file is found, yielding path: %s",
                        fxtr_path
                    )
                    yield fxtr_path
                    break
            else:
                logger.warning(
                    "Fixture file was not found: %s",
                    fxtr
                )

    def find_item_by_pk_model(self, data, pk, model):
        for item in data:
            if item.get('pk') == pk and item.get('model') == model:
                return item

    def _get_cluster_by_id(self, cluster_id):
        """Get cluster by cluster ID.

        If `cluster_id` is `None`, return the first cluster from the list.

        :param cluster_id: cluster ID
        :type cluster_id: int
        :return: cluster
        """
        if cluster_id is None:
            return self.clusters[0]

        for cluster in self.clusters:
            if cluster.id == cluster_id:
                return cluster

        raise Exception(
            'Cluster with ID "{0}" was not found.'.format(cluster_id))

    def _launch_for_selected_nodes(self, handler, nodes_uids, cluster_id,
                                   body=None):
        if body is None:
            body = {}
        if self.clusters:
            cluster = self._get_cluster_by_id(cluster_id)
            if not nodes_uids:
                nodes_uids = [n.uid for n in cluster.nodes]
            action_url = reverse(
                handler,
                kwargs={'cluster_id': cluster.id}
            ) + '?nodes={0}'.format(','.join(nodes_uids))
            resp = self.app.put(
                action_url,
                jsonutils.dumps(body),
                headers=self.default_headers,
                expect_errors=True
            )
            # If @fake_tasks runs synchronously, then the API
            # returns 200 (executed). If fake_rpc=False, then
            # the API returns 202 (scheduled)
            self.tester.assertIn(resp.status_code, [200, 202])
            response = resp.json_body
            return self.db.query(Task).filter_by(
                uuid=response['uuid']
            ).first()
        else:
            raise NotImplementedError(
                "Nothing to provision - try creating cluster"
            )

    def launch_provisioning_selected(self, nodes_uids=None, cluster_id=None):
        return self._launch_for_selected_nodes(
            'ProvisionSelectedNodes', nodes_uids, cluster_id
        )

    def launch_deployment_selected(self, nodes_uids=None, cluster_id=None):
        return self._launch_for_selected_nodes(
            'DeploySelectedNodes', nodes_uids, cluster_id
        )

    def launch_deployment_selected_tasks(self,
                                         nodes_uids, cluster_id, task_ids):
        return self._launch_for_selected_nodes(
            'DeploySelectedNodesWithTasks', nodes_uids, cluster_id,
            task_ids or [],
        )

    def _launch_for_cluster(self, handler, cluster_id, **kwargs):
        if self.clusters:
            cluster_id = self._get_cluster_by_id(cluster_id).id
            if kwargs:
                get_string = '?' + ('&'.join(
                    '{}={}'.format(k, v) for k, v in six.iteritems(kwargs)
                ))
            else:
                get_string = ''
            resp = self.app.put(
                reverse(
                    handler,
                    kwargs={'cluster_id': cluster_id}
                ) + get_string,
                headers=self.default_headers)
            return self.db.query(Task).filter_by(
                uuid=resp.json_body['uuid']
            ).first()
        else:
            raise NotImplementedError(
                "Nothing to deploy - try creating cluster"
            )

    def launch_deployment(self, cluster_id=None, **kwargs):
        return self._launch_for_cluster(
            'ClusterChangesHandler', cluster_id, **kwargs
        )

    def launch_redeployment(self, cluster_id=None, **kwargs):
        return self._launch_for_cluster(
            'ClusterChangesForceRedeployHandler', cluster_id, **kwargs
        )

    def stop_deployment(self, cluster_id=None):
        if self.clusters:
            cluster_id = self._get_cluster_by_id(cluster_id).id
            resp = self.app.put(
                reverse(
                    'ClusterStopDeploymentHandler',
                    kwargs={'cluster_id': cluster_id}),
                expect_errors=True,
                headers=self.default_headers)
            return self.db.query(Task).filter_by(
                uuid=resp.json_body['uuid']
            ).first()
        else:
            raise NotImplementedError(
                "Nothing to stop - try creating cluster"
            )

    def reset_environment(self, expect_http=None, cluster_id=None, force=1):
        if self.clusters:
            cluster_id = self._get_cluster_by_id(cluster_id).id
            resp = self.app.put(
                reverse(
                    'ClusterResetHandler',
                    kwargs={'cluster_id': cluster_id}
                ) + '?force={0}'.format(int(force)),
                expect_errors=True,
                headers=self.default_headers)
            if expect_http is not None:
                self.tester.assertEqual(resp.status_code, expect_http)
            else:
                # if the task was started, the status code can be
                # either 200 or 202 depending on the task status
                self.tester.assertIn(resp.status_code, [200, 202])
            if not (200 <= resp.status_code < 400):
                return resp.body
            return self.db.query(Task).filter_by(
                uuid=resp.json_body['uuid']
            ).first()
        else:
            raise NotImplementedError(
                "Nothing to reset - try creating cluster"
            )

    def delete_environment(self, expect_http=202, cluster_id=None, force=1):
        if self.clusters:
            cluster_id = self._get_cluster_by_id(cluster_id).id
            resp = self.app.delete(
                reverse(
                    'ClusterHandler',
                    kwargs={'obj_id': cluster_id}
                ) + '?force={0}'.format(int(force)),
                expect_errors=True,
                headers=self.default_headers)
            self.tester.assertEqual(resp.status_code, expect_http)
            if not str(expect_http).startswith("2"):
                return resp.body
            return self.db.query(Task).filter_by(
                name=consts.TASK_NAMES.cluster_deletion
            ).first()
        else:
            raise NotImplementedError(
                "Nothing to delete - try creating cluster"
            )

    def launch_verify_networks(self, data=None, expect_errors=False,
                               cluster_id=None):
        if self.clusters:
            cluster = self._get_cluster_by_id(cluster_id)
            net_urls = {
                "nova_network": {
                    "config": "NovaNetworkConfigurationHandler",
                    "verify": "NovaNetworkConfigurationVerifyHandler"
                },
                "neutron": {
                    "config": "NeutronNetworkConfigurationHandler",
                    "verify": "NeutronNetworkConfigurationVerifyHandler"
                }
            }
            provider = cluster.net_provider
            if data:
                nets = jsonutils.dumps(data)
            else:
                resp = self.app.get(
                    reverse(
                        net_urls[provider]["config"],
                        kwargs={'cluster_id': cluster.id}
                    ),
                    headers=self.default_headers
                )
                self.tester.assertEqual(200, resp.status_code)
                nets = resp.body

            resp = self.app.put(
                reverse(
                    net_urls[provider]["verify"],
                    kwargs={'cluster_id': cluster.id}),
                nets,
                headers=self.default_headers,
                expect_errors=expect_errors,
            )
            if expect_errors:
                return resp
            else:
                task_uuid = resp.json_body['uuid']
                return self.db.query(Task).filter_by(uuid=task_uuid).first()
        else:
            raise NotImplementedError(
                "Nothing to verify - try creating cluster"
            )

    def make_bond_via_api(self, bond_name, bond_mode, nic_names,
                          node_id=None, attrs=None):
        if not node_id:
            node_id = self.nodes[0]["id"]
        resp = self.app.get(
            reverse("NodeNICsHandler",
                    kwargs={"node_id": node_id}),
            headers=self.default_headers)
        self.tester.assertEqual(resp.status_code, 200)
        data = resp.json_body

        nics = self.db.query(NodeNICInterface).filter(
            NodeNICInterface.name.in_(nic_names)
        ).filter(
            NodeNICInterface.node_id == node_id
        )
        self.tester.assertEqual(nics.count(), len(nic_names))

        assigned_nets, slaves = [], []
        for nic in data:
            if nic['name'] in nic_names:
                assigned_nets.extend(nic['assigned_networks'])
                slaves.append({'name': nic['name']})
                nic['assigned_networks'] = []
        bond_dict = {
            "name": bond_name,
            "type": NETWORK_INTERFACE_TYPES.bond,
            "mode": bond_mode,
            "slaves": slaves,
            "assigned_networks": assigned_nets,
            "attributes": attrs or {}
        }
        data.append(bond_dict)
        resp = self.node_nics_put(node_id, data)
        self.tester.assertEqual(resp.status_code, 200)
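
    # Illustrative call of make_bond_via_api() (names and the bond mode
    # constant are hypothetical): bond the first two NICs of a node in
    # active-backup mode through the REST API:
    #
    #   self.env.make_bond_via_api(
    #       'bond0', consts.BOND_MODES.active_backup,
    #       ['eth0', 'eth1'], node_id=node.id)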

    def refresh_nodes(self):
        for n in self.nodes[:]:
            try:
                self.db.add(n)
                self.db.refresh(n)
            except Exception:
                self.nodes.remove(n)
        self.db.flush()

    def refresh_clusters(self):
        for n in self.clusters[:]:
            try:
                self.db.refresh(n)
            except Exception:
                self.clusters.remove(n)

    def _api_get(self, method, instance_id, expect_errors=False):
        return self.app.get(
            reverse(method,
                    kwargs=instance_id),
            headers=self.default_headers,
            expect_errors=expect_errors)

    def _api_put(self, method, instance_id, data, expect_errors=False):
        return self.app.put(
            reverse(method,
                    kwargs=instance_id),
            jsonutils.dumps(data),
            headers=self.default_headers,
            expect_errors=expect_errors)

    def nova_networks_get(self, cluster_id, expect_errors=False):
        return self._api_get('NovaNetworkConfigurationHandler',
                             {'cluster_id': cluster_id},
                             expect_errors)

    def nova_networks_put(self, cluster_id, networks, expect_errors=False):
        return self._api_put('NovaNetworkConfigurationHandler',
                             {'cluster_id': cluster_id},
                             networks,
                             expect_errors)

    def neutron_networks_get(self, cluster_id, expect_errors=False):
        return self._api_get('NeutronNetworkConfigurationHandler',
                             {'cluster_id': cluster_id},
                             expect_errors)

    def neutron_networks_put(self, cluster_id, networks, expect_errors=False):
        return self._api_put('NeutronNetworkConfigurationHandler',
                             {'cluster_id': cluster_id},
                             networks,
                             expect_errors)

    def cluster_changes_put(self, cluster_id, expect_errors=False):
        return self._api_put('ClusterChangesHandler',
                             {'cluster_id': cluster_id},
                             [],
                             expect_errors)

    def node_nics_get(self, node_id, expect_errors=False):
        return self._api_get('NodeNICsHandler',
                             {'node_id': node_id},
                             expect_errors)

    def node_nics_put(self, node_id, interfaces, expect_errors=False):
        return self._api_put('NodeNICsHandler',
                             {'node_id': node_id},
                             interfaces,
                             expect_errors)

    def node_collection_nics_put(self, nodes,
                                 expect_errors=False):
        return self._api_put('NodeCollectionNICsHandler',
                             {},
                             nodes,
                             expect_errors)

    def _create_network_group(self, expect_errors=False, cluster=None,
                              group_id=None, **kwargs):
        if not cluster:
            cluster = self.clusters[0]
        ng = {
            "release": cluster.release.id,
            "name": "external",
            "vlan_start": 50,
            "cidr": "10.3.0.0/24",
            "gateway": "10.3.0.1",
            "group_id": group_id or Cluster.get_default_group(cluster).id,
            "meta": {
                "notation": consts.NETWORK_NOTATION.cidr,
                "use_gateway": True,
                "map_priority": 2}
        }
        ng.update(kwargs)
        resp = self.app.post(
            reverse('NetworkGroupCollectionHandler'),
            jsonutils.dumps(ng),
            headers=self.default_headers,
            expect_errors=expect_errors,
        )
        return resp

    def _update_network_group(self, ng_data, expect_errors=False):
        return self.app.put(
            reverse(
                'NetworkGroupHandler',
                kwargs={'obj_id': ng_data['id']}
            ),
            jsonutils.dumps(ng_data),
            headers=self.default_headers,
            expect_errors=expect_errors
        )

    def _set_additional_component(self, cluster, component, value):
        attrs = copy.deepcopy(cluster.attributes.editable)
        attrs['additional_components'][component]['value'] = value
        self.app.patch(
            reverse(
                'ClusterAttributesHandler',
                kwargs={'cluster_id': cluster.id}),
            params=jsonutils.dumps({'editable': attrs}),
            headers=self.default_headers
        )

    def _delete_network_group(self, ng_id, expect_errors=False):
        return self.app.delete(
            reverse(
                'NetworkGroupHandler',
                kwargs={'obj_id': ng_id}
            ),
            headers=self.default_headers,
            expect_errors=expect_errors
        )

    def set_task_status_recursively(self, supertask, status):
        supertask.status = status
        for sub_task in supertask.subtasks:
            self.set_task_status_recursively(sub_task, status)


class BaseUnitTest(TestCase):

    def datadiff(self, data1, data2, path=None, ignore_keys=(),
                 compare_sorted=False):
        if path is None:
            path = []

        def fail(msg, failed_path):
            self.fail('Path "{0}": {1}'.format("->".join(failed_path), msg))

        if not isinstance(data1, dict) or not isinstance(data2, dict):
            if isinstance(data1, (list, tuple)):
                newpath = path[:]
                if compare_sorted:
                    data1 = sorted(data1)
                    data2 = sorted(data2)
                for i, keys in enumerate(izip(data1, data2)):
                    newpath.append(str(i))
                    self.datadiff(keys[0], keys[1], newpath, ignore_keys,
                                  compare_sorted)
                    newpath.pop()
            elif data1 != data2:
                err = "Values differ: {0} != {1}".format(
                    str(data1),
                    str(data2)
                )
                fail(err, path)
        else:
            newpath = path[:]
            if len(data1) != len(data2):
                fail('Dicts have different keys number: {0} != {1}'.format(
                    len(data1), len(data2)), path)
            for key1, key2 in zip(
                sorted(data1),
                sorted(data2)
            ):
                if key1 != key2:
                    err = "Keys differ: {0} != {1}".format(
                        str(key1),
                        str(key2)
                    )
                    fail(err, path)
                if key1 in ignore_keys:
                    continue
                newpath.append(key1)
                self.datadiff(data1[key1], data2[key2], newpath, ignore_keys,
                              compare_sorted)
                newpath.pop()

    def assertNotRaises(self, exception, method, *args, **kwargs):
        try:
            method(*args, **kwargs)
        except exception:
            self.fail('Exception "{0}" raised.'.format(exception))

    def assertRaisesWithMessage(self, exc, msg, func, *args, **kwargs):
        try:
            func(*args, **kwargs)
        except Exception as inst:
            self.assertIsInstance(inst, exc)
            self.assertEqual(inst.message, msg)
        else:
            self.fail('Exception "{0}" not raised.'.format(exc))

    def assertRaisesWithMessageIn(self, exc, msg, func, *args, **kwargs):
        try:
            func(*args, **kwargs)
        except Exception as inst:
            self.assertIsInstance(inst, exc)
            self.assertIn(msg, inst.message)
        else:
            self.fail('Exception "{0}" not raised.'.format(exc))

    def assertValidJSON(self, data):
        self.assertNotRaises(ValueError, jsonutils.loads, data)
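
    # Illustrative usage of datadiff() (data is hypothetical): recursively
    # compare two nested structures, ignoring volatile keys and treating
    # lists as unordered:
    #
    #   self.datadiff(expected_config, actual_config,
    #                 ignore_keys=['id'], compare_sorted=True)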


class BaseTestCase(BaseUnitTest):

    fixtures = ['admin_network', 'master_node_settings']

    def __init__(self, *args, **kwargs):
        super(BaseTestCase, self).__init__(*args, **kwargs)
        self.default_headers = {
            "Content-Type": "application/json"
        }

    @classmethod
    def setUpClass(cls):
        cls.app = app.TestApp(
            build_app(db_driver=test_db_driver).wsgifunc(
                ConnectionMonitorMiddleware)
        )
        syncdb()
        # syncdb disables the logger, so we need to enable it again
        logger.disabled = 0

    def setUp(self):
        self.db = db
        flush()
        self.env = EnvironmentManager(app=self.app, session=self.db)
        self.env.upload_fixtures(self.fixtures)

    def tearDown(self):
        self.db.remove()


class BaseIntegrationTest(BaseTestCase):

    @classmethod
    def setUpClass(cls):
        super(BaseIntegrationTest, cls).setUpClass()
        nailgun.task.task.logs_utils.prepare_syslog_dir = mock.Mock()

    def emulate_nodes_provisioning(self, nodes):
        for node in nodes:
            node.status = consts.NODE_STATUSES.provisioned
            node.pending_addition = False
        self.db.add_all(nodes)
        self.db.flush()

    def emulate_nodes_deployment(self, nodes):
        for node in nodes:
            node.status = consts.NODE_STATUSES.ready
            node.pending_addition = False
        self.db.flush()

    @classmethod
    def tearDownClass(cls):
        super(BaseIntegrationTest, cls).tearDownClass()


class BaseAuthenticationIntegrationTest(BaseIntegrationTest):

    @classmethod
    def setUpClass(cls):
        super(BaseAuthenticationIntegrationTest, cls).setUpClass()
        cls.app = app.TestApp(build_app(db_driver=test_db_driver).wsgifunc(
            ConnectionMonitorMiddleware, NailgunFakeKeystoneAuthMiddleware))
        syncdb()

    def get_auth_token(self):
        resp = self.app.post(
            '/keystone/v2.0/tokens',
            jsonutils.dumps({
                'auth': {
                    'tenantName': 'admin',
                    'passwordCredentials': {
                        'username': settings.FAKE_KEYSTONE_USERNAME,
                        'password': settings.FAKE_KEYSTONE_PASSWORD,
                    },
                },
            })
        )
        return resp.json['access']['token']['id'].encode('utf-8')


def fake_tasks(fake_rpc=True,
               mock_rpc=True,
               tick_count=100,
               tick_interval=0,
               **kwargs):
    def wrapper(func):
        func = mock.patch(
            'nailgun.task.task.settings.FAKE_TASKS',
            True
        )(func)
        func = mock.patch(
            'nailgun.task.fake.settings.FAKE_TASKS_TICK_COUNT',
            tick_count
        )(func)
        func = mock.patch(
            'nailgun.task.fake.settings.FAKE_TASKS_TICK_INTERVAL',
            tick_interval
        )(func)
        if fake_rpc:
            func = mock.patch(
                'nailgun.task.task.rpc.cast',
                functools.partial(
                    nailgun.task.task.fake_cast,
                    **kwargs
                )
            )(func)
            func = mock.patch(
                'nailgun.task.fake.settings.TESTS_WITH_NO_THREADS',
                True
            )(func)
            # In fake.py we call threading.Event().isSet(), which needs to
            # return False, so we mock the whole threading module. We could
            # just do mock.patch('..threading', **config), but then the mock
            # object would be inserted as the first argument of every test.
            # If the replacement object is passed as the second argument to
            # patch(), the arguments of the original function are left
            # unchanged. That's why isSet_mock is created as a replacement
            # object.
            config = {'Event.return_value.isSet.return_value': False}
            isSet_mock = mock.Mock()
            isSet_mock.configure_mock(**config)
            func = mock.patch(
                'nailgun.task.fake.threading',
                isSet_mock
            )(func)
        elif mock_rpc:
            func = mock.patch(
                'nailgun.task.task.rpc.cast',
                **kwargs
            )(func)
        return func
    return wrapper
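

# Illustrative usage of @fake_tasks (test name is hypothetical): run a
# deployment against the fake RPC receiver without real threads:
#
#   @fake_tasks()
#   def test_deployment_on_fake_env(self):
#       task = self.env.launch_deployment()
#       self.assertEqual(task.status, consts.TASK_STATUSES.ready)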


def mock_rpc(pass_mock=False, **rpc_mock_kwargs):
    """Decorator that mocks rpc.cast.

    :param pass_mock: whether the decorator should pass the mocked object
        to the decorated function's arguments
    :param rpc_mock_kwargs: additional arguments for the mock.patch function
    """
    def wrapper(f):
        @functools.wraps(f)
        def inner(*args, **kwargs):
            with mock.patch('nailgun.rpc.cast',
                            **rpc_mock_kwargs) as rpc_mock:
                if pass_mock:
                    return f(*(args + (rpc_mock,)), **kwargs)
                else:
                    return f(*args, **kwargs)
        return inner
    return wrapper
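

# Illustrative usage of @mock_rpc (test name is hypothetical): capture the
# object passed to rpc.cast and inspect it; the mock is appended after self:
#
#   @mock_rpc(pass_mock=True)
#   def test_rpc_message_is_cast(self, rpc_cast_mock):
#       self.env.launch_deployment()
#       self.assertTrue(rpc_cast_mock.called)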


class DeploymentTasksTestMixin(object):

    def _compare_tasks(self, reference, result):
        """Compare deployment tasks.

        Takes into account the legacy format as well as validator output
        that is compatible with it but carries extra fields, where legacy
        fields are converted to the new syntax.

        :param reference: list of tasks
        :type reference: list
        :param result: list of tasks
        :type result: list
        """
        reference.sort(key=lambda x: x.get('id', x.get('task_name')))
        result.sort(key=lambda x: x.get('id', x.get('task_name')))
        self.assertEqual(len(reference), len(result))
        for ref, res in six.moves.zip(reference, result):
            for field in ref:
                if field == '_custom':
                    # unpack custom JSON fields if present
                    self.assertEqual(
                        jsonutils.loads(ref.get(field)),
                        jsonutils.loads((res or {}).get(field))
                    )
                else:
                    self.assertEqual(ref.get(field), (res or {}).get(field))


def _patch_tags_legacy(release_data, version):
    if is_feature_supported(version, consts.TAGS_SUPPORT_VERSION):
        return
    roles = release_data.get('roles_metadata', {})
    for role_name, meta in six.iteritems(roles):
        meta['tags'] = [role_name]


# this function is for development and troubleshooting purposes
def datadiff(data1, data2, branch, p=True):
    def iterator(data1, data2):
        if isinstance(data1, (list,)) and isinstance(data2, (list,)):
            return range(max(len(data1), len(data2)))
        elif isinstance(data1, (dict,)) and isinstance(data2, (dict,)):
            return (set(data1.keys()) | set(data2.keys()))
        else:
            raise TypeError

    diff = []
    if data1 != data2:
        try:
            it = iterator(data1, data2)
        except Exception:
            return [(branch, data1, data2)]

        for k in it:
            newbranch = branch[:]
            newbranch.append(k)
            if p:
                print("Comparing branch: %s" % newbranch)
            try:
                try:
                    v1 = data1[k]
                except (KeyError, IndexError):
                    if p:
                        print("data1 does not seem to have key = %s" % k)
                    diff.append((newbranch, None, data2[k]))
                    continue
                try:
                    v2 = data2[k]
                except (KeyError, IndexError):
                    if p:
                        print("data2 does not seem to have key = %s" % k)
                    diff.append((newbranch, data1[k], None))
                    continue
            except Exception:
                if p:
                    print("data1 and data2 cannot be compared on "
                          "branch: %s" % newbranch)
                diff.append((newbranch, data1, data2))
                return diff
            else:
                if v1 != v2:
                    if p:
                        print("data1 and data2 do not match "
                              "each other on branch: %s" % newbranch)
                        # print("data1 = %s" % data1)
                        print("v1 = %s" % v1)
                        # print("data2 = %s" % data2)
                        print("v2 = %s" % v2)
                    diff.extend(datadiff(v1, v2, newbranch))
    return diff


def reflect_db_metadata():
    meta = sa.MetaData()
    meta.reflect(bind=db.get_bind())
    return meta


def get_nodegroup_network_schema_template(template, group_name):
    custom_template = template['adv_net_template'][group_name]
    custom_template_obj = NetworkTemplate(jsonutils.dumps(custom_template))
    node_custom_template = custom_template_obj.safe_substitute(
        custom_template['nic_mapping']['default'])
    return jsonutils.loads(node_custom_template)['network_scheme']
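

# Illustrative input for get_nodegroup_network_schema_template() (keys are
# hypothetical but follow the structure the function reads):
#
#   template = {
#       'adv_net_template': {
#           'group-custom-1': {
#               'nic_mapping': {'default': {'if1': 'eth0'}},
#               'network_scheme': {...},
#           }
#       }
#   }
#   schema = get_nodegroup_network_schema_template(
#       template, 'group-custom-1')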


class BaseAlembicMigrationTest(TestCase):

    def setUp(self):
        super(BaseAlembicMigrationTest, self).setUp()
        self.meta = reflect_db_metadata()

    def tearDown(self):
        db.remove()
        super(BaseAlembicMigrationTest, self).tearDown()


class BaseMasterNodeSettignsTest(BaseIntegrationTest):

    def setUp(self):
        super(BaseMasterNodeSettignsTest, self).setUp()
        self.create_master_node_settings()

    master_node_settings_template = {
        "settings": {
            "ui_settings": {
                "view_mode": "standard",
                "filter": {},
                "sort": [{"status": "asc"}],
                "filter_by_labels": {},
                "sort_by_labels": [],
                "search": ""
            },
            "statistics": {
                "send_anonymous_statistic": {
                    "type": "checkbox",
                    "value": True,
                    "label": "statistics.setting_labels."
                             "send_anonymous_statistic",
                    "weight": 10
                },
                "user_choice_saved": {
                    "type": "hidden",
                    "value": False
                }
            }
        }
    }

    def create_master_node_settings(self):
        self.master_node_settings = {
            'master_node_uid': str(uuid.uuid4()),
        }
        self.master_node_settings.update(self.master_node_settings_template)
        MasterNodeSettings.create(self.master_node_settings)
        self.db.commit()

    def set_sending_stats(self, value):
        mn_settings = MasterNodeSettings.get_one()
        mn_settings.settings = dict_merge(
            mn_settings.settings,
            {'statistics': {
                'user_choice_saved': {'value': True},
                'send_anonymous_statistic': {'value': value}
            }})
        self.db.flush()

    def enable_sending_stats(self):
        self.set_sending_stats(True)

    def disable_sending_stats(self):
        self.set_sending_stats(False)


class BaseValidatorTest(BaseTestCase):
    """JSON-schema validation policy:

    1) All required properties are present;
    2) No additional properties allowed;
    3) Item has correct type.
    """

    validator = None

    def serialize(self, data):
        """Serialize an object to a string.

        :param data: object being serialized
        :return: stringified JSON object
        """
        return jsonutils.dumps(data)

    def get_invalid_data_context(self, data, *args):
        """Return the context object of a raised InvalidData exception.

        :return: context of 'errors.InvalidData'
        """
        serialized_data = self.serialize(data)
        with self.assertRaises(errors.InvalidData) as context:
            self.validator(serialized_data, *args)
        return context

    def assertRaisesAdditionalProperty(self, obj, key, *args):
        context = self.get_invalid_data_context(obj, *args)
        self.assertIn(
            "Additional properties are not allowed",
            context.exception.message)
        self.assertIn(
            "'{0}' was unexpected".format(key),
            context.exception.message)

    def assertRaisesRequiredProperty(self, obj, key):
        context = self.get_invalid_data_context(obj)
        self.assertIn(
            "Failed validating 'required' in schema",
            context.exception.message)
        self.assertIn(
            "'{0}' is a required property".format(key),
            context.exception.message)

    def assertRaisesInvalidType(self, obj, value, expected_value):
        context = self.get_invalid_data_context(obj)
        self.assertIn(
            "Failed validating 'type' in schema",
            context.exception.message)
        self.assertIn(
            "{0} is not of type {1}".format(value, expected_value),
            context.exception.message)

    def assertRaisesInvalidAnyOf(self, obj, passed_value, instance):
        context = self.get_invalid_data_context(obj)
        self.assertIn(
            "Failed validating 'anyOf' in schema",
            context.exception.message)
        err_msg = "{0} is not valid under any of the given schemas"
        self.assertIn(
            err_msg.format(passed_value),
            context.exception.message)
        self.assertIn(
            "On instance{0}".format(instance),
            context.exception.message)

    def assertRaisesInvalidOneOf(self, passed_data,
                                 incorrect, data_label, *args):
        """Check the exception raised within the error context.

        Test that the exception has the features pertaining to a
        failed 'oneOf' jsonschema validation case.

        :param passed_data: dict to be serialized and passed for validation
        :param incorrect: data value which doesn't pass the validation
            and is present in the error message of the original exception
        :param data_label: key name from passed_data whose value
            doesn't pass the validation; is present in the error message
        :param *args: list of additional arguments passed to validation code
        """
        context = self.get_invalid_data_context(passed_data, *args)
        self.assertIn(
            "Failed validating 'oneOf' in schema",
            context.exception.message)
        err_msg = "{0} is not valid under any of the given schemas"
        self.assertIn(
            err_msg.format(incorrect),
            context.exception.message)
        self.assertIn(
            "On instance{0}".format(data_label),
            context.exception.message)

    def assertRaisesInvalidEnum(self, obj, value, expected_value):
        context = self.get_invalid_data_context(obj)
        self.assertIn(
            "Failed validating 'enum' in schema",
            context.exception.message)
        self.assertIn(
            "{0} is not one of {1}".format(value, expected_value),
            context.exception.message)

    def assertRaisesTooLong(self, obj, stringified_values):
        context = self.get_invalid_data_context(obj)
        self.assertIn(
            "{0} is too long".format(stringified_values),
            context.exception.message)

    def assertRaisesTooShort(self, obj, stringified_values):
        context = self.get_invalid_data_context(obj)
        self.assertIn(
            "{0} is too short".format(stringified_values),
            context.exception.message)

    def assertRaisesNonUnique(self, obj, stringified_values):
        context = self.get_invalid_data_context(obj)
        self.assertIn(
            "{0} has non-unique elements".format(stringified_values),
            context.exception.message)

    def assertRaisesLessThanMinimum(self, obj, stringified_values):
        context = self.get_invalid_data_context(obj)
        self.assertIn(
            "{0} is less than the minimum".format(stringified_values),
            context.exception.message)

    def assertRaisesGreaterThanMaximum(self, obj, stringified_values):
        context = self.get_invalid_data_context(obj)
        self.assertIn(
            "{0} is greater than the maximum".format(stringified_values),
            context.exception.message)

    def assertRaisesNotMatchPattern(self, obj, stringified_values):
        context = self.get_invalid_data_context(obj)
        self.assertIn(
            "Failed validating 'pattern' in schema",
            context.exception.message)
        self.assertIn(
            "{0} does not match".format(stringified_values),
            context.exception.message)