Merge "Allow a user to run dry-run deployment"

Jenkins authored on 2016-05-31 10:28:17 +00:00, committed by Gerrit Code Review
commit 8a6a0dcb61
21 changed files with 490 additions and 160 deletions

View File

@ -121,10 +121,12 @@ class ClusterChangesHandler(DeferredTaskHandler):
@classmethod
def get_options(cls):
data = web.input(graph_type=None)
data = web.input(graph_type=None, dry_run="0")
return {
'graph_type': data.graph_type,
'force': False
'force': False,
'dry_run': utils.parse_bool(data.dry_run),
}
@ -138,10 +140,11 @@ class ClusterChangesForceRedeployHandler(DeferredTaskHandler):
@classmethod
def get_options(cls):
data = web.input(graph_type=None)
data = web.input(graph_type=None, dry_run="0")
return {
'graph_type': data.graph_type,
'force': True
'force': True,
'dry_run': utils.parse_bool(data.dry_run)
}
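The two handlers above now accept an optional dry_run query parameter next to graph_type. A minimal sketch of how an external client could request a dry-run "deploy changes" action; the /api/clusters/<id>/changes URL, port 8000 and auth header are Fuel defaults assumed here, not part of this diff, and any HTTP client works:

# Illustrative only: trigger a dry-run "deploy changes" action.
import requests  # assumption: requests is available on the client side

resp = requests.put(
    "http://fuel-master:8000/api/clusters/1/changes?dry_run=1",
    headers={"X-Auth-Token": "<token>"},
)
assert resp.status_code == 202  # a deployment transaction is created asynchronously
# The created transaction is expected to be named 'dry_run_deployment'
# (see the TASK_NAMES addition further down in this commit).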

View File

@ -33,6 +33,7 @@ from nailgun.logger import logger
from nailgun import consts
from nailgun import errors
from nailgun import objects
from nailgun import utils
from nailgun.orchestrator import deployment_serializers
from nailgun.orchestrator import graph_visualization
@ -44,7 +45,6 @@ from nailgun.orchestrator import task_based_deployment
from nailgun.task.helpers import TaskHelper
from nailgun.task import manager
from nailgun.task import task
from nailgun import utils
class NodesFilterMixin(object):
@ -209,6 +209,13 @@ class DeploymentInfo(OrchestratorInfo):
return objects.Cluster.replace_deployment_info(cluster, data)
class DryRunMixin(object):
"""Provides dry_run parameters."""
def get_dry_run(self):
return utils.parse_bool(web.input(dry_run='0').dry_run)
class SelectedNodesBase(NodesFilterMixin, BaseHandler):
"""Base class for running task manager on selected nodes."""
@ -217,7 +224,8 @@ class SelectedNodesBase(NodesFilterMixin, BaseHandler):
nodes = self.get_nodes(cluster)
try:
task_manager = self.task_manager(cluster_id=cluster.id)
task_manager = self.task_manager(
cluster_id=cluster.id)
task = task_manager.execute(nodes, **kwargs)
except Exception as exc:
logger.warn(
@ -292,7 +300,7 @@ class BaseDeploySelectedNodes(SelectedNodesBase):
graph_type=graph_type)
class DeploySelectedNodes(BaseDeploySelectedNodes):
class DeploySelectedNodes(BaseDeploySelectedNodes, DryRunMixin):
"""Handler for deployment selected nodes."""
@content
@ -305,10 +313,14 @@ class DeploySelectedNodes(BaseDeploySelectedNodes):
* 404 (cluster or nodes not found in db)
"""
cluster = self.get_object_or_404(objects.Cluster, cluster_id)
return self.handle_task(cluster, graph_type=self.get_graph_type())
return self.handle_task(
cluster=cluster,
graph_type=self.get_graph_type(),
dry_run=self.get_dry_run()
)
class DeploySelectedNodesWithTasks(BaseDeploySelectedNodes):
class DeploySelectedNodesWithTasks(BaseDeploySelectedNodes, DryRunMixin):
validator = NodeDeploymentValidator
@ -332,7 +344,9 @@ class DeploySelectedNodesWithTasks(BaseDeploySelectedNodes):
cluster,
deployment_tasks=data,
graph_type=self.get_graph_type(),
force=force)
force=force,
dry_run=self.get_dry_run()
)
class TaskDeployGraph(BaseHandler):
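DryRunMixin and the cluster handlers above read the flag through nailgun.utils.parse_bool, whose implementation is not shown in this diff. A stand-in sketch of the behaviour these handlers rely on for the "0"/"1" values used in this change (the real helper may accept more spellings or raise a different error):

# Stand-in for nailgun.utils.parse_bool, for illustration only.
def parse_bool(value):
    value = str(value).lower()
    if value in ('0', 'false'):
        return False
    if value in ('1', 'true'):
        return True
    raise ValueError('Invalid boolean value: %r' % value)

assert parse_bool('0') is False   # the web.input default used above
assert parse_bool('1') is True    # ?dry_run=1 enables the dry run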

View File

@ -37,7 +37,8 @@ class OpenstackConfigValidator(BasicValidator):
deploy_task_ids = [
six.text_type(task.id)
for task in objects.TaskCollection.get_by_name_and_cluster(
cluster, (consts.TASK_NAMES.deployment,))
cluster, (consts.TASK_NAMES.deployment,
consts.TASK_NAMES.dry_run_deployment))
.filter(models.Task.status.in_((consts.TASK_STATUSES.pending,
consts.TASK_STATUSES.running)))
.all()]

View File

@ -285,6 +285,7 @@ TASK_NAMES = Enum(
'multicast_verification',
'check_repo_availability',
'check_repo_availability_with_setup',
'dry_run_deployment',
# dump
'dump',
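TASK_NAMES is nailgun's string-valued Enum, so the new member can be compared directly against a task's name column, which is what the receiver and task-manager changes below do. A quick check, assuming the enum keeps value == name as the rest of this change relies on:

from nailgun import consts

# The new transaction name is just its own string.
assert consts.TASK_NAMES.dry_run_deployment == 'dry_run_deployment'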

View File

@ -24,6 +24,8 @@ from alembic import op
import sqlalchemy as sa
from nailgun.db.sqlalchemy.models import fields
from nailgun.utils.migration import upgrade_enum
# revision identifiers, used by Alembic.
revision = '675105097a69'
@ -32,6 +34,7 @@ down_revision = '11a9adc6d36a'
def upgrade():
upgrade_deployment_history()
upgrade_transaction_names()
upgrade_clusters_replaced_info_wrong_default()
upgrade_tasks_snapshot()
@ -39,6 +42,7 @@ def upgrade():
def downgrade():
downgrade_tasks_snapshot()
downgrade_clusters_replaced_info_wrong_default()
downgrade_transaction_names()
downgrade_deployment_history()
@ -82,3 +86,68 @@ def upgrade_tasks_snapshot():
def downgrade_tasks_snapshot():
op.drop_column('tasks', 'tasks_snapshot')
transaction_names_old = (
'super',
# Cluster changes
# For deployment supertask, it contains
# two subtasks deployment and provision
'deploy',
'deployment',
'provision',
'stop_deployment',
'reset_environment',
'update',
'spawn_vms',
'node_deletion',
'cluster_deletion',
'remove_images',
'check_before_deployment',
# network
'check_networks',
'verify_networks',
'check_dhcp',
'verify_network_connectivity',
'multicast_verification',
'check_repo_availability',
'check_repo_availability_with_setup',
# dump
'dump',
'capacity_log',
# statistics
'create_stats_user',
'remove_stats_user',
# setup dhcp via dnsmasq for multi-node-groups
'update_dnsmasq'
)
transaction_names_new = transaction_names_old + ('dry_run_deployment',)
def upgrade_transaction_names():
upgrade_enum(
'tasks',
'name',
'task_name',
transaction_names_old,
transaction_names_new
)
def downgrade_transaction_names():
upgrade_enum(
'tasks',
'name',
'task_name',
transaction_names_new,
transaction_names_old
)
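upgrade_enum (from nailgun.utils.migration) swaps the task_name enum for one that includes the new value; its implementation is not part of this diff. A rough sketch of what extending a PostgreSQL enum in an Alembic migration typically involves, for orientation only (the real helper may differ in details):

# Not the real nailgun helper -- just the general shape of an enum upgrade.
from alembic import op
import sqlalchemy as sa

def upgrade_enum_sketch(table, column, enum_name, old_values, new_values):
    # Rename the old type, create the new one, re-bind the column, drop old.
    op.execute('ALTER TYPE {0} RENAME TO {0}_old'.format(enum_name))
    sa.Enum(*new_values, name=enum_name).create(op.get_bind())
    op.execute(
        'ALTER TABLE {0} ALTER COLUMN {1} TYPE {2} USING {1}::text::{2}'
        .format(table, column, enum_name))
    op.execute('DROP TYPE {0}_old'.format(enum_name))

Recreating the type rather than issuing ALTER TYPE ... ADD VALUE is what makes the downgrade above possible: PostgreSQL cannot remove a value from an enum in place, so the helper is simply called again with the value lists swapped.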

View File

@ -101,3 +101,7 @@ class TaskBaseDeploymentNotAllowed(DeploymentException):
class NoChanges(DeploymentException):
message = "There is no changes to apply"
class DryRunSupportedOnlyByLCM(DeploymentException):
message = "Dry run deployment mode is supported only by LCM serializer"

View File

@ -14,7 +14,7 @@
class TransactionContext(object):
def __init__(self, new_state, old_state=None):
def __init__(self, new_state, old_state=None, **kwargs):
"""Wrapper around current and previous state of a transaction
:param new_state: new state of cluster
@ -24,6 +24,7 @@ class TransactionContext(object):
"""
self.new = new_state
self.old = old_state or {}
self.options = kwargs
def get_new_data(self, node_id):
return self.new[node_id]

View File

@ -61,6 +61,9 @@ class Context(object):
self._yaql_engine = yaql_ext.create_engine()
self._yaql_expressions_cache = {}
def get_transaction_option(self, name, default=None):
return self._transaction.options.get(name, default)
def get_new_data(self, node_id):
return self._transaction.get_new_data(node_id)
@ -189,6 +192,7 @@ class NoopTaskSerializer(DeploymentTaskSerializer):
class DefaultTaskSerializer(NoopTaskSerializer):
def should_execute(self, task, node_id):
condition = task.get('condition', True)
if isinstance(condition, six.string_types):
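Together, the TransactionContext **kwargs and the new Context.get_transaction_option() above let arbitrary transaction options, here dry_run, travel from the task manager into the LCM serialization context. A small sketch of the flow; the import path is assumed and the Context half is indicated in comments only, since building a full Context needs a cluster:

# Sketch: options passed to TransactionContext become visible to serializers
# through Context.get_transaction_option().
from nailgun.lcm import TransactionContext  # import path assumed

ctx_state = TransactionContext(new_state={}, old_state={}, dry_run=True)
assert ctx_state.options == {'dry_run': True}
# Inside a serializer, given a Context built around this transaction:
#   context.get_transaction_option('dry_run', False)  -> True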

View File

@ -230,7 +230,7 @@ class Task(NailgunObject):
Cluster.clear_pending_changes(cluster)
elif instance.status == consts.CLUSTER_STATUSES.error:
elif instance.status == consts.TASK_STATUSES.error:
cls.__update_cluster_status(
cluster, consts.CLUSTER_STATUSES.error, None
)
@ -337,11 +337,19 @@ class TaskCollection(NailgunCollection):
@classmethod
def get_cluster_tasks(cls, cluster_id, names=None):
"""Get unordered cluster tasks query.
:param cluster_id: cluster ID
:type cluster_id: int
:param names: task names
:type names: iterable[str]
:returns: sqlalchemy query
:rtype: sqlalchemy.Query[models.Task]
"""
query = cls.get_by_cluster_id(cluster_id)
if isinstance(names, (list, tuple)):
query = cls.filter_by_list(query, 'name', names)
query = cls.order_by(query, 'id')
return query.all()
return query
@classmethod
def get_by_name_and_cluster(cls, cluster, names):
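get_cluster_tasks() now returns the SQLAlchemy query itself instead of a materialised list, so callers decide how to order and fetch. The task-manager changes later in this commit follow the pattern sketched here (cluster and the nailgun imports are assumed to be in scope):

# Callers now order and materialise the query themselves, e.g.:
tasks_q = objects.TaskCollection.get_cluster_tasks(
    cluster_id=cluster.id,
    names=(consts.TASK_NAMES.cluster_deletion,),
)
current_tasks = objects.TaskCollection.order_by(tasks_q, 'id').all()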

View File

@ -281,46 +281,51 @@ class NailgunReceiver(object):
else:
db_nodes = []
# First of all, let's update nodes in database
for node_db in db_nodes:
node = nodes_by_id.pop(node_db.uid)
update_fields = (
'error_msg',
'error_type',
'status',
'progress',
'online'
)
for param in update_fields:
if param in node:
logger.debug("Updating node %s - set %s to %s",
node['uid'], param, node[param])
setattr(node_db, param, node[param])
task = objects.Task.get_by_uuid(task_uuid)
# Dry run deployments should not actually lead to update of
# nodes' statuses
if task.name != consts.TASK_NAMES.dry_run_deployment:
if param == 'progress' and node.get('status') == 'error' \
or node.get('online') is False:
# If failure occurred with node
# it's progress should be 100
node_db.progress = 100
# Setting node error_msg for offline nodes
if node.get('online') is False \
and not node_db.error_msg:
node_db.error_msg = u"Node is offline"
# Notification on particular node failure
notifier.notify(
consts.NOTIFICATION_TOPICS.error,
u"Failed to {0} node '{1}': {2}".format(
consts.TASK_NAMES.deploy,
node_db.name,
node_db.error_msg or "Unknown error"
),
cluster_id=task.cluster_id,
node_id=node['uid'],
task_uuid=task_uuid
)
if nodes_by_id:
logger.warning("The following nodes is not found: %s",
",".join(sorted(nodes_by_id)))
# First of all, let's update nodes in database
for node_db in db_nodes:
node = nodes_by_id.pop(node_db.uid)
update_fields = (
'error_msg',
'error_type',
'status',
'progress',
'online'
)
for param in update_fields:
if param in node:
logger.debug("Updating node %s - set %s to %s",
node['uid'], param, node[param])
setattr(node_db, param, node[param])
if param == 'progress' and node.get('status') == \
'error' or node.get('online') is False:
# If failure occurred with node
# it's progress should be 100
node_db.progress = 100
# Setting node error_msg for offline nodes
if node.get('online') is False \
and not node_db.error_msg:
node_db.error_msg = u"Node is offline"
# Notification on particular node failure
notifier.notify(
consts.NOTIFICATION_TOPICS.error,
u"Failed to {0} node '{1}': {2}".format(
consts.TASK_NAMES.deploy,
node_db.name,
node_db.error_msg or "Unknown error"
),
cluster_id=task.cluster_id,
node_id=node['uid'],
task_uuid=task_uuid
)
if nodes_by_id:
logger.warning("The following nodes are not found: %s",
",".join(sorted(nodes_by_id)))
for node in nodes:
if node.get('deployment_graph_task_name') \
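The receiver change above is easier to read without the diff noise: the whole node-update loop (statuses, progress, offline error messages and failure notifications) is now guarded by a check on the transaction name, so dry-run deployments leave node records untouched. In miniature, with update_node_records as a hypothetical stand-in for the loop shown above:

task = objects.Task.get_by_uuid(task_uuid)

# Dry-run deployments must not touch node state in the database.
if task.name != consts.TASK_NAMES.dry_run_deployment:
    update_node_records(db_nodes, nodes_by_id)  # hypothetical helper
    if nodes_by_id:
        logger.warning("The following nodes are not found: %s",
                       ",".join(sorted(nodes_by_id)))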

View File

@ -34,6 +34,7 @@ from nailgun.statistics.fuel_statistics.tasks_params_white_lists \
tasks_names_actions_groups_mapping = {
consts.TASK_NAMES.deploy: "cluster_changes",
consts.TASK_NAMES.deployment: "cluster_changes",
consts.TASK_NAMES.dry_run_deployment: "cluster_changes",
consts.TASK_NAMES.provision: "cluster_changes",
consts.TASK_NAMES.node_deletion: "cluster_changes",
consts.TASK_NAMES.update: "cluster_changes",

View File

@ -16,20 +16,21 @@
import copy
from distutils.version import StrictVersion
import six
import traceback
from oslo_serialization import jsonutils
from nailgun.extensions.network_manager.objects.serializers.\
network_configuration import NeutronNetworkConfigurationSerializer
from nailgun.extensions.network_manager.objects.serializers.\
network_configuration import NovaNetworkConfigurationSerializer
from nailgun import consts
from nailgun.db import db
from nailgun.db.sqlalchemy.models import Cluster
from nailgun.db.sqlalchemy.models import Task
from nailgun import errors
from nailgun.extensions.network_manager.objects.serializers.\
network_configuration import NeutronNetworkConfigurationSerializer
from nailgun.extensions.network_manager.objects.serializers.\
network_configuration import NovaNetworkConfigurationSerializer
from nailgun.logger import logger
from nailgun import notifier
from nailgun import objects
@ -86,10 +87,13 @@ class TaskManager(object):
db().commit()
def check_running_task(self, task_name):
current_tasks = db().query(Task).filter_by(
name=task_name
)
def check_running_task(self, task_names):
if isinstance(task_names, six.string_types):
task_names = (task_names,)
cluster = getattr(self, 'cluster', None)
current_tasks = objects.TaskCollection.get_cluster_tasks(
cluster_id=cluster.id if cluster else None,
names=task_names).all()
for task in current_tasks:
if task.status == "running":
raise errors.TaskAlreadyRunning()
@ -113,6 +117,7 @@ class DeploymentCheckMixin(object):
deployment_tasks = (
consts.TASK_NAMES.deploy,
consts.TASK_NAMES.deployment,
consts.TASK_NAMES.dry_run_deployment,
consts.TASK_NAMES.provision,
consts.TASK_NAMES.stop_deployment,
consts.TASK_NAMES.reset_environment,
@ -124,7 +129,7 @@ class DeploymentCheckMixin(object):
def check_no_running_deployment(cls, cluster):
tasks_q = objects.TaskCollection.get_by_name_and_cluster(
cluster, cls.deployment_tasks).filter_by(
status=consts.TASK_STATUSES.running)
status=consts.TASK_STATUSES.running)
tasks_exists = db.query(tasks_q.exists()).scalar()
if tasks_exists:
@ -139,6 +144,13 @@ class BaseDeploymentTaskManager(TaskManager):
return tasks.ClusterTransaction
return tasks.DeploymentTask
@staticmethod
def get_deployment_transaction_name(dry_run):
transaction_name = consts.TASK_NAMES.deployment
if dry_run:
transaction_name = consts.TASK_NAMES.dry_run_deployment
return transaction_name
class ApplyChangesTaskManager(BaseDeploymentTaskManager, DeploymentCheckMixin):
@ -163,6 +175,8 @@ class ApplyChangesTaskManager(BaseDeploymentTaskManager, DeploymentCheckMixin):
def _remove_obsolete_tasks(self):
cluster_tasks = objects.TaskCollection.get_cluster_tasks(
cluster_id=self.cluster.id)
cluster_tasks = objects.TaskCollection.order_by(
cluster_tasks, 'id').all()
current_tasks = objects.TaskCollection.filter_by(
cluster_tasks,
@ -230,7 +244,8 @@ class ApplyChangesTaskManager(BaseDeploymentTaskManager, DeploymentCheckMixin):
nodes_to_provision_deploy=nodes_ids_to_deploy,
deployment_tasks=deployment_tasks,
force=force,
graph_type=graph_type
graph_type=graph_type,
**kwargs
)
return supertask
@ -272,7 +287,7 @@ class ApplyChangesTaskManager(BaseDeploymentTaskManager, DeploymentCheckMixin):
def _execute_async_content(self, supertask, deployment_tasks=None,
nodes_to_provision_deploy=None, force=False,
graph_type=None):
graph_type=None, **kwargs):
"""Processes supertask async in mule
:param supertask: SqlAlchemy task object
@ -362,6 +377,9 @@ class ApplyChangesTaskManager(BaseDeploymentTaskManager, DeploymentCheckMixin):
task_messages.append(provision_message)
deployment_message = None
dry_run = kwargs.get('dry_run', False)
if (nodes_to_deploy or affected_nodes or
objects.Release.is_lcm_supported(self.cluster.release)):
if nodes_to_deploy:
@ -373,23 +391,27 @@ class ApplyChangesTaskManager(BaseDeploymentTaskManager, DeploymentCheckMixin):
" ".join((objects.Node.get_node_fqdn(n)
for n in affected_nodes)))
deployment_task_provider = self.get_deployment_task()
transaction_name = self.get_deployment_transaction_name(dry_run)
task_deployment = supertask.create_subtask(
name=consts.TASK_NAMES.deployment,
name=transaction_name,
status=consts.TASK_STATUSES.pending
)
# we should have task committed for processing in other threads
db().commit()
deployment_message = self._call_silently(
task_deployment,
self.get_deployment_task(),
deployment_task_provider,
nodes_to_deploy,
affected_nodes=affected_nodes,
deployment_tasks=deployment_tasks,
method_name='message',
reexecutable_filter=consts.TASKS_TO_RERUN_ON_DEPLOY_CHANGES,
graph_type=graph_type,
force=force
force=force,
**kwargs
)
db().commit()
@ -436,14 +458,13 @@ class ApplyChangesTaskManager(BaseDeploymentTaskManager, DeploymentCheckMixin):
node.status = consts.NODE_STATUSES.provisioning
db().commit()
objects.Cluster.get_by_uid(
self.cluster.id,
fail_if_not_found=True,
lock_for_update=True
)
self.cluster.status = consts.CLUSTER_STATUSES.deployment
db().add(self.cluster)
db().commit()
if not dry_run:
objects.Cluster.get_by_uid(
self.cluster.id,
fail_if_not_found=True
)
self.cluster.status = consts.CLUSTER_STATUSES.deployment
db().commit()
# We have to execute node deletion task only when provision,
# deployment and other tasks are in the database. Otherwise,
@ -608,14 +629,19 @@ class ProvisioningTaskManager(TaskManager):
class DeploymentTaskManager(BaseDeploymentTaskManager):
def execute(self, nodes_to_deployment, deployment_tasks=None,
graph_type=None, force=False, **kwargs):
graph_type=None, force=False, dry_run=False,
**kwargs):
deployment_tasks = deployment_tasks or []
logger.debug('Nodes to deploy: {0}'.format(
' '.join([objects.Node.get_node_fqdn(n)
for n in nodes_to_deployment])))
transaction_name = self.get_deployment_transaction_name(dry_run)
task_deployment = Task(
name=consts.TASK_NAMES.deployment, cluster=self.cluster,
name=transaction_name,
cluster=self.cluster,
status=consts.TASK_STATUSES.pending
)
db().add(task_deployment)
@ -627,7 +653,8 @@ class DeploymentTaskManager(BaseDeploymentTaskManager):
deployment_tasks=deployment_tasks,
method_name='message',
graph_type=graph_type,
force=force)
force=force,
dry_run=dry_run)
db().refresh(task_deployment)
@ -637,15 +664,9 @@ class DeploymentTaskManager(BaseDeploymentTaskManager):
fail_if_not_found=True,
lock_for_update=True
)
# locking nodes
objects.NodeCollection.lock_nodes(nodes_to_deployment)
task_deployment.cache = deployment_message
for node in nodes_to_deployment:
node.status = 'deploying'
node.progress = 0
db().commit()
rpc.cast('naily', deployment_message)
@ -767,6 +788,7 @@ class ResetEnvironmentTaskManager(TaskManager):
Task.name.in_([
consts.TASK_NAMES.deploy,
consts.TASK_NAMES.deployment,
consts.TASK_NAMES.dry_run_deployment,
consts.TASK_NAMES.stop_deployment
])
)
@ -1013,6 +1035,8 @@ class ClusterDeletionManager(TaskManager):
current_tasks = objects.TaskCollection.get_cluster_tasks(
self.cluster.id, names=(consts.TASK_NAMES.cluster_deletion,)
)
current_tasks = objects.TaskCollection.order_by(
current_tasks, 'id').all()
# locking cluster
objects.Cluster.get_by_uid(
@ -1313,7 +1337,12 @@ class OpenstackConfigTaskManager(TaskManager):
return tasks.UpdateOpenstackConfigTask
def execute(self, filters, force=False, graph_type=None, **kwargs):
self.check_running_task(consts.TASK_NAMES.deployment)
self.check_running_task(
(
consts.TASK_NAMES.deployment,
consts.TASK_NAMES.dry_run_deployment
)
)
task = Task(name=consts.TASK_NAMES.deployment,
cluster=self.cluster,
@ -1332,7 +1361,6 @@ class OpenstackConfigTaskManager(TaskManager):
force=force
)
# locking task
task = objects.Task.get_by_uid(
task.id,
fail_if_not_found=True,
@ -1342,16 +1370,9 @@ class OpenstackConfigTaskManager(TaskManager):
if task.is_completed():
return task
# locking nodes
objects.NodeCollection.lock_nodes(nodes_to_update)
task.cache = copy.copy(message)
task.cache['nodes'] = [n.id for n in nodes_to_update]
for node in nodes_to_update:
node.status = consts.NODE_STATUSES.deploying
node.progress = 0
db().commit()
rpc.cast('naily', message)
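Earlier in this file, the new get_deployment_transaction_name() helper is the single place that decides which transaction name a deployment gets; both ApplyChangesTaskManager and DeploymentTaskManager rely on it. Its behaviour, as a quick check (assuming a nailgun environment where these modules import):

from nailgun import consts
from nailgun.task.manager import BaseDeploymentTaskManager

assert (BaseDeploymentTaskManager.get_deployment_transaction_name(False) ==
        consts.TASK_NAMES.deployment)
assert (BaseDeploymentTaskManager.get_deployment_transaction_name(True) ==
        consts.TASK_NAMES.dry_run_deployment)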

View File

@ -143,7 +143,7 @@ class BaseDeploymentTask(object):
try:
args = getattr(cls, method)(transaction, **kwargs)
# save tasks history
if 'tasks_graph' in args:
if 'tasks_graph' in args and not args.get('dry_run', False):
logger.info("tasks history saving is started.")
objects.DeploymentHistoryCollection.create(
transaction, args['tasks_graph']
@ -210,7 +210,8 @@ class DeploymentTask(BaseDeploymentTask):
@classmethod
def message(cls, task, nodes, affected_nodes=None, deployment_tasks=None,
reexecutable_filter=None, graph_type=None, force=False):
reexecutable_filter=None, graph_type=None,
force=False, dry_run=False, **kwargs):
"""Builds RPC message for deployment task.
:param task: the database task object instance
@ -219,9 +220,12 @@ class DeploymentTask(BaseDeploymentTask):
:param deployment_tasks: the list of tasks_ids to execute,
if None, all tasks will be executed
:param reexecutable_filter: the list of events to find subscribed tasks
:param force: force re-execution of tasks regardless of previous deployment state
:param dry_run: serialize and dispatch the deployment without applying any changes
:param graph_type: deployment graph type
"""
logger.debug("DeploymentTask.message(task=%s)" % task.uuid)
task_ids = deployment_tasks or []
objects.NodeCollection.lock_nodes(nodes)
@ -260,7 +264,8 @@ class DeploymentTask(BaseDeploymentTask):
deployment_mode, message = cls.call_deployment_method(
task, tasks=deployment_tasks, nodes=nodes,
affected_nodes=affected_nodes, selected_task_ids=task_ids,
events=reexecutable_filter, force=force
events=reexecutable_filter, force=force,
dry_run=dry_run, **kwargs
)
# After serialization set pending_addition to False
@ -290,7 +295,8 @@ class DeploymentTask(BaseDeploymentTask):
@classmethod
def granular_deploy(cls, transaction, tasks, nodes,
affected_nodes, selected_task_ids, events, **kwargs):
affected_nodes, selected_task_ids, events,
dry_run=False, **kwargs):
"""Builds parameters for granular deployment.
:param transaction: the transaction object
@ -300,8 +306,12 @@ class DeploymentTask(BaseDeploymentTask):
:param selected_task_ids: the list of tasks_ids to execute,
if None, all tasks will be executed
:param events: the list of events to find subscribed tasks
:param dry_run: dry-run mode; not supported by granular deployment
                (DryRunSupportedOnlyByLCM is raised)
:return: the arguments for RPC message
"""
if dry_run:
raise errors.DryRunSupportedOnlyByLCM()
graph = orchestrator_graph.AstuteGraph(transaction.cluster, tasks)
graph.check()
graph.only_tasks(selected_task_ids)
@ -338,7 +348,8 @@ class DeploymentTask(BaseDeploymentTask):
@classmethod
def task_deploy(cls, transaction, tasks, nodes, affected_nodes,
selected_task_ids, events, **kwargs):
selected_task_ids, events, dry_run=False,
**kwargs):
"""Builds parameters for task based deployment.
:param transaction: the transaction object
@ -348,9 +359,13 @@ class DeploymentTask(BaseDeploymentTask):
:param selected_task_ids: the list of tasks_ids to execute,
if None, all tasks will be executed
:param events: the list of events to find subscribed tasks
:param dry_run: dry-run mode; not supported by this serializer
                (DryRunSupportedOnlyByLCM is raised)
:return: RPC method name, the arguments for RPC message
"""
if dry_run:
raise errors.DryRunSupportedOnlyByLCM()
task_processor = task_based_deployment.TaskProcessor
for task in tasks:
task_processor.ensure_task_based_deploy_allowed(task)
@ -501,11 +516,12 @@ class ClusterTransaction(DeploymentTask):
@classmethod
def task_deploy(cls, transaction, tasks, nodes, force=False,
selected_task_ids=None, **kwargs):
selected_task_ids=None, dry_run=False, **kwargs):
logger.info("The cluster transaction is initiated.")
logger.info("cluster serialization is started.")
# we should update information for all nodes except deleted
# TODO(bgaifullin) pass role resolver to serializers
deployment_info = deployment_serializers.serialize_for_lcm(
transaction.cluster, nodes
)
@ -550,7 +566,8 @@ class ClusterTransaction(DeploymentTask):
logger.info("tasks serialization is finished.")
return {
"tasks_directory": directory,
"tasks_graph": graph
"tasks_graph": graph,
"dry_run": dry_run,
}
@ -1802,9 +1819,11 @@ class CheckBeforeDeploymentTask(object):
@classmethod
def _check_dpdk_network_scheme(cls, network_scheme, node_group):
"""Check that endpoint with dpdk provider mapped only to neutron/private
"""DPDK endpoint provider check
Check that endpoints with the dpdk provider are mapped only to neutron/private
"""
for net_template in network_scheme.values():
roles = net_template['roles']
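Stepping back from the docstring tweak just above to the dry-run guards earlier in this file: only the LCM serializer (ClusterTransaction) carries dry_run through to the RPC payload, while granular_deploy and task_deploy refuse it with the new DryRunSupportedOnlyByLCM error. A sketch of that behaviour, assuming a nailgun environment; the guard fires before any other argument is used, so placeholder values are enough:

from nailgun import errors
from nailgun.task.task import DeploymentTask

try:
    DeploymentTask.granular_deploy(
        transaction=None, tasks=None, nodes=None,
        affected_nodes=None, selected_task_ids=None, events=None,
        dry_run=True)
except errors.DryRunSupportedOnlyByLCM:
    pass  # expected: only ClusterTransaction (LCM) supports dry runs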

View File

@ -1048,13 +1048,22 @@ class EnvironmentManager(object):
task_ids or [],
)
def _launch_for_cluster(self, handler, cluster_id):
def _launch_for_cluster(self, handler, cluster_id, **kwargs):
if self.clusters:
cluster_id = self._get_cluster_by_id(cluster_id).id
if kwargs:
get_string = '?' + ('&'.join(
'{}={}'.format(k, v) for k, v in six.iteritems(kwargs)
))
else:
get_string = ''
resp = self.app.put(
reverse(
handler,
kwargs={'cluster_id': cluster_id}),
kwargs={'cluster_id': cluster_id}
) + get_string,
headers=self.default_headers)
return self.db.query(Task).filter_by(
@ -1065,12 +1074,14 @@ class EnvironmentManager(object):
"Nothing to deploy - try creating cluster"
)
def launch_deployment(self, cluster_id=None):
return self._launch_for_cluster('ClusterChangesHandler', cluster_id)
def launch_redeployment(self, cluster_id=None):
def launch_deployment(self, cluster_id=None, **kwargs):
return self._launch_for_cluster(
'ClusterChangesForceRedeployHandler', cluster_id
'ClusterChangesHandler', cluster_id, **kwargs
)
def launch_redeployment(self, cluster_id=None, **kwargs):
return self._launch_for_cluster(
'ClusterChangesForceRedeployHandler', cluster_id, **kwargs
)
def stop_deployment(self, cluster_id=None):
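With the extra **kwargs turned into a query string, tests built on EnvironmentManager can request a dry run through the existing helpers, for example (fragment from inside a test method, not standalone code):

# In a test based on EnvironmentManager:
supertask = self.env.launch_deployment(dry_run=1)
# or, against the force-redeploy handler:
supertask = self.env.launch_redeployment(dry_run=1)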

View File

@ -26,6 +26,7 @@ from nailgun import objects
from nailgun.db.sqlalchemy import models
from nailgun.db.sqlalchemy.models import NetworkGroup
from nailgun.extensions.network_manager.manager import NetworkManager
from nailgun.objects import Task
from nailgun.settings import settings
from nailgun.test.base import BaseIntegrationTest
from nailgun.test.base import mock_rpc
@ -1870,6 +1871,42 @@ class TestHandlers(BaseIntegrationTest):
[node['uid'] for node in deployment_info]
)
@patch('nailgun.task.manager.rpc.cast')
def test_dry_run(self, mcast):
self.env.create(
release_kwargs={
'operating_system': consts.RELEASE_OS.ubuntu,
'version': 'mitaka-9.0',
},
nodes_kwargs=[
{
'roles': ['controller'],
'status': consts.NODE_STATUSES.provisioned
}
],
cluster_kwargs={
'status': consts.CLUSTER_STATUSES.operational
},
)
for handler in ('ClusterChangesHandler',
'ClusterChangesForceRedeployHandler'):
resp = self.app.put(
reverse(
handler,
kwargs={'cluster_id': self.env.clusters[0].id}
) + '?dry_run=1',
headers=self.default_headers,
expect_errors=True
)
self.assertEqual(resp.status_code, 202)
self.assertEqual(
mcast.call_args[0][1][0]['args']['dry_run'], True)
task_uuid = mcast.call_args[0][1][0]['args']['task_uuid']
task = Task.get_by_uuid(uuid=task_uuid, fail_if_not_found=True)
self.assertNotEqual(consts.TASK_STATUSES.error, task.status)
self.assertEqual('dry_run_deployment', task.name)
@patch('nailgun.rpc.cast')
def test_occurs_error_not_enough_memory_for_hugepages(self, *_):
meta = self.env.default_metadata()

View File

@ -16,8 +16,10 @@ from mock import patch
from nailgun import consts
from nailgun.db.sqlalchemy.models import DeploymentGraphTask
from nailgun import errors
from nailgun import objects
from nailgun.orchestrator.tasks_templates import make_generic_task
from nailgun.rpc.receiver import NailgunReceiver
from nailgun.task.manager import OpenstackConfigTaskManager
from nailgun.test import base
@ -155,6 +157,26 @@ class TestOpenstackConfigTaskManager80(base.BaseIntegrationTest):
all_node_ids = [self.nodes[0].id]
self.assertEqual(task.cache['nodes'], all_node_ids)
@patch('nailgun.rpc.cast')
def test_config_execute_fails_if_deployment_running(self, mocked_rpc):
task_manager = OpenstackConfigTaskManager(self.cluster.id)
task = task_manager.execute({'cluster_id': self.cluster.id})
self.assertEqual(task.status, consts.TASK_STATUSES.pending)
NailgunReceiver.deploy_resp(
task_uuid=task.uuid,
status=consts.TASK_STATUSES.running,
progress=50,
nodes=[{'uid': n.uid, 'status': consts.NODE_STATUSES.ready}
for n in self.env.nodes],
)
self.assertEqual(task.status, consts.TASK_STATUSES.running)
task2 = OpenstackConfigTaskManager(self.cluster.id)
self.assertRaises(errors.TaskAlreadyRunning,
task2.execute, {'cluster_id': self.cluster.id})
class TestOpenstackConfigTaskManager90(TestOpenstackConfigTaskManager80):
env_version = "liberty-8.0"

View File

@ -275,7 +275,27 @@ class TestSelectedNodesAction(BaseSelectedNodesTest):
self.check_deployment_call_made(self.node_uids, mcast)
@mock_rpc(pass_mock=True)
def test_start_deployment_on_selected_nodes_with_tasks(self, mcast):
@patch("objects.Release.is_lcm_supported")
@patch('nailgun.task.task.rpc.cast')
def test_start_dry_run_deployment_on_selected_nodes(self, _, mcast, __):
controller_nodes = [
n for n in self.cluster.nodes
if "controller" in n.roles
]
self.emulate_nodes_provisioning(controller_nodes)
deploy_action_url = reverse(
"DeploySelectedNodes",
kwargs={'cluster_id': self.cluster.id}) + \
make_query(nodes=[n.uid for n in controller_nodes], dry_run='1')
self.send_put(deploy_action_url)
self.assertTrue(mcast.call_args[0][1]['args']['dry_run'])
@mock_rpc(pass_mock=True)
@patch('nailgun.task.task.rpc.cast')
def test_start_deployment_on_selected_nodes_with_tasks(self, _, mcast):
controller_nodes = [
n for n in self.cluster.nodes
if "controller" in n.roles

View File

@ -835,6 +835,43 @@ class TestConsumer(BaseReciverTestCase):
# if there are error nodes
self.assertEqual(task.status, "running")
def test_node_deploy_resp_dry_run(self):
cluster = self.env.create(
cluster_kwargs={},
nodes_kwargs=[
{"api": False},
{"api": False}]
)
node, node2 = self.env.nodes
node.status = consts.NODE_STATUSES.ready
node2.status = consts.NODE_STATUSES.ready
cluster.status = consts.CLUSTER_STATUSES.operational
task = Task(
uuid=str(uuid.uuid4()),
name="dry_run_deployment",
cluster_id=cluster.id
)
self.db.add(task)
self.db.commit()
kwargs = {'task_uuid': task.uuid,
'status': consts.TASK_STATUSES.ready,
'progress': 100,
'nodes': []
}
self.receiver.deploy_resp(**kwargs)
self.db.refresh(node)
self.db.refresh(node2)
self.db.refresh(task)
self.assertEqual(
(node.status, node2.status),
(consts.NODE_STATUSES.ready, consts.NODE_STATUSES.ready)
)
self.assertEqual(task.status, consts.TASK_STATUSES.ready)
self.assertEqual(cluster.status, consts.CLUSTER_STATUSES.operational)
def test_node_provision_resp(self):
cluster = self.env.create(
cluster_kwargs={},

View File

@ -15,6 +15,7 @@
# under the License.
import nailgun
from nailgun import consts
from nailgun.db.sqlalchemy.models.notification import Notification
from nailgun.db.sqlalchemy.models.task import Task
@ -25,10 +26,10 @@ from nailgun.test.base import mock_rpc
from nailgun.test.base import reverse
class TestStopDeployment(BaseIntegrationTest):
class TestStopDeploymentPre90(BaseIntegrationTest):
def setUp(self):
super(TestStopDeployment, self).setUp()
super(TestStopDeploymentPre90, self).setUp()
self.cluster = self.env.create(
nodes_kwargs=[
{"name": "First",
@ -36,57 +37,15 @@ class TestStopDeployment(BaseIntegrationTest):
{"name": "Second",
"roles": ["compute"],
"pending_addition": True}
]
],
release_kwargs={
'version': "liberty-8.0"
}
)
self.controller = self.env.nodes[0]
self.compute = self.env.nodes[1]
self.node_uids = [n.uid for n in self.cluster.nodes][:3]
@mock_rpc()
def test_stop_deployment(self):
supertask = self.env.launch_deployment()
self.assertEqual(supertask.status, consts.TASK_STATUSES.pending)
deploy_task = [t for t in supertask.subtasks
if t.name == consts.TASK_NAMES.deployment][0]
NailgunReceiver.deploy_resp(
task_uuid=deploy_task.uuid,
status=consts.TASK_STATUSES.running,
progress=50,
)
stop_task = self.env.stop_deployment()
NailgunReceiver.stop_deployment_resp(
task_uuid=stop_task.uuid,
status=consts.TASK_STATUSES.ready,
progress=100,
nodes=[{'uid': n.uid} for n in self.env.nodes],
)
self.assertEqual(stop_task.status, consts.TASK_STATUSES.ready)
self.assertTrue(self.db().query(Task).filter_by(
uuid=deploy_task.uuid
).first())
self.assertIsNone(objects.Task.get_by_uuid(deploy_task.uuid))
self.assertEqual(self.cluster.status, consts.CLUSTER_STATUSES.stopped)
self.assertEqual(stop_task.progress, 100)
self.assertFalse(self.cluster.is_locked)
for n in self.cluster.nodes:
self.assertEqual(n.roles, [])
self.assertNotEqual(n.pending_roles, [])
notification = self.db.query(Notification).filter_by(
cluster_id=stop_task.cluster_id
).order_by(
Notification.datetime.desc()
).first()
self.assertRegexpMatches(
notification.message,
'was successfully stopped')
# FIXME(aroma): remove when stop action will be reworked for ha
# cluster. To get more details, please, refer to [1]
# [1]: https://bugs.launchpad.net/fuel/+bug/1529691
@ -148,6 +107,73 @@ class TestStopDeployment(BaseIntegrationTest):
self.assertEqual(resp.json_body['message'],
'Stop action is forbidden for the cluster')
class TestStopDeployment(BaseIntegrationTest):
def setUp(self):
super(TestStopDeployment, self).setUp()
self.cluster = self.env.create(
nodes_kwargs=[
{"name": "First",
"pending_addition": True},
{"name": "Second",
"roles": ["compute"],
"pending_addition": True}
],
release_kwargs={
'version': "mitaka-9.0"
}
)
self.controller = self.env.nodes[0]
self.compute = self.env.nodes[1]
self.node_uids = [n.uid for n in self.cluster.nodes][:3]
@mock_rpc()
def test_stop_deployment(self):
supertask = self.env.launch_deployment()
self.assertEqual(supertask.status, consts.TASK_STATUSES.pending)
deploy_task = [t for t in supertask.subtasks
if t.name in (consts.TASK_NAMES.deployment,)][0]
NailgunReceiver.deploy_resp(
task_uuid=deploy_task.uuid,
status=consts.TASK_STATUSES.running,
progress=50,
)
stop_task = self.env.stop_deployment()
NailgunReceiver.stop_deployment_resp(
task_uuid=stop_task.uuid,
status=consts.TASK_STATUSES.ready,
progress=100,
nodes=[{'uid': n.uid} for n in self.env.nodes],
)
self.assertEqual(stop_task.status, consts.TASK_STATUSES.ready)
self.assertTrue(self.db().query(Task).filter_by(
uuid=deploy_task.uuid
).first())
self.assertIsNone(objects.Task.get_by_uuid(deploy_task.uuid))
self.assertEqual(self.cluster.status,
consts.CLUSTER_STATUSES.stopped)
self.assertEqual(stop_task.progress, 100)
self.assertFalse(self.cluster.is_locked)
for n in self.cluster.nodes:
self.assertEqual(n.roles, [])
self.assertNotEqual(n.pending_roles, [])
notification = self.db.query(Notification).filter_by(
cluster_id=stop_task.cluster_id
).order_by(
Notification.datetime.desc()
).first()
self.assertRegexpMatches(
notification.message,
'was successfully stopped')
@mock_rpc()
def test_admin_ip_in_args(self):
deploy_task = self.env.launch_deployment()

View File

@ -77,7 +77,8 @@ class TestTaskDeploy80(BaseIntegrationTest):
return args[1][1]
@mock.patch.object(TaskProcessor, "ensure_task_based_deploy_allowed")
def test_task_deploy_used_by_default(self, _):
@mock.patch.object(objects.Release, "is_lcm_supported", return_value=False)
def test_task_deploy_used_by_default(self, _, lcm_mock):
message = self.get_deploy_message()
self.assertEqual("task_deploy", message["method"])
self.assertItemsEqual(
@ -86,6 +87,16 @@ class TestTaskDeploy80(BaseIntegrationTest):
message["args"]
)
@mock.patch.object(TaskProcessor, "ensure_task_based_deploy_allowed")
@mock.patch.object(objects.Release, "is_lcm_supported", return_value=True)
def test_task_deploy_dry_run(self, _, lcm_mock):
message = self.get_deploy_message()
self.assertEqual("task_deploy", message["method"])
self.assertItemsEqual(
["task_uuid", "tasks_directory", "tasks_graph", "dry_run"],
message["args"]
)
@mock.patch.object(TaskProcessor, "ensure_task_based_deploy_allowed")
def test_fallback_to_granular_deploy(self, ensure_allowed):
ensure_allowed.side_effect = errors.TaskBaseDeploymentNotAllowed

View File

@ -102,3 +102,18 @@ class TestTasksSnapshotField(base.BaseAlembicMigrationTest):
])
).first()
self.assertIsNotNone(result['tasks_snapshot'])
class TestTransactionsNames(base.BaseAlembicMigrationTest):
def test_fields_exist(self):
db.execute(
self.meta.tables['tasks'].insert(),
[
{
'uuid': 'fake_task_uuid_0',
'name': 'dry_run_deployment',
'status': 'pending'
}
]
)