share_networks: enable project_only API only

At the moment, the share_network database API which the web
API layer interacts with directly does not have any checking
for project_id which means that a user has the ability to run
operations against any other share_network if they have the ID.

This patch implements the usage of project_only in the database
query which ensures that administrators still have the behaviour
of getting any share network they want, but users can only pull
up those which are part of their context/authenticated project.

This patch also adjusts a few other tests due to the fact that
the existing tests would run a lot of inserts with a different
project_id than the context, which is not allowed in this new
API behaviour.  Therefore, the instances that involved projects
different than the context were converted to elevated ones.

There was also an instance where they were being created with a
project_id that did not match the fake context, therefore the
context was adjusted accordingly as well.

Closes-Bug: #1861485
Change-Id: Id67a939a475c4ac06d546b7e095bd10f1a6d2619
(cherry picked from commit 947315f090)
(cherry picked from commit 496e6e1d2a)
(cherry picked from commit 039ab2b020)
(cherry picked from commit 496bb1473e)
(cherry picked from commit c3b92b030a)
Signed-off-by: Goutham Pacha Ravi <gouthampravi@gmail.com>
This commit is contained in:
Mohammed Naser 2020-01-31 16:13:24 +01:00 committed by Goutham Pacha Ravi
parent b6d0933e96
commit 87799bbb13
3 changed files with 47 additions and 7 deletions

View File

@ -3197,7 +3197,8 @@ def _security_service_get_query(context, session=None):
def _network_get_query(context, session=None):
if session is None:
session = get_session()
return (model_query(context, models.ShareNetwork, session=session).
return (model_query(context, models.ShareNetwork, session=session,
project_only=True).
options(joinedload('share_instances'),
joinedload('security_services'),
joinedload('share_servers')))

View File

@ -1782,7 +1782,7 @@ class ShareNetworkDatabaseAPITestCase(BaseDatabaseAPITestCase):
share_nw_dict2['project_id'] = 'fake project 2'
result1 = db_api.share_network_create(self.fake_context,
self.share_nw_dict)
result2 = db_api.share_network_create(self.fake_context,
result2 = db_api.share_network_create(self.fake_context.elevated(),
share_nw_dict2)
self._check_fields(expected=self.share_nw_dict, actual=result1)
@ -1815,6 +1815,33 @@ class ShareNetworkDatabaseAPITestCase(BaseDatabaseAPITestCase):
self.assertEqual(0, len(result['share_instances']))
self.assertEqual(0, len(result['security_services']))
def _create_share_network_for_project(self, project_id):
ctx = context.RequestContext(user_id='fake user',
project_id=project_id,
is_admin=False)
share_data = self.share_nw_dict.copy()
share_data['project_id'] = project_id
db_api.share_network_create(ctx, share_data)
return share_data
def test_get_other_tenant_as_admin(self):
expected = self._create_share_network_for_project('fake project 2')
result = db_api.share_network_get(self.fake_context.elevated(),
self.share_nw_dict['id'])
self._check_fields(expected=expected, actual=result)
self.assertEqual(0, len(result['share_instances']))
self.assertEqual(0, len(result['security_services']))
def test_get_other_tenant(self):
self._create_share_network_for_project('fake project 2')
self.assertRaises(exception.ShareNetworkNotFound,
db_api.share_network_get,
self.fake_context,
self.share_nw_dict['id'])
@ddt.data([{'id': 'fake share id1'}],
[{'id': 'fake share id1'}, {'id': 'fake share id2'}],)
def test_get_with_shares(self, shares):
@ -1910,25 +1937,30 @@ class ShareNetworkDatabaseAPITestCase(BaseDatabaseAPITestCase):
share_network_dict.update({'id': fake_id,
'neutron_subnet_id': fake_id})
share_networks.append(share_network_dict)
db_api.share_network_create(self.fake_context, share_network_dict)
db_api.share_network_create(self.fake_context.elevated(),
share_network_dict)
index += 1
result = db_api.share_network_get_all(self.fake_context)
result = db_api.share_network_get_all(self.fake_context.elevated())
self.assertEqual(len(share_networks), len(result))
for index, net in enumerate(share_networks):
self._check_fields(expected=net, actual=result[index])
def test_get_all_by_project(self):
db_api.share_network_create(self.fake_context, self.share_nw_dict)
share_nw_dict2 = dict(self.share_nw_dict)
share_nw_dict2['id'] = 'fake share nw id2'
share_nw_dict2['project_id'] = 'fake project 2'
share_nw_dict2['neutron_subnet_id'] = 'fake subnet id2'
db_api.share_network_create(self.fake_context, self.share_nw_dict)
db_api.share_network_create(self.fake_context, share_nw_dict2)
new_context = context.RequestContext(user_id='fake user 2',
project_id='fake project 2',
is_admin=False)
db_api.share_network_create(new_context, share_nw_dict2)
result = db_api.share_network_get_all_by_project(
self.fake_context,
self.fake_context.elevated(),
share_nw_dict2['project_id'])
self.assertEqual(1, len(result))

View File

@ -0,0 +1,7 @@
---
security:
- |
CVE-2020-9543: An issue with share network retrieval has been addressed
in the API by scoping unprivileged access to project only. Please see
`launchpad bug #1861485 <https://bugs.launchpad
.net/manila/+bug/1861485>`_ for more details.