Files
python-glanceclient/tests/v2/test_shell_v2.py
Flavio Percoco 93c9bc1fe0 Add a --limit parameter to list operations
In v2, we use the link header during paginations to know when there are
more images available in the server that could be returned in the same
request. This way, it's possible to iterate over the generator returned
by list and consume the images in the server.

However, it's currently not possible to tell glanceclient the exact
number of images we want, which basically means that it'll *always* go
through the whole list of images in the server unless the limit is
implemented by the consumer.

DocImpact
Change-Id: I9f65a40d4eafda6320e5c7d94d03fcd1bbc93e33
Closes-bug: #1415035
2015-01-29 10:37:55 +00:00

976 lines
42 KiB
Python

# Copyright 2013 OpenStack Foundation
# Copyright (C) 2013 Yahoo! Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import mock
import os
import tempfile
import testtools
from glanceclient.common import utils
from glanceclient.v2 import shell as test_shell
class ShellV2Test(testtools.TestCase):
def setUp(self):
super(ShellV2Test, self).setUp()
self._mock_utils()
self.gc = self._mock_glance_client()
def _make_args(self, args):
#NOTE(venkatesh): this conversion from a dict to an object
# is required because the test_shell.do_xxx(gc, args) methods
# expects the args to be attributes of an object. If passed as
# dict directly, it throws an AttributeError.
class Args():
def __init__(self, entries):
self.__dict__.update(entries)
return Args(args)
def _mock_glance_client(self):
my_mocked_gc = mock.Mock()
my_mocked_gc.schemas.return_value = 'test'
my_mocked_gc.get.return_value = {}
return my_mocked_gc
def _mock_utils(self):
utils.print_list = mock.Mock()
utils.print_dict = mock.Mock()
utils.save_image = mock.Mock()
def assert_exits_with_msg(self, func, func_args, err_msg):
with mock.patch.object(utils, 'exit') as mocked_utils_exit:
mocked_utils_exit.return_value = '%s' % err_msg
func(self.gc, func_args)
mocked_utils_exit.assert_called_once_with(err_msg)
def test_do_image_list(self):
input = {
'limit': None,
'page_size': 18,
'visibility': True,
'member_status': 'Fake',
'owner': 'test',
'checksum': 'fake_checksum',
'tag': 'fake tag',
'properties': []
}
args = self._make_args(input)
with mock.patch.object(self.gc.images, 'list') as mocked_list:
mocked_list.return_value = {}
test_shell.do_image_list(self.gc, args)
exp_img_filters = {
'owner': 'test',
'member_status': 'Fake',
'visibility': True,
'checksum': 'fake_checksum',
'tag': 'fake tag'
}
mocked_list.assert_called_once_with(page_size=18,
filters=exp_img_filters)
utils.print_list.assert_called_once_with({}, ['ID', 'Name'])
def test_do_image_list_with_property_filter(self):
input = {
'limit': None,
'page_size': 1,
'visibility': True,
'member_status': 'Fake',
'owner': 'test',
'checksum': 'fake_checksum',
'tag': 'fake tag',
'properties': ['os_distro=NixOS', 'architecture=x86_64']
}
args = self._make_args(input)
with mock.patch.object(self.gc.images, 'list') as mocked_list:
mocked_list.return_value = {}
test_shell.do_image_list(self.gc, args)
exp_img_filters = {
'owner': 'test',
'member_status': 'Fake',
'visibility': True,
'checksum': 'fake_checksum',
'tag': 'fake tag',
'os_distro': 'NixOS',
'architecture': 'x86_64'
}
mocked_list.assert_called_once_with(page_size=1,
filters=exp_img_filters)
utils.print_list.assert_called_once_with({}, ['ID', 'Name'])
def test_do_image_show(self):
args = self._make_args({'id': 'pass', 'page_size': 18,
'max_column_width': 120})
with mock.patch.object(self.gc.images, 'get') as mocked_list:
ignore_fields = ['self', 'access', 'file', 'schema']
expect_image = dict([(field, field) for field in ignore_fields])
expect_image['id'] = 'pass'
mocked_list.return_value = expect_image
test_shell.do_image_show(self.gc, args)
mocked_list.assert_called_once_with('pass')
utils.print_dict.assert_called_once_with({'id': 'pass'},
max_column_width=120)
def test_do_image_create_no_user_props(self):
args = self._make_args({'name': 'IMG-01', 'disk_format': 'vhd',
'container_format': 'bare'})
with mock.patch.object(self.gc.images, 'create') as mocked_create:
ignore_fields = ['self', 'access', 'file', 'schema']
expect_image = dict([(field, field) for field in ignore_fields])
expect_image['id'] = 'pass'
expect_image['name'] = 'IMG-01'
expect_image['disk_format'] = 'vhd'
expect_image['container_format'] = 'bare'
mocked_create.return_value = expect_image
test_shell.do_image_create(self.gc, args)
mocked_create.assert_called_once_with(name='IMG-01',
disk_format='vhd',
container_format='bare')
utils.print_dict.assert_called_once_with({
'id': 'pass', 'name': 'IMG-01', 'disk_format': 'vhd',
'container_format': 'bare'})
def test_do_image_create_with_file(self):
try:
file_name = None
with open(tempfile.mktemp(), 'w+') as f:
f.write('Some data here')
f.flush()
f.seek(0)
file_name = f.name
temp_args = {'name': 'IMG-01',
'disk_format': 'vhd',
'container_format': 'bare',
'file': file_name,
'progress': False}
args = self._make_args(temp_args)
with mock.patch.object(self.gc.images, 'create') as mocked_create:
with mock.patch.object(self.gc.images, 'get') as mocked_get:
ignore_fields = ['self', 'access', 'schema']
expect_image = dict([(field, field) for field in
ignore_fields])
expect_image['id'] = 'pass'
expect_image['name'] = 'IMG-01'
expect_image['disk_format'] = 'vhd'
expect_image['container_format'] = 'bare'
mocked_create.return_value = expect_image
mocked_get.return_value = expect_image
test_shell.do_image_create(self.gc, args)
temp_args.pop('file', None)
mocked_create.assert_called_once_with(**temp_args)
mocked_get.assert_called_once_with('pass')
utils.print_dict.assert_called_once_with({
'id': 'pass', 'name': 'IMG-01', 'disk_format': 'vhd',
'container_format': 'bare'})
finally:
try:
os.remove(f.name)
except Exception:
pass
def test_do_image_create_with_user_props(self):
args = self._make_args({'name': 'IMG-01',
'property': ['myprop=myval']})
with mock.patch.object(self.gc.images, 'create') as mocked_create:
ignore_fields = ['self', 'access', 'file', 'schema']
expect_image = dict([(field, field) for field in ignore_fields])
expect_image['id'] = 'pass'
expect_image['name'] = 'IMG-01'
expect_image['myprop'] = 'myval'
mocked_create.return_value = expect_image
test_shell.do_image_create(self.gc, args)
mocked_create.assert_called_once_with(name='IMG-01',
myprop='myval')
utils.print_dict.assert_called_once_with({
'id': 'pass', 'name': 'IMG-01', 'myprop': 'myval'})
def test_do_image_update_no_user_props(self):
args = self._make_args({'id': 'pass', 'name': 'IMG-01',
'disk_format': 'vhd',
'container_format': 'bare'})
with mock.patch.object(self.gc.images, 'update') as mocked_update:
ignore_fields = ['self', 'access', 'file', 'schema']
expect_image = dict([(field, field) for field in ignore_fields])
expect_image['id'] = 'pass'
expect_image['name'] = 'IMG-01'
expect_image['disk_format'] = 'vhd'
expect_image['container_format'] = 'bare'
mocked_update.return_value = expect_image
test_shell.do_image_update(self.gc, args)
mocked_update.assert_called_once_with('pass',
None,
name='IMG-01',
disk_format='vhd',
container_format='bare')
utils.print_dict.assert_called_once_with({
'id': 'pass', 'name': 'IMG-01', 'disk_format': 'vhd',
'container_format': 'bare'})
def test_do_image_update_with_user_props(self):
args = self._make_args({'id': 'pass', 'name': 'IMG-01',
'property': ['myprop=myval']})
with mock.patch.object(self.gc.images, 'update') as mocked_update:
ignore_fields = ['self', 'access', 'file', 'schema']
expect_image = dict([(field, field) for field in ignore_fields])
expect_image['id'] = 'pass'
expect_image['name'] = 'IMG-01'
expect_image['myprop'] = 'myval'
mocked_update.return_value = expect_image
test_shell.do_image_update(self.gc, args)
mocked_update.assert_called_once_with('pass',
None,
name='IMG-01',
myprop='myval')
utils.print_dict.assert_called_once_with({
'id': 'pass', 'name': 'IMG-01', 'myprop': 'myval'})
def test_do_image_update_with_remove_props(self):
args = self._make_args({'id': 'pass', 'name': 'IMG-01',
'disk_format': 'vhd',
'remove-property': ['container_format']})
with mock.patch.object(self.gc.images, 'update') as mocked_update:
ignore_fields = ['self', 'access', 'file', 'schema']
expect_image = dict([(field, field) for field in ignore_fields])
expect_image['id'] = 'pass'
expect_image['name'] = 'IMG-01'
expect_image['disk_format'] = 'vhd'
mocked_update.return_value = expect_image
test_shell.do_image_update(self.gc, args)
mocked_update.assert_called_once_with('pass',
['container_format'],
name='IMG-01',
disk_format='vhd')
utils.print_dict.assert_called_once_with({
'id': 'pass', 'name': 'IMG-01', 'disk_format': 'vhd'})
def test_do_explain(self):
input = {
'page_size': 18,
'id': 'pass',
'schemas': 'test',
'model': 'test',
}
args = self._make_args(input)
with mock.patch.object(utils, 'print_list'):
test_shell.do_explain(self.gc, args)
self.gc.schemas.get.assert_called_once_with('test')
def test_do_location_add(self):
gc = self.gc
loc = {'url': 'http://foo.com/', 'metadata': {'foo': 'bar'}}
args = self._make_args({'id': 'pass',
'url': loc['url'],
'metadata': json.dumps(loc['metadata'])})
with mock.patch.object(gc.images, 'add_location') as mocked_addloc:
expect_image = {'id': 'pass', 'locations': [loc]}
mocked_addloc.return_value = expect_image
test_shell.do_location_add(self.gc, args)
mocked_addloc.assert_called_once_with('pass',
loc['url'],
loc['metadata'])
utils.print_dict.assert_called_once_with(expect_image)
def test_do_location_delete(self):
gc = self.gc
loc_set = set(['http://foo/bar', 'http://spam/ham'])
args = self._make_args({'id': 'pass', 'url': loc_set})
with mock.patch.object(gc.images, 'delete_locations') as mocked_rmloc:
test_shell.do_location_delete(self.gc, args)
mocked_rmloc.assert_called_once_with('pass', loc_set)
def test_do_location_update(self):
gc = self.gc
loc = {'url': 'http://foo.com/', 'metadata': {'foo': 'bar'}}
args = self._make_args({'id': 'pass',
'url': loc['url'],
'metadata': json.dumps(loc['metadata'])})
with mock.patch.object(gc.images, 'update_location') as mocked_modloc:
expect_image = {'id': 'pass', 'locations': [loc]}
mocked_modloc.return_value = expect_image
test_shell.do_location_update(self.gc, args)
mocked_modloc.assert_called_once_with('pass',
loc['url'],
loc['metadata'])
utils.print_dict.assert_called_once_with(expect_image)
def test_image_upload(self):
args = self._make_args(
{'id': 'IMG-01', 'file': 'test', 'size': 1024, 'progress': False})
with mock.patch.object(self.gc.images, 'upload') as mocked_upload:
utils.get_data_file = mock.Mock(return_value='testfile')
mocked_upload.return_value = None
test_shell.do_image_upload(self.gc, args)
mocked_upload.assert_called_once_with('IMG-01', 'testfile', 1024)
def test_image_download(self):
args = self._make_args(
{'id': 'IMG-01', 'file': 'test', 'progress': True})
with mock.patch.object(self.gc.images, 'data') as mocked_data:
def _data():
for c in 'abcedf':
yield c
mocked_data.return_value = utils.IterableWithLength(_data(), 5)
test_shell.do_image_download(self.gc, args)
mocked_data.assert_called_once_with('IMG-01')
def test_do_image_delete(self):
args = self._make_args({'id': 'pass', 'file': 'test'})
with mock.patch.object(self.gc.images, 'delete') as mocked_delete:
mocked_delete.return_value = 0
test_shell.do_image_delete(self.gc, args)
mocked_delete.assert_called_once_with('pass')
def test_do_member_list(self):
args = self._make_args({'image_id': 'IMG-01'})
with mock.patch.object(self.gc.image_members, 'list') as mocked_list:
mocked_list.return_value = {}
test_shell.do_member_list(self.gc, args)
mocked_list.assert_called_once_with('IMG-01')
columns = ['Image ID', 'Member ID', 'Status']
utils.print_list.assert_called_once_with({}, columns)
def test_do_member_create(self):
args = self._make_args({'image_id': 'IMG-01', 'member_id': 'MEM-01'})
with mock.patch.object(self.gc.image_members, 'create') as mock_create:
mock_create.return_value = {}
test_shell.do_member_create(self.gc, args)
mock_create.assert_called_once_with('IMG-01', 'MEM-01')
columns = ['Image ID', 'Member ID', 'Status']
utils.print_list.assert_called_once_with([{}], columns)
def test_do_member_create_with_few_arguments(self):
args = self._make_args({'image_id': None, 'member_id': 'MEM-01'})
msg = 'Unable to create member. Specify image_id and member_id'
self.assert_exits_with_msg(func=test_shell.do_member_create,
func_args=args,
err_msg=msg)
def test_do_member_update(self):
input = {
'image_id': 'IMG-01',
'member_id': 'MEM-01',
'member_status': 'status',
}
args = self._make_args(input)
with mock.patch.object(self.gc.image_members, 'update') as mock_update:
mock_update.return_value = {}
test_shell.do_member_update(self.gc, args)
mock_update.assert_called_once_with('IMG-01', 'MEM-01', 'status')
columns = ['Image ID', 'Member ID', 'Status']
utils.print_list.assert_called_once_with([{}], columns)
def test_do_member_update_with_few_arguments(self):
input = {
'image_id': 'IMG-01',
'member_id': 'MEM-01',
'member_status': None,
}
args = self._make_args(input)
msg = 'Unable to update member. Specify image_id, member_id' \
' and member_status'
self.assert_exits_with_msg(func=test_shell.do_member_update,
func_args=args,
err_msg=msg)
def test_do_member_delete(self):
args = self._make_args({'image_id': 'IMG-01', 'member_id': 'MEM-01'})
with mock.patch.object(self.gc.image_members, 'delete') as mock_delete:
test_shell.do_member_delete(self.gc, args)
mock_delete.assert_called_once_with('IMG-01', 'MEM-01')
def test_do_member_delete_with_few_arguments(self):
args = self._make_args({'image_id': None, 'member_id': 'MEM-01'})
msg = 'Unable to delete member. Specify image_id and member_id'
self.assert_exits_with_msg(func=test_shell.do_member_delete,
func_args=args,
err_msg=msg)
def test_image_tag_update(self):
args = self._make_args({'image_id': 'IMG-01', 'tag_value': 'tag01'})
with mock.patch.object(self.gc.image_tags, 'update') as mocked_update:
self.gc.images.get = mock.Mock(return_value={})
mocked_update.return_value = None
test_shell.do_image_tag_update(self.gc, args)
mocked_update.assert_called_once_with('IMG-01', 'tag01')
def test_image_tag_update_with_few_arguments(self):
args = self._make_args({'image_id': None, 'tag_value': 'tag01'})
msg = 'Unable to update tag. Specify image_id and tag_value'
self.assert_exits_with_msg(func=test_shell.do_image_tag_update,
func_args=args,
err_msg=msg)
def test_image_tag_delete(self):
args = self._make_args({'image_id': 'IMG-01', 'tag_value': 'tag01'})
with mock.patch.object(self.gc.image_tags, 'delete') as mocked_delete:
mocked_delete.return_value = None
test_shell.do_image_tag_delete(self.gc, args)
mocked_delete.assert_called_once_with('IMG-01', 'tag01')
def test_image_tag_delete_with_few_arguments(self):
args = self._make_args({'image_id': 'IMG-01', 'tag_value': None})
msg = 'Unable to delete tag. Specify image_id and tag_value'
self.assert_exits_with_msg(func=test_shell.do_image_tag_delete,
func_args=args,
err_msg=msg)
def test_do_md_namespace_create(self):
args = self._make_args({'namespace': 'MyNamespace',
'protected': True})
with mock.patch.object(self.gc.metadefs_namespace,
'create') as mocked_create:
expect_namespace = {}
expect_namespace['namespace'] = 'MyNamespace'
expect_namespace['protected'] = True
mocked_create.return_value = expect_namespace
test_shell.do_md_namespace_create(self.gc, args)
mocked_create.assert_called_once_with(namespace='MyNamespace',
protected=True)
utils.print_dict.assert_called_once_with(expect_namespace)
def test_do_md_namespace_import(self):
args = self._make_args({'file': 'test'})
expect_namespace = {}
expect_namespace['namespace'] = 'MyNamespace'
expect_namespace['protected'] = True
with mock.patch.object(self.gc.metadefs_namespace,
'create') as mocked_create:
mock_read = mock.Mock(return_value=json.dumps(expect_namespace))
mock_file = mock.Mock(read=mock_read)
utils.get_data_file = mock.Mock(return_value=mock_file)
mocked_create.return_value = expect_namespace
test_shell.do_md_namespace_import(self.gc, args)
mocked_create.assert_called_once_with(**expect_namespace)
utils.print_dict.assert_called_once_with(expect_namespace)
def test_do_md_namespace_import_invalid_json(self):
args = self._make_args({'file': 'test'})
mock_read = mock.Mock(return_value='Invalid')
mock_file = mock.Mock(read=mock_read)
utils.get_data_file = mock.Mock(return_value=mock_file)
self.assertRaises(SystemExit, test_shell.do_md_namespace_import,
self.gc, args)
def test_do_md_namespace_import_no_input(self):
args = self._make_args({'file': None})
utils.get_data_file = mock.Mock(return_value=None)
self.assertRaises(SystemExit, test_shell.do_md_namespace_import,
self.gc, args)
def test_do_md_namespace_update(self):
args = self._make_args({'id': 'MyNamespace',
'protected': True})
with mock.patch.object(self.gc.metadefs_namespace,
'update') as mocked_update:
expect_namespace = {}
expect_namespace['namespace'] = 'MyNamespace'
expect_namespace['protected'] = True
mocked_update.return_value = expect_namespace
test_shell.do_md_namespace_update(self.gc, args)
mocked_update.assert_called_once_with('MyNamespace',
id='MyNamespace',
protected=True)
utils.print_dict.assert_called_once_with(expect_namespace)
def test_do_md_namespace_show(self):
args = self._make_args({'namespace': 'MyNamespace',
'max_column_width': 80,
'resource_type': None})
with mock.patch.object(self.gc.metadefs_namespace,
'get') as mocked_get:
expect_namespace = {}
expect_namespace['namespace'] = 'MyNamespace'
mocked_get.return_value = expect_namespace
test_shell.do_md_namespace_show(self.gc, args)
mocked_get.assert_called_once_with('MyNamespace')
utils.print_dict.assert_called_once_with(expect_namespace, 80)
def test_do_md_namespace_show_resource_type(self):
args = self._make_args({'namespace': 'MyNamespace',
'max_column_width': 80,
'resource_type': 'RESOURCE'})
with mock.patch.object(self.gc.metadefs_namespace,
'get') as mocked_get:
expect_namespace = {}
expect_namespace['namespace'] = 'MyNamespace'
mocked_get.return_value = expect_namespace
test_shell.do_md_namespace_show(self.gc, args)
mocked_get.assert_called_once_with('MyNamespace',
resource_type='RESOURCE')
utils.print_dict.assert_called_once_with(expect_namespace, 80)
def test_do_md_namespace_list(self):
args = self._make_args({'resource_type': None,
'visibility': None,
'page_size': None})
with mock.patch.object(self.gc.metadefs_namespace,
'list') as mocked_list:
expect_namespaces = [{'namespace': 'MyNamespace'}]
mocked_list.return_value = expect_namespaces
test_shell.do_md_namespace_list(self.gc, args)
mocked_list.assert_called_once_with(filters={})
utils.print_list.assert_called_once_with(expect_namespaces,
['namespace'])
def test_do_md_namespace_list_page_size(self):
args = self._make_args({'resource_type': None,
'visibility': None,
'page_size': 2})
with mock.patch.object(self.gc.metadefs_namespace,
'list') as mocked_list:
expect_namespaces = [{'namespace': 'MyNamespace'}]
mocked_list.return_value = expect_namespaces
test_shell.do_md_namespace_list(self.gc, args)
mocked_list.assert_called_once_with(filters={}, page_size=2)
utils.print_list.assert_called_once_with(expect_namespaces,
['namespace'])
def test_do_md_namespace_list_one_filter(self):
args = self._make_args({'resource_types': ['OS::Compute::Aggregate'],
'visibility': None,
'page_size': None})
with mock.patch.object(self.gc.metadefs_namespace, 'list') as \
mocked_list:
expect_namespaces = [{'namespace': 'MyNamespace'}]
mocked_list.return_value = expect_namespaces
test_shell.do_md_namespace_list(self.gc, args)
mocked_list.assert_called_once_with(filters={
'resource_types': ['OS::Compute::Aggregate']})
utils.print_list.assert_called_once_with(expect_namespaces,
['namespace'])
def test_do_md_namespace_list_all_filters(self):
args = self._make_args({'resource_types': ['OS::Compute::Aggregate'],
'visibility': 'public',
'page_size': None})
with mock.patch.object(self.gc.metadefs_namespace,
'list') as mocked_list:
expect_namespaces = [{'namespace': 'MyNamespace'}]
mocked_list.return_value = expect_namespaces
test_shell.do_md_namespace_list(self.gc, args)
mocked_list.assert_called_once_with(filters={
'resource_types': ['OS::Compute::Aggregate'],
'visibility': 'public'})
utils.print_list.assert_called_once_with(expect_namespaces,
['namespace'])
def test_do_md_namespace_list_unknown_filter(self):
args = self._make_args({'resource_type': None,
'visibility': None,
'some_arg': 'some_value',
'page_size': None})
with mock.patch.object(self.gc.metadefs_namespace,
'list') as mocked_list:
expect_namespaces = [{'namespace': 'MyNamespace'}]
mocked_list.return_value = expect_namespaces
test_shell.do_md_namespace_list(self.gc, args)
mocked_list.assert_called_once_with(filters={})
utils.print_list.assert_called_once_with(expect_namespaces,
['namespace'])
def test_do_md_namespace_delete(self):
args = self._make_args({'namespace': 'MyNamespace',
'content': False})
with mock.patch.object(self.gc.metadefs_namespace, 'delete') as \
mocked_delete:
test_shell.do_md_namespace_delete(self.gc, args)
mocked_delete.assert_called_once_with('MyNamespace')
def test_do_md_resource_type_associate(self):
args = self._make_args({'namespace': 'MyNamespace',
'name': 'MyResourceType',
'prefix': 'PREFIX:'})
with mock.patch.object(self.gc.metadefs_resource_type,
'associate') as mocked_associate:
expect_rt = {}
expect_rt['namespace'] = 'MyNamespace'
expect_rt['name'] = 'MyResourceType'
expect_rt['prefix'] = 'PREFIX:'
mocked_associate.return_value = expect_rt
test_shell.do_md_resource_type_associate(self.gc, args)
mocked_associate.assert_called_once_with('MyNamespace',
**expect_rt)
utils.print_dict.assert_called_once_with(expect_rt)
def test_do_md_resource_type_deassociate(self):
args = self._make_args({'namespace': 'MyNamespace',
'resource_type': 'MyResourceType'})
with mock.patch.object(self.gc.metadefs_resource_type,
'deassociate') as mocked_deassociate:
test_shell.do_md_resource_type_deassociate(self.gc, args)
mocked_deassociate.assert_called_once_with('MyNamespace',
'MyResourceType')
def test_do_md_resource_type_list(self):
args = self._make_args({})
with mock.patch.object(self.gc.metadefs_resource_type,
'list') as mocked_list:
expect_objects = ['MyResourceType1', 'MyResourceType2']
mocked_list.return_value = expect_objects
test_shell.do_md_resource_type_list(self.gc, args)
mocked_list.assert_called_once()
def test_do_md_namespace_resource_type_list(self):
args = self._make_args({'namespace': 'MyNamespace'})
with mock.patch.object(self.gc.metadefs_resource_type,
'get') as mocked_get:
expect_objects = [{'namespace': 'MyNamespace',
'object': 'MyObject'}]
mocked_get.return_value = expect_objects
test_shell.do_md_namespace_resource_type_list(self.gc, args)
mocked_get.assert_called_once_with('MyNamespace')
utils.print_list.assert_called_once_with(expect_objects,
['name', 'prefix',
'properties_target'])
def test_do_md_property_create(self):
args = self._make_args({'namespace': 'MyNamespace',
'name': "MyProperty",
'title': "Title",
'schema': '{}'})
with mock.patch.object(self.gc.metadefs_property,
'create') as mocked_create:
expect_property = {}
expect_property['namespace'] = 'MyNamespace'
expect_property['name'] = 'MyProperty'
expect_property['title'] = 'Title'
mocked_create.return_value = expect_property
test_shell.do_md_property_create(self.gc, args)
mocked_create.assert_called_once_with('MyNamespace',
name='MyProperty',
title='Title')
utils.print_dict.assert_called_once_with(expect_property)
def test_do_md_property_create_invalid_schema(self):
args = self._make_args({'namespace': 'MyNamespace',
'name': "MyProperty",
'title': "Title",
'schema': 'Invalid'})
self.assertRaises(SystemExit, test_shell.do_md_property_create,
self.gc, args)
def test_do_md_property_update(self):
args = self._make_args({'namespace': 'MyNamespace',
'property': 'MyProperty',
'name': 'NewName',
'title': "Title",
'schema': '{}'})
with mock.patch.object(self.gc.metadefs_property,
'update') as mocked_update:
expect_property = {}
expect_property['namespace'] = 'MyNamespace'
expect_property['name'] = 'MyProperty'
expect_property['title'] = 'Title'
mocked_update.return_value = expect_property
test_shell.do_md_property_update(self.gc, args)
mocked_update.assert_called_once_with('MyNamespace', 'MyProperty',
name='NewName',
title='Title')
utils.print_dict.assert_called_once_with(expect_property)
def test_do_md_property_update_invalid_schema(self):
args = self._make_args({'namespace': 'MyNamespace',
'property': 'MyProperty',
'name': "MyObject",
'title': "Title",
'schema': 'Invalid'})
self.assertRaises(SystemExit, test_shell.do_md_property_update,
self.gc, args)
def test_do_md_property_show(self):
args = self._make_args({'namespace': 'MyNamespace',
'property': 'MyProperty',
'max_column_width': 80})
with mock.patch.object(self.gc.metadefs_property, 'get') as mocked_get:
expect_property = {}
expect_property['namespace'] = 'MyNamespace'
expect_property['property'] = 'MyProperty'
expect_property['title'] = 'Title'
mocked_get.return_value = expect_property
test_shell.do_md_property_show(self.gc, args)
mocked_get.assert_called_once_with('MyNamespace', 'MyProperty')
utils.print_dict.assert_called_once_with(expect_property, 80)
def test_do_md_property_delete(self):
args = self._make_args({'namespace': 'MyNamespace',
'property': 'MyProperty'})
with mock.patch.object(self.gc.metadefs_property,
'delete') as mocked_delete:
test_shell.do_md_property_delete(self.gc, args)
mocked_delete.assert_called_once_with('MyNamespace', 'MyProperty')
def test_do_md_namespace_property_delete(self):
args = self._make_args({'namespace': 'MyNamespace'})
with mock.patch.object(self.gc.metadefs_property,
'delete_all') as mocked_delete_all:
test_shell.do_md_namespace_properties_delete(self.gc, args)
mocked_delete_all.assert_called_once_with('MyNamespace')
def test_do_md_property_list(self):
args = self._make_args({'namespace': 'MyNamespace'})
with mock.patch.object(self.gc.metadefs_property,
'list') as mocked_list:
expect_objects = [{'namespace': 'MyNamespace',
'property': 'MyProperty',
'title': 'MyTitle'}]
mocked_list.return_value = expect_objects
test_shell.do_md_property_list(self.gc, args)
mocked_list.assert_called_once_with('MyNamespace')
utils.print_list.assert_called_once_with(expect_objects,
['name', 'title', 'type'])
def test_do_md_object_create(self):
args = self._make_args({'namespace': 'MyNamespace',
'name': "MyObject",
'schema': '{}'})
with mock.patch.object(self.gc.metadefs_object,
'create') as mocked_create:
expect_object = {}
expect_object['namespace'] = 'MyNamespace'
expect_object['name'] = 'MyObject'
mocked_create.return_value = expect_object
test_shell.do_md_object_create(self.gc, args)
mocked_create.assert_called_once_with('MyNamespace',
name='MyObject')
utils.print_dict.assert_called_once_with(expect_object)
def test_do_md_object_create_invalid_schema(self):
args = self._make_args({'namespace': 'MyNamespace',
'name': "MyObject",
'schema': 'Invalid'})
self.assertRaises(SystemExit, test_shell.do_md_object_create,
self.gc, args)
def test_do_md_object_update(self):
args = self._make_args({'namespace': 'MyNamespace',
'object': 'MyObject',
'name': 'NewName',
'schema': '{}'})
with mock.patch.object(self.gc.metadefs_object,
'update') as mocked_update:
expect_object = {}
expect_object['namespace'] = 'MyNamespace'
expect_object['name'] = 'MyObject'
mocked_update.return_value = expect_object
test_shell.do_md_object_update(self.gc, args)
mocked_update.assert_called_once_with('MyNamespace', 'MyObject',
name='NewName')
utils.print_dict.assert_called_once_with(expect_object)
def test_do_md_object_update_invalid_schema(self):
args = self._make_args({'namespace': 'MyNamespace',
'object': 'MyObject',
'name': "MyObject",
'schema': 'Invalid'})
self.assertRaises(SystemExit, test_shell.do_md_object_update,
self.gc, args)
def test_do_md_object_show(self):
args = self._make_args({'namespace': 'MyNamespace',
'object': 'MyObject',
'max_column_width': 80})
with mock.patch.object(self.gc.metadefs_object, 'get') as mocked_get:
expect_object = {}
expect_object['namespace'] = 'MyNamespace'
expect_object['object'] = 'MyObject'
mocked_get.return_value = expect_object
test_shell.do_md_object_show(self.gc, args)
mocked_get.assert_called_once_with('MyNamespace', 'MyObject')
utils.print_dict.assert_called_once_with(expect_object, 80)
def test_do_md_object_property_show(self):
args = self._make_args({'namespace': 'MyNamespace',
'object': 'MyObject',
'property': 'MyProperty',
'max_column_width': 80})
with mock.patch.object(self.gc.metadefs_object, 'get') as mocked_get:
expect_object = {'name': 'MyObject',
'properties': {
'MyProperty': {'type': 'string'}
}}
mocked_get.return_value = expect_object
test_shell.do_md_object_property_show(self.gc, args)
mocked_get.assert_called_once_with('MyNamespace', 'MyObject')
utils.print_dict.assert_called_once_with({'type': 'string',
'name': 'MyProperty'},
80)
def test_do_md_object_property_show_non_existing(self):
args = self._make_args({'namespace': 'MyNamespace',
'object': 'MyObject',
'property': 'MyProperty',
'max_column_width': 80})
with mock.patch.object(self.gc.metadefs_object, 'get') as mocked_get:
expect_object = {'name': 'MyObject', 'properties': {}}
mocked_get.return_value = expect_object
self.assertRaises(SystemExit,
test_shell.do_md_object_property_show,
self.gc, args)
mocked_get.assert_called_once_with('MyNamespace', 'MyObject')
def test_do_md_object_delete(self):
args = self._make_args({'namespace': 'MyNamespace',
'object': 'MyObject'})
with mock.patch.object(self.gc.metadefs_object,
'delete') as mocked_delete:
test_shell.do_md_object_delete(self.gc, args)
mocked_delete.assert_called_once_with('MyNamespace', 'MyObject')
def test_do_md_namespace_objects_delete(self):
args = self._make_args({'namespace': 'MyNamespace'})
with mock.patch.object(self.gc.metadefs_object,
'delete_all') as mocked_delete_all:
test_shell.do_md_namespace_objects_delete(self.gc, args)
mocked_delete_all.assert_called_once_with('MyNamespace')
def test_do_md_object_list(self):
args = self._make_args({'namespace': 'MyNamespace'})
with mock.patch.object(self.gc.metadefs_object, 'list') as mocked_list:
expect_objects = [{'namespace': 'MyNamespace',
'object': 'MyObject'}]
mocked_list.return_value = expect_objects
test_shell.do_md_object_list(self.gc, args)
mocked_list.assert_called_once_with('MyNamespace')
utils.print_list.assert_called_once_with(
expect_objects,
['name', 'description'],
field_settings={
'description': {'align': 'l', 'max_width': 50}})