Use osc_lib.test module
Actually consume it in our own tests. Legacy aliases are provided for external users. Change-Id: I5b892e5d65a75783e57c4c3212942e2bec93b62b Signed-off-by: Stephen Finucane <stephenfin@redhat.com>
This commit is contained in:
@@ -9,14 +9,13 @@
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
|
||||
"""API Test Fakes"""
|
||||
|
||||
from keystoneauth1 import session
|
||||
from requests_mock.contrib import fixture
|
||||
|
||||
from osc_lib.tests import utils
|
||||
from osc_lib.test import base
|
||||
|
||||
|
||||
RESP_ITEM_1 = {
|
||||
@@ -46,7 +45,7 @@ LIST_BODY = {
|
||||
}
|
||||
|
||||
|
||||
class TestSession(utils.TestCase):
|
||||
class TestSession(base.TestCase):
|
||||
BASE_URL = 'https://api.example.com:1234/test'
|
||||
|
||||
def setUp(self):
|
||||
|
||||
@@ -9,13 +9,12 @@
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
|
||||
from osc_lib.cli import client_config
|
||||
from osc_lib.tests import utils
|
||||
from osc_lib.test import base
|
||||
|
||||
|
||||
class TestOSCConfig(utils.TestCase):
|
||||
class TestOSCConfig(base.TestCase):
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
|
||||
|
||||
@@ -11,15 +11,14 @@
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
|
||||
import collections
|
||||
|
||||
from osc_lib.cli import format_columns
|
||||
from osc_lib.tests import utils
|
||||
from osc_lib.test import base
|
||||
|
||||
|
||||
class TestDictColumn(utils.TestCase):
|
||||
class TestDictColumn(base.TestCase):
|
||||
def test_dict_column(self):
|
||||
data = {
|
||||
'key1': 'value1',
|
||||
@@ -40,7 +39,7 @@ class TestDictColumn(utils.TestCase):
|
||||
self.assertEqual(type(col.machine_readable()), dict)
|
||||
|
||||
|
||||
class TestDictListColumn(utils.TestCase):
|
||||
class TestDictListColumn(base.TestCase):
|
||||
def test_dict_list_column(self):
|
||||
data = {
|
||||
'public': ['2001:db8::8', '172.24.4.6'],
|
||||
@@ -64,7 +63,7 @@ class TestDictListColumn(utils.TestCase):
|
||||
self.assertEqual(type(col.machine_readable()), dict)
|
||||
|
||||
|
||||
class TestListColumn(utils.TestCase):
|
||||
class TestListColumn(base.TestCase):
|
||||
def test_list_column(self):
|
||||
data = [
|
||||
'key1',
|
||||
@@ -83,7 +82,7 @@ class TestListColumn(utils.TestCase):
|
||||
self.assertEqual(type(col.machine_readable()), list)
|
||||
|
||||
|
||||
class TestListDictColumn(utils.TestCase):
|
||||
class TestListDictColumn(base.TestCase):
|
||||
def test_list_dict_column(self):
|
||||
data = [
|
||||
{'key1': 'value1'},
|
||||
@@ -107,7 +106,7 @@ class TestListDictColumn(utils.TestCase):
|
||||
self.assertEqual(type(x), dict) # noqa: H211
|
||||
|
||||
|
||||
class TestSizeColumn(utils.TestCase):
|
||||
class TestSizeColumn(base.TestCase):
|
||||
def test_size_column(self):
|
||||
content = 1576395005
|
||||
col = format_columns.SizeColumn(content)
|
||||
|
||||
@@ -18,10 +18,10 @@ from openstack.identity.v3 import project
|
||||
import testtools
|
||||
|
||||
from osc_lib.cli import identity as cli_identity
|
||||
from osc_lib.tests import utils as test_utils
|
||||
from osc_lib.test import base
|
||||
|
||||
|
||||
class IdentityUtilsTestCase(test_utils.TestCase):
|
||||
class IdentityUtilsTestCase(base.TestCase):
|
||||
def test_add_project_owner_option_to_parser(self):
|
||||
parser = argparse.ArgumentParser()
|
||||
cli_identity.add_project_owner_option_to_parser(parser)
|
||||
|
||||
@@ -11,15 +11,14 @@
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
|
||||
import argparse
|
||||
|
||||
from osc_lib.cli import parseractions
|
||||
from osc_lib.tests import utils
|
||||
from osc_lib.test import base
|
||||
|
||||
|
||||
class TestKeyValueAction(utils.TestCase):
|
||||
class TestKeyValueAction(base.TestCase):
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
|
||||
@@ -71,7 +70,7 @@ class TestKeyValueAction(utils.TestCase):
|
||||
self.assertRaises(SystemExit, self.parser.parse_args, data)
|
||||
|
||||
|
||||
class TestKeyValueAppendAction(utils.TestCase):
|
||||
class TestKeyValueAppendAction(base.TestCase):
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
|
||||
@@ -130,7 +129,7 @@ class TestKeyValueAppendAction(utils.TestCase):
|
||||
self.assertRaises(SystemExit, self.parser.parse_args, data)
|
||||
|
||||
|
||||
class TestMultiKeyValueAction(utils.TestCase):
|
||||
class TestMultiKeyValueAction(base.TestCase):
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
|
||||
@@ -270,7 +269,7 @@ class TestMultiKeyValueAction(utils.TestCase):
|
||||
)
|
||||
|
||||
|
||||
class TestMultiKeyValueCommaAction(utils.TestCase):
|
||||
class TestMultiKeyValueCommaAction(base.TestCase):
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.parser = argparse.ArgumentParser()
|
||||
@@ -478,7 +477,7 @@ class TestMultiKeyValueCommaAction(utils.TestCase):
|
||||
)
|
||||
|
||||
|
||||
class TestNonNegativeAction(utils.TestCase):
|
||||
class TestNonNegativeAction(base.TestCase):
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
|
||||
|
||||
@@ -16,8 +16,8 @@ from unittest import mock
|
||||
|
||||
from osc_lib.command import command
|
||||
from osc_lib import exceptions
|
||||
from osc_lib.tests import fakes as test_fakes
|
||||
from osc_lib.tests import utils as test_utils
|
||||
from osc_lib.test import base
|
||||
from osc_lib.test import fakes
|
||||
|
||||
|
||||
class FakeCommand(command.Command):
|
||||
@@ -25,7 +25,7 @@ class FakeCommand(command.Command):
|
||||
pass
|
||||
|
||||
|
||||
class TestCommand(test_utils.TestCase):
|
||||
class TestCommand(base.TestCase):
|
||||
def test_command_has_logger(self):
|
||||
cmd = FakeCommand(mock.Mock(), mock.Mock())
|
||||
self.assertTrue(hasattr(cmd, 'log'))
|
||||
@@ -37,7 +37,7 @@ class TestCommand(test_utils.TestCase):
|
||||
def test_validate_os_beta_command_enabled(self):
|
||||
cmd = FakeCommand(mock.Mock(), mock.Mock())
|
||||
cmd.app = mock.Mock()
|
||||
cmd.app.options = test_fakes.FakeOptions()
|
||||
cmd.app.options = fakes.FakeOptions()
|
||||
|
||||
# No exception is raised when enabled.
|
||||
cmd.app.options.os_beta_command = True
|
||||
|
||||
@@ -9,7 +9,6 @@
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
|
||||
"""Test Timing pseudo-command"""
|
||||
|
||||
@@ -18,9 +17,10 @@ import datetime
|
||||
from keystoneauth1 import session
|
||||
|
||||
from osc_lib.command import timing
|
||||
from osc_lib.tests import fakes
|
||||
from osc_lib.tests import utils
|
||||
from osc_lib.test import base
|
||||
|
||||
AUTH_TOKEN = "foobar"
|
||||
AUTH_URL = "http://0.0.0.0"
|
||||
|
||||
timing_method = 'GET'
|
||||
timing_url = 'http://localhost:5000'
|
||||
@@ -33,7 +33,7 @@ class FakeGenericClient:
|
||||
self.management_url = kwargs['endpoint']
|
||||
|
||||
|
||||
class TestTiming(utils.TestCommand):
|
||||
class TestTiming(base.TestCommand):
|
||||
columns = (
|
||||
'URL',
|
||||
'Seconds',
|
||||
@@ -45,13 +45,12 @@ class TestTiming(utils.TestCommand):
|
||||
self.app.timing_data = []
|
||||
|
||||
self.app.client_manager.compute = FakeGenericClient(
|
||||
endpoint=fakes.AUTH_URL,
|
||||
token=fakes.AUTH_TOKEN,
|
||||
endpoint=AUTH_URL, token=AUTH_TOKEN
|
||||
)
|
||||
|
||||
self.app.client_manager.volume = FakeGenericClient(
|
||||
endpoint=fakes.AUTH_URL,
|
||||
token=fakes.AUTH_TOKEN,
|
||||
endpoint=AUTH_URL,
|
||||
token=AUTH_TOKEN,
|
||||
)
|
||||
|
||||
# Get the command object to test
|
||||
|
||||
@@ -13,43 +13,63 @@
|
||||
# under the License.
|
||||
#
|
||||
|
||||
import sys
|
||||
from unittest import mock
|
||||
from osc_lib.test.base import TestCase, TestCommand
|
||||
from osc_lib.test.fakes import (
|
||||
AUTH_URL,
|
||||
INTERFACE,
|
||||
IDENTITY_API_VERSION as VERSION,
|
||||
PASSWORD,
|
||||
PROJECT_ID,
|
||||
PROJECT_NAME,
|
||||
REGION_NAME,
|
||||
TOKEN as AUTH_TOKEN,
|
||||
USERNAME,
|
||||
USER_ID,
|
||||
FakeApp,
|
||||
FakeClientManager,
|
||||
FakeLog,
|
||||
FakeModule,
|
||||
FakeOptions,
|
||||
FakeResource,
|
||||
FakeStdout,
|
||||
)
|
||||
from osc_lib.tests.test_clientmanager import (
|
||||
SERVICE_PROVIDER_ID,
|
||||
TEST_RESPONSE_DICT,
|
||||
TEST_RESPONSE_DICT_V3,
|
||||
TEST_VERSIONS,
|
||||
)
|
||||
|
||||
from keystoneauth1 import fixture
|
||||
|
||||
|
||||
AUTH_TOKEN = "foobar"
|
||||
AUTH_URL = "http://0.0.0.0"
|
||||
USERNAME = "itchy"
|
||||
USER_ID = "2354b7c1-f681-4c39-8003-4fe9d1eabb65"
|
||||
PASSWORD = "scratchy"
|
||||
PROJECT_NAME = "poochie"
|
||||
PROJECT_ID = "30c3da29-61f5-4b7b-8eb2-3d18287428c7"
|
||||
REGION_NAME = "richie"
|
||||
INTERFACE = "catchy"
|
||||
VERSION = "3"
|
||||
|
||||
SERVICE_PROVIDER_ID = "bob"
|
||||
|
||||
TEST_RESPONSE_DICT = fixture.V2Token(token_id=AUTH_TOKEN, user_name=USERNAME)
|
||||
_s = TEST_RESPONSE_DICT.add_service('identity', name='keystone')
|
||||
_s.add_endpoint(AUTH_URL + ':5000/v2.0')
|
||||
_s = TEST_RESPONSE_DICT.add_service('network', name='neutron')
|
||||
_s.add_endpoint(AUTH_URL + ':9696')
|
||||
_s = TEST_RESPONSE_DICT.add_service('compute', name='nova')
|
||||
_s.add_endpoint(AUTH_URL + ':8774/v2')
|
||||
_s = TEST_RESPONSE_DICT.add_service('image', name='glance')
|
||||
_s.add_endpoint(AUTH_URL + ':9292')
|
||||
_s = TEST_RESPONSE_DICT.add_service('object', name='swift')
|
||||
_s.add_endpoint(AUTH_URL + ':8080/v1')
|
||||
|
||||
TEST_RESPONSE_DICT_V3 = fixture.V3Token(user_name=USERNAME)
|
||||
TEST_RESPONSE_DICT_V3.set_project_scope()
|
||||
|
||||
TEST_VERSIONS = fixture.DiscoveryList(href=AUTH_URL)
|
||||
__all__ = [
|
||||
'AUTH_TOKEN',
|
||||
'AUTH_URL',
|
||||
'FakeApp',
|
||||
'FakeClientManager',
|
||||
'FakeLog',
|
||||
'FakeModule',
|
||||
'FakeOptions',
|
||||
'FakeResource',
|
||||
'FakeStdout',
|
||||
'INTERFACE',
|
||||
'PASSWORD',
|
||||
'PROJECT_ID',
|
||||
'PROJECT_NAME',
|
||||
'REGION_NAME',
|
||||
'SERVICE_PROVIDER_ID',
|
||||
'TEST_RESPONSE_DICT',
|
||||
'TEST_RESPONSE_DICT_V3',
|
||||
'TEST_VERSIONS',
|
||||
'TestCase',
|
||||
'TestCommand',
|
||||
'to_unicode_dict',
|
||||
'USER_ID',
|
||||
'USERNAME',
|
||||
'VERSION',
|
||||
]
|
||||
|
||||
|
||||
# NOTE(stephenfin): This isn't moved since it's unused now. We can delete it in
|
||||
# the future.
|
||||
def to_unicode_dict(catalog_dict):
|
||||
"""Converts dict to unicode dict"""
|
||||
if isinstance(catalog_dict, dict):
|
||||
@@ -63,136 +83,3 @@ def to_unicode_dict(catalog_dict):
|
||||
return catalog_dict + ""
|
||||
else:
|
||||
return catalog_dict
|
||||
|
||||
|
||||
class FakeStdout:
|
||||
def __init__(self):
|
||||
self.content = []
|
||||
|
||||
def write(self, text):
|
||||
self.content.append(text)
|
||||
|
||||
def make_string(self):
|
||||
result = ''
|
||||
for line in self.content:
|
||||
result = result + line
|
||||
return result
|
||||
|
||||
|
||||
class FakeLog:
|
||||
def __init__(self):
|
||||
self.messages = {}
|
||||
|
||||
def debug(self, msg):
|
||||
self.messages['debug'] = msg
|
||||
|
||||
def info(self, msg):
|
||||
self.messages['info'] = msg
|
||||
|
||||
def warning(self, msg):
|
||||
self.messages['warning'] = msg
|
||||
|
||||
def error(self, msg):
|
||||
self.messages['error'] = msg
|
||||
|
||||
def critical(self, msg):
|
||||
self.messages['critical'] = msg
|
||||
|
||||
|
||||
class FakeApp:
|
||||
def __init__(self, _stdout, _log):
|
||||
self.stdout = _stdout
|
||||
self.client_manager = None
|
||||
self.api_version = {}
|
||||
self.stdin = sys.stdin
|
||||
self.stdout = _stdout or sys.stdout
|
||||
self.stderr = sys.stderr
|
||||
self.log = _log
|
||||
|
||||
|
||||
class FakeOptions:
|
||||
def __init__(self, **kwargs):
|
||||
self.os_beta_command = False
|
||||
|
||||
|
||||
class FakeClientManager:
|
||||
def __init__(self):
|
||||
self.compute = None
|
||||
self.identity = None
|
||||
self.image = None
|
||||
self.object_store = None
|
||||
self.volume = None
|
||||
self.network = None
|
||||
self.session = None
|
||||
self.auth_ref = None
|
||||
self.auth_plugin_name = None
|
||||
|
||||
def get_configuration(self):
|
||||
return {
|
||||
'auth': {
|
||||
'username': USERNAME,
|
||||
'password': PASSWORD,
|
||||
'token': AUTH_TOKEN,
|
||||
},
|
||||
'region': REGION_NAME,
|
||||
'identity_api_version': VERSION,
|
||||
}
|
||||
|
||||
|
||||
class FakeModule:
|
||||
def __init__(self, name, version):
|
||||
self.name = name
|
||||
self.__version__ = version
|
||||
# Workaround for openstacksdk case
|
||||
self.version = mock.Mock()
|
||||
self.version.__version__ = version
|
||||
|
||||
|
||||
class FakeResource:
|
||||
def __init__(self, manager=None, info=None, loaded=False, methods=None):
|
||||
"""Set attributes and methods for a resource.
|
||||
|
||||
:param manager:
|
||||
The resource manager
|
||||
:param Dictionary info:
|
||||
A dictionary with all attributes
|
||||
:param bool loaded:
|
||||
True if the resource is loaded in memory
|
||||
:param Dictionary methods:
|
||||
A dictionary with all methods
|
||||
"""
|
||||
info = info or {}
|
||||
methods = methods or {}
|
||||
|
||||
self.__name__ = type(self).__name__
|
||||
self.manager = manager
|
||||
self._info = info
|
||||
self._add_details(info)
|
||||
self._add_methods(methods)
|
||||
self._loaded = loaded
|
||||
|
||||
def _add_details(self, info):
|
||||
for k, v in info.items():
|
||||
setattr(self, k, v)
|
||||
|
||||
def _add_methods(self, methods):
|
||||
"""Fake methods with MagicMock objects.
|
||||
|
||||
For each <@key, @value> pairs in methods, add an callable MagicMock
|
||||
object named @key as an attribute, and set the mock's return_value to
|
||||
@value. When users access the attribute with (), @value will be
|
||||
returned, which looks like a function call.
|
||||
"""
|
||||
for name, ret in methods.items():
|
||||
method = mock.MagicMock(return_value=ret)
|
||||
setattr(self, name, method)
|
||||
|
||||
def __repr__(self):
|
||||
reprkeys = sorted(
|
||||
k for k in self.__dict__.keys() if k[0] != '_' and k != 'manager'
|
||||
)
|
||||
info = ", ".join(f"{k}={getattr(self, k)}" for k in reprkeys)
|
||||
return f"<{self.__class__.__name__} {info}>"
|
||||
|
||||
def keys(self):
|
||||
return self._info.keys()
|
||||
|
||||
@@ -11,30 +11,54 @@
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
|
||||
import copy
|
||||
import json as jsonutils
|
||||
from unittest import mock
|
||||
|
||||
from keystoneauth1.access import service_catalog
|
||||
from keystoneauth1 import exceptions as ksa_exceptions
|
||||
from keystoneauth1 import fixture as ksa_fixture
|
||||
from keystoneauth1.identity import generic as generic_plugin
|
||||
from keystoneauth1.identity.v3 import k2k
|
||||
from keystoneauth1 import loading
|
||||
from keystoneauth1 import noauth
|
||||
from keystoneauth1 import token_endpoint
|
||||
from openstack.config import cloud_config
|
||||
from openstack.config import cloud_region
|
||||
from openstack.config import defaults
|
||||
from openstack import connection
|
||||
from requests_mock.contrib import fixture as requests_mock_fixture
|
||||
|
||||
from osc_lib.api import auth
|
||||
from osc_lib import clientmanager
|
||||
from osc_lib import exceptions as exc
|
||||
from osc_lib.tests import fakes
|
||||
from osc_lib.tests import utils
|
||||
from osc_lib.test import base
|
||||
from osc_lib.test import fakes
|
||||
|
||||
SERVICE_PROVIDER_ID = "bob"
|
||||
|
||||
TEST_RESPONSE_DICT = ksa_fixture.V2Token(
|
||||
token_id=fakes.TOKEN, user_name=fakes.USERNAME
|
||||
)
|
||||
_s = TEST_RESPONSE_DICT.add_service('identity', name='keystone')
|
||||
_s.add_endpoint(fakes.AUTH_URL + ':5000/v2.0')
|
||||
_s = TEST_RESPONSE_DICT.add_service('network', name='neutron')
|
||||
_s.add_endpoint(fakes.AUTH_URL + ':9696')
|
||||
_s = TEST_RESPONSE_DICT.add_service('compute', name='nova')
|
||||
_s.add_endpoint(fakes.AUTH_URL + ':8774/v2')
|
||||
_s = TEST_RESPONSE_DICT.add_service('image', name='glance')
|
||||
_s.add_endpoint(fakes.AUTH_URL + ':9292')
|
||||
_s = TEST_RESPONSE_DICT.add_service('object', name='swift')
|
||||
_s.add_endpoint(fakes.AUTH_URL + ':8080/v1')
|
||||
|
||||
TEST_RESPONSE_DICT_V3 = ksa_fixture.V3Token(user_name=fakes.USERNAME)
|
||||
TEST_RESPONSE_DICT_V3.set_project_scope()
|
||||
|
||||
TEST_VERSIONS = ksa_fixture.DiscoveryList(href=fakes.AUTH_URL)
|
||||
|
||||
AUTH_REF = {'version': 'v2.0'}
|
||||
AUTH_REF.update(fakes.TEST_RESPONSE_DICT['access'])
|
||||
AUTH_REF.update(TEST_RESPONSE_DICT['access'])
|
||||
SERVICE_CATALOG = service_catalog.ServiceCatalogV2(AUTH_REF)
|
||||
|
||||
AUTH_DICT = {
|
||||
@@ -57,7 +81,7 @@ class Container:
|
||||
pass
|
||||
|
||||
|
||||
class TestClientCache(utils.TestCase):
|
||||
class TestClientCache(base.TestCase):
|
||||
def test_singleton(self):
|
||||
# NOTE(dtroyer): Verify that the ClientCache descriptor only invokes
|
||||
# the factory one time and always returns the same value after that.
|
||||
@@ -73,7 +97,132 @@ class TestClientCache(utils.TestCase):
|
||||
self.assertEqual("'Container' object has no attribute 'foo'", str(err))
|
||||
|
||||
|
||||
class TestClientManager(utils.TestClientManager):
|
||||
class BaseTestClientManager(base.TestCase):
|
||||
"""ClientManager class test framework"""
|
||||
|
||||
default_password_auth = {
|
||||
'auth_url': fakes.AUTH_URL,
|
||||
'username': fakes.USERNAME,
|
||||
'password': fakes.PASSWORD,
|
||||
'project_name': fakes.PROJECT_NAME,
|
||||
}
|
||||
default_token_auth = {
|
||||
'auth_url': fakes.AUTH_URL,
|
||||
'token': fakes.TOKEN,
|
||||
}
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.mock = mock.Mock()
|
||||
self.requests = self.useFixture(requests_mock_fixture.Fixture())
|
||||
# fake v2password token retrieval
|
||||
self.stub_auth(json=TEST_RESPONSE_DICT)
|
||||
# fake token and token_endpoint retrieval
|
||||
self.stub_auth(
|
||||
json=TEST_RESPONSE_DICT,
|
||||
url='/'.join([fakes.AUTH_URL, 'v2.0/tokens']),
|
||||
)
|
||||
# fake v3password token retrieval
|
||||
self.stub_auth(
|
||||
json=TEST_RESPONSE_DICT_V3,
|
||||
url='/'.join([fakes.AUTH_URL, 'v3/auth/tokens']),
|
||||
)
|
||||
# fake password token retrieval
|
||||
self.stub_auth(
|
||||
json=TEST_RESPONSE_DICT_V3,
|
||||
url='/'.join([fakes.AUTH_URL, 'auth/tokens']),
|
||||
)
|
||||
# fake password version endpoint discovery
|
||||
self.stub_auth(json=TEST_VERSIONS, url=fakes.AUTH_URL, verb='GET')
|
||||
|
||||
# Mock the auth plugin
|
||||
self.auth_mock = mock.Mock()
|
||||
|
||||
def stub_auth(self, json=None, url=None, verb=None, **kwargs):
|
||||
subject_token = fakes.TOKEN
|
||||
base_url = fakes.AUTH_URL
|
||||
if json:
|
||||
text = jsonutils.dumps(json)
|
||||
headers = {
|
||||
'X-Subject-Token': subject_token,
|
||||
'Content-Type': 'application/json',
|
||||
}
|
||||
if not url:
|
||||
url = '/'.join([base_url, 'tokens'])
|
||||
url = url.replace("/?", "?")
|
||||
if not verb:
|
||||
verb = 'POST'
|
||||
self.requests.register_uri(
|
||||
verb,
|
||||
url,
|
||||
headers=headers,
|
||||
text=text,
|
||||
)
|
||||
|
||||
def _clientmanager_class(self):
|
||||
"""Allow subclasses to override the ClientManager class"""
|
||||
return clientmanager.ClientManager
|
||||
|
||||
def _make_clientmanager(
|
||||
self,
|
||||
auth_args=None,
|
||||
config_args=None,
|
||||
identity_api_version=None,
|
||||
auth_plugin_name=None,
|
||||
auth_required=None,
|
||||
):
|
||||
if identity_api_version is None:
|
||||
identity_api_version = '2.0'
|
||||
if auth_plugin_name is None:
|
||||
auth_plugin_name = 'password'
|
||||
|
||||
if auth_plugin_name.endswith('password'):
|
||||
auth_dict = copy.deepcopy(self.default_password_auth)
|
||||
elif auth_plugin_name.endswith('token'):
|
||||
auth_dict = copy.deepcopy(self.default_token_auth)
|
||||
else:
|
||||
auth_dict = {}
|
||||
|
||||
if auth_args is not None:
|
||||
auth_dict = auth_args
|
||||
|
||||
cli_options = defaults.get_defaults()
|
||||
cli_options.update(
|
||||
{
|
||||
'auth_type': auth_plugin_name,
|
||||
'auth': auth_dict,
|
||||
'interface': fakes.INTERFACE,
|
||||
'region_name': fakes.REGION_NAME,
|
||||
}
|
||||
)
|
||||
if config_args is not None:
|
||||
cli_options.update(config_args)
|
||||
|
||||
loader = loading.get_plugin_loader(auth_plugin_name)
|
||||
auth_plugin = loader.load_from_options(**auth_dict)
|
||||
client_manager = self._clientmanager_class()(
|
||||
cli_options=cloud_region.CloudRegion(
|
||||
name='t1',
|
||||
region_name='1',
|
||||
config=cli_options,
|
||||
auth_plugin=auth_plugin,
|
||||
),
|
||||
api_version={
|
||||
'identity': identity_api_version,
|
||||
},
|
||||
)
|
||||
client_manager._auth_required = auth_required is True
|
||||
client_manager.setup_auth()
|
||||
client_manager.auth_ref
|
||||
|
||||
self.assertEqual(
|
||||
auth_plugin_name,
|
||||
client_manager.auth_plugin_name,
|
||||
)
|
||||
return client_manager
|
||||
|
||||
|
||||
class TestClientManager(BaseTestClientManager):
|
||||
def test_client_manager_none(self):
|
||||
none_auth = {
|
||||
'endpoint': fakes.AUTH_URL,
|
||||
@@ -100,7 +249,7 @@ class TestClientManager(utils.TestClientManager):
|
||||
def test_client_manager_admin_token(self):
|
||||
token_auth = {
|
||||
'endpoint': fakes.AUTH_URL,
|
||||
'token': fakes.AUTH_TOKEN,
|
||||
'token': fakes.TOKEN,
|
||||
}
|
||||
client_manager = self._make_clientmanager(
|
||||
auth_args=token_auth,
|
||||
@@ -112,7 +261,7 @@ class TestClientManager(utils.TestClientManager):
|
||||
client_manager._cli_options.config['auth']['endpoint'],
|
||||
)
|
||||
self.assertEqual(
|
||||
fakes.AUTH_TOKEN,
|
||||
fakes.TOKEN,
|
||||
client_manager.auth.get_token(None),
|
||||
)
|
||||
self.assertIsInstance(
|
||||
@@ -156,7 +305,7 @@ class TestClientManager(utils.TestClientManager):
|
||||
client_manager.auth_ref.version,
|
||||
)
|
||||
self.assertEqual(
|
||||
fakes.to_unicode_dict(AUTH_REF),
|
||||
AUTH_REF,
|
||||
client_manager.auth_ref._data['access'],
|
||||
)
|
||||
self.assertEqual(
|
||||
@@ -306,11 +455,7 @@ class TestClientManager(utils.TestClientManager):
|
||||
|
||||
auth_args = copy.deepcopy(self.default_password_auth)
|
||||
auth_args.pop('username')
|
||||
auth_args.update(
|
||||
{
|
||||
'user_id': fakes.USER_ID,
|
||||
}
|
||||
)
|
||||
auth_args.update({'user_id': fakes.USER_ID})
|
||||
self._make_clientmanager(
|
||||
auth_args=auth_args,
|
||||
identity_api_version='3',
|
||||
@@ -404,7 +549,7 @@ class TestClientManager(utils.TestClientManager):
|
||||
'auth': AUTH_DICT,
|
||||
'interface': fakes.INTERFACE,
|
||||
'region_name': fakes.REGION_NAME,
|
||||
'service_provider': fakes.SERVICE_PROVIDER_ID,
|
||||
'service_provider': SERVICE_PROVIDER_ID,
|
||||
'remote_project_id': fakes.PROJECT_ID,
|
||||
}
|
||||
)
|
||||
@@ -425,7 +570,7 @@ class TestClientManager(utils.TestClientManager):
|
||||
# Note(knikolla): Make sure that the auth object is of the correct
|
||||
# type and that the service_provider is correctly set.
|
||||
self.assertIsInstance(client_manager.auth, k2k.Keystone2Keystone)
|
||||
self.assertEqual(client_manager.auth._sp_id, fakes.SERVICE_PROVIDER_ID)
|
||||
self.assertEqual(client_manager.auth._sp_id, SERVICE_PROVIDER_ID)
|
||||
self.assertEqual(client_manager.auth.project_id, fakes.PROJECT_ID)
|
||||
self.assertTrue(client_manager._auth_setup_completed)
|
||||
|
||||
@@ -459,8 +604,6 @@ class TestClientManager(utils.TestClientManager):
|
||||
)
|
||||
self.assertTrue(client_manager.is_service_available('compute'))
|
||||
|
||||
|
||||
class TestClientManagerSDK(utils.TestClientManager):
|
||||
def test_client_manager_connection(self):
|
||||
client_manager = self._make_clientmanager(
|
||||
auth_required=True,
|
||||
|
||||
@@ -15,10 +15,10 @@ import logging
|
||||
from unittest import mock
|
||||
|
||||
from osc_lib import logs
|
||||
from osc_lib.tests import utils
|
||||
from osc_lib.test import base
|
||||
|
||||
|
||||
class TestContext(utils.TestCase):
|
||||
class TestContext(base.TestCase):
|
||||
def test_log_level_from_options(self):
|
||||
opts = mock.Mock()
|
||||
opts.verbose_level = 0
|
||||
@@ -64,7 +64,7 @@ class TestContext(utils.TestCase):
|
||||
simplefilter.assert_called_with("once")
|
||||
|
||||
|
||||
class TestFileFormatter(utils.TestCase):
|
||||
class TestFileFormatter(base.TestCase):
|
||||
def test_nothing(self):
|
||||
formatter = logs._FileFormatter()
|
||||
self.assertEqual(
|
||||
@@ -105,7 +105,7 @@ class TestFileFormatter(utils.TestCase):
|
||||
)
|
||||
|
||||
|
||||
class TestLogConfigurator(utils.TestCase):
|
||||
class TestLogConfigurator(base.TestCase):
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.options = mock.Mock()
|
||||
|
||||
@@ -11,18 +11,18 @@
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
|
||||
import copy
|
||||
import os
|
||||
import sys
|
||||
from unittest import mock
|
||||
|
||||
import fixtures
|
||||
from oslo_utils import importutils
|
||||
import testtools
|
||||
|
||||
from osc_lib import shell
|
||||
from osc_lib.tests import utils
|
||||
|
||||
from osc_lib.test import base
|
||||
|
||||
DEFAULT_AUTH_URL = "http://127.0.0.1:5000/v2.0/"
|
||||
DEFAULT_PROJECT_ID = "xxxx-yyyy-zzzz"
|
||||
@@ -119,7 +119,181 @@ if shell.osprofiler_profiler:
|
||||
global_options['--os-profile'] = ('SECRET_KEY', True, True)
|
||||
|
||||
|
||||
class TestShellArgV(utils.TestShell):
|
||||
def fake_execute(shell, cmd):
|
||||
"""Pretend to execute shell commands."""
|
||||
return shell.run(cmd.split())
|
||||
|
||||
|
||||
def make_shell(shell_class=None):
|
||||
"""Create a new command shell and mock out some bits."""
|
||||
if shell_class is None:
|
||||
shell_class = shell.OpenStackShell
|
||||
_shell = shell_class()
|
||||
_shell.command_manager = mock.Mock()
|
||||
# _shell.cloud = mock.Mock()
|
||||
|
||||
return _shell
|
||||
|
||||
|
||||
def opt2attr(opt):
|
||||
if opt.startswith('--os-'):
|
||||
attr = opt[5:]
|
||||
elif opt.startswith('--'):
|
||||
attr = opt[2:]
|
||||
else:
|
||||
attr = opt
|
||||
return attr.lower().replace('-', '_')
|
||||
|
||||
|
||||
def opt2env(opt):
|
||||
return opt[2:].upper().replace('-', '_')
|
||||
|
||||
|
||||
class EnvFixture(fixtures.Fixture):
|
||||
"""Environment Fixture.
|
||||
|
||||
This fixture replaces os.environ with provided env or an empty env.
|
||||
"""
|
||||
|
||||
def __init__(self, env=None):
|
||||
self.new_env = env or {}
|
||||
|
||||
def _setUp(self):
|
||||
self.orig_env, os.environ = os.environ, self.new_env
|
||||
self.addCleanup(self.revert)
|
||||
|
||||
def revert(self):
|
||||
os.environ = self.orig_env
|
||||
|
||||
|
||||
class TestShell(base.TestCase):
|
||||
# Full name of the OpenStackShell class to test (cliff.app.App subclass)
|
||||
shell_class_name = "osc_lib.shell.OpenStackShell"
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.shell_class = importutils.import_class(self.shell_class_name)
|
||||
self.cmd_patch = mock.patch(self.shell_class_name + ".run_subcommand")
|
||||
self.cmd_save = self.cmd_patch.start()
|
||||
self.addCleanup(self.cmd_patch.stop)
|
||||
self.app = mock.Mock("Test Shell")
|
||||
|
||||
def _assert_initialize_app_arg(self, cmd_options, default_args):
|
||||
"""Check the args passed to initialize_app()
|
||||
|
||||
The argv argument to initialize_app() is the remainder from parsing
|
||||
global options declared in both cliff.app and
|
||||
osc_lib.OpenStackShell build_option_parser(). Any global
|
||||
options passed on the command line should not be in argv but in
|
||||
_shell.options.
|
||||
"""
|
||||
|
||||
with mock.patch(
|
||||
self.shell_class_name + ".initialize_app",
|
||||
self.app,
|
||||
):
|
||||
_shell = make_shell(shell_class=self.shell_class)
|
||||
_cmd = cmd_options + " module list"
|
||||
fake_execute(_shell, _cmd)
|
||||
|
||||
self.app.assert_called_with(["module", "list"])
|
||||
for k in default_args.keys():
|
||||
self.assertEqual(
|
||||
default_args[k],
|
||||
vars(_shell.options)[k],
|
||||
f"{k} does not match",
|
||||
)
|
||||
|
||||
def _assert_cloud_region_arg(self, cmd_options, default_args):
|
||||
"""Check the args passed to OpenStackConfig.get_one()
|
||||
|
||||
The argparse argument to get_one() is an argparse.Namespace
|
||||
object that contains all of the options processed to this point in
|
||||
initialize_app().
|
||||
"""
|
||||
|
||||
cloud = mock.Mock(name="cloudy")
|
||||
cloud.config = {}
|
||||
self.occ_get_one = mock.Mock(return_value=cloud)
|
||||
with mock.patch(
|
||||
"openstack.config.loader.OpenStackConfig.get_one",
|
||||
self.occ_get_one,
|
||||
):
|
||||
_shell = make_shell(shell_class=self.shell_class)
|
||||
_cmd = cmd_options + " module list"
|
||||
fake_execute(_shell, _cmd)
|
||||
|
||||
self.app.assert_called_with(["module", "list"])
|
||||
opts = self.occ_get_one.call_args[1]['argparse']
|
||||
for k in default_args.keys():
|
||||
self.assertEqual(
|
||||
default_args[k],
|
||||
vars(opts)[k],
|
||||
f"{k} does not match",
|
||||
)
|
||||
|
||||
def _test_options_init_app(self, test_opts):
|
||||
"""Test options on the command line"""
|
||||
for opt in test_opts.keys():
|
||||
if not test_opts[opt][1]:
|
||||
continue
|
||||
key = opt2attr(opt)
|
||||
if isinstance(test_opts[opt][0], str):
|
||||
cmd = opt + " " + test_opts[opt][0]
|
||||
else:
|
||||
cmd = opt
|
||||
kwargs = {
|
||||
key: test_opts[opt][0],
|
||||
}
|
||||
self._assert_initialize_app_arg(cmd, kwargs)
|
||||
|
||||
def _test_env_init_app(self, test_opts):
|
||||
"""Test options in the environment"""
|
||||
for opt in test_opts.keys():
|
||||
if not test_opts[opt][2]:
|
||||
continue
|
||||
key = opt2attr(opt)
|
||||
kwargs = {
|
||||
key: test_opts[opt][0],
|
||||
}
|
||||
env = {
|
||||
opt2env(opt): test_opts[opt][0],
|
||||
}
|
||||
os.environ = env.copy()
|
||||
self._assert_initialize_app_arg("", kwargs)
|
||||
|
||||
def _test_options_get_one_cloud(self, test_opts):
|
||||
"""Test options sent "to openstack.config"""
|
||||
for opt in test_opts.keys():
|
||||
if not test_opts[opt][1]:
|
||||
continue
|
||||
key = opt2attr(opt)
|
||||
if isinstance(test_opts[opt][0], str):
|
||||
cmd = opt + " " + test_opts[opt][0]
|
||||
else:
|
||||
cmd = opt
|
||||
kwargs = {
|
||||
key: test_opts[opt][0],
|
||||
}
|
||||
self._assert_cloud_region_arg(cmd, kwargs)
|
||||
|
||||
def _test_env_get_one_cloud(self, test_opts):
|
||||
"""Test environment options sent "to openstack.config"""
|
||||
for opt in test_opts.keys():
|
||||
if not test_opts[opt][2]:
|
||||
continue
|
||||
key = opt2attr(opt)
|
||||
kwargs = {
|
||||
key: test_opts[opt][0],
|
||||
}
|
||||
env = {
|
||||
opt2env(opt): test_opts[opt][0],
|
||||
}
|
||||
os.environ = env.copy()
|
||||
self._assert_cloud_region_arg("", kwargs)
|
||||
|
||||
|
||||
class TestShellArgV(TestShell):
|
||||
"""Test the deferred help flag"""
|
||||
|
||||
def setUp(self):
|
||||
@@ -151,12 +325,12 @@ class TestShellArgV(utils.TestShell):
|
||||
self.assertEqual(str, type(self.app.call_args[0][0][0]))
|
||||
|
||||
|
||||
class TestShellHelp(utils.TestShell):
|
||||
class TestShellHelp(TestShell):
|
||||
"""Test the deferred help flag"""
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.useFixture(utils.EnvFixture())
|
||||
self.useFixture(EnvFixture())
|
||||
|
||||
@testtools.skip("skip until bug 1444983 is resolved")
|
||||
def test_help_options(self):
|
||||
@@ -165,8 +339,8 @@ class TestShellHelp(utils.TestShell):
|
||||
"deferred_help": True,
|
||||
}
|
||||
with mock.patch(self.app_patch + ".initialize_app", self.app):
|
||||
_shell, _cmd = utils.make_shell(), flag
|
||||
utils.fake_execute(_shell, _cmd)
|
||||
_shell, _cmd = make_shell(), flag
|
||||
fake_execute(_shell, _cmd)
|
||||
|
||||
self.assertEqual(
|
||||
kwargs["deferred_help"],
|
||||
@@ -174,7 +348,7 @@ class TestShellHelp(utils.TestShell):
|
||||
)
|
||||
|
||||
|
||||
class TestShellOptions(utils.TestShell):
|
||||
class TestShellOptions(TestShell):
|
||||
"""Test the option handling by argparse and openstack.config.loader
|
||||
|
||||
This covers getting the CLI options through the initial processing
|
||||
@@ -183,7 +357,7 @@ class TestShellOptions(utils.TestShell):
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.useFixture(utils.EnvFixture())
|
||||
self.useFixture(EnvFixture())
|
||||
|
||||
def test_empty_auth(self):
|
||||
os.environ = {}
|
||||
@@ -204,7 +378,7 @@ class TestShellOptions(utils.TestShell):
|
||||
self._test_env_get_one_cloud(global_options)
|
||||
|
||||
|
||||
class TestShellCli(utils.TestShell):
|
||||
class TestShellCli(TestShell):
|
||||
"""Test handling of specific global options
|
||||
|
||||
_shell.options is the parsed command line from argparse
|
||||
@@ -215,23 +389,23 @@ class TestShellCli(utils.TestShell):
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
env = {}
|
||||
self.useFixture(utils.EnvFixture(env.copy()))
|
||||
self.useFixture(EnvFixture(env.copy()))
|
||||
|
||||
def test_shell_args_no_options(self):
|
||||
_shell = utils.make_shell()
|
||||
_shell = make_shell()
|
||||
with mock.patch(
|
||||
"osc_lib.shell.OpenStackShell.initialize_app",
|
||||
self.app,
|
||||
):
|
||||
utils.fake_execute(_shell, "list user")
|
||||
fake_execute(_shell, "list user")
|
||||
self.app.assert_called_with(["list", "user"])
|
||||
|
||||
def test_shell_args_tls_options(self):
|
||||
"""Test the TLS verify and CA cert file options"""
|
||||
_shell = utils.make_shell()
|
||||
_shell = make_shell()
|
||||
|
||||
# Default
|
||||
utils.fake_execute(_shell, "module list")
|
||||
fake_execute(_shell, "module list")
|
||||
self.assertIsNone(_shell.options.verify)
|
||||
self.assertIsNone(_shell.options.insecure)
|
||||
self.assertIsNone(_shell.options.cacert)
|
||||
@@ -239,7 +413,7 @@ class TestShellCli(utils.TestShell):
|
||||
self.assertIsNone(_shell.client_manager.cacert)
|
||||
|
||||
# --verify
|
||||
utils.fake_execute(_shell, "--verify module list")
|
||||
fake_execute(_shell, "--verify module list")
|
||||
self.assertTrue(_shell.options.verify)
|
||||
self.assertIsNone(_shell.options.insecure)
|
||||
self.assertIsNone(_shell.options.cacert)
|
||||
@@ -247,7 +421,7 @@ class TestShellCli(utils.TestShell):
|
||||
self.assertIsNone(_shell.client_manager.cacert)
|
||||
|
||||
# --insecure
|
||||
utils.fake_execute(_shell, "--insecure module list")
|
||||
fake_execute(_shell, "--insecure module list")
|
||||
self.assertIsNone(_shell.options.verify)
|
||||
self.assertTrue(_shell.options.insecure)
|
||||
self.assertIsNone(_shell.options.cacert)
|
||||
@@ -255,7 +429,7 @@ class TestShellCli(utils.TestShell):
|
||||
self.assertIsNone(_shell.client_manager.cacert)
|
||||
|
||||
# --os-cacert
|
||||
utils.fake_execute(_shell, "--os-cacert foo module list")
|
||||
fake_execute(_shell, "--os-cacert foo module list")
|
||||
self.assertIsNone(_shell.options.verify)
|
||||
self.assertIsNone(_shell.options.insecure)
|
||||
self.assertEqual('foo', _shell.options.cacert)
|
||||
@@ -263,7 +437,7 @@ class TestShellCli(utils.TestShell):
|
||||
self.assertEqual('foo', _shell.client_manager.cacert)
|
||||
|
||||
# --os-cacert and --verify
|
||||
utils.fake_execute(_shell, "--os-cacert foo --verify module list")
|
||||
fake_execute(_shell, "--os-cacert foo --verify module list")
|
||||
self.assertTrue(_shell.options.verify)
|
||||
self.assertIsNone(_shell.options.insecure)
|
||||
self.assertEqual('foo', _shell.options.cacert)
|
||||
@@ -275,7 +449,7 @@ class TestShellCli(utils.TestShell):
|
||||
# in this combination --insecure now overrides any
|
||||
# --os-cacert setting, where before --insecure
|
||||
# was ignored if --os-cacert was set.
|
||||
utils.fake_execute(_shell, "--os-cacert foo --insecure module list")
|
||||
fake_execute(_shell, "--os-cacert foo --insecure module list")
|
||||
self.assertIsNone(_shell.options.verify)
|
||||
self.assertTrue(_shell.options.insecure)
|
||||
self.assertEqual('foo', _shell.options.cacert)
|
||||
@@ -284,30 +458,28 @@ class TestShellCli(utils.TestShell):
|
||||
|
||||
def test_shell_args_cert_options(self):
|
||||
"""Test client cert options"""
|
||||
_shell = utils.make_shell()
|
||||
_shell = make_shell()
|
||||
|
||||
# Default
|
||||
utils.fake_execute(_shell, "module list")
|
||||
fake_execute(_shell, "module list")
|
||||
self.assertIsNone(_shell.options.cert)
|
||||
self.assertIsNone(_shell.options.key)
|
||||
self.assertIsNone(_shell.client_manager.cert)
|
||||
|
||||
# --os-cert
|
||||
utils.fake_execute(_shell, "--os-cert mycert module list")
|
||||
fake_execute(_shell, "--os-cert mycert module list")
|
||||
self.assertEqual('mycert', _shell.options.cert)
|
||||
self.assertIsNone(_shell.options.key)
|
||||
self.assertEqual('mycert', _shell.client_manager.cert)
|
||||
|
||||
# --os-key
|
||||
utils.fake_execute(_shell, "--os-key mickey module list")
|
||||
fake_execute(_shell, "--os-key mickey module list")
|
||||
self.assertIsNone(_shell.options.cert)
|
||||
self.assertEqual('mickey', _shell.options.key)
|
||||
self.assertIsNone(_shell.client_manager.cert)
|
||||
|
||||
# --os-cert and --os-key
|
||||
utils.fake_execute(
|
||||
_shell, "--os-cert mycert --os-key mickey module list"
|
||||
)
|
||||
fake_execute(_shell, "--os-cert mycert --os-key mickey module list")
|
||||
self.assertEqual('mycert', _shell.options.cert)
|
||||
self.assertEqual('mickey', _shell.options.key)
|
||||
self.assertEqual(('mycert', 'mickey'), _shell.client_manager.cert)
|
||||
@@ -316,9 +488,9 @@ class TestShellCli(utils.TestShell):
|
||||
def test_shell_args_cloud_no_vendor(self, config_mock):
|
||||
"""Test cloud config options without the vendor file"""
|
||||
config_mock.return_value = ('file.yaml', copy.deepcopy(CLOUD_1))
|
||||
_shell = utils.make_shell()
|
||||
_shell = make_shell()
|
||||
|
||||
utils.fake_execute(
|
||||
fake_execute(
|
||||
_shell,
|
||||
"--os-cloud scc module list",
|
||||
)
|
||||
@@ -372,9 +544,9 @@ class TestShellCli(utils.TestShell):
|
||||
"""Test cloud config options with the vendor file"""
|
||||
config_mock.return_value = ('file.yaml', copy.deepcopy(CLOUD_2))
|
||||
public_mock.return_value = ('file.yaml', copy.deepcopy(PUBLIC_1))
|
||||
_shell = utils.make_shell()
|
||||
_shell = make_shell()
|
||||
|
||||
utils.fake_execute(
|
||||
fake_execute(
|
||||
_shell,
|
||||
"--os-cloud megacloud module list",
|
||||
)
|
||||
@@ -420,10 +592,10 @@ class TestShellCli(utils.TestShell):
|
||||
def test_shell_args_precedence(self, config_mock, vendor_mock):
|
||||
config_mock.return_value = ('file.yaml', copy.deepcopy(CLOUD_2))
|
||||
vendor_mock.return_value = ('file.yaml', copy.deepcopy(PUBLIC_1))
|
||||
_shell = utils.make_shell()
|
||||
_shell = make_shell()
|
||||
|
||||
# Test command option overriding config file value
|
||||
utils.fake_execute(
|
||||
fake_execute(
|
||||
_shell,
|
||||
"--os-cloud megacloud --os-region-name krikkit module list",
|
||||
)
|
||||
@@ -461,7 +633,7 @@ class TestShellCli(utils.TestShell):
|
||||
)
|
||||
|
||||
|
||||
class TestShellCliPrecedence(utils.TestShell):
|
||||
class TestShellCliPrecedence(TestShell):
|
||||
"""Test option precedencr order"""
|
||||
|
||||
def setUp(self):
|
||||
@@ -470,7 +642,7 @@ class TestShellCliPrecedence(utils.TestShell):
|
||||
'OS_CLOUD': 'megacloud',
|
||||
'OS_REGION_NAME': 'occ-env',
|
||||
}
|
||||
self.useFixture(utils.EnvFixture(env.copy()))
|
||||
self.useFixture(EnvFixture(env.copy()))
|
||||
|
||||
@mock.patch("openstack.config.loader.OpenStackConfig._load_vendor_file")
|
||||
@mock.patch("openstack.config.loader.OpenStackConfig._load_config_file")
|
||||
@@ -478,10 +650,10 @@ class TestShellCliPrecedence(utils.TestShell):
|
||||
"""Test environment overriding occ"""
|
||||
config_mock.return_value = ('file.yaml', copy.deepcopy(CLOUD_2))
|
||||
vendor_mock.return_value = ('file.yaml', copy.deepcopy(PUBLIC_1))
|
||||
_shell = utils.make_shell()
|
||||
_shell = make_shell()
|
||||
|
||||
# Test env var
|
||||
utils.fake_execute(
|
||||
fake_execute(
|
||||
_shell,
|
||||
"module list",
|
||||
)
|
||||
@@ -526,10 +698,10 @@ class TestShellCliPrecedence(utils.TestShell):
|
||||
"""Test command line overriding environment and occ"""
|
||||
config_mock.return_value = ('file.yaml', copy.deepcopy(CLOUD_2))
|
||||
vendor_mock.return_value = ('file.yaml', copy.deepcopy(PUBLIC_1))
|
||||
_shell = utils.make_shell()
|
||||
_shell = make_shell()
|
||||
|
||||
# Test command option overriding config file value
|
||||
utils.fake_execute(
|
||||
fake_execute(
|
||||
_shell,
|
||||
"--os-region-name krikkit list user",
|
||||
)
|
||||
@@ -574,10 +746,10 @@ class TestShellCliPrecedence(utils.TestShell):
|
||||
"""Test command line overriding environment and occ"""
|
||||
config_mock.return_value = ('file.yaml', copy.deepcopy(CLOUD_1))
|
||||
vendor_mock.return_value = ('file.yaml', copy.deepcopy(PUBLIC_1))
|
||||
_shell = utils.make_shell()
|
||||
_shell = make_shell()
|
||||
|
||||
# Test command option overriding config file value
|
||||
utils.fake_execute(
|
||||
fake_execute(
|
||||
_shell,
|
||||
"--os-cloud scc --os-region-name krikkit list user",
|
||||
)
|
||||
|
||||
@@ -12,411 +12,29 @@
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
|
||||
import copy
|
||||
import json as jsonutils
|
||||
import os
|
||||
from unittest import mock
|
||||
from osc_lib.test.base import ParserException, TestCase, TestCommand
|
||||
from osc_lib.tests.test_clientmanager import (
|
||||
BaseTestClientManager as TestClientManager,
|
||||
)
|
||||
from osc_lib.tests.test_shell import (
|
||||
fake_execute,
|
||||
make_shell,
|
||||
opt2attr,
|
||||
opt2env,
|
||||
TestShell,
|
||||
EnvFixture,
|
||||
)
|
||||
|
||||
from cliff import columns as cliff_columns
|
||||
import fixtures
|
||||
from keystoneauth1 import loading
|
||||
from openstack.config import cloud_region
|
||||
from openstack.config import defaults
|
||||
from oslo_utils import importutils
|
||||
from requests_mock.contrib import fixture
|
||||
import testtools
|
||||
|
||||
from osc_lib import clientmanager
|
||||
from osc_lib import shell
|
||||
from osc_lib.tests import fakes
|
||||
|
||||
|
||||
def fake_execute(shell, cmd):
|
||||
"""Pretend to execute shell commands."""
|
||||
return shell.run(cmd.split())
|
||||
|
||||
|
||||
def make_shell(shell_class=None):
|
||||
"""Create a new command shell and mock out some bits."""
|
||||
if shell_class is None:
|
||||
shell_class = shell.OpenStackShell
|
||||
_shell = shell_class()
|
||||
_shell.command_manager = mock.Mock()
|
||||
# _shell.cloud = mock.Mock()
|
||||
|
||||
return _shell
|
||||
|
||||
|
||||
def opt2attr(opt):
|
||||
if opt.startswith('--os-'):
|
||||
attr = opt[5:]
|
||||
elif opt.startswith('--'):
|
||||
attr = opt[2:]
|
||||
else:
|
||||
attr = opt
|
||||
return attr.lower().replace('-', '_')
|
||||
|
||||
|
||||
def opt2env(opt):
|
||||
return opt[2:].upper().replace('-', '_')
|
||||
|
||||
|
||||
class EnvFixture(fixtures.Fixture):
|
||||
"""Environment Fixture.
|
||||
|
||||
This fixture replaces os.environ with provided env or an empty env.
|
||||
"""
|
||||
|
||||
def __init__(self, env=None):
|
||||
self.new_env = env or {}
|
||||
|
||||
def _setUp(self):
|
||||
self.orig_env, os.environ = os.environ, self.new_env
|
||||
self.addCleanup(self.revert)
|
||||
|
||||
def revert(self):
|
||||
os.environ = self.orig_env
|
||||
|
||||
|
||||
class ParserException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class TestCase(testtools.TestCase):
|
||||
def setUp(self):
|
||||
testtools.TestCase.setUp(self)
|
||||
|
||||
if (
|
||||
os.environ.get("OS_STDOUT_CAPTURE") == "True"
|
||||
or os.environ.get("OS_STDOUT_CAPTURE") == "1"
|
||||
):
|
||||
stdout = self.useFixture(fixtures.StringStream("stdout")).stream
|
||||
self.useFixture(fixtures.MonkeyPatch("sys.stdout", stdout))
|
||||
|
||||
if (
|
||||
os.environ.get("OS_STDERR_CAPTURE") == "True"
|
||||
or os.environ.get("OS_STDERR_CAPTURE") == "1"
|
||||
):
|
||||
stderr = self.useFixture(fixtures.StringStream("stderr")).stream
|
||||
self.useFixture(fixtures.MonkeyPatch("sys.stderr", stderr))
|
||||
|
||||
def assertNotCalled(self, m, msg=None):
|
||||
"""Assert a function was not called"""
|
||||
|
||||
if m.called:
|
||||
if not msg:
|
||||
msg = f'method {m} should not have been called'
|
||||
self.fail(msg)
|
||||
|
||||
|
||||
class TestCommand(TestCase):
|
||||
"""Test cliff command classes"""
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
# Build up a fake app
|
||||
self.fake_stdout = fakes.FakeStdout()
|
||||
self.fake_log = fakes.FakeLog()
|
||||
self.app = fakes.FakeApp(self.fake_stdout, self.fake_log)
|
||||
self.app.client_manager = fakes.FakeClientManager()
|
||||
|
||||
def check_parser(self, cmd, args, verify_args):
|
||||
cmd_parser = cmd.get_parser('check_parser')
|
||||
try:
|
||||
parsed_args = cmd_parser.parse_args(args)
|
||||
except SystemExit:
|
||||
raise ParserException("Argument parse failed")
|
||||
for av in verify_args:
|
||||
attr, value = av
|
||||
if attr:
|
||||
self.assertIn(attr, parsed_args)
|
||||
self.assertEqual(value, getattr(parsed_args, attr))
|
||||
return parsed_args
|
||||
|
||||
def assertItemEqual(self, expected, actual):
|
||||
"""Compare item considering formattable columns.
|
||||
|
||||
This method compares an observed item to an expected item column by
|
||||
column. If a column is a formattable column, observed and expected
|
||||
columns are compared using human_readable() and machine_readable().
|
||||
"""
|
||||
self.assertEqual(len(expected), len(actual))
|
||||
for col_expected, col_actual in zip(expected, actual):
|
||||
if isinstance(col_expected, cliff_columns.FormattableColumn):
|
||||
self.assertIsInstance(col_actual, col_expected.__class__)
|
||||
self.assertEqual(
|
||||
col_expected.human_readable(), col_actual.human_readable()
|
||||
)
|
||||
self.assertEqual(
|
||||
col_expected.machine_readable(),
|
||||
col_actual.machine_readable(),
|
||||
)
|
||||
else:
|
||||
self.assertEqual(col_expected, col_actual)
|
||||
|
||||
def assertListItemEqual(self, expected, actual):
|
||||
"""Compare a list of items considering formattable columns.
|
||||
|
||||
Each pair of observed and expected items are compared
|
||||
using assertItemEqual() method.
|
||||
"""
|
||||
self.assertEqual(len(expected), len(actual))
|
||||
for item_expected, item_actual in zip(expected, actual):
|
||||
self.assertItemEqual(item_expected, item_actual)
|
||||
|
||||
|
||||
class TestClientManager(TestCase):
|
||||
"""ClientManager class test framework"""
|
||||
|
||||
default_password_auth = {
|
||||
'auth_url': fakes.AUTH_URL,
|
||||
'username': fakes.USERNAME,
|
||||
'password': fakes.PASSWORD,
|
||||
'project_name': fakes.PROJECT_NAME,
|
||||
}
|
||||
default_token_auth = {
|
||||
'auth_url': fakes.AUTH_URL,
|
||||
'token': fakes.AUTH_TOKEN,
|
||||
}
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.mock = mock.Mock()
|
||||
self.requests = self.useFixture(fixture.Fixture())
|
||||
# fake v2password token retrieval
|
||||
self.stub_auth(json=fakes.TEST_RESPONSE_DICT)
|
||||
# fake token and token_endpoint retrieval
|
||||
self.stub_auth(
|
||||
json=fakes.TEST_RESPONSE_DICT,
|
||||
url='/'.join([fakes.AUTH_URL, 'v2.0/tokens']),
|
||||
)
|
||||
# fake v3password token retrieval
|
||||
self.stub_auth(
|
||||
json=fakes.TEST_RESPONSE_DICT_V3,
|
||||
url='/'.join([fakes.AUTH_URL, 'v3/auth/tokens']),
|
||||
)
|
||||
# fake password token retrieval
|
||||
self.stub_auth(
|
||||
json=fakes.TEST_RESPONSE_DICT_V3,
|
||||
url='/'.join([fakes.AUTH_URL, 'auth/tokens']),
|
||||
)
|
||||
# fake password version endpoint discovery
|
||||
self.stub_auth(
|
||||
json=fakes.TEST_VERSIONS, url=fakes.AUTH_URL, verb='GET'
|
||||
)
|
||||
|
||||
# Mock the auth plugin
|
||||
self.auth_mock = mock.Mock()
|
||||
|
||||
def stub_auth(self, json=None, url=None, verb=None, **kwargs):
|
||||
subject_token = fakes.AUTH_TOKEN
|
||||
base_url = fakes.AUTH_URL
|
||||
if json:
|
||||
text = jsonutils.dumps(json)
|
||||
headers = {
|
||||
'X-Subject-Token': subject_token,
|
||||
'Content-Type': 'application/json',
|
||||
}
|
||||
if not url:
|
||||
url = '/'.join([base_url, 'tokens'])
|
||||
url = url.replace("/?", "?")
|
||||
if not verb:
|
||||
verb = 'POST'
|
||||
self.requests.register_uri(
|
||||
verb,
|
||||
url,
|
||||
headers=headers,
|
||||
text=text,
|
||||
)
|
||||
|
||||
def _clientmanager_class(self):
|
||||
"""Allow subclasses to override the ClientManager class"""
|
||||
return clientmanager.ClientManager
|
||||
|
||||
def _make_clientmanager(
|
||||
self,
|
||||
auth_args=None,
|
||||
config_args=None,
|
||||
identity_api_version=None,
|
||||
auth_plugin_name=None,
|
||||
auth_required=None,
|
||||
):
|
||||
if identity_api_version is None:
|
||||
identity_api_version = '2.0'
|
||||
if auth_plugin_name is None:
|
||||
auth_plugin_name = 'password'
|
||||
|
||||
if auth_plugin_name.endswith('password'):
|
||||
auth_dict = copy.deepcopy(self.default_password_auth)
|
||||
elif auth_plugin_name.endswith('token'):
|
||||
auth_dict = copy.deepcopy(self.default_token_auth)
|
||||
else:
|
||||
auth_dict = {}
|
||||
|
||||
if auth_args is not None:
|
||||
auth_dict = auth_args
|
||||
|
||||
cli_options = defaults.get_defaults()
|
||||
cli_options.update(
|
||||
{
|
||||
'auth_type': auth_plugin_name,
|
||||
'auth': auth_dict,
|
||||
'interface': fakes.INTERFACE,
|
||||
'region_name': fakes.REGION_NAME,
|
||||
# 'workflow_api_version': '2',
|
||||
}
|
||||
)
|
||||
if config_args is not None:
|
||||
cli_options.update(config_args)
|
||||
|
||||
loader = loading.get_plugin_loader(auth_plugin_name)
|
||||
auth_plugin = loader.load_from_options(**auth_dict)
|
||||
client_manager = self._clientmanager_class()(
|
||||
cli_options=cloud_region.CloudRegion(
|
||||
name='t1',
|
||||
region_name='1',
|
||||
config=cli_options,
|
||||
auth_plugin=auth_plugin,
|
||||
),
|
||||
api_version={
|
||||
'identity': identity_api_version,
|
||||
},
|
||||
)
|
||||
client_manager._auth_required = auth_required is True
|
||||
client_manager.setup_auth()
|
||||
client_manager.auth_ref
|
||||
|
||||
self.assertEqual(
|
||||
auth_plugin_name,
|
||||
client_manager.auth_plugin_name,
|
||||
)
|
||||
return client_manager
|
||||
|
||||
|
||||
class TestShell(TestCase):
|
||||
# Full name of the OpenStackShell class to test (cliff.app.App subclass)
|
||||
shell_class_name = "osc_lib.shell.OpenStackShell"
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.shell_class = importutils.import_class(self.shell_class_name)
|
||||
self.cmd_patch = mock.patch(self.shell_class_name + ".run_subcommand")
|
||||
self.cmd_save = self.cmd_patch.start()
|
||||
self.addCleanup(self.cmd_patch.stop)
|
||||
self.app = mock.Mock("Test Shell")
|
||||
|
||||
def _assert_initialize_app_arg(self, cmd_options, default_args):
|
||||
"""Check the args passed to initialize_app()
|
||||
|
||||
The argv argument to initialize_app() is the remainder from parsing
|
||||
global options declared in both cliff.app and
|
||||
osc_lib.OpenStackShell build_option_parser(). Any global
|
||||
options passed on the command line should not be in argv but in
|
||||
_shell.options.
|
||||
"""
|
||||
|
||||
with mock.patch(
|
||||
self.shell_class_name + ".initialize_app",
|
||||
self.app,
|
||||
):
|
||||
_shell = make_shell(shell_class=self.shell_class)
|
||||
_cmd = cmd_options + " module list"
|
||||
fake_execute(_shell, _cmd)
|
||||
|
||||
self.app.assert_called_with(["module", "list"])
|
||||
for k in default_args.keys():
|
||||
self.assertEqual(
|
||||
default_args[k],
|
||||
vars(_shell.options)[k],
|
||||
f"{k} does not match",
|
||||
)
|
||||
|
||||
def _assert_cloud_region_arg(self, cmd_options, default_args):
|
||||
"""Check the args passed to OpenStackConfig.get_one()
|
||||
|
||||
The argparse argument to get_one() is an argparse.Namespace
|
||||
object that contains all of the options processed to this point in
|
||||
initialize_app().
|
||||
"""
|
||||
|
||||
cloud = mock.Mock(name="cloudy")
|
||||
cloud.config = {}
|
||||
self.occ_get_one = mock.Mock(return_value=cloud)
|
||||
with mock.patch(
|
||||
"openstack.config.loader.OpenStackConfig.get_one",
|
||||
self.occ_get_one,
|
||||
):
|
||||
_shell = make_shell(shell_class=self.shell_class)
|
||||
_cmd = cmd_options + " module list"
|
||||
fake_execute(_shell, _cmd)
|
||||
|
||||
self.app.assert_called_with(["module", "list"])
|
||||
opts = self.occ_get_one.call_args[1]['argparse']
|
||||
for k in default_args.keys():
|
||||
self.assertEqual(
|
||||
default_args[k],
|
||||
vars(opts)[k],
|
||||
f"{k} does not match",
|
||||
)
|
||||
|
||||
def _test_options_init_app(self, test_opts):
|
||||
"""Test options on the command line"""
|
||||
for opt in test_opts.keys():
|
||||
if not test_opts[opt][1]:
|
||||
continue
|
||||
key = opt2attr(opt)
|
||||
if isinstance(test_opts[opt][0], str):
|
||||
cmd = opt + " " + test_opts[opt][0]
|
||||
else:
|
||||
cmd = opt
|
||||
kwargs = {
|
||||
key: test_opts[opt][0],
|
||||
}
|
||||
self._assert_initialize_app_arg(cmd, kwargs)
|
||||
|
||||
def _test_env_init_app(self, test_opts):
|
||||
"""Test options in the environment"""
|
||||
for opt in test_opts.keys():
|
||||
if not test_opts[opt][2]:
|
||||
continue
|
||||
key = opt2attr(opt)
|
||||
kwargs = {
|
||||
key: test_opts[opt][0],
|
||||
}
|
||||
env = {
|
||||
opt2env(opt): test_opts[opt][0],
|
||||
}
|
||||
os.environ = env.copy()
|
||||
self._assert_initialize_app_arg("", kwargs)
|
||||
|
||||
def _test_options_get_one_cloud(self, test_opts):
|
||||
"""Test options sent "to openstack.config"""
|
||||
for opt in test_opts.keys():
|
||||
if not test_opts[opt][1]:
|
||||
continue
|
||||
key = opt2attr(opt)
|
||||
if isinstance(test_opts[opt][0], str):
|
||||
cmd = opt + " " + test_opts[opt][0]
|
||||
else:
|
||||
cmd = opt
|
||||
kwargs = {
|
||||
key: test_opts[opt][0],
|
||||
}
|
||||
self._assert_cloud_region_arg(cmd, kwargs)
|
||||
|
||||
def _test_env_get_one_cloud(self, test_opts):
|
||||
"""Test environment options sent "to openstack.config"""
|
||||
for opt in test_opts.keys():
|
||||
if not test_opts[opt][2]:
|
||||
continue
|
||||
key = opt2attr(opt)
|
||||
kwargs = {
|
||||
key: test_opts[opt][0],
|
||||
}
|
||||
env = {
|
||||
opt2env(opt): test_opts[opt][0],
|
||||
}
|
||||
os.environ = env.copy()
|
||||
self._assert_cloud_region_arg("", kwargs)
|
||||
__all__ = [
|
||||
'fake_execute',
|
||||
'make_shell',
|
||||
'opt2attr',
|
||||
'opt2env',
|
||||
'EnvFixture',
|
||||
'ParserException',
|
||||
'TestCase',
|
||||
'TestClientManager',
|
||||
'TestCommand',
|
||||
'TestShell',
|
||||
]
|
||||
|
||||
@@ -9,13 +9,12 @@
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
|
||||
from osc_lib.tests import utils as test_utils
|
||||
from osc_lib.test import base
|
||||
from osc_lib.utils import columns as column_utils
|
||||
|
||||
|
||||
class TestColumnUtils(test_utils.TestCase):
|
||||
class TestColumnUtils(base.TestCase):
|
||||
def test_get_column_definitions(self):
|
||||
attr_map = (
|
||||
('id', 'ID', column_utils.LIST_BOTH),
|
||||
|
||||
@@ -16,7 +16,7 @@ import argparse
|
||||
import functools
|
||||
from unittest import mock
|
||||
|
||||
from osc_lib.tests import utils as test_utils
|
||||
from osc_lib.test import base
|
||||
from osc_lib.utils import tags
|
||||
|
||||
|
||||
@@ -25,7 +25,7 @@ def help_enhancer(_h):
|
||||
return ''.join(reversed(_h))
|
||||
|
||||
|
||||
class TestTags(test_utils.TestCase):
|
||||
class TestTags(base.TestCase):
|
||||
def test_add_tag_filtering_option_to_parser(self):
|
||||
parser = argparse.ArgumentParser(
|
||||
formatter_class=functools.partial(argparse.HelpFormatter, width=78)
|
||||
@@ -206,7 +206,7 @@ class TestTags(test_utils.TestCase):
|
||||
mock_client.set_tags.assert_called_once_with(mock_obj, ['tag1'])
|
||||
|
||||
|
||||
class TestTagHelps(test_utils.TestCase):
|
||||
class TestTagHelps(base.TestCase):
|
||||
def _test_tag_method_help(self, meth, exp_normal, exp_enhanced):
|
||||
"""Vet the help text of the options added by the tag filtering helpers.
|
||||
|
||||
|
||||
@@ -21,8 +21,8 @@ from cliff import columns as cliff_columns
|
||||
|
||||
from osc_lib.cli import format_columns
|
||||
from osc_lib import exceptions
|
||||
from osc_lib.tests import fakes
|
||||
from osc_lib.tests import utils as test_utils
|
||||
from osc_lib.test import base
|
||||
from osc_lib.test import fakes
|
||||
from osc_lib import utils
|
||||
|
||||
PASSWORD = "Pa$$w0rd"
|
||||
@@ -41,7 +41,7 @@ class FakeOddballResource(fakes.FakeResource):
|
||||
return None
|
||||
|
||||
|
||||
class TestUtils(test_utils.TestCase):
|
||||
class TestUtils(base.TestCase):
|
||||
def _get_test_items(self):
|
||||
item1 = {'a': 1, 'b': 2}
|
||||
item2 = {'a': 1, 'b': 3}
|
||||
@@ -532,7 +532,7 @@ class NoUniqueMatch(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class TestFindResource(test_utils.TestCase):
|
||||
class TestFindResource(base.TestCase):
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.name = 'legos'
|
||||
@@ -839,7 +839,7 @@ class TestFindResource(test_utils.TestCase):
|
||||
self.assertEqual(expected, actual_unsorted)
|
||||
|
||||
|
||||
class TestAssertItemEqual(test_utils.TestCommand):
|
||||
class TestAssertItemEqual(base.TestCommand):
|
||||
def test_assert_normal_item(self):
|
||||
expected = ['a', 'b', 'c']
|
||||
actual = ['a', 'b', 'c']
|
||||
@@ -908,7 +908,7 @@ class TestAssertItemEqual(test_utils.TestCommand):
|
||||
self.assertListItemEqual(expected, actual)
|
||||
|
||||
|
||||
class TestSDKUtils(test_utils.TestCase):
|
||||
class TestSDKUtils(base.TestCase):
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user