Refactors nova.cmd utils

Some of functions used in the nova.cmd modules are duplicated
or they can be reused in future modules.

Adds docstrings to the refactored functions.

Adds unit tests to the refactored functions.

Change-Id: I026ce2e524a3fe61256c552c89a2efb90cdde9b6
Partially-Implements: blueprint discoverable-policy-cli
This commit is contained in:
Claudiu Belu 2016-05-29 18:04:04 +03:00
parent 014b32d60e
commit a049967e7d
6 changed files with 313 additions and 140 deletions

161
nova/cmd/common.py Normal file
View File

@ -0,0 +1,161 @@
# Copyright 2016 Cloudbase Solutions Srl
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Common functions used by different CLI interfaces.
"""
from __future__ import print_function
import argparse
import traceback
from oslo_log import log as logging
import six
import nova.conf
import nova.db.api
from nova import exception
from nova.i18n import _, _LE
from nova import utils
CONF = nova.conf.CONF
LOG = logging.getLogger(__name__)
def block_db_access(service_name):
"""Blocks Nova DB access."""
class NoDB(object):
def __getattr__(self, attr):
return self
def __call__(self, *args, **kwargs):
stacktrace = "".join(traceback.format_stack())
LOG.error(_LE('No db access allowed in %(service_name)s: '
'%(stacktrace)s'),
dict(service_name=service_name, stacktrace=stacktrace))
raise exception.DBNotAllowed(service_name)
nova.db.api.IMPL = NoDB()
# Decorators for actions
def args(*args, **kwargs):
"""Decorator which adds the given args and kwargs to the args list of
the desired func's __dict__.
"""
def _decorator(func):
func.__dict__.setdefault('args', []).insert(0, (args, kwargs))
return func
return _decorator
def methods_of(obj):
"""Get all callable methods of an object that don't start with underscore
returns a list of tuples of the form (method_name, method)
"""
result = []
for i in dir(obj):
if callable(getattr(obj, i)) and not i.startswith('_'):
result.append((i, getattr(obj, i)))
return result
def add_command_parsers(subparsers, categories):
"""Adds command parsers to the given subparsers.
Adds version and bash-completion parsers.
Adds a parser with subparsers for each category in the categories dict
given.
"""
parser = subparsers.add_parser('version')
parser = subparsers.add_parser('bash-completion')
parser.add_argument('query_category', nargs='?')
for category in categories:
command_object = categories[category]()
desc = getattr(command_object, 'description', None)
parser = subparsers.add_parser(category, description=desc)
parser.set_defaults(command_object=command_object)
category_subparsers = parser.add_subparsers(dest='action')
for (action, action_fn) in methods_of(command_object):
parser = category_subparsers.add_parser(action, description=desc)
action_kwargs = []
for args, kwargs in getattr(action_fn, 'args', []):
# FIXME(markmc): hack to assume dest is the arg name without
# the leading hyphens if no dest is supplied
kwargs.setdefault('dest', args[0][2:])
if kwargs['dest'].startswith('action_kwarg_'):
action_kwargs.append(kwargs['dest'][len('action_kwarg_'):])
else:
action_kwargs.append(kwargs['dest'])
kwargs['dest'] = 'action_kwarg_' + kwargs['dest']
parser.add_argument(*args, **kwargs)
parser.set_defaults(action_fn=action_fn)
parser.set_defaults(action_kwargs=action_kwargs)
parser.add_argument('action_args', nargs='*',
help=argparse.SUPPRESS)
def print_bash_completion(categories):
if not CONF.category.query_category:
print(" ".join(categories.keys()))
elif CONF.category.query_category in categories:
fn = categories[CONF.category.query_category]
command_object = fn()
actions = methods_of(command_object)
print(" ".join([k for (k, v) in actions]))
def get_action_fn():
fn = CONF.category.action_fn
fn_args = []
for arg in CONF.category.action_args:
if isinstance(arg, six.binary_type):
arg = arg.decode('utf-8')
fn_args.append(arg)
fn_kwargs = {}
for k in CONF.category.action_kwargs:
v = getattr(CONF.category, 'action_kwarg_' + k)
if v is None:
continue
if isinstance(v, six.binary_type):
v = v.decode('utf-8')
fn_kwargs[k] = v
# call the action with the remaining arguments
# check arguments
missing = utils.validate_args(fn, *fn_args, **fn_kwargs)
if missing:
# NOTE(mikal): this isn't the most helpful error message ever. It is
# long, and tells you a lot of things you probably don't want to know
# if you just got a single arg wrong.
print(fn.__doc__)
CONF.print_help()
raise exception.Invalid(
_("Missing arguments: %s") % ", ".join(missing))
return fn, fn_args, fn_kwargs

View File

@ -17,17 +17,15 @@
"""Starter script for Nova Compute."""
import sys
import traceback
from oslo_log import log as logging
from oslo_reports import guru_meditation_report as gmr
from nova.cmd import common as cmd_common
from nova.conductor import rpcapi as conductor_rpcapi
import nova.conf
from nova import config
import nova.db.api
from nova import exception
from nova.i18n import _LE, _LW
from nova.i18n import _LW
from nova import objects
from nova.objects import base as objects_base
from nova import service
@ -38,20 +36,6 @@ CONF = nova.conf.CONF
LOG = logging.getLogger('nova.compute')
def block_db_access():
class NoDB(object):
def __getattr__(self, attr):
return self
def __call__(self, *args, **kwargs):
stacktrace = "".join(traceback.format_stack())
LOG.error(_LE('No db access allowed in nova-compute: %s'),
stacktrace)
raise exception.DBNotAllowed('nova-compute')
nova.db.api.IMPL = NoDB()
def main():
config.parse_args(sys.argv)
logging.setup(CONF, 'nova')
@ -61,7 +45,7 @@ def main():
gmr.TextGuruMeditation.setup_autorun(version)
if not CONF.conductor.use_local:
block_db_access()
cmd_common.block_db_access('nova-compute')
objects_base.NovaObject.indirection_api = \
conductor_rpcapi.ConductorAPI()
else:

View File

@ -22,19 +22,17 @@ from __future__ import print_function
import os
import sys
import traceback
from oslo_config import cfg
from oslo_log import log as logging
from oslo_serialization import jsonutils
from oslo_utils import importutils
from nova.cmd import common as cmd_common
from nova.conductor import rpcapi as conductor_rpcapi
import nova.conf
from nova import config
from nova import context
import nova.db.api
from nova import exception
from nova.i18n import _LE, _LW
from nova.network import rpcapi as network_rpcapi
from nova import objects
@ -96,20 +94,6 @@ CONF.register_cli_opt(
handler=add_action_parsers))
def block_db_access():
class NoDB(object):
def __getattr__(self, attr):
return self
def __call__(self, *args, **kwargs):
stacktrace = "".join(traceback.format_stack())
LOG.error(_LE('No db access allowed in nova-dhcpbridge: %s'),
stacktrace)
raise exception.DBNotAllowed('nova-dhcpbridge')
nova.db.api.IMPL = NoDB()
def main():
"""Parse environment and arguments and call the appropriate action."""
config.parse_args(sys.argv,
@ -129,7 +113,7 @@ def main():
objects.register_all()
if not CONF.conductor.use_local:
block_db_access()
cmd_common.block_db_access('nova-dhcpbridge')
objects_base.NovaObject.indirection_api = \
conductor_rpcapi.ConductorAPI()
else:

View File

@ -54,7 +54,7 @@
from __future__ import print_function
import argparse
import functools
import os
import sys
@ -71,6 +71,7 @@ import six.moves.urllib.parse as urlparse
from nova.api.ec2 import ec2utils
from nova import availability_zones
from nova.cmd import common as cmd_common
import nova.conf
from nova import config
from nova import context
@ -96,11 +97,7 @@ _EXTRA_DEFAULT_LOG_LEVELS = ['oslo_db=INFO']
# Decorators for actions
def args(*args, **kwargs):
def _decorator(func):
func.__dict__.setdefault('args', []).insert(0, (args, kwargs))
return func
return _decorator
args = cmd_common.args
def param2id(object_id):
@ -1353,55 +1350,8 @@ CATEGORIES = {
}
def methods_of(obj):
"""Get all callable methods of an object that don't start with underscore
returns a list of tuples of the form (method_name, method)
"""
result = []
for i in dir(obj):
if callable(getattr(obj, i)) and not i.startswith('_'):
result.append((i, getattr(obj, i)))
return result
def add_command_parsers(subparsers):
parser = subparsers.add_parser('version')
parser = subparsers.add_parser('bash-completion')
parser.add_argument('query_category', nargs='?')
for category in CATEGORIES:
command_object = CATEGORIES[category]()
desc = getattr(command_object, 'description', None)
parser = subparsers.add_parser(category, description=desc)
parser.set_defaults(command_object=command_object)
category_subparsers = parser.add_subparsers(dest='action')
for (action, action_fn) in methods_of(command_object):
parser = category_subparsers.add_parser(action, description=desc)
action_kwargs = []
for args, kwargs in getattr(action_fn, 'args', []):
# FIXME(markmc): hack to assume dest is the arg name without
# the leading hyphens if no dest is supplied
kwargs.setdefault('dest', args[0][2:])
if kwargs['dest'].startswith('action_kwarg_'):
action_kwargs.append(
kwargs['dest'][len('action_kwarg_'):])
else:
action_kwargs.append(kwargs['dest'])
kwargs['dest'] = 'action_kwarg_' + kwargs['dest']
parser.add_argument(*args, **kwargs)
parser.set_defaults(action_fn=action_fn)
parser.set_defaults(action_kwargs=action_kwargs)
parser.add_argument('action_args', nargs='*',
help=argparse.SUPPRESS)
add_command_parsers = functools.partial(cmd_common.add_command_parsers,
categories=CATEGORIES)
category_opt = cfg.SubCommandOpt('category',
@ -1439,38 +1389,11 @@ def main():
return(0)
if CONF.category.name == "bash-completion":
if not CONF.category.query_category:
print(" ".join(CATEGORIES.keys()))
elif CONF.category.query_category in CATEGORIES:
fn = CATEGORIES[CONF.category.query_category]
command_object = fn()
actions = methods_of(command_object)
print(" ".join([k for (k, v) in actions]))
cmd_common.print_bash_completion(CATEGORIES)
return(0)
fn = CONF.category.action_fn
fn_args = [arg.decode('utf-8') for arg in CONF.category.action_args]
fn_kwargs = {}
for k in CONF.category.action_kwargs:
v = getattr(CONF.category, 'action_kwarg_' + k)
if v is None:
continue
if isinstance(v, six.string_types):
v = v.decode('utf-8')
fn_kwargs[k] = v
# call the action with the remaining arguments
# check arguments
missing = utils.validate_args(fn, *fn_args, **fn_kwargs)
if missing:
# NOTE(mikal): this isn't the most helpful error message ever. It is
# long, and tells you a lot of things you probably don't want to know
# if you just got a single arg wrong.
print(fn.__doc__)
CONF.print_help()
print(_("Missing arguments: %s") % ", ".join(missing))
return(1)
try:
fn, fn_args, fn_kwargs = cmd_common.get_action_fn()
ret = fn(*fn_args, **fn_kwargs)
rpc.cleanup()
return(ret)

View File

@ -17,17 +17,15 @@
"""Starter script for Nova Network."""
import sys
import traceback
from oslo_log import log as logging
from oslo_reports import guru_meditation_report as gmr
from nova.cmd import common as cmd_common
from nova.conductor import rpcapi as conductor_rpcapi
import nova.conf
from nova import config
import nova.db.api
from nova import exception
from nova.i18n import _LE, _LW
from nova.i18n import _LW
from nova import objects
from nova.objects import base as objects_base
from nova import service
@ -38,20 +36,6 @@ CONF = nova.conf.CONF
LOG = logging.getLogger('nova.network')
def block_db_access():
class NoDB(object):
def __getattr__(self, attr):
return self
def __call__(self, *args, **kwargs):
stacktrace = "".join(traceback.format_stack())
LOG.error(_LE('No db access allowed in nova-network: %s'),
stacktrace)
raise exception.DBNotAllowed('nova-network')
nova.db.api.IMPL = NoDB()
def main():
config.parse_args(sys.argv)
logging.setup(CONF, "nova")
@ -61,7 +45,7 @@ def main():
gmr.TextGuruMeditation.setup_autorun(version)
if not CONF.conductor.use_local:
block_db_access()
cmd_common.block_db_access('nova-network')
objects_base.NovaObject.indirection_api = \
conductor_rpcapi.ConductorAPI()
else:

View File

@ -0,0 +1,137 @@
# Copyright 2016 Cloudbase Solutions Srl
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit tests for the common functions used by different CLI interfaces.
"""
import mock
from nova.cmd import common as cmd_common
from nova.db import api
from nova import exception
from nova import test
class TestCmdCommon(test.NoDBTestCase):
@mock.patch.object(cmd_common, 'LOG')
@mock.patch.object(api, 'IMPL')
def test_block_db_access(self, mock_db_IMPL, mock_LOG):
cmd_common.block_db_access('unit-tests')
self.assertEqual(api.IMPL, api.IMPL.foo)
self.assertRaises(exception.DBNotAllowed, api.IMPL)
self.assertEqual('unit-tests',
mock_LOG.error.call_args[0][1]['service_name'])
def test_args_decorator(self):
@cmd_common.args(bar='<bar>')
@cmd_common.args('foo')
def f():
pass
f_args = f.__dict__['args']
bar_args = ((), {'bar': '<bar>'})
foo_args = (('foo', ), {})
self.assertEqual(bar_args, f_args[0])
self.assertEqual(foo_args, f_args[1])
def test_methods_of(self):
class foo(object):
foo = 'bar'
def public(self):
pass
def _private(self):
pass
methods = cmd_common.methods_of(foo())
method_names = [method_name for method_name, method in methods]
self.assertIn('public', method_names)
self.assertNotIn('_private', method_names)
self.assertNotIn('foo', method_names)
@mock.patch.object(cmd_common, 'print')
@mock.patch.object(cmd_common, 'CONF')
def test_print_bash_completion_no_query_category(self, mock_CONF,
mock_print):
mock_CONF.category.query_category = None
categories = {'foo': mock.sentinel.foo, 'bar': mock.sentinel.bar}
cmd_common.print_bash_completion(categories)
mock_print.assert_called_once_with(' '.join(categories.keys()))
@mock.patch.object(cmd_common, 'print')
@mock.patch.object(cmd_common, 'CONF')
def test_print_bash_completion_mismatch(self, mock_CONF, mock_print):
mock_CONF.category.query_category = 'bar'
categories = {'foo': mock.sentinel.foo}
cmd_common.print_bash_completion(categories)
self.assertFalse(mock_print.called)
@mock.patch.object(cmd_common, 'methods_of')
@mock.patch.object(cmd_common, 'print')
@mock.patch.object(cmd_common, 'CONF')
def test_print_bash_completion(self, mock_CONF, mock_print,
mock_method_of):
mock_CONF.category.query_category = 'foo'
actions = [('f1', mock.sentinel.f1), ('f2', mock.sentinel.f2)]
mock_method_of.return_value = actions
mock_fn = mock.Mock()
categories = {'foo': mock_fn}
cmd_common.print_bash_completion(categories)
mock_fn.assert_called_once_with()
mock_method_of.assert_called_once_with(mock_fn.return_value)
mock_print.assert_called_once_with(' '.join([k for k, v in actions]))
@mock.patch.object(cmd_common.utils, 'validate_args')
@mock.patch.object(cmd_common, 'CONF')
def test_get_action_fn(self, mock_CONF, mock_validate_args):
mock_validate_args.return_value = None
action_args = [u'arg']
action_kwargs = ['missing', 'foo', 'bar']
mock_CONF.category.action_fn = mock.sentinel.action_fn
mock_CONF.category.action_args = action_args
mock_CONF.category.action_kwargs = action_kwargs
mock_CONF.category.action_kwarg_foo = u'foo_val'
mock_CONF.category.action_kwarg_bar = True
mock_CONF.category.action_kwarg_missing = None
actual_fn, actual_args, actual_kwargs = cmd_common.get_action_fn()
self.assertEqual(mock.sentinel.action_fn, actual_fn)
self.assertEqual(action_args, actual_args)
self.assertEqual(u'foo_val', actual_kwargs['foo'])
self.assertTrue(actual_kwargs['bar'])
self.assertNotIn('missing', actual_kwargs)
@mock.patch.object(cmd_common.utils, 'validate_args')
@mock.patch.object(cmd_common, 'CONF')
def test_get_action_fn_missing_args(self, mock_CONF, mock_validate_args):
mock_validate_args.return_value = ['foo']
mock_CONF.category.action_fn = mock.sentinel.action_fn
mock_CONF.category.action_args = []
mock_CONF.category.action_kwargs = []
self.assertRaises(exception.Invalid, cmd_common.get_action_fn)
mock_CONF.print_help.assert_called_once_with()