Store replaced info on node instead of cluster

- add necessery columns to node model
- add replace/get provisioning/deployment info on objects.Cluster
- api interaction with cli is not changed
- added migration from old data model to new

DeploymentInfo treated everywhere as list, because it will consist of
- multiple yaml files per node, on role basis

ProvisioningInfo treated as dict
- one yaml for cluster section (engine)
- yaml per node with provisioning params of this node

It is possible to manage cluster from UI and cli

Change-Id: I9345a5e9adadead2c149e85fab139ae4e5615cf1
Closes-Bug: #1280318
This commit is contained in:
Dima Shulyak 2014-08-06 18:48:44 +03:00
parent a95374adf2
commit 4cd225ca9d
15 changed files with 292 additions and 59 deletions

View File

@ -75,7 +75,8 @@ class DefaultOrchestratorInfo(NodesFilterMixin, BaseHandler):
"""
cluster = self.get_object_or_404(objects.Cluster, cluster_id)
nodes = self.get_nodes(cluster)
return self._serializer.serialize(cluster, nodes)
return self._serializer.serialize(
cluster, nodes, ignore_customized=True)
class OrchestratorInfo(BaseHandler):
@ -149,21 +150,19 @@ class DefaultDeploymentInfo(DefaultOrchestratorInfo):
class ProvisioningInfo(OrchestratorInfo):
def get_orchestrator_info(self, cluster):
return cluster.replaced_provisioning_info
return objects.Cluster.get_provisioning_info(cluster)
def update_orchestrator_info(self, cluster, data):
cluster.replace_provisioning_info(data)
return cluster.replaced_provisioning_info
return objects.Cluster.replace_provisioning_info(cluster, data)
class DeploymentInfo(OrchestratorInfo):
def get_orchestrator_info(self, cluster):
return cluster.replaced_deployment_info
return objects.Cluster.get_deployment_info(cluster)
def update_orchestrator_info(self, cluster, data):
cluster.replace_deployment_info(data)
return cluster.replaced_deployment_info
return objects.Cluster.replace_deployment_info(cluster, data)
class SelectedNodesBase(NodesFilterMixin, BaseHandler):

View File

@ -31,6 +31,7 @@ from nailgun import consts
from nailgun.db.sqlalchemy.models.fields import JSON
from nailgun.openstack.common import jsonutils
from nailgun.utils.migration import drop_enum
from nailgun.utils.migration import upgrade_clusters_replaced_info
from nailgun.utils.migration import upgrade_enum
from nailgun.utils.migration import upgrade_release_attributes_50_to_51
from nailgun.utils.migration import upgrade_release_roles_50_to_51
@ -173,6 +174,10 @@ def upgrade_schema():
op.drop_table('red_hat_accounts')
drop_enum('license_type')
op.add_column('nodes', sa.Column(
'replaced_deployment_info', JSON(), nullable=True))
op.add_column('nodes', sa.Column(
'replaced_provisioning_info', JSON(), nullable=True))
### end Alembic commands ###
@ -180,6 +185,11 @@ def upgrade_data():
connection = op.get_bind()
# upgrade release data from 5.0 to 5.1
upgrade_releases(connection)
upgrade_clusters_replaced_info(connection)
def upgrade_releases(connection):
select = text(
"""SELECT id, attributes_metadata, roles_metadata
from releases""")
@ -215,6 +225,8 @@ def downgrade():
def downgrade_schema():
### commands auto generated by Alembic - please adjust! ###
op.drop_column('nodes', 'replaced_provisioning_info')
op.drop_column('nodes', 'replaced_deployment_info')
upgrade_enum(
"neutron_config", # table
"net_l23_provider", # column

View File

@ -99,16 +99,6 @@ class Cluster(Base):
is_customized = Column(Boolean, default=False)
fuel_version = Column(Text, nullable=False)
def replace_provisioning_info(self, data):
self.replaced_provisioning_info = data
self.is_customized = True
return self.replaced_provisioning_info
def replace_deployment_info(self, data):
self.replaced_deployment_info = data
self.is_customized = True
return self.replaced_deployment_info
@property
def changes(self):
return [

View File

@ -119,6 +119,8 @@ class Node(Base):
agent_checksum = Column(String(40), nullable=True)
ip_addrs = relationship("IPAddr", viewonly=True)
replaced_deployment_info = Column(JSON, default=[])
replaced_provisioning_info = Column(JSON, default={})
@property
def interfaces(self):

View File

@ -364,6 +364,8 @@ class Cluster(NailgunObject):
net_manager.clear_assigned_networks,
nodes_to_remove
)
cls.replace_provisioning_info_on_nodes(instance, [], nodes_to_remove)
cls.replace_deployment_info_on_nodes(instance, [], nodes_to_remove)
map(
net_manager.assign_networks_by_default,
nodes_to_add
@ -402,6 +404,54 @@ class Cluster(NailgunObject):
)
return nics_db.union(bonds_db)
@classmethod
def replace_provisioning_info_on_nodes(cls, instance, data, nodes):
for node in nodes:
node_data = next((n for n in data if node.uid == n['uid']), {})
node.replaced_provisioning_info = node_data
@classmethod
def replace_deployment_info_on_nodes(cls, instance, data, nodes):
for node in instance.nodes:
node_data = [n for n in data if node.uid == n['uid']]
node.replaced_deployment_info = node_data
@classmethod
def replace_provisioning_info(cls, instance, data):
received_nodes = data.pop('nodes', [])
instance.is_customized = True
instance.replaced_provisioning_info = data
cls.replace_provisioning_info_on_nodes(
instance, received_nodes, instance.nodes)
return cls.get_provisioning_info(instance)
@classmethod
def replace_deployment_info(cls, instance, data):
instance.is_customized = True
cls.replace_deployment_info_on_nodes(instance, data, instance.nodes)
return cls.get_deployment_info(instance)
@classmethod
def get_provisioning_info(cls, instance):
data = {}
if instance.replaced_provisioning_info:
data.update(instance.replaced_provisioning_info)
nodes = []
for node in instance.nodes:
if node.replaced_provisioning_info:
nodes.append(node.replaced_provisioning_info)
if data:
data['nodes'] = nodes
return data
@classmethod
def get_deployment_info(cls, instance):
data = []
for node in instance.nodes:
if node.replaced_deployment_info:
data.extend(node.replaced_deployment_info)
return data
class ClusterCollection(NailgunCollection):
"""Cluster collection

View File

@ -617,6 +617,7 @@ class Node(NailgunObject):
netmanager.clear_bond_configuration(instance)
cls.update_roles(instance, [])
cls.update_pending_roles(instance, [])
cls.remove_replaced_params(instance)
instance.cluster_id = None
instance.kernel_params = None
instance.reset_name_to_default()
@ -649,6 +650,11 @@ class Node(NailgunObject):
return (instance.kernel_params or
Cluster.get_default_kernel_params(instance.cluster))
@classmethod
def remove_replaced_params(cls, instance):
instance.replaced_deployment_info = []
instance.replaced_provisioning_info = {}
class NodeCollection(NailgunCollection):
"""Node collection

View File

@ -17,6 +17,7 @@
"""Deployment serializers for orchestrator"""
from copy import deepcopy
from itertools import groupby
from netaddr import IPNetwork
from sqlalchemy import and_
@ -72,10 +73,22 @@ class DeploymentMultinodeSerializer(object):
critical_roles = ['controller', 'ceph-osd', 'primary-mongo']
@classmethod
def serialize(cls, cluster, nodes):
def serialize(cls, cluster, nodes, ignore_customized=False):
"""Method generates facts which
through an orchestrator passes to puppet
"""
serialized_nodes = []
keyfunc = lambda node: bool(node.replaced_deployment_info)
for customized, node_group in groupby(nodes, keyfunc):
if customized and not ignore_customized:
serialized_nodes.extend(cls.serialize_customized(node_group))
else:
serialized_nodes.extend(cls.serialize_generated(
cluster, node_group))
return serialized_nodes
@classmethod
def serialize_generated(cls, cluster, nodes):
nodes = cls.serialize_nodes(nodes)
common_attrs = cls.get_common_attrs(cluster)
@ -84,6 +97,13 @@ class DeploymentMultinodeSerializer(object):
return [dict_merge(node, common_attrs) for node in nodes]
@classmethod
def serialize_customized(self, nodes):
serialized = []
for node in nodes:
serialized.extend(node.replaced_deployment_info)
return serialized
@classmethod
def get_common_attrs(cls, cluster):
"""Cluster attributes."""
@ -1024,7 +1044,7 @@ class NeutronNetworkDeploymentSerializer(NetworkDeploymentSerializer):
return iface_attrs
def serialize(cluster, nodes):
def serialize(cluster, nodes, ignore_customized=False):
"""Serialization depends on deployment mode
"""
objects.NodeCollection.prepare_for_deployment(cluster.nodes)
@ -1034,4 +1054,5 @@ def serialize(cluster, nodes):
elif cluster.is_ha_mode:
serializer = DeploymentHASerializer
return serializer.serialize(cluster, nodes)
return serializer.serialize(
cluster, nodes, ignore_customized=ignore_customized)

View File

@ -16,6 +16,8 @@
"""Provisioning serializers for orchestrator"""
from itertools import groupby
from nailgun import objects
import netaddr
@ -27,31 +29,48 @@ class ProvisioningSerializer(object):
"""Provisioning serializer"""
@classmethod
def serialize(cls, cluster, nodes):
def serialize(cls, cluster, nodes, ignore_customized=False):
"""Serialize cluster for provisioning."""
cluster_attrs = objects.Attributes.merged_attrs_values(
cluster.attributes
)
serialized_nodes = cls.serialize_nodes(cluster_attrs, nodes)
serialized_nodes = []
keyfunc = lambda node: bool(node.replaced_provisioning_info)
for customized, node_group in groupby(nodes, keyfunc):
if customized and not ignore_customized:
serialized_nodes.extend(cls.serialize_customized(node_group))
else:
serialized_nodes.extend(
cls.serialize_nodes(cluster_attrs, node_group))
serialized_info = (cluster.replaced_provisioning_info or
cls.serialize_cluster_info(cluster))
serialized_info['nodes'] = serialized_nodes
return serialized_info
@classmethod
def serialize_cluster_info(cls, cluster):
return {
'engine': {
'url': settings.COBBLER_URL,
'username': settings.COBBLER_USER,
'password': settings.COBBLER_PASSWORD,
'master_ip': settings.MASTER_IP,
},
'nodes': serialized_nodes}
}}
@classmethod
def serialize_customized(self, nodes):
serialized = []
for node in nodes:
serialized.append(node.replaced_provisioning_info)
return serialized
@classmethod
def serialize_nodes(cls, cluster_attrs, nodes):
"""Serialize nodes."""
serialized_nodes = []
for node in nodes:
serialized_node = cls.serialize_node(cluster_attrs, node)
serialized_nodes.append(serialized_node)
serialized_nodes.append(cls.serialize_node(cluster_attrs, node))
return serialized_nodes
@classmethod
@ -183,8 +202,9 @@ class ProvisioningSerializer(object):
return settings.PATH_TO_SSH_KEY
def serialize(cluster, nodes):
def serialize(cluster, nodes, ignore_customized=False):
"""Serialize cluster for provisioning."""
objects.NodeCollection.prepare_for_provisioning(nodes)
return ProvisioningSerializer.serialize(cluster, nodes)
return ProvisioningSerializer.serialize(
cluster, nodes, ignore_customized=ignore_customized)

View File

@ -487,7 +487,6 @@ class NailgunReceiver(object):
message,
task.cluster_id
)
data = {'status': status, 'progress': progress, 'message': message}
objects.Task.update(task, data)

View File

@ -147,8 +147,9 @@ class ApplyChangesTaskManager(TaskManager):
# Run validation if user didn't redefine
# provisioning and deployment information
if not self.cluster.replaced_provisioning_info and \
not self.cluster.replaced_deployment_info:
if (not objects.Cluster.get_provisioning_info(self.cluster) and
not objects.Cluster.get_deployment_info(self.cluster)):
try:
self.check_before_deployment(supertask)
except errors.CheckBeforeDeploymentError:

View File

@ -135,8 +135,8 @@ class DeploymentTask(object):
db().flush()
# here we replace deployment data if user redefined them
serialized_cluster = task.cluster.replaced_deployment_info or \
deployment_serializers.serialize(task.cluster, nodes)
serialized_cluster = deployment_serializers.serialize(
task.cluster, nodes)
# After serialization set pending_addition to False
for node in nodes:
@ -167,8 +167,8 @@ class UpdateTask(object):
n.progress = 0
# here we replace deployment data if user redefined them
serialized_cluster = task.cluster.replaced_deployment_info or \
deployment_serializers.serialize(task.cluster, nodes)
serialized_cluster = deployment_serializers.serialize(
task.cluster, nodes)
# After serialization set pending_addition to False
for node in nodes:
@ -196,9 +196,8 @@ class ProvisionTask(object):
lock_for_update=True
)
objects.NodeCollection.lock_nodes(nodes_to_provisioning)
serialized_cluster = task.cluster.replaced_provisioning_info or \
provisioning_serializers.serialize(
task.cluster, nodes_to_provisioning)
serialized_cluster = provisioning_serializers.serialize(
task.cluster, nodes_to_provisioning)
for node in nodes_to_provisioning:
if settings.FAKE_TASKS or settings.FAKE_TASKS_AMQP:

View File

@ -200,22 +200,33 @@ class TestHandlers(BaseIntegrationTest):
nodes_kwargs=[
{'roles': ['controller'], 'pending_addition': True},
{'roles': ['compute'], 'pending_addition': True}])
cluster = self.env.clusters[0]
new_deployment_info = {"field": "deployment_info"}
new_provisioning_info = {"field": "provisioning_info"}
# assigning facts to cluster
cluster.replaced_deployment_info = new_deployment_info
cluster.replaced_provisioning_info = new_provisioning_info
self.db.commit()
new_deployment_info = []
new_provisioning_info = {'engine': {}}
nodes = []
self.env.clusters[0].replaced_provisioning_info = new_provisioning_info
self.db.flush()
for node in self.env.nodes:
node.replaced_deployment_info = [{
"field": "deployment_info",
"uid": node.uid
}]
new_deployment_info.extend(node.replaced_deployment_info)
node.replaced_provisioning_info = {
"field": "provisioning_info",
"uid": node.uid
}
nodes.append(
node.replaced_provisioning_info)
new_provisioning_info['nodes'] = nodes
self.env.launch_deployment()
# intercepting arguments with which rpc.cast was called
args, kwargs = nailgun.task.manager.rpc.cast.call_args
self.datadiff(
new_provisioning_info, args[1][0]['args']['provisioning_info'])
new_provisioning_info,
args[1][0]['args']['provisioning_info'])
self.datadiff(
new_deployment_info, args[1][1]['args']['deployment_info'])
new_deployment_info,
args[1][1]['args']['deployment_info'])
def test_cluster_generated_data_handler(self):
self.env.create(

View File

@ -16,6 +16,8 @@
import nailgun
from nailgun import objects
from mock import patch
from nailgun.db.sqlalchemy.models import Cluster
@ -35,9 +37,9 @@ class TestOrchestratorInfoHandlers(BaseIntegrationTest):
super(TestOrchestratorInfoHandlers, self).setUp()
self.cluster = self.env.create_cluster(api=False)
def check_info_handler(self, handler_name, get_info):
def check_info_handler(
self, handler_name, get_info, orchestrator_data, default=[]):
# updating provisioning info
orchestrator_data = {"field": "test"}
put_resp = self.app.put(
reverse(handler_name,
kwargs={'cluster_id': self.cluster.id}),
@ -63,17 +65,28 @@ class TestOrchestratorInfoHandlers(BaseIntegrationTest):
headers=self.default_headers)
self.assertEqual(delete_resp.status_code, 202)
self.assertEqual(get_info(), {})
self.assertEqual(get_info(), default)
def test_cluster_provisioning_info(self):
orchestrator_data = {'engine': {}, 'nodes': []}
for node in self.env.nodes:
orchestrator_data['nodes'].append(
{"field": "test", "uid": node.uid})
self.check_info_handler(
'ProvisioningInfo',
lambda: self.cluster.replaced_provisioning_info)
lambda: objects.Cluster.get_provisioning_info(self.cluster),
orchestrator_data,
default={})
def test_cluster_deployment_info(self):
orchestrator_data = []
for node in self.env.nodes:
orchestrator_data.append({"field": "test", "uid": node.uid})
self.check_info_handler(
'DeploymentInfo',
lambda: self.cluster.replaced_deployment_info)
lambda: objects.Cluster.get_deployment_info(self.cluster),
orchestrator_data)
class TestDefaultOrchestratorInfoHandlers(BaseIntegrationTest):
@ -89,8 +102,7 @@ class TestDefaultOrchestratorInfoHandlers(BaseIntegrationTest):
self.cluster = self.db.query(Cluster).get(cluster['id'])
def customization_handler_helper(self, handler_name, get_info):
facts = {"key": "value"}
def customization_handler_helper(self, handler_name, get_info, facts):
resp = self.app.put(
reverse(handler_name,
kwargs={'cluster_id': self.cluster.id}),
@ -147,15 +159,25 @@ class TestDefaultOrchestratorInfoHandlers(BaseIntegrationTest):
self.assertItemsEqual(actual_uids, node_ids)
def test_cluster_provisioning_customization(self):
facts = {'engine': {'1': '2'}}
nodes = []
for node in self.env.nodes:
nodes.append({"key": "value", "uid": node.uid})
facts['nodes'] = nodes
self.customization_handler_helper(
'ProvisioningInfo',
lambda: self.cluster.replaced_provisioning_info
lambda: objects.Cluster.get_provisioning_info(self.cluster),
facts
)
def test_cluster_deployment_customization(self):
facts = []
for node in self.env.nodes:
facts.append({"key": "value", "uid": node.uid})
self.customization_handler_helper(
'DeploymentInfo',
lambda: self.cluster.replaced_deployment_info
lambda: objects.Cluster.get_deployment_info(self.cluster),
facts
)
def test_deployment_with_one_compute_node(self):

View File

@ -0,0 +1,58 @@
# Copyright 2014 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from nailgun.test.base import BaseIntegrationTest
from nailgun.utils.migration import upgrade_clusters_replaced_info
class TestReplacedDataMigration(BaseIntegrationTest):
def setUp(self):
super(TestReplacedDataMigration, self).setUp()
self.env.create(
nodes_kwargs=[
{'roles': ['controller'], 'pending_addition': True},
{'roles': ['controller', 'cinder'], 'pending_addition': True},
]
)
self.cluster = self.env.clusters[0]
self.nodes = self.env.nodes
self.deployment_info = []
self.provisioning_info = {'nodes': [], 'engine': {'custom': 'type'}}
for node in self.env.nodes:
self.deployment_info.append({'uid': node.uid, 'type': 'deploy'})
self.provisioning_info['nodes'].append(
{'uid': node.uid, 'type': 'provision'})
self.cluster.replaced_deployment_info = self.deployment_info
self.cluster.replaced_provisioning_info = self.provisioning_info
self.db.commit()
self.provisioning_nodes = self.provisioning_info.pop('nodes')
def test_migration_passed_successfully(self):
connection = self.db.connection()
upgrade_clusters_replaced_info(connection)
self.assertEqual(self.provisioning_info,
self.cluster.replaced_provisioning_info)
self.assertEqual(self.cluster.replaced_deployment_info, {})
for node in self.nodes:
self.assertEqual(
node.replaced_deployment_info,
[n for n in self.deployment_info if n['uid'] == node.uid]
)
self.assertEqual(
node.replaced_provisioning_info,
next(n for n in self.provisioning_nodes
if n['uid'] == node.uid)
)

View File

@ -16,9 +16,11 @@ from alembic import op
import os
import six
import sqlalchemy as sa
from sqlalchemy.sql import text
import yaml
from nailgun.db.sqlalchemy.fixman import load_fixture
from nailgun.openstack.common import jsonutils
def upgrade_enum(table, column_name, enum_name, old_options, new_options):
@ -120,3 +122,44 @@ def upgrade_release_wizard_metadata_50_to_51(fixture_path=None):
]
return wizard_meta
def upgrade_clusters_replaced_info(connection):
select = text(
"""SELECT id, replaced_provisioning_info, replaced_deployment_info
FROM clusters""")
clusters = connection.execute(select)
for cluster in clusters:
nodes_select = text(
"""SELECT id FROM nodes WHERE cluster_id=:id""")
nodes = connection.execute(
nodes_select,
id=cluster[0])
provisioning_info = jsonutils.loads(cluster[1])
deployment_nodes = jsonutils.loads(cluster[2])
provisioning_nodes = provisioning_info.pop('nodes', [])
for node in nodes:
node_deploy = [d for d in deployment_nodes
if d['uid'] == str(node[0])]
node_provision = next((d for d in provisioning_nodes
if d['uid'] == str(node[0])), {})
update_node = text(
"""UPDATE nodes
SET replaced_deployment_info = :deploy,
replaced_provisioning_info = :provision
WHERE id = :id""")
connection.execute(
update_node,
deploy=jsonutils.dumps(node_deploy),
provision=jsonutils.dumps(node_provision),
id=node[0])
update_cluster = text(
"""UPDATE clusters
SET replaced_deployment_info = :deploy,
replaced_provisioning_info = :provision
WHERE id = :id""")
connection.execute(
update_cluster,
deploy=jsonutils.dumps({}),
provision=jsonutils.dumps(provisioning_info),
id=cluster[0])