Add share's networks DB model, API and neutron support

Add share's networks DB models and API for managing
data about tenant's networks.
Add neutron network plug-in which will be responsible
for managing neutron network resources (creating
and deleting ports).

Partially implements bp: join-tenant-network

Change-Id: I20249f3640a10fc3ed55003a817a994efdb64681
This commit is contained in:
Aleks Chirko 2013-12-05 15:51:32 +02:00
parent c450c15f1c
commit d3131f40d2
14 changed files with 1475 additions and 95 deletions

View File

@ -73,8 +73,6 @@ IMPL = utils.LazyPluggable('db_backend',
###################
def service_destroy(context, service_id):
"""Destroy the service or raise if it does not exist."""
return IMPL.service_destroy(context, service_id)
@ -476,3 +474,61 @@ def share_metadata_delete(context, share_id, key):
def share_metadata_update(context, share, metadata, delete):
"""Update metadata if it exists, otherwise create it."""
IMPL.share_metadata_update(context, share, metadata, delete)
###################
def share_network_create(context, values):
"""Create a share network DB record."""
return IMPL.share_network_create(context, values)
def share_network_delete(context, id):
"""Delete a share network DB record."""
return IMPL.share_network_delete(context, id)
def share_network_update(context, id, values):
"""Update a share network DB record."""
return IMPL.share_network_update(context, id, values)
def share_network_get(context, id):
"""Get requested share network DB record."""
return IMPL.share_network_get(context, id)
def share_network_get_all(context):
"""Get all share network DB records."""
return IMPL.share_network_get_all(context)
def share_network_get_all_by_project(context, project_id):
"""Get all share network DB records for the given project."""
return IMPL.share_network_get_all_by_project(context, project_id)
def share_network_add_security_service(context, id, security_service_id):
return IMPL.share_network_add_security_service(context,
id,
security_service_id)
def share_network_remove_security_service(context, id, security_service_id):
return IMPL.share_network_remove_security_service(context,
id,
security_service_id)
def network_allocation_create(context, values):
"""Create a network allocation DB record."""
return IMPL.network_allocation_create(context, values)
def network_allocation_delete(context, id):
"""Delete a network allocation DB record."""
return IMPL.network_allocation_delete(context, id)
def network_allocation_update(context, id, values):
"""Update a network allocation DB record."""
return IMPL.network_allocation_update(context, id, values)

View File

@ -32,6 +32,7 @@ from sqlalchemy.orm import joinedload
from sqlalchemy.sql.expression import literal_column
from sqlalchemy.sql import func
from manila.common import constants
from manila.common import sqlalchemyutils
from manila import db
from manila.db.sqlalchemy import models
@ -1480,3 +1481,170 @@ def _security_service_get_query(context, session=None):
if session is None:
session = get_session()
return model_query(context, models.SecurityService, session=session)
###################
def _network_get_query(context, session=None):
if session is None:
session = get_session()
return model_query(context, models.ShareNetwork, session=session).\
options(joinedload('shares'),
joinedload('network_allocations'),
joinedload('security_services'))
@require_context
def share_network_create(context, values):
if not values.get('id'):
values['id'] = uuidutils.generate_uuid()
network_ref = models.ShareNetwork()
network_ref.update(values)
session = get_session()
with session.begin():
network_ref.save(session=session)
return network_ref
@require_context
def share_network_delete(context, id):
session = get_session()
with session.begin():
network_ref = share_network_get(context, id, session=session)
network_ref.delete(session=session)
@require_context
def share_network_update(context, id, values):
session = get_session()
with session.begin():
network_ref = share_network_get(context, id, session=session)
network_ref.update(values)
network_ref.save(session=session)
return network_ref
@require_context
def share_network_get(context, id, session=None):
result = _network_get_query(context, session).filter_by(id=id).first()
if result is None:
raise exception.ShareNetworkNotFound(share_network_id=id)
return result
@require_context
def share_network_get_all(context):
return _network_get_query(context).all()
@require_context
def share_network_get_all_by_project(context, project_id):
return _network_get_query(context).filter_by(project_id=project_id).all()
@require_context
def share_network_add_security_service(context, id, security_service_id):
session = get_session()
with session.begin():
assoc_ref = model_query(
context,
models.ShareNetworkSecurityServiceAssociation,
session=session).\
filter_by(share_network_id=id).\
filter_by(security_service_id=security_service_id).first()
if assoc_ref:
msg = "Already associated"
raise exception.ShareNetworkSecurityServiceAssociationError(
share_network_id=id,
security_service_id=security_service_id,
reason=msg)
share_nw_ref = share_network_get(context, id, session=session)
if share_nw_ref['status'] == constants.STATUS_ACTIVE:
msg = "Share network is active"
raise exception.ShareNetworkSecurityServiceAssociationError(
share_network_id=id,
security_service_id=security_service_id,
reason=msg)
security_service_ref = security_service_get(context,
security_service_id,
session=session)
share_nw_ref.security_services += [security_service_ref]
share_nw_ref.save(session=session)
return share_nw_ref
@require_context
def share_network_remove_security_service(context, id, security_service_id):
session = get_session()
with session.begin():
share_nw_ref = share_network_get(context, id, session=session)
security_service_get(context, security_service_id, session=session)
assoc_ref = model_query(
context,
models.ShareNetworkSecurityServiceAssociation,
session=session).\
filter_by(share_network_id=id).\
filter_by(security_service_id=security_service_id).first()
if assoc_ref:
assoc_ref.delete(session=session)
else:
msg = "No association defined"
raise exception.ShareNetworkSecurityServiceDissociationError(
share_network_id=id,
security_service_id=security_service_id,
reason=msg)
return share_nw_ref
###################
@require_context
def network_allocation_create(context, values):
alloc_ref = models.NetworkAllocation()
alloc_ref.update(values)
session = get_session()
with session.begin():
alloc_ref.save(session=session)
return alloc_ref
@require_context
def network_allocation_delete(context, id):
session = get_session()
with session.begin():
alloc_ref = network_allocation_get(context, id, session=session)
alloc_ref.delete(session=session)
@require_context
def network_allocation_get(context, id, session=None):
if session is None:
session = get_session()
result = model_query(context, models.NetworkAllocation, session=session).\
filter_by(id=id).first()
if result is None:
raise exception.NotFound()
return result
@require_context
def network_allocation_update(context, id, values):
session = get_session()
with session.begin():
alloc_ref = network_allocation_get(context, id, session=session)
alloc_ref.update(values)
alloc_ref.save(session=session)
return alloc_ref

View File

@ -0,0 +1,127 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from migrate import ForeignKeyConstraint
from sqlalchemy import Boolean, Column, DateTime, ForeignKey, Integer
from sqlalchemy import MetaData, String, Table, UniqueConstraint
from manila.openstack.common.gettextutils import _
from manila.openstack.common import log as logging
LOG = logging.getLogger(__name__)
def upgrade(migrate_engine):
meta = MetaData()
meta.bind = migrate_engine
shares = Table('shares', meta, autoload=True)
Table('security_services', meta, autoload=True)
share_networks = Table(
'share_networks', meta,
Column('created_at', DateTime),
Column('updated_at', DateTime),
Column('deleted_at', DateTime),
Column('deleted', Boolean, default=False),
Column('id', String(length=36), primary_key=True,
nullable=False),
Column('project_id', String(length=36), nullable=False),
Column('neutron_net_id', String(length=36), nullable=True),
Column('neutron_subnet_id', String(length=36), nullable=True),
Column('network_type', String(length=32), nullable=True),
Column('segmentation_id', Integer, nullable=True),
Column('cidr', String(length=64), nullable=True),
Column('ip_version', Integer, nullable=True),
Column('name', String(length=255), nullable=True),
Column('description', String(length=255), nullable=True),
Column('status', String(length=32)),
UniqueConstraint('neutron_net_id', 'neutron_subnet_id', 'project_id',
name='net_subnet_uc'),
mysql_engine='InnoDB',
mysql_charset='utf8',
)
allocations = Table(
'network_allocations', meta,
Column('created_at', DateTime),
Column('updated_at', DateTime),
Column('deleted_at', DateTime),
Column('deleted', Boolean, default=False),
Column('id', String(length=36), primary_key=True, nullable=False),
Column('ip_address', String(length=64), nullable=True),
Column('mac_address', String(length=32), nullable=True),
Column('share_network_id', String(length=36),
ForeignKey('share_networks.id'), nullable=False),
Column('status', String(length=32)),
mysql_engine='InnoDB',
mysql_charset='utf8',
)
association = Table(
'share_network_security_service_association', meta,
Column('created_at', DateTime),
Column('updated_at', DateTime),
Column('deleted_at', DateTime),
Column('deleted', Boolean, default=False),
Column('id', Integer, primary_key=True, nullable=False),
Column('share_network_id', String(length=36),
ForeignKey('share_networks.id'), nullable=False),
Column('security_service_id', String(length=36),
ForeignKey('security_services.id'), nullable=False),
mysql_engine='InnoDB',
mysql_charset='utf8',
)
tables = [share_networks, allocations, association]
for table in tables:
try:
table.create()
except Exception:
LOG.exception(_("Exception while creating table"))
meta.drop_all(tables=tables)
raise
network_id = Column('share_network_id',
String(length=36),
ForeignKey('share_networks.id'),
nullable=True)
network_id.create(shares)
def downgrade(migrate_engine):
meta = MetaData()
meta.bind = migrate_engine
shares = Table('shares', meta, autoload=True)
share_networks = Table('share_networks', meta, autoload=True)
network_allocations = Table('network_allocations', meta, autoload=True)
association = Table('share_network_security_service_association',
meta,
autoload=True)
if migrate_engine.name == 'mysql':
fkeys = list(shares.c.share_network_id.foreign_keys)
params = {'columns': [shares.c['share_network_id']],
'refcolumns': [share_networks.c['id']],
'name': fkeys[0].constraint.name}
with migrate_engine.begin():
fkey = ForeignKeyConstraint(**params)
fkey.drop()
shares.c.share_network_id.drop()
network_allocations.drop()
association.drop()
share_networks.drop()

View File

@ -256,6 +256,8 @@ class Share(BASE, ManilaBase):
snapshot_id = Column(String(36))
share_proto = Column(String(255))
export_location = Column(String(255))
share_network_id = Column(String(36), ForeignKey('share_networks.id'),
nullable=True)
class ShareMetadata(BASE, ManilaBase):
@ -339,6 +341,72 @@ class SecurityService(BASE, ManilaBase):
default=constants.STATUS_NEW)
class ShareNetwork(BASE, ManilaBase):
"Represents network data used by share."
__tablename__ = 'share_networks'
id = Column(String(36), primary_key=True, nullable=False)
project_id = Column(String(36), nullable=False)
neutron_net_id = Column(String(36), nullable=True)
neutron_subnet_id = Column(String(36), nullable=True)
network_type = Column(String(32), nullable=True)
segmentation_id = Column(Integer, nullable=True)
cidr = Column(String(64), nullable=True)
ip_version = Column(Integer, nullable=True)
name = Column(String(255), nullable=True)
description = Column(String(255), nullable=True)
status = Column(Enum(constants.STATUS_INACTIVE, constants.STATUS_ACTIVE,
constants.STATUS_ERROR),
default=constants.STATUS_INACTIVE)
security_services = relationship("SecurityService",
secondary="share_network_security_service_association",
backref="share_networks",
primaryjoin='and_('
'ShareNetwork.id == '
'ShareNetworkSecurityServiceAssociation.share_network_id,'
'ShareNetworkSecurityServiceAssociation.deleted == False,'
'ShareNetwork.deleted == False)',
secondaryjoin='and_('
'SecurityService.id == '
'ShareNetworkSecurityServiceAssociation.security_service_id,'
'SecurityService.deleted == False)')
network_allocations = relationship("NetworkAllocation",
primaryjoin='and_('
'ShareNetwork.id == NetworkAllocation.share_network_id,'
'NetworkAllocation.deleted == False)')
shares = relationship("Share",
backref='share_network',
primaryjoin='and_('
'ShareNetwork.id == Share.share_network_id,'
'Share.deleted == False)')
class ShareNetworkSecurityServiceAssociation(BASE, ManilaBase):
"""" Association table between compute_zones and compute_nodes tables.
"""
__tablename__ = 'share_network_security_service_association'
id = Column(Integer, primary_key=True)
share_network_id = Column(String(36),
ForeignKey('share_networks.id'),
nullable=False)
security_service_id = Column(String(36),
ForeignKey('security_services.id'),
nullable=False)
class NetworkAllocation(BASE, ManilaBase):
"Represents network allocation data."
__tablename__ = 'network_allocations'
id = Column(String(36), primary_key=True, nullable=False)
ip_address = Column(String(64), nullable=True)
mac_address = Column(String(32), nullable=True)
share_network_id = Column(String(36), ForeignKey('share_networks.id'),
nullable=False)
status = Column(Enum(constants.STATUS_NEW, constants.STATUS_ACTIVE,
constants.STATUS_ERROR),
default=constants.STATUS_NEW)
def register_models():
"""Register Models and create metadata.

View File

@ -139,6 +139,14 @@ class NetworkException(ManilaException):
message = _("Exception due to network failure")
class NetworkAllocationException(NetworkException):
message = _("Failure during network allocation")
class NetworkBadConfigurationException(NetworkException):
message = _("Bad network configuration: %(reason)s")
class GlanceConnectionFailed(ManilaException):
message = _("Connection to glance failed") + ": %(reason)s"
@ -214,6 +222,14 @@ class NotFound(ManilaException):
safe = True
class InUse(ManilaException):
message = _("Resource is in use.")
class ShareNetworkNotFound(NotFound):
message = _("Network %(share_network_id)s could not be found.")
class InvalidImageRef(Invalid):
message = _("Invalid image href %(image_href)s.")
@ -495,3 +511,13 @@ class InvalidShareMetadataSize(Invalid):
class SecurityServiceNotFound(NotFound):
message = _("Security service %(security_service_id)s could not be found.")
class ShareNetworkSecurityServiceAssociationError(ManilaException):
message = _("Failed to associate share network %(share_network_id)s"
" and security service %(security_service_id)s: %(reason)s.")
class ShareNetworkSecurityServiceDissociationError(ManilaException):
message = _("Failed to dissociate share network %(share_network_id)s"
" and security service %(security_service_id)s: %(reason)s.")

View File

@ -14,15 +14,16 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import manila.openstack.common.importutils
from oslo.config import cfg
network_opts = [
cfg.StrOpt('network_api_class',
default='manila.network.neutron.api.API',
help='The full class name of the '
'network API class to use'),
default=
'manila.network.neutron.neutron_network_plugin.NeutronNetworkPlugin',
help='The full class name of the network API class to use'),
]
cfg.CONF.register_opts(network_opts)
@ -33,3 +34,14 @@ def API():
network_api_class = cfg.CONF.network_api_class
cls = importutils.import_class(network_api_class)
return cls()
class NetworkBaseAPI(object):
@abc.abstractmethod
def allocate_network(self, context, share_network, **kwargs):
pass
@abc.abstractmethod
def deallocate_network(self, context, share_network):
pass

View File

@ -73,7 +73,6 @@ neutron_opts = [
'neutron client requests.'),
]
PORTBINDING_EXT = 'Port Binding'
CONF = cfg.CONF
CONF.register_opts(neutron_opts)
LOG = logging.getLogger(__name__)
@ -118,7 +117,7 @@ class API(base.Base):
port_req_body['port']['device_owner'] = device_owner
if device_id:
port_req_body['port']['device_id'] = device_id
port = self.client.create_port(port_req_body)
port = self.client.create_port(port_req_body).get('port', {})
return port
except neutron_client_exc.NeutronClientException as e:
LOG.exception(_('Neutron error creating port on network %s') %
@ -160,9 +159,19 @@ class API(base.Base):
raise exception.NetworkException(code=e.status_code,
message=e.message)
def get_subnet(self, subnet_uuid):
"""Get specific subnet for client."""
try:
return self.client.show_subnet(subnet_uuid).get('subnet', {})
except neutron_client_exc.NeutronClientException as e:
raise exception.NetworkException(code=e.status_code,
message=e.message)
def list_extensions(self):
extensions_list = self.client.list_extensions().get('extensions')
return dict((ext['name'], ext) for ext in extensions_list)
def _has_port_binding_extension(self):
if not self.extensions:
extensions_list = self.client.list_extensions()['extensions']
self.extensions = dict((ext['name'], ext)
for ext in extensions_list)
return PORTBINDING_EXT in self.extensions
self.extensions = self.list_extensions()
return neutron.constants.PORTBINDING_EXT in self.extensions

View File

@ -0,0 +1,17 @@
# Copyright 2014 OpenStack Foundation
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
PROVIDER_NW_EXT = 'Provider Network'
PORTBINDING_EXT = 'Port Binding'

View File

@ -0,0 +1,142 @@
# Copyright 2013 Openstack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from manila.common import constants
from manila.db import base as db_base
from manila import exception
from manila import network as manila_network
from manila.network.neutron import api as neutron_api
from manila.network.neutron import constants as neutron_constants
class NeutronNetworkPlugin(manila_network.NetworkBaseAPI, db_base.Base):
def __init__(self):
super(NeutronNetworkPlugin, self).__init__()
self.neutron_api = neutron_api.API()
def allocate_network(self, context, share_network, **kwargs):
"""Allocate network resources using given network information: create
neutron ports for a given neutron network and subnet, create manila db
records for allocated neutron ports.
:param context: RequestContext object
:param share_network: share network data
:param kwargs: allocations parameters given by the back-end
driver. Supported params:
'count' - how many allocations should be created
'device_owner' - set owner for network allocations
:rtype: list of :class: 'dict'
"""
if not self._has_provider_network_extension():
msg = "%s extension required" % neutron_constants.PROVIDER_NW_EXT
raise exception.NetworkBadConfigurationException(reason=msg)
self._save_neutron_network_data(context, share_network)
self._save_neutron_subnet_data(context, share_network)
allocation_count = kwargs.get('count', 1)
device_owner = kwargs.get('device_owner', 'share')
for _ in range(0, allocation_count):
self._create_port(context, share_network, device_owner)
return self.db.share_network_update(
context,
share_network['id'],
{'status': constants.STATUS_ACTIVE})
def deallocate_network(self, context, share_network):
"""Deallocate neutron network resources for the given network info:
delete previously allocated neutron ports, delete manila db records for
deleted ports.
:param context: RequestContext object
:param share_network: share network data
:rtype: None
"""
ports = share_network['network_allocations']
for port in ports:
self._delete_port(context, port)
self.db.share_network_update(context,
share_network['id'],
{'status': constants.STATUS_INACTIVE})
def _create_port(self, context, share_network, device_owner):
try:
port = self.neutron_api.create_port(
share_network['project_id'],
network_id=share_network['neutron_net_id'],
subnet_id=share_network['neutron_subnet_id'],
device_owner='manila:' + device_owner)
except exception.NetworkException:
self.db.share_network_update(context,
share_network['id'],
{'status': constants.STATUS_ERROR})
raise
else:
port_dict = {'id': port['id'],
'share_network_id': share_network['id'],
'ip_address': port['fixed_ips'][0]['ip_address'],
'mac_address': port['mac_address'],
'status': constants.STATUS_ACTIVE}
self.db.network_allocation_create(context, port_dict)
return port_dict
def _delete_port(self, context, port):
try:
self.neutron_api.delete_port(port['id'])
except exception.NetworkException:
self.db.network_allocation_update(context,
port['id'],
{'status': constants.STATUS_ERROR})
self.db.share_network_update(context,
port['share_network_id'],
{'status': constants.STATUS_ERROR})
raise
else:
self.db.network_allocation_delete(context, port['id'])
def _has_provider_network_extension(self):
extentions = self.neutron_api.list_extensions()
return neutron_constants.PROVIDER_NW_EXT in extentions
def _save_neutron_network_data(self, context, share_network):
net_info = self.neutron_api.get_network(
share_network['neutron_net_id'])
provider_nw_dict = {
'network_type': net_info['provider:network_type'],
'segmentation_id': net_info['provider:segmentation_id']
}
self.db.share_network_update(context,
share_network['id'],
provider_nw_dict)
def _save_neutron_subnet_data(self, context, share_network):
subnet_info = self.neutron_api.get_subnet(
share_network['neutron_subnet_id'])
subnet_values = {
'cidr': subnet_info['cidr'],
'ip_version': subnet_info['ip_version']
}
self.db.share_network_update(context,
share_network['id'],
subnet_values)

View File

View File

@ -15,8 +15,7 @@
#
# vim: tabstop=4 shiftwidth=4 softtabstop=4
from mock import Mock
from mock import patch
import mock
from neutronclient.common import exceptions as neutron_client_exc
from neutronclient.v2_0 import client as clientv20
from oslo.config import cfg
@ -27,6 +26,7 @@ from manila.db import base
from manila import exception
from manila.network import neutron
from manila.network.neutron import api as neutron_api
from manila.network.neutron import constants as neutron_constants
from manila.tests.db import fakes
CONF = cfg.CONF
@ -52,6 +52,9 @@ class FakeNeutronClient(object):
def show_network(self, network_uuid):
pass
def show_subnet(self, subnet_uuid):
pass
def list_extensions(self):
pass
@ -62,20 +65,20 @@ class NeutronApiTest(unittest.TestCase):
super(NeutronApiTest, self).setUp()
self._create_neutron_api()
@patch.object(base, 'Base', fakes.FakeModel)
@patch.object(context, 'get_admin_context',
Mock(return_value='context'))
@patch.object(neutron, 'get_client',
Mock(return_value=FakeNeutronClient()))
@mock.patch.object(base, 'Base', fakes.FakeModel)
@mock.patch.object(context, 'get_admin_context',
mock.Mock(return_value='context'))
@mock.patch.object(neutron, 'get_client',
mock.Mock(return_value=FakeNeutronClient()))
def _create_neutron_api(self):
self.neutron_api = neutron_api.API()
@patch.object(base, 'Base', fakes.FakeModel)
@patch.object(context, 'get_admin_context',
Mock(return_value='context'))
@patch.object(neutron, 'get_client', Mock())
@mock.patch.object(base, 'Base', fakes.FakeModel)
@mock.patch.object(context, 'get_admin_context',
mock.Mock(return_value='context'))
@mock.patch.object(neutron, 'get_client', mock.Mock())
def test_create_api_object(self):
with patch.object(base.Base, '__init__', Mock()):
with mock.patch.object(base.Base, '__init__', mock.Mock()):
neutron_api_obj = neutron_api.API()
base.Base.__init__.assert_called_once()
neutron.get_client.assert_called_once_with('context')
@ -88,77 +91,75 @@ class NeutronApiTest(unittest.TestCase):
'security_group_ids': 'test group',
'dhcp_opts': 'test dhcp'}
with patch.object(self.neutron_api, '_has_port_binding_extension',
Mock(return_value=True)):
with mock.patch.object(self.neutron_api, '_has_port_binding_extension',
mock.Mock(return_value=True)):
port = self.neutron_api.create_port(**port_args)
self.assertEqual(port['port']['tenant_id'], port_args['tenant_id'])
self.assertEqual(port['port']['network_id'],
self.assertEqual(port['tenant_id'], port_args['tenant_id'])
self.assertEqual(port['network_id'],
port_args['network_id'])
self.assertEqual(port['port']['binding:host_id'],
self.assertEqual(port['binding:host_id'],
port_args['host_id'])
self.assertEqual(port['port']['fixed_ips'][0]['subnet_id'],
self.assertEqual(port['fixed_ips'][0]['subnet_id'],
port_args['subnet_id'])
self.assertEqual(port['port']['fixed_ips'][0]['ip_address'],
self.assertEqual(port['fixed_ips'][0]['ip_address'],
port_args['fixed_ip'])
self.assertEqual(port['port']['device_owner'],
self.assertEqual(port['device_owner'],
port_args['device_owner'])
self.assertEqual(port['port']['device_id'], port_args['device_id'])
self.assertEqual(port['port']['mac_address'],
self.assertEqual(port['device_id'], port_args['device_id'])
self.assertEqual(port['mac_address'],
port_args['mac_address'])
self.assertEqual(port['port']['security_groups'],
self.assertEqual(port['security_groups'],
port_args['security_group_ids'])
self.assertEqual(port['port']['extra_dhcp_opts'],
self.assertEqual(port['extra_dhcp_opts'],
port_args['dhcp_opts'])
def test_create_port_with_required_args(self):
port_args = {'tenant_id': 'test tenant', 'network_id': 'test net'}
with patch.object(self.neutron_api, '_has_port_binding_extension',
Mock(return_value=True)):
with mock.patch.object(self.neutron_api, '_has_port_binding_extension',
mock.Mock(return_value=True)):
port = self.neutron_api.create_port(**port_args)
self.assertEqual(port['port']['tenant_id'], port_args['tenant_id'])
self.assertEqual(port['port']['network_id'],
self.assertEqual(port['tenant_id'], port_args['tenant_id'])
self.assertEqual(port['network_id'],
port_args['network_id'])
@patch.object(neutron_api.LOG, 'exception', Mock())
@mock.patch.object(neutron_api.LOG, 'exception', mock.Mock())
def test_create_port_exception(self):
port_args = {'tenant_id': 'test tenant', 'network_id': 'test net'}
client_create_port_mock = Mock(side_effect=
client_create_port_mock = mock.Mock(side_effect=
neutron_client_exc.NeutronClientException)
with patch.object(self.neutron_api, '_has_port_binding_extension',
Mock(return_value=True)):
with patch.object(self.neutron_api.client, 'create_port',
with mock.patch.object(self.neutron_api, '_has_port_binding_extension',
mock.Mock(return_value=True)):
with mock.patch.object(self.neutron_api.client, 'create_port',
client_create_port_mock):
self.assertRaises(exception.NetworkException,
self.neutron_api.create_port,
**port_args)
client_create_port_mock.assert_called_once()
neutron_api.LOG.exception.assert_called_once()
@patch.object(neutron_api.LOG, 'exception', Mock())
@mock.patch.object(neutron_api.LOG, 'exception', mock.Mock())
def test_create_port_exception_status_409(self):
port_args = {'tenant_id': 'test tenant', 'network_id': 'test net'}
client_create_port_mock = Mock(side_effect=
client_create_port_mock = mock.Mock(side_effect=
neutron_client_exc.NeutronClientException(status_code=409))
with patch.object(self.neutron_api, '_has_port_binding_extension',
Mock(return_value=True)):
with patch.object(self.neutron_api.client, 'create_port',
with mock.patch.object(self.neutron_api, '_has_port_binding_extension',
mock.Mock(return_value=True)):
with mock.patch.object(self.neutron_api.client, 'create_port',
client_create_port_mock):
self.assertRaises(exception.PortLimitExceeded,
self.neutron_api.create_port,
**port_args)
client_create_port_mock.assert_called_once()
neutron_api.LOG.exception.assert_called_once()
def test_delete_port(self):
port_id = 'test port id'
with patch.object(self.neutron_api.client, 'delete_port',
Mock()) as client_delete_port_mock:
with mock.patch.object(self.neutron_api.client, 'delete_port',
mock.Mock()) as client_delete_port_mock:
self.neutron_api.delete_port(port_id)
client_delete_port_mock.assert_called_once_with(port_id)
@ -166,9 +167,9 @@ class NeutronApiTest(unittest.TestCase):
def test_list_ports(self):
search_opts = {'test_option': 'test_value'}
fake_ports = [{'fake port': 'fake port info'}]
client_list_ports_mock = Mock(return_value={'ports': fake_ports})
client_list_ports_mock = mock.Mock(return_value={'ports': fake_ports})
with patch.object(self.neutron_api.client, 'list_ports',
with mock.patch.object(self.neutron_api.client, 'list_ports',
client_list_ports_mock):
ports = self.neutron_api.list_ports(**search_opts)
@ -178,9 +179,9 @@ class NeutronApiTest(unittest.TestCase):
def test_show_port(self):
port_id = 'test port id'
fake_port = {'fake port': 'fake port info'}
client_show_port_mock = Mock(return_value={'port': fake_port})
client_show_port_mock = mock.Mock(return_value={'port': fake_port})
with patch.object(self.neutron_api.client, 'show_port',
with mock.patch.object(self.neutron_api.client, 'show_port',
client_show_port_mock):
port = self.neutron_api.show_port(port_id)
@ -190,64 +191,62 @@ class NeutronApiTest(unittest.TestCase):
def test_get_network(self):
network_id = 'test network id'
fake_network = {'fake network': 'fake network info'}
client_show_network_mock = Mock(return_value={'network': fake_network})
client_show_network_mock = mock.Mock(
return_value={'network': fake_network})
with patch.object(self.neutron_api.client, 'show_network',
with mock.patch.object(self.neutron_api.client, 'show_network',
client_show_network_mock):
network = self.neutron_api.get_network(network_id)
client_show_network_mock.assert_called_once_with(network_id)
self.assertEqual(network, fake_network)
def test_get_subnet(self):
subnet_id = 'fake subnet id'
with mock.patch.object(self.neutron_api.client, 'show_subnet',
mock.Mock(return_value={'subnet': {}})):
subnet = self.neutron_api.get_subnet(subnet_id)
self.neutron_api.client.show_subnet.assert_called_once_with(
subnet_id)
self.assertEqual(subnet, {})
def test_get_all_network(self):
fake_networks = [{'fake network': 'fake network info'}]
client_list_networks_mock = Mock(
client_list_networks_mock = mock.Mock(
return_value={'networks': fake_networks})
with patch.object(self.neutron_api.client, 'list_networks',
with mock.patch.object(self.neutron_api.client, 'list_networks',
client_list_networks_mock):
networks = self.neutron_api.get_all_networks()
client_list_networks_mock.assert_called_once()
client_list_networks_mock.assert_any_call()
self.assertEqual(networks, fake_networks)
def test_has_port_binding_extension_01(self):
fake_extensions = [{'name': neutron_api.PORTBINDING_EXT}]
client_list_ext_mock = Mock(
return_value={'extensions': fake_extensions})
def test_list_extensions(self):
extensions = [{'name': neutron_constants.PORTBINDING_EXT},
{'name': neutron_constants.PROVIDER_NW_EXT}]
with patch.object(self.neutron_api.client, 'list_extensions',
client_list_ext_mock):
result = self.neutron_api._has_port_binding_extension()
client_list_ext_mock.assert_called_once()
self.assertTrue(result)
with mock.patch.object(
self.neutron_api.client,
'list_extensions',
mock.Mock(return_value={'extensions': extensions})):
def test_has_port_binding_extension_02(self):
client_list_ext_mock = Mock()
self.neutron_api.extensions =\
{neutron_api.PORTBINDING_EXT: 'extension description'}
with patch.object(self.neutron_api.client, 'list_extensions',
client_list_ext_mock):
result = self.neutron_api._has_port_binding_extension()
client_list_ext_mock.assert_not_called_once()
self.assertTrue(result)
def test_has_port_binding_extension_03(self):
client_list_ext_mock = Mock()
self.neutron_api.extensions =\
{'neutron extension X': 'extension description'}
with patch.object(self.neutron_api.client, 'list_extensions',
client_list_ext_mock):
result = self.neutron_api._has_port_binding_extension()
client_list_ext_mock.assert_not_called_once()
self.assertFalse(result)
result = self.neutron_api.list_extensions()
self.neutron_api.client.list_extensions.assert_any_call()
self.assertTrue(neutron_constants.PORTBINDING_EXT in result)
self.assertTrue(neutron_constants.PROVIDER_NW_EXT in result)
self.assertEqual(result[neutron_constants.PORTBINDING_EXT],
extensions[0])
self.assertEqual(result[neutron_constants.PROVIDER_NW_EXT],
extensions[1])
class TestNeutronClient(unittest.TestCase):
@patch.object(clientv20.Client, '__init__', Mock(return_value=None))
@mock.patch.object(clientv20.Client, '__init__',
mock.Mock(return_value=None))
def test_get_client_with_token(self):
client_args = {'endpoint_url': CONF.neutron_url,
'timeout': CONF.neutron_url_timeout,
@ -262,7 +261,8 @@ class TestNeutronClient(unittest.TestCase):
neutron.get_client(my_context)
clientv20.Client.__init__.assert_called_once_with(**client_args)
@patch.object(clientv20.Client, '__init__', Mock(return_value=None))
@mock.patch.object(clientv20.Client, '__init__',
mock.Mock(return_value=None))
def test_get_client_no_token(self):
my_context = context.RequestContext('test_user', 'test_tenant',
is_admin=False)
@ -271,7 +271,8 @@ class TestNeutronClient(unittest.TestCase):
neutron.get_client,
my_context)
@patch.object(clientv20.Client, '__init__', Mock(return_value=None))
@mock.patch.object(clientv20.Client, '__init__',
mock.Mock(return_value=None))
def test_get_client_admin_context(self):
client_args = {'endpoint_url': CONF.neutron_url,
'timeout': CONF.neutron_url_timeout,

View File

@ -0,0 +1,288 @@
# Copyright 2013 OpenStack Foundation
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import unittest
from manila.common import constants
from manila import context
from manila.db import api as db_api
from manila import exception
from manila.network.neutron import constants as neutron_constants
from manila.network.neutron import neutron_network_plugin as plugin
fake_neutron_port = {
"status": "test_port_status",
"allowed_address_pairs": [],
"admin_state_up": True,
"network_id": "test_net_id",
"tenant_id": "fake_tenant_id",
"extra_dhcp_opts": [],
"device_owner": "test",
"binding:capabilities": {"port_filter": True},
"mac_address": "test_mac",
"fixed_ips": [
{"subnet_id": "test_subnet_id",
"ip_address": "test_ip"}
],
"id": "test_port_id",
"security_groups": ["fake_sec_group_id"],
"device_id": "fake_device_id"
}
fake_share_network = {'id': 'fake nw info id',
'neutron_subnet_id': 'fake subnet id',
'neutron_net_id': 'fake net id',
'project_id': 'fake project id',
'status': 'test_subnet_status',
'name': 'fake name',
'description': 'fake description',
'network_allocations': [],
'security_services': [],
'shares': []}
fake_network_allocation = \
{'id': fake_neutron_port['id'],
'share_network_id': fake_share_network['id'],
'ip_address': fake_neutron_port['fixed_ips'][0]['ip_address'],
'mac_address': fake_neutron_port['mac_address'],
'status': constants.STATUS_ACTIVE}
class NeutronNetworkPluginTest(unittest.TestCase):
def __init__(self, *args, **kwargs):
super(NeutronNetworkPluginTest, self).__init__(*args, **kwargs)
self.plugin = plugin.NeutronNetworkPlugin()
self.plugin.db = db_api
self.fake_context = context.RequestContext(user_id='fake user',
project_id='fake project',
is_admin=False)
@mock.patch.object(db_api, 'network_allocation_create',
mock.Mock(return_values=fake_network_allocation))
@mock.patch.object(db_api, 'share_network_update',
mock.Mock(return_value=fake_share_network))
def test_allocate_network_one_allocation(self):
has_provider_nw_ext = mock.patch.object(self.plugin,
'_has_provider_network_extension').start()
has_provider_nw_ext.return_value = True
save_nw_data = mock.patch.object(self.plugin,
'_save_neutron_network_data').start()
save_subnet_data = mock.patch.object(
self.plugin,
'_save_neutron_subnet_data').start()
with mock.patch.object(self.plugin.neutron_api, 'create_port',
mock.Mock(return_value=fake_neutron_port)):
self.plugin.allocate_network(
self.fake_context,
fake_share_network,
allocation_info={'count': 1})
has_provider_nw_ext.assert_any_call()
save_nw_data.assert_called_once_with(self.fake_context,
fake_share_network)
save_subnet_data.assert_called_once_with(self.fake_context,
fake_share_network)
self.plugin.neutron_api.create_port.assert_called_once_with(
fake_share_network['project_id'],
network_id=fake_share_network['neutron_net_id'],
subnet_id=fake_share_network['neutron_subnet_id'],
device_owner='manila:share')
db_api.network_allocation_create.assert_called_once_with(
self.fake_context,
fake_network_allocation)
db_api.share_network_update.assert_called_once_with(
self.fake_context,
fake_share_network['id'],
{'status': constants.STATUS_ACTIVE})
has_provider_nw_ext.stop()
save_nw_data.stop()
save_subnet_data.stop()
@mock.patch.object(db_api, 'network_allocation_create',
mock.Mock(return_values=fake_network_allocation))
@mock.patch.object(db_api, 'share_network_update',
mock.Mock(return_value=fake_share_network))
def test_allocate_network_two_allocation(self):
has_provider_nw_ext = mock.patch.object(self.plugin,
'_has_provider_network_extension').start()
has_provider_nw_ext.return_value = True
save_nw_data = mock.patch.object(self.plugin,
'_save_neutron_network_data').start()
save_subnet_data = mock.patch.object(
self.plugin,
'_save_neutron_subnet_data').start()
with mock.patch.object(self.plugin.neutron_api, 'create_port',
mock.Mock(return_value=fake_neutron_port)):
self.plugin.allocate_network(
self.fake_context,
fake_share_network,
count=2)
neutron_api_calls = [
mock.call(fake_share_network['project_id'],
network_id=fake_share_network['neutron_net_id'],
subnet_id=fake_share_network['neutron_subnet_id'],
device_owner='manila:share'),
mock.call(fake_share_network['project_id'],
network_id=fake_share_network['neutron_net_id'],
subnet_id=fake_share_network['neutron_subnet_id'],
device_owner='manila:share'),
]
db_api_calls = [
mock.call(self.fake_context, fake_network_allocation),
mock.call(self.fake_context, fake_network_allocation)
]
self.plugin.neutron_api.create_port.assert_has_calls(
neutron_api_calls)
db_api.network_allocation_create.assert_has_calls(db_api_calls)
db_api.share_network_update.assert_called_once_with(
self.fake_context,
fake_share_network['id'],
{'status': constants.STATUS_ACTIVE})
has_provider_nw_ext.stop()
save_nw_data.stop()
save_subnet_data.stop()
@mock.patch.object(db_api, 'share_network_update', mock.Mock())
def test_allocate_network_create_port_exception(self):
has_provider_nw_ext = mock.patch.object(self.plugin,
'_has_provider_network_extension').start()
has_provider_nw_ext.return_value = True
save_nw_data = mock.patch.object(self.plugin,
'_save_neutron_network_data').start()
save_subnet_data = mock.patch.object(
self.plugin,
'_save_neutron_subnet_data').start()
create_port = mock.patch.object(self.plugin.neutron_api,
'create_port').start()
create_port.side_effect = exception.NetworkException
self.assertRaises(exception.NetworkException,
self.plugin.allocate_network,
self.fake_context,
fake_share_network)
db_api.share_network_update.assert_called_once_with(
self.fake_context,
fake_share_network['id'],
{'status': constants.STATUS_ERROR})
has_provider_nw_ext.stop()
save_nw_data.stop()
save_subnet_data.stop()
create_port.stop()
@mock.patch.object(db_api, 'network_allocation_delete', mock.Mock())
@mock.patch.object(db_api, 'share_network_update', mock.Mock())
def test_deallocate_network_nominal(self):
share_nw = {'id': fake_share_network['id']}
share_nw['network_allocations'] = [fake_network_allocation]
with mock.patch.object(self.plugin.neutron_api, 'delete_port',
mock.Mock()):
self.plugin.deallocate_network(self.fake_context, share_nw)
self.plugin.neutron_api.delete_port.assert_called_once_with(
fake_network_allocation['id'])
db_api.network_allocation_delete.assert_called_once_with(
self.fake_context,
fake_network_allocation['id'])
@mock.patch.object(db_api, 'share_network_update',
mock.Mock(return_value=fake_share_network))
@mock.patch.object(db_api, 'network_allocation_update', mock.Mock())
def test_deallocate_network_neutron_api_exception(self):
share_nw = {'id': fake_share_network['id']}
share_nw['network_allocations'] = [fake_network_allocation]
delete_port = mock.patch.object(self.plugin.neutron_api,
'delete_port').start()
delete_port.side_effect = exception.NetworkException
self.assertRaises(exception.NetworkException,
self.plugin.deallocate_network,
self.fake_context,
share_nw)
db_api.network_allocation_update.assert_called_once_with(
self.fake_context,
fake_network_allocation['id'],
{'status': constants.STATUS_ERROR})
db_api.share_network_update.assert_called_once_with(
self.fake_context,
share_nw['id'],
{'status': constants.STATUS_ERROR})
delete_port.stop()
@mock.patch.object(db_api, 'share_network_update', mock.Mock())
def test_save_neutron_network_data(self):
neutron_nw_info = {'provider:network_type': 'vlan',
'provider:segmentation_id': 1000}
share_nw_update_dict = {'network_type': 'vlan',
'segmentation_id': 1000}
with mock.patch.object(self.plugin.neutron_api,
'get_network',
mock.Mock(return_value=neutron_nw_info)):
self.plugin._save_neutron_network_data(self.fake_context,
fake_share_network)
self.plugin.neutron_api.get_network.assert_called_once_with(
fake_share_network['neutron_net_id'])
self.plugin.db.share_network_update.assert_called_once_with(
self.fake_context,
fake_share_network['id'],
share_nw_update_dict)
@mock.patch.object(db_api, 'share_network_update', mock.Mock())
def test_save_neutron_subnet_data(self):
neutron_subnet_info = {'cidr': '10.0.0.0/24',
'ip_version': 4}
with mock.patch.object(self.plugin.neutron_api,
'get_subnet',
mock.Mock(return_value=neutron_subnet_info)):
self.plugin._save_neutron_subnet_data(self.fake_context,
fake_share_network)
self.plugin.neutron_api.get_subnet.assert_called_once_with(
fake_share_network['neutron_subnet_id'])
self.plugin.db.share_network_update.assert_called_once_with(
self.fake_context,
fake_share_network['id'],
neutron_subnet_info)
def test_has_network_provider_extension_true(self):
extensions = {neutron_constants.PROVIDER_NW_EXT: {}}
with mock.patch.object(self.plugin.neutron_api,
'list_extensions',
mock.Mock(return_value=extensions)):
result = self.plugin._has_provider_network_extension()
self.plugin.neutron_api.list_extensions.assert_any_call()
self.assertTrue(result)
def test_has_network_provider_extension_false(self):
with mock.patch.object(self.plugin.neutron_api,
'list_extensions',
mock.Mock(return_value={})):
result = self.plugin._has_provider_network_extension()
self.plugin.neutron_api.list_extensions.assert_any_call()
self.assertFalse(result)

View File

@ -0,0 +1,450 @@
# Copyright 2013 OpenStack Foundation
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from manila.common import constants
from manila import context
from manila.db import api as db_api
from manila.db.sqlalchemy import api as sqlalchemy_api
from manila.db.sqlalchemy import models
from manila import exception
from manila import test
class ShareNetworkDBTest(test.TestCase):
def __init__(self, *args, **kwargs):
super(ShareNetworkDBTest, self).__init__(*args, **kwargs)
self.fake_context = context.RequestContext(user_id='fake user',
project_id='fake project',
is_admin=False)
def _check_fields(self, expected, actual):
for key in expected:
self.assertEqual(actual[key], expected[key])
def setUp(self):
super(ShareNetworkDBTest, self).setUp()
self.share_nw_dict = {'id': 'fake network id',
'neutron_net_id': 'fake net id',
'neutron_subnet_id': 'fake subnet id',
'project_id': self.fake_context.project_id,
'network_type': 'vlan',
'segmentation_id': 1000,
'cidr': '10.0.0.0/24',
'ip_version': 4,
'name': 'whatever',
'description': 'fake description',
'status': constants.STATUS_INACTIVE}
self.allocation_dict = {'id': 'fake port id',
'share_network_id': self.share_nw_dict['id'],
'ip_address': 'fake ip address',
'mac_address': 'fake mac address',
'status': constants.STATUS_ACTIVE}
def test_create_one_network(self):
result = db_api.share_network_create(self.fake_context,
self.share_nw_dict)
self._check_fields(expected=self.share_nw_dict, actual=result)
self.assertEqual(len(result['shares']), 0)
self.assertEqual(len(result['security_services']), 0)
self.assertEqual(len(result['network_allocations']), 0)
def test_create_two_networks(self):
share_nw_dict2 = self.share_nw_dict.copy()
share_nw_dict2['id'] = None
share_nw_dict2['project_id'] = 'fake project 2'
result1 = db_api.share_network_create(self.fake_context,
self.share_nw_dict)
result2 = db_api.share_network_create(self.fake_context,
share_nw_dict2)
self._check_fields(expected=self.share_nw_dict, actual=result1)
self._check_fields(expected=share_nw_dict2, actual=result2)
def test_create_same_project_netid_and_subnetid(self):
share_nw_dict2 = self.share_nw_dict.copy()
share_nw_dict2['id'] = None
db_api.share_network_create(self.fake_context, self.share_nw_dict)
self.assertRaises(exception.DBError,
db_api.share_network_create,
self.fake_context,
share_nw_dict2)
def test_get(self):
db_api.share_network_create(self.fake_context, self.share_nw_dict)
result = db_api.share_network_get(self.fake_context,
self.share_nw_dict['id'])
self._check_fields(expected=self.share_nw_dict, actual=result)
self.assertEqual(len(result['shares']), 0)
self.assertEqual(len(result['security_services']), 0)
self.assertEqual(len(result['network_allocations']), 0)
def test_get_with_one_share(self):
share_dict1 = {'id': 'fake share id1',
'share_network_id': self.share_nw_dict['id']}
db_api.share_network_create(self.fake_context, self.share_nw_dict)
db_api.share_create(self.fake_context, share_dict1)
result = db_api.share_network_get(self.fake_context,
self.share_nw_dict['id'])
self.assertEqual(len(result['shares']), 1)
self._check_fields(expected=share_dict1,
actual=result['shares'][0])
def test_get_with_two_shares(self):
share_dict1 = {'id': 'fake share id1',
'share_network_id': self.share_nw_dict['id']}
share_dict2 = {'id': 'fake share id2',
'share_network_id': self.share_nw_dict['id']}
db_api.share_network_create(self.fake_context, self.share_nw_dict)
db_api.share_create(self.fake_context, share_dict1)
db_api.share_create(self.fake_context, share_dict2)
result = db_api.share_network_get(self.fake_context,
self.share_nw_dict['id'])
self.assertEqual(len(result['shares']), 2)
def test_get_with_one_security_service(self):
security_dict1 = {'id': 'fake security service id1',
'project_id': self.fake_context.project_id,
'type': 'fake type'}
db_api.share_network_create(self.fake_context, self.share_nw_dict)
db_api.security_service_create(self.fake_context, security_dict1)
db_api.share_network_add_security_service(self.fake_context,
self.share_nw_dict['id'],
security_dict1['id'])
result = db_api.share_network_get(self.fake_context,
self.share_nw_dict['id'])
self.assertEqual(len(result['security_services']), 1)
self._check_fields(expected=security_dict1,
actual=result['security_services'][0])
def test_get_with_two_security_services(self):
security_dict1 = {'id': 'fake security service id1',
'project_id': self.fake_context.project_id,
'type': 'fake type'}
security_dict2 = {'id': 'fake security service id2',
'project_id': self.fake_context.project_id,
'type': 'fake type'}
db_api.share_network_create(self.fake_context, self.share_nw_dict)
db_api.security_service_create(self.fake_context, security_dict1)
db_api.security_service_create(self.fake_context, security_dict2)
db_api.share_network_add_security_service(self.fake_context,
self.share_nw_dict['id'],
security_dict1['id'])
db_api.share_network_add_security_service(self.fake_context,
self.share_nw_dict['id'],
security_dict2['id'])
result = db_api.share_network_get(self.fake_context,
self.share_nw_dict['id'])
self.assertEqual(len(result['security_services']), 2)
def test_get_with_one_allocation(self):
db_api.share_network_create(self.fake_context, self.share_nw_dict)
db_api.network_allocation_create(self.fake_context,
self.allocation_dict)
result = db_api.share_network_get(self.fake_context,
self.share_nw_dict['id'])
self.assertEqual(len(result['network_allocations']), 1)
self._check_fields(expected=self.allocation_dict,
actual=result['network_allocations'][0])
def test_get_with_two_allocations(self):
allocation_dict2 = dict(self.allocation_dict)
allocation_dict2['id'] = 'fake port id2'
db_api.share_network_create(self.fake_context, self.share_nw_dict)
db_api.network_allocation_create(self.fake_context,
self.allocation_dict)
db_api.network_allocation_create(self.fake_context, allocation_dict2)
result = db_api.share_network_get(self.fake_context,
self.share_nw_dict['id'])
self.assertEqual(len(result['network_allocations']), 2)
def test_get_not_found(self):
self.assertRaises(exception.ShareNetworkNotFound,
db_api.share_network_get,
self.fake_context,
'fake id')
def test_delete(self):
db_api.share_network_create(self.fake_context, self.share_nw_dict)
db_api.share_network_delete(self.fake_context,
self.share_nw_dict['id'])
self.assertRaises(exception.ShareNetworkNotFound,
db_api.share_network_get,
self.fake_context,
self.share_nw_dict['id'])
def test_delete_not_found(self):
self.assertRaises(exception.ShareNetworkNotFound,
db_api.share_network_delete,
self.fake_context,
'fake id')
def test_update(self):
new_status = constants.STATUS_ERROR
db_api.share_network_create(self.fake_context, self.share_nw_dict)
result_update = db_api.share_network_update(self.fake_context,
self.share_nw_dict['id'],
{'status': new_status})
result_get = db_api.share_network_get(self.fake_context,
self.share_nw_dict['id'])
self.assertEqual(result_update['status'], new_status)
self._check_fields(expected=dict(result_update.iteritems()),
actual=dict(result_get.iteritems()))
def test_update_not_found(self):
self.assertRaises(exception.ShareNetworkNotFound,
db_api.share_network_update,
self.fake_context,
'fake id',
{})
def test_get_all_one_record(self):
db_api.share_network_create(self.fake_context, self.share_nw_dict)
result = db_api.share_network_get_all(self.fake_context)
self.assertEqual(len(result), 1)
self._check_fields(expected=self.share_nw_dict, actual=result[0])
def test_get_all_two_records(self):
share_nw_dict2 = dict(self.share_nw_dict)
share_nw_dict2['id'] = 'fake subnet id2'
share_nw_dict2['neutron_subnet_id'] = 'fake subnet id2'
db_api.share_network_create(self.fake_context, self.share_nw_dict)
db_api.share_network_create(self.fake_context, share_nw_dict2)
result = db_api.share_network_get_all(self.fake_context)
self.assertEqual(len(result), 2)
def test_get_all_by_project(self):
share_nw_dict2 = dict(self.share_nw_dict)
share_nw_dict2['id'] = 'fake share nw id2'
share_nw_dict2['project_id'] = 'fake project 2'
share_nw_dict2['neutron_subnet_id'] = 'fake subnet id2'
db_api.share_network_create(self.fake_context, self.share_nw_dict)
db_api.share_network_create(self.fake_context, share_nw_dict2)
result = db_api.share_network_get_all_by_project(
self.fake_context,
share_nw_dict2['project_id'])
self.assertEqual(len(result), 1)
self._check_fields(expected=share_nw_dict2, actual=result[0])
def test_add_security_service(self):
security_dict1 = {'id': 'fake security service id1',
'project_id': self.fake_context.project_id,
'type': 'fake type'}
db_api.share_network_create(self.fake_context, self.share_nw_dict)
db_api.security_service_create(self.fake_context, security_dict1)
db_api.share_network_add_security_service(self.fake_context,
self.share_nw_dict['id'],
security_dict1['id'])
result = sqlalchemy_api.model_query(
self.fake_context,
models.ShareNetworkSecurityServiceAssociation).\
filter_by(security_service_id=security_dict1['id']).\
filter_by(share_network_id=self.share_nw_dict['id']).first()
self.assertTrue(result is not None)
def test_add_security_service_not_found_01(self):
security_service_id = 'unknown security service'
db_api.share_network_create(self.fake_context, self.share_nw_dict)
self.assertRaises(exception.SecurityServiceNotFound,
db_api.share_network_add_security_service,
self.fake_context,
self.share_nw_dict['id'],
security_service_id)
def test_add_security_service_not_found_02(self):
security_dict1 = {'id': 'fake security service id1',
'project_id': self.fake_context.project_id,
'type': 'fake type'}
share_nw_id = 'unknown share network'
db_api.security_service_create(self.fake_context, security_dict1)
self.assertRaises(exception.ShareNetworkNotFound,
db_api.share_network_add_security_service,
self.fake_context,
share_nw_id,
security_dict1['id'])
def test_add_security_service_association_error_already_associated(self):
security_dict1 = {'id': 'fake security service id1',
'project_id': self.fake_context.project_id,
'type': 'fake type'}
db_api.share_network_create(self.fake_context, self.share_nw_dict)
db_api.security_service_create(self.fake_context, security_dict1)
db_api.share_network_add_security_service(self.fake_context,
self.share_nw_dict['id'],
security_dict1['id'])
self.assertRaises(
exception.ShareNetworkSecurityServiceAssociationError,
db_api.share_network_add_security_service,
self.fake_context,
self.share_nw_dict['id'],
security_dict1['id'])
def test_add_security_service_association_error_status_active(self):
security_dict1 = {'id': 'fake security service id1',
'project_id': self.fake_context.project_id,
'type': 'fake type'}
db_api.share_network_create(self.fake_context, self.share_nw_dict)
db_api.share_network_update(self.fake_context,
self.share_nw_dict['id'],
{'status': constants.STATUS_ACTIVE})
db_api.security_service_create(self.fake_context, security_dict1)
self.assertRaises(
exception.ShareNetworkSecurityServiceAssociationError,
db_api.share_network_add_security_service,
self.fake_context,
self.share_nw_dict['id'],
security_dict1['id'])
assoc_ref = sqlalchemy_api.model_query(
self.fake_context,
models.ShareNetworkSecurityServiceAssociation).\
filter_by(security_service_id=security_dict1['id']).\
filter_by(share_network_id=self.share_nw_dict['id']).first()
self.assertTrue(assoc_ref is None)
def test_remove_security_service(self):
security_dict1 = {'id': 'fake security service id1',
'project_id': self.fake_context.project_id,
'type': 'fake type'}
db_api.share_network_create(self.fake_context, self.share_nw_dict)
db_api.security_service_create(self.fake_context, security_dict1)
db_api.share_network_add_security_service(self.fake_context,
self.share_nw_dict['id'],
security_dict1['id'])
db_api.share_network_remove_security_service(self.fake_context,
self.share_nw_dict['id'],
security_dict1['id'])
result = sqlalchemy_api.model_query(
self.fake_context,
models.ShareNetworkSecurityServiceAssociation).\
filter_by(security_service_id=security_dict1['id']).\
filter_by(share_network_id=self.share_nw_dict['id']).first()
self.assertTrue(result is None)
share_nw_ref = db_api.share_network_get(self.fake_context,
self.share_nw_dict['id'])
self.assertEqual(len(share_nw_ref['security_services']), 0)
def test_remove_security_service_not_found_01(self):
security_service_id = 'unknown security service'
db_api.share_network_create(self.fake_context, self.share_nw_dict)
self.assertRaises(exception.SecurityServiceNotFound,
db_api.share_network_remove_security_service,
self.fake_context,
self.share_nw_dict['id'],
security_service_id)
def test_remove_security_service_not_found_02(self):
security_dict1 = {'id': 'fake security service id1',
'project_id': self.fake_context.project_id,
'type': 'fake type'}
share_nw_id = 'unknown share network'
db_api.security_service_create(self.fake_context, security_dict1)
self.assertRaises(exception.ShareNetworkNotFound,
db_api.share_network_remove_security_service,
self.fake_context,
share_nw_id,
security_dict1['id'])
def test_remove_security_service_dissociation_error(self):
security_dict1 = {'id': 'fake security service id1',
'project_id': self.fake_context.project_id,
'type': 'fake type'}
db_api.share_network_create(self.fake_context, self.share_nw_dict)
db_api.security_service_create(self.fake_context, security_dict1)
self.assertRaises(
exception.ShareNetworkSecurityServiceDissociationError,
db_api.share_network_remove_security_service,
self.fake_context,
self.share_nw_dict['id'],
security_dict1['id'])
def test_security_services_relation(self):
security_dict1 = {'id': 'fake security service id1',
'project_id': self.fake_context.project_id,
'type': 'fake type'}
db_api.share_network_create(self.fake_context, self.share_nw_dict)
db_api.security_service_create(self.fake_context, security_dict1)
result = db_api.share_network_get(self.fake_context,
self.share_nw_dict['id'])
self.assertEqual(len(result['security_services']), 0)
def test_shares_relation(self):
share_dict = {'id': 'fake share id1'}
db_api.share_network_create(self.fake_context, self.share_nw_dict)
db_api.share_create(self.fake_context, share_dict)
result = db_api.share_network_get(self.fake_context,
self.share_nw_dict['id'])
self.assertEqual(len(result['shares']), 0)
def test_network_allocations_relation(self):
db_api.share_network_create(self.fake_context, self.share_nw_dict)
db_api.network_allocation_create(self.fake_context,
self.allocation_dict)
db_api.network_allocation_delete(self.fake_context,
self.allocation_dict['id'])
result = db_api.share_network_get(self.fake_context,
self.share_nw_dict['id'])
self.assertEqual(len(result['network_allocations']), 0)

View File

@ -33,6 +33,8 @@ import re
import shlex
import shutil
import signal
import socket
import struct
import sys
import tempfile
import time
@ -1209,3 +1211,17 @@ def to_bytes(text, default=0):
raise TypeError(msg)
except ValueError:
return default
def cidr_to_netmask(cidr):
"""
Convert cidr notation to the netmask string
:param cidr: integer which represents cidr notation
:rtype: string
"""
cidr = int(cidr)
bits = 0
for i in xrange(32 - cidr, 32):
bits |= (1 << i)
return socket.inet_ntoa(struct.pack('>I', bits))