Merge "Moved filtration of tasks into TaskSerializer"

commit bced4668fa
Jenkins 2016-01-19 10:52:27 +00:00, committed by Gerrit Code Review
4 changed files with 144 additions and 35 deletions
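
In effect, callers now pass the selected task ids straight to the serializer instead of pre-filtering the task list themselves. A minimal usage sketch with names taken from the diff below (illustrative, not part of the commit):

    # Only the selected tasks are executed; the rest are still serialized,
    # but with type "skipped", so dependency edges in the graph stay intact.
    serialized = TasksSerializer.serialize(
        cluster, nodes, deployment_tasks, task_ids=["task3"]
    )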

View File

@@ -373,28 +373,33 @@ class TaskProcessor(object):
class TasksSerializer(object):
"""The deploy tasks serializer."""
def __init__(self, cluster, nodes):
def __init__(self, cluster, nodes, task_ids=None):
"""Initializes.
:param cluster: Cluster instance
:param nodes: the sequence of nodes for deploy
:param task_ids: only the specified tasks will be executed;
if None, all tasks will be executed
"""
self.cluster = cluster
self.role_resolver = RoleResolver(nodes)
self.task_serializer = DeployTaskSerializer()
self.task_processor = TaskProcessor()
self.tasks_per_node = collections.defaultdict(dict)
self.task_filter = self.make_task_filter(task_ids)
@classmethod
def serialize(cls, cluster, nodes, tasks):
def serialize(cls, cluster, nodes, tasks, task_ids=None):
"""Resolves roles and dependencies for tasks.
:param cluster: the cluster instance
:param nodes: the list of nodes
:param tasks: the list of tasks
:param task_ids: only the specified tasks will be executed;
if None, all tasks will be executed
:return: the serialized tasks per node
"""
serializer = cls(cluster, nodes)
serializer = cls(cluster, nodes, task_ids)
serializer.resolve_nodes(add_plugin_deployment_hooks(tasks), nodes)
serializer.resolve_dependencies()
return dict(
@@ -411,29 +416,29 @@ class TasksSerializer(object):
"""
tasks_mapping = dict()
tasks_groups = collections.defaultdict(set)
groups = list()
for task in tasks:
if task.get('type') == consts.ORCHESTRATOR_TASK_TYPES.group:
tasks_for_role = task.get('tasks')
if tasks_for_role:
tasks_groups[tuple(task.get('role', ()))].update(
tasks_for_role
)
continue
tasks_mapping[task['id']] = task
self.process_task(task, nodes, lambda _: self.role_resolver)
groups.append(task)
else:
tasks_mapping[task['id']] = task
self.process_task(
task, nodes, lambda _: self.role_resolver,
skip=not self.task_filter(task['id'])
)
self.expand_task_groups(tasks_groups, tasks_mapping)
self.expand_task_groups(groups, tasks_mapping)
# make sure that null node is present
self.tasks_per_node.setdefault(None, dict())
def process_task(self, task, nodes, resolver_factory):
def process_task(self, task, nodes, resolver_factory, skip=False):
"""Processes one task one nodes of cluster.
:param task: the task instance
:param nodes: the list of nodes
:param resolver_factory: the factory creates role-resolver
:param skip: mark the task as skipped
"""
serializer_factory = self.task_serializer.get_stage_serializer(
@@ -443,11 +448,12 @@ class TasksSerializer(object):
task, self.cluster, nodes, role_resolver=resolver_factory(nodes)
)
# do not pass skipped attribute to astute
skipped = task.pop('skipped', False) or \
skipped = skip or task.pop('skipped', False) or \
not task_serializer.should_execute()
for astute_task in self.task_processor.process_tasks(
task, task_serializer.serialize()):
# all skipped tasks shall have type 'skipped';
# do not exclude them from the graph, to keep connections between nodes
if skipped:
astute_task['type'] = \
consts.ORCHESTRATOR_TASK_TYPES.skipped
@@ -455,9 +461,10 @@ class TasksSerializer(object):
for node_id in astute_task.pop('uids', ()):
node_tasks = self.tasks_per_node[node_id]
# de-duplicate the tasks on the node
if astute_task['id'] in node_tasks:
continue
node_tasks[astute_task['id']] = copy.deepcopy(astute_task)
# since a task can be added again after group expansion, we need to
# overwrite it if the existing task is skipped and the new one is not.
if self.need_update_task(node_tasks, astute_task):
node_tasks[astute_task['id']] = copy.deepcopy(astute_task)
def resolve_dependencies(self):
"""Resolves tasks dependencies."""
@@ -488,23 +495,29 @@ class TasksSerializer(object):
task['requires'].extend(task.pop('requires_ex', ()))
task['required_for'].extend(task.pop('required_for_ex', ()))
def expand_task_groups(self, tasks_per_role, task_mapping):
def expand_task_groups(self, groups, task_mapping):
"""Expand group of tasks.
:param tasks_per_role: the set of tasks per role
:param groups: the all tasks with type 'group'
:param task_mapping: the mapping task id to task object
"""
for roles, task_ids in six.iteritems(tasks_per_role):
for task_id in task_ids:
for task in groups:
skipped = not self.task_filter(task['id'])
node_ids = self.role_resolver.resolve(task.get('role', ()))
for sub_task_id in task.get('tasks', ()):
try:
task = task_mapping[task_id]
sub_task = task_mapping[sub_task_id]
except KeyError:
raise errors.InvalidData(
'Task %s cannot be resolved', task_id
'Task %s cannot be resolved', sub_task_id
)
for node_id in self.role_resolver.resolve(roles):
self.process_task(task, [node_id], NullResolver)
# if the group is not excluded, all its tasks should be run as well
# otherwise check each task individually
self.process_task(
sub_task, node_ids, NullResolver,
skip=skipped and not self.task_filter(sub_task_id)
)
def expand_dependencies(self, node_id, dependencies, is_required_for):
"""Expands task dependencies on same node.
@@ -591,3 +604,36 @@ class TasksSerializer(object):
"no candidates in nodes '%s'.",
name, ", ".join(six.moves.map(str, node_ids))
)
@classmethod
def need_update_task(cls, tasks, task):
"""Checks that task shall overwrite existed one or should be added.
:param tasks: the current node tasks
:param task: the astute task object
:return: True if the task is not present or must be overwritten,
otherwise False
"""
existed_task = tasks.get(task['id'])
if existed_task is None:
return True
if existed_task['type'] == task['type']:
return False
return task['type'] != consts.ORCHESTRATOR_TASK_TYPES.skipped
@classmethod
def make_task_filter(cls, task_ids):
"""Makes task filter according to specified ids.
:param task_ids: the selected ids of tasks
:return: a function that checks whether a task id is selected
"""
if task_ids is None:
return lambda _: True
if not isinstance(task_ids, set):
task_ids = set(task_ids)
return lambda task_id: task_id in task_ids
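
For illustration, the filter's semantics can be reproduced standalone (a runnable sketch mirroring make_task_filter above):

    def make_task_filter(task_ids):
        # None means "no selection": every task passes
        if task_ids is None:
            return lambda _: True
        # normalize to a set for O(1) membership checks
        if not isinstance(task_ids, set):
            task_ids = set(task_ids)
        return lambda task_id: task_id in task_ids

    accept_all = make_task_filter(None)
    assert accept_all("any_task")

    selected = make_task_filter(["task1", "task3"])
    assert selected("task1") and not selected("task2")

Tasks rejected by the filter are not dropped: process_task serializes them with skip=True, so they reach the graph as type 'skipped'.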

View File

@@ -232,17 +232,11 @@ class DeploymentTask(object):
@classmethod
def task_deploy(cls, task, nodes, task_ids, reexecutable_filter):
deployment_tasks = objects.Cluster.get_deployment_tasks(task.cluster)
if task_ids:
task_ids = set(task_ids)
deployment_tasks = (
t for t in deployment_tasks if t['id'] in task_ids
)
serialized_cluster = deployment_serializers.serialize(
None, task.cluster, nodes
)
serialized_tasks = task_based_deployment.TasksSerializer.serialize(
task.cluster, nodes, deployment_tasks
task.cluster, nodes, deployment_tasks, task_ids
)
return {
"deployment_info": serialized_cluster,

View File

@@ -174,5 +174,6 @@ class TestTaskDeploy(BaseIntegrationTest):
deploy_tasks = rpc_cast.call_args[0][1]['args']['deployment_tasks']
self.assertItemsEqual(
["deploy_legacy"],
(task["id"] for task in deploy_tasks[compute.uid])
(task["id"] for task in deploy_tasks[compute.uid]
if task['type'] != consts.ORCHESTRATOR_TASK_TYPES.skipped)
)

View File

@@ -87,6 +87,55 @@ class TestTaskSerializers(BaseTestCase):
self.env.clusters[-1], self.env.nodes, tasks
)
def _check_run_selected_tasks(self, ids, controller_tasks, compute_tasks):
tasks = [
{
"id": "task1", "tasks": ["task2", "task3"], "type": "group",
"version": "2.0.0", "role": ["controller"]
},
{
"id": "task2", "role": ["controller"],
"type": "puppet", "version": "2.0.0", "parameters": {}
},
{
"id": "task3", "role": ["compute"],
"type": "puppet", "version": "2.0.0", "parameters": {}
},
]
serialized = self.serializer.serialize(
self.env.clusters[-1], self.env.nodes, tasks, ids
)
controllers = [
n.uid for n in self.env.nodes if "controller" in n.roles
]
computes = [
n.uid for n in self.env.nodes if "compute" in n.roles
]
self.assertEqual(1, len(controllers))
self.assertEqual(1, len(computes))
self.assertItemsEqual(
["task2", "task3"],
(x["id"] for x in serialized[controllers[0]])
)
self.assertItemsEqual(
["task3"],
(x["id"] for x in serialized[computes[0]])
)
for expected_tasks, node in ((controller_tasks, controllers[0]),
(compute_tasks, computes[0])):
self.assertItemsEqual(
expected_tasks,
(x["id"] for x in serialized[node]
if x["type"] != consts.ORCHESTRATOR_TASK_TYPES.skipped)
)
def test_process_with_selected_group_id(self):
self._check_run_selected_tasks(["task1"], ["task2", "task3"], [])
def test_process_with_selected_task_id(self):
self._check_run_selected_tasks(["task3"], ["task3"], ["task3"])
def test_serialize_success_if_all_applicable_task_has_version_2(self):
tasks = [
{
@@ -170,7 +219,10 @@ class TestTaskSerializers(BaseTestCase):
with mock.patch.object(self.serializer, 'role_resolver') as m_resolve:
m_resolve.resolve.return_value = node_ids
self.serializer.expand_task_groups(
{'role': ['task1', 'task2']},
[
{"type": "group", "id": "group1", "role": "compute",
"tasks": ["task1", "task2"]}
],
{
'task1': {'id': 'task1', 'version': '2.0.0',
'type': 'skipped', 'role': '*'},
@@ -320,6 +372,22 @@ class TestTaskSerializers(BaseTestCase):
"not_exists", "1, 2, 3"
)
def test_need_update_task(self):
self.assertTrue(self.serializer.need_update_task(
{}, {"id": "task1", "type": "puppet"}
))
self.assertTrue(self.serializer.need_update_task(
{"task1": {"type": "skipped"}}, {"id": "task1", "type": "puppet"}
))
self.assertFalse(self.serializer.need_update_task(
{"task1": {"type": "skipped"}}, {"id": "task1", "type": "skipped"}
))
self.assertFalse(self.serializer.need_update_task(
{"task1": {"type": "puppet"}}, {"id": "task1", "type": "skipped"}
))
class TestNoopSerializer(BaseTestCase):
def setUp(self):