Fix naming of reset tasks and message duplication

* reset_environment supertask contains 3 subtasks:
  base_reset_environment, remove_keys_task,
  remove_ironic_bootstrap_task
* names for tasks were changed
* response methods for remove_keys_task and
  remove_ironic_bootstrap_task were added to receiver
* _restore_pending_changes method was add only for
  reset_environment_resp
* migration for adding new transaction names and appropriate
  test were added
* test for check task message was added

Change-Id: Ib8a215174431486316bca533797932e02969c037
Closes-Bug: #1541868
This commit is contained in:
Anastasiya 2016-12-22 18:33:05 +04:00
parent ccac8ae8ed
commit 93f40003de
8 changed files with 283 additions and 47 deletions

View File

@ -259,7 +259,12 @@ TASK_NAMES = Enum(
'deployment',
'provision',
'stop_deployment',
# reset_environment supertask contains three subtasks:
# reset_nodes, remove_keys and remove_ironic_bootstrap
'reset_environment',
'reset_nodes',
'remove_keys',
'remove_ironic_bootstrap',
'update',
'spawn_vms',

View File

@ -50,9 +50,11 @@ def upgrade():
upgrade_node_nic_attributes()
upgrade_node_bond_attributes()
upgrade_tags_set()
upgrade_transaction_names()
def downgrade():
downgrade_transaction_names()
downgrade_tags_set()
downgrade_node_bond_attributes()
downgrade_node_nic_attributes()
@ -250,6 +252,74 @@ FUEL_SECURITY_GROUPS_VERSION = '9.0'
# version of Fuel when DPDK hugepages was introduced
FUEL_DPDK_HUGEPAGES_VERSION = '9.0'
TASK_NAMES_OLD = (
'super',
# Cluster changes
# For deployment supertask, it contains
# two subtasks deployment and provision
'deploy',
'deployment',
'provision',
'stop_deployment',
'reset_environment',
'update',
'spawn_vms',
'node_deletion',
'cluster_deletion',
'remove_images',
'check_before_deployment',
# network
'check_networks',
'verify_networks',
'check_dhcp',
'verify_network_connectivity',
'multicast_verification',
'check_repo_availability',
'check_repo_availability_with_setup',
'dry_run_deployment',
# dump
'dump',
'capacity_log',
# statistics
'create_stats_user',
'remove_stats_user',
# setup dhcp via dnsmasq for multi-node-groups
'update_dnsmasq'
)
TASK_NAMES_NEW = TASK_NAMES_OLD + (
'reset_nodes',
'remove_keys',
'remove_ironic_bootstrap',
)
def upgrade_transaction_names():
migration.upgrade_enum(
'tasks',
'name',
'task_name',
TASK_NAMES_OLD,
TASK_NAMES_NEW
)
def downgrade_transaction_names():
migration.upgrade_enum(
'tasks',
'name',
'task_name',
TASK_NAMES_NEW,
TASK_NAMES_OLD
)
def update_vmware_attributes_metadata(upgrade):
connection = op.get_bind()

View File

@ -796,11 +796,35 @@ class NailgunReceiver(object):
process, len(nodes), message)
@classmethod
def reset_environment_resp(cls, **kwargs):
logger.info(
"RPC method reset_environment_resp received: %s",
jsonutils.dumps(kwargs)
def _restore_pending_changes(cls, nodes, task, ia_nodes):
task.cluster.status = consts.CLUSTER_STATUSES.new
objects.Cluster.add_pending_changes(
task.cluster,
consts.CLUSTER_CHANGES.attributes
)
objects.Cluster.add_pending_changes(
task.cluster,
consts.CLUSTER_CHANGES.networks
)
node_uids = [n["uid"] for n in itertools.chain(nodes, ia_nodes)]
q_nodes = objects.NodeCollection.filter_by_id_list(None, node_uids)
q_nodes = objects.NodeCollection.filter_by(
q_nodes,
cluster_id=task.cluster_id
)
q_nodes = objects.NodeCollection.order_by(q_nodes, 'id')
# locking Nodes for update
update_nodes = objects.NodeCollection.lock_for_update(
q_nodes
).all()
for node in update_nodes:
logs_utils.delete_node_logs(node)
objects.Node.reset_to_discover(node)
@classmethod
def _reset_resp(cls, successful_message, restore_pending_changes=False,
**kwargs):
task_uuid = kwargs.get('task_uuid')
nodes = kwargs.get('nodes', [])
ia_nodes = kwargs.get('inaccessible_nodes', [])
@ -813,59 +837,53 @@ class NailgunReceiver(object):
return
if status == consts.TASK_STATUSES.ready:
# restoring pending changes
task.cluster.status = consts.CLUSTER_STATUSES.new
objects.Cluster.add_pending_changes(
task.cluster,
consts.CLUSTER_CHANGES.attributes
)
objects.Cluster.add_pending_changes(
task.cluster,
consts.CLUSTER_CHANGES.networks
)
node_uids = [n["uid"] for n in itertools.chain(nodes, ia_nodes)]
q_nodes = objects.NodeCollection.filter_by_id_list(None, node_uids)
q_nodes = objects.NodeCollection.filter_by(
q_nodes,
cluster_id=task.cluster_id
)
q_nodes = objects.NodeCollection.order_by(q_nodes, 'id')
# locking Nodes for update
update_nodes = objects.NodeCollection.lock_for_update(
q_nodes
).all()
for node in update_nodes:
logs_utils.delete_node_logs(node)
objects.Node.reset_to_discover(node)
if restore_pending_changes:
cls._restore_pending_changes(nodes, task, ia_nodes)
if ia_nodes:
cls._notify_inaccessible(
task.cluster_id,
[n["uid"] for n in ia_nodes],
u"environment resetting"
)
message = (
u"Environment '{0}' "
u"was successfully reset".format(
task.cluster.name or task.cluster_id
)
message = successful_message.format(
task.cluster.name or task.cluster_id
)
notifier.notify(
"done",
message,
task.cluster_id
)
data = {'status': status, 'progress': progress, 'message': message}
objects.Task.update(task, data)
cls._update_action_log_entry(status, task.name, task_uuid, nodes)
@classmethod
def reset_environment_resp(cls, **kwargs):
logger.info(
"RPC method reset_environment_resp received: %s",
jsonutils.dumps(kwargs)
)
message = u"Environment '{0}' was successfully reset"
cls._reset_resp(message, restore_pending_changes=True, **kwargs)
@classmethod
def remove_keys_resp(cls, **kwargs):
logger.info(
"RPC method remove_keys_resp received: %s",
jsonutils.dumps(kwargs)
)
message = u"Keys were removed from environment '{0}'"
cls._reset_resp(message, **kwargs)
@classmethod
def remove_ironic_bootstrap_resp(cls, **kwargs):
logger.info(
"RPC method remove_ironic_bootstrap_resp received: %s",
jsonutils.dumps(kwargs)
)
message = u"Ironic bootstrap was removed from environment '{0}'"
cls._reset_resp(message, **kwargs)
@classmethod
def _notify_inaccessible(cls, cluster_id, nodes_uids, action):
ia_nodes_db = db().query(Node.name).filter(

View File

@ -904,18 +904,22 @@ class ResetEnvironmentTaskManager(ClearTaskHistory):
db().add(supertask)
al = TaskHelper.create_action_log(supertask)
reset_nodes = supertask.create_subtask(
consts.TASK_NAMES.reset_nodes
)
remove_keys_task = supertask.create_subtask(
consts.TASK_NAMES.reset_environment
consts.TASK_NAMES.remove_keys
)
remove_ironic_bootstrap_task = supertask.create_subtask(
consts.TASK_NAMES.reset_environment
consts.TASK_NAMES.remove_ironic_bootstrap
)
db.commit()
rpc.cast('naily', [
tasks.ResetEnvironmentTask.message(supertask),
tasks.ResetEnvironmentTask.message(reset_nodes),
tasks.RemoveIronicBootstrap.message(remove_ironic_bootstrap_task),
tasks.RemoveClusterKeys.message(remove_keys_task)
])

View File

@ -1009,7 +1009,7 @@ class RemoveClusterKeys(object):
rpc_message = make_astute_message(
task,
"execute_tasks",
"reset_environment_resp",
"remove_keys_resp",
{
"tasks": [
tasks_templates.make_shell_task(
@ -1040,7 +1040,7 @@ class RemoveIronicBootstrap(object):
rpc_message = make_astute_message(
task,
"execute_tasks",
"reset_environment_resp",
"remove_ironic_bootstrap_resp",
{
"tasks": [
tasks_templates.make_shell_task(

View File

@ -73,7 +73,11 @@ class TestResetEnvironment(BaseIntegrationTest):
cluster_db.id
)
tasks_after_reset = list(set([task.name for task in cluster_tasks]))
self.assertEqual(tasks_after_reset, ['reset_environment'])
self.assertItemsEqual(tasks_after_reset,
['reset_environment',
'reset_nodes',
'remove_ironic_bootstrap',
'remove_keys'])
# FIXME(aroma): remove when stop action will be reworked for ha
# cluster. To get more details, please, refer to [1]

View File

@ -1725,3 +1725,87 @@ class TestResetEnvironment(BaseReciverTestCase):
}
self.receiver.reset_environment_resp(**resp)
mock_delete_logs.assert_called_once_with(node)
def test_task_message(self):
cluster = self.env.create(
cluster_kwargs={},
nodes_kwargs=[
{"api": False, "status": consts.NODE_STATUSES.ready},
]
)
node = self.env.nodes[0]
reset_environment = Task(
uuid=str(uuid.uuid4()),
name=consts.TASK_NAMES.reset_environment,
cluster_id=cluster.id)
self.db.add(reset_environment)
self.db.flush()
reset_nodes_task = reset_environment.create_subtask(
consts.TASK_NAMES.reset_nodes
)
remove_keys_task = reset_environment.create_subtask(
consts.TASK_NAMES.remove_keys
)
remove_ironic_bootstrap_task = (
reset_environment.create_subtask(
consts.TASK_NAMES.remove_ironic_bootstrap))
self.db.flush()
reset_nodes_message = (
u"Environment '{0}' was successfully reset".format(
reset_nodes_task.cluster.name or
reset_nodes_task.cluster_id
)
)
remove_keys_message = (
u"Keys were removed from environment '{0}'").format(
remove_keys_task.cluster.name or remove_keys_task.cluster_id)
remove_ironic_bootstrap_message = (
u"Ironic bootstrap was removed from environment '{0}'").format(
remove_ironic_bootstrap_task.cluster.name or
remove_ironic_bootstrap_task.cluster_id)
reset_environment_message = (
u'\n'.join(
(reset_nodes_message, remove_keys_message,
remove_ironic_bootstrap_message)
)
)
reset_environment_resp = {
'task_uuid': reset_nodes_task.uuid,
'status': consts.TASK_STATUSES.ready,
'nodes': [
{'uid': node.uid},
]
}
remove_keys_resp = {
'task_uuid': remove_keys_task.uuid,
'status': consts.TASK_STATUSES.ready,
'nodes': [
{'uid': node.uid},
]
}
remove_ironic_bootstrap_resp = {
'task_uuid': remove_ironic_bootstrap_task.uuid,
'status': consts.TASK_STATUSES.ready,
'nodes': [
{'uid': node.uid},
]
}
self.receiver.reset_environment_resp(**reset_environment_resp)
self.receiver.remove_keys_resp(**remove_keys_resp)
self.receiver.remove_ironic_bootstrap_resp(
**remove_ironic_bootstrap_resp
)
self.assertEqual(reset_nodes_message,
reset_nodes_task.message)
self.assertEqual(remove_keys_message, remove_keys_task.message)
self.assertEqual(remove_ironic_bootstrap_message,
remove_ironic_bootstrap_task.message)
self.assertEqual(reset_environment_message,
reset_environment.message)

View File

@ -863,3 +863,54 @@ class TestNodeNICAndBondAttributesMigration(base.BaseAlembicMigrationTest):
# self.assertNotIn('offloading_modes', bonds_table.c)
# self.assertNotIn('interface_properties', bonds_table.c)
# self.assertNotIn('bond_properties', bonds_table.c)
class TestTransactionsNames(base.BaseAlembicMigrationTest):
def test_field_reset_environment_exist(self):
db.execute(
self.meta.tables['tasks'].insert(),
[
{
'uuid': 'fake_task_uuid_0',
'name': 'reset_environment',
'status': 'pending'
}
]
)
def test_field_reset_nodes_exist(self):
db.execute(
self.meta.tables['tasks'].insert(),
[
{
'uuid': 'fake_task_uuid_0',
'name': 'reset_nodes',
'status': 'pending'
}
]
)
def test_field_remove_keys_exist(self):
db.execute(
self.meta.tables['tasks'].insert(),
[
{
'uuid': 'fake_task_uuid_0',
'name': 'remove_keys',
'status': 'pending'
}
]
)
def test_field_remove_ironic_bootstrap_exist(self):
db.execute(
self.meta.tables['tasks'].insert(),
[
{
'uuid': 'fake_task_uuid_0',
'name': 'remove_ironic_bootstrap',
'status': 'pending'
}
]
)