Merge "Moved parameters of tasks from graph to separate dict"

This commit is contained in:
Jenkins 2016-02-29 20:43:44 +00:00 committed by Gerrit Code Review
commit 81aa3d727b
4 changed files with 150 additions and 112 deletions

View File

@ -15,7 +15,6 @@
# under the License.
import collections
import copy
from distutils.version import StrictVersion
import itertools
@ -356,7 +355,7 @@ class TaskProcessor(object):
requires_ex = current.setdefault('requires_ex', [])
for node_id in previous.get('uids', ()):
requires_ex.append(
{'name': previous['id'], 'node_id': node_id}
(previous['id'], node_id)
)
@staticmethod
@ -387,7 +386,8 @@ class TasksSerializer(object):
self.role_resolver = RoleResolver(nodes)
self.task_serializer = DeployTaskSerializer()
self.task_processor = TaskProcessor()
self.tasks_per_node = collections.defaultdict(dict)
self.tasks_connections = collections.defaultdict(dict)
self.tasks_dictionary = dict()
self.task_filter = self.make_task_filter(task_ids)
@classmethod
@ -399,15 +399,18 @@ class TasksSerializer(object):
:param tasks: the list of tasks
:param task_ids: Only specified tasks will be executed,
If None, all tasks will be executed
:return: the list of serialized task per node
:return: the tasks dictionary, the tasks connections
"""
serializer = cls(cluster, nodes, task_ids)
serializer.resolve_nodes(add_plugin_deployment_hooks(tasks), nodes)
serializer.resolve_dependencies()
return dict(
(k, list(six.itervalues(v)))
for k, v in six.iteritems(serializer.tasks_per_node)
)
tasks_dictionary = serializer.tasks_dictionary
tasks_connections = serializer.tasks_connections
for node_id in tasks_connections:
tasks_connections[node_id] = list(
six.itervalues(tasks_connections[node_id])
)
return tasks_dictionary, tasks_connections
def resolve_nodes(self, tasks, nodes):
"""Resolves node roles in tasks.
@ -432,7 +435,7 @@ class TasksSerializer(object):
self.expand_task_groups(groups, tasks_mapping)
# make sure that null node is present
self.tasks_per_node.setdefault(None, dict())
self.tasks_connections.setdefault(None, dict())
def process_task(self, task, nodes, resolver_factory, skip=False):
"""Processes one task one nodes of cluster.
@ -452,50 +455,65 @@ class TasksSerializer(object):
# do not pass skipped attribute to astute
skipped = skip or task.pop('skipped', False) or \
not task_serializer.should_execute()
for astute_task in self.task_processor.process_tasks(
for serialized in self.task_processor.process_tasks(
task, task_serializer.serialize()):
# all skipped task shall have type skipped
# do not exclude them from graph to keep connections between nodes
if skipped:
astute_task['type'] = \
consts.ORCHESTRATOR_TASK_TYPES.skipped
for node_id in astute_task.pop('uids', ()):
node_tasks = self.tasks_per_node[node_id]
if skipped:
task_type = consts.ORCHESTRATOR_TASK_TYPES.skipped
else:
task_type = serialized['type']
task_relations = {
'id': serialized['id'],
'type': task_type,
'requires': serialized.pop('requires', []),
'required_for': serialized.pop('required_for', []),
'cross-depends': serialized.pop('cross-depends', []),
'cross-depended-by': serialized.pop('cross-depended-by', []),
}
node_ids = serialized.pop('uids', ())
self.tasks_dictionary[serialized['id']] = serialized
for node_id in node_ids:
node_tasks = self.tasks_connections[node_id]
# de-duplication the tasks on node
# since task can be added after expand group need to
# overwrite if existed task is skipped and new is not skipped.
if self.need_update_task(node_tasks, astute_task):
node_tasks[astute_task['id']] = copy.deepcopy(astute_task)
if self.need_update_task(node_tasks, serialized):
node_tasks[serialized['id']] = task_relations.copy()
def resolve_dependencies(self):
"""Resolves tasks dependencies."""
for node_id, tasks in six.iteritems(self.tasks_per_node):
for node_id, tasks in six.iteritems(self.tasks_connections):
for task in six.itervalues(tasks):
task['requires'] = list(
self.expand_dependencies(
node_id, task.get('requires'), False
)
)
task['required_for'] = list(
self.expand_dependencies(
node_id, task.get('required_for'), True
)
)
task['requires'].extend(
self.expand_cross_dependencies(
node_id, task.pop('cross-depends', None), False
)
)
requires = set(self.expand_dependencies(
node_id, task.pop('requires'), False
))
requires.update(self.expand_cross_dependencies(
node_id, task.pop('cross-depends', None), False
))
requires.update(task.pop('requires_ex', ()))
task['required_for'].extend(
self.expand_cross_dependencies(
node_id, task.pop('cross-depended-by', None), True
)
)
task['requires'].extend(task.pop('requires_ex', ()))
task['required_for'].extend(task.pop('required_for_ex', ()))
required_for = set(self.expand_dependencies(
node_id, task.pop('required_for'), True
))
required_for.update(self.expand_cross_dependencies(
node_id, task.pop('cross-depended-by', None), True
))
required_for.update(task.pop('required_for_ex', ()))
# render
if requires:
task['requires'] = [
dict(six.moves.zip(('name', 'node_id'), r))
for r in requires
]
if required_for:
task['required_for'] = [
dict(six.moves.zip(('name', 'node_id'), r))
for r in required_for
]
def expand_task_groups(self, groups, task_mapping):
"""Expand group of tasks.
@ -573,11 +591,11 @@ class TasksSerializer(object):
match_policy = NameMatchingPolicy.create(name)
for node_id in node_ids:
applied_tasks = set()
for task_name in self.tasks_per_node[node_id]:
for task_name in self.tasks_connections[node_id]:
if task_name == name:
# the simple case when name of current task
# is exact math to name of task that is search
yield {"name": task_name, "node_id": node_id}
yield task_name, node_id
continue
# at first get the original task name, actual
@ -595,7 +613,7 @@ class TasksSerializer(object):
task_name_gen = self.task_processor.get_last_task_id
task_name = task_name_gen(original_task)
yield {"name": task_name, "node_id": node_id}
yield task_name, node_id
@classmethod
def need_update_task(cls, tasks, task):

View File

@ -230,15 +230,21 @@ class DeploymentTask(object):
@classmethod
def task_deploy(cls, task, nodes, task_ids, reexecutable_filter):
deployment_tasks = objects.Cluster.get_deployment_tasks(task.cluster)
logger.debug("start cluster serialization.")
serialized_cluster = deployment_serializers.serialize(
None, task.cluster, nodes
)
serialized_tasks = task_based_deployment.TasksSerializer.serialize(
logger.debug("finish cluster serialization.")
logger.debug("start tasks serialization.")
directory, graph = task_based_deployment.TasksSerializer.serialize(
task.cluster, nodes, deployment_tasks, task_ids
)
logger.debug("finish tasks serialization.")
return {
"deployment_info": serialized_cluster,
"deployment_tasks": serialized_tasks
"tasks_directory": directory,
"tasks_graph": graph
}

View File

@ -82,7 +82,8 @@ class TestTaskDeploy(BaseIntegrationTest):
message = self.get_deploy_message()
self.assertEqual("task_deploy", message["method"])
self.assertItemsEqual(
["task_uuid", "deployment_info", "deployment_tasks"],
["task_uuid", "deployment_info",
"tasks_directory", "tasks_graph"],
message["args"]
)
@ -122,7 +123,7 @@ class TestTaskDeploy(BaseIntegrationTest):
(x.uid for x in self.env.nodes if 'compute' in x.roles), None
)
self.assertIsNotNone(compute_uid)
compute_tasks = message['args']['deployment_tasks'][compute_uid]
compute_tasks = message['args']['tasks_graph'][compute_uid]
expected_tasks = {
consts.PLUGIN_PRE_DEPLOYMENT_HOOK + "_start",
@ -167,10 +168,10 @@ class TestTaskDeploy(BaseIntegrationTest):
).status
)
deploy_tasks = rpc_cast.call_args[0][1]['args']['deployment_tasks']
links = rpc_cast.call_args[0][1]['args']['tasks_graph']
self.assertItemsEqual(
["deploy_legacy"],
(task["id"] for task in deploy_tasks[compute.uid]
(task["id"] for task in links[compute.uid]
if task['type'] != consts.ORCHESTRATOR_TASK_TYPES.skipped)
)
@ -187,7 +188,7 @@ class TestTaskDeploy(BaseIntegrationTest):
self.db.flush()
message = self.get_deploy_message()
deploy_tasks = message['args']['deployment_tasks']
deploy_tasks = message['args']['tasks_graph']
self.assertIn(
"netconfig",
{task["id"] for task in deploy_tasks[compute.uid]

View File

@ -38,18 +38,28 @@ class TestTaskSerializers(BaseTestCase):
self.env.clusters[-1], self.env.nodes
)
def test_serialize(self):
def test_serialize_result(self):
tasks = [
{
"id": "test1", "role": ["controller"],
"type": "stage", "version": "2.0.0"
"id": "task1", "role": ["controller"],
"type": "shell", "version": "2.0.0",
"parameters": {"cmd": "bash -c 'echo 1'"},
"cross-depends": [{"name": "task3", "role": ["compute"]}]
},
{
"id": "test2", "role": ["compute"],
"type": "stage", "version": "2.0.0"
"id": "task2", "role": ["controller"],
"type": "shell", "version": "2.0.0",
"parameters": {"cmd": "bash -c 'echo 1'"},
"requires": ["task1"]
},
{
"id": "task3", "role": ["compute"],
"type": "shell", "version": "2.0.0",
"parameters": {"cmd": "bash -c 'echo 2'"},
"cross-depended-by": [{"name": "task2", "role": "*"}]
},
]
serialized = self.serializer.serialize(
tasks, connections = self.serializer.serialize(
self.env.clusters[-1], self.env.nodes, tasks
)
controllers = [
@ -62,12 +72,27 @@ class TestTaskSerializers(BaseTestCase):
self.assertEqual(1, len(controllers))
self.assertEqual(1, len(computes))
self.assertItemsEqual(
["test1"],
(x["id"] for x in serialized[controllers[0]])
[
{
"id": "task1",
"type": "shell",
"requires": [{"name": "task3", "node_id": computes[0]}]
},
{
"id": "task2",
"type": "shell",
"requires": [{"name": "task1", "node_id": controllers[0]}],
},
],
connections[controllers[0]]
)
self.assertItemsEqual(
["test2"],
(x["id"] for x in serialized[computes[0]])
[{
"id": "task3",
"type": "shell",
"required_for": [{"name": "task2", "node_id": controllers[0]}]
}],
connections[computes[0]]
)
def test_serialize_fail_if_all_task_have_not_version_2(self):
@ -104,7 +129,7 @@ class TestTaskSerializers(BaseTestCase):
]
serialized = self.serializer.serialize(
self.env.clusters[-1], self.env.nodes, tasks, ids
)
)[1]
controllers = [
n.uid for n in self.env.nodes if "controller" in n.roles
]
@ -165,16 +190,16 @@ class TestTaskSerializers(BaseTestCase):
self.serializer.process_task(
task, ["1"], task_based_deployment.NullResolver
)
self.assertItemsEqual(["1"], self.serializer.tasks_per_node)
self.assertItemsEqual(["test"], self.serializer.tasks_per_node["1"])
self.assertItemsEqual(["1"], self.serializer.tasks_connections)
self.assertItemsEqual(["test"], self.serializer.tasks_connections["1"])
self.assertEqual(
"test", self.serializer.tasks_per_node["1"]["test"]["id"]
"test", self.serializer.tasks_connections["1"]["test"]["id"]
)
self.assertEqual(
"puppet", self.serializer.tasks_per_node["1"]["test"]["type"]
"puppet", self.serializer.tasks_connections["1"]["test"]["type"]
)
self.assertNotIn(
"skipped", self.serializer.tasks_per_node["1"]["test"]["type"]
"skipped", self.serializer.tasks_connections["1"]["test"]["type"]
)
def test_process_skipped_task(self):
@ -185,16 +210,16 @@ class TestTaskSerializers(BaseTestCase):
self.serializer.process_task(
task, ["1"], task_based_deployment.NullResolver
)
self.assertItemsEqual(["1"], self.serializer.tasks_per_node)
self.assertItemsEqual(["test"], self.serializer.tasks_per_node["1"])
self.assertItemsEqual(["1"], self.serializer.tasks_connections)
self.assertItemsEqual(["test"], self.serializer.tasks_connections["1"])
self.assertEqual(
"test", self.serializer.tasks_per_node["1"]["test"]["id"]
"test", self.serializer.tasks_connections["1"]["test"]["id"]
)
self.assertEqual(
"skipped", self.serializer.tasks_per_node["1"]["test"]["type"]
"skipped", self.serializer.tasks_connections["1"]["test"]["type"]
)
self.assertNotIn(
"skipped", self.serializer.tasks_per_node["1"]["test"]
"skipped", self.serializer.tasks_connections["1"]["test"]
)
def test_process_noop_task(self):
@ -202,16 +227,16 @@ class TestTaskSerializers(BaseTestCase):
self.serializer.process_task(
task, ["1"], task_based_deployment.NullResolver
)
self.assertItemsEqual(["1"], self.serializer.tasks_per_node)
self.assertItemsEqual(["test"], self.serializer.tasks_per_node["1"])
self.assertItemsEqual(["1"], self.serializer.tasks_connections)
self.assertItemsEqual(["test"], self.serializer.tasks_connections["1"])
self.assertEqual(
"test", self.serializer.tasks_per_node["1"]["test"]["id"]
"test", self.serializer.tasks_connections["1"]["test"]["id"]
)
self.assertEqual(
"skipped", self.serializer.tasks_per_node["1"]["test"]["type"]
"skipped", self.serializer.tasks_connections["1"]["test"]["type"]
)
self.assertNotIn(
"skipped", self.serializer.tasks_per_node["1"]["test"]
"skipped", self.serializer.tasks_connections["1"]["test"]
)
def test_expand_task_groups(self):
@ -230,30 +255,30 @@ class TestTaskSerializers(BaseTestCase):
'type': 'skipped', 'role': '*'}
}
)
self.assertIn('1', self.serializer.tasks_per_node)
self.assertIn('2', self.serializer.tasks_per_node)
self.assertIn('1', self.serializer.tasks_connections)
self.assertIn('2', self.serializer.tasks_connections)
self.assertItemsEqual(
['task1', 'task2'],
self.serializer.tasks_per_node['1']
self.serializer.tasks_connections['1']
)
self.assertItemsEqual(
self.serializer.tasks_per_node['1'],
self.serializer.tasks_per_node['2']
self.serializer.tasks_connections['1'],
self.serializer.tasks_connections['2']
)
def test_expand_dependencies_on_same_node(self):
node_ids = ['1', '2']
self.serializer.tasks_per_node = dict(
self.serializer.tasks_connections = dict(
(node_id, ['task_{0}'.format(node_id)])
for node_id in node_ids
)
self.serializer.tasks_per_node[None] = ['sync_point']
self.serializer.tasks_connections[None] = ['sync_point']
self.assertItemsEqual(
[{'name': 'sync_point', 'node_id': None}],
[('sync_point', None)],
self.serializer.expand_dependencies('1', ['sync_point'], False)
)
self.assertItemsEqual(
[{'name': 'task_1', 'node_id': '1'}],
[('task_1', '1')],
self.serializer.expand_dependencies('1', ['/task/'], False)
)
@ -271,7 +296,7 @@ class TestTaskSerializers(BaseTestCase):
def test_expand_cross_dependencies(self):
node_ids = ['1', '2', '3']
self.serializer.tasks_per_node = dict(
self.serializer.tasks_connections = dict(
(node_id, ['task_{0}'.format(node_id)])
for node_id in node_ids
)
@ -279,7 +304,7 @@ class TestTaskSerializers(BaseTestCase):
m_resolve.resolve.return_value = node_ids
# the default role and policy
self.assertItemsEqual(
[{'name': 'task_1', 'node_id': '1'}],
[('task_1', '1')],
self.serializer.expand_cross_dependencies(
'2', [{'name': 'task_1'}], True
)
@ -289,7 +314,7 @@ class TestTaskSerializers(BaseTestCase):
)
# concrete role and policy
self.assertItemsEqual(
[{'name': 'task_2', 'node_id': '2'}],
[('task_2', '2')],
self.serializer.expand_cross_dependencies(
'2',
[{'name': 'task_2', 'role': ['role'], 'policy': 'any'}],
@ -302,7 +327,7 @@ class TestTaskSerializers(BaseTestCase):
m_resolve.resolve.reset_mock()
# use self as role
self.assertItemsEqual(
[{'name': 'task_1', 'node_id': '1'}],
[('task_1', '1')],
self.serializer.expand_cross_dependencies(
'1',
[{'name': 'task_1', 'role': 'self'}],
@ -313,45 +338,39 @@ class TestTaskSerializers(BaseTestCase):
def test_resolve_relation_when_no_chains(self):
node_ids = ['1', '2', '3']
self.serializer.tasks_per_node = dict(
self.serializer.tasks_connections = dict(
(node_id, ['task_{0}'.format(node_id)])
for node_id in node_ids
)
self.assertItemsEqual(
[{'node_id': '1', 'name': 'task_1'}],
[('task_1', '1')],
self.serializer.resolve_relation('task_1', node_ids, True)
)
self.assertItemsEqual(
({'node_id': i, 'name': 'task_{0}'.format(i)} for i in node_ids),
(('task_{0}'.format(i), i) for i in node_ids),
self.serializer.resolve_relation('/task/', node_ids, True)
)
def test_resolve_relation_in_chain(self):
node_ids = ['1', '2', '3']
self.serializer.tasks_per_node = dict(
self.serializer.tasks_connections = dict(
(node_id, ['task_{0}'.format(node_id)])
for node_id in node_ids
)
self.serializer.task_processor.origin_task_ids = {
'task_1': 'task', 'task_2': 'task', 'task_3': 'task2'
}
self.serializer.tasks_per_node['1'].append('task_2')
self.serializer.tasks_connections['1'].append('task_2')
self.assertItemsEqual(
[
{'node_id': '1', 'name': 'task_start'},
{'node_id': '2', 'name': 'task_start'},
],
[('task_start', '1'), ('task_start', '2')],
self.serializer.resolve_relation('task', node_ids, True)
)
self.assertItemsEqual(
[
{'node_id': '1', 'name': 'task_end'},
{'node_id': '2', 'name': 'task_end'}
],
[('task_end', '1'), ('task_end', '2')],
self.serializer.resolve_relation('task', node_ids, False)
)
self.assertItemsEqual(
[{'node_id': '1', 'name': 'task_1'}],
[('task_1', '1')],
self.serializer.resolve_relation('task_1', node_ids, False)
)
@ -507,19 +526,13 @@ class TestTaskProcessor(BaseTestCase):
}
self.processor._link_tasks(previous, current)
self.assertItemsEqual(
(
{'name': 'test_task_start', 'node_id': n}
for n in previous['uids']
),
(('test_task_start', n) for n in previous['uids']),
current['requires_ex']
)
current['requires_ex'] = [{'name': 'test_task_start', 'node_id': '0'}]
current['requires_ex'] = [('test_task_start', '0')]
self.processor._link_tasks(previous, current)
self.assertItemsEqual(
(
{'name': 'test_task_start', 'node_id': n}
for n in ['0'] + previous['uids']
),
(('test_task_start', n) for n in ['0'] + previous['uids']),
current['requires_ex']
)