Merge "Rerun network setup on deployed nodes"

Jenkins 2015-12-02 16:26:53 +00:00 committed by Gerrit Code Review
commit 6111c69264
9 changed files with 228 additions and 23 deletions


@@ -366,6 +366,10 @@ INTERNAL_TASKS = (ORCHESTRATOR_TASK_TYPES.group,
ORCHESTRATOR_TASK_TYPES.stage,
ORCHESTRATOR_TASK_TYPES.skipped)
# filter with deployment tasks that should be rerun on already deployed nodes
# to re-apply the network setup on those nodes
TASKS_TO_RERUN_ON_DEPLOY_CHANGES = ['deploy_changes']
TASK_REFRESH_FIELD = 'refresh_on'
ROLE_NAME_MAX_SIZE = 64
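
For orientation, the matching rule these constants feed into can be summarized in a short, hypothetical helper (not part of the commit): a task is re-executable for a set of triggers only when its reexecute_on list covers all of them.

# Hypothetical sketch of the rerun-filter semantics used by this commit.
def should_rerun(task, triggers=('deploy_changes',)):
    """Return True if the task declares every trigger in its 'reexecute_on' list."""
    reexecute_on = task.get('reexecute_on')
    return reexecute_on is not None and set(triggers).issubset(reexecute_on)

# Example: a 'netconfig'-style task dict with reexecute_on: ['deploy_changes']
# returns True; a task without the key returns False.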


@@ -114,9 +114,12 @@
- id: netconfig
type: puppet
groups: [compute, virt]
groups: [primary-controller, controller,
cinder, compute, ceph-osd,
primary-mongo, mongo, ironic]
required_for: [deploy_end]
requires: [deploy_start]
reexecute_on: [deploy_changes]
parameters:
puppet_manifest: /etc/puppet/modules/osnailyfacter/modular/netconfig/netconfig.pp
puppet_modules: /etc/puppet/modules
@@ -168,3 +171,16 @@
parameters:
strategy:
type: parallel
- id: globals
type: puppet
groups: [primary-controller, controller,
cinder, compute, ceph-osd,
primary-mongo, mongo, ironic]
required_for: [deploy_end]
requires: [deploy_start]
reexecute_on: [deploy_changes]
parameters:
puppet_manifest: /etc/puppet/modules/osnailyfacter/modular/globals/globals.pp
puppet_modules: /etc/puppet/modules
timeout: 3600
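
As a rough illustration (assuming PyYAML and a hypothetical file path), task definitions like the ones above can be loaded and checked for the deploy_changes trigger:

import yaml  # assumes PyYAML is available

with open('deployment_tasks.yaml') as f:  # hypothetical path to definitions like those above
    tasks = yaml.safe_load(f)

# Collect the ids of tasks that declare the 'deploy_changes' trigger.
rerunnable = [t['id'] for t in tasks
              if 'deploy_changes' in t.get('reexecute_on', [])]
# For the two tasks shown above this yields ['netconfig', 'globals'].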


@@ -805,6 +805,19 @@ class Cluster(NailgunObject):
return nodes
@classmethod
def get_nodes_by_status(cls, instance, status):
"""Get cluster nodes with particular status
:param instance: cluster instance
:param status: node status
:return: filtered query on nodes
"""
return db().query(models.Node).filter_by(
cluster_id=instance.id,
status=status
)
@classmethod
def get_primary_node(cls, instance, role_name):
"""Get primary node for role_name


@@ -201,6 +201,24 @@ class DeploymentGraph(nx.DiGraph):
else:
task['skipped'] = False
def reexecutable_tasks(self, task_filter):
"""Keep only reexecutable tasks which match the filter.
Filter is the list of values. If task has reexecute_on key and its
value matches the value from filter then task is not skipped.
:param task_filter: filter (list)
"""
if not task_filter:
return
task_filter = set(task_filter)
for task in six.itervalues(self.node):
reexecute_on = task.get('reexecute_on')
if reexecute_on is not None and task_filter.issubset(reexecute_on):
task['skipped'] = False
else:
self.make_skipped_task(task)
def find_subgraph(self, start=None, end=None):
"""Find subgraph by provided start and end endpoints
@@ -253,6 +271,9 @@ class AstuteGraph(object):
def only_tasks(self, task_ids):
self.graph.only_tasks(task_ids)
def reexecutable_tasks(self, task_filter):
self.graph.reexecutable_tasks(task_filter)
def group_nodes_by_roles(self, nodes):
"""Group nodes by roles


@@ -18,7 +18,6 @@ import copy
from distutils.version import StrictVersion
import traceback
from oslo_serialization import jsonutils
from nailgun.objects.serializers.network_configuration \
@@ -246,6 +245,7 @@ class ApplyChangesTaskManager(TaskManager, DeploymentCheckMixin):
"""
nodes_to_delete = []
nodes_to_resetup = []
if nodes_to_provision_deploy:
nodes_to_deploy = objects.NodeCollection.get_by_ids(
@@ -266,15 +266,25 @@ class ApplyChangesTaskManager(TaskManager, DeploymentCheckMixin):
# Run validation if user didn't redefine
# provisioning and deployment information
if not(nodes_to_provision_deploy) and \
(not objects.Cluster.get_provisioning_info(self.cluster) and
not objects.Cluster.get_deployment_info(self.cluster)):
if not (nodes_to_provision_deploy or
objects.Cluster.get_provisioning_info(self.cluster) or
objects.Cluster.get_deployment_info(self.cluster)):
try:
self.check_before_deployment(supertask)
except errors.CheckBeforeDeploymentError:
db().commit()
return
if self.cluster.status == consts.CLUSTER_STATUSES.operational:
# rerun particular tasks on all deployed nodes
affected_nodes = set(
nodes_to_deploy + nodes_to_provision + nodes_to_delete)
ready_nodes = set(objects.Cluster.get_nodes_by_status(
self.cluster,
consts.NODE_STATUSES.ready
))
nodes_to_resetup = ready_nodes.difference(affected_nodes)
task_deletion, task_provision, task_deployment = None, None, None
if nodes_to_delete:
@@ -316,6 +326,7 @@ class ApplyChangesTaskManager(TaskManager, DeploymentCheckMixin):
db().commit()
task_messages.append(provision_message)
deployment_message = None
if nodes_to_deploy:
logger.debug("There are nodes to deploy: %s",
" ".join([objects.Node.get_node_fqdn(n)
@@ -348,6 +359,49 @@ class ApplyChangesTaskManager(TaskManager, DeploymentCheckMixin):
task_deployment.cache = deployment_message
db().commit()
if nodes_to_resetup:
logger.debug("There are nodes to resetup: %s",
", ".join([objects.Node.get_node_fqdn(n)
for n in nodes_to_resetup]))
if not deployment_message:
task_deployment = supertask.create_subtask(
name=consts.TASK_NAMES.deployment,
status=consts.TASK_STATUSES.pending
)
# the task must be committed so it can be processed in other threads
db().commit()
resetup_message = self._call_silently(
task_deployment,
tasks.DeploymentTask,
nodes_to_resetup,
reexecutable_filter=consts.TASKS_TO_RERUN_ON_DEPLOY_CHANGES,
method_name='message'
)
db().commit()
task_deployment = objects.Task.get_by_uid(
task_deployment.id,
fail_if_not_found=True,
lock_for_update=True
)
# if generating the task message for the orchestrator failed,
# the task has already been set to error
if task_deployment.status == consts.TASK_STATUSES.error:
return
if deployment_message:
deployment_message['args']['deployment_info'].extend(
resetup_message['args']['deployment_info']
)
else:
deployment_message = resetup_message
task_deployment.cache = deployment_message
db().commit()
if deployment_message:
task_messages.append(deployment_message)
# Even if we don't have nodes to deploy, the deployment task
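
Condensed for readability, the resetup flow added in this hunk boils down to the following sketch (same names as above, not a verbatim excerpt):

# Nodes that are already 'ready' and not otherwise affected get the rerun tasks.
affected_nodes = set(nodes_to_deploy + nodes_to_provision + nodes_to_delete)
ready_nodes = set(objects.Cluster.get_nodes_by_status(
    self.cluster, consts.NODE_STATUSES.ready))
nodes_to_resetup = ready_nodes.difference(affected_nodes)

# The generated resetup message is either merged into an existing deployment
# message or becomes the deployment message itself.
if deployment_message:
    deployment_message['args']['deployment_info'].extend(
        resetup_message['args']['deployment_info'])
else:
    deployment_message = resetup_message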


@@ -149,7 +149,8 @@ class DeploymentTask(object):
return 'deploy'
@classmethod
def message(cls, task, nodes, deployment_tasks=None):
def message(cls, task, nodes, deployment_tasks=None,
reexecutable_filter=None):
logger.debug("DeploymentTask.message(task=%s)" % task.uuid)
deployment_tasks = deployment_tasks or []
@@ -162,7 +163,7 @@
n.roles = n.roles + n.pending_roles
n.pending_roles = []
# If reciever for some reasons didn't update
# If receiver for some reasons didn't update
# node's status to provisioned when deployment
# started, we should do it in nailgun
if n.status in (consts.NODE_STATUSES.deploying,):
@@ -173,6 +174,7 @@
orchestrator_graph = deployment_graph.AstuteGraph(task.cluster)
orchestrator_graph.only_tasks(deployment_tasks)
orchestrator_graph.reexecutable_tasks(reexecutable_filter)
# NOTE(dshulyak) At this point parts of the orchestration can be empty,
# it should not cause any issues with deployment/progress and was
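
The new keyword is driven from ApplyChangesTaskManager (see the earlier hunk); a minimal call sketch, with task and nodes assumed to exist:

from nailgun import consts

# DeploymentTask is the class shown in this hunk.
message = DeploymentTask.message(
    task, nodes,
    reexecutable_filter=consts.TASKS_TO_RERUN_ON_DEPLOY_CHANGES)
# With the default reexecutable_filter=None the behaviour is unchanged:
# reexecutable_tasks() returns early and no extra task is skipped.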


@@ -1141,17 +1141,37 @@ class TestRolesSerializationWithPlugins(BaseDeploymentSerializer,
serializer = self._get_serializer(self.cluster)
serialized_data = serializer.serialize(
self.cluster, self.cluster.nodes)
self.assertItemsEqual(serialized_data[0]['tasks'], [{
'parameters': {
'cwd': '/',
'puppet_manifest': '/etc/puppet/manifests/site.pp',
'puppet_modules': '/etc/puppet/modules',
'timeout': 3600,
},
'priority': 100,
'type': 'puppet',
'uids': [self.cluster.nodes[0].uid],
}])
self.assertItemsEqual(serialized_data[0]['tasks'], [
{
'parameters': {
'puppet_modules': '/etc/puppet/modules',
'puppet_manifest': '/etc/puppet/modules/osnailyfacter/'
'modular/netconfig/netconfig.pp',
'timeout': 3600,
'cwd': '/'},
'priority': 100,
'type': 'puppet',
'uids': [self.cluster.nodes[0].uid],
}, {
'parameters': {
'cwd': '/',
'puppet_manifest': '/etc/puppet/manifests/site.pp',
'puppet_modules': '/etc/puppet/modules',
'timeout': 3600},
'priority': 200,
'type': 'puppet',
'uids': [self.cluster.nodes[0].uid],
}, {
'parameters': {
'puppet_modules': '/etc/puppet/modules',
'puppet_manifest': '/etc/puppet/modules/osnailyfacter/'
'modular/globals/globals.pp',
'timeout': 3600,
'cwd': '/'},
'priority': 300,
'type': 'puppet',
'uids': [self.cluster.nodes[0].uid],
}])
class TestNetworkTemplateSerializer70(BaseDeploymentSerializer,


@@ -18,6 +18,8 @@ from copy import deepcopy
import mock
import six
import nailgun
from nailgun import consts
from nailgun.db.sqlalchemy import models
from nailgun import objects
@@ -112,6 +114,74 @@ class TestNetworkTemplateSerializer80(
self._check_baremetal_neutron_attrs(self.cluster)
class TestDeploymentTasksSerialization80(
TestSerializer80Mixin,
BaseDeploymentSerializer
):
manifests_to_rerun = set([
"/etc/puppet/modules/osnailyfacter/modular/globals/globals.pp",
"/etc/puppet/modules/osnailyfacter/modular/netconfig/netconfig.pp"])
def setUp(self):
super(TestDeploymentTasksSerialization80, self).setUp()
cluster = self.env.create(
release_kwargs={'version': self.env_version},
cluster_kwargs={
'mode': consts.CLUSTER_MODES.ha_compact,
'net_provider': consts.CLUSTER_NET_PROVIDERS.neutron,
'net_segment_type': consts.NEUTRON_SEGMENT_TYPES.vlan,
'status': consts.CLUSTER_STATUSES.operational},
nodes_kwargs=[
{'roles': ['controller'],
'status': consts.NODE_STATUSES.ready}]
)
self.cluster = self.db.query(models.Cluster).get(cluster['id'])
def add_node(self, role):
return self.env.create_node(
cluster_id=self.cluster.id,
pending_roles=[role],
pending_addition=True
)
def get_deployment_info(self):
self.env.launch_deployment()
args, kwargs = nailgun.task.manager.rpc.cast.call_args
return args[1][1]['args']['deployment_info']
@mock.patch('nailgun.rpc.cast')
def test_add_compute(self, _):
new_node = self.add_node('compute')
rpc_deploy_message = self.get_deployment_info()
for node in rpc_deploy_message:
tasks_for_node = set(t['parameters']['puppet_manifest']
for t in node['tasks'])
if node['tasks'][0]['uids'] == [str(new_node.id)]:
# all tasks are run on a new node
self.assertTrue(
self.manifests_to_rerun.issubset(tasks_for_node))
else:
# only selected tasks are run on a deployed node
self.assertEqual(self.manifests_to_rerun, tasks_for_node)
@mock.patch('nailgun.rpc.cast')
def test_add_controller(self, _):
self.add_node('controller')
rpc_deploy_message = self.get_deployment_info()
for node in rpc_deploy_message:
tasks_for_node = set(t['parameters']['puppet_manifest']
for t in node['tasks'])
# a controller is redeployed when another controller is added,
# so all tasks are run on all nodes
self.assertTrue(
self.manifests_to_rerun.issubset(tasks_for_node))
class TestDeploymentAttributesSerialization80(
TestSerializer80Mixin,
BaseDeploymentSerializer


@@ -240,9 +240,14 @@ class TestTaskManagers(BaseIntegrationTest):
objects.Cluster.prepare_for_deployment(self.env.clusters[0])
self.env.launch_deployment()
args, kwargs = nailgun.task.manager.rpc.cast.call_args_list[1]
self.assertEqual(args[1][0]['method'], 'execute_tasks')
self.assertEqual(args[1][0]['respond_to'], 'deploy_resp')
args, _ = nailgun.task.manager.rpc.cast.call_args_list[1]
for message in args[1]:
if message['method'] == 'execute_tasks':
self.assertEqual(message['respond_to'], 'deploy_resp')
execute_tasks = message
break
else:
self.fail("'execute_tasks' method not found")
def is_upload_nodes(task):
return 'nodes.yaml' in task['parameters'].get('path', '')
@@ -250,11 +255,11 @@ class TestTaskManagers(BaseIntegrationTest):
def is_update_hosts(task):
return 'hosts.pp' in task['parameters'].get('puppet_manifest', '')
tasks = args[1][0]['args']['tasks']
tasks = execute_tasks['args']['tasks']
self.assertIsNotNone(next((
t for t in tasks if is_upload_nodes(t)), None))
self.assertIsNotNone(next((
t for t in tasks if is_upload_nodes(t)), None))
t for t in tasks if is_update_hosts(t)), None))
@fake_tasks()
def test_do_not_redeploy_nodes_in_ready_status(self):