From 465460cf93f040c0d758eeeee72c22b071fc3e5f Mon Sep 17 00:00:00 2001 From: Bulat Gaifullin Date: Fri, 12 Feb 2016 23:46:27 +0300 Subject: [PATCH] Added support 'reexecute_on' flag for task deploy The flag 'reexecute_on' uses to execute some tasks on modification of operational cluster. Closes-Bug: 1546181 Implements: blueprint enable-task-based-deployment Change-Id: Idf8af662b22dec48042da49458eab3ea3e315c7d --- nailgun/nailgun/objects/cluster.py | 8 +- .../orchestrator/plugins_serializers.py | 4 +- .../orchestrator/task_based_deployment.py | 121 ++++++++----- nailgun/nailgun/task/manager.py | 74 +++----- nailgun/nailgun/task/task.py | 53 +++++- .../test/integration/test_task_deploy.py | 45 ++++- .../test/unit/test_task_based_deployment.py | 168 +++++++++++++----- 7 files changed, 314 insertions(+), 159 deletions(-) diff --git a/nailgun/nailgun/objects/cluster.py b/nailgun/nailgun/objects/cluster.py index d2d355f7c7..931c450f9e 100644 --- a/nailgun/nailgun/objects/cluster.py +++ b/nailgun/nailgun/objects/cluster.py @@ -819,17 +819,21 @@ class Cluster(NailgunObject): return nodes @classmethod - def get_nodes_by_status(cls, instance, status): + def get_nodes_by_status(cls, instance, status, exclude=None): """Get cluster nodes with particular status :param instance: cluster instance :param status: node status + :param exclude: the list of uids to exclude :return: filtered query on nodes """ - return db().query(models.Node).filter_by( + query = db().query(models.Node).filter_by( cluster_id=instance.id, status=status ) + if exclude: + query = query.filter(sa.not_(models.Node.id.in_(exclude))) + return query @classmethod def get_primary_node(cls, instance, role_name): diff --git a/nailgun/nailgun/orchestrator/plugins_serializers.py b/nailgun/nailgun/orchestrator/plugins_serializers.py index 88959711bd..021f249643 100644 --- a/nailgun/nailgun/orchestrator/plugins_serializers.py +++ b/nailgun/nailgun/orchestrator/plugins_serializers.py @@ -22,10 +22,12 @@ from nailgun import consts from nailgun.errors import errors from nailgun.logger import logger import nailgun.orchestrator.tasks_templates as templates -from nailgun.plugins.manager import PluginManager from nailgun.settings import settings from nailgun.utils.role_resolver import RoleResolver +# TODO(bgaifullin) HUCK to prevent cycle imports +from nailgun.plugins.manager import PluginManager + class BasePluginDeploymentHooksSerializer(object): # TODO(dshulyak) refactor it to be consistent with task_serializer diff --git a/nailgun/nailgun/orchestrator/task_based_deployment.py b/nailgun/nailgun/orchestrator/task_based_deployment.py index 6102473229..d09c851584 100644 --- a/nailgun/nailgun/orchestrator/task_based_deployment.py +++ b/nailgun/nailgun/orchestrator/task_based_deployment.py @@ -15,6 +15,7 @@ # under the License. import collections +import copy from distutils.version import StrictVersion import itertools @@ -371,38 +372,73 @@ class TaskProcessor(object): return chain_name + "_end" +class TaskEvents(object): + def __init__(self, channel, events): + """Initialises. + + :param channel: the channel name + :param events: the list of events, those have been occurred + """ + + self.channel = channel + self.events = frozenset(events) + + def check_subscription(self, task): + """Checks tasks subscription on events. + + :param task: the task description + :return: True if task is subscribed on events otherwise False + """ + subsciptions = task.get(self.channel) + return bool(subsciptions and self.events.intersection(subsciptions)) + + class TasksSerializer(object): """The deploy tasks serializer.""" - def __init__(self, cluster, nodes, task_ids=None): + def __init__(self, cluster, nodes, + affected_nodes=None, task_ids=None, events=None): """Initializes. :param cluster: Cluster instance :param nodes: the sequence of nodes for deploy + :param affected_nodes: the list of nodes, that affected by deployment :param task_ids: Only specified tasks will be executed, If None, all tasks will be executed + :param events: the events (see TaskEvents) """ + if affected_nodes: + self.affected_node_ids = frozenset(n.uid for n in affected_nodes) + self.deployment_nodes = copy.copy(nodes) + self.deployment_nodes.extend(affected_nodes) + else: + self.deployment_nodes = nodes + self.affected_node_ids = frozenset() self.cluster = cluster - self.role_resolver = RoleResolver(nodes) + self.role_resolver = RoleResolver(self.deployment_nodes) self.task_serializer = DeployTaskSerializer() self.task_processor = TaskProcessor() self.tasks_connections = collections.defaultdict(dict) self.tasks_dictionary = dict() self.task_filter = self.make_task_filter(task_ids) + self.events = events @classmethod - def serialize(cls, cluster, nodes, tasks, task_ids=None): + def serialize(cls, cluster, nodes, tasks, + affected_nodes=None, task_ids=None, events=None): """Resolves roles and dependencies for tasks. :param cluster: the cluster instance :param nodes: the list of nodes + :param affected_nodes: the list of nodes, that affected by deployment :param tasks: the list of tasks :param task_ids: Only specified tasks will be executed, If None, all tasks will be executed - :return: the tasks dictionary, the tasks connections + :param events: the events (see TaskEvents) + :return: the list of serialized task per node """ - serializer = cls(cluster, nodes, task_ids) - serializer.resolve_nodes(add_plugin_deployment_hooks(tasks), nodes) + serializer = cls(cluster, nodes, affected_nodes, task_ids, events) + serializer.resolve_nodes(add_plugin_deployment_hooks(tasks)) serializer.resolve_dependencies() tasks_dictionary = serializer.tasks_dictionary tasks_connections = serializer.tasks_connections @@ -412,11 +448,10 @@ class TasksSerializer(object): ) return tasks_dictionary, tasks_connections - def resolve_nodes(self, tasks, nodes): + def resolve_nodes(self, tasks): """Resolves node roles in tasks. :param tasks: the deployment tasks - :param nodes: the list of nodes to deploy :return the mapping tasks per node """ @@ -428,21 +463,18 @@ class TasksSerializer(object): groups.append(task) else: tasks_mapping[task['id']] = task - self.process_task( - task, nodes, lambda _: self.role_resolver, - skip=not self.task_filter(task['id']) - ) + skip = not self.task_filter(task['id']) + self.process_task(task, self.role_resolver, skip) self.expand_task_groups(groups, tasks_mapping) # make sure that null node is present self.tasks_connections.setdefault(None, dict()) - def process_task(self, task, nodes, resolver_factory, skip=False): + def process_task(self, task, role_resolver, skip=False): """Processes one task one nodes of cluster. :param task: the task instance - :param nodes: the list of nodes - :param resolver_factory: the factory creates role-resolver + :param role_resolver: the role resolver :param skip: make the task as skipped """ @@ -450,13 +482,15 @@ class TasksSerializer(object): task ) task_serializer = serializer_factory( - task, self.cluster, nodes, role_resolver=resolver_factory(nodes) + task, self.cluster, self.deployment_nodes, + role_resolver=role_resolver ) - # do not pass skipped attribute to astute - skipped = skip or task.pop('skipped', False) or \ - not task_serializer.should_execute() - for serialized in self.task_processor.process_tasks( - task, task_serializer.serialize()): + skipped = skip or not task_serializer.should_execute() + force = self.events and self.events.check_subscription(task) + serialised_tasks = self.task_processor.process_tasks( + task, task_serializer.serialize() + ) + for serialized in serialised_tasks: # all skipped task shall have type skipped # do not exclude them from graph to keep connections between nodes @@ -476,12 +510,16 @@ class TasksSerializer(object): node_ids = serialized.pop('uids', ()) self.tasks_dictionary[serialized['id']] = serialized for node_id in node_ids: + node_task = task_relations.copy() + if not force and node_id in self.affected_node_ids: + node_task['type'] = consts.ORCHESTRATOR_TASK_TYPES.skipped + node_tasks = self.tasks_connections[node_id] # de-duplication the tasks on node # since task can be added after expand group need to # overwrite if existed task is skipped and new is not skipped. - if self.need_update_task(node_tasks, serialized): - node_tasks[serialized['id']] = task_relations.copy() + if self.need_update_task(node_tasks, node_task): + node_tasks[serialized['id']] = node_task def resolve_dependencies(self): """Resolves tasks dependencies.""" @@ -489,18 +527,22 @@ class TasksSerializer(object): for node_id, tasks in six.iteritems(self.tasks_connections): for task in six.itervalues(tasks): requires = set(self.expand_dependencies( - node_id, task.pop('requires'), False + node_id, task.pop('requires'), + self.task_processor.get_last_task_id )) requires.update(self.expand_cross_dependencies( - node_id, task.pop('cross-depends', None), False + node_id, task.pop('cross-depends', None), + self.task_processor.get_last_task_id )) requires.update(task.pop('requires_ex', ())) required_for = set(self.expand_dependencies( - node_id, task.pop('required_for'), True + node_id, task.pop('required_for'), + self.task_processor.get_first_task_id )) required_for.update(self.expand_cross_dependencies( - node_id, task.pop('cross-depended-by', None), True + node_id, task.pop('cross-depended-by', None), + self.task_processor.get_first_task_id )) required_for.update(task.pop('required_for_ex', ())) # render @@ -535,16 +577,16 @@ class TasksSerializer(object): # if group is not excluded, all task should be run as well # otherwise check each task individually self.process_task( - sub_task, node_ids, NullResolver, + sub_task, NullResolver(node_ids), skip=skipped and not self.task_filter(sub_task_id) ) - def expand_dependencies(self, node_id, dependencies, is_required_for): + def expand_dependencies(self, node_id, dependencies, task_resolver): """Expands task dependencies on same node. :param node_id: the ID of target node :param dependencies: the list of dependencies on same node - :param is_required_for: means task from required_for section + :param task_resolver: the task name resolver """ if not dependencies: return @@ -552,16 +594,15 @@ class TasksSerializer(object): # need to search dependencies on node and in sync points node_ids = [node_id, None] for name in dependencies: - for rel in self.resolve_relation(name, node_ids, is_required_for): + for rel in self.resolve_relation(name, node_ids, task_resolver): yield rel - def expand_cross_dependencies( - self, node_id, dependencies, is_required_for): + def expand_cross_dependencies(self, node_id, dependencies, task_resolver): """Expands task dependencies on same node. :param node_id: the ID of target node :param dependencies: the list of cross-node dependencies - :param is_required_for: means task from required_for section + :param task_resolver: the task name resolver """ if not dependencies: return @@ -576,17 +617,17 @@ class TasksSerializer(object): roles, dep.get('policy', consts.NODE_RESOLVE_POLICY.all) ) relations = self.resolve_relation( - dep['name'], node_ids, is_required_for + dep['name'], node_ids, task_resolver ) for rel in relations: yield rel - def resolve_relation(self, name, node_ids, is_required_for): + def resolve_relation(self, name, node_ids, task_resolver): """Resolves the task relation. :param name: the name of task :param node_ids: the ID of nodes where need to search - :param is_required_for: means task from required_for section + :param task_resolver: the task name resolver """ match_policy = NameMatchingPolicy.create(name) for node_id in node_ids: @@ -607,11 +648,7 @@ class TasksSerializer(object): applied_tasks.add(original_task) if original_task is not task_name: - if is_required_for: - task_name_gen = self.task_processor.get_first_task_id - else: - task_name_gen = self.task_processor.get_last_task_id - task_name = task_name_gen(original_task) + task_name = task_resolver(original_task) yield task_name, node_id diff --git a/nailgun/nailgun/task/manager.py b/nailgun/nailgun/task/manager.py index f4a75076a6..882aac5a76 100644 --- a/nailgun/nailgun/task/manager.py +++ b/nailgun/nailgun/task/manager.py @@ -250,7 +250,7 @@ class ApplyChangesTaskManager(TaskManager, DeploymentCheckMixin): """ nodes_to_delete = [] - nodes_to_resetup = [] + affected_nodes = [] if nodes_to_provision_deploy: nodes_to_deploy = objects.NodeCollection.get_by_ids( @@ -282,13 +282,14 @@ class ApplyChangesTaskManager(TaskManager, DeploymentCheckMixin): if self.cluster.status == consts.CLUSTER_STATUSES.operational: # rerun particular tasks on all deployed nodes - affected_nodes = set( - nodes_to_deploy + nodes_to_provision + nodes_to_delete) - ready_nodes = set(objects.Cluster.get_nodes_by_status( + modified_node_ids = {n.id for n in nodes_to_deploy} + modified_node_ids.update(n.id for n in nodes_to_provision) + modified_node_ids.update(n.id for n in nodes_to_delete) + affected_nodes = objects.Cluster.get_nodes_by_status( self.cluster, - consts.NODE_STATUSES.ready - )) - nodes_to_resetup = ready_nodes.difference(affected_nodes) + status=consts.NODE_STATUSES.ready, + exclude=modified_node_ids + ).all() task_deletion, task_provision, task_deployment = None, None, None @@ -332,10 +333,16 @@ class ApplyChangesTaskManager(TaskManager, DeploymentCheckMixin): task_messages.append(provision_message) deployment_message = None - if nodes_to_deploy: - logger.debug("There are nodes to deploy: %s", - " ".join([objects.Node.get_node_fqdn(n) - for n in nodes_to_deploy])) + if nodes_to_deploy or affected_nodes: + if nodes_to_deploy: + logger.debug("There are nodes to deploy: %s", + " ".join((objects.Node.get_node_fqdn(n) + for n in nodes_to_deploy))) + if affected_nodes: + logger.debug("There are nodes affected by deployment: %s", + " ".join((objects.Node.get_node_fqdn(n) + for n in affected_nodes))) + task_deployment = supertask.create_subtask( name=consts.TASK_NAMES.deployment, status=consts.TASK_STATUSES.pending @@ -347,8 +354,10 @@ class ApplyChangesTaskManager(TaskManager, DeploymentCheckMixin): task_deployment, tasks.DeploymentTask, nodes_to_deploy, + affected_nodes=affected_nodes, deployment_tasks=deployment_tasks, - method_name='message' + method_name='message', + reexecutable_filter=consts.TASKS_TO_RERUN_ON_DEPLOY_CHANGES ) db().commit() @@ -365,47 +374,6 @@ class ApplyChangesTaskManager(TaskManager, DeploymentCheckMixin): task_deployment.cache = deployment_message db().commit() - if nodes_to_resetup: - logger.debug("There are nodes to resetup: %s", - ", ".join([objects.Node.get_node_fqdn(n) - for n in nodes_to_resetup])) - - if not deployment_message: - task_deployment = supertask.create_subtask( - name=consts.TASK_NAMES.deployment, - status=consts.TASK_STATUSES.pending - ) - # we should have task committed for processing in other threads - db().commit() - - resetup_message = self._call_silently( - task_deployment, - tasks.DeploymentTask, - nodes_to_resetup, - reexecutable_filter=consts.TASKS_TO_RERUN_ON_DEPLOY_CHANGES, - method_name='message' - ) - db().commit() - - task_deployment = objects.Task.get_by_uid( - task_deployment.id, - fail_if_not_found=True, - lock_for_update=True - ) - # if failed to generate task message for orchestrator - # then task is already set to error - if task_deployment.status == consts.TASK_STATUSES.error: - return - - if deployment_message: - deployment_message['args']['deployment_info'].extend( - resetup_message['args']['deployment_info'] - ) - else: - deployment_message = resetup_message - task_deployment.cache = deployment_message - db().commit() - if deployment_message: task_messages.append(deployment_message) diff --git a/nailgun/nailgun/task/task.py b/nailgun/nailgun/task/task.py index 98824ba04c..e36def52cb 100644 --- a/nailgun/nailgun/task/task.py +++ b/nailgun/nailgun/task/task.py @@ -154,8 +154,18 @@ class DeploymentTask(object): return 'deploy' @classmethod - def message(cls, task, nodes, deployment_tasks=None, + def message(cls, task, nodes, affected_nodes=None, deployment_tasks=None, reexecutable_filter=None): + """Builds RPC message for deployment task. + + :param task: the database task object instance + :param nodes: the nodes for deployment + :param affected_nodes: the list of nodes is affected by deployment + :deployment_tasks: the list of tasks_ids to execute, + if None, all tasks will be executed + :reexecutable_filter: the list of events to find subscribed tasks + :return: the RPC message + """ logger.debug("DeploymentTask.message(task=%s)" % task.uuid) task_ids = deployment_tasks or [] @@ -178,7 +188,7 @@ class DeploymentTask(object): while True: try: message = getattr(cls, deployment_mode)( - task, nodes, task_ids, reexecutable_filter + task, nodes, affected_nodes, task_ids, reexecutable_filter ) break except errors.TaskBaseDeploymentNotAllowed: @@ -201,10 +211,19 @@ class DeploymentTask(object): return rpc_message @classmethod - def granular_deploy(cls, task, nodes, task_ids, reexecutable_filter): + def granular_deploy(cls, task, nodes, affected_nodes, task_ids, events): + """Builds parameters for granular deployment. + + :param task: the database task object instance + :param nodes: the nodes for deployment + :param affected_nodes: the list of nodes is affected by deployment + :task_ids: the list of tasks_ids to execute, + if None, all tasks will be executed + :events: the list of events to find subscribed tasks + :return: the arguments for RPC message + """ orchestrator_graph = deployment_graph.AstuteGraph(task.cluster) orchestrator_graph.only_tasks(task_ids) - orchestrator_graph.reexecutable_tasks(reexecutable_filter) # NOTE(dshulyak) At this point parts of the orchestration can be empty, # it should not cause any issues with deployment/progress and was @@ -212,6 +231,13 @@ class DeploymentTask(object): role_resolver = RoleResolver(nodes) serialized_cluster = deployment_serializers.serialize( orchestrator_graph, task.cluster, nodes) + + if affected_nodes: + orchestrator_graph.reexecutable_tasks(events) + serialized_cluster.extend(deployment_serializers.serialize( + orchestrator_graph, task.cluster, affected_nodes + )) + nodes = nodes + affected_nodes pre_deployment = stages.pre_deployment_serialize( orchestrator_graph, task.cluster, nodes, role_resolver=role_resolver) @@ -228,16 +254,31 @@ class DeploymentTask(object): deploy = granular_deploy @classmethod - def task_deploy(cls, task, nodes, task_ids, reexecutable_filter): + def task_deploy(cls, task, nodes, affected_nodes, task_ids, events): + """Builds parameters for task based deployment. + + :param task: the database task object instance + :param nodes: the nodes for deployment + :param affected_nodes: the list of nodes is affected by deployment + :task_ids: the list of tasks_ids to execute, + if None, all tasks will be executed + :events: the list of events to find subscribed tasks + :return: the arguments for RPC message + """ + deployment_tasks = objects.Cluster.get_deployment_tasks(task.cluster) logger.debug("start cluster serialization.") serialized_cluster = deployment_serializers.serialize( None, task.cluster, nodes ) logger.debug("finish cluster serialization.") + tasks_events = events and \ + task_based_deployment.TaskEvents('reexecute_on', events) + logger.debug("start tasks serialization.") directory, graph = task_based_deployment.TasksSerializer.serialize( - task.cluster, nodes, deployment_tasks, task_ids + task.cluster, nodes, deployment_tasks, affected_nodes, + task_ids, tasks_events ) logger.debug("finish tasks serialization.") return { diff --git a/nailgun/nailgun/test/integration/test_task_deploy.py b/nailgun/nailgun/test/integration/test_task_deploy.py index befc35fd1d..32ab6b892a 100644 --- a/nailgun/nailgun/test/integration/test_task_deploy.py +++ b/nailgun/nailgun/test/integration/test_task_deploy.py @@ -69,10 +69,9 @@ class TestTaskDeploy(BaseIntegrationTest): ) self.db.flush() - @fake_tasks(mock_rpc=False, fake_rpc=False) - @mock.patch('nailgun.rpc.cast') + @fake_tasks(mock_rpc=True, fake_rpc=False) def get_deploy_message(self, rpc_cast): - task = self.env.launch_deployment() + task = self.env.launch_deployment(self.cluster.id) self.assertNotEqual(consts.TASK_STATUSES.error, task.status) args, kwargs = rpc_cast.call_args return args[1][1] @@ -141,9 +140,8 @@ class TestTaskDeploy(BaseIntegrationTest): .format(sorted(expected_tasks)) ) - @fake_tasks(mock_rpc=False, fake_rpc=False) @mock.patch.object(TaskProcessor, "ensure_task_based_deploy_allowed") - @mock.patch('nailgun.rpc.cast') + @fake_tasks(mock_rpc=True, fake_rpc=False) def test_task_deploy_specified_tasks(self, rpc_cast, *_): compute = next( (x for x in self.env.nodes if 'compute' in x.roles), None @@ -175,10 +173,8 @@ class TestTaskDeploy(BaseIntegrationTest): if task['type'] != consts.ORCHESTRATOR_TASK_TYPES.skipped) ) - @fake_tasks(mock_rpc=False, fake_rpc=False) @mock.patch.object(TaskProcessor, "ensure_task_based_deploy_allowed") - @mock.patch('nailgun.rpc.cast') - def test_task_deploy_all_tasks(self, rpc_cast, *_): + def test_task_deploy_all_tasks(self, *_): compute = next( (x for x in self.env.nodes if 'compute' in x.roles), None ) @@ -194,3 +190,36 @@ class TestTaskDeploy(BaseIntegrationTest): {task["id"] for task in deploy_tasks[compute.uid] if task['type'] != consts.ORCHESTRATOR_TASK_TYPES.skipped} ) + + def check_reexecute_task_on_cluster_update(self): + node = next( + (n for n in self.env.nodes + if n.status == consts.NODE_STATUSES.ready), + None + ) + self.assertIsNotNone(node) + message = self.get_deploy_message() + deploy_tasks = message['args']['tasks_graph'] + # netconfig has attribute reexecute_on + self.assertIn( + "netconfig", + {task["id"] for task in deploy_tasks[node.uid] + if task['type'] != consts.ORCHESTRATOR_TASK_TYPES.skipped} + ) + self.db().refresh(self.cluster) + + @mock.patch.object(TaskProcessor, "ensure_task_based_deploy_allowed") + @fake_tasks(mock_rpc=True, fake_rpc=True, + override_state={'status': consts.NODE_STATUSES.ready}) + def test_task_executed_on_adding_node(self, *_): + self.env.wait_ready(self.env.launch_deployment(self.cluster.id)) + self.db().refresh(self.cluster) + self.assertEqual( + consts.CLUSTER_STATUSES.operational, self.cluster.status + ) + self.env.create_node( + api=False, cluster_id=self.cluster.id, + roles=["compute"], + pending_addition=True + ) + self.check_reexecute_task_on_cluster_update() diff --git a/nailgun/nailgun/test/unit/test_task_based_deployment.py b/nailgun/nailgun/test/unit/test_task_based_deployment.py index 699d0fe339..2cc6f769d7 100644 --- a/nailgun/nailgun/test/unit/test_task_based_deployment.py +++ b/nailgun/nailgun/test/unit/test_task_based_deployment.py @@ -128,7 +128,7 @@ class TestTaskSerializers(BaseTestCase): }, ] serialized = self.serializer.serialize( - self.env.clusters[-1], self.env.nodes, tasks, ids + self.env.clusters[-1], self.env.nodes, tasks, task_ids=ids )[1] controllers = [ n.uid for n in self.env.nodes if "controller" in n.roles @@ -183,49 +183,36 @@ class TestTaskSerializers(BaseTestCase): "id": "test", "type": "puppet", "parameters": {}, "version": "2.0.0" } + node_id = self.env.nodes[-1].uid self.serializer.process_task( - task, ["1"], task_based_deployment.NullResolver + task, task_based_deployment.NullResolver([node_id]) ) # check de-duplication self.serializer.process_task( - task, ["1"], task_based_deployment.NullResolver + task, task_based_deployment.NullResolver([node_id]) ) - self.assertItemsEqual(["1"], self.serializer.tasks_connections) - self.assertItemsEqual(["test"], self.serializer.tasks_connections["1"]) - self.assertEqual( - "test", self.serializer.tasks_connections["1"]["test"]["id"] + self.assertItemsEqual([node_id], self.serializer.tasks_connections) + self.assertItemsEqual( + ["test"], + self.serializer.tasks_connections[node_id] ) self.assertEqual( - "puppet", self.serializer.tasks_connections["1"]["test"]["type"] + "test", + self.serializer.tasks_connections[node_id]["test"]["id"] + ) + self.assertEqual( + "puppet", + self.serializer.tasks_connections[node_id]["test"]["type"] ) self.assertNotIn( - "skipped", self.serializer.tasks_connections["1"]["test"]["type"] - ) - - def test_process_skipped_task(self): - task = { - "id": "test", "type": "puppet", "version": "2.0.0", - "parameters": {}, 'skipped': True, - } - self.serializer.process_task( - task, ["1"], task_based_deployment.NullResolver - ) - self.assertItemsEqual(["1"], self.serializer.tasks_connections) - self.assertItemsEqual(["test"], self.serializer.tasks_connections["1"]) - self.assertEqual( - "test", self.serializer.tasks_connections["1"]["test"]["id"] - ) - self.assertEqual( - "skipped", self.serializer.tasks_connections["1"]["test"]["type"] - ) - self.assertNotIn( - "skipped", self.serializer.tasks_connections["1"]["test"] + "skipped", + self.serializer.tasks_connections[node_id]["test"]["type"] ) def test_process_noop_task(self): task = {"id": "test", "type": "stage", "role": "*"} self.serializer.process_task( - task, ["1"], task_based_deployment.NullResolver + task, task_based_deployment.NullResolver(["1"]) ) self.assertItemsEqual(["1"], self.serializer.tasks_connections) self.assertItemsEqual(["test"], self.serializer.tasks_connections["1"]) @@ -351,27 +338,43 @@ class TestTaskSerializers(BaseTestCase): self.serializer.resolve_relation('/task/', node_ids, True) ) - def test_resolve_relation_in_chain(self): - node_ids = ['1', '2', '3'] - self.serializer.tasks_connections = dict( - (node_id, ['task_{0}'.format(node_id)]) - for node_id in node_ids - ) - self.serializer.task_processor.origin_task_ids = { - 'task_1': 'task', 'task_2': 'task', 'task_3': 'task2' + def test_resolve_relations(self): + node_ids = ['1', '2'] + task_params = { + "requires": ["task_2"], + "required_for": ["task"], + "cross-depends": [{"role": "/.*/", "name": "task"}], + "cross-depended-by": [{"role": "/.*/", "name": "task_2"}] } - self.serializer.tasks_connections['1'].append('task_2') + self.serializer.tasks_connections = { + node_id: { + "task_1": task_params.copy(), + "task_2": task_params.copy() + } for node_id in node_ids + } + self.serializer.tasks_connections[None] = {} + self.serializer.task_processor.origin_task_ids = { + 'task_1': 'task' + } + self.serializer.role_resolver = task_based_deployment.NullResolver( + node_ids + ) + self.serializer.resolve_dependencies() self.assertItemsEqual( - [('task_start', '1'), ('task_start', '2')], - self.serializer.resolve_relation('task', node_ids, True) + [ + {'node_id': '1', 'name': 'task_2'}, + {'node_id': '1', 'name': 'task_start'}, + {'node_id': '2', 'name': 'task_2'} + ], + self.serializer.tasks_connections["1"]["task_1"]["required_for"] ) self.assertItemsEqual( - [('task_end', '1'), ('task_end', '2')], - self.serializer.resolve_relation('task', node_ids, False) - ) - self.assertItemsEqual( - [('task_1', '1')], - self.serializer.resolve_relation('task_1', node_ids, False) + [ + {'node_id': '1', 'name': 'task_2'}, + {'node_id': '1', 'name': 'task_end'}, + {'node_id': '2', 'name': 'task_end'} + ], + self.serializer.tasks_connections["1"]["task_1"]["requires"] ) def test_need_update_task(self): @@ -390,6 +393,77 @@ class TestTaskSerializers(BaseTestCase): {"task1": {"type": "puppet"}}, {"id": "task1", "type": "skipped"} )) + def test_deploy_only_selected_nodes(self): + tasks = [ + { + "id": "test1", "role": ["controller"], + "type": "puppet", "version": "2.0.0", "parameters": {} + }, + { + "id": "test2", "role": ["compute"], + "type": "puppet", "version": "2.0.0", "parameters": {} + } + ] + controllers = [ + n for n in self.env.nodes if "controller" in n.roles + ] + serialized = self.serializer.serialize( + self.env.clusters[-1], controllers, tasks + )[1] + # serialised contains also master node + self.assertItemsEqual( + [n.uid for n in controllers] + [None], + serialized + ) + self.assertItemsEqual( + [("test1", "puppet")], + ((x["id"], x["type"]) for x in serialized[controllers[0].uid]) + ) + + def test_serialise_with_events(self): + tasks = [ + { + "id": "test1", "role": ["controller"], + "type": "puppet", "version": "2.0.0", "parameters": {} + }, + { + "id": "test2", "role": ["compute"], + "type": "puppet", "version": "2.0.0", "parameters": {}, + "reexecute_on": ["deploy"] + }, + { + "id": "test3", "role": ["compute"], + "type": "puppet", "version": "2.0.0", "parameters": {} + }, + { + "id": "test4", "role": ["cinder"], + "type": "puppet", "version": "2.0.0", "parameters": {} + } + ] + controllers = [ + n for n in self.env.nodes if "controller" in n.roles + ] + computes = [ + n for n in self.env.nodes if "compute" in n.roles + ] + events = task_based_deployment.TaskEvents('reexecute_on', {'deploy'}) + serialized = task_based_deployment.TasksSerializer.serialize( + self.env.clusters[-1], controllers, tasks, computes, events=events + )[1] + # serialised contains also master node + self.assertItemsEqual( + [n.uid for n in (controllers + computes)] + [None], + serialized + ) + self.assertItemsEqual( + [("test1", "puppet")], + ((x["id"], x["type"]) for x in serialized[controllers[0].uid]) + ) + self.assertItemsEqual( + [("test2", "puppet"), ("test3", "skipped")], + ((x["id"], x["type"]) for x in serialized[computes[0].uid]) + ) + class TestNoopSerializer(BaseTestCase): def setUp(self):