Enable using new transaction manager for old handlers

DeployChanges will use the 'deploy-changes' sequence if it is available.
DeployNodes and ProvisionNodes will use the appropriate graphs if they are available.

Change-Id: Icb66826ce96c41f77d85d97990b4cc600aa5fe04
Closes-Bug: 1620620
Bulat Gaifullin 2016-09-12 14:58:00 +03:00
parent 418afc5853
commit 17ca94d09f
7 changed files with 211 additions and 102 deletions
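
For orientation only, here is a minimal, self-contained Python sketch of the dispatch rule this commit introduces. The names resolve_deploy_changes_options, dispatch, start_transaction and run_legacy below are illustrative stand-ins rather than the real Nailgun API; only the 'deploy-changes' sequence name and the option keys (dry_run, noop_run, force, graphs) mirror the handlers changed here.

# Illustrative sketch, not Nailgun code: a handler first tries to resolve
# "transaction options" (a sequence or graph); if the release is LCM-ready
# and options were found, the new transaction manager runs them, otherwise
# the legacy task-manager path is kept.

def resolve_deploy_changes_options(release_sequences, options):
    """Mimic ClusterChangesHandler.get_transaction_options: use the
    'deploy-changes' sequence when the release defines one."""
    sequence = release_sequences.get('deploy-changes')
    if not sequence:
        return None
    return {
        'dry_run': options.get('dry_run', False),
        'noop_run': options.get('noop_run', False),
        'force': options.get('force', False),
        'graphs': sequence['graphs'],
    }


def dispatch(lcm_supported, transaction_options, start_transaction, run_legacy):
    """Prefer the new transaction manager, fall back to the old handler path."""
    if lcm_supported and transaction_options:
        return start_transaction(transaction_options)
    return run_legacy()


if __name__ == '__main__':
    sequences = {'deploy-changes': {'graphs': [{'type': 'default'}]}}
    options = resolve_deploy_changes_options(sequences, {'force': True})
    print(dispatch(
        lcm_supported=True,
        transaction_options=options,
        start_transaction=lambda o: 'transaction with graphs %s' % o['graphs'],
        run_legacy=lambda: 'legacy ApplyChangesTaskManager',
    ))

The real DeferredTaskHandler.PUT applies the same rule per request: it resolves options via get_transaction_options(), starts a transaction when they exist, and otherwise falls back to the legacy task manager.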


@@ -549,76 +549,6 @@ class DBSingletonHandler(BaseHandler):
         return self.single.to_dict(instance)
-# TODO(enchantner): rewrite more handlers to inherit from this
-# and move more common code here
-class DeferredTaskHandler(BaseHandler):
-    """Abstract Deferred Task Handler"""
-    validator = BaseDefferedTaskValidator
-    single = objects.Task
-    log_message = u"Starting deferred task on environment '{env_id}'"
-    log_error = u"Error during execution of deferred task " \
-                u"on environment '{env_id}': {error}"
-    task_manager = None
-    @classmethod
-    def get_options(cls):
-        return {}
-    @handle_errors
-    @validate
-    def PUT(self, cluster_id):
-        """:returns: JSONized Task object.
-        :http: * 202 (task successfully executed)
-               * 400 (invalid object data specified)
-               * 404 (environment is not found)
-               * 409 (task with such parameters already exists)
-        """
-        cluster = self.get_object_or_404(
-            objects.Cluster,
-            cluster_id,
-            log_404=(
-                u"warning",
-                u"Error: there is no cluster "
-                u"with id '{0}' in DB.".format(cluster_id)
-            )
-        )
-        logger.info(self.log_message.format(env_id=cluster_id))
-        try:
-            options = self.get_options()
-        except ValueError as e:
-            raise self.http(400, six.text_type(e))
-        try:
-            self.validator.validate(cluster)
-            task_manager = self.task_manager(cluster_id=cluster.id)
-            task = task_manager.execute(**options)
-        except (
-            errors.AlreadyExists,
-            errors.StopAlreadyRunning
-        ) as exc:
-            raise self.http(409, exc.message)
-        except (
-            errors.DeploymentNotRunning,
-            errors.NoDeploymentTasks,
-            errors.WrongNodeStatus,
-            errors.UnavailableRelease,
-            errors.CannotBeStopped,
-        ) as exc:
-            raise self.http(400, exc.message)
-        except Exception as exc:
-            logger.error(
-                self.log_error.format(
-                    env_id=cluster_id,
-                    error=str(exc)
-                )
-            )
-            # let it be 500
-            raise
-        self.raise_task(task)
 class OrchestratorDeploymentTasksHandler(SingleHandler):
     """Handler for deployment graph serialization."""
@@ -720,3 +650,97 @@ class TransactionExecutorHandler(BaseHandler):
             raise self.http(409, e.message)
         except errors.InvalidData as e:
             raise self.http(400, e.message)
+# TODO(enchantner): rewrite more handlers to inherit from this
+# and move more common code here, this is deprecated handler
+class DeferredTaskHandler(TransactionExecutorHandler):
+    """Abstract Deferred Task Handler"""
+    validator = BaseDefferedTaskValidator
+    single = objects.Task
+    log_message = u"Starting deferred task on environment '{env_id}'"
+    log_error = u"Error during execution of deferred task " \
+                u"on environment '{env_id}': {error}"
+    task_manager = None
+    @classmethod
+    def get_options(cls):
+        return {}
+    @classmethod
+    def get_transaction_options(cls, cluster, options):
+        """Finds graph for this action."""
+        return None
+    @handle_errors
+    @validate
+    def PUT(self, cluster_id):
+        """:returns: JSONized Task object.
+        :http: * 202 (task successfully executed)
+               * 400 (invalid object data specified)
+               * 404 (environment is not found)
+               * 409 (task with such parameters already exists)
+        """
+        cluster = self.get_object_or_404(
+            objects.Cluster,
+            cluster_id,
+            log_404=(
+                u"warning",
+                u"Error: there is no cluster "
+                u"with id '{0}' in DB.".format(cluster_id)
+            )
+        )
+        logger.info(self.log_message.format(env_id=cluster_id))
+        try:
+            options = self.get_options()
+        except ValueError as e:
+            raise self.http(400, six.text_type(e))
+        try:
+            self.validator.validate(cluster)
+        except errors.NailgunException as e:
+            raise self.http(400, e.message)
+        if objects.Release.is_lcm_supported(cluster.release):
+            # try to get new graph to run transaction manager
+            try:
+                transaction_options = self.get_transaction_options(
+                    cluster, options
+                )
+            except errors.NailgunException as e:
+                logger.exception("Failed to get transaction options")
+                raise self.http(400, msg=six.text_type(e))
+            if transaction_options:
+                return self.start_transaction(cluster, transaction_options)
+        try:
+            task_manager = self.task_manager(cluster_id=cluster.id)
+            task = task_manager.execute(**options)
+        except (
+            errors.AlreadyExists,
+            errors.StopAlreadyRunning
+        ) as exc:
+            raise self.http(409, exc.message)
+        except (
+            errors.DeploymentNotRunning,
+            errors.NoDeploymentTasks,
+            errors.WrongNodeStatus,
+            errors.UnavailableRelease,
+            errors.CannotBeStopped,
+        ) as exc:
+            raise self.http(400, exc.message)
+        except Exception as exc:
+            logger.error(
+                self.log_error.format(
+                    env_id=cluster_id,
+                    error=str(exc)
+                )
+            )
+            # let it be 500
+            raise
+        self.raise_task(task)


@@ -130,6 +130,20 @@ class ClusterChangesHandler(DeferredTaskHandler):
     task_manager = ApplyChangesTaskManager
     validator = ClusterChangesValidator
+    @classmethod
+    def get_transaction_options(cls, cluster, options):
+        """Find sequence 'default' to use for deploy-changes handler."""
+        sequence = objects.DeploymentSequence.get_by_name_for_release(
+            cluster.release, 'deploy-changes'
+        )
+        if sequence:
+            return {
+                'dry_run': options['dry_run'],
+                'noop_run': options['noop_run'],
+                'force': options['force'],
+                'graphs': sequence.graphs,
+            }
     @classmethod
     def get_options(cls):
         data = web.input(graph_type=None, dry_run="0", noop_run="0")
@@ -142,13 +156,11 @@ class ClusterChangesHandler(DeferredTaskHandler):
         }
-class ClusterChangesForceRedeployHandler(DeferredTaskHandler):
+class ClusterChangesForceRedeployHandler(ClusterChangesHandler):
     log_message = u"Trying to force deployment of the environment '{env_id}'"
     log_error = u"Error during execution of a forced deployment task " \
                 u"on environment '{env_id}': {error}"
-    task_manager = ApplyChangesTaskManager
-    validator = ClusterChangesValidator
     @classmethod
     def get_options(cls):


@@ -14,14 +14,13 @@
 # License for the specific language governing permissions and limitations
 # under the License.
-import traceback
 import six
 import web
 from nailgun.api.v1.handlers.base import BaseHandler
 from nailgun.api.v1.handlers.base import handle_errors
 from nailgun.api.v1.handlers.base import serialize
+from nailgun.api.v1.handlers.base import TransactionExecutorHandler
 from nailgun.api.v1.handlers.base import validate
 from nailgun.api.v1.validators.cluster import ProvisionSelectedNodesValidator
 from nailgun.api.v1.validators.node import DeploySelectedNodesValidator
@@ -252,21 +251,66 @@ class RunMixin(object):
         return utils.parse_bool(web.input(noop_run='0').noop_run)
-class SelectedNodesBase(NodesFilterMixin, BaseHandler):
+class SelectedNodesBase(NodesFilterMixin, TransactionExecutorHandler):
     """Base class for running task manager on selected nodes."""
+    graph_type = None
+    def get_transaction_options(self, cluster, options):
+        if not objects.Release.is_lcm_supported(cluster.release):
+            # this code is actual only for lcm
+            return
+        graph_type = options.get('graph_type') or self.graph_type
+        graph = graph_type and objects.Cluster.get_deployment_graph(
+            cluster, graph_type
+        )
+        if not graph or not graph['tasks']:
+            return
+        nodes_ids = self.get_param_as_set('nodes', default=None)
+        if nodes_ids is not None:
+            nodes = self.get_objects_list_or_404(
+                objects.NodeCollection, nodes_ids
+            )
+            nodes_ids = [n.id for n in nodes]
+        if graph:
+            return {
+                'noop_run': options.get('noop_run'),
+                'dry_run': options.get('dry_run'),
+                'force': options.get('force'),
+                'graphs': [{
+                    'type': graph['type'],
+                    'nodes': nodes_ids,
+                    'tasks': options.get('deployment_tasks')
+                }]
+            }
     def handle_task(self, cluster, **kwargs):
+        if objects.Release.is_lcm_supported(cluster.release):
+            # this code is actual only if cluster is LCM ready
+            try:
+                transaction_options = self.get_transaction_options(
+                    cluster, kwargs
+                )
+            except errors.NailgunException as e:
+                logger.exception("Failed to get transaction options.")
+                raise self.http(400, six.text_type(e))
+            if transaction_options:
+                return self.start_transaction(cluster, transaction_options)
         nodes = self.get_nodes(cluster)
         try:
-            task_manager = self.task_manager(
-                cluster_id=cluster.id)
+            task_manager = self.task_manager(cluster_id=cluster.id)
             task = task_manager.execute(nodes, **kwargs)
         except Exception as exc:
-            logger.warn(
+            logger.exception(
                 u'Cannot execute %s task nodes: %s',
-                task_manager.__class__.__name__, traceback.format_exc())
+                self.task_manager.__name__, ','.join(n.uid for n in nodes)
+            )
             raise self.http(400, msg=six.text_type(exc))
         self.raise_task(task)
@@ -291,6 +335,8 @@ class ProvisionSelectedNodes(SelectedNodesBase):
     validator = ProvisionSelectedNodesValidator
     task_manager = manager.ProvisioningTaskManager
+    graph_type = 'provision'
     def get_default_nodes(self, cluster):
         return TaskHelper.nodes_to_provision(cluster)
@@ -318,6 +364,8 @@ class BaseDeploySelectedNodes(SelectedNodesBase):
     validator = DeploySelectedNodesValidator
     task_manager = manager.DeploymentTaskManager
+    graph_type = consts.DEFAULT_DEPLOYMENT_GRAPH_TYPE
     def get_default_nodes(self, cluster):
         return TaskHelper.nodes_to_deploy(cluster)


@@ -33,6 +33,7 @@ from nailgun.db.sqlalchemy.models import Node
 from nailgun import objects
 from nailgun.rpc.receiver import NailgunReceiver
 from nailgun.settings import settings
+from nailgun.utils import get_in
 def _is_slave(node):
@@ -324,6 +325,26 @@ class FakeAmpqThread(FakeThread):
 class FakeDeploymentThread(FakeAmpqThread):
+    def inject_node_status_transition(self, kwargs):
+        if kwargs['status'] == consts.TASK_STATUSES.ready:
+            selector = 'successful'
+        elif kwargs['status'] == consts.TASK_STATUSES.error:
+            selector = 'failed'
+        elif kwargs['status'] == consts.TASK_STATUSES.stopped:
+            selector = 'stopped'
+        else:
+            return
+        node_statuses_transition = get_in(
+            self.data['args'],
+            'tasks_metadata', 'node_statuses_transitions', selector
+        )
+        if node_statuses_transition:
+            kwargs['nodes'] = [
+                dict(uid=uid, **node_statuses_transition)
+                for uid in self.data['args']['tasks_graph']
+            ]
     def message_gen(self):
         # TEST: we can fail only in deployment stage here:
         error = self.params.get("error")
@@ -398,6 +419,7 @@ class FakeDeploymentThread(FakeAmpqThread):
             if error_msg:
                 kwargs['error'] = error_msg
+            self.inject_node_status_transition(kwargs)
             yield kwargs


@@ -311,7 +311,7 @@ class TestSelectedNodesAction(BaseSelectedNodesTest):
             make_query(nodes=[n.uid for n in controller_nodes], dry_run='1')
         self.send_put(deploy_action_url)
-        self.assertTrue(mcast.call_args[0][1]['args']['dry_run'])
+        self.assertTrue(mcast.call_args[0][1][0]['args']['dry_run'])
     @mock_rpc(pass_mock=True)
     @patch("objects.Release.is_lcm_supported")
@@ -330,7 +330,7 @@ class TestSelectedNodesAction(BaseSelectedNodesTest):
             make_query(nodes=[n.uid for n in controller_nodes], noop_run='1')
         self.send_put(deploy_action_url)
-        self.assertTrue(mcast.call_args[0][1]['args']['noop_run'])
+        self.assertTrue(mcast.call_args[0][1][0]['args']['noop_run'])
     @mock_rpc(pass_mock=True)
     @patch('nailgun.task.task.rpc.cast')


@@ -496,7 +496,7 @@ class TestTaskDeploy90AfterDeployment(BaseIntegrationTest):
                 uuid=resp.json_body['uuid'], fail_if_not_found=True
             ).status)
-        graph = rpc_cast.call_args[0][1]['args']['tasks_graph']
+        graph = rpc_cast.call_args[0][1][0]['args']['tasks_graph']
         # LCM: no changes - no tasks
         self.assertItemsEqual(
@@ -523,7 +523,7 @@ class TestTaskDeploy90AfterDeployment(BaseIntegrationTest):
                 uuid=resp.json_body['uuid'], fail_if_not_found=True
            ).status)
-        graph = rpc_cast.call_args[0][1]['args']['tasks_graph']
+        graph = rpc_cast.call_args[0][1][0]['args']['tasks_graph']
         # due to 'force', task must be run anyway
         self.assertItemsEqual(


@@ -63,7 +63,7 @@ class TestTaskManagers(BaseIntegrationTest):
             {'status': consts.HISTORY_TASK_STATUSES.ready})
     def set_tasks_ready(self):
-        objects.TaskCollection.all().update(
+        objects.TransactionCollection.all().update(
             {'status': consts.TASK_STATUSES.ready})
     @fake_tasks(override_state={"progress": 100, "status": "ready"})
@@ -1366,7 +1366,7 @@ class TestTaskManagers(BaseIntegrationTest):
         self.db.refresh(node3)
         self.assertEqual(consts.NODE_STATUSES.discover, node3.status)
         self.assertTrue(node3.pending_addition)
-        tasks_graph = rpc_mock.call_args[0][1]['args']['tasks_graph']
+        tasks_graph = rpc_mock.call_args[0][1][0]['args']['tasks_graph']
         self.assertItemsEqual(
             [consts.MASTER_NODE_UID, None] + nodes_uids, tasks_graph
         )
@@ -1405,7 +1405,7 @@ class TestTaskManagers(BaseIntegrationTest):
         self.assertNotEqual(consts.TASK_STATUSES.error, task.status)
-        tasks_graph = rpc_mock.call_args[0][1]['args']['tasks_graph']
+        tasks_graph = rpc_mock.call_args[0][1][0]['args']['tasks_graph']
         for task in tasks_graph['master']:
             if task['id'] in task_ids:
                 self.assertEqual(task['type'], 'puppet')
@@ -1431,7 +1431,7 @@ class TestTaskManagers(BaseIntegrationTest):
             headers=self.default_headers
         )
         self.assertIn(resp.status_code, [200, 202])
-        tasks_graph = rpc_mock.call_args[0][1]['args']['tasks_graph']
+        tasks_graph = rpc_mock.call_args[0][1][0]['args']['tasks_graph']
         # check that all nodes present in message
         self.assertItemsEqual(
             [n.uid for n in cluster.nodes] + [consts.MASTER_NODE_UID, None],
@@ -1439,11 +1439,9 @@ class TestTaskManagers(BaseIntegrationTest):
         )
     @mock.patch('nailgun.task.task.rpc.cast')
-    @mock.patch('nailgun.objects.Cluster.get_deployment_tasks')
-    @mock.patch('nailgun.objects.TransactionCollection'
-                '.get_successful_transactions_per_task')
+    @mock.patch('nailgun.objects.Cluster.get_deployment_graph')
     def check_correct_state_calculation(self, node_status, is_skip_expected,
-                                        state_mock, tasks_mock, rpc_mock):
+                                        get_graph_mock, rpc_mock):
         cluster = self.env.create(
             nodes_kwargs=[{'roles': ['controller'],
                            'status': consts.NODE_STATUSES.ready}],
@@ -1456,28 +1454,33 @@ class TestTaskManagers(BaseIntegrationTest):
         task = {
             'parameters': {}, 'type': 'puppet',
-            'roles': ['primary-controller'], 'version': '2.1.0',
+            'roles': ['/.*/'], 'version': '2.1.0',
             'condition': {'yaql_exp': 'changed($.uid)'},
         }
-        tasks_mock.return_value = [
-            dict(task, id='test1'), dict(task, id='test2')
-        ]
-        state_mock.return_value = []
+        get_graph_mock.return_value = {
+            'type': 'custom',
+            'tasks': [dict(task, id='test1'), dict(task, id='test2')]
+        }
+        # reset progress
+        node.progress = 0
         # deploy cluster at first time and create history
         supertask = self.env.launch_deployment_selected([node.uid], cluster.id)
         self.assertNotEqual(consts.TASK_STATUSES.error, supertask.status)
         self.set_history_ready()
         self.set_tasks_ready()
+        # mark test2 as skipped to ensure that it will run in next deploy
+        objects.DeploymentHistoryCollection.filter_by(
+            None, deployment_graph_task_name='test2'
+        ).update({'status': consts.HISTORY_TASK_STATUSES.skipped})
         node.status = node_status
-        state_mock.return_value = [(supertask, node.uid, 'test1')]
+        node.progress = 0
         task = self.env.launch_deployment_selected([node.uid], cluster.id)
         self.assertNotEqual(consts.TASK_STATUSES.error, task.status)
-        tasks_graph = rpc_mock.call_args[0][1]['args']['tasks_graph']
+        tasks_graph = rpc_mock.call_args[0][1][0]['args']['tasks_graph']
         for task in tasks_graph[node.uid]:
             if task['id'] == 'test1':
@@ -1491,7 +1494,7 @@ class TestTaskManagers(BaseIntegrationTest):
                 self.assertNotEqual(
                     task['type'], consts.ORCHESTRATOR_TASK_TYPES.skipped)
             else:
-                self.fail('Unexpected task in graph')
+                self.fail('Unexpected task in graph {0}'.format(task['id']))
     def test_correct_state_calculation(self):
         self.check_correct_state_calculation(