Merge "Added adaptation of legacy tasks for task based deployment"

This commit is contained in:
Jenkins 2016-04-22 18:28:47 +00:00 committed by Gerrit Code Review
commit 1ce23a0f22
13 changed files with 574 additions and 7 deletions

View File

@ -1049,17 +1049,23 @@
type: "checkbox"
auth_key:
value: ""
label: "Public Key"
# label: "Public Key"
# description: "Public key(s) to include in authorized_keys on deployed nodes"
group: "security"
description: "Public key(s) to include in authorized_keys on deployed nodes"
weight: 70
type: "hidden"
task_deploy:
value: true
label: "Enable task based deploy"
description: "The new deployment engine based on cross-node dependencies for deployment tasks."
# label: "Enable task based deploy"
# description: "The new deployment engine based on cross-node dependencies for deployment tasks."
weight: 11
type: "hidden"
propagate_task_deploy:
value: false
# label: "Propagate task based deployment."
# description: "Enables adaptation of granular tasks for task deployment."
weight: 12
type: "hidden"
public_network_assignment:
metadata:

View File

@ -80,7 +80,9 @@ class TransactionSerializer(object):
version = StrictVersion(task.get('version', '0.0.0'))
if version < cls.min_supported_task_version:
message = (
"Task '{0}' does not support cross-dependencies."
"Task '{0}' does not support cross-dependencies.\n"
"You can enable option 'propagate_task_deploy'"
"for cluster to use task adaptation mechanism."
.format(task['id'])
)
logger.warning(message)

View File

@ -1118,6 +1118,11 @@ class Cluster(NailgunObject):
cluster_deployment_tasks
])
@classmethod
def get_legacy_plugin_tasks(cls, instance):
"""Get legacy deployment tasks from tasks.yaml."""
return PluginManager.get_legacy_tasks_for_cluster(instance)
@classmethod
def get_refreshable_tasks(cls, instance, filter_by_configs=None):
"""Return list of refreshable tasks
@ -1494,9 +1499,19 @@ class Cluster(NailgunObject):
:param instance: cluster for checking
:type instance: nailgun.db.sqlalchemy.models.Cluster instance
"""
attrs = cls.get_editable_attributes(instance, False)
attrs = cls.get_editable_attributes(instance)
return attrs['common'].get('task_deploy', {}).get('value')
@classmethod
def is_propagate_task_deploy_enabled(cls, instance):
"""Tests that task based deployment propagation enabled.
:param instance: cluster for checking
:type instance: nailgun.db.sqlalchemy.models.Cluster instance
"""
attrs = cls.get_editable_attributes(instance)
return attrs['common'].get('propagate_task_deploy', {}).get('value')
# FIXME(aroma): remove updating of 'deployed_before'
# when stop action is reworked. 'deployed_before'
# flag identifies whether stop action is allowed for the

View File

@ -143,6 +143,13 @@ class PluginAdapterBase(object):
else:
self._tasks = self._load_tasks()
slave_path = self.slaves_scripts_path
for task in self._tasks:
task['roles'] = task['role']
parameters = task.get('parameters')
if parameters is not None:
parameters.setdefault('cwd', slave_path)
return self._tasks
@property

View File

@ -370,3 +370,15 @@ class PluginManager(object):
if cluster_components & plugin_components:
ClusterPlugins.set_attributes(
cluster.id, plugin.id, enabled=True)
@classmethod
def get_legacy_tasks_for_cluster(cls, cluster):
"""Gets the tasks from tasks.yaml for all plugins.
:param cluster: the cluster object
:return: all tasks from tasks.yaml
"""
tasks = []
for plugin in cls.get_enabled_plugins(cluster):
tasks.extend(plugin.tasks)
return tasks

View File

@ -52,6 +52,8 @@ class InstallationInfo(object):
'resume_guests_state_on_host_boot', None),
WhiteListRule(('common', 'task_deploy', 'value'),
'task_deploy', None),
WhiteListRule(('common', 'propagate_task_deploy', 'value'),
'propagate_task_deploy', None),
WhiteListRule(('corosync', 'verified', 'value'),
'corosync_verified', None),

View File

@ -0,0 +1,185 @@
# -*- coding: utf-8 -*-
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
from distutils.version import StrictVersion
import itertools
from nailgun import consts
from nailgun.logger import logger
from nailgun.orchestrator.orchestrator_graph import GraphSolver
TASK_START_TEMPLATE = '{0}_start'
TASK_END_TEMPLATE = '{0}_end'
def _get_role(task):
return task.get('roles', task.get('groups'))
def _get_task_stage(task):
return task['stage'].split('/')[0]
def _get_task_stage_and_priority(task):
stage_list = task['stage'].split('/')
stage = stage_list[0]
priority = stage_list[-1] if len(stage_list) > 1 else 0
try:
priority = float(priority)
except ValueError:
logger.warn(
'Task %s has non numeric priority "%s", set to 0',
task, priority)
priority = 0
return stage, priority
def _join_groups(groups):
for group in groups.values():
for req in group.get('requires', ()):
if req in groups:
group['cross_depends'].append({
'name': TASK_END_TEMPLATE.format(req),
'role': _get_role(groups[req])
})
for req in group.get('required_for', ()):
if req in groups:
groups[req]['cross_depends'].append({
'name': TASK_END_TEMPLATE.format(group['id']),
'role': _get_role(group)
})
def _get_group_start(group):
return {
'id': TASK_START_TEMPLATE.format(group['id']),
'type': consts.ORCHESTRATOR_TASK_TYPES.skipped,
'version': consts.TASK_CROSS_DEPENDENCY,
'roles': _get_role(group),
'cross_depends': group['cross_depends'],
'cross_depended_by': [{
'name': TASK_END_TEMPLATE.format(group['id']), 'role': 'self'
}],
}
def _get_group_end(group):
return {
'id': TASK_END_TEMPLATE.format(group['id']),
'type': consts.ORCHESTRATOR_TASK_TYPES.skipped,
'version': consts.TASK_CROSS_DEPENDENCY,
'roles': _get_role(group)
}
def _join_task_to_group(task, groups):
task['version'] = consts.TASK_CROSS_DEPENDENCY
# add only depends to start, because depends to end already added
task['cross_depends'] = [
{'name': TASK_START_TEMPLATE.format(g), 'role': 'self'} for g in groups
]
return task
def adapt_legacy_tasks(deployment_tasks, legacy_plugin_tasks, role_resolver):
"""Adapt the legacy tasks to execute with Task Based Engine.
:param deployment_tasks: the list of deployment tasks
:param legacy_plugin_tasks: the pre/post tasks from tasks.yaml
:param role_resolver: the RoleResolver instance
"""
min_task_version = StrictVersion(consts.TASK_CROSS_DEPENDENCY)
groups = {}
sync_points = GraphSolver()
legacy_tasks = []
for task in deployment_tasks:
task_type = task.get('type')
task_version = StrictVersion(task.get('version', '0.0.0'))
if task_type == consts.ORCHESTRATOR_TASK_TYPES.group:
groups[task['id']] = dict(task, cross_depends=[])
elif task_type == consts.ORCHESTRATOR_TASK_TYPES.stage:
sync_points.add_task(task)
else:
task = task.copy()
required_for = copy.copy(task.get('required_for', []))
required_for.extend(
TASK_END_TEMPLATE.format(x)
for x in role_resolver.get_all_roles(_get_role(task))
)
task['required_for'] = required_for
if task_version < min_task_version:
legacy_tasks.append(task)
continue
yield task
if not (legacy_tasks or legacy_plugin_tasks):
return
_join_groups(groups)
# make bubbles from each group
for group in groups.values():
yield _get_group_start(group)
yield _get_group_end(group)
# put legacy tasks into bubble
for task in legacy_tasks:
logger.warning("Added cross_depends for legacy task: %s", task['id'])
yield _join_task_to_group(
task, role_resolver.get_all_roles(_get_role(task))
)
if not legacy_plugin_tasks:
return
# process tasks from stages
legacy_plugin_tasks.sort(key=_get_task_stage_and_priority)
tasks_per_stage = itertools.groupby(
legacy_plugin_tasks, key=_get_task_stage
)
for stage, tasks in tasks_per_stage:
sync_point_name = TASK_END_TEMPLATE.format(stage)
cross_depends = [{'name': sync_point_name, 'role': None}]
successors = sync_points.successors(sync_point_name)
if successors:
logger.debug(
'The next stage is found for %s: %s',
sync_point_name, successors[0]
)
cross_depended_by = [{'name': successors[0], 'role': None}]
else:
logger.debug(
'The next stage is not found for %s.', sync_point_name
)
cross_depended_by = []
for idx, task in enumerate(tasks):
new_task = {
'id': '{0}_{1}'.format(stage, idx),
'type': task['type'],
'roles': _get_role(task),
'version': consts.TASK_CROSS_DEPENDENCY,
'cross_depends': cross_depends,
'cross_depended_by': cross_depended_by,
'parameters': task.get('parameters', {}),
'condition': task.get('condition', True)
}
cross_depends = [
{'name': new_task['id'], 'role': new_task['roles']}
]
yield new_task

View File

@ -28,7 +28,6 @@ from sqlalchemy import not_
from sqlalchemy.orm import ColumnProperty
from sqlalchemy.orm import object_mapper
from nailgun import consts
from nailgun.db import db
from nailgun.db.sqlalchemy.models import CapacityLog
@ -53,6 +52,7 @@ import nailgun.rpc as rpc
from nailgun.settings import settings
from nailgun.task.fake import FAKE_THREADS
from nailgun.task.helpers import TaskHelper
from nailgun.task.legacy_tasks_adapter import adapt_legacy_tasks
from nailgun.utils import logs as logs_utils
from nailgun.utils.restrictions import VmwareAttributesRestriction
from nailgun.utils.role_resolver import RoleResolver
@ -414,6 +414,16 @@ class ClusterTransaction(DeploymentTask):
tasks = cls.mark_skipped(tasks, selected_task_ids)
role_resolver = RoleResolver(nodes)
cluster = transaction.cluster
if objects.Cluster.is_propagate_task_deploy_enabled(cluster):
logger.info("The legacy tasks adaptation is used.")
tasks = adapt_legacy_tasks(
tasks,
objects.Cluster.get_legacy_plugin_tasks(cluster),
role_resolver,
)
directory, graph = lcm.TransactionSerializer.serialize(
context,
tasks,

View File

@ -206,6 +206,40 @@ class TestTaskManagers(BaseIntegrationTest):
[x['id'] for x in tasks_graph[cluster.nodes[1].uid]]
)
@fake_tasks()
@mock.patch('nailgun.lcm.transaction_serializer.settings',
LCM_CHECK_TASK_VERSION=True)
@mock.patch('objects.Cluster.get_deployment_tasks')
@mock.patch('objects.Cluster.is_propagate_task_deploy_enabled')
def test_adaptation_legacy_tasks(self, propagate_mock, tasks_mock, _):
tasks_mock.return_value = [
{
'id': 'task', 'parameters': {}, 'type': 'puppet',
'roles': ['controller'], 'version': '1.0.0',
},
{
'id': 'controller', 'type': 'group', 'roles': ['controller']
}
]
self.env.create(
nodes_kwargs=[
{"pending_addition": True, "pending_roles": ['controller']},
{"pending_addition": True, "pending_roles": ['controller']},
],
release_kwargs={
'operating_system': consts.RELEASE_OS.ubuntu,
'version': 'liberty-9.0',
}
)
cluster = self.env.clusters[-1]
propagate_mock.return_value = False
supertask = self.env.launch_deployment(cluster.id)
self.assertEqual(TASK_STATUSES.error, supertask.status)
self.assertIn("Task 'task'", supertask.message)
propagate_mock.return_value = True
supertask = self.env.launch_deployment(cluster.id)
self.assertEqual(TASK_STATUSES.ready, supertask.status)
@fake_tasks(fake_rpc=False, mock_rpc=True)
def test_write_action_logs(self, _):
self.env.create(

View File

@ -0,0 +1,233 @@
# -*- coding: utf-8 -*-
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the 'License'); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from nailgun import consts
from nailgun.task.legacy_tasks_adapter import adapt_legacy_tasks
from nailgun.test.base import BaseUnitTest
class TestLegacyTasksAdapter(BaseUnitTest):
@classmethod
def setUpClass(cls):
super(TestLegacyTasksAdapter, cls).setUpClass()
cls.role_resolver = mock.MagicMock()
cls.role_resolver.get_all_roles.side_effect = lambda x: set(x)
def test_returns_same_task_if_no_legacy(self):
tasks = [
{'id': 'test1', 'version': '2.0.0', 'roles': ['group1'],
'type': consts.ORCHESTRATOR_TASK_TYPES.puppet,
'required_for': []},
{'id': 'group1', 'type': consts.ORCHESTRATOR_TASK_TYPES.group},
{'id': 'stage1', 'type': consts.ORCHESTRATOR_TASK_TYPES.stage}
]
new_tasks = list(adapt_legacy_tasks(tasks, None, self.role_resolver))
self.datadiff(tasks, new_tasks, ignore_keys='required_for')
self.assertEqual([], tasks[0]['required_for'])
self.assertEqual(
['group1_end'], new_tasks[0]['required_for']
)
def test_legacy_deployment_task_adaptation(self):
tasks = [
{'id': 'task1', 'version': '2.0.0', 'roles': 'group1',
'type': consts.ORCHESTRATOR_TASK_TYPES.puppet},
{'id': 'task2', 'roles': ['group2'],
'type': consts.ORCHESTRATOR_TASK_TYPES.puppet},
{'id': 'group1', 'roles': ['group1'],
'type': consts.ORCHESTRATOR_TASK_TYPES.group,
'requires': ['stage1'], 'required_for': ['group2']},
{'id': 'group3', 'roles': ['group3'], 'requires': ['group1'],
'type': consts.ORCHESTRATOR_TASK_TYPES.group, },
{'id': 'group2', 'type': consts.ORCHESTRATOR_TASK_TYPES.group,
'roles': ['group2'], 'required_for': ['stage2']},
{'id': 'stage1', 'type': consts.ORCHESTRATOR_TASK_TYPES.stage},
{'id': 'stage2', 'type': consts.ORCHESTRATOR_TASK_TYPES.stage}
]
self.role_resolver.get_all_roles.side_effect = lambda x: set(x)
new_tasks = list(adapt_legacy_tasks(tasks, [], self.role_resolver))
self.assertEqual(
{
'id': 'group1_start',
'type': consts.ORCHESTRATOR_TASK_TYPES.skipped,
'version': consts.TASK_CROSS_DEPENDENCY,
'roles': ['group1'],
'cross_depends': [],
'cross_depended_by': [{'name': 'group1_end', 'role': 'self'}]
},
next(x for x in new_tasks if x['id'] == 'group1_start')
)
self.assertEqual(
{
'id': 'group1_end',
'type': consts.ORCHESTRATOR_TASK_TYPES.skipped,
'version': consts.TASK_CROSS_DEPENDENCY,
'roles': ['group1']
},
next(x for x in new_tasks if x['id'] == 'group1_end')
)
self.assertEqual(
{
'id': 'group2_start',
'type': consts.ORCHESTRATOR_TASK_TYPES.skipped,
'version': consts.TASK_CROSS_DEPENDENCY,
'roles': ['group2'],
'cross_depends': [{'name': 'group1_end', 'role': ['group1']}],
'cross_depended_by': [{'name': 'group2_end', 'role': 'self'}]
},
next(x for x in new_tasks if x['id'] == 'group2_start')
)
self.assertEqual(
{
'id': 'group2_end',
'type': consts.ORCHESTRATOR_TASK_TYPES.skipped,
'version': consts.TASK_CROSS_DEPENDENCY,
'roles': ['group2']
},
next(x for x in new_tasks if x['id'] == 'group2_end')
)
self.assertEqual(
{
'id': 'group3_start',
'type': consts.ORCHESTRATOR_TASK_TYPES.skipped,
'version': consts.TASK_CROSS_DEPENDENCY,
'roles': ['group3'],
'cross_depends': [{'name': 'group1_end', 'role': ['group1']}],
'cross_depended_by': [{'name': 'group3_end', 'role': 'self'}]
},
next(x for x in new_tasks if x['id'] == 'group3_start')
)
self.assertEqual(
{
'id': 'group3_end',
'type': consts.ORCHESTRATOR_TASK_TYPES.skipped,
'version': consts.TASK_CROSS_DEPENDENCY,
'roles': ['group3']
},
next(x for x in new_tasks if x['id'] == 'group3_end')
)
self.assertEqual(
{
'id': 'task2',
'type': consts.ORCHESTRATOR_TASK_TYPES.puppet,
'version': '2.0.0',
'roles': ['group2'],
'required_for': ['group2_end'],
'cross_depends': [{'name': 'group2_start', 'role': 'self'}],
},
next(x for x in new_tasks if x['id'] == 'task2')
)
def test_legacy_plugin_tasks_adaptation(self):
tasks = [
{'id': 'task1', 'version': '2.0.0', 'roles': 'group1',
'type': consts.ORCHESTRATOR_TASK_TYPES.puppet},
{'id': 'group1', 'roles': ['group1'],
'type': consts.ORCHESTRATOR_TASK_TYPES.group,
'requires': ['stage1'], 'required_for': ['stage2']},
{'id': 'stage1_start',
'type': consts.ORCHESTRATOR_TASK_TYPES.stage},
{'id': 'stage1_end', 'requires': 'stage1_start',
'type': consts.ORCHESTRATOR_TASK_TYPES.stage},
{'id': 'stage2_start', 'requires': ['stage1_end'],
'type': consts.ORCHESTRATOR_TASK_TYPES.stage},
{'id': 'stage2_end', 'requires': ['stage2_start'],
'type': consts.ORCHESTRATOR_TASK_TYPES.stage},
{'id': 'stage3_start', 'requires': ['stage2_end'],
'type': consts.ORCHESTRATOR_TASK_TYPES.stage},
{'id': 'stage3_end', 'requires': ['stage3_start'],
'type': consts.ORCHESTRATOR_TASK_TYPES.stage}
]
legacy_plugin_tasks = [
{
'roles': '*',
'stage': 'stage1',
'type': consts.ORCHESTRATOR_TASK_TYPES.puppet,
'parameters': {'number': 0}
},
{
'roles': '*',
'stage': 'stage1/100',
'type': consts.ORCHESTRATOR_TASK_TYPES.puppet,
'parameters': {'number': 2}
},
{
'roles': '*',
'stage': 'stage1/10',
'type': consts.ORCHESTRATOR_TASK_TYPES.puppet,
'parameters': {'number': 1}
},
{
'roles': '*',
'stage': 'stage3/100',
'type': consts.ORCHESTRATOR_TASK_TYPES.puppet,
'parameters': {'number': 1}
},
{
'roles': 'group1',
'stage': 'stage3',
'type': consts.ORCHESTRATOR_TASK_TYPES.puppet,
'parameters': {'number': 0}
}
]
new_tasks = list(adapt_legacy_tasks(
tasks, legacy_plugin_tasks, self.role_resolver
))
stage1_tasks = new_tasks[-5:-2]
depends = [{'role': None, 'name': 'stage1_end'}]
depended_by = [{'role': None, 'name': 'stage2_start'}]
for idx, task in enumerate(stage1_tasks):
self.assertEqual(
{
'id': 'stage1_{0}'.format(idx),
'type': legacy_plugin_tasks[idx]['type'],
'roles': legacy_plugin_tasks[idx]['roles'],
'version': consts.TASK_CROSS_DEPENDENCY,
'cross_depends': depends,
'cross_depended_by': depended_by,
'condition': True,
'parameters': {'number': idx}
},
task
)
depends = [{'role': task['roles'], 'name': task['id']}]
stage3_tasks = new_tasks[-2:]
depends = [{'role': None, 'name': 'stage3_end'}]
depended_by = []
for idx, task in enumerate(stage3_tasks):
self.assertEqual(
{
'id': 'stage3_{0}'.format(idx),
'type': legacy_plugin_tasks[3 + idx]['type'],
'roles': legacy_plugin_tasks[3 + idx]['roles'],
'version': consts.TASK_CROSS_DEPENDENCY,
'cross_depends': depends,
'cross_depended_by': depended_by,
'condition': True,
'parameters': {'number': idx}
},
task
)
depends = [{'role': task['roles'], 'name': task['id']}]

View File

@ -192,6 +192,26 @@ class TestPluginBase(base.BaseTestCase):
def _find_path(self, config_name):
return '{0}.yaml'.format(config_name)
def test_plugin_adapter_get_tasks(self):
self.plugin.tasks = [
{
'role': '*',
'stage': 'stage3/100',
'type': consts.ORCHESTRATOR_TASK_TYPES.puppet,
'parameters': {}
},
{
'role': 'controller',
'stage': 'stage3/100',
'type': consts.ORCHESTRATOR_TASK_TYPES.shell,
}
]
tasks = self.plugin_adapter.get_tasks()
for task in tasks:
self.assertEqual(task['role'], task['roles'])
if 'parameters' in task:
self.assertIn('cwd', task['parameters'])
class TestPluginV1(TestPluginBase):

View File

@ -86,6 +86,22 @@ class TestPatternBasedRoleResolver(BaseUnitTest):
self.assertEqual(1, len(any_node))
self.assertTrue(any_node.issubset(all_nodes))
def test_get_all_roles(self):
resolver = role_resolver.RoleResolver(self.nodes)
all_roles = {r for roles in self.roles_of_nodes for r in roles}
self.assertEqual(all_roles, resolver.get_all_roles())
self.assertEqual(all_roles, resolver.get_all_roles(
consts.TASK_ROLES.all
))
self.assertEqual(
{'controller', 'primary-controller'},
resolver.get_all_roles("/.*controller/")
)
self.assertEqual(
{'compute', "cinder"},
resolver.get_all_roles(["compute", "cinder", "cinder2"])
)
class TestNullResolver(BaseUnitTest):
def test_resolve(self):

View File

@ -43,6 +43,14 @@ class BaseRoleResolver(object):
:return: the unique set of nodes
"""
@abc.abstractmethod
def get_all_roles(self, pattern=None):
"""Gets all roles by pattern if pattern is specified.
:param pattern: option pattern to match role
:return: the all roles that forth pattern
"""
class NullResolver(BaseRoleResolver):
"""The implementation of RoleResolver
@ -55,6 +63,9 @@ class NullResolver(BaseRoleResolver):
def resolve(self, roles, policy=None):
return self.nodes_ids
def get_all_roles(self, pattern=None):
return []
class RoleResolver(BaseRoleResolver):
"""The general role resolver.
@ -119,3 +130,17 @@ class RoleResolver(BaseRoleResolver):
roles, policy, result
)
return result
def get_all_roles(self, pattern=None):
if pattern is None or pattern == consts.TASK_ROLES.all:
return set(self.__mapping)
if isinstance(pattern, six.string_types):
pattern = [pattern]
result = set()
if isinstance(pattern, (list, tuple, set)):
for p in pattern:
p = NameMatchingPolicy.create(p)
result.update(r for r in self.__mapping if p.match(r))
return result