Add 2.0.0 plugins support

* install action now can install both .fp plugins and .rpm

* remove action can remove .fp and .rpm plugins

* update and downgrade actions are available for new plugin
  formats only

* register action was added for the users who install plugin
  directly with yum/rpm

* unregister action was added for the users who remove plugin
  directly with yum/rpm

* sync action is required for those users who use yum to perform
  massive update which includes a lot of plugins

* all unit tests were rewritten, now each layer
  (cli, objects, utils) is tested separately.

Change-Id: I2bc837fdaf356a4f4c7291cd0dbfc0a92dc433d6
Implements: blueprint plugins-security-fixes-delivery
This commit is contained in:
Evgeniy L 2015-02-20 19:15:02 +03:00
parent c4f8f7cf81
commit 75c87d131f
14 changed files with 1466 additions and 307 deletions

View File

@ -16,4 +16,4 @@
fuelclient command line interface
"""
"""

View File

@ -198,8 +198,8 @@ class EnvironmentAction(Action):
@check_any("download", "upload")
def deployment_tasks(self, params):
"""Modify deployment_tasks for environment.
fuel env --env 1 --deployment-tasks --download
fuel env --env 1 --deployment-tasks --upload
fuel env --env 1 --deployment-tasks --download
fuel env --env 1 --deployment-tasks --upload
"""
cluster = Environment(params.env)
dir_path = self.full_path_directory(

View File

@ -12,12 +12,13 @@
# License for the specific language governing permissions and limitations
# under the License.
from fuelclient.cli import error
import fuelclient.cli.arguments as Args
from fuelclient.cli.actions.base import Action
import fuelclient.cli.arguments as Args
from fuelclient.cli import error
from fuelclient.cli.formatting import format_table
from fuelclient.objects.plugins import Plugins
from fuelclient import utils
class PluginAction(Action):
@ -35,56 +36,161 @@ class PluginAction(Action):
def __init__(self):
super(PluginAction, self).__init__()
self.args = [
Args.get_list_arg("List all available plugins."),
Args.get_plugin_install_arg("Install action"),
Args.get_plugin_remove_arg("Remove action"),
Args.get_force_arg("Update action"),
Args.group(
Args.get_list_arg(
"List of all registered plugins."),
Args.get_plugin_install_arg(
"Install and register plugin package"),
Args.get_plugin_remove_arg(
"Remove and unregister plugin"),
Args.get_plugin_register_arg(
"Register installed plugin"),
Args.get_plugin_unregister_arg(
"Unregister plugin"),
Args.get_plugin_update_arg(
"Update installed plugin"),
Args.get_plugin_downgrade_arg(
"Downgrade installed plugin"),
Args.get_plugin_sync_arg(
"Synchronise plugins with API service")),
Args.get_force_arg("Force action")
]
self.flag_func_map = (
("install", self.install),
("remove", self.remove),
("update", self.update),
("downgrade", self.downgrade),
("sync", self.sync),
("register", self.register),
("unregister", self.unregister),
(None, self.list),
)
def list(self, params):
"""Print all available plugins:
"""Print all available plugins
fuel plugins
fuel plugins --list
"""
plugins = Plugins.get_all_data()
self.serializer.print_to_output(
plugins,
format_table(
plugins,
acceptable_keys=self.acceptable_keys
)
)
format_table(plugins, acceptable_keys=self.acceptable_keys))
def install(self, params):
"""Enable plugin for environment
fuel plugins --install /tmp/plugin_sample.fb
"""Install plugin archive and register in API service
fuel plugins --install plugin-name-2.0-2.0.1-0.noarch.rpm
"""
results = Plugins.install_plugin(params.install, params.force)
file_path = params.install
self.check_file(file_path)
results = Plugins.install(file_path, force=params.force)
self.serializer.print_to_output(
results,
"Plugin {0} was successfully installed.".format(
params.install))
def remove(self, params):
"""Remove plugin from environment
fuel plugins --remove plugin_sample
fuel plugins --remove plugin_sample==1.0.1
"""Remove plugin from file system and from API service
fuel plugins --remove plugin-name==1.0.1
"""
s = params.remove.split('==')
plugin_name = s[0]
plugin_version = None
if len(s) == 2:
plugin_version = s[1]
elif len(s) > 2:
raise error.ArgumentException(
'Syntax: fuel plugins --remove fuel_plugin==1.0')
results = Plugins.remove_plugin(
plugin_name, plugin_version=plugin_version)
name, version = self.parse_name_version(params.remove)
results = Plugins.remove(name, version)
self.serializer.print_to_output(
results,
"Plugin {0} was successfully removed.".format(params.remove))
def update(self, params):
"""Update plugin from one minor version to another.
For example if there is a plugin with version 2.0.0,
plugin with version 2.0.1 can be used as update. But
plugin with version 2.1.0, cannot be used to update
plugin. Note that update is supported for plugins
beginning from package_version 2.0.0
fuel plugins --update plugin-name-2.0-2.0.1-0.noarch.rpm
"""
plugin_path = params.update
self.check_file(plugin_path)
result = Plugins.update(plugin_path)
self.serializer.print_to_output(
result,
"Plugin {0} was successfully updated.".format(plugin_path))
def downgrade(self, params):
"""Downgrade plugin from one minor version to another.
For example if there is a plugin with version 2.0.1,
plugin with version 2.0.0 can be used to perform downgrade.
Plugin with version 1.0.0, cannot be used to perform downgrade
plugin. Note that downgrade is supported for plugins
beginning from package_version 2.0.0
fuel plugins --downgrade plugin-name-2.0-2.0.1-0.noarch.rpm
"""
plugin_path = params.downgrade
self.check_file(plugin_path)
result = Plugins.downgrade(plugin_path)
self.serializer.print_to_output(
result,
"Plugin {0} was successfully downgraded.".format(plugin_path))
def sync(self, params):
"""Synchronise plugins on file system with plugins in
API service, creates plugin if it is not exists,
updates existent plugins
fuel plugins --sync
"""
Plugins.sync()
self.serializer.print_to_output(
None, "Plugins were successfully synchronized.")
def register(self, params):
"""Register plugin in API service
fuel plugins --register plugin-name==1.0.1
"""
name, version = self.parse_name_version(params.register)
result = Plugins.register(name, version, force=params.force)
self.serializer.print_to_output(
result,
"Plugin {0} was successfully registered.".format(params.register))
def unregister(self, params):
"""Deletes plugin from API service
fuel plugins --unregister plugin-name==1.0.1
"""
name, version = self.parse_name_version(params.unregister)
result = Plugins.unregister(name, version)
self.serializer.print_to_output(
result,
"Plugin {0} was successfully unregistered."
"".format(params.unregister))
def parse_name_version(self, param):
"""Takes the string and returns name and version
:param str param: string with name and version
:raises: error.ArgumentException if version is not specified
"""
attrs = param.split('==')
if len(attrs) != 2:
raise error.ArgumentException(
'Syntax: fuel plugins <action> fuel_plugin==1.0.0')
return attrs
def check_file(self, file_path):
"""Checks if file exists
:param str file_path: path to the file
:raises: error.ArgumentException if file does not exist
"""
if not utils.file_exists(file_path):
raise error.ArgumentException(
'File "{0}" does not exists'.format(file_path))

View File

@ -21,8 +21,8 @@ from fuelclient.cli.actions.base import check_any
import fuelclient.cli.arguments as Args
from fuelclient.cli.arguments import group
from fuelclient.cli.formatting import format_table
from fuelclient.cli import utils
from fuelclient.objects.release import Release
from fuelclient import utils
class ReleaseAction(Action):

View File

@ -435,7 +435,7 @@ def get_task_arg(help_msg):
def get_plugin_install_arg(help_msg):
return get_str_arg(
"install",
flags=("--install",),
metavar='PLUGIN_FILE',
help=help_msg
)
@ -443,7 +443,46 @@ def get_plugin_install_arg(help_msg):
def get_plugin_remove_arg(help_msg):
return get_str_arg(
"remove",
flags=("--remove",),
metavar='PLUGIN_NAME==VERSION',
help=help_msg
)
def get_plugin_register_arg(help_msg):
return get_str_arg(
"register",
metavar='PLUGIN_NAME==VERSION',
help=help_msg
)
def get_plugin_unregister_arg(help_msg):
return get_str_arg(
"unregister",
metavar='PLUGIN_NAME==VERSION',
help=help_msg
)
def get_plugin_update_arg(help_msg):
return get_str_arg(
"update",
metavar='PLUGIN_FILE',
help=help_msg
)
def get_plugin_downgrade_arg(help_msg):
return get_str_arg(
"downgrade",
metavar='PLUGIN_FILE',
help=help_msg
)
def get_plugin_sync_arg(help_msg):
return get_boolean_arg(
"sync",
help=help_msg
)

View File

@ -83,6 +83,10 @@ class SettingsException(FuelClientException):
"""Indicates errors or unexpected behaviour in processing settings."""
class ExecutedErrorNonZeroExitCode(FuelClientException):
"""Subshell command returned non-zero exit code."""
def exceptions_decorator(func):
"""Handles HTTP errors and expected exceptions that may occur
in methods of APIClient class

View File

@ -1,29 +0,0 @@
# Copyright 2014 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from fnmatch import fnmatch
import os
def iterfiles(dir_path, file_pattern):
"""Returns generator where each item is a path to file, that satisfies
file_patterns condtion
:param dir_path: path to directory, e.g /etc/puppet/
:param file_pattern: unix filepattern to match files
"""
for root, dirs, file_names in os.walk(dir_path):
for file_name in file_names:
if fnmatch(file_name, file_pattern):
yield os.path.join(root, file_name)

View File

@ -12,116 +12,475 @@
# License for the specific language governing permissions and limitations
# under the License.
import abc
import os
import shutil
import sys
import tarfile
from distutils.version import StrictVersion
import six
import yaml
from fuelclient.cli import error
from fuelclient.objects import base
from fuelclient import utils
EXTRACT_PATH = "/var/www/nailgun/plugins/"
PLUGINS_PATH = '/var/www/nailgun/plugins/'
METADATA_MASK = '/var/www/nailgun/plugins/*/metadata.yaml'
VERSIONS_PATH = '/etc/fuel/version.yaml'
def raise_error_if_not_master():
"""Raises error if it's not Fuel master
:raises: error.WrongEnvironmentError
"""
if not os.path.exists(VERSIONS_PATH):
raise error.WrongEnvironmentError(
'Action can be performed from Fuel master node only.')
def master_only(f):
"""Decorator for the method, which raises error, if method
is called on the node which is not Fuel master
"""
@six.wraps(f)
def print_message(*args, **kwargs):
raise_error_if_not_master()
return f(*args, **kwargs)
return print_message
@six.add_metaclass(abc.ABCMeta)
class BasePlugin(object):
@abc.abstractmethod
def install(cls, plugin_path, force=False):
"""Installs plugin package
"""
@abc.abstractmethod
def update(cls, plugin_path):
"""Updates the plugin
"""
@abc.abstractmethod
def remove(cls, plugin_name, plugin_version):
"""Removes the plugin from file system
"""
@abc.abstractmethod
def downgrade(cls, plugin_path):
"""Downgrades the plugin
"""
@abc.abstractmethod
def name_from_file(cls, file_path):
"""Retrieves name from plugin package
"""
@abc.abstractmethod
def version_from_file(cls, file_path):
"""Retrieves version from plugin package
"""
class PluginV1(BasePlugin):
metadata_config = 'metadata.yaml'
def deprecated(f):
"""Prints deprecation warning for old plugins
"""
@six.wraps(f)
def print_message(*args, **kwargs):
six.print_(
'DEPRECATION WARNING: The plugin has old 1.0 package format, '
'this format does not support many features, such as '
'plugins updates, find plugin in new format or migrate '
'and rebuild this one.', file=sys.stderr)
return f(*args, **kwargs)
return print_message
@classmethod
@master_only
@deprecated
def install(cls, plugin_path, force=False):
plugin_tar = tarfile.open(plugin_path, 'r')
try:
plugin_tar.extractall(PLUGINS_PATH)
finally:
plugin_tar.close()
@classmethod
@master_only
@deprecated
def remove(cls, plugin_name, plugin_version):
plugin_path = os.path.join(
PLUGINS_PATH, '{0}-{1}'.format(plugin_name, plugin_version))
shutil.rmtree(plugin_path)
@classmethod
def update(cls, _):
raise error.BadDataException(
'Update action is not supported for old plugins with '
'package version "1.0.0", you can install your plugin '
'or use newer plugin format.')
@classmethod
def downgrade(cls, _):
raise error.BadDataException(
'Downgrade action is not supported for old plugins with '
'package version "1.0.0", you can install your plugin '
'or use newer plugin format.')
@classmethod
def name_from_file(cls, file_path):
"""Retrieves plugin name from plugin archive.
:param str plugin_path: path to the plugin
:returns: plugin name
"""
return cls._get_metadata(file_path)['name']
@classmethod
def version_from_file(cls, file_path):
"""Retrieves plugin version from plugin archive.
:param str plugin_path: path to the plugin
:returns: plugin version
"""
return cls._get_metadata(file_path)['version']
@classmethod
def _get_metadata(cls, plugin_path):
"""Retrieves metadata from plugin archive
:param str plugin_path: path to the plugin
:returns: metadata from the plugin
"""
plugin_tar = tarfile.open(plugin_path, 'r')
try:
for member_name in plugin_tar.getnames():
if cls.metadata_config in member_name:
return yaml.load(
plugin_tar.extractfile(member_name).read())
finally:
plugin_tar.close()
class PluginV2(BasePlugin):
@classmethod
@master_only
def install(cls, plugin_path, force=False):
action = 'install'
if force:
action = 'reinstall'
utils.exec_cmd('yum -y {0} {1}'.format(action, plugin_path))
@classmethod
@master_only
def remove(cls, name, version):
rpm_name = '{0}-{1}'.format(name, utils.major_plugin_version(version))
utils.exec_cmd('yum -y remove {0}'.format(rpm_name))
@classmethod
@master_only
def update(cls, plugin_path):
utils.exec_cmd('yum -y update {0}'.format(plugin_path))
@classmethod
@master_only
def downgrade(cls, plugin_path):
utils.exec_cmd('yum -y downgrade {0}'.format(plugin_path))
@classmethod
def name_from_file(cls, file_path):
"""Retrieves plugin name from RPM. RPM name contains
the version of the plugin, which should be removed.
:param str file_path: path to rpm file
:returns: name of the plugin
"""
for line in utils.exec_cmd_iterator(
"rpm -qp --queryformat '%{{name}}' {0}".format(file_path)):
name = line
break
return cls._remove_major_plugin_version(name)
@classmethod
def version_from_file(cls, file_path):
"""Retrieves plugin version from RPM.
:param str file_path: path to rpm file
:returns: version of the plugin
"""
for line in utils.exec_cmd_iterator(
"rpm -qp --queryformat '%{{version}}' {0}".format(file_path)):
version = line
break
return version
@classmethod
def _remove_major_plugin_version(cls, name):
"""Removes the version from plugin name.
Here is an example: "name-1.0" -> "name"
:param str name: plugin name
:returns: the name withot version
"""
name_wo_version = name
if '-' in name_wo_version:
name_wo_version = '-'.join(name.split('-')[:-1])
return name_wo_version
class Plugins(base.BaseObject):
class_api_path = "plugins/"
class_instance_path = "plugins/{id}"
metadata_config = 'metadata.yaml'
class_api_path = 'plugins/'
class_instance_path = 'plugins/{id}'
@classmethod
def validate_environment(cls):
return os.path.exists(VERSIONS_PATH)
def register(cls, name, version, force=False):
"""Tries to find plugin on file system, creates
it in API service if it exists.
@classmethod
def get_metadata(cls, plugin_tar):
for member_name in plugin_tar.getnames():
if cls.metadata_config in member_name:
return yaml.load(plugin_tar.extractfile(member_name).read())
raise error.BadDataException(
"Tarfile {name} doesn't have {config}".format(
name=plugin_tar.name, config=cls.metadata_config))
@classmethod
def get_plugin(cls, plugin_name, plugin_version=None):
"""Returns plugin fetched by name and optionally by version.
If multiple plugin versions are found and version is not specified
error is returned.
:returns: dictionary with plugin data
:param str name: plugin name
:param str version: plugin version
:param str force: if True updates meta information
about the plugin even it does not
support updates
"""
checker = lambda p: p['name'] == plugin_name
if plugin_version is not None:
checker = lambda p: \
(p['name'], p['version']) == (plugin_name, plugin_version)
plugins = filter(checker, cls.get_all_data())
if len(plugins) == 0:
if plugin_version is None:
raise error.BadDataException(
'Plugin {plugin_name} does not exist'.format(
plugin_name=plugin_name)
)
else:
raise error.BadDataException(
'Plugin {plugin_name}, version {plugin_version} does '
'not exist'.format(
plugin_name=plugin_name,
plugin_version=plugin_version)
)
if len(plugins) > 1 and plugin_version is None:
metadata = None
for m in utils.glob_and_parse_yaml(METADATA_MASK):
if m.get('version') == version and \
m.get('name') == name:
metadata = m
break
if not metadata:
raise error.BadDataException(
'Multiple versions of plugin {plugin_name} found. '
'Please consider specifying a version in the '
'{plugin_name}==<version> format'.format(
plugin_name=plugin_name)
)
'Plugin {0} with version {1} does '
'not exist, install it and try again'.format(
name, version))
return cls.update_or_create(metadata, force=force)
@classmethod
def sync(cls):
"""Checks all of the plugins on file systems,
and makes sure that they have consistent information
in API service.
"""
for metadata in utils.glob_and_parse_yaml(METADATA_MASK):
cls.update_or_create(metadata, force=True)
@classmethod
def unregister(cls, name, version):
"""Removes the plugin from API service
:param str name: plugin name
:param str version: plugin version
"""
plugin = cls.get_plugin(name, version)
return cls.connection.delete_request(
cls.class_instance_path.format(**plugin))
@classmethod
def install(cls, plugin_path, force=False):
"""Installs the package, and creates data in API service
:param str name: plugin name
:param str version: plugin version
"""
plugin = cls.make_obj_by_file(plugin_path)
name = plugin.name_from_file(plugin_path)
version = plugin.version_from_file(plugin_path)
plugin.install(plugin_path, force=force)
return cls.register(name, version, force=force)
@classmethod
def remove(cls, plugin_name, plugin_version):
"""Removes the package, and updates data in API service
:param str name: plugin name
:param str version: plugin version
"""
plugin = cls.make_obj_by_name(plugin_name, plugin_version)
plugin.remove(plugin_name, plugin_version)
return cls.unregister(plugin_name, plugin_version)
@classmethod
def update(cls, plugin_path):
"""Updates the package, and updates data in API service
:param str plugin_path: path to the plugin
"""
plugin = cls.make_obj_by_file(plugin_path)
name = plugin.name_from_file(plugin_path)
version = plugin.version_from_file(plugin_path)
plugin.update(plugin_path)
return cls.register(name, version)
@classmethod
def downgrade(cls, plugin_path):
"""Downgrades the package, and updates data in API service
:param str plugin_path: path to the plugin
"""
plugin = cls.make_obj_by_file(plugin_path)
name = plugin.name_from_file(plugin_path)
version = plugin.version_from_file(plugin_path)
plugin.downgrade(plugin_path)
return cls.register(name, version)
@classmethod
def make_obj_by_name(cls, name, version):
"""Finds appropriate plugin class version,
by plugin version and name.
:param str name:
:param str version:
:returns: plugin class
:raises: error.BadDataException unsupported package version
"""
plugin = cls.get_plugin(name, version)
package_version = plugin['package_version']
if StrictVersion('1.0.0') <= \
StrictVersion(package_version) < \
StrictVersion('2.0.0'):
return PluginV1
elif StrictVersion('2.0.0') <= StrictVersion(package_version):
return PluginV2
raise error.BadDataException(
'Plugin {0}=={1} has unsupported package version {2}'.format(
name, version, package_version))
@classmethod
def make_obj_by_file(cls, file_path):
"""Finds appropriate plugin class version,
by plugin file.
:param str file_path: plugin path
:returns: plugin class
:raises: error.BadDataException unsupported package version
"""
_, ext = os.path.splitext(file_path)
if ext == '.fp':
return PluginV1
elif ext == '.rpm':
return PluginV2
raise error.BadDataException(
'Plugin {0} has unsupported format {1}'.format(
file_path, ext))
@classmethod
def update_or_create(cls, metadata, force=False):
"""Try to update existent plugin or create new one.
:param dict metadata: plugin information
:param bool force: updates existent plugin even if
it is not updatable
"""
# Try to update plugin
plugin_for_update = cls.get_plugin_for_update(metadata)
if plugin_for_update:
url = cls.class_instance_path.format(id=plugin_for_update['id'])
resp = cls.connection.put_request(url, metadata)
return resp
# If plugin is not updatable it means that we should
# create new instance in Nailgun
resp_raw = cls.connection.post_request_raw(
cls.class_api_path, metadata)
resp = resp_raw.json()
if resp_raw.status_code == 409 and force:
# Replace plugin information
url = cls.class_instance_path.format(id=resp['id'])
resp_raw = cls.connection.put_request(url, metadata)
resp = resp_raw.json()
else:
resp_raw.raise_for_status()
return resp
@classmethod
def get_plugin_for_update(cls, metadata):
"""Retrieves plugins which can be updated
:param dict metadata: plugin metadata
:returns: dict with plugin which can be updated or None
"""
if not cls.is_updatable(metadata['package_version']):
return
plugins = filter(
lambda p: p['name'] == metadata['name'] and
cls.is_updatable(p['package_version']) and
utils.major_plugin_version(metadata['version']) ==
utils.major_plugin_version(p['version']),
cls.get_all_data())
plugin = None
if plugins:
# List should contain only one plugin, but just
# in case we make sure that we get plugin with
# higher version
plugin = sorted(
plugins,
key=lambda p: StrictVersion(p['version']))[0]
return plugin
@classmethod
def is_updatable(cls, package_version):
"""Checks if plugin's package version supports updates.
:param str package_version: package version of the plugin
:returns: True if plugin can be updated
False if plugin cannot be updated
"""
return StrictVersion('2.0.0') <= StrictVersion(package_version)
@classmethod
def get_plugin(cls, name, version):
"""Returns plugin fetched by name and version.
:param str name: plugin name
:param str version: plugin version
:returns: dictionary with plugin data
:raises: error.BadDataException if no plugin was found
"""
plugins = filter(
lambda p: (p['name'], p['version']) == (name, version),
cls.get_all_data())
if not plugins:
raise error.BadDataException(
'Plugin "{name}" with version {version}, does '
'not exist'.format(name=name, version=version))
return plugins[0]
@classmethod
def add_plugin(cls, plugin_meta, plugin_tar):
return plugin_tar.extractall(EXTRACT_PATH)
@classmethod
def install_plugin(cls, plugin_path, force=False):
if not cls.validate_environment():
raise error.WrongEnvironmentError(
'Plugin can be installed only from master node.')
plugin_tar = tarfile.open(plugin_path, 'r')
try:
metadata = cls.get_metadata(plugin_tar)
resp = cls.connection.post_request_raw(
cls.class_api_path, metadata)
if resp.status_code == 409 and force:
url = cls.class_instance_path.format(id=resp.json()['id'])
resp = cls.connection.put_request(
url, metadata)
else:
resp.raise_for_status()
cls.add_plugin(metadata, plugin_tar)
finally:
plugin_tar.close()
return resp
@classmethod
def remove_plugin(cls, plugin_name, plugin_version=None):
if not cls.validate_environment():
raise error.WrongEnvironmentError(
'Plugin can be removed only from master node.')
plugin = cls.get_plugin(plugin_name, plugin_version)
resp = cls.connection.delete_request(
cls.class_instance_path.format(**plugin)
)
plugin_path = os.path.join(
EXTRACT_PATH,
'{name}-{version}'.format(**plugin)
)
shutil.rmtree(plugin_path)
return resp

View File

@ -25,6 +25,8 @@ import subprocess
import sys
import tempfile
from StringIO import StringIO
import mock
from fuelclient.cli.parser import main
@ -35,6 +37,20 @@ log = logging.getLogger("CliTest.ExecutionLog")
log.setLevel(logging.DEBUG)
class FakeFile(StringIO):
"""It's a fake file which returns StringIO
when file opens with 'with' statement.
NOTE(eli): We cannot use mock_open from mock library
here, because it hangs when we use 'with' statement,
and when we want to read file by chunks.
"""
def __enter__(self):
return self
def __exit__(self, *args):
pass
class CliExectutionResult:
def __init__(self, process_handle, out, err):
self.return_code = process_handle.returncode
@ -62,6 +78,17 @@ class UnitTestCase(TestCase):
auth.return_value = False
return self.execute(command)
def mock_open(self, text, filename='some.file'):
"""Mocks builtin open function.
Usage example:
with mock.patch('__builtin__.open', self.mock_open('file content')):
# call mocked code
"""
fileobj = FakeFile(text)
setattr(fileobj, 'name', filename)
return mock.MagicMock(return_value=fileobj)
class BaseTestCase(UnitTestCase):
nailgun_root = os.environ.get('NAILGUN_ROOT', '/tmp/fuel_web/nailgun')

View File

@ -79,7 +79,7 @@ class TestClusterDeploymentTasksActions(base.UnitTestCase):
@patch('fuelclient.client.requests')
@patch('fuelclient.cli.serializers.open', create=True)
@patch('fuelclient.cli.utils.iterfiles')
@patch('fuelclient.utils.iterfiles')
class TestSyncDeploymentTasks(base.UnitTestCase):
def test_sync_deployment_scripts(self, mfiles, mopen, mrequests):

View File

@ -14,169 +14,164 @@
# License for the specific language governing permissions and limitations
# under the License.
from mock import Mock
from mock import patch
from fuelclient.tests import base
DATA = """
name: sample
version: 0.1.0
"""
from fuelclient.cli.actions import PluginAction
from fuelclient.cli import error
from fuelclient.cli.formatting import format_table
from fuelclient.cli.serializers import Serializer
from fuelclient.objects.plugins import Plugins
plugin_data = {
'id': 1,
'name': 'plugin_name',
'version': '1.0.0',
'package_version': '1.0.0'
}
@patch('fuelclient.client.requests')
class TestPluginsActions(base.UnitTestCase):
def test_001_plugins_action(self, mrequests):
self.execute(['fuel', 'plugins'])
plugins_call = mrequests.get.call_args_list[-1]
url = plugins_call[0][0]
self.assertIn('api/v1/plugins', url)
def setUp(self):
super(TestPluginsActions, self).setUp()
self.file_name = '/tmp/path/plugin.fp'
self.attr_name = 'plugin_name==version'
self.name = 'plugin_name'
self.version = 'version'
@patch('fuelclient.objects.plugins.tarfile')
@patch('fuelclient.objects.plugins.os')
def test_install_plugin(self, mos, mtar, mrequests):
mos.path.exists.return_value = True
mtar.open().getnames.return_value = ['metadata.yaml']
mtar.open().extractfile().read.return_value = DATA
response_mock = Mock(status_code=201)
mrequests.post.return_value = response_mock
self.execute(
['fuel', 'plugins', '--install', '/tmp/sample.fp'])
self.assertEqual(mrequests.post.call_count, 1)
self.assertEqual(mrequests.put.call_count, 0)
def exec_plugins(self, actions):
plugins_cmd = ['fuel', 'plugins']
plugins_cmd.extend(actions)
self.execute(plugins_cmd)
@patch('fuelclient.objects.plugins.tarfile')
@patch('fuelclient.objects.plugins.os')
def test_install_plugin_with_force(self, mos, mtar, mrequests):
mos.path.exists.return_value = True
mtar.open().getnames.return_value = ['metadata.yaml']
mtar.open().extractfile().read.return_value = DATA
response_mock = Mock(status_code=409)
response_mock.json.return_value = {'id': '12'}
mrequests.post.return_value = response_mock
self.execute(
['fuel', 'plugins', '--install', '/tmp/sample.fp', '--force'])
self.assertEqual(mrequests.post.call_count, 1)
self.assertEqual(mrequests.put.call_count, 1)
def assert_print_table(self, print_mock, plugins):
print_mock.assert_called_once_with(
plugins, format_table(
plugins,
acceptable_keys=PluginAction.acceptable_keys))
@patch('fuelclient.objects.plugins.os')
@patch('fuelclient.objects.plugins.shutil')
def test_remove_plugin_single(self, mshutil, mos, mrequests):
mos.path.exists.return_value = True
mresponse = Mock(status_code=201)
mresponse.json.return_value = [
{
'id': 1,
'name': 'test',
'version': '1.0.0',
}
]
mrequests.get.return_value = mresponse
mrequests.delete.return_value = Mock(status_code=200)
def assert_print(self, print_mock, result, msg):
print_mock.assert_called_once_with(result, msg)
self.execute(
['fuel', 'plugins', '--remove', 'test']
)
@patch.object(Serializer, 'print_to_output')
@patch.object(Plugins, 'get_all_data')
def test_list_default(self, get_mock, print_mock):
plugins = [plugin_data, plugin_data]
get_mock.return_value = plugins
self.assertEqual(mrequests.delete.call_count, 1)
self.assertEqual(mshutil.rmtree.call_count, 1)
self.exec_plugins([])
get_mock.assert_called_once_with()
self.assert_print_table(print_mock, plugins)
@patch.object(Serializer, 'print_to_output')
@patch.object(Plugins, 'get_all_data')
def test_list(self, get_mock, print_mock):
plugins = [plugin_data, plugin_data]
get_mock.return_value = plugins
self.exec_plugins(['--list'])
get_mock.assert_called_once_with()
self.assert_print_table(print_mock, plugins)
@patch.object(Serializer, 'print_to_output')
@patch.object(PluginAction, 'check_file')
@patch.object(Plugins, 'install', return_value='some_result')
def test_install(self, install_mock, check_mock, print_mock):
self.exec_plugins(['--install', self.file_name])
self.assert_print(
print_mock,
'some_result',
'Plugin /tmp/path/plugin.fp was successfully installed.')
install_mock.assert_called_once_with(self.file_name, force=False)
check_mock.assert_called_once_with(self.file_name)
@patch.object(Serializer, 'print_to_output')
@patch.object(Plugins, 'remove', return_value='some_result')
def test_remove(self, remove_mock, print_mock):
self.exec_plugins(['--remove', self.attr_name])
self.assert_print(
print_mock,
'some_result',
'Plugin plugin_name==version was successfully removed.')
remove_mock.assert_called_once_with(self.name, self.version)
@patch.object(Serializer, 'print_to_output')
@patch.object(PluginAction, 'check_file')
@patch.object(Plugins, 'update', return_value='some_result')
def test_update(self, update_mock, check_mock, print_mock):
self.exec_plugins(['--update', self.file_name])
self.assert_print(
print_mock,
'some_result',
'Plugin /tmp/path/plugin.fp was successfully updated.')
update_mock.assert_called_once_with(self.file_name)
check_mock.assert_called_once_with(self.file_name)
@patch.object(Serializer, 'print_to_output')
@patch.object(PluginAction, 'check_file')
@patch.object(Plugins, 'downgrade', return_value='some_result')
def test_downgrade(self, downgrade_mock, check_mock, print_mock):
self.exec_plugins(['--downgrade', self.file_name])
self.assert_print(
print_mock,
'some_result',
'Plugin /tmp/path/plugin.fp was successfully downgraded.')
downgrade_mock.assert_called_once_with(self.file_name)
check_mock.assert_called_once_with(self.file_name)
@patch.object(Plugins, 'sync')
def test_sync(self, sync_mock):
self.exec_plugins(['--sync'])
sync_mock.assert_called_once_with()
@patch.object(Serializer, 'print_to_output')
@patch.object(Plugins, 'register', return_value='some_result')
def test_register(self, register_mock, print_mock):
self.exec_plugins(['--register', 'plugin_name==version'])
self.assert_print(
print_mock,
'some_result',
'Plugin plugin_name==version was successfully registered.')
register_mock.assert_called_once_with(
self.name, self.version, force=False)
@patch.object(Serializer, 'print_to_output')
@patch.object(Plugins, 'unregister', return_value='some_result')
def test_unregister(self, unregister_mock, print_mock):
self.exec_plugins(['--unregister', 'plugin_name==version'])
self.assert_print(
print_mock,
'some_result',
'Plugin plugin_name==version was successfully unregistered.')
unregister_mock.assert_called_once_with(self.name, self.version)
def test_parse_name_version(self):
plugin = PluginAction()
self.assertEqual(
'test-1.0.0',
mos.path.join.call_args[0][1])
plugin.parse_name_version('name==version'),
['name', 'version'])
@patch('fuelclient.objects.plugins.os')
@patch('fuelclient.objects.plugins.shutil')
def test_remove_plugin_multi(self, mshutil, mos, mrequests):
mos.path.exists.return_value = True
mresponse = Mock(status_code=201)
mresponse.json.return_value = [
{
'id': 1,
'name': 'test',
'version': '1.0.0',
},
{
'id': 2,
'name': 'test',
'version': '1.1.0',
}
]
mrequests.get.return_value = mresponse
mrequests.delete.return_value = Mock(status_code=200)
def test_parse_name_version_raises_error(self):
plugin = PluginAction()
self.assertRaisesRegexp(
error.ArgumentException,
'Syntax: fuel plugins <action> fuel_plugin==1.0.0',
plugin.parse_name_version, 'some_string')
self.execute(
['fuel', 'plugins', '--remove', 'test==1.0.0']
)
@patch('fuelclient.utils.file_exists', return_value=True)
def test_check_file(self, _):
plugin = PluginAction()
plugin.check_file(self.file_name)
self.assertEqual(mrequests.delete.call_count, 1)
self.assertEqual(mshutil.rmtree.call_count, 1)
@patch('fuelclient.objects.plugins.os')
def test_remove_nonexisting_plugin(self, mos, mrequests):
mos.path.exists.return_value = True
mresponse = Mock(status_code=201)
mresponse.json.return_value = [
{
'id': 1,
'name': 'test',
'version': '1.0.0',
}
]
mrequests.get.return_value = mresponse
self.assertRaises(
SystemExit,
self.execute,
['fuel', 'plugins', '--remove', 'test-fail']
)
self.assertEqual(mrequests.delete.call_count, 0)
@patch('fuelclient.objects.plugins.os')
def test_remove_when_multiple_versions(self, mos, mrequests):
mos.path.exists.return_value = True
mresponse = Mock(status_code=201)
mresponse.json.return_value = [
{
'id': 1,
'name': 'test',
'version': '1.0.0',
},
{
'id': 2,
'name': 'test',
'version': '1.1.0',
}
]
mrequests.get.return_value = mresponse
self.assertRaises(
SystemExit,
self.execute,
['fuel', 'plugins', '--remove', 'test']
)
self.assertEqual(mrequests.delete.call_count, 0)
@patch('fuelclient.objects.plugins.os')
def test_remove_nonexisting_plugin_version(self, mos, mrequests):
mos.path.exists.return_value = True
mresponse = Mock(status_code=201)
mresponse.json.return_value = [
{
'id': 1,
'name': 'test',
'version': '1.0.0',
}
]
mrequests.get.return_value = mresponse
self.assertRaises(
SystemExit,
self.execute,
['fuel', 'plugins', '--remove', 'test==1.1.0']
)
self.assertEqual(mrequests.delete.call_count, 0)
@patch('fuelclient.utils.file_exists', return_value=False)
def test_check_file_raises_error(self, _):
plugin = PluginAction()
self.assertRaisesRegexp(
error.ArgumentException,
'File "/tmp/path/plugin.fp" does not exists',
plugin.check_file, self.file_name)

View File

@ -0,0 +1,407 @@
# -*- coding: utf-8 -*-
#
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from mock import MagicMock
from mock import patch
from fuelclient.cli import error
from fuelclient.objects.plugins import Plugins
from fuelclient.objects.plugins import PluginV1
from fuelclient.objects.plugins import PluginV2
from fuelclient.tests import base
@patch('fuelclient.objects.plugins.raise_error_if_not_master')
class TestPluginV1(base.UnitTestCase):
fake_meta = """
name: 'plugin_name'
version: 'version'
"""
def setUp(self):
super(TestPluginV1, self).setUp()
self.plugin = PluginV1
self.path = '/tmp/plugin/path'
self.name = 'plugin_name'
self.version = 'version'
@patch('fuelclient.objects.plugins.tarfile')
def test_install(self, tar_mock, master_only_mock):
tar_obj = MagicMock()
tar_mock.open.return_value = tar_obj
self.plugin.install(self.path)
master_only_mock.assert_called_once_with()
tar_obj.extractall.assert_called_once_with('/var/www/nailgun/plugins/')
tar_obj.close.assert_called_once_with()
@patch('fuelclient.objects.plugins.shutil.rmtree')
def test_remove(self, rmtree_mock, master_only_mock):
self.plugin.remove(self.name, self.version)
master_only_mock.assert_called_once_with()
rmtree_mock.assert_called_once_with(
'/var/www/nailgun/plugins/plugin_name-version')
def test_update(self, _):
self.assertRaisesRegexp(
error.BadDataException,
'Update action is not supported for old plugins with '
'package version "1.0.0", you can install your plugin '
'or use newer plugin format.',
self.plugin.update, 'some_string')
def test_downgrade(self, _):
self.assertRaisesRegexp(
error.BadDataException,
'Downgrade action is not supported for old plugins with '
'package version "1.0.0", you can install your plugin '
'or use newer plugin format.',
self.plugin.downgrade, 'some_string')
def mock_tar(self, tar_mock):
tar_obj = MagicMock()
tar_mock.open.return_value = tar_obj
tar_file = MagicMock()
tar_obj.getnames.return_value = ['metadata.yaml']
tar_obj.extractfile.return_value = tar_file
tar_file.read.return_value = self.fake_meta
@patch('fuelclient.objects.plugins.tarfile')
def test_name_from_file(self, tar_mock, _):
self.mock_tar(tar_mock)
self.assertEqual(
self.plugin.name_from_file(self.path),
self.name)
@patch('fuelclient.objects.plugins.tarfile')
def test_version_from_file(self, tar_mock, _):
self.mock_tar(tar_mock)
self.assertEqual(
self.plugin.version_from_file(self.path),
self.version)
@patch('fuelclient.objects.plugins.raise_error_if_not_master')
class TestPluginV2(base.UnitTestCase):
def setUp(self):
super(TestPluginV2, self).setUp()
self.plugin = PluginV2
self.path = '/tmp/plugin/path'
self.name = 'plugin_name'
self.version = '1.2.3'
@patch('fuelclient.objects.plugins.utils.exec_cmd')
def test_install(self, exec_mock, master_only_mock):
self.plugin.install(self.path)
exec_mock.assert_called_once_with('yum -y install /tmp/plugin/path')
master_only_mock.assert_called_once_with()
@patch('fuelclient.objects.plugins.utils.exec_cmd')
def test_install_w_force(self, exec_mock, master_only_mock):
self.plugin.install(self.path, force=True)
exec_mock.assert_called_once_with('yum -y reinstall /tmp/plugin/path')
master_only_mock.assert_called_once_with()
@patch('fuelclient.objects.plugins.utils.exec_cmd')
def test_remove(self, exec_mock, master_only_mock):
self.plugin.remove(self.name, self.version)
exec_mock.assert_called_once_with('yum -y remove plugin_name-1.2')
master_only_mock.assert_called_once_with()
@patch('fuelclient.objects.plugins.utils.exec_cmd')
def test_update(self, exec_mock, master_only_mock):
self.plugin.update(self.path)
exec_mock.assert_called_once_with('yum -y update /tmp/plugin/path')
master_only_mock.assert_called_once_with()
@patch('fuelclient.objects.plugins.utils.exec_cmd')
def test_downgrade(self, exec_mock, master_only_mock):
self.plugin.downgrade(self.path)
exec_mock.assert_called_once_with('yum -y downgrade /tmp/plugin/path')
master_only_mock.assert_called_once_with()
@patch('fuelclient.objects.plugins.utils.exec_cmd_iterator',
return_value=['plugin_name-1.2'])
def test_name_from_file(self, exec_mock, _):
self.assertEqual(
self.plugin.name_from_file(self.path),
self.name)
exec_mock.assert_called_once_with(
"rpm -qp --queryformat '%{name}' /tmp/plugin/path")
@patch('fuelclient.objects.plugins.utils.exec_cmd_iterator',
return_value=['1.2.3'])
def test_version_from_file(self, exec_mock, _):
self.assertEqual(
self.plugin.version_from_file(self.path),
self.version)
exec_mock.assert_called_once_with(
"rpm -qp --queryformat '%{version}' /tmp/plugin/path")
class TestPluginsObject(base.UnitTestCase):
def setUp(self):
super(TestPluginsObject, self).setUp()
self.plugin = Plugins
self.path = '/tmp/plugin/path'
self.name = 'plugin_name'
self.version = 'version'
def mock_make_obj_by_file(self, make_obj_by_file_mock):
plugin_obj = MagicMock()
plugin_obj.name_from_file.return_value = 'retrieved_name'
plugin_obj.version_from_file.return_value = 'retrieved_version'
make_obj_by_file_mock.return_value = plugin_obj
return plugin_obj
@patch('fuelclient.utils.glob_and_parse_yaml',
return_value=[
{'name': 'name1', 'version': 'version1'},
{'name': 'name2', 'version': 'version2'},
{'name': 'name3', 'version': 'version3'}])
@patch.object(Plugins, 'update_or_create')
def test_register(self, up_or_create_mock, glob_parse_mock):
self.plugin.register('name3', 'version3')
glob_parse_mock.assert_called_once_with(
'/var/www/nailgun/plugins/*/metadata.yaml')
up_or_create_mock.assert_called_once_with(
{'name': 'name3', 'version': 'version3'},
force=False)
@patch('fuelclient.utils.glob_and_parse_yaml', return_value=[])
def test_register_raises_error(self, glob_parse_mock):
self.assertRaisesRegexp(
error.BadDataException,
'Plugin name3 with version version3 does '
'not exist, install it and try again',
self.plugin.register, 'name3', 'version3')
glob_parse_mock.assert_called_once_with(
'/var/www/nailgun/plugins/*/metadata.yaml')
@patch.object(Plugins, 'get_plugin', return_value={'id': 123})
@patch.object(Plugins.connection, 'delete_request')
def test_unregister(self, del_mock, get_mock):
self.plugin.unregister(self.name, self.version)
get_mock.assert_called_once_with(self.name, self.version)
del_mock.assert_called_once_with('plugins/123')
@patch.object(Plugins, 'register')
@patch.object(Plugins, 'make_obj_by_file')
def test_install(self, make_obj_by_file_mock, register_mock):
plugin_obj = self.mock_make_obj_by_file(make_obj_by_file_mock)
self.plugin.install(self.path)
plugin_obj.install.assert_called_once_with(self.path, force=False)
register_mock.assert_called_once_with(
'retrieved_name', 'retrieved_version', force=False)
@patch.object(Plugins, 'unregister')
@patch.object(Plugins, 'make_obj_by_name')
def test_remove(self, make_obj_by_name_mock, unregister_mock):
plugin_obj = MagicMock()
make_obj_by_name_mock.return_value = plugin_obj
self.plugin.remove(self.name, self.version)
plugin_obj.remove.assert_called_once_with(self.name, self.version)
unregister_mock.assert_called_once_with(self.name, self.version)
@patch('fuelclient.utils.glob_and_parse_yaml',
return_value=['meta1', 'meta2'])
@patch.object(Plugins, 'update_or_create')
def test_sync(self, up_or_create_mock, glob_and_parse_mock):
self.plugin.sync()
glob_and_parse_mock.assert_called_once_with(
'/var/www/nailgun/plugins/*/metadata.yaml')
self.assertEqual(
up_or_create_mock.call_args_list,
[mock.call('meta1', force=True),
mock.call('meta2', force=True)])
@patch.object(Plugins, 'register')
@patch.object(Plugins, 'make_obj_by_file')
def test_update(self, make_obj_by_file_mock, register_mock):
plugin_obj = self.mock_make_obj_by_file(make_obj_by_file_mock)
self.plugin.update(self.path)
plugin_obj.update.assert_called_once_with(self.path)
register_mock.assert_called_once_with(
'retrieved_name', 'retrieved_version')
@patch.object(Plugins, 'register')
@patch.object(Plugins, 'make_obj_by_file')
def test_downgrade(self, make_obj_by_file_mock, register_mock):
plugin_obj = self.mock_make_obj_by_file(make_obj_by_file_mock)
self.plugin.downgrade(self.path)
plugin_obj.downgrade.assert_called_once_with(self.path)
register_mock.assert_called_once_with(
'retrieved_name', 'retrieved_version')
@patch.object(Plugins, 'get_plugin')
def test_make_obj_by_name_v1(self, get_mock):
plugins = [{'package_version': '1.0.0'},
{'package_version': '1.0.1'},
{'package_version': '1.99.99'}]
for plugin in plugins:
get_mock.return_value = plugin
self.assertEqual(
self.plugin.make_obj_by_name(self.name, self.version),
PluginV1)
@patch.object(Plugins, 'get_plugin')
def test_make_obj_by_name_v2(self, get_mock):
plugins = [{'package_version': '2.0.0'},
{'package_version': '2.0.1'},
{'package_version': '3.0.0'}]
for plugin in plugins:
get_mock.return_value = plugin
self.assertEqual(
self.plugin.make_obj_by_name(self.name, self.version),
PluginV2)
@patch.object(Plugins, 'get_plugin')
def test_make_obj_by_name_v2_raises_error(self, get_mock):
get_mock.return_value = {'package_version': '0.0.1'}
self.assertRaisesRegexp(
error.BadDataException,
'Plugin plugin_name==version has '
'unsupported package version 0.0.1',
self.plugin.make_obj_by_name, self.name, self.version)
def test_make_obj_by_file_v1(self):
self.assertEqual(
self.plugin.make_obj_by_file('file-name-1.2.3.fp'),
PluginV1)
def test_make_obj_by_file_v2(self):
self.assertEqual(
self.plugin.make_obj_by_file('file-name-1.2-1.2.3-0.noarch.rpm'),
PluginV2)
def test_make_obj_by_file_raises_error(self):
self.assertRaisesRegexp(
error.BadDataException,
'Plugin file-name.ext has unsupported format .ext',
self.plugin.make_obj_by_file, 'file-name.ext')
@patch.object(Plugins, 'get_plugin_for_update', return_value={'id': 99})
@patch.object(Plugins.connection, 'put_request', return_value={'id': 99})
def test_update_or_create_updates(self, put_mock, get_for_update_mock):
meta = {'id': 99, 'version': '1.0.0', 'package_version': '2.0.0'}
self.plugin.update_or_create(meta)
put_mock.assert_called_once_with('plugins/99', meta)
@patch.object(Plugins, 'get_plugin_for_update', return_value=None)
@patch.object(Plugins.connection, 'post_request_raw',
return_value=MagicMock(status_code=201))
@patch.object(Plugins.connection, 'put_request')
def test_update_or_create_creates(
self, put_mock, post_mock, get_for_update_mock):
meta = {'id': 99, 'version': '1.0.0', 'package_version': '2.0.0'}
self.plugin.update_or_create(meta)
post_mock.assert_called_once_with('plugins/', meta)
get_for_update_mock.assert_called_once_with(meta)
self.assertFalse(put_mock.called)
@patch.object(Plugins, 'get_plugin_for_update', return_value=None)
@patch.object(Plugins.connection, 'post_request_raw',
return_value=MagicMock(
status_code=409,
**{'json.return_value': {'id': 99}}))
@patch.object(Plugins.connection, 'put_request')
def test_update_or_create_updates_with_force(
self, put_mock, post_mock, get_for_update_mock):
meta = {'id': 99, 'version': '1.0.0', 'package_version': '2.0.0'}
self.plugin.update_or_create(meta, force=True)
post_mock.assert_called_once_with('plugins/', meta)
get_for_update_mock.assert_called_once_with(meta)
put_mock.assert_called_once_with('plugins/99', meta)
@patch.object(Plugins, 'get_all_data')
def test_get_plugin_for_update(self, get_mock):
plugin_to_be_found = {'name': 'name', 'version': '2.2.0',
'package_version': '2.0.0'}
get_mock.return_value = [
# Different major version
{'name': 'name', 'version': '2.3.0',
'package_version': '2.0.0'},
{'name': 'name', 'version': '2.1.0',
'package_version': '2.0.0'},
# Different name
{'name': 'different_name', 'version': '2.2.99',
'package_version': '2.0.0'},
# Package version is not updatable
{'name': 'name', 'version': '2.2.100',
'package_version': '1.0.0'},
plugin_to_be_found]
self.assertEqual(
self.plugin.get_plugin_for_update(
{'name': 'name',
'version': '2.2.99',
'package_version': '2.0.0'}),
plugin_to_be_found)
# Required plugin has not updatable package version
self.assertIsNone(self.plugin.get_plugin_for_update(
{'name': 'name', 'version': '2.2.99', 'package_version': '1.0.0'}))
# Plugin does not exist
self.assertIsNone(self.plugin.get_plugin_for_update(
{'name': 'name2', 'version': '2.2.9', 'package_version': '2.0.0'}))
def test_is_updatable(self):
for updatable in ['2.0.0', '2.0.1', '99.99.99']:
self.assertTrue(self.plugin.is_updatable(updatable))
for is_not_updatable in ['0.0.1', '1.0.0', '1.99.99']:
self.assertFalse(self.plugin.is_updatable(is_not_updatable))
@patch.object(Plugins, 'get_all_data',
return_value=[{'name': 'name1', 'version': '1.0.0'},
{'name': 'name2', 'version': '1.0.1'},
{'name': 'name2', 'version': '1.0.0'}])
def test_get_plugin(self, get_mock):
self.assertEqual(self.plugin.get_plugin('name2', '1.0.0'),
{'name': 'name2', 'version': '1.0.0'})
get_mock.assert_called_once_with()

View File

@ -15,16 +15,18 @@
# under the License.
import os
import subprocess
import mock
from fuelclient.cli import utils
from fuelclient.cli import error
from fuelclient.tests import base
from fuelclient import utils
class TestUtils(base.UnitTestCase):
@mock.patch('fuelclient.cli.utils.os.walk')
@mock.patch('fuelclient.utils.os.walk')
def test_iterfiles(self, mwalk):
mwalk.return_value = [
('/some_directory/', [], ['valid.yaml', 'invalid.yml'])]
@ -37,3 +39,121 @@ class TestUtils(base.UnitTestCase):
mwalk.assert_called_once_with(directory)
self.assertEqual(expected_result, files)
def make_process_mock(self, return_code=0):
process_mock = mock.Mock()
process_mock.stdout = ['Stdout line 1', 'Stdout line 2']
process_mock.returncode = return_code
return process_mock
def test_exec_cmd(self):
cmd = 'some command'
process_mock = self.make_process_mock()
with mock.patch.object(
subprocess, 'Popen', return_value=process_mock) as popen_mock:
utils.exec_cmd(cmd)
popen_mock.assert_called_once_with(
cmd,
stdout=None,
stderr=subprocess.STDOUT,
shell=True,
cwd=None)
def test_exec_cmd_raises_error(self):
cmd = 'some command'
return_code = 1
process_mock = self.make_process_mock(return_code=return_code)
with mock.patch.object(
subprocess, 'Popen', return_value=process_mock) as popen_mock:
self.assertRaisesRegexp(
error.ExecutedErrorNonZeroExitCode,
'Shell command executed with "{0}" '
'exit code: {1} '.format(return_code, cmd),
utils.exec_cmd, cmd)
popen_mock.assert_called_once_with(
cmd,
stdout=None,
stderr=subprocess.STDOUT,
shell=True,
cwd=None)
def test_exec_cmd_iterator(self):
cmd = 'some command'
process_mock = self.make_process_mock()
with mock.patch.object(
subprocess, 'Popen', return_value=process_mock) as popen_mock:
for line in utils.exec_cmd_iterator(cmd):
self.assertTrue(line.startswith('Stdout line '))
popen_mock.assert_called_once_with(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
shell=True)
def test_exec_cmd_iterator_raises_error(self):
cmd = 'some command'
return_code = 1
process_mock = self.make_process_mock(return_code=return_code)
with mock.patch.object(subprocess, 'Popen', return_value=process_mock):
with self.assertRaisesRegexp(
error.ExecutedErrorNonZeroExitCode,
'Shell command executed with "{0}" '
'exit code: {1} '.format(return_code, cmd)):
for line in utils.exec_cmd_iterator(cmd):
self.assertTrue(line.startswith('Stdout line '))
def test_parse_yaml_file(self):
mock_open = self.mock_open("key: value")
with mock.patch('fuelclient.utils.io.open', mock_open):
self.assertEqual(
utils.parse_yaml_file('some_file_name'),
{'key': 'value'})
@mock.patch('fuelclient.utils.glob.iglob',
return_value=['file1', 'file2'])
@mock.patch('fuelclient.utils.parse_yaml_file',
side_effect=['content_file1', 'content_file2'])
def test_glob_and_parse_yaml(self, parse_mock, iglob_mock):
path = '/tmp/path/mask*'
content = []
for data in utils.glob_and_parse_yaml(path):
content.append(data)
iglob_mock.assert_called_once_with(path)
self.assertEqual(
parse_mock.call_args_list,
[mock.call('file1'),
mock.call('file2')])
self.assertEqual(content, ['content_file1', 'content_file2'])
def test_major_plugin_version(self):
pairs = [
['1.2.3', '1.2'],
['123456789.123456789.12121', '123456789.123456789'],
['1.2', '1.2']]
for arg, expected in pairs:
self.assertEqual(
utils.major_plugin_version(arg),
expected)
@mock.patch('fuelclient.utils.os.path.lexists', side_effect=[True, False])
def test_file_exists(self, lexists_mock):
self.assertTrue(utils.file_exists('file1'))
self.assertFalse(utils.file_exists('file2'))
self.assertEqual(
lexists_mock.call_args_list,
[mock.call('file1'), mock.call('file2')])

131
fuelclient/utils.py Normal file
View File

@ -0,0 +1,131 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import glob
import io
import os
import subprocess
import yaml
from distutils.version import StrictVersion
from fnmatch import fnmatch
from fuelclient.cli import error
def _wait_and_check_exit_code(cmd, child):
"""Wait for child and check it's exit code
:param cmd: command
:param child: object which returned by subprocess.Popen
:raises: ExecutedErrorNonZeroExitCode
"""
child.wait()
exit_code = child.returncode
if exit_code != 0:
raise error.ExecutedErrorNonZeroExitCode(
u'Shell command executed with "{0}" '
'exit code: {1} '.format(exit_code, cmd))
def exec_cmd(cmd, cwd=None):
"""Execute shell command logging.
:param str cmd: shell command
:param str cwd: None is default
"""
child = subprocess.Popen(
cmd, stdout=None,
stderr=subprocess.STDOUT,
shell=True,
cwd=cwd)
_wait_and_check_exit_code(cmd, child)
def exec_cmd_iterator(cmd):
"""Execute command with logging.
:param cmd: shell command
:returns: generator where yeach item
is line from stdout
"""
child = subprocess.Popen(
cmd, stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
shell=True)
for line in child.stdout:
yield line
_wait_and_check_exit_code(cmd, child)
def parse_yaml_file(path):
"""Parses yaml
:param str path: path to yaml file
:returns: deserialized file
"""
with io.open(path, encoding='utf-8') as f:
data = yaml.load(f)
return data
def glob_and_parse_yaml(path):
"""Parses yaml files by mask.
:param str path: mask
:returns: iterator
"""
for f in glob.iglob(path):
yield parse_yaml_file(f)
def major_plugin_version(version):
"""Retrieves major version.
"1.2.3" -> "1.2"
:param str version: version
:returns: only major version
"""
version_tuple = StrictVersion(version).version
major = '.'.join(map(str, version_tuple[:2]))
return major
def iterfiles(dir_path, file_pattern):
"""Returns generator where each item is a path to file, that satisfies
file_patterns condtion
:param dir_path: path to directory, e.g /etc/puppet/
:param file_pattern: unix filepattern to match files
"""
for root, dirs, file_names in os.walk(dir_path):
for file_name in file_names:
if fnmatch(file_name, file_pattern):
yield os.path.join(root, file_name)
def file_exists(path):
"""Checks if file exists
:param str path: path to the file
:returns: True if file is exist, Flase if is not
"""
return os.path.lexists(path)