diff --git a/neutron_lib/_context.py b/neutron_lib/context.py similarity index 100% rename from neutron_lib/_context.py rename to neutron_lib/context.py diff --git a/neutron_lib/tests/unit/db/test_model_base.py b/neutron_lib/tests/unit/db/test_model_base.py index cf1025f3e..f4039a26e 100644 --- a/neutron_lib/tests/unit/db/test_model_base.py +++ b/neutron_lib/tests/unit/db/test_model_base.py @@ -12,7 +12,7 @@ import sqlalchemy as sa -from neutron_lib import _context as context +from neutron_lib import context from neutron_lib.db import model_base from neutron_lib.tests.unit.db import _base as db_base diff --git a/neutron_lib/tests/unit/test_context.py b/neutron_lib/tests/unit/test_context.py index 922ac5eb8..9f7d00a94 100644 --- a/neutron_lib/tests/unit/test_context.py +++ b/neutron_lib/tests/unit/test_context.py @@ -14,7 +14,7 @@ import mock from oslo_context import context as oslo_context from testtools import matchers -from neutron_lib import _context +from neutron_lib import context from neutron_lib.tests import _base @@ -27,7 +27,7 @@ class TestNeutronContext(_base.BaseTestCase): self.db_api_session = self._db_api_session_patcher.start() def test_neutron_context_create(self): - ctx = _context.Context('user_id', 'tenant_id') + ctx = context.Context('user_id', 'tenant_id') self.assertEqual('user_id', ctx.user_id) self.assertEqual('tenant_id', ctx.project_id) self.assertEqual('tenant_id', ctx.tenant_id) @@ -42,7 +42,7 @@ class TestNeutronContext(_base.BaseTestCase): self.assertIsNone(ctx.auth_token) def test_neutron_context_getter_setter(self): - ctx = _context.Context('Anakin', 'Skywalker') + ctx = context.Context('Anakin', 'Skywalker') self.assertEqual('Anakin', ctx.user_id) self.assertEqual('Skywalker', ctx.tenant_id) ctx.user_id = 'Darth' @@ -51,9 +51,8 @@ class TestNeutronContext(_base.BaseTestCase): self.assertEqual('Vader', ctx.tenant_id) def test_neutron_context_create_with_name(self): - ctx = _context.Context('user_id', 'tenant_id', - tenant_name='tenant_name', - user_name='user_name') + ctx = context.Context('user_id', 'tenant_id', + tenant_name='tenant_name', user_name='user_name') # Check name is set self.assertEqual('user_name', ctx.user_name) self.assertEqual('tenant_name', ctx.tenant_name) @@ -62,32 +61,32 @@ class TestNeutronContext(_base.BaseTestCase): self.assertEqual('tenant_id', ctx.tenant) def test_neutron_context_create_with_request_id(self): - ctx = _context.Context('user_id', 'tenant_id', request_id='req_id_xxx') + ctx = context.Context('user_id', 'tenant_id', request_id='req_id_xxx') self.assertEqual('req_id_xxx', ctx.request_id) def test_neutron_context_create_with_timestamp(self): now = "Right Now!" - ctx = _context.Context('user_id', 'tenant_id', timestamp=now) + ctx = context.Context('user_id', 'tenant_id', timestamp=now) self.assertEqual(now, ctx.timestamp) def test_neutron_context_create_is_advsvc(self): - ctx = _context.Context('user_id', 'tenant_id', is_advsvc=True) + ctx = context.Context('user_id', 'tenant_id', is_advsvc=True) self.assertFalse(ctx.is_admin) self.assertTrue(ctx.is_advsvc) def test_neutron_context_create_with_auth_token(self): - ctx = _context.Context('user_id', 'tenant_id', - auth_token='auth_token_xxx') + ctx = context.Context('user_id', 'tenant_id', + auth_token='auth_token_xxx') self.assertEqual('auth_token_xxx', ctx.auth_token) def test_neutron_context_from_dict(self): owner = {'user_id': 'Luke', 'tenant_id': 'Skywalker'} - ctx = _context.Context.from_dict(owner) + ctx = context.Context.from_dict(owner) self.assertEqual(owner['user_id'], ctx.user_id) self.assertEqual(owner['tenant_id'], ctx.tenant_id) def test_neutron_context_to_dict(self): - ctx = _context.Context('user_id', 'tenant_id') + ctx = context.Context('user_id', 'tenant_id') ctx_dict = ctx.to_dict() self.assertEqual('user_id', ctx_dict['user_id']) self.assertEqual('tenant_id', ctx_dict['project_id']) @@ -100,23 +99,23 @@ class TestNeutronContext(_base.BaseTestCase): self.assertIsNone(ctx_dict['auth_token']) def test_neutron_context_to_dict_with_name(self): - ctx = _context.Context('user_id', 'tenant_id', - tenant_name='tenant_name', - user_name='user_name') + ctx = context.Context('user_id', 'tenant_id', + tenant_name='tenant_name', + user_name='user_name') ctx_dict = ctx.to_dict() self.assertEqual('user_name', ctx_dict['user_name']) self.assertEqual('tenant_name', ctx_dict['tenant_name']) self.assertEqual('tenant_name', ctx_dict['project_name']) def test_neutron_context_to_dict_with_auth_token(self): - ctx = _context.Context('user_id', 'tenant_id', - auth_token='auth_token_xxx') + ctx = context.Context('user_id', 'tenant_id', + auth_token='auth_token_xxx') ctx_dict = ctx.to_dict() self.assertEqual('auth_token_xxx', ctx_dict['auth_token']) def test_neutron_context_admin_to_dict(self): self.db_api_session.return_value = 'fakesession' - ctx = _context.get_admin_context() + ctx = context.get_admin_context() ctx_dict = ctx.to_dict() self.assertIsNone(ctx_dict['user_id']) self.assertIsNone(ctx_dict['tenant_id']) @@ -126,7 +125,7 @@ class TestNeutronContext(_base.BaseTestCase): self.assertNotIn('session', ctx_dict) def test_neutron_context_admin_without_session_to_dict(self): - ctx = _context.get_admin_context_without_session() + ctx = context.get_admin_context_without_session() ctx_dict = ctx.to_dict() self.assertIsNone(ctx_dict['user_id']) self.assertIsNone(ctx_dict['tenant_id']) @@ -134,7 +133,7 @@ class TestNeutronContext(_base.BaseTestCase): self.assertFalse(hasattr(ctx, 'session')) def test_neutron_context_elevated_retains_request_id(self): - ctx = _context.Context('user_id', 'tenant_id') + ctx = context.Context('user_id', 'tenant_id') self.assertFalse(ctx.is_admin) req_id_before = ctx.request_id @@ -143,7 +142,7 @@ class TestNeutronContext(_base.BaseTestCase): self.assertEqual(req_id_before, elevated_ctx.request_id) def test_neutron_context_elevated_idempotent(self): - ctx = _context.Context('user_id', 'tenant_id') + ctx = context.Context('user_id', 'tenant_id') self.assertFalse(ctx.is_admin) elevated_ctx = ctx.elevated() self.assertTrue(elevated_ctx.is_admin) @@ -151,28 +150,28 @@ class TestNeutronContext(_base.BaseTestCase): self.assertTrue(elevated2_ctx.is_admin) def test_neutron_context_overwrite(self): - ctx1 = _context.Context('user_id', 'tenant_id') + ctx1 = context.Context('user_id', 'tenant_id') self.assertEqual(ctx1.request_id, oslo_context.get_current().request_id) # If overwrite is not specified, request_id should be updated. - ctx2 = _context.Context('user_id', 'tenant_id') + ctx2 = context.Context('user_id', 'tenant_id') self.assertNotEqual(ctx2.request_id, ctx1.request_id) self.assertEqual(ctx2.request_id, oslo_context.get_current().request_id) # If overwrite is specified, request_id should be kept. - ctx3 = _context.Context('user_id', 'tenant_id', overwrite=False) + ctx3 = context.Context('user_id', 'tenant_id', overwrite=False) self.assertNotEqual(ctx3.request_id, ctx2.request_id) self.assertEqual(ctx2.request_id, oslo_context.get_current().request_id) def test_neutron_context_get_admin_context_not_update_local_store(self): - ctx = _context.Context('user_id', 'tenant_id') + ctx = context.Context('user_id', 'tenant_id') req_id_before = oslo_context.get_current().request_id self.assertEqual(ctx.request_id, req_id_before) - ctx_admin = _context.get_admin_context() + ctx_admin = context.get_admin_context() self.assertEqual(req_id_before, oslo_context.get_current().request_id) self.assertNotEqual(req_id_before, ctx_admin.request_id) @@ -194,21 +193,21 @@ class TestNeutronContext(_base.BaseTestCase): 'project_id': 'tenant_id', 'project_name': 'tenant_name', } - ctx = _context.Context(**values) + ctx = context.Context(**values) # apply dict() to get a real dictionary, needed for newer oslo.context # that returns _DeprecatedPolicyValues object instead policy_values = dict(ctx.to_policy_values()) self.assertDictSupersetOf(values, policy_values) self.assertDictSupersetOf(additional_values, policy_values) - @mock.patch.object(_context.ContextBaseWithSession, 'session') + @mock.patch.object(context.ContextBaseWithSession, 'session') def test_superclass_session(self, mocked_session): - ctx = _context.Context('user_id', 'tenant_id') + ctx = context.Context('user_id', 'tenant_id') # make sure context uses parent class session that is mocked self.assertEqual(mocked_session, ctx.session) def test_session_cached(self): - ctx = _context.Context('user_id', 'tenant_id') + ctx = context.Context('user_id', 'tenant_id') session1 = ctx.session session2 = ctx.session self.assertIs(session1, session2) diff --git a/neutron_lib/tests/unit/test_policy.py b/neutron_lib/tests/unit/test_policy.py index 3694f5262..99e9e6732 100644 --- a/neutron_lib/tests/unit/test_policy.py +++ b/neutron_lib/tests/unit/test_policy.py @@ -13,7 +13,7 @@ import mock -from neutron_lib import _context +from neutron_lib import context from neutron_lib import policy from neutron_lib.tests import _base as base @@ -35,34 +35,34 @@ class TestPolicyEnforcer(base.BaseTestCase): self.assertIsNotNone(policy._ENFORCER) def test_check_user_is_not_admin(self): - ctx = _context.Context('me', 'my_project') + ctx = context.Context('me', 'my_project') self.assertFalse(policy.check_is_admin(ctx)) def test_check_user_elevated_is_admin(self): - ctx = _context.Context('me', 'my_project', roles=['user']).elevated() + ctx = context.Context('me', 'my_project', roles=['user']).elevated() self.assertTrue(policy.check_is_admin(ctx)) def test_check_is_admin_no_roles_no_admin(self): policy.init(policy_file='no_policy.json') - ctx = _context.Context('me', 'my_project', roles=['user']).elevated() + ctx = context.Context('me', 'my_project', roles=['user']).elevated() # With no admin role, elevated() should not work. self.assertFalse(policy.check_is_admin(ctx)) def test_check_is_advsvc_role(self): - ctx = _context.Context('me', 'my_project', roles=['advsvc']) + ctx = context.Context('me', 'my_project', roles=['advsvc']) self.assertTrue(policy.check_is_advsvc(ctx)) def test_check_is_not_advsvc_user(self): - ctx = _context.Context('me', 'my_project', roles=['user']) + ctx = context.Context('me', 'my_project', roles=['user']) self.assertFalse(policy.check_is_advsvc(ctx)) def test_check_is_not_advsvc_admin(self): - ctx = _context.Context('me', 'my_project').elevated() + ctx = context.Context('me', 'my_project').elevated() self.assertTrue(policy.check_is_admin(ctx)) self.assertFalse(policy.check_is_advsvc(ctx)) def test_check_is_advsvc_no_roles_no_advsvc(self): policy.init(policy_file='no_policy.json') - ctx = _context.Context('me', 'my_project', roles=['advsvc']) + ctx = context.Context('me', 'my_project', roles=['advsvc']) # No advsvc role in the policy file, so cannot assume the role. self.assertFalse(policy.check_is_advsvc(ctx)) diff --git a/releasenotes/notes/context-public-6df198b77027c224.yaml b/releasenotes/notes/context-public-6df198b77027c224.yaml new file mode 100644 index 000000000..d13df73c6 --- /dev/null +++ b/releasenotes/notes/context-public-6df198b77027c224.yaml @@ -0,0 +1,12 @@ +--- +features: + - | + The ``context`` module has been made public. For example: + + .. code-block:: python + + from neutron_lib import context + + ctx = context.get_admin_context() + + For more examples, see: https://review.openstack.org/#/c/388157/