share_networks: enable project_only API only

At the moment, the share_network database API which the web
API layer interacts with directly does not have any checking
for project_id which means that a user has the ability to run
operations against any other share_network if they have the ID.

This patch implements the usage of project_only in the database
query which ensures that administrators still have the behaviour
of getting any share network they want, but users can only pull
up those which are part of their context/authenticated project.

This patch also adjusts a few other tests due to the fact that
the existing tests would run a lot of inserts with a different
project_id than the context, which is not allowed in this new
API behaviour.  Therefore, the instances that involved projects
different than the context were converted to elevated ones.

There was also an instance where they were being created with a
project_id that did not match the fake context, therefore the
context was adjusted accordingly as well.

Closes-Bug: #1861485
Change-Id: Id67a939a475c4ac06d546b7e095bd10f1a6d2619
(cherry picked from commit 947315f0903c823b0fdd9d99c60078814587272c)
(cherry picked from commit 496e6e1d2a074ab85f434fe2a88a6c0159696419)
(cherry picked from commit 039ab2b020e4ca13fa723b6cb931f921944894ee)
(cherry picked from commit 496bb1473ea1ad8143bfb65d1727166de543affc)
This commit is contained in:
Mohammed Naser 2020-01-31 16:13:24 +01:00
parent f4bc383de1
commit c3b92b030a
3 changed files with 47 additions and 8 deletions

View File

@ -3275,7 +3275,8 @@ def _security_service_get_query(context, session=None):
def _network_get_query(context, session=None):
if session is None:
session = get_session()
return (model_query(context, models.ShareNetwork, session=session).
return (model_query(context, models.ShareNetwork, session=session,
project_only=True).
options(joinedload('share_instances'),
joinedload('security_services'),
joinedload('share_servers')))

View File

@ -1915,7 +1915,7 @@ class ShareNetworkDatabaseAPITestCase(BaseDatabaseAPITestCase):
share_nw_dict2['project_id'] = 'fake project 2'
result1 = db_api.share_network_create(self.fake_context,
self.share_nw_dict)
result2 = db_api.share_network_create(self.fake_context,
result2 = db_api.share_network_create(self.fake_context.elevated(),
share_nw_dict2)
self._check_fields(expected=self.share_nw_dict, actual=result1)
@ -1948,6 +1948,33 @@ class ShareNetworkDatabaseAPITestCase(BaseDatabaseAPITestCase):
self.assertEqual(0, len(result['share_instances']))
self.assertEqual(0, len(result['security_services']))
def _create_share_network_for_project(self, project_id):
ctx = context.RequestContext(user_id='fake user',
project_id=project_id,
is_admin=False)
share_data = self.share_nw_dict.copy()
share_data['project_id'] = project_id
db_api.share_network_create(ctx, share_data)
return share_data
def test_get_other_tenant_as_admin(self):
expected = self._create_share_network_for_project('fake project 2')
result = db_api.share_network_get(self.fake_context.elevated(),
self.share_nw_dict['id'])
self._check_fields(expected=expected, actual=result)
self.assertEqual(0, len(result['share_instances']))
self.assertEqual(0, len(result['security_services']))
def test_get_other_tenant(self):
self._create_share_network_for_project('fake project 2')
self.assertRaises(exception.ShareNetworkNotFound,
db_api.share_network_get,
self.fake_context,
self.share_nw_dict['id'])
@ddt.data([{'id': 'fake share id1'}],
[{'id': 'fake share id1'}, {'id': 'fake share id2'}],)
def test_get_with_shares(self, shares):
@ -2043,25 +2070,30 @@ class ShareNetworkDatabaseAPITestCase(BaseDatabaseAPITestCase):
share_network_dict.update({'id': fake_id,
'neutron_subnet_id': fake_id})
share_networks.append(share_network_dict)
db_api.share_network_create(self.fake_context, share_network_dict)
db_api.share_network_create(self.fake_context.elevated(),
share_network_dict)
index += 1
result = db_api.share_network_get_all(self.fake_context)
result = db_api.share_network_get_all(self.fake_context.elevated())
self.assertEqual(len(share_networks), len(result))
for index, net in enumerate(share_networks):
self._check_fields(expected=net, actual=result[index])
def test_get_all_by_project(self):
db_api.share_network_create(self.fake_context, self.share_nw_dict)
share_nw_dict2 = dict(self.share_nw_dict)
share_nw_dict2['id'] = 'fake share nw id2'
share_nw_dict2['project_id'] = 'fake project 2'
share_nw_dict2['neutron_subnet_id'] = 'fake subnet id2'
db_api.share_network_create(self.fake_context, self.share_nw_dict)
db_api.share_network_create(self.fake_context, share_nw_dict2)
new_context = context.RequestContext(user_id='fake user 2',
project_id='fake project 2',
is_admin=False)
db_api.share_network_create(new_context, share_nw_dict2)
result = db_api.share_network_get_all_by_project(
self.fake_context,
self.fake_context.elevated(),
share_nw_dict2['project_id'])
self.assertEqual(1, len(result))
@ -2218,7 +2250,6 @@ class ShareNetworkDatabaseAPITestCase(BaseDatabaseAPITestCase):
self.assertEqual(0, len(result['share_instances']))
@ddt.ddt
class SecurityServiceDatabaseAPITestCase(BaseDatabaseAPITestCase):
def __init__(self, *args, **kwargs):

View File

@ -0,0 +1,7 @@
---
security:
- |
CVE-2020-9543: An issue with share network retrieval has been addressed
in the API by scoping unprivileged access to project only. Please see
`launchpad bug #1861485 <https://bugs.launchpad
.net/manila/+bug/1861485>`_ for more details.