Switch Watcher CLI to an OSC-compatible version

In this changeset, I switched the watcher command line to now use
the OpenStackClient code to avoid code duplication.

Partially Implements: blueprint openstackclient-plugin

Change-Id: I2e0df6a96f4c7c59d33b92c01962f65129bfc7cc
This commit is contained in:
Vincent Francoise
2016-04-29 14:17:39 +02:00
parent e8236aaf65
commit 1b14be868e
18 changed files with 1 additions and 2415 deletions

View File

@@ -24,7 +24,7 @@ packages =
[entry_points]
console_scripts =
watcher = watcherclient.shell:main
watcher = watcherclient.watcher:main
openstack.cli.extension =
infra_optim = watcherclient.plugin

View File

@@ -1,507 +0,0 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Command-line interface to the Watcher API.
"""
from __future__ import print_function
import argparse
import getpass
import logging
import sys
from keystoneclient.auth.identity import v2 as v2_auth
from keystoneclient.auth.identity import v3 as v3_auth
from keystoneclient import discover
from keystoneclient import exceptions as ks_exc
from keystoneclient import session as kssession
import six.moves.urllib.parse as urlparse
import watcherclient
from watcherclient._i18n import _
from watcherclient import client as watcher_client
from watcherclient.common import cliutils
from watcherclient.common import utils
from watcherclient import exceptions as exc
class WatcherShell(object):
def _append_global_identity_args(self, parser):
# FIXME(dhu): these are global identity (Keystone) arguments which
# should be consistent and shared by all service clients. Therefore,
# they should be provided by python-keystoneclient. We will need to
# refactor this code once this functionality is avaible in
# python-keystoneclient.
# Register arguments needed for a Session
kssession.Session.register_cli_options(parser)
parser.add_argument('--os-user-domain-id',
default=cliutils.env('OS_USER_DOMAIN_ID'),
help='Defaults to env[OS_USER_DOMAIN_ID].')
parser.add_argument('--os-user-domain-name',
default=cliutils.env('OS_USER_DOMAIN_NAME'),
help='Defaults to env[OS_USER_DOMAIN_NAME].')
parser.add_argument('--os-project-id',
default=cliutils.env('OS_PROJECT_ID'),
help='Another way to specify tenant ID. '
'This option is mutually exclusive with '
' --os-tenant-id. '
'Defaults to env[OS_PROJECT_ID].')
parser.add_argument('--os-project-name',
default=cliutils.env('OS_PROJECT_NAME'),
help='Another way to specify tenant name. '
'This option is mutually exclusive with '
' --os-tenant-name. '
'Defaults to env[OS_PROJECT_NAME].')
parser.add_argument('--os-project-domain-id',
default=cliutils.env('OS_PROJECT_DOMAIN_ID'),
help='Defaults to env[OS_PROJECT_DOMAIN_ID].')
parser.add_argument('--os-project-domain-name',
default=cliutils.env('OS_PROJECT_DOMAIN_NAME'),
help='Defaults to env[OS_PROJECT_DOMAIN_NAME].')
def get_base_parser(self):
parser = argparse.ArgumentParser(
prog='watcher',
description=__doc__.strip(),
epilog='See "watcher help COMMAND" '
'for help on a specific command.',
add_help=False,
formatter_class=HelpFormatter,
)
# Global arguments
parser.add_argument('-h', '--help',
action='store_true',
help=argparse.SUPPRESS,
)
parser.add_argument('--version',
action='version',
version=watcherclient.__version__)
parser.add_argument('--debug',
default=bool(cliutils.env('WATCHERCLIENT_DEBUG')),
action='store_true',
help='Defaults to env[WATCHERCLIENT_DEBUG]')
parser.add_argument('-v', '--verbose',
default=False, action="store_true",
help="Print more verbose output")
# for backward compatibility only
parser.add_argument('--cert-file',
dest='os_cert',
help='DEPRECATED! Use --os-cert.')
# for backward compatibility only
parser.add_argument('--key-file',
dest='os_key',
help='DEPRECATED! Use --os-key.')
# for backward compatibility only
parser.add_argument('--ca-file',
dest='os_cacert',
help='DEPRECATED! Use --os-cacert.')
parser.add_argument('--os-username',
default=cliutils.env('OS_USERNAME'),
help='Defaults to env[OS_USERNAME]')
parser.add_argument('--os_username',
help=argparse.SUPPRESS)
parser.add_argument('--os-password',
default=cliutils.env('OS_PASSWORD'),
help='Defaults to env[OS_PASSWORD]')
parser.add_argument('--os_password',
help=argparse.SUPPRESS)
parser.add_argument('--os-tenant-id',
default=cliutils.env('OS_TENANT_ID'),
help='Defaults to env[OS_TENANT_ID]')
parser.add_argument('--os_tenant_id',
help=argparse.SUPPRESS)
parser.add_argument('--os-tenant-name',
default=cliutils.env('OS_TENANT_NAME'),
help='Defaults to env[OS_TENANT_NAME]')
parser.add_argument('--os_tenant_name',
help=argparse.SUPPRESS)
parser.add_argument('--os-auth-url',
default=cliutils.env('OS_AUTH_URL'),
help='Defaults to env[OS_AUTH_URL]')
parser.add_argument('--os_auth_url',
help=argparse.SUPPRESS)
parser.add_argument('--os-region-name',
default=cliutils.env('OS_REGION_NAME'),
help='Defaults to env[OS_REGION_NAME]')
parser.add_argument('--os_region_name',
help=argparse.SUPPRESS)
parser.add_argument('--os-auth-token',
default=cliutils.env('OS_AUTH_TOKEN'),
help='Defaults to env[OS_AUTH_TOKEN]')
parser.add_argument('--os_auth_token',
help=argparse.SUPPRESS)
parser.add_argument('--watcher-url',
default=cliutils.env('WATCHER_URL'),
help='Defaults to env[WATCHER_URL]')
parser.add_argument('--watcher_url',
help=argparse.SUPPRESS)
parser.add_argument('--watcher-api-version',
default=cliutils.env(
'WATCHER_API_VERSION', default='1'),
help='Defaults to env[WATCHER_API_VERSION] '
'or 1')
parser.add_argument('--watcher_api_version',
help=argparse.SUPPRESS)
parser.add_argument('--os-service-type',
default=cliutils.env('OS_SERVICE_TYPE'),
help='Defaults to env[OS_SERVICE_TYPE] or '
'"watcher"')
parser.add_argument('--os_service_type',
help=argparse.SUPPRESS)
parser.add_argument('--os-endpoint',
default=cliutils.env('OS_SERVICE_ENDPOINT'),
help='Specify an endpoint to use instead of '
'retrieving one from the service catalog '
'(via authentication). '
'Defaults to env[OS_SERVICE_ENDPOINT].')
parser.add_argument('--os_endpoint',
help=argparse.SUPPRESS)
parser.add_argument('--os-endpoint-type',
default=cliutils.env('OS_ENDPOINT_TYPE'),
help='Defaults to env[OS_ENDPOINT_TYPE] or '
'"publicURL"')
parser.add_argument('--os_endpoint_type',
help=argparse.SUPPRESS)
# FIXME(gyee): this method should come from python-keystoneclient.
# Will refactor this code once it is available.
# https://bugs.launchpad.net/python-keystoneclient/+bug/1332337
self._append_global_identity_args(parser)
return parser
def get_subcommand_parser(self, version):
parser = self.get_base_parser()
self.subcommands = {}
subparsers = parser.add_subparsers(metavar='<subcommand>')
submodule = utils.import_versioned_module(version, 'shell')
submodule.enhance_parser(parser, subparsers, self.subcommands)
utils.define_commands_from_module(subparsers, self, self.subcommands)
return parser
def _setup_debugging(self, debug):
if debug:
logging.basicConfig(
format="%(levelname)s (%(module)s:%(lineno)d) %(message)s",
level=logging.DEBUG)
else:
logging.basicConfig(
format="%(levelname)s %(message)s",
level=logging.CRITICAL)
def do_bash_completion(self):
"""Prints all of the commands and options for bash-completion."""
commands = set()
options = set()
for sc_str, sc in self.subcommands.items():
commands.add(sc_str)
for option in sc._optionals._option_string_actions.keys():
options.add(option)
commands.remove('bash-completion')
print(' '.join(commands | options))
def _discover_auth_versions(self, session, auth_url):
# discover the API versions the server is supporting base on the
# given URL
v2_auth_url = None
v3_auth_url = None
try:
ks_discover = discover.Discover(session=session, auth_url=auth_url)
v2_auth_url = ks_discover.url_for('2.0')
v3_auth_url = ks_discover.url_for('3.0')
except ks_exc.ClientException:
# Identity service may not support discover API version.
# Let's try to figure out the API version from the original URL.
url_parts = urlparse.urlparse(auth_url)
(scheme, netloc, path, params, query, fragment) = url_parts
path = path.lower()
if path.startswith('/v3'):
v3_auth_url = auth_url
elif path.startswith('/v2'):
v2_auth_url = auth_url
else:
# not enough information to determine the auth version
msg = _('Unable to determine the Keystone version '
'to authenticate with using the given '
'auth_url. Identity service may not support API '
'version discovery. Please provide a versioned '
'auth_url instead. %s') % auth_url
raise exc.CommandError(msg)
return (v2_auth_url, v3_auth_url)
def _get_keystone_v3_auth(self, v3_auth_url, **kwargs):
auth_token = kwargs.pop('auth_token', None)
if auth_token:
return v3_auth.Token(v3_auth_url, auth_token)
else:
return v3_auth.Password(v3_auth_url, **kwargs)
def _get_keystone_v2_auth(self, v2_auth_url, **kwargs):
auth_token = kwargs.pop('auth_token', None)
if auth_token:
return v2_auth.Token(
v2_auth_url,
auth_token,
tenant_id=kwargs.pop('project_id', None),
tenant_name=kwargs.pop('project_name', None))
else:
return v2_auth.Password(
v2_auth_url,
username=kwargs.pop('username', None),
password=kwargs.pop('password', None),
tenant_id=kwargs.pop('project_id', None),
tenant_name=kwargs.pop('project_name', None))
def _get_keystone_auth(self, session, auth_url, **kwargs):
# FIXME(dhu): this code should come from keystoneclient
# discover the supported keystone versions using the given url
(v2_auth_url, v3_auth_url) = self._discover_auth_versions(
session=session,
auth_url=auth_url)
# Determine which authentication plugin to use. First inspect the
# auth_url to see the supported version. If both v3 and v2 are
# supported, then use the highest version if possible.
auth = None
if v3_auth_url and v2_auth_url:
user_domain_name = kwargs.get('user_domain_name', None)
user_domain_id = kwargs.get('user_domain_id', None)
project_domain_name = kwargs.get('project_domain_name', None)
project_domain_id = kwargs.get('project_domain_id', None)
# support both v2 and v3 auth. Use v3 if domain information is
# provided.
if (user_domain_name or user_domain_id or project_domain_name or
project_domain_id):
auth = self._get_keystone_v3_auth(v3_auth_url, **kwargs)
else:
auth = self._get_keystone_v2_auth(v2_auth_url, **kwargs)
elif v3_auth_url:
# support only v3
auth = self._get_keystone_v3_auth(v3_auth_url, **kwargs)
elif v2_auth_url:
# support only v2
auth = self._get_keystone_v2_auth(v2_auth_url, **kwargs)
else:
raise exc.CommandError(
_('Unable to determine the Keystone version '
'to authenticate with using the given '
'auth_url.'))
return auth
def main(self, argv):
# Parse args once to find version
parser = self.get_base_parser()
(options, args) = parser.parse_known_args(argv)
self._setup_debugging(options.debug)
# build available subcommands based on version
api_version = options.watcher_api_version
subcommand_parser = self.get_subcommand_parser(api_version)
self.parser = subcommand_parser
# Handle top-level --help/-h before attempting to parse
# a command off the command line
if options.help or not argv:
self.do_help(options)
return 0
# Parse args again and call whatever callback was selected
args = subcommand_parser.parse_args(argv)
# Short-circuit and deal with these commands right away.
if args.func == self.do_help:
self.do_help(args)
return 0
elif args.func == self.do_bash_completion:
self.do_bash_completion()
return 0
if not (args.os_auth_token and (args.watcher_url or args.os_endpoint)):
if not args.os_username:
raise exc.CommandError(_("You must provide a username via "
"either --os-username or via "
"env[OS_USERNAME]"))
if not args.os_password:
# No password, If we've got a tty, try prompting for it
if hasattr(sys.stdin, 'isatty') and sys.stdin.isatty():
# Check for Ctl-D
try:
args.os_password = getpass.getpass(
'OpenStack Password: ')
except EOFError:
pass
# No password because we didn't have a tty or the
# user Ctl-D when prompted.
if not args.os_password:
raise exc.CommandError(_("You must provide a password via "
"either --os-password, "
"env[OS_PASSWORD], "
"or prompted response"))
if not (args.os_tenant_id or args.os_tenant_name or
args.os_project_id or args.os_project_name):
raise exc.CommandError(_(
"You must provide a project name or"
" project id via --os-project-name, --os-project-id,"
" env[OS_PROJECT_ID] or env[OS_PROJECT_NAME]. You may"
" use os-project and os-tenant interchangeably."))
if not args.os_auth_url:
raise exc.CommandError(_("You must provide an auth url via "
"either --os-auth-url or via "
"env[OS_AUTH_URL]"))
endpoint = args.watcher_url or args.os_endpoint
service_type = args.os_service_type or 'infra-optim'
project_id = args.os_project_id or args.os_tenant_id
project_name = args.os_project_name or args.os_tenant_name
if (args.os_auth_token and (args.watcher_url or args.os_endpoint)):
kwargs = {
'token': args.os_auth_token,
'insecure': args.insecure,
'timeout': args.timeout,
'ca_file': args.os_cacert,
'cert_file': args.os_cert,
'key_file': args.os_key,
'auth_ref': None,
}
elif (args.os_username and
args.os_password and
args.os_auth_url and
(project_id or project_name)):
keystone_session = kssession.Session.load_from_cli_options(args)
kwargs = {
'username': args.os_username,
'user_domain_id': args.os_user_domain_id,
'user_domain_name': args.os_user_domain_name,
'password': args.os_password,
'auth_token': args.os_auth_token,
'project_id': project_id,
'project_name': project_name,
'project_domain_id': args.os_project_domain_id,
'project_domain_name': args.os_project_domain_name,
}
keystone_auth = self._get_keystone_auth(keystone_session,
args.os_auth_url,
**kwargs)
if not endpoint:
svc_type = args.os_service_type
region_name = args.os_region_name
endpoint = keystone_auth.get_endpoint(keystone_session,
service_type=svc_type,
region_name=region_name)
endpoint_type = args.os_endpoint_type or 'publicURL'
kwargs = {
'auth_url': args.os_auth_url,
'session': keystone_session,
'auth': keystone_auth,
'service_type': service_type,
'endpoint_type': endpoint_type,
'region_name': args.os_region_name,
'username': args.os_username,
'password': args.os_password,
}
client = watcher_client.Client(api_version, endpoint, **kwargs)
try:
args.func(client, args)
except exc.Unauthorized:
raise exc.CommandError(_("Invalid OpenStack Identity credentials"))
@cliutils.arg('command', metavar='<subcommand>', nargs='?',
help='Display help for <subcommand>')
def do_help(self, args):
"""Display help about this program or one of its subcommands."""
if getattr(args, 'command', None):
if args.command in self.subcommands:
self.subcommands[args.command].print_help()
else:
raise exc.CommandError(_("'%s' is not a valid subcommand") %
args.command)
else:
self.parser.print_help()
class HelpFormatter(argparse.HelpFormatter):
def start_section(self, heading):
# Title-case the headings
heading = '%s%s' % (heading[0].upper(), heading[1:])
super(HelpFormatter, self).start_section(heading)
def main():
try:
WatcherShell().main(sys.argv[1:])
except KeyboardInterrupt:
print("... terminating watcher client", file=sys.stderr)
sys.exit(130)
except Exception as e:
print(str(e), file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()

View File

@@ -1,158 +0,0 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import re
import sys
import fixtures
from keystoneclient import exceptions as keystone_exc
import mock
import six
from testtools import matchers
from watcherclient import exceptions as exc
from watcherclient import shell as watcher_shell
from watcherclient.tests import utils
FAKE_ENV = {'OS_USERNAME': 'username',
'OS_PASSWORD': 'password',
'OS_TENANT_NAME': 'tenant_name',
'OS_AUTH_URL': 'http://no.where/v2.0/'}
class ShellTest(utils.BaseTestCase):
re_options = re.DOTALL | re.MULTILINE
# Patch os.environ to avoid required auth info.
def make_env(self, exclude=None):
env = dict((k, v) for k, v in FAKE_ENV.items() if k != exclude)
self.useFixture(fixtures.MonkeyPatch('os.environ', env))
def setUp(self):
super(ShellTest, self).setUp()
def shell(self, argstr):
orig = sys.stdout
try:
sys.stdout = six.StringIO()
_shell = watcher_shell.WatcherShell()
_shell.main(argstr.split())
except SystemExit:
exc_type, exc_value, exc_traceback = sys.exc_info()
self.assertEqual(0, exc_value.code)
finally:
out = sys.stdout.getvalue()
sys.stdout.close()
sys.stdout = orig
return out
def test_help_unknown_command(self):
self.assertRaises(exc.CommandError, self.shell, 'help foofoo')
def test_help(self):
required = [
'.*?^usage: watcher',
'.*?^ +bash-completion',
'.*?^See "watcher help COMMAND" '
'for help on a specific command',
]
for argstr in ['--help', 'help']:
help_text = self.shell(argstr)
for r in required:
self.assertThat(help_text,
matchers.MatchesRegex(r,
self.re_options))
def test_help_on_subcommand(self):
required = [
'.*?^usage: watcher action-show',
".*?^Show detailed information about an action",
]
argstrings = [
'help action-show',
]
for argstr in argstrings:
help_text = self.shell(argstr)
for r in required:
self.assertThat(help_text,
matchers.MatchesRegex(r, self.re_options))
def test_auth_param(self):
self.make_env(exclude='OS_USERNAME')
self.test_help()
@mock.patch('sys.stdin', side_effect=mock.MagicMock)
@mock.patch('getpass.getpass', return_value='password')
def test_password_prompted(self, mock_getpass, mock_stdin):
self.make_env(exclude='OS_PASSWORD')
# We will get a Connection Refused because there is no keystone.
self.assertRaises(keystone_exc.ConnectionRefused,
self.shell, 'action-list')
# Make sure we are actually prompted.
mock_getpass.assert_called_with('OpenStack Password: ')
@mock.patch('sys.stdin', side_effect=mock.MagicMock)
@mock.patch('getpass.getpass', side_effect=EOFError)
def test_password_prompted_ctrlD(self, mock_getpass, mock_stdin):
self.make_env(exclude='OS_PASSWORD')
# We should get Command Error because we mock Ctl-D.
self.assertRaises(exc.CommandError,
self.shell, 'action-list')
# Make sure we are actually prompted.
mock_getpass.assert_called_with('OpenStack Password: ')
@mock.patch('sys.stdin')
def test_no_password_no_tty(self, mock_stdin):
# delete the isatty attribute so that we do not get
# prompted when manually running the tests
del mock_stdin.isatty
required = ('You must provide a password'
' via either --os-password, env[OS_PASSWORD],'
' or prompted response',)
self.make_env(exclude='OS_PASSWORD')
try:
self.shell('action-list')
except exc.CommandError as message:
self.assertEqual(required, message.args)
else:
self.fail('CommandError not raised')
def test_bash_completion(self):
stdout = self.shell('bash-completion')
# just check we have some output
required = [
'.*help',
'.*audit-list',
'.*audit-show',
'.*audit-delete',
'.*audit-update',
'.*audit-template-create',
'.*audit-template-update',
'.*audit-template-list',
'.*audit-template-show',
'.*audit-template-delete',
'.*action-list',
'.*action-show',
'.*action-update',
'.*action-plan-list',
'.*action-plan-show',
'.*action-plan-update',
'.*goal-list',
'.*goal-show',
'.*strategy-list',
'.*strategy-show',
]
for r in required:
self.assertThat(stdout,
matchers.MatchesRegex(r, self.re_options))

View File

@@ -1,153 +0,0 @@
# -*- coding: utf-8 -*-
#
# Copyright 2013 IBM Corp
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from watcherclient.common import cliutils
from watcherclient.common import utils as commonutils
from watcherclient import exceptions
from watcherclient.tests import utils
import watcherclient.v1.action_plan_shell as ap_shell
class ActionPlanShellTest(utils.BaseTestCase):
def test_do_action_plan_show(self):
actual = {}
fake_print_dict = lambda data, *args, **kwargs: actual.update(data)
with mock.patch.object(cliutils, 'print_dict', fake_print_dict):
action_plan = object()
ap_shell._print_action_plan_show(action_plan)
exp = ['uuid', 'created_at', 'updated_at', 'deleted_at',
'state', 'audit_uuid']
act = actual.keys()
self.assertEqual(sorted(exp), sorted(act))
def test_do_action_plan_show_by_uuid(self):
client_mock = mock.MagicMock()
args = mock.MagicMock()
setattr(args, 'action-plan', 'a5199d0e-0702-4613-9234-5ae2af8dafea')
ap_shell.do_action_plan_show(client_mock, args)
client_mock.action_plan.get.assert_called_once_with(
'a5199d0e-0702-4613-9234-5ae2af8dafea'
)
# assert get_by_name() wasn't called
self.assertFalse(client_mock.action_plan.get_by_name.called)
def test_do_action_plan_show_by_not_uuid(self):
client_mock = mock.MagicMock()
args = mock.MagicMock()
setattr(args, 'action-plan', 'not_uuid')
self.assertRaises(
exceptions.ValidationError,
ap_shell.do_action_plan_show, client_mock, args)
def test_do_action_plan_delete(self):
client_mock = mock.MagicMock()
args = mock.MagicMock()
delete = ['a5199d0e-0702-4613-9234-5ae2af8dafea']
setattr(args, 'action-plan', delete)
ap_shell.do_action_plan_delete(client_mock, args)
client_mock.action_plan.delete.assert_called_once_with(
'a5199d0e-0702-4613-9234-5ae2af8dafea')
def test_do_action_plan_delete_not_uuid(self):
client_mock = mock.MagicMock()
args = mock.MagicMock()
setattr(args, 'action-plan', ['not_uuid'])
self.assertRaises(
exceptions.ValidationError,
ap_shell.do_action_plan_delete, client_mock, args)
def test_do_action_plan_delete_multiple(self):
client_mock = mock.MagicMock()
args = mock.MagicMock()
setattr(args, 'action-plan',
["a5199d0e-0702-4613-9234-5ae2af8dafea",
"a5199d0e-0702-4613-9234-5ae2af8dafeb"])
ap_shell.do_action_plan_delete(client_mock, args)
client_mock.action_plan.delete.assert_has_calls(
[mock.call('a5199d0e-0702-4613-9234-5ae2af8dafea'),
mock.call('a5199d0e-0702-4613-9234-5ae2af8dafeb')])
def test_do_action_plan_delete_multiple_not_uuid(self):
client_mock = mock.MagicMock()
args = mock.MagicMock()
setattr(args, 'action-plan',
["a5199d0e-0702-4613-9234-5ae2af8dafea",
"not_uuid",
"a5199d0e-0702-4613-9234-5ae2af8dafeb"])
self.assertRaises(
exceptions.ValidationError,
ap_shell.do_action_plan_delete, client_mock, args)
client_mock.action_plan.delete.assert_has_calls(
[mock.call('a5199d0e-0702-4613-9234-5ae2af8dafea')])
def test_do_action_plan_update(self):
client_mock = mock.MagicMock()
args = mock.MagicMock()
setattr(args, 'action-plan', "a5199d0e-0702-4613-9234-5ae2af8dafea")
args.op = 'add'
args.attributes = [['arg1=val1', 'arg2=val2']]
ap_shell.do_action_plan_update(client_mock, args)
patch = commonutils.args_array_to_patch(
args.op,
args.attributes[0])
client_mock.action_plan.update.assert_called_once_with(
'a5199d0e-0702-4613-9234-5ae2af8dafea', patch)
def test_do_action_plan_update_not_uuid(self):
client_mock = mock.MagicMock()
args = mock.MagicMock()
setattr(args, 'action-plan', "not_uuid")
args.op = 'add'
args.attributes = [['arg1=val1', 'arg2=val2']]
self.assertRaises(
exceptions.ValidationError,
ap_shell.do_action_plan_update, client_mock, args)
def test_do_action_plan_start(self):
client_mock = mock.MagicMock()
args = mock.MagicMock()
action_plan = 'a5199d0e-0702-4613-9234-5ae2af8dafea'
setattr(args, 'action-plan', action_plan)
ap_shell.do_action_plan_start(client_mock, args)
patch = commonutils.args_array_to_patch(
'replace', ['state=PENDING'])
client_mock.action_plan.update.assert_called_once_with(
'a5199d0e-0702-4613-9234-5ae2af8dafea', patch)
def test_do_action_plan_start_not_uuid(self):
client_mock = mock.MagicMock()
args = mock.MagicMock()
action_plan = 'not_uuid'
setattr(args, 'action-plan', action_plan)
self.assertRaises(
exceptions.ValidationError,
ap_shell.do_action_plan_start, client_mock, args)

View File

@@ -1,127 +0,0 @@
# -*- coding: utf-8 -*-
#
# Copyright 2013 IBM Corp
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from watcherclient.common import cliutils
from watcherclient.common import utils as commonutils
from watcherclient import exceptions
from watcherclient.tests import utils
import watcherclient.v1.action_shell as a_shell
class ActionShellTest(utils.BaseTestCase):
def test_do_action_show(self):
actual = {}
fake_print_dict = lambda data, *args, **kwargs: actual.update(data)
with mock.patch.object(cliutils, 'print_dict', fake_print_dict):
action = object()
a_shell._print_action_show(action)
exp = ['action_type',
'created_at',
'deleted_at',
'next_uuid',
'input_parameters',
'state',
'action_plan_uuid',
'updated_at',
'uuid']
act = actual.keys()
self.assertEqual(sorted(exp), sorted(act))
def test_do_action_show_by_uuid(self):
client_mock = mock.MagicMock()
args = mock.MagicMock()
args.action = 'a5199d0e-0702-4613-9234-5ae2af8dafea'
a_shell.do_action_show(client_mock, args)
client_mock.action.get.assert_called_once_with(
'a5199d0e-0702-4613-9234-5ae2af8dafea'
)
# assert get_by_name() wasn't called
self.assertFalse(client_mock.action.get_by_name.called)
def test_do_action_show_by_not_uuid(self):
client_mock = mock.MagicMock()
args = mock.MagicMock()
args.action = 'not_uuid'
self.assertRaises(exceptions.ValidationError, a_shell.do_action_show,
client_mock, args)
def test_do_action_delete(self):
client_mock = mock.MagicMock()
args = mock.MagicMock()
args.action = ['a5199d0e-0702-4613-9234-5ae2af8dafea']
a_shell.do_action_delete(client_mock, args)
client_mock.action.delete.assert_called_once_with(
'a5199d0e-0702-4613-9234-5ae2af8dafea')
def test_do_action_delete_not_uuid(self):
client_mock = mock.MagicMock()
args = mock.MagicMock()
args.action = ['not_uuid']
self.assertRaises(exceptions.ValidationError, a_shell.do_action_delete,
client_mock, args)
def test_do_action_delete_multiple(self):
client_mock = mock.MagicMock()
args = mock.MagicMock()
args.action = ['a5199d0e-0702-4613-9234-5ae2af8dafea',
'a5199d0e-0702-4613-9234-5ae2af8dafeb']
a_shell.do_action_delete(client_mock, args)
client_mock.action.delete.assert_has_calls(
[mock.call('a5199d0e-0702-4613-9234-5ae2af8dafea'),
mock.call('a5199d0e-0702-4613-9234-5ae2af8dafeb')])
def test_do_action_delete_multiple_not_uuid(self):
client_mock = mock.MagicMock()
args = mock.MagicMock()
args.action = ['a5199d0e-0702-4613-9234-5ae2af8dafea',
'not_uuid'
'a5199d0e-0702-4613-9234-5ae2af8dafeb']
self.assertRaises(exceptions.ValidationError, a_shell.do_action_delete,
client_mock, args)
client_mock.action.delete.assert_has_calls(
[mock.call('a5199d0e-0702-4613-9234-5ae2af8dafea')])
def test_do_action_update(self):
client_mock = mock.MagicMock()
args = mock.MagicMock()
args.action = 'a5199d0e-0702-4613-9234-5ae2af8dafea'
args.op = 'add'
args.attributes = [['arg1=val1', 'arg2=val2']]
a_shell.do_action_update(client_mock, args)
patch = commonutils.args_array_to_patch(
args.op,
args.attributes[0])
client_mock.action.update.assert_called_once_with(
'a5199d0e-0702-4613-9234-5ae2af8dafea', patch)
def test_do_action_update_not_uuid(self):
client_mock = mock.MagicMock()
args = mock.MagicMock()
args.action = 'not_uuid'
args.op = 'add'
args.attributes = [['arg1=val1', 'arg2=val2']]
self.assertRaises(exceptions.ValidationError, a_shell.do_action_update,
client_mock, args)

View File

@@ -1,165 +0,0 @@
# -*- coding: utf-8 -*-
#
# Copyright 2013 IBM Corp
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from watcherclient.common import cliutils
from watcherclient.common import utils as commonutils
from watcherclient import exceptions
from watcherclient.tests import utils
import watcherclient.v1.audit_shell as a_shell
class AuditShellTest(utils.BaseTestCase):
def test_do_audit_show(self):
actual = {}
fake_print_dict = lambda data, *args, **kwargs: actual.update(data)
with mock.patch.object(cliutils, 'print_dict', fake_print_dict):
audit = object()
a_shell._print_audit_show(audit)
exp = ['created_at', 'audit_template_uuid', 'audit_template_name',
'updated_at', 'uuid', 'deleted_at', 'state', 'type',
'deadline']
act = actual.keys()
self.assertEqual(sorted(exp), sorted(act))
def test_do_audit_show_by_uuid(self):
client_mock = mock.MagicMock()
args = mock.MagicMock()
args.audit = 'a5199d0e-0702-4613-9234-5ae2af8dafea'
a_shell.do_audit_show(client_mock, args)
client_mock.audit.get.assert_called_once_with(
'a5199d0e-0702-4613-9234-5ae2af8dafea'
)
# assert get_by_name() wasn't called
self.assertFalse(client_mock.audit.get_by_name.called)
def test_do_audit_show_by_not_uuid(self):
client_mock = mock.MagicMock()
args = mock.MagicMock()
args.audit = 'not_uuid'
self.assertRaises(exceptions.ValidationError, a_shell.do_audit_show,
client_mock, args)
def test_do_audit_delete(self):
client_mock = mock.MagicMock()
args = mock.MagicMock()
args.audit = ['a5199d0e-0702-4613-9234-5ae2af8dafea']
a_shell.do_audit_delete(client_mock, args)
client_mock.audit.delete.assert_called_once_with(
'a5199d0e-0702-4613-9234-5ae2af8dafea')
def test_do_audit_delete_with_not_uuid(self):
client_mock = mock.MagicMock()
args = mock.MagicMock()
args.audit = ['not_uuid']
self.assertRaises(exceptions.ValidationError, a_shell.do_audit_delete,
client_mock, args)
def test_do_audit_delete_multiple(self):
client_mock = mock.MagicMock()
args = mock.MagicMock()
args.audit = ['a5199d0e-0702-4613-9234-5ae2af8dafea',
'a5199d0e-0702-4613-9234-5ae2af8dafeb']
a_shell.do_audit_delete(client_mock, args)
client_mock.audit.delete.assert_has_calls(
[mock.call('a5199d0e-0702-4613-9234-5ae2af8dafea'),
mock.call('a5199d0e-0702-4613-9234-5ae2af8dafeb')])
def test_do_audit_delete_multiple_with_not_uuid(self):
client_mock = mock.MagicMock()
args = mock.MagicMock()
args.audit = ['a5199d0e-0702-4613-9234-5ae2af8dafea',
'not_uuid',
'a5199d0e-0702-4613-9234-5ae2af8dafeb']
self.assertRaises(exceptions.ValidationError, a_shell.do_audit_delete,
client_mock, args)
client_mock.audit.delete.assert_has_calls(
[mock.call('a5199d0e-0702-4613-9234-5ae2af8dafea')])
def test_do_audit_update(self):
client_mock = mock.MagicMock()
args = mock.MagicMock()
args.audit = 'a5199d0e-0702-4613-9234-5ae2af8dafea'
args.op = 'add'
args.attributes = [['arg1=val1', 'arg2=val2']]
a_shell.do_audit_update(client_mock, args)
patch = commonutils.args_array_to_patch(
args.op,
args.attributes[0])
client_mock.audit.update.assert_called_once_with(
'a5199d0e-0702-4613-9234-5ae2af8dafea', patch)
def test_do_audit_update_with_not_uuid(self):
client_mock = mock.MagicMock()
args = mock.MagicMock()
args.audit = ['not_uuid']
args.op = 'add'
args.attributes = [['arg1=val1', 'arg2=val2']]
self.assertRaises(exceptions.ValidationError, a_shell.do_audit_update,
client_mock, args)
def test_do_audit_create(self):
client_mock = mock.MagicMock()
args = mock.MagicMock()
a_shell.do_audit_create(client_mock, args)
client_mock.audit.create.assert_called_once_with()
def test_do_audit_create_with_audit_template_uuid(self):
client_mock = mock.MagicMock()
args = mock.MagicMock()
args.audit_template_uuid = 'a5194d0e-1702-4663-9234-5ab2cf8dafea'
a_shell.do_audit_create(client_mock, args)
client_mock.audit.create.assert_called_once_with(
audit_template_uuid='a5194d0e-1702-4663-9234-5ab2cf8dafea')
def test_do_audit_create_with_audit_template_name(self):
client_mock = mock.MagicMock(
audit_template=mock.Mock(
get=mock.Mock(return_value=mock.Mock(
uuid='a5194d0e-1702-4663-9234-5ab2cf8dafea'))))
args = mock.MagicMock()
args.audit_template_uuid = 'Test AT name'
a_shell.do_audit_create(client_mock, args)
client_mock.audit.create.assert_called_once_with(
audit_template_uuid='a5194d0e-1702-4663-9234-5ab2cf8dafea')
def test_do_audit_create_with_deadline(self):
client_mock = mock.MagicMock()
args = mock.MagicMock()
args.deadline = 'deadline'
a_shell.do_audit_create(client_mock, args)
client_mock.audit.create.assert_called_once_with(
deadline='deadline')
def test_do_audit_create_with_type(self):
client_mock = mock.MagicMock()
args = mock.MagicMock()
args.type = 'type'
a_shell.do_audit_create(client_mock, args)
client_mock.audit.create.assert_called_once_with(type='type')

View File

@@ -1,137 +0,0 @@
# -*- coding: utf-8 -*-
#
# Copyright 2013 IBM Corp
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from watcherclient.common import cliutils
from watcherclient.common import utils as commonutils
from watcherclient.tests import utils
import watcherclient.v1.audit_template_shell as at_shell
class AuditTemplateShellTest(utils.BaseTestCase):
def test_do_audit_template_show(self):
actual = {}
fake_print_dict = lambda data, *args, **kwargs: actual.update(data)
with mock.patch.object(cliutils, 'print_dict', fake_print_dict):
audit_template = object()
at_shell._print_audit_template_show(audit_template)
exp = [
'uuid', 'created_at', 'updated_at', 'deleted_at',
'description', 'host_aggregate', 'name',
'extra', 'goal_uuid', 'strategy_uuid']
act = actual.keys()
self.assertEqual(sorted(exp), sorted(act))
def test_do_audit_template_show_by_uuid(self):
client_mock = mock.MagicMock()
args = mock.MagicMock()
setattr(args, 'audit-template', 'a5199d0e-0702-4613-9234-5ae2af8dafea')
at_shell.do_audit_template_show(client_mock, args)
client_mock.audit_template.get.assert_called_once_with(
'a5199d0e-0702-4613-9234-5ae2af8dafea'
)
# assert get_by_name() wasn't called
self.assertFalse(client_mock.audit_template.get_by_name.called)
def test_do_audit_template_show_by_name(self):
client_mock = mock.MagicMock()
args = mock.MagicMock()
setattr(args, 'audit-template', "a5199d0e-0702-4613-9234-5ae2af8dafea")
at_shell.do_audit_template_show(client_mock, args)
client_mock.audit_template.get.assert_called_once_with(
'a5199d0e-0702-4613-9234-5ae2af8dafea')
def test_do_audit_template_delete(self):
client_mock = mock.MagicMock()
args = mock.MagicMock()
setattr(args, 'audit-template',
['a5199d0e-0702-4613-9234-5ae2af8dafea'])
at_shell.do_audit_template_delete(client_mock, args)
client_mock.audit_template.delete.assert_called_once_with(
'a5199d0e-0702-4613-9234-5ae2af8dafea')
def test_do_audit_template_delete_multiple(self):
client_mock = mock.MagicMock()
args = mock.MagicMock()
setattr(args, 'audit-template',
['a5199d0e-0702-4613-9234-5ae2af8dafea',
'a5199d0e-0702-4613-9234-5ae2af8dafeb'])
at_shell.do_audit_template_delete(client_mock, args)
client_mock.audit_template.delete.assert_has_calls(
[mock.call('a5199d0e-0702-4613-9234-5ae2af8dafea'),
mock.call('a5199d0e-0702-4613-9234-5ae2af8dafeb')])
def test_do_audit_template_update(self):
client_mock = mock.MagicMock()
args = mock.MagicMock()
setattr(args, 'audit-template', "a5199d0e-0702-4613-9234-5ae2af8dafea")
args.op = 'add'
args.attributes = [['arg1=val1', 'arg2=val2']]
at_shell.do_audit_template_update(client_mock, args)
patch = commonutils.args_array_to_patch(
args.op,
args.attributes[0])
client_mock.audit_template.update.assert_called_once_with(
'a5199d0e-0702-4613-9234-5ae2af8dafea', patch)
def test_do_audit_template_create(self):
client_mock = mock.MagicMock()
args = mock.MagicMock()
at_shell.do_audit_template_create(client_mock, args)
client_mock.audit_template.create.assert_called_once_with()
def test_do_audit_template_create_with_name(self):
client_mock = mock.MagicMock()
args = mock.MagicMock()
args.name = 'my audit template'
at_shell.do_audit_template_create(client_mock, args)
client_mock.audit_template.create.assert_called_once_with(
name='my audit template')
def test_do_audit_template_create_with_description(self):
client_mock = mock.MagicMock()
args = mock.MagicMock()
args.description = 'description'
at_shell.do_audit_template_create(client_mock, args)
client_mock.audit_template.create.assert_called_once_with(
description='description')
def test_do_audit_template_create_with_aggregate(self):
client_mock = mock.MagicMock()
args = mock.MagicMock()
args.host_aggregate = 5
at_shell.do_audit_template_create(client_mock, args)
client_mock.audit_template.create.assert_called_once_with(
host_aggregate=5)
def test_do_audit_template_create_with_extra(self):
client_mock = mock.MagicMock()
args = mock.MagicMock()
args.extra = ['automatic=true']
at_shell.do_audit_template_create(client_mock, args)
client_mock.audit_template.create.assert_called_once_with(
extra={'automatic': True})

View File

@@ -1,34 +0,0 @@
# -*- coding: utf-8 -*-
#
# Copyright 2013 IBM Corp
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from watcherclient.common import cliutils
from watcherclient.tests import utils
import watcherclient.v1.goal_shell as a_shell
class GoalShellTest(utils.BaseTestCase):
def test_do_goal_show(self):
actual = {}
fake_print_dict = lambda data, *args, **kwargs: actual.update(data)
with mock.patch.object(cliutils, 'print_dict', fake_print_dict):
goal = object()
a_shell._print_goal_show(goal)
exp = ['uuid', 'name', 'display_name']
act = actual.keys()
self.assertEqual(sorted(exp), sorted(act))

View File

@@ -1,107 +0,0 @@
# -*- coding: utf-8 -*-
#
# Copyright 2013 IBM Corp
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from watcherclient.common import cliutils
from watcherclient.common import utils as commonutils
from watcherclient.tests import utils
import watcherclient.v1.metric_collector_shell as mc_shell
class MetricCollectorShellTest(utils.BaseTestCase):
def test_do_metric_collector_show(self):
actual = {}
fake_print_dict = lambda data, *args, **kwargs: actual.update(data)
with mock.patch.object(cliutils, 'print_dict', fake_print_dict):
metric_collector = object()
mc_shell._print_metric_collector_show(metric_collector)
exp = ['uuid', 'created_at', 'updated_at', 'deleted_at',
'category', 'endpoint']
act = actual.keys()
self.assertEqual(sorted(exp), sorted(act))
def test_do_metric_collector_show_by_uuid(self):
client_mock = mock.MagicMock()
args = mock.MagicMock()
args.metric_collector = 'a5199d0e-0702-4613-9234-5ae2af8dafea'
mc_shell.do_metric_collector_show(client_mock, args)
client_mock.metric_collector.get.assert_called_once_with(
'a5199d0e-0702-4613-9234-5ae2af8dafea'
)
# assert get_by_name() wasn't called
self.assertFalse(client_mock.metric_collector.get_by_name.called)
def test_do_metric_collector_delete(self):
client_mock = mock.MagicMock()
args = mock.MagicMock()
args.metric_collector = ['metric_collector_uuid']
mc_shell.do_metric_collector_delete(client_mock, args)
client_mock.metric_collector.delete.assert_called_once_with(
'metric_collector_uuid'
)
def test_do_metric_collector_delete_multiple(self):
client_mock = mock.MagicMock()
args = mock.MagicMock()
args.metric_collector = ['metric_collector_uuid1',
'metric_collector_uuid2']
mc_shell.do_metric_collector_delete(client_mock, args)
client_mock.metric_collector.delete.assert_has_calls(
[mock.call('metric_collector_uuid1'),
mock.call('metric_collector_uuid2')])
def test_do_metric_collector_update(self):
client_mock = mock.MagicMock()
args = mock.MagicMock()
setattr(args, 'metric-collector', "metric_collector_uuid")
args.op = 'add'
args.attributes = [['arg1=val1', 'arg2=val2']]
mc_shell.do_metric_collector_update(client_mock, args)
patch = commonutils.args_array_to_patch(
args.op,
args.attributes[0])
client_mock.metric_collector.update.assert_called_once_with(
'metric_collector_uuid', patch)
def test_do_metric_collector_create(self):
client_mock = mock.MagicMock()
args = mock.MagicMock()
mc_shell.do_metric_collector_create(client_mock, args)
client_mock.metric_collector.create.assert_called_once_with()
def test_do_metric_collector_create_with_category(self):
client_mock = mock.MagicMock()
args = mock.MagicMock()
args.category = 'mc_category'
mc_shell.do_metric_collector_create(client_mock, args)
client_mock.metric_collector.create.assert_called_once_with(
category='mc_category')
def test_do_metric_collector_create_with_endpoint(self):
client_mock = mock.MagicMock()
args = mock.MagicMock()
args.endpoint = 'mc_endpoint'
mc_shell.do_metric_collector_create(client_mock, args)
client_mock.metric_collector.create.assert_called_once_with(
endpoint='mc_endpoint')

View File

@@ -1,34 +0,0 @@
# -*- coding: utf-8 -*-
#
# Copyright 2013 IBM Corp
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from watcherclient.common import cliutils
from watcherclient.tests import utils
from watcherclient.v1 import strategy_shell
class StrategyShellTest(utils.BaseTestCase):
def test_do_strategy_show(self):
actual = {}
fake_print_dict = lambda data, *args, **kwargs: actual.update(data)
with mock.patch.object(cliutils, 'print_dict', fake_print_dict):
strategy = object()
strategy_shell._print_strategy_show(strategy)
exp = ['uuid', 'name', 'display_name', 'goal_uuid']
act = actual.keys()
self.assertEqual(sorted(exp), sorted(act))

View File

@@ -1,157 +0,0 @@
# -*- coding: utf-8 -*-
#
# Copyright 2013 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# import argparse
from oslo_utils import uuidutils
from watcherclient.common import cliutils
from watcherclient.common import utils
from watcherclient import exceptions
from watcherclient.v1 import resource_fields as res_fields
def _print_action_plan_show(action_plan):
fields = res_fields.ACTION_PLAN_FIELDS
data = dict([(f, getattr(action_plan, f, '')) for f in fields])
cliutils.print_dict(data, wrap=72)
@cliutils.arg(
'action-plan',
metavar='<action-plan>',
help="UUID of the action_plan.")
def do_action_plan_show(cc, args):
"""Show detailed information about an action plan."""
action_plan_uuid = getattr(args, 'action-plan')
if uuidutils.is_uuid_like(action_plan_uuid):
action_plan = cc.action_plan.get(action_plan_uuid)
_print_action_plan_show(action_plan)
else:
raise exceptions.ValidationError()
@cliutils.arg(
'--audit',
metavar='<audit>',
help='UUID of an audit used for filtering.')
@cliutils.arg(
'--detail',
dest='detail',
action='store_true',
default=False,
help="Show detailed information about action plans.")
@cliutils.arg(
'--limit',
metavar='<limit>',
type=int,
help='Maximum number of action plans to return per request, '
'0 for no limit. Default is the maximum number used '
'by the Watcher API Service.')
@cliutils.arg(
'--sort-key',
metavar='<field>',
help='Action Plan field that will be used for sorting.')
@cliutils.arg(
'--sort-dir',
metavar='<direction>',
choices=['asc', 'desc'],
help='Sort direction: "asc" (the default) or "desc".')
def do_action_plan_list(cc, args):
"""List the action plans."""
params = {}
if args.audit is not None:
params['audit'] = args.audit
if args.detail:
fields = res_fields.ACTION_PLAN_FIELDS
field_labels = res_fields.ACTION_PLAN_FIELD_LABELS
else:
fields = res_fields.ACTION_PLAN_SHORT_LIST_FIELDS
field_labels = res_fields.ACTION_PLAN_SHORT_LIST_FIELD_LABELS
params.update(utils.common_params_for_list(args,
fields,
field_labels))
action_plan = cc.action_plan.list(**params)
cliutils.print_list(action_plan, fields,
field_labels=field_labels,
sortby_index=None)
@cliutils.arg(
'action-plan',
metavar='<action-plan>',
nargs='+',
help="UUID of the action plan.")
def do_action_plan_delete(cc, args):
"""Delete an action plan."""
for p in getattr(args, 'action-plan'):
if uuidutils.is_uuid_like(p):
cc.action_plan.delete(p)
print ('Deleted action plan %s' % p)
else:
raise exceptions.ValidationError()
@cliutils.arg(
'action-plan',
metavar='<action-plan>',
help="UUID of the action plan.")
@cliutils.arg(
'op',
metavar='<op>',
choices=['add', 'replace', 'remove'],
help="Operation: 'add', 'replace', or 'remove'.")
@cliutils.arg(
'attributes',
metavar='<path=value>',
nargs='+',
action='append',
default=[],
help="Attribute to add, replace, or remove. Can be specified multiple "
"times. For 'remove', only <path> is necessary.")
def do_action_plan_update(cc, args):
"""Update information about an action plan."""
action_plan_uuid = getattr(args, 'action-plan')
if uuidutils.is_uuid_like(action_plan_uuid):
patch = utils.args_array_to_patch(args.op, args.attributes[0])
action_plan = cc.action_plan.update(action_plan_uuid, patch)
_print_action_plan_show(action_plan)
else:
raise exceptions.ValidationError()
@cliutils.arg('action-plan',
metavar='<action-plan>',
help="UUID of the action plan.")
def do_action_plan_start(cc, args):
"""Execute an action plan."""
action_plan_uuid = getattr(args, 'action-plan')
if uuidutils.is_uuid_like(action_plan_uuid):
args.op = 'replace'
args.attributes = [['state=PENDING']]
patch = utils.args_array_to_patch(
args.op,
args.attributes[0])
action_plan = cc.action_plan.update(action_plan_uuid, patch)
_print_action_plan_show(action_plan)
else:
raise exceptions.ValidationError()

View File

@@ -1,138 +0,0 @@
# -*- coding: utf-8 -*-
#
# Copyright 2013 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# import argparse
from oslo_utils import uuidutils
from watcherclient.common import cliutils
from watcherclient.common import utils
from watcherclient import exceptions
from watcherclient.v1 import resource_fields as res_fields
def _print_action_show(action):
fields = res_fields.ACTION_FIELDS
data = dict([(f, getattr(action, f, '')) for f in fields])
cliutils.print_dict(data, wrap=72)
@cliutils.arg(
'action',
metavar='<action>',
help="UUID of the action")
def do_action_show(cc, args):
"""Show detailed information about an action."""
if uuidutils.is_uuid_like(args.action):
action = cc.action.get(args.action)
_print_action_show(action)
else:
raise exceptions.ValidationError()
@cliutils.arg(
'--action-plan',
metavar='<action_plan>',
help='UUID of the action plan used for filtering.')
@cliutils.arg(
'--audit',
metavar='<audit>',
help=' UUID of the audit used for filtering.')
@cliutils.arg(
'--detail',
dest='detail',
action='store_true',
default=False,
help="Show detailed information about actions.")
@cliutils.arg(
'--limit',
metavar='<limit>',
type=int,
help='Maximum number of actions to return per request, '
'0 for no limit. Default is the maximum number used '
'by the Watcher API Service.')
@cliutils.arg(
'--sort-key',
metavar='<field>',
help='Action field that will be used for sorting.')
@cliutils.arg(
'--sort-dir',
metavar='<direction>',
choices=['asc', 'desc'],
help='Sort direction: "asc" (the default) or "desc".')
def do_action_list(cc, args):
"""List the actions."""
params = {}
if args.action_plan is not None:
params['action_plan'] = args.action_plan
if args.audit is not None:
params['audit'] = args.audit
if args.detail:
fields = res_fields.ACTION_FIELDS
field_labels = res_fields.ACTION_FIELD_LABELS
else:
fields = res_fields.ACTION_SHORT_LIST_FIELDS
field_labels = res_fields.ACTION_SHORT_LIST_FIELD_LABELS
params.update(utils.common_params_for_list(args,
fields,
field_labels))
action = cc.action.list(**params)
cliutils.print_list(action, fields,
field_labels=field_labels,
sortby_index=None)
@cliutils.arg(
'action',
metavar='<action>',
nargs='+',
help="UUID of the action.")
def do_action_delete(cc, args):
"""Delete an action."""
for p in args.action:
if uuidutils.is_uuid_like(p):
cc.action.delete(p)
print ('Deleted action %s' % p)
else:
raise exceptions.ValidationError()
@cliutils.arg('action', metavar='<action>', help="UUID of the action.")
@cliutils.arg(
'op',
metavar='<op>',
choices=['add', 'replace', 'remove'],
help="Operation: 'add', 'replace', or 'remove'.")
@cliutils.arg(
'attributes',
metavar='<path=value>',
nargs='+',
action='append',
default=[],
help="Attribute to add, replace, or remove. Can be specified multiple "
"times. For 'remove', only <path> is necessary.")
def do_action_update(cc, args):
"""Update information about an action."""
if uuidutils.is_uuid_like(args.action):
patch = utils.args_array_to_patch(args.op, args.attributes[0])
action = cc.action.update(args.action, patch)
_print_action_show(action)
else:
raise exceptions.ValidationError()

View File

@@ -1,167 +0,0 @@
# -*- coding: utf-8 -*-
#
# Copyright 2013 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# import argparse
from oslo_utils import uuidutils
from watcherclient.common import cliutils
from watcherclient.common import utils
from watcherclient import exceptions
from watcherclient.v1 import resource_fields as res_fields
def _print_audit_show(audit):
fields = res_fields.AUDIT_FIELDS
data = dict([(f, getattr(audit, f, '')) for f in fields])
cliutils.print_dict(data, wrap=72)
@cliutils.arg(
'audit',
metavar='<audit>',
help="UUID of the audit.")
def do_audit_show(cc, args):
"""Show detailed information about an audit."""
if uuidutils.is_uuid_like(args.audit):
audit = cc.audit.get(args.audit)
_print_audit_show(audit)
else:
raise exceptions.ValidationError()
@cliutils.arg(
'--audit-template',
metavar='<audit_template>',
dest='audit_template',
help='Name or UUID of an audit template used for filtering.')
@cliutils.arg(
'--detail',
dest='detail',
action='store_true',
default=False,
help="Show detailed information about audits.")
@cliutils.arg(
'--limit',
metavar='<limit>',
type=int,
help='Maximum number of audits to return per request, '
'0 for no limit. Default is the maximum number used '
'by the Watcher API Service.')
@cliutils.arg(
'--sort-key',
metavar='<field>',
help='Audit field that will be used for sorting.')
@cliutils.arg(
'--sort-dir',
metavar='<direction>',
choices=['asc', 'desc'],
help='Sort direction: "asc" (the default) or "desc".')
def do_audit_list(cc, args):
"""List the audits."""
params = {}
if args.audit_template is not None:
params['audit_template'] = args.audit_template
if args.detail:
fields = res_fields.AUDIT_FIELDS
field_labels = res_fields.AUDIT_FIELD_LABELS
else:
fields = res_fields.AUDIT_SHORT_LIST_FIELDS
field_labels = res_fields.AUDIT_SHORT_LIST_FIELD_LABELS
# params.update(utils.common_params_for_list(args, fields, field_labels))
audit = cc.audit.list(**params)
cliutils.print_list(audit, fields,
field_labels=field_labels,
sortby_index=None)
@cliutils.arg(
'-a', '--audit-template',
required=True,
dest='audit_template_uuid',
metavar='<audit_template>',
help='Audit template used for this audit (name or uuid).')
@cliutils.arg(
'-d', '--deadline',
dest='deadline',
metavar='<deadline>',
help='Descrition of the audit.')
@cliutils.arg(
'-t', '--type',
dest='type',
metavar='<type>',
default='ONESHOT',
help="Audit type.")
def do_audit_create(cc, args):
"""Create a new audit."""
field_list = ['audit_template_uuid', 'type', 'deadline']
fields = dict((k, v) for (k, v) in vars(args).items()
if k in field_list and not (v is None))
if fields.get('audit_template_uuid'):
if not utils.is_uuid_like(fields['audit_template_uuid']):
fields['audit_template_uuid'] = cc.audit_template.get(
fields['audit_template_uuid']).uuid
audit = cc.audit.create(**fields)
field_list.append('uuid')
data = dict([(f, getattr(audit, f, '')) for f in field_list])
cliutils.print_dict(data, wrap=72)
@cliutils.arg(
'audit',
metavar='<audit>',
nargs='+',
help="UUID of the audit.")
def do_audit_delete(cc, args):
"""Delete an audit."""
for p in args.audit:
if uuidutils.is_uuid_like(p):
cc.audit.delete(p)
print ('Deleted audit %s' % p)
else:
raise exceptions.ValidationError()
@cliutils.arg(
'audit',
metavar='<audit>',
help="UUID of the audit.")
@cliutils.arg(
'op',
metavar='<op>',
choices=['add', 'replace', 'remove'],
help="Operation: 'add', 'replace', or 'remove'.")
@cliutils.arg(
'attributes',
metavar='<path=value>',
nargs='+',
action='append',
default=[],
help="Attribute to add, replace, or remove. Can be specified multiple "
"times. For 'remove', only <path> is necessary.")
def do_audit_update(cc, args):
"""Update information about an audit."""
if uuidutils.is_uuid_like(args.audit):
patch = utils.args_array_to_patch(args.op, args.attributes[0])
audit = cc.audit.update(args.audit, patch)
_print_audit_show(audit)
else:
raise exceptions.ValidationError()

View File

@@ -1,173 +0,0 @@
# -*- coding: utf-8 -*-
#
# Copyright 2013 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# import argparse
from watcherclient.common import cliutils
from watcherclient.common import utils
from watcherclient.v1 import resource_fields as res_fields
def _print_audit_template_show(audit_template):
fields = res_fields.AUDIT_TEMPLATE_FIELDS
data = dict([(f, getattr(audit_template, f, '')) for f in fields])
cliutils.print_dict(data, wrap=72)
@cliutils.arg(
'audit-template',
metavar='<audit-template>',
help="Name or UUID of the audit template.")
def do_audit_template_show(cc, args):
"""Show detailed information about a audit template."""
audit_template = cc.audit_template.get(getattr(args, 'audit-template'))
_print_audit_template_show(audit_template)
@cliutils.arg(
'--detail',
dest='detail',
action='store_true',
default=False,
help="Show detailed information about audit templates.")
@cliutils.arg(
'--goal-uuid',
metavar='<goal-uuid>',
help='UUID the goal used for filtering.')
@cliutils.arg(
'--strategy-uuid',
metavar='<strategy-uuid>',
help='UUID the strategy used for filtering.')
@cliutils.arg(
'--limit',
metavar='<limit>',
type=int,
help='Maximum number of audit templates to return per request, '
'0 for no limit. Default is the maximum number used '
'by the Watcher API Service.')
@cliutils.arg(
'--sort-key',
metavar='<field>',
help='Audit template field that will be used for sorting.')
@cliutils.arg(
'--sort-dir',
metavar='<direction>',
choices=['asc', 'desc'],
help='Sort direction: "asc" (the default) or "desc".')
def do_audit_template_list(cc, args):
"""List the audit templates."""
params = {}
if args.goal_uuid is not None:
params['goal_uuid'] = args.goal_uuid
if args.strategy_uuid is not None:
params['strategy_uuid'] = args.strategy_uuid
if args.detail:
fields = res_fields.AUDIT_TEMPLATE_FIELDS
field_labels = res_fields.AUDIT_TEMPLATE_FIELD_LABELS
else:
fields = res_fields.AUDIT_TEMPLATE_SHORT_LIST_FIELDS
field_labels = res_fields.AUDIT_TEMPLATE_SHORT_LIST_FIELD_LABELS
params.update(utils.common_params_for_list(args,
fields,
field_labels))
audit_template = cc.audit_template.list(**params)
cliutils.print_list(audit_template, fields,
field_labels=field_labels,
sortby_index=None)
@cliutils.arg(
'name',
metavar='<name>',
help='Name for this audit template.')
@cliutils.arg(
'goal_uuid',
metavar='<goal-uuid>',
help='Goal ID associated to this audit template.')
@cliutils.arg(
'-s', '--strategy-uuid',
dest='strategy_uuid',
metavar='<strategy-uuid>',
help='Strategy ID associated to this audit template.')
@cliutils.arg(
'-d', '--description',
metavar='<description>',
help='Descrition of the audit template.')
@cliutils.arg(
'-e', '--extra',
metavar='<key=value>',
action='append',
help="Record arbitrary key/value metadata. "
"Can be specified multiple times.")
@cliutils.arg(
'-a', '--host-aggregate',
dest='host_aggregate',
metavar='<host-aggregate>',
help='Name or ID of the host aggregate targeted by this audit template.')
def do_audit_template_create(cc, args):
"""Create a new audit template."""
field_list = ['host_aggregate', 'description', 'name', 'extra',
'goal_uuid', 'strategy_uuid']
fields = dict((k, v) for (k, v) in vars(args).items()
if k in field_list and not (v is None))
fields = utils.args_array_to_dict(fields, 'extra')
audit_template = cc.audit_template.create(**fields)
field_list.append('uuid')
data = dict([(f, getattr(audit_template, f, '')) for f in field_list])
cliutils.print_dict(data, wrap=72)
@cliutils.arg(
'audit-template',
metavar='<audit-template>',
nargs='+',
help="UUID or name of the audit template.")
def do_audit_template_delete(cc, args):
"""Delete an audit template."""
for p in getattr(args, 'audit-template'):
cc.audit_template.delete(p)
print ('Deleted audit template %s' % p)
@cliutils.arg(
'audit-template',
metavar='<audit-template>',
help="UUID or name of the audit template.")
@cliutils.arg(
'op',
metavar='<op>',
choices=['add', 'replace', 'remove'],
help="Operation: 'add', 'replace', or 'remove'.")
@cliutils.arg(
'attributes',
metavar='<path=value>',
nargs='+',
action='append',
default=[],
help="Attribute to add, replace, or remove. Can be specified multiple "
"times. For 'remove', only <path> is necessary.")
def do_audit_template_update(cc, args):
"""Update information about an audit template."""
patch = utils.args_array_to_patch(args.op, args.attributes[0])
audit_template = cc.audit_template.update(getattr(args, 'audit-template'),
patch)
_print_audit_template_show(audit_template)

View File

@@ -1,79 +0,0 @@
# -*- coding: utf-8 -*-
#
# Copyright 2013 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from watcherclient.common import cliutils
from watcherclient.common import utils
from watcherclient.v1 import resource_fields as res_fields
def _print_goal_show(goal):
fields = res_fields.GOAL_FIELDS
data = dict([(f, getattr(goal, f, '')) for f in fields])
cliutils.print_dict(data, wrap=72)
@cliutils.arg(
'goal',
metavar='<goal>',
help="UUID or name of the goal")
def do_goal_show(cc, args):
"""Show detailed information about a _print_goal_show."""
goal = cc.goal.get(args.goal)
_print_goal_show(goal)
@cliutils.arg(
'--detail',
dest='detail',
action='store_true',
default=False,
help="Show detailed information about each goal.")
@cliutils.arg(
'--limit',
metavar='<limit>',
type=int,
help='Maximum number of goals to return per request, '
'0 for no limit. Default is the maximum number used '
'by the Watcher API Service.')
@cliutils.arg(
'--sort-key',
metavar='<field>',
help='Goal field that will be used for sorting.')
@cliutils.arg(
'--sort-dir',
metavar='<direction>',
choices=['asc', 'desc'],
help='Sort direction: "asc" (the default) or "desc".')
def do_goal_list(cc, args):
"""List the goals."""
params = {}
if args.detail:
fields = res_fields.GOAL_FIELDS
field_labels = res_fields.GOAL_FIELD_LABELS
else:
fields = res_fields.GOAL_SHORT_LIST_FIELDS
field_labels = res_fields.GOAL_SHORT_LIST_FIELD_LABELS
params.update(utils.common_params_for_list(args,
fields,
field_labels))
goal = cc.goal.list(**params)
cliutils.print_list(goal, fields,
field_labels=field_labels,
sortby_index=None)

View File

@@ -1,144 +0,0 @@
# -*- coding: utf-8 -*-
#
# Copyright 2013 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# import argparse
from watcherclient.common import cliutils
from watcherclient.common import utils
from watcherclient.v1 import resource_fields as res_fields
def _print_metric_collector_show(metric_collector):
fields = res_fields.METRIC_COLLECTOR_FIELDS
data = dict([(f, getattr(metric_collector, f, '')) for f in fields])
cliutils.print_dict(data, wrap=72)
@cliutils.arg(
'metric_collector',
metavar='<metric_collector>',
help="UUID of the metric collector")
def do_metric_collector_show(cc, args):
"""Show detailed information about a metric collector."""
metric_collector = cc.metric_collector.get(args.metric_collector)
_print_metric_collector_show(metric_collector)
@cliutils.arg(
'--category',
metavar='<category>',
help='Only show information for metric collectors with this category.')
@cliutils.arg(
'--detail',
dest='detail',
action='store_true',
default=False,
help="Show detailed information about metric collectors.")
@cliutils.arg(
'--limit',
metavar='<limit>',
type=int,
help='Maximum number of metric collectors to return per request, '
'0 for no limit. Default is the maximum number used '
'by the Watcher API Service.')
@cliutils.arg(
'--sort-key',
metavar='<field>',
help='Metric collector field that will be used for sorting.')
@cliutils.arg(
'--sort-dir',
metavar='<direction>',
choices=['asc', 'desc'],
help='Sort direction: "asc" (the default) or "desc".')
def do_metric_collector_list(cc, args):
"""List the metric collectors."""
params = {}
if args.detail:
fields = res_fields.METRIC_COLLECTOR_FIELDS
field_labels = res_fields.METRIC_COLLECTOR_FIELD_LABELS
else:
fields = res_fields.METRIC_COLLECTOR_SHORT_LIST_FIELDS
field_labels = res_fields.METRIC_COLLECTOR_SHORT_LIST_FIELD_LABELS
params.update(utils.common_params_for_list(args,
fields,
field_labels))
metric_collector = cc.metric_collector.list(**params)
cliutils.print_list(metric_collector, fields,
field_labels=field_labels,
sortby_index=None)
@cliutils.arg(
'-c', '--category',
metavar='<category>',
required=True,
help='Metric category.')
@cliutils.arg(
'-e', '--endpoint-url',
required=True,
metavar='<endpoint-url>',
help='URL towards which publish metric data.')
def do_metric_collector_create(cc, args):
"""Create a new metric collector."""
field_list = ['category', 'endpoint']
fields = dict((k, v) for (k, v) in vars(args).items()
if k in field_list and not (v is None))
metric_collector = cc.metric_collector.create(**fields)
field_list.append('uuid')
data = dict([(f, getattr(metric_collector, f, '')) for f in field_list])
cliutils.print_dict(data, wrap=72)
@cliutils.arg(
'metric_collector',
metavar='<metric_collector>',
nargs='+',
help="UUID of the metric collector.")
def do_metric_collector_delete(cc, args):
"""Delete a metric collector."""
for p in args.metric_collector:
cc.metric_collector.delete(p)
print ('Deleted metric collector %s' % p)
@cliutils.arg(
'metric_collector',
metavar='<metric_collector>',
help="UUID of the metric collector.")
@cliutils.arg(
'op',
metavar='<op>',
choices=['add', 'replace', 'remove'],
help="Operation: 'add', 'replace', or 'remove'.")
@cliutils.arg(
'attributes',
metavar='<path=value>',
nargs='+',
action='append',
default=[],
help="Attribute to add, replace, or remove. Can be specified multiple "
"times. For 'remove', only <path> is necessary.")
def do_metric_collector_update(cc, args):
"""Update information about a metric collector."""
patch = utils.args_array_to_patch(args.op, args.attributes[0])
metric_collector = cc.metric_collector.update(
getattr(args, 'metric-collector'), patch)
_print_metric_collector_show(metric_collector)

View File

@@ -1,47 +0,0 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from watcherclient.common import utils
from watcherclient.v1 import action_plan_shell
from watcherclient.v1 import action_shell
from watcherclient.v1 import audit_shell
from watcherclient.v1 import audit_template_shell
from watcherclient.v1 import goal_shell
from watcherclient.v1 import strategy_shell
# from watcherclient.v1 import metric_collector_shell
COMMAND_MODULES = [
audit_template_shell,
audit_shell,
action_plan_shell,
action_shell,
# metric_collector_shell,
goal_shell,
strategy_shell,
]
def enhance_parser(parser, subparsers, cmd_mapper):
"""Enhance parser with API version specific options.
Take a basic (nonversioned) parser and enhance it with
commands and options specific for this version of API.
:param parser: top level parser :param subparsers: top level
parser's subparsers collection where subcommands will go
"""
for command_module in COMMAND_MODULES:
utils.define_commands_from_module(subparsers, command_module,
cmd_mapper)

View File

@@ -1,87 +0,0 @@
# -*- coding: utf-8 -*-
#
# Copyright 2013 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from watcherclient.common import cliutils
from watcherclient.common import utils
from watcherclient.v1 import resource_fields as res_fields
def _print_strategy_show(strategy):
fields = res_fields.STRATEGY_FIELDS
data = dict([(f, getattr(strategy, f, '')) for f in fields])
cliutils.print_dict(data, wrap=72)
@cliutils.arg(
'strategy',
metavar='<strategy>',
help="UUID or name of the strategy")
def do_strategy_show(cc, args):
"""Show detailed information about a _print_strategy_show."""
strategy = cc.strategy.get(args.strategy)
_print_strategy_show(strategy)
@cliutils.arg(
'--goal-uuid',
metavar='<goal_uuid>',
dest='goal_uuid',
help='UUID of the goal')
@cliutils.arg(
'--detail',
dest='detail',
action='store_true',
default=False,
help="Show detailed information about each strategy.")
@cliutils.arg(
'--limit',
metavar='<limit>',
type=int,
help='Maximum number of strategies to return per request, '
'0 for no limit. Default is the maximum number used '
'by the Watcher API Service.')
@cliutils.arg(
'--sort-key',
metavar='<field>',
help='Goal field that will be used for sorting.')
@cliutils.arg(
'--sort-dir',
metavar='<direction>',
choices=['asc', 'desc'],
help='Sort direction: "asc" (the default) or "desc".')
def do_strategy_list(cc, args):
"""List the strategies."""
params = {}
if args.detail:
fields = res_fields.STRATEGY_FIELDS
field_labels = res_fields.STRATEGY_FIELD_LABELS
else:
fields = res_fields.STRATEGY_SHORT_LIST_FIELDS
field_labels = res_fields.STRATEGY_SHORT_LIST_FIELD_LABELS
if args.goal_uuid:
params["goal_uuid"] = args.goal_uuid
params.update(utils.common_params_for_list(args,
fields,
field_labels))
strategy = cc.strategy.list(**params)
cliutils.print_list(strategy, fields,
field_labels=field_labels,
sortby_index=None)