OpenStack Compute (Nova)
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

5742 lines
201KB

  1. # Copyright (c) 2011 X.commerce, a business unit of eBay Inc.
  2. # Copyright 2010 United States Government as represented by the
  3. # Administrator of the National Aeronautics and Space Administration.
  4. # All Rights Reserved.
  5. #
  6. # Licensed under the Apache License, Version 2.0 (the "License"); you may
  7. # not use this file except in compliance with the License. You may obtain
  8. # a copy of the License at
  9. #
  10. # http://www.apache.org/licenses/LICENSE-2.0
  11. #
  12. # Unless required by applicable law or agreed to in writing, software
  13. # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  14. # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  15. # License for the specific language governing permissions and limitations
  16. # under the License.
  17. """Implementation of SQLAlchemy backend."""
  18. import collections
  19. import copy
  20. import datetime
  21. import functools
  22. import inspect
  23. import sys
  24. from oslo_db import api as oslo_db_api
  25. from oslo_db import exception as db_exc
  26. from oslo_db.sqlalchemy import enginefacade
  27. from oslo_db.sqlalchemy import update_match
  28. from oslo_db.sqlalchemy import utils as sqlalchemyutils
  29. from oslo_log import log as logging
  30. from oslo_utils import excutils
  31. from oslo_utils import importutils
  32. from oslo_utils import timeutils
  33. from oslo_utils import uuidutils
  34. import six
  35. from six.moves import range
  36. import sqlalchemy as sa
  37. from sqlalchemy import and_
  38. from sqlalchemy import Boolean
  39. from sqlalchemy.exc import NoSuchTableError
  40. from sqlalchemy.ext.compiler import compiles
  41. from sqlalchemy import Integer
  42. from sqlalchemy import MetaData
  43. from sqlalchemy import or_
  44. from sqlalchemy.orm import aliased
  45. from sqlalchemy.orm import contains_eager
  46. from sqlalchemy.orm import joinedload
  47. from sqlalchemy.orm import noload
  48. from sqlalchemy.orm import undefer
  49. from sqlalchemy.schema import Table
  50. from sqlalchemy import sql
  51. from sqlalchemy.sql.expression import asc
  52. from sqlalchemy.sql.expression import cast
  53. from sqlalchemy.sql.expression import desc
  54. from sqlalchemy.sql.expression import UpdateBase
  55. from sqlalchemy.sql import false
  56. from sqlalchemy.sql import func
  57. from sqlalchemy.sql import null
  58. from sqlalchemy.sql import true
  59. from nova import block_device
  60. from nova.compute import task_states
  61. from nova.compute import vm_states
  62. import nova.conf
  63. import nova.context
  64. from nova.db.sqlalchemy import models
  65. from nova import exception
  66. from nova.i18n import _
  67. from nova import safe_utils
# osprofiler is an optional dependency: this is the module object when it is
# installed, None otherwise (checked before use in configure() below).
profiler_sqlalchemy = importutils.try_import('osprofiler.sqlalchemy')

CONF = nova.conf.CONF
LOG = logging.getLogger(__name__)

# Transaction context managers for the cell ("main") database and the API
# database. They are created unconfigured here and configured via
# configure() at service startup.
main_context_manager = enginefacade.transaction_context()
api_context_manager = enginefacade.transaction_context()
  73. def _get_db_conf(conf_group, connection=None):
  74. kw = dict(conf_group.items())
  75. if connection is not None:
  76. kw['connection'] = connection
  77. return kw
  78. def _context_manager_from_context(context):
  79. if context:
  80. try:
  81. return context.db_connection
  82. except AttributeError:
  83. pass
  84. def _joinedload_all(column):
  85. elements = column.split('.')
  86. joined = joinedload(elements.pop(0))
  87. for element in elements:
  88. joined = joined.joinedload(element)
  89. return joined
  90. def configure(conf):
  91. main_context_manager.configure(**_get_db_conf(conf.database))
  92. api_context_manager.configure(**_get_db_conf(conf.api_database))
  93. if profiler_sqlalchemy and CONF.profiler.enabled \
  94. and CONF.profiler.trace_sqlalchemy:
  95. main_context_manager.append_on_engine_create(
  96. lambda eng: profiler_sqlalchemy.add_tracing(sa, eng, "db"))
  97. api_context_manager.append_on_engine_create(
  98. lambda eng: profiler_sqlalchemy.add_tracing(sa, eng, "db"))
  99. def create_context_manager(connection=None):
  100. """Create a database context manager object.
  101. : param connection: The database connection string
  102. """
  103. ctxt_mgr = enginefacade.transaction_context()
  104. ctxt_mgr.configure(**_get_db_conf(CONF.database, connection=connection))
  105. return ctxt_mgr
  106. def get_context_manager(context):
  107. """Get a database context manager object.
  108. :param context: The request context that can contain a context manager
  109. """
  110. return _context_manager_from_context(context) or main_context_manager
  111. def get_engine(use_slave=False, context=None):
  112. """Get a database engine object.
  113. :param use_slave: Whether to use the slave connection
  114. :param context: The request context that can contain a context manager
  115. """
  116. ctxt_mgr = get_context_manager(context)
  117. if use_slave:
  118. return ctxt_mgr.reader.get_engine()
  119. return ctxt_mgr.writer.get_engine()
  120. def get_api_engine():
  121. return api_context_manager.writer.get_engine()
# Prefix used to build shadow (archive) table names from live table names.
# NOTE(review): consumers are outside this chunk — confirm against the
# archive/purge code paths.
_SHADOW_TABLE_PREFIX = 'shadow_'
# Name of the quota class that stores default quota values.
_DEFAULT_QUOTA_NAME = 'default'
# Quota resources that are tracked per project only, never per user.
PER_PROJECT_QUOTAS = ['fixed_ips', 'floating_ips', 'networks']
  125. # NOTE(stephenfin): This is required and used by oslo.db
  126. def get_backend():
  127. """The backend is this module itself."""
  128. return sys.modules[__name__]
  129. def require_context(f):
  130. """Decorator to require *any* user or admin context.
  131. This does no authorization for user or project access matching, see
  132. :py:func:`nova.context.authorize_project_context` and
  133. :py:func:`nova.context.authorize_user_context`.
  134. The first argument to the wrapped function must be the context.
  135. """
  136. @functools.wraps(f)
  137. def wrapper(*args, **kwargs):
  138. nova.context.require_context(args[0])
  139. return f(*args, **kwargs)
  140. return wrapper
  141. def select_db_reader_mode(f):
  142. """Decorator to select synchronous or asynchronous reader mode.
  143. The kwarg argument 'use_slave' defines reader mode. Asynchronous reader
  144. will be used if 'use_slave' is True and synchronous reader otherwise.
  145. If 'use_slave' is not specified default value 'False' will be used.
  146. Wrapped function must have a context in the arguments.
  147. """
  148. @functools.wraps(f)
  149. def wrapper(*args, **kwargs):
  150. wrapped_func = safe_utils.get_wrapped_function(f)
  151. keyed_args = inspect.getcallargs(wrapped_func, *args, **kwargs)
  152. context = keyed_args['context']
  153. use_slave = keyed_args.get('use_slave', False)
  154. if use_slave:
  155. reader_mode = get_context_manager(context).async_
  156. else:
  157. reader_mode = get_context_manager(context).reader
  158. with reader_mode.using(context):
  159. return f(*args, **kwargs)
  160. return wrapper
  161. def pick_context_manager_writer(f):
  162. """Decorator to use a writer db context manager.
  163. The db context manager will be picked from the RequestContext.
  164. Wrapped function must have a RequestContext in the arguments.
  165. """
  166. @functools.wraps(f)
  167. def wrapped(context, *args, **kwargs):
  168. ctxt_mgr = get_context_manager(context)
  169. with ctxt_mgr.writer.using(context):
  170. return f(context, *args, **kwargs)
  171. return wrapped
  172. def pick_context_manager_reader(f):
  173. """Decorator to use a reader db context manager.
  174. The db context manager will be picked from the RequestContext.
  175. Wrapped function must have a RequestContext in the arguments.
  176. """
  177. @functools.wraps(f)
  178. def wrapped(context, *args, **kwargs):
  179. ctxt_mgr = get_context_manager(context)
  180. with ctxt_mgr.reader.using(context):
  181. return f(context, *args, **kwargs)
  182. return wrapped
  183. def pick_context_manager_reader_allow_async(f):
  184. """Decorator to use a reader.allow_async db context manager.
  185. The db context manager will be picked from the RequestContext.
  186. Wrapped function must have a RequestContext in the arguments.
  187. """
  188. @functools.wraps(f)
  189. def wrapped(context, *args, **kwargs):
  190. ctxt_mgr = get_context_manager(context)
  191. with ctxt_mgr.reader.allow_async.using(context):
  192. return f(context, *args, **kwargs)
  193. return wrapped
  194. def model_query(context, model,
  195. args=None,
  196. read_deleted=None,
  197. project_only=False):
  198. """Query helper that accounts for context's `read_deleted` field.
  199. :param context: NovaContext of the query.
  200. :param model: Model to query. Must be a subclass of ModelBase.
  201. :param args: Arguments to query. If None - model is used.
  202. :param read_deleted: If not None, overrides context's read_deleted field.
  203. Permitted values are 'no', which does not return
  204. deleted values; 'only', which only returns deleted
  205. values; and 'yes', which does not filter deleted
  206. values.
  207. :param project_only: If set and context is user-type, then restrict
  208. query to match the context's project_id. If set to
  209. 'allow_none', restriction includes project_id = None.
  210. """
  211. if read_deleted is None:
  212. read_deleted = context.read_deleted
  213. query_kwargs = {}
  214. if 'no' == read_deleted:
  215. query_kwargs['deleted'] = False
  216. elif 'only' == read_deleted:
  217. query_kwargs['deleted'] = True
  218. elif 'yes' == read_deleted:
  219. pass
  220. else:
  221. raise ValueError(_("Unrecognized read_deleted value '%s'")
  222. % read_deleted)
  223. query = sqlalchemyutils.model_query(
  224. model, context.session, args, **query_kwargs)
  225. # We can't use oslo.db model_query's project_id here, as it doesn't allow
  226. # us to return both our projects and unowned projects.
  227. if nova.context.is_user_context(context) and project_only:
  228. if project_only == 'allow_none':
  229. query = query.\
  230. filter(or_(model.project_id == context.project_id,
  231. model.project_id == null()))
  232. else:
  233. query = query.filter_by(project_id=context.project_id)
  234. return query
  235. def convert_objects_related_datetimes(values, *datetime_keys):
  236. if not datetime_keys:
  237. datetime_keys = ('created_at', 'deleted_at', 'updated_at')
  238. for key in datetime_keys:
  239. if key in values and values[key]:
  240. if isinstance(values[key], six.string_types):
  241. try:
  242. values[key] = timeutils.parse_strtime(values[key])
  243. except ValueError:
  244. # Try alternate parsing since parse_strtime will fail
  245. # with say converting '2015-05-28T19:59:38+00:00'
  246. values[key] = timeutils.parse_isotime(values[key])
  247. # NOTE(danms): Strip UTC timezones from datetimes, since they're
  248. # stored that way in the database
  249. values[key] = values[key].replace(tzinfo=None)
  250. return values
  251. ###################
  252. def constraint(**conditions):
  253. return Constraint(conditions)
  254. def equal_any(*values):
  255. return EqualityCondition(values)
  256. def not_equal(*values):
  257. return InequalityCondition(values)
  258. class Constraint(object):
  259. def __init__(self, conditions):
  260. self.conditions = conditions
  261. def apply(self, model, query):
  262. for key, condition in self.conditions.items():
  263. for clause in condition.clauses(getattr(model, key)):
  264. query = query.filter(clause)
  265. return query
  266. class EqualityCondition(object):
  267. def __init__(self, values):
  268. self.values = values
  269. def clauses(self, field):
  270. # method signature requires us to return an iterable even if for OR
  271. # operator this will actually be a single clause
  272. return [or_(*[field == value for value in self.values])]
  273. class InequalityCondition(object):
  274. def __init__(self, values):
  275. self.values = values
  276. def clauses(self, field):
  277. return [field != value for value in self.values]
class DeleteFromSelect(UpdateBase):
    """A DELETE statement whose target rows are produced by a SELECT.

    Compiled by visit_delete_from_select() below into
    ``DELETE ... WHERE col IN (SELECT ...)`` with the select wrapped in a
    nested subquery for MySQL compatibility.

    :param table: table to delete rows from
    :param select: select statement producing the matching column values
    :param column: column compared against the select's results
    """
    def __init__(self, table, select, column):
        self.table = table
        self.select = select
        self.column = column
# NOTE(guochbo): some versions of MySQL doesn't yet support subquery with
# 'LIMIT & IN/ALL/ANY/SOME' We need work around this with nesting select .
@compiles(DeleteFromSelect)
def visit_delete_from_select(element, compiler, **kw):
    """Compile a DeleteFromSelect into a nested-subquery DELETE.

    Wrapping the select in an aliased derived table (T1) sidesteps MySQL's
    limitation described in the NOTE above.
    """
    return "DELETE FROM %s WHERE %s in (SELECT T1.%s FROM (%s) as T1)" % (
        compiler.process(element.table, asfrom=True),
        compiler.process(element.column),
        element.column.name,
        compiler.process(element.select))
  292. ###################
  293. @pick_context_manager_writer
  294. def service_destroy(context, service_id):
  295. service = service_get(context, service_id)
  296. model_query(context, models.Service).\
  297. filter_by(id=service_id).\
  298. soft_delete(synchronize_session=False)
  299. if service.binary == 'nova-compute':
  300. # TODO(sbauza): Remove the service_id filter in a later release
  301. # once we are sure that all compute nodes report the host field
  302. model_query(context, models.ComputeNode).\
  303. filter(or_(models.ComputeNode.service_id == service_id,
  304. models.ComputeNode.host == service['host'])).\
  305. soft_delete(synchronize_session=False)
  306. @pick_context_manager_reader
  307. def service_get(context, service_id):
  308. query = model_query(context, models.Service).filter_by(id=service_id)
  309. result = query.first()
  310. if not result:
  311. raise exception.ServiceNotFound(service_id=service_id)
  312. return result
  313. @pick_context_manager_reader
  314. def service_get_by_uuid(context, service_uuid):
  315. query = model_query(context, models.Service).filter_by(uuid=service_uuid)
  316. result = query.first()
  317. if not result:
  318. raise exception.ServiceNotFound(service_id=service_uuid)
  319. return result
  320. @pick_context_manager_reader_allow_async
  321. def service_get_minimum_version(context, binaries):
  322. min_versions = context.session.query(
  323. models.Service.binary,
  324. func.min(models.Service.version)).\
  325. filter(models.Service.binary.in_(binaries)).\
  326. filter(models.Service.deleted == 0).\
  327. filter(models.Service.forced_down == false()).\
  328. group_by(models.Service.binary)
  329. return dict(min_versions)
  330. @pick_context_manager_reader
  331. def service_get_all(context, disabled=None):
  332. query = model_query(context, models.Service)
  333. if disabled is not None:
  334. query = query.filter_by(disabled=disabled)
  335. return query.all()
  336. @pick_context_manager_reader
  337. def service_get_all_by_topic(context, topic):
  338. return model_query(context, models.Service, read_deleted="no").\
  339. filter_by(disabled=False).\
  340. filter_by(topic=topic).\
  341. all()
  342. @pick_context_manager_reader
  343. def service_get_by_host_and_topic(context, host, topic):
  344. return model_query(context, models.Service, read_deleted="no").\
  345. filter_by(disabled=False).\
  346. filter_by(host=host).\
  347. filter_by(topic=topic).\
  348. first()
  349. @pick_context_manager_reader
  350. def service_get_all_by_binary(context, binary, include_disabled=False):
  351. query = model_query(context, models.Service).filter_by(binary=binary)
  352. if not include_disabled:
  353. query = query.filter_by(disabled=False)
  354. return query.all()
  355. @pick_context_manager_reader
  356. def service_get_all_computes_by_hv_type(context, hv_type,
  357. include_disabled=False):
  358. query = model_query(context, models.Service, read_deleted="no").\
  359. filter_by(binary='nova-compute')
  360. if not include_disabled:
  361. query = query.filter_by(disabled=False)
  362. query = query.join(models.ComputeNode,
  363. models.Service.host == models.ComputeNode.host).\
  364. filter(models.ComputeNode.hypervisor_type == hv_type).\
  365. distinct('host')
  366. return query.all()
  367. @pick_context_manager_reader
  368. def service_get_by_host_and_binary(context, host, binary):
  369. result = model_query(context, models.Service, read_deleted="no").\
  370. filter_by(host=host).\
  371. filter_by(binary=binary).\
  372. first()
  373. if not result:
  374. raise exception.HostBinaryNotFound(host=host, binary=binary)
  375. return result
  376. @pick_context_manager_reader
  377. def service_get_all_by_host(context, host):
  378. return model_query(context, models.Service, read_deleted="no").\
  379. filter_by(host=host).\
  380. all()
  381. @pick_context_manager_reader_allow_async
  382. def service_get_by_compute_host(context, host):
  383. result = model_query(context, models.Service, read_deleted="no").\
  384. filter_by(host=host).\
  385. filter_by(binary='nova-compute').\
  386. first()
  387. if not result:
  388. raise exception.ComputeHostNotFound(host=host)
  389. return result
  390. @pick_context_manager_writer
  391. def service_create(context, values):
  392. service_ref = models.Service()
  393. service_ref.update(values)
  394. # We only auto-disable nova-compute services since those are the only
  395. # ones that can be enabled using the os-services REST API and they are
  396. # the only ones where being disabled means anything. It does
  397. # not make sense to be able to disable non-compute services like
  398. # nova-scheduler or nova-osapi_compute since that does nothing.
  399. if not CONF.enable_new_services and values.get('binary') == 'nova-compute':
  400. msg = _("New compute service disabled due to config option.")
  401. service_ref.disabled = True
  402. service_ref.disabled_reason = msg
  403. try:
  404. service_ref.save(context.session)
  405. except db_exc.DBDuplicateEntry as e:
  406. if 'binary' in e.columns:
  407. raise exception.ServiceBinaryExists(host=values.get('host'),
  408. binary=values.get('binary'))
  409. raise exception.ServiceTopicExists(host=values.get('host'),
  410. topic=values.get('topic'))
  411. return service_ref
  412. @oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
  413. @pick_context_manager_writer
  414. def service_update(context, service_id, values):
  415. service_ref = service_get(context, service_id)
  416. # Only servicegroup.drivers.db.DbDriver._report_state() updates
  417. # 'report_count', so if that value changes then store the timestamp
  418. # as the last time we got a state report.
  419. if 'report_count' in values:
  420. if values['report_count'] > service_ref.report_count:
  421. service_ref.last_seen_up = timeutils.utcnow()
  422. service_ref.update(values)
  423. return service_ref
  424. ###################
  425. def _compute_node_select(context, filters=None, limit=None, marker=None):
  426. if filters is None:
  427. filters = {}
  428. cn_tbl = sa.alias(models.ComputeNode.__table__, name='cn')
  429. select = sa.select([cn_tbl])
  430. if context.read_deleted == "no":
  431. select = select.where(cn_tbl.c.deleted == 0)
  432. if "compute_id" in filters:
  433. select = select.where(cn_tbl.c.id == filters["compute_id"])
  434. if "service_id" in filters:
  435. select = select.where(cn_tbl.c.service_id == filters["service_id"])
  436. if "host" in filters:
  437. select = select.where(cn_tbl.c.host == filters["host"])
  438. if "hypervisor_hostname" in filters:
  439. hyp_hostname = filters["hypervisor_hostname"]
  440. select = select.where(cn_tbl.c.hypervisor_hostname == hyp_hostname)
  441. if "mapped" in filters:
  442. select = select.where(cn_tbl.c.mapped < filters['mapped'])
  443. if marker is not None:
  444. try:
  445. compute_node_get(context, marker)
  446. except exception.ComputeHostNotFound:
  447. raise exception.MarkerNotFound(marker=marker)
  448. select = select.where(cn_tbl.c.id > marker)
  449. if limit is not None:
  450. select = select.limit(limit)
  451. # Explicitly order by id, so we're not dependent on the native sort
  452. # order of the underlying DB.
  453. select = select.order_by(asc("id"))
  454. return select
  455. def _compute_node_fetchall(context, filters=None, limit=None, marker=None):
  456. select = _compute_node_select(context, filters, limit=limit, marker=marker)
  457. engine = get_engine(context=context)
  458. conn = engine.connect()
  459. results = conn.execute(select).fetchall()
  460. # Callers expect dict-like objects, not SQLAlchemy RowProxy objects...
  461. results = [dict(r) for r in results]
  462. conn.close()
  463. return results
  464. @pick_context_manager_reader
  465. def compute_node_get(context, compute_id):
  466. results = _compute_node_fetchall(context, {"compute_id": compute_id})
  467. if not results:
  468. raise exception.ComputeHostNotFound(host=compute_id)
  469. return results[0]
  470. @pick_context_manager_reader
  471. def compute_node_get_model(context, compute_id):
  472. # TODO(edleafe): remove once the compute node resource provider migration
  473. # is complete, and this distinction is no longer necessary.
  474. result = model_query(context, models.ComputeNode).\
  475. filter_by(id=compute_id).\
  476. first()
  477. if not result:
  478. raise exception.ComputeHostNotFound(host=compute_id)
  479. return result
  480. @pick_context_manager_reader
  481. def compute_nodes_get_by_service_id(context, service_id):
  482. results = _compute_node_fetchall(context, {"service_id": service_id})
  483. if not results:
  484. raise exception.ServiceNotFound(service_id=service_id)
  485. return results
  486. @pick_context_manager_reader
  487. def compute_node_get_by_host_and_nodename(context, host, nodename):
  488. results = _compute_node_fetchall(context,
  489. {"host": host, "hypervisor_hostname": nodename})
  490. if not results:
  491. raise exception.ComputeHostNotFound(host=host)
  492. return results[0]
  493. @pick_context_manager_reader
  494. def compute_node_get_by_nodename(context, hypervisor_hostname):
  495. results = _compute_node_fetchall(context,
  496. {"hypervisor_hostname": hypervisor_hostname})
  497. if not results:
  498. raise exception.ComputeHostNotFound(host=hypervisor_hostname)
  499. return results[0]
  500. @pick_context_manager_reader_allow_async
  501. def compute_node_get_all_by_host(context, host):
  502. results = _compute_node_fetchall(context, {"host": host})
  503. if not results:
  504. raise exception.ComputeHostNotFound(host=host)
  505. return results
  506. @pick_context_manager_reader
  507. def compute_node_get_all(context):
  508. return _compute_node_fetchall(context)
  509. @pick_context_manager_reader
  510. def compute_node_get_all_mapped_less_than(context, mapped_less_than):
  511. return _compute_node_fetchall(context,
  512. {'mapped': mapped_less_than})
  513. @pick_context_manager_reader
  514. def compute_node_get_all_by_pagination(context, limit=None, marker=None):
  515. return _compute_node_fetchall(context, limit=limit, marker=marker)
  516. @pick_context_manager_reader
  517. def compute_node_search_by_hypervisor(context, hypervisor_match):
  518. field = models.ComputeNode.hypervisor_hostname
  519. return model_query(context, models.ComputeNode).\
  520. filter(field.like('%%%s%%' % hypervisor_match)).\
  521. all()
@pick_context_manager_writer
def compute_node_create(context, values):
    """Creates a new ComputeNode and populates the capacity fields
    with the most recent data.

    :param values: column values for the new record; datetime fields are
        normalized in place by convert_objects_related_datetimes
    :returns: the created (or resurrected, see below) ComputeNode
    :raises: DBDuplicateEntry when a duplicate exists and cannot be
        recovered by un-deleting a soft-deleted row with the same uuid
    """
    convert_objects_related_datetimes(values)

    compute_node_ref = models.ComputeNode()
    compute_node_ref.update(values)
    try:
        compute_node_ref.save(context.session)
    except db_exc.DBDuplicateEntry:
        # save_and_reraise_exception re-raises the duplicate error on exit
        # unless err_ctx.reraise is cleared below.
        with excutils.save_and_reraise_exception(logger=LOG) as err_ctx:
            # Check to see if we have a (soft) deleted ComputeNode with the
            # same UUID and if so just update it and mark as no longer (soft)
            # deleted. See bug 1839560 for details.
            if 'uuid' in values:
                # Get a fresh context for a new DB session and allow it to
                # get a deleted record.
                ctxt = nova.context.get_admin_context(read_deleted='yes')
                compute_node_ref = _compute_node_get_and_update_deleted(
                    ctxt, values)
                # If we didn't get anything back we failed to find the node
                # by uuid and update it so re-raise the DBDuplicateEntry.
                if compute_node_ref:
                    err_ctx.reraise = False

    return compute_node_ref
  548. @pick_context_manager_writer
  549. def _compute_node_get_and_update_deleted(context, values):
  550. """Find a ComputeNode by uuid, update and un-delete it.
  551. This is a special case from the ``compute_node_create`` method which
  552. needs to be separate to get a new Session.
  553. This method will update the ComputeNode, if found, to have deleted=0 and
  554. deleted_at=None values.
  555. :param context: request auth context which should be able to read deleted
  556. records
  557. :param values: values used to update the ComputeNode record - must include
  558. uuid
  559. :return: updated ComputeNode sqlalchemy model object if successfully found
  560. and updated, None otherwise
  561. """
  562. cn = model_query(
  563. context, models.ComputeNode).filter_by(uuid=values['uuid']).first()
  564. if cn:
  565. # Update with the provided values but un-soft-delete.
  566. update_values = copy.deepcopy(values)
  567. update_values['deleted'] = 0
  568. update_values['deleted_at'] = None
  569. return compute_node_update(context, cn.id, update_values)
  570. @oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
  571. @pick_context_manager_writer
  572. def compute_node_update(context, compute_id, values):
  573. """Updates the ComputeNode record with the most recent data."""
  574. compute_ref = compute_node_get_model(context, compute_id)
  575. # Always update this, even if there's going to be no other
  576. # changes in data. This ensures that we invalidate the
  577. # scheduler cache of compute node data in case of races.
  578. values['updated_at'] = timeutils.utcnow()
  579. convert_objects_related_datetimes(values)
  580. compute_ref.update(values)
  581. return compute_ref
  582. @pick_context_manager_writer
  583. def compute_node_delete(context, compute_id):
  584. """Delete a ComputeNode record."""
  585. result = model_query(context, models.ComputeNode).\
  586. filter_by(id=compute_id).\
  587. soft_delete(synchronize_session=False)
  588. if not result:
  589. raise exception.ComputeHostNotFound(host=compute_id)
  590. @pick_context_manager_reader
  591. def compute_node_statistics(context):
  592. """Compute statistics over all compute nodes."""
  593. engine = get_engine(context=context)
  594. services_tbl = models.Service.__table__
  595. inner_sel = sa.alias(_compute_node_select(context), name='inner_sel')
  596. # TODO(sbauza): Remove the service_id filter in a later release
  597. # once we are sure that all compute nodes report the host field
  598. j = sa.join(
  599. inner_sel, services_tbl,
  600. sql.and_(
  601. sql.or_(
  602. inner_sel.c.host == services_tbl.c.host,
  603. inner_sel.c.service_id == services_tbl.c.id
  604. ),
  605. services_tbl.c.disabled == false(),
  606. services_tbl.c.binary == 'nova-compute',
  607. services_tbl.c.deleted == 0
  608. )
  609. )
  610. # NOTE(jaypipes): This COALESCE() stuff is temporary while the data
  611. # migration to the new resource providers inventories and allocations
  612. # tables is completed.
  613. agg_cols = [
  614. func.count().label('count'),
  615. sql.func.sum(
  616. inner_sel.c.vcpus
  617. ).label('vcpus'),
  618. sql.func.sum(
  619. inner_sel.c.memory_mb
  620. ).label('memory_mb'),
  621. sql.func.sum(
  622. inner_sel.c.local_gb
  623. ).label('local_gb'),
  624. sql.func.sum(
  625. inner_sel.c.vcpus_used
  626. ).label('vcpus_used'),
  627. sql.func.sum(
  628. inner_sel.c.memory_mb_used
  629. ).label('memory_mb_used'),
  630. sql.func.sum(
  631. inner_sel.c.local_gb_used
  632. ).label('local_gb_used'),
  633. sql.func.sum(
  634. inner_sel.c.free_ram_mb
  635. ).label('free_ram_mb'),
  636. sql.func.sum(
  637. inner_sel.c.free_disk_gb
  638. ).label('free_disk_gb'),
  639. sql.func.sum(
  640. inner_sel.c.current_workload
  641. ).label('current_workload'),
  642. sql.func.sum(
  643. inner_sel.c.running_vms
  644. ).label('running_vms'),
  645. sql.func.sum(
  646. inner_sel.c.disk_available_least
  647. ).label('disk_available_least'),
  648. ]
  649. select = sql.select(agg_cols).select_from(j)
  650. conn = engine.connect()
  651. results = conn.execute(select).fetchone()
  652. # Build a dict of the info--making no assumptions about result
  653. fields = ('count', 'vcpus', 'memory_mb', 'local_gb', 'vcpus_used',
  654. 'memory_mb_used', 'local_gb_used', 'free_ram_mb', 'free_disk_gb',
  655. 'current_workload', 'running_vms', 'disk_available_least')
  656. results = {field: int(results[idx] or 0)
  657. for idx, field in enumerate(fields)}
  658. conn.close()
  659. return results
  660. ###################
  661. @pick_context_manager_writer
  662. def certificate_create(context, values):
  663. certificate_ref = models.Certificate()
  664. for (key, value) in values.items():
  665. certificate_ref[key] = value
  666. certificate_ref.save(context.session)
  667. return certificate_ref
  668. @pick_context_manager_reader
  669. def certificate_get_all_by_project(context, project_id):
  670. return model_query(context, models.Certificate, read_deleted="no").\
  671. filter_by(project_id=project_id).\
  672. all()
  673. @pick_context_manager_reader
  674. def certificate_get_all_by_user(context, user_id):
  675. return model_query(context, models.Certificate, read_deleted="no").\
  676. filter_by(user_id=user_id).\
  677. all()
  678. @pick_context_manager_reader
  679. def certificate_get_all_by_user_and_project(context, user_id, project_id):
  680. return model_query(context, models.Certificate, read_deleted="no").\
  681. filter_by(user_id=user_id).\
  682. filter_by(project_id=project_id).\
  683. all()
  684. ###################
  685. @require_context
  686. @pick_context_manager_reader
  687. def floating_ip_get(context, id):
  688. try:
  689. result = model_query(context, models.FloatingIp, project_only=True).\
  690. filter_by(id=id).\
  691. options(_joinedload_all('fixed_ip.instance')).\
  692. first()
  693. if not result:
  694. raise exception.FloatingIpNotFound(id=id)
  695. except db_exc.DBError:
  696. LOG.warning("Invalid floating IP ID %s in request", id)
  697. raise exception.InvalidID(id=id)
  698. return result
  699. @require_context
  700. @pick_context_manager_reader
  701. def floating_ip_get_pools(context):
  702. pools = []
  703. for result in model_query(context, models.FloatingIp,
  704. (models.FloatingIp.pool,)).distinct():
  705. pools.append({'name': result[0]})
  706. return pools
@require_context
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
@pick_context_manager_writer
def floating_ip_allocate_address(context, project_id, pool,
                                 auto_assigned=False):
    """Allocate an unallocated floating IP from *pool* to *project_id*.

    Uses an optimistic compare-and-swap: the UPDATE re-matches the
    candidate row on its unallocated state, and when zero rows match
    (a concurrent transaction won the race) a RetryRequest is raised so
    wrap_db_retry picks another candidate.

    :param auto_assigned: whether to flag the IP as auto-assigned
    :returns: the allocated address string
    :raises: NoMoreFloatingIps when the pool has no free addresses
    """
    nova.context.authorize_project_context(context, project_id)
    floating_ip_ref = model_query(context, models.FloatingIp,
                                  read_deleted="no").\
        filter_by(fixed_ip_id=None).\
        filter_by(project_id=None).\
        filter_by(pool=pool).\
        first()

    if not floating_ip_ref:
        raise exception.NoMoreFloatingIps()

    params = {'project_id': project_id, 'auto_assigned': auto_assigned}

    # Re-apply the "still unallocated" filters in the UPDATE itself so a
    # concurrent allocation of the same row makes this update match zero
    # rows instead of double-allocating.
    rows_update = model_query(context, models.FloatingIp, read_deleted="no").\
        filter_by(id=floating_ip_ref['id']).\
        filter_by(fixed_ip_id=None).\
        filter_by(project_id=None).\
        filter_by(pool=pool).\
        update(params, synchronize_session='evaluate')

    if not rows_update:
        LOG.debug('The row was updated in a concurrent transaction, '
                  'we will fetch another one')
        raise db_exc.RetryRequest(exception.FloatingIpAllocateFailed())

    return floating_ip_ref['address']
  733. @require_context
  734. @pick_context_manager_writer
  735. def floating_ip_bulk_create(context, ips, want_result=True):
  736. try:
  737. tab = models.FloatingIp().__table__
  738. context.session.execute(tab.insert(), ips)
  739. except db_exc.DBDuplicateEntry as e:
  740. raise exception.FloatingIpExists(address=e.value)
  741. if want_result:
  742. return model_query(context, models.FloatingIp).filter(
  743. models.FloatingIp.address.in_(
  744. [ip['address'] for ip in ips])).all()
  745. def _ip_range_splitter(ips, block_size=256):
  746. """Yields blocks of IPs no more than block_size elements long."""
  747. out = []
  748. count = 0
  749. for ip in ips:
  750. out.append(ip['address'])
  751. count += 1
  752. if count > block_size - 1:
  753. yield out
  754. out = []
  755. count = 0
  756. if out:
  757. yield out
  758. @require_context
  759. @pick_context_manager_writer
  760. def floating_ip_bulk_destroy(context, ips):
  761. project_id_to_quota_count = collections.defaultdict(int)
  762. for ip_block in _ip_range_splitter(ips):
  763. # Find any floating IPs that were not auto_assigned and
  764. # thus need quota released.
  765. query = model_query(context, models.FloatingIp).\
  766. filter(models.FloatingIp.address.in_(ip_block)).\
  767. filter_by(auto_assigned=False)
  768. for row in query.all():
  769. # The count is negative since we release quota by
  770. # reserving negative quota.
  771. project_id_to_quota_count[row['project_id']] -= 1
  772. # Delete the floating IPs.
  773. model_query(context, models.FloatingIp).\
  774. filter(models.FloatingIp.address.in_(ip_block)).\
  775. soft_delete(synchronize_session='fetch')
  776. @require_context
  777. @pick_context_manager_writer
  778. def floating_ip_create(context, values):
  779. floating_ip_ref = models.FloatingIp()
  780. floating_ip_ref.update(values)
  781. try:
  782. floating_ip_ref.save(context.session)
  783. except db_exc.DBDuplicateEntry:
  784. raise exception.FloatingIpExists(address=values['address'])
  785. return floating_ip_ref
@require_context
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
@pick_context_manager_writer
def floating_ip_fixed_ip_associate(context, floating_address,
                                   fixed_address, host):
    """Associate the floating IP at ``floating_address`` with a fixed IP.

    The association is done with a guarded UPDATE that only succeeds when
    the floating IP belongs to the caller's project and is either free or
    already pointing at this same fixed IP (making re-association a no-op
    that still counts as success).

    :returns: the FixedIp row (with its network joined) that was associated
    :raises: exception.FixedIpNotFoundForAddress if ``fixed_address`` is
        unknown
    :raises: exception.FloatingIpAssociateFailed if the guarded UPDATE
        matched no row (wrong project, unknown address, or already attached
        to a different fixed IP)
    """
    fixed_ip_ref = model_query(context, models.FixedIp).\
        filter_by(address=fixed_address).\
        options(joinedload('network')).\
        first()
    if not fixed_ip_ref:
        raise exception.FixedIpNotFoundForAddress(address=fixed_address)
    # Guarded update: project must match, and fixed_ip_id must be NULL or
    # already equal to the target (idempotent re-association).
    rows = model_query(context, models.FloatingIp).\
        filter_by(address=floating_address).\
        filter(models.FloatingIp.project_id ==
               context.project_id).\
        filter(or_(models.FloatingIp.fixed_ip_id ==
                   fixed_ip_ref['id'],
                   models.FloatingIp.fixed_ip_id.is_(None))).\
        update({'fixed_ip_id': fixed_ip_ref['id'], 'host': host})
    if not rows:
        raise exception.FloatingIpAssociateFailed(address=floating_address)
    return fixed_ip_ref
  808. @require_context
  809. @oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
  810. @pick_context_manager_writer
  811. def floating_ip_deallocate(context, address):
  812. return model_query(context, models.FloatingIp).\
  813. filter_by(address=address).\
  814. filter(and_(models.FloatingIp.project_id != null()),
  815. models.FloatingIp.fixed_ip_id == null()).\
  816. update({'project_id': None,
  817. 'host': None,
  818. 'auto_assigned': False},
  819. synchronize_session=False)
  820. @require_context
  821. @pick_context_manager_writer
  822. def floating_ip_destroy(context, address):
  823. model_query(context, models.FloatingIp).\
  824. filter_by(address=address).\
  825. delete()
@require_context
@pick_context_manager_writer
def floating_ip_disassociate(context, address):
    """Detach the floating IP at ``address`` from its fixed IP.

    :returns: the FixedIp row the floating IP used to point at (with its
        network joined), or None if it was not associated
    :raises: exception.FloatingIpNotFoundForAddress if no floating IP
        exists at ``address``
    """
    floating_ip_ref = model_query(context,
                                  models.FloatingIp).\
        filter_by(address=address).\
        first()
    if not floating_ip_ref:
        raise exception.FloatingIpNotFoundForAddress(address=address)
    # Look up the fixed IP BEFORE clearing the association so it can be
    # returned to the caller.
    fixed_ip_ref = model_query(context, models.FixedIp).\
        filter_by(id=floating_ip_ref['fixed_ip_id']).\
        options(joinedload('network')).\
        first()
    floating_ip_ref.fixed_ip_id = None
    floating_ip_ref.host = None
    return fixed_ip_ref
  842. def _floating_ip_get_all(context):
  843. return model_query(context, models.FloatingIp, read_deleted="no")
  844. @pick_context_manager_reader
  845. def floating_ip_get_all(context):
  846. floating_ip_refs = _floating_ip_get_all(context).\
  847. options(joinedload('fixed_ip')).\
  848. all()
  849. if not floating_ip_refs:
  850. raise exception.NoFloatingIpsDefined()
  851. return floating_ip_refs
  852. @pick_context_manager_reader
  853. def floating_ip_get_all_by_host(context, host):
  854. floating_ip_refs = _floating_ip_get_all(context).\
  855. filter_by(host=host).\
  856. options(joinedload('fixed_ip')).\
  857. all()
  858. if not floating_ip_refs:
  859. raise exception.FloatingIpNotFoundForHost(host=host)
  860. return floating_ip_refs
  861. @require_context
  862. @pick_context_manager_reader
  863. def floating_ip_get_all_by_project(context, project_id):
  864. nova.context.authorize_project_context(context, project_id)
  865. # TODO(tr3buchet): why do we not want auto_assigned floating IPs here?
  866. return _floating_ip_get_all(context).\
  867. filter_by(project_id=project_id).\
  868. filter_by(auto_assigned=False).\
  869. options(_joinedload_all('fixed_ip.instance')).\
  870. all()
  871. @require_context
  872. @pick_context_manager_reader
  873. def floating_ip_get_by_address(context, address):
  874. return _floating_ip_get_by_address(context, address)
  875. def _floating_ip_get_by_address(context, address):
  876. # if address string is empty explicitly set it to None
  877. if not address:
  878. address = None
  879. try:
  880. result = model_query(context, models.FloatingIp).\
  881. filter_by(address=address).\
  882. options(_joinedload_all('fixed_ip.instance')).\
  883. first()
  884. if not result:
  885. raise exception.FloatingIpNotFoundForAddress(address=address)
  886. except db_exc.DBError:
  887. msg = _("Invalid floating IP %s in request") % address
  888. LOG.warning(msg)
  889. raise exception.InvalidIpAddressError(msg)
  890. # If the floating IP has a project ID set, check to make sure
  891. # the non-admin user has access.
  892. if result.project_id and nova.context.is_user_context(context):
  893. nova.context.authorize_project_context(context, result.project_id)
  894. return result
  895. @require_context
  896. @pick_context_manager_reader
  897. def floating_ip_get_by_fixed_address(context, fixed_address):
  898. return model_query(context, models.FloatingIp).\
  899. outerjoin(models.FixedIp,
  900. models.FixedIp.id ==
  901. models.FloatingIp.fixed_ip_id).\
  902. filter(models.FixedIp.address == fixed_address).\
  903. all()
  904. @require_context
  905. @pick_context_manager_reader
  906. def floating_ip_get_by_fixed_ip_id(context, fixed_ip_id):
  907. return model_query(context, models.FloatingIp).\
  908. filter_by(fixed_ip_id=fixed_ip_id).\
  909. all()
  910. @require_context
  911. @pick_context_manager_writer
  912. def floating_ip_update(context, address, values):
  913. float_ip_ref = _floating_ip_get_by_address(context, address)
  914. float_ip_ref.update(values)
  915. try:
  916. float_ip_ref.save(context.session)
  917. except db_exc.DBDuplicateEntry:
  918. raise exception.FloatingIpExists(address=values['address'])
  919. return float_ip_ref
  920. ###################
  921. @require_context
  922. @pick_context_manager_reader
  923. def dnsdomain_get(context, fqdomain):
  924. return model_query(context, models.DNSDomain, read_deleted="no").\
  925. filter_by(domain=fqdomain).\
  926. with_for_update().\
  927. first()
  928. def _dnsdomain_get_or_create(context, fqdomain):
  929. domain_ref = dnsdomain_get(context, fqdomain)
  930. if not domain_ref:
  931. dns_ref = models.DNSDomain()
  932. dns_ref.update({'domain': fqdomain,
  933. 'availability_zone': None,
  934. 'project_id': None})
  935. return dns_ref
  936. return domain_ref
  937. @pick_context_manager_writer
  938. def dnsdomain_register_for_zone(context, fqdomain, zone):
  939. domain_ref = _dnsdomain_get_or_create(context, fqdomain)
  940. domain_ref.scope = 'private'
  941. domain_ref.availability_zone = zone
  942. context.session.add(domain_ref)
  943. @pick_context_manager_writer
  944. def dnsdomain_register_for_project(context, fqdomain, project):
  945. domain_ref = _dnsdomain_get_or_create(context, fqdomain)
  946. domain_ref.scope = 'public'
  947. domain_ref.project_id = project
  948. context.session.add(domain_ref)
  949. @pick_context_manager_writer
  950. def dnsdomain_unregister(context, fqdomain):
  951. model_query(context, models.DNSDomain).\
  952. filter_by(domain=fqdomain).\
  953. delete()
  954. @pick_context_manager_reader
  955. def dnsdomain_get_all(context):
  956. return model_query(context, models.DNSDomain, read_deleted="no").all()
  957. ###################
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
@pick_context_manager_writer
def fixed_ip_associate(context, address, instance_uuid, network_id=None,
                       reserved=False, virtual_interface_id=None):
    """Keyword arguments:
    reserved -- should be a boolean value(True or False), exact value will be
    used to filter on the fixed IP address

    Associates the fixed IP at ``address`` with ``instance_uuid`` using an
    optimistic read-then-guarded-update: if a concurrent transaction claims
    the row between the SELECT and the UPDATE, a RetryRequest is raised so
    ``wrap_db_retry`` re-runs the function.
    """
    if not uuidutils.is_uuid_like(instance_uuid):
        raise exception.InvalidUUID(uuid=instance_uuid)
    # Match IPs in the requested network OR ones not yet bound to any
    # network (they get adopted into ``network_id`` below).
    network_or_none = or_(models.FixedIp.network_id == network_id,
                          models.FixedIp.network_id == null())
    fixed_ip_ref = model_query(context, models.FixedIp, read_deleted="no").\
        filter(network_or_none).\
        filter_by(reserved=reserved).\
        filter_by(address=address).\
        first()
    if fixed_ip_ref is None:
        raise exception.FixedIpNotFoundForNetwork(address=address,
                                                  network_uuid=network_id)
    if fixed_ip_ref.instance_uuid:
        raise exception.FixedIpAlreadyInUse(address=address,
                                            instance_uuid=instance_uuid)
    params = {'instance_uuid': instance_uuid,
              'allocated': virtual_interface_id is not None}
    if not fixed_ip_ref.network_id:
        params['network_id'] = network_id
    if virtual_interface_id:
        params['virtual_interface_id'] = virtual_interface_id
    # Compare-and-swap: repeat the original predicates so the UPDATE only
    # succeeds if the row is still in the state we observed above.
    rows_updated = model_query(context, models.FixedIp, read_deleted="no").\
        filter_by(id=fixed_ip_ref.id).\
        filter(network_or_none).\
        filter_by(reserved=reserved).\
        filter_by(address=address).\
        update(params, synchronize_session='evaluate')
    if not rows_updated:
        LOG.debug('The row was updated in a concurrent transaction, '
                  'we will fetch another row')
        raise db_exc.RetryRequest(
            exception.FixedIpAssociateFailed(net=network_id))
    return fixed_ip_ref
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
@pick_context_manager_writer
def fixed_ip_associate_pool(context, network_id, instance_uuid=None,
                            host=None, virtual_interface_id=None):
    """allocate a fixed ip out of a fixed ip network pool.

    This allocates an unallocated fixed ip out of a specified
    network. We sort by updated_at to hand out the oldest address in
    the list.

    Like fixed_ip_associate(), the claim is made with a guarded UPDATE
    that re-checks the "still free" predicates; on a lost race a
    RetryRequest is raised so ``wrap_db_retry`` picks another candidate.
    """
    if instance_uuid and not uuidutils.is_uuid_like(instance_uuid):
        raise exception.InvalidUUID(uuid=instance_uuid)
    # Candidate rows: in this network (or in no network yet), free in
    # every sense — not reserved, not leased, no instance, no host.
    network_or_none = or_(models.FixedIp.network_id == network_id,
                          models.FixedIp.network_id == null())
    fixed_ip_ref = model_query(context, models.FixedIp, read_deleted="no").\
        filter(network_or_none).\
        filter_by(reserved=False).\
        filter_by(instance_uuid=None).\
        filter_by(host=None).\
        filter_by(leased=False).\
        order_by(asc(models.FixedIp.updated_at)).\
        first()
    if not fixed_ip_ref:
        raise exception.NoMoreFixedIps(net=network_id)
    params = {'allocated': virtual_interface_id is not None}
    if fixed_ip_ref['network_id'] is None:
        params['network_id'] = network_id
    if instance_uuid:
        params['instance_uuid'] = instance_uuid
    if host:
        params['host'] = host
    if virtual_interface_id:
        params['virtual_interface_id'] = virtual_interface_id
    # Compare-and-swap claim of the candidate row.
    rows_updated = model_query(context, models.FixedIp, read_deleted="no").\
        filter_by(id=fixed_ip_ref['id']).\
        filter_by(network_id=fixed_ip_ref['network_id']).\
        filter_by(reserved=False).\
        filter_by(instance_uuid=None).\
        filter_by(host=None).\
        filter_by(leased=False).\
        filter_by(address=fixed_ip_ref['address']).\
        update(params, synchronize_session='evaluate')
    if not rows_updated:
        LOG.debug('The row was updated in a concurrent transaction, '
                  'we will fetch another row')
        raise db_exc.RetryRequest(
            exception.FixedIpAssociateFailed(net=network_id))
    return fixed_ip_ref
  1046. @require_context
  1047. @pick_context_manager_writer
  1048. def fixed_ip_create(context, values):
  1049. fixed_ip_ref = models.FixedIp()
  1050. fixed_ip_ref.update(values)
  1051. try:
  1052. fixed_ip_ref.save(context.session)
  1053. except db_exc.DBDuplicateEntry:
  1054. raise exception.FixedIpExists(address=values['address'])
  1055. return fixed_ip_ref
  1056. @require_context
  1057. @pick_context_manager_writer
  1058. def fixed_ip_bulk_create(context, ips):
  1059. try:
  1060. tab = models.FixedIp.__table__
  1061. context.session.execute(tab.insert(), ips)
  1062. except db_exc.DBDuplicateEntry as e:
  1063. raise exception.FixedIpExists(address=e.value)
  1064. @require_context
  1065. @pick_context_manager_writer
  1066. def fixed_ip_disassociate(context, address):
  1067. _fixed_ip_get_by_address(context, address).update(
  1068. {'instance_uuid': None,
  1069. 'virtual_interface_id': None})
@pick_context_manager_writer
def fixed_ip_disassociate_all_by_timeout(context, host, time):
    """Disassociate allocated-but-stale fixed IPs belonging to ``host``.

    :param time: cutoff — only rows with updated_at older than this are
        touched
    :returns: the number of fixed IP rows cleared
    """
    # NOTE(vish): only update fixed ips that "belong" to this
    #             host; i.e. the network host or the instance
    #             host matches. Two queries necessary because
    #             join with update doesn't work.
    host_filter = or_(and_(models.Instance.host == host,
                           models.Network.multi_host == true()),
                      models.Network.host == host)
    # Query 1: collect the ids of matching rows via the joins.
    result = model_query(context, models.FixedIp, (models.FixedIp.id,),
                         read_deleted="no").\
        filter(models.FixedIp.allocated == false()).\
        filter(models.FixedIp.updated_at < time).\
        join((models.Network,
              models.Network.id == models.FixedIp.network_id)).\
        join((models.Instance,
              models.Instance.uuid == models.FixedIp.instance_uuid)).\
        filter(host_filter).\
        all()
    fixed_ip_ids = [fip[0] for fip in result]
    if not fixed_ip_ids:
        return 0
    # Query 2: bulk-update the collected rows by id.
    result = model_query(context, models.FixedIp).\
        filter(models.FixedIp.id.in_(fixed_ip_ids)).\
        update({'instance_uuid': None,
                'leased': False,
                'updated_at': timeutils.utcnow()},
               synchronize_session='fetch')
    return result
  1099. @require_context
  1100. @pick_context_manager_reader
  1101. def fixed_ip_get(context, id, get_network=False):
  1102. query = model_query(context, models.FixedIp).filter_by(id=id)
  1103. if get_network:
  1104. query = query.options(joinedload('network'))
  1105. result = query.first()
  1106. if not result:
  1107. raise exception.FixedIpNotFound(id=id)
  1108. # FIXME(sirp): shouldn't we just use project_only here to restrict the
  1109. # results?
  1110. if (nova.context.is_user_context(context) and
  1111. result['instance_uuid'] is not None):
  1112. instance = instance_get_by_uuid(context.elevated(read_deleted='yes'),
  1113. result['instance_uuid'])
  1114. nova.context.authorize_project_context(context, instance.project_id)
  1115. return result
  1116. @pick_context_manager_reader
  1117. def fixed_ip_get_all(context):
  1118. result = model_query(context, models.FixedIp, read_deleted="yes").all()
  1119. if not result:
  1120. raise exception.NoFixedIpsDefined()
  1121. return result
  1122. @require_context
  1123. @pick_context_manager_reader
  1124. def fixed_ip_get_by_address(context, address, columns_to_join=None):
  1125. return _fixed_ip_get_by_address(context, address,
  1126. columns_to_join=columns_to_join)
  1127. def _fixed_ip_get_by_address(context, address, columns_to_join=None):
  1128. if columns_to_join is None:
  1129. columns_to_join = []
  1130. try:
  1131. result = model_query(context, models.FixedIp)
  1132. for column in columns_to_join:
  1133. result = result.options(_joinedload_all(column))
  1134. result = result.filter_by(address=address).first()
  1135. if not result:
  1136. raise exception.FixedIpNotFoundForAddress(address=address)
  1137. except db_exc.DBError:
  1138. msg = _("Invalid fixed IP Address %s in request") % address
  1139. LOG.warning(msg)
  1140. raise exception.FixedIpInvalid(msg)
  1141. # NOTE(sirp): shouldn't we just use project_only here to restrict the
  1142. # results?
  1143. if (nova.context.is_user_context(context) and
  1144. result['instance_uuid'] is not None):
  1145. instance = _instance_get_by_uuid(
  1146. context.elevated(read_deleted='yes'),
  1147. result['instance_uuid'])
  1148. nova.context.authorize_project_context(context,
  1149. instance.project_id)
  1150. return result
  1151. @require_context
  1152. @pick_context_manager_reader
  1153. def fixed_ip_get_by_floating_address(context, floating_address):
  1154. return model_query(context, models.FixedIp).\
  1155. join(models.FloatingIp,
  1156. models.FloatingIp.fixed_ip_id ==
  1157. models.FixedIp.id).\
  1158. filter(models.FloatingIp.address == floating_address).\
  1159. first()
  1160. # NOTE(tr3buchet) please don't invent an exception here, None is fine
@require_context
@pick_context_manager_reader
def fixed_ip_get_by_instance(context, instance_uuid):
    """Return all non-deleted fixed IPs for an instance, in VIF order.

    Results are ordered by the virtual interface's created_at then id, so
    callers see the oldest interface's addresses first.

    :raises: exception.InvalidUUID if ``instance_uuid`` is not a uuid
    :raises: exception.FixedIpNotFoundForInstance if the instance has no
        fixed IPs
    """
    if not uuidutils.is_uuid_like(instance_uuid):
        raise exception.InvalidUUID(uuid=instance_uuid)
    # Outer-join condition: only live (deleted == 0) virtual interfaces,
    # fed into contains_eager so the joined rows populate the
    # 'virtual_interface' relationship instead of triggering lazy loads.
    vif_and = and_(models.VirtualInterface.id ==
                   models.FixedIp.virtual_interface_id,
                   models.VirtualInterface.deleted == 0)
    result = model_query(context, models.FixedIp, read_deleted="no").\
        filter_by(instance_uuid=instance_uuid).\
        outerjoin(models.VirtualInterface, vif_and).\
        options(contains_eager("virtual_interface")).\
        options(joinedload('network')).\
        options(joinedload('floating_ips')).\
        order_by(asc(models.VirtualInterface.created_at),
                 asc(models.VirtualInterface.id)).\
        all()
    if not result:
        raise exception.FixedIpNotFoundForInstance(instance_uuid=instance_uuid)
    return result
  1181. @pick_context_manager_reader
  1182. def fixed_ip_get_by_host(context, host):
  1183. instance_uuids = _instance_get_all_uuids_by_hosts(
  1184. context, [host]).get(host, [])
  1185. if not instance_uuids:
  1186. return []
  1187. return model_query(context, models.FixedIp).\
  1188. filter(models.FixedIp.instance_uuid.in_(instance_uuids)).\
  1189. all()
  1190. @require_context
  1191. @pick_context_manager_reader
  1192. def fixed_ip_get_by_network_host(context, network_id, host):
  1193. result = model_query(context, models.FixedIp, read_deleted="no").\
  1194. filter_by(network_id=network_id).\
  1195. filter_by(host=host).\
  1196. first()
  1197. if not result:
  1198. raise exception.FixedIpNotFoundForNetworkHost(network_id=network_id,
  1199. host=host)
  1200. return result
  1201. @require_context
  1202. @pick_context_manager_reader
  1203. def fixed_ips_by_virtual_interface(context, vif_id):
  1204. result = model_query(context, models.FixedIp, read_deleted="no").\
  1205. filter_by(virtual_interface_id=vif_id).\
  1206. options(joinedload('network')).\
  1207. options(joinedload('floating_ips')).\
  1208. all()
  1209. return result
  1210. @require_context
  1211. @pick_context_manager_writer
  1212. def fixed_ip_update(context, address, values):
  1213. _fixed_ip_get_by_address(context, address).update(values)
  1214. ###################
  1215. @require_context
  1216. @pick_context_manager_writer
  1217. def virtual_interface_create(context, values):
  1218. """Create a new virtual interface record in the database.
  1219. :param values: = dict containing column values
  1220. """
  1221. try:
  1222. vif_ref = models.VirtualInterface()
  1223. vif_ref.update(values)
  1224. vif_ref.save(context.session)
  1225. except db_exc.DBError:
  1226. LOG.exception("VIF creation failed with a database error.")
  1227. raise exception.VirtualInterfaceCreateException()
  1228. return vif_ref
  1229. def _virtual_interface_query(context):
  1230. return model_query(context, models.VirtualInterface, read_deleted="no")
  1231. @require_context
  1232. @pick_context_manager_writer
  1233. def virtual_interface_update(context, address, values):
  1234. vif_ref = virtual_interface_get_by_address(context, address)
  1235. vif_ref.update(values)
  1236. vif_ref.save(context.session)
  1237. return vif_ref
  1238. @require_context
  1239. @pick_context_manager_reader
  1240. def virtual_interface_get(context, vif_id):
  1241. """Gets a virtual interface from the table.
  1242. :param vif_id: = id of the virtual interface
  1243. """
  1244. vif_ref = _virtual_interface_query(context).\
  1245. filter_by(id=vif_id).\
  1246. first()
  1247. return vif_ref
  1248. @require_context
  1249. @pick_context_manager_reader
  1250. def virtual_interface_get_by_address(context, address):
  1251. """Gets a virtual interface from the table.
  1252. :param address: = the address of the interface you're looking to get
  1253. """
  1254. try:
  1255. vif_ref = _virtual_interface_query(context).\
  1256. filter_by(address=address).\
  1257. first()
  1258. except db_exc.DBError:
  1259. msg = _("Invalid virtual interface address %s in request") % address
  1260. LOG.warning(msg)
  1261. raise exception.InvalidIpAddressError(msg)
  1262. return vif_ref
  1263. @require_context
  1264. @pick_context_manager_reader
  1265. def virtual_interface_get_by_uuid(context, vif_uuid):
  1266. """Gets a virtual interface from the table.
  1267. :param vif_uuid: the uuid of the interface you're looking to get
  1268. """
  1269. vif_ref = _virtual_interface_query(context).\
  1270. filter_by(uuid=vif_uuid).\
  1271. first()
  1272. return vif_ref
  1273. @require_context
  1274. @pick_context_manager_reader_allow_async
  1275. def virtual_interface_get_by_instance(context, instance_uuid):
  1276. """Gets all virtual interfaces for instance.
  1277. :param instance_uuid: = uuid of the instance to retrieve vifs for
  1278. """
  1279. vif_refs = _virtual_interface_query(context).\
  1280. filter_by(instance_uuid=instance_uuid).\
  1281. order_by(asc("created_at"), asc("id")).\
  1282. all()
  1283. return vif_refs
  1284. @require_context
  1285. @pick_context_manager_reader
  1286. def virtual_interface_get_by_instance_and_network(context, instance_uuid,
  1287. network_id):
  1288. """Gets virtual interface for instance that's associated with network."""
  1289. vif_ref = _virtual_interface_query(context).\
  1290. filter_by(instance_uuid=instance_uuid).\
  1291. filter_by(network_id=network_id).\
  1292. first()
  1293. return vif_ref
  1294. @require_context
  1295. @pick_context_manager_writer
  1296. def virtual_interface_delete_by_instance(context, instance_uuid):
  1297. """Delete virtual interface records that are associated
  1298. with the instance given by instance_id.
  1299. :param instance_uuid: = uuid of instance
  1300. """
  1301. _virtual_interface_query(context).\
  1302. filter_by(instance_uuid=instance_uuid).\
  1303. soft_delete()
  1304. @require_context
  1305. @pick_context_manager_writer
  1306. def virtual_interface_delete(context, id):
  1307. """Delete virtual interface records.
  1308. :param id: id of the interface
  1309. """
  1310. _virtual_interface_query(context).\
  1311. filter_by(id=id).\
  1312. soft_delete()
  1313. @require_context
  1314. @pick_context_manager_reader
  1315. def virtual_interface_get_all(context):
  1316. """Get all vifs."""
  1317. vif_refs = _virtual_interface_query(context).all()
  1318. return vif_refs
  1319. ###################
  1320. def _metadata_refs(metadata_dict, meta_class):
  1321. metadata_refs = []
  1322. if metadata_dict:
  1323. for k, v in metadata_dict.items():
  1324. metadata_ref = meta_class()
  1325. metadata_ref['key'] = k
  1326. metadata_ref['value'] = v
  1327. metadata_refs.append(metadata_ref)
  1328. return metadata_refs
  1329. def _validate_unique_server_name(context, name):
  1330. if not CONF.osapi_compute_unique_server_name_scope:
  1331. return
  1332. lowername = name.lower()
  1333. base_query = model_query(context, models.Instance, read_deleted='no').\
  1334. filter(func.lower(models.Instance.hostname) == lowername)
  1335. if CONF.osapi_compute_unique_server_name_scope == 'project':
  1336. instance_with_same_name = base_query.\
  1337. filter_by(project_id=context.project_id).\
  1338. count()
  1339. elif CONF.osapi_compute_unique_server_name_scope == 'global':
  1340. instance_with_same_name = base_query.count()
  1341. else:
  1342. return
  1343. if instance_with_same_name > 0:
  1344. raise exception.InstanceExists(name=lowername)
  1345. def _handle_objects_related_type_conversions(values):
  1346. """Make sure that certain things in values (which may have come from
  1347. an objects.instance.Instance object) are in suitable form for the
  1348. database.
  1349. """
  1350. # NOTE(danms): Make sure IP addresses are passed as strings to
  1351. # the database engine
  1352. for key in ('access_ip_v4', 'access_ip_v6'):
  1353. if key in values and values[key] is not None:
  1354. values[key] = str(values[key])
  1355. datetime_keys = ('created_at', 'deleted_at', 'updated_at',
  1356. 'launched_at', 'terminated_at')
  1357. convert_objects_related_datetimes(values, *datetime_keys)
  1358. def _check_instance_exists_in_project(context, instance_uuid):
  1359. if not model_query(context, models.Instance, read_deleted="no",
  1360. project_only=True).filter_by(
  1361. uuid=instance_uuid).first():
  1362. raise exception.InstanceNotFound(instance_id=instance_uuid)
@require_context
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
@pick_context_manager_writer
def instance_create(context, values):
    """Create a new Instance record in the database.

    context - request context object
    values - dict containing column values.

    Builds the Instance row together with its info_cache and extra
    side-table rows, attaches security groups, and creates the EC2 id
    mapping.  Raises InstanceExists when the unique-hostname check is
    enabled and the name collides.
    """
    default_group = security_group_ensure_default(context)
    # Copy so the caller's dict is not mutated by the pops/updates below.
    values = values.copy()
    values['metadata'] = _metadata_refs(
        values.get('metadata'), models.InstanceMetadata)
    values['system_metadata'] = _metadata_refs(
        values.get('system_metadata'), models.InstanceSystemMetadata)
    _handle_objects_related_type_conversions(values)
    instance_ref = models.Instance()
    if not values.get('uuid'):
        values['uuid'] = uuidutils.generate_uuid()
    instance_ref['info_cache'] = models.InstanceInfoCache()
    info_cache = values.pop('info_cache', None)
    if info_cache is not None:
        instance_ref['info_cache'].update(info_cache)
    security_groups = values.pop('security_groups', [])
    # Pre-seed the extra row's known columns with None, then overlay any
    # caller-provided extra values.
    instance_ref['extra'] = models.InstanceExtra()
    instance_ref['extra'].update(
        {'numa_topology': None,
         'pci_requests': None,
         'vcpu_model': None,
         'trusted_certs': None,
         'resources': None,
         })
    instance_ref['extra'].update(values.pop('extra', {}))
    instance_ref.update(values)
    # Gather the security groups for the instance
    sg_models = []
    if 'default' in security_groups:
        sg_models.append(default_group)
        # Generate a new list, so we don't modify the original
        security_groups = [x for x in security_groups if x != 'default']
    if security_groups:
        sg_models.extend(_security_group_get_by_names(
            context, security_groups))
    if 'hostname' in values:
        _validate_unique_server_name(context, values['hostname'])
    instance_ref.security_groups = sg_models
    context.session.add(instance_ref)
    # create the instance uuid to ec2_id mapping entry for instance
    ec2_instance_create(context, instance_ref['uuid'])
    # Parity with the return value of instance_get_all_by_filters_sort()
    # Obviously a newly-created instance record can't already have a fault
    # record because of the FK constraint, so this is fine.
    instance_ref.fault = None
    return instance_ref
@require_context
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
@pick_context_manager_writer
def instance_destroy(context, instance_uuid, constraint=None,
                     hard_delete=False):
    """Delete an instance and every row that references it.

    :param constraint: optional constraint object; applied to the delete
        query, and ConstraintNotMet is raised when it filters the row out
    :param hard_delete: when True, physically delete the instance and its
        related rows instead of soft-deleting them
    :returns: the Instance row as loaded before deletion
    :raises: exception.InvalidUUID if ``instance_uuid`` is not uuid-like
    :raises: exception.ConstraintNotMet if the constrained soft-delete
        matched no row
    """
    if uuidutils.is_uuid_like(instance_uuid):
        instance_ref = _instance_get_by_uuid(context, instance_uuid)
    else:
        raise exception.InvalidUUID(uuid=instance_uuid)
    query = model_query(context, models.Instance).\
        filter_by(uuid=instance_uuid)
    if constraint is not None:
        query = constraint.apply(models.Instance, query)
    # Either in hard or soft delete, we soft delete the instance first
    # to make sure that that the constraints were met.
    count = query.soft_delete()
    if count == 0:
        raise exception.ConstraintNotMet()
    models_to_delete = [
        models.SecurityGroupInstanceAssociation, models.InstanceInfoCache,
        models.InstanceMetadata, models.InstanceFault, models.InstanceExtra,
        models.InstanceSystemMetadata, models.BlockDeviceMapping,
        models.Migration, models.VirtualInterface
    ]
    # For most referenced models we filter by the instance_uuid column, but for
    # these models we filter by the uuid column.
    filtered_by_uuid = [models.InstanceIdMapping]
    for model in models_to_delete + filtered_by_uuid:
        key = 'instance_uuid' if model not in filtered_by_uuid else 'uuid'
        filter_ = {key: instance_uuid}
        if hard_delete:
            # We need to read any soft-deleted related records to make sure
            # and clean those up as well otherwise we can fail with ForeignKey
            # constraint errors when hard deleting the instance.
            model_query(context, model, read_deleted='yes').filter_by(
                **filter_).delete()
        else:
            model_query(context, model).filter_by(**filter_).soft_delete()
    # NOTE(snikitin): We can't use model_query here, because there is no
    # column 'deleted' in 'tags' or 'console_auth_tokens' tables.
    context.session.query(models.Tag).filter_by(
        resource_id=instance_uuid).delete()
    context.session.query(models.ConsoleAuthToken).filter_by(
        instance_uuid=instance_uuid).delete()
    # NOTE(cfriesen): We intentionally do not soft-delete entries in the
    # instance_actions or instance_actions_events tables because they
    # can be used by operators to find out what actions were performed on a
    # deleted instance. Both of these tables are special-cased in
    # _archive_deleted_rows_for_table().
    if hard_delete:
        # NOTE(ttsiousts): In case of hard delete, we need to remove the
        # instance actions too since instance_uuid is a foreign key and
        # for this we need to delete the corresponding InstanceActionEvents
        actions = context.session.query(models.InstanceAction).filter_by(
            instance_uuid=instance_uuid).all()
        for action in actions:
            context.session.query(models.InstanceActionEvent).filter_by(
                action_id=action.id).delete()
        context.session.query(models.InstanceAction).filter_by(
            instance_uuid=instance_uuid).delete()
        # NOTE(ttsiouts): The instance is the last thing to be deleted in
        # order to respect all constraints
        context.session.query(models.Instance).filter_by(
            uuid=instance_uuid).delete()
    return instance_ref
  1481. @require_context
  1482. @pick_context_manager_reader_allow_async
  1483. def instance_get_by_uuid(context, uuid, columns_to_join=None):
  1484. return _instance_get_by_uuid(context, uuid,
  1485. columns_to_join=columns_to_join)
  1486. def _instance_get_by_uuid(context, uuid, columns_to_join=None):
  1487. result = _build_instance_get(context, columns_to_join=columns_to_join).\
  1488. filter_by(uuid=uuid).\
  1489. first()
  1490. if not result:
  1491. raise exception.InstanceNotFound(instance_id=uuid)
  1492. return result
  1493. @require_context
  1494. @pick_context_manager_reader
  1495. def instance_get(context, instance_id, columns_to_join=None):
  1496. try:
  1497. result = _build_instance_get(context, columns_to_join=columns_to_join
  1498. ).filter_by(id=instance_id).first()
  1499. if not result:
  1500. raise exception.InstanceNotFound(instance_id=instance_id)
  1501. return result
  1502. except db_exc.DBError:
  1503. # NOTE(sdague): catch all in case the db engine chokes on the
  1504. # id because it's too long of an int to store.
  1505. LOG.warning("Invalid instance id %s in request", instance_id)
  1506. raise exception.InvalidID(id=instance_id)
  1507. def _build_instance_get(context, columns_to_join=None):
  1508. query = model_query(context, models.Instance, project_only=True).\
  1509. options(_joinedload_all('security_groups.rules')).\
  1510. options(joinedload('info_cache'))
  1511. if columns_to_join is None:
  1512. columns_to_join = ['metadata', 'system_metadata']
  1513. for column in columns_to_join:
  1514. if column in ['info_cache', 'security_groups']:
  1515. # Already always joined above
  1516. continue
  1517. if 'extra.' in column:
  1518. query = query.options(undefer(column))
  1519. else:
  1520. query = query.options(joinedload(column))
  1521. # NOTE(alaski) Stop lazy loading of columns not needed.
  1522. for col in ['metadata', 'system_metadata']:
  1523. if col not in columns_to_join:
  1524. query = query.options(noload(col))
  1525. return query
  1526. def _instances_fill_metadata(context, instances, manual_joins=None):
  1527. """Selectively fill instances with manually-joined metadata. Note that
  1528. instance will be converted to a dict.
  1529. :param context: security context
  1530. :param instances: list of instances to fill
  1531. :param manual_joins: list of tables to manually join (can be any
  1532. combination of 'metadata' and 'system_metadata' or
  1533. None to take the default of both)
  1534. """
  1535. uuids = [inst['uuid'] for inst in instances]
  1536. if manual_joins is None:
  1537. manual_joins = ['metadata', 'system_metadata']
  1538. meta = collections.defaultdict(list)
  1539. if 'metadata' in manual_joins:
  1540. for row in _instance_metadata_get_multi(context, uuids):
  1541. meta[row['instance_uuid']].append(row)
  1542. sys_meta = collections.defaultdict(list)
  1543. if 'system_metadata' in manual_joins:
  1544. for row in _instance_system_metadata_get_multi(context, uuids):
  1545. sys_meta[row['instance_uuid']].append(row)
  1546. pcidevs = collections.defaultdict(list)
  1547. if 'pci_devices' in manual_joins:
  1548. for row in _instance_pcidevs_get_multi(context, uuids):
  1549. pcidevs[row['instance_uuid']].append(row)
  1550. if 'fault' in manual_joins:
  1551. faults = instance_fault_get_by_instance_uuids(context, uuids,
  1552. latest=True)
  1553. else:
  1554. faults = {}
  1555. filled_instances = []
  1556. for inst in instances:
  1557. inst = dict(inst)
  1558. inst['system_metadata'] = sys_meta[inst['uuid']]
  1559. inst['metadata'] = meta[inst['uuid']]
  1560. if 'pci_devices' in manual_joins:
  1561. inst['pci_devices'] = pcidevs[inst['uuid']]
  1562. inst_faults = faults.get(inst['uuid'])
  1563. inst['fault'] = inst_faults and inst_faults[0] or None
  1564. filled_instances.append(inst)
  1565. return filled_instances
  1566. def _manual_join_columns(columns_to_join):
  1567. """Separate manually joined columns from columns_to_join
  1568. If columns_to_join contains 'metadata', 'system_metadata', 'fault', or
  1569. 'pci_devices' those columns are removed from columns_to_join and added
  1570. to a manual_joins list to be used with the _instances_fill_metadata method.
  1571. The columns_to_join formal parameter is copied and not modified, the return
  1572. tuple has the modified columns_to_join list to be used with joinedload in
  1573. a model query.
  1574. :param:columns_to_join: List of columns to join in a model query.
  1575. :return: tuple of (manual_joins, columns_to_join)
  1576. """
  1577. manual_joins = []
  1578. columns_to_join_new = copy.copy(columns_to_join)
  1579. for column in ('metadata', 'system_metadata', 'pci_devices', 'fault'):
  1580. if column in columns_to_join_new:
  1581. columns_to_join_new.remove(column)
  1582. manual_joins.append(column)
  1583. return manual_joins, columns_to_join_new
  1584. @require_context
  1585. @pick_context_manager_reader
  1586. def instance_get_all(context, columns_to_join=None):
  1587. if columns_to_join is None:
  1588. columns_to_join_new = ['info_cache', 'security_groups']
  1589. manual_joins = ['metadata', 'system_metadata']
  1590. else:
  1591. manual_joins, columns_to_join_new = (
  1592. _manual_join_columns(columns_to_join))
  1593. query = model_query(context, models.Instance)
  1594. for column in columns_to_join_new:
  1595. query = query.options(joinedload(column))
  1596. if not context.is_admin:
  1597. # If we're not admin context, add appropriate filter..
  1598. if context.project_id:
  1599. query = query.filter_by(project_id=context.project_id)
  1600. else:
  1601. query = query.filter_by(user_id=context.user_id)
  1602. instances = query.all()
  1603. return _instances_fill_metadata(context, instances, manual_joins)
  1604. @require_context
  1605. @pick_context_manager_reader_allow_async
  1606. def instance_get_all_by_filters(context, filters, sort_key, sort_dir,
  1607. limit=None, marker=None, columns_to_join=None):
  1608. """Return instances matching all filters sorted by the primary key.
  1609. See instance_get_all_by_filters_sort for more information.
  1610. """
  1611. # Invoke the API with the multiple sort keys and directions using the
  1612. # single sort key/direction
  1613. return instance_get_all_by_filters_sort(context, filters, limit=limit,
  1614. marker=marker,
  1615. columns_to_join=columns_to_join,
  1616. sort_keys=[sort_key],
  1617. sort_dirs=[sort_dir])
  1618. def _get_query_nova_resource_by_changes_time(query, filters, model_object):
  1619. """Filter resources by changes-since or changes-before.
  1620. Special keys are used to tweek the query further::
  1621. | 'changes-since' - only return resources updated after
  1622. | 'changes-before' - only return resources updated before
  1623. Return query results.
  1624. :param query: query to apply filters to.
  1625. :param filters: dictionary of filters with regex values.
  1626. :param model_object: object of the operation target.
  1627. """
  1628. for change_filter in ['changes-since', 'changes-before']:
  1629. if filters and filters.get(change_filter):
  1630. changes_filter_time = timeutils.normalize_time(
  1631. filters.get(change_filter))
  1632. updated_at = getattr(model_object, 'updated_at')
  1633. if change_filter == 'changes-since':
  1634. query = query.filter(updated_at >= changes_filter_time)
  1635. else:
  1636. query = query.filter(updated_at <= changes_filter_time)
  1637. return query
@require_context
@pick_context_manager_reader_allow_async
def instance_get_all_by_filters_sort(context, filters, limit=None, marker=None,
                                     columns_to_join=None, sort_keys=None,
                                     sort_dirs=None):
    """Return instances that match all filters sorted by the given keys.

    Deleted instances will be returned by default, unless there's a filter
    that says otherwise.

    Depending on the name of a filter, matching for that filter is
    performed using either exact matching or as regular expression
    matching. Exact matching is applied for the following filters::

    |   ['project_id', 'user_id', 'image_ref',
    |    'vm_state', 'instance_type_id', 'uuid',
    |    'metadata', 'host', 'system_metadata', 'locked', 'hidden']

    Hidden instances will *not* be returned by default, unless there's a
    filter that says otherwise.

    A third type of filter (also using exact matching), filters
    based on instance metadata tags when supplied under a special
    key named 'filter'::

    |   filters = {
    |       'filter': [
    |           {'name': 'tag-key', 'value': '<metakey>'},
    |           {'name': 'tag-value', 'value': '<metaval>'},
    |           {'name': 'tag:<metakey>', 'value': '<metaval>'}
    |       ]
    |   }

    Special keys are used to tweek the query further::

    |   'changes-since' - only return instances updated after
    |   'changes-before' - only return instances updated before
    |   'deleted' - only return (or exclude) deleted instances
    |   'soft_deleted' - modify behavior of 'deleted' to either
    |                    include or exclude instances whose
    |                    vm_state is SOFT_DELETED.

    A fourth type of filter (also using exact matching), filters
    based on instance tags (not metadata tags). There are two types
    of these tags:

    `tags` -- One or more strings that will be used to filter results
    in an AND expression: T1 AND T2

    `tags-any` -- One or more strings that will be used to filter results in
    an OR expression: T1 OR T2

    `not-tags` -- One or more strings that will be used to filter results in
    an NOT AND expression: NOT (T1 AND T2)

    `not-tags-any` -- One or more strings that will be used to filter results
    in an NOT OR expression: NOT (T1 OR T2)

    Tags should be represented as list::

    |   filters = {
    |       'tags': [some-tag, some-another-tag],
    |       'tags-any: [some-any-tag, some-another-any-tag],
    |       'not-tags: [some-not-tag, some-another-not-tag],
    |       'not-tags-any: [some-not-any-tag, some-another-not-any-tag]
    |   }
    """
    # NOTE(mriedem): If the limit is 0 there is no point in even going
    # to the database since nothing is going to be returned anyway.
    if limit == 0:
        return []
    sort_keys, sort_dirs = process_sort_params(sort_keys,
                                               sort_dirs,
                                               default_dir='desc')
    if columns_to_join is None:
        columns_to_join_new = ['info_cache', 'security_groups']
        manual_joins = ['metadata', 'system_metadata']
    else:
        manual_joins, columns_to_join_new = (
            _manual_join_columns(columns_to_join))
    query_prefix = context.session.query(models.Instance)
    for column in columns_to_join_new:
        # 'extra.*' columns are deferred by default and only need to be
        # undeferred, not joined.
        if 'extra.' in column:
            query_prefix = query_prefix.options(undefer(column))
        else:
            query_prefix = query_prefix.options(joinedload(column))
    # Note: order_by is done in the sqlalchemy.utils.py paginate_query(),
    # no need to do it here as well
    # Make a copy of the filters dictionary to use going forward, as we'll
    # be modifying it and we shouldn't affect the caller's use of it.
    filters = copy.deepcopy(filters)
    model_object = models.Instance
    query_prefix = _get_query_nova_resource_by_changes_time(query_prefix,
                                                            filters,
                                                            model_object)
    if 'deleted' in filters:
        # Instances can be soft or hard deleted and the query needs to
        # include or exclude both
        deleted = filters.pop('deleted')
        if deleted:
            if filters.pop('soft_deleted', True):
                # Match rows that are soft/hard deleted in the DB *or*
                # still-present rows whose vm_state is SOFT_DELETED.
                delete = or_(
                    models.Instance.deleted == models.Instance.id,
                    models.Instance.vm_state == vm_states.SOFT_DELETED
                    )
                query_prefix = query_prefix.\
                    filter(delete)
            else:
                query_prefix = query_prefix.\
                    filter(models.Instance.deleted == models.Instance.id)
        else:
            query_prefix = query_prefix.\
                filter_by(deleted=0)
            if not filters.pop('soft_deleted', False):
                # It would be better to have vm_state not be nullable
                # but until then we test it explicitly as a workaround.
                not_soft_deleted = or_(
                    models.Instance.vm_state != vm_states.SOFT_DELETED,
                    models.Instance.vm_state == null()
                    )
                query_prefix = query_prefix.filter(not_soft_deleted)
    if 'cleaned' in filters:
        cleaned = 1 if filters.pop('cleaned') else 0
        query_prefix = query_prefix.filter(models.Instance.cleaned == cleaned)
    if 'tags' in filters:
        tags = filters.pop('tags')
        # We build a JOIN ladder expression for each tag, JOIN'ing
        # the first tag to the instances table, and each subsequent
        # tag to the last JOIN'd tags table
        first_tag = tags.pop(0)
        query_prefix = query_prefix.join(models.Instance.tags)
        query_prefix = query_prefix.filter(models.Tag.tag == first_tag)
        for tag in tags:
            tag_alias = aliased(models.Tag)
            query_prefix = query_prefix.join(tag_alias,
                                             models.Instance.tags)
            query_prefix = query_prefix.filter(tag_alias.tag == tag)
    if 'tags-any' in filters:
        tags = filters.pop('tags-any')
        tag_alias = aliased(models.Tag)
        query_prefix = query_prefix.join(tag_alias, models.Instance.tags)
        query_prefix = query_prefix.filter(tag_alias.tag.in_(tags))
    if 'not-tags' in filters:
        # Exclude instances carrying *all* of these tags, via NOT IN
        # against a subquery of matching tag resource ids.
        tags = filters.pop('not-tags')
        first_tag = tags.pop(0)
        subq = query_prefix.session.query(models.Tag.resource_id)
        subq = subq.join(models.Instance.tags)
        subq = subq.filter(models.Tag.tag == first_tag)
        for tag in tags:
            tag_alias = aliased(models.Tag)
            subq = subq.join(tag_alias, models.Instance.tags)
            subq = subq.filter(tag_alias.tag == tag)
        query_prefix = query_prefix.filter(~models.Instance.uuid.in_(subq))
    if 'not-tags-any' in filters:
        # Exclude instances carrying *any* of these tags.
        tags = filters.pop('not-tags-any')
        query_prefix = query_prefix.filter(~models.Instance.tags.any(
            models.Tag.tag.in_(tags)))
    if not context.is_admin:
        # If we're not admin context, add appropriate filter..
        if context.project_id:
            filters['project_id'] = context.project_id
        else:
            filters['user_id'] = context.user_id
    if 'hidden' not in filters:
        # Filter out hidden instances by default.
        filters['hidden'] = False
    # Filters for exact matches that we can do along with the SQL query...
    # For other filters that don't match this, we will do regexp matching
    exact_match_filter_names = ['project_id', 'user_id', 'image_ref',
                                'vm_state', 'instance_type_id', 'uuid',
                                'metadata', 'host', 'task_state',
                                'system_metadata', 'locked', 'hidden']
    # Filter the query
    query_prefix = _exact_instance_filter(query_prefix,
                                          filters, exact_match_filter_names)
    if query_prefix is None:
        # An empty list was supplied for an exact (IN) filter; nothing can
        # match, so skip the query entirely.
        return []
    query_prefix = _regex_instance_filter(query_prefix, filters)
    # paginate query
    if marker is not None:
        try:
            # The marker uuid must resolve to a full instance row (even a
            # deleted one) for paginate_query to position the page.
            marker = _instance_get_by_uuid(
                context.elevated(read_deleted='yes'), marker)
        except exception.InstanceNotFound:
            raise exception.MarkerNotFound(marker=marker)
    try:
        query_prefix = sqlalchemyutils.paginate_query(query_prefix,
                                                      models.Instance, limit,
                                                      sort_keys,
                                                      marker=marker,
                                                      sort_dirs=sort_dirs)
    except db_exc.InvalidSortKey:
        raise exception.InvalidSortKey()
    return _instances_fill_metadata(context, query_prefix.all(), manual_joins)
  1817. @require_context
  1818. @pick_context_manager_reader_allow_async
  1819. def instance_get_by_sort_filters(context, sort_keys, sort_dirs, values):
  1820. """Attempt to get a single instance based on a combination of sort
  1821. keys, directions and filter values. This is used to try to find a
  1822. marker instance when we don't have a marker uuid.
  1823. This returns just a uuid of the instance that matched.
  1824. """
  1825. model = models.Instance
  1826. return _model_get_uuid_by_sort_filters(context, model, sort_keys,
  1827. sort_dirs, values)
  1828. def _model_get_uuid_by_sort_filters(context, model, sort_keys, sort_dirs,
  1829. values):
  1830. query = context.session.query(model.uuid)
  1831. # NOTE(danms): Below is a re-implementation of our
  1832. # oslo_db.sqlalchemy.utils.paginate_query() utility. We can't use that
  1833. # directly because it does not return the marker and we need it to.
  1834. # The below is basically the same algorithm, stripped down to just what
  1835. # we need, and augmented with the filter criteria required for us to
  1836. # get back the instance that would correspond to our query.
  1837. # This is our position in sort_keys,sort_dirs,values for the loop below
  1838. key_index = 0
  1839. # We build a list of criteria to apply to the query, which looks
  1840. # approximately like this (assuming all ascending):
  1841. #
  1842. # OR(row.key1 > val1,
  1843. # AND(row.key1 == val1, row.key2 > val2),
  1844. # AND(row.key1 == val1, row.key2 == val2, row.key3 >= val3),
  1845. # )
  1846. #
  1847. # The final key is compared with the "or equal" variant so that
  1848. # a complete match instance is still returned.
  1849. criteria = []
  1850. for skey, sdir, val in zip(sort_keys, sort_dirs, values):
  1851. # Apply ordering to our query for the key, direction we're processing
  1852. if sdir == 'desc':
  1853. query = query.order_by(desc(getattr(model, skey)))
  1854. else:
  1855. query = query.order_by(asc(getattr(model, skey)))
  1856. # Build a list of equivalence requirements on keys we've already
  1857. # processed through the loop. In other words, if we're adding
  1858. # key2 > val2, make sure that key1 == val1
  1859. crit_attrs = []
  1860. for equal_attr in range(0, key_index):
  1861. crit_attrs.append(
  1862. (getattr(model, sort_keys[equal_attr]) == values[equal_attr]))
  1863. model_attr = getattr(model, skey)
  1864. if isinstance(model_attr.type, Boolean):
  1865. model_attr = cast(model_attr, Integer)
  1866. val = int(val)
  1867. if skey == sort_keys[-1]:
  1868. # If we are the last key, then we should use or-equal to
  1869. # allow a complete match to be returned
  1870. if sdir == 'asc':
  1871. crit = (model_attr >= val)
  1872. else:
  1873. crit = (model_attr <= val)
  1874. else:
  1875. # If we're not the last key, then strict greater or less than
  1876. # so we order strictly.
  1877. if sdir == 'asc':
  1878. crit = (model_attr > val)
  1879. else:
  1880. crit = (model_attr < val)
  1881. # AND together all the above
  1882. crit_attrs.append(crit)
  1883. criteria.append(and_(*crit_attrs))
  1884. key_index += 1
  1885. # OR together all the ANDs
  1886. query = query.filter(or_(*criteria))
  1887. # We can't raise InstanceNotFound because we don't have a uuid to
  1888. # be looking for, so just return nothing if no match.
  1889. result = query.limit(1).first()
  1890. if result:
  1891. # We're querying for a single column, which means we get back a
  1892. # tuple of one thing. Strip that out and just return the uuid
  1893. # for our caller.
  1894. return result[0]
  1895. else:
  1896. return result
  1897. def _db_connection_type(db_connection):
  1898. """Returns a lowercase symbol for the db type.
  1899. This is useful when we need to change what we are doing per DB
  1900. (like handling regexes). In a CellsV2 world it probably needs to
  1901. do something better than use the database configuration string.
  1902. """
  1903. db_string = db_connection.split(':')[0].split('+')[0]
  1904. return db_string.lower()
  1905. def _safe_regex_mysql(raw_string):
  1906. """Make regex safe to mysql.
  1907. Certain items like '|' are interpreted raw by mysql REGEX. If you
  1908. search for a single | then you trigger an error because it's
  1909. expecting content on either side.
  1910. For consistency sake we escape all '|'. This does mean we wouldn't
  1911. support something like foo|bar to match completely different
  1912. things, however, one can argue putting such complicated regex into
  1913. name search probably means you are doing this wrong.
  1914. """
  1915. return raw_string.replace('|', '\\|')
  1916. def _get_regexp_ops(connection):
  1917. """Return safety filter and db opts for regex."""
  1918. regexp_op_map = {
  1919. 'postgresql': '~',
  1920. 'mysql': 'REGEXP',
  1921. 'sqlite': 'REGEXP'
  1922. }
  1923. regex_safe_filters = {
  1924. 'mysql': _safe_regex_mysql
  1925. }
  1926. db_type = _db_connection_type(connection)
  1927. return (regex_safe_filters.get(db_type, lambda x: x),
  1928. regexp_op_map.get(db_type, 'LIKE'))
  1929. def _regex_instance_filter(query, filters):
  1930. """Applies regular expression filtering to an Instance query.
  1931. Returns the updated query.
  1932. :param query: query to apply filters to
  1933. :param filters: dictionary of filters with regex values
  1934. """
  1935. model = models.Instance
  1936. safe_regex_filter, db_regexp_op = _get_regexp_ops(CONF.database.connection)
  1937. for filter_name in filters:
  1938. try:
  1939. column_attr = getattr(model, filter_name)
  1940. except AttributeError:
  1941. continue
  1942. if 'property' == type(column_attr).__name__:
  1943. continue
  1944. filter_val = filters[filter_name]
  1945. # Sometimes the REGEX filter value is not a string
  1946. if not isinstance(filter_val, six.string_types):
  1947. filter_val = str(filter_val)
  1948. if db_regexp_op == 'LIKE':
  1949. query = query.filter(column_attr.op(db_regexp_op)(
  1950. u'%' + filter_val + u'%'))
  1951. else:
  1952. filter_val = safe_regex_filter(filter_val)
  1953. query = query.filter(column_attr.op(db_regexp_op)(
  1954. filter_val))
  1955. return query
  1956. def _exact_instance_filter(query, filters, legal_keys):
  1957. """Applies exact match filtering to an Instance query.
  1958. Returns the updated query. Modifies filters argument to remove
  1959. filters consumed.
  1960. :param query: query to apply filters to
  1961. :param filters: dictionary of filters; values that are lists,
  1962. tuples, sets, or frozensets cause an 'IN' test to
  1963. be performed, while exact matching ('==' operator)
  1964. is used for other values
  1965. :param legal_keys: list of keys to apply exact filtering to
  1966. """
  1967. filter_dict = {}
  1968. model = models.Instance
  1969. # Walk through all the keys
  1970. for key in legal_keys:
  1971. # Skip ones we're not filtering on
  1972. if key not in filters:
  1973. continue
  1974. # OK, filtering on this key; what value do we search for?
  1975. value = filters.pop(key)
  1976. if key in ('metadata', 'system_metadata'):
  1977. column_attr = getattr(model, key)
  1978. if isinstance(value, list):
  1979. for item in value:
  1980. for k, v in item.items():
  1981. query = query.filter(column_attr.any(key=k))
  1982. query = query.filter(column_attr.any(value=v))
  1983. else:
  1984. for k, v in value.items():
  1985. query = query.filter(column_attr.any(key=k))
  1986. query = query.filter(column_attr.any(value=v))
  1987. elif isinstance(value, (list, tuple, set, frozenset)):
  1988. if not value:
  1989. return None # empty IN-predicate; short circuit
  1990. # Looking for values in a list; apply to query directly
  1991. column_attr = getattr(model, key)
  1992. query = query.filter(column_attr.in_(value))
  1993. else:
  1994. # OK, simple exact match; save for later
  1995. filter_dict[key] = value
  1996. # Apply simple exact matches
  1997. if filter_dict:
  1998. query = query.filter(*[getattr(models.Instance, k) == v
  1999. for k, v in filter_dict.items()])
  2000. return query
  2001. def process_sort_params(sort_keys, sort_dirs,
  2002. default_keys=['created_at', 'id'],
  2003. default_dir='asc'):
  2004. """Process the sort parameters to include default keys.
  2005. Creates a list of sort keys and a list of sort directions. Adds the default
  2006. keys to the end of the list if they are not already included.
  2007. When adding the default keys to the sort keys list, the associated
  2008. direction is:
  2009. 1) The first element in the 'sort_dirs' list (if specified), else
  2010. 2) 'default_dir' value (Note that 'asc' is the default value since this is
  2011. the default in sqlalchemy.utils.paginate_query)
  2012. :param sort_keys: List of sort keys to include in the processed list
  2013. :param sort_dirs: List of sort directions to include in the processed list
  2014. :param default_keys: List of sort keys that need to be included in the
  2015. processed list, they are added at the end of the list
  2016. if not already specified.
  2017. :param default_dir: Sort direction associated with each of the default
  2018. keys that are not supplied, used when they are added
  2019. to the processed list
  2020. :returns: list of sort keys, list of sort directions
  2021. :raise exception.InvalidInput: If more sort directions than sort keys
  2022. are specified or if an invalid sort
  2023. direction is specified
  2024. """
  2025. # Determine direction to use for when adding default keys
  2026. if sort_dirs and len(sort_dirs) != 0:
  2027. default_dir_value = sort_dirs[0]
  2028. else:
  2029. default_dir_value = default_dir
  2030. # Create list of keys (do not modify the input list)
  2031. if sort_keys:
  2032. result_keys = list(sort_keys)
  2033. else:
  2034. result_keys = []
  2035. # If a list of directions is not provided, use the default sort direction
  2036. # for all provided keys
  2037. if sort_dirs:
  2038. result_dirs = []
  2039. # Verify sort direction
  2040. for sort_dir in sort_dirs:
  2041. if sort_dir not in ('asc', 'desc'):
  2042. msg = _("Unknown sort direction, must be 'desc' or 'asc'")
  2043. raise exception.InvalidInput(reason=msg)
  2044. result_dirs.append(sort_dir)
  2045. else:
  2046. result_dirs = [default_dir_value for _sort_key in result_keys]
  2047. # Ensure that the key and direction length match
  2048. while len(result_dirs) < len(result_keys):
  2049. result_dirs.append(default_dir_value)
  2050. # Unless more direction are specified, which is an error
  2051. if len(result_dirs) > len(result_keys):
  2052. msg = _("Sort direction size exceeds sort key size")
  2053. raise exception.InvalidInput(reason=msg)
  2054. # Ensure defaults are included
  2055. for key in default_keys:
  2056. if key not in result_keys:
  2057. result_keys.append(key)
  2058. result_dirs.append(default_dir_value)
  2059. return result_keys, result_dirs
  2060. @require_context
  2061. @pick_context_manager_reader_allow_async
  2062. def instance_get_active_by_window_joined(context, begin, end=None,
  2063. project_id=None, host=None,
  2064. columns_to_join=None, limit=None,
  2065. marker=None):
  2066. """Return instances and joins that were active during window."""
  2067. query = context.session.query(models.Instance)
  2068. if columns_to_join is None:
  2069. columns_to_join_new = ['info_cache', 'security_groups']
  2070. manual_joins = ['metadata', 'system_metadata']
  2071. else:
  2072. manual_joins, columns_to_join_new = (
  2073. _manual_join_columns(columns_to_join))
  2074. for column in columns_to_join_new:
  2075. if 'extra.' in column:
  2076. query = query.options(undefer(column))
  2077. else:
  2078. query = query.options(joinedload(column))
  2079. query = query.filter(or_(models.Instance.terminated_at == null(),
  2080. models.Instance.terminated_at > begin))
  2081. if end:
  2082. query = query.filter(models.Instance.launched_at < end)
  2083. if project_id:
  2084. query = query.filter_by(project_id=project_id)
  2085. if host:
  2086. query = query.filter_by(host=host)
  2087. if marker is not None:
  2088. try:
  2089. marker = _instance_get_by_uuid(
  2090. context.elevated(read_deleted='yes'), marker)
  2091. except exception.InstanceNotFound:
  2092. raise exception.MarkerNotFound(marker=marker)
  2093. query = sqlalchemyutils.paginate_query(
  2094. query, models.Instance, limit, ['project_id', 'uuid'], marker=marker)
  2095. return _instances_fill_metadata(context, query.all(), manual_joins)
  2096. def _instance_get_all_query(context, project_only=False, joins=None):
  2097. if joins is None:
  2098. joins = ['info_cache', 'security_groups']
  2099. query = model_query(context,
  2100. models.Instance,
  2101. project_only=project_only)
  2102. for column in joins:
  2103. if 'extra.' in column:
  2104. query = query.options(undefer(column))
  2105. else:
  2106. query = query.options(joinedload(column))
  2107. return query
  2108. @pick_context_manager_reader_allow_async
  2109. def instance_get_all_by_host(context, host, columns_to_join=None):
  2110. query = _instance_get_all_query(context, joins=columns_to_join)
  2111. return _instances_fill_metadata(context,
  2112. query.filter_by(host=host).all(),
  2113. manual_joins=columns_to_join)
  2114. def _instance_get_all_uuids_by_hosts(context, hosts):
  2115. itbl = models.Instance.__table__
  2116. default_deleted_value = itbl.c.deleted.default.arg
  2117. sel = sql.select([itbl.c.host, itbl.c.uuid])
  2118. sel = sel.where(sql.and_(
  2119. itbl.c.deleted == default_deleted_value,
  2120. itbl.c.host.in_(sa.bindparam('hosts', expanding=True))))
  2121. # group the instance UUIDs by hostname
  2122. res = collections.defaultdict(list)
  2123. for rec in context.session.execute(sel, {'hosts': hosts}).fetchall():
  2124. res[rec[0]].append(rec[1])
  2125. return res
@pick_context_manager_reader
def instance_get_all_uuids_by_hosts(context, hosts):
    """Return a dict, keyed by hostname, of a list of the instance uuids on the
    host for each supplied hostname, not Instance model objects.

    The dict is a defaultdict of list, thus inspecting the dict for a host not
    in the dict will return an empty list not a KeyError.

    :param context: security context (a reader DB session is picked for it)
    :param hosts: iterable of hostnames to look up
    """
    return _instance_get_all_uuids_by_hosts(context, hosts)
  2134. @pick_context_manager_reader
  2135. def instance_get_all_by_host_and_node(context, host, node,
  2136. columns_to_join=None):
  2137. if columns_to_join is None:
  2138. manual_joins = []
  2139. else:
  2140. candidates = ['system_metadata', 'metadata']
  2141. manual_joins = [x for x in columns_to_join if x in candidates]
  2142. columns_to_join = list(set(columns_to_join) - set(candidates))
  2143. return _instances_fill_metadata(context,
  2144. _instance_get_all_query(
  2145. context,
  2146. joins=columns_to_join).filter_by(host=host).
  2147. filter_by(node=node).all(), manual_joins=manual_joins)
  2148. @pick_context_manager_reader
  2149. def instance_get_all_by_host_and_not_type(context, host, type_id=None):
  2150. return _instances_fill_metadata(context,
  2151. _instance_get_all_query(context).filter_by(host=host).
  2152. filter(models.Instance.instance_type_id != type_id).all())
  2153. @pick_context_manager_reader
  2154. def instance_get_all_by_grantee_security_groups(context, group_ids):
  2155. if not group_ids:
  2156. return []
  2157. return _instances_fill_metadata(context,
  2158. _instance_get_all_query(context).
  2159. join(models.Instance.security_groups).
  2160. filter(models.SecurityGroup.rules.any(
  2161. models.SecurityGroupIngressRule.group_id.in_(group_ids))).
  2162. all())
  2163. @require_context
  2164. @pick_context_manager_reader
  2165. def instance_floating_address_get_all(context, instance_uuid):
  2166. if not uuidutils.is_uuid_like(instance_uuid):
  2167. raise exception.InvalidUUID(uuid=instance_uuid)
  2168. floating_ips = model_query(context,
  2169. models.FloatingIp,
  2170. (models.FloatingIp.address,)).\
  2171. join(models.FloatingIp.fixed_ip).\
  2172. filter_by(instance_uuid=instance_uuid)
  2173. return [floating_ip.address for floating_ip in floating_ips]
  2174. # NOTE(hanlind): This method can be removed as conductor RPC API moves to v2.0.
  2175. @pick_context_manager_reader
  2176. def instance_get_all_hung_in_rebooting(context, reboot_window):
  2177. reboot_window = (timeutils.utcnow() -
  2178. datetime.timedelta(seconds=reboot_window))
  2179. # NOTE(danms): this is only used in the _poll_rebooting_instances()
  2180. # call in compute/manager, so we can avoid the metadata lookups
  2181. # explicitly
  2182. return _instances_fill_metadata(context,
  2183. model_query(context, models.Instance).
  2184. filter(models.Instance.updated_at <= reboot_window).
  2185. filter_by(task_state=task_states.REBOOTING).all(),
  2186. manual_joins=[])
  2187. def _retry_instance_update():
  2188. """Wrap with oslo_db_api.wrap_db_retry, and also retry on
  2189. UnknownInstanceUpdateConflict.
  2190. """
  2191. exception_checker = \
  2192. lambda exc: isinstance(exc, (exception.UnknownInstanceUpdateConflict,))
  2193. return oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True,
  2194. exception_checker=exception_checker)
  2195. @require_context
  2196. @_retry_instance_update()
  2197. @pick_context_manager_writer
  2198. def instance_update(context, instance_uuid, values, expected=None):
  2199. return _instance_update(context, instance_uuid, values, expected)
  2200. @require_context
  2201. @_retry_instance_update()
  2202. @pick_context_manager_writer
  2203. def instance_update_and_get_original(context, instance_uuid, values,
  2204. columns_to_join=None, expected=None):
  2205. """Set the given properties on an instance and update it. Return
  2206. a shallow copy of the original instance reference, as well as the
  2207. updated one.
  2208. :param context: = request context object
  2209. :param instance_uuid: = instance uuid
  2210. :param values: = dict containing column values
  2211. If "expected_task_state" exists in values, the update can only happen
  2212. when the task state before update matches expected_task_state. Otherwise
  2213. a UnexpectedTaskStateError is thrown.
  2214. :returns: a tuple of the form (old_instance_ref, new_instance_ref)
  2215. Raises NotFound if instance does not exist.
  2216. """
  2217. instance_ref = _instance_get_by_uuid(context, instance_uuid,
  2218. columns_to_join=columns_to_join)
  2219. return (copy.copy(instance_ref), _instance_update(
  2220. context, instance_uuid, values, expected, original=instance_ref))
  2221. # NOTE(danms): This updates the instance's metadata list in-place and in
  2222. # the database to avoid stale data and refresh issues. It assumes the
  2223. # delete=True behavior of instance_metadata_update(...)
  2224. def _instance_metadata_update_in_place(context, instance, metadata_type, model,
  2225. metadata):
  2226. metadata = dict(metadata)
  2227. to_delete = []
  2228. for keyvalue in instance[metadata_type]:
  2229. key = keyvalue['key']
  2230. if key in metadata:
  2231. keyvalue['value'] = metadata.pop(key)
  2232. elif key not in metadata:
  2233. to_delete.append(keyvalue)
  2234. # NOTE: we have to hard_delete here otherwise we will get more than one
  2235. # system_metadata record when we read deleted for an instance;
  2236. # regular metadata doesn't have the same problem because we don't
  2237. # allow reading deleted regular metadata anywhere.
  2238. if metadata_type == 'system_metadata':
  2239. for condemned in to_delete:
  2240. context.session.delete(condemned)
  2241. instance[metadata_type].remove(condemned)
  2242. else:
  2243. for condemned in to_delete:
  2244. condemned.soft_delete(context.session)
  2245. for key, value in metadata.items():
  2246. newitem = model()
  2247. newitem.update({'key': key, 'value': value,
  2248. 'instance_uuid': instance['uuid']})
  2249. context.session.add(newitem)
  2250. instance[metadata_type].append(newitem)
def _instance_update(context, instance_uuid, values, expected, original=None):
    """Apply ``values`` to an instance row iff its current state matches
    ``expected``; raise a specific conflict error otherwise.

    :param expected: dict of column -> value(s) that must currently hold
    :param original: a previously-read instance model, if the caller has one
    :raises: InvalidUUID, UnknownInstanceUpdateConflict,
        UnexpectedDeletingTaskStateError, UnexpectedTaskStateError,
        InstanceUpdateConflict
    """
    if not uuidutils.is_uuid_like(instance_uuid):
        raise exception.InvalidUUID(uuid=instance_uuid)

    # NOTE(mdbooth): We pop values from this dict below, so we copy it here to
    # ensure there are no side effects for the caller or if we retry the
    # function due to a db conflict.
    updates = copy.copy(values)

    if expected is None:
        expected = {}
    else:
        # Coerce all single values to singleton lists
        expected = {k: [None] if v is None else sqlalchemyutils.to_list(v)
                    for (k, v) in expected.items()}

    # Extract 'expected_' values from values dict, as these aren't actually
    # updates
    for field in ('task_state', 'vm_state'):
        expected_field = 'expected_%s' % field
        if expected_field in updates:
            value = updates.pop(expected_field, None)
            # Coerce all single values to singleton lists
            if value is None:
                expected[field] = [None]
            else:
                expected[field] = sqlalchemyutils.to_list(value)

    # Values which need to be updated separately
    metadata = updates.pop('metadata', None)
    system_metadata = updates.pop('system_metadata', None)

    _handle_objects_related_type_conversions(updates)

    # Hostname is potentially unique, but this is enforced in code rather
    # than the DB. The query below races, but the number of users of
    # osapi_compute_unique_server_name_scope is small, and a robust fix
    # will be complex. This is intentionally left as is for the moment.
    if 'hostname' in updates:
        _validate_unique_server_name(context, updates['hostname'])

    # Build a comparison template; update_on_match only applies 'updates'
    # when the row still matches every field of 'compare'.
    compare = models.Instance(uuid=instance_uuid, **expected)
    try:
        instance_ref = model_query(context, models.Instance,
                                   project_only=True).\
            update_on_match(compare, 'uuid', updates)
    except update_match.NoRowsMatched:
        # Update failed. Try to find why and raise a specific error.

        # We should get here only because our expected values were not current
        # when update_on_match executed. Having failed, we now have a hint that
        # the values are out of date and should check them.

        # This code is made more complex because we are using repeatable reads.
        # If we have previously read the original instance in the current
        # transaction, reading it again will return the same data, even though
        # the above update failed because it has changed: it is not possible to
        # determine what has changed in this transaction. In this case we raise
        # UnknownInstanceUpdateConflict, which will cause the operation to be
        # retried in a new transaction.

        # Because of the above, if we have previously read the instance in the
        # current transaction it will have been passed as 'original', and there
        # is no point refreshing it. If we have not previously read the
        # instance, we can fetch it here and we will get fresh data.
        if original is None:
            original = _instance_get_by_uuid(context, instance_uuid)

        conflicts_expected = {}
        conflicts_actual = {}
        for (field, expected_values) in expected.items():
            actual = original[field]
            if actual not in expected_values:
                conflicts_expected[field] = expected_values
                conflicts_actual[field] = actual

        # Exception properties
        exc_props = {
            'instance_uuid': instance_uuid,
            'expected': conflicts_expected,
            'actual': conflicts_actual
        }

        # There was a conflict, but something (probably the MySQL read view,
        # but possibly an exceptionally unlikely second race) is preventing us
        # from seeing what it is. When we go round again we'll get a fresh
        # transaction and a fresh read view.
        if len(conflicts_actual) == 0:
            raise exception.UnknownInstanceUpdateConflict(**exc_props)

        # Task state gets special handling for convenience. We raise the
        # specific error UnexpectedDeletingTaskStateError or
        # UnexpectedTaskStateError as appropriate
        if 'task_state' in conflicts_actual:
            conflict_task_state = conflicts_actual['task_state']
            if conflict_task_state == task_states.DELETING:
                exc = exception.UnexpectedDeletingTaskStateError
            else:
                exc = exception.UnexpectedTaskStateError
        # Everything else is an InstanceUpdateConflict
        else:
            exc = exception.InstanceUpdateConflict
        raise exc(**exc_props)

    if metadata is not None:
        _instance_metadata_update_in_place(context, instance_ref,
                                           'metadata',
                                           models.InstanceMetadata,
                                           metadata)

    if system_metadata is not None:
        _instance_metadata_update_in_place(context, instance_ref,
                                           'system_metadata',
                                           models.InstanceSystemMetadata,
                                           system_metadata)

    return instance_ref
  2351. @pick_context_manager_writer
  2352. def instance_add_security_group(context, instance_uuid, security_group_id):
  2353. """Associate the given security group with the given instance."""
  2354. sec_group_ref = models.SecurityGroupInstanceAssociation()
  2355. sec_group_ref.update({'instance_uuid': instance_uuid,
  2356. 'security_group_id': security_group_id})
  2357. sec_group_ref.save(context.session)
  2358. @require_context
  2359. @pick_context_manager_writer
  2360. def instance_remove_security_group(context, instance_uuid, security_group_id):
  2361. """Disassociate the given security group from the given instance."""
  2362. model_query(context, models.SecurityGroupInstanceAssociation).\
  2363. filter_by(instance_uuid=instance_uuid).\
  2364. filter_by(security_group_id=security_group_id).\
  2365. soft_delete()
  2366. ###################
  2367. @require_context
  2368. @pick_context_manager_reader
  2369. def instance_info_cache_get(context, instance_uuid):
  2370. """Gets an instance info cache from the table.
  2371. :param instance_uuid: = uuid of the info cache's instance
  2372. """
  2373. return model_query(context, models.InstanceInfoCache).\
  2374. filter_by(instance_uuid=instance_uuid).\
  2375. first()
@require_context
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
@pick_context_manager_writer
def instance_info_cache_update(context, instance_uuid, values):
    """Update an instance info cache record in the table.

    :param instance_uuid: = uuid of info cache's instance
    :param values: = dict containing column values to update
    :returns: the updated (or re-created) InstanceInfoCache model
    :raises: InstanceInfoCacheNotFound if the existing record is deleted
    """
    convert_objects_related_datetimes(values)

    info_cache = model_query(context, models.InstanceInfoCache).\
        filter_by(instance_uuid=instance_uuid).\
        first()
    needs_create = False
    if info_cache and info_cache['deleted']:
        raise exception.InstanceInfoCacheNotFound(
            instance_uuid=instance_uuid)
    elif not info_cache:
        # NOTE(tr3buchet): just in case someone blows away an instance's
        #                  cache entry, re-create it.
        values['instance_uuid'] = instance_uuid
        info_cache = models.InstanceInfoCache(**values)
        needs_create = True

    try:
        # Savepoint: a duplicate-entry failure below must not poison the
        # caller's enclosing transaction.
        with get_context_manager(context).writer.savepoint.using(context):
            if needs_create:
                info_cache.save(context.session)
            else:
                info_cache.update(values)
    except db_exc.DBDuplicateEntry:
        # NOTE(sirp): Possible race if two greenthreads attempt to
        # recreate the instance cache entry at the same time. First one
        # wins.
        pass

    return info_cache
  2410. @require_context
  2411. @pick_context_manager_writer
  2412. def instance_info_cache_delete(context, instance_uuid):
  2413. """Deletes an existing instance_info_cache record
  2414. :param instance_uuid: = uuid of the instance tied to the cache record
  2415. """
  2416. model_query(context, models.InstanceInfoCache).\
  2417. filter_by(instance_uuid=instance_uuid).\
  2418. soft_delete()
  2419. ###################
  2420. def _instance_extra_create(context, values):
  2421. inst_extra_ref = models.InstanceExtra()
  2422. inst_extra_ref.update(values)
  2423. inst_extra_ref.save(context.session)
  2424. return inst_extra_ref
  2425. @pick_context_manager_writer
  2426. def instance_extra_update_by_uuid(context, instance_uuid, values):
  2427. rows_updated = model_query(context, models.InstanceExtra).\
  2428. filter_by(instance_uuid=instance_uuid).\
  2429. update(values)
  2430. if not rows_updated:
  2431. LOG.debug("Created instance_extra for %s", instance_uuid)
  2432. create_values = copy.copy(values)
  2433. create_values["instance_uuid"] = instance_uuid
  2434. _instance_extra_create(context, create_values)
  2435. rows_updated = 1
  2436. return rows_updated
  2437. @pick_context_manager_reader
  2438. def instance_extra_get_by_instance_uuid(context, instance_uuid,
  2439. columns=None):
  2440. query = model_query(context, models.InstanceExtra).\
  2441. filter_by(instance_uuid=instance_uuid)
  2442. if columns is None:
  2443. columns = ['numa_topology', 'pci_requests', 'flavor', 'vcpu_model',
  2444. 'trusted_certs', 'resources', 'migration_context']
  2445. for column in columns:
  2446. query = query.options(undefer(column))
  2447. instance_extra = query.first()
  2448. return instance_extra
  2449. ###################
  2450. @require_context
  2451. @pick_context_manager_writer
  2452. def key_pair_create(context, values):
  2453. try:
  2454. key_pair_ref = models.KeyPair()
  2455. key_pair_ref.update(values)
  2456. key_pair_ref.save(context.session)
  2457. return key_pair_ref
  2458. except db_exc.DBDuplicateEntry:
  2459. raise exception.KeyPairExists(key_name=values['name'])
  2460. @require_context
  2461. @pick_context_manager_writer
  2462. def key_pair_destroy(context, user_id, name):
  2463. result = model_query(context, models.KeyPair).\
  2464. filter_by(user_id=user_id).\
  2465. filter_by(name=name).\
  2466. soft_delete()
  2467. if not result:
  2468. raise exception.KeypairNotFound(user_id=user_id, name=name)
  2469. @require_context
  2470. @pick_context_manager_reader
  2471. def key_pair_get(context, user_id, name):
  2472. result = model_query(context, models.KeyPair).\
  2473. filter_by(user_id=user_id).\
  2474. filter_by(name=name).\
  2475. first()
  2476. if not result:
  2477. raise exception.KeypairNotFound(user_id=user_id, name=name)
  2478. return result
  2479. @require_context
  2480. @pick_context_manager_reader
  2481. def key_pair_get_all_by_user(context, user_id, limit=None, marker=None):
  2482. marker_row = None
  2483. if marker is not None:
  2484. marker_row = model_query(context, models.KeyPair, read_deleted="no").\
  2485. filter_by(name=marker).filter_by(user_id=user_id).first()
  2486. if not marker_row:
  2487. raise exception.MarkerNotFound(marker=marker)
  2488. query = model_query(context, models.KeyPair, read_deleted="no").\
  2489. filter_by(user_id=user_id)
  2490. query = sqlalchemyutils.paginate_query(
  2491. query, models.KeyPair, limit, ['name'], marker=marker_row)
  2492. return query.all()
  2493. @require_context
  2494. @pick_context_manager_reader
  2495. def key_pair_count_by_user(context, user_id):
  2496. return model_query(context, models.KeyPair, read_deleted="no").\
  2497. filter_by(user_id=user_id).\
  2498. count()
  2499. ###################
@pick_context_manager_writer
def network_associate(context, project_id, network_id=None, force=False):
    """Associate a project with a network.

    called by project_get_networks under certain conditions
    and network manager add_network_to_project()

    only associate if the project doesn't already have a network
    or if force is True

    force solves race condition where a fresh project has multiple instance
    builds simultaneously picked up by multiple network hosts which attempt
    to associate the project with multiple networks
    force should only be used as a direct consequence of user request
    all automated requests should not use force
    """
    def network_query(project_filter, id=None):
        # Row-lock (SELECT ... FOR UPDATE) the first matching network so
        # concurrent associations serialize on it.
        filter_kwargs = {'project_id': project_filter}
        if id is not None:
            filter_kwargs['id'] = id
        return model_query(context, models.Network, read_deleted="no").\
            filter_by(**filter_kwargs).\
            with_for_update().\
            first()

    if not force:
        # find out if project has a network
        network_ref = network_query(project_id)

    if force or not network_ref:
        # in force mode or project doesn't have a network so associate
        # with a new network

        # get new network: an unowned one (project_id None), optionally by id
        network_ref = network_query(None, network_id)
        if not network_ref:
            raise exception.NoMoreNetworks()

        # associate with network
        # NOTE(vish): if with_lockmode isn't supported, as in sqlite,
        #             then this has concurrency issues
        network_ref['project_id'] = project_id
        context.session.add(network_ref)
    return network_ref
  2537. def _network_ips_query(context, network_id):
  2538. return model_query(context, models.FixedIp, read_deleted="no").\
  2539. filter_by(network_id=network_id)
  2540. @pick_context_manager_reader
  2541. def network_count_reserved_ips(context, network_id):
  2542. return _network_ips_query(context, network_id).\
  2543. filter_by(reserved=True).\
  2544. count()
  2545. @pick_context_manager_writer
  2546. def network_create_safe(context, values):
  2547. network_ref = models.Network()
  2548. network_ref['uuid'] = uuidutils.generate_uuid()
  2549. network_ref.update(values)
  2550. try:
  2551. network_ref.save(context.session)
  2552. return network_ref
  2553. except db_exc.DBDuplicateEntry:
  2554. raise exception.DuplicateVlan(vlan=values['vlan'])
  2555. @pick_context_manager_writer
  2556. def network_delete_safe(context, network_id):
  2557. result = model_query(context, models.FixedIp, read_deleted="no").\
  2558. filter_by(network_id=network_id).\
  2559. filter_by(allocated=True).\
  2560. count()
  2561. if result != 0:
  2562. raise exception.NetworkInUse(network_id=network_id)
  2563. network_ref = _network_get(context, network_id=network_id)
  2564. model_query(context, models.FixedIp, read_deleted="no").\
  2565. filter_by(network_id=network_id).\
  2566. soft_delete()
  2567. context.session.delete(network_ref)
  2568. @pick_context_manager_writer
  2569. def network_disassociate(context, network_id, disassociate_host,
  2570. disassociate_project):
  2571. net_update = {}
  2572. if disassociate_project:
  2573. net_update['project_id'] = None
  2574. if disassociate_host:
  2575. net_update['host'] = None
  2576. network_update(context, network_id, net_update)
  2577. def _network_get(context, network_id, project_only='allow_none'):
  2578. result = model_query(context, models.Network, project_only=project_only).\
  2579. filter_by(id=network_id).\
  2580. first()
  2581. if not result:
  2582. raise exception.NetworkNotFound(network_id=network_id)
  2583. return result
@require_context
@pick_context_manager_reader
def network_get(context, network_id, project_only='allow_none'):
    # Thin context-managed wrapper around _network_get; raises
    # NetworkNotFound for an unknown id.
    return _network_get(context, network_id, project_only=project_only)
  2588. @require_context
  2589. @pick_context_manager_reader
  2590. def network_get_all(context, project_only):
  2591. result = model_query(context, models.Network, read_deleted="no",
  2592. project_only=project_only).all()
  2593. if not result:
  2594. raise exception.NoNetworksFound()
  2595. return result
  2596. @require_context
  2597. @pick_context_manager_reader
  2598. def network_get_all_by_uuids(context, network_uuids, project_only):
  2599. result = model_query(context, models.Network, read_deleted="no",
  2600. project_only=project_only).\
  2601. filter(models.Network.uuid.in_(network_uuids)).\
  2602. all()
  2603. if not result:
  2604. raise exception.NoNetworksFound()
  2605. # check if the result contains all the networks
  2606. # we are looking for
  2607. for network_uuid in network_uuids:
  2608. for network in result:
  2609. if network['uuid'] == network_uuid:
  2610. break
  2611. else:
  2612. if project_only:
  2613. raise exception.NetworkNotFoundForProject(
  2614. network_uuid=network_uuid, project_id=context.project_id)
  2615. raise exception.NetworkNotFound(network_id=network_uuid)
  2616. return result
def _get_associated_fixed_ips_query(context, network_id, host=None):
    """Build the query yielding per-fixed-ip tuples for a network.

    Each row is (fixed ip address, instance uuid, network id, vif id,
    vif MAC address, instance hostname, instance updated_at,
    instance created_at, allocated, leased, first-interface marker).

    :param host: when set, restrict to instances on that host
    """
    # NOTE(vish): The ugly joins here are to solve a performance issue and
    #             should be removed once we can add and remove leases
    #             without regenerating the whole list
    vif_and = and_(models.VirtualInterface.id ==
                   models.FixedIp.virtual_interface_id,
                   models.VirtualInterface.deleted == 0)
    inst_and = and_(models.Instance.uuid == models.FixedIp.instance_uuid,
                    models.Instance.deleted == 0)
    # NOTE(vish): This subquery left joins the minimum interface id for each
    #             instance. If the join succeeds (i.e. the 11th column is not
    #             null), then the fixed ip is on the first interface.
    subq = context.session.query(
        func.min(models.VirtualInterface.id).label("id"),
        models.VirtualInterface.instance_uuid).\
        group_by(models.VirtualInterface.instance_uuid).subquery()
    subq_and = and_(subq.c.id == models.FixedIp.virtual_interface_id,
                    subq.c.instance_uuid ==
                    models.VirtualInterface.instance_uuid)
    query = context.session.query(
        models.FixedIp.address,
        models.FixedIp.instance_uuid,
        models.FixedIp.network_id,
        models.FixedIp.virtual_interface_id,
        models.VirtualInterface.address,
        models.Instance.hostname,
        models.Instance.updated_at,
        models.Instance.created_at,
        models.FixedIp.allocated,
        models.FixedIp.leased,
        subq.c.id).\
        filter(models.FixedIp.deleted == 0).\
        filter(models.FixedIp.network_id == network_id).\
        join((models.VirtualInterface, vif_and)).\
        join((models.Instance, inst_and)).\
        outerjoin((subq, subq_and)).\
        filter(models.FixedIp.instance_uuid != null()).\
        filter(models.FixedIp.virtual_interface_id != null())
    if host:
        query = query.filter(models.Instance.host == host)
    return query
  2657. @pick_context_manager_reader
  2658. def network_get_associated_fixed_ips(context, network_id, host=None):
  2659. # FIXME(sirp): since this returns fixed_ips, this would be better named
  2660. # fixed_ip_get_all_by_network.
  2661. query = _get_associated_fixed_ips_query(context, network_id, host)
  2662. result = query.all()
  2663. data = []
  2664. for datum in result:
  2665. cleaned = {}
  2666. cleaned['address'] = datum[0]
  2667. cleaned['instance_uuid'] = datum[1]
  2668. cleaned['network_id'] = datum[2]
  2669. cleaned['vif_id'] = datum[3]
  2670. cleaned['vif_address'] = datum[4]
  2671. cleaned['instance_hostname'] = datum[5]
  2672. cleaned['instance_updated'] = datum[6]
  2673. cleaned['instance_created'] = datum[7]
  2674. cleaned['allocated'] = datum[8]
  2675. cleaned['leased'] = datum[9]
  2676. # NOTE(vish): default_route is True if this fixed ip is on the first
  2677. # interface its instance.
  2678. cleaned['default_route'] = datum[10] is not None
  2679. data.append(cleaned)
  2680. return data
  2681. @pick_context_manager_reader
  2682. def network_in_use_on_host(context, network_id, host):
  2683. query = _get_associated_fixed_ips_query(context, network_id, host)
  2684. return query.count() > 0
def _network_get_query(context):
    # Base query for non-deleted networks.
    return model_query(context, models.Network, read_deleted="no")
  2687. @pick_context_manager_reader
  2688. def network_get_by_uuid(context, uuid):
  2689. result = _network_get_query(context).filter_by(uuid=uuid).first()
  2690. if not result:
  2691. raise exception.NetworkNotFoundForUUID(uuid=uuid)
  2692. return result
  2693. @pick_context_manager_reader
  2694. def network_get_by_cidr(context, cidr):
  2695. result = _network_get_query(context).\
  2696. filter(or_(models.Network.cidr == cidr,
  2697. models.Network.cidr_v6 == cidr)).\
  2698. first()
  2699. if not result:
  2700. raise exception.NetworkNotFoundForCidr(cidr=cidr)
  2701. return result
@pick_context_manager_reader
def network_get_all_by_host(context, host):
    """Return networks related to ``host``: directly, via a fixed IP's host,
    or via an instance running on the host.
    """
    # Fixed IPs on the host, or attached to an instance on the host.
    fixed_host_filter = or_(models.FixedIp.host == host,
                            and_(models.FixedIp.instance_uuid != null(),
                                 models.Instance.host == host))
    fixed_ip_query = model_query(context, models.FixedIp,
                                 (models.FixedIp.network_id,)).\
        outerjoin((models.Instance,
                   models.Instance.uuid ==
                   models.FixedIp.instance_uuid)).\
        filter(fixed_host_filter)
    # NOTE(vish): return networks that have host set
    #             or that have a fixed ip with host set
    #             or that have an instance with host set
    host_filter = or_(models.Network.host == host,
                      models.Network.id.in_(fixed_ip_query.subquery()))
    return _network_get_query(context).filter(host_filter).all()
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
@pick_context_manager_writer
def network_set_host(context, network_id, host_id):
    """Claim a network for ``host_id`` if it has no host yet.

    Returns None without changes when a host is already set; raises a
    RetryRequest (handled by wrap_db_retry) when a concurrent transaction
    wins the compare-and-swap.

    :raises: NetworkNotFound for an unknown network id.
    """
    network_ref = _network_get_query(context).\
        filter_by(id=network_id).\
        first()
    if not network_ref:
        raise exception.NetworkNotFound(network_id=network_id)

    if network_ref.host:
        return None

    # Compare-and-swap: only set the host while it is still unset.
    rows_updated = _network_get_query(context).\
        filter_by(id=network_id).\
        filter_by(host=None).\
        update({'host': host_id})
    if not rows_updated:
        LOG.debug('The row was updated in a concurrent transaction, '
                  'we will fetch another row')
        raise db_exc.RetryRequest(
            exception.NetworkSetHostFailed(network_id=network_id))
  2738. @require_context
  2739. @pick_context_manager_writer
  2740. def network_update(context, network_id, values):
  2741. network_ref = _network_get(context, network_id)
  2742. network_ref.update(values)
  2743. try:
  2744. network_ref.save(context.session)
  2745. except db_exc.DBDuplicateEntry:
  2746. raise exception.DuplicateVlan(vlan=values['vlan'])
  2747. return network_ref
  2748. ###################
  2749. @require_context
  2750. @pick_context_manager_reader
  2751. def quota_get(context, project_id, resource, user_id=None):
  2752. model = models.ProjectUserQuota if user_id else models.Quota
  2753. query = model_query(context, model).\
  2754. filter_by(project_id=project_id).\
  2755. filter_by(resource=resource)
  2756. if user_id:
  2757. query = query.filter_by(user_id=user_id)
  2758. result = query.first()
  2759. if not result:
  2760. if user_id:
  2761. raise exception.ProjectUserQuotaNotFound(project_id=project_id,
  2762. user_id=user_id)
  2763. else:
  2764. raise exception.ProjectQuotaNotFound(project_id=project_id)
  2765. return result
  2766. @require_context
  2767. @pick_context_manager_reader
  2768. def quota_get_all_by_project_and_user(context, project_id, user_id):
  2769. user_quotas = model_query(context, models.ProjectUserQuota,
  2770. (models.ProjectUserQuota.resource,
  2771. models.ProjectUserQuota.hard_limit)).\
  2772. filter_by(project_id=project_id).\
  2773. filter_by(user_id=user_id).\
  2774. all()
  2775. result = {'project_id': project_id, 'user_id': user_id}
  2776. for user_quota in user_quotas:
  2777. result[user_quota.resource] = user_quota.hard_limit
  2778. return result
  2779. @require_context
  2780. @pick_context_manager_reader
  2781. def quota_get_all_by_project(context, project_id):
  2782. rows = model_query(context, models.Quota, read_deleted="no").\
  2783. filter_by(project_id=project_id).\
  2784. all()
  2785. result = {'project_id': project_id}
  2786. for row in rows:
  2787. result[row.resource] = row.hard_limit
  2788. return result
  2789. @require_context
  2790. @pick_context_manager_reader
  2791. def quota_get_all(context, project_id):
  2792. result = model_query(context, models.ProjectUserQuota).\
  2793. filter_by(project_id=project_id).\
  2794. all()
  2795. return result
def quota_get_per_project_resources():
    """Return the resources tracked at the project (not per-user) level."""
    return PER_PROJECT_QUOTAS
  2798. @pick_context_manager_writer
  2799. def quota_create(context, project_id, resource, limit, user_id=None):
  2800. per_user = user_id and resource not in PER_PROJECT_QUOTAS
  2801. quota_ref = models.ProjectUserQuota() if per_user else models.Quota()
  2802. if per_user:
  2803. quota_ref.user_id = user_id
  2804. quota_ref.project_id = project_id
  2805. quota_ref.resource = resource
  2806. quota_ref.hard_limit = limit
  2807. try:
  2808. quota_ref.save(context.session)
  2809. except db_exc.DBDuplicateEntry:
  2810. raise exception.QuotaExists(project_id=project_id, resource=resource)
  2811. return quota_ref
  2812. @pick_context_manager_writer
  2813. def quota_update(context, project_id, resource, limit, user_id=None):
  2814. per_user = user_id and resource not in PER_PROJECT_QUOTAS
  2815. model = models.ProjectUserQuota if per_user else models.Quota
  2816. query = model_query(context, model).\
  2817. filter_by(project_id=project_id).\
  2818. filter_by(resource=resource)
  2819. if per_user:
  2820. query = query.filter_by(user_id=user_id)
  2821. result = query.update({'hard_limit': limit})
  2822. if not result:
  2823. if per_user:
  2824. raise exception.ProjectUserQuotaNotFound(project_id=project_id,
  2825. user_id=user_id)
  2826. else:
  2827. raise exception.ProjectQuotaNotFound(project_id=project_id)
  2828. ###################
  2829. @require_context
  2830. @pick_context_manager_reader
  2831. def quota_class_get(context, class_name, resource):
  2832. result = model_query(context, models.QuotaClass, read_deleted="no").\
  2833. filter_by(class_name=class_name).\
  2834. filter_by(resource=resource).\
  2835. first()
  2836. if not result:
  2837. raise exception.QuotaClassNotFound(class_name=class_name)
  2838. return result
  2839. @pick_context_manager_reader
  2840. def quota_class_get_default(context):
  2841. rows = model_query(context, models.QuotaClass, read_deleted="no").\
  2842. filter_by(class_name=_DEFAULT_QUOTA_NAME).\
  2843. all()
  2844. result = {'class_name': _DEFAULT_QUOTA_NAME}
  2845. for row in rows:
  2846. result[row.resource] = row.hard_limit
  2847. return result
  2848. @require_context
  2849. @pick_context_manager_reader
  2850. def quota_class_get_all_by_name(context, class_name):
  2851. rows = model_query(context, models.QuotaClass, read_deleted="no").\
  2852. filter_by(class_name=class_name).\
  2853. all()
  2854. result = {'class_name': class_name}
  2855. for row in rows:
  2856. result[row.resource] = row.hard_limit
  2857. return result
  2858. @pick_context_manager_writer
  2859. def quota_class_create(context, class_name, resource, limit):
  2860. quota_class_ref = models.QuotaClass()
  2861. quota_class_ref.class_name = class_name
  2862. quota_class_ref.resource = resource
  2863. quota_class_ref.hard_limit = limit
  2864. quota_class_ref.save(context.session)
  2865. return quota_class_ref
  2866. @pick_context_manager_writer
  2867. def quota_class_update(context, class_name, resource, limit):
  2868. result = model_query(context, models.QuotaClass, read_deleted="no").\
  2869. filter_by(class_name=class_name).\
  2870. filter_by(resource=resource).\
  2871. update({'hard_limit': limit})
  2872. if not result:
  2873. raise exception.QuotaClassNotFound(class_name=class_name)
  2874. ###################
  2875. @pick_context_manager_writer
  2876. def quota_destroy_all_by_project_and_user(context, project_id, user_id):
  2877. model_query(context, models.ProjectUserQuota, read_deleted="no").\
  2878. filter_by(project_id=project_id).\
  2879. filter_by(user_id=user_id).\
  2880. soft_delete(synchronize_session=False)
  2881. @pick_context_manager_writer
  2882. def quota_destroy_all_by_project(context, project_id):
  2883. model_query(context, models.Quota, read_deleted="no").\
  2884. filter_by(project_id=project_id).\
  2885. soft_delete(synchronize_session=False)
  2886. model_query(context, models.ProjectUserQuota, read_deleted="no").\
  2887. filter_by(project_id=project_id).\
  2888. soft_delete(synchronize_session=False)
  2889. ###################
  2890. def _ec2_volume_get_query(context):
  2891. return model_query(context, models.VolumeIdMapping, read_deleted='yes')
  2892. def _ec2_snapshot_get_query(context):
  2893. return model_query(context, models.SnapshotIdMapping, read_deleted='yes')
  2894. @require_context
  2895. @pick_context_manager_writer
  2896. def ec2_volume_create(context, volume_uuid, id=None):
  2897. """Create ec2 compatible volume by provided uuid."""
  2898. ec2_volume_ref = models.VolumeIdMapping()
  2899. ec2_volume_ref.update({'uuid': volume_uuid})
  2900. if id is not None:
  2901. ec2_volume_ref.update({'id': id})
  2902. ec2_volume_ref.save(context.session)
  2903. return ec2_volume_ref
  2904. @require_context
  2905. @pick_context_manager_reader
  2906. def ec2_volume_get_by_uuid(context, volume_uuid):
  2907. result = _ec2_volume_get_query(context).\
  2908. filter_by(uuid=volume_uuid).\
  2909. first()
  2910. if not result:
  2911. raise exception.VolumeNotFound(volume_id=volume_uuid)
  2912. return result
  2913. @require_context
  2914. @pick_context_manager_reader
  2915. def ec2_volume_get_by_id(context, volume_id):
  2916. result = _ec2_volume_get_query(context).\
  2917. filter_by(id=volume_id).\
  2918. first()
  2919. if not result:
  2920. raise exception.VolumeNotFound(volume_id=volume_id)
  2921. return result
  2922. @require_context
  2923. @pick_context_manager_writer
  2924. def ec2_snapshot_create(context, snapshot_uuid, id=None):
  2925. """Create ec2 compatible snapshot by provided uuid."""
  2926. ec2_snapshot_ref = models.SnapshotIdMapping()
  2927. ec2_snapshot_ref.update({'uuid': snapshot_uuid})
  2928. if id is not None:
  2929. ec2_snapshot_ref.update({'id': id})
  2930. ec2_snapshot_ref.save(context.session)
  2931. return ec2_snapshot_ref
  2932. @require_context
  2933. @pick_context_manager_reader
  2934. def ec2_snapshot_get_by_ec2_id(context, ec2_id):
  2935. result = _ec2_snapshot_get_query(context).\
  2936. filter_by(id=ec2_id).\
  2937. first()
  2938. if not result:
  2939. raise exception.SnapshotNotFound(snapshot_id=ec2_id)
  2940. return result
  2941. @require_context
  2942. @pick_context_manager_reader
  2943. def ec2_snapshot_get_by_uuid(context, snapshot_uuid):
  2944. result = _ec2_snapshot_get_query(context).\
  2945. filter_by(uuid=snapshot_uuid).\
  2946. first()
  2947. if not result:
  2948. raise exception.SnapshotNotFound(snapshot_id=snapshot_uuid)
  2949. return result
  2950. ###################
  2951. def _block_device_mapping_get_query(context, columns_to_join=None):
  2952. if columns_to_join is None:
  2953. columns_to_join = []
  2954. query = model_query(context, models.BlockDeviceMapping)
  2955. for column in columns_to_join:
  2956. query = query.options(joinedload(column))
  2957. return query
  2958. def _scrub_empty_str_values(dct, keys_to_scrub):
  2959. """Remove any keys found in sequence keys_to_scrub from the dict
  2960. if they have the value ''.
  2961. """
  2962. for key in keys_to_scrub:
  2963. if key in dct and dct[key] == '':
  2964. del dct[key]
  2965. def _from_legacy_values(values, legacy, allow_updates=False):
  2966. if legacy:
  2967. if allow_updates and block_device.is_safe_for_update(values):
  2968. return values
  2969. else:
  2970. return block_device.BlockDeviceDict.from_legacy(values)
  2971. else:
  2972. return values
  2973. def _set_or_validate_uuid(values):
  2974. uuid = values.get('uuid')
  2975. # values doesn't contain uuid, or it's blank
  2976. if not uuid:
  2977. values['uuid'] = uuidutils.generate_uuid()
  2978. # values contains a uuid
  2979. else:
  2980. if not uuidutils.is_uuid_like(uuid):
  2981. raise exception.InvalidUUID(uuid=uuid)
  2982. @require_context
  2983. @pick_context_manager_writer
  2984. def block_device_mapping_create(context, values, legacy=True):
  2985. _scrub_empty_str_values(values, ['volume_size'])
  2986. values = _from_legacy_values(values, legacy)
  2987. convert_objects_related_datetimes(values)
  2988. _set_or_validate_uuid(values)
  2989. bdm_ref = models.BlockDeviceMapping()
  2990. bdm_ref.update(values)
  2991. bdm_ref.save(context.session)
  2992. return bdm_ref
  2993. @require_context
  2994. @pick_context_manager_writer
  2995. def block_device_mapping_update(context, bdm_id, values, legacy=True):
  2996. _scrub_empty_str_values(values, ['volume_size'])
  2997. values = _from_legacy_values(values, legacy, allow_updates=True)
  2998. convert_objects_related_datetimes(values)
  2999. query = _block_device_mapping_get_query(context).filter_by(id=bdm_id)
  3000. query.update(values)
  3001. return query.first()
  3002. @pick_context_manager_writer
  3003. def block_device_mapping_update_or_create(context, values, legacy=True):
  3004. # TODO(mdbooth): Remove this method entirely. Callers should know whether
  3005. # they require update or create, and call the appropriate method.
  3006. _scrub_empty_str_values(values, ['volume_size'])
  3007. values = _from_legacy_values(values, legacy, allow_updates=True)
  3008. convert_objects_related_datetimes(values)
  3009. result = None
  3010. # NOTE(xqueralt,danms): Only update a BDM when device_name or
  3011. # uuid was provided. Prefer the uuid, if available, but fall
  3012. # back to device_name if no uuid is provided, which can happen
  3013. # for BDMs created before we had a uuid. We allow empty device
  3014. # names so they will be set later by the manager.
  3015. if 'uuid' in values:
  3016. query = _block_device_mapping_get_query(context)
  3017. result = query.filter_by(instance_uuid=values['instance_uuid'],
  3018. uuid=values['uuid']).one_or_none()
  3019. if not result and values['device_name']:
  3020. query = _block_device_mapping_get_query(context)
  3021. result = query.filter_by(instance_uuid=values['instance_uuid'],
  3022. device_name=values['device_name']).first()
  3023. if result:
  3024. result.update(values)
  3025. else:
  3026. # Either the device_name or uuid doesn't exist in the database yet, or
  3027. # neither was provided. Both cases mean creating a new BDM.
  3028. _set_or_validate_uuid(values)
  3029. result = models.BlockDeviceMapping(**values)
  3030. result.save(context.session)
  3031. # NOTE(xqueralt): Prevent from having multiple swap devices for the
  3032. # same instance. This will delete all the existing ones.
  3033. if block_device.new_format_is_swap(values):
  3034. query = _block_device_mapping_get_query(context)
  3035. query = query.filter_by(instance_uuid=values['instance_uuid'],
  3036. source_type='blank', guest_format='swap')
  3037. query = query.filter(models.BlockDeviceMapping.id != result.id)
  3038. query.soft_delete()
  3039. return result
  3040. @require_context
  3041. @pick_context_manager_reader_allow_async
  3042. def block_device_mapping_get_all_by_instance_uuids(context, instance_uuids):
  3043. if not instance_uuids:
  3044. return []
  3045. return _block_device_mapping_get_query(context).filter(
  3046. models.BlockDeviceMapping.instance_uuid.in_(instance_uuids)).all()
  3047. @require_context
  3048. @pick_context_manager_reader_allow_async
  3049. def block_device_mapping_get_all_by_instance(context, instance_uuid):
  3050. return _block_device_mapping_get_query(context).\
  3051. filter_by(instance_uuid=instance_uuid).\
  3052. all()
  3053. @require_context
  3054. @pick_context_manager_reader
  3055. def block_device_mapping_get_all_by_volume_id(context, volume_id,
  3056. columns_to_join=None):
  3057. return _block_device_mapping_get_query(context,
  3058. columns_to_join=columns_to_join).\
  3059. filter_by(volume_id=volume_id).\
  3060. all()
  3061. @require_context
  3062. @pick_context_manager_reader
  3063. def block_device_mapping_get_by_instance_and_volume_id(context, volume_id,
  3064. instance_uuid,
  3065. columns_to_join=None):
  3066. return _block_device_mapping_get_query(context,
  3067. columns_to_join=columns_to_join).\
  3068. filter_by(volume_id=volume_id).\
  3069. filter_by(instance_uuid=instance_uuid).\
  3070. first()
  3071. @require_context
  3072. @pick_context_manager_writer
  3073. def block_device_mapping_destroy(context, bdm_id):
  3074. _block_device_mapping_get_query(context).\
  3075. filter_by(id=bdm_id).\
  3076. soft_delete()
  3077. @require_context
  3078. @pick_context_manager_writer
  3079. def block_device_mapping_destroy_by_instance_and_volume(context, instance_uuid,
  3080. volume_id):
  3081. _block_device_mapping_get_query(context).\
  3082. filter_by(instance_uuid=instance_uuid).\
  3083. filter_by(volume_id=volume_id).\
  3084. soft_delete()
  3085. @require_context
  3086. @pick_context_manager_writer
  3087. def block_device_mapping_destroy_by_instance_and_device(context, instance_uuid,
  3088. device_name):
  3089. _block_device_mapping_get_query(context).\
  3090. filter_by(instance_uuid=instance_uuid).\
  3091. filter_by(device_name=device_name).\
  3092. soft_delete()
  3093. ###################
  3094. @require_context
  3095. @pick_context_manager_writer
  3096. def security_group_create(context, values):
  3097. security_group_ref = models.SecurityGroup()
  3098. # FIXME(devcamcar): Unless I do this, rules fails with lazy load exception
  3099. # once save() is called. This will get cleaned up in next orm pass.
  3100. security_group_ref.rules = []
  3101. security_group_ref.update(values)
  3102. try:
  3103. with get_context_manager(context).writer.savepoint.using(context):
  3104. security_group_ref.save(context.session)
  3105. except db_exc.DBDuplicateEntry:
  3106. raise exception.SecurityGroupExists(
  3107. project_id=values['project_id'],
  3108. security_group_name=values['name'])
  3109. return security_group_ref
  3110. def _security_group_get_query(context, read_deleted=None,
  3111. project_only=False, join_rules=True):
  3112. query = model_query(context, models.SecurityGroup,
  3113. read_deleted=read_deleted, project_only=project_only)
  3114. if join_rules:
  3115. query = query.options(_joinedload_all('rules.grantee_group'))
  3116. return query
  3117. def _security_group_get_by_names(context, group_names):
  3118. """Get security group models for a project by a list of names.
  3119. Raise SecurityGroupNotFoundForProject for a name not found.
  3120. """
  3121. query = _security_group_get_query(context, read_deleted="no",
  3122. join_rules=False).\
  3123. filter_by(project_id=context.project_id).\
  3124. filter(models.SecurityGroup.name.in_(group_names))
  3125. sg_models = query.all()
  3126. if len(sg_models) == len(group_names):
  3127. return sg_models
  3128. # Find the first one missing and raise
  3129. group_names_from_models = [x.name for x in sg_models]
  3130. for group_name in group_names:
  3131. if group_name not in group_names_from_models:
  3132. raise exception.SecurityGroupNotFoundForProject(
  3133. project_id=context.project_id, security_group_id=group_name)
  3134. # Not Reached
  3135. @require_context
  3136. @pick_context_manager_reader
  3137. def security_group_get_all(context):
  3138. return _security_group_get_query(context).all()
  3139. @require_context
  3140. @pick_context_manager_reader
  3141. def security_group_get(context, security_group_id, columns_to_join=None):
  3142. join_rules = columns_to_join and 'rules' in columns_to_join
  3143. if join_rules:
  3144. columns_to_join.remove('rules')
  3145. query = _security_group_get_query(context, project_only=True,
  3146. join_rules=join_rules).\
  3147. filter_by(id=security_group_id)
  3148. if columns_to_join is None:
  3149. columns_to_join = []
  3150. for column in columns_to_join:
  3151. if column.startswith('instances'):
  3152. query = query.options(_joinedload_all(column))
  3153. result = query.first()
  3154. if not result:
  3155. raise exception.SecurityGroupNotFound(
  3156. security_group_id=security_group_id)
  3157. return result
  3158. @require_context
  3159. @pick_context_manager_reader
  3160. def security_group_get_by_name(context, project_id, group_name,
  3161. columns_to_join=None):
  3162. query = _security_group_get_query(context,
  3163. read_deleted="no", join_rules=False).\
  3164. filter_by(project_id=project_id).\
  3165. filter_by(name=group_name)
  3166. if columns_to_join is None:
  3167. columns_to_join = ['instances', 'rules.grantee_group']
  3168. for column in columns_to_join:
  3169. query = query.options(_joinedload_all(column))
  3170. result = query.first()
  3171. if not result:
  3172. raise exception.SecurityGroupNotFoundForProject(
  3173. project_id=project_id, security_group_id=group_name)
  3174. return result
  3175. @require_context
  3176. @pick_context_manager_reader
  3177. def security_group_get_by_project(context, project_id):
  3178. return _security_group_get_query(context, read_deleted="no").\
  3179. filter_by(project_id=project_id).\
  3180. all()
  3181. @require_context
  3182. @pick_context_manager_reader
  3183. def security_group_get_by_instance(context, instance_uuid):
  3184. return _security_group_get_query(context, read_deleted="no").\
  3185. join(models.SecurityGroup.instances).\
  3186. filter_by(uuid=instance_uuid).\
  3187. all()
  3188. @require_context
  3189. @pick_context_manager_reader
  3190. def security_group_in_use(context, group_id):
  3191. # Are there any instances that haven't been deleted
  3192. # that include this group?
  3193. inst_assoc = model_query(context,
  3194. models.SecurityGroupInstanceAssociation,
  3195. read_deleted="no").\
  3196. filter_by(security_group_id=group_id).\
  3197. all()
  3198. for ia in inst_assoc:
  3199. num_instances = model_query(context, models.Instance,
  3200. read_deleted="no").\
  3201. filter_by(uuid=ia.instance_uuid).\
  3202. count()
  3203. if num_instances:
  3204. return True
  3205. return False
  3206. @require_context
  3207. @pick_context_manager_writer
  3208. def security_group_update(context, security_group_id, values,
  3209. columns_to_join=None):
  3210. query = model_query(context, models.SecurityGroup).filter_by(
  3211. id=security_group_id)
  3212. if columns_to_join:
  3213. for column in columns_to_join:
  3214. query = query.options(_joinedload_all(column))
  3215. security_group_ref = query.first()
  3216. if not security_group_ref:
  3217. raise exception.SecurityGroupNotFound(
  3218. security_group_id=security_group_id)
  3219. security_group_ref.update(values)
  3220. name = security_group_ref['name']
  3221. project_id = security_group_ref['project_id']
  3222. try:
  3223. security_group_ref.save(context.session)
  3224. except db_exc.DBDuplicateEntry:
  3225. raise exception.SecurityGroupExists(
  3226. project_id=project_id,
  3227. security_group_name=name)
  3228. return security_group_ref
def security_group_ensure_default(context):
    """Ensure default security group exists for a project_id.

    Returns the existing or newly created 'default' group for
    context.project_id.
    """
    try:
        # NOTE(rpodolyaka): create the default security group, if it doesn't
        # exist. This must be done in a separate transaction, so that
        # this one is not aborted in case a concurrent one succeeds first
        # and the unique constraint for security group names is violated
        # by a concurrent INSERT
        with get_context_manager(context).writer.independent.using(context):
            return _security_group_ensure_default(context)
    except exception.SecurityGroupExists:
        # NOTE(rpodolyaka): a concurrent transaction has succeeded first,
        # suppress the error and proceed
        return security_group_get_by_name(context, context.project_id,
                                          'default')
  3244. @pick_context_manager_writer
  3245. def _security_group_ensure_default(context):
  3246. try:
  3247. default_group = _security_group_get_by_names(context, ['default'])[0]
  3248. except exception.NotFound:
  3249. values = {'name': 'default',
  3250. 'description': 'default',
  3251. 'user_id': context.user_id,
  3252. 'project_id': context.project_id}
  3253. default_group = security_group_create(context, values)
  3254. return default_group
  3255. @require_context
  3256. @pick_context_manager_writer
  3257. def security_group_destroy(context, security_group_id):
  3258. model_query(context, models.SecurityGroup).\
  3259. filter_by(id=security_group_id).\
  3260. soft_delete()
  3261. model_query(context, models.SecurityGroupInstanceAssociation).\
  3262. filter_by(security_group_id=security_group_id).\
  3263. soft_delete()
  3264. model_query(context, models.SecurityGroupIngressRule).\
  3265. filter_by(group_id=security_group_id).\
  3266. soft_delete()
  3267. model_query(context, models.SecurityGroupIngressRule).\
  3268. filter_by(parent_group_id=security_group_id).\
  3269. soft_delete()
  3270. ###################
  3271. def _security_group_rule_create(context, values):
  3272. security_group_rule_ref = models.SecurityGroupIngressRule()
  3273. security_group_rule_ref.update(values)
  3274. security_group_rule_ref.save(context.session)
  3275. return security_group_rule_ref
  3276. def _security_group_rule_get_query(context):
  3277. return model_query(context, models.SecurityGroupIngressRule)
  3278. @require_context
  3279. @pick_context_manager_reader
  3280. def security_group_rule_get(context, security_group_rule_id):
  3281. result = (_security_group_rule_get_query(context).
  3282. filter_by(id=security_group_rule_id).
  3283. first())
  3284. if not result:
  3285. raise exception.SecurityGroupNotFoundForRule(
  3286. rule_id=security_group_rule_id)
  3287. return result
  3288. @require_context
  3289. @pick_context_manager_reader
  3290. def security_group_rule_get_by_security_group(context, security_group_id,
  3291. columns_to_join=None):
  3292. if columns_to_join is None:
  3293. columns_to_join = ['grantee_group.instances.system_metadata',
  3294. 'grantee_group.instances.info_cache']
  3295. query = (_security_group_rule_get_query(context).
  3296. filter_by(parent_group_id=security_group_id))
  3297. for column in columns_to_join:
  3298. query = query.options(_joinedload_all(column))
  3299. return query.all()
  3300. @require_context
  3301. @pick_context_manager_reader
  3302. def security_group_rule_get_by_instance(context, instance_uuid):
  3303. return (_security_group_rule_get_query(context).
  3304. join('parent_group', 'instances').
  3305. filter_by(uuid=instance_uuid).
  3306. options(joinedload('grantee_group')).
  3307. all())
  3308. @require_context
  3309. @pick_context_manager_writer
  3310. def security_group_rule_create(context, values):
  3311. return _security_group_rule_create(context, values)
  3312. @require_context
  3313. @pick_context_manager_writer
  3314. def security_group_rule_destroy(context, security_group_rule_id):
  3315. count = (_security_group_rule_get_query(context).
  3316. filter_by(id=security_group_rule_id).
  3317. soft_delete())
  3318. if count == 0:
  3319. raise exception.SecurityGroupNotFoundForRule(
  3320. rule_id=security_group_rule_id)
  3321. @require_context
  3322. @pick_context_manager_reader
  3323. def security_group_rule_count_by_group(context, security_group_id):
  3324. return (model_query(context, models.SecurityGroupIngressRule,
  3325. read_deleted="no").
  3326. filter_by(parent_group_id=security_group_id).
  3327. count())
  3328. ###################
  3329. @pick_context_manager_writer
  3330. def provider_fw_rule_create(context, rule):
  3331. fw_rule_ref = models.ProviderFirewallRule()
  3332. fw_rule_ref.update(rule)
  3333. fw_rule_ref.save(context.session)
  3334. return fw_rule_ref
  3335. @pick_context_manager_reader
  3336. def provider_fw_rule_get_all(context):
  3337. return model_query(context, models.ProviderFirewallRule).all()
  3338. @pick_context_manager_writer
  3339. def provider_fw_rule_destroy(context, rule_id):
  3340. context.session.query(models.ProviderFirewallRule).\
  3341. filter_by(id=rule_id).\
  3342. soft_delete()
  3343. ###################
  3344. @require_context
  3345. @pick_context_manager_writer
  3346. def project_get_networks(context, project_id, associate=True):
  3347. # NOTE(tr3buchet): as before this function will associate
  3348. # a project with a network if it doesn't have one and
  3349. # associate is true
  3350. result = model_query(context, models.Network, read_deleted="no").\
  3351. filter_by(project_id=project_id).\
  3352. all()
  3353. if not result:
  3354. if not associate:
  3355. return []
  3356. return [network_associate(context, project_id)]
  3357. return result
  3358. ###################
  3359. @pick_context_manager_writer
  3360. def migration_create(context, values):
  3361. migration = models.Migration()
  3362. migration.update(values)
  3363. migration.save(context.session)
  3364. return migration
  3365. @oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
  3366. @pick_context_manager_writer
  3367. def migration_update(context, id, values):
  3368. migration = migration_get(context, id)
  3369. migration.update(values)
  3370. return migration
  3371. @pick_context_manager_reader
  3372. def migration_get(context, id):
  3373. result = model_query(context, models.Migration, read_deleted="yes").\
  3374. filter_by(id=id).\
  3375. first()
  3376. if not result:
  3377. raise exception.MigrationNotFound(migration_id=id)
  3378. return result
  3379. @pick_context_manager_reader
  3380. def migration_get_by_uuid(context, migration_uuid):
  3381. result = model_query(context, models.Migration, read_deleted="yes").\
  3382. filter_by(uuid=migration_uuid).\
  3383. first()
  3384. if not result:
  3385. raise exception.MigrationNotFound(migration_id=migration_uuid)
  3386. return result
  3387. @pick_context_manager_reader
  3388. def migration_get_by_id_and_instance(context, id, instance_uuid):
  3389. result = model_query(context, models.Migration).\
  3390. filter_by(id=id).\
  3391. filter_by(instance_uuid=instance_uuid).\
  3392. first()
  3393. if not result:
  3394. raise exception.MigrationNotFoundForInstance(migration_id=id,
  3395. instance_id=instance_uuid)
  3396. return result
  3397. @pick_context_manager_reader
  3398. def migration_get_by_instance_and_status(context, instance_uuid, status):
  3399. result = model_query(context, models.Migration, read_deleted="yes").\
  3400. filter_by(instance_uuid=instance_uuid).\
  3401. filter_by(status=status).\
  3402. first()
  3403. if not result:
  3404. raise exception.MigrationNotFoundByStatus(instance_id=instance_uuid,
  3405. status=status)
  3406. return result
  3407. @pick_context_manager_reader_allow_async
  3408. def migration_get_unconfirmed_by_dest_compute(context, confirm_window,
  3409. dest_compute):
  3410. confirm_window = (timeutils.utcnow() -
  3411. datetime.timedelta(seconds=confirm_window))
  3412. return model_query(context, models.Migration, read_deleted="yes").\
  3413. filter(models.Migration.updated_at <= confirm_window).\
  3414. filter_by(status="finished").\
  3415. filter_by(dest_compute=dest_compute).\
  3416. all()
@pick_context_manager_reader
def migration_get_in_progress_by_host_and_node(context, host, node):
    """Return migrations still in progress where the given host/node is
    either the source or the destination, with instance system_metadata
    eagerly loaded.
    """
    # TODO(mriedem): Tracking what various code flows set for
    # migration status is nutty, since it happens all over the place
    # and several of the statuses are redundant (done and completed).
    # We need to define these in an enum somewhere and just update
    # that one central place that defines what "in progress" means.
    # NOTE(mriedem): The 'finished' status is not in this list because
    # 'finished' means a resize is finished on the destination host
    # and the instance is in VERIFY_RESIZE state, so the end state
    # for a resize is actually 'confirmed' or 'reverted'.
    return model_query(context, models.Migration).\
            filter(or_(and_(models.Migration.source_compute == host,
                            models.Migration.source_node == node),
                       and_(models.Migration.dest_compute == host,
                            models.Migration.dest_node == node))).\
            filter(~models.Migration.status.in_(['confirmed', 'reverted',
                                                 'error', 'failed',
                                                 'completed', 'cancelled',
                                                 'done'])).\
            options(_joinedload_all('instance.system_metadata')).\
            all()
  3439. @pick_context_manager_reader
  3440. def migration_get_in_progress_by_instance(context, instance_uuid,
  3441. migration_type=None):
  3442. # TODO(Shaohe Feng) we should share the in-progress list.
  3443. # TODO(Shaohe Feng) will also summarize all status to a new
  3444. # MigrationStatus class.
  3445. query = model_query(context, models.Migration).\
  3446. filter_by(instance_uuid=instance_uuid).\
  3447. filter(models.Migration.status.in_(['queued', 'preparing',
  3448. 'running',
  3449. 'post-migrating']))
  3450. if migration_type:
  3451. query = query.filter(models.Migration.migration_type == migration_type)
  3452. return query.all()
@pick_context_manager_reader
def migration_get_all_by_filters(context, filters,
                                 sort_keys=None, sort_dirs=None,
                                 limit=None, marker=None):
    """Return migrations matching the given filters, optionally paginated.

    :param filters: dict of optional filters: 'uuid' (str or list),
        'status' (str or list), 'host' (matches source OR dest compute),
        'source_compute', 'migration_type', 'hidden', 'instance_uuid',
        'user_id', 'project_id', plus changes-since/changes-before time
        filters handled by _get_query_nova_resource_by_changes_time.
    :param sort_keys: attribute names to sort by; defaults applied by
        process_sort_params when any paging/sorting argument is given.
    :param sort_dirs: sort directions matching sort_keys.
    :param limit: max rows to return; limit == 0 short-circuits to [].
    :param marker: uuid of the last migration on the previous page.
    :raises: MarkerNotFound if marker matches no migration.
    """
    if limit == 0:
        return []
    query = model_query(context, models.Migration)
    if "uuid" in filters:
        # The uuid filter is here for the MigrationLister and multi-cell
        # paging support in the compute API.
        uuid = filters["uuid"]
        uuid = [uuid] if isinstance(uuid, six.string_types) else uuid
        query = query.filter(models.Migration.uuid.in_(uuid))
    model_object = models.Migration
    query = _get_query_nova_resource_by_changes_time(query,
                                                     filters,
                                                     model_object)
    if "status" in filters:
        status = filters["status"]
        status = [status] if isinstance(status, six.string_types) else status
        query = query.filter(models.Migration.status.in_(status))
    if "host" in filters:
        # 'host' is matched against either end of the migration.
        host = filters["host"]
        query = query.filter(or_(models.Migration.source_compute == host,
                                 models.Migration.dest_compute == host))
    elif "source_compute" in filters:
        host = filters['source_compute']
        query = query.filter(models.Migration.source_compute == host)
    if "migration_type" in filters:
        migtype = filters["migration_type"]
        query = query.filter(models.Migration.migration_type == migtype)
    if "hidden" in filters:
        hidden = filters["hidden"]
        query = query.filter(models.Migration.hidden == hidden)
    if "instance_uuid" in filters:
        instance_uuid = filters["instance_uuid"]
        query = query.filter(models.Migration.instance_uuid == instance_uuid)
    if 'user_id' in filters:
        user_id = filters['user_id']
        query = query.filter(models.Migration.user_id == user_id)
    if 'project_id' in filters:
        project_id = filters['project_id']
        query = query.filter(models.Migration.project_id == project_id)
    if marker:
        # paginate_query needs the marker as a model row, not a uuid.
        try:
            marker = migration_get_by_uuid(context, marker)
        except exception.MigrationNotFound:
            raise exception.MarkerNotFound(marker=marker)
    if limit or marker or sort_keys or sort_dirs:
        # Default sort by desc(['created_at', 'id'])
        sort_keys, sort_dirs = process_sort_params(sort_keys, sort_dirs,
                                                   default_dir='desc')
        return sqlalchemyutils.paginate_query(query,
                                              models.Migration,
                                              limit=limit,
                                              sort_keys=sort_keys,
                                              marker=marker,
                                              sort_dirs=sort_dirs).all()
    else:
        return query.all()
  3513. @require_context
  3514. @pick_context_manager_reader_allow_async
  3515. def migration_get_by_sort_filters(context, sort_keys, sort_dirs, values):
  3516. """Attempt to get a single migration based on a combination of sort
  3517. keys, directions and filter values. This is used to try to find a
  3518. marker migration when we don't have a marker uuid.
  3519. This returns just a uuid of the migration that matched.
  3520. """
  3521. model = models.Migration
  3522. return _model_get_uuid_by_sort_filters(context, model, sort_keys,
  3523. sort_dirs, values)
  3524. @pick_context_manager_writer
  3525. def migration_migrate_to_uuid(context, count):
  3526. # Avoid circular import
  3527. from nova import objects
  3528. db_migrations = model_query(context, models.Migration).filter_by(
  3529. uuid=None).limit(count).all()
  3530. done = 0
  3531. for db_migration in db_migrations:
  3532. mig = objects.Migration(context)
  3533. mig._from_db_object(context, mig, db_migration)
  3534. done += 1
  3535. # We don't have any situation where we can (detectably) not
  3536. # migrate a thing, so report anything that matched as "completed".
  3537. return done, done
  3538. ##################
  3539. @pick_context_manager_reader
  3540. def console_pool_get_all_by_host_type(context, host, console_type):
  3541. return model_query(context, models.ConsolePool, read_deleted="no").\
  3542. filter_by(host=host).\
  3543. filter_by(console_type=console_type).\
  3544. options(joinedload('consoles')).\
  3545. all()
  3546. ########################
  3547. # User-provided metadata
  3548. def _instance_metadata_get_multi(context, instance_uuids):
  3549. if not instance_uuids:
  3550. return []
  3551. return model_query(context, models.InstanceMetadata).filter(
  3552. models.InstanceMetadata.instance_uuid.in_(instance_uuids))
  3553. def _instance_metadata_get_query(context, instance_uuid):
  3554. return model_query(context, models.InstanceMetadata, read_deleted="no").\
  3555. filter_by(instance_uuid=instance_uuid)
  3556. @require_context
  3557. @pick_context_manager_reader
  3558. def instance_metadata_get(context, instance_uuid):
  3559. rows = _instance_metadata_get_query(context, instance_uuid).all()
  3560. return {row['key']: row['value'] for row in rows}
  3561. @require_context
  3562. @oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
  3563. @pick_context_manager_writer
  3564. def instance_metadata_delete(context, instance_uuid, key):
  3565. _instance_metadata_get_query(context, instance_uuid).\
  3566. filter_by(key=key).\
  3567. soft_delete()
@require_context
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
@pick_context_manager_writer
def instance_metadata_update(context, instance_uuid, metadata, delete):
    """Update the user-provided metadata of an instance.

    :param instance_uuid: UUID of the instance to update
    :param metadata: dict of key/value pairs to store
    :param delete: if True, soft-delete existing keys not present in
        ``metadata`` (treat ``metadata`` as the complete new set rather
        than a partial update)
    :returns: the ``metadata`` dict that was passed in
    """
    all_keys = metadata.keys()
    if delete:
        # Drop keys the caller no longer wants. synchronize_session is
        # disabled since the deleted rows are not touched again below.
        _instance_metadata_get_query(context, instance_uuid).\
            filter(~models.InstanceMetadata.key.in_(all_keys)).\
            soft_delete(synchronize_session=False)

    # Update in place the rows whose keys already exist.
    already_existing_keys = []
    meta_refs = _instance_metadata_get_query(context, instance_uuid).\
        filter(models.InstanceMetadata.key.in_(all_keys)).\
        all()

    for meta_ref in meta_refs:
        already_existing_keys.append(meta_ref.key)
        meta_ref.update({"value": metadata[meta_ref.key]})

    # Insert fresh rows for keys that had no existing record.
    new_keys = set(all_keys) - set(already_existing_keys)
    for key in new_keys:
        meta_ref = models.InstanceMetadata()
        meta_ref.update({"key": key, "value": metadata[key],
                         "instance_uuid": instance_uuid})
        context.session.add(meta_ref)

    return metadata
  3591. #######################
  3592. # System-owned metadata
  3593. def _instance_system_metadata_get_multi(context, instance_uuids):
  3594. if not instance_uuids:
  3595. return []
  3596. return model_query(context, models.InstanceSystemMetadata,
  3597. read_deleted='yes').filter(
  3598. models.InstanceSystemMetadata.instance_uuid.in_(instance_uuids))
  3599. def _instance_system_metadata_get_query(context, instance_uuid):
  3600. return model_query(context, models.InstanceSystemMetadata).\
  3601. filter_by(instance_uuid=instance_uuid)
  3602. @require_context
  3603. @pick_context_manager_reader
  3604. def instance_system_metadata_get(context, instance_uuid):
  3605. rows = _instance_system_metadata_get_query(context, instance_uuid).all()
  3606. return {row['key']: row['value'] for row in rows}
@require_context
@pick_context_manager_writer
def instance_system_metadata_update(context, instance_uuid, metadata, delete):
    """Update the system-owned metadata of an instance.

    :param instance_uuid: UUID of the instance to update
    :param metadata: dict of key/value pairs to store
    :param delete: if True, soft-delete existing keys not present in
        ``metadata`` (treat ``metadata`` as the complete new set rather
        than a partial update)
    :returns: the ``metadata`` dict that was passed in
    """
    all_keys = metadata.keys()
    if delete:
        # Drop keys the caller no longer wants. synchronize_session is
        # disabled since the deleted rows are not touched again below.
        _instance_system_metadata_get_query(context, instance_uuid).\
            filter(~models.InstanceSystemMetadata.key.in_(all_keys)).\
            soft_delete(synchronize_session=False)

    # Update in place the rows whose keys already exist.
    already_existing_keys = []
    meta_refs = _instance_system_metadata_get_query(context, instance_uuid).\
        filter(models.InstanceSystemMetadata.key.in_(all_keys)).\
        all()

    for meta_ref in meta_refs:
        already_existing_keys.append(meta_ref.key)
        meta_ref.update({"value": metadata[meta_ref.key]})

    # Insert fresh rows for keys that had no existing record.
    new_keys = set(all_keys) - set(already_existing_keys)
    for key in new_keys:
        meta_ref = models.InstanceSystemMetadata()
        meta_ref.update({"key": key, "value": metadata[key],
                         "instance_uuid": instance_uuid})
        context.session.add(meta_ref)

    return metadata
  3629. ####################
  3630. @pick_context_manager_writer
  3631. def agent_build_create(context, values):
  3632. agent_build_ref = models.AgentBuild()
  3633. agent_build_ref.update(values)
  3634. try:
  3635. agent_build_ref.save(context.session)
  3636. except db_exc.DBDuplicateEntry:
  3637. raise exception.AgentBuildExists(hypervisor=values['hypervisor'],
  3638. os=values['os'], architecture=values['architecture'])
  3639. return agent_build_ref
  3640. @pick_context_manager_reader
  3641. def agent_build_get_by_triple(context, hypervisor, os, architecture):
  3642. return model_query(context, models.AgentBuild, read_deleted="no").\
  3643. filter_by(hypervisor=hypervisor).\
  3644. filter_by(os=os).\
  3645. filter_by(architecture=architecture).\
  3646. first()
  3647. @pick_context_manager_reader
  3648. def agent_build_get_all(context, hypervisor=None):
  3649. if hypervisor:
  3650. return model_query(context, models.AgentBuild, read_deleted="no").\
  3651. filter_by(hypervisor=hypervisor).\
  3652. all()
  3653. else:
  3654. return model_query(context, models.AgentBuild, read_deleted="no").\
  3655. all()
  3656. @pick_context_manager_writer
  3657. def agent_build_destroy(context, agent_build_id):
  3658. rows_affected = model_query(context, models.AgentBuild).filter_by(
  3659. id=agent_build_id).soft_delete()
  3660. if rows_affected == 0:
  3661. raise exception.AgentBuildNotFound(id=agent_build_id)
  3662. @pick_context_manager_writer
  3663. def agent_build_update(context, agent_build_id, values):
  3664. rows_affected = model_query(context, models.AgentBuild).\
  3665. filter_by(id=agent_build_id).\
  3666. update(values)
  3667. if rows_affected == 0:
  3668. raise exception.AgentBuildNotFound(id=agent_build_id)
  3669. ####################
  3670. @require_context
  3671. @pick_context_manager_reader_allow_async
  3672. def bw_usage_get(context, uuid, start_period, mac):
  3673. values = {'start_period': start_period}
  3674. values = convert_objects_related_datetimes(values, 'start_period')
  3675. return model_query(context, models.BandwidthUsage, read_deleted="yes").\
  3676. filter_by(start_period=values['start_period']).\
  3677. filter_by(uuid=uuid).\
  3678. filter_by(mac=mac).\
  3679. first()
  3680. @require_context
  3681. @pick_context_manager_reader_allow_async
  3682. def bw_usage_get_by_uuids(context, uuids, start_period):
  3683. values = {'start_period': start_period}
  3684. values = convert_objects_related_datetimes(values, 'start_period')
  3685. return (
  3686. model_query(context, models.BandwidthUsage, read_deleted="yes").
  3687. filter(models.BandwidthUsage.uuid.in_(uuids)).
  3688. filter_by(start_period=values['start_period']).
  3689. all()
  3690. )
@require_context
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
@pick_context_manager_writer
def bw_usage_update(context, uuid, mac, start_period, bw_in, bw_out,
                    last_ctr_in, last_ctr_out, last_refreshed=None):
    """Update (or create) the bandwidth-usage record for an instance NIC.

    :param uuid: instance UUID
    :param mac: MAC address of the interface
    :param start_period: start of the usage period the record covers
    :param bw_in/bw_out: cumulative bytes in/out for the period
    :param last_ctr_in/last_ctr_out: last raw counter readings
    :param last_refreshed: timestamp of this refresh; defaults to now
    :returns: the updated or newly-created BandwidthUsage row
    """
    if last_refreshed is None:
        last_refreshed = timeutils.utcnow()

    # NOTE(comstud): More often than not, we'll be updating records vs
    # creating records. Optimize accordingly, trying to update existing
    # records. Fall back to creation when no rows are updated.
    ts_values = {'start_period': start_period,
                 'last_refreshed': last_refreshed}
    ts_keys = ('start_period', 'last_refreshed')
    ts_values = convert_objects_related_datetimes(ts_values, *ts_keys)

    values = {'last_refreshed': ts_values['last_refreshed'],
              'last_ctr_in': last_ctr_in,
              'last_ctr_out': last_ctr_out,
              'bw_in': bw_in,
              'bw_out': bw_out}

    # NOTE(pkholkin): order_by() is needed here to ensure that the
    # same record is updated every time. It can be removed after adding
    # unique constraint to this model.
    bw_usage = model_query(context, models.BandwidthUsage,
                           read_deleted='yes').\
        filter_by(start_period=ts_values['start_period']).\
        filter_by(uuid=uuid).\
        filter_by(mac=mac).\
        order_by(asc(models.BandwidthUsage.id)).first()

    if bw_usage:
        bw_usage.update(values)
        return bw_usage

    # No existing row matched: create one.
    bwusage = models.BandwidthUsage()
    bwusage.start_period = ts_values['start_period']
    bwusage.uuid = uuid
    bwusage.mac = mac
    bwusage.last_refreshed = ts_values['last_refreshed']
    bwusage.bw_in = bw_in
    bwusage.bw_out = bw_out
    bwusage.last_ctr_in = last_ctr_in
    bwusage.last_ctr_out = last_ctr_out
    bwusage.save(context.session)

    return bwusage
  3733. ####################
  3734. @require_context
  3735. @pick_context_manager_reader
  3736. def vol_get_usage_by_time(context, begin):
  3737. """Return volumes usage that have been updated after a specified time."""
  3738. return model_query(context, models.VolumeUsage, read_deleted="yes").\
  3739. filter(or_(models.VolumeUsage.tot_last_refreshed == null(),
  3740. models.VolumeUsage.tot_last_refreshed > begin,
  3741. models.VolumeUsage.curr_last_refreshed == null(),
  3742. models.VolumeUsage.curr_last_refreshed > begin,
  3743. )).all()
  3744. @require_context
  3745. @pick_context_manager_writer
  3746. def vol_usage_update(context, id, rd_req, rd_bytes, wr_req, wr_bytes,
  3747. instance_id, project_id, user_id, availability_zone,
  3748. update_totals=False):
  3749. refreshed = timeutils.utcnow()
  3750. values = {}
  3751. # NOTE(dricco): We will be mostly updating current usage records vs
  3752. # updating total or creating records. Optimize accordingly.
  3753. if not update_totals:
  3754. values = {'curr_last_refreshed': refreshed,
  3755. 'curr_reads': rd_req,
  3756. 'curr_read_bytes': rd_bytes,
  3757. 'curr_writes': wr_req,
  3758. 'curr_write_bytes': wr_bytes,
  3759. 'instance_uuid': instance_id,
  3760. 'project_id': project_id,
  3761. 'user_id': user_id,
  3762. 'availability_zone': availability_zone}
  3763. else:
  3764. values = {'tot_last_refreshed': refreshed,
  3765. 'tot_reads': models.VolumeUsage.tot_reads + rd_req,
  3766. 'tot_read_bytes': models.VolumeUsage.tot_read_bytes +
  3767. rd_bytes,
  3768. 'tot_writes': models.VolumeUsage.tot_writes + wr_req,
  3769. 'tot_write_bytes': models.VolumeUsage.tot_write_bytes +
  3770. wr_bytes,
  3771. 'curr_reads': 0,
  3772. 'curr_read_bytes': 0,
  3773. 'curr_writes': 0,
  3774. 'curr_write_bytes': 0,
  3775. 'instance_uuid': instance_id,
  3776. 'project_id': project_id,
  3777. 'user_id': user_id,
  3778. 'availability_zone': availability_zone}
  3779. current_usage = model_query(context, models.VolumeUsage,
  3780. read_deleted="yes").\
  3781. filter_by(volume_id=id).\
  3782. first()
  3783. if current_usage:
  3784. if (rd_req < current_usage['curr_reads'] or
  3785. rd_bytes < current_usage['curr_read_bytes'] or
  3786. wr_req < current_usage['curr_writes'] or
  3787. wr_bytes < current_usage['curr_write_bytes']):
  3788. LOG.info("Volume(%s) has lower stats then what is in "
  3789. "the database. Instance must have been rebooted "
  3790. "or crashed. Updating totals.", id)
  3791. if not update_totals:
  3792. values['tot_reads'] = (models.VolumeUsage.tot_reads +
  3793. current_usage['curr_reads'])
  3794. values['tot_read_bytes'] = (
  3795. models.VolumeUsage.tot_read_bytes +
  3796. current_usage['curr_read_bytes'])
  3797. values['tot_writes'] = (models.VolumeUsage.tot_writes +
  3798. current_usage['curr_writes'])
  3799. values['tot_write_bytes'] = (
  3800. models.VolumeUsage.tot_write_bytes +
  3801. current_usage['curr_write_bytes'])
  3802. else:
  3803. values['tot_reads'] = (models.VolumeUsage.tot_reads +
  3804. current_usage['curr_reads'] +
  3805. rd_req)
  3806. values['tot_read_bytes'] = (
  3807. models.VolumeUsage.tot_read_bytes +
  3808. current_usage['curr_read_bytes'] + rd_bytes)
  3809. values['tot_writes'] = (models.VolumeUsage.tot_writes +
  3810. current_usage['curr_writes'] +
  3811. wr_req)
  3812. values['tot_write_bytes'] = (
  3813. models.VolumeUsage.tot_write_bytes +
  3814. current_usage['curr_write_bytes'] + wr_bytes)
  3815. current_usage.update(values)
  3816. current_usage.save(context.session)
  3817. context.session.refresh(current_usage)
  3818. return current_usage
  3819. vol_usage = models.VolumeUsage()
  3820. vol_usage.volume_id = id
  3821. vol_usage.instance_uuid = instance_id
  3822. vol_usage.project_id = project_id
  3823. vol_usage.user_id = user_id
  3824. vol_usage.availability_zone = availability_zone
  3825. if not update_totals:
  3826. vol_usage.curr_last_refreshed = refreshed
  3827. vol_usage.curr_reads = rd_req
  3828. vol_usage.curr_read_bytes = rd_bytes
  3829. vol_usage.curr_writes = wr_req
  3830. vol_usage.curr_write_bytes = wr_bytes
  3831. else:
  3832. vol_usage.tot_last_refreshed = refreshed
  3833. vol_usage.tot_reads = rd_req
  3834. vol_usage.tot_read_bytes = rd_bytes
  3835. vol_usage.tot_writes = wr_req
  3836. vol_usage.tot_write_bytes = wr_bytes
  3837. vol_usage.save(context.session)
  3838. return vol_usage
  3839. ####################
  3840. @pick_context_manager_reader
  3841. def s3_image_get(context, image_id):
  3842. """Find local s3 image represented by the provided id."""
  3843. result = model_query(context, models.S3Image, read_deleted="yes").\
  3844. filter_by(id=image_id).\
  3845. first()
  3846. if not result:
  3847. raise exception.ImageNotFound(image_id=image_id)
  3848. return result
  3849. @pick_context_manager_reader
  3850. def s3_image_get_by_uuid(context, image_uuid):
  3851. """Find local s3 image represented by the provided uuid."""
  3852. result = model_query(context, models.S3Image, read_deleted="yes").\
  3853. filter_by(uuid=image_uuid).\
  3854. first()
  3855. if not result:
  3856. raise exception.ImageNotFound(image_id=image_uuid)
  3857. return result
  3858. @pick_context_manager_writer
  3859. def s3_image_create(context, image_uuid):
  3860. """Create local s3 image represented by provided uuid."""
  3861. try:
  3862. s3_image_ref = models.S3Image()
  3863. s3_image_ref.update({'uuid': image_uuid})
  3864. s3_image_ref.save(context.session)
  3865. except Exception as e:
  3866. raise db_exc.DBError(e)
  3867. return s3_image_ref
  3868. ####################
  3869. @pick_context_manager_writer
  3870. def instance_fault_create(context, values):
  3871. """Create a new InstanceFault."""
  3872. fault_ref = models.InstanceFault()
  3873. fault_ref.update(values)
  3874. fault_ref.save(context.session)
  3875. return dict(fault_ref)
@pick_context_manager_reader
def instance_fault_get_by_instance_uuids(context, instance_uuids,
                                         latest=False):
    """Get all instance faults for the provided instance_uuids.

    :param instance_uuids: List of UUIDs of instances to grab faults for
    :param latest: Optional boolean indicating we should only return the latest
                   fault for the instance
    :returns: dict mapping each requested instance UUID to a (possibly
        empty) list of fault dicts
    """
    if not instance_uuids:
        return {}

    faults_tbl = models.InstanceFault.__table__
    # NOTE(rpodolyaka): filtering by instance_uuids is performed in both
    # code branches below for the sake of a better query plan. On change,
    # make sure to update the other one as well.
    query = model_query(context, models.InstanceFault,
                        [faults_tbl],
                        read_deleted='no')

    if latest:
        # NOTE(jaypipes): We join instance_faults to a derived table of the
        # latest faults per instance UUID. The SQL produced below looks like
        # this:
        #
        #  SELECT instance_faults.*
        #  FROM instance_faults
        #  JOIN (
        #    SELECT instance_uuid, MAX(id) AS max_id
        #    FROM instance_faults
        #    WHERE instance_uuid IN ( ... )
        #    AND deleted = 0
        #    GROUP BY instance_uuid
        #  ) AS latest_faults
        #    ON instance_faults.id = latest_faults.max_id;
        latest_faults = model_query(
            context, models.InstanceFault,
            [faults_tbl.c.instance_uuid,
             sql.func.max(faults_tbl.c.id).label('max_id')],
            read_deleted='no'
        ).filter(
            faults_tbl.c.instance_uuid.in_(instance_uuids)
        ).group_by(
            faults_tbl.c.instance_uuid
        ).subquery(name="latest_faults")

        query = query.join(latest_faults,
                           faults_tbl.c.id == latest_faults.c.max_id)
    else:
        query = query.filter(models.InstanceFault.instance_uuid.in_(
            instance_uuids)).order_by(desc("id"))

    # Pre-seed the result so every requested UUID appears in the output,
    # even when it has no faults.
    output = {}
    for instance_uuid in instance_uuids:
        output[instance_uuid] = []

    for row in query:
        output[row.instance_uuid].append(row._asdict())

    return output
  3929. ##################
  3930. @pick_context_manager_writer
  3931. def action_start(context, values):
  3932. convert_objects_related_datetimes(values, 'start_time', 'updated_at')
  3933. action_ref = models.InstanceAction()
  3934. action_ref.update(values)
  3935. action_ref.save(context.session)
  3936. return action_ref
  3937. @pick_context_manager_writer
  3938. def action_finish(context, values):
  3939. convert_objects_related_datetimes(values, 'start_time', 'finish_time',
  3940. 'updated_at')
  3941. query = model_query(context, models.InstanceAction).\
  3942. filter_by(instance_uuid=values['instance_uuid']).\
  3943. filter_by(request_id=values['request_id'])
  3944. if query.update(values) != 1:
  3945. raise exception.InstanceActionNotFound(
  3946. request_id=values['request_id'],
  3947. instance_uuid=values['instance_uuid'])
  3948. return query.one()
  3949. @pick_context_manager_reader
  3950. def actions_get(context, instance_uuid, limit=None, marker=None,
  3951. filters=None):
  3952. """Get all instance actions for the provided uuid and filters."""
  3953. if limit == 0:
  3954. return []
  3955. sort_keys = ['created_at', 'id']
  3956. sort_dirs = ['desc', 'desc']
  3957. query_prefix = model_query(context, models.InstanceAction).\
  3958. filter_by(instance_uuid=instance_uuid)
  3959. model_object = models.InstanceAction
  3960. query_prefix = _get_query_nova_resource_by_changes_time(query_prefix,
  3961. filters,
  3962. model_object)
  3963. if marker is not None:
  3964. marker = action_get_by_request_id(context, instance_uuid, marker)
  3965. if not marker:
  3966. raise exception.MarkerNotFound(marker=marker)
  3967. actions = sqlalchemyutils.paginate_query(query_prefix,
  3968. models.InstanceAction, limit,
  3969. sort_keys, marker=marker,
  3970. sort_dirs=sort_dirs).all()
  3971. return actions
  3972. @pick_context_manager_reader
  3973. def action_get_by_request_id(context, instance_uuid, request_id):
  3974. """Get the action by request_id and given instance."""
  3975. action = _action_get_by_request_id(context, instance_uuid, request_id)
  3976. return action
  3977. def _action_get_by_request_id(context, instance_uuid, request_id):
  3978. result = model_query(context, models.InstanceAction).\
  3979. filter_by(instance_uuid=instance_uuid).\
  3980. filter_by(request_id=request_id).\
  3981. order_by(desc("created_at"), desc("id")).\
  3982. first()
  3983. return result
  3984. def _action_get_last_created_by_instance_uuid(context, instance_uuid):
  3985. result = (model_query(context, models.InstanceAction).
  3986. filter_by(instance_uuid=instance_uuid).
  3987. order_by(desc("created_at"), desc("id")).
  3988. first())
  3989. return result
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
@pick_context_manager_writer
def action_event_start(context, values):
    """Start an event on an instance action.

    Looks up the parent InstanceAction via the instance_uuid/request_id
    in ``values`` and attaches a new InstanceActionEvent to it.

    :raises: InstanceActionNotFound if no parent action can be located.
    """
    convert_objects_related_datetimes(values, 'start_time')
    action = _action_get_by_request_id(context, values['instance_uuid'],
                                       values['request_id'])
    # When nova-compute restarts, the context is generated again in
    # init_host workflow, the request_id was different with the request_id
    # recorded in InstanceAction, so we can't get the original record
    # according to request_id. Try to get the last created action so that
    # init_instance can continue to finish the recovery action, like:
    # powering_off, unpausing, and so on.
    update_action = True
    if not action and not context.project_id:
        action = _action_get_last_created_by_instance_uuid(
            context, values['instance_uuid'])
        # If we couldn't find an action by the request_id, we don't want to
        # update this action since it likely represents an inactive action.
        update_action = False

    if not action:
        raise exception.InstanceActionNotFound(
            request_id=values['request_id'],
            instance_uuid=values['instance_uuid'])

    values['action_id'] = action['id']

    event_ref = models.InstanceActionEvent()
    event_ref.update(values)
    context.session.add(event_ref)

    # Update action updated_at.
    if update_action:
        action.update({'updated_at': values['start_time']})
        action.save(context.session)

    return event_ref
# NOTE: We need the retry_on_deadlock decorator for cases like resize where
# a lot of events are happening at once between multiple hosts trying to
# update the same action record in a small time window.
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
@pick_context_manager_writer
def action_event_finish(context, values):
    """Finish an event on an instance action.

    Finds the matching InstanceActionEvent under the parent action and
    applies ``values`` to it; an 'error' result also marks the parent
    action's message as 'Error'.

    :raises: InstanceActionNotFound if no parent action can be located
    :raises: InstanceActionEventNotFound if the action has no event with
        the given name
    """
    convert_objects_related_datetimes(values, 'start_time', 'finish_time')
    action = _action_get_by_request_id(context, values['instance_uuid'],
                                       values['request_id'])
    # When nova-compute restarts, the context is generated again in
    # init_host workflow, the request_id was different with the request_id
    # recorded in InstanceAction, so we can't get the original record
    # according to request_id. Try to get the last created action so that
    # init_instance can continue to finish the recovery action, like:
    # powering_off, unpausing, and so on.
    update_action = True
    if not action and not context.project_id:
        action = _action_get_last_created_by_instance_uuid(
            context, values['instance_uuid'])
        # If we couldn't find an action by the request_id, we don't want to
        # update this action since it likely represents an inactive action.
        update_action = False

    if not action:
        raise exception.InstanceActionNotFound(
            request_id=values['request_id'],
            instance_uuid=values['instance_uuid'])

    event_ref = model_query(context, models.InstanceActionEvent).\
        filter_by(action_id=action['id']).\
        filter_by(event=values['event']).\
        first()

    if not event_ref:
        raise exception.InstanceActionEventNotFound(action_id=action['id'],
                                                    event=values['event'])
    event_ref.update(values)

    if values['result'].lower() == 'error':
        action.update({'message': 'Error'})

    # Update action updated_at.
    if update_action:
        action.update({'updated_at': values['finish_time']})
        action.save(context.session)

    return event_ref
  4065. @pick_context_manager_reader
  4066. def action_events_get(context, action_id):
  4067. events = model_query(context, models.InstanceActionEvent).\
  4068. filter_by(action_id=action_id).\
  4069. order_by(desc("created_at"), desc("id")).\
  4070. all()
  4071. return events
  4072. @pick_context_manager_reader
  4073. def action_event_get_by_id(context, action_id, event_id):
  4074. event = model_query(context, models.InstanceActionEvent).\
  4075. filter_by(action_id=action_id).\
  4076. filter_by(id=event_id).\
  4077. first()
  4078. return event
  4079. ##################
  4080. @require_context
  4081. @pick_context_manager_writer
  4082. def ec2_instance_create(context, instance_uuid, id=None):
  4083. """Create ec2 compatible instance by provided uuid."""
  4084. ec2_instance_ref = models.InstanceIdMapping()
  4085. ec2_instance_ref.update({'uuid': instance_uuid})
  4086. if id is not None:
  4087. ec2_instance_ref.update({'id': id})
  4088. ec2_instance_ref.save(context.session)
  4089. return ec2_instance_ref
  4090. @require_context
  4091. @pick_context_manager_reader
  4092. def ec2_instance_get_by_uuid(context, instance_uuid):
  4093. result = _ec2_instance_get_query(context).\
  4094. filter_by(uuid=instance_uuid).\
  4095. first()
  4096. if not result:
  4097. raise exception.InstanceNotFound(instance_id=instance_uuid)
  4098. return result
  4099. @require_context
  4100. @pick_context_manager_reader
  4101. def ec2_instance_get_by_id(context, instance_id):
  4102. result = _ec2_instance_get_query(context).\
  4103. filter_by(id=instance_id).\
  4104. first()
  4105. if not result:
  4106. raise exception.InstanceNotFound(instance_id=instance_id)
  4107. return result
  4108. @require_context
  4109. @pick_context_manager_reader
  4110. def get_instance_uuid_by_ec2_id(context, ec2_id):
  4111. result = ec2_instance_get_by_id(context, ec2_id)
  4112. return result['uuid']
  4113. def _ec2_instance_get_query(context):
  4114. return model_query(context, models.InstanceIdMapping, read_deleted='yes')
  4115. ##################
  4116. def _task_log_get_query(context, task_name, period_beginning,
  4117. period_ending, host=None, state=None):
  4118. values = {'period_beginning': period_beginning,
  4119. 'period_ending': period_ending}
  4120. values = convert_objects_related_datetimes(values, *values.keys())
  4121. query = model_query(context, models.TaskLog).\
  4122. filter_by(task_name=task_name).\
  4123. filter_by(period_beginning=values['period_beginning']).\
  4124. filter_by(period_ending=values['period_ending'])
  4125. if host is not None:
  4126. query = query.filter_by(host=host)
  4127. if state is not None:
  4128. query = query.filter_by(state=state)
  4129. return query
  4130. @pick_context_manager_reader
  4131. def task_log_get(context, task_name, period_beginning, period_ending, host,
  4132. state=None):
  4133. return _task_log_get_query(context, task_name, period_beginning,
  4134. period_ending, host, state).first()
  4135. @pick_context_manager_reader
  4136. def task_log_get_all(context, task_name, period_beginning, period_ending,
  4137. host=None, state=None):
  4138. return _task_log_get_query(context, task_name, period_beginning,
  4139. period_ending, host, state).all()
  4140. @pick_context_manager_writer
  4141. def task_log_begin_task(context, task_name, period_beginning, period_ending,
  4142. host, task_items=None, message=None):
  4143. values = {'period_beginning': period_beginning,
  4144. 'period_ending': period_ending}
  4145. values = convert_objects_related_datetimes(values, *values.keys())
  4146. task = models.TaskLog()
  4147. task.task_name = task_name
  4148. task.period_beginning = values['period_beginning']
  4149. task.period_ending = values['period_ending']
  4150. task.host = host
  4151. task.state = "RUNNING"
  4152. if message:
  4153. task.message = message
  4154. if task_items:
  4155. task.task_items = task_items
  4156. try:
  4157. task.save(context.session)
  4158. except db_exc.DBDuplicateEntry:
  4159. raise exception.TaskAlreadyRunning(task_name=task_name, host=host)
  4160. @pick_context_manager_writer
  4161. def task_log_end_task(context, task_name, period_beginning, period_ending,
  4162. host, errors, message=None):
  4163. values = dict(state="DONE", errors=errors)
  4164. if message:
  4165. values["message"] = message
  4166. rows = _task_log_get_query(context, task_name, period_beginning,
  4167. period_ending, host).update(values)
  4168. if rows == 0:
  4169. # It's not running!
  4170. raise exception.TaskNotRunning(task_name=task_name, host=host)
  4171. ##################
def _archive_if_instance_deleted(table, shadow_table, instances, conn,
                                 max_rows, before):
    """Look for records that pertain to deleted instances, but may not be
    deleted themselves. This catches cases where we delete an instance,
    but leave some residue because of a failure in a cleanup path or
    similar.

    Logic is: if I have a column called instance_uuid, and that instance
    is deleted, then I can be deleted.

    :param table: production table to archive from
    :param shadow_table: corresponding shadow table to copy rows into
    :param instances: the ``instances`` table, used to test deletion
    :param conn: open DB connection for the insert/delete transaction
    :param max_rows: maximum number of rows to move (may be None)
    :param before: optional datetime; only archive rows whose instance was
        deleted before that time
    :returns: number of rows archived, or 0 when a foreign key constraint
        prevented deletion
    """
    # NOTE(jake): handle instance_actions_events differently as it relies on
    # instance_actions.id not instances.uuid
    if table.name == "instance_actions_events":
        instance_actions = models.BASE.metadata.tables["instance_actions"]
        query_select = sql.select(
            [table],
            and_(instances.c.deleted != instances.c.deleted.default.arg,
                 instances.c.uuid == instance_actions.c.instance_uuid,
                 instance_actions.c.id == table.c.action_id))
    else:
        query_select = sql.select(
            [table],
            and_(instances.c.deleted != instances.c.deleted.default.arg,
                 instances.c.uuid == table.c.instance_uuid))

    if before:
        query_select = query_select.where(instances.c.deleted_at < before)

    query_select = query_select.order_by(table.c.id).limit(max_rows)

    query_insert = shadow_table.insert(inline=True).\
        from_select([c.name for c in table.c], query_select)

    delete_statement = DeleteFromSelect(table, query_select,
                                        table.c.id)

    try:
        # Group the copy-to-shadow and the delete so either both happen
        # or neither does.
        with conn.begin():
            conn.execute(query_insert)
            result_delete = conn.execute(delete_statement)
        return result_delete.rowcount
    except db_exc.DBReferenceError as ex:
        LOG.warning('Failed to archive %(table)s: %(error)s',
                    {'table': table.name,
                     'error': six.text_type(ex)})
        return 0
def _archive_deleted_rows_for_table(metadata, tablename, max_rows, before):
    """Move up to max_rows rows from one tables to the corresponding
    shadow table.

    :param metadata: reflected sqlalchemy MetaData bound to the engine
    :param tablename: name of the production table to archive
    :param max_rows: maximum number of rows to move (may be None)
    :param before: optional datetime filter on ``deleted_at``
    :returns: 2-item tuple:

        - number of rows archived
        - list of UUIDs of instances that were archived
    """
    conn = metadata.bind.connect()
    # NOTE(tdurakov): table metadata should be received
    # from models, not db tables. Default value specified by SoftDeleteMixin
    # is known only by models, not DB layer.
    # IMPORTANT: please do not change source of metadata information for table.
    table = models.BASE.metadata.tables[tablename]

    shadow_tablename = _SHADOW_TABLE_PREFIX + tablename
    rows_archived = 0
    deleted_instance_uuids = []
    try:
        shadow_table = Table(shadow_tablename, metadata, autoload=True)
    except NoSuchTableError:
        # No corresponding shadow table; skip it.
        return rows_archived, deleted_instance_uuids

    if tablename == "dns_domains":
        # We have one table (dns_domains) where the key is called
        # "domain" rather than "id"
        column = table.c.domain
    else:
        column = table.c.id
    # NOTE(guochbo): Use DeleteFromSelect to avoid
    # database's limit of maximum parameter in one SQL statement.
    deleted_column = table.c.deleted
    columns = [c.name for c in table.c]

    # Select the primary-key values of soft-deleted rows to archive.
    select = sql.select([column],
                        deleted_column != deleted_column.default.arg)

    if before:
        select = select.where(table.c.deleted_at < before)

    select = select.order_by(column).limit(max_rows)
    rows = conn.execute(select).fetchall()
    records = [r[0] for r in rows]

    if records:
        insert = shadow_table.insert(inline=True).\
            from_select(columns, sql.select([table], column.in_(records)))
        delete = table.delete().where(column.in_(records))
        # NOTE(tssurya): In order to facilitate the deletion of records from
        # instance_mappings, request_specs and instance_group_member tables in
        # the nova_api DB, the rows of deleted instances from the instances
        # table are stored prior to their deletion. Basically the uuids of the
        # archived instances are queried and returned.
        if tablename == "instances":
            query_select = sql.select([table.c.uuid], table.c.id.in_(records))
            rows = conn.execute(query_select).fetchall()
            deleted_instance_uuids = [r[0] for r in rows]

        try:
            # Group the insert and delete in a transaction.
            with conn.begin():
                conn.execute(insert)
                result_delete = conn.execute(delete)
            rows_archived = result_delete.rowcount
        except db_exc.DBReferenceError as ex:
            # A foreign key constraint keeps us from deleting some of
            # these rows until we clean up a dependent table.  Just
            # skip this table for now; we'll come back to it later.
            LOG.warning("IntegrityError detected when archiving table "
                        "%(tablename)s: %(error)s",
                        {'tablename': tablename, 'error': six.text_type(ex)})

    # NOTE(jake): instance_actions_events doesn't have a instance_uuid column
    # but still needs to be archived as it is a FK constraint
    if ((max_rows is None or rows_archived < max_rows) and
            ('instance_uuid' in columns or
             tablename == 'instance_actions_events')):
        instances = models.BASE.metadata.tables['instances']
        limit = max_rows - rows_archived if max_rows is not None else None
        extra = _archive_if_instance_deleted(table, shadow_table, instances,
                                             conn, limit, before)
        rows_archived += extra

    return rows_archived, deleted_instance_uuids
  4287. def archive_deleted_rows(context=None, max_rows=None, before=None):
  4288. """Move up to max_rows rows from production tables to the corresponding
  4289. shadow tables.
  4290. :param context: nova.context.RequestContext for database access
  4291. :param max_rows: Maximum number of rows to archive (required)
  4292. :param before: optional datetime which when specified filters the records
  4293. to only archive those records deleted before the given date
  4294. :returns: 3-item tuple:
  4295. - dict that maps table name to number of rows archived from that table,
  4296. for example::
  4297. {
  4298. 'instances': 5,
  4299. 'block_device_mapping': 5,
  4300. 'pci_devices': 2,
  4301. }
  4302. - list of UUIDs of instances that were archived
  4303. - total number of rows that were archived
  4304. """
  4305. table_to_rows_archived = {}
  4306. deleted_instance_uuids = []
  4307. total_rows_archived = 0
  4308. meta = MetaData(get_engine(use_slave=True, context=context))
  4309. meta.reflect()
  4310. # Reverse sort the tables so we get the leaf nodes first for processing.
  4311. for table in reversed(meta.sorted_tables):
  4312. tablename = table.name
  4313. rows_archived = 0
  4314. # skip the special sqlalchemy-migrate migrate_version table and any
  4315. # shadow tables
  4316. if (tablename == 'migrate_version' or
  4317. tablename.startswith(_SHADOW_TABLE_PREFIX)):
  4318. continue
  4319. rows_archived, _deleted_instance_uuids = (
  4320. _archive_deleted_rows_for_table(
  4321. meta, tablename,
  4322. max_rows=max_rows - total_rows_archived,
  4323. before=before))
  4324. total_rows_archived += rows_archived
  4325. if tablename == 'instances':
  4326. deleted_instance_uuids = _deleted_instance_uuids
  4327. # Only report results for tables that had updates.
  4328. if rows_archived:
  4329. table_to_rows_archived[tablename] = rows_archived
  4330. if total_rows_archived >= max_rows:
  4331. break
  4332. return table_to_rows_archived, deleted_instance_uuids, total_rows_archived
  4333. def _purgeable_tables(metadata):
  4334. return [t for t in metadata.sorted_tables
  4335. if (t.name.startswith(_SHADOW_TABLE_PREFIX) and not
  4336. t.name.endswith('migrate_version'))]
  4337. def purge_shadow_tables(context, before_date, status_fn=None):
  4338. engine = get_engine(context=context)
  4339. conn = engine.connect()
  4340. metadata = MetaData()
  4341. metadata.bind = engine
  4342. metadata.reflect()
  4343. total_deleted = 0
  4344. if status_fn is None:
  4345. status_fn = lambda m: None
  4346. # Some things never get formally deleted, and thus deleted_at
  4347. # is never set. So, prefer specific timestamp columns here
  4348. # for those special cases.
  4349. overrides = {
  4350. 'shadow_instance_actions': 'created_at',
  4351. 'shadow_instance_actions_events': 'created_at',
  4352. }
  4353. for table in _purgeable_tables(metadata):
  4354. if before_date is None:
  4355. col = None
  4356. elif table.name in overrides:
  4357. col = getattr(table.c, overrides[table.name])
  4358. elif hasattr(table.c, 'deleted_at'):
  4359. col = table.c.deleted_at
  4360. elif hasattr(table.c, 'updated_at'):
  4361. col = table.c.updated_at
  4362. elif hasattr(table.c, 'created_at'):
  4363. col = table.c.created_at
  4364. else:
  4365. status_fn(_('Unable to purge table %(table)s because it '
  4366. 'has no timestamp column') % {
  4367. 'table': table.name})
  4368. continue
  4369. if col is not None:
  4370. delete = table.delete().where(col < before_date)
  4371. else:
  4372. delete = table.delete()
  4373. deleted = conn.execute(delete)
  4374. if deleted.rowcount > 0:
  4375. status_fn(_('Deleted %(rows)i rows from %(table)s based on '
  4376. 'timestamp column %(col)s') % {
  4377. 'rows': deleted.rowcount,
  4378. 'table': table.name,
  4379. 'col': col is None and '(n/a)' or col.name})
  4380. total_deleted += deleted.rowcount
  4381. return total_deleted
  4382. ####################
  4383. @pick_context_manager_reader
  4384. def pci_device_get_by_addr(context, node_id, dev_addr):
  4385. pci_dev_ref = model_query(context, models.PciDevice).\
  4386. filter_by(compute_node_id=node_id).\
  4387. filter_by(address=dev_addr).\
  4388. first()
  4389. if not pci_dev_ref:
  4390. raise exception.PciDeviceNotFound(node_id=node_id, address=dev_addr)
  4391. return pci_dev_ref
  4392. @pick_context_manager_reader
  4393. def pci_device_get_by_id(context, id):
  4394. pci_dev_ref = model_query(context, models.PciDevice).\
  4395. filter_by(id=id).\
  4396. first()
  4397. if not pci_dev_ref:
  4398. raise exception.PciDeviceNotFoundById(id=id)
  4399. return pci_dev_ref
  4400. @pick_context_manager_reader
  4401. def pci_device_get_all_by_node(context, node_id):
  4402. return model_query(context, models.PciDevice).\
  4403. filter_by(compute_node_id=node_id).\
  4404. all()
  4405. @pick_context_manager_reader
  4406. def pci_device_get_all_by_parent_addr(context, node_id, parent_addr):
  4407. return model_query(context, models.PciDevice).\
  4408. filter_by(compute_node_id=node_id).\
  4409. filter_by(parent_addr=parent_addr).\
  4410. all()
  4411. @require_context
  4412. @pick_context_manager_reader
  4413. def pci_device_get_all_by_instance_uuid(context, instance_uuid):
  4414. return model_query(context, models.PciDevice).\
  4415. filter_by(status='allocated').\
  4416. filter_by(instance_uuid=instance_uuid).\
  4417. all()
  4418. @pick_context_manager_reader
  4419. def _instance_pcidevs_get_multi(context, instance_uuids):
  4420. if not instance_uuids:
  4421. return []
  4422. return model_query(context, models.PciDevice).\
  4423. filter_by(status='allocated').\
  4424. filter(models.PciDevice.instance_uuid.in_(instance_uuids))
  4425. @pick_context_manager_writer
  4426. def pci_device_destroy(context, node_id, address):
  4427. result = model_query(context, models.PciDevice).\
  4428. filter_by(compute_node_id=node_id).\
  4429. filter_by(address=address).\
  4430. soft_delete()
  4431. if not result:
  4432. raise exception.PciDeviceNotFound(node_id=node_id, address=address)
  4433. @pick_context_manager_writer
  4434. def pci_device_update(context, node_id, address, values):
  4435. query = model_query(context, models.PciDevice, read_deleted="no").\
  4436. filter_by(compute_node_id=node_id).\
  4437. filter_by(address=address)
  4438. if query.update(values) == 0:
  4439. device = models.PciDevice()
  4440. device.update(values)
  4441. context.session.add(device)
  4442. return query.one()
  4443. ####################
  4444. @pick_context_manager_writer
  4445. def instance_tag_add(context, instance_uuid, tag):
  4446. tag_ref = models.Tag()
  4447. tag_ref.resource_id = instance_uuid
  4448. tag_ref.tag = tag
  4449. try:
  4450. _check_instance_exists_in_project(context, instance_uuid)
  4451. with get_context_manager(context).writer.savepoint.using(context):
  4452. context.session.add(tag_ref)
  4453. except db_exc.DBDuplicateEntry:
  4454. # NOTE(snikitin): We should ignore tags duplicates
  4455. pass
  4456. return tag_ref
  4457. @pick_context_manager_writer
  4458. def instance_tag_set(context, instance_uuid, tags):
  4459. _check_instance_exists_in_project(context, instance_uuid)
  4460. existing = context.session.query(models.Tag.tag).filter_by(
  4461. resource_id=instance_uuid).all()
  4462. existing = set(row.tag for row in existing)
  4463. tags = set(tags)
  4464. to_delete = existing - tags
  4465. to_add = tags - existing
  4466. if to_delete:
  4467. context.session.query(models.Tag).filter_by(
  4468. resource_id=instance_uuid).filter(
  4469. models.Tag.tag.in_(to_delete)).delete(
  4470. synchronize_session=False)
  4471. if to_add:
  4472. data = [
  4473. {'resource_id': instance_uuid, 'tag': tag} for tag in to_add]
  4474. context.session.execute(models.Tag.__table__.insert(None), data)
  4475. return context.session.query(models.Tag).filter_by(
  4476. resource_id=instance_uuid).all()
  4477. @pick_context_manager_reader
  4478. def instance_tag_get_by_instance_uuid(context, instance_uuid):
  4479. _check_instance_exists_in_project(context, instance_uuid)
  4480. return context.session.query(models.Tag).filter_by(
  4481. resource_id=instance_uuid).all()
  4482. @pick_context_manager_writer
  4483. def instance_tag_delete(context, instance_uuid, tag):
  4484. _check_instance_exists_in_project(context, instance_uuid)
  4485. result = context.session.query(models.Tag).filter_by(
  4486. resource_id=instance_uuid, tag=tag).delete()
  4487. if not result:
  4488. raise exception.InstanceTagNotFound(instance_id=instance_uuid,
  4489. tag=tag)
  4490. @pick_context_manager_writer
  4491. def instance_tag_delete_all(context, instance_uuid):
  4492. _check_instance_exists_in_project(context, instance_uuid)
  4493. context.session.query(models.Tag).filter_by(
  4494. resource_id=instance_uuid).delete()
  4495. @pick_context_manager_reader
  4496. def instance_tag_exists(context, instance_uuid, tag):
  4497. _check_instance_exists_in_project(context, instance_uuid)
  4498. q = context.session.query(models.Tag).filter_by(
  4499. resource_id=instance_uuid, tag=tag)
  4500. return context.session.query(q.exists()).scalar()
  4501. ####################
  4502. @pick_context_manager_writer
  4503. def console_auth_token_create(context, values):
  4504. instance_uuid = values.get('instance_uuid')
  4505. _check_instance_exists_in_project(context, instance_uuid)
  4506. token_ref = models.ConsoleAuthToken()
  4507. token_ref.update(values)
  4508. context.session.add(token_ref)
  4509. return token_ref
  4510. @pick_context_manager_reader
  4511. def console_auth_token_get_valid(context, token_hash, instance_uuid=None):
  4512. if instance_uuid is not None:
  4513. _check_instance_exists_in_project(context, instance_uuid)
  4514. query = context.session.query(models.ConsoleAuthToken).\
  4515. filter_by(token_hash=token_hash)
  4516. if instance_uuid is not None:
  4517. query = query.filter_by(instance_uuid=instance_uuid)
  4518. return query.filter(
  4519. models.ConsoleAuthToken.expires > timeutils.utcnow_ts()).first()
  4520. @pick_context_manager_writer
  4521. def console_auth_token_destroy_all_by_instance(context, instance_uuid):
  4522. context.session.query(models.ConsoleAuthToken).\
  4523. filter_by(instance_uuid=instance_uuid).delete()
  4524. @pick_context_manager_writer
  4525. def console_auth_token_destroy_expired(context):
  4526. context.session.query(models.ConsoleAuthToken).\
  4527. filter(models.ConsoleAuthToken.expires <= timeutils.utcnow_ts()).\
  4528. delete()
  4529. @pick_context_manager_writer
  4530. def console_auth_token_destroy_expired_by_host(context, host):
  4531. context.session.query(models.ConsoleAuthToken).\
  4532. filter_by(host=host).\
  4533. filter(models.ConsoleAuthToken.expires <= timeutils.utcnow_ts()).\
  4534. delete()