Merge "Fix for cross tenancy issue with neutron" into stable/liberty

This commit is contained in:
Jenkins
2016-07-30 06:28:23 +00:00
committed by Gerrit Code Review
2 changed files with 52 additions and 0 deletions

View File

@@ -13,6 +13,8 @@
import netaddr
import operator
from keystoneclient import exceptions as k_exceptions
from keystoneclient.v2_0 import client as k_client
from neutron.api.v2 import attributes
from neutron.common import constants as const
from neutron.common import exceptions as n_exc
@@ -24,6 +26,7 @@ from neutron.extensions import l3 as ext_l3
from neutron.extensions import securitygroup as ext_sg
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import excutils
import sqlalchemy as sa
from gbpservice.common import utils
@@ -119,6 +122,7 @@ class ResourceMappingDriver(api.PolicyDriver, local_api.LocalAPI,
@log.log
def initialize(self):
self._cached_agent_notifier = None
self._resource_owner_tenant_id = None
def _reject_shared(self, object, type):
if object.get('shared'):
@@ -135,6 +139,12 @@ class ResourceMappingDriver(api.PolicyDriver, local_api.LocalAPI,
CrossTenantPolicyTargetGroupL2PolicyNotSupported())
def _reject_cross_tenant_l2p_l3p(self, context):
if context.current['tenant_id'] == self.resource_owner_tenant_id:
# Relax cross tenancy condition when current tenant id is admin.
# Relaxing when l2policy tenant id is of admin, to address the
# case for proxy group where l2policy belongs to admin tenant
# but l3policy belongs to user tenant.
return
# Can't create non shared L2p on a shared L3p
if context.current['l3_policy_id']:
l3p = context._plugin.get_l3_policy(
@@ -143,6 +153,31 @@ class ResourceMappingDriver(api.PolicyDriver, local_api.LocalAPI,
if l3p['tenant_id'] != context.current['tenant_id']:
raise exc.CrossTenantL2PolicyL3PolicyNotSupported()
@property
def resource_owner_tenant_id(self):
if not self._resource_owner_tenant_id:
self._resource_owner_tenant_id = (
self._get_resource_owner_tenant_id())
return self._resource_owner_tenant_id
def _get_resource_owner_tenant_id(self):
# Returns service tenant id, which specified in neutron conf
try:
user, pwd, tenant, auth_url = utils.get_keystone_creds()
keystoneclient = k_client.Client(username=user, password=pwd,
auth_url=auth_url)
tenant = keystoneclient.tenants.find(name=tenant)
return tenant.id
except k_exceptions.NotFound:
with excutils.save_and_reraise_exception(reraise=True):
LOG.error(_('No tenant with name %s exists.'), tenant)
except k_exceptions.NoUniqueMatch:
with excutils.save_and_reraise_exception(reraise=True):
LOG.error(_('Multiple tenants matches found for %s'), tenant)
except k_exceptions.AuthorizationFailure:
LOG.error(_("User: %(user)s dont have permissions"),
{'user': user})
def _reject_non_shared_net_on_shared_l2p(self, context):
if context.current.get('shared') and context.current['network_id']:
net = self._get_network(

View File

@@ -95,6 +95,12 @@ class ResourceMappingTestCase(test_plugin.GroupPolicyPluginTestCase):
plugins = manager.NeutronManager.get_service_plugins()
self._gbp_plugin = plugins.get(pconst.GROUP_POLICY)
self._l3_plugin = plugins.get(pconst.L3_ROUTER_NAT)
self.saved_keystone_client = resource_mapping.k_client.Client
resource_mapping.k_client.Client = mock.Mock()
def tearDown(self):
resource_mapping.k_client.Client = self.saved_keystone_client
super(ResourceMappingTestCase, self).tearDown()
def get_plugin_context(self):
return self._plugin, self._context
@@ -1299,6 +1305,17 @@ class TestPolicyTargetGroup(ResourceMappingTestCase):
def test_cross_tenant_prs_admin(self):
self._test_cross_tenant_prs(admin=True)
def test_cross_tenant_l2p(self):
l2p = self.create_l2_policy(name="l2p1", tenant_id='anothertenant')
l2p_id = l2p['l2_policy']['id']
data = {'policy_target_group': {'l2_policy_id': l2p_id,
'tenant_id': 'admin'}}
req = self.new_create_request('policy_target_groups', data)
data = self.deserialize(self.fmt, req.get_response(self.ext_api))
self.assertEqual('CrossTenantPolicyTargetGroupL2PolicyNotSupported',
data['NeutronError']['type'])
def test_l2p_update_rejected(self):
# Create two l2 policies.
l2p1 = self.create_l2_policy(name="l2p1")