Cisco plugin db code cleanup, part II

Remove an unused table.
Prefix Cisco tablenames with cisco_.
Make full use of neutron model base class.
DRY out the nexusport binding queries.
Follow coding guidelines for imports.
Improve cisco/db/network_db_v2.py test coverage.
Improve cisco/db/nexus_db_v2.py test coverage.

Change-Id: I8ea110de37552176f2d9bcbff4ca3e0b65dfacdc
Closes-bug: #1202687
Partial-bug: #1190619
This commit is contained in:
HenryGessau 2013-07-29 00:08:50 -04:00
parent 537674c33e
commit 08112f087f
9 changed files with 628 additions and 245 deletions

View File

@ -0,0 +1,72 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2013 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
"""Cisco plugin db cleanup part II
Revision ID: 263772d65691
Revises: 35c7c198ddea
Create Date: 2013-07-29 02:31:26.646343
"""
# revision identifiers, used by Alembic.
revision = '263772d65691'
down_revision = '35c7c198ddea'
# Change to ['*'] if this migration applies to all plugins
migration_for_plugins = [
'neutron.plugins.cisco.network_plugin.PluginV2'
]
from alembic import op
import sqlalchemy as sa
from neutron.db import migration
def upgrade(active_plugin=None, options=None):
if not migration.should_run(active_plugin, migration_for_plugins):
return
if 'credentials' in sa.MetaData().tables:
op.rename_table('credentials', 'cisco_credentials')
if 'nexusport_bindings' in sa.MetaData().tables:
op.rename_table('nexusport_bindings', 'cisco_nexusport_bindings')
if 'qoss' in sa.MetaData().tables:
op.rename_table('qoss', 'cisco_qos_policies')
op.drop_table('cisco_vlan_ids')
def downgrade(active_plugin=None, options=None):
if not migration.should_run(active_plugin, migration_for_plugins):
return
op.create_table(
'cisco_vlan_ids',
sa.Column('vlan_id', sa.Integer, nullable=False),
sa.Column('vlan_used', sa.Boolean),
sa.PrimaryKeyConstraint('vlan_id'),
)
if 'cisco_credentials' in sa.MetaData().tables:
op.rename_table('cisco_credentials', 'credentials')
if 'cisco_nexusport_bindings' in sa.MetaData().tables:
op.rename_table('cisco_nexusport_bindings', 'nexusport_bindings')
if 'cisco_qos_policies' in sa.MetaData().tables:
op.rename_table('cisco_qos_policies', 'qoss')

View File

@ -74,8 +74,8 @@ class CredentialNameNotFound(exceptions.NeutronException):
class CredentialAlreadyExists(exceptions.NeutronException):
"""Credential ID already exists."""
message = _("Credential %(credential_id)s already exists "
"""Credential already exists."""
message = _("Credential %(credential_name)s already exists "
"for tenant %(tenant_id)s")

View File

@ -20,90 +20,19 @@ from sqlalchemy.orm import exc
from neutron.db import api as db
from neutron.openstack.common import log as logging
from neutron.openstack.common import uuidutils
from neutron.plugins.cisco.common import cisco_constants as const
from neutron.plugins.cisco.common import cisco_exceptions as c_exc
from neutron.plugins.cisco.db import network_models_v2
# Do NOT remove this import. It is required for all the models to be seen
# by db.initalize() when called from VirtualPhysicalSwitchModelV2.__init__.
from neutron.plugins.cisco.db import nexus_models_v2 # noqa
from neutron.plugins.openvswitch import ovs_models_v2
LOG = logging.getLogger(__name__)
def get_all_vlanids():
"""Gets all the vlanids."""
LOG.debug(_("get_all_vlanids() called"))
session = db.get_session()
return session.query(network_models_v2.VlanID).all()
def is_vlanid_used(vlan_id):
"""Checks if a vlanid is in use."""
LOG.debug(_("is_vlanid_used() called"))
session = db.get_session()
try:
vlanid = (session.query(network_models_v2.VlanID).
filter_by(vlan_id=vlan_id).one())
return vlanid["vlan_used"]
except exc.NoResultFound:
raise c_exc.VlanIDNotFound(vlan_id=vlan_id)
def release_vlanid(vlan_id):
"""Sets the vlanid state to be unused."""
LOG.debug(_("release_vlanid() called"))
session = db.get_session()
try:
vlanid = (session.query(network_models_v2.VlanID).
filter_by(vlan_id=vlan_id).one())
vlanid["vlan_used"] = False
session.merge(vlanid)
session.flush()
return vlanid["vlan_used"]
except exc.NoResultFound:
raise c_exc.VlanIDNotFound(vlan_id=vlan_id)
def delete_vlanid(vlan_id):
"""Deletes a vlanid entry from db."""
LOG.debug(_("delete_vlanid() called"))
session = db.get_session()
try:
vlanid = (session.query(network_models_v2.VlanID).
filter_by(vlan_id=vlan_id).one())
session.delete(vlanid)
session.flush()
return vlanid
except exc.NoResultFound:
pass
def reserve_vlanid():
"""Reserves the first unused vlanid."""
LOG.debug(_("reserve_vlanid() called"))
session = db.get_session()
try:
rvlan = (session.query(network_models_v2.VlanID).
filter_by(vlan_used=False).first())
if not rvlan:
raise exc.NoResultFound
rvlanid = (session.query(network_models_v2.VlanID).
filter_by(vlan_id=rvlan["vlan_id"]).one())
rvlanid["vlan_used"] = True
session.merge(rvlanid)
session.flush()
return rvlan["vlan_id"]
except exc.NoResultFound:
raise c_exc.VlanIDNotAvailable()
def get_all_vlanids_used():
"""Gets all the vlanids used."""
LOG.debug(_("get_all_vlanids() called"))
session = db.get_session()
return (session.query(network_models_v2.VlanID).
filter_by(vlan_used=True).all())
def get_all_qoss(tenant_id):
"""Lists all the qos to tenant associations."""
LOG.debug(_("get_all_qoss() called"))
@ -137,7 +66,10 @@ def add_qos(tenant_id, qos_name, qos_desc):
raise c_exc.QosNameAlreadyExists(qos_name=qos_name,
tenant_id=tenant_id)
except exc.NoResultFound:
qos = network_models_v2.QoS(tenant_id, qos_name, qos_desc)
qos = network_models_v2.QoS(qos_id=uuidutils.generate_uuid(),
tenant_id=tenant_id,
qos_name=qos_name,
qos_desc=qos_desc)
session.add(qos)
session.flush()
return qos
@ -217,8 +149,12 @@ def add_credential(tenant_id, credential_name, user_name, password):
raise c_exc.CredentialAlreadyExists(credential_name=credential_name,
tenant_id=tenant_id)
except exc.NoResultFound:
cred = network_models_v2.Credential(tenant_id, credential_name,
user_name, password)
cred = network_models_v2.Credential(
credential_id=uuidutils.generate_uuid(),
tenant_id=tenant_id,
credential_name=credential_name,
user_name=user_name,
password=password)
session.add(cred)
session.flush()
return cred

View File

@ -16,72 +16,32 @@
#
# @author: Rohit Agarwalla, Cisco Systems, Inc.
from sqlalchemy import Column, ForeignKey, Integer, String, Boolean
import sqlalchemy as sa
from neutron.db import model_base
from neutron.openstack.common import uuidutils
class VlanID(model_base.BASEV2):
"""Represents a vlan_id usage."""
__tablename__ = 'cisco_vlan_ids'
vlan_id = Column(Integer, primary_key=True)
vlan_used = Column(Boolean)
def __init__(self, vlan_id):
self.vlan_id = vlan_id
self.vlan_used = False
def __repr__(self):
return "<VlanID(%d,%s)>" % (self.vlan_id, self.vlan_used)
class QoS(model_base.BASEV2):
"""Represents QoS for a tenant."""
"""Represents QoS policies for a tenant."""
__tablename__ = 'qoss'
__tablename__ = 'cisco_qos_policies'
qos_id = Column(String(255))
tenant_id = Column(String(255), primary_key=True)
qos_name = Column(String(255), primary_key=True)
qos_desc = Column(String(255))
def __init__(self, tenant_id, qos_name, qos_desc):
self.qos_id = uuidutils.generate_uuid()
self.tenant_id = tenant_id
self.qos_name = qos_name
self.qos_desc = qos_desc
def __repr__(self):
return "<QoS(%s,%s,%s,%s)>" % (self.qos_id, self.tenant_id,
self.qos_name, self.qos_desc)
qos_id = sa.Column(sa.String(255))
tenant_id = sa.Column(sa.String(255), primary_key=True)
qos_name = sa.Column(sa.String(255), primary_key=True)
qos_desc = sa.Column(sa.String(255))
class Credential(model_base.BASEV2):
"""Represents credentials for a tenant."""
"""Represents credentials for a tenant to control Cisco switches."""
__tablename__ = 'credentials'
__tablename__ = 'cisco_credentials'
credential_id = Column(String(255))
tenant_id = Column(String(255), primary_key=True)
credential_name = Column(String(255), primary_key=True)
user_name = Column(String(255))
password = Column(String(255))
def __init__(self, tenant_id, credential_name, user_name, password):
self.credential_id = uuidutils.generate_uuid()
self.tenant_id = tenant_id
self.credential_name = credential_name
self.user_name = user_name
self.password = password
def __repr__(self):
return "<Credentials(%s,%s,%s,%s,%s)>" % (self.credential_id,
self.tenant_id,
self.credential_name,
self.user_name,
self.password)
credential_id = sa.Column(sa.String(255))
tenant_id = sa.Column(sa.String(255), primary_key=True)
credential_name = sa.Column(sa.String(255), primary_key=True)
user_name = sa.Column(sa.String(255))
password = sa.Column(sa.String(255))
class ProviderNetwork(model_base.BASEV2):
@ -89,8 +49,8 @@ class ProviderNetwork(model_base.BASEV2):
__tablename__ = 'cisco_provider_networks'
network_id = Column(String(36),
ForeignKey('networks.id', ondelete="CASCADE"),
primary_key=True)
network_type = Column(String(255), nullable=False)
segmentation_id = Column(Integer, nullable=False)
network_id = sa.Column(sa.String(36),
sa.ForeignKey('networks.id', ondelete="CASCADE"),
primary_key=True)
network_type = sa.Column(sa.String(255), nullable=False)
segmentation_id = sa.Column(sa.Integer, nullable=False)

View File

@ -18,7 +18,7 @@
# @author: Arvind Somya, Cisco Systems, Inc. (asomya@cisco.com)
#
from sqlalchemy.orm import exc
import sqlalchemy.orm.exc as sa_exc
import neutron.db.api as db
from neutron.openstack.common import log as logging
@ -29,48 +29,29 @@ from neutron.plugins.cisco.db import nexus_models_v2
LOG = logging.getLogger(__name__)
def get_all_nexusport_bindings():
"""Lists all the nexusport bindings."""
LOG.debug(_("get_all_nexusport_bindings() called"))
session = db.get_session()
return session.query(nexus_models_v2.NexusPortBinding).all()
def get_nexusport_binding(port_id, vlan_id, switch_ip, instance_id):
"""Lists a nexusport binding."""
LOG.debug(_("get_nexusport_binding() called"))
session = db.get_session()
filters = dict(port_id=port_id, vlan_id=vlan_id, switch_ip=switch_ip,
instance_id=instance_id)
bindings = (session.query(nexus_models_v2.NexusPortBinding).
filter_by(**filters).all())
if not bindings:
raise c_exc.NexusPortBindingNotFound(**filters)
return bindings
return _lookup_all_nexus_bindings(port_id=port_id,
vlan_id=vlan_id,
switch_ip=switch_ip,
instance_id=instance_id)
def get_nexusvlan_binding(vlan_id, switch_ip):
"""Lists a vlan and switch binding."""
LOG.debug(_("get_nexusvlan_binding() called"))
session = db.get_session()
filters = dict(vlan_id=vlan_id, switch_ip=switch_ip)
bindings = (session.query(nexus_models_v2.NexusPortBinding).
filter_by(**filters).all())
if not bindings:
raise c_exc.NexusPortBindingNotFound(**filters)
return bindings
return _lookup_all_nexus_bindings(vlan_id=vlan_id, switch_ip=switch_ip)
def add_nexusport_binding(port_id, vlan_id, switch_ip, instance_id):
"""Adds a nexusport binding."""
LOG.debug(_("add_nexusport_binding() called"))
session = db.get_session()
binding = nexus_models_v2.NexusPortBinding(
port_id, vlan_id, switch_ip, instance_id)
binding = nexus_models_v2.NexusPortBinding(port_id=port_id,
vlan_id=vlan_id,
switch_ip=switch_ip,
instance_id=instance_id)
session.add(binding)
session.flush()
return binding
@ -80,11 +61,11 @@ def remove_nexusport_binding(port_id, vlan_id, switch_ip, instance_id):
"""Removes a nexusport binding."""
LOG.debug(_("remove_nexusport_binding() called"))
session = db.get_session()
binding = (session.query(nexus_models_v2.NexusPortBinding).
filter_by(vlan_id=vlan_id).filter_by(switch_ip=switch_ip).
filter_by(port_id=port_id).
filter_by(instance_id=instance_id).all())
binding = _lookup_all_nexus_bindings(session=session,
vlan_id=vlan_id,
switch_ip=switch_ip,
port_id=port_id,
instance_id=instance_id)
for bind in binding:
session.delete(bind)
session.flush()
@ -93,46 +74,31 @@ def remove_nexusport_binding(port_id, vlan_id, switch_ip, instance_id):
def update_nexusport_binding(port_id, new_vlan_id):
"""Updates nexusport binding."""
if not new_vlan_id:
LOG.warning(_("update_nexusport_binding called with no vlan"))
return
LOG.debug(_("update_nexusport_binding called"))
session = db.get_session()
try:
binding = (session.query(nexus_models_v2.NexusPortBinding).
filter_by(port_id=port_id).one())
if new_vlan_id:
binding["vlan_id"] = new_vlan_id
session.merge(binding)
session.flush()
return binding
except exc.NoResultFound:
raise c_exc.NexusPortBindingNotFound(port_id=port_id)
binding = _lookup_one_nexus_binding(session=session, port_id=port_id)
binding.vlan_id = new_vlan_id
session.merge(binding)
session.flush()
return binding
def get_nexusvm_binding(vlan_id, instance_id):
"""Lists nexusvm bindings."""
LOG.debug(_("get_nexusvm_binding() called"))
session = db.get_session()
filters = dict(instance_id=instance_id, vlan_id=vlan_id)
binding = (session.query(nexus_models_v2.NexusPortBinding).
filter_by(**filters).first())
if not binding:
raise c_exc.NexusPortBindingNotFound(**filters)
return binding
return _lookup_first_nexus_binding(instance_id=instance_id,
vlan_id=vlan_id)
def get_port_vlan_switch_binding(port_id, vlan_id, switch_ip):
"""Lists nexusvm bindings."""
LOG.debug(_("get_port_vlan_switch_binding() called"))
session = db.get_session()
filters = dict(port_id=port_id, switch_ip=switch_ip, vlan_id=vlan_id)
bindings = (session.query(nexus_models_v2.NexusPortBinding).
filter_by(**filters).all())
if not bindings:
raise c_exc.NexusPortBindingNotFound(**filters)
return bindings
return _lookup_all_nexus_bindings(port_id=port_id,
switch_ip=switch_ip,
vlan_id=vlan_id)
def get_port_switch_bindings(port_id, switch_ip):
@ -140,25 +106,48 @@ def get_port_switch_bindings(port_id, switch_ip):
LOG.debug(_("get_port_switch_bindings() called, "
"port:'%(port_id)s', switch:'%(switch_ip)s'"),
{'port_id': port_id, 'switch_ip': switch_ip})
session = db.get_session()
try:
binding = (session.query(nexus_models_v2.NexusPortBinding).
filter_by(port_id=port_id).
filter_by(switch_ip=switch_ip).all())
return binding
except exc.NoResultFound:
return
return _lookup_all_nexus_bindings(port_id=port_id,
switch_ip=switch_ip)
except c_exc.NexusPortBindingNotFound:
pass
def get_nexussvi_bindings():
"""Lists nexus svi bindings."""
LOG.debug(_("get_nexussvi_bindings() called"))
session = db.get_session()
return _lookup_all_nexus_bindings(port_id='router')
filters = {'port_id': 'router'}
bindings = (session.query(nexus_models_v2.NexusPortBinding).
filter_by(**filters).all())
if not bindings:
raise c_exc.NexusPortBindingNotFound(**filters)
return bindings
def _lookup_nexus_bindings(query_type, session=None, **bfilter):
"""Look up 'query_type' Nexus bindings matching the filter.
:param query_type: 'all', 'one' or 'first'
:param session: db session
:param bfilter: filter for bindings query
:return: bindings if query gave a result, else
raise NexusPortBindingNotFound.
"""
if session is None:
session = db.get_session()
query_method = getattr(session.query(
nexus_models_v2.NexusPortBinding).filter_by(**bfilter), query_type)
try:
bindings = query_method()
if bindings:
return bindings
except sa_exc.NoResultFound:
pass
raise c_exc.NexusPortBindingNotFound(**bfilter)
def _lookup_all_nexus_bindings(session=None, **bfilter):
return _lookup_nexus_bindings('all', session, **bfilter)
def _lookup_one_nexus_binding(session=None, **bfilter):
return _lookup_nexus_bindings('one', session, **bfilter)
def _lookup_first_nexus_binding(session=None, **bfilter):
return _lookup_nexus_bindings('first', session, **bfilter)

View File

@ -15,7 +15,7 @@
# under the License.
# @author: Rohit Agarwalla, Cisco Systems, Inc.
from sqlalchemy import Column, Integer, String
import sqlalchemy as sa
from neutron.db import model_base
@ -23,25 +23,21 @@ from neutron.db import model_base
class NexusPortBinding(model_base.BASEV2):
"""Represents a binding of VM's to nexus ports."""
__tablename__ = "nexusport_bindings"
__tablename__ = "cisco_nexusport_bindings"
id = Column(Integer, primary_key=True, autoincrement=True)
port_id = Column(String(255))
vlan_id = Column(Integer, nullable=False)
switch_ip = Column(String(255))
instance_id = Column(String(255))
def __init__(self, port_id, vlan_id, switch_ip, instance_id):
self.port_id = port_id
self.vlan_id = vlan_id
self.switch_ip = switch_ip
self.instance_id = instance_id
id = sa.Column(sa.Integer, primary_key=True, autoincrement=True)
port_id = sa.Column(sa.String(255))
vlan_id = sa.Column(sa.Integer, nullable=False)
switch_ip = sa.Column(sa.String(255))
instance_id = sa.Column(sa.String(255))
def __repr__(self):
return "<NexusPortBinding (%s,%d, %s, %s)>" % \
(self.port_id, self.vlan_id, self.switch_ip, self.instance_id)
"""Just the binding, without the id key."""
return ("<NexusPortBinding(%s,%s,%s,%s)>" %
(self.port_id, self.vlan_id, self.switch_ip, self.instance_id))
def __eq__(self, other):
"""Compare only the binding, without the id key."""
return (
self.port_id == other.port_id and
self.vlan_id == other.vlan_id and

View File

@ -177,7 +177,7 @@ class NexusPlugin(L2DevicePluginBase):
"""Remove VLAN SVI from the Nexus Switch."""
# Grab switch_ip from database
switch_ip = nxos_db.get_nexusvm_binding(vlan_id,
router_id)['switch_ip']
router_id).switch_ip
# Delete the SVI interface from the switch
self._client.delete_vlan_svi(switch_ip, vlan_id)
@ -263,37 +263,37 @@ class NexusPlugin(L2DevicePluginBase):
auto_untrunk = conf.CISCO.provider_vlan_auto_trunk
LOG.debug("delete_network(): provider vlan %s" % vlan_id)
switch_ip = row['switch_ip']
switch_ip = row.switch_ip
nexus_port = None
if row['port_id'] != 'router':
nexus_port = row['port_id']
if row.port_id != 'router':
nexus_port = row.port_id
nxos_db.remove_nexusport_binding(row['port_id'], row['vlan_id'],
row['switch_ip'],
row['instance_id'])
nxos_db.remove_nexusport_binding(row.port_id, row.vlan_id,
row.switch_ip,
row.instance_id)
# Check for any other bindings with the same vlan_id and switch_ip
try:
nxos_db.get_nexusvlan_binding(row['vlan_id'], row['switch_ip'])
nxos_db.get_nexusvlan_binding(row.vlan_id, row.switch_ip)
except cisco_exc.NexusPortBindingNotFound:
try:
# Delete this vlan from this switch
if nexus_port and auto_untrunk:
self._client.disable_vlan_on_trunk_int(
switch_ip, row['vlan_id'], nexus_port)
switch_ip, row.vlan_id, nexus_port)
if auto_delete:
self._client.delete_vlan(switch_ip, row['vlan_id'])
self._client.delete_vlan(switch_ip, row.vlan_id)
except Exception:
# The delete vlan operation on the Nexus failed,
# so this delete_port request has failed. For
# consistency, roll back the Nexus database to what
# it was before this request.
with excutils.save_and_reraise_exception():
nxos_db.add_nexusport_binding(row['port_id'],
row['vlan_id'],
row['switch_ip'],
row['instance_id'])
nxos_db.add_nexusport_binding(row.port_id,
row.vlan_id,
row.switch_ip,
row.instance_id)
return row['instance_id']
return row.instance_id
def update_port(self, tenant_id, net_id, port_id, port_state, **kwargs):
"""Update port.

View File

@ -0,0 +1,240 @@
# Copyright (c) 2013 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import testtools
from neutron.db import api as db
from neutron.plugins.cisco.common import cisco_exceptions as c_exc
from neutron.plugins.cisco.db import network_db_v2 as cdb
from neutron.tests import base
class CiscoNetworkQosDbTest(base.BaseTestCase):
"""Unit tests for cisco.db.network_models_v2.QoS model."""
QosObj = collections.namedtuple('QosObj', 'tenant qname desc')
def setUp(self):
super(CiscoNetworkQosDbTest, self).setUp()
db.configure_db()
self.session = db.get_session()
self.addCleanup(db.clear_db)
def _qos_test_obj(self, tnum, qnum, desc=None):
"""Create a Qos test object from a pair of numbers."""
if desc is None:
desc = 'test qos %s-%s' % (str(tnum), str(qnum))
tenant = 'tenant_%s' % str(tnum)
qname = 'qos_%s' % str(qnum)
return self.QosObj(tenant, qname, desc)
def _assert_equal(self, qos, qos_obj):
self.assertEqual(qos.tenant_id, qos_obj.tenant)
self.assertEqual(qos.qos_name, qos_obj.qname)
self.assertEqual(qos.qos_desc, qos_obj.desc)
def test_qos_add_remove(self):
qos11 = self._qos_test_obj(1, 1)
qos = cdb.add_qos(qos11.tenant, qos11.qname, qos11.desc)
self._assert_equal(qos, qos11)
qos_id = qos.qos_id
qos = cdb.remove_qos(qos11.tenant, qos_id)
self._assert_equal(qos, qos11)
qos = cdb.remove_qos(qos11.tenant, qos_id)
self.assertIsNone(qos)
def test_qos_add_dup(self):
qos22 = self._qos_test_obj(2, 2)
qos = cdb.add_qos(qos22.tenant, qos22.qname, qos22.desc)
self._assert_equal(qos, qos22)
qos_id = qos.qos_id
with testtools.ExpectedException(c_exc.QosNameAlreadyExists):
cdb.add_qos(qos22.tenant, qos22.qname, "duplicate 22")
qos = cdb.remove_qos(qos22.tenant, qos_id)
self._assert_equal(qos, qos22)
qos = cdb.remove_qos(qos22.tenant, qos_id)
self.assertIsNone(qos)
def test_qos_get(self):
qos11 = self._qos_test_obj(1, 1)
qos11_id = cdb.add_qos(qos11.tenant, qos11.qname, qos11.desc).qos_id
qos21 = self._qos_test_obj(2, 1)
qos21_id = cdb.add_qos(qos21.tenant, qos21.qname, qos21.desc).qos_id
qos22 = self._qos_test_obj(2, 2)
qos22_id = cdb.add_qos(qos22.tenant, qos22.qname, qos22.desc).qos_id
qos = cdb.get_qos(qos11.tenant, qos11_id)
self._assert_equal(qos, qos11)
qos = cdb.get_qos(qos21.tenant, qos21_id)
self._assert_equal(qos, qos21)
qos = cdb.get_qos(qos21.tenant, qos22_id)
self._assert_equal(qos, qos22)
with testtools.ExpectedException(c_exc.QosNotFound):
cdb.get_qos(qos11.tenant, "dummyQosId")
with testtools.ExpectedException(c_exc.QosNotFound):
cdb.get_qos(qos11.tenant, qos21_id)
with testtools.ExpectedException(c_exc.QosNotFound):
cdb.get_qos(qos21.tenant, qos11_id)
qos_all_t1 = cdb.get_all_qoss(qos11.tenant)
self.assertEqual(len(qos_all_t1), 1)
qos_all_t2 = cdb.get_all_qoss(qos21.tenant)
self.assertEqual(len(qos_all_t2), 2)
qos_all_t3 = cdb.get_all_qoss("tenant3")
self.assertEqual(len(qos_all_t3), 0)
def test_qos_update(self):
qos11 = self._qos_test_obj(1, 1)
qos11_id = cdb.add_qos(qos11.tenant, qos11.qname, qos11.desc).qos_id
cdb.update_qos(qos11.tenant, qos11_id)
new_qname = "new qos name"
new_qos = cdb.update_qos(qos11.tenant, qos11_id, new_qname)
expected_qobj = self.QosObj(qos11.tenant, new_qname, qos11.desc)
self._assert_equal(new_qos, expected_qobj)
new_qos = cdb.get_qos(qos11.tenant, qos11_id)
self._assert_equal(new_qos, expected_qobj)
with testtools.ExpectedException(c_exc.QosNotFound):
cdb.update_qos(qos11.tenant, "dummyQosId")
class CiscoNetworkCredentialDbTest(base.BaseTestCase):
"""Unit tests for cisco.db.network_models_v2.Credential model."""
CredObj = collections.namedtuple('CredObj', 'tenant cname usr pwd')
def setUp(self):
super(CiscoNetworkCredentialDbTest, self).setUp()
db.configure_db()
self.session = db.get_session()
self.addCleanup(db.clear_db)
def _cred_test_obj(self, tnum, cnum):
"""Create a Credential test object from a pair of numbers."""
tenant = 'tenant_%s' % str(tnum)
cname = 'credential_%s' % str(cnum)
usr = 'User_%s_%s' % (str(tnum), str(cnum))
pwd = 'Password_%s_%s' % (str(tnum), str(cnum))
return self.CredObj(tenant, cname, usr, pwd)
def _assert_equal(self, credential, cred_obj):
self.assertEqual(credential.tenant_id, cred_obj.tenant)
self.assertEqual(credential.credential_name, cred_obj.cname)
self.assertEqual(credential.user_name, cred_obj.usr)
self.assertEqual(credential.password, cred_obj.pwd)
def test_credential_add_remove(self):
cred11 = self._cred_test_obj(1, 1)
cred = cdb.add_credential(
cred11.tenant, cred11.cname, cred11.usr, cred11.pwd)
self._assert_equal(cred, cred11)
cred_id = cred.credential_id
cred = cdb.remove_credential(cred11.tenant, cred_id)
self._assert_equal(cred, cred11)
cred = cdb.remove_credential(cred11.tenant, cred_id)
self.assertIsNone(cred)
def test_credential_add_dup(self):
cred22 = self._cred_test_obj(2, 2)
cred = cdb.add_credential(
cred22.tenant, cred22.cname, cred22.usr, cred22.pwd)
self._assert_equal(cred, cred22)
cred_id = cred.credential_id
with testtools.ExpectedException(c_exc.CredentialAlreadyExists):
cdb.add_credential(
cred22.tenant, cred22.cname, cred22.usr, cred22.pwd)
cred = cdb.remove_credential(cred22.tenant, cred_id)
self._assert_equal(cred, cred22)
cred = cdb.remove_credential(cred22.tenant, cred_id)
self.assertIsNone(cred)
def test_credential_get_id(self):
cred11 = self._cred_test_obj(1, 1)
cred11_id = cdb.add_credential(
cred11.tenant, cred11.cname, cred11.usr, cred11.pwd).credential_id
cred21 = self._cred_test_obj(2, 1)
cred21_id = cdb.add_credential(
cred21.tenant, cred21.cname, cred21.usr, cred21.pwd).credential_id
cred22 = self._cred_test_obj(2, 2)
cred22_id = cdb.add_credential(
cred22.tenant, cred22.cname, cred22.usr, cred22.pwd).credential_id
cred = cdb.get_credential(cred11.tenant, cred11_id)
self._assert_equal(cred, cred11)
cred = cdb.get_credential(cred21.tenant, cred21_id)
self._assert_equal(cred, cred21)
cred = cdb.get_credential(cred21.tenant, cred22_id)
self._assert_equal(cred, cred22)
with testtools.ExpectedException(c_exc.CredentialNotFound):
cdb.get_credential(cred11.tenant, "dummyCredentialId")
with testtools.ExpectedException(c_exc.CredentialNotFound):
cdb.get_credential(cred11.tenant, cred21_id)
with testtools.ExpectedException(c_exc.CredentialNotFound):
cdb.get_credential(cred21.tenant, cred11_id)
cred_all_t1 = cdb.get_all_credentials(cred11.tenant)
self.assertEqual(len(cred_all_t1), 1)
cred_all_t2 = cdb.get_all_credentials(cred21.tenant)
self.assertEqual(len(cred_all_t2), 2)
cred_all_t3 = cdb.get_all_credentials("dummyTenant")
self.assertEqual(len(cred_all_t3), 0)
def test_credential_get_name(self):
cred11 = self._cred_test_obj(1, 1)
cred11_id = cdb.add_credential(
cred11.tenant, cred11.cname, cred11.usr, cred11.pwd).credential_id
cred21 = self._cred_test_obj(2, 1)
cred21_id = cdb.add_credential(
cred21.tenant, cred21.cname, cred21.usr, cred21.pwd).credential_id
cred22 = self._cred_test_obj(2, 2)
cred22_id = cdb.add_credential(
cred22.tenant, cred22.cname, cred22.usr, cred22.pwd).credential_id
self.assertNotEqual(cred11_id, cred21_id)
self.assertNotEqual(cred11_id, cred22_id)
self.assertNotEqual(cred21_id, cred22_id)
cred = cdb.get_credential_name(cred11.tenant, cred11.cname)
self._assert_equal(cred, cred11)
cred = cdb.get_credential_name(cred21.tenant, cred21.cname)
self._assert_equal(cred, cred21)
cred = cdb.get_credential_name(cred22.tenant, cred22.cname)
self._assert_equal(cred, cred22)
with testtools.ExpectedException(c_exc.CredentialNameNotFound):
cdb.get_credential_name(cred11.tenant, "dummyCredentialName")
with testtools.ExpectedException(c_exc.CredentialNameNotFound):
cdb.get_credential_name(cred11.tenant, cred22.cname)
def test_credential_update(self):
cred11 = self._cred_test_obj(1, 1)
cred11_id = cdb.add_credential(
cred11.tenant, cred11.cname, cred11.usr, cred11.pwd).credential_id
cdb.update_credential(cred11.tenant, cred11_id)
new_usr = "new user name"
new_pwd = "new password"
new_credential = cdb.update_credential(
cred11.tenant, cred11_id, new_usr, new_pwd)
expected_cred = self.CredObj(
cred11.tenant, cred11.cname, new_usr, new_pwd)
self._assert_equal(new_credential, expected_cred)
new_credential = cdb.get_credential(cred11.tenant, cred11_id)
self._assert_equal(new_credential, expected_cred)
with testtools.ExpectedException(c_exc.CredentialNotFound):
cdb.update_credential(
cred11.tenant, "dummyCredentialId", new_usr, new_pwd)

View File

@ -0,0 +1,190 @@
# Copyright (c) 2013 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import testtools
from neutron.db import api as db
from neutron.plugins.cisco.common import cisco_exceptions as c_exc
from neutron.plugins.cisco.db import nexus_db_v2 as nxdb
from neutron.tests import base
class CiscoNexusDbTest(base.BaseTestCase):
"""Unit tests for cisco.db.nexus_models_v2.NexusPortBinding model."""
NpbObj = collections.namedtuple('NpbObj', 'port vlan switch instance')
def setUp(self):
super(CiscoNexusDbTest, self).setUp()
db.configure_db()
self.session = db.get_session()
self.addCleanup(db.clear_db)
def _npb_test_obj(self, pnum, vnum, switch=None, instance=None):
"""Create a Nexus port binding test object from a pair of numbers."""
if pnum is 'router':
port = pnum
else:
port = '1/%s' % str(pnum)
vlan = str(vnum)
if switch is None:
switch = '10.9.8.7'
if instance is None:
instance = 'instance_%s_%s' % (str(pnum), str(vnum))
return self.NpbObj(port, vlan, switch, instance)
def _assert_equal(self, npb, npb_obj):
self.assertEqual(npb.port_id, npb_obj.port)
self.assertEqual(int(npb.vlan_id), int(npb_obj.vlan))
self.assertEqual(npb.switch_ip, npb_obj.switch)
self.assertEqual(npb.instance_id, npb_obj.instance)
def _add_to_db(self, npbs):
for npb in npbs:
nxdb.add_nexusport_binding(
npb.port, npb.vlan, npb.switch, npb.instance)
def test_nexusportbinding_add_remove(self):
npb11 = self._npb_test_obj(10, 100)
npb = nxdb.add_nexusport_binding(
npb11.port, npb11.vlan, npb11.switch, npb11.instance)
self._assert_equal(npb, npb11)
npb = nxdb.remove_nexusport_binding(
npb11.port, npb11.vlan, npb11.switch, npb11.instance)
self.assertEqual(len(npb), 1)
self._assert_equal(npb[0], npb11)
with testtools.ExpectedException(c_exc.NexusPortBindingNotFound):
nxdb.remove_nexusport_binding(
npb11.port, npb11.vlan, npb11.switch, npb11.instance)
def test_nexusportbinding_get(self):
npb11 = self._npb_test_obj(10, 100)
npb21 = self._npb_test_obj(20, 100)
npb22 = self._npb_test_obj(20, 200)
self._add_to_db([npb11, npb21, npb22])
npb = nxdb.get_nexusport_binding(
npb11.port, npb11.vlan, npb11.switch, npb11.instance)
self.assertEqual(len(npb), 1)
self._assert_equal(npb[0], npb11)
npb = nxdb.get_nexusport_binding(
npb21.port, npb21.vlan, npb21.switch, npb21.instance)
self.assertEqual(len(npb), 1)
self._assert_equal(npb[0], npb21)
npb = nxdb.get_nexusport_binding(
npb22.port, npb22.vlan, npb22.switch, npb22.instance)
self.assertEqual(len(npb), 1)
self._assert_equal(npb[0], npb22)
with testtools.ExpectedException(c_exc.NexusPortBindingNotFound):
nxdb.get_nexusport_binding(
npb21.port, npb21.vlan, npb21.switch, "dummyInstance")
def test_nexusvlanbinding_get(self):
npb11 = self._npb_test_obj(10, 100)
npb21 = self._npb_test_obj(20, 100)
npb22 = self._npb_test_obj(20, 200)
self._add_to_db([npb11, npb21, npb22])
npb_all_v100 = nxdb.get_nexusvlan_binding(npb11.vlan, npb11.switch)
self.assertEqual(len(npb_all_v100), 2)
npb_v200 = nxdb.get_nexusvlan_binding(npb22.vlan, npb22.switch)
self.assertEqual(len(npb_v200), 1)
self._assert_equal(npb_v200[0], npb22)
with testtools.ExpectedException(c_exc.NexusPortBindingNotFound):
nxdb.get_nexusvlan_binding(npb21.vlan, "dummySwitch")
def test_nexusvmbinding_get(self):
npb11 = self._npb_test_obj(10, 100)
npb21 = self._npb_test_obj(20, 100)
npb22 = self._npb_test_obj(20, 200)
self._add_to_db([npb11, npb21, npb22])
npb = nxdb.get_nexusvm_binding(npb21.vlan, npb21.instance)
self._assert_equal(npb, npb21)
npb = nxdb.get_nexusvm_binding(npb22.vlan, npb22.instance)
self._assert_equal(npb, npb22)
with testtools.ExpectedException(c_exc.NexusPortBindingNotFound):
nxdb.get_nexusvm_binding(npb21.vlan, "dummyInstance")
def test_nexusportvlanswitchbinding_get(self):
npb11 = self._npb_test_obj(10, 100)
npb21 = self._npb_test_obj(20, 100)
self._add_to_db([npb11, npb21])
npb = nxdb.get_port_vlan_switch_binding(
npb11.port, npb11.vlan, npb11.switch)
self.assertEqual(len(npb), 1)
self._assert_equal(npb[0], npb11)
with testtools.ExpectedException(c_exc.NexusPortBindingNotFound):
nxdb.get_port_vlan_switch_binding(
npb21.port, npb21.vlan, "dummySwitch")
def test_nexusportswitchbinding_get(self):
npb11 = self._npb_test_obj(10, 100)
npb21 = self._npb_test_obj(20, 100, switch='2.2.2.2')
npb22 = self._npb_test_obj(20, 200, switch='2.2.2.2')
self._add_to_db([npb11, npb21, npb22])
npb = nxdb.get_port_switch_bindings(npb11.port, npb11.switch)
self.assertEqual(len(npb), 1)
self._assert_equal(npb[0], npb11)
npb_all_p20 = nxdb.get_port_switch_bindings(npb21.port, npb21.switch)
self.assertEqual(len(npb_all_p20), 2)
npb = nxdb.get_port_switch_bindings(npb21.port, "dummySwitch")
self.assertIsNone(npb)
def test_nexussvibinding_get(self):
npbr1 = self._npb_test_obj('router', 100)
npb21 = self._npb_test_obj(20, 100)
self._add_to_db([npbr1, npb21])
npb_svi = nxdb.get_nexussvi_bindings()
self.assertEqual(len(npb_svi), 1)
self._assert_equal(npb_svi[0], npbr1)
npbr2 = self._npb_test_obj('router', 200)
self._add_to_db([npbr2])
npb_svi = nxdb.get_nexussvi_bindings()
self.assertEqual(len(npb_svi), 2)
def test_nexusbinding_update(self):
npb11 = self._npb_test_obj(10, 100, switch='1.1.1.1', instance='test')
npb21 = self._npb_test_obj(20, 100, switch='1.1.1.1', instance='test')
self._add_to_db([npb11, npb21])
npb_all_v100 = nxdb.get_nexusvlan_binding(npb11.vlan, '1.1.1.1')
self.assertEqual(len(npb_all_v100), 2)
npb22 = self._npb_test_obj(20, 200, switch='1.1.1.1', instance='test')
npb = nxdb.update_nexusport_binding(npb21.port, 200)
self._assert_equal(npb, npb22)
npb_all_v100 = nxdb.get_nexusvlan_binding(npb11.vlan, '1.1.1.1')
self.assertEqual(len(npb_all_v100), 1)
self._assert_equal(npb_all_v100[0], npb11)
npb = nxdb.update_nexusport_binding(npb21.port, 0)
self.assertIsNone(npb)
npb33 = self._npb_test_obj(30, 300, switch='1.1.1.1', instance='test')
with testtools.ExpectedException(c_exc.NexusPortBindingNotFound):
nxdb.update_nexusport_binding(npb33.port, 200)