apiclient support for keystone v2.0 and v3

The apiclient now supports authentication using keystone versions 2.0 and 3

Authentication methods can be password or token.

Implements blueprint apiclient-keystone-v3-support

Change-Id: Ie09886e8f07aeb0a54fdf6f802bdd7fbf33aa253
This commit is contained in:
Fabrizio Vanni 2015-07-23 19:39:04 +01:00
parent 7cb039cc6c
commit 1649deff31
8 changed files with 387 additions and 341 deletions

View File

@ -17,11 +17,17 @@ This product includes cryptographic software written by Eric Young
(eay@cryptsoft.com). This product includes software written by Tim
Hudson (tjh@cryptsoft.com).
========================================================================
client interface to the Freezer API
"""
import argparse
import os
import socket
from openstackclient.identity import client as os_client
from keystoneclient.auth.identity import v2
from keystoneclient.auth.identity import v3
from keystoneclient import session as ksc_session
from backups import BackupsManager
from registration import RegistrationManager
@ -29,7 +35,16 @@ from jobs import JobManager
from actions import ActionManager
from sessions import SessionManager
import exceptions
FREEZER_SERVICE_TYPE = 'backup'
def env(*vars, **kwargs):
for v in vars:
value = os.environ.get(v, None)
if value:
return value
return kwargs.get('default', '')
class cached_property(object):
@ -45,24 +60,145 @@ class cached_property(object):
return value
class Client(object):
def build_os_option_parser(parser):
parser.add_argument(
'--os-username', action='store',
help=('Name used for authentication with the OpenStack '
'Identity service. Defaults to env[OS_USERNAME].'),
dest='os_username', default=env('OS_USERNAME'))
parser.add_argument(
'--os-password', action='store',
help=('Password used for authentication with the OpenStack '
'Identity service. Defaults to env[OS_PASSWORD].'),
dest='os_password', default=env('OS_PASSWORD'))
parser.add_argument(
'--os-project-name', action='store',
help=('Project name to scope to. Defaults to '
'env[OS_PROJECT_NAME].'),
dest='os_project_name',
default=env('OS_PROJECT_NAME', default='default'))
parser.add_argument(
'--os-project-domain-name', action='store',
help=('Domain name containing project. Defaults to '
'env[OS_PROJECT_DOMAIN_NAME].'),
dest='os_project_domain_name', default=env('OS_PROJECT_DOMAIN_NAME',
default='default'))
parser.add_argument(
'--os-user-domain-name', action='store',
help=('User\'s domain name. Defaults to '
'env[OS_USER_DOMAIN_NAME].'),
dest='os_user_domain_name', default=env('OS_USER_DOMAIN_NAME',
default='default'))
parser.add_argument(
'--os-tenant-name', action='store',
help=('Tenant to request authorization on. Defaults to '
'env[OS_TENANT_NAME].'),
dest='os_tenant_name', default=env('OS_TENANT_NAME'))
parser.add_argument(
'--os-tenant-id', action='store',
help=('Tenant to request authorization on. Defaults to '
'env[OS_TENANT_ID].'),
dest='os_tenant_id', default=env('OS_TENANT_ID'))
parser.add_argument(
'--os-auth-url', action='store',
help=('Specify the Identity endpoint to use for '
'authentication. Defaults to env[OS_AUTH_URL].'),
dest='os_auth_url', default=env('OS_AUTH_URL'))
parser.add_argument(
'--os-backup-url', action='store',
help=('Specify the Freezer backup service endpoint to use. '
'Defaults to env[OS_BACKUP_URL].'),
dest='os_backup_url', default=env('OS_BACKUP_URL'))
parser.add_argument(
'--os-region-name', action='store',
help=('Specify the region to use. Defaults to '
'env[OS_REGION_NAME].'),
dest='os_region_name', default=env('OS_REGION_NAME'))
parser.add_argument(
'--os-token', action='store',
help=('Specify an existing token to use instead of retrieving'
' one via authentication (e.g. with username & password). '
'Defaults to env[OS_TOKEN].'),
dest='os_token', default=env('OS_TOKEN'))
parser.add_argument(
'--os-identity-api-version', action='store',
help=('Identity API version: 2.0 or 3. '
'Defaults to env[OS_IDENTITY_API_VERSION]'),
dest='os_identity_api_version',
default=env('OS_IDENTITY_API_VERSION'))
return parser
def __init__(self, version='1',
def guess_auth_version(opts):
if opts.os_identity_api_version == '3':
return '3'
elif opts.os_identity_api_version == '2.0':
return '2.0'
elif opts.os_auth_url.endswith('v3'):
return '3'
elif opts.os_auth_url.endswith('v2.0'):
return '2.0'
return None
def get_auth_plugin(opts):
auth_version = guess_auth_version(opts)
if opts.os_username:
if auth_version == '3':
return v3.Password(auth_url=opts.os_auth_url,
username=opts.os_username,
password=opts.os_password,
project_name=opts.os_project_name,
user_domain_name=opts.os_user_domain_name,
project_domain_name=opts.os_project_domain_name)
elif auth_version == '2.0':
return v2.Password(auth_url=opts.os_auth_url,
username=opts.os_username,
password=opts.os_password,
tenant_name=opts.os_tenant_name)
elif opts.os_token:
if auth_version == '3':
return v3.Token(auth_url=opts.os_auth_url,
token=opts.os_token,
project_name=opts.os_project_name,
project_domain_name=opts.os_project_domain_name)
elif auth_version == '2.0':
return v2.Token(auth_url=opts.os_auth_url,
token=opts.os_token,
tenant_name=opts.os_tenant_name)
raise Exception('Unable to determine correct auth method')
class Client(object):
def __init__(self,
version='1',
token=None,
username=None,
password=None,
tenant_name=None,
auth_url=None,
session=None,
endpoint=None):
endpoint=None,
opts=None):
self.opts = opts or build_os_option_parser(
argparse.ArgumentParser(description='Freezer Client')
).parse_args()
if token:
self.opts.os_token = token
if username:
self.opts.os_username = username
if password:
self.opts.os_password = password
if tenant_name:
self.opts.os_tenant_name = tenant_name
if auth_url:
self.opts.os_auth_url = auth_url
if endpoint:
self.opts.os_backup_url = endpoint
self._session = session
self.version = version
self.token = token
self.username = username
self.tenant_name = tenant_name
self.password = password
self.auth_url = auth_url
self._endpoint = endpoint
self.session = session
self.backups = BackupsManager(self)
self.registration = RegistrationManager(self)
self.jobs = JobManager(self)
@ -70,52 +206,29 @@ class Client(object):
self.sessions = SessionManager(self)
@cached_property
def endpoint(self):
if self._endpoint:
return self._endpoint
services = self.auth.services.list()
try:
freezer_service = next(x for x in services if x.name == 'freezer')
except:
raise exceptions.ApiClientException(
'freezer service not found in services list')
endpoints = self.auth.endpoints.list()
try:
freezer_endpoint =\
next(x for x in endpoints
if x.service_id == freezer_service.id)
except:
raise exceptions.ApiClientException(
'freezer endpoint not found in endpoint list')
return freezer_endpoint.publicurl
def session(self):
if self._session:
return self._session
auth_plugin = get_auth_plugin(self.opts)
return ksc_session.Session(auth=auth_plugin)
@cached_property
def auth(self):
if self.username and self.password:
_auth = os_client.IdentityClientv2(
auth_url=self.auth_url,
username=self.username,
password=self.password,
tenant_name=self.tenant_name)
elif self.token:
_auth = os_client.IdentityClientv2(
endpoint=self.auth_url,
token=self.token)
def endpoint(self):
if self.opts.os_backup_url:
return self.opts.os_backup_url
else:
raise exceptions.ApiClientException("Missing auth credentials")
return _auth
auth_ref = self.session.auth.get_auth_ref(self.session)
endpoint = auth_ref.service_catalog.url_for(
service_type=FREEZER_SERVICE_TYPE,
endpoint_type='public',
)
return endpoint
@property
def auth_token(self):
return self.auth.auth_token
def api_exists(self):
try:
if self.endpoint is not None:
return True
except:
return False
return self.session.get_token()
@cached_property
def client_id(self):
return '{0}_{1}'.format(self.auth.project_id, socket.gethostname())
return '{0}_{1}'.format(self.session.get_project_id(),
socket.gethostname())

View File

@ -20,151 +20,82 @@ Hudson (tjh@cryptsoft.com).
"""
import argparse
from prettytable import PrettyTable
import os
SCHEDULER_CONF_D = '/etc/freezer/scheduler/conf.d'
from freezer.apiclient import client as api_client
DEFAULT_FREEZER_SCHEDULER_CONF_D = '/etc/freezer/scheduler/conf.d'
class OpenstackOptions(object):
def __init__(self, args, default_dict={}):
self.username = args.os_username or\
default_dict.get('OS_USERNAME', None)
self.tenant_name = args.os_tenant_name or\
default_dict.get('OS_TENANT_NAME', None)
self.auth_url = args.os_auth_url or\
default_dict.get('OS_AUTH_URL', None)
self.password = args.os_password or\
default_dict.get('OS_PASSWORD', None)
self.tenant_id = args.os_tenant_id or\
default_dict.get('OS_TENANT_ID', None)
self.region_name = args.os_region_name or\
default_dict.get('OS_REGION_NAME', None)
self.endpoint = args.os_endpoint or\
default_dict.get('OS_SERVICE_ENDPOINT', None)
if not self.is_valid():
raise Exception('ERROR: OS Options not valid: {0}'.
format(self.reason()))
def __str__(self):
table = PrettyTable(["variable", "value"])
table.add_row(['username', self.username])
table.add_row(['tenant_name', self.tenant_name])
table.add_row(['auth_url', self.auth_url])
table.add_row(['password', self.password])
table.add_row(['tenant_id', self.tenant_id])
table.add_row(['region_name', self.region_name])
table.add_row(['endpoint', self.endpoint])
return table.__str__()
def is_valid(self):
if self.reason():
return False
return True
def reason(self):
missing = []
for attr in ['username', 'password', 'tenant_name', 'region_name']:
if not self.__getattribute__(attr):
missing.append(attr)
if missing:
return 'missing {0}'.format(', '.join(missing))
return ''
def get_args(choices):
arg_parser = argparse.ArgumentParser()
arg_parser.add_argument(
'action', action='store', default=None, choices=choices, help='')
arg_parser.add_argument(
'--debug', action='store_true',
help='Prints debugging output onto the console, this may include '
'OS environment variables, request and response calls. '
'Helpful for debugging and understanding the API calls.',
dest='debug', default=False)
arg_parser.add_argument(
def base_parser(parser):
scheduler_conf_d = os.environ.get('FREEZER_SCHEDULER_CONF_D',
DEFAULT_FREEZER_SCHEDULER_CONF_D)
parser.add_argument(
'-j', '--job', action='store',
help=('name or ID of the job'),
dest='job_id', default=None)
arg_parser.add_argument(
parser.add_argument(
'-s', '--session', action='store',
help=('name or ID of the session'),
dest='session_id', default=None)
arg_parser.add_argument(
parser.add_argument(
'--file', action='store',
help=('Local file that contains the resource '
'to be uploaded/downloaded'),
dest='fname', default=None)
arg_parser.add_argument(
'--os-endpoint', action='store',
help=('Specify an endpoint to use instead of retrieving '
'one from the service catalog (via authentication). '
'Defaults to env[OS_SERVICE_ENDPOINT]'),
dest='os_endpoint', default=None)
arg_parser.add_argument(
'--os-username', action='store',
help=('Name used for authentication with the OpenStack '
'Identity service. Defaults to env[OS_USERNAME].'),
dest='os_username', default=None)
arg_parser.add_argument(
'--os-password', action='store',
help=('Password used for authentication with the OpenStack '
'Identity service. Defaults to env[OS_PASSWORD].'),
dest='os_password', default=None)
arg_parser.add_argument(
'--os-tenant-name', action='store',
help=('Tenant to request authorization on. Defaults to '
'env[OS_TENANT_NAME].'),
dest='os_tenant_name', default=None)
arg_parser.add_argument(
'--os-tenant-id', action='store',
help=('Tenant to request authorization on. Defaults to '
'env[OS_TENANT_ID].'),
dest='os_tenant_id', default=None)
arg_parser.add_argument(
'--os-auth-url', action='store',
help=('Specify the Identity endpoint to use for '
'authentication. Defaults to env[OS_AUTH_URL].'),
dest='os_auth_url', default=None)
arg_parser.add_argument(
'--os-region-name', action='store',
help=('Specify the region to use. Defaults to '
'env[OS_REGION_NAME].'),
dest='os_region_name', default=None)
arg_parser.add_argument(
'--os-token', action='store',
help=('Specify an existing token to use instead of retrieving'
' one via authentication (e.g. with username & password). '
'Defaults to env[OS_SERVICE_TOKEN].'),
dest='os_token', default=None)
arg_parser.add_argument(
parser.add_argument(
'-c', '--client-id', action='store',
help=('Specifies the client_id used when contacting the service.'
'If not specified it will be automatically created'
'using the tenant-id and the machine hostname.'),
dest='client_id', default=None)
arg_parser.add_argument(
parser.add_argument(
'-n', '--no-api', action='store_true',
help='Prevents the scheduler from using the api service',
dest='no_api', default=False)
arg_parser.add_argument(
parser.add_argument(
'-a', '--active-only', action='store_true',
help='Filter only active jobs/session',
dest='active_only', default=False)
arg_parser.add_argument(
parser.add_argument(
'-f', '--conf', action='store',
help=('Used to store/retrieve files on local storage, including '
'those exchanged with the api service. '
'Default value is {0}'.format(SCHEDULER_CONF_D)),
dest='jobs_dir', default=SCHEDULER_CONF_D)
arg_parser.add_argument(
'Default value is {0} '
'(Env: FREEZER_SCHEDULER_CONF_D)'.format(scheduler_conf_d)),
dest='jobs_dir', default=scheduler_conf_d)
parser.add_argument(
'-i', '--interval', action='store',
help=('Specifies the api-polling interval in seconds.'
'Defaults to 60 seconds'),
dest='interval', default=60)
arg_parser.add_argument(
parser.add_argument(
'-v', '--verbose',
action='count',
dest='verbose_level',
default=1,
help='Increase verbosity of output. Can be repeated.',
)
parser.add_argument(
'--debug',
default=False,
action='store_true',
help='show tracebacks on errors',
)
parser.add_argument(
'-l', '--log-file', action='store',
help=('location of log file'),
dest='log_file', default=None)
return arg_parser.parse_args()
return parser
def get_args(choices):
parser = base_parser(
api_client.build_os_option_parser(
argparse.ArgumentParser(description='Freezer Scheduler')
))
parser.add_argument(
'action', action='store', default=None, choices=choices, help='')
return parser.parse_args()

View File

@ -22,11 +22,13 @@ Hudson (tjh@cryptsoft.com).
import logging
import os
import signal
import traceback
from tempfile import gettempdir
from time import sleep
from pep3143daemon import DaemonContext, PidFile
from pep3143daemon import DaemonContext, PidFile
from freezer.utils import create_dir
@ -122,7 +124,7 @@ class Daemon:
def no_api(self):
return False
def start(self, log_file=None):
def start(self, log_file=None, dump_stack_trace=False):
pidfile = PidFile(self.pid_fname)
with DaemonContext(pidfile=pidfile, signal_map=self.signal_map):
self.setup_logging(log_file)
@ -133,6 +135,8 @@ class Daemon:
self.daemonizable.start()
Daemon.exit_flag = True
except Exception as e:
if dump_stack_trace:
logging.error(traceback.format_exc(e))
logging.error('[*] Restarting daemonized procedure '
'after Fatal Error: {0}'.format(e))
sleep(10)

View File

@ -33,7 +33,6 @@ from freezer.apiclient import client
import arguments
import shell
import utils
from freezer.utils import create_dir
from daemon import Daemon
from scheduler_job import Job
@ -181,18 +180,12 @@ def main():
if args.action is None:
print "No action"
sys.exit(1)
return os.EX_DATAERR
apiclient = None
if args.no_api is False:
os_options = arguments.OpenstackOptions(args, os.environ)
if args.debug:
print os_options
apiclient = client.Client(username=os_options.username,
password=os_options.password,
tenant_name=os_options.tenant_name,
endpoint=os_options.endpoint,
auth_url=os_options.auth_url)
apiclient = client.Client(opts=args)
if args.client_id:
apiclient.client_id = args.client_id
@ -201,9 +194,7 @@ def main():
return doers[args.action](apiclient, args)
except Exception as e:
print ('ERROR {0}'.format(e))
return 1
create_dir(args.jobs_dir, do_log=False)
return os.EX_SOFTWARE
freezer_scheduler = FreezerScheduler(apiclient=apiclient,
interval=int(args.interval),

View File

@ -3,7 +3,6 @@ python-keystoneclient>=1.2.0,<1.4.0
python-cinderclient>=1.1.0,<1.2.0
python-glanceclient>=0.15.0,<0.18.0
python-novaclient>=2.22.0,<2.24.0
python-openstackclient>=1.0.3,<1.1.0
oslo.utils>=1.4.0,<1.5.0 # Apache-2.0
paramiko>=1.13.0

View File

@ -23,108 +23,157 @@ import unittest
from mock import Mock, patch
from freezer.apiclient import client
from freezer.apiclient import exceptions
class TestSupportFunctions(unittest.TestCase):
@patch('freezer.apiclient.client.os')
def test_env_return_env_var(self, mock_os):
mock_os.environ = {'TEST_ENV_VAR': 'qwerty'}
var = client.env('TEST_ENV_VAR')
self.assertEquals(var, 'qwerty')
@patch('freezer.apiclient.client.os')
def test_env_return_default(self, mock_os):
mock_os.environ = {}
var = client.env('TEST_ENV_VAR')
self.assertEquals(var, '')
@patch('freezer.apiclient.client.env')
def test_build_os_option_parser(self, mock_env):
mock_env.return_value = ''
mock_parser = Mock()
mock_parser._me = 'test12345'
retval = client.build_os_option_parser(mock_parser)
self.assertEquals(retval._me, 'test12345')
call_count = mock_parser.add_argument.call_count
self.assertGreater(call_count, 10)
def test_guess_auth_version_returns_none(self):
mock_opts = Mock()
mock_opts.os_identity_api_version = ''
mock_opts.os_auth_url = ''
self.assertIsNone(client.guess_auth_version(mock_opts))
def test_guess_auth_version_explicit_3(self):
mock_opts = Mock()
mock_opts.os_identity_api_version = '3'
self.assertEquals(client.guess_auth_version(mock_opts), '3')
def test_guess_auth_version_explicit_2(self):
mock_opts = Mock()
mock_opts.os_identity_api_version = '2.0'
self.assertEquals(client.guess_auth_version(mock_opts), '2.0')
def test_guess_auth_version_implicit_3(self):
mock_opts = Mock()
mock_opts.os_auth_url = 'http://whatever/v3'
self.assertEquals(client.guess_auth_version(mock_opts), '3')
def test_guess_auth_version_implicit_2(self):
mock_opts = Mock()
mock_opts.os_auth_url = 'http://whatever/v2.0'
self.assertEquals(client.guess_auth_version(mock_opts), '2.0')
@patch('freezer.apiclient.client.v3')
@patch('freezer.apiclient.client.v2')
def test_get_auth_plugin_v3_Password(self, mock_v2, mock_v3):
mock_opts = Mock()
mock_opts.os_identity_api_version = '3'
mock_opts.os_user_name = 'myuser'
mock_opts.os_token = ''
client.get_auth_plugin(mock_opts)
self.assertTrue(mock_v3.Password.called)
@patch('freezer.apiclient.client.v3')
@patch('freezer.apiclient.client.v2')
def test_get_auth_plugin_v3_Token(self, mock_v2, mock_v3):
mock_opts = Mock()
mock_opts.os_identity_api_version = '3'
mock_opts.os_username = ''
mock_opts.os_token = 'mytoken'
client.get_auth_plugin(mock_opts)
self.assertTrue(mock_v3.Token.called)
@patch('freezer.apiclient.client.v3')
@patch('freezer.apiclient.client.v2')
def test_get_auth_plugin_v2_Password(self, mock_v2, mock_v3):
mock_opts = Mock()
mock_opts.os_identity_api_version = '2.0'
mock_opts.os_user_name = 'myuser'
mock_opts.os_token = ''
client.get_auth_plugin(mock_opts)
self.assertTrue(mock_v2.Password.called)
@patch('freezer.apiclient.client.v3')
@patch('freezer.apiclient.client.v2')
def test_get_auth_plugin_v2_Token(self, mock_v2, mock_v3):
mock_opts = Mock()
mock_opts.os_identity_api_version = '2.0'
mock_opts.os_username = ''
mock_opts.os_token = 'mytoken'
client.get_auth_plugin(mock_opts)
self.assertTrue(mock_v2.Token.called)
@patch('freezer.apiclient.client.v3')
@patch('freezer.apiclient.client.v2')
def test_get_auth_plugin_raises_when_no_username_token(self, mock_v2, mock_v3):
mock_opts = Mock()
mock_opts.os_identity_api_version = '2.0'
mock_opts.os_username = ''
mock_opts.os_token = ''
self.assertRaises(Exception, client.get_auth_plugin, mock_opts)
class TestClientMock(unittest.TestCase):
def create_mock_endpoint(self, service_id):
m = Mock()
m.service_id = service_id
m.publicurl = 'http://frezerapiurl:9090'
return m
def create_mock_service(self, name, id):
m = Mock()
m.name = name
m.id = id
return m
def setUp(self):
mock_enpointlist_ok = [self.create_mock_endpoint('idqwerty'),
self.create_mock_endpoint('idfreak'),
self.create_mock_endpoint('blabla')]
mock_servicelist_ok = [self.create_mock_service(name='glance', id='idqwerty'),
self.create_mock_service(name='freezer', id='idfreak')]
self.mock_IdentityClientv2 = Mock()
self.mock_IdentityClientv2.endpoints.list.return_value = mock_enpointlist_ok
self.mock_IdentityClientv2.services.list.return_value = mock_servicelist_ok
self.mock_IdentityClientv2.project_id = 'project_mayhem'
@patch('freezer.apiclient.client.os_client')
def test_client_create_username(self, mock_os_client):
mock_os_client.IdentityClientv2.return_value = self.mock_IdentityClientv2
c = client.Client(username='myname',
password='mypasswd',
tenant_name='mytenant',
auth_url='http://whatever:35357/v2.0/')
@patch('freezer.apiclient.client.ksc_session')
@patch('freezer.apiclient.client.get_auth_plugin')
def test_client_new(self, mock_get_auth_plugin, mock_ksc_session):
c = client.Client(opts=Mock(), endpoint='blabla')
self.assertIsInstance(c, client.Client)
self.assertEqual(c.endpoint, 'http://frezerapiurl:9090')
@patch('freezer.apiclient.client.os_client')
def test_client_create_token(self, mock_os_client):
mock_os_client.IdentityClientv2.return_value = self.mock_IdentityClientv2
c = client.Client(token='mytoken',
auth_url='http://whatever:35357/v2.0/')
@patch('freezer.apiclient.client.ksc_session')
@patch('freezer.apiclient.client.get_auth_plugin')
def test_client_new_with_kwargs(self, mock_get_auth_plugin, mock_ksc_session):
kwargs = {'token': 'alpha',
'username': 'bravo',
'password': 'charlie',
'tenant_name': 'delta',
'auth_url': 'echo',
'session': 'foxtrot',
'endpoint': 'golf',
'version': 'hotel',
'opts': Mock()}
c = client.Client(**kwargs)
self.assertIsInstance(c, client.Client)
self.assertEqual(c.endpoint, 'http://frezerapiurl:9090')
self.assertEqual(c.opts.os_token, 'alpha')
self.assertEqual(c.opts.os_username, 'bravo')
self.assertEqual(c.opts.os_password, 'charlie')
self.assertEqual(c.opts.os_tenant_name, 'delta')
self.assertEqual(c.opts.os_auth_url, 'echo')
self.assertEqual(c._session, 'foxtrot')
self.assertEqual(c.session, 'foxtrot')
self.assertEqual(c.endpoint, 'golf')
self.assertEqual(c.version, 'hotel')
@patch('freezer.apiclient.client.os_client')
def test_client_create_assigns_endpoint(self, mock_os_client):
mock_os_client.IdentityClientv2.return_value = self.mock_IdentityClientv2
c = client.Client(username='myname',
password='mypasswd',
tenant_name='mytenant',
endpoint='http://caccadura:9999',
auth_url='http://whatever:35357/v2.0/')
@patch('freezer.apiclient.client.ksc_session')
@patch('freezer.apiclient.client.get_auth_plugin')
def test_get_token(self, mock_get_auth_plugin, mock_ksc_session):
mock_session = Mock()
mock_session.get_token.return_value = 'antaniX2'
c = client.Client(session=mock_session, endpoint='justtest', opts=Mock())
self.assertIsInstance(c, client.Client)
self.assertEqual(c.endpoint, 'http://caccadura:9999')
self.assertEquals(c.auth_token, 'antaniX2')
@patch('freezer.apiclient.client.os_client')
@patch('freezer.apiclient.client.socket')
def test_client_correctly_creates_client_id(self, mock_socket, mock_os_client):
mock_os_client.IdentityClientv2.return_value = self.mock_IdentityClientv2
c = client.Client(username='myname',
password='mypasswd',
tenant_name='mytenant',
endpoint='http://caccadura:9999',
auth_url='http://whatever:35357/v2.0/')
mock_socket.gethostname.return_value = 'tyler'
self.assertEqual(c.client_id, 'project_mayhem_tyler')
@patch('freezer.apiclient.client.os_client')
def test_client_error_no_credentials(self, mock_os_client):
mock_os_client.IdentityClientv2.return_value = self.mock_IdentityClientv2
self.assertRaises(exceptions.ApiClientException, client.Client, auth_url='http://whatever:35357/v2.0/')
@patch('freezer.apiclient.client.os_client')
def test_client_service_not_found(self, mock_os_client):
mock_servicelist_bad = [self.create_mock_service(name='glance', id='idqwerty'),
self.create_mock_service(name='spanishinquisition', id='idfreak')]
self.mock_IdentityClientv2.services.list.return_value = mock_servicelist_bad
mock_os_client.IdentityClientv2.return_value = self.mock_IdentityClientv2
self.assertRaises(exceptions.ApiClientException, client.Client, token='mytoken', auth_url='http://whatever:35357/v2.0/')
@patch('freezer.apiclient.client.os_client')
def test_client_endpoint_not_found(self, mock_os_client):
mock_enpointlist_bad = [self.create_mock_endpoint('idqwerty'),
self.create_mock_endpoint('idfiasco'),
self.create_mock_endpoint('blabla')]
self.mock_IdentityClientv2.endpoints.list.return_value = mock_enpointlist_bad
mock_os_client.IdentityClientv2.return_value = self.mock_IdentityClientv2
self.assertRaises(exceptions.ApiClientException, client.Client, token='mytoken', auth_url='http://whatever:35357/v2.0/')
@patch('freezer.apiclient.client.os_client')
def test_client_api_exists(self, mock_os_client):
mock_os_client.IdentityClientv2.return_value = self.mock_IdentityClientv2
c = client.Client(token='mytoken',
auth_url='http://whatever:35357/v2.0/')
self.assertTrue(c.api_exists())
@patch('freezer.apiclient.client.os_client')
def test_client_auth_token(self, mock_os_client):
self.mock_IdentityClientv2.auth_token = 'stotoken'
mock_os_client.IdentityClientv2.return_value = self.mock_IdentityClientv2
c = client.Client(token='mytoken',
auth_url='http://whatever:35357/v2.0/')
self.assertEqual(c.auth_token, 'stotoken')
@patch('freezer.apiclient.client.ksc_session')
@patch('freezer.apiclient.client.get_auth_plugin')
def test_get_client_id(self, mock_get_auth_plugin, mock_ksc_session, mock_socket):
mock_socket.gethostname.return_value = 'parmenide'
mock_session = Mock()
mock_session.get_project_id.return_value = 'H2O'
c = client.Client(session=mock_session, endpoint='justtest', opts=Mock())
self.assertIsInstance(c, client.Client)
self.assertEquals(c.client_id, 'H2O_parmenide')

View File

@ -0,0 +1,10 @@
import unittest
from freezer.apiclient import exceptions
class TestApiClientException(unittest.TestCase):
def test_get_message_from_response_string(self):
e = exceptions.ApiClientException('some error message')
self.assertEquals(e.message, 'some error message')

View File

@ -12,73 +12,22 @@ from mock import Mock, patch
from freezer.scheduler import arguments
class TestOpenstackOptions(unittest.TestCase):
class TestBaseParser(unittest.TestCase):
def setUp(self):
self.args = Mock()
self.args.os_username = 'janedoe'
self.args.os_tenant_name = 'hertenant'
self.args.os_auth_url = 'herauthurl'
self.args.os_password = 'herpassword'
self.args.os_tenant_id = 'hertenantid'
self.args.os_region_name = 'herregion'
self.args.os_endpoint = 'herpublicurl'
self.empty_args = Mock()
self.empty_args.os_username = ''
self.empty_args.os_tenant_name = ''
self.empty_args.os_auth_url = ''
self.empty_args.os_password = ''
self.empty_args.os_tenant_id = ''
self.empty_args.os_region_name = ''
self.empty_args.os_endpoint = ''
def test_returns_parser(self):
mock_parser = Mock()
mock_parser._me = 'test12345'
retval = arguments.base_parser(mock_parser)
self.assertEquals(retval._me, 'test12345')
self.env_dict = {
'OS_USERNAME': 'johndoe',
'OS_TENANT_NAME': 'histenant',
'OS_AUTH_URL': 'hisauthurl',
'OS_PASSWORD': 'hispassword',
'OS_TENANT_ID': 'histenantid',
'OS_REGION_NAME': 'hisregion',
'OS_SERVICE_ENDPOINT': 'hispublicurl'
}
def test_create_with_args_and_env(self):
os = arguments.OpenstackOptions(self.args, self.env_dict)
self.assertIsInstance(os, arguments.OpenstackOptions)
def test_create_with_empty_args_and_empty_env(self):
os = arguments.OpenstackOptions(self.empty_args, self.env_dict)
self.assertIsInstance(os, arguments.OpenstackOptions)
def test_create_with_args_and_empty_env(self):
os = arguments.OpenstackOptions(self.args, {})
self.assertIsInstance(os, arguments.OpenstackOptions)
def test_create_raises_Exception_when_missing_username(self):
self.args.os_username = ''
self.assertRaises(Exception, arguments.OpenstackOptions, self.args, {})
def test_create_raises_Exception_when_missing_p(self):
self.args.os_password = ''
self.assertRaises(Exception, arguments.OpenstackOptions, self.args, {})
def test_create_raises_Exception_when_missing_parameter(self):
self.args.os_username = ''
self.assertRaises(Exception, arguments.OpenstackOptions, self.args, {})
def test_str(self):
os = arguments.OpenstackOptions(self.args, self.env_dict)
s = str(os)
self.assertIsInstance(s, str)
class TestGetArgs(unittest.TestCase):
@patch('freezer.scheduler.arguments.argparse.ArgumentParser')
def test_get_args_calls_add_argument(self, mock_ArgumentParser):
mock_arg_parser = Mock()
mock_ArgumentParser.return_value = mock_arg_parser
call_count = mock_parser.add_argument.call_count
self.assertGreater(call_count, 10)
@patch('freezer.scheduler.arguments.base_parser')
def test_get_args_return_parsed_args(self, mock_base_parser):
mock_parser = Mock()
mock_parser.parse_args.return_value = 'pluto'
mock_base_parser.return_value = mock_parser
retval = arguments.get_args(['alpha', 'bravo'])
call_count = mock_arg_parser.add_argument.call_count
self.assertGreater(call_count, 15)
call_count = mock_parser.add_argument.call_count
self.assertGreater(call_count, 0)