Use Router OVO in external_net_db

Router OVO is created in patch [1].
This patch uses Router object in external_net_db

[1] https://review.openstack.org/#/c/516961/

Co-Authored-By: Cao Xuan Hoang <hoangcx@vn.fujitsu.com>
Change-Id: Id3aaf473737ed70ab4e16a122c82457cc5e1f307
This commit is contained in:
Van Hung Pham 2017-11-21 17:42:30 +07:00 committed by Ihar Hrachyshka
parent 88aebd4a29
commit 2aff1f165e
3 changed files with 95 additions and 18 deletions

View File

@ -30,11 +30,11 @@ from neutron._i18n import _
from neutron.db import _model_query as model_query
from neutron.db import _resource_extend as resource_extend
from neutron.db import _utils as db_utils
from neutron.db.models import l3 as l3_models
from neutron.db import models_v2
from neutron.db import rbac_db_models as rbac_db
from neutron.extensions import rbac as rbac_ext
from neutron.objects import network as net_obj
from neutron.objects import router as l3_obj
DEVICE_OWNER_ROUTER_GW = constants.DEVICE_OWNER_ROUTER_GW
@ -194,21 +194,22 @@ class External_net_db_mixin(object):
if (object_type != 'network' or
policy['action'] != 'access_as_external'):
return
new_tenant = None
new_project = None
if event == events.BEFORE_UPDATE:
new_tenant = kwargs['policy_update']['target_tenant']
if new_tenant == policy['target_tenant']:
new_project = kwargs['policy_update']['target_tenant']
if new_project == policy['target_tenant']:
# nothing to validate if the tenant didn't change
return
ports = context.session.query(models_v2.Port.id).filter_by(
gw_ports = context.session.query(models_v2.Port.id).filter_by(
device_owner=DEVICE_OWNER_ROUTER_GW,
network_id=policy['object_id'])
router = context.session.query(l3_models.Router).filter(
l3_models.Router.gw_port_id.in_(ports))
gw_ports = [gw_port[0] for gw_port in gw_ports]
rbac = rbac_db.NetworkRBAC
if policy['target_tenant'] != '*':
router = router.filter(
l3_models.Router.tenant_id == policy['target_tenant'])
filters = {
'gw_port_id': gw_ports,
'project_id': policy['target_tenant']
}
# if there is a wildcard entry we can safely proceed without the
# router lookup because they will have access either way
if context.session.query(rbac_db.NetworkRBAC).filter(
@ -216,6 +217,7 @@ class External_net_db_mixin(object):
rbac.action == 'access_as_external',
rbac.target_tenant == '*').count():
return
router_exist = l3_obj.Router.objects_exist(context, **filters)
else:
# deleting the wildcard is okay as long as the tenants with
# attached routers have their own entries and the network is
@ -226,19 +228,19 @@ class External_net_db_mixin(object):
"everyone.")
raise rbac_ext.RbacPolicyInUse(object_id=policy['object_id'],
details=msg)
tenants_with_entries = (
projects_with_entries = (
context.session.query(rbac.target_tenant).
filter(rbac.object_id == policy['object_id'],
rbac.action == 'access_as_external',
rbac.target_tenant != '*'))
router = router.filter(
~l3_models.Router.tenant_id.in_(tenants_with_entries))
if new_tenant:
# if this is an update we also need to ignore any router
# interfaces that belong to the new target.
router = router.filter(
l3_models.Router.tenant_id != new_tenant)
if router.count():
projects_with_entries = [projects_with_entry[0]
for projects_with_entry
in projects_with_entries]
if new_project:
projects_with_entries.append(new_project)
router_exist = l3_obj.Router.check_routers_not_owned_by_projects(
context, gw_ports, projects_with_entries)
if router_exist:
msg = _("There are routers attached to this network that "
"depend on this policy for access.")
raise rbac_ext.RbacPolicyInUse(object_id=policy['object_id'],

View File

@ -210,6 +210,23 @@ class Router(base.NeutronDbObject):
synthetic_fields = ['extra_attributes']
fields_no_update = ['project_id']
@classmethod
def check_routers_not_owned_by_projects(cls, context, gw_ports, projects):
"""This method is to check whether routers that aren't owned by
existing projects or not
"""
# TODO(hungpv) We may want to implement NOT semantic in get_object(s)
query = context.session.query(l3.Router).filter(
l3.Router.gw_port_id.in_(gw_ports))
query = query.filter(
~l3.Router.project_id.in_(projects))
return bool(query.count())
@base.NeutronObjectRegistry.register
class FloatingIP(base.NeutronDbObject):

View File

@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslo_utils import uuidutils
from neutron.objects import router
from neutron.tests.unit.objects import test_base as obj_test_base
@ -66,6 +67,63 @@ class RouterDbObjectTestCase(obj_test_base.BaseDbObjectTestCase,
{'gw_port_id': lambda: self._create_test_port_id(),
'flavor_id': lambda: self._create_test_flavor_id()})
def _create_router(self, router_id, gw_port_id, project_id):
r = router.Router(self.context,
id=router_id,
gw_port_id=gw_port_id,
project_id=project_id)
r.create()
def test_check_routers_not_owned_by_projects(self):
for obj in self.obj_fields:
self._create_router(router_id=obj['id'],
gw_port_id=obj['gw_port_id'],
project_id=obj['project_id'])
obj = self.obj_fields[0]
gw_port = obj['gw_port_id']
project = obj['project_id']
new_project = project
gw_port_no_match = uuidutils.generate_uuid()
project_no_match = uuidutils.generate_uuid()
new_project_no_match = uuidutils.generate_uuid()
# Check router match with gw_port BUT no projects
router_exist = router.Router.check_routers_not_owned_by_projects(
self.context,
[gw_port],
[project_no_match, new_project_no_match])
self.assertTrue(router_exist)
# Check router doesn't match with gw_port
router_exist = router.Router.check_routers_not_owned_by_projects(
self.context,
[gw_port_no_match],
[project])
self.assertFalse(router_exist)
# Check router match with gw_port AND project
router_exist = router.Router.check_routers_not_owned_by_projects(
self.context,
[gw_port],
[project, new_project_no_match])
self.assertFalse(router_exist)
# Check router match with gw_port AND new project
router_exist = router.Router.check_routers_not_owned_by_projects(
self.context,
[gw_port],
[project_no_match, new_project])
self.assertFalse(router_exist)
# Check router match with gw_port AND project AND new project
router_exist = router.Router.check_routers_not_owned_by_projects(
self.context,
[gw_port],
[project, new_project])
self.assertFalse(router_exist)
class RouterPortIfaceObjectTestCase(obj_test_base.BaseObjectIfaceTestCase):