OpenStack Compute (Nova) Client
import contextlib
import json
import re
import textwrap
import time
import uuid
from oslo_serialization import jsonutils
from oslo_utils import encodeutils
import pkg_resources
import prettytable
import six
from novaclient import exceptions
from novaclient.i18n import _
from novaclient.openstack.common import cliutils
# Pattern for flavor metadata keys: one or more word characters, periods,
# hyphens, spaces or colons, anchored at the end of the string.
VALID_KEY_REGEX = re.compile(
    pattern=r"[\w\.\- :]+$",
    flags=re.UNICODE,
)
def add_resource_manager_extra_kwargs_hook(f, hook):
    """Add hook to bind CLI arguments to ResourceManager calls.

    The `do_foo` calls will receive CLI args and then in turn pass them
    through to the ResourceManager. Before passing through the args, the
    hooks registered here will be called, giving us a chance to add extra
    kwargs (taken from the command-line) to what's passed to the
    ResourceManager.

    Hooks are stored on ``f.resource_manager_kwargs_hooks``; a hook whose
    ``__name__`` is already registered on ``f`` is not added again.

    :param f: The callable the hook is attached to.
    :param hook: The hook callable to register.
    """
    if not hasattr(f, 'resource_manager_kwargs_hooks'):
        f.resource_manager_kwargs_hooks = []

    # De-duplicate by name so registering the same hook twice is a no-op.
    names = [h.__name__ for h in f.resource_manager_kwargs_hooks]
    if hook.__name__ not in names:
        f.resource_manager_kwargs_hooks.append(hook)
def get_resource_manager_extra_kwargs(f, args, allow_conflicts=False):
    """Return extra_kwargs by calling resource manager kwargs hooks.

    Each hook found on ``f.resource_manager_kwargs_hooks`` is called with
    ``args`` and the dict it returns is merged into the result.

    :param f: The callable whose registered hooks are run.
    :param args: The value passed to every hook.
    :param allow_conflicts: When False, a hook returning a key that an
                            earlier hook already produced is an error; when
                            True, the later hook's value wins.
    :returns: A dict with the merged output of all hooks.
    :raises exceptions.NoUniqueMatch: If hooks conflict and
                                      ``allow_conflicts`` is False.
    """
    hooks = getattr(f, "resource_manager_kwargs_hooks", [])
    extra_kwargs = {}
    for hook in hooks:
        hook_kwargs = hook(args)
        hook_name = hook.__name__
        conflicting_keys = set(hook_kwargs.keys()) & set(extra_kwargs.keys())
        if conflicting_keys and not allow_conflicts:
            msg = (_("Hook '%(hook_name)s' is attempting to redefine "
                     "attributes '%(conflicting_keys)s'") %
                   {'hook_name': hook_name,
                    'conflicting_keys': conflicting_keys})
            raise exceptions.NoUniqueMatch(msg)

        # Accumulate this hook's output so later hooks are checked against it.
        extra_kwargs.update(hook_kwargs)

    return extra_kwargs
def pretty_choice_dict(d):
    """Format a dict as a choice list of 'key=value' items, sorted by key."""
    pairs = []
    for key in sorted(d.keys()):
        pairs.append('%s=%s' % (key, d[key]))
    return cliutils.pretty_choice_list(pairs)
def print_list(objs, fields, formatters=None, sortby_index=None):
    """Print a list of objects as a left-aligned table.

    :param objs: Iterable of objects, one table row per object.
    :param fields: Column headings. Unless a formatter is supplied, the cell
                   value is read from the attribute named after the heading,
                   lower-cased with spaces replaced by underscores.
    :param formatters: Optional mapping of heading to a callable that takes
                       the object and returns the cell value.
    :param sortby_index: Index into ``fields`` of the column to sort by, or
                         None to keep the input order.
    """
    # A None default avoids sharing one mutable dict between calls.
    formatters = formatters or {}
    if sortby_index is None:
        sortby = None
    else:
        sortby = fields[sortby_index]
    # Headings whose attribute name must keep its original case.
    mixed_case_fields = ['serverId']
    pt = prettytable.PrettyTable(list(fields), caching=False)
    pt.align = 'l'

    for o in objs:
        row = []
        for field in fields:
            if field in formatters:
                row.append(formatters[field](o))
            else:
                if field in mixed_case_fields:
                    field_name = field.replace(' ', '_')
                else:
                    field_name = field.lower().replace(' ', '_')
                data = getattr(o, field_name, '')
                if data is None:
                    data = '-'
                row.append(data)
        pt.add_row(row)

    if sortby is not None:
        result = encodeutils.safe_encode(pt.get_string(sortby=sortby))
    else:
        result = encodeutils.safe_encode(pt.get_string())

    # safe_encode returns bytes; turn it back into text on Python 3.
    if six.PY3:
        result = result.decode()

    print(result)
def _flatten(data, prefix=None):
"""Flatten a dict, using name as a prefix for the keys of dict.
>>> _flatten('cpu_info', {'arch':'x86_64'})
[('cpu_info_arch': 'x86_64')]
if isinstance(data, dict):
for key, value in six.iteritems(data):
new_key = '%s_%s' % (prefix, key) if prefix else key
if isinstance(value, (dict, list)):
for item in _flatten(value, new_key):
yield item
yield new_key, value
yield prefix, data
def flatten_dict(data):
    """Return a new dict whose sub-dicts have been merged into the
    original. Each of the parents keys are prepended to the child's
    to prevent collisions. Any string elements will be JSON parsed
    before flattening.

    >>> flatten_dict({'service': {'host':'cloud9@compute-068', 'id': 143}})
    {'service_host': 'cloud9@compute-068', 'service_id': 143}

    :param data: The dict to flatten; it is shallow-copied, not modified.
    :returns: A new, flattened dict.
    """
    data = data.copy()
    # Try and decode any nested JSON structures.
    for key, value in six.iteritems(data):
        if isinstance(value, six.string_types):
            try:
                data[key] = json.loads(value)
            except ValueError:
                # Not JSON: keep the original string.
                pass

    return dict(_flatten(data))
def print_dict(d, dict_property="Property", dict_value="Value", wrap=0):
    """Print a dict as a two-column, left-aligned table sorted by key.

    :param d: The dict to print.
    :param dict_property: Heading of the key column.
    :param dict_value: Heading of the value column.
    :param wrap: When greater than zero, values are converted to str and
                 wrapped to this width.
    """
    pt = prettytable.PrettyTable([dict_property, dict_value], caching=False)
    pt.align = 'l'
    for k, v in sorted(d.items()):
        # convert dict to str to check length
        if isinstance(v, (dict, list)):
            v = jsonutils.dumps(v)
        if wrap > 0:
            v = textwrap.fill(str(v), wrap)
        # if value has a newline, add in multiple rows
        # e.g. fault with stacktrace
        # NOTE: r'\n' is the two-character sequence backslash + 'n', not an
        # actual newline character.
        if v and isinstance(v, six.string_types) and r'\n' in v:
            lines = v.strip().split(r'\n')
            col1 = k
            for line in lines:
                pt.add_row([col1, line])
                # Only the first row of a multi-row value shows the key.
                col1 = ''
        else:
            if v is None:
                v = '-'
            pt.add_row([k, v])

    result = encodeutils.safe_encode(pt.get_string())

    # safe_encode returns bytes; turn it back into text on Python 3.
    if six.PY3:
        result = result.decode()

    print(result)
def find_resource(manager, name_or_id, **find_args):
    """Helper for the _find_* methods.

    Lookups are attempted in this order, returning the first hit:
    the raw value as an id (only when the manager sets
    ``is_alphanum_id_allowed``), an integer id, a UUID, the resource's name
    attribute, and finally ``human_id``.

    :param manager: Resource manager providing ``get`` and ``find``.
    :param name_or_id: Name or identifier of the resource to look up.
    :param find_args: Extra keyword arguments for the ``human_id`` lookup.
    :returns: The matching resource.
    :raises exceptions.CommandError: If nothing matches, or if a name
                                     lookup matches more than one resource.
    """
    # for str id which is not uuid (for Flavor and Keypair search currently)
    if getattr(manager, 'is_alphanum_id_allowed', False):
        try:
            return manager.get(name_or_id)
        except exceptions.NotFound:
            pass

    # try to get entity as integer id
    try:
        return manager.get(int(name_or_id))
    except (TypeError, ValueError, exceptions.NotFound):
        pass

    # now try to get entity as uuid
    try:
        tmp_id = encodeutils.safe_encode(name_or_id)

        if six.PY3:
            tmp_id = tmp_id.decode()

        # Raises ValueError for anything that is not a well-formed UUID, so
        # arbitrary names are not sent to manager.get().
        uuid.UUID(tmp_id)
        return manager.get(tmp_id)
    except (TypeError, ValueError, exceptions.NotFound):
        pass

    try:
        try:
            resource = getattr(manager, 'resource_class', None)
            name_attr = resource.NAME_ATTR if resource else 'name'
            kwargs = {name_attr: name_or_id}
            return manager.find(**kwargs)
        except exceptions.NotFound:
            pass

        # finally try to find entity by human_id
        try:
            return manager.find(human_id=name_or_id, **find_args)
        except exceptions.NotFound:
            msg = (_("No %(class)s with a name or ID of '%(name)s' exists.") %
                   {'class': manager.resource_class.__name__.lower(),
                    'name': name_or_id})
            raise exceptions.CommandError(msg)
    except exceptions.NoUniqueMatch:
        msg = (_("Multiple %(class)s matches found for '%(name)s', use an ID "
                 "to be more specific.") %
               {'class': manager.resource_class.__name__.lower(),
                'name': name_or_id})
        raise exceptions.CommandError(msg)
def _format_servers_list_networks(server):
output = []
for (network, addresses) in server.networks.items():
if len(addresses) == 0:
addresses_csv = ', '.join(addresses)
group = "%s=%s" % (network, addresses_csv)
return '; '.join(output)
def _format_security_groups(groups):
return ', '.join(group['name'] for group in groups)
def _format_field_name(attr):
"""Format an object attribute in a human-friendly way."""
# Split at ':' and leave the extension name as-is.
parts = attr.rsplit(':', 1)
name = parts[-1].replace('_', ' ')
# Don't title() on mixed case
if name.isupper() or name.islower():
name = name.title()
parts[-1] = name
return ': '.join(parts)
def _make_field_formatter(attr, filters=None):
    """
    Given an object attribute, return a formatted field name and a
    formatter suitable for passing to print_list.

    Optionally pass a dict mapping attribute names to a function. The function
    will be passed the value of the attribute and should return the string to
    display.

    :param attr: Name of the attribute to read from each object.
    :param filters: Optional dict mapping attribute names to a function.
    :returns: A ``(name, formatter)`` tuple.
    """

    filter_ = None
    if filters:
        filter_ = filters.get(attr)

    def get_field(obj):
        # A missing attribute yields ''; falsy values bypass the filter.
        field = getattr(obj, attr, '')
        if field and filter_:
            field = filter_(field)
        return field

    name = _format_field_name(attr)
    formatter = get_field
    return name, formatter
def safe_issubclass(*args):
    """Like issubclass, but will just return False if not a class.

    :param args: Positional arguments forwarded to ``issubclass``.
    :returns: True if ``issubclass(*args)`` is true; False if it is false
              or raises TypeError.
    """
    try:
        if issubclass(*args):
            return True
    except TypeError:
        # issubclass raises TypeError when an argument is not a class.
        pass

    return False
def do_action_on_many(action, resources, success_msg, error_msg):
    """Helper to run an action on many resources.

    Every resource is attempted even if an earlier one fails; each failure
    is printed and a single error is raised once all have been tried.

    :param action: Callable invoked with each resource.
    :param resources: Iterable of resources to act on.
    :param success_msg: %-format string printed with the resource on success.
    :param error_msg: Message of the error raised if any action failed.
    :raises exceptions.CommandError: If at least one action raised.
    """
    failure_flag = False

    for resource in resources:
        try:
            action(resource)
            print(success_msg % resource)
        except Exception as e:
            # Best effort: report the failure and keep going.
            failure_flag = True
            print(e)

    if failure_flag:
        raise exceptions.CommandError(error_msg)
def _load_entry_point(ep_name, name=None):
    """Try to load the entry point ep_name that matches name.

    :param ep_name: Entry point group passed to
                    ``pkg_resources.iter_entry_points``.
    :param name: Optional entry point name to restrict the search to.
    :returns: The first entry point that loads successfully, or None if
              there is none.
    """
    for ep in pkg_resources.iter_entry_points(ep_name, name=name):
        try:
            return ep.load()
        except (ImportError, pkg_resources.UnknownExtra, AttributeError):
            # This candidate cannot be loaded; try the next one.
            continue
def is_integer_like(val):
    """Returns validation of a value as an integer.

    :param val: The value to check.
    :returns: True if ``int(val)`` succeeds, False otherwise.
    """
    try:
        int(val)
        return True
    except (TypeError, ValueError, AttributeError):
        return False
def validate_flavor_metadata_keys(keys):
    """Check that every flavor metadata key matches VALID_KEY_REGEX.

    :param keys: Iterable of key strings to validate.
    :raises exceptions.CommandError: On the first key that does not match.
    """
    for key in keys:
        valid_name = VALID_KEY_REGEX.match(key)
        if not valid_name:
            msg = _('Invalid key: "%s". Keys may only contain letters, '
                    'numbers, spaces, underscores, periods, colons and '
                    'hyphens.')
            raise exceptions.CommandError(msg % key)
@contextlib.contextmanager
def record_time(times, enabled, *args):
    """Record the time of a specific action.

    Intended for use in a ``with`` statement: the start and end time of the
    wrapped block are appended to ``times``. Nothing is recorded if the
    block raises.

    :param times: A list of tuples holds time data.
    :type times: list
    :param enabled: Whether timing is enabled.
    :type enabled: bool
    :param *args: Other data to be stored besides time data, these args
                  will be joined to a string.
    """
    if not enabled:
        yield
    else:
        start = time.time()
        yield
        end = time.time()
        times.append((' '.join(args), start, end))