proxy ptg extension

Driver extension that allows users to specify a 'proxied_group_id'
during PTG creation. Whenever a PTG proxies another group, it is
expected to intercept all the traffic before sending it to the
original destination. Proxy groups can modify the traffic if needed.
This is a useful construct for the Traffic Stitching Plumber.

Partially implements blueprint node-centric-chain-plugin

Change-Id: Idc185df5b0c7e61ef800ca449911656a8c1d2b87
This commit is contained in:
Ivar Lazzaro 2015-07-31 19:28:06 -07:00
parent 3becb34638
commit 090f94a381
14 changed files with 781 additions and 324 deletions

View File

@ -30,6 +30,15 @@ def clean_session(session):
session.expunge_all()
def get_resource_plural(resource):
if resource.endswith('y'):
resource_plural = resource.replace('y', 'ies')
else:
resource_plural = resource + 's'
return resource_plural
def load_plugin(namespace, plugin):
try:
# Try to resolve plugin by name

View File

@ -0,0 +1,39 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.db import model_base
import sqlalchemy as sa
class GroupProxyMapping(model_base.BASEV2):
__tablename__ = 'gp_group_proxy_mappings'
policy_target_group_id = sa.Column(
sa.String(36), sa.ForeignKey('gp_policy_target_groups.id',
ondelete="CASCADE"), primary_key=True)
# A group can only be proxied by one single group
proxied_group_id = sa.Column(sa.String(36),
sa.ForeignKey('gp_policy_target_groups.id'))
# A group can only proxy one single other group
# REVISIT(ivar): Can a backref be put here instead?
proxy_group_id = sa.Column(sa.String(36),
sa.ForeignKey('gp_policy_target_groups.id',
ondelete="SET NULL"))
proxy_type = sa.Column(sa.String(24))
class ProxyGatewayMapping(model_base.BASEV2):
__tablename__ = 'gp_proxy_gateway_mappings'
policy_target_id = sa.Column(
sa.String(36), sa.ForeignKey('gp_policy_targets.id',
ondelete="CASCADE"), primary_key=True)
proxy_gateway = sa.Column(sa.Boolean, nullable=False)
group_default_gateway = sa.Column(sa.Boolean, nullable=False)

View File

@ -1 +1 @@
c2a9d04c8cef
acb613677ffa

View File

@ -0,0 +1,64 @@
# Copyright 2014 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
"""Proxy Group Mapping
Revision ID: acb613677ffa
"""
# revision identifiers, used by Alembic.
revision = 'acb613677ffa'
down_revision = 'c2a9d04c8cef'
from alembic import op
import sqlalchemy as sa
def upgrade():
op.create_table(
'gp_group_proxy_mappings',
sa.Column('policy_target_group_id', sa.String(length=36),
nullable=False),
sa.Column('proxied_group_id', sa.String(length=36)),
sa.Column('proxy_group_id', sa.String(length=36)),
sa.Column('proxy_type', sa.String(length=24)),
sa.PrimaryKeyConstraint('policy_target_group_id'),
sa.ForeignKeyConstraint(['policy_target_group_id'],
['gp_policy_target_groups.id'],
ondelete='CASCADE'),
sa.ForeignKeyConstraint(['proxied_group_id'],
['gp_policy_target_groups.id']),
sa.ForeignKeyConstraint(['proxy_group_id'],
['gp_policy_target_groups.id'],
ondelete='SET NULL')
)
op.create_table(
'gp_proxy_gateway_mappings',
sa.Column('policy_target_id', sa.String(length=36), nullable=False),
sa.Column('proxy_gateway', sa.Boolean, nullable=False, default=False),
sa.Column('group_default_gateway', sa.Boolean, nullable=False,
default=False),
sa.PrimaryKeyConstraint('policy_target_id'),
sa.ForeignKeyConstraint(['policy_target_id'],
['gp_policy_targets.id'],
ondelete='CASCADE',
name='group_proxy_mapping_fk_ptg_id'),
)
def downgrade():
pass

View File

@ -0,0 +1,103 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.api import extensions
from neutron.api.v2 import attributes as attr
from neutron.common import exceptions as nexc
from gbpservice.neutron.extensions import group_policy as gp
from gbpservice.neutron.services.grouppolicy.common import exceptions as gp_exc
PROXY_TYPE_L2 = 'l2'
PROXY_TYPE_L3 = 'l3'
DEFAULT_PROXY_TYPE = PROXY_TYPE_L3
class ProxyGroupBadRequest(gp_exc.GroupPolicyBadRequest):
message = _("Invalid input for Proxy Group extension, reason: %(msg)s")
class InvalidProxiedGroup(nexc.InvalidInput, ProxyGroupBadRequest):
message = _("Proxied group %(group_id)s already has a proxy.")
class ProxyTypeSetWithoutProxiedPTG(nexc.InvalidInput, ProxyGroupBadRequest):
message = _("Proxy type can't be set without a proxied PTG.")
class InvalidProxyGatewayGroup(nexc.InvalidInput, ProxyGroupBadRequest):
message = _("Proxy gateway can't be set for non proxy PTG %(group_id)s.")
EXTENDED_ATTRIBUTES_2_0 = {
gp.POLICY_TARGET_GROUPS: {
'proxied_group_id': {
'allow_post': True, 'allow_put': False,
'validate': {'type:uuid_or_none': None}, 'is_visible': True,
'default': attr.ATTR_NOT_SPECIFIED,
'enforce_policy': True},
'proxy_type': {
'allow_post': True, 'allow_put': False,
'validate': {'type:values': ['l2', 'l3', None]},
'is_visible': True, 'default': attr.ATTR_NOT_SPECIFIED,
'enforce_policy': True},
'proxy_group_id': {
'allow_post': False, 'allow_put': False,
'validate': {'type:uuid_or_none': None}, 'is_visible': True,
'enforce_policy': True},
},
gp.POLICY_TARGETS: {
# This policy target will be used to reach the -proxied- PTG
'proxy_gateway': {
'allow_post': True, 'allow_put': False, 'default': False,
'convert_to': attr.convert_to_boolean,
'is_visible': True, 'required_by_policy': True,
'enforce_policy': True},
# This policy target is the default gateway for the -current- PTG
# Only for internal use.
'group_default_gateway': {
'allow_post': True, 'allow_put': False, 'default': False,
'convert_to': attr.convert_to_boolean,
'is_visible': True, 'required_by_policy': True,
'enforce_policy': True},
},
}
class Driver_proxy_group(extensions.ExtensionDescriptor):
@classmethod
def get_name(cls):
return "group policy poxy group extension"
@classmethod
def get_alias(cls):
return "proxy_group"
@classmethod
def get_description(cls):
return _("Add proxy_group_id attribute to policy target groups.")
@classmethod
def get_namespace(cls):
return ("http://docs.openstack.org/ext/neutron/grouppolicy/"
"proxy_group/api/v1.0")
@classmethod
def get_updated(cls):
return "2015-07-31T10:00:00-00:00"
def get_extended_resources(self, version):
if version == "2.0":
return EXTENDED_ATTRIBUTES_2_0
else:
return {}

View File

@ -0,0 +1,93 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.api.v2 import attributes
from oslo_log import log as logging
from gbpservice.neutron.db.grouppolicy.extensions import group_proxy_db as db
from gbpservice.neutron.extensions import driver_proxy_group
from gbpservice.neutron.services.grouppolicy import (
group_policy_driver_api as api)
LOG = logging.getLogger(__name__)
class ProxyGroupDriver(api.ExtensionDriver):
_supported_extension_alias = 'proxy_group'
_extension_dict = driver_proxy_group.EXTENDED_ATTRIBUTES_2_0
def initialize(self):
pass
@property
def extension_alias(self):
return self._supported_extension_alias
@api.default_extension_behavior(db.GroupProxyMapping)
def process_create_policy_target_group(self, session, data, result):
data = data['policy_target_group']
proxied = data.get('proxied_group_id')
if attributes.is_attr_set(proxied):
# Set value for proxied group
record = (session.query(db.GroupProxyMapping).filter_by(
policy_target_group_id=proxied).first())
if record:
if record.proxy_group_id:
raise driver_proxy_group.InvalidProxiedGroup(
group_id=proxied)
record.proxy_group_id = result['id']
else:
# Record may not exist for that PTG yet
record = db.GroupProxyMapping(
policy_target_group_id=proxied,
proxy_group_id=result['id'],
proxied_group_id=None)
session.add(record)
if not attributes.is_attr_set(data.get('proxy_type')):
data['proxy_type'] = driver_proxy_group.DEFAULT_PROXY_TYPE
record = (session.query(db.GroupProxyMapping).filter_by(
policy_target_group_id=result['id']).one())
record.proxy_type = data['proxy_type']
result['proxy_type'] = data['proxy_type']
elif attributes.is_attr_set(data.get('proxy_type')):
raise driver_proxy_group.ProxyTypeSetWithoutProxiedPTG()
@api.default_extension_behavior(db.GroupProxyMapping)
def process_update_policy_target_group(self, session, data, result):
pass
@api.default_extension_behavior(db.GroupProxyMapping)
def extend_policy_target_group_dict(self, session, result):
pass
@api.default_extension_behavior(db.ProxyGatewayMapping)
def process_create_policy_target(self, session, data, result):
self._validate_proxy_gateway(session, data, result)
@api.default_extension_behavior(db.ProxyGatewayMapping)
def process_update_policy_target(self, session, data, result):
self._validate_proxy_gateway(session, data, result)
@api.default_extension_behavior(db.ProxyGatewayMapping)
def extend_policy_target_dict(self, session, result):
pass
def _validate_proxy_gateway(self, session, data, result):
data = data['policy_target']
if data.get('proxy_gateway'):
ptg_id = result['policy_target_group_id']
record = session.query(db.GroupProxyMapping).filter_by(
proxy_group_id=ptg_id).first()
if not record:
# The group of this PT is not a proxy
raise driver_proxy_group.InvalidProxyGatewayGroup(
group_id=ptg_id)

View File

@ -11,10 +11,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from neutron.common import exceptions as n_exc
from oslo_config import cfg
from oslo_log import log
import stevedore
from gbpservice.neutron.services.grouppolicy.common import exceptions as gp_exc
LOG = log.getLogger(__name__)
@ -68,6 +71,8 @@ class ExtensionManager(stevedore.named.NamedExtensionManager):
for driver in self.ordered_ext_drivers:
try:
getattr(driver.obj, method_name)(session, data, result)
except (gp_exc.GroupPolicyException, n_exc.NeutronException):
raise
except Exception:
LOG.exception(
_("Extension driver '%(name)s' failed in %(method)s"),

View File

@ -12,7 +12,14 @@
import abc
from neutron.api.v2 import attributes
from oslo_log import log as logging
import six
from sqlalchemy.orm import exc as orm_exc
from gbpservice.common import utils
LOG = logging.getLogger(__name__)
@six.add_metaclass(abc.ABCMeta)
@ -1661,4 +1668,105 @@ class ExtensionDriver(object):
driver calls and/or returned as the result of a
nat_pool operation.
"""
pass
pass
def _default_process_create(self, session, data, result, type=None,
table=None, keys=None):
"""Default process create behavior.
Gives a default data storing behavior in order to avoid code
duplication across drivers. Use multiple times to fill multiple
tables if needed.
"""
kwargs = dict((x, data[type][x] if
attributes.is_attr_set(data[type][x]) else None)
for x in keys)
kwargs[type + '_' + 'id'] = result['id']
record = table(**kwargs)
session.add(record)
del kwargs[type + '_' + 'id']
result.update(kwargs)
def _default_process_update(self, session, data, result, type=None,
table=None, keys=None):
"""Default process update behavior.
Gives a default data storing behavior in order to avoid code
duplication across drivers. Use multiple times to fill multiple
tables if needed.
"""
try:
record = (session.query(table).filter_by(**{type + '_' + 'id':
result['id']}).one())
except orm_exc.NoResultFound:
# TODO(ivar) This is a preexisting object. For now just ignore
# this and return. Each extension driver should be able to specify
# a default behavior in case this happens.
return
for key in keys:
value = data[type].get(key)
if attributes.is_attr_set(value) and value != getattr(record, key):
setattr(record, key, value)
result[key] = getattr(record, key)
def _default_extend_dict(self, session, result, type=None,
table=None, keys=None):
"""Default dictionary extension behavior.
Gives a default dictionary extension behavior in order to avoid code
duplication across drivers. Use multiple times to fill from multiple
tables if needed.
"""
try:
record = (session.query(table).filter_by(**{type + '_' + 'id':
result['id']}).one())
except orm_exc.NoResultFound:
# TODO(ivar) This is a preexisting object. For now just ignore
# this and return. Each extension driver should be able to specify
# a default behavior in case this happens.
return
for key in keys:
result[key] = getattr(record, key)
def default_extension_behavior(table, keys=None):
def wrap(func):
def inner(inst, *args):
def filter_keys(inst, data, type):
plural = utils.get_resource_plural(type)
if keys:
return keys
definition = inst._extension_dict[plural]
return [x for x in definition if (x in data[type] if data else
True)]
name = func.__name__
if name.startswith('process_create_'):
# call default process create
type = name[len('process_create_'):]
inst._default_process_create(*args, type=type, table=table,
keys=filter_keys(inst, args[1], type))
# Complete result dictionary with unfiltered attributes
inst._default_extend_dict(args[0], args[2], type=type,
table=table,
keys=filter_keys(inst, None, type))
elif name.startswith('process_update_'):
# call default process update
type = name[len('process_update_'):]
inst._default_process_update(*args, type=type, table=table,
keys=filter_keys(inst, args[1], type))
# Complete result dictionary with unfiltered attributes
inst._default_extend_dict(args[0], args[2], type=type,
table=table,
keys=filter_keys(inst, None, type))
elif name.startswith('extend_') and name.endswith('_dict'):
# call default extend dict
type = name[len('extend_'):-len('_dict')]
inst._default_extend_dict(*args, type=type, table=table,
keys=filter_keys(inst, None, type))
# Now exec the actual function for postprocessing
func(inst, *args)
return inner
return wrap

View File

@ -13,6 +13,8 @@
import webob.exc
from neutron.db import api as db_api
from neutron.db import model_base
from neutron.tests.unit.extensions import test_l3
from neutron.tests.unit import testlib_api
@ -49,6 +51,8 @@ class GroupPolicyMappingDbTestCase(tgpdb.GroupPolicyDbTestCase,
core_plugin=core_plugin, gp_plugin=gp_plugin,
service_plugins=service_plugins
)
engine = db_api.get_engine()
model_base.BASEV2.metadata.create_all(engine)
class TestMappedGroupResources(GroupPolicyMappingDbTestCase,

View File

@ -12,7 +12,7 @@
import os
from neutron.api.v2 import attributes
from neutron.common import config # noqa
from neutron.db import model_base
import sqlalchemy as sa
@ -24,55 +24,59 @@ from gbpservice.neutron.tests.unit.services.grouppolicy import (
extensions as test_ext)
from gbpservice.neutron.tests.unit.services.grouppolicy import (
test_grouppolicy_plugin as test_plugin)
from gbpservice.neutron.tests.unit.services.grouppolicy.extensions import (
test_extension as test_extension)
class ExtensionDriverTestCase(
test_plugin.GroupPolicyPluginTestCase):
class ExtensionDriverTestBase(test_plugin.GroupPolicyPluginTestCase):
_extension_drivers = ['test']
_extension_path = os.path.dirname(os.path.abspath(test_ext.__file__))
def setUp(self):
config.cfg.CONF.set_override('extension_drivers',
self._extension_drivers,
group='group_policy')
config.cfg.CONF.set_override(
'api_extensions_path',
os.path.dirname(os.path.abspath(test_ext.__file__)))
super(ExtensionDriverTestCase, self).setUp()
if self._extension_path:
config.cfg.CONF.set_override(
'api_extensions_path', self._extension_path)
super(ExtensionDriverTestBase, self).setUp()
class ExtensionDriverTestCase(ExtensionDriverTestBase):
def test_pt_attr(self):
# Test create with default value.
pt = self.create_policy_target()
pt_id = pt['policy_target']['id']
policy_target_id = pt['policy_target']['id']
val = pt['policy_target']['pt_extension']
self.assertEqual("", val)
req = self.new_show_request('policy_targets', pt_id)
self.assertIsNone(val)
req = self.new_show_request('policy_targets', policy_target_id)
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
val = res['policy_target']['pt_extension']
self.assertEqual("", val)
self.assertIsNone(val)
# Test list.
res = self._list('policy_targets')
val = res['policy_targets'][0]['pt_extension']
self.assertEqual("", val)
self.assertIsNone(val)
# Test create with explict value.
pt = self.create_policy_target(pt_extension="abc")
pt_id = pt['policy_target']['id']
policy_target_id = pt['policy_target']['id']
val = pt['policy_target']['pt_extension']
self.assertEqual("abc", val)
req = self.new_show_request('policy_targets', pt_id)
req = self.new_show_request('policy_targets', policy_target_id)
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
val = res['policy_target']['pt_extension']
self.assertEqual("abc", val)
# Test update.
data = {'policy_target': {'pt_extension': "def"}}
req = self.new_update_request('policy_targets', data, pt_id)
req = self.new_update_request('policy_targets', data, policy_target_id)
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
val = res['policy_target']['pt_extension']
self.assertEqual("def", val)
req = self.new_show_request('policy_targets', pt_id)
req = self.new_show_request('policy_targets', policy_target_id)
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
val = res['policy_target']['pt_extension']
self.assertEqual("def", val)
@ -80,36 +84,40 @@ class ExtensionDriverTestCase(
def test_ptg_attr(self):
# Test create with default value.
ptg = self.create_policy_target_group()
ptg_id = ptg['policy_target_group']['id']
policy_target_group_id = ptg['policy_target_group']['id']
val = ptg['policy_target_group']['ptg_extension']
self.assertEqual("", val)
req = self.new_show_request('policy_target_groups', ptg_id)
self.assertIsNone(val)
req = self.new_show_request('policy_target_groups',
policy_target_group_id)
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
val = res['policy_target_group']['ptg_extension']
self.assertEqual("", val)
self.assertIsNone(val)
# Test list.
res = self._list('policy_target_groups')
val = res['policy_target_groups'][0]['ptg_extension']
self.assertEqual("", val)
self.assertIsNone(val)
# Test create with explict value.
ptg = self.create_policy_target_group(ptg_extension="abc")
ptg_id = ptg['policy_target_group']['id']
policy_target_group_id = ptg['policy_target_group']['id']
val = ptg['policy_target_group']['ptg_extension']
self.assertEqual("abc", val)
req = self.new_show_request('policy_target_groups', ptg_id)
req = self.new_show_request('policy_target_groups',
policy_target_group_id)
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
val = res['policy_target_group']['ptg_extension']
self.assertEqual("abc", val)
# Test update.
data = {'policy_target_group': {'ptg_extension': "def"}}
req = self.new_update_request('policy_target_groups', data, ptg_id)
req = self.new_update_request('policy_target_groups', data,
policy_target_group_id)
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
val = res['policy_target_group']['ptg_extension']
self.assertEqual("def", val)
req = self.new_show_request('policy_target_groups', ptg_id)
req = self.new_show_request('policy_target_groups',
policy_target_group_id)
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
val = res['policy_target_group']['ptg_extension']
self.assertEqual("def", val)
@ -117,36 +125,36 @@ class ExtensionDriverTestCase(
def test_l2p_attr(self):
# Test create with default value.
l2p = self.create_l2_policy()
l2p_id = l2p['l2_policy']['id']
l2_policy_id = l2p['l2_policy']['id']
val = l2p['l2_policy']['l2p_extension']
self.assertEqual("", val)
req = self.new_show_request('l2_policies', l2p_id)
self.assertIsNone(val)
req = self.new_show_request('l2_policies', l2_policy_id)
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
val = res['l2_policy']['l2p_extension']
self.assertEqual("", val)
self.assertIsNone(val)
# Test list.
res = self._list('l2_policies')
val = res['l2_policies'][0]['l2p_extension']
self.assertEqual("", val)
self.assertIsNone(val)
# Test create with explict value.
l2p = self.create_l2_policy(l2p_extension="abc")
l2p_id = l2p['l2_policy']['id']
l2_policy_id = l2p['l2_policy']['id']
val = l2p['l2_policy']['l2p_extension']
self.assertEqual("abc", val)
req = self.new_show_request('l2_policies', l2p_id)
req = self.new_show_request('l2_policies', l2_policy_id)
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
val = res['l2_policy']['l2p_extension']
self.assertEqual("abc", val)
# Test update.
data = {'l2_policy': {'l2p_extension': "def"}}
req = self.new_update_request('l2_policies', data, l2p_id)
req = self.new_update_request('l2_policies', data, l2_policy_id)
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
val = res['l2_policy']['l2p_extension']
self.assertEqual("def", val)
req = self.new_show_request('l2_policies', l2p_id)
req = self.new_show_request('l2_policies', l2_policy_id)
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
val = res['l2_policy']['l2p_extension']
self.assertEqual("def", val)
@ -154,36 +162,36 @@ class ExtensionDriverTestCase(
def test_l3p_attr(self):
# Test create with default value.
l3p = self.create_l3_policy()
l3p_id = l3p['l3_policy']['id']
l3_policy_id = l3p['l3_policy']['id']
val = l3p['l3_policy']['l3p_extension']
self.assertEqual("", val)
req = self.new_show_request('l3_policies', l3p_id)
self.assertIsNone(val)
req = self.new_show_request('l3_policies', l3_policy_id)
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
val = res['l3_policy']['l3p_extension']
self.assertEqual("", val)
self.assertIsNone(val)
# Test list.
res = self._list('l3_policies')
val = res['l3_policies'][0]['l3p_extension']
self.assertEqual("", val)
self.assertIsNone(val)
# Test create with explict value.
l3p = self.create_l3_policy(l3p_extension="abc")
l3p_id = l3p['l3_policy']['id']
l3_policy_id = l3p['l3_policy']['id']
val = l3p['l3_policy']['l3p_extension']
self.assertEqual("abc", val)
req = self.new_show_request('l3_policies', l3p_id)
req = self.new_show_request('l3_policies', l3_policy_id)
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
val = res['l3_policy']['l3p_extension']
self.assertEqual("abc", val)
# Test update.
data = {'l3_policy': {'l3p_extension': "def"}}
req = self.new_update_request('l3_policies', data, l3p_id)
req = self.new_update_request('l3_policies', data, l3_policy_id)
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
val = res['l3_policy']['l3p_extension']
self.assertEqual("def", val)
req = self.new_show_request('l3_policies', l3p_id)
req = self.new_show_request('l3_policies', l3_policy_id)
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
val = res['l3_policy']['l3p_extension']
self.assertEqual("def", val)
@ -191,36 +199,37 @@ class ExtensionDriverTestCase(
def test_pc_attr(self):
# Test create with default value.
pc = self.create_policy_classifier()
pc_id = pc['policy_classifier']['id']
policy_classifier_id = pc['policy_classifier']['id']
val = pc['policy_classifier']['pc_extension']
self.assertEqual("", val)
req = self.new_show_request('policy_classifiers', pc_id)
self.assertIsNone(val)
req = self.new_show_request('policy_classifiers', policy_classifier_id)
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
val = res['policy_classifier']['pc_extension']
self.assertEqual("", val)
self.assertIsNone(val)
# Test list.
res = self._list('policy_classifiers')
val = res['policy_classifiers'][0]['pc_extension']
self.assertEqual("", val)
self.assertIsNone(val)
# Test create with explict value.
pc = self.create_policy_classifier(pc_extension="abc")
pc_id = pc['policy_classifier']['id']
policy_classifier_id = pc['policy_classifier']['id']
val = pc['policy_classifier']['pc_extension']
self.assertEqual("abc", val)
req = self.new_show_request('policy_classifiers', pc_id)
req = self.new_show_request('policy_classifiers', policy_classifier_id)
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
val = res['policy_classifier']['pc_extension']
self.assertEqual("abc", val)
# Test update.
data = {'policy_classifier': {'pc_extension': "def"}}
req = self.new_update_request('policy_classifiers', data, pc_id)
req = self.new_update_request('policy_classifiers', data,
policy_classifier_id)
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
val = res['policy_classifier']['pc_extension']
self.assertEqual("def", val)
req = self.new_show_request('policy_classifiers', pc_id)
req = self.new_show_request('policy_classifiers', policy_classifier_id)
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
val = res['policy_classifier']['pc_extension']
self.assertEqual("def", val)
@ -228,36 +237,36 @@ class ExtensionDriverTestCase(
def test_pa_attr(self):
# Test create with default value.
pa = self.create_policy_action()
pa_id = pa['policy_action']['id']
policy_action_id = pa['policy_action']['id']
val = pa['policy_action']['pa_extension']
self.assertEqual("", val)
req = self.new_show_request('policy_actions', pa_id)
self.assertIsNone(val)
req = self.new_show_request('policy_actions', policy_action_id)
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
val = res['policy_action']['pa_extension']
self.assertEqual("", val)
self.assertIsNone(val)
# Test list.
res = self._list('policy_actions')
val = res['policy_actions'][0]['pa_extension']
self.assertEqual("", val)
self.assertIsNone(val)
# Test create with explict value.
pa = self.create_policy_action(pa_extension="abc")
pa_id = pa['policy_action']['id']
policy_action_id = pa['policy_action']['id']
val = pa['policy_action']['pa_extension']
self.assertEqual("abc", val)
req = self.new_show_request('policy_actions', pa_id)
req = self.new_show_request('policy_actions', policy_action_id)
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
val = res['policy_action']['pa_extension']
self.assertEqual("abc", val)
# Test update.
data = {'policy_action': {'pa_extension': "def"}}
req = self.new_update_request('policy_actions', data, pa_id)
req = self.new_update_request('policy_actions', data, policy_action_id)
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
val = res['policy_action']['pa_extension']
self.assertEqual("def", val)
req = self.new_show_request('policy_actions', pa_id)
req = self.new_show_request('policy_actions', policy_action_id)
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
val = res['policy_action']['pa_extension']
self.assertEqual("def", val)
@ -271,37 +280,37 @@ class ExtensionDriverTestCase(
# Test create with default value.
pr = self.create_policy_rule(policy_classifier_id=classifier_id)
pr_id = pr['policy_rule']['id']
policy_rule_id = pr['policy_rule']['id']
val = pr['policy_rule']['pr_extension']
self.assertEqual("", val)
req = self.new_show_request('policy_rules', pr_id)
self.assertIsNone(val)
req = self.new_show_request('policy_rules', policy_rule_id)
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
val = res['policy_rule']['pr_extension']
self.assertEqual("", val)
self.assertIsNone(val)
# Test list.
res = self._list('policy_rules')
val = res['policy_rules'][0]['pr_extension']
self.assertEqual("", val)
self.assertIsNone(val)
# Test create with explict value.
pr = self.create_policy_rule(policy_classifier_id=classifier_id,
pr_extension="abc")
pr_id = pr['policy_rule']['id']
policy_rule_id = pr['policy_rule']['id']
val = pr['policy_rule']['pr_extension']
self.assertEqual("abc", val)
req = self.new_show_request('policy_rules', pr_id)
req = self.new_show_request('policy_rules', policy_rule_id)
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
val = res['policy_rule']['pr_extension']
self.assertEqual("abc", val)
# Test update.
data = {'policy_rule': {'pr_extension': "def"}}
req = self.new_update_request('policy_rules', data, pr_id)
req = self.new_update_request('policy_rules', data, policy_rule_id)
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
val = res['policy_rule']['pr_extension']
self.assertEqual("def", val)
req = self.new_show_request('policy_rules', pr_id)
req = self.new_show_request('policy_rules', policy_rule_id)
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
val = res['policy_rule']['pr_extension']
self.assertEqual("def", val)
@ -309,36 +318,37 @@ class ExtensionDriverTestCase(
def test_prs_attr(self):
# Test create with default value.
prs = self.create_policy_rule_set(policy_rules=[])
prs_id = prs['policy_rule_set']['id']
policy_rule_set_id = prs['policy_rule_set']['id']
val = prs['policy_rule_set']['prs_extension']
self.assertEqual("", val)
req = self.new_show_request('policy_rule_sets', prs_id)
self.assertIsNone(val)
req = self.new_show_request('policy_rule_sets', policy_rule_set_id)
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
val = res['policy_rule_set']['prs_extension']
self.assertEqual("", val)
self.assertIsNone(val)
# Test list.
res = self._list('policy_rule_sets')
val = res['policy_rule_sets'][0]['prs_extension']
self.assertEqual("", val)
self.assertIsNone(val)
# Test create with explict value.
prs = self.create_policy_rule_set(policy_rules=[], prs_extension="abc")
prs_id = prs['policy_rule_set']['id']
policy_rule_set_id = prs['policy_rule_set']['id']
val = prs['policy_rule_set']['prs_extension']
self.assertEqual("abc", val)
req = self.new_show_request('policy_rule_sets', prs_id)
req = self.new_show_request('policy_rule_sets', policy_rule_set_id)
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
val = res['policy_rule_set']['prs_extension']
self.assertEqual("abc", val)
# Test update.
data = {'policy_rule_set': {'prs_extension': "def"}}
req = self.new_update_request('policy_rule_sets', data, prs_id)
req = self.new_update_request('policy_rule_sets', data,
policy_rule_set_id)
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
val = res['policy_rule_set']['prs_extension']
self.assertEqual("def", val)
req = self.new_show_request('policy_rule_sets', prs_id)
req = self.new_show_request('policy_rule_sets', policy_rule_set_id)
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
val = res['policy_rule_set']['prs_extension']
self.assertEqual("def", val)
@ -346,36 +356,40 @@ class ExtensionDriverTestCase(
def test_nsp_attr(self):
# Test create with default value.
nsp = self.create_network_service_policy()
nsp_id = nsp['network_service_policy']['id']
network_service_policy_id = nsp['network_service_policy']['id']
val = nsp['network_service_policy']['nsp_extension']
self.assertEqual("", val)
req = self.new_show_request('network_service_policies', nsp_id)
self.assertIsNone(val)
req = self.new_show_request('network_service_policies',
network_service_policy_id)
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
val = res['network_service_policy']['nsp_extension']
self.assertEqual("", val)
self.assertIsNone(val)
# Test list.
res = self._list('network_service_policies')
val = res['network_service_policies'][0]['nsp_extension']
self.assertEqual("", val)
self.assertIsNone(val)
# Test create with explict value.
nsp = self.create_network_service_policy(nsp_extension="abc")
nsp_id = nsp['network_service_policy']['id']
network_service_policy_id = nsp['network_service_policy']['id']
val = nsp['network_service_policy']['nsp_extension']
self.assertEqual("abc", val)
req = self.new_show_request('network_service_policies', nsp_id)
req = self.new_show_request('network_service_policies',
network_service_policy_id)
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
val = res['network_service_policy']['nsp_extension']
self.assertEqual("abc", val)
# Test update.
data = {'network_service_policy': {'nsp_extension': "def"}}
req = self.new_update_request('network_service_policies', data, nsp_id)
req = self.new_update_request('network_service_policies', data,
network_service_policy_id)
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
val = res['network_service_policy']['nsp_extension']
self.assertEqual("def", val)
req = self.new_show_request('network_service_policies', nsp_id)
req = self.new_show_request('network_service_policies',
network_service_policy_id)
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
val = res['network_service_policy']['nsp_extension']
self.assertEqual("def", val)
@ -396,16 +410,16 @@ class ExtensionDriverTestCase(
obj = getattr(self, 'create_%s' % type)()
id = obj[type]['id']
val = obj[type][acronim + '_extension']
self.assertEqual("", val)
self.assertIsNone(val)
req = self.new_show_request(plural, id)
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
val = res[type][acronim + '_extension']
self.assertEqual("", val)
self.assertIsNone(val)
# Test list.
res = self._list(plural)
val = res[plural][0][acronim + '_extension']
self.assertEqual("", val)
self.assertIsNone(val)
# Test create with explict value.
kwargs = {acronim + '_extension': "abc"}
@ -432,114 +446,114 @@ class ExtensionDriverTestCase(
class TestPolicyTargetExtension(model_base.BASEV2):
__tablename__ = 'test_policy_target_extension'
pt_id = sa.Column(sa.String(36),
sa.ForeignKey('gp_policy_targets.id',
ondelete="CASCADE"),
primary_key=True)
value = sa.Column(sa.String(64))
policy_target_id = sa.Column(sa.String(36),
sa.ForeignKey('gp_policy_targets.id',
ondelete="CASCADE"),
primary_key=True)
pt_extension = sa.Column(sa.String(64))
class TestPolicyTargetGroupExtension(model_base.BASEV2):
__tablename__ = 'test_policy_target_group_extension'
ptg_id = sa.Column(sa.String(36),
sa.ForeignKey('gp_policy_target_groups.id',
policy_target_group_id = sa.Column(
sa.String(36), sa.ForeignKey('gp_policy_target_groups.id',
ondelete="CASCADE"),
primary_key=True)
value = sa.Column(sa.String(64))
primary_key=True)
ptg_extension = sa.Column(sa.String(64))
class TestL2PolicyExtension(model_base.BASEV2):
__tablename__ = 'test_l2_policy_extension'
l2p_id = sa.Column(sa.String(36),
sa.ForeignKey('gp_l2_policies.id',
ondelete="CASCADE"),
primary_key=True)
value = sa.Column(sa.String(64))
l2_policy_id = sa.Column(sa.String(36), sa.ForeignKey('gp_l2_policies.id',
ondelete="CASCADE"),
primary_key=True)
l2p_extension = sa.Column(sa.String(64))
class TestL3PolicyExtension(model_base.BASEV2):
__tablename__ = 'test_l3_policy_extension'
l3p_id = sa.Column(sa.String(36),
l3_policy_id = sa.Column(sa.String(36),
sa.ForeignKey('gp_l3_policies.id',
ondelete="CASCADE"),
primary_key=True)
value = sa.Column(sa.String(64))
l3p_extension = sa.Column(sa.String(64))
class TestPolicyClassifierExtension(model_base.BASEV2):
__tablename__ = 'test_policy_classifier_extension'
pc_id = sa.Column(sa.String(36),
policy_classifier_id = sa.Column(sa.String(36),
sa.ForeignKey('gp_policy_classifiers.id',
ondelete="CASCADE"),
primary_key=True)
value = sa.Column(sa.String(64))
pc_extension = sa.Column(sa.String(64))
class TestPolicyActionExtension(model_base.BASEV2):
__tablename__ = 'test_policy_action_extension'
pa_id = sa.Column(sa.String(36),
policy_action_id = sa.Column(sa.String(36),
sa.ForeignKey('gp_policy_actions.id',
ondelete="CASCADE"),
primary_key=True)
value = sa.Column(sa.String(64))
pa_extension = sa.Column(sa.String(64))
class TestPolicyRuleExtension(model_base.BASEV2):
__tablename__ = 'test_policy_rule_extension'
pr_id = sa.Column(sa.String(36),
policy_rule_id = sa.Column(sa.String(36),
sa.ForeignKey('gp_policy_rules.id',
ondelete="CASCADE"),
primary_key=True)
value = sa.Column(sa.String(64))
pr_extension = sa.Column(sa.String(64))
class TestPolicyRuleSetExtension(model_base.BASEV2):
__tablename__ = 'test_policy_rule_set_extension'
prs_id = sa.Column(sa.String(36),
policy_rule_set_id = sa.Column(sa.String(36),
sa.ForeignKey('gp_policy_rule_sets.id',
ondelete="CASCADE"),
primary_key=True)
value = sa.Column(sa.String(64))
prs_extension = sa.Column(sa.String(64))
class TestNetworkServicePolicyExtension(model_base.BASEV2):
__tablename__ = 'test_network_service_policy_extension'
nsp_id = sa.Column(sa.String(36),
network_service_policy_id = sa.Column(sa.String(36),
sa.ForeignKey('gp_network_service_policies.id',
ondelete="CASCADE"),
primary_key=True)
value = sa.Column(sa.String(64))
nsp_extension = sa.Column(sa.String(64))
class TestExternalSegmentExtension(model_base.BASEV2):
__tablename__ = 'test_external_segment_extension'
es_id = sa.Column(sa.String(36),
external_segment_id = sa.Column(sa.String(36),
sa.ForeignKey('gp_external_segments.id',
ondelete="CASCADE"),
primary_key=True)
value = sa.Column(sa.String(64))
es_extension = sa.Column(sa.String(64))
class TestExternalPolicyExtension(model_base.BASEV2):
__tablename__ = 'test_external_policy_extension'
ep_id = sa.Column(sa.String(36),
external_policy_id = sa.Column(sa.String(36),
sa.ForeignKey('gp_external_policies.id',
ondelete="CASCADE"),
primary_key=True)
value = sa.Column(sa.String(64))
ep_extension = sa.Column(sa.String(64))
class TestNatPoolExtension(model_base.BASEV2):
__tablename__ = 'test_nat_pool_extension'
np_id = sa.Column(sa.String(36),
nat_pool_id = sa.Column(sa.String(36),
sa.ForeignKey('gp_nat_pools.id',
ondelete="CASCADE"),
primary_key=True)
value = sa.Column(sa.String(64))
np_extension = sa.Column(sa.String(64))
class TestExtensionDriver(api.ExtensionDriver):
_supported_extension_alias = 'test_extension'
_extension_dict = test_extension.EXTENDED_ATTRIBUTES_2_0
def initialize(self):
pass
@ -548,275 +562,149 @@ class TestExtensionDriver(api.ExtensionDriver):
def extension_alias(self):
return self._supported_extension_alias
@api.default_extension_behavior(TestPolicyTargetExtension)
def process_create_policy_target(self, session, data, result):
value = data['policy_target']['pt_extension']
if not attributes.is_attr_set(value):
value = ''
record = TestPolicyTargetExtension(pt_id=result['id'],
value=value)
session.add(record)
result['pt_extension'] = value
pass
@api.default_extension_behavior(TestPolicyTargetExtension)
def process_update_policy_target(self, session, data, result):
record = (session.query(TestPolicyTargetExtension).
filter_by(pt_id=result['id']).
one())
value = data['policy_target'].get('pt_extension')
if value and value != record.value:
record.value = value
result['pt_extension'] = record.value
pass
@api.default_extension_behavior(TestPolicyTargetExtension)
def extend_policy_target_dict(self, session, result):
record = (session.query(TestPolicyTargetExtension).
filter_by(pt_id=result['id']).
one())
result['pt_extension'] = record.value
pass
@api.default_extension_behavior(TestPolicyTargetGroupExtension)
def process_create_policy_target_group(self, session, data, result):
value = data['policy_target_group']['ptg_extension']
if not attributes.is_attr_set(value):
value = ''
record = TestPolicyTargetGroupExtension(ptg_id=result['id'],
value=value)
session.add(record)
result['ptg_extension'] = value
pass
@api.default_extension_behavior(TestPolicyTargetGroupExtension)
def process_update_policy_target_group(self, session, data, result):
record = (session.query(TestPolicyTargetGroupExtension).
filter_by(ptg_id=result['id']).
one())
value = data['policy_target_group'].get('ptg_extension')
if value and value != record.value:
record.value = value
result['ptg_extension'] = record.value
pass
@api.default_extension_behavior(TestPolicyTargetGroupExtension)
def extend_policy_target_group_dict(self, session, result):
record = (session.query(TestPolicyTargetGroupExtension).
filter_by(ptg_id=result['id']).
one())
result['ptg_extension'] = record.value
pass
@api.default_extension_behavior(TestL2PolicyExtension)
def process_create_l2_policy(self, session, data, result):
value = data['l2_policy']['l2p_extension']
if not attributes.is_attr_set(value):
value = ''
record = TestL2PolicyExtension(l2p_id=result['id'], value=value)
session.add(record)
result['l2p_extension'] = value
pass
@api.default_extension_behavior(TestL2PolicyExtension)
def process_update_l2_policy(self, session, data, result):
record = (session.query(TestL2PolicyExtension).
filter_by(l2p_id=result['id']).
one())
value = data['l2_policy'].get('l2p_extension')
if value and value != record.value:
record.value = value
result['l2p_extension'] = record.value
pass
@api.default_extension_behavior(TestL2PolicyExtension)
def extend_l2_policy_dict(self, session, result):
record = (session.query(TestL2PolicyExtension).
filter_by(l2p_id=result['id']).
one())
result['l2p_extension'] = record.value
pass
@api.default_extension_behavior(TestL3PolicyExtension)
def process_create_l3_policy(self, session, data, result):
value = data['l3_policy']['l3p_extension']
if not attributes.is_attr_set(value):
value = ''
record = TestL3PolicyExtension(l3p_id=result['id'], value=value)
session.add(record)
result['l3p_extension'] = value
pass
@api.default_extension_behavior(TestL3PolicyExtension)
def process_update_l3_policy(self, session, data, result):
record = (session.query(TestL3PolicyExtension).
filter_by(l3p_id=result['id']).
one())
value = data['l3_policy'].get('l3p_extension')
if value and value != record.value:
record.value = value
result['l3p_extension'] = record.value
pass
@api.default_extension_behavior(TestL3PolicyExtension)
def extend_l3_policy_dict(self, session, result):
record = (session.query(TestL3PolicyExtension).
filter_by(l3p_id=result['id']).
one())
result['l3p_extension'] = record.value
pass
@api.default_extension_behavior(TestPolicyClassifierExtension)
def process_create_policy_classifier(self, session, data, result):
value = data['policy_classifier']['pc_extension']
if not attributes.is_attr_set(value):
value = ''
record = TestPolicyClassifierExtension(pc_id=result['id'], value=value)
session.add(record)
result['pc_extension'] = value
pass
@api.default_extension_behavior(TestPolicyClassifierExtension)
def process_update_policy_classifier(self, session, data, result):
record = (session.query(TestPolicyClassifierExtension).
filter_by(pc_id=result['id']).
one())
value = data['policy_classifier'].get('pc_extension')
if value and value != record.value:
record.value = value
result['pc_extension'] = record.value
pass
@api.default_extension_behavior(TestPolicyClassifierExtension)
def extend_policy_classifier_dict(self, session, result):
record = (session.query(TestPolicyClassifierExtension).
filter_by(pc_id=result['id']).
one())
result['pc_extension'] = record.value
pass
@api.default_extension_behavior(TestPolicyActionExtension)
def process_create_policy_action(self, session, data, result):
value = data['policy_action']['pa_extension']
if not attributes.is_attr_set(value):
value = ''
record = TestPolicyActionExtension(pa_id=result['id'], value=value)
session.add(record)
result['pa_extension'] = value
pass
@api.default_extension_behavior(TestPolicyActionExtension)
def process_update_policy_action(self, session, data, result):
record = (session.query(TestPolicyActionExtension).
filter_by(pa_id=result['id']).
one())
value = data['policy_action'].get('pa_extension')
if value and value != record.value:
record.value = value
result['pa_extension'] = record.value
pass
@api.default_extension_behavior(TestPolicyActionExtension)
def extend_policy_action_dict(self, session, result):
record = (session.query(TestPolicyActionExtension).
filter_by(pa_id=result['id']).
one())
result['pa_extension'] = record.value
pass
@api.default_extension_behavior(TestPolicyRuleExtension)
def process_create_policy_rule(self, session, data, result):
value = data['policy_rule']['pr_extension']
if not attributes.is_attr_set(value):
value = ''
record = TestPolicyRuleExtension(pr_id=result['id'], value=value)
session.add(record)
result['pr_extension'] = value
pass
@api.default_extension_behavior(TestPolicyRuleExtension)
def process_update_policy_rule(self, session, data, result):
record = (session.query(TestPolicyRuleExtension).
filter_by(pr_id=result['id']).
one())
value = data['policy_rule'].get('pr_extension')
if value and value != record.value:
record.value = value
result['pr_extension'] = record.value
pass
@api.default_extension_behavior(TestPolicyRuleExtension)
def extend_policy_rule_dict(self, session, result):
record = (session.query(TestPolicyRuleExtension).
filter_by(pr_id=result['id']).
one())
result['pr_extension'] = record.value
pass
@api.default_extension_behavior(TestPolicyRuleSetExtension)
def process_create_policy_rule_set(self, session, data, result):
value = data['policy_rule_set']['prs_extension']
if not attributes.is_attr_set(value):
value = ''
record = TestPolicyRuleSetExtension(prs_id=result['id'], value=value)
session.add(record)
result['prs_extension'] = value
pass
@api.default_extension_behavior(TestPolicyRuleSetExtension)
def process_update_policy_rule_set(self, session, data, result):
record = (session.query(TestPolicyRuleSetExtension).
filter_by(prs_id=result['id']).
one())
value = data['policy_rule_set'].get('prs_extension')
if value and value != record.value:
record.value = value
result['prs_extension'] = record.value
pass
@api.default_extension_behavior(TestPolicyRuleSetExtension)
def extend_policy_rule_set_dict(self, session, result):
record = (session.query(TestPolicyRuleSetExtension).
filter_by(prs_id=result['id']).
one())
result['prs_extension'] = record.value
pass
@api.default_extension_behavior(TestNetworkServicePolicyExtension)
def process_create_network_service_policy(self, session, data, result):
value = data['network_service_policy']['nsp_extension']
if not attributes.is_attr_set(value):
value = ''
record = TestNetworkServicePolicyExtension(nsp_id=result['id'],
value=value)
session.add(record)
result['nsp_extension'] = value
pass
@api.default_extension_behavior(TestNetworkServicePolicyExtension)
def process_update_network_service_policy(self, session, data, result):
record = (session.query(TestNetworkServicePolicyExtension).
filter_by(nsp_id=result['id']).
one())
value = data['network_service_policy'].get('nsp_extension')
if value and value != record.value:
record.value = value
result['nsp_extension'] = record.value
pass
@api.default_extension_behavior(TestNetworkServicePolicyExtension)
def extend_network_service_policy_dict(self, session, result):
record = (session.query(TestNetworkServicePolicyExtension).
filter_by(nsp_id=result['id']).
one())
result['nsp_extension'] = record.value
pass
@api.default_extension_behavior(TestExternalSegmentExtension)
def process_create_external_segment(self, session, data, result):
self._process_create(session, data, result, 'external_segment',
TestExternalSegmentExtension)
pass
@api.default_extension_behavior(TestExternalSegmentExtension)
def process_update_external_segment(self, session, data, result):
self._process_update(session, data, result, 'external_segment',
TestExternalSegmentExtension)
pass
@api.default_extension_behavior(TestExternalSegmentExtension)
def extend_external_segment_dict(self, session, result):
self._extend(session, result, 'external_segment',
TestExternalSegmentExtension)
pass
@api.default_extension_behavior(TestExternalPolicyExtension)
def process_create_external_policy(self, session, data, result):
self._process_create(session, data, result, 'external_policy',
TestExternalPolicyExtension)
pass
@api.default_extension_behavior(TestExternalPolicyExtension)
def process_update_external_policy(self, session, data, result):
self._process_update(session, data, result, 'external_policy',
TestExternalPolicyExtension)
pass
@api.default_extension_behavior(TestExternalPolicyExtension)
def extend_external_policy_dict(self, session, result):
self._extend(session, result, 'external_policy',
TestExternalPolicyExtension)
pass
@api.default_extension_behavior(TestNatPoolExtension)
def process_create_nat_pool(self, session, data, result):
self._process_create(session, data, result, 'nat_pool',
TestNatPoolExtension)
pass
@api.default_extension_behavior(TestNatPoolExtension)
def process_update_nat_pool(self, session, data, result):
self._process_update(session, data, result, 'nat_pool',
TestNatPoolExtension)
pass
@api.default_extension_behavior(TestNatPoolExtension)
def extend_nat_pool_dict(self, session, result):
self._extend(session, result, 'nat_pool', TestNatPoolExtension)
def _process_create(self, session, data, result, type, klass):
acronim = _acronim(type)
value = data[type][acronim + '_extension']
if not attributes.is_attr_set(value):
value = ''
kwargs = {acronim + '_id': result['id'], 'value': value}
record = klass(**kwargs)
session.add(record)
result[acronim + '_extension'] = value
def _process_update(self, session, data, result, type, klass):
acronim = _acronim(type)
kwargs = {acronim + '_id': result['id']}
record = session.query(klass).filter_by(**kwargs).one()
value = data[type].get(acronim + '_extension')
if value and value != record.value:
record.value = value
result[acronim + '_extension'] = record.value
def _extend(self, session, result, type, klass):
acronim = _acronim(type)
kwargs = {acronim + '_id': result['id']}
record = session.query(klass).filter_by(**kwargs).one()
result[acronim + '_extension'] = record.value
pass
def _acronim(type):

View File

@ -0,0 +1,143 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron import context as n_ctx
from sqlalchemy.orm import exc as orm_exc
from gbpservice.neutron.db.grouppolicy.extensions import group_proxy_db
from gbpservice.neutron.tests.unit.services.grouppolicy import (
test_extension_driver_api as test_ext_base)
class ExtensionDriverTestCase(test_ext_base.ExtensionDriverTestBase):
_extension_drivers = ['proxy_group']
_extension_path = None
def test_proxy_group_extension(self):
ptg = self.create_policy_target_group()['policy_target_group']
self.assertIsNone(ptg['proxy_group_id'])
self.assertIsNone(ptg['proxied_group_id'])
self.assertIsNone(ptg['proxy_type'])
ptg_proxy = self.create_policy_target_group(
proxied_group_id=ptg['id'])['policy_target_group']
self.assertIsNone(ptg_proxy['proxy_group_id'])
self.assertEqual(ptg['id'], ptg_proxy['proxied_group_id'])
self.assertEqual('l3', ptg_proxy['proxy_type'])
# Verify relationship added
ptg = self.show_policy_target_group(ptg['id'])['policy_target_group']
self.assertEqual(ptg_proxy['id'], ptg['proxy_group_id'])
self.assertIsNone(ptg['proxied_group_id'])
pt = self.create_policy_target(
policy_target_group_id=ptg_proxy['id'])['policy_target']
self.assertFalse(pt['proxy_gateway'])
self.assertFalse(pt['group_default_gateway'])
pt = self.create_policy_target(
policy_target_group_id=ptg_proxy['id'],
proxy_gateway=True, group_default_gateway=True)['policy_target']
self.assertTrue(pt['proxy_gateway'])
self.assertTrue(pt['group_default_gateway'])
pt = self.show_policy_target(pt['id'])['policy_target']
self.assertTrue(pt['proxy_gateway'])
self.assertTrue(pt['group_default_gateway'])
def test_preexisting_pt(self):
ptg = self.create_policy_target_group()['policy_target_group']
pt = self.create_policy_target(
policy_target_group_id=ptg['id'])['policy_target']
self.assertTrue('proxy_gateway' in pt)
self.assertTrue('group_default_gateway' in pt)
# Forcefully delete the entry in the proxy table, and verify that it's
# fixed by the subsequent GET
admin_context = n_ctx.get_admin_context()
mapping = admin_context.session.query(
group_proxy_db.ProxyGatewayMapping).filter_by(
policy_target_id=pt['id']).one()
admin_context.session.delete(mapping)
query = admin_context.session.query(
group_proxy_db.ProxyGatewayMapping).filter_by(
policy_target_id=pt['id'])
self.assertRaises(orm_exc.NoResultFound, query.one)
# Showing the object just ignores the extension
pt = self.show_policy_target(pt['id'],
expected_res_status=200)['policy_target']
self.assertFalse('proxy_gateway' in pt)
self.assertFalse('group_default_gateway' in pt)
# Updating the object just ignores the extension
pt = self.update_policy_target(
pt['id'], name='somenewname',
expected_res_status=200)['policy_target']
self.assertEqual('somenewname', pt['name'])
self.assertFalse('proxy_gateway' in pt)
self.assertFalse('group_default_gateway' in pt)
def test_proxy_group_multiple_proxies(self):
# same PTG proxied multiple times will fail
ptg = self.create_policy_target_group()['policy_target_group']
self.create_policy_target_group(proxied_group_id=ptg['id'])
# Second proxy will fail
res = self.create_policy_target_group(proxied_group_id=ptg['id'],
expected_res_status=400)
self.assertEqual('InvalidProxiedGroup', res['NeutronError']['type'])
def test_proxy_group_chain_proxy(self):
# Verify no error is raised when chaining multiple proxy PTGs
ptg0 = self.create_policy_target_group()['policy_target_group']
ptg1 = self.create_policy_target_group(
proxied_group_id=ptg0['id'],
expected_res_status=201)['policy_target_group']
self.create_policy_target_group(proxied_group_id=ptg1['id'],
expected_res_status=201)
def test_proxy_group_no_update(self):
ptg0 = self.create_policy_target_group()['policy_target_group']
ptg1 = self.create_policy_target_group()['policy_target_group']
ptg_proxy = self.create_policy_target_group(
proxied_group_id=ptg0['id'])['policy_target_group']
self.update_policy_target_group(
ptg_proxy['id'], proxied_group_id=ptg1['id'],
expected_res_status=400)
def test_different_proxy_type(self):
ptg = self.create_policy_target_group()['policy_target_group']
ptg_proxy = self.create_policy_target_group(
proxied_group_id=ptg['id'], proxy_type='l2')['policy_target_group']
self.assertEqual('l2', ptg_proxy['proxy_type'])
ptg_proxy = self.show_policy_target_group(
ptg_proxy['id'])['policy_target_group']
self.assertEqual('l2', ptg_proxy['proxy_type'])
def test_proxy_type_fails(self):
ptg = self.create_policy_target_group()['policy_target_group']
res = self.create_policy_target_group(proxy_type='l2',
expected_res_status=400)
self.assertEqual('ProxyTypeSetWithoutProxiedPTG',
res['NeutronError']['type'])
self.create_policy_target_group(proxied_group_id=ptg['id'],
proxy_type='notvalid',
expected_res_status=400)
def test_proxy_gateway_no_proxy(self):
ptg = self.create_policy_target_group()['policy_target_group']
res = self.create_policy_target(
policy_target_group_id=ptg['id'], proxy_gateway=True,
expected_res_status=400)
self.assertEqual('InvalidProxyGatewayGroup',
res['NeutronError']['type'])

View File

@ -47,6 +47,7 @@ neutron.service_plugins =
apic_gbp_l3 = gbpservice.neutron.services.l3_router.l3_apic:ApicGBPL3ServicePlugin
gbpservice.neutron.group_policy.extension_drivers =
test = gbpservice.neutron.tests.unit.services.grouppolicy.test_extension_driver_api:TestExtensionDriver
proxy_group = gbpservice.neutron.services.grouppolicy.drivers.extensions.proxy_group_driver:ProxyGroupDriver
gbpservice.neutron.group_policy.policy_drivers =
dummy = gbpservice.neutron.services.grouppolicy.drivers.dummy_driver:NoopDriver
implicit_policy = gbpservice.neutron.services.grouppolicy.drivers.implicit_policy:ImplicitPolicyDriver