Cisco plugin db code cleanup

Remove redundant model classes.
Follow code style guidelines for imports.

Fixes bug 1200723

Change-Id: I2949065bb1d053805032e0a429f77e93dfc61a55
This commit is contained in:
HenryGessau 2013-07-12 13:49:03 -04:00
parent bd18990d78
commit eafcc4107c
7 changed files with 4 additions and 974 deletions

View File

@ -1,313 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 Nicira Networks, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# @author: Somik Behera, Nicira Networks, Inc.
# @author: Brad Hall, Nicira Networks, Inc.
# @author: Dan Wendlandt, Nicira Networks, Inc.
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, exc, joinedload
from neutron.common import constants
from neutron.common import exceptions as q_exc
from neutron.plugins.cisco.db import models
_ENGINE = None
_MAKER = None
BASE = models.BASE
def configure_db(options):
"""Configure database.
Establish the database, create an engine if needed, and register the
models.
:param options: Mapping of configuration options
"""
global _ENGINE
if not _ENGINE:
_ENGINE = create_engine(options['sql_connection'],
echo=False,
echo_pool=True,
pool_recycle=3600)
register_models()
def clear_db():
global _ENGINE
assert _ENGINE
for table in reversed(BASE.metadata.sorted_tables):
_ENGINE.execute(table.delete())
def get_session(autocommit=True, expire_on_commit=False):
"""Helper method to grab session."""
global _MAKER, _ENGINE
if not _MAKER:
assert _ENGINE
_MAKER = sessionmaker(bind=_ENGINE,
autocommit=autocommit,
expire_on_commit=expire_on_commit)
return _MAKER()
def register_models():
"""Register Models and create properties."""
global _ENGINE
assert _ENGINE
BASE.metadata.create_all(_ENGINE)
def unregister_models():
"""Unregister Models, useful clearing out data before testing."""
global _ENGINE
assert _ENGINE
BASE.metadata.drop_all(_ENGINE)
def network_create(tenant_id, name):
session = get_session()
with session.begin():
net = models.Network(tenant_id, name)
session.add(net)
session.flush()
return net
def network_list(tenant_id):
session = get_session()
return (session.query(models.Network).
options(joinedload(models.Network.ports)).
filter_by(tenant_id=tenant_id).
all())
def network_id(net_name):
session = get_session()
networks = (session.query(models.Network).
options(joinedload(models.Network.ports)).
filter_by(name=net_name).
all())
if networks:
return networks
raise q_exc.NetworkNotFound(net_name=net_name)
def network_get(net_id):
session = get_session()
try:
return session.query(models.Network).\
options(joinedload(models.Network.ports)). \
filter_by(uuid=net_id).\
one()
except exc.NoResultFound:
raise q_exc.NetworkNotFound(net_id=net_id)
def network_update(net_id, tenant_id, **kwargs):
session = get_session()
net = network_get(net_id)
for key in kwargs.keys():
net[key] = kwargs[key]
session.merge(net)
session.flush()
return net
def network_destroy(net_id):
session = get_session()
try:
net = (session.query(models.Network).
filter_by(uuid=net_id).
one())
session.delete(net)
session.flush()
return net
except exc.NoResultFound:
raise q_exc.NetworkNotFound(net_id=net_id)
def validate_network_ownership(tenant_id, net_id):
session = get_session()
try:
return (session.query(models.Network).
filter_by(uuid=net_id).
filter_by(tenant_id=tenant_id).
one())
except exc.NoResultFound:
raise q_exc.NetworkNotFound(net_id=net_id)
def port_create(net_id, state=None):
# confirm network exists
network_get(net_id)
session = get_session()
with session.begin():
port = models.Port(net_id)
port['state'] = state or 'DOWN'
session.add(port)
session.flush()
return port
def port_list(net_id):
session = get_session()
return (session.query(models.Port).
options(joinedload(models.Port.network)).
filter_by(network_id=net_id).
all())
def port_get(net_id, port_id):
# confirm network exists
network_get(net_id)
session = get_session()
try:
return (session.query(models.Port).
filter_by(uuid=port_id).
filter_by(network_id=net_id).
one())
except exc.NoResultFound:
raise q_exc.PortNotFound(net_id=net_id, port_id=port_id)
def port_update(port_id, net_id, **kwargs):
# confirm network exists
network_get(net_id)
port = port_get(net_id, port_id)
session = get_session()
for key in kwargs.keys():
if key == "state":
if kwargs[key] not in (constants.PORT_STATUS_ACTIVE,
constants.PORT_STATUS_DOWN):
raise q_exc.StateInvalid(port_state=kwargs[key])
port[key] = kwargs[key]
session.merge(port)
session.flush()
return port
def port_set_attachment(net_id, port_id, new_interface_id):
# confirm network exists
network_get(net_id)
session = get_session()
port = port_get(net_id, port_id)
if new_interface_id != "":
# We are setting, not clearing, the attachment-id
if port['interface_id']:
raise q_exc.PortInUse(net_id=net_id, port_id=port_id,
device_id=port['interface_id'])
try:
port = (session.query(models.Port).
filter_by(interface_id=new_interface_id).
one())
raise q_exc.AlreadyAttached(net_id=net_id,
port_id=port_id,
att_id=new_interface_id,
att_port_id=port['uuid'])
except exc.NoResultFound:
# this is what should happen
pass
port.interface_id = new_interface_id
session.merge(port)
session.flush()
return port
def port_unset_attachment(net_id, port_id):
# confirm network exists
network_get(net_id)
session = get_session()
port = port_get(net_id, port_id)
port.interface_id = None
session.merge(port)
session.flush()
return port
def port_destroy(net_id, port_id):
# confirm network exists
network_get(net_id)
session = get_session()
try:
port = (session.query(models.Port).
filter_by(uuid=port_id).
filter_by(network_id=net_id).
one())
if port['interface_id']:
raise q_exc.PortInUse(net_id=net_id, port_id=port_id,
device_id=port['interface_id'])
session.delete(port)
session.flush()
return port
except exc.NoResultFound:
raise q_exc.PortNotFound(port_id=port_id)
#methods using just port_id
def port_get_by_id(port_id):
session = get_session()
try:
return (session.query(models.Port).
filter_by(uuid=port_id).one())
except exc.NoResultFound:
raise q_exc.PortNotFound(port_id=port_id)
def port_set_attachment_by_id(port_id, new_interface_id):
session = get_session()
port = port_get_by_id(port_id)
if new_interface_id != "":
if port['interface_id']:
raise q_exc.PortInUse(port_id=port_id,
device_id=port['interface_id'])
try:
port = session.query(models.Port).filter_by(
interface_id=new_interface_id).one()
raise q_exc.AlreadyAttached(port_id=port_id,
att_id=new_interface_id,
att_port_id=port['uuid'])
except exc.NoResultFound:
pass
port.interface_id = new_interface_id
session.merge(port)
session.flush()
return port
def port_unset_attachment_by_id(port_id):
session = get_session()
port = port_get_by_id(port_id)
port.interface_id = None
session.merge(port)
session.flush()
return port
def validate_port_ownership(tenant_id, net_id, port_id, session=None):
validate_network_ownership(tenant_id, net_id)
port_get(net_id, port_id)

View File

@ -1,350 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011, Cisco Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# @author: Rohit Agarwalla, Cisco Systems, Inc.
from sqlalchemy.orm import exc
from neutron.common import exceptions as q_exc
from neutron.openstack.common import log as logging
from neutron.plugins.cisco.common import cisco_exceptions as c_exc
from neutron.plugins.cisco.common import config
from neutron.plugins.cisco.db import l2network_models
import neutron.plugins.cisco.db.api as db
LOG = logging.getLogger(__name__)
def initialize():
"""Establish database connection and load models."""
db.configure_db()
def create_vlanids():
"""Prepopulates the vlan_bindings table."""
LOG.debug(_("create_vlanids() called"))
session = db.get_session()
try:
vlanid = session.query(l2network_models.VlanID).one()
except exc.MultipleResultsFound:
pass
except exc.NoResultFound:
start = int(config.CISCO.vlan_start)
end = int(config.CISCO.vlan_end)
while start <= end:
vlanid = l2network_models.VlanID(start)
session.add(vlanid)
start += 1
session.flush()
return
def get_all_vlanids():
"""Gets all the vlanids."""
LOG.debug(_("get_all_vlanids() called"))
session = db.get_session()
return session.query(l2network_models.VlanID).all()
def is_vlanid_used(vlan_id):
"""Checks if a vlanid is in use."""
LOG.debug(_("is_vlanid_used() called"))
session = db.get_session()
try:
vlanid = (session.query(l2network_models.VlanID).
filter_by(vlan_id=vlan_id).one())
return vlanid["vlan_used"]
except exc.NoResultFound:
raise c_exc.VlanIDNotFound(vlan_id=vlan_id)
def release_vlanid(vlan_id):
"""Sets the vlanid state to be unused."""
LOG.debug(_("release_vlanid() called"))
session = db.get_session()
try:
vlanid = (session.query(l2network_models.VlanID).
filter_by(vlan_id=vlan_id).one())
vlanid["vlan_used"] = False
session.merge(vlanid)
session.flush()
return vlanid["vlan_used"]
except exc.NoResultFound:
raise c_exc.VlanIDNotFound(vlan_id=vlan_id)
return
def delete_vlanid(vlan_id):
"""Deletes a vlanid entry from db."""
LOG.debug(_("delete_vlanid() called"))
session = db.get_session()
try:
vlanid = (session.query(l2network_models.VlanID).
filter_by(vlan_id=vlan_id).one())
session.delete(vlanid)
session.flush()
return vlanid
except exc.NoResultFound:
pass
def reserve_vlanid():
"""Reserves the first unused vlanid."""
LOG.debug(_("reserve_vlanid() called"))
session = db.get_session()
try:
rvlan = (session.query(l2network_models.VlanID).
filter_by(vlan_used=False).first())
if not rvlan:
raise exc.NoResultFound
rvlanid = (session.query(l2network_models.VlanID).
filter_by(vlan_id=rvlan["vlan_id"]).one())
rvlanid["vlan_used"] = True
session.merge(rvlanid)
session.flush()
return rvlan["vlan_id"]
except exc.NoResultFound:
raise c_exc.VlanIDNotAvailable()
def get_all_vlanids_used():
"""Gets all the vlanids used."""
LOG.debug(_("get_all_vlanids() called"))
session = db.get_session()
return (session.query(l2network_models.VlanID).
filter_by(vlan_used=True).all())
def get_all_vlan_bindings():
"""Lists all the vlan to network associations."""
LOG.debug(_("get_all_vlan_bindings() called"))
session = db.get_session()
return session.query(l2network_models.VlanBinding).all()
def get_vlan_binding(netid):
"""Lists the vlan given a network_id."""
LOG.debug(_("get_vlan_binding() called"))
session = db.get_session()
try:
binding = (session.query(l2network_models.VlanBinding).
filter_by(network_id=netid).one())
return binding
except exc.NoResultFound:
raise q_exc.NetworkNotFound(net_id=netid)
def add_vlan_binding(vlanid, vlanname, netid):
"""Adds a vlan to network association."""
LOG.debug(_("add_vlan_binding() called"))
session = db.get_session()
try:
binding = (session.query(l2network_models.VlanBinding).
filter_by(vlan_id=vlanid).one())
raise c_exc.NetworkVlanBindingAlreadyExists(vlan_id=vlanid,
network_id=netid)
except exc.NoResultFound:
binding = l2network_models.VlanBinding(vlanid, vlanname, netid)
session.add(binding)
session.flush()
return binding
def remove_vlan_binding(netid):
"""Removes a vlan to network association."""
LOG.debug(_("remove_vlan_binding() called"))
session = db.get_session()
try:
binding = (session.query(l2network_models.VlanBinding).
filter_by(network_id=netid).one())
session.delete(binding)
session.flush()
return binding
except exc.NoResultFound:
pass
def update_vlan_binding(netid, newvlanid=None, newvlanname=None):
"""Updates a vlan to network association."""
LOG.debug(_("update_vlan_binding() called"))
session = db.get_session()
try:
binding = (session.query(l2network_models.VlanBinding).
filter_by(network_id=netid).one())
if newvlanid:
binding["vlan_id"] = newvlanid
if newvlanname:
binding["vlan_name"] = newvlanname
session.merge(binding)
session.flush()
return binding
except exc.NoResultFound:
raise q_exc.NetworkNotFound(net_id=netid)
def get_all_qoss(tenant_id):
"""Lists all the qos to tenant associations."""
LOG.debug(_("get_all_qoss() called"))
session = db.get_session()
return (session.query(l2network_models.QoS).
filter_by(tenant_id=tenant_id).all())
def get_qos(tenant_id, qos_id):
"""Lists the qos given a tenant_id and qos_id."""
LOG.debug(_("get_qos() called"))
session = db.get_session()
try:
qos = (session.query(l2network_models.QoS).
filter_by(tenant_id=tenant_id).
filter_by(qos_id=qos_id).one())
return qos
except exc.NoResultFound:
raise c_exc.QosNotFound(qos_id=qos_id,
tenant_id=tenant_id)
def add_qos(tenant_id, qos_name, qos_desc):
"""Adds a qos to tenant association."""
LOG.debug(_("add_qos() called"))
session = db.get_session()
try:
qos = (session.query(l2network_models.QoS).
filter_by(tenant_id=tenant_id).
filter_by(qos_name=qos_name).one())
raise c_exc.QosNameAlreadyExists(qos_name=qos_name,
tenant_id=tenant_id)
except exc.NoResultFound:
qos = l2network_models.QoS(tenant_id, qos_name, qos_desc)
session.add(qos)
session.flush()
return qos
def remove_qos(tenant_id, qos_id):
"""Removes a qos to tenant association."""
session = db.get_session()
try:
qos = (session.query(l2network_models.QoS).
filter_by(tenant_id=tenant_id).
filter_by(qos_id=qos_id).one())
session.delete(qos)
session.flush()
return qos
except exc.NoResultFound:
pass
def update_qos(tenant_id, qos_id, new_qos_name=None):
"""Updates a qos to tenant association."""
session = db.get_session()
try:
qos = (session.query(l2network_models.QoS).
filter_by(tenant_id=tenant_id).
filter_by(qos_id=qos_id).one())
if new_qos_name:
qos["qos_name"] = new_qos_name
session.merge(qos)
session.flush()
return qos
except exc.NoResultFound:
raise c_exc.QosNotFound(qos_id=qos_id,
tenant_id=tenant_id)
def get_all_credentials(tenant_id):
"""Lists all the creds for a tenant."""
session = db.get_session()
return (session.query(l2network_models.Credential).
filter_by(tenant_id=tenant_id).all())
def get_credential(tenant_id, credential_id):
"""Lists the creds for given a cred_id and tenant_id."""
session = db.get_session()
try:
cred = (session.query(l2network_models.Credential).
filter_by(tenant_id=tenant_id).
filter_by(credential_id=credential_id).one())
return cred
except exc.NoResultFound:
raise c_exc.CredentialNotFound(credential_id=credential_id,
tenant_id=tenant_id)
def get_credential_name(tenant_id, credential_name):
"""Lists the creds for given a cred_name and tenant_id."""
session = db.get_session()
try:
cred = (session.query(l2network_models.Credential).
filter_by(tenant_id=tenant_id).
filter_by(credential_name=credential_name).one())
return cred
except exc.NoResultFound:
raise c_exc.CredentialNameNotFound(credential_name=credential_name,
tenant_id=tenant_id)
def add_credential(tenant_id, credential_name, user_name, password):
"""Adds a qos to tenant association."""
session = db.get_session()
try:
cred = (session.query(l2network_models.Credential).
filter_by(tenant_id=tenant_id).
filter_by(credential_name=credential_name).one())
raise c_exc.CredentialAlreadyExists(credential_name=credential_name,
tenant_id=tenant_id)
except exc.NoResultFound:
cred = l2network_models.Credential(tenant_id, credential_name,
user_name, password)
session.add(cred)
session.flush()
return cred
def remove_credential(tenant_id, credential_id):
"""Removes a credential from a tenant."""
session = db.get_session()
try:
cred = (session.query(l2network_models.Credential).
filter_by(tenant_id=tenant_id).
filter_by(credential_id=credential_id).one())
session.delete(cred)
session.flush()
return cred
except exc.NoResultFound:
pass
def update_credential(tenant_id, credential_id,
new_user_name=None, new_password=None):
"""Updates a credential for a tenant."""
session = db.get_session()
try:
cred = (session.query(l2network_models.Credential).
filter_by(tenant_id=tenant_id).
filter_by(credential_id=credential_id).one())
if new_user_name:
cred["user_name"] = new_user_name
if new_password:
cred["password"] = new_password
session.merge(cred)
session.flush()
return cred
except exc.NoResultFound:
raise c_exc.CredentialNotFound(credential_id=credential_id,
tenant_id=tenant_id)

View File

@ -1,150 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011, Cisco Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# @author: Rohit Agarwalla, Cisco Systems, Inc.
from sqlalchemy import Column, Integer, String, Boolean
from sqlalchemy.orm import object_mapper
from neutron.openstack.common import uuidutils
from neutron.plugins.cisco.db.models import BASE
class L2NetworkBase(object):
"""Base class for L2Network Models."""
__table_args__ = {'mysql_engine': 'InnoDB'}
def __setitem__(self, key, value):
"""Internal Dict set method."""
setattr(self, key, value)
def __getitem__(self, key):
"""Internal Dict get method."""
return getattr(self, key)
def get(self, key, default=None):
"""Dict get method."""
return getattr(self, key, default)
def __iter__(self):
"""Iterate over table columns."""
self._i = iter(object_mapper(self).columns)
return self
def next(self):
"""Next method for the iterator."""
n = self._i.next().name
return n, getattr(self, n)
def update(self, values):
"""Make the model object behave like a dict."""
for k, v in values.iteritems():
setattr(self, k, v)
def iteritems(self):
"""Make the model object behave like a dict.
Includes attributes from joins.
"""
local = dict(self)
joined = dict([(k, v) for k, v in self.__dict__.iteritems()
if not k[0] == '_'])
local.update(joined)
return local.iteritems()
class VlanID(BASE, L2NetworkBase):
"""Represents a vlan_id usage."""
__tablename__ = 'vlan_ids'
vlan_id = Column(Integer, primary_key=True)
vlan_used = Column(Boolean)
def __init__(self, vlan_id):
self.vlan_id = vlan_id
self.vlan_used = False
def __repr__(self):
return "<VlanID(%d,%s)>" % (self.vlan_id, self.vlan_used)
class VlanBinding(BASE, L2NetworkBase):
"""Represents a binding of vlan_id to network_id."""
__tablename__ = 'vlan_bindings'
vlan_id = Column(Integer, primary_key=True)
vlan_name = Column(String(255))
network_id = Column(String(255),
nullable=False)
def __init__(self, vlan_id, vlan_name, network_id):
self.vlan_id = vlan_id
self.vlan_name = vlan_name
self.network_id = network_id
def __repr__(self):
return "<VlanBinding(%d,%s,%s)>" % (self.vlan_id,
self.vlan_name,
self.network_id)
class QoS(BASE, L2NetworkBase):
"""Represents QoS for a tenant."""
__tablename__ = 'qoss'
qos_id = Column(String(255))
tenant_id = Column(String(255), primary_key=True)
qos_name = Column(String(255), primary_key=True)
qos_desc = Column(String(255))
def __init__(self, tenant_id, qos_name, qos_desc):
self.qos_id = uuidutils.generate_uuid()
self.tenant_id = tenant_id
self.qos_name = qos_name
self.qos_desc = qos_desc
def __repr__(self):
return "<QoS(%s,%s,%s,%s)>" % (self.qos_id, self.tenant_id,
self.qos_name, self.qos_desc)
class Credential(BASE, L2NetworkBase):
"""Represents credentials for a tenant."""
__tablename__ = 'credentials'
credential_id = Column(String(255))
tenant_id = Column(String(255), primary_key=True)
credential_name = Column(String(255), primary_key=True)
user_name = Column(String(255))
password = Column(String(255))
def __init__(self, tenant_id, credential_name, user_name, password):
self.credential_id = uuidutils.generate_uuid()
self.tenant_id = tenant_id
self.credential_name = credential_name
self.user_name = user_name
self.password = password
def __repr__(self):
return "<Credentials(%s,%s,%s,%s,%s)>" % (self.credential_id,
self.tenant_id,
self.credential_name,
self.user_name,
self.password)

View File

@ -1,108 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 Nicira Networks, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# @author: Somik Behera, Nicira Networks, Inc.
# @author: Brad Hall, Nicira Networks, Inc.
# @author: Dan Wendlandt, Nicira Networks, Inc.
from sqlalchemy import Column, String, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relation, object_mapper
from neutron.openstack.common import uuidutils
BASE = declarative_base()
class NeutronBase(object):
"""Base class for Neutron Models."""
__table_args__ = {'mysql_engine': 'InnoDB'}
def __setitem__(self, key, value):
setattr(self, key, value)
def __getitem__(self, key):
return getattr(self, key)
def get(self, key, default=None):
return getattr(self, key, default)
def __iter__(self):
self._i = iter(object_mapper(self).columns)
return self
def next(self):
n = self._i.next().name
return n, getattr(self, n)
def update(self, values):
"""Make the model object behave like a dict."""
for k, v in values.iteritems():
setattr(self, k, v)
def iteritems(self):
"""Make the model object behave like a dict.
Includes attributes from joins.
"""
local = dict(self)
joined = dict([(k, v) for k, v in self.__dict__.iteritems()
if not k[0] == '_'])
local.update(joined)
return local.iteritems()
class Port(BASE, NeutronBase):
"""Represents a port on a neutron network."""
__tablename__ = 'ports'
uuid = Column(String(255), primary_key=True)
network_id = Column(String(255), ForeignKey("networks.uuid"),
nullable=False)
interface_id = Column(String(255))
# Port state - Hardcoding string value at the moment
state = Column(String(8))
def __init__(self, network_id):
self.uuid = uuidutils.generate_uuid()
self.network_id = network_id
self.state = "DOWN"
def __repr__(self):
return "<Port(%s,%s,%s,%s)>" % (self.uuid, self.network_id,
self.state, self.interface_id)
class Network(BASE, NeutronBase):
"""Represents a neutron network."""
__tablename__ = 'networks'
uuid = Column(String(255), primary_key=True)
tenant_id = Column(String(255), nullable=False)
name = Column(String(255))
ports = relation(Port, order_by=Port.uuid, backref="network")
def __init__(self, tenant_id, name):
self.uuid = uuidutils.generate_uuid()
self.tenant_id = tenant_id
self.name = name
def __repr__(self):
return "<Network(%s,%s,%s)>" % (self.uuid, self.name, self.tenant_id)

View File

@ -23,7 +23,6 @@ from neutron.openstack.common import log as logging
from neutron.plugins.cisco.common import cisco_exceptions as c_exc
from neutron.plugins.cisco.common import config
from neutron.plugins.cisco.db import network_models_v2
from neutron.plugins.cisco.db import nexus_models_v2 # noqa
from neutron.plugins.openvswitch import ovs_models_v2
@ -81,7 +80,6 @@ def release_vlanid(vlan_id):
return vlanid["vlan_used"]
except exc.NoResultFound:
raise c_exc.VlanIDNotFound(vlan_id=vlan_id)
return
def delete_vlanid(vlan_id):

View File

@ -17,58 +17,12 @@
# @author: Rohit Agarwalla, Cisco Systems, Inc.
from sqlalchemy import Column, Integer, String, Boolean
from sqlalchemy.orm import object_mapper
from neutron.db import model_base
from neutron.db import models_v2 as models # noqa
from neutron.openstack.common import uuidutils
class L2NetworkBase(object):
"""Base class for L2Network Models."""
#__table_args__ = {'mysql_engine': 'InnoDB'}
def __setitem__(self, key, value):
"""Internal Dict set method."""
setattr(self, key, value)
def __getitem__(self, key):
"""Internal Dict get method."""
return getattr(self, key)
def get(self, key, default=None):
"""Dict get method."""
return getattr(self, key, default)
def __iter__(self):
"""Iterate over table columns."""
self._i = iter(object_mapper(self).columns)
return self
def next(self):
"""Next method for the iterator."""
n = self._i.next().name
return n, getattr(self, n)
def update(self, values):
"""Make the model object behave like a dict."""
for k, v in values.iteritems():
setattr(self, k, v)
def iteritems(self):
"""Make the model object behave like a dict.
Includes attributes from joins.
"""
local = dict(self)
joined = dict([(k, v) for k, v in self.__dict__.iteritems()
if not k[0] == '_'])
local.update(joined)
return local.iteritems()
class VlanID(model_base.BASEV2, L2NetworkBase):
class VlanID(model_base.BASEV2):
"""Represents a vlan_id usage."""
__tablename__ = 'cisco_vlan_ids'
@ -83,7 +37,7 @@ class VlanID(model_base.BASEV2, L2NetworkBase):
return "<VlanID(%d,%s)>" % (self.vlan_id, self.vlan_used)
class QoS(model_base.BASEV2, L2NetworkBase):
class QoS(model_base.BASEV2):
"""Represents QoS for a tenant."""
__tablename__ = 'qoss'
@ -104,7 +58,7 @@ class QoS(model_base.BASEV2, L2NetworkBase):
self.qos_name, self.qos_desc)
class Credential(model_base.BASEV2, L2NetworkBase):
class Credential(model_base.BASEV2):
"""Represents credentials for a tenant."""
__tablename__ = 'credentials'

View File

@ -18,10 +18,9 @@
from sqlalchemy import Column, Integer, String
from neutron.db import model_base
from neutron.plugins.cisco.db.l2network_models import L2NetworkBase
class NexusPortBinding(model_base.BASEV2, L2NetworkBase):
class NexusPortBinding(model_base.BASEV2):
"""Represents a binding of VM's to nexus ports."""
__tablename__ = "nexusport_bindings"