Deployment graph task parameters are now saved in the Transaction before deployment

The deployment tasks snapshot is now saved to the Transaction.tasks_snapshot
field during DeploymentTask message creation.

Methods:

    * attach_tasks_snapshot
    * get_tasks_snapshot

are added to Transaction model.

The handler /transactions/:transaction_id/deployment_history/
now shows task parameters in its output and supports the
request parameter:
/transactions/:transaction_id/deployment_history/?tasks_names=task1,task2

Change-Id: I59446ef456f7d3bd249c686b7a8e1a93a364daf2
Partial-Bug: #1563317
DocImpact
This commit is contained in:
Ilya Kutukov 2016-04-07 21:59:16 +03:00
parent fffc56e7e3
commit a056929141
11 changed files with 416 additions and 25 deletions

View File

@ -17,32 +17,50 @@ import web
from nailgun.api.v1.handlers import base
from nailgun.api.v1.handlers.base import content
from nailgun.api.v1.validators.deployment_history import \
DeploymentHistoryValidator
from nailgun import errors
from nailgun import objects
class DeploymentHistoryCollectionHandler(base.CollectionHandler):
    """Read-only collection handler for deployment history records.

    NOTE(review): the rendered diff interleaved the pre-change and
    post-change bodies of GET (duplicate docstring line, dead old
    ``node_ids`` parsing and old return path); this is the clean
    post-change version.
    """

    collection = objects.DeploymentHistoryCollection
    validator = DeploymentHistoryValidator

    @content
    def GET(self, transaction_id):
        """:returns: Collection of JSONized DeploymentHistory records.

        :http: * 200 (OK)
               * 400 (Bad tasks in given transaction)
               * 404 (transaction not found in db, task not found in snapshot)
        """
        # get transaction data
        transaction = self.get_object_or_404(
            objects.Transaction, transaction_id)

        # process input parameters
        nodes_ids = web.input(nodes=None).nodes
        statuses = web.input(statuses=None).statuses
        tasks_names = web.input(tasks_names=None).tasks_names

        # validate raw query strings before splitting them into sets
        try:
            self.validator.validate_query(nodes_ids=nodes_ids,
                                          statuses=statuses,
                                          tasks_names=tasks_names)
        except errors.ValidationException as exc:
            raise self.http(400, exc.message)

        if nodes_ids:
            nodes_ids = set(nodes_ids.strip().split(','))
        if statuses:
            statuses = set(statuses.strip().split(','))
        if tasks_names:
            tasks_names = set(tasks_names.strip().split(','))

        # fetch and serialize history
        return self.collection.get_history(transaction=transaction,
                                           nodes_ids=nodes_ids,
                                           statuses=statuses,
                                           tasks_names=tasks_names)

View File

@ -0,0 +1,30 @@
# -*- coding: utf-8 -*-
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from nailgun.api.v1.validators.base import BasicValidator
from nailgun import consts
from nailgun import errors
class DeploymentHistoryValidator(BasicValidator):
    """Validates query parameters of the deployment history handler."""

    @classmethod
    def validate_query(cls, nodes_ids, statuses, tasks_names):
        """Check the raw query-string filters.

        Only ``statuses`` is actually validated against the known
        history task statuses; ``nodes_ids`` and ``tasks_names`` are
        accepted as-is.

        :raises: errors.ValidationException on an unknown status
        """
        if not statuses:
            return
        requested = set(statuses.strip().split(','))
        allowed = set(consts.HISTORY_TASK_STATUSES)
        if not requested.issubset(allowed):
            raise errors.ValidationException(
                "Statuses parameter could be only: {}".format(
                    ", ".join(consts.HISTORY_TASK_STATUSES))
            )

View File

@ -19,6 +19,7 @@ Revises: 675105097a69
Create Date: 2016-04-08 15:20:43.989472
"""
from nailgun.db.sqlalchemy.models import fields
from alembic import op
import sqlalchemy as sa
@ -30,11 +31,13 @@ down_revision = '675105097a69'
def upgrade():
    # Forward migration: add the tasks.tasks_snapshot column, then
    # apply the plugin links constraints.
    upgrade_tasks_snapshot()
    upgrade_plugin_links_constraints()


def downgrade():
    # Reverse migration: undo the upgrade() steps in the opposite order.
    downgrade_plugin_links_constraints()
    downgrade_tasks_snapshot()
def upgrade_plugin_links_constraints():
@ -80,3 +83,18 @@ def downgrade_plugin_links_constraints():
'cluster_plugin_links')
op.drop_constraint('plugin_links_url_uc', 'plugin_links')
def upgrade_tasks_snapshot():
    """Add the nullable JSON ``tasks_snapshot`` column to ``tasks``."""
    snapshot_column = sa.Column(
        'tasks_snapshot',
        fields.JSON(),
        nullable=True
    )
    op.add_column('tasks', snapshot_column)


def downgrade_tasks_snapshot():
    """Drop the column added by upgrade_tasks_snapshot()."""
    op.drop_column('tasks', 'tasks_snapshot')

View File

@ -32,6 +32,7 @@ from nailgun.db import db
from nailgun.db.sqlalchemy.models.base import Base
from nailgun.db.sqlalchemy.models.fields import JSON
from nailgun.db.sqlalchemy.models.mutable import MutableDict
from nailgun.db.sqlalchemy.models.mutable import MutableList
class Task(Base):
@ -83,6 +84,9 @@ class Task(Base):
network_settings = deferred(Column(MutableDict.as_mutable(JSON),
nullable=True))
tasks_snapshot = deferred(Column(MutableList.as_mutable(JSON),
nullable=True))
deployment_history = relationship(
"DeploymentHistory", backref="task", cascade="all,delete")

View File

@ -13,8 +13,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
from datetime import datetime
from nailgun.consts import HISTORY_TASK_STATUSES
@ -26,6 +25,7 @@ from nailgun.objects import NailgunCollection
from nailgun.objects import NailgunObject
from nailgun.objects.serializers.deployment_history \
import DeploymentHistorySerializer
from nailgun.objects import Transaction
from nailgun.logger import logger
@ -135,11 +135,58 @@ class DeploymentHistoryCollection(NailgunCollection):
db().bulk_save_objects(entries)
@classmethod
def get_history(cls, transaction, nodes_ids=None, statuses=None,
                tasks_names=None):
    """Get deployment tasks history.

    NOTE(review): the rendered diff interleaved the pre-change
    signature/body and a stray old ``return query``; this is the clean
    post-change version.

    :param transaction: task SQLAlchemy object
    :type transaction: models.Task
    :param nodes_ids: filter by node IDs
    :type nodes_ids: list[int]|None
    :param statuses: filter by statuses
    :type statuses: list[basestring]|None
    :param tasks_names: filter by deployment graph task names
    :type tasks_names: list[basestring]|None
    :returns: tasks history
    :rtype: list[dict]
    """
    query = cls.filter_by(None, task_id=transaction.id)
    if nodes_ids:
        query = query.filter(cls.single.model.node_id.in_(nodes_ids))
    if statuses:
        query = query.filter(cls.single.model.status.in_(statuses))
    if tasks_names:
        query = query.filter(
            cls.single.model.deployment_graph_task_name.in_(tasks_names))

    # deepcopy so that merging snapshot parameters below does not
    # mutate the serializer's cached structures
    history = copy.deepcopy(cls.to_list(query))

    # rename task id to conventional field
    for record in history:
        record['task_name'] = record.pop(
            'deployment_graph_task_name', None)

    tasks_snapshot = Transaction.get_tasks_snapshot(transaction)
    if tasks_snapshot:
        task_by_name = {}
        for task in tasks_snapshot:
            # remove ambiguous id field
            task.pop('id', None)
            task_by_name[task['task_name']] = task
        for history_record in history:
            task_name = history_record['task_name']
            try:
                task_parameters = task_by_name[task_name]
                history_record.update(task_parameters)
            except KeyError:
                logger.warning(
                    'Definition of "{0}" task is not found'.format(
                        task_name))
    else:
        logger.warning('No tasks snapshot is defined in given '
                       'transaction, probably it is a legacy '
                       '(Fuel<10.0) or malformed.')
    return history

View File

@ -70,6 +70,15 @@ class Transaction(NailgunObject):
if instance is not None:
return instance.cluster_settings
@classmethod
def attach_tasks_snapshot(cls, instance, tasks_snapshot):
    """Store the deployment graph tasks snapshot on the transaction.

    :param instance: Transaction (Task) model instance
    :param tasks_snapshot: serialized deployment graph tasks
    """
    instance.tasks_snapshot = tasks_snapshot
@classmethod
def get_tasks_snapshot(cls, instance):
    """Return the tasks snapshot stored on *instance*, or None.

    :param instance: Transaction (Task) model instance or None
    """
    return instance.tasks_snapshot if instance is not None else None
class TransactionCollection(NailgunCollection):

View File

@ -242,10 +242,15 @@ class DeploymentTask(BaseDeploymentTask):
deployment_tasks = objects.Cluster.get_deployment_tasks(
task.cluster, graph_type
)
objects.Transaction.attach_tasks_snapshot(
task,
deployment_tasks
)
# update all puppet tasks with puppet_debug value from settings
settings = objects.Cluster.get_editable_attributes(task.cluster)
puppet_debug = settings['common']['puppet_debug']['value']
cluster_settings = objects.Cluster.get_editable_attributes(
task.cluster)
puppet_debug = cluster_settings['common']['puppet_debug']['value']
for deploy_task in deployment_tasks:
if deploy_task['type'] == consts.ORCHESTRATOR_TASK_TYPES.puppet:
logger.debug("Update puppet task: %s with debug=%s",
@ -273,6 +278,7 @@ class DeploymentTask(BaseDeploymentTask):
task,
objects.Cluster.get_network_attributes(task.cluster)
)
rpc_message = make_astute_message(
task,
deployment_mode,

View File

@ -0,0 +1,237 @@
# -*- coding: utf-8 -*-
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import six
from nailgun import consts
from nailgun.test.base import BaseIntegrationTest
from nailgun.test.base import mock_rpc
from nailgun.utils import reverse
class TestDeploymentHistoryHandlers(BaseIntegrationTest):
    """Integration tests for the deployment history collection handler.

    NOTE(review): the original decorators patched
    'objects.Cluster.get_deployment_tasks', which is not an importable
    dotted path (this module imports ``from nailgun import objects``;
    compare the full 'nailgun.task.task.rpc.cast' target used elsewhere
    in this change). ``mock.patch`` resolves its target by import, so
    the short path would fail at test run; fixed to the full path.
    """

    def setUp(self):
        super(TestDeploymentHistoryHandlers, self).setUp()
        # Two synthetic graph tasks returned by the mocked
        # Cluster.get_deployment_tasks in every test below.
        self.tasks_amt = 2
        self.test_tasks = [
            {
                'id': 'test{}'.format(task_no),
                'task_name': 'test{}'.format(task_no),
                'parameters': {'param1': 'value1'},
                'type': 'puppet',
                'roles': '*',
                'version': '2.1.0',
                'requires': ['pre_deployment_end']
            } for task_no in six.moves.range(1, 1 + self.tasks_amt)
        ]
        self.cluster_parameters = {
            'nodes_kwargs': [
                {
                    'roles': ['controller'],
                    'pending_addition': True
                },
            ],
            'release_kwargs': {
                'operating_system': consts.RELEASE_OS.ubuntu,
                'version': 'newton-10.0'
            }
        }

    @mock_rpc()
    @mock.patch('nailgun.objects.Cluster.get_deployment_tasks')
    def test_history_collection_handler(self, tasks_mock):
        tasks_mock.return_value = self.test_tasks
        cluster = self.env.create(**self.cluster_parameters)

        supertask = self.env.launch_deployment(cluster.id)
        self.assertNotEqual(consts.TASK_STATUSES.error, supertask.status)
        deployment_task = next(
            t for t in supertask.subtasks
            if t.name == consts.TASK_NAMES.deployment
        )

        response = self.app.get(
            reverse(
                'DeploymentHistoryCollectionHandler',
                kwargs={
                    'transaction_id': deployment_task.id
                }
            ),
            headers=self.default_headers
        )
        # every (node, task) pair appears once, with task parameters
        # unwrapped from the snapshot into the history record
        self.assertItemsEqual(
            [
                {
                    'task_name': 'test{}'.format(task_no),
                    'parameters': {'param1': 'value1'},
                    'roles': '*',
                    'type': 'puppet',
                    'version': '2.1.0',
                    'requires': ['pre_deployment_end'],
                    'node_id': node.uid,
                    'status': 'pending',
                    'time_start': None,
                    'time_end': None,
                    'custom': {}
                }
                for node in cluster.nodes
                for task_no in six.moves.range(1, 1 + self.tasks_amt)
            ],
            response.json_body
        )

    @mock_rpc()
    @mock.patch('nailgun.objects.Cluster.get_deployment_tasks')
    def test_history_task_handler(self, tasks_mock):
        tasks_mock.return_value = self.test_tasks
        cluster = self.env.create(**self.cluster_parameters)

        supertask = self.env.launch_deployment(cluster.id)
        self.assertNotEqual(consts.TASK_STATUSES.error, supertask.status)
        deployment_task = next(
            t for t in supertask.subtasks
            if t.name == consts.TASK_NAMES.deployment
        )

        # filter the history down to a single graph task by name
        response = self.app.get(
            reverse(
                'DeploymentHistoryCollectionHandler',
                kwargs={
                    'transaction_id': deployment_task.id
                }
            ) + '?tasks_names=test1',
            headers=self.default_headers
        )
        self.assertItemsEqual(
            [{
                'task_name': 'test1',
                'parameters': {'param1': 'value1'},
                'roles': '*',
                'type': 'puppet',
                'version': '2.1.0',
                'requires': ['pre_deployment_end'],
                'node_id': node.uid,
                'status': 'pending',
                'time_start': None,
                'time_end': None,
                'custom': {}
            } for node in cluster.nodes],
            response.json_body
        )

    @mock_rpc()
    @mock.patch('nailgun.objects.Cluster.get_deployment_tasks')
    def test_history_task_not_found_returns_empty(self, tasks_mock):
        tasks_mock.return_value = self.test_tasks
        cluster = self.env.create(**self.cluster_parameters)

        supertask = self.env.launch_deployment(cluster.id)
        self.assertNotEqual(consts.TASK_STATUSES.error, supertask.status)
        deployment_task = next(
            t for t in supertask.subtasks
            if t.name == consts.TASK_NAMES.deployment
        )

        # unknown task name is not an error: it yields an empty result
        response = self.app.get(
            reverse(
                'DeploymentHistoryCollectionHandler',
                kwargs={
                    'transaction_id': deployment_task.id
                }
            ) + '?tasks_names=NOSUCHTASK',
            headers=self.default_headers
        )
        self.assertEqual(200, response.status_code)
        self.assertEqual([], response.json_body)

    @mock_rpc()
    @mock.patch('nailgun.objects.Cluster.get_deployment_tasks')
    def test_history_task_handler_work_without_snapshot(self, tasks_mock):
        """History handler degrades gracefully without a tasks snapshot.

        If no valid graph snapshot is attached to the transaction, the
        output falls back to the old history format without unwrapped
        task parameters.
        """
        tasks_mock.return_value = self.test_tasks
        cluster = self.env.create(**self.cluster_parameters)

        supertask = self.env.launch_deployment(cluster.id)
        self.assertNotEqual(consts.TASK_STATUSES.error, supertask.status)
        deployment_task = next(
            t for t in supertask.subtasks
            if t.name == consts.TASK_NAMES.deployment
        )
        # simulate a legacy/malformed transaction with no snapshot
        deployment_task.tasks_snapshot = None
        self.db.flush()

        response = self.app.get(
            reverse(
                'DeploymentHistoryCollectionHandler',
                kwargs={
                    'transaction_id': deployment_task.id
                }
            ) + '?tasks_names=test1,nosuchtask',
            headers=self.default_headers
        )
        self.assertEqual(200, response.status_code)
        self.assertItemsEqual(
            [{
                'task_name': 'test1',
                'node_id': node.uid,
                'status': 'pending',
                'time_start': None,
                'time_end': None,
                'custom': {}
            } for node in cluster.nodes],
            response.json_body
        )

    @mock_rpc()
    @mock.patch('nailgun.objects.Cluster.get_deployment_tasks')
    def test_history_task_with_bad_status_param(self, tasks_mock):
        tasks_mock.return_value = self.test_tasks
        cluster = self.env.create(**self.cluster_parameters)

        supertask = self.env.launch_deployment(cluster.id)
        self.assertNotEqual(consts.TASK_STATUSES.error, supertask.status)
        deployment_task = next(
            t for t in supertask.subtasks
            if t.name == consts.TASK_NAMES.deployment
        )

        # an unknown status is rejected by the validator with HTTP 400
        response = self.app.get(
            reverse(
                'DeploymentHistoryCollectionHandler',
                kwargs={
                    'transaction_id': deployment_task.id
                }
            ) + '?statuses=NOTEXISTINGTYPE',
            headers=self.default_headers,
            expect_errors=True
        )
        self.assertEqual(400, response.status_code)
        self.assertEqual("Statuses parameter could be only: pending, ready, "
                         "running, error, skipped",
                         response.json_body['message'])

View File

@ -126,6 +126,9 @@ class TestTaskManagers(BaseIntegrationTest):
objects.Cluster.get_network_attributes(cluster),
objects.Transaction.get_network_settings(deployment_task),
)
self.assertEqual(
len(objects.Transaction.get_tasks_snapshot(deployment_task)),
len(objects.Cluster.get_deployment_tasks(cluster)))
@mock.patch('nailgun.task.task.rpc.cast')
def test_deployment_info_saves_in_transaction(self, _):

View File

@ -76,17 +76,17 @@ class TestDeploymentHistoryObject(base.BaseTestCase):
def test_deployment_history_create(self):
    """History records are created for every task in the graph.

    NOTE(review): the rendered diff interleaved the pre-change lines
    (the stale ``.all()`` call and attribute-style record access) with
    the new dict-based API; this is the clean post-change version.
    """
    histories = deployment_history.DeploymentHistoryCollection.\
        get_history(self.task)
    self.assertEqual(len(histories), 4)
    # get_history returns plain dicts with the conventional
    # 'task_name' key instead of model objects
    db_task_names = {h['task_name'] for h in histories}
    input_task_names = set()
    for node in TASKS_GRAPH:
        for task in TASKS_GRAPH[node]:
            input_task_names.add(task['id'])
    self.assertEqual(len(db_task_names & input_task_names), 4)
    self.assertEqual(histories[0]['status'],
                     consts.HISTORY_TASK_STATUSES.pending)
def test_deployment_history_update_if_exist(self):

View File

@ -235,3 +235,22 @@ class TestPluginLinksConstraints(base.BaseAlembicMigrationTest):
[sa.func.count(self.meta.tables['cluster_plugin_links'].c.id)]
)).fetchone()[0]
self.assertEqual(links_count, 2)
class TestTasksSnapshotField(base.BaseAlembicMigrationTest):
    """Migration test for the new ``tasks.tasks_snapshot`` column."""

    def test_fields_exist(self):
        """A row with tasks_snapshot can be inserted and read back."""
        tasks_table = self.meta.tables['tasks']
        db.execute(
            tasks_table.insert(),
            [{
                'uuid': 'fake_task_uuid_0',
                'name': 'dump',
                'status': 'pending',
                'tasks_snapshot': '[{"id":"taskid","type":"puppet"}]'
            }]
        )
        row = db.execute(
            sa.select([
                tasks_table.c.tasks_snapshot,
            ])
        ).first()
        self.assertIsNotNone(row['tasks_snapshot'])