Receiving of task processing confirmation from astute handled

We should know is task handled by orchestrator or not.
For instance, we should send stop_deployment task only if
provision or deploy tasks are handled by the orchestrator.
Task status 'pending' added into Task DB model and
handled in stop deployment, provisioning, deployment and
apply cluster changes task managers.

Nailgun Task object update function changed for bubble 'running'
status to parent task.

Locking of all cluster tasks calls removed for deadlocks
exclusion.

Consts used instead hardcoded tasks statuses in part of tests.

Co-Authored-By: Alexandra Morozova <astepanchuk@mirantis.com>
Depends-On: Ib054517696dc4e53487557b09b75ebfcb1255ecb
Depends-On: Idedb061b7b5c4dca4a0ca7adcaa570cecbb691af
Change-Id: I15ebeb85226c832923f9476bb91fa19c0ff87a4f
Closes-Bug: #1498827
This commit is contained in:
Alexander Kislitsky 2015-09-30 14:14:26 +03:00
parent ff842dd813
commit fa46c4dca9
19 changed files with 319 additions and 95 deletions

View File

@ -223,6 +223,7 @@ BOND_TYPES = Enum(
)
TASK_STATUSES = Enum(
'pending',
'ready',
'running',
'error'

View File

@ -42,15 +42,27 @@ release_states_new = (
'manageonly',
)
task_statuses_old = (
'ready',
'running',
'error'
)
task_statuses_new = task_statuses_old + (
'pending',
)
def upgrade():
create_components_table()
create_release_components_table()
upgrade_nodegroups_name_cluster_constraint()
upgrade_release_state()
task_statuses_upgrade()
def downgrade():
task_statuses_downgrade()
downgrade_release_state()
op.drop_constraint('_name_cluster_uc', 'nodegroups',)
op.drop_table('release_components')
@ -158,3 +170,13 @@ def downgrade_release_state():
release_states_new,
release_states_old,
)
def task_statuses_upgrade():
upgrade_enum('tasks', 'status', 'task_status',
task_statuses_old, task_statuses_new)
def task_statuses_downgrade():
upgrade_enum('tasks', 'status', 'task_status',
task_statuses_new, task_statuses_old)

View File

@ -97,9 +97,10 @@ class Task(NailgunObject):
if len(subtasks):
data = dict()
if all(map(lambda s: s.status == 'ready', subtasks)):
if all(map(lambda s: s.status == consts.TASK_STATUSES.ready,
subtasks)):
data['status'] = 'ready'
data['status'] = consts.TASK_STATUSES.ready
data['progress'] = 100
data['message'] = u'\n'.join(map(
lambda s: s.message, filter(
@ -108,26 +109,34 @@ class Task(NailgunObject):
cls.update(instance, data)
TaskHelper.update_action_log(instance)
elif any(map(lambda s: s.status in ('error',), subtasks)):
elif any(map(lambda s: s.status == consts.TASK_STATUSES.error,
subtasks)):
for subtask in subtasks:
if subtask.status not in ('error', 'ready'):
subtask.status = 'error'
if subtask.status not in (consts.TASK_STATUSES.error,
consts.TASK_STATUSES.ready):
subtask.status = consts.TASK_STATUSES.error
subtask.progress = 100
subtask.message = 'Task aborted'
subtask.message = "Task aborted"
data['status'] = 'error'
data['status'] = consts.TASK_STATUSES.error
data['progress'] = 100
data['message'] = u'\n'.join(list(set(map(
lambda s: (s.message or ""), filter(
lambda s: (
s.status == 'error' and not
s.status == consts.TASK_STATUSES.error and not
# TODO(aroma): make this check less ugly
s.message == 'Task aborted'
s.message == "Task aborted"
), subtasks)))))
cls.update(instance, data)
TaskHelper.update_action_log(instance)
elif instance.status == consts.TASK_STATUSES.pending and any(
map(lambda s: s.status in (consts.TASK_STATUSES.running,
consts.TASK_STATUSES.ready),
subtasks)):
instance.status = consts.TASK_STATUSES.running
else:
subtasks_with_progress = filter(
lambda s: s.progress is not None,
@ -273,12 +282,11 @@ class TaskCollection(NailgunCollection):
return cls.filter_by(None, cluster_id=cluster_id)
@classmethod
def lock_cluster_tasks(cls, cluster_id, names=None):
def get_cluster_tasks(cls, cluster_id, names=None):
query = cls.get_by_cluster_id(cluster_id)
if isinstance(names, (list, tuple)):
query = cls.filter_by_list(query, 'name', names)
query = cls.order_by(query, 'id')
query = cls.lock_for_update(query)
return query.all()
@classmethod

View File

@ -26,6 +26,7 @@ from oslo_serialization import jsonutils
from sqlalchemy import or_
from nailgun import consts
from nailgun.errors import errors as nailgun_errors
from nailgun import notifier
from nailgun import objects
from nailgun.settings import settings
@ -61,9 +62,7 @@ class NailgunReceiver(object):
if status in [consts.TASK_STATUSES.ready, consts.TASK_STATUSES.error]:
progress = 100
# locking tasks on cluster
task = objects.Task.get_by_uuid(task_uuid, fail_if_not_found=True)
objects.TaskCollection.lock_cluster_tasks(task.cluster_id)
# locking task
task = objects.Task.get_by_uuid(
task_uuid,
fail_if_not_found=True,
@ -234,12 +233,9 @@ class NailgunReceiver(object):
task = objects.Task.get_by_uuid(
task_uuid,
fail_if_not_found=True,
fail_if_not_found=True
)
# locking all cluster tasks
objects.TaskCollection.lock_cluster_tasks(task.cluster_id)
# lock cluster
objects.Cluster.get_by_uid(
task.cluster_id,
@ -559,16 +555,12 @@ class NailgunReceiver(object):
fail_if_not_found=True,
)
# locking all cluster tasks
objects.TaskCollection.lock_cluster_tasks(task.cluster_id)
stopping_task_names = [
consts.TASK_NAMES.deploy,
consts.TASK_NAMES.deployment,
consts.TASK_NAMES.provision
]
# Locking other tasks for stopping
q_stop_tasks = objects.TaskCollection.filter_by_list(
None,
'name',
@ -578,11 +570,10 @@ class NailgunReceiver(object):
q_stop_tasks,
cluster_id=task.cluster_id
)
q_stop_tasks = objects.TaskCollection.order_by(
stop_tasks = objects.TaskCollection.order_by(
q_stop_tasks,
'id'
)
stop_tasks = objects.TaskCollection.lock_for_update(q_stop_tasks).all()
).all()
# Locking cluster
objects.Cluster.get_by_uid(
@ -595,8 +586,8 @@ class NailgunReceiver(object):
logger.warning("stop_deployment_resp: deployment tasks \
not found for environment '%s'!", task.cluster_id)
if status == "ready":
task.cluster.status = "stopped"
if status == consts.TASK_STATUSES.ready:
task.cluster.status = consts.CLUSTER_STATUSES.stopped
if stop_tasks:
map(db().delete, stop_tasks)
@ -1161,3 +1152,26 @@ class NailgunReceiver(object):
objects.Task.update_verify_networks(
task, status, progress, msg, {})
@classmethod
def task_in_orchestrator(cls, **kwargs):
logger.info("RPC method task_in_orchestrator received: %s",
jsonutils.dumps(kwargs))
task_uuid = kwargs.get('task_uuid')
try:
task = objects.Task.get_by_uuid(task_uuid, fail_if_not_found=True,
lock_for_update=True)
if task.status == consts.TASK_STATUSES.pending:
objects.Task.update(
task, {'status': consts.TASK_STATUSES.running})
logger.debug("Task '%s' is acknowledged as running",
task_uuid)
else:
logger.debug("Task '%s' in status '%s' can not "
"be acknowledged as running", task_uuid,
task.status)
except nailgun_errors.ObjectNotFound:
logger.warning("Task '%s' acknowledgement as running failed "
"due to task doesn't exist in DB", task_uuid)

View File

@ -666,6 +666,12 @@ class FakeCheckRepositories(FakeAmpqThread):
}]
class FakeTaskInOrchestrator(FakeAmpqThread):
def message_gen(self):
self.sleep(self.tick_interval)
return [{'task_uuid': self.task_uuid}]
FAKE_THREADS = {
'native_provision': FakeProvisionThread,
'image_provision': FakeProvisionThread,
@ -682,4 +688,5 @@ FAKE_THREADS = {
'execute_tasks': FakeExecuteTasksThread,
'check_repositories': FakeCheckRepositories,
'check_repositories_with_setup': FakeCheckRepositories,
'task_in_orchestrator': FakeTaskInOrchestrator
}

View File

@ -433,6 +433,7 @@ class TaskHelper(object):
@classmethod
def set_ready_if_not_finished(cls, task):
if task.status == consts.TASK_STATUSES.running:
if task.status in (consts.TASK_STATUSES.pending,
consts.TASK_STATUSES.running):
task.status = consts.TASK_STATUSES.ready
cls.update_action_log(task)

View File

@ -131,16 +131,10 @@ class ApplyChangesTaskManager(TaskManager, DeploymentCheckMixin):
deployment_type = consts.TASK_NAMES.deploy
def _lock_required_tasks(self):
return objects.TaskCollection.lock_cluster_tasks(
cluster_id=self.cluster.id, names=self.deployment_tasks)
def _remove_obsolete_tasks(self):
locked_tasks = self._lock_required_tasks()
current_tasks = objects.TaskCollection.filter_by(
locked_tasks,
name=consts.TASK_NAMES.deploy
)
current_tasks = objects.TaskCollection.get_cluster_tasks(
cluster_id=self.cluster.id, names=(consts.TASK_NAMES.deploy,))
# locking cluster
objects.Cluster.get_by_uid(
self.cluster.id,
@ -155,7 +149,7 @@ class ApplyChangesTaskManager(TaskManager, DeploymentCheckMixin):
db().flush()
obsolete_tasks = objects.TaskCollection.filter_by_list(
locked_tasks,
current_tasks,
'name',
(consts.TASK_NAMES.stop_deployment,
consts.TASK_NAMES.reset_environment)
@ -174,7 +168,8 @@ class ApplyChangesTaskManager(TaskManager, DeploymentCheckMixin):
self.check_no_running_deployment(self.cluster)
self._remove_obsolete_tasks()
supertask = Task(name=self.deployment_type, cluster=self.cluster)
supertask = Task(name=self.deployment_type, cluster=self.cluster,
status=consts.TASK_STATUSES.pending)
db().add(supertask)
nodes_to_delete = TaskHelper.nodes_to_delete(self.cluster)
@ -232,7 +227,6 @@ class ApplyChangesTaskManager(TaskManager, DeploymentCheckMixin):
db().commit()
def delete_nodes(self, supertask, nodes_to_delete):
objects.TaskCollection.lock_cluster_tasks(self.cluster.id)
objects.NodeCollection.lock_nodes(nodes_to_delete)
# For more accurate progress calculation
task_weight = 0.4
@ -287,8 +281,6 @@ class ApplyChangesTaskManager(TaskManager, DeploymentCheckMixin):
task_deletion = self.delete_nodes(supertask, nodes_to_delete)
if nodes_to_provision:
objects.TaskCollection.lock_cluster_tasks(self.cluster.id)
logger.debug("There are nodes to provision: %s",
" ".join([objects.Node.get_node_fqdn(n)
for n in nodes_to_provision]))
@ -297,6 +289,7 @@ class ApplyChangesTaskManager(TaskManager, DeploymentCheckMixin):
task_weight = 0.4
task_provision = supertask.create_subtask(
consts.TASK_NAMES.provision,
status=consts.TASK_STATUSES.pending,
weight=task_weight)
# we should have task committed for processing in other threads
@ -324,13 +317,13 @@ class ApplyChangesTaskManager(TaskManager, DeploymentCheckMixin):
task_messages.append(provision_message)
if nodes_to_deploy:
objects.TaskCollection.lock_cluster_tasks(self.cluster.id)
logger.debug("There are nodes to deploy: %s",
" ".join([objects.Node.get_node_fqdn(n)
for n in nodes_to_deploy]))
task_deployment = supertask.create_subtask(
name=consts.TASK_NAMES.deployment)
name=consts.TASK_NAMES.deployment,
status=consts.TASK_STATUSES.pending
)
# we should have task committed for processing in other threads
db().commit()
@ -367,7 +360,9 @@ class ApplyChangesTaskManager(TaskManager, DeploymentCheckMixin):
"No nodes to deploy, just update nodes.yaml everywhere.")
task_deployment = supertask.create_subtask(
name=consts.TASK_NAMES.deployment)
name=consts.TASK_NAMES.deployment,
status=consts.TASK_STATUSES.pending
)
task_message = tasks.UpdateNodesInfoTask.message(task_deployment)
task_deployment.cache = task_message
task_messages.append(task_message)
@ -530,6 +525,7 @@ class ProvisioningTaskManager(TaskManager):
for n in nodes_to_provision])))
task_provision = Task(name=consts.TASK_NAMES.provision,
status=consts.TASK_STATUSES.pending,
cluster=self.cluster)
db().add(task_provision)
@ -574,7 +570,9 @@ class DeploymentTaskManager(TaskManager):
' '.join([objects.Node.get_node_fqdn(n)
for n in nodes_to_deployment])))
task_deployment = Task(
name=consts.TASK_NAMES.deployment, cluster=self.cluster)
name=consts.TASK_NAMES.deployment, cluster=self.cluster,
status=consts.TASK_STATUSES.pending
)
db().add(task_deployment)
deployment_message = self._call_silently(
@ -611,42 +609,35 @@ class DeploymentTaskManager(TaskManager):
class StopDeploymentTaskManager(TaskManager):
def execute(self):
# locking tasks for processing
names = (
consts.TASK_NAMES.deploy,
consts.TASK_NAMES.stop_deployment,
consts.TASK_NAMES.deployment,
consts.TASK_NAMES.provision
)
objects.TaskCollection.lock_cluster_tasks(
self.cluster.id,
names=names
)
stop_running = objects.TaskCollection.filter_by(
None,
cluster_id=self.cluster.id,
name=consts.TASK_NAMES.stop_deployment,
name=consts.TASK_NAMES.stop_deployment
)
stop_running = objects.TaskCollection.order_by(
stop_running, 'id'
).first()
if stop_running:
if stop_running.status == consts.TASK_STATUSES.running:
if stop_running.status in (
consts.TASK_STATUSES.running,
consts.TASK_STATUSES.pending):
raise errors.StopAlreadyRunning(
"Stopping deployment task "
"is already launched"
)
else:
db().delete(stop_running)
db().flush()
db().commit()
deployment_task = objects.TaskCollection.filter_by(
None,
cluster_id=self.cluster.id,
name=consts.TASK_NAMES.deployment,
)
deployment_task = deployment_task.filter(
Task.status != consts.TASK_STATUSES.pending
)
deployment_task = objects.TaskCollection.order_by(
deployment_task, '-id'
).first()
@ -656,6 +647,9 @@ class StopDeploymentTaskManager(TaskManager):
cluster_id=self.cluster.id,
name=consts.TASK_NAMES.provision,
)
provisioning_task = provisioning_task.filter(
Task.status != consts.TASK_STATUSES.pending
)
provisioning_task = objects.TaskCollection.order_by(
provisioning_task, '-id'
).first()
@ -679,6 +673,7 @@ class StopDeploymentTaskManager(TaskManager):
deploy_task, 'id').first()
if deploy_task:
TaskHelper.set_ready_if_not_finished(deploy_task)
db().commit()
task = Task(
name=consts.TASK_NAMES.stop_deployment,
@ -1001,10 +996,10 @@ class VerifyNetworksTaskManager(TaskManager):
class ClusterDeletionManager(TaskManager):
def execute(self):
# locking required tasks
locked_tasks = objects.TaskCollection.lock_cluster_tasks(
current_tasks = objects.TaskCollection.get_cluster_tasks(
self.cluster.id, names=(consts.TASK_NAMES.cluster_deletion,)
)
# locking cluster
objects.Cluster.get_by_uid(
self.cluster.id,
@ -1040,7 +1035,7 @@ class ClusterDeletionManager(TaskManager):
TaskHelper.set_ready_if_not_finished(deploy_running)
logger.debug("Removing cluster tasks")
for task in locked_tasks:
for task in current_tasks:
if task.status == consts.TASK_STATUSES.running:
db().rollback()
raise errors.DeletionAlreadyStarted()
@ -1132,7 +1127,6 @@ class NodeDeletionTaskManager(TaskManager, DeploymentCheckMixin):
cluster = None
if hasattr(self, 'cluster'):
cluster = self.cluster
objects.TaskCollection.lock_cluster_tasks(self.cluster.id)
logger.info("Trying to execute node deletion task with nodes %s",
', '.join(str(node.id) for node in nodes_to_delete))

View File

@ -80,12 +80,22 @@ def fake_cast(queue, messages, **kwargs):
thread.name = message['method'].upper()
return thread
def make_thread_task_in_orchestrator(message):
task_in_orchestrator = {
'args': {'task_uuid': message['args'].get('task_uuid')},
'respond_to': 'task_in_orchestrator',
'method': 'task_in_orchestrator'
}
make_thread(task_in_orchestrator)
if isinstance(messages, (list,)):
thread = None
for m in messages:
thread = make_thread(m, join_to=thread)
make_thread_task_in_orchestrator(m)
else:
make_thread(messages)
make_thread_task_in_orchestrator(messages)
class DeploymentTask(object):

View File

@ -977,9 +977,9 @@ class EnvironmentManager(object):
except Exception:
self.nodes.remove(n)
def _wait_task(self, task, timeout, message):
def _wait_task_status(self, task, timeout, wait_until_in_statuses):
timer = time.time()
while task.status == 'running':
while task.status in wait_until_in_statuses:
self.db.refresh(task)
if time.time() - timer > timeout:
raise Exception(
@ -988,6 +988,11 @@ class EnvironmentManager(object):
)
)
time.sleep(1)
def _wait_task(self, task, timeout, message):
wait_until_in_statuses = (consts.TASK_STATUSES.running,
consts.TASK_STATUSES.pending)
self._wait_task_status(task, timeout, wait_until_in_statuses)
self.tester.assertEqual(task.progress, 100)
if isinstance(message, type(re.compile("regexp"))):
self.tester.assertIsNotNone(re.match(message, task.message))
@ -996,11 +1001,17 @@ class EnvironmentManager(object):
def wait_ready(self, task, timeout=60, message=None):
self._wait_task(task, timeout, message)
self.tester.assertEqual(task.status, 'ready')
self.tester.assertEqual(task.status, consts.TASK_STATUSES.ready)
def wait_until_task_pending(self, task, timeout=60):
wait_until_in_statuses = (consts.TASK_STATUSES.pending,)
self._wait_task_status(task, timeout,
wait_until_in_statuses=wait_until_in_statuses)
self.tester.assertNotEqual(task.status, consts.TASK_STATUSES.pending)
def wait_error(self, task, timeout=60, message=None):
self._wait_task(task, timeout, message)
self.tester.assertEqual(task.status, 'error')
self.tester.assertEqual(task.status, consts.TASK_STATUSES.error)
def wait_for_nodes_status(self, nodes, status):
def check_statuses():

View File

@ -16,6 +16,7 @@
import time
from nailgun import consts
from nailgun.db.sqlalchemy.models import Cluster
from nailgun.test.base import BaseIntegrationTest
from nailgun.test.base import fake_tasks
@ -39,17 +40,18 @@ class TestCharsetIssues(BaseIntegrationTest):
]
)
supertask = self.env.launch_deployment()
self.assertEqual(supertask.name, 'deploy')
self.assertIn(supertask.status, ('running', 'ready'))
self.env.wait_until_task_pending(supertask)
self.assertEqual(supertask.name, consts.TASK_NAMES.deploy)
self.assertIn(supertask.status, (consts.TASK_STATUSES.running,
consts.TASK_STATUSES.ready))
# we have three subtasks here
# repo connectivity check
# deletion
# provision
# deployment
self.assertEqual(len(supertask.subtasks), 4)
self.env.wait_for_nodes_status(self.env.nodes, 'provisioning')
self.env.wait_ready(supertask, 60)
self.env.wait_ready(supertask)
@fake_tasks()
def test_deletion_during_deployment(self):

View File

@ -1745,7 +1745,9 @@ class TestHandlers(BaseIntegrationTest):
headers=self.default_headers)
task = self.env.launch_deployment()
self.assertIn(task.status, ('running', 'ready'))
self.env.wait_until_task_pending(task)
self.assertIn(task.status, (consts.TASK_STATUSES.running,
consts.TASK_STATUSES.ready))
@fake_tasks()
def test_admin_untagged_intersection(self):
@ -1836,3 +1838,12 @@ class TestHandlers(BaseIntegrationTest):
headers=self.default_headers)
self.assertEqual(check_mongo.call_count, 1)
@fake_tasks(fake_rpc=False, mock_rpc=False)
@patch('nailgun.rpc.cast')
def test_deploy_task_status(self, _):
self.env.create(
nodes_kwargs=[{'name': '', 'pending_addition': True}]
)
deploy_task = self.env.launch_deployment()
self.assertEqual(consts.TASK_STATUSES.pending, deploy_task.status)

View File

@ -49,7 +49,8 @@ class TestSpawnVMs(BaseIntegrationTest):
self.assertEqual(task_deploy.name, consts.TASK_NAMES.spawn_vms)
self.assertIn(
task_deploy.status,
(consts.TASK_STATUSES.running, consts.TASK_STATUSES.ready)
(consts.TASK_STATUSES.pending, consts.TASK_STATUSES.running,
consts.TASK_STATUSES.ready)
)
self.assertEqual(len(task_deploy.subtasks), 2)

View File

@ -51,6 +51,7 @@ class TestStopDeployment(BaseIntegrationTest):
def test_stop_deployment(self):
supertask = self.env.launch_deployment()
deploy_task_uuid = supertask.uuid
self.env.wait_until_task_pending(supertask)
stop_task = self.env.stop_deployment()
self.env.wait_ready(stop_task, 60)
self.assertIsNone(
@ -58,7 +59,7 @@ class TestStopDeployment(BaseIntegrationTest):
uuid=deploy_task_uuid
).first()
)
self.assertEqual(self.cluster.status, "stopped")
self.assertEqual(self.cluster.status, consts.CLUSTER_STATUSES.stopped)
self.assertEqual(stop_task.progress, 100)
for n in self.cluster.nodes:
@ -79,7 +80,12 @@ class TestStopDeployment(BaseIntegrationTest):
@fake_tasks(fake_rpc=False, mock_rpc=False)
@patch('nailgun.rpc.cast')
def test_admin_ip_in_args(self, mocked_rpc):
self.env.launch_deployment()
deploy_task = self.env.launch_deployment()
provision_task = objects.TaskCollection.filter_by(
None, name=consts.TASK_NAMES.provision,
parent_id=deploy_task.id).first()
provision_task.status = consts.TASK_STATUSES.running
self.env.db.flush()
self.env.stop_deployment()
args, kwargs = nailgun.task.manager.rpc.cast.call_args
for n in args[1]["args"]["nodes"]:
@ -98,6 +104,7 @@ class TestStopDeployment(BaseIntegrationTest):
self.node_uids
)
provision_task_uuid = provision_task.uuid
self.env.wait_until_task_pending(provision_task)
stop_task = self.env.stop_deployment()
self.env.wait_ready(stop_task, 60)
self.assertIsNone(
@ -105,7 +112,7 @@ class TestStopDeployment(BaseIntegrationTest):
uuid=provision_task_uuid
).first()
)
self.assertEqual(self.cluster.status, "stopped")
self.assertEqual(self.cluster.status, consts.CLUSTER_STATUSES.stopped)
self.assertEqual(stop_task.progress, 100)
@patch('nailgun.rpc.cast')

View File

@ -158,7 +158,8 @@ class TestTasksLogging(BaseIntegrationTest):
{"pending_addition": True, "pending_roles": ["compute"]},
]
)
self.env.launch_deployment()
deploy = self.env.launch_deployment()
self.env.wait_until_task_pending(deploy)
self.env.stop_deployment()
self.assertGreaterEqual(len(logger.call_args_list), 1)

View File

@ -64,7 +64,8 @@ class TestTaskManagers(BaseIntegrationTest):
self.assertEqual(supertask.name, TASK_NAMES.deploy)
self.assertIn(
supertask.status,
(TASK_STATUSES.running, TASK_STATUSES.ready)
(TASK_STATUSES.pending, TASK_STATUSES.running,
TASK_STATUSES.ready)
)
# we have three subtasks here
# repository check
@ -263,23 +264,26 @@ class TestTaskManagers(BaseIntegrationTest):
objects.NodeCollection.prepare_for_deployment(cluster_db.nodes)
# First node with status ready
# should not be readeployed
self.env.nodes[0].status = 'ready'
self.env.nodes[0].status = consts.TASK_STATUSES.ready
self.env.nodes[0].pending_addition = False
self.db.commit()
objects.Cluster.clear_pending_changes(cluster_db)
supertask = self.env.launch_deployment()
self.assertEqual(supertask.name, 'deploy')
self.assertIn(supertask.status, ('running', 'ready'))
self.assertEqual(supertask.name, consts.TASK_NAMES.deploy)
self.assertIn(supertask.status, (consts.TASK_STATUSES.pending,
consts.TASK_STATUSES.running,
consts.TASK_STATUSES.ready))
self.assertEqual(self.env.nodes[0].status, 'ready')
self.env.wait_for_nodes_status([self.env.nodes[1]], 'provisioning')
self.assertEqual(self.env.nodes[0].status, consts.TASK_STATUSES.ready)
self.env.wait_for_nodes_status([self.env.nodes[1]],
consts.NODE_STATUSES.provisioning)
self.env.wait_ready(supertask)
self.env.refresh_nodes()
self.assertEqual(self.env.nodes[1].status, 'ready')
self.assertEqual(self.env.nodes[1].status, consts.NODE_STATUSES.ready)
self.assertEqual(self.env.nodes[1].progress, 100)
@fake_tasks()
@ -901,10 +905,11 @@ class TestTaskManagers(BaseIntegrationTest):
with mock.patch('nailgun.task.task.DeploymentTask.message') as \
mocked_task:
self.env.launch_deployment()
_, actual_nodes_to_deploy = mocked_task.call_args[0]
self.assertItemsEqual(expected_nodes_to_deploy,
actual_nodes_to_deploy)
with mock.patch('nailgun.rpc.cast'):
self.env.launch_deployment()
_, actual_nodes_to_deploy = mocked_task.call_args[0]
self.assertItemsEqual(expected_nodes_to_deploy,
actual_nodes_to_deploy)
@fake_tasks()
def test_deployment_on_controller_removal_via_node_deletion(self):

View File

@ -688,11 +688,18 @@ class TestVerifyNeutronVlan(BaseIntegrationTest):
@fake_tasks()
def test_verify_networks_after_stop(self):
self.cluster = self.env.clusters[0]
self.env.launch_deployment()
cluster = self.env.clusters[0]
deploy_task = self.env.launch_deployment()
self.env.wait_until_task_pending(deploy_task)
stop_task = self.env.stop_deployment()
self.env.wait_ready(stop_task, 60)
self.assertEqual(self.cluster.status, consts.CLUSTER_STATUSES.stopped)
self.db.refresh(cluster)
self.assertEqual(cluster.status, consts.CLUSTER_STATUSES.stopped)
# Moving nodes online by hands. Our fake threads do this with
# random success
for node in sorted(cluster.nodes, key=lambda n: n.id):
node.online = True
self.db.commit()
verify_task = self.env.launch_verify_networks()
self.env.wait_ready(verify_task, 60)

View File

@ -18,6 +18,7 @@ from sqlalchemy.exc import DataError
from sqlalchemy.exc import IntegrityError
import uuid
from nailgun import consts
from nailgun.db import db
from nailgun.db import dropdb
from nailgun.db.migration import ALEMBIC_CONFIG
@ -275,3 +276,31 @@ class TestReleaseMigrations(base.BaseAlembicMigrationTest):
for state in states:
self.assertEqual(state, 'manageonly')
class TestTaskStatus(base.BaseAlembicMigrationTest):
def test_pending_status_saving(self):
db.execute(
self.meta.tables['tasks'].insert(),
[
{
'cluster_id': None,
'uuid': 'fake_task_uuid_0',
'name': consts.TASK_NAMES.node_deletion,
'message': None,
'status': consts.TASK_STATUSES.pending,
'progress': 0,
'cache': None,
'result': None,
'parent_id': None,
'weight': 1
}
])
result = db.execute(
sa.select([self.meta.tables['tasks'].c.status]))
for row in result.fetchall():
status = row[0]
self.assertEqual(status, consts.TASK_STATUSES.pending)

View File

@ -18,6 +18,7 @@ from mock import ANY
from mock import patch
from nailgun import consts
from nailgun.errors import errors
from nailgun.objects import Plugin
from nailgun.rpc.receiver import NailgunReceiver
from nailgun.test import base
@ -128,3 +129,28 @@ class TestNailgunReceiver(base.BaseTestCase):
actual_msg,
r'These nodes: "1" failed to '
'connect to some of these repositories: .*')
def test_task_in_orchestrator_task_not_found(self):
resp = {'task_uuid': 'fake_uuid'}
old_status = self.task.status
self.assertNotRaises(errors.ObjectNotFound,
NailgunReceiver.task_in_orchestrator, **resp)
self.db.flush()
self.assertEqual(old_status, self.task.status)
def test_task_in_orchestrator(self):
resp = {'task_uuid': self.task.uuid}
self.task.status = consts.TASK_STATUSES.pending
self.db.flush()
NailgunReceiver.task_in_orchestrator(**resp)
self.assertEqual(consts.TASK_STATUSES.running, self.task.status)
def test_task_in_orchestrator_status_not_changed(self):
resp = {'task_uuid': self.task.uuid}
for status in (consts.TASK_STATUSES.error,
consts.TASK_STATUSES.running,
consts.TASK_STATUSES.ready):
self.task.status = status
self.db.flush()
NailgunReceiver.task_in_orchestrator(**resp)
self.assertEqual(status, self.task.status)

View File

@ -565,3 +565,70 @@ class TestCheckBeforeDeploymentTask(BaseTestCase):
task.CheckBeforeDeploymentTask.\
_check_deployment_graph_for_correctness(
self.task)
class TestDeployTask(BaseTestCase):
def create_deploy_tasks(self):
self.env.create()
cluster = self.env.clusters[0]
deploy_task = Task(name=consts.TASK_NAMES.deploy,
cluster_id=cluster.id,
status=consts.TASK_STATUSES.pending)
self.db.add(deploy_task)
self.db.flush()
provision_task = Task(name=consts.TASK_NAMES.provision,
status=consts.TASK_STATUSES.pending,
parent_id=deploy_task.id, cluster_id=cluster.id)
self.db.add(provision_task)
deployment_task = Task(name=consts.TASK_NAMES.deployment,
status=consts.TASK_STATUSES.pending,
parent_id=deploy_task.id, cluster_id=cluster.id)
self.db.add(deployment_task)
self.db.flush()
return deploy_task, provision_task, deployment_task
def test_running_status_bubble_for_deploy_task(self):
deploy_task, provision_task, deployment_task = \
self.create_deploy_tasks()
objects.Task.update(provision_task,
{'status': consts.TASK_STATUSES.running})
# Only deploy and provision tasks are running now
self.assertEqual(consts.TASK_STATUSES.running, deploy_task.status)
self.assertEqual(consts.TASK_STATUSES.running, provision_task.status)
self.assertEqual(consts.TASK_STATUSES.pending, deployment_task.status)
def test_error_status_bubble_for_deploy_task(self):
deploy_task, provision_task, deployment_task = \
self.create_deploy_tasks()
objects.Task.update(provision_task,
{'status': consts.TASK_STATUSES.error})
# All tasks have error status
self.assertEqual(consts.TASK_STATUSES.error, deploy_task.status)
self.assertEqual(consts.TASK_STATUSES.error, provision_task.status)
self.assertEqual(consts.TASK_STATUSES.error, deployment_task.status)
def test_ready_status_bubble_for_deploy_task(self):
deploy_task, provision_task, deployment_task = \
self.create_deploy_tasks()
objects.Task.update(provision_task,
{'status': consts.TASK_STATUSES.ready})
# Not all child bugs in ready state
self.assertEqual(consts.TASK_STATUSES.running, deploy_task.status)
self.assertEqual(consts.TASK_STATUSES.ready, provision_task.status)
self.assertEqual(consts.TASK_STATUSES.pending, deployment_task.status)
# All child bugs in ready state
objects.Task.update(deployment_task,
{'status': consts.TASK_STATUSES.ready})
self.assertEqual(consts.TASK_STATUSES.ready, deploy_task.status)
self.assertEqual(consts.TASK_STATUSES.ready, provision_task.status)
self.assertEqual(consts.TASK_STATUSES.ready, deployment_task.status)