Support feature advanced-config in task based deployment

The Advanced Config  allows to re-execute some tasks
in case of openstack configs was modified.
The flag 'refresh_on' of task uses to filter tasks
those should be reexecuted on modifying config.
Also added support for flag '*' that means reexecute
task in case if any of configs was modified.

Implements: blueprint enable-task-based-deployment
Change-Id: I0b3822c1c35d3dad6e7c3df4bb4a200bd63268ed
Depends-On: I1adce292a87b7b5ec673b2df72fc8b983170f95c
This commit is contained in:
Bulat Gaifullin 2016-02-17 20:02:44 +03:00
parent 78bf2973a5
commit 1d9b5452d5
7 changed files with 203 additions and 77 deletions

View File

@ -85,3 +85,7 @@ class Task(Base):
self.subtasks.append(task)
db().flush()
return task
def is_completed(self):
return self.status == consts.TASK_STATUSES.error or \
self.status == consts.TASK_STATUSES.ready

View File

@ -119,7 +119,8 @@ default_messages = {
"UnknownError": "Unknown error",
"UnresolvableConflict": "Unresolvable conflict",
"NodeNotBelongToCluster": "The Node doesn't belong to the Cluster",
"TaskBaseDeploymentNotAllowed": "The task-based deployment is not allowed"
"TaskBaseDeploymentNotAllowed": "The task-based deployment is not allowed",
"NoChanges": "There is no changes to apply."
}

View File

@ -184,3 +184,13 @@
puppet_manifest: /etc/puppet/modules/osnailyfacter/modular/globals/globals.pp
puppet_modules: /etc/puppet/modules
timeout: 3600
- id: upload_configuration
type: upload_file
version: 2.0.0
role: '*'
requires: [pre_deployment_start]
required_for: [pre_deployment_end]
parameters:
timeout: 180
refresh_on: ['*']

View File

@ -62,28 +62,29 @@ class TaskManager(object):
TaskHelper.update_action_log(task, al)
return to_return
except errors.NoChanges as e:
self._finish_task(task, al, consts.TASK_STATUSES.ready, str(e))
except Exception as exc:
err = str(exc)
if any([
not hasattr(exc, "log_traceback"),
hasattr(exc, "log_traceback") and exc.log_traceback
]):
logger.error(traceback.format_exc())
self._finish_task(task, al, consts.TASK_STATUSES.error, str(exc))
# update task entity with given data
data = {'status': 'error',
'progress': 100,
'message': err}
objects.Task.update(task, data)
# NOTE(romcheg): Flushing the data is required to unlock
# tasks in order to temporary fix issues with
# the deadlock detection query in tests and let the tests pass.
# TODO(akislitsky): Get rid of this flush as soon as
# task locking issues are resolved.
db().flush()
TaskHelper.update_action_log(task, al)
def _finish_task(self, task, log_item, status, message):
data = {'status': status, 'progress': 100, 'message': message}
# update task entity with given data
objects.Task.update(task, data)
# NOTE(romcheg): Flushing the data is required to unlock
# tasks in order to temporary fix issues with
# the deadlock detection query in tests and let the tests pass.
# TODO(akislitsky): Get rid of this flush as soon as
# task locking issues are resolved.
db().flush()
TaskHelper.update_action_log(task, log_item)
db().commit()
db().commit()
def check_running_task(self, task_name):
current_tasks = db().query(Task).filter_by(
@ -1304,6 +1305,10 @@ class OpenstackConfigTaskManager(TaskManager):
fail_if_not_found=True,
lock_for_update=True
)
if task.is_completed():
return task
# locking nodes
objects.NodeCollection.lock_nodes(nodes_to_update)

View File

@ -100,7 +100,41 @@ def fake_cast(queue, messages, **kwargs):
make_thread_task_in_orchestrator(messages)
class DeploymentTask(object):
class BaseDeploymentTask(object):
@classmethod
def _get_deployment_method(cls, cluster, ignore_task_deploy=False):
"""Get deployment method name based on cluster version
:param cluster: Cluster db object
:param ignore_task_deploy: do not check that task deploy enabled
:returns: string - deploy/granular_deploy
"""
if not ignore_task_deploy and \
objects.Cluster.is_task_deploy_enabled(cluster):
return "task_deploy"
if objects.Release.is_granular_enabled(cluster.release):
return 'granular_deploy'
return 'deploy'
@classmethod
def call_deployment_method(cls, task, *args, **kwargs):
"""Calls the deployment method with fallback.
:param task: the Task object instance
:param args: the positional arguments
:param kwargs: the keyword arguments
"""
for flag in (False, True):
try:
method = cls._get_deployment_method(task.cluster, flag)
message = getattr(cls, method)(task, *args, **kwargs)
return method, message
except errors.TaskBaseDeploymentNotAllowed:
logger.warning("Task deploy is not allowed, "
"fallback to granular deploy.")
class DeploymentTask(BaseDeploymentTask):
"""Task for applying changes to cluster
LOGIC
@ -138,21 +172,6 @@ class DeploymentTask(object):
those which are prepared for removal.
"""
@classmethod
def _get_deployment_method(cls, cluster, ignore_task_deploy=False):
"""Get deployment method name based on cluster version
:param cluster: Cluster db object
:param ignore_task_deploy: do not check that task deploy enabled
:returns: string - deploy/granular_deploy
"""
if not ignore_task_deploy and \
objects.Cluster.is_task_deploy_enabled(cluster):
return "task_deploy"
if objects.Release.is_granular_enabled(cluster.release):
return 'granular_deploy'
return 'deploy'
@classmethod
def message(cls, task, nodes, affected_nodes=None, deployment_tasks=None,
reexecutable_filter=None):
@ -184,18 +203,9 @@ class DeploymentTask(object):
n.progress = 0
db().flush()
deployment_mode = cls._get_deployment_method(task.cluster)
while True:
try:
message = getattr(cls, deployment_mode)(
task, nodes, affected_nodes, task_ids, reexecutable_filter
)
break
except errors.TaskBaseDeploymentNotAllowed:
deployment_mode = cls._get_deployment_method(
task.cluster, True
)
logger.warning("fallback to %s deploy.", deployment_mode)
deployment_mode, message = cls.call_deployment_method(
task, nodes, affected_nodes, task_ids, reexecutable_filter
)
# After serialization set pending_addition to False
for node in nodes:
@ -1846,39 +1856,54 @@ class UpdateDnsmasqTask(object):
)
class UpdateOpenstackConfigTask(object):
class UpdateOpenstackConfigTask(BaseDeploymentTask):
@staticmethod
def task_deploy(task, nodes, update_configs):
tasks = objects.Cluster.get_deployment_tasks(task.cluster)
events = task_based_deployment.TaskEvents(
'refresh_on', update_configs
)
directory, graph = task_based_deployment.TasksSerializer.serialize(
task.cluster, [], tasks, nodes, events=events
)
return make_astute_message(
task, "task_deploy", "update_config_resp", {
"tasks_directory": directory,
"tasks_graph": graph
}
)
@staticmethod
def granular_deploy(task, nodes, update_configs):
refreshable_tasks = objects.Cluster.get_refreshable_tasks(
task.cluster, update_configs
)
orchestrator_graph = deployment_graph.AstuteGraph(task.cluster)
task_ids = [t['id'] for t in refreshable_tasks]
orchestrator_graph.only_tasks(task_ids)
deployment_tasks = orchestrator_graph.stage_tasks_serialize(
orchestrator_graph.graph.topology, nodes
)
return make_astute_message(
task, 'execute_tasks', 'update_config_resp', {
'tasks': deployment_tasks,
})
@classmethod
def message(cls, task, cluster, nodes):
configs = objects.OpenstackConfigCollection.find_configs_for_nodes(
cluster, nodes)
refresh_on = set()
updated_configs = set()
for config in configs:
refresh_on.update(config.configuration)
updated_configs.update(config.configuration)
refreshable_tasks = objects.Cluster.get_refreshable_tasks(
cluster, refresh_on)
if updated_configs:
updated_configs.add('*') # '*' means any config
else:
raise errors.NoChanges()
upload_serializer = tasks_serializer.UploadConfiguration(
task, task.cluster, nodes, configs)
tasks_to_execute = list(upload_serializer.serialize())
if refreshable_tasks:
orchestrator_graph = deployment_graph.AstuteGraph(task.cluster)
task_ids = [t['id'] for t in refreshable_tasks]
orchestrator_graph.only_tasks(task_ids)
deployment_tasks = orchestrator_graph.stage_tasks_serialize(
orchestrator_graph.graph.topology, nodes)
tasks_to_execute.extend(deployment_tasks)
rpc_message = make_astute_message(
task, 'execute_tasks', 'update_config_resp', {
'tasks': tasks_to_execute,
})
return rpc_message
return cls.call_deployment_method(task, nodes, updated_configs)[1]
if settings.FAKE_TASKS or settings.FAKE_TASKS_AMQP:

View File

@ -78,9 +78,6 @@ class TestTaskManagers(BaseIntegrationTest):
provision_task = filter(
lambda t: t.name == TASK_NAMES.provision, supertask.subtasks)[0]
self.assertEqual(provision_task.weight, 0.4)
wait_nodes = [self.env.nodes[0]]
self.env.wait_for_nodes_status(wait_nodes, NODE_STATUSES.provisioning)
self.env.wait_ready(
supertask,
60,
@ -99,7 +96,6 @@ class TestTaskManagers(BaseIntegrationTest):
u"Deployment of environment '{0}' is done.".format(cluster_name),
supertask.message
)
self.env.wait_for_nodes_status(wait_nodes, NODE_STATUSES.ready)
self.env.refresh_nodes()
for n in filter(
lambda n: n.cluster_id == cluster['id'],

View File

@ -29,14 +29,25 @@ class TestOpenstackConfigHandlers(BaseIntegrationTest):
def setUp(self):
super(TestOpenstackConfigHandlers, self).setUp()
release_kwargs = {
'version': 'liberty-9.0',
'operating_system': consts.RELEASE_OS.ubuntu,
}
cluster_kwargs = {'net_provider': 'neutron'}
self.env.create_cluster(api=False,
status=consts.CLUSTER_STATUSES.operational)
status=consts.CLUSTER_STATUSES.operational,
release_kwargs=release_kwargs,
cluster_kwargs=cluster_kwargs)
self.env.create_cluster(api=False,
status=consts.CLUSTER_STATUSES.operational)
status=consts.CLUSTER_STATUSES.operational,
release_kwargs=release_kwargs,
cluster_kwargs=cluster_kwargs)
self.clusters = self.env.clusters
self.nodes = self.env.create_nodes(
3, cluster_id=self.clusters[0].id,
roles=["compute"],
status=consts.NODE_STATUSES.ready)
self.env.create_openstack_config(
@ -214,20 +225,94 @@ class TestOpenstackConfigHandlers(BaseIntegrationTest):
expect_errors=True)
self.assertEqual(resp.status_code, 405)
@mock.patch('nailgun.task.task.rpc.cast')
def test_openstack_config_execute(self, _):
@mock.patch('objects.Cluster.get_deployment_tasks')
def execute_update_open_stack_config(self, tasks_mock):
tasks_mock.return_value = [{
'id': 'upload_configuration',
'type': 'upload_file',
'version': '2.0.0',
'role': '*',
'parameters': {
'timeout': 180,
},
'refresh_on': ['*']
}]
data = {'cluster_id': self.clusters[0].id}
resp = self.app.put(
reverse('OpenstackConfigExecuteHandler'),
jsonutils.dumps(data), headers=self.default_headers
)
return resp
@mock.patch('nailgun.task.task.rpc.cast')
def test_openstack_config_execute_with_granular_deploy(self, mock_rpc):
self.env.disable_task_deploy(self.clusters[0])
resp = self.execute_update_open_stack_config()
self.assertEqual(resp.status_code, 202)
message = mock_rpc.call_args_list[0][0][1]
self.assertEqual('execute_tasks', message['method'])
self.assertEqual('update_config_resp', message['respond_to'])
# there is no task deduplication in granular deployment
# and some of tasks can be included
# to result list more than 1 times
self.assertItemsEqual(
((n.uid, 'upload_file') for n in self.clusters[0].nodes),
{(t['uids'][0], t['type']) for t in message['args']['tasks']}
)
node_1_upload_config = (
t['parameters']['data'] for t in message['args']['tasks']
if self.nodes[1].uid in t['uids']
)
self.assertItemsEqual(
[
'configuration: {}\n',
'configuration: {nova_config: value_1_1}\n'
],
node_1_upload_config
)
@mock.patch('nailgun.task.task.rpc.cast')
def test_openstack_config_execute_with_task_deploy(self, mock_rpc):
resp = self.execute_update_open_stack_config()
self.assertEqual(resp.status_code, 202)
message = mock_rpc.call_args_list[0][0][1]
self.assertEqual('task_deploy', message['method'])
self.assertEqual('update_config_resp', message['respond_to'])
tasks_graph = message['args']['tasks_graph']
tasks_directory = message['args']['tasks_directory']
nodes = [n.uid for n in self.clusters[0].nodes]
nodes.append(None)
self.assertItemsEqual(nodes, tasks_graph)
self.assertEqual(
'upload_file',
tasks_graph[nodes[0]][0]['type']
)
node_1_upload_config = (
tasks_directory[t['id']]['parameters']['data']
for t in tasks_graph[nodes[1]]
)
self.assertItemsEqual(
[
'configuration: {}\n',
'configuration: {nova_config: value_1_1}\n'
],
node_1_upload_config
)
@mock.patch('objects.OpenstackConfigCollection.find_configs_for_nodes')
def test_openstack_config_completed_exit_if_no_changes(self, m_conf):
m_conf.return_value = []
resp = self.execute_update_open_stack_config()
self.assertEqual(200, resp.status_code)
self.assertEqual(consts.TASK_STATUSES.ready, resp.json_body['status'])
@mock.patch('nailgun.task.task.rpc.cast')
def test_openstack_config_execute_force(self, _):
# Turn node 2 into provisioned state
self.env.nodes[2].status = consts.NODE_STATUSES.provisioned
self.db.flush()
# need to persistent state in database because handler will revert
# all changes on error.
self.db.commit()
# Try to update OpenStack configuration for cluster
data = {'cluster_id': self.clusters[0].id}
resp = self.app.put(