From eee0d7897eb9ce914ac0f9ba55ab684aaff5bc15 Mon Sep 17 00:00:00 2001 From: xiaodongwang Date: Tue, 18 Mar 2014 17:02:26 -0700 Subject: [PATCH] add unittest for config_translator_callbacks Change-Id: I0bcfbc55a461689a480ec1460c8419c62152463e --- .../utils/config_translator_callbacks.py | 4 +- compass/db/model.py | 15 +- .../utils/test_config_translator_callbacks.py | 103 ++++ compass/tests/db/test_model.py | 522 ++++++++++++++++++ 4 files changed, 638 insertions(+), 6 deletions(-) create mode 100644 compass/tests/config_management/utils/test_config_translator_callbacks.py create mode 100644 compass/tests/db/test_model.py diff --git a/compass/config_management/utils/config_translator_callbacks.py b/compass/config_management/utils/config_translator_callbacks.py index 0e8fca7b..433b2250 100644 --- a/compass/config_management/utils/config_translator_callbacks.py +++ b/compass/config_management/utils/config_translator_callbacks.py @@ -28,14 +28,16 @@ def get_key_from_pattern( match = re.match(from_pattern, path) if not match: return None + group = match.groupdict() util.merge_dict(group, kwargs) try: translated_key = to_pattern % group - except Exception as error: + except KeyError as error: logging.error('failed to get translated key from %s %% %s', to_pattern, group) raise error + return translated_key diff --git a/compass/db/model.py b/compass/db/model.py index 16ae492c..79b3f2ea 100644 --- a/compass/db/model.py +++ b/compass/db/model.py @@ -180,8 +180,8 @@ class Machine(BASE): super(Machine, self).__init__(**kwargs) def __repr__(self): - return ''\ - % (self.mac, self.port, self.vlan, self.switch) + return '' % ( + self.mac, self.port, self.vlan, self.switch) class HostState(BASE): @@ -308,7 +308,10 @@ class Cluster(BASE): security_config = Column(Text) networking_config = Column(Text) partition_config = Column(Text) - adapter_id = Column(Integer, ForeignKey('adapter.id')) + adapter_id = Column(Integer, ForeignKey('adapter.id', + onupdate='CASCADE', + ondelete='SET NULL'), + nullable=True) raw_config = Column(Text) adapter = relationship("Adapter", backref=backref('clusters', lazy='dynamic')) @@ -486,8 +489,10 @@ class ClusterHost(BASE): __table_args__ = (UniqueConstraint('cluster_id', 'hostname', name='unique_host'),) - cluster = relationship("Cluster", backref=backref('hosts', lazy='dynamic')) - machine = relationship("Machine", backref=backref('host', uselist=False)) + cluster = relationship("Cluster", + backref=backref('hosts', lazy='dynamic')) + machine = relationship("Machine", + backref=backref('host', uselist=False)) def __init__(self, **kwargs): if 'hostname' not in kwargs or not kwargs['hostname']: diff --git a/compass/tests/config_management/utils/test_config_translator_callbacks.py b/compass/tests/config_management/utils/test_config_translator_callbacks.py new file mode 100644 index 00000000..0ab6b9e7 --- /dev/null +++ b/compass/tests/config_management/utils/test_config_translator_callbacks.py @@ -0,0 +1,103 @@ +#!/usr/bin/python +# +# Copyright 2014 Huawei Technologies Co. Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""test config_filter module. + + .. moduleauthor:: Xiaodong Wang +""" +import os +import unittest2 + + +os.environ['COMPASS_IGNORE_SETTING'] = 'true' + + +from compass.utils import setting_wrapper as setting +reload(setting) + + +from compass.config_management.utils import config_reference +from compass.config_management.utils import config_translator_callbacks +from compass.utils import flags +from compass.utils import logsetting + + +class TestGetKeyFromPattern(unittest2.TestCase): + """test get key from pattern.""" + + def setUp(self): + super(TestGetKeyFromPattern, self).setUp() + logsetting.init() + + def tearDown(self): + super(TestGetKeyFromPattern, self).tearDown() + + def test_get_key_from_pattern(self): + key = config_translator_callbacks.get_key_from_pattern( + None, '/networking/interfaces/management/ip', + to_pattern='macaddress-%(nic)s', + nic='eth0') + self.assertEqual(key, 'macaddress-eth0') + + def test_get_key_from_pattern_extra_keys_in_to_pattern(self): + self.assertRaises( + KeyError, config_translator_callbacks.get_key_from_pattern, + None, '/networking/interfaces/management/ip', + to_pattern='macaddress-%(nic)s') + + +class TestAddValue(unittest2.TestCase): + """test add value.""" + + def setUp(self): + super(TestAddValue, self).setUp() + logsetting.init() + + def tearDown(self): + super(TestAddValue, self).tearDown() + + def test_add_value_if_not_exist(self): + config = 'hello' + ref = config_reference.ConfigReference(config) + translated_config = '' + translated_ref = config_reference.ConfigReference(translated_config) + new_value = config_translator_callbacks.add_value( + ref, None, translated_ref, None, condition=True) + self.assertEqual(new_value, 'hello') + + def test_add_value(self): + config = 'hello' + ref = config_reference.ConfigReference(config) + translated_config = 'hi' + translated_ref = config_reference.ConfigReference(translated_config) + new_value = config_translator_callbacks.add_value( + ref, None, translated_ref, None, condition=True) + self.assertEqual(new_value, 'hi,hello') + + def test_ignore_add_value(self): + config = 'hello' + ref = config_reference.ConfigReference(config) + translated_config = 'hi' + translated_ref = config_reference.ConfigReference(translated_config) + new_value = config_translator_callbacks.add_value( + ref, None, translated_ref, None, condition=False) + self.assertEqual(new_value, 'hi') + + +if __name__ == '__main__': + flags.init() + logsetting.init() + unittest2.main() diff --git a/compass/tests/db/test_model.py b/compass/tests/db/test_model.py new file mode 100644 index 00000000..50e58740 --- /dev/null +++ b/compass/tests/db/test_model.py @@ -0,0 +1,522 @@ +# Copyright 2014 Huawei Technologies Co. Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""test util module. + + .. moduleauthor:: Xiaodong Wang +""" +import os +import sqlalchemy.exc +import unittest2 + + +os.environ['COMPASS_IGNORE_SETTING'] = 'true' + + +from compass.utils import setting_wrapper as setting +reload(setting) + + +from compass.db import database +from compass.db import model +from compass.utils import flags +from compass.utils import logsetting + + +class TestModel(unittest2.TestCase): + """Test database model.""" + + def setUp(self): + super(TestModel, self).setUp() + logsetting.init() + database.init('sqlite://') + database.create_db() + + def tearDown(self): + database.drop_db() + super(TestModel, self).tearDown() + + def test_switch_config(self): + with database.session() as session: + session.add(model.SwitchConfig( + ip='10.145.88.1', filter_port='123' + )) + + with database.session() as session: + switch_configs = session.query(model.SwitchConfig).all() + self.assertEqual(len(switch_configs), 1) + self.assertEqual(switch_configs[0].ip, '10.145.88.1') + self.assertEqual(switch_configs[0].filter_port, '123') + + def test_switch_config_ip_filterport_unique(self): + def _call(): + with database.session() as session: + session.add(model.SwitchConfig( + ip='10.145.88.1', filter_port='123')) + session.add(model.SwitchConfig( + ip='10.145.88.1', filter_port='123')) + + self.assertRaises(sqlalchemy.exc.IntegrityError, _call) + + def test_switch(self): + with database.session() as session: + switch = model.Switch(ip='10.145.88.1') + switch.credential = { + 'version': 'v2c', 'community': 'public' + } + switch.vendor = 'huawei' + session.add(switch) + + with database.session() as session: + switches = session.query(model.Switch).all() + self.assertEqual(len(switches), 1) + self.assertEqual(switches[0].ip, '10.145.88.1') + self.assertEqual( + switches[0].credential, { + 'version': 'v2c', 'community': 'public' + } + ) + self.assertEqual(switches[0].vendor, 'huawei') + + def test_switch_ip_unique(self): + def _call(): + with database.session() as session: + session.add(model.Switch(ip='10.145.88.1')) + session.add(model.Switch(ip='10.145.88.1')) + + self.assertRaises(sqlalchemy.exc.IntegrityError, _call) + + def test_machine_no_switch(self): + with database.session() as session: + session.add(model.Machine( + mac='00:00:00:01:02:03', + port='123', vlan=100)) + + with database.session() as session: + machines = session.query(model.Machine).all() + self.assertEqual(len(machines), 1) + self.assertEqual(machines[0].mac, '00:00:00:01:02:03') + self.assertEqual(machines[0].port, '123') + self.assertEqual(machines[0].vlan, 100) + self.assertIsNone(machines[0].switch) + + def test_machine_with_switch(self): + with database.session() as session: + switch = model.Switch(ip='192.168.1.1') + switch.machines.append( + model.Machine( + mac='00:00:00:01:02:03', + port='123', vlan=100) + ) + session.add(switch) + + with database.session() as session: + machines = session.query(model.Machine).all() + self.assertEqual(len(machines), 1) + self.assertEqual(machines[0].mac, '00:00:00:01:02:03') + self.assertEqual(machines[0].port, '123') + self.assertEqual(machines[0].vlan, 100) + self.assertIsNotNone(machines[0].switch) + + def test_machine_mac_switch_vlan_unique(self): + def _call(): + with database.session() as session: + machine1 = model.Machine( + mac='00:00:00:01:02:03', + port='123', + vlan=100 + ) + machine2 = model.Machine( + mac='00:00:00:01:02:03', + port='123', + vlan=100 + ) + switch = model.Switch(ip='192.168.1.1') + switch.machines = [machine1, machine2] + session.add(switch) + + self.assertRaises(sqlalchemy.exc.IntegrityError, _call) + + def test_machine_owned_by_one_switch(self): + with database.session() as session: + switch1 = model.Switch(ip='192.168.1.1') + switch2 = model.Switch(ip='192.168.1.2') + machine = model.Machine( + mac='00:00:00:01:02:03', + port='123', + vlan=100 + ) + switch1.machines = [machine] + switch2.machines = [machine] + session.add(switch1) + session.add(switch2) + + with database.session() as session: + machine = session.query(model.Machine).first() + self.assertEqual(machine.switch.ip, '192.168.1.2') + + def test_del_switch(self): + with database.session() as session: + switch = model.Switch(ip='192.68.1.1') + switch.machines = [model.Machine( + mac='00:00:00:01:02:03', + port='123', + vlan=100 + )] + session.add(switch) + + with database.session() as session: + session.query(model.Switch).delete() + + with database.session() as session: + machines = session.query(model.Machine).all() + self.assertEqual(len(machines), 1) + self.assertEqual(machines[0].mac, '00:00:00:01:02:03') + self.assertIsNone(machines[0].switch) + + def test_del_machine(self): + with database.session() as session: + switch = model.Switch(ip='192.68.1.1') + switch.machines = [model.Machine( + mac='00:00:00:01:02:03', + port='123', + vlan=100 + )] + session.add(switch) + + with database.session() as session: + session.query(model.Machine).delete() + + with database.session() as session: + switch = session.query(model.Switch).first() + self.assertEqual(switch.machines.count(), 0) + + def test_cluster_no_name(self): + with database.session() as session: + session.add(model.Cluster()) + + with database.session() as session: + clusters = session.query(model.Cluster).all() + self.assertEqual(len(clusters), 1) + self.assertIsNotNone(clusters[0].name) + self.assertFalse(clusters[0].name == '') + + def test_cluster_empty_name(self): + with database.session() as session: + session.add(model.Cluster(name='')) + + with database.session() as session: + clusters = session.query(model.Cluster).all() + self.assertEqual(len(clusters), 1) + self.assertIsNotNone(clusters[0].name) + self.assertFalse(clusters[0].name == '') + + def test_cluster_name_unique(self): + def _call(): + with database.session() as session: + session.add(model.Cluster(name='cluster1')) + session.add(model.Cluster(name='cluster1')) + + self.assertRaises(sqlalchemy.exc.IntegrityError, _call) + + def test_adapter(self): + with database.session() as session: + session.add(model.Adapter( + name='CentOS_openstack', + os='CentOS', target_system='Openstack')) + + with database.session() as session: + adapters = session.query(model.Adapter).all() + self.assertEqual(len(adapters), 1) + self.assertEqual(adapters[0].name, 'CentOS_openstack') + self.assertEqual(adapters[0].os, 'CentOS') + self.assertEqual(adapters[0].target_system, 'Openstack') + + def test_adapter_name_unique(self): + def _call(): + with database.session() as session: + session.add(model.Adapter( + name='CentOS_openstack', + os='CentOS6.4', target_system='Openstack1')) + session.add(model.Adapter( + name='CentOS_openstack', + os='CentOSi6.5', target_system='Openstack2')) + + self.assertRaises(sqlalchemy.exc.IntegrityError, _call) + + def test_adapter_os_target_system_unique(self): + def _call(): + with database.session() as session: + session.add(model.Adapter( + name='CentOS_openstack1', + os='CentOS', target_system='Openstack')) + session.add(model.Adapter( + name='CentOS_openstack2', + os='CentOS', target_system='Openstack')) + + self.assertRaises(sqlalchemy.exc.IntegrityError, _call) + + def test_cluster_adapter(self): + with database.session() as session: + adapter = model.Adapter( + name='CentOS_openstack', + os='CentOS', + target_system='openstack') + cluster = model.Cluster( + name='cluster1') + cluster.adapter = adapter + session.add(cluster) + + with database.session() as session: + adapter = session.query(model.Adapter).first() + self.assertEqual(adapter.clusters.count(), 1) + + def test_cluster_del(self): + with database.session() as session: + adapter = model.Adapter( + name='CentOS_openstack', + os='CentOS', + target_system='openstack') + cluster = model.Cluster( + name='cluster1') + cluster.adapter = adapter + session.add(cluster) + + with database.session() as session: + session.query(model.Cluster).delete() + + with database.session() as session: + adapters = session.query(model.Adapter).all() + self.assertEqual(len(adapters), 1) + + def test_adapter_del(self): + with database.session() as session: + adapter = model.Adapter( + name='CentOS_openstack', + os='CentOS', + target_system='openstack') + cluster = model.Cluster( + name='cluster1') + cluster.adapter = adapter + session.add(cluster) + + with database.session() as session: + session.query(model.Adapter).delete() + + with database.session() as session: + cluster = session.query(model.Cluster).first() + self.assertIsNone(cluster.adapter) + + def test_cluster_config(self): + with database.session() as session: + cluster = model.Cluster(name='cluster1') + cluster.security = { + 'user': 'abc', + 'password': '123' + } + cluster.networking = { + 'interface': 'eth0', + } + cluster.partition = '/tmp 20%' + session.add(cluster) + + with database.session() as session: + cluster = session.query(model.Cluster).first() + self.assertDictContainsSubset( + { + 'clustername': 'cluster1', + 'security': {'user': 'abc', 'password': '123'}, + 'networking': {'interface': 'eth0'}, + 'partition': '/tmp 20%' + }, cluster.config) + + def test_cluster_config_set(self): + with database.session() as session: + cluster = model.Cluster(name='cluster1') + cluster.config = { + 'security': { + 'user': 'abc', + 'password': '123' + }, + 'networking': { + 'interface': 'eth0', + }, + 'partition': '/tmp 20%' + } + session.add(cluster) + + with database.session() as session: + cluster = session.query(model.Cluster).first() + self.assertEqual( + cluster.security, + {'user': 'abc', 'password': '123'}) + self.assertEqual( + cluster.networking, + {'interface': 'eth0'}) + self.assertEqual( + cluster.partition, + '/tmp 20%') + + def test_clusterhost(self): + with database.session() as session: + host = model.ClusterHost( + hostname='host1') + host.cluster = model.Cluster( + name='cluster1') + host.machine = model.Machine( + mac='00:00:00:01:02:03', + port='123', + vlan=100) + host.machine.switch = model.Switch( + ip='192.168.1.1') + session.add(host) + + with database.session() as session: + host = session.query(model.ClusterHost).first() + self.assertEqual(host.cluster.name, 'cluster1') + self.assertEqual(host.machine.mac, '00:00:00:01:02:03') + self.assertEqual(host.machine.switch.ip, '192.168.1.1') + + def test_no_hostname(self): + with database.session() as session: + cluster = model.Cluster() + cluster.hosts = [model.ClusterHost()] + session.add(cluster) + + with database.session() as session: + hosts = session.query(model.ClusterHost).all() + self.assertEqual(len(hosts), 1) + self.assertIsNotNone(hosts[0].hostname) + self.assertFalse(hosts[0].hostname == '') + + def test_hostname_empty(self): + with database.session() as session: + cluster = model.Cluster() + cluster.hosts = [model.ClusterHost(hostname='')] + session.add(cluster) + + with database.session() as session: + hosts = session.query(model.ClusterHost).all() + self.assertEqual(len(hosts), 1) + self.assertIsNotNone(hosts[0].hostname) + self.assertFalse(hosts[0].hostname == '') + + def test_hostname_cluster_unique(self): + def _call(): + with database.session() as session: + cluster = model.Cluster(name='cluster1') + cluster.hosts = [ + model.ClusterHost(hostname='host1'), + model.ClusterHost(hostname='host1') + ] + session.add(cluster) + + self.assertRaises(sqlalchemy.exc.IntegrityError, _call) + + def test_clusterhost_delete_cluster(self): + with database.session() as session: + cluster = model.Cluster(name='cluster1') + cluster.hosts = [ + model.ClusterHost(hostname='host1') + ] + session.add(cluster) + + with database.session() as session: + session.query(model.Cluster).delete() + + with database.session() as session: + host = session.query(model.ClusterHost).first() + self.assertIsNone(host.cluster) + + def test_clusterhost_delete_machine(self): + with database.session() as session: + host = model.ClusterHost(hostname='host1') + host.machine = model.Machine( + mac='00:00:00:01:02:03', + port='123', + vlan=100) + session.add(host) + + with database.session() as session: + session.query(model.Machine).delete() + + with database.session() as session: + host = session.query(model.ClusterHost).first() + self.assertIsNone(host.machine) + + def test_clusterhost_delete_host(self): + with database.session() as session: + cluster = model.Cluster(name='cluster1') + host = model.ClusterHost(hostname='host1') + cluster.hosts = [host] + host.machine = model.Machine( + mac='00:00:00:01:02:03', + port='123', + vlan=100) + session.add(cluster) + + with database.session() as session: + session.query(model.ClusterHost).delete() + + with database.session() as session: + cluster = session.query(model.Cluster).first() + self.assertEqual(cluster.hosts.count(), 0) + machine = session.query(model.Machine).first() + self.assertIsNone(machine.host) + + def test_host_config(self): + with database.session() as session: + cluster = model.Cluster(name='cluster1') + host = model.ClusterHost( + hostname='host1') + host.machine = model.Machine( + mac='00:00:00:01:02:03', + port='123', + vlan=100) + host.machine.switch = model.Switch( + ip='192.168.1.1') + cluster.hosts = [host] + host.config = { + 'networking': { + 'interfaces': { + 'management': {'ip': '192.168.1.100'} + } + } + } + session.add(cluster) + + with database.session() as session: + host = session.query(model.ClusterHost).first() + self.assertDictContainsSubset( + { + 'hostname': 'host1', + 'clustername': 'cluster1', + 'networking': { + 'interfaces': { + 'management': { + 'mac': '00:00:00:01:02:03', + 'ip': '192.168.1.100' + } + } + }, + 'switch_port': '123', + 'vlan': 100, + 'switch_ip': '192.168.1.1' + }, host.config) + + +if __name__ == '__main__': + flags.init() + logsetting.init() + unittest2.main()