Merge "Added missing unit tests for shell.py"

This commit is contained in:
Jenkins
2013-02-19 03:37:03 +00:00
committed by Gerrit Code Review
4 changed files with 1011 additions and 4 deletions

101
tests/fakes.py Normal file
View File

@@ -0,0 +1,101 @@
"""
A fake server that "responds" to API methods with pre-canned responses.
All of these responses come from the spec, so if for some reason the spec's
wrong the tests might raise AssertionError. I've indicated in comments the
places where actual behavior differs from the spec.
"""
from keystoneclient import service_catalog
def assert_has_keys(dict, required=[], optional=[]):
keys = dict.keys()
for k in required:
try:
assert k in keys
except AssertionError:
extra_keys = set(keys).difference(set(required + optional))
raise AssertionError("found unexpected keys: %s" %
list(extra_keys))
class FakeClient(object):
def assert_called(self, method, url, body=None, pos=-1):
"""
Assert than an API method was just called.
"""
expected = (method, url)
called = self.callstack[pos][0:2]
assert self.callstack, ("Expected %s %s but no calls were made." %
expected)
assert expected == called, ("Expected %s %s; got %s %s" %
(expected + called))
if body is not None:
assert self.callstack[pos][2] == body
def assert_called_anytime(self, method, url, body=None):
"""
Assert than an API method was called anytime in the test.
"""
expected = (method, url)
assert self.callstack, ("Expected %s %s but no calls were made." %
expected)
found = False
for entry in self.callstack:
if expected == entry[0:2]:
found = True
break
assert found, ('Expected %s; got %s' %
(expected, self.callstack))
if body is not None:
if entry[2] != body:
raise AssertionError('%s != %s' % (entry[2], body))
self.callstack = []
def clear_callstack(self):
self.callstack = []
def authenticate(self, cl_obj):
cl_obj.user_id = '1'
cl_obj.auth_user_id = '1'
cl_obj.tenant_id = '1'
cl_obj.auth_tenant_id = '1'
cl_obj.service_catalog = service_catalog.ServiceCatalog({
"token": {
"expires": "2012-02-05T00:00:00",
"id": "887665443383838",
"tenant": {
"id": "1",
"name": "customer-x"}},
"serviceCatalog": [
{"endpoints": [
{"adminURL": "http://swift.admin-nets.local:8080/",
"region": "RegionOne",
"internalURL": "http://127.0.0.1:8080/v1/AUTH_1",
"publicURL":
"http://swift.publicinternets.com/v1/AUTH_1"}],
"type": "object-store",
"name": "swift"},
{"endpoints": [
{"adminURL": "http://cdn.admin-nets.local/v1.1/1",
"region": "RegionOne",
"internalURL": "http://127.0.0.1:7777/v1.1/1",
"publicURL": "http://cdn.publicinternets.com/v1.1/1"}],
"type": "object-store",
"name": "cdn"}],
"user": {
"id": "1",
"roles": [
{"tenantId": "1",
"id": "3",
"name": "Member"}],
"name": "joeuser"}
}
)

View File

@@ -1,16 +1,21 @@
import argparse
import cStringIO
import json
import mock
import os
import requests
import sys
import uuid
import fixtures
import requests
import mock
import testtools
from testtools import matchers
from keystoneclient import exceptions
from keystoneclient import shell as openstack_shell
from keystoneclient.v2_0 import shell as shell_v2_0
from keystoneclient import exceptions
from tests import utils
DEFAULT_USERNAME = 'username'
DEFAULT_PASSWORD = 'password'
DEFAULT_TENANT_ID = 'tenant_id'
@@ -51,9 +56,82 @@ class ShellTest(utils.TestCase):
_shell = openstack_shell.OpenStackIdentityShell()
shell = lambda cmd: _shell.main(cmd.split())
def test_help_unknown_command(self):
self.assertRaises(exceptions.CommandError, shell, 'help %s'
% uuid.uuid4().hex)
def shell(self, argstr):
orig = sys.stdout
clean_env = {}
_old_env, os.environ = os.environ, clean_env.copy()
try:
sys.stdout = cStringIO.StringIO()
_shell = openstack_shell.OpenStackIdentityShell()
_shell.main(argstr.split())
except SystemExit:
exc_type, exc_value, exc_traceback = sys.exc_info()
self.assertEqual(exc_value.code, 0)
finally:
out = sys.stdout.getvalue()
sys.stdout.close()
sys.stdout = orig
os.environ = _old_env
return out
def test_help_unknown_command(self):
self.assertRaises(exceptions.CommandError, shell, 'help foofoo')
def test_help_no_args(self):
do_tenant_mock = mock.MagicMock()
with mock.patch('keystoneclient.shell.OpenStackIdentityShell.do_help',
do_tenant_mock):
self.shell('')
assert do_tenant_mock.called
def test_help(self):
required = 'usage:'
help_text = self.shell('help')
self.assertThat(help_text,
matchers.MatchesRegex(required))
def test_help_command(self):
required = 'usage: keystone user-create'
help_text = self.shell('help user-create')
self.assertThat(help_text,
matchers.MatchesRegex(required))
def test_auth_no_credentials(self):
with testtools.ExpectedException(
exceptions.CommandError,
'Expecting authentication method'):
self.shell('user-list')
def test_auth_password_authurl_no_username(self):
with testtools.ExpectedException(
exceptions.CommandError,
'Expecting a username provided via either'):
self.shell('--os-password=%s --os-auth-url=%s user-list'
% (uuid.uuid4().hex, uuid.uuid4().hex))
def test_auth_username_password_no_authurl(self):
with testtools.ExpectedException(
exceptions.CommandError,
'Expecting an auth URL via either'):
self.shell('--os-password=%s --os-username=%s user-list'
% (uuid.uuid4().hex, uuid.uuid4().hex))
def test_token_no_endpoint(self):
with testtools.ExpectedException(
exceptions.CommandError,
'Expecting an endpoint provided'):
self.shell('--token=%s user-list' % uuid.uuid4().hex)
def test_endpoint_no_token(self):
with testtools.ExpectedException(
exceptions.CommandError,
'Expecting a token provided'):
self.shell('--endpoint=http://10.0.0.1:5000/v2.0/ user-list')
def test_shell_args(self):
do_tenant_mock = mock.MagicMock()
with mock.patch('keystoneclient.v2_0.shell.do_user_list',

503
tests/v2_0/fakes.py Normal file
View File

@@ -0,0 +1,503 @@
# Copyright (c) 2011 X.commerce, a business unit of eBay Inc.
# Copyright 2011 OpenStack, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from datetime import datetime
import urlparse
from keystoneclient import client as base_client
from keystoneclient.v2_0 import client
from tests import fakes
from tests import utils
class FakeHTTPClient(fakes.FakeClient):
def __init__(self, **kwargs):
self.username = 'username'
self.password = 'password'
self.auth_url = 'auth_url'
self.callstack = []
def _cs_request(self, url, method, **kwargs):
# Check that certain things are called correctly
if method in ['GET', 'DELETE']:
assert 'body' not in kwargs
elif method == 'PUT':
kwargs.setdefault('body', None)
# Call the method
args = urlparse.parse_qsl(urlparse.urlparse(url)[4])
kwargs.update(args)
munged_url = url.rsplit('?', 1)[0]
munged_url = munged_url.strip('/').replace('/', '_').replace('.', '_')
munged_url = munged_url.replace('-', '_')
callback = "%s_%s" % (method.lower(), munged_url)
if not hasattr(self, callback):
raise AssertionError('Called unknown API method: %s %s, '
'expected fakes method name: %s' %
(method, url, callback))
# Note the call
self.callstack.append((method, url, kwargs.get('body', None)))
if not hasattr(self, callback):
raise AssertionError('Called unknown API method: %s %s, '
'expected fakes method name: %s' %
(method, url, callback))
# Note the call
self.callstack.append((method, url, kwargs.get('body', None)))
status, body = getattr(self, callback)(**kwargs)
r = utils.TestResponse({
"status_code": status,
"text": body})
return r, body
#
# List all extensions
#
def post_tokens(self, **kw):
body = [
{"access":
{"token":
{"expires": "2012-02-05T00:00:00",
"id": "887665443383838",
"tenant":
{"id": "1",
"name": "customer-x"}},
"serviceCatalog": [
{"endpoints": [
{"adminURL": "http://swift.admin-nets.local:8080/",
"region": "RegionOne",
"internalURL": "http://127.0.0.1:8080/v1/AUTH_1",
"publicURL":
"http://swift.publicinternets.com/v1/AUTH_1"}],
"type": "object-store",
"name": "swift"},
{"endpoints": [
{"adminURL": "http://cdn.admin-nets.local/v1.1/1",
"region": "RegionOne",
"internalURL": "http://127.0.0.1:7777/v1.1/1",
"publicURL":
"http://cdn.publicinternets.com/v1.1/1"}],
"type": "object-store",
"name": "cdn"}],
"user":
{"id": "1",
"roles": [
{"tenantId": "1",
"id": "3",
"name": "Member"}],
"name": "joeuser"}}
}
]
return (200, body)
def get_tokens_887665443383838(self, **kw):
body = [
{"access":
{"token":
{"expires": "2012-02-05T00:00:00",
"id": "887665443383838",
"tenant": {"id": "1",
"name": "customer-x"}},
"user":
{"name": "joeuser",
"tenantName": "customer-x",
"id": "1",
"roles": [
{"serviceId": "1",
"id": "3",
"name": "Member"}],
"tenantId": "1"}}
}
]
return (200, body)
def get_tokens_887665443383838_endpoints(self, **kw):
body = [
{"endpoints_links": [
{"href":
"http://127.0.0.1:35357/tokens/887665443383838"
"/endpoints?'marker=5&limit=10'",
"rel": "next"}],
"endpoints": [
{"internalURL": "http://127.0.0.1:8080/v1/AUTH_1",
"name": "swift",
"adminURL": "http://swift.admin-nets.local:8080/",
"region": "RegionOne",
"tenantId": 1,
"type": "object-store",
"id": 1,
"publicURL": "http://swift.publicinternets.com/v1/AUTH_1"},
{"internalURL": "http://localhost:8774/v1.0",
"name": "nova_compat",
"adminURL": "http://127.0.0.1:8774/v1.0",
"region": "RegionOne",
"tenantId": 1,
"type": "compute",
"id": 2,
"publicURL": "http://nova.publicinternets.com/v1.0/"},
{"internalURL": "http://localhost:8774/v1.1",
"name": "nova",
"adminURL": "http://127.0.0.1:8774/v1.1",
"region": "RegionOne",
"tenantId": 1,
"type": "compute",
"id": 3,
"publicURL": "http://nova.publicinternets.com/v1.1/"},
{"internalURL": "http://127.0.0.1:9292/v1.1/",
"name": "glance",
"adminURL": "http://nova.admin-nets.local/v1.1/",
"region": "RegionOne",
"tenantId": 1,
"type": "image",
"id": 4,
"publicURL": "http://glance.publicinternets.com/v1.1/"},
{"internalURL": "http://127.0.0.1:7777/v1.1/1",
"name": "cdn",
"adminURL": "http://cdn.admin-nets.local/v1.1/1",
"region": "RegionOne",
"tenantId": 1,
"versionId": "1.1",
"versionList": "http://127.0.0.1:7777/",
"versionInfo": "http://127.0.0.1:7777/v1.1",
"type": "object-store",
"id": 5,
"publicURL": "http://cdn.publicinternets.com/v1.1/1"}]
}
]
return (200, body)
def get(self, **kw):
body = {
"version": {
"id": "v2.0",
"status": "beta",
"updated": "2011-11-19T00:00:00Z",
"links": [
{"rel": "self",
"href": "http://127.0.0.1:35357/v2.0/"},
{"rel": "describedby",
"type": "text/html",
"href": "http://docs.openstack.org/"
"api/openstack-identity-service/2.0/content/"},
{"rel": "describedby",
"type": "application/pdf",
"href": "http://docs.openstack.org/api/"
"openstack-identity-service/2.0/identity-dev-guide-2.0.pdf"},
{"rel": "describedby",
"type": "application/vnd.sun.wadl+xml",
"href": "http://127.0.0.1:35357/v2.0/identity-admin.wadl"}],
"media-types": [
{"base": "application/xml",
"type": "application/vnd.openstack.identity-v2.0+xml"},
{"base": "application/json",
"type": "application/vnd.openstack.identity-v2.0+json"}]
}
}
return (200, body)
def get_extensions(self, **kw):
body = {
"extensions": {"values": []}
}
return (200, body)
def post_tenants(self, **kw):
body = {"tenant":
{"enabled": True,
"description": None,
"name": "new-tenant",
"id": "1"}}
return (200, body)
def post_tenants_2(self, **kw):
body = {"tenant":
{"enabled": False,
"description": "desc",
"name": "new-tenant1",
"id": "2"}}
return (200, body)
def get_tenants(self, **kw):
body = {
"tenants_links": [],
"tenants": [
{"enabled": False,
"description": None,
"name": "project-y",
"id": "1"},
{"enabled": True,
"description": None,
"name": "new-tenant",
"id": "2"},
{"enabled": True,
"description": None,
"name": "customer-x",
"id": "1"}]
}
return (200, body)
def get_tenants_1(self, **kw):
body = {"tenant":
{"enabled": True,
"description": None,
"name": "new-tenant",
"id": "1"}}
return (200, body)
def get_tenants_2(self, **kw):
body = {"tenant":
{"enabled": True,
"description": None,
"name": "new-tenant",
"id": "2"}}
return (200, body)
def delete_tenants_2(self, **kw):
body = {}
return (200, body)
def get_tenants_1_users_1_roles(self, **kw):
body = {
"roles": [
{"id": "1",
"name": "Admin"},
{"id": "2",
"name": "Member"},
{"id": "3",
"name": "new-role"}]
}
return (200, body)
def put_users_1_roles_OS_KSADM_1(self, **kw):
body = {
"roles":
{"id": "1",
"name": "Admin"}}
return (200, body)
def delete_users_1_roles_OS_KSADM_1(self, **kw):
body = {}
return (200, body)
def put_tenants_1_users_1_roles_OS_KSADM_1(self, **kw):
body = {
"role":
{"id": "1",
"name": "Admin"}}
return (200, body)
def get_users(self, **kw):
body = {
"users": [
{"name": self.username,
"enabled": "true",
"email": "sdfsdf@sdfsd.sdf",
"id": "1",
"tenantId": "1"},
{"name": "user2",
"enabled": "true",
"email": "sdfsdf@sdfsd.sdf",
"id": "2",
"tenantId": "1"}]
}
return (200, body)
def get_users_1(self, **kw):
body = {
"user": {
"tenantId": "1",
"enabled": "true",
"id": "1",
"name": self.username}
}
return (200, body)
def put_users_1(self, **kw):
body = {
"user": {
"tenantId": "1",
"enabled": "true",
"id": "1",
"name": "new-user1",
"email": "user@email.com"}
}
return (200, body)
def put_users_1_OS_KSADM_password(self, **kw):
body = {
"user": {
"tenantId": "1",
"enabled": "true",
"id": "1",
"name": "new-user1",
"email": "user@email.com"}
}
return (200, body)
def post_users(self, **kw):
body = {
"user": {
"tenantId": "1",
"enabled": "true",
"id": "1",
"name": self.username}
}
return (200, body)
def delete_users_1(self, **kw):
body = []
return (200, body)
def get_users_1_roles(self, **kw):
body = [
{"roles_links": [],
"roles":[
{"id": "2",
"name": "KeystoneServiceAdmin"}]
}
]
return (200, body)
def post_OS_KSADM_roles(self, **kw):
body = {"role":
{"name": "new-role",
"id": "1"}}
return (200, body)
def post_OS_KSADM_roles(self, **kw):
body = {"role":
{"name": "new-role",
"id": "1"}}
return (200, body)
def get_OS_KSADM_roles(self, **kw):
body = {"roles": [
{"id": "10", "name": "admin"},
{"id": "20", "name": "member"},
{"id": "1", "name": "new-role"}]
}
return (200, body)
def get_OS_KSADM_roles_1(self, **kw):
body = {"role":
{"name": "new-role",
"id": "1"}
}
return (200, body)
def delete_OS_KSADM_roles_1(self, **kw):
body = {}
return (200, body)
def post_OS_KSADM_services(self, **kw):
body = {"OS-KSADM:service":
{"id": "1",
"type": "compute",
"name": "service1",
"description": None}
}
return (200, body)
def get_OS_KSADM_services_1(self, **kw):
body = {"OS-KSADM:service":
{"description": None,
"type": "compute",
"id": "1",
"name": "service1"}
}
return (200, body)
def get_OS_KSADM_services(self, **kw):
body = {
"OS-KSADM:services": [
{"description": None,
"type": "compute",
"id": "1",
"name": "service1"},
{"description": None,
"type": "identity",
"id": "2",
"name": "service2"}]
}
return (200, body)
def delete_OS_KSADM_services_1(self, **kw):
body = {}
return (200, body)
def post_users_1_credentials_OS_EC2(self, **kw):
body = {"credential":
{"access": "1",
"tenant_id": "1",
"secret": "1",
"user_id": "1"}
}
return (200, body)
def get_users_1_credentials_OS_EC2(self, **kw):
body = {"credentials": [
{"access": "1",
"tenant_id": "1",
"secret": "1",
"user_id": "1"}]
}
return (200, body)
def get_users_1_credentials_OS_EC2_2(self, **kw):
body = {
"credential":
{"access": "2",
"tenant_id": "1",
"secret": "1",
"user_id": "1"}
}
return (200, body)
def delete_users_1_credentials_OS_EC2_2(self, **kw):
body = {}
return (200, body)
def patch_OS_KSCRUD_users_1(self, **kw):
body = {}
return (200, body)
def get_endpoints(self, **kw):
body = {
'endpoints': [
{'adminURL': 'http://cdn.admin-nets.local/v1.1/1',
'region': 'RegionOne',
'internalURL': 'http://127.0.0.1:7777/v1.1/1',
'publicURL': 'http://cdn.publicinternets.com/v1.1/1'}],
'type': 'compute',
'name': 'nova-compute'
}
return (200, body)
def post_endpoints(self, **kw):
body = {
"endpoint":
{"adminURL": "http://swift.admin-nets.local:8080/",
"region": "RegionOne",
"internalURL": "http://127.0.0.1:8080/v1/AUTH_1",
"publicURL": "http://swift.publicinternets.com/v1/AUTH_1"},
"type": "compute",
"name": "nova-compute"
}
return (200, body)

325
tests/v2_0/test_shell.py Normal file
View File

@@ -0,0 +1,325 @@
import cStringIO
import mock
import os
import stubout
import sys
from testtools import matchers
from keystoneclient import exceptions
from keystoneclient import client
from keystoneclient.v2_0 import client as client_v2_0
from tests.v2_0 import fakes
from tests import utils
DEFAULT_USERNAME = 'username'
DEFAULT_PASSWORD = 'password'
DEFAULT_TENANT_ID = 'tenant_id'
DEFAULT_TENANT_NAME = 'tenant_name'
DEFAULT_AUTH_URL = 'http://127.0.0.1:5000/v2.0/'
class ShellTests(utils.TestCase):
def setUp(self):
"""Patch os.environ to avoid required auth info."""
super(ShellTests, self).setUp()
self.stubs = stubout.StubOutForTesting()
self.fake_client = fakes.FakeHTTPClient()
self.stubs.Set(
client.HTTPClient, "_cs_request",
lambda ign_self, *args, **kwargs:
self.fake_client._cs_request(*args, **kwargs))
self.stubs.Set(
client.HTTPClient, "authenticate",
lambda cl_obj:
self.fake_client.authenticate(cl_obj))
self.old_environment = os.environ.copy()
os.environ = {
'OS_USERNAME': DEFAULT_USERNAME,
'OS_PASSWORD': DEFAULT_PASSWORD,
'OS_TENANT_ID': DEFAULT_TENANT_ID,
'OS_TENANT_NAME': DEFAULT_TENANT_NAME,
'OS_AUTH_URL': DEFAULT_AUTH_URL,
}
import keystoneclient.shell
self.shell = keystoneclient.shell.OpenStackIdentityShell()
def tearDown(self):
self.stubs.UnsetAll()
self.stubs.SmartUnsetAll()
os.environ = self.old_environment
self.fake_client.clear_callstack()
super(ShellTests, self).tearDown()
def run_command(self, cmd):
orig = sys.stdout
try:
sys.stdout = cStringIO.StringIO()
self.shell.main(cmd.split())
except SystemExit:
exc_type, exc_value, exc_traceback = sys.exc_info()
self.assertEqual(exc_value.code, 0)
finally:
out = sys.stdout.getvalue()
sys.stdout.close()
sys.stdout = orig
return out
def assert_called(self, method, url, body=None, **kwargs):
return self.fake_client.assert_called(method, url, body, **kwargs)
def assert_called_anytime(self, method, url, body=None):
return self.fake_client.assert_called_anytime(method, url, body)
def test_user_list(self):
self.run_command('user-list')
self.fake_client.assert_called_anytime('GET', '/users')
def test_user_create(self):
self.run_command('user-create --name new-user')
self.fake_client.assert_called_anytime(
'POST', '/users',
{'user':
{'email': None,
'password': None,
'enabled': True,
'name': 'new-user',
'tenantId': None}})
def test_user_get(self):
self.run_command('user-get 1')
self.fake_client.assert_called_anytime('GET', '/users/1')
def test_user_delete(self):
self.run_command('user-delete 1')
self.fake_client.assert_called_anytime('DELETE', '/users/1')
def test_user_password_update(self):
self.run_command('user-password-update --pass newpass 1')
self.fake_client.assert_called_anytime(
'PUT', '/users/1/OS-KSADM/password')
def test_user_update(self):
self.run_command('user-update --name new-user1'
' --email user@email.com --enabled true 1')
self.fake_client.assert_called_anytime(
'PUT', '/users/1',
{'user':
{'id': '1',
'email': 'user@email.com',
'enabled': True,
'name': 'new-user1'}
})
required = 'User not updated, no arguments present.'
out = self.run_command('user-update 1')
self.assertThat(out, matchers.MatchesRegex(required))
def test_role_create(self):
self.run_command('role-create --name new-role')
self.fake_client.assert_called_anytime(
'POST', '/OS-KSADM/roles',
{"role": {"name": "new-role"}})
def test_role_get(self):
self.run_command('role-get 1')
self.fake_client.assert_called_anytime('GET', '/OS-KSADM/roles/1')
def test_role_list(self):
self.run_command('role-list')
self.fake_client.assert_called_anytime('GET', '/OS-KSADM/roles')
def test_role_delete(self):
self.run_command('role-delete 1')
self.fake_client.assert_called_anytime('DELETE', '/OS-KSADM/roles/1')
def test_user_role_add(self):
self.run_command('user-role-add --user_id 1 --role_id 1')
self.fake_client.assert_called_anytime(
'PUT', '/users/1/roles/OS-KSADM/1')
def test_user_role_list(self):
self.run_command('user-role-list --user_id 1 --tenant-id 1')
self.fake_client.assert_called_anytime(
'GET', '/tenants/1/users/1/roles')
self.run_command('user-role-list --user_id 1')
self.fake_client.assert_called_anytime(
'GET', '/tenants/1/users/1/roles')
self.run_command('user-role-list')
self.fake_client.assert_called_anytime(
'GET', '/tenants/1/users/1/roles')
def test_user_role_remove(self):
self.run_command('user-role-remove --user_id 1 --role_id 1')
self.fake_client.assert_called_anytime(
'DELETE', '/users/1/roles/OS-KSADM/1')
def test_tenant_create(self):
self.run_command('tenant-create --name new-tenant')
self.fake_client.assert_called_anytime(
'POST', '/tenants',
{"tenant": {"enabled": True,
"name": "new-tenant",
"description": None}})
def test_tenant_get(self):
self.run_command('tenant-get 2')
self.fake_client.assert_called_anytime('GET', '/tenants/2')
def test_tenant_list(self):
self.run_command('tenant-list')
self.fake_client.assert_called_anytime('GET', '/tenants')
def test_tenant_update(self):
self.run_command('tenant-update'
' --name new-tenant1 --enabled false'
' --description desc 2')
self.fake_client.assert_called_anytime(
'POST', '/tenants/2',
{"tenant":
{"enabled": False,
"id": "2",
"description": "desc",
"name": "new-tenant1"}})
required = 'Tenant not updated, no arguments present.'
out = self.run_command('tenant-update 1')
self.assertThat(out, matchers.MatchesRegex(required))
def test_tenant_delete(self):
self.run_command('tenant-delete 2')
self.fake_client.assert_called_anytime('DELETE', '/tenants/2')
def test_service_create(self):
self.run_command('service-create --name service1 --type compute')
self.fake_client.assert_called_anytime(
'POST', '/OS-KSADM/services',
{"OS-KSADM:service":
{"type": "compute",
"name": "service1",
"description": None}})
def test_service_get(self):
self.run_command('service-get 1')
self.fake_client.assert_called_anytime('GET', '/OS-KSADM/services/1')
def test_service_list(self):
self.run_command('service-list')
self.fake_client.assert_called_anytime('GET', '/OS-KSADM/services')
def test_service_delete(self):
self.run_command('service-delete 1')
self.fake_client.assert_called_anytime(
'DELETE', '/OS-KSADM/services/1')
def test_catalog(self):
self.run_command('catalog')
self.run_command('catalog --service compute')
def test_ec2_credentials_create(self):
self.run_command('ec2-credentials-create'
' --tenant-id 1 --user-id 1')
self.fake_client.assert_called_anytime(
'POST', '/users/1/credentials/OS-EC2',
{'tenant_id': '1'})
self.run_command('ec2-credentials-create --tenant-id 1')
self.fake_client.assert_called_anytime(
'POST', '/users/1/credentials/OS-EC2',
{'tenant_id': '1'})
self.run_command('ec2-credentials-create')
self.fake_client.assert_called_anytime(
'POST', '/users/1/credentials/OS-EC2',
{'tenant_id': '1'})
def test_ec2_credentials_delete(self):
self.run_command('ec2-credentials-delete --access 2 --user-id 1')
self.fake_client.assert_called_anytime(
'DELETE', '/users/1/credentials/OS-EC2/2')
self.run_command('ec2-credentials-delete --access 2')
self.fake_client.assert_called_anytime(
'DELETE', '/users/1/credentials/OS-EC2/2')
def test_ec2_credentials_list(self):
self.run_command('ec2-credentials-list --user-id 1')
self.fake_client.assert_called_anytime(
'GET', '/users/1/credentials/OS-EC2')
self.run_command('ec2-credentials-list')
self.fake_client.assert_called_anytime(
'GET', '/users/1/credentials/OS-EC2')
def test_ec2_credentials_get(self):
self.run_command('ec2-credentials-get --access 2 --user-id 1')
self.fake_client.assert_called_anytime(
'GET', '/users/1/credentials/OS-EC2/2')
def test_bootstrap(self):
self.run_command('bootstrap --user-name new-user'
' --pass 1 --role-name admin'
' --tenant-name new-tenant')
self.fake_client.assert_called_anytime(
'POST', '/users',
{'user':
{'email': None,
'password': '1',
'enabled': True,
'name': 'new-user',
'tenantId': None}})
self.run_command('bootstrap --user-name new-user'
' --pass 1 --role-name admin'
' --tenant-name new-tenant')
self.fake_client.assert_called_anytime(
'POST', '/tenants',
{"tenant": {"enabled": True,
"name": "new-tenant",
"description": None}})
self.run_command('bootstrap --user-name new-user'
' --pass 1 --role-name new-role'
' --tenant-name new-tenant')
self.fake_client.assert_called_anytime(
'POST', '/OS-KSADM/roles',
{"role": {"name": "new-role"}})
self.run_command('bootstrap --user-name'
' new-user --pass 1 --role-name admin'
' --tenant-name new-tenant')
self.fake_client.assert_called_anytime(
'PUT', '/tenants/1/users/1/roles/OS-KSADM/1')
def test_bash_completion(self):
self.run_command('bash-completion')
def test_help(self):
out = self.run_command('help')
required = 'usage: keystone'
self.assertThat(out, matchers.MatchesRegex(required))
def test_password_update(self):
self.run_command('password-update --current-password oldpass'
' --new-password newpass')
self.fake_client.assert_called_anytime(
'PATCH', '/OS-KSCRUD/users/1',
{'user':
{'original_password': 'oldpass',
'password': 'newpass'}})
def test_endpoint_create(self):
self.run_command('endpoint-create --service-id 1')
self.fake_client.assert_called_anytime(
'POST', '/endpoints',
{'endpoint':
{'adminurl': None,
'service_id': '1',
'region': 'regionOne',
'internalurl': None,
'publicurl': None}})
def test_endpoint_list(self):
self.run_command('endpoint-list')
self.fake_client.assert_called_anytime('GET', '/endpoints')