Fuel UI
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

receiver.py 52KB


  1. # -*- coding: utf-8 -*-
  2. # Copyright 2013 Mirantis, Inc.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License"); you may
  5. # not use this file except in compliance with the License. You may obtain
  6. # a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  12. # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  13. # License for the specific language governing permissions and limitations
  14. # under the License.
  15. import collections
  16. import copy
  17. import datetime
  18. import itertools
  19. import logging
  20. import os
  21. import six
  22. from oslo_serialization import jsonutils
  23. from sqlalchemy import or_
  24. from nailgun import consts
  25. from nailgun import notifier
  26. from nailgun import objects
  27. from nailgun.settings import settings
  28. from nailgun import transactions
  29. from nailgun.consts import TASK_STATUSES
  30. from nailgun.db import db
  31. from nailgun.db.sqlalchemy.models import IPAddr
  32. from nailgun.db.sqlalchemy.models import Node
  33. from nailgun.db.sqlalchemy.models import Release
  34. from nailgun.extensions.network_manager import connectivity_check
  35. from nailgun.extensions.network_manager import utils as net_utils
  36. from nailgun.objects.plugin import ClusterPlugin
  37. from nailgun.task.helpers import TaskHelper
  38. from nailgun.utils import logs as logs_utils
  39. from nailgun.utils import reverse
  40. logger = logging.getLogger('receiverd')
  41. class NailgunReceiver(object):
  42. @classmethod
  43. def acquire_lock(cls, transaction_uuid):
  44. """Get transaction and acquire exclusive access.
  45. :param transaction_uuid: the unique identifier of transaction
  46. :return: transaction object or None if there is no task with such uid
  47. """
  48. # use transaction object to get removed by UI tasks
  49. transaction = objects.Transaction.get_by_uuid(transaction_uuid)
  50. if not transaction:
  51. logger.error("Task '%s' was removed.", transaction_uuid)
  52. return
  53. # the lock order is following: cluster, task
  54. if transaction.cluster:
  55. objects.Cluster.get_by_uid(
  56. transaction.cluster_id,
  57. fail_if_not_found=True, lock_for_update=True
  58. )
  59. # read transaction again to ensure
  60. # that it was not removed in other session
  61. transaction = objects.Transaction.get_by_uuid(
  62. transaction_uuid, lock_for_update=True)
  63. if not transaction:
  64. logger.error(
  65. "Race condition detected, task '%s' was removed.",
  66. transaction_uuid
  67. )
  68. return transaction
  69. @classmethod
  70. def remove_nodes_resp(cls, **kwargs):
  71. logger.info(
  72. "RPC method remove_nodes_resp received: %s" %
  73. jsonutils.dumps(kwargs)
  74. )
  75. task_uuid = kwargs.get('task_uuid')
  76. nodes = kwargs.get('nodes') or []
  77. error_nodes = kwargs.get('error_nodes') or []
  78. inaccessible_nodes = kwargs.get('inaccessible_nodes') or []
  79. error_msg = kwargs.get('error')
  80. status = kwargs.get('status')
  81. progress = kwargs.get('progress')
  82. if status in [consts.TASK_STATUSES.ready, consts.TASK_STATUSES.error]:
  83. progress = 100
  84. # locking task
  85. task = cls.acquire_lock(task_uuid)
  86. if not task:
  87. return False
  88. # locking nodes
  89. all_nodes = itertools.chain(nodes, error_nodes, inaccessible_nodes)
  90. all_nodes_ids = [
  91. node['id'] if 'id' in node else node['uid']
  92. for node in all_nodes
  93. ]
  94. locked_nodes = objects.NodeCollection.order_by(
  95. objects.NodeCollection.filter_by_list(
  96. None,
  97. 'id',
  98. all_nodes_ids,
  99. ),
  100. 'id'
  101. )
  102. objects.NodeCollection.lock_for_update(locked_nodes).all()
  103. def get_node_id(n):
  104. return n.get('id', int(n.get('uid')))
  105. nodes_to_delete_ids = [get_node_id(n) for n in nodes]
  106. if len(inaccessible_nodes) > 0:
  107. inaccessible_node_ids = [
  108. get_node_id(n) for n in inaccessible_nodes]
  109. logger.warn(u'Nodes %s not answered by RPC, removing from db',
  110. inaccessible_nodes)
  111. nodes_to_delete_ids.extend(inaccessible_node_ids)
  112. for node in objects.NodeCollection.filter_by_id_list(
  113. None, nodes_to_delete_ids):
  114. logs_utils.delete_node_logs(node)
  115. objects.NodeCollection.delete_by_ids(nodes_to_delete_ids)
  116. for node in error_nodes:
  117. node_db = objects.Node.get_by_uid(node['uid'])
  118. if not node_db:
  119. logger.error(
  120. u"Failed to delete node '%s' marked as error from Astute:"
  121. " node doesn't exist", str(node)
  122. )
  123. else:
  124. node_db.pending_deletion = False
  125. node_db.status = 'error'
  126. db().add(node_db)
  127. node['name'] = node_db.name
  128. db().flush()
  129. success_msg = u"No nodes were removed"
  130. err_msg = u"No errors occurred"
  131. if nodes_to_delete_ids:
  132. success_msg = u"Successfully removed {0} node(s)".format(
  133. len(nodes_to_delete_ids)
  134. )
  135. notifier.notify("done", success_msg)
  136. if error_nodes:
  137. err_msg = u"Failed to remove {0} node(s): {1}".format(
  138. len(error_nodes),
  139. ', '.join(
  140. [n.get('name') or "ID: {0}".format(n['uid'])
  141. for n in error_nodes])
  142. )
  143. notifier.notify("error", err_msg)
  144. if not error_msg:
  145. error_msg = ". ".join([success_msg, err_msg])
  146. data = {
  147. 'status': status,
  148. 'progress': progress,
  149. 'message': error_msg,
  150. }
  151. objects.Task.update(task, data)
  152. cls._update_action_log_entry(status, task.name, task_uuid, nodes)
  153. @classmethod
  154. def remove_cluster_resp(cls, **kwargs):
  155. logger.info(
  156. "RPC method remove_cluster_resp received: %s" %
  157. jsonutils.dumps(kwargs)
  158. )
  159. task_uuid = kwargs.get('task_uuid')
  160. # in remove_nodes_resp method all objects are already locked
  161. cls.remove_nodes_resp(**kwargs)
  162. task = objects.Task.get_by_uuid(task_uuid, fail_if_not_found=True)
  163. cluster = task.cluster
  164. if task.status in ('ready',):
  165. logger.debug("Removing environment itself")
  166. cluster_name = cluster.name
  167. ips = db().query(IPAddr).filter(
  168. IPAddr.network.in_([n.id for n in cluster.network_groups])
  169. )
  170. for ip in ips:
  171. db().delete(ip)
  172. db().flush()
  173. objects.Task.delete(task)
  174. for task_ in cluster.tasks:
  175. if task_ != task:
  176. objects.Transaction.delete(task_)
  177. objects.Cluster.delete(cluster)
  178. notifier.notify(
  179. "done",
  180. u"Environment '{0}' is deleted".format(
  181. cluster_name
  182. )
  183. )
  184. elif task.status in ('error',):
  185. cluster.status = 'error'
  186. db().add(cluster)
  187. db().flush()
  188. if not task.message:
  189. task.message = "Failed to delete nodes:\n{0}".format(
  190. cls._generate_error_message(
  191. task,
  192. error_types=('deletion',)
  193. )
  194. )
  195. notifier.notify(
  196. "error",
  197. task.message,
  198. cluster.id
  199. )
  200. @classmethod
  201. def remove_images_resp(cls, **kwargs):
  202. logger.info(
  203. "RPC method remove_images_resp received: %s",
  204. jsonutils.dumps(kwargs)
  205. )
  206. status = kwargs.get('status')
  207. task_uuid = kwargs['task_uuid']
  208. task = cls.acquire_lock(task_uuid)
  209. if not task:
  210. return
  211. if status == consts.TASK_STATUSES.ready:
  212. logger.info("IBP images from deleted cluster have been removed")
  213. elif status == consts.TASK_STATUSES.error:
  214. logger.error("Removing IBP images failed: task_uuid %s", task_uuid)
  215. objects.Task.update(task, {'status': status})
  216. @classmethod
  217. def transaction_resp(cls, **kwargs):
  218. logger.info(
  219. "RPC method transaction_resp received: %s", jsonutils.dumps(kwargs)
  220. )
  221. # TODO(bgaifullin) move lock to transaction manager
  222. transaction = cls.acquire_lock(kwargs.pop('task_uuid', None))
  223. if not transaction:
  224. return
  225. manager = transactions.TransactionsManager(transaction.cluster.id)
  226. manager.process(transaction, kwargs)
  227. @classmethod
  228. def deploy_resp(cls, **kwargs):
  229. logger.info(
  230. "RPC method deploy_resp received: %s" %
  231. jsonutils.dumps(kwargs)
  232. )
  233. task_uuid = kwargs.get('task_uuid')
  234. nodes = kwargs.get('nodes') or []
  235. message = kwargs.get('error')
  236. status = kwargs.get('status')
  237. progress = kwargs.get('progress')
  238. task = cls.acquire_lock(task_uuid)
  239. if not task:
  240. return
  241. if not status:
  242. status = task.status
  243. # for deployment we need just to pop
  244. # if there no node except master - then just skip updating
  245. # nodes status, for the task itself astute will send
  246. # message with descriptive error
  247. nodes_by_id = {str(n['uid']): n for n in nodes}
  248. master = nodes_by_id.pop(consts.MASTER_NODE_UID, {})
  249. nodes_by_id.pop('None', {})
  250. if nodes_by_id:
  251. # lock nodes for updating so they can't be deleted
  252. q_nodes = objects.NodeCollection.filter_by_id_list(
  253. None,
  254. nodes_by_id,
  255. )
  256. q_nodes = objects.NodeCollection.order_by(q_nodes, 'id')
  257. db_nodes = objects.NodeCollection.lock_for_update(q_nodes).all()
  258. else:
  259. db_nodes = []
  260. # Dry run deployments should not actually lead to update of
  261. # nodes' statuses
  262. if task.name != consts.TASK_NAMES.dry_run_deployment and \
  263. not task.get('dry_run'):
  264. # First of all, let's update nodes in database
  265. for node_db in db_nodes:
  266. node = nodes_by_id.pop(node_db.uid)
  267. update_fields = (
  268. 'error_msg',
  269. 'error_type',
  270. 'status',
  271. 'progress',
  272. 'online'
  273. )
  274. for param in update_fields:
  275. if param in node:
  276. logger.debug("Updating node %s - set %s to %s",
  277. node['uid'], param, node[param])
  278. setattr(node_db, param, node[param])
  279. if param == 'progress' and node.get('status') == \
  280. 'error' or node.get('online') is False:
  281. # If failure occurred with node
  282. # it's progress should be 100
  283. node_db.progress = 100
  284. # Setting node error_msg for offline nodes
  285. if node.get('online') is False \
  286. and not node_db.error_msg:
  287. node_db.error_msg = u"Node is offline"
  288. # Notification on particular node failure
  289. notifier.notify(
  290. consts.NOTIFICATION_TOPICS.error,
  291. u"Failed to {0} node '{1}': {2}".format(
  292. consts.TASK_NAMES.deploy,
  293. node_db.name,
  294. node_db.error_msg or "Unknown error"
  295. ),
  296. cluster_id=task.cluster_id,
  297. node_id=node['uid'],
  298. task_uuid=task_uuid
  299. )
  300. if nodes_by_id:
  301. logger.warning("The following nodes are not found: %s",
  302. ",".join(sorted(nodes_by_id)))
  303. for node in nodes:
  304. if node.get('deployment_graph_task_name') \
  305. and node.get('task_status'):
  306. objects.DeploymentHistory.update_if_exist(
  307. task.id,
  308. node['uid'],
  309. node['deployment_graph_task_name'],
  310. node['task_status'],
  311. node.get('summary', {}),
  312. node.get('custom', {})
  313. )
  314. db().flush()
  315. if nodes and not progress:
  316. progress = TaskHelper.recalculate_deployment_task_progress(task)
  317. # full error will be provided in next astute message
  318. if master.get('status') == consts.TASK_STATUSES.error:
  319. status = consts.TASK_STATUSES.error
  320. cls._update_task_status(task, status, progress, message, db_nodes)
  321. cls._update_action_log_entry(status, task.name, task_uuid, nodes)
  322. @classmethod
  323. def provision_resp(cls, **kwargs):
  324. logger.info(
  325. "RPC method provision_resp received: %s" %
  326. jsonutils.dumps(kwargs))
  327. task_uuid = kwargs.get('task_uuid')
  328. message = kwargs.get('error')
  329. status = kwargs.get('status')
  330. progress = kwargs.get('progress')
  331. nodes = kwargs.get('nodes', [])
  332. task = cls.acquire_lock(task_uuid)
  333. if not task:
  334. return
  335. # we should remove master node from the nodes since it requires
  336. # special handling and won't work with old code
  337. # lock nodes for updating
  338. nodes_by_id = {str(n['uid']): n for n in nodes}
  339. master = nodes_by_id.pop(consts.MASTER_NODE_UID, {})
  340. if master.get('status') == consts.TASK_STATUSES.error:
  341. status = consts.TASK_STATUSES.error
  342. progress = 100
  343. q_nodes = objects.NodeCollection.filter_by_id_list(
  344. None, nodes_by_id
  345. )
  346. q_nodes = objects.NodeCollection.order_by(q_nodes, 'id')
  347. db_nodes = objects.NodeCollection.lock_for_update(q_nodes).all()
  348. for node_db in db_nodes:
  349. node = nodes_by_id.pop(node_db.uid)
  350. if node.get('status') == consts.TASK_STATUSES.error:
  351. node_db.status = consts.TASK_STATUSES.error
  352. node_db.progress = 100
  353. node_db.error_type = consts.TASK_NAMES.provision
  354. node_db.error_msg = node.get('error_msg', 'Unknown error')
  355. else:
  356. node_db.status = node.get('status')
  357. node_db.progress = node.get('progress')
  358. db().flush()
  359. if nodes_by_id:
  360. logger.warning("The following nodes is not found: %s",
  361. ",".join(sorted(six.moves.map(str, nodes_by_id))))
  362. if nodes and not progress:
  363. progress = TaskHelper.recalculate_provisioning_task_progress(task)
  364. cls._update_task_status(task, status, progress, message, db_nodes)
  365. cls._update_action_log_entry(status, task.name, task_uuid, nodes)
  366. @classmethod
  367. def update_config_resp(cls, **kwargs):
  368. """Updates task and nodes states at the end of upload config task"""
  369. logger.info(
  370. "RPC method update_config_resp received: %s" %
  371. jsonutils.dumps(kwargs))
  372. task_uuid = kwargs['task_uuid']
  373. message = kwargs.get('error')
  374. status = kwargs.get('status')
  375. progress = kwargs.get('progress')
  376. task = cls.acquire_lock(task_uuid)
  377. if not task:
  378. return
  379. q_nodes = objects.NodeCollection.filter_by_id_list(
  380. None, task.cache['nodes'])
  381. # lock nodes for updating
  382. nodes = objects.NodeCollection.lock_for_update(q_nodes).all()
  383. if status in (consts.TASK_STATUSES.ready, consts.TASK_STATUSES.error):
  384. for node in nodes:
  385. node.status = consts.NODE_STATUSES.ready
  386. node.progress = 100
  387. if status == consts.TASK_STATUSES.error:
  388. message = (u"Failed to update configuration on nodes:"
  389. u" {0}.").format(', '.join(node.name for node in nodes))
  390. logger.error(message)
  391. notifier.notify("error", message)
  392. db().flush()
  393. data = {'status': status, 'progress': progress, 'message': message}
  394. objects.Task.update(task, data)
  395. cls._update_action_log_entry(status, task.name, task_uuid, nodes)
  396. @classmethod
  397. def _notify(cls, task, topic, message, node_id=None, task_uuid=None):
  398. """Send notification.
  399. :param task: objects.Task object
  400. :param topic: consts.NOTIFICATION_TOPICS value
  401. :param message: message text
  402. :param node_id: node identifier
  403. :param task_uuid: task uuid. specify task_uuid if necessary to pass it
  404. """
  405. # Due to design of UI, that shows all notifications,
  406. # we should notify provision task only then the task is top-level task
  407. if (task.name == consts.TASK_NAMES.provision
  408. and task.parent_id is not None) or message is None:
  409. return
  410. notifier.notify(
  411. topic,
  412. message,
  413. task.cluster_id,
  414. node_id=node_id,
  415. task_uuid=task_uuid
  416. )
  417. @classmethod
  418. def _assemble_task_update(cls, task, status, progress, message, nodes):
  419. """Assemble arguments to update task.
  420. :param task: objects.Task object
  421. :param status: consts.TASK_STATUSES value
  422. :param progress: progress number value
  423. :param message: message text
  424. :param nodes: the modified nodes list
  425. """
  426. if status == consts.TASK_STATUSES.error:
  427. data = cls._error_action(task, status, progress, message)
  428. elif status == consts.TASK_STATUSES.ready:
  429. data = cls._success_action(task, status, progress, nodes)
  430. else:
  431. data = {}
  432. if status:
  433. data['status'] = status
  434. if progress:
  435. data['progress'] = progress
  436. if message:
  437. data['message'] = message
  438. return data
  439. @classmethod
  440. def _update_task_status(cls, task, status, progress, message, nodes):
  441. """Do update task status actions.
  442. :param task: objects.Task object
  443. :param status: consts.TASK_STATUSES value
  444. :param progress: progress number value
  445. :param message: message text
  446. :param nodes: the modified nodes list
  447. """
  448. objects.Task.update(
  449. task,
  450. cls._assemble_task_update(task, status, progress, message, nodes)
  451. )
  452. @classmethod
  453. def _update_action_log_entry(cls, task_status, task_name, task_uuid,
  454. nodes_from_resp):
  455. try:
  456. if task_status in (consts.TASK_STATUSES.ready,
  457. consts.TASK_STATUSES.error):
  458. al = objects.ActionLog.get_by_kwargs(task_uuid=task_uuid,
  459. action_name=task_name)
  460. if al:
  461. data = {
  462. 'end_timestamp': datetime.datetime.utcnow(),
  463. 'additional_info': {
  464. 'nodes_from_resp': cls.sanitize_nodes_from_resp(
  465. nodes_from_resp),
  466. 'ended_with_status': task_status
  467. }
  468. }
  469. objects.ActionLog.update(al, data)
  470. except Exception as e:
  471. logger.error("_update_action_log_entry failed: %s",
  472. six.text_type(e))
  473. @classmethod
  474. def sanitize_nodes_from_resp(cls, nodes):
  475. resp = []
  476. if isinstance(nodes, list):
  477. for n in nodes:
  478. if isinstance(n, dict) and 'uid' in n:
  479. resp.append(n['uid'])
  480. return resp
  481. @classmethod
  482. def _generate_error_message(cls, task, error_types, names_only=False):
  483. nodes_info = []
  484. error_nodes = db().query(Node).filter_by(
  485. cluster_id=task.cluster_id
  486. ).filter(
  487. or_(
  488. Node.status == 'error',
  489. Node.online == (False)
  490. )
  491. ).filter(
  492. Node.error_type.in_(error_types)
  493. ).all()
  494. for n in error_nodes:
  495. if names_only:
  496. nodes_info.append(u"'{0}'".format(n.name))
  497. else:
  498. nodes_info.append(u"'{0}': {1}".format(n.name, n.error_msg))
  499. if nodes_info:
  500. if names_only:
  501. message = u", ".join(nodes_info)
  502. else:
  503. message = u"\n".join(nodes_info)
  504. else:
  505. message = None
  506. return message
  507. @classmethod
  508. def _error_action(cls, task, status, progress, message=None):
  509. task_name = task.name.title()
  510. if message:
  511. message = u"{0} has failed. {1}".format(task_name, message)
  512. # in case we are sending faild task message from astute
  513. # we should not create a notification with it, because its add
  514. # a lot of clutter for user
  515. notify_message = message.split('\n\n')[0]
  516. else:
  517. error_message = cls._generate_error_message(
  518. task,
  519. error_types=('deploy', 'provision'),
  520. names_only=True
  521. )
  522. message = u"{0} has failed. Check these nodes:\n{1}".format(
  523. task_name, error_message
  524. )
  525. notify_message = message if error_message is not None else None
  526. cls._notify(task, consts.NOTIFICATION_TOPICS.error, notify_message)
  527. return {'status': status, 'progress': progress, 'message': message}
  528. @classmethod
  529. def _success_action(cls, task, status, progress, nodes):
  530. # we shouldn't report success if there's at least one node in
  531. # error state
  532. if any(n.status == consts.NODE_STATUSES.error for n in nodes):
  533. return cls._error_action(task, 'error', 100)
  534. task_name = task.name.title()
  535. if nodes:
  536. # check that all nodes in same state
  537. remaining = objects.Cluster.get_nodes_count_unmet_status(
  538. nodes[0].cluster, nodes[0].status
  539. )
  540. if remaining > 0:
  541. message = u"{0} of {1} environment node(s) is done.".format(
  542. task_name, len(nodes)
  543. )
  544. else:
  545. message = u"{0} of environment '{1}' is done.".format(
  546. task_name, task.cluster.name
  547. )
  548. else:
  549. message = u"{0} is done. No changes.".format(task_name)
  550. if task.name != consts.TASK_NAMES.provision:
  551. plugins_msg = cls._make_plugins_success_message(
  552. ClusterPlugin.get_enabled(task.cluster.id))
  553. if plugins_msg:
  554. message = '{0}\n\n{1}'.format(message, plugins_msg)
  555. cls._notify(task, consts.NOTIFICATION_TOPICS.done, message)
  556. return {'status': status, 'progress': progress, 'message': message}
  557. @classmethod
  558. def _make_plugins_success_message(cls, plugins):
  559. """Makes plugins installation message"""
  560. msg = 'Plugin {0} is deployed. {1}'
  561. return '\n'.join(
  562. map(lambda p: msg.format(p.name, p.description), plugins))
  563. @classmethod
  564. def stop_deployment_resp(cls, **kwargs):
  565. logger.info(
  566. "RPC method stop_deployment_resp received: %s" %
  567. jsonutils.dumps(kwargs)
  568. )
  569. task_uuid = kwargs.get('task_uuid')
  570. nodes = kwargs.get('nodes', [])
  571. ia_nodes = kwargs.get('inaccessible_nodes', [])
  572. message = kwargs.get('error')
  573. status = kwargs.get('status')
  574. progress = kwargs.get('progress')
  575. task = cls.acquire_lock(task_uuid)
  576. if not task:
  577. return
  578. stopping_task_names = [
  579. consts.TASK_NAMES.deploy,
  580. consts.TASK_NAMES.deployment,
  581. consts.TASK_NAMES.provision
  582. ]
  583. q_stop_tasks = objects.TaskCollection.filter_by_list(
  584. None,
  585. 'name',
  586. stopping_task_names
  587. )
  588. q_stop_tasks = objects.TaskCollection.filter_by(
  589. q_stop_tasks,
  590. cluster_id=task.cluster_id,
  591. deleted_at=None
  592. )
  593. stop_tasks = objects.TaskCollection.order_by(
  594. q_stop_tasks,
  595. 'id'
  596. ).all()
  597. if not stop_tasks:
  598. logger.warning("stop_deployment_resp: deployment tasks \
  599. not found for environment '%s'!", task.cluster_id)
  600. if status == consts.TASK_STATUSES.ready:
  601. task.cluster.status = consts.CLUSTER_STATUSES.stopped
  602. if stop_tasks:
  603. objects.Task.bulk_delete(x.id for x in stop_tasks)
  604. node_uids = [n['uid'] for n in itertools.chain(nodes, ia_nodes)]
  605. q_nodes = objects.NodeCollection.filter_by_id_list(None, node_uids)
  606. q_nodes = objects.NodeCollection.filter_by(
  607. q_nodes,
  608. cluster_id=task.cluster_id
  609. )
  610. q_nodes = objects.NodeCollection.order_by(q_nodes, 'id')
  611. # locking Nodes for update
  612. update_nodes = objects.NodeCollection.lock_for_update(
  613. q_nodes
  614. ).all()
  615. for node in update_nodes:
  616. objects.Node.reset_to_discover(node)
  617. if ia_nodes:
  618. cls._notify_inaccessible(
  619. task.cluster_id,
  620. [n["uid"] for n in ia_nodes],
  621. u"deployment stopping"
  622. )
  623. message = cls._make_stop_deployment_message(
  624. task, status, stop_tasks, message)
  625. notifier.notify(
  626. "done",
  627. message,
  628. task.cluster_id
  629. )
  630. elif status == consts.TASK_STATUSES.error:
  631. task.cluster.status = consts.CLUSTER_STATUSES.error
  632. if stop_tasks:
  633. objects.Task.bulk_delete(x.id for x in stop_tasks)
  634. q_nodes = objects.NodeCollection.filter_by(
  635. None,
  636. cluster_id=task.cluster_id
  637. )
  638. q_nodes = objects.NodeCollection.filter_by(
  639. q_nodes,
  640. status=consts.NODE_STATUSES.deploying
  641. )
  642. q_nodes = objects.NodeCollection.order_by(q_nodes, 'id')
  643. update_nodes = objects.NodeCollection.lock_for_update(
  644. q_nodes
  645. ).all()
  646. for node_db in update_nodes:
  647. node_db.status = consts.NODE_STATUSES.error
  648. node_db.progress = 100
  649. node_db.error_type = consts.NODE_ERRORS.stop_deployment
  650. db().flush()
  651. message = cls._make_stop_deployment_message(
  652. task, status, stop_tasks, message)
  653. notifier.notify(
  654. "error",
  655. message,
  656. task.cluster_id
  657. )
  658. data = {'status': status, 'progress': progress, 'message': message}
  659. objects.Task.update(task, data)
  660. cls._update_action_log_entry(status, task.name, task_uuid, nodes)
  661. @classmethod
  662. def _make_stop_deployment_message(cls, task, status, stop_tasks, message):
  663. messages_by_status = {
  664. consts.TASK_STATUSES.ready: [
  665. u"Deployment of environment '{0}' was successfully stopped. ",
  666. u"{0} of environment was successfully stopped. "
  667. ],
  668. consts.TASK_STATUSES.error: [
  669. u"Deployment of environment '{0}' was failed to stop: {1}. "
  670. u"Please check logs for details.",
  671. u"{0} of environment was failed to stop: {1}. "
  672. u"Please check logs for details."
  673. ]
  674. }
  675. stop_task_names = [t.name for t in stop_tasks]
  676. if consts.TASK_NAMES.deploy in stop_task_names:
  677. return messages_by_status[status][0].format(
  678. task.cluster.name or task.cluster_id, message)
  679. process = u"Deployment"
  680. if consts.TASK_NAMES.deployment not in stop_task_names:
  681. process = u"Provisioning"
  682. return messages_by_status[status][1].format(process, message)
  683. @classmethod
  684. def _restore_pending_changes(cls, nodes, task, ia_nodes):
  685. task.cluster.status = consts.CLUSTER_STATUSES.new
  686. objects.Cluster.add_pending_changes(
  687. task.cluster,
  688. consts.CLUSTER_CHANGES.attributes
  689. )
  690. objects.Cluster.add_pending_changes(
  691. task.cluster,
  692. consts.CLUSTER_CHANGES.networks
  693. )
  694. node_uids = [n["uid"] for n in itertools.chain(nodes, ia_nodes)]
  695. q_nodes = objects.NodeCollection.filter_by_id_list(None, node_uids)
  696. q_nodes = objects.NodeCollection.filter_by(
  697. q_nodes,
  698. cluster_id=task.cluster_id
  699. )
  700. q_nodes = objects.NodeCollection.order_by(q_nodes, 'id')
  701. # locking Nodes for update
  702. update_nodes = objects.NodeCollection.lock_for_update(
  703. q_nodes
  704. ).all()
  705. for node in update_nodes:
  706. logs_utils.delete_node_logs(node)
  707. objects.Node.reset_to_discover(node)
  708. @classmethod
  709. def _reset_resp(cls, successful_message, restore_pending_changes=False,
  710. **kwargs):
  711. task_uuid = kwargs.get('task_uuid')
  712. nodes = kwargs.get('nodes', [])
  713. ia_nodes = kwargs.get('inaccessible_nodes', [])
  714. message = kwargs.get('error')
  715. status = kwargs.get('status')
  716. progress = kwargs.get('progress')
  717. task = cls.acquire_lock(task_uuid)
  718. if not task:
  719. return
  720. if status == consts.TASK_STATUSES.ready:
  721. if restore_pending_changes:
  722. cls._restore_pending_changes(nodes, task, ia_nodes)
  723. if ia_nodes:
  724. cls._notify_inaccessible(
  725. task.cluster_id,
  726. [n["uid"] for n in ia_nodes],
  727. u"environment resetting"
  728. )
  729. message = successful_message.format(
  730. task.cluster.name or task.cluster_id
  731. )
  732. notifier.notify(
  733. "done",
  734. message,
  735. task.cluster_id
  736. )
  737. data = {'status': status, 'progress': progress, 'message': message}
  738. objects.Task.update(task, data)
  739. cls._update_action_log_entry(status, task.name, task_uuid, nodes)
  740. @classmethod
  741. def reset_environment_resp(cls, **kwargs):
  742. logger.info(
  743. "RPC method reset_environment_resp received: %s",
  744. jsonutils.dumps(kwargs)
  745. )
  746. message = u"Environment '{0}' was successfully reset"
  747. cls._reset_resp(message, restore_pending_changes=True, **kwargs)
  748. @classmethod
  749. def remove_keys_resp(cls, **kwargs):
  750. logger.info(
  751. "RPC method remove_keys_resp received: %s",
  752. jsonutils.dumps(kwargs)
  753. )
  754. message = u"Keys were removed from environment '{0}'"
  755. cls._reset_resp(message, **kwargs)
  756. @classmethod
  757. def remove_ironic_bootstrap_resp(cls, **kwargs):
  758. logger.info(
  759. "RPC method remove_ironic_bootstrap_resp received: %s",
  760. jsonutils.dumps(kwargs)
  761. )
  762. message = u"Ironic bootstrap was removed from environment '{0}'"
  763. cls._reset_resp(message, **kwargs)
  764. @classmethod
  765. def _notify_inaccessible(cls, cluster_id, nodes_uids, action):
  766. ia_nodes_db = db().query(Node.name).filter(
  767. Node.id.in_(nodes_uids),
  768. Node.cluster_id == cluster_id
  769. ).order_by(Node.id).yield_per(100)
  770. ia_message = (
  771. u"Fuel couldn't reach these nodes during "
  772. u"{0}: {1}. Manual check may be needed.".format(
  773. action,
  774. u", ".join([
  775. u"'{0}'".format(n.name)
  776. for n in ia_nodes_db
  777. ])
  778. )
  779. )
  780. notifier.notify(
  781. "warning",
  782. ia_message,
  783. cluster_id
  784. )
  785. @classmethod
  786. def verify_networks_resp(cls, **kwargs):
  787. logger.info(
  788. "RPC method verify_networks_resp received: %s" %
  789. jsonutils.dumps(kwargs)
  790. )
  791. task_uuid = kwargs.get('task_uuid')
  792. nodes = kwargs.get('nodes')
  793. error_msg = kwargs.get('error')
  794. status = kwargs.get('status')
  795. progress = kwargs.get('progress')
  796. task = cls.acquire_lock(task_uuid)
  797. if not task:
  798. return
  799. result = []
  800. # We expect that 'nodes' contains all nodes which we test.
  801. # Situation when some nodes not answered must be processed
  802. # in orchestrator early.
  803. if nodes is None:
  804. # If no nodes in kwargs then we update progress or status only.
  805. pass
  806. elif isinstance(nodes, list):
  807. cached_nodes = task.cache['args']['nodes']
  808. node_uids = [str(n['uid']) for n in nodes]
  809. cached_node_uids = [str(n['uid']) for n in cached_nodes]
  810. forgotten_uids = set(cached_node_uids) - set(node_uids)
  811. if forgotten_uids:
  812. absent_nodes = db().query(Node).filter(
  813. Node.id.in_(forgotten_uids)
  814. ).all()
  815. absent_node_names = []
  816. for n in absent_nodes:
  817. if n.name:
  818. absent_node_names.append(n.name)
  819. else:
  820. absent_node_names.append('id: %s' % n.id)
  821. if not error_msg:
  822. error_msg = 'Node(s) {0} didn\'t return data.'.format(
  823. ', '.join(absent_node_names)
  824. )
  825. status = 'error'
  826. else:
  827. error_nodes = []
  828. node_excluded_networks = []
  829. for node in nodes:
  830. cached_nodes_filtered = filter(
  831. lambda n: str(n['uid']) == str(node['uid']),
  832. cached_nodes
  833. )
  834. if not cached_nodes_filtered:
  835. logger.warning(
  836. "verify_networks_resp: arguments contain node "
  837. "data which is not in the task cache: %r",
  838. node
  839. )
  840. continue
  841. cached_node = cached_nodes_filtered[0]
  842. # Check if we have excluded bonded interfaces
  843. # (in particular modes as LACP, Round-robin, etc.)
  844. # that cannot be checked at the moment
  845. excluded_networks = cached_node.get(
  846. 'excluded_networks', [])
  847. if excluded_networks:
  848. interfaces = ', '.join(
  849. [net.get('iface') for net in excluded_networks])
  850. node_excluded_networks.append({
  851. 'node_name': cached_node['name'],
  852. 'interfaces': interfaces
  853. })
  854. errors = connectivity_check.check_received_data(
  855. cached_node, node)
  856. error_nodes.extend(errors)
  857. if error_nodes:
  858. result = error_nodes
  859. status = 'error'
  860. else:
  861. # notices must not rewrite error messages
  862. if node_excluded_networks:
  863. interfaces_list = ', '.join(
  864. ['node {0} [{1}]'.format(
  865. item['node_name'], item['interfaces'])
  866. for item in node_excluded_networks])
  867. error_msg = connectivity_check.append_message(
  868. error_msg,
  869. 'Notice: some interfaces were skipped from '
  870. 'connectivity checking because this version of '
  871. 'Fuel cannot establish following bonding modes '
  872. 'on Bootstrap nodes: LACP, Round-robin '
  873. '(balance-rr). Only interfaces of '
  874. 'successfully deployed nodes may be checked '
  875. 'with mentioned modes enabled. The list of '
  876. 'skipped interfaces: {0}.'.format(interfaces_list),
  877. )
  878. if task.cache['args']['offline'] > 0:
  879. error_msg = connectivity_check.append_message(
  880. error_msg,
  881. 'Notice: {0} node(s) were offline during '
  882. 'connectivity check so they were skipped from the '
  883. 'check.'.format(task.cache['args']['offline'])
  884. )
  885. else:
  886. error_msg = (error_msg or
  887. 'verify_networks_resp: argument "nodes"'
  888. ' have incorrect type')
  889. status = 'error'
  890. logger.error(error_msg)
  891. if status not in ('ready', 'error'):
  892. data = {
  893. 'status': status,
  894. 'progress': progress,
  895. 'message': error_msg,
  896. 'result': result
  897. }
  898. objects.Task.update(task, data)
  899. else:
  900. objects.Task.update_verify_networks(
  901. task, status, progress, error_msg, result)
  902. cls._update_action_log_entry(status, task.name, task_uuid, nodes)
  903. @classmethod
  904. def multicast_verification_resp(cls, **kwargs):
  905. """Receiver for verification of multicast packages
  906. data - {1: response, 2: response}
  907. """
  908. logger.info(
  909. u"RPC method multicast_resp received: {0}".format(
  910. jsonutils.dumps(kwargs))
  911. )
  912. task_uuid = kwargs.get('task_uuid')
  913. task = cls.acquire_lock(task_uuid)
  914. if not task:
  915. return
  916. if kwargs.get('status'):
  917. task.status = kwargs['status']
  918. task.progress = kwargs.get('progress', 0)
  919. response = kwargs.get('nodes', {})
  920. error_msg = kwargs.get('error')
  921. if task.status == TASK_STATUSES.error:
  922. task.message = error_msg
  923. elif task.status == TASK_STATUSES.ready:
  924. errors = []
  925. results = []
  926. node_ids = set(config['uid'] for config
  927. in task.cache['args']['nodes'])
  928. not_received_nodes = node_ids - set(response.keys())
  929. if not_received_nodes:
  930. msg = (u'No answer from nodes: {0}').format(
  931. list(not_received_nodes))
  932. errors.append(msg)
  933. for node_id, received_ids in six.iteritems(response):
  934. result = {}
  935. not_received_ids = node_ids - set(received_ids or [])
  936. result = {'node_id': node_id,
  937. 'not_received': list(not_received_ids)}
  938. results.append(result)
  939. if not_received_ids:
  940. msg = (u'Not received ids {0}'
  941. u' for node {1}.').format(not_received_ids, node_id)
  942. errors.append(msg)
  943. task.message = '\n'.join(errors)
  944. if errors:
  945. task.status = TASK_STATUSES.error
  946. task.result = results
  947. if task.status == TASK_STATUSES.ready:
  948. editable = copy.deepcopy(task.cluster.attributes.editable)
  949. editable['corosync']['verified']['value'] = True
  950. task.cluster.attributes.editable = editable
  951. logger.debug(u'Multicast verification message %s', task.message)
  952. objects.Task.update_verify_networks(
  953. task, task.status,
  954. task.progress, task.message, task.result)
  955. @classmethod
  956. def check_dhcp_resp(cls, **kwargs):
  957. """Receiver method for check_dhcp task
  958. For example of kwargs check FakeCheckingDhcpThread
  959. """
  960. logger.info(
  961. "RPC method check_dhcp_resp received: %s",
  962. jsonutils.dumps(kwargs)
  963. )
  964. messages = []
  965. result = collections.defaultdict(list)
  966. message_template = (
  967. u"Node {node_name} discovered DHCP server "
  968. u"via {iface} with following parameters: IP: {server_id}, "
  969. u"MAC: {mac}. This server will conflict with the installation.")
  970. task_uuid = kwargs.get('task_uuid')
  971. nodes = kwargs.get('nodes', [])
  972. error_msg = kwargs.get('error')
  973. status = kwargs.get('status')
  974. progress = kwargs.get('progress')
  975. task = cls.acquire_lock(task_uuid)
  976. if not task:
  977. return
  978. nodes_uids = [node['uid'] for node in nodes]
  979. nodes_db = db().query(Node).filter(Node.id.in_(nodes_uids)).all()
  980. nodes_map = dict((str(node.id), node) for node in nodes_db)
  981. master_network_mac = settings.ADMIN_NETWORK['mac']
  982. logger.debug('Mac addr on master node %s', master_network_mac)
  983. for node in nodes:
  984. node_db = nodes_map.get(node['uid'])
  985. if not node_db:
  986. logger.warning(
  987. "Received message from nonexistent node. "
  988. "Node's UID {0}. Node's data {1}"
  989. .format(node['uid'], node.get('data', []))
  990. )
  991. continue
  992. if node['status'] == consts.NODE_STATUSES.error:
  993. messages.append(
  994. "DHCP discover check failed on node with ID={}. "
  995. "Check logs for details."
  996. .format(node['uid'])
  997. )
  998. result[node['uid']] = node.get('data')
  999. elif node['status'] == consts.NODE_STATUSES.ready:
  1000. # (vvalyavskiy): dhcp_check util produces one record with
  1001. # empty fields if no dhcp server is present, so, we can
  1002. # safely skip checking such kind of responses
  1003. response = node.get('data', [])
  1004. if (len(response) == 1 and isinstance(response[0], dict)
  1005. and not any(response[0].values())):
  1006. logger.warning(
  1007. "No DHCP servers were found! "
  1008. "Node's UID {0}. Node's data {1}"
  1009. .format(node['uid'], response)
  1010. )
  1011. continue
  1012. incorrect_input = False
  1013. for row in response:
  1014. try:
  1015. if not net_utils.is_same_mac(row['mac'],
  1016. master_network_mac):
  1017. row['node_name'] = node_db.name
  1018. message = message_template.format(**row)
  1019. messages.append(message)
  1020. # NOTE(aroma): for example when mac's value
  1021. # is an empty string
  1022. except ValueError as e:
  1023. logger.warning(
  1024. "Failed to compare mac address "
  1025. "from response data (row = {0}) "
  1026. "from node with id={1}. "
  1027. "Original error:\n {2}"
  1028. .format(row, node['uid'], six.text_type(e)))
  1029. incorrect_input = True
  1030. finally:
  1031. result[node['uid']].append(row)
  1032. if incorrect_input:
  1033. messages.append(
  1034. "Something is wrong with response data from node with "
  1035. "id={}. Check logs for details."
  1036. .format(node['uid'])
  1037. )
  1038. status = status if not messages else consts.TASK_STATUSES.error
  1039. error_msg = '\n'.join(messages) if messages else error_msg
  1040. logger.debug('Check dhcp message %s', error_msg)
  1041. objects.Task.update_verify_networks(task, status, progress,
  1042. error_msg, result)
  1043. @classmethod
  1044. def download_release_resp(cls, **kwargs):
  1045. logger.info(
  1046. "RPC method download_release_resp received: %s" %
  1047. jsonutils.dumps(kwargs)
  1048. )
  1049. task_uuid = kwargs.get('task_uuid')
  1050. error_msg = kwargs.get('error')
  1051. status = kwargs.get('status')
  1052. progress = kwargs.get('progress')
  1053. task = cls.acquire_lock(task_uuid)
  1054. if not task:
  1055. return
  1056. release_info = task.cache['args']['release_info']
  1057. release_id = release_info['release_id']
  1058. release = db().query(Release).get(release_id)
  1059. if not release:
  1060. logger.error("download_release_resp: Release"
  1061. " with ID %s not found", release_id)
  1062. return
  1063. if error_msg:
  1064. status = 'error'
  1065. error_msg = "{0} download and preparation " \
  1066. "has failed.".format(release.name)
  1067. cls._download_release_error(
  1068. release_id,
  1069. error_msg
  1070. )
  1071. elif progress == 100 and status == 'ready':
  1072. cls._download_release_completed(release_id)
  1073. result = {
  1074. "release_info": {
  1075. "release_id": release_id
  1076. }
  1077. }
  1078. data = {'status': status, 'progress': progress, 'message': error_msg,
  1079. 'result': result}
  1080. objects.Task.update(task, data)
  1081. @classmethod
  1082. def dump_environment_resp(cls, **kwargs):
  1083. logger.info(
  1084. "RPC method dump_environment_resp received: %s" %
  1085. jsonutils.dumps(kwargs)
  1086. )
  1087. task_uuid = kwargs.get('task_uuid')
  1088. status = kwargs.get('status')
  1089. progress = kwargs.get('progress')
  1090. error = kwargs.get('error')
  1091. msg = kwargs.get('msg')
  1092. task = cls.acquire_lock(task_uuid)
  1093. if not task:
  1094. return
  1095. if status == 'error':
  1096. notifier.notify('error', error)
  1097. data = {'status': status, 'progress': 100, 'message': error}
  1098. objects.Task.update(task, data)
  1099. elif status == 'ready':
  1100. dumpfile = os.path.basename(msg)
  1101. notifier.notify('done', 'Snapshot is ready. '
  1102. 'Visit Support page to download')
  1103. dumpfile_url = reverse('SnapshotDownloadHandler',
  1104. kwargs={'snapshot_name': dumpfile})
  1105. data = {'status': status, 'progress': progress,
  1106. 'message': dumpfile_url}
  1107. objects.Task.update(task, data)
  1108. @classmethod
  1109. def stats_user_resp(cls, **kwargs):
  1110. logger.info("RPC method stats_user_resp received: %s",
  1111. jsonutils.dumps(kwargs))
  1112. task_uuid = kwargs.get('task_uuid')
  1113. nodes = kwargs.get('nodes', [])
  1114. status = kwargs.get('status')
  1115. error = kwargs.get('error')
  1116. message = kwargs.get('msg')
  1117. task = cls.acquire_lock(task_uuid)
  1118. if not task:
  1119. return
  1120. if status not in (consts.TASK_STATUSES.ready,
  1121. consts.TASK_STATUSES.error):
  1122. logger.debug("Task %s, id: %s in status: %s",
  1123. task.name, task.id, task.status)
  1124. return
  1125. data = {'status': status, 'progress': 100, 'message': message}
  1126. if status == consts.TASK_STATUSES.error:
  1127. logger.error("Task %s, id: %s failed: %s",
  1128. task.name, task.id, error)
  1129. data['message'] = error
  1130. objects.Task.update(task, data)
  1131. cls._update_action_log_entry(status, task.name, task_uuid, nodes)
  1132. logger.info("RPC method stats_user_resp processed")
  1133. @classmethod
  1134. def _get_failed_repos(cls, node):
  1135. """Get failed repositories from failed node.
  1136. :param node: master or slave
  1137. :type node: dict
  1138. :return: list of failed repositories
  1139. """
  1140. return node['out'].get('failed_urls', [])
  1141. @classmethod
  1142. def _check_repos_connectivity(cls, resp_kwargs, failed_nodes_msg,
  1143. suggestion_msg=''):
  1144. """Analyze response data to check repo connectivity from nodes
  1145. :param resp_kwargs: task response data
  1146. :type resp_kwargs: dict
  1147. :param failed_nodes_msg: error message part if the task has not
  1148. due to underlying command execution error; is formatted by
  1149. node name
  1150. :type failed_nodes_msg: str
  1151. :param failed_repos_msg: error message part if connection to the
  1152. repositories cannot be established; is formatted by list of names
  1153. of the repositories
  1154. :type failed_repos_msg: str
  1155. :param err_msg: general error message part
  1156. :type err_msg: str
  1157. """
  1158. task_uuid = resp_kwargs.get('task_uuid')
  1159. response = resp_kwargs.get('nodes', [])
  1160. status = consts.TASK_STATUSES.ready
  1161. progress = 100
  1162. task = cls.acquire_lock(task_uuid)
  1163. if not task:
  1164. return
  1165. failed_response_nodes = {
  1166. n['uid']: n for n in response if n['status'] != 0
  1167. }
  1168. failed_nodes = []
  1169. failed_repos = set()
  1170. master = failed_response_nodes.pop(consts.MASTER_NODE_UID, None)
  1171. if master is not None:
  1172. failed_repos.update(cls._get_failed_repos(master))
  1173. failed_nodes.append(consts.MASTER_NODE_NAME)
  1174. nodes = objects.NodeCollection.filter_by_list(
  1175. None, 'id', failed_response_nodes, order_by='id')
  1176. for node in nodes:
  1177. failed_repos.update(cls._get_failed_repos(
  1178. failed_response_nodes[node.uid]))
  1179. failed_nodes.append(node.name)
  1180. err_msg = ''
  1181. failed_repos_msg = (
  1182. 'Following repos are not available - {0}.\n '
  1183. )
  1184. if failed_nodes:
  1185. err_msg = failed_nodes_msg.format(', '.join(failed_nodes))
  1186. if failed_repos:
  1187. err_msg += failed_repos_msg.format(', '.join(failed_repos))
  1188. if err_msg and suggestion_msg:
  1189. err_msg += suggestion_msg
  1190. if err_msg:
  1191. status = consts.TASK_STATUSES.error
  1192. objects.Task.update_verify_networks(
  1193. task, status, progress, err_msg, {})
  1194. @classmethod
  1195. def check_repositories_resp(cls, **kwargs):
  1196. logger.info(
  1197. "RPC method check_repositories_resp received: %s",
  1198. jsonutils.dumps(kwargs)
  1199. )
  1200. failed_nodes_msg = (
  1201. 'Repo availability verification'
  1202. ' failed on following nodes {0}.\n '
  1203. )
  1204. cls._check_repos_connectivity(kwargs, failed_nodes_msg)
  1205. @classmethod
  1206. def check_repositories_with_setup_resp(cls, **kwargs):
  1207. logger.info(
  1208. "RPC method check_repositories_with_setup received: %s",
  1209. jsonutils.dumps(kwargs)
  1210. )
  1211. failed_nodes_msg = (
  1212. 'Repo availability verification using public network'
  1213. ' failed on following nodes {0}.\n '
  1214. )
  1215. suggestion_msg = (
  1216. 'Check your public network settings and '
  1217. 'availability of the repositories from public network. '
  1218. 'Please examine nailgun and astute'
  1219. ' logs for additional details.'
  1220. )
  1221. cls._check_repos_connectivity(kwargs, failed_nodes_msg,
  1222. suggestion_msg)
  1223. @classmethod
  1224. def base_resp(cls, **kwargs):
  1225. logger.info("RPC method base_resp received: %s",
  1226. jsonutils.dumps(kwargs))
  1227. task_uuid = kwargs.get('task_uuid')
  1228. status = kwargs.get('status')
  1229. error = kwargs.get('error', '')
  1230. message = kwargs.get('msg', '')
  1231. task = cls.acquire_lock(task_uuid)
  1232. if not task:
  1233. return
  1234. data = {'status': status, 'progress': 100, 'message': message}
  1235. if status == consts.TASK_STATUSES.error:
  1236. logger.error("Task %s, id: %s failed: %s",
  1237. task.name, task.id, error)
  1238. data['message'] = error
  1239. objects.Task.update(task, data)
  1240. cls._update_action_log_entry(status, task.name, task_uuid, [])