Merge "Make neutron context available in neutron-lib"
This commit is contained in:
commit
de12c6753b
@ -12,7 +12,7 @@
|
|||||||
|
|
||||||
import sqlalchemy as sa
|
import sqlalchemy as sa
|
||||||
|
|
||||||
from neutron_lib import _context as context
|
from neutron_lib import context
|
||||||
from neutron_lib.db import model_base
|
from neutron_lib.db import model_base
|
||||||
|
|
||||||
from neutron_lib.tests.unit.db import _base as db_base
|
from neutron_lib.tests.unit.db import _base as db_base
|
||||||
|
@ -14,7 +14,7 @@ import mock
|
|||||||
from oslo_context import context as oslo_context
|
from oslo_context import context as oslo_context
|
||||||
from testtools import matchers
|
from testtools import matchers
|
||||||
|
|
||||||
from neutron_lib import _context
|
from neutron_lib import context
|
||||||
from neutron_lib.tests import _base
|
from neutron_lib.tests import _base
|
||||||
|
|
||||||
|
|
||||||
@ -27,7 +27,7 @@ class TestNeutronContext(_base.BaseTestCase):
|
|||||||
self.db_api_session = self._db_api_session_patcher.start()
|
self.db_api_session = self._db_api_session_patcher.start()
|
||||||
|
|
||||||
def test_neutron_context_create(self):
|
def test_neutron_context_create(self):
|
||||||
ctx = _context.Context('user_id', 'tenant_id')
|
ctx = context.Context('user_id', 'tenant_id')
|
||||||
self.assertEqual('user_id', ctx.user_id)
|
self.assertEqual('user_id', ctx.user_id)
|
||||||
self.assertEqual('tenant_id', ctx.project_id)
|
self.assertEqual('tenant_id', ctx.project_id)
|
||||||
self.assertEqual('tenant_id', ctx.tenant_id)
|
self.assertEqual('tenant_id', ctx.tenant_id)
|
||||||
@ -42,7 +42,7 @@ class TestNeutronContext(_base.BaseTestCase):
|
|||||||
self.assertIsNone(ctx.auth_token)
|
self.assertIsNone(ctx.auth_token)
|
||||||
|
|
||||||
def test_neutron_context_getter_setter(self):
|
def test_neutron_context_getter_setter(self):
|
||||||
ctx = _context.Context('Anakin', 'Skywalker')
|
ctx = context.Context('Anakin', 'Skywalker')
|
||||||
self.assertEqual('Anakin', ctx.user_id)
|
self.assertEqual('Anakin', ctx.user_id)
|
||||||
self.assertEqual('Skywalker', ctx.tenant_id)
|
self.assertEqual('Skywalker', ctx.tenant_id)
|
||||||
ctx.user_id = 'Darth'
|
ctx.user_id = 'Darth'
|
||||||
@ -51,9 +51,8 @@ class TestNeutronContext(_base.BaseTestCase):
|
|||||||
self.assertEqual('Vader', ctx.tenant_id)
|
self.assertEqual('Vader', ctx.tenant_id)
|
||||||
|
|
||||||
def test_neutron_context_create_with_name(self):
|
def test_neutron_context_create_with_name(self):
|
||||||
ctx = _context.Context('user_id', 'tenant_id',
|
ctx = context.Context('user_id', 'tenant_id',
|
||||||
tenant_name='tenant_name',
|
tenant_name='tenant_name', user_name='user_name')
|
||||||
user_name='user_name')
|
|
||||||
# Check name is set
|
# Check name is set
|
||||||
self.assertEqual('user_name', ctx.user_name)
|
self.assertEqual('user_name', ctx.user_name)
|
||||||
self.assertEqual('tenant_name', ctx.tenant_name)
|
self.assertEqual('tenant_name', ctx.tenant_name)
|
||||||
@ -62,32 +61,32 @@ class TestNeutronContext(_base.BaseTestCase):
|
|||||||
self.assertEqual('tenant_id', ctx.tenant)
|
self.assertEqual('tenant_id', ctx.tenant)
|
||||||
|
|
||||||
def test_neutron_context_create_with_request_id(self):
|
def test_neutron_context_create_with_request_id(self):
|
||||||
ctx = _context.Context('user_id', 'tenant_id', request_id='req_id_xxx')
|
ctx = context.Context('user_id', 'tenant_id', request_id='req_id_xxx')
|
||||||
self.assertEqual('req_id_xxx', ctx.request_id)
|
self.assertEqual('req_id_xxx', ctx.request_id)
|
||||||
|
|
||||||
def test_neutron_context_create_with_timestamp(self):
|
def test_neutron_context_create_with_timestamp(self):
|
||||||
now = "Right Now!"
|
now = "Right Now!"
|
||||||
ctx = _context.Context('user_id', 'tenant_id', timestamp=now)
|
ctx = context.Context('user_id', 'tenant_id', timestamp=now)
|
||||||
self.assertEqual(now, ctx.timestamp)
|
self.assertEqual(now, ctx.timestamp)
|
||||||
|
|
||||||
def test_neutron_context_create_is_advsvc(self):
|
def test_neutron_context_create_is_advsvc(self):
|
||||||
ctx = _context.Context('user_id', 'tenant_id', is_advsvc=True)
|
ctx = context.Context('user_id', 'tenant_id', is_advsvc=True)
|
||||||
self.assertFalse(ctx.is_admin)
|
self.assertFalse(ctx.is_admin)
|
||||||
self.assertTrue(ctx.is_advsvc)
|
self.assertTrue(ctx.is_advsvc)
|
||||||
|
|
||||||
def test_neutron_context_create_with_auth_token(self):
|
def test_neutron_context_create_with_auth_token(self):
|
||||||
ctx = _context.Context('user_id', 'tenant_id',
|
ctx = context.Context('user_id', 'tenant_id',
|
||||||
auth_token='auth_token_xxx')
|
auth_token='auth_token_xxx')
|
||||||
self.assertEqual('auth_token_xxx', ctx.auth_token)
|
self.assertEqual('auth_token_xxx', ctx.auth_token)
|
||||||
|
|
||||||
def test_neutron_context_from_dict(self):
|
def test_neutron_context_from_dict(self):
|
||||||
owner = {'user_id': 'Luke', 'tenant_id': 'Skywalker'}
|
owner = {'user_id': 'Luke', 'tenant_id': 'Skywalker'}
|
||||||
ctx = _context.Context.from_dict(owner)
|
ctx = context.Context.from_dict(owner)
|
||||||
self.assertEqual(owner['user_id'], ctx.user_id)
|
self.assertEqual(owner['user_id'], ctx.user_id)
|
||||||
self.assertEqual(owner['tenant_id'], ctx.tenant_id)
|
self.assertEqual(owner['tenant_id'], ctx.tenant_id)
|
||||||
|
|
||||||
def test_neutron_context_to_dict(self):
|
def test_neutron_context_to_dict(self):
|
||||||
ctx = _context.Context('user_id', 'tenant_id')
|
ctx = context.Context('user_id', 'tenant_id')
|
||||||
ctx_dict = ctx.to_dict()
|
ctx_dict = ctx.to_dict()
|
||||||
self.assertEqual('user_id', ctx_dict['user_id'])
|
self.assertEqual('user_id', ctx_dict['user_id'])
|
||||||
self.assertEqual('tenant_id', ctx_dict['project_id'])
|
self.assertEqual('tenant_id', ctx_dict['project_id'])
|
||||||
@ -100,23 +99,23 @@ class TestNeutronContext(_base.BaseTestCase):
|
|||||||
self.assertIsNone(ctx_dict['auth_token'])
|
self.assertIsNone(ctx_dict['auth_token'])
|
||||||
|
|
||||||
def test_neutron_context_to_dict_with_name(self):
|
def test_neutron_context_to_dict_with_name(self):
|
||||||
ctx = _context.Context('user_id', 'tenant_id',
|
ctx = context.Context('user_id', 'tenant_id',
|
||||||
tenant_name='tenant_name',
|
tenant_name='tenant_name',
|
||||||
user_name='user_name')
|
user_name='user_name')
|
||||||
ctx_dict = ctx.to_dict()
|
ctx_dict = ctx.to_dict()
|
||||||
self.assertEqual('user_name', ctx_dict['user_name'])
|
self.assertEqual('user_name', ctx_dict['user_name'])
|
||||||
self.assertEqual('tenant_name', ctx_dict['tenant_name'])
|
self.assertEqual('tenant_name', ctx_dict['tenant_name'])
|
||||||
self.assertEqual('tenant_name', ctx_dict['project_name'])
|
self.assertEqual('tenant_name', ctx_dict['project_name'])
|
||||||
|
|
||||||
def test_neutron_context_to_dict_with_auth_token(self):
|
def test_neutron_context_to_dict_with_auth_token(self):
|
||||||
ctx = _context.Context('user_id', 'tenant_id',
|
ctx = context.Context('user_id', 'tenant_id',
|
||||||
auth_token='auth_token_xxx')
|
auth_token='auth_token_xxx')
|
||||||
ctx_dict = ctx.to_dict()
|
ctx_dict = ctx.to_dict()
|
||||||
self.assertEqual('auth_token_xxx', ctx_dict['auth_token'])
|
self.assertEqual('auth_token_xxx', ctx_dict['auth_token'])
|
||||||
|
|
||||||
def test_neutron_context_admin_to_dict(self):
|
def test_neutron_context_admin_to_dict(self):
|
||||||
self.db_api_session.return_value = 'fakesession'
|
self.db_api_session.return_value = 'fakesession'
|
||||||
ctx = _context.get_admin_context()
|
ctx = context.get_admin_context()
|
||||||
ctx_dict = ctx.to_dict()
|
ctx_dict = ctx.to_dict()
|
||||||
self.assertIsNone(ctx_dict['user_id'])
|
self.assertIsNone(ctx_dict['user_id'])
|
||||||
self.assertIsNone(ctx_dict['tenant_id'])
|
self.assertIsNone(ctx_dict['tenant_id'])
|
||||||
@ -126,7 +125,7 @@ class TestNeutronContext(_base.BaseTestCase):
|
|||||||
self.assertNotIn('session', ctx_dict)
|
self.assertNotIn('session', ctx_dict)
|
||||||
|
|
||||||
def test_neutron_context_admin_without_session_to_dict(self):
|
def test_neutron_context_admin_without_session_to_dict(self):
|
||||||
ctx = _context.get_admin_context_without_session()
|
ctx = context.get_admin_context_without_session()
|
||||||
ctx_dict = ctx.to_dict()
|
ctx_dict = ctx.to_dict()
|
||||||
self.assertIsNone(ctx_dict['user_id'])
|
self.assertIsNone(ctx_dict['user_id'])
|
||||||
self.assertIsNone(ctx_dict['tenant_id'])
|
self.assertIsNone(ctx_dict['tenant_id'])
|
||||||
@ -134,7 +133,7 @@ class TestNeutronContext(_base.BaseTestCase):
|
|||||||
self.assertFalse(hasattr(ctx, 'session'))
|
self.assertFalse(hasattr(ctx, 'session'))
|
||||||
|
|
||||||
def test_neutron_context_elevated_retains_request_id(self):
|
def test_neutron_context_elevated_retains_request_id(self):
|
||||||
ctx = _context.Context('user_id', 'tenant_id')
|
ctx = context.Context('user_id', 'tenant_id')
|
||||||
self.assertFalse(ctx.is_admin)
|
self.assertFalse(ctx.is_admin)
|
||||||
req_id_before = ctx.request_id
|
req_id_before = ctx.request_id
|
||||||
|
|
||||||
@ -143,7 +142,7 @@ class TestNeutronContext(_base.BaseTestCase):
|
|||||||
self.assertEqual(req_id_before, elevated_ctx.request_id)
|
self.assertEqual(req_id_before, elevated_ctx.request_id)
|
||||||
|
|
||||||
def test_neutron_context_elevated_idempotent(self):
|
def test_neutron_context_elevated_idempotent(self):
|
||||||
ctx = _context.Context('user_id', 'tenant_id')
|
ctx = context.Context('user_id', 'tenant_id')
|
||||||
self.assertFalse(ctx.is_admin)
|
self.assertFalse(ctx.is_admin)
|
||||||
elevated_ctx = ctx.elevated()
|
elevated_ctx = ctx.elevated()
|
||||||
self.assertTrue(elevated_ctx.is_admin)
|
self.assertTrue(elevated_ctx.is_admin)
|
||||||
@ -151,28 +150,28 @@ class TestNeutronContext(_base.BaseTestCase):
|
|||||||
self.assertTrue(elevated2_ctx.is_admin)
|
self.assertTrue(elevated2_ctx.is_admin)
|
||||||
|
|
||||||
def test_neutron_context_overwrite(self):
|
def test_neutron_context_overwrite(self):
|
||||||
ctx1 = _context.Context('user_id', 'tenant_id')
|
ctx1 = context.Context('user_id', 'tenant_id')
|
||||||
self.assertEqual(ctx1.request_id,
|
self.assertEqual(ctx1.request_id,
|
||||||
oslo_context.get_current().request_id)
|
oslo_context.get_current().request_id)
|
||||||
|
|
||||||
# If overwrite is not specified, request_id should be updated.
|
# If overwrite is not specified, request_id should be updated.
|
||||||
ctx2 = _context.Context('user_id', 'tenant_id')
|
ctx2 = context.Context('user_id', 'tenant_id')
|
||||||
self.assertNotEqual(ctx2.request_id, ctx1.request_id)
|
self.assertNotEqual(ctx2.request_id, ctx1.request_id)
|
||||||
self.assertEqual(ctx2.request_id,
|
self.assertEqual(ctx2.request_id,
|
||||||
oslo_context.get_current().request_id)
|
oslo_context.get_current().request_id)
|
||||||
|
|
||||||
# If overwrite is specified, request_id should be kept.
|
# If overwrite is specified, request_id should be kept.
|
||||||
ctx3 = _context.Context('user_id', 'tenant_id', overwrite=False)
|
ctx3 = context.Context('user_id', 'tenant_id', overwrite=False)
|
||||||
self.assertNotEqual(ctx3.request_id, ctx2.request_id)
|
self.assertNotEqual(ctx3.request_id, ctx2.request_id)
|
||||||
self.assertEqual(ctx2.request_id,
|
self.assertEqual(ctx2.request_id,
|
||||||
oslo_context.get_current().request_id)
|
oslo_context.get_current().request_id)
|
||||||
|
|
||||||
def test_neutron_context_get_admin_context_not_update_local_store(self):
|
def test_neutron_context_get_admin_context_not_update_local_store(self):
|
||||||
ctx = _context.Context('user_id', 'tenant_id')
|
ctx = context.Context('user_id', 'tenant_id')
|
||||||
req_id_before = oslo_context.get_current().request_id
|
req_id_before = oslo_context.get_current().request_id
|
||||||
self.assertEqual(ctx.request_id, req_id_before)
|
self.assertEqual(ctx.request_id, req_id_before)
|
||||||
|
|
||||||
ctx_admin = _context.get_admin_context()
|
ctx_admin = context.get_admin_context()
|
||||||
self.assertEqual(req_id_before, oslo_context.get_current().request_id)
|
self.assertEqual(req_id_before, oslo_context.get_current().request_id)
|
||||||
self.assertNotEqual(req_id_before, ctx_admin.request_id)
|
self.assertNotEqual(req_id_before, ctx_admin.request_id)
|
||||||
|
|
||||||
@ -194,21 +193,21 @@ class TestNeutronContext(_base.BaseTestCase):
|
|||||||
'project_id': 'tenant_id',
|
'project_id': 'tenant_id',
|
||||||
'project_name': 'tenant_name',
|
'project_name': 'tenant_name',
|
||||||
}
|
}
|
||||||
ctx = _context.Context(**values)
|
ctx = context.Context(**values)
|
||||||
# apply dict() to get a real dictionary, needed for newer oslo.context
|
# apply dict() to get a real dictionary, needed for newer oslo.context
|
||||||
# that returns _DeprecatedPolicyValues object instead
|
# that returns _DeprecatedPolicyValues object instead
|
||||||
policy_values = dict(ctx.to_policy_values())
|
policy_values = dict(ctx.to_policy_values())
|
||||||
self.assertDictSupersetOf(values, policy_values)
|
self.assertDictSupersetOf(values, policy_values)
|
||||||
self.assertDictSupersetOf(additional_values, policy_values)
|
self.assertDictSupersetOf(additional_values, policy_values)
|
||||||
|
|
||||||
@mock.patch.object(_context.ContextBaseWithSession, 'session')
|
@mock.patch.object(context.ContextBaseWithSession, 'session')
|
||||||
def test_superclass_session(self, mocked_session):
|
def test_superclass_session(self, mocked_session):
|
||||||
ctx = _context.Context('user_id', 'tenant_id')
|
ctx = context.Context('user_id', 'tenant_id')
|
||||||
# make sure context uses parent class session that is mocked
|
# make sure context uses parent class session that is mocked
|
||||||
self.assertEqual(mocked_session, ctx.session)
|
self.assertEqual(mocked_session, ctx.session)
|
||||||
|
|
||||||
def test_session_cached(self):
|
def test_session_cached(self):
|
||||||
ctx = _context.Context('user_id', 'tenant_id')
|
ctx = context.Context('user_id', 'tenant_id')
|
||||||
session1 = ctx.session
|
session1 = ctx.session
|
||||||
session2 = ctx.session
|
session2 = ctx.session
|
||||||
self.assertIs(session1, session2)
|
self.assertIs(session1, session2)
|
||||||
|
@ -13,7 +13,7 @@
|
|||||||
|
|
||||||
import mock
|
import mock
|
||||||
|
|
||||||
from neutron_lib import _context
|
from neutron_lib import context
|
||||||
from neutron_lib import policy
|
from neutron_lib import policy
|
||||||
|
|
||||||
from neutron_lib.tests import _base as base
|
from neutron_lib.tests import _base as base
|
||||||
@ -35,34 +35,34 @@ class TestPolicyEnforcer(base.BaseTestCase):
|
|||||||
self.assertIsNotNone(policy._ENFORCER)
|
self.assertIsNotNone(policy._ENFORCER)
|
||||||
|
|
||||||
def test_check_user_is_not_admin(self):
|
def test_check_user_is_not_admin(self):
|
||||||
ctx = _context.Context('me', 'my_project')
|
ctx = context.Context('me', 'my_project')
|
||||||
self.assertFalse(policy.check_is_admin(ctx))
|
self.assertFalse(policy.check_is_admin(ctx))
|
||||||
|
|
||||||
def test_check_user_elevated_is_admin(self):
|
def test_check_user_elevated_is_admin(self):
|
||||||
ctx = _context.Context('me', 'my_project', roles=['user']).elevated()
|
ctx = context.Context('me', 'my_project', roles=['user']).elevated()
|
||||||
self.assertTrue(policy.check_is_admin(ctx))
|
self.assertTrue(policy.check_is_admin(ctx))
|
||||||
|
|
||||||
def test_check_is_admin_no_roles_no_admin(self):
|
def test_check_is_admin_no_roles_no_admin(self):
|
||||||
policy.init(policy_file='no_policy.json')
|
policy.init(policy_file='no_policy.json')
|
||||||
ctx = _context.Context('me', 'my_project', roles=['user']).elevated()
|
ctx = context.Context('me', 'my_project', roles=['user']).elevated()
|
||||||
# With no admin role, elevated() should not work.
|
# With no admin role, elevated() should not work.
|
||||||
self.assertFalse(policy.check_is_admin(ctx))
|
self.assertFalse(policy.check_is_admin(ctx))
|
||||||
|
|
||||||
def test_check_is_advsvc_role(self):
|
def test_check_is_advsvc_role(self):
|
||||||
ctx = _context.Context('me', 'my_project', roles=['advsvc'])
|
ctx = context.Context('me', 'my_project', roles=['advsvc'])
|
||||||
self.assertTrue(policy.check_is_advsvc(ctx))
|
self.assertTrue(policy.check_is_advsvc(ctx))
|
||||||
|
|
||||||
def test_check_is_not_advsvc_user(self):
|
def test_check_is_not_advsvc_user(self):
|
||||||
ctx = _context.Context('me', 'my_project', roles=['user'])
|
ctx = context.Context('me', 'my_project', roles=['user'])
|
||||||
self.assertFalse(policy.check_is_advsvc(ctx))
|
self.assertFalse(policy.check_is_advsvc(ctx))
|
||||||
|
|
||||||
def test_check_is_not_advsvc_admin(self):
|
def test_check_is_not_advsvc_admin(self):
|
||||||
ctx = _context.Context('me', 'my_project').elevated()
|
ctx = context.Context('me', 'my_project').elevated()
|
||||||
self.assertTrue(policy.check_is_admin(ctx))
|
self.assertTrue(policy.check_is_admin(ctx))
|
||||||
self.assertFalse(policy.check_is_advsvc(ctx))
|
self.assertFalse(policy.check_is_advsvc(ctx))
|
||||||
|
|
||||||
def test_check_is_advsvc_no_roles_no_advsvc(self):
|
def test_check_is_advsvc_no_roles_no_advsvc(self):
|
||||||
policy.init(policy_file='no_policy.json')
|
policy.init(policy_file='no_policy.json')
|
||||||
ctx = _context.Context('me', 'my_project', roles=['advsvc'])
|
ctx = context.Context('me', 'my_project', roles=['advsvc'])
|
||||||
# No advsvc role in the policy file, so cannot assume the role.
|
# No advsvc role in the policy file, so cannot assume the role.
|
||||||
self.assertFalse(policy.check_is_advsvc(ctx))
|
self.assertFalse(policy.check_is_advsvc(ctx))
|
||||||
|
12
releasenotes/notes/context-public-6df198b77027c224.yaml
Normal file
12
releasenotes/notes/context-public-6df198b77027c224.yaml
Normal file
@ -0,0 +1,12 @@
|
|||||||
|
---
|
||||||
|
features:
|
||||||
|
- |
|
||||||
|
The ``context`` module has been made public. For example:
|
||||||
|
|
||||||
|
.. code-block:: python
|
||||||
|
|
||||||
|
from neutron_lib import context
|
||||||
|
|
||||||
|
ctx = context.get_admin_context()
|
||||||
|
|
||||||
|
For more examples, see: https://review.openstack.org/#/c/388157/
|
Loading…
Reference in New Issue
Block a user