OpenStack Orchestration (Heat)
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

stack.py 92KB


  1. #
  2. # Licensed under the Apache License, Version 2.0 (the "License"); you may
  3. # not use this file except in compliance with the License. You may obtain
  4. # a copy of the License at
  5. #
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. #
  8. # Unless required by applicable law or agreed to in writing, software
  9. # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  10. # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  11. # License for the specific language governing permissions and limitations
  12. # under the License.
  13. import collections
  14. import contextlib
  15. import copy
  16. import eventlet
  17. import functools
  18. import re
  19. import warnings
  20. from oslo_config import cfg
  21. from oslo_log import log as logging
  22. from oslo_utils import excutils
  23. from oslo_utils import timeutils as oslo_timeutils
  24. from oslo_utils import uuidutils
  25. from osprofiler import profiler
  26. import six
  27. from heat.common import context as common_context
  28. from heat.common import environment_format as env_fmt
  29. from heat.common import exception
  30. from heat.common.i18n import _
  31. from heat.common import identifier
  32. from heat.common import lifecycle_plugin_utils
  33. from heat.engine import api
  34. from heat.engine import dependencies
  35. from heat.engine import environment
  36. from heat.engine import event
  37. from heat.engine.notification import stack as notification
  38. from heat.engine import parameter_groups as param_groups
  39. from heat.engine import parent_rsrc
  40. from heat.engine import resource
  41. from heat.engine import resources
  42. from heat.engine import scheduler
  43. from heat.engine import status
  44. from heat.engine import stk_defn
  45. from heat.engine import sync_point
  46. from heat.engine import template as tmpl
  47. from heat.engine import update
  48. from heat.objects import raw_template as raw_template_object
  49. from heat.objects import resource as resource_objects
  50. from heat.objects import snapshot as snapshot_object
  51. from heat.objects import stack as stack_object
  52. from heat.objects import stack_tag as stack_tag_object
  53. from heat.objects import user_creds as ucreds_object
  54. from heat.rpc import api as rpc_api
  55. from heat.rpc import worker_client as rpc_worker_client
  # Module-level logger, named after this module.
  LOG = logging.getLogger(__name__)

  # Immutable two-field record with the fields ``rsrc_id`` and ``is_update``.
  ConvergenceNode = collections.namedtuple('ConvergenceNode',
                                           ['rsrc_id', 'is_update'])
  class ForcedCancel(Exception):
      """Exception raised to cancel task execution.

      The ``with_rollback`` flag (True by default) is stored on the instance
      for whoever handles the cancellation.
      """

      def __init__(self, with_rollback=True):
          # Only the flag is recorded; no message is passed to Exception.
          self.with_rollback = with_rollback

      def __str__(self):
          # Fixed text, independent of the with_rollback flag.
          return "Operation cancelled"
  def reset_state_on_error(func):
      """Decorate a stack operation so the stack is not left IN_PROGRESS.

      The wrapper calls ``func`` with the stack as its first argument and
      returns its result. Exceptions are logged (``Exception`` at error
      level, any other ``BaseException`` at info level) inside
      ``excutils.save_and_reraise_exception()``. On the way out, if the
      stack status is still IN_PROGRESS and either the stack is not a
      convergence stack or an error message was recorded, the stack is
      marked as failed.
      """
      @six.wraps(func)
      def handle_exceptions(stack, *args, **kwargs):
          # Stays None unless an exception passes through; the finally
          # clause uses it to tell a normal return from an error exit.
          errmsg = None
          try:
              return func(stack, *args, **kwargs)
          except Exception as exc:
              with excutils.save_and_reraise_exception():
                  errmsg = six.text_type(exc)
                  LOG.error('Unexpected exception in %(func)s: %(msg)s',
                            {'func': func.__name__, 'msg': errmsg})
          except BaseException as exc:
              # Non-Exception BaseExceptions get the exception type name
              # included in the recorded message.
              with excutils.save_and_reraise_exception():
                  exc_type = type(exc).__name__
                  errmsg = '%s(%s)' % (exc_type, six.text_type(exc))
                  LOG.info('Stopped due to %(msg)s in %(func)s',
                           {'func': func.__name__, 'msg': errmsg})
          finally:
              if ((not stack.convergence or errmsg is not None) and
                      stack.status == stack.IN_PROGRESS):
                  rtnmsg = _("Unexpected exit while IN_PROGRESS.")
                  stack.mark_failed(errmsg if errmsg is not None else rtnmsg)
                  # With no recorded error, func returned normally while the
                  # stack was still IN_PROGRESS; the stack has already been
                  # marked failed above, and the assertion flags the case.
                  assert errmsg is not None, "Returned while IN_PROGRESS."

      return handle_exceptions
  89. @six.python_2_unicode_compatible
  90. class Stack(collections.Mapping):
  # Action names. The chained assignment binds the whole tuple to ACTIONS
  # and each name to a class attribute of the same spelling
  # (e.g. CREATE == 'CREATE').
  ACTIONS = (
      CREATE, DELETE, UPDATE, ROLLBACK, SUSPEND, RESUME, ADOPT,
      SNAPSHOT, CHECK, RESTORE
  ) = (
      'CREATE', 'DELETE', 'UPDATE', 'ROLLBACK', 'SUSPEND', 'RESUME', 'ADOPT',
      'SNAPSHOT', 'CHECK', 'RESTORE'
  )

  # Status names, bound the same way as the action names above.
  STATUSES = (IN_PROGRESS, FAILED, COMPLETE
              ) = ('IN_PROGRESS', 'FAILED', 'COMPLETE')

  # NOTE(review): not referenced in the visible part of this file.
  _zones = None
  def __init__(self, context, stack_name, tmpl,
               stack_id=None, action=None, status=None,
               status_reason='', timeout_mins=None,
               disable_rollback=True, parent_resource=None, owner_id=None,
               adopt_stack_data=None, stack_user_project_id=None,
               created_time=None, updated_time=None,
               user_creds_id=None, tenant_id=None,
               use_stored_context=False, username=None,
               nested_depth=0, strict_validate=True, convergence=False,
               current_traversal=None, tags=None, prev_raw_template_id=None,
               current_deps=None, cache_data=None,
               deleted_time=None, converge=False):
      """Initialise the Stack.

      Initialise from a context, name, Template object and (optionally)
      Environment object. The database ID may also be initialised, if the
      stack is already in the database.

      Creating a stack with cache_data creates a lightweight stack which
      will not load any resources from the database and resolve the
      functions from the cache_data specified.

      Notes on selected arguments:

      - ``stack_name`` is validated only when ``owner_id`` is None.
      - ``adopt_stack_data``, when truthy, forces the action to ADOPT;
        otherwise ``action`` is used, defaulting to CREATE.
      - ``status`` defaults to IN_PROGRESS.
      - ``use_stored_context`` replaces the supplied context with the one
        returned by ``stored_context()``.
      - ``tenant_id`` and ``username`` fall back to the values on the
        context in use.
      - ``tmpl`` may be None, in which case no stack definition is built
        and ``self.defn`` is None.

      :raises exception.StackValidationFailed: if the stack name is not a
          string or does not match the allowed pattern.
      """

      def _validate_stack_name(name):
          try:
              # \Z anchors at the very end of the string. A trailing "$"
              # would also match just before a final newline and so let
              # names such as "stack\n" through.
              if not re.match(r"[a-zA-Z][a-zA-Z0-9_.-]{0,254}\Z", name):
                  message = _('Invalid stack name %s must contain '
                              'only alphanumeric or \"_-.\" characters, '
                              'must start with alpha and must be 255 '
                              'characters or less.') % name
                  raise exception.StackValidationFailed(message=message)
          except TypeError:
              # re.match() rejects non-string input. Wrap the name in a
              # 1-tuple so tuple-valued names cannot break the formatting.
              message = _('Invalid stack name %s, must be a string') % (
                  name,)
              raise exception.StackValidationFailed(message=message)

      if owner_id is None:
          _validate_stack_name(stack_name)

      self.id = stack_id
      self.owner_id = owner_id
      self.context = context
      self.name = stack_name
      self.action = (self.ADOPT if adopt_stack_data else
                     self.CREATE if action is None else action)
      self.status = self.IN_PROGRESS if status is None else status
      self.status_reason = status_reason
      self.timeout_mins = timeout_mins
      self.disable_rollback = disable_rollback
      self._outputs = None
      self._resources = None
      self._dependencies = None
      self._implicit_deps_loaded = False
      self._access_allowed_handlers = {}
      self._db_resources = None
      self._tags = tags
      self.adopt_stack_data = adopt_stack_data
      self.stack_user_project_id = stack_user_project_id
      self.created_time = created_time
      self.updated_time = updated_time
      self.deleted_time = deleted_time
      self.user_creds_id = user_creds_id
      self.nested_depth = nested_depth
      self.convergence = convergence
      self.current_traversal = current_traversal
      self.tags = tags
      self.prev_raw_template_id = prev_raw_template_id
      self.current_deps = current_deps
      self._worker_client = None
      self._convg_deps = None
      self.thread_group_mgr = None
      self.converge = converge

      # strict_validate can be used to disable value validation
      # in the resource properties schema, this is useful when
      # performing validation when properties reference attributes
      # for not-yet-created resources (which return None)
      self.strict_validate = strict_validate

      self.in_convergence_check = cache_data is not None

      if use_stored_context:
          self.context = self.stored_context()

      self.clients = self.context.clients

      # This will use the provided tenant ID when loading the stack
      # from the DB or get it from the context for new stacks.
      self.tenant_id = tenant_id or self.context.tenant_id
      self.username = username or self.context.username

      resources.initialise()

      # The proxy and the definition are built with the context passed
      # in, not with the (possibly swapped) self.context.
      parent_info = parent_rsrc.ParentResourceProxy(context,
                                                    parent_resource,
                                                    owner_id)
      if tmpl is not None:
          self.defn = stk_defn.StackDefinition(context, tmpl,
                                               self.identifier(),
                                               cache_data or {},
                                               parent_info)
      else:
          self.defn = None
  191. @property
  192. def tags(self):
  193. if self._tags is None:
  194. tags = stack_tag_object.StackTagList.get(
  195. self.context, self.id)
  196. if tags:
  197. self._tags = [t.tag for t in tags]
  198. return self._tags
  @tags.setter
  def tags(self, value):
      """Replace the cached tag list with ``value``."""
      self._tags = value
  202. @property
  203. def worker_client(self):
  204. """Return a client for making engine RPC calls."""
  205. if not self._worker_client:
  206. self._worker_client = rpc_worker_client.WorkerClient()
  207. return self._worker_client
  208. @property
  209. def t(self):
  210. """The stack template."""
  211. if self.defn is None:
  212. return None
  213. return self.defn.t
  @t.setter
  def t(self, tmpl):
      """Set the stack template.

      The current definition is replaced by a clone of itself built around
      ``tmpl`` and the stack's current identifier.
      """
      # NOTE(review): assumes self.defn is not None here - confirm callers
      # never assign a template to a stack created without one.
      self.defn = self.defn.clone_with_new_template(tmpl, self.identifier())
  @property
  def parameters(self):
      """The parameters held by the stack definition."""
      return self.defn.parameters
  @property
  def env(self):
      """The stack environment, as held by the stack definition."""
      return self.defn.env
  225. @property
  226. def parent_resource_name(self):
  227. parent_info = self.defn.parent_resource
  228. return parent_info and parent_info.name
  @property
  def parent_resource(self):
      """Dynamically load up the parent_resource.

      Returns whatever the stack definition holds as its parent resource.

      Note: this should only be used by "Fn::ResourceFacade"
      """
      return self.defn.parent_resource
  235. def set_parent_stack(self, parent_stack):
  236. parent_info = self.defn.parent_resource
  237. if parent_info is not None:
  238. parent_rsrc.use_parent_stack(parent_info, parent_stack)
  def stored_context(self):
      """Build a context from the credentials stored for this stack.

      :returns: a StoredContext created from the stored user credentials.
      :raises exception.Error: if the stack has no user_creds_id.
      """
      if not self.user_creds_id:
          msg = _("Attempt to use stored_context with no user_creds")
          raise exception.Error(msg)

      stored = ucreds_object.UserCreds.get_by_id(
          self.context, self.user_creds_id)
      creds = stored.obj_to_primitive()["versioned_object.data"]

      # Maintain request_id from self.context so we retain traceability
      # in situations where servicing a request requires switching from
      # the request context to the stored context
      creds['request_id'] = self.context.request_id

      # We don't store roles in the user_creds table, so disable the
      # policy check for admin by setting is_admin=False.
      creds['is_admin'] = False
      creds['overwrite'] = False

      return common_context.StoredContext.from_dict(creds)
  256. @property
  257. def outputs(self):
  258. return {n: self.defn.output_definition(n)
  259. for n in self.defn.enabled_output_names()}
  def _resources_for_defn(self, stack_defn):
      """Create a Resource for every enabled resource name in ``stack_defn``.

      :returns: a dict mapping resource name to a new Resource bound to
          this stack.
      """
      built = {}
      for rsrc_name in stack_defn.enabled_rsrc_names():
          rsrc_defn = stack_defn.resource_definition(rsrc_name)
          built[rsrc_name] = resource.Resource(rsrc_name, rsrc_defn, self)
      return built
  267. @property
  268. def resources(self):
  269. if self._resources is None:
  270. self._resources = self._resources_for_defn(self.defn)
  271. return self._resources
  def _update_all_resource_data(self, for_resources, for_outputs):
      """Push each resource's node data into the stack definition.

      Walks the explicit dependencies and, for every resource, records the
      data returned by ``node_data()`` against the resource's name.
      """
      for res in self._explicit_dependencies():
          data = res.node_data(for_resources=for_resources,
                               for_outputs=for_outputs)
          stk_defn.update_resource_data(self.defn, res.name, data)
  def _find_filtered_resources(self, filters=None):
      """Yield the stack's resources built from their database records.

      With ``filters`` the records are fetched with those filters applied;
      otherwise the cached set of DB resources is used. Records that cannot
      be turned into a resource are skipped.
      """
      if not filters:
          db_records = self._db_resources_get()
      else:
          assert not self.in_convergence_check, \
              "Resources should not be loaded from the DB"
          db_records = resource_objects.Resource.get_all_by_stack(
              self.context, self.id, filters)

      # Shared across records so each stack definition is built only once.
      definition_cache = {}
      for record in six.itervalues(db_records):
          loaded = self._resource_from_db_resource(record, definition_cache)
          if loaded is None:
              continue
          yield loaded
  290. def iter_resources(self, nested_depth=0, filters=None):
  291. """Iterates over all the resources in a stack.
  292. Iterating includes nested stacks up to `nested_depth` levels below.
  293. """
  294. for res in self._find_filtered_resources(filters):
  295. yield res
  296. resources = self._find_filtered_resources()
  297. for res in resources:
  298. if not res.has_nested() or nested_depth == 0:
  299. continue
  300. nested_stack = res.nested()
  301. if nested_stack is None:
  302. continue
  303. for nested_res in nested_stack.iter_resources(nested_depth - 1,
  304. filters):
  305. yield nested_res
  def db_active_resources_get(self):
      """Return the result of the active-resources DB lookup for this stack.

      A falsy lookup result is reported as None.
      """
      active = resource_objects.Resource.get_all_active_by_stack(
          self.context, self.id)
      if not active:
          return None
      return active
  310. def db_resource_get(self, name):
  311. if self.id is None:
  312. return None
  313. return self._db_resources_get().get(name)
  314. def _db_resources_get(self):
  315. if self._db_resources is None:
  316. assert not self.in_convergence_check, \
  317. "Resources should not be loaded from the DB"
  318. _db_resources = resource_objects.Resource.get_all_by_stack(
  319. self.context, self.id)
  320. if not _db_resources:
  321. return {}
  322. self._db_resources = _db_resources
  323. return self._db_resources
  324. @contextlib.contextmanager
  325. def _previous_definition(self, stk_defn):
  326. cur_defn = self.defn
  327. try:
  328. self.defn = stk_defn
  329. yield
  330. finally:
  331. self.defn = cur_defn
  def _resource_from_db_resource(self, db_res, stk_def_cache=None):
      """Build (or reuse) a Resource object for a resource DB record.

      :param db_res: the resource DB record.
      :param stk_def_cache: optional dict mapping template ID to a stack
          definition; looked up and filled in here so that repeated calls
          can share definitions.
      :returns: the Resource, or None if the record's template cannot be
          found or does not define a resource of that name.
      """
      tid = db_res.current_template_id
      if tid is None:
          # No template recorded: treat it as the stack's current one.
          tid = self.t.id

      if tid == self.t.id:
          # Reuse the already-instantiated resource when it is the very
          # same DB row.
          cur_res = self.resources.get(db_res.name)
          if cur_res is not None and (cur_res.id == db_res.id):
              return cur_res
          stk_def = self.defn
      elif stk_def_cache and tid in stk_def_cache:
          stk_def = stk_def_cache[tid]
      else:
          try:
              t = tmpl.Template.load(self.context, tid)
          except exception.NotFound:
              return None
          stk_def = self.defn.clone_with_new_template(t,
                                                      self.identifier())
          if stk_def_cache is not None:
              stk_def_cache[tid] = stk_def

      try:
          defn = stk_def.resource_definition(db_res.name)
      except KeyError:
          return None

      # The resource is constructed and loaded while the stack temporarily
      # uses the definition matching the resource's template.
      with self._previous_definition(stk_def):
          res = resource.Resource(db_res.name, defn, self)
          res._load_data(db_res)
      return res
  360. def resource_get(self, name):
  361. """Return a stack resource, even if not in the current template."""
  362. res = self.resources.get(name)
  363. if res:
  364. return res
  365. # fall back to getting the resource from the database
  366. db_res = self.db_resource_get(name)
  367. if db_res:
  368. return self._resource_from_db_resource(db_res)
  369. return None
  370. @property
  371. def dependencies(self):
  372. if not self._implicit_deps_loaded:
  373. self._explicit_dependencies()
  374. self._add_implicit_dependencies(self._dependencies,
  375. ignore_errors=self.id is not None)
  376. self._implicit_deps_loaded = True
  377. return self._dependencies
  378. def reset_dependencies(self):
  379. self._implicit_deps_loaded = False
  380. self._dependencies = None
  381. def root_stack_id(self):
  382. if not self.owner_id:
  383. return self.id
  384. return stack_object.Stack.get_root_id(self.context, self.owner_id)
  385. def object_path_in_stack(self):
  386. """Return stack resources and stacks in path from the root stack.
  387. If this is not nested return (None, self), else return stack resources
  388. and stacks in path from the root stack and including this stack.
  389. Note that this is horribly inefficient, as it requires us to load every
  390. stack in the chain back to the root in memory at the same time.
  391. :returns: a list of (stack_resource, stack) tuples.
  392. """
  393. if self.parent_resource:
  394. parent_stack = self.parent_resource._stack()
  395. if parent_stack is not None:
  396. path = parent_stack.object_path_in_stack()
  397. path.extend([(self.parent_resource, self)])
  398. return path
  399. return [(None, self)]
  400. def path_in_stack(self):
  401. """Return tuples of names in path from the root stack.
  402. If this is not nested return (None, self.name), else return tuples of
  403. names (stack_resource.name, stack.name) in path from the root stack and
  404. including this stack.
  405. :returns: a list of (string, string) tuples.
  406. """
  407. opis = self.object_path_in_stack()
  408. return [(stckres.name if stckres else None,
  409. stck.name if stck else None) for stckres, stck in opis]
  410. def total_resources(self, stack_id=None):
  411. """Return the total number of resources in a stack.
  412. Includes nested stacks below.
  413. """
  414. if not stack_id:
  415. if self.id is None:
  416. # We're not stored yet, so we don't have anything to count
  417. return 0
  418. stack_id = self.id
  419. return stack_object.Stack.count_total_resources(self.context, stack_id)
  420. def _set_param_stackid(self):
  421. """Update self.parameters with the current ARN.
  422. The ARN is then provided via the Parameters class as the StackId pseudo
  423. parameter.
  424. """
  425. if not self.parameters.set_stack_id(self.identifier()):
  426. LOG.warning("Unable to set parameters StackId identifier")
  427. def _explicit_dependencies(self):
  428. """Return dependencies without making any resource plugin calls.
  429. This includes at least all of the dependencies that are explicitly
  430. expressed in the template (via depends_on or an intrinsic function). It
  431. may include implicit dependencies defined by resource plugins, but only
  432. if they have already been calculated.
  433. """
  434. if self._dependencies is None:
  435. deps = dependencies.Dependencies()
  436. for res in six.itervalues(self.resources):
  437. res.add_explicit_dependencies(deps)
  438. self._dependencies = deps
  439. return self._dependencies
  def _add_implicit_dependencies(self, deps, ignore_errors=True):
      """Augment the given dependencies with implicit ones from plugins.

      :param deps: the dependency graph each resource adds itself to.
      :param ignore_errors: when true, every exception raised by a resource
          is logged and skipped; when false, only ValueError and TypeError
          are skipped and anything else propagates.
      """
      for rsrc in six.itervalues(self.resources):
          try:
              rsrc.add_dependencies(deps)
          except Exception as exc:
              # Always ignore ValueError/TypeError, as they're likely to
              # have come from trying to read invalid property values that
              # haven't been validated yet.
              tolerated = (ignore_errors or
                           isinstance(exc, (ValueError, TypeError)))
              if not tolerated:
                  raise
              LOG.warning('Ignoring error adding implicit '
                          'dependencies for %(res)s: %(err)s',
                          {'res': six.text_type(rsrc),
                           'err': six.text_type(exc)})
  @classmethod
  def load(cls, context, stack_id=None, stack=None, show_deleted=True,
           use_stored_context=False, force_reload=False, cache_data=None,
           load_template=True):
      """Retrieve a Stack from the database.

      :param stack: an already-fetched stack DB object; when None it is
          looked up by ``stack_id``.
      :param force_reload: refresh the DB object before building the Stack.
      :raises exception.NotFound: if no stack with ``stack_id`` exists.
      """
      db_stack = stack
      if db_stack is None:
          db_stack = stack_object.Stack.get_by_id(
              context,
              stack_id,
              show_deleted=show_deleted)
          if db_stack is None:
              message = _('No stack exists with id "%s"') % str(stack_id)
              raise exception.NotFound(message)

      if force_reload:
          db_stack.refresh()

      return cls._from_db(context, db_stack,
                          use_stored_context=use_stored_context,
                          cache_data=cache_data,
                          load_template=load_template)
  @classmethod
  def load_all(cls, context, limit=None, marker=None, sort_keys=None,
               sort_dir=None, filters=None,
               show_deleted=False,
               show_nested=False, show_hidden=False, tags=None,
               tags_any=None, not_tags=None, not_tags_any=None):
      """Yield a Stack for every stack DB record matching the query.

      All arguments are passed through to the stack DB query. Records that
      can no longer be loaded (NotFound) are silently skipped.
      """
      query = dict(
          limit=limit,
          sort_keys=sort_keys,
          marker=marker,
          sort_dir=sort_dir,
          filters=filters,
          show_deleted=show_deleted,
          show_nested=show_nested,
          show_hidden=show_hidden,
          tags=tags,
          tags_any=tags_any,
          not_tags=not_tags,
          not_tags_any=not_tags_any,
          eager_load=True)
      db_stacks = stack_object.Stack.get_all(context, **query)
      for db_stack in db_stacks:
          try:
              yield cls._from_db(context, db_stack)
          except exception.NotFound:
              # We're in a different transaction than the get_all, so a stack
              # returned above can be deleted by the time we try to load it.
              continue
  @classmethod
  def _from_db(cls, context, stack,
               use_stored_context=False, cache_data=None,
               load_template=True):
      """Build a Stack from a stack DB object.

      :param stack: the stack DB object supplying the constructor values.
      :param load_template: when false, no template is loaded and the Stack
          is constructed with a template of None.
      """
      template = (tmpl.Template.load(context, stack.raw_template_id,
                                     stack.raw_template)
                  if load_template else None)
      return cls(
          context, stack.name, template,
          stack_id=stack.id,
          action=stack.action,
          status=stack.status,
          status_reason=stack.status_reason,
          timeout_mins=stack.timeout,
          disable_rollback=stack.disable_rollback,
          parent_resource=stack.parent_resource_name,
          owner_id=stack.owner_id,
          stack_user_project_id=stack.stack_user_project_id,
          created_time=stack.created_at,
          updated_time=stack.updated_at,
          user_creds_id=stack.user_creds_id,
          tenant_id=stack.tenant,
          use_stored_context=use_stored_context,
          username=stack.username,
          convergence=stack.convergence,
          current_traversal=stack.current_traversal,
          prev_raw_template_id=stack.prev_raw_template_id,
          current_deps=stack.current_deps,
          cache_data=cache_data,
          nested_depth=stack.nested_depth,
          deleted_time=stack.deleted_at)
  532. def get_kwargs_for_cloning(self, keep_status=False, only_db=False,
  533. keep_tags=False):
  534. """Get common kwargs for calling Stack() for cloning.
  535. The point of this method is to reduce the number of places that we
  536. need to update when a kwarg to Stack.__init__() is modified. It
  537. is otherwise easy to forget an option and cause some unexpected
  538. error if this option is lost.
  539. Note:
  540. - This doesn't return the args(name, template) but only the kwargs.
  541. - We often want to start 'fresh' so don't want to maintain the old
  542. status, action and status_reason.
  543. - We sometimes only want the DB attributes.
  544. """
  545. stack = {
  546. 'owner_id': self.owner_id,
  547. 'username': self.username,
  548. 'disable_rollback': self.disable_rollback,
  549. 'stack_user_project_id': self.stack_user_project_id,
  550. 'user_creds_id': self.user_creds_id,
  551. 'nested_depth': self.nested_depth,
  552. 'convergence': self.convergence,
  553. 'current_traversal': self.current_traversal,
  554. 'prev_raw_template_id': self.prev_raw_template_id,
  555. 'current_deps': self.current_deps
  556. }
  557. if keep_status:
  558. stack.update({
  559. 'action': self.action,
  560. 'status': self.status,
  561. 'status_reason': six.text_type(self.status_reason)})
  562. if only_db:
  563. stack['parent_resource_name'] = self.parent_resource_name
  564. stack['tenant'] = self.tenant_id
  565. stack['timeout'] = self.timeout_mins
  566. else:
  567. stack['parent_resource'] = self.parent_resource_name
  568. stack['tenant_id'] = self.tenant_id
  569. stack['timeout_mins'] = self.timeout_mins
  570. stack['strict_validate'] = self.strict_validate
  571. if keep_tags:
  572. stack['tags'] = self.tags
  573. return stack
  @profiler.trace('Stack.store', hide_args=False)
  def store(self, backup=False, exp_trvsl=None,
            ignore_traversal_check=False):
      """Store the stack in the database and return its ID.

      If self.id is set, we update the existing stack.

      :param backup: value stored under the 'backup' key of the record.
      :param exp_trvsl: traversal ID passed to the convergence update;
          defaults to the stack's current traversal unless
          ``ignore_traversal_check`` is set.
      :param ignore_traversal_check: leave ``exp_trvsl`` as None instead of
          defaulting it.
      :returns: the stack ID, or None when the convergence update reports
          that nothing was updated.
      """
      s = self.get_kwargs_for_cloning(keep_status=True, only_db=True)
      s['name'] = self.name
      s['backup'] = backup
      s['updated_at'] = self.updated_time
      if self.t.id is None:
          # The template has not been stored yet: encrypt its hidden
          # parameters and store it to obtain an ID.
          stack_object.Stack.encrypt_hidden_parameters(self.t)
          s['raw_template_id'] = self.t.store(self.context)
      else:
          s['raw_template_id'] = self.t.id

      if self.id is not None:
          if exp_trvsl is None and not ignore_traversal_check:
              exp_trvsl = self.current_traversal

          if self.convergence:
              # do things differently for convergence
              updated = stack_object.Stack.select_and_update(
                  self.context, self.id, s, exp_trvsl=exp_trvsl)

              if not updated:
                  return None
          else:
              stack_object.Stack.update_by_id(self.context, self.id, s)

      else:
          if not self.user_creds_id:
              # Create a context containing a trust_id and trustor_user_id
              # if trusts are enabled
              if cfg.CONF.deferred_auth_method == 'trusts':
                  keystone = self.clients.client('keystone')
                  trust_ctx = keystone.create_trust_context()
                  new_creds = ucreds_object.UserCreds.create(trust_ctx)
              else:
                  new_creds = ucreds_object.UserCreds.create(self.context)
              s['user_creds_id'] = new_creds.id
              self.user_creds_id = new_creds.id

          if self.convergence:
              # create a traversal ID
              self.current_traversal = uuidutils.generate_uuid()
              s['current_traversal'] = self.current_traversal

          new_s = stack_object.Stack.create(self.context, s)
          self.id = new_s.id
          self.created_time = new_s.created_at

      if self.tags:
          stack_tag_object.StackTagList.set(self.context, self.id, self.tags)

      # The identifier may have changed (a new ID was assigned above), so
      # refresh the StackId pseudo parameter.
      self._set_param_stackid()

      return self.id
  623. def _backup_name(self):
  624. return '%s*' % self.name
  def identifier(self):
      """Return an identifier for this stack.

      The identifier is built from the tenant ID, the stack name and the
      stack ID.
      """
      return identifier.HeatIdentifier(self.tenant_id, self.name, self.id)
  def __iter__(self):
      """Return an iterator over the resource names."""
      # Iterating the resources dict yields its keys, i.e. the names.
      return iter(self.resources)
  def __len__(self):
      """Return the number of resources."""
      # NOTE: with no __bool__ visible in this part of the class, this also
      # decides the truth value of a Stack - one without resources is falsy.
      return len(self.resources)
  def __getitem__(self, key):
      """Get the resource with the specified name.

      A missing name raises KeyError from the underlying dict lookup.
      """
      return self.resources[key]
  def add_resource(self, resource):
      """Insert the given resource into the stack.

      The resource is re-bound to this stack, its definition is reparsed
      against this stack's definition, and it is registered both in the
      resources dict and in the stack definition.
      """
      resource._rsrc_prop_data_id = None
      # Remember the template of the stack the resource came from before
      # re-binding it; the reparse below needs it.
      template = resource.stack.t
      resource.stack = self
      definition = resource.t.reparse(self.defn, template)
      resource.t = definition
      resource.reparse()
      self.resources[resource.name] = resource
      stk_defn.add_resource(self.defn, definition)
      if self.t.id is not None:
          self.t.store(self.context)
          # NOTE(review): assumed to belong to this ``if`` (store the
          # resource only once the template is stored) - confirm the
          # nesting against the canonical source.
          resource.store()
  def remove_resource(self, resource_name):
      """Remove the resource with the specified name.

      The resource is dropped from the resources dict and from the stack
      definition; the template is stored again if it already has an ID.
      """
      del self.resources[resource_name]
      stk_defn.remove_resource(self.defn, resource_name)

      template = self.t
      if template.id is not None:
          template.store(self.context)
  656. def __contains__(self, key):
  657. """Determine whether the stack contains the specified resource."""
  658. if self._resources is not None:
  659. return key in self.resources
  660. else:
  661. return key in self.t[self.t.RESOURCES]
  662. def __eq__(self, other):
  663. """Compare two Stacks for equality.
  664. Stacks are considered equal only if they are identical.
  665. """
  666. return self is other
  def __ne__(self, other):
      """Return the negation of __eq__."""
      return not self.__eq__(other)
  def __str__(self):
      """Return a human-readable string representation of the stack.

      The text contains the stack name and ID.
      """
      return six.text_type('Stack "%s" [%s]' % (self.name, self.id))
  def resource_by_refid(self, refid):
      """Return the resource in this stack with the specified refid.

      Only resources in one of the accepted (action, status) states are
      considered.

      :returns: resource in this stack with the specified refid, or None if
                not found.
      """
      for rsrc in six.itervalues(self):
          state = rsrc.state
          accepted_states = (
              (rsrc.INIT, rsrc.COMPLETE),
              (rsrc.CREATE, rsrc.IN_PROGRESS),
              (rsrc.CREATE, rsrc.COMPLETE),
              (rsrc.RESUME, rsrc.IN_PROGRESS),
              (rsrc.RESUME, rsrc.COMPLETE),
              (rsrc.UPDATE, rsrc.IN_PROGRESS),
              (rsrc.UPDATE, rsrc.COMPLETE),
              (rsrc.CHECK, rsrc.COMPLETE),
          )
          if state not in accepted_states:
              continue

          proxy = self.defn[rsrc.name]
          if proxy._resource_data is not None:
              matches = proxy.FnGetRefId() == refid
          else:
              # No data on the proxy: ask the resource itself, and accept
              # a match on the resource name as well.
              matches = rsrc.FnGetRefId() == refid or rsrc.name == refid

          if not matches:
              continue

          if self.in_convergence_check and rsrc.id is not None:
              # We don't have resources loaded from the database at this
              # point, so load the data for just this one from the DB.
              db_res = resource_objects.Resource.get_obj(self.context,
                                                         rsrc.id)
              if db_res is not None:
                  rsrc._load_data(db_res)
          return rsrc

      return None
  def register_access_allowed_handler(self, credential_id, handler):
      """Register an authorization handler function.

      Register a function which determines whether the credentials with a
      given ID can have access to a named resource.

      A handler registered earlier for the same ``credential_id`` is
      replaced.
      """
      # NOTE(review): this check disappears under ``python -O``; kept as an
      # assertion so that the exception type callers may see is unchanged.
      assert callable(handler), 'Handler is not callable'
      self._access_allowed_handlers[credential_id] = handler
  709. def access_allowed(self, credential_id, resource_name):
  710. """Is credential_id authorised to access resource by resource_name."""
  711. if not self.resources or resource_name not in self.resources:
  712. # this handle the case that sd in action delete,
  713. # try to load access_allowed_handlers if resources object
  714. # haven't been loaded.
  715. [res.name for res in self.iter_resources()]
  716. handler = self._access_allowed_handlers.get(credential_id)
  717. return handler and handler(resource_name)
  @profiler.trace('Stack.validate', hide_args=False)
  def validate(self, ignorable_errors=None, validate_res_tmpl_only=False):
      """Validates the stack.

      Checks, in order: the template, the parameters, the parameter
      groups, the template conditions, name clashes between parameters and
      resources, every resource, and finally every output.

      :param ignorable_errors: optional collection of error codes; a
          HeatException raised by a resource with one of these codes is
          ignored.
      :param validate_res_tmpl_only: when true, resources without an
          external ID are checked with ``validate_template()`` only and
          resources with an external ID are not checked.
      :raises exception.StackValidationFailed: on duplicate names, on a
          failing resource or on a failing output.
      """
      # TODO(sdake) Should return line number of invalid reference

      # validate overall template (top-level structure)
      self.t.validate()

      # Validate parameters
      self.parameters.validate(context=self.context,
                               validate_value=self.strict_validate)

      # Validate Parameter Groups
      parameter_groups = param_groups.ParameterGroups(self.t)
      parameter_groups.validate()

      # Continue to call this function, since old third-party Template
      # plugins may depend on it being called to validate the resource
      # definitions before actually generating them.
      if (type(self.t).validate_resource_definitions !=
              tmpl.Template.validate_resource_definitions):
          warnings.warn("The Template.validate_resource_definitions() "
                        "method is deprecated and will no longer be called "
                        "in future versions of Heat. Template subclasses "
                        "should validate resource definitions in the "
                        "resource_definitions() method.",
                        DeprecationWarning)
          self.t.validate_resource_definitions(self)

      self.t.conditions(self).validate()

      # Load the resources definitions (success of which implies the
      # definitions are valid)
      resources = self.resources

      # Check duplicate names between parameters and resources
      dup_names = set(self.parameters) & set(resources)

      if dup_names:
          # Pass the argument to the logger so that the message is only
          # formatted when debug logging is enabled.
          LOG.debug("Duplicate names %s", dup_names)
          raise exception.StackValidationFailed(
              message=_("Duplicate names %s") % dup_names)

      self._update_all_resource_data(for_resources=True, for_outputs=True)

      # Strict validation walks the full dependency graph (including the
      # implicit, plugin-provided dependencies); otherwise only the
      # explicit ones are used.
      if self.strict_validate:
          iter_rsc = self.dependencies
      else:
          iter_rsc = self._explicit_dependencies()

      unique_defns = set(res.t for res in six.itervalues(resources))
      unique_defn_names = set(defn.name for defn in unique_defns)

      for res in iter_rsc:
          # Don't validate identical definitions multiple times
          if res.name not in unique_defn_names:
              continue
          result = None
          try:
              if not validate_res_tmpl_only:
                  if res.external_id is not None:
                      res.validate_external()
                      continue
                  result = res.validate()
              elif res.external_id is None:
                  result = res.validate_template()
          except exception.HeatException as ex:
              LOG.debug('%s', ex)
              if ignorable_errors and ex.error_code in ignorable_errors:
                  result = None
              else:
                  raise
          except AssertionError:
              raise
          except Exception as ex:
              LOG.info("Exception in stack validation",
                       exc_info=True)
              raise exception.StackValidationFailed(error=ex,
                                                    resource=res)
          if result:
              raise exception.StackValidationFailed(message=result)
          # Yield to other green threads between resources.
          eventlet.sleep(0)

      for op_name, output in six.iteritems(self.outputs):
          try:
              output.validate()
          except exception.StackValidationFailed as ex:
              # Re-raise with the path prefixed by the outputs section and
              # the output name.
              path = [self.t.OUTPUTS, op_name,
                      self.t.get_section_name(ex.path[0])]
              path.extend(ex.path[1:])
              raise exception.StackValidationFailed(
                  error=ex.error,
                  path=path,
                  message=ex.error_message)
  def requires_deferred_auth(self):
      """Tell whether any resource may need deferred authentication.

      Walks the values of the stack and answers True as soon as one of them
      reports a truthy ``requires_deferred_auth``; answers False otherwise.
      The result indicates whether the stack may have to perform API
      requests during its lifecycle using the configured deferred
      authentication method.
      """
      for rsrc in six.itervalues(self):
          if rsrc.requires_deferred_auth:
              return True
      return False
  def _add_event(self, action, status, reason):
      """Store a stack-level state change event and dispatch it."""
      stack_event = event.Event(
          self.context, self, action, status, reason,
          self.id, None, None,
          self.name, 'OS::Heat::Stack')
      stack_event.store()
      self.dispatch_event(stack_event)
  def dispatch_event(self, ev):
      """Pass an event to the environment's event sinks via a thread.

      The event is serialised with ``ev.as_dict()`` and delivered by a
      task started through the thread group manager. Nothing is done when
      the stack has no thread group manager.
      """
      def _dispatch(ctx, sinks, ev):
          # An error raised by one sink stops delivery to the sinks after
          # it; the error is only logged, never propagated.
          try:
              for sink in sinks:
                  sink.consume(ctx, ev)
          except Exception as e:
              LOG.debug('Got error sending events %s', e)

      if self.thread_group_mgr is None:
          return
      self.thread_group_mgr.start(
          self.id, _dispatch, self.context,
          self.env.get_event_sinks(), ev.as_dict())
  def defer_state_persist(self):
      """Decide whether writing the state to the database is deferred.

      When persistence is deferred, the new state is not written to the
      database until the stack lock is released (by calling
      persist_state_and_release_lock()). This prevents races in the legacy
      path where an observer sees the stack COMPLETE while an engine still
      holds the lock.

      :returns: True if persisting should be deferred, False otherwise.
      """
      # IN_PROGRESS is always persisted immediately.
      if self.status == self.IN_PROGRESS:
          return False

      never_deferred = {self.UPDATE, self.DELETE,
                        self.ROLLBACK, self.RESTORE}

      if self.convergence:
          # In convergence these operations do not use the stack lock, so
          # there is nothing to wait for.
          lock_free = never_deferred | {self.CREATE, self.ADOPT}
          if self.action in lock_free:
              return False

      return self.action not in never_deferred
  @profiler.trace('Stack.state_set', hide_args=False)
  def state_set(self, action, status, reason):
      """Set the action, status and status reason of the stack.

      The new state is logged and, unless persistence is deferred, written
      to the database.

      :raises ValueError: if ``action`` or ``status`` is not a known value.
      :returns: the result of persisting the state, or None when
                persistence is deferred.
      """
      if action not in self.ACTIONS:
          raise ValueError(_("Invalid action %s") % action)
      if status not in self.STATUSES:
          raise ValueError(_("Invalid status %s") % status)

      self.action, self.status, self.status_reason = action, status, reason
      self._log_status()

      if self.defer_state_persist():
          return None

      updated = self._persist_state()
      if self.convergence and not updated:
          LOG.info("Stack %(name)s traversal %(trvsl_id)s no longer "
                   "active; not setting state to %(action)s_%(status)s",
                   {'name': self.name,
                    'trvsl_id': self.current_traversal,
                    'action': action, 'status': status})
      return updated
  def _log_status(self):
      """Log the current action, status, name and status reason."""
      details = {
          'action': self.action,
          'status': self.status,
          'name': self.name,
          'reason': self.status_reason,
      }
      LOG.info('Stack %(action)s %(status)s (%(name)s): %(reason)s',
               details)
  def _persist_state(self):
      """Write the current action/status/reason to the database.

      Nothing is written when the stack has no ID yet or when it cannot be
      found in the database.

      :returns: for convergence stacks, the result of the conditional
                update keyed on the current traversal; None otherwise.
      """
      if self.id is None:
          return

      db_stack = stack_object.Stack.get_by_id(self.context, self.id,
                                              eager_load=False)
      if db_stack is None:
          return

      values = {
          'action': self.action,
          'status': self.status,
          'status_reason': six.text_type(self.status_reason),
      }
      self._send_notification_and_add_event()

      if not self.convergence:
          db_stack.update_and_save(values)
          return

      # Convergence only updates the row if the traversal is still the
      # expected one.
      return stack_object.Stack.select_and_update(
          self.context, self.id, values,
          exp_trvsl=self.current_traversal)
  def _send_notification_and_add_event(self):
      """Send a notification and add an event for the current state."""
      state_info = {
          'action': self.action,
          'status': self.status,
          'name': self.name,
      }
      LOG.debug('Persisting stack %(name)s status %(action)s %(status)s',
                state_info)
      notification.send(self)
      self._add_event(self.action, self.status, self.status_reason)
  def persist_state_and_release_lock(self, engine_id):
      """Write the stack state to the database and release the stack lock.

      Nothing is done when the stack has no ID yet or when it cannot be
      found in the database.

      :param engine_id: passed through to the database object's
                        persist_state_and_release_lock() call
      """
      if self.id is None:
          return

      db_stack = stack_object.Stack.get_by_id(self.context, self.id,
                                              eager_load=False)
      if db_stack is None:
          return

      values = {
          'action': self.action,
          'status': self.status,
          'status_reason': six.text_type(self.status_reason),
      }
      self._send_notification_and_add_event()
      db_stack.persist_state_and_release_lock(self.context, self.id,
                                              engine_id, values)
  @property
  def state(self):
      """Return the current state as an ``(action, status)`` tuple."""
      current = (self.action, self.status)
      return current
  def timeout_secs(self):
      """Return the stack action timeout in seconds.

      Falls back to the configured ``stack_action_timeout`` when the stack
      has no ``timeout_mins`` of its own.
      """
      minutes = self.timeout_mins
      if minutes is not None:
          return minutes * 60
      return cfg.CONF.stack_action_timeout
  def preview_resources(self):
      """Return a list with the preview of every resource in the stack."""
      previews = []
      for rsrc in six.itervalues(self.resources):
          previews.append(rsrc.preview())
      return previews
  def get_nested_parameters(self, filter_func):
      """Return the parameter schemas of nested stacks, if any.

      Each resource is asked for its nested stack through the
      ``get_nested_parameters_stack`` API; resources that return None are
      skipped. The nested stacks are inspected recursively.

      :param filter_func: passed through to the parameters' map() call
      :returns: ``{'NestedParameters': {...}}`` keyed by resource name, or
                an empty dict when no resource has a nested stack.
      """
      nested_schemas = {}
      for rsrc_name, rsrc in six.iteritems(self.resources):
          nested_stack = rsrc.get_nested_parameters_stack()
          if nested_stack is None:
              continue

          formatted = nested_stack.parameters.map(
              api.format_validate_parameter,
              filter_func=filter_func)
          schema = {
              'Type': rsrc.type(),
              'Description': nested_stack.t.get('Description', ''),
              'Parameters': formatted,
          }

          # Only expose parameter groups when the nested template has any.
          groups = param_groups.ParameterGroups(nested_stack.t)
          if groups.parameter_groups:
              schema.update({'ParameterGroups': groups.parameter_groups})

          schema.update(nested_stack.get_nested_parameters(filter_func))
          nested_schemas[rsrc_name] = schema

      if not nested_schemas:
          return {}
      return {'NestedParameters': nested_schemas}
  def _store_resources(self):
      """Store every resource still in the INIT action.

      Resources are visited in reverse dependency order.
      """
      for rsrc in reversed(self.dependencies):
          if rsrc.action != rsrc.INIT:
              continue
          rsrc.store()
  @profiler.trace('Stack.create', hide_args=False)
  @reset_state_on_error
  def create(self, msg_queue=None):
      """Create the stack and all of its resources.

      :param msg_queue: optional queue polled for messages while the
                        create task is running
      """
      def rollback():
          # Only roll back a failed create, and only if rollback is enabled.
          if self.disable_rollback:
              return
          if self.state == (self.CREATE, self.FAILED):
              self.delete(action=self.ROLLBACK)

      self._store_resources()

      poll_messages = functools.partial(self._check_for_message, msg_queue)

      runner = scheduler.TaskRunner(self.stack_task,
                                    action=self.CREATE,
                                    reverse=False,
                                    post_func=rollback)
      runner(timeout=self.timeout_secs(), progress_callback=poll_messages)
  def _adopt_kwargs(self, resource):
      """Build the keyword arguments for adopting the given resource.

      :returns: a dict with a single ``resource_data`` key holding the
                adopt data stored under the resource's name, or None when
                there is no such data.
      """
      adopt_data = self.adopt_stack_data
      resources_data = adopt_data.get('resources') if adopt_data else None
      if not resources_data:
          return {'resource_data': None}
      return {'resource_data': resources_data.get(resource.name)}
  @scheduler.wrappertask
  def stack_task(self, action, reverse=False, post_func=None,
                 aggregate_exceptions=False, pre_completion_func=None,
                 notify=None):
      """Run an action over every resource of the stack, as a task.

      The resources are traversed in forward or reverse dependency order.

      :param action: action to execute on the stack's resources
      :param reverse: whether the resources are handled in reverse
                      dependency order
      :param post_func: callable run once the action has finished on the
                        stack (also run when the pre-ops fail)
      :param aggregate_exceptions: whether exceptions should be aggregated
      :param pre_completion_func: callable run right before the final
                                  state is set; it receives the stack,
                                  action, status and reason
      :param notify: optional object whose signal() is called once the
                     IN_PROGRESS state has been set
      """
      try:
          lifecycle_plugin_utils.do_pre_ops(self.context, self,
                                            None, action)
      except Exception as e:
          if e.args:
              failure = e.args[0]
          else:
              failure = 'Failed stack pre-ops: %s' % six.text_type(e)
          self.state_set(action, self.FAILED, failure)
          if callable(post_func):
              post_func()
          if notify is not None:
              # No need to call notify.signal(), because persistence of the
              # state is always deferred here.
              assert self.defer_state_persist()
          return

      self.state_set(action, self.IN_PROGRESS,
                     'Stack %s started' % action)
      if notify is not None:
          notify.signal()

      method_name = action.lower()
      # Use the stack's own _<action>_kwargs method, when there is one, to
      # build the arguments for each resource; otherwise pass none.
      kwargs_for = getattr(self,
                           '_%s_kwargs' % method_name,
                           lambda rsrc: {})

      @functools.wraps(getattr(resource.Resource, method_name))
      @scheduler.wrappertask
      def resource_action(r):
          # Look up e.g. r.create and run it as a subtask
          handler = getattr(r, method_name)

          yield handler(**kwargs_for(r))

          if action == self.CREATE:
              stk_defn.update_resource_data(self.defn, r.name, r.node_data())

      def get_error_wait_time(rsrc):
          return rsrc.cancel_grace_period()

      group = scheduler.DependencyTaskGroup(
          self.dependencies,
          resource_action,
          reverse,
          error_wait_time=get_error_wait_time,
          aggregate_exceptions=aggregate_exceptions)

      try:
          yield group()
      except scheduler.Timeout:
          outcome = self.FAILED
          reason = '%s timed out' % action.title()
      except Exception as ex:
          # Catch-all so that any raised exception makes the stack fail.
          # This is needed when aggregate_exceptions is false, because in
          # that case the raw exception is raised rather than an
          # ExceptionGroup.
          outcome = self.FAILED
          reason = 'Resource %s failed: %s' % (action, six.text_type(ex))
      else:
          outcome = self.COMPLETE
          reason = 'Stack %s completed successfully' % action

      if pre_completion_func:
          pre_completion_func(self, action, outcome, reason)

      self.state_set(action, outcome, reason)

      if callable(post_func):
          post_func()
      lifecycle_plugin_utils.do_post_ops(self.context, self, None, action,
                                         (self.status == self.FAILED))
  @profiler.trace('Stack.check', hide_args=False)
  @reset_state_on_error
  def check(self, notify=None):
      """Run the CHECK action on the stack.

      :param notify: passed through to the stack task
      """
      if self.convergence:
          self._update_or_store_resources()

      self.updated_time = oslo_timeutils.utcnow()

      run_check = scheduler.TaskRunner(
          self.stack_task, self.CHECK,
          post_func=self.supports_check_action,
          aggregate_exceptions=True,
          notify=notify)
      run_check()
  def supports_check_action(self):
      """Report whether every resource supports the CHECK action.

      A resource with a nested stack is supported when its nested stack
      is; any other resource is supported when it has a handler method for
      CHECK. When some resource is not supported, the stack state is set
      again with a note appended to the status reason.

      :returns: True if all resources support CHECK, False otherwise.
      """
      def is_supported(res):
          if not (res.has_nested() and res.nested()):
              return hasattr(res, 'handle_%s' % res.CHECK.lower())
          return res.nested().supports_check_action()

      fully_supported = all(is_supported(rsrc)
                            for rsrc in six.itervalues(self.resources))
      if fully_supported:
          return fully_supported

      note = ". '%s' not fully supported (see resources)" % self.CHECK
      self.state_set(self.CHECK, self.status, self.status_reason + note)
      return fully_supported
  @profiler.trace('Stack._backup_stack', hide_args=False)
  def _backup_stack(self, create_if_missing=True):
      """Return the backup stack of this stack.

      The backup stack contains any in-progress resources from the previous
      stack state prior to an update.

      :param create_if_missing: create and store a new backup stack when
                                none exists yet
      :returns: the backup stack, or None when there is none and
                ``create_if_missing`` is false.
      """
      db_backup = stack_object.Stack.get_by_name_and_owner_id(
          self.context,
          self._backup_name(),
          owner_id=self.id)
      if db_backup is not None:
          LOG.debug('Loaded existing backup stack')
          return self.load(self.context, stack=db_backup)

      if not create_if_missing:
          return None

      clone_kwargs = self.get_kwargs_for_cloning(keep_tags=True)
      clone_kwargs['owner_id'] = self.id
      del clone_kwargs['prev_raw_template_id']
      backup = type(self)(self.context, self._backup_name(),
                          copy.deepcopy(self.t),
                          **clone_kwargs)
      backup.store(backup=True)
      LOG.debug('Created new backup stack')
      return backup
  @profiler.trace('Stack.adopt', hide_args=False)
  @reset_state_on_error
  def adopt(self):
      """Adopt existing resources into a new stack."""
      def rollback():
          if self.disable_rollback:
              return
          if self.state != (self.ADOPT, self.FAILED):
              return
          # Follow the same flow as abandon and just delete the stack
          for rsrc in six.itervalues(self.resources):
              rsrc.abandon_in_progress = True
          self.delete(action=self.ROLLBACK, abandon=True)

      runner = scheduler.TaskRunner(self.stack_task,
                                    action=self.ADOPT,
                                    reverse=False,
                                    post_func=rollback)
      runner(timeout=self.timeout_secs())
  @profiler.trace('Stack.update', hide_args=False)
  @reset_state_on_error
  def update(self, newstack, msg_queue=None, notify=None):
      """Update the stack so that it aligns with ``newstack``.

      The current stack is compared with newstack and, where necessary,
      resources are created, updated or deleted. Updating an existing
      resource depends on update being implemented by the underlying
      resource type.

      :param newstack: the stack to align with
      :param msg_queue: passed through to the update task
      :param notify: passed through to the update task
      """
      self.updated_time = oslo_timeutils.utcnow()
      run_update = scheduler.TaskRunner(self.update_task, newstack,
                                        msg_queue=msg_queue,
                                        notify=notify)
      run_update()
  @profiler.trace('Stack.converge_stack', hide_args=False)
  @reset_state_on_error
  def converge_stack(self, template, action=UPDATE, new_stack=None,
                     pre_converge=None):
      """Switch to a new template and start a convergence traversal.

      :param template: the template to converge to
      :param action: the stack action being performed
      :param new_stack: optional stack whose settings (rollback flag,
                        timeout, converge flag, definition and tags) are
                        copied onto this stack
      :param pre_converge: passed through to the traversal start
      """
      if action not in (self.CREATE, self.ADOPT):
          # There is no back-up template for a create or adopt
          self.prev_raw_template_id = getattr(self.t, 'id', None)

      # Swap in the new template and drop everything derived from the old
      self.defn = self.defn.clone_with_new_template(
          template, self.identifier(), clear_resource_data=True)
      self.reset_dependencies()
      self._resources = None

      if action != self.CREATE:
          self.updated_time = oslo_timeutils.utcnow()

      if new_stack is not None:
          self.disable_rollback = new_stack.disable_rollback
          self.timeout_mins = new_stack.timeout_mins
          self.converge = new_stack.converge

          self.defn = new_stack.defn
          self._set_param_stackid()

          self.tags = new_stack.tags
          if new_stack.tags:
              stack_tag_object.StackTagList.set(self.context, self.id,
                                                new_stack.tags)
          else:
              stack_tag_object.StackTagList.delete(self.context, self.id)

      self.action = action
      self.status = self.IN_PROGRESS
      self.status_reason = 'Stack %s started' % self.action

      # Start a new traversal; the store only succeeds if the stack in the
      # database still has the traversal ID we started from.
      old_traversal = self.current_traversal
      self.current_traversal = uuidutils.generate_uuid()
      stored_id = self.store(exp_trvsl=old_traversal)
      if stored_id is None:
          LOG.warning("Failed to store stack %(name)s with traversal "
                      "ID %(trvsl_id)s, aborting stack %(action)s",
                      {'name': self.name, 'trvsl_id': old_traversal,
                       'action': self.action})
          return
      self._send_notification_and_add_event()

      # The sync points of the previous traversal are no longer needed
      if old_traversal:
          sync_point.delete_all(self.context, self.id, old_traversal)

      # TODO(later): lifecycle_plugin_utils.do_pre_ops

      self.thread_group_mgr.start(self.id, self._converge_create_or_update,
                                  pre_converge=pre_converge)
  @reset_state_on_error
  def _converge_create_or_update(self, pre_converge=None):
      """Compute the convergence graph and trigger its leaf nodes.

      :param pre_converge: optional callable run after the dependency
                           graph has been stored and before the traversal
                           starts
      """
      current_resources = self._update_or_store_resources()
      self._compute_convg_dependencies(self.ext_rsrcs_db, self.dependencies,
                                       current_resources)
      # Keep the graph as a plain list of edges so it can be stored
      graph_edges = self.convergence_dependencies.graph().edges()
      self.current_deps = {
          'edges': [[requirer, required]
                    for requirer, required in graph_edges]}

      if self.store() is None:
          # Failed concurrent update
          LOG.warning("Failed to store stack %(name)s with traversal "
                      "ID %(trvsl_id)s, aborting stack %(action)s",
                      {'name': self.name, 'trvsl_id': self.current_traversal,
                       'action': self.action})
          return

      if callable(pre_converge):
          pre_converge()

      if self.action == self.DELETE:
          try:
              self.delete_all_snapshots()
          except Exception as exc:
              self.state_set(self.action, self.FAILED, six.text_type(exc))
              self.purge_db()
              return

      LOG.debug('Starting traversal %s with dependencies: %s',
                self.current_traversal, self.convergence_dependencies)

      # One sync point per node of the graph...
      for rsrc_id, is_update in self.convergence_dependencies:
          sync_point.create(self.context, rsrc_id,
                            self.current_traversal, is_update,
                            self.id)
      # ...plus one for the stack itself
      sync_point.create(
          self.context, self.id, self.current_traversal, True, self.id)

      leaves = set(self.convergence_dependencies.leaves())
      if not leaves:
          self.mark_complete()
          return

      # Nodes with a false is_update are triggered before the others
      for rsrc_id, is_update in sorted(leaves,
                                       key=lambda node: node.is_update):
          if is_update:
              LOG.info("Triggering resource %s for update", rsrc_id)
          else:
              LOG.info("Triggering resource %s for cleanup",
                       rsrc_id)
          input_data = sync_point.serialize_input_data({})
          self.worker_client.check_resource(self.context, rsrc_id,
                                            self.current_traversal,
                                            input_data, is_update,
                                            self.adopt_stack_data,
                                            self.converge)
          if scheduler.ENABLE_SLEEP:
              eventlet.sleep(1)
  def rollback(self):
      """Converge the stack back to its previous template.

      When there is no previous template, an empty template of the current
      template's version is used instead. Otherwise the previous template
      is loaded and the stack is stored with the previous template ID
      cleared; the rollback is not triggered if that store fails.
      """
      previous_tmpl_id = self.prev_raw_template_id
      if previous_tmpl_id is not None:
          target_tmpl = tmpl.Template.load(self.context, previous_tmpl_id)
          self.prev_raw_template_id = None
          if self.store() is None:
              # Failed concurrent update
              LOG.warning("Failed to store stack %(name)s with traversal"
                          " ID %(trvsl_id)s, not triggering rollback.",
                          {'name': self.name,
                           'trvsl_id': self.current_traversal})
              return
      else:
          target_tmpl = tmpl.Template.create_empty_template(
              version=self.t.version)

      self.converge_stack(target_tmpl, action=self.ROLLBACK)
  def _get_best_existing_rsrc_db(self, rsrc_name):
      """Pick the most suitable existing DB resource with the given name.

      Every resource in ``self.ext_rsrcs_db`` whose name matches is given
      a score: failed, deleted and replaced resources are penalised, while
      resources belonging to the previous template or to the current
      template are preferred. Ties are broken in favour of the most
      recently updated resource.

      :param rsrc_name: name of the resource to look for
      :returns: the best matching resource, or None when there is no
                existing resource of that name.
      """
      if not self.ext_rsrcs_db:
          return None

      def suitability(ext_rsrc):
          score = 0
          if ext_rsrc.status == status.ResourceStatus.FAILED:
              score -= 30
          if ext_rsrc.action == status.ResourceStatus.DELETE:
              score -= 50
          if ext_rsrc.replaced_by:
              score -= 1
          if ext_rsrc.current_template_id == self.prev_raw_template_id:
              # Current resource
              score += 5
          if ext_rsrc.current_template_id == self.t.id:
              # Rolling back to previous resource
              score += 10
          updated_at = ext_rsrc.updated_at
          # On Python 3 None cannot be ordered against a timestamp, so a
          # score tie between a resource with updated_at of None and one
          # with a timestamp would raise TypeError. The extra flag ranks a
          # missing timestamp lowest (as Python 2 ordering did) and keeps
          # the comparison from ever reaching None versus a timestamp.
          return score, updated_at is not None, updated_at

      candidates = sorted((r for r in self.ext_rsrcs_db.values()
                           if r.name == rsrc_name),
                          key=suitability,
                          reverse=True)
      return candidates[0] if candidates else None
  def _update_or_store_resources(self):
      """Match the template's resources against the ones in the database.

      The active database resources are loaded into ``self.ext_rsrcs_db``.
      Then, in reverse dependency order, each resource is either mapped to
      its best existing database resource or, when there is none, tagged
      with the current template ID and stored.

      :returns: a dict mapping resource names to the existing database
                resource or to the newly stored resource.
      """
      self.ext_rsrcs_db = self.db_active_resources_get()

      by_name = {}
      for rsrc in reversed(self.dependencies):
          existing = self._get_best_existing_rsrc_db(rsrc.name)
          if existing is not None:
              by_name[existing.name] = existing
              continue
          rsrc.current_template_id = self.t.id
          rsrc.store()
          by_name[rsrc.name] = rsrc
      return by_name
  def _compute_convg_dependencies(self, existing_resources,
                                  current_template_deps, current_resources):
      """Build the convergence graph and keep it in ``self._convg_deps``.

      The template dependencies are translated into nodes with a true
      second field, keyed by the ID of the matching current resource. For
      every existing resource a node with a false second field is added,
      together with edges to the corresponding nodes of the resources it
      requires or replaces, and an edge to its own true-flagged node when
      that node is part of the graph.

      :param existing_resources: mapping of resource ID to existing
                                 resource (may be empty or None)
      :param current_template_deps: dependencies of the current template
      :param current_resources: mapping of resource name to resource
      """
      def make_graph_key(rsrc):
          return ConvergenceNode(current_resources[rsrc.name].id, True)

      graph = current_template_deps.translate(make_graph_key)

      if existing_resources:
          for rsrc_id, rsrc in existing_resources.items():
              graph += ConvergenceNode(rsrc_id, False), None

              for required_id in rsrc.requires:
                  if required_id not in existing_resources:
                      continue
                  graph += (ConvergenceNode(required_id, False),
                            ConvergenceNode(rsrc_id, False))

              if rsrc.replaces in existing_resources:
                  graph += (ConvergenceNode(rsrc.replaces, False),
                            ConvergenceNode(rsrc_id, False))

              if ConvergenceNode(rsrc.id, True) in graph:
                  graph += (ConvergenceNode(rsrc_id, False),
                            ConvergenceNode(rsrc_id, True))

      self._convg_deps = graph
  @property
  def convergence_dependencies(self):
      """Return the convergence dependency graph.

      The graph is rebuilt from the edges in ``self.current_deps`` the
      first time it is needed and cached in ``self._convg_deps``.
      """
      if self._convg_deps is not None:
          return self._convg_deps

      def to_node_pairs(edges):
          for requirer, required in edges:
              yield (ConvergenceNode(*requirer),
                     None if required is None
                     else ConvergenceNode(*required))

      self._convg_deps = dependencies.Dependencies(
          edges=to_node_pairs(self.current_deps['edges']))
      return self._convg_deps
  def dependent_resource_ids(self, resource_id):
      """Return the IDs of the resources that depend on the given one.

      Given a resource ID, return the set of all other resource IDs that
      are dependent on it - that is to say, those that must be cleaned up
      before the given resource is cleaned up.

      Only valid for convergence stacks.
      """
      assert self.convergence, 'Invalid call for non-convergence stack'

      cleanup_node = ConvergenceNode(resource_id, False)
      graph = self.convergence_dependencies
      if cleanup_node in graph:
          # We start from the cleanup node, so follow requires rather than
          # required_by
          return {node.rsrc_id
                  for node in graph.requires(cleanup_node)
                  if not node.is_update}
      return set()
  def reset_stack_and_resources_in_progress(self, reason):
      """Mark the stack and its IN_PROGRESS resources as FAILED.

      For a non-convergence stack in the UPDATE action, the user parameters
      and the previous template are also merged into the stack's template
      (and into the backup stack's template, if a backup stack exists).

      :param reason: the reason recorded for the failure
      """
      for rsrc in six.itervalues(self.resources):
          if rsrc.status != rsrc.IN_PROGRESS:
              continue
          rsrc.state_set(rsrc.action,
                         rsrc.FAILED,
                         six.text_type(reason))

      if self.action == self.UPDATE and not self.convergence:
          # Do not create a backup stack if there is none
          backup = self._backup_stack(False)
          user_params = environment.Environment(
              {env_fmt.PARAMETERS: self.t.env.params})
          prev_template = tmpl.Template.load(self.context,
                                             self.prev_raw_template_id)
          backup_template = backup.t if backup else None
          self._merge_user_param_template(user_params, prev_template,
                                          backup_template)

      self.state_set(self.action, self.FAILED, six.text_type(reason))
  @scheduler.wrappertask
  def update_task(self, newstack, action=UPDATE,
                  msg_queue=None, notify=None):
      """Task that brings this stack in line with ``newstack``.

      :param newstack: the stack to align with
      :param action: one of UPDATE, ROLLBACK or RESTORE; anything else
                     fails the stack
      :param msg_queue: optional queue polled for messages while the
                        update is running
      :param notify: optional object whose signal() is called once the
                     new state has been stored, or once the task has
                     given up before that point
      """
      def signal_caller():
          if notify is not None:
              notify.signal()

      if action not in (self.UPDATE, self.ROLLBACK, self.RESTORE):
          LOG.error("Unexpected action %s passed to update!", action)
          self.state_set(self.UPDATE, self.FAILED,
                         "Invalid action %s" % action)
          signal_caller()
          return

      try:
          lifecycle_plugin_utils.do_pre_ops(self.context, self,
                                            newstack, action)
      except Exception as e:
          if e.args:
              failure = e.args[0]
          else:
              failure = 'Failed stack pre-ops: %s' % six.text_type(e)
          self.state_set(action, self.FAILED, failure)
          signal_caller()
          return

      if self.status == self.IN_PROGRESS:
          if action == self.ROLLBACK:
              LOG.debug("Starting update rollback for %s", self.name)
          else:
              reason = _('Attempted to %s an IN_PROGRESS '
                         'stack') % action
              self.reset_stack_and_resources_in_progress(reason)
              signal_caller()
              return

      # Save a copy of the new template. To avoid two DB writes the ID is
      # stored at the same time as the action/status.
      stale_tmpl_id = self.prev_raw_template_id
      # newstack.t may have been pre-stored, so save with that one and give
      # newstack a copy
      stored_tmpl = newstack.t
      newstack.t = copy.deepcopy(stored_tmpl)
      self.prev_raw_template_id = stored_tmpl.store(self.context)
      self.action = action
      self.status = self.IN_PROGRESS
      self.status_reason = 'Stack %s started' % action
      self._send_notification_and_add_event()
      self.store()
      # Let the caller know that the state is stored
      signal_caller()

      if stale_tmpl_id is not None:
          raw_template_object.RawTemplate.delete(self.context, stale_tmpl_id)

      if action == self.UPDATE:
          # oldstack is only needed for an UPDATE (it is what a rollback
          # goes back to), so it is not built for other actions; this
          # avoids some unexpected errors.
          clone_kwargs = self.get_kwargs_for_cloning(keep_tags=True)
          self._ensure_encrypted_param_names_valid()
          oldstack = Stack(self.context, self.name, copy.deepcopy(self.t),
                           **clone_kwargs)

      backup_stack = self._backup_stack()
      existing_params = environment.Environment(
          {env_fmt.PARAMETERS: self.t.env.params})
      replaced_tmpl_id = None
      should_rollback = False
      stack_update = update.StackUpdate(
          self, newstack, backup_stack,
          rollback=action == self.ROLLBACK)
      try:
          updater = scheduler.TaskRunner(stack_update)

          new_defn = newstack.defn
          self.defn.parameters = new_defn.parameters
          self.defn.t.files = new_defn.t.files
          self.defn.t.env = new_defn.t.env
          self.disable_rollback = newstack.disable_rollback
          self.timeout_mins = newstack.timeout_mins
          self._set_param_stackid()

          self.tags = newstack.tags
          if newstack.tags:
              stack_tag_object.StackTagList.set(self.context, self.id,
                                                newstack.tags)
          else:
              stack_tag_object.StackTagList.delete(self.context, self.id)

          poll_messages = functools.partial(self._check_for_message,
                                            msg_queue)
          try:
              yield updater.as_task(timeout=self.timeout_secs(),
                                    progress_callback=poll_messages)
          finally:
              self.reset_dependencies()

          self.status_reason = 'Stack %s completed successfully' % action
          self.status = self.COMPLETE

      except scheduler.Timeout:
          self.status = self.FAILED
          self.status_reason = 'Timed out'
      except Exception as e:
          # If rollback is enabled when a resource failure occurred, do
          # another update with the existing template, so that we roll
          # back to the original state.
          should_rollback = self._update_exception_handler(e, action)
          if should_rollback:
              yield self.update_task(oldstack, action=self.ROLLBACK)
      except BaseException as e:
          with excutils.save_and_reraise_exception():
              self._update_exception_handler(e, action)
      else:
          LOG.debug('Deleting backup stack')
          backup_stack.delete(backup=True)

          # Flip the template to the newstack values
          replaced_tmpl_id = self.t.id
          self.t = newstack.t
          self._outputs = None
      finally:
          if should_rollback:
              # Already handled in rollback task
              return

          # state_set is not used here, so that only one update query is
          # made and a race condition with the COMPLETE status is avoided
          self.action = action

          self._log_status()
          self._send_notification_and_add_event()
          if self.status == self.FAILED:
              self._merge_user_param_template(existing_params, newstack.t,
                                              backup_stack.t)
          self.store()

          if replaced_tmpl_id is not None:
              raw_template_object.RawTemplate.delete(self.context,
                                                     replaced_tmpl_id)

      lifecycle_plugin_utils.do_post_ops(self.context, self,
                                         newstack, action,
                                         (self.status == self.FAILED))
  def _merge_user_param_template(self, existing_params, new_template,
                                 bkp_stack_template):
      """Merge the new template's user parameters and snippets, then store.

      Since the template was incrementally updated based on existing and
      new stack resources, the stored template should have the user params
      of both.

      :param existing_params: environment that receives the new template's
                              user environment and becomes the environment
                              of the merged templates
      :param new_template: the template to merge from
      :param bkp_stack_template: the backup stack's template, given the
                                 same treatment when it is truthy
      """
      existing_params.load(new_template.env.user_env_as_dict())

      def absorb(target):
          target.env = existing_params
          # Bump the template version, in case new things were used
          target.t[new_template.version[0]] = max(new_template.version[1],
                                                  self.t.version[1])
          target.merge_snippets(new_template)
          target.store(self.context)

      absorb(self.t)
      if bkp_stack_template:
          absorb(bkp_stack_template)
  def _update_exception_handler(self, exc, action):
      """Record a failure raised in update_task and decide about rollback.

      The stack status is set to FAILED with the exception text as reason.
      A rollback is only ever requested for the UPDATE action: for a forced
      cancel when the cancel asks for it or rollback is not disabled, and
      for a resource failure when rollback is not disabled.

      :returns: a boolean telling whether a rollback is required.
      """
      self.status_reason = six.text_type(exc)
      self.status = self.FAILED

      if action != self.UPDATE:
          return False

      if isinstance(exc, ForcedCancel):
          return exc.with_rollback or not self.disable_rollback
      if isinstance(exc, exception.ResourceFailure):
          return not self.disable_rollback
      return False
  def _ensure_encrypted_param_names_valid(self):
      """Clear the encrypted parameter names when encryption is disabled.

      If encryption was enabled when the stack was created but then
      disabled when the stack was updated, env.params and
      env.encrypted_param_names would be in an inconsistent state.
      """
      if cfg.CONF.encrypt_parameters_and_properties:
          return
      self.t.env.encrypted_param_names = []
  @staticmethod
  def _check_for_message(msg_queue):
      """Poll the message queue once and act on a cancel message.

      Returns quietly when there is no queue or the queue is empty. Any
      message other than the two cancel messages is logged as an error.

      :raises ForcedCancel: when a cancel message is received; its
                            with_rollback flag tells which kind it was.
      """
      if msg_queue is None:
          return

      try:
          received = msg_queue.get_nowait()
      except eventlet.queue.Empty:
          return

      if received == rpc_api.THREAD_CANCEL:
          raise ForcedCancel(with_rollback=False)
      if received == rpc_api.THREAD_CANCEL_WITH_ROLLBACK:
          raise ForcedCancel(with_rollback=True)

      LOG.error('Unknown message "%s" received', received)
  1523. def _delete_backup_stack(self, stack):
  1524. # Delete resources in the backup stack referred to by 'stack'
  1525. def failed(child):
  1526. return (child.action == child.CREATE and
  1527. child.status in (child.FAILED, child.IN_PROGRESS))
  1528. def copy_data(source_res, destination_res):
  1529. if source_res.data():
  1530. for key, val in six.iteritems(source_res.data()):
  1531. destination_res.data_set(key, val)
  1532. for key, backup_res in stack.resources.items():
  1533. # If UpdateReplace is failed, we must restore backup_res
  1534. # to existing_stack in case of it may have dependencies in
  1535. # these stacks. curr_res is the resource that just
  1536. # created and failed, so put into the stack to delete anyway.
  1537. backup_res_id = backup_res.resource_id
  1538. curr_res = self.resources.get(key)
  1539. if backup_res_id is not None and curr_res is not None:
  1540. curr_res_id = curr_res.resource_id
  1541. if (any(failed(child) for child in
  1542. self.dependencies[curr_res]) or
  1543. curr_res.status in
  1544. (curr_res.FAILED, curr_res.IN_PROGRESS)):
  1545. # If child resource failed to update, curr_res
  1546. # should be replaced to resolve dependencies. But this
  1547. # is not fundamental solution. If there are update
  1548. # failer and success resources in the children, cannot
  1549. # delete the stack.
  1550. # Stack class owns dependencies as set of resource's
  1551. # objects, so we switch members of the resource that is
  1552. # needed to delete it.
  1553. self.resources[key].resource_id = backup_res_id
  1554. self.resources[key].properties = backup_res.properties
  1555. copy_data(backup_res, self.resources[key])
  1556. stack.resources[key].resource_id = curr_res_id
  1557. stack.resources[key].properties = curr_res.properties
  1558. copy_data(curr_res, stack.resources[key])
  1559. stack.delete(backup=True)
  1560. def _try_get_user_creds(self):
  1561. # There are cases where the user_creds cannot be returned
  1562. # due to credentials truncated when being saved to DB.
  1563. # Ignore this error instead of blocking stack deletion.
  1564. try:
  1565. return ucreds_object.UserCreds.get_by_id(self.context,
  1566. self.user_creds_id)
  1567. except exception.Error:
  1568. LOG.exception("Failed to retrieve user_creds")
  1569. return None
def _delete_credentials(self, stack_status, reason, abandon):
    """Remove the stored credentials and stack domain project.

    Cleans up stored user_creds so they aren't accessible via the
    soft-deleted stack which remains in the DB.

    :param stack_status: the current stack status; returned unchanged
        unless deleting the stack domain project fails.
    :param reason: the current status reason; returned unchanged unless
        deleting the stack domain project fails.
    :param abandon: when true, the stack domain project is not deleted.
    :returns: a (stack_status, reason) tuple, rewritten to FAILED and an
        error message if deleting the stack domain project raised.
    """
    if self.user_creds_id:
        user_creds = self._try_get_user_creds()
        # If we created a trust, delete it
        if user_creds is not None:
            trust_id = user_creds.get('trust_id')
            if trust_id:
                try:
                    # If the trustor doesn't match the context user then
                    # we have to use the stored context to clean up the
                    # trust, as although the user evidently has
                    # permission to delete the stack, they don't have
                    # rights to delete the trust unless an admin
                    trustor_id = user_creds.get('trustor_user_id')
                    if self.context.user_id != trustor_id:
                        LOG.debug("Context user_id doesn't match "
                                  "trustor, using stored context")
                        sc = self.stored_context()
                        sc.clients.client('keystone').delete_trust(
                            trust_id)
                    else:
                        self.clients.client('keystone').delete_trust(
                            trust_id)
                except Exception as ex:
                    # We want the admin to be able to delete the stack,
                    # so do not FAIL a delete when we cannot delete a
                    # trust. We carry on and delete the credentials
                    # anyway; without this, they would need to issue
                    # an additional stack-delete
                    LOG.exception("Error deleting trust")

        # Delete the stored credentials; a missing record is only logged
        try:
            ucreds_object.UserCreds.delete(self.context,
                                           self.user_creds_id)
        except exception.NotFound:
            LOG.info("Tried to delete user_creds that do not exist "
                     "(stack=%(stack)s user_creds_id=%(uc)s)",
                     {'stack': self.id, 'uc': self.user_creds_id})

        # Forget the credentials reference and persist the stack
        try:
            self.user_creds_id = None
            self.store()
        except exception.NotFound:
            LOG.info("Tried to store a stack that does not exist %s",
                     self.id)

    # If the stack has a domain project, delete it (never on abandon)
    if self.stack_user_project_id and not abandon:
        try:
            keystone = self.clients.client('keystone')
            keystone.delete_stack_domain_project(
                project_id=self.stack_user_project_id)
        except Exception as ex:
            # Unlike a trust, a project that cannot be deleted is
            # reported back to the caller as a failure
            LOG.exception("Error deleting project")
            stack_status = self.FAILED
            reason = "Error deleting project: %s" % six.text_type(ex)

    return stack_status, reason
@profiler.trace('Stack.delete', hide_args=False)
@reset_state_on_error
def delete(self, action=DELETE, backup=False, abandon=False, notify=None):
    """Delete all of the resources, and then the stack itself.

    The action parameter is used to differentiate between a user
    initiated delete and an automatic stack rollback after a failed
    create, which amount to the same thing, but the states are recorded
    differently.

    Note abandon is a delete where all resources have been set to a
    RETAIN deletion policy, but we also don't want to delete anything
    required for those resources, e.g the stack_user_project.

    :param action: DELETE or ROLLBACK; any other value sets the stack to
        DELETE FAILED and returns.
    :param backup: when true, the lifecycle pre/post operations and the
        credentials cleanup are skipped.
    :param abandon: passed on to the credentials cleanup.
    :param notify: if not None, its signal() method is called once the
        initial state has been recorded.
    """
    if action not in (self.DELETE, self.ROLLBACK):
        LOG.error("Unexpected action %s passed to delete!", action)
        self.state_set(self.DELETE, self.FAILED,
                       "Invalid action %s" % action)
        if notify is not None:
            notify.signal()
        return

    # Assume success; the steps below overwrite these on failure
    stack_status = self.COMPLETE
    reason = 'Stack %s completed successfully' % action
    self.state_set(action, self.IN_PROGRESS, 'Stack %s started' %
                   action)
    if notify is not None:
        notify.signal()

    # Delete any backup stack first; give up if that does not complete
    backup_stack = self._backup_stack(False)
    if backup_stack:
        self._delete_backup_stack(backup_stack)
        if backup_stack.status != backup_stack.COMPLETE:
            errs = backup_stack.status_reason
            failure = 'Error deleting backup resources: %s' % errs
            self.state_set(action, self.FAILED,
                           'Failed to %s : %s' % (action, failure))
            return

    self.delete_all_snapshots()

    if not backup:
        try:
            lifecycle_plugin_utils.do_pre_ops(self.context, self,
                                              None, action)
        except Exception as e:
            # Prefer the exception's first argument as the reason
            self.state_set(action, self.FAILED,
                           e.args[0] if e.args else
                           'Failed stack pre-ops: %s' % six.text_type(e))
            return

    # Destroy the resources in reverse dependency order
    action_task = scheduler.DependencyTaskGroup(self.dependencies,
                                                resource.Resource.destroy,
                                                reverse=True)
    try:
        scheduler.TaskRunner(action_task)(timeout=self.timeout_secs())
    except exception.ResourceFailure as ex:
        stack_status = self.FAILED
        reason = 'Resource %s failed: %s' % (action, six.text_type(ex))
    except scheduler.Timeout:
        stack_status = self.FAILED
        reason = '%s timed out' % action.title()

    # If the stack delete succeeded, this is not a backup stack and it's
    # not a nested stack, we should delete the credentials
    if stack_status != self.FAILED and not backup and not self.owner_id:
        stack_status, reason = self._delete_credentials(stack_status,
                                                        reason,
                                                        abandon)

    try:
        self.state_set(action, stack_status, reason)
    except exception.NotFound:
        LOG.info("Tried to delete stack that does not exist "
                 "%s ", self.id)

    if not backup:
        lifecycle_plugin_utils.do_post_ops(self.context, self,
                                           None, action,
                                           (self.status == self.FAILED))
    if stack_status != self.FAILED:
        # delete the stack record itself; a missing record is only logged
        try:
            stack_object.Stack.delete(self.context, self.id)
        except exception.NotFound:
            LOG.info("Tried to delete stack that does not exist "
                     "%s ", self.id)
        self.id = None
  1707. @profiler.trace('Stack.suspend', hide_args=False)
  1708. @reset_state_on_error
  1709. def suspend(self, notify=None):
  1710. """Suspend the stack.
  1711. Invokes handle_suspend for all stack resources.
  1712. Waits for all resources to become SUSPEND_COMPLETE then declares the
  1713. stack SUSPEND_COMPLETE.
  1714. Note the default implementation for all resources is to do nothing
  1715. other than move to SUSPEND_COMPLETE, so the resources must implement
  1716. handle_suspend for this to have any effect.
  1717. """
  1718. LOG.debug("Suspending stack %s", self)
  1719. # No need to suspend if the stack has been suspended
  1720. if self.state == (self.SUSPEND, self.COMPLETE):
  1721. LOG.info('%s is already suspended', self)
  1722. return
  1723. if self.convergence:
  1724. self._update_or_store_resources()
  1725. self.updated_time = oslo_timeutils.utcnow()
  1726. sus_task = scheduler.TaskRunner(
  1727. self.stack_task,
  1728. action=self.SUSPEND,
  1729. reverse=True,
  1730. notify=notify)
  1731. sus_task(timeout=self.timeout_secs())
  1732. @profiler.trace('Stack.resume', hide_args=False)
  1733. @reset_state_on_error
  1734. def resume(self, notify=None):
  1735. """Resume the stack.
  1736. Invokes handle_resume for all stack resources.
  1737. Waits for all resources to become RESUME_COMPLETE then declares the
  1738. stack RESUME_COMPLETE.
  1739. Note the default implementation for all resources is to do nothing
  1740. other than move to RESUME_COMPLETE, so the resources must implement
  1741. handle_resume for this to have any effect.
  1742. """
  1743. LOG.debug("Resuming stack %s", self)
  1744. # No need to resume if the stack has been resumed
  1745. if self.state == (self.RESUME, self.COMPLETE):
  1746. LOG.info('%s is already resumed', self)
  1747. return
  1748. if self.convergence:
  1749. self._update_or_store_resources()
  1750. self.updated_time = oslo_timeutils.utcnow()
  1751. sus_task = scheduler.TaskRunner(
  1752. self.stack_task,
  1753. action=self.RESUME,
  1754. reverse=False,
  1755. notify=notify)
  1756. sus_task(timeout=self.timeout_secs())
  1757. @profiler.trace('Stack.snapshot', hide_args=False)
  1758. @reset_state_on_error
  1759. def snapshot(self, save_snapshot_func):
  1760. """Snapshot the stack, invoking handle_snapshot on all resources."""
  1761. self.updated_time = oslo_timeutils.utcnow()
  1762. sus_task = scheduler.TaskRunner(
  1763. self.stack_task,
  1764. action=self.SNAPSHOT,
  1765. reverse=False,
  1766. pre_completion_func=save_snapshot_func)
  1767. sus_task(timeout=self.timeout_secs())
  1768. def delete_all_snapshots(self):
  1769. """Remove all snapshots for this stack."""
  1770. snapshots = snapshot_object.Snapshot.get_all(self.context, self.id)
  1771. for snapshot in snapshots:
  1772. self.delete_snapshot(snapshot)
  1773. snapshot_object.Snapshot.delete(self.context, snapshot.id)
  1774. @staticmethod
  1775. def _template_from_snapshot_data(snapshot_data):
  1776. env = environment.Environment(snapshot_data['environment'])
  1777. files = snapshot_data['files']
  1778. return tmpl.Template(snapshot_data['template'], env=env, files=files)
  1779. @profiler.trace('Stack.delete_snapshot', hide_args=False)
  1780. def delete_snapshot(self, snapshot):
  1781. """Remove a snapshot from the backends."""
  1782. snapshot_data = snapshot.data
  1783. if snapshot_data:
  1784. template = self._template_from_snapshot_data(snapshot_data)
  1785. ss_defn = self.defn.clone_with_new_template(template,
  1786. self.identifier())
  1787. resources = self._resources_for_defn(ss_defn)
  1788. for name, rsrc in six.iteritems(resources):
  1789. data = snapshot.data['resources'].get(name)
  1790. if data:
  1791. scheduler.TaskRunner(rsrc.delete_snapshot, data)()
  1792. def restore_data(self, snapshot):
  1793. template = self._template_from_snapshot_data(snapshot.data)
  1794. newstack = self.__class__(self.context, self.name, template,
  1795. timeout_mins=self.timeout_mins,
  1796. disable_rollback=self.disable_rollback)
  1797. for name in newstack.defn.enabled_rsrc_names():
  1798. defn = newstack.defn.resource_definition(name)
  1799. rsrc = resource.Resource(name, defn, self)
  1800. data = snapshot.data['resources'].get(name)
  1801. handle_restore = getattr(rsrc, 'handle_restore', None)
  1802. if callable(handle_restore):
  1803. defn = handle_restore(defn, data)
  1804. template.add_resource(defn, name)
  1805. newstack.parameters.set_stack_id(self.identifier())
  1806. return newstack, template
  1807. @reset_state_on_error
  1808. def restore(self, snapshot, notify=None):
  1809. """Restore the given snapshot.
  1810. Invokes handle_restore on all resources.
  1811. """
  1812. LOG.debug("Restoring stack %s", self)
  1813. self.updated_time = oslo_timeutils.utcnow()
  1814. newstack = self.restore_data(snapshot)[0]
  1815. updater = scheduler.TaskRunner(self.update_task, newstack,
  1816. action=self.RESTORE, notify=notify)
  1817. updater()
  1818. def get_availability_zones(self):
  1819. nova = self.clients.client('nova')
  1820. if self._zones is None:
  1821. self._zones = [
  1822. zone.zoneName for zone in
  1823. nova.availability_zones.list(detailed=False)]
  1824. return self._zones
  1825. def set_stack_user_project_id(self, project_id):
  1826. self.stack_user_project_id = project_id
  1827. self.store()
  1828. @profiler.trace('Stack.create_stack_user_project_id', hide_args=False)
  1829. def create_stack_user_project_id(self):
  1830. project_id = self.clients.client(
  1831. 'keystone').create_stack_domain_project(self.id)
  1832. self.set_stack_user_project_id(project_id)
  1833. @profiler.trace('Stack.prepare_abandon', hide_args=False)
  1834. def prepare_abandon(self):
  1835. return {
  1836. 'name': self.name,
  1837. 'id': self.id,
  1838. 'action': self.action,
  1839. 'environment': self.env.user_env_as_dict(),
  1840. 'files': self.t.files,
  1841. 'status': self.status,
  1842. 'template': self.t.t,
  1843. 'resources': dict((res.name, res.prepare_abandon())
  1844. for res in six.itervalues(self.resources)),
  1845. 'project_id': self.tenant_id,
  1846. 'stack_user_project_id': self.stack_user_project_id,
  1847. 'tags': self.tags,
  1848. }
  1849. def mark_failed(self, failure_reason):
  1850. """Mark the convergence update as failed."""
  1851. updated = self.state_set(self.action, self.FAILED, failure_reason)
  1852. if not updated:
  1853. return False
  1854. if not self.convergence:
  1855. # This function is not generally used in the legacy path, but to
  1856. # allow it to be used by any kind of stack in the
  1857. # reset_state_on_error decorator, bail out before the
  1858. # convergence-specific part in legacy stacks.
  1859. return
  1860. if (not self.disable_rollback and
  1861. self.action in (self.CREATE, self.ADOPT, self.UPDATE,
  1862. self.RESTORE)):
  1863. LOG.info("Triggering rollback of %(stack_name)s %(action)s ",
  1864. {'action': self.action, 'stack_name': self.name})
  1865. self.rollback()
  1866. else:
  1867. self.purge_db()
  1868. return True
  1869. def mark_complete(self):
  1870. """Mark the convergence update as complete."""
  1871. LOG.info('[%(name)s(%(id)s)] update traversal %(tid)s complete',
  1872. {'name': self.name, 'id': self.id,
  1873. 'tid': self.current_traversal})
  1874. reason = 'Stack %s completed successfully' % self.action
  1875. updated = self.state_set(self.action, self.COMPLETE, reason)
  1876. if not updated:
  1877. return
  1878. self.purge_db()
def purge_db(self):
    """Cleanup database after stack has completed/failed.

    1. Delete the resources from DB.
    2. If the stack failed, update the current_traversal to empty string
       so that the resource workers bail out.
    3. Delete previous raw template if stack completes successfully.
    4. Deletes all sync points. They are no longer needed after stack
       has completed/failed.
    5. Delete the stack if the action is DELETE.

    Steps 3 to 5 are skipped when storing the stack fails because of a
    concurrent update.
    """
    resource_objects.Resource.purge_deleted(self.context, self.id)

    # Remember the traversal we expect to find in the DB before it is
    # possibly blanked out below
    exp_trvsl = self.current_traversal
    if self.status == self.FAILED:
        self.current_traversal = ''

    # Detach the previous template now; it is only deleted once the
    # stack has been stored successfully
    prev_tmpl_id = None
    if (self.prev_raw_template_id is not None and
            self.status != self.FAILED):
        prev_tmpl_id = self.prev_raw_template_id
        self.prev_raw_template_id = None

    stack_id = self.store(exp_trvsl=exp_trvsl)
    if stack_id is None:
        # Failed concurrent update
        # NOTE(review): for a FAILED stack current_traversal has already
        # been set to '' above, so the ID logged here is empty - confirm
        # whether exp_trvsl was meant.
        LOG.warning("Failed to store stack %(name)s with traversal ID "
                    "%(trvsl_id)s, aborting stack purge",
                    {'name': self.name,
                     'trvsl_id': self.current_traversal})
        return

    if prev_tmpl_id is not None:
        raw_template_object.RawTemplate.delete(self.context, prev_tmpl_id)

    sync_point.delete_all(self.context, self.id, exp_trvsl)

    if (self.action, self.status) == (self.DELETE, self.COMPLETE):
        # Credentials are only cleaned up for stacks without an owner
        if not self.owner_id:
            status, reason = self._delete_credentials(
                self.status,
                self.status_reason,
                False)
            if status == self.FAILED:
                # something wrong when delete credentials, set FAILED
                self.state_set(self.action, status, reason)
                return
        # A stack record that is already gone is not an error
        try:
            stack_object.Stack.delete(self.context, self.id)
        except exception.NotFound:
            pass
  1923. def time_elapsed(self):
  1924. """Time elapsed in seconds since the stack operation started."""
  1925. start_time = self.updated_time or self.created_time
  1926. return (oslo_timeutils.utcnow() - start_time).total_seconds()
  1927. def time_remaining(self):
  1928. """Time left before stack times out."""
  1929. return self.timeout_secs() - self.time_elapsed()
  1930. def has_timed_out(self):
  1931. """Returns True if this stack has timed-out."""
  1932. if self.status == self.IN_PROGRESS:
  1933. return self.time_elapsed() > self.timeout_secs()
  1934. return False
  1935. def migrate_to_convergence(self):
  1936. db_rsrcs = self.db_active_resources_get()
  1937. res_id_dep = self.dependencies.translate(lambda res: res.id)
  1938. current_template_id = self.t.id
  1939. if db_rsrcs is not None:
  1940. for db_res in db_rsrcs.values():
  1941. requires = set(res_id_dep.requires(db_res.id))
  1942. r = self.resources.get(db_res.name)
  1943. if r is None:
  1944. # delete db resources not in current_template_id
  1945. LOG.warning("Resource %(res)s not found in template "
  1946. "for stack %(st)s, deleting from db.",
  1947. {'res': db_res.name, 'st': self.id})
  1948. resource_objects.Resource.delete(self.context, db_res.id)
  1949. else:
  1950. r.requires = requires
  1951. db_res.convert_to_convergence(current_template_id,
  1952. requires)
  1953. self.current_traversal = uuidutils.generate_uuid()
  1954. self.convergence = True
  1955. prev_raw_template_id = self.prev_raw_template_id
  1956. self.prev_raw_template_id = None
  1957. self.store(ignore_traversal_check=True)
  1958. if prev_raw_template_id:
  1959. raw_template_object.RawTemplate.delete(self.context,
  1960. prev_raw_template_id)