More complete testing of the os plugins/utils (#24)
The change implements several fixes so that monitor stack can be run in OpenStack environments supporting both V2 and V3 authentication. All of the plugins now have a proper name lookup and will pull from a local cache instead of hammering the API on every request. The Local caching functionlaity used to be tied to the python shelve module. Due to issues with Python 3.5 and shelves the library was replaced with the diskcache lib which implements a caching interface supporting both py2.7+. Tests have been added in support of all additions. Tests have been added for the os_utils module which should bring it within ~98% of completeness. Signed-off-by: Kevin Carter <kevin.carter@rackspace.com>
This commit is contained in:
parent
79d23cd713
commit
775f034a78
|
@ -44,6 +44,8 @@ nosetests.xml
|
|||
coverage.xml
|
||||
*,cover
|
||||
.hypothesis/
|
||||
AUTHORS
|
||||
ChangeLog
|
||||
|
||||
# Translations
|
||||
*.mo
|
||||
|
|
|
@ -4,32 +4,32 @@
|
|||
# override whatever is needed within the local sections.
|
||||
|
||||
[DEFAULT]
|
||||
insecure = false
|
||||
auth_url = https://example.com:5000/v3
|
||||
# The verify option is for SSL. If your SSL certificate is not
|
||||
# valid set this option to false else omit it or set it true.
|
||||
insecure = true
|
||||
|
||||
auth_url = https://127.0.0.1:5000/v3
|
||||
|
||||
username = admin
|
||||
password = Secrete
|
||||
|
||||
[keystone]
|
||||
# NOTE(cloudnull):
|
||||
# When using keystone V3 you will need the .*domain_name configuration options.
|
||||
user_domain_name = default # This is required when Keystone V3 is being used
|
||||
project_domain_name = default # This is required when Keystone V3 is being used
|
||||
|
||||
[glance]
|
||||
# NOTE(cloudnull):
|
||||
# If you're using keystone V2 you will need the tenant_name option.
|
||||
tenant_name = admin # This is required when Keystone V2 is being used
|
||||
project_name = admin # This is required when Keystone V2 is being used
|
||||
|
||||
# NEVER Mix and match the options tenant name and domain_name options.
|
||||
# NEVER Mix and match the options tenant name and domain_name options withiin the same section.
|
||||
# You are be required to run either V2 or V3 as it pertains to this config.
|
||||
# If you provide both tenant_name and .*domain_name options at the same time
|
||||
# the plugins will fail API version negotiation.
|
||||
|
||||
username = admin
|
||||
password = Secrete
|
||||
# The verify option is for SSL. If your SSL certificate is not
|
||||
# valid set this option to false else omit it or set it true.
|
||||
verify = false
|
||||
|
||||
[keystone]
|
||||
|
||||
[glance]
|
||||
|
||||
[nova]
|
||||
project_name = nova
|
||||
|
||||
|
@ -40,7 +40,7 @@ project_name = nova
|
|||
[cinder]
|
||||
|
||||
[ironic]
|
||||
auth_url = https://example2.com:5000/v3
|
||||
auth_url = https://127.0.1.1:5000/v3
|
||||
project_name = ironic
|
||||
user_domain_name = users
|
||||
project_domain_name = projects
|
||||
|
|
|
@ -18,14 +18,15 @@ import socket
|
|||
|
||||
import click
|
||||
|
||||
from monitorstack import utils
|
||||
from monitorstack.cli import pass_context
|
||||
|
||||
|
||||
DOC = """Get metrics from a KVM hypervisor."""
|
||||
COMMAND = 'kvm'
|
||||
COMMAND_NAME = 'kvm'
|
||||
|
||||
|
||||
@click.command(COMMAND, short_help=DOC.split('\n')[0])
|
||||
@click.command(COMMAND_NAME, short_help=DOC.split('\n')[0])
|
||||
@pass_context
|
||||
def cli(ctx):
|
||||
"""Get metrics from a KVM hypervisor."""
|
||||
|
@ -60,7 +61,10 @@ def cli(ctx):
|
|||
|
||||
except Exception as exp:
|
||||
output['exit_code'] = 1
|
||||
output['message'] = 'kvm failed -- Error: {}'.format(exp)
|
||||
output['message'] = '{} failed -- {}'.format(
|
||||
COMMAND_NAME,
|
||||
utils.log_exception(exp=exp)
|
||||
)
|
||||
else:
|
||||
output['exit_code'] = 0
|
||||
output['message'] = 'kvm is ok'
|
||||
|
|
|
@ -53,7 +53,10 @@ def cli(ctx, config_file):
|
|||
variables[project.name] = int(limits['quota_set']['cores'])
|
||||
except Exception as exp:
|
||||
output['exit_code'] = 1
|
||||
output['message'] = '{} failed -- Error: {}'.format(COMMAND_NAME, exp)
|
||||
output['message'] = '{} failed -- {}'.format(
|
||||
COMMAND_NAME,
|
||||
utils.log_exception(exp=exp)
|
||||
)
|
||||
else:
|
||||
output['exit_code'] = 0
|
||||
output['message'] = '{} is ok'.format(COMMAND_NAME)
|
||||
|
|
|
@ -53,7 +53,10 @@ def cli(ctx, config_file):
|
|||
variables[project.name] = int(limits['quota_set']['instances'])
|
||||
except Exception as exp:
|
||||
output['exit_code'] = 1
|
||||
output['message'] = '{} failed -- Error: {}'.format(COMMAND_NAME, exp)
|
||||
output['message'] = '{} failed -- {}'.format(
|
||||
COMMAND_NAME,
|
||||
utils.log_exception(exp=exp)
|
||||
)
|
||||
else:
|
||||
output['exit_code'] = 0
|
||||
output['message'] = '{} is ok'.format(COMMAND_NAME)
|
||||
|
|
|
@ -53,7 +53,10 @@ def cli(ctx, config_file):
|
|||
variables[project.name] = int(limits['quota_set']['ram'])
|
||||
except Exception as exp:
|
||||
output['exit_code'] = 1
|
||||
output['message'] = '{} failed -- Error: {}'.format(COMMAND_NAME, exp)
|
||||
output['message'] = '{} failed -- {}'.format(
|
||||
COMMAND_NAME,
|
||||
utils.log_exception(exp=exp)
|
||||
)
|
||||
else:
|
||||
output['exit_code'] = 0
|
||||
output['message'] = '{} is ok'.format(COMMAND_NAME)
|
||||
|
|
|
@ -51,13 +51,19 @@ def cli(ctx, config_file):
|
|||
variables = output['variables']
|
||||
for used in _ost.get_consumer_usage():
|
||||
flavor = flavors[used['flavor']['id']]
|
||||
used_collection[used['name']] += int(flavor['vcpus'])
|
||||
output['meta'][used['flavor']['id']] = True
|
||||
output['meta'][used['flavor']['name']] = True
|
||||
project_name = _ost.get_project_name(project_id=used['project_id'])
|
||||
used_collection[project_name] += int(flavor['vcpus'])
|
||||
flavor_id = used['flavor']['id']
|
||||
output['meta'][flavor_id] = True
|
||||
flavor_name = _ost.get_flavor_name(flavor_id=flavor_id)
|
||||
output['meta'][flavor_name] = True
|
||||
variables.update(used_collection)
|
||||
except Exception as exp:
|
||||
output['exit_code'] = 1
|
||||
output['message'] = '{} failed -- Error: {}'.format(COMMAND_NAME, exp)
|
||||
output['message'] = '{} failed -- {}'.format(
|
||||
COMMAND_NAME,
|
||||
utils.log_exception(exp=exp)
|
||||
)
|
||||
else:
|
||||
output['exit_code'] = 0
|
||||
output['message'] = '{} is ok'.format(COMMAND_NAME)
|
||||
|
|
|
@ -51,13 +51,19 @@ def cli(ctx, config_file):
|
|||
variables = output['variables']
|
||||
for used in _ost.get_consumer_usage():
|
||||
flavor = flavors[used['flavor']['id']]
|
||||
used_collection[used['name']] += int(flavor['disk'])
|
||||
output['meta'][used['flavor']['id']] = True
|
||||
output['meta'][used['flavor']['name']] = True
|
||||
project_name = _ost.get_project_name(project_id=used['project_id'])
|
||||
used_collection[project_name] += int(flavor['disk'])
|
||||
flavor_id = used['flavor']['id']
|
||||
output['meta'][flavor_id] = True
|
||||
flavor_name = _ost.get_flavor_name(flavor_id=flavor_id)
|
||||
output['meta'][flavor_name] = True
|
||||
variables.update(used_collection)
|
||||
except Exception as exp:
|
||||
output['exit_code'] = 1
|
||||
output['message'] = '{} failed -- Error: {}'.format(COMMAND_NAME, exp)
|
||||
output['message'] = '{} failed -- {}'.format(
|
||||
COMMAND_NAME,
|
||||
utils.log_exception(exp=exp)
|
||||
)
|
||||
else:
|
||||
output['exit_code'] = 0
|
||||
output['message'] = '{} is ok'.format(COMMAND_NAME)
|
||||
|
|
|
@ -49,11 +49,19 @@ def cli(ctx, config_file):
|
|||
try:
|
||||
variables = output['variables']
|
||||
for used in _ost.get_consumer_usage():
|
||||
used_collection[used['name']] += 1
|
||||
project_name = _ost.get_project_name(project_id=used['project_id'])
|
||||
used_collection[project_name] += 1
|
||||
flavor_id = used['flavor']['id']
|
||||
output['meta'][flavor_id] = True
|
||||
flavor_name = _ost.get_flavor_name(flavor_id=flavor_id)
|
||||
output['meta'][flavor_name] = True
|
||||
variables.update(used_collection)
|
||||
except Exception as exp:
|
||||
output['exit_code'] = 1
|
||||
output['message'] = '{} failed -- Error: {}'.format(COMMAND_NAME, exp)
|
||||
output['message'] = '{} failed -- {}'.format(
|
||||
COMMAND_NAME,
|
||||
utils.log_exception(exp=exp)
|
||||
)
|
||||
else:
|
||||
output['exit_code'] = 0
|
||||
output['message'] = '{} is ok'.format(COMMAND_NAME)
|
||||
|
|
|
@ -51,13 +51,19 @@ def cli(ctx, config_file):
|
|||
variables = output['variables']
|
||||
for used in _ost.get_consumer_usage():
|
||||
flavor = flavors[used['flavor']['id']]
|
||||
used_collection[used['name']] += int(flavor['ram'])
|
||||
output['meta'][used['flavor']['id']] = True
|
||||
output['meta'][used['flavor']['name']] = True
|
||||
project_name = _ost.get_project_name(project_id=used['project_id'])
|
||||
used_collection[project_name] += int(flavor['ram'])
|
||||
flavor_id = used['flavor']['id']
|
||||
output['meta'][flavor_id] = True
|
||||
flavor_name = _ost.get_flavor_name(flavor_id=flavor_id)
|
||||
output['meta'][flavor_name] = True
|
||||
variables.update(used_collection)
|
||||
except Exception as exp:
|
||||
output['exit_code'] = 1
|
||||
output['message'] = '{} failed -- Error: {}'.format(COMMAND_NAME, exp)
|
||||
output['message'] = '{} failed -- {}'.format(
|
||||
COMMAND_NAME,
|
||||
utils.log_exception(exp=exp)
|
||||
)
|
||||
else:
|
||||
output['exit_code'] = 0
|
||||
output['message'] = '{} is ok'.format(COMMAND_NAME)
|
||||
|
|
|
@ -13,12 +13,12 @@
|
|||
# limitations under the License.
|
||||
"""Common code for utils."""
|
||||
|
||||
import functools
|
||||
import os
|
||||
import shelve
|
||||
import sys
|
||||
import tempfile
|
||||
import time
|
||||
import traceback
|
||||
|
||||
# Lower import to support conditional configuration parser
|
||||
try:
|
||||
if sys.version_info > (3, 2, 0):
|
||||
import configparser as ConfigParser
|
||||
|
@ -27,9 +27,50 @@ try:
|
|||
except ImportError:
|
||||
raise SystemExit('No configparser module was found.')
|
||||
|
||||
import diskcache
|
||||
|
||||
|
||||
def retry(ExceptionToCheck, tries=3, delay=1, backoff=1): # noqa
|
||||
"""Retry calling the decorated function using an exponential backoff.
|
||||
|
||||
Attributes to sources of inspiration:
|
||||
http://www.saltycrane.com/blog/2009/11/trying-out-retry-decorator-python/
|
||||
http://wiki.python.org/moin/PythonDecoratorLibrary#Retry
|
||||
|
||||
:param ExceptionToCheck: the exception to check. may be a tuple of
|
||||
exceptions to check
|
||||
:type ExceptionToCheck: Exception or tuple
|
||||
:param tries: number of times to try (not retry) before giving up
|
||||
:type tries: int
|
||||
:param delay: initial delay between retries in seconds
|
||||
:type delay: int
|
||||
:param backoff: backoff multiplier e.g. value of 2 will double the delay
|
||||
each retry
|
||||
:type backoff: int
|
||||
"""
|
||||
def deco_retry(f):
|
||||
@functools.wraps(f)
|
||||
def f_retry(*args, **kwargs):
|
||||
mtries, mdelay = tries, delay
|
||||
while mtries > 1:
|
||||
try:
|
||||
return f(*args, **kwargs)
|
||||
except ExceptionToCheck:
|
||||
time.sleep(mdelay)
|
||||
mtries -= 1
|
||||
mdelay *= backoff
|
||||
return f(*args, **kwargs)
|
||||
return f_retry # true decorator
|
||||
return deco_retry
|
||||
|
||||
|
||||
def is_int(value):
|
||||
"""Check if a variable is an integer."""
|
||||
"""Check if a variable is an integer.
|
||||
|
||||
:param value: parameter to evaluate and return
|
||||
:type value: str || int || float
|
||||
:returns: str || int || float
|
||||
"""
|
||||
for v_type in [int, float]:
|
||||
try:
|
||||
value = v_type(value)
|
||||
|
@ -42,43 +83,68 @@ def is_int(value):
|
|||
|
||||
|
||||
class LocalCache(object):
|
||||
"""Context Manager for opening and closing access to the DBM."""
|
||||
"""Context Manager for opening and closing access to the cache objects."""
|
||||
|
||||
def __init__(self):
|
||||
"""Initialization method for class."""
|
||||
"""Set the Path to the DBM to create/Open."""
|
||||
def __init__(self, cache_path=None):
|
||||
"""Set the Path cache object.
|
||||
|
||||
self.db_cache = os.path.join(
|
||||
tempfile.gettempdir(),
|
||||
'monitorstack.openstack.dbm'
|
||||
)
|
||||
:param cache_file: File path to store cache
|
||||
:type cache_file: str
|
||||
"""
|
||||
# If a cache file is provided use it otherwise store one in
|
||||
# the user home folder as a hidden folder.
|
||||
self.cache_path = cache_path
|
||||
if not self.cache_path:
|
||||
self.cache_path = os.path.join(
|
||||
os.path.expanduser('~'),
|
||||
'.monitorstack.cache'
|
||||
)
|
||||
elif not self.cache_path.endswith('cache'):
|
||||
self.cache_path = '{}.cache'.format(self.cache_path)
|
||||
|
||||
if not os.path.isdir(self.cache_path):
|
||||
os.makedirs(self.cache_path)
|
||||
|
||||
def __enter__(self):
|
||||
"""Open the DBM in r/w mode.
|
||||
"""Open the cache object.
|
||||
|
||||
:return: Open DBM
|
||||
:returns: object
|
||||
"""
|
||||
return self.open_shelve
|
||||
return self.open_cache
|
||||
|
||||
def __exit__(self, type, value, traceback):
|
||||
"""Close DBM Connection."""
|
||||
self.close_shelve()
|
||||
|
||||
def _open_shelve(self):
|
||||
return shelve.open(self.db_cache)
|
||||
def __exit__(self, *args, **kwargs):
|
||||
"""Close cache object."""
|
||||
self.lc_close()
|
||||
|
||||
@property
|
||||
def open_shelve(self):
|
||||
"""Open shelved data."""
|
||||
return self._open_shelve()
|
||||
@retry(ExceptionToCheck=Exception)
|
||||
def open_cache(self):
|
||||
"""Return open caching opbject.
|
||||
|
||||
def close_shelve(self):
|
||||
:returns: object
|
||||
"""
|
||||
return diskcache.Cache(directory=self.cache_path)
|
||||
|
||||
def lc_open(self):
|
||||
"""Open shelved data.
|
||||
|
||||
:param cache_file: File path to store cache
|
||||
:type cache_file: str
|
||||
:returns: object
|
||||
"""
|
||||
return self.open_cache
|
||||
|
||||
def lc_close(self):
|
||||
"""Close shelved data."""
|
||||
self.open_shelve.close()
|
||||
self.open_cache.close()
|
||||
|
||||
|
||||
def read_config(config_file):
|
||||
"""Read an OpenStack configuration."""
|
||||
"""Read an OpenStack configuration.
|
||||
|
||||
:param config_file: path to configuration file.
|
||||
:type config_file: str
|
||||
"""
|
||||
cfg = os.path.abspath(os.path.expanduser(config_file))
|
||||
if not os.path.isfile(cfg):
|
||||
raise IOError('Config file "{}" was not found'.format(cfg))
|
||||
|
@ -89,11 +155,22 @@ def read_config(config_file):
|
|||
args = dict()
|
||||
defaults = dict([(k, v) for k, v in parser.items(section='DEFAULT')])
|
||||
for section in parser.sections():
|
||||
if section == 'DEFAULT':
|
||||
continue
|
||||
|
||||
sec = args[section] = defaults
|
||||
for key, value in parser.items(section):
|
||||
sec[key] = is_int(value=value)
|
||||
|
||||
return args
|
||||
|
||||
|
||||
def log_exception(exp):
|
||||
"""Return log entries.
|
||||
|
||||
:param exp: Exception object or name.
|
||||
:type exp: str || object
|
||||
:return: str
|
||||
"""
|
||||
_trace = [i.strip() for i in str(traceback.format_exc()).splitlines()]
|
||||
trace = ' -> '.join(_trace)
|
||||
_exception = [i.strip() for i in str(exp).splitlines()]
|
||||
exception = ' -> '.join(_exception)
|
||||
return 'Exception [ %s ]: Trace: [ %s ]' % (exception, trace)
|
||||
|
|
|
@ -15,7 +15,6 @@
|
|||
|
||||
try:
|
||||
from openstack import connection as os_conn
|
||||
from openstack import exceptions as os_exp
|
||||
except ImportError as e:
|
||||
raise SystemExit('OpenStack plugins require access to the OpenStackSDK.'
|
||||
' Please install "python-openstacksdk".'
|
||||
|
@ -28,16 +27,32 @@ class OpenStack(object):
|
|||
"""Class for reusable OpenStack utility methods."""
|
||||
|
||||
def __init__(self, os_auth_args):
|
||||
"""Initialization method for class."""
|
||||
"""Initialization method for class.
|
||||
|
||||
:param os_auth_args: dict containing auth creds.
|
||||
:type os_auth_args: dict
|
||||
"""
|
||||
self.os_auth_args = os_auth_args
|
||||
|
||||
@property
|
||||
def conn(self):
|
||||
"""Return an OpenStackSDK connection."""
|
||||
"""Return an OpenStackSDK connection.
|
||||
|
||||
:returns: object
|
||||
"""
|
||||
return os_conn.Connection(**self.os_auth_args)
|
||||
|
||||
def get_consumer_usage(self, servers=None, marker=None, limit=512):
|
||||
"""Retrieve current usage by an OpenStack cloud consumer."""
|
||||
"""Retrieve current usage by an OpenStack cloud consumer.
|
||||
|
||||
:param servers: ID of a given project to lookup.
|
||||
:type servers: str || uuid
|
||||
:param marker: ID of last server seen.
|
||||
:type marker: str || uuid
|
||||
:param limit: Number of items a single API call can return.
|
||||
:type limit: int
|
||||
:returns: list
|
||||
"""
|
||||
tenant_kwargs = {'details': True, 'all_tenants': True, 'limit': limit}
|
||||
if not servers:
|
||||
servers = list()
|
||||
|
@ -47,54 +62,123 @@ class OpenStack(object):
|
|||
|
||||
count = 0
|
||||
for server in self.conn.compute.servers(**tenant_kwargs):
|
||||
servers.append(server)
|
||||
servers.append(server.to_dict())
|
||||
count += 1
|
||||
|
||||
if count == limit:
|
||||
return self.get_consumer_usage(
|
||||
servers=servers,
|
||||
marker=servers[-1].id
|
||||
)
|
||||
if count == limit:
|
||||
return self.get_consumer_usage(
|
||||
servers=servers,
|
||||
marker=servers[-1]['id']
|
||||
)
|
||||
|
||||
return servers
|
||||
|
||||
def get_compute_limits(self, project_id, interface='internal'):
|
||||
"""Determine limits of compute resources."""
|
||||
url = self.conn.compute.session.get_endpoint(
|
||||
"""Return compute resource limits for a project.
|
||||
|
||||
:param project_id: ID of a given project to lookup.
|
||||
:type project_id: str || uuid
|
||||
:param interface: Interface name, normally [internal, public, admin].
|
||||
:type interface: str
|
||||
:returns: dict
|
||||
"""
|
||||
url = self.conn.session.get_endpoint(
|
||||
interface=interface,
|
||||
service_type='compute'
|
||||
)
|
||||
quota_data = self.conn.compute.session.get(
|
||||
quota_data = self.conn.session.get(
|
||||
url + '/os-quota-sets/' + project_id
|
||||
)
|
||||
return quota_data.json()
|
||||
|
||||
def get_project_name(self, project_id):
|
||||
"""Retrieve the name of a project."""
|
||||
with utils.LocalCache() as c:
|
||||
try:
|
||||
project_name = c.get(project_id)
|
||||
if not project_name:
|
||||
project_info = self.conn.identity.get_project(project_id)
|
||||
project_name = c[project_info.id] = project_info.name
|
||||
except os_exp.ResourceNotFound:
|
||||
return None
|
||||
else:
|
||||
return project_name
|
||||
|
||||
def get_projects(self):
|
||||
"""Retrieve a list of projects."""
|
||||
"""Retrieve a list of projects.
|
||||
|
||||
:returns: list
|
||||
"""
|
||||
_consumers = list()
|
||||
with utils.LocalCache() as c:
|
||||
for project in self.conn.identity.projects():
|
||||
_consumers.append(project)
|
||||
c[project.id] = project.name
|
||||
cache_key = 'projects_' + str(project.id)
|
||||
c.set(
|
||||
cache_key,
|
||||
project.to_dict(),
|
||||
expire=43200,
|
||||
tag='projects'
|
||||
)
|
||||
return _consumers
|
||||
|
||||
def get_project(self, project_id):
|
||||
"""Retrieve project data.
|
||||
|
||||
:param project_id: ID of a given project to lookup.
|
||||
:type project_id: str || uuid
|
||||
:returns: dict
|
||||
"""
|
||||
project = None
|
||||
cache_key = 'projects_{}'.format(project_id)
|
||||
with utils.LocalCache() as c:
|
||||
try:
|
||||
project = c.get(cache_key)
|
||||
if not project:
|
||||
raise LookupError
|
||||
except LookupError:
|
||||
project_info = self.conn.identity.get_project(project_id)
|
||||
project = project_info.to_dict()
|
||||
c.set(cache_key, project, expire=43200, tag='projects')
|
||||
finally:
|
||||
return project
|
||||
|
||||
def get_project_name(self, project_id):
|
||||
"""Retrieve the name of a project."""
|
||||
return self.get_project(project_id=project_id)['name']
|
||||
|
||||
def get_flavors(self):
|
||||
"""Retrieve a list of flavors."""
|
||||
flavor_cache = dict()
|
||||
for flavor in self.conn.compute.flavors():
|
||||
entry = flavor_cache[flavor['id']] = dict()
|
||||
entry.update(flavor)
|
||||
return flavor_cache
|
||||
"""Retrieve all of flavors.
|
||||
|
||||
:returns: dict
|
||||
"""
|
||||
flavors = dict()
|
||||
with utils.LocalCache() as c:
|
||||
for flavor in self.conn.compute.flavors():
|
||||
_flavor = flavor.to_dict()
|
||||
cache_key = 'flavor_' + str(flavor.id)
|
||||
c.set(
|
||||
cache_key,
|
||||
_flavor,
|
||||
expire=43200,
|
||||
tag='flavors'
|
||||
)
|
||||
entry = flavors[flavor.id] = dict()
|
||||
entry.update(_flavor)
|
||||
return flavors
|
||||
|
||||
def get_flavor(self, flavor_id):
|
||||
"""Retrieve a flavor.
|
||||
|
||||
:param flavor_id: ID of a given flavor to lookup.
|
||||
:type flavor_id: int || str
|
||||
:returns: dict
|
||||
"""
|
||||
flavor = None
|
||||
cache_key = 'flavor_{}'.format(flavor_id)
|
||||
with utils.LocalCache() as c:
|
||||
try:
|
||||
flavor = c.get(cache_key)
|
||||
if not flavor:
|
||||
raise LookupError
|
||||
except LookupError:
|
||||
flavor_info = self.conn.compute.get_flavor(flavor_id)
|
||||
flavor = flavor_info.to_dict()
|
||||
c.set(cache_key, flavor, expire=43200, tag='flavors')
|
||||
finally:
|
||||
return flavor
|
||||
|
||||
def get_flavor_name(self, flavor_id):
|
||||
"""Retrieve the name of a flavor.
|
||||
|
||||
:param flavor_id: ID of a given flavor to lookup.
|
||||
:type flavor_id: int || str
|
||||
:returns: str
|
||||
"""
|
||||
return self.get_flavor(flavor_id=flavor_id)['name']
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
click
|
||||
diskcache
|
||||
openstacksdk>=0.9.14
|
||||
psutil>=5.2.0
|
||||
six
|
||||
|
|
|
@ -13,3 +13,17 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""This an __init__.py."""
|
||||
|
||||
import os
|
||||
|
||||
from monitorstack import utils
|
||||
|
||||
|
||||
def read_config():
|
||||
"""Load the test config file."""
|
||||
os_config_file = os.path.expanduser(
|
||||
os.path.abspath(
|
||||
os.path.dirname(__file__) + '/files/test-openstack.ini'
|
||||
)
|
||||
)
|
||||
return utils.read_config(os_config_file)
|
||||
|
|
|
@ -1,47 +0,0 @@
|
|||
# Store the authentication credentials needed to query a given OpenStack Service.
|
||||
# All sections are overrides for the defaults. If you only need to connect to a
|
||||
# single cloud simply store the credentials needd in the DEFAULT section and
|
||||
# override whatever is needed within the local sections.
|
||||
|
||||
[DEFAULT]
|
||||
insecure = false
|
||||
auth_url = https://localhost:5000/v3
|
||||
|
||||
# NOTE(cloudnull):
|
||||
# When using keystone V3 you will need the .*domain_name configuration options.
|
||||
user_domain_name = default # This is required when Keystone V3 is being used
|
||||
project_domain_name = default # This is required when Keystone V3 is being used
|
||||
|
||||
# If you're using keystone V2 you will need the tenant_name option.
|
||||
tenant_name = admin # This is required when Keystone V2 is being used
|
||||
|
||||
# NEVER Mix and match the options tenant name and domain_name options.
|
||||
# You are be required to run either V2 or V3 as it pertains to this config.
|
||||
# If you provide both tenant_name and .*domain_name options at the same time
|
||||
# the plugins will fail API version negotiation.
|
||||
|
||||
username = admin
|
||||
password = Secrete
|
||||
# The verify option is for SSL. If your SSL certificate is not
|
||||
# valid set this option to false else omit it or set it true.
|
||||
verify = false
|
||||
|
||||
[keystone]
|
||||
|
||||
[glance]
|
||||
|
||||
[nova]
|
||||
project_name = nova
|
||||
|
||||
[neutron]
|
||||
|
||||
[heat]
|
||||
|
||||
[cinder]
|
||||
|
||||
[ironic]
|
||||
auth_url = https://localhost:5000/v3
|
||||
project_name = ironic
|
||||
user_domain_name = users
|
||||
project_domain_name = projects
|
||||
password = SuperSecrete
|
|
@ -0,0 +1 @@
|
|||
../../etc/openstack.ini
|
|
@ -0,0 +1,246 @@
|
|||
# Copyright 2017, Major Hayden <major@mhtx.net>
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""Tests for the os_utils plugin."""
|
||||
|
||||
import unittest
|
||||
|
||||
import mock
|
||||
|
||||
from monitorstack.utils import os_utils
|
||||
|
||||
import tests # Import the test base module
|
||||
|
||||
|
||||
class OpenStackObject(object):
|
||||
"""Mocked server object."""
|
||||
|
||||
def __init__(self, id=None, name=None):
|
||||
"""Mocked server class."""
|
||||
self.id = id
|
||||
self.name = name
|
||||
|
||||
def to_dict(self):
|
||||
"""Mocked dict return."""
|
||||
return {
|
||||
'id': self.id,
|
||||
'name': self.name
|
||||
}
|
||||
|
||||
|
||||
class MockedOpenStackConn(object):
|
||||
"""Mocked OpenStack Connection object."""
|
||||
|
||||
class compute(object): # noqa
|
||||
"""Mocked compute class."""
|
||||
|
||||
@staticmethod
|
||||
def servers(*args, **kwargs):
|
||||
"""Mocked servers method."""
|
||||
servers = [
|
||||
OpenStackObject(1, 'test1'),
|
||||
OpenStackObject(2, 'test2'),
|
||||
OpenStackObject(3, 'test3'),
|
||||
OpenStackObject(4, 'test4'),
|
||||
OpenStackObject(5, 'test5')
|
||||
]
|
||||
if 'marker' in kwargs:
|
||||
for server in servers:
|
||||
if server.id == kwargs['marker']:
|
||||
index = servers.index(server)
|
||||
servers.pop(index)
|
||||
return servers[index:]
|
||||
return servers
|
||||
|
||||
@staticmethod
|
||||
def flavors():
|
||||
"""Mocked flavors return."""
|
||||
return [
|
||||
OpenStackObject(1, 'test1'),
|
||||
OpenStackObject(2, 'test2'),
|
||||
OpenStackObject(3, 'test3'),
|
||||
OpenStackObject(4, 'test4'),
|
||||
OpenStackObject(5, 'test5')
|
||||
]
|
||||
|
||||
@staticmethod
|
||||
def get_flavor(flavor_id):
|
||||
"""Return mocked flavor object."""
|
||||
return OpenStackObject(
|
||||
flavor_id,
|
||||
'test_{}'.format(flavor_id)
|
||||
)
|
||||
|
||||
class identity(object): # noqa
|
||||
"""Mocked identity object."""
|
||||
|
||||
@staticmethod
|
||||
def projects():
|
||||
"""Mocked projects return."""
|
||||
return [
|
||||
OpenStackObject(1, 'test1'),
|
||||
OpenStackObject(2, 'test2'),
|
||||
OpenStackObject(3, 'test3'),
|
||||
OpenStackObject(4, 'test4'),
|
||||
OpenStackObject(5, 'test5')
|
||||
]
|
||||
|
||||
@staticmethod
|
||||
def get_project(project_id):
|
||||
"""Return mocked project object."""
|
||||
return OpenStackObject(
|
||||
project_id,
|
||||
'test_{}'.format(project_id)
|
||||
)
|
||||
|
||||
class session(object): # noqa
|
||||
"""Mocked session object."""
|
||||
|
||||
@staticmethod
|
||||
def get_endpoint(interface, service_type):
|
||||
"""Mocked endpoint return."""
|
||||
return "https://127.0.1.1/{}/{}".format(interface, service_type)
|
||||
|
||||
@staticmethod
|
||||
def get(url):
|
||||
"""Mocked get return."""
|
||||
class SessionGet(object):
|
||||
"""Mocked session object."""
|
||||
|
||||
def __init__(self, url):
|
||||
"""Mocked session get."""
|
||||
self.url = url
|
||||
|
||||
def json(self):
|
||||
"""Mocked json return."""
|
||||
return {'url': self.url}
|
||||
|
||||
return SessionGet(url=url)
|
||||
|
||||
|
||||
class TestOSUtilsConnection(unittest.TestCase):
|
||||
"""Tests for the utilities."""
|
||||
|
||||
def test_conn(self):
|
||||
"""Test the OpenStack connection interface."""
|
||||
# load the base class for these tests.
|
||||
self.osu = os_utils.OpenStack(
|
||||
os_auth_args=tests.read_config()['keystone']
|
||||
)
|
||||
self.assertTrue(
|
||||
isinstance(
|
||||
self.osu.conn,
|
||||
os_utils.os_conn.Connection
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
class TestOsUtils(unittest.TestCase):
|
||||
"""Tests for the utilities."""
|
||||
|
||||
def setUp(self):
|
||||
"""Setup the test."""
|
||||
# load the base class for these tests.
|
||||
self.osu = os_utils.OpenStack(
|
||||
os_auth_args=tests.read_config()['keystone']
|
||||
)
|
||||
|
||||
def tearDown(self):
|
||||
"""Tear down the test."""
|
||||
pass
|
||||
|
||||
def test_get_consumer_usage(self):
|
||||
"""Test retrieving consumer usage."""
|
||||
with mock.patch('openstack.connection.Connection') as MockClass:
|
||||
MockClass.return_value = MockedOpenStackConn()
|
||||
self.assertTrue(isinstance(self.osu.get_consumer_usage(), list))
|
||||
|
||||
def test_get_consumer_usage_with_servers(self):
|
||||
"""Test retrieving consumer usage with servers list."""
|
||||
with mock.patch('openstack.connection.Connection') as MockClass:
|
||||
MockClass.return_value = MockedOpenStackConn()
|
||||
servers = self.osu.get_consumer_usage(
|
||||
servers=[OpenStackObject(0, 'test0').to_dict()]
|
||||
)
|
||||
self.assertEquals(len(servers), 6)
|
||||
|
||||
def test_get_consumer_usage_with_marker(self):
|
||||
"""Test retrieving consumer usage."""
|
||||
with mock.patch('openstack.connection.Connection') as MockClass:
|
||||
MockClass.return_value = MockedOpenStackConn()
|
||||
servers = self.osu.get_consumer_usage(marker=5)
|
||||
self.assertEquals(len(servers), 0)
|
||||
|
||||
with mock.patch('openstack.connection.Connection') as MockClass:
|
||||
MockClass.return_value = MockedOpenStackConn()
|
||||
servers = self.osu.get_consumer_usage(marker=2)
|
||||
self.assertEquals(len(servers), 3)
|
||||
|
||||
def test_get_consumer_usage_with_limit(self):
|
||||
"""Test retrieving consumer usage."""
|
||||
with mock.patch('openstack.connection.Connection') as MockClass:
|
||||
MockClass.return_value = MockedOpenStackConn()
|
||||
servers = self.osu.get_consumer_usage(limit=1)
|
||||
self.assertEquals(len(servers), 5)
|
||||
|
||||
def test_get_compute_limits(self):
|
||||
"""Test retrieving consumer limits."""
|
||||
with mock.patch('openstack.connection.Connection') as MockClass:
|
||||
MockClass.return_value = MockedOpenStackConn()
|
||||
limits = self.osu.get_compute_limits(project_id='not-a-uuid')
|
||||
u = 'https://127.0.1.1/internal/compute/os-quota-sets/not-a-uuid'
|
||||
self.assertEquals(limits, {'url': u})
|
||||
|
||||
def test_get_projects(self):
|
||||
"""Test retrieving project list."""
|
||||
with mock.patch('openstack.connection.Connection') as MockClass:
|
||||
MockClass.return_value = MockedOpenStackConn()
|
||||
projects = self.osu.get_projects()
|
||||
self.assertEquals(len(projects), 5)
|
||||
|
||||
def test_get_project(self):
|
||||
"""Test retrieving project dict."""
|
||||
with mock.patch('openstack.connection.Connection') as MockClass:
|
||||
MockClass.return_value = MockedOpenStackConn()
|
||||
project = self.osu.get_project(project_id='12345')
|
||||
self.assertEquals(project['id'], '12345')
|
||||
self.assertEquals(project['name'], 'test_12345')
|
||||
|
||||
def test_get_project_name(self):
|
||||
"""Test retrieving project name."""
|
||||
with mock.patch('openstack.connection.Connection') as MockClass:
|
||||
MockClass.return_value = MockedOpenStackConn()
|
||||
project_name = self.osu.get_project_name(project_id='12345')
|
||||
self.assertEquals(project_name, 'test_12345')
|
||||
|
||||
def test_get_flavors(self):
|
||||
"""Test retrieving flavors dict."""
|
||||
with mock.patch('openstack.connection.Connection') as MockClass:
|
||||
MockClass.return_value = MockedOpenStackConn()
|
||||
servers = self.osu.get_flavors()
|
||||
self.assertEquals(len(servers), 5)
|
||||
|
||||
def test_get_flavor(self):
|
||||
"""Test retrieving flavor dict."""
|
||||
with mock.patch('openstack.connection.Connection') as MockClass:
|
||||
MockClass.return_value = MockedOpenStackConn()
|
||||
flavor = self.osu.get_flavor(flavor_id=12345)
|
||||
self.assertEquals(flavor['id'], 12345)
|
||||
self.assertEquals(flavor['name'], 'test_12345')
|
||||
|
||||
def test_get_flavor_name(self):
|
||||
"""Test retrieving flavor name."""
|
||||
with mock.patch('openstack.connection.Connection') as MockClass:
|
||||
MockClass.return_value = MockedOpenStackConn()
|
||||
flavor_name = self.osu.get_flavor_name(flavor_id=12345)
|
||||
self.assertEquals(flavor_name, 'test_12345')
|
|
@ -15,12 +15,22 @@
|
|||
|
||||
import json
|
||||
import sys
|
||||
import unittest
|
||||
|
||||
from click.testing import CliRunner
|
||||
|
||||
from monitorstack.cli import cli
|
||||
|
||||
|
||||
def _runner(module):
|
||||
runner = CliRunner()
|
||||
result = runner.invoke(cli, ['-f', 'json', module])
|
||||
try:
|
||||
return json.loads(result.output)
|
||||
except Exception:
|
||||
return result.exception
|
||||
|
||||
|
||||
class LibvirtStub(object):
|
||||
"""Stubbed libvirt class."""
|
||||
|
||||
|
@ -38,26 +48,44 @@ class LibvirtStub(object):
|
|||
|
||||
class lookupByID(object): # noqa
|
||||
"""Stubbed lookupByID class."""
|
||||
def __init__(self, *args, **kwargs): # noqa
|
||||
def __init__(self, *args, **kwargs): # noqa
|
||||
pass
|
||||
|
||||
def maxVcpus(self): # noqa
|
||||
return 2
|
||||
|
||||
|
||||
class TestKvm(object):
|
||||
class LibvirtStubFailed(object):
|
||||
"""Stubbed libvirt class."""
|
||||
|
||||
class openReadOnly(object): # noqa
|
||||
"""Stubbed openReadOnly class."""
|
||||
|
||||
def close(self, *args, **kwargs): # noqa
|
||||
pass
|
||||
|
||||
def listDomainsID(self, *args, **kwargs): # noqa
|
||||
raise RuntimeError('Failed')
|
||||
|
||||
|
||||
class TestKvm(unittest.TestCase):
|
||||
"""Tests for the kvm monitor."""
|
||||
|
||||
def test_run(self):
|
||||
def setUp(self):
|
||||
"""Setup teardown."""
|
||||
self.orig_libvirt = sys.modules.pop('libvirt', None)
|
||||
sys.modules['libvirt'] = LibvirtStub()
|
||||
|
||||
def tearDown(self):
|
||||
"""Teardown method."""
|
||||
if self.orig_libvirt:
|
||||
sys.modules['libvirt'] = self.orig_libvirt
|
||||
|
||||
def test_run_success(self):
|
||||
"""Ensure the run() method works."""
|
||||
sys.modules['libvirt'] = LibvirtStub
|
||||
|
||||
runner = CliRunner()
|
||||
result = runner.invoke(cli, ['-f', 'json', 'kvm'])
|
||||
result_json = json.loads(result.output)
|
||||
|
||||
variables = result_json['variables']
|
||||
meta = result_json['meta']
|
||||
result = _runner('kvm')
|
||||
variables = result['variables']
|
||||
meta = result['meta']
|
||||
assert 'kvm_vms' in variables
|
||||
assert variables['kvm_vms'] == 3
|
||||
assert 'kvm_total_vcpus' in variables
|
||||
|
@ -66,4 +94,17 @@ class TestKvm(object):
|
|||
assert variables['kvm_scheduled_vcpus'] == 6
|
||||
assert 'platform' in meta
|
||||
assert 'kvm_host_id' in meta
|
||||
assert result.exit_code == 0
|
||||
assert result['exit_code'] == 0
|
||||
|
||||
def test_run_failure_no_libvirt(self):
|
||||
"""Ensure the run() method works."""
|
||||
sys.modules.pop('libvirt', None)
|
||||
result = _runner('kvm')
|
||||
self.assertTrue(isinstance(result, SystemExit))
|
||||
|
||||
def test_run_failure(self):
|
||||
"""Ensure the run() method works."""
|
||||
sys.modules['libvirt'] = LibvirtStubFailed()
|
||||
result = _runner('kvm')
|
||||
assert result['measurement_name'] == 'kvm'
|
||||
assert result['exit_code'] == 1
|
||||
|
|
|
@ -38,20 +38,22 @@ class MockProject(object):
|
|||
"""Mock init."""
|
||||
self.id = 'testing'
|
||||
self.name = 'testing'
|
||||
self.project_id = 12345
|
||||
|
||||
|
||||
def mock_get_consumer_usage(self):
|
||||
def mock_get_consumer_usage(*args, **kwargs):
|
||||
"""Mocked get_consumer_usage()."""
|
||||
return [{
|
||||
'name': 'test_name',
|
||||
'project_id': 12345,
|
||||
'flavor': {
|
||||
'id': 1,
|
||||
'name': 'flavor_one',
|
||||
'name': 'flavor_one'
|
||||
}
|
||||
}]
|
||||
|
||||
|
||||
def mock_get_flavors(self):
|
||||
def mock_get_flavors(*args, **kwargs):
|
||||
"""Mocked get_flavors()."""
|
||||
return {
|
||||
1: {
|
||||
|
@ -63,13 +65,27 @@ def mock_get_flavors(self):
|
|||
}
|
||||
|
||||
|
||||
def mock_get_projects(arg1):
|
||||
def mock_get_flavor(*args, **kwargs):
|
||||
"""Mocked get_flavor(id)."""
|
||||
return {
|
||||
'name': 'flavor_one',
|
||||
'vcpus': 2,
|
||||
'disk': 10,
|
||||
'ram': 1024,
|
||||
}
|
||||
|
||||
|
||||
def mock_get_project_name(*args, **kwargs):
|
||||
"""Mocked get_projects()."""
|
||||
projects = MockProject()
|
||||
return [projects]
|
||||
return 'test_name'
|
||||
|
||||
|
||||
def mock_get_compute_limits(self, project_id, interface):
|
||||
def mock_get_projects(*args, **kwargs):
|
||||
"""Mocked get_projects()."""
|
||||
return [MockProject()]
|
||||
|
||||
|
||||
def mock_get_compute_limits(*args, **kwargs):
|
||||
"""Mocked get_compute_limits()."""
|
||||
return {
|
||||
'quota_set': {
|
||||
|
@ -131,6 +147,8 @@ class TestOs(object):
|
|||
def test_os_vm_used_cores_success(self, monkeypatch):
|
||||
"""Ensure os_vm_used_cores method works with success."""
|
||||
monkeypatch.setattr(Ost, 'get_flavors', mock_get_flavors)
|
||||
monkeypatch.setattr(Ost, 'get_flavor', mock_get_flavor)
|
||||
monkeypatch.setattr(Ost, 'get_project_name', mock_get_project_name)
|
||||
monkeypatch.setattr(Ost, 'get_consumer_usage', mock_get_consumer_usage)
|
||||
|
||||
result = _runner('os_vm_used_cores')
|
||||
|
@ -147,6 +165,8 @@ class TestOs(object):
|
|||
def test_os_vm_used_disk_success(self, monkeypatch):
|
||||
"""Ensure os_vm_used_disk method works with success."""
|
||||
monkeypatch.setattr(Ost, 'get_flavors', mock_get_flavors)
|
||||
monkeypatch.setattr(Ost, 'get_flavor', mock_get_flavor)
|
||||
monkeypatch.setattr(Ost, 'get_project_name', mock_get_project_name)
|
||||
monkeypatch.setattr(Ost, 'get_consumer_usage', mock_get_consumer_usage)
|
||||
|
||||
result = _runner('os_vm_used_disk')
|
||||
|
@ -162,6 +182,9 @@ class TestOs(object):
|
|||
|
||||
def test_os_vm_used_instance_success(self, monkeypatch):
|
||||
"""Ensure os_vm_used_instance method works with success."""
|
||||
monkeypatch.setattr(Ost, 'get_flavors', mock_get_flavors)
|
||||
monkeypatch.setattr(Ost, 'get_flavor', mock_get_flavor)
|
||||
monkeypatch.setattr(Ost, 'get_project_name', mock_get_project_name)
|
||||
monkeypatch.setattr(Ost, 'get_consumer_usage', mock_get_consumer_usage)
|
||||
|
||||
result = _runner('os_vm_used_instance')
|
||||
|
@ -178,6 +201,8 @@ class TestOs(object):
|
|||
def test_os_vm_used_ram_success(self, monkeypatch):
|
||||
"""Ensure os_vm_used_ram method works with success."""
|
||||
monkeypatch.setattr(Ost, 'get_flavors', mock_get_flavors)
|
||||
monkeypatch.setattr(Ost, 'get_flavor', mock_get_flavor)
|
||||
monkeypatch.setattr(Ost, 'get_project_name', mock_get_project_name)
|
||||
monkeypatch.setattr(Ost, 'get_consumer_usage', mock_get_consumer_usage)
|
||||
|
||||
result = _runner('os_vm_used_ram')
|
||||
|
|
|
@ -11,7 +11,7 @@
|
|||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""Tests for the uptime plugin."""
|
||||
"""Tests for the utils."""
|
||||
|
||||
import os
|
||||
import tempfile
|
||||
|
@ -26,21 +26,27 @@ class TestUtils(unittest.TestCase):
|
|||
def setUp(self):
|
||||
"""Initial setup for class."""
|
||||
os_config_file = os.path.expanduser(
|
||||
os.path.abspath(__file__ + '/../../etc/openstack.ini')
|
||||
os.path.abspath(
|
||||
os.path.dirname(__file__) + '/files/test-openstack.ini'
|
||||
)
|
||||
)
|
||||
self.config = utils.read_config(os_config_file)
|
||||
conf = utils.ConfigParser.RawConfigParser()
|
||||
conf.read([os_config_file])
|
||||
self.config_defaults = conf.defaults()
|
||||
|
||||
self.g_testfile = os.path.join(
|
||||
os.path.expanduser('~'),
|
||||
'.monitorstack.cache'
|
||||
)
|
||||
self.t_testfile = tempfile.mkdtemp()
|
||||
|
||||
def tearDown(self):
|
||||
"""Destroy the local cache."""
|
||||
local_cache = os.path.join(
|
||||
tempfile.gettempdir(),
|
||||
'monitorstack.openstack.dbm'
|
||||
)
|
||||
if os.path.exists(local_cache):
|
||||
os.remove(local_cache)
|
||||
for f in [self.g_testfile, self.t_testfile]:
|
||||
cache_db = os.path.join(f, 'cache.db')
|
||||
if os.path.exists(cache_db):
|
||||
os.remove(cache_db)
|
||||
|
||||
def test_is_int_is_int(self): # noqa
|
||||
self.assertTrue(isinstance(utils.is_int(value=1), int))
|
||||
|
@ -67,8 +73,84 @@ class TestUtils(unittest.TestCase):
|
|||
for key in self.config_defaults.keys():
|
||||
self.assertTrue(key in v.keys())
|
||||
|
||||
def test_local_cache(self):
|
||||
def test_local_cache_no_file(self):
|
||||
"""Test local cache."""
|
||||
with utils.LocalCache() as c:
|
||||
c['test_key'] = True
|
||||
self.assertTrue('test_key' in c)
|
||||
c['test_key1'] = True
|
||||
self.assertTrue('test_key1' in c)
|
||||
|
||||
def test_local_cache_file(self):
|
||||
"""Test local cache."""
|
||||
with utils.LocalCache(cache_path=self.t_testfile) as c:
|
||||
c['test_key2'] = True
|
||||
self.assertTrue('test_key2' in c)
|
||||
|
||||
def test_local_cache_no_file_no_context(self):
|
||||
"""Test local cache without a context manager."""
|
||||
c = utils.LocalCache()
|
||||
cache = c.lc_open()
|
||||
cache['test_key3'] = True
|
||||
try:
|
||||
self.assertTrue('test_key3' in cache)
|
||||
finally:
|
||||
c.lc_close()
|
||||
|
||||
with utils.LocalCache() as c:
|
||||
self.assertTrue('test_key3' in c)
|
||||
|
||||
def test_local_cache_file_no_context(self):
|
||||
"""Test local cache without a context manager."""
|
||||
c = utils.LocalCache(cache_path=self.t_testfile)
|
||||
cache = c.lc_open()
|
||||
cache['test_key4'] = True
|
||||
try:
|
||||
self.assertTrue('test_key4' in cache)
|
||||
finally:
|
||||
c.lc_close()
|
||||
|
||||
with utils.LocalCache(cache_path=self.t_testfile) as c:
|
||||
self.assertTrue('test_key4' in c)
|
||||
|
||||
def test_local_cache_no_load(self):
|
||||
"""Test local cache without loading anything."""
|
||||
c = utils.LocalCache(cache_path=self.t_testfile)
|
||||
c.lc_close()
|
||||
|
||||
def test_local_cache_named_ext(self):
|
||||
"""Test local cache without loading anything with a named extension."""
|
||||
utils.LocalCache(cache_path='{}.cache'.format(self.t_testfile))
|
||||
|
||||
def test_retry_failure(self):
|
||||
"""Test retry decorator for failure."""
|
||||
@utils.retry(ExceptionToCheck=BaseException, tries=3, backoff=0,
|
||||
delay=1)
|
||||
def _failed():
|
||||
"""Raise failure exception after retry."""
|
||||
raise BaseException
|
||||
|
||||
self.assertRaises(BaseException, _failed)
|
||||
|
||||
def test_retry_success(self):
|
||||
"""Test retry decorator for success."""
|
||||
@utils.retry(ExceptionToCheck=BaseException, tries=3, backoff=0,
|
||||
delay=1)
|
||||
def _success():
|
||||
"""Return True after retry."""
|
||||
self.count += 1
|
||||
if self.count == 3:
|
||||
return True
|
||||
else:
|
||||
raise BaseException
|
||||
|
||||
self.count = 0
|
||||
self.assertEquals(_success(), True)
|
||||
|
||||
def test_log_exception(self):
|
||||
"""Test traceback formatter for exception messages."""
|
||||
try:
|
||||
raise Exception('test-exception')
|
||||
except Exception as exp:
|
||||
message = utils.log_exception(exp=exp)
|
||||
|
||||
self.assertTrue('Exception' in message)
|
||||
self.assertTrue('Trace' in message)
|
||||
|
|
Loading…
Reference in New Issue