Merge "Improve share snapshots list API filtering"
This commit is contained in:
commit
e49da309e2
@ -318,6 +318,52 @@ class SharesActionsTest(base.BaseSharesTest):
|
||||
msg = "expected id lists %s times in share list" % (len(gen))
|
||||
self.assertEqual(len(gen), 1, msg)
|
||||
|
||||
@test.attr(type=["gate", ])
|
||||
def test_list_snapshots_with_detail_use_limit(self):
|
||||
for l, o in [('1', '1'), ('0', '1')]:
|
||||
filters = {'limit': l, 'offset': o, 'share_id': self.share['id']}
|
||||
|
||||
# list snapshots
|
||||
__, snaps = self.shares_client.list_snapshots_with_detail(
|
||||
params=filters)
|
||||
|
||||
# Our snapshot should not be listed
|
||||
self.assertEqual(0, len(snaps))
|
||||
|
||||
# Only our one snapshot should be listed
|
||||
__, snaps = self.shares_client.list_snapshots_with_detail(
|
||||
params={'limit': '1', 'offset': '0', 'share_id': self.share['id']})
|
||||
|
||||
self.assertEqual(1, len(snaps['snapshots']))
|
||||
self.assertEqual(self.snap['id'], snaps['snapshots'][0]['id'])
|
||||
|
||||
@test.attr(type=["gate", ])
|
||||
def test_list_snapshots_with_detail_filter_by_status_and_name(self):
|
||||
filters = {'status': 'available', 'name': self.snap_name}
|
||||
|
||||
# list snapshots
|
||||
__, snaps = self.shares_client.list_snapshots_with_detail(
|
||||
params=filters)
|
||||
|
||||
# verify response
|
||||
self.assertTrue(len(snaps) > 0)
|
||||
for snap in snaps:
|
||||
self.assertEqual(filters['status'], snap['status'])
|
||||
self.assertEqual(filters['name'], snap['name'])
|
||||
|
||||
@test.attr(type=["gate", ])
|
||||
def test_list_snapshots_with_detail_and_asc_sorting(self):
|
||||
filters = {'sort_key': 'share_id', 'sort_dir': 'asc'}
|
||||
|
||||
# list snapshots
|
||||
__, snaps = self.shares_client.list_snapshots_with_detail(
|
||||
params=filters)
|
||||
|
||||
# verify response
|
||||
self.assertTrue(len(snaps) > 0)
|
||||
sorted_list = [snap['share_id'] for snap in snaps]
|
||||
self.assertEqual(sorted_list, sorted(sorted_list))
|
||||
|
||||
|
||||
class SharesRenameTest(base.BaseSharesTest):
|
||||
|
||||
|
@ -130,9 +130,6 @@ class SimpleReadOnlyManilaClientTest(manilaclient.ClientTestBase):
|
||||
def test_manila_snapshot_list_filter_by_status(self):
|
||||
self.manila('snapshot-list', params='--status status')
|
||||
|
||||
def test_manila_snapshot_list_filter_by_share_id(self):
|
||||
self.manila('snapshot-list', params='--share-id share_id')
|
||||
|
||||
def test_manila_credentials(self):
|
||||
self.manila('credentials')
|
||||
|
||||
|
@ -143,17 +143,16 @@ class SharesClient(rest_client.RestClient):
|
||||
resp, body = self.get("snapshots/%s" % snapshot_id)
|
||||
return resp, self._parse_resp(body)
|
||||
|
||||
def list_snapshots(self):
|
||||
resp, body = self.get("snapshots")
|
||||
def list_snapshots(self, detailed=False, params=None):
|
||||
"""Get list of share snapshots w/o filters."""
|
||||
uri = 'snapshots/detail' if detailed else 'snapshots'
|
||||
uri += '?%s' % urllib.urlencode(params) if params else ''
|
||||
resp, body = self.get(uri)
|
||||
return resp, self._parse_resp(body)
|
||||
|
||||
def list_snapshots_with_detail(self, params=None):
|
||||
"""List the details of all shares."""
|
||||
uri = 'snapshots/detail'
|
||||
if params:
|
||||
uri += '?%s' % urllib.urlencode(params)
|
||||
resp, body = self.get(uri)
|
||||
return resp, self._parse_resp(body)
|
||||
"""Get detailed list of share snapshots w/o filters."""
|
||||
return self.list_snapshots(detailed=True, params=params)
|
||||
|
||||
def delete_snapshot(self, snap_id):
|
||||
return self.delete("snapshots/%s" % snap_id)
|
||||
|
@ -108,16 +108,27 @@ class ShareSnapshotsController(wsgi.Controller):
|
||||
search_opts = {}
|
||||
search_opts.update(req.GET)
|
||||
|
||||
# NOTE(rushiagr): v2 API allows name instead of display_name
|
||||
# Remove keys that are not related to share attrs
|
||||
search_opts.pop('limit', None)
|
||||
search_opts.pop('offset', None)
|
||||
sort_key = search_opts.pop('sort_key', 'created_at')
|
||||
sort_dir = search_opts.pop('sort_dir', 'desc')
|
||||
|
||||
# NOTE(vponomaryov): Manila stores in DB key 'display_name', but
|
||||
# allows to use both keys 'name' and 'display_name'. It is leftover
|
||||
# from Cinder v1 and v2 APIs.
|
||||
if 'name' in search_opts:
|
||||
search_opts['display_name'] = search_opts['name']
|
||||
del search_opts['name']
|
||||
search_opts['display_name'] = search_opts.pop('name')
|
||||
|
||||
common.remove_invalid_options(context, search_opts,
|
||||
self._get_snapshots_search_options())
|
||||
|
||||
snapshots = self.share_api.get_all_snapshots(context,
|
||||
search_opts=search_opts)
|
||||
snapshots = self.share_api.get_all_snapshots(
|
||||
context,
|
||||
search_opts=search_opts,
|
||||
sort_key=sort_key,
|
||||
sort_dir=sort_dir,
|
||||
)
|
||||
limited_list = common.limited(snapshots, req)
|
||||
if is_detail:
|
||||
snapshots = self._view_builder.detail_list(req, limited_list)
|
||||
@ -127,7 +138,7 @@ class ShareSnapshotsController(wsgi.Controller):
|
||||
|
||||
def _get_snapshots_search_options(self):
|
||||
"""Return share search options allowed by non-admin."""
|
||||
return ('name', 'status', 'share_id')
|
||||
return ('display_name', 'name', 'status', 'share_id', 'size')
|
||||
|
||||
@wsgi.serializers(xml=SnapshotTemplate)
|
||||
def update(self, req, id, body):
|
||||
|
@ -390,19 +390,30 @@ def share_snapshot_get(context, snapshot_id):
|
||||
return IMPL.share_snapshot_get(context, snapshot_id)
|
||||
|
||||
|
||||
def share_snapshot_get_all(context):
|
||||
def share_snapshot_get_all(context, filters=None, sort_key=None,
|
||||
sort_dir=None):
|
||||
"""Get all snapshots."""
|
||||
return IMPL.share_snapshot_get_all(context)
|
||||
return IMPL.share_snapshot_get_all(
|
||||
context, filters=filters, sort_key=sort_key, sort_dir=sort_dir,
|
||||
)
|
||||
|
||||
|
||||
def share_snapshot_get_all_by_project(context, project_id):
|
||||
def share_snapshot_get_all_by_project(context, project_id, filters=None,
|
||||
sort_key=None, sort_dir=None):
|
||||
"""Get all snapshots belonging to a project."""
|
||||
return IMPL.share_snapshot_get_all_by_project(context, project_id)
|
||||
return IMPL.share_snapshot_get_all_by_project(
|
||||
context, project_id, filters=filters, sort_key=sort_key,
|
||||
sort_dir=sort_dir,
|
||||
)
|
||||
|
||||
|
||||
def share_snapshot_get_all_for_share(context, share_id):
|
||||
def share_snapshot_get_all_for_share(context, share_id, filters=None,
|
||||
sort_key=None, sort_dir=None):
|
||||
"""Get all snapshots for a share."""
|
||||
return IMPL.share_snapshot_get_all_for_share(context, share_id)
|
||||
return IMPL.share_snapshot_get_all_for_share(
|
||||
context, share_id, filters=filters, sort_key=sort_key,
|
||||
sort_dir=sort_dir,
|
||||
)
|
||||
|
||||
|
||||
def share_snapshot_update(context, snapshot_id, values):
|
||||
|
@ -1422,29 +1422,84 @@ def share_snapshot_get(context, snapshot_id, session=None):
|
||||
return result
|
||||
|
||||
|
||||
def _share_snapshot_get_all_with_filters(context, project_id=None,
|
||||
share_id=None, filters=None,
|
||||
sort_key=None, sort_dir=None):
|
||||
# Init data
|
||||
sort_key = sort_key or 'share_id'
|
||||
sort_dir = sort_dir or 'desc'
|
||||
filters = filters or {}
|
||||
query = model_query(context, models.ShareSnapshot)
|
||||
|
||||
if project_id:
|
||||
query = query.filter_by(project_id=project_id)
|
||||
if share_id:
|
||||
query = query.filter_by(share_id=share_id)
|
||||
query = query.options(joinedload('share'))
|
||||
|
||||
# Apply filters
|
||||
if 'usage' in filters:
|
||||
usage_filter_keys = ['any', 'used', 'unused']
|
||||
if filters['usage'] == 'any':
|
||||
pass
|
||||
elif filters['usage'] == 'used':
|
||||
query = query.filter(or_(models.Share.snapshot_id == (
|
||||
models.ShareSnapshot.id)))
|
||||
elif filters['usage'] == 'unused':
|
||||
query = query.filter(or_(models.Share.snapshot_id != (
|
||||
models.ShareSnapshot.id)))
|
||||
else:
|
||||
msg = _("Wrong 'usage' key provided - '%(key)s'. "
|
||||
"Expected keys are '%(ek)s'.") % {
|
||||
'key': filters['usage'],
|
||||
'ek': six.text_type(usage_filter_keys)}
|
||||
raise exception.InvalidInput(reason=msg)
|
||||
|
||||
# Apply sorting
|
||||
try:
|
||||
attr = getattr(models.ShareSnapshot, sort_key)
|
||||
except AttributeError:
|
||||
msg = _("Wrong sorting key provided - '%s'.") % sort_key
|
||||
raise exception.InvalidInput(reason=msg)
|
||||
if sort_dir.lower() == 'desc':
|
||||
query = query.order_by(attr.desc())
|
||||
elif sort_dir.lower() == 'asc':
|
||||
query = query.order_by(attr.asc())
|
||||
else:
|
||||
msg = _("Wrong sorting data provided: sort key is '%(sort_key)s' "
|
||||
"and sort direction is '%(sort_dir)s'.") % {
|
||||
"sort_key": sort_key, "sort_dir": sort_dir}
|
||||
raise exception.InvalidInput(reason=msg)
|
||||
|
||||
# Returns list of shares that satisfy filters
|
||||
return query.all()
|
||||
|
||||
|
||||
@require_admin_context
|
||||
def share_snapshot_get_all(context):
|
||||
return model_query(context, models.ShareSnapshot).\
|
||||
options(joinedload('share')).\
|
||||
all()
|
||||
def share_snapshot_get_all(context, filters=None, sort_key=None,
|
||||
sort_dir=None):
|
||||
return _share_snapshot_get_all_with_filters(
|
||||
context, filters=filters, sort_key=sort_key, sort_dir=sort_dir,
|
||||
)
|
||||
|
||||
|
||||
@require_context
|
||||
def share_snapshot_get_all_by_project(context, project_id):
|
||||
def share_snapshot_get_all_by_project(context, project_id, filters=None,
|
||||
sort_key=None, sort_dir=None):
|
||||
authorize_project_context(context, project_id)
|
||||
return model_query(context, models.ShareSnapshot).\
|
||||
filter_by(project_id=project_id).\
|
||||
options(joinedload('share')).\
|
||||
all()
|
||||
return _share_snapshot_get_all_with_filters(
|
||||
context, project_id=project_id,
|
||||
filters=filters, sort_key=sort_key, sort_dir=sort_dir,
|
||||
)
|
||||
|
||||
|
||||
@require_context
|
||||
def share_snapshot_get_all_for_share(context, share_id):
|
||||
return model_query(context, models.ShareSnapshot, read_deleted='no',
|
||||
project_only=True).\
|
||||
filter_by(share_id=share_id).\
|
||||
options(joinedload('share')).\
|
||||
all()
|
||||
def share_snapshot_get_all_for_share(context, share_id, filters=None,
|
||||
sort_key=None, sort_dir=None):
|
||||
return _share_snapshot_get_all_with_filters(
|
||||
context, share_id=share_id,
|
||||
filters=filters, sort_key=sort_key, sort_dir=sort_dir,
|
||||
)
|
||||
|
||||
|
||||
@require_context
|
||||
|
@ -406,22 +406,37 @@ class API(base.Base):
|
||||
rv = self.db.share_snapshot_get(context, snapshot_id)
|
||||
return dict(six.iteritems(rv))
|
||||
|
||||
def get_all_snapshots(self, context, search_opts=None):
|
||||
def get_all_snapshots(self, context, search_opts=None,
|
||||
sort_key='share_id', sort_dir='desc'):
|
||||
policy.check_policy(context, 'share', 'get_all_snapshots')
|
||||
|
||||
search_opts = search_opts or {}
|
||||
LOG.debug("Searching for snapshots by: %s", six.text_type(search_opts))
|
||||
|
||||
if (context.is_admin and 'all_tenants' in search_opts):
|
||||
# Need to remove all_tenants to pass the filtering below.
|
||||
del search_opts['all_tenants']
|
||||
snapshots = self.db.share_snapshot_get_all(context)
|
||||
# Read and remove key 'all_tenants' if was provided
|
||||
all_tenants = search_opts.pop('all_tenants', None)
|
||||
|
||||
string_args = {'sort_key': sort_key, 'sort_dir': sort_dir}
|
||||
string_args.update(search_opts)
|
||||
for k, v in string_args.items():
|
||||
if not (isinstance(v, six.string_types) and v):
|
||||
msg = _("Wrong '%(k)s' filter provided: "
|
||||
"'%(v)s'.") % {'k': k, 'v': string_args[k]}
|
||||
raise exception.InvalidInput(reason=msg)
|
||||
|
||||
if (context.is_admin and all_tenants):
|
||||
snapshots = self.db.share_snapshot_get_all(
|
||||
context, filters=search_opts,
|
||||
sort_key=sort_key, sort_dir=sort_dir)
|
||||
else:
|
||||
snapshots = self.db.share_snapshot_get_all_by_project(
|
||||
context, context.project_id)
|
||||
context, context.project_id, filters=search_opts,
|
||||
sort_key=sort_key, sort_dir=sort_dir)
|
||||
|
||||
# Remove key 'usage' if provided
|
||||
search_opts.pop('usage', None)
|
||||
|
||||
if search_opts:
|
||||
LOG.debug("Searching by: %s", str(search_opts))
|
||||
|
||||
results = []
|
||||
not_found = object()
|
||||
for snapshot in snapshots:
|
||||
|
@ -118,5 +118,6 @@ def stub_snapshot_delete(self, context, *args, **param):
|
||||
pass
|
||||
|
||||
|
||||
def stub_snapshot_get_all_by_project(self, context, search_opts=None):
|
||||
def stub_snapshot_get_all_by_project(self, context, search_opts=None,
|
||||
sort_key=None, sort_dir=None):
|
||||
return [stub_snapshot_get(self, context, 2)]
|
||||
|
@ -15,6 +15,7 @@
|
||||
|
||||
import datetime
|
||||
|
||||
import mock
|
||||
import webob
|
||||
|
||||
from manila.api.v1 import share_snapshots
|
||||
@ -173,6 +174,116 @@ class ShareSnapshotApiTest(test.TestCase):
|
||||
}
|
||||
self.assertEqual(res_dict, expected)
|
||||
|
||||
def _snapshot_list_summary_with_search_opts(self, use_admin_context):
|
||||
search_opts = {
|
||||
'name': 'fake_name',
|
||||
'status': 'fake_status',
|
||||
'share_id': 'fake_share_id',
|
||||
'sort_key': 'fake_sort_key',
|
||||
'sort_dir': 'fake_sort_dir',
|
||||
'offset': '1',
|
||||
'limit': '1',
|
||||
}
|
||||
# fake_key should be filtered for non-admin
|
||||
url = '/snapshots?fake_key=fake_value'
|
||||
for k, v in search_opts.items():
|
||||
url = url + '&' + k + '=' + v
|
||||
req = fakes.HTTPRequest.blank(url, use_admin_context=use_admin_context)
|
||||
|
||||
snapshots = [
|
||||
{'id': 'id1', 'display_name': 'n1'},
|
||||
{'id': 'id2', 'display_name': 'n2'},
|
||||
{'id': 'id3', 'display_name': 'n3'},
|
||||
]
|
||||
self.stubs.Set(share_api.API, 'get_all_snapshots',
|
||||
mock.Mock(return_value=snapshots))
|
||||
|
||||
result = self.controller.index(req)
|
||||
|
||||
search_opts_expected = {
|
||||
'display_name': search_opts['name'],
|
||||
'status': search_opts['status'],
|
||||
'share_id': search_opts['share_id'],
|
||||
}
|
||||
if use_admin_context:
|
||||
search_opts_expected.update({'fake_key': 'fake_value'})
|
||||
share_api.API.get_all_snapshots.assert_called_once_with(
|
||||
req.environ['manila.context'],
|
||||
sort_key=search_opts['sort_key'],
|
||||
sort_dir=search_opts['sort_dir'],
|
||||
search_opts=search_opts_expected,
|
||||
)
|
||||
self.assertEqual(1, len(result['snapshots']))
|
||||
self.assertEqual(snapshots[1]['id'], result['snapshots'][0]['id'])
|
||||
self.assertEqual(
|
||||
snapshots[1]['display_name'], result['snapshots'][0]['name'])
|
||||
|
||||
def test_snapshot_list_summary_with_search_opts_by_non_admin(self):
|
||||
self._snapshot_list_summary_with_search_opts(use_admin_context=False)
|
||||
|
||||
def test_snapshot_list_summary_with_search_opts_by_admin(self):
|
||||
self._snapshot_list_summary_with_search_opts(use_admin_context=True)
|
||||
|
||||
def _snapshot_list_detail_with_search_opts(self, use_admin_context):
|
||||
search_opts = {
|
||||
'name': 'fake_name',
|
||||
'status': 'fake_status',
|
||||
'share_id': 'fake_share_id',
|
||||
'sort_key': 'fake_sort_key',
|
||||
'sort_dir': 'fake_sort_dir',
|
||||
'limit': '1',
|
||||
'offset': '1',
|
||||
}
|
||||
# fake_key should be filtered for non-admin
|
||||
url = '/shares/detail?fake_key=fake_value'
|
||||
for k, v in search_opts.items():
|
||||
url = url + '&' + k + '=' + v
|
||||
req = fakes.HTTPRequest.blank(url, use_admin_context=use_admin_context)
|
||||
|
||||
snapshots = [
|
||||
{'id': 'id1', 'display_name': 'n1'},
|
||||
{
|
||||
'id': 'id2',
|
||||
'display_name': 'n2',
|
||||
'status': 'fake_status',
|
||||
'share_id': 'fake_share_id',
|
||||
},
|
||||
{'id': 'id3', 'display_name': 'n3'},
|
||||
]
|
||||
|
||||
self.stubs.Set(share_api.API, 'get_all_snapshots',
|
||||
mock.Mock(return_value=snapshots))
|
||||
|
||||
result = self.controller.detail(req)
|
||||
|
||||
search_opts_expected = {
|
||||
'display_name': search_opts['name'],
|
||||
'status': search_opts['status'],
|
||||
'share_id': search_opts['share_id'],
|
||||
}
|
||||
if use_admin_context:
|
||||
search_opts_expected.update({'fake_key': 'fake_value'})
|
||||
share_api.API.get_all_snapshots.assert_called_once_with(
|
||||
req.environ['manila.context'],
|
||||
sort_key=search_opts['sort_key'],
|
||||
sort_dir=search_opts['sort_dir'],
|
||||
search_opts=search_opts_expected,
|
||||
)
|
||||
self.assertEqual(1, len(result['snapshots']))
|
||||
self.assertEqual(snapshots[1]['id'], result['snapshots'][0]['id'])
|
||||
self.assertEqual(
|
||||
snapshots[1]['display_name'], result['snapshots'][0]['name'])
|
||||
self.assertEqual(
|
||||
snapshots[1]['status'], result['snapshots'][0]['status'])
|
||||
self.assertEqual(
|
||||
snapshots[1]['share_id'], result['snapshots'][0]['share_id'])
|
||||
|
||||
def test_share_list_detail_with_search_opts_by_non_admin(self):
|
||||
self._snapshot_list_detail_with_search_opts(use_admin_context=False)
|
||||
|
||||
def test_share_list_detail_with_search_opts_by_admin(self):
|
||||
self._snapshot_list_detail_with_search_opts(use_admin_context=True)
|
||||
|
||||
def test_snapshot_list_detail(self):
|
||||
env = {'QUERY_STRING': 'name=Share+Test+Name'}
|
||||
req = fakes.HTTPRequest.blank('/shares/detail', environ=env)
|
||||
|
@ -128,6 +128,34 @@ _FAKE_LIST_OF_ALL_SHARES = [
|
||||
]
|
||||
|
||||
|
||||
_FAKE_LIST_OF_ALL_SNAPSHOTS = [
|
||||
{
|
||||
'name': 'foo',
|
||||
'status': 'available',
|
||||
'project_id': 'fake_pid_1',
|
||||
'share_id': 'fake_server_1',
|
||||
},
|
||||
{
|
||||
'name': 'bar',
|
||||
'status': 'error',
|
||||
'project_id': 'fake_pid_2',
|
||||
'share_id': 'fake_server_2',
|
||||
},
|
||||
{
|
||||
'name': 'foo',
|
||||
'status': 'available',
|
||||
'project_id': 'fake_pid_2',
|
||||
'share_id': 'fake_share_id_3',
|
||||
},
|
||||
{
|
||||
'name': 'bar',
|
||||
'status': 'error',
|
||||
'project_id': 'fake_pid_2',
|
||||
'share_id': 'fake_share_id_3',
|
||||
},
|
||||
]
|
||||
|
||||
|
||||
class ShareAPITestCase(test.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
@ -818,7 +846,7 @@ class ShareAPITestCase(test.TestCase):
|
||||
share_api.policy.check_policy.assert_called_once_with(
|
||||
ctx, 'share', 'get_all_snapshots')
|
||||
db_driver.share_snapshot_get_all_by_project.assert_called_once_with(
|
||||
ctx, 'fakepid')
|
||||
ctx, 'fakepid', sort_dir='desc', sort_key='share_id', filters={})
|
||||
|
||||
@mock.patch.object(db_driver, 'share_snapshot_get_all', mock.Mock())
|
||||
def test_get_all_snapshots_admin_all_tenants(self):
|
||||
@ -826,7 +854,8 @@ class ShareAPITestCase(test.TestCase):
|
||||
search_opts={'all_tenants': 1})
|
||||
share_api.policy.check_policy.assert_called_once_with(
|
||||
self.context, 'share', 'get_all_snapshots')
|
||||
db_driver.share_snapshot_get_all.assert_called_once_with(self.context)
|
||||
db_driver.share_snapshot_get_all.assert_called_once_with(
|
||||
self.context, sort_dir='desc', sort_key='share_id', filters={})
|
||||
|
||||
@mock.patch.object(db_driver, 'share_snapshot_get_all_by_project',
|
||||
mock.Mock())
|
||||
@ -836,21 +865,61 @@ class ShareAPITestCase(test.TestCase):
|
||||
share_api.policy.check_policy.assert_called_once_with(
|
||||
ctx, 'share', 'get_all_snapshots')
|
||||
db_driver.share_snapshot_get_all_by_project.assert_called_once_with(
|
||||
ctx, 'fakepid')
|
||||
ctx, 'fakepid', sort_dir='desc', sort_key='share_id', filters={})
|
||||
|
||||
def test_get_all_snapshots_not_admin_search_opts(self):
|
||||
search_opts = {'size': 'fakesize'}
|
||||
fake_objs = [{'name': 'fakename1'}, search_opts]
|
||||
ctx = context.RequestContext('fakeuid', 'fakepid', is_admin=False)
|
||||
with mock.patch.object(db_driver,
|
||||
'share_snapshot_get_all_by_project',
|
||||
mock.Mock(return_value=fake_objs)):
|
||||
self.stubs.Set(db_driver, 'share_snapshot_get_all_by_project',
|
||||
mock.Mock(return_value=fake_objs))
|
||||
|
||||
result = self.api.get_all_snapshots(ctx, search_opts)
|
||||
|
||||
self.assertEqual([search_opts], result)
|
||||
share_api.policy.check_policy.assert_called_once_with(
|
||||
ctx, 'share', 'get_all_snapshots')
|
||||
db_driver.share_snapshot_get_all_by_project.\
|
||||
assert_called_once_with(ctx, 'fakepid')
|
||||
db_driver.share_snapshot_get_all_by_project.assert_called_once_with(
|
||||
ctx, 'fakepid', sort_dir='desc', sort_key='share_id',
|
||||
filters=search_opts)
|
||||
|
||||
def test_get_all_snapshots_with_sorting_valid(self):
|
||||
self.stubs.Set(db_driver, 'share_snapshot_get_all_by_project',
|
||||
mock.Mock(return_value=_FAKE_LIST_OF_ALL_SNAPSHOTS[0]))
|
||||
ctx = context.RequestContext('fake_uid', 'fake_pid_1', is_admin=False)
|
||||
snapshots = self.api.get_all_snapshots(
|
||||
ctx, sort_key='status', sort_dir='asc')
|
||||
share_api.policy.check_policy.assert_called_once_with(
|
||||
ctx, 'share', 'get_all_snapshots')
|
||||
db_driver.share_snapshot_get_all_by_project.assert_called_once_with(
|
||||
ctx, 'fake_pid_1', sort_dir='asc', sort_key='status', filters={})
|
||||
self.assertEqual(_FAKE_LIST_OF_ALL_SNAPSHOTS[0], snapshots)
|
||||
|
||||
def test_get_all_snapshots_sort_key_invalid(self):
|
||||
self.stubs.Set(db_driver, 'share_snapshot_get_all_by_project',
|
||||
mock.Mock(return_value=_FAKE_LIST_OF_ALL_SNAPSHOTS[0]))
|
||||
ctx = context.RequestContext('fake_uid', 'fake_pid_1', is_admin=False)
|
||||
self.assertRaises(
|
||||
exception.InvalidInput,
|
||||
self.api.get_all_snapshots,
|
||||
ctx,
|
||||
sort_key=1,
|
||||
)
|
||||
share_api.policy.check_policy.assert_called_once_with(
|
||||
ctx, 'share', 'get_all_snapshots')
|
||||
|
||||
def test_get_all_snapshots_sort_dir_invalid(self):
|
||||
self.stubs.Set(db_driver, 'share_snapshot_get_all_by_project',
|
||||
mock.Mock(return_value=_FAKE_LIST_OF_ALL_SNAPSHOTS[0]))
|
||||
ctx = context.RequestContext('fake_uid', 'fake_pid_1', is_admin=False)
|
||||
self.assertRaises(
|
||||
exception.InvalidInput,
|
||||
self.api.get_all_snapshots,
|
||||
ctx,
|
||||
sort_dir=1,
|
||||
)
|
||||
share_api.policy.check_policy.assert_called_once_with(
|
||||
ctx, 'share', 'get_all_snapshots')
|
||||
|
||||
def test_allow_access(self):
|
||||
share = fake_share('fakeid', status='available')
|
||||
|
Loading…
Reference in New Issue
Block a user